dnxQueue.c

Go to the documentation of this file.
00001 /*--------------------------------------------------------------------------
00002 
00003    Copyright (c) 2006-2010, Intellectual Reserve, Inc. All rights reserved.
00004 
00005    This program is free software; you can redistribute it and/or modify
00006    it under the terms of the GNU General Public License version 2 as
00007    published by the Free Software Foundation.
00008 
00009    This program is distributed in the hope that it will be useful,
00010    but WITHOUT ANY WARRANTY; without even the implied warranty of
00011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012    GNU General Public License for more details.
00013 
00014    You should have received a copy of the GNU General Public License
00015    along with this program; if not, write to the Free Software
00016    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 
00018   --------------------------------------------------------------------------*/
00019 
00028 #include "dnxQueue.h"
00029 
00030 #include "dnxError.h"
00031 #include "dnxDebug.h"
00032 #include "dnxLogging.h"
00033 
00034 #include <stdlib.h>
00035 #include <syslog.h>
00036 #include <assert.h>
00037 #include <pthread.h>
00038 
/** Queue entry wrapper: one singly-linked node per queued payload. */
struct iDnxQueueEntry_
{
   struct iDnxQueueEntry_ * next;   //!< Following node, or 0 for the last node.
   void * pPayload;                 //!< Caller-supplied payload carried by this node.
};

typedef struct iDnxQueueEntry_ iDnxQueueEntry;
00045 
00047 typedef struct iDnxQueue_
00048 {
00049    iDnxQueueEntry * head;           
00050    iDnxQueueEntry * tail;           
00051    iDnxQueueEntry * current;        
00052    void (*freepayload)(void *);     
00053    unsigned size;                   
00054    unsigned maxsz;                  
00055    pthread_mutex_t mutex;           
00056    pthread_cond_t cv;               
00057 } iDnxQueue;
00058 
00059 /*--------------------------------------------------------------------------
00060                      NON-EXPORTED INTERFACE (not used yet)
00061   --------------------------------------------------------------------------*/
00062 
00078 int dnxQueueNext(DnxQueue * queue, void ** ppPayload)
00079 {
00080    iDnxQueue * iqueue = (iDnxQueue *)queue;
00081 
00082    assert(queue && ppPayload);
00083 
00084    *ppPayload = 0;
00085 
00086    DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00087 
00088    // save pointer to current payload
00089    if (iqueue->current)
00090    {
00091       *ppPayload = iqueue->current->pPayload;
00092 
00093       // advance circular buffer pointer
00094       if (iqueue->current->next)
00095          iqueue->current = iqueue->current->next;
00096       else
00097          iqueue->current = iqueue->head;
00098    }
00099 
00100    DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00101 
00102    return *ppPayload ? DNX_OK : DNX_ERR_NOTFOUND;
00103 }
00104 
00105 //----------------------------------------------------------------------------
00106 
00117 int dnxQueueSize(DnxQueue * queue)
00118 {
00119    iDnxQueue * iqueue = (iDnxQueue *)queue;
00120    int count;
00121 
00122    assert(queue);
00123 
00124    DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00125 
00126    count = (int)iqueue->size;
00127 
00128    DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00129 
00130    return count;
00131 }
00132 
00133 /*--------------------------------------------------------------------------
00134                                  INTERFACE
00135   --------------------------------------------------------------------------*/
00136 
00137 int dnxQueuePut(DnxQueue * queue, void * pPayload)
00138 {
00139    iDnxQueue * iqueue = (iDnxQueue *)queue;
00140    iDnxQueueEntry * item;
00141 
00142    assert(queue);
00143 
00144    // create structure with new request
00145    if ((item = (iDnxQueueEntry *)xmalloc(sizeof *item)) == 0)
00146       return DNX_ERR_MEMORY;
00147 
00148    item->pPayload = pPayload;
00149    item->next = 0;
00150 
00151    DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00152 
00153    // add new request to end of list, updating list pointers as required
00154    if (iqueue->size == 0) // special case - list is empty
00155       iqueue->head = iqueue->tail = iqueue->current = item;
00156    else
00157    {
00158       iqueue->tail->next = item;
00159       iqueue->tail = item;
00160    }
00161 
00162    iqueue->size++;
00163 
00164    // check for queue overflow if this queue was created with a maximum size
00165    if (iqueue->maxsz > 0 && iqueue->size > iqueue->maxsz)
00166    {
00167       // remove the oldest entry at the queue head
00168       item = iqueue->head;
00169       iqueue->head = item->next;
00170       if (iqueue->current == item)
00171          iqueue->current = item->next;
00172 
00173       // adjust tail if queue is now empty
00174       if (iqueue->head == 0)
00175          iqueue->tail = 0;
00176 
00177       iqueue->size--;
00178 
00179       // call item payload destructor, if one was supplied
00180       if (iqueue->freepayload)
00181          iqueue->freepayload(item->pPayload);
00182 
00183       xfree(item);
00184    }
00185 
00186    // signal any waiters - there's a new item in the queue
00187    pthread_cond_signal(&iqueue->cv);
00188 
00189    DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00190 
00191    return DNX_OK;
00192 }
00193 
00194 //----------------------------------------------------------------------------
00195 
00196 int dnxQueueGet(DnxQueue * queue, void ** ppPayload)
00197 {
00198    iDnxQueue * iqueue = (iDnxQueue *)queue;
00199    iDnxQueueEntry * item = 0;
00200 
00201    assert(queue && ppPayload);
00202 
00203    DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00204 
00205    if (iqueue->size > 0)
00206    {
00207       // remove the 'head' item from the queue
00208       item = iqueue->head;
00209       iqueue->head = item->next;
00210       if (iqueue->current == item)
00211          iqueue->current = item->next;
00212 
00213       // adjust tail pointer if queue is now empty
00214       if (iqueue->head == 0)
00215          iqueue->tail = 0;
00216 
00217       iqueue->size--;
00218    }
00219 
00220    DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00221 
00222    // return the payload to the caller, free queue item
00223    if (item)
00224    {
00225       *ppPayload = item->pPayload;
00226       xfree(item);
00227       return DNX_OK;
00228    }
00229 
00230    return DNX_ERR_NOTFOUND;
00231 }
00232 
00233 //----------------------------------------------------------------------------
00234 
00235 int dnxQueueGetWait(DnxQueue * queue, unsigned timeout, void ** ppPayload)
00236 {
00237    iDnxQueue * iqueue = (iDnxQueue *)queue;
00238    iDnxQueueEntry * item = 0;
00239 
00240    assert(queue && ppPayload);
00241 
00242    DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00243 
00244    // block this thread until it can dequeue a request
00245    while (item == 0)
00246    {
00247       // see if we have any queue items already waiting
00248       if (iqueue->size > 0)
00249       {
00250          item = iqueue->head;
00251          iqueue->head = item->next;
00252          if (iqueue->current == item)
00253             iqueue->current = item->next;
00254 
00255          // adjust the tail pointer if the queue is now empty
00256          if (iqueue->head == 0)
00257             iqueue->tail = 0;
00258 
00259          iqueue->size--;
00260       }
00261       else if (timeout > 0) // queue is empty
00262       {
00263          struct timespec to; 
00264          clock_gettime(CLOCK_REALTIME, &to);
00265          to.tv_sec += timeout;
00266          if (pthread_cond_timedwait(&iqueue->cv, 
00267                &iqueue->mutex, &to) == ETIMEDOUT)
00268             break;
00269       }
00270       else
00271          pthread_cond_wait(&iqueue->cv, &iqueue->mutex);
00272    }
00273 
00274    DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00275 
00276    // return the payload to the caller.
00277    if (item)
00278    {
00279       *ppPayload = item->pPayload;
00280       xfree(item);
00281       return DNX_OK;
00282    }
00283 
00284    return DNX_ERR_NOTFOUND;
00285 }
00286 
00287 //----------------------------------------------------------------------------
00288 
00289 DnxQueueResult dnxQueueRemove(DnxQueue * queue, void ** ppPayload,
00290       DnxQueueResult (*Compare)(void * pLeft, void * pRight))
00291 {
00292    DnxQueueResult bFound = DNX_QRES_CONTINUE;
00293    iDnxQueue * iqueue = (iDnxQueue *)queue;
00294    iDnxQueueEntry * item, * prev;
00295 
00296    assert(queue && ppPayload && Compare);
00297 
00298    DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00299 
00300    prev = 0;
00301    for (item = iqueue->head; item; item = item->next)
00302    {
00303       if ((bFound = Compare(*ppPayload, item->pPayload)) != DNX_QRES_CONTINUE)
00304       {
00305          if (bFound == DNX_QRES_FOUND)
00306          {
00307             *ppPayload = item->pPayload;
00308 
00309             // cross-link previous to next and free current
00310             if (prev)
00311                prev->next = item->next;
00312             else                          // removing the head item
00313                iqueue->head = item->next;
00314 
00315             if (item->next == 0)          // removing the tail item
00316                iqueue->tail = prev;
00317 
00318             if (iqueue->current == item)  // advance circular pointer
00319                if ((iqueue->current = item->next) == 0)
00320                   iqueue->current = iqueue->head;
00321 
00322             iqueue->size--;
00323          }
00324          break;
00325       }
00326       prev = item;
00327    }
00328 
00329    DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00330 
00331    if (bFound == DNX_QRES_FOUND)
00332       xfree(item);       // free the queue entry wrapper object
00333 
00334    return bFound;
00335 }
00336 
00337 //----------------------------------------------------------------------------
00338 
00339 DnxQueueResult dnxQueueFind(DnxQueue * queue, void ** ppPayload,
00340       DnxQueueResult (*Compare)(void * pLeft, void * pRight))
00341 {
00342    DnxQueueResult bFound = DNX_QRES_CONTINUE;
00343    iDnxQueue * iqueue = (iDnxQueue *)queue;
00344    iDnxQueueEntry * item;
00345 
00346    assert(queue && ppPayload && Compare);
00347 
00348    DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00349 
00350    for (item = iqueue->head; item; item = item->next)
00351    {
00352       if ((bFound = Compare(*ppPayload, item->pPayload)) != DNX_QRES_CONTINUE)
00353       {
00354          if (bFound == DNX_QRES_FOUND)
00355             *ppPayload = item->pPayload;
00356          break;
00357       }
00358    }
00359 
00360    DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00361 
00362    return bFound;
00363 }
00364 
00365 //----------------------------------------------------------------------------
00366 
00367 int dnxQueueCreate(unsigned maxsz, void (*pldtor)(void *), DnxQueue ** pqueue)
00368 {
00369    iDnxQueue * iqueue;
00370 
00371    assert(pqueue);
00372 
00373    if ((iqueue = (iDnxQueue *)xmalloc(sizeof *iqueue)) == 0)
00374       return DNX_ERR_MEMORY;
00375 
00376    // initialize queue
00377    memset(iqueue, 0, sizeof *iqueue);
00378    iqueue->freepayload = pldtor;
00379    iqueue->maxsz = maxsz;
00380 
00381    // initialize thread sync
00382    DNX_PT_MUTEX_INIT(&iqueue->mutex);
00383    pthread_cond_init(&iqueue->cv, 0);
00384 
00385    *pqueue = (DnxQueue *)iqueue;
00386 
00387    return DNX_OK;
00388 }
00389 
00390 //----------------------------------------------------------------------------
00391 
00392 void dnxQueueDestroy(DnxQueue * queue)
00393 {
00394    iDnxQueue * iqueue = (iDnxQueue *)queue;
00395    iDnxQueueEntry * item;
00396 
00397    assert(queue);
00398 
00399    DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00400 
00401    // first free any requests that might be on the queue
00402    item = iqueue->head;
00403    while (item != 0)
00404    {
00405       iDnxQueueEntry * next = item->next;
00406       iqueue->freepayload(item->pPayload);
00407       xfree(item);
00408       item = next;
00409    }
00410 
00411    DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00412 
00413    DNX_PT_MUTEX_DESTROY(&iqueue->mutex);
00414    pthread_cond_destroy(&iqueue->cv);
00415 
00416    xfree(iqueue);
00417 }
00418 
00419 /*--------------------------------------------------------------------------
00420                                  UNIT TEST
00421 
00422    From within dnx/server, compile with GNU tools using this command line:
00423 
00424       gcc -DDEBUG -DDNX_QUEUE_TEST -g -O0 -I../common dnxQueue.c \
00425          ../common/dnxError.c -lpthread -lgcc_s -lrt -o dnxQueueTest
00426 
00427    Alternatively, a heap check may be done with the following command line:
00428 
00429       gcc -DDEBUG -DDEBUG_HEAP -DDNX_QUEUE_TEST -g -O0 -I../common \
00430          dnxQueue.c ../common/dnxError.c ../common/dnxHeap.c \
00431          -lpthread -lgcc_s -lrt -o dnxQueueTest
00432 
00433   --------------------------------------------------------------------------*/
00434 
00435 #ifdef DNX_QUEUE_TEST
00436 
00437 #include "utesthelp.h"
00438 #include <time.h>
00439 
// number of elements in a true array (not valid for pointers)
#define elemcount(x) (sizeof(x) / sizeof((x)[0]))

static int free_count;     // incremented by qtfree on every call
static int verbose;        // set by main when any command-line argument is given

// NOTE(review): presumably supplies the debug-output hooks that dnxDebug.h
// expects, keyed on 'verbose' - confirm against the macro's definition.
IMPLEMENT_DNX_DEBUG(verbose);
00446 
00447 // functional stubs
00448 static DnxQueueResult qtcmp(void * left, void * right)
00449       { return strcmp((char *)left, (char *)right) == 0?
00450             DNX_QRES_FOUND: DNX_QRES_CONTINUE; }
00451 
00452 static void qtfree(void * p)
00453 {
00454    free_count++;
00455    free(p);
00456 }
00457 
00458 int main(int argc, char ** argv)
00459 {
00460    DnxQueue * queue;
00461    iDnxQueue * iqueue;
00462    DnxQueueResult qres;
00463    char * msg100_static = "message 100";
00464    char * msg25_static = "message 25";
00465    char * msg250_static = "message 250";
00466    char * msgs[101];
00467    char * msg2;
00468    int i;
00469 
00470    verbose = argc > 1? 1: 0;
00471 
00472    // create a new queue and get a concrete reference to it for testing
00473    CHECK_ZERO(dnxQueueCreate(100, qtfree, &queue));
00474    iqueue = (iDnxQueue *)queue;
00475 
00476    // check internal data structure state
00477    CHECK_TRUE(iqueue->head == 0);
00478    CHECK_TRUE(iqueue->tail == 0);
00479    CHECK_TRUE(iqueue->current == 0);
00480    CHECK_TRUE(iqueue->freepayload == qtfree);
00481    CHECK_TRUE(iqueue->size == 0);
00482    CHECK_TRUE(iqueue->maxsz == 100);
00483 
00484    // enqueue the messages
00485    free_count = 0;
00486    for (i = 0; i < elemcount(msgs); i++)
00487    {
00488       char buf[32];
00489       sprintf(buf, "message %d", i);
00490       CHECK_NONZERO(msgs[i] = strdup(buf));
00491       CHECK_ZERO(dnxQueuePut(queue, msgs[i]));
00492    }
00493 
00494    // we pushed one more than the size of the queue
00495    // item 0 should have been freed by the destructor
00496    CHECK_TRUE(free_count == 1);
00497 
00498    // get item 1 from the queue - we'll own it after this call
00499    CHECK_ZERO(dnxQueueGet(queue, (void **)&msg2));
00500    CHECK_TRUE(strcmp(msg2, "message 1") == 0);
00501    free(msg2);
00502 
00503    // find and remove item 100 from the queue - we'll own it on success
00504    msg2 = msg100_static;
00505    CHECK_TRUE(dnxQueueRemove(queue, (void **)&msg2, qtcmp) == DNX_QRES_FOUND);
00506    CHECK_NONZERO(msg2);
00507    CHECK_TRUE(msg2 != msg100_static);
00508    free(msg2);
00509 
00510    // attempt to find an existing item
00511    msg2 = msg25_static;
00512    CHECK_TRUE(dnxQueueFind(queue, (void **)&msg2, qtcmp) == DNX_QRES_FOUND);
00513    CHECK_TRUE(msg2 != msg25_static);
00514    CHECK_TRUE(strcmp(msg2, msgs[25]) == 0);
00515 
00516    // attempt to find a non-existent item
00517    msg2 = msg250_static;
00518    CHECK_TRUE(dnxQueueFind(queue, (void **)&msg2, qtcmp) == DNX_QRES_CONTINUE);
00519 
00520    // remove remaining entries
00521    for (i = 3; i < elemcount(msgs); i++)
00522    {
00523       CHECK_ZERO(dnxQueueGet(queue, (void **)&msg2));
00524       free(msg2);
00525    }
00526 
00527    // attempt to remove one more entry
00528    CHECK_NONZERO(dnxQueueGet(queue, (void **)&msg2));
00529 
00530    // ensure queue is now empty
00531    CHECK_TRUE(dnxQueueSize(queue) == 0);
00532 
00533    dnxQueueDestroy(queue);
00534 
00535    // we should have called the destructor only once
00536    CHECK_TRUE(free_count == 1);
00537 
00538 #ifdef DEBUG_HEAP
00539    CHECK_ZERO(dnxCheckHeap());
00540 #endif
00541 
00542    return 0;
00543 }
00544 
00545 #endif   /* DNX_QUEUE_TEST */
00546 
00547 /*--------------------------------------------------------------------------*/
00548 

Generated on Tue Apr 13 15:48:07 2010 for DNX by  doxygen 1.5.6