00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
#include "dnxWLM.h"

#include "dnxError.h"
#include "dnxDebug.h"
#include "dnxLogging.h"
#include "dnxTransport.h"
#include "dnxSleep.h"
#include "dnxClntProt.h"
#include "dnxPlugin.h"

#include <sys/time.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <pthread.h>
#include <assert.h>
#include <errno.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <net/if.h>
#include <ifaddrs.h>
00060
/** Number of elements in a true array (not meaningful for pointers). */
#define elemcount(x) (sizeof(x) / sizeof((x)[0]))

/** Size, in bytes, of the buffer that receives plugin result text. */
#define MAX_RESULT_DATA 1024

/** Size, in bytes, of the buffer holding the local IP address as text. */
#define MAX_IP_ADDRSZ 64

/* Forward declaration so worker status records can point at their owner. */
struct iDnxWlm;
00069
/** Life-cycle state of a pool worker thread. */
typedef enum DnxThreadState
{
   DNX_THREAD_DEAD    = 0,   /* zero value of a freshly cleared status record */
   DNX_THREAD_RUNNING = 1,   /* set by workerCreate just before the thread starts */
   DNX_THREAD_ZOMBIE  = 2    /* set by the worker's cleanup handler; awaiting join */
} DnxThreadState;
00077
00079 typedef struct DnxWorkerStatus
00080 {
00081 DnxThreadState state;
00082 pthread_t tid;
00083 DnxChannel * dispatch;
00084 DnxChannel * collect;
00085 clock_t tstart;
00086 unsigned serial;
00087 struct iDnxWlm * iwlm;
00088 } DnxWorkerStatus;
00089
00091 typedef struct iDnxWlm
00092 {
00093 DnxWlmCfgData cfg;
00094 DnxWorkerStatus ** pool;
00095 pthread_mutex_t mutex;
00096 unsigned jobtm;
00097 unsigned threadtm;
00098 unsigned jobsok;
00099 unsigned jobsfail;
00100 unsigned active;
00101 unsigned tcreated;
00102 unsigned tdestroyed;
00103 unsigned threads;
00104 unsigned reqsent;
00105 unsigned jobsrcvd;
00106 unsigned minexectm;
00107 unsigned avgexectm;
00108 unsigned maxexectm;
00109 unsigned avgthreads;
00110 unsigned avgactive;
00111 unsigned poolsz;
00112 unsigned packets_in;
00113 unsigned packets_out;
00114 int terminate;
00115 unsigned long myipaddr;
00116 char myipaddrstr[MAX_IP_ADDRSZ];
00117 } iDnxWlm;
00118
00119
/* Worker thread entry point; declared early because workerCreate starts it. */
static void *dnxWorker(void *data);
00121
00122
00123
00124
00125
00134 static void logConfigChanges(DnxWlmCfgData * ocp, DnxWlmCfgData * ncp)
00135 {
00136 if (strcmp(ocp->dispatcherUrl, ncp->dispatcherUrl) != 0)
00137 dnxLog("Config parameter 'channelRegistrar' changed from %s to %s. "
00138 "NOTE: Changing the dispatcher URL requires a restart.",
00139 ocp->dispatcherUrl, ncp->dispatcherUrl);
00140
00141 if (strcmp(ocp->collectorUrl, ncp->collectorUrl) != 0)
00142 dnxLog("Config parameter 'channelCollector' changed from %s to %s. "
00143 "NOTE: Changing the collector URL requires a restart.",
00144 ocp->collectorUrl, ncp->collectorUrl);
00145
00146 if (ocp->reqTimeout != ncp->reqTimeout)
00147 dnxLog("Config parameter 'threadRequestTimeout' changed from %u to %u.",
00148 ocp->reqTimeout, ncp->reqTimeout);
00149
00150 if (ocp->ttlBackoff != ncp->ttlBackoff)
00151 dnxLog("Config parameter 'threadTtlBackoff' changed from %u to %u.",
00152 ocp->ttlBackoff, ncp->ttlBackoff);
00153
00154 if (ocp->maxRetries != ncp->maxRetries)
00155 dnxLog("Config parameter 'threadMaxTimeouts' changed from %u to %u.",
00156 ocp->maxRetries, ncp->maxRetries);
00157
00158 if (ocp->poolMin != ncp->poolMin)
00159 dnxLog("Config parameter 'poolMin' changed from %u to %u.",
00160 ocp->poolMin, ncp->poolMin);
00161
00162 if (ocp->poolInitial != ncp->poolInitial)
00163 dnxLog("Config parameter 'poolInitial' changed from %u to %u.",
00164 ocp->poolInitial, ncp->poolInitial);
00165
00166 if (ocp->poolMax != ncp->poolMax)
00167 dnxLog("Config parameter 'poolMax' changed from %u to %u.",
00168 ocp->poolMax, ncp->poolMax);
00169
00170 if (ocp->poolGrow != ncp->poolGrow)
00171 dnxLog("Config parameter 'poolGrow' changed from %u to %u.",
00172 ocp->poolGrow, ncp->poolGrow);
00173
00174 if (ocp->pollInterval != ncp->pollInterval)
00175 dnxLog("Config parameter 'wlmPollInterval' changed from %u to %u.",
00176 ocp->pollInterval, ncp->pollInterval);
00177
00178 if (ocp->shutdownGrace != ncp->shutdownGrace)
00179 dnxLog("Config parameter 'wlmShutdownGracePeriod' changed from %u to %u.",
00180 ocp->shutdownGrace, ncp->shutdownGrace);
00181
00182 if (ocp->maxResults != ncp->maxResults)
00183 dnxLog("Config parameter 'maxResultBuffer' changed from %u to %u.",
00184 ocp->maxResults, ncp->maxResults);
00185
00186 if (ocp->showNodeAddr != ncp->showNodeAddr)
00187 dnxLog("Config parameter 'showNodeAddr' changed from %s to %s.",
00188 ocp->showNodeAddr? "TRUE" : "FALSE",
00189 ncp->showNodeAddr? "TRUE" : "FALSE");
00190 }
00191
00192
00193
00200 static int initWorkerComm(DnxWorkerStatus * ws)
00201 {
00202
00203 struct { char name[64]; char * url; int mode; DnxChannel ** chptr; } channels[] =
00204 {
00205 { "Dispatch:", ws->iwlm->cfg.dispatcherUrl, DNX_MODE_ACTIVE, &ws->dispatch },
00206 { "Collect :", ws->iwlm->cfg.collectorUrl, DNX_MODE_ACTIVE, &ws->collect },
00207 };
00208
00209 int i, ret;
00210
00211
00212 for (i = 0; i < elemcount(channels); i++)
00213 {
00214 sprintf(channels[i].name + 9, "%lx", ws);
00215 if ((ret = dnxChanMapAdd(channels[i].name, channels[i].url)) != DNX_OK)
00216 dnxLog("WLM: Failed to initialize %s channel: %s.",
00217 channels[i].name, dnxErrorString(ret));
00218 else if ((ret = dnxConnect(channels[i].name, channels[i].mode,
00219 channels[i].chptr)) != DNX_OK)
00220 {
00221 dnxLog("WLM: Failed to open %s channel: %s.",
00222 channels[i].name, dnxErrorString(ret));
00223 dnxChanMapDelete(channels[i].name);
00224 }
00225
00226
00227 if (ret != DNX_OK)
00228 {
00229 while (--i >= 0)
00230 {
00231 dnxDisconnect(*channels[i].chptr);
00232 dnxChanMapDelete(channels[i].name);
00233 }
00234 break;
00235 }
00236 }
00237 return ret;
00238 }
00239
00240
00241
00246 static void releaseWorkerComm(DnxWorkerStatus * ws)
00247 {
00248 char szChan[64];
00249
00250
00251 dnxDisconnect(ws->collect);
00252 sprintf(szChan, "Collect :%lx", ws);
00253 dnxChanMapDelete(szChan);
00254
00255
00256 dnxDisconnect(ws->dispatch);
00257 sprintf(szChan, "Dispatch:%lx", ws);
00258 dnxChanMapDelete(szChan);
00259 }
00260
00261
00262
00271 static int workerCreate(iDnxWlm * iwlm, DnxWorkerStatus ** pws)
00272 {
00273 DnxWorkerStatus * ws = NULL;
00274 int ret;
00275
00276
00277 if ((ws = (DnxWorkerStatus *)xmalloc(sizeof *ws)) == 0)
00278 return DNX_ERR_MEMORY;
00279 memset(ws, 0, sizeof *ws);
00280 ws->iwlm = iwlm;
00281
00282
00283 if ((ret = initWorkerComm(ws)) != 0)
00284 {
00285 dnxLog("WLM: Failed to initialize worker comm channels: %s.",
00286 dnxErrorString(ret));
00287 xfree(ws);
00288 return ret;
00289 }
00290
00291
00292 ws->state = DNX_THREAD_RUNNING;
00293 if ((ret = pthread_create(&ws->tid, 0, dnxWorker, ws)) != 0)
00294 {
00295 dnxLog("WLM: Failed to create worker thread: %s.", strerror(ret));
00296 releaseWorkerComm(ws);
00297 xfree(ws);
00298 return DNX_ERR_THREAD;
00299 }
00300 *pws = ws;
00301 return 0;
00302 }
00303
00308 static void cleanThreadPool(iDnxWlm * iwlm)
00309 {
00310 unsigned i = 0;
00311 clock_t now = clock();
00312
00313
00314 while (i < iwlm->threads)
00315 {
00316 if (iwlm->pool[i]->state == DNX_THREAD_ZOMBIE)
00317 {
00318 DnxWorkerStatus * ws = iwlm->pool[i];
00319 int ret;
00320
00321 dnxDebug(1, "WLM: Joining worker [%lx]...", ws->tid);
00322 pthread_join(ws->tid, 0);
00323
00324
00325 iwlm->threads--;
00326 iwlm->tdestroyed++;
00327 iwlm->threadtm += (unsigned)((now - ws->tstart) / CLOCKS_PER_SEC);
00328
00329
00330 releaseWorkerComm(ws);
00331 xfree(iwlm->pool[i]);
00332 memmove(&iwlm->pool[i], &iwlm->pool[i + 1],
00333 (iwlm->threads - i) * sizeof iwlm->pool[i]);
00334 continue;
00335 }
00336 i++;
00337 }
00338 }
00339
00340
00341
00355 static int growThreadPool(iDnxWlm * iwlm)
00356 {
00357 unsigned i, add, growsz;
00358 int ret;
00359
00360
00361 if (iwlm->threads < iwlm->cfg.poolInitial)
00362 growsz = iwlm->cfg.poolInitial - iwlm->threads;
00363 else if (iwlm->threads + iwlm->cfg.poolGrow > iwlm->cfg.poolMax)
00364 growsz = iwlm->cfg.poolMax - iwlm->threads;
00365 else
00366 growsz = iwlm->cfg.poolGrow;
00367
00368
00369 for (i = iwlm->threads, add = growsz; i < iwlm->poolsz && add > 0; i++, add--)
00370 {
00371 if ((ret = workerCreate(iwlm, &iwlm->pool[i])) != 0)
00372 break;
00373 iwlm->threads++;
00374 iwlm->tcreated++;
00375 }
00376 dnxLog("WLM: Increased thread pool by %d.", growsz - add);
00377 return ret;
00378 }
00379
00380
00381
00386 static void dnxWorkerCleanup(void * data)
00387 {
00388 DnxWorkerStatus * ws = (DnxWorkerStatus *)data;
00389 assert(data);
00390 dnxDebug(2, "Worker [%lx]: Terminating.", pthread_self());
00391 ws->state = DNX_THREAD_ZOMBIE;
00392 }
00393
00394
00395
00403 static void * dnxWorker(void * data)
00404 {
00405 DnxWorkerStatus * ws = (DnxWorkerStatus *)data;
00406 pthread_t tid = pthread_self();
00407 int retries = 0;
00408 iDnxWlm * iwlm;
00409
00410 assert(data);
00411
00412 iwlm = ws->iwlm;
00413
00414 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00415 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00416 pthread_cleanup_push(dnxWorkerCleanup, data);
00417
00418 ws->tstart = clock();
00419
00420 while (!iwlm->terminate)
00421 {
00422 DnxNodeRequest msg;
00423 DnxJob job;
00424 int ret;
00425
00426
00427 dnxMakeXID(&msg.xid, DNX_OBJ_WORKER, tid, iwlm->myipaddr);
00428 msg.reqType = DNX_REQ_REGISTER;
00429 msg.jobCap = 1;
00430 msg.ttl = iwlm->cfg.reqTimeout - iwlm->cfg.ttlBackoff;
00431
00432
00433 dnxDebug(9, "Worker [%lx]: Sending node request: "
00434 "REGISTER for 1 job, ttl: %d seconds.", tid, msg.ttl);
00435
00436
00437 if ((ret = dnxSendNodeRequest(ws->dispatch, &msg)) != DNX_OK)
00438 dnxLog("Worker [%lx]: Error sending node request: %s.",
00439 tid, dnxErrorString(ret));
00440 else
00441 {
00442 DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00443 iwlm->reqsent++;
00444 DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00445 }
00446
00447
00448 if ((ret = dnxWaitForJob(ws->dispatch, &job, iwlm->cfg.reqTimeout)) != DNX_OK
00449 && ret != DNX_ERR_TIMEOUT)
00450 {
00451 if (ret == DNX_ERR_ADDRESS)
00452 {
00453 dnxDebug(8, "Worker [%lx]: No server; sleeping for a while.", tid);
00454 dnxCancelableSleep(2 * 1000);
00455 }
00456 else
00457 dnxLog("Worker [%lx]: Error receiving job: %s.",
00458 tid, dnxErrorString(ret));
00459 }
00460 pthread_testcancel();
00461
00462 DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00463 cleanThreadPool(iwlm);
00464 if (ret != DNX_OK)
00465 {
00466
00467 if (iwlm->threads > iwlm->cfg.poolMin
00468 && ++retries > iwlm->cfg.maxRetries)
00469 {
00470 dnxLog("Worker [%lx]: Exiting - max retries exceeded.", tid);
00471 DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00472 break;
00473 }
00474 }
00475 else
00476 {
00477 iwlm->jobsrcvd++;
00478 iwlm->active++;
00479
00480
00481
00482
00483 if (!iwlm->terminate
00484 && iwlm->threads < iwlm->cfg.poolMax
00485 && iwlm->active == iwlm->threads)
00486 growThreadPool(iwlm);
00487 }
00488 DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00489
00490
00491 if (ret == DNX_OK)
00492 {
00493 char resData[MAX_RESULT_DATA + 1];
00494 clock_t jobstart, jobstop;
00495 DnxResult result;
00496
00497 dnxDebug(3, "Worker [%lx]: Received job [%lu,%lu] (T/O %d): %s.",
00498 tid, job.xid.objSerial, job.xid.objSlot, job.timeout, job.cmd);
00499
00500
00501 result.xid = job.xid;
00502 result.state = DNX_JOB_COMPLETE;
00503 result.delta = 0;
00504 result.resCode = DNX_PLUGIN_RESULT_OK;
00505 result.resData = 0;
00506
00509
00510
00511
00512 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
00513
00514 *resData = 0;
00515 jobstart = clock();
00516 dnxPluginExecute(job.cmd, &result.resCode, resData, sizeof resData - 1,
00517 job.timeout,iwlm->cfg.showNodeAddr? iwlm->myipaddrstr: 0);
00518 jobstop = clock();
00519 result.delta = (unsigned)((jobstop > jobstart?
00520 jobstop - jobstart: 0) / CLOCKS_PER_SEC);
00521
00522 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00523
00524
00525 if (*resData) result.resData = xstrdup(resData);
00526
00527 dnxDebug(3, "Worker [%lx]: Job [%lu,%lu] completed in %lu seconds: %d, %s.",
00528 tid, job.xid.objSerial, job.xid.objSlot, result.delta,
00529 result.resCode, result.resData);
00530
00531 if ((ret = dnxSendResult(ws->collect, &result)) != DNX_OK)
00532 dnxDebug(3, "Worker [%lx]: Post job [%lu,%lu] results failed: %s.",
00533 tid, job.xid.objSerial, job.xid.objSlot, dnxErrorString(ret));
00534
00535 xfree(result.resData);
00536 xfree(job.cmd);
00537
00538
00539 DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00540 {
00541
00542 if (result.resCode == DNX_PLUGIN_RESULT_OK)
00543 iwlm->jobsok++;
00544 else
00545 iwlm->jobsfail++;
00546
00547
00548 if (result.delta > iwlm->maxexectm)
00549 iwlm->maxexectm = result.delta;
00550 if (result.delta < iwlm->minexectm)
00551 iwlm->minexectm = result.delta;
00552 iwlm->avgexectm = (iwlm->avgexectm + result.delta) / 2;
00553
00554
00555 iwlm->jobtm += (unsigned)result.delta;
00556 iwlm->active--;
00557 }
00558 DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00559
00560 ws->serial++;
00561 retries = 0;
00562 }
00563 }
00564 pthread_cleanup_pop(1);
00565 return 0;
00566 }
00567
00568
00569
00570
00571
00572 void dnxWlmResetStats(DnxWlm * wlm)
00573 {
00574 iDnxWlm * iwlm = (iDnxWlm *)wlm;
00575
00576 assert(wlm);
00577
00578 DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00579 iwlm->jobtm = iwlm->threadtm = 0;
00580 iwlm->jobsok = iwlm->jobsfail = iwlm->tcreated = iwlm->tdestroyed = 0;
00581 iwlm->reqsent = iwlm->jobsrcvd = iwlm->avgexectm = 0;
00582 iwlm->maxexectm = iwlm->avgthreads = iwlm->avgactive = 0;
00583 iwlm->minexectm = (unsigned)(-1);
00584 iwlm->packets_out = 0;
00585 iwlm->packets_in = 0;
00586 DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00587 }
00588
00589
00590
00591 void dnxWlmGetStats(DnxWlm * wlm, DnxWlmStats * wsp)
00592 {
00593 iDnxWlm * iwlm = (iDnxWlm *)wlm;
00594
00595 assert(wlm && wsp);
00596
00597 DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00598 wsp->jobs_succeeded = iwlm->jobsok;
00599 wsp->jobs_failed = iwlm->jobsfail;
00600 wsp->threads_created = iwlm->tcreated;
00601 wsp->threads_destroyed = iwlm->tdestroyed;
00602 wsp->total_threads = iwlm->threads;
00603 wsp->active_threads = iwlm->active;
00604 wsp->requests_sent = iwlm->reqsent;
00605 wsp->jobs_received = iwlm->jobsrcvd;
00606 wsp->min_exec_time = iwlm->minexectm;
00607 wsp->avg_exec_time = iwlm->avgexectm;
00608 wsp->max_exec_time = iwlm->maxexectm;
00609 wsp->avg_total_threads = iwlm->avgthreads;
00610 wsp->avg_active_threads = iwlm->avgactive;
00611 wsp->thread_time = iwlm->threadtm;
00612 wsp->job_time = iwlm->jobtm;
00613 wsp->packets_out = iwlm->packets_out;
00614 wsp->packets_in = iwlm->packets_in;
00615 DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00616 }
00617
00618
00619
00620 int dnxWlmReconfigure(DnxWlm * wlm, DnxWlmCfgData * cfg)
00621 {
00622 iDnxWlm * iwlm = (iDnxWlm *)wlm;
00623 DnxWorkerStatus ** pool;
00624 int ret = 0;
00625
00626 assert(wlm && cfg);
00627 assert(cfg->poolMin > 0);
00628 assert(cfg->poolMax >= cfg->poolMin);
00629 assert(cfg->poolInitial >= cfg->poolMin);
00630 assert(cfg->poolInitial <= cfg->poolMax);
00631
00632 DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00633
00634
00635
00636 logConfigChanges(&iwlm->cfg, cfg);
00637
00638 iwlm->cfg.reqTimeout = cfg->reqTimeout;
00639 iwlm->cfg.ttlBackoff = cfg->ttlBackoff;
00640 iwlm->cfg.maxRetries = cfg->maxRetries;
00641 iwlm->cfg.poolMin = cfg->poolMin;
00642 iwlm->cfg.poolInitial = cfg->poolInitial;
00643 iwlm->cfg.poolMax = cfg->poolMax;
00644 iwlm->cfg.poolGrow = cfg->poolGrow;
00645 iwlm->cfg.pollInterval = cfg->pollInterval;
00646 iwlm->cfg.shutdownGrace = cfg->shutdownGrace;
00647 iwlm->cfg.maxResults = cfg->maxResults;
00648 iwlm->cfg.showNodeAddr = cfg->showNodeAddr;
00649
00650
00651
00652 while (iwlm->threads > iwlm->cfg.poolMax)
00653 {
00654 DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00655 dnxCancelableSleep(3 * 1000);
00656 DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00657 }
00658
00659
00660 if ((pool = (DnxWorkerStatus **)xrealloc(iwlm->pool,
00661 iwlm->cfg.poolMax * sizeof *pool)) == 0)
00662 ret = DNX_ERR_MEMORY;
00663 else
00664 {
00665 iwlm->poolsz = iwlm->cfg.poolMax;
00666 iwlm->pool = pool;
00667 }
00668
00669 DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00670
00671 return ret;
00672 }
00673
00674
00675
00676 int dnxWlmCreate(DnxWlmCfgData * cfg, DnxWlm ** pwlm)
00677 {
00678 iDnxWlm * iwlm;
00679 struct ifaddrs * ifa = NULL;
00680
00681 assert(cfg && pwlm);
00682 assert(cfg->poolMin > 0);
00683 assert(cfg->poolMax >= cfg->poolMin);
00684 assert(cfg->poolInitial >= cfg->poolMin);
00685 assert(cfg->poolInitial <= cfg->poolMax);
00686
00687
00688 if ((iwlm = (iDnxWlm *)xmalloc(sizeof *iwlm)) == 0)
00689 return DNX_ERR_MEMORY;
00690
00691 memset(iwlm, 0, sizeof *iwlm);
00692 iwlm->cfg = *cfg;
00693 iwlm->cfg.dispatcherUrl = xstrdup(iwlm->cfg.dispatcherUrl);
00694 iwlm->cfg.collectorUrl = xstrdup(iwlm->cfg.collectorUrl);
00695 iwlm->poolsz = iwlm->cfg.poolMax;
00696 iwlm->pool = (DnxWorkerStatus **)xmalloc(iwlm->poolsz * sizeof *iwlm->pool);
00697 iwlm->minexectm = (unsigned)(-1);
00698 memset(iwlm->pool, 0, iwlm->poolsz * sizeof *iwlm->pool);
00699
00700
00701 if (!iwlm->cfg.dispatcherUrl || !iwlm->cfg.collectorUrl || !iwlm->pool)
00702 {
00703 xfree(iwlm->cfg.collectorUrl);
00704 xfree(iwlm->cfg.dispatcherUrl);
00705 xfree(iwlm);
00706 return DNX_ERR_MEMORY;
00707 }
00708
00709
00710 if (getifaddrs(&ifa) == 0)
00711 {
00712 u_int setflags = IFF_UP | IFF_RUNNING;
00713 u_int clrflags = IFF_LOOPBACK;
00714 struct ifaddrs * ifcur = ifa;
00715
00716
00717 while (ifcur && (ifcur->ifa_addr == 0
00718 || ifcur->ifa_addr->sa_family != AF_INET
00719 || (ifcur->ifa_flags & setflags) != setflags
00720 || (ifcur->ifa_flags & clrflags) != 0))
00721 ifcur = ifcur->ifa_next;
00722
00723 if (ifcur)
00724 {
00725
00726 iwlm->myipaddr = (unsigned long)
00727 ((struct sockaddr_in *)ifcur->ifa_addr)->sin_addr.s_addr;
00728 inet_ntop(ifcur->ifa_addr->sa_family,
00729 &((struct sockaddr_in *)ifcur->ifa_addr)->sin_addr,
00730 iwlm->myipaddrstr, sizeof iwlm->myipaddrstr);
00731 }
00732 freeifaddrs(ifa);
00733 }
00734
00735
00736 DNX_PT_MUTEX_INIT(&iwlm->mutex);
00737 DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00738 {
00739 int ret;
00740 if ((ret = growThreadPool(iwlm)) != DNX_OK)
00741 {
00742 if (iwlm->threads)
00743 dnxLog("WLM: Error creating SOME worker threads: %s; "
00744 "continuing with smaller initial pool.", dnxErrorString(ret));
00745 else
00746 {
00747 dnxLog("WLM: Unable to create ANY worker threads: %s; "
00748 "terminating.", dnxErrorString(ret));
00749 DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00750 DNX_PT_MUTEX_DESTROY(&iwlm->mutex);
00751 xfree(iwlm->cfg.collectorUrl);
00752 xfree(iwlm->cfg.dispatcherUrl);
00753 xfree(iwlm);
00754 return ret;
00755 }
00756 }
00757 }
00758 DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00759
00760 dnxLog("WLM: Started worker thread pool.");
00761
00762 *pwlm = (DnxWlm *)iwlm;
00763
00764 return DNX_OK;
00765 }
00766
00767
00768
00769 void dnxWlmDestroy(DnxWlm * wlm)
00770 {
00771 iDnxWlm * iwlm = (iDnxWlm *)wlm;
00772 time_t expires;
00773 unsigned i;
00774
00775 assert(wlm);
00776
00777 dnxLog("WLM: Beginning termination sequence...");
00778
00779
00780 iwlm->terminate = 1;
00781 expires = iwlm->cfg.shutdownGrace + time(0);
00782
00783 DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00784 while (iwlm->threads > 0 && time(0) < expires)
00785 {
00786 cleanThreadPool(iwlm);
00787 DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00788 dnxCancelableSleep(100);
00789 DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00790 }
00791
00792
00793 if (iwlm->threads)
00794 dnxDebug(1, "WLM: Termination - %d workers remaining"
00795 " after grace period.", iwlm->threads);
00796
00797
00798 for (i = 0; i < iwlm->threads; i++)
00799 if (iwlm->pool[i]->state == DNX_THREAD_RUNNING)
00800 {
00801 dnxDebug(1, "WLMDestroy: Cancelling worker [%lx].", iwlm->pool[i]->tid);
00802 pthread_cancel(iwlm->pool[i]->tid);
00803 }
00804
00805
00806 DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00807 dnxCancelableSleep(1000);
00808 DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00809
00810
00811 cleanThreadPool(iwlm);
00812 assert(iwlm->threads == 0);
00813 xfree(iwlm->pool);
00814 DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00815
00816 DNX_PT_MUTEX_DESTROY(&iwlm->mutex);
00817
00818 xfree(iwlm->cfg.collectorUrl);
00819 xfree(iwlm->cfg.dispatcherUrl);
00820 xfree(iwlm);
00821
00822 dnxLog("WLM: Termination sequence complete.");
00823 }
00824
00825
00826