diff options
Diffstat (limited to 'libglusterfs/src/bounded-queue.c')
| -rw-r--r-- | libglusterfs/src/bounded-queue.c | 102 |
1 files changed, 102 insertions, 0 deletions
diff --git a/libglusterfs/src/bounded-queue.c b/libglusterfs/src/bounded-queue.c new file mode 100644 index 00000000000..b8f53624380 --- /dev/null +++ b/libglusterfs/src/bounded-queue.c @@ -0,0 +1,102 @@ +#include "bounded-queue.h" + +#include <assert.h> + +/// @brief Initialize a bounded queue structure. +/// +/// @returns zero on success. +int bounded_queue_init (struct bounded_queue *q, uint32_t limit) { + int rc; + + q->head = NULL; + q->tail = NULL; + q->count = 0; + q->limit = limit; + q->killed = 0; + rc = pthread_mutex_init (&q->mutex, NULL); + rc |= pthread_cond_init (&q->cond, NULL); + + return rc; +} + +/// @brief Push an entry onto the bounded queue tail. +/// +/// If the queue is full, will block until space is available +/// or someone calls bounded_queue_kill(). +/// +/// @returns zero on success. +int bounded_queue_push (struct bounded_queue *q, struct queue_entry *e) { + int rc; + + if ((rc = pthread_mutex_lock (&q->mutex)) != 0) { + return rc; + } + while (q->count >= q->limit && !q->killed) { + if (pthread_cond_wait (&q->cond, &q->mutex) || q->killed) { + pthread_mutex_unlock (&q->mutex); + return -1; + } + } + + e->next = NULL; + if (!q->head) { + assert (q->count == 0); + q->head = e; + } + if (q->tail) { + q->tail->next = e; + } + q->tail = e; + + q->count++; + rc = pthread_mutex_unlock (&q->mutex); + rc |= pthread_cond_signal (&q->cond); + return rc; +} + +/// @brief Pop an entry from the bounded queue head. +/// +/// If the queue is empty, blocks until an entry is pushed or +/// someone calls bounded_queue_kill(). +/// +/// @returns valid pointer on success, NULL on any error. +struct queue_entry *bounded_queue_pop (struct bounded_queue *q) { + struct queue_entry *e; + + if (pthread_mutex_lock (&q->mutex)) { + return NULL; + } + while (q->count == 0 && !q->killed) { + if (pthread_cond_wait (&q->cond, &q->mutex) || q->killed) { + pthread_mutex_unlock (&q->mutex); + return NULL; + } + } + + e = q->head; + q->head = e->next; + if (q->tail == e) { + assert (q->count == 1); + q->tail = NULL; + } + + q->count--; + pthread_mutex_unlock (&q->mutex); + pthread_cond_signal (&q->cond); + + return e; +} + +/// @brief Make all current and future calls to push/pop fail immediately. +/// +/// @returns zero on success. +int bounded_queue_kill (struct bounded_queue *q) { + int rc; + + rc = pthread_mutex_lock (&q->mutex); + q->killed = 1; + rc |= pthread_cond_broadcast (&q->cond); + rc |= pthread_mutex_unlock (&q->mutex); + + return rc; +} |
