dnxQueue.c

Go to the documentation of this file.
00001 /*--------------------------------------------------------------------------
00002 
00003    Copyright (c) 2006-2007, Intellectual Reserve, Inc. All rights reserved.
00004 
00005    This program is free software; you can redistribute it and/or modify
00006    it under the terms of the GNU General Public License version 2 as
00007    published by the Free Software Foundation.
00008 
00009    This program is distributed in the hope that it will be useful,
00010    but WITHOUT ANY WARRANTY; without even the implied warranty of
00011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012    GNU General Public License for more details.
00013 
00014    You should have received a copy of the GNU General Public License
00015    along with this program; if not, write to the Free Software
00016    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 
00018   --------------------------------------------------------------------------*/
00019 
00028 #include "dnxQueue.h"
00029 
00030 #include "dnxError.h"
00031 #include "dnxDebug.h"
00032 #include "dnxLogging.h"
00033 
00034 #include <stdlib.h>
00035 #include <syslog.h>
00036 #include <assert.h>
00037 #include <pthread.h>
00038 
/** One node of the queue's singly-linked list; wraps a single payload. */
struct iDnxQueueEntry_
{
   struct iDnxQueueEntry_ * next;   /* following node; null on the tail node */
   void * pPayload;                 /* payload pointer handed to dnxQueuePut */
};
typedef struct iDnxQueueEntry_ iDnxQueueEntry;
00045 
00047 typedef struct iDnxQueue_
00048 {
00049    iDnxQueueEntry * head;           
00050    iDnxQueueEntry * tail;           
00051    iDnxQueueEntry * current;        
00052    void (*freepayload)(void *);     
00053    unsigned size;                   
00054    unsigned maxsz;                  
00055    pthread_mutex_t mutex;           
00056    pthread_cond_t cv;               
00057 } iDnxQueue;
00058 
00059 /*--------------------------------------------------------------------------
00060                      NON-EXPORTED INTERFACE (not used yet)
00061   --------------------------------------------------------------------------*/
00062 
00078 int dnxQueueGetWait(DnxQueue * queue, void ** ppPayload)
00079 {
00080    iDnxQueue * iqueue = (iDnxQueue *)queue;
00081    iDnxQueueEntry * item = 0;
00082 
00083    assert(queue && ppPayload);
00084 
00085    DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00086 
00087    // block this thread until it can dequeue a request
00088    while (item == 0)
00089    {
00090       // see if we have any queue items already waiting
00091       if (iqueue->size > 0)
00092       {
00093          item = iqueue->head;
00094          iqueue->head = item->next;
00095          if (iqueue->current == item)
00096             iqueue->current = item->next;
00097 
00098          // adjust the tail pointer if the queue is now empty
00099          if (iqueue->head == 0)
00100             iqueue->tail = 0;
00101 
00102          iqueue->size--;
00103       }
00104       else     // queue is empty
00105          pthread_cond_wait(&iqueue->cv, &iqueue->mutex);
00106    }
00107 
00108    DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00109 
00110    // return the payload to the caller.
00111    if (item)
00112    {
00113       *ppPayload = item->pPayload;
00114       xfree(item);
00115       return DNX_OK;
00116    }
00117 
00118    return DNX_ERR_NOTFOUND;
00119 }
00120 
00121 //----------------------------------------------------------------------------
00122 
00138 int dnxQueueNext(DnxQueue * queue, void ** ppPayload)
00139 {
00140    iDnxQueue * iqueue = (iDnxQueue *)queue;
00141 
00142    assert(queue && ppPayload);
00143 
00144    *ppPayload = 0;
00145 
00146    DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00147 
00148    // save pointer to current payload
00149    if (iqueue->current)
00150    {
00151       *ppPayload = iqueue->current->pPayload;
00152 
00153       // advance circular buffer pointer
00154       if (iqueue->current->next)
00155          iqueue->current = iqueue->current->next;
00156       else
00157          iqueue->current = iqueue->head;
00158    }
00159 
00160    DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00161 
00162    return *ppPayload ? DNX_OK : DNX_ERR_NOTFOUND;
00163 }
00164 
00165 //----------------------------------------------------------------------------
00166 
00177 int dnxQueueSize(DnxQueue * queue)
00178 {
00179    iDnxQueue * iqueue = (iDnxQueue *)queue;
00180    int count;
00181 
00182    assert(queue);
00183 
00184    DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00185 
00186    count = (int)iqueue->size;
00187 
00188    DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00189 
00190    return count;
00191 }
00192 
00193 /*--------------------------------------------------------------------------
00194                                  INTERFACE
00195   --------------------------------------------------------------------------*/
00196 
00197 int dnxQueuePut(DnxQueue * queue, void * pPayload)
00198 {
00199    iDnxQueue * iqueue = (iDnxQueue *)queue;
00200    iDnxQueueEntry * item;
00201 
00202    assert(queue);
00203 
00204    // create structure with new request
00205    if ((item = (iDnxQueueEntry *)xmalloc(sizeof *item)) == 0)
00206       return DNX_ERR_MEMORY;
00207 
00208    item->pPayload = pPayload;
00209    item->next = 0;
00210 
00211    DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00212 
00213    // add new request to end of list, updating list pointers as required
00214    if (iqueue->size == 0) // special case - list is empty
00215       iqueue->head = iqueue->tail = iqueue->current = item;
00216    else
00217    {
00218       iqueue->tail->next = item;
00219       iqueue->tail = item;
00220    }
00221 
00222    iqueue->size++;
00223 
00224    // check for queue overflow if this queue was created with a maximum size
00225    if (iqueue->maxsz > 0 && iqueue->size > iqueue->maxsz)
00226    {
00227       // remove the oldest entry at the queue head
00228       item = iqueue->head;
00229       iqueue->head = item->next;
00230       if (iqueue->current == item)
00231          iqueue->current = item->next;
00232 
00233       // adjust tail if queue is now empty
00234       if (iqueue->head == 0)
00235          iqueue->tail = 0;
00236 
00237       iqueue->size--;
00238 
00239       // call item payload destructor, if one was supplied
00240       if (iqueue->freepayload)
00241          iqueue->freepayload(item->pPayload);
00242 
00243       xfree(item);
00244    }
00245 
00246    // signal any waiters - there's a new item in the queue
00247    pthread_cond_signal(&iqueue->cv);
00248 
00249    DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00250 
00251    return DNX_OK;
00252 }
00253 
00254 //----------------------------------------------------------------------------
00255 
00256 int dnxQueueGet(DnxQueue * queue, void ** ppPayload)
00257 {
00258    iDnxQueue * iqueue = (iDnxQueue *)queue;
00259    iDnxQueueEntry * item = 0;
00260 
00261    assert(queue && ppPayload);
00262 
00263    DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00264 
00265    if (iqueue->size > 0)
00266    {
00267       // remove the 'head' item from the queue
00268       item = iqueue->head;
00269       iqueue->head = item->next;
00270       if (iqueue->current == item)
00271          iqueue->current = item->next;
00272 
00273       // adjust tail pointer if queue is now empty
00274       if (iqueue->head == 0)
00275          iqueue->tail = 0;
00276 
00277       iqueue->size--;
00278    }
00279 
00280    DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00281 
00282    // return the payload to the caller, free queue item
00283    if (item)
00284    {
00285       *ppPayload = item->pPayload;
00286       xfree(item);
00287       return DNX_OK;
00288    }
00289 
00290    return DNX_ERR_NOTFOUND;
00291 }
00292 
00293 //----------------------------------------------------------------------------
00294 
00295 DnxQueueResult dnxQueueRemove(DnxQueue * queue, void ** ppPayload,
00296       DnxQueueResult (*Compare)(void * pLeft, void * pRight))
00297 {
00298    DnxQueueResult bFound = DNX_QRES_CONTINUE;
00299    iDnxQueue * iqueue = (iDnxQueue *)queue;
00300    iDnxQueueEntry * item, * prev;
00301 
00302    assert(queue && ppPayload && Compare);
00303 
00304    DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00305 
00306    prev = 0;
00307    for (item = iqueue->head; item; item = item->next)
00308    {
00309       if ((bFound = Compare(*ppPayload, item->pPayload)) != DNX_QRES_CONTINUE)
00310       {
00311          if (bFound == DNX_QRES_FOUND)
00312          {
00313             *ppPayload = item->pPayload;
00314 
00315             // cross-link previous to next and free current
00316             if (prev)
00317                prev->next = item->next;
00318             else                          // removing the head item
00319                iqueue->head = item->next;
00320 
00321             if (item->next == 0)          // removing the tail item
00322                iqueue->tail = prev;
00323 
00324             if (iqueue->current == item)  // advance circular pointer
00325                if ((iqueue->current = item->next) == 0)
00326                   iqueue->current = iqueue->head;
00327 
00328             iqueue->size--;
00329          }
00330          break;
00331       }
00332       prev = item;
00333    }
00334 
00335    DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00336 
00337    if (bFound == DNX_QRES_FOUND)
00338       xfree(item);       // free the queue entry wrapper object
00339 
00340    return bFound;
00341 }
00342 
00343 //----------------------------------------------------------------------------
00344 
00345 DnxQueueResult dnxQueueFind(DnxQueue * queue, void ** ppPayload,
00346       DnxQueueResult (*Compare)(void * pLeft, void * pRight))
00347 {
00348    DnxQueueResult bFound = DNX_QRES_CONTINUE;
00349    iDnxQueue * iqueue = (iDnxQueue *)queue;
00350    iDnxQueueEntry * item;
00351 
00352    assert(queue && ppPayload && Compare);
00353 
00354    DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00355 
00356    for (item = iqueue->head; item; item = item->next)
00357    {
00358       if ((bFound = Compare(*ppPayload, item->pPayload)) != DNX_QRES_CONTINUE)
00359       {
00360          if (bFound == DNX_QRES_FOUND)
00361             *ppPayload = item->pPayload;
00362          break;
00363       }
00364    }
00365 
00366    DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00367 
00368    return bFound;
00369 }
00370 
00371 //----------------------------------------------------------------------------
00372 
00373 int dnxQueueCreate(unsigned maxsz, void (*pldtor)(void *), DnxQueue ** pqueue)
00374 {
00375    iDnxQueue * iqueue;
00376 
00377    assert(pqueue);
00378 
00379    if ((iqueue = (iDnxQueue *)xmalloc(sizeof *iqueue)) == 0)
00380       return DNX_ERR_MEMORY;
00381 
00382    // initialize queue
00383    memset(iqueue, 0, sizeof *iqueue);
00384    iqueue->freepayload = pldtor;
00385    iqueue->maxsz = maxsz;
00386 
00387    // initialize thread sync
00388    DNX_PT_MUTEX_INIT(&iqueue->mutex);
00389    pthread_cond_init(&iqueue->cv, 0);
00390 
00391    *pqueue = (DnxQueue *)iqueue;
00392 
00393    return DNX_OK;
00394 }
00395 
00396 //----------------------------------------------------------------------------
00397 
00398 void dnxQueueDestroy(DnxQueue * queue)
00399 {
00400    iDnxQueue * iqueue = (iDnxQueue *)queue;
00401    iDnxQueueEntry * item;
00402 
00403    assert(queue);
00404 
00405    DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00406 
00407    // first free any requests that might be on the queue
00408    item = iqueue->head;
00409    while (item != 0)
00410    {
00411       iDnxQueueEntry * next = item->next;
00412       iqueue->freepayload(item->pPayload);
00413       xfree(item);
00414       item = next;
00415    }
00416 
00417    DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00418 
00419    DNX_PT_MUTEX_DESTROY(&iqueue->mutex);
00420    pthread_cond_destroy(&iqueue->cv);
00421 
00422    xfree(iqueue);
00423 }
00424 
00425 /*--------------------------------------------------------------------------
00426                                  UNIT TEST
00427 
00428    From within dnx/server, compile with GNU tools using this command line:
00429 
00430       gcc -DDEBUG -DDNX_QUEUE_TEST -g -O0 -I../common dnxQueue.c \
00431          ../common/dnxError.c -lpthread -lgcc_s -lrt -o dnxQueueTest
00432 
00433    Alternatively, a heap check may be done with the following command line:
00434 
00435       gcc -DDEBUG -DDEBUG_HEAP -DDNX_QUEUE_TEST -g -O0 -I../common \
00436          dnxQueue.c ../common/dnxError.c ../common/dnxHeap.c \
00437          -lpthread -lgcc_s -lrt -o dnxQueueTest
00438 
00439   --------------------------------------------------------------------------*/
00440 
00441 #ifdef DNX_QUEUE_TEST
00442 
00443 #include "utesthelp.h"
00444 #include <time.h>
00445 
// number of elements in an array object (not valid on a pointer)
#define elemcount(x) (sizeof(x) / sizeof((x)[0]))

static int free_count = 0;    // incremented by qtfree on every call
static int verbose = 0;       // set by main to 1 when any argument is given

IMPLEMENT_DNX_DEBUG(verbose);
00452 
00453 // functional stubs
00454 static DnxQueueResult qtcmp(void * left, void * right)
00455       { return strcmp((char *)left, (char *)right) == 0?
00456             DNX_QRES_FOUND: DNX_QRES_CONTINUE; }
00457 
00458 static void qtfree(void * p)
00459 {
00460    free_count++;
00461    free(p);
00462 }
00463 
00464 int main(int argc, char ** argv)
00465 {
00466    DnxQueue * queue;
00467    iDnxQueue * iqueue;
00468    DnxQueueResult qres;
00469    char * msg100_static = "message 100";
00470    char * msg25_static = "message 25";
00471    char * msg250_static = "message 250";
00472    char * msgs[101];
00473    char * msg2;
00474    int i;
00475 
00476    verbose = argc > 1? 1: 0;
00477 
00478    // create a new queue and get a concrete reference to it for testing
00479    CHECK_ZERO(dnxQueueCreate(100, qtfree, &queue));
00480    iqueue = (iDnxQueue *)queue;
00481 
00482    // check internal data structure state
00483    CHECK_TRUE(iqueue->head == 0);
00484    CHECK_TRUE(iqueue->tail == 0);
00485    CHECK_TRUE(iqueue->current == 0);
00486    CHECK_TRUE(iqueue->freepayload == qtfree);
00487    CHECK_TRUE(iqueue->size == 0);
00488    CHECK_TRUE(iqueue->maxsz == 100);
00489 
00490    // enqueue the messages
00491    free_count = 0;
00492    for (i = 0; i < elemcount(msgs); i++)
00493    {
00494       char buf[32];
00495       sprintf(buf, "message %d", i);
00496       CHECK_NONZERO(msgs[i] = strdup(buf));
00497       CHECK_ZERO(dnxQueuePut(queue, msgs[i]));
00498    }
00499 
00500    // we pushed one more than the size of the queue
00501    // item 0 should have been freed by the destructor
00502    CHECK_TRUE(free_count == 1);
00503 
00504    // get item 1 from the queue - we'll own it after this call
00505    CHECK_ZERO(dnxQueueGet(queue, (void **)&msg2));
00506    CHECK_TRUE(strcmp(msg2, "message 1") == 0);
00507    free(msg2);
00508 
00509    // find and remove item 100 from the queue - we'll own it on success
00510    msg2 = msg100_static;
00511    CHECK_TRUE(dnxQueueRemove(queue, (void **)&msg2, qtcmp) == DNX_QRES_FOUND);
00512    CHECK_NONZERO(msg2);
00513    CHECK_TRUE(msg2 != msg100_static);
00514    free(msg2);
00515 
00516    // attempt to find an existing item
00517    msg2 = msg25_static;
00518    CHECK_TRUE(dnxQueueFind(queue, (void **)&msg2, qtcmp) == DNX_QRES_FOUND);
00519    CHECK_TRUE(msg2 != msg25_static);
00520    CHECK_TRUE(strcmp(msg2, msgs[25]) == 0);
00521 
00522    // attempt to find a non-existent item
00523    msg2 = msg250_static;
00524    CHECK_TRUE(dnxQueueFind(queue, (void **)&msg2, qtcmp) == DNX_QRES_CONTINUE);
00525 
00526    // remove remaining entries
00527    for (i = 3; i < elemcount(msgs); i++)
00528    {
00529       CHECK_ZERO(dnxQueueGet(queue, (void **)&msg2));
00530       free(msg2);
00531    }
00532 
00533    // attempt to remove one more entry
00534    CHECK_NONZERO(dnxQueueGet(queue, (void **)&msg2));
00535 
00536    // ensure queue is now empty
00537    CHECK_TRUE(dnxQueueSize(queue) == 0);
00538 
00539    dnxQueueDestroy(queue);
00540 
00541    // we should have called the destructor only once
00542    CHECK_TRUE(free_count == 1);
00543 
00544 #ifdef DEBUG_HEAP
00545    CHECK_ZERO(dnxCheckHeap());
00546 #endif
00547 
00548    return 0;
00549 }
00550 
00551 #endif   /* DNX_QUEUE_TEST */
00552 
00553 /*--------------------------------------------------------------------------*/
00554 

Generated on Tue Apr 13 15:48:07 2010 for DNX by  doxygen 1.5.6