summaryrefslogtreecommitdiffstats
path: root/libglusterfs/src/bounded-queue.c
blob: b8f53624380b23f4463c7e6e0925fdf9decbcd68 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#include "bounded-queue.h"

#include <assert.h>

/// @brief Initialize a bounded queue structure.
///
/// Resets the queue to empty, records @p limit as the maximum number of
/// queued entries, clears the killed flag and initializes the mutex and
/// condition variable with default attributes.
///
/// @param q      queue to initialize.
/// @param limit  entry count at which bounded_queue_push() starts blocking.
///
/// @returns zero on success, otherwise the error number returned by the
///          pthread initializer that failed.
int bounded_queue_init (struct bounded_queue *q, uint32_t limit) {
        int rc;

        q->head = NULL;
        q->tail = NULL;
        q->count = 0;
        q->limit = limit;
        q->killed = 0;

        rc = pthread_mutex_init (&q->mutex, NULL);
        if (rc != 0) {
                return rc;
        }

        rc = pthread_cond_init (&q->cond, NULL);
        if (rc != 0) {
                // Do not leave a half-initialized queue holding a live mutex.
                pthread_mutex_destroy (&q->mutex);
        }

        return rc;
}

/// @brief Push an entry onto the bounded queue tail.
///
/// If the queue is full, will block until space is available
/// or someone calls bounded_queue_kill(). A queue that has been
/// killed rejects the entry even when it has room.
///
/// @param q  queue to append to.
/// @param e  entry to append; its next pointer is overwritten.
///
/// @returns zero on success, the pthread error number if the mutex could
///          not be taken or released, or -1 if the queue was killed or
///          waiting on the condition variable failed.
int bounded_queue_push (struct bounded_queue *q, struct queue_entry *e) {
        int rc;
        int wake_rc;

        if ((rc = pthread_mutex_lock (&q->mutex)) != 0) {
                return rc;
        }

        while (!q->killed && q->count >= q->limit) {
                if (pthread_cond_wait (&q->cond, &q->mutex)) {
                        pthread_mutex_unlock (&q->mutex);
                        return -1;
                }
        }

        // Checked after the loop so that a queue killed before this call
        // (loop never entered) is rejected as well.
        if (q->killed) {
                pthread_mutex_unlock (&q->mutex);
                return -1;
        }

        e->next = NULL;
        if (!q->head) {
                assert (q->count == 0);
                q->head = e;
        }
        if (q->tail) {
                q->tail->next = e;
        }
        q->tail = e;
        q->count++;

        rc = pthread_mutex_unlock (&q->mutex);

        // Producers and consumers wait on the same condition variable, so a
        // single signal could wake another producer instead of a consumer
        // and the wakeup would be lost. Wake everyone and let each waiter
        // re-check its own predicate.
        wake_rc = pthread_cond_broadcast (&q->cond);

        return rc ? rc : wake_rc;
}

/// @brief Pop an entry from the bounded queue head.
///
/// If the queue is empty, blocks until an entry is pushed or
/// someone calls bounded_queue_kill(). A queue that has been
/// killed returns NULL even if entries are still linked on it.
///
/// @param q  queue to remove from.
///
/// @returns valid pointer on success, NULL on any error.
struct queue_entry *bounded_queue_pop (struct bounded_queue *q) {
        struct queue_entry *e;

        if (pthread_mutex_lock (&q->mutex)) {
                return NULL;
        }

        while (!q->killed && q->count == 0) {
                if (pthread_cond_wait (&q->cond, &q->mutex)) {
                        pthread_mutex_unlock (&q->mutex);
                        return NULL;
                }
        }

        // Checked after the loop: when the queue was already killed the loop
        // is skipped, and an empty queue would otherwise dereference a NULL
        // head below.
        if (q->killed) {
                pthread_mutex_unlock (&q->mutex);
                return NULL;
        }

        e = q->head;
        assert (e != NULL);
        q->head = e->next;
        if (q->tail == e) {
                assert (q->count == 1);
                q->tail = NULL;
        }
        q->count--;

        pthread_mutex_unlock (&q->mutex);

        // Producers and consumers wait on the same condition variable, so a
        // single signal could wake another consumer instead of a blocked
        // producer and the wakeup would be lost. Wake everyone and let each
        // waiter re-check its own predicate.
        pthread_cond_broadcast (&q->cond);

        return e;
}

/// @brief Make all current and future calls to push/pop fail immediately.
///
/// Sets the queue's killed flag under the mutex and wakes every thread
/// waiting on the queue's condition variable.
///
/// @param q  queue to kill.
///
/// @returns zero on success, otherwise the error number of the first
///          pthread call that failed. If the mutex cannot be taken the
///          queue is left untouched.
int bounded_queue_kill (struct bounded_queue *q) {
        int rc;
        int unlock_rc;

        rc = pthread_mutex_lock (&q->mutex);
        if (rc != 0) {
                // Without the lock we must neither touch shared state nor
                // unlock a mutex this thread does not own.
                return rc;
        }

        q->killed = 1;
        rc = pthread_cond_broadcast (&q->cond);
        unlock_rc = pthread_mutex_unlock (&q->mutex);

        return rc ? rc : unlock_rc;
}