dnxWLM.c

Go to the documentation of this file.
00001 /*--------------------------------------------------------------------------
00002 
00003    Copyright (c) 2006-2007, Intellectual Reserve, Inc. All rights reserved.
00004 
00005    This program is free software; you can redistribute it and/or modify
00006    it under the terms of the GNU General Public License version 2 as
00007    published by the Free Software Foundation.
00008 
00009    This program is distributed in the hope that it will be useful,
00010    but WITHOUT ANY WARRANTY; without even the implied warranty of
00011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012    GNU General Public License for more details.
00013 
00014    You should have received a copy of the GNU General Public License
00015    along with this program; if not, write to the Free Software
00016    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 
00018   --------------------------------------------------------------------------*/
00019 
00035 #include "dnxWLM.h"
00036 
00037 #include "dnxError.h"
00038 #include "dnxDebug.h"
00039 #include "dnxLogging.h"
00040 #include "dnxTransport.h"
00041 #include "dnxSleep.h"
00042 #include "dnxClntProt.h"
00043 #include "dnxPlugin.h"
00044 
00045 #include <sys/time.h>
00046 #include <unistd.h>
00047 #include <stdio.h>
00048 #include <stdlib.h>
00049 #include <string.h>
00050 #include <time.h>
00051 #include <pthread.h>
00052 #include <assert.h>
00053 #include <errno.h>
00054 
00055 #include <sys/types.h>
00056 #include <sys/socket.h>
00057 #include <netinet/in.h>
00058 #include <net/if.h>     // MUST be included before ifaddrs.h!
00059 #include <ifaddrs.h>
00060 
00061 #define elemcount(x) (sizeof(x)/sizeof(*(x)))   // element count of a true array; meaningless for a pointer
00062 
00064 #define MAX_RESULT_DATA 1024   // usable size of the per-job plugin output buffer (the buffer itself is one byte larger)
00065 
00066 #define MAX_IP_ADDRSZ   64     // size of the buffer that caches this node's IP address in string form
00067 
00068 struct iDnxWlm;               // forward declaration: circular reference
00069 
/* Lifecycle state of one worker thread, stored in its DnxWorkerStatus record. */
00071 typedef enum DnxThreadState
00072 {
00073    DNX_THREAD_DEAD = 0,       // the value a zero-filled status record starts with
00074    DNX_THREAD_RUNNING,        // set by workerCreate just before the thread is started
00075    DNX_THREAD_ZOMBIE          // set by the worker's cleanup handler; cleanThreadPool joins these
00076 } DnxThreadState;
00077 
/* Per-worker bookkeeping record; one is allocated for every worker thread. */
00079 typedef struct DnxWorkerStatus
00080 {
00081    DnxThreadState state;      // lifecycle state (running, or zombie awaiting join)
00082    pthread_t tid;             // thread id filled in by pthread_create
00083    DnxChannel * dispatch;     // channel used to send node requests and wait for jobs
00084    DnxChannel * collect;      // channel used to post job results
00085    clock_t tstart;            // clock() value sampled when the worker started
00086    unsigned serial;           // incremented after each job the worker completes
00087    struct iDnxWlm * iwlm;     // back pointer to the owning work load manager
00088 } DnxWorkerStatus;
00089 
/* Internal work load manager state; the public DnxWlm handle is cast to this type. */
00091 typedef struct iDnxWlm
00092 {
00093    DnxWlmCfgData cfg;         // working copy of the configuration (URL strings are private duplicates)
00094    DnxWorkerStatus ** pool;   // array of worker records; the first 'threads' entries are in use
00095    pthread_mutex_t mutex;     // locked around pool and counter updates
00096    unsigned jobtm;            // sum of per-job execution times (result.delta values)
00097    unsigned threadtm;         // sum of worker lifetimes, added when a worker is joined
00098    unsigned jobsok;           // jobs whose result code was DNX_PLUGIN_RESULT_OK
00099    unsigned jobsfail;         // jobs with any other result code
00100    unsigned active;           // workers currently processing a job
00101    unsigned tcreated;         // workers created since the last stats reset
00102    unsigned tdestroyed;       // workers joined since the last stats reset
00103    unsigned threads;          // number of workers currently in the pool
00104    unsigned reqsent;          // node requests sent successfully
00105    unsigned jobsrcvd;         // jobs received from the dispatcher
00106    unsigned minexectm;        // smallest job execution time seen (starts at the largest unsigned value)
00107    unsigned avgexectm;        // running average of job execution time
00108    unsigned maxexectm;        // largest job execution time seen
00109    unsigned avgthreads;       // only reset and reported in this file - presumably maintained elsewhere; TODO confirm
00110    unsigned avgactive;        // only reset and reported in this file - presumably maintained elsewhere; TODO confirm
00111    unsigned poolsz;           // allocated capacity of 'pool', in entries
00112    unsigned packets_in;       // only reset and reported in this file - TODO confirm who updates it
00113    unsigned packets_out;      // only reset and reported in this file - TODO confirm who updates it
00114    int terminate;             // set to 1 by dnxWlmDestroy; workers poll it at the top of their loop
00115    unsigned long myipaddr;    // binary IPv4 address of this node; embedded in request XIDs
00116    char myipaddrstr[MAX_IP_ADDRSZ];   // string form of the same address; passed to plugins when showNodeAddr is set
00117 } iDnxWlm;
00118 
00119 // forward declaration required by source code organization
00120 static void * dnxWorker(void * data);
00121 
00122 /*--------------------------------------------------------------------------
00123                      WORK LOAD MANAGER IMPLEMENTATION
00124   --------------------------------------------------------------------------*/
00125 
00134 static void logConfigChanges(DnxWlmCfgData * ocp, DnxWlmCfgData * ncp)
00135 {
00136    if (strcmp(ocp->dispatcherUrl, ncp->dispatcherUrl) != 0)
00137       dnxLog("Config parameter 'channelRegistrar' changed from %s to %s. "
00138             "NOTE: Changing the dispatcher URL requires a restart.",
00139             ocp->dispatcherUrl, ncp->dispatcherUrl);
00140 
00141    if (strcmp(ocp->collectorUrl, ncp->collectorUrl) != 0)
00142       dnxLog("Config parameter 'channelCollector' changed from %s to %s. "
00143             "NOTE: Changing the collector URL requires a restart.",
00144             ocp->collectorUrl, ncp->collectorUrl);
00145 
00146    if (ocp->reqTimeout != ncp->reqTimeout)
00147       dnxLog("Config parameter 'threadRequestTimeout' changed from %u to %u.",
00148             ocp->reqTimeout, ncp->reqTimeout);
00149 
00150    if (ocp->ttlBackoff != ncp->ttlBackoff)
00151       dnxLog("Config parameter 'threadTtlBackoff' changed from %u to %u.",
00152             ocp->ttlBackoff, ncp->ttlBackoff);
00153 
00154    if (ocp->maxRetries != ncp->maxRetries)
00155       dnxLog("Config parameter 'threadMaxTimeouts' changed from %u to %u.",
00156             ocp->maxRetries, ncp->maxRetries);
00157 
00158    if (ocp->poolMin != ncp->poolMin)
00159       dnxLog("Config parameter 'poolMin' changed from %u to %u.",
00160             ocp->poolMin, ncp->poolMin);
00161 
00162    if (ocp->poolInitial != ncp->poolInitial)
00163       dnxLog("Config parameter 'poolInitial' changed from %u to %u.",
00164             ocp->poolInitial, ncp->poolInitial);
00165 
00166    if (ocp->poolMax != ncp->poolMax)
00167       dnxLog("Config parameter 'poolMax' changed from %u to %u.",
00168             ocp->poolMax, ncp->poolMax);
00169 
00170    if (ocp->poolGrow != ncp->poolGrow)
00171       dnxLog("Config parameter 'poolGrow' changed from %u to %u.",
00172             ocp->poolGrow, ncp->poolGrow);
00173 
00174    if (ocp->pollInterval != ncp->pollInterval)
00175       dnxLog("Config parameter 'wlmPollInterval' changed from %u to %u.",
00176             ocp->pollInterval, ncp->pollInterval);
00177 
00178    if (ocp->shutdownGrace != ncp->shutdownGrace)
00179       dnxLog("Config parameter 'wlmShutdownGracePeriod' changed from %u to %u.",
00180             ocp->shutdownGrace, ncp->shutdownGrace);
00181 
00182    if (ocp->maxResults != ncp->maxResults)
00183       dnxLog("Config parameter 'maxResultBuffer' changed from %u to %u.",
00184             ocp->maxResults, ncp->maxResults);
00185 
00186    if (ocp->showNodeAddr != ncp->showNodeAddr)
00187       dnxLog("Config parameter 'showNodeAddr' changed from %s to %s.",
00188             ocp->showNodeAddr? "TRUE" : "FALSE",
00189             ncp->showNodeAddr? "TRUE" : "FALSE");
00190 }
00191 
00192 //----------------------------------------------------------------------------
00193 
00200 static int initWorkerComm(DnxWorkerStatus * ws)
00201 {
00202    // note that this is NOT a static struct - it must be reinitialized each time
00203    struct { char name[64]; char * url; int mode; DnxChannel ** chptr; } channels[] =
00204    {
00205       { "Dispatch:", ws->iwlm->cfg.dispatcherUrl, DNX_MODE_ACTIVE, &ws->dispatch },
00206       { "Collect :", ws->iwlm->cfg.collectorUrl,  DNX_MODE_ACTIVE, &ws->collect  },
00207    };
00208 
00209    int i, ret;
00210 
00211    // create all comm channels (named after thread id) for this worker thread. 
00212    for (i = 0; i < elemcount(channels); i++)
00213    {
00214       sprintf(channels[i].name + 9, "%lx", ws);
00215       if ((ret = dnxChanMapAdd(channels[i].name, channels[i].url)) != DNX_OK)
00216          dnxLog("WLM: Failed to initialize %s channel: %s.", 
00217                channels[i].name, dnxErrorString(ret));
00218       else if ((ret = dnxConnect(channels[i].name, channels[i].mode, 
00219             channels[i].chptr)) != DNX_OK)
00220       {
00221          dnxLog("WLM: Failed to open %s channel: %s.", 
00222                channels[i].name, dnxErrorString(ret));
00223          dnxChanMapDelete(channels[i].name);
00224       }
00225 
00226       // reverse cleanup and bail out after an error
00227       if (ret != DNX_OK)
00228       {
00229          while (--i >= 0)
00230          {
00231             dnxDisconnect(*channels[i].chptr);
00232             dnxChanMapDelete(channels[i].name);
00233          }
00234          break;
00235       }
00236    }
00237    return ret;
00238 }
00239 
00240 //----------------------------------------------------------------------------
00241 
00246 static void releaseWorkerComm(DnxWorkerStatus * ws)
00247 {
00248    char szChan[64];
00249 
00250    // close and delete the collector channel
00251    dnxDisconnect(ws->collect);
00252    sprintf(szChan, "Collect :%lx", ws);
00253    dnxChanMapDelete(szChan);
00254 
00255    // close and delete the dispatcher channel
00256    dnxDisconnect(ws->dispatch);
00257    sprintf(szChan, "Dispatch:%lx", ws);
00258    dnxChanMapDelete(szChan);
00259 }
00260 
00261 //----------------------------------------------------------------------------
00262 
00271 static int workerCreate(iDnxWlm * iwlm, DnxWorkerStatus ** pws)
00272 {
00273    DnxWorkerStatus * ws = NULL;
00274    int ret;
00275 
00276    // allocate and clear a new worker status structure
00277    if ((ws = (DnxWorkerStatus *)xmalloc(sizeof *ws)) == 0)
00278       return DNX_ERR_MEMORY;
00279    memset(ws, 0, sizeof *ws);
00280    ws->iwlm = iwlm;
00281 
00282    // initialize our communications channels
00283    if ((ret = initWorkerComm(ws)) != 0)
00284    {
00285       dnxLog("WLM: Failed to initialize worker comm channels: %s.",
00286             dnxErrorString(ret));
00287       xfree(ws);
00288       return ret;
00289    }
00290 
00291    // create a worker thread
00292    ws->state = DNX_THREAD_RUNNING; // set thread state to active
00293    if ((ret = pthread_create(&ws->tid, 0, dnxWorker, ws)) != 0)
00294    {
00295       dnxLog("WLM: Failed to create worker thread: %s.", strerror(ret));
00296       releaseWorkerComm(ws);
00297       xfree(ws);
00298       return DNX_ERR_THREAD;
00299    }
00300    *pws = ws;
00301    return 0;
00302 }
00303 
00308 static void cleanThreadPool(iDnxWlm * iwlm)
00309 {
00310    unsigned i = 0;
00311    clock_t now = clock();
00312 
00313    // look for zombie threads to join
00314    while (i < iwlm->threads)
00315    {
00316       if (iwlm->pool[i]->state == DNX_THREAD_ZOMBIE)
00317       {
00318          DnxWorkerStatus * ws = iwlm->pool[i];
00319          int ret;
00320 
00321          dnxDebug(1, "WLM: Joining worker [%lx]...", ws->tid);
00322          pthread_join(ws->tid, 0);
00323 
00324          // reduce thread count; update stats
00325          iwlm->threads--;
00326          iwlm->tdestroyed++;
00327          iwlm->threadtm += (unsigned)((now - ws->tstart) / CLOCKS_PER_SEC);
00328 
00329          // release thread resources; delete thread; compact ptr array
00330          releaseWorkerComm(ws);
00331          xfree(iwlm->pool[i]);
00332          memmove(&iwlm->pool[i], &iwlm->pool[i + 1],
00333                (iwlm->threads - i) * sizeof iwlm->pool[i]);
00334          continue;
00335       }
00336       i++;
00337    }
00338 }
00339 
00340 //----------------------------------------------------------------------------
00341 
00355 static int growThreadPool(iDnxWlm * iwlm)
00356 {
00357    unsigned i, add, growsz;
00358    int ret;
00359 
00360    // set additional thread count - keep us between the min and the max
00361    if (iwlm->threads < iwlm->cfg.poolInitial)
00362       growsz = iwlm->cfg.poolInitial - iwlm->threads;
00363    else if (iwlm->threads + iwlm->cfg.poolGrow > iwlm->cfg.poolMax)
00364       growsz = iwlm->cfg.poolMax - iwlm->threads;
00365    else
00366       growsz = iwlm->cfg.poolGrow;
00367 
00368    // fill as many empty slots as we can or need to
00369    for (i = iwlm->threads, add = growsz; i < iwlm->poolsz && add > 0; i++, add--)
00370    {
00371       if ((ret = workerCreate(iwlm, &iwlm->pool[i])) != 0)
00372          break;
00373       iwlm->threads++;
00374       iwlm->tcreated++;
00375    }
00376    dnxLog("WLM: Increased thread pool by %d.", growsz - add);
00377    return ret;
00378 }
00379 
00380 //----------------------------------------------------------------------------
00381 
00386 static void dnxWorkerCleanup(void * data)
00387 {
00388    DnxWorkerStatus * ws = (DnxWorkerStatus *)data;
00389    assert(data);
00390    dnxDebug(2, "Worker [%lx]: Terminating.", pthread_self());
00391    ws->state = DNX_THREAD_ZOMBIE;
00392 }
00393 
00394 //----------------------------------------------------------------------------
00395 
00403 static void * dnxWorker(void * data)
00404 {
00405    DnxWorkerStatus * ws = (DnxWorkerStatus *)data;
00406    pthread_t tid = pthread_self();
00407    int retries = 0;
00408    iDnxWlm * iwlm;
00409 
00410    assert(data);
00411 
00412    iwlm = ws->iwlm;
00413 
00414    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
00415    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00416    pthread_cleanup_push(dnxWorkerCleanup, data);
00417 
00418    ws->tstart = clock();      // set thread start time (for stats)
00419 
00420    while (!iwlm->terminate)
00421    {
00422       DnxNodeRequest msg;
00423       DnxJob job;
00424       int ret;
00425 
00426       // setup job request message - use thread id and node address in XID
00427       dnxMakeXID(&msg.xid, DNX_OBJ_WORKER, tid, iwlm->myipaddr);
00428       msg.reqType = DNX_REQ_REGISTER;
00429       msg.jobCap = 1;
00430       msg.ttl = iwlm->cfg.reqTimeout - iwlm->cfg.ttlBackoff;
00431 
00432       // this debug line is hit hundreds of times per sec when comm is down 
00433       dnxDebug(9, "Worker [%lx]: Sending node request: "
00434             "REGISTER for 1 job, ttl: %d seconds.", tid, msg.ttl);
00435 
00436       // request a job, and then wait for a job to come in...
00437       if ((ret = dnxSendNodeRequest(ws->dispatch, &msg)) != DNX_OK)
00438          dnxLog("Worker [%lx]: Error sending node request: %s.",
00439                tid, dnxErrorString(ret));
00440       else
00441       {
00442          DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00443          iwlm->reqsent++;
00444          DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00445       }
00446 
00447       // wait for job, even if request was never sent
00448       if ((ret = dnxWaitForJob(ws->dispatch, &job, iwlm->cfg.reqTimeout)) != DNX_OK 
00449             && ret != DNX_ERR_TIMEOUT)
00450       {
00451          if (ret == DNX_ERR_ADDRESS)
00452          {
00453             dnxDebug(8, "Worker [%lx]: No server; sleeping for a while.", tid); 
00454             dnxCancelableSleep(2 * 1000);
00455          }
00456          else
00457             dnxLog("Worker [%lx]: Error receiving job: %s.",
00458                   tid, dnxErrorString(ret));
00459       }
00460       pthread_testcancel();
00461 
00462       DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00463       cleanThreadPool(iwlm); // ensure counts are accurate before using them
00464       if (ret != DNX_OK)
00465       {
00466          // if above pool minimum and exceeded max retries...
00467          if (iwlm->threads > iwlm->cfg.poolMin
00468                && ++retries > iwlm->cfg.maxRetries)
00469          {
00470             dnxLog("Worker [%lx]: Exiting - max retries exceeded.", tid);
00471             DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00472             break;
00473          }
00474       }
00475       else
00476       {
00477          iwlm->jobsrcvd++;
00478          iwlm->active++;
00479 
00480          // check pool size before we get too busy -
00481          // if we're not shutting down and we haven't reached the configured
00482          // maximum and this is the last thread out, then increase the pool
00483          if (!iwlm->terminate
00484                && iwlm->threads < iwlm->cfg.poolMax
00485                && iwlm->active == iwlm->threads)
00486             growThreadPool(iwlm);
00487       }
00488       DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00489 
00490       // if we have a job, execute it and reset retry count
00491       if (ret == DNX_OK)
00492       {
00493          char resData[MAX_RESULT_DATA + 1];
00494          clock_t jobstart, jobstop;
00495          DnxResult result;
00496 
00497          dnxDebug(3, "Worker [%lx]: Received job [%lu,%lu] (T/O %d): %s.",
00498                tid, job.xid.objSerial, job.xid.objSlot, job.timeout, job.cmd);
00499 
00500          // prepare result structure
00501          result.xid = job.xid;               // result xid must match job xid
00502          result.state = DNX_JOB_COMPLETE;    // complete or expired
00503          result.delta = 0;
00504          result.resCode = DNX_PLUGIN_RESULT_OK;
00505          result.resData = 0;
00506 
00509          // we want to be able to cancel threads while they're out on a task
00510          // in order to obtain timely shutdown for long jobs - move into
00511          // async cancel mode, but only for the duration of the check
00512          pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
00513 
00514          *resData = 0;
00515          jobstart = clock();
00516          dnxPluginExecute(job.cmd, &result.resCode, resData, sizeof resData - 1, 
00517                job.timeout,iwlm->cfg.showNodeAddr? iwlm->myipaddrstr: 0);
00518          jobstop = clock();
00519          result.delta = (unsigned)((jobstop > jobstart? 
00520                jobstop - jobstart: 0) / CLOCKS_PER_SEC);
00521 
00522          pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
00523 
00524          // store allocated copy of the result string
00525          if (*resData) result.resData = xstrdup(resData);
00526 
00527          dnxDebug(3, "Worker [%lx]: Job [%lu,%lu] completed in %lu seconds: %d, %s.",
00528                tid, job.xid.objSerial, job.xid.objSlot, result.delta,
00529                result.resCode, result.resData);
00530 
00531          if ((ret = dnxSendResult(ws->collect, &result)) != DNX_OK)
00532             dnxDebug(3, "Worker [%lx]: Post job [%lu,%lu] results failed: %s.",
00533                   tid, job.xid.objSerial, job.xid.objSlot, dnxErrorString(ret));
00534 
00535          xfree(result.resData);
00536          xfree(job.cmd);
00537 
00538          // update all statistics
00539          DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00540          {
00541             // track status
00542             if (result.resCode == DNX_PLUGIN_RESULT_OK)
00543                iwlm->jobsok++;
00544             else
00545                iwlm->jobsfail++;
00546 
00547             // track min/max/avg execution time
00548             if (result.delta > iwlm->maxexectm)
00549                iwlm->maxexectm = result.delta;
00550             if (result.delta < iwlm->minexectm)
00551                iwlm->minexectm = result.delta;
00552             iwlm->avgexectm = (iwlm->avgexectm + result.delta) / 2;
00553 
00554             // total job processing time
00555             iwlm->jobtm += (unsigned)result.delta;
00556             iwlm->active--;   // reduce active count
00557          }
00558          DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00559 
00560          ws->serial++;        // increment job serial number for next job
00561          retries = 0;
00562       }
00563    }
00564    pthread_cleanup_pop(1);
00565    return 0;
00566 }
00567 
00568 /*--------------------------------------------------------------------------
00569                         WORK LOAD MANAGER INTERFACE
00570   --------------------------------------------------------------------------*/
00571 
00572 void dnxWlmResetStats(DnxWlm * wlm)
00573 {
00574    iDnxWlm * iwlm = (iDnxWlm *)wlm;
00575 
00576    assert(wlm);
00577 
00578    DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00579    iwlm->jobtm = iwlm->threadtm = 0;
00580    iwlm->jobsok = iwlm->jobsfail = iwlm->tcreated = iwlm->tdestroyed = 0;
00581    iwlm->reqsent = iwlm->jobsrcvd = iwlm->avgexectm = 0;
00582    iwlm->maxexectm = iwlm->avgthreads = iwlm->avgactive = 0;
00583    iwlm->minexectm = (unsigned)(-1);   // the largest possible value
00584    iwlm->packets_out = 0;
00585    iwlm->packets_in = 0;
00586    DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00587 }
00588 
00589 //----------------------------------------------------------------------------
00590 
00591 void dnxWlmGetStats(DnxWlm * wlm, DnxWlmStats * wsp)
00592 {
00593    iDnxWlm * iwlm = (iDnxWlm *)wlm;
00594 
00595    assert(wlm && wsp);
00596 
00597    DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00598    wsp->jobs_succeeded = iwlm->jobsok;
00599    wsp->jobs_failed = iwlm->jobsfail;
00600    wsp->threads_created = iwlm->tcreated;
00601    wsp->threads_destroyed = iwlm->tdestroyed;
00602    wsp->total_threads = iwlm->threads;
00603    wsp->active_threads = iwlm->active;
00604    wsp->requests_sent = iwlm->reqsent;
00605    wsp->jobs_received = iwlm->jobsrcvd;
00606    wsp->min_exec_time = iwlm->minexectm;
00607    wsp->avg_exec_time = iwlm->avgexectm;
00608    wsp->max_exec_time = iwlm->maxexectm;
00609    wsp->avg_total_threads = iwlm->avgthreads;
00610    wsp->avg_active_threads = iwlm->avgactive;
00611    wsp->thread_time = iwlm->threadtm;
00612    wsp->job_time = iwlm->jobtm;
00613    wsp->packets_out = iwlm->packets_out;
00614    wsp->packets_in = iwlm->packets_in;
00615    DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00616 }
00617 
00618 //----------------------------------------------------------------------------
00619 
00620 int dnxWlmReconfigure(DnxWlm * wlm, DnxWlmCfgData * cfg)
00621 {
00622    iDnxWlm * iwlm = (iDnxWlm *)wlm;
00623    DnxWorkerStatus ** pool;
00624    int ret = 0;
00625 
00626    assert(wlm && cfg);
00627    assert(cfg->poolMin > 0);
00628    assert(cfg->poolMax >= cfg->poolMin);
00629    assert(cfg->poolInitial >= cfg->poolMin);
00630    assert(cfg->poolInitial <= cfg->poolMax);
00631 
00632    DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00633 
00634    // dynamic reconfiguration of dispatcher/collector URL's is not allowed
00635 
00636    logConfigChanges(&iwlm->cfg, cfg);
00637 
00638    iwlm->cfg.reqTimeout = cfg->reqTimeout;
00639    iwlm->cfg.ttlBackoff = cfg->ttlBackoff;
00640    iwlm->cfg.maxRetries = cfg->maxRetries;
00641    iwlm->cfg.poolMin = cfg->poolMin;
00642    iwlm->cfg.poolInitial = cfg->poolInitial;
00643    iwlm->cfg.poolMax = cfg->poolMax;
00644    iwlm->cfg.poolGrow = cfg->poolGrow;
00645    iwlm->cfg.pollInterval = cfg->pollInterval;
00646    iwlm->cfg.shutdownGrace = cfg->shutdownGrace;
00647    iwlm->cfg.maxResults = cfg->maxResults;
00648    iwlm->cfg.showNodeAddr = cfg->showNodeAddr;
00649 
00650    // we can't reduce the poolsz until the number of threads
00651    //    drops below the new maximum
00652    while (iwlm->threads > iwlm->cfg.poolMax)
00653    {
00654       DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00655       dnxCancelableSleep(3 * 1000);
00656       DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00657    }
00658 
00659    // reallocate the pool to the new size
00660    if ((pool = (DnxWorkerStatus **)xrealloc(iwlm->pool,
00661          iwlm->cfg.poolMax * sizeof *pool)) == 0)
00662       ret = DNX_ERR_MEMORY;
00663    else
00664    {
00665       iwlm->poolsz = iwlm->cfg.poolMax;
00666       iwlm->pool = pool;
00667    }
00668 
00669    DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00670 
00671    return ret;
00672 }
00673 
00674 //----------------------------------------------------------------------------
00675 
00676 int dnxWlmCreate(DnxWlmCfgData * cfg, DnxWlm ** pwlm)
00677 {
00678    iDnxWlm * iwlm;
00679    struct ifaddrs * ifa = NULL;
00680 
00681    assert(cfg && pwlm);
00682    assert(cfg->poolMin > 0);
00683    assert(cfg->poolMax >= cfg->poolMin);
00684    assert(cfg->poolInitial >= cfg->poolMin);
00685    assert(cfg->poolInitial <= cfg->poolMax);
00686 
00687    // allocate and configure the master thread pool data structure
00688    if ((iwlm = (iDnxWlm *)xmalloc(sizeof *iwlm)) == 0)
00689       return DNX_ERR_MEMORY;
00690 
00691    memset(iwlm, 0, sizeof *iwlm);
00692    iwlm->cfg = *cfg;
00693    iwlm->cfg.dispatcherUrl = xstrdup(iwlm->cfg.dispatcherUrl);
00694    iwlm->cfg.collectorUrl = xstrdup(iwlm->cfg.collectorUrl);
00695    iwlm->poolsz = iwlm->cfg.poolMax;
00696    iwlm->pool = (DnxWorkerStatus **)xmalloc(iwlm->poolsz * sizeof *iwlm->pool);
00697    iwlm->minexectm = (unsigned)(-1);   // the largest possible value
00698    memset(iwlm->pool, 0, iwlm->poolsz * sizeof *iwlm->pool);
00699 
00700    // if any of the above allocations failed, we really can't continue
00701    if (!iwlm->cfg.dispatcherUrl || !iwlm->cfg.collectorUrl || !iwlm->pool)
00702    {
00703       xfree(iwlm->cfg.collectorUrl);
00704       xfree(iwlm->cfg.dispatcherUrl);
00705       xfree(iwlm);
00706       return DNX_ERR_MEMORY;
00707    }
00708 
00709    // cache our ip address in binary and string format
00710    if (getifaddrs(&ifa) == 0)
00711    {
00712       u_int setflags = IFF_UP | IFF_RUNNING;
00713       u_int clrflags = IFF_LOOPBACK;
00714       struct ifaddrs * ifcur = ifa;
00715 
00716       // locate the first proper AF_NET address in our interface list
00717       while (ifcur && (ifcur->ifa_addr == 0
00718             || ifcur->ifa_addr->sa_family != AF_INET
00719             || (ifcur->ifa_flags & setflags) != setflags
00720             || (ifcur->ifa_flags & clrflags) != 0))
00721          ifcur = ifcur->ifa_next;
00722 
00723       if (ifcur)
00724       {
00725          // cache binary and presentation (string) versions of the ip address
00726          iwlm->myipaddr = (unsigned long)
00727                ((struct sockaddr_in *)ifcur->ifa_addr)->sin_addr.s_addr;
00728          inet_ntop(ifcur->ifa_addr->sa_family, 
00729                &((struct sockaddr_in *)ifcur->ifa_addr)->sin_addr,
00730                iwlm->myipaddrstr, sizeof iwlm->myipaddrstr);
00731       }
00732       freeifaddrs(ifa);
00733    }
00734 
00735    // create initial worker thread pool
00736    DNX_PT_MUTEX_INIT(&iwlm->mutex);
00737    DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00738    {
00739       int ret;
00740       if ((ret = growThreadPool(iwlm)) != DNX_OK)
00741       {
00742          if (iwlm->threads)
00743             dnxLog("WLM: Error creating SOME worker threads: %s; "
00744                   "continuing with smaller initial pool.", dnxErrorString(ret));
00745          else
00746          {
00747             dnxLog("WLM: Unable to create ANY worker threads: %s; "
00748                   "terminating.", dnxErrorString(ret));
00749             DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00750             DNX_PT_MUTEX_DESTROY(&iwlm->mutex);
00751             xfree(iwlm->cfg.collectorUrl);
00752             xfree(iwlm->cfg.dispatcherUrl);
00753             xfree(iwlm);
00754             return ret;
00755          }
00756       }
00757    }
00758    DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00759 
00760    dnxLog("WLM: Started worker thread pool.");
00761 
00762    *pwlm = (DnxWlm *)iwlm;
00763 
00764    return DNX_OK;
00765 }
00766 
00767 //----------------------------------------------------------------------------
00768 
00769 void dnxWlmDestroy(DnxWlm * wlm)
00770 {
00771    iDnxWlm * iwlm = (iDnxWlm *)wlm;
00772    time_t expires;
00773    unsigned i;
00774 
00775    assert(wlm);
00776 
00777    dnxLog("WLM: Beginning termination sequence...");
00778 
00779    // sleep till we can't stand it anymore, then kill everyone
00780    iwlm->terminate = 1;
00781    expires = iwlm->cfg.shutdownGrace + time(0);
00782 
00783    DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00784    while (iwlm->threads > 0 && time(0) < expires)
00785    {
00786       cleanThreadPool(iwlm);
00787       DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00788       dnxCancelableSleep(100);
00789       DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00790    }
00791 
00792    // check for workers remaining after grace period
00793    if (iwlm->threads)
00794       dnxDebug(1, "WLM: Termination - %d workers remaining"
00795             " after grace period.", iwlm->threads);
00796 
00797    // cancel all remaining workers
00798    for (i = 0; i < iwlm->threads; i++)
00799       if (iwlm->pool[i]->state == DNX_THREAD_RUNNING)
00800       {
00801          dnxDebug(1, "WLMDestroy: Cancelling worker [%lx].", iwlm->pool[i]->tid);
00802          pthread_cancel(iwlm->pool[i]->tid);
00803       }
00804 
00805    // give remaining thread some time to quit
00806    DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00807    dnxCancelableSleep(1000);
00808    DNX_PT_MUTEX_LOCK(&iwlm->mutex);
00809 
00810    // join all zombies (should be everything left)
00811    cleanThreadPool(iwlm);
00812    assert(iwlm->threads == 0);
00813    xfree(iwlm->pool);
00814    DNX_PT_MUTEX_UNLOCK(&iwlm->mutex);
00815 
00816    DNX_PT_MUTEX_DESTROY(&iwlm->mutex);
00817 
00818    xfree(iwlm->cfg.collectorUrl);
00819    xfree(iwlm->cfg.dispatcherUrl);
00820    xfree(iwlm);
00821 
00822    dnxLog("WLM: Termination sequence complete.");
00823 }
00824 
00825 /*--------------------------------------------------------------------------*/
00826 

Generated on Tue Apr 13 15:48:07 2010 for DNX by  doxygen 1.5.6