diff options
Diffstat (limited to 'xlators/features/barrier/src/barrier.c')
| -rw-r--r-- | xlators/features/barrier/src/barrier.c | 809 |
1 files changed, 809 insertions, 0 deletions
diff --git a/xlators/features/barrier/src/barrier.c b/xlators/features/barrier/src/barrier.c new file mode 100644 index 00000000000..852bbacb99d --- /dev/null +++ b/xlators/features/barrier/src/barrier.c @@ -0,0 +1,809 @@ +/* + Copyright (c) 2014 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "barrier.h" +#include <glusterfs/defaults.h> +#include <glusterfs/call-stub.h> + +#include <glusterfs/statedump.h> + +void +barrier_local_set_gfid(call_frame_t *frame, uuid_t gfid, xlator_t *this) +{ + if (gfid) { + uuid_t *id = GF_MALLOC(sizeof(uuid_t), gf_common_mt_uuid_t); + if (!id) { + gf_log(this->name, GF_LOG_WARNING, + "Could not set gfid" + ". gfid will not be dumped in statedump file."); + return; + } + gf_uuid_copy(*id, gfid); + frame->local = id; + } +} + +void +barrier_local_free_gfid(call_frame_t *frame) +{ + if (frame->local) { + GF_FREE(frame->local); + frame->local = NULL; + } +} + +int32_t +barrier_truncate_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt *prebuf, struct iatt *postbuf, + dict_t *xdata) +{ + barrier_local_free_gfid(frame); + STACK_UNWIND_STRICT(truncate, frame, op_ret, op_errno, prebuf, postbuf, + xdata); + return 0; +} + +int32_t +barrier_ftruncate_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt *prebuf, struct iatt *postbuf, + dict_t *xdata) +{ + barrier_local_free_gfid(frame); + STACK_UNWIND_STRICT(ftruncate, frame, op_ret, op_errno, prebuf, postbuf, + xdata); + return 0; +} + +int32_t +barrier_unlink_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt *preparent, struct iatt *postparent, + dict_t *xdata) +{ + barrier_local_free_gfid(frame); + STACK_UNWIND_STRICT(unlink, frame, op_ret, op_errno, preparent, postparent, + xdata); + return 0; +} + +int32_t +barrier_rmdir_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt *preparent, struct iatt *postparent, + dict_t *xdata) +{ + barrier_local_free_gfid(frame); + STACK_UNWIND_STRICT(rmdir, frame, op_ret, op_errno, preparent, postparent, + xdata); + return 0; +} + +int32_t +barrier_rename_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *buf, + struct iatt *preoldparent, struct iatt *postoldparent, + struct iatt *prenewparent, struct iatt *postnewparent, + dict_t *xdata) +{ + barrier_local_free_gfid(frame); + STACK_UNWIND_STRICT(rename, frame, op_ret, op_errno, buf, preoldparent, + postoldparent, prenewparent, postnewparent, xdata); + return 0; +} + +int32_t +barrier_writev_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) +{ + barrier_local_free_gfid(frame); + STACK_UNWIND_STRICT(writev, frame, op_ret, op_errno, prebuf, postbuf, + xdata); + return 0; +} + +int32_t +barrier_fsync_cbk_resume(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) +{ + barrier_local_free_gfid(frame); + STACK_UNWIND_STRICT(fsync, frame, op_ret, op_errno, prebuf, postbuf, xdata); + return 0; +} + +int32_t +barrier_removexattr_cbk_resume(call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, int32_t op_errno, + dict_t *xdata) +{ + barrier_local_free_gfid(frame); + STACK_UNWIND_STRICT(removexattr, frame, op_ret, op_errno, xdata); + return 0; +} + +int32_t +barrier_fremovexattr_cbk_resume(call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, + int32_t op_errno, dict_t *xdata) +{ + barrier_local_free_gfid(frame); + STACK_UNWIND_STRICT(fremovexattr, frame, op_ret, op_errno, xdata); + return 0; +} + +int32_t +barrier_writev_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) +{ + BARRIER_FOP_CBK(writev, out, frame, this, op_ret, op_errno, prebuf, postbuf, + xdata); +out: + return 0; +} + +int32_t +barrier_fremovexattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ + BARRIER_FOP_CBK(fremovexattr, out, frame, this, op_ret, op_errno, xdata); +out: + return 0; +} + +int32_t +barrier_removexattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ + BARRIER_FOP_CBK(removexattr, out, frame, this, op_ret, op_errno, xdata); +out: + return 0; +} + +int32_t +barrier_truncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) +{ + BARRIER_FOP_CBK(truncate, out, frame, this, op_ret, op_errno, prebuf, + postbuf, xdata); +out: + return 0; +} + +int32_t +barrier_ftruncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) +{ + BARRIER_FOP_CBK(ftruncate, out, frame, this, op_ret, op_errno, prebuf, + postbuf, xdata); +out: + return 0; +} + +int32_t +barrier_rename_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *buf, + struct iatt *preoldparent, struct iatt *postoldparent, + struct iatt *prenewparent, struct iatt *postnewparent, + dict_t *xdata) +{ + BARRIER_FOP_CBK(rename, out, frame, this, op_ret, op_errno, buf, + preoldparent, postoldparent, prenewparent, postnewparent, + xdata); +out: + return 0; +} + +int32_t +barrier_rmdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *preparent, + struct iatt *postparent, dict_t *xdata) +{ + BARRIER_FOP_CBK(rmdir, out, frame, this, op_ret, op_errno, preparent, + postparent, xdata); +out: + return 0; +} + +int32_t +barrier_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *preparent, + struct iatt *postparent, dict_t *xdata) +{ + BARRIER_FOP_CBK(unlink, out, frame, this, op_ret, op_errno, preparent, + postparent, xdata); +out: + return 0; +} + +int32_t +barrier_fsync_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) +{ + BARRIER_FOP_CBK(fsync, out, frame, this, op_ret, op_errno, prebuf, postbuf, + xdata); +out: + return 0; +} + +int32_t +barrier_writev(call_frame_t *frame, xlator_t *this, fd_t *fd, + struct iovec *vector, int32_t count, off_t off, uint32_t flags, + struct iobref *iobref, dict_t *xdata) +{ + if (!((flags | fd->flags) & (O_SYNC | O_DSYNC))) { + STACK_WIND_TAIL(frame, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->writev, fd, vector, count, off, + flags, iobref, xdata); + + return 0; + } + + barrier_local_set_gfid(frame, fd->inode->gfid, this); + STACK_WIND(frame, barrier_writev_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->writev, fd, vector, count, off, flags, + iobref, xdata); + return 0; +} + +int32_t +barrier_fremovexattr(call_frame_t *frame, xlator_t *this, fd_t *fd, + const char *name, dict_t *xdata) +{ + barrier_local_set_gfid(frame, fd->inode->gfid, this); + STACK_WIND(frame, barrier_fremovexattr_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fremovexattr, fd, name, xdata); + return 0; +} + +int32_t +barrier_removexattr(call_frame_t *frame, xlator_t *this, loc_t *loc, + const char *name, dict_t *xdata) +{ + barrier_local_set_gfid(frame, loc->inode->gfid, this); + STACK_WIND(frame, barrier_removexattr_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->removexattr, loc, name, xdata); + return 0; +} + +int32_t +barrier_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset, + dict_t *xdata) +{ + barrier_local_set_gfid(frame, loc->inode->gfid, this); + STACK_WIND(frame, barrier_truncate_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->truncate, loc, offset, xdata); + return 0; +} + +int32_t +barrier_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, + loc_t *newloc, dict_t *xdata) +{ + barrier_local_set_gfid(frame, oldloc->inode->gfid, this); + STACK_WIND(frame, barrier_rename_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata); + return 0; +} + +int +barrier_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int flags, + dict_t *xdata) +{ + barrier_local_set_gfid(frame, loc->inode->gfid, this); + STACK_WIND(frame, barrier_rmdir_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->rmdir, loc, flags, xdata); + return 0; +} + +int32_t +barrier_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag, + dict_t *xdata) +{ + barrier_local_set_gfid(frame, loc->inode->gfid, this); + STACK_WIND(frame, barrier_unlink_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata); + return 0; +} + +int32_t +barrier_ftruncate(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, + dict_t *xdata) +{ + barrier_local_set_gfid(frame, fd->inode->gfid, this); + STACK_WIND(frame, barrier_ftruncate_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata); + return 0; +} + +int32_t +barrier_fsync(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags, + dict_t *xdata) +{ + barrier_local_set_gfid(frame, fd->inode->gfid, this); + STACK_WIND(frame, barrier_fsync_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsync, fd, flags, xdata); + return 0; +} + +call_stub_t * +__barrier_dequeue(xlator_t *this, struct list_head *queue) +{ + call_stub_t *stub = NULL; + barrier_priv_t *priv = NULL; + + priv = this->private; + GF_ASSERT(priv); + + if (list_empty(queue)) + goto out; + + stub = list_entry(queue->next, call_stub_t, list); + list_del_init(&stub->list); + +out: + return stub; +} + +void +barrier_dequeue_all(xlator_t *this, struct list_head *queue) +{ + call_stub_t *stub = NULL; + + gf_log(this->name, GF_LOG_INFO, "Dequeuing all the barriered fops"); + + /* TODO: Start the below task in a new thread */ + while ((stub = __barrier_dequeue(this, queue))) + call_resume(stub); + + gf_log(this->name, GF_LOG_INFO, + "Dequeuing the barriered fops is " + "finished"); + return; +} + +void +barrier_timeout(void *data) +{ + xlator_t *this = NULL; + barrier_priv_t *priv = NULL; + struct list_head queue = { + 0, + }; + + this = data; + THIS = this; + priv = this->private; + + INIT_LIST_HEAD(&queue); + + gf_log(this->name, GF_LOG_CRITICAL, + "Disabling barrier because of " + "the barrier timeout."); + + LOCK(&priv->lock); + { + __barrier_disable(this, &queue); + } + UNLOCK(&priv->lock); + + barrier_dequeue_all(this, &queue); + + return; +} + +void +__barrier_enqueue(xlator_t *this, call_stub_t *stub) +{ + barrier_priv_t *priv = NULL; + + priv = this->private; + GF_ASSERT(priv); + + list_add_tail(&stub->list, &priv->queue); + priv->queue_size++; + + return; +} + +void +__barrier_disable(xlator_t *this, struct list_head *queue) +{ + GF_UNUSED int ret = 0; + barrier_priv_t *priv = NULL; + + priv = this->private; + GF_ASSERT(priv); + + if (priv->timer) { + ret = gf_timer_call_cancel(this->ctx, priv->timer); + priv->timer = NULL; + } + + list_splice_init(&priv->queue, queue); + priv->queue_size = 0; + priv->barrier_enabled = _gf_false; +} + +int +__barrier_enable(xlator_t *this, barrier_priv_t *priv) +{ + int ret = -1; + + priv->timer = gf_timer_call_after(this->ctx, priv->timeout, barrier_timeout, + (void *)this); + if (!priv->timer) { + gf_log(this->name, GF_LOG_CRITICAL, + "Couldn't add barrier " + "timeout event."); + goto out; + } + + priv->barrier_enabled = _gf_true; + ret = 0; +out: + return ret; +} + +int +notify(xlator_t *this, int event, void *data, ...) +{ + barrier_priv_t *priv = this->private; + dict_t *dict = NULL; + int ret = -1; + int barrier_enabled = _gf_false; + struct list_head queue = { + 0, + }; + + GF_ASSERT(priv); + INIT_LIST_HEAD(&queue); + + switch (event) { + case GF_EVENT_TRANSLATOR_OP: { + dict = data; + barrier_enabled = dict_get_str_boolean(dict, "barrier", -1); + + if (barrier_enabled == -1) { + gf_log(this->name, GF_LOG_ERROR, + "Could not fetch " + " barrier key from the dictionary."); + goto out; + } + + LOCK(&priv->lock); + { + if (!priv->barrier_enabled) { + if (barrier_enabled) { + ret = __barrier_enable(this, priv); + } else { + UNLOCK(&priv->lock); + gf_log(this->name, GF_LOG_ERROR, "Already disabled."); + goto post_unlock; + } + } else { + if (!barrier_enabled) { + __barrier_disable(this, &queue); + ret = 0; + } else { + UNLOCK(&priv->lock); + gf_log(this->name, GF_LOG_ERROR, "Already enabled"); + goto post_unlock; + } + } + } + UNLOCK(&priv->lock); + post_unlock: + if (!list_empty(&queue)) + barrier_dequeue_all(this, &queue); + + break; + } + default: { + default_notify(this, event, data); + ret = 0; + goto out; + } + } +out: + return ret; +} + +int +reconfigure(xlator_t *this, dict_t *options) +{ + barrier_priv_t *priv = NULL; + int ret = -1; + gf_boolean_t barrier_enabled = _gf_false; + uint32_t timeout = { + 0, + }; + struct list_head queue = { + 0, + }; + + priv = this->private; + GF_ASSERT(priv); + + GF_OPTION_RECONF("barrier", barrier_enabled, options, bool, out); + GF_OPTION_RECONF("barrier-timeout", timeout, options, time, out); + + INIT_LIST_HEAD(&queue); + + LOCK(&priv->lock); + { + if (!priv->barrier_enabled) { + if (barrier_enabled) { + ret = __barrier_enable(this, priv); + if (ret) { + goto unlock; + } + } + } else { + if (!barrier_enabled) { + __barrier_disable(this, &queue); + } + } + priv->timeout.tv_sec = timeout; + ret = 0; + } +unlock: + UNLOCK(&priv->lock); + + if (!list_empty(&queue)) + barrier_dequeue_all(this, &queue); + +out: + return ret; +} + +int32_t +mem_acct_init(xlator_t *this) +{ + int ret = -1; + + ret = xlator_mem_acct_init(this, gf_barrier_mt_end + 1); + if (ret) + gf_log(this->name, GF_LOG_ERROR, + "Memory accounting " + "initialization failed."); + + return ret; +} + +int +init(xlator_t *this) +{ + int ret = -1; + barrier_priv_t *priv = NULL; + uint32_t timeout = { + 0, + }; + + if (!this->children || this->children->next) { + gf_log(this->name, GF_LOG_ERROR, + "'barrier' not configured with exactly one child"); + goto out; + } + + if (!this->parents) + gf_log(this->name, GF_LOG_WARNING, "dangling volume. check volfile "); + + priv = GF_CALLOC(1, sizeof(*priv), gf_barrier_mt_priv_t); + if (!priv) + goto out; + + LOCK_INIT(&priv->lock); + + GF_OPTION_INIT("barrier", priv->barrier_enabled, bool, out); + GF_OPTION_INIT("barrier-timeout", timeout, time, out); + priv->timeout.tv_sec = timeout; + + INIT_LIST_HEAD(&priv->queue); + + if (priv->barrier_enabled) { + ret = __barrier_enable(this, priv); + if (ret == -1) + goto out; + } + + this->private = priv; + ret = 0; +out: + if (ret && priv) + GF_FREE(priv); + + return ret; +} + +void +fini(xlator_t *this) +{ + barrier_priv_t *priv = NULL; + struct list_head queue = { + 0, + }; + + priv = this->private; + if (!priv) + goto out; + + INIT_LIST_HEAD(&queue); + + gf_log(this->name, GF_LOG_INFO, + "Disabling barriering and dequeuing " + "all the queued fops"); + LOCK(&priv->lock); + { + __barrier_disable(this, &queue); + } + UNLOCK(&priv->lock); + + if (!list_empty(&queue)) + barrier_dequeue_all(this, &queue); + + this->private = NULL; + + LOCK_DESTROY(&priv->lock); + GF_FREE(priv); +out: + return; +} + +static void +barrier_dump_stub(call_stub_t *stub, char *prefix) +{ + char key[GF_DUMP_MAX_BUF_LEN] = { + 0, + }; + + gf_proc_dump_build_key(key, prefix, "fop"); + gf_proc_dump_write(key, "%s", gf_fop_list[stub->fop]); + + if (stub->frame->local) { + gf_proc_dump_build_key(key, prefix, "gfid"); + gf_proc_dump_write(key, "%s", + uuid_utoa(*(uuid_t *)(stub->frame->local))); + } + if (stub->args.loc.path) { + gf_proc_dump_build_key(key, prefix, "path"); + gf_proc_dump_write(key, "%s", stub->args.loc.path); + } + if (stub->args.loc.name) { + gf_proc_dump_build_key(key, prefix, "name"); + gf_proc_dump_write(key, "%s", stub->args.loc.name); + } + + return; +} + +static void +__barrier_dump_queue(barrier_priv_t *priv) +{ + call_stub_t *stub = NULL; + char key[GF_DUMP_MAX_BUF_LEN] = { + 0, + }; + int i = 0; + + GF_VALIDATE_OR_GOTO("barrier", priv, out); + + list_for_each_entry(stub, &priv->queue, list) + { + snprintf(key, sizeof(key), "stub.%d", i++); + gf_proc_dump_add_section("%s", key); + barrier_dump_stub(stub, key); + } + +out: + return; +} + +int +barrier_dump_priv(xlator_t *this) +{ + int ret = -1; + char key[GF_DUMP_MAX_BUF_LEN] = { + 0, + }; + barrier_priv_t *priv = NULL; + + GF_VALIDATE_OR_GOTO("barrier", this, out); + + priv = this->private; + if (!priv) + return 0; + + gf_proc_dump_build_key(key, "xlator.features.barrier", "priv"); + gf_proc_dump_add_section("%s", key); + gf_proc_dump_build_key(key, "barrier", "enabled"); + + LOCK(&priv->lock); + { + gf_proc_dump_write(key, "%d", priv->barrier_enabled); + gf_proc_dump_build_key(key, "barrier", "timeout"); + gf_proc_dump_write(key, "%ld", priv->timeout.tv_sec); + if (priv->barrier_enabled) { + gf_proc_dump_build_key(key, "barrier", "queue_size"); + gf_proc_dump_write(key, "%d", priv->queue_size); + __barrier_dump_queue(priv); + } + } + UNLOCK(&priv->lock); + +out: + return ret; +} + +struct xlator_fops fops = { + + /* Barrier Class fops */ + .rmdir = barrier_rmdir, + .unlink = barrier_unlink, + .rename = barrier_rename, + .removexattr = barrier_removexattr, + .fremovexattr = barrier_fremovexattr, + .truncate = barrier_truncate, + .ftruncate = barrier_ftruncate, + .fsync = barrier_fsync, + + /* Writes with only O_SYNC flag */ + .writev = barrier_writev, +}; + +struct xlator_dumpops dumpops = { + .priv = barrier_dump_priv, +}; + +struct xlator_cbks cbks; + +struct volume_options options[] = { + {.key = {"barrier"}, + .type = GF_OPTION_TYPE_BOOL, + .default_value = "disable", + .op_version = {GD_OP_VERSION_3_6_0}, + .flags = OPT_FLAG_SETTABLE, + .description = "When \"enabled\", blocks acknowledgements to application " + "for file operations such as rmdir, rename, unlink, " + "removexattr, fremovexattr, truncate, ftruncate, " + "write (with O_SYNC), fsync. It is turned \"off\" by " + "default."}, + {.key = {"barrier-timeout"}, + .type = GF_OPTION_TYPE_TIME, + .default_value = BARRIER_TIMEOUT, + .op_version = {GD_OP_VERSION_3_6_0}, + .flags = OPT_FLAG_SETTABLE, + .description = "After 'timeout' seconds since the time 'barrier' " + "option was set to \"on\", acknowledgements to file " + "operations are no longer blocked and previously " + "blocked acknowledgements are sent to the application"}, + {.key = {NULL}}, +}; + +xlator_api_t xlator_api = { + .init = init, + .fini = fini, + .notify = notify, + .reconfigure = reconfigure, + .mem_acct_init = mem_acct_init, + .op_version = {1}, /* Present from the initial version */ + .dumpops = &dumpops, + .fops = &fops, + .cbks = &cbks, + .options = options, + .identifier = "barrier", + .category = GF_MAINTAINED, +}; |
