00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00028 #ifdef HAVE_CONFIG_H
00029 # include "config.h"
00030 #endif
00031
00032 #include "dnxAgent.h"
00033 #include "dnxServerMain.h"
00034 #include "dnxTransport.h"
00035 #include "dnxProtocol.h"
00036 #include "dnxCfgParser.h"
00037 #include "dnxError.h"
00038 #include "dnxDebug.h"
00039 #include "dnxLogging.h"
00040 #include "dnxStats.h"
00041
00042 #include <unistd.h>
00043 #include <pthread.h>
00044 #include <stdio.h>
00045 #include <stdlib.h>
00046 #include <stdarg.h>
00047
/** Number of elements in a true array object (do not use on pointers). */
#define elemcount(x) (sizeof(x) / sizeof((x)[0]))
00049
00050
00051 static char * s_agentName = "Agent";
00052 static DnxChannel * s_agent;
00053 static pthread_t s_agentTid;
00054 static DnxCfgParser * s_parser;
00055 static int s_shutdown = 0;
00056 static int s_reconfig = 0;
00057
00058
00059
00060
00061
/** Build the reply text for the HELP management command.
 *
 * @return Whatever xstrdup returns for the static help text; the agent loop
 *    hands the pointer to xfree after sending the reply.
 */
static char * buildHelpReply(void)
{
   static char helpText[] =
         "DNX Server Management Commands:\n"
         " SHUTDOWN\n"
         " RECONFIGURE\n"
         " RESETSTATS\n"
         " GETSTATS stat-list\n"
         " GETNODESTATS ip-address stat-list\n"
         " ip-address is a dot-delimited IPv4 node address or DNS name.\n"
         " stat-list is a comma-delimited list of stat names:\n"
         " Server only:\n"
         " nodes_registered - total client nodes registered\n"
         " jobs_handled - jobs accepted from Nagios\n"
         " jobs_rejected_no_slots - jobs rejected due to no job list slots\n"
         " jobs_rejected_no_nodes - jobs rejected due to no node requests\n"
         " post_results_ok - results successfully posted to Nagios\n"
         " post_results_failed - results failing post to Nagios\n"
         " Server and individual nodes:\n"
         " requests_received - total requests received from all nodes\n"
         " requests_expired - total requests that have expired\n"
         " dispatches_ok - jobs successfully dispatched to nodes\n"
         " dispatches_failed - jobs failing dispatch to nodes\n"
         " results_ok - results returned with zero result code\n"
         " results_failed - results returned with non-zero result code\n"
         " results_timed_out - results timed out from assigned node\n"
         " Note: Stats are returned in the order they are requested.\n"
         " Unrecognized requested stats are returned as '?'.\n"
         " GETNODELIST\n"
         " GETCONFIG\n"
         " GETVERSION\n"
         " HELP";

   return xstrdup(helpText);
}
00100
00101
00102
00113 static int appendString(char ** spp, char * fmt, ... )
00114 {
00115 char buf[1024];
00116 char * newstr;
00117 size_t strsz;
00118 va_list ap;
00119
00120
00121 va_start(ap, fmt);
00122 vsnprintf(buf, sizeof buf, fmt, ap);
00123 va_end(ap);
00124
00125
00126 strsz = strlen(buf) + 1;
00127 if ((newstr = xrealloc(*spp, (*spp? strlen(*spp): 0) + strsz)) == 0)
00128 return DNX_ERR_MEMORY;
00129 if (*spp == 0)
00130 *newstr = 0;
00131
00132
00133 strcat(newstr, buf);
00134 *spp = newstr;
00135 return 0;
00136 }
00137
00138
00139
00148 static int dnxCountNodes(DnxNodeData * node, void * data)
00149 {
00150
00151 if (node->stats[REQUESTS_RECEIVED] != 0)
00152 (*(int *)data)++;
00153 return 0;
00154 }
00155
00156
00157
00164 static char * buildMgmtStatsReply(char * req)
00165 {
00166 char * rsp = 0;
00167 int nodes_registered = 0;
00168 unsigned stats[STATS_COUNT];
00169
00170 struct { char * str; unsigned * stat; } rs[] =
00171 {
00172 { "nodes_registered", &nodes_registered },
00173 { "requests_received", &stats[REQUESTS_RECEIVED] },
00174 { "requests_expired", &stats[REQUESTS_EXPIRED] },
00175 { "dispatches_ok", &stats[DISPATCHES_OK] },
00176 { "dispatches_failed", &stats[DISPATCHES_FAILED] },
00177 { "results_ok", &stats[RESULTS_OK] },
00178 { "results_failed", &stats[RESULTS_FAILED] },
00179 { "results_timed_out", &stats[RESULTS_TIMED_OUT] },
00180 { "jobs_handled", &stats[JOBS_HANDLED] },
00181 { "jobs_rejected_no_slots", &stats[JOBS_REJECTED_NO_SLOTS] },
00182 { "jobs_rejected_no_nodes", &stats[JOBS_REJECTED_NO_NODES] },
00183 { "post_results_ok", &stats[POST_RESULTS_OK] },
00184 { "post_results_failed", &stats[POST_RESULTS_FAILED] },
00185 };
00186
00187 assert(req);
00188
00189
00190 dnxStatsForEachNode(dnxCountNodes, &nodes_registered);
00191
00192
00193 dnxStatsGetServerStats(stats);
00194
00195
00196 while (isspace(*req)) req++;
00197
00198 while (*req)
00199 {
00200 char * ep, * np;
00201 unsigned i;
00202
00203
00204 if ((np = strchr(req, ',')) == 0)
00205 np = req + strlen(req);
00206
00207
00208 ep = np;
00209 while (ep > req && isspace(ep[-1])) ep--;
00210
00211
00212 for (i = 0; i < elemcount(rs); i++)
00213 if (memcmp(req, rs[i].str, ep - req) == 0)
00214 {
00215 if (appendString(&rsp, "%u,", *rs[i].stat) != 0)
00216 return xfree(rsp), (char *)0;
00217 break;
00218 }
00219
00220
00221 if (i == elemcount(rs) && appendString(&rsp, "?,") != 0)
00222 return xfree(rsp), (char *)0;
00223
00224
00225 if (*(req = np)) req++;
00226
00227
00228 while (isspace(*req)) req++;
00229 }
00230
00231
00232 if (rsp)
00233 {
00234 size_t len = strlen(rsp);
00235 if (len && rsp[len - 1] == ',') rsp[len - 1] = 0;
00236 }
00237 return rsp;
00238 }
00239
00240
00241
/** Context handed to dnxBuildNodeStatsReply through the opaque data pointer. */
typedef struct MgmtNodeStatsData
{
   char * stats;     /*!< In: comma-delimited list of requested stat names. */
   char * rsp;       /*!< Out: reply string built by the callback, or null. */
} MgmtNodeStatsData;
00247
00257 static int dnxBuildNodeStatsReply(DnxNodeData * node, void * data)
00258 {
00259 MgmtNodeStatsData * msd = (MgmtNodeStatsData *)data;
00260 char * req = msd->stats;
00261 char * rsp = 0;
00262
00263 struct { char * str; unsigned * stat; } rs[] =
00264 {
00265 { "requests_received", &node->stats[REQUESTS_RECEIVED] },
00266 { "requests_expired", &node->stats[REQUESTS_EXPIRED] },
00267 { "dispatches_ok", &node->stats[DISPATCHES_OK] },
00268 { "dispatches_failed", &node->stats[DISPATCHES_FAILED] },
00269 { "results_ok", &node->stats[RESULTS_OK] },
00270 { "results_failed", &node->stats[RESULTS_FAILED] },
00271 { "results_timed_out", &node->stats[RESULTS_TIMED_OUT] },
00272 };
00273
00274 msd->rsp = 0;
00275
00276
00277 if (node->stats[REQUESTS_RECEIVED] == 0)
00278 return DNX_OK;
00279
00280 while (*req)
00281 {
00282 char * ep, * np;
00283 unsigned i;
00284
00285
00286 if ((np = strchr(req, ',')) == 0)
00287 np = req + strlen(req);
00288
00289
00290 ep = np;
00291 while (ep > req && isspace(ep[-1])) ep--;
00292
00293
00294 for (i = 0; i < elemcount(rs); i++)
00295 if (memcmp(req, rs[i].str, ep - req) == 0)
00296 {
00297 if (appendString(&rsp, "%u,", *rs[i].stat) != 0)
00298 return xfree(rsp), DNX_ERR_MEMORY;
00299 break;
00300 }
00301
00302
00303 if (i == elemcount(rs) && appendString(&rsp, "?,") != 0)
00304 return xfree(rsp), DNX_ERR_MEMORY;
00305
00306
00307 if (*(req = np)) req++;
00308
00309
00310 while (isspace(*req)) req++;
00311 }
00312
00313
00314 if (rsp)
00315 {
00316 size_t len = strlen(rsp);
00317 if (len && rsp[len - 1] == ',') rsp[len - 1] = 0;
00318 }
00319
00320 msd->rsp = rsp;
00321 return DNX_OK;
00322 }
00323
00324
00325
00333 static char * buildMgmtNodeStatsReply(char * req)
00334 {
00335 MgmtNodeStatsData data;
00336
00337 assert(req);
00338
00339
00340 while (isspace(*req)) req++;
00341
00342
00343 if ((data.stats = strchr(req, ' ')) == 0)
00344 return xstrdup("");
00345
00346
00347 *data.stats++ = 0;
00348 while (isspace(*data.stats)) data.stats++;
00349
00350 dnxStatsForNodeByAddrStr(req, dnxBuildNodeStatsReply, &data);
00351
00352 return data.rsp? data.rsp: xstrdup("");
00353 }
00354
00355
00356
00365 static int dnxResetNodeStats(DnxNodeData * node, void * data)
00366 {
00367 memset(node->stats, 0, sizeof node->stats);
00368 return 0;
00369 }
00370
00371
00372
00381 static int dnxNodeListBuilder(DnxNodeData * node, void * data)
00382 {
00383
00384 return node->stats[REQUESTS_RECEIVED]?
00385 appendString((char **)data, "%s,", node->addrstr): 0;
00386 }
00387
00388
00389
00394 static char * buildMgmtNodeListReply(void)
00395 {
00396 char * rsp = 0;
00397
00398 dnxStatsForEachNode(dnxNodeListBuilder, &rsp);
00399
00400
00401 if (rsp)
00402 {
00403 size_t len = strlen(rsp);
00404 if (len && rsp[len - 1] == ',') rsp[len - 1] = 0;
00405 }
00406 return rsp? rsp: xstrdup("");
00407 }
00408
00409
00410
00415 static char * buildMgmtCfgReply(void)
00416 {
00417 char * buf;
00418 size_t bufsz = 0;
00419
00420
00421 if (dnxCfgParserGetCfg(s_parser, 0, &bufsz) != 0)
00422 return 0;
00423
00424
00425 if ((buf = (char *)xmalloc(bufsz)) != 0
00426 && dnxCfgParserGetCfg(s_parser, buf, &bufsz) != 0)
00427 xfree(buf), (buf = 0);
00428
00429 return buf;
00430 }
00431
00432
00433
00440 static void * dnxAgentServer(void * data)
00441 {
00442 int ret;
00443 DnxMgmtRequest Msg;
00444 Msg.action = 0;
00445
00446 dnxLog("DNX Server Agent awaiting commands...");
00447
00448 while (!s_shutdown)
00449 {
00450 memset(Msg.address, '\0', DNX_MAX_ADDRESS);
00451
00452
00453 if ((ret = dnxWaitForMgmtRequest(s_agent, &Msg, Msg.address, 2)) == DNX_OK)
00454 {
00455 DnxMgmtReply Rsp;
00456 char addrstr[DNX_MAX_ADDRSTR];
00457
00458 dnxDebug(2, "Received MgmtRequest from %s.",
00459 dnxNtop(Msg.address, addrstr, sizeof addrstr));
00460
00461
00462 Rsp.xid = Msg.xid;
00463 Rsp.status = DNX_REQ_ACK;
00464 Rsp.reply = 0;
00465
00466
00467 if (!strcmp(Msg.action, "RECONFIGURE"))
00468 {
00469 s_reconfig = 1;
00470 Rsp.reply = xstrdup("OK");
00471 }
00472 else if (!strcmp(Msg.action, "RESETSTATS"))
00473 {
00474 dnxStatsResetServerStats();
00475 dnxStatsForEachNode(dnxResetNodeStats, 0);
00476 Rsp.reply = xstrdup("OK");
00477 }
00478 else if (!strncmp(Msg.action, "GETSTATS ", 9))
00479 {
00480 if ((Rsp.reply = buildMgmtStatsReply(Msg.action + 9)) == 0)
00481 Rsp.status = DNX_REQ_NAK;
00482 }
00483 else if (!strncmp(Msg.action, "GETNODESTATS ", 13))
00484 {
00485 if ((Rsp.reply = buildMgmtNodeStatsReply(Msg.action + 13)) == 0)
00486 Rsp.status = DNX_REQ_NAK;
00487 }
00488 else if (!strcmp(Msg.action, "GETNODELIST"))
00489 {
00490 if ((Rsp.reply = buildMgmtNodeListReply()) == 0)
00491 Rsp.status = DNX_REQ_NAK;
00492 }
00493 else if (!strcmp(Msg.action, "GETCONFIG"))
00494 {
00495 if ((Rsp.reply = buildMgmtCfgReply()) == 0)
00496 Rsp.status = DNX_REQ_NAK;
00497 }
00498 else if (!strcmp(Msg.action, "GETVERSION"))
00499 {
00500 if ((Rsp.reply = versionText()) == 0)
00501 Rsp.status = DNX_REQ_NAK;
00502 }
00503 else if (!strcmp(Msg.action, "HELP"))
00504 {
00505 if ((Rsp.reply = buildHelpReply()) == 0)
00506 Rsp.status = DNX_REQ_NAK;
00507 }
00508
00509
00510 if ((ret = dnxSendMgmtReply(s_agent, &Rsp, Msg.address)) != 0)
00511 dnxLog("Agent response failure: %s.", dnxErrorString(ret));
00512
00513
00514 xfree(Rsp.reply);
00515 xfree(Msg.action);
00516 }
00517 else if (ret != DNX_ERR_TIMEOUT)
00518 dnxLog("Agent channel failure: %s.", dnxErrorString(ret));
00519
00520 if (s_reconfig)
00521 {
00522 dnxReconfigure();
00523 s_reconfig = 0;
00524 }
00525 }
00526
00527 dnxLog("Agent terminating...");
00528
00529 return 0;
00530 }
00531
00532
00533
00534
00535
00536 void dnxAgentSignalReconfig(void)
00537 {
00538 s_reconfig = 1;
00539 }
00540
00541 int dnxInitAgent(char * agentUrl, DnxCfgParser * parser)
00542 {
00543 int ret;
00544
00545 s_shutdown = 0;
00546 s_agentTid = 0;
00547 s_parser = parser;
00548
00549 if ((ret = dnxChanMapAdd(s_agentName, agentUrl)) != DNX_OK)
00550 dnxLog("AGENT channel init failed: %s.", dnxErrorString(ret));
00551 else if ((ret = dnxConnect(s_agentName, DNX_MODE_PASSIVE, &s_agent)) != DNX_OK)
00552 {
00553 dnxLog("AGENT channel connect failed: %s.", dnxErrorString(ret));
00554 dnxChanMapDelete(s_agentName);
00555 }
00556 else if ((ret = pthread_create(&s_agentTid, 0, dnxAgentServer, 0)) != 0)
00557 {
00558 dnxLog("AGENT server init failed: %s.", strerror(ret));
00559 dnxDisconnect(s_agent);
00560 dnxChanMapDelete(s_agentName);
00561 ret = DNX_ERR_THREAD;
00562 }
00563 return ret;
00564 }
00565
00566
00567
00568 void dnxReleaseAgent(void)
00569 {
00570 if (s_agentTid)
00571 {
00572 s_shutdown = 1;
00573 pthread_join(s_agentTid, 0);
00574 dnxDisconnect(s_agent);
00575 dnxChanMapDelete(s_agentName);
00576 }
00577 }
00578
00579
00580