summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libglusterfs/src/Makefile.am4
-rw-r--r--libglusterfs/src/bounded-queue.c102
-rw-r--r--libglusterfs/src/bounded-queue.h32
-rw-r--r--libglusterfs/src/unittest/bounded_queue_unittest.c184
4 files changed, 320 insertions, 2 deletions
diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am
index 2a4f76421fa..3299a85f16f 100644
--- a/libglusterfs/src/Makefile.am
+++ b/libglusterfs/src/Makefile.am
@@ -32,7 +32,7 @@ libglusterfs_la_SOURCES = dict.c xlator.c logging.c \
$(CONTRIBDIR)/libexecinfo/execinfo.c quota-common-utils.c rot-buffs.c \
$(CONTRIBDIR)/timer-wheel/timer-wheel.c \
$(CONTRIBDIR)/timer-wheel/find_last_bit.c tw.c default-args.c locking.c \
- compound-fop-utils.c
+ compound-fop-utils.c bounded-queue.c
nodist_libglusterfs_la_SOURCES = y.tab.c graph.lex.c defaults.c
nodist_libglusterfs_la_HEADERS = y.tab.h glusterfs-fops.h
@@ -51,7 +51,7 @@ libglusterfs_la_HEADERS = common-utils.h defaults.h default-args.h \
glfs-message-id.h template-component-messages.h strfd.h \
syncop-utils.h parse-utils.h libglusterfs-messages.h tw.h \
lvm-defaults.h quota-common-utils.h rot-buffs.h \
- compat-uuid.h upcall-utils.h
+ compat-uuid.h upcall-utils.h bounded-queue.h
libglusterfs_ladir = $(includedir)/glusterfs
diff --git a/libglusterfs/src/bounded-queue.c b/libglusterfs/src/bounded-queue.c
new file mode 100644
index 00000000000..b8f53624380
--- /dev/null
+++ b/libglusterfs/src/bounded-queue.c
@@ -0,0 +1,102 @@
+#include "bounded-queue.h"
+
+#include <assert.h>
+
+/// @brief Initialize a bounded queue structure.
+///
+/// @returns zero on success.
+int bounded_queue_init (struct bounded_queue *q, uint32_t limit) {
+ int rc;
+
+ q->head = NULL;
+ q->tail = NULL;
+ q->count = 0;
+ q->limit = limit;
+ q->killed = 0;
+ rc = pthread_mutex_init (&q->mutex, NULL);
+ rc |= pthread_cond_init (&q->cond, NULL);
+
+ return rc;
+}
+
+/// @brief Push an entry onto the bounded queue tail.
+///
+/// If the queue is full, will block until space is available
+/// or someone calls bounded_queue_kill().
+///
+/// @returns zero on success.
+int bounded_queue_push (struct bounded_queue *q, struct queue_entry *e) {
+ int rc;
+
+ if ((rc = pthread_mutex_lock (&q->mutex)) != 0) {
+ return rc;
+ }
+ while (q->count >= q->limit && !q->killed) {
+ if (pthread_cond_wait (&q->cond, &q->mutex) || q->killed) {
+ pthread_mutex_unlock (&q->mutex);
+ return -1;
+ }
+ }
+
+ e->next = NULL;
+ if (!q->head) {
+ assert (q->count == 0);
+ q->head = e;
+ }
+ if (q->tail) {
+ q->tail->next = e;
+ }
+ q->tail = e;
+
+ q->count++;
+ rc = pthread_mutex_unlock (&q->mutex);
+ rc |= pthread_cond_signal (&q->cond);
+ return rc;
+}
+
+/// @brief Pop an entry from the bounded queue head.
+///
+/// If the queue is empty, blocks until an entry is pushed or
+/// someone calls bounded_queue_kill().
+///
+/// @returns valid pointer on success, NULL on any error.
+struct queue_entry *bounded_queue_pop (struct bounded_queue *q) {
+ struct queue_entry *e;
+
+ if (pthread_mutex_lock (&q->mutex)) {
+ return NULL;
+ }
+ while (q->count == 0 && !q->killed) {
+ if (pthread_cond_wait (&q->cond, &q->mutex) || q->killed) {
+ pthread_mutex_unlock (&q->mutex);
+ return NULL;
+ }
+ }
+
+ e = q->head;
+ q->head = e->next;
+ if (q->tail == e) {
+ assert (q->count == 1);
+ q->tail = NULL;
+ }
+
+ q->count--;
+ pthread_mutex_unlock (&q->mutex);
+ pthread_cond_signal (&q->cond);
+
+ return e;
+}
+
+/// @brief Make all current and future calls to push/pop fail immediately.
+///
+/// @returns zero on success.
+int bounded_queue_kill (struct bounded_queue *q) {
+ int rc;
+
+ rc = pthread_mutex_lock (&q->mutex);
+ q->killed = 1;
+ rc |= pthread_cond_broadcast (&q->cond);
+ rc |= pthread_mutex_unlock (&q->mutex);
+
+ return rc;
+}
diff --git a/libglusterfs/src/bounded-queue.h b/libglusterfs/src/bounded-queue.h
new file mode 100644
index 00000000000..9de62ab5447
--- /dev/null
+++ b/libglusterfs/src/bounded-queue.h
@@ -0,0 +1,32 @@
+#pragma once
+
+#include <stdint.h>
+#include <pthread.h>
+
+struct queue_entry {
+ struct queue_entry *next;
+};
+
+/// @brief a FIFO queue that holds a limited number of elements.
+///
+/// The queue is blocking, i.e. any attempt to push to a full
+/// queue or read from an empty one will block indefinitely.
+///
+/// When tearing down the queue, blocked processes may be
+/// unblocked by calling bounded_queue_kill(), which will
+/// cause all blocked processes to return with error status.
+struct bounded_queue {
+ struct queue_entry *head;
+ struct queue_entry *tail;
+ uint32_t count;
+ uint32_t limit;
+ int killed;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+};
+
+int bounded_queue_init (struct bounded_queue *q, uint32_t limit);
+int bounded_queue_push (struct bounded_queue *q, struct queue_entry *e);
+struct queue_entry *bounded_queue_pop (struct bounded_queue *q);
+int bounded_queue_kill (struct bounded_queue *q);
+
diff --git a/libglusterfs/src/unittest/bounded_queue_unittest.c b/libglusterfs/src/unittest/bounded_queue_unittest.c
new file mode 100644
index 00000000000..230e3ba55a1
--- /dev/null
+++ b/libglusterfs/src/unittest/bounded_queue_unittest.c
@@ -0,0 +1,184 @@
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <assert.h>
+
+#include "bounded-queue.h"
+
+struct int_entry {
+ struct queue_entry link;
+ int val;
+};
+
+struct test_state {
+ struct bounded_queue *q;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ int successful_pushes;
+ int failed_pushes;
+ int successful_pops;
+ int failed_pops;
+};
+
+static void *push_thread (void *arg) {
+ struct test_state *state = arg;
+ struct int_entry *e = malloc (sizeof (*e));
+ int rc;
+
+ e->val = 42;
+
+ rc = bounded_queue_push (state->q, &e->link);
+
+ pthread_mutex_lock (&state->mutex);
+ if (!rc) {
+ state->successful_pushes++;
+ } else {
+ free (e);
+ state->failed_pushes++;
+ }
+ pthread_mutex_unlock (&state->mutex);
+ pthread_cond_signal (&state->cond);
+
+ return NULL;
+}
+
+static void *pop_thread (void *arg) {
+ struct test_state *state = arg;
+ struct int_entry *e;
+
+ e = (struct int_entry *)bounded_queue_pop (state->q);
+
+ pthread_mutex_lock (&state->mutex);
+ if (e) {
+ assert (e->val == 42);
+ state->successful_pops++;
+ free (e);
+ } else {
+ state->failed_pops++;
+ }
+ pthread_mutex_unlock (&state->mutex);
+ pthread_cond_signal (&state->cond);
+
+ return NULL;
+}
+
+int main (void) {
+ struct bounded_queue q;
+ struct int_entry *e;
+ struct test_state state;
+ int i, rc;
+
+ rc = bounded_queue_init (&q, 10);
+
+ for (i = 0; i < 10; ++i) {
+ e = malloc (sizeof (*e));
+ e->val = i;
+ rc = bounded_queue_push (&q, &e->link);
+ assert (rc == 0);
+ }
+
+ for (i = 0; i < 10; ++i) {
+ e = (struct int_entry *)bounded_queue_pop (&q);
+ assert (e);
+ assert (e->val == i);
+ free (e);
+ }
+
+ puts ("Single-threaded test passed.");
+
+ memset (&state, 0, sizeof (state));
+ pthread_mutex_init (&state.mutex, NULL);
+ pthread_cond_init (&state.cond, NULL);
+ state.q = &q;
+
+ puts ("Starting 20 pusher threads.");
+
+ for (i = 0; i < 20; ++i) {
+ pthread_t t;
+ pthread_create (&t, NULL, push_thread, (void *)&state);
+ pthread_detach (t);
+ }
+
+ // Since queue limit is 10, 10 and only 10 pushers should succeed.
+ puts ("Waiting for winners...");
+
+ pthread_mutex_lock (&state.mutex);
+ while (state.successful_pushes < 10) {
+ pthread_cond_wait (&state.cond, &state.mutex);
+ }
+ pthread_mutex_unlock (&state.mutex);
+
+ sleep (1);
+
+ pthread_mutex_lock (&state.mutex);
+ assert (state.successful_pushes == 10);
+ assert (state.failed_pushes == 0);
+ pthread_mutex_unlock (&state.mutex);
+
+ // Kill the queue: all 10 blocked pushers should fail.
+ bounded_queue_kill (&q);
+
+ puts ("Waiting for losers...");
+
+ pthread_mutex_lock (&state.mutex);
+ while (state.failed_pushes != 10) {
+ pthread_cond_wait (&state.cond, &state.mutex);
+ }
+ pthread_mutex_unlock (&state.mutex);
+
+ pthread_mutex_lock (&state.mutex);
+ assert (state.successful_pushes == 10);
+ assert (state.failed_pushes == 10);
+ pthread_mutex_unlock (&state.mutex);
+
+ q.killed = 0;
+
+ puts ("Starting 20 popper threads...");
+ for (i = 0; i < 20; ++i) {
+ pthread_t t;
+ pthread_create (&t, NULL, pop_thread, (void *)&state);
+ pthread_detach (t);
+ }
+
+ // We have 10 entries on the queue, so 10 pops should succeed.
+ puts ("Waiting for winners...");
+
+ pthread_mutex_lock (&state.mutex);
+ while (state.successful_pops != 10) {
+ pthread_cond_wait (&state.cond, &state.mutex);
+ }
+ pthread_mutex_unlock (&state.mutex);
+
+ sleep (1);
+
+ pthread_mutex_lock (&state.mutex);
+ assert (state.successful_pops == 10);
+ assert (state.failed_pops == 0);
+ pthread_mutex_unlock (&state.mutex);
+
+ bounded_queue_kill (&q);
+
+ puts ("Waiting for losers...");
+
+ pthread_mutex_lock (&state.mutex);
+ while (state.failed_pops != 10) {
+ pthread_cond_wait (&state.cond, &state.mutex);
+ }
+ pthread_mutex_unlock (&state.mutex);
+
+ sleep (1);
+
+ pthread_mutex_lock (&state.mutex);
+ assert (state.successful_pops == 10);
+ assert (state.failed_pops == 10);
+ pthread_mutex_unlock (&state.mutex);
+
+ puts ("Multi-threaded tests passed.");
+
+ pthread_cond_destroy (&state.cond);
+ pthread_mutex_destroy (&state.mutex);
+
+ return 0;
+}
+