00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00036 #include "dnxRegistrar.h"
00037
00038 #include "dnxNebMain.h"
00039 #include "dnxError.h"
00040 #include "dnxDebug.h"
00041 #include "dnxQueue.h"
00042 #include "dnxSleep.h"
00043 #include "dnxTransport.h"
00044 #include "dnxSrvProt.h"
00045 #include "dnxLogging.h"
00046 #include "dnxStats.h"
00047
00048 #include <assert.h>
00049 #include <pthread.h>
00050
00052 #define DNX_REGISTRAR_REQUEST_TIMEOUT 5
00053
00055 typedef struct iDnxRegistrar_
00056 {
00057 DnxChannel * dispchan;
00058 DnxQueue * rqueue;
00059 pthread_t tid;
00060 } iDnxRegistrar;
00061
00062
00063
00064
00065
00080 static DnxQueueResult dnxCompareNodeReq(void * pLeft, void * pRight)
00081 {
00082 DnxXID * pxl = &((DnxNodeRequest *)pLeft)->xid;
00083 DnxXID * pxr = &((DnxNodeRequest *)pRight)->xid;
00084
00085 assert(pLeft && pRight);
00086
00087 return pxl->objSerial == pxr->objSerial && pxl->objSlot == pxr->objSlot
00088 ? DNX_QRES_FOUND : DNX_QRES_CONTINUE;
00089 }
00090
00091
00092
00105 static int dnxRegisterNode(iDnxRegistrar * ireg, DnxNodeRequest ** ppMsg)
00106 {
00107 pthread_t tid = pthread_self();
00108 DnxNodeRequest * pReq;
00109 time_t now = time(0);
00110 int ret = DNX_OK;
00111
00112 assert(ireg && ppMsg && *ppMsg);
00113
00114
00115 pReq = *ppMsg;
00116 pReq->expires = now + pReq->ttl;
00117
00118 dnxStatsInc(pReq->address, REQUESTS_RECEIVED);
00119
00120
00121 if (dnxQueueFind(ireg->rqueue, (void **)&pReq, dnxCompareNodeReq) == DNX_QRES_FOUND)
00122 {
00123 pReq->expires = (*ppMsg)->expires;
00124 dnxDebug(2,
00125 "dnxRegistrar[%lx]: Updated req [%lu,%lu] at %u; expires at %u.",
00126 tid, pReq->xid.objSerial, pReq->xid.objSlot,
00127 (unsigned)(now % 1000), (unsigned)(pReq->expires % 1000));
00128 }
00129 else if ((ret = dnxQueuePut(ireg->rqueue, *ppMsg)) == DNX_OK)
00130 {
00131 *ppMsg = 0;
00132 dnxDebug(2,
00133 "dnxRegistrar[%lx]: Added req [%lu,%lu] at %u; expires at %u.",
00134 tid, pReq->xid.objSerial, pReq->xid.objSlot,
00135 (unsigned)(now % 1000), (unsigned)(pReq->expires % 1000));
00136 }
00137 else
00138 dnxLog("DNX Registrar: Unable to enqueue node request: %s.",
00139 dnxErrorString(ret));
00140
00141 return ret;
00142 }
00143
00144
00145
00156 static int dnxDeregisterNode(iDnxRegistrar * ireg, DnxNodeRequest * pMsg)
00157 {
00158 DnxNodeRequest * pReq = pMsg;
00159
00160 assert(ireg && pMsg);
00161
00162 if (dnxQueueRemove(ireg->rqueue, (void **)&pReq,
00163 dnxCompareNodeReq) == DNX_QRES_FOUND)
00164 xfree(pReq);
00165
00166 return DNX_OK;
00167 }
00168
00169
00170
00178 static void * dnxRegistrar(void * data)
00179 {
00180 iDnxRegistrar * ireg = (iDnxRegistrar *)data;
00181 DnxNodeRequest * pMsg = 0;
00182
00183 assert(data);
00184
00185 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00186 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00187
00188 dnxLog("DNX Registrar: Awaiting worker node requests...");
00189
00190 while (1)
00191 {
00192 int ret;
00193
00194
00195 if (pMsg == 0 && (pMsg = (DnxNodeRequest *)xmalloc(sizeof *pMsg)) == 0)
00196 {
00197 dnxCancelableSleep(10);
00198 continue;
00199 }
00200
00201 pthread_cleanup_push(xfree, pMsg);
00202
00203 pthread_testcancel();
00204
00205
00206 if ((ret = dnxWaitForNodeRequest(ireg->dispchan, pMsg, pMsg->address,
00207 DNX_REGISTRAR_REQUEST_TIMEOUT)) == DNX_OK)
00208 {
00209 switch (pMsg->reqType)
00210 {
00211 case DNX_REQ_REGISTER:
00212 ret = dnxRegisterNode(ireg, &pMsg);
00213 break;
00214
00215 case DNX_REQ_DEREGISTER:
00216 ret = dnxDeregisterNode(ireg, pMsg);
00217 break;
00218
00219 default:
00220 ret = DNX_ERR_UNSUPPORTED;
00221 }
00222 }
00223
00224 pthread_cleanup_pop(0);
00225
00226 if (ret != DNX_OK && ret != DNX_ERR_TIMEOUT)
00227 dnxLog("DNX Registrar: Process node request failed: %s.",
00228 dnxErrorString(ret));
00229 }
00230 return 0;
00231 }
00232
00233
00234
00235
00236
00237 int dnxGetNodeRequest(DnxRegistrar * reg, DnxNodeRequest ** ppNode)
00238 {
00239 iDnxRegistrar * ireg = (iDnxRegistrar *)reg;
00240 int ret, discard_count = 0;
00241 DnxNodeRequest * node = 0;
00242
00243 assert(reg && ppNode);
00244
00245 while ((ret = dnxQueueGet(ireg->rqueue, (void **)&node)) == DNX_OK)
00246 {
00247 time_t now = time(0);
00248
00249
00250 if (node->expires > now)
00251 break;
00252
00253
00254 dnxStatsInc(node->address, REQUESTS_EXPIRED);
00255
00256 dnxDebug(3,
00257 "dnxRegisterNode: Expired req [%lu,%lu] at %u; expired at %u.",
00258 node->xid.objSerial, node->xid.objSlot,
00259 (unsigned)(now % 1000), (unsigned)(node->expires % 1000));
00260
00261 discard_count++;
00262
00263 xfree(node);
00264 node = 0;
00265 }
00266
00267 if (discard_count > 0)
00268 dnxDebug(1, "dnxGetNodeRequest: Discarded %d expired node requests.",
00269 discard_count);
00270
00271 if (ret != DNX_OK)
00272 {
00273 dnxStatsInc(0, JOBS_REJECTED_NO_NODES);
00274 dnxDebug(2, "dnxGetNodeRequest: Unable to fulfill node request: %s.",
00275 dnxErrorString(ret));
00276 }
00277
00278 *ppNode = node;
00279
00280 return ret;
00281 }
00282
00283
00284
00285 int dnxRegistrarCreate(unsigned queuesz, DnxChannel * dispchan,
00286 DnxRegistrar ** preg)
00287 {
00288 iDnxRegistrar * ireg;
00289 int ret;
00290
00291 assert(queuesz && dispchan && preg);
00292
00293 if ((ireg = (iDnxRegistrar *)xmalloc(sizeof *ireg)) == 0)
00294 return DNX_ERR_MEMORY;
00295
00296 memset(ireg, 0, sizeof *ireg);
00297 ireg->dispchan = dispchan;
00298
00299 if ((ret = dnxQueueCreate(queuesz, xfree, &ireg->rqueue)) != 0)
00300 {
00301 dnxLog("DNX Registrar: Queue creation failed: %s.", dnxErrorString(ret));
00302 xfree(ireg);
00303 return ret;
00304 }
00305 if ((ret = pthread_create(&ireg->tid, 0, dnxRegistrar, ireg)) != 0)
00306 {
00307 dnxLog("DNX Registrar: Thread creation failed: %s.", strerror(ret));
00308 xfree(ireg);
00309 return DNX_ERR_THREAD;
00310 }
00311
00312 *preg = (DnxRegistrar *)ireg;
00313
00314 return DNX_OK;
00315 }
00316
00317
00318
00319 void dnxRegistrarDestroy(DnxRegistrar * reg)
00320 {
00321 iDnxRegistrar * ireg = (iDnxRegistrar *)reg;
00322
00323 assert(reg && ireg->tid);
00324
00325 pthread_cancel(ireg->tid);
00326 pthread_join(ireg->tid, 0);
00327
00328 dnxQueueDestroy(ireg->rqueue);
00329
00330 xfree(ireg);
00331 }
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353
00354 #ifdef DNX_REGISTRAR_TEST
00355
00356 #include "utesthelp.h"
00357
00358 static int verbose;
00359 static int passes = 0;
00360 static DnxNodeRequest * test_req1;
00361 static DnxNodeRequest * test_req2;
00362
00363
00364 IMPLEMENT_DNX_DEBUG(verbose);
00365 IMPLEMENT_DNX_SYSLOG(verbose);
00366
00367 void dnxStatsInc(char * addr, DnxStatsIndex member) {}
00368
00369 int dnxWaitForNodeRequest(DnxChannel * channel, DnxNodeRequest * pReg,
00370 char * address, int timeout)
00371 {
00372 CHECK_TRUE(channel == (DnxChannel *)17);
00373 CHECK_TRUE(pReg != 0);
00374 CHECK_TRUE(address != 0);
00375 CHECK_TRUE(timeout == DNX_REGISTRAR_REQUEST_TIMEOUT);
00376
00377 passes++;
00378
00379
00380
00381
00382
00383
00384 memset(pReg, 0, sizeof *pReg);
00385 pReg->ttl = 10;
00386 if (passes < 3)
00387 pReg->reqType = DNX_REQ_REGISTER;
00388 else
00389 pReg->reqType = DNX_REQ_DEREGISTER;
00390
00391 if (passes < 4)
00392 return 0;
00393
00394 return DNX_ERR_TIMEOUT;
00395 }
00396
00397 DnxQueueResult dnxQueueFind(DnxQueue * queue, void ** ppPayload,
00398 DnxQueueResult (*Compare)(void * pLeft, void * pRight))
00399 {
00400 CHECK_TRUE(queue = (DnxQueue *)37);
00401 if (passes == 1)
00402 return DNX_QRES_CONTINUE;
00403 *ppPayload = test_req1;
00404 return DNX_QRES_FOUND;
00405 }
00406
00407 int dnxQueuePut(DnxQueue * queue, void * pPayload)
00408 {
00409 CHECK_TRUE(queue = (DnxQueue *)37);
00410 CHECK_TRUE(pPayload != 0);
00411 test_req1 = (DnxNodeRequest *)pPayload;
00412 return 0;
00413 }
00414
00415 DnxQueueResult dnxQueueRemove(DnxQueue * queue, void ** ppPayload,
00416 DnxQueueResult (*Compare)(void * pLeft, void * pRight))
00417 {
00418 CHECK_TRUE(queue = (DnxQueue *)37);
00419 CHECK_TRUE(ppPayload != 0);
00420 CHECK_TRUE(Compare == dnxCompareNodeReq);
00421 *ppPayload = test_req1;
00422 return DNX_QRES_FOUND;
00423 }
00424
00425 int dnxQueueGet(DnxQueue * queue, void ** ppPayload)
00426 {
00427 CHECK_TRUE(queue = (DnxQueue *)37);
00428 CHECK_TRUE(ppPayload != 0);
00429 *ppPayload = test_req1;
00430 return 0;
00431 }
00432
00433 int dnxQueueCreate(unsigned maxsz, void (*pldtor)(void *), DnxQueue ** pqueue)
00434 {
00435 CHECK_TRUE(pqueue != 0);
00436 *pqueue = (DnxQueue *)37;
00437 return 0;
00438 }
00439
00440 void dnxQueueDestroy(DnxQueue * queue)
00441 {
00442 CHECK_TRUE(queue == (DnxQueue *)37);
00443 }
00444
00445 int main(int argc, char ** argv)
00446 {
00447 DnxRegistrar * reg;
00448 iDnxRegistrar * ireg;
00449 DnxNodeRequest * node;
00450
00451 verbose = argc > 1 ? 1 : 0;
00452
00453 CHECK_ZERO(dnxRegistrarCreate(5, (DnxChannel *)17, ®));
00454
00455 ireg = (iDnxRegistrar *)reg;
00456
00457 CHECK_TRUE(ireg->dispchan == (DnxChannel *)17);
00458 CHECK_TRUE(ireg->rqueue == (DnxQueue *)37);
00459 CHECK_TRUE(ireg->tid != 0);
00460
00461 while (passes < 4)
00462 dnxCancelableSleep(10);
00463
00464 CHECK_ZERO(dnxGetNodeRequest(reg, &node));
00465 CHECK_TRUE(node == test_req1);
00466
00467 dnxRegistrarDestroy(reg);
00468
00469 #ifdef DEBUG_HEAP
00470 CHECK_ZERO(dnxCheckHeap());
00471 #endif
00472
00473 return 0;
00474 }
00475
00476 #endif
00477
00478
00479