summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/src/changelog-helpers.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/src/changelog-helpers.c')
-rw-r--r--xlators/features/changelog/src/changelog-helpers.c333
1 files changed, 324 insertions, 9 deletions
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c
index 91c43a16c86..c3661b9b76c 100644
--- a/xlators/features/changelog/src/changelog-helpers.c
+++ b/xlators/features/changelog/src/changelog-helpers.c
@@ -24,6 +24,15 @@
#include "changelog-encoders.h"
#include <pthread.h>
+/**
+ * Thread-cancellation cleanup handler: unlocks the mutex handed in through
+ * @arg_mutex. A NULL argument is tolerated and ignored.
+ *
+ * @arg_mutex: pointer to a pthread_mutex_t, passed as void * because that is
+ *             the signature pthread_cleanup_push() expects of its handler.
+ */
+static void
+changelog_cleanup_free_mutex (void *arg_mutex)
+{
+        pthread_mutex_t *mutex = arg_mutex;
+
+        if (mutex == NULL)
+                return;
+
+        pthread_mutex_unlock (mutex);
+}
+
void
changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)
{
@@ -134,6 +143,12 @@ changelog_rollover_changelog (xlator_t *this,
char nfile[PATH_MAX] = {0,};
if (priv->changelog_fd != -1) {
+ ret = fsync (priv->changelog_fd);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "fsync failed (reason: %s)",
+ strerror (errno));
+ }
close (priv->changelog_fd);
priv->changelog_fd = -1;
}
@@ -166,9 +181,34 @@ changelog_rollover_changelog (xlator_t *this,
gf_log (this->name, GF_LOG_ERROR,
"Failed to send file name to notify thread"
" (reason: %s)", strerror (errno));
+ } else {
+ /* If this is explicit rollover initiated by snapshot,
+ * wakeup reconfigure thread waiting for changelog to
+ * rollover
+ */
+ if (priv->explicit_rollover) {
+ priv->explicit_rollover = _gf_false;
+ ret = pthread_mutex_lock (
+ &priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ {
+ priv->bn.bnotify = _gf_false;
+ ret = pthread_cond_signal (
+ &priv->bn.bnotify_cond);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret,
+ out);
+ gf_log (this->name, GF_LOG_INFO,
+ "Changelog published and"
+ " signalled bnotify");
+ }
+ ret = pthread_mutex_unlock (
+ &priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ }
}
}
+ out:
return ret;
}
@@ -313,9 +353,11 @@ changelog_local_init (xlator_t *this, inode_t *inode,
uuid_t gfid, int xtra_records,
gf_boolean_t update_flag)
{
+ changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
struct iobuf *iobuf = NULL;
+ priv = this->private;
/**
* We relax the presence of inode if @update_flag is true.
* The caller (implmentation of the fop) needs to be careful to
@@ -378,6 +420,80 @@ changelog_inject_single_event (xlator_t *this,
return priv->cd.dispatchfn (this, priv, priv->cd.cd_data, cld, NULL);
}
+/**
+ * Wait till all the black fops are drained, i.e. until
+ * priv->dm.black_fop_cnt is no longer positive.
+ *
+ * @this: xlator; only its name is used, for log messages.
+ * @priv: changelog private data holding the black drain mutex, condition
+ *        variable, counter and wait flag.
+ *
+ * While waiting, priv->dm.drain_wait_black is set so that
+ * changelog_dec_fop_cnt() signals the condition when the count hits zero.
+ * If the drain mutex cannot be locked the function logs the error and
+ * returns without waiting.
+ */
+void
+changelog_drain_black_fops (xlator_t *this, changelog_priv_t *priv)
+{
+        int ret = 0;
+
+        ret = pthread_mutex_lock (&priv->dm.drain_black_mutex);
+        if (ret) {
+                /* pthread_cond_wait() on a mutex this thread does not own
+                 * is undefined, so do not attempt to drain.
+                 */
+                gf_log (this->name, GF_LOG_ERROR, "pthread error:"
+                        " Error:%d", ret);
+                return;
+        }
+
+        /* clean up framework of pthread_mutex is required here as
+         * 'reconfigure' terminates the changelog_rollover thread
+         * on graph change. The handler is registered only once the
+         * mutex is held, so it never unlocks a mutex we do not own,
+         * and so that the early return above stays outside the
+         * push/pop pair.
+         */
+        pthread_cleanup_push (changelog_cleanup_free_mutex,
+                              &priv->dm.drain_black_mutex);
+        while (priv->dm.black_fop_cnt > 0) {
+                gf_log (this->name, GF_LOG_DEBUG,
+                        "Condtional wait on black fops: %ld",
+                        priv->dm.black_fop_cnt);
+                priv->dm.drain_wait_black = _gf_true;
+                ret = pthread_cond_wait (&priv->dm.drain_black_cond,
+                                         &priv->dm.drain_black_mutex);
+                if (ret)
+                        gf_log (this->name, GF_LOG_ERROR, "pthread"
+                                " cond wait failed: Error:%d", ret);
+        }
+        priv->dm.drain_wait_black = _gf_false;
+        ret = pthread_mutex_unlock (&priv->dm.drain_black_mutex);
+        /* Drop the handler right after the unlock, before any logging. */
+        pthread_cleanup_pop (0);
+        if (ret)
+                gf_log (this->name, GF_LOG_ERROR, "pthread error:"
+                        " Error:%d", ret);
+        gf_log (this->name, GF_LOG_DEBUG,
+                "Woke up: Conditional wait on black fops");
+}
+
+/**
+ * Wait till all the white fops are drained, i.e. until
+ * priv->dm.white_fop_cnt is no longer positive.
+ *
+ * @this: xlator; only its name is used, for log messages.
+ * @priv: changelog private data holding the white drain mutex, condition
+ *        variable, counter and wait flag.
+ *
+ * While waiting, priv->dm.drain_wait_white is set so that
+ * changelog_dec_fop_cnt() signals the condition when the count hits zero.
+ * If the drain mutex cannot be locked the function logs the error and
+ * returns without waiting.
+ */
+void
+changelog_drain_white_fops (xlator_t *this, changelog_priv_t *priv)
+{
+        int ret = 0;
+
+        ret = pthread_mutex_lock (&priv->dm.drain_white_mutex);
+        if (ret) {
+                /* pthread_cond_wait() on a mutex this thread does not own
+                 * is undefined, so do not attempt to drain.
+                 */
+                gf_log (this->name, GF_LOG_ERROR, "pthread error:"
+                        " Error:%d", ret);
+                return;
+        }
+
+        /* clean up framework of pthread_mutex is required here as
+         * 'reconfigure' terminates the changelog_rollover thread
+         * on graph change. The handler is registered only once the
+         * mutex is held, so it never unlocks a mutex we do not own,
+         * and so that the early return above stays outside the
+         * push/pop pair.
+         */
+        pthread_cleanup_push (changelog_cleanup_free_mutex,
+                              &priv->dm.drain_white_mutex);
+        while (priv->dm.white_fop_cnt > 0) {
+                gf_log (this->name, GF_LOG_DEBUG,
+                        "Condtional wait on white fops : %ld",
+                        priv->dm.white_fop_cnt);
+                priv->dm.drain_wait_white = _gf_true;
+                ret = pthread_cond_wait (&priv->dm.drain_white_cond,
+                                         &priv->dm.drain_white_mutex);
+                if (ret)
+                        gf_log (this->name, GF_LOG_ERROR, "pthread"
+                                " cond wait failed: Error:%d", ret);
+        }
+        priv->dm.drain_wait_white = _gf_false;
+        ret = pthread_mutex_unlock (&priv->dm.drain_white_mutex);
+        /* Drop the handler right after the unlock (same order as the
+         * black variant), before any logging.
+         */
+        pthread_cleanup_pop (0);
+        if (ret)
+                gf_log (this->name, GF_LOG_ERROR, "pthread error:"
+                        " Error:%d", ret);
+        gf_log (this->name, GF_LOG_DEBUG,
+                "Woke up: Conditional wait on white fops");
+}
+
/**
* TODO: these threads have many thing in common (wake up after
* a certain time etc..). move them into separate routine.
@@ -385,12 +501,17 @@ changelog_inject_single_event (xlator_t *this,
void *
changelog_rollover (void *data)
{
- int ret = 0;
- xlator_t *this = NULL;
- struct timeval tv = {0,};
- changelog_log_data_t cld = {0,};
- changelog_time_slice_t *slice = NULL;
- changelog_priv_t *priv = data;
+ int ret = 0;
+ xlator_t *this = NULL;
+ struct timeval tv = {0,};
+ changelog_log_data_t cld = {0,};
+ changelog_time_slice_t *slice = NULL;
+ changelog_priv_t *priv = data;
+ int max_fd = 0;
+ char buf[1] = {0};
+ int len = 0;
+
+ fd_set rset;
this = priv->cr.this;
slice = &priv->slice;
@@ -398,10 +519,62 @@ changelog_rollover (void *data)
while (1) {
tv.tv_sec = priv->rollover_time;
tv.tv_usec = 0;
-
- ret = select (0, NULL, NULL, NULL, &tv);
- if (ret)
+ FD_ZERO(&rset);
+ FD_SET(priv->cr.rfd, &rset);
+ max_fd = priv->cr.rfd;
+ max_fd = max_fd + 1;
+
+ /* It seems there is a race between actual rollover and explicit
+ * rollover. But it is handled. If actual rollover is being
+ * done and the explicit rollover event comes, the event is
+ * not missed. The next select will immediately wakeup to
+ * handle explicit wakeup.
+ */
+
+ ret = select (max_fd, &rset, NULL, NULL, &tv);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "select failed: %s", strerror(errno));
continue;
+ } else if (ret && FD_ISSET(priv->cr.rfd, &rset)) {
+ gf_log (this->name, GF_LOG_INFO,
+ "Explicit wakeup of select on barrier notify");
+ len = read(priv->cr.rfd, buf, 1);
+ if (len == 0) {
+ gf_log (this->name, GF_LOG_ERROR, "BUG: Got EOF"
+ " from reconfigure notification pipe");
+ continue;
+ }
+ if (len < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to read wakeup data");
+ continue;
+ }
+ /* Lock is not required as same thread is modifying.*/
+ priv->explicit_rollover = _gf_true;
+ } else {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "select wokeup on timeout");
+ }
+
+ /* Reading current_color without lock is fine here
+ * as it is only modified here and is next to reading.
+ */
+ if (priv->current_color == FOP_COLOR_BLACK) {
+ LOCK(&priv->lock);
+ priv->current_color = FOP_COLOR_WHITE;
+ UNLOCK(&priv->lock);
+ gf_log (this->name, GF_LOG_DEBUG, "Black fops"
+ " to be drained:%ld",priv->dm.black_fop_cnt);
+ changelog_drain_black_fops (this, priv);
+ } else {
+ LOCK(&priv->lock);
+ priv->current_color = FOP_COLOR_BLACK;
+ UNLOCK(&priv->lock);
+ gf_log (this->name, GF_LOG_DEBUG, "White fops"
+ " to be drained:%ld",priv->dm.white_fop_cnt);
+ changelog_drain_white_fops (this, priv);
+ }
ret = changelog_fill_rollover_data (&cld, _gf_false);
if (ret) {
@@ -694,3 +867,145 @@ changelog_update (xlator_t *this, changelog_priv_t *priv,
return;
}
+
+/* Begin: Geo-rep snapshot dependency changes */
+
+/* changelog_color_fop_and_inc_cnt: stamp a fop with the current color and
+ * bump the matching fop counter as one step.
+ *
+ * Both actions are done while holding priv->lock so that no window exists
+ * between coloring and counting. Without that, a fop could already carry a
+ * color (say black) while its increment is still pending; a drain of black
+ * fops that sees the count at zero would then declare itself complete and
+ * miss that fop.
+ *
+ * Does nothing when either @priv or @local is NULL.
+ */
+
+inline void
+changelog_color_fop_and_inc_cnt (xlator_t *this, changelog_priv_t *priv,
+                                 changelog_local_t *local)
+{
+        if (priv && local) {
+                LOCK (&priv->lock);
+                {
+                        local->color = priv->current_color;
+                        changelog_inc_fop_cnt (this, priv, local);
+                }
+                UNLOCK (&priv->lock);
+        }
+}
+
+/* Increments the fop counter that matches local->color: black_fop_cnt for
+ * FOP_COLOR_BLACK, white_fop_cnt for anything else. The increment is done
+ * under the drain mutex of the same color. A NULL @local is a no-op.
+ *
+ * NOTE(review): CHANGELOG_PTHREAD_ERROR_HANDLE_0 is defined outside this
+ * file; it is used here as "on pthread error, jump to the label" - confirm.
+ */
+inline void
+changelog_inc_fop_cnt (xlator_t *this, changelog_priv_t *priv,
+                       changelog_local_t *local)
+{
+        int              ret      = 0;
+        gf_boolean_t     is_black = _gf_false;
+        pthread_mutex_t *mutex    = NULL;
+
+        if (!local)
+                return;
+
+        /* Pick the mutex once; the counter is picked inside the lock. */
+        is_black = (local->color == FOP_COLOR_BLACK) ? _gf_true : _gf_false;
+        mutex = is_black ? &priv->dm.drain_black_mutex
+                         : &priv->dm.drain_white_mutex;
+
+        ret = pthread_mutex_lock (mutex);
+        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+        {
+                if (is_black)
+                        priv->dm.black_fop_cnt++;
+                else
+                        priv->dm.white_fop_cnt++;
+        }
+        ret = pthread_mutex_unlock (mutex);
+        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+
+ out:
+        return;
+}
+
+/* Decrements the fop counter that matches local->color: black_fop_cnt for
+ * FOP_COLOR_BLACK, white_fop_cnt for anything else. The decrement is done
+ * under the drain mutex of the same color. When the counter reaches zero and
+ * the corresponding drain_wait_* flag is set, the matching drain condition
+ * variable is signalled. A NULL @local is a no-op.
+ *
+ * A pthread_cond_signal() failure is logged here directly rather than via
+ * CHANGELOG_PTHREAD_ERROR_HANDLE_0, so that control always reaches the
+ * unlock below and the drain mutex is never left held.
+ *
+ * NOTE(review): CHANGELOG_PTHREAD_ERROR_HANDLE_0 is defined outside this
+ * file; it is used here as "on pthread error, jump to the label" - confirm.
+ */
+inline void
+changelog_dec_fop_cnt (xlator_t *this, changelog_priv_t *priv,
+                       changelog_local_t *local)
+{
+        int ret     = 0;
+        int sig_ret = 0;
+
+        if (local) {
+                if (local->color == FOP_COLOR_BLACK) {
+                        ret = pthread_mutex_lock (&priv->dm.drain_black_mutex);
+                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+                        {
+                                priv->dm.black_fop_cnt--;
+                                if (priv->dm.black_fop_cnt == 0 &&
+                                    priv->dm.drain_wait_black == _gf_true) {
+                                        sig_ret = pthread_cond_signal (
+                                                  &priv->dm.drain_black_cond);
+                                        if (sig_ret)
+                                                gf_log (this->name,
+                                                        GF_LOG_ERROR,
+                                                        "pthread_cond_signal"
+                                                        " failed: Error:%d",
+                                                        sig_ret);
+                                        else
+                                                gf_log (this->name,
+                                                        GF_LOG_DEBUG,
+                                                        "Signalled draining of black");
+                                }
+                        }
+                        ret = pthread_mutex_unlock(&priv->dm.drain_black_mutex);
+                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+                } else {
+                        ret = pthread_mutex_lock (&priv->dm.drain_white_mutex);
+                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+                        {
+                                priv->dm.white_fop_cnt--;
+                                if (priv->dm.white_fop_cnt == 0 &&
+                                    priv->dm.drain_wait_white == _gf_true) {
+                                        sig_ret = pthread_cond_signal (
+                                                  &priv->dm.drain_white_cond);
+                                        if (sig_ret)
+                                                gf_log (this->name,
+                                                        GF_LOG_ERROR,
+                                                        "pthread_cond_signal"
+                                                        " failed: Error:%d",
+                                                        sig_ret);
+                                        else
+                                                gf_log (this->name,
+                                                        GF_LOG_DEBUG,
+                                                        "Signalled draining of white");
+                                }
+                        }
+                        ret = pthread_mutex_unlock(&priv->dm.drain_white_mutex);
+                        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+                }
+        }
+ out:
+        return;
+}
+
+/* Kick off an explicit rollover of the changelog journal by writing one
+ * byte from @buf to priv->cr_wfd, the write end of the pipe set up between
+ * the changelog main thread and the changelog rollover thread. The write is
+ * done while holding priv->lock.
+ *
+ * Returns whatever changelog_write() returns.
+ */
+inline int
+changelog_barrier_notify (changelog_priv_t *priv, char *buf)
+{
+        int status = 0;
+
+        LOCK (&priv->lock);
+        {
+                status = changelog_write (priv->cr_wfd, buf, 1);
+        }
+        UNLOCK (&priv->lock);
+
+        return status;
+}
+
+/* Reset the flags that were set on barrier notification:
+ *   - priv->bflags.barrier_ext, cleared under priv->bflags.lock
+ *   - priv->bn.bnotify, cleared under priv->bn.bnotify_mutex
+ *
+ * NOTE(review): CHANGELOG_PTHREAD_ERROR_HANDLE_0 is defined outside this
+ * file; it is used here as "on pthread error, jump to the label" - confirm.
+ */
+/*TODO: Add changelog barrier stop code with changelog barrier patch*/
+inline void
+changelog_barrier_cleanup (xlator_t *this, changelog_priv_t *priv)
+{
+        int ret = 0;
+
+        LOCK (&priv->bflags.lock);
+        {
+                priv->bflags.barrier_ext = _gf_false;
+        }
+        UNLOCK (&priv->bflags.lock);
+
+        ret = pthread_mutex_lock (&priv->bn.bnotify_mutex);
+        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+
+        priv->bn.bnotify = _gf_false;
+
+        ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex);
+        CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+
+ out:
+        return;
+}
+/* End: Geo-Rep snapshot dependency changes */