00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00028 #include "dnxJobList.h"
00029
00030 #include "dnxError.h"
00031 #include "dnxDebug.h"
00032 #include "dnxLogging.h"
00033 #include "dnxTimer.h"
00034
00035 #include <sys/time.h>
00036
00037 #define DNX_JOBLIST_TIMEOUT 5
00038 #define DNX_TIMER_SLEEP 5000
00040 DnxJobList * joblist; // Fwd declaration
00041
00043 typedef struct iDnxJobList_
00044 {
00045 DnxNewJob * list;
00046 unsigned long size;
00047 unsigned long head;
00048 unsigned long tail;
00049 unsigned long dhead;
00050 pthread_mutex_t mut;
00051 pthread_cond_t cond;
00052 DnxTimer * timer;
00053 } iDnxJobList;
00054
00055
00056
00057
00058
00059 int dnxJobListAdd(DnxJobList * pJobList, DnxNewJob * pJob)
00060 {
00061 iDnxJobList * ilist = (iDnxJobList *)pJobList;
00062 unsigned long tail;
00063 int ret = DNX_OK;
00064
00065 assert(pJobList && pJob);
00066
00067 DNX_PT_MUTEX_LOCK(&ilist->mut);
00068
00069 tail = ilist->tail;
00070
00071
00072 if (ilist->list[tail].state && (tail = (tail + 1) % ilist->size) == ilist->head)
00073 {
00074 dnxLog("dnxJobListAdd: Out of job slots (max=%lu): %s.",
00075 ilist->size, pJob->cmd);
00076 ret = DNX_ERR_CAPACITY;
00077 }
00078 else
00079 {
00080
00081
00082 pJob->xid.objSlot = tail;
00083 pJob->state = DNX_JOB_PENDING;
00084
00085
00086 memcpy(&ilist->list[tail], pJob, sizeof *pJob);
00087
00088
00089 if (ilist->list[ilist->tail].state != DNX_JOB_PENDING)
00090 ilist->dhead = tail;
00091
00092 ilist->tail = tail;
00093
00094 dnxDebug(8, "dnxJobListAdd: Job [%lu,%lu]: Head=%lu, DHead=%lu, Tail=%lu.",
00095 pJob->xid.objSerial, pJob->xid.objSlot, ilist->head, ilist->dhead,
00096 ilist->tail);
00097
00098 pthread_cond_signal(&ilist->cond);
00099 }
00100
00101 DNX_PT_MUTEX_UNLOCK(&ilist->mut);
00102
00103 return ret;
00104 }
00105
00106
00107
00108 int dnxJobListExpire(DnxJobList * pJobList, DnxNewJob * pExpiredJobs,
00109 int * totalJobs)
00110 {
00111 iDnxJobList * ilist = (iDnxJobList *)pJobList;
00112 unsigned long current;
00113 DnxNewJob * pJob;
00114 int jobCount;
00115 time_t now;
00116
00117 assert(pJobList && pExpiredJobs && totalJobs && *totalJobs > 0);
00118
00119 DNX_PT_MUTEX_LOCK(&ilist->mut);
00120
00121
00122 now = time(0);
00123
00124
00125 current = ilist->head;
00126 jobCount = 0;
00127 while (jobCount < *totalJobs)
00128 {
00129
00130 if ((pJob = &ilist->list[current])->state == DNX_JOB_INPROGRESS
00131 || pJob->state == DNX_JOB_PENDING)
00132 {
00133
00134 if (pJob->expires > now)
00135 break;
00136
00137
00138 memcpy(&pExpiredJobs[jobCount], pJob, sizeof(DnxNewJob));
00139
00140 pJob->state = DNX_JOB_NULL;
00141
00142 jobCount++;
00143 }
00144
00145
00146 if (current == ilist->tail)
00147 break;
00148
00149
00150 current = (current + 1) % ilist->size;
00151 }
00152
00153 ilist->head = current;
00154
00155
00156 if (ilist->list[current].state != DNX_JOB_INPROGRESS)
00157 ilist->dhead = current;
00158
00159
00160 *totalJobs = jobCount;
00161
00162 DNX_PT_MUTEX_UNLOCK(&ilist->mut);
00163
00164 return DNX_OK;
00165 }
00166
00167
00168
00169 int dnxJobListDispatch(DnxJobList * pJobList, DnxNewJob * pJob)
00170 {
00171 iDnxJobList * ilist = (iDnxJobList *)pJobList;
00172 unsigned long current;
00173 int ret = 0;
00174
00175 assert(pJobList && pJob);
00176
00177 DNX_PT_MUTEX_LOCK(&ilist->mut);
00178
00183
00184 current = ilist->dhead;
00185
00186
00187 while (ilist->list[current].state != DNX_JOB_PENDING)
00188 {
00189 struct timeval now;
00190 struct timespec timeout;
00191
00192 gettimeofday(&now, 0);
00193 timeout.tv_sec = now.tv_sec + DNX_JOBLIST_TIMEOUT;
00194 timeout.tv_nsec = now.tv_usec * 1000;
00195
00196 dnxDebug(8, "dnxJobListDispatch: BEFORE: Head=%lu, DHead=%lu, Tail=%lu.",
00197 ilist->head, ilist->dhead, ilist->tail);
00198
00199
00200 if ((ret = pthread_cond_timedwait(&ilist->cond, &ilist->mut,
00201 &timeout)) == ETIMEDOUT)
00202 break;
00203
00204 current = ilist->dhead;
00205 }
00206
00207 if (ret == 0)
00208 {
00209
00210 ilist->list[current].state = DNX_JOB_INPROGRESS;
00211
00212
00213 memcpy(pJob, &ilist->list[current], sizeof *pJob);
00214
00215
00216 if (ilist->dhead != ilist->tail)
00217 ilist->dhead = (current + 1) % ilist->size;
00218
00219 dnxDebug(8, "dnxJobListDispatch: AFTER: Job [%lu,%lu]; Head=%lu, DHead=%lu, Tail=%lu.",
00220 pJob->xid.objSerial, pJob->xid.objSlot, ilist->head, ilist->dhead, ilist->tail);
00221 }
00222
00223 DNX_PT_MUTEX_UNLOCK(&ilist->mut);
00224
00225 return ret;
00226 }
00227
00228
00229
00230 int dnxJobListCollect(DnxJobList * pJobList, DnxXID * pxid, DnxNewJob * pJob)
00231 {
00232 iDnxJobList * ilist = (iDnxJobList *)pJobList;
00233 unsigned long current;
00234 int ret = DNX_OK;
00235
00236 assert(pJobList && pxid && pJob);
00237
00238 current = pxid->objSlot;
00239
00240 assert(current < ilist->size);
00241 if (current >= ilist->size)
00242 return DNX_ERR_INVALID;
00243
00244 DNX_PT_MUTEX_LOCK(&ilist->mut);
00245
00246 dnxDebug(8, "dnxJobListCollect: Compare job [%lu,%lu] to job [%lu,%lu]: "
00247 "Head=%lu, DHead=%lu, Tail=%lu.", pxid->objSerial, pxid->objSlot,
00248 ilist->list[current].xid.objSerial, ilist->list[current].xid.objSlot,
00249 ilist->head, ilist->dhead, ilist->tail);
00250
00251
00252 if (ilist->list[current].state == DNX_JOB_NULL
00253 || !dnxEqualXIDs(pxid, &ilist->list[current].xid))
00254 ret = DNX_ERR_NOTFOUND;
00255 else
00256 {
00257
00258 memcpy(pJob, &ilist->list[current], sizeof *pJob);
00259 pJob->state = DNX_JOB_COMPLETE;
00260
00261
00262 ilist->list[current].state = DNX_JOB_NULL;
00263
00264
00265 if (current == ilist->head && current != ilist->tail)
00266 ilist->head = (current + 1) % ilist->size;
00267 }
00268
00269 DNX_PT_MUTEX_UNLOCK(&ilist->mut);
00270
00271 return ret;
00272 }
00273
00274
00275
00276 int dnxJobListCreate(unsigned size, DnxJobList ** ppJobList)
00277 {
00278 iDnxJobList * ilist;
00279 int ret;
00280
00281 assert(ppJobList && size);
00282
00283 if ((ilist = (iDnxJobList *)xmalloc(sizeof *ilist)) == 0)
00284 return DNX_ERR_MEMORY;
00285 memset(ilist, 0, sizeof *ilist);
00286
00287 if ((ilist->list = (DnxNewJob *)xmalloc(sizeof *ilist->list * size)) == 0)
00288 {
00289 xfree(ilist);
00290 return DNX_ERR_MEMORY;
00291 }
00292 memset(ilist->list, 0, sizeof *ilist->list * size);
00293
00294 ilist->size = size;
00295
00296 DNX_PT_MUTEX_INIT(&ilist->mut);
00297 pthread_cond_init(&ilist->cond, 0);
00298
00299 if ((ret = dnxTimerCreate((DnxJobList *)ilist, DNX_TIMER_SLEEP,
00300 &ilist->timer)) != 0)
00301 {
00302 DNX_PT_COND_DESTROY(&ilist->cond);
00303 DNX_PT_MUTEX_DESTROY(&ilist->mut);
00304 xfree(ilist->list);
00305 xfree(ilist);
00306 return ret;
00307 }
00308
00309 *ppJobList = (DnxJobList *)ilist;
00310
00311 return DNX_OK;
00312 }
00313
00314
00315
00316 void dnxJobListDestroy(DnxJobList * pJobList)
00317 {
00318 iDnxJobList * ilist = (iDnxJobList *)pJobList;
00319
00320 assert(pJobList);
00321
00322 dnxTimerDestroy(ilist->timer);
00323
00324 DNX_PT_COND_DESTROY(&ilist->cond);
00325 DNX_PT_MUTEX_DESTROY(&ilist->mut);
00326
00327 xfree(ilist->list);
00328 xfree(ilist);
00329 }
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341 #ifdef DNX_JOBLIST_TEST
00342
00343 #include "utesthelp.h"
00344 #include <time.h>
00345
/** Number of elements in an array object (not valid for pointers). */
#define elemcount(x) (sizeof(x) / sizeof((x)[0]))

/** Set by main to nonzero when the test is run with at least one argument. */
static int verbose;

// Test-helper macros, each given the verbose flag.
// NOTE(review): presumably these expand to the debug/log functions the job
// list code calls - confirm against utesthelp.h.
IMPLEMENT_DNX_DEBUG(verbose);
IMPLEMENT_DNX_SYSLOG(verbose);
00353
00354 int dnxTimerCreate(DnxJobList * jl, int s, DnxTimer ** pt) { *pt = 0; return 0; }
00355 void dnxTimerDestroy(DnxTimer * t) { }
00356
00357 int dnxEqualXIDs(DnxXID * pxa, DnxXID * pxb)
00358 { return pxa->objType == pxb->objType && pxa->objSerial == pxb->objSerial
00359 && pxa->objSlot == pxb->objSlot; }
00360
00361 int dnxMakeXID(DnxXID * x, DnxObjType t, unsigned long s, unsigned long l)
00362 { x->objType = t; x->objSerial = s; x->objSlot = l; return DNX_OK; }
00363
00364 int main(int argc, char ** argv)
00365 {
00366 DnxJobList * jobs;
00367 DnxNodeRequest n1[101];
00368 DnxNewJob j1[101];
00369 DnxNewJob jtmp;
00370 DnxXID xid;
00371 int serial, xlsz, expcount;
00372 iDnxJobList * ijobs;
00373 time_t now;
00374
00375 verbose = argc > 1;
00376
00377
00378 CHECK_ZERO(dnxJobListCreate(100, &jobs));
00379 ijobs = (iDnxJobList *)jobs;
00380
00381
00382 memset(j1, 0xcc, sizeof j1);
00383 memset(n1, 0xdd, sizeof n1);
00384
00385
00386 now = time(0);
00387 for (serial = 0; serial < elemcount(j1); serial++)
00388 {
00389
00390 dnxMakeXID(&n1[serial].xid, DNX_OBJ_WORKER, serial, 0);
00391 n1[serial].reqType = DNX_REQ_REGISTER;
00392 n1[serial].jobCap = 1;
00393 n1[serial].ttl = 5;
00394 n1[serial].expires = 5;
00395 strcpy(n1[serial].address, "localhost");
00396
00397
00398 dnxMakeXID(&j1[serial].xid, DNX_OBJ_JOB, serial, 0);
00399 j1[serial].cmd = "some command line";
00400 j1[serial].start_time = now;
00401 j1[serial].timeout = serial < 50? 0: 5;
00402 j1[serial].expires = j1[serial].start_time + j1[serial].timeout;
00403 j1[serial].payload = 0;
00404 j1[serial].pNode = &n1[serial];
00405
00406 if (serial < 100)
00407 CHECK_ZERO(dnxJobListAdd(jobs, &j1[serial]));
00408 else
00409 CHECK_NONZERO(dnxJobListAdd(jobs, &j1[serial]));
00410 }
00411
00412
00413 expcount = 0;
00414 do
00415 {
00416 DnxNewJob xl[10];
00417 xlsz = (int)elemcount(xl);
00418 CHECK_ZERO(dnxJobListExpire(jobs, xl, &xlsz));
00419 expcount += xlsz;
00420 } while (xlsz != 0);
00421 CHECK_TRUE(expcount == 50);
00422
00423
00424 for (serial = 50; serial < elemcount(j1) - 2; serial++)
00425 CHECK_ZERO(dnxJobListDispatch(jobs, &jtmp));
00426
00427
00428 CHECK_TRUE(ijobs->dhead != 0);
00429 CHECK_TRUE(ijobs->list[ijobs->dhead].state != DNX_JOB_INPROGRESS);
00430 CHECK_TRUE(ijobs->list[ijobs->head].state == DNX_JOB_INPROGRESS);
00431
00432
00433 CHECK_TRUE(ijobs->dhead == 99);
00434
00435
00436 for (serial = 50; serial < elemcount(j1) - 2; serial++)
00437 {
00438 dnxMakeXID(&xid, DNX_OBJ_JOB, serial, serial);
00439 CHECK_ZERO(dnxJobListCollect(jobs, &xid, &jtmp));
00440 }
00441
00442
00443 CHECK_TRUE(ijobs->head == ijobs->tail);
00444 CHECK_TRUE(ijobs->head != 0);
00445
00446
00447 CHECK_TRUE(ijobs->head == 99);
00448 CHECK_TRUE(ijobs->tail == 99);
00449 CHECK_TRUE(ijobs->dhead == 99);
00450
00451 dnxJobListDestroy(jobs);
00452
00453 return 0;
00454 }
00455
00456 #endif
00457
00458
00459