00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00036 #include "dnxRegistrar.h"
00037
00038 #include "dnxError.h"
00039 #include "dnxDebug.h"
00040 #include "dnxSleep.h"
00041 #include "dnxLogging.h"
00042 #include "dnxStats.h"
00043
00044 #include <assert.h>
00045 #include <pthread.h>
00046
00048 #define DNX_REGISTRAR_REQUEST_TIMEOUT 5
00049
00051 typedef struct iDnxRegistrar_
00052 {
00053 DnxChannel * channel;
00054 DnxQueue * rqueue;
00055 pthread_t tid;
00056 } iDnxRegistrar;
00057
00058
00059
00060
00061
00076 static DnxQueueResult dnxCompareNodeReq(void * pLeft, void * pRight)
00077 {
00078 DnxXID * pxl = &((DnxNodeRequest *)pLeft)->xid;
00079 DnxXID * pxr = &((DnxNodeRequest *)pRight)->xid;
00080
00081 assert(pLeft && pRight);
00082
00083 return pxl->objSerial == pxr->objSerial && pxl->objSlot == pxr->objSlot
00084 ? DNX_QRES_FOUND : DNX_QRES_CONTINUE;
00085 }
00086
00087
00088
00101 static int dnxRegisterNode(iDnxRegistrar * ireg, DnxNodeRequest ** ppMsg)
00102 {
00103 pthread_t tid = pthread_self();
00104 DnxNodeRequest * pReq;
00105 time_t now = time(0);
00106 int ret = DNX_OK;
00107
00108 assert(ireg && ppMsg && *ppMsg);
00109
00110
00111 pReq = *ppMsg;
00112 pReq->expires = now + pReq->ttl;
00113
00114 dnxStatsInc(pReq->address, REQUESTS_RECEIVED);
00115
00116
00117 if (dnxQueueFind(ireg->rqueue, (void **)&pReq, dnxCompareNodeReq) == DNX_QRES_FOUND)
00118 {
00119 pReq->expires = (*ppMsg)->expires;
00120 dnxDebug(2,
00121 "dnxRegistrar[%lx]: Updated req [%lu,%lu] at %u; expires at %u.",
00122 tid, pReq->xid.objSerial, pReq->xid.objSlot,
00123 (unsigned)(now % 1000), (unsigned)(pReq->expires % 1000));
00124 }
00125 else if ((ret = dnxQueuePut(ireg->rqueue, *ppMsg)) == DNX_OK)
00126 {
00127 *ppMsg = 0;
00128 dnxDebug(2,
00129 "dnxRegistrar[%lx]: Added req [%lu,%lu] at %u; expires at %u.",
00130 tid, pReq->xid.objSerial, pReq->xid.objSlot,
00131 (unsigned)(now % 1000), (unsigned)(pReq->expires % 1000));
00132 }
00133 else
00134 dnxLog("DNX Registrar: Unable to enqueue node request: %s.",
00135 dnxErrorString(ret));
00136
00137 return ret;
00138 }
00139
00140
00141
00152 static int dnxDeregisterNode(iDnxRegistrar * ireg, DnxNodeRequest * pMsg)
00153 {
00154 DnxNodeRequest * pReq = pMsg;
00155
00156 assert(ireg && pMsg);
00157
00158 if (dnxQueueRemove(ireg->rqueue, (void **)&pReq,
00159 dnxCompareNodeReq) == DNX_QRES_FOUND)
00160 xfree(pReq);
00161
00162 return DNX_OK;
00163 }
00164
00165
00166
00177 static void * dnxRegistrar(void * data)
00178 {
00179 iDnxRegistrar * ireg = (iDnxRegistrar *)data;
00180 DnxNodeRequest * pMsg = 0;
00181
00182 assert(data);
00183
00184 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00185 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00186
00187 dnxLog("DNX Registrar: Awaiting worker node requests...");
00188
00189 while (1)
00190 {
00191 int ret;
00192
00193
00194 if (pMsg == 0 && (pMsg = (DnxNodeRequest *)xmalloc(sizeof *pMsg)) == 0)
00195 {
00196 dnxCancelableSleep(10);
00197 continue;
00198 }
00199
00200 pthread_cleanup_push(xfree, pMsg);
00201
00202 pthread_testcancel();
00203
00204
00205 if ((ret = dnxWaitForNodeRequest(ireg->channel, pMsg, pMsg->address,
00206 DNX_REGISTRAR_REQUEST_TIMEOUT)) == DNX_OK)
00207 {
00208 switch (pMsg->reqType)
00209 {
00210 case DNX_REQ_REGISTER:
00211 ret = dnxRegisterNode(ireg, &pMsg);
00212 break;
00213
00214 case DNX_REQ_DEREGISTER:
00215 ret = dnxDeregisterNode(ireg, pMsg);
00216 break;
00217
00218 default:
00219 ret = DNX_ERR_UNSUPPORTED;
00220 }
00221 }
00222
00223 pthread_cleanup_pop(0);
00224
00225 if (ret != DNX_OK && ret != DNX_ERR_TIMEOUT)
00226 dnxLog("DNX Registrar: Process node request failed: %s.",
00227 dnxErrorString(ret));
00228 }
00229 return 0;
00230 }
00231
00232
00233
00234
00235
00236 int dnxGetNodeRequest(DnxRegistrar * reg, DnxNodeRequest ** ppNode)
00237 {
00238 iDnxRegistrar * ireg = (iDnxRegistrar *)reg;
00239 int ret, discard_count = 0;
00240 DnxNodeRequest * node = 0;
00241
00242 assert(reg && ppNode);
00243
00244 while ((ret = dnxQueueGet(ireg->rqueue, (void **)&node)) == DNX_OK)
00245 {
00246 time_t now = time(0);
00247
00248
00249 if (node->expires > now)
00250 break;
00251
00252 dnxStatsInc(node->address, REQUESTS_EXPIRED);
00253
00254 dnxDebug(3, "dnxRegisterNode: Expired req [%lu,%lu] at %u; expired at %u.",
00255 node->xid.objSerial, node->xid.objSlot, (unsigned)(now % 1000),
00256 (unsigned)(node->expires % 1000));
00257
00258 discard_count++;
00259
00260 xfree(node);
00261 node = 0;
00262 }
00263
00264 if (discard_count > 0)
00265 dnxDebug(1, "dnxGetNodeRequest: Discarded %d expired node requests.",
00266 discard_count);
00267
00268 if (ret != DNX_OK && ret != DNX_ERR_TIMEOUT)
00269 {
00270 dnxStatsInc(0, JOBS_REJECTED_NO_NODES);
00271 dnxDebug(2, "dnxGetNodeRequest: Unable to fulfill node request: %s.",
00272 dnxErrorString(ret));
00273 }
00274
00275 *ppNode = node;
00276
00277 return ret;
00278 }
00279
00280
00281
00282 int dnxRegistrarCreate(DnxChannel * chan, unsigned queuesz,
00283 DnxRegistrar ** preg)
00284 {
00285 iDnxRegistrar * ireg;
00286 int ret;
00287
00288 assert(chan && queuesz && preg);
00289
00290 if ((ireg = (iDnxRegistrar *)xmalloc(sizeof *ireg)) == 0)
00291 return DNX_ERR_MEMORY;
00292
00293 memset(ireg, 0, sizeof *ireg);
00294
00295 ireg->channel = chan;
00296
00297 if ((ret = dnxQueueCreate(queuesz, xfree, &ireg->rqueue)) != 0)
00298 {
00299 dnxLog("DNX Registrar: Queue creation failed: %s.", dnxErrorString(ret));
00300 xfree(ireg);
00301 return ret;
00302 }
00303
00304 if ((ret = pthread_create(&ireg->tid, 0, dnxRegistrar, ireg)) != 0)
00305 {
00306 dnxLog("DNX Registrar: Thread creation failed: %s.", strerror(ret));
00307 dnxQueueDestroy(ireg->rqueue);
00308 xfree(ireg);
00309 return DNX_ERR_THREAD;
00310 }
00311
00312 *preg = (DnxRegistrar *)ireg;
00313 return DNX_OK;
00314 }
00315
00316
00317
00318 void dnxRegistrarDestroy(DnxRegistrar * reg)
00319 {
00320 iDnxRegistrar * ireg = (iDnxRegistrar *)reg;
00321
00322 assert(reg && ireg->tid);
00323
00324 pthread_cancel(ireg->tid);
00325 pthread_join(ireg->tid, 0);
00326
00327 dnxQueueDestroy(ireg->rqueue);
00328
00329 xfree(ireg);
00330 }
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353 #ifdef DNX_REGISTRAR_TEST
00354
00355 #include "utesthelp.h"
00356
00357 static int verbose;
00358 static int passes = 0;
00359 static DnxNodeRequest * test_req1;
00360 static DnxNodeRequest * test_req2;
00361
00362
00363 IMPLEMENT_DNX_DEBUG(verbose);
00364 IMPLEMENT_DNX_SYSLOG(verbose);
00365
00366 int dnxWaitForNodeRequest(DnxChannel * channel, DnxNodeRequest * pReg,
00367 char * address, int timeout)
00368 {
00369 CHECK_TRUE(channel != 0);
00370 CHECK_TRUE(pReg != 0);
00371 CHECK_TRUE(address != 0);
00372 CHECK_TRUE(timeout == DNX_REGISTRAR_REQUEST_TIMEOUT);
00373
00374 passes++;
00375
00376
00377
00378
00379
00380
00381 memset(pReg, 0, sizeof *pReg);
00382 pReg->ttl = 10;
00383 if (passes < 3)
00384 pReg->reqType = DNX_REQ_REGISTER;
00385 else
00386 pReg->reqType = DNX_REQ_DEREGISTER;
00387
00388 if (passes < 4)
00389 return 0;
00390
00391 return DNX_ERR_TIMEOUT;
00392 }
00393
00394 DnxQueueResult dnxQueueFind(DnxQueue * queue, void ** ppPayload,
00395 DnxQueueResult (*Compare)(void * pLeft, void * pRight))
00396 {
00397 CHECK_TRUE(queue = (DnxQueue *)37);
00398 if (passes == 1)
00399 return DNX_QRES_CONTINUE;
00400 *ppPayload = test_req1;
00401 return DNX_QRES_FOUND;
00402 }
00403
00404 int dnxQueuePut(DnxQueue * queue, void * pPayload)
00405 {
00406 CHECK_TRUE(queue = (DnxQueue *)37);
00407 CHECK_TRUE(pPayload != 0);
00408 test_req1 = (DnxNodeRequest *)pPayload;
00409 return 0;
00410 }
00411
00412 DnxQueueResult dnxQueueRemove(DnxQueue * queue, void ** ppPayload,
00413 DnxQueueResult (*Compare)(void * pLeft, void * pRight))
00414 {
00415 CHECK_TRUE(queue = (DnxQueue *)37);
00416 CHECK_TRUE(ppPayload != 0);
00417 CHECK_TRUE(Compare == dnxCompareNodeReq);
00418 *ppPayload = test_req1;
00419 return DNX_QRES_FOUND;
00420 }
00421
00422 int dnxQueueGet(DnxQueue * queue, void ** ppPayload)
00423 {
00424 CHECK_TRUE(queue = (DnxQueue *)37);
00425 CHECK_TRUE(ppPayload != 0);
00426 *ppPayload = test_req1;
00427 return 0;
00428 }
00429
00430 int dnxQueueGetWait(DnxQueue * queue, unsigned timeout, void ** ppPayload)
00431 {
00432 CHECK_TRUE(queue = (DnxQueue *)37);
00433 CHECK_TRUE(ppPayload != 0);
00434 *ppPayload = test_req1;
00435 return 0;
00436 }
00437
00438 int dnxQueueCreate(unsigned maxsz, void (*pldtor)(void *), DnxQueue ** pqueue)
00439 {
00440 CHECK_TRUE(pqueue != 0);
00441 *pqueue = (DnxQueue *)37;
00442 return 0;
00443 }
00444
00445 void dnxQueueDestroy(DnxQueue * queue)
00446 {
00447 CHECK_TRUE(queue == (DnxQueue *)37);
00448 }
00449
00450 void dnxStatsInc(char * addr, DnxStatsIndex member) {}
00451
00452 int main(int argc, char ** argv)
00453 {
00454 DnxRegistrar * reg;
00455 iDnxRegistrar * ireg;
00456 DnxNodeRequest * node;
00457
00458 verbose = argc > 1 ? 1 : 0;
00459
00460 CHECK_ZERO(dnxRegistrarCreate((DnxChannel*)17, 5, ®));
00461
00462 ireg = (iDnxRegistrar *)reg;
00463
00464 CHECK_TRUE(ireg->channel == (DnxChannel*)17);
00465 CHECK_TRUE(ireg->rqueue != 0);
00466 CHECK_TRUE(ireg->tid != 0);
00467
00468 while (passes < 4)
00469 dnxCancelableSleep(10);
00470
00471 CHECK_ZERO(dnxGetNodeRequest(reg, &node));
00472 CHECK_TRUE(node == test_req1);
00473
00474 dnxRegistrarDestroy(reg);
00475
00476 #ifdef DEBUG_HEAP
00477 CHECK_ZERO(dnxCheckHeap());
00478 #endif
00479
00480 return 0;
00481 }
00482
00483 #endif
00484
00485
00486