author    Kotresh H R <khiremat@redhat.com>    2014-03-10 20:03:55 +0530
committer Vijay Bellur <vbellur@redhat.com>    2014-05-01 08:44:56 -0700
commit    c523a04a0bd3edce9cf8ed238b838ebd957f1066 (patch)
tree      0b96d0993b56a0333b28de71b47598ac7c3eed07 /xlators/features/changelog/src/changelog-helpers.c
parent    12f1fab930dc0f6f103bae03fab981409ed31b4e (diff)
feature/changelog: Draining of in-transit fops in changelog.
This is required for Geo-rep to work with snapshots. The following things are done in this patch:
1. Draining of in-transit fops during changelog rollover.
2. Explicit rollover of the changelog when the snapshot barrier notification comes. During this, in-transit fops are drained and the changelog is rolled over.
For more details on the purpose of the patch, please visit the following link: http://www.gluster.org/community/documentation/index.php/Changelog_Design_changes_for_snapshot
Change-Id: I22690131e19d3027f6d8957178bdc3431b9062f6
Signed-off-by: Kotresh H R <khiremat@redhat.com>
Reviewed-on: http://review.gluster.org/7216
Reviewed-by: Venky Shankar <vshankar@redhat.com>
Reviewed-by: Varun Shastry <vshastry@redhat.com>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
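The core of item 1 above is a two-color accounting scheme: every in-transit fop is tagged with the current color and counted, and at rollover the color is flipped and the rollover thread waits until the old color's count drops to zero. Below is a minimal, self-contained sketch of that idea only; the names (fop_enter, fop_exit, rollover_drain) are illustrative and not from the patch, which implements the same scheme with changelog_color_fop_and_inc_cnt, changelog_dec_fop_cnt and the drain helpers shown in the diff below.

    /* Minimal sketch of two-color fop draining (illustrative names). */
    #include <pthread.h>
    #include <stdio.h>

    enum fop_color { COLOR_BLACK = 0, COLOR_WHITE = 1 };

    static pthread_mutex_t drain_mutex = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  drain_cond  = PTHREAD_COND_INITIALIZER;
    static unsigned long   fop_cnt[2]  = {0, 0};
    static int             current_color = COLOR_BLACK;

    /* Fop enters the translator: tag and count under one lock, so no fop
     * can be colored but left uncounted. */
    static int
    fop_enter (void)
    {
            int color = 0;

            pthread_mutex_lock (&drain_mutex);
            color = current_color;
            fop_cnt[color]++;
            pthread_mutex_unlock (&drain_mutex);
            return color;
    }

    /* Fop unwinds: decrement and wake the drainer if this was the last
     * fop of its color. */
    static void
    fop_exit (int color)
    {
            pthread_mutex_lock (&drain_mutex);
            if (--fop_cnt[color] == 0)
                    pthread_cond_broadcast (&drain_cond);
            pthread_mutex_unlock (&drain_mutex);
    }

    /* Rollover (timed or explicit): flip the color, then block until
     * every fop of the old color has drained. New-color fops may still
     * enter while the wait releases the mutex. */
    static void
    rollover_drain (void)
    {
            int old = 0;

            pthread_mutex_lock (&drain_mutex);
            old = current_color;
            current_color = !old;
            while (fop_cnt[old] > 0)
                    pthread_cond_wait (&drain_cond, &drain_mutex);
            pthread_mutex_unlock (&drain_mutex);
    }

    int
    main (void)
    {
            int c = fop_enter ();   /* a fop is in flight */
            fop_exit (c);           /* ...and completes */
            rollover_drain ();      /* returns at once: nothing to drain */
            printf ("drained; current color is now %d\n", current_color);
            return 0;
    }

Flipping the color and counting under the same lock mirrors the patch's requirement that coloring and counting leave no window between them; the patch spreads the counters across drain_black_mutex and drain_white_mutex and assigns the color under LOCK(&priv->lock), but the invariant is the same.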
Diffstat (limited to 'xlators/features/changelog/src/changelog-helpers.c')
-rw-r--r--  xlators/features/changelog/src/changelog-helpers.c  333
1 file changed, 324 insertions, 9 deletions
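For item 2 (explicit rollover on the snapshot barrier notification), the diff below makes the rollover loop select() on a pipe read end (priv->cr.rfd) in addition to its rollover timeout, while changelog_barrier_notify() writes one byte to the other end. The following is a small single-process sketch of that wakeup pattern only, with purely illustrative variable names:

    #include <stdio.h>
    #include <sys/select.h>
    #include <unistd.h>

    int
    main (void)
    {
            int            pfd[2] = {-1, -1};
            char           c      = 'R';
            fd_set         rset;
            struct timeval tv     = {0,};

            if (pipe (pfd) < 0)
                    return 1;

            /* Barrier side: request an explicit rollover with one byte. */
            if (write (pfd[1], &c, 1) != 1)
                    return 1;

            /* Rollover side: wait for the timeout or the wakeup byte. */
            tv.tv_sec = 15;        /* stands in for priv->rollover_time */
            FD_ZERO (&rset);
            FD_SET (pfd[0], &rset);
            if (select (pfd[0] + 1, &rset, NULL, NULL, &tv) > 0 &&
                FD_ISSET (pfd[0], &rset)) {
                    (void) read (pfd[0], &c, 1);
                    printf ("explicit rollover requested\n");
            } else {
                    printf ("timed rollover\n");
            }
            return 0;
    }

In the patch, the write happens under LOCK(&priv->lock) in changelog_barrier_notify(), and the rollover thread consumes the byte and sets priv->explicit_rollover before draining and rolling over.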
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c
index 91c43a16c86..c3661b9b76c 100644
--- a/xlators/features/changelog/src/changelog-helpers.c
+++ b/xlators/features/changelog/src/changelog-helpers.c
@@ -24,6 +24,15 @@
#include "changelog-encoders.h"
#include <pthread.h>
+static void
+changelog_cleanup_free_mutex (void *arg_mutex)
+{
+ pthread_mutex_t *p_mutex = (pthread_mutex_t*) arg_mutex;
+
+ if (p_mutex)
+ pthread_mutex_unlock(p_mutex);
+}
+
void
changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)
{
@@ -134,6 +143,12 @@ changelog_rollover_changelog (xlator_t *this,
char nfile[PATH_MAX] = {0,};
if (priv->changelog_fd != -1) {
+ ret = fsync (priv->changelog_fd);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "fsync failed (reason: %s)",
+ strerror (errno));
+ }
close (priv->changelog_fd);
priv->changelog_fd = -1;
}
@@ -166,9 +181,34 @@ changelog_rollover_changelog (xlator_t *this,
gf_log (this->name, GF_LOG_ERROR,
"Failed to send file name to notify thread"
" (reason: %s)", strerror (errno));
+ } else {
+ /* If this is an explicit rollover initiated by snapshot,
+ * wake up the reconfigure thread waiting for the
+ * changelog to roll over.
+ */
+ if (priv->explicit_rollover) {
+ priv->explicit_rollover = _gf_false;
+ ret = pthread_mutex_lock (
+ &priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ {
+ priv->bn.bnotify = _gf_false;
+ ret = pthread_cond_signal (
+ &priv->bn.bnotify_cond);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret,
+ out);
+ gf_log (this->name, GF_LOG_INFO,
+ "Changelog published and"
+ " signalled bnotify");
+ }
+ ret = pthread_mutex_unlock (
+ &priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ }
}
}
+ out:
return ret;
}
@@ -313,9 +353,11 @@ changelog_local_init (xlator_t *this, inode_t *inode,
uuid_t gfid, int xtra_records,
gf_boolean_t update_flag)
{
+ changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
struct iobuf *iobuf = NULL;
+ priv = this->private;
/**
* We relax the presence of inode if @update_flag is true.
* The caller (implementation of the fop) needs to be careful to
@@ -378,6 +420,80 @@ changelog_inject_single_event (xlator_t *this,
return priv->cd.dispatchfn (this, priv, priv->cd.cd_data, cld, NULL);
}
+/* Wait till all the black fops are drained */
+void
+changelog_drain_black_fops (xlator_t *this, changelog_priv_t *priv)
+{
+ int ret = 0;
+
+ /* The pthread cleanup handler for this mutex is required here as
+ * 'reconfigure' terminates the changelog_rollover thread
+ * on graph change.
+ */
+ pthread_cleanup_push (changelog_cleanup_free_mutex,
+ &priv->dm.drain_black_mutex);
+ ret = pthread_mutex_lock (&priv->dm.drain_black_mutex);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR, "pthread error:"
+ " Error:%d", ret);
+ while (priv->dm.black_fop_cnt > 0) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Condtional wait on black fops: %ld",
+ priv->dm.black_fop_cnt);
+ priv->dm.drain_wait_black = _gf_true;
+ ret = pthread_cond_wait (&priv->dm.drain_black_cond,
+ &priv->dm.drain_black_mutex);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR, "pthread"
+ " cond wait failed: Error:%d", ret);
+ }
+ priv->dm.drain_wait_black = _gf_false;
+ ret = pthread_mutex_unlock (&priv->dm.drain_black_mutex);
+ pthread_cleanup_pop (0);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR, "pthread error:"
+ " Error:%d", ret);
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Woke up: Conditional wait on black fops");
+}
+
+/* Wait till all the white fops are drained */
+void
+changelog_drain_white_fops (xlator_t *this, changelog_priv_t *priv)
+{
+ int ret = 0;
+
+ /* The pthread cleanup handler for this mutex is required here as
+ * 'reconfigure' terminates the changelog_rollover thread
+ * on graph change.
+ */
+ pthread_cleanup_push (changelog_cleanup_free_mutex,
+ &priv->dm.drain_white_mutex);
+ ret = pthread_mutex_lock (&priv->dm.drain_white_mutex);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR, "pthread error:"
+ " Error:%d", ret);
+ while (priv->dm.white_fop_cnt > 0) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Condtional wait on white fops : %ld",
+ priv->dm.white_fop_cnt);
+ priv->dm.drain_wait_white = _gf_true;
+ ret = pthread_cond_wait (&priv->dm.drain_white_cond,
+ &priv->dm.drain_white_mutex);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR, "pthread"
+ " cond wait failed: Error:%d", ret);
+ }
+ priv->dm.drain_wait_white = _gf_false;
+ ret = pthread_mutex_unlock (&priv->dm.drain_white_mutex);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR, "pthread error:"
+ " Error:%d", ret);
+ pthread_cleanup_pop (0);
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Woke up: Conditional wait on white fops");
+}
+
/**
* TODO: these threads have many thing in common (wake up after
* a certain time etc..). move them into separate routine.
@@ -385,12 +501,17 @@ changelog_inject_single_event (xlator_t *this,
void *
changelog_rollover (void *data)
{
- int ret = 0;
- xlator_t *this = NULL;
- struct timeval tv = {0,};
- changelog_log_data_t cld = {0,};
- changelog_time_slice_t *slice = NULL;
- changelog_priv_t *priv = data;
+ int ret = 0;
+ xlator_t *this = NULL;
+ struct timeval tv = {0,};
+ changelog_log_data_t cld = {0,};
+ changelog_time_slice_t *slice = NULL;
+ changelog_priv_t *priv = data;
+ int max_fd = 0;
+ char buf[1] = {0};
+ int len = 0;
+
+ fd_set rset;
this = priv->cr.this;
slice = &priv->slice;
@@ -398,10 +519,62 @@ changelog_rollover (void *data)
while (1) {
tv.tv_sec = priv->rollover_time;
tv.tv_usec = 0;
-
- ret = select (0, NULL, NULL, NULL, &tv);
- if (ret)
+ FD_ZERO(&rset);
+ FD_SET(priv->cr.rfd, &rset);
+ max_fd = priv->cr.rfd;
+ max_fd = max_fd + 1;
+
+ /* There is an apparent race between the timed rollover and an
+ * explicit rollover, but it is handled: if a timed rollover is
+ * in progress when the explicit rollover event arrives, the
+ * event is not missed. The next select will wake up immediately
+ * to handle the explicit wakeup.
+ */
+
+ ret = select (max_fd, &rset, NULL, NULL, &tv);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "select failed: %s", strerror(errno));
continue;
+ } else if (ret && FD_ISSET(priv->cr.rfd, &rset)) {
+ gf_log (this->name, GF_LOG_INFO,
+ "Explicit wakeup of select on barrier notify");
+ len = read(priv->cr.rfd, buf, 1);
+ if (len == 0) {
+ gf_log (this->name, GF_LOG_ERROR, "BUG: Got EOF"
+ " from reconfigure notification pipe");
+ continue;
+ }
+ if (len < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to read wakeup data");
+ continue;
+ }
+ /* Lock is not required as the same thread modifies it. */
+ priv->explicit_rollover = _gf_true;
+ } else {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "select wokeup on timeout");
+ }
+
+ /* Reading current_color without the lock is fine here,
+ * as it is modified only by this thread, right after this read.
+ */
+ if (priv->current_color == FOP_COLOR_BLACK) {
+ LOCK(&priv->lock);
+ priv->current_color = FOP_COLOR_WHITE;
+ UNLOCK(&priv->lock);
+ gf_log (this->name, GF_LOG_DEBUG, "Black fops"
+ " to be drained:%ld",priv->dm.black_fop_cnt);
+ changelog_drain_black_fops (this, priv);
+ } else {
+ LOCK(&priv->lock);
+ priv->current_color = FOP_COLOR_BLACK;
+ UNLOCK(&priv->lock);
+ gf_log (this->name, GF_LOG_DEBUG, "White fops"
+ " to be drained:%ld",priv->dm.white_fop_cnt);
+ changelog_drain_white_fops (this, priv);
+ }
ret = changelog_fill_rollover_data (&cld, _gf_false);
if (ret) {
@@ -694,3 +867,145 @@ changelog_update (xlator_t *this, changelog_priv_t *priv,
return;
}
+
+/* Begin: Geo-rep snapshot dependency changes */
+
+/* changelog_color_fop_and_inc_cnt: Assign color and increment fop count.
+ *
+ * Assigning the color and incrementing the corresponding fop count should
+ * happen under a lock (i.e., there should be no window between them).
+ * Otherwise, we might miss draining fops which have been assigned a color
+ * but whose count has not yet been incremented. For example, assume black
+ * fops are being drained: if the black fop count reaches zero, we declare
+ * draining complete, missing those fops that were colored black but had
+ * not yet incremented the count.
+ */
+
+inline void
+changelog_color_fop_and_inc_cnt (xlator_t *this, changelog_priv_t *priv,
+ changelog_local_t *local)
+{
+ if (!priv || !local)
+ return;
+
+ LOCK (&priv->lock);
+ {
+ local->color = priv->current_color;
+ changelog_inc_fop_cnt (this, priv, local);
+ }
+ UNLOCK (&priv->lock);
+}
+
+/* Increments the respective fop counter based on the fop color */
+inline void
+changelog_inc_fop_cnt (xlator_t *this, changelog_priv_t *priv,
+ changelog_local_t *local)
+{
+ int ret = 0;
+
+ if (local) {
+ if (local->color == FOP_COLOR_BLACK) {
+ ret = pthread_mutex_lock (&priv->dm.drain_black_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ {
+ priv->dm.black_fop_cnt++;
+ }
+ ret = pthread_mutex_unlock(&priv->dm.drain_black_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ } else {
+ ret = pthread_mutex_lock (&priv->dm.drain_white_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ {
+ priv->dm.white_fop_cnt++;
+ }
+ ret = pthread_mutex_unlock(&priv->dm.drain_white_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ }
+ }
+ out:
+ return;
+}
+
+/* Decrements the respective fop counter based on the fop color */
+inline void
+changelog_dec_fop_cnt (xlator_t *this, changelog_priv_t *priv,
+ changelog_local_t *local)
+{
+ int ret = 0;
+
+ if (local) {
+ if (local->color == FOP_COLOR_BLACK) {
+ ret = pthread_mutex_lock (&priv->dm.drain_black_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ {
+ priv->dm.black_fop_cnt--;
+ if (priv->dm.black_fop_cnt == 0 &&
+ priv->dm.drain_wait_black == _gf_true) {
+ ret = pthread_cond_signal (
+ &priv->dm.drain_black_cond);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret,
+ out);
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Signalled draining of black");
+ }
+ }
+ ret = pthread_mutex_unlock(&priv->dm.drain_black_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ } else {
+ ret = pthread_mutex_lock (&priv->dm.drain_white_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ {
+ priv->dm.white_fop_cnt--;
+ if (priv->dm.white_fop_cnt == 0 &&
+ priv->dm.drain_wait_white == _gf_true) {
+ ret = pthread_cond_signal (
+ &priv->dm.drain_white_cond);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret,
+ out);
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Signalled draining of white");
+ }
+ }
+ ret = pthread_mutex_unlock(&priv->dm.drain_white_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ }
+ }
+ out:
+ return;
+}
+
+/* Write to a pipe set up between the changelog main thread and the
+ * changelog rollover thread to initiate an explicit rollover of the
+ * changelog journal.
+ */
+inline int
+changelog_barrier_notify (changelog_priv_t *priv, char *buf)
+{
+ int ret = 0;
+
+ LOCK(&priv->lock);
+ ret = changelog_write (priv->cr_wfd, buf, 1);
+ UNLOCK(&priv->lock);
+ return ret;
+}
+
+/* Clean up flags set on barrier notification */
+/* TODO: Add changelog barrier stop code with changelog barrier patch */
+inline void
+changelog_barrier_cleanup (xlator_t *this, changelog_priv_t *priv)
+{
+ int ret = 0;
+
+ LOCK (&priv->bflags.lock);
+ priv->bflags.barrier_ext = _gf_false;
+ UNLOCK (&priv->bflags.lock);
+
+ ret = pthread_mutex_lock (&priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ {
+ priv->bn.bnotify = _gf_false;
+ }
+ ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ out:
+ return;
+}
+/* End: Geo-Rep snapshot dependency changes */