summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/src/policy/changelog-policy-replication.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/src/policy/changelog-policy-replication.c')
-rw-r--r--xlators/features/changelog/src/policy/changelog-policy-replication.c1184
1 files changed, 1184 insertions, 0 deletions
diff --git a/xlators/features/changelog/src/policy/changelog-policy-replication.c b/xlators/features/changelog/src/policy/changelog-policy-replication.c
new file mode 100644
index 000000000..536339939
--- /dev/null
+++ b/xlators/features/changelog/src/policy/changelog-policy-replication.c
@@ -0,0 +1,1184 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-policy.h"
+#include "changelog-encoders.h"
+#include "changelog-fops.h"
+
+#define JOURNAL_NAME "TERM"
+
+#define JOURNAL_SECTOR_SIZE 128
+
+#define PRE_OP_MARK 0x5F4552505FULL /* _PRE_ */
+#define POST_OP_MARK 0x5F54534F505FULL /* _POST_ */
+
+/**
+ * assume an ever increasing index for now..
+ */
+static unsigned long nsr_index = 1;
+
+static unsigned long
+get_index(changelog_priv_t *priv) {
+ unsigned long idx = 0;
+
+ LOCK (&priv->lock);
+ {
+ idx = nsr_index++;
+ }
+ UNLOCK (&priv->lock);
+
+ return idx;
+}
+
+static void
+reset_index(changelog_priv_t *priv) {
+ nsr_index = 1;
+}
+
+
+#if 0
+static inline void
+//changelog_replication_assign_term (changelog_priv_t *priv,
+ changelog_local_t *local)
+{
+ local->nr_bytes = 0;
+ local->lu.val = get_index (priv);
+}
+#endif
+
+size_t
+number_fn (void *data, char *buffer, gf_boolean_t encode)
+{
+ char buf[1024] = {0,};
+ size_t bufsz = 0;
+ unsigned long long nr = 0;
+
+ nr = *(unsigned long long *) data;
+
+ if (encode) {
+ (void) snprintf (buf, sizeof (buf), "%llu", nr);
+ CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf));
+ } else
+ CHANGELOG_FILL_BUFFER (buffer, bufsz,
+ &nr, sizeof (unsigned long long));
+
+ return bufsz;
+}
+
+size_t
+uuid_fn (void *data, char *buffer, gf_boolean_t encode)
+{
+ char buf[1024] = {0,};
+ uuid_t uuid = {0,};
+ size_t bufsz = 0;
+
+ memcpy (uuid, (uuid_t *) data, sizeof (uuid_t));
+
+ if (encode) {
+ char *tmpbuf = uuid_utoa (uuid);
+ (void) snprintf (buf, sizeof (buf), "%s", tmpbuf);
+ CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf));
+ } else
+ CHANGELOG_FILL_BUFFER (buffer, bufsz, uuid, sizeof (uuid_t));
+
+ return bufsz;
+}
+
+#define CHANGELOG_FILL_USIGNLL(co, number, converter, xlen) do { \
+ co->co_convert = converter; \
+ co->co_free = NULL; \
+ co->co_type = CHANGELOG_OPT_REC_ULL; \
+ co->co_number = (unsigned long long) number; \
+ xlen += sizeof (unsigned long long); \
+ if (!co->co_convert) \
+ co->co_len = sizeof (unsigned long long); \
+ } while (0)
+
+#define CHANGELOG_FILL_UUID(co, uuid, converter, xlen) do { \
+ co->co_convert = converter; \
+ co->co_free = NULL; \
+ co->co_type = CHANGELOG_OPT_REC_UUID; \
+ uuid_copy (co->co_uuid, uuid); \
+ xlen += sizeof (uuid_t); \
+ } while (0)
+
+
+/* TBD: move declarations here and nsr.c into a common place */
+#define NSR_TERM_XATTR "trusted.nsr.term"
+#define RECON_TERM_XATTR "trusted.nsr.recon-term"
+#define RECON_INDEX_XATTR "trusted.nsr.recon-index"
+
+static gf_boolean_t
+changelog_fix_term(xlator_t *this,
+ changelog_local_t *local,
+ dict_t *xdata)
+{
+ int32_t old_term, new_term;
+ changelog_priv_t *priv = this->private;
+ int ret = 0;
+ char nfile[PATH_MAX] = {0,};
+ int32_t recon_term, recon_index;
+ changelog_rollover_data_t crd;
+
+ // If coming via the regular IO path, we should get the dict "nsr-term"
+ // If coming via reconciliation, we should get the dicts "nsr-recon-term"
+ // that indicates the term and "nsr-recon-index" for the index
+ if (dict_get_int32(xdata,NSR_TERM_XATTR,&new_term) == 0) {
+ old_term = priv->term;
+
+ if (old_term != new_term) {
+ GF_ASSERT(new_term > old_term);
+ LOCK (&priv->lock);
+ reset_index(priv);
+ priv->term = new_term;
+ (void) snprintf (nfile, PATH_MAX, "%s.%d",
+ JOURNAL_NAME, priv->term);
+ ret = CHANGELOG_INVOKE_CFOP(this, priv, rollover,
+ nfile, _gf_false);
+ UNLOCK (&priv->lock);
+ if (ret != 0)
+ return _gf_false;
+ }
+ local->nr_bytes = 0;
+ local->lu.val = get_index (priv);
+ } else if ((dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) &&
+ (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) {
+
+ old_term = priv->term;
+
+ if (old_term != recon_term) {
+ LOCK (&priv->lock);
+ priv->term = recon_term;
+ (void) snprintf (crd.crd_changelog_name, PATH_MAX, "%s.%d", JOURNAL_NAME, priv->term);
+ crd.crd_prealloc_size = 0;
+ if (changelog_open(this, priv, local, &crd) != 0)
+ return _gf_false;
+ UNLOCK (&priv->lock);
+ }
+ local->nr_bytes = 0;
+ local->lu.val = recon_index;
+ } else {
+ return _gf_false;
+ }
+
+ return _gf_true;
+}
+
+
+
+/** override FOPS */
+
+int32_t
+changelog_replication_rmdir (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int xflags, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_opt_t *co = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ /* <PRE> + FOP + GFID + Entry */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 4);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+ entry_fn, entry_free_fn, xtra_len, out);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+ //changelog_replication_assign_term (priv, local);
+
+ frame->local = local;
+ ret = 0;
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_unlink (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int xflags, dict_t *xdata)
+{
+ return changelog_replication_rmdir (frame, this, loc, xflags, xdata);
+}
+
+int32_t
+changelog_replication_rename (call_frame_t *frame, xlator_t *this,
+ loc_t *oldloc, loc_t *newloc, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_opt_t *co = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ /* <PRE> + FOP + GFID + OLDLOC + NEWLOC */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, oldloc->inode->gfid, 5);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, oldloc->inode->gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_ENTRY (co, oldloc->pargfid, oldloc->name,
+ entry_fn, entry_free_fn, xtra_len, out);
+ co++;
+
+ CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name,
+ entry_fn, entry_free_fn, xtra_len, out);
+
+ //changelog_replication_assign_term (priv, local);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 5);
+
+ frame->local = local;
+ ret = 0;
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_link (call_frame_t *frame,
+ xlator_t *this, loc_t *oldloc,
+ loc_t *newloc, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_opt_t *co = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ /* <PRE> + FOP + GFID + Entry */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, oldloc->gfid, 4);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, oldloc->gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name,
+ entry_fn, entry_free_fn, xtra_len, out);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+ //changelog_replication_assign_term (priv, local);
+
+ frame->local = local;
+ ret = 0;
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_mkdir (call_frame_t *frame,
+ xlator_t *this, loc_t *loc,
+ mode_t mode, mode_t umask, dict_t *xdata)
+{
+ int ret = -1;
+ uuid_t gfid = {0,};
+ void *uuid_req = NULL;
+ size_t xtra_len = 0;
+ changelog_opt_t *co = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+ if (ret) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "failed to get gfid from dict");
+ goto out;
+ }
+ uuid_copy (gfid, uuid_req);
+
+ ret = -1;
+
+ /* <PRE> + FOP + GFID + Entry */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 4);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+ entry_fn, entry_free_fn, xtra_len, out);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+ //changelog_replication_assign_term (priv, local);
+
+ frame->local = local;
+ ret = 0;
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_symlink (call_frame_t *frame, xlator_t *this,
+ const char *linkname, loc_t *loc,
+ mode_t umask, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ uuid_t gfid = {0,};
+ void *uuid_req = NULL;
+ changelog_opt_t *co = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+ if (ret) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "failed to get gfid from dict");
+ goto out;
+ }
+ uuid_copy (gfid, uuid_req);
+
+ ret = -1;
+
+ /* <PRE> + FOP + GFID + Entry */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 4);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+ entry_fn, entry_free_fn, xtra_len, out);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+ //changelog_replication_assign_term (priv, local);
+
+ frame->local = local;
+ ret = 0;
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_mknod (call_frame_t *frame,
+ xlator_t *this, loc_t *loc,
+ mode_t mode, dev_t dev,
+ mode_t umask, dict_t *xdata)
+{
+ int ret = -1;
+ uuid_t gfid = {0,};
+ void *uuid_req = NULL;
+ size_t xtra_len = 0;
+ changelog_opt_t *co = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+ if (ret) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "failed to get gfid from dict");
+ goto out;
+ }
+ uuid_copy (gfid, uuid_req);
+
+ ret = -1;
+
+ /* <PRE> + FOP + GFID + Entry */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 4);
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+ entry_fn, entry_free_fn, xtra_len, out);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+ //changelog_replication_assign_term (priv, local);
+
+ frame->local = local;
+ ret = 0;
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_create (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int32_t flags, mode_t mode,
+ mode_t umask, fd_t *fd, dict_t *xdata)
+{
+ int ret = -1;
+ uuid_t gfid = {0,};
+ void *uuid_req = NULL;
+ size_t xtra_len = 0;
+ changelog_opt_t *co = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+ if (ret) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "failed to get gfid from dict");
+ goto out;
+ }
+ uuid_copy (gfid, uuid_req);
+
+ ret = -1;
+
+ /* <PRE> + FOP + GFID + Entry */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 4);
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+ entry_fn, entry_free_fn, xtra_len, out);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+ //changelog_replication_assign_term (priv, local);
+
+ frame->local = local;
+ ret = 0;
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_fsetattr (call_frame_t *frame,
+ xlator_t *this, fd_t *fd,
+ struct iatt *stbuf, int32_t valid,
+ dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_opt_t *co = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ /* <PRE> + FOP + GFID */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 3);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 3);
+
+ //changelog_replication_assign_term (priv, local);
+
+ frame->local = local;
+ ret = 0;
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_setattr (call_frame_t *frame,
+ xlator_t *this, loc_t *loc,
+ struct iatt *stbuf, int32_t valid, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_opt_t *co = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ /* <PRE> + FOP + GFID */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 3);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 3);
+
+ //changelog_replication_assign_term (priv, local);
+
+ frame->local = local;
+ ret = 0;
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_fremovexattr (call_frame_t *frame, xlator_t *this,
+ fd_t *fd, const char *name, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_opt_t *co = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+ int32_t xattr_op;
+
+ priv = this->private;
+
+ /* <PRE> + FOP + GFID */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 3);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) {
+ CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
+ fop_fn, xtra_len);
+ }
+ else {
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
+ xtra_len);
+ }
+ co++;
+
+ CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 3);
+
+ //changelog_replication_assign_term (priv, local);
+
+ frame->local = local;
+ ret = 0;
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_removexattr (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, const char *name, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_opt_t *co = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+ int32_t xattr_op;
+
+ priv = this->private;
+
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 3);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) {
+ CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
+ fop_fn, xtra_len);
+ }
+ else {
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
+ xtra_len);
+ }
+ co++;
+
+ CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 3);
+
+ //changelog_replication_assign_term (priv, local);
+
+ frame->local = local;
+ ret = 0;
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_setxattr (call_frame_t *frame,
+ xlator_t *this, loc_t *loc,
+ dict_t *dict, int32_t flags, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_opt_t *co = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+ int32_t xattr_op;
+
+ priv = this->private;
+
+ /* <PRE> + FOP + GFID */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 3);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) {
+ CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
+ fop_fn, xtra_len);
+ }
+ else {
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
+ xtra_len);
+ }
+ co++;
+
+ CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 3);
+
+ //changelog_replication_assign_term (priv, local);
+
+ frame->local = local;
+ ret = 0;
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_fsetxattr (call_frame_t *frame,
+ xlator_t *this, fd_t *fd, dict_t *dict,
+ int32_t flags, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_opt_t *co = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+ int32_t xattr_op;
+
+ priv = this->private;
+
+ /* <PRE> + FOP + GFID */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 3);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+
+ if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) {
+ CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
+ fop_fn, xtra_len);
+ }
+ else {
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
+ xtra_len);
+ }
+ co++;
+
+ CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 3);
+
+ //changelog_replication_assign_term (priv, local);
+
+ frame->local = local;
+ ret = 0;
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_truncate (call_frame_t *frame,
+ xlator_t *this, loc_t *loc,
+ off_t offset, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_opt_t *co = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ /* <PRE> + FOP + GFID + Offset */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 4);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+ //changelog_replication_assign_term (priv, local);
+
+ frame->local = local;
+ ret = 0;
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_ftruncate (call_frame_t *frame,
+ xlator_t *this, fd_t *fd,
+ off_t offset, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_opt_t *co = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ /* <PRE> + FOP + GFID + Offset */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 4);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+ //changelog_replication_assign_term (priv, local);
+
+ frame->local = local;
+ ret = 0;
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+int32_t
+changelog_replication_writev (call_frame_t *frame,
+ xlator_t *this, fd_t *fd, struct iovec *vector,
+ int32_t count, off_t offset, uint32_t flags,
+ struct iobref *iobref, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ changelog_opt_t *co = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+
+ /* <PRE> + FOP + GFID + Offset + Length */
+ CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 5);
+ if (!local)
+ goto out;
+
+ co = changelog_get_usable_buffer (local);
+ if (!co)
+ goto out;
+
+ if (changelog_fix_term(this, local, xdata) == _gf_false)
+ goto out;
+
+ CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len);
+ co++;
+
+ CHANGELOG_FILL_USIGNLL (co, iov_length (vector, count),
+ number_fn, xtra_len);
+
+ changelog_set_usable_record_and_length (local, xtra_len, 5);
+
+ //changelog_replication_assign_term (priv, local);
+
+ frame->local = local;
+ ret = 0;
+
+ out:
+ if (ret)
+ changelog_local_cleanup (this, local);
+ return ret;
+}
+
+/* overriden COPS */
+int
+changelog_replication_cops_open (xlator_t *this,
+ changelog_priv_t *priv, void *cpriv,
+ char *name, gf_boolean_t last)
+{
+ changelog_local_t local = {0,};
+ changelog_log_data_t cld = {0,};
+ changelog_rollover_data_t *crd = NULL;
+
+ crd = &cld.cld_roll;
+
+ cld.cld_type = CHANGELOG_TYPE_ROLLOVER;
+
+ crd->crd_finale = last;
+ crd->crd_use_suffix = _gf_false;
+ crd->crd_prealloc_size = 1<<10; /* preallocate 1 MB */
+
+
+ (void) strcpy (crd->crd_changelog_name, name);
+
+ local.lu.val = 0;
+ local.nr_bytes = 0;
+
+ return changelog_inject_single_event (this, priv, &local, &cld);
+}
+
+/**
+ * no implicit rollover
+ */
+int
+changelog_replication_cops_rollover (xlator_t *this,
+ changelog_priv_t *priv, void *cpriv,
+ char *name, gf_boolean_t last)
+{
+ return changelog_replication_cops_open(this, priv, cpriv, name, last);
+}
+
+off_t
+changelog_replication_cops_get_offset (xlator_t *this,
+ changelog_priv_t *priv, void *cpriv,
+ changelog_local_t *local)
+{
+ if (!local)
+ return 0;
+
+ return (local->lu.val * JOURNAL_SECTOR_SIZE) + local->nr_bytes;
+}
+
+void
+changelog_replication_cops_set_offset (xlator_t *this,
+ changelog_priv_t *priv, void *cpriv,
+ changelog_local_t *local, off_t bytes)
+{
+ local->nr_bytes += bytes;
+}
+
+void
+changelog_replication_cops_reset_offset (xlator_t *this, changelog_priv_t *priv,
+ void *cpriv, changelog_local_t *local)
+{
+ return;
+}
+
+int
+changelog_replication_policy_init (xlator_t *this,
+ changelog_priv_t *priv,
+ struct changelog_logpolicy *cp)
+{
+ struct xlator_fops *r_fops = NULL;
+ struct changelog_ops *r_cops = NULL;
+
+ r_fops = GF_CALLOC (1, sizeof (struct xlator_fops),
+ gf_changelog_mt_fop_policy_t);
+ if (!r_fops)
+ return -1;
+
+ r_cops = GF_CALLOC (1, sizeof (struct changelog_ops),
+ gf_changelog_mt_fop_policy_t);
+ if (!r_cops) {
+ GF_FREE (r_fops);
+ return -1;
+ }
+
+ /* no roll-over, one big fat journal per term */
+ priv->rollover_time = 0;
+
+ /* fsync() is internally trigerred by NSR */
+ priv->fsync_interval = 0;
+
+ /* no record header: extra data (via iobufs) are always persisted */
+ priv->no_gfid_hdr = _gf_true;
+
+ memcpy (r_fops, &changelog_default_fops, sizeof (struct xlator_fops));
+ memcpy (r_cops, &changelog_default_cops, sizeof (struct changelog_ops));
+
+ priv->term = 0;
+ (void) memset (cp->changelog_name, '\0', PATH_MAX);
+ memcpy(cp->changelog_name, JOURNAL_NAME, strlen(JOURNAL_NAME));
+#if 0
+ (void) snprintf (cp->changelog_name, PATH_MAX,
+ JOURNAL_NAME, priv->term);
+#endif
+
+ /* overload all fops */
+ r_fops->writev = changelog_replication_writev;
+ r_fops->ftruncate = changelog_replication_ftruncate;
+ r_fops->truncate = changelog_replication_truncate;
+ r_fops->fsetxattr = changelog_replication_fsetxattr;
+ r_fops->setxattr = changelog_replication_setxattr;
+ r_fops->removexattr = changelog_replication_removexattr;
+ r_fops->fremovexattr = changelog_replication_fremovexattr;
+ r_fops->setattr = changelog_replication_setattr;
+ r_fops->fsetattr = changelog_replication_fsetattr;
+ r_fops->create = changelog_replication_create;
+ r_fops->mknod = changelog_replication_mknod;
+ r_fops->symlink = changelog_replication_symlink;
+ r_fops->mkdir = changelog_replication_mkdir;
+ r_fops->link = changelog_replication_link;
+ r_fops->rename = changelog_replication_rename;
+ r_fops->unlink = changelog_replication_unlink;
+ r_fops->rmdir = changelog_replication_rmdir;
+
+ /* overload cops */
+ r_cops->open = changelog_replication_cops_open;
+ r_cops->rollover = changelog_replication_cops_rollover;
+ r_cops->get_offset = changelog_replication_cops_get_offset;
+ r_cops->set_offset = changelog_replication_cops_set_offset;
+ r_cops->reset_offset = changelog_replication_cops_reset_offset;
+
+ cp->fops = r_fops;
+ cp->cops = r_cops;
+
+ return 0;
+}
+
+int
+changelog_replication_policy_fini (xlator_t *this,
+ struct changelog_logpolicy *cp)
+{
+ GF_FREE (cp->fops);
+ GF_FREE (cp->cops);
+ return 0;
+}