dnxJobList.c

Go to the documentation of this file.
00001 /*--------------------------------------------------------------------------
00002 
00003    Copyright (c) 2006-2010, Intellectual Reserve, Inc. All rights reserved.
00004 
00005    This program is free software; you can redistribute it and/or modify
00006    it under the terms of the GNU General Public License version 2 as
00007    published by the Free Software Foundation.
00008 
00009    This program is distributed in the hope that it will be useful,
00010    but WITHOUT ANY WARRANTY; without even the implied warranty of
00011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012    GNU General Public License for more details.
00013 
00014    You should have received a copy of the GNU General Public License
00015    along with this program; if not, write to the Free Software
00016    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 
00018   --------------------------------------------------------------------------*/
00019 
00028 #include "dnxJobList.h"
00029 
00030 #include "dnxError.h"
00031 #include "dnxDebug.h"
00032 #include "dnxLogging.h"
00033 #include "dnxTimer.h"
00034 
00035 #include <sys/time.h>
00036 
00037 #define DNX_JOBLIST_TIMEOUT   5     
00038 #define DNX_TIMER_SLEEP       5000  
00040 DnxJobList * joblist; // Fwd declaration
00041 
00043 typedef struct iDnxJobList_
00044 {
00045    DnxNewJob * list;       
00046    unsigned long size;     
00047    unsigned long head;     
00048    unsigned long tail;     
00049    unsigned long dhead;    
00050    pthread_mutex_t mut;    
00051    pthread_cond_t cond;    
00052    DnxTimer * timer;       
00053 } iDnxJobList;
00054 
00055 /*--------------------------------------------------------------------------
00056                                  INTERFACE
00057   --------------------------------------------------------------------------*/
00058 
00059 int dnxJobListAdd(DnxJobList * pJobList, DnxNewJob * pJob)
00060 {
00061    iDnxJobList * ilist = (iDnxJobList *)pJobList;
00062    unsigned long tail;
00063    int ret = DNX_OK;
00064 
00065    assert(pJobList && pJob);
00066 
00067    DNX_PT_MUTEX_LOCK(&ilist->mut);
00068 
00069    tail = ilist->tail;
00070 
00071    // verify space in the job list
00072    if (ilist->list[tail].state && (tail = (tail + 1) % ilist->size) == ilist->head)
00073    {
00074       dnxLog("dnxJobListAdd: Out of job slots (max=%lu): %s.",
00075             ilist->size, pJob->cmd);
00076       ret = DNX_ERR_CAPACITY;
00077    }
00078    else
00079    {
00080       // add the slot index to the Job's XID - this allows us to index
00081       //    the job list using the returned result's XID.objSlot field
00082       pJob->xid.objSlot = tail;
00083       pJob->state = DNX_JOB_PENDING;
00084 
00085       // add this job to the job list
00086       memcpy(&ilist->list[tail], pJob, sizeof *pJob);
00087 
00088       // update dispatch head index
00089       if (ilist->list[ilist->tail].state != DNX_JOB_PENDING)
00090          ilist->dhead = tail;
00091 
00092       ilist->tail = tail;
00093 
00094       dnxDebug(8, "dnxJobListAdd: Job [%lu,%lu]: Head=%lu, DHead=%lu, Tail=%lu.",
00095             pJob->xid.objSerial, pJob->xid.objSlot, ilist->head, ilist->dhead,
00096             ilist->tail);
00097 
00098       pthread_cond_signal(&ilist->cond);  // signal that a new job is available
00099    }
00100 
00101    DNX_PT_MUTEX_UNLOCK(&ilist->mut);
00102 
00103    return ret;
00104 }
00105 
00106 //----------------------------------------------------------------------------
00107 
00108 int dnxJobListExpire(DnxJobList * pJobList, DnxNewJob * pExpiredJobs,
00109       int * totalJobs)
00110 {
00111    iDnxJobList * ilist = (iDnxJobList *)pJobList;
00112    unsigned long current;
00113    DnxNewJob * pJob;
00114    int jobCount;
00115    time_t now;
00116 
00117    assert(pJobList && pExpiredJobs && totalJobs && *totalJobs > 0);
00118 
00119    DNX_PT_MUTEX_LOCK(&ilist->mut);
00120 
00121    // get the current time (after we acquire the lock! In case we had to wait)
00122    now = time(0);
00123 
00124    // walk the entire job list - InProgress and Pending jobs (in that order)
00125    current = ilist->head;
00126    jobCount = 0;
00127    while (jobCount < *totalJobs)
00128    {
00129       // only examine jobs that are either awaiting dispatch or results
00130       if ((pJob = &ilist->list[current])->state == DNX_JOB_INPROGRESS
00131             || pJob->state == DNX_JOB_PENDING)
00132       {
00133          // check the job's expiration stamp
00134          if (pJob->expires > now)
00135             break;   // Bail-out: this job hasn't expired yet
00136 
00137          // job has expired - add it to the expired job list
00138          memcpy(&pExpiredJobs[jobCount], pJob, sizeof(DnxNewJob));
00139 
00140          pJob->state = DNX_JOB_NULL;
00141 
00142          jobCount++;
00143       }
00144 
00145       // bail-out if this was the job list tail
00146       if (current == ilist->tail)
00147          break;
00148 
00149       // increment the job list index
00150       current = (current + 1) % ilist->size;
00151    }
00152 
00153    ilist->head = current;
00154 
00155    // if this job is awaiting dispatch, then it is the new dispatch head
00156    if (ilist->list[current].state != DNX_JOB_INPROGRESS)
00157       ilist->dhead = current;
00158 
00159    // update the total jobs in the expired job list
00160    *totalJobs = jobCount;
00161 
00162    DNX_PT_MUTEX_UNLOCK(&ilist->mut);
00163 
00164    return DNX_OK;
00165 }
00166 
00167 //----------------------------------------------------------------------------
00168 
00169 int dnxJobListDispatch(DnxJobList * pJobList, DnxNewJob * pJob)
00170 {
00171    iDnxJobList * ilist = (iDnxJobList *)pJobList;
00172    unsigned long current;
00173    int ret = 0;
00174 
00175    assert(pJobList && pJob);
00176 
00177    DNX_PT_MUTEX_LOCK(&ilist->mut);
00178 
00183    // start at current dispatch head
00184    current = ilist->dhead;
00185 
00186    // see if we have a pending job
00187    while (ilist->list[current].state != DNX_JOB_PENDING)
00188    {
00189       struct timeval now;
00190       struct timespec timeout;
00191 
00192       gettimeofday(&now, 0);
00193       timeout.tv_sec = now.tv_sec + DNX_JOBLIST_TIMEOUT;
00194       timeout.tv_nsec = now.tv_usec * 1000;
00195 
00196       dnxDebug(8, "dnxJobListDispatch: BEFORE: Head=%lu, DHead=%lu, Tail=%lu.",
00197             ilist->head, ilist->dhead, ilist->tail);
00198 
00199       // wait on the condition variable if there are no pending jobs
00200       if ((ret = pthread_cond_timedwait(&ilist->cond, &ilist->mut,
00201             &timeout)) == ETIMEDOUT)
00202          break;
00203 
00204       current = ilist->dhead;
00205    }
00206 
00207    if (ret == 0)
00208    {
00209       // transition this job's state to InProgress
00210       ilist->list[current].state = DNX_JOB_INPROGRESS;
00211 
00212       // make a copy for the Dispatcher
00213       memcpy(pJob, &ilist->list[current], sizeof *pJob);
00214 
00215       // update the dispatch head
00216       if (ilist->dhead != ilist->tail)
00217          ilist->dhead = (current + 1) % ilist->size;
00218 
00219       dnxDebug(8, "dnxJobListDispatch: AFTER: Job [%lu,%lu]; Head=%lu, DHead=%lu, Tail=%lu.",
00220             pJob->xid.objSerial, pJob->xid.objSlot, ilist->head, ilist->dhead, ilist->tail);
00221    }
00222 
00223    DNX_PT_MUTEX_UNLOCK(&ilist->mut);
00224 
00225    return ret;
00226 }
00227 
00228 //----------------------------------------------------------------------------
00229 
00230 int dnxJobListCollect(DnxJobList * pJobList, DnxXID * pxid, DnxNewJob * pJob)
00231 {
00232    iDnxJobList * ilist = (iDnxJobList *)pJobList;
00233    unsigned long current;
00234    int ret = DNX_OK;
00235 
00236    assert(pJobList && pxid && pJob);   // parameter validation
00237 
00238    current = pxid->objSlot;
00239 
00240    assert(current < ilist->size);
00241    if (current >= ilist->size)         // runtime validation requires check
00242       return DNX_ERR_INVALID;          // corrupt client network message
00243 
00244    DNX_PT_MUTEX_LOCK(&ilist->mut);
00245 
00246    dnxDebug(8, "dnxJobListCollect: Compare job [%lu,%lu] to job [%lu,%lu]: "
00247          "Head=%lu, DHead=%lu, Tail=%lu.", pxid->objSerial, pxid->objSlot,
00248          ilist->list[current].xid.objSerial, ilist->list[current].xid.objSlot,
00249          ilist->head, ilist->dhead, ilist->tail);
00250 
00251    // verify that the XID of this result matches the XID of the service check
00252    if (ilist->list[current].state == DNX_JOB_NULL
00253          || !dnxEqualXIDs(pxid, &ilist->list[current].xid))
00254       ret = DNX_ERR_NOTFOUND;          // job expired; removed by the timer
00255    else
00256    {
00257       // make a copy for the Collector
00258       memcpy(pJob, &ilist->list[current], sizeof *pJob);
00259       pJob->state = DNX_JOB_COMPLETE;
00260 
00261       // dequeue this job; make slot available for another job
00262       ilist->list[current].state = DNX_JOB_NULL;
00263 
00264       // update the job list head
00265       if (current == ilist->head && current != ilist->tail)
00266          ilist->head = (current + 1) % ilist->size;
00267    }
00268 
00269    DNX_PT_MUTEX_UNLOCK(&ilist->mut);
00270 
00271    return ret;
00272 }
00273 
00274 //----------------------------------------------------------------------------
00275 
00276 int dnxJobListCreate(unsigned size, DnxJobList ** ppJobList)
00277 {
00278    iDnxJobList * ilist;
00279    int ret;
00280 
00281    assert(ppJobList && size);
00282 
00283    if ((ilist = (iDnxJobList *)xmalloc(sizeof *ilist)) == 0)
00284       return DNX_ERR_MEMORY;
00285    memset(ilist, 0, sizeof *ilist);
00286 
00287    if ((ilist->list = (DnxNewJob *)xmalloc(sizeof *ilist->list * size)) == 0)
00288    {
00289       xfree(ilist);
00290       return DNX_ERR_MEMORY;
00291    }
00292    memset(ilist->list, 0, sizeof *ilist->list * size);
00293 
00294    ilist->size = size;
00295 
00296    DNX_PT_MUTEX_INIT(&ilist->mut);
00297    pthread_cond_init(&ilist->cond, 0);
00298 
00299    if ((ret = dnxTimerCreate((DnxJobList *)ilist, DNX_TIMER_SLEEP,
00300          &ilist->timer)) != 0)
00301    {
00302       DNX_PT_COND_DESTROY(&ilist->cond);
00303       DNX_PT_MUTEX_DESTROY(&ilist->mut);
00304       xfree(ilist->list);
00305       xfree(ilist);
00306       return ret;
00307    }
00308 
00309    *ppJobList = (DnxJobList *)ilist;
00310 
00311    return DNX_OK;
00312 }
00313 
00314 //----------------------------------------------------------------------------
00315 
00316 void dnxJobListDestroy(DnxJobList * pJobList)
00317 {
00318    iDnxJobList * ilist = (iDnxJobList *)pJobList;
00319 
00320    assert(pJobList);
00321 
00322    dnxTimerDestroy(ilist->timer);
00323 
00324    DNX_PT_COND_DESTROY(&ilist->cond);
00325    DNX_PT_MUTEX_DESTROY(&ilist->mut);
00326 
00327    xfree(ilist->list);
00328    xfree(ilist);
00329 }
00330 
00331 /*--------------------------------------------------------------------------
00332                                  UNIT TEST
00333 
00334    From within dnx/server, compile with GNU tools using this command line:
00335 
00336       gcc -DDEBUG -DDNX_JOBLIST_TEST -g -O0 -I../common dnxJobList.c \
00337          ../common/dnxError.c -lpthread -lgcc_s -lrt -o dnxJobListTest
00338 
00339   --------------------------------------------------------------------------*/
00340 
00341 #ifdef DNX_JOBLIST_TEST
00342 
00343 #include "utesthelp.h"
00344 #include <time.h>
00345 
00346 #define elemcount(x) (sizeof(x)/sizeof(*(x)))
00347 
00348 static int verbose;
00349 
00350 // functional stubs
00351 IMPLEMENT_DNX_DEBUG(verbose);
00352 IMPLEMENT_DNX_SYSLOG(verbose);
00353 
00354 int dnxTimerCreate(DnxJobList * jl, int s, DnxTimer ** pt) { *pt = 0; return 0; }
00355 void dnxTimerDestroy(DnxTimer * t) { }
00356 
00357 int dnxEqualXIDs(DnxXID * pxa, DnxXID * pxb)
00358       { return pxa->objType == pxb->objType && pxa->objSerial == pxb->objSerial
00359             && pxa->objSlot == pxb->objSlot; }
00360 
00361 int dnxMakeXID(DnxXID * x, DnxObjType t, unsigned long s, unsigned long l)
00362       { x->objType = t; x->objSerial = s; x->objSlot = l; return DNX_OK; }
00363 
00364 int main(int argc, char ** argv)
00365 {
00366    DnxJobList * jobs;
00367    DnxNodeRequest n1[101];
00368    DnxNewJob j1[101];
00369    DnxNewJob jtmp;
00370    DnxXID xid;
00371    int serial, xlsz, expcount;
00372    iDnxJobList * ijobs;
00373    time_t now;
00374 
00375    verbose = argc > 1;
00376 
00377    // create a new job list and get a concrete reference to it for testing
00378    CHECK_ZERO(dnxJobListCreate(100, &jobs));
00379    ijobs = (iDnxJobList *)jobs;
00380 
00381    // force entire array to non-zero values for testing
00382    memset(j1, 0xcc, sizeof j1);
00383    memset(n1, 0xdd, sizeof n1);
00384 
00385    // test that we CAN add 100 jobs to the 100-job list
00386    now = time(0);
00387    for (serial = 0; serial < elemcount(j1); serial++)
00388    {
00389       // configure request node
00390       dnxMakeXID(&n1[serial].xid, DNX_OBJ_WORKER, serial, 0);
00391       n1[serial].reqType      = DNX_REQ_REGISTER;
00392       n1[serial].jobCap       = 1;     // jobs
00393       n1[serial].ttl          = 5;     // seconds
00394       n1[serial].expires      = 5;     // seconds
00395       strcpy(n1[serial].address, "localhost");
00396 
00397       // configure job
00398       dnxMakeXID(&j1[serial].xid, DNX_OBJ_JOB, serial, 0);
00399       j1[serial].cmd          = "some command line";
00400       j1[serial].start_time   = now;
00401       j1[serial].timeout      = serial < 50? 0: 5;  // 50 expire immediately
00402       j1[serial].expires      = j1[serial].start_time + j1[serial].timeout;
00403       j1[serial].payload      = 0;     // no payload for tests
00404       j1[serial].pNode        = &n1[serial];
00405 
00406       if (serial < 100)
00407          CHECK_ZERO(dnxJobListAdd(jobs, &j1[serial]));
00408       else  // test that we CAN'T add 101 jobs
00409          CHECK_NONZERO(dnxJobListAdd(jobs, &j1[serial]));
00410    }
00411 
00412    // test job expiration - ensure first 50 jobs have already expired
00413    expcount = 0;
00414    do
00415    {
00416       DnxNewJob xl[10];
00417       xlsz = (int)elemcount(xl);
00418       CHECK_ZERO(dnxJobListExpire(jobs, xl, &xlsz));
00419       expcount += xlsz;
00420    } while (xlsz != 0);
00421    CHECK_TRUE(expcount == 50);
00422 
00423    // dispatch 49 of the remaining 50 jobs
00424    for (serial = 50; serial < elemcount(j1) - 2; serial++)
00425       CHECK_ZERO(dnxJobListDispatch(jobs, &jtmp));
00426 
00427    // ensure the dispatch head is valid and not in progress
00428    CHECK_TRUE(ijobs->dhead != 0);
00429    CHECK_TRUE(ijobs->list[ijobs->dhead].state != DNX_JOB_INPROGRESS);
00430    CHECK_TRUE(ijobs->list[ijobs->head].state == DNX_JOB_INPROGRESS);
00431 
00432    // ensure dispatch head points to last item in list
00433    CHECK_TRUE(ijobs->dhead == 99);
00434 
00435    // collect all pending jobs
00436    for (serial = 50; serial < elemcount(j1) - 2; serial++)
00437    {
00438       dnxMakeXID(&xid, DNX_OBJ_JOB, serial, serial);
00439       CHECK_ZERO(dnxJobListCollect(jobs, &xid, &jtmp));
00440    }
00441 
00442    // ensure there's one left
00443    CHECK_TRUE(ijobs->head == ijobs->tail);
00444    CHECK_TRUE(ijobs->head != 0);
00445 
00446    // ensure head, tail and dhead all point to the last element (99)
00447    CHECK_TRUE(ijobs->head == 99);
00448    CHECK_TRUE(ijobs->tail == 99);
00449    CHECK_TRUE(ijobs->dhead == 99);
00450 
00451    dnxJobListDestroy(jobs);
00452 
00453    return 0;
00454 }
00455 
00456 #endif   /* DNX_JOBLIST_TEST */
00457 
00458 /*--------------------------------------------------------------------------*/
00459 

Generated on Tue Apr 13 15:48:07 2010 for DNX by  doxygen 1.5.6