dnxRegistrar.c

Go to the documentation of this file.
00001 /*--------------------------------------------------------------------------
00002 
00003    Copyright (c) 2006-2007, Intellectual Reserve, Inc. All rights reserved.
00004 
00005    This program is free software; you can redistribute it and/or modify
00006    it under the terms of the GNU General Public License version 2 as
00007    published by the Free Software Foundation.
00008 
00009    This program is distributed in the hope that it will be useful,
00010    but WITHOUT ANY WARRANTY; without even the implied warranty of
00011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012    GNU General Public License for more details.
00013 
00014    You should have received a copy of the GNU General Public License
00015    along with this program; if not, write to the Free Software
00016    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 
00018   --------------------------------------------------------------------------*/
00019 
00036 #include "dnxRegistrar.h"
00037 
00038 #include "dnxNebMain.h"
00039 #include "dnxError.h"
00040 #include "dnxDebug.h"
00041 #include "dnxQueue.h"
00042 #include "dnxSleep.h"
00043 #include "dnxTransport.h"
00044 #include "dnxSrvProt.h"
00045 #include "dnxLogging.h"
00046 #include "dnxStats.h"
00047 
00048 #include <assert.h>
00049 #include <pthread.h>
00050 
00052 #define DNX_REGISTRAR_REQUEST_TIMEOUT  5
00053 
00055 typedef struct iDnxRegistrar_
00056 {
00057    DnxChannel * dispchan;  
00058    DnxQueue * rqueue;      
00059    pthread_t tid;          
00060 } iDnxRegistrar;
00061 
00062 /*--------------------------------------------------------------------------
00063                               IMPLEMENTATION
00064   --------------------------------------------------------------------------*/
00065 
00080 static DnxQueueResult dnxCompareNodeReq(void * pLeft, void * pRight)
00081 {
00082    DnxXID * pxl = &((DnxNodeRequest *)pLeft)->xid;
00083    DnxXID * pxr = &((DnxNodeRequest *)pRight)->xid;
00084 
00085    assert(pLeft && pRight);
00086 
00087    return pxl->objSerial == pxr->objSerial && pxl->objSlot == pxr->objSlot
00088          ? DNX_QRES_FOUND : DNX_QRES_CONTINUE;
00089 }
00090 
00091 //----------------------------------------------------------------------------
00092 
00105 static int dnxRegisterNode(iDnxRegistrar * ireg, DnxNodeRequest ** ppMsg)
00106 {
00107    pthread_t tid = pthread_self();
00108    DnxNodeRequest * pReq;
00109    time_t now = time(0);
00110    int ret = DNX_OK;
00111 
00112    assert(ireg && ppMsg && *ppMsg);
00113 
00114    // compute expiration time of this request
00115    pReq = *ppMsg;
00116    pReq->expires = now + pReq->ttl;
00117 
00118    dnxStatsInc(pReq->address, REQUESTS_RECEIVED);
00119 
00120    // locate existing node: update expiration time, or add to the queue
00121    if (dnxQueueFind(ireg->rqueue, (void **)&pReq, dnxCompareNodeReq) == DNX_QRES_FOUND)
00122    {
00123       pReq->expires = (*ppMsg)->expires;
00124       dnxDebug(2,
00125             "dnxRegistrar[%lx]: Updated req [%lu,%lu] at %u; expires at %u.",
00126             tid, pReq->xid.objSerial, pReq->xid.objSlot,
00127             (unsigned)(now % 1000), (unsigned)(pReq->expires % 1000));
00128    }
00129    else if ((ret = dnxQueuePut(ireg->rqueue, *ppMsg)) == DNX_OK)
00130    {
00131       *ppMsg = 0;    // we're keeping this message; return NULL
00132       dnxDebug(2,
00133             "dnxRegistrar[%lx]: Added req [%lu,%lu] at %u; expires at %u.",
00134             tid, pReq->xid.objSerial, pReq->xid.objSlot,
00135             (unsigned)(now % 1000), (unsigned)(pReq->expires % 1000));
00136    }
00137    else
00138       dnxLog("DNX Registrar: Unable to enqueue node request: %s.",
00139             dnxErrorString(ret));
00140 
00141    return ret;
00142 }
00143 
00144 //----------------------------------------------------------------------------
00145 
00156 static int dnxDeregisterNode(iDnxRegistrar * ireg, DnxNodeRequest * pMsg)
00157 {
00158    DnxNodeRequest * pReq = pMsg;
00159 
00160    assert(ireg && pMsg);
00161 
00162    if (dnxQueueRemove(ireg->rqueue, (void **)&pReq,
00163          dnxCompareNodeReq) == DNX_QRES_FOUND)
00164       xfree(pReq);      // free the dequeued DnxNodeRequest message
00165 
00166    return DNX_OK;
00167 }
00168 
00169 //----------------------------------------------------------------------------
00170 
00178 static void * dnxRegistrar(void * data)
00179 {
00180    iDnxRegistrar * ireg = (iDnxRegistrar *)data;
00181    DnxNodeRequest * pMsg = 0;
00182 
00183    assert(data);
00184 
00185    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00186    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00187 
00188    dnxLog("DNX Registrar: Awaiting worker node requests...");
00189 
00190    while (1)
00191    {
00192       int ret;
00193 
00194       // (re)allocate message block if not consumed in last pass
00195       if (pMsg == 0 && (pMsg = (DnxNodeRequest *)xmalloc(sizeof *pMsg)) == 0)
00196       {
00197          dnxCancelableSleep(10);    // sleep for a while and try again...
00198          continue;
00199       }
00200 
00201       pthread_cleanup_push(xfree, pMsg);
00202 
00203       pthread_testcancel();
00204 
00205       // wait on the dispatch socket for a request
00206       if ((ret = dnxWaitForNodeRequest(ireg->dispchan, pMsg, pMsg->address,
00207             DNX_REGISTRAR_REQUEST_TIMEOUT)) == DNX_OK)
00208       {
00209          switch (pMsg->reqType)
00210          {
00211             case DNX_REQ_REGISTER:
00212                ret = dnxRegisterNode(ireg, &pMsg);
00213                break;
00214 
00215             case DNX_REQ_DEREGISTER:
00216                ret = dnxDeregisterNode(ireg, pMsg);
00217                break;
00218 
00219             default:
00220                ret = DNX_ERR_UNSUPPORTED;
00221          }
00222       }
00223 
00224       pthread_cleanup_pop(0);
00225 
00226       if (ret != DNX_OK && ret != DNX_ERR_TIMEOUT)
00227          dnxLog("DNX Registrar: Process node request failed: %s.",
00228                dnxErrorString(ret));
00229    }
00230    return 0;
00231 }
00232 
00233 /*--------------------------------------------------------------------------
00234                                  INTERFACE
00235   --------------------------------------------------------------------------*/
00236 
00237 int dnxGetNodeRequest(DnxRegistrar * reg, DnxNodeRequest ** ppNode)
00238 {
00239    iDnxRegistrar * ireg = (iDnxRegistrar *)reg;
00240    int ret, discard_count = 0;
00241    DnxNodeRequest * node = 0;
00242 
00243    assert(reg && ppNode);
00244 
00245    while ((ret = dnxQueueGet(ireg->rqueue, (void **)&node)) == DNX_OK)
00246    {
00247       time_t now = time(0);
00248 
00249       // verify that this request's Time-To-Live (TTL) has not expired
00250       if (node->expires > now)
00251          break;
00252 
00253       // bump stats
00254       dnxStatsInc(node->address, REQUESTS_EXPIRED);
00255 
00256       dnxDebug(3,
00257             "dnxRegisterNode: Expired req [%lu,%lu] at %u; expired at %u.",
00258             node->xid.objSerial, node->xid.objSlot,
00259             (unsigned)(now % 1000), (unsigned)(node->expires % 1000));
00260 
00261       discard_count++;
00262 
00263       xfree(node);
00264       node = 0;
00265    }
00266 
00267    if (discard_count > 0)
00268       dnxDebug(1, "dnxGetNodeRequest: Discarded %d expired node requests.",
00269             discard_count);
00270 
00271    if (ret != DNX_OK)
00272    {
00273       dnxStatsInc(0, JOBS_REJECTED_NO_NODES);
00274       dnxDebug(2, "dnxGetNodeRequest: Unable to fulfill node request: %s.",
00275             dnxErrorString(ret));
00276    }
00277 
00278    *ppNode = node;   // return a node or NULL
00279 
00280    return ret;
00281 }
00282 
00283 //----------------------------------------------------------------------------
00284 
00285 int dnxRegistrarCreate(unsigned queuesz, DnxChannel * dispchan,
00286       DnxRegistrar ** preg)
00287 {
00288    iDnxRegistrar * ireg;
00289    int ret;
00290 
00291    assert(queuesz && dispchan && preg);
00292 
00293    if ((ireg = (iDnxRegistrar *)xmalloc(sizeof *ireg)) == 0)
00294       return DNX_ERR_MEMORY;
00295 
00296    memset(ireg, 0, sizeof *ireg);
00297    ireg->dispchan = dispchan;
00298 
00299    if ((ret = dnxQueueCreate(queuesz, xfree, &ireg->rqueue)) != 0)
00300    {
00301       dnxLog("DNX Registrar: Queue creation failed: %s.", dnxErrorString(ret));
00302       xfree(ireg);
00303       return ret;
00304    }
00305    if ((ret = pthread_create(&ireg->tid, 0, dnxRegistrar, ireg)) != 0)
00306    {
00307       dnxLog("DNX Registrar: Thread creation failed: %s.", strerror(ret));
00308       xfree(ireg);
00309       return DNX_ERR_THREAD;
00310    }
00311 
00312    *preg = (DnxRegistrar *)ireg;
00313 
00314    return DNX_OK;
00315 }
00316 
00317 //----------------------------------------------------------------------------
00318 
00319 void dnxRegistrarDestroy(DnxRegistrar * reg)
00320 {
00321    iDnxRegistrar * ireg = (iDnxRegistrar *)reg;
00322 
00323    assert(reg && ireg->tid);
00324 
00325    pthread_cancel(ireg->tid);
00326    pthread_join(ireg->tid, 0);
00327 
00328    dnxQueueDestroy(ireg->rqueue);
00329 
00330    xfree(ireg);
00331 }
00332 
00333 /*--------------------------------------------------------------------------
00334                                  TEST MAIN
00335 
00336    From within dnx/server, compile with GNU tools using this command line:
00337 
00338       gcc -DDEBUG -DDNX_REGISTRAR_TEST -DHAVE_NANOSLEEP -g -O0 \
00339          -lpthread -o dnxRegistrarTest -I../nagios/nagios-2.7/include \
00340          -I../common dnxRegistrar.c ../common/dnxError.c \
00341          ../common/dnxSleep.c
00342 
00343    Alternatively, a heap check may be done with the following command line:
00344 
00345       gcc -DDEBUG -DDEBUG_HEAP -DDNX_REGISTRAR_TEST -DHAVE_NANOSLEEP -g -O0 \
00346          -lpthread -o dnxRegistrarTest -I../nagios/nagios-2.7/include \
00347          -I../common dnxRegistrar.c ../common/dnxError.c \
00348          ../common/dnxSleep.c ../common/dnxHeap.c
00349 
00350    Note: Leave out -DHAVE_NANOSLEEP if your system doesn't have nanosleep.
00351 
00352   --------------------------------------------------------------------------*/
00353 
00354 #ifdef DNX_REGISTRAR_TEST
00355 
00356 #include "utesthelp.h"
00357 
00358 static int verbose;
00359 static int passes = 0;
00360 static DnxNodeRequest * test_req1;
00361 static DnxNodeRequest * test_req2;
00362 
00363 // functional stubs
00364 IMPLEMENT_DNX_DEBUG(verbose);
00365 IMPLEMENT_DNX_SYSLOG(verbose);
00366 
00367 void dnxStatsInc(char * addr, DnxStatsIndex member) {}
00368 
00369 int dnxWaitForNodeRequest(DnxChannel * channel, DnxNodeRequest * pReg,
00370       char * address, int timeout)
00371 {
00372    CHECK_TRUE(channel == (DnxChannel *)17);
00373    CHECK_TRUE(pReg != 0);
00374    CHECK_TRUE(address != 0);
00375    CHECK_TRUE(timeout == DNX_REGISTRAR_REQUEST_TIMEOUT);
00376 
00377    passes++;      // bump registrar loop pass count
00378 
00379    // complex test harness -
00380    //   pass 1: add a new registration
00381    //   pass 2: update an existing registration
00382    //   pass 3: remove an existing registration
00383 
00384    memset(pReg, 0, sizeof *pReg);
00385    pReg->ttl = 10;   // seconds - won't timeout during test
00386    if (passes < 3)
00387       pReg->reqType = DNX_REQ_REGISTER;
00388    else
00389       pReg->reqType = DNX_REQ_DEREGISTER;
00390 
00391    if (passes < 4)
00392       return 0;
00393 
00394    return DNX_ERR_TIMEOUT;
00395 }
00396 
00397 DnxQueueResult dnxQueueFind(DnxQueue * queue, void ** ppPayload,
00398       DnxQueueResult (*Compare)(void * pLeft, void * pRight))
00399 {
00400    CHECK_TRUE(queue = (DnxQueue *)37);
00401    if (passes == 1)
00402       return DNX_QRES_CONTINUE;  // pass 1: return not-found
00403    *ppPayload = test_req1;
00404    return DNX_QRES_FOUND;        // pass 2: return found
00405 }
00406 
00407 int dnxQueuePut(DnxQueue * queue, void * pPayload)
00408 {
00409    CHECK_TRUE(queue = (DnxQueue *)37);
00410    CHECK_TRUE(pPayload != 0);
00411    test_req1 = (DnxNodeRequest *)pPayload;
00412    return 0;                     // pass 1: add new registration
00413 }
00414 
00415 DnxQueueResult dnxQueueRemove(DnxQueue * queue, void ** ppPayload,
00416       DnxQueueResult (*Compare)(void * pLeft, void * pRight))
00417 {
00418    CHECK_TRUE(queue = (DnxQueue *)37);
00419    CHECK_TRUE(ppPayload != 0);
00420    CHECK_TRUE(Compare == dnxCompareNodeReq);
00421    *ppPayload = test_req1;       // pass 3: remove existing registration
00422    return DNX_QRES_FOUND;
00423 }
00424 
00425 int dnxQueueGet(DnxQueue * queue, void ** ppPayload)
00426 {
00427    CHECK_TRUE(queue = (DnxQueue *)37);
00428    CHECK_TRUE(ppPayload != 0);
00429    *ppPayload = test_req1;       // pass 4+: called from dnxGetNodeRequest
00430    return 0;
00431 }
00432 
00433 int dnxQueueCreate(unsigned maxsz, void (*pldtor)(void *), DnxQueue ** pqueue)
00434 {
00435    CHECK_TRUE(pqueue != 0);
00436    *pqueue = (DnxQueue *)37;
00437    return 0;
00438 }
00439 
00440 void dnxQueueDestroy(DnxQueue * queue)
00441 {
00442    CHECK_TRUE(queue == (DnxQueue *)37);
00443 }
00444 
00445 int main(int argc, char ** argv)
00446 {
00447    DnxRegistrar * reg;
00448    iDnxRegistrar * ireg;
00449    DnxNodeRequest * node;
00450 
00451    verbose = argc > 1 ? 1 : 0;
00452 
00453    CHECK_ZERO(dnxRegistrarCreate(5, (DnxChannel *)17, &reg));
00454 
00455    ireg = (iDnxRegistrar *)reg;
00456 
00457    CHECK_TRUE(ireg->dispchan == (DnxChannel *)17);
00458    CHECK_TRUE(ireg->rqueue == (DnxQueue *)37);
00459    CHECK_TRUE(ireg->tid != 0);
00460 
00461    while (passes < 4)
00462       dnxCancelableSleep(10);
00463 
00464    CHECK_ZERO(dnxGetNodeRequest(reg, &node));
00465    CHECK_TRUE(node == test_req1);
00466 
00467    dnxRegistrarDestroy(reg);
00468 
00469 #ifdef DEBUG_HEAP
00470    CHECK_ZERO(dnxCheckHeap());
00471 #endif
00472 
00473    return 0;
00474 }
00475 
00476 #endif   /* DNX_REGISTRAR_TEST */
00477 
00478 /*--------------------------------------------------------------------------*/
00479 

Generated on Tue Apr 13 15:48:07 2010 for DNX by  doxygen 1.5.6