00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00034 #include "dnxTimer.h"
00035
00036 #include "dnxServerMain.h"
00037 #include "dnxError.h"
00038 #include "dnxDebug.h"
00039 #include "dnxProtocol.h"
00040 #include "dnxJobList.h"
00041 #include "dnxLogging.h"
00042 #include "dnxSleep.h"
00043 #include "dnxStats.h"
00044
00045 #if HAVE_CONFIG_H
00046 # include "config.h"
00047 #endif
00048
00049 #include <stdio.h>
00050 #include <stdlib.h>
00051 #include <string.h>
00052 #include <time.h>
00053 #include <assert.h>
00054
00055 #define DNX_DEF_TIMER_SLEEP 5000
00056 #define MAX_EXPIRED 10
00059 typedef struct iDnxTimer_
00060 {
00061 DnxJobList * joblist;
00062 pthread_t tid;
00063 int sleepms;
00064 } iDnxTimer;
00065
00066
00067
00068
00069
/** Cancellation cleanup handler for the timer thread.
 *
 * Installed by the thread routine with pthread_cleanup_push. It releases
 * nothing; it only checks that it was handed a non-null argument.
 *
 * @param[in] data - the opaque pointer given when the handler was pushed;
 *    must not be null.
 */
static void dnxTimerCleanup(void * data)
{
   assert(data);
   (void)data;    /* keep the parameter referenced when NDEBUG drops the assert */
}
00083
00084
00085
00093 static void * dnxTimer(void * data)
00094 {
00095 iDnxTimer * itimer = (iDnxTimer *)data;
00096 DnxNewJob ExpiredList[MAX_EXPIRED];
00097 int i, totalExpired;
00098 int ret = 0;
00099
00100 assert(data);
00101
00102 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00103 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00104 pthread_cleanup_push(dnxTimerCleanup, data);
00105
00106 dnxLog("dnxTimer[%lx]: Watching for expired jobs...", pthread_self());
00107
00108 while (1)
00109 {
00110 pthread_testcancel();
00111
00112 dnxCancelableSleep(itimer->sleepms);
00113
00114
00115 totalExpired = MAX_EXPIRED;
00116 if ((ret = dnxJobListExpire(itimer->joblist, ExpiredList,
00117 &totalExpired)) == DNX_OK && totalExpired > 0)
00118 {
00119 for (i = 0; i < totalExpired; i++)
00120 {
00121 char msg[256];
00122 char addrstr[DNX_MAX_ADDRSTR];
00123 DnxNewJob * job = &ExpiredList[i];
00124
00125 dnxDebug(1, "dnxTimer[%lx]: Expiring Job [%lu,%lu]: %s.",
00126 pthread_self(), job->xid.objSerial, job->xid.objSlot, job->cmd);
00127
00128 dnxStatsInc(job->pNode->address, RESULTS_TIMED_OUT);
00129 dnxAuditJob(job, "EXPIRE");
00130
00131
00132 snprintf(msg, sizeof msg,
00133 "(DNX: Service Check [%lu,%lu] Timed Out - "
00134 "Node: %s - Failed to return job response in time allowed)",
00135 job->xid.objSerial, job->xid.objSlot, addrstr);
00136
00137
00138
00139
00140
00141
00142 dnxDebug(2, msg);
00143
00144
00145 ret = dnxPostResult(job->payload, job->xid.objSerial, job->start_time,
00146 time(0) - job->start_time, 1, 0, msg);
00147 dnxJobCleanup(job);
00148 }
00149 }
00150
00151 if (totalExpired > 0 || ret != DNX_OK)
00152 dnxDebug(2, "dnxTimer[%lx]: Expired job count: %d Retcode=%d: %s.",
00153 pthread_self(), totalExpired, ret, dnxErrorString(ret));
00154 }
00155
00156 dnxLog("dnxTimer[%lx]: Terminating: %s.", pthread_self(), dnxErrorString(ret));
00157
00158 pthread_cleanup_pop(1);
00159 return 0;
00160 }
00161
00162
00163
00164
00165
00166 int dnxTimerCreate(DnxJobList * joblist, int sleeptime, DnxTimer ** ptimer)
00167 {
00168 iDnxTimer * itimer;
00169 int ret;
00170
00171 assert(joblist && ptimer);
00172
00173
00174 if (sleeptime < 100 || sleeptime > 300000)
00175 sleeptime = DNX_DEF_TIMER_SLEEP;
00176
00177 if ((itimer = (iDnxTimer *)xmalloc(sizeof *itimer)) == 0)
00178 return DNX_ERR_MEMORY;
00179
00180
00181 memset(itimer, 0, sizeof *itimer);
00182 itimer->joblist = joblist;
00183 itimer->sleepms = sleeptime;
00184
00185
00186 if ((ret = pthread_create(&itimer->tid, 0, dnxTimer, itimer)) != 0)
00187 {
00188 dnxLog("Timer thread creation failed: %s.", dnxErrorString(ret));
00189 xfree(itimer);
00190 return DNX_ERR_THREAD;
00191 }
00192
00193 *ptimer = (DnxTimer *)itimer;
00194
00195 return DNX_OK;
00196 }
00197
00198
00199
00200 void dnxTimerDestroy(DnxTimer * timer)
00201 {
00202 iDnxTimer * itimer = (iDnxTimer *)timer;
00203
00204 pthread_cancel(itimer->tid);
00205 pthread_join(itimer->tid, 0);
00206
00207 xfree(itimer);
00208 }
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223 #ifdef DNX_TIMER_TEST
00224
00225 #include "utesthelp.h"
00226 #include <time.h>
00227
00228 #define elemcount(x) (sizeof(x)/sizeof(*(x)))
00229
00230 static int verbose;
00231 static DnxNewJob fakejob;
00232 static DnxJobList fakejoblist;
00233 static int fakepayload;
00234 static DnxNodeRequest fakenode;
00235 static int entered_dnxJobListExpire;
00236
00237
00238 IMPLEMENT_DNX_SYSLOG(verbose);
00239 IMPLEMENT_DNX_DEBUG(verbose);
00240
00241 int dnxJobListExpire(DnxJobList * pJobList, DnxNewJob * pExpiredJobs, int * totalJobs)
00242 {
00243 CHECK_TRUE(pJobList == &fakejoblist);
00244 CHECK_TRUE(*totalJobs > 0);
00245 memcpy(&pExpiredJobs[0], &fakejob, sizeof *pExpiredJobs);
00246 *totalJobs = 1;
00247 entered_dnxJobListExpire = 1;
00248 return 0;
00249 }
00250
00251 int dnxPostResult(void * payload, unsigned long serial,
00252 time_t start_time, unsigned delta, int early_timeout,
00253 int res_code, char * res_data)
00254 {
00255 CHECK_TRUE(payload == &fakepayload);
00256 CHECK_TRUE(start_time == 100);
00257 CHECK_TRUE(early_timeout != 0);
00258 CHECK_TRUE(res_code == 0);
00259 CHECK_ZERO(memcmp(res_data, "(DNX: Service", 13));
00260 return 0;
00261 }
00262
00263 int dnxAuditJob(DnxNewJob * pJob, char * action) { return 0; }
00264
00265 void dnxJobCleanup(DnxNewJob * pJob) { }
00266
00267 void dnxStatsInc(char * addr, DnxStatsIndex member) {}
00268
00269 int main(int argc, char ** argv)
00270 {
00271 DnxTimer * timer;
00272 iDnxTimer * itimer;
00273
00274 verbose = argc > 1? 1: 0;
00275
00276
00277 fakenode.xid.objType = DNX_OBJ_JOB;
00278 fakenode.xid.objSerial = 1;
00279 fakenode.xid.objSlot = 2;
00280 fakenode.reqType = DNX_REQ_DEREGISTER;
00281 fakenode.jobCap = 1;
00282 fakenode.ttl = 2;
00283 fakenode.expires = 3;
00284 strcpy(fakenode.address, "fake address");
00285
00286 fakejob.state = DNX_JOB_INPROGRESS;
00287 fakejob.xid.objType = DNX_OBJ_JOB;
00288 fakejob.xid.objSerial = 1;
00289 fakejob.xid.objSlot = 2;
00290 fakejob.cmd = "fake command line";
00291 fakejob.start_time = 100;
00292 fakejob.timeout = 10;
00293 fakejob.expires = fakejob.start_time + fakejob.timeout;
00294 fakejob.payload = &fakepayload;
00295 fakejob.pNode = &fakenode;
00296
00297 entered_dnxJobListExpire = 0;
00298
00299
00300 CHECK_ZERO(dnxTimerCreate(&fakejoblist, 100, &timer));
00301 itimer = (iDnxTimer *)timer;
00302
00303
00304 CHECK_TRUE(itimer->joblist == &fakejoblist);
00305 CHECK_TRUE(itimer->tid != 0);
00306 CHECK_TRUE(itimer->sleepms == 100);
00307
00308
00309 while (!entered_dnxJobListExpire)
00310 dnxCancelableSleep(10);
00311
00312
00313 dnxTimerDestroy(timer);
00314
00315 return 0;
00316 }
00317
00318 #endif
00319
00320
00321