dnxDispatcher.c

Go to the documentation of this file.
00001 /*--------------------------------------------------------------------------
00002 
00003    Copyright (c) 2006-2007, Intellectual Reserve, Inc. All rights reserved.
00004 
00005    This program is free software; you can redistribute it and/or modify
00006    it under the terms of the GNU General Public License version 2 as
00007    published by the Free Software Foundation.
00008 
00009    This program is distributed in the hope that it will be useful,
00010    but WITHOUT ANY WARRANTY; without even the implied warranty of
00011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012    GNU General Public License for more details.
00013 
00014    You should have received a copy of the GNU General Public License
00015    along with this program; if not, write to the Free Software
00016    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 
00018   --------------------------------------------------------------------------*/
00019 
00035 #include "dnxDispatcher.h"
00036 
00037 #include "dnxError.h"
00038 #include "dnxDebug.h"
00039 #include "dnxSrvProt.h"
00040 #include "dnxXml.h"
00041 #include "dnxNebMain.h"
00042 #include "dnxRegistrar.h"
00043 #include "dnxJobList.h"
00044 #include "dnxLogging.h"
00045 #include "dnxStats.h"
00046 
00047 #include <sys/socket.h>
00048 #include <netinet/in.h>
00049 #include <assert.h>
00050 
00052 typedef struct iDnxDispatcher_
00053 {
00054    char * chname;          
00055    char * url;             
00056    DnxJobList * joblist;   
00057    DnxChannel * channel;   
00058    pthread_t tid;          
00059 } iDnxDispatcher;
00060 
00061 /*--------------------------------------------------------------------------
00062                               IMPLEMENTATION
00063   --------------------------------------------------------------------------*/
00064 
00074 static int dnxSendJobMsg(iDnxDispatcher * idisp, DnxNewJob * pSvcReq, DnxNodeRequest * pNode)
00075 {
00076    struct sockaddr * sin = (struct sockaddr *)pNode->address;
00077    pthread_t tid = pthread_self();
00078    DnxJob job;
00079    int ret;
00080 
00081    dnxDebug(2, "dnxDispatcher[%lx]: Dispatching job [%lu,%lu] (%s) to node %u.%u.%u.%u.",
00082          tid, pSvcReq->xid.objSerial, pSvcReq->xid.objSlot, pSvcReq->cmd,
00083          sin->sa_data[2], sin->sa_data[3], sin->sa_data[4], sin->sa_data[5]);
00084 
00085    memset(&job, 0, sizeof job);
00086    job.xid      = pSvcReq->xid;
00087    job.state    = DNX_JOB_PENDING;
00088    job.priority = 1;
00089    job.timeout  = pSvcReq->timeout;
00090    job.cmd      = pSvcReq->cmd;
00091 
00092    if ((ret = dnxSendJob(idisp->channel, &job, pNode->address)) != DNX_OK)
00093       dnxLog("Unable to send job [%lu,%lu] (%s) to worker node %u.%u.%u.%u: %s.",
00094             tid, pSvcReq->xid.objSerial, pSvcReq->xid.objSlot, pSvcReq->cmd,
00095             sin->sa_data[2], sin->sa_data[3], sin->sa_data[4], sin->sa_data[5],
00096             dnxErrorString(ret));
00097 
00098    return ret;
00099 }
00100 
00101 //----------------------------------------------------------------------------
00102 
00110 static int dnxDispatchJob(iDnxDispatcher * idisp, DnxNewJob * pSvcReq)
00111 {
00112    DnxNodeRequest * pNode = pSvcReq->pNode;
00113    int ret;
00114 
00115    ret = dnxSendJobMsg(idisp, pSvcReq, pNode);
00116 
00121    return ret;
00122 }
00123 
00124 //----------------------------------------------------------------------------
00125 
00132 static void * dnxDispatcher(void * data)
00133 {
00134    iDnxDispatcher * idisp = (iDnxDispatcher *)data;
00135 
00136    assert(data);
00137 
00138    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00139    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00140 
00141    dnxLog("Dispatcher awaiting jobs...");
00142 
00143    while (1)
00144    {
00145       DnxNewJob svcReq;
00146       int ret;
00147 
00148       pthread_testcancel();
00149 
00150       // wait for a new entry to be added to the job queue
00151       if ((ret = dnxJobListDispatch(idisp->joblist, &svcReq)) == DNX_OK)
00152       {
00153          if ((ret = dnxDispatchJob(idisp, &svcReq)) == DNX_OK)
00154          {
00155             dnxStatsInc(svcReq.pNode->address, DISPATCHES_OK);
00156             dnxAuditJob(&svcReq, "DISPATCH");
00157          }
00158          else
00159          {
00160             dnxStatsInc(svcReq.pNode->address, DISPATCHES_FAILED);
00161             dnxAuditJob(&svcReq, "DISPATCH-FAIL");
00162          }
00163       }
00164 
00165    }
00166    return 0;
00167 }
00168 
00169 /*--------------------------------------------------------------------------
00170                                  INTERFACE
00171   --------------------------------------------------------------------------*/
00172 
00173 DnxChannel * dnxDispatcherGetChannel(DnxDispatcher * disp)
00174       { return ((iDnxDispatcher *)disp)->channel; }
00175 
00176 //----------------------------------------------------------------------------
00177 
00178 int dnxDispatcherCreate(char * chname, char * url, DnxJobList * joblist,
00179       DnxDispatcher ** pdisp)
00180 {
00181    iDnxDispatcher * idisp;
00182    int ret;
00183 
00184    if ((idisp = (iDnxDispatcher *)xmalloc(sizeof *idisp)) == 0)
00185       return DNX_ERR_MEMORY;
00186 
00187    memset(idisp, 0, sizeof *idisp);
00188    idisp->chname = xstrdup(chname);
00189    idisp->url = xstrdup(url);
00190    idisp->joblist = joblist;
00191 
00192    if (!idisp->url || !idisp->chname)
00193    {
00194       xfree(idisp);
00195       return DNX_ERR_MEMORY;
00196    }
00197    if ((ret = dnxChanMapAdd(chname, url)) != DNX_OK)
00198    {
00199       dnxLog("dnxDispatcherCreate: dnxChanMapAdd(%s) failed: %s.",
00200             chname, dnxErrorString(ret));
00201       goto e1;
00202    }
00203    if ((ret = dnxConnect(chname, 0, &idisp->channel)) != DNX_OK)
00204    {
00205       dnxLog("dnxDispatcherCreate: dnxConnect(%s) failed: %s.",
00206             chname, dnxErrorString(ret));
00207       goto e2;
00208    }
00209 
00210    // create the dispatcher thread
00211    if ((ret = pthread_create(&idisp->tid, 0, dnxDispatcher, idisp)) != 0)
00212    {
00213       dnxLog("dnxDispatcherCreate: thread creation failed: %s.",
00214             dnxErrorString(ret));
00215       ret = DNX_ERR_THREAD;
00216       goto e3;
00217    }
00218 
00219    *pdisp = (DnxDispatcher*)idisp;
00220 
00221    return DNX_OK;
00222 
00223 // error paths
00224 
00225 e3:dnxDisconnect(idisp->channel);
00226 e2:dnxChanMapDelete(idisp->chname);
00227 e1:xfree(idisp->url);
00228    xfree(idisp->chname);
00229    xfree(idisp);
00230 
00231    return ret;
00232 }
00233 
00234 //----------------------------------------------------------------------------
00235 
00236 void dnxDispatcherDestroy(DnxDispatcher * disp)
00237 {
00238    iDnxDispatcher * idisp = (iDnxDispatcher *)disp;
00239 
00240    pthread_cancel(idisp->tid);
00241    pthread_join(idisp->tid, 0);
00242 
00243    dnxDisconnect(idisp->channel);
00244    dnxChanMapDelete(idisp->chname);
00245 
00246    xfree(idisp->url);
00247    xfree(idisp->chname);
00248    xfree(idisp);
00249 }
00250 
00251 /*--------------------------------------------------------------------------
00252                                  TEST MAIN
00253 
00254    From within dnx/server, compile with GNU tools using this command line:
00255 
00256       gcc -DDEBUG -DDNX_DISPATCHER_TEST -DHAVE_NANOSLEEP -g -O0 \
00257          -lpthread -o dnxDispatcherTest -I../nagios/nagios-2.7/include \
00258          -I../common dnxDispatcher.c ../common/dnxError.c \
00259          ../common/dnxSleep.c
00260 
00261    Alternatively, a heap check may be done with the following command line:
00262 
00263       gcc -DDEBUG -DDEBUG_HEAP -DDNX_DISPATCHER_TEST -DHAVE_NANOSLEEP -g -O0 \
00264          -lpthread -o dnxDispatcherTest -I../nagios/nagios-2.7/include \
00265          -I../common dnxDispatcher.c ../common/dnxError.c \
00266          ../common/dnxSleep.c ../common/dnxHeap.c
00267 
00268    Note: Leave out -DHAVE_NANOSLEEP if your system doesn't have nanosleep.
00269 
00270   --------------------------------------------------------------------------*/
00271 
00272 #ifdef DNX_DISPATCHER_TEST
00273 
00274 #include "utesthelp.h"
00275 
00276 static int verbose;
00277 static int once = 0;
00278 static char * test_url = "udp://0.0.0.0:12489";
00279 static char * test_chname = "TestCollector";
00280 static DnxChannel * test_channel = (DnxChannel *)17;
00281 static DnxJobList * test_joblist = (DnxJobList *)1;
00282 static DnxNewJob test_job;
00283 static int test_payload;
00284 static DnxNodeRequest test_node;
00285 
00286 // functional stubs
00287 IMPLEMENT_DNX_DEBUG(verbose);
00288 IMPLEMENT_DNX_SYSLOG(verbose);
00289 
00290 int dnxEqualXIDs(DnxXID * pxa, DnxXID * pxb)
00291 {
00292    return pxa->objType == pxb->objType
00293          && pxa->objSerial == pxb->objSerial
00294          && pxa->objSlot == pxb->objSlot;
00295 }
00296 
00297 int dnxChanMapAdd(char * name, char * url)
00298 {
00299    CHECK_TRUE(name != 0);
00300    CHECK_TRUE(strcmp(name, test_chname) == 0);
00301    CHECK_TRUE(url != 0);
00302    CHECK_TRUE(strcmp(url, test_url) == 0);
00303    return 0;
00304 }
00305 
00306 int dnxConnect(char * name, int active, DnxChannel ** channel)
00307 {
00308    *channel = test_channel;
00309    CHECK_TRUE(name != 0);
00310    CHECK_TRUE(strcmp(name, test_chname) == 0);
00311    CHECK_TRUE(active == 0);
00312    return 0;
00313 }
00314 
00315 int dnxJobListDispatch(DnxJobList * pJobList, DnxNewJob * pJob)
00316 {
00317    CHECK_TRUE(pJobList == test_joblist);
00318    CHECK_TRUE(pJob != 0);
00319    memcpy(pJob, &test_job, sizeof *pJob);
00320 
00321    once++;
00322 
00323    return 0;
00324 }
00325 
00326 int dnxSendJob(DnxChannel * channel, DnxJob * pJob, char * address)
00327 {
00328    CHECK_TRUE(channel != 0);
00329    CHECK_TRUE(pJob != 0);
00330 
00331    CHECK_TRUE(dnxEqualXIDs(&pJob->xid, &test_job.xid));
00332    CHECK_TRUE(pJob->state == DNX_JOB_PENDING);
00333    CHECK_TRUE(pJob->priority == 1);
00334    CHECK_TRUE(pJob->timeout == test_job.timeout);
00335    CHECK_TRUE(pJob->cmd == test_job.cmd);
00336 
00337    return 0;
00338 }
00339 
00340 int dnxAuditJob(DnxNewJob * pJob, char * action)
00341 {
00342    CHECK_TRUE(pJob != 0);
00343    CHECK_TRUE(strcmp(action, "DISPATCH") == 0);
00344    return 0;
00345 }
00346 
00347 void dnxDisconnect(DnxChannel * channel)
00348 {
00349    CHECK_TRUE(channel == test_channel);
00350 }
00351 
00352 void dnxChanMapDelete(char * name)
00353 {
00354    CHECK_TRUE(name != 0);
00355    CHECK_TRUE(strcmp(name, test_chname) == 0);
00356 }
00357 
00358 void dnxStatsInc(char * addr, DnxStatsIndex member) {}
00359 
00360 int main(int argc, char ** argv)
00361 {
00362    DnxDispatcher * dp;
00363    iDnxDispatcher * idp;
00364 
00365    verbose = argc > 1 ? 1 : 0;
00366 
00367    memset(&test_node, 0, sizeof test_node);
00368    test_job.state = DNX_JOB_PENDING;
00369    memset(&test_job.xid, 1, sizeof test_job.xid);
00370    test_job.cmd = "test command";
00371    test_job.start_time = 1000;
00372    test_job.timeout = 5;
00373    test_job.expires = 5000;
00374    test_job.payload = &test_payload;
00375    test_job.pNode = &test_node;
00376 
00377    CHECK_ZERO(dnxDispatcherCreate(test_chname, test_url, test_joblist, &dp));
00378 
00379    idp = (iDnxDispatcher *)dp;
00380 
00381    CHECK_TRUE(strcmp(idp->chname, test_chname) == 0);
00382    CHECK_TRUE(idp->joblist == test_joblist);
00383    CHECK_TRUE(idp->tid != 0);
00384    CHECK_TRUE(strcmp(idp->url, test_url) == 0);
00385 
00386    CHECK_TRUE(dnxDispatcherGetChannel(dp) == idp->channel);
00387 
00388    while (!once)
00389       dnxCancelableSleep(10);
00390 
00391    dnxDispatcherDestroy(dp);
00392 
00393 #ifdef DEBUG_HEAP
00394    CHECK_ZERO(dnxCheckHeap());
00395 #endif
00396 
00397    return 0;
00398 }
00399 
00400 #endif   /* DNX_DISPATCHER_TEST */
00401 
00402 /*--------------------------------------------------------------------------*/
00403 

Generated on Tue Apr 13 15:48:07 2010 for DNX by  doxygen 1.5.6