dnxQueue.c
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00028 #include "dnxQueue.h"
00029
00030 #include "dnxError.h"
00031 #include "dnxDebug.h"
00032 #include "dnxLogging.h"
00033
00034 #include <stdlib.h>
00035 #include <syslog.h>
00036 #include <assert.h>
00037 #include <pthread.h>
00038
/** One node of the queue's singly-linked list. */
struct iDnxQueueEntry_
{
   struct iDnxQueueEntry_ * next;   /*!< Following node, or 0 for the last node. */
   void * pPayload;                 /*!< Opaque payload pointer stored by the caller. */
};

/** Convenience name for a queue list node. */
typedef struct iDnxQueueEntry_ iDnxQueueEntry;
00045
00047 typedef struct iDnxQueue_
00048 {
00049 iDnxQueueEntry * head;
00050 iDnxQueueEntry * tail;
00051 iDnxQueueEntry * current;
00052 void (*freepayload)(void *);
00053 unsigned size;
00054 unsigned maxsz;
00055 pthread_mutex_t mutex;
00056 pthread_cond_t cv;
00057 } iDnxQueue;
00058
00059
00060
00061
00062
00078 int dnxQueueGetWait(DnxQueue * queue, void ** ppPayload)
00079 {
00080 iDnxQueue * iqueue = (iDnxQueue *)queue;
00081 iDnxQueueEntry * item = 0;
00082
00083 assert(queue && ppPayload);
00084
00085 DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00086
00087
00088 while (item == 0)
00089 {
00090
00091 if (iqueue->size > 0)
00092 {
00093 item = iqueue->head;
00094 iqueue->head = item->next;
00095 if (iqueue->current == item)
00096 iqueue->current = item->next;
00097
00098
00099 if (iqueue->head == 0)
00100 iqueue->tail = 0;
00101
00102 iqueue->size--;
00103 }
00104 else
00105 pthread_cond_wait(&iqueue->cv, &iqueue->mutex);
00106 }
00107
00108 DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00109
00110
00111 if (item)
00112 {
00113 *ppPayload = item->pPayload;
00114 xfree(item);
00115 return DNX_OK;
00116 }
00117
00118 return DNX_ERR_NOTFOUND;
00119 }
00120
00121
00122
00138 int dnxQueueNext(DnxQueue * queue, void ** ppPayload)
00139 {
00140 iDnxQueue * iqueue = (iDnxQueue *)queue;
00141
00142 assert(queue && ppPayload);
00143
00144 *ppPayload = 0;
00145
00146 DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00147
00148
00149 if (iqueue->current)
00150 {
00151 *ppPayload = iqueue->current->pPayload;
00152
00153
00154 if (iqueue->current->next)
00155 iqueue->current = iqueue->current->next;
00156 else
00157 iqueue->current = iqueue->head;
00158 }
00159
00160 DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00161
00162 return *ppPayload ? DNX_OK : DNX_ERR_NOTFOUND;
00163 }
00164
00165
00166
00177 int dnxQueueSize(DnxQueue * queue)
00178 {
00179 iDnxQueue * iqueue = (iDnxQueue *)queue;
00180 int count;
00181
00182 assert(queue);
00183
00184 DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00185
00186 count = (int)iqueue->size;
00187
00188 DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00189
00190 return count;
00191 }
00192
00193
00194
00195
00196
00197 int dnxQueuePut(DnxQueue * queue, void * pPayload)
00198 {
00199 iDnxQueue * iqueue = (iDnxQueue *)queue;
00200 iDnxQueueEntry * item;
00201
00202 assert(queue);
00203
00204
00205 if ((item = (iDnxQueueEntry *)xmalloc(sizeof *item)) == 0)
00206 return DNX_ERR_MEMORY;
00207
00208 item->pPayload = pPayload;
00209 item->next = 0;
00210
00211 DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00212
00213
00214 if (iqueue->size == 0)
00215 iqueue->head = iqueue->tail = iqueue->current = item;
00216 else
00217 {
00218 iqueue->tail->next = item;
00219 iqueue->tail = item;
00220 }
00221
00222 iqueue->size++;
00223
00224
00225 if (iqueue->maxsz > 0 && iqueue->size > iqueue->maxsz)
00226 {
00227
00228 item = iqueue->head;
00229 iqueue->head = item->next;
00230 if (iqueue->current == item)
00231 iqueue->current = item->next;
00232
00233
00234 if (iqueue->head == 0)
00235 iqueue->tail = 0;
00236
00237 iqueue->size--;
00238
00239
00240 if (iqueue->freepayload)
00241 iqueue->freepayload(item->pPayload);
00242
00243 xfree(item);
00244 }
00245
00246
00247 pthread_cond_signal(&iqueue->cv);
00248
00249 DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00250
00251 return DNX_OK;
00252 }
00253
00254
00255
00256 int dnxQueueGet(DnxQueue * queue, void ** ppPayload)
00257 {
00258 iDnxQueue * iqueue = (iDnxQueue *)queue;
00259 iDnxQueueEntry * item = 0;
00260
00261 assert(queue && ppPayload);
00262
00263 DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00264
00265 if (iqueue->size > 0)
00266 {
00267
00268 item = iqueue->head;
00269 iqueue->head = item->next;
00270 if (iqueue->current == item)
00271 iqueue->current = item->next;
00272
00273
00274 if (iqueue->head == 0)
00275 iqueue->tail = 0;
00276
00277 iqueue->size--;
00278 }
00279
00280 DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00281
00282
00283 if (item)
00284 {
00285 *ppPayload = item->pPayload;
00286 xfree(item);
00287 return DNX_OK;
00288 }
00289
00290 return DNX_ERR_NOTFOUND;
00291 }
00292
00293
00294
00295 DnxQueueResult dnxQueueRemove(DnxQueue * queue, void ** ppPayload,
00296 DnxQueueResult (*Compare)(void * pLeft, void * pRight))
00297 {
00298 DnxQueueResult bFound = DNX_QRES_CONTINUE;
00299 iDnxQueue * iqueue = (iDnxQueue *)queue;
00300 iDnxQueueEntry * item, * prev;
00301
00302 assert(queue && ppPayload && Compare);
00303
00304 DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00305
00306 prev = 0;
00307 for (item = iqueue->head; item; item = item->next)
00308 {
00309 if ((bFound = Compare(*ppPayload, item->pPayload)) != DNX_QRES_CONTINUE)
00310 {
00311 if (bFound == DNX_QRES_FOUND)
00312 {
00313 *ppPayload = item->pPayload;
00314
00315
00316 if (prev)
00317 prev->next = item->next;
00318 else
00319 iqueue->head = item->next;
00320
00321 if (item->next == 0)
00322 iqueue->tail = prev;
00323
00324 if (iqueue->current == item)
00325 if ((iqueue->current = item->next) == 0)
00326 iqueue->current = iqueue->head;
00327
00328 iqueue->size--;
00329 }
00330 break;
00331 }
00332 prev = item;
00333 }
00334
00335 DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00336
00337 if (bFound == DNX_QRES_FOUND)
00338 xfree(item);
00339
00340 return bFound;
00341 }
00342
00343
00344
00345 DnxQueueResult dnxQueueFind(DnxQueue * queue, void ** ppPayload,
00346 DnxQueueResult (*Compare)(void * pLeft, void * pRight))
00347 {
00348 DnxQueueResult bFound = DNX_QRES_CONTINUE;
00349 iDnxQueue * iqueue = (iDnxQueue *)queue;
00350 iDnxQueueEntry * item;
00351
00352 assert(queue && ppPayload && Compare);
00353
00354 DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00355
00356 for (item = iqueue->head; item; item = item->next)
00357 {
00358 if ((bFound = Compare(*ppPayload, item->pPayload)) != DNX_QRES_CONTINUE)
00359 {
00360 if (bFound == DNX_QRES_FOUND)
00361 *ppPayload = item->pPayload;
00362 break;
00363 }
00364 }
00365
00366 DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00367
00368 return bFound;
00369 }
00370
00371
00372
00373 int dnxQueueCreate(unsigned maxsz, void (*pldtor)(void *), DnxQueue ** pqueue)
00374 {
00375 iDnxQueue * iqueue;
00376
00377 assert(pqueue);
00378
00379 if ((iqueue = (iDnxQueue *)xmalloc(sizeof *iqueue)) == 0)
00380 return DNX_ERR_MEMORY;
00381
00382
00383 memset(iqueue, 0, sizeof *iqueue);
00384 iqueue->freepayload = pldtor;
00385 iqueue->maxsz = maxsz;
00386
00387
00388 DNX_PT_MUTEX_INIT(&iqueue->mutex);
00389 pthread_cond_init(&iqueue->cv, 0);
00390
00391 *pqueue = (DnxQueue *)iqueue;
00392
00393 return DNX_OK;
00394 }
00395
00396
00397
00398 void dnxQueueDestroy(DnxQueue * queue)
00399 {
00400 iDnxQueue * iqueue = (iDnxQueue *)queue;
00401 iDnxQueueEntry * item;
00402
00403 assert(queue);
00404
00405 DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00406
00407
00408 item = iqueue->head;
00409 while (item != 0)
00410 {
00411 iDnxQueueEntry * next = item->next;
00412 iqueue->freepayload(item->pPayload);
00413 xfree(item);
00414 item = next;
00415 }
00416
00417 DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00418
00419 DNX_PT_MUTEX_DESTROY(&iqueue->mutex);
00420 pthread_cond_destroy(&iqueue->cv);
00421
00422 xfree(iqueue);
00423 }
00424
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441 #ifdef DNX_QUEUE_TEST
00442
00443 #include "utesthelp.h"
00444 #include <time.h>
00445
/* Number of elements in an array object (not valid for a pointer). */
#define elemcount(x) (sizeof(x) / sizeof((x)[0]))

/* Number of times the test payload destructor qtfree has run. */
static int free_count;

/* Nonzero when the test was started with at least one argument. */
static int verbose;

/* NOTE(review): presumably wires the project's debug output to 'verbose';
 * the macro is defined outside this file - confirm. */
IMPLEMENT_DNX_DEBUG(verbose);
00452
00453
00454 static DnxQueueResult qtcmp(void * left, void * right)
00455 { return strcmp((char *)left, (char *)right) == 0?
00456 DNX_QRES_FOUND: DNX_QRES_CONTINUE; }
00457
00458 static void qtfree(void * p)
00459 {
00460 free_count++;
00461 free(p);
00462 }
00463
00464 int main(int argc, char ** argv)
00465 {
00466 DnxQueue * queue;
00467 iDnxQueue * iqueue;
00468 DnxQueueResult qres;
00469 char * msg100_static = "message 100";
00470 char * msg25_static = "message 25";
00471 char * msg250_static = "message 250";
00472 char * msgs[101];
00473 char * msg2;
00474 int i;
00475
00476 verbose = argc > 1? 1: 0;
00477
00478
00479 CHECK_ZERO(dnxQueueCreate(100, qtfree, &queue));
00480 iqueue = (iDnxQueue *)queue;
00481
00482
00483 CHECK_TRUE(iqueue->head == 0);
00484 CHECK_TRUE(iqueue->tail == 0);
00485 CHECK_TRUE(iqueue->current == 0);
00486 CHECK_TRUE(iqueue->freepayload == qtfree);
00487 CHECK_TRUE(iqueue->size == 0);
00488 CHECK_TRUE(iqueue->maxsz == 100);
00489
00490
00491 free_count = 0;
00492 for (i = 0; i < elemcount(msgs); i++)
00493 {
00494 char buf[32];
00495 sprintf(buf, "message %d", i);
00496 CHECK_NONZERO(msgs[i] = strdup(buf));
00497 CHECK_ZERO(dnxQueuePut(queue, msgs[i]));
00498 }
00499
00500
00501
00502 CHECK_TRUE(free_count == 1);
00503
00504
00505 CHECK_ZERO(dnxQueueGet(queue, (void **)&msg2));
00506 CHECK_TRUE(strcmp(msg2, "message 1") == 0);
00507 free(msg2);
00508
00509
00510 msg2 = msg100_static;
00511 CHECK_TRUE(dnxQueueRemove(queue, (void **)&msg2, qtcmp) == DNX_QRES_FOUND);
00512 CHECK_NONZERO(msg2);
00513 CHECK_TRUE(msg2 != msg100_static);
00514 free(msg2);
00515
00516
00517 msg2 = msg25_static;
00518 CHECK_TRUE(dnxQueueFind(queue, (void **)&msg2, qtcmp) == DNX_QRES_FOUND);
00519 CHECK_TRUE(msg2 != msg25_static);
00520 CHECK_TRUE(strcmp(msg2, msgs[25]) == 0);
00521
00522
00523 msg2 = msg250_static;
00524 CHECK_TRUE(dnxQueueFind(queue, (void **)&msg2, qtcmp) == DNX_QRES_CONTINUE);
00525
00526
00527 for (i = 3; i < elemcount(msgs); i++)
00528 {
00529 CHECK_ZERO(dnxQueueGet(queue, (void **)&msg2));
00530 free(msg2);
00531 }
00532
00533
00534 CHECK_NONZERO(dnxQueueGet(queue, (void **)&msg2));
00535
00536
00537 CHECK_TRUE(dnxQueueSize(queue) == 0);
00538
00539 dnxQueueDestroy(queue);
00540
00541
00542 CHECK_TRUE(free_count == 1);
00543
00544 #ifdef DEBUG_HEAP
00545 CHECK_ZERO(dnxCheckHeap());
00546 #endif
00547
00548 return 0;
00549 }
00550
00551 #endif
00552
00553
00554