From ef171ff2bfd114e46442441fbdeb692a416cc951 Mon Sep 17 00:00:00 2001 From: Jeff Darcy Date: Wed, 11 Dec 2013 16:26:25 -0500 Subject: Roll-up patch for NSR so far. Previous history: https://forge.gluster.org/~jdarcy/glusterfs-core/glusterfs-nsr Change-Id: I2b56328788753c6a74d9589815f2dd705ac9ce6a Signed-off-by: Jeff Darcy --- .../changelog/lib/src/gf-changelog-helpers.h | 1 + xlators/features/changelog/src/Makefile.am | 8 +- .../changelog/src/changelog-default-fops.c | 561 ++++++++++ .../features/changelog/src/changelog-encoders.c | 99 +- .../features/changelog/src/changelog-encoders.h | 10 +- xlators/features/changelog/src/changelog-fops.h | 157 +++ xlators/features/changelog/src/changelog-helpers.c | 208 ++-- xlators/features/changelog/src/changelog-helpers.h | 246 +++- .../features/changelog/src/changelog-mem-types.h | 9 +- xlators/features/changelog/src/changelog-misc.h | 8 +- xlators/features/changelog/src/changelog-rt.c | 9 +- xlators/features/changelog/src/changelog-rt.h | 5 +- xlators/features/changelog/src/changelog.c | 428 +++---- .../src/policy/changelog-policy-default.c | 44 + .../src/policy/changelog-policy-replication.c | 1184 ++++++++++++++++++++ .../changelog/src/policy/changelog-policy.h | 41 + 16 files changed, 2550 insertions(+), 468 deletions(-) create mode 100644 xlators/features/changelog/src/changelog-default-fops.c create mode 100644 xlators/features/changelog/src/changelog-fops.h create mode 100644 xlators/features/changelog/src/policy/changelog-policy-default.c create mode 100644 xlators/features/changelog/src/policy/changelog-policy-replication.c create mode 100644 xlators/features/changelog/src/policy/changelog-policy.h (limited to 'xlators/features/changelog') diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h index 3aa6ed7b8..f35220ccb 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h +++ 
b/xlators/features/changelog/lib/src/gf-changelog-helpers.h @@ -94,4 +94,5 @@ gf_ftruncate (int fd, off_t length); off_t gf_lseek (int fd, off_t offset, int whence); + #endif diff --git a/xlators/features/changelog/src/Makefile.am b/xlators/features/changelog/src/Makefile.am index e85031ad4..f8beba430 100644 --- a/xlators/features/changelog/src/Makefile.am +++ b/xlators/features/changelog/src/Makefile.am @@ -3,15 +3,17 @@ xlator_LTLIBRARIES = changelog.la xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features noinst_HEADERS = changelog-helpers.h changelog-mem-types.h changelog-rt.h \ - changelog-misc.h changelog-encoders.h changelog-notifier.h + changelog-misc.h changelog-encoders.h changelog-notifier.h \ + changelog-fops.h policy/changelog-policy.h changelog_la_LDFLAGS = -module -avoidversion changelog_la_SOURCES = changelog.c changelog-rt.c changelog-helpers.c \ - changelog-encoders.c changelog-notifier.c + changelog-encoders.c changelog-notifier.c changelog-default-fops.c \ + policy/changelog-policy-default.c policy/changelog-policy-replication.c changelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la -AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src -fPIC -D_FILE_OFFSET_BITS=64 \ +AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src -Ipolicy/ -fPIC -D_FILE_OFFSET_BITS=64 \ -D_GNU_SOURCE -D$(GF_HOST_OS) -shared -nostartfiles -DDATADIR=\"$(localstatedir)\" AM_CFLAGS = -Wall $(GF_CFLAGS) diff --git a/xlators/features/changelog/src/changelog-default-fops.c b/xlators/features/changelog/src/changelog-default-fops.c new file mode 100644 index 000000000..59749905e --- /dev/null +++ b/xlators/features/changelog/src/changelog-default-fops.c @@ -0,0 +1,561 @@ +/* + Copyright (c) 2013 Red Hat, Inc. + This file is part of GlusterFS. 
+
+  This file is licensed to you under your choice of the GNU Lesser
+  General Public License, version 3 or any later version (LGPLv3 or
+  later), or the GNU General Public License, version 2 (GPLv2), in all
+  cases as published by the Free Software Foundation.
+*/
+
+#include "xlator.h"
+#include "defaults.h"
+#include "logging.h"
+
+#include "changelog-encoders.h"
+
+/** FOPS */
+
+/*
+ * Every entry fop below follows the same sequence:
+ *
+ *   1. CHANGELOG_INIT_NOCHECK() assigns @local for the gfid, asking for
+ *      N extra records (N is the last macro argument);
+ *   2. record 0 is filled with the fop number (frame->root->op);
+ *   3. the remaining record(s) are filled with a (parent gfid, basename)
+ *      entry via CHANGELOG_FILL_ENTRY(), which is handed the 'out' label;
+ *   4. the accumulated length and record count are stored in @local and
+ *      @local is published through frame->local.
+ *
+ * On any failure @local is handed to changelog_local_cleanup() and a
+ * non-zero value is returned; frame->local is left untouched in that case.
+ */
+
+/**
+ * default rmdir
+ *
+ * Records the fop number followed by the entry (loc->pargfid, loc->name),
+ * keyed by loc->inode->gfid.
+ *
+ * @return 0 on success, -1 on failure.
+ */
+int32_t
+changelog_default_rmdir (call_frame_t *frame, xlator_t *this,
+                         loc_t *loc, int xflags, dict_t *xdata)
+{
+        int                ret      = -1;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 2);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+
+        co++;
+        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+                              entry_fn, entry_free_fn, xtra_len, out);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 2);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * default unlink
+ *
+ * Records the fop number followed by the entry (loc->pargfid, loc->name),
+ * keyed by loc->inode->gfid.
+ *
+ * @return 0 on success, -1 on failure.
+ */
+int32_t
+changelog_default_unlink (call_frame_t *frame, xlator_t *this,
+                          loc_t *loc, int xflags, dict_t *xdata)
+{
+        int                ret      = -1;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 2);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+
+        co++;
+        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+                              entry_fn, entry_free_fn, xtra_len, out);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 2);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * default rename
+ *
+ * Records the fop number, then the old entry (oldloc->pargfid,
+ * oldloc->name), then the new entry (newloc->pargfid, newloc->name),
+ * keyed by oldloc->inode->gfid.
+ *
+ * @return 0 on success, -1 on failure.
+ */
+int32_t
+changelog_default_rename (call_frame_t *frame, xlator_t *this,
+                          loc_t *oldloc, loc_t *newloc, dict_t *xdata)
+{
+        int                ret      = -1;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+
+        /* 3 == fop + oldloc + newloc */
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, oldloc->inode->gfid, 3);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+
+        co++;
+        CHANGELOG_FILL_ENTRY (co, oldloc->pargfid, oldloc->name,
+                              entry_fn, entry_free_fn, xtra_len, out);
+
+        co++;
+        CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name,
+                              entry_fn, entry_free_fn, xtra_len, out);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 3);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * default link
+ *
+ * Records the fop number followed by the new entry (newloc->pargfid,
+ * newloc->name), keyed by oldloc->gfid.
+ *
+ * @return 0 on success, -1 on failure (same failure value as the other
+ *         entry fops in this file).
+ */
+int32_t
+changelog_default_link (call_frame_t *frame,
+                        xlator_t *this, loc_t *oldloc,
+                        loc_t *newloc, dict_t *xdata)
+{
+        int                ret      = -1;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, oldloc->gfid, 2);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+
+        co++;
+        CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name,
+                              entry_fn, entry_free_fn, xtra_len, out);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 2);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * default mkdir
+ *
+ * The gfid of the directory being created is read from the "gfid-req"
+ * key of @xdata; the fop number and the entry (loc->pargfid, loc->name)
+ * are then recorded against that gfid.
+ *
+ * @return 0 on success; the non-zero dict_get_ptr() result when the gfid
+ *         is missing from @xdata; -1 on any other failure.
+ */
+int32_t
+changelog_default_mkdir (call_frame_t *frame, xlator_t *this,
+                         loc_t *loc, mode_t mode, mode_t umask, dict_t *xdata)
+{
+        int                ret      = -1;
+        uuid_t             gfid     = {0,};
+        void              *uuid_req = NULL;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+
+        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+        if (ret) {
+                gf_log (this->name, GF_LOG_DEBUG,
+                        "failed to get gfid from dict");
+                goto out;
+        }
+        uuid_copy (gfid, uuid_req);
+
+        /* from here on every failure is reported as -1 */
+        ret = -1;
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 2);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+
+        co++;
+        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+                              entry_fn, entry_free_fn, xtra_len, out);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 2);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * default symlink
+ *
+ * The gfid of the link being created is read from the "gfid-req" key of
+ * @xdata; the fop number and the entry (loc->pargfid, loc->name) are then
+ * recorded against that gfid.  @linkname is not recorded here.
+ *
+ * @return 0 on success; the non-zero dict_get_ptr() result when the gfid
+ *         is missing from @xdata; -1 on any other failure.
+ */
+int32_t
+changelog_default_symlink (call_frame_t *frame, xlator_t *this,
+                           const char *linkname, loc_t *loc,
+                           mode_t umask, dict_t *xdata)
+{
+        int                ret      = -1;
+        size_t             xtra_len = 0;
+        uuid_t             gfid     = {0,};
+        void              *uuid_req = NULL;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+
+        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+        if (ret) {
+                gf_log (this->name, GF_LOG_DEBUG,
+                        "failed to get gfid from dict");
+                goto out;
+        }
+        uuid_copy (gfid, uuid_req);
+
+        /* from here on every failure is reported as -1 */
+        ret = -1;
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 2);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+
+        co++;
+        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+                              entry_fn, entry_free_fn, xtra_len, out);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 2);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * default mknod
+ *
+ * The gfid of the node being created is read from the "gfid-req" key of
+ * @xdata; the fop number and the entry (loc->pargfid, loc->name) are then
+ * recorded against that gfid.
+ *
+ * @return 0 on success; the non-zero dict_get_ptr() result when the gfid
+ *         is missing from @xdata; -1 on any other failure.
+ */
+int32_t
+changelog_default_mknod (call_frame_t *frame,
+                         xlator_t *this, loc_t *loc,
+                         mode_t mode, dev_t dev, mode_t umask, dict_t *xdata)
+{
+        int                ret      = -1;
+        uuid_t             gfid     = {0,};
+        void              *uuid_req = NULL;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+
+        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+        if (ret) {
+                gf_log (this->name, GF_LOG_DEBUG,
+                        "failed to get gfid from dict");
+                goto out;
+        }
+        uuid_copy (gfid, uuid_req);
+
+        /* from here on every failure is reported as -1 */
+        ret = -1;
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 2);
+        if (!local)
+                goto out;
+
+        /*
+         * The buffer must come from @local: frame->local is only
+         * assigned once the record has been fully built (see below).
+         */
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+
+        co++;
+        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+                              entry_fn, entry_free_fn, xtra_len, out);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 2);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * default create
+ *
+ * The gfid of the file being created is read from the "gfid-req" key of
+ * @xdata; the fop number and the entry (loc->pargfid, loc->name) are then
+ * recorded against that gfid.
+ *
+ * @return 0 on success; the non-zero dict_get_ptr() result when the gfid
+ *         is missing from @xdata; -1 on any other failure.
+ */
+int32_t
+changelog_default_create (call_frame_t *frame, xlator_t *this,
+                          loc_t *loc, int32_t flags, mode_t mode,
+                          mode_t umask, fd_t *fd, dict_t *xdata)
+{
+        int                ret      = -1;
+        uuid_t             gfid     = {0,};
+        void              *uuid_req = NULL;
+        changelog_opt_t   *co       = NULL;
+        size_t             xtra_len = 0;
+        changelog_local_t *local    = NULL;
+
+        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+        if (ret) {
+                gf_log (this->name, GF_LOG_DEBUG,
+                        "failed to get gfid from dict");
+                goto out;
+        }
+        uuid_copy (gfid, uuid_req);
+
+        /* init with two extra records */
+        ret = -1;
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 2);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+
+        co++;
+        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+                              entry_fn, entry_free_fn, xtra_len, out);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 2);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * default fsetattr
+ *
+ * Hands frame->local to CHANGELOG_INIT() for fd->inode / fd->inode->gfid
+ * with no extra records.  This function itself always returns 0.
+ * NOTE(review): CHANGELOG_INIT is defined outside this file; whether it
+ * can leave frame->local unset is not visible here -- callers presumably
+ * check frame->local.
+ */
+int32_t
+changelog_default_fsetattr (call_frame_t *frame,
+                            xlator_t *this, fd_t *fd,
+                            struct iatt *stbuf, int32_t valid, dict_t *xdata)
+{
+        CHANGELOG_INIT (this, frame->local,
+                        fd->inode, fd->inode->gfid, 0);
+        return 0;
+}
+
+/* default setattr */ +int32_t +changelog_default_setattr (call_frame_t *frame, + xlator_t *this, loc_t *loc, + struct iatt *stbuf, int32_t valid, dict_t *xdata) +{ + CHANGELOG_INIT (this, frame->local, + loc->inode, loc->inode->gfid, 0); + return 0; +} + +/* default fremovexattr */ +int32_t +changelog_default_fremovexattr (call_frame_t *frame, xlator_t *this, + fd_t *fd, const char *name, dict_t *xdata) +{ + CHANGELOG_INIT (this, frame->local, + fd->inode, fd->inode->gfid, 0); + return 0; +} + +/* default removexattr */ +int32_t +changelog_default_removexattr (call_frame_t *frame, xlator_t *this, + loc_t *loc, const char *name, dict_t *xdata) +{ + CHANGELOG_INIT (this, frame->local, + loc->inode, loc->inode->gfid, 0); + return 0; +} + +/* default setxattr */ +int32_t +changelog_default_setxattr (call_frame_t *frame, + xlator_t *this, loc_t *loc, + dict_t *dict, int32_t flags, dict_t *xdata) +{ + CHANGELOG_INIT (this, frame->local, + loc->inode, loc->inode->gfid, 0); + return 0; +} + +/* default fsetxattr */ +int32_t +changelog_default_fsetxattr (call_frame_t *frame, + xlator_t *this, fd_t *fd, dict_t *dict, + int32_t flags, dict_t *xdata) +{ + CHANGELOG_INIT (this, frame->local, + fd->inode, fd->inode->gfid, 0); + return 0; +} + +/* default truncate */ +int32_t +changelog_default_truncate (call_frame_t *frame, + xlator_t *this, loc_t *loc, + off_t offset, dict_t *xdata) +{ + CHANGELOG_INIT (this, frame->local, + loc->inode, loc->inode->gfid, 0); + return 0; +} + +/* default ftruncate */ +int32_t +changelog_default_ftruncate (call_frame_t *frame, + xlator_t *this, fd_t *fd, + off_t offset, dict_t *xdata) +{ + CHANGELOG_INIT (this, frame->local, + fd->inode, fd->inode->gfid, 0); + return 0; +} + +/* default writev */ +int32_t +changelog_default_writev (call_frame_t *frame, + xlator_t *this, fd_t *fd, struct iovec *vector, + int32_t count, off_t offset, uint32_t flags, + struct iobref *iobref, dict_t *xdata) +{ + CHANGELOG_INIT (this, frame->local, + fd->inode, 
fd->inode->gfid, 0); + return 0; +} + +/** COPS */ + +int +changelog_default_cops_open (xlator_t *this, + changelog_priv_t *priv, + void *cpriv, char *name, gf_boolean_t last) +{ + changelog_log_data_t cld = {0,}; + changelog_rollover_data_t *crd = NULL; + struct timeval tv = {0,}; + + crd = &cld.cld_roll; + + cld.cld_type = CHANGELOG_TYPE_ROLLOVER; + + if (gettimeofday (&tv, NULL)) + return -1; + + crd->crd_prealloc_size = 0; /* no preallocation */ + crd->crd_finale = last; + crd->crd_use_suffix = _gf_true; + crd->crd_roll_key = (unsigned long) tv.tv_sec; + + (void) strcpy (crd->crd_changelog_name, name); + (void) strcpy (crd->crd_changelog_oname, name); + + /* inject a roll-over event */ + return changelog_inject_single_event (this, priv, NULL, &cld); +} + +int +changelog_default_cops_rollover (xlator_t *this, + changelog_priv_t *priv, void *cpriv, + char *name, gf_boolean_t last) +{ + return changelog_default_cops_open (this, priv, cpriv, name, last); +} + +int +changelog_default_cops_sync (xlator_t *this, + changelog_priv_t *priv, void *cpriv) +{ + changelog_log_data_t cld = {0,}; + + cld.cld_type = CHANGELOG_TYPE_FSYNC; + return changelog_inject_single_event (this, priv, NULL, &cld); +} + +/** + * write to the changelog: @changelog_update() implements inode version + * checking and all other stuffs... 
+ */
+int
+changelog_default_cops_write (xlator_t *this,
+                              changelog_priv_t *priv, void *cpriv,
+                              changelog_local_t *local, changelog_log_type type)
+{
+        changelog_update (this, priv, local, type); /* no result is propagated from this call */
+        return 0; /* the default write always reports success */
+}
+
+off_t
+changelog_default_cops_get_offset (xlator_t *this,
+                                   changelog_priv_t *priv, void *cpriv,
+                                   changelog_local_t *local)
+{
+        return *(off_t *)cpriv; /* cpriv is read as a pointer to the current off_t offset */
+}
+
+void
+changelog_default_cops_set_offset (xlator_t *this,
+                                   changelog_priv_t *priv, void *cpriv,
+                                   changelog_local_t *local, off_t bytes)
+{
+        *(off_t *)cpriv += bytes; /* advances the stored offset by @bytes (relative, not absolute) */
+}
+
+void
+changelog_default_cops_reset_offset (xlator_t *this, changelog_priv_t *priv,
+                                     void *cpriv, changelog_local_t *local)
+{
+        *(off_t *)cpriv = 0; /* rewinds the stored offset to the start */
+}
+
+/**
+ * roll-over takes care of close and open
+ *
+ * The default policy therefore has no standalone close: this always
+ * sets errno to ENOTSUP and returns -1.
+ */
+int
+changelog_default_cops_close (xlator_t *this,
+                              changelog_priv_t *priv, void *cpriv)
+{
+        errno = ENOTSUP;
+        return -1;
+}
+
+int
+changelog_default_cops_read (xlator_t *this,
+                             changelog_priv_t *priv, void *cpriv, char *buffer)
+{
+        errno = ENOTSUP; /* the default policy provides no read; @buffer is left untouched */
+        return -1;
+}
+
+/**
+ * no purging of changelogs
+ *
+ * Always sets errno to ENOTSUP and returns -1; @name is not examined.
+ */
+int
+changelog_default_cops_unlink (xlator_t *this,
+                               changelog_priv_t *priv, void *cpriv, char *name)
+{
+        errno = ENOTSUP;
+        return -1;
+}
diff --git a/xlators/features/changelog/src/changelog-encoders.c b/xlators/features/changelog/src/changelog-encoders.c index 553eec85c..8d45ee1ff 100644 --- a/xlators/features/changelog/src/changelog-encoders.c +++ b/xlators/features/changelog/src/changelog-encoders.c @@ -72,7 +72,7 @@ entry_free_fn (void *data) */ static inline void -changelog_encode_write_xtra (changelog_log_data_t *cld, +changelog_encode_write_xtra (changelog_write_data_t *cwd, char *buffer, size_t *off, gf_boolean_t encode) { int i = 0; @@ -82,10 +82,11 @@ changelog_encode_write_xtra (changelog_log_data_t *cld, offset = *off; - co = (changelog_opt_t *) cld->cld_ptr; + co = (changelog_opt_t *) cwd->cwd_ptr; - for (; i < cld->cld_xtra_records; i++, co++) { - CHANGELOG_FILL_BUFFER (buffer, offset, "\0", 1);
+ for (; i < cwd->cwd_xtra_records; i++, co++) { + if (i) + CHANGELOG_FILL_BUFFER (buffer, offset, "\0", 1); switch (co->co_type) { case CHANGELOG_OPT_REC_FOP: @@ -94,6 +95,12 @@ changelog_encode_write_xtra (changelog_log_data_t *cld, case CHANGELOG_OPT_REC_ENTRY: data = &co->co_entry; break; + case CHANGELOG_OPT_REC_ULL: + data = &co->co_number; + break; + case CHANGELOG_OPT_REC_UUID: + data = &co->co_uuid; + break; } if (co->co_convert) @@ -108,69 +115,59 @@ changelog_encode_write_xtra (changelog_log_data_t *cld, } int -changelog_encode_ascii (xlator_t *this, changelog_log_data_t *cld) +changelog_encode_ascii (xlator_t *this, + changelog_local_t *local, changelog_log_data_t *cld) { - size_t off = 0; - size_t gfid_len = 0; - char *gfid_str = NULL; - char *buffer = NULL; - changelog_priv_t *priv = NULL; + size_t off = 0; + size_t gfid_len = 0; + char *gfid_str = NULL; + char *buffer = NULL; + changelog_priv_t *priv = NULL; + changelog_write_data_t *cwd = NULL; priv = this->private; + cwd = &cld->cld_wdata; - gfid_str = uuid_utoa (cld->cld_gfid); + gfid_str = uuid_utoa (cwd->cwd_gfid); gfid_len = strlen (gfid_str); /* extra bytes for decorations */ - buffer = alloca (gfid_len + cld->cld_ptr_len + 10); - CHANGELOG_STORE_ASCII (priv, buffer, - off, gfid_str, gfid_len, cld); - - if (cld->cld_xtra_records) - changelog_encode_write_xtra (cld, buffer, &off, _gf_true); - - CHANGELOG_FILL_BUFFER (buffer, off, "\0", 1); + buffer = alloca (gfid_len + cwd->cwd_ptr_len + 100); + if (!priv->no_gfid_hdr) + CHANGELOG_STORE_ASCII (priv, buffer, + off, gfid_str, gfid_len, cld); + + if (cwd->cwd_xtra_records) { + changelog_encode_write_xtra (cwd, buffer, &off, _gf_true); + CHANGELOG_FILL_BUFFER (buffer, off, "\0", 1); + } - return changelog_write_change (priv, buffer, off); + return changelog_write_change (this, priv, + local, buffer, off); } int -changelog_encode_binary (xlator_t *this, changelog_log_data_t *cld) +changelog_encode_binary (xlator_t *this, + changelog_local_t *local, 
changelog_log_data_t *cld) { - size_t off = 0; - char *buffer = NULL; - changelog_priv_t *priv = NULL; + size_t off = 0; + char *buffer = NULL; + changelog_priv_t *priv = NULL; + changelog_write_data_t *cwd = NULL; priv = this->private; + cwd = &cld->cld_wdata; /* extra bytes for decorations */ - buffer = alloca (sizeof (uuid_t) + cld->cld_ptr_len + 10); - CHANGELOG_STORE_BINARY (priv, buffer, off, cld->cld_gfid, cld); - - if (cld->cld_xtra_records) - changelog_encode_write_xtra (cld, buffer, &off, _gf_false); + buffer = alloca (sizeof (uuid_t) + cwd->cwd_ptr_len + 100); + if (!priv->no_gfid_hdr) + CHANGELOG_STORE_BINARY (priv, buffer, off, cwd->cwd_gfid, cld); - CHANGELOG_FILL_BUFFER (buffer, off, "\0", 1); - - return changelog_write_change (priv, buffer, off); -} - -static struct changelog_encoder -cb_encoder[] = { - [CHANGELOG_ENCODE_BINARY] = - { - .encoder = CHANGELOG_ENCODE_BINARY, - .encode = changelog_encode_binary, - }, - [CHANGELOG_ENCODE_ASCII] = - { - .encoder = CHANGELOG_ENCODE_ASCII, - .encode = changelog_encode_ascii, - }, -}; + if (cwd->cwd_xtra_records) { + changelog_encode_write_xtra (cwd, buffer, &off, _gf_false); + CHANGELOG_FILL_BUFFER (buffer, off, "\0", 1); + } -void -changelog_encode_change( changelog_priv_t * priv) -{ - priv->ce = &cb_encoder[priv->encode_mode]; + return changelog_write_change (this, priv, + local, buffer, off); } diff --git a/xlators/features/changelog/src/changelog-encoders.h b/xlators/features/changelog/src/changelog-encoders.h index a3efbee05..2a96ba4dd 100644 --- a/xlators/features/changelog/src/changelog-encoders.h +++ b/xlators/features/changelog/src/changelog-encoders.h @@ -21,6 +21,7 @@ priv->maps[cld->cld_type], 1); \ CHANGELOG_FILL_BUFFER (buffer, \ off, gfid, gfid_len); \ + CHANGELOG_FILL_BUFFER (buffer, off, "\0", 1); \ } while (0) #define CHANGELOG_STORE_BINARY(priv, buf, off, gfid, cld) do { \ @@ -28,6 +29,7 @@ priv->maps[cld->cld_type], 1); \ CHANGELOG_FILL_BUFFER (buffer, \ off, gfid, sizeof (uuid_t)); \ + 
CHANGELOG_FILL_BUFFER (buffer, off, "\0", 1); \ } while (0) size_t @@ -37,10 +39,10 @@ fop_fn (void *data, char *buffer, gf_boolean_t encode); void entry_free_fn (void *data); int -changelog_encode_binary (xlator_t *, changelog_log_data_t *); +changelog_encode_binary (xlator_t *, + changelog_local_t *, changelog_log_data_t *); int -changelog_encode_ascii (xlator_t *, changelog_log_data_t *); -void -changelog_encode_change(changelog_priv_t *); +changelog_encode_ascii (xlator_t *, + changelog_local_t *, changelog_log_data_t *); #endif /* _CHANGELOG_ENCODERS_H */ diff --git a/xlators/features/changelog/src/changelog-fops.h b/xlators/features/changelog/src/changelog-fops.h new file mode 100644 index 000000000..597327be3 --- /dev/null +++ b/xlators/features/changelog/src/changelog-fops.h @@ -0,0 +1,157 @@ +/* + Copyright (c) 2013 Red Hat, Inc. + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. 
+*/ + +#ifndef _CHANGELOG_FOPS_H +#define _CHANGELOG_FOPS_H + +/* FOPS */ + +int32_t +changelog_default_rmdir (call_frame_t *frame, xlator_t *this, + loc_t *loc, int xflags, dict_t *xdata); +int32_t +changelog_default_unlink (call_frame_t *frame, xlator_t *this, + loc_t *loc, int xflags, dict_t *xdata); +int32_t +changelog_default_rename (call_frame_t *frame, xlator_t *this, + loc_t *oldloc, loc_t *newloc, dict_t *xdata); +int32_t +changelog_default_link (call_frame_t *frame, + xlator_t *this, loc_t *oldloc, + loc_t *newloc, dict_t *xdata); +int32_t +changelog_default_mkdir (call_frame_t *frame, xlator_t *this, + loc_t *loc, mode_t mode, mode_t umask, dict_t *xdata); +int32_t +changelog_default_symlink (call_frame_t *frame, xlator_t *this, + const char *linkname, loc_t *loc, + mode_t umask, dict_t *xdata); +int32_t +changelog_default_mknod (call_frame_t *frame, + xlator_t *this, loc_t *loc, + mode_t mode, dev_t dev, mode_t umask, dict_t *xdata); +int32_t +changelog_default_create (call_frame_t *frame, xlator_t *this, + loc_t *loc, int32_t flags, mode_t mode, + mode_t umask, fd_t *fd, dict_t *xdata); +int32_t +changelog_default_fsetattr (call_frame_t *frame, + xlator_t *this, fd_t *fd, + struct iatt *stbuf, int32_t valid, dict_t *xdata); +int32_t +changelog_default_setattr (call_frame_t *frame, + xlator_t *this, loc_t *loc, + struct iatt *stbuf, int32_t valid, dict_t *xdata); +int32_t +changelog_default_fremovexattr (call_frame_t *frame, xlator_t *this, + fd_t *fd, const char *name, dict_t *xdata); +int32_t +changelog_default_removexattr (call_frame_t *frame, xlator_t *this, + loc_t *loc, const char *name, dict_t *xdata); +int32_t +changelog_default_setxattr (call_frame_t *frame, + xlator_t *this, loc_t *loc, + dict_t *dict, int32_t flags, dict_t *xdata); +int32_t +changelog_default_fsetxattr (call_frame_t *frame, + xlator_t *this, fd_t *fd, dict_t *dict, + int32_t flags, dict_t *xdata); +int32_t +changelog_default_truncate (call_frame_t *frame, + xlator_t *this, 
loc_t *loc, + off_t offset, dict_t *xdata); +int32_t +changelog_default_ftruncate (call_frame_t *frame, + xlator_t *this, fd_t *fd, + off_t offset, dict_t *xdata); +int32_t +changelog_default_writev (call_frame_t *frame, + xlator_t *this, fd_t *fd, struct iovec *vector, + int32_t count, off_t offset, uint32_t flags, + struct iobref *iobref, dict_t *xdata); + +/* COPS */ +int +changelog_default_cops_open (xlator_t *, changelog_priv_t *, + void *, char*, gf_boolean_t); + +int +changelog_default_cops_close (xlator_t *, changelog_priv_t *, void *); + +int +changelog_default_cops_sync (xlator_t *this, + changelog_priv_t *priv, void *); + +int +changelog_default_cops_rollover (xlator_t *, + changelog_priv_t *, void *, + char *, gf_boolean_t); +int +changelog_default_cops_write (xlator_t *, + changelog_priv_t *, void *, + changelog_local_t *, changelog_log_type); + +int +changelog_default_cops_read (xlator_t *, + changelog_priv_t *, void *, char *); + +int +changelog_default_cops_unlink (xlator_t *, + changelog_priv_t *, void *, char *); + +off_t +changelog_default_cops_get_offset (xlator_t *, + changelog_priv_t *, void *, + changelog_local_t *); + +void +changelog_default_cops_set_offset (xlator_t *, + changelog_priv_t *, void *, + changelog_local_t *, off_t ); + +void +changelog_default_cops_reset_offset (xlator_t *, changelog_priv_t *, + void *, changelog_local_t *); + + +GF_UNUSED static struct xlator_fops changelog_default_fops = { + .mknod = changelog_default_mknod, + .mkdir = changelog_default_mkdir, + .create = changelog_default_create, + .symlink = changelog_default_symlink, + .writev = changelog_default_writev, + .truncate = changelog_default_truncate, + .ftruncate = changelog_default_ftruncate, + .link = changelog_default_link, + .rename = changelog_default_rename, + .unlink = changelog_default_unlink, + .rmdir = changelog_default_rmdir, + .setattr = changelog_default_setattr, + .fsetattr = changelog_default_fsetattr, + .setxattr = changelog_default_setxattr, + 
.fsetxattr = changelog_default_fsetxattr, + .removexattr = changelog_default_removexattr, + .fremovexattr = changelog_default_fremovexattr, +}; + +GF_UNUSED static struct changelog_ops changelog_default_cops = { + .open = changelog_default_cops_open, + .sync = changelog_default_cops_sync, + .read = changelog_default_cops_read, + .close = changelog_default_cops_close, + .write = changelog_default_cops_write, + .unlink = changelog_default_cops_unlink, + .rollover = changelog_default_cops_rollover, + .get_offset = changelog_default_cops_get_offset, + .set_offset = changelog_default_cops_set_offset, + .reset_offset = changelog_default_cops_reset_offset, +}; + +#endif /* _CHANGELOG_FOPS_H */ diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c index 7ab0091b5..ad4fe4013 100644 --- a/xlators/features/changelog/src/changelog-helpers.c +++ b/xlators/features/changelog/src/changelog-helpers.c @@ -21,7 +21,6 @@ #include "changelog-helpers.h" #include "changelog-mem-types.h" -#include "changelog-encoders.h" #include void @@ -53,48 +52,45 @@ changelog_thread_cleanup (xlator_t *this, pthread_t thr_id) inline void * changelog_get_usable_buffer (changelog_local_t *local) { - changelog_log_data_t *cld = NULL; + changelog_write_data_t *cwd = &local->cld.cld_wdata; - cld = &local->cld; - if (!cld->cld_iobuf) + if (!cwd->cwd_iobuf) return NULL; - return cld->cld_iobuf->ptr; + return cwd->cwd_ptr; } inline void changelog_set_usable_record_and_length (changelog_local_t *local, size_t len, int xr) { - changelog_log_data_t *cld = NULL; + changelog_write_data_t *cwd = &local->cld.cld_wdata; - cld = &local->cld; - - cld->cld_ptr_len = len; - cld->cld_xtra_records = xr; + cwd->cwd_ptr_len = len; + cwd->cwd_xtra_records = xr; } void changelog_local_cleanup (xlator_t *xl, changelog_local_t *local) { - int i = 0; - changelog_opt_t *co = NULL; - changelog_log_data_t *cld = NULL; + int i = 0; + changelog_opt_t *co = NULL; + 
changelog_write_data_t *cwd = NULL; if (!local) return; - cld = &local->cld; + cwd = &local->cld.cld_wdata; /* cleanup dynamic allocation for extra records */ - if (cld->cld_xtra_records) { - co = (changelog_opt_t *) cld->cld_ptr; - for (; i < cld->cld_xtra_records; i++, co++) + if (cwd->cwd_xtra_records) { + co = (changelog_opt_t *) cwd->cwd_ptr; + for (; i < cwd->cwd_xtra_records; i++, co++) if (co->co_free) co->co_free (co); } - CHANGELOG_IOBUF_UNREF (cld->cld_iobuf); + CHANGELOG_IOBUF_UNREF (cwd->cwd_iobuf); if (local->inode) inode_unref (local->inode); @@ -122,7 +118,8 @@ changelog_write (int fd, char *buffer, size_t len) static int changelog_rollover_changelog (xlator_t *this, - changelog_priv_t *priv, unsigned long ts) + changelog_priv_t *priv, + changelog_rollover_data_t *crd) { int ret = -1; int notify = 0; @@ -135,11 +132,22 @@ changelog_rollover_changelog (xlator_t *this, priv->changelog_fd = -1; } + /** + * no rolling-over of changelogs, policy implementer choose + * to do the heavy-lifting of having distinct changelog name. + * + * NOTE: This implies libgfchangelog would not be notified + (well, we could, but lets not do that now...) 
+ */ + if (!crd->crd_use_suffix) + return 0; + (void) snprintf (ofile, PATH_MAX, - "%s/"CHANGELOG_FILE_NAME, priv->changelog_dir); - (void) snprintf (nfile, PATH_MAX, - "%s/"CHANGELOG_FILE_NAME".%lu", - priv->changelog_dir, ts); + "%s/%s", priv->changelog_dir, + crd->crd_changelog_oname); + (void) snprintf (nfile, PATH_MAX, "%s/%s.%lu", + priv->changelog_dir, + crd->crd_changelog_name, crd->crd_roll_key); ret = rename (ofile, nfile); if (!ret) @@ -171,7 +179,8 @@ changelog_rollover_changelog (xlator_t *this, int changelog_open (xlator_t *this, - changelog_priv_t *priv) + changelog_priv_t *priv, + changelog_local_t *local, changelog_rollover_data_t *crd) { int fd = 0; int ret = -1; @@ -180,12 +189,12 @@ changelog_open (xlator_t *this, char changelog_path[PATH_MAX] = {0,}; (void) snprintf (changelog_path, PATH_MAX, - "%s/"CHANGELOG_FILE_NAME, - priv->changelog_dir); + "%s/%s", priv->changelog_dir, + crd->crd_changelog_name); flags |= (O_CREAT | O_RDWR); if (priv->fsync_interval == 0) - flags |= O_SYNC; + flags |= O_SYNC; fd = open (changelog_path, flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); @@ -198,12 +207,25 @@ changelog_open (xlator_t *this, } priv->changelog_fd = fd; + CHANGELOG_INVOKE_CFOP (this, priv, reset_offset, local); + + /* preallocate if required */ + if (crd->crd_prealloc_size > 0) { + ret = posix_fallocate (priv->changelog_fd, + 0, crd->crd_prealloc_size); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "failed to preallocate %llu bytes", + (unsigned long long) crd->crd_prealloc_size); + } + } (void) snprintf (buffer, 1024, CHANGELOG_HEADER, CHANGELOG_VERSION_MAJOR, CHANGELOG_VERSION_MINOR, - priv->ce->encoder); - ret = changelog_write_change (priv, buffer, strlen (buffer)); + priv->encode_mode); + ret = changelog_write_change (this, priv, + local, buffer, strlen (buffer)); if (ret) { close (priv->changelog_fd); priv->changelog_fd = -1; @@ -216,18 +238,19 @@ changelog_open (xlator_t *this, return ret; } -int +static int changelog_start_next_change 
(xlator_t *this, changelog_priv_t *priv, - unsigned long ts, gf_boolean_t finale) + changelog_local_t *local, + changelog_log_data_t *cld) { - int ret = -1; - - ret = changelog_rollover_changelog (this, priv, ts); + int ret = 0; + changelog_rollover_data_t *crd = &cld->cld_roll; - if (!ret && !finale) - ret = changelog_open (this, priv); + ret = changelog_rollover_changelog (this, priv, crd); + if (!ret && !crd->crd_finale) + ret = changelog_open (this, priv, local, crd); return ret; } @@ -241,37 +264,42 @@ changelog_entry_length () } int -changelog_fill_rollover_data (changelog_log_data_t *cld, gf_boolean_t is_last) +changelog_write_change (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local, char *buffer, size_t len) { - struct timeval tv = {0,}; + int ret = -1; + off_t offset = 0; + ssize_t size = 0; + size_t writen = 0; - cld->cld_type = CHANGELOG_TYPE_ROLLOVER; + offset = CHANGELOG_INVOKE_CFOP (this, priv, get_offset, local); - if (gettimeofday (&tv, NULL)) - return -1; + while (writen < len) { + size = pwrite (priv->changelog_fd, + buffer + writen, len - writen, offset + writen); + if (size <= 0) + break; - cld->cld_roll_time = (unsigned long) tv.tv_sec; - cld->cld_finale = is_last; - return 0; -} + writen += size; + } -int -changelog_write_change (changelog_priv_t *priv, char *buffer, size_t len) -{ - return changelog_write (priv->changelog_fd, buffer, len); + if (writen == len) { + ret = 0; + CHANGELOG_INVOKE_CFOP (this, priv, set_offset, local, writen); + } + + return ret; } inline int changelog_handle_change (xlator_t *this, - changelog_priv_t *priv, changelog_log_data_t *cld) + changelog_priv_t *priv, + changelog_local_t *local, changelog_log_data_t *cld) { int ret = 0; if (CHANGELOG_TYPE_IS_ROLLOVER (cld->cld_type)) { - changelog_encode_change(priv); - ret = changelog_start_next_change (this, priv, - cld->cld_roll_time, - cld->cld_finale); + ret = changelog_start_next_change (this, priv, local, cld); if (ret) gf_log (this->name, 
GF_LOG_ERROR, "Problem rolling over changelog(s)"); @@ -295,7 +323,7 @@ changelog_handle_change (xlator_t *this, goto out; } - ret = priv->ce->encode (this, cld); + ret = priv->ce->encode (this, local, cld); if (ret) { gf_log (this->name, GF_LOG_ERROR, "error writing changelog to disk"); @@ -305,6 +333,17 @@ changelog_handle_change (xlator_t *this, return ret; } +static inline void +changelog_local_init_defaults (changelog_local_t *local, + uuid_t gfid, struct iobuf *iobuf) +{ + changelog_write_data_t *cwd = &(local->cld.cld_wdata); + + uuid_copy (cwd->cwd_gfid, gfid); + cwd->cwd_iobuf = iobuf; + cwd->cwd_xtra_records = 0; /* set by the caller */ +} + changelog_local_t * changelog_local_init (xlator_t *this, inode_t *inode, uuid_t gfid, int xtra_records, @@ -314,7 +353,7 @@ changelog_local_init (xlator_t *this, inode_t *inode, struct iobuf *iobuf = NULL; /** - * We relax the presence of inode if @update_flag is true. + * Relax the presence of inode if @update_flag is true. * The caller (implmentation of the fop) needs to be careful to * not blindly use local->inode. 
*/ @@ -339,10 +378,7 @@ changelog_local_init (xlator_t *this, inode_t *inode, local->update_no_check = update_flag; - uuid_copy (local->cld.cld_gfid, gfid); - - local->cld.cld_iobuf = iobuf; - local->cld.cld_xtra_records = 0; /* set by the caller */ + (void) changelog_local_init_defaults (local, gfid, iobuf); if (inode) local->inode = inode_ref (inode); @@ -370,9 +406,11 @@ changelog_forget (xlator_t *this, inode_t *inode) int changelog_inject_single_event (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local, changelog_log_data_t *cld) { - return priv->cd.dispatchfn (this, priv, priv->cd.cd_data, cld, NULL); + return priv->cd.dispatchfn (this, priv, + priv->cd.cd_data, local, cld); } /** @@ -383,9 +421,9 @@ void * changelog_rollover (void *data) { int ret = 0; + char *cname = NULL; xlator_t *this = NULL; struct timeval tv = {0,}; - changelog_log_data_t cld = {0,}; changelog_time_slice_t *slice = NULL; changelog_priv_t *priv = data; @@ -400,16 +438,11 @@ changelog_rollover (void *data) if (ret) continue; - ret = changelog_fill_rollover_data (&cld, _gf_false); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "failed to fill rollover data"); - continue; - } - LOCK (&priv->lock); { - ret = changelog_inject_single_event (this, priv, &cld); + cname = CHANGELOG_FNAME_FROM_POLICY (priv->cp); + ret = CHANGELOG_INVOKE_CFOP (this, priv, rollover, + cname, _gf_false); if (!ret) SLICE_VERSION_UPDATE (slice); } @@ -425,11 +458,9 @@ changelog_fsync_thread (void *data) int ret = 0; xlator_t *this = NULL; struct timeval tv = {0,}; - changelog_log_data_t cld = {0,}; changelog_priv_t *priv = data; this = priv->cf.this; - cld.cld_type = CHANGELOG_TYPE_FSYNC; while (1) { tv.tv_sec = priv->fsync_interval; @@ -439,7 +470,7 @@ changelog_fsync_thread (void *data) if (ret) continue; - ret = changelog_inject_single_event (this, priv, &cld); + ret = CHANGELOG_INVOKE_CFOP (this, priv, sync); if (ret) gf_log (this->name, GF_LOG_ERROR, "failed to inject fsync event"); @@ -637,19 
+668,19 @@ changelog_inode_ctx_get (xlator_t *this, * signifies an update was recorded in the current time slice). */ inline void -changelog_update (xlator_t *this, changelog_priv_t *priv, - changelog_local_t *local, changelog_log_type type) +changelog_update (xlator_t *this, + changelog_priv_t *priv, + changelog_local_t *local, + changelog_log_type type) { - int ret = 0; - unsigned long *iver = NULL; - unsigned long version = 0; - inode_t *inode = NULL; - changelog_time_slice_t *slice = NULL; - changelog_inode_ctx_t *ctx = NULL; - changelog_log_data_t *cld_0 = NULL; - changelog_log_data_t *cld_1 = NULL; - changelog_local_t *next_local = NULL; - gf_boolean_t need_upd = _gf_true; + int ret = 0; + unsigned long *iver = NULL; + unsigned long version = 0; + inode_t *inode = NULL; + changelog_time_slice_t *slice = NULL; + changelog_inode_ctx_t *ctx = NULL; + changelog_log_data_t *cld_0 = NULL; + gf_boolean_t need_upd = _gf_true; slice = &priv->slice; @@ -673,13 +704,8 @@ changelog_update (xlator_t *this, changelog_priv_t *priv, cld_0 = &local->cld; cld_0->cld_type = type; - if ( (next_local = local->prev_entry) != NULL ) { - cld_1 = &next_local->cld; - cld_1->cld_type = type; - } - ret = priv->cd.dispatchfn (this, priv, - priv->cd.cd_data, cld_0, cld_1); + priv->cd.cd_data, local, cld_0); /** * update after the dispatcher has successfully done diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h index ad79636b0..656fb7ffa 100644 --- a/xlators/features/changelog/src/changelog-helpers.h +++ b/xlators/features/changelog/src/changelog-helpers.h @@ -19,23 +19,15 @@ #include "changelog-misc.h" /** - * the changelog entry + * structures representing the changelog entries */ -typedef struct changelog_log_data { - /* rollover related */ - unsigned long cld_roll_time; - - /* reopen changelog? 
*/ - gf_boolean_t cld_finale; - - changelog_log_type cld_type; - +typedef struct changelog_write_data { /** * sincd gfid is _always_ a necessity, it's not a part * of the iobuf. by doing this we do not add any overhead * for data and metadata related fops. */ - uuid_t cld_gfid; + uuid_t cwd_gfid; /** * iobufs are used for optionals records: pargfid, path, @@ -43,25 +35,78 @@ typedef struct changelog_log_data { * to allocate (iobuf_get() in the fop) and get unref'ed * in the callback (CHANGELOG_STACK_UNWIND). */ - struct iobuf *cld_iobuf; - -#define cld_ptr cld_iobuf->ptr + struct iobuf *cwd_iobuf; /** * after allocation you can point this to the length of * usable data, but make sure it does not exceed the * the size of the requested iobuf. */ - size_t cld_iobuf_len; - -#define cld_ptr_len cld_iobuf_len + size_t cwd_iobuf_len; + #define cwd_ptr cwd_iobuf->ptr + #define cwd_ptr_len cwd_iobuf_len /** * number of optional records */ - int cld_xtra_records; + int cwd_xtra_records; +} changelog_write_data_t; + +typedef struct changelog_rollover_data { + /** + * need a changelog reopen? + */ + gf_boolean_t crd_finale; + + /** + * changelog file name to be opened after a rollover + */ + char crd_changelog_name[PATH_MAX]; + + /** + * changelog file name before rollover + */ + char crd_changelog_oname[PATH_MAX]; + + /** + * use @crd_roll_key as suffix during roll-over + */ + gf_boolean_t crd_use_suffix; + + /** + * suffix used when rolling a changelog + */ + unsigned long crd_roll_key; + + /** + * preallocation? if yes, how much? + */ + off_t crd_prealloc_size; +} changelog_rollover_data_t; + +/** + * the changelog entry: structure representing the type of entry + * and a union encapsulating the above declared structures. + */ +typedef struct changelog_log_data { + /** + * type of the log data entry + */ + changelog_log_type cld_type; + + /** + * union for the type of changelog operations. 
@fsync() does + * not have a corresponding entry in this union as it just + * performs an @fsync() on ->changelog_fd. + */ + union { + changelog_write_data_t cld_wdata; + changelog_rollover_data_t cld_roll; + }; } changelog_log_data_t; +typedef struct changelog_local changelog_local_t; + /** * holder for dispatch function and private data */ @@ -70,8 +115,9 @@ typedef struct changelog_priv changelog_priv_t; typedef struct changelog_dispatcher { void *cd_data; - int (*dispatchfn) (xlator_t *, changelog_priv_t *, void *, - changelog_log_data_t *, changelog_log_data_t *); + int (*dispatchfn) (xlator_t *, + changelog_priv_t *, void *, + changelog_local_t *, changelog_log_data_t *); } changelog_dispatcher_t; struct changelog_bootstrap { @@ -82,9 +128,84 @@ struct changelog_bootstrap { struct changelog_encoder { changelog_encoder_t encoder; - int (*encode) (xlator_t *, changelog_log_data_t *); + int (*encode) (xlator_t *, + changelog_local_t *, changelog_log_data_t *); }; +struct changelog_ops { + /* changelog open */ + int (*open) (xlator_t *, changelog_priv_t *, + void *, char *, gf_boolean_t); + + /* changelog close */ + int (*close) (xlator_t *, changelog_priv_t *, void *); + + /* changelog rollover */ + int (*rollover) (xlator_t *, + changelog_priv_t *, + void *, char *, gf_boolean_t); + + int (*sync) (xlator_t *, changelog_priv_t *, void *); + + /* changelog write */ + int (*write) (xlator_t *, + changelog_priv_t *, void *, + changelog_local_t *, changelog_log_type); + + /* changelog read */ + int (*read) (xlator_t *, + changelog_priv_t *, void *, char *); + + int (*unlink) (xlator_t *, + changelog_priv_t *, void *, char *); + + /* {get|set} offset */ + off_t (*get_offset) (xlator_t *this, + changelog_priv_t *, void *, changelog_local_t *); + + void (*set_offset) (xlator_t *this, + changelog_priv_t *, void *, + changelog_local_t *, off_t); + + void (*reset_offset) (xlator_t *this, changelog_priv_t *, + void *, changelog_local_t *); +}; + +/** + * This structure is
_filled_ by the policy init (@init_policy) routine. + * Default @fops and @cops are passed to the init routine, which can + * choose to override the file operation or changelog operation behaviour. + * Just by _replacing_ the function pointers, a policy can change its + * file and changelog operation behaviour. Kind of inheritance... + */ +struct changelog_logpolicy { + /* current changelog name */ + char changelog_name[PATH_MAX]; + + /* private data */ + void *cpriv; + + /* file ops for the policy */ + struct xlator_fops *fops; + + /* changelog operations for the policy */ + struct changelog_ops *cops; + + /* current active policy */ + changelog_log_policy_t policy; + + int (*init_policy) (xlator_t *, + changelog_priv_t *priv, + struct changelog_logpolicy *); + int (*fini_policy) (xlator_t *, struct changelog_logpolicy *); +}; + +#define CHANGELOG_FNAME_FROM_POLICY(c) c->changelog_name + +#define CHANGELOG_INVOKE_FOP(priv,fop,...) priv->cp->fops->fop (__VA_ARGS__) + +#define CHANGELOG_INVOKE_CFOP(this,priv,fop,...) \ + priv->cp->cops->fop (this, priv, priv->cp->cpriv, ##__VA_ARGS__) /* xlator private */ @@ -142,6 +263,11 @@ typedef struct changelog_notify { struct changelog_priv { gf_boolean_t active; + /** + * write the record header? + */ + gf_boolean_t no_gfid_hdr; + /* to generate unique socket file per brick */ char *changelog_brick; @@ -191,24 +317,43 @@ struct changelog_priv { /* encoder */ struct changelog_encoder *ce; + + /* logging policy */ + changelog_log_policy_t policy; + + /* policy logger */ + struct changelog_logpolicy *cp; + + /* current NSR term */ + uint32_t term; }; struct changelog_local { inode_t *inode; + + /** + * fops that do not need inode version checks + */ gf_boolean_t update_no_check; + /** + * the log data entry + */ changelog_log_data_t cld; /** - * ->prev_entry is used in cases when there needs to be - * additional changelog entry for the parent (eg.
rename) - * It's analogous to ->next in single linked list world, - * but we call it as ->prev_entry... ha ha ha + * number of bytes written: used for continuation */ - struct changelog_local *prev_entry; -}; + off_t nr_bytes; -typedef struct changelog_local changelog_local_t; + /** + * temporary scratch pads + */ + union { + void *ptr; + unsigned long val; + } lu; +}; /* inode version is stored in inode ctx */ typedef struct changelog_inode_ctx { @@ -224,6 +369,8 @@ typedef struct changelog_inode_ctx { */ typedef enum { CHANGELOG_OPT_REC_FOP, + CHANGELOG_OPT_REC_ULL, + CHANGELOG_OPT_REC_UUID, CHANGELOG_OPT_REC_ENTRY, } changelog_optional_rec_type_t; @@ -253,7 +400,9 @@ typedef struct { size_t co_len; union { - glusterfs_fop_t co_fop; + uuid_t co_uuid; + glusterfs_fop_t co_fop; + unsigned long long co_number; struct changelog_entry_fields co_entry; }; } changelog_opt_t; @@ -277,29 +426,26 @@ changelog_local_t * changelog_local_init (xlator_t *this, inode_t *inode, uuid_t gfid, int xtra_records, gf_boolean_t update_flag); int -changelog_start_next_change (xlator_t *this, - changelog_priv_t *priv, - unsigned long ts, gf_boolean_t finale); -int -changelog_open (xlator_t *this, changelog_priv_t *priv); -int -changelog_fill_rollover_data (changelog_log_data_t *cld, gf_boolean_t is_last); -int changelog_inject_single_event (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local, changelog_log_data_t *cld); inline size_t changelog_entry_length (); inline int changelog_write (int fd, char *buffer, size_t len); int -changelog_write_change (changelog_priv_t *priv, char *buffer, size_t len); +changelog_write_change (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local, char *buffer, size_t len); inline int changelog_handle_change (xlator_t *this, - changelog_priv_t *priv, changelog_log_data_t *cld); + changelog_priv_t *priv, + changelog_local_t *local, changelog_log_data_t *cld); inline void -changelog_update (xlator_t *this, changelog_priv_t *priv, 
- changelog_local_t *local, changelog_log_type type); +changelog_update (xlator_t *this, + changelog_priv_t *priv, + changelog_local_t *local, + changelog_log_type type); void * changelog_rollover (void *data); void * @@ -319,9 +465,6 @@ changelog_forget (xlator_t *this, inode_t *inode); } \ STACK_UNWIND_STRICT (fop, frame, params); \ changelog_local_cleanup (__xl, __local); \ - if (__local && __local->prev_entry) \ - changelog_local_cleanup (__xl, \ - __local->prev_entry); \ } while (0) #define CHANGELOG_IOBUF_REF(iobuf) do { \ @@ -346,12 +489,12 @@ changelog_forget (xlator_t *this, inode_t *inode); } \ } while (0) -#define CHANGLOG_FILL_FOP_NUMBER(co, fop, converter, xlen) do { \ - co->co_convert = converter; \ - co->co_free = NULL; \ - co->co_type = CHANGELOG_OPT_REC_FOP; \ - co->co_fop = fop; \ - xlen += sizeof (fop); \ +#define CHANGELOG_FILL_FOP_NUMBER(co, fop, converter, xlen) do { \ + co->co_convert = converter; \ + co->co_free = NULL; \ + co->co_type = CHANGELOG_OPT_REC_FOP; \ + co->co_fop = fop; \ + xlen += sizeof (fop); \ } while (0) #define CHANGELOG_FILL_ENTRY(co, pargfid, bname, \ @@ -392,4 +535,7 @@ changelog_forget (xlator_t *this, inode_t *inode); goto label; \ } while (0) +int +changelog_open (xlator_t *this, changelog_priv_t *priv, changelog_local_t *local, changelog_rollover_data_t *crd); + #endif /* _CHANGELOG_HELPERS_H */ diff --git a/xlators/features/changelog/src/changelog-mem-types.h b/xlators/features/changelog/src/changelog-mem-types.h index d72464eab..a65bbb4f2 100644 --- a/xlators/features/changelog/src/changelog-mem-types.h +++ b/xlators/features/changelog/src/changelog-mem-types.h @@ -19,10 +19,11 @@ enum gf_changelog_mem_types { gf_changelog_mt_batch_t = gf_common_mt_end + 3, gf_changelog_mt_rt_t = gf_common_mt_end + 4, gf_changelog_mt_inode_ctx_t = gf_common_mt_end + 5, - gf_changelog_mt_libgfchangelog_t = gf_common_mt_end + 6, - gf_changelog_mt_libgfchangelog_rl_t = gf_common_mt_end + 7, - gf_changelog_mt_libgfchangelog_dirent_t = 
gf_common_mt_end + 8, - gf_changelog_mt_changelog_buffer_t = gf_common_mt_end + 9, + gf_changelog_mt_fop_policy_t = gf_common_mt_end + 6, + gf_changelog_mt_libgfchangelog_t = gf_common_mt_end + 7, + gf_changelog_mt_libgfchangelog_rl_t = gf_common_mt_end + 8, + gf_changelog_mt_libgfchangelog_dirent_t = gf_common_mt_end + 9, + gf_changelog_mt_changelog_buffer_t = gf_common_mt_end + 10, gf_changelog_mt_end }; diff --git a/xlators/features/changelog/src/changelog-misc.h b/xlators/features/changelog/src/changelog-misc.h index 0712a3771..58bd3279d 100644 --- a/xlators/features/changelog/src/changelog-misc.h +++ b/xlators/features/changelog/src/changelog-misc.h @@ -65,7 +65,7 @@ } while (0) /** - * everything after 'CHANGELOG_TYPE_ENTRY' are internal types + * everything after @CHANGELOG_TYPE_ENTRY are internal types * (ie. none of the fops trigger this type of event), hence * CHANGELOG_MAX_TYPE = 3 */ @@ -91,6 +91,12 @@ typedef enum { CHANGELOG_ENCODE_MAX, } changelog_encoder_t; +/* logging policies */ +typedef enum { + CHANGELOG_LOG_POLICY_DEFAULT = 0, + CHANGELOG_LOG_POLICY_REPLICATE, +} changelog_log_policy_t; + #define CHANGELOG_VALID_ENCODING(enc) \ (enc > CHANGELOG_ENCODE_MIN && enc < CHANGELOG_ENCODE_MAX) diff --git a/xlators/features/changelog/src/changelog-rt.c b/xlators/features/changelog/src/changelog-rt.c index c147f68ca..4e801ae85 100644 --- a/xlators/features/changelog/src/changelog-rt.c +++ b/xlators/features/changelog/src/changelog-rt.c @@ -52,8 +52,9 @@ changelog_rt_fini (xlator_t *this, changelog_dispatcher_t *cd) } int -changelog_rt_enqueue (xlator_t *this, changelog_priv_t *priv, void *cbatch, - changelog_log_data_t *cld_0, changelog_log_data_t *cld_1) +changelog_rt_enqueue (xlator_t *this, + changelog_priv_t *priv, void *cbatch, + changelog_local_t *local, changelog_log_data_t *cld_0) { int ret = 0; changelog_rt_t *crt = NULL; @@ -62,9 +63,7 @@ changelog_rt_enqueue (xlator_t *this, changelog_priv_t *priv, void *cbatch, LOCK (&crt->lock); { - ret = 
changelog_handle_change (this, priv, cld_0); - if (!ret && cld_1) - ret = changelog_handle_change (this, priv, cld_1); + ret = changelog_handle_change (this, priv, local, cld_0); } UNLOCK (&crt->lock); diff --git a/xlators/features/changelog/src/changelog-rt.h b/xlators/features/changelog/src/changelog-rt.h index 1fc2bbc5b..09398041d 100644 --- a/xlators/features/changelog/src/changelog-rt.h +++ b/xlators/features/changelog/src/changelog-rt.h @@ -27,7 +27,8 @@ changelog_rt_init (xlator_t *this, changelog_dispatcher_t *cd); int changelog_rt_fini (xlator_t *this, changelog_dispatcher_t *cd); int -changelog_rt_enqueue (xlator_t *this, changelog_priv_t *priv, void *cbatch, - changelog_log_data_t *cld_0, changelog_log_data_t *cld_1); +changelog_rt_enqueue (xlator_t *this, + changelog_priv_t *priv, void *cbatch, + changelog_local_t *local, changelog_log_data_t *cld_0); #endif /* _CHANGELOG_RT_H */ diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c index cea0e8c70..2e01161a9 100644 --- a/xlators/features/changelog/src/changelog.c +++ b/xlators/features/changelog/src/changelog.c @@ -17,15 +17,15 @@ #include "defaults.h" #include "logging.h" #include "iobuf.h" +#include #include "changelog-rt.h" - +#include "changelog-notifier.h" #include "changelog-encoders.h" #include "changelog-mem-types.h" -#include - -#include "changelog-notifier.h" +#include "changelog-fops.h" +#include "changelog-policy.h" static struct changelog_bootstrap cb_bootstrap[] = { @@ -36,11 +36,41 @@ cb_bootstrap[] = { }, }; -/* Entry operations - TYPE III */ +static struct changelog_encoder +cb_encoder[] = { + [CHANGELOG_ENCODE_BINARY] = + { + .encoder = CHANGELOG_ENCODE_BINARY, + .encode = changelog_encode_binary, + }, + [CHANGELOG_ENCODE_ASCII] = + { + .encoder = CHANGELOG_ENCODE_ASCII, + .encode = changelog_encode_ascii, + }, +}; -/** - * entry operations do not undergo inode version checking. 
- */ +static struct changelog_logpolicy +cb_policy[] = { + [CHANGELOG_LOG_POLICY_DEFAULT] = + { + .fops = NULL, + .cops = NULL, + .policy = CHANGELOG_LOG_POLICY_DEFAULT, + .init_policy = changelog_default_policy_init, + .fini_policy = changelog_default_policy_fini, + }, + [CHANGELOG_LOG_POLICY_REPLICATE] = + { + .fops = NULL, + .cops = NULL, + .policy = CHANGELOG_LOG_POLICY_REPLICATE, + .init_policy = changelog_replication_policy_init, + .fini_policy = changelog_replication_policy_fini, + }, +}; + +/* Entry operations - TYPE III */ /* {{{ */ @@ -59,7 +89,8 @@ changelog_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); - changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + CHANGELOG_INVOKE_CFOP (this, priv, + write, local, CHANGELOG_TYPE_ENTRY); unwind: CHANGELOG_STACK_UNWIND (rmdir, frame, op_ret, op_errno, @@ -71,27 +102,12 @@ int32_t changelog_rmdir (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags, dict_t *xdata) { - size_t xtra_len = 0; - changelog_priv_t *priv = NULL; - changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); - CHANGELOG_INIT_NOCHECK (this, frame->local, - NULL, loc->inode->gfid, 2); - - co = changelog_get_usable_buffer (frame->local); - if (!co) - goto wind; - - CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); - - co++; - CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, - entry_fn, entry_free_fn, xtra_len, wind); - - changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + CHANGELOG_INVOKE_FOP (priv, rmdir, frame, this, loc, xflags, xdata); wind: STACK_WIND (frame, changelog_rmdir_cbk, @@ -115,7 +131,8 @@ changelog_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); - changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + CHANGELOG_INVOKE_CFOP (this, priv, + 
write, local, CHANGELOG_TYPE_ENTRY); unwind: CHANGELOG_STACK_UNWIND (unlink, frame, op_ret, op_errno, @@ -127,27 +144,13 @@ int32_t changelog_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags, dict_t *xdata) { - size_t xtra_len = 0; - changelog_priv_t *priv = NULL; - changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_IF_INTERNAL_FOP_THEN_GOTO (xdata, wind); - CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, loc->inode->gfid, 2); - - co = changelog_get_usable_buffer (frame->local); - if (!co) - goto wind; - - CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); - - co++; - CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, - entry_fn, entry_free_fn, xtra_len, wind); - - changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + CHANGELOG_INVOKE_FOP (priv, unlink, frame, this, loc, xflags, xdata); wind: STACK_WIND (frame, changelog_unlink_cbk, @@ -174,7 +177,8 @@ changelog_rename_cbk (call_frame_t *frame, CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); - changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + CHANGELOG_INVOKE_CFOP (this, priv, + write, local, CHANGELOG_TYPE_ENTRY); unwind: CHANGELOG_STACK_UNWIND (rename, frame, op_ret, op_errno, @@ -188,32 +192,12 @@ int32_t changelog_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, dict_t *xdata) { - size_t xtra_len = 0; - changelog_priv_t *priv = NULL; - changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); - /* 3 == fop + oldloc + newloc */ - CHANGELOG_INIT_NOCHECK (this, frame->local, - NULL, oldloc->inode->gfid, 3); - - co = changelog_get_usable_buffer (frame->local); - if (!co) - goto wind; - - CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); - - co++; - CHANGELOG_FILL_ENTRY (co, oldloc->pargfid, oldloc->name, - entry_fn, 
entry_free_fn, xtra_len, wind); - - co++; - CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name, - entry_fn, entry_free_fn, xtra_len, wind); - - changelog_set_usable_record_and_length (frame->local, xtra_len, 3); + CHANGELOG_INVOKE_FOP (priv, rename, frame, this, oldloc, newloc, xdata); wind: STACK_WIND (frame, changelog_rename_cbk, @@ -239,7 +223,8 @@ changelog_link_cbk (call_frame_t *frame, CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); - changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + CHANGELOG_INVOKE_CFOP (this, priv, + write, local, CHANGELOG_TYPE_ENTRY); unwind: CHANGELOG_STACK_UNWIND (link, frame, op_ret, op_errno, @@ -252,28 +237,14 @@ changelog_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, dict_t *xdata) { - size_t xtra_len = 0; - changelog_priv_t *priv = NULL; - changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_IF_INTERNAL_FOP_THEN_GOTO (xdata, wind); - CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, oldloc->gfid, 2); - - co = changelog_get_usable_buffer (frame->local); - if (!co) - goto wind; - - CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); - - co++; - CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name, - entry_fn, entry_free_fn, xtra_len, wind); - - changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + CHANGELOG_INVOKE_FOP (priv, link, frame, this, oldloc, newloc, xdata); wind: STACK_WIND (frame, changelog_link_cbk, @@ -299,7 +270,8 @@ changelog_mkdir_cbk (call_frame_t *frame, CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); - changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + CHANGELOG_INVOKE_CFOP (this, priv, + write, local, CHANGELOG_TYPE_ENTRY); unwind: CHANGELOG_STACK_UNWIND (mkdir, frame, op_ret, op_errno, @@ -311,37 +283,13 @@ int32_t changelog_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, mode_t umask, 
dict_t *xdata) { - int ret = -1; - uuid_t gfid = {0,}; - void *uuid_req = NULL; - size_t xtra_len = 0; - changelog_priv_t *priv = NULL; - changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); - ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); - if (ret) { - gf_log (this->name, GF_LOG_DEBUG, - "failed to get gfid from dict"); - goto wind; - } - uuid_copy (gfid, uuid_req); - - CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, gfid, 2); - - co = changelog_get_usable_buffer (frame->local); - if (!co) - goto wind; - - CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); - - co++; - CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, - entry_fn, entry_free_fn, xtra_len, wind); - - changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + CHANGELOG_INVOKE_FOP (priv, mkdir, frame, this, + loc, mode, umask, xdata); wind: STACK_WIND (frame, changelog_mkdir_cbk, @@ -367,7 +315,8 @@ changelog_symlink_cbk (call_frame_t *frame, CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); - changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + CHANGELOG_INVOKE_CFOP (this, priv, + write, local, CHANGELOG_TYPE_ENTRY); unwind: CHANGELOG_STACK_UNWIND (symlink, frame, op_ret, op_errno, @@ -380,37 +329,13 @@ changelog_symlink (call_frame_t *frame, xlator_t *this, const char *linkname, loc_t *loc, mode_t umask, dict_t *xdata) { - int ret = -1; - size_t xtra_len = 0; - uuid_t gfid = {0,}; - void *uuid_req = NULL; - changelog_priv_t *priv = NULL; - changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); - ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); - if (ret) { - gf_log (this->name, GF_LOG_DEBUG, - "failed to get gfid from dict"); - goto wind; - } - uuid_copy (gfid, uuid_req); - - CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, gfid, 2); - - co = changelog_get_usable_buffer 
(frame->local); - if (!co) - goto wind; - - CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); - - co++; - CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, - entry_fn, entry_free_fn, xtra_len, wind); - - changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + CHANGELOG_INVOKE_FOP (priv, symlink, frame, this, + linkname, loc, umask, xdata); wind: STACK_WIND (frame, changelog_symlink_cbk, @@ -436,7 +361,8 @@ changelog_mknod_cbk (call_frame_t *frame, CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); - changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY); + CHANGELOG_INVOKE_CFOP (this, priv, + write, local, CHANGELOG_TYPE_ENTRY); unwind: CHANGELOG_STACK_UNWIND (mknod, frame, op_ret, op_errno, @@ -449,37 +375,13 @@ changelog_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, dev_t dev, mode_t umask, dict_t *xdata) { - int ret = -1; - uuid_t gfid = {0,}; - void *uuid_req = NULL; - size_t xtra_len = 0; - changelog_priv_t *priv = NULL; - changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); - ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); - if (ret) { - gf_log (this->name, GF_LOG_DEBUG, - "failed to get gfid from dict"); - goto wind; - } - uuid_copy (gfid, uuid_req); - - CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, gfid, 2); - - co = changelog_get_usable_buffer (frame->local); - if (!co) - goto wind; - - CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); - - co++; - CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, - entry_fn, entry_free_fn, xtra_len, wind); - - changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + CHANGELOG_INVOKE_FOP (priv, mknod, frame, this, + loc, mode, dev, umask, xdata); wind: STACK_WIND (frame, changelog_mknod_cbk, @@ -506,7 +408,8 @@ changelog_create_cbk (call_frame_t *frame, CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); - changelog_update (this, 
priv, local, CHANGELOG_TYPE_ENTRY); + CHANGELOG_INVOKE_CFOP (this, priv, + write, local, CHANGELOG_TYPE_ENTRY); unwind: CHANGELOG_STACK_UNWIND (create, frame, @@ -520,40 +423,13 @@ changelog_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata) { - int ret = -1; - uuid_t gfid = {0,}; - void *uuid_req = NULL; - changelog_opt_t *co = NULL; - changelog_priv_t *priv = NULL; - size_t xtra_len = 0; + changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); - ret = dict_get_ptr (xdata, "gfid-req", &uuid_req); - if (ret) { - gf_log (this->name, GF_LOG_DEBUG, - "failed to get gfid from dict"); - goto wind; - } - uuid_copy (gfid, uuid_req); - - /* init with two extra records */ - CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, gfid, 2); - if (!frame->local) - goto wind; - - co = changelog_get_usable_buffer (frame->local); - if (!co) - goto wind; - - CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len); - - co++; - CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name, - entry_fn, entry_free_fn, xtra_len, wind); - - changelog_set_usable_record_and_length (frame->local, xtra_len, 2); + CHANGELOG_INVOKE_FOP (priv, create, frame, this, loc, + flags, mode, umask, fd, xdata); wind: STACK_WIND (frame, changelog_create_cbk, @@ -585,7 +461,8 @@ changelog_fsetattr_cbk (call_frame_t *frame, CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); - changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + CHANGELOG_INVOKE_CFOP (this, priv, + write, local, CHANGELOG_TYPE_METADATA); unwind: CHANGELOG_STACK_UNWIND (fsetattr, frame, op_ret, op_errno, @@ -606,8 +483,8 @@ changelog_fsetattr (call_frame_t *frame, priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); - CHANGELOG_INIT (this, frame->local, - fd->inode, fd->inode->gfid, 0); + CHANGELOG_INVOKE_FOP (priv, fsetattr, + frame, this, fd, stbuf, valid, xdata); wind: STACK_WIND 
(frame, changelog_fsetattr_cbk, @@ -632,7 +509,8 @@ changelog_setattr_cbk (call_frame_t *frame, CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); - changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + CHANGELOG_INVOKE_CFOP (this, priv, + write, local, CHANGELOG_TYPE_METADATA); unwind: CHANGELOG_STACK_UNWIND (setattr, frame, op_ret, op_errno, @@ -651,8 +529,8 @@ changelog_setattr (call_frame_t *frame, priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); - CHANGELOG_INIT (this, frame->local, - loc->inode, loc->inode->gfid, 0); + CHANGELOG_INVOKE_FOP (priv, setattr, + frame, this, loc, stbuf, valid, xdata); wind: STACK_WIND (frame, changelog_setattr_cbk, @@ -676,7 +554,8 @@ changelog_fremovexattr_cbk (call_frame_t *frame, CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); - changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + CHANGELOG_INVOKE_CFOP (this, priv, + write, local, CHANGELOG_TYPE_METADATA); unwind: CHANGELOG_STACK_UNWIND (fremovexattr, frame, op_ret, op_errno, xdata); @@ -693,8 +572,8 @@ changelog_fremovexattr (call_frame_t *frame, xlator_t *this, priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); - CHANGELOG_INIT (this, frame->local, - fd->inode, fd->inode->gfid, 0); + CHANGELOG_INVOKE_FOP (priv, fremovexattr, + frame, this, fd, name, xdata); wind: STACK_WIND (frame, changelog_fremovexattr_cbk, @@ -716,7 +595,8 @@ changelog_removexattr_cbk (call_frame_t *frame, CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); - changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + CHANGELOG_INVOKE_CFOP (this, priv, + write, local, CHANGELOG_TYPE_METADATA); unwind: CHANGELOG_STACK_UNWIND (removexattr, frame, op_ret, op_errno, xdata); @@ -733,8 +613,7 @@ changelog_removexattr (call_frame_t *frame, xlator_t *this, priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); - CHANGELOG_INIT (this, frame->local, - loc->inode, loc->inode->gfid, 0); + 
CHANGELOG_INVOKE_FOP (priv, removexattr, frame, this, loc, name, xdata); wind: STACK_WIND (frame, changelog_removexattr_cbk, @@ -758,7 +637,8 @@ changelog_setxattr_cbk (call_frame_t *frame, CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); - changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + CHANGELOG_INVOKE_CFOP (this, priv, + write, local, CHANGELOG_TYPE_METADATA); unwind: CHANGELOG_STACK_UNWIND (setxattr, frame, op_ret, op_errno, xdata); @@ -776,8 +656,8 @@ changelog_setxattr (call_frame_t *frame, priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); - CHANGELOG_INIT (this, frame->local, - loc->inode, loc->inode->gfid, 0); + CHANGELOG_INVOKE_FOP (priv, setxattr, + frame, this, loc, dict, flags, xdata); wind: STACK_WIND (frame, changelog_setxattr_cbk, @@ -799,7 +679,8 @@ changelog_fsetxattr_cbk (call_frame_t *frame, CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); - changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA); + CHANGELOG_INVOKE_CFOP (this, priv, + write, local, CHANGELOG_TYPE_METADATA); unwind: CHANGELOG_STACK_UNWIND (fsetxattr, frame, op_ret, op_errno, xdata); @@ -817,8 +698,8 @@ changelog_fsetxattr (call_frame_t *frame, priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); - CHANGELOG_INIT (this, frame->local, - fd->inode, fd->inode->gfid, 0); + CHANGELOG_INVOKE_FOP (priv, fsetxattr, + frame, this, fd, dict, flags, xdata); wind: STACK_WIND (frame, changelog_fsetxattr_cbk, @@ -850,7 +731,7 @@ changelog_truncate_cbk (call_frame_t *frame, CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); - changelog_update (this, priv, local, CHANGELOG_TYPE_DATA); + CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_DATA); unwind: CHANGELOG_STACK_UNWIND (truncate, frame, @@ -867,8 +748,7 @@ changelog_truncate (call_frame_t *frame, priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); - CHANGELOG_INIT (this, frame->local, - loc->inode, 
loc->inode->gfid, 0); + CHANGELOG_INVOKE_FOP (priv, truncate, frame, this, loc, offset, xdata); wind: STACK_WIND (frame, changelog_truncate_cbk, @@ -891,7 +771,7 @@ changelog_ftruncate_cbk (call_frame_t *frame, CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); - changelog_update (this, priv, local, CHANGELOG_TYPE_DATA); + CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_DATA); unwind: CHANGELOG_STACK_UNWIND (ftruncate, frame, @@ -908,8 +788,7 @@ changelog_ftruncate (call_frame_t *frame, priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); - CHANGELOG_INIT (this, frame->local, - fd->inode, fd->inode->gfid, 0); + CHANGELOG_INVOKE_FOP (priv, ftruncate, frame, this, fd, offset, xdata); wind: STACK_WIND (frame, changelog_ftruncate_cbk, @@ -934,7 +813,7 @@ changelog_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, CHANGELOG_COND_GOTO (priv, ((op_ret <= 0) || !local), unwind); - changelog_update (this, priv, local, CHANGELOG_TYPE_DATA); + CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_DATA); unwind: CHANGELOG_STACK_UNWIND (writev, frame, @@ -951,10 +830,11 @@ changelog_writev (call_frame_t *frame, changelog_priv_t *priv = NULL; priv = this->private; + CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); - CHANGELOG_INIT (this, frame->local, - fd->inode, fd->inode->gfid, 0); + CHANGELOG_INVOKE_FOP (priv, writev, frame, this, fd, + vector, count, offset, flags, iobref, xdata); wind: STACK_WIND (frame, changelog_writev_cbk, FIRST_CHILD (this), @@ -994,6 +874,15 @@ changelog_assign_encoding (changelog_priv_t *priv, char *enc) } } +static void +changelog_assign_policy (changelog_priv_t *priv, char *pol) +{ + if ( strncmp (pol, "default", 7) == 0 ) + priv->policy = CHANGELOG_LOG_POLICY_DEFAULT; + else if ( strncmp (pol, "replication", 11) == 0 ) + priv->policy = CHANGELOG_LOG_POLICY_REPLICATE; +} + /* cleanup any helper threads that are running */ static void changelog_cleanup_helper_threads (xlator_t 
*this, changelog_priv_t *priv) @@ -1016,15 +905,17 @@ changelog_spawn_helper_threads (xlator_t *this, changelog_priv_t *priv) int ret = 0; priv->cr.this = this; - ret = gf_thread_create (&priv->cr.rollover_th, - NULL, changelog_rollover, priv); - if (ret) - goto out; + if (priv->rollover_time) { + ret = pthread_create (&priv->cr.rollover_th, + NULL, changelog_rollover, priv); + if (ret) + goto out; + } if (priv->fsync_interval) { priv->cf.this = this; - ret = gf_thread_create (&priv->cf.fsync_th, - NULL, changelog_fsync_thread, priv); + ret = pthread_create (&priv->cf.fsync_th, + NULL, changelog_fsync_thread, priv); } if (ret) @@ -1088,8 +979,8 @@ changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv) priv->cn.this = this; priv->cn.rfd = pipe_fd[0]; - ret = gf_thread_create (&priv->cn.notify_th, - NULL, changelog_notifier, priv); + ret = pthread_create (&priv->cn.notify_th, + NULL, changelog_notifier, priv); out: return ret; @@ -1117,10 +1008,10 @@ mem_acct_init (xlator_t *this) static int changelog_init (xlator_t *this, changelog_priv_t *priv) { - int i = 0; - int ret = -1; - struct timeval tv = {0,}; - changelog_log_data_t cld = {0,}; + int i = 0; + int ret = -1; + char *cname = NULL; + struct timeval tv = {0,}; ret = gettimeofday (&tv, NULL); if (ret) { @@ -1148,21 +1039,18 @@ changelog_init (xlator_t *this, changelog_priv_t *priv) if (ret) goto out; - /** - * start with a fresh changelog file every time. this is done - * in case there was an encoding change. so... things are kept - * simple here. - */ - ret = changelog_fill_rollover_data (&cld, _gf_false); - if (ret) - goto out; + cname = CHANGELOG_FNAME_FROM_POLICY (priv->cp); LOCK (&priv->lock); { - ret = changelog_inject_single_event (this, priv, &cld); + ret = CHANGELOG_INVOKE_CFOP (this, priv, + open, cname, _gf_false); } UNLOCK (&priv->lock); + if (ret) + goto out; + /* ... 
and finally spawn the helpers threads */ ret = changelog_spawn_helper_threads (this, priv); @@ -1175,11 +1063,11 @@ reconfigure (xlator_t *this, dict_t *options) { int ret = 0; char *tmp = NULL; + char *cname = NULL; changelog_priv_t *priv = NULL; gf_boolean_t active_earlier = _gf_true; gf_boolean_t active_now = _gf_true; changelog_time_slice_t *slice = NULL; - changelog_log_data_t cld = {0,}; priv = this->private; if (!priv) @@ -1230,15 +1118,13 @@ reconfigure (xlator_t *this, dict_t *options) priv->fsync_interval, options, int32, out); if (active_now || active_earlier) { - ret = changelog_fill_rollover_data (&cld, !active_now); - if (ret) - goto out; - slice = &priv->slice; + cname = CHANGELOG_FNAME_FROM_POLICY (priv->cp); LOCK (&priv->lock); { - ret = changelog_inject_single_event (this, priv, &cld); + ret = CHANGELOG_INVOKE_CFOP (this, priv, rollover, + cname, !active_now); if (!ret && active_now) SLICE_VERSION_UPDATE (slice); } @@ -1345,20 +1231,43 @@ init (xlator_t *this) GF_OPTION_INIT ("encoding", tmp, str, out); changelog_assign_encoding (priv, tmp); - GF_OPTION_INIT ("rollover-time", priv->rollover_time, int32, out); + tmp = NULL; + + GF_OPTION_INIT ("policy", tmp, str, out); + changelog_assign_policy (priv, tmp); GF_OPTION_INIT ("fsync-interval", priv->fsync_interval, int32, out); - changelog_encode_change(priv); + GF_ASSERT (cb_encoder[priv->encode_mode].encoder == priv->encode_mode); + priv->ce = &cb_encoder[priv->encode_mode]; GF_ASSERT (cb_bootstrap[priv->op_mode].mode == priv->op_mode); priv->cb = &cb_bootstrap[priv->op_mode]; + GF_ASSERT (cb_policy[priv->policy].policy == priv->policy); + priv->cp = &cb_policy[priv->policy]; + /* ... now bootstrap the logger */ ret = priv->cb->ctor (this, &priv->cd); if (ret) goto out; + /* ... 
init logging policy */ + ret = priv->cp->init_policy (this, priv, priv->cp); + if (ret) + goto out; + + /* override the value if set */ + if (dict_get (this->options, "rollover-time")) { + ret = dict_get_int32 (this->options, + "rollover-time", &priv->rollover_time); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "Cannot get value for \"rollover-time\""); + goto out; + } + } + priv->changelog_fd = -1; ret = changelog_init (this, priv); if (ret) @@ -1437,7 +1346,7 @@ struct xlator_cbks cbks = { struct volume_options options[] = { {.key = {"changelog"}, .type = GF_OPTION_TYPE_BOOL, - .default_value = "off", + .default_value = "on", .description = "enable/disable change-logging" }, {.key = {"changelog-brick"}, @@ -1462,8 +1371,7 @@ struct volume_options options[] = { .description = "encoding type for changelogs" }, {.key = {"rollover-time"}, - .default_value = "60", - .type = GF_OPTION_TYPE_TIME, + .type = GF_OPTION_TYPE_INT, .description = "time to switch to a new changelog file (in seconds)" }, {.key = {"fsync-interval"}, @@ -1472,6 +1380,12 @@ struct volume_options options[] = { .description = "do not open CHANGELOG file with O_SYNC mode." " instead perform fsync() at specified intervals" }, + {.key = {"policy"}, + .type = GF_OPTION_TYPE_STR, + .default_value = "replication", + .value = {"default", "replication"}, + .description = "Logging policies" + }, {.key = {NULL} }, }; diff --git a/xlators/features/changelog/src/policy/changelog-policy-default.c b/xlators/features/changelog/src/policy/changelog-policy-default.c new file mode 100644 index 000000000..089bc10e4 --- /dev/null +++ b/xlators/features/changelog/src/policy/changelog-policy-default.c @@ -0,0 +1,44 @@ +/* + Copyright (c) 2013 Red Hat, Inc. + This file is part of GlusterFS. 
+ + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "changelog-policy.h" +#include "changelog-fops.h" + +int +changelog_default_policy_init (xlator_t *this, + changelog_priv_t *priv, + struct changelog_logpolicy *cp) +{ + priv->rollover_time = 15; + + priv->no_gfid_hdr = _gf_false; + + cp->cpriv = GF_CALLOC (1, sizeof (off_t), + gf_changelog_mt_fop_policy_t); + if (!cp->cpriv) + return -1; + + (void) memset (cp->changelog_name, '\0', PATH_MAX); + (void) memcpy (cp->changelog_name, + CHANGELOG_FILE_NAME, strlen (CHANGELOG_FILE_NAME)); + + cp->fops = &changelog_default_fops; /* default logging policy */ + cp->cops = &changelog_default_cops; /* default changelog operations */ + + return 0; +} + +int +changelog_default_policy_fini (xlator_t *this, + struct changelog_logpolicy *cp) +{ + GF_FREE (cp->cpriv); + return 0; +} diff --git a/xlators/features/changelog/src/policy/changelog-policy-replication.c b/xlators/features/changelog/src/policy/changelog-policy-replication.c new file mode 100644 index 000000000..536339939 --- /dev/null +++ b/xlators/features/changelog/src/policy/changelog-policy-replication.c @@ -0,0 +1,1184 @@ +/* + Copyright (c) 2013 Red Hat, Inc. + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. 
+*/ + +#include "changelog-policy.h" +#include "changelog-encoders.h" +#include "changelog-fops.h" + +#define JOURNAL_NAME "TERM" + +#define JOURNAL_SECTOR_SIZE 128 + +#define PRE_OP_MARK 0x5F4552505FULL /* _PRE_ */ +#define POST_OP_MARK 0x5F54534F505FULL /* _POST_ */ + +/** + * assume an ever increasing index for now.. + */ +static unsigned long nsr_index = 1; + +static unsigned long +get_index(changelog_priv_t *priv) { + unsigned long idx = 0; + + LOCK (&priv->lock); + { + idx = nsr_index++; + } + UNLOCK (&priv->lock); + + return idx; +} + +static void +reset_index(changelog_priv_t *priv) { + nsr_index = 1; +} + + +#if 0 +static inline void +//changelog_replication_assign_term (changelog_priv_t *priv, + changelog_local_t *local) +{ + local->nr_bytes = 0; + local->lu.val = get_index (priv); +} +#endif + +size_t +number_fn (void *data, char *buffer, gf_boolean_t encode) +{ + char buf[1024] = {0,}; + size_t bufsz = 0; + unsigned long long nr = 0; + + nr = *(unsigned long long *) data; + + if (encode) { + (void) snprintf (buf, sizeof (buf), "%llu", nr); + CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf)); + } else + CHANGELOG_FILL_BUFFER (buffer, bufsz, + &nr, sizeof (unsigned long long)); + + return bufsz; +} + +size_t +uuid_fn (void *data, char *buffer, gf_boolean_t encode) +{ + char buf[1024] = {0,}; + uuid_t uuid = {0,}; + size_t bufsz = 0; + + memcpy (uuid, (uuid_t *) data, sizeof (uuid_t)); + + if (encode) { + char *tmpbuf = uuid_utoa (uuid); + (void) snprintf (buf, sizeof (buf), "%s", tmpbuf); + CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf)); + } else + CHANGELOG_FILL_BUFFER (buffer, bufsz, uuid, sizeof (uuid_t)); + + return bufsz; +} + +#define CHANGELOG_FILL_USIGNLL(co, number, converter, xlen) do { \ + co->co_convert = converter; \ + co->co_free = NULL; \ + co->co_type = CHANGELOG_OPT_REC_ULL; \ + co->co_number = (unsigned long long) number; \ + xlen += sizeof (unsigned long long); \ + if (!co->co_convert) \ + co->co_len = sizeof (unsigned 
long long); \ + } while (0) + +#define CHANGELOG_FILL_UUID(co, uuid, converter, xlen) do { \ + co->co_convert = converter; \ + co->co_free = NULL; \ + co->co_type = CHANGELOG_OPT_REC_UUID; \ + uuid_copy (co->co_uuid, uuid); \ + xlen += sizeof (uuid_t); \ + } while (0) + + +/* TBD: move declarations here and nsr.c into a common place */ +#define NSR_TERM_XATTR "trusted.nsr.term" +#define RECON_TERM_XATTR "trusted.nsr.recon-term" +#define RECON_INDEX_XATTR "trusted.nsr.recon-index" + +static gf_boolean_t +changelog_fix_term(xlator_t *this, + changelog_local_t *local, + dict_t *xdata) +{ + int32_t old_term, new_term; + changelog_priv_t *priv = this->private; + int ret = 0; + char nfile[PATH_MAX] = {0,}; + int32_t recon_term, recon_index; + changelog_rollover_data_t crd; + + // If coming via the regular IO path, we should get the dict "nsr-term" + // If coming via reconciliation, we should get the dicts "nsr-recon-term" + // that indicates the term and "nsr-recon-index" for the index + if (dict_get_int32(xdata,NSR_TERM_XATTR,&new_term) == 0) { + old_term = priv->term; + + if (old_term != new_term) { + GF_ASSERT(new_term > old_term); + LOCK (&priv->lock); + reset_index(priv); + priv->term = new_term; + (void) snprintf (nfile, PATH_MAX, "%s.%d", + JOURNAL_NAME, priv->term); + ret = CHANGELOG_INVOKE_CFOP(this, priv, rollover, + nfile, _gf_false); + UNLOCK (&priv->lock); + if (ret != 0) + return _gf_false; + } + local->nr_bytes = 0; + local->lu.val = get_index (priv); + } else if ((dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && + (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { + + old_term = priv->term; + + if (old_term != recon_term) { + LOCK (&priv->lock); + priv->term = recon_term; + (void) snprintf (crd.crd_changelog_name, PATH_MAX, "%s.%d", JOURNAL_NAME, priv->term); + crd.crd_prealloc_size = 0; + if (changelog_open(this, priv, local, &crd) != 0) + return _gf_false; + UNLOCK (&priv->lock); + } + local->nr_bytes = 0; + local->lu.val = 
recon_index; + } else { + return _gf_false; + } + + return _gf_true; +} + + + +/** override FOPS */ + +int32_t +changelog_replication_rmdir (call_frame_t *frame, xlator_t *this, + loc_t *loc, int xflags, dict_t *xdata) +{ + int ret = -1; + size_t xtra_len = 0; + changelog_opt_t *co = NULL; + changelog_priv_t *priv = NULL; + changelog_local_t *local = NULL; + + priv = this->private; + + /*
 + FOP + GFID + Entry */
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 4);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        if (changelog_fix_term(this, local, xdata) == _gf_false)
+                goto out;
+
+        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+                              entry_fn, entry_free_fn, xtra_len, out);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+        //changelog_replication_assign_term (priv, local);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * UNLINK journaling for the replication policy.
+ *
+ * Forwards all arguments unchanged to changelog_replication_rmdir() and
+ * returns whatever that call returns, so both FOPs share one code path.
+ */
+int32_t
+changelog_replication_unlink (call_frame_t *frame, xlator_t *this,
+                              loc_t *loc, int xflags, dict_t *xdata)
+{
+        return changelog_replication_rmdir (frame, this, loc, xflags, xdata);
+}
+
+/**
+ * Prepare the replication journal record for a RENAME.
+ *
+ * Record layout (5 fields): PRE_OP_MARK, FOP number, oldloc->inode->gfid,
+ * old entry (parent GFID + basename), new entry (parent GFID + basename).
+ *
+ * @frame:  call frame; frame->root->op supplies the FOP number and
+ *          frame->local receives the prepared record on success.
+ * @this:   translator instance.
+ * @oldloc: source location.
+ * @newloc: destination location.
+ * @xdata:  request dict handed to changelog_fix_term().
+ *
+ * Returns 0 on success.  On any failure the local (if one was obtained)
+ * is released with changelog_local_cleanup() and -1 is returned.
+ */
+int32_t
+changelog_replication_rename (call_frame_t *frame, xlator_t *this,
+                              loc_t *oldloc, loc_t *newloc, dict_t *xdata)
+{
+        int                ret      = -1;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+
+        /* record: PRE_OP_MARK + FOP + GFID + OLDLOC + NEWLOC */
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, oldloc->inode->gfid, 5);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        /* nothing is journaled unless the term/index can be fixed up */
+        if (changelog_fix_term(this, local, xdata) == _gf_false)
+                goto out;
+
+        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_UUID (co, oldloc->inode->gfid, uuid_fn, xtra_len);
+        co++;
+
+        /* CHANGELOG_FILL_ENTRY jumps to 'out' by itself on failure */
+        CHANGELOG_FILL_ENTRY (co, oldloc->pargfid, oldloc->name,
+                              entry_fn, entry_free_fn, xtra_len, out);
+        co++;
+
+        CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name,
+                              entry_fn, entry_free_fn, xtra_len, out);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 5);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * Prepare the replication journal record for a LINK.
+ *
+ * Record layout (4 fields): PRE_OP_MARK, FOP number, oldloc->gfid, and
+ * the new entry (newloc parent GFID + basename).
+ *
+ * @frame:  call frame; frame->root->op supplies the FOP number and
+ *          frame->local receives the prepared record on success.
+ * @this:   translator instance.
+ * @oldloc: existing location whose GFID is recorded.
+ * @newloc: location of the link being created.
+ * @xdata:  request dict handed to changelog_fix_term().
+ *
+ * Returns 0 on success.  On any failure the local (if one was obtained)
+ * is released with changelog_local_cleanup() and -1 is returned.
+ */
+int32_t
+changelog_replication_link (call_frame_t *frame,
+                            xlator_t *this, loc_t *oldloc,
+                            loc_t *newloc, dict_t *xdata)
+{
+        int                ret      = -1;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+
+        /* record: PRE_OP_MARK + FOP + GFID + Entry */
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, oldloc->gfid, 4);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        /* nothing is journaled unless the term/index can be fixed up */
+        if (changelog_fix_term(this, local, xdata) == _gf_false)
+                goto out;
+
+        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_UUID (co, oldloc->gfid, uuid_fn, xtra_len);
+        co++;
+
+        /* CHANGELOG_FILL_ENTRY jumps to 'out' by itself on failure */
+        CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name,
+                              entry_fn, entry_free_fn, xtra_len, out);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * Prepare the replication journal record for a MKDIR.
+ *
+ * The GFID of the directory being created is taken from the "gfid-req"
+ * key of @xdata.  Record layout (4 fields): PRE_OP_MARK, FOP number,
+ * that GFID, and the entry (loc parent GFID + basename).
+ *
+ * @frame: call frame; frame->root->op supplies the FOP number and
+ *         frame->local receives the prepared record on success.
+ * @this:  translator instance.
+ * @loc:   location of the new directory.
+ * @mode, @umask: not used by the journaling code.
+ * @xdata: request dict; must carry "gfid-req" and is also handed to
+ *         changelog_fix_term().
+ *
+ * Returns 0 on success.  If "gfid-req" is missing, the dict_get_ptr()
+ * error is returned; any later failure returns -1.  In every failure
+ * case changelog_local_cleanup() is invoked on the local.
+ */
+int32_t
+changelog_replication_mkdir (call_frame_t *frame,
+                             xlator_t *this, loc_t *loc,
+                             mode_t mode, mode_t umask, dict_t *xdata)
+{
+        int                ret      = -1;
+        uuid_t             gfid     = {0,};
+        void              *uuid_req = NULL;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+
+        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+        if (ret) {
+                gf_log (this->name, GF_LOG_DEBUG,
+                        "failed to get gfid from dict");
+                goto out;
+        }
+        uuid_copy (gfid, uuid_req);
+
+        /* from here on every early exit is a failure */
+        ret = -1;
+
+        /* record: PRE_OP_MARK + FOP + GFID + Entry */
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 4);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        /* nothing is journaled unless the term/index can be fixed up */
+        if (changelog_fix_term(this, local, xdata) == _gf_false)
+                goto out;
+
+        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
+        co++;
+
+        /* CHANGELOG_FILL_ENTRY jumps to 'out' by itself on failure */
+        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+                              entry_fn, entry_free_fn, xtra_len, out);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * Prepare the replication journal record for a SYMLINK.
+ *
+ * The GFID of the symlink being created is taken from the "gfid-req"
+ * key of @xdata.  Record layout (4 fields): PRE_OP_MARK, FOP number,
+ * that GFID, and the entry (loc parent GFID + basename).
+ *
+ * @frame:    call frame; frame->root->op supplies the FOP number and
+ *            frame->local receives the prepared record on success.
+ * @this:     translator instance.
+ * @linkname: not used by the journaling code.
+ * @loc:      location of the new symlink.
+ * @umask:    not used by the journaling code.
+ * @xdata:    request dict; must carry "gfid-req" and is also handed to
+ *            changelog_fix_term().
+ *
+ * Returns 0 on success.  If "gfid-req" is missing, the dict_get_ptr()
+ * error is returned; any later failure returns -1.  In every failure
+ * case changelog_local_cleanup() is invoked on the local.
+ */
+int32_t
+changelog_replication_symlink (call_frame_t *frame, xlator_t *this,
+                               const char *linkname, loc_t *loc,
+                               mode_t umask, dict_t *xdata)
+{
+        int                ret      = -1;
+        size_t             xtra_len = 0;
+        uuid_t             gfid     = {0,};
+        void              *uuid_req = NULL;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+
+        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+        if (ret) {
+                gf_log (this->name, GF_LOG_DEBUG,
+                        "failed to get gfid from dict");
+                goto out;
+        }
+        uuid_copy (gfid, uuid_req);
+
+        /* from here on every early exit is a failure */
+        ret = -1;
+
+        /* record: PRE_OP_MARK + FOP + GFID + Entry */
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 4);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        /* nothing is journaled unless the term/index can be fixed up */
+        if (changelog_fix_term(this, local, xdata) == _gf_false)
+                goto out;
+
+        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
+        co++;
+
+        /* CHANGELOG_FILL_ENTRY jumps to 'out' by itself on failure */
+        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+                              entry_fn, entry_free_fn, xtra_len, out);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * Prepare the replication journal record for a MKNOD.
+ *
+ * The GFID of the node being created is taken from the "gfid-req" key
+ * of @xdata.  Record layout (4 fields): PRE_OP_MARK, FOP number, that
+ * GFID, and the entry (loc parent GFID + basename).
+ *
+ * @frame: call frame; frame->root->op supplies the FOP number and
+ *         frame->local receives the prepared record on success.
+ * @this:  translator instance.
+ * @loc:   location of the new node.
+ * @mode, @dev, @umask: not used by the journaling code.
+ * @xdata: request dict; must carry "gfid-req" and is also handed to
+ *         changelog_fix_term().
+ *
+ * Returns 0 on success.  If "gfid-req" is missing, the dict_get_ptr()
+ * error is returned; any later failure returns -1.  In every failure
+ * case changelog_local_cleanup() is invoked on the local.
+ */
+int32_t
+changelog_replication_mknod (call_frame_t *frame,
+                             xlator_t *this, loc_t *loc,
+                             mode_t mode, dev_t dev,
+                             mode_t umask, dict_t *xdata)
+{
+        int                ret      = -1;
+        uuid_t             gfid     = {0,};
+        void              *uuid_req = NULL;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+
+        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+        if (ret) {
+                gf_log (this->name, GF_LOG_DEBUG,
+                        "failed to get gfid from dict");
+                goto out;
+        }
+        uuid_copy (gfid, uuid_req);
+
+        /* from here on every early exit is a failure */
+        ret = -1;
+
+        /* record: PRE_OP_MARK + FOP + GFID + Entry */
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 4);
+        /* bail out explicitly on allocation failure, as the sibling FOPs do */
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        /* nothing is journaled unless the term/index can be fixed up */
+        if (changelog_fix_term(this, local, xdata) == _gf_false)
+                goto out;
+
+        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
+        co++;
+
+        /* CHANGELOG_FILL_ENTRY jumps to 'out' by itself on failure */
+        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+                              entry_fn, entry_free_fn, xtra_len, out);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * Prepare the replication journal record for a CREATE.
+ *
+ * The GFID of the file being created is taken from the "gfid-req" key
+ * of @xdata.  Record layout (4 fields): PRE_OP_MARK, FOP number, that
+ * GFID, and the entry (loc parent GFID + basename).
+ *
+ * @frame: call frame; frame->root->op supplies the FOP number and
+ *         frame->local receives the prepared record on success.
+ * @this:  translator instance.
+ * @loc:   location of the new file.
+ * @flags, @mode, @umask, @fd: not used by the journaling code.
+ * @xdata: request dict; must carry "gfid-req" and is also handed to
+ *         changelog_fix_term().
+ *
+ * Returns 0 on success.  If "gfid-req" is missing, the dict_get_ptr()
+ * error is returned; any later failure returns -1.  In every failure
+ * case changelog_local_cleanup() is invoked on the local.
+ */
+int32_t
+changelog_replication_create (call_frame_t *frame, xlator_t *this,
+                              loc_t *loc, int32_t flags, mode_t mode,
+                              mode_t umask, fd_t *fd, dict_t *xdata)
+{
+        int                ret      = -1;
+        uuid_t             gfid     = {0,};
+        void              *uuid_req = NULL;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+
+        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+        if (ret) {
+                gf_log (this->name, GF_LOG_DEBUG,
+                        "failed to get gfid from dict");
+                goto out;
+        }
+        uuid_copy (gfid, uuid_req);
+
+        /* from here on every early exit is a failure */
+        ret = -1;
+
+        /* record: PRE_OP_MARK + FOP + GFID + Entry */
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 4);
+        /* bail out explicitly on allocation failure, as the sibling FOPs do */
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        /* nothing is journaled unless the term/index can be fixed up */
+        if (changelog_fix_term(this, local, xdata) == _gf_false)
+                goto out;
+
+        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
+        co++;
+
+        /* CHANGELOG_FILL_ENTRY jumps to 'out' by itself on failure */
+        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+                              entry_fn, entry_free_fn, xtra_len, out);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * Prepare the replication journal record for an FSETATTR.
+ *
+ * Record layout (3 fields): PRE_OP_MARK, FOP number, fd->inode->gfid.
+ *
+ * @frame: call frame; frame->root->op supplies the FOP number and
+ *         frame->local receives the prepared record on success.
+ * @this:  translator instance.
+ * @fd:    open fd whose inode GFID is recorded.
+ * @stbuf, @valid: not used by the journaling code.
+ * @xdata: request dict handed to changelog_fix_term().
+ *
+ * Returns 0 on success.  On any failure the local (if one was obtained)
+ * is released with changelog_local_cleanup() and -1 is returned.
+ */
+int32_t
+changelog_replication_fsetattr (call_frame_t *frame,
+                                xlator_t *this, fd_t *fd,
+                                struct iatt *stbuf, int32_t valid,
+                                dict_t *xdata)
+{
+        int                ret      = -1;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+
+        /* record: PRE_OP_MARK + FOP + GFID */
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 3);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        /* nothing is journaled unless the term/index can be fixed up */
+        if (changelog_fix_term(this, local, xdata) == _gf_false)
+                goto out;
+
+        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 3);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * Prepare the replication journal record for a SETATTR.
+ *
+ * Record layout (3 fields): PRE_OP_MARK, FOP number, loc->inode->gfid.
+ *
+ * @frame: call frame; frame->root->op supplies the FOP number and
+ *         frame->local receives the prepared record on success.
+ * @this:  translator instance.
+ * @loc:   location whose inode GFID is recorded.
+ * @stbuf, @valid: not used by the journaling code.
+ * @xdata: request dict handed to changelog_fix_term().
+ *
+ * Returns 0 on success.  On any failure the local (if one was obtained)
+ * is released with changelog_local_cleanup() and -1 is returned.
+ */
+int32_t
+changelog_replication_setattr (call_frame_t *frame,
+                               xlator_t *this, loc_t *loc,
+                               struct iatt *stbuf, int32_t valid, dict_t *xdata)
+{
+        int                ret      = -1;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+
+        /* record: PRE_OP_MARK + FOP + GFID */
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 3);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        /* nothing is journaled unless the term/index can be fixed up */
+        if (changelog_fix_term(this, local, xdata) == _gf_false)
+                goto out;
+
+        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 3);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * Prepare the replication journal record for an FREMOVEXATTR.
+ *
+ * Record layout (3 fields): PRE_OP_MARK, FOP number, fd->inode->gfid.
+ * When @xdata carries "recon-xattr-opcode", that value is journaled as
+ * the FOP number instead of frame->root->op.
+ *
+ * @frame: call frame; frame->local receives the prepared record on
+ *         success.
+ * @this:  translator instance.
+ * @fd:    open fd whose inode GFID is recorded.
+ * @name:  not used by the journaling code.
+ * @xdata: request dict handed to changelog_fix_term() and probed for
+ *         "recon-xattr-opcode".
+ *
+ * Returns 0 on success.  On any failure the local (if one was obtained)
+ * is released with changelog_local_cleanup() and -1 is returned.
+ */
+int32_t
+changelog_replication_fremovexattr (call_frame_t *frame, xlator_t *this,
+                                    fd_t *fd, const char *name, dict_t *xdata)
+{
+        int                ret      = -1;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+        int32_t            xattr_op = 0;
+
+        /* record: PRE_OP_MARK + FOP + GFID */
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 3);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        /* nothing is journaled unless the term/index can be fixed up */
+        if (changelog_fix_term(this, local, xdata) == _gf_false)
+                goto out;
+
+        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+        co++;
+
+        /* an opcode supplied in xdata overrides the frame's own FOP */
+        if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) {
+                CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
+                                           fop_fn, xtra_len);
+        } else {
+                CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
+                                           xtra_len);
+        }
+        co++;
+
+        CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 3);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * Prepare the replication journal record for a REMOVEXATTR.
+ *
+ * Record layout (3 fields): PRE_OP_MARK, FOP number, loc->inode->gfid.
+ * When @xdata carries "recon-xattr-opcode", that value is journaled as
+ * the FOP number instead of frame->root->op.
+ *
+ * @frame: call frame; frame->local receives the prepared record on
+ *         success.
+ * @this:  translator instance.
+ * @loc:   location whose inode GFID is recorded.
+ * @name:  not used by the journaling code.
+ * @xdata: request dict handed to changelog_fix_term() and probed for
+ *         "recon-xattr-opcode".
+ *
+ * Returns 0 on success.  On any failure the local (if one was obtained)
+ * is released with changelog_local_cleanup() and -1 is returned.
+ */
+int32_t
+changelog_replication_removexattr (call_frame_t *frame, xlator_t *this,
+                                   loc_t *loc, const char *name, dict_t *xdata)
+{
+        int                ret      = -1;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+        int32_t            xattr_op = 0;
+
+        /* record: PRE_OP_MARK + FOP + GFID */
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 3);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        /* nothing is journaled unless the term/index can be fixed up */
+        if (changelog_fix_term(this, local, xdata) == _gf_false)
+                goto out;
+
+        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+        co++;
+
+        /* an opcode supplied in xdata overrides the frame's own FOP */
+        if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) {
+                CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
+                                           fop_fn, xtra_len);
+        } else {
+                CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
+                                           xtra_len);
+        }
+        co++;
+
+        CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 3);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * Prepare the replication journal record for a SETXATTR.
+ *
+ * Record layout (3 fields): PRE_OP_MARK, FOP number, loc->inode->gfid.
+ * When @xdata carries "recon-xattr-opcode", that value is journaled as
+ * the FOP number instead of frame->root->op.
+ *
+ * @frame: call frame; frame->local receives the prepared record on
+ *         success.
+ * @this:  translator instance.
+ * @loc:   location whose inode GFID is recorded.
+ * @dict, @flags: not used by the journaling code.
+ * @xdata: request dict handed to changelog_fix_term() and probed for
+ *         "recon-xattr-opcode".
+ *
+ * Returns 0 on success.  On any failure the local (if one was obtained)
+ * is released with changelog_local_cleanup() and -1 is returned.
+ */
+int32_t
+changelog_replication_setxattr (call_frame_t *frame,
+                                xlator_t *this, loc_t *loc,
+                                dict_t *dict, int32_t flags, dict_t *xdata)
+{
+        int                ret      = -1;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+        int32_t            xattr_op = 0;
+
+        /* record: PRE_OP_MARK + FOP + GFID */
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 3);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        /* nothing is journaled unless the term/index can be fixed up */
+        if (changelog_fix_term(this, local, xdata) == _gf_false)
+                goto out;
+
+        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+        co++;
+
+        /* an opcode supplied in xdata overrides the frame's own FOP */
+        if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) {
+                CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
+                                           fop_fn, xtra_len);
+        } else {
+                CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
+                                           xtra_len);
+        }
+        co++;
+
+        CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 3);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * Prepare the replication journal record for an FSETXATTR.
+ *
+ * Record layout (3 fields): PRE_OP_MARK, FOP number, fd->inode->gfid.
+ * When @xdata carries "recon-xattr-opcode", that value is journaled as
+ * the FOP number instead of frame->root->op.
+ *
+ * @frame: call frame; frame->local receives the prepared record on
+ *         success.
+ * @this:  translator instance.
+ * @fd:    open fd whose inode GFID is recorded.
+ * @dict, @flags: not used by the journaling code.
+ * @xdata: request dict handed to changelog_fix_term() and probed for
+ *         "recon-xattr-opcode".
+ *
+ * Returns 0 on success.  On any failure the local (if one was obtained)
+ * is released with changelog_local_cleanup() and -1 is returned.
+ */
+int32_t
+changelog_replication_fsetxattr (call_frame_t *frame,
+                                 xlator_t *this, fd_t *fd, dict_t *dict,
+                                 int32_t flags, dict_t *xdata)
+{
+        int                ret      = -1;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+        int32_t            xattr_op = 0;
+
+        /* record: PRE_OP_MARK + FOP + GFID */
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 3);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        /* nothing is journaled unless the term/index can be fixed up */
+        if (changelog_fix_term(this, local, xdata) == _gf_false)
+                goto out;
+
+        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+        co++;
+
+        /* an opcode supplied in xdata overrides the frame's own FOP */
+        if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0) {
+                CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
+                                           fop_fn, xtra_len);
+        } else {
+                CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
+                                           xtra_len);
+        }
+        co++;
+
+        CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 3);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * Prepare the replication journal record for a TRUNCATE.
+ *
+ * Record layout (4 fields): PRE_OP_MARK, FOP number, loc->inode->gfid,
+ * and @offset.
+ *
+ * @frame:  call frame; frame->root->op supplies the FOP number and
+ *          frame->local receives the prepared record on success.
+ * @this:   translator instance.
+ * @loc:    location whose inode GFID is recorded.
+ * @offset: truncate offset, journaled as an unsigned long long.
+ * @xdata:  request dict handed to changelog_fix_term().
+ *
+ * Returns 0 on success.  On any failure the local (if one was obtained)
+ * is released with changelog_local_cleanup() and -1 is returned.
+ */
+int32_t
+changelog_replication_truncate (call_frame_t *frame,
+                                xlator_t *this, loc_t *loc,
+                                off_t offset, dict_t *xdata)
+{
+        int                ret      = -1;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+
+        /* record: PRE_OP_MARK + FOP + GFID + Offset */
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 4);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        /* nothing is journaled unless the term/index can be fixed up */
+        if (changelog_fix_term(this, local, xdata) == _gf_false)
+                goto out;
+
+        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * Prepare the replication journal record for an FTRUNCATE.
+ *
+ * Record layout (4 fields): PRE_OP_MARK, FOP number, fd->inode->gfid,
+ * and @offset.
+ *
+ * @frame:  call frame; frame->root->op supplies the FOP number and
+ *          frame->local receives the prepared record on success.
+ * @this:   translator instance.
+ * @fd:     open fd whose inode GFID is recorded.
+ * @offset: truncate offset, journaled as an unsigned long long.
+ * @xdata:  request dict handed to changelog_fix_term().
+ *
+ * Returns 0 on success.  On any failure the local (if one was obtained)
+ * is released with changelog_local_cleanup() and -1 is returned.
+ */
+int32_t
+changelog_replication_ftruncate (call_frame_t *frame,
+                                 xlator_t *this, fd_t *fd,
+                                 off_t offset, dict_t *xdata)
+{
+        int                ret      = -1;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+
+        /* record: PRE_OP_MARK + FOP + GFID + Offset */
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 4);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        /* nothing is journaled unless the term/index can be fixed up */
+        if (changelog_fix_term(this, local, xdata) == _gf_false)
+                goto out;
+
+        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 4);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/**
+ * Prepare the replication journal record for a WRITEV.
+ *
+ * Record layout (5 fields): PRE_OP_MARK, FOP number, fd->inode->gfid,
+ * @offset, and the total payload length as computed by
+ * iov_length (@vector, @count).  Only these fields are placed in the
+ * record here; the vector contents are not copied by this function.
+ *
+ * @frame:  call frame; frame->root->op supplies the FOP number and
+ *          frame->local receives the prepared record on success.
+ * @this:   translator instance.
+ * @fd:     open fd whose inode GFID is recorded.
+ * @vector, @count: I/O vector, used only to compute the length field.
+ * @offset: write offset, journaled as an unsigned long long.
+ * @flags, @iobref: not used by the journaling code.
+ * @xdata:  request dict handed to changelog_fix_term().
+ *
+ * Returns 0 on success.  On any failure the local (if one was obtained)
+ * is released with changelog_local_cleanup() and -1 is returned.
+ */
+int32_t
+changelog_replication_writev (call_frame_t *frame,
+                              xlator_t *this, fd_t *fd, struct iovec *vector,
+                              int32_t count, off_t offset, uint32_t flags,
+                              struct iobref *iobref, dict_t *xdata)
+{
+        int                ret      = -1;
+        size_t             xtra_len = 0;
+        changelog_opt_t   *co       = NULL;
+        changelog_local_t *local    = NULL;
+
+        /* record: PRE_OP_MARK + FOP + GFID + Offset + Length */
+        CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 5);
+        if (!local)
+                goto out;
+
+        co = changelog_get_usable_buffer (local);
+        if (!co)
+                goto out;
+
+        /* nothing is journaled unless the term/index can be fixed up */
+        if (changelog_fix_term(this, local, xdata) == _gf_false)
+                goto out;
+
+        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len);
+        co++;
+
+        CHANGELOG_FILL_USIGNLL (co, iov_length (vector, count),
+                                number_fn, xtra_len);
+
+        changelog_set_usable_record_and_length (local, xtra_len, 5);
+
+        frame->local = local;
+        ret = 0;
+
+ out:
+        if (ret)
+                changelog_local_cleanup (this, local);
+        return ret;
+}
+
+/* overridden COPS */
+
+/**
+ * Open the journal called @name by injecting a single
+ * CHANGELOG_TYPE_ROLLOVER event.
+ *
+ * @param this   changelog translator instance
+ * @param priv   translator private data, passed through to the injector
+ * @param cpriv  policy private data (unused here)
+ * @param name   journal name; must be non-NULL and fit, including its
+ *               terminating NUL, in crd_changelog_name
+ * @param last   stored as crd_finale on the rollover request
+ *
+ * @return -1 if @name is NULL or too long, otherwise whatever
+ *         changelog_inject_single_event() returns.
+ */
+int
+changelog_replication_cops_open (xlator_t *this,
+                                 changelog_priv_t *priv, void *cpriv,
+                                 char *name, gf_boolean_t last)
+{
+        changelog_local_t          local    = {0,};
+        changelog_log_data_t       cld      = {0,};
+        changelog_rollover_data_t *crd      = NULL;
+        size_t                     name_len = 0;
+
+        if (!name)
+                return -1;
+
+        crd = &cld.cld_roll;
+
+        /*
+         * Bounded replacement for strcpy(): refuse a name that would
+         * overrun the destination instead of writing past it.
+         * NOTE(review): relies on crd_changelog_name being an in-struct
+         * char array (the original strcpy() into a zeroed struct implies
+         * this) -- confirm against changelog_rollover_data_t.
+         */
+        name_len = strlen (name);
+        if (name_len >= sizeof (crd->crd_changelog_name))
+                return -1;
+        memcpy (crd->crd_changelog_name, name, name_len + 1);
+
+        cld.cld_type = CHANGELOG_TYPE_ROLLOVER;
+
+        crd->crd_finale = last;
+        crd->crd_use_suffix = _gf_false;
+        /*
+         * NOTE(review): 1<<10 is 1024, yet the original comment claimed
+         * "preallocate 1 MB".  Value left unchanged; confirm the unit
+         * expected by the consumer of crd_prealloc_size.
+         */
+        crd->crd_prealloc_size = 1<<10;
+
+        /* the journal starts at sector 0 with no bytes written */
+        local.lu.val = 0;
+        local.nr_bytes = 0;
+
+        return changelog_inject_single_event (this, priv, &local, &cld);
+}
+
+/**
+ * Rollover callback for the replication policy.
+ *
+ * There is no implicit rollover: this forwards unchanged to
+ * changelog_replication_cops_open() and returns its result.
+ */
+int
+changelog_replication_cops_rollover (xlator_t *this,
+                                     changelog_priv_t *priv, void *cpriv,
+                                     char *name, gf_boolean_t last)
+{
+        int ret = 0;
+
+        ret = changelog_replication_cops_open (this, priv, cpriv, name, last);
+
+        return ret;
+}
+
+/**
+ * Compute the journal offset described by @local:
+ * (lu.val * JOURNAL_SECTOR_SIZE) + nr_bytes.
+ *
+ * Returns 0 when @local is NULL.
+ */
+off_t
+changelog_replication_cops_get_offset (xlator_t *this,
+                                       changelog_priv_t *priv, void *cpriv,
+                                       changelog_local_t *local)
+{
+        if (local != NULL)
+                return (local->lu.val * JOURNAL_SECTOR_SIZE)
+                        + local->nr_bytes;
+
+        return 0;
+}
+
+/**
+ * Advance the byte count recorded in @local by @bytes.
+ *
+ * A NULL @local is ignored, mirroring
+ * changelog_replication_cops_get_offset(), which treats a NULL local as
+ * offset 0 rather than dereferencing it.
+ */
+void
+changelog_replication_cops_set_offset (xlator_t *this,
+                                       changelog_priv_t *priv, void *cpriv,
+                                       changelog_local_t *local, off_t bytes)
+{
+        if (!local)
+                return;
+
+        local->nr_bytes += bytes;
+}
+
+/**
+ * Offset reset callback for the replication policy: intentionally a no-op.
+ */
+void
+changelog_replication_cops_reset_offset (xlator_t *this, changelog_priv_t *priv,
+                                         void *cpriv, changelog_local_t *local)
+{
+        /* nothing to reset */
+}
+
+/**
+ * Set up the replication logging policy in @cp.
+ *
+ * Allocates private copies of the default fop and cop tables, overrides
+ * the entries implemented by this policy, tunes @priv (no rollover, no
+ * periodic fsync, no gfid header, term 0) and sets the journal name.
+ *
+ * @return 0 on success, -1 if either table cannot be allocated (in which
+ *         case @priv and @cp are left untouched).
+ */
+int
+changelog_replication_policy_init (xlator_t *this,
+                                   changelog_priv_t *priv,
+                                   struct changelog_logpolicy *cp)
+{
+        struct changelog_ops *cops = NULL;
+        struct xlator_fops   *fops = NULL;
+
+        fops = GF_CALLOC (1, sizeof (struct xlator_fops),
+                          gf_changelog_mt_fop_policy_t);
+        if (fops == NULL)
+                return -1;
+
+        cops = GF_CALLOC (1, sizeof (struct changelog_ops),
+                          gf_changelog_mt_fop_policy_t);
+        if (cops == NULL) {
+                GF_FREE (fops);
+                return -1;
+        }
+
+        /* start from the default tables, then override below */
+        memcpy (fops, &changelog_default_fops, sizeof (struct xlator_fops));
+        memcpy (cops, &changelog_default_cops, sizeof (struct changelog_ops));
+
+        /* overload all fops */
+        fops->create       = changelog_replication_create;
+        fops->mknod        = changelog_replication_mknod;
+        fops->mkdir        = changelog_replication_mkdir;
+        fops->symlink      = changelog_replication_symlink;
+        fops->link         = changelog_replication_link;
+        fops->rename       = changelog_replication_rename;
+        fops->unlink       = changelog_replication_unlink;
+        fops->rmdir        = changelog_replication_rmdir;
+        fops->writev       = changelog_replication_writev;
+        fops->truncate     = changelog_replication_truncate;
+        fops->ftruncate    = changelog_replication_ftruncate;
+        fops->setattr      = changelog_replication_setattr;
+        fops->fsetattr     = changelog_replication_fsetattr;
+        fops->setxattr     = changelog_replication_setxattr;
+        fops->fsetxattr    = changelog_replication_fsetxattr;
+        fops->removexattr  = changelog_replication_removexattr;
+        fops->fremovexattr = changelog_replication_fremovexattr;
+
+        /* overload cops */
+        cops->open         = changelog_replication_cops_open;
+        cops->rollover     = changelog_replication_cops_rollover;
+        cops->get_offset   = changelog_replication_cops_get_offset;
+        cops->set_offset   = changelog_replication_cops_set_offset;
+        cops->reset_offset = changelog_replication_cops_reset_offset;
+
+        /* no roll-over, one big fat journal per term */
+        priv->rollover_time = 0;
+
+        /* fsync() is internally triggered by NSR */
+        priv->fsync_interval = 0;
+
+        /* no record header: extra data (via iobufs) are always persisted */
+        priv->no_gfid_hdr = _gf_true;
+
+        priv->term = 0;
+
+        /*
+         * JOURNAL_NAME is copied verbatim into the zeroed name buffer;
+         * formatting it with priv->term via snprintf() is disabled.
+         */
+        (void) memset (cp->changelog_name, '\0', PATH_MAX);
+        memcpy (cp->changelog_name, JOURNAL_NAME, strlen (JOURNAL_NAME));
+
+        cp->fops = fops;
+        cp->cops = cops;
+
+        return 0;
+}
+
+/**
+ * Tear down the replication logging policy: release the fop and cop
+ * tables allocated by changelog_replication_policy_init().
+ *
+ * The pointers in @cp are cleared after being freed so that a stale @cp
+ * cannot be used to reach freed memory and a repeated fini does not free
+ * the same tables twice.
+ *
+ * @return always 0.
+ */
+int
+changelog_replication_policy_fini (xlator_t *this,
+                                   struct changelog_logpolicy *cp)
+{
+        GF_FREE (cp->fops);
+        cp->fops = NULL;
+
+        GF_FREE (cp->cops);
+        cp->cops = NULL;
+
+        return 0;
+}
diff --git a/xlators/features/changelog/src/policy/changelog-policy.h b/xlators/features/changelog/src/policy/changelog-policy.h
new file mode 100644
index 000000000..73fdc1a98
--- /dev/null
+++ b/xlators/features/changelog/src/policy/changelog-policy.h
@@ -0,0 +1,41 @@
+/*
+   Copyright (c) 2013 Red Hat, Inc. 
+   This file is part of GlusterFS.
+
+   This file is licensed to you under your choice of the GNU Lesser
+   General Public License, version 3 or any later version (LGPLv3 or
+   later), or the GNU General Public License, version 2 (GPLv2), in all
+   cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CHANGELOG_POLICY_H
+#define _CHANGELOG_POLICY_H
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "xlator.h"
+#include "defaults.h"
+#include "logging.h"
+
+#include "changelog-mem-types.h"
+#include "changelog-helpers.h"
+
+/*
+ * Default logging policy.
+ * NOTE(review): implemented outside this header (presumably
+ * policy/changelog-policy-default.c); semantics not visible here.
+ */
+int
+changelog_default_policy_init (xlator_t *this,
+                               changelog_priv_t *priv,
+                               struct changelog_logpolicy *cp);
+int
+changelog_default_policy_fini (xlator_t *this,
+                               struct changelog_logpolicy *cp);
+
+/*
+ * Replication logging policy.
+ * init: installs the policy's fop/cop tables into @cp and tunes @priv;
+ *       returns 0 on success, -1 on allocation failure.
+ * fini: frees the tables held by @cp; returns 0.
+ */
+int
+changelog_replication_policy_init (xlator_t *this,
+                                   changelog_priv_t *priv,
+                                   struct changelog_logpolicy *cp);
+int
+changelog_replication_policy_fini (xlator_t *this,
+                                   struct changelog_logpolicy *cp);
+
+#endif /* _CHANGELOG_POLICY_H */
-- 
cgit