dnxJobList.c

Go to the documentation of this file.
00001 /*--------------------------------------------------------------------------
00002 
00003    Copyright (c) 2006-2007, Intellectual Reserve, Inc. All rights reserved.
00004 
00005    This program is free software; you can redistribute it and/or modify
00006    it under the terms of the GNU General Public License version 2 as
00007    published by the Free Software Foundation.
00008 
00009    This program is distributed in the hope that it will be useful,
00010    but WITHOUT ANY WARRANTY; without even the implied warranty of
00011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012    GNU General Public License for more details.
00013 
00014    You should have received a copy of the GNU General Public License
00015    along with this program; if not, write to the Free Software
00016    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 
00018   --------------------------------------------------------------------------*/
00019 
00028 #include "dnxJobList.h"
00029 
00030 #include "dnxError.h"
00031 #include "dnxDebug.h"
00032 #include "dnxLogging.h"
00033 #include "dnxTimer.h"
00034 
00035 #include <sys/time.h>
00036 
00037 #define DNX_JOBLIST_TIMEOUT   5     
00038 #define DNX_TIMER_SLEEP       5000  
00040 DnxJobList * joblist; // Fwd declaration
00041 
00043 typedef struct iDnxJobList_
00044 {
00045    DnxNewJob * list;       
00046    unsigned long size;     
00047    unsigned long head;     
00048    unsigned long tail;     
00049    unsigned long dhead;    
00050    pthread_mutex_t mut;    
00051    pthread_cond_t cond;    
00052    DnxTimer * timer;       
00053 } iDnxJobList;
00054 
00055 
00056 /*--------------------------------------------------------------------------
00057                                  INTERFACE
00058   --------------------------------------------------------------------------*/
00059 
00060 int dnxJobListAdd(DnxJobList * pJobList, DnxNewJob * pJob)
00061 {
00062    iDnxJobList * ilist = (iDnxJobList *)pJobList;
00063    unsigned long tail;
00064    int ret = DNX_OK;
00065 
00066    assert(pJobList && pJob);
00067 
00068    DNX_PT_MUTEX_LOCK(&ilist->mut);
00069 
00070    tail = ilist->tail;
00071 
00072    // verify space in the job list
00073    if (ilist->list[tail].state && (tail = (tail + 1) % ilist->size) == ilist->head)
00074    {
00075       dnxLog("dnxJobListAdd: Out of job slots (max=%lu): %s.",
00076             ilist->size, pJob->cmd);
00077       ret = DNX_ERR_CAPACITY;
00078    }
00079    else
00080    {
00081       // add the slot index to the Job's XID - this allows us to index
00082       //    the job list using the returned result's XID.objSlot field
00083       pJob->xid.objSlot = tail;
00084       pJob->state = DNX_JOB_PENDING;
00085 
00086       // add this job to the job list
00087       memcpy(&ilist->list[tail], pJob, sizeof *pJob);
00088 
00089       // update dispatch head index
00090       if (ilist->list[ilist->tail].state != DNX_JOB_PENDING)
00091          ilist->dhead = tail;
00092 
00093       ilist->tail = tail;
00094 
00095       dnxDebug(8, "dnxJobListAdd: Job [%lu,%lu]: Head=%lu, DHead=%lu, Tail=%lu.",
00096             pJob->xid.objSerial, pJob->xid.objSlot, ilist->head, ilist->dhead,
00097             ilist->tail);
00098 
00099       pthread_cond_signal(&ilist->cond);  // signal that a new job is available
00100    }
00101 
00102    DNX_PT_MUTEX_UNLOCK(&ilist->mut);
00103 
00104    return ret;
00105 }
00106 /*
00107 int dnxJobListMarkAck(DnxXID * pXid)
00108 {
00109     int current = 0;
00110     iDnxJobList * ilist = (iDnxJobList *)joblist;
00111     DNX_PT_MUTEX_LOCK(&ilist->mut);
00112     while(current++ <= ilist->size)
00113     {
00114         if (dnxEqualXIDs(pXid, &ilist->list[current].xid))
00115         {
00116             ilist->list[current].ack = true;
00117             break;
00118         }
00119     }
00120     DNX_PT_MUTEX_UNLOCK(&ilist->mut);
00121     return DNX_OK;
00122 }
00123 */
00124 //----------------------------------------------------------------------------
00125 
00126 int dnxJobListExpire(DnxJobList * pJobList, DnxNewJob * pExpiredJobs,
00127       int * totalJobs)
00128 {
00129    iDnxJobList * ilist = (iDnxJobList *)pJobList;
00130    unsigned long current;
00131    DnxNewJob * pJob;
00132    int jobCount;
00133    time_t now;
00134 
00135    assert(pJobList && pExpiredJobs && totalJobs && *totalJobs > 0);
00136 
00137    DNX_PT_MUTEX_LOCK(&ilist->mut);
00138 
00139    // get the current time (after we acquire the lock! In case we had to wait)
00140    now = time(0);
00141 
00142    // walk the entire job list - InProgress and Pending jobs (in that order)
00143    current = ilist->head;
00144    jobCount = 0;
00145    while (jobCount < *totalJobs)
00146    {
00147       // only examine jobs that are either awaiting dispatch or results
00148       if ((pJob = &ilist->list[current])->state == DNX_JOB_INPROGRESS
00149             || pJob->state == DNX_JOB_PENDING)
00150       {
00151          // check the job's expiration stamp
00152          if (pJob->expires > now)
00153             break;   // Bail-out: this job hasn't expired yet
00154 
00155          // job has expired - add it to the expired job list
00156          memcpy(&pExpiredJobs[jobCount], pJob, sizeof(DnxNewJob));
00157 
00158          pJob->state = DNX_JOB_NULL;
00159 
00160          jobCount++;
00161       }
00162 
00163       // bail-out if this was the job list tail
00164       if (current == ilist->tail)
00165          break;
00166 
00167       // increment the job list index
00168       current = (current + 1) % ilist->size;
00169    }
00170 
00171    ilist->head = current;
00172 
00173    // if this job is awaiting dispatch, then it is the new dispatch head
00174    if (ilist->list[current].state != DNX_JOB_INPROGRESS)
00175       ilist->dhead = current;
00176 
00177    // update the total jobs in the expired job list
00178    *totalJobs = jobCount;
00179 
00180    DNX_PT_MUTEX_UNLOCK(&ilist->mut);
00181 
00182    return DNX_OK;
00183 }
00184 
00185 //----------------------------------------------------------------------------
00186 
00187 int dnxJobListDispatch(DnxJobList * pJobList, DnxNewJob * pJob)
00188 {
00189    iDnxJobList * ilist = (iDnxJobList *)pJobList;
00190    unsigned long current;
00191    int ret = 0;
00192 
00193    assert(pJobList && pJob);
00194 
00195    DNX_PT_MUTEX_LOCK(&ilist->mut);
00196 
00197    // wait on the condition variable if there are no pending jobs
00198 
00203    // start at current dispatch head
00204    current = ilist->dhead;
00205 
00206    // see if we have a pending job
00207    while (ilist->list[current].state != DNX_JOB_PENDING)
00208    {
00209       struct timeval now;
00210       struct timespec timeout;
00211 
00212       gettimeofday(&now, 0);
00213       timeout.tv_sec = now.tv_sec + DNX_JOBLIST_TIMEOUT;
00214       timeout.tv_nsec = now.tv_usec * 1000;
00215 
00216       dnxDebug(8, "dnxJobListDispatch: BEFORE: Head=%lu, DHead=%lu, Tail=%lu.",
00217             ilist->head, ilist->dhead, ilist->tail);
00218 
00219       if ((ret = pthread_cond_timedwait(&ilist->cond, &ilist->mut,
00220             &timeout)) == ETIMEDOUT)
00221          break;
00222 
00223       current = ilist->dhead;
00224    }
00225 
00226    if (ret == 0)
00227    {
00228       // transition this job's state to InProgress
00229       ilist->list[current].state = DNX_JOB_INPROGRESS;
00230 
00231       // make a copy for the Dispatcher
00232       memcpy(pJob, &ilist->list[current], sizeof *pJob);
00233 
00234       // update the dispatch head
00235       if (ilist->dhead != ilist->tail)
00236          ilist->dhead = (current + 1) % ilist->size;
00237 
00238       dnxDebug(8, "dnxJobListDispatch: AFTER: Job [%lu,%lu]; Head=%lu, DHead=%lu, Tail=%lu.",
00239             pJob->xid.objSerial, pJob->xid.objSlot, ilist->head, ilist->dhead, ilist->tail);
00240    }
00241 
00242    DNX_PT_MUTEX_UNLOCK(&ilist->mut);
00243 
00244    return ret;
00245 }
00246 
00247 //----------------------------------------------------------------------------
00248 
00249 int dnxJobListCollect(DnxJobList * pJobList, DnxXID * pxid, DnxNewJob * pJob)
00250 {
00251    iDnxJobList * ilist = (iDnxJobList *)pJobList;
00252    unsigned long current;
00253    int ret = DNX_OK;
00254 
00255    assert(pJobList && pxid && pJob);   // parameter validation
00256 
00257    current = pxid->objSlot;
00258 
00259    assert(current < ilist->size);
00260    if (current >= ilist->size)         // runtime validation requires check
00261       return DNX_ERR_INVALID;          // corrupt client network message
00262 
00263    DNX_PT_MUTEX_LOCK(&ilist->mut);
00264 
00265    dnxDebug(8,
00266          "dnxJobListCollect: Compare job [%lu,%lu] to job [%lu,%lu]: "
00267          "Head=%lu, DHead=%lu, Tail=%lu.", pxid->objSerial, pxid->objSlot,
00268          ilist->list[current].xid.objSerial, ilist->list[current].xid.objSlot,
00269          ilist->head, ilist->dhead, ilist->tail);
00270 
00271    // verify that the XID of this result matches the XID of the service check
00272    if (ilist->list[current].state == DNX_JOB_NULL
00273          || !dnxEqualXIDs(pxid, &ilist->list[current].xid))
00274       ret = DNX_ERR_NOTFOUND;          // job expired; removed by the timer
00275    else
00276    {
00277       // make a copy for the Collector
00278       memcpy(pJob, &ilist->list[current], sizeof *pJob);
00279       pJob->state = DNX_JOB_COMPLETE;
00280 
00281       // dequeue this job; make slot available for another job
00282       ilist->list[current].state = DNX_JOB_NULL;
00283 
00284       // update the job list head
00285       if (current == ilist->head && current != ilist->tail)
00286          ilist->head = (current + 1) % ilist->size;
00287    }
00288 
00289    DNX_PT_MUTEX_UNLOCK(&ilist->mut);
00290 
00291    return ret;
00292 }
00293 
00294 //----------------------------------------------------------------------------
00295 
00296 int dnxJobListCreate(unsigned size, DnxJobList ** ppJobList)
00297 {
00298    iDnxJobList * ilist;
00299    int ret;
00300 
00301    assert(ppJobList && size);
00302 
00303    if ((ilist = (iDnxJobList *)xmalloc(sizeof *ilist)) == 0)
00304       return DNX_ERR_MEMORY;
00305    memset(ilist, 0, sizeof *ilist);
00306 
00307    if ((ilist->list = (DnxNewJob *)xmalloc(sizeof *ilist->list * size)) == 0)
00308    {
00309       xfree(ilist);
00310       return DNX_ERR_MEMORY;
00311    }
00312    memset(ilist->list, 0, sizeof *ilist->list * size);
00313 
00314    ilist->size = size;
00315 
00316    DNX_PT_MUTEX_INIT(&ilist->mut);
00317    pthread_cond_init(&ilist->cond, 0);
00318 
00319    if ((ret = dnxTimerCreate((DnxJobList *)ilist, DNX_TIMER_SLEEP,
00320          &ilist->timer)) != 0)
00321    {
00322       DNX_PT_COND_DESTROY(&ilist->cond);
00323       DNX_PT_MUTEX_DESTROY(&ilist->mut);
00324       xfree(ilist->list);
00325       xfree(ilist);
00326       return ret;
00327    }
00328 
00329    *ppJobList = (DnxJobList *)ilist;
00330 
00331    return DNX_OK;
00332 }
00333 
00334 //----------------------------------------------------------------------------
00335 
00336 void dnxJobListDestroy(DnxJobList * pJobList)
00337 {
00338    iDnxJobList * ilist = (iDnxJobList *)pJobList;
00339 
00340    assert(pJobList);
00341 
00342    dnxTimerDestroy(ilist->timer);
00343 
00344    DNX_PT_COND_DESTROY(&ilist->cond);
00345    DNX_PT_MUTEX_DESTROY(&ilist->mut);
00346 
00347    xfree(ilist->list);
00348    xfree(ilist);
00349 }
00350 
00351 /*--------------------------------------------------------------------------
00352                                  UNIT TEST
00353 
00354    From within dnx/server, compile with GNU tools using this command line:
00355 
00356       gcc -DDEBUG -DDNX_JOBLIST_TEST -g -O0 -I../common dnxJobList.c \
00357          ../common/dnxError.c -lpthread -lgcc_s -lrt -o dnxJobListTest
00358 
00359   --------------------------------------------------------------------------*/
00360 
00361 #ifdef DNX_JOBLIST_TEST
00362 
00363 #include "utesthelp.h"
00364 #include <time.h>
00365 
// number of elements in an array (only meaningful for true arrays)
#define elemcount(x) (sizeof(x) / sizeof((x)[0]))

// non-zero for verbose test output; main sets it when arguments are given
static int verbose;

// functional stubs
//    NOTE(review): these macros presumably come from utesthelp.h and
//    supply test versions of the debug and logging routines - confirm.
IMPLEMENT_DNX_DEBUG(verbose);
IMPLEMENT_DNX_SYSLOG(verbose);
00373 
00374 int dnxTimerCreate(DnxJobList * jl, int s, DnxTimer ** pt) { *pt = 0; return 0; }
00375 void dnxTimerDestroy(DnxTimer * t) { }
00376 
00377 int dnxEqualXIDs(DnxXID * pxa, DnxXID * pxb)
00378       { return pxa->objType == pxb->objType && pxa->objSerial == pxb->objSerial
00379             && pxa->objSlot == pxb->objSlot; }
00380 
00381 int dnxMakeXID(DnxXID * x, DnxObjType t, unsigned long s, unsigned long l)
00382       { x->objType = t; x->objSerial = s; x->objSlot = l; return DNX_OK; }
00383 
00384 int main(int argc, char ** argv)
00385 {
00386    DnxJobList * jobs;
00387    DnxNodeRequest n1[101];
00388    DnxNewJob j1[101];
00389    DnxNewJob jtmp;
00390    DnxXID xid;
00391    int serial, xlsz, expcount;
00392    iDnxJobList * ijobs;
00393    time_t now;
00394 
00395    verbose = argc > 1;
00396 
00397    // create a new job list and get a concrete reference to it for testing
00398    CHECK_ZERO(dnxJobListCreate(100, &jobs));
00399    ijobs = (iDnxJobList *)jobs;
00400 
00401    // force entire array to non-zero values for testing
00402    memset(j1, 0xcc, sizeof j1);
00403    memset(n1, 0xdd, sizeof n1);
00404 
00405    // test that we CAN add 100 jobs to the 100-job list
00406    now = time(0);
00407    for (serial = 0; serial < elemcount(j1); serial++)
00408    {
00409       // configure request node
00410       dnxMakeXID(&n1[serial].xid, DNX_OBJ_WORKER, serial, 0);
00411       n1[serial].reqType      = DNX_REQ_REGISTER;
00412       n1[serial].jobCap       = 1;     // jobs
00413       n1[serial].ttl          = 5;     // seconds
00414       n1[serial].expires      = 5;     // seconds
00415       strcpy(n1[serial].address, "localhost");
00416 
00417       // configure job
00418       dnxMakeXID(&j1[serial].xid, DNX_OBJ_JOB, serial, 0);
00419       j1[serial].cmd          = "some command line";
00420       j1[serial].start_time   = now;
00421       j1[serial].timeout      = serial < 50? 0: 5;  // 50 expire immediately
00422       j1[serial].expires      = j1[serial].start_time + j1[serial].timeout;
00423       j1[serial].payload      = 0;     // no payload for tests
00424       j1[serial].pNode        = &n1[serial];
00425 
00426       if (serial < 100)
00427          CHECK_ZERO(dnxJobListAdd(jobs, &j1[serial]));
00428       else  // test that we CAN'T add 101 jobs
00429          CHECK_NONZERO(dnxJobListAdd(jobs, &j1[serial]));
00430    }
00431 
00432    // test job expiration - ensure first 50 jobs have already expired
00433    expcount = 0;
00434    do
00435    {
00436       DnxNewJob xl[10];
00437       xlsz = (int)elemcount(xl);
00438       CHECK_ZERO(dnxJobListExpire(jobs, xl, &xlsz));
00439       expcount += xlsz;
00440    } while (xlsz != 0);
00441    CHECK_TRUE(expcount == 50);
00442 
00443    // dispatch 49 of the remaining 50 jobs
00444    for (serial = 50; serial < elemcount(j1) - 2; serial++)
00445       CHECK_ZERO(dnxJobListDispatch(jobs, &jtmp));
00446 
00447    // ensure the dispatch head is valid and not in progress
00448    CHECK_TRUE(ijobs->dhead != 0);
00449    CHECK_TRUE(ijobs->list[ijobs->dhead].state != DNX_JOB_INPROGRESS);
00450    CHECK_TRUE(ijobs->list[ijobs->head].state == DNX_JOB_INPROGRESS);
00451 
00452    // ensure dispatch head points to last item in list
00453    CHECK_TRUE(ijobs->dhead == 99);
00454 
00455    // collect all pending jobs
00456    for (serial = 50; serial < elemcount(j1) - 2; serial++)
00457    {
00458       dnxMakeXID(&xid, DNX_OBJ_JOB, serial, serial);
00459       CHECK_ZERO(dnxJobListCollect(jobs, &xid, &jtmp));
00460    }
00461 
00462    // ensure there's one left
00463    CHECK_TRUE(ijobs->head == ijobs->tail);
00464    CHECK_TRUE(ijobs->head != 0);
00465 
00466    // ensure head, tail and dhead all point to the last element (99)
00467    CHECK_TRUE(ijobs->head == 99);
00468    CHECK_TRUE(ijobs->tail == 99);
00469    CHECK_TRUE(ijobs->dhead == 99);
00470 
00471    dnxJobListDestroy(jobs);
00472 
00473    return 0;
00474 }
00475 
00476 #endif   /* DNX_JOBLIST_TEST */
00477 
00478 /*--------------------------------------------------------------------------*/
00479 

Generated on Tue Apr 13 15:48:07 2010 for DNX by  doxygen 1.5.6