diff options
Diffstat (limited to 'xlators/features/changelog/src/changelog.c')
-rw-r--r-- | xlators/features/changelog/src/changelog.c | 240 |
1 files changed, 217 insertions, 23 deletions
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c index 0a491c5ac07..1253a1a6829 100644 --- a/xlators/features/changelog/src/changelog.c +++ b/xlators/features/changelog/src/changelog.c @@ -69,12 +69,33 @@ changelog_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, } int32_t +changelog_rmdir_resume (call_frame_t *frame, xlator_t *this, + loc_t *loc, int xflags, dict_t *xdata) +{ + changelog_priv_t *priv = NULL; + + priv = this->private; + + gf_log (this->name, GF_LOG_DEBUG, "Dequeue rmdir"); + changelog_color_fop_and_inc_cnt (this, priv, frame->local); + STACK_WIND (frame, changelog_rmdir_cbk, + FIRST_CHILD (this), FIRST_CHILD (this)->fops->rmdir, + loc, xflags, xdata); + return 0; +} + +int32_t changelog_rmdir (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags, dict_t *xdata) { - size_t xtra_len = 0; - changelog_priv_t *priv = NULL; - changelog_opt_t *co = NULL; + size_t xtra_len = 0; + changelog_priv_t *priv = NULL; + changelog_opt_t *co = NULL; + call_stub_t *stub = NULL; + struct list_head queue = {0, }; + gf_boolean_t barrier_enabled = _gf_false; + + INIT_LIST_HEAD (&queue); priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); @@ -94,11 +115,49 @@ changelog_rmdir (call_frame_t *frame, xlator_t *this, changelog_set_usable_record_and_length (frame->local, xtra_len, 2); +/* changelog barrier */ + /* Color assignment and increment of fop_cnt for rmdir/unlink/rename + * should be made with in priv lock if changelog barrier is not enabled. + * Because if counter is not incremented yet, draining wakes up and + * publishes the changelog but later these fops might hit the disk and + * present in snapped volume but where as the intention is these fops + * should not be present in snapped volume. + */ + LOCK (&priv->lock); + { + if ((barrier_enabled = priv->barrier_enabled)) { + stub = fop_rmdir_stub (frame, changelog_rmdir_resume, + loc, xflags, xdata); + if (!stub) + __chlog_barrier_disable (this, &queue); + else + __chlog_barrier_enqueue (this, stub); + } else { + ((changelog_local_t *)frame->local)->color + = priv->current_color; + changelog_inc_fop_cnt (this, priv, frame->local); + } + } + UNLOCK (&priv->lock); + + if (barrier_enabled && stub) { + gf_log (this->name, GF_LOG_DEBUG, "Enqueue rmdir"); + goto out; + } + if (barrier_enabled && !stub) { + gf_log (this->name, GF_LOG_ERROR, + "Failed to barrier FOPs, disabling changelog barrier " + "FOP: rmdir, ERROR: %s", strerror (ENOMEM)); + chlog_barrier_dequeue_all (this, &queue); + } + +/* changelog barrier */ + wind: - changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_rmdir_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->rmdir, loc, xflags, xdata); + out: return 0; } @@ -127,12 +186,33 @@ changelog_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, } int32_t +changelog_unlink_resume (call_frame_t *frame, xlator_t *this, + loc_t *loc, int xflags, dict_t *xdata) +{ + changelog_priv_t *priv = NULL; + + priv = this->private; + + gf_log (this->name, GF_LOG_DEBUG, "Dequeue unlink"); + changelog_color_fop_and_inc_cnt (this, priv, frame->local); + STACK_WIND (frame, changelog_unlink_cbk, + FIRST_CHILD (this), FIRST_CHILD (this)->fops->unlink, + loc, xflags, xdata); + return 0; +} + +int32_t changelog_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags, dict_t *xdata) { - size_t xtra_len = 0; - changelog_priv_t *priv = NULL; - changelog_opt_t *co = NULL; + size_t xtra_len = 0; + changelog_priv_t *priv = NULL; + changelog_opt_t *co = NULL; + call_stub_t *stub = NULL; + struct list_head queue = {0, }; + gf_boolean_t barrier_enabled = _gf_false; + + INIT_LIST_HEAD (&queue); priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); @@ -152,11 +232,42 @@ changelog_unlink (call_frame_t *frame, xlator_t *this, changelog_set_usable_record_and_length (frame->local, xtra_len, 2); +/* changelog barrier */ + LOCK (&priv->lock); + { + if ((barrier_enabled = priv->barrier_enabled)) { + stub = fop_unlink_stub (frame, changelog_unlink_resume, + loc, xflags, xdata); + if (!stub) + __chlog_barrier_disable (this, &queue); + else + __chlog_barrier_enqueue (this, stub); + } else { + ((changelog_local_t *)frame->local)->color + = priv->current_color; + changelog_inc_fop_cnt (this, priv, frame->local); + } + } + UNLOCK (&priv->lock); + + if (barrier_enabled && stub) { + gf_log (this->name, GF_LOG_DEBUG, "Enqueue unlink"); + goto out; + } + if (barrier_enabled && !stub) { + gf_log (this->name, GF_LOG_ERROR, + "Failed to barrier FOPs, disabling changelog barrier " + "FOP: unlink, ERROR: %s", strerror (ENOMEM)); + chlog_barrier_dequeue_all (this, &queue); + } + +/* changelog barrier */ + wind: - changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_unlink_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->unlink, loc, xflags, xdata); + out: return 0; } @@ -190,12 +301,33 @@ changelog_rename_cbk (call_frame_t *frame, int32_t +changelog_rename_resume (call_frame_t *frame, xlator_t *this, + loc_t *oldloc, loc_t *newloc, dict_t *xdata) +{ + changelog_priv_t *priv = NULL; + + priv = this->private; + + gf_log (this->name, GF_LOG_DEBUG, "Dequeue rename"); + changelog_color_fop_and_inc_cnt (this, priv, frame->local); + STACK_WIND (frame, changelog_rename_cbk, + FIRST_CHILD (this), FIRST_CHILD (this)->fops->rename, + oldloc, newloc, xdata); + return 0; +} + +int32_t changelog_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, dict_t *xdata) { - size_t xtra_len = 0; - changelog_priv_t *priv = NULL; - changelog_opt_t *co = NULL; + size_t xtra_len = 0; + changelog_priv_t *priv = NULL; + changelog_opt_t *co = NULL; + call_stub_t *stub = NULL; + struct list_head queue = {0, }; + gf_boolean_t barrier_enabled = _gf_false; + + INIT_LIST_HEAD (&queue); priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); @@ -219,12 +351,41 @@ changelog_rename (call_frame_t *frame, xlator_t *this, entry_fn, entry_free_fn, xtra_len, wind); changelog_set_usable_record_and_length (frame->local, xtra_len, 3); +/* changelog barrier */ + LOCK (&priv->lock); + { + if ((barrier_enabled = priv->barrier_enabled)) { + stub = fop_rename_stub (frame, changelog_rename_resume, + oldloc, newloc, xdata); + if (!stub) + __chlog_barrier_disable (this, &queue); + else + __chlog_barrier_enqueue (this, stub); + } else { + ((changelog_local_t *)frame->local)->color + = priv->current_color; + changelog_inc_fop_cnt (this, priv, frame->local); + } + } + UNLOCK (&priv->lock); + + if (barrier_enabled && stub) { + gf_log (this->name, GF_LOG_DEBUG, "Enqueue rename"); + goto out; + } + if (barrier_enabled && !stub) { + gf_log (this->name, GF_LOG_ERROR, + "Failed to barrier FOPs, disabling changelog barrier " + "FOP: rename, ERROR: %s", strerror (ENOMEM)); + chlog_barrier_dequeue_all (this, &queue); + } +/* changelog barrier */ wind: - changelog_color_fop_and_inc_cnt (this, priv, frame->local); STACK_WIND (frame, changelog_rename_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->rename, oldloc, newloc, xdata); + out: return 0; } @@ -1265,12 +1426,15 @@ changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv) int notify (xlator_t *this, int event, void *data, ...) { - changelog_priv_t *priv = NULL; - dict_t *dict = NULL; - char buf[1] = {1}; - int barrier = DICT_DEFAULT; - gf_boolean_t bclean_req = _gf_false; - int ret = 0; + changelog_priv_t *priv = NULL; + dict_t *dict = NULL; + char buf[1] = {1}; + int barrier = DICT_DEFAULT; + gf_boolean_t bclean_req = _gf_false; + int ret = 0; + struct list_head queue = {0, }; + + INIT_LIST_HEAD (&queue); priv = this->private; if (!priv) @@ -1311,14 +1475,33 @@ notify (xlator_t *this, int event, void *data, ...) goto out; } - /*TODO: STOP CHANGELOG BARRIER */ + /* Stop changelog barrier and dequeue all fops */ + LOCK (&priv->lock); + { + if (priv->barrier_enabled == _gf_true) + __chlog_barrier_disable (this, &queue); + else + ret = -1; + } + UNLOCK (&priv->lock); + /* If ret = -1, then changelog barrier is already + * disabled because of error or timeout. + */ + if (ret == 0) { + chlog_barrier_dequeue_all(this, &queue); + gf_log(this->name, GF_LOG_DEBUG, + "Disabled changelog barrier"); + } else { + gf_log (this->name, GF_LOG_ERROR, + "Changelog barrier already disabled"); + } + LOCK (&priv->bflags.lock); { priv->bflags.barrier_ext = _gf_false; } UNLOCK (&priv->bflags.lock); - ret = 0; goto out; case BARRIER_ON: @@ -1353,13 +1536,20 @@ notify (xlator_t *this, int event, void *data, ...) CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out, bclean_req); - /*TODO: START CHANGELOG BARRIER */ + /* Start changelog barrier */ + LOCK (&priv->lock); + { + priv->barrier_enabled = _gf_true; + } + UNLOCK (&priv->lock); + gf_log(this->name, GF_LOG_DEBUG, + "Enabled changelog barrier"); ret = changelog_barrier_notify(priv, buf); if (ret) { gf_log (this->name, GF_LOG_ERROR, "Explicit roll over: write failed"); - changelog_barrier_cleanup (this, priv); + changelog_barrier_cleanup (this, priv, &queue); ret = -1; goto out; } @@ -1406,7 +1596,7 @@ notify (xlator_t *this, int event, void *data, ...) out: if (bclean_req) - changelog_barrier_cleanup (this, priv); + changelog_barrier_cleanup (this, priv, &queue); return ret; } @@ -1787,6 +1977,10 @@ init (xlator_t *this) cond_lock_init = _gf_true; priv->bflags.barrier_ext = _gf_false; + /* Changelog barrier init */ + INIT_LIST_HEAD (&priv->queue); + priv->barrier_enabled = _gf_false; + ret = changelog_init (this, priv); if (ret) goto out; |