summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/src/changelog.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/src/changelog.c')
-rw-r--r--xlators/features/changelog/src/changelog.c473
1 files changed, 293 insertions, 180 deletions
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c
index 4263a462ad7..e7d8522ae8c 100644
--- a/xlators/features/changelog/src/changelog.c
+++ b/xlators/features/changelog/src/changelog.c
@@ -19,14 +19,13 @@
#include "iobuf.h"
#include "changelog-rt.h"
-#include "changelog-helpers.h"
#include "changelog-encoders.h"
#include "changelog-mem-types.h"
#include <pthread.h>
-#include "changelog-notifier.h"
+#include "changelog-rpc.h"
static struct changelog_bootstrap
cb_bootstrap[] = {
@@ -912,14 +911,30 @@ changelog_create_cbk (call_frame_t *frame,
struct iatt *preparent,
struct iatt *postparent, dict_t *xdata)
{
+ int32_t ret = 0;
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
+ changelog_event_t ev = {0,};
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+ /* fill the event structure.. similar to open() */
+ ev.ev_type = CHANGELOG_OP_TYPE_CREATE;
+ uuid_copy (ev.u.create.gfid, buf->ia_gfid);
+ ev.u.create.flags = fd->flags;
+ changelog_dispatch_event (this, priv, &ev);
+
+ if (changelog_ev_selected
+ (this, &priv->ev_selection, CHANGELOG_OP_TYPE_RELEASE)) {
+ ret = fd_ctx_set (fd, this, (uint64_t)(long) 0x1);
+ if (ret)
+ gf_log (this->name, GF_LOG_WARNING,
+ "could not set fd context (for release cbk)");
+ }
+
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
@@ -1633,6 +1648,92 @@ changelog_writev (call_frame_t *frame,
/* }}} */
+/* open, release and other beasts */
+
+/* {{{ */
+
+
+
+int
+changelog_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, fd_t *fd, dict_t *xdata)
+{
+ int ret = 0;
+ void *opaque = NULL;
+ char *buf = NULL;
+ ssize_t buflen = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_event_t ev = {0,};
+ gf_boolean_t logopen = _gf_false;
+
+ priv = this->private;
+ if (frame->local) {
+ frame->local = NULL;
+ logopen = _gf_true;
+ }
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !logopen), unwind);
+
+ /* fill the event structure */
+ ev.ev_type = CHANGELOG_OP_TYPE_OPEN;
+ uuid_copy (ev.u.open.gfid, fd->inode->gfid);
+ ev.u.open.flags = fd->flags;
+ changelog_dispatch_event (this, priv, &ev);
+
+ if (changelog_ev_selected
+ (this, &priv->ev_selection, CHANGELOG_OP_TYPE_RELEASE)) {
+ ret = fd_ctx_set (fd, this, (uint64_t)(long) 0x1);
+ if (ret)
+ gf_log (this->name, GF_LOG_WARNING,
+ "could not set fd context (for release cbk)");
+ }
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (open, frame, op_ret, op_errno, fd, xdata);
+ return 0;
+}
+
+int
+changelog_open (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int flags, fd_t *fd, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind);
+
+ frame->local = (void *)0x1; /* do not dereference in ->cbk */
+
+ wind:
+ STACK_WIND (frame, changelog_open_cbk, FIRST_CHILD (this),
+ FIRST_CHILD (this)->fops->open, loc, flags, fd, xdata);
+ return 0;
+}
+
+/* }}} */
+
+/* {{{ */
+
+int32_t
+changelog_release (xlator_t *this, fd_t *fd)
+{
+ changelog_event_t ev = {0,};
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+
+ ev.ev_type = CHANGELOG_OP_TYPE_RELEASE;
+ uuid_copy (ev.u.release.gfid, fd->inode->gfid);
+ changelog_dispatch_event (this, priv, &ev);
+
+ (void) fd_ctx_del (fd, this, NULL);
+
+ return 0;
+}
+
+
+/* }}} */
+
/**
* The
* - @init ()
@@ -1679,7 +1780,7 @@ changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv)
int ret = 0;
if (priv->cr.rollover_th) {
- changelog_thread_cleanup (this, priv->cr.rollover_th);
+ (void) changelog_thread_cleanup (this, priv->cr.rollover_th);
priv->cr.rollover_th = 0;
ret = close (priv->cr_wfd);
if (ret)
@@ -1689,7 +1790,7 @@ changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv)
}
if (priv->cf.fsync_th) {
- changelog_thread_cleanup (this, priv->cf.fsync_th);
+ (void) changelog_thread_cleanup (this, priv->cf.fsync_th);
priv->cf.fsync_th = 0;
}
}
@@ -1754,67 +1855,6 @@ changelog_spawn_helper_threads (xlator_t *this, changelog_priv_t *priv)
return ret;
}
-/* cleanup the notifier thread */
-static int
-changelog_cleanup_notifier (xlator_t *this, changelog_priv_t *priv)
-{
- int ret = 0;
-
- if (priv->cn.notify_th) {
- changelog_thread_cleanup (this, priv->cn.notify_th);
- priv->cn.notify_th = 0;
-
- ret = close (priv->wfd);
- if (ret)
- gf_log (this->name, GF_LOG_ERROR,
- "error closing writer end of notifier pipe"
- " (reason: %s)", strerror (errno));
- }
-
- return ret;
-}
-
-/* spawn the notifier thread - nop if already running */
-static int
-changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv)
-{
- int ret = 0;
- int flags = 0;
- int pipe_fd[2] = {0, 0};
-
- if (priv->cn.notify_th)
- goto out; /* notifier thread already running */
-
- ret = pipe (pipe_fd);
- if (ret == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "Cannot create pipe (reason: %s)", strerror (errno));
- goto out;
- }
-
- /* writer is non-blocking */
- flags = fcntl (pipe_fd[1], F_GETFL);
- flags |= O_NONBLOCK;
-
- ret = fcntl (pipe_fd[1], F_SETFL, flags);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to set O_NONBLOCK flag");
- goto out;
- }
-
- priv->wfd = pipe_fd[1];
-
- priv->cn.this = this;
- priv->cn.rfd = pipe_fd[0];
-
- ret = gf_thread_create (&priv->cn.notify_th,
- NULL, changelog_notifier, priv);
-
- out:
- return ret;
-}
-
int
notify (xlator_t *this, int event, void *data, ...)
{
@@ -2054,11 +2094,6 @@ changelog_init (xlator_t *this, changelog_priv_t *priv)
if (!priv->active)
return ret;
- /* spawn the notifier thread */
- ret = changelog_spawn_notifier (this, priv);
- if (ret)
- goto out;
-
/**
* start with a fresh changelog file every time. this is done
* in case there was an encoding change. so... things are kept
@@ -2086,9 +2121,11 @@ changelog_init (xlator_t *this, changelog_priv_t *priv)
return ret;
}
-/* Init all pthread condition variables and locks in changelog*/
+/**
+ * Init barrier related condition variables and locks
+ */
static int
-changelog_pthread_init (xlator_t *this, changelog_priv_t *priv)
+changelog_barrier_pthread_init (xlator_t *this, changelog_priv_t *priv)
{
gf_boolean_t bn_mutex_init = _gf_false;
gf_boolean_t bn_cond_init = _gf_false;
@@ -2165,9 +2202,9 @@ changelog_pthread_init (xlator_t *this, changelog_priv_t *priv)
return ret;
}
-/* Destroy all pthread condition variables and locks in changelog */
+/* Destroy barrier related condition variables and locks */
static inline void
-changelog_pthread_destroy (changelog_priv_t *priv)
+changelog_barrier_pthread_destroy (changelog_priv_t *priv)
{
pthread_mutex_destroy (&priv->bn.bnotify_mutex);
pthread_cond_destroy (&priv->bn.bnotify_cond);
@@ -2284,17 +2321,13 @@ reconfigure (xlator_t *this, dict_t *options)
}
htime_open(this, priv, tv.tv_sec);
}
- ret = changelog_spawn_notifier (this, priv);
- if (!ret)
- ret = changelog_spawn_helper_threads (this,
- priv);
- } else
- ret = changelog_cleanup_notifier (this, priv);
+ ret = changelog_spawn_helper_threads (this, priv);
+ }
}
out:
if (ret) {
- ret = changelog_cleanup_notifier (this, priv);
+ /* TODO */
} else {
gf_log (this->name, GF_LOG_DEBUG,
"changelog reconfigured");
@@ -2305,67 +2338,40 @@ reconfigure (xlator_t *this, dict_t *options)
return ret;
}
-int32_t
-init (xlator_t *this)
+static void
+changelog_freeup_options (xlator_t *this, changelog_priv_t *priv)
{
- int ret = -1;
- char *tmp = NULL;
- changelog_priv_t *priv = NULL;
- gf_boolean_t cond_lock_init = _gf_false;
- char htime_dir[PATH_MAX] = {0,};
- char csnap_dir[PATH_MAX] = {0,};
- uint32_t timeout = 0;
-
- GF_VALIDATE_OR_GOTO ("changelog", this, out);
-
- if (!this->children || this->children->next) {
- gf_log (this->name, GF_LOG_ERROR,
- "translator needs a single subvolume");
- goto out;
- }
-
- if (!this->parents) {
- gf_log (this->name, GF_LOG_ERROR,
- "dangling volume. please check volfile");
- goto out;
- }
-
- priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t);
- if (!priv)
- goto out;
+ int ret = 0;
- this->local_pool = mem_pool_new (changelog_local_t, 64);
- if (!this->local_pool) {
+ ret = priv->cb->dtor (this, &priv->cd);
+ if (ret)
gf_log (this->name, GF_LOG_ERROR,
- "failed to create local memory pool");
- goto out;
- }
-
- LOCK_INIT (&priv->lock);
- LOCK_INIT (&priv->c_snap_lock);
+ "could not cleanup bootstrapper");
+ GF_FREE (priv->changelog_brick);
+ GF_FREE (priv->changelog_dir);
+}
- GF_OPTION_INIT ("changelog-brick", tmp, str, out);
- if (!tmp) {
- gf_log (this->name, GF_LOG_ERROR,
- "\"changelog-brick\" option is not set");
- goto out;
- }
+static int
+changelog_init_options (xlator_t *this, changelog_priv_t *priv)
+{
+ int ret = 0;
+ char *tmp = NULL;
+ uint32_t timeout = 0;
+ char htime_dir[PATH_MAX] = {0,};
+ char csnap_dir[PATH_MAX] = {0,};
+ GF_OPTION_INIT ("changelog-brick", tmp, str, error_return);
priv->changelog_brick = gf_strdup (tmp);
if (!priv->changelog_brick)
- goto out;
- tmp = NULL;
+ goto error_return;
- GF_OPTION_INIT ("changelog-dir", tmp, str, out);
- if (!tmp) {
- gf_log (this->name, GF_LOG_ERROR,
- "\"changelog-dir\" option is not set");
- goto out;
- }
+ tmp = NULL;
+ GF_OPTION_INIT ("changelog-dir", tmp, str, dealloc_1);
priv->changelog_dir = gf_strdup (tmp);
if (!priv->changelog_dir)
- goto out;
+ goto dealloc_1;
+
tmp = NULL;
/**
@@ -2375,35 +2381,38 @@ init (xlator_t *this)
ret = mkdir_p (priv->changelog_dir, 0600, _gf_true);
if (ret)
- goto out;
+ goto dealloc_2;
- CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, htime_dir);
+ CHANGELOG_FILL_HTIME_DIR (priv->changelog_dir, htime_dir);
ret = mkdir_p (htime_dir, 0600, _gf_true);
if (ret)
- goto out;
+ goto dealloc_2;
- CHANGELOG_FILL_CSNAP_DIR(priv->changelog_dir, csnap_dir);
+ CHANGELOG_FILL_CSNAP_DIR (priv->changelog_dir, csnap_dir);
ret = mkdir_p (csnap_dir, 0600, _gf_true);
if (ret)
- goto out;
+ goto dealloc_2;
- GF_OPTION_INIT ("changelog", priv->active, bool, out);
+ GF_OPTION_INIT ("changelog", priv->active, bool, dealloc_2);
- GF_OPTION_INIT ("op-mode", tmp, str, out);
+ GF_OPTION_INIT ("op-mode", tmp, str, dealloc_2);
changelog_assign_opmode (priv, tmp);
tmp = NULL;
- GF_OPTION_INIT ("encoding", tmp, str, out);
+ GF_OPTION_INIT ("encoding", tmp, str, dealloc_2);
changelog_assign_encoding (priv, tmp);
+ changelog_encode_change (priv);
- GF_OPTION_INIT ("rollover-time", priv->rollover_time, int32, out);
+ GF_OPTION_INIT ("rollover-time",
+ priv->rollover_time, int32, dealloc_2);
- GF_OPTION_INIT ("fsync-interval", priv->fsync_interval, int32, out);
- GF_OPTION_INIT ("changelog-barrier-timeout", timeout, time, out);
- priv->timeout.tv_sec = timeout;
+ GF_OPTION_INIT ("fsync-interval",
+ priv->fsync_interval, int32, dealloc_2);
- changelog_encode_change(priv);
+ GF_OPTION_INIT ("changelog-barrier-timeout",
+ timeout, time, dealloc_2);
+ changelog_assign_barrier_timeout (priv, timeout);
GF_ASSERT (cb_bootstrap[priv->op_mode].mode == priv->op_mode);
priv->cb = &cb_bootstrap[priv->op_mode];
@@ -2411,10 +2420,111 @@ init (xlator_t *this)
/* ... now bootstrap the logger */
ret = priv->cb->ctor (this, &priv->cd);
if (ret)
- goto out;
+ goto dealloc_2;
priv->changelog_fd = -1;
+ return 0;
+
+ dealloc_2:
+ GF_FREE (priv->changelog_dir);
+ dealloc_1:
+ GF_FREE (priv->changelog_brick);
+ error_return:
+ return -1;
+}
+
+static void
+changelog_cleanup_rpc (xlator_t *this, changelog_priv_t *priv)
+{
+ /* terminate rpc server */
+ changelog_destroy_rpc_listner (this, priv);
+
+ /* cleanup rot buffs */
+ rbuf_dtor (priv->rbuf);
+
+ /* cleanup poller thread */
+ (void) changelog_thread_cleanup (this, priv->poller);
+}
+
+static int
+changelog_init_rpc (xlator_t *this, changelog_priv_t *priv)
+{
+ int ret = 0;
+ rpcsvc_t *rpc = NULL;
+ changelog_ev_selector_t *selection = NULL;
+
+ selection = &priv->ev_selection;
+
+ /* initialize event selection */
+ changelog_init_event_selection (this, selection);
+
+ ret = pthread_create (&priv->poller, NULL, changelog_rpc_poller, this);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to spawn poller thread");
+ goto error_return;
+ }
+
+ priv->rbuf = rbuf_init (NR_ROTT_BUFFS);
+ if (!priv->rbuf)
+ goto cleanup_thread;
+
+ rpc = changelog_init_rpc_listner (this, priv,
+ priv->rbuf, NR_DISPATCHERS);
+ if (!rpc)
+ goto cleanup_rbuf;
+ priv->rpc = rpc;
+
+ return 0;
+
+ cleanup_rbuf:
+ rbuf_dtor (priv->rbuf);
+ cleanup_thread:
+ (void) changelog_thread_cleanup (this, priv->poller);
+ error_return:
+ return -1;
+}
+
+int32_t
+init (xlator_t *this)
+{
+ int ret = -1;
+ char *tmp = NULL;
+ changelog_priv_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO ("changelog", this, error_return);
+
+ if (!this->children || this->children->next) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "translator needs a single subvolume");
+ goto error_return;
+ }
+
+ if (!this->parents) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "dangling volume. please check volfile");
+ goto error_return;
+ }
+
+ priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t);
+ if (!priv)
+ goto error_return;
+
+ this->local_pool = mem_pool_new (changelog_local_t, 64);
+ if (!this->local_pool) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to create local memory pool");
+ goto cleanup_priv;
+ }
+
+ LOCK_INIT (&priv->lock);
+ LOCK_INIT (&priv->c_snap_lock);
+
+ ret = changelog_init_options (this, priv);
+ if (ret)
+ goto cleanup_mempool;
+
/* snap dependency changes */
priv->dm.black_fop_cnt = 0;
priv->dm.white_fop_cnt = 0;
@@ -2422,67 +2532,68 @@ init (xlator_t *this)
priv->dm.drain_wait_white = _gf_false;
priv->current_color = FOP_COLOR_BLACK;
priv->explicit_rollover = _gf_false;
+
/* Mutex is not needed as threads are not spawned yet */
priv->bn.bnotify = _gf_false;
- ret = changelog_pthread_init (this, priv);
+ ret = changelog_barrier_pthread_init (this, priv);
if (ret)
- goto out;
-
+ goto cleanup_options;
LOCK_INIT (&priv->bflags.lock);
- cond_lock_init = _gf_true;
priv->bflags.barrier_ext = _gf_false;
/* Changelog barrier init */
INIT_LIST_HEAD (&priv->queue);
priv->barrier_enabled = _gf_false;
- ret = changelog_init (this, priv);
+ /* RPC ball rolling.. */
+ ret = changelog_init_rpc (this, priv);
if (ret)
- goto out;
+ goto cleanup_barrier;
+ ret = changelog_init (this, priv);
+ if (ret)
+ goto cleanup_rpc;
gf_log (this->name, GF_LOG_DEBUG, "changelog translator loaded");
- out:
- if (ret) {
- if (this && this->local_pool)
- mem_pool_destroy (this->local_pool);
- if (priv) {
- if (priv->cb) {
- ret = priv->cb->dtor (this, &priv->cd);
- if (ret)
- gf_log (this->name, GF_LOG_ERROR,
- "error in cleanup during init()");
- }
- GF_FREE (priv->changelog_brick);
- GF_FREE (priv->changelog_dir);
- if (cond_lock_init)
- changelog_pthread_destroy (priv);
- GF_FREE (priv);
- }
- this->private = NULL;
- } else
- this->private = priv;
+ this->private = priv;
+ return 0;
- return ret;
+ cleanup_rpc:
+ changelog_cleanup_rpc (this, priv);
+ cleanup_barrier:
+ changelog_barrier_pthread_destroy (priv);
+ cleanup_options:
+ changelog_freeup_options (this, priv);
+ cleanup_mempool:
+ mem_pool_destroy (this->local_pool);
+ cleanup_priv:
+ GF_FREE (priv);
+ error_return:
+ this->private = NULL;
+ return -1;
}
void
fini (xlator_t *this)
{
- int ret = -1;
changelog_priv_t *priv = NULL;
priv = this->private;
if (priv) {
- ret = priv->cb->dtor (this, &priv->cd);
- if (ret)
- gf_log (this->name, GF_LOG_ERROR,
- "error in fini");
+ /* terminate RPC server/threads */
+ changelog_cleanup_rpc (this, priv);
+
+ /* cleanup barrier related objects */
+ changelog_barrier_pthread_destroy (priv);
+
+ /* cleanup allocated options */
+ changelog_freeup_options (this, priv);
+
+ /* deallocate mempool */
mem_pool_destroy (this->local_pool);
- GF_FREE (priv->changelog_brick);
- GF_FREE (priv->changelog_dir);
- changelog_pthread_destroy (priv);
+
+ /* finally, dealloac private variable */
GF_FREE (priv);
}
@@ -2492,6 +2603,7 @@ fini (xlator_t *this)
}
struct xlator_fops fops = {
+ .open = changelog_open,
.mknod = changelog_mknod,
.mkdir = changelog_mkdir,
.create = changelog_create,
@@ -2513,6 +2625,7 @@ struct xlator_fops fops = {
struct xlator_cbks cbks = {
.forget = changelog_forget,
+ .release = changelog_release,
};
struct volume_options options[] = {