summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/src/changelog.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/src/changelog.c')
-rw-r--r--xlators/features/changelog/src/changelog.c240
1 files changed, 217 insertions, 23 deletions
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c
index 0a491c5ac07..1253a1a6829 100644
--- a/xlators/features/changelog/src/changelog.c
+++ b/xlators/features/changelog/src/changelog.c
@@ -69,12 +69,33 @@ changelog_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
int32_t
+changelog_rmdir_resume (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int xflags, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+
+ gf_log (this->name, GF_LOG_DEBUG, "Dequeue rmdir");
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
+ STACK_WIND (frame, changelog_rmdir_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->rmdir,
+ loc, xflags, xdata);
+ return 0;
+}
+
+int32_t
changelog_rmdir (call_frame_t *frame, xlator_t *this,
loc_t *loc, int xflags, dict_t *xdata)
{
- size_t xtra_len = 0;
- changelog_priv_t *priv = NULL;
- changelog_opt_t *co = NULL;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ call_stub_t *stub = NULL;
+ struct list_head queue = {0, };
+ gf_boolean_t barrier_enabled = _gf_false;
+
+ INIT_LIST_HEAD (&queue);
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind);
@@ -94,11 +115,49 @@ changelog_rmdir (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
+/* changelog barrier */
+ /* Color assignment and increment of fop_cnt for rmdir/unlink/rename
+ * should be made with in priv lock if changelog barrier is not enabled.
+ * Because if counter is not incremented yet, draining wakes up and
+ * publishes the changelog but later these fops might hit the disk and
+ * present in snapped volume but where as the intention is these fops
+ * should not be present in snapped volume.
+ */
+ LOCK (&priv->lock);
+ {
+ if ((barrier_enabled = priv->barrier_enabled)) {
+ stub = fop_rmdir_stub (frame, changelog_rmdir_resume,
+ loc, xflags, xdata);
+ if (!stub)
+ __chlog_barrier_disable (this, &queue);
+ else
+ __chlog_barrier_enqueue (this, stub);
+ } else {
+ ((changelog_local_t *)frame->local)->color
+ = priv->current_color;
+ changelog_inc_fop_cnt (this, priv, frame->local);
+ }
+ }
+ UNLOCK (&priv->lock);
+
+ if (barrier_enabled && stub) {
+ gf_log (this->name, GF_LOG_DEBUG, "Enqueue rmdir");
+ goto out;
+ }
+ if (barrier_enabled && !stub) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to barrier FOPs, disabling changelog barrier "
+ "FOP: rmdir, ERROR: %s", strerror (ENOMEM));
+ chlog_barrier_dequeue_all (this, &queue);
+ }
+
+/* changelog barrier */
+
wind:
- changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_rmdir_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->rmdir,
loc, xflags, xdata);
+ out:
return 0;
}
@@ -127,12 +186,33 @@ changelog_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
int32_t
+changelog_unlink_resume (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int xflags, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+
+ gf_log (this->name, GF_LOG_DEBUG, "Dequeue unlink");
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
+ STACK_WIND (frame, changelog_unlink_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->unlink,
+ loc, xflags, xdata);
+ return 0;
+}
+
+int32_t
changelog_unlink (call_frame_t *frame, xlator_t *this,
loc_t *loc, int xflags, dict_t *xdata)
{
- size_t xtra_len = 0;
- changelog_priv_t *priv = NULL;
- changelog_opt_t *co = NULL;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ call_stub_t *stub = NULL;
+ struct list_head queue = {0, };
+ gf_boolean_t barrier_enabled = _gf_false;
+
+ INIT_LIST_HEAD (&queue);
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind);
@@ -152,11 +232,42 @@ changelog_unlink (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
+/* changelog barrier */
+ LOCK (&priv->lock);
+ {
+ if ((barrier_enabled = priv->barrier_enabled)) {
+ stub = fop_unlink_stub (frame, changelog_unlink_resume,
+ loc, xflags, xdata);
+ if (!stub)
+ __chlog_barrier_disable (this, &queue);
+ else
+ __chlog_barrier_enqueue (this, stub);
+ } else {
+ ((changelog_local_t *)frame->local)->color
+ = priv->current_color;
+ changelog_inc_fop_cnt (this, priv, frame->local);
+ }
+ }
+ UNLOCK (&priv->lock);
+
+ if (barrier_enabled && stub) {
+ gf_log (this->name, GF_LOG_DEBUG, "Enqueue unlink");
+ goto out;
+ }
+ if (barrier_enabled && !stub) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to barrier FOPs, disabling changelog barrier "
+ "FOP: unlink, ERROR: %s", strerror (ENOMEM));
+ chlog_barrier_dequeue_all (this, &queue);
+ }
+
+/* changelog barrier */
+
wind:
- changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_unlink_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->unlink,
loc, xflags, xdata);
+ out:
return 0;
}
@@ -190,12 +301,33 @@ changelog_rename_cbk (call_frame_t *frame,
int32_t
+changelog_rename_resume (call_frame_t *frame, xlator_t *this,
+ loc_t *oldloc, loc_t *newloc, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+
+ gf_log (this->name, GF_LOG_DEBUG, "Dequeue rename");
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
+ STACK_WIND (frame, changelog_rename_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->rename,
+ oldloc, newloc, xdata);
+ return 0;
+}
+
+int32_t
changelog_rename (call_frame_t *frame, xlator_t *this,
loc_t *oldloc, loc_t *newloc, dict_t *xdata)
{
- size_t xtra_len = 0;
- changelog_priv_t *priv = NULL;
- changelog_opt_t *co = NULL;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ call_stub_t *stub = NULL;
+ struct list_head queue = {0, };
+ gf_boolean_t barrier_enabled = _gf_false;
+
+ INIT_LIST_HEAD (&queue);
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind);
@@ -219,12 +351,41 @@ changelog_rename (call_frame_t *frame, xlator_t *this,
entry_fn, entry_free_fn, xtra_len, wind);
changelog_set_usable_record_and_length (frame->local, xtra_len, 3);
+/* changelog barrier */
+ LOCK (&priv->lock);
+ {
+ if ((barrier_enabled = priv->barrier_enabled)) {
+ stub = fop_rename_stub (frame, changelog_rename_resume,
+ oldloc, newloc, xdata);
+ if (!stub)
+ __chlog_barrier_disable (this, &queue);
+ else
+ __chlog_barrier_enqueue (this, stub);
+ } else {
+ ((changelog_local_t *)frame->local)->color
+ = priv->current_color;
+ changelog_inc_fop_cnt (this, priv, frame->local);
+ }
+ }
+ UNLOCK (&priv->lock);
+
+ if (barrier_enabled && stub) {
+ gf_log (this->name, GF_LOG_DEBUG, "Enqueue rename");
+ goto out;
+ }
+ if (barrier_enabled && !stub) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to barrier FOPs, disabling changelog barrier "
+ "FOP: rename, ERROR: %s", strerror (ENOMEM));
+ chlog_barrier_dequeue_all (this, &queue);
+ }
+/* changelog barrier */
wind:
- changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_rename_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->rename,
oldloc, newloc, xdata);
+ out:
return 0;
}
@@ -1265,12 +1426,15 @@ changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv)
int
notify (xlator_t *this, int event, void *data, ...)
{
- changelog_priv_t *priv = NULL;
- dict_t *dict = NULL;
- char buf[1] = {1};
- int barrier = DICT_DEFAULT;
- gf_boolean_t bclean_req = _gf_false;
- int ret = 0;
+ changelog_priv_t *priv = NULL;
+ dict_t *dict = NULL;
+ char buf[1] = {1};
+ int barrier = DICT_DEFAULT;
+ gf_boolean_t bclean_req = _gf_false;
+ int ret = 0;
+ struct list_head queue = {0, };
+
+ INIT_LIST_HEAD (&queue);
priv = this->private;
if (!priv)
@@ -1311,14 +1475,33 @@ notify (xlator_t *this, int event, void *data, ...)
goto out;
}
- /*TODO: STOP CHANGELOG BARRIER */
+ /* Stop changelog barrier and dequeue all fops */
+ LOCK (&priv->lock);
+ {
+ if (priv->barrier_enabled == _gf_true)
+ __chlog_barrier_disable (this, &queue);
+ else
+ ret = -1;
+ }
+ UNLOCK (&priv->lock);
+ /* If ret = -1, then changelog barrier is already
+ * disabled because of error or timeout.
+ */
+ if (ret == 0) {
+ chlog_barrier_dequeue_all(this, &queue);
+ gf_log(this->name, GF_LOG_DEBUG,
+ "Disabled changelog barrier");
+ } else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Changelog barrier already disabled");
+ }
+
LOCK (&priv->bflags.lock);
{
priv->bflags.barrier_ext = _gf_false;
}
UNLOCK (&priv->bflags.lock);
- ret = 0;
goto out;
case BARRIER_ON:
@@ -1353,13 +1536,20 @@ notify (xlator_t *this, int event, void *data, ...)
CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out,
bclean_req);
- /*TODO: START CHANGELOG BARRIER */
+ /* Start changelog barrier */
+ LOCK (&priv->lock);
+ {
+ priv->barrier_enabled = _gf_true;
+ }
+ UNLOCK (&priv->lock);
+ gf_log(this->name, GF_LOG_DEBUG,
+ "Enabled changelog barrier");
ret = changelog_barrier_notify(priv, buf);
if (ret) {
gf_log (this->name, GF_LOG_ERROR,
"Explicit roll over: write failed");
- changelog_barrier_cleanup (this, priv);
+ changelog_barrier_cleanup (this, priv, &queue);
ret = -1;
goto out;
}
@@ -1406,7 +1596,7 @@ notify (xlator_t *this, int event, void *data, ...)
out:
if (bclean_req)
- changelog_barrier_cleanup (this, priv);
+ changelog_barrier_cleanup (this, priv, &queue);
return ret;
}
@@ -1787,6 +1977,10 @@ init (xlator_t *this)
cond_lock_init = _gf_true;
priv->bflags.barrier_ext = _gf_false;
+ /* Changelog barrier init */
+ INIT_LIST_HEAD (&priv->queue);
+ priv->barrier_enabled = _gf_false;
+
ret = changelog_init (this, priv);
if (ret)
goto out;