summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--xlators/features/changelog/src/changelog-helpers.c333
-rw-r--r--xlators/features/changelog/src/changelog-helpers.h133
-rw-r--r--xlators/features/changelog/src/changelog.c346
3 files changed, 800 insertions, 12 deletions
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c
index 91c43a16c86..c3661b9b76c 100644
--- a/xlators/features/changelog/src/changelog-helpers.c
+++ b/xlators/features/changelog/src/changelog-helpers.c
@@ -24,6 +24,15 @@
#include "changelog-encoders.h"
#include <pthread.h>
+/* Thread cancellation cleanup handler: releases the mutex handed in
+ * through @arg_mutex (a pthread_mutex_t *). A NULL argument is ignored.
+ */
+static void
+changelog_cleanup_free_mutex (void *arg_mutex)
+{
+        pthread_mutex_t *mutex = arg_mutex;
+
+        if (mutex == NULL)
+                return;
+
+        pthread_mutex_unlock (mutex);
+}
+
void
changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)
{
@@ -134,6 +143,12 @@ changelog_rollover_changelog (xlator_t *this,
char nfile[PATH_MAX] = {0,};
if (priv->changelog_fd != -1) {
+ ret = fsync (priv->changelog_fd);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "fsync failed (reason: %s)",
+ strerror (errno));
+ }
close (priv->changelog_fd);
priv->changelog_fd = -1;
}
@@ -166,9 +181,34 @@ changelog_rollover_changelog (xlator_t *this,
gf_log (this->name, GF_LOG_ERROR,
"Failed to send file name to notify thread"
" (reason: %s)", strerror (errno));
+ } else {
+ /* If this is explicit rollover initiated by snapshot,
+ * wakeup reconfigure thread waiting for changelog to
+ * rollover
+ */
+ if (priv->explicit_rollover) {
+ priv->explicit_rollover = _gf_false;
+ ret = pthread_mutex_lock (
+ &priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ {
+ priv->bn.bnotify = _gf_false;
+ ret = pthread_cond_signal (
+ &priv->bn.bnotify_cond);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret,
+ out);
+ gf_log (this->name, GF_LOG_INFO,
+ "Changelog published and"
+ " signalled bnotify");
+ }
+ ret = pthread_mutex_unlock (
+ &priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ }
}
}
+ out:
return ret;
}
@@ -313,9 +353,11 @@ changelog_local_init (xlator_t *this, inode_t *inode,
uuid_t gfid, int xtra_records,
gf_boolean_t update_flag)
{
+ changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
struct iobuf *iobuf = NULL;
+ priv = this->private;
/**
* We relax the presence of inode if @update_flag is true.
* The caller (implmentation of the fop) needs to be careful to
@@ -378,6 +420,80 @@ changelog_inject_single_event (xlator_t *this,
return priv->cd.dispatchfn (this, priv, priv->cd.cd_data, cld, NULL);
}
+/* Wait till all the black fops are drained.
+ *
+ * Sleeps on priv->dm.drain_black_cond, under priv->dm.drain_black_mutex,
+ * for as long as priv->dm.black_fop_cnt is non-zero. drain_wait_black is
+ * raised while waiting so that changelog_dec_fop_cnt knows a signal is
+ * wanted once the count reaches zero. pthread failures are only logged;
+ * nothing is reported back to the caller.
+ */
+void
+changelog_drain_black_fops (xlator_t *this, changelog_priv_t *priv)
+{
+        int ret = 0;
+
+        /* clean up framework of pthread_mutex is required here as
+         * 'reconfigure' terminates the changelog_rollover thread
+         * on graph change.
+         */
+        pthread_cleanup_push (changelog_cleanup_free_mutex,
+                              &priv->dm.drain_black_mutex);
+        ret = pthread_mutex_lock (&priv->dm.drain_black_mutex);
+        if (ret)
+                gf_log (this->name, GF_LOG_ERROR, "pthread error:"
+                        " Error:%d", ret);
+        while (priv->dm.black_fop_cnt > 0) {
+                /* black_fop_cnt is unsigned long, hence %lu */
+                gf_log (this->name, GF_LOG_DEBUG,
+                        "Condtional wait on black fops: %lu",
+                        priv->dm.black_fop_cnt);
+                priv->dm.drain_wait_black = _gf_true;
+                ret = pthread_cond_wait (&priv->dm.drain_black_cond,
+                                         &priv->dm.drain_black_mutex);
+                if (ret)
+                        gf_log (this->name, GF_LOG_ERROR, "pthread"
+                                " cond wait failed: Error:%d", ret);
+        }
+        priv->dm.drain_wait_black = _gf_false;
+        ret = pthread_mutex_unlock (&priv->dm.drain_black_mutex);
+        /* The mutex is released: drop the handler without running it,
+         * before anything that could act as a cancellation point.
+         */
+        pthread_cleanup_pop (0);
+        if (ret)
+                gf_log (this->name, GF_LOG_ERROR, "pthread error:"
+                        " Error:%d", ret);
+        gf_log (this->name, GF_LOG_DEBUG,
+                "Woke up: Conditional wait on black fops");
+}
+
+/* Wait till all the white fops are drained.
+ *
+ * Sleeps on priv->dm.drain_white_cond, under priv->dm.drain_white_mutex,
+ * for as long as priv->dm.white_fop_cnt is non-zero. drain_wait_white is
+ * raised while waiting so that changelog_dec_fop_cnt knows a signal is
+ * wanted once the count reaches zero. pthread failures are only logged;
+ * nothing is reported back to the caller.
+ */
+void
+changelog_drain_white_fops (xlator_t *this, changelog_priv_t *priv)
+{
+        int ret = 0;
+
+        /* clean up framework of pthread_mutex is required here as
+         * 'reconfigure' terminates the changelog_rollover thread
+         * on graph change.
+         */
+        pthread_cleanup_push (changelog_cleanup_free_mutex,
+                              &priv->dm.drain_white_mutex);
+        ret = pthread_mutex_lock (&priv->dm.drain_white_mutex);
+        if (ret)
+                gf_log (this->name, GF_LOG_ERROR, "pthread error:"
+                        " Error:%d", ret);
+        while (priv->dm.white_fop_cnt > 0) {
+                /* white_fop_cnt is unsigned long, hence %lu */
+                gf_log (this->name, GF_LOG_DEBUG,
+                        "Condtional wait on white fops : %lu",
+                        priv->dm.white_fop_cnt);
+                priv->dm.drain_wait_white = _gf_true;
+                ret = pthread_cond_wait (&priv->dm.drain_white_cond,
+                                         &priv->dm.drain_white_mutex);
+                if (ret)
+                        gf_log (this->name, GF_LOG_ERROR, "pthread"
+                                " cond wait failed: Error:%d", ret);
+        }
+        priv->dm.drain_wait_white = _gf_false;
+        ret = pthread_mutex_unlock (&priv->dm.drain_white_mutex);
+        /* Drop the handler immediately after the unlock (same order as
+         * changelog_drain_black_fops): if the thread were cancelled
+         * inside the logging below while the handler was still armed,
+         * it would unlock a mutex this thread no longer holds.
+         */
+        pthread_cleanup_pop (0);
+        if (ret)
+                gf_log (this->name, GF_LOG_ERROR, "pthread error:"
+                        " Error:%d", ret);
+        gf_log (this->name, GF_LOG_DEBUG,
+                "Woke up: Conditional wait on white fops");
+}
+
/**
* TODO: these threads have many thing in common (wake up after
* a certain time etc..). move them into separate routine.
@@ -385,12 +501,17 @@ changelog_inject_single_event (xlator_t *this,
void *
changelog_rollover (void *data)
{
- int ret = 0;
- xlator_t *this = NULL;
- struct timeval tv = {0,};
- changelog_log_data_t cld = {0,};
- changelog_time_slice_t *slice = NULL;
- changelog_priv_t *priv = data;
+ int ret = 0;
+ xlator_t *this = NULL;
+ struct timeval tv = {0,};
+ changelog_log_data_t cld = {0,};
+ changelog_time_slice_t *slice = NULL;
+ changelog_priv_t *priv = data;
+ int max_fd = 0;
+ char buf[1] = {0};
+ int len = 0;
+
+ fd_set rset;
this = priv->cr.this;
slice = &priv->slice;
@@ -398,10 +519,62 @@ changelog_rollover (void *data)
while (1) {
tv.tv_sec = priv->rollover_time;
tv.tv_usec = 0;
-
- ret = select (0, NULL, NULL, NULL, &tv);
- if (ret)
+ FD_ZERO(&rset);
+ FD_SET(priv->cr.rfd, &rset);
+ max_fd = priv->cr.rfd;
+ max_fd = max_fd + 1;
+
+ /* It seems there is a race between actual rollover and explicit
+ * rollover. But it is handled. If actual rollover is being
+ * done and the explicit rollover event comes, the event is
+ * not missed. The next select will immediately wakeup to
+ * handle explicit wakeup.
+ */
+
+ ret = select (max_fd, &rset, NULL, NULL, &tv);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "select failed: %s", strerror(errno));
continue;
+ } else if (ret && FD_ISSET(priv->cr.rfd, &rset)) {
+ gf_log (this->name, GF_LOG_INFO,
+ "Explicit wakeup of select on barrier notify");
+ len = read(priv->cr.rfd, buf, 1);
+ if (len == 0) {
+ gf_log (this->name, GF_LOG_ERROR, "BUG: Got EOF"
+ " from reconfigure notification pipe");
+ continue;
+ }
+ if (len < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to read wakeup data");
+ continue;
+ }
+ /* Lock is not required as same thread is modifying.*/
+ priv->explicit_rollover = _gf_true;
+ } else {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "select wokeup on timeout");
+ }
+
+ /* Reading current_color without lock is fine here
+ * as it is only modified here and is next to reading.
+ */
+ if (priv->current_color == FOP_COLOR_BLACK) {
+ LOCK(&priv->lock);
+ priv->current_color = FOP_COLOR_WHITE;
+ UNLOCK(&priv->lock);
+ gf_log (this->name, GF_LOG_DEBUG, "Black fops"
+ " to be drained:%ld",priv->dm.black_fop_cnt);
+ changelog_drain_black_fops (this, priv);
+ } else {
+ LOCK(&priv->lock);
+ priv->current_color = FOP_COLOR_BLACK;
+ UNLOCK(&priv->lock);
+ gf_log (this->name, GF_LOG_DEBUG, "White fops"
+ " to be drained:%ld",priv->dm.white_fop_cnt);
+ changelog_drain_white_fops (this, priv);
+ }
ret = changelog_fill_rollover_data (&cld, _gf_false);
if (ret) {
@@ -694,3 +867,145 @@ changelog_update (xlator_t *this, changelog_priv_t *priv,
return;
}
+
+/* Begin: Geo-rep snapshot dependency changes */
+
+/* changelog_color_fop_and_inc_cnt: Assign color and inc fop cnt.
+ *
+ * Assigning the color and incrementing the corresponding fop count must
+ * happen under one lock (i.e., there should be no window between them).
+ * Otherwise a fop that is already colored but not yet counted could be
+ * missed by a drain: if black fops are draining and the black count
+ * reaches zero, draining is declared complete even though a fop colored
+ * black has not incremented the count yet.
+ *
+ * Does nothing when @priv or @local is NULL.
+ *
+ * Deliberately not 'inline': the function is called from other source
+ * files, and under C99/C11 inline semantics an 'inline' definition
+ * without 'extern' does not emit the external symbol they link against.
+ */
+
+void
+changelog_color_fop_and_inc_cnt (xlator_t *this, changelog_priv_t *priv,
+                                 changelog_local_t *local)
+{
+        if (!priv || !local)
+                return;
+
+        LOCK (&priv->lock);
+        {
+                local->color = priv->current_color;
+                changelog_inc_fop_cnt (this, priv, local);
+        }
+        UNLOCK (&priv->lock);
+}
+
+/* Increments the respective fop counter based on the fop color.
+ *
+ * local->color selects the counter: FOP_COLOR_BLACK bumps
+ * priv->dm.black_fop_cnt under drain_black_mutex, anything else bumps
+ * priv->dm.white_fop_cnt under drain_white_mutex. A NULL @local is a
+ * no-op. A failing lock/unlock is logged by
+ * CHANGELOG_PTHREAD_ERROR_HANDLE_0 and ends the function early.
+ *
+ * Deliberately not 'inline': under C99/C11 inline semantics an 'inline'
+ * definition without 'extern' does not emit an external symbol for
+ * callers in other source files.
+ */
+void
+changelog_inc_fop_cnt (xlator_t *this, changelog_priv_t *priv,
+                       changelog_local_t *local)
+{
+        int ret = 0;
+
+        if (local) {
+                if (local->color == FOP_COLOR_BLACK) {
+                        ret = pthread_mutex_lock (&priv->dm.drain_black_mutex);
+                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+                        {
+                                priv->dm.black_fop_cnt++;
+                        }
+                        ret = pthread_mutex_unlock(&priv->dm.drain_black_mutex);
+                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+                } else {
+                        ret = pthread_mutex_lock (&priv->dm.drain_white_mutex);
+                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+                        {
+                                priv->dm.white_fop_cnt++;
+                        }
+                        ret = pthread_mutex_unlock(&priv->dm.drain_white_mutex);
+                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+                }
+        }
+ out:
+        return;
+}
+
+/* Decrements the respective fop counter based on the fop color.
+ *
+ * local->color selects the counter (black or white) and its mutex. When
+ * the counter reaches zero while the matching drain_wait_* flag is set,
+ * the matching drain condition variable is signalled so that the thread
+ * blocked in changelog_drain_{black,white}_fops can proceed. A NULL
+ * @local is a no-op.
+ *
+ * A pthread_cond_signal failure is logged but does not skip the unlock:
+ * leaving with the drain mutex held would block every later fop of that
+ * color as well as the drain itself.
+ *
+ * Deliberately not 'inline': under C99/C11 inline semantics an 'inline'
+ * definition without 'extern' does not emit an external symbol for
+ * callers in other source files.
+ */
+void
+changelog_dec_fop_cnt (xlator_t *this, changelog_priv_t *priv,
+                       changelog_local_t *local)
+{
+        int ret = 0;
+
+        if (local) {
+                if (local->color == FOP_COLOR_BLACK) {
+                        ret = pthread_mutex_lock (&priv->dm.drain_black_mutex);
+                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+                        {
+                                priv->dm.black_fop_cnt--;
+                                if (priv->dm.black_fop_cnt == 0 &&
+                                    priv->dm.drain_wait_black == _gf_true) {
+                                        ret = pthread_cond_signal (
+                                                &priv->dm.drain_black_cond);
+                                        if (ret)
+                                                gf_log (this->name,
+                                                        GF_LOG_ERROR,
+                                                        "pthread error: Error: %d",
+                                                        ret);
+                                        else
+                                                gf_log (this->name,
+                                                        GF_LOG_DEBUG,
+                                                        "Signalled draining of black");
+                                }
+                        }
+                        ret = pthread_mutex_unlock(&priv->dm.drain_black_mutex);
+                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+                } else {
+                        ret = pthread_mutex_lock (&priv->dm.drain_white_mutex);
+                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+                        {
+                                priv->dm.white_fop_cnt--;
+                                if (priv->dm.white_fop_cnt == 0 &&
+                                    priv->dm.drain_wait_white == _gf_true) {
+                                        ret = pthread_cond_signal (
+                                                &priv->dm.drain_white_cond);
+                                        if (ret)
+                                                gf_log (this->name,
+                                                        GF_LOG_ERROR,
+                                                        "pthread error: Error: %d",
+                                                        ret);
+                                        else
+                                                gf_log (this->name,
+                                                        GF_LOG_DEBUG,
+                                                        "Signalled draining of white");
+                                }
+                        }
+                        ret = pthread_mutex_unlock(&priv->dm.drain_white_mutex);
+                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+                }
+        }
+ out:
+        return;
+}
+
+/* Write to a pipe setup between changelog main thread and changelog
+ * rollover thread to initiate explicit rollover of changelog journal.
+ *
+ * Exactly one byte of @buf is written to priv->cr_wfd while holding
+ * priv->lock. Returns whatever changelog_write() returns.
+ *
+ * Deliberately not 'inline': under C99/C11 inline semantics an 'inline'
+ * definition without 'extern' does not emit an external symbol for
+ * callers in other source files.
+ */
+int
+changelog_barrier_notify (changelog_priv_t *priv, char *buf)
+{
+        int ret = 0;
+
+        LOCK(&priv->lock);
+        ret = changelog_write (priv->cr_wfd, buf, 1);
+        UNLOCK(&priv->lock);
+        return ret;
+}
+
+/* Clean up flags set on barrier notification.
+ *
+ * Clears priv->bflags.barrier_ext under bflags.lock, then clears
+ * priv->bn.bnotify under bn.bnotify_mutex. A failing mutex lock/unlock
+ * is logged by CHANGELOG_PTHREAD_ERROR_HANDLE_0 and ends the function.
+ *
+ * Deliberately not 'inline': under C99/C11 inline semantics an 'inline'
+ * definition without 'extern' does not emit an external symbol for
+ * callers in other source files.
+ */
+/*TODO: Add changelog barrier stop code with changelog barrier patch*/
+void
+changelog_barrier_cleanup (xlator_t *this, changelog_priv_t *priv)
+{
+        int ret = 0;
+
+        LOCK (&priv->bflags.lock);
+        priv->bflags.barrier_ext = _gf_false;
+        UNLOCK (&priv->bflags.lock);
+
+        ret = pthread_mutex_lock (&priv->bn.bnotify_mutex);
+        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+        {
+                priv->bn.bnotify = _gf_false;
+        }
+        ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex);
+        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ out:
+        return;
+}
+/* End: Geo-Rep snapshot dependency changes */
diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h
index 53588f55efa..54577592c90 100644
--- a/xlators/features/changelog/src/changelog-helpers.h
+++ b/xlators/features/changelog/src/changelog-helpers.h
@@ -107,6 +107,9 @@ typedef struct changelog_rollover {
pthread_t rollover_th;
xlator_t *this;
+
+ /* read end of pipe used as event from barrier on snapshot */
+ int rfd;
} changelog_rollover_t;
typedef struct changelog_fsync {
@@ -139,6 +142,57 @@ typedef struct changelog_notify {
xlator_t *this;
} changelog_notify_t;
+/* Draining during changelog rollover (for geo-rep snapshot dependency):
+ * --------------------------------------------------------------------
+ * The introduction of draining of in-transit fops during changelog rollover
+ * (both explicit/timeout triggered) requires coloring of fops. Basically the
+ * implementation requires two counters, one counter which keeps the count of
+ * current intransit fops which should end up in current changelog and the other
+ * counter to keep track of incoming fops which should be drained as part of
+ * next changelog rollover event. The fops are colored w.r.t these counters.
+ * The fops that are to be drained as part of current changelog rollover is
+ * given one color and the fops which keep incoming during this and not
+ * necessarily should end up in current changelog and should be drained as part
+ * of next changelog rollover are given other color. The color switching
+ * continues with each changelog rollover. Two colors(black and white) are
+ * chosen here and initially black is chosen as the default.
+ */
+
+typedef enum chlog_fop_color {
+ FOP_COLOR_BLACK, /* first enumerator; the active color is documented as starting out black */
+ FOP_COLOR_WHITE /* the other color; changelog_rollover flips the active color between the two */
+}chlog_fop_color_t;
+
+/* Barrier notify variable.
+ *
+ * notify() sets bnotify to true on a "barrier on" request and then waits
+ * on bnotify_cond for as long as bnotify stays true. The rollover path
+ * resets bnotify to false and signals bnotify_cond once an explicit
+ * rollover has published the changelog; changelog_barrier_cleanup also
+ * resets it. All accesses shown take bnotify_mutex.
+ */
+typedef struct barrier_notify {
+ pthread_mutex_t bnotify_mutex;
+ pthread_cond_t bnotify_cond;
+ gf_boolean_t bnotify;
+}barrier_notify_t;
+
+/* Two separate mutex and conditional variable set is used
+ * to drain white and black fops.
+ *
+ * Each counter is modified only with its own mutex held
+ * (changelog_inc_fop_cnt / changelog_dec_fop_cnt); the drain functions
+ * wait on the matching condition variable until the counter is zero. */
+
+typedef struct drain_mgmt {
+ pthread_mutex_t drain_black_mutex;
+ pthread_cond_t drain_black_cond;
+ pthread_mutex_t drain_white_mutex;
+ pthread_cond_t drain_white_cond;
+ /* Represents black fops count in-transit */
+ unsigned long black_fop_cnt;
+ /* Represents white fops count in-transit */
+ unsigned long white_fop_cnt;
+ gf_boolean_t drain_wait_black; /* true while changelog_drain_black_fops is waiting; changelog_dec_fop_cnt signals only when set */
+ gf_boolean_t drain_wait_white; /* true while changelog_drain_white_fops is waiting; changelog_dec_fop_cnt signals only when set */
+}drain_mgmt_t;
+
+/* Internal and External barrier on/off indicating flags.
+ *
+ * Only the external flag exists at this point. barrier_ext is set by
+ * notify() when a "barrier on" request is accepted and cleared on
+ * "barrier off" and in changelog_barrier_cleanup; every access shown is
+ * made with 'lock' held.
+ */
+typedef struct barrier_flags {
+ gf_lock_t lock;
+ gf_boolean_t barrier_ext;
+}barrier_flags_t;
+
+
struct changelog_priv {
gf_boolean_t active;
@@ -191,6 +245,26 @@ struct changelog_priv {
/* encoder */
struct changelog_encoder *ce;
+
+ /* snapshot dependency changes */
+
+ /* Draining of fops*/
+ drain_mgmt_t dm;
+
+ /* Represents the active color. Initially by default black */
+ chlog_fop_color_t current_color;
+
+ /* write end of pipe to do explicit rollover on barrier during snap */
+ int cr_wfd;
+
+ /* flag to determine explicit rollover is triggered */
+ gf_boolean_t explicit_rollover;
+
+ /* barrier notification variable protected by mutex */
+ barrier_notify_t bn;
+
+ /* barrier on/off indicating flags */
+ barrier_flags_t bflags;
};
struct changelog_local {
@@ -206,6 +280,9 @@ struct changelog_local {
* but we call it as ->prev_entry... ha ha ha
*/
struct changelog_local *prev_entry;
+
+ /* snap dependency changes */
+ chlog_fop_color_t color;
};
typedef struct changelog_local changelog_local_t;
@@ -311,6 +388,25 @@ changelog_fsync_thread (void *data);
int
changelog_forget (xlator_t *this, inode_t *inode);
+/* Geo-Rep snapshot dependency changes
+ *
+ * These are ordinary external functions defined in changelog-helpers.c.
+ * They are declared without 'inline': an 'inline' prototype with no
+ * definition in the header gives the compiler nothing to inline, and
+ * under C99/C11 inline semantics it can leave the program without an
+ * external definition to link against.
+ */
+void
+changelog_color_fop_and_inc_cnt (xlator_t *this, changelog_priv_t *priv,
+                                 changelog_local_t *local);
+void
+changelog_inc_fop_cnt (xlator_t *this, changelog_priv_t *priv,
+                       changelog_local_t *local);
+void
+changelog_dec_fop_cnt (xlator_t *this, changelog_priv_t *priv,
+                       changelog_local_t *local);
+int
+changelog_barrier_notify (changelog_priv_t *priv, char *buf);
+void
+changelog_barrier_cleanup (xlator_t *this, changelog_priv_t *priv);
+void
+changelog_drain_white_fops (xlator_t *this, changelog_priv_t *priv);
+void
+changelog_drain_black_fops (xlator_t *this, changelog_priv_t *priv);
+
/* macros */
#define CHANGELOG_STACK_UNWIND(fop, frame, params ...) do { \
@@ -404,4 +500,41 @@ changelog_forget (xlator_t *this, inode_t *inode);
goto label; \
} while (0)
+/* Begin: Geo-Rep snapshot dependency changes */
+
+#define DICT_ERROR -1
+#define BARRIER_OFF 0
+#define BARRIER_ON 1
+#define DICT_DEFAULT 2
+
+/* If changelog is not active ((priv)->active is false): log a warning,
+ * set @ret to 0 and jump to @label. Otherwise do nothing.
+ * Requires a variable named 'this' (with a ->name member) in the
+ * caller's scope for logging.
+ */
+#define CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, label) do { \
+        if (!(priv)->active) { \
+                gf_log (this->name, GF_LOG_WARNING, \
+                        "Changelog is not active, return success"); \
+                (ret) = 0; \
+                goto label; \
+        } \
+        } while (0)
+
+/* Log pthread error and goto label */
+#define CHANGELOG_PTHREAD_ERROR_HANDLE_0(ret, label) do { \
+ if (ret) { \
+ gf_log (this->name, GF_LOG_ERROR, \
+ "pthread error: Error: %d", ret); \
+ ret = -1; \
+ goto label; \
+ } \
+ } while (0)
+
+/* Log pthread error, set flag and goto label */
+#define CHANGELOG_PTHREAD_ERROR_HANDLE_1(ret, label, flag) do { \
+ if (ret) { \
+ gf_log (this->name, GF_LOG_ERROR, \
+ "pthread error: Error: %d", ret); \
+ ret = -1; \
+ flag = _gf_true; \
+ goto label; \
+ } \
+ } while (0)
#endif /* _CHANGELOG_HELPERS_H */
+/* End: Geo-Rep snapshot dependency changes */
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c
index 3e40984f6de..0a491c5ac07 100644
--- a/xlators/features/changelog/src/changelog.c
+++ b/xlators/features/changelog/src/changelog.c
@@ -62,6 +62,7 @@ changelog_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (rmdir, frame, op_ret, op_errno,
preparent, postparent, xdata);
return 0;
@@ -94,6 +95,7 @@ changelog_rmdir (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_rmdir_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->rmdir,
loc, xflags, xdata);
@@ -118,6 +120,7 @@ changelog_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (unlink, frame, op_ret, op_errno,
preparent, postparent, xdata);
return 0;
@@ -150,6 +153,7 @@ changelog_unlink (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_unlink_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->unlink,
loc, xflags, xdata);
@@ -177,6 +181,7 @@ changelog_rename_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (rename, frame, op_ret, op_errno,
buf, preoldparent, postoldparent,
prenewparent, postnewparent, xdata);
@@ -216,6 +221,7 @@ changelog_rename (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 3);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_rename_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->rename,
oldloc, newloc, xdata);
@@ -242,6 +248,7 @@ changelog_link_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (link, frame, op_ret, op_errno,
inode, buf, preparent, postparent, xdata);
return 0;
@@ -276,6 +283,7 @@ changelog_link (call_frame_t *frame,
changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_link_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->link,
oldloc, newloc, xdata);
@@ -302,6 +310,7 @@ changelog_mkdir_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (mkdir, frame, op_ret, op_errno,
inode, buf, preparent, postparent, xdata);
return 0;
@@ -353,6 +362,7 @@ changelog_mkdir (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 5);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_mkdir_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->mkdir,
loc, mode, umask, xdata);
@@ -379,6 +389,7 @@ changelog_symlink_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (symlink, frame, op_ret, op_errno,
inode, buf, preparent, postparent, xdata);
return 0;
@@ -422,6 +433,7 @@ changelog_symlink (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_symlink_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->symlink,
linkname, loc, umask, xdata);
@@ -448,6 +460,7 @@ changelog_mknod_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (mknod, frame, op_ret, op_errno,
inode, buf, preparent, postparent, xdata);
return 0;
@@ -500,6 +513,7 @@ changelog_mknod (call_frame_t *frame,
changelog_set_usable_record_and_length (frame->local, xtra_len, 5);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_mknod_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->mknod,
loc, mode, dev, umask, xdata);
@@ -527,6 +541,7 @@ changelog_create_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (create, frame,
op_ret, op_errno, fd, inode,
buf, preparent, postparent, xdata);
@@ -583,6 +598,7 @@ changelog_create (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 5);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_create_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->create,
loc, flags, mode, umask, fd, xdata);
@@ -615,6 +631,7 @@ changelog_fsetattr_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (fsetattr, frame, op_ret, op_errno,
preop_stbuf, postop_stbuf, xdata);
@@ -649,6 +666,7 @@ changelog_fsetattr (call_frame_t *frame,
changelog_set_usable_record_and_length (frame->local, xtra_len, 1);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_fsetattr_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetattr,
fd, stbuf, valid, xdata);
@@ -674,6 +692,7 @@ changelog_setattr_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (setattr, frame, op_ret, op_errno,
preop_stbuf, postop_stbuf, xdata);
@@ -706,6 +725,7 @@ changelog_setattr (call_frame_t *frame,
changelog_set_usable_record_and_length (frame->local, xtra_len, 1);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_setattr_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->setattr,
loc, stbuf, valid, xdata);
@@ -730,6 +750,7 @@ changelog_fremovexattr_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (fremovexattr, frame, op_ret, op_errno, xdata);
return 0;
@@ -758,6 +779,7 @@ changelog_fremovexattr (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 1);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_fremovexattr_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->fremovexattr,
fd, name, xdata);
@@ -780,6 +802,7 @@ changelog_removexattr_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (removexattr, frame, op_ret, op_errno, xdata);
return 0;
@@ -808,6 +831,7 @@ changelog_removexattr (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 1);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_removexattr_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->removexattr,
loc, name, xdata);
@@ -832,6 +856,7 @@ changelog_setxattr_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (setxattr, frame, op_ret, op_errno, xdata);
return 0;
@@ -861,6 +886,7 @@ changelog_setxattr (call_frame_t *frame,
changelog_set_usable_record_and_length (frame->local, xtra_len, 1);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_setxattr_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->setxattr,
loc, dict, flags, xdata);
@@ -883,6 +909,7 @@ changelog_fsetxattr_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (fsetxattr, frame, op_ret, op_errno, xdata);
return 0;
@@ -912,6 +939,7 @@ changelog_fsetxattr (call_frame_t *frame,
changelog_set_usable_record_and_length (frame->local, xtra_len, 1);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_fsetxattr_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetxattr,
fd, dict, flags, xdata);
@@ -944,6 +972,7 @@ changelog_truncate_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_DATA);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (truncate, frame,
op_ret, op_errno, prebuf, postbuf, xdata);
return 0;
@@ -962,6 +991,7 @@ changelog_truncate (call_frame_t *frame,
loc->inode, loc->inode->gfid, 0);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_truncate_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->truncate,
loc, offset, xdata);
@@ -985,6 +1015,7 @@ changelog_ftruncate_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_DATA);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (ftruncate, frame,
op_ret, op_errno, prebuf, postbuf, xdata);
return 0;
@@ -1003,6 +1034,7 @@ changelog_ftruncate (call_frame_t *frame,
fd->inode, fd->inode->gfid, 0);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_ftruncate_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->ftruncate,
fd, offset, xdata);
@@ -1028,6 +1060,7 @@ changelog_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
changelog_update (this, priv, local, CHANGELOG_TYPE_DATA);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (writev, frame,
op_ret, op_errno, prebuf, postbuf, xdata);
return 0;
@@ -1048,6 +1081,7 @@ changelog_writev (call_frame_t *frame,
fd->inode, fd->inode->gfid, 0);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_writev_cbk, FIRST_CHILD (this),
FIRST_CHILD (this)->fops->writev, fd, vector,
count, offset, flags, iobref, xdata);
@@ -1089,9 +1123,16 @@ changelog_assign_encoding (changelog_priv_t *priv, char *enc)
static void
changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv)
{
+ int ret = 0;
+
if (priv->cr.rollover_th) {
changelog_thread_cleanup (this, priv->cr.rollover_th);
priv->cr.rollover_th = 0;
+ ret = close (priv->cr_wfd);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR,
+ "error closing write end of rollover pipe"
+ " (reason: %s)", strerror (errno));
}
if (priv->cf.fsync_th) {
@@ -1105,6 +1146,41 @@ static int
changelog_spawn_helper_threads (xlator_t *this, changelog_priv_t *priv)
{
int ret = 0;
+ int flags = 0;
+ int pipe_fd[2] = {0, 0};
+
+ /* Geo-Rep snapshot dependency:
+ *
+ * To implement explicit rollover of changelog journal on barrier
+ * notification, a pipe is created to communicate between
+ * 'changelog_rollover' thread and changelog main thread. The select
+ * call used to wait till roll-over time in changelog_rollover thread
+ * is modified to wait on read end of the pipe. When barrier
+ * notification comes (i.e, in 'reconfigure'), select in
+ * changelog_rollover thread is woken up explicitly by writing into
+ * the write end of the pipe in 'reconfigure'.
+ */
+
+ ret = pipe (pipe_fd);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Cannot create pipe (reason: %s)", strerror (errno));
+ goto out;
+ }
+
+ /* writer is non-blocking */
+ flags = fcntl (pipe_fd[1], F_GETFL);
+ flags |= O_NONBLOCK;
+
+ ret = fcntl (pipe_fd[1], F_SETFL, flags);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set O_NONBLOCK flag");
+ goto out;
+ }
+
+ priv->cr_wfd = pipe_fd[1];
+ priv->cr.rfd = pipe_fd[0];
priv->cr.this = this;
ret = gf_thread_create (&priv->cr.rollover_th,
@@ -1186,6 +1262,155 @@ changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv)
return ret;
}
+int
+notify (xlator_t *this, int event, void *data, ...)
+{ /* Translator notification entry point.
+   *
+   * For GF_EVENT_TRANSLATOR_OP, @data is treated as a dict_t and its
+   * "barrier" key selects the action:
+   *   BARRIER_ON  - mark the external barrier as set, raise bn.bnotify,
+   *                 write one byte to the rollover pipe and block until
+   *                 bn.bnotify has been cleared again;
+   *   BARRIER_OFF - clear the external barrier flag.
+   * Both barrier cases return 0 with a warning when changelog is not
+   * active. Any other event is handed to default_notify().
+   *
+   * Returns 0 on success (also when this->private is NULL), -1 on the
+   * error paths below, or default_notify()'s result for other events.
+   */
+ changelog_priv_t *priv = NULL;
+ dict_t *dict = NULL;
+ char buf[1] = {1}; /* the single byte handed to changelog_barrier_notify */
+ int barrier = DICT_DEFAULT;
+ gf_boolean_t bclean_req = _gf_false; /* set by CHANGELOG_PTHREAD_ERROR_HANDLE_1; requests changelog_barrier_cleanup at 'out' */
+ int ret = 0;
+
+ priv = this->private;
+ if (!priv)
+ goto out;
+
+ if (event == GF_EVENT_TRANSLATOR_OP) {
+
+ dict = data;
+ /*TODO: Also barrier option is persistent. Need to
+ * decide on the brick crash scenarios.
+ *
+ * DICT_DEFAULT is passed as the default value; presumably
+ * it comes back when the key is absent (see the
+ * DICT_DEFAULT case below) - confirm against
+ * dict_get_str_boolean.
+ */
+ barrier = dict_get_str_boolean (dict, "barrier", DICT_DEFAULT);
+
+ switch (barrier) {
+ case DICT_ERROR:
+ gf_log (this->name, GF_LOG_ERROR,
+ "Barrier dict_get_str_boolean failed");
+ ret = -1;
+ goto out;
+
+ case BARRIER_OFF:
+ gf_log (this->name, GF_LOG_INFO,
+ "Barrier off notification");
+
+ CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, out);
+
+ LOCK (&priv->bflags.lock);
+ {
+ if (priv->bflags.barrier_ext == _gf_false)
+ ret = -1; /* barrier is already off */
+ }
+ UNLOCK (&priv->bflags.lock);
+
+ if (ret == -1 ) {
+ gf_log (this->name, GF_LOG_ERROR, "Received"
+ " another barrier off notification"
+ " while already off");
+ goto out;
+ }
+
+ /*TODO: STOP CHANGELOG BARRIER
+ *
+ * NOTE(review): barrier_ext is checked above and cleared
+ * below in two separate critical sections; confirm that
+ * no other notification can slip in between.
+ */
+ LOCK (&priv->bflags.lock);
+ {
+ priv->bflags.barrier_ext = _gf_false;
+ }
+ UNLOCK (&priv->bflags.lock);
+
+ ret = 0;
+ goto out;
+
+ case BARRIER_ON:
+ gf_log (this->name, GF_LOG_INFO,
+ "Barrier on notification");
+
+ CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, out);
+
+ LOCK (&priv->bflags.lock);
+ {
+ if (priv->bflags.barrier_ext == _gf_true)
+ ret = -1; /* a barrier is already set */
+ else
+ priv->bflags.barrier_ext = _gf_true;
+ }
+ UNLOCK (&priv->bflags.lock);
+
+ if (ret == -1 ) {
+ gf_log (this->name, GF_LOG_ERROR, "Received"
+ " another barrier on notification when"
+ " last one is not served yet");
+ goto out;
+ }
+
+ ret = pthread_mutex_lock (&priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out,
+ bclean_req);
+ {
+ priv->bn.bnotify = _gf_true; /* raised before the wakeup byte is written below */
+ }
+ ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out,
+ bclean_req);
+
+ /*TODO: START CHANGELOG BARRIER */
+
+ ret = changelog_barrier_notify(priv, buf);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Explicit roll over: write failed");
+ changelog_barrier_cleanup (this, priv); /* bclean_req stays false, so cleanup runs only once */
+ ret = -1;
+ goto out;
+ }
+
+ ret = pthread_mutex_lock (&priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out,
+ bclean_req);
+ {
+ /* The while condition check is required here to
+ * handle spurious wakeup of cond wait that can
+ * happen with pthreads. See man page
+ *
+ * NOTE(review): when pthread_cond_wait fails, the macro
+ * jumps to 'out' from inside this locked region and
+ * changelog_barrier_cleanup then locks bnotify_mutex
+ * again - confirm the mutex state on that path. */
+ while (priv->bn.bnotify == _gf_true) {
+ ret = pthread_cond_wait (
+ &priv->bn.bnotify_cond,
+ &priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret,
+ out,
+ bclean_req);
+ }
+ }
+ ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out, bclean_req);
+ gf_log (this->name, GF_LOG_INFO,
+ "Woke up: bnotify conditional wait");
+
+ ret = 0;
+ goto out;
+
+ case DICT_DEFAULT:
+ gf_log (this->name, GF_LOG_ERROR,
+ "barrier key not found");
+ ret = -1;
+ goto out;
+
+ default:
+ gf_log (this->name, GF_LOG_ERROR,
+ "Something went bad in dict_get_str_boolean");
+ ret = -1;
+ goto out;
+ }
+ } else {
+ ret = default_notify (this, event, data);
+ }
+
+ out:
+ if (bclean_req) /* only true after a pthread failure on the barrier-on path */
+ changelog_barrier_cleanup (this, priv);
+
+ return ret;
+}
+
int32_t
mem_acct_init (xlator_t *this)
{
@@ -1261,6 +1486,98 @@ changelog_init (xlator_t *this, changelog_priv_t *priv)
return ret;
}
+/* Initialise every pthread mutex and condition variable that changelog
+ * keeps in @priv: the barrier-notify pair (priv->bn) and the black and
+ * white drain pairs (priv->dm).
+ *
+ * Returns 0 once all six objects are initialised. On the first failure
+ * the pthread error code is logged against this->name, whatever had been
+ * initialised up to that point is destroyed again, and -1 is returned.
+ */
+static int
+changelog_pthread_init (xlator_t *this, changelog_priv_t *priv)
+{
+        int rc   = 0;
+        int done = 0; /* number of objects initialised so far */
+
+        rc = pthread_mutex_init (&priv->bn.bnotify_mutex, NULL);
+        if (rc != 0) {
+                gf_log (this->name, GF_LOG_ERROR,
+                        "bnotify pthread_mutex_init failed (%d)", rc);
+                goto undo;
+        }
+        done++;
+
+        rc = pthread_cond_init (&priv->bn.bnotify_cond, NULL);
+        if (rc != 0) {
+                gf_log (this->name, GF_LOG_ERROR,
+                        "bnotify pthread_cond_init failed (%d)", rc);
+                goto undo;
+        }
+        done++;
+
+        rc = pthread_mutex_init (&priv->dm.drain_black_mutex, NULL);
+        if (rc != 0) {
+                gf_log (this->name, GF_LOG_ERROR,
+                        "drain_black pthread_mutex_init failed (%d)", rc);
+                goto undo;
+        }
+        done++;
+
+        rc = pthread_cond_init (&priv->dm.drain_black_cond, NULL);
+        if (rc != 0) {
+                gf_log (this->name, GF_LOG_ERROR,
+                        "drain_black pthread_cond_init failed (%d)", rc);
+                goto undo;
+        }
+        done++;
+
+        rc = pthread_mutex_init (&priv->dm.drain_white_mutex, NULL);
+        if (rc != 0) {
+                gf_log (this->name, GF_LOG_ERROR,
+                        "drain_white pthread_mutex_init failed (%d)", rc);
+                goto undo;
+        }
+        done++;
+
+        rc = pthread_cond_init (&priv->dm.drain_white_cond, NULL);
+        if (rc != 0) {
+                gf_log (this->name, GF_LOG_ERROR,
+                        "drain_white pthread_cond_init failed (%d)", rc);
+                goto undo;
+        }
+
+        return 0;
+
+ undo:
+        /* Release in the same order the objects were set up. The final
+         * object (drain_white_cond) never needs undoing: when its init
+         * fails it does not exist, and when it succeeds this path is
+         * not taken. */
+        if (done >= 1)
+                pthread_mutex_destroy (&priv->bn.bnotify_mutex);
+        if (done >= 2)
+                pthread_cond_destroy (&priv->bn.bnotify_cond);
+        if (done >= 3)
+                pthread_mutex_destroy (&priv->dm.drain_black_mutex);
+        if (done >= 4)
+                pthread_cond_destroy (&priv->dm.drain_black_cond);
+        if (done >= 5)
+                pthread_mutex_destroy (&priv->dm.drain_white_mutex);
+
+        return -1;
+}
+
+/* Tear down every synchronisation object changelog keeps in @priv: the
+ * barrier-notify and drain mutex/condition-variable pairs, plus the
+ * barrier flags lock. Nothing is checked beforehand, so every object is
+ * expected to have been initialised by the time this is called.
+ */
+static inline void
+changelog_pthread_destroy (changelog_priv_t *priv)
+{
+        /* barrier notification */
+        pthread_cond_destroy (&priv->bn.bnotify_cond);
+        pthread_mutex_destroy (&priv->bn.bnotify_mutex);
+
+        /* black fop drain */
+        pthread_cond_destroy (&priv->dm.drain_black_cond);
+        pthread_mutex_destroy (&priv->dm.drain_black_mutex);
+
+        /* white fop drain */
+        pthread_cond_destroy (&priv->dm.drain_white_cond);
+        pthread_mutex_destroy (&priv->dm.drain_white_mutex);
+
+        /* barrier flags */
+        LOCK_DESTROY (&priv->bflags.lock);
+}
+
int
reconfigure (xlator_t *this, dict_t *options)
{
@@ -1363,9 +1680,10 @@ reconfigure (xlator_t *this, dict_t *options)
int32_t
init (xlator_t *this)
{
- int ret = -1;
- char *tmp = NULL;
- changelog_priv_t *priv = NULL;
+ int ret = -1;
+ char *tmp = NULL;
+ changelog_priv_t *priv = NULL;
+ gf_boolean_t cond_lock_init = _gf_false;
GF_VALIDATE_OR_GOTO ("changelog", this, out);
@@ -1451,6 +1769,24 @@ init (xlator_t *this)
goto out;
priv->changelog_fd = -1;
+
+ /* snap dependency changes */
+ priv->dm.black_fop_cnt = 0;
+ priv->dm.white_fop_cnt = 0;
+ priv->dm.drain_wait_black = _gf_false;
+ priv->dm.drain_wait_white = _gf_false;
+ priv->current_color = FOP_COLOR_BLACK;
+ priv->explicit_rollover = _gf_false;
+ /* Mutex is not needed as threads are not spawned yet */
+ priv->bn.bnotify = _gf_false;
+ ret = changelog_pthread_init (this, priv);
+ if (ret)
+ goto out;
+
+ LOCK_INIT (&priv->bflags.lock);
+ cond_lock_init = _gf_true;
+ priv->bflags.barrier_ext = _gf_false;
+
ret = changelog_init (this, priv);
if (ret)
goto out;
@@ -1469,6 +1805,9 @@ init (xlator_t *this)
}
GF_FREE (priv->changelog_brick);
GF_FREE (priv->changelog_dir);
+ if (cond_lock_init)
+ changelog_pthread_destroy (priv);
+
GF_FREE (priv);
this->private = NULL;
} else
@@ -1493,6 +1832,7 @@ fini (xlator_t *this)
mem_pool_destroy (this->local_pool);
GF_FREE (priv->changelog_brick);
GF_FREE (priv->changelog_dir);
+ changelog_pthread_destroy (priv);
GF_FREE (priv);
}