summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/src/changelog.c
diff options
context:
space:
mode:
authorKotresh H R <khiremat@redhat.com>2014-03-10 20:03:55 +0530
committerVijay Bellur <vbellur@redhat.com>2014-05-01 08:44:56 -0700
commitc523a04a0bd3edce9cf8ed238b838ebd957f1066 (patch)
tree0b96d0993b56a0333b28de71b47598ac7c3eed07 /xlators/features/changelog/src/changelog.c
parent12f1fab930dc0f6f103bae03fab981409ed31b4e (diff)
feature/changelog: Draining of in-transit fops in changelog.
This is required for Geo-rep to work with snapshots. Following things are done in this patch. 1. Draining of in-transit fops during changelog rollover. 2. Explicit rollover of changelog when snapshot barrier notification comes. During this, intransit fops are drained and changelog is rolled over. For more details on the purpose of the patch. Please visit following link. http://www.gluster.org/community/documentation/index.php/Changelog_Design_changes_for_snapshot Change-Id: I22690131e19d3027f6d8957178bdc3431b9062f6 Signed-off-by: Kotresh H R <khiremat@redhat.com> Reviewed-on: http://review.gluster.org/7216 Reviewed-by: Venky Shankar <vshankar@redhat.com> Reviewed-by: Varun Shastry <vshastry@redhat.com> Tested-by: Gluster Build System <jenkins@build.gluster.com>
Diffstat (limited to 'xlators/features/changelog/src/changelog.c')
-rw-r--r--xlators/features/changelog/src/changelog.c346
1 files changed, 343 insertions, 3 deletions
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c
index 3e40984f6de..0a491c5ac07 100644
--- a/xlators/features/changelog/src/changelog.c
+++ b/xlators/features/changelog/src/changelog.c
@@ -62,6 +62,7 @@ changelog_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (rmdir, frame, op_ret, op_errno,
preparent, postparent, xdata);
return 0;
@@ -94,6 +95,7 @@ changelog_rmdir (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_rmdir_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->rmdir,
loc, xflags, xdata);
@@ -118,6 +120,7 @@ changelog_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (unlink, frame, op_ret, op_errno,
preparent, postparent, xdata);
return 0;
@@ -150,6 +153,7 @@ changelog_unlink (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_unlink_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->unlink,
loc, xflags, xdata);
@@ -177,6 +181,7 @@ changelog_rename_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (rename, frame, op_ret, op_errno,
buf, preoldparent, postoldparent,
prenewparent, postnewparent, xdata);
@@ -216,6 +221,7 @@ changelog_rename (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 3);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_rename_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->rename,
oldloc, newloc, xdata);
@@ -242,6 +248,7 @@ changelog_link_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (link, frame, op_ret, op_errno,
inode, buf, preparent, postparent, xdata);
return 0;
@@ -276,6 +283,7 @@ changelog_link (call_frame_t *frame,
changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_link_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->link,
oldloc, newloc, xdata);
@@ -302,6 +310,7 @@ changelog_mkdir_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (mkdir, frame, op_ret, op_errno,
inode, buf, preparent, postparent, xdata);
return 0;
@@ -353,6 +362,7 @@ changelog_mkdir (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 5);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_mkdir_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->mkdir,
loc, mode, umask, xdata);
@@ -379,6 +389,7 @@ changelog_symlink_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (symlink, frame, op_ret, op_errno,
inode, buf, preparent, postparent, xdata);
return 0;
@@ -422,6 +433,7 @@ changelog_symlink (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_symlink_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->symlink,
linkname, loc, umask, xdata);
@@ -448,6 +460,7 @@ changelog_mknod_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (mknod, frame, op_ret, op_errno,
inode, buf, preparent, postparent, xdata);
return 0;
@@ -500,6 +513,7 @@ changelog_mknod (call_frame_t *frame,
changelog_set_usable_record_and_length (frame->local, xtra_len, 5);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_mknod_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->mknod,
loc, mode, dev, umask, xdata);
@@ -527,6 +541,7 @@ changelog_create_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (create, frame,
op_ret, op_errno, fd, inode,
buf, preparent, postparent, xdata);
@@ -583,6 +598,7 @@ changelog_create (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 5);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_create_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->create,
loc, flags, mode, umask, fd, xdata);
@@ -615,6 +631,7 @@ changelog_fsetattr_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (fsetattr, frame, op_ret, op_errno,
preop_stbuf, postop_stbuf, xdata);
@@ -649,6 +666,7 @@ changelog_fsetattr (call_frame_t *frame,
changelog_set_usable_record_and_length (frame->local, xtra_len, 1);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_fsetattr_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetattr,
fd, stbuf, valid, xdata);
@@ -674,6 +692,7 @@ changelog_setattr_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (setattr, frame, op_ret, op_errno,
preop_stbuf, postop_stbuf, xdata);
@@ -706,6 +725,7 @@ changelog_setattr (call_frame_t *frame,
changelog_set_usable_record_and_length (frame->local, xtra_len, 1);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_setattr_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->setattr,
loc, stbuf, valid, xdata);
@@ -730,6 +750,7 @@ changelog_fremovexattr_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (fremovexattr, frame, op_ret, op_errno, xdata);
return 0;
@@ -758,6 +779,7 @@ changelog_fremovexattr (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 1);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_fremovexattr_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->fremovexattr,
fd, name, xdata);
@@ -780,6 +802,7 @@ changelog_removexattr_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (removexattr, frame, op_ret, op_errno, xdata);
return 0;
@@ -808,6 +831,7 @@ changelog_removexattr (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 1);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_removexattr_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->removexattr,
loc, name, xdata);
@@ -832,6 +856,7 @@ changelog_setxattr_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (setxattr, frame, op_ret, op_errno, xdata);
return 0;
@@ -861,6 +886,7 @@ changelog_setxattr (call_frame_t *frame,
changelog_set_usable_record_and_length (frame->local, xtra_len, 1);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_setxattr_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->setxattr,
loc, dict, flags, xdata);
@@ -883,6 +909,7 @@ changelog_fsetxattr_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (fsetxattr, frame, op_ret, op_errno, xdata);
return 0;
@@ -912,6 +939,7 @@ changelog_fsetxattr (call_frame_t *frame,
changelog_set_usable_record_and_length (frame->local, xtra_len, 1);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_fsetxattr_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetxattr,
fd, dict, flags, xdata);
@@ -944,6 +972,7 @@ changelog_truncate_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_DATA);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (truncate, frame,
op_ret, op_errno, prebuf, postbuf, xdata);
return 0;
@@ -962,6 +991,7 @@ changelog_truncate (call_frame_t *frame,
loc->inode, loc->inode->gfid, 0);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_truncate_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->truncate,
loc, offset, xdata);
@@ -985,6 +1015,7 @@ changelog_ftruncate_cbk (call_frame_t *frame,
changelog_update (this, priv, local, CHANGELOG_TYPE_DATA);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (ftruncate, frame,
op_ret, op_errno, prebuf, postbuf, xdata);
return 0;
@@ -1003,6 +1034,7 @@ changelog_ftruncate (call_frame_t *frame,
fd->inode, fd->inode->gfid, 0);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_ftruncate_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->ftruncate,
fd, offset, xdata);
@@ -1028,6 +1060,7 @@ changelog_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
changelog_update (this, priv, local, CHANGELOG_TYPE_DATA);
unwind:
+ changelog_dec_fop_cnt (this, priv, local);
CHANGELOG_STACK_UNWIND (writev, frame,
op_ret, op_errno, prebuf, postbuf, xdata);
return 0;
@@ -1048,6 +1081,7 @@ changelog_writev (call_frame_t *frame,
fd->inode, fd->inode->gfid, 0);
wind:
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_writev_cbk, FIRST_CHILD (this),
FIRST_CHILD (this)->fops->writev, fd, vector,
count, offset, flags, iobref, xdata);
@@ -1089,9 +1123,16 @@ changelog_assign_encoding (changelog_priv_t *priv, char *enc)
static void
changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv)
{
+ int ret = 0;
+
if (priv->cr.rollover_th) {
changelog_thread_cleanup (this, priv->cr.rollover_th);
priv->cr.rollover_th = 0;
+ ret = close (priv->cr_wfd);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR,
+ "error closing write end of rollover pipe"
+ " (reason: %s)", strerror (errno));
}
if (priv->cf.fsync_th) {
@@ -1105,6 +1146,41 @@ static int
changelog_spawn_helper_threads (xlator_t *this, changelog_priv_t *priv)
{
int ret = 0;
+ int flags = 0;
+ int pipe_fd[2] = {0, 0};
+
+ /* Geo-Rep snapshot dependency:
+ *
+ * To implement explicit rollover of changlog journal on barrier
+ * notification, a pipe is created to communicate between
+ * 'changelog_rollover' thread and changelog main thread. The select
+ * call used to wait till roll-over time in changelog_rollover thread
+ * is modified to wait on read end of the pipe. When barrier
+ * notification comes (i.e, in 'reconfigure'), select in
+ * changelog_rollover thread is woken up explicitly by writing into
+ * the write end of the pipe in 'reconfigure'.
+ */
+
+ ret = pipe (pipe_fd);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Cannot create pipe (reason: %s)", strerror (errno));
+ goto out;
+ }
+
+ /* writer is non-blocking */
+ flags = fcntl (pipe_fd[1], F_GETFL);
+ flags |= O_NONBLOCK;
+
+ ret = fcntl (pipe_fd[1], F_SETFL, flags);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set O_NONBLOCK flag");
+ goto out;
+ }
+
+ priv->cr_wfd = pipe_fd[1];
+ priv->cr.rfd = pipe_fd[0];
priv->cr.this = this;
ret = gf_thread_create (&priv->cr.rollover_th,
@@ -1186,6 +1262,155 @@ changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv)
return ret;
}
+int
+notify (xlator_t *this, int event, void *data, ...)
+{
+ changelog_priv_t *priv = NULL;
+ dict_t *dict = NULL;
+ char buf[1] = {1};
+ int barrier = DICT_DEFAULT;
+ gf_boolean_t bclean_req = _gf_false;
+ int ret = 0;
+
+ priv = this->private;
+ if (!priv)
+ goto out;
+
+ if (event == GF_EVENT_TRANSLATOR_OP) {
+
+ dict = data;
+ /*TODO: Also barrier option is persistent. Need to
+ * decide on the brick crash scenarios.
+ */
+ barrier = dict_get_str_boolean (dict, "barrier", DICT_DEFAULT);
+
+ switch (barrier) {
+ case DICT_ERROR:
+ gf_log (this->name, GF_LOG_ERROR,
+ "Barrier dict_get_str_boolean failed");
+ ret = -1;
+ goto out;
+
+ case BARRIER_OFF:
+ gf_log (this->name, GF_LOG_INFO,
+ "Barrier off notification");
+
+ CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, out);
+
+ LOCK (&priv->bflags.lock);
+ {
+ if (priv->bflags.barrier_ext == _gf_false)
+ ret = -1;
+ }
+ UNLOCK (&priv->bflags.lock);
+
+ if (ret == -1 ) {
+ gf_log (this->name, GF_LOG_ERROR, "Received"
+ " another barrier off notification"
+ " while already off");
+ goto out;
+ }
+
+ /*TODO: STOP CHANGELOG BARRIER */
+ LOCK (&priv->bflags.lock);
+ {
+ priv->bflags.barrier_ext = _gf_false;
+ }
+ UNLOCK (&priv->bflags.lock);
+
+ ret = 0;
+ goto out;
+
+ case BARRIER_ON:
+ gf_log (this->name, GF_LOG_INFO,
+ "Barrier on notification");
+
+ CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, out);
+
+ LOCK (&priv->bflags.lock);
+ {
+ if (priv->bflags.barrier_ext == _gf_true)
+ ret = -1;
+ else
+ priv->bflags.barrier_ext = _gf_true;
+ }
+ UNLOCK (&priv->bflags.lock);
+
+ if (ret == -1 ) {
+ gf_log (this->name, GF_LOG_ERROR, "Received"
+ " another barrier on notification when"
+ " last one is not served yet");
+ goto out;
+ }
+
+ ret = pthread_mutex_lock (&priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out,
+ bclean_req);
+ {
+ priv->bn.bnotify = _gf_true;
+ }
+ ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out,
+ bclean_req);
+
+ /*TODO: START CHANGELOG BARRIER */
+
+ ret = changelog_barrier_notify(priv, buf);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Explicit roll over: write failed");
+ changelog_barrier_cleanup (this, priv);
+ ret = -1;
+ goto out;
+ }
+
+ ret = pthread_mutex_lock (&priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out,
+ bclean_req);
+ {
+ /* The while condition check is required here to
+ * handle spurious wakeup of cond wait that can
+ * happen with pthreads. See man page */
+ while (priv->bn.bnotify == _gf_true) {
+ ret = pthread_cond_wait (
+ &priv->bn.bnotify_cond,
+ &priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret,
+ out,
+ bclean_req);
+ }
+ }
+ ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out, bclean_req);
+ gf_log (this->name, GF_LOG_INFO,
+ "Woke up: bnotify conditional wait");
+
+ ret = 0;
+ goto out;
+
+ case DICT_DEFAULT:
+ gf_log (this->name, GF_LOG_ERROR,
+ "barrier key not found");
+ ret = -1;
+ goto out;
+
+ default:
+ gf_log (this->name, GF_LOG_ERROR,
+ "Something went bad in dict_get_str_boolean");
+ ret = -1;
+ goto out;
+ }
+ } else {
+ ret = default_notify (this, event, data);
+ }
+
+ out:
+ if (bclean_req)
+ changelog_barrier_cleanup (this, priv);
+
+ return ret;
+}
+
int32_t
mem_acct_init (xlator_t *this)
{
@@ -1261,6 +1486,98 @@ changelog_init (xlator_t *this, changelog_priv_t *priv)
return ret;
}
+/* Init all pthread condition variables and locks in changelog*/
+static int
+changelog_pthread_init (xlator_t *this, changelog_priv_t *priv)
+{
+ gf_boolean_t bn_mutex_init = _gf_false;
+ gf_boolean_t bn_cond_init = _gf_false;
+ gf_boolean_t dm_mutex_black_init = _gf_false;
+ gf_boolean_t dm_cond_black_init = _gf_false;
+ gf_boolean_t dm_mutex_white_init = _gf_false;
+ gf_boolean_t dm_cond_white_init = _gf_false;
+ int ret = 0;
+
+ if ((ret = pthread_mutex_init(&priv->bn.bnotify_mutex, NULL)) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "bnotify pthread_mutex_init failed (%d)", ret);
+ ret = -1;
+ goto out;
+ }
+ bn_mutex_init = _gf_true;
+
+ if ((ret = pthread_cond_init(&priv->bn.bnotify_cond, NULL)) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "bnotify pthread_cond_init failed (%d)", ret);
+ ret = -1;
+ goto out;
+ }
+ bn_cond_init = _gf_true;
+
+ if ((ret = pthread_mutex_init(&priv->dm.drain_black_mutex, NULL)) != 0)
+ {
+ gf_log (this->name, GF_LOG_ERROR,
+ "drain_black pthread_mutex_init failed (%d)", ret);
+ ret = -1;
+ goto out;
+ }
+ dm_mutex_black_init = _gf_true;
+
+ if ((ret = pthread_cond_init(&priv->dm.drain_black_cond, NULL)) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "drain_black pthread_cond_init failed (%d)", ret);
+ ret = -1;
+ goto out;
+ }
+ dm_cond_black_init = _gf_true;
+
+ if ((ret = pthread_mutex_init(&priv->dm.drain_white_mutex, NULL)) != 0)
+ {
+ gf_log (this->name, GF_LOG_ERROR,
+ "drain_white pthread_mutex_init failed (%d)", ret);
+ ret = -1;
+ goto out;
+ }
+ dm_mutex_white_init = _gf_true;
+
+ if ((ret = pthread_cond_init(&priv->dm.drain_white_cond, NULL)) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "drain_white pthread_cond_init failed (%d)", ret);
+ ret = -1;
+ goto out;
+ }
+ dm_cond_white_init = _gf_true;
+ out:
+ if (ret) {
+ if (bn_mutex_init)
+ pthread_mutex_destroy(&priv->bn.bnotify_mutex);
+ if (bn_cond_init)
+ pthread_cond_destroy (&priv->bn.bnotify_cond);
+ if (dm_mutex_black_init)
+ pthread_mutex_destroy(&priv->dm.drain_black_mutex);
+ if (dm_cond_black_init)
+ pthread_cond_destroy (&priv->dm.drain_black_cond);
+ if (dm_mutex_white_init)
+ pthread_mutex_destroy(&priv->dm.drain_white_mutex);
+ if (dm_cond_white_init)
+ pthread_cond_destroy (&priv->dm.drain_white_cond);
+ }
+ return ret;
+}
+
+/* Destroy all pthread condition variables and locks in changelog */
+static inline void
+changelog_pthread_destroy (changelog_priv_t *priv)
+{
+ pthread_mutex_destroy (&priv->bn.bnotify_mutex);
+ pthread_cond_destroy (&priv->bn.bnotify_cond);
+ pthread_mutex_destroy (&priv->dm.drain_black_mutex);
+ pthread_cond_destroy (&priv->dm.drain_black_cond);
+ pthread_mutex_destroy (&priv->dm.drain_white_mutex);
+ pthread_cond_destroy (&priv->dm.drain_white_cond);
+ LOCK_DESTROY (&priv->bflags.lock);
+}
+
int
reconfigure (xlator_t *this, dict_t *options)
{
@@ -1363,9 +1680,10 @@ reconfigure (xlator_t *this, dict_t *options)
int32_t
init (xlator_t *this)
{
- int ret = -1;
- char *tmp = NULL;
- changelog_priv_t *priv = NULL;
+ int ret = -1;
+ char *tmp = NULL;
+ changelog_priv_t *priv = NULL;
+ gf_boolean_t cond_lock_init = _gf_false;
GF_VALIDATE_OR_GOTO ("changelog", this, out);
@@ -1451,6 +1769,24 @@ init (xlator_t *this)
goto out;
priv->changelog_fd = -1;
+
+ /* snap dependency changes */
+ priv->dm.black_fop_cnt = 0;
+ priv->dm.white_fop_cnt = 0;
+ priv->dm.drain_wait_black = _gf_false;
+ priv->dm.drain_wait_white = _gf_false;
+ priv->current_color = FOP_COLOR_BLACK;
+ priv->explicit_rollover = _gf_false;
+ /* Mutex is not needed as threads are not spawned yet */
+ priv->bn.bnotify = _gf_false;
+ ret = changelog_pthread_init (this, priv);
+ if (ret)
+ goto out;
+
+ LOCK_INIT (&priv->bflags.lock);
+ cond_lock_init = _gf_true;
+ priv->bflags.barrier_ext = _gf_false;
+
ret = changelog_init (this, priv);
if (ret)
goto out;
@@ -1469,6 +1805,9 @@ init (xlator_t *this)
}
GF_FREE (priv->changelog_brick);
GF_FREE (priv->changelog_dir);
+ if (cond_lock_init)
+ changelog_pthread_destroy (priv);
+
GF_FREE (priv);
this->private = NULL;
} else
@@ -1493,6 +1832,7 @@ fini (xlator_t *this)
mem_pool_destroy (this->local_pool);
GF_FREE (priv->changelog_brick);
GF_FREE (priv->changelog_dir);
+ changelog_pthread_destroy (priv);
GF_FREE (priv);
}