dnxDispatcher.c

Go to the documentation of this file.
00001 /*--------------------------------------------------------------------------
00002 
00003    Copyright (c) 2006-2010, Intellectual Reserve, Inc. All rights reserved.
00004 
00005    This program is free software; you can redistribute it and/or modify
00006    it under the terms of the GNU General Public License version 2 as
00007    published by the Free Software Foundation.
00008 
00009    This program is distributed in the hope that it will be useful,
00010    but WITHOUT ANY WARRANTY; without even the implied warranty of
00011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012    GNU General Public License for more details.
00013 
00014    You should have received a copy of the GNU General Public License
00015    along with this program; if not, write to the Free Software
00016    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 
00018   --------------------------------------------------------------------------*/
00019 
00035 #include "dnxDispatcher.h"
00036 
00037 #include "dnxError.h"
00038 #include "dnxDebug.h"
00039 #include "dnxProtocol.h"
00040 #include "dnxXml.h"
00041 #include "dnxServerMain.h"
00042 #include "dnxRegistrar.h"
00043 #include "dnxJobList.h"
00044 #include "dnxLogging.h"
00045 #include "dnxStats.h"
00046 
00047 #include <sys/socket.h>
00048 #include <netinet/in.h>
00049 #include <assert.h>
00050 
00052 typedef struct iDnxDispatcher_
00053 {
00054    char * chname;          
00055    char * url;             
00056    DnxJobList * joblist;   
00057    DnxChannel * channel;   
00058    pthread_t tid;          
00059 } iDnxDispatcher;
00060 
00061 /*--------------------------------------------------------------------------
00062                               IMPLEMENTATION
00063   --------------------------------------------------------------------------*/
00064 
00074 static int dnxSendJobMsg(iDnxDispatcher * idisp, DnxNewJob * pSvcReq, 
00075       DnxNodeRequest * pNode)
00076 {
00077    pthread_t tid = pthread_self();
00078    DnxJob job;
00079    int ret;
00080 
00081    dnxDebug(2, "dnxDispatcher[%lx]: Dispatching job [%lu,%lu] (%s) to node %s.",
00082          tid, pSvcReq->xid.objSerial, pSvcReq->xid.objSlot, pSvcReq->cmd, 
00083          pNode->addrstr);
00084 
00085    memset(&job, 0, sizeof job);
00086    job.xid      = pSvcReq->xid;
00087    job.state    = DNX_JOB_PENDING;
00088    job.priority = 1;
00089    job.timeout  = pSvcReq->timeout;
00090    job.cmd      = pSvcReq->cmd;
00091 
00092    if ((ret = dnxSendJob(idisp->channel, &job, pNode->address)) != DNX_OK)
00093       dnxLog("Unable to send job [%lu,%lu] (%s) to worker node %s: %s.",
00094             tid, pSvcReq->xid.objSerial, pSvcReq->xid.objSlot, pSvcReq->cmd, 
00095             pNode->addrstr, dnxErrorString(ret));
00096 
00097    return ret;
00098 }
00099 
00100 //----------------------------------------------------------------------------
00101 
00109 static int dnxDispatchJob(iDnxDispatcher * idisp, DnxNewJob * pSvcReq)
00110 {
00111    DnxNodeRequest * pNode = pSvcReq->pNode;
00112    int ret;
00113 
00114    ret = dnxSendJobMsg(idisp, pSvcReq, pNode);
00115 
00120    return ret;
00121 }
00122 
00123 //----------------------------------------------------------------------------
00124 
00131 static void * dnxDispatcher(void * data)
00132 {
00133    iDnxDispatcher * idisp = (iDnxDispatcher *)data;
00134 
00135    assert(data);
00136 
00137    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00138    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00139 
00140    dnxLog("Dispatcher awaiting jobs...");
00141 
00142    while (1)
00143    {
00144       DnxNewJob svcReq;
00145       int ret;
00146 
00147       pthread_testcancel();
00148 
00149       // wait for a new entry to be added to the job queue
00150       if ((ret = dnxJobListDispatch(idisp->joblist, &svcReq)) == DNX_OK)
00151       {
00152          if ((ret = dnxDispatchJob(idisp, &svcReq)) == DNX_OK)
00153          {
00154             dnxStatsInc(svcReq.pNode->address, DISPATCHES_OK);
00155             dnxAuditJob(&svcReq, "DISPATCH");
00156          }
00157          else
00158          {
00159             dnxStatsInc(svcReq.pNode->address, DISPATCHES_FAILED);
00160             dnxAuditJob(&svcReq, "DISPATCH-FAIL");
00161          } 
00162       }
00163 
00164    }
00165    return 0;
00166 }
00167 
00168 /*--------------------------------------------------------------------------
00169                                  INTERFACE
00170   --------------------------------------------------------------------------*/
00171 
00172 DnxChannel * dnxDispatcherGetChannel(DnxDispatcher * disp)
00173       { return ((iDnxDispatcher *)disp)->channel; }
00174       
00175 //----------------------------------------------------------------------------
00176 
00177 int dnxDispatcherCreate(char * chname, char * url, DnxJobList * joblist,
00178       DnxDispatcher ** pdisp)
00179 {
00180    iDnxDispatcher * idisp;
00181    int ret = DNX_ERR_MEMORY;
00182 
00183    if ((idisp = (iDnxDispatcher *)xmalloc(sizeof *idisp)) == 0)
00184       return ret;
00185 
00186    memset(idisp, 0, sizeof *idisp);
00187    idisp->chname = xstrdup(chname);
00188    idisp->url = xstrdup(url);
00189    idisp->joblist = joblist;
00190 
00191    if (!idisp->chname || !idisp->url)
00192       goto e0;
00193 
00194    if ((ret = dnxChanMapAdd(chname, url)) != DNX_OK)
00195    {
00196       dnxLog("dnxDispatcherCreate: dnxChanMapAdd(%s) failed: %s.",
00197             chname, dnxErrorString(ret));
00198       goto e1;
00199    }
00200 
00201    // the dispatcher is created as a listen channel because it's shared
00202    // by the registrar. The registrar listens on this channel for inbound
00203    // work requests from client nodes. The Dispatcher sends on this channel
00204    // to the addresses saved with these requests.
00205 
00206    if ((ret = dnxConnect(chname, DNX_MODE_PASSIVE, &idisp->channel)) != DNX_OK)
00207    {
00208       dnxLog("dnxDispatcherCreate: dnxConnect(%s) failed: %s.",
00209             chname, dnxErrorString(ret));
00210       goto e2;
00211    }
00212 
00213    // create the dispatcher thread
00214    if ((ret = pthread_create(&idisp->tid, 0, dnxDispatcher, idisp)) != 0)
00215    {
00216       dnxLog("dnxDispatcherCreate: thread creation failed: %s.",
00217             dnxErrorString(ret));
00218       ret = DNX_ERR_THREAD;
00219       goto e3;
00220    }
00221 
00222    *pdisp = (DnxDispatcher*)idisp;
00223 
00224    return DNX_OK;
00225 
00226 // error paths
00227 
00228 e3:dnxDisconnect(idisp->channel);
00229 e2:dnxChanMapDelete(idisp->chname);
00230 e1:xfree(idisp->url);
00231 e0:xfree(idisp->chname);
00232    xfree(idisp);
00233 
00234    return ret;
00235 }
00236 
00237 //----------------------------------------------------------------------------
00238 
00239 void dnxDispatcherDestroy(DnxDispatcher * disp)
00240 {
00241    iDnxDispatcher * idisp = (iDnxDispatcher *)disp;
00242 
00243    pthread_cancel(idisp->tid);
00244    pthread_join(idisp->tid, 0);
00245 
00246    dnxDisconnect(idisp->channel);
00247    dnxChanMapDelete(idisp->chname);
00248 
00249    xfree(idisp->url);
00250    xfree(idisp->chname);
00251    xfree(idisp);
00252 }
00253 
00254 /*--------------------------------------------------------------------------
00255                                  TEST MAIN
00256 
00257    From within dnx/server, compile with GNU tools using this command line:
00258 
00259       gcc -DDEBUG -DDNX_DISPATCHER_TEST -DHAVE_NANOSLEEP -g -O0 \
00260          -lpthread -o dnxDispatcherTest -I../nagios/nagios-2.7/include \
00261          -I../common dnxDispatcher.c ../common/dnxError.c \
00262          ../common/dnxSleep.c
00263 
00264    Alternatively, a heap check may be done with the following command line:
00265 
00266       gcc -DDEBUG -DDEBUG_HEAP -DDNX_DISPATCHER_TEST -DHAVE_NANOSLEEP -g -O0 \
00267          -lpthread -o dnxDispatcherTest -I../nagios/nagios-2.7/include \
00268          -I../common dnxDispatcher.c ../common/dnxError.c \
00269          ../common/dnxSleep.c ../common/dnxHeap.c
00270 
00271    Note: Leave out -DHAVE_NANOSLEEP if your system doesn't have nanosleep.
00272 
00273   --------------------------------------------------------------------------*/
00274 
00275 #ifdef DNX_DISPATCHER_TEST
00276 
00277 #include "utesthelp.h"
00278 
00279 static int verbose;
00280 static int once = 0;
00281 static char * test_url = "udp://0.0.0.0:12489";
00282 static char * test_chname = "TestCollector";
00283 static DnxChannel * test_channel = (DnxChannel *)17;
00284 static DnxJobList * test_joblist = (DnxJobList *)1;
00285 static DnxNewJob test_job;
00286 static int test_payload;
00287 static DnxNodeRequest test_node;
00288 
00289 // functional stubs
00290 IMPLEMENT_DNX_DEBUG(verbose);
00291 IMPLEMENT_DNX_SYSLOG(verbose);
00292 
00293 int dnxEqualXIDs(DnxXID * pxa, DnxXID * pxb)
00294 {
00295    return pxa->objType == pxb->objType
00296          && pxa->objSerial == pxb->objSerial
00297          && pxa->objSlot == pxb->objSlot;
00298 }
00299 
00300 int dnxChanMapAdd(char * name, char * url)
00301 {
00302    CHECK_TRUE(name != 0);
00303    CHECK_TRUE(strcmp(name, test_chname) == 0);
00304    CHECK_TRUE(url != 0);
00305    CHECK_TRUE(strcmp(url, test_url) == 0);
00306    return 0;
00307 }
00308 
00309 int dnxConnect(char * name, int active, DnxChannel ** channel)
00310 {
00311    *channel = test_channel;
00312    CHECK_TRUE(name != 0);
00313    CHECK_TRUE(strcmp(name, test_chname) == 0);
00314    CHECK_TRUE(active == 0);
00315    return 0;
00316 }
00317 
00318 int dnxJobListDispatch(DnxJobList * pJobList, DnxNewJob * pJob)
00319 {
00320    CHECK_TRUE(pJobList == test_joblist);
00321    CHECK_TRUE(pJob != 0);
00322    memcpy(pJob, &test_job, sizeof *pJob);
00323 
00324    once++;
00325 
00326    return 0;
00327 }
00328 
00329 int dnxSendJob(DnxChannel * channel, DnxJob * pJob, char * address)
00330 {
00331    CHECK_TRUE(channel != 0);
00332    CHECK_TRUE(pJob != 0);
00333 
00334    CHECK_TRUE(dnxEqualXIDs(&pJob->xid, &test_job.xid));
00335    CHECK_TRUE(pJob->state == DNX_JOB_PENDING);
00336    CHECK_TRUE(pJob->priority == 1);
00337    CHECK_TRUE(pJob->timeout == test_job.timeout);
00338    CHECK_TRUE(pJob->cmd == test_job.cmd);
00339 
00340    return 0;
00341 }
00342 
00343 int dnxAuditJob(DnxNewJob * pJob, char * action)
00344 {
00345    CHECK_TRUE(pJob != 0);
00346    CHECK_TRUE(strcmp(action, "DISPATCH") == 0);
00347    return 0;
00348 }
00349 
00350 void dnxDisconnect(DnxChannel * channel)
00351 {
00352    CHECK_TRUE(channel == test_channel);
00353 }
00354 
00355 void dnxChanMapDelete(char * name)
00356 {
00357    CHECK_TRUE(name != 0);
00358    CHECK_TRUE(strcmp(name, test_chname) == 0);
00359 }
00360 
00361 void dnxStatsInc(char * addr, DnxStatsIndex member) {}
00362 
00363 int main(int argc, char ** argv)
00364 {
00365    DnxDispatcher * dp;
00366    iDnxDispatcher * idp;
00367 
00368    verbose = argc > 1 ? 1 : 0;
00369 
00370    memset(&test_node, 0, sizeof test_node);
00371    test_job.state = DNX_JOB_PENDING;
00372    memset(&test_job.xid, 1, sizeof test_job.xid);
00373    test_job.cmd = "test command";
00374    test_job.start_time = 1000;
00375    test_job.timeout = 5;
00376    test_job.expires = 5000;
00377    test_job.payload = &test_payload;
00378    test_job.pNode = &test_node;
00379 
00380    CHECK_ZERO(dnxDispatcherCreate(test_chname, test_url, test_joblist, &dp));
00381 
00382    idp = (iDnxDispatcher *)dp;
00383 
00384    CHECK_TRUE(strcmp(idp->chname, test_chname) == 0);
00385    CHECK_TRUE(idp->joblist == test_joblist);
00386    CHECK_TRUE(idp->tid != 0);
00387    CHECK_TRUE(strcmp(idp->url, test_url) == 0);
00388 
00389    CHECK_TRUE(dnxDispatcherGetChannel(dp) == idp->channel);
00390 
00391    while (!once)
00392       dnxCancelableSleep(10);
00393 
00394    dnxDispatcherDestroy(dp);
00395 
00396 #ifdef DEBUG_HEAP
00397    CHECK_ZERO(dnxCheckHeap());
00398 #endif
00399 
00400    return 0;
00401 }
00402 
00403 #endif   /* DNX_DISPATCHER_TEST */
00404 
00405 /*--------------------------------------------------------------------------*/
00406 

Generated on Tue Apr 13 15:48:07 2010 for DNX by doxygen 1.5.6