diff options
Diffstat (limited to 'xlators/features/changelog/src/changelog-helpers.c')
-rw-r--r-- | xlators/features/changelog/src/changelog-helpers.c | 210 |
1 files changed, 171 insertions, 39 deletions
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c index 3af2938190d..5c755d76d69 100644 --- a/xlators/features/changelog/src/changelog-helpers.c +++ b/xlators/features/changelog/src/changelog-helpers.c @@ -24,6 +24,7 @@ #include "changelog-mem-types.h" #include "changelog-encoders.h" +#include "changelog-rpc-common.h" #include <pthread.h> static inline void @@ -57,7 +58,7 @@ changelog_cleanup_free_mutex (void *arg_mutex) pthread_mutex_unlock(p_mutex); } -void +int changelog_thread_cleanup (xlator_t *this, pthread_t thr_id) { int ret = 0; @@ -65,7 +66,7 @@ changelog_thread_cleanup (xlator_t *this, pthread_t thr_id) /* send a cancel request to the thread */ ret = pthread_cancel (thr_id); - if (ret) { + if (ret != 0) { gf_log (this->name, GF_LOG_ERROR, "could not cancel thread (reason: %s)", strerror (errno)); @@ -73,14 +74,14 @@ changelog_thread_cleanup (xlator_t *this, pthread_t thr_id) } ret = pthread_join (thr_id, &retval); - if (ret || (retval != PTHREAD_CANCELED)) { + if ((ret != 0) || (retval != PTHREAD_CANCELED)) { gf_log (this->name, GF_LOG_ERROR, "cancel request not adhered as expected" " (reason: %s)", strerror (errno)); } out: - return; + return ret; } inline void * @@ -98,6 +99,145 @@ changelog_get_usable_buffer (changelog_local_t *local) return cld->cld_iobuf->ptr; } +static inline int +changelog_selector_index (unsigned int selector) +{ + return (ffs (selector) - 1); +} + +inline int +changelog_ev_selected (xlator_t *this, + changelog_ev_selector_t *selection, + unsigned int selector) +{ + int idx = 0; + + idx = changelog_selector_index (selector); + gf_log (this->name, GF_LOG_DEBUG, + "selector ref count for %d (idx: %d): %d", + selector, idx, selection->ref[idx]); + /* this can be lockless */ + return (idx < CHANGELOG_EV_SELECTION_RANGE + && (selection->ref[idx] > 0)); +} + +inline void +changelog_select_event (xlator_t *this, + changelog_ev_selector_t *selection, + unsigned int selector) +{ + int idx = 0; + + LOCK (&selection->reflock); + { + while (selector) { + idx = changelog_selector_index (selector); + if (idx < CHANGELOG_EV_SELECTION_RANGE) { + selection->ref[idx]++; + gf_log (this->name, GF_LOG_DEBUG, + "selecting event %d", idx); + } + selector &= ~(1 << idx); + } + } + UNLOCK (&selection->reflock); +} + +inline void +changelog_deselect_event (xlator_t *this, + changelog_ev_selector_t *selection, + unsigned int selector) +{ + int idx = 0; + + LOCK (&selection->reflock); + { + while (selector) { + idx = changelog_selector_index (selector); + if (idx < CHANGELOG_EV_SELECTION_RANGE) { + selection->ref[idx]--; + gf_log (this->name, GF_LOG_DEBUG, + "de-selecting event %d", idx); + } + selector &= ~(1 << idx); + } + } + UNLOCK (&selection->reflock); +} + +inline int +changelog_init_event_selection (xlator_t *this, + changelog_ev_selector_t *selection) +{ + int ret = 0; + int j = CHANGELOG_EV_SELECTION_RANGE; + + ret = LOCK_INIT (&selection->reflock); + if (ret != 0) + return -1; + + LOCK (&selection->reflock); + { + while (j--) { + selection->ref[j] = 0; + } + } + UNLOCK (&selection->reflock); + + return 0; +} + +inline int +changelog_cleanup_event_selection (xlator_t *this, + changelog_ev_selector_t *selection) +{ + int ret = 0; + int j = CHANGELOG_EV_SELECTION_RANGE; + + LOCK (&selection->reflock); + { + while (j--) { + if (selection->ref[j] > 0) + gf_log (this->name, GF_LOG_WARNING, + "changelog event selection cleaning up " + " on active references"); + } + } + UNLOCK (&selection->reflock); + + return LOCK_DESTROY (&selection->reflock); +} + +static inline void +changelog_perform_dispatch (xlator_t *this, + changelog_priv_t *priv, void *mem, size_t size) +{ + char *buf = NULL; + void *opaque = NULL; + + buf = rbuf_reserve_write_area (priv->rbuf, size, &opaque); + if (!buf) { + gf_log_callingfn (this->name, + GF_LOG_WARNING, "failed to dispatch event"); + return; + } + + memcpy (buf, mem, size); + rbuf_write_complete (opaque); +} + +inline void +changelog_dispatch_event (xlator_t *this, + changelog_priv_t *priv, changelog_event_t *ev) +{ + changelog_ev_selector_t *selection = NULL; + + selection = &priv->ev_selection; + if (changelog_ev_selected (this, selection, ev->ev_type)) { + changelog_perform_dispatch (this, priv, ev, CHANGELOG_EV_SIZE); + } +} + inline void changelog_set_usable_record_and_length (changelog_local_t *local, size_t len, int xr) @@ -206,9 +346,9 @@ changelog_rollover_changelog (xlator_t *this, { int ret = -1; int notify = 0; - char *bname = NULL; char ofile[PATH_MAX] = {0,}; char nfile[PATH_MAX] = {0,}; + changelog_event_t ev = {0,}; if (priv->changelog_fd != -1) { ret = fsync (priv->changelog_fd); @@ -252,40 +392,32 @@ changelog_rollover_changelog (xlator_t *this, } if (notify) { - bname = basename (nfile); - gf_log (this->name, GF_LOG_DEBUG, "notifying: %s", bname); - ret = changelog_write (priv->wfd, bname, strlen (bname) + 1); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "Failed to send file name to notify thread" - " (reason: %s)", strerror (errno)); - } else { - /* If this is explicit rollover initiated by snapshot, - * wakeup reconfigure thread waiting for changelog to - * rollover - */ - if (priv->explicit_rollover) { - priv->explicit_rollover = _gf_false; - ret = pthread_mutex_lock ( - &priv->bn.bnotify_mutex); - CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); - { - priv->bn.bnotify = _gf_false; - ret = pthread_cond_signal ( - &priv->bn.bnotify_cond); - CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, - out); - gf_log (this->name, GF_LOG_INFO, - "Changelog published: %s and" - " signalled bnotify", bname); - } - ret = pthread_mutex_unlock ( - &priv->bn.bnotify_mutex); + ev.ev_type = CHANGELOG_OP_TYPE_JOURNAL; + memcpy (ev.u.journal.path, nfile, strlen (nfile) + 1); + changelog_dispatch_event (this, priv, &ev); + + /* If this is explicit rollover initiated by snapshot, + * wakeup reconfigure thread waiting for changelog to + * rollover + */ + if (priv->explicit_rollover) { + priv->explicit_rollover = _gf_false; + + ret = pthread_mutex_lock (&priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + { + priv->bn.bnotify = _gf_false; + ret = pthread_cond_signal + (&priv->bn.bnotify_cond); CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); + gf_log (this->name, GF_LOG_INFO, + "Changelog published: %s signalled" + " bnotify", nfile); } + ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex); + CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out); } } - out: return ret; } @@ -434,8 +566,8 @@ changelog_snap_logging_stop (xlator_t *this, } int -changelog_open (xlator_t *this, - changelog_priv_t *priv) +changelog_open_journal (xlator_t *this, + changelog_priv_t *priv) { int fd = 0; int ret = -1; @@ -490,7 +622,7 @@ changelog_start_next_change (xlator_t *this, ret = changelog_rollover_changelog (this, priv, ts); if (!ret && !finale) - ret = changelog_open (this, priv); + ret = changelog_open_journal (this, priv); return ret; } @@ -975,7 +1107,7 @@ __changelog_inode_ctx_set (xlator_t *this, * one shot routine to get the address and the value of a inode version * for a particular type. */ -static changelog_inode_ctx_t * +changelog_inode_ctx_t * __changelog_inode_ctx_get (xlator_t *this, inode_t *inode, unsigned long **iver, unsigned long *version, changelog_log_type type) |