dnxServerMain.c

Go to the documentation of this file.
00001 /*--------------------------------------------------------------------------
00002 
00003    Copyright (c) 2006-2010, Intellectual Reserve, Inc. All rights reserved.
00004 
00005    This program is free software; you can redistribute it and/or modify
00006    it under the terms of the GNU General Public License version 2 as
00007    published by the Free Software Foundation.
00008 
00009    This program is distributed in the hope that it will be useful,
00010    but WITHOUT ANY WARRANTY; without even the implied warranty of
00011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012    GNU General Public License for more details.
00013 
00014    You should have received a copy of the GNU General Public License
00015    along with this program; if not, write to the Free Software
00016    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 
00018   --------------------------------------------------------------------------*/
00019 
00035 #ifdef HAVE_CONFIG_H
00036 # include "config.h"
00037 #else
00038 # define VERSION "<unknown>"
00039 #endif
00040 
00041 #include "dnxServerMain.h"
00042 #include "dnxNebMain.h"
00043 #include "dnxError.h"
00044 #include "dnxDebug.h"
00045 #include "dnxCfgParser.h"
00046 #include "dnxLogging.h"
00047 #include "dnxCollector.h"
00048 #include "dnxDispatcher.h"
00049 #include "dnxRegistrar.h"
00050 #include "dnxJobList.h"
00051 #include "dnxStats.h"
00052 #include "dnxXml.h"
00053 
00054 #if HAVE_GETOPT_LONG
00055 #define _GNU_SOURCE
00056 #include <getopt.h>
00057 #endif
00058 
// Fallback install locations, used when config.h does not supply them.
#ifndef SYSCONFDIR
# define SYSCONFDIR     "/etc"
#endif

#ifndef SYSLOGDIR
# define SYSLOGDIR      "/var/log"
#endif

// DNX_DEFAULT_SERVER_PATH expands SYSBINDIR, so it needs a fallback just
// like the other install directories. NOTE(review): "/usr/bin" is a guess;
// confirm against the configured bindir.
#ifndef SYSBINDIR
# define SYSBINDIR      "/usr/bin"
#endif

#ifndef COMPILE_FLAGS
# define COMPILE_FLAGS  "<unknown>"
#endif
00070 
#include <sys/types.h>
#include <sys/stat.h>

#include <errno.h>
#include <limits.h>
#include <signal.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <syslog.h>
#include <unistd.h>
00082 
/* Element count of a true array; meaningless when applied to a pointer. */
#define elemcount(x) (sizeof(x) / sizeof((x)[0]))

/* Built-in default locations, derived from the install directories. */
#define DNX_DEFAULT_SERVER_CONFIG_FILE SYSCONFDIR "/dnxServer.cfg"
#define DNX_DEFAULT_LOG                SYSLOGDIR  "/dnxsrv.log"
#define DNX_DEFAULT_DBGLOG             SYSLOGDIR  "/dnxsrv.dbg.log"
#define DNX_DEFAULT_SERVER_PATH        SYSBINDIR  "/dnxServer"
00089 
/** Server configuration values.
 *
 * The global instance is filled in by the configuration parser through the
 * dictionary built in initConfig; each field notes its config-file key.
 */
typedef struct DnxCfgData
{
   char *   dispatcherUrl;       // channelDispatcher
   char *   collectorUrl;        // channelCollector
   char *   agentUrl;            // channelAgent
   char *   authWorkerNodes;     // authWorkerNodes (optional)
   unsigned maxNodeRequests;     // maxNodeRequests: upper bound on job slots
   unsigned minServiceSlots;     // minServiceSlots: lower bound on job slots
   unsigned expirePollInterval;  // expirePollInterval
   char *   logFilePath;         // logFile
   char *   debugFilePath;       // debugFile
   unsigned debugLevel;          // debugLevel
   char *   auditFilePath;       // auditFile (optional; auditing off when NULL)
} DnxCfgData;
00105 
00106 // module static data
00107 static DnxCfgData cfg;              
00108 static DnxCfgParser * parser;       
00109 static DnxJobList * joblist;        
00110 static DnxRegistrar * registrar;    
00111 static DnxDispatcher * dispatcher;  
00112 static DnxCollector * collector;    
00113 static time_t start_time;           
00114 static char * s_cfgfile;            
00115 static int s_queuesize;             
00116 static char * s_progname;           
00117 static char * s_cmdover = 0;        
00118 
00119 /*--------------------------------------------------------------------------
00120                               IMPLEMENTATION
00121   --------------------------------------------------------------------------*/
00122 
/** Print the version banner to a stream.
 *
 * @param[in] fp - the stream to write to.
 *
 * Prints nothing if versionText could not allocate its result.
 */
static void version(FILE * fp)
{
   char * text = versionText();

   if (!text)
      return;

   fprintf(fp, "%s\n", text);
   xfree(text);
}
00137 
00138 //----------------------------------------------------------------------------
00139 
00142 static void usage(void)
00143 {
00144 
00145 #if HAVE_GETOPT_LONG
00146 # define OL_CFGFILE   ", --cfgfile  "
00147 # define OL_QUEUESIZE ", --queuesize"
00148 # define OL_VERSION   ", --version  "
00149 # define OL_HELP      ", --help     "
00150 #else
00151 # define OL_CFGFILE
00152 # define OL_QUEUESIZE
00153 # define OL_VERSION
00154 # define OL_HELP
00155 #endif
00156 
00157    version(stderr);
00158    fprintf(stderr,
00159       "  Usage: %s [options]\n"
00160       "    Where [options] are:\n"
00161       "      -c" OL_CFGFILE   " <file>   specify the file and path of the config file.\n"
00162       "      -j" OL_QUEUESIZE " <num>    specify the size of the Nagios service queue.\n"
00163       "      -v" OL_VERSION   "          display DNX server version and exit.\n"
00164       "      -h" OL_HELP      "          display this help screen and exit.\n"
00165       "\n",
00166       s_progname
00167    );
00168    exit(-1);
00169 }
00170 
00171 //----------------------------------------------------------------------------
00172 
00180 static int getOptions(int argc, char ** argv)
00181 {
00182 // extern int optind;
00183    extern char * optarg;
00184    extern int opterr, optopt;
00185 
00186    static char opts[] = "c:j:vh";
00187 
00188 #if HAVE_GETOPT_LONG
00189    static struct option longopts[] =
00190    {
00191       { "cfgfile",  required_argument, 0, 'c' },
00192       { "queuesize",required_argument, 0, 'j' },
00193       { "version",  no_argument,       0, 'v' },
00194       { "help",     no_argument,       0, 'h' },
00195       { 0, 0, 0, 0 },
00196    };
00197 #endif
00198 
00199    int ch;
00200    char * cp;
00201 
00202    // set program base name
00203    s_progname = (char *)((cp = strrchr(argv[0], '/')) != 0 ? (cp + 1) : argv[0]);
00204 
00205    opterr = 0; /* Disable error messages */
00206 
00207 #if HAVE_GETOPT_LONG
00208    while ((ch = getopt_long(argc, argv, opts, longopts, 0)) != -1)
00209 #else
00210    while ((ch = getopt(argc, argv, opts)) != -1)
00211 #endif
00212    {
00213       switch (ch)
00214       {
00215          case 'c': s_cfgfile = optarg; break;
00216          case 'j': s_queuesize = atoi(optarg); break;
00217          case 'v': version(stdout); exit(0);
00218          case 'h':
00219          default : usage();
00220       }
00221    }
00222 
00223    if (!s_cfgfile)
00224       s_cfgfile = DNX_DEFAULT_SERVER_CONFIG_FILE;
00225 
00226    return 0;
00227 }
00228 
00229 //----------------------------------------------------------------------------
00230 
00243 static int validateCfg(DnxCfgDict * dict, void ** vptrs, void * passthru)
00244 {
00245    int err, ret = DNX_ERR_INVALID;
00246    DnxCfgData cfg;
00247 
00248    assert(dict && vptrs);
00249 
00250    // setup data structure so we can use the same functionality we had before
00251    cfg.dispatcherUrl      = (char *)vptrs[ 0];
00252    cfg.collectorUrl       = (char *)vptrs[ 1];
00253    cfg.agentUrl           = (char *)vptrs[ 2];
00254    cfg.authWorkerNodes    = (char *)vptrs[ 3];
00255    cfg.maxNodeRequests    = (unsigned)(intptr_t)vptrs[ 4];
00256    cfg.minServiceSlots    = (unsigned)(intptr_t)vptrs[ 5];
00257    cfg.expirePollInterval = (unsigned)(intptr_t)vptrs[ 6];
00258    cfg.logFilePath        = (char *)vptrs[ 7];
00259    cfg.debugFilePath      = (char *)vptrs[ 8];
00260    cfg.debugLevel         = (unsigned)(intptr_t)vptrs[ 9];
00261    cfg.auditFilePath      = (char *)vptrs[10];
00262 
00263    // validate configuration items in context
00264    if (!cfg.dispatcherUrl)
00265       dnxLog("config: Missing channelDispatcher parameter.");
00266    else if (!cfg.collectorUrl)
00267       dnxLog("config: Missing channelCollector parameter.");
00268    else if (!cfg.agentUrl)
00269       dnxLog("config: Missing channelAgent parameter.");
00270    else if (cfg.maxNodeRequests < 1)
00271       dnxLog("config: Invalid maxNodeRequests parameter.");
00272    else if (cfg.minServiceSlots < 1)
00273       dnxLog("config: Invalid minServiceSlots parameter.");
00274    else if (cfg.expirePollInterval < 1)
00275       dnxLog("config: Invalid expirePollInterval parameter.");
00276    else
00277       ret = 0;
00278 
00279    return ret;
00280 }
00281 
00282 //----------------------------------------------------------------------------
00283 
00290 static int initConfig(char * cfgfile)
00291 {
00292    DnxCfgDict dict[] =
00293    {  // Do NOT change the order, unless you know what you're doing!
00294       { "channelDispatcher",  DNX_CFG_URL,      &cfg.dispatcherUrl      },
00295       { "channelCollector",   DNX_CFG_URL,      &cfg.collectorUrl       },
00296       { "channelAgent",       DNX_CFG_URL,      &cfg.agentUrl           },
00297       { "authWorkerNodes",    DNX_CFG_STRING,   &cfg.authWorkerNodes    },
00298       { "maxNodeRequests",    DNX_CFG_UNSIGNED, &cfg.maxNodeRequests    },
00299       { "minServiceSlots",    DNX_CFG_UNSIGNED, &cfg.minServiceSlots    },
00300       { "expirePollInterval", DNX_CFG_UNSIGNED, &cfg.expirePollInterval },
00301       { "logFile",            DNX_CFG_FSPATH,   &cfg.logFilePath        },
00302       { "debugFile",          DNX_CFG_FSPATH,   &cfg.debugFilePath      },
00303       { "debugLevel",         DNX_CFG_UNSIGNED, &cfg.debugLevel         },
00304       { "auditFile",          DNX_CFG_FSPATH,   &cfg.auditFilePath      },
00305       { 0 },
00306    };
00307    char cfgdefs[] =
00308       "channelDispatcher = udp://0:12480\n"
00309       "channelCollector = udp://0:12481\n"
00310       "channelAgent = udp://0:12482\n"
00311       "maxNodeRequests = 0x7FFFFFFF\n"
00312       "minServiceSlots = 100\n"
00313       "expirePollInterval = 5\n"
00314       "logFile = " DNX_DEFAULT_LOG "\n"
00315       "debugFile = " DNX_DEFAULT_DBGLOG "\n";
00316 
00317    int ret;
00318 
00319    // create global configuration parser object
00320    if ((ret = dnxCfgParserCreate(cfgdefs, cfgfile, s_cmdover, dict,
00321          validateCfg, &parser)) != 0)
00322       return ret;
00323 
00324    // parse configuration file; pass defaults
00325    if ((ret = dnxCfgParserParse(parser, 0)) != 0)
00326       dnxCfgParserDestroy(parser);
00327 
00328    return ret;
00329 }
00330 
00331 //----------------------------------------------------------------------------
00332 
00337 static void freeCfgData(DnxCfgData * cpy)
00338 {
00339    xfree(cpy->dispatcherUrl);
00340    xfree(cpy->collectorUrl);
00341    xfree(cpy->agentUrl);
00342    xfree(cpy->authWorkerNodes);
00343    xfree(cpy->logFilePath);
00344    xfree(cpy->debugFilePath);
00345    xfree(cpy->auditFilePath);
00346    xfree(cpy);
00347    cpy = 0;
00348 }
00349 
00350 //----------------------------------------------------------------------------
00351 
00358 static DnxCfgData * copyCfgData(DnxCfgData * org)
00359 {
00360    DnxCfgData * cpy;
00361 
00362    // make new config structure
00363    if ((cpy = (DnxCfgData *)xmalloc(sizeof *cpy)) == 0)
00364       return 0;
00365 
00366    // copy all values
00367    *cpy = *org;
00368 
00369    // attempt to make string buffer copies
00370    cpy->dispatcherUrl = xstrdup(org->dispatcherUrl);
00371    cpy->collectorUrl = xstrdup(org->collectorUrl);
00372    cpy->agentUrl = xstrdup(org->agentUrl);
00373    cpy->authWorkerNodes = xstrdup(org->authWorkerNodes);
00374    cpy->maxNodeRequests = org->maxNodeRequests;
00375    cpy->minServiceSlots = org->minServiceSlots;
00376    cpy->expirePollInterval = org->expirePollInterval;
00377    cpy->logFilePath = xstrdup(org->logFilePath);
00378    cpy->debugFilePath = xstrdup(org->debugFilePath);
00379    cpy->debugLevel = org->debugLevel;
00380    cpy->auditFilePath = xstrdup(org->auditFilePath);
00381 
00382    return cpy;
00383 }
00384 
00385 //----------------------------------------------------------------------------
00386 
00395 static void logGblConfigChanges(DnxCfgData * ocp, DnxCfgData * ncp)
00396 {
00397    if (strcmp(ocp->dispatcherUrl, ncp->dispatcherUrl) != 0)
00398       dnxLog("Config parameter 'channelDispatcher' changed from %s to %s. "
00399             "NOTE: Changing the Dispatcher URL requires a restart.",
00400             ocp? ocp->dispatcherUrl: "<unknown>", ncp->dispatcherUrl);
00401 
00402    if (strcmp(ocp->collectorUrl, ncp->collectorUrl) != 0)
00403       dnxLog("Config parameter 'channelCollector' changed from %s to %s. "
00404             "NOTE: Changing the Collector URL requires a restart.",
00405             ocp? ocp->collectorUrl: "<unknown>", ncp->collectorUrl);
00406 
00407    if (strcmp(ocp->agentUrl, ncp->agentUrl) != 0)
00408       dnxLog("Config parameter 'channelAgent' changed from %s to %s. "
00409             "NOTE: Changing the Agent URL requires a restart.",
00410             ocp? ocp->agentUrl: "<unknown>", ncp->agentUrl);
00411 
00412    if (strcmp(ocp->authWorkerNodes, ncp->authWorkerNodes) != 0)
00413       dnxLog("Config parameter 'authWorkerNodes' changed from %s to %s. ",
00414             ocp? ocp->authWorkerNodes: "<unknown>", ncp->authWorkerNodes);
00415 
00416    if (ocp->maxNodeRequests != ncp->maxNodeRequests)
00417       dnxLog("Config parameter 'maxNodeRequests' changed from %u to %u. "
00418             "NOTE: Changing the maximum node requests requires a restart.",
00419             ocp? ocp->maxNodeRequests: 0, ncp->maxNodeRequests);
00420 
00421    if (ocp->minServiceSlots != ncp->minServiceSlots)
00422       dnxLog("Config parameter 'minServiceSlots' changed from %u to %u. "
00423             "NOTE: Changing the minimum service requests requires a restart.",
00424             ocp? ocp->minServiceSlots: 0, ncp->minServiceSlots);
00425 
00426    if (ocp->expirePollInterval != ncp->expirePollInterval)
00427       dnxLog("Config parameter 'expirePollInterval' changed from %u to %u. ",
00428             ocp? ocp->expirePollInterval: 0, ncp->expirePollInterval);
00429 
00430    if (strcmp(ocp->logFilePath, ncp->logFilePath) != 0)
00431       dnxLog("Config parameter 'logFile' changed from %s to %s. "
00432             "NOTE: Changing the server log file path requires a restart.",
00433             ocp? ocp->logFilePath: "<unknown>", ncp->logFilePath);
00434 
00435    if (strcmp(ocp->debugFilePath, ncp->debugFilePath) != 0)
00436       dnxLog("Config parameter 'debugFile' changed from %s to %s. "
00437             "NOTE: Changing the server debug log file path requires a restart.",
00438             ocp? ocp->debugFilePath: "<unknown>", ncp->debugFilePath);
00439 
00440    if (ocp->debugLevel != ncp->debugLevel)
00441       dnxLog("Config parameter 'debugLevel' changed from %u to %u.",
00442             ocp? ocp->debugLevel: 0, ncp->debugLevel);
00443    
00444    if (strcmp(ocp->auditFilePath, ncp->auditFilePath) != 0)
00445       dnxLog("Config parameter 'auditFile' changed from %s to %s. "
00446             "NOTE: Changing the audit log file path requires a restart.",
00447             ocp? ocp->auditFilePath: "<unknown>", ncp->auditFilePath);
00448 }
00449 
00450 //----------------------------------------------------------------------------
00451 
00460 static int dnxCalculateJobListSize(void)
00461 {
00462    int size = s_queuesize; // get value passed (maybe) on the cmdline
00463 
00464    // zero doesn't make sense...
00465    if (size < 1)
00466    {
00467       size = 100;
00468       dnxLog("No Nagios services defined! "
00469              "Defaulting to %d slots in the DNX job queue.", size);
00470    }
00471 
00472    // check for configuration minServiceSlots override
00473    if (size < cfg.minServiceSlots)
00474    {
00475       dnxLog("Overriding requested service check slot count. "
00476              "Increasing from %d to configured minimum: %d.",
00477              size, cfg.minServiceSlots);
00478       size = cfg.minServiceSlots;
00479    }
00480 
00481    // check for configuration maxNodeRequests override
00482    if (size > cfg.maxNodeRequests)
00483    {
00484       dnxLog("Overriding requested service check slot count. "
00485              "Decreasing from %d to configured maximum: %d.", size,
00486              cfg.maxNodeRequests);
00487       size = cfg.maxNodeRequests;
00488    }
00489    return size;
00490 }
00491 
00492 //----------------------------------------------------------------------------
00493 
00504 static int dnxPostNewJob(DnxJobList * joblist, DnxJobTransfer * job,
00505       DnxNodeRequest * pNode)
00506 {
00507    int ret;
00508    DnxNewJob Job;
00509 
00510    // fill-in the job structure with the necessary information
00511    dnxMakeXID(&Job.xid, DNX_OBJ_JOB, job->serial, 0);
00512    Job.cmd        = xstrdup(job->cmd);
00513    Job.start_time = job->start_time;
00514    Job.timeout    = job->timeout;
00515    Job.expires    = job->expires;
00516    Job.payload    = job->payload;
00517    Job.pNode      = pNode;
00518 
00519    dnxDebug(2, "Posting New Job [%lu]: %s.", job->serial, job->cmd);
00520 
00521    // post to the Job Queue
00522    if ((ret = dnxJobListAdd(joblist, &Job)) != DNX_OK)
00523    {
00524       dnxStatsInc(0, JOBS_REJECTED_NO_SLOTS);
00525       dnxLog("Failed to post Job [%lu]; \"%s\": %d.", 
00526             Job.xid.objSerial, Job.cmd, ret);
00527    }
00528    else
00529    {
00530       dnxStatsInc(0, JOBS_HANDLED);
00531       dnxAuditJob(&Job, "ASSIGN");
00532    }
00533    return ret;
00534 }
00535 
00536 //----------------------------------------------------------------------------
00537 
00546 static int ProcessJobTransferMsg(DnxJobTransfer * job, DnxNodeRequest * node)
00547 {
00548    size_t toread = job->hdr.structsz - sizeof job->hdr;
00549    char * bufptr = (char *)job + sizeof job->hdr;
00550    ssize_t bytesread;
00551    int ret;
00552 
00553    // read the rest of the job transfer structure
00554    while (toread > 0 && (bytesread = read(DNX_SRVFD, bufptr, toread)) > 0)
00555    {
00556       bufptr += bytesread;
00557       toread -= bytesread;
00558    }
00559 
00560    // check for closed pipe
00561    if (bytesread == 0)
00562       return EPIPE;
00563 
00564    // check for error 
00565    if (bytesread < 0)
00566    {
00567       ret = errno;
00568       dnxDebug(1, "ProcessJobTransferMsg: read failed: %s.", strerror(ret));
00569       return ret;
00570    }
00571 
00572    // process service check request
00573    if (node || (ret = dnxGetNodeRequest(registrar, &node)) == 0)
00574       ret = dnxPostNewJob(joblist, job, node);
00575 
00576    if (ret != 0)
00577       dnxDebug(1, "ProcessJobTransferMsg: %s.", dnxErrorString(ret));
00578 
00579    return ret;
00580 }
00581 
00582 //----------------------------------------------------------------------------
00583 
00590 static int GetMsgHeader(DnxMsgHeader * phdr)
00591 {
00592    size_t toread = sizeof *phdr;
00593    char * bufptr = (char *)phdr;
00594    ssize_t bytesread;
00595 
00596    // read the message header from the plugin module
00597    while (toread > 0 && (bytesread = read(DNX_SRVFD, bufptr, toread)) > 0)
00598    {
00599       toread -= bytesread;
00600       bufptr += bytesread;
00601    }
00602 
00603    // check for closed pipe
00604    if (bytesread == 0)
00605       return EPIPE;
00606 
00607    // check for error
00608    if (bytesread < 0)
00609    {
00610       int ret = bytesread == 0? EPIPE: errno;
00611       dnxDebug(1, "GetMsgHeader: read failed: %s.", strerror(ret));
00612       return ret;
00613    }
00614 
00615    // sanity check message format
00616    if (phdr->signature != DNX_MSG_SIGNATURE
00617          || phdr->structsz > DNX_MAX_XFER_SIZE)
00618    {
00619       dnxDebug(1, "GetMsgHeader: unsupported message format.");
00620       return DNX_ERR_UNSUPPORTED;
00621    }
00622 
00623    return 0;
00624 }
00625 
00626 //----------------------------------------------------------------------------
00627 
00634 static int SendMsgResponse(int result)
00635 {
00636    ssize_t byteswritten;
00637    size_t towrite = sizeof result;
00638    char * bufptr = (char *)&result;
00639 
00640    // write ack/nak to plugin
00641    while (towrite > 0 && (byteswritten = write(DNX_SAKFD, bufptr, towrite)) > 0)
00642    {
00643       towrite -= byteswritten;
00644       bufptr += byteswritten;
00645    }
00646 
00647    // check for closed pipe
00648    if (byteswritten == 0)
00649       return EPIPE;
00650 
00651    // check for error
00652    if (byteswritten < 0)
00653    {
00654       int ret = byteswritten == 0? EPIPE: errno;
00655       dnxDebug(1, "SendMsgResponse: write failed: %s.", strerror(ret));
00656       return ret;
00657    }
00658 
00659    return 0;
00660 }
00661 
00662 //----------------------------------------------------------------------------
00663 
00672 static int processRequests()
00673 {
00674    DnxNodeRequest * node = 0;
00675    DnxJobTransfer * job = 0;
00676    size_t jobsz = 0;
00677    int terminating = 0;
00678 
00679    dnxLog("DNX Server awaiting service check requests...");
00680 
00681    while (!terminating)
00682    {
00683       int ret;
00684       DnxMsgHeader hdr;
00685       if ((ret = GetMsgHeader(&hdr)) == 0)
00686       {
00687          switch (hdr.msgtype)
00688          {
00689             case DNX_MSG_RESERVE_NODEREQ:
00690                ret = dnxGetNodeRequest(registrar, &node);
00691                break;
00692 
00693             case DNX_MSG_JOB_TRANSFER:
00694                // resize job transfer buffer if necessary
00695                if (hdr.structsz > jobsz)
00696                {
00697                   DnxJobTransfer * tmp;
00698                   if ((tmp = (DnxJobTransfer *)xrealloc(job, hdr.structsz)) != 0)
00699                   {
00700                      job = tmp;
00701                      jobsz = hdr.structsz;
00702                   }
00703                }
00704 
00705                // copy header and read rest of job transfer message
00706                job->hdr = hdr;
00707                if ((ret = ProcessJobTransferMsg(job, node)) == 0)
00708                   node = 0;
00709                break;
00710 
00711             case DNX_MSG_TERMINATE:
00712                terminating = 1;
00713                break;
00714          }
00715          SendMsgResponse(ret);
00716       }
00717       if (ret == EPIPE)
00718          terminating = 1;
00719    }
00720 
00721    dnxDebug(2, "Request loop terminating.");
00722 
00723    close(DNX_SRVFD);
00724    close(DNX_SAKFD);
00725 
00726    xfree(job);
00727    xfree(node);
00728    return 0;
00729 }
00730 
00731 //----------------------------------------------------------------------------
00732 
00737 static int dnxServerDeInit(void)
00738 {
00739    // ensure we don't attempt to destroy non-existent objects
00740 
00741    dnxReleaseAgent();
00742 
00743    if (registrar)
00744       dnxRegistrarDestroy(registrar);
00745 
00746    if (collector)
00747       dnxCollectorDestroy(collector);
00748 
00749    if (dispatcher)
00750       dnxDispatcherDestroy(dispatcher);
00751 
00752    if (joblist)
00753       dnxJobListDestroy(joblist);
00754 
00755    dnxChanMapRelease();
00756    dnxStatsCleanup();
00757 
00758    return 0;
00759 }
00760 
00761 //----------------------------------------------------------------------------
00762 
00767 static int dnxServerInit(void)
00768 {
00769    int ret, joblistsz;
00770 
00771    // clear globals so we know what to "undo" as we back out
00772    joblist = 0;
00773    registrar = 0;
00774    dispatcher = 0;
00775    collector = 0;
00776 
00777    // initialize the channel map
00778    if ((ret = dnxChanMapInit(0)) != 0)
00779    {
00780       dnxLog("Failed to initialize channel map: %s.", dnxErrorString(ret));
00781       return ret;
00782    }
00783 
00784    joblistsz = dnxCalculateJobListSize();
00785 
00786    dnxLog("Allocating %d service request slots in the DNX job list.", joblistsz);
00787 
00788    if ((ret = dnxJobListCreate(joblistsz, &joblist)) != 0)
00789    {
00790       dnxLog("Failed to initialize DNX job list with %d slots: %s.",
00791             joblistsz, dnxErrorString(ret));
00792       return ret;
00793    }
00794 
00795    // create and configure dispatcher
00796    if ((ret = dnxDispatcherCreate("Dispatch", cfg.dispatcherUrl, 
00797          joblist, &dispatcher)) != 0)
00798       return ret;
00799 
00800    // create and configure collector
00801    if ((ret = dnxCollectorCreate("Collect", cfg.collectorUrl,
00802          joblist, &collector)) != 0)
00803       return ret;
00804 
00805    // create worker node registrar
00806    if ((ret = dnxRegistrarCreate(dnxDispatcherGetChannel(dispatcher), 
00807          joblistsz * 2, &registrar)) != 0)
00808       return ret;
00809 
00810    // initialize server agent comm and thread
00811    if ((ret = dnxInitAgent(cfg.agentUrl)) != 0)
00812       return ret;
00813 
00814    dnxLog("Server initialization completed.");
00815 
00816    return 0;
00817 }
00818 
/** Process-wide signal handler installed by main().
 *
 * Only SIGHUP has an effect: it asks the agent to reload the configuration.
 * Every other signal routed here (SIGINT, SIGQUIT, SIGABRT, SIGTERM,
 * SIGUSR1) is caught and ignored. NOTE(review): confirm it is intended
 * that SIGTERM/SIGINT do not stop the server.
 *
 * @param[in] sig - the signal number delivered.
 */
static void sighandler(int sig)
{
   if (sig == SIGHUP)
      dnxAgentSignalReconfig();
}
00831 
00832 /*--------------------------------------------------------------------------
00833                                  INTERFACE
00834   --------------------------------------------------------------------------*/
00835 
00836 char * versionText(void)
00837 {
00838    char buf[1024];
00839    snprintf(buf, sizeof(buf) - 1,
00840       "\n"
00841       "  %s Version " VERSION ", Built " __DATE__ " at " __TIME__ ".\n"
00842       "  Distributed Nagios eXecutor (DNX) Server Daemon.\n"
00843       "  Please report bugs to <" PACKAGE_BUGREPORT ">.\n"
00844       "\n"
00845       "  Default configuration:\n"
00846       "    Default config file: "      DNX_DEFAULT_SERVER_CONFIG_FILE "\n"
00847       "    Default log file: "         DNX_DEFAULT_LOG "\n"
00848       "    Default debug log file: "   DNX_DEFAULT_DBGLOG "\n"
00849 //    "    Compile flags: "            COMPILE_FLAGS "\n"
00850 #if DEBUG_HEAP
00851       "    Debug heap is ENABLED.\n"
00852 #endif
00853 #if DEBUG_LOCKS
00854       "    Debug locks are ENABLED.\n"
00855 #endif
00856       , s_progname
00857    );
00858    return xstrdup(buf);
00859 }
00860 
00861 //----------------------------------------------------------------------------
00862 
00863 void dnxReconfigure(void)
00864 {
00865    int ret;
00866    DnxCfgData * old;
00867 
00868    dnxLog("RECONFIGURE request received. Reconfiguring...");
00869 
00870    // reparse config file into temporary cfg structure and validate
00871    old = copyCfgData(&cfg);
00872    if ((ret = dnxCfgParserParse(parser, 0)) == 0)
00873       logGblConfigChanges(old, &cfg);
00874    if (old) freeCfgData(old);
00875 
00876    dnxLog("Reconfiguration: %s.", dnxErrorString(ret));
00877 }
00878 
00879 //----------------------------------------------------------------------------
00880 
00881 int dnxPostResult(void * payload, unsigned long serial, time_t start_time, 
00882       unsigned delta, int early_timeout, int res_code, char * res_data)
00883 {
00884    int ret = 0;
00885    DnxResultTransfer * result;
00886    ssize_t byteswritten;
00887    char * bufptr;
00888    size_t resultsz;
00889 
00890    dnxDebug(2, "Transferring result [%lu] to plugin: %s.", serial, res_data);
00891 
00892    // update stats
00893    if (early_timeout)
00894       dnxStatsInc(0, RESULTS_TIMED_OUT);
00895    else if (!res_code)
00896       dnxStatsInc(0, RESULTS_OK);
00897    else
00898       dnxStatsInc(0, RESULTS_FAILED);
00899 
00900    // allocate a results transfer buffer
00901    resultsz = sizeof(*result) + strlen(res_data);
00902    if ((result = (DnxResultTransfer *)xmalloc(resultsz)) == 0)
00903       return DNX_ERR_MEMORY;
00904 
00905    // fill in result transfer structure
00906    result->hdr.structsz = resultsz;
00907    result->hdr.signature = DNX_MSG_SIGNATURE;
00908    result->hdr.msgtype = DNX_MSG_RESULT_TRANSFER;
00909    result->serial = serial;
00910    result->payload = payload;
00911    result->start_time = start_time;
00912    result->delta = delta;
00913    result->early_timeout = early_timeout;
00914    result->res_code = res_code;
00915    strcpy(result->res_data, res_data);
00916 
00917    // write result to server
00918    bufptr = (char *)result;
00919    while (resultsz > 0 && (byteswritten = write(DNX_PLGFD, bufptr, resultsz)) > 0)
00920    {
00921       bufptr += byteswritten;
00922       resultsz -= byteswritten;
00923    }
00924 
00925    // check for error or broken pipe
00926    if (byteswritten <= 0)
00927       ret = byteswritten == 0? EPIPE: errno;
00928 
00929    xfree(result);
00930 
00931    // update stats
00932    if (ret == 0)
00933       dnxStatsInc(0, POST_RESULTS_OK);
00934    else
00935       dnxStatsInc(0, POST_RESULTS_FAILED);
00936 
00937    return ret;
00938 }
00939 
00940 //----------------------------------------------------------------------------
00941 
00942 void dnxJobCleanup(DnxNewJob * pJob)
00943 {
00944    if (pJob)
00945    {
00946       xfree(pJob->cmd);
00947       xfree(pJob->pNode);
00948    }
00949 }
00950 
00951 //----------------------------------------------------------------------------
00952 
00953 int dnxAuditJob(DnxNewJob * pJob, char * action)
00954 {
00955    if (cfg.auditFilePath)
00956    {
00957       struct sockaddr_in src_addr;
00958       in_addr_t addr;
00959 
00960       // Convert opaque Worker Node address to IPv4 address
00961 
00969       memcpy(&src_addr, pJob->pNode->address, sizeof(src_addr));
00970       addr = ntohl(src_addr.sin_addr.s_addr);
00971 
00972       return dnxAudit(
00973             "%s: Job %lu: Worker %s-%lx: %s",
00974                   action, pJob->xid.objSerial,
00975                   pJob->pNode->addrstr, 
00976                   pJob->pNode->xid.objSlot, pJob->cmd);
00977    }
00978    return DNX_OK;
00979 }
00980 
00981 //----------------------------------------------------------------------------
00982 
00991 int main(int argc, char ** argv)
00992 {
00993    int ret;
00994 
00995    // parse command line options; read configuration file
00996    if ((ret = getOptions(argc, argv)) != DNX_OK
00997          || (ret = initConfig(s_cfgfile)) != DNX_OK)
00998       goto e0;
00999 
01000    // initialize the logging subsystem with configured settings
01001    dnxLogInit(cfg.logFilePath, cfg.debugFilePath, cfg.auditFilePath, 
01002          &cfg.debugLevel);
01003 
01004    dnxLog("-------- DNX Server Version %s Startup --------", VERSION);
01005    dnxLog("Copyright (c) 2006-2010 Intellectual Reserve. All rights reserved.");
01006    dnxLog("Configuration file: %s.", s_cfgfile);
01007    dnxLog("Dispatcher: %s.", cfg.dispatcherUrl);
01008    dnxLog("Collector: %s.", cfg.collectorUrl);
01009    dnxLog("Agent: %s.", cfg.agentUrl);
01010    if (cfg.debugFilePath && cfg.debugLevel != 0)
01011    {
01012       dnxLog("Debug logging enabled at level %d to %s.",
01013             cfg.debugLevel, cfg.debugFilePath);
01014 #if DEBUG_HEAP
01015       dnxLog("Debug heap is enabled.");
01016 #endif
01017 #if DEBUG_LOCKS
01018       dnxLog("Debug locks are enabled.");
01019 #endif
01020    }
01021    if (cfg.auditFilePath)
01022       dnxLog("Auditing enabled to %s.", cfg.auditFilePath);
01023 
01024    start_time = time(0);
01025 
01026    // install signal handlers
01027    signal(SIGHUP,  sighandler);
01028    signal(SIGINT,  sighandler);
01029    signal(SIGQUIT, sighandler);
01030    signal(SIGABRT, sighandler);
01031    signal(SIGTERM, sighandler);
01032    signal(SIGPIPE, SIG_IGN);
01033    signal(SIGALRM, SIG_IGN);
01034    signal(SIGUSR1, sighandler);
01035    signal(SIGUSR2, SIG_IGN);
01036 
01037    if ((ret = dnxServerInit()) != 0)
01038       goto e2;
01039 
01040    //----------------------------------------------------------------------
01041    ret = processRequests();
01042    //----------------------------------------------------------------------
01043 
01044    dnxDebug(1, "Command-loop exited: %s.", dnxErrorString(ret));
01045 
01046 e2:dnxServerDeInit();
01047 e1:dnxCfgParserDestroy(parser);
01048 e0:dnxLog("-------- DNX Server Daemon Shutdown Complete --------");
01049 
01050    xheapchk();
01051    closelog();
01052 
01053    return ret;
01054 }
01055 
01056 /*--------------------------------------------------------------------------*/
01057 

Generated on Tue Apr 13 15:48:07 2010 for DNX by  doxygen 1.5.6