dnxCollector.c

Go to the documentation of this file.
00001 /*--------------------------------------------------------------------------
00002 
00003    Copyright (c) 2006-2010, Intellectual Reserve, Inc. All rights reserved.
00004 
00005    This program is free software; you can redistribute it and/or modify
00006    it under the terms of the GNU General Public License version 2 as
00007    published by the Free Software Foundation.
00008 
00009    This program is distributed in the hope that it will be useful,
00010    but WITHOUT ANY WARRANTY; without even the implied warranty of
00011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012    GNU General Public License for more details.
00013 
00014    You should have received a copy of the GNU General Public License
00015    along with this program; if not, write to the Free Software
00016    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 
00018   --------------------------------------------------------------------------*/
00019 
00034 #include "dnxCollector.h"
00035 
00036 #include "dnxServerMain.h"
00037 #include "dnxDebug.h"
00038 #include "dnxError.h"
00039 #include "dnxQueue.h"
00040 #include "dnxProtocol.h"
00041 #include "dnxJobList.h"
00042 #include "dnxLogging.h"
00043 
00044 #include <stdlib.h>
00045 #include <assert.h>
00046 
/** Timeout value handed to dnxWaitForResult on every pass of the collector
 *  loop. The unit is whatever dnxWaitForResult expects (presumably seconds -
 *  TODO confirm against dnxProtocol).
 */
#define DNX_COLLECTOR_TIMEOUT 30
00048 
00050 typedef struct iDnxCollector_
00051 {
00052    char * chname;          
00053    char * url;             
00054    DnxJobList * joblist;   
00055    DnxChannel * channel;   
00056    pthread_t tid;          
00057 } iDnxCollector;
00058 
00059 /*--------------------------------------------------------------------------
00060                               IMPLEMENTATION
00061   --------------------------------------------------------------------------*/
00062 
00071 static void * dnxCollector(void * data)
00072 {
00073    iDnxCollector * icoll = (iDnxCollector *)data;
00074    pthread_t tid = pthread_self();
00075    DnxResult sResult;
00076    DnxNewJob Job;
00077    int ret;
00078 
00079    assert(data);
00080 
00081    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00082    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00083 
00084    dnxLog("dnxCollector[%lx]: Awaiting service check results.", tid);
00085 
00086    while (1)
00087    {
00088       pthread_testcancel();
00089 
00090       if ((ret = dnxWaitForResult(icoll->channel,
00091             &sResult, sResult.address, DNX_COLLECTOR_TIMEOUT)) == DNX_OK)
00092       {
00093          dnxDebug(2, "dnxCollector[%lx]: Received result for job [%lu,%lu]: %s.",
00094                tid, sResult.xid.objSerial, sResult.xid.objSlot, sResult.resData);
00095 
00096          // dequeue the matching service request from the pending job queue
00097          if ((ret = dnxJobListCollect(icoll->joblist, &sResult.xid, &Job)) == DNX_OK)
00098          {
00099             ret = dnxPostResult(Job.payload, Job.xid.objSerial, Job.start_time, 
00100                   sResult.delta, 0, sResult.resCode, sResult.resData);
00101 
00103             xfree(sResult.resData);
00104 
00105             dnxDebug(2, "dnxCollector[%lx]: Post result for job [%lu,%lu]: %s.",
00106                   tid, sResult.xid.objSerial, sResult.xid.objSlot,
00107                   dnxErrorString(ret));
00108 
00109             dnxAuditJob(&Job, "COLLECT");
00110 
00111             dnxJobCleanup(&Job);
00112          }
00113          else
00114             dnxLog("dnxCollector[%lx]: Dequeue job failed: %s.",
00115                   tid, dnxErrorString(ret));
00116       }
00117       else if (ret != DNX_ERR_TIMEOUT)
00118          dnxLog("dnxCollector[%lx]: Receive failed: %s.",
00119                tid, dnxErrorString(ret));
00120    }
00121    return 0;
00122 }
00123 
00124 /*--------------------------------------------------------------------------
00125                                  INTERFACE
00126   --------------------------------------------------------------------------*/
00127 
00128 DnxChannel * dnxCollectorGetChannel(DnxCollector * coll)
00129       { return ((iDnxCollector *)coll)->channel; }
00130 
00131 //----------------------------------------------------------------------------
00132 
00133 int dnxCollectorCreate(char * chname, char * collurl, DnxJobList * joblist, DnxCollector ** pcoll)
00134 {
00135    iDnxCollector * icoll;
00136    int ret;
00137 
00138    if ((icoll = (iDnxCollector *)xmalloc(sizeof *icoll)) == 0)
00139       return DNX_ERR_MEMORY;
00140 
00141    memset(icoll, 0, sizeof *icoll);
00142    icoll->chname = xstrdup(chname);
00143    icoll->url = xstrdup(collurl);
00144    icoll->joblist = joblist;
00145 
00146    if (!icoll->url || !icoll->chname)
00147    {
00148       xfree(icoll);
00149       return DNX_ERR_MEMORY;
00150    }
00151    if ((ret = dnxChanMapAdd(chname, collurl)) != DNX_OK)
00152    {
00153       dnxLog("dnxCollectorCreate: dnxChanMapAdd(%s) failed: %s.",
00154             chname, dnxErrorString(ret));
00155       goto e1;
00156    }
00157    if ((ret = dnxConnect(chname, DNX_MODE_PASSIVE, &icoll->channel)) != DNX_OK)
00158    {
00159       dnxLog("dnxCollectorCreate: dnxConnect(%s) failed: %s.",
00160             chname, dnxErrorString(ret));
00161       goto e2;
00162    }
00163 
00164    // create the collector thread
00165    if ((ret = pthread_create(&icoll->tid, 0, dnxCollector, icoll)) != 0)
00166    {
00167       dnxLog("dnxCollectorCreate: thread creation failed: %s.",
00168             dnxErrorString(ret));
00169       ret = DNX_ERR_THREAD;
00170       goto e3;
00171    }
00172 
00173    *pcoll = (DnxCollector *)icoll;
00174 
00175    return DNX_OK;
00176 
00177 // error paths
00178 
00179 e3:dnxDisconnect(icoll->channel);
00180 e2:dnxChanMapDelete(icoll->chname);
00181 e1:xfree(icoll->url);
00182    xfree(icoll->chname);
00183    xfree(icoll);
00184 
00185    return ret;
00186 }
00187 
00188 //----------------------------------------------------------------------------
00189 
00190 void dnxCollectorDestroy(DnxCollector  * coll)
00191 {
00192    iDnxCollector * icoll = (iDnxCollector *)coll;
00193 
00194    pthread_cancel(icoll->tid);
00195    pthread_join(icoll->tid, 0);
00196 
00197    dnxDisconnect(icoll->channel);
00198    dnxChanMapDelete(icoll->chname);
00199 
00200    xfree(icoll->url);
00201    xfree(icoll->chname);
00202    xfree(icoll);
00203 }
00204 
00205 /*--------------------------------------------------------------------------
00206                                  TEST MAIN
00207 
00208    From within dnx/server, compile with GNU tools using this command line:
00209 
00210       gcc -DDEBUG -DDNX_COLLECTOR_TEST -DHAVE_NANOSLEEP -g -O0 \
00211          -lpthread -o dnxCollectorTest -I../nagios/nagios-2.7/include \
00212          -I../common dnxCollector.c ../common/dnxError.c \
00213          ../common/dnxSleep.c
00214 
00215    Alternatively, a heap check may be done with the following command line:
00216 
00217       gcc -DDEBUG -DDEBUG_HEAP -DDNX_COLLECTOR_TEST -DHAVE_NANOSLEEP -g -O0 \
00218          -lpthread -o dnxCollectorTest -I../nagios/nagios-2.7/include \
00219          -I../common dnxCollector.c ../common/dnxError.c \
00220          ../common/dnxSleep.c ../common/dnxHeap.c
00221 
00222    Note: Leave out -DHAVE_NANOSLEEP if your system doesn't have nanosleep.
00223 
00224   --------------------------------------------------------------------------*/
00225 
00226 #ifdef DNX_COLLECTOR_TEST
00227 
00228 #include "utesthelp.h"
00229 
00230 static int verbose;
00231 static int once = 0;
00232 static char * test_url = "udp://0.0.0.0:12489";
00233 static char * test_chname = "TestCollector";
00234 static char * test_cmd = "test command";
00235 static DnxChannel * test_channel = (DnxChannel *)17;
00236 static DnxJobList * test_joblist = (DnxJobList *)1;
00237 static DnxResult test_result;
00238 static DnxNodeRequest test_node;
00239 static int test_payload;
00240 
00241 // test stubs
00242 IMPLEMENT_DNX_DEBUG(verbose);
00243 IMPLEMENT_DNX_SYSLOG(verbose);
00244 
00245 int dnxChanMapAdd(char * name, char * url)
00246 {
00247    CHECK_TRUE(name != 0);
00248    CHECK_TRUE(strcmp(name, test_chname) == 0);
00249    CHECK_TRUE(url != 0);
00250    CHECK_TRUE(strcmp(url, test_url) == 0);
00251    return 0;
00252 }
00253 
00254 int dnxConnect(char * name, int active, DnxChannel ** channel)
00255 {
00256    *channel = test_channel;
00257    CHECK_TRUE(name != 0);
00258    CHECK_TRUE(strcmp(name, test_chname) == 0);
00259    CHECK_TRUE(active == 0);
00260    return 0;
00261 }
00262 
00263 void dnxDisconnect(DnxChannel * channel)
00264 {
00265    CHECK_TRUE(channel == test_channel);
00266 }
00267 
00268 void dnxChanMapDelete(char * name)
00269 {
00270    CHECK_TRUE(name != 0);
00271    CHECK_TRUE(strcmp(name, test_chname) == 0);
00272 }
00273 
00274 int dnxWaitForResult(DnxChannel * channel, DnxResult * pResult, char * address, int timeout)
00275 {
00276    CHECK_TRUE(pResult != 0);
00277 
00278    memset(pResult, 1, sizeof *pResult);
00279    pResult->resData = 0;
00280 
00281    CHECK_TRUE(timeout == DNX_COLLECTOR_TIMEOUT);
00282 
00283    once++;     // stop the test after first pass
00284 
00285    return 0;
00286 }
00287 
00288 int dnxJobListCollect(DnxJobList * pJobList, DnxXID * pxid, DnxNewJob * pJob)
00289 {
00290    CHECK_TRUE(pJob != 0);
00291    CHECK_TRUE(pxid != 0);
00292    CHECK_TRUE(pJobList == test_joblist);
00293 
00294    memset(pJob, 0, sizeof *pJob);
00295    memcpy(&pJob->xid, pxid, sizeof pJob->xid);
00296    pJob->state = DNX_JOB_COMPLETE;
00297    pJob->cmd = test_cmd;
00298    pJob->payload = &test_payload;
00299    pJob->pNode = &test_node;
00300 
00301    return 0;
00302 }
00303 
00304 int dnxPostResult(void * payload, unsigned long serial, time_t start_time, 
00305       unsigned delta, int early_timeout, int res_code, char * res_data)
00306 {
00307    CHECK_TRUE(payload == &test_payload);
00308    return 0;
00309 }
00310 
00311 int dnxAuditJob(DnxNewJob * pJob, char * action)
00312 {
00313    CHECK_TRUE(pJob != 0);
00314    CHECK_TRUE(action != 0);
00315    return 0;
00316 }
00317 
00318 void dnxJobCleanup(DnxNewJob * pJob) { CHECK_TRUE(pJob != 0); }
00319 
00320 int main(int argc, char ** argv)
00321 {
00322    DnxCollector * cp;
00323    iDnxCollector * icp;
00324 
00325    verbose = argc > 1 ? 1 : 0;
00326 
00327    memset(&test_result, 0, sizeof test_result);
00328 
00329    test_result.xid.objSerial = 1;
00330    test_result.xid.objSlot = 1;
00331    test_result.xid.objType = DNX_OBJ_COLLECTOR;
00332    test_result.state = DNX_JOB_INPROGRESS;
00333    test_result.delta = 1;
00334    test_result.resCode = 1;
00335    
00336    CHECK_ZERO(dnxCollectorCreate(test_chname, test_url, test_joblist, &cp));
00337 
00338    icp = (iDnxCollector *)cp;
00339 
00340    CHECK_TRUE(icp->channel == test_channel);
00341    CHECK_TRUE(strcmp(icp->chname, test_chname) == 0);
00342    CHECK_TRUE(icp->joblist == test_joblist);
00343    CHECK_TRUE(icp->tid != 0);
00344    CHECK_TRUE(strcmp(icp->url, test_url) == 0);
00345 
00346    CHECK_TRUE(dnxCollectorGetChannel(cp) == icp->channel);
00347 
00348    while (!once)
00349       dnxCancelableSleep(10);
00350 
00351    dnxCollectorDestroy(cp);
00352 
00353 #ifdef DEBUG_HEAP
00354    CHECK_ZERO(dnxCheckHeap());
00355 #endif
00356 
00357    return 0;
00358 }
00359 
00360 #endif   /* DNX_COLLECTOR_TEST */
00361 
00362 /*--------------------------------------------------------------------------*/
00363 

Generated on Tue Apr 13 15:48:07 2010 for DNX by  doxygen 1.5.6