dnxAgent.c

Go to the documentation of this file.
00001 /*--------------------------------------------------------------------------
00002 
00003    Copyright (c) 2006-2010, Intellectual Reserve, Inc. All rights reserved.
00004 
00005    This program is free software; you can redistribute it and/or modify
00006    it under the terms of the GNU General Public License version 2 as
00007    published by the Free Software Foundation.
00008 
00009    This program is distributed in the hope that it will be useful,
00010    but WITHOUT ANY WARRANTY; without even the implied warranty of
00011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012    GNU General Public License for more details.
00013 
00014    You should have received a copy of the GNU General Public License
00015    along with this program; if not, write to the Free Software
00016    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 
00018   --------------------------------------------------------------------------*/
00019 
00028 #ifdef HAVE_CONFIG_H
00029 # include "config.h"
00030 #endif
00031 
00032 #include "dnxAgent.h"
00033 #include "dnxServerMain.h"
00034 #include "dnxTransport.h"
00035 #include "dnxProtocol.h"
00036 #include "dnxCfgParser.h"
00037 #include "dnxError.h"
00038 #include "dnxDebug.h"
00039 #include "dnxLogging.h"
00040 #include "dnxStats.h"
00041 
00042 #include <unistd.h>
00043 #include <pthread.h>
00044 #include <stdio.h>
00045 #include <stdlib.h>
00046 #include <stdarg.h>
00047 
// Number of elements in a true array object (meaningless for pointers or
// array-typed function parameters, which have decayed to pointers).
#define elemcount(x) (sizeof(x) / sizeof((x)[0]))
00049 
00050 // module static data
00051 static char * s_agentName = "Agent";   
00052 static DnxChannel * s_agent;           
00053 static pthread_t s_agentTid;           
00054 static DnxCfgParser * s_parser;        
00055 static int s_shutdown = 0;             
00056 static int s_reconfig = 0;             
00057 
00058 /*--------------------------------------------------------------------------
00059                               IMPLEMENTATION
00060   --------------------------------------------------------------------------*/
00061 
/** Build the reply text for the HELP management command.
 *
 * @return A copy of the command summary made with xstrdup. The caller owns
 *    it and must free it. A null return from xstrdup is passed straight
 *    through; the agent treats it as a NAK.
 */
static char * buildHelpReply(void)
{
   static char helpText[] =
         "DNX Server Management Commands:\n"
         "  SHUTDOWN\n"
         "  RECONFIGURE\n"
         "  RESETSTATS\n"
         "  GETSTATS stat-list\n"
         "  GETNODESTATS ip-address stat-list\n"
         "    ip-address is a dot-delimited IPv4 node address or DNS name.\n"
         "    stat-list is a comma-delimited list of stat names:\n"
         "      Server only:\n"
         "        nodes_registered       - total client nodes registered\n"
         "        jobs_handled           - jobs accepted from Nagios\n"
         "        jobs_rejected_no_slots - jobs rejected due to no job list slots\n"
         "        jobs_rejected_no_nodes - jobs rejected due to no node requests\n"
         "        post_results_ok        - results successfully posted to Nagios\n"
         "        post_results_failed    - results failing post to Nagios\n"
         "      Server and individual nodes:\n"
         "        requests_received      - total requests received from all nodes\n"
         "        requests_expired       - total requests that have expired\n"
         "        dispatches_ok          - jobs successfully dispatched to nodes\n"
         "        dispatches_failed      - jobs failing dispatch to nodes\n"
         "        results_ok             - results returned with zero result code\n"
         "        results_failed         - results returned with non-zero result code\n"
         "        results_timed_out      - results timed out from assigned node\n"
         "    Note: Stats are returned in the order they are requested.\n"
         "          Unrecognized requested stats are returned as '?'.\n"
         "  GETNODELIST\n"
         "  GETCONFIG\n"
         "  GETVERSION\n"
         "  HELP";

   char * reply = xstrdup(helpText);
   return reply;
}
00100 
00101 //----------------------------------------------------------------------------
00102 
00113 static int appendString(char ** spp, char * fmt, ... )
00114 {
00115    char buf[1024];
00116    char * newstr;
00117    size_t strsz;
00118    va_list ap;
00119 
00120    // build new string
00121    va_start(ap, fmt);
00122    vsnprintf(buf, sizeof buf, fmt, ap);
00123    va_end(ap);
00124 
00125    // reallocate buffer; initialize if necessary
00126    strsz = strlen(buf) + 1;
00127    if ((newstr = xrealloc(*spp, (*spp? strlen(*spp): 0) + strsz)) == 0) 
00128       return DNX_ERR_MEMORY;
00129    if (*spp == 0)
00130       *newstr = 0;
00131 
00132    // concatenate new string onto exiting string; return updated pointer
00133    strcat(newstr, buf);
00134    *spp = newstr;
00135    return 0;
00136 }
00137 
00138 //----------------------------------------------------------------------------
00139 
00148 static int dnxCountNodes(DnxNodeData * node, void * data)
00149 {
00150    // only count active nodes
00151    if (node->stats[REQUESTS_RECEIVED] != 0)
00152       (*(int *)data)++;
00153    return 0;
00154 }
00155 
00156 //----------------------------------------------------------------------------
00157 
00164 static char * buildMgmtStatsReply(char * req)
00165 {
00166    char * rsp = 0;
00167    int nodes_registered = 0;
00168    unsigned stats[STATS_COUNT];
00169 
00170    struct { char * str; unsigned * stat; } rs[] =
00171    {
00172       { "nodes_registered",       &nodes_registered              },
00173       { "requests_received",      &stats[REQUESTS_RECEIVED]      },
00174       { "requests_expired",       &stats[REQUESTS_EXPIRED]       },
00175       { "dispatches_ok",          &stats[DISPATCHES_OK]          },
00176       { "dispatches_failed",      &stats[DISPATCHES_FAILED]      },
00177       { "results_ok",             &stats[RESULTS_OK]             },
00178       { "results_failed",         &stats[RESULTS_FAILED]         },
00179       { "results_timed_out",      &stats[RESULTS_TIMED_OUT]      },
00180       { "jobs_handled",           &stats[JOBS_HANDLED]           },
00181       { "jobs_rejected_no_slots", &stats[JOBS_REJECTED_NO_SLOTS] },
00182       { "jobs_rejected_no_nodes", &stats[JOBS_REJECTED_NO_NODES] },
00183       { "post_results_ok",        &stats[POST_RESULTS_OK]        },
00184       { "post_results_failed",    &stats[POST_RESULTS_FAILED]    },
00185    };
00186 
00187    assert(req);
00188 
00189    // count number of registered nodes
00190    dnxStatsForEachNode(dnxCountNodes, &nodes_registered);
00191 
00192    // get a copy of the server stats
00193    dnxStatsGetServerStats(stats);
00194 
00195    // trim leading ws
00196    while (isspace(*req)) req++;
00197 
00198    while (*req)
00199    {
00200       char * ep, * np;
00201       unsigned i;
00202 
00203       // find start of next string or end
00204       if ((np = strchr(req, ',')) == 0)
00205          np = req + strlen(req);
00206 
00207       // trim trailing ws
00208       ep = np;
00209       while (ep > req && isspace(ep[-1])) ep--;
00210 
00211       // search table for sub-string, append requested stat to rsp
00212       for (i = 0; i < elemcount(rs); i++)
00213          if (memcmp(req, rs[i].str, ep - req) == 0)
00214          {
00215             if (appendString(&rsp, "%u,", *rs[i].stat) != 0)
00216                return xfree(rsp), (char *)0;
00217             break;
00218          }
00219 
00220       // check for unknown stat
00221       if (i == elemcount(rs) && appendString(&rsp, "?,") != 0)
00222          return xfree(rsp), (char *)0;
00223 
00224       // move to next sub-string or end
00225       if (*(req = np)) req++;
00226 
00227       // trim leading ws
00228       while (isspace(*req)) req++;
00229    }
00230 
00231    // remove trailing comma in non-empty response
00232    if (rsp)
00233    {
00234       size_t len = strlen(rsp);
00235       if (len && rsp[len - 1] == ',') rsp[len - 1] = 0;
00236    }
00237    return rsp;
00238 }
00239 
00240 //----------------------------------------------------------------------------
00241 
/** Context handed to dnxBuildNodeStatsReply through
 * dnxStatsForNodeByAddrStr.
 */
typedef struct MgmtNodeStatsData
{
   char * stats;  // [in]  comma-delimited list of requested stat names
   char * rsp;    // [out] heap-allocated reply text, or null if none built
} MgmtNodeStatsData;
00247 
00257 static int dnxBuildNodeStatsReply(DnxNodeData * node, void * data)
00258 {
00259    MgmtNodeStatsData * msd = (MgmtNodeStatsData *)data;
00260    char * req = msd->stats;
00261    char * rsp = 0;
00262 
00263    struct { char * str; unsigned * stat; } rs[] =
00264    {
00265       { "requests_received",  &node->stats[REQUESTS_RECEIVED] },
00266       { "requests_expired",   &node->stats[REQUESTS_EXPIRED]  },
00267       { "dispatches_ok",      &node->stats[DISPATCHES_OK]     },
00268       { "dispatches_failed",  &node->stats[DISPATCHES_FAILED] },
00269       { "results_ok",         &node->stats[RESULTS_OK]        },
00270       { "results_failed",     &node->stats[RESULTS_FAILED]    },
00271       { "results_timed_out",  &node->stats[RESULTS_TIMED_OUT] },
00272    };
00273 
00274    msd->rsp = 0;
00275 
00276    // check for inactive node status - return empty if inactive
00277    if (node->stats[REQUESTS_RECEIVED] == 0)
00278       return DNX_OK;
00279 
00280    while (*req)
00281    {
00282       char * ep, * np;
00283       unsigned i;
00284 
00285       // find start of next string or end
00286       if ((np = strchr(req, ',')) == 0)
00287          np = req + strlen(req);
00288 
00289       // trim trailing ws
00290       ep = np;
00291       while (ep > req && isspace(ep[-1])) ep--;
00292 
00293       // search table for sub-string, append requested stat to rsp
00294       for (i = 0; i < elemcount(rs); i++)
00295          if (memcmp(req, rs[i].str, ep - req) == 0)
00296          {
00297             if (appendString(&rsp, "%u,", *rs[i].stat) != 0)
00298                return xfree(rsp), DNX_ERR_MEMORY;
00299             break;
00300          }
00301 
00302       // check for unknown stat
00303       if (i == elemcount(rs) && appendString(&rsp, "?,") != 0)
00304          return xfree(rsp), DNX_ERR_MEMORY;
00305 
00306       // move to next sub-string or end
00307       if (*(req = np)) req++;
00308 
00309       // trim leading ws
00310       while (isspace(*req)) req++;
00311    }
00312 
00313    // remove trailing comma in non-empty response
00314    if (rsp)
00315    {
00316       size_t len = strlen(rsp);
00317       if (len && rsp[len - 1] == ',') rsp[len - 1] = 0;
00318    }
00319 
00320    msd->rsp = rsp;
00321    return DNX_OK;
00322 }
00323 
00324 //----------------------------------------------------------------------------
00325 
00333 static char * buildMgmtNodeStatsReply(char * req)
00334 {
00335    MgmtNodeStatsData data;
00336 
00337    assert(req);
00338 
00339    // req points to ip-address, remove leading ws
00340    while (isspace(*req)) req++;
00341 
00342    // locate stats-list, error if missing
00343    if ((data.stats = strchr(req, ' ')) == 0)
00344       return xstrdup("");
00345 
00346    // data.stats points to stats-list, remove leading ws
00347    *data.stats++ = 0;
00348    while (isspace(*data.stats)) data.stats++;
00349 
00350    dnxStatsForNodeByAddrStr(req, dnxBuildNodeStatsReply, &data);
00351 
00352    return data.rsp? data.rsp: xstrdup("");
00353 }
00354 
00355 //----------------------------------------------------------------------------
00356 
00365 static int dnxResetNodeStats(DnxNodeData * node, void * data)
00366 {
00367    memset(node->stats, 0, sizeof node->stats);
00368    return 0;
00369 }
00370 
00371 //----------------------------------------------------------------------------
00372 
00381 static int dnxNodeListBuilder(DnxNodeData * node, void * data)
00382 {
00383    // only return node information if node has been active since last reset
00384    return node->stats[REQUESTS_RECEIVED]? 
00385          appendString((char **)data, "%s,", node->addrstr): 0;
00386 }
00387 
00388 //----------------------------------------------------------------------------
00389 
00394 static char * buildMgmtNodeListReply(void)
00395 {
00396    char * rsp = 0;   
00397 
00398    dnxStatsForEachNode(dnxNodeListBuilder, &rsp);
00399 
00400    // remove trailing comma in non-empty response
00401    if (rsp)
00402    {
00403       size_t len = strlen(rsp);
00404       if (len && rsp[len - 1] == ',') rsp[len - 1] = 0;
00405    }
00406    return rsp? rsp: xstrdup("");
00407 }
00408 
00409 //----------------------------------------------------------------------------
00410 
00415 static char * buildMgmtCfgReply(void)
00416 {
00417    char * buf;
00418    size_t bufsz = 0;
00419 
00420    // first call gets size
00421    if (dnxCfgParserGetCfg(s_parser, 0, &bufsz) != 0)
00422       return 0;
00423 
00424    // second call gets configuration
00425    if ((buf = (char *)xmalloc(bufsz)) != 0
00426          && dnxCfgParserGetCfg(s_parser, buf, &bufsz) != 0)
00427       xfree(buf), (buf = 0);
00428 
00429    return buf;
00430 }
00431 
00432 //----------------------------------------------------------------------------
00433 
00440 static void * dnxAgentServer(void * data)
00441 {
00442    int ret;
00443    DnxMgmtRequest Msg;
00444    Msg.action = 0;
00445 
00446    dnxLog("DNX Server Agent awaiting commands...");
00447 
00448    while (!s_shutdown)
00449    {
00450       memset(Msg.address, '\0', DNX_MAX_ADDRESS);
00451 
00452       // wait 2 second for a request; process the request, if valid
00453       if ((ret = dnxWaitForMgmtRequest(s_agent, &Msg, Msg.address, 2)) == DNX_OK)
00454       {
00455          DnxMgmtReply Rsp;
00456          char addrstr[DNX_MAX_ADDRSTR];
00457 
00458          dnxDebug(2, "Received MgmtRequest from %s.", 
00459                dnxNtop(Msg.address, addrstr, sizeof addrstr));
00460 
00461          // setup some default response values
00462          Rsp.xid = Msg.xid;
00463          Rsp.status = DNX_REQ_ACK;
00464          Rsp.reply = 0;
00465 
00466          // perform the requested action
00467          if (!strcmp(Msg.action, "RECONFIGURE"))
00468          {
00469             s_reconfig = 1;
00470             Rsp.reply = xstrdup("OK");
00471          }
00472          else if (!strcmp(Msg.action, "RESETSTATS"))
00473          {
00474             dnxStatsResetServerStats();
00475             dnxStatsForEachNode(dnxResetNodeStats, 0);
00476             Rsp.reply = xstrdup("OK");
00477          }
00478          else if (!strncmp(Msg.action, "GETSTATS ", 9))
00479          {
00480             if ((Rsp.reply = buildMgmtStatsReply(Msg.action + 9)) == 0)
00481                Rsp.status = DNX_REQ_NAK;
00482          }
00483          else if (!strncmp(Msg.action, "GETNODESTATS ", 13))
00484          {
00485             if ((Rsp.reply = buildMgmtNodeStatsReply(Msg.action + 13)) == 0)
00486                Rsp.status = DNX_REQ_NAK;
00487          }
00488          else if (!strcmp(Msg.action, "GETNODELIST"))
00489          {
00490             if ((Rsp.reply = buildMgmtNodeListReply()) == 0)
00491                Rsp.status = DNX_REQ_NAK;
00492          }
00493          else if (!strcmp(Msg.action, "GETCONFIG"))
00494          {
00495             if ((Rsp.reply = buildMgmtCfgReply()) == 0)
00496                Rsp.status = DNX_REQ_NAK;
00497          }
00498          else if (!strcmp(Msg.action, "GETVERSION"))
00499          {
00500             if ((Rsp.reply = versionText()) == 0)
00501                Rsp.status = DNX_REQ_NAK;
00502          }
00503          else if (!strcmp(Msg.action, "HELP"))
00504          {
00505             if ((Rsp.reply = buildHelpReply()) == 0)
00506                Rsp.status = DNX_REQ_NAK;
00507          }
00508 
00509          // send response, log response failures
00510          if ((ret = dnxSendMgmtReply(s_agent, &Rsp, Msg.address)) != 0)
00511             dnxLog("Agent response failure: %s.", dnxErrorString(ret));
00512 
00513          // free request and reply message buffers
00514          xfree(Rsp.reply);
00515          xfree(Msg.action);
00516       }
00517       else if (ret != DNX_ERR_TIMEOUT)
00518          dnxLog("Agent channel failure: %s.", dnxErrorString(ret));
00519 
00520       if (s_reconfig)
00521       {
00522          dnxReconfigure();
00523          s_reconfig = 0;
00524       }
00525    }
00526 
00527    dnxLog("Agent terminating...");
00528 
00529    return 0;
00530 }
00531 
00532 /*--------------------------------------------------------------------------
00533                                  INTERFACE
00534   --------------------------------------------------------------------------*/
00535 
00536 void dnxAgentSignalReconfig(void)
00537 {
00538    s_reconfig = 1;
00539 }
00540 
00541 int dnxInitAgent(char * agentUrl, DnxCfgParser * parser)
00542 {
00543    int ret;
00544 
00545    s_shutdown = 0;
00546    s_agentTid = 0;
00547    s_parser = parser;
00548 
00549    if ((ret = dnxChanMapAdd(s_agentName, agentUrl)) != DNX_OK)
00550       dnxLog("AGENT channel init failed: %s.", dnxErrorString(ret));
00551    else if ((ret = dnxConnect(s_agentName, DNX_MODE_PASSIVE, &s_agent)) != DNX_OK)
00552    {
00553       dnxLog("AGENT channel connect failed: %s.", dnxErrorString(ret));
00554       dnxChanMapDelete(s_agentName);
00555    }
00556    else if ((ret = pthread_create(&s_agentTid, 0, dnxAgentServer, 0)) != 0)
00557    {
00558       dnxLog("AGENT server init failed: %s.", strerror(ret));
00559       dnxDisconnect(s_agent);
00560       dnxChanMapDelete(s_agentName);
00561       ret = DNX_ERR_THREAD;
00562    }
00563    return ret;
00564 }
00565 
00566 //----------------------------------------------------------------------------
00567 
00568 void dnxReleaseAgent(void)
00569 {
00570    if (s_agentTid)
00571    {
00572       s_shutdown = 1;
00573       pthread_join(s_agentTid, 0);
00574       dnxDisconnect(s_agent);
00575       dnxChanMapDelete(s_agentName);
00576    }
00577 }
00578 
00579 /*--------------------------------------------------------------------------*/
00580 

Generated on Tue Apr 13 15:48:07 2010 for DNX by  doxygen 1.5.6