00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00035 #include "dnxDispatcher.h"
00036
00037 #include "dnxError.h"
00038 #include "dnxDebug.h"
00039 #include "dnxProtocol.h"
00040 #include "dnxXml.h"
00041 #include "dnxServerMain.h"
00042 #include "dnxRegistrar.h"
00043 #include "dnxJobList.h"
00044 #include "dnxLogging.h"
00045 #include "dnxStats.h"
00046
00047 #include <sys/socket.h>
00048 #include <netinet/in.h>
00049 #include <assert.h>
00050
00052 typedef struct iDnxDispatcher_
00053 {
00054 char * chname;
00055 char * url;
00056 DnxJobList * joblist;
00057 DnxChannel * channel;
00058 pthread_t tid;
00059 } iDnxDispatcher;
00060
00061
00062
00063
00064
00074 static int dnxSendJobMsg(iDnxDispatcher * idisp, DnxNewJob * pSvcReq,
00075 DnxNodeRequest * pNode)
00076 {
00077 pthread_t tid = pthread_self();
00078 DnxJob job;
00079 int ret;
00080
00081 dnxDebug(2, "dnxDispatcher[%lx]: Dispatching job [%lu,%lu] (%s) to node %s.",
00082 tid, pSvcReq->xid.objSerial, pSvcReq->xid.objSlot, pSvcReq->cmd,
00083 pNode->addrstr);
00084
00085 memset(&job, 0, sizeof job);
00086 job.xid = pSvcReq->xid;
00087 job.state = DNX_JOB_PENDING;
00088 job.priority = 1;
00089 job.timeout = pSvcReq->timeout;
00090 job.cmd = pSvcReq->cmd;
00091
00092 if ((ret = dnxSendJob(idisp->channel, &job, pNode->address)) != DNX_OK)
00093 dnxLog("Unable to send job [%lu,%lu] (%s) to worker node %s: %s.",
00094 tid, pSvcReq->xid.objSerial, pSvcReq->xid.objSlot, pSvcReq->cmd,
00095 pNode->addrstr, dnxErrorString(ret));
00096
00097 return ret;
00098 }
00099
00100
00101
00109 static int dnxDispatchJob(iDnxDispatcher * idisp, DnxNewJob * pSvcReq)
00110 {
00111 DnxNodeRequest * pNode = pSvcReq->pNode;
00112 int ret;
00113
00114 ret = dnxSendJobMsg(idisp, pSvcReq, pNode);
00115
00120 return ret;
00121 }
00122
00123
00124
00131 static void * dnxDispatcher(void * data)
00132 {
00133 iDnxDispatcher * idisp = (iDnxDispatcher *)data;
00134
00135 assert(data);
00136
00137 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00138 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00139
00140 dnxLog("Dispatcher awaiting jobs...");
00141
00142 while (1)
00143 {
00144 DnxNewJob svcReq;
00145 int ret;
00146
00147 pthread_testcancel();
00148
00149
00150 if ((ret = dnxJobListDispatch(idisp->joblist, &svcReq)) == DNX_OK)
00151 {
00152 if ((ret = dnxDispatchJob(idisp, &svcReq)) == DNX_OK)
00153 {
00154 dnxStatsInc(svcReq.pNode->address, DISPATCHES_OK);
00155 dnxAuditJob(&svcReq, "DISPATCH");
00156 }
00157 else
00158 {
00159 dnxStatsInc(svcReq.pNode->address, DISPATCHES_FAILED);
00160 dnxAuditJob(&svcReq, "DISPATCH-FAIL");
00161 }
00162 }
00163
00164 }
00165 return 0;
00166 }
00167
00168
00169
00170
00171
00172 DnxChannel * dnxDispatcherGetChannel(DnxDispatcher * disp)
00173 { return ((iDnxDispatcher *)disp)->channel; }
00174
00175
00176
00177 int dnxDispatcherCreate(char * chname, char * url, DnxJobList * joblist,
00178 DnxDispatcher ** pdisp)
00179 {
00180 iDnxDispatcher * idisp;
00181 int ret = DNX_ERR_MEMORY;
00182
00183 if ((idisp = (iDnxDispatcher *)xmalloc(sizeof *idisp)) == 0)
00184 return ret;
00185
00186 memset(idisp, 0, sizeof *idisp);
00187 idisp->chname = xstrdup(chname);
00188 idisp->url = xstrdup(url);
00189 idisp->joblist = joblist;
00190
00191 if (!idisp->chname || !idisp->url)
00192 goto e0;
00193
00194 if ((ret = dnxChanMapAdd(chname, url)) != DNX_OK)
00195 {
00196 dnxLog("dnxDispatcherCreate: dnxChanMapAdd(%s) failed: %s.",
00197 chname, dnxErrorString(ret));
00198 goto e1;
00199 }
00200
00201
00202
00203
00204
00205
00206 if ((ret = dnxConnect(chname, DNX_MODE_PASSIVE, &idisp->channel)) != DNX_OK)
00207 {
00208 dnxLog("dnxDispatcherCreate: dnxConnect(%s) failed: %s.",
00209 chname, dnxErrorString(ret));
00210 goto e2;
00211 }
00212
00213
00214 if ((ret = pthread_create(&idisp->tid, 0, dnxDispatcher, idisp)) != 0)
00215 {
00216 dnxLog("dnxDispatcherCreate: thread creation failed: %s.",
00217 dnxErrorString(ret));
00218 ret = DNX_ERR_THREAD;
00219 goto e3;
00220 }
00221
00222 *pdisp = (DnxDispatcher*)idisp;
00223
00224 return DNX_OK;
00225
00226
00227
00228 e3:dnxDisconnect(idisp->channel);
00229 e2:dnxChanMapDelete(idisp->chname);
00230 e1:xfree(idisp->url);
00231 e0:xfree(idisp->chname);
00232 xfree(idisp);
00233
00234 return ret;
00235 }
00236
00237
00238
00239 void dnxDispatcherDestroy(DnxDispatcher * disp)
00240 {
00241 iDnxDispatcher * idisp = (iDnxDispatcher *)disp;
00242
00243 pthread_cancel(idisp->tid);
00244 pthread_join(idisp->tid, 0);
00245
00246 dnxDisconnect(idisp->channel);
00247 dnxChanMapDelete(idisp->chname);
00248
00249 xfree(idisp->url);
00250 xfree(idisp->chname);
00251 xfree(idisp);
00252 }
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275 #ifdef DNX_DISPATCHER_TEST
00276
00277 #include "utesthelp.h"
00278
00279 static int verbose;
00280 static int once = 0;
00281 static char * test_url = "udp://0.0.0.0:12489";
00282 static char * test_chname = "TestCollector";
00283 static DnxChannel * test_channel = (DnxChannel *)17;
00284 static DnxJobList * test_joblist = (DnxJobList *)1;
00285 static DnxNewJob test_job;
00286 static int test_payload;
00287 static DnxNodeRequest test_node;
00288
00289
00290 IMPLEMENT_DNX_DEBUG(verbose);
00291 IMPLEMENT_DNX_SYSLOG(verbose);
00292
00293 int dnxEqualXIDs(DnxXID * pxa, DnxXID * pxb)
00294 {
00295 return pxa->objType == pxb->objType
00296 && pxa->objSerial == pxb->objSerial
00297 && pxa->objSlot == pxb->objSlot;
00298 }
00299
00300 int dnxChanMapAdd(char * name, char * url)
00301 {
00302 CHECK_TRUE(name != 0);
00303 CHECK_TRUE(strcmp(name, test_chname) == 0);
00304 CHECK_TRUE(url != 0);
00305 CHECK_TRUE(strcmp(url, test_url) == 0);
00306 return 0;
00307 }
00308
00309 int dnxConnect(char * name, int active, DnxChannel ** channel)
00310 {
00311 *channel = test_channel;
00312 CHECK_TRUE(name != 0);
00313 CHECK_TRUE(strcmp(name, test_chname) == 0);
00314 CHECK_TRUE(active == 0);
00315 return 0;
00316 }
00317
00318 int dnxJobListDispatch(DnxJobList * pJobList, DnxNewJob * pJob)
00319 {
00320 CHECK_TRUE(pJobList == test_joblist);
00321 CHECK_TRUE(pJob != 0);
00322 memcpy(pJob, &test_job, sizeof *pJob);
00323
00324 once++;
00325
00326 return 0;
00327 }
00328
00329 int dnxSendJob(DnxChannel * channel, DnxJob * pJob, char * address)
00330 {
00331 CHECK_TRUE(channel != 0);
00332 CHECK_TRUE(pJob != 0);
00333
00334 CHECK_TRUE(dnxEqualXIDs(&pJob->xid, &test_job.xid));
00335 CHECK_TRUE(pJob->state == DNX_JOB_PENDING);
00336 CHECK_TRUE(pJob->priority == 1);
00337 CHECK_TRUE(pJob->timeout == test_job.timeout);
00338 CHECK_TRUE(pJob->cmd == test_job.cmd);
00339
00340 return 0;
00341 }
00342
00343 int dnxAuditJob(DnxNewJob * pJob, char * action)
00344 {
00345 CHECK_TRUE(pJob != 0);
00346 CHECK_TRUE(strcmp(action, "DISPATCH") == 0);
00347 return 0;
00348 }
00349
00350 void dnxDisconnect(DnxChannel * channel)
00351 {
00352 CHECK_TRUE(channel == test_channel);
00353 }
00354
00355 void dnxChanMapDelete(char * name)
00356 {
00357 CHECK_TRUE(name != 0);
00358 CHECK_TRUE(strcmp(name, test_chname) == 0);
00359 }
00360
00361 void dnxStatsInc(char * addr, DnxStatsIndex member) {}
00362
00363 int main(int argc, char ** argv)
00364 {
00365 DnxDispatcher * dp;
00366 iDnxDispatcher * idp;
00367
00368 verbose = argc > 1 ? 1 : 0;
00369
00370 memset(&test_node, 0, sizeof test_node);
00371 test_job.state = DNX_JOB_PENDING;
00372 memset(&test_job.xid, 1, sizeof test_job.xid);
00373 test_job.cmd = "test command";
00374 test_job.start_time = 1000;
00375 test_job.timeout = 5;
00376 test_job.expires = 5000;
00377 test_job.payload = &test_payload;
00378 test_job.pNode = &test_node;
00379
00380 CHECK_ZERO(dnxDispatcherCreate(test_chname, test_url, test_joblist, &dp));
00381
00382 idp = (iDnxDispatcher *)dp;
00383
00384 CHECK_TRUE(strcmp(idp->chname, test_chname) == 0);
00385 CHECK_TRUE(idp->joblist == test_joblist);
00386 CHECK_TRUE(idp->tid != 0);
00387 CHECK_TRUE(strcmp(idp->url, test_url) == 0);
00388
00389 CHECK_TRUE(dnxDispatcherGetChannel(dp) == idp->channel);
00390
00391 while (!once)
00392 dnxCancelableSleep(10);
00393
00394 dnxDispatcherDestroy(dp);
00395
00396 #ifdef DEBUG_HEAP
00397 CHECK_ZERO(dnxCheckHeap());
00398 #endif
00399
00400 return 0;
00401 }
00402
00403 #endif
00404
00405
00406