diff options
Diffstat (limited to 'libglusterfs')
| -rw-r--r-- | libglusterfs/src/Makefile.am | 6 | ||||
| -rw-r--r-- | libglusterfs/src/list.h | 23 | ||||
| -rw-r--r-- | libglusterfs/src/mem-types.h | 23 | ||||
| -rw-r--r-- | libglusterfs/src/rot-buffs.c | 491 | ||||
| -rw-r--r-- | libglusterfs/src/rot-buffs.h | 121 | 
5 files changed, 650 insertions, 14 deletions
diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am index 73ee69f8630..2441023ad39 100644 --- a/libglusterfs/src/Makefile.am +++ b/libglusterfs/src/Makefile.am @@ -7,7 +7,7 @@ libglusterfs_la_CPPFLAGS = $(GF_CPPFLAGS) -D__USE_FILE_OFFSET64 \  	-I$(CONTRIBDIR)/libexecinfo ${ARGP_STANDALONE_CPPFLAGS} \  	-DSBIN_DIR=\"$(sbindir)\" -lm -libglusterfs_la_LIBADD = @LEXLIB@ $(ZLIB_LIBS) +libglusterfs_la_LIBADD = @LEXLIB@ $(ZLIB_LIBS) $(MATH_LIB)  libglusterfs_la_LDFLAGS = -version-info $(LIBGLUSTERFS_LT_VERSION)  lib_LTLIBRARIES = libglusterfs.la @@ -30,7 +30,7 @@ libglusterfs_la_SOURCES = dict.c xlator.c logging.c \  	$(CONTRIBDIR)/libgen/basename_r.c $(CONTRIBDIR)/libgen/dirname_r.c \  	$(CONTRIBDIR)/stdlib/gf_mkostemp.c strfd.c parse-utils.c \  	$(CONTRIBDIR)/mount/mntent.c $(CONTRIBDIR)/libexecinfo/execinfo.c\ -	quota-common-utils.c +	quota-common-utils.c rot-buffs.c  nodist_libglusterfs_la_SOURCES = y.tab.c graph.lex.c @@ -49,7 +49,7 @@ noinst_HEADERS = common-utils.h defaults.h dict.h glusterfs.h hashfn.h timespec.  	template-component-messages.h strfd.h syncop-utils.h parse-utils.h \  	$(CONTRIBDIR)/mount/mntent_compat.h lvm-defaults.h \  	$(CONTRIBDIR)/libexecinfo/execinfo_compat.h \ -	unittest/unittest.h quota-common-utils.h +	unittest/unittest.h quota-common-utils.h rot-buffs.h  EXTRA_DIST = graph.l graph.y diff --git a/libglusterfs/src/list.h b/libglusterfs/src/list.h index 04b4047129f..894fa3012cf 100644 --- a/libglusterfs/src/list.h +++ b/libglusterfs/src/list.h @@ -179,15 +179,36 @@ list_append_init (struct list_head *list, struct list_head *head)  	INIT_LIST_HEAD (list);  } +static inline int +list_is_last (struct list_head *list, struct list_head *head) +{ +        return (list->next == head); +} + +static inline int +list_is_singular(struct list_head *head) +{ +        return !list_empty(head) && (head->next == head->prev); +}  #define list_entry(ptr, type, member)					\  	((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member))) +#define list_first_entry(ptr, type, member)     \ +        list_entry((ptr)->next, type, member) + +#define list_last_entry(ptr, type, member)     \ +        list_entry((ptr)->prev, type, member) + +#define list_next_entry(pos, member) \ +        list_entry((pos)->member.next, typeof(*(pos)), member) + +#define list_prev_entry(pos, member) \ +        list_entry((pos)->member.prev, typeof(*(pos)), member)  #define list_for_each(pos, head)                                        \  	for (pos = (head)->next; pos != (head); pos = pos->next) -  #define list_for_each_entry(pos, head, member)				\  	for (pos = list_entry((head)->next, typeof(*pos), member);	\  	     &pos->member != (head); 					\ diff --git a/libglusterfs/src/mem-types.h b/libglusterfs/src/mem-types.h index 5f4e7bcd2c2..4dd59b002a5 100644 --- a/libglusterfs/src/mem-types.h +++ b/libglusterfs/src/mem-types.h @@ -131,17 +131,20 @@ enum gf_common_mem_types_ {          gf_common_mt_parser_t             = 115,          gf_common_quota_meta_t            = 116,          /*related to gfdb library*/ -        gfdb_mt_time_t, -        gf_mt_sql_cbk_args_t, -        gf_mt_gfdb_query_record_t, -        gf_mt_gfdb_link_info_t, -        gf_mt_gfdb_db_operations_t, -        gf_mt_sql_connection_t, -        gf_mt_sql_conn_node_t, -        gf_mt_db_conn_node_t, -        gf_mt_db_connection_t, -        gfdb_mt_db_record_t, +        gfdb_mt_time_t                    = 117, +        gf_mt_sql_cbk_args_t              = 118, +        gf_mt_gfdb_query_record_t         = 119, +        gf_mt_gfdb_link_info_t            = 120, +        gf_mt_gfdb_db_operations_t        = 121, +        gf_mt_sql_connection_t            = 122, +        gf_mt_sql_conn_node_t             = 123, +        gf_mt_db_conn_node_t              = 124, +        gf_mt_db_connection_t             = 125, +        gfdb_mt_db_record_t               = 126,          /*related to gfdb library*/ +        gf_common_mt_rbuf_t               = 127, +        gf_common_mt_rlist_t              = 128, +        gf_common_mt_rvec_t               = 129,          gf_common_mt_end  };  #endif diff --git a/libglusterfs/src/rot-buffs.c b/libglusterfs/src/rot-buffs.c new file mode 100644 index 00000000000..19399b824f4 --- /dev/null +++ b/libglusterfs/src/rot-buffs.c @@ -0,0 +1,491 @@ +/* +  Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com> +  This file is part of GlusterFS. + +  This file is licensed to you under your choice of the GNU Lesser +  General Public License, version 3 or any later version (LGPLv3 or +  later), or the GNU General Public License, version 2 (GPLv2), in all +  cases as published by the Free Software Foundation. +*/ + +#include <math.h> + +#include "mem-types.h" +#include "mem-pool.h" + +#include "rot-buffs.h" + +/** + * Producer-Consumer based on top of rotational buffers. + * + * This favours writers (producer) and keeps the critical section + * light weight. Buffer switch happens when a consumer wants to + * consume data. This is the slow path and waits for pending + * writes to finish. + * + * TODO: do away with opaques (use arrays with indexing). + */ + +#define ROT_BUFF_DEFAULT_COUNT  2 +#define ROT_BUFF_ALLOC_SIZE  (1 * 1024 * 1024)  /* 1MB per iovec */ + +#define RLIST_IOV_MELDED_ALLOC_SIZE  (RBUF_IOVEC_SIZE + ROT_BUFF_ALLOC_SIZE) + +/** + * iovec list is not shrinked (deallocated) if usage/total count + * falls in this range. this is the fast path and should satisfy + * most of the workloads. for the rest shrinking iovec list is + * generous. + */ +#define RVEC_LOW_WATERMARK_COUNT  1 +#define RVEC_HIGH_WATERMARK_COUNT (1 << 4) + +static inline +rbuf_list_t *rbuf_current_buffer (rbuf_t *rbuf) +{ +        return rbuf->current; +} + +static inline void +rlist_mark_waiting (rbuf_list_t *rlist) +{ +        LOCK (&rlist->c_lock); +        { +                rlist->awaiting = _gf_true; +        } +        UNLOCK (&rlist->c_lock); +} + +static inline int +__rlist_has_waiter (rbuf_list_t *rlist) +{ +        return (rlist->awaiting == _gf_true); +} + +static inline void * +rbuf_alloc_rvec () +{ +        return GF_CALLOC (1, RLIST_IOV_MELDED_ALLOC_SIZE, gf_common_mt_rvec_t); +} + +static inline void +rlist_reset_vector_usage (rbuf_list_t *rlist) +{ +        rlist->used = 1; +} + +static inline void +rlist_increment_vector_usage (rbuf_list_t *rlist) +{ +        rlist->used++; +} + +static inline void +rlist_increment_total_usage (rbuf_list_t *rlist) +{ +        rlist->total++; +} + +static inline int +rvec_in_watermark_range (rbuf_list_t *rlist) +{ +        return ((rlist->total >= RVEC_LOW_WATERMARK_COUNT) +                    && (rlist->total <= RVEC_HIGH_WATERMARK_COUNT)); +} + +static inline void +rbuf_reset_rvec (rbuf_iovec_t *rvec) +{ +        /* iov_base is _never_ modified */ +        rvec->iov.iov_len = 0; +} + +/* TODO: alloc multiple rbuf_iovec_t */ +static inline int +rlist_add_new_vec (rbuf_list_t *rlist) +{ +        rbuf_iovec_t *rvec = NULL; + +        rvec = (rbuf_iovec_t *) rbuf_alloc_rvec (); +        if (!rvec) +                return -1; +        INIT_LIST_HEAD (&rvec->list); +        rvec->iov.iov_base = ((char *)rvec) + RBUF_IOVEC_SIZE; +        rvec->iov.iov_len = 0; + +        list_add_tail (&rvec->list, &rlist->veclist); + +        rlist->rvec = rvec; /* cache the latest */ + +        rlist_increment_vector_usage (rlist); +        rlist_increment_total_usage (rlist); + +        return 0; +} + +static inline void +rlist_free_rvec (rbuf_iovec_t *rvec) +{ +        if (!rvec) +                return; +        list_del (&rvec->list); +        GF_FREE (rvec); +} + +static inline void +rlist_purge_all_rvec (rbuf_list_t *rlist) +{ +        rbuf_iovec_t *rvec = NULL; + +        if (!rlist) +                return; +        while (!list_empty (&rlist->veclist)) { +                rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list); +                rlist_free_rvec (rvec); +        } +} + +static inline void +rlist_shrink_rvec (rbuf_list_t *rlist, unsigned long long shrink) +{ +        rbuf_iovec_t *rvec = NULL; + +        while (!list_empty (&rlist->veclist) && (shrink-- > 0)) { +                rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list); +                rlist_free_rvec (rvec); +        } +} + +static inline void +rbuf_purge_rlist (rbuf_t *rbuf) +{ +        rbuf_list_t *rlist = NULL; + +        while (!list_empty (&rbuf->freelist)) { +                rlist = list_first_entry (&rbuf->freelist, rbuf_list_t, list); +                list_del (&rlist->list); + +                rlist_purge_all_rvec (rlist); + +                LOCK_DESTROY (&rlist->c_lock); + +                (void) pthread_mutex_destroy (&rlist->b_lock); +                (void) pthread_cond_destroy (&rlist->b_cond); + +                GF_FREE (rlist); +        } +} + +rbuf_t * +rbuf_init (int bufcount) +{ +        int          j     = 0; +        int          ret   = 0; +        rbuf_t      *rbuf  = NULL; +        rbuf_list_t *rlist = NULL; + +        if (bufcount <= 0) +                bufcount = ROT_BUFF_DEFAULT_COUNT; + +        rbuf = GF_CALLOC (1, sizeof (rbuf_t), gf_common_mt_rbuf_t); +        if (!rbuf) +                goto error_return; + +        LOCK_INIT (&rbuf->lock); +        INIT_LIST_HEAD (&rbuf->freelist); + +        /* it could have been one big calloc() but this is just once.. */ +        for (j = 0; j < bufcount; j++) { +                rlist = GF_CALLOC (1, +                                   sizeof (rbuf_list_t), gf_common_mt_rlist_t); +                if (!rlist) { +                        ret = -1; +                        break; +                } + +                INIT_LIST_HEAD (&rlist->list); +                INIT_LIST_HEAD (&rlist->veclist); + +                rlist->pending = rlist->completed = 0; + +                ret = rlist_add_new_vec (rlist); +                if (ret) +                        break; + +                LOCK_INIT (&rlist->c_lock); + +                rlist->awaiting = _gf_false; +                ret = pthread_mutex_init (&rlist->b_lock, 0); +                if (ret != 0) { +                        GF_FREE (rlist); +                        break; +                } + +                ret = pthread_cond_init (&rlist->b_cond, 0); +                if (ret != 0) { +                        GF_FREE (rlist); +                        break; +                } + +                list_add_tail (&rlist->list, &rbuf->freelist); +        } + +        if (ret != 0) +                goto dealloc_rlist; + +        /* cache currently used buffer: first in the list */ +        rbuf->current = list_first_entry (&rbuf->freelist, rbuf_list_t, list); +        return rbuf; + + dealloc_rlist: +        rbuf_purge_rlist (rbuf); +        LOCK_DESTROY (&rbuf->lock); +        GF_FREE (rbuf); + error_return: +        return NULL; +} + +void +rbuf_dtor (rbuf_t *rbuf) +{ +        if (!rbuf) +                return; +        rbuf->current = NULL; +        rbuf_purge_rlist (rbuf); +        LOCK_DESTROY (&rbuf->lock); + +        GF_FREE (rbuf); +} + +static inline char * +rbuf_adjust_write_area (struct iovec *iov, size_t bytes) +{ +        char *wbuf = NULL; + +        wbuf = iov->iov_base + iov->iov_len; +        iov->iov_len += bytes; +        return wbuf; +} + +static inline char * +rbuf_alloc_write_area (rbuf_list_t *rlist, size_t bytes) +{ +        int           ret = 0; +        struct iovec *iov = NULL; + +        /* check for available space in _current_ IO buffer */ +        iov = &rlist->rvec->iov; +        if (iov->iov_len + bytes <= ROT_BUFF_ALLOC_SIZE) +                return rbuf_adjust_write_area (iov, bytes); /* fast path */ + +        /* not enough bytes, try next available buffers */ +        if (list_is_last (&rlist->rvec->list, &rlist->veclist)) { +                /* OH! consumed all vector buffers */ +                GF_ASSERT (rlist->used == rlist->total); +                ret = rlist_add_new_vec (rlist); +                if (ret) +                        goto error_return; +        } else { +                /* not the end, have available rbuf_iovec's */ +                rlist->rvec = list_next_entry (rlist->rvec, list); +                rlist->used++; +                rbuf_reset_rvec (rlist->rvec); +        } + +        iov = &rlist->rvec->iov; +        return rbuf_adjust_write_area (iov, bytes); + + error_return: +        return NULL; +} + +char * +rbuf_reserve_write_area (rbuf_t *rbuf, size_t bytes, void **opaque) +{ +        char        *wbuf  = NULL; +        rbuf_list_t *rlist = NULL; + +        if (!rbuf || (bytes <= 0) || (bytes > ROT_BUFF_ALLOC_SIZE) || !opaque) +                return NULL; + +        LOCK (&rbuf->lock); +        { +                rlist = rbuf_current_buffer (rbuf); +                wbuf = rbuf_alloc_write_area (rlist, bytes); +                if (!wbuf) +                        goto unblock; +                rlist->pending++; +        } + unblock: +        UNLOCK (&rbuf->lock); + +        if (wbuf) +                *opaque = rlist; +        return wbuf; +} + +static inline void +rbuf_notify_waiter (rbuf_list_t *rlist) +{ +        pthread_mutex_lock (&rlist->b_lock); +        { +                pthread_cond_signal (&rlist->b_cond); +        } +        pthread_mutex_unlock (&rlist->b_lock); +} + +int +rbuf_write_complete (void *opaque) +{ +        rbuf_list_t *rlist = NULL; +        gf_boolean_t notify = _gf_false; + +        if (!opaque) +                return -1; + +        rlist = opaque; + +        LOCK (&rlist->c_lock); +        { +                rlist->completed++; +                /** +                 * it's safe to test ->pending without rbuf->lock *only* if +                 * there's a waiter as there can be no new incoming writes. +                 */ +                if (__rlist_has_waiter (rlist) +                                  && (rlist->completed == rlist->pending)) +                        notify = _gf_true; +        } +        UNLOCK (&rlist->c_lock); + +        if (notify) +                rbuf_notify_waiter (rlist); + +        return 0; +} + +int +rbuf_get_buffer (rbuf_t *rbuf, +                 void **opaque, sequence_fn *seqfn, void *mydata) +{ +        int retval = RBUF_CONSUMABLE; +        rbuf_list_t *rlist = NULL; + +        if (!rbuf || !opaque) +                return -1; + +        LOCK (&rbuf->lock); +        { +                rlist = rbuf_current_buffer (rbuf); +                if (!rlist->pending) { +                        retval = RBUF_EMPTY; +                        goto unblock; +                } + +                if (list_is_singular (&rbuf->freelist)) { +                        /** +                         * removal would lead to writer starvation, disallow +                         * switching. +                         */ +                        retval = RBUF_WOULD_STARVE; +                        goto unblock; +                } + +                list_del_init (&rlist->list); +                if (seqfn) +                        seqfn (rlist, mydata); +                rbuf->current = +                        list_first_entry (&rbuf->freelist, rbuf_list_t, list); +        } + unblock: +        UNLOCK (&rbuf->lock); + +        if (retval == RBUF_CONSUMABLE) +                *opaque = rlist; /* caller _owns_ the buffer */ + +        return retval; +} + +/** + * Wait for completion of pending writers and invoke dispatcher + * routine (for buffer consumption). + */ + +static inline void +__rbuf_wait_for_writers (rbuf_list_t *rlist) +{ +        while (rlist->completed != rlist->pending) +                pthread_cond_wait (&rlist->b_cond, &rlist->b_lock); +} + +#ifndef M_E +#define M_E 2.7 +#endif + +static inline void +rlist_shrink_vector (rbuf_list_t *rlist) +{ +        unsigned long long shrink = 0; + +        /** +         * fast path: don't bother to deallocate if vectors are hardly +         * used. +         */ +        if (rvec_in_watermark_range (rlist)) +                return; + +        /** +         * Calculate the shrink count based on total allocated vectors. +         * Note that the calculation sticks to rlist->total irrespective +         * of the actual usage count (rlist->used). Later, ->used could +         * be used to apply slack to the calculation based on how much +         * it lags from ->total. For now, let's stick to slow decay. +         */ +        shrink = rlist->total - (rlist->total * pow (M_E, -0.2)); + +        rlist_shrink_rvec (rlist, shrink); +        rlist->total -= shrink; +} + +int +rbuf_wait_for_completion (rbuf_t *rbuf, void *opaque, +                          void (*fn)(rbuf_list_t *, void *), void *arg) +{ +        rbuf_list_t *rlist = NULL; + +        if (!rbuf || !opaque) +                return -1; + +        rlist = opaque; + +        pthread_mutex_lock (&rlist->b_lock); +        { +                rlist_mark_waiting (rlist); +                __rbuf_wait_for_writers (rlist); +        } +        pthread_mutex_unlock (&rlist->b_lock); + +        /** +         * from here on, no need of locking until the rlist is put +         * back into rotation. +         */ + +        fn (rlist, arg); /* invoke dispatcher */ + +        rlist->awaiting = _gf_false; +        rlist->pending = rlist->completed = 0; + +        rlist_shrink_vector (rlist); +        rlist_reset_vector_usage (rlist); + +        rlist->rvec = list_first_entry (&rlist->veclist, rbuf_iovec_t, list); +        rbuf_reset_rvec (rlist->rvec); + +        LOCK (&rbuf->lock); +        { +                list_add_tail (&rlist->list, &rbuf->freelist); +        } +        UNLOCK (&rbuf->lock); + +        return 0; +} diff --git a/libglusterfs/src/rot-buffs.h b/libglusterfs/src/rot-buffs.h new file mode 100644 index 00000000000..aac24a4f571 --- /dev/null +++ b/libglusterfs/src/rot-buffs.h @@ -0,0 +1,121 @@ +/* +  Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com> +  This file is part of GlusterFS. + +  This file is licensed to you under your choice of the GNU Lesser +  General Public License, version 3 or any later version (LGPLv3 or +  later), or the GNU General Public License, version 2 (GPLv2), in all +  cases as published by the Free Software Foundation. +*/ + +#ifndef __ROT_BUFFS_H +#define __ROT_BUFFS_H + +#include "list.h" +#include "locking.h" +#include "common-utils.h" + +typedef struct rbuf_iovec { +        struct iovec iov; + +        struct list_head list; +} rbuf_iovec_t; + +#define RBUF_IOVEC_SIZE  (sizeof (rbuf_iovec_t)) + +typedef struct rbuf_list { +        gf_lock_t c_lock; + +        pthread_mutex_t b_lock;          /* protects this structure */ +        pthread_cond_t b_cond;           /* signal for writer completion */ + +        gf_boolean_t awaiting; + +        unsigned long long pending;      /* pending writers */ +        unsigned long long completed;    /* completed writers */ + +        rbuf_iovec_t *rvec;              /* currently used IO vector */ + +        struct list_head veclist;        /* list of attached rbuf_iov */ + +        unsigned long long used;         /* consumable entries +                                             attached in ->veclist */ +        unsigned long long total;        /* total entries in ->veclist (used +                                            during deallocation) */ + +        unsigned long seq[2];            /* if interested, this whould store +                                            the start sequence number and the +                                            range */ + +        struct list_head list;           /* attachment to rbuf_t */ +} rbuf_list_t; + +struct rlist_iter { +        struct list_head veclist; + +        unsigned long long iter; +}; + +#define RLIST_ENTRY_COUNT(rlist)  rlist->used + +#define rlist_iter_init(riter, rlist)              \ +        do {                                       \ +                (riter)->iter = rlist->used;       \ +                (riter)->veclist = rlist->veclist; \ +        } while (0) + +#define rvec_for_each_entry(pos, riter)                                 \ +        for (pos = list_entry                                           \ +                     ((riter)->veclist.next, typeof(*pos), list);       \ +             (riter)->iter > 0;                                         \ +             pos = list_entry                                           \ +                     (pos->list.next, typeof(*pos), list),              \ +                     --((riter)->iter)) + +/** + * Sequence number assigment routine is called during buffer + * switch under rbuff ->lock. + */ +typedef void (sequence_fn) (rbuf_list_t *, void *); + +#define RLIST_STORE_SEQ(rlist, start, range)    \ +        do {                                    \ +                rlist->seq[0] = start;          \ +                rlist->seq[1] = range;          \ +        } while (0) + +#define RLIST_GET_SEQ(rlist, start, range)      \ +        do {                                    \ +                start = rlist->seq[0];          \ +                range = rlist->seq[1];          \ +        } while (0) + +typedef struct rbuf { +        gf_lock_t lock;        /* protects "current" rlist */ + +        rbuf_list_t *current;  /* cached pointer to first free rlist */ + +        struct list_head freelist; +} rbuf_t; + +typedef enum { +        RBUF_CONSUMABLE = 1, +        RBUF_BUSY, +        RBUF_EMPTY, +        RBUF_WOULD_STARVE, +} rlist_retval_t; + +/* Initialization/Destruction */ +rbuf_t *rbuf_init (int); +void rbuf_dtor (rbuf_t *); + +/* Producer API */ +char *rbuf_reserve_write_area (rbuf_t *, size_t, void **); +int rbuf_write_complete (void *); + +/* Consumer API */ +int rbuf_get_buffer (rbuf_t *, void **, sequence_fn *, void *); +int rbuf_wait_for_completion (rbuf_t *, void *, +                              void (*)(rbuf_list_t *, void *), void *); + +#endif  | 
