Diffstat (limited to 'libglusterfs/src/rot-buffs.c')
-rw-r--r--  libglusterfs/src/rot-buffs.c  491
1 file changed, 491 insertions(+), 0 deletions(-)
diff --git a/libglusterfs/src/rot-buffs.c b/libglusterfs/src/rot-buffs.c
new file mode 100644
index 00000000000..19399b824f4
--- /dev/null
+++ b/libglusterfs/src/rot-buffs.c
@@ -0,0 +1,491 @@
+/*
+ Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <math.h>
+
+#include "mem-types.h"
+#include "mem-pool.h"
+
+#include "rot-buffs.h"
+
+/**
+ * Producer-consumer built on top of rotational buffers.
+ *
+ * This favours writers (producers) and keeps the critical section
+ * lightweight. A buffer switch happens when a consumer wants to
+ * consume data; this is the slow path, as it waits for pending
+ * writes to finish.
+ *
+ * TODO: do away with opaques (use arrays with indexing).
+ */
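+
+/**
+ * Typical call flow (an illustrative sketch; payload, len, dispatch_fn
+ * and arg are caller-side names used only for this example):
+ *
+ *   setup:
+ *     rbuf_t *rbuf = rbuf_init (ROT_BUFF_DEFAULT_COUNT);
+ *
+ *   producer:
+ *     void *opaque = NULL;
+ *     char *buf = rbuf_reserve_write_area (rbuf, len, &opaque);
+ *     if (buf) {
+ *             memcpy (buf, payload, len);
+ *             (void) rbuf_write_complete (opaque);
+ *     }
+ *
+ *   consumer:
+ *     void *opaque = NULL;
+ *     if (rbuf_get_buffer (rbuf, &opaque, NULL, NULL) == RBUF_CONSUMABLE)
+ *             (void) rbuf_wait_for_completion (rbuf, opaque,
+ *                                              dispatch_fn, arg);
+ *
+ *   teardown:
+ *     rbuf_dtor (rbuf);
+ */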
+
+#define ROT_BUFF_DEFAULT_COUNT 2
+#define ROT_BUFF_ALLOC_SIZE (1 * 1024 * 1024) /* 1MB per iovec */
+
+#define RLIST_IOV_MELDED_ALLOC_SIZE (RBUF_IOVEC_SIZE + ROT_BUFF_ALLOC_SIZE)
+
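+/*
+ * The rbuf_iovec_t header and its ROT_BUFF_ALLOC_SIZE payload are
+ * carved from a single ("melded") allocation; rlist_add_new_vec()
+ * points iov_base just past the header.
+ */
+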
+/**
+ * The iovec list is not shrunk (deallocated) if the usage/total count
+ * falls within this range. This is the fast path and should satisfy
+ * most workloads. For the rest, the iovec list is shrunk generously.
+ */
+#define RVEC_LOW_WATERMARK_COUNT 1
+#define RVEC_HIGH_WATERMARK_COUNT (1 << 4)
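+
+/**
+ * For example, with the defaults above an rlist holding anywhere from
+ * 1 to 16 vectors is left untouched at consumption time; only lists
+ * that grew beyond 16 vectors are shrunk (see rlist_shrink_vector()).
+ */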
+
+static inline
+rbuf_list_t *rbuf_current_buffer (rbuf_t *rbuf)
+{
+ return rbuf->current;
+}
+
+static inline void
+rlist_mark_waiting (rbuf_list_t *rlist)
+{
+ LOCK (&rlist->c_lock);
+ {
+ rlist->awaiting = _gf_true;
+ }
+ UNLOCK (&rlist->c_lock);
+}
+
+static inline int
+__rlist_has_waiter (rbuf_list_t *rlist)
+{
+ return (rlist->awaiting == _gf_true);
+}
+
+static inline void *
+rbuf_alloc_rvec ()
+{
+ return GF_CALLOC (1, RLIST_IOV_MELDED_ALLOC_SIZE, gf_common_mt_rvec_t);
+}
+
+static inline void
+rlist_reset_vector_usage (rbuf_list_t *rlist)
+{
+ rlist->used = 1;
+}
+
+static inline void
+rlist_increment_vector_usage (rbuf_list_t *rlist)
+{
+ rlist->used++;
+}
+
+static inline void
+rlist_increment_total_usage (rbuf_list_t *rlist)
+{
+ rlist->total++;
+}
+
+static inline int
+rvec_in_watermark_range (rbuf_list_t *rlist)
+{
+ return ((rlist->total >= RVEC_LOW_WATERMARK_COUNT)
+ && (rlist->total <= RVEC_HIGH_WATERMARK_COUNT));
+}
+
+static inline void
+rbuf_reset_rvec (rbuf_iovec_t *rvec)
+{
+ /* iov_base is _never_ modified */
+ rvec->iov.iov_len = 0;
+}
+
+/* TODO: alloc multiple rbuf_iovec_t */
+static inline int
+rlist_add_new_vec (rbuf_list_t *rlist)
+{
+ rbuf_iovec_t *rvec = NULL;
+
+ rvec = (rbuf_iovec_t *) rbuf_alloc_rvec ();
+ if (!rvec)
+ return -1;
+ INIT_LIST_HEAD (&rvec->list);
+ rvec->iov.iov_base = ((char *)rvec) + RBUF_IOVEC_SIZE;
+ rvec->iov.iov_len = 0;
+
+ list_add_tail (&rvec->list, &rlist->veclist);
+
+ rlist->rvec = rvec; /* cache the latest */
+
+ rlist_increment_vector_usage (rlist);
+ rlist_increment_total_usage (rlist);
+
+ return 0;
+}
+
+static inline void
+rlist_free_rvec (rbuf_iovec_t *rvec)
+{
+ if (!rvec)
+ return;
+ list_del (&rvec->list);
+ GF_FREE (rvec);
+}
+
+static inline void
+rlist_purge_all_rvec (rbuf_list_t *rlist)
+{
+ rbuf_iovec_t *rvec = NULL;
+
+ if (!rlist)
+ return;
+ while (!list_empty (&rlist->veclist)) {
+ rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list);
+ rlist_free_rvec (rvec);
+ }
+}
+
+static inline void
+rlist_shrink_rvec (rbuf_list_t *rlist, unsigned long long shrink)
+{
+ rbuf_iovec_t *rvec = NULL;
+
+ while (!list_empty (&rlist->veclist) && (shrink-- > 0)) {
+ rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list);
+ rlist_free_rvec (rvec);
+ }
+}
+
+static inline void
+rbuf_purge_rlist (rbuf_t *rbuf)
+{
+ rbuf_list_t *rlist = NULL;
+
+ while (!list_empty (&rbuf->freelist)) {
+ rlist = list_first_entry (&rbuf->freelist, rbuf_list_t, list);
+ list_del (&rlist->list);
+
+ rlist_purge_all_rvec (rlist);
+
+ LOCK_DESTROY (&rlist->c_lock);
+
+ (void) pthread_mutex_destroy (&rlist->b_lock);
+ (void) pthread_cond_destroy (&rlist->b_cond);
+
+ GF_FREE (rlist);
+ }
+}
+
+rbuf_t *
+rbuf_init (int bufcount)
+{
+ int j = 0;
+ int ret = 0;
+ rbuf_t *rbuf = NULL;
+ rbuf_list_t *rlist = NULL;
+
+ if (bufcount <= 0)
+ bufcount = ROT_BUFF_DEFAULT_COUNT;
+
+ rbuf = GF_CALLOC (1, sizeof (rbuf_t), gf_common_mt_rbuf_t);
+ if (!rbuf)
+ goto error_return;
+
+ LOCK_INIT (&rbuf->lock);
+ INIT_LIST_HEAD (&rbuf->freelist);
+
+ /* it could have been one big calloc() but this is just once.. */
+ for (j = 0; j < bufcount; j++) {
+ rlist = GF_CALLOC (1,
+ sizeof (rbuf_list_t), gf_common_mt_rlist_t);
+ if (!rlist) {
+ ret = -1;
+ break;
+ }
+
+ INIT_LIST_HEAD (&rlist->list);
+ INIT_LIST_HEAD (&rlist->veclist);
+
+ rlist->pending = rlist->completed = 0;
+
+ ret = rlist_add_new_vec (rlist);
+ if (ret) {
+ GF_FREE (rlist);
+ break;
+ }
+
+ LOCK_INIT (&rlist->c_lock);
+
+ rlist->awaiting = _gf_false;
+ ret = pthread_mutex_init (&rlist->b_lock, 0);
+ if (ret != 0) {
+ rlist_purge_all_rvec (rlist);
+ LOCK_DESTROY (&rlist->c_lock);
+ GF_FREE (rlist);
+ break;
+ }
+
+ ret = pthread_cond_init (&rlist->b_cond, 0);
+ if (ret != 0) {
+ (void) pthread_mutex_destroy (&rlist->b_lock);
+ rlist_purge_all_rvec (rlist);
+ LOCK_DESTROY (&rlist->c_lock);
+ GF_FREE (rlist);
+ break;
+ }
+
+ list_add_tail (&rlist->list, &rbuf->freelist);
+ }
+
+ if (ret != 0)
+ goto dealloc_rlist;
+
+ /* cache currently used buffer: first in the list */
+ rbuf->current = list_first_entry (&rbuf->freelist, rbuf_list_t, list);
+ return rbuf;
+
+ dealloc_rlist:
+ rbuf_purge_rlist (rbuf);
+ LOCK_DESTROY (&rbuf->lock);
+ GF_FREE (rbuf);
+ error_return:
+ return NULL;
+}
+
+void
+rbuf_dtor (rbuf_t *rbuf)
+{
+ if (!rbuf)
+ return;
+ rbuf->current = NULL;
+ rbuf_purge_rlist (rbuf);
+ LOCK_DESTROY (&rbuf->lock);
+
+ GF_FREE (rbuf);
+}
+
+static inline char *
+rbuf_adjust_write_area (struct iovec *iov, size_t bytes)
+{
+ char *wbuf = NULL;
+
+ wbuf = iov->iov_base + iov->iov_len;
+ iov->iov_len += bytes;
+ return wbuf;
+}
+
+static inline char *
+rbuf_alloc_write_area (rbuf_list_t *rlist, size_t bytes)
+{
+ int ret = 0;
+ struct iovec *iov = NULL;
+
+ /* check for available space in _current_ IO buffer */
+ iov = &rlist->rvec->iov;
+ if (iov->iov_len + bytes <= ROT_BUFF_ALLOC_SIZE)
+ return rbuf_adjust_write_area (iov, bytes); /* fast path */
+
+ /* not enough bytes, try next available buffers */
+ if (list_is_last (&rlist->rvec->list, &rlist->veclist)) {
+ /* OH! consumed all vector buffers */
+ GF_ASSERT (rlist->used == rlist->total);
+ ret = rlist_add_new_vec (rlist);
+ if (ret)
+ goto error_return;
+ } else {
+ /* not the end, have available rbuf_iovec's */
+ rlist->rvec = list_next_entry (rlist->rvec, list);
+ rlist->used++;
+ rbuf_reset_rvec (rlist->rvec);
+ }
+
+ iov = &rlist->rvec->iov;
+ return rbuf_adjust_write_area (iov, bytes);
+
+ error_return:
+ return NULL;
+}
+
+char *
+rbuf_reserve_write_area (rbuf_t *rbuf, size_t bytes, void **opaque)
+{
+ char *wbuf = NULL;
+ rbuf_list_t *rlist = NULL;
+
+ if (!rbuf || (bytes == 0) || (bytes > ROT_BUFF_ALLOC_SIZE) || !opaque)
+ return NULL;
+
+ LOCK (&rbuf->lock);
+ {
+ rlist = rbuf_current_buffer (rbuf);
+ wbuf = rbuf_alloc_write_area (rlist, bytes);
+ if (!wbuf)
+ goto unblock;
+ rlist->pending++;
+ }
+ unblock:
+ UNLOCK (&rbuf->lock);
+
+ if (wbuf)
+ *opaque = rlist;
+ return wbuf;
+}
+
+static inline void
+rbuf_notify_waiter (rbuf_list_t *rlist)
+{
+ pthread_mutex_lock (&rlist->b_lock);
+ {
+ pthread_cond_signal (&rlist->b_cond);
+ }
+ pthread_mutex_unlock (&rlist->b_lock);
+}
+
+int
+rbuf_write_complete (void *opaque)
+{
+ rbuf_list_t *rlist = NULL;
+ gf_boolean_t notify = _gf_false;
+
+ if (!opaque)
+ return -1;
+
+ rlist = opaque;
+
+ LOCK (&rlist->c_lock);
+ {
+ rlist->completed++;
+ /**
+ * it's safe to test ->pending without rbuf->lock *only* if
+ * there's a waiter as there can be no new incoming writes.
+ */
+ if (__rlist_has_waiter (rlist)
+ && (rlist->completed == rlist->pending))
+ notify = _gf_true;
+ }
+ UNLOCK (&rlist->c_lock);
+
+ if (notify)
+ rbuf_notify_waiter (rlist);
+
+ return 0;
+}
+
+int
+rbuf_get_buffer (rbuf_t *rbuf,
+ void **opaque, sequence_fn *seqfn, void *mydata)
+{
+ int retval = RBUF_CONSUMABLE;
+ rbuf_list_t *rlist = NULL;
+
+ if (!rbuf || !opaque)
+ return -1;
+
+ LOCK (&rbuf->lock);
+ {
+ rlist = rbuf_current_buffer (rbuf);
+ if (!rlist->pending) {
+ retval = RBUF_EMPTY;
+ goto unblock;
+ }
+
+ if (list_is_singular (&rbuf->freelist)) {
+ /**
+ * removal would lead to writer starvation, disallow
+ * switching.
+ */
+ retval = RBUF_WOULD_STARVE;
+ goto unblock;
+ }
+
+ list_del_init (&rlist->list);
+ if (seqfn)
+ seqfn (rlist, mydata);
+ rbuf->current =
+ list_first_entry (&rbuf->freelist, rbuf_list_t, list);
+ }
+ unblock:
+ UNLOCK (&rbuf->lock);
+
+ if (retval == RBUF_CONSUMABLE)
+ *opaque = rlist; /* caller _owns_ the buffer */
+
+ return retval;
+}
+
+/**
+ * Wait for completion of pending writers and invoke dispatcher
+ * routine (for buffer consumption).
+ */
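+
+/*
+ * A dispatcher walks the first rlist->used vectors of the list. An
+ * illustrative sketch (consume() stands in for whatever the caller
+ * does with each filled region):
+ *
+ *   static void
+ *   dispatch_fn (rbuf_list_t *rlist, void *arg)
+ *   {
+ *           unsigned long long count = rlist->used;
+ *           rbuf_iovec_t *rvec = NULL;
+ *
+ *           list_for_each_entry (rvec, &rlist->veclist, list) {
+ *                   if (!count--)
+ *                           break;
+ *                   consume (rvec->iov.iov_base, rvec->iov.iov_len);
+ *           }
+ *   }
+ */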
+
+static inline void
+__rbuf_wait_for_writers (rbuf_list_t *rlist)
+{
+ while (rlist->completed != rlist->pending)
+ pthread_cond_wait (&rlist->b_cond, &rlist->b_lock);
+}
+
+#ifndef M_E
+#define M_E 2.7182818284590452354
+#endif
+
+static inline void
+rlist_shrink_vector (rbuf_list_t *rlist)
+{
+ unsigned long long shrink = 0;
+
+ /**
+ * fast path: don't bother to deallocate if vectors are hardly
+ * used.
+ */
+ if (rvec_in_watermark_range (rlist))
+ return;
+
+ /**
+ * Calculate the shrink count based on total allocated vectors.
+ * Note that the calculation sticks to rlist->total irrespective
+ * of the actual usage count (rlist->used). Later, ->used could
+ * be used to apply slack to the calculation based on how much
+ * it lags from ->total. For now, let's stick to slow decay.
+ */
+ shrink = rlist->total - (rlist->total * pow (M_E, -0.2));
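+ /*
+ * e.g. with total = 32: pow (M_E, -0.2) ~= 0.8187, so shrink =
+ * 32 - (32 * 0.8187) = 5.8, which truncates to 5; five of the
+ * thirty-two vectors are released on this pass.
+ */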
+
+ rlist_shrink_rvec (rlist, shrink);
+ rlist->total -= shrink;
+}
+
+int
+rbuf_wait_for_completion (rbuf_t *rbuf, void *opaque,
+ void (*fn)(rbuf_list_t *, void *), void *arg)
+{
+ rbuf_list_t *rlist = NULL;
+
+ if (!rbuf || !opaque)
+ return -1;
+
+ rlist = opaque;
+
+ pthread_mutex_lock (&rlist->b_lock);
+ {
+ rlist_mark_waiting (rlist);
+ __rbuf_wait_for_writers (rlist);
+ }
+ pthread_mutex_unlock (&rlist->b_lock);
+
+ /**
+ * from here on, there is no need for locking until the rlist
+ * is put back into rotation.
+ */
+
+ fn (rlist, arg); /* invoke dispatcher */
+
+ rlist->awaiting = _gf_false;
+ rlist->pending = rlist->completed = 0;
+
+ rlist_shrink_vector (rlist);
+ rlist_reset_vector_usage (rlist);
+
+ rlist->rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list);
+ rbuf_reset_rvec (rlist->rvec);
+
+ LOCK (&rbuf->lock);
+ {
+ list_add_tail (&rlist->list, &rbuf->freelist);
+ }
+ UNLOCK (&rbuf->lock);
+
+ return 0;
+}