dnxQueue.c
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00028 #include "dnxQueue.h"
00029
00030 #include "dnxError.h"
00031 #include "dnxDebug.h"
00032 #include "dnxLogging.h"
00033
00034 #include <stdlib.h>
00035 #include <syslog.h>
00036 #include <assert.h>
00037 #include <pthread.h>
00038
00040 typedef struct iDnxQueueEntry_
00041 {
00042 struct iDnxQueueEntry_ * next;
00043 void * pPayload;
00044 } iDnxQueueEntry;
00045
00047 typedef struct iDnxQueue_
00048 {
00049 iDnxQueueEntry * head;
00050 iDnxQueueEntry * tail;
00051 iDnxQueueEntry * current;
00052 void (*freepayload)(void *);
00053 unsigned size;
00054 unsigned maxsz;
00055 pthread_mutex_t mutex;
00056 pthread_cond_t cv;
00057 } iDnxQueue;
00058
00059
00060
00061
00062
00078 int dnxQueueNext(DnxQueue * queue, void ** ppPayload)
00079 {
00080 iDnxQueue * iqueue = (iDnxQueue *)queue;
00081
00082 assert(queue && ppPayload);
00083
00084 *ppPayload = 0;
00085
00086 DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00087
00088
00089 if (iqueue->current)
00090 {
00091 *ppPayload = iqueue->current->pPayload;
00092
00093
00094 if (iqueue->current->next)
00095 iqueue->current = iqueue->current->next;
00096 else
00097 iqueue->current = iqueue->head;
00098 }
00099
00100 DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00101
00102 return *ppPayload ? DNX_OK : DNX_ERR_NOTFOUND;
00103 }
00104
00105
00106
00117 int dnxQueueSize(DnxQueue * queue)
00118 {
00119 iDnxQueue * iqueue = (iDnxQueue *)queue;
00120 int count;
00121
00122 assert(queue);
00123
00124 DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00125
00126 count = (int)iqueue->size;
00127
00128 DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00129
00130 return count;
00131 }
00132
00133
00134
00135
00136
00137 int dnxQueuePut(DnxQueue * queue, void * pPayload)
00138 {
00139 iDnxQueue * iqueue = (iDnxQueue *)queue;
00140 iDnxQueueEntry * item;
00141
00142 assert(queue);
00143
00144
00145 if ((item = (iDnxQueueEntry *)xmalloc(sizeof *item)) == 0)
00146 return DNX_ERR_MEMORY;
00147
00148 item->pPayload = pPayload;
00149 item->next = 0;
00150
00151 DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00152
00153
00154 if (iqueue->size == 0)
00155 iqueue->head = iqueue->tail = iqueue->current = item;
00156 else
00157 {
00158 iqueue->tail->next = item;
00159 iqueue->tail = item;
00160 }
00161
00162 iqueue->size++;
00163
00164
00165 if (iqueue->maxsz > 0 && iqueue->size > iqueue->maxsz)
00166 {
00167
00168 item = iqueue->head;
00169 iqueue->head = item->next;
00170 if (iqueue->current == item)
00171 iqueue->current = item->next;
00172
00173
00174 if (iqueue->head == 0)
00175 iqueue->tail = 0;
00176
00177 iqueue->size--;
00178
00179
00180 if (iqueue->freepayload)
00181 iqueue->freepayload(item->pPayload);
00182
00183 xfree(item);
00184 }
00185
00186
00187 pthread_cond_signal(&iqueue->cv);
00188
00189 DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00190
00191 return DNX_OK;
00192 }
00193
00194
00195
00196 int dnxQueueGet(DnxQueue * queue, void ** ppPayload)
00197 {
00198 iDnxQueue * iqueue = (iDnxQueue *)queue;
00199 iDnxQueueEntry * item = 0;
00200
00201 assert(queue && ppPayload);
00202
00203 DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00204
00205 if (iqueue->size > 0)
00206 {
00207
00208 item = iqueue->head;
00209 iqueue->head = item->next;
00210 if (iqueue->current == item)
00211 iqueue->current = item->next;
00212
00213
00214 if (iqueue->head == 0)
00215 iqueue->tail = 0;
00216
00217 iqueue->size--;
00218 }
00219
00220 DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00221
00222
00223 if (item)
00224 {
00225 *ppPayload = item->pPayload;
00226 xfree(item);
00227 return DNX_OK;
00228 }
00229
00230 return DNX_ERR_NOTFOUND;
00231 }
00232
00233
00234
00235 int dnxQueueGetWait(DnxQueue * queue, unsigned timeout, void ** ppPayload)
00236 {
00237 iDnxQueue * iqueue = (iDnxQueue *)queue;
00238 iDnxQueueEntry * item = 0;
00239
00240 assert(queue && ppPayload);
00241
00242 DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00243
00244
00245 while (item == 0)
00246 {
00247
00248 if (iqueue->size > 0)
00249 {
00250 item = iqueue->head;
00251 iqueue->head = item->next;
00252 if (iqueue->current == item)
00253 iqueue->current = item->next;
00254
00255
00256 if (iqueue->head == 0)
00257 iqueue->tail = 0;
00258
00259 iqueue->size--;
00260 }
00261 else if (timeout > 0)
00262 {
00263 struct timespec to;
00264 clock_gettime(CLOCK_REALTIME, &to);
00265 to.tv_sec += timeout;
00266 if (pthread_cond_timedwait(&iqueue->cv,
00267 &iqueue->mutex, &to) == ETIMEDOUT)
00268 break;
00269 }
00270 else
00271 pthread_cond_wait(&iqueue->cv, &iqueue->mutex);
00272 }
00273
00274 DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00275
00276
00277 if (item)
00278 {
00279 *ppPayload = item->pPayload;
00280 xfree(item);
00281 return DNX_OK;
00282 }
00283
00284 return DNX_ERR_NOTFOUND;
00285 }
00286
00287
00288
00289 DnxQueueResult dnxQueueRemove(DnxQueue * queue, void ** ppPayload,
00290 DnxQueueResult (*Compare)(void * pLeft, void * pRight))
00291 {
00292 DnxQueueResult bFound = DNX_QRES_CONTINUE;
00293 iDnxQueue * iqueue = (iDnxQueue *)queue;
00294 iDnxQueueEntry * item, * prev;
00295
00296 assert(queue && ppPayload && Compare);
00297
00298 DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00299
00300 prev = 0;
00301 for (item = iqueue->head; item; item = item->next)
00302 {
00303 if ((bFound = Compare(*ppPayload, item->pPayload)) != DNX_QRES_CONTINUE)
00304 {
00305 if (bFound == DNX_QRES_FOUND)
00306 {
00307 *ppPayload = item->pPayload;
00308
00309
00310 if (prev)
00311 prev->next = item->next;
00312 else
00313 iqueue->head = item->next;
00314
00315 if (item->next == 0)
00316 iqueue->tail = prev;
00317
00318 if (iqueue->current == item)
00319 if ((iqueue->current = item->next) == 0)
00320 iqueue->current = iqueue->head;
00321
00322 iqueue->size--;
00323 }
00324 break;
00325 }
00326 prev = item;
00327 }
00328
00329 DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00330
00331 if (bFound == DNX_QRES_FOUND)
00332 xfree(item);
00333
00334 return bFound;
00335 }
00336
00337
00338
00339 DnxQueueResult dnxQueueFind(DnxQueue * queue, void ** ppPayload,
00340 DnxQueueResult (*Compare)(void * pLeft, void * pRight))
00341 {
00342 DnxQueueResult bFound = DNX_QRES_CONTINUE;
00343 iDnxQueue * iqueue = (iDnxQueue *)queue;
00344 iDnxQueueEntry * item;
00345
00346 assert(queue && ppPayload && Compare);
00347
00348 DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00349
00350 for (item = iqueue->head; item; item = item->next)
00351 {
00352 if ((bFound = Compare(*ppPayload, item->pPayload)) != DNX_QRES_CONTINUE)
00353 {
00354 if (bFound == DNX_QRES_FOUND)
00355 *ppPayload = item->pPayload;
00356 break;
00357 }
00358 }
00359
00360 DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00361
00362 return bFound;
00363 }
00364
00365
00366
00367 int dnxQueueCreate(unsigned maxsz, void (*pldtor)(void *), DnxQueue ** pqueue)
00368 {
00369 iDnxQueue * iqueue;
00370
00371 assert(pqueue);
00372
00373 if ((iqueue = (iDnxQueue *)xmalloc(sizeof *iqueue)) == 0)
00374 return DNX_ERR_MEMORY;
00375
00376
00377 memset(iqueue, 0, sizeof *iqueue);
00378 iqueue->freepayload = pldtor;
00379 iqueue->maxsz = maxsz;
00380
00381
00382 DNX_PT_MUTEX_INIT(&iqueue->mutex);
00383 pthread_cond_init(&iqueue->cv, 0);
00384
00385 *pqueue = (DnxQueue *)iqueue;
00386
00387 return DNX_OK;
00388 }
00389
00390
00391
00392 void dnxQueueDestroy(DnxQueue * queue)
00393 {
00394 iDnxQueue * iqueue = (iDnxQueue *)queue;
00395 iDnxQueueEntry * item;
00396
00397 assert(queue);
00398
00399 DNX_PT_MUTEX_LOCK(&iqueue->mutex);
00400
00401
00402 item = iqueue->head;
00403 while (item != 0)
00404 {
00405 iDnxQueueEntry * next = item->next;
00406 iqueue->freepayload(item->pPayload);
00407 xfree(item);
00408 item = next;
00409 }
00410
00411 DNX_PT_MUTEX_UNLOCK(&iqueue->mutex);
00412
00413 DNX_PT_MUTEX_DESTROY(&iqueue->mutex);
00414 pthread_cond_destroy(&iqueue->cv);
00415
00416 xfree(iqueue);
00417 }
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435 #ifdef DNX_QUEUE_TEST
00436
00437 #include "utesthelp.h"
00438 #include <time.h>
00439
00440 #define elemcount(x) (sizeof(x)/sizeof(*(x)))
00441
00442 static int free_count;
00443 static int verbose;
00444
00445 IMPLEMENT_DNX_DEBUG(verbose);
00446
00447
00448 static DnxQueueResult qtcmp(void * left, void * right)
00449 { return strcmp((char *)left, (char *)right) == 0?
00450 DNX_QRES_FOUND: DNX_QRES_CONTINUE; }
00451
00452 static void qtfree(void * p)
00453 {
00454 free_count++;
00455 free(p);
00456 }
00457
00458 int main(int argc, char ** argv)
00459 {
00460 DnxQueue * queue;
00461 iDnxQueue * iqueue;
00462 DnxQueueResult qres;
00463 char * msg100_static = "message 100";
00464 char * msg25_static = "message 25";
00465 char * msg250_static = "message 250";
00466 char * msgs[101];
00467 char * msg2;
00468 int i;
00469
00470 verbose = argc > 1? 1: 0;
00471
00472
00473 CHECK_ZERO(dnxQueueCreate(100, qtfree, &queue));
00474 iqueue = (iDnxQueue *)queue;
00475
00476
00477 CHECK_TRUE(iqueue->head == 0);
00478 CHECK_TRUE(iqueue->tail == 0);
00479 CHECK_TRUE(iqueue->current == 0);
00480 CHECK_TRUE(iqueue->freepayload == qtfree);
00481 CHECK_TRUE(iqueue->size == 0);
00482 CHECK_TRUE(iqueue->maxsz == 100);
00483
00484
00485 free_count = 0;
00486 for (i = 0; i < elemcount(msgs); i++)
00487 {
00488 char buf[32];
00489 sprintf(buf, "message %d", i);
00490 CHECK_NONZERO(msgs[i] = strdup(buf));
00491 CHECK_ZERO(dnxQueuePut(queue, msgs[i]));
00492 }
00493
00494
00495
00496 CHECK_TRUE(free_count == 1);
00497
00498
00499 CHECK_ZERO(dnxQueueGet(queue, (void **)&msg2));
00500 CHECK_TRUE(strcmp(msg2, "message 1") == 0);
00501 free(msg2);
00502
00503
00504 msg2 = msg100_static;
00505 CHECK_TRUE(dnxQueueRemove(queue, (void **)&msg2, qtcmp) == DNX_QRES_FOUND);
00506 CHECK_NONZERO(msg2);
00507 CHECK_TRUE(msg2 != msg100_static);
00508 free(msg2);
00509
00510
00511 msg2 = msg25_static;
00512 CHECK_TRUE(dnxQueueFind(queue, (void **)&msg2, qtcmp) == DNX_QRES_FOUND);
00513 CHECK_TRUE(msg2 != msg25_static);
00514 CHECK_TRUE(strcmp(msg2, msgs[25]) == 0);
00515
00516
00517 msg2 = msg250_static;
00518 CHECK_TRUE(dnxQueueFind(queue, (void **)&msg2, qtcmp) == DNX_QRES_CONTINUE);
00519
00520
00521 for (i = 3; i < elemcount(msgs); i++)
00522 {
00523 CHECK_ZERO(dnxQueueGet(queue, (void **)&msg2));
00524 free(msg2);
00525 }
00526
00527
00528 CHECK_NONZERO(dnxQueueGet(queue, (void **)&msg2));
00529
00530
00531 CHECK_TRUE(dnxQueueSize(queue) == 0);
00532
00533 dnxQueueDestroy(queue);
00534
00535
00536 CHECK_TRUE(free_count == 1);
00537
00538 #ifdef DEBUG_HEAP
00539 CHECK_ZERO(dnxCheckHeap());
00540 #endif
00541
00542 return 0;
00543 }
00544
00545 #endif
00546
00547
00548