dnxMsgQ.c

Go to the documentation of this file.
00001 /*--------------------------------------------------------------------------
00002 
00003    Copyright (c) 2006-2007, Intellectual Reserve, Inc. All rights reserved.
00004 
00005    This program is free software; you can redistribute it and/or modify
00006    it under the terms of the GNU General Public License version 2 as
00007    published by the Free Software Foundation.
00008 
00009    This program is distributed in the hope that it will be useful,
00010    but WITHOUT ANY WARRANTY; without even the implied warranty of
00011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012    GNU General Public License for more details.
00013 
00014    You should have received a copy of the GNU General Public License
00015    along with this program; if not, write to the Free Software
00016    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 
00018   --------------------------------------------------------------------------*/
00019 
00028 #include "dnxMsgQ.h"    // temporary
00029 #include "dnxTSPI.h"
00030 
00031 #include "dnxTransport.h"
00032 #include "dnxError.h"
00033 #include "dnxDebug.h"
00034 #include "dnxLogging.h"
00035 
00036 #include <sys/types.h>
00037 #include <sys/ipc.h>
00038 #include <sys/msg.h>
00039 #include <stdio.h>
00040 #include <stddef.h>
00041 #include <stdlib.h>
00042 #include <string.h>
00043 #include <errno.h>
00044 #include <assert.h>
00045 
00046 #define DNX_MSGQ_STANDARD  1  // Message type
00047 
00048 typedef struct _dnxMsgBuf_
00049 {
00050    long mtype;       /* message type, must be > 0 */
00051    char * mtext;     /* message data */
00052 } dnxMsgBuf;
00053 
00055 typedef struct iDnxMsgQChannel_
00056 {
00057    key_t queuekey;      
00058    int queueid;         
00059    iDnxChannel ichan;   
00060 } iDnxMsgQChannel;
00061 
00062 /*--------------------------------------------------------------------------
00063                   TRANSPORT SERVICE PROVIDER INTERFACE
00064   --------------------------------------------------------------------------*/
00065 
00075 static int dnxMsgQOpen(iDnxChannel * icp, int mode)
00076 {
00077    iDnxMsgQChannel * imcp = (iDnxMsgQChannel *)
00078          ((char *)icp - offsetof(iDnxMsgQChannel, ichan));
00079    int qid;
00080 
00081    assert(icp && imcp->queuekey > 0);
00082 
00083    // attempt to create/open the message queue
00084    if ((qid = msgget(imcp->queuekey, IPC_CREAT | 0x660)) == (key_t)(-1))
00085       return DNX_ERR_OPEN;
00086 
00087    imcp->queueid = qid;
00088 
00089    return DNX_OK;
00090 }
00091 
00092 //----------------------------------------------------------------------------
00093 
00100 static int dnxMsgQClose(iDnxChannel * icp)
00101 {
00102    iDnxMsgQChannel * imcp = (iDnxMsgQChannel *)
00103          ((char *)icp - offsetof(iDnxMsgQChannel, ichan));
00104 
00105    assert(icp && imcp->queueid);
00106 
00107    // This is really a NOP, since we don't "close" our handle to the message queue.
00108    //
00109    // However, the message queue should be deleted when no longer in use by
00110    // any process; but that will have to be implemented in dnxMsgQDeinit().
00111 
00112    imcp->queueid = 0;      // temporary till we get global close implemented
00113 
00114    return DNX_OK;
00115 }
00116 
00117 //----------------------------------------------------------------------------
00118 
00134 static int dnxMsgQRead(iDnxChannel * icp, char * buf, int * size,
00135       int timeout, char * src)
00136 {
00137    iDnxMsgQChannel * imcp = (iDnxMsgQChannel *)
00138          ((char *)icp - offsetof(iDnxMsgQChannel, ichan));
00139    dnxMsgBuf msg;
00140 
00141    assert(icp && imcp->queueid && buf && size && *size > 0);
00142 
00143    msg.mtext = buf;
00144 
00145    // wait for a message, truncate if larger than the specified buffer size
00146    if ((*size = (int)msgrcv(imcp->queueid, &msg, *size, 0L, MSG_NOERROR)) == -1)
00147       return DNX_ERR_RECEIVE;
00148 
00151    return DNX_OK;
00152 }
00153 
00154 //----------------------------------------------------------------------------
00155 
00169 static int dnxMsgQWrite(iDnxChannel * icp, char * buf, int size,
00170       int timeout, char * dst)
00171 {
00172    iDnxMsgQChannel * imcp = (iDnxMsgQChannel *)
00173          ((char *)icp - offsetof(iDnxMsgQChannel, ichan));
00174    dnxMsgBuf msg;
00175 
00176    assert(icp && imcp->queueid && buf && size > 0);
00177 
00178    msg.mtype = (long)DNX_MSGQ_STANDARD;
00179    msg.mtext = buf;
00180 
00181    // send the message
00182    if (msgsnd(imcp->queueid, &msg, size, 0) == -1)
00183       return DNX_ERR_SEND;
00184 
00187    return DNX_OK;
00188 }
00189 
00190 //----------------------------------------------------------------------------
00191 
00196 static void dnxMsgQDelete(iDnxChannel * icp)
00197 {
00198    iDnxMsgQChannel * imcp = (iDnxMsgQChannel *)
00199          ((char *)icp - offsetof(iDnxMsgQChannel, ichan));
00200 
00201    assert(icp && imcp->queueid == 0);
00202 
00203    xfree(icp);
00204 }
00205 
00206 //----------------------------------------------------------------------------
00207 
00216 static int dnxMsgQNew(char * url, iDnxChannel ** icpp)
00217 {
00218    char * cp, * ep, * lastchar;
00219    iDnxMsgQChannel * imcp;
00220    long queuekey;
00221 
00222    assert(icpp && url && *url);
00223 
00224    // search for messageqid in URL
00225    if ((cp = strstr(url, "://")) == 0)
00226       return DNX_ERR_BADURL;
00227    cp += 3;
00228 
00229    // get the message queue ID
00230    errno = 0;
00231    if ((queuekey = strtol(cp, &lastchar, 0)) < 1 || errno == ERANGE
00232          || (*lastchar && *lastchar != '/'))
00233       return DNX_ERR_BADURL;
00234 
00235    // no private keys are allowed
00236    if ((key_t)queuekey == IPC_PRIVATE)
00237       return DNX_ERR_BADURL;
00238 
00239    // allocate a new iDnxMsgQChannel object
00240    if ((imcp = (iDnxMsgQChannel *)xmalloc(sizeof *imcp)) == 0)
00241       return DNX_ERR_MEMORY;
00242 
00243    memset(imcp, 0, sizeof *imcp);
00244 
00245    // save message queue ID
00246    imcp->queuekey = (key_t)queuekey;
00247 
00248    // set I/O methods
00249    imcp->ichan.txOpen   = dnxMsgQOpen;
00250    imcp->ichan.txClose  = dnxMsgQClose;
00251    imcp->ichan.txRead   = dnxMsgQRead;
00252    imcp->ichan.txWrite  = dnxMsgQWrite;
00253    imcp->ichan.txDelete = dnxMsgQDelete;
00254 
00255    *icpp = &imcp->ichan;
00256 
00257    return DNX_OK;
00258 }
00259 
00260 /*--------------------------------------------------------------------------
00261                            EXPORTED INTERFACE
00262   --------------------------------------------------------------------------*/
00263 
00271 int dnxMsgQInit(int (**ptxAlloc)(char * url, iDnxChannel ** icpp))
00272 {
00273    // Could use this routine to loop through the global channel map
00274    // and create all message queues found therein (or error out if
00275    // any of them are already in use...)
00276 
00277    *ptxAlloc = dnxMsgQNew;
00278 
00279    return DNX_OK;
00280 }
00281 
00282 //----------------------------------------------------------------------------
00283 
00286 void dnxMsgQDeInit(void)
00287 {
00288    // Could use this routine to remove all of our own message queues
00289    // from the system IPC space.
00290 }
00291 
00292 /*--------------------------------------------------------------------------*/
00293 

Generated on Tue Apr 13 15:48:07 2010 for DNX by  doxygen 1.5.6