00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00035 #ifdef HAVE_CONFIG_H
00036 # include "config.h"
00037 #else
00038 # define VERSION "<unknown>"
00039 #endif
00040
00041 #include "dnxServerMain.h"
00042 #include "dnxNebMain.h"
00043 #include "dnxError.h"
00044 #include "dnxDebug.h"
00045 #include "dnxCfgParser.h"
00046 #include "dnxLogging.h"
00047 #include "dnxCollector.h"
00048 #include "dnxDispatcher.h"
00049 #include "dnxRegistrar.h"
00050 #include "dnxJobList.h"
00051 #include "dnxStats.h"
00052 #include "dnxXml.h"
00053
00054 #if HAVE_GETOPT_LONG
00055 #define _GNU_SOURCE
00056 #include <getopt.h>
00057 #endif
00058
00059 #ifndef SYSCONFDIR
00060 # define SYSCONFDIR "/etc"
00061 #endif
00062
00063 #ifndef SYSLOGDIR
00064 # define SYSLOGDIR "/var/log"
00065 #endif
00066
00067 #ifndef COMPILE_FLAGS
00068 # define COMPILE_FLAGS "<unknown>"
00069 #endif
00070
00071 #include <sys/types.h>
00072 #include <sys/stat.h>
00073 #include <unistd.h>
00074 #include <stdio.h>
00075 #include <stdlib.h>
00076 #include <stdint.h>
00077 #include <stdarg.h>
00078 #include <signal.h>
00079 #include <fcntl.h>
00080 #include <netdb.h>
00081 #include <netinet/in.h>
00082
00083 #define elemcount(x) (sizeof(x)/sizeof(*(x)))
00084
00085 #define DNX_DEFAULT_SERVER_CONFIG_FILE SYSCONFDIR "/dnxServer.cfg"
00086 #define DNX_DEFAULT_LOG SYSLOGDIR "/dnxsrv.log"
00087 #define DNX_DEFAULT_DBGLOG SYSLOGDIR "/dnxsrv.dbg.log"
00088 #define DNX_DEFAULT_SERVER_PATH SYSBINDIR "/dnxServer"
00089
00091 typedef struct DnxCfgData
00092 {
00093 char * dispatcherUrl;
00094 char * collectorUrl;
00095 char * agentUrl;
00096 char * authWorkerNodes;
00097 unsigned maxNodeRequests;
00098 unsigned minServiceSlots;
00099 unsigned expirePollInterval;
00100 char * logFilePath;
00101 char * debugFilePath;
00102 unsigned debugLevel;
00103 char * auditFilePath;
00104 } DnxCfgData;
00105
00106
00107 static DnxCfgData cfg;
00108 static DnxCfgParser * parser;
00109 static DnxJobList * joblist;
00110 static DnxRegistrar * registrar;
00111 static DnxDispatcher * dispatcher;
00112 static DnxCollector * collector;
00113 static time_t start_time;
00114 static char * s_cfgfile;
00115 static int s_queuesize;
00116 static char * s_progname;
00117 static char * s_cmdover = 0;
00118
00119
00120
00121
00122
00127 static void version(FILE * fp)
00128 {
00129 char * vertxt = versionText();
00130 if (vertxt)
00131 {
00132 fprintf(fp, "%s\n", vertxt);
00133 xfree(vertxt);
00134 vertxt = 0;
00135 }
00136 }
00137
00138
00139
00142 static void usage(void)
00143 {
00144
00145 #if HAVE_GETOPT_LONG
00146 # define OL_CFGFILE ", --cfgfile "
00147 # define OL_QUEUESIZE ", --queuesize"
00148 # define OL_VERSION ", --version "
00149 # define OL_HELP ", --help "
00150 #else
00151 # define OL_CFGFILE
00152 # define OL_QUEUESIZE
00153 # define OL_VERSION
00154 # define OL_HELP
00155 #endif
00156
00157 version(stderr);
00158 fprintf(stderr,
00159 " Usage: %s [options]\n"
00160 " Where [options] are:\n"
00161 " -c" OL_CFGFILE " <file> specify the file and path of the config file.\n"
00162 " -j" OL_QUEUESIZE " <num> specify the size of the Nagios service queue.\n"
00163 " -v" OL_VERSION " display DNX server version and exit.\n"
00164 " -h" OL_HELP " display this help screen and exit.\n"
00165 "\n",
00166 s_progname
00167 );
00168 exit(-1);
00169 }
00170
00171
00172
00180 static int getOptions(int argc, char ** argv)
00181 {
00182
00183 extern char * optarg;
00184 extern int opterr, optopt;
00185
00186 static char opts[] = "c:j:vh";
00187
00188 #if HAVE_GETOPT_LONG
00189 static struct option longopts[] =
00190 {
00191 { "cfgfile", required_argument, 0, 'c' },
00192 { "queuesize",required_argument, 0, 'j' },
00193 { "version", no_argument, 0, 'v' },
00194 { "help", no_argument, 0, 'h' },
00195 { 0, 0, 0, 0 },
00196 };
00197 #endif
00198
00199 int ch;
00200 char * cp;
00201
00202
00203 s_progname = (char *)((cp = strrchr(argv[0], '/')) != 0 ? (cp + 1) : argv[0]);
00204
00205 opterr = 0;
00206
00207 #if HAVE_GETOPT_LONG
00208 while ((ch = getopt_long(argc, argv, opts, longopts, 0)) != -1)
00209 #else
00210 while ((ch = getopt(argc, argv, opts)) != -1)
00211 #endif
00212 {
00213 switch (ch)
00214 {
00215 case 'c': s_cfgfile = optarg; break;
00216 case 'j': s_queuesize = atoi(optarg); break;
00217 case 'v': version(stdout); exit(0);
00218 case 'h':
00219 default : usage();
00220 }
00221 }
00222
00223 if (!s_cfgfile)
00224 s_cfgfile = DNX_DEFAULT_SERVER_CONFIG_FILE;
00225
00226 return 0;
00227 }
00228
00229
00230
00243 static int validateCfg(DnxCfgDict * dict, void ** vptrs, void * passthru)
00244 {
00245 int err, ret = DNX_ERR_INVALID;
00246 DnxCfgData cfg;
00247
00248 assert(dict && vptrs);
00249
00250
00251 cfg.dispatcherUrl = (char *)vptrs[ 0];
00252 cfg.collectorUrl = (char *)vptrs[ 1];
00253 cfg.agentUrl = (char *)vptrs[ 2];
00254 cfg.authWorkerNodes = (char *)vptrs[ 3];
00255 cfg.maxNodeRequests = (unsigned)(intptr_t)vptrs[ 4];
00256 cfg.minServiceSlots = (unsigned)(intptr_t)vptrs[ 5];
00257 cfg.expirePollInterval = (unsigned)(intptr_t)vptrs[ 6];
00258 cfg.logFilePath = (char *)vptrs[ 7];
00259 cfg.debugFilePath = (char *)vptrs[ 8];
00260 cfg.debugLevel = (unsigned)(intptr_t)vptrs[ 9];
00261 cfg.auditFilePath = (char *)vptrs[10];
00262
00263
00264 if (!cfg.dispatcherUrl)
00265 dnxLog("config: Missing channelDispatcher parameter.");
00266 else if (!cfg.collectorUrl)
00267 dnxLog("config: Missing channelCollector parameter.");
00268 else if (!cfg.agentUrl)
00269 dnxLog("config: Missing channelAgent parameter.");
00270 else if (cfg.maxNodeRequests < 1)
00271 dnxLog("config: Invalid maxNodeRequests parameter.");
00272 else if (cfg.minServiceSlots < 1)
00273 dnxLog("config: Invalid minServiceSlots parameter.");
00274 else if (cfg.expirePollInterval < 1)
00275 dnxLog("config: Invalid expirePollInterval parameter.");
00276 else
00277 ret = 0;
00278
00279 return ret;
00280 }
00281
00282
00283
00290 static int initConfig(char * cfgfile)
00291 {
00292 DnxCfgDict dict[] =
00293 {
00294 { "channelDispatcher", DNX_CFG_URL, &cfg.dispatcherUrl },
00295 { "channelCollector", DNX_CFG_URL, &cfg.collectorUrl },
00296 { "channelAgent", DNX_CFG_URL, &cfg.agentUrl },
00297 { "authWorkerNodes", DNX_CFG_STRING, &cfg.authWorkerNodes },
00298 { "maxNodeRequests", DNX_CFG_UNSIGNED, &cfg.maxNodeRequests },
00299 { "minServiceSlots", DNX_CFG_UNSIGNED, &cfg.minServiceSlots },
00300 { "expirePollInterval", DNX_CFG_UNSIGNED, &cfg.expirePollInterval },
00301 { "logFile", DNX_CFG_FSPATH, &cfg.logFilePath },
00302 { "debugFile", DNX_CFG_FSPATH, &cfg.debugFilePath },
00303 { "debugLevel", DNX_CFG_UNSIGNED, &cfg.debugLevel },
00304 { "auditFile", DNX_CFG_FSPATH, &cfg.auditFilePath },
00305 { 0 },
00306 };
00307 char cfgdefs[] =
00308 "channelDispatcher = udp://0:12480\n"
00309 "channelCollector = udp://0:12481\n"
00310 "channelAgent = udp://0:12482\n"
00311 "maxNodeRequests = 0x7FFFFFFF\n"
00312 "minServiceSlots = 100\n"
00313 "expirePollInterval = 5\n"
00314 "logFile = " DNX_DEFAULT_LOG "\n"
00315 "debugFile = " DNX_DEFAULT_DBGLOG "\n";
00316
00317 int ret;
00318
00319
00320 if ((ret = dnxCfgParserCreate(cfgdefs, cfgfile, s_cmdover, dict,
00321 validateCfg, &parser)) != 0)
00322 return ret;
00323
00324
00325 if ((ret = dnxCfgParserParse(parser, 0)) != 0)
00326 dnxCfgParserDestroy(parser);
00327
00328 return ret;
00329 }
00330
00331
00332
00337 static void freeCfgData(DnxCfgData * cpy)
00338 {
00339 xfree(cpy->dispatcherUrl);
00340 xfree(cpy->collectorUrl);
00341 xfree(cpy->agentUrl);
00342 xfree(cpy->authWorkerNodes);
00343 xfree(cpy->logFilePath);
00344 xfree(cpy->debugFilePath);
00345 xfree(cpy->auditFilePath);
00346 xfree(cpy);
00347 cpy = 0;
00348 }
00349
00350
00351
00358 static DnxCfgData * copyCfgData(DnxCfgData * org)
00359 {
00360 DnxCfgData * cpy;
00361
00362
00363 if ((cpy = (DnxCfgData *)xmalloc(sizeof *cpy)) == 0)
00364 return 0;
00365
00366
00367 *cpy = *org;
00368
00369
00370 cpy->dispatcherUrl = xstrdup(org->dispatcherUrl);
00371 cpy->collectorUrl = xstrdup(org->collectorUrl);
00372 cpy->agentUrl = xstrdup(org->agentUrl);
00373 cpy->authWorkerNodes = xstrdup(org->authWorkerNodes);
00374 cpy->maxNodeRequests = org->maxNodeRequests;
00375 cpy->minServiceSlots = org->minServiceSlots;
00376 cpy->expirePollInterval = org->expirePollInterval;
00377 cpy->logFilePath = xstrdup(org->logFilePath);
00378 cpy->debugFilePath = xstrdup(org->debugFilePath);
00379 cpy->debugLevel = org->debugLevel;
00380 cpy->auditFilePath = xstrdup(org->auditFilePath);
00381
00382 return cpy;
00383 }
00384
00385
00386
00395 static void logGblConfigChanges(DnxCfgData * ocp, DnxCfgData * ncp)
00396 {
00397 if (strcmp(ocp->dispatcherUrl, ncp->dispatcherUrl) != 0)
00398 dnxLog("Config parameter 'channelDispatcher' changed from %s to %s. "
00399 "NOTE: Changing the Dispatcher URL requires a restart.",
00400 ocp? ocp->dispatcherUrl: "<unknown>", ncp->dispatcherUrl);
00401
00402 if (strcmp(ocp->collectorUrl, ncp->collectorUrl) != 0)
00403 dnxLog("Config parameter 'channelCollector' changed from %s to %s. "
00404 "NOTE: Changing the Collector URL requires a restart.",
00405 ocp? ocp->collectorUrl: "<unknown>", ncp->collectorUrl);
00406
00407 if (strcmp(ocp->agentUrl, ncp->agentUrl) != 0)
00408 dnxLog("Config parameter 'channelAgent' changed from %s to %s. "
00409 "NOTE: Changing the Agent URL requires a restart.",
00410 ocp? ocp->agentUrl: "<unknown>", ncp->agentUrl);
00411
00412 if (strcmp(ocp->authWorkerNodes, ncp->authWorkerNodes) != 0)
00413 dnxLog("Config parameter 'authWorkerNodes' changed from %s to %s. ",
00414 ocp? ocp->authWorkerNodes: "<unknown>", ncp->authWorkerNodes);
00415
00416 if (ocp->maxNodeRequests != ncp->maxNodeRequests)
00417 dnxLog("Config parameter 'maxNodeRequests' changed from %u to %u. "
00418 "NOTE: Changing the maximum node requests requires a restart.",
00419 ocp? ocp->maxNodeRequests: 0, ncp->maxNodeRequests);
00420
00421 if (ocp->minServiceSlots != ncp->minServiceSlots)
00422 dnxLog("Config parameter 'minServiceSlots' changed from %u to %u. "
00423 "NOTE: Changing the minimum service requests requires a restart.",
00424 ocp? ocp->minServiceSlots: 0, ncp->minServiceSlots);
00425
00426 if (ocp->expirePollInterval != ncp->expirePollInterval)
00427 dnxLog("Config parameter 'expirePollInterval' changed from %u to %u. ",
00428 ocp? ocp->expirePollInterval: 0, ncp->expirePollInterval);
00429
00430 if (strcmp(ocp->logFilePath, ncp->logFilePath) != 0)
00431 dnxLog("Config parameter 'logFile' changed from %s to %s. "
00432 "NOTE: Changing the server log file path requires a restart.",
00433 ocp? ocp->logFilePath: "<unknown>", ncp->logFilePath);
00434
00435 if (strcmp(ocp->debugFilePath, ncp->debugFilePath) != 0)
00436 dnxLog("Config parameter 'debugFile' changed from %s to %s. "
00437 "NOTE: Changing the server debug log file path requires a restart.",
00438 ocp? ocp->debugFilePath: "<unknown>", ncp->debugFilePath);
00439
00440 if (ocp->debugLevel != ncp->debugLevel)
00441 dnxLog("Config parameter 'debugLevel' changed from %u to %u.",
00442 ocp? ocp->debugLevel: 0, ncp->debugLevel);
00443
00444 if (strcmp(ocp->auditFilePath, ncp->auditFilePath) != 0)
00445 dnxLog("Config parameter 'auditFile' changed from %s to %s. "
00446 "NOTE: Changing the audit log file path requires a restart.",
00447 ocp? ocp->auditFilePath: "<unknown>", ncp->auditFilePath);
00448 }
00449
00450
00451
00460 static int dnxCalculateJobListSize(void)
00461 {
00462 int size = s_queuesize;
00463
00464
00465 if (size < 1)
00466 {
00467 size = 100;
00468 dnxLog("No Nagios services defined! "
00469 "Defaulting to %d slots in the DNX job queue.", size);
00470 }
00471
00472
00473 if (size < cfg.minServiceSlots)
00474 {
00475 dnxLog("Overriding requested service check slot count. "
00476 "Increasing from %d to configured minimum: %d.",
00477 size, cfg.minServiceSlots);
00478 size = cfg.minServiceSlots;
00479 }
00480
00481
00482 if (size > cfg.maxNodeRequests)
00483 {
00484 dnxLog("Overriding requested service check slot count. "
00485 "Decreasing from %d to configured maximum: %d.", size,
00486 cfg.maxNodeRequests);
00487 size = cfg.maxNodeRequests;
00488 }
00489 return size;
00490 }
00491
00492
00493
00504 static int dnxPostNewJob(DnxJobList * joblist, DnxJobTransfer * job,
00505 DnxNodeRequest * pNode)
00506 {
00507 int ret;
00508 DnxNewJob Job;
00509
00510
00511 dnxMakeXID(&Job.xid, DNX_OBJ_JOB, job->serial, 0);
00512 Job.cmd = xstrdup(job->cmd);
00513 Job.start_time = job->start_time;
00514 Job.timeout = job->timeout;
00515 Job.expires = job->expires;
00516 Job.payload = job->payload;
00517 Job.pNode = pNode;
00518
00519 dnxDebug(2, "Posting New Job [%lu]: %s.", job->serial, job->cmd);
00520
00521
00522 if ((ret = dnxJobListAdd(joblist, &Job)) != DNX_OK)
00523 {
00524 dnxStatsInc(0, JOBS_REJECTED_NO_SLOTS);
00525 dnxLog("Failed to post Job [%lu]; \"%s\": %d.",
00526 Job.xid.objSerial, Job.cmd, ret);
00527 }
00528 else
00529 {
00530 dnxStatsInc(0, JOBS_HANDLED);
00531 dnxAuditJob(&Job, "ASSIGN");
00532 }
00533 return ret;
00534 }
00535
00536
00537
00546 static int ProcessJobTransferMsg(DnxJobTransfer * job, DnxNodeRequest * node)
00547 {
00548 size_t toread = job->hdr.structsz - sizeof job->hdr;
00549 char * bufptr = (char *)job + sizeof job->hdr;
00550 ssize_t bytesread;
00551 int ret;
00552
00553
00554 while (toread > 0 && (bytesread = read(DNX_SRVFD, bufptr, toread)) > 0)
00555 {
00556 bufptr += bytesread;
00557 toread -= bytesread;
00558 }
00559
00560
00561 if (bytesread == 0)
00562 return EPIPE;
00563
00564
00565 if (bytesread < 0)
00566 {
00567 ret = errno;
00568 dnxDebug(1, "ProcessJobTransferMsg: read failed: %s.", strerror(ret));
00569 return ret;
00570 }
00571
00572
00573 if (node || (ret = dnxGetNodeRequest(registrar, &node)) == 0)
00574 ret = dnxPostNewJob(joblist, job, node);
00575
00576 if (ret != 0)
00577 dnxDebug(1, "ProcessJobTransferMsg: %s.", dnxErrorString(ret));
00578
00579 return ret;
00580 }
00581
00582
00583
00590 static int GetMsgHeader(DnxMsgHeader * phdr)
00591 {
00592 size_t toread = sizeof *phdr;
00593 char * bufptr = (char *)phdr;
00594 ssize_t bytesread;
00595
00596
00597 while (toread > 0 && (bytesread = read(DNX_SRVFD, bufptr, toread)) > 0)
00598 {
00599 toread -= bytesread;
00600 bufptr += bytesread;
00601 }
00602
00603
00604 if (bytesread == 0)
00605 return EPIPE;
00606
00607
00608 if (bytesread < 0)
00609 {
00610 int ret = bytesread == 0? EPIPE: errno;
00611 dnxDebug(1, "GetMsgHeader: read failed: %s.", strerror(ret));
00612 return ret;
00613 }
00614
00615
00616 if (phdr->signature != DNX_MSG_SIGNATURE
00617 || phdr->structsz > DNX_MAX_XFER_SIZE)
00618 {
00619 dnxDebug(1, "GetMsgHeader: unsupported message format.");
00620 return DNX_ERR_UNSUPPORTED;
00621 }
00622
00623 return 0;
00624 }
00625
00626
00627
00634 static int SendMsgResponse(int result)
00635 {
00636 ssize_t byteswritten;
00637 size_t towrite = sizeof result;
00638 char * bufptr = (char *)&result;
00639
00640
00641 while (towrite > 0 && (byteswritten = write(DNX_SAKFD, bufptr, towrite)) > 0)
00642 {
00643 towrite -= byteswritten;
00644 bufptr += byteswritten;
00645 }
00646
00647
00648 if (byteswritten == 0)
00649 return EPIPE;
00650
00651
00652 if (byteswritten < 0)
00653 {
00654 int ret = byteswritten == 0? EPIPE: errno;
00655 dnxDebug(1, "SendMsgResponse: write failed: %s.", strerror(ret));
00656 return ret;
00657 }
00658
00659 return 0;
00660 }
00661
00662
00663
00672 static int processRequests()
00673 {
00674 DnxNodeRequest * node = 0;
00675 DnxJobTransfer * job = 0;
00676 size_t jobsz = 0;
00677 int terminating = 0;
00678
00679 dnxLog("DNX Server awaiting service check requests...");
00680
00681 while (!terminating)
00682 {
00683 int ret;
00684 DnxMsgHeader hdr;
00685 if ((ret = GetMsgHeader(&hdr)) == 0)
00686 {
00687 switch (hdr.msgtype)
00688 {
00689 case DNX_MSG_RESERVE_NODEREQ:
00690 ret = dnxGetNodeRequest(registrar, &node);
00691 break;
00692
00693 case DNX_MSG_JOB_TRANSFER:
00694
00695 if (hdr.structsz > jobsz)
00696 {
00697 DnxJobTransfer * tmp;
00698 if ((tmp = (DnxJobTransfer *)xrealloc(job, hdr.structsz)) != 0)
00699 {
00700 job = tmp;
00701 jobsz = hdr.structsz;
00702 }
00703 }
00704
00705
00706 job->hdr = hdr;
00707 if ((ret = ProcessJobTransferMsg(job, node)) == 0)
00708 node = 0;
00709 break;
00710
00711 case DNX_MSG_TERMINATE:
00712 terminating = 1;
00713 break;
00714 }
00715 SendMsgResponse(ret);
00716 }
00717 if (ret == EPIPE)
00718 terminating = 1;
00719 }
00720
00721 dnxDebug(2, "Request loop terminating.");
00722
00723 close(DNX_SRVFD);
00724 close(DNX_SAKFD);
00725
00726 xfree(job);
00727 xfree(node);
00728 return 0;
00729 }
00730
00731
00732
00737 static int dnxServerDeInit(void)
00738 {
00739
00740
00741 dnxReleaseAgent();
00742
00743 if (registrar)
00744 dnxRegistrarDestroy(registrar);
00745
00746 if (collector)
00747 dnxCollectorDestroy(collector);
00748
00749 if (dispatcher)
00750 dnxDispatcherDestroy(dispatcher);
00751
00752 if (joblist)
00753 dnxJobListDestroy(joblist);
00754
00755 dnxChanMapRelease();
00756 dnxStatsCleanup();
00757
00758 return 0;
00759 }
00760
00761
00762
00767 static int dnxServerInit(void)
00768 {
00769 int ret, joblistsz;
00770
00771
00772 joblist = 0;
00773 registrar = 0;
00774 dispatcher = 0;
00775 collector = 0;
00776
00777
00778 if ((ret = dnxChanMapInit(0)) != 0)
00779 {
00780 dnxLog("Failed to initialize channel map: %s.", dnxErrorString(ret));
00781 return ret;
00782 }
00783
00784 joblistsz = dnxCalculateJobListSize();
00785
00786 dnxLog("Allocating %d service request slots in the DNX job list.", joblistsz);
00787
00788 if ((ret = dnxJobListCreate(joblistsz, &joblist)) != 0)
00789 {
00790 dnxLog("Failed to initialize DNX job list with %d slots: %s.",
00791 joblistsz, dnxErrorString(ret));
00792 return ret;
00793 }
00794
00795
00796 if ((ret = dnxDispatcherCreate("Dispatch", cfg.dispatcherUrl,
00797 joblist, &dispatcher)) != 0)
00798 return ret;
00799
00800
00801 if ((ret = dnxCollectorCreate("Collect", cfg.collectorUrl,
00802 joblist, &collector)) != 0)
00803 return ret;
00804
00805
00806 if ((ret = dnxRegistrarCreate(dnxDispatcherGetChannel(dispatcher),
00807 joblistsz * 2, ®istrar)) != 0)
00808 return ret;
00809
00810
00811 if ((ret = dnxInitAgent(cfg.agentUrl)) != 0)
00812 return ret;
00813
00814 dnxLog("Server initialization completed.");
00815
00816 return 0;
00817 }
00818
00823 static void sighandler(int sig)
00824 {
00825 switch(sig)
00826 {
00827 case SIGHUP: dnxAgentSignalReconfig(); break;
00828 break;
00829 }
00830 }
00831
00832
00833
00834
00835
00836 char * versionText(void)
00837 {
00838 char buf[1024];
00839 snprintf(buf, sizeof(buf) - 1,
00840 "\n"
00841 " %s Version " VERSION ", Built " __DATE__ " at " __TIME__ ".\n"
00842 " Distributed Nagios eXecutor (DNX) Server Daemon.\n"
00843 " Please report bugs to <" PACKAGE_BUGREPORT ">.\n"
00844 "\n"
00845 " Default configuration:\n"
00846 " Default config file: " DNX_DEFAULT_SERVER_CONFIG_FILE "\n"
00847 " Default log file: " DNX_DEFAULT_LOG "\n"
00848 " Default debug log file: " DNX_DEFAULT_DBGLOG "\n"
00849
00850 #if DEBUG_HEAP
00851 " Debug heap is ENABLED.\n"
00852 #endif
00853 #if DEBUG_LOCKS
00854 " Debug locks are ENABLED.\n"
00855 #endif
00856 , s_progname
00857 );
00858 return xstrdup(buf);
00859 }
00860
00861
00862
00863 void dnxReconfigure(void)
00864 {
00865 int ret;
00866 DnxCfgData * old;
00867
00868 dnxLog("RECONFIGURE request received. Reconfiguring...");
00869
00870
00871 old = copyCfgData(&cfg);
00872 if ((ret = dnxCfgParserParse(parser, 0)) == 0)
00873 logGblConfigChanges(old, &cfg);
00874 if (old) freeCfgData(old);
00875
00876 dnxLog("Reconfiguration: %s.", dnxErrorString(ret));
00877 }
00878
00879
00880
00881 int dnxPostResult(void * payload, unsigned long serial, time_t start_time,
00882 unsigned delta, int early_timeout, int res_code, char * res_data)
00883 {
00884 int ret = 0;
00885 DnxResultTransfer * result;
00886 ssize_t byteswritten;
00887 char * bufptr;
00888 size_t resultsz;
00889
00890 dnxDebug(2, "Transferring result [%lu] to plugin: %s.", serial, res_data);
00891
00892
00893 if (early_timeout)
00894 dnxStatsInc(0, RESULTS_TIMED_OUT);
00895 else if (!res_code)
00896 dnxStatsInc(0, RESULTS_OK);
00897 else
00898 dnxStatsInc(0, RESULTS_FAILED);
00899
00900
00901 resultsz = sizeof(*result) + strlen(res_data);
00902 if ((result = (DnxResultTransfer *)xmalloc(resultsz)) == 0)
00903 return DNX_ERR_MEMORY;
00904
00905
00906 result->hdr.structsz = resultsz;
00907 result->hdr.signature = DNX_MSG_SIGNATURE;
00908 result->hdr.msgtype = DNX_MSG_RESULT_TRANSFER;
00909 result->serial = serial;
00910 result->payload = payload;
00911 result->start_time = start_time;
00912 result->delta = delta;
00913 result->early_timeout = early_timeout;
00914 result->res_code = res_code;
00915 strcpy(result->res_data, res_data);
00916
00917
00918 bufptr = (char *)result;
00919 while (resultsz > 0 && (byteswritten = write(DNX_PLGFD, bufptr, resultsz)) > 0)
00920 {
00921 bufptr += byteswritten;
00922 resultsz -= byteswritten;
00923 }
00924
00925
00926 if (byteswritten <= 0)
00927 ret = byteswritten == 0? EPIPE: errno;
00928
00929 xfree(result);
00930
00931
00932 if (ret == 0)
00933 dnxStatsInc(0, POST_RESULTS_OK);
00934 else
00935 dnxStatsInc(0, POST_RESULTS_FAILED);
00936
00937 return ret;
00938 }
00939
00940
00941
00942 void dnxJobCleanup(DnxNewJob * pJob)
00943 {
00944 if (pJob)
00945 {
00946 xfree(pJob->cmd);
00947 xfree(pJob->pNode);
00948 }
00949 }
00950
00951
00952
00953 int dnxAuditJob(DnxNewJob * pJob, char * action)
00954 {
00955 if (cfg.auditFilePath)
00956 {
00957 struct sockaddr_in src_addr;
00958 in_addr_t addr;
00959
00960
00961
00969 memcpy(&src_addr, pJob->pNode->address, sizeof(src_addr));
00970 addr = ntohl(src_addr.sin_addr.s_addr);
00971
00972 return dnxAudit(
00973 "%s: Job %lu: Worker %s-%lx: %s",
00974 action, pJob->xid.objSerial,
00975 pJob->pNode->addrstr,
00976 pJob->pNode->xid.objSlot, pJob->cmd);
00977 }
00978 return DNX_OK;
00979 }
00980
00981
00982
00991 int main(int argc, char ** argv)
00992 {
00993 int ret;
00994
00995
00996 if ((ret = getOptions(argc, argv)) != DNX_OK
00997 || (ret = initConfig(s_cfgfile)) != DNX_OK)
00998 goto e0;
00999
01000
01001 dnxLogInit(cfg.logFilePath, cfg.debugFilePath, cfg.auditFilePath,
01002 &cfg.debugLevel);
01003
01004 dnxLog("-------- DNX Server Version %s Startup --------", VERSION);
01005 dnxLog("Copyright (c) 2006-2010 Intellectual Reserve. All rights reserved.");
01006 dnxLog("Configuration file: %s.", s_cfgfile);
01007 dnxLog("Dispatcher: %s.", cfg.dispatcherUrl);
01008 dnxLog("Collector: %s.", cfg.collectorUrl);
01009 dnxLog("Agent: %s.", cfg.agentUrl);
01010 if (cfg.debugFilePath && cfg.debugLevel != 0)
01011 {
01012 dnxLog("Debug logging enabled at level %d to %s.",
01013 cfg.debugLevel, cfg.debugFilePath);
01014 #if DEBUG_HEAP
01015 dnxLog("Debug heap is enabled.");
01016 #endif
01017 #if DEBUG_LOCKS
01018 dnxLog("Debug locks are enabled.");
01019 #endif
01020 }
01021 if (cfg.auditFilePath)
01022 dnxLog("Auditing enabled to %s.", cfg.auditFilePath);
01023
01024 start_time = time(0);
01025
01026
01027 signal(SIGHUP, sighandler);
01028 signal(SIGINT, sighandler);
01029 signal(SIGQUIT, sighandler);
01030 signal(SIGABRT, sighandler);
01031 signal(SIGTERM, sighandler);
01032 signal(SIGPIPE, SIG_IGN);
01033 signal(SIGALRM, SIG_IGN);
01034 signal(SIGUSR1, sighandler);
01035 signal(SIGUSR2, SIG_IGN);
01036
01037 if ((ret = dnxServerInit()) != 0)
01038 goto e2;
01039
01040
01041 ret = processRequests();
01042
01043
01044 dnxDebug(1, "Command-loop exited: %s.", dnxErrorString(ret));
01045
01046 e2:dnxServerDeInit();
01047 e1:dnxCfgParserDestroy(parser);
01048 e0:dnxLog("-------- DNX Server Daemon Shutdown Complete --------");
01049
01050 xheapchk();
01051 closelog();
01052
01053 return ret;
01054 }
01055
01056
01057