diff options
author | Venky Shankar <vshankar@redhat.com> | 2015-02-03 19:22:16 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2015-03-18 18:22:36 -0700 |
commit | 4737584fffcd25dbe35d17b076c95bf90a422cf2 (patch) | |
tree | 9f30e0e90c88c245787b78af3ca78d7ae05e30f2 /xlators/features/changelog/src/changelog.c | |
parent | 728fcd41eb39f66744d84b979dd8195fd47313ed (diff) |
features/changelog: RPC'fy {libgf}changelog
This patch introduces RPC based communication between the changelog
translator and libgfchangelog. It replaces the old pathetic stream
based interaction that existed earlier (due to time constraints :-/).
Changelog, upon initialization starts a RPC server (rpcsvc) allowing
clients to invoke a probe API as a bootup mechanism to request for
event notifications. During probe, clients can choose an event
filter specifying the type(s) of events they are interested in. As
of now there is no way to change the event notification set once
the probe RPC call is made, but that is easier to implement.
The actual event notifications is done on a separate RPC session.
The client (libgfchangelog) itself starts and RPC server which the
changelog translator "connects back" during probe. Notifications
are dispatched by a bunch of threads from the server (translator)
and the client optionally orders them if ordered notifications
are requried. FOPs fill in their respective event details in a
buffer (rot-buffs to be particular) and a bunch of threads
(consumers) swap the buffers out of roatation and dispatch them
via RPC. To avoid writer starvation, then number of dispatcher
threads is one less than the number of buffer list in rot-buffs.x
libgfchangelog becomes purely callback based -- upon event
notification from the server (and re-ordering them if required)
invoke a callback routine specified by consumer(s).
A major part of the patch is also aimed at providing backward
compatibility for geo-replication, which was one of the main
consumer of the stream based API. Also, this patch does not\
"turn on" event notifications for all fops, just a bunch which
is currently in requirement. Another pain point is that the
server does not filter events before dispatching it to the
clients. That load is taken up by the client itself (although
it's done at the library layer rather than making it hard on
the callback implementor). This needs improvement and care
needs to be taken to not load the server up with expensive
filtering mechanisms.
Change-Id: Ibf60a432b68f2dfa60c6f9add2bcfd37a9c41395
BUG: 1170075
Signed-off-by: Venky Shankar <vshankar@redhat.com>
Reviewed-on: http://review.gluster.org/9708
Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators/features/changelog/src/changelog.c')
-rw-r--r-- | xlators/features/changelog/src/changelog.c | 473 |
1 files changed, 293 insertions, 180 deletions
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c index 4263a462ad7..e7d8522ae8c 100644 --- a/xlators/features/changelog/src/changelog.c +++ b/xlators/features/changelog/src/changelog.c @@ -19,14 +19,13 @@ #include "iobuf.h" #include "changelog-rt.h" -#include "changelog-helpers.h" #include "changelog-encoders.h" #include "changelog-mem-types.h" #include <pthread.h> -#include "changelog-notifier.h" +#include "changelog-rpc.h" static struct changelog_bootstrap cb_bootstrap[] = { @@ -912,14 +911,30 @@ changelog_create_cbk (call_frame_t *frame, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { + int32_t ret = 0; changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; + changelog_event_t ev = {0,}; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); + /* fill the event structure.. similar to open() */ + ev.ev_type = CHANGELOG_OP_TYPE_CREATE; + uuid_copy (ev.u.create.gfid, buf->ia_gfid); + ev.u.create.flags = fd->flags; + changelog_dispatch_event (this, priv, &ev); + + if (changelog_ev_selected + (this, &priv->ev_selection, CHANGELOG_OP_TYPE_RELEASE)) { + ret = fd_ctx_set (fd, this, (uint64_t)(long) 0x1); + if (ret) + gf_log (this->name, GF_LOG_WARNING, + "could not set fd context (for release cbk)"); + } + changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: @@ -1633,6 +1648,92 @@ changelog_writev (call_frame_t *frame, /* }}} */ +/* open, release and other beasts */ + +/* {{{ */ + + + +int +changelog_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int op_ret, int op_errno, fd_t *fd, dict_t *xdata) +{ + int ret = 0; + void *opaque = NULL; + char *buf = NULL; + ssize_t buflen = 0; + changelog_priv_t *priv = NULL; + changelog_event_t ev = {0,}; + gf_boolean_t logopen = _gf_false; + + priv = this->private; + if (frame->local) { + frame->local = NULL; + logopen = _gf_true; + } + + CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !logopen), unwind); + + /* fill the event structure */ + ev.ev_type = CHANGELOG_OP_TYPE_OPEN; + uuid_copy (ev.u.open.gfid, fd->inode->gfid); + ev.u.open.flags = fd->flags; + changelog_dispatch_event (this, priv, &ev); + + if (changelog_ev_selected + (this, &priv->ev_selection, CHANGELOG_OP_TYPE_RELEASE)) { + ret = fd_ctx_set (fd, this, (uint64_t)(long) 0x1); + if (ret) + gf_log (this->name, GF_LOG_WARNING, + "could not set fd context (for release cbk)"); + } + + unwind: + CHANGELOG_STACK_UNWIND (open, frame, op_ret, op_errno, fd, xdata); + return 0; +} + +int +changelog_open (call_frame_t *frame, xlator_t *this, + loc_t *loc, int flags, fd_t *fd, dict_t *xdata) +{ + changelog_priv_t *priv = NULL; + + priv = this->private; + CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); + + frame->local = (void *)0x1; /* do not dereference in ->cbk */ + + wind: + STACK_WIND (frame, changelog_open_cbk, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->open, loc, flags, fd, xdata); + return 0; +} + +/* }}} */ + +/* {{{ */ + +int32_t +changelog_release (xlator_t *this, fd_t *fd) +{ + changelog_event_t ev = {0,}; + changelog_priv_t *priv = NULL; + + priv = this->private; + + ev.ev_type = CHANGELOG_OP_TYPE_RELEASE; + uuid_copy (ev.u.release.gfid, fd->inode->gfid); + changelog_dispatch_event (this, priv, &ev); + + (void) fd_ctx_del (fd, this, NULL); + + return 0; +} + + +/* }}} */ + /** * The * - @init () @@ -1679,7 +1780,7 @@ changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv) int ret = 0; if (priv->cr.rollover_th) { - changelog_thread_cleanup (this, priv->cr.rollover_th); + (void) changelog_thread_cleanup (this, priv->cr.rollover_th); priv->cr.rollover_th = 0; ret = close (priv->cr_wfd); if (ret) @@ -1689,7 +1790,7 @@ changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv) } if (priv->cf.fsync_th) { - changelog_thread_cleanup (this, priv->cf.fsync_th); + (void) changelog_thread_cleanup (this, priv->cf.fsync_th); priv->cf.fsync_th = 0; } } @@ -1754,67 +1855,6 @@ changelog_spawn_helper_threads (xlator_t *this, changelog_priv_t *priv) return ret; } -/* cleanup the notifier thread */ -static int -changelog_cleanup_notifier (xlator_t *this, changelog_priv_t *priv) -{ - int ret = 0; - - if (priv->cn.notify_th) { - changelog_thread_cleanup (this, priv->cn.notify_th); - priv->cn.notify_th = 0; - - ret = close (priv->wfd); - if (ret) - gf_log (this->name, GF_LOG_ERROR, - "error closing writer end of notifier pipe" - " (reason: %s)", strerror (errno)); - } - - return ret; -} - -/* spawn the notifier thread - nop if already running */ -static int -changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv) -{ - int ret = 0; - int flags = 0; - int pipe_fd[2] = {0, 0}; - - if (priv->cn.notify_th) - goto out; /* notifier thread already running */ - - ret = pipe (pipe_fd); - if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "Cannot create pipe (reason: %s)", strerror (errno)); - goto out; - } - - /* writer is non-blocking */ - flags = fcntl (pipe_fd[1], F_GETFL); - flags |= O_NONBLOCK; - - ret = fcntl (pipe_fd[1], F_SETFL, flags); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "failed to set O_NONBLOCK flag"); - goto out; - } - - priv->wfd = pipe_fd[1]; - - priv->cn.this = this; - priv->cn.rfd = pipe_fd[0]; - - ret = gf_thread_create (&priv->cn.notify_th, - NULL, changelog_notifier, priv); - - out: - return ret; -} - int notify (xlator_t *this, int event, void *data, ...) { @@ -2054,11 +2094,6 @@ changelog_init (xlator_t *this, changelog_priv_t *priv) if (!priv->active) return ret; - /* spawn the notifier thread */ - ret = changelog_spawn_notifier (this, priv); - if (ret) - goto out; - /** * start with a fresh changelog file every time. this is done * in case there was an encoding change. so... things are kept @@ -2086,9 +2121,11 @@ changelog_init (xlator_t *this, changelog_priv_t *priv) return ret; } -/* Init all pthread condition variables and locks in changelog*/ +/** + * Init barrier related condition variables and locks + */ static int -changelog_pthread_init (xlator_t *this, changelog_priv_t *priv) +changelog_barrier_pthread_init (xlator_t *this, changelog_priv_t *priv) { gf_boolean_t bn_mutex_init = _gf_false; gf_boolean_t bn_cond_init = _gf_false; @@ -2165,9 +2202,9 @@ changelog_pthread_init (xlator_t *this, changelog_priv_t *priv) return ret; } -/* Destroy all pthread condition variables and locks in changelog */ +/* Destroy barrier related condition variables and locks */ static inline void -changelog_pthread_destroy (changelog_priv_t *priv) +changelog_barrier_pthread_destroy (changelog_priv_t *priv) { pthread_mutex_destroy (&priv->bn.bnotify_mutex); pthread_cond_destroy (&priv->bn.bnotify_cond); @@ -2284,17 +2321,13 @@ reconfigure (xlator_t *this, dict_t *options) } htime_open(this, priv, tv.tv_sec); } - ret = changelog_spawn_notifier (this, priv); - if (!ret) - ret = changelog_spawn_helper_threads (this, - priv); - } else - ret = changelog_cleanup_notifier (this, priv); + ret = changelog_spawn_helper_threads (this, priv); + } } out: if (ret) { - ret = changelog_cleanup_notifier (this, priv); + /* TODO */ } else { gf_log (this->name, GF_LOG_DEBUG, "changelog reconfigured"); @@ -2305,67 +2338,40 @@ reconfigure (xlator_t *this, dict_t *options) return ret; } -int32_t -init (xlator_t *this) +static void +changelog_freeup_options (xlator_t *this, changelog_priv_t *priv) { - int ret = -1; - char *tmp = NULL; - changelog_priv_t *priv = NULL; - gf_boolean_t cond_lock_init = _gf_false; - char htime_dir[PATH_MAX] = {0,}; - char csnap_dir[PATH_MAX] = {0,}; - uint32_t timeout = 0; - - GF_VALIDATE_OR_GOTO ("changelog", this, out); - - if (!this->children || this->children->next) { - gf_log (this->name, GF_LOG_ERROR, - "translator needs a single subvolume"); - goto out; - } - - if (!this->parents) { - gf_log (this->name, GF_LOG_ERROR, - "dangling volume. please check volfile"); - goto out; - } - - priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t); - if (!priv) - goto out; + int ret = 0; - this->local_pool = mem_pool_new (changelog_local_t, 64); - if (!this->local_pool) { + ret = priv->cb->dtor (this, &priv->cd); + if (ret) gf_log (this->name, GF_LOG_ERROR, - "failed to create local memory pool"); - goto out; - } - - LOCK_INIT (&priv->lock); - LOCK_INIT (&priv->c_snap_lock); + "could not cleanup bootstrapper"); + GF_FREE (priv->changelog_brick); + GF_FREE (priv->changelog_dir); +} - GF_OPTION_INIT ("changelog-brick", tmp, str, out); - if (!tmp) { - gf_log (this->name, GF_LOG_ERROR, - "\"changelog-brick\" option is not set"); - goto out; - } +static int +changelog_init_options (xlator_t *this, changelog_priv_t *priv) +{ + int ret = 0; + char *tmp = NULL; + uint32_t timeout = 0; + char htime_dir[PATH_MAX] = {0,}; + char csnap_dir[PATH_MAX] = {0,}; + GF_OPTION_INIT ("changelog-brick", tmp, str, error_return); priv->changelog_brick = gf_strdup (tmp); if (!priv->changelog_brick) - goto out; - tmp = NULL; + goto error_return; - GF_OPTION_INIT ("changelog-dir", tmp, str, out); - if (!tmp) { - gf_log (this->name, GF_LOG_ERROR, - "\"changelog-dir\" option is not set"); - goto out; - } + tmp = NULL; + GF_OPTION_INIT ("changelog-dir", tmp, str, dealloc_1); priv->changelog_dir = gf_strdup (tmp); if (!priv->changelog_dir) - goto out; + goto dealloc_1; + tmp = NULL; /** @@ -2375,35 +2381,38 @@ init (xlator_t *this) ret = mkdir_p (priv->changelog_dir, 0600, _gf_true); if (ret) - goto out; + goto dealloc_2; - CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, htime_dir); + CHANGELOG_FILL_HTIME_DIR (priv->changelog_dir, htime_dir); ret = mkdir_p (htime_dir, 0600, _gf_true); if (ret) - goto out; + goto dealloc_2; - CHANGELOG_FILL_CSNAP_DIR(priv->changelog_dir, csnap_dir); + CHANGELOG_FILL_CSNAP_DIR (priv->changelog_dir, csnap_dir); ret = mkdir_p (csnap_dir, 0600, _gf_true); if (ret) - goto out; + goto dealloc_2; - GF_OPTION_INIT ("changelog", priv->active, bool, out); + GF_OPTION_INIT ("changelog", priv->active, bool, dealloc_2); - GF_OPTION_INIT ("op-mode", tmp, str, out); + GF_OPTION_INIT ("op-mode", tmp, str, dealloc_2); changelog_assign_opmode (priv, tmp); tmp = NULL; - GF_OPTION_INIT ("encoding", tmp, str, out); + GF_OPTION_INIT ("encoding", tmp, str, dealloc_2); changelog_assign_encoding (priv, tmp); + changelog_encode_change (priv); - GF_OPTION_INIT ("rollover-time", priv->rollover_time, int32, out); + GF_OPTION_INIT ("rollover-time", + priv->rollover_time, int32, dealloc_2); - GF_OPTION_INIT ("fsync-interval", priv->fsync_interval, int32, out); - GF_OPTION_INIT ("changelog-barrier-timeout", timeout, time, out); - priv->timeout.tv_sec = timeout; + GF_OPTION_INIT ("fsync-interval", + priv->fsync_interval, int32, dealloc_2); - changelog_encode_change(priv); + GF_OPTION_INIT ("changelog-barrier-timeout", + timeout, time, dealloc_2); + changelog_assign_barrier_timeout (priv, timeout); GF_ASSERT (cb_bootstrap[priv->op_mode].mode == priv->op_mode); priv->cb = &cb_bootstrap[priv->op_mode]; @@ -2411,10 +2420,111 @@ init (xlator_t *this) /* ... now bootstrap the logger */ ret = priv->cb->ctor (this, &priv->cd); if (ret) - goto out; + goto dealloc_2; priv->changelog_fd = -1; + return 0; + + dealloc_2: + GF_FREE (priv->changelog_dir); + dealloc_1: + GF_FREE (priv->changelog_brick); + error_return: + return -1; +} + +static void +changelog_cleanup_rpc (xlator_t *this, changelog_priv_t *priv) +{ + /* terminate rpc server */ + changelog_destroy_rpc_listner (this, priv); + + /* cleanup rot buffs */ + rbuf_dtor (priv->rbuf); + + /* cleanup poller thread */ + (void) changelog_thread_cleanup (this, priv->poller); +} + +static int +changelog_init_rpc (xlator_t *this, changelog_priv_t *priv) +{ + int ret = 0; + rpcsvc_t *rpc = NULL; + changelog_ev_selector_t *selection = NULL; + + selection = &priv->ev_selection; + + /* initialize event selection */ + changelog_init_event_selection (this, selection); + + ret = pthread_create (&priv->poller, NULL, changelog_rpc_poller, this); + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to spawn poller thread"); + goto error_return; + } + + priv->rbuf = rbuf_init (NR_ROTT_BUFFS); + if (!priv->rbuf) + goto cleanup_thread; + + rpc = changelog_init_rpc_listner (this, priv, + priv->rbuf, NR_DISPATCHERS); + if (!rpc) + goto cleanup_rbuf; + priv->rpc = rpc; + + return 0; + + cleanup_rbuf: + rbuf_dtor (priv->rbuf); + cleanup_thread: + (void) changelog_thread_cleanup (this, priv->poller); + error_return: + return -1; +} + +int32_t +init (xlator_t *this) +{ + int ret = -1; + char *tmp = NULL; + changelog_priv_t *priv = NULL; + + GF_VALIDATE_OR_GOTO ("changelog", this, error_return); + + if (!this->children || this->children->next) { + gf_log (this->name, GF_LOG_ERROR, + "translator needs a single subvolume"); + goto error_return; + } + + if (!this->parents) { + gf_log (this->name, GF_LOG_ERROR, + "dangling volume. please check volfile"); + goto error_return; + } + + priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t); + if (!priv) + goto error_return; + + this->local_pool = mem_pool_new (changelog_local_t, 64); + if (!this->local_pool) { + gf_log (this->name, GF_LOG_ERROR, + "failed to create local memory pool"); + goto cleanup_priv; + } + + LOCK_INIT (&priv->lock); + LOCK_INIT (&priv->c_snap_lock); + + ret = changelog_init_options (this, priv); + if (ret) + goto cleanup_mempool; + /* snap dependency changes */ priv->dm.black_fop_cnt = 0; priv->dm.white_fop_cnt = 0; @@ -2422,67 +2532,68 @@ init (xlator_t *this) priv->dm.drain_wait_white = _gf_false; priv->current_color = FOP_COLOR_BLACK; priv->explicit_rollover = _gf_false; + /* Mutex is not needed as threads are not spawned yet */ priv->bn.bnotify = _gf_false; - ret = changelog_pthread_init (this, priv); + ret = changelog_barrier_pthread_init (this, priv); if (ret) - goto out; - + goto cleanup_options; LOCK_INIT (&priv->bflags.lock); - cond_lock_init = _gf_true; priv->bflags.barrier_ext = _gf_false; /* Changelog barrier init */ INIT_LIST_HEAD (&priv->queue); priv->barrier_enabled = _gf_false; - ret = changelog_init (this, priv); + /* RPC ball rolling.. */ + ret = changelog_init_rpc (this, priv); if (ret) - goto out; + goto cleanup_barrier; + ret = changelog_init (this, priv); + if (ret) + goto cleanup_rpc; gf_log (this->name, GF_LOG_DEBUG, "changelog translator loaded"); - out: - if (ret) { - if (this && this->local_pool) - mem_pool_destroy (this->local_pool); - if (priv) { - if (priv->cb) { - ret = priv->cb->dtor (this, &priv->cd); - if (ret) - gf_log (this->name, GF_LOG_ERROR, - "error in cleanup during init()"); - } - GF_FREE (priv->changelog_brick); - GF_FREE (priv->changelog_dir); - if (cond_lock_init) - changelog_pthread_destroy (priv); - GF_FREE (priv); - } - this->private = NULL; - } else - this->private = priv; + this->private = priv; + return 0; - return ret; + cleanup_rpc: + changelog_cleanup_rpc (this, priv); + cleanup_barrier: + changelog_barrier_pthread_destroy (priv); + cleanup_options: + changelog_freeup_options (this, priv); + cleanup_mempool: + mem_pool_destroy (this->local_pool); + cleanup_priv: + GF_FREE (priv); + error_return: + this->private = NULL; + return -1; } void fini (xlator_t *this) { - int ret = -1; changelog_priv_t *priv = NULL; priv = this->private; if (priv) { - ret = priv->cb->dtor (this, &priv->cd); - if (ret) - gf_log (this->name, GF_LOG_ERROR, - "error in fini"); + /* terminate RPC server/threads */ + changelog_cleanup_rpc (this, priv); + + /* cleanup barrier related objects */ + changelog_barrier_pthread_destroy (priv); + + /* cleanup allocated options */ + changelog_freeup_options (this, priv); + + /* deallocate mempool */ mem_pool_destroy (this->local_pool); - GF_FREE (priv->changelog_brick); - GF_FREE (priv->changelog_dir); - changelog_pthread_destroy (priv); + + /* finally, dealloac private variable */ GF_FREE (priv); } @@ -2492,6 +2603,7 @@ fini (xlator_t *this) } struct xlator_fops fops = { + .open = changelog_open, .mknod = changelog_mknod, .mkdir = changelog_mkdir, .create = changelog_create, @@ -2513,6 +2625,7 @@ struct xlator_fops fops = { struct xlator_cbks cbks = { .forget = changelog_forget, + .release = changelog_release, }; struct volume_options options[] = { |