author    Jeff Darcy <jdarcy@redhat.com>  2015-12-10 10:30:32 -0500
committer Jeff Darcy <jdarcy@redhat.com>  2016-02-11 15:46:26 -0800
commit    55617ef037695f47ead1c1b753678402e1545f8c (patch)
tree      7bc4dde5a51ae5c8ef3a45454e64ae5f5b7c5a86  /xlators/experimental/nsr-server/src/nsr.c
parent    320779d53ae013147d5e2556d2946c73e45734ab (diff)
NSR : nsr server code generation patch
With this patch, the NSR server appoints the first node of every replica subvolume as the leader for that subvolume. On receiving a 'write' fop, the leader first checks whether there is quorum in the replica subvolume to proceed; if there is not, it fails the fop with EROFS. If there is quorum, the leader forwards the fop to the followers. The followers perform the operation and, based on its success or failure, send a positive or negative ack to the leader. After receiving the followers' acks, the leader performs a quorum check to decide whether it should even attempt the fop itself. If quorum is not met, and the leader's own outcome could not change that, it sends a negative ack to the client without performing the fop. If quorum is met, the leader tries the fop on itself and then performs a quorum check of all the acks received (this time including its own). Based on the result of that quorum check (irrespective of the outcome on the leader), a positive or negative ack is sent back to the client.

Change-Id: I860654b74c53e9b139b37dba43848e5504df6dce
Signed-off-by: Avra Sengupta <asengupt@redhat.com>
Reviewed-on: http://review.gluster.org/12705
Smoke: Gluster Build System <jenkins@build.gluster.com>
Tested-by: Jeff Darcy <jdarcy@redhat.com>
CentOS-regression: Gluster Build System <jenkins@build.gluster.com>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
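The write path described in the commit message amounts to two quorum checks wrapped around the leader's own attempt. The stand-alone sketch below is illustrative only: every function and parameter name in it is invented and does not appear in the patch, which implements this flow through generated fop dispatch/complete wrappers together with fop_quorum_check() in the diff below. It uses the same arithmetic as fop_quorum_check() (required = n_bricks * quorum_pct, compared against ack_count * 100).

/* Illustrative sketch only -- all names here are hypothetical. */

#include <errno.h>
#include <stdbool.h>
#include <stdio.h>

/* Returns 0 for a positive ack to the client, -EROFS when there is no
 * quorum of live bricks, and -1 for a negative ack otherwise. */
static int
leader_write_outcome (int n_bricks, int up_bricks, int follower_acks,
                      bool local_write_ok, double quorum_pct)
{
        double required = n_bricks * quorum_pct;  /* as in fop_quorum_check() */
        int    acks     = follower_acks;

        /* 1. Not enough live bricks to ever reach quorum: refuse with EROFS. */
        if (up_bricks * 100.0 < required)
                return -EROFS;

        /* 2. Followers have answered; if even a successful local write could
         *    not reach quorum, send a negative ack without trying the fop. */
        if ((follower_acks + 1) * 100.0 < required)
                return -1;

        /* 3. Try the fop locally, then re-check quorum including our own
         *    result; a local failure simply does not add to the count. */
        if (local_write_ok)
                ++acks;

        return (acks * 100.0 < required) ? -1 : 0;
}

int
main (void)
{
        /* 3-way replica, 50% quorum: leader plus one follower ack suffices. */
        printf ("%d\n", leader_write_outcome (3, 3, 1, true, 50.0));  /* 0 */
        /* Both followers down: refused up front with -EROFS. */
        printf ("%d\n", leader_write_outcome (3, 1, 0, true, 50.0));
        return 0;
}

With a 3-brick subvolume and the default quorum-percent of 50, this works out to the leader needing at least one positive follower ack in addition to its own successful write.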
Diffstat (limited to 'xlators/experimental/nsr-server/src/nsr.c')
-rw-r--r--  xlators/experimental/nsr-server/src/nsr.c  1066
1 files changed, 1066 insertions, 0 deletions
diff --git a/xlators/experimental/nsr-server/src/nsr.c b/xlators/experimental/nsr-server/src/nsr.c
new file mode 100644
index 00000000000..0c494b78125
--- /dev/null
+++ b/xlators/experimental/nsr-server/src/nsr.c
@@ -0,0 +1,1066 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include <fnmatch.h>
+#include "call-stub.h"
+#include "defaults.h"
+#include "xlator.h"
+#include "glfs.h"
+#include "glfs-internal.h"
+#include "run.h"
+#include "common-utils.h"
+#include "syncop.h"
+#include "syscall.h"
+
+#include "nsr-internal.h"
+#include "nsr-messages.h"
+
+#define NSR_FLUSH_INTERVAL 5
+
+enum {
+ /* echo "cluster/nsr-server" | md5sum | cut -c 1-8 */
+ NSR_SERVER_IPC_BASE = 0x0e2d66a5,
+ NSR_SERVER_TERM_RANGE,
+ NSR_SERVER_OPEN_TERM,
+ NSR_SERVER_NEXT_ENTRY
+};
+
+/* Used to check the quorum of acks received after the fop
+ * confirming the status of the fop on all the brick processes
+ * for this particular subvolume
+ */
+gf_boolean_t
+fop_quorum_check (xlator_t *this, double n_children,
+ double current_state)
+{
+ nsr_private_t *priv = NULL;
+ gf_boolean_t result = _gf_false;
+ double required = 0;
+ double current = 0;
+
+ GF_VALIDATE_OR_GOTO ("nsr", this, out);
+ priv = this->private;
+ GF_VALIDATE_OR_GOTO (this->name, priv, out);
+
+ required = n_children * priv->quorum_pct;
+
+ /*
+ * Before performing the fop on the leader, we need to check,
+ * if there is any merit in performing the fop on the leader.
+ * In a case, where even a successful write on the leader, will
+ * not meet quorum, there is no point in trying the fop on the
+ * leader.
+ * When this function is called after the leader has tried
+ * performing the fop, this check will calculate quorum taking into
+ * account the status of the fop on the leader. If the leader's
+ * op_ret was -1, the complete function would account that by
+ * decrementing successful_acks by 1
+ */
+
+ current = current_state * 100.0;
+
+ if (current < required) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_QUORUM_NOT_MET,
+ "Quorum not met. quorum_pct = %f "
+ "Current State = %f, Required State = %f",
+ priv->quorum_pct, current,
+ required);
+ } else
+ result = _gf_true;
+
+out:
+ return result;
+}
+
+nsr_inode_ctx_t *
+nsr_get_inode_ctx (xlator_t *this, inode_t *inode)
+{
+ uint64_t ctx_int = 0LL;
+ nsr_inode_ctx_t *ctx_ptr;
+
+ if (__inode_ctx_get(inode, this, &ctx_int) == 0) {
+ ctx_ptr = (nsr_inode_ctx_t *)(long)ctx_int;
+ } else {
+ ctx_ptr = GF_CALLOC (1, sizeof(*ctx_ptr),
+ gf_mt_nsr_inode_ctx_t);
+ if (ctx_ptr) {
+ ctx_int = (uint64_t)(long)ctx_ptr;
+ if (__inode_ctx_set(inode, this, &ctx_int) == 0) {
+ LOCK_INIT(&ctx_ptr->lock);
+ INIT_LIST_HEAD(&ctx_ptr->aqueue);
+ INIT_LIST_HEAD(&ctx_ptr->pqueue);
+ } else {
+ GF_FREE(ctx_ptr);
+ ctx_ptr = NULL;
+ }
+ }
+
+ }
+
+ return ctx_ptr;
+}
+
+nsr_fd_ctx_t *
+nsr_get_fd_ctx (xlator_t *this, fd_t *fd)
+{
+ uint64_t ctx_int = 0LL;
+ nsr_fd_ctx_t *ctx_ptr;
+
+ if (__fd_ctx_get(fd, this, &ctx_int) == 0) {
+ ctx_ptr = (nsr_fd_ctx_t *)(long)ctx_int;
+ } else {
+ ctx_ptr = GF_CALLOC (1, sizeof(*ctx_ptr), gf_mt_nsr_fd_ctx_t);
+ if (ctx_ptr) {
+ if (__fd_ctx_set(fd, this, (uint64_t)ctx_ptr) == 0) {
+ INIT_LIST_HEAD(&ctx_ptr->dirty_list);
+ INIT_LIST_HEAD(&ctx_ptr->fd_list);
+ } else {
+ GF_FREE(ctx_ptr);
+ ctx_ptr = NULL;
+ }
+ }
+
+ }
+
+ return ctx_ptr;
+}
+
+void
+nsr_mark_fd_dirty (xlator_t *this, nsr_local_t *local)
+{
+ fd_t *fd = local->fd;
+ nsr_fd_ctx_t *ctx_ptr;
+ nsr_dirty_list_t *dirty;
+ nsr_private_t *priv = this->private;
+
+ /*
+ * TBD: don't do any of this for O_SYNC/O_DIRECT writes.
+ * Unfortunately, that optimization requires that we distinguish
+ * between writev and other "write" calls, saving the original flags
+ * and checking them in the callback. Too much work for too little
+ * gain right now.
+ */
+
+ LOCK(&fd->lock);
+ ctx_ptr = nsr_get_fd_ctx(this, fd);
+ dirty = GF_CALLOC(1, sizeof(*dirty), gf_mt_nsr_dirty_t);
+ if (ctx_ptr && dirty) {
+ gf_msg_trace (this->name, 0,
+ "marking fd %p as dirty (%p)", fd, dirty);
+ /* TBD: fill dirty->id from what changelog gave us */
+ list_add_tail(&dirty->links, &ctx_ptr->dirty_list);
+ if (list_empty(&ctx_ptr->fd_list)) {
+ /* Add a ref so _release doesn't get called. */
+ ctx_ptr->fd = fd_ref(fd);
+ LOCK(&priv->dirty_lock);
+ list_add_tail (&ctx_ptr->fd_list,
+ &priv->dirty_fds);
+ UNLOCK(&priv->dirty_lock);
+ }
+ } else {
+ gf_msg (this->name, GF_LOG_ERROR, ENOMEM,
+ N_MSG_MEM_ERR, "could not mark %p dirty", fd);
+ /* ctx_ptr, if non-NULL, is already owned by the fd and will be
+ * freed in nsr_release, so only the unattached dirty entry is freed. */
+ if (dirty) {
+ GF_FREE(dirty);
+ }
+ }
+ UNLOCK(&fd->lock);
+}
+
+#define NSR_TERM_XATTR "trusted.nsr.term"
+#define NSR_INDEX_XATTR "trusted.nsr.index"
+#define NSR_REP_COUNT_XATTR "trusted.nsr.rep-count"
+#define RECON_TERM_XATTR "trusted.nsr.recon-term"
+#define RECON_INDEX_XATTR "trusted.nsr.recon-index"
+
+#pragma generate
+
+uint8_t
+nsr_count_up_kids (nsr_private_t *priv)
+{
+ uint8_t retval = 0;
+ uint8_t i;
+
+ for (i = 0; i < priv->n_children; ++i) {
+ if (priv->kid_state & (1 << i)) {
+ ++retval;
+ }
+ }
+
+ return retval;
+}
+
+/*
+ * The fsync machinery looks a lot like that for any write call, but there are
+ * some important differences that are easy to miss. First, we don't care
+ * about the xdata that shows whether the call came from a leader or
+ * reconciliation process. If we're the leader we fan out; if we're not we
+ * don't. Second, we don't wait for followers before we issue the local call.
+ * The code generation system could be updated to handle this, and still might
+ * if we need to implement other "almost identical" paths (e.g. for open), but
+ * a copy is more readable as long as it's just one.
+ */
+
+int32_t
+nsr_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ nsr_local_t *local = frame->local;
+ gf_boolean_t unwind;
+
+ LOCK(&frame->lock);
+ unwind = !--(local->call_count);
+ UNLOCK(&frame->lock);
+
+ if (unwind) {
+ STACK_UNWIND_STRICT (fsync, frame, op_ret, op_errno, prebuf,
+ postbuf, xdata);
+ }
+ return 0;
+}
+
+int32_t
+nsr_fsync_local_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ nsr_dirty_list_t *dirty;
+ nsr_dirty_list_t *dtmp;
+ nsr_local_t *local = frame->local;
+
+ list_for_each_entry_safe (dirty, dtmp, &local->qlinks, links) {
+ gf_msg_trace (this->name, 0,
+ "sending post-op on %p (%p)", local->fd, dirty);
+ GF_FREE(dirty);
+ }
+
+ return nsr_fsync_cbk (frame, cookie, this, op_ret, op_errno,
+ prebuf, postbuf, xdata);
+}
+
+int32_t
+nsr_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags,
+ dict_t *xdata)
+{
+ nsr_private_t *priv = this->private;
+ nsr_local_t *local;
+ uint64_t ctx_int = 0LL;
+ nsr_fd_ctx_t *ctx_ptr;
+ xlator_list_t *trav;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ STACK_UNWIND_STRICT(fsync, frame, -1, ENOMEM,
+ NULL, NULL, xdata);
+ return 0;
+ }
+ INIT_LIST_HEAD(&local->qlinks);
+ frame->local = local;
+
+ /* Move the dirty list from the fd to the fsync request. */
+ LOCK(&fd->lock);
+ if (__fd_ctx_get(fd, this, &ctx_int) == 0) {
+ ctx_ptr = (nsr_fd_ctx_t *)(long)ctx_int;
+ list_splice_init (&ctx_ptr->dirty_list,
+ &local->qlinks);
+ }
+ UNLOCK(&fd->lock);
+
+ /* Issue the local call. */
+ local->call_count = priv->leader ? priv->n_children : 1;
+ STACK_WIND (frame, nsr_fsync_local_cbk,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsync,
+ fd, flags, xdata);
+
+ /* Issue remote calls if we're the leader. */
+ if (priv->leader) {
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_fsync_cbk,
+ trav->xlator,
+ trav->xlator->fops->fsync,
+ fd, flags, xdata);
+ }
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_getxattr_special (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ const char *name, dict_t *xdata)
+{
+ dict_t *result;
+ nsr_private_t *priv = this->private;
+
+ if (!priv->leader) {
+ STACK_UNWIND_STRICT (getxattr, frame, -1, EREMOTE, NULL, NULL);
+ return 0;
+ }
+
+ if (!name || (strcmp(name, NSR_REP_COUNT_XATTR) != 0)) {
+ STACK_WIND_TAIL (frame,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->getxattr,
+ loc, name, xdata);
+ return 0;
+ }
+
+ result = dict_new();
+ if (!result) {
+ goto dn_failed;
+ }
+
+ priv->up_children = nsr_count_up_kids(this->private);
+ if (dict_set_uint32(result, NSR_REP_COUNT_XATTR,
+ priv->up_children) != 0) {
+ goto dsu_failed;
+ }
+
+ STACK_UNWIND_STRICT (getxattr, frame, 0, 0, result, NULL);
+ dict_destroy(result);
+ return 0;
+
+dsu_failed:
+ dict_destroy(result);
+dn_failed:
+ STACK_UNWIND_STRICT (getxattr, frame, -1, ENOMEM, NULL, NULL);
+ return 0;
+}
+
+void
+nsr_flush_fd (xlator_t *this, nsr_fd_ctx_t *fd_ctx)
+{
+ nsr_dirty_list_t *dirty;
+ nsr_dirty_list_t *dtmp;
+
+ list_for_each_entry_safe (dirty, dtmp, &fd_ctx->dirty_list, links) {
+ gf_msg_trace (this->name, 0,
+ "sending post-op on %p (%p)", fd_ctx->fd, dirty);
+ GF_FREE(dirty);
+ }
+
+ INIT_LIST_HEAD(&fd_ctx->dirty_list);
+}
+
+void *
+nsr_flush_thread (void *ctx)
+{
+ xlator_t *this = ctx;
+ nsr_private_t *priv = this->private;
+ struct list_head dirty_fds;
+ nsr_fd_ctx_t *fd_ctx;
+ nsr_fd_ctx_t *fd_tmp;
+ int ret;
+
+ for (;;) {
+ /*
+ * We have to be very careful to avoid lock inversions here, so
+ * we can't just hold priv->dirty_lock while we take and
+ * release locks for each fd. Instead, we only hold dirty_lock
+ * at the beginning of each iteration, as we (effectively) make
+ * a copy of the current list head and then clear the original.
+ * This leads to four scenarios for adding the first entry to
+ * an fd and potentially putting it on the global list.
+ *
+ * (1) While we're asleep. No lock contention, it just gets
+ * added and will be processed on the next iteration.
+ *
+ * (2) After we've made a local copy, but before we've started
+ * processing that fd. The new entry will be added to the
+ * fd (under its lock), and we'll process it on the current
+ * iteration.
+ *
+ * (3) While we're processing the fd. They'll block on the fd
+ * lock, then see that the list is empty and put it on the
+ * global list. We'll process it here on the next
+ * iteration.
+ *
+ * (4) While we're working, but after we've processed that fd.
+ * Same as (1) as far as that fd is concerned.
+ */
+ INIT_LIST_HEAD(&dirty_fds);
+ LOCK(&priv->dirty_lock);
+ list_splice_init(&priv->dirty_fds, &dirty_fds);
+ UNLOCK(&priv->dirty_lock);
+
+ list_for_each_entry_safe (fd_ctx, fd_tmp, &dirty_fds, fd_list) {
+ ret = syncop_fsync(FIRST_CHILD(this), fd_ctx->fd, 0,
+ NULL, NULL);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_WARNING, 0,
+ N_MSG_SYS_CALL_FAILURE,
+ "failed to fsync %p (%d)",
+ fd_ctx->fd, -ret);
+ }
+
+ LOCK(&fd_ctx->fd->lock);
+ nsr_flush_fd(this, fd_ctx);
+ list_del_init(&fd_ctx->fd_list);
+ UNLOCK(&fd_ctx->fd->lock);
+ fd_unref(fd_ctx->fd);
+ }
+
+ sleep(NSR_FLUSH_INTERVAL);
+ }
+
+ return NULL;
+}
+
+
+int32_t
+nsr_get_changelog_dir (xlator_t *this, char **cl_dir_p)
+{
+ xlator_t *cl_xl;
+
+ /* Find our changelog translator. */
+ cl_xl = this;
+ while (cl_xl) {
+ if (strcmp(cl_xl->type, "features/changelog") == 0) {
+ break;
+ }
+ cl_xl = cl_xl->children->xlator;
+ }
+ if (!cl_xl) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_INIT_FAIL,
+ "failed to find changelog translator");
+ return ENOENT;
+ }
+
+ /* Find the actual changelog directory. */
+ if (dict_get_str(cl_xl->options, "changelog-dir", cl_dir_p) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_INIT_FAIL,
+ "failed to find changelog-dir for %s", cl_xl->name);
+ return ENODATA;
+ }
+
+ return 0;
+}
+
+
+void
+nsr_get_terms (call_frame_t *frame, xlator_t *this)
+{
+ int32_t op_errno;
+ char *cl_dir;
+ DIR *fp = NULL;
+ struct dirent *rd_entry;
+ struct dirent *rd_result;
+ int32_t term_first = -1;
+ int32_t term_contig = -1;
+ int32_t term_last = -1;
+ int term_num;
+ char *probe_str;
+ dict_t *my_xdata = NULL;
+
+ op_errno = nsr_get_changelog_dir(this, &cl_dir);
+ if (op_errno) {
+ goto err; /* Error was already logged. */
+ }
+ op_errno = ENODATA; /* Most common error after this. */
+
+ rd_entry = alloca (offsetof(struct dirent, d_name) +
+ pathconf(cl_dir, _PC_NAME_MAX) + 1);
+ if (!rd_entry) {
+ goto err;
+ }
+
+ fp = sys_opendir (cl_dir);
+ if (!fp) {
+ op_errno = errno;
+ goto err;
+ }
+
+ /* Find first and last terms. */
+ for (;;) {
+ if (readdir_r(fp, rd_entry, &rd_result) != 0) {
+ op_errno = errno;
+ goto err;
+ }
+ if (!rd_result) {
+ break;
+ }
+ if (fnmatch("TERM.*", rd_entry->d_name, FNM_PATHNAME) != 0) {
+ continue;
+ }
+ /* +5 points to the character after the period */
+ term_num = atoi(rd_entry->d_name+5);
+ gf_msg (this->name, GF_LOG_INFO, 0,
+ N_MSG_GENERIC,
+ "%s => %d", rd_entry->d_name, term_num);
+ if (term_num < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_INVALID,
+ "invalid term file name %s", rd_entry->d_name);
+ op_errno = EINVAL;
+ goto err;
+ }
+ if ((term_first < 0) || (term_first > term_num)) {
+ term_first = term_num;
+ }
+ if ((term_last < 0) || (term_last < term_num)) {
+ term_last = term_num;
+ }
+ }
+ if ((term_first < 0) || (term_last < 0)) {
+ /* TBD: are we *sure* there should always be at least one? */
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_NO_DATA, "no terms found");
+ op_errno = EINVAL;
+ goto err;
+ }
+
+ sys_closedir (fp);
+ fp = NULL;
+
+ /*
+ * Find term_contig, which is the earliest term for which there are
+ * no gaps between it and term_last.
+ */
+ for (term_contig = term_last; term_contig > 0; --term_contig) {
+ if (gf_asprintf(&probe_str, "%s/TERM.%d",
+ cl_dir, term_contig-1) <= 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_MEM_ERR,
+ "failed to format term %d", term_contig-1);
+ goto err;
+ }
+ if (sys_access(probe_str, F_OK) != 0) {
+ GF_FREE(probe_str);
+ break;
+ }
+ GF_FREE(probe_str);
+ }
+
+ gf_msg (this->name, GF_LOG_INFO, 0,
+ N_MSG_GENERIC,
+ "found terms %d-%d (%d)",
+ term_first, term_last, term_contig);
+
+ /* Return what we've found */
+ my_xdata = dict_new();
+ if (!my_xdata) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_MEM_ERR,
+ "failed to allocate reply dictionary");
+ goto err;
+ }
+ if (dict_set_int32(my_xdata, "term-first", term_first) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_DICT_FLR,
+ "failed to set term-first");
+ goto err;
+ }
+ if (dict_set_int32(my_xdata, "term-contig", term_contig) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_DICT_FLR,
+ "failed to set term-contig");
+ goto err;
+ }
+ if (dict_set_int32(my_xdata, "term-last", term_last) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_DICT_FLR,
+ "failed to set term-last");
+ goto err;
+ }
+
+ /* Finally! */
+ STACK_UNWIND_STRICT (ipc, frame, 0, 0, my_xdata);
+ dict_unref(my_xdata);
+ return;
+
+err:
+ if (fp) {
+ sys_closedir (fp);
+ }
+ if (my_xdata) {
+ dict_unref(my_xdata);
+ }
+ STACK_UNWIND_STRICT (ipc, frame, -1, op_errno, NULL);
+}
+
+
+long
+get_entry_count (xlator_t *this, int fd)
+{
+ struct stat buf;
+ long min; /* last entry not known to be empty */
+ long max; /* first entry known to be empty */
+ long curr;
+ char entry[CHANGELOG_ENTRY_SIZE];
+
+ if (sys_fstat (fd, &buf) < 0) {
+ return -1;
+ }
+
+ min = 0;
+ max = buf.st_size / CHANGELOG_ENTRY_SIZE;
+
+ while ((min+1) < max) {
+ curr = (min + max) / 2;
+ if (sys_lseek(fd, curr*CHANGELOG_ENTRY_SIZE, SEEK_SET) < 0) {
+ return -1;
+ }
+ if (sys_read(fd, entry, sizeof(entry)) != sizeof(entry)) {
+ return -1;
+ }
+ if ((entry[0] == '_') && (entry[1] == 'P')) {
+ min = curr;
+ } else {
+ max = curr;
+ }
+ }
+
+ if (sys_lseek(fd, 0, SEEK_SET) < 0) {
+ gf_msg (this->name, GF_LOG_WARNING, 0,
+ N_MSG_SYS_CALL_FAILURE,
+ "failed to reset offset");
+ }
+ return max;
+}
+
+
+void
+nsr_open_term (call_frame_t *frame, xlator_t *this, dict_t *xdata)
+{
+ int32_t op_errno;
+ char *cl_dir;
+ char *term;
+ char *path;
+ nsr_private_t *priv = this->private;
+
+ op_errno = nsr_get_changelog_dir(this, &cl_dir);
+ if (op_errno) {
+ goto err;
+ }
+
+ if (dict_get_str(xdata, "term", &term) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_NO_DATA, "missing term");
+ op_errno = ENODATA;
+ goto err;
+ }
+
+ if (gf_asprintf(&path, "%s/TERM.%s", cl_dir, term) < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_MEM_ERR, "failed to construct path");
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ if (priv->term_fd >= 0) {
+ sys_close (priv->term_fd);
+ }
+ priv->term_fd = open(path, O_RDONLY);
+ if (priv->term_fd < 0) {
+ op_errno = errno;
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_SYS_CALL_FAILURE,
+ "failed to open term file");
+ goto err;
+ }
+
+ priv->term_total = get_entry_count(this, priv->term_fd);
+ if (priv->term_total < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_NO_DATA, "failed to get entry count");
+ sys_close (priv->term_fd);
+ priv->term_fd = -1;
+ op_errno = EIO;
+ goto err;
+ }
+ priv->term_read = 0;
+
+ /* Success! */
+ STACK_UNWIND_STRICT (ipc, frame, 0, 0, NULL);
+ return;
+
+err:
+ STACK_UNWIND_STRICT (ipc, frame, -1, op_errno, NULL);
+}
+
+
+void
+nsr_next_entry (call_frame_t *frame, xlator_t *this)
+{
+ int32_t op_errno = ENOMEM;
+ nsr_private_t *priv = this->private;
+ ssize_t nbytes;
+ dict_t *my_xdata;
+
+ if (priv->term_fd < 0) {
+ op_errno = EBADFD;
+ goto err;
+ }
+
+ if (priv->term_read >= priv->term_total) {
+ op_errno = ENODATA;
+ goto err;
+ }
+
+ nbytes = sys_read (priv->term_fd, priv->term_buf, CHANGELOG_ENTRY_SIZE);
+ if (nbytes < CHANGELOG_ENTRY_SIZE) {
+ if (nbytes < 0) {
+ op_errno = errno;
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_SYS_CALL_FAILURE,
+ "error reading next entry: %s",
+ strerror(errno));
+ } else {
+ op_errno = EIO;
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_SYS_CALL_FAILURE,
+ "got %ld/%d bytes for next entry",
+ nbytes, CHANGELOG_ENTRY_SIZE);
+ }
+ goto err;
+ }
+ ++(priv->term_read);
+
+ my_xdata = dict_new();
+ if (!my_xdata) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_MEM_ERR, "failed to allocate reply xdata");
+ goto err;
+ }
+
+ if (dict_set_static_bin(my_xdata, "data",
+ priv->term_buf, CHANGELOG_ENTRY_SIZE) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_DICT_FLR, "failed to assign reply xdata");
+ goto err;
+ }
+
+ STACK_UNWIND_STRICT (ipc, frame, 0, 0, my_xdata);
+ dict_unref(my_xdata);
+ return;
+
+err:
+ STACK_UNWIND_STRICT (ipc, frame, -1, op_errno, NULL);
+}
+
+
+int32_t
+nsr_ipc (call_frame_t *frame, xlator_t *this, int32_t op, dict_t *xdata)
+{
+ switch (op) {
+ case NSR_SERVER_TERM_RANGE:
+ nsr_get_terms(frame, this);
+ break;
+ case NSR_SERVER_OPEN_TERM:
+ nsr_open_term(frame, this, xdata);
+ break;
+ case NSR_SERVER_NEXT_ENTRY:
+ nsr_next_entry(frame, this);
+ break;
+ default:
+ STACK_WIND_TAIL (frame,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->ipc,
+ op, xdata);
+ }
+
+ return 0;
+}
+
+
+int32_t
+nsr_forget (xlator_t *this, inode_t *inode)
+{
+ uint64_t ctx = 0LL;
+
+ if ((inode_ctx_del(inode, this, &ctx) == 0) && ctx) {
+ GF_FREE((void *)(long)ctx);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_release (xlator_t *this, fd_t *fd)
+{
+ uint64_t ctx = 0LL;
+
+ if ((fd_ctx_del(fd, this, &ctx) == 0) && ctx) {
+ GF_FREE((void *)(long)ctx);
+ }
+
+ return 0;
+}
+
+struct xlator_cbks cbks = {
+ .forget = nsr_forget,
+ .release = nsr_release,
+};
+
+int
+nsr_reconfigure (xlator_t *this, dict_t *options)
+{
+ nsr_private_t *priv = this->private;
+
+ GF_OPTION_RECONF ("leader",
+ priv->config_leader, options, bool, err);
+ GF_OPTION_RECONF ("quorum-percent",
+ priv->quorum_pct, options, percent, err);
+ gf_msg (this->name, GF_LOG_INFO, 0, N_MSG_GENERIC,
+ "reconfigure called, config_leader = %d, quorum_pct = %.1f\n",
+ priv->config_leader, priv->quorum_pct);
+
+ priv->leader = priv->config_leader;
+
+ return 0;
+
+err:
+ return -1;
+}
+
+int
+nsr_get_child_index (xlator_t *this, xlator_t *kid)
+{
+ xlator_list_t *trav;
+ int retval = -1;
+
+ for (trav = this->children; trav; trav = trav->next) {
+ ++retval;
+ if (trav->xlator == kid) {
+ return retval;
+ }
+ }
+
+ return -1;
+}
+
+/*
+ * Child notify handling is unreasonably FUBAR. Sometimes we'll get a
+ * CHILD_DOWN for a protocol/client child before we ever got a CHILD_UP for it.
+ * Other times we won't. Because it's effectively random (probably racy), we
+ * can't just maintain a count. We actually have to keep track of the state
+ * for each child separately, to filter out the bogus CHILD_DOWN events, and
+ * then generate counts on demand.
+ */
+int
+nsr_notify (xlator_t *this, int event, void *data, ...)
+{
+ nsr_private_t *priv = this->private;
+ int index;
+
+ switch (event) {
+ case GF_EVENT_CHILD_UP:
+ index = nsr_get_child_index(this, data);
+ if (index >= 0) {
+ priv->kid_state |= (1 << index);
+ priv->up_children = nsr_count_up_kids(priv);
+ gf_msg (this->name, GF_LOG_INFO, 0, N_MSG_GENERIC,
+ "got CHILD_UP for %s, now %u kids",
+ ((xlator_t *)data)->name,
+ priv->up_children);
+ if (!priv->config_leader && (priv->up_children > 1)) {
+ priv->leader = _gf_false;
+ }
+ }
+ break;
+ case GF_EVENT_CHILD_DOWN:
+ index = nsr_get_child_index(this, data);
+ if (index >= 0) {
+ priv->kid_state &= ~(1 << index);
+ priv->up_children = nsr_count_up_kids(priv);
+ gf_msg (this->name, GF_LOG_INFO, 0, N_MSG_GENERIC,
+ "got CHILD_DOWN for %s, now %u kids",
+ ((xlator_t *)data)->name,
+ priv->up_children);
+ if (!priv->config_leader && (priv->up_children < 2)) {
+ priv->leader = _gf_true;
+ }
+ }
+ break;
+ default:
+ ;
+ }
+
+ return default_notify(this, event, data);
+}
+
+
+int32_t
+mem_acct_init (xlator_t *this)
+{
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("nsr", this, out);
+
+ ret = xlator_mem_acct_init (this, gf_mt_nsr_end + 1);
+
+ if (ret != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_MEM_ERR,
+ "Memory accounting init" "failed");
+ return ret;
+ }
+out:
+ return ret;
+}
+
+
+void
+nsr_deallocate_priv (nsr_private_t *priv)
+{
+ if (!priv) {
+ return;
+ }
+
+ GF_FREE(priv);
+}
+
+
+int32_t
+nsr_init (xlator_t *this)
+{
+ xlator_list_t *remote;
+ xlator_list_t *local;
+ nsr_private_t *priv = NULL;
+ xlator_list_t *trav;
+ pthread_t kid;
+ extern xlator_t global_xlator;
+ glusterfs_ctx_t *oldctx = global_xlator.ctx;
+
+ /*
+ * Any fop that gets special treatment has to be patched in here,
+ * because the compiled-in table is produced by the code generator and
+ * only contains generated functions. Note that we have to go through
+ * this->fops because of some dynamic-linking strangeness; modifying
+ * the static table doesn't work.
+ */
+ this->fops->getxattr = nsr_getxattr_special;
+ this->fops->fsync = nsr_fsync;
+ this->fops->ipc = nsr_ipc;
+
+ local = this->children;
+ if (!local) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_NO_DATA,
+ "no local subvolume");
+ goto err;
+ }
+
+ remote = local->next;
+ if (!remote) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_NO_DATA,
+ "no remote subvolumes");
+ goto err;
+ }
+
+ this->local_pool = mem_pool_new (nsr_local_t, 128);
+ if (!this->local_pool) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_MEM_ERR,
+ "failed to create nsr_local_t pool");
+ goto err;
+ }
+
+ priv = GF_CALLOC (1, sizeof(*priv), gf_mt_nsr_private_t);
+ if (!priv) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_MEM_ERR,
+ "could not allocate priv");
+ goto err;
+ }
+
+ for (trav = this->children; trav; trav = trav->next) {
+ ++(priv->n_children);
+ }
+
+ LOCK_INIT(&priv->dirty_lock);
+ LOCK_INIT(&priv->index_lock);
+ INIT_LIST_HEAD(&priv->dirty_fds);
+ priv->term_fd = -1;
+
+ this->private = priv;
+
+ GF_OPTION_INIT ("leader", priv->config_leader, bool, err);
+ GF_OPTION_INIT ("quorum-percent", priv->quorum_pct, percent, err);
+
+ priv->leader = priv->config_leader;
+
+ if (pthread_create(&kid, NULL, nsr_flush_thread,
+ this) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_SYS_CALL_FAILURE,
+ "could not start flush thread");
+ /* TBD: treat this as a fatal error? */
+ }
+
+ /*
+ * Calling glfs_new changes old->ctx, even if THIS still points
+ * to global_xlator. That causes problems later in the main
+ * thread, when gf_log_dump_graph tries to use the FILE after
+ * we've mucked with it and gets a segfault in __fprintf_chk.
+ * We can avoid all that by undoing the damage before we
+ * continue.
+ */
+ global_xlator.ctx = oldctx;
+
+ return 0;
+
+err:
+ nsr_deallocate_priv(priv);
+ return -1;
+}
+
+
+void
+nsr_fini (xlator_t *this)
+{
+ nsr_deallocate_priv(this->private);
+}
+
+class_methods_t class_methods = {
+ .init = nsr_init,
+ .fini = nsr_fini,
+ .reconfigure = nsr_reconfigure,
+ .notify = nsr_notify,
+};
+
+struct volume_options options[] = {
+ { .key = {"leader"},
+ .type = GF_OPTION_TYPE_BOOL,
+ .default_value = "false",
+ .description = "Start in the leader role. This is only for "
+ "bootstrapping the code, and should go away when we "
+ "have real leader election."
+ },
+ { .key = {"vol-name"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "volume name"
+ },
+ { .key = {"my-name"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "brick name in form of host:/path"
+ },
+ { .key = {"etcd-servers"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "list of comma seperated etc servers"
+ },
+ { .key = {"subvol-uuid"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "UUID for this NSR (sub)volume"
+ },
+ { .key = {"quorum-percent"},
+ .type = GF_OPTION_TYPE_PERCENT,
+ .default_value = "50.0",
+ .description = "percentage of rep_count-1 that must be up"
+ },
+ { .key = {NULL} },
+};
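For orientation, the options above would be consumed from a brick-side volfile. The fragment below is only a guess at what such a graph section might look like: every name and address in it is a placeholder, the type string is assumed from the source directory (experimental/nsr-server), and the actual graph for an NSR volume would be generated by glusterd rather than written by hand. Per nsr_init(), the first subvolume is the local storage stack (which must contain features/changelog somewhere below it, see nsr_get_changelog_dir()), and the remaining subvolumes are the clients for the other replicas that the leader fans fops out to.

# Hypothetical volfile fragment -- names and addresses are placeholders.
volume testvol-nsr-server
    type experimental/nsr-server
    option leader true                # bootstrap-only, per the option text
    option quorum-percent 50.0
    option vol-name testvol
    option my-name node1:/bricks/testvol
    option etcd-servers http://node1:2379,http://node2:2379
    option subvol-uuid 6f95d1a8-0000-4000-8000-000000000001
    subvolumes testvol-changelog testvol-client-1 testvol-client-2
end-volume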