dnxNebMain.c

Go to the documentation of this file.
00001 /*--------------------------------------------------------------------------
00002 
00003    Copyright (c) 2006-2010, Intellectual Reserve, Inc. All rights reserved.
00004 
00005    This program is free software; you can redistribute it and/or modify
00006    it under the terms of the GNU General Public License version 2 as
00007    published by the Free Software Foundation.
00008 
00009    This program is distributed in the hope that it will be useful,
00010    but WITHOUT ANY WARRANTY; without even the implied warranty of
00011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012    GNU General Public License for more details.
00013 
00014    You should have received a copy of the GNU General Public License
00015    along with this program; if not, write to the Free Software
00016    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 
00018   --------------------------------------------------------------------------*/
00019 
00041 #include "dnxNebMain.h"
00042 #include "dnxError.h"
00043 #include "dnxDebug.h"
00044 #include "dnxLogging.h"
00045 #include "dnxCollector.h"
00046 #include "dnxDispatcher.h"
00047 #include "dnxRegistrar.h"
00048 #include "dnxJobList.h"
00049 #include "dnxCfgParser.h"
00050 #include "dnxStats.h"
00051 #include "dnxAgent.h"
00052 
00053 #ifdef HAVE_CONFIG_H
00054 # include "config.h"
00055 #else
00056 # define VERSION "<unknown>"
00057 #endif
00058 
00059 #ifndef SYSCONFDIR
00060 # define SYSCONFDIR "/etc"
00061 #endif
00062 
00063 #ifndef SYSLOGDIR
00064 # define SYSLOGDIR  "/var/log"
00065 #endif
00066 
00067 #ifndef NSCORE
00068 # define NSCORE
00069 #endif
00070 #include "nagios.h"
00071 #include "objects.h"    // for nagios service data type
00072 #include "nebmodules.h"
00073 #include "nebstructs.h"
00074 #include "nebcallbacks.h"
00075 #include "neberrors.h"
00076 #include "broker.h"
00077 
#include <limits.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>

#include <fcntl.h>
#include <netinet/in.h>
#include <regex.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>
00088 
00089 #if CURRENT_NEB_API_VERSION == 2
00090 # define OBJECT_FIELD_NAME object
00091 #elif CURRENT_NEB_API_VERSION == 3
00092 # define OBJECT_FIELD_NAME object_ptr
00093 #else
00094 # error Unsupported NEB API version.
00095 #endif
00096 
00097 #define elemcount(x) (sizeof(x)/sizeof(*(x)))
00098 
00099 #define DNX_DEFAULT_SERVER_CONFIG_FILE SYSCONFDIR "/dnxServer.cfg"
00100 #define DNX_DEFAULT_LOG                SYSLOGDIR  "/dnxsrv.log"
00101 #define DNX_DEFAULT_DBGLOG             SYSLOGDIR  "/dnxsrv.dbg.log"
00102 
00103 // specify event broker API version (required)
00104 NEB_API_VERSION(CURRENT_NEB_API_VERSION);
00105 
00107 typedef struct DnxJobData
00108 {
00109    service * svc;                   
00110    int chkopts;                     
00111    int schedule;                    
00112    int reschedule;                  
00113 } DnxJobData;
00114 
/** Server configuration values, populated by the configuration parser.
 *
 * Field order matches the dictionary built in initConfig and the vptrs
 * indices used by validateCfg.
 */
struct DnxServerCfg
{
   char * agentUrl;              // "channelAgent" - management agent channel URL
   char * dispatcherUrl;         // "channelDispatcher" - job dispatch channel URL
   char * collectorUrl;          // "channelCollector" - result collection channel URL
   char * authWorkerNodes;       // "authWorkerNodes" - authorized worker node list
   unsigned maxNodeRequests;     // "maxNodeRequests" - upper bound on job list slots
   unsigned minServiceSlots;     // "minServiceSlots" - lower bound on job list slots
   unsigned expirePollInterval;  // "expirePollInterval" - job expiration poll period
   char * localCheckPattern;     // "localCheckPattern" - regex for checks run locally
   char * syncScript;            // "syncScript" - script run at event loop start
   char * logFilePath;           // "logFile" - log file path
   char * debugFilePath;         // "debugFile" - debug log file path
   char * auditFilePath;         // "auditFile" - audit log file path
   unsigned debugLevel;          // "debugLevel" - debug verbosity
};
typedef struct DnxServerCfg DnxServerCfg;
00132 
00133 // module static data
00134 static DnxServerCfg cfg;            
00135 static DnxCfgParser * parser;       
00136 static DnxJobList * joblist;        
00137 static DnxRegistrar * registrar;    
00138 static DnxDispatcher * dispatcher;  
00139 static DnxCollector * collector;    
00140 static time_t start_time;           
00141 static int s_shutdown;              
00142 static DnxChannel * agent;          
00143 static pthread_t agentTid;          
00144 static void * myHandle;             
00145 static regex_t regEx;               
00146 
00147 /*--------------------------------------------------------------------------
00148                               IMPLEMENTATION
00149   --------------------------------------------------------------------------*/
00150 
00152 static void releaseConfig(void)
00153 {
00154    if (cfg.localCheckPattern)
00155       regfree(&regEx);
00156 
00157    dnxCfgParserDestroy(parser);
00158 }
00159 
00160 //----------------------------------------------------------------------------
00161 
00174 static int validateCfg(DnxCfgDict * dict, void ** vptrs, void * passthru)
00175 {
00176    regex_t * rep = (regex_t *)passthru;
00177    int err, ret = DNX_ERR_INVALID;
00178    DnxServerCfg cfg;
00179 
00180    assert(dict && vptrs && passthru);
00181 
00182    // setup data structure so we can use the same functionality we had before
00183    cfg.agentUrl           = (char *)vptrs[ 0];
00184    cfg.dispatcherUrl      = (char *)vptrs[ 1];
00185    cfg.collectorUrl       = (char *)vptrs[ 2];
00186    cfg.authWorkerNodes    = (char *)vptrs[ 3];
00187    cfg.maxNodeRequests    = (unsigned)(intptr_t)vptrs[ 4];
00188    cfg.minServiceSlots    = (unsigned)(intptr_t)vptrs[ 5];
00189    cfg.expirePollInterval = (unsigned)(intptr_t)vptrs[ 6];
00190    cfg.localCheckPattern  = (char *)vptrs[ 7];
00191    cfg.syncScript         = (char *)vptrs[ 8];
00192    cfg.logFilePath        = (char *)vptrs[ 9];
00193    cfg.debugFilePath      = (char *)vptrs[10];
00194    cfg.auditFilePath      = (char *)vptrs[11];
00195    cfg.debugLevel         = (unsigned)(intptr_t)vptrs[12];
00196 
00197    // validate configuration items in context
00198    if (!cfg.agentUrl)
00199       dnxLog("config: Missing channelAgent parameter.");
00200    else if (!cfg.dispatcherUrl)
00201       dnxLog("config: Missing channelDispatcher parameter.");
00202    else if (!cfg.collectorUrl)
00203       dnxLog("config: Missing channelCollector parameter.");
00204    else if (cfg.maxNodeRequests < 1)
00205       dnxLog("config: Invalid maxNodeRequests parameter.");
00206    else if (cfg.minServiceSlots < 1)
00207       dnxLog("config: Invalid minServiceSlots parameter.");
00208    else if (cfg.expirePollInterval < 1)
00209       dnxLog("config: Invalid expirePollInterval parameter.");
00210    else if (cfg.localCheckPattern && (err = regcomp(rep,
00211          cfg.localCheckPattern, REG_EXTENDED | REG_NOSUB)) != 0)
00212    {
00213       char buffer[128];
00214       regerror(err, rep, buffer, sizeof buffer);
00215       dnxLog("config: Failed to compile localCheckPattern (\"%s\"): %s.",
00216              cfg.localCheckPattern, buffer);
00217       regfree(rep);
00218    }
00219    else
00220       ret = 0;
00221 
00222    return ret;
00223 }
00224 
00225 //----------------------------------------------------------------------------
00226 
00233 static int initConfig(char * cfgfile)
00234 {
00235    DnxCfgDict dict[] =
00236    {  // Do NOT change the order, unless you know what you're doing!
00237       { "channelAgent",       DNX_CFG_URL,      &cfg.agentUrl           },
00238       { "channelDispatcher",  DNX_CFG_URL,      &cfg.dispatcherUrl      },
00239       { "channelCollector",   DNX_CFG_URL,      &cfg.collectorUrl       },
00240       { "authWorkerNodes",    DNX_CFG_STRING,   &cfg.authWorkerNodes    },
00241       { "maxNodeRequests",    DNX_CFG_UNSIGNED, &cfg.maxNodeRequests    },
00242       { "minServiceSlots",    DNX_CFG_UNSIGNED, &cfg.minServiceSlots    },
00243       { "expirePollInterval", DNX_CFG_UNSIGNED, &cfg.expirePollInterval },
00244       { "localCheckPattern",  DNX_CFG_STRING,   &cfg.localCheckPattern  },
00245       { "syncScript",         DNX_CFG_FSPATH,   &cfg.syncScript         },
00246       { "logFile",            DNX_CFG_FSPATH,   &cfg.logFilePath        },
00247       { "debugFile",          DNX_CFG_FSPATH,   &cfg.debugFilePath      },
00248       { "auditFile",          DNX_CFG_FSPATH,   &cfg.auditFilePath      },
00249       { "debugLevel",         DNX_CFG_UNSIGNED, &cfg.debugLevel         },
00250       { 0 },
00251    };
00252    char cfgdefs[] =
00253       "channelAgent = udp://0:12482\n"
00254       "channelDispatcher = udp://0:12480\n"
00255       "channelCollector = udp://0:12481\n"
00256       "maxNodeRequests = 0x7FFFFFFF\n"
00257       "minServiceSlots = 100\n"
00258       "expirePollInterval = 5\n"
00259       "logFile = " DNX_DEFAULT_LOG "\n"
00260       "debugFile = " DNX_DEFAULT_DBGLOG "\n";
00261 
00262    int ret;
00263    regex_t re;
00264 
00265    // clear the regex string, as we may write into it
00266    memset(&re, 0, sizeof re);
00267 
00268    // create global configuration parser object
00269    if ((ret = dnxCfgParserCreate(cfgdefs, cfgfile, 0, dict,
00270          validateCfg, &parser)) != 0)
00271       return ret;
00272 
00273    // parse configuration file; pass defaults
00274    if ((ret = dnxCfgParserParse(parser, &re)) == 0)
00275       regEx = re;
00276    else
00277       dnxCfgParserDestroy(parser);
00278 
00279    return ret;
00280 }
00281 
00282 //----------------------------------------------------------------------------
00283 
00290 static int nagiosGetServiceCount(void)
00291 {
00292    extern service * service_list;      // the global nagios service list
00293 
00294    service * temp_service;
00295    int total_services = 0;
00296 
00297    // walk the service list, count the nodes
00298    for (temp_service = service_list; temp_service;
00299          temp_service = temp_service->next)
00300       total_services++;
00301 
00302    return total_services;
00303 }
00304 
00305 #if CURRENT_NEB_API_VERSION == 2
00306 
00307 //----------------------------------------------------------------------------
00308 
00321 static int nagios2xPostResult(service * svc, time_t start_time,
00322       int early_timeout, int res_code, char * res_data)
00323 {
00324    extern circular_buffer service_result_buffer;
00325    int check_result_buffer_slots = 4096;
00326 
00327    service_message * new_message;
00328 
00329    // note that we're using malloc, not xmalloc - nagios takes ownership
00330    if ((new_message = (service_message *)malloc(sizeof *new_message)) == 0)
00331       return DNX_ERR_MEMORY;
00332 
00333    gettimeofday(&new_message->finish_time, 0);
00334    strncpy(new_message->host_name, svc->host_name,sizeof(new_message->host_name) - 1);
00335    new_message->host_name[sizeof(new_message->host_name) - 1] = 0;
00336    strncpy(new_message->description, svc->description,
00337          sizeof(new_message->description) - 1);
00338    new_message->description[sizeof(new_message->description) - 1] = 0;
00339    new_message->return_code = res_code;
00340    new_message->exited_ok = TRUE;
00341    new_message->check_type = SERVICE_CHECK_ACTIVE;
00342    new_message->parallelized = svc->parallelize;
00343    new_message->start_time.tv_sec = start_time;
00344    new_message->start_time.tv_usec = 0L;
00345    new_message->early_timeout = early_timeout;
00346    strncpy(new_message->output, res_data, sizeof(new_message->output) - 1);
00347    new_message->output[sizeof(new_message->output) - 1] = 0;
00348 
00349    pthread_mutex_lock(&service_result_buffer.buffer_lock);
00350 
00351    // handle overflow conditions
00352    if (service_result_buffer.items == check_result_buffer_slots)
00353    {
00354       service_result_buffer.overflow++;
00355       service_result_buffer.tail = (service_result_buffer.tail + 1)
00356             % check_result_buffer_slots;
00357    }
00358 
00359    // save the data to the buffer
00360    ((service_message **)service_result_buffer.buffer)
00361          [service_result_buffer.head] = new_message;
00362 
00363    // increment the head counter and items
00364    service_result_buffer.head = (service_result_buffer.head + 1)
00365          % check_result_buffer_slots;
00366    if (service_result_buffer.items < check_result_buffer_slots)
00367       service_result_buffer.items++;
00368    if (service_result_buffer.items > service_result_buffer.high)
00369       service_result_buffer.high = service_result_buffer.items;
00370 
00371    pthread_mutex_unlock(&service_result_buffer.buffer_lock);
00372 
00373    return 0;
00374 }
00375 
00376 #elif CURRENT_NEB_API_VERSION == 3
00377 
00378 // comment out this line to post to N3 filesystem results queue
00379 #define DIRECT_POST
00380 
00381 #ifdef DIRECT_POST
00382 
00383 // a nagios core global (It shouldn't be global, but we're glad it is!)
00384 extern check_result * check_result_list;
00385 
00386 static check_result * dnxResultList = 0;
00387 static pthread_mutex_t dnxResultListMutex = PTHREAD_MUTEX_INITIALIZER;
00388 
00389 //----------------------------------------------------------------------------
00390 
/** Compare two timeval values.
 *
 * @param[in] tv1 - the first time value.
 * @param[in] tv2 - the second time value.
 *
 * @return The whole-second difference (tv1 - tv2) when the seconds differ,
 *    otherwise the microsecond difference; so negative, zero or positive as
 *    tv1 is earlier than, equal to or later than tv2.
 */
static long dnxTimeCompare(struct timeval * tv1, struct timeval * tv2)
{
   long delta = (long)(tv1->tv_sec - tv2->tv_sec);

   // only fall back to microseconds when the whole seconds agree
   if (delta == 0)
      delta = (long)(tv1->tv_usec - tv2->tv_usec);

   return delta;
}
00407 
00408 //----------------------------------------------------------------------------
00409 
00417 static void dnxAddResultToList(check_result * newcr)
00418 {
00419    check_result ** curp;
00420 
00421    assert(newcr);
00422 
00423    DNX_PT_MUTEX_LOCK(&dnxResultListMutex);
00424 
00425    for (curp = &dnxResultList; *curp; curp = &(*curp)->next)
00426       if (dnxTimeCompare(&(*curp)->finish_time, &newcr->finish_time) >= 0)
00427          break;
00428 
00429    newcr->next = *curp;
00430    *curp = newcr;
00431 
00432    DNX_PT_MUTEX_UNLOCK(&dnxResultListMutex);
00433 }
00434 
00435 //----------------------------------------------------------------------------
00436 
00459 static int nagios3xPostResult(service * svc, int check_type,
00460       int check_options, int schedule, int reschedule, 
00461       time_t start_time, time_t finish_time, int early_timeout,
00462       int exited_ok, int res_code, char * res_data)
00463 {
00464    check_result * newcr;
00465 
00466    // use malloc here, not xmalloc - nagios will free it
00467    if ((newcr = (check_result *)malloc(sizeof *newcr)) == 0)
00468       return DNX_ERR_MEMORY;
00469 
00470    // initialize and fill with result info
00471    newcr->object_check_type = SERVICE_CHECK;
00472    newcr->host_name = strdup(svc->host_name);
00473    newcr->service_description = strdup(svc->description);
00474    newcr->check_type = check_type;
00475    newcr->check_options = check_options;
00476    newcr->scheduled_check = schedule;
00477    newcr->reschedule_check = reschedule;
00478    newcr->output_file = 0;
00479    newcr->output_file_fd = -1;
00480    newcr->latency = svc->latency;
00481    newcr->start_time.tv_sec = start_time;
00482    newcr->start_time.tv_usec = 0;
00483    newcr->finish_time.tv_sec = finish_time;
00484    newcr->finish_time.tv_usec = 0;
00485    newcr->early_timeout = early_timeout;
00486    newcr->exited_ok = exited_ok;
00487    newcr->return_code = res_code;
00488    newcr->output = strdup(res_data);
00489    newcr->next = 0;
00490 
00491    dnxAddResultToList(newcr);
00492 
00493    return 0;
00494 }
00495 
00496 //----------------------------------------------------------------------------
00497 
00504 static check_result * dnxMergeLists(check_result * lista, check_result * listb)
00505 {
00506    check_result * result = 0;
00507 
00508    check_result ** iter;
00509    for (iter = &result; lista && listb; iter = &(*iter)->next)
00510       if (dnxTimeCompare(&lista->finish_time, &listb->finish_time) <= 0)
00511       { *iter = lista; lista = lista->next; }
00512       else
00513       {*iter = listb; listb = listb->next; }
00514 
00515    *iter = lista? lista: listb;
00516 
00517    return result;
00518 }
00519 
00520 //----------------------------------------------------------------------------
00521 
00528 static void dnxMoveResultsToNagios(void)
00529 {
00530    check_result * local;
00531 
00532    // safely save off currently local list
00533    DNX_PT_MUTEX_LOCK(&dnxResultListMutex);
00534    local = dnxResultList;
00535    dnxResultList = 0;
00536    DNX_PT_MUTEX_UNLOCK(&dnxResultListMutex);
00537 
00538    // merge local into check_result_list, store in check_result_list
00539    check_result_list = dnxMergeLists(local, check_result_list);
00540 }
00541 
00542 //----------------------------------------------------------------------------
00543 
00554 static int ehTimedEvent(int event_type, void * data)
00555 {
00556    nebstruct_timed_event_data * ted = (nebstruct_timed_event_data *)data;
00557    timed_event * event = (timed_event*)data;
00558    int ret;
00559 
00560    // sanity checks
00561    if (event_type != NEBCALLBACK_TIMED_EVENT_DATA || ted == 0)
00562       return ERROR;
00563 
00564    // we only care about REAPER events
00565    if (ted->event_type != EVENT_CHECK_REAPER)
00566       return OK;
00567 
00568    dnxDebug(3, "Reaper handler called.");
00569    
00570    dnxMoveResultsToNagios();
00571 
00572    return OK;
00573 }
00574 
00575 #else // !DIRECT_POST
00576 
00577 //----------------------------------------------------------------------------
00578 
/** Move a completed check result file into the nagios 3.x result queue.
 *
 * Creates a uniquely named file in nagios' check_result_path, renames
 * @p resfile onto it and, only if that succeeds, creates the matching ".ok"
 * marker file. If allocation, temp-file creation or the rename fails,
 * @p resfile is deleted and any placeholder created here is removed.
 *
 * @param[in] resfile - path of the check result file to queue.
 */
static void nagios3xMoveCheckResultToQueue(char * resfile)
{
   // a nagios 3x global variable
   extern char * check_result_path;

   char * outfile;
   size_t outsz;
   int ret = -1;

   // path length + '/cXXXXXX' (8) + '.ok' (3) + null (1)
   outsz = strlen(check_result_path) + 8 + 3 + 1;
   if ((outfile = (char *)xmalloc(outsz)) != 0)
   {
      int fd;

      snprintf(outfile, outsz, "%s/cXXXXXX", check_result_path);
      if ((fd = mkstemp(outfile)) >= 0)
      {
#ifdef __CYGWIN__
         // Cygwin cannot rename open files - gives Permission Denied
         close(fd);
#endif

         // move the original file using a nagios core function
         ret = my_rename(resfile, outfile);

#ifndef __CYGWIN__
         // close the file
         close(fd);
#endif

         if (ret == 0)
         {
            // create an ok-to-go indicator file
            strcat(outfile, ".ok");
            if ((fd = open(outfile, O_CREAT|O_WRONLY|O_TRUNC, S_IRUSR|S_IWUSR)) >= 0)
               close(fd);
         }
         else
            unlink(outfile);  // don't leave mkstemp's placeholder in the queue
      }

      xfree(outfile);
   }

   // delete the original file if allocation failed or it couldn't be moved
   if (ret != 0)
      unlink(resfile);
}
00634 
00635 //----------------------------------------------------------------------------
00636 
00673 static int nagios3xPostResult(service * svc, int check_type,
00674       int check_options, int schedule, int reschedule,
00675       time_t start_time, time_t finish_time, int early_timeout,
00676       int exited_ok, int res_code, char * res_data)
00677 {
00678    char * escaped_res_data;
00679    char * filename;
00680    FILE * fp = 0;
00681    int fd;
00682 
00683    // a nagios 3.x global variable
00684    extern char * temp_path;
00685 
00686    // a nagios 3.x core function
00687    if ((escaped_res_data = escape_newlines(res_data)) == 0)
00688       return DNX_ERR_MEMORY;
00689 
00690    // open a file in the check results path for storing check results
00691    // path length + 'checkXXXXXX' (11) + '/' (1) + '.ok' (3) + null (1)
00692    if ((filename = (char *)xmalloc(strlen(temp_path) 
00693          + 11 + 1 + 3 + 1)) == 0)
00694    {
00695       free(escaped_res_data); // allocated by nagios - use free
00696       return DNX_ERR_MEMORY;
00697    }
00698    sprintf(filename, "%s/checkXXXXXX", temp_path);
00699    if ((fd = mkstemp(filename)) < 0
00700          || (fp = fdopen(fd, "w")) == 0)
00701    {
00702       xfree(filename);
00703       free(escaped_res_data); // allocated by nagios - use free
00704       if (fd >= 0) close(fd);
00705       return DNX_ERR_OPEN;
00706    }
00707 
00708    // write check results to the queue file
00709    fprintf(fp, "### Active Check Result File ###\n");
00710    fprintf(fp, "file_time=%lu\n\n", (unsigned long)start_time);
00711    fprintf(fp, "### Nagios Service Check Result ###\n");
00712    fprintf(fp, "# Time: %s", ctime(&start_time));
00713    fprintf(fp, "host_name=%s\n", svc->host_name);
00714    fprintf(fp, "service_description=%s\n", svc->description);
00715    fprintf(fp, "check_type=%d\n", check_type);
00716    fprintf(fp, "check_options=%d\n", check_options);
00717    fprintf(fp, "scheduled_check=%d\n", schedule);
00718    fprintf(fp, "reschedule_check=%d\n", reschedule);
00719    fprintf(fp, "latency=%f\n", svc->latency);
00720    fprintf(fp, "start_time=%lu.0\n", (unsigned long)start_time);
00721    fprintf(fp, "finish_time=%lu.%lu\n", (unsigned long)finish_time);
00722    fprintf(fp, "early_timeout=%d\n", early_timeout);
00723    fprintf(fp, "exited_ok=%d\n", exited_ok);
00724    fprintf(fp, "return_code=%d\n", res_code);
00725    fprintf(fp, "output=%s\n", escaped_res_data);
00726 
00727    fclose(fp);
00728 
00729    free(escaped_res_data); // allocated by nagios - use free
00730 
00731    nagios3xMoveCheckResultToQueue(filename);
00732 
00733    xfree(filename);
00734 
00735    return 0;
00736 }
00737 
00738 #endif // ?DIRECT_POST
00739 
00740 #else
00741 # error Unsupported NEB API version.
00742 #endif
00743 
00744 //----------------------------------------------------------------------------
00745 
00746 int dnxPostResult(void * data, time_t start_time, unsigned delta,
00747       int early_timeout, int res_code, char * res_data)
00748 {
00749    DnxJobData * jdp = (DnxJobData *)data;
00750    int ret;
00751 
00752    if (early_timeout)
00753       res_code = STATE_UNKNOWN;
00754 
00755    // update stats
00756    if (early_timeout)
00757       dnxStatsInc(0, RESULTS_TIMED_OUT);
00758    else if (!res_code)
00759       dnxStatsInc(0, RESULTS_OK);
00760    else
00761       dnxStatsInc(0, RESULTS_FAILED);
00762 
00766 #if CURRENT_NEB_API_VERSION == 2
00767 
00768    ret = nagios2xPostResult(jdp->svc, start_time, early_timeout,
00769          res_code, res_data);
00770 
00771 #elif CURRENT_NEB_API_VERSION == 3
00772 
00773    ret = nagios3xPostResult(jdp->svc, SERVICE_CHECK_ACTIVE,
00774          jdp->chkopts, jdp->schedule, jdp->reschedule,
00775          start_time, start_time + delta, early_timeout,
00776          1, res_code, res_data);
00777 
00778 #else
00779 # error Unsupported NEB API version.
00780 #endif
00781 
00782    // update stats
00783    if (ret == 0)
00784       dnxStatsInc(0, POST_RESULTS_OK);
00785    else
00786       dnxStatsInc(0, POST_RESULTS_FAILED);
00787 
00788    return ret;
00789 }
00790 
00791 //----------------------------------------------------------------------------
00792 
00801 static int dnxCalculateJobListSize(void)
00802 {
00803    int size = nagiosGetServiceCount();
00804 
00805    // zero doesn't make sense...
00806    if (size < 1)
00807    {
00808       size = 100;
00809       dnxLog("No Nagios services defined! "
00810              "Defaulting to %d slots in the DNX job queue.", size);
00811    }
00812 
00813    // check for configuration minServiceSlots override
00814    if (size < cfg.minServiceSlots)
00815    {
00816       dnxLog("Overriding calculated service check slot count. "
00817              "Increasing from %d to configured minimum: %d.",
00818              size, cfg.minServiceSlots);
00819       size = cfg.minServiceSlots;
00820    }
00821 
00822    // check for configuration maxNodeRequests override
00823    if (size > cfg.maxNodeRequests)
00824    {
00825       dnxLog("Overriding calculated service check slot count. "
00826              "Decreasing from %d to configured maximum: %d.", size,
00827              cfg.maxNodeRequests);
00828       size = cfg.maxNodeRequests;
00829    }
00830    return size;
00831 }
00832 
00833 //----------------------------------------------------------------------------
00834 
00847 static int dnxPostNewJob(DnxJobList * joblist, unsigned long serial, DnxJobData * jdp, nebstruct_service_check_data * ds, DnxNodeRequest * pNode)
00848 {
00849    DnxNewJob Job;
00850    int ret;
00851 
00852    assert(ds);
00853    assert(ds->command_line);
00854 
00855    // fill-in the job structure with the necessary information
00856    dnxMakeXID(&Job.xid, DNX_OBJ_JOB, serial, 0);
00857    Job.payload    = jdp;
00858    Job.cmd        = xstrdup(ds->command_line);
00859    Job.start_time = ds->start_time.tv_sec;
00860    Job.timeout    = ds->timeout;
00861    Job.expires    = Job.start_time + Job.timeout + 5;
00862    Job.pNode      = pNode;
00863 
00864    dnxDebug(2, "DnxNebMain: Posting Job [%lu]: %s.", serial, Job.cmd);
00865 
00866    // post to the Job Queue
00867    if ((ret = dnxJobListAdd(joblist, &Job)) != DNX_OK)
00868    {
00869       dnxStatsInc(0, JOBS_REJECTED_NO_SLOTS);
00870       dnxLog("Failed to post Job [%lu]; \"%s\": %d.",
00871             Job.xid.objSerial, Job.cmd, ret);
00872    }
00873    else
00874    {
00875       dnxStatsInc(0, JOBS_HANDLED);
00876       dnxAuditJob(&Job, "ASSIGN");
00877    }
00878    return ret;
00879 }
00880 
00881 //----------------------------------------------------------------------------
00882 
00892 static int ehSvcCheck(int event_type, void * data)
00893 {
00894    static unsigned long serial = 0; // the number of service checks processed
00895 
00896    nebstruct_service_check_data * svcdata = (nebstruct_service_check_data *)data;
00897    DnxNodeRequest * pNode;
00898    DnxJobData * jdp;
00899    int ret;
00900 
00901    if (event_type != NEBCALLBACK_SERVICE_CHECK_DATA)
00902       return OK;
00903 
00904    if (svcdata == 0)
00905    {
00906       dnxLog("Service handler received NULL service data structure.");
00907       return ERROR;  // shouldn't happen - internal Nagios error
00908    }
00909 
00910    if (svcdata->type != NEBTYPE_SERVICECHECK_INITIATE)
00911       return OK;  // ignore non-initiate service checks
00912 
00913    // check for local execution pattern on command line
00914    if (cfg.localCheckPattern && regexec(&regEx, svcdata->command_line, 0, 0, 0) == 0)
00915    {
00916       dnxDebug(1, "Service will execute locally: %s.", svcdata->command_line);
00917       return OK;     // tell nagios execute locally
00918    }
00919 
00920    dnxDebug(3, "ehSvcCheck: Received Job [%lu] at %lu (%lu).",
00921          serial, (unsigned long)time(0),
00922          (unsigned long)svcdata->start_time.tv_sec);
00923 
00924    if ((ret = dnxGetNodeRequest(registrar, &pNode)) != DNX_OK)
00925    {
00926       dnxDebug(3, "ehSvcCheck: No worker nodes requests available: %s.",dnxErrorString(ret));
00927       return OK;     // tell nagios execute locally
00928    }
00929 
00930    // allocate and populate a new job payload object
00931    if ((jdp = (DnxJobData *)xmalloc(sizeof *jdp)) == 0)
00932    {
00933       dnxDebug(1, "ehSvcCheck: Out of memory!");
00934       return OK;
00935    }
00936    memset(jdp, 0, sizeof *jdp);
00937    jdp->svc = (service *)svcdata->OBJECT_FIELD_NAME;
00938 
00939    assert(jdp->svc);
00940 
00941 #if CURRENT_NEB_API_VERSION == 3
00942    {
00943       // a nagios 3.x global variable
00944       extern check_result check_result_info;
00945 
00948       jdp->chkopts    = check_result_info.check_options;
00949       jdp->schedule   = check_result_info.scheduled_check;
00950       jdp->reschedule = check_result_info.reschedule_check;
00951    }
00952 #endif
00953 
00954    if ((ret = dnxPostNewJob(joblist, serial, jdp, svcdata, pNode)) != DNX_OK)
00955    {
00956       dnxLog("Unable to post job [%lu]: %s.", serial, dnxErrorString(ret));
00957       xfree(jdp);
00958       return OK;     // tell nagios execute locally
00959    }
00960 
00961    serial++;                           // bump serial number
00962    return NEBERROR_CALLBACKOVERRIDE;   // tell nagios we want it
00963 }
00964 
00965 //----------------------------------------------------------------------------
00966 
00967 // forward declaration due to circular reference
00968 static int ehProcessData(int event_type, void * data);
00969 
00974 static int dnxServerDeInit(void)
00975 {
00976    // deregister for all nagios events we previously registered for...
00977    neb_deregister_callback(NEBCALLBACK_PROCESS_DATA, ehProcessData);
00978    neb_deregister_callback(NEBCALLBACK_SERVICE_CHECK_DATA, ehSvcCheck);
00979 #if CURRENT_NEB_API_VERSION == 3 && defined(DIRECT_POST)
00980    neb_deregister_callback(NEBCALLBACK_TIMED_EVENT_DATA, ehTimedEvent);
00981 #endif
00982 
00983    dnxReleaseAgent();
00984 
00985    // ensure we don't destroy non-existent objects from here on out...
00986    if (registrar)
00987       dnxRegistrarDestroy(registrar);
00988 
00989    if (dispatcher)
00990       dnxDispatcherDestroy(dispatcher);
00991 
00992    if (collector)
00993       dnxCollectorDestroy(collector);
00994 
00995    if (joblist)
00996       dnxJobListDestroy(joblist);
00997 
00998    // it doesn't matter if we haven't initialized the
00999    // channel map - it can figure that out for itself
01000    dnxChanMapRelease();
01001 
01002    dnxStatsCleanup();
01003 
01004    return OK;
01005 }
01006 
01007 //----------------------------------------------------------------------------
01008 
01013 static int dnxServerInit(void)
01014 {
01015    int ret, joblistsz;
01016 
01017    // clear globals so we know what to "undo" as we back out
01018    joblist = 0;
01019    registrar = 0;
01020    dispatcher = 0;
01021    collector = 0;
01022 
01023    if ((ret = dnxChanMapInit(0)) != 0)
01024    {
01025       dnxLog("Failed to initialize channel map: %s.", dnxErrorString(ret));
01026       return ret;
01027    }
01028 
01029    joblistsz = dnxCalculateJobListSize();
01030 
01031    dnxLog("Allocating %d service request slots in the DNX job list.", joblistsz);
01032 
01033    if ((ret = dnxJobListCreate(joblistsz, &joblist)) != 0)
01034    {
01035       dnxLog("Failed to initialize DNX job list with %d slots.", joblistsz);
01036       return ret;
01037    }
01038 
01039    // create and configure collector
01040    if ((ret = dnxCollectorCreate("Collect", cfg.collectorUrl,
01041                joblist, &collector)) != 0)
01042       return ret;
01043 
01044    // create and configure dispatcher
01045    if ((ret = dnxDispatcherCreate("Dispatch", cfg.dispatcherUrl,
01046                joblist, &dispatcher)) != 0)
01047       return ret;
01048 
01049    // create worker node registrar
01050    if ((ret = dnxRegistrarCreate(joblistsz * 2,
01051                dnxDispatcherGetChannel(dispatcher), &registrar)) != 0)
01052       return ret;
01053 
01054    // initialize server management agent
01055    if ((ret = dnxInitAgent(cfg.agentUrl, parser)) != 0)
01056       return ret;
01057 
01058 #if CURRENT_NEB_API_VERSION == 3 && defined(DIRECT_POST)
01059 
01060    // register for timed event to piggy-back on reaper thread
01061    neb_register_callback(NEBCALLBACK_TIMED_EVENT_DATA, myHandle, 0, ehTimedEvent);
01062    dnxLog("Registered for TIMEDEVENT_EXECUTE event.");
01063 
01064 #endif
01065 
01066    // registration for this event starts everything rolling
01067    neb_register_callback(NEBCALLBACK_SERVICE_CHECK_DATA, myHandle, 0, ehSvcCheck);
01068 
01069    dnxLog("Registered for SERVICE_CHECK_DATA event.");
01070    dnxLog("Server initialization completed.");
01071 
01072    return 0;
01073 }
01074 
01075 //----------------------------------------------------------------------------
01076 
01083 static int launchScript(char * script)
01084 {
01085    int ret;
01086 
01087    assert(script);
01088 
01089    // exec the script - system waits till child completes
01090    if ((ret = system(script)) == -1)
01091    {
01092       dnxLog("Failed to exec script: %s.", strerror(errno));
01093       ret = DNX_ERR_INVALID;
01094    }
01095    else
01096       ret = DNX_OK;
01097 
01098    dnxLog("Sync script returned %d.", WEXITSTATUS(ret));
01099 
01100    return ret;
01101 }
01102 
01103 //----------------------------------------------------------------------------
01104 
01113 static int ehProcessData(int event_type, void * data)
01114 {
01115    nebstruct_process_data *procdata = (nebstruct_process_data *)data;
01116 
01117    // validate our event type - ignore wrong event type
01118    assert(event_type == NEBCALLBACK_PROCESS_DATA);
01119    if (event_type != NEBCALLBACK_PROCESS_DATA)
01120       return OK;
01121 
01122    // sanity-check our data structure - should never happen
01123    assert(procdata);
01124    if (!procdata)
01125    {
01126       dnxLog("Startup handler received NULL process data structure.");
01127       return ERROR;
01128    }
01129 
01130    // look for process event loop start event
01131    if (procdata->type == NEBTYPE_PROCESS_EVENTLOOPSTART)
01132    {
01133       dnxDebug(2, "Startup handler received PROCESS_EVENTLOOPSTART event.");
01134 
01135       // execute sync script, if defined
01136       if (cfg.syncScript)
01137       {
01138          dnxLog("Startup handler executing plugin sync script: %s.", cfg.syncScript);
01139 
01140          // NB: This halts Nagios execution until the script exits...
01141          launchScript(cfg.syncScript);
01142       }
01143 
01144       // if server init fails, do server shutdown
01145       if (dnxServerInit() != 0)
01146          dnxServerDeInit();
01147    }
01148    return OK;
01149 }
01150 
01151 /*--------------------------------------------------------------------------
01152                                  INTERFACE
01153   --------------------------------------------------------------------------*/
01154 
01155 char * versionText(void)
01156 {
01157    static char versionText[] = 
01158       "\n"
01159       "  dnxServer Version " VERSION ", Built " __DATE__ " at " __TIME__ ".\n"
01160       "  Distributed Nagios eXecutor (DNX) Integrated Plugin/Server Library.\n"
01161       "  Please report bugs to <" PACKAGE_BUGREPORT ">.\n"
01162       "\n"
01163       "  Default configuration:\n"
01164       "    Default config file: "      DNX_DEFAULT_SERVER_CONFIG_FILE "\n"
01165       "    Default log file: "         DNX_DEFAULT_LOG "\n"
01166       "    Default debug log file: "   DNX_DEFAULT_DBGLOG "\n"
01167 //    "    Compile flags: "            COMPILE_FLAGS "\n"
01168 #if DEBUG_HEAP
01169       "    Debug heap is ENABLED.\n"
01170 #endif
01171 #if DEBUG_LOCKS
01172       "    Debug locks are ENABLED.\n"
01173 #endif
01174       ;
01175    return xstrdup(versionText);
01176 }
01177 
01178 void dnxJobCleanup(DnxNewJob * pJob)
01179 {
01180    if (pJob)
01181    {
01182       xfree(pJob->cmd);
01183       xfree(pJob->payload);
01184       xfree(pJob->pNode);
01185    }
01186 }
01187 
01188 //----------------------------------------------------------------------------
01189 
01190 int dnxAuditJob(DnxNewJob * pJob, char * action)
01191 {
01192    if (cfg.auditFilePath)
01193       return dnxAudit(
01194             "%s: Job %lu: Worker %s-%lx: %s",
01195                   action, pJob->xid.objSerial,
01196                   pJob->pNode->addrstr, 
01197                   pJob->pNode->xid.objSlot, pJob->cmd);
01198 
01199    return DNX_OK;
01200 }
01201 
01202 //----------------------------------------------------------------------------
01203 
/** NEB module de-initialization entry point, called by Nagios on unload.
 *
 * Shuts down the DNX server, releases the configuration and calls
 * xheapchk (presumably a debug-heap consistency/leak check) before logging
 * that shutdown has completed.
 *
 * @param[in] flags - de-init flags supplied by Nagios; unused.
 * @param[in] reason - de-init reason code supplied by Nagios; unused.
 *
 * @return Always 0.
 */
int nebmodule_deinit(int flags, int reason)
{
   // required by the NEB interface, but not needed here
   (void)flags;
   (void)reason;

   dnxLog("-------- DNX Server Module Shutdown Initiated --------");

   dnxServerDeInit();
   releaseConfig();
   xheapchk();

   dnxLog("-------- DNX Server Module Shutdown Completed --------");

   return 0;
}
01224 
01225 //----------------------------------------------------------------------------
01226 
01239 int nebmodule_init(int flags, char * args, nebmodule * handle)
01240 {
01241    int ret;
01242 
01243    myHandle = handle;
01244 
01245    // module args string should contain a fully-qualified config file path
01246    if (!args || !*args)
01247       args = DNX_DEFAULT_SERVER_CONFIG_FILE;
01248 
01249    if ((ret = initConfig(args)) != 0)
01250       return ERROR;
01251 
01252    // set configured debug level and syslog log facility code
01253    dnxLogInit(cfg.logFilePath, cfg.debugFilePath, cfg.auditFilePath,
01254          &cfg.debugLevel);
01255 
01256    dnxLog("-------- DNX Server Module Version %s Startup --------", VERSION);
01257    dnxLog("Copyright (c) 2006-2010 Intellectual Reserve. All rights reserved.");
01258    dnxLog("Configuration file: %s.", args);
01259    dnxLog("Dispatcher: %s.", cfg.dispatcherUrl);
01260    dnxLog("Collector: %s.", cfg.collectorUrl);
01261    dnxLog("Agent: %s.", cfg.agentUrl);
01262    if (cfg.debugFilePath && cfg.debugLevel != 0)
01263    {
01264       dnxLog("Debug logging enabled at level %d to %s.",
01265             cfg.debugLevel, cfg.debugFilePath);
01266 #if DEBUG_HEAP
01267       dnxLog("Debug heap is enabled.");
01268 #endif
01269 #if DEBUG_LOCKS
01270       dnxLog("Debug locks are enabled.");
01271 #endif
01272    }
01273    if (cfg.auditFilePath)
01274       dnxLog("Auditing enabled to %s.", cfg.auditFilePath);
01275 
01276 #if DEBUG_HEAP
01277       dnxLog("Debug heap is enabled.");
01278 #endif
01279 #if DEBUG_LOCKS
01280       dnxLog("Debug locks are enabled.");
01281 #endif
01282 
01283    // subscribe to PROCESS_DATA call-backs in order to defer initialization
01284    //    until after Nagios validates its configuration and environment.
01285    if ((ret = neb_register_callback(NEBCALLBACK_PROCESS_DATA,
01286          myHandle, 0, ehProcessData)) != OK)
01287    {
01288       dnxLog("PROCESS_DATA event registration failed: %s.", dnxErrorString(ret));
01289       releaseConfig();
01290       return ERROR;
01291    }
01292    start_time = time(0);
01293 
01294    dnxLog("-------- DNX Server Module Startup Complete --------");
01295 
01296    return OK;
01297 }
01298 
01299 

Generated on Tue Apr 13 15:48:07 2010 for DNX by  doxygen 1.5.6