summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/src/changelog-helpers.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/src/changelog-helpers.c')
-rw-r--r--xlators/features/changelog/src/changelog-helpers.c691
1 files changed, 691 insertions, 0 deletions
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c
new file mode 100644
index 000000000..c1bb6e5fe
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-helpers.c
@@ -0,0 +1,691 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "xlator.h"
+#include "defaults.h"
+#include "logging.h"
+#include "iobuf.h"
+
+#include "changelog-helpers.h"
+#include "changelog-mem-types.h"
+
+#include <pthread.h>
+
+void
+changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)
+{
+ int ret = 0;
+ void *retval = NULL;
+
+ /* send a cancel request to the thread */
+ ret = pthread_cancel (thr_id);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not cancel thread (reason: %s)",
+ strerror (errno));
+ goto out;
+ }
+
+ ret = pthread_join (thr_id, &retval);
+ if (ret || (retval != PTHREAD_CANCELED)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "cancel request not adhered as expected"
+ " (reason: %s)", strerror (errno));
+ }
+
+ out:
+ return;
+}
+
+inline void *
+changelog_get_usable_buffer (changelog_local_t *local)
+{
+ changelog_log_data_t *cld = NULL;
+
+ cld = &local->cld;
+ if (!cld->cld_iobuf)
+ return NULL;
+
+ return cld->cld_iobuf->ptr;
+}
+
+inline void
+changelog_set_usable_record_and_length (changelog_local_t *local,
+ size_t len, int xr)
+{
+ changelog_log_data_t *cld = NULL;
+
+ cld = &local->cld;
+
+ cld->cld_ptr_len = len;
+ cld->cld_xtra_records = xr;
+}
+
+void
+changelog_local_cleanup (xlator_t *xl, changelog_local_t *local)
+{
+ int i = 0;
+ changelog_opt_t *co = NULL;
+ changelog_log_data_t *cld = NULL;
+
+ if (!local)
+ return;
+
+ cld = &local->cld;
+
+ /* cleanup dynamic allocation for extra records */
+ if (cld->cld_xtra_records) {
+ co = (changelog_opt_t *) cld->cld_ptr;
+ for (; i < cld->cld_xtra_records; i++, co++)
+ if (co->co_free)
+ co->co_free (co);
+ }
+
+ CHANGELOG_IOBUF_UNREF (cld->cld_iobuf);
+
+ if (local->inode)
+ inode_unref (local->inode);
+
+ mem_put (local);
+}
+
+inline int
+changelog_write (int fd, char *buffer, size_t len)
+{
+ ssize_t size = 0;
+ size_t writen = 0;
+
+ while (writen < len) {
+ size = write (fd,
+ buffer + writen, len - writen);
+ if (size <= 0)
+ break;
+
+ writen += size;
+ }
+
+ return (writen != len);
+}
+
+static int
+changelog_rollover_changelog (xlator_t *this,
+ changelog_priv_t *priv, unsigned long ts)
+{
+ int ret = -1;
+ int notify = 0;
+ char *bname = NULL;
+ char ofile[PATH_MAX] = {0,};
+ char nfile[PATH_MAX] = {0,};
+
+ if (priv->changelog_fd != -1) {
+ close (priv->changelog_fd);
+ priv->changelog_fd = -1;
+ }
+
+ (void) snprintf (ofile, PATH_MAX,
+ "%s/"CHANGELOG_FILE_NAME, priv->changelog_dir);
+ (void) snprintf (nfile, PATH_MAX,
+ "%s/"CHANGELOG_FILE_NAME".%lu",
+ priv->changelog_dir, ts);
+
+ ret = rename (ofile, nfile);
+ if (!ret)
+ notify = 1;
+
+ if (ret && (errno == ENOENT)) {
+ ret = 0;
+ }
+
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "error renaming %s -> %s (reason %s)",
+ ofile, nfile, strerror (errno));
+ }
+
+ if (notify) {
+ bname = basename (nfile);
+ gf_log (this->name, GF_LOG_DEBUG, "notifying: %s", bname);
+ ret = changelog_write (priv->wfd, bname, strlen (bname) + 1);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to send file name to notify thread"
+ " (reason: %s)", strerror (errno));
+ }
+ }
+
+ return ret;
+}
+
+int
+changelog_open (xlator_t *this,
+ changelog_priv_t *priv)
+{
+ int fd = 0;
+ int ret = -1;
+ int flags = 0;
+ char buffer[1024] = {0,};
+ char changelog_path[PATH_MAX] = {0,};
+
+ (void) snprintf (changelog_path, PATH_MAX,
+ "%s/"CHANGELOG_FILE_NAME,
+ priv->changelog_dir);
+
+ flags |= (O_CREAT | O_RDWR);
+ if (priv->fsync_interval == 0)
+ flags |= O_SYNC;
+
+ fd = open (changelog_path, flags,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+ if (fd < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "unable to open/create changelog file %s"
+ " (reason: %s). change-logging will be"
+ " inactive", changelog_path, strerror (errno));
+ goto out;
+ }
+
+ priv->changelog_fd = fd;
+
+ (void) snprintf (buffer, 1024, CHANGELOG_HEADER,
+ CHANGELOG_VERSION_MAJOR,
+ CHANGELOG_VERSION_MINOR,
+ priv->encode_mode);
+ ret = changelog_write_change (priv, buffer, strlen (buffer));
+ if (ret) {
+ close (priv->changelog_fd);
+ priv->changelog_fd = -1;
+ goto out;
+ }
+
+ ret = 0;
+
+ out:
+ return ret;
+}
+
+int
+changelog_start_next_change (xlator_t *this,
+ changelog_priv_t *priv,
+ unsigned long ts, gf_boolean_t finale)
+{
+ int ret = -1;
+
+ ret = changelog_rollover_changelog (this, priv, ts);
+
+ if (!ret && !finale)
+ ret = changelog_open (this, priv);
+
+ return ret;
+}
+
+/**
+ * return the length of entry
+ */
+inline size_t
+changelog_entry_length ()
+{
+ return sizeof (changelog_log_data_t);
+}
+
+int
+changelog_fill_rollover_data (changelog_log_data_t *cld, gf_boolean_t is_last)
+{
+ struct timeval tv = {0,};
+
+ cld->cld_type = CHANGELOG_TYPE_ROLLOVER;
+
+ if (gettimeofday (&tv, NULL))
+ return -1;
+
+ cld->cld_roll_time = (unsigned long) tv.tv_sec;
+ cld->cld_finale = is_last;
+ return 0;
+}
+
+int
+changelog_write_change (changelog_priv_t *priv, char *buffer, size_t len)
+{
+ return changelog_write (priv->changelog_fd, buffer, len);
+}
+
+inline int
+changelog_handle_change (xlator_t *this,
+ changelog_priv_t *priv, changelog_log_data_t *cld)
+{
+ int ret = 0;
+
+ if (CHANGELOG_TYPE_IS_ROLLOVER (cld->cld_type)) {
+ ret = changelog_start_next_change (this, priv,
+ cld->cld_roll_time,
+ cld->cld_finale);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR,
+ "Problem rolling over changelog(s)");
+ goto out;
+ }
+
+ /**
+ * case when there is reconfigure done (disabling changelog) and there
+ * are still fops that have updates in prgress.
+ */
+ if (priv->changelog_fd == -1)
+ return 0;
+
+ if (CHANGELOG_TYPE_IS_FSYNC (cld->cld_type)) {
+ ret = fsync (priv->changelog_fd);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "fsync failed (reason: %s)",
+ strerror (errno));
+ }
+ goto out;
+ }
+
+ ret = priv->ce->encode (this, cld);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "error writing changelog to disk");
+ }
+
+ out:
+ return ret;
+}
+
+changelog_local_t *
+changelog_local_init (xlator_t *this, inode_t *inode,
+ uuid_t gfid, int xtra_records,
+ gf_boolean_t update_flag)
+{
+ changelog_local_t *local = NULL;
+ struct iobuf *iobuf = NULL;
+
+ /**
+ * We relax the presence of inode if @update_flag is true.
+ * The caller (implmentation of the fop) needs to be careful to
+ * not blindly use local->inode.
+ */
+ if (!update_flag && !inode) {
+ gf_log_callingfn (this->name, GF_LOG_WARNING,
+ "inode needed for version checking !!!");
+ goto out;
+ }
+
+ if (xtra_records) {
+ iobuf = iobuf_get2 (this->ctx->iobuf_pool,
+ xtra_records * CHANGELOG_OPT_RECORD_LEN);
+ if (!iobuf)
+ goto out;
+ }
+
+ local = mem_get0 (this->local_pool);
+ if (!local) {
+ CHANGELOG_IOBUF_UNREF (iobuf);
+ goto out;
+ }
+
+ local->update_no_check = update_flag;
+
+ uuid_copy (local->cld.cld_gfid, gfid);
+
+ local->cld.cld_iobuf = iobuf;
+ local->cld.cld_xtra_records = 0; /* set by the caller */
+
+ if (inode)
+ local->inode = inode_ref (inode);
+
+ out:
+ return local;
+}
+
+int
+changelog_forget (xlator_t *this, inode_t *inode)
+{
+ uint64_t ctx_addr = 0;
+ changelog_inode_ctx_t *ctx = NULL;
+
+ inode_ctx_del (inode, this, &ctx_addr);
+ if (!ctx_addr)
+ return 0;
+
+ ctx = (changelog_inode_ctx_t *) (long) ctx_addr;
+ GF_FREE (ctx);
+
+ return 0;
+}
+
+int
+changelog_inject_single_event (xlator_t *this,
+ changelog_priv_t *priv,
+ changelog_log_data_t *cld)
+{
+ return priv->cd.dispatchfn (this, priv, priv->cd.cd_data, cld, NULL);
+}
+
+/**
+ * TODO: these threads have many thing in common (wake up after
+ * a certain time etc..). move them into separate routine.
+ */
+void *
+changelog_rollover (void *data)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ struct timeval tv = {0,};
+ changelog_log_data_t cld = {0,};
+ changelog_time_slice_t *slice = NULL;
+ changelog_priv_t *priv = data;
+
+ this = priv->cr.this;
+ slice = &priv->slice;
+
+ while (1) {
+ tv.tv_sec = priv->rollover_time;
+ tv.tv_usec = 0;
+
+ ret = select (0, NULL, NULL, NULL, &tv);
+ if (ret)
+ continue;
+
+ ret = changelog_fill_rollover_data (&cld, _gf_false);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to fill rollover data");
+ continue;
+ }
+
+ LOCK (&priv->lock);
+ {
+ ret = changelog_inject_single_event (this, priv, &cld);
+ if (!ret)
+ SLICE_VERSION_UPDATE (slice);
+ }
+ UNLOCK (&priv->lock);
+ }
+
+ return NULL;
+}
+
+void *
+changelog_fsync_thread (void *data)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ struct timeval tv = {0,};
+ changelog_log_data_t cld = {0,};
+ changelog_priv_t *priv = data;
+
+ this = priv->cf.this;
+ cld.cld_type = CHANGELOG_TYPE_FSYNC;
+
+ while (1) {
+ tv.tv_sec = priv->fsync_interval;
+ tv.tv_usec = 0;
+
+ ret = select (0, NULL, NULL, NULL, &tv);
+ if (ret)
+ continue;
+
+ ret = changelog_inject_single_event (this, priv, &cld);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to inject fsync event");
+ }
+
+ return NULL;
+}
+
+/* macros for inode/changelog version checks */
+
+#define INODE_VERSION_UPDATE(priv, inode, iver, slice, type) do { \
+ LOCK (&inode->lock); \
+ { \
+ LOCK (&priv->lock); \
+ { \
+ *iver = slice->changelog_version[type]; \
+ } \
+ UNLOCK (&priv->lock); \
+ } \
+ UNLOCK (&inode->lock); \
+ } while (0)
+
+#define INODE_VERSION_EQUALS_SLICE(priv, ver, slice, type, upd) do { \
+ LOCK (&priv->lock); \
+ { \
+ upd = (ver == slice->changelog_version[type]) \
+ ? _gf_false : _gf_true; \
+ } \
+ UNLOCK (&priv->lock); \
+ } while (0)
+
+static int
+__changelog_inode_ctx_set (xlator_t *this,
+ inode_t *inode, changelog_inode_ctx_t *ctx)
+{
+ uint64_t ctx_addr = (uint64_t) ctx;
+ return __inode_ctx_set (inode, this, &ctx_addr);
+}
+
+/**
+ * one shot routine to get the address and the value of a inode version
+ * for a particular type.
+ */
+static changelog_inode_ctx_t *
+__changelog_inode_ctx_get (xlator_t *this,
+ inode_t *inode, unsigned long **iver,
+ unsigned long *version, changelog_log_type type)
+{
+ int ret = 0;
+ uint64_t ctx_addr = 0;
+ changelog_inode_ctx_t *ctx = NULL;
+
+ ret = __inode_ctx_get (inode, this, &ctx_addr);
+ if (ret < 0)
+ ctx_addr = 0;
+ if (ctx_addr != 0) {
+ ctx = (changelog_inode_ctx_t *) (long)ctx_addr;
+ goto out;
+ }
+
+ ctx = GF_CALLOC (1, sizeof (*ctx), gf_changelog_mt_inode_ctx_t);
+ if (!ctx)
+ goto out;
+
+ ret = __changelog_inode_ctx_set (this, inode, ctx);
+ if (ret) {
+ GF_FREE (ctx);
+ ctx = NULL;
+ }
+
+ out:
+ if (ctx && iver && version) {
+ *iver = CHANGELOG_INODE_VERSION_TYPE (ctx, type);
+ *version = **iver;
+ }
+
+ return ctx;
+}
+
+static changelog_inode_ctx_t *
+changelog_inode_ctx_get (xlator_t *this,
+ inode_t *inode, unsigned long **iver,
+ unsigned long *version, changelog_log_type type)
+{
+ changelog_inode_ctx_t *ctx = NULL;
+
+ LOCK (&inode->lock);
+ {
+ ctx = __changelog_inode_ctx_get (this,
+ inode, iver, version, type);
+ }
+ UNLOCK (&inode->lock);
+
+ return ctx;
+}
+
+/**
+ * This is the main update routine. Locking has been made granular so as to
+ * maximize parallelism of fops - I'll try to explain it below using execution
+ * timelines.
+ *
+ * Basically, the contention is between multiple execution threads of this
+ * routine and the roll-over thread. So, instead of having a big lock, we hold
+ * granular locks: inode->lock and priv->lock. Now I'll explain what happens
+ * when there is an update and a roll-over at just about the same time.
+ * NOTE:
+ * - the dispatcher itself synchronizes updates via it's own lock
+ * - the slice version in incremented by the roll-over thread
+ *
+ * Case 1: When the rollover thread wins before the inode version can be
+ * compared with the slice version.
+ *
+ * [updater] | [rollover]
+ * |
+ * | <SLICE: 1, 1, 1>
+ * <changelog_update> |
+ * <changelog_inode_ctx_get> |
+ * <CTX: 1, 1, 1> |
+ * | <dispatch-rollover-event>
+ * | LOCK (&priv->lock)
+ * | <SLICE_VERSION_UPDATE>
+ * | <SLICE: 2, 2, 2>
+ * | UNLOCK (&priv->lock)
+ * |
+ * LOCK (&priv->lock) |
+ * <INODE_VERSION_EQUALS_SLICE> |
+ * I: 1 <-> S: 2 |
+ * update: true |
+ * UNLOCK (&priv->lock) |
+ * |
+ * <if update == true> |
+ * <dispath-update-event> |
+ * <INODE_VERSION_UPDATE> |
+ * LOCK (&inode->lock) |
+ * LOCK (&priv->lock) |
+ * <CTX: 2, 1, 1> |
+ * UNLOCK (&priv->lock) |
+ * UNLOCK (&inode->lock) |
+ *
+ * Therefore, the change gets recorded in the next change (no lost change). If
+ * the slice version was ahead of the inode version (say I:1, S: 2), then
+ * anyway the comparison would result in a update (I: 1, S: 3).
+ *
+ * If the rollover time is too less, then there is another contention when the
+ * updater tries to bring up inode version to the slice version (this is also
+ * the case when the roll-over thread wakes up during INODE_VERSION_UPDATE.
+ *
+ * <CTX: 1, 1, 1> | <SLICE: 2, 2, 2>
+ * |
+ * |
+ * <dispath-update-event> |
+ * <INODE_VERSION_UPDATE> |
+ * LOCK (&inode->lock) |
+ * LOCK (&priv->lock) |
+ * <CTX: 2, 1, 1> |
+ * UNLOCK (&priv->lock) |
+ * UNLOCK (&inode->lock) |
+ * | <dispatch-rollover-event>
+ * | LOCK (&priv->lock)
+ * | <SLICE_VERSION_UPDATE>
+ * | <SLICE: 3, 3, 3>
+ * | UNLOCK (&priv->lock)
+ *
+ *
+ * Case 2: When the fop thread wins
+ *
+ * [updater] | [rollover]
+ * |
+ * | <SLICE: 1, 1, 1>
+ * <changelog_update> |
+ * <changelog_inode_ctx_get> |
+ * <CTX: 0, 0, 0> |
+ * |
+ * LOCK (&priv->lock) |
+ * <INODE_VERSION_EQUALS_SLICE> |
+ * I: 0 <-> S: 1 |
+ * update: true |
+ * UNLOCK (&priv->lock) |
+ * | <dispatch-rollover-event>
+ * | LOCK (&priv->lock)
+ * | <SLICE_VERSION_UPDATE>
+ * | <SLICE: 2, 2, 2>
+ * | UNLOCK (&priv->lock)
+ * <if update == true> |
+ * <dispath-update-event> |
+ * <INODE_VERSION_UPDATE> |
+ * LOCK (&inode->lock) |
+ * LOCK (&priv->lock) |
+ * <CTX: 2, 0, 0> |
+ * UNLOCK (&priv->lock) |
+ * UNLOCK (&inode->lock) |
+ *
+ * Here again, if the inode version was equal to the slice version (I: 1, S: 1)
+ * then there is no need to record an update (as the equality of the two version
+ * signifies an update was recorded in the current time slice).
+ */
+inline void
+changelog_update (xlator_t *this, changelog_priv_t *priv,
+ changelog_local_t *local, changelog_log_type type)
+{
+ int ret = 0;
+ unsigned long *iver = NULL;
+ unsigned long version = 0;
+ inode_t *inode = NULL;
+ changelog_time_slice_t *slice = NULL;
+ changelog_inode_ctx_t *ctx = NULL;
+ changelog_log_data_t *cld_0 = NULL;
+ changelog_log_data_t *cld_1 = NULL;
+ changelog_local_t *next_local = NULL;
+ gf_boolean_t need_upd = _gf_true;
+
+ slice = &priv->slice;
+
+ /**
+ * for fops that do not require inode version checking
+ */
+ if (local->update_no_check)
+ goto update;
+
+ inode = local->inode;
+
+ ctx = changelog_inode_ctx_get (this,
+ inode, &iver, &version, type);
+ if (!ctx)
+ goto update;
+
+ INODE_VERSION_EQUALS_SLICE (priv, version, slice, type, need_upd);
+
+ update:
+ if (need_upd) {
+ cld_0 = &local->cld;
+ cld_0->cld_type = type;
+
+ if ( (next_local = local->prev_entry) != NULL ) {
+ cld_1 = &next_local->cld;
+ cld_1->cld_type = type;
+ }
+
+ ret = priv->cd.dispatchfn (this, priv,
+ priv->cd.cd_data, cld_0, cld_1);
+
+ /**
+ * update after the dispatcher has successfully done
+ * it's job.
+ */
+ if (!local->update_no_check && iver && !ret)
+ INODE_VERSION_UPDATE (priv, inode, iver, slice, type);
+ }
+
+ return;
+}