00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00028 #include "dnxJobList.h"
00029
00030 #include "dnxError.h"
00031 #include "dnxDebug.h"
00032 #include "dnxLogging.h"
00033 #include "dnxTimer.h"
00034
00035 #include <sys/time.h>
00036
00037 #define DNX_JOBLIST_TIMEOUT 5
00038 #define DNX_TIMER_SLEEP 5000
00040 DnxJobList * joblist; // Fwd declaration
00041
00043 typedef struct iDnxJobList_
00044 {
00045 DnxNewJob * list;
00046 unsigned long size;
00047 unsigned long head;
00048 unsigned long tail;
00049 unsigned long dhead;
00050 pthread_mutex_t mut;
00051 pthread_cond_t cond;
00052 DnxTimer * timer;
00053 } iDnxJobList;
00054
00055
00056
00057
00058
00059
00060 int dnxJobListAdd(DnxJobList * pJobList, DnxNewJob * pJob)
00061 {
00062 iDnxJobList * ilist = (iDnxJobList *)pJobList;
00063 unsigned long tail;
00064 int ret = DNX_OK;
00065
00066 assert(pJobList && pJob);
00067
00068 DNX_PT_MUTEX_LOCK(&ilist->mut);
00069
00070 tail = ilist->tail;
00071
00072
00073 if (ilist->list[tail].state && (tail = (tail + 1) % ilist->size) == ilist->head)
00074 {
00075 dnxLog("dnxJobListAdd: Out of job slots (max=%lu): %s.",
00076 ilist->size, pJob->cmd);
00077 ret = DNX_ERR_CAPACITY;
00078 }
00079 else
00080 {
00081
00082
00083 pJob->xid.objSlot = tail;
00084 pJob->state = DNX_JOB_PENDING;
00085
00086
00087 memcpy(&ilist->list[tail], pJob, sizeof *pJob);
00088
00089
00090 if (ilist->list[ilist->tail].state != DNX_JOB_PENDING)
00091 ilist->dhead = tail;
00092
00093 ilist->tail = tail;
00094
00095 dnxDebug(8, "dnxJobListAdd: Job [%lu,%lu]: Head=%lu, DHead=%lu, Tail=%lu.",
00096 pJob->xid.objSerial, pJob->xid.objSlot, ilist->head, ilist->dhead,
00097 ilist->tail);
00098
00099 pthread_cond_signal(&ilist->cond);
00100 }
00101
00102 DNX_PT_MUTEX_UNLOCK(&ilist->mut);
00103
00104 return ret;
00105 }
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126 int dnxJobListExpire(DnxJobList * pJobList, DnxNewJob * pExpiredJobs,
00127 int * totalJobs)
00128 {
00129 iDnxJobList * ilist = (iDnxJobList *)pJobList;
00130 unsigned long current;
00131 DnxNewJob * pJob;
00132 int jobCount;
00133 time_t now;
00134
00135 assert(pJobList && pExpiredJobs && totalJobs && *totalJobs > 0);
00136
00137 DNX_PT_MUTEX_LOCK(&ilist->mut);
00138
00139
00140 now = time(0);
00141
00142
00143 current = ilist->head;
00144 jobCount = 0;
00145 while (jobCount < *totalJobs)
00146 {
00147
00148 if ((pJob = &ilist->list[current])->state == DNX_JOB_INPROGRESS
00149 || pJob->state == DNX_JOB_PENDING)
00150 {
00151
00152 if (pJob->expires > now)
00153 break;
00154
00155
00156 memcpy(&pExpiredJobs[jobCount], pJob, sizeof(DnxNewJob));
00157
00158 pJob->state = DNX_JOB_NULL;
00159
00160 jobCount++;
00161 }
00162
00163
00164 if (current == ilist->tail)
00165 break;
00166
00167
00168 current = (current + 1) % ilist->size;
00169 }
00170
00171 ilist->head = current;
00172
00173
00174 if (ilist->list[current].state != DNX_JOB_INPROGRESS)
00175 ilist->dhead = current;
00176
00177
00178 *totalJobs = jobCount;
00179
00180 DNX_PT_MUTEX_UNLOCK(&ilist->mut);
00181
00182 return DNX_OK;
00183 }
00184
00185
00186
00187 int dnxJobListDispatch(DnxJobList * pJobList, DnxNewJob * pJob)
00188 {
00189 iDnxJobList * ilist = (iDnxJobList *)pJobList;
00190 unsigned long current;
00191 int ret = 0;
00192
00193 assert(pJobList && pJob);
00194
00195 DNX_PT_MUTEX_LOCK(&ilist->mut);
00196
00197
00198
00203
00204 current = ilist->dhead;
00205
00206
00207 while (ilist->list[current].state != DNX_JOB_PENDING)
00208 {
00209 struct timeval now;
00210 struct timespec timeout;
00211
00212 gettimeofday(&now, 0);
00213 timeout.tv_sec = now.tv_sec + DNX_JOBLIST_TIMEOUT;
00214 timeout.tv_nsec = now.tv_usec * 1000;
00215
00216 dnxDebug(8, "dnxJobListDispatch: BEFORE: Head=%lu, DHead=%lu, Tail=%lu.",
00217 ilist->head, ilist->dhead, ilist->tail);
00218
00219 if ((ret = pthread_cond_timedwait(&ilist->cond, &ilist->mut,
00220 &timeout)) == ETIMEDOUT)
00221 break;
00222
00223 current = ilist->dhead;
00224 }
00225
00226 if (ret == 0)
00227 {
00228
00229 ilist->list[current].state = DNX_JOB_INPROGRESS;
00230
00231
00232 memcpy(pJob, &ilist->list[current], sizeof *pJob);
00233
00234
00235 if (ilist->dhead != ilist->tail)
00236 ilist->dhead = (current + 1) % ilist->size;
00237
00238 dnxDebug(8, "dnxJobListDispatch: AFTER: Job [%lu,%lu]; Head=%lu, DHead=%lu, Tail=%lu.",
00239 pJob->xid.objSerial, pJob->xid.objSlot, ilist->head, ilist->dhead, ilist->tail);
00240 }
00241
00242 DNX_PT_MUTEX_UNLOCK(&ilist->mut);
00243
00244 return ret;
00245 }
00246
00247
00248
00249 int dnxJobListCollect(DnxJobList * pJobList, DnxXID * pxid, DnxNewJob * pJob)
00250 {
00251 iDnxJobList * ilist = (iDnxJobList *)pJobList;
00252 unsigned long current;
00253 int ret = DNX_OK;
00254
00255 assert(pJobList && pxid && pJob);
00256
00257 current = pxid->objSlot;
00258
00259 assert(current < ilist->size);
00260 if (current >= ilist->size)
00261 return DNX_ERR_INVALID;
00262
00263 DNX_PT_MUTEX_LOCK(&ilist->mut);
00264
00265 dnxDebug(8,
00266 "dnxJobListCollect: Compare job [%lu,%lu] to job [%lu,%lu]: "
00267 "Head=%lu, DHead=%lu, Tail=%lu.", pxid->objSerial, pxid->objSlot,
00268 ilist->list[current].xid.objSerial, ilist->list[current].xid.objSlot,
00269 ilist->head, ilist->dhead, ilist->tail);
00270
00271
00272 if (ilist->list[current].state == DNX_JOB_NULL
00273 || !dnxEqualXIDs(pxid, &ilist->list[current].xid))
00274 ret = DNX_ERR_NOTFOUND;
00275 else
00276 {
00277
00278 memcpy(pJob, &ilist->list[current], sizeof *pJob);
00279 pJob->state = DNX_JOB_COMPLETE;
00280
00281
00282 ilist->list[current].state = DNX_JOB_NULL;
00283
00284
00285 if (current == ilist->head && current != ilist->tail)
00286 ilist->head = (current + 1) % ilist->size;
00287 }
00288
00289 DNX_PT_MUTEX_UNLOCK(&ilist->mut);
00290
00291 return ret;
00292 }
00293
00294
00295
00296 int dnxJobListCreate(unsigned size, DnxJobList ** ppJobList)
00297 {
00298 iDnxJobList * ilist;
00299 int ret;
00300
00301 assert(ppJobList && size);
00302
00303 if ((ilist = (iDnxJobList *)xmalloc(sizeof *ilist)) == 0)
00304 return DNX_ERR_MEMORY;
00305 memset(ilist, 0, sizeof *ilist);
00306
00307 if ((ilist->list = (DnxNewJob *)xmalloc(sizeof *ilist->list * size)) == 0)
00308 {
00309 xfree(ilist);
00310 return DNX_ERR_MEMORY;
00311 }
00312 memset(ilist->list, 0, sizeof *ilist->list * size);
00313
00314 ilist->size = size;
00315
00316 DNX_PT_MUTEX_INIT(&ilist->mut);
00317 pthread_cond_init(&ilist->cond, 0);
00318
00319 if ((ret = dnxTimerCreate((DnxJobList *)ilist, DNX_TIMER_SLEEP,
00320 &ilist->timer)) != 0)
00321 {
00322 DNX_PT_COND_DESTROY(&ilist->cond);
00323 DNX_PT_MUTEX_DESTROY(&ilist->mut);
00324 xfree(ilist->list);
00325 xfree(ilist);
00326 return ret;
00327 }
00328
00329 *ppJobList = (DnxJobList *)ilist;
00330
00331 return DNX_OK;
00332 }
00333
00334
00335
00336 void dnxJobListDestroy(DnxJobList * pJobList)
00337 {
00338 iDnxJobList * ilist = (iDnxJobList *)pJobList;
00339
00340 assert(pJobList);
00341
00342 dnxTimerDestroy(ilist->timer);
00343
00344 DNX_PT_COND_DESTROY(&ilist->cond);
00345 DNX_PT_MUTEX_DESTROY(&ilist->mut);
00346
00347 xfree(ilist->list);
00348 xfree(ilist);
00349 }
00350
00351
00352
00353
00354
00355
00356
00357
00358
00359
00360
00361 #ifdef DNX_JOBLIST_TEST
00362
00363 #include "utesthelp.h"
00364 #include <time.h>
00365
/* Number of elements in an array object (not meaningful for a pointer). */
#define elemcount(x) (sizeof(x) / sizeof((x)[0]))

/* Set by main to non-zero when the test program is given any argument. */
static int verbose;

/* Project-supplied macros, parameterized by the verbose flag.
 * NOTE(review): presumably these provide test-local implementations of the
 * debug and logging functions this file calls; confirm against the header
 * that defines them. */
IMPLEMENT_DNX_DEBUG(verbose);
IMPLEMENT_DNX_SYSLOG(verbose);
00373
00374 int dnxTimerCreate(DnxJobList * jl, int s, DnxTimer ** pt) { *pt = 0; return 0; }
00375 void dnxTimerDestroy(DnxTimer * t) { }
00376
00377 int dnxEqualXIDs(DnxXID * pxa, DnxXID * pxb)
00378 { return pxa->objType == pxb->objType && pxa->objSerial == pxb->objSerial
00379 && pxa->objSlot == pxb->objSlot; }
00380
00381 int dnxMakeXID(DnxXID * x, DnxObjType t, unsigned long s, unsigned long l)
00382 { x->objType = t; x->objSerial = s; x->objSlot = l; return DNX_OK; }
00383
00384 int main(int argc, char ** argv)
00385 {
00386 DnxJobList * jobs;
00387 DnxNodeRequest n1[101];
00388 DnxNewJob j1[101];
00389 DnxNewJob jtmp;
00390 DnxXID xid;
00391 int serial, xlsz, expcount;
00392 iDnxJobList * ijobs;
00393 time_t now;
00394
00395 verbose = argc > 1;
00396
00397
00398 CHECK_ZERO(dnxJobListCreate(100, &jobs));
00399 ijobs = (iDnxJobList *)jobs;
00400
00401
00402 memset(j1, 0xcc, sizeof j1);
00403 memset(n1, 0xdd, sizeof n1);
00404
00405
00406 now = time(0);
00407 for (serial = 0; serial < elemcount(j1); serial++)
00408 {
00409
00410 dnxMakeXID(&n1[serial].xid, DNX_OBJ_WORKER, serial, 0);
00411 n1[serial].reqType = DNX_REQ_REGISTER;
00412 n1[serial].jobCap = 1;
00413 n1[serial].ttl = 5;
00414 n1[serial].expires = 5;
00415 strcpy(n1[serial].address, "localhost");
00416
00417
00418 dnxMakeXID(&j1[serial].xid, DNX_OBJ_JOB, serial, 0);
00419 j1[serial].cmd = "some command line";
00420 j1[serial].start_time = now;
00421 j1[serial].timeout = serial < 50? 0: 5;
00422 j1[serial].expires = j1[serial].start_time + j1[serial].timeout;
00423 j1[serial].payload = 0;
00424 j1[serial].pNode = &n1[serial];
00425
00426 if (serial < 100)
00427 CHECK_ZERO(dnxJobListAdd(jobs, &j1[serial]));
00428 else
00429 CHECK_NONZERO(dnxJobListAdd(jobs, &j1[serial]));
00430 }
00431
00432
00433 expcount = 0;
00434 do
00435 {
00436 DnxNewJob xl[10];
00437 xlsz = (int)elemcount(xl);
00438 CHECK_ZERO(dnxJobListExpire(jobs, xl, &xlsz));
00439 expcount += xlsz;
00440 } while (xlsz != 0);
00441 CHECK_TRUE(expcount == 50);
00442
00443
00444 for (serial = 50; serial < elemcount(j1) - 2; serial++)
00445 CHECK_ZERO(dnxJobListDispatch(jobs, &jtmp));
00446
00447
00448 CHECK_TRUE(ijobs->dhead != 0);
00449 CHECK_TRUE(ijobs->list[ijobs->dhead].state != DNX_JOB_INPROGRESS);
00450 CHECK_TRUE(ijobs->list[ijobs->head].state == DNX_JOB_INPROGRESS);
00451
00452
00453 CHECK_TRUE(ijobs->dhead == 99);
00454
00455
00456 for (serial = 50; serial < elemcount(j1) - 2; serial++)
00457 {
00458 dnxMakeXID(&xid, DNX_OBJ_JOB, serial, serial);
00459 CHECK_ZERO(dnxJobListCollect(jobs, &xid, &jtmp));
00460 }
00461
00462
00463 CHECK_TRUE(ijobs->head == ijobs->tail);
00464 CHECK_TRUE(ijobs->head != 0);
00465
00466
00467 CHECK_TRUE(ijobs->head == 99);
00468 CHECK_TRUE(ijobs->tail == 99);
00469 CHECK_TRUE(ijobs->dhead == 99);
00470
00471 dnxJobListDestroy(jobs);
00472
00473 return 0;
00474 }
00475
00476 #endif
00477
00478
00479