00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00034 #include "dnxTimer.h"
00035
00036 #include "dnxNebMain.h"
00037 #include "dnxError.h"
00038 #include "dnxDebug.h"
00039 #include "dnxSrvProt.h"
00040 #include "dnxJobList.h"
00041 #include "dnxLogging.h"
00042 #include "dnxSleep.h"
00043
00044 #if HAVE_CONFIG_H
00045 # include "config.h"
00046 #endif
00047
00048 #include <stdio.h>
00049 #include <stdlib.h>
00050 #include <string.h>
00051 #include <time.h>
00052 #include <assert.h>
00053
00054 #define DNX_DEF_TIMER_SLEEP 5000
00055 #define MAX_EXPIRED 10
00058 typedef struct iDnxTimer_
00059 {
00060 DnxJobList * joblist;
00061 pthread_t tid;
00062 int sleepms;
00063 } iDnxTimer;
00064
00065
00066
00067
00068
/** Timer thread cleanup handler.
 *
 * Registered with pthread_cleanup_push() by the timer thread. It currently
 * has no resources to release; it only validates its argument.
 *
 * @param[in] data - the opaque pointer given at registration time (the
 *    timer object); must not be null.
 */
static void dnxTimerCleanup(void * data)
{
   assert(data);

   /* Keeps the parameter "used" when NDEBUG compiles the assert away. */
   (void)data;
}
00082
00083
00084
00092 static void * dnxTimer(void * data)
00093 {
00094 iDnxTimer * itimer = (iDnxTimer *)data;
00095 DnxNewJob ExpiredList[MAX_EXPIRED];
00096 int i, totalExpired;
00097 int ret = 0;
00098
00099 assert(data);
00100
00101 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00102 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00103 pthread_cleanup_push(dnxTimerCleanup, data);
00104
00105 dnxLog("dnxTimer[%lx]: Watching for expired jobs...", pthread_self());
00106
00107 while (1)
00108 {
00109 pthread_testcancel();
00110
00111 dnxCancelableSleep(itimer->sleepms);
00112
00113
00114 totalExpired = MAX_EXPIRED;
00115 if ((ret = dnxJobListExpire(itimer->joblist, ExpiredList,
00116 &totalExpired)) == DNX_OK && totalExpired > 0)
00117 {
00118 for (i = 0; i < totalExpired; i++)
00119 {
00120 char msg[128];
00121 char addrstr[DNX_MAX_ADDRSTR];
00122 DnxNewJob * job = &ExpiredList[i];
00123
00124 dnxDebug(1, "dnxTimer[%lx]: Expiring Job [%lu,%lu]: %s.",
00125 pthread_self(), job->xid.objSerial, job->xid.objSlot, job->cmd);
00126
00127 dnxAuditJob(job, "EXPIRE");
00128
00129 dnxNtop(job->pNode->address, addrstr, sizeof addrstr);
00130
00131
00132 sprintf(msg, "(DNX: Service Check [%lu,%lu] Timed Out - "
00133 "Node: %s - Failed to return job response in time allowed)",
00134 job->xid.objSerial, job->xid.objSlot, addrstr);
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144 dnxDebug(2, msg);
00145
00146
00147 ret = dnxPostResult(job->payload, job->start_time,
00148 time(0) - job->start_time, 1, 0, msg);
00149
00150 dnxJobCleanup(job);
00151 }
00152 }
00153
00154 if (totalExpired > 0 || ret != DNX_OK)
00155 dnxDebug(2, "dnxTimer[%lx]: Expired job count: %d Retcode=%d: %s.",
00156 pthread_self(), totalExpired, ret, dnxErrorString(ret));
00157 }
00158
00159 dnxLog("dnxTimer[%lx]: Terminating: %s.", pthread_self(), dnxErrorString(ret));
00160
00161 pthread_cleanup_pop(1);
00162 return 0;
00163 }
00164
00165
00166
00167
00168
00169 int dnxTimerCreate(DnxJobList * joblist, int sleeptime, DnxTimer ** ptimer)
00170 {
00171 iDnxTimer * itimer;
00172 int ret;
00173
00174 assert(joblist && ptimer);
00175
00176
00177 if (sleeptime < 100 || sleeptime > 300000)
00178 sleeptime = DNX_DEF_TIMER_SLEEP;
00179
00180 if ((itimer = (iDnxTimer *)xmalloc(sizeof *itimer)) == 0)
00181 return DNX_ERR_MEMORY;
00182
00183
00184 memset(itimer, 0, sizeof *itimer);
00185 itimer->joblist = joblist;
00186 itimer->sleepms = sleeptime;
00187
00188
00189 if ((ret = pthread_create(&itimer->tid, 0, dnxTimer, itimer)) != 0)
00190 {
00191 dnxLog("Timer thread creation failed: %s.", dnxErrorString(ret));
00192 xfree(itimer);
00193 return DNX_ERR_THREAD;
00194 }
00195
00196 *ptimer = (DnxTimer *)itimer;
00197
00198 return DNX_OK;
00199 }
00200
00201
00202
00203 void dnxTimerDestroy(DnxTimer * timer)
00204 {
00205 iDnxTimer * itimer = (iDnxTimer *)timer;
00206
00207 pthread_cancel(itimer->tid);
00208 pthread_join(itimer->tid, 0);
00209
00210 xfree(itimer);
00211 }
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226 #ifdef DNX_TIMER_TEST
00227
00228 #include "utesthelp.h"
00229 #include <time.h>
00230
00231 #define elemcount(x) (sizeof(x)/sizeof(*(x)))
00232
00233 static int verbose;
00234 static DnxNewJob fakejob;
00235 static DnxJobList fakejoblist;
00236 static int fakepayload;
00237 static DnxNodeRequest fakenode;
00238 static int entered_dnxJobListExpire;
00239
00240
00241 IMPLEMENT_DNX_SYSLOG(verbose);
00242 IMPLEMENT_DNX_DEBUG(verbose);
00243
00244 int dnxJobListExpire(DnxJobList * pJobList, DnxNewJob * pExpiredJobs, int * totalJobs)
00245 {
00246 CHECK_TRUE(pJobList == &fakejoblist);
00247 CHECK_TRUE(*totalJobs > 0);
00248 memcpy(&pExpiredJobs[0], &fakejob, sizeof *pExpiredJobs);
00249 *totalJobs = 1;
00250 entered_dnxJobListExpire = 1;
00251 return 0;
00252 }
00253
00254 int dnxPostResult(void * payload, time_t start_time, unsigned delta,
00255 int early_timeout, int res_code, char * res_data)
00256 {
00257 CHECK_TRUE(payload == &fakepayload);
00258 CHECK_TRUE(start_time == 100);
00259 CHECK_TRUE(early_timeout != 0);
00260 CHECK_TRUE(res_code == 0);
00261 CHECK_ZERO(memcmp(res_data, "(DNX: Service", 13));
00262 return 0;
00263 }
00264
00265 int dnxAuditJob(DnxNewJob * pJob, char * action) { return 0; }
00266
00267 void dnxJobCleanup(DnxNewJob * pJob) { }
00268
/** Test stub: produces an empty address string.
 *
 * The timer thread formats the buffer with "%s" right after this call, so
 * the stub must leave a terminated string in it; returning the buffer
 * untouched would make the caller read uninitialized memory.
 *
 * @param[in] sastr - ignored.
 * @param[out] buf - receives an empty string when non-null and bufsz > 0.
 * @param[in] bufsz - size of buf in bytes.
 *
 * @return buf, unchanged as a pointer.
 */
char * dnxNtop(const void * sastr, char * buf, size_t bufsz)
{
   (void)sastr;

   if (buf != 0 && bufsz > 0)
      buf[0] = '\0';

   return buf;
}
00270
00271 int main(int argc, char ** argv)
00272 {
00273 DnxTimer * timer;
00274 iDnxTimer * itimer;
00275
00276 verbose = argc > 1? 1: 0;
00277
00278
00279 fakenode.xid.objType = DNX_OBJ_JOB;
00280 fakenode.xid.objSerial = 1;
00281 fakenode.xid.objSlot = 2;
00282 fakenode.reqType = DNX_REQ_DEREGISTER;
00283 fakenode.jobCap = 1;
00284 fakenode.ttl = 2;
00285 fakenode.expires = 3;
00286 strcpy(fakenode.address, "fake address");
00287
00288 fakejob.state = DNX_JOB_INPROGRESS;
00289 fakejob.xid.objType = DNX_OBJ_JOB;
00290 fakejob.xid.objSerial = 1;
00291 fakejob.xid.objSlot = 2;
00292 fakejob.cmd = "fake command line";
00293 fakejob.start_time = 100;
00294 fakejob.timeout = 10;
00295 fakejob.expires = fakejob.start_time + fakejob.timeout;
00296 fakejob.payload = &fakepayload;
00297 fakejob.pNode = &fakenode;
00298
00299 entered_dnxJobListExpire = 0;
00300
00301
00302 CHECK_ZERO(dnxTimerCreate(&fakejoblist, 100, &timer));
00303 itimer = (iDnxTimer *)timer;
00304
00305
00306 CHECK_TRUE(itimer->joblist == &fakejoblist);
00307 CHECK_TRUE(itimer->tid != 0);
00308 CHECK_TRUE(itimer->sleepms == 100);
00309
00310
00311 while (!entered_dnxJobListExpire)
00312 dnxCancelableSleep(10);
00313
00314
00315 dnxTimerDestroy(timer);
00316
00317 return 0;
00318 }
00319
00320 #endif
00321
00322
00323