00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00035 #include "dnxDispatcher.h"
00036
00037 #include "dnxError.h"
00038 #include "dnxDebug.h"
00039 #include "dnxSrvProt.h"
00040 #include "dnxXml.h"
00041 #include "dnxNebMain.h"
00042 #include "dnxRegistrar.h"
00043 #include "dnxJobList.h"
00044 #include "dnxLogging.h"
00045 #include "dnxStats.h"
00046
00047 #include <sys/socket.h>
00048 #include <netinet/in.h>
00049 #include <assert.h>
00050
00052 typedef struct iDnxDispatcher_
00053 {
00054 char * chname;
00055 char * url;
00056 DnxJobList * joblist;
00057 DnxChannel * channel;
00058 pthread_t tid;
00059 } iDnxDispatcher;
00060
00061
00062
00063
00064
00074 static int dnxSendJobMsg(iDnxDispatcher * idisp, DnxNewJob * pSvcReq, DnxNodeRequest * pNode)
00075 {
00076 struct sockaddr * sin = (struct sockaddr *)pNode->address;
00077 pthread_t tid = pthread_self();
00078 DnxJob job;
00079 int ret;
00080
00081 dnxDebug(2, "dnxDispatcher[%lx]: Dispatching job [%lu,%lu] (%s) to node %u.%u.%u.%u.",
00082 tid, pSvcReq->xid.objSerial, pSvcReq->xid.objSlot, pSvcReq->cmd,
00083 sin->sa_data[2], sin->sa_data[3], sin->sa_data[4], sin->sa_data[5]);
00084
00085 memset(&job, 0, sizeof job);
00086 job.xid = pSvcReq->xid;
00087 job.state = DNX_JOB_PENDING;
00088 job.priority = 1;
00089 job.timeout = pSvcReq->timeout;
00090 job.cmd = pSvcReq->cmd;
00091
00092 if ((ret = dnxSendJob(idisp->channel, &job, pNode->address)) != DNX_OK)
00093 dnxLog("Unable to send job [%lu,%lu] (%s) to worker node %u.%u.%u.%u: %s.",
00094 tid, pSvcReq->xid.objSerial, pSvcReq->xid.objSlot, pSvcReq->cmd,
00095 sin->sa_data[2], sin->sa_data[3], sin->sa_data[4], sin->sa_data[5],
00096 dnxErrorString(ret));
00097
00098 return ret;
00099 }
00100
00101
00102
00110 static int dnxDispatchJob(iDnxDispatcher * idisp, DnxNewJob * pSvcReq)
00111 {
00112 DnxNodeRequest * pNode = pSvcReq->pNode;
00113 int ret;
00114
00115 ret = dnxSendJobMsg(idisp, pSvcReq, pNode);
00116
00121 return ret;
00122 }
00123
00124
00125
00132 static void * dnxDispatcher(void * data)
00133 {
00134 iDnxDispatcher * idisp = (iDnxDispatcher *)data;
00135
00136 assert(data);
00137
00138 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00139 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00140
00141 dnxLog("Dispatcher awaiting jobs...");
00142
00143 while (1)
00144 {
00145 DnxNewJob svcReq;
00146 int ret;
00147
00148 pthread_testcancel();
00149
00150
00151 if ((ret = dnxJobListDispatch(idisp->joblist, &svcReq)) == DNX_OK)
00152 {
00153 if ((ret = dnxDispatchJob(idisp, &svcReq)) == DNX_OK)
00154 {
00155 dnxStatsInc(svcReq.pNode->address, DISPATCHES_OK);
00156 dnxAuditJob(&svcReq, "DISPATCH");
00157 }
00158 else
00159 {
00160 dnxStatsInc(svcReq.pNode->address, DISPATCHES_FAILED);
00161 dnxAuditJob(&svcReq, "DISPATCH-FAIL");
00162 }
00163 }
00164
00165 }
00166 return 0;
00167 }
00168
00169
00170
00171
00172
00173 DnxChannel * dnxDispatcherGetChannel(DnxDispatcher * disp)
00174 { return ((iDnxDispatcher *)disp)->channel; }
00175
00176
00177
00178 int dnxDispatcherCreate(char * chname, char * url, DnxJobList * joblist,
00179 DnxDispatcher ** pdisp)
00180 {
00181 iDnxDispatcher * idisp;
00182 int ret;
00183
00184 if ((idisp = (iDnxDispatcher *)xmalloc(sizeof *idisp)) == 0)
00185 return DNX_ERR_MEMORY;
00186
00187 memset(idisp, 0, sizeof *idisp);
00188 idisp->chname = xstrdup(chname);
00189 idisp->url = xstrdup(url);
00190 idisp->joblist = joblist;
00191
00192 if (!idisp->url || !idisp->chname)
00193 {
00194 xfree(idisp);
00195 return DNX_ERR_MEMORY;
00196 }
00197 if ((ret = dnxChanMapAdd(chname, url)) != DNX_OK)
00198 {
00199 dnxLog("dnxDispatcherCreate: dnxChanMapAdd(%s) failed: %s.",
00200 chname, dnxErrorString(ret));
00201 goto e1;
00202 }
00203 if ((ret = dnxConnect(chname, 0, &idisp->channel)) != DNX_OK)
00204 {
00205 dnxLog("dnxDispatcherCreate: dnxConnect(%s) failed: %s.",
00206 chname, dnxErrorString(ret));
00207 goto e2;
00208 }
00209
00210
00211 if ((ret = pthread_create(&idisp->tid, 0, dnxDispatcher, idisp)) != 0)
00212 {
00213 dnxLog("dnxDispatcherCreate: thread creation failed: %s.",
00214 dnxErrorString(ret));
00215 ret = DNX_ERR_THREAD;
00216 goto e3;
00217 }
00218
00219 *pdisp = (DnxDispatcher*)idisp;
00220
00221 return DNX_OK;
00222
00223
00224
00225 e3:dnxDisconnect(idisp->channel);
00226 e2:dnxChanMapDelete(idisp->chname);
00227 e1:xfree(idisp->url);
00228 xfree(idisp->chname);
00229 xfree(idisp);
00230
00231 return ret;
00232 }
00233
00234
00235
00236 void dnxDispatcherDestroy(DnxDispatcher * disp)
00237 {
00238 iDnxDispatcher * idisp = (iDnxDispatcher *)disp;
00239
00240 pthread_cancel(idisp->tid);
00241 pthread_join(idisp->tid, 0);
00242
00243 dnxDisconnect(idisp->channel);
00244 dnxChanMapDelete(idisp->chname);
00245
00246 xfree(idisp->url);
00247 xfree(idisp->chname);
00248 xfree(idisp);
00249 }
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272 #ifdef DNX_DISPATCHER_TEST
00273
00274 #include "utesthelp.h"
00275
00276 static int verbose;
00277 static int once = 0;
00278 static char * test_url = "udp://0.0.0.0:12489";
00279 static char * test_chname = "TestCollector";
00280 static DnxChannel * test_channel = (DnxChannel *)17;
00281 static DnxJobList * test_joblist = (DnxJobList *)1;
00282 static DnxNewJob test_job;
00283 static int test_payload;
00284 static DnxNodeRequest test_node;
00285
00286
00287 IMPLEMENT_DNX_DEBUG(verbose);
00288 IMPLEMENT_DNX_SYSLOG(verbose);
00289
00290 int dnxEqualXIDs(DnxXID * pxa, DnxXID * pxb)
00291 {
00292 return pxa->objType == pxb->objType
00293 && pxa->objSerial == pxb->objSerial
00294 && pxa->objSlot == pxb->objSlot;
00295 }
00296
00297 int dnxChanMapAdd(char * name, char * url)
00298 {
00299 CHECK_TRUE(name != 0);
00300 CHECK_TRUE(strcmp(name, test_chname) == 0);
00301 CHECK_TRUE(url != 0);
00302 CHECK_TRUE(strcmp(url, test_url) == 0);
00303 return 0;
00304 }
00305
00306 int dnxConnect(char * name, int active, DnxChannel ** channel)
00307 {
00308 *channel = test_channel;
00309 CHECK_TRUE(name != 0);
00310 CHECK_TRUE(strcmp(name, test_chname) == 0);
00311 CHECK_TRUE(active == 0);
00312 return 0;
00313 }
00314
00315 int dnxJobListDispatch(DnxJobList * pJobList, DnxNewJob * pJob)
00316 {
00317 CHECK_TRUE(pJobList == test_joblist);
00318 CHECK_TRUE(pJob != 0);
00319 memcpy(pJob, &test_job, sizeof *pJob);
00320
00321 once++;
00322
00323 return 0;
00324 }
00325
00326 int dnxSendJob(DnxChannel * channel, DnxJob * pJob, char * address)
00327 {
00328 CHECK_TRUE(channel != 0);
00329 CHECK_TRUE(pJob != 0);
00330
00331 CHECK_TRUE(dnxEqualXIDs(&pJob->xid, &test_job.xid));
00332 CHECK_TRUE(pJob->state == DNX_JOB_PENDING);
00333 CHECK_TRUE(pJob->priority == 1);
00334 CHECK_TRUE(pJob->timeout == test_job.timeout);
00335 CHECK_TRUE(pJob->cmd == test_job.cmd);
00336
00337 return 0;
00338 }
00339
00340 int dnxAuditJob(DnxNewJob * pJob, char * action)
00341 {
00342 CHECK_TRUE(pJob != 0);
00343 CHECK_TRUE(strcmp(action, "DISPATCH") == 0);
00344 return 0;
00345 }
00346
00347 void dnxDisconnect(DnxChannel * channel)
00348 {
00349 CHECK_TRUE(channel == test_channel);
00350 }
00351
00352 void dnxChanMapDelete(char * name)
00353 {
00354 CHECK_TRUE(name != 0);
00355 CHECK_TRUE(strcmp(name, test_chname) == 0);
00356 }
00357
00358 void dnxStatsInc(char * addr, DnxStatsIndex member) {}
00359
00360 int main(int argc, char ** argv)
00361 {
00362 DnxDispatcher * dp;
00363 iDnxDispatcher * idp;
00364
00365 verbose = argc > 1 ? 1 : 0;
00366
00367 memset(&test_node, 0, sizeof test_node);
00368 test_job.state = DNX_JOB_PENDING;
00369 memset(&test_job.xid, 1, sizeof test_job.xid);
00370 test_job.cmd = "test command";
00371 test_job.start_time = 1000;
00372 test_job.timeout = 5;
00373 test_job.expires = 5000;
00374 test_job.payload = &test_payload;
00375 test_job.pNode = &test_node;
00376
00377 CHECK_ZERO(dnxDispatcherCreate(test_chname, test_url, test_joblist, &dp));
00378
00379 idp = (iDnxDispatcher *)dp;
00380
00381 CHECK_TRUE(strcmp(idp->chname, test_chname) == 0);
00382 CHECK_TRUE(idp->joblist == test_joblist);
00383 CHECK_TRUE(idp->tid != 0);
00384 CHECK_TRUE(strcmp(idp->url, test_url) == 0);
00385
00386 CHECK_TRUE(dnxDispatcherGetChannel(dp) == idp->channel);
00387
00388 while (!once)
00389 dnxCancelableSleep(10);
00390
00391 dnxDispatcherDestroy(dp);
00392
00393 #ifdef DEBUG_HEAP
00394 CHECK_ZERO(dnxCheckHeap());
00395 #endif
00396
00397 return 0;
00398 }
00399
00400 #endif
00401
00402
00403