summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/src/changelog.c
diff options
context:
space:
mode:
authorVenky Shankar <vshankar@redhat.com>2015-02-03 19:22:16 +0530
committerVijay Bellur <vbellur@redhat.com>2015-03-18 18:22:36 -0700
commit4737584fffcd25dbe35d17b076c95bf90a422cf2 (patch)
tree9f30e0e90c88c245787b78af3ca78d7ae05e30f2 /xlators/features/changelog/src/changelog.c
parent728fcd41eb39f66744d84b979dd8195fd47313ed (diff)
features/changelog: RPC'fy {libgf}changelog
This patch introduces RPC based communication between the changelog translator and libgfchangelog. It replaces the old pathetic stream based interaction that existed earlier (due to time constraints :-/). Changelog, upon initialization, starts an RPC server (rpcsvc), allowing clients to invoke a probe API as a bootup mechanism to request event notifications. During probe, clients can choose an event filter specifying the type(s) of events they are interested in. As of now there is no way to change the event notification set once the probe RPC call is made, but that would be easy to implement. The actual event notification is done on a separate RPC session. The client (libgfchangelog) itself starts an RPC server, which the changelog translator "connects back" to during probe. Notifications are dispatched by a bunch of threads from the server (translator) and the client optionally orders them if ordered notifications are required. FOPs fill in their respective event details in a buffer (rot-buffs to be particular) and a bunch of threads (consumers) swap the buffers out of rotation and dispatch them via RPC. To avoid writer starvation, the number of dispatcher threads is one less than the number of buffer lists in rot-buffs. libgfchangelog becomes purely callback based -- upon event notification from the server (and after re-ordering the events if required) it invokes a callback routine specified by the consumer(s). A major part of the patch is also aimed at providing backward compatibility for geo-replication, which was one of the main consumers of the stream based API. Also, this patch does not "turn on" event notifications for all fops, just the few that are currently required. Another pain point is that the server does not filter events before dispatching them to the clients. That load is taken up by the client itself (although it's done at the library layer rather than making it hard on the callback implementor).
This needs improvement and care needs to be taken to not load the server up with expensive filtering mechanisms. Change-Id: Ibf60a432b68f2dfa60c6f9add2bcfd37a9c41395 BUG: 1170075 Signed-off-by: Venky Shankar <vshankar@redhat.com> Reviewed-on: http://review.gluster.org/9708 Reviewed-by: Jeff Darcy <jdarcy@redhat.com> Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators/features/changelog/src/changelog.c')
-rw-r--r--xlators/features/changelog/src/changelog.c473
1 files changed, 293 insertions, 180 deletions
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c
index 4263a462ad7..e7d8522ae8c 100644
--- a/xlators/features/changelog/src/changelog.c
+++ b/xlators/features/changelog/src/changelog.c
@@ -19,14 +19,13 @@
#include "iobuf.h"
#include "changelog-rt.h"
-#include "changelog-helpers.h"
#include "changelog-encoders.h"
#include "changelog-mem-types.h"
#include <pthread.h>
-#include "changelog-notifier.h"
+#include "changelog-rpc.h"
static struct changelog_bootstrap
cb_bootstrap[] = {
@@ -912,14 +911,30 @@ changelog_create_cbk (call_frame_t *frame,
struct iatt *preparent,
struct iatt *postparent, dict_t *xdata)
{
+ int32_t ret = 0;
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
+ changelog_event_t ev = {0,};
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+ /* fill the event structure.. similar to open() */
+ ev.ev_type = CHANGELOG_OP_TYPE_CREATE;
+ uuid_copy (ev.u.create.gfid, buf->ia_gfid);
+ ev.u.create.flags = fd->flags;
+ changelog_dispatch_event (this, priv, &ev);
+
+ if (changelog_ev_selected
+ (this, &priv->ev_selection, CHANGELOG_OP_TYPE_RELEASE)) {
+ ret = fd_ctx_set (fd, this, (uint64_t)(long) 0x1);
+ if (ret)
+ gf_log (this->name, GF_LOG_WARNING,
+ "could not set fd context (for release cbk)");
+ }
+
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
@@ -1633,6 +1648,92 @@ changelog_writev (call_frame_t *frame,
/* }}} */
+/* open, release and other beasts */
+
+/* {{{ */
+
+
+
+int
+changelog_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, fd_t *fd, dict_t *xdata)
+{
+ int ret = 0;
+ void *opaque = NULL;
+ char *buf = NULL;
+ ssize_t buflen = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_event_t ev = {0,};
+ gf_boolean_t logopen = _gf_false;
+
+ priv = this->private;
+ if (frame->local) {
+ frame->local = NULL;
+ logopen = _gf_true;
+ }
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !logopen), unwind);
+
+ /* fill the event structure */
+ ev.ev_type = CHANGELOG_OP_TYPE_OPEN;
+ uuid_copy (ev.u.open.gfid, fd->inode->gfid);
+ ev.u.open.flags = fd->flags;
+ changelog_dispatch_event (this, priv, &ev);
+
+ if (changelog_ev_selected
+ (this, &priv->ev_selection, CHANGELOG_OP_TYPE_RELEASE)) {
+ ret = fd_ctx_set (fd, this, (uint64_t)(long) 0x1);
+ if (ret)
+ gf_log (this->name, GF_LOG_WARNING,
+ "could not set fd context (for release cbk)");
+ }
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (open, frame, op_ret, op_errno, fd, xdata);
+ return 0;
+}
+
+int
+changelog_open (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int flags, fd_t *fd, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind);
+
+ frame->local = (void *)0x1; /* do not dereference in ->cbk */
+
+ wind:
+ STACK_WIND (frame, changelog_open_cbk, FIRST_CHILD (this),
+ FIRST_CHILD (this)->fops->open, loc, flags, fd, xdata);
+ return 0;
+}
+
+/* }}} */
+
+/* {{{ */
+
+int32_t
+changelog_release (xlator_t *this, fd_t *fd)
+{
+ changelog_event_t ev = {0,};
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+
+ ev.ev_type = CHANGELOG_OP_TYPE_RELEASE;
+ uuid_copy (ev.u.release.gfid, fd->inode->gfid);
+ changelog_dispatch_event (this, priv, &ev);
+
+ (void) fd_ctx_del (fd, this, NULL);
+
+ return 0;
+}
+
+
+/* }}} */
+
/**
* The
* - @init ()
@@ -1679,7 +1780,7 @@ changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv)
int ret = 0;
if (priv->cr.rollover_th) {
- changelog_thread_cleanup (this, priv->cr.rollover_th);
+ (void) changelog_thread_cleanup (this, priv->cr.rollover_th);
priv->cr.rollover_th = 0;
ret = close (priv->cr_wfd);
if (ret)
@@ -1689,7 +1790,7 @@ changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv)
}
if (priv->cf.fsync_th) {
- changelog_thread_cleanup (this, priv->cf.fsync_th);
+ (void) changelog_thread_cleanup (this, priv->cf.fsync_th);
priv->cf.fsync_th = 0;
}
}
@@ -1754,67 +1855,6 @@ changelog_spawn_helper_threads (xlator_t *this, changelog_priv_t *priv)
return ret;
}
-/* cleanup the notifier thread */
-static int
-changelog_cleanup_notifier (xlator_t *this, changelog_priv_t *priv)
-{
- int ret = 0;
-
- if (priv->cn.notify_th) {
- changelog_thread_cleanup (this, priv->cn.notify_th);
- priv->cn.notify_th = 0;
-
- ret = close (priv->wfd);
- if (ret)
- gf_log (this->name, GF_LOG_ERROR,
- "error closing writer end of notifier pipe"
- " (reason: %s)", strerror (errno));
- }
-
- return ret;
-}
-
-/* spawn the notifier thread - nop if already running */
-static int
-changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv)
-{
- int ret = 0;
- int flags = 0;
- int pipe_fd[2] = {0, 0};
-
- if (priv->cn.notify_th)
- goto out; /* notifier thread already running */
-
- ret = pipe (pipe_fd);
- if (ret == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "Cannot create pipe (reason: %s)", strerror (errno));
- goto out;
- }
-
- /* writer is non-blocking */
- flags = fcntl (pipe_fd[1], F_GETFL);
- flags |= O_NONBLOCK;
-
- ret = fcntl (pipe_fd[1], F_SETFL, flags);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to set O_NONBLOCK flag");
- goto out;
- }
-
- priv->wfd = pipe_fd[1];
-
- priv->cn.this = this;
- priv->cn.rfd = pipe_fd[0];
-
- ret = gf_thread_create (&priv->cn.notify_th,
- NULL, changelog_notifier, priv);
-
- out:
- return ret;
-}
-
int
notify (xlator_t *this, int event, void *data, ...)
{
@@ -2054,11 +2094,6 @@ changelog_init (xlator_t *this, changelog_priv_t *priv)
if (!priv->active)
return ret;
- /* spawn the notifier thread */
- ret = changelog_spawn_notifier (this, priv);
- if (ret)
- goto out;
-
/**
* start with a fresh changelog file every time. this is done
* in case there was an encoding change. so... things are kept
@@ -2086,9 +2121,11 @@ changelog_init (xlator_t *this, changelog_priv_t *priv)
return ret;
}
-/* Init all pthread condition variables and locks in changelog*/
+/**
+ * Init barrier related condition variables and locks
+ */
static int
-changelog_pthread_init (xlator_t *this, changelog_priv_t *priv)
+changelog_barrier_pthread_init (xlator_t *this, changelog_priv_t *priv)
{
gf_boolean_t bn_mutex_init = _gf_false;
gf_boolean_t bn_cond_init = _gf_false;
@@ -2165,9 +2202,9 @@ changelog_pthread_init (xlator_t *this, changelog_priv_t *priv)
return ret;
}
-/* Destroy all pthread condition variables and locks in changelog */
+/* Destroy barrier related condition variables and locks */
static inline void
-changelog_pthread_destroy (changelog_priv_t *priv)
+changelog_barrier_pthread_destroy (changelog_priv_t *priv)
{
pthread_mutex_destroy (&priv->bn.bnotify_mutex);
pthread_cond_destroy (&priv->bn.bnotify_cond);
@@ -2284,17 +2321,13 @@ reconfigure (xlator_t *this, dict_t *options)
}
htime_open(this, priv, tv.tv_sec);
}
- ret = changelog_spawn_notifier (this, priv);
- if (!ret)
- ret = changelog_spawn_helper_threads (this,
- priv);
- } else
- ret = changelog_cleanup_notifier (this, priv);
+ ret = changelog_spawn_helper_threads (this, priv);
+ }
}
out:
if (ret) {
- ret = changelog_cleanup_notifier (this, priv);
+ /* TODO */
} else {
gf_log (this->name, GF_LOG_DEBUG,
"changelog reconfigured");
@@ -2305,67 +2338,40 @@ reconfigure (xlator_t *this, dict_t *options)
return ret;
}
-int32_t
-init (xlator_t *this)
+static void
+changelog_freeup_options (xlator_t *this, changelog_priv_t *priv)
{
- int ret = -1;
- char *tmp = NULL;
- changelog_priv_t *priv = NULL;
- gf_boolean_t cond_lock_init = _gf_false;
- char htime_dir[PATH_MAX] = {0,};
- char csnap_dir[PATH_MAX] = {0,};
- uint32_t timeout = 0;
-
- GF_VALIDATE_OR_GOTO ("changelog", this, out);
-
- if (!this->children || this->children->next) {
- gf_log (this->name, GF_LOG_ERROR,
- "translator needs a single subvolume");
- goto out;
- }
-
- if (!this->parents) {
- gf_log (this->name, GF_LOG_ERROR,
- "dangling volume. please check volfile");
- goto out;
- }
-
- priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t);
- if (!priv)
- goto out;
+ int ret = 0;
- this->local_pool = mem_pool_new (changelog_local_t, 64);
- if (!this->local_pool) {
+ ret = priv->cb->dtor (this, &priv->cd);
+ if (ret)
gf_log (this->name, GF_LOG_ERROR,
- "failed to create local memory pool");
- goto out;
- }
-
- LOCK_INIT (&priv->lock);
- LOCK_INIT (&priv->c_snap_lock);
+ "could not cleanup bootstrapper");
+ GF_FREE (priv->changelog_brick);
+ GF_FREE (priv->changelog_dir);
+}
- GF_OPTION_INIT ("changelog-brick", tmp, str, out);
- if (!tmp) {
- gf_log (this->name, GF_LOG_ERROR,
- "\"changelog-brick\" option is not set");
- goto out;
- }
+static int
+changelog_init_options (xlator_t *this, changelog_priv_t *priv)
+{
+ int ret = 0;
+ char *tmp = NULL;
+ uint32_t timeout = 0;
+ char htime_dir[PATH_MAX] = {0,};
+ char csnap_dir[PATH_MAX] = {0,};
+ GF_OPTION_INIT ("changelog-brick", tmp, str, error_return);
priv->changelog_brick = gf_strdup (tmp);
if (!priv->changelog_brick)
- goto out;
- tmp = NULL;
+ goto error_return;
- GF_OPTION_INIT ("changelog-dir", tmp, str, out);
- if (!tmp) {
- gf_log (this->name, GF_LOG_ERROR,
- "\"changelog-dir\" option is not set");
- goto out;
- }
+ tmp = NULL;
+ GF_OPTION_INIT ("changelog-dir", tmp, str, dealloc_1);
priv->changelog_dir = gf_strdup (tmp);
if (!priv->changelog_dir)
- goto out;
+ goto dealloc_1;
+
tmp = NULL;
/**
@@ -2375,35 +2381,38 @@ init (xlator_t *this)
ret = mkdir_p (priv->changelog_dir, 0600, _gf_true);
if (ret)
- goto out;
+ goto dealloc_2;
- CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, htime_dir);
+ CHANGELOG_FILL_HTIME_DIR (priv->changelog_dir, htime_dir);
ret = mkdir_p (htime_dir, 0600, _gf_true);
if (ret)
- goto out;
+ goto dealloc_2;
- CHANGELOG_FILL_CSNAP_DIR(priv->changelog_dir, csnap_dir);
+ CHANGELOG_FILL_CSNAP_DIR (priv->changelog_dir, csnap_dir);
ret = mkdir_p (csnap_dir, 0600, _gf_true);
if (ret)
- goto out;
+ goto dealloc_2;
- GF_OPTION_INIT ("changelog", priv->active, bool, out);
+ GF_OPTION_INIT ("changelog", priv->active, bool, dealloc_2);
- GF_OPTION_INIT ("op-mode", tmp, str, out);
+ GF_OPTION_INIT ("op-mode", tmp, str, dealloc_2);
changelog_assign_opmode (priv, tmp);
tmp = NULL;
- GF_OPTION_INIT ("encoding", tmp, str, out);
+ GF_OPTION_INIT ("encoding", tmp, str, dealloc_2);
changelog_assign_encoding (priv, tmp);
+ changelog_encode_change (priv);
- GF_OPTION_INIT ("rollover-time", priv->rollover_time, int32, out);
+ GF_OPTION_INIT ("rollover-time",
+ priv->rollover_time, int32, dealloc_2);
- GF_OPTION_INIT ("fsync-interval", priv->fsync_interval, int32, out);
- GF_OPTION_INIT ("changelog-barrier-timeout", timeout, time, out);
- priv->timeout.tv_sec = timeout;
+ GF_OPTION_INIT ("fsync-interval",
+ priv->fsync_interval, int32, dealloc_2);
- changelog_encode_change(priv);
+ GF_OPTION_INIT ("changelog-barrier-timeout",
+ timeout, time, dealloc_2);
+ changelog_assign_barrier_timeout (priv, timeout);
GF_ASSERT (cb_bootstrap[priv->op_mode].mode == priv->op_mode);
priv->cb = &cb_bootstrap[priv->op_mode];
@@ -2411,10 +2420,111 @@ init (xlator_t *this)
/* ... now bootstrap the logger */
ret = priv->cb->ctor (this, &priv->cd);
if (ret)
- goto out;
+ goto dealloc_2;
priv->changelog_fd = -1;
+ return 0;
+
+ dealloc_2:
+ GF_FREE (priv->changelog_dir);
+ dealloc_1:
+ GF_FREE (priv->changelog_brick);
+ error_return:
+ return -1;
+}
+
+static void
+changelog_cleanup_rpc (xlator_t *this, changelog_priv_t *priv)
+{
+ /* terminate rpc server */
+ changelog_destroy_rpc_listner (this, priv);
+
+ /* cleanup rot buffs */
+ rbuf_dtor (priv->rbuf);
+
+ /* cleanup poller thread */
+ (void) changelog_thread_cleanup (this, priv->poller);
+}
+
+static int
+changelog_init_rpc (xlator_t *this, changelog_priv_t *priv)
+{
+ int ret = 0;
+ rpcsvc_t *rpc = NULL;
+ changelog_ev_selector_t *selection = NULL;
+
+ selection = &priv->ev_selection;
+
+ /* initialize event selection */
+ changelog_init_event_selection (this, selection);
+
+ ret = pthread_create (&priv->poller, NULL, changelog_rpc_poller, this);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to spawn poller thread");
+ goto error_return;
+ }
+
+ priv->rbuf = rbuf_init (NR_ROTT_BUFFS);
+ if (!priv->rbuf)
+ goto cleanup_thread;
+
+ rpc = changelog_init_rpc_listner (this, priv,
+ priv->rbuf, NR_DISPATCHERS);
+ if (!rpc)
+ goto cleanup_rbuf;
+ priv->rpc = rpc;
+
+ return 0;
+
+ cleanup_rbuf:
+ rbuf_dtor (priv->rbuf);
+ cleanup_thread:
+ (void) changelog_thread_cleanup (this, priv->poller);
+ error_return:
+ return -1;
+}
+
+int32_t
+init (xlator_t *this)
+{
+ int ret = -1;
+ char *tmp = NULL;
+ changelog_priv_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO ("changelog", this, error_return);
+
+ if (!this->children || this->children->next) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "translator needs a single subvolume");
+ goto error_return;
+ }
+
+ if (!this->parents) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "dangling volume. please check volfile");
+ goto error_return;
+ }
+
+ priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t);
+ if (!priv)
+ goto error_return;
+
+ this->local_pool = mem_pool_new (changelog_local_t, 64);
+ if (!this->local_pool) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to create local memory pool");
+ goto cleanup_priv;
+ }
+
+ LOCK_INIT (&priv->lock);
+ LOCK_INIT (&priv->c_snap_lock);
+
+ ret = changelog_init_options (this, priv);
+ if (ret)
+ goto cleanup_mempool;
+
/* snap dependency changes */
priv->dm.black_fop_cnt = 0;
priv->dm.white_fop_cnt = 0;
@@ -2422,67 +2532,68 @@ init (xlator_t *this)
priv->dm.drain_wait_white = _gf_false;
priv->current_color = FOP_COLOR_BLACK;
priv->explicit_rollover = _gf_false;
+
/* Mutex is not needed as threads are not spawned yet */
priv->bn.bnotify = _gf_false;
- ret = changelog_pthread_init (this, priv);
+ ret = changelog_barrier_pthread_init (this, priv);
if (ret)
- goto out;
-
+ goto cleanup_options;
LOCK_INIT (&priv->bflags.lock);
- cond_lock_init = _gf_true;
priv->bflags.barrier_ext = _gf_false;
/* Changelog barrier init */
INIT_LIST_HEAD (&priv->queue);
priv->barrier_enabled = _gf_false;
- ret = changelog_init (this, priv);
+ /* RPC ball rolling.. */
+ ret = changelog_init_rpc (this, priv);
if (ret)
- goto out;
+ goto cleanup_barrier;
+ ret = changelog_init (this, priv);
+ if (ret)
+ goto cleanup_rpc;
gf_log (this->name, GF_LOG_DEBUG, "changelog translator loaded");
- out:
- if (ret) {
- if (this && this->local_pool)
- mem_pool_destroy (this->local_pool);
- if (priv) {
- if (priv->cb) {
- ret = priv->cb->dtor (this, &priv->cd);
- if (ret)
- gf_log (this->name, GF_LOG_ERROR,
- "error in cleanup during init()");
- }
- GF_FREE (priv->changelog_brick);
- GF_FREE (priv->changelog_dir);
- if (cond_lock_init)
- changelog_pthread_destroy (priv);
- GF_FREE (priv);
- }
- this->private = NULL;
- } else
- this->private = priv;
+ this->private = priv;
+ return 0;
- return ret;
+ cleanup_rpc:
+ changelog_cleanup_rpc (this, priv);
+ cleanup_barrier:
+ changelog_barrier_pthread_destroy (priv);
+ cleanup_options:
+ changelog_freeup_options (this, priv);
+ cleanup_mempool:
+ mem_pool_destroy (this->local_pool);
+ cleanup_priv:
+ GF_FREE (priv);
+ error_return:
+ this->private = NULL;
+ return -1;
}
void
fini (xlator_t *this)
{
- int ret = -1;
changelog_priv_t *priv = NULL;
priv = this->private;
if (priv) {
- ret = priv->cb->dtor (this, &priv->cd);
- if (ret)
- gf_log (this->name, GF_LOG_ERROR,
- "error in fini");
+ /* terminate RPC server/threads */
+ changelog_cleanup_rpc (this, priv);
+
+ /* cleanup barrier related objects */
+ changelog_barrier_pthread_destroy (priv);
+
+ /* cleanup allocated options */
+ changelog_freeup_options (this, priv);
+
+ /* deallocate mempool */
mem_pool_destroy (this->local_pool);
- GF_FREE (priv->changelog_brick);
- GF_FREE (priv->changelog_dir);
- changelog_pthread_destroy (priv);
+
+ /* finally, deallocate private variable */
GF_FREE (priv);
}
@@ -2492,6 +2603,7 @@ fini (xlator_t *this)
}
struct xlator_fops fops = {
+ .open = changelog_open,
.mknod = changelog_mknod,
.mkdir = changelog_mkdir,
.create = changelog_create,
@@ -2513,6 +2625,7 @@ struct xlator_fops fops = {
struct xlator_cbks cbks = {
.forget = changelog_forget,
+ .release = changelog_release,
};
struct volume_options options[] = {