dnxAgent.c

Go to the documentation of this file.
00001 /*--------------------------------------------------------------------------
00002 
00003    Copyright (c) 2006-2010, Intellectual Reserve, Inc. All rights reserved.
00004 
00005    This program is free software; you can redistribute it and/or modify
00006    it under the terms of the GNU General Public License version 2 as
00007    published by the Free Software Foundation.
00008 
00009    This program is distributed in the hope that it will be useful,
00010    but WITHOUT ANY WARRANTY; without even the implied warranty of
00011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012    GNU General Public License for more details.
00013 
00014    You should have received a copy of the GNU General Public License
00015    along with this program; if not, write to the Free Software
00016    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 
00018   --------------------------------------------------------------------------*/
00019 
00028 #ifdef HAVE_CONFIG_H
00029 # include "config.h"
00030 #endif
00031 
00032 #include "dnxAgent.h"
00033 #include "dnxNebMain.h"
00034 #include "dnxTransport.h"
00035 #include "dnxProtocol.h"
00036 #include "dnxCfgParser.h"
00037 #include "dnxError.h"
00038 #include "dnxDebug.h"
00039 #include "dnxLogging.h"
00040 #include "dnxStats.h"
00041 
00042 #include <unistd.h>
00043 #include <pthread.h>
00044 #include <stdio.h>
00045 #include <stdlib.h>
00046 #include <stdarg.h>
00047 
00048 #define elemcount(x) (sizeof(x)/sizeof(*(x)))
00049 
00050 // module static data
00051 static char * s_agentName = "Agent";   
00052 static DnxChannel * s_agent;           
00053 static pthread_t s_agentTid;           
00054 static DnxCfgParser * s_parser;        
00055 static int s_shutdown;                 
00056 
00057 /*--------------------------------------------------------------------------
00058                               IMPLEMENTATION
00059   --------------------------------------------------------------------------*/
00060 
00065 static char * buildHelpReply(void)
00066 {
00067    static char * help =
00068          "DNX Integrated Plugin/Server Management Commands:\n"
00069          "  RESETSTATS\n"
00070          "  GETSTATS stat-list\n"
00071          "  GETNODESTATS ip-address stat-list\n"
00072          "    ip-address is a dot-delimited IPv4 node address or DNS name.\n"
00073          "    stat-list is a comma-delimited list of stat names:\n"
00074          "      Server only:\n"
00075          "        nodes_registered       - total client nodes registered\n"
00076          "        jobs_handled           - jobs accepted from Nagios\n"
00077          "        jobs_rejected_no_slots - jobs rejected due to no job list slots\n"
00078          "        jobs_rejected_no_nodes - jobs rejected due to no node requests\n"
00079          "        post_results_ok        - results successfully posted to Nagios\n"
00080          "        post_results_failed    - results failing post to Nagios\n"
00081          "      Server and individual nodes:\n"
00082          "        requests_received      - total requests received from all nodes\n"
00083          "        requests_expired       - total requests that have expired\n"
00084          "        dispatches_ok          - jobs successfully dispatched to nodes\n"
00085          "        dispatches_failed      - jobs failing dispatch to nodes\n"
00086          "        results_ok             - results returned with zero result code\n"
00087          "        results_failed         - results returned with non-zero result code\n"
00088          "        results_timed_out      - results timed out from assigned node\n"
00089          "    Note: Stats are returned in the order they are requested.\n"
00090          "          Unrecognized requested stats are returned as '?'.\n"
00091          "  GETNODELIST\n"
00092          "  GETCONFIG\n"
00093          "  GETVERSION\n"
00094          "  HELP";
00095    return xstrdup(help);
00096 }
00097 
00098 //----------------------------------------------------------------------------
00099 
00110 static int appendString(char ** spp, char * fmt, ... )
00111 {
00112    char buf[1024];
00113    char * newstr;
00114    size_t strsz;
00115    va_list ap;
00116 
00117    // build new string
00118    va_start(ap, fmt);
00119    vsnprintf(buf, sizeof buf, fmt, ap);
00120    va_end(ap);
00121 
00122    // reallocate buffer; initialize if necessary
00123    strsz = strlen(buf) + 1;
00124    if ((newstr = xrealloc(*spp, (*spp? strlen(*spp): 0) + strsz)) == 0) 
00125       return DNX_ERR_MEMORY;
00126    if (*spp == 0)
00127       *newstr = 0;
00128 
00129    // concatenate new string onto exiting string; return updated pointer
00130    strcat(newstr, buf);
00131    *spp = newstr;
00132    return 0;
00133 }
00134 
00135 //----------------------------------------------------------------------------
00136 
00145 static int dnxCountNodes(DnxNodeData * node, void * data)
00146 { 
00147    // only count active nodes
00148    if (node->stats[REQUESTS_RECEIVED] != 0)
00149       (*(int *)data)++;
00150    return 0;
00151 }
00152 
00153 //----------------------------------------------------------------------------
00154 
00161 static char * buildMgmtStatsReply(char * req)
00162 {
00163    char * rsp = 0;
00164    int nodes_registered = 0;
00165    unsigned stats[STATS_COUNT];
00166 
00167    struct { char * str; unsigned * stat; } rs[] =
00168    {
00169       { "nodes_registered",       &nodes_registered              },
00170       { "requests_received",      &stats[REQUESTS_RECEIVED]      },
00171       { "requests_expired",       &stats[REQUESTS_EXPIRED]       },
00172       { "dispatches_ok",          &stats[DISPATCHES_OK]          },
00173       { "dispatches_failed",      &stats[DISPATCHES_FAILED]      },
00174       { "results_ok",             &stats[RESULTS_OK]             },
00175       { "results_failed",         &stats[RESULTS_FAILED]         },
00176       { "results_timed_out",      &stats[RESULTS_TIMED_OUT]      },
00177       { "jobs_handled",           &stats[JOBS_HANDLED]           },
00178       { "jobs_rejected_no_slots", &stats[JOBS_REJECTED_NO_SLOTS] },
00179       { "jobs_rejected_no_nodes", &stats[JOBS_REJECTED_NO_NODES] },
00180       { "post_results_ok",        &stats[POST_RESULTS_OK]        },
00181       { "post_results_failed",    &stats[POST_RESULTS_FAILED]    },
00182    };
00183 
00184    assert(req);
00185 
00186    // count number of registered nodes
00187    dnxStatsForEachNode(dnxCountNodes, &nodes_registered);
00188 
00189    // get a copy of the server stats
00190    dnxStatsGetServerStats(stats);
00191 
00192    // trim leading ws
00193    while (isspace(*req)) req++;
00194 
00195    while (*req)
00196    {
00197       char * ep, * np;
00198       unsigned i;
00199 
00200       // find start of next string or end
00201       if ((np = strchr(req, ',')) == 0)
00202          np = req + strlen(req);
00203 
00204       // trim trailing ws
00205       ep = np;
00206       while (ep > req && isspace(ep[-1])) ep--;
00207 
00208       // search table for sub-string, append requested stat to rsp
00209       for (i = 0; i < elemcount(rs); i++)
00210          if (memcmp(req, rs[i].str, ep - req) == 0)
00211          {
00212             if (appendString(&rsp, "%u,", *rs[i].stat) != 0)
00213                return xfree(rsp), (char *)0;
00214             break;
00215          }
00216 
00217       // check for unknown stat
00218       if (i == elemcount(rs) && appendString(&rsp, "?,") != 0)
00219          return xfree(rsp), (char *)0;
00220 
00221       // move to next sub-string or end
00222       if (*(req = np)) req++;
00223 
00224       // trim leading ws
00225       while (isspace(*req)) req++;
00226    }
00227 
00228    // remove trailing comma in non-empty response
00229    if (rsp)
00230    {
00231       size_t len = strlen(rsp);
00232       if (len && rsp[len - 1] == ',') rsp[len - 1] = 0;
00233    }
00234    return rsp;
00235 }
00236 
00237 //----------------------------------------------------------------------------
00238 
00239 typedef struct MgmtNodeStatsData
00240 {
00241    char * stats;
00242    char * rsp;
00243 } MgmtNodeStatsData;
00244 
00254 static int dnxBuildNodeStatsReply(DnxNodeData * node, void * data)
00255 {
00256    MgmtNodeStatsData * msd = (MgmtNodeStatsData *)data;
00257    char * req = msd->stats;
00258    char * rsp = 0;
00259 
00260    struct { char * str; unsigned * stat; } rs[] =
00261    {
00262       { "requests_received",  &node->stats[REQUESTS_RECEIVED] },
00263       { "requests_expired",   &node->stats[REQUESTS_EXPIRED]  },
00264       { "dispatches_ok",      &node->stats[DISPATCHES_OK]     },
00265       { "dispatches_failed",  &node->stats[DISPATCHES_FAILED] },
00266       { "results_ok",         &node->stats[RESULTS_OK]        },
00267       { "results_failed",     &node->stats[RESULTS_FAILED]    },
00268       { "results_timed_out",  &node->stats[RESULTS_TIMED_OUT] },
00269    };
00270 
00271    msd->rsp = 0;
00272 
00273    // check for inactive node status - return empty if inactive
00274    if (node->stats[REQUESTS_RECEIVED] == 0)
00275       return DNX_OK;
00276 
00277    while (*req)
00278    {
00279       char * ep, * np;
00280       unsigned i;
00281 
00282       // find start of next string or end
00283       if ((np = strchr(req, ',')) == 0)
00284          np = req + strlen(req);
00285 
00286       // trim trailing ws
00287       ep = np;
00288       while (ep > req && isspace(ep[-1])) ep--;
00289 
00290       // search table for sub-string, append requested stat to rsp
00291       for (i = 0; i < elemcount(rs); i++)
00292          if (memcmp(req, rs[i].str, ep - req) == 0)
00293          {
00294             if (appendString(&rsp, "%u,", *rs[i].stat) != 0)
00295                return xfree(rsp), DNX_ERR_MEMORY;
00296             break;
00297          }
00298 
00299       // check for unknown stat
00300       if (i == elemcount(rs) && appendString(&rsp, "?,") != 0)
00301          return xfree(rsp), DNX_ERR_MEMORY;
00302 
00303       // move to next sub-string or end
00304       if (*(req = np)) req++;
00305 
00306       // trim leading ws
00307       while (isspace(*req)) req++;
00308    }
00309 
00310    // remove trailing comma in non-empty response
00311    if (rsp)
00312    {
00313       size_t len = strlen(rsp);
00314       if (len && rsp[len - 1] == ',') rsp[len - 1] = 0;
00315    }
00316 
00317    msd->rsp = rsp;
00318    return DNX_OK;
00319 }
00320 
00321 //----------------------------------------------------------------------------
00322 
00330 static char * buildMgmtNodeStatsReply(char * req)
00331 {
00332    MgmtNodeStatsData data;
00333 
00334    assert(req);
00335 
00336    // req points to ip-address, remove leading ws
00337    while (isspace(*req)) req++;
00338 
00339    // locate stats-list, error if missing
00340    if ((data.stats = strchr(req, ' ')) == 0)
00341       return xstrdup("");
00342 
00343    // data.stats points to stats-list, remove leading ws
00344    *data.stats++ = 0;
00345    while (isspace(*data.stats)) data.stats++;
00346 
00347    dnxStatsForNodeByAddrStr(req, dnxBuildNodeStatsReply, &data);
00348 
00349    return data.rsp? data.rsp: xstrdup("");
00350 }
00351 
00352 //----------------------------------------------------------------------------
00353 
00362 static int dnxResetNodeStats(DnxNodeData * node, void * data)
00363 {
00364    memset(node->stats, 0, sizeof node->stats);
00365    return 0;
00366 }
00367 
00368 //----------------------------------------------------------------------------
00369 
00378 static int dnxNodeListBuilder(DnxNodeData * node, void * data)
00379 {
00380    // only return node information if node has been active since last reset
00381    return node->stats[REQUESTS_RECEIVED]? 
00382          appendString((char **)data, "%s,", node->addrstr): 0;
00383 }
00384 
00385 //----------------------------------------------------------------------------
00386 
00391 static char * buildMgmtNodeListReply(void)
00392 {
00393    char * rsp = 0;   
00394 
00395    dnxStatsForEachNode(dnxNodeListBuilder, &rsp);
00396 
00397    // remove trailing comma in non-empty response
00398    if (rsp)
00399    {
00400       size_t len = strlen(rsp);
00401       if (len && rsp[len - 1] == ',') rsp[len - 1] = 0;
00402    }
00403    return rsp? rsp: xstrdup("");
00404 }
00405 
00406 //----------------------------------------------------------------------------
00407 
00412 static char * buildMgmtCfgReply(void)
00413 {
00414    char * buf;
00415    size_t bufsz = 0;
00416 
00417    // first call gets size
00418    if (dnxCfgParserGetCfg(s_parser, 0, &bufsz) != 0)
00419       return 0;
00420 
00421    // second call gets configuration
00422    if ((buf = (char *)xmalloc(bufsz)) != 0
00423          && dnxCfgParserGetCfg(s_parser, buf, &bufsz) != 0)
00424       xfree(buf), (buf = 0);
00425 
00426    return buf;
00427 }
00428 
00429 //----------------------------------------------------------------------------
00430 
00437 static void * dnxAgentServer(void * data)
00438 {
00439    int ret;
00440    DnxMgmtRequest Msg;
00441    Msg.action = 0;
00442 
00443    dnxLog("DNX Server Agent awaiting commands...");
00444 
00445    while (!s_shutdown)
00446    {
00447       memset(Msg.address, '\0', DNX_MAX_ADDRESS);
00448 
00449       // wait 2 second for a request; process the request, if valid
00450       if ((ret = dnxWaitForMgmtRequest(s_agent, &Msg, Msg.address, 2)) == DNX_OK)
00451       {
00452          DnxMgmtReply Rsp;
00453          char addrstr[DNX_MAX_ADDRSTR];
00454 
00455          dnxDebug(2, "Received MgmtRequest from %s.", 
00456                dnxNtop(Msg.address, addrstr, sizeof addrstr));
00457 
00458          // setup some default response values
00459          Rsp.xid = Msg.xid;
00460          Rsp.status = DNX_REQ_ACK;
00461          Rsp.reply = 0;
00462 
00463          // perform the requested action
00464          if (!strcmp(Msg.action, "RESETSTATS"))
00465          {
00466             dnxStatsResetServerStats();
00467             dnxStatsForEachNode(dnxResetNodeStats, 0);
00468             Rsp.reply = xstrdup("OK");
00469          }
00470          else if (!strncmp(Msg.action, "GETSTATS ", 9))
00471          {
00472             if ((Rsp.reply = buildMgmtStatsReply(Msg.action + 9)) == 0)
00473                Rsp.status = DNX_REQ_NAK;
00474          }
00475          else if (!strncmp(Msg.action, "GETNODESTATS ", 13))
00476          {
00477             if ((Rsp.reply = buildMgmtNodeStatsReply(Msg.action + 13)) == 0)
00478                Rsp.status = DNX_REQ_NAK;
00479          }
00480          else if (!strcmp(Msg.action, "GETNODELIST"))
00481          {
00482             if ((Rsp.reply = buildMgmtNodeListReply()) == 0)
00483                Rsp.status = DNX_REQ_NAK;
00484          }
00485          else if (!strcmp(Msg.action, "GETCONFIG"))
00486          {
00487             if ((Rsp.reply = buildMgmtCfgReply()) == 0)
00488                Rsp.status = DNX_REQ_NAK;
00489          }
00490          else if (!strcmp(Msg.action, "GETVERSION"))
00491          {
00492             if ((Rsp.reply = versionText()) == 0)
00493                Rsp.status = DNX_REQ_NAK;
00494          }
00495          else if (!strcmp(Msg.action, "HELP"))
00496          {
00497             if ((Rsp.reply = buildHelpReply()) == 0)
00498                Rsp.status = DNX_REQ_NAK;
00499          }
00500 
00501          // send response, log response failures
00502          if ((ret = dnxSendMgmtReply(s_agent, &Rsp, Msg.address)) != 0)
00503             dnxLog("Agent response failure: %s.", dnxErrorString(ret));
00504 
00505          // free request and reply message buffers
00506          xfree(Rsp.reply);
00507          xfree(Msg.action);
00508       }
00509       else if (ret != DNX_ERR_TIMEOUT)
00510          dnxLog("Agent channel failure: %s.", dnxErrorString(ret));
00511    }
00512 
00513    dnxLog("Agent terminating...");
00514 
00515    return 0;
00516 }
00517 
00518 /*--------------------------------------------------------------------------
00519                                  INTERFACE
00520   --------------------------------------------------------------------------*/
00521 
00522 int dnxInitAgent(char * agentUrl, DnxCfgParser * parser)
00523 {
00524    int ret;
00525 
00526    s_shutdown = 0;
00527    s_agentTid = 0;
00528    s_parser = parser;
00529 
00530    if ((ret = dnxChanMapAdd(s_agentName, agentUrl)) != DNX_OK)
00531       dnxLog("AGENT channel init failed: %s.", dnxErrorString(ret));
00532    else if ((ret = dnxConnect(s_agentName, DNX_MODE_PASSIVE, &s_agent)) != DNX_OK)
00533    {
00534       dnxLog("AGENT channel connect failed: %s.", dnxErrorString(ret));
00535       dnxChanMapDelete(s_agentName);
00536    }
00537    else if ((ret = pthread_create(&s_agentTid, 0, dnxAgentServer, 0)) != 0)
00538    {
00539       dnxLog("AGENT server init failed: %s.", strerror(ret));
00540       dnxDisconnect(s_agent);
00541       dnxChanMapDelete(s_agentName);
00542       ret = DNX_ERR_THREAD;
00543    }
00544    return ret;
00545 }
00546 
00547 //----------------------------------------------------------------------------
00548 
00549 void dnxReleaseAgent(void)
00550 {
00551    if (s_agentTid)
00552    { 
00553       s_shutdown = 1;
00554       pthread_join(s_agentTid, 0);
00555       dnxDisconnect(s_agent);
00556       dnxChanMapDelete(s_agentName);
00557    }
00558 }
00559 
00560 /*--------------------------------------------------------------------------*/
00561 

Generated on Tue Apr 13 15:48:07 2010 for DNX by  doxygen 1.5.6