summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVenky Shankar <vshankar@redhat.com>2015-02-03 19:03:30 +0530
committerVijay Bellur <vbellur@redhat.com>2015-03-18 21:22:33 -0700
commit87a9d23627586a5d5cd8a502a08ecbdb6f0f7bb2 (patch)
tree7e627c0810e151177364f66a9442529d1e95826f
parentd236b01a8bf68b83c76ea1cfa8351833e19695f7 (diff)
libglusterfs/rot-buffs: rotational buffers
This patch introduces rotational buffers aiming at the classic multiple producer and multiple consumer problem. A fixed set of buffer list is allocated during initialization, where each list consist of a list of buffers. Each buffer is an iovec pointing to a memory region of fixed allocation size. Multiple producers write data to these buffers. A buffer list starts with a single buffer (iovec) and allocates more when required (although this can be preallocatd in multiples of k). rot-buffs allow multiple producers to write data parallely with a bit of extra cost of taking locks. Therefore, it's much suited for large writes. Multiple producers are allowed to write in the buffer parallely by "reserving" write space for selected number of bytes and returning pointer to the start of the reserved area. The write size is selected by the producer before it starts the write (which is often known). Therefore, the write itself need not be serialized -- just the space reservation needs to be done safely. The other part is when a consumer kicks in to consume what has been produced. At this point, a buffer list switch is performed. The "current" buffer list pointer is safely pointed to the next available buffer list. New writes are now directed to the just switched buffer list (the old buffer list is now considered out of rotation). Note that the old buffer still may have producers in progress (pending writes), so the consumer has to wait till the writers are drained. Currently this is the slow path for producers (write completion) and needs to be improved. Currently, there is special handling for cases where the number of consumers match (or exceed) the number of producers, which could result in writer starvation. In this scenario, when a consumers requests a buffer list for consumption, a check is performed for writer starvation and consumption is denied until at least another buffer list is ready of the producer for writes, i.e., one (or more) consumer(s) completed, thereby putting the buffer list back in rotation. [ NOTE: I've not performance tested this producer-consumer model yet. It's being used in changelog for event notification. The list of buffers (iovecs) are directly passed to RPC layer. ] Change-Id: I88d235522b05ab82509aba861374a2312bff57f2 BUG: 1170075 Signed-off-by: Venky Shankar <vshankar@redhat.com> Reviewed-on: http://review.gluster.org/9706 Tested-by: Vijay Bellur <vbellur@redhat.com>
-rw-r--r--configure.ac11
-rw-r--r--libglusterfs/src/Makefile.am6
-rw-r--r--libglusterfs/src/list.h23
-rw-r--r--libglusterfs/src/mem-types.h23
-rw-r--r--libglusterfs/src/rot-buffs.c491
-rw-r--r--libglusterfs/src/rot-buffs.h121
6 files changed, 661 insertions, 14 deletions
diff --git a/configure.ac b/configure.ac
index 985baf6cb4c..8f9a4b0d150 100644
--- a/configure.ac
+++ b/configure.ac
@@ -376,6 +376,17 @@ AC_CHECK_HEADERS([sys/ioctl.h], AC_DEFINE(HAVE_IOCTL_IN_SYS_IOCTL_H, 1, [have sy
AC_CHECK_HEADERS([sys/extattr.h])
+dnl Math library
+case $host_os in
+ linux*)
+ MATH_LIB=''
+ ;;
+ *)
+ MATH_LIB='-lm'
+ ;;
+esac
+AC_SUBST(MATH_LIB)
+
dnl NetBSD does not support POSIX ACLs :-(
case $host_os in
*netbsd*)
diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am
index 73ee69f8630..2441023ad39 100644
--- a/libglusterfs/src/Makefile.am
+++ b/libglusterfs/src/Makefile.am
@@ -7,7 +7,7 @@ libglusterfs_la_CPPFLAGS = $(GF_CPPFLAGS) -D__USE_FILE_OFFSET64 \
-I$(CONTRIBDIR)/libexecinfo ${ARGP_STANDALONE_CPPFLAGS} \
-DSBIN_DIR=\"$(sbindir)\" -lm
-libglusterfs_la_LIBADD = @LEXLIB@ $(ZLIB_LIBS)
+libglusterfs_la_LIBADD = @LEXLIB@ $(ZLIB_LIBS) $(MATH_LIB)
libglusterfs_la_LDFLAGS = -version-info $(LIBGLUSTERFS_LT_VERSION)
lib_LTLIBRARIES = libglusterfs.la
@@ -30,7 +30,7 @@ libglusterfs_la_SOURCES = dict.c xlator.c logging.c \
$(CONTRIBDIR)/libgen/basename_r.c $(CONTRIBDIR)/libgen/dirname_r.c \
$(CONTRIBDIR)/stdlib/gf_mkostemp.c strfd.c parse-utils.c \
$(CONTRIBDIR)/mount/mntent.c $(CONTRIBDIR)/libexecinfo/execinfo.c\
- quota-common-utils.c
+ quota-common-utils.c rot-buffs.c
nodist_libglusterfs_la_SOURCES = y.tab.c graph.lex.c
@@ -49,7 +49,7 @@ noinst_HEADERS = common-utils.h defaults.h dict.h glusterfs.h hashfn.h timespec.
template-component-messages.h strfd.h syncop-utils.h parse-utils.h \
$(CONTRIBDIR)/mount/mntent_compat.h lvm-defaults.h \
$(CONTRIBDIR)/libexecinfo/execinfo_compat.h \
- unittest/unittest.h quota-common-utils.h
+ unittest/unittest.h quota-common-utils.h rot-buffs.h
EXTRA_DIST = graph.l graph.y
diff --git a/libglusterfs/src/list.h b/libglusterfs/src/list.h
index 04b4047129f..894fa3012cf 100644
--- a/libglusterfs/src/list.h
+++ b/libglusterfs/src/list.h
@@ -179,15 +179,36 @@ list_append_init (struct list_head *list, struct list_head *head)
INIT_LIST_HEAD (list);
}
+static inline int
+list_is_last (struct list_head *list, struct list_head *head)
+{
+ return (list->next == head);
+}
+
+static inline int
+list_is_singular(struct list_head *head)
+{
+ return !list_empty(head) && (head->next == head->prev);
+}
#define list_entry(ptr, type, member) \
((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member)))
+#define list_first_entry(ptr, type, member) \
+ list_entry((ptr)->next, type, member)
+
+#define list_last_entry(ptr, type, member) \
+ list_entry((ptr)->prev, type, member)
+
+#define list_next_entry(pos, member) \
+ list_entry((pos)->member.next, typeof(*(pos)), member)
+
+#define list_prev_entry(pos, member) \
+ list_entry((pos)->member.prev, typeof(*(pos)), member)
#define list_for_each(pos, head) \
for (pos = (head)->next; pos != (head); pos = pos->next)
-
#define list_for_each_entry(pos, head, member) \
for (pos = list_entry((head)->next, typeof(*pos), member); \
&pos->member != (head); \
diff --git a/libglusterfs/src/mem-types.h b/libglusterfs/src/mem-types.h
index 5f4e7bcd2c2..4dd59b002a5 100644
--- a/libglusterfs/src/mem-types.h
+++ b/libglusterfs/src/mem-types.h
@@ -131,17 +131,20 @@ enum gf_common_mem_types_ {
gf_common_mt_parser_t = 115,
gf_common_quota_meta_t = 116,
/*related to gfdb library*/
- gfdb_mt_time_t,
- gf_mt_sql_cbk_args_t,
- gf_mt_gfdb_query_record_t,
- gf_mt_gfdb_link_info_t,
- gf_mt_gfdb_db_operations_t,
- gf_mt_sql_connection_t,
- gf_mt_sql_conn_node_t,
- gf_mt_db_conn_node_t,
- gf_mt_db_connection_t,
- gfdb_mt_db_record_t,
+ gfdb_mt_time_t = 117,
+ gf_mt_sql_cbk_args_t = 118,
+ gf_mt_gfdb_query_record_t = 119,
+ gf_mt_gfdb_link_info_t = 120,
+ gf_mt_gfdb_db_operations_t = 121,
+ gf_mt_sql_connection_t = 122,
+ gf_mt_sql_conn_node_t = 123,
+ gf_mt_db_conn_node_t = 124,
+ gf_mt_db_connection_t = 125,
+ gfdb_mt_db_record_t = 126,
/*related to gfdb library*/
+ gf_common_mt_rbuf_t = 127,
+ gf_common_mt_rlist_t = 128,
+ gf_common_mt_rvec_t = 129,
gf_common_mt_end
};
#endif
diff --git a/libglusterfs/src/rot-buffs.c b/libglusterfs/src/rot-buffs.c
new file mode 100644
index 00000000000..19399b824f4
--- /dev/null
+++ b/libglusterfs/src/rot-buffs.c
@@ -0,0 +1,491 @@
+/*
+ Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <math.h>
+
+#include "mem-types.h"
+#include "mem-pool.h"
+
+#include "rot-buffs.h"
+
+/**
+ * Producer-Consumer based on top of rotational buffers.
+ *
+ * This favours writers (producer) and keeps the critical section
+ * light weight. Buffer switch happens when a consumer wants to
+ * consume data. This is the slow path and waits for pending
+ * writes to finish.
+ *
+ * TODO: do away with opaques (use arrays with indexing).
+ */
+
+#define ROT_BUFF_DEFAULT_COUNT 2
+#define ROT_BUFF_ALLOC_SIZE (1 * 1024 * 1024) /* 1MB per iovec */
+
+#define RLIST_IOV_MELDED_ALLOC_SIZE (RBUF_IOVEC_SIZE + ROT_BUFF_ALLOC_SIZE)
+
+/**
+ * iovec list is not shrinked (deallocated) if usage/total count
+ * falls in this range. this is the fast path and should satisfy
+ * most of the workloads. for the rest shrinking iovec list is
+ * generous.
+ */
+#define RVEC_LOW_WATERMARK_COUNT 1
+#define RVEC_HIGH_WATERMARK_COUNT (1 << 4)
+
+static inline
+rbuf_list_t *rbuf_current_buffer (rbuf_t *rbuf)
+{
+ return rbuf->current;
+}
+
+static inline void
+rlist_mark_waiting (rbuf_list_t *rlist)
+{
+ LOCK (&rlist->c_lock);
+ {
+ rlist->awaiting = _gf_true;
+ }
+ UNLOCK (&rlist->c_lock);
+}
+
+static inline int
+__rlist_has_waiter (rbuf_list_t *rlist)
+{
+ return (rlist->awaiting == _gf_true);
+}
+
+static inline void *
+rbuf_alloc_rvec ()
+{
+ return GF_CALLOC (1, RLIST_IOV_MELDED_ALLOC_SIZE, gf_common_mt_rvec_t);
+}
+
+static inline void
+rlist_reset_vector_usage (rbuf_list_t *rlist)
+{
+ rlist->used = 1;
+}
+
+static inline void
+rlist_increment_vector_usage (rbuf_list_t *rlist)
+{
+ rlist->used++;
+}
+
+static inline void
+rlist_increment_total_usage (rbuf_list_t *rlist)
+{
+ rlist->total++;
+}
+
+static inline int
+rvec_in_watermark_range (rbuf_list_t *rlist)
+{
+ return ((rlist->total >= RVEC_LOW_WATERMARK_COUNT)
+ && (rlist->total <= RVEC_HIGH_WATERMARK_COUNT));
+}
+
+static inline void
+rbuf_reset_rvec (rbuf_iovec_t *rvec)
+{
+ /* iov_base is _never_ modified */
+ rvec->iov.iov_len = 0;
+}
+
+/* TODO: alloc multiple rbuf_iovec_t */
+static inline int
+rlist_add_new_vec (rbuf_list_t *rlist)
+{
+ rbuf_iovec_t *rvec = NULL;
+
+ rvec = (rbuf_iovec_t *) rbuf_alloc_rvec ();
+ if (!rvec)
+ return -1;
+ INIT_LIST_HEAD (&rvec->list);
+ rvec->iov.iov_base = ((char *)rvec) + RBUF_IOVEC_SIZE;
+ rvec->iov.iov_len = 0;
+
+ list_add_tail (&rvec->list, &rlist->veclist);
+
+ rlist->rvec = rvec; /* cache the latest */
+
+ rlist_increment_vector_usage (rlist);
+ rlist_increment_total_usage (rlist);
+
+ return 0;
+}
+
+static inline void
+rlist_free_rvec (rbuf_iovec_t *rvec)
+{
+ if (!rvec)
+ return;
+ list_del (&rvec->list);
+ GF_FREE (rvec);
+}
+
+static inline void
+rlist_purge_all_rvec (rbuf_list_t *rlist)
+{
+ rbuf_iovec_t *rvec = NULL;
+
+ if (!rlist)
+ return;
+ while (!list_empty (&rlist->veclist)) {
+ rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list);
+ rlist_free_rvec (rvec);
+ }
+}
+
+static inline void
+rlist_shrink_rvec (rbuf_list_t *rlist, unsigned long long shrink)
+{
+ rbuf_iovec_t *rvec = NULL;
+
+ while (!list_empty (&rlist->veclist) && (shrink-- > 0)) {
+ rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list);
+ rlist_free_rvec (rvec);
+ }
+}
+
+static inline void
+rbuf_purge_rlist (rbuf_t *rbuf)
+{
+ rbuf_list_t *rlist = NULL;
+
+ while (!list_empty (&rbuf->freelist)) {
+ rlist = list_first_entry (&rbuf->freelist, rbuf_list_t, list);
+ list_del (&rlist->list);
+
+ rlist_purge_all_rvec (rlist);
+
+ LOCK_DESTROY (&rlist->c_lock);
+
+ (void) pthread_mutex_destroy (&rlist->b_lock);
+ (void) pthread_cond_destroy (&rlist->b_cond);
+
+ GF_FREE (rlist);
+ }
+}
+
+rbuf_t *
+rbuf_init (int bufcount)
+{
+ int j = 0;
+ int ret = 0;
+ rbuf_t *rbuf = NULL;
+ rbuf_list_t *rlist = NULL;
+
+ if (bufcount <= 0)
+ bufcount = ROT_BUFF_DEFAULT_COUNT;
+
+ rbuf = GF_CALLOC (1, sizeof (rbuf_t), gf_common_mt_rbuf_t);
+ if (!rbuf)
+ goto error_return;
+
+ LOCK_INIT (&rbuf->lock);
+ INIT_LIST_HEAD (&rbuf->freelist);
+
+ /* it could have been one big calloc() but this is just once.. */
+ for (j = 0; j < bufcount; j++) {
+ rlist = GF_CALLOC (1,
+ sizeof (rbuf_list_t), gf_common_mt_rlist_t);
+ if (!rlist) {
+ ret = -1;
+ break;
+ }
+
+ INIT_LIST_HEAD (&rlist->list);
+ INIT_LIST_HEAD (&rlist->veclist);
+
+ rlist->pending = rlist->completed = 0;
+
+ ret = rlist_add_new_vec (rlist);
+ if (ret)
+ break;
+
+ LOCK_INIT (&rlist->c_lock);
+
+ rlist->awaiting = _gf_false;
+ ret = pthread_mutex_init (&rlist->b_lock, 0);
+ if (ret != 0) {
+ GF_FREE (rlist);
+ break;
+ }
+
+ ret = pthread_cond_init (&rlist->b_cond, 0);
+ if (ret != 0) {
+ GF_FREE (rlist);
+ break;
+ }
+
+ list_add_tail (&rlist->list, &rbuf->freelist);
+ }
+
+ if (ret != 0)
+ goto dealloc_rlist;
+
+ /* cache currently used buffer: first in the list */
+ rbuf->current = list_first_entry (&rbuf->freelist, rbuf_list_t, list);
+ return rbuf;
+
+ dealloc_rlist:
+ rbuf_purge_rlist (rbuf);
+ LOCK_DESTROY (&rbuf->lock);
+ GF_FREE (rbuf);
+ error_return:
+ return NULL;
+}
+
+void
+rbuf_dtor (rbuf_t *rbuf)
+{
+ if (!rbuf)
+ return;
+ rbuf->current = NULL;
+ rbuf_purge_rlist (rbuf);
+ LOCK_DESTROY (&rbuf->lock);
+
+ GF_FREE (rbuf);
+}
+
+static inline char *
+rbuf_adjust_write_area (struct iovec *iov, size_t bytes)
+{
+ char *wbuf = NULL;
+
+ wbuf = iov->iov_base + iov->iov_len;
+ iov->iov_len += bytes;
+ return wbuf;
+}
+
+static inline char *
+rbuf_alloc_write_area (rbuf_list_t *rlist, size_t bytes)
+{
+ int ret = 0;
+ struct iovec *iov = NULL;
+
+ /* check for available space in _current_ IO buffer */
+ iov = &rlist->rvec->iov;
+ if (iov->iov_len + bytes <= ROT_BUFF_ALLOC_SIZE)
+ return rbuf_adjust_write_area (iov, bytes); /* fast path */
+
+ /* not enough bytes, try next available buffers */
+ if (list_is_last (&rlist->rvec->list, &rlist->veclist)) {
+ /* OH! consumed all vector buffers */
+ GF_ASSERT (rlist->used == rlist->total);
+ ret = rlist_add_new_vec (rlist);
+ if (ret)
+ goto error_return;
+ } else {
+ /* not the end, have available rbuf_iovec's */
+ rlist->rvec = list_next_entry (rlist->rvec, list);
+ rlist->used++;
+ rbuf_reset_rvec (rlist->rvec);
+ }
+
+ iov = &rlist->rvec->iov;
+ return rbuf_adjust_write_area (iov, bytes);
+
+ error_return:
+ return NULL;
+}
+
+char *
+rbuf_reserve_write_area (rbuf_t *rbuf, size_t bytes, void **opaque)
+{
+ char *wbuf = NULL;
+ rbuf_list_t *rlist = NULL;
+
+ if (!rbuf || (bytes <= 0) || (bytes > ROT_BUFF_ALLOC_SIZE) || !opaque)
+ return NULL;
+
+ LOCK (&rbuf->lock);
+ {
+ rlist = rbuf_current_buffer (rbuf);
+ wbuf = rbuf_alloc_write_area (rlist, bytes);
+ if (!wbuf)
+ goto unblock;
+ rlist->pending++;
+ }
+ unblock:
+ UNLOCK (&rbuf->lock);
+
+ if (wbuf)
+ *opaque = rlist;
+ return wbuf;
+}
+
+static inline void
+rbuf_notify_waiter (rbuf_list_t *rlist)
+{
+ pthread_mutex_lock (&rlist->b_lock);
+ {
+ pthread_cond_signal (&rlist->b_cond);
+ }
+ pthread_mutex_unlock (&rlist->b_lock);
+}
+
+int
+rbuf_write_complete (void *opaque)
+{
+ rbuf_list_t *rlist = NULL;
+ gf_boolean_t notify = _gf_false;
+
+ if (!opaque)
+ return -1;
+
+ rlist = opaque;
+
+ LOCK (&rlist->c_lock);
+ {
+ rlist->completed++;
+ /**
+ * it's safe to test ->pending without rbuf->lock *only* if
+ * there's a waiter as there can be no new incoming writes.
+ */
+ if (__rlist_has_waiter (rlist)
+ && (rlist->completed == rlist->pending))
+ notify = _gf_true;
+ }
+ UNLOCK (&rlist->c_lock);
+
+ if (notify)
+ rbuf_notify_waiter (rlist);
+
+ return 0;
+}
+
+int
+rbuf_get_buffer (rbuf_t *rbuf,
+ void **opaque, sequence_fn *seqfn, void *mydata)
+{
+ int retval = RBUF_CONSUMABLE;
+ rbuf_list_t *rlist = NULL;
+
+ if (!rbuf || !opaque)
+ return -1;
+
+ LOCK (&rbuf->lock);
+ {
+ rlist = rbuf_current_buffer (rbuf);
+ if (!rlist->pending) {
+ retval = RBUF_EMPTY;
+ goto unblock;
+ }
+
+ if (list_is_singular (&rbuf->freelist)) {
+ /**
+ * removal would lead to writer starvation, disallow
+ * switching.
+ */
+ retval = RBUF_WOULD_STARVE;
+ goto unblock;
+ }
+
+ list_del_init (&rlist->list);
+ if (seqfn)
+ seqfn (rlist, mydata);
+ rbuf->current =
+ list_first_entry (&rbuf->freelist, rbuf_list_t, list);
+ }
+ unblock:
+ UNLOCK (&rbuf->lock);
+
+ if (retval == RBUF_CONSUMABLE)
+ *opaque = rlist; /* caller _owns_ the buffer */
+
+ return retval;
+}
+
+/**
+ * Wait for completion of pending writers and invoke dispatcher
+ * routine (for buffer consumption).
+ */
+
+static inline void
+__rbuf_wait_for_writers (rbuf_list_t *rlist)
+{
+ while (rlist->completed != rlist->pending)
+ pthread_cond_wait (&rlist->b_cond, &rlist->b_lock);
+}
+
+#ifndef M_E
+#define M_E 2.7
+#endif
+
+static inline void
+rlist_shrink_vector (rbuf_list_t *rlist)
+{
+ unsigned long long shrink = 0;
+
+ /**
+ * fast path: don't bother to deallocate if vectors are hardly
+ * used.
+ */
+ if (rvec_in_watermark_range (rlist))
+ return;
+
+ /**
+ * Calculate the shrink count based on total allocated vectors.
+ * Note that the calculation sticks to rlist->total irrespective
+ * of the actual usage count (rlist->used). Later, ->used could
+ * be used to apply slack to the calculation based on how much
+ * it lags from ->total. For now, let's stick to slow decay.
+ */
+ shrink = rlist->total - (rlist->total * pow (M_E, -0.2));
+
+ rlist_shrink_rvec (rlist, shrink);
+ rlist->total -= shrink;
+}
+
+int
+rbuf_wait_for_completion (rbuf_t *rbuf, void *opaque,
+ void (*fn)(rbuf_list_t *, void *), void *arg)
+{
+ rbuf_list_t *rlist = NULL;
+
+ if (!rbuf || !opaque)
+ return -1;
+
+ rlist = opaque;
+
+ pthread_mutex_lock (&rlist->b_lock);
+ {
+ rlist_mark_waiting (rlist);
+ __rbuf_wait_for_writers (rlist);
+ }
+ pthread_mutex_unlock (&rlist->b_lock);
+
+ /**
+ * from here on, no need of locking until the rlist is put
+ * back into rotation.
+ */
+
+ fn (rlist, arg); /* invoke dispatcher */
+
+ rlist->awaiting = _gf_false;
+ rlist->pending = rlist->completed = 0;
+
+ rlist_shrink_vector (rlist);
+ rlist_reset_vector_usage (rlist);
+
+ rlist->rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list);
+ rbuf_reset_rvec (rlist->rvec);
+
+ LOCK (&rbuf->lock);
+ {
+ list_add_tail (&rlist->list, &rbuf->freelist);
+ }
+ UNLOCK (&rbuf->lock);
+
+ return 0;
+}
diff --git a/libglusterfs/src/rot-buffs.h b/libglusterfs/src/rot-buffs.h
new file mode 100644
index 00000000000..aac24a4f571
--- /dev/null
+++ b/libglusterfs/src/rot-buffs.h
@@ -0,0 +1,121 @@
+/*
+ Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __ROT_BUFFS_H
+#define __ROT_BUFFS_H
+
+#include "list.h"
+#include "locking.h"
+#include "common-utils.h"
+
+typedef struct rbuf_iovec {
+ struct iovec iov;
+
+ struct list_head list;
+} rbuf_iovec_t;
+
+#define RBUF_IOVEC_SIZE (sizeof (rbuf_iovec_t))
+
+typedef struct rbuf_list {
+ gf_lock_t c_lock;
+
+ pthread_mutex_t b_lock; /* protects this structure */
+ pthread_cond_t b_cond; /* signal for writer completion */
+
+ gf_boolean_t awaiting;
+
+ unsigned long long pending; /* pending writers */
+ unsigned long long completed; /* completed writers */
+
+ rbuf_iovec_t *rvec; /* currently used IO vector */
+
+ struct list_head veclist; /* list of attached rbuf_iov */
+
+ unsigned long long used; /* consumable entries
+ attached in ->veclist */
+ unsigned long long total; /* total entries in ->veclist (used
+ during deallocation) */
+
+ unsigned long seq[2]; /* if interested, this whould store
+ the start sequence number and the
+ range */
+
+ struct list_head list; /* attachment to rbuf_t */
+} rbuf_list_t;
+
+struct rlist_iter {
+ struct list_head veclist;
+
+ unsigned long long iter;
+};
+
+#define RLIST_ENTRY_COUNT(rlist) rlist->used
+
+#define rlist_iter_init(riter, rlist) \
+ do { \
+ (riter)->iter = rlist->used; \
+ (riter)->veclist = rlist->veclist; \
+ } while (0)
+
+#define rvec_for_each_entry(pos, riter) \
+ for (pos = list_entry \
+ ((riter)->veclist.next, typeof(*pos), list); \
+ (riter)->iter > 0; \
+ pos = list_entry \
+ (pos->list.next, typeof(*pos), list), \
+ --((riter)->iter))
+
+/**
+ * Sequence number assigment routine is called during buffer
+ * switch under rbuff ->lock.
+ */
+typedef void (sequence_fn) (rbuf_list_t *, void *);
+
+#define RLIST_STORE_SEQ(rlist, start, range) \
+ do { \
+ rlist->seq[0] = start; \
+ rlist->seq[1] = range; \
+ } while (0)
+
+#define RLIST_GET_SEQ(rlist, start, range) \
+ do { \
+ start = rlist->seq[0]; \
+ range = rlist->seq[1]; \
+ } while (0)
+
+typedef struct rbuf {
+ gf_lock_t lock; /* protects "current" rlist */
+
+ rbuf_list_t *current; /* cached pointer to first free rlist */
+
+ struct list_head freelist;
+} rbuf_t;
+
+typedef enum {
+ RBUF_CONSUMABLE = 1,
+ RBUF_BUSY,
+ RBUF_EMPTY,
+ RBUF_WOULD_STARVE,
+} rlist_retval_t;
+
+/* Initialization/Destruction */
+rbuf_t *rbuf_init (int);
+void rbuf_dtor (rbuf_t *);
+
+/* Producer API */
+char *rbuf_reserve_write_area (rbuf_t *, size_t, void **);
+int rbuf_write_complete (void *);
+
+/* Consumer API */
+int rbuf_get_buffer (rbuf_t *, void **, sequence_fn *, void *);
+int rbuf_wait_for_completion (rbuf_t *, void *,
+ void (*)(rbuf_list_t *, void *), void *);
+
+#endif