blob: b8f53624380b23f4463c7e6e0925fdf9decbcd68 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
#include "bounded-queue.h"
#include <assert.h>
/// @brief Initialize a bounded queue structure.
///
/// @returns zero on success.
int bounded_queue_init (struct bounded_queue *q, uint32_t limit) {
int rc;
q->head = NULL;
q->tail = NULL;
q->count = 0;
q->limit = limit;
q->killed = 0;
rc = pthread_mutex_init (&q->mutex, NULL);
rc |= pthread_cond_init (&q->cond, NULL);
return rc;
}
/// @brief Push an entry onto the bounded queue tail.
///
/// If the queue is full, will block until space is available
/// or someone calls bounded_queue_kill().
///
/// @returns zero on success.
int bounded_queue_push (struct bounded_queue *q, struct queue_entry *e) {
int rc;
if ((rc = pthread_mutex_lock (&q->mutex)) != 0) {
return rc;
}
while (q->count >= q->limit && !q->killed) {
if (pthread_cond_wait (&q->cond, &q->mutex) || q->killed) {
pthread_mutex_unlock (&q->mutex);
return -1;
}
}
e->next = NULL;
if (!q->head) {
assert (q->count == 0);
q->head = e;
}
if (q->tail) {
q->tail->next = e;
}
q->tail = e;
q->count++;
rc = pthread_mutex_unlock (&q->mutex);
rc |= pthread_cond_signal (&q->cond);
return rc;
}
/// @brief Pop an entry from the bounded queue head.
///
/// If the queue is empty, blocks until an entry is pushed or
/// someone calls bounded_queue_kill().
///
/// @returns valid pointer on success, NULL on any error.
struct queue_entry *bounded_queue_pop (struct bounded_queue *q) {
struct queue_entry *e;
if (pthread_mutex_lock (&q->mutex)) {
return NULL;
}
while (q->count == 0 && !q->killed) {
if (pthread_cond_wait (&q->cond, &q->mutex) || q->killed) {
pthread_mutex_unlock (&q->mutex);
return NULL;
}
}
e = q->head;
q->head = e->next;
if (q->tail == e) {
assert (q->count == 1);
q->tail = NULL;
}
q->count--;
pthread_mutex_unlock (&q->mutex);
pthread_cond_signal (&q->cond);
return e;
}
/// @brief Make all current and future calls to push/pop fail immediately.
///
/// @returns zero on success.
int bounded_queue_kill (struct bounded_queue *q) {
int rc;
rc = pthread_mutex_lock (&q->mutex);
q->killed = 1;
rc |= pthread_cond_broadcast (&q->cond);
rc |= pthread_mutex_unlock (&q->mutex);
return rc;
}
|