dnxRegistrar.c

Go to the documentation of this file.
00001 /*--------------------------------------------------------------------------
00002 
00003    Copyright (c) 2006-2010, Intellectual Reserve, Inc. All rights reserved.
00004 
00005    This program is free software; you can redistribute it and/or modify
00006    it under the terms of the GNU General Public License version 2 as
00007    published by the Free Software Foundation.
00008 
00009    This program is distributed in the hope that it will be useful,
00010    but WITHOUT ANY WARRANTY; without even the implied warranty of
00011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012    GNU General Public License for more details.
00013 
00014    You should have received a copy of the GNU General Public License
00015    along with this program; if not, write to the Free Software
00016    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 
00018   --------------------------------------------------------------------------*/
00019 
00036 #include "dnxRegistrar.h"
00037 
00038 #include "dnxError.h"
00039 #include "dnxDebug.h"
00040 #include "dnxSleep.h"
00041 #include "dnxLogging.h"
00042 #include "dnxStats.h"
00043 
00044 #include <assert.h>
00045 #include <pthread.h>
00046 
00048 #define DNX_REGISTRAR_REQUEST_TIMEOUT  5
00049 
00051 typedef struct iDnxRegistrar_
00052 {
00053    DnxChannel * channel;      
00054    DnxQueue * rqueue;         
00055    pthread_t tid;             
00056 } iDnxRegistrar;
00057 
00058 /*--------------------------------------------------------------------------
00059                               IMPLEMENTATION
00060   --------------------------------------------------------------------------*/
00061 
00076 static DnxQueueResult dnxCompareNodeReq(void * pLeft, void * pRight)
00077 {
00078    DnxXID * pxl = &((DnxNodeRequest *)pLeft)->xid;
00079    DnxXID * pxr = &((DnxNodeRequest *)pRight)->xid;
00080 
00081    assert(pLeft && pRight);
00082 
00083    return pxl->objSerial == pxr->objSerial && pxl->objSlot == pxr->objSlot
00084          ? DNX_QRES_FOUND : DNX_QRES_CONTINUE;
00085 }
00086 
00087 //----------------------------------------------------------------------------
00088 
00101 static int dnxRegisterNode(iDnxRegistrar * ireg, DnxNodeRequest ** ppMsg)
00102 {
00103    pthread_t tid = pthread_self();
00104    DnxNodeRequest * pReq;
00105    time_t now = time(0);
00106    int ret = DNX_OK;
00107 
00108    assert(ireg && ppMsg && *ppMsg);
00109 
00110    // compute expiration time of this request
00111    pReq = *ppMsg;
00112    pReq->expires = now + pReq->ttl;
00113 
00114    dnxStatsInc(pReq->address, REQUESTS_RECEIVED);
00115 
00116    // locate existing node: update expiration time, or add to the queue
00117    if (dnxQueueFind(ireg->rqueue, (void **)&pReq, dnxCompareNodeReq) == DNX_QRES_FOUND)
00118    {
00119       pReq->expires = (*ppMsg)->expires;
00120       dnxDebug(2,
00121             "dnxRegistrar[%lx]: Updated req [%lu,%lu] at %u; expires at %u.",
00122             tid, pReq->xid.objSerial, pReq->xid.objSlot,
00123             (unsigned)(now % 1000), (unsigned)(pReq->expires % 1000));
00124    }
00125    else if ((ret = dnxQueuePut(ireg->rqueue, *ppMsg)) == DNX_OK)
00126    {
00127       *ppMsg = 0;    // we're keeping this message; return NULL
00128       dnxDebug(2,
00129             "dnxRegistrar[%lx]: Added req [%lu,%lu] at %u; expires at %u.",
00130             tid, pReq->xid.objSerial, pReq->xid.objSlot,
00131             (unsigned)(now % 1000), (unsigned)(pReq->expires % 1000));
00132    }
00133    else
00134       dnxLog("DNX Registrar: Unable to enqueue node request: %s.",
00135             dnxErrorString(ret));
00136 
00137    return ret;
00138 }
00139 
00140 //----------------------------------------------------------------------------
00141 
00152 static int dnxDeregisterNode(iDnxRegistrar * ireg, DnxNodeRequest * pMsg)
00153 {
00154    DnxNodeRequest * pReq = pMsg;
00155 
00156    assert(ireg && pMsg);
00157 
00158    if (dnxQueueRemove(ireg->rqueue, (void **)&pReq,
00159          dnxCompareNodeReq) == DNX_QRES_FOUND)
00160       xfree(pReq);      // free the dequeued DnxNodeRequest message
00161 
00162    return DNX_OK;
00163 }
00164 
00165 //----------------------------------------------------------------------------
00166 
00177 static void * dnxRegistrar(void * data)
00178 {
00179    iDnxRegistrar * ireg = (iDnxRegistrar *)data;
00180    DnxNodeRequest * pMsg = 0;
00181 
00182    assert(data);
00183 
00184    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00185    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00186 
00187    dnxLog("DNX Registrar: Awaiting worker node requests...");
00188 
00189    while (1)
00190    {
00191       int ret;
00192 
00193       // (re)allocate message block if not consumed in last pass
00194       if (pMsg == 0 && (pMsg = (DnxNodeRequest *)xmalloc(sizeof *pMsg)) == 0)
00195       {
00196          dnxCancelableSleep(10);    // sleep for a while and try again...
00197          continue;
00198       }
00199 
00200       pthread_cleanup_push(xfree, pMsg);
00201 
00202       pthread_testcancel();
00203 
00204       // wait on the registrar socket for a request
00205       if ((ret = dnxWaitForNodeRequest(ireg->channel, pMsg, pMsg->address,
00206             DNX_REGISTRAR_REQUEST_TIMEOUT)) == DNX_OK)
00207       {
00208          switch (pMsg->reqType)
00209          {
00210             case DNX_REQ_REGISTER:
00211                ret = dnxRegisterNode(ireg, &pMsg);
00212                break;
00213 
00214             case DNX_REQ_DEREGISTER:
00215                ret = dnxDeregisterNode(ireg, pMsg);
00216                break;
00217 
00218             default:
00219                ret = DNX_ERR_UNSUPPORTED;
00220          }
00221       }
00222 
00223       pthread_cleanup_pop(0);
00224 
00225       if (ret != DNX_OK && ret != DNX_ERR_TIMEOUT)
00226          dnxLog("DNX Registrar: Process node request failed: %s.",
00227                dnxErrorString(ret));
00228    }
00229    return 0;
00230 }
00231 
00232 /*--------------------------------------------------------------------------
00233                                  INTERFACE
00234   --------------------------------------------------------------------------*/
00235 
00236 int dnxGetNodeRequest(DnxRegistrar * reg, DnxNodeRequest ** ppNode)
00237 {
00238    iDnxRegistrar * ireg = (iDnxRegistrar *)reg;
00239    int ret, discard_count = 0;
00240    DnxNodeRequest * node = 0;
00241 
00242    assert(reg && ppNode);
00243 
00244    while ((ret = dnxQueueGet(ireg->rqueue, (void **)&node)) == DNX_OK)
00245    {
00246       time_t now = time(0);
00247 
00248       // verify that this request's Time-To-Live (TTL) has not expired
00249       if (node->expires > now)
00250          break;
00251 
00252       dnxStatsInc(node->address, REQUESTS_EXPIRED);
00253 
00254       dnxDebug(3, "dnxRegisterNode: Expired req [%lu,%lu] at %u; expired at %u.",
00255             node->xid.objSerial, node->xid.objSlot, (unsigned)(now % 1000), 
00256             (unsigned)(node->expires % 1000));
00257 
00258       discard_count++;
00259 
00260       xfree(node);
00261       node = 0;
00262    }
00263 
00264    if (discard_count > 0)
00265       dnxDebug(1, "dnxGetNodeRequest: Discarded %d expired node requests.",
00266             discard_count);
00267 
00268    if (ret != DNX_OK && ret != DNX_ERR_TIMEOUT)
00269    {
00270       dnxStatsInc(0, JOBS_REJECTED_NO_NODES);
00271       dnxDebug(2, "dnxGetNodeRequest: Unable to fulfill node request: %s.",
00272             dnxErrorString(ret));
00273    }
00274 
00275    *ppNode = node;   // return a node or NULL
00276 
00277    return ret;
00278 }
00279 
00280 //----------------------------------------------------------------------------
00281 
00282 int dnxRegistrarCreate(DnxChannel * chan, unsigned queuesz, 
00283       DnxRegistrar ** preg)
00284 {
00285    iDnxRegistrar * ireg;
00286    int ret;
00287 
00288    assert(chan && queuesz && preg);
00289 
00290    if ((ireg = (iDnxRegistrar *)xmalloc(sizeof *ireg)) == 0)
00291       return DNX_ERR_MEMORY;
00292 
00293    memset(ireg, 0, sizeof *ireg);
00294 
00295    ireg->channel = chan;
00296 
00297    if ((ret = dnxQueueCreate(queuesz, xfree, &ireg->rqueue)) != 0)
00298    {
00299       dnxLog("DNX Registrar: Queue creation failed: %s.", dnxErrorString(ret));
00300       xfree(ireg);
00301       return ret;
00302    }
00303 
00304    if ((ret = pthread_create(&ireg->tid, 0, dnxRegistrar, ireg)) != 0)
00305    {
00306       dnxLog("DNX Registrar: Thread creation failed: %s.", strerror(ret));
00307       dnxQueueDestroy(ireg->rqueue);
00308       xfree(ireg);
00309       return DNX_ERR_THREAD;
00310    }
00311 
00312    *preg = (DnxRegistrar *)ireg;
00313    return DNX_OK;
00314 }
00315 
00316 //----------------------------------------------------------------------------
00317 
00318 void dnxRegistrarDestroy(DnxRegistrar * reg)
00319 {
00320    iDnxRegistrar * ireg = (iDnxRegistrar *)reg;
00321 
00322    assert(reg && ireg->tid);
00323 
00324    pthread_cancel(ireg->tid);
00325    pthread_join(ireg->tid, 0);
00326 
00327    dnxQueueDestroy(ireg->rqueue);
00328 
00329    xfree(ireg);
00330 }
00331 
00332 /*--------------------------------------------------------------------------
00333                                  TEST MAIN
00334 
00335    From within dnx/server, compile with GNU tools using this command line:
00336 
00337       gcc -DDEBUG -DDNX_REGISTRAR_TEST -DHAVE_NANOSLEEP -g -O0 \
00338          -lpthread -o dnxRegistrarTest -I../nagios/nagios-2.7/include \
00339          -I../common dnxRegistrar.c ../common/dnxError.c \
00340          ../common/dnxSleep.c
00341 
00342    Alternatively, a heap check may be done with the following command line:
00343 
00344       gcc -DDEBUG -DDEBUG_HEAP -DDNX_REGISTRAR_TEST -DHAVE_NANOSLEEP -g -O0 \
00345          -lpthread -o dnxRegistrarTest -I../nagios/nagios-2.7/include \
00346          -I../common dnxRegistrar.c ../common/dnxError.c \
00347          ../common/dnxSleep.c ../common/dnxHeap.c
00348 
00349    Note: Leave out -DHAVE_NANOSLEEP if your system doesn't have nanosleep.
00350 
00351   --------------------------------------------------------------------------*/
00352 
00353 #ifdef DNX_REGISTRAR_TEST
00354 
00355 #include "utesthelp.h"
00356 
00357 static int verbose;
00358 static int passes = 0;
00359 static DnxNodeRequest * test_req1;
00360 static DnxNodeRequest * test_req2;
00361 
00362 // functional stubs
00363 IMPLEMENT_DNX_DEBUG(verbose);
00364 IMPLEMENT_DNX_SYSLOG(verbose);
00365 
00366 int dnxWaitForNodeRequest(DnxChannel * channel, DnxNodeRequest * pReg,
00367       char * address, int timeout)
00368 {
00369    CHECK_TRUE(channel != 0);
00370    CHECK_TRUE(pReg != 0);
00371    CHECK_TRUE(address != 0);
00372    CHECK_TRUE(timeout == DNX_REGISTRAR_REQUEST_TIMEOUT);
00373 
00374    passes++;      // bump registrar loop pass count
00375 
00376    // complex test harness -
00377    //   pass 1: add a new registration
00378    //   pass 2: update an existing registration
00379    //   pass 3: remove an existing registration
00380 
00381    memset(pReg, 0, sizeof *pReg);
00382    pReg->ttl = 10;   // seconds - won't timeout during test
00383    if (passes < 3)
00384       pReg->reqType = DNX_REQ_REGISTER;
00385    else
00386       pReg->reqType = DNX_REQ_DEREGISTER;
00387 
00388    if (passes < 4)
00389       return 0;
00390 
00391    return DNX_ERR_TIMEOUT;
00392 }
00393 
00394 DnxQueueResult dnxQueueFind(DnxQueue * queue, void ** ppPayload,
00395       DnxQueueResult (*Compare)(void * pLeft, void * pRight))
00396 {
00397    CHECK_TRUE(queue = (DnxQueue *)37);
00398    if (passes == 1)
00399       return DNX_QRES_CONTINUE;  // pass 1: return not-found
00400    *ppPayload = test_req1;
00401    return DNX_QRES_FOUND;        // pass 2: return found
00402 }
00403 
00404 int dnxQueuePut(DnxQueue * queue, void * pPayload)
00405 {
00406    CHECK_TRUE(queue = (DnxQueue *)37);
00407    CHECK_TRUE(pPayload != 0);
00408    test_req1 = (DnxNodeRequest *)pPayload;
00409    return 0;                     // pass 1: add new registration
00410 }
00411 
00412 DnxQueueResult dnxQueueRemove(DnxQueue * queue, void ** ppPayload,
00413       DnxQueueResult (*Compare)(void * pLeft, void * pRight))
00414 {
00415    CHECK_TRUE(queue = (DnxQueue *)37);
00416    CHECK_TRUE(ppPayload != 0);
00417    CHECK_TRUE(Compare == dnxCompareNodeReq);
00418    *ppPayload = test_req1;       // pass 3: remove existing registration
00419    return DNX_QRES_FOUND;
00420 }
00421 
00422 int dnxQueueGet(DnxQueue * queue, void ** ppPayload)
00423 {
00424    CHECK_TRUE(queue = (DnxQueue *)37);
00425    CHECK_TRUE(ppPayload != 0);
00426    *ppPayload = test_req1;       // pass 4+: called from dnxGetNodeRequest
00427    return 0;
00428 }
00429 
00430 int dnxQueueGetWait(DnxQueue * queue, unsigned timeout, void ** ppPayload)
00431 {
00432    CHECK_TRUE(queue = (DnxQueue *)37);
00433    CHECK_TRUE(ppPayload != 0);
00434    *ppPayload = test_req1;       // pass 4+: called from dnxGetNodeRequest
00435    return 0;
00436 }
00437 
00438 int dnxQueueCreate(unsigned maxsz, void (*pldtor)(void *), DnxQueue ** pqueue)
00439 {
00440    CHECK_TRUE(pqueue != 0);
00441    *pqueue = (DnxQueue *)37;
00442    return 0;
00443 }
00444 
00445 void dnxQueueDestroy(DnxQueue * queue)
00446 {
00447    CHECK_TRUE(queue == (DnxQueue *)37);
00448 }
00449 
00450 void dnxStatsInc(char * addr, DnxStatsIndex member) {}
00451 
00452 int main(int argc, char ** argv)
00453 {
00454    DnxRegistrar * reg;
00455    iDnxRegistrar * ireg;
00456    DnxNodeRequest * node;
00457 
00458    verbose = argc > 1 ? 1 : 0;
00459 
00460    CHECK_ZERO(dnxRegistrarCreate((DnxChannel*)17, 5, &reg));
00461 
00462    ireg = (iDnxRegistrar *)reg;
00463 
00464    CHECK_TRUE(ireg->channel == (DnxChannel*)17);
00465    CHECK_TRUE(ireg->rqueue != 0);
00466    CHECK_TRUE(ireg->tid != 0);
00467 
00468    while (passes < 4)
00469       dnxCancelableSleep(10);
00470 
00471    CHECK_ZERO(dnxGetNodeRequest(reg, &node));
00472    CHECK_TRUE(node == test_req1);
00473 
00474    dnxRegistrarDestroy(reg);
00475 
00476 #ifdef DEBUG_HEAP
00477    CHECK_ZERO(dnxCheckHeap());
00478 #endif
00479 
00480    return 0;
00481 }
00482 
00483 #endif   /* DNX_REGISTRAR_TEST */
00484 
00485 /*--------------------------------------------------------------------------*/
00486 

Generated on Tue Apr 13 15:48:07 2010 for DNX by  doxygen 1.5.6