summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/src/changelog-helpers.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/src/changelog-helpers.c')
-rw-r--r--xlators/features/changelog/src/changelog-helpers.c210
1 files changed, 171 insertions, 39 deletions
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c
index 3af2938190d..5c755d76d69 100644
--- a/xlators/features/changelog/src/changelog-helpers.c
+++ b/xlators/features/changelog/src/changelog-helpers.c
@@ -24,6 +24,7 @@
#include "changelog-mem-types.h"
#include "changelog-encoders.h"
+#include "changelog-rpc-common.h"
#include <pthread.h>
static inline void
@@ -57,7 +58,7 @@ changelog_cleanup_free_mutex (void *arg_mutex)
pthread_mutex_unlock(p_mutex);
}
-void
+int
changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)
{
int ret = 0;
@@ -65,7 +66,7 @@ changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)
/* send a cancel request to the thread */
ret = pthread_cancel (thr_id);
- if (ret) {
+ if (ret != 0) {
gf_log (this->name, GF_LOG_ERROR,
"could not cancel thread (reason: %s)",
strerror (errno));
@@ -73,14 +74,14 @@ changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)
}
ret = pthread_join (thr_id, &retval);
- if (ret || (retval != PTHREAD_CANCELED)) {
+ if ((ret != 0) || (retval != PTHREAD_CANCELED)) {
gf_log (this->name, GF_LOG_ERROR,
"cancel request not adhered as expected"
" (reason: %s)", strerror (errno));
}
out:
- return;
+ return ret;
}
inline void *
@@ -98,6 +99,145 @@ changelog_get_usable_buffer (changelog_local_t *local)
return cld->cld_iobuf->ptr;
}
+static inline int
+changelog_selector_index (unsigned int selector)
+{
+ return (ffs (selector) - 1);
+}
+
+inline int
+changelog_ev_selected (xlator_t *this,
+ changelog_ev_selector_t *selection,
+ unsigned int selector)
+{
+ int idx = 0;
+
+ idx = changelog_selector_index (selector);
+ gf_log (this->name, GF_LOG_DEBUG,
+ "selector ref count for %d (idx: %d): %d",
+ selector, idx, selection->ref[idx]);
+ /* this can be lockless */
+ return (idx < CHANGELOG_EV_SELECTION_RANGE
+ && (selection->ref[idx] > 0));
+}
+
+inline void
+changelog_select_event (xlator_t *this,
+ changelog_ev_selector_t *selection,
+ unsigned int selector)
+{
+ int idx = 0;
+
+ LOCK (&selection->reflock);
+ {
+ while (selector) {
+ idx = changelog_selector_index (selector);
+ if (idx < CHANGELOG_EV_SELECTION_RANGE) {
+ selection->ref[idx]++;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "selecting event %d", idx);
+ }
+ selector &= ~(1 << idx);
+ }
+ }
+ UNLOCK (&selection->reflock);
+}
+
+inline void
+changelog_deselect_event (xlator_t *this,
+ changelog_ev_selector_t *selection,
+ unsigned int selector)
+{
+ int idx = 0;
+
+ LOCK (&selection->reflock);
+ {
+ while (selector) {
+ idx = changelog_selector_index (selector);
+ if (idx < CHANGELOG_EV_SELECTION_RANGE) {
+ selection->ref[idx]--;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "de-selecting event %d", idx);
+ }
+ selector &= ~(1 << idx);
+ }
+ }
+ UNLOCK (&selection->reflock);
+}
+
+inline int
+changelog_init_event_selection (xlator_t *this,
+ changelog_ev_selector_t *selection)
+{
+ int ret = 0;
+ int j = CHANGELOG_EV_SELECTION_RANGE;
+
+ ret = LOCK_INIT (&selection->reflock);
+ if (ret != 0)
+ return -1;
+
+ LOCK (&selection->reflock);
+ {
+ while (j--) {
+ selection->ref[j] = 0;
+ }
+ }
+ UNLOCK (&selection->reflock);
+
+ return 0;
+}
+
+inline int
+changelog_cleanup_event_selection (xlator_t *this,
+ changelog_ev_selector_t *selection)
+{
+ int ret = 0;
+ int j = CHANGELOG_EV_SELECTION_RANGE;
+
+ LOCK (&selection->reflock);
+ {
+ while (j--) {
+ if (selection->ref[j] > 0)
+ gf_log (this->name, GF_LOG_WARNING,
+ "changelog event selection cleaning up "
+ " on active references");
+ }
+ }
+ UNLOCK (&selection->reflock);
+
+ return LOCK_DESTROY (&selection->reflock);
+}
+
+static inline void
+changelog_perform_dispatch (xlator_t *this,
+ changelog_priv_t *priv, void *mem, size_t size)
+{
+ char *buf = NULL;
+ void *opaque = NULL;
+
+ buf = rbuf_reserve_write_area (priv->rbuf, size, &opaque);
+ if (!buf) {
+ gf_log_callingfn (this->name,
+ GF_LOG_WARNING, "failed to dispatch event");
+ return;
+ }
+
+ memcpy (buf, mem, size);
+ rbuf_write_complete (opaque);
+}
+
+inline void
+changelog_dispatch_event (xlator_t *this,
+ changelog_priv_t *priv, changelog_event_t *ev)
+{
+ changelog_ev_selector_t *selection = NULL;
+
+ selection = &priv->ev_selection;
+ if (changelog_ev_selected (this, selection, ev->ev_type)) {
+ changelog_perform_dispatch (this, priv, ev, CHANGELOG_EV_SIZE);
+ }
+}
+
inline void
changelog_set_usable_record_and_length (changelog_local_t *local,
size_t len, int xr)
@@ -206,9 +346,9 @@ changelog_rollover_changelog (xlator_t *this,
{
int ret = -1;
int notify = 0;
- char *bname = NULL;
char ofile[PATH_MAX] = {0,};
char nfile[PATH_MAX] = {0,};
+ changelog_event_t ev = {0,};
if (priv->changelog_fd != -1) {
ret = fsync (priv->changelog_fd);
@@ -252,40 +392,32 @@ changelog_rollover_changelog (xlator_t *this,
}
if (notify) {
- bname = basename (nfile);
- gf_log (this->name, GF_LOG_DEBUG, "notifying: %s", bname);
- ret = changelog_write (priv->wfd, bname, strlen (bname) + 1);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "Failed to send file name to notify thread"
- " (reason: %s)", strerror (errno));
- } else {
- /* If this is explicit rollover initiated by snapshot,
- * wakeup reconfigure thread waiting for changelog to
- * rollover
- */
- if (priv->explicit_rollover) {
- priv->explicit_rollover = _gf_false;
- ret = pthread_mutex_lock (
- &priv->bn.bnotify_mutex);
- CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
- {
- priv->bn.bnotify = _gf_false;
- ret = pthread_cond_signal (
- &priv->bn.bnotify_cond);
- CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret,
- out);
- gf_log (this->name, GF_LOG_INFO,
- "Changelog published: %s and"
- " signalled bnotify", bname);
- }
- ret = pthread_mutex_unlock (
- &priv->bn.bnotify_mutex);
+ ev.ev_type = CHANGELOG_OP_TYPE_JOURNAL;
+ memcpy (ev.u.journal.path, nfile, strlen (nfile) + 1);
+ changelog_dispatch_event (this, priv, &ev);
+
+ /* If this is explicit rollover initiated by snapshot,
+ * wakeup reconfigure thread waiting for changelog to
+ * rollover
+ */
+ if (priv->explicit_rollover) {
+ priv->explicit_rollover = _gf_false;
+
+ ret = pthread_mutex_lock (&priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ {
+ priv->bn.bnotify = _gf_false;
+ ret = pthread_cond_signal
+ (&priv->bn.bnotify_cond);
CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ gf_log (this->name, GF_LOG_INFO,
+ "Changelog published: %s signalled"
+ " bnotify", nfile);
}
+ ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
}
}
-
out:
return ret;
}
@@ -434,8 +566,8 @@ changelog_snap_logging_stop (xlator_t *this,
}
int
-changelog_open (xlator_t *this,
- changelog_priv_t *priv)
+changelog_open_journal (xlator_t *this,
+ changelog_priv_t *priv)
{
int fd = 0;
int ret = -1;
@@ -490,7 +622,7 @@ changelog_start_next_change (xlator_t *this,
ret = changelog_rollover_changelog (this, priv, ts);
if (!ret && !finale)
- ret = changelog_open (this, priv);
+ ret = changelog_open_journal (this, priv);
return ret;
}
@@ -975,7 +1107,7 @@ __changelog_inode_ctx_set (xlator_t *this,
* one shot routine to get the address and the value of a inode version
* for a particular type.
*/
-static changelog_inode_ctx_t *
+changelog_inode_ctx_t *
__changelog_inode_ctx_get (xlator_t *this,
inode_t *inode, unsigned long **iver,
unsigned long *version, changelog_log_type type)