diff options
| -rw-r--r-- | libglusterfs/src/Makefile.am | 4 | ||||
| -rw-r--r-- | libglusterfs/src/bounded-queue.c | 102 | ||||
| -rw-r--r-- | libglusterfs/src/bounded-queue.h | 32 | ||||
| -rw-r--r-- | libglusterfs/src/unittest/bounded_queue_unittest.c | 184 | 
4 files changed, 320 insertions, 2 deletions
diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am index 2a4f76421fa..3299a85f16f 100644 --- a/libglusterfs/src/Makefile.am +++ b/libglusterfs/src/Makefile.am @@ -32,7 +32,7 @@ libglusterfs_la_SOURCES = dict.c xlator.c logging.c \  	$(CONTRIBDIR)/libexecinfo/execinfo.c quota-common-utils.c rot-buffs.c \  	$(CONTRIBDIR)/timer-wheel/timer-wheel.c \  	$(CONTRIBDIR)/timer-wheel/find_last_bit.c tw.c default-args.c locking.c \ -	compound-fop-utils.c +	compound-fop-utils.c bounded-queue.c  nodist_libglusterfs_la_SOURCES = y.tab.c graph.lex.c defaults.c  nodist_libglusterfs_la_HEADERS = y.tab.h glusterfs-fops.h @@ -51,7 +51,7 @@ libglusterfs_la_HEADERS = common-utils.h defaults.h default-args.h \  	glfs-message-id.h template-component-messages.h strfd.h \  	syncop-utils.h parse-utils.h libglusterfs-messages.h tw.h \  	lvm-defaults.h quota-common-utils.h rot-buffs.h \ -	compat-uuid.h upcall-utils.h +	compat-uuid.h upcall-utils.h bounded-queue.h  libglusterfs_ladir = $(includedir)/glusterfs diff --git a/libglusterfs/src/bounded-queue.c b/libglusterfs/src/bounded-queue.c new file mode 100644 index 00000000000..b8f53624380 --- /dev/null +++ b/libglusterfs/src/bounded-queue.c @@ -0,0 +1,102 @@ +#include "bounded-queue.h" + +#include <assert.h> + +/// @brief Initialize a bounded queue structure. +/// +/// @returns zero on success. +int bounded_queue_init (struct bounded_queue *q, uint32_t limit) { +        int rc; + +        q->head = NULL; +        q->tail = NULL; +        q->count = 0; +        q->limit = limit; +        q->killed = 0; +        rc = pthread_mutex_init (&q->mutex, NULL); +        rc |= pthread_cond_init (&q->cond, NULL); + +        return rc; +} + +/// @brief Push an entry onto the bounded queue tail. +/// +/// If the queue is full, will block until space is available +/// or someone calls bounded_queue_kill(). +/// +/// @returns zero on success. +int bounded_queue_push (struct bounded_queue *q, struct queue_entry *e) { +        int rc; + +        if ((rc = pthread_mutex_lock (&q->mutex)) != 0) { +                return rc; +        } +        while (q->count >= q->limit && !q->killed) { +                if (pthread_cond_wait (&q->cond, &q->mutex) || q->killed) { +                        pthread_mutex_unlock (&q->mutex); +                        return -1; +                } +        } + +        e->next = NULL; +        if (!q->head) { +                assert (q->count == 0); +                q->head = e; +        } +        if (q->tail) { +                q->tail->next = e; +        } +        q->tail = e; + +        q->count++; +        rc = pthread_mutex_unlock (&q->mutex); +        rc |= pthread_cond_signal (&q->cond); +        return rc; +} + +/// @brief Pop an entry from the bounded queue head. +/// +/// If the queue is empty, blocks until an entry is pushed or +/// someone calls bounded_queue_kill(). +/// +/// @returns valid pointer on success, NULL on any error. +struct queue_entry *bounded_queue_pop (struct bounded_queue *q) { +        struct queue_entry *e; + +        if (pthread_mutex_lock (&q->mutex)) { +                return NULL; +        } +        while (q->count == 0 && !q->killed) { +                if (pthread_cond_wait (&q->cond, &q->mutex) || q->killed) { +                        pthread_mutex_unlock (&q->mutex); +                        return NULL; +                } +        } + +        e = q->head; +        q->head = e->next; +        if (q->tail == e) { +                assert (q->count == 1); +                q->tail = NULL; +        } + +        q->count--; +        pthread_mutex_unlock (&q->mutex); +        pthread_cond_signal (&q->cond); + +        return e; +} + +/// @brief Make all current and future calls to push/pop fail immediately. +/// +/// @returns zero on success. +int bounded_queue_kill (struct bounded_queue *q) { +        int rc; + +        rc = pthread_mutex_lock (&q->mutex); +        q->killed = 1; +        rc |= pthread_cond_broadcast (&q->cond); +        rc |= pthread_mutex_unlock (&q->mutex); + +        return rc; +} diff --git a/libglusterfs/src/bounded-queue.h b/libglusterfs/src/bounded-queue.h new file mode 100644 index 00000000000..9de62ab5447 --- /dev/null +++ b/libglusterfs/src/bounded-queue.h @@ -0,0 +1,32 @@ +#pragma once + +#include <stdint.h> +#include <pthread.h> + +struct queue_entry { +        struct queue_entry *next; +}; + +/// @brief a FIFO queue that holds a limited number of elements. +/// +/// The queue is blocking, i.e. any attempt to push to a full +/// queue or read from an empty one will block indefinitely. +/// +/// When tearing down the queue, blocked processes may be +/// unblocked by calling bounded_queue_kill(), which will +/// cause all blocked processes to return with error status. +struct bounded_queue { +        struct queue_entry *head; +        struct queue_entry *tail; +        uint32_t count; +        uint32_t limit; +        int killed; +        pthread_mutex_t mutex; +        pthread_cond_t cond; +}; + +int bounded_queue_init (struct bounded_queue *q, uint32_t limit); +int bounded_queue_push (struct bounded_queue *q, struct queue_entry *e); +struct queue_entry *bounded_queue_pop (struct bounded_queue *q); +int bounded_queue_kill (struct bounded_queue *q); + diff --git a/libglusterfs/src/unittest/bounded_queue_unittest.c b/libglusterfs/src/unittest/bounded_queue_unittest.c new file mode 100644 index 00000000000..230e3ba55a1 --- /dev/null +++ b/libglusterfs/src/unittest/bounded_queue_unittest.c @@ -0,0 +1,184 @@ +#include <stdlib.h> +#include <string.h> +#include <stdio.h> +#include <unistd.h> +#include <assert.h> + +#include "bounded-queue.h" + +struct int_entry { +        struct queue_entry link; +        int val; +}; + +struct test_state { +        struct bounded_queue *q; +        pthread_mutex_t mutex; +        pthread_cond_t cond; +        int successful_pushes; +        int failed_pushes; +        int successful_pops; +        int failed_pops; +}; + +static void *push_thread (void *arg) { +        struct test_state *state = arg; +        struct int_entry *e = malloc (sizeof (*e)); +        int rc; + +        e->val = 42; + +        rc = bounded_queue_push (state->q, &e->link); + +        pthread_mutex_lock (&state->mutex); +        if (!rc) { +                state->successful_pushes++; +        } else { +                free (e); +                state->failed_pushes++; +        } +        pthread_mutex_unlock (&state->mutex); +        pthread_cond_signal (&state->cond); + +        return NULL; +} + +static void *pop_thread (void *arg) { +        struct test_state *state = arg; +        struct int_entry *e; + +        e = (struct int_entry *)bounded_queue_pop (state->q); + +        pthread_mutex_lock (&state->mutex); +        if (e) { +                assert (e->val == 42); +                state->successful_pops++; +                free (e); +        } else { +                state->failed_pops++; +        } +        pthread_mutex_unlock (&state->mutex); +        pthread_cond_signal (&state->cond); + +        return NULL; +} + +int main (void) { +        struct bounded_queue q; +        struct int_entry *e; +        struct test_state state; +        int i, rc; + +        rc = bounded_queue_init (&q, 10); + +        for (i = 0; i < 10; ++i) { +                e = malloc (sizeof (*e)); +                e->val = i; +                rc = bounded_queue_push (&q, &e->link); +                assert (rc == 0); +        } + +        for (i = 0; i < 10; ++i) { +                e = (struct int_entry *)bounded_queue_pop (&q); +                assert (e); +                assert (e->val == i); +                free (e); +        } + +        puts ("Single-threaded test passed."); + +        memset (&state, 0, sizeof (state)); +        pthread_mutex_init (&state.mutex, NULL); +        pthread_cond_init (&state.cond, NULL); +        state.q = &q; + +        puts ("Starting 20 pusher threads."); + +        for (i = 0; i < 20; ++i) { +                pthread_t t; +                pthread_create (&t, NULL, push_thread, (void *)&state); +                pthread_detach (t); +        } + +        // Since queue limit is 10, 10 and only 10 pushers should succeed. +        puts ("Waiting for winners..."); + +        pthread_mutex_lock (&state.mutex); +        while (state.successful_pushes < 10) { +                pthread_cond_wait (&state.cond, &state.mutex); +        } +        pthread_mutex_unlock (&state.mutex); + +        sleep (1); + +        pthread_mutex_lock (&state.mutex); +        assert (state.successful_pushes == 10); +        assert (state.failed_pushes == 0); +        pthread_mutex_unlock (&state.mutex); + +        // Kill the queue: all 10 blocked pushers should fail. +        bounded_queue_kill (&q); + +        puts ("Waiting for losers..."); + +        pthread_mutex_lock (&state.mutex); +        while (state.failed_pushes != 10) { +                pthread_cond_wait (&state.cond, &state.mutex); +        } +        pthread_mutex_unlock (&state.mutex); + +        pthread_mutex_lock (&state.mutex); +        assert (state.successful_pushes == 10); +        assert (state.failed_pushes == 10); +        pthread_mutex_unlock (&state.mutex); + +        q.killed = 0; + +        puts ("Starting 20 popper threads..."); +        for (i = 0; i < 20; ++i) { +                pthread_t t; +                pthread_create (&t, NULL, pop_thread, (void *)&state); +                pthread_detach (t); +        } + +        // We have 10 entries on the queue, so 10 pops should succeed. +        puts ("Waiting for winners..."); + +        pthread_mutex_lock (&state.mutex); +        while (state.successful_pops != 10) { +                pthread_cond_wait (&state.cond, &state.mutex); +        } +        pthread_mutex_unlock (&state.mutex); + +        sleep (1); + +        pthread_mutex_lock (&state.mutex); +        assert (state.successful_pops == 10); +        assert (state.failed_pops == 0); +        pthread_mutex_unlock (&state.mutex); + +        bounded_queue_kill (&q); + +        puts ("Waiting for losers..."); + +        pthread_mutex_lock (&state.mutex); +        while (state.failed_pops != 10) { +                pthread_cond_wait (&state.cond, &state.mutex); +        } +        pthread_mutex_unlock (&state.mutex); + +        sleep (1); + +        pthread_mutex_lock (&state.mutex); +        assert (state.successful_pops == 10); +        assert (state.failed_pops == 10); +        pthread_mutex_unlock (&state.mutex); + +        puts ("Multi-threaded tests passed."); + +        pthread_cond_destroy (&state.cond); +        pthread_mutex_destroy (&state.mutex); + +        return 0; +} +  | 
