diff options
Diffstat (limited to 'xlators/features/changelog/src/policy/changelog-policy-replication.c')
-rw-r--r-- | xlators/features/changelog/src/policy/changelog-policy-replication.c | 1184 |
1 files changed, 1184 insertions, 0 deletions
diff --git a/xlators/features/changelog/src/policy/changelog-policy-replication.c b/xlators/features/changelog/src/policy/changelog-policy-replication.c new file mode 100644 index 000000000..536339939 --- /dev/null +++ b/xlators/features/changelog/src/policy/changelog-policy-replication.c @@ -0,0 +1,1184 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "changelog-policy.h" +#include "changelog-encoders.h" +#include "changelog-fops.h" + +#define JOURNAL_NAME "TERM" + +#define JOURNAL_SECTOR_SIZE 128 + +#define PRE_OP_MARK 0x5F4552505FULL /* _PRE_ */ +#define POST_OP_MARK 0x5F54534F505FULL /* _POST_ */ + +/** + * assume an ever increasing index for now.. + */ +static unsigned long nsr_index = 1; + +static unsigned long +get_index(changelog_priv_t *priv) { + unsigned long idx = 0; + + LOCK (&priv->lock); + { + idx = nsr_index++; + } + UNLOCK (&priv->lock); + + return idx; +} + +static void +reset_index(changelog_priv_t *priv) { + nsr_index = 1; +} + + +#if 0 +static inline void +//changelog_replication_assign_term (changelog_priv_t *priv, + changelog_local_t *local) +{ + local->nr_bytes = 0; + local->lu.val = get_index (priv); +} +#endif + +size_t +number_fn (void *data, char *buffer, gf_boolean_t encode) +{ + char buf[1024] = {0,}; + size_t bufsz = 0; + unsigned long long nr = 0; + + nr = *(unsigned long long *) data; + + if (encode) { + (void) snprintf (buf, sizeof (buf), "%llu", nr); + CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf)); + } else + CHANGELOG_FILL_BUFFER (buffer, bufsz, + &nr, sizeof (unsigned long long)); + + return bufsz; +} + +size_t +uuid_fn (void *data, char *buffer, gf_boolean_t encode) +{ + char buf[1024] = {0,}; + uuid_t uuid = {0,}; + size_t bufsz = 0; + + memcpy (uuid, (uuid_t *) data, sizeof (uuid_t)); + + if (encode) { + char *tmpbuf = uuid_utoa (uuid); + (void) snprintf (buf, sizeof (buf), "%s", tmpbuf); + CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf)); + } else + CHANGELOG_FILL_BUFFER (buffer, bufsz, uuid, sizeof (uuid_t)); + + return bufsz; +} + +#define CHANGELOG_FILL_USIGNLL(co, number, converter, xlen) do { \ + co->co_convert = converter; \ + co->co_free = NULL; \ + co->co_type = CHANGELOG_OPT_REC_ULL; \ + co->co_number = (unsigned long long) number; \ + xlen += sizeof (unsigned long long); \ + if (!co->co_convert) \ + co->co_len = sizeof (unsigned long long); \ + } while (0) + +#define CHANGELOG_FILL_UUID(co, uuid, converter, xlen) do { \ + co->co_convert = converter; \ + co->co_free = NULL; \ + co->co_type = CHANGELOG_OPT_REC_UUID; \ + uuid_copy (co->co_uuid, uuid); \ + xlen += sizeof (uuid_t); \ + } while (0) + + +/* TBD: move declarations here and nsr.c into a common place */ +#define NSR_TERM_XATTR "trusted.nsr.term" +#define RECON_TERM_XATTR "trusted.nsr.recon-term" +#define RECON_INDEX_XATTR "trusted.nsr.recon-index" + +static gf_boolean_t +changelog_fix_term(xlator_t *this, + changelog_local_t *local, + dict_t *xdata) +{ + int32_t old_term, new_term; + changelog_priv_t *priv = this->private; + int ret = 0; + char nfile[PATH_MAX] = {0,}; + int32_t recon_term, recon_index; + changelog_rollover_data_t crd; + + // If coming via the regular IO path, we should get the dict "nsr-term" + // If coming via reconciliation, we should get the dicts "nsr-recon-term" + // that indicates the term and "nsr-recon-index" for the index + if (dict_get_int32(xdata,NSR_TERM_XATTR,&new_term) == 0) { + old_term = priv->term; + + if (old_term != new_term) { + GF_ASSERT(new_term > old_term); + LOCK (&priv->lock); + reset_index(priv); + priv->term = new_term; + (void) snprintf (nfile, PATH_MAX, "%s.%d", + JOURNAL_NAME, priv->term); + ret = CHANGELOG_INVOKE_CFOP(this, priv, rollover, + nfile, _gf_false); + UNLOCK (&priv->lock); + if (ret != 0) + return _gf_false; + } + local->nr_bytes = 0; + local->lu.val = get_index (priv); + } else if ((dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && + (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { + + old_term = priv->term; + + if (old_term != recon_term) { + LOCK (&priv->lock); + priv->term = recon_term; + (void) snprintf (crd.crd_changelog_name, PATH_MAX, "%s.%d", JOURNAL_NAME, priv->term); + crd.crd_prealloc_size = 0; + if (changelog_open(this, priv, local, &crd) != 0) + return _gf_false; + UNLOCK (&priv->lock); + } + local->nr_bytes = 0; + local->lu.val = recon_index; + } else { + return _gf_false; + } + + return _gf_true; +} + + + +/** override FOPS */ + +int32_t +changelog_replication_rmdir (call_frame_t *frame, xlator_t *this, + loc_t *loc, int xflags, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + /* <PRE> + FOP + GFID + Entry */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 4); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, + entry_fn, entry_free_fn, xtra_len, out); + + changelog_set_usable_record_and_length (local, xtra_len, 4); + + //changelog_replication_assign_term (priv, local); + + frame->local = local; + ret = 0; + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_unlink (call_frame_t *frame, xlator_t *this, + loc_t *loc, int xflags, dict_t *xdata) +{ + return changelog_replication_rmdir (frame, this, loc, xflags, xdata); +} + +int32_t +changelog_replication_rename (call_frame_t *frame, xlator_t *this, + loc_t *oldloc, loc_t *newloc, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + /* <PRE> + FOP + GFID + OLDLOC + NEWLOC */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, oldloc->inode->gfid, 5); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, oldloc->inode->gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_ENTRY (co, oldloc->pargfid, oldloc->name, + entry_fn, entry_free_fn, xtra_len, out); + co++; + + CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name, + entry_fn, entry_free_fn, xtra_len, out); + + //changelog_replication_assign_term (priv, local); + + changelog_set_usable_record_and_length (local, xtra_len, 5); + + frame->local = local; + ret = 0; + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_link (call_frame_t *frame, + xlator_t *this, loc_t *oldloc, + loc_t *newloc, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + /* <PRE> + FOP + GFID + Entry */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, oldloc->gfid, 4); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, oldloc->gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name, + entry_fn, entry_free_fn, xtra_len, out); + + changelog_set_usable_record_and_length (local, xtra_len, 4); + + //changelog_replication_assign_term (priv, local); + + frame->local = local; + ret = 0; + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_mkdir (call_frame_t *frame, + xlator_t *this, loc_t *loc, + mode_t mode, mode_t umask, dict_t *xdata) +{ + int ret = -1; + uuid_t gfid = {0,}; + void *uuid_req = NULL; + size_t xtra_len = 0; + changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, + "failed to get gfid from dict"); + goto out; + } + uuid_copy (gfid, uuid_req); + + ret = -1; + + /* <PRE> + FOP + GFID + Entry */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 4); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, + entry_fn, entry_free_fn, xtra_len, out); + + changelog_set_usable_record_and_length (local, xtra_len, 4); + + //changelog_replication_assign_term (priv, local); + + frame->local = local; + ret = 0; + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_symlink (call_frame_t *frame, xlator_t *this, + const char *linkname, loc_t *loc, + mode_t umask, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + uuid_t gfid = {0,}; + void *uuid_req = NULL; + changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, + "failed to get gfid from dict"); + goto out; + } + uuid_copy (gfid, uuid_req); + + ret = -1; + + /* <PRE> + FOP + GFID + Entry */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 4); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, + entry_fn, entry_free_fn, xtra_len, out); + + changelog_set_usable_record_and_length (local, xtra_len, 4); + + //changelog_replication_assign_term (priv, local); + + frame->local = local; + ret = 0; + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_mknod (call_frame_t *frame, + xlator_t *this, loc_t *loc, + mode_t mode, dev_t dev, + mode_t umask, dict_t *xdata) +{ + int ret = -1; + uuid_t gfid = {0,}; + void *uuid_req = NULL; + size_t xtra_len = 0; + changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, + "failed to get gfid from dict"); + goto out; + } + uuid_copy (gfid, uuid_req); + + ret = -1; + + /* <PRE> + FOP + GFID + Entry */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 4); + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, + entry_fn, entry_free_fn, xtra_len, out); + + changelog_set_usable_record_and_length (local, xtra_len, 4); + + //changelog_replication_assign_term (priv, local); + + frame->local = local; + ret = 0; + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_create (call_frame_t *frame, xlator_t *this, + loc_t *loc, int32_t flags, mode_t mode, + mode_t umask, fd_t *fd, dict_t *xdata) +{ + int ret = -1; + uuid_t gfid = {0,}; + void *uuid_req = NULL; + size_t xtra_len = 0; + changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, + "failed to get gfid from dict"); + goto out; + } + uuid_copy (gfid, uuid_req); + + ret = -1; + + /* <PRE> + FOP + GFID + Entry */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 4); + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, + entry_fn, entry_free_fn, xtra_len, out); + + changelog_set_usable_record_and_length (local, xtra_len, 4); + + //changelog_replication_assign_term (priv, local); + + frame->local = local; + ret = 0; + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_fsetattr (call_frame_t *frame, + xlator_t *this, fd_t *fd, + struct iatt *stbuf, int32_t valid, + dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + /* <PRE> + FOP + GFID */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 3); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len); + + changelog_set_usable_record_and_length (local, xtra_len, 3); + + //changelog_replication_assign_term (priv, local); + + frame->local = local; + ret = 0; + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_setattr (call_frame_t *frame, + xlator_t *this, loc_t *loc, + struct iatt *stbuf, int32_t valid, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + /* <PRE> + FOP + GFID */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 3); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len); + + changelog_set_usable_record_and_length (local, xtra_len, 3); + + //changelog_replication_assign_term (priv, local); + + frame->local = local; + ret = 0; + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_fremovexattr (call_frame_t *frame, xlator_t *this, + fd_t *fd, const char *name, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; + changelog_local_t *local = NULL; + int32_t xattr_op; + + priv = this->private; + + /* <PRE> + FOP + GFID */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 3); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) { + CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op, + fop_fn, xtra_len); + } + else { + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, + xtra_len); + } + co++; + + CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len); + + changelog_set_usable_record_and_length (local, xtra_len, 3); + + //changelog_replication_assign_term (priv, local); + + frame->local = local; + ret = 0; + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_removexattr (call_frame_t *frame, xlator_t *this, + loc_t *loc, const char *name, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; + changelog_local_t *local = NULL; + int32_t xattr_op; + + priv = this->private; + + CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 3); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) { + CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op, + fop_fn, xtra_len); + } + else { + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, + xtra_len); + } + co++; + + CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len); + + changelog_set_usable_record_and_length (local, xtra_len, 3); + + //changelog_replication_assign_term (priv, local); + + frame->local = local; + ret = 0; + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_setxattr (call_frame_t *frame, + xlator_t *this, loc_t *loc, + dict_t *dict, int32_t flags, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; + changelog_local_t *local = NULL; + int32_t xattr_op; + + priv = this->private; + + /* <PRE> + FOP + GFID */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 3); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) { + CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op, + fop_fn, xtra_len); + } + else { + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, + xtra_len); + } + co++; + + CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len); + + changelog_set_usable_record_and_length (local, xtra_len, 3); + + //changelog_replication_assign_term (priv, local); + + frame->local = local; + ret = 0; + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_fsetxattr (call_frame_t *frame, + xlator_t *this, fd_t *fd, dict_t *dict, + int32_t flags, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; + changelog_local_t *local = NULL; + int32_t xattr_op; + + priv = this->private; + + /* <PRE> + FOP + GFID */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 3); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + + if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) { + CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op, + fop_fn, xtra_len); + } + else { + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, + xtra_len); + } + co++; + + CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len); + + changelog_set_usable_record_and_length (local, xtra_len, 3); + + //changelog_replication_assign_term (priv, local); + + frame->local = local; + ret = 0; + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_truncate (call_frame_t *frame, + xlator_t *this, loc_t *loc, + off_t offset, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + /* <PRE> + FOP + GFID + Offset */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 4); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len); + + changelog_set_usable_record_and_length (local, xtra_len, 4); + + //changelog_replication_assign_term (priv, local); + + frame->local = local; + ret = 0; + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_ftruncate (call_frame_t *frame, + xlator_t *this, fd_t *fd, + off_t offset, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + /* <PRE> + FOP + GFID + Offset */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 4); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len); + + changelog_set_usable_record_and_length (local, xtra_len, 4); + + //changelog_replication_assign_term (priv, local); + + frame->local = local; + ret = 0; + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +int32_t +changelog_replication_writev (call_frame_t *frame, + xlator_t *this, fd_t *fd, struct iovec *vector, + int32_t count, off_t offset, uint32_t flags, + struct iobref *iobref, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + /* <PRE> + FOP + GFID + Offset + Length */ + CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 5); + if (!local) + goto out; + + co = changelog_get_usable_buffer (local); + if (!co) + goto out; + + if (changelog_fix_term(this, local, xdata) == _gf_false) + goto out; + + CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len); + co++; + + CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); + co++; + + CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len); + co++; + + CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len); + co++; + + CHANGELOG_FILL_USIGNLL (co, iov_length (vector, count), + number_fn, xtra_len); + + changelog_set_usable_record_and_length (local, xtra_len, 5); + + //changelog_replication_assign_term (priv, local); + + frame->local = local; + ret = 0; + + out: + if (ret) + changelog_local_cleanup (this, local); + return ret; +} + +/* overriden COPS */ +int +changelog_replication_cops_open (xlator_t *this, + changelog_priv_t *priv, void *cpriv, + char *name, gf_boolean_t last) +{ + changelog_local_t local = {0,}; + changelog_log_data_t cld = {0,}; + changelog_rollover_data_t *crd = NULL; + + crd = &cld.cld_roll; + + cld.cld_type = CHANGELOG_TYPE_ROLLOVER; + + crd->crd_finale = last; + crd->crd_use_suffix = _gf_false; + crd->crd_prealloc_size = 1<<10; /* preallocate 1 MB */ + + + (void) strcpy (crd->crd_changelog_name, name); + + local.lu.val = 0; + local.nr_bytes = 0; + + return changelog_inject_single_event (this, priv, &local, &cld); +} + +/** + * no implicit rollover + */ +int +changelog_replication_cops_rollover (xlator_t *this, + changelog_priv_t *priv, void *cpriv, + char *name, gf_boolean_t last) +{ + return changelog_replication_cops_open(this, priv, cpriv, name, last); +} + +off_t +changelog_replication_cops_get_offset (xlator_t *this, + changelog_priv_t *priv, void *cpriv, + changelog_local_t *local) +{ + if (!local) + return 0; + + return (local->lu.val * JOURNAL_SECTOR_SIZE) + local->nr_bytes; +} + +void +changelog_replication_cops_set_offset (xlator_t *this, + changelog_priv_t *priv, void *cpriv, + changelog_local_t *local, off_t bytes) +{ + local->nr_bytes += bytes; +} + +void +changelog_replication_cops_reset_offset (xlator_t *this, changelog_priv_t *priv, + void *cpriv, changelog_local_t *local) +{ + return; +} + +int +changelog_replication_policy_init (xlator_t *this, + changelog_priv_t *priv, + struct changelog_logpolicy *cp) +{ + struct xlator_fops *r_fops = NULL; + struct changelog_ops *r_cops = NULL; + + r_fops = GF_CALLOC (1, sizeof (struct xlator_fops), + gf_changelog_mt_fop_policy_t); + if (!r_fops) + return -1; + + r_cops = GF_CALLOC (1, sizeof (struct changelog_ops), + gf_changelog_mt_fop_policy_t); + if (!r_cops) { + GF_FREE (r_fops); + return -1; + } + + /* no roll-over, one big fat journal per term */ + priv->rollover_time = 0; + + /* fsync() is internally trigerred by NSR */ + priv->fsync_interval = 0; + + /* no record header: extra data (via iobufs) are always persisted */ + priv->no_gfid_hdr = _gf_true; + + memcpy (r_fops, &changelog_default_fops, sizeof (struct xlator_fops)); + memcpy (r_cops, &changelog_default_cops, sizeof (struct changelog_ops)); + + priv->term = 0; + (void) memset (cp->changelog_name, '\0', PATH_MAX); + memcpy(cp->changelog_name, JOURNAL_NAME, strlen(JOURNAL_NAME)); +#if 0 + (void) snprintf (cp->changelog_name, PATH_MAX, + JOURNAL_NAME, priv->term); +#endif + + /* overload all fops */ + r_fops->writev = changelog_replication_writev; + r_fops->ftruncate = changelog_replication_ftruncate; + r_fops->truncate = changelog_replication_truncate; + r_fops->fsetxattr = changelog_replication_fsetxattr; + r_fops->setxattr = changelog_replication_setxattr; + r_fops->removexattr = changelog_replication_removexattr; + r_fops->fremovexattr = changelog_replication_fremovexattr; + r_fops->setattr = changelog_replication_setattr; + r_fops->fsetattr = changelog_replication_fsetattr; + r_fops->create = changelog_replication_create; + r_fops->mknod = changelog_replication_mknod; + r_fops->symlink = changelog_replication_symlink; + r_fops->mkdir = changelog_replication_mkdir; + r_fops->link = changelog_replication_link; + r_fops->rename = changelog_replication_rename; + r_fops->unlink = changelog_replication_unlink; + r_fops->rmdir = changelog_replication_rmdir; + + /* overload cops */ + r_cops->open = changelog_replication_cops_open; + r_cops->rollover = changelog_replication_cops_rollover; + r_cops->get_offset = changelog_replication_cops_get_offset; + r_cops->set_offset = changelog_replication_cops_set_offset; + r_cops->reset_offset = changelog_replication_cops_reset_offset; + + cp->fops = r_fops; + cp->cops = r_cops; + + return 0; +} + +int +changelog_replication_policy_fini (xlator_t *this, + struct changelog_logpolicy *cp) +{ + GF_FREE (cp->fops); + GF_FREE (cp->cops); + return 0; +} |