dnxNebMain.c

Go to the documentation of this file.
00001 /*--------------------------------------------------------------------------
00002 
00003    Copyright (c) 2006-2010, Intellectual Reserve, Inc. All rights reserved.
00004 
00005    This program is free software; you can redistribute it and/or modify
00006    it under the terms of the GNU General Public License version 2 as
00007    published by the Free Software Foundation.
00008 
00009    This program is distributed in the hope that it will be useful,
00010    but WITHOUT ANY WARRANTY; without even the implied warranty of
00011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012    GNU General Public License for more details.
00013 
00014    You should have received a copy of the GNU General Public License
00015    along with this program; if not, write to the Free Software
00016    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 
00018   --------------------------------------------------------------------------*/
00019 
00043 #ifdef HAVE_CONFIG_H
00044 # include "config.h"
00045 #endif
00046 
00047 #include "dnxNebMain.h"
00048 
00049 #include "dnxCfgParser.h"
00050 #include "dnxError.h"
00051 #include "dnxDebug.h"
00052 
00053 #ifndef NSCORE
00054 # define NSCORE
00055 #endif
00056 #include "nagios.h"
00057 #include "objects.h"    // for nagios service data type
00058 #include "nebmodules.h"
00059 #include "nebstructs.h"
00060 #include "nebcallbacks.h"
00061 #include "neberrors.h"
00062 #include "broker.h"
00063 
00064 #include <unistd.h>
00065 #include <signal.h>
00066 #include <fcntl.h>
00067 #include <sys/stat.h>
00068 #include <sys/types.h>
00069 #include <sys/time.h>
00070 #include <sys/wait.h>
00071 #include <regex.h>
00072 
00073 #ifndef VERSION
00074 # define VERSION    "<unknown>"
00075 #endif
00076 
00077 #ifndef SYSCONFDIR
00078 # define SYSCONFDIR "/etc"
00079 #endif
00080 
00081 #ifndef SYSLOGDIR
00082 # define SYSLOGDIR  "/var/log"
00083 #endif
00084 
00085 #ifndef LIBEXECDIR
00086 # define LIBEXECDIR "/usr/libexec"
00087 #endif
00088 
00089 #if CURRENT_NEB_API_VERSION == 2
00090 # define OBJECT_FIELD_NAME object
00091 #elif CURRENT_NEB_API_VERSION == 3
00092 # define OBJECT_FIELD_NAME object_ptr
00093 #else
00094 # error Unsupported NEB API version.
00095 #endif
00096 
00097 #define elemcount(x) (sizeof(x)/sizeof(*(x)))
00098 
00099 #define DNX_DEFAULT_SERVER_CONFIG_FILE SYSCONFDIR "/dnxServer.cfg"
00100 #define DNX_DEFAULT_LOG                SYSLOGDIR  "/dnxsrv.log"
00101 #define DNX_DEFAULT_DBGLOG             SYSLOGDIR  "/dnxsrv.dbg.log"
00102 #define DNX_DEFAULT_SERVER             LIBEXECDIR "/dnxServer"
00103 
00105 NEB_API_VERSION(CURRENT_NEB_API_VERSION);
00106 
/** Configuration values for the DNX NEB module.
 *
 * Each member is bound to one configuration file variable by the
 * dictionary built in initConfig.
 */
typedef struct DnxCfgData
{
   char *serverPath;          //!< "serverPath": file system path of the DNX server binary.
   char *localCheckPattern;   //!< "localCheckPattern": regex for checks that stay local.
   char *syncScript;          //!< "syncScript": file system path of the sync script.
   char *logFilePath;         //!< "pluginLogFile": file system path of the log file.
   char *debugFilePath;       //!< "pluginDebugFile": file system path of the debug log.
   char *auditFilePath;       //!< "pluginAuditFile": file system path of the audit log.
   unsigned debugLevel;       //!< "pluginDebugLevel": debug verbosity.
} DnxCfgData;
00118 
00119 // module static data
00120 static char * cfgfile;        
00121 static DnxCfgData cfg;        
00122 static DnxCfgParser * parser; 
00123 static time_t start_time;     
00124 static void * myHandle;       
00125 static regex_t regEx;         
00126 static pid_t serverpid;       
00127 static int outdatafd;         
00128 static int inackfd;           
00129 static int indatafd;          
00130 static pthread_t rlthread;    
00131 
00132 /*--------------------------------------------------------------------------
00133                               IMPLEMENTATION
00134   --------------------------------------------------------------------------*/
00135 
00137 static void releaseConfig(void)
00138 {
00139    if (cfg.localCheckPattern)
00140       regfree(&regEx);
00141 
00142    dnxCfgParserDestroy(parser);
00143 }
00144 
00145 //----------------------------------------------------------------------------
00146 
00159 static int validateCfg(DnxCfgDict * dict, void ** vptrs, void * passthru)
00160 {
00161    regex_t * rep = (regex_t *)passthru;
00162    int err, ret = DNX_ERR_INVALID;
00163    DnxCfgData cfg;
00164    struct stat statbuf;
00165 
00166    assert(dict && vptrs && passthru);
00167 
00168    // setup data structure so we can use the same functionality we had before
00169    cfg.serverPath         = (char *)vptrs[ 0];
00170    cfg.localCheckPattern  = (char *)vptrs[ 1];
00171    cfg.syncScript         = (char *)vptrs[ 2];
00172    cfg.logFilePath        = (char *)vptrs[ 3];
00173    cfg.debugFilePath      = (char *)vptrs[ 4];
00174    cfg.debugLevel         = (unsigned)(intptr_t)vptrs[ 5];
00175    cfg.auditFilePath      = (char *)vptrs[ 6];
00176 
00177    // validate configuration items in context
00178    if (!cfg.serverPath || stat(cfg.serverPath, &statbuf) < 0
00179          || !S_ISREG(statbuf.st_mode))
00180       dnxLog("config: server not specified or invalid (\"%s\").",
00181              cfg.serverPath);
00182    else if (cfg.localCheckPattern && (err = regcomp(rep,
00183          cfg.localCheckPattern, REG_EXTENDED | REG_NOSUB)) != 0)
00184    {
00185       char buffer[128];
00186       regerror(err, rep, buffer, sizeof buffer);
00187       dnxLog("config: Failed to compile localCheckPattern (\"%s\"): %s.",
00188              cfg.localCheckPattern, buffer);
00189       regfree(rep);
00190    }
00191    else
00192       ret = 0;
00193 
00194    return ret;
00195 }
00196 
00197 //----------------------------------------------------------------------------
00198 
00205 static int initConfig(char * cfgfile)
00206 {
00207    DnxCfgDict dict[] =
00208    {  // Do NOT change the order, unless you know what you're doing!
00209       { "serverPath",         DNX_CFG_FSPATH,   &cfg.serverPath         },
00210       { "localCheckPattern",  DNX_CFG_STRING,   &cfg.localCheckPattern  },
00211       { "syncScript",         DNX_CFG_FSPATH,   &cfg.syncScript         },
00212       { "pluginLogFile",      DNX_CFG_FSPATH,   &cfg.logFilePath        },
00213       { "pluginDebugFile",    DNX_CFG_FSPATH,   &cfg.debugFilePath      },
00214       { "pluginDebugLevel",   DNX_CFG_UNSIGNED, &cfg.debugLevel         },
00215       { "pluginAuditFile",    DNX_CFG_FSPATH,   &cfg.auditFilePath      },
00216       { 0 },
00217    };
00218    char cfgdefs[] =
00219       "serverPath = " DNX_DEFAULT_SERVER "\n"
00220       "pluginLogFile = " DNX_DEFAULT_LOG "\n"
00221       "pluginDebugFile = " DNX_DEFAULT_DBGLOG "\n";
00222 
00223    int ret;
00224    regex_t re;
00225 
00226    // clear the regex string, as we may write into it
00227    memset(&re, 0, sizeof re);
00228 
00229    // create global configuration parser object
00230    if ((ret = dnxCfgParserCreate(cfgdefs, cfgfile, 0, dict,
00231          validateCfg, &parser)) != 0)
00232       return ret;
00233 
00234    // parse configuration file; pass defaults
00235    if ((ret = dnxCfgParserParse(parser, &re)) == 0)
00236       regEx = re;
00237    else
00238       dnxCfgParserDestroy(parser);
00239 
00240    return ret;
00241 }
00242 
00243 //----------------------------------------------------------------------------
00244 
00256 static int nagiosGetServiceCount(void)
00257 {
00258    extern service * service_list;      // the global nagios service list
00259 
00260    service * temp_service;
00261    int total_services = 0;
00262 
00263    // walk the service list, count the nodes
00264    for (temp_service = service_list; temp_service;
00265          temp_service = temp_service->next)
00266       total_services++;
00267 
00268    return total_services;
00269 }
00270 
00271 #if CURRENT_NEB_API_VERSION == 2
00272 
00273 //----------------------------------------------------------------------------
00274 
00287 static int nagios2xPostResult(service * svc, time_t start_time,
00288       int early_timeout, int res_code, char * res_data)
00289 {
00290    extern circular_buffer service_result_buffer;
00291    int check_result_buffer_slots = 4096;
00292 
00293    service_message * new_message;
00294 
00295    // note that we're using malloc, not xmalloc - nagios takes ownership
00296    if ((new_message = (service_message *)malloc(sizeof *new_message)) == 0)
00297       return DNX_ERR_MEMORY;
00298 
00299    gettimeofday(&new_message->finish_time, 0);
00300    strncpy(new_message->host_name, svc->host_name, 
00301          sizeof(new_message->host_name) - 1);
00302    new_message->host_name[sizeof(new_message->host_name) - 1] = 0;
00303    strncpy(new_message->description, svc->description,
00304          sizeof(new_message->description) - 1);
00305    new_message->description[sizeof(new_message->description) - 1] = 0;
00306    new_message->return_code = res_code;
00307    new_message->exited_ok = TRUE;
00308    new_message->check_type = SERVICE_CHECK_ACTIVE;
00309    new_message->parallelized = svc->parallelize;
00310    new_message->start_time.tv_sec = start_time;
00311    new_message->start_time.tv_usec = 0L;
00312    new_message->early_timeout = early_timeout;
00313    strncpy(new_message->output, res_data, sizeof(new_message->output) - 1);
00314    new_message->output[sizeof(new_message->output) - 1] = 0;
00315 
00316    pthread_mutex_lock(&service_result_buffer.buffer_lock);
00317 
00318    // handle overflow conditions
00319    if (service_result_buffer.items == check_result_buffer_slots)
00320    {
00321       service_result_buffer.overflow++;
00322       service_result_buffer.tail = (service_result_buffer.tail + 1)
00323             % check_result_buffer_slots;
00324    }
00325 
00326    // save the data to the buffer
00327    ((service_message **)service_result_buffer.buffer)
00328          [service_result_buffer.head] = new_message;
00329 
00330    // increment the head counter and items
00331    service_result_buffer.head = (service_result_buffer.head + 1)
00332          % check_result_buffer_slots;
00333    if (service_result_buffer.items < check_result_buffer_slots)
00334       service_result_buffer.items++;
00335    if (service_result_buffer.items > service_result_buffer.high)
00336       service_result_buffer.high = service_result_buffer.items;
00337 
00338    pthread_mutex_unlock(&service_result_buffer.buffer_lock);
00339 
00340    return 0;
00341 }
00342 
00343 #elif CURRENT_NEB_API_VERSION == 3
00344 
00345 // comment out this line to post to N3 filesystem results queue
00346 #define DIRECT_POST
00347 
00348 #ifdef DIRECT_POST
00349 
00350 // a nagios core global (It shouldn't be global, but we're glad it is!)
00351 extern check_result * check_result_list;
00352 
00353 static check_result * dnxResultList = 0;
00354 static pthread_mutex_t dnxResultListMutex = PTHREAD_MUTEX_INITIALIZER;
00355 
00356 //----------------------------------------------------------------------------
00357 
/** Compare two timeval values.
 *
 * @param[in] tv1 - the left-hand time value.
 * @param[in] tv2 - the right-hand time value.
 *
 * @return The difference in seconds (tv1 - tv2) when that is non-zero,
 *    otherwise the difference in microseconds; negative, zero or positive
 *    as tv1 is earlier than, equal to or later than tv2.
 */
static long dnxTimeCompare(struct timeval * tv1, struct timeval * tv2)
{
   long delta = (long)(tv1->tv_sec - tv2->tv_sec);

   // the microseconds only matter when the seconds are equal
   if (delta == 0)
      delta = (long)(tv1->tv_usec - tv2->tv_usec);

   return delta;
}
00374 
00375 //----------------------------------------------------------------------------
00376 
00384 static void dnxAddResultToList(check_result * newcr)
00385 {
00386    check_result ** curp;
00387 
00388    assert(newcr);
00389 
00390    DNX_PT_MUTEX_LOCK(&dnxResultListMutex);
00391 
00392    for (curp = &dnxResultList; *curp; curp = &(*curp)->next)
00393       if (dnxTimeCompare(&(*curp)->finish_time, &newcr->finish_time) >= 0)
00394          break;
00395 
00396    newcr->next = *curp;
00397    *curp = newcr;
00398 
00399    DNX_PT_MUTEX_UNLOCK(&dnxResultListMutex);
00400 }
00401 
00402 //----------------------------------------------------------------------------
00403 
00426 static int nagios3xPostResult(service * svc, int check_type,
00427       int check_options, int schedule, int reschedule, 
00428       time_t start_time, time_t finish_time, int early_timeout,
00429       int exited_ok, int res_code, char * res_data)
00430 {
00431    check_result * newcr;
00432 
00433    // use malloc here, not xmalloc - nagios will free it
00434    if ((newcr = (check_result *)malloc(sizeof *newcr)) == 0)
00435       return DNX_ERR_MEMORY;
00436 
00437    // initialize and fill with result info
00438    newcr->object_check_type = SERVICE_CHECK;
00439    newcr->host_name = strdup(svc->host_name);
00440    newcr->service_description = strdup(svc->description);
00441    newcr->check_type = check_type;
00442    newcr->check_options = check_options;
00443    newcr->scheduled_check = schedule;
00444    newcr->reschedule_check = reschedule;
00445    newcr->output_file = 0;
00446    newcr->output_file_fd = -1;
00447    newcr->latency = svc->latency;
00448    newcr->start_time.tv_sec = start_time;
00449    newcr->start_time.tv_usec = 0;
00450    newcr->finish_time.tv_sec = finish_time;
00451    newcr->finish_time.tv_usec = 0;
00452    newcr->early_timeout = early_timeout;
00453    newcr->exited_ok = exited_ok;
00454    newcr->return_code = res_code;
00455    newcr->output = strdup(res_data);
00456    newcr->next = 0;
00457 
00458    dnxAddResultToList(newcr);
00459 
00460    return 0;
00461 }
00462 
00463 //----------------------------------------------------------------------------
00464 
00471 static check_result * dnxMergeLists(check_result * lista, check_result * listb)
00472 {
00473    check_result * result = 0;
00474 
00475    check_result ** iter;
00476    for (iter = &result; lista && listb; iter = &(*iter)->next)
00477       if (dnxTimeCompare(&lista->finish_time, &listb->finish_time) <= 0)
00478       { *iter = lista; lista = lista->next; }
00479       else
00480       {*iter = listb; listb = listb->next; }
00481 
00482    *iter = lista? lista: listb;
00483 
00484    return result;
00485 }
00486 
00487 //----------------------------------------------------------------------------
00488 
00495 static void dnxMoveResultsToNagios(void)
00496 {
00497    check_result * local;
00498 
00499    // safely save off currently local list
00500    DNX_PT_MUTEX_LOCK(&dnxResultListMutex);
00501    local = dnxResultList;
00502    dnxResultList = 0;
00503    DNX_PT_MUTEX_UNLOCK(&dnxResultListMutex);
00504 
00505    // merge local into check_result_list, store in check_result_list
00506    check_result_list = dnxMergeLists(local, check_result_list);
00507 }
00508 
00509 //----------------------------------------------------------------------------
00510 
00521 static int ehTimedEvent(int event_type, void * data)
00522 {
00523    nebstruct_timed_event_data * ted = (nebstruct_timed_event_data *)data;
00524    timed_event * event = (timed_event*)data;
00525    int ret;
00526 
00527    // sanity checks
00528    if (event_type != NEBCALLBACK_TIMED_EVENT_DATA || ted == 0)
00529       return ERROR;
00530 
00531    // we only care about REAPER events
00532    if (ted->event_type != EVENT_CHECK_REAPER)
00533       return OK;
00534 
00535    dnxDebug(3, "Reaper handler called.");
00536    
00537    dnxMoveResultsToNagios();
00538 
00539    return OK;
00540 }
00541 
00542 #else // !DIRECT_POST
00543 
00544 //----------------------------------------------------------------------------
00545 
/** Move a finished check result file into the nagios check result queue.
 *
 * A uniquely named placeholder is created in check_result_path, the result
 * file is renamed over it, and an ".ok" marker file is created beside it.
 * If any step fails the original result file is deleted, and a placeholder
 * that was already created is removed as well.
 *
 * @param[in] resfile - path of the completed check result file.
 */
static void nagios3xMoveCheckResultToQueue(char * resfile)
{
   // a nagios 3x global variable
   extern char * check_result_path;

   char * outfile;
   int ret = -1;

   // create a safe temp file
   // path length + 'cXXXXXX' (7) + '/' (1) + '.ok' (3) + null (1)
   if ((outfile = (char *)xmalloc(strlen(check_result_path) 
         + 7 + 1 + 3 + 1)) != 0)
   {
      int fd;

      sprintf(outfile, "%s/cXXXXXX", check_result_path);

      // zero is a valid descriptor, so test for >= 0
      if ((fd = mkstemp(outfile)) >= 0)
      {
         // the placeholder keeps the name reserved once created; close it
         // before the rename on every platform (Cygwin cannot rename open
         // files - gives Permission Denied)
         close(fd);

         // move the original file using a nagios core function
         ret = my_rename(resfile, outfile);

         if (ret == 0)
         {
            // create an ok-to-go indicator file
            strcat(outfile, ".ok");
            if ((fd = open(outfile, O_CREAT|O_WRONLY|O_TRUNC,
                  S_IRUSR|S_IWUSR)) >= 0)
               close(fd);
         }
         else
         {
            // rename failed: do not flag an empty placeholder as ready
            unlink(outfile);
         }
      }

      xfree(outfile);
   }

   // delete the original file if allocation failed or it couldn't be moved
   if (ret != 0)
      unlink(resfile);
}
00601 
00602 //----------------------------------------------------------------------------
00603 
00641 static int nagios3xPostResult(service * svc, int check_type,
00642       int check_options, int schedule, int reschedule, 
00643       time_t start_time, time_t finish_time, int early_timeout,
00644       int exited_ok, int res_code, char * res_data)
00645 {
00646    char * filename;
00647    FILE * fp = 0;
00648    int fd;
00649 
00650    // open a file in the check results path for storing check results
00651    // path length + 'checkXXXXXX' (11) + '/' (1) + '.ok' (3) + null (1)
00652    if ((filename = (char *)xmalloc(strlen(temp_path) 
00653          + 11 + 1 + 3 + 1)) == 0)
00654       return DNX_ERR_MEMORY;
00655 
00656    sprintf(filename, "%s/checkXXXXXX", temp_path);
00657    if ((fd = mkstemp(filename)) < 0
00658          || (fp = fdopen(fd, "w")) == 0)
00659    {
00660       xfree(filename);
00661       if (fd >= 0) close(fd);
00662       return DNX_ERR_OPEN;
00663    }
00664 
00665    // write check results to the queue file
00666    fprintf(fp, "### Active Check Result File ###\n");
00667    fprintf(fp, "file_time=%lu\n\n", (unsigned long)start_time);
00668    fprintf(fp, "### Nagios Service Check Result ###\n");
00669    fprintf(fp, "# Time: %s", ctime(&start_time));
00670    fprintf(fp, "host_name=%s\n", svc->host_name);
00671    fprintf(fp, "service_description=%s\n", svc->description);
00672    fprintf(fp, "check_type=%d\n", check_type);
00673    fprintf(fp, "check_options=%d\n", check_options);
00674    fprintf(fp, "scheduled_check=%d\n", schedule);
00675    fprintf(fp, "reschedule_check=%d\n", reschedule);
00676    fprintf(fp, "latency=%f\n", svc->latency);
00677    fprintf(fp, "start_time=%lu.0\n", (unsigned long)start_time);
00678    fprintf(fp, "finish_time=%lu.%lu\n", (unsigned long)finish_time);
00679    fprintf(fp, "early_timeout=%d\n", early_timeout);
00680    fprintf(fp, "exited_ok=%d\n", exited_ok);
00681    fprintf(fp, "return_code=%d\n", res_code);
00682    fprintf(fp, "output=%s\n", res_data);
00683 
00684    fclose(fp);
00685 
00686    nagios3xMoveCheckResultToQueue(filename);
00687 
00688    xfree(filename);
00689 
00690    return 0;
00691 }
00692 
00693 #endif // ?DIRECT_POST
00694 
00695 #else
00696 # error Unsupported NEB API version.
00697 #endif
00698 
00699 //----------------------------------------------------------------------------
00700 
00701 static int dnxPostResult(void * data, time_t start_time, unsigned delta,
00702       int early_timeout, int res_code, char * res_data)
00703 {
00704    DnxJobTransfer * job = (DnxJobTransfer *)data;
00705    service * svc = (service *)job->svc;
00706    int ret;
00707 
00708    assert(job);
00709 
00712    if (early_timeout)
00713       res_code = STATE_UNKNOWN;
00714 
00715 #if CURRENT_NEB_API_VERSION == 2
00716 
00717    ret = nagios2xPostResult(svc, start_time, early_timeout,
00718          res_code, res_data);
00719 
00720 #elif CURRENT_NEB_API_VERSION == 3
00721 
00725    ret = nagios3xPostResult(svc, SERVICE_CHECK_ACTIVE,
00726          job->chkopts, job->schedule, job->reschedule,
00727          start_time, start_time + delta, early_timeout,
00728          1, res_code, res_data);
00729 
00730 #else
00731 # error Unsupported NEB API version.
00732 #endif
00733 
00734    xfree(job); // free the originally allocated DnxJobTransfer structure
00735 
00736    return ret;
00737 }
00738 
00739 //----------------------------------------------------------------------------
00740 
00747 static int GetMsgHeader(DnxMsgHeader * phdr)
00748 {
00749    size_t toread = sizeof *phdr;
00750    char * bufptr = (char *)phdr;
00751    ssize_t bytesread;
00752 
00753    // read the message header from the plugin module
00754    while (toread > 0 && (bytesread = read(indatafd, bufptr, toread)) > 0)
00755    {
00756       toread -= bytesread;
00757       bufptr += bytesread;
00758    }
00759 
00760    // check for closed pipe
00761    if (bytesread == 0)
00762       return EPIPE;
00763 
00764    // check for error
00765    if (bytesread < 0)
00766    {
00767       int ret = errno;
00768 
00771       dnxDebug(1, "GetMsgHeader: read failed: %s.", strerror(ret));
00772       return ret;
00773    }
00774 
00775    // sanity check message format
00776    if (phdr->signature != DNX_MSG_SIGNATURE
00777          || phdr->structsz > DNX_MAX_XFER_SIZE)
00778    {
00779       dnxDebug(1, "GetMsgHeader: unsupported message format.");
00780       return DNX_ERR_UNSUPPORTED;
00781    }
00782 
00783    return 0;
00784 }
00785 
00786 //----------------------------------------------------------------------------
00787 
00794 static int ProcessResultTransferMsg(DnxResultTransfer * result)
00795 {
00796    size_t toread = result->hdr.structsz - sizeof result->hdr;
00797    char * bufptr = (char *)result + sizeof result->hdr;
00798    ssize_t bytesread;
00799    int ret;
00800 
00801    // read the rest of the result transfer structure
00802    while (toread > 0 && (bytesread = read(indatafd, bufptr, toread)) > 0)
00803    {
00804       bufptr += bytesread;
00805       toread -= bytesread;
00806    }
00807 
00808    // check for closed pipe
00809    if (bytesread == 0)
00810       return EPIPE;
00811 
00812    // check for error 
00813    if (bytesread < 0)
00814    {
00815       ret = errno;
00816 
00819       dnxDebug(1, "ProcessResultTransferMsg: read failed: %s.", strerror(ret));
00820       return ret;
00821    }
00822 
00823    // post result to Nagios
00824    if ((ret = dnxPostResult(result->payload, result->start_time, result->delta,
00825          result->early_timeout, result->res_code, result->res_data)) != 0)
00826       dnxLog("Failed to post result [%lu] to Nagios; \"%s\": %d.", 
00827             result->serial, result->res_data, ret);
00828 
00829    return ret;
00830 }
00831 
00832 //----------------------------------------------------------------------------
00833 
00842 static void * dnxResultsListener(void * arg) 
00843 {
00844    DnxResultTransfer * result = 0;
00845    size_t resultsz = 0;
00846    int terminating = 0;
00847 
00848    // ignore broken pipe signals to this thread
00849    signal(SIGPIPE, SIG_IGN);
00850    
00851    dnxLog("Results listener thread awaiting results...");
00852 
00853    while (!terminating)
00854    {
00855       int ret;
00856       DnxMsgHeader hdr;
00857       if ((ret = GetMsgHeader(&hdr)) == 0)
00858       {
00859          switch (hdr.msgtype)
00860          {
00861             case DNX_MSG_RESULT_TRANSFER:
00862                // resize result transfer buffer if necessary
00863                if (hdr.structsz > resultsz)
00864                {
00865                   DnxResultTransfer * tmp;
00866                   if ((tmp = (DnxResultTransfer *)xrealloc(result, hdr.structsz)) != 0)
00867                   {
00868                      result = tmp;
00869                      resultsz = hdr.structsz;
00870                   }
00871                }
00872 
00873                // copy header and read rest of job transfer message
00874                result->hdr = hdr;
00875                ret = ProcessResultTransferMsg(result);
00876                break;
00877          }
00878       }
00879       if (ret == EPIPE)
00880          terminating = 1;
00881    }
00882 
00883    dnxLog("Results listener thread terminating...");
00884 
00885    xfree(result);
00886    return 0;
00887 }
00888 
00889 //----------------------------------------------------------------------------
00890 
00897 static int GetAckNak(void)
00898 {
00899    int ret;
00900    size_t toread = sizeof ret;
00901    char * bufptr = (char *)&ret;
00902    ssize_t bytesread;
00903 
00904    while (toread > 0 && (bytesread = read(inackfd, bufptr, toread)) > 0)
00905    {
00906       bufptr += bytesread;
00907       toread -= bytesread;
00908    }
00909 
00910    // check for closed pipe
00911    if (bytesread == 0)
00912       return EPIPE;
00913 
00914    // check for error
00915    if (bytesread < 0)
00916    {
00917       ret = errno;
00918 
00921       dnxDebug(1, "GetAckNak: read failed: %s.", strerror(ret));
00922       return ret;
00923    }
00924 
00925    return ret;
00926 }
00927 
00928 //----------------------------------------------------------------------------
00929 
00939 static int AllocateNodeRequest(void)
00940 {
00941    DnxMsgHeader nodereq;
00942    size_t towrite = nodereq.structsz = sizeof nodereq;
00943    char * bufptr = (char *)&nodereq;
00944    ssize_t byteswritten;
00945    int ret;
00946 
00947    dnxDebug(3, "Allocating node request.");
00948 
00949    // allocate client node request - bail on EAGAIN - pipe full, no one's listening
00950    nodereq.signature = DNX_MSG_SIGNATURE;
00951    nodereq.msgtype = DNX_MSG_RESERVE_NODEREQ;
00952 
00953    while (towrite > 0 && (byteswritten = write(outdatafd, bufptr, towrite)) > 0)
00954    {
00955       bufptr += byteswritten;
00956       towrite -= byteswritten;
00957    }
00958 
00959    // check for closed pipe
00960    if (byteswritten == 0)
00961       return EPIPE;
00962 
00963    // check for error
00964    if (byteswritten < 0)
00965    {
00966       ret = errno;
00967 
00970       dnxDebug(1, "Alloc node request: write failed: %s.", strerror(ret));
00971       return ret;
00972    }
00973 
00974    return GetAckNak();
00975 }
00976 
00977 //----------------------------------------------------------------------------
00978 
00990 static int TransferJobToServer(DnxJobTransfer * job)
00991 {
00992    char * bufptr = (char *)job;
00993    size_t towrite = job->hdr.structsz;
00994    ssize_t byteswritten;
00995    int ret;
00996 
00997    assert(job);
00998 
00999    dnxDebug(3, "Transferring service check.");
01000 
01001    // write job to server - bail on EAGAIN - pipe full, no one's listening
01002    job->hdr.signature = DNX_MSG_SIGNATURE;
01003    job->hdr.msgtype = DNX_MSG_JOB_TRANSFER;
01004    while (towrite > 0 && (byteswritten = write(outdatafd, bufptr, towrite)) > 0)
01005    {
01006       bufptr += byteswritten;
01007       towrite -= byteswritten;
01008    }
01009 
01010    // check for closed socket
01011    if (byteswritten == 0)
01012       return EPIPE;
01013 
01014    // check for error
01015    if (byteswritten < 0)
01016    {
01017       ret = errno;
01018 
01021       dnxDebug(1, "Alloc node request: write failed: %s.", strerror(ret));
01022       return ret;
01023    }
01024 
01025    return GetAckNak();
01026 }
01027 
01028 //----------------------------------------------------------------------------
01029 
01044 static int dnxPostNewJob(unsigned long serial, nebstruct_service_check_data * ds)
01045 {
01046    service * svc = (service *)ds->OBJECT_FIELD_NAME;
01047    DnxJobTransfer * job;
01048    size_t jobsz;
01049    int ret;
01050 
01051    assert(ds);
01052    assert(svc);
01053    assert(ds->command_line);
01054 
01055    // tell server to allocate a node request
01056    if ((ret = AllocateNodeRequest()) != 0)
01057       return ret;
01058 
01059    // allocate a job transfer structure
01060    jobsz = sizeof(*job) + strlen(ds->command_line);
01061    if ((job = (DnxJobTransfer *)xmalloc(jobsz)) == 0)
01062       return DNX_ERR_MEMORY;
01063 
01064    job->hdr.structsz = jobsz;
01065    job->serial = serial;
01066    job->payload = job;  // track job transfer structure between job and result
01067    job->svc = svc;      // track service for use in result posting 
01068    job->start_time = ds->start_time.tv_sec;
01069    job->timeout = ds->timeout;
01070    job->expires = job->start_time + job->timeout + 5;
01071 
01072 #if CURRENT_NEB_API_VERSION == 3
01073    {
01074       // a nagios 3.x global variable
01075       extern check_result check_result_info;
01076 
01079       job->chkopts = check_result_info.check_options;
01080       job->schedule = check_result_info.scheduled_check;
01081       job->reschedule = check_result_info.reschedule_check;
01082    }
01083 #endif
01084 
01085    strcpy(job->cmd, ds->command_line);
01086 
01087    return TransferJobToServer(job);
01088 } 
01089 
01090 //----------------------------------------------------------------------------
01091 
01104 static int ehSvcCheck(int event_type, void * data)
01105 {
01106    static unsigned long serial = 0;
01107 
01108    nebstruct_service_check_data * svcdata = (nebstruct_service_check_data *)data;
01109    int ret;
01110 
01111    // shouldn't happen because we didn't sign up for others
01112    if (event_type != NEBCALLBACK_SERVICE_CHECK_DATA)
01113       return OK;
01114 
01115    // if this happens it's a bug in Nagios
01116    if (svcdata == 0)
01117    {
01118       dnxDebug(1, "Service handler received NULL service data structure.");
01119       return ERROR;  // shouldn't happen - internal Nagios error
01120    }
01121 
01122    // we only want the first BROKER event we get on each check
01123    if (svcdata->type != NEBTYPE_SERVICECHECK_INITIATE)
01124       return OK;     // ignore non-initiate service checks
01125 
01126    // every valid service check is assigned a unique serial number
01127    serial++;
01128 
01129    dnxDebug(3, "Service handler received service check [%lu] at %lu.",
01130          serial, (unsigned long)svcdata->start_time.tv_sec);
01131 
01132    // check for local execution pattern on command line
01133    if (cfg.localCheckPattern 
01134          && regexec(&regEx, svcdata->command_line, 0, 0, 0) == 0)
01135    {
01136       dnxDebug(1, "Regex filter. Service check [%lu] will execute locally: %s.", 
01137             serial, svcdata->command_line);
01138       return OK;
01139    }
01140 
01141    // try to post this job to the dnx server process
01142    if ((ret = dnxPostNewJob(serial, svcdata)) != DNX_OK)
01143    {
01144       dnxDebug(1, "Post failed: %s. Service check [%lu] will execute locally: %s.", 
01145             dnxErrorString(ret), serial, svcdata->command_line);
01146       return OK;
01147    }
01148 
01149    return NEBERROR_CALLBACKOVERRIDE;   // tell nagios we want it
01150 }
01151 
01152 //----------------------------------------------------------------------------
01153 
/** Work out how many slots the DNX job queue should have.
 *
 * @return The number of nagios services, or 100 (after logging a notice)
 *    when no services are defined.
 */
static int dnxCalculateJobListSize(void)
{
   int size = nagiosGetServiceCount();

   if (size >= 1)
      return size;

   // zero doesn't make sense - fall back to a fixed default
   size = 100;
   dnxLog("No Nagios services defined! "
          "Defaulting to %d slots in the DNX job queue.", size);

   return size;
}
01175 
01176 //----------------------------------------------------------------------------
01177 
01191 static int execServerProcess(void)
01192 {
01193 
01194 #define P0R 0     // Pipe 0 Read descriptor
01195 #define P0W 1     // Pipe 0 Write descriptor
01196 #define P1R 2     // Pipe 1 Read descriptor
01197 #define P1W 3     // Pipe 1 Write descriptor
01198 #define P2R 4     // Pipe 2 Read descriptor
01199 #define P2W 5     // Pipe 2 Write descriptor
01200 #define PMX 6     // Pipe Max desscriptors
01201 
01202    pid_t pid;
01203    int flags, fds[PMX];
01204 
01205    // open three IPC pipes and fork the process
01206    memset(fds, -1, sizeof fds);
01207    if (pipe(fds + P0R) < 0 || pipe(fds + P1R) < 0 
01208          || pipe(fds + P2R) < 0 || (pid = fork()) < 0)
01209    {
01210       int i, ret = errno;
01211 
01212       for (i = 0; i < elemcount(fds); i++)
01213          if (fds[i] >= 0) 
01214             close(fds[i]);
01215 
01216       return ret;
01217    }
01218 
01219    // *** zero is child process ***
01220 
01221    if (pid == 0)
01222    {
01223       int svclistsz = dnxCalculateJobListSize();
01224       char strsvclistsz[16];
01225 
01226       // create a string version of the service list size
01227       snprintf(strsvclistsz, sizeof strsvclistsz, "%d", svclistsz);
01228 
01229       close(fds[P0W]);     // job xfer    - server closes write end
01230       close(fds[P1R]);     // ack/nak     - server closes read end
01231       close(fds[P2R]);     // result xfer - server closes read end
01232       
01233       // dup open pipe descriptors onto well-known descriptors, if needed
01234       if (fds[P0R] != DNX_SRVFD)
01235       {
01236          dup2(fds[P0R], DNX_SRVFD);
01237          close(fds[P0R]); 
01238       }
01239       if (fds[P1W] != DNX_SAKFD)
01240       {
01241          dup2(fds[P1W], DNX_SAKFD);
01242          close(fds[P1W]);
01243       }
01244       if (fds[P2W] != DNX_PLGFD)
01245       {
01246          dup2(fds[P2W], DNX_PLGFD);
01247          close(fds[P2W]);
01248       }
01249 
01250       // exec DNX Server process, pass config args
01251       execl(cfg.serverPath, cfg.serverPath, "-c", cfgfile, 
01252             "-j", strsvclistsz, (char *)0);
01253 
01254       _exit(1);            // error in exec - kill child
01255 
01256       // *** NO RETURN FROM HERE ***
01257    }
01258 
01259    // *** otherwise we're still in the parent process ***
01260 
01261    close(fds[P0R]);        // job xfer    - plugin closes read end
01262    close(fds[P1W]);        // ack/nak     - plugin closes write end
01263    close(fds[P2W]);        // result xfer - plugin closes write end
01264 
01265    // save open pipe descriptors in global variables
01266    outdatafd = fds[P0W];
01267    inackfd   = fds[P1R];
01268    indatafd  = fds[P2R];
01269 
01270    // set non-block flag on plugin -> server write channel
01271    // if the pipe is full, we'll let Nagios handle the check
01272    if ((flags = fcntl(outdatafd, F_GETFL, 0)) >= 0)
01273    {
01274       flags |= O_NONBLOCK;
01275       fcntl(outdatafd, F_SETFL, flags);
01276    }
01277 
01278    // save server pid for wait call
01279    serverpid = pid;
01280 
01281    return 0;
01282 }
01283 
01284 //----------------------------------------------------------------------------
01285 
01290 static int terminateServerProcess(void)
01291 {
01292    int ret = 0;
01293    if (serverpid > 0)
01294    {
01295       kill(serverpid, SIGTERM);  // sigterm stops agent thread
01296       close(outdatafd);          // closing pipes stop request handler
01297       close(inackfd);
01298       if (waitpid(serverpid, 0, 0) < 0)
01299          ret = errno;
01300    }
01301    return ret;
01302 }
01303 
01304 //----------------------------------------------------------------------------
01305 
01306 // forward declaration due to circular reference
01307 static int ehProcessData(int event_type, void * data);
01308 
01313 static int dnxPluginDeInit(void)
01314 {
01315    // deregister for all nagios events we previously registered for...
01316    neb_deregister_callback(NEBCALLBACK_PROCESS_DATA, ehProcessData);
01317    neb_deregister_callback(NEBCALLBACK_SERVICE_CHECK_DATA, ehSvcCheck);
01318 #if CURRENT_NEB_API_VERSION == 3 && defined(DIRECT_POST)
01319    neb_deregister_callback(NEBCALLBACK_TIMED_EVENT_DATA, ehTimedEvent);
01320 #endif
01321 
01322    // shutdown server child process
01323    if (serverpid != 0)
01324       terminateServerProcess();
01325 
01326    // terminate results listener thread
01327    if (rlthread != 0)
01328    {
01329       close(indatafd);           // closing pipe signals thread
01330       pthread_join(rlthread, 0);
01331    } 
01332 
01333    dnxLog("DNX Nagios Plugin shutdown completed.");
01334 
01335    return OK;  // Nagios OK value
01336 }
01337 
01338 //----------------------------------------------------------------------------
01339 
01344 static int dnxPluginInit(void)
01345 {
01346    int ret;
01347 
01348    // start dnx server child process
01349    if ((ret = execServerProcess()) != 0)
01350    {
01351       dnxLog("Error starting DNX server process: %s.", dnxErrorString(ret));
01352       return ret;
01353    }
01354 
01355    dnxLog("DNX Server process started.");
01356 
01357    // start results listener
01358    if ((ret = pthread_create(&rlthread, 0, dnxResultsListener, (void *)0)) != 0)
01359       return ret;
01360    
01361    dnxLog("Results listener thread started.");
01362 
01363 #if CURRENT_NEB_API_VERSION == 3 && defined(DIRECT_POST)
01364 
01365    // register for timed event to piggy-back on reaper thread
01366    neb_register_callback(NEBCALLBACK_TIMED_EVENT_DATA, myHandle, 0, ehTimedEvent);
01367    dnxLog("Registered for TIMEDEVENT_EXECUTE event.");
01368 
01369 #endif
01370 
01371    // registration for this event starts everything rolling
01372    neb_register_callback(NEBCALLBACK_SERVICE_CHECK_DATA, myHandle, 0, ehSvcCheck);
01373    dnxLog("Registered for SERVICE_CHECK_DATA event.");
01374 
01375    dnxLog("DNX Nagios Plugin initialization completed.");
01376 
01377    return OK;  // Nagios OK value
01378 }
01379 
01380 //----------------------------------------------------------------------------
01381 
01388 static int launchScript(char * script)
01389 {
01390    int ret;
01391 
01392    assert(script);
01393 
01394    // exec the script - system waits till child completes
01395    if ((ret = system(script)) == -1)
01396    {
01397       dnxLog("Failed to exec script: %s.", strerror(errno));
01398       ret = DNX_ERR_INVALID;
01399    }
01400    else
01401       ret = DNX_OK;
01402 
01403    dnxLog("Sync script returned %d.", WEXITSTATUS(ret));
01404 
01405    return ret;
01406 }
01407 
01408 //----------------------------------------------------------------------------
01409 
01426 static int ehProcessData(int event_type, void * data)
01427 {
01428    nebstruct_process_data *procdata = (nebstruct_process_data *)data;
01429 
01430    // validate our event type - ignore wrong event type
01431    assert(event_type == NEBCALLBACK_PROCESS_DATA);
01432    if (event_type != NEBCALLBACK_PROCESS_DATA)
01433       return OK;
01434 
01435    // sanity-check our data structure - should never happen
01436    if (!procdata)
01437    {
01438       dnxLog("Startup handler received NULL process data structure.");
01439       return ERROR;
01440    }
01441 
01442    // look for process event loop start event
01443    if (procdata->type == NEBTYPE_PROCESS_EVENTLOOPSTART)
01444    {
01445       dnxDebug(2, "Startup handler received PROCESS_EVENTLOOPSTART event.");
01446 
01447       // execute sync script, if defined
01448       if (cfg.syncScript)
01449       {
01450          dnxLog("Startup handler executing plugin sync script: %s.", cfg.syncScript);
01451 
01452          // NB: This halts Nagios execution until the script exits...
01453          launchScript(cfg.syncScript);
01454       }
01455 
01456       // if plugin init fails, do plugin shutdown
01457       if (dnxPluginInit() != 0)
01458          dnxPluginDeInit();
01459    }
01460    return OK;
01461 }
01462 
01463 /*--------------------------------------------------------------------------
01464                                  INTERFACE
01465   --------------------------------------------------------------------------*/
01466 
01467 //----------------------------------------------------------------------------
01468 
/** NEB module exit point: shut the plugin down and release its resources.
 *
 * @param[in] flags - not used by this implementation.
 * @param[in] reason - not used by this implementation.
 *
 * @return Always zero.
 */
int nebmodule_deinit(int flags, int reason)
{
   // neither argument influences the shutdown sequence
   (void)flags;
   (void)reason;

   dnxLog("-------- DNX Nagios Plugin Module Shutdown Initiated --------");

   // order: stop the plugin, then drop the configuration, then run xheapchk
   dnxPluginDeInit();
   releaseConfig();
   xheapchk();

   // NOTE(review): this message is logged after releaseConfig(); confirm
   // that the logger does not depend on storage released there
   dnxLog("-------- DNX Nagios Plugin Module Shutdown Completed --------");

   return 0;
}
01489 
01490 //----------------------------------------------------------------------------
01491 
01504 int nebmodule_init(int flags, char * args, nebmodule * handle)
01505 {
01506    int ret;
01507 
01508    myHandle = handle;
01509 
01510    // module args string should contain a fully-qualified config file path
01511    if (!args || !*args)
01512       args = DNX_DEFAULT_SERVER_CONFIG_FILE;
01513 
01514    cfgfile = args;
01515    if ((ret = initConfig(cfgfile)) != 0)
01516       return ERROR;
01517 
01518    // set configured debug level and syslog log facility code
01519    dnxLogInit(cfg.logFilePath, cfg.debugFilePath, cfg.auditFilePath, 
01520          &cfg.debugLevel);
01521 
01522    dnxLog("-------- DNX Nagios Plugin Module Version %s Startup --------", VERSION);
01523    dnxLog("Copyright (c) 2006-2010 Intellectual Reserve. All rights reserved.");
01524    if (cfg.debugLevel)
01525       dnxLog("Debug logging enabled at level %d to %s.",
01526             cfg.debugLevel, cfg.debugFilePath);
01527    if (cfg.auditFilePath)
01528       dnxLog("Auditing enabled to %s.", cfg.auditFilePath);
01529 
01530    // subscribe to PROCESS_DATA call-backs in order to defer initialization
01531    //    until after Nagios validates its configuration and environment.
01532    if ((ret = neb_register_callback(NEBCALLBACK_PROCESS_DATA,
01533          myHandle, 0, ehProcessData)) != OK)
01534    {
01535       dnxLog("PROCESS_DATA event registration failed: %s.", dnxErrorString(ret));
01536       terminateServerProcess();
01537       releaseConfig();
01538       return ERROR;
01539    }
01540    start_time = time(0);
01541 
01542    dnxLog("-------- DNX Nagios Plugin Module Startup Complete --------");
01543 
01544    return OK;
01545 }
01546 

Generated on Tue Apr 13 15:48:07 2010 for DNX by  doxygen 1.5.6