00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00034 #include "dnxCollector.h"
00035
00036 #include "dnxServerMain.h"
00037 #include "dnxDebug.h"
00038 #include "dnxError.h"
00039 #include "dnxQueue.h"
00040 #include "dnxProtocol.h"
00041 #include "dnxJobList.h"
00042 #include "dnxLogging.h"
00043
00044 #include <stdlib.h>
00045 #include <assert.h>
00046
00047 #define DNX_COLLECTOR_TIMEOUT 30
00048
00050 typedef struct iDnxCollector_
00051 {
00052 char * chname;
00053 char * url;
00054 DnxJobList * joblist;
00055 DnxChannel * channel;
00056 pthread_t tid;
00057 } iDnxCollector;
00058
00059
00060
00061
00062
00071 static void * dnxCollector(void * data)
00072 {
00073 iDnxCollector * icoll = (iDnxCollector *)data;
00074 pthread_t tid = pthread_self();
00075 DnxResult sResult;
00076 DnxNewJob Job;
00077 int ret;
00078
00079 assert(data);
00080
00081 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00082 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00083
00084 dnxLog("dnxCollector[%lx]: Awaiting service check results.", tid);
00085
00086 while (1)
00087 {
00088 pthread_testcancel();
00089
00090 if ((ret = dnxWaitForResult(icoll->channel,
00091 &sResult, sResult.address, DNX_COLLECTOR_TIMEOUT)) == DNX_OK)
00092 {
00093 dnxDebug(2, "dnxCollector[%lx]: Received result for job [%lu,%lu]: %s.",
00094 tid, sResult.xid.objSerial, sResult.xid.objSlot, sResult.resData);
00095
00096
00097 if ((ret = dnxJobListCollect(icoll->joblist, &sResult.xid, &Job)) == DNX_OK)
00098 {
00099 ret = dnxPostResult(Job.payload, Job.xid.objSerial, Job.start_time,
00100 sResult.delta, 0, sResult.resCode, sResult.resData);
00101
00103 xfree(sResult.resData);
00104
00105 dnxDebug(2, "dnxCollector[%lx]: Post result for job [%lu,%lu]: %s.",
00106 tid, sResult.xid.objSerial, sResult.xid.objSlot,
00107 dnxErrorString(ret));
00108
00109 dnxAuditJob(&Job, "COLLECT");
00110
00111 dnxJobCleanup(&Job);
00112 }
00113 else
00114 dnxLog("dnxCollector[%lx]: Dequeue job failed: %s.",
00115 tid, dnxErrorString(ret));
00116 }
00117 else if (ret != DNX_ERR_TIMEOUT)
00118 dnxLog("dnxCollector[%lx]: Receive failed: %s.",
00119 tid, dnxErrorString(ret));
00120 }
00121 return 0;
00122 }
00123
00124
00125
00126
00127
00128 DnxChannel * dnxCollectorGetChannel(DnxCollector * coll)
00129 { return ((iDnxCollector *)coll)->channel; }
00130
00131
00132
00133 int dnxCollectorCreate(char * chname, char * collurl, DnxJobList * joblist, DnxCollector ** pcoll)
00134 {
00135 iDnxCollector * icoll;
00136 int ret;
00137
00138 if ((icoll = (iDnxCollector *)xmalloc(sizeof *icoll)) == 0)
00139 return DNX_ERR_MEMORY;
00140
00141 memset(icoll, 0, sizeof *icoll);
00142 icoll->chname = xstrdup(chname);
00143 icoll->url = xstrdup(collurl);
00144 icoll->joblist = joblist;
00145
00146 if (!icoll->url || !icoll->chname)
00147 {
00148 xfree(icoll);
00149 return DNX_ERR_MEMORY;
00150 }
00151 if ((ret = dnxChanMapAdd(chname, collurl)) != DNX_OK)
00152 {
00153 dnxLog("dnxCollectorCreate: dnxChanMapAdd(%s) failed: %s.",
00154 chname, dnxErrorString(ret));
00155 goto e1;
00156 }
00157 if ((ret = dnxConnect(chname, DNX_MODE_PASSIVE, &icoll->channel)) != DNX_OK)
00158 {
00159 dnxLog("dnxCollectorCreate: dnxConnect(%s) failed: %s.",
00160 chname, dnxErrorString(ret));
00161 goto e2;
00162 }
00163
00164
00165 if ((ret = pthread_create(&icoll->tid, 0, dnxCollector, icoll)) != 0)
00166 {
00167 dnxLog("dnxCollectorCreate: thread creation failed: %s.",
00168 dnxErrorString(ret));
00169 ret = DNX_ERR_THREAD;
00170 goto e3;
00171 }
00172
00173 *pcoll = (DnxCollector *)icoll;
00174
00175 return DNX_OK;
00176
00177
00178
00179 e3:dnxDisconnect(icoll->channel);
00180 e2:dnxChanMapDelete(icoll->chname);
00181 e1:xfree(icoll->url);
00182 xfree(icoll->chname);
00183 xfree(icoll);
00184
00185 return ret;
00186 }
00187
00188
00189
00190 void dnxCollectorDestroy(DnxCollector * coll)
00191 {
00192 iDnxCollector * icoll = (iDnxCollector *)coll;
00193
00194 pthread_cancel(icoll->tid);
00195 pthread_join(icoll->tid, 0);
00196
00197 dnxDisconnect(icoll->channel);
00198 dnxChanMapDelete(icoll->chname);
00199
00200 xfree(icoll->url);
00201 xfree(icoll->chname);
00202 xfree(icoll);
00203 }
00204
00205
00206
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226 #ifdef DNX_COLLECTOR_TEST
00227
00228 #include "utesthelp.h"
00229
00230 static int verbose;
00231 static int once = 0;
00232 static char * test_url = "udp://0.0.0.0:12489";
00233 static char * test_chname = "TestCollector";
00234 static char * test_cmd = "test command";
00235 static DnxChannel * test_channel = (DnxChannel *)17;
00236 static DnxJobList * test_joblist = (DnxJobList *)1;
00237 static DnxResult test_result;
00238 static DnxNodeRequest test_node;
00239 static int test_payload;
00240
00241
00242 IMPLEMENT_DNX_DEBUG(verbose);
00243 IMPLEMENT_DNX_SYSLOG(verbose);
00244
00245 int dnxChanMapAdd(char * name, char * url)
00246 {
00247 CHECK_TRUE(name != 0);
00248 CHECK_TRUE(strcmp(name, test_chname) == 0);
00249 CHECK_TRUE(url != 0);
00250 CHECK_TRUE(strcmp(url, test_url) == 0);
00251 return 0;
00252 }
00253
00254 int dnxConnect(char * name, int active, DnxChannel ** channel)
00255 {
00256 *channel = test_channel;
00257 CHECK_TRUE(name != 0);
00258 CHECK_TRUE(strcmp(name, test_chname) == 0);
00259 CHECK_TRUE(active == 0);
00260 return 0;
00261 }
00262
00263 void dnxDisconnect(DnxChannel * channel)
00264 {
00265 CHECK_TRUE(channel == test_channel);
00266 }
00267
00268 void dnxChanMapDelete(char * name)
00269 {
00270 CHECK_TRUE(name != 0);
00271 CHECK_TRUE(strcmp(name, test_chname) == 0);
00272 }
00273
00274 int dnxWaitForResult(DnxChannel * channel, DnxResult * pResult, char * address, int timeout)
00275 {
00276 CHECK_TRUE(pResult != 0);
00277
00278 memset(pResult, 1, sizeof *pResult);
00279 pResult->resData = 0;
00280
00281 CHECK_TRUE(timeout == DNX_COLLECTOR_TIMEOUT);
00282
00283 once++;
00284
00285 return 0;
00286 }
00287
00288 int dnxJobListCollect(DnxJobList * pJobList, DnxXID * pxid, DnxNewJob * pJob)
00289 {
00290 CHECK_TRUE(pJob != 0);
00291 CHECK_TRUE(pxid != 0);
00292 CHECK_TRUE(pJobList == test_joblist);
00293
00294 memset(pJob, 0, sizeof *pJob);
00295 memcpy(&pJob->xid, pxid, sizeof pJob->xid);
00296 pJob->state = DNX_JOB_COMPLETE;
00297 pJob->cmd = test_cmd;
00298 pJob->payload = &test_payload;
00299 pJob->pNode = &test_node;
00300
00301 return 0;
00302 }
00303
00304 int dnxPostResult(void * payload, unsigned long serial, time_t start_time,
00305 unsigned delta, int early_timeout, int res_code, char * res_data)
00306 {
00307 CHECK_TRUE(payload == &test_payload);
00308 return 0;
00309 }
00310
00311 int dnxAuditJob(DnxNewJob * pJob, char * action)
00312 {
00313 CHECK_TRUE(pJob != 0);
00314 CHECK_TRUE(action != 0);
00315 return 0;
00316 }
00317
00318 void dnxJobCleanup(DnxNewJob * pJob) { CHECK_TRUE(pJob != 0); }
00319
00320 int main(int argc, char ** argv)
00321 {
00322 DnxCollector * cp;
00323 iDnxCollector * icp;
00324
00325 verbose = argc > 1 ? 1 : 0;
00326
00327 memset(&test_result, 0, sizeof test_result);
00328
00329 test_result.xid.objSerial = 1;
00330 test_result.xid.objSlot = 1;
00331 test_result.xid.objType = DNX_OBJ_COLLECTOR;
00332 test_result.state = DNX_JOB_INPROGRESS;
00333 test_result.delta = 1;
00334 test_result.resCode = 1;
00335
00336 CHECK_ZERO(dnxCollectorCreate(test_chname, test_url, test_joblist, &cp));
00337
00338 icp = (iDnxCollector *)cp;
00339
00340 CHECK_TRUE(icp->channel == test_channel);
00341 CHECK_TRUE(strcmp(icp->chname, test_chname) == 0);
00342 CHECK_TRUE(icp->joblist == test_joblist);
00343 CHECK_TRUE(icp->tid != 0);
00344 CHECK_TRUE(strcmp(icp->url, test_url) == 0);
00345
00346 CHECK_TRUE(dnxCollectorGetChannel(cp) == icp->channel);
00347
00348 while (!once)
00349 dnxCancelableSleep(10);
00350
00351 dnxCollectorDestroy(cp);
00352
00353 #ifdef DEBUG_HEAP
00354 CHECK_ZERO(dnxCheckHeap());
00355 #endif
00356
00357 return 0;
00358 }
00359
00360 #endif
00361
00362
00363