summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/nsr-recon
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/nsr-recon')
-rw-r--r--xlators/cluster/nsr-recon/Makefile.am3
-rw-r--r--xlators/cluster/nsr-recon/src/Makefile.am22
-rw-r--r--xlators/cluster/nsr-recon/src/recon_driver.c2624
-rw-r--r--xlators/cluster/nsr-recon/src/recon_driver.h308
-rw-r--r--xlators/cluster/nsr-recon/src/recon_xlator.c837
-rw-r--r--xlators/cluster/nsr-recon/src/recon_xlator.h78
6 files changed, 3872 insertions, 0 deletions
diff --git a/xlators/cluster/nsr-recon/Makefile.am b/xlators/cluster/nsr-recon/Makefile.am
new file mode 100644
index 000000000..d471a3f92
--- /dev/null
+++ b/xlators/cluster/nsr-recon/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = src
+
+CLEANFILES =
diff --git a/xlators/cluster/nsr-recon/src/Makefile.am b/xlators/cluster/nsr-recon/src/Makefile.am
new file mode 100644
index 000000000..8fa344864
--- /dev/null
+++ b/xlators/cluster/nsr-recon/src/Makefile.am
@@ -0,0 +1,22 @@
+xlator_LTLIBRARIES = nsr_recon.la
+xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/cluster
+
+nsr_recon_la_LDFLAGS = -module -avoid-version -lgfapi
+nsr_recon_la_SOURCES = recon_driver.c recon_xlator.c
+
+nsr_recon_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+
+noinst_HEADERS = recon_driver.h recon_xlator.h
+
+AM_CPPFLAGS = $(GF_CPPFLAGS) \
+ -I$(top_srcdir)/libglusterfs/src -I$(top_srcdir)/xlators/lib/src \
+ -I$(top_srcdir)/rpc/rpc-lib/src
+
+AM_CFLAGS = -Wall $(GF_CFLAGS)
+
+XLATOR_HEADER = $(top_srcdir)/libglusterfs/src/xlator.h
+
+CLEANFILES =
+
+# Remove the installed translator module; it is built above as nsr_recon.la,
+# so the installed object is nsr_recon.so (not nsr.so).
+uninstall-local:
+	rm -f $(DESTDIR)$(xlatordir)/nsr_recon.so
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.c b/xlators/cluster/nsr-recon/src/recon_driver.c
new file mode 100644
index 000000000..1328d52dc
--- /dev/null
+++ b/xlators/cluster/nsr-recon/src/recon_driver.c
@@ -0,0 +1,2624 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include <sys/types.h>
+#include <fcntl.h>
+#include <string.h>
+#include <unistd.h>
+#include <fnmatch.h>
+
+
+#include "call-stub.h"
+#include "defaults.h"
+#include "xlator.h"
+
+
+#include "recon_driver.h"
+#include "recon_xlator.h"
+#include "api/src/glfs-internal.h"
+#include "api/src/glfs-handles.h"
+
+/* TBD: move declarations here and nsr.c into a common place */
+#define NSR_TERM_XATTR "trusted.nsr.term"
+#define RECON_TERM_XATTR "trusted.nsr.recon-term"
+#define RECON_INDEX_XATTR "trusted.nsr.recon-index"
+
+/*
+ * Execution architecture for the NSR reconciliation driver. The driver runs
+ * as a separate process on each node where the brick is. The main function of
+ * the driver is nsr_reconciliation_driver() (last function below). The driver
+ * just sits in a tight loop waiting for state changes. When a brick becomes a
+ * replica leader, it fences IO, contacts this process and waits for
+ * reconciliation to finish.
+ *
+ * The replica leader talks to other bricks in replica group which are alive
+ * and gets the last term info using which it decides which has the latest
+ * data. That brick is referred to as the "reconciliator"; leader sends a
+ * message to reconciliator to freeze its data (by reading any incomplete data
+ * from other nodes from that term if required)
+ *
+ * Once that is done leader sends a message to all nodes except the
+ * reconciliator to sync themselves with the reconciliator. This process is
+ * referred to as "resolution".
+ *
+ * Hence the reconciliation processes need to talk to each other to get a given
+ * term info. This is implemented using the recon translator IOs which
+ * implements a bare bone RPC by exposing a file interface to which
+ * reads/writes are done to pass control messages. This is referred to as the
+ * "control plane". This implementation allows the control plane to be
+ * implemented as a bunch of threads for each of the nodes.
+ *
+ * The reconciliation process also needs to talk to the brick process on that
+ * node to actually write the data as part of reconciliation/resolution. This
+ * is referred to as the "data plane". Again there are a bunch of threads that
+ * do this work.
+ *
+ * The way the worker threads are organised is that main driver context has a
+ * pointer to contexts for each of these thread contexts. The thread context at
+ * index 0 always refers to talking with local recon process/brick. So the
+ * control worker at index 0 will get the local changelog info and data worker
+ * at index 0 will talk to local brick.
+ *
+ * All the ops from the control/data planes are implemented using the glfs
+ * APIs.
+ */
+
+/*
+ * This function gets the size of all the extended attributes for a file.
+ * This is used so that caller knows how much to allocate for key-value storage.
+ *
+ * Input Arguments:
+ * fd - the file opened using glfs API.
+ * dict - passed so that NSR translator can get this from the required brick
+ *
+ * Output Arguments:
+ * b - on success, set to a calloc()ed buffer holding the NUL separated key
+ *     names; the caller releases it with free(). On failure it is set to NULL.
+ * key_size - the size of all keys (including their NUL terminators)
+ * val_size - the size of all values
+ * num - number of key/values
+ *
+ * Returns 0 on success and -1 on failure.
+ */
+static int32_t
+get_xattr_total_size( struct glfs_fd *fd,
+                      char **b,
+                      uint32_t *key_size,
+                      uint32_t *val_size,
+                      uint32_t* num,
+                      dict_t *dict)
+{
+        int32_t  remaining = -1;
+        int32_t  ret = -1;
+        char    *keys = NULL;
+        char    *cur = NULL;
+
+        *b = NULL;
+        *key_size = 0;
+        *val_size = 0;
+        *num = 0;
+
+        // First get the size of the keys
+        remaining = glfs_flistxattr_with_xdata(fd, NULL, 0, dict);
+        if (remaining == -1)
+                goto out;
+        *key_size = remaining;
+
+        // TBD - use the regular calloc
+        // One spare byte keeps the list NUL terminated even when it is empty.
+        keys = calloc(remaining + 1, 1);
+        if (keys == NULL)
+                goto out;
+
+        // get the keys themselves; trust the length reported by this call
+        // since the list may have changed since the size query above.
+        remaining = glfs_flistxattr_with_xdata(fd, keys, remaining + 1, dict);
+        if (remaining == -1)
+                goto out;
+        *key_size = remaining;
+
+        // Walk the NUL separated list. A file without any xattr yields
+        // remaining == 0 and therefore no iteration at all.
+        cur = keys;
+        while (remaining > 0) {
+                int32_t  r;
+                uint32_t len = 0;
+
+                // for each key get the size of the value
+                r = glfs_fgetxattr_with_xdata(fd, cur, NULL, 0, dict);
+                if (r == -1)
+                        goto out;
+                (*val_size) += r;
+                len = strlen(cur) + 1;
+                cur += len;
+                remaining -= (int32_t)len;
+                (*num)++;
+        }
+
+        // Success: hand the key buffer over to the caller.
+        *b = keys;
+        keys = NULL;
+        ret = 0;
+out:
+        // Only non-NULL on a failure path; free(NULL) is a no-op.
+        free(keys);
+        return ret;
+}
+
+/*
+ * Fetch the values for a list of xattr keys and pack them, together with
+ * their keys, into one buffer.
+ *
+ * Input Arguments:
+ * fd - the file opened using glfs API.
+ * keys - NUL separated key names, one after the other
+ * size - size handed to every value fetch as the available room
+ * num - number of keys
+ * dict - passed so that NSR translator can get this from the required brick
+ *
+ * Output Arguments:
+ * buf - filled with "key\0value\0" pairs laid out back to back. The end of
+ *       each value is located with strlen(), i.e. values are treated as
+ *       NUL terminated strings.
+ *
+ * The function stops silently at the first key whose value cannot be read.
+ */
+static void
+get_xattr(struct glfs_fd *fd,
+          char *keys,
+          char *buf,
+          uint32_t size,
+          uint32_t num,
+          dict_t *dict)
+{
+        uint32_t left = 0;
+
+        for (left = num; left != 0; left--) {
+                int32_t   r;
+                // length of the current key including its terminator
+                uint32_t  key_len = strlen(keys) + 1;
+                // the value is stored right behind the copied key
+                char     *value = buf + key_len;
+
+                strcpy(buf, keys);
+
+                r = glfs_fgetxattr_with_xdata(fd, keys, value, size, dict);
+
+                // TBD - handle error
+                if (r == -1)
+                        return;
+
+                // step to the next key and to the slot behind this value
+                keys += key_len;
+                buf = value + strlen(value) + 1;
+        }
+}
+
+/*
+ * Remove a list of extended attributes from a file.
+ *
+ * Input Arguments:
+ * fd - the file opened using glfs API.
+ * dict_t - xdata passed so that NSR translator can do this from the required
+ *          brick (note: the parameter name shadows the dict_t type inside
+ *          this function)
+ * keys - NUL separated key names, one after the other
+ * num - number of keys
+ */
+static void delete_xattr(struct glfs_fd *fd,
+                         dict_t *dict_t,
+                         char *keys,
+                         uint32_t num)
+{
+        uint32_t  i = 0;
+        char     *key = keys;
+
+        for (i = 0; i < num; i++) {
+                // TBD - handle failure cases when calling glfs_fremovexattr_with_xdata()
+                glfs_fremovexattr_with_xdata(fd, key, dict_t);
+                // skip over this key and its terminator
+                key += strlen(key) + 1;
+        }
+}
+
+/*
+ * Apply a packed list of key/value pairs as xattrs of a file.
+ *
+ * Input Arguments:
+ * fd - the file opened using glfs API.
+ * dict - passed so that NSR translator can do this from the required brick
+ * buf - buffer laid out as "key\0value\0key\0value\0...". Values are
+ *       measured with strlen(), so they are treated as strings.
+ * num - Number of such key value pairs.
+ *
+ * The function stops silently at the first pair that cannot be set.
+ */
+static void
+fill_xattr(struct glfs_fd *fd,
+           dict_t *dict,
+           char *buf,
+           uint32_t num)
+{
+        char     *key = buf;
+        uint32_t  left = num;
+
+        while (left > 0) {
+                // the value starts right behind the key's terminator
+                char    *value = key + strlen(key) + 1;
+                int32_t  rc;
+
+                // TBD - handle failure cases when calling glfs_fsetxattr_with_xdata()
+                rc = glfs_fsetxattr_with_xdata(fd, key, value, strlen(value), 0, dict);
+                if (rc == -1)
+                        break;
+
+                key = value + strlen(value) + 1;
+                left--;
+        }
+}
+
+/*
+ * This function builds the names of the volfiles that are used for doing
+ * glfs_init later.
+ * The control file is used by control thread(function) to talk to peer reconciliation process.
+ * The data file is used by the data thread(function) to talk to the bricks.
+ * The control file is of name such as con:gfs1:-mnt-a1 where "gfs1" is name of host
+ * and the brick path is "/mnt/a1".
+ * The data file is of name such as data:gfs1:-mnt-a1.
+ *
+ * Input Arguments:
+ * vol - name of the volume. This is used to build the full path of the control and data file
+ *       such as /var/lib/glusterd/vols/test/bricks/gfs2:-mnt-test1-nsr-recon.vol.
+ *       In above example the volume name is test and brick on gfs2 is on path /mnt/test1
+ *
+ * worker - The worker for a given node. This worker has 2 threads - one on the data plane
+ *       and one on the control plane. The worker->name is already filled with hostname:brickname
+ *       in the function nsr_reconciliation_driver(). Use that to build the volume file.
+ *       So if worker->name has gfs1:/mnt/a1, control file is con:gfs1:-mnt-a1
+ *       and data file is data:gfs1:-mnt-a1.
+ *       The results are written to worker->control_worker->vol_file and
+ *       worker->data_worker->vol_file.
+ * All these files are under the bricks directory. TBD - move this to a NSR recon directory later.
+ */
+static void
+nsr_recon_get_file(char *vol, nsr_replica_worker_t *worker)
+{
+        char *ptr = NULL;
+        char  tr[256];
+
+        // Bounded copy: a name longer than the scratch buffer is truncated
+        // instead of overrunning the stack.
+        snprintf(tr, sizeof(tr), "%s", worker->name);
+
+        // Replace the "/" to -; continue the search behind the last hit
+        // instead of rescanning from the start.
+        for (ptr = strchr (tr, '/'); ptr != NULL; ptr = strchr (ptr + 1, '/'))
+                *ptr = '-';
+
+        // Build the base directory such as "/var/lib/glusterd/vols/test/bricks/"
+        // NOTE(review): the capacity of vol_file is not visible here, so the
+        // unbounded sprintf/strcat calls below are kept as they were.
+        sprintf(worker->control_worker->vol_file,
+                "/%s/%s/%s/%s/",
+                GLUSTERD_DEFAULT_WORKDIR,
+                GLUSTERD_VOLUME_DIR_PREFIX,
+                vol,
+                GLUSTERD_BRICK_INFO_DIR);
+        strcat(worker->control_worker->vol_file, "con:");
+        strcat(worker->control_worker->vol_file, tr);
+
+        sprintf(worker->data_worker->vol_file,
+                "/%s/%s/%s/%s/",
+                GLUSTERD_DEFAULT_WORKDIR,
+                GLUSTERD_VOLUME_DIR_PREFIX,
+                vol,
+                GLUSTERD_BRICK_INFO_DIR);
+        strcat(worker->data_worker->vol_file, "data:");
+        strcat(worker->data_worker->vol_file, tr);
+}
+
+/*
+ * This function does all the glfs initialisation
+ * so that reconciliation process can talk to other recon processes/bricks
+ * for the control/data messages.
+ * This will be done everytime a worker needs to be kicked off to talk
+ * across any plane.
+ *
+ * Input arguments:
+ * ctx - The per worker based context. On success ctx->fs holds the new glfs
+ *       instance and, for control workers, ctx->aux_fd the fd opened on "/"
+ *       (NULL if that open failed).
+ * control - set to true if this worker is for the control plane
+ *
+ * Returns 0 on success, -1 if the glfs instance could not be created,
+ * configured or initialised.
+ */
+static int
+nsr_recon_start_work(nsr_per_node_worker_t *ctx,
+                     gf_boolean_t control)
+{
+        glfs_t *fs = NULL;
+        xlator_t *this = ctx->driver_ctx->this;
+        int32_t ret = 0;
+        glfs_fd_t *aux_fd = NULL; // fd of auxilary log
+        char lf[256];
+
+        nsr_worker_log(this->name, GF_LOG_INFO,
+                       "starting work with volfile %s\n",
+                       ctx->vol_file);
+
+        fs = glfs_new(ctx->id);
+        if (!fs) {
+                glusterfs_this_set(this);
+                nsr_worker_log(this->name, GF_LOG_ERROR,
+                               "cannot create gfls context for thread %s\n",ctx->id);
+                return -1;
+        }
+
+        // For some vague reason, glfs init APIs seem to be clobbering "this". hence resetting it.
+        glusterfs_this_set(this);
+        nsr_worker_log(this->name, GF_LOG_INFO,
+                       "init done. setting volfile %s\n",
+                       ctx->vol_file);
+
+        // NOTE(review): the glfs instance is not released on the error
+        // returns below; confirm whether glfs_fini() is safe on an instance
+        // that was never initialised before adding cleanup here.
+        ret = glfs_set_volfile(fs, ctx->vol_file);
+        if (ret != 0) {
+                glusterfs_this_set(this);
+                nsr_worker_log(this->name, GF_LOG_ERROR,
+                               "cannot set volfile %s for thread %s\n",ctx->vol_file, ctx->id);
+                return -1;
+        }
+
+        // TBD - convert this to right /usr/local/var/log based log files.
+        // Bounded formatting so that a long worker id cannot overrun lf.
+        snprintf(lf, sizeof(lf), "/tmp/logs/%s-%s",
+                 (control == _gf_true)?"con":"data", ctx->id);
+        glfs_set_logging (fs, lf, 7);
+        glusterfs_this_set(this);
+
+        ret = glfs_init (fs);
+        if (ret != 0) {
+                glusterfs_this_set(this);
+                nsr_worker_log(this->name, GF_LOG_ERROR, "cannot do init for thread %s with volfile %s\n",ctx->id, ctx->vol_file);
+                return -1;
+        }
+        glusterfs_this_set(this);
+        nsr_worker_log(this->name, GF_LOG_INFO,
+                       "setting volfile %s done\n",
+                       ctx->vol_file);
+
+        // If it is control thread, open the "/" as the aux_fd.
+        // All IOs happening via the fd will do the RPCs across the reconciliation
+        // processes. For some vague reason, the root seems to be open'able like a file.
+        // TBD - try to clean this up. (implement a virtual file???)
+        if (control == _gf_true) {
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "doing open for / \n");
+                aux_fd = glfs_open (fs, "/", O_RDWR);
+                // TBD - proper error handling. Stall reconciliation if such a thing happens?
+                if (aux_fd == NULL) {
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "cannot open aux log file for thread %s\n",ctx->id);
+                } else {
+                        // Success is informational, not an error.
+                        nsr_worker_log(this->name, GF_LOG_INFO,
+                                       "---opened aux log file for thread %s\n",ctx->id);
+                }
+                ctx->aux_fd = aux_fd;
+        }
+        glusterfs_this_set(this);
+        ctx->fs = fs;
+        return 0;
+}
+
+/*
+ *
+ * This function does the cleanup after reconciliation is done
+ * or before we start a new reconciliation.
+ *
+ * Input arguments:
+ * ctx - The per worker based context; ctx->aux_fd and ctx->fs are released
+ *       and reset to NULL.
+ * control - set to true if this worker is for the control plane
+ *
+ * Returns 0 on success (or when there was nothing to tear down) and -1 if
+ * glfs_fini() failed; in that case ctx->fs is left untouched.
+ */
+static int
+nsr_recon_end_work(nsr_per_node_worker_t *ctx,
+                   gf_boolean_t control)
+{
+        int32_t ret = 0;
+        xlator_t *this = ctx->driver_ctx->this;
+
+        nsr_worker_log(this->name, GF_LOG_INFO,
+                       "doing fini for recon worker\n");
+
+        // The aux fd belongs to ctx->fs, so it has to be closed while the
+        // glfs instance is still alive, i.e. before glfs_fini().
+        if (control == _gf_true) {
+                if (ctx->aux_fd != NULL) {
+                        glfs_close (ctx->aux_fd);
+                        glusterfs_this_set(this);
+                }
+                ctx->aux_fd = NULL;
+        }
+
+        // Nothing to finalise if the start of work never got that far.
+        if (ctx->fs == NULL)
+                return 0;
+
+        ret = glfs_fini(ctx->fs);
+        if (ret != 0) {
+                glusterfs_this_set(this);
+                nsr_worker_log(this->name, GF_LOG_ERROR, "cannot do fini for thread %s with volfile %s\n",ctx->id, ctx->vol_file);
+                return -1;
+        }
+        glusterfs_this_set(this);
+        ctx->fs = NULL;
+        return 0;
+}
+
+// Prepares the synchronisation objects and the (empty) work queue of a worker.
+// Called in case all worker functions run as separate threads.
+// The "control" argument is currently not looked at.
+static void
+init_worker(nsr_per_node_worker_t *ctx, gf_boolean_t control)
+{
+        INIT_LIST_HEAD(&ctx->head.list);
+        pthread_cond_init(&ctx->cv, NULL);
+        pthread_mutex_init(&ctx->mutex, NULL);
+}
+
+
+/*
+ * Control worker funct for getting changelog info on this node.
+ * calls directly functions to parse the changelog.
+ *
+ * Input arguments:
+ * ctx - The per worker based context
+ * work - The work item to service. work->req_id selects the operation;
+ *        for the term info requests work->index is overloaded to carry
+ *        the term.
+ *
+ * Results are stored in the recon_info of this worker's slot in the driver
+ * context.
+ */
+static void
+control_worker_func_0(nsr_per_node_worker_t *ctx,
+                      nsr_recon_work_t *work)
+{
+        unsigned int index = ctx->index;
+        nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]);
+        xlator_t *this = ctx->driver_ctx->this;
+        nsr_recon_private_t *priv = this->private;
+        nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+
+        ctx->is_control = _gf_true;
+
+        switch (work->req_id){
+        case NSR_WORK_ID_INI:
+        case NSR_WORK_ID_FINI:
+        {
+                // Nothing to set up or tear down for the local node.
+                break;
+        }
+        case NSR_WORK_ID_GET_LAST_TERM_INFO:
+        {
+                nsr_recon_last_term_info_t lt;
+                nsr_reconciliator_info_t *recon_info = rw->recon_info;
+                // term is stuffed inside work->index. overloading.
+                int32_t term = work->index;
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "trying to get last term info for node %d with current term %d\n",index, term);
+
+                // TBD - handle errors
+                // This is called by the leader after it gets the current term.
+                // Makes searching easier.
+                nsr_recon_libchangelog_get_last_term_info(this, priv->changelog_base_path, term, &lt);
+                recon_info->last_term = lt.last_term;
+                recon_info->commited_ops = lt.commited_ops;
+                recon_info->last_index = lt.last_index;
+                recon_info->first_index = lt.first_index;
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "out of get last term info with current term %d. got ops %d with first %d and last %d \n",
+                               recon_info->last_term, recon_info->commited_ops,
+                               recon_info->first_index, recon_info->last_index);
+                break;
+        }
+        case NSR_WORK_ID_GET_GIVEN_TERM_INFO:
+        {
+                nsr_recon_last_term_info_t lt;
+                nsr_reconciliator_info_t *recon_info = rw->recon_info;
+                // term is stuffed inside work->index. overloading.
+                int32_t term = work->index;
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "trying to get term info for node %d for term %d\n",index, term);
+
+                // TBD - handle errors
+                nsr_recon_libchangelog_get_this_term_info(this,priv->changelog_base_path, term, &lt);
+
+                recon_info->last_term = lt.last_term;
+                recon_info->commited_ops = lt.commited_ops;
+                recon_info->last_index = lt.last_index;
+                recon_info->first_index = lt.first_index;
+
+                // Arguments follow the order named in the message: first, last.
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "out of get term info for term %d. got ops %d with first %d and last %d \n",
+                               recon_info->last_term, recon_info->commited_ops,
+                               recon_info->first_index, recon_info->last_index);
+
+                break;
+        }
+        case NSR_WORK_ID_RECONCILIATOR_DO_WORK:
+        {
+                // For local resolution, the main driver thread does it.
+                // SO there is no way we can have this message for this node.
+                GF_ASSERT(0);
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "this message should not be sent \n");
+                break;
+        }
+        case NSR_WORK_ID_RESOLUTION_DO_WORK:
+        {
+                GF_ASSERT(0);
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "this message should not be sent \n");
+                break;
+        }
+        case NSR_WORK_ID_END_RECONCILIATION:
+        {
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "sending reconciliation end message to node %d\n", index);
+                nsr_recon_return_back(priv, dr->txn_id);
+                break;
+        }
+        case NSR_WORK_ID_GET_RECONCILATION_WINDOW:
+        {
+                nsr_reconciliator_info_t *recon_info = rw->recon_info;
+                // first_index and last_index at 0 indicates empty log.
+                // For non empty log, the first_index always starts at 1.
+                uint32_t num = (recon_info->last_index -
+                                recon_info->first_index + 1);
+                nsr_recon_record_details_t *rd = NULL;
+                uint32_t i=0;
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "trying to get reconciliation window records for node %d for term %d with first %d last %d\n",
+                               index, recon_info->last_term, recon_info->first_index, recon_info->last_index);
+
+                GF_ASSERT(num <= MAX_RECONCILIATION_WINDOW_SIZE);
+
+                rd = GF_CALLOC(num,
+                               sizeof(nsr_recon_record_details_t),
+                               gf_mt_recon_private_t);
+                if (rd == NULL) {
+                        // Without a scratch buffer there is nothing to copy;
+                        // leave the stored records untouched.
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "cannot allocate %d reconciliation window records for node %d\n",
+                                       num, index);
+                        break;
+                }
+
+                // TBD - handle errors
+                nsr_recon_libchangelog_get_records(this, priv->changelog_base_path,
+                                                   recon_info->last_term,
+                                                   recon_info->first_index,
+                                                   recon_info->last_index,
+                                                   rd);
+                // The above function writes into rd from 0 to (num -1)
+                // We need to take care of this whenever we deal with records
+                for (i=0; i < num; i++) {
+                        ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl
+                        memcpy(&(recon_info->records[i].rec),
+                               &(rd[i]),
+                               sizeof(nsr_recon_record_details_t));
+                }
+
+                GF_FREE(rd);
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "got reconciliation window records for node %d for term %d \n",
+                               index, recon_info->last_term);
+                break;
+        }
+        default:
+                // Unknown request ids are ignored.
+                break;
+        }
+
+        return;
+}
+
+// Control worker thread for the local node (worker index 0).
+// Initialises the worker, then loops forever: waits until a work item is
+// queued on ctx->head.list, takes the first item off the queue, services it
+// with control_worker_func_0() and decrements the driver's count of
+// outstanding requests. Never returns in practice.
+static void*
+control_worker_main_0(nsr_per_node_worker_t *ctx)
+{
+
+        ctx->is_control = _gf_true;
+        nsr_worker_log(this->name, GF_LOG_INFO,
+                       "starting control worker func 0\n");
+
+        init_worker(ctx, 1);
+
+        while(1)
+        {
+                nsr_recon_work_t *work = NULL;
+                nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "waiting for work\n");
+
+                // Dequeue while still holding the mutex that protects the
+                // list, so that a concurrent enqueue cannot corrupt it.
+                pthread_mutex_lock(&ctx->mutex);
+                while (list_empty(&(ctx->head.list))) {
+                        pthread_cond_wait(&ctx->cv, &ctx->mutex);
+                }
+                work = list_entry(ctx->head.list.next, nsr_recon_work_t, list);
+                nsr_worker_log(this->name, GF_LOG_INFO,"deleting work item\n");
+                list_del_init (&work->list);
+                nsr_worker_log(this->name, GF_LOG_INFO,"finished deleting work item\n");
+                pthread_mutex_unlock(&ctx->mutex);
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "got work with id %d\n", work->req_id);
+
+                // Call the main function.
+                control_worker_func_0(ctx, work);
+
+                // Release the item only after it has been fully processed
+                // and before the driver is told that the request is done.
+                work->in_use = _gf_false;
+                atomic_dec(&(dr->outstanding));
+        }
+
+        return NULL;
+}
+
+/*
+ * Control worker funct for getting changelog info on some other node.
+ * calls glfs functions to seek/read/write on aux_fd.
+ *
+ * Input arguments:
+ * ctx - The per worker based context
+ * work - The work item to service. work->req_id selects the operation;
+ *        for the term info requests work->index is overloaded to carry
+ *        the term.
+ *
+ * Every request is a seek to the sector assigned to that request followed
+ * by a write of the (network byte order) arguments and, where a reply is
+ * expected, a read of the answer.
+ */
+static void
+control_worker_func(nsr_per_node_worker_t *ctx,
+                    nsr_recon_work_t *work)
+{
+        unsigned int index = ctx->index;
+        nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]);
+        nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+
+        ctx->is_control = _gf_true;
+
+        switch (work->req_id){
+        case NSR_WORK_ID_INI:
+        {
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "calling nsr_recon_start_work\n");
+
+                // TBD - handle error in case nsr_recon_start_work gives error
+                nsr_recon_start_work(ctx, _gf_true);
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "finished nsr_recon_start_work\n");
+                break;
+        }
+        case NSR_WORK_ID_FINI:
+        {
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "calling nsr_recon_end_work\n");
+
+                // TBD - handle error in case nsr_recon_end_work gives error
+                nsr_recon_end_work(ctx, _gf_true);
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "finished nsr_recon_end_work\n");
+                break;
+        }
+        case NSR_WORK_ID_GET_LAST_TERM_INFO:
+        {
+                nsr_recon_last_term_info_t lt;
+                nsr_reconciliator_info_t *recon_info = rw->recon_info;
+                int32_t term = htonl(work->index); // overloading it
+
+                // A failed read must not leave stack garbage in the result.
+                memset(&lt, 0, sizeof(lt));
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "trying to get last term info for node %d with current term %d\n",index, work->index);
+
+                // first write the current term term number
+                // TBD - error handling for all the glfs APIs
+                glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET);
+                glfs_write(ctx->aux_fd, &term, sizeof(term), 0);
+                glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0);
+                ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl
+                recon_info->last_term = lt.last_term;
+                recon_info->commited_ops = lt.commited_ops;
+                recon_info->last_index = lt.last_index;
+                recon_info->first_index = lt.first_index;
+
+                // Arguments follow the order named in the message: first, last.
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "out of get last term info with current term %d. got ops %d with first %d and last %d \n",
+                               recon_info->last_term, recon_info->commited_ops,
+                               recon_info->first_index, recon_info->last_index);
+
+                break;
+        }
+        case NSR_WORK_ID_GET_GIVEN_TERM_INFO:
+        {
+                nsr_recon_last_term_info_t lt;
+                nsr_reconciliator_info_t *recon_info = rw->recon_info;
+                int32_t term = htonl(work->index); // overloading it
+
+                // A failed read must not leave stack garbage in the result.
+                memset(&lt, 0, sizeof(lt));
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "trying to get term info for node %d for term %d\n",index, work->index);
+
+                // first write the term number
+                // TBD - error handling for all the glfs APIs
+                glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET);
+                glfs_write(ctx->aux_fd, &term, sizeof(term), 0);
+                glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0);
+                ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl
+                recon_info->last_term = lt.last_term;
+                recon_info->commited_ops = lt.commited_ops;
+                recon_info->last_index = lt.last_index;
+                recon_info->first_index = lt.first_index;
+
+                // Arguments follow the order named in the message: first, last.
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "out of get term info for term %d. got ops %d with first %d and last %d \n",
+                               recon_info->last_term, recon_info->commited_ops,
+                               recon_info->first_index, recon_info->last_index);
+
+                break;
+        }
+        case NSR_WORK_ID_RECONCILIATOR_DO_WORK:
+        {
+                nsr_recon_role_t rr;
+                uint32_t i=0;
+                uint32_t num=0;
+                uint32_t idx = dr->reconciliator_index;
+                uint32_t term = dr->workers[idx].recon_info->last_term;
+                GF_ASSERT(idx == index);
+
+                // The whole structure goes over the wire; do not send
+                // uninitialised stack contents in the unused slots.
+                memset(&rr, 0, sizeof(rr));
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "trying to make this index %d as reconciliator for term %d\n", index, term);
+
+                // TBD - error handling for all the glfs APIs
+                glfs_lseek(ctx->aux_fd,
+                           nsr_recon_xlator_sector_1,
+                           SEEK_SET);
+
+                // We have all the info for all other nodes.
+                // Fill all that info when sending data to that process.
+                // Only nodes that are in use and saw the reconciliator's
+                // term are packed, back to back, into rr.info[].
+                for (i=0; i < dr->replica_group_size; i++) {
+                        if ( dr->workers[i].in_use &&
+                             (dr->workers[i].recon_info->last_term == term)) {
+                                rr.info[num].last_term =
+                                        dr->workers[i].recon_info->last_term;
+                                rr.info[num].commited_ops =
+                                        dr->workers[i].recon_info->commited_ops;
+                                rr.info[num].last_index =
+                                        dr->workers[i].recon_info->last_index;
+                                rr.info[num].first_index =
+                                        dr->workers[i].recon_info->first_index;
+                                strcpy(rr.info[num].name,
+                                       dr->workers[i].name);
+                                // count only the slots that were filled
+                                num++;
+                        }
+                }
+                rr.num = num;
+                rr.role = reconciliator;
+                ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl
+                glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0);
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "sent reconciliator info for term %d with node count as %d\n", term, num);
+
+                break;
+        }
+        case NSR_WORK_ID_RESOLUTION_DO_WORK:
+        {
+                nsr_recon_role_t rr;
+                unsigned int i=0, j=0;
+                unsigned int rec = dr->reconciliator_index;
+
+                // Only two slots are filled below; keep the rest of the
+                // structure that is written to the peer zeroed.
+                memset(&rr, 0, sizeof(rr));
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "trying to make this index %d as resolutor with reconciliator as %d\n",index, rec);
+
+                // TBD - error handling for all the glfs APIs
+                glfs_lseek(ctx->aux_fd,
+                           nsr_recon_xlator_sector_1,
+                           SEEK_SET);
+                rr.num = 2;
+
+                // Fill in info[0] as info for the node for which we are seeking resolution.
+                // Fill in info[1] as info of the reconciliator node.
+                // The function nsr_recon_driver_set_role() that will be called when
+                // this message reaches the node will look at index 1 for term information
+                // related to the reconciliator.
+                for (i=0; i < 2; i++) {
+                        j = (i == 0) ? index : rec;
+                        rr.info[i].last_term =
+                                dr->workers[j].recon_info->last_term;
+                        rr.info[i].commited_ops =
+                                dr->workers[j].recon_info->commited_ops;
+                        rr.info[i].last_index =
+                                dr->workers[j].recon_info->last_index;
+                        rr.info[i].first_index =
+                                dr->workers[j].recon_info->first_index;
+                        // The name is used as the key to convert indices since
+                        // the reconciliator index could be different across the nodes.
+                        strcpy(rr.info[i].name,
+                               dr->workers[j].name);
+                        if (i == 0) {
+                                nsr_worker_log(this->name, GF_LOG_INFO,
+                                               "this node info term=%d, ops=%d, first=%d, last=%d\n",
+                                               rr.info[i].last_term, rr.info[i].commited_ops,
+                                               rr.info[i].first_index,rr.info[i].last_index);
+                        } else {
+                                nsr_worker_log(this->name, GF_LOG_INFO,
+                                               "reconciliator node info term=%d, ops=%d, first=%d, last=%d\n",
+                                               rr.info[i].last_term, rr.info[i].commited_ops,
+                                               rr.info[i].first_index,rr.info[i].last_index);
+                        }
+                }
+                rr.role = resolutor;
+                ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl
+                glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0);
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "sent message to this node %d resolutor with reconciliator as %d\n", index, rec);
+
+                break;
+        }
+        case NSR_WORK_ID_END_RECONCILIATION:
+        {
+                char c[4];
+                uint32_t old = htonl(dr->txn_id);
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "sending reconciliation end message to node %d\n", index);
+
+                memcpy(c, &old, sizeof(uint32_t));
+                // TBD - error handling for all the glfs APIs
+                glfs_lseek(ctx->aux_fd,
+                           nsr_recon_xlator_sector_0,
+                           SEEK_SET);
+                glfs_write(ctx->aux_fd, c, sizeof(c), 0);
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "finished sending reconciliation end message to node %d\n", index);
+
+                break;
+        }
+        case NSR_WORK_ID_GET_RECONCILATION_WINDOW:
+        {
+                nsr_recon_log_info_t li;
+                nsr_reconciliator_info_t *recon_info = rw->recon_info;
+                uint32_t i = 0;
+                uint32_t num = (recon_info->last_index -
+                                recon_info->first_index +1);
+                nsr_recon_record_details_t *rd = NULL;
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "trying to get reconciliation window records for node %d for term %d with first %d last %d\n",
+                               index, recon_info->last_term, recon_info->first_index, recon_info->last_index);
+
+                GF_ASSERT(num <= MAX_RECONCILIATION_WINDOW_SIZE);
+
+                // Get the receive buffer before anything is sent, so that an
+                // allocation failure does not leave a half finished exchange.
+                rd = GF_CALLOC(num,
+                               sizeof(nsr_recon_record_details_t),
+                               gf_mt_recon_private_t);
+                if (rd == NULL) {
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "cannot allocate %d reconciliation window records for node %d\n",
+                                       num, index);
+                        break;
+                }
+
+                // TBD - error handling for all the glfs APIs
+                glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET);
+
+                // write to node what term & indices we are interested
+                memset(&li, 0, sizeof(li));
+                li.term = recon_info->last_term;
+                li.first_index = recon_info->first_index;
+                li.last_index = recon_info->last_index;
+                ENDIAN_CONVERSION_LI(li, _gf_false); //htonl
+                glfs_write(ctx->aux_fd, &li, sizeof(li), 0);
+
+                // then read
+                glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0);
+                for (i=0; i < num; i++) {
+                        ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl
+                        memcpy(&(recon_info->records[i].rec),
+                               &(rd[i]),
+                               sizeof(nsr_recon_record_details_t));
+                        nsr_worker_log(this->name, GF_LOG_INFO,
+                                       "get_reconcilaition_window:Got %d at index %d\n",
+                                       recon_info->records[i].rec.type,
+                                       i + recon_info->first_index);
+                }
+                // Memory from GF_CALLOC has to go back through GF_FREE.
+                GF_FREE(rd);
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "got reconciliation window records for node %d for term %d \n",
+                               index, recon_info->last_term);
+                break;
+        }
+        default:
+                // Unknown request ids are ignored.
+                break;
+        }
+
+        return;
+}
+
+// Control worker thread.
+// Worker index 0 is the local node and is handed over to
+// control_worker_main_0(). Every other worker initialises itself and then
+// loops forever: waits until a work item is queued on ctx->head.list, takes
+// the first item off the queue, services it with control_worker_func() and
+// decrements the driver's count of outstanding requests.
+static void*
+control_worker_main(nsr_per_node_worker_t *ctx)
+{
+        unsigned int index = ctx->index;
+
+        ctx->is_control = _gf_true;
+        nsr_worker_log(this->name, GF_LOG_INFO,
+                       "starting control worker func\n");
+
+        // if this is for local processing, call the changelog parsing calls directly
+        if (index == 0) {
+                control_worker_main_0(ctx);
+                return NULL;
+        }
+
+        init_worker(ctx, 1);
+
+        while(1)
+        {
+                nsr_recon_work_t *work = NULL;
+                nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "waiting for work\n");
+
+                // Dequeue while still holding the mutex that protects the
+                // list, so that a concurrent enqueue cannot corrupt it.
+                pthread_mutex_lock(&ctx->mutex);
+                while (list_empty(&(ctx->head.list))) {
+                        pthread_cond_wait(&ctx->cv, &ctx->mutex);
+                }
+                work = list_entry(ctx->head.list.next, nsr_recon_work_t, list);
+                nsr_worker_log(this->name, GF_LOG_INFO,"deleting work item\n");
+                list_del_init (&work->list);
+                nsr_worker_log(this->name, GF_LOG_INFO,"finished deleting work item\n");
+                pthread_mutex_unlock(&ctx->mutex);
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "got work with id %d\n", work->req_id);
+                control_worker_func(ctx,work);
+
+                // Release the item only after it has been fully processed
+                // and before the driver is told that the request is done.
+                work->in_use = _gf_false;
+                atomic_dec(&(dr->outstanding));
+        }
+
+        return NULL;
+}
+
+/*
+ * This function gets called if this process is chosen as the reconciliator
+ * for this replica group. It would have already got the records for the last term
+ * for the indices that are required (from the first HOLE to last index) from
+ * all other nodes that also witnessed that term. Compare all the records and
+ * compute the work required.
+ *
+ * For every record of the local window the log types reported by the other
+ * in-use nodes are merged; if the merged type differs from the local one,
+ * the kind of work and the node to fetch from are stored in the record's
+ * "work" part and the record details are overwritten with the source's.
+ *
+ * Input arguments
+ * ctx - driver context. All recon work is stored in workers[0].recon_info
+ */
+static void
+compute_reconciliation_work(nsr_recon_driver_ctx_t *ctx)
+{
+        uint32_t i=0, j=0;
+        nsr_reconciliator_info_t *my_recon = ctx->workers[0].recon_info;
+        uint32_t num = (my_recon->last_index - my_recon->first_index + 1);
+
+        for (i=0; i < num; i++) {
+                nsr_log_type_t orig, merged;
+                unsigned int src = 0;
+                nsr_recon_work_type_t tw = NSR_RECON_WORK_NONE;
+
+                orig = merged = my_recon->records[i].rec.type;
+                // index 0 means this node. Look at all other nodes.
+                for (j=1; j < ctx->replica_group_size; j++) {
+                        if (ctx->workers[j].in_use) {
+                                // The log type of a peer's record lives in
+                                // its record details (rec), not in the work
+                                // slot, which is only filled in below.
+                                nsr_log_type_t pr = ctx->workers[j].recon_info->records[i].rec.type;
+                                if (pr > merged) {
+                                        src = j;
+                                        merged = (merged | pr);
+                                }
+                        }
+                }
+                // TBD - compare data if new and orig are all FILLs. (can detect changelog corruption)
+                // Right now we compare if both orig and new are pseudo holes since
+                // only that is of interest to us.
+                if (orig != merged) {
+                        if ((orig == NSR_LOG_HOLE) && (merged == NSR_LOG_PSEUDO_HOLE))
+                                tw = NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE;
+                        else if ((orig == NSR_LOG_HOLE) && (merged == NSR_LOG_FILL))
+                                tw = NSR_RECON_WORK_HOLE_TO_FILL;
+                        // NOTE(review): this branch cannot be taken while it
+                        // is nested under (orig != merged); confirm how the
+                        // pseudo hole comparison is meant to be triggered.
+                        else if ((orig == NSR_LOG_PSEUDO_HOLE) && (merged == NSR_LOG_PSEUDO_HOLE))
+                                tw = NSR_RECON_WORK_COMPARE_PSEUDO_HOLE;
+                        else if ((orig == NSR_LOG_PSEUDO_HOLE) && (merged == NSR_LOG_FILL))
+                                tw = NSR_RECON_WORK_HOLE_TO_FILL;
+                }
+                if (tw != NSR_RECON_WORK_NONE) {
+                        my_recon->records[i].work.type = tw;
+                        my_recon->records[i].work.source = src;
+                        // Overwrite the record
+                        memcpy(&(my_recon->records[i].rec),
+                               &(ctx->workers[src].recon_info->records[i].rec),
+                               sizeof(nsr_recon_record_details_t));
+                }
+        }
+        return;
+}
+
+// Forward declaration: nsr_recon_driver_set_role() below calls this helper,
+// whose definition appears later in the file.
+static void
+nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
+ uint32_t i,
+ gf_boolean_t in_use);
+
+/*
+ * Record the role assigned to this node and wake the main driver thread.
+ * This gets called from the recon xlator to indicate that the node is either
+ * leader, reconciliator or should do resolution.
+ * Every worker is first marked as not in use so that the previous role is
+ * undone before the new one is set up.
+ *
+ * Input arguments
+ * ctx - driver context.
+ * rr - Role information.
+ *      If leader, rr->info[] lists the nodes that are part of the current
+ *      replica group; matching workers are activated and given an empty
+ *      recon_info to be filled in later.
+ *      If reconciliator, rr->info[] describes all nodes which saw the last
+ *      term; their term/index information is copied into the matching
+ *      workers.
+ *      If resolution is to be done, rr->info[0] holds this node's info and
+ *      rr->info[1] holds the info of the reconciliator.
+ * txn_id - All role changes (except when the leader becomes reconciliator or
+ *      resolutor) are initiated as a write to the recon xlator, which stores
+ *      the frame and returns immediately. The frame is handed back once the
+ *      reconciliation is done; txn_id is the key it is stored under.
+ *
+ * Returns _gf_true when the role was recorded and the driver thread was
+ * signalled, _gf_false when a recon_info allocation failed. The driver mutex
+ * is released on every path.
+ */
+gf_boolean_t
+nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx,
+                          nsr_recon_role_t *rr,
+                          uint32_t txn_id)
+{
+        // 32-bit counters: both bounds they are compared with are wider than
+        // the uint8_t used previously.
+        uint32_t i = 0, j = 0;
+        gf_boolean_t ret = _gf_false;
+
+        pthread_mutex_lock(&(ctx->mutex));
+        ctx->state = rr->role;
+        // First make all the threads uninitialise
+        for (i = 0; i < ctx->replica_group_size; i++) {
+                nsr_recon_in_use(ctx, i, _gf_false);
+        }
+        if (rr->role == leader) {
+
+                // First set info this node
+                nsr_recon_in_use(ctx, 0, _gf_true);
+                ctx->workers[0].recon_info = GF_CALLOC (1,
+                                                        sizeof (nsr_reconciliator_info_t),
+                                                        gf_mt_recon_private_t);
+                if (!ctx->workers[0].recon_info) {
+                        goto out;
+                }
+                ctx->current_term = rr->current_term;
+
+                // Find rest of the nodes
+                for (i = 1; i < ctx->replica_group_size; i++) {
+                        for (j = 0; j < rr->num; j++) {
+                                // TBD - make this strcmp later when etcd servers set properly
+                                if (!strstr(ctx->workers[i].name, rr->info[j].name))
+                                        continue;
+
+                                nsr_driver_log(this->name, GF_LOG_INFO,
+                                               "nsr_recon_driver_set_role: this as leader. found other server %s\n",
+                                               ctx->workers[i].name);
+
+                                nsr_recon_in_use(ctx, i, _gf_true);
+                                // Allocated empty here; filled in later when
+                                // the leader collects last term information
+                                // from all the nodes.
+                                ctx->workers[i].recon_info = GF_CALLOC (1,
+                                                                        sizeof (nsr_reconciliator_info_t),
+                                                                        gf_mt_recon_private_t);
+                                if (!ctx->workers[i].recon_info) {
+                                        goto out;
+                                }
+                                break;
+                        }
+                }
+                ctx->reconciliator_index = -1;
+        } else if (rr->role == reconciliator) {
+                ctx->reconciliator_index = 0;
+                // Copy information about all the other members which had the same term
+                for (i = 0; i < rr->num; i++) {
+                        for (j = 0; j < ctx->replica_group_size; j++) {
+                                if (!strstr(ctx->workers[j].name, rr->info[i].name))
+                                        continue;
+
+                                nsr_driver_log(this->name, GF_LOG_INFO,
+                                               "nsr_recon_driver_set_role: this as reconciliator. found other server %s\n",
+                                               ctx->workers[j].name);
+                                ctx->workers[j].recon_info = GF_CALLOC (1,
+                                                                        sizeof (nsr_reconciliator_info_t),
+                                                                        gf_mt_recon_private_t);
+                                if (!ctx->workers[j].recon_info) {
+                                        goto out;
+                                }
+                                ctx->workers[j].recon_info->last_term =
+                                        rr->info[i].last_term;
+                                ctx->workers[j].recon_info->commited_ops =
+                                        rr->info[i].commited_ops;
+                                ctx->workers[j].recon_info->last_index =
+                                        rr->info[i].last_index;
+                                ctx->workers[j].recon_info->first_index =
+                                        rr->info[i].first_index;
+                                nsr_recon_in_use(ctx, j, _gf_true);
+                                break;
+                        }
+                }
+        } else if (rr->role == resolutor) {
+                for (j = 0; j < ctx->replica_group_size; j++) {
+                        // info[1] has the information regarding the reconciliator
+                        if (!strstr(ctx->workers[j].name, rr->info[1].name))
+                                continue;
+
+                        // Report the worker that actually matched (index j).
+                        nsr_driver_log(this->name, GF_LOG_INFO,
+                                       "nsr_recon_driver_set_role: this as resolutor. found other server %s as reconciliator\n",
+                                       ctx->workers[j].name);
+                        ctx->workers[j].recon_info = GF_CALLOC (1,
+                                                                sizeof (nsr_reconciliator_info_t),
+                                                                gf_mt_recon_private_t);
+                        if (!ctx->workers[j].recon_info) {
+                                goto out;
+                        }
+                        ctx->workers[j].recon_info->last_term =
+                                rr->info[1].last_term;
+                        ctx->workers[j].recon_info->commited_ops =
+                                rr->info[1].commited_ops;
+                        ctx->workers[j].recon_info->last_index =
+                                rr->info[1].last_index;
+                        ctx->workers[j].recon_info->first_index =
+                                rr->info[1].first_index;
+                        ctx->reconciliator_index = j;
+                        nsr_recon_in_use(ctx, j, _gf_true);
+                        GF_ASSERT(ctx->reconciliator_index != 0);
+                        break;
+                }
+                ctx->workers[0].recon_info = GF_CALLOC (1,
+                                                        sizeof (nsr_reconciliator_info_t),
+                                                        gf_mt_recon_private_t);
+                if (!ctx->workers[0].recon_info) {
+                        goto out;
+                }
+                // info[0] has all info for this node
+                ctx->workers[0].recon_info->last_term = rr->info[0].last_term;
+                ctx->workers[0].recon_info->commited_ops = rr->info[0].commited_ops;
+                ctx->workers[0].recon_info->last_index = rr->info[0].last_index;
+                ctx->workers[0].recon_info->first_index = rr->info[0].first_index;
+                nsr_recon_in_use(ctx, 0, _gf_true);
+        }
+
+        ctx->txn_id = txn_id;
+        // Signal the main driver thread
+        pthread_cond_signal(&(ctx->cv));
+        ret = _gf_true;
+out:
+        // Single exit so the mutex is dropped on the failure paths as well.
+        pthread_mutex_unlock(&(ctx->mutex));
+        return ret;
+}
+
+
+/*
+ * Decide, slot by slot, what this node must do to match the reconciliator.
+ * Called when this process has been chosen to sync itself with the
+ * reconciliator.
+ *
+ * For each record between my_info->first_index and my_info->last_index the
+ * local type is compared with the reconciliator's type. A differing FILL on
+ * the reconciliator means the local entry has to be filled; a differing HOLE
+ * means the local fill has to be undone. The work type, the reconciliator
+ * index as source and the reconciliator's record details are stored in every
+ * slot, whether or not any work was found.
+ *
+ * Input arguments
+ * ctx - driver context.
+ * my_info - local changelog info holding the local records for the indices
+ *           that require work
+ * his_info - reconciliator's info that has all the golden copies
+ * invalidate - if set to true, the local records are not consulted and every
+ *              local slot is treated as a HOLE
+ */
+
+static void
+compute_resolution_work(nsr_recon_driver_ctx_t *ctx,
+                        nsr_reconciliator_info_t *my_info,
+                        nsr_reconciliator_info_t *his_info,
+                        gf_boolean_t invalidate)
+{
+        uint32_t count = (my_info->last_index - my_info->first_index + 1);
+        uint32_t slot = 0;
+
+        while (slot < count) {
+                nsr_log_type_t local = my_info->records[slot].rec.type;
+                nsr_log_type_t golden = his_info->records[slot].rec.type;
+                nsr_recon_work_type_t action = NSR_RECON_WORK_NONE;
+
+                if (invalidate)
+                        local = NSR_LOG_HOLE;
+
+                // TBD - the reconciliator's info should never contain a
+                // PSEUDO_HOLE at this point; reconciliation should have
+                // resolved it. Put an assert to validate that.
+                if (golden != local) {
+                        if (golden == NSR_LOG_FILL)
+                                action = NSR_RECON_WORK_HOLE_TO_FILL;
+                        else if (golden == NSR_LOG_HOLE)
+                                action = NSR_RECON_WORK_UNDO_FILL;
+                }
+
+                // The golden record is copied even when no work is needed.
+                my_info->records[slot].work.type = action;
+                my_info->records[slot].work.source = ctx->reconciliator_index;
+                memcpy(&(my_info->records[slot].rec),
+                       &(his_info->records[slot].rec),
+                       sizeof(nsr_recon_record_details_t));
+                slot++;
+        }
+        return;
+}
+
+
+/*
+ * Create a glfs object handle for the file identified by a textual gfid.
+ *
+ * Input arguments
+ * ctx - per node worker context whose fs is used to create the handle
+ * gfid_str - gfid in string form, parsed with uuid_parse()
+ *
+ * Returns the new handle, or NULL when the string is not a valid UUID or the
+ * handle could not be created.
+ */
+static struct glfs_object *
+create_obj(nsr_per_node_worker_t *ctx, char *gfid_str)
+{
+        struct glfs_object *obj = NULL;
+        uuid_t gfid;
+
+        // A failed parse leaves gfid unset; never hand that to gfapi.
+        if (uuid_parse(gfid_str, gfid) != 0) {
+                nsr_worker_log(this->name, GF_LOG_ERROR,
+                               "parsing of gfid %s failed\n", gfid_str);
+                return NULL;
+        }
+
+        obj = glfs_h_create_from_handle(ctx->fs, gfid, GFAPI_HANDLE_LENGTH, NULL);
+        if (obj == NULL) {
+                GF_ASSERT(obj != NULL);
+                nsr_worker_log(this->name, GF_LOG_ERROR,
+                               "creating of handle failed\n");
+                return NULL;
+        }
+        return obj;
+}
+
+/*
+ * Apply one changelog record onto the brick reached through ctx->fs.
+ * Any data the record needs must already have been read from the source
+ * brick into ri->work.
+ *
+ * The operation is selected by ri->rec.op; operations not listed in the
+ * chain below are ignored. Failures are logged (after GF_ASSERT) and abandon
+ * the record. Every descriptor and object handle opened here is released
+ * before returning, on success and on failure alike.
+ *
+ * Input parameters:
+ * ctx - per node worker context that has the fs for communicating to brick
+ * ri - Reconciliation record that needs fixup
+ * dict - xdata passed with every call so that the NSR server translator on
+ *        the brick applies the fixup only on this brick and the changelog
+ *        translator consumes term and index. The xattr and attr branches add
+ *        an opcode key to it.
+ *
+ * NOTE(review): the value returned by the create/mknod/symlink calls is only
+ * compared against NULL and never released; confirm whether those calls
+ * return a handle that has to be closed.
+ */
+
+static void
+apply_record(nsr_per_node_worker_t *ctx,
+             nsr_reconciliation_record_t *ri,
+             dict_t * dict)
+{
+        struct glfs_fd *fd = NULL;
+        struct glfs_object *obj = NULL;
+        struct glfs_object *to_obj = NULL;
+
+
+        if (ri->rec.op == GF_FOP_WRITE) {
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "DOing write for file %s @offset %d for len %d\n",
+                               ri->rec.gfid, ri->rec.offset, ri->rec.len);
+
+                // The file has got deleted on the source. Hence just ignore this.
+                // TBD - get a way to just stuff the log entry without writing the data so that
+                // changelogs remain identical.
+                if (ri->work.data == NULL) {
+                        goto out;
+                }
+
+                if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL)
+                        goto out;
+
+                fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict);
+                if (fd == NULL) {
+                        GF_ASSERT(0);
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "open for file %s failed\n",
+                                       ri->rec.gfid);
+                        goto out;
+                }
+                if (glfs_lseek_with_xdata(fd, ri->rec.offset, SEEK_SET, dict) != ri->rec.offset) {
+                        GF_ASSERT(0);
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "lseek for file %s failed at offset %d\n",
+                                       ri->rec.gfid, ri->rec.offset);
+                        goto out;
+                }
+                if (glfs_write_with_xdata(fd, ri->work.data, ri->rec.len, 0, dict) != ri->rec.len) {
+                        GF_ASSERT(0);
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "write for file %s failed for bytes %d\n",
+                                       ri->rec.gfid, ri->rec.len);
+                        goto out;
+                }
+                glfs_close_with_xdata(fd, dict);
+                fd = NULL;
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Finished DOing write for gfid %s @offset %d for len %d\n",
+                               ri->rec.gfid, ri->rec.offset, ri->rec.len);
+
+        } else if (ri->rec.op == GF_FOP_FTRUNCATE) {
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "DOing truncate for file %s @offset %d \n",
+                               ri->rec.gfid, ri->rec.offset);
+
+                if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL)
+                        goto out;
+
+                fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict);
+                if (fd == NULL) {
+                        GF_ASSERT(0);
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "open for file %s failed\n",
+                                       ri->rec.gfid);
+                        goto out;
+                }
+                if (glfs_ftruncate_with_xdata(fd, ri->rec.offset, dict) == -1) {
+                        GF_ASSERT(0);
+                        // The comma after the log level was missing here.
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "trunctae for file %s failed @offset %d\n",
+                                       ri->rec.gfid,ri->rec.offset );
+                        goto out;
+                }
+                glfs_close_with_xdata(fd, dict);
+                fd = NULL;
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Finished DOing truncate for gfid %s @offset %d \n",
+                               ri->rec.gfid, ri->rec.offset);
+
+        } else if ((ri->rec.op == GF_FOP_FREMOVEXATTR) ||
+                   (ri->rec.op == GF_FOP_REMOVEXATTR) ||
+                   (ri->rec.op == GF_FOP_SETXATTR) ||
+                   (ri->rec.op == GF_FOP_FSETXATTR)) {
+
+                uint32_t k_s = 0, v_s = 0;
+                char *t_b= NULL;
+                uint32_t num = 0;
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Doing set extended attr for file %s \n",
+                               ri->rec.gfid);
+
+                // The file has got deleted on the source. Hence just ignore this.
+                // TBD - get a way to just stuff the log entry without writing the data so that
+                // changelogs remain identical.
+                if (ri->work.data == NULL) {
+                        goto out;
+                }
+
+                if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL)
+                        goto out;
+
+                if (obj->inode->ia_type == IA_IFDIR)
+                        fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict);
+                else
+                        fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict);
+                if (fd == NULL) {
+                        GF_ASSERT(0);
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "open for file %s failed\n",
+                                       ri->rec.gfid);
+                        goto out;
+                }
+
+                if(get_xattr_total_size(fd, &t_b, &k_s, &v_s, &num, dict) == -1) {
+                        if (t_b) free(t_b);
+                        GF_ASSERT(0);
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "list of xattr of %s failed\n", ri->rec.gfid);
+                        goto out;
+                }
+
+                // NOTE(review): t_b is not freed after this call; confirm
+                // whether delete_xattr() takes ownership of the buffer.
+                delete_xattr(fd, dict, t_b, num);
+
+                // Set one special dict flag to indicate the opcode so that
+                // the opcode gets set to this
+                if (dict_set_int32(dict,"recon-xattr-opcode",ri->rec.op)) {
+                        GF_ASSERT(0);
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "setting opcode to %d failed\n",ri->rec.op);
+                        goto out;
+                }
+
+                fill_xattr(fd, dict, ri->work.data, ri->work.num);
+
+                glfs_close_with_xdata(fd, dict);
+                fd = NULL;
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Finsihed Doing set extended attr for %s \n",
+                               ri->rec.gfid);
+
+        } else if (ri->rec.op == GF_FOP_CREATE) {
+
+                uuid_t gfid;
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Doing create for file %s \n",
+                               ri->rec.gfid);
+
+                // TBD - add mode and flags later
+                uuid_parse(ri->rec.gfid, gfid);
+                if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL)
+                        goto out;
+
+                if (glfs_h_creat_with_xdata(ctx->fs, obj, ri->rec.entry, O_RDWR, 0777, NULL, gfid, dict) == NULL) {
+                        GF_ASSERT(0);
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "Failure for Doing create for file %s\n",
+                                       ri->rec.entry);
+                        goto out;
+                }
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Finished Doing create for file %s \n",
+                               ri->rec.entry);
+
+        } else if (ri->rec.op == GF_FOP_MKNOD) {
+
+                uuid_t gfid;
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Doing mknod for file %s \n",
+                               ri->rec.entry);
+
+                // TBD - add mode and flags later
+                uuid_parse(ri->rec.gfid, gfid);
+                if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL)
+                        goto out;
+
+                if (glfs_h_mknod_with_xdata(ctx->fs, obj, ri->rec.entry, O_RDWR, 0777, NULL, gfid, dict) == NULL) {
+                        GF_ASSERT(0);
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "Failure for Doing mknod for file %s\n",
+                                       ri->rec.entry);
+                        goto out;
+                }
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Finished Doing mknod for file %s \n",
+                               ri->rec.entry);
+
+        } else if (ri->rec.op == GF_FOP_MKDIR) {
+
+                uuid_t gfid;
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Doing mkdir for dir %s \n",
+                               ri->rec.gfid);
+
+                // TBD - add mode and flags later
+                uuid_parse(ri->rec.gfid, gfid);
+                if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL)
+                        goto out;
+
+                if (glfs_h_mkdir_with_xdata(ctx->fs, obj, ri->rec.entry, 0777, NULL, gfid, dict) != 0) {
+                        GF_ASSERT(0);
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "Failure for Doing mkdir for file %s\n",
+                                       ri->rec.entry);
+                        goto out;
+                }
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Finished Doing mkdir for file %s \n",
+                               ri->rec.entry);
+
+        } else if ((ri->rec.op == GF_FOP_RMDIR) || (ri->rec.op == GF_FOP_UNLINK)) {
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Doing rmdir/ublink for dir %s \n",
+                               ri->rec.entry);
+
+                if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL)
+                        goto out;
+                if (glfs_h_unlink_with_xdata(ctx->fs, obj, ri->rec.entry, dict) != 0) {
+                        GF_ASSERT(0);
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "Failure for Doing rmdir/unlink for file %s\n",
+                                       ri->rec.entry);
+                        goto out;
+                }
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Finished Doing rmdir/unlink for file %s \n",
+                               ri->rec.entry);
+
+        } else if (ri->rec.op == GF_FOP_SYMLINK) {
+
+                uuid_t gfid;
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Doing symlink for file %s to file %s \n",
+                               ri->rec.entry, ri->rec.link_path);
+
+                if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL)
+                        goto out;
+                uuid_parse(ri->rec.gfid, gfid);
+
+                if (glfs_h_symlink_with_xdata(ctx->fs, obj, ri->rec.entry, ri->rec.link_path, NULL, gfid, dict) == NULL) {
+                        GF_ASSERT(0);
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "Failed to Doing symlink for file %s to file %s \n",
+                                       ri->rec.entry, ri->rec.link_path);
+                        goto out;
+                }
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Finished Doing symlink for file %s to file %s \n",
+                               ri->rec.entry, ri->rec.link_path);
+
+        } else if (ri->rec.op == GF_FOP_LINK) {
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Doing hard link for file %s to file %s \n",
+                               ri->rec.entry, ri->rec.gfid);
+
+                if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL)
+                        goto out;
+                if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL)
+                        goto out;
+
+                if (glfs_h_link_with_xdata(ctx->fs, to_obj, obj, ri->rec.entry, dict) == -1) {
+                        GF_ASSERT(0);
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "Failed to Doing hard link for file %s to file %s \n",
+                                       ri->rec.entry, ri->rec.gfid);
+                        goto out;
+                }
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Finsihed doing hard link for file %s to file %s \n",
+                               ri->rec.entry, ri->rec.gfid);
+
+        } else if (ri->rec.op == GF_FOP_RENAME) {
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Doing rename for file %s to file %s \n",
+                               ri->rec.entry, ri->rec.newloc);
+
+                if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL)
+                        goto out;
+                if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL)
+                        goto out;
+
+                if (glfs_h_rename_with_xdata(ctx->fs, obj, ri->rec.entry, to_obj, ri->rec.newloc, dict) == -1) {
+                        GF_ASSERT(0);
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "Failed to Doing rename for file %s to file %s \n",
+                                       ri->rec.entry, ri->rec.newloc);
+                        goto out;
+                }
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Finsihed doing renam for file %s to file %s \n",
+                               ri->rec.entry, ri->rec.newloc);
+
+
+        } else if ((ri->rec.op == GF_FOP_SETATTR) || (ri->rec.op == GF_FOP_FSETATTR)) {
+
+                struct iatt iatt = {0, };
+                int valid = 0;
+                int ret = -1;
+
+                // TBD - do the actual settings once we do that
+                // right now we just set the mode so that changelog gets filled
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Doing attr for file %s \n",
+                               ri->rec.gfid);
+
+                if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL)
+                        goto out;
+
+                if (obj->inode->ia_type == IA_IFDIR)
+                        fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict);
+                else
+                        fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict);
+                if (fd == NULL) {
+                        GF_ASSERT(0);
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "open for file %s failed\n",
+                                       ri->rec.gfid);
+                        goto out;
+                }
+
+                // Octal, matching the 0777 used by the create paths above;
+                // decimal 777 would yield unrelated permission bits.
+                iatt.ia_prot = ia_prot_from_st_mode(0777);
+                valid = GF_SET_ATTR_MODE;
+
+
+                // Set one special dict flag to indicate the opcode so that
+                // the opcode gets set to this
+                if (dict_set_int32(dict,"recon-attr-opcode",ri->rec.op)) {
+                        GF_ASSERT(0);
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "setting opcode to %d failed\n",ri->rec.op);
+                        goto out;
+                }
+
+                ret = glfs_fsetattr_with_xdata(fd, &iatt, valid, dict);
+                if (ret == -1) {
+                        GF_ASSERT(0);
+                        nsr_worker_log(this->name, GF_LOG_ERROR,
+                                       "failed Doing attr for file %s \n",
+                                       ri->rec.gfid);
+                        goto out;
+                }
+
+                glfs_close_with_xdata(fd, dict);
+                fd = NULL;
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "Doing attr for file %s \n",
+                               ri->rec.gfid);
+
+        }
+
+out:
+        // Common exit: release whatever is still open, descriptor first and
+        // then the object handles.
+        if (fd != NULL)
+                glfs_close_with_xdata(fd, dict);
+        if (to_obj != NULL)
+                glfs_h_close(to_obj);
+        if (obj != NULL)
+                glfs_h_close(obj);
+        return;
+}
+
+/*
+ * Tell whether a record's opcode is one of those that require reading from
+ * the source: WRITE, FSETATTR, SETATTR, FREMOVEXATTR, SETXATTR, FSETXATTR
+ * and SYMLINK.
+ *
+ * NOTE(review): REMOVEXATTR is not in this list although FREMOVEXATTR is;
+ * confirm that this is intended.
+ */
+static gf_boolean_t
+recon_check_changelog(nsr_recon_record_details_t *rd)
+{
+        switch (rd->op) {
+        case GF_FOP_WRITE:
+        case GF_FOP_FSETATTR:
+        case GF_FOP_SETATTR:
+        case GF_FOP_FREMOVEXATTR:
+        case GF_FOP_SETXATTR:
+        case GF_FOP_FSETXATTR:
+        case GF_FOP_SYMLINK:
+                return _gf_true;
+        default:
+                return _gf_false;
+        }
+}
+
+// TBD - placeholder: currently answers _gf_false for every record and does
+// not look at rd.
+static gf_boolean_t
+recon_compute_undo(nsr_recon_record_details_t *rd)
+{
+        return _gf_false;
+}
+
+
+/*
+ * Build the xdata dictionary that accompanies every brick call made on
+ * behalf of a record: it carries the record's term and index.
+ *
+ * Returns a dictionary the caller must dict_unref(), or NULL on failure
+ * (already logged).
+ */
+static dict_t *
+recon_record_xdata(nsr_per_node_worker_t *ctx,
+                   nsr_reconciliation_record_t *ri)
+{
+        dict_t *dict = dict_new ();
+
+        if (!dict) {
+                GF_ASSERT(0);
+                nsr_worker_log(this->name, GF_LOG_ERROR,
+                               "failed allocating for dictionary\n");
+                return NULL;
+        }
+        if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+                GF_ASSERT(0);
+                nsr_worker_log(this->name, GF_LOG_ERROR,
+                               "error setting term in dict\n");
+                goto fail;
+        }
+        if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+                GF_ASSERT(0);
+                nsr_worker_log(this->name, GF_LOG_ERROR,
+                               "error setting index in dict\n");
+                goto fail;
+        }
+        return dict;
+fail:
+        dict_unref (dict);
+        return NULL;
+}
+
+/*
+ * Function that talks to the brick for data transfer.
+ *
+ * Handles one work item according to work->req_id:
+ *   INI / FINI - start or end work on this worker.
+ *   SINGLE_RECONCILIATION_READ - for WRITE records, read the recorded byte
+ *       range into ri->work.data; for xattr records, collect the xattrs into
+ *       ri->work.data and their count into ri->work.num. Other opcodes need
+ *       no read.
+ *   SINGLE_RECONCILIATION_COMMIT - apply the record through apply_record().
+ *   SINGLE_RECONCILIATION_FLUSH - fsync with the record's term and index.
+ * The record is always taken from workers[0].recon_info of the driver, at
+ * position work->index - first_index. Each case owns its dictionary, file
+ * descriptor and object handle and releases them before leaving.
+ *
+ * Input arguments:
+ * ctx - worker context
+ * work - pointer to work object
+ */
+static void
+data_worker_func(nsr_per_node_worker_t *ctx,
+                 nsr_recon_work_t *work)
+{
+        nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+        nsr_reconciliation_record_t *ri = NULL;
+        nsr_recon_record_details_t *rd = NULL;
+        glfs_fd_t *fd = NULL;
+        struct glfs_object *obj = NULL;
+        dict_t *dict = NULL;
+        char *t_b = NULL;
+        int wip = 0;
+
+        switch (work->req_id){
+        case NSR_WORK_ID_INI:
+        {
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "started data ini \n");
+
+                nsr_recon_start_work(ctx, _gf_false);
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "finished data ini \n");
+                break;
+        }
+        case NSR_WORK_ID_FINI:
+        {
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "started data fini \n");
+
+                nsr_recon_end_work(ctx, _gf_false);
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "finished data fini \n");
+                break;
+        }
+        case NSR_WORK_ID_SINGLE_RECONCILIATION_READ:
+        {
+                // first_index always starts with 1 but records starts at 0.
+                wip = work->index - (dr->workers[0].recon_info->first_index);
+                ri = &(dr->workers[0].recon_info->records[wip]);
+                rd = &(ri->rec);
+
+                dict = recon_record_xdata(ctx, ri);
+                if (!dict)
+                        goto read_out;
+
+                if (rd->op == GF_FOP_WRITE) {
+
+                        // record already copied.
+                        // copy data to this node's info.
+                        uuid_t gfid;
+
+                        uuid_parse(ri->rec.gfid, gfid);
+
+                        nsr_worker_log(this->name, GF_LOG_INFO,
+                                       "started recon read for file %s at offset %d at len %d\n",
+                                       ri->rec.gfid, rd->offset, rd->len);
+
+                        obj = glfs_h_create_from_handle(ctx->fs, gfid, GFAPI_HANDLE_LENGTH, NULL);
+                        if (obj == NULL) {
+                                GF_ASSERT(obj != NULL);
+                                nsr_worker_log(this->name, GF_LOG_ERROR,
+                                               "creating of handle failed\n");
+                                goto read_out;
+                        }
+
+                        // The file has probably got deleted.
+                        fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDONLY, dict);
+                        if (fd == NULL) {
+                                GF_ASSERT(fd != NULL);
+                                nsr_worker_log(this->name, GF_LOG_ERROR,
+                                               "opening of file failed\n");
+                                goto read_out;
+                        }
+
+                        if (glfs_lseek_with_xdata(fd, rd->offset, SEEK_SET, dict) != rd->offset) {
+                                GF_ASSERT(0);
+                                nsr_worker_log(this->name, GF_LOG_ERROR,
+                                               "lseek of file failed to offset %d\n", rd->offset);
+                                goto read_out;
+                        }
+
+                        ri->work.data = GF_CALLOC(rd->len , sizeof(char), gf_mt_recon_private_t);
+                        if (ri->work.data == NULL) {
+                                nsr_worker_log(this->name, GF_LOG_ERROR,
+                                               "failed allocating read buffer of %d bytes\n", rd->len);
+                                goto read_out;
+                        }
+                        if (glfs_read_with_xdata(fd, ri->work.data, rd->len, 0, dict) != rd->len) {
+                                GF_ASSERT(0);
+                                nsr_worker_log(this->name, GF_LOG_ERROR,
+                                               "read of file failed to offset %d for bytes %d\n", rd->offset, rd->len);
+                                goto read_out;
+                        }
+
+                } else if ((rd->op == GF_FOP_FREMOVEXATTR) ||
+                           (rd->op == GF_FOP_REMOVEXATTR) ||
+                           (rd->op == GF_FOP_SETXATTR) ||
+                           (rd->op == GF_FOP_FSETXATTR)) {
+
+                        // This is for all the set attribute/extended attributes commands.
+                        // Get all the attributes from the source and fill it in the buffer
+                        // as a NULL separated key and value which are in turn separated by
+                        // NULL.
+                        uuid_t gfid;
+                        uint32_t k_s = 0, v_s = 0;
+                        uint32_t num=0;
+
+                        uuid_parse(ri->rec.gfid, gfid);
+
+                        nsr_worker_log(this->name, GF_LOG_INFO,
+                                       "doing getattr for gfid %s \n",
+                                       ri->rec.gfid);
+
+                        obj = glfs_h_create_from_handle(ctx->fs, gfid, GFAPI_HANDLE_LENGTH, NULL);
+                        if (obj == NULL) {
+                                GF_ASSERT(obj != NULL);
+                                nsr_worker_log(this->name, GF_LOG_ERROR,
+                                               "creating of handle failed\n");
+                                goto read_out;
+                        }
+
+                        if (obj->inode->ia_type == IA_IFDIR)
+                                fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict);
+                        else
+                                fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDONLY, dict);
+
+                        if (fd == NULL) {
+                                GF_ASSERT(fd != NULL);
+                                nsr_worker_log(this->name, GF_LOG_ERROR,
+                                               "opening of file failed\n");
+                                goto read_out;
+                        }
+
+                        if(get_xattr_total_size(fd, &t_b, &k_s, &v_s, &num, dict) == -1) {
+                                GF_ASSERT(0);
+                                nsr_worker_log(this->name, GF_LOG_ERROR,
+                                               "list of xattr of gfid %s failed\n", rd->gfid);
+                                goto read_out;
+                        }
+                        ri->work.data = GF_CALLOC((k_s + v_s) , sizeof(char), gf_mt_recon_private_t);
+                        if (ri->work.data == NULL) {
+                                nsr_worker_log(this->name, GF_LOG_ERROR,
+                                               "failed allocating xattr buffer\n");
+                                goto read_out;
+                        }
+                        get_xattr(fd, t_b, ri->work.data, v_s, num, dict);
+                        ri->work.num = num;
+                        nsr_worker_log(this->name, GF_LOG_INFO,
+                                       "finished getattr for gfid %s \n",
+                                       ri->rec.gfid);
+                }
+                // Nothing is read for the remaining opcodes (truncate,
+                // symlink, entry operations, rename).
+                // TBD - for SETATTR/FSETATTR fetch the actual attributes:
+                // mode, uid, gid, size, atime, mtime
+read_out:
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "finished recon read for gfid %s at offset %d for %d bytes \n",
+                               rd->gfid, rd->offset, rd->len);
+                // Release everything this case acquired, whichever path led here.
+                free(t_b);
+                if (fd != NULL)
+                        glfs_close_with_xdata(fd, dict);
+                if (obj != NULL)
+                        glfs_h_close(obj);
+                if (dict != NULL)
+                        dict_unref (dict);
+                break;
+        }
+        case NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT:
+        {
+                // first_index always starts with 1 but records starts at 0.
+                wip = work->index - (dr->workers[0].recon_info->first_index);
+                ri = &(dr->workers[0].recon_info->records[wip]);
+                rd = &(ri->rec);
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "got recon commit for index %d that has gfid %s \n",
+                               wip, rd->gfid);
+                dict = recon_record_xdata(ctx, ri);
+                if (dict != NULL) {
+                        apply_record(ctx, ri, dict);
+                        dict_unref (dict);
+                }
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "finished recon commit for gfid %s \n",
+                               rd->gfid);
+                break;
+        }
+        case NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH:
+        {
+                // The record must be looked up before its term and index can
+                // be placed in the dictionary.
+                wip = work->index - (dr->workers[0].recon_info->first_index);
+                ri = &(dr->workers[0].recon_info->records[wip]);
+                rd = &(ri->rec);
+
+                dict = recon_record_xdata(ctx, ri);
+                if (!dict)
+                        break;
+
+                //fd = glfs_open(ctx->fs, rd->gfid, O_RDONLY); //TBD - using gfid
+                // NOTE(review): fd is never opened on this path, so the call
+                // below still receives a NULL descriptor until the TBD above
+                // is implemented.
+                glfs_fsync_with_xdata(fd, dict);
+                dict_unref (dict);
+                break;
+        }
+        default:
+                break;
+        }
+        return;
+}
+
+/*
+ * Thread body of a data worker.
+ *
+ * After initialising the worker it loops forever: it sleeps on the worker's
+ * condition variable until the work list is non-empty, takes the first item
+ * off the list, processes it with data_worker_func() and decrements the
+ * driver's outstanding counter.
+ *
+ * The item is unlinked while the worker mutex is still held, because
+ * recon_queue_to_worker() appends to the same list under that mutex.
+ *
+ * NOTE(review): the work item is not freed here; confirm who owns it after
+ * it has been processed.
+ */
+static void *
+data_worker_main(nsr_per_node_worker_t *ctx)
+{
+        nsr_recon_driver_ctx_t *dr = NULL;
+
+        nsr_worker_log(this->name, GF_LOG_INFO,
+                       "starting data worker func\n");
+        init_worker(ctx, 0);
+
+        while(1) {
+                nsr_recon_work_t *work = NULL;
+
+                dr = ctx->driver_ctx;
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "waiting for work\n");
+
+                pthread_mutex_lock(&(ctx->mutex));
+                while (list_empty(&(ctx->head.list))) {
+                        pthread_cond_wait(&(ctx->cv), &(ctx->mutex));
+                }
+                // Dequeue under the lock so a concurrent enqueue cannot see
+                // a half-updated list.
+                work = list_entry(ctx->head.list.next, nsr_recon_work_t, list);
+                nsr_worker_log(this->name, GF_LOG_INFO,"deleting work item\n");
+                list_del_init (&work->list);
+                nsr_worker_log(this->name, GF_LOG_INFO,"finished deleting work item\n");
+                pthread_mutex_unlock(&(ctx->mutex));
+
+                nsr_worker_log(this->name, GF_LOG_INFO,
+                               "got work with id %d\n",work->req_id);
+                work->in_use = _gf_false;
+                data_worker_func(ctx, work);
+                atomic_dec(&(dr->outstanding));
+        }
+
+        return NULL;
+}
+
+
+/*
+ * Allocate a work item and hand it back through *work.
+ *
+ * The item is zero-allocated, tagged with the request id and index, marked
+ * in use and given an initialised (empty) list node.
+ *
+ * TBD - change this to get the item from a static pool. The allocation is
+ * assumed not to fail; its result is not checked.
+ */
+static void
+recon_make_work(nsr_recon_work_t **work,
+                nsr_recon_work_req_id_t req_id,
+                int32_t i)
+{
+        nsr_recon_work_t *item = GF_CALLOC (1, sizeof (nsr_recon_work_t),
+                                            gf_mt_recon_private_t);
+
+        *work = item;
+        item->in_use = _gf_true;
+        item->index = i;
+        item->req_id = req_id;
+        INIT_LIST_HEAD(&(item->list));
+}
+
+/*
+ * Hand a work item to one worker thread.
+ *
+ * Input arguments:
+ * ctx - driver context holding the per-replica worker table
+ * work - item to queue
+ * id - index of the replica whose worker should receive the item
+ * type - NSR_RECON_QUEUE_TO_CONTROL selects the control worker; any other
+ *        value selects the data worker
+ *
+ * The item is appended to the worker's list and the worker's condition
+ * variable is signalled, both under the worker's mutex.
+ */
+static void
+recon_queue_to_worker(nsr_recon_driver_ctx_t *ctx,
+                      nsr_recon_work_t *work,
+                      unsigned int id,
+                      nsr_recon_queue_type_t type)
+{
+        nsr_per_node_worker_t *target = NULL;
+
+        if (type != NSR_RECON_QUEUE_TO_CONTROL) {
+                target = ctx->workers[id].data_worker;
+                nsr_driver_log(this->name, GF_LOG_INFO,
+                               "queueing work to data index %d\n",id);
+        } else {
+                target = ctx->workers[id].control_worker;
+                nsr_driver_log(this->name, GF_LOG_INFO,
+                               "queueing work to control index %d\n",id);
+        }
+
+        pthread_mutex_lock(&target->mutex);
+        list_add_tail(&work->list, &target->head.list);
+        pthread_cond_signal(&target->cv);
+        pthread_mutex_unlock(&target->mutex);
+}
+
+// Thread entry point type; create_worker_threads() passes a value of this
+// type to pthread_create().
+typedef void * (*F_t) (void *);
+
+/*
+ * Initialise an array of per-node workers and, when the driver runs in
+ * NSR_USE_THREADS mode, start one thread per worker.
+ *
+ * Input arguments:
+ * priv - xlator private data, used for logging
+ * ctx - driver context; stored in every worker
+ * w - first element of an array of num workers
+ * control_or_data - not used by this function
+ * f - thread entry point, called with the worker as its argument
+ * num - number of workers in the array
+ *
+ * Each worker gets an id string "recon_<n>", the driver context and its
+ * index. All of these are set before the thread is created so that the
+ * thread never observes a partially initialised worker.
+ *
+ * Returns _gf_true on success, _gf_false if an allocation or a thread
+ * creation failed.
+ */
+static gf_boolean_t
+create_worker_threads(nsr_recon_private_t *priv,
+                      nsr_recon_driver_ctx_t *ctx,
+                      nsr_per_node_worker_t *w,
+                      gf_boolean_t control_or_data,
+                      F_t f,
+                      uint32_t num)
+{
+        // Room for the prefix plus the largest 32-bit index and the NUL.
+        const size_t id_len = sizeof("recon_4294967295");
+        uint32_t i;
+        nsr_per_node_worker_t *worker = w;
+
+
+        for (i=0; i < num; i++) {
+                worker->id = GF_CALLOC(1, id_len, gf_mt_recon_private_t);
+                if (!worker->id) {
+                        nsr_driver_log (priv->this->name, GF_LOG_ERROR, "memory allocation error \n");
+                        return _gf_false;
+                }
+                snprintf(worker->id, id_len, "recon_%u", (unsigned int) i);
+                worker->driver_ctx = ctx ;
+                worker->index = i;
+
+                if (ctx->mode == NSR_USE_THREADS) {
+                        if (pthread_create(&worker->thread_id, NULL, f, worker)) {
+                                nsr_driver_log (ctx->this->name, GF_LOG_ERROR, "control work thread creation error \n");
+                                return _gf_false;
+                        }
+                }
+                worker++;
+        }
+        return _gf_true;
+}
+
+/*
+ * Dispatch one request to a set of workers and return once it is done.
+ *
+ * In NSR_SEQ mode the worker function is called directly for every selected
+ * node. Otherwise a work item is queued to each selected worker thread, the
+ * driver's outstanding counter is incremented per item, and the caller spins
+ * (yielding) until the counter drops back to zero.
+ *
+ * A node is selected when its bit is set in bm and it is marked in use.
+ *
+ * Input arguments:
+ * bm - bitmap containing indices of nodes we want to send work
+ * num - number of indices to examine
+ * ctx - driver context from where we derive per worker context
+ * id - request ID
+ * q - control or data
+ * misc - overloaded value stored as the work item's index
+ */
+static void
+send_and_wait(int32_t bm,
+              uint32_t num,
+              nsr_recon_driver_ctx_t *ctx,
+              nsr_recon_work_req_id_t id,
+              nsr_recon_queue_type_t q,
+              int32_t misc)
+{
+        const gf_boolean_t sequential = (ctx->mode == NSR_SEQ) ? _gf_true : _gf_false;
+        nsr_recon_work_t *work;
+        uint32_t node = 0;
+
+        for (node = 0; node < num; node++) {
+                if (!(bm & (1 << node)) || !ctx->workers[node].in_use)
+                        continue;
+
+                recon_make_work(&work, id, misc);
+
+                if (!sequential) {
+                        atomic_inc(&(ctx->outstanding));
+                        recon_queue_to_worker(ctx, work, node, q);
+                } else if (q != NSR_RECON_QUEUE_TO_CONTROL) {
+                        data_worker_func(ctx->workers[node].data_worker, work);
+                } else if (node == 0) {
+                        control_worker_func_0(ctx->workers[0].control_worker, work);
+                } else {
+                        control_worker_func(ctx->workers[node].control_worker, work);
+                }
+        }
+
+        if (!sequential) {
+                nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: waiting\n");
+                while (ctx->outstanding) {
+                        pthread_yield();
+                }
+        }
+        nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned\n");
+        return;
+}
+
+#if 0
+// Compiled out. Variant of send_and_wait() that queues a work item to every
+// selected, in-use worker but neither touches the outstanding counter nor
+// waits for the workers to finish.
+static void
+send_and_do_not_wait(int32_t bm,
+ uint32_t num,
+ nsr_recon_driver_ctx_t *ctx,
+ nsr_recon_work_req_id_t id,
+ nsr_recon_queue_type_t q,
+ int32_t misc)
+{
+ uint32_t i = 0;
+
+ for (i=0; i < num; i++) {
+ if ((bm & (1 << i)) && ctx->workers[i].in_use) {
+ nsr_recon_work_t *work;
+ recon_make_work(&work, id, misc);
+ recon_queue_to_worker(ctx, work, i, q);
+ }
+ }
+
+ return;
+}
+#endif
+
+/*
+ * Switch worker i between "in use" and "not in use", sending INI or FINI to
+ * its control and data workers when the state actually changes.
+ *
+ * Input arguments:
+ * ctx - driver context
+ * i - index of the worker in ctx->workers
+ * in_use - _gf_true to activate the worker, _gf_false to deactivate it
+ *
+ * send_and_wait() only dispatches to workers whose in_use flag is set.
+ * For deactivation the flag therefore stays set until both FINI requests
+ * have been delivered, and is cleared afterwards; clearing it first would
+ * make send_and_wait() skip the worker, so FINI would never arrive.
+ */
+static void
+nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
+                 uint32_t i,
+                 gf_boolean_t in_use)
+{
+        uint32_t bm = 1 << i;
+        nsr_recon_work_req_id_t req_id;
+
+        if (in_use == _gf_false) {
+                if (ctx->workers[i].in_use != _gf_true) {
+                        // Already inactive: nothing to tell the worker.
+                        ctx->workers[i].in_use = _gf_false;
+                        return;
+                }
+                nsr_driver_log(this->name, GF_LOG_INFO, "sending FINI to index %d\n",i);
+                req_id = NSR_WORK_ID_FINI;
+        } else {
+                if (ctx->workers[i].in_use == _gf_true) {
+                        // Already active: nothing to tell the worker.
+                        return;
+                }
+                ctx->workers[i].in_use = _gf_true;
+                nsr_driver_log(this->name, GF_LOG_INFO, "sending INI to index %d\n",i);
+                req_id = NSR_WORK_ID_INI;
+        }
+
+        send_and_wait(bm, ctx->replica_group_size, ctx,
+                      req_id, NSR_RECON_QUEUE_TO_CONTROL, -1);
+        send_and_wait(bm, ctx->replica_group_size, ctx,
+                      req_id, NSR_RECON_QUEUE_TO_DATA, -1);
+
+        if (in_use == _gf_false) {
+                // FINI has been delivered; the worker is now really idle.
+                ctx->workers[i].in_use = _gf_false;
+        }
+}
+
+/*
+ * Main reconciliation driver thread.
+ *
+ * arg is the nsr_recon_private_t of this translator instance.  The
+ * thread allocates the driver context (published through
+ * priv->driver_thread_context), creates one control and one data worker
+ * per replica, and then loops forever: it sleeps on ctx->cv until
+ * ctx->state becomes non-zero, performs the work for that role (leader,
+ * reconciliator or resolutor), calls nsr_recon_return_back and resets
+ * ctx->state to 0.
+ *
+ * Returns NULL, and only when set-up fails; the main loop has no exit.
+ *
+ * When the leader must itself act as reconciliator or resolutor it
+ * saves a setjmp context in ctx->env and jumps into the matching
+ * section below; that section longjmps back when it is done.
+ */
+void *
+nsr_reconciliation_driver(void *arg)
+{
+ nsr_recon_private_t *priv = (nsr_recon_private_t *) arg;
+ uint32_t replica_group_size = priv->replica_group_size;
+ uint32_t i;
+ nsr_per_node_worker_t *control_s, *data_s;
+ nsr_recon_driver_ctx_t **driver_ctx, *ctx;
+ int32_t bm;
+ xlator_t *this = priv->this;
+
+ driver_ctx = &priv->driver_thread_context;
+ (*driver_ctx) = GF_CALLOC (1,
+ sizeof (nsr_recon_driver_ctx_t),
+ gf_mt_recon_private_t);
+ // Test the allocation itself; driver_ctx is the address of a field
+ // of priv and can never be NULL.
+ if (!(*driver_ctx)) {
+ gf_log (this->name, GF_LOG_ERROR, "memory allocation error \n");
+ return NULL;
+ }
+ ctx = *driver_ctx;
+ ctx->this = priv->this;
+ ctx->replica_group_size = replica_group_size;
+ if ((pthread_mutex_init(&(ctx->mutex), NULL)) ||
+ (pthread_cond_init(&(ctx->cv), NULL))){
+ nsr_driver_log (this->name, GF_LOG_ERROR, "mutex init error \n");
+ return NULL;
+ }
+
+ ctx->workers = GF_CALLOC (replica_group_size,
+ sizeof(nsr_replica_worker_t),
+ gf_mt_recon_private_t);
+ if (!ctx->workers) {
+ nsr_driver_log (this->name, GF_LOG_ERROR, "memory allocation error \n");
+ return NULL;
+ }
+ // NOTE(review): unbounded copy into the fixed-size name buffer of
+ // nsr_replica_worker_t; confirm member names always fit.
+ for (i=0; i < replica_group_size; i++) {
+ strcpy(ctx->workers[i].name, priv->replica_group_members[i]);
+ }
+
+ control_s = GF_CALLOC (replica_group_size,
+ sizeof(nsr_per_node_worker_t),
+ gf_mt_recon_private_t);
+ if (!control_s) {
+ nsr_driver_log (this->name, GF_LOG_ERROR, "memory allocation error \n");
+ return NULL;
+ }
+
+ data_s = GF_CALLOC (replica_group_size,
+ sizeof(nsr_per_node_worker_t),
+ gf_mt_recon_private_t);
+ if (!data_s) {
+ nsr_driver_log (this->name, GF_LOG_ERROR, "memory allocation error \n");
+ return NULL;
+ }
+ for (i=0; i < replica_group_size; i++) {
+ ctx->workers[i].control_worker = &control_s[i];
+ ctx->workers[i].data_worker = &data_s[i];
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO, "creating threads \n");
+ // Create the worker threads
+ // For every brick including itself there will be 2 worker threads:
+ // one for data and one for control
+ if (!create_worker_threads(priv, ctx, control_s, _gf_true,
+ (F_t) control_worker_main, replica_group_size) ||
+ !create_worker_threads(priv, ctx, data_s, _gf_false,
+ (F_t) data_worker_main, replica_group_size)) {
+ return NULL;
+ }
+
+ for (i=0; i < replica_group_size; i++) {
+ nsr_recon_get_file(priv->volname, &(ctx->workers[i]));
+ }
+
+ while (1) {
+
+ nsr_driver_log (this->name, GF_LOG_INFO, "waiting for state change \n");
+ pthread_mutex_lock(&(ctx->mutex));
+ while ((*driver_ctx)->state == 0) {
+ pthread_cond_wait(&(ctx->cv), &(ctx->mutex));
+ }
+ pthread_mutex_unlock(&(ctx->mutex));
+
+ nsr_driver_log (this->name, GF_LOG_INFO, " state changed to %d \n", ctx->state);
+#if 0
+ for (i=0; i < replica_group_size; i++) {
+ if (ctx->workers[i].in_use) {
+ nsr_recon_start_work(ctx->workers[i].control_worker, _gf_true);
+ nsr_recon_start_work(ctx->workers[i].data_worker, _gf_false);
+ }
+ }
+#endif
+
+ if (ctx->state == leader) {
+
+ int32_t chosen = -1;
+ int32_t last_term = -1, last_ops = -1;
+
+ nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n");
+ // Get last term info from all members for this group
+ send_and_wait(-1,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_GET_LAST_TERM_INFO,
+ NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term);
+
+
+ // compare all the info received and choose the reconciliator
+ // First find the latest term among the in-use members
+ for (i=0; i < replica_group_size; i++) {
+ if (ctx->workers[i].in_use) {
+ if (ctx->workers[i].recon_info->last_term > last_term) {
+ last_term = ctx->workers[i].recon_info->last_term;
+ }
+ }
+ }
+ // Then, among those with the latest term, find the highest ops
+ for (i=0; i < replica_group_size; i++) {
+ if ((ctx->workers[i].in_use) && (last_term == ctx->workers[i].recon_info->last_term)) {
+ if (ctx->workers[i].recon_info->commited_ops > last_ops) {
+ last_ops = ctx->workers[i].recon_info->commited_ops;
+ }
+ }
+ }
+ // choose the first among the lot
+ for (i=0; i < replica_group_size; i++) {
+ if ((ctx->workers[i].in_use) &&
+ (last_term == ctx->workers[i].recon_info->last_term) &&
+ (last_ops == ctx->workers[i].recon_info->commited_ops)) {
+ chosen = i;
+ break;
+ }
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO, "reconciliator chosen is %d\n", chosen);
+ ctx->reconciliator_index = chosen;
+ GF_ASSERT(chosen != -1);
+ if (chosen == -1) {
+ nsr_driver_log (this->name, GF_LOG_INFO, "no reconciliatior chosen\n");
+ goto out;
+ }
+
+ // send the message to reconciliator to do reconciliation with list of nodes that are part of this quorum
+ if (chosen != 0) {
+ nsr_driver_log (this->name, GF_LOG_INFO, "sending reconciliation work to %d\n", chosen);
+ bm = 1 << ctx->reconciliator_index;
+ send_and_wait(bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_RECONCILIATOR_DO_WORK,
+ NSR_RECON_QUEUE_TO_CONTROL, -1);
+ nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work to %d\n", chosen);
+ } else {
+ nsr_driver_log (this->name, GF_LOG_INFO, "local node is reconciliator. before set jmp\n");
+ ctx->env = calloc(1,sizeof(jmp_buf));
+ /*
+ * REVIEW
+ * Use of setjmp/longjmp in an environment
+ * where we already use ucontext is dangerous
+ * and therefore forbidden. Refactoring will
+ * also help with some of the rampant 80-column
+ * violations and indented code crawling across
+ * the screen, which together make this entire
+ * file almost unreadable.
+ */
+ if (!setjmp(*(ctx->env))) {
+ ctx->state = reconciliator;
+ goto i_am_reconciliator;
+ } else {
+ nsr_driver_log (this->name, GF_LOG_INFO, "long jmp return to leader\n");
+ free(ctx->env);
+ ctx->env = NULL;
+ ctx->state = leader;
+ }
+ }
+
+ // send message to all other nodes to sync up with the reconciliator including itself if required
+ // requires optimisation - TBD
+ if (chosen != 0) {
+ nsr_driver_log (this->name, GF_LOG_INFO, "local node resolution needs to be done. before set jmp\n");
+ ctx->env = calloc(1,sizeof(jmp_buf));
+ if (!setjmp(*(ctx->env))) {
+ ctx->state = resolutor;
+ goto i_am_resolutor;
+ } else {
+ nsr_driver_log (this->name, GF_LOG_INFO, "long jmp return to leader\n");
+ free(ctx->env);
+ ctx->env = NULL;
+ ctx->state = leader;
+ }
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this node and reconciliator\n");
+ // Bitwise OR: clear both the reconciliator's bit and bit 0 (this
+ // node).  A logical OR here would collapse to 1 and leave the
+ // reconciliator selected.
+ bm = ~((1 << ctx->reconciliator_index) | 1);
+ send_and_wait(bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_RESOLUTION_DO_WORK,
+ NSR_RECON_QUEUE_TO_CONTROL, -1);
+
+ nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work as leader \n");
+
+ }
+i_am_reconciliator:
+ if (ctx->state == reconciliator) {
+ gf_boolean_t do_recon = _gf_false;
+ uint32_t start_index = ctx->workers[0].recon_info->first_index;
+ uint32_t end_index = ctx->workers[0].recon_info->last_index;
+ uint32_t num = ((start_index == 0) && (end_index == 0)) ? 0 : (end_index - start_index + 1);
+
+ nsr_driver_log (this->name, GF_LOG_INFO, "starting reconciliation work as reconciliator \n");
+
+ // nothing to be done? signal back to the recon translator that this phase done.
+ // bm starts with this node (bit 0) and accumulates every in-use
+ // node that saw the same last term.
+ bm = 1;
+ for (i=1; i < replica_group_size; i++) {
+ if (ctx->workers[i].in_use &&
+ (ctx->workers[0].recon_info->last_term == ctx->workers[i].recon_info->last_term)) {
+ ctx->workers[i].recon_info->last_index = end_index;
+ ctx->workers[i].recon_info->first_index = start_index;
+ bm |= (1 << i);
+ do_recon = _gf_true;
+ }
+ }
+
+ if (!do_recon || !num) {
+ nsr_driver_log (this->name, GF_LOG_INFO, "nothing needs to be done as reconciliator \n");
+ if (ctx->env) {
+ nsr_driver_log (this->name, GF_LOG_INFO, "before longjmp \n");
+ longjmp(*(ctx->env), 1);
+ } else {
+ goto out;
+ }
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "getting reconciliation window for term %d from %dto %d \n",
+ ctx->workers[0].recon_info->last_term,
+ start_index, end_index);
+ // We have set the bm in the above for loop where
+ // we go thru all nodes including this node that
+ // have seen the last term.
+ send_and_wait(bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_GET_RECONCILATION_WINDOW,
+ NSR_RECON_QUEUE_TO_CONTROL, -1);
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished getting reconciliation window for term %d from %dto %d \n",
+ ctx->workers[0].recon_info->last_term,
+ start_index, end_index);
+
+
+ // from the changelogs, calculate the entries
+ // that need action and the source for each of these entries
+ compute_reconciliation_work(ctx);
+
+ // for each of the entries that need fixup, issue IO
+ for (i=start_index; i < (start_index + num); i++) {
+ nsr_reconciliator_info_t *my_recon_info =
+ ctx->workers[0].recon_info;
+ nsr_reconciliation_record_t *record =
+ &(my_recon_info->records[i - start_index]);
+
+ record->work.term = ctx->workers[0].recon_info->last_term;
+ record->work.index = i;
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "fixing index %d\n",i);
+ if ((record->work.type == NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE) ||
+ (record->work.type == NSR_RECON_WORK_HOLE_TO_FILL)) {
+ // 1st case (RECON_WORK_HOLE_TO_PSEUDO_HOLE):
+ // If there are only pseudo_holes in others, it is best effort.
+ // Just pick from the first node that has it and proceed.
+ // 2nd case (RECON_WORK_HOLE_TO_FILL):
+ // this node has either a HOLE or PSEUDO_HOLE and some one else has a FILL(source).
+ // analyse the changelog to check if data needs to be read or if the log has all the data required
+
+ if (recon_check_changelog(&record->rec)) {
+ bm = (1 << record->work.source);
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "reading data from source %d\n",record->work.source);
+ send_and_wait(bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_SINGLE_RECONCILIATION_READ,
+ NSR_RECON_QUEUE_TO_DATA,
+ i);
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "got data from source %d\n",record->work.source);
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "fixing local data as part of reconciliation\n");
+
+ bm = 1;
+ send_and_wait(bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT,
+ NSR_RECON_QUEUE_TO_DATA,
+ i);
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished fixing local data as part of reconciliation\n");
+
+ } else if (record->work.type == NSR_RECON_WORK_COMPARE_PSEUDO_HOLE) {
+ // this node has a pseudo_hole and some others have just that too. Just convert this to FILL.
+ // let others blindly pick it from here.
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "fixing this record as a fill\n");
+ bm = 1;
+ send_and_wait(bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH,
+ NSR_RECON_QUEUE_TO_DATA,
+ i);
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished fixing this record as a fill\n");
+ }
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work as reconciliator \n");
+
+ if (ctx->env) {
+ nsr_driver_log (this->name, GF_LOG_INFO, "before longjmp \n");
+ longjmp(*(ctx->env), 1);
+ }
+
+ // tbd - mark this term golden in the reconciliator
+
+ }
+i_am_resolutor:
+ if (ctx->state == resolutor) {
+
+ // This node's last term is filled when it gets a message
+ // from the leader to act as a reconciliator.
+ uint32_t recon_index = ctx->reconciliator_index;
+ nsr_reconciliator_info_t *my_info =
+ ctx->workers[0].recon_info;
+ nsr_reconciliator_info_t *his_info =
+ ctx->workers[recon_index].recon_info;
+ uint32_t my_last_term = my_info->last_term;
+ uint32_t to_do_term = his_info->last_term;
+ uint32_t my_start_index = 1, my_end_index = 1;
+ uint32_t his_start_index = 1, his_end_index = 1;
+ uint32_t num = 0;
+ gf_boolean_t fl = _gf_true;
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "starting resolutor work with reconciliator as %d from term %d to term %d \n",
+ recon_index, my_last_term, to_do_term);
+
+ // One pass per term, from this node's last term up to the
+ // reconciliator's last term; fl is true only on the first pass.
+ do {
+
+ if (!fl) {
+ (his_info->last_term)++;
+ (my_info->last_term)++;
+ } else {
+ his_info->last_term = my_last_term;
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO, "resolving term %d \n", my_info->last_term);
+
+ // Get reconciliator's term information for that term
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "getting info from reconciliator for term %d \n", my_info->last_term);
+ bm = (1 << recon_index);
+ send_and_wait(bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_GET_GIVEN_TERM_INFO,
+ NSR_RECON_QUEUE_TO_CONTROL, his_info->last_term);
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished getting info from reconciliator for term %d \n", my_info->last_term);
+
+
+ // empty term
+ if (!his_info->commited_ops) {
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "reconciliator for term %d is empty. moving to next term. \n", my_info->last_term);
+ // TBD - mark the term golden
+ fl = _gf_false;
+ continue;
+ }
+
+ // calculate the resolution window boundary.
+ // for the last term this node saw, we compare the resolution window of this and reconciliator.
+ // for the rest of the nodes, we just accept the reconciliator info.
+ if (fl) {
+ my_start_index = my_info->first_index;
+ my_end_index = my_info->last_index;
+ his_start_index = his_info->first_index;
+ his_end_index = his_info->last_index;
+ my_info->first_index = (my_start_index < his_start_index) ? my_start_index : his_start_index;
+ my_info->last_index = (my_end_index > his_end_index) ? my_end_index : his_end_index;
+ } else {
+ my_info->first_index = his_info->first_index;
+ my_info->last_index = his_info->last_index;
+ my_info->commited_ops = his_info->commited_ops;
+ }
+ if (my_info->first_index == 0)
+ my_info->first_index = 1;
+ num = (my_info->last_index - my_info->first_index) + 1;
+
+
+ // Get the logs from the reconciliator (and this node for this term)
+ if (fl)
+ bm = ((1 << recon_index) | 1);
+ else
+ bm = (1 << recon_index);
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "getting reconciliation window for term %d from %d to %d \n",
+ my_info->last_term,
+ my_info->first_index, my_info->last_index);
+ send_and_wait(bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_GET_RECONCILATION_WINDOW,
+ NSR_RECON_QUEUE_TO_CONTROL, -1);
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished getting reconciliation window for term %d from %d to %d \n",
+ my_info->last_term,
+ my_info->first_index, my_info->last_index);
+
+ // from the changelogs, calculate the entries that need action
+ compute_resolution_work(ctx, my_info, his_info, !fl);
+
+
+ // for each of the entries that need fixup, issue IO
+ for (i=my_info->first_index; i < (my_info->first_index + num); i++) {
+ nsr_reconciliation_record_t *record =
+ &(my_info->records[i - my_info->first_index]);
+
+ record->work.term = my_info->last_term;
+ record->work.index = i;
+
+ nsr_driver_log (this->name, GF_LOG_INFO, "fixing index %d\n",i);
+ if ((record->work.type == NSR_RECON_WORK_HOLE_TO_FILL) ||
+ (record->work.type == NSR_RECON_WORK_UNDO_FILL)) {
+ if (((record->work.type == NSR_RECON_WORK_HOLE_TO_FILL) &&
+ recon_check_changelog(&record->rec)) ||
+ ((record->work.type == NSR_RECON_WORK_UNDO_FILL) &&
+ recon_compute_undo(&record->rec))) {
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "reading data from source %d\n",recon_index);
+ bm = (1 << recon_index);
+ send_and_wait(bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_SINGLE_RECONCILIATION_READ,
+ NSR_RECON_QUEUE_TO_DATA,
+ i);
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished reading data from source %d\n",recon_index);
+ }
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "fixing local data as part of resolutor\n");
+
+ bm = 1;
+ send_and_wait(bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT,
+ NSR_RECON_QUEUE_TO_DATA,
+ i);
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished fixing local data as part of resolutor\n");
+ }
+ }
+ fl = _gf_false;
+
+ // tbd - mark this term golden in the reconciliator
+ } while (my_last_term++ != to_do_term);
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished resolutor work \n");
+
+ if (ctx->env) {
+ nsr_driver_log (this->name, GF_LOG_INFO, "before longjmp \n");
+ longjmp(*(ctx->env), 1);
+ }
+
+ }
+
+ // free the associated recon_info contexts created as part of this role
+
+out:
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "sending end of reconciliation message \n");
+ nsr_recon_return_back(priv, ctx->txn_id);
+#if 0
+ // send message that job is done by writing to local recon translator
+ bm = 1;
+ send_and_wait(bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_END_RECONCILIATION,
+ NSR_RECON_QUEUE_TO_CONTROL, -1);
+#endif
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished sending end of reconciliation message \n");
+ ctx->state = 0;
+ }
+
+ return NULL;
+}
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.h b/xlators/cluster/nsr-recon/src/recon_driver.h
new file mode 100644
index 000000000..67f4d6014
--- /dev/null
+++ b/xlators/cluster/nsr-recon/src/recon_driver.h
@@ -0,0 +1,308 @@
+/*
+ Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __RECON_DRIVER_H__
+#define __RECON_DRIVER_H__
+
+
+#include "api/src/glfs.h"
+#include <setjmp.h>
+
+#define MAX_HOSTNAME_LEN 32
+#define MAXIMUM_REPLICA_STRENGTH 8
+#define MAX_RECONCILIATION_WINDOW_SIZE 10000
+
+#define GLUSTERD_DEFAULT_WORKDIR "/var/lib/glusterd"
+#define GLUSTERD_VOLUME_DIR_PREFIX "vols"
+#define GLUSTERD_BRICK_INFO_DIR "bricks"
+
+/*
+ * Even with the names fixed, the non-NSR_DEBUG definitions of nsr_*_log don't
+ * work because many callers don't have "this" defined.
+ *
+ * TBD: use gf_log, fix "this" problem, eliminate extra fields and newlines.
+ */
+#define NSR_DEBUG
+
+/*
+ * Request ids carried by work items queued to the worker threads.
+ * Values are consecutive, starting at 0.
+ */
+typedef enum nsr_recon_work_req_id_t {
+        NSR_WORK_ID_GET_NONE = 0,
+        NSR_WORK_ID_GET_LAST_TERM_INFO,           /* 1 */
+        NSR_WORK_ID_GET_GIVEN_TERM_INFO,          /* 2 */
+        NSR_WORK_ID_RECONCILIATOR_DO_WORK,        /* 3 */
+        NSR_WORK_ID_RESOLUTION_DO_WORK,           /* 4 */
+        NSR_WORK_ID_GET_RECONCILATION_WINDOW,     /* 5 */
+        NSR_WORK_ID_SINGLE_RECONCILIATION_READ,   /* 6 */
+        NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT, /* 7 */
+        NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH,  /* 8 */
+        NSR_WORK_ID_GET_RESOLUTION_WINDOW,        /* 9 */
+        NSR_WORK_ID_END_RECONCILIATION,           /* 10 */
+        NSR_WORK_ID_INI,                          /* 11 */
+        NSR_WORK_ID_FINI                          /* 12 */
+} nsr_recon_work_req_id_t;
+
+/* Selects which of a replica's two workers a work item is queued to. */
+typedef enum nsr_recon_queue_type_t {
+        NSR_RECON_QUEUE_TO_CONTROL = 0,
+        NSR_RECON_QUEUE_TO_DATA,        /* 1 */
+} nsr_recon_queue_type_t;
+
+/*
+ * State of a single log entry.  The values are bit patterns (0, 1, 3)
+ * and are unchanged; they are written in hex because "0b" binary
+ * literals are a compiler extension before C23.
+ */
+typedef enum nsr_log_type_t {
+        NSR_LOG_HOLE        = 0x0,
+        NSR_LOG_PSEUDO_HOLE = 0x1,
+        NSR_LOG_FILL        = 0x3
+} nsr_log_type_t;
+
+/* Driver execution mode (stored in nsr_recon_driver_ctx_t.mode). */
+typedef enum nsr_mode_t {
+        NSR_SEQ = 0,
+        NSR_USE_THREADS,        /* 1 */
+        NSR_ASYNC               /* 2 */
+} nsr_mode_t;
+
+/*
+ * Kind of fix-up computed for one log index (stored in
+ * nsr_reconciliation_work_t.type).  Values are consecutive from 0.
+ */
+typedef enum nsr_recon_work_type_t {
+        NSR_RECON_WORK_NONE = 0,
+        NSR_RECON_WORK_HOLE_TO_NOOP,            /* 1 */
+        NSR_RECON_WORK_HOLE_TO_PSEUDO_HOLE,     /* 2 */
+        NSR_RECON_WORK_COMPARE_PSEUDO_HOLE,     /* 3 */
+        NSR_RECON_WORK_HOLE_TO_FILL,            /* 4 */
+        NSR_RECON_WORK_UNDO_FILL,               /* 5 */
+} nsr_recon_work_type_t;
+
+/*
+ * Role the driver thread is currently acting in.  The driver sleeps
+ * while its state is 0 (none).
+ */
+typedef enum nsr_recon_driver_state_t {
+        none = 0,
+        leader,                 /* 1 */
+        reconciliator,          /* 2 */
+        resolutor,              /* 3 */
+} nsr_recon_driver_state_t;
+
+// role structure
+// Describes the role a node is asked to take, together with per-node
+// term information.  Declared with 1-byte packing; the integer fields
+// are byte-swapped by ENDIAN_CONVERSION_RR.
+// NOTE(review): presumably this is the on-wire layout exchanged with the
+// recon translator - confirm before changing field order or sizes.
+#pragma pack(push, 1)
+typedef struct _nsr_recon_role_s {
+ uint32_t role; // leader, reconciliator, resolutor
+ uint32_t num; // required in case state is reconciliator
+ uint32_t current_term; // current term used in case of leader
+ // In case this is reconciliator, num is set to nodes that were part
+ // of previous term.
+ // In case this is resolutor, num is set to 2.
+ // info[0] - information for this node.
+ // info[1] - information of the reconciliator.
+ // In case this is leader, num is set to this term's membership list
+ // set info.name to all members including the leader
+ struct {
+ int32_t last_term;
+ int32_t commited_ops;
+ uint32_t last_index;
+ uint32_t first_index;
+ char name[MAX_HOSTNAME_LEN];
+ } info[MAXIMUM_REPLICA_STRENGTH];
+} nsr_recon_role_t;
+#pragma pack(pop)
+
+/*
+ * Byte-swap the integer fields of an nsr_recon_role_t in place.
+ *
+ * rr      - lvalue of the role structure (not a pointer)
+ * is_true - _gf_true: convert with ntohl; otherwise convert with htonl
+ *
+ * rr.num bounds the loop over rr.info[], so it is converted first when
+ * going to host order and last when going to network order.
+ *
+ * Arguments are parenthesized and the locals carry a suffix so that a
+ * caller expression such as roles[i] is not captured by the macro's own
+ * variables.  The macro still expands to a brace block, as before.
+ * NOTE(review): rr.role is not converted - confirm that is intended.
+ */
+#define ENDIAN_CONVERSION_RR(rr, is_true) \
+{ \
+        uint32_t rr_idx_ = 0; \
+        uint32_t (*rr_conv_)(uint32_t) = (((is_true) == _gf_true) ? ntohl : htonl); \
+        if ((is_true) == _gf_true) (rr).num = rr_conv_((rr).num); \
+        (rr).current_term = rr_conv_((rr).current_term); \
+        for (rr_idx_ = 0; rr_idx_ < (rr).num; rr_idx_++) { \
+                (rr).info[rr_idx_].last_term = rr_conv_((rr).info[rr_idx_].last_term); \
+                (rr).info[rr_idx_].commited_ops = rr_conv_((rr).info[rr_idx_].commited_ops); \
+                (rr).info[rr_idx_].last_index = rr_conv_((rr).info[rr_idx_].last_index); \
+                (rr).info[rr_idx_].first_index = rr_conv_((rr).info[rr_idx_].first_index); \
+        } \
+        if ((is_true) == _gf_false) (rr).num = rr_conv_((rr).num); \
+}
+
+// last term info structure
+// Summary of one node's last term: term number, committed op count and
+// the first/last log index.  Declared with 1-byte packing; all four
+// fields are byte-swapped by ENDIAN_CONVERSION_LT.
+#pragma pack(push, 1)
+typedef struct _nsr_recon_last_term_info_s {
+ int32_t last_term;
+ int32_t commited_ops;
+ uint32_t last_index;
+ uint32_t first_index;
+} nsr_recon_last_term_info_t;
+#pragma pack(pop)
+
+/*
+ * Byte-swap every field of an nsr_recon_last_term_info_t in place.
+ *
+ * lt      - lvalue of the structure (not a pointer)
+ * is_true - _gf_true: convert with ntohl; otherwise convert with htonl
+ *
+ * Arguments are parenthesized and the local is suffixed so a caller
+ * variable named "f" is not shadowed.  Still expands to a brace block.
+ */
+#define ENDIAN_CONVERSION_LT(lt, is_true) \
+{ \
+        uint32_t (*lt_conv_)(uint32_t) = (((is_true) == _gf_true) ? ntohl : htonl); \
+        (lt).last_term = lt_conv_((lt).last_term); \
+        (lt).commited_ops = lt_conv_((lt).commited_ops); \
+        (lt).last_index = lt_conv_((lt).last_index); \
+        (lt).first_index = lt_conv_((lt).first_index); \
+}
+
+// log information
+// Identifies a range of log entries: a term plus first and last index.
+// Declared with 1-byte packing; all fields are byte-swapped by
+// ENDIAN_CONVERSION_LI.
+#pragma pack(push, 1)
+typedef struct _nsr_recon_log_info_s {
+ uint32_t term;
+ uint32_t first_index;
+ uint32_t last_index;
+} nsr_recon_log_info_t;
+#pragma pack(pop)
+
+/*
+ * Byte-swap every field of an nsr_recon_log_info_t in place.
+ *
+ * li      - lvalue of the structure (not a pointer)
+ * is_true - _gf_true: convert with ntohl; otherwise convert with htonl
+ *
+ * Arguments are parenthesized and the local is suffixed so a caller
+ * variable named "f" is not shadowed.  Still expands to a brace block.
+ */
+#define ENDIAN_CONVERSION_LI(li, is_true) \
+{ \
+        uint32_t (*li_conv_)(uint32_t) = (((is_true) == _gf_true) ? ntohl : htonl); \
+        (li).term = li_conv_((li).term); \
+        (li).first_index = li_conv_((li).first_index); \
+        (li).last_index = li_conv_((li).last_index); \
+}
+
+// Details of one log record.  Declared with 1-byte packing; the four
+// integer fields (type, op, offset, len) are byte-swapped by
+// ENDIAN_CONVERSION_RD, the character arrays are left as they are.
+// gfid and pargfid hold 36 characters plus a terminating NUL.
+#pragma pack(push, 1)
+typedef struct nsr_recon_record_details_s {
+ uint32_t type;
+ uint32_t op;
+ char gfid[36+1];
+ char pargfid[36+1];
+ char link_path[256]; // should it be PATH_MAX?
+ uint32_t offset;
+ uint32_t len;
+ char entry[128];
+ char newloc[128]; // for rename. can you overload link_path for this? TBD
+} nsr_recon_record_details_t;
+#pragma pack(pop)
+
+/*
+ * Byte-swap the integer fields (type, op, offset, len) of an
+ * nsr_recon_record_details_t in place.
+ *
+ * rd      - lvalue of the structure (not a pointer)
+ * is_true - _gf_true: convert with ntohl; otherwise convert with htonl
+ *
+ * Arguments are parenthesized and the local is suffixed so a caller
+ * variable named "f" is not shadowed.  Still expands to a brace block.
+ */
+#define ENDIAN_CONVERSION_RD(rd, is_true) \
+{ \
+        uint32_t (*rd_conv_)(uint32_t) = (((is_true) == _gf_true) ? ntohl : htonl); \
+        (rd).type = rd_conv_((rd).type); \
+        (rd).op = rd_conv_((rd).op); \
+        (rd).offset = rd_conv_((rd).offset); \
+        (rd).len = rd_conv_((rd).len); \
+}
+
+// One work item queued to a worker thread; items are chained through
+// the embedded list node.
+// NOTE(review): req_id presumably holds an nsr_recon_work_req_id_t
+// value - confirm against recon_make_work.
+typedef struct _nsr_recon_work_s {
+ gf_boolean_t in_use;
+ uint32_t index;
+ uint32_t req_id;
+ struct list_head list;
+} nsr_recon_work_t;
+
+// Fix-up computed for a single log index.  The driver fills in term and
+// index before issuing IO, compares type against the
+// NSR_RECON_WORK_* values, and uses source as the worker index to read
+// data from.
+typedef struct _nsr_reconciliation_work_s {
+ uint32_t term;
+ uint32_t index;
+ uint32_t type;
+ uint32_t source;
+ void *data;
+
+ uint32_t num; // used for xattr
+
+} nsr_reconciliation_work_t;
+
+// One entry of a reconciliation window: the log record details paired
+// with the work computed for it.
+typedef struct _nsr_reconciliation_record_s {
+ nsr_reconciliation_work_t work; // will store the computed work
+ nsr_recon_record_details_t rec;
+} nsr_reconciliation_record_t;
+
+// Per-replica reconciliation state for one term: the term summary plus
+// the window of records.  records is an inline array of
+// MAX_RECONCILIATION_WINDOW_SIZE entries; the driver addresses it as
+// records[index - first_index].
+typedef struct _nsr_reconciliator_info {
+ uint32_t reconcilator_index;
+ int32_t last_term;
+ int32_t commited_ops;
+ uint32_t last_index;
+ uint32_t first_index;
+ nsr_reconciliation_record_t records[MAX_RECONCILIATION_WINDOW_SIZE];
+} nsr_reconciliator_info_t;
+
+// State of one worker thread.  Each replica has two of these, a control
+// worker and a data worker (see nsr_replica_worker_t).
+typedef struct _nsr_per_node_worker_s {
+ char *id; // identifier
+ char vol_file[256]; //volfile that will be used by this thread
+ glfs_t *fs;
+ glfs_fd_t *aux_fd;
+ uint32_t index; // index into array of workers
+ pthread_t thread_id; // thread id
+ void * context; // thread context
+ struct _nsr_recon_driver_ctxt *driver_ctx;
+ char local; // local data worker
+ //struct list_head list; //list of work items
+ nsr_recon_work_t head;
+ pthread_mutex_t mutex; //mutex to guard the above list
+ pthread_cond_t cv; //condition variable for signaling the worker thread
+ gf_boolean_t is_control;
+#ifdef NSR_DEBUG
+ uint32_t worker_log_fd; // debug log descriptor; 0 means not opened yet (see nsr_worker_log)
+#endif
+} nsr_per_node_worker_t;
+
+// One slot per member of the replica group: its name, its control and
+// data workers, whether the slot is currently in use, and the
+// reconciliation info gathered for it.
+typedef struct _nsr_replica_worker_s {
+ char name[256];
+ nsr_per_node_worker_t *control_worker;
+ nsr_per_node_worker_t *data_worker;
+ gf_boolean_t in_use;
+ nsr_reconciliator_info_t *recon_info; // Bunch of infos kept for this reconciliation
+} nsr_replica_worker_t;
+
+// Context of the reconciliation driver thread.
+typedef struct _nsr_recon_driver_ctxt {
+ xlator_t *this;
+ uint32_t replica_group_size; // number of static members of replica group
+ nsr_replica_worker_t *workers; // worker info
+ int32_t reconciliator;
+ pthread_mutex_t mutex; //mutex to guard the state
+ pthread_cond_t cv; //condition variable for signaling the driver thread
+ uint32_t state; //driver state; the driver thread waits while this is 0
+ volatile int32_t outstanding; // incremented per queued work item; send_and_wait spins until it is 0
+ uint32_t reconciliator_index; // worker index chosen as reconciliator by the leader
+ uint32_t txn_id; // passed to nsr_recon_return_back when a round ends
+ uint32_t current_term;
+ jmp_buf *env; // setjmp context while the leader runs a local role; NULL otherwise
+#ifdef NSR_DEBUG
+ uint32_t driver_log_fd; // debug log descriptor; 0 means not opened yet (see nsr_driver_log)
+#endif
+ nsr_mode_t mode; // default set to seq
+} nsr_recon_driver_ctx_t;
+
+// Entry point of the driver thread.  The argument is the translator's
+// nsr_recon_private_t; the function returns NULL.
+void *
+nsr_reconciliation_driver(void *);
+
+// NOTE(review): definition not visible here; presumably hands a new role
+// and transaction id to the driver thread - confirm in recon_driver.c.
+gf_boolean_t
+nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, nsr_recon_role_t *rr, uint32_t txn_id);
+
+/* Thin wrappers over the compiler's __sync atomic builtins. */
+#define atomic_inc(ptr) ((void) __sync_add_and_fetch((ptr), 1))
+#define atomic_dec(ptr) ((void) __sync_sub_and_fetch((ptr), 1))
+#define atomic_fetch_and __sync_fetch_and_and
+#define atomic_fetch_or __sync_fetch_and_or
+
+/*
+ * REVIEW
+ * Ideally, use gf_log like everyone else. Failing that, at least put the logs
+ * with all the others in /var/log instead of /tmp.
+ * NB two instances, for nsr_driver_log and nsr_worker_log
+ */
+#ifdef NSR_DEBUG
+/*
+ * Debug logger for the driver thread.  Requires a variable named "ctx"
+ * with a driver_log_fd field in the calling scope; dom and levl are
+ * ignored in this form.  The log file is opened on first use.
+ *
+ * open() is given an explicit mode because O_CREAT requires one, and
+ * the message is formatted with snprintf so an over-long message is
+ * truncated instead of overrunning the local buffer.
+ */
+#define nsr_driver_log(dom, levl, fmt...) \
+        { \
+                char nsr_log_buf_[255]; \
+                if (!ctx->driver_log_fd) { \
+                        mkdir("/tmp/nsr-logs/", 0777); \
+                        ctx->driver_log_fd = open("/tmp/nsr-logs/nsr-driver-log", O_RDWR|O_CREAT|O_TRUNC, 0644); \
+                } \
+                snprintf(nsr_log_buf_, sizeof(nsr_log_buf_), fmt); \
+                write(ctx->driver_log_fd, nsr_log_buf_, strlen(nsr_log_buf_)); \
+        }
+#else
+#define nsr_driver_log(dom, levl, fmt...) gf_log(dom, levl, fmt)
+#endif
+
+#ifdef NSR_DEBUG
+/*
+ * Debug logger for worker threads.  Requires a variable named "ctx"
+ * with worker_log_fd, is_control and index fields in the calling scope;
+ * dom and levl are ignored in this form.  The per-worker log file is
+ * opened on first use.
+ *
+ * open() is given an explicit mode because O_CREAT requires one, and
+ * both the path and the message are formatted with snprintf so they are
+ * truncated instead of overrunning the local buffers.
+ */
+#define nsr_worker_log(dom, levl, fmt...) \
+        { \
+                char nsr_log_buf_[255]; \
+                if (!ctx->worker_log_fd) { \
+                        char nsr_log_path_[255]; \
+                        snprintf(nsr_log_path_, sizeof(nsr_log_path_), "/tmp/nsr-logs/%s-%d",ctx->is_control? "con" : "data",ctx->index); \
+                        mkdir("/tmp/nsr-logs/", 0777); \
+                        ctx->worker_log_fd = open(nsr_log_path_, O_RDWR|O_CREAT|O_TRUNC, 0644); \
+                } \
+                snprintf(nsr_log_buf_, sizeof(nsr_log_buf_), fmt); \
+                write(ctx->worker_log_fd, nsr_log_buf_, strlen(nsr_log_buf_)); \
+        }
+#else
+#define nsr_worker_log(dom, levl, fmt...) gf_log(dom, levl, fmt)
+#endif
+
+#endif /* #ifndef __RECON_DRIVER_H__ */
diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.c b/xlators/cluster/nsr-recon/src/recon_xlator.c
new file mode 100644
index 000000000..62583d526
--- /dev/null
+++ b/xlators/cluster/nsr-recon/src/recon_xlator.c
@@ -0,0 +1,837 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include <sys/types.h>
+#include <fcntl.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "call-stub.h"
+#include "defaults.h"
+#include "xlator.h"
+
+#include "recon_driver.h"
+#include "recon_xlator.h"
+
+typedef struct _nsr_recon_fd_s { /* per-fd context; allocated and attached with fd_ctx_set() in nsr_recon_open */
+ int32_t term; /* term written by the client (writev sectors 2, 3 and 4); read back by readv */
+ nsr_recon_driver_state_t state; /* not referenced anywhere in recon_xlator.c */
+ uint32_t first_index; /* first record index requested, set by writev sector 2 */
+ uint32_t last_index; /* last record index requested, set by writev sector 2 */
+ call_frame_t *frame; /* not referenced anywhere in recon_xlator.c */
+} nsr_recon_fd_t;
+
+
+typedef struct _nsr_txn_id_s { /* queue node pairing a pending frame with its transaction id */
+ uint32_t txn_id; /* key compared by get_frame() */
+ call_frame_t *frame; /* frame stored by put_frame(); unwound later by nsr_recon_return_back() */
+ struct list_head list; /* linkage into priv->list */
+} nsr_txn_id_t;
+
+// Fetch the NSR per-fd context attached to `fd` for this xlator.
+// Returns 0 and stores the context in *rfd on success; otherwise returns the
+// non-zero value reported by fd_ctx_get() and leaves *rfd untouched.
+static int32_t this_fd_ctx_get(fd_t *fd, xlator_t *this, nsr_recon_fd_t **rfd)
+{
+        uint64_t raw_ctx = 0;
+        int32_t  status  = fd_ctx_get(fd, this, &raw_ctx);
+
+        if (status != 0)
+                return status;
+
+        *rfd = (nsr_recon_fd_t *)raw_ctx;
+        return 0;
+}
+
+// Queue `frame` on priv->list, tagged with txn_id, so it can be located again
+// when the transaction completes.
+// If the queue node cannot be allocated the frame is NOT queued: the failure
+// is logged and the function returns (the allocation used to be dereferenced
+// without a check).
+// NOTE(review): priv->list is modified here without any visible locking;
+// confirm that callers are serialized.
+static void put_frame(nsr_recon_private_t *priv,
+                      call_frame_t *frame,
+                      uint32_t txn_id)
+{
+        xlator_t *this = priv->this;
+        nsr_txn_id_t *tid = NULL;
+
+        tid = GF_CALLOC(1, sizeof(*tid), gf_mt_recon_private_t);
+        if (!tid) {
+                recon_main_log (this->name, GF_LOG_ERROR,
+                                "cannot allocate queue entry for txn id %d \n", txn_id);
+                return;
+        }
+
+        tid->txn_id = txn_id;
+        tid->frame = frame;
+        INIT_LIST_HEAD(&(tid->list));
+        list_add_tail(&(tid->list), &(priv->list));
+        recon_main_log (this->name, GF_LOG_INFO, "adding framef or txn id %d into queue \n", txn_id);
+}
+
+// Find the frame queued for txn_id and return it through *frame.
+// The matching queue node is unlinked and freed, so a frame is handed out at
+// most once (previously the node stayed on the list forever, leaking memory
+// and leaving a pointer to an already-unwound frame behind).
+// When no entry matches, *frame is set to NULL and GF_ASSERT(0) is raised.
+static void get_frame(nsr_recon_private_t *priv,
+                      call_frame_t **frame,
+                      uint32_t txn_id)
+{
+        nsr_txn_id_t *tid = NULL;
+        xlator_t *this = priv->this;
+
+        list_for_each_entry(tid, &(priv->list), list) {
+                if (tid->txn_id == txn_id) {
+                        *frame = tid->frame;
+                        // safe to unlink while iterating: we return immediately
+                        list_del_init(&(tid->list));
+                        GF_FREE(tid);
+                        recon_main_log (this->name, GF_LOG_INFO, "got frame for txn id %d into queue \n", txn_id);
+                        return;
+                }
+        }
+        *frame = NULL;
+        recon_main_log (this->name, GF_LOG_INFO, "got no frame for txn id %d into queue \n", txn_id);
+        GF_ASSERT(0);
+}
+
+// Get the term info for the term number specified.
+//
+// Zeroes *lt, records `term` in lt->last_term and stats "<bp>/TERM.<term>".
+// When the file exists and is larger than 128 bytes, first_index is set to 1
+// and last_index / commited_ops are derived from the file size in 128-byte
+// units; otherwise the index fields stay 0.
+// NOTE(review): the 128s presumably are the header and record sizes of the
+// changelog; confirm, and confirm that last_index is not one too high when
+// the size is an exact multiple of 128.
+void nsr_recon_libchangelog_get_this_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt)
+{
+        struct stat buf;
+        char path[PATH_MAX];
+
+        bzero(lt, sizeof(nsr_recon_last_term_info_t));
+        lt->last_term = term;
+        // bounded: an over-long base path is truncated instead of overflowing path[]
+        snprintf(path, sizeof(path), "%s/%s%d", bp, "TERM.", term);
+        // The former "st_size <= 128" branch inside this condition could never
+        // run; bzero() above already provides the all-zero result.
+        if (!stat(path, &buf) && (buf.st_size > 128)) {
+                lt->first_index = 1;
+                lt->last_index = ((buf.st_size - 128)/128) + 1 ;
+                lt->commited_ops = lt->last_index - lt->first_index + 1;
+        }
+        recon_main_log (this->name, GF_LOG_INFO, "for term=%d got first_index=%d last_index=%d commited_ops=%d\n",
+                        term, lt->first_index, lt->last_index, lt->commited_ops);
+        return;
+}
+
+// Given the current term number, find the most recent term (<= term) that has
+// a changelog file and fill *lt with its info.
+//
+// Walks downwards from `term` to 1 looking for "<bp>/TERM.<t>".  If none is
+// found, or `term` is not positive, *lt is left zeroed.  A negative term used
+// to be converted to a huge unsigned value and scanned for ~4 billion
+// iterations; the loop is now signed.
+void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt)
+{
+        int32_t t = 0;
+        struct stat buf;
+        char path[PATH_MAX];
+
+        bzero(lt, sizeof(nsr_recon_last_term_info_t));
+        for (t = term; t > 0; t--) {
+                // per-term changelog files are named TERM.<t>
+                snprintf(path, sizeof(path), "%s/%s%d", bp, "TERM.", t);
+                if (!stat(path, &buf)) {
+                        nsr_recon_libchangelog_get_this_term_info(this, bp, t, lt);
+                        recon_main_log (this->name, GF_LOG_INFO, "got last term given current term %d as %d\n", term, t);
+                        return;
+                }
+        }
+        recon_main_log (this->name, GF_LOG_INFO, "got no last term given current term %d \n", term);
+
+        return;
+}
+
+// Complete the write that was parked for txn_id: look up its frame and
+// unwind it with success (op_ret 0, op_errno 0).  Logs an error when no
+// frame is found.
+void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t txn_id)
+{
+        xlator_t     *this    = priv->this;
+        call_frame_t *pending = NULL;
+
+        get_frame(priv, &pending, txn_id);
+        if (!pending) {
+                recon_main_log (this->name, GF_LOG_ERROR, "EIII---nsr_recon_writev cnnot return old frame \n");
+                return;
+        }
+
+        recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev returns old frame \n");
+        // answer the original write for which this ack was sent
+        STACK_UNWIND_STRICT (writev, pending, 0, 0, NULL, NULL, NULL);
+}
+
+// Layout of a changelog record, named after the fields that follow the opcode.
+typedef enum records_type_t {
+        fop_gfid_pgfid_oldloc_newloc = 1, // rename
+        fop_gfid_pgfid_entry         = 2, // entry operations (create, unlink, ...)
+        fop_gfid                     = 3, // attribute / xattr operations
+        fop_gfid_offset              = 4, // truncate
+        fop_gfid_offset_len          = 5, // write
+} records_type_t;
+
+// Build the backend path "<base_dir>/.glusterfs/<g0g1>/<g2g3>/<gfid>" for a
+// gfid string, where g0..g3 are the first four characters of `gfid`.
+//
+// `path` must point to a buffer of at least PATH_MAX bytes (the only caller
+// in this file passes one).  The result is now written with a single bounded
+// snprintf instead of an unbounded strcpy/strcat chain; an over-long result
+// is truncated rather than overflowing the buffer.
+static void
+get_gfid_path(nsr_recon_private_t *priv, char *gfid, char *path)
+{
+        snprintf(path, PATH_MAX, "%s/.glusterfs/%.2s/%.2s/%s",
+                 priv->base_dir, gfid, gfid + 2, gfid);
+}
+
+
+// Resolve the symlink stored at the gfid's backend path and copy its target
+// into `path` as a NUL-terminated string.
+//
+// Returns _gf_true on success, _gf_false when readlink() fails.
+// Changes from the previous version:
+//  - readlink() does not NUL-terminate; the terminator is now written.  At
+//    most 254 bytes are read so the terminator stays inside the same 255-byte
+//    window the old code could write to.
+//  - the log domain is this->name (an xlator pointer used to be passed).
+//  - the GF_ASSERT(0) on failure is gone: the caller treats a missing link
+//    as an expected condition and marks the record as a hole.
+static gf_boolean_t
+get_link_using_gfid(nsr_recon_private_t *priv, char *gfid, char *path)
+{
+        char lp[PATH_MAX];
+        xlator_t *this = priv->this;
+        ssize_t len = 0;
+
+        get_gfid_path(priv,gfid, lp);
+        len = readlink(lp, path, 254);
+        if (len == -1) {
+                recon_main_log(this->name, GF_LOG_ERROR,
+                               "cannot get readlink for %s\n",lp);
+                return _gf_false;
+        }
+        path[len] = '\0';
+        return _gf_true;
+}
+
+// Get the list of changelog records given a term , first and last index.
+//
+// Reads records first..last (128 bytes each) from "<bp>/TERM.<term>" and
+// decodes every record that starts with "_PRE_" into the array of
+// nsr_recon_record_details_t that `buf` points to (one slot per index).
+// Slots whose record does not start with "_PRE_" are left untouched.
+//
+// Fixes relative to the previous version:
+//  - an inverted range (last < first) is rejected instead of looping;
+//  - the read buffer lives on the heap, zero-filled and with one spare record
+//    of zero padding, instead of a client-sized VLA on the stack;
+//  - a failed open() no longer leads to close(-1); read() errors are checked
+//    and a short read leaves zeros, not stack garbage, to be parsed;
+//  - the opcode/offset/length scratch strings can no longer overflow;
+//  - GF_LOG_ERR is replaced by GF_LOG_ERROR, the level used elsewhere here.
+// NOTE(review): field copies into *rec (gfid, pargfid, entry, newloc) are
+// still unbounded and trust the record layout; the padding only reduces the
+// damage from a malformed final record.
+// NOTE(review): records turned into holes skip ENDIAN_CONVERSION_RD, so
+// rec->type is not converted for them; confirm whether that is intended.
+void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf)
+{
+        // TBD - right now all records are pseudo holes but mark them as fills.
+        // TBD - pseudo hole to be implemented when actual fsync gets done on data.
+        nsr_recon_record_details_t *rec = (nsr_recon_record_details_t *)buf;
+        char *read_buf = NULL;
+        char *rb = NULL;
+        char *start = NULL;
+        char path[PATH_MAX];
+        size_t nbytes = 0;
+        int fd = -1;
+        uint32_t index = 0;
+
+        recon_main_log (this->name, GF_LOG_INFO,
+                        "libchangelog_get_records called for term %d index from %d to %d \n",
+                        term, first, last );
+
+        if (last < first) {
+                recon_main_log (this->name, GF_LOG_ERROR,
+                                "libchangelog_get_records: invalid index range %d to %d \n",
+                                first, last);
+                return;
+        }
+
+        nbytes = ((size_t)(last - first) + 1) * 128;
+        // zero-filled, plus one record of padding so scans for a terminating
+        // NUL always stop inside the allocation
+        read_buf = GF_CALLOC (1, nbytes + 128, gf_mt_recon_private_t);
+        if (!read_buf) {
+                recon_main_log (this->name, GF_LOG_ERROR,
+                                "libchangelog_get_records: cannot allocate read buffer \n");
+                return;
+        }
+
+        snprintf(path, sizeof(path), "%s/%s%d", bp, "TERM.", term);
+        fd = open(path, O_RDONLY);
+        if (fd == -1) {
+                recon_main_log (this->name, GF_LOG_ERROR,
+                                "libchangelog_get_records: cannot open %s \n", path);
+                goto out;
+        }
+
+        if (first == 0)
+                lseek(fd, 128, SEEK_SET);
+        else
+                lseek(fd, (off_t)first * 128, SEEK_SET);
+        if (read(fd, read_buf, nbytes) < 0) {
+                recon_main_log (this->name, GF_LOG_ERROR,
+                                "libchangelog_get_records: cannot read %s \n", path);
+                goto out;
+        }
+
+        rb = read_buf;
+        start = rb;
+        index = first;
+        do {
+                recon_main_log (this->name, GF_LOG_INFO,
+                                "libchangelog_get_records start inspecting records at index %d \n",
+                                index );
+                if (!strncmp(start, "_PRE_", 5)) {
+                        char op_str[4];
+                        uint32_t i=0, opcode = 0;
+                        records_type_t type;
+
+                        start += 5;
+                        // increment by the NULLs after the PRE
+                        start += 4;
+                        // now we have the opcode; extra digits are skipped,
+                        // not stored, so op_str cannot overflow
+                        i = 0;
+                        while (*start != 0) {
+                                if (i < sizeof(op_str) - 1)
+                                        op_str[i++] = (*start);
+                                start++;
+                        }
+                        op_str[i] = '\0';
+                        opcode = strtoul(op_str, NULL, 10);
+                        recon_main_log (this->name, GF_LOG_ERROR,
+                                        "libchangelog_get_records: got opcode %d @index %d\n", opcode, index);
+                        if ((opcode == GF_FOP_RENAME)) {
+                                type = fop_gfid_pgfid_oldloc_newloc;
+                        } else if ((opcode == GF_FOP_UNLINK) ||
+                                   (opcode == GF_FOP_RMDIR) ||
+                                   (opcode == GF_FOP_LINK) ||
+                                   (opcode == GF_FOP_MKDIR) ||
+                                   (opcode == GF_FOP_SYMLINK) ||
+                                   (opcode == GF_FOP_MKNOD) ||
+                                   (opcode == GF_FOP_CREATE)) {
+                                type = fop_gfid_pgfid_entry;
+                        } else if ((opcode == GF_FOP_FSETATTR) ||
+                                   (opcode == GF_FOP_SETATTR) ||
+                                   (opcode == GF_FOP_FREMOVEXATTR) ||
+                                   (opcode == GF_FOP_REMOVEXATTR) ||
+                                   (opcode == GF_FOP_SETXATTR) ||
+                                   (opcode == GF_FOP_FSETXATTR)) {
+                                type = fop_gfid;
+                        } else if ((opcode == GF_FOP_TRUNCATE) ||
+                                   (opcode == GF_FOP_FTRUNCATE)) {
+                                type = fop_gfid_offset;
+                        } else if (opcode == GF_FOP_WRITE) {
+                                type = fop_gfid_offset_len;
+                        } else {
+                                recon_main_log (this->name, GF_LOG_ERROR,
+                                                "libchangelog_get_records:got no proper opcode %d @index %d\n",
+                                                opcode, index);
+                                //GF_ASSERT(0);
+                                // make this as a hole.
+                                // TBD - check this logic later. maybe we should raise alarm here because
+                                // this means that changelog is corrupted. We are not handling changelog
+                                // corruptions as of now.
+                                rec->type = NSR_LOG_HOLE;
+                                goto finish;
+                        }
+                        // TBD - handle psuedo holes once that logic is in.
+                        rec->type = NSR_LOG_FILL;
+                        recon_main_log (this->name, GF_LOG_ERROR,
+                                        "libchangelog_get_records:got type %d at index %d \n",
+                                        rec->type, index);
+                        rec->op = opcode;
+
+                        // Now get the gfid and parse it
+                        // before that increment the pointer
+                        start++;
+                        for (i=0; i < 36; i++) {
+                                rec->gfid[i] = (*start);
+                                start++;
+                        }
+                        rec->gfid[i] = '\0';
+
+                        if (opcode == GF_FOP_SYMLINK) {
+                                // the symlink would have been removed. Hence ignore this.
+                                // TBD - have an uniform error policy in case of such cases.
+                                // Right now we are handling some on the source and some on the destination.
+                                if(get_link_using_gfid(this->private, rec->gfid, rec->link_path) == _gf_false) {
+                                        rec->type = NSR_LOG_HOLE;
+                                        goto finish;
+                                }
+                        }
+
+                        GF_ASSERT(*start == 0);
+                        start ++;
+
+                        i = 0;
+                        // If type is fop_gfid_offset+_len, get offset
+                        if ((type == fop_gfid_offset) || (type == fop_gfid_offset_len)) {
+                                char offset_str[128];
+                                while(*start != 0) {
+                                        if (i < sizeof(offset_str) - 1)
+                                                offset_str[i++] = *start;
+                                        start ++;
+                                }
+                                offset_str[i] = '\0';
+                                // get over the 0
+                                start++;
+                                rec->offset = strtoul(offset_str, NULL, 10);
+                                recon_main_log (this->name, GF_LOG_ERROR,
+                                                "libchangelog_get_records:got offset %d @index %d \n", rec->offset, index);
+
+                        }
+                        i = 0;
+                        if (type == fop_gfid_offset_len) {
+                                char len_str[128];
+                                while(*start != 0) {
+                                        if (i < sizeof(len_str) - 1)
+                                                len_str[i++] = *start;
+                                        start ++;
+                                }
+                                len_str[i] = '\0';
+                                // get over the 0
+                                start++;
+                                rec->len = strtoul(len_str, NULL, 10);
+                                recon_main_log (this->name, GF_LOG_ERROR,
+                                                "libchangelog_get_records:got length %d @index %d \n", rec->len, index);
+                        }
+                        i = 0;
+                        if (type == fop_gfid_pgfid_entry) {
+                                // first get the gfid and then the path
+                                for (i=0; i < 36; i++) {
+                                        rec->pargfid[i] = (*start);
+                                        start++;
+                                }
+                                rec->pargfid[i] = '\0';
+                                GF_ASSERT(*start == '/');
+                                start ++;
+
+                                i = 0;
+                                while(*start != 0) {
+                                        rec->entry[i++] = *start;
+                                        start ++;
+                                }
+                                rec->entry[i] = '\0';
+                                // get over the 0
+                                start++;
+                                recon_main_log (this->name, GF_LOG_ERROR,
+                                                "libchangelog_get_records:got entry %s @index %d \n", rec->entry, index);
+
+                        }
+                        i = 0;
+                        if (type == fop_gfid_pgfid_oldloc_newloc) {
+
+                                // first get the source and then the destination
+                                // source stuff gets stored in pargfid/entry
+                                for (i=0; i < 36; i++) {
+                                        rec->pargfid[i] = (*start);
+                                        start++;
+                                }
+                                rec->pargfid[i] = '\0';
+                                GF_ASSERT(*start == '/');
+                                start ++;
+
+                                i=0;
+                                while(*start != 0) {
+                                        rec->entry[i++] = *start;
+                                        start ++;
+                                }
+                                rec->entry[i] = '\0';
+                                // get over the 0
+                                start++;
+
+                                // dst stuff gets stored in gfid/newloc
+                                for (i=0; i < 36; i++) {
+                                        rec->gfid[i] = (*start);
+                                        start++;
+                                }
+                                rec->gfid[i] = '\0';
+                                GF_ASSERT(*start == '/');
+                                start ++;
+                                i = 0;
+                                while(*start != 0) {
+                                        rec->newloc[i++] = *start;
+                                        start ++;
+                                }
+                                rec->newloc[i] = '\0';
+                                // get over the 0
+                                start++;
+
+                        }
+                        ENDIAN_CONVERSION_RD((*rec), _gf_false); //htonl
+                }
+finish:
+                if (index == last)
+                        break;
+                index++;
+                rb += 128;
+                start = rb;
+                rec++;
+        } while(1);
+
+out:
+        if (fd != -1)
+                close(fd);
+        GF_FREE(read_buf);
+
+        recon_main_log (this->name, GF_LOG_INFO,
+                        "libchangelog_get_records finsihed inspecting records for term %d \n",
+                        term);
+        return;
+}
+
+// open fop: allocate a zeroed nsr_recon_fd_t and attach it to `fd` as this
+// xlator's fd context, then unwind.
+//
+// Unwinds with -1/ENOMEM when the allocation fails and -1/EINVAL when
+// fd_ctx_set() fails.  Previously an allocation failure was overwritten by
+// the result of fd_ctx_set() on a NULL context, and the context leaked when
+// fd_ctx_set() failed.
+int32_t
+nsr_recon_open (call_frame_t *frame, xlator_t *this,
+                loc_t *loc, int32_t flags, fd_t *fd, dict_t *xdata)
+{
+        int32_t op_ret = -1;
+        int32_t op_errno = 0;
+        nsr_recon_fd_t *rfd = NULL;
+
+        recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_open called for path %s \n",loc->path );
+        rfd = GF_CALLOC (1, sizeof (*rfd), gf_mt_recon_private_t);
+        if (!rfd) {
+                op_errno = ENOMEM;
+                goto out;
+        }
+
+        if (fd_ctx_set (fd, this, (uint64_t)(long)rfd)) {
+                op_errno = EINVAL;
+                GF_FREE (rfd);
+                goto out;
+        }
+        op_ret = 0;
+
+out:
+        recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_open returns with %d for path %s \n",op_ret,loc->path );
+        STACK_UNWIND_STRICT (open, frame, op_ret, op_errno, fd, NULL);
+        return 0;
+}
+
+// writev fop: the write offset selects a command ("sector"), the first iovec
+// carries its payload.
+//   sector_0: 4 bytes holding a transaction id; the frame parked for that
+//             transaction is unwound via nsr_recon_return_back().
+//   sector_1: an nsr_recon_role_t; handed to the driver.  On success this
+//             frame is parked (put_frame) and answered later, not here.
+//   sector_2: an nsr_recon_log_info_t; term/first/last index are stored in
+//             the fd context for a following readv.
+//   sector_3, sector_4: a 32-bit term stored in the fd context.
+//
+// Fixes relative to the previous version:
+//  - every path that does not park the frame now unwinds it exactly once
+//    (missing fd context and unknown offsets used to return without
+//    unwinding; an invalid role unwound and then carried on);
+//  - failures carry a non-zero op_errno;
+//  - the payload length is checked before it is copied;
+//  - the sector_0 scratch buffer is NUL-terminated before atoi().
+// NOTE(review): ntohl(atoi(...)) on the sector_0 payload is kept as is; the
+// sender's encoding is not visible from this file, confirm it.
+// NOTE(review): the frame is queued only after nsr_recon_driver_set_role()
+// returns; confirm the driver cannot complete the transaction before then.
+int32_t
+nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
+                  struct iovec *vector, int32_t count, off_t offset,
+                  uint32_t flags, struct iobref *iobref, dict_t *xdata)
+{
+        nsr_recon_fd_t *rfd = NULL;
+        nsr_recon_private_t *priv = NULL;
+        int32_t op_ret = 0;
+        int32_t op_errno = 0;
+
+        if (this_fd_ctx_get (fd, this, &rfd) < 0) {
+                STACK_UNWIND_STRICT (writev, frame, -1, EBADF,
+                                     NULL, NULL, NULL);
+                return 0;
+        }
+        priv = (nsr_recon_private_t *)this->private;
+
+        recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called for offset %d \n",(unsigned int)offset );
+        GF_ASSERT(count == 1);
+        if ((count < 1) || !vector || !vector[0].iov_base)
+                goto invalid;
+
+        switch (offset) {
+        // gets called to return back
+        case nsr_recon_xlator_sector_0:
+        {
+                char c[5] = {0, };
+                uint32_t txn_id;
+
+                if (vector[0].iov_len < 4)
+                        goto invalid;
+                recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev clled to return back \n");
+                memcpy((void *)c, (void *)vector[0].iov_base, 4);
+                txn_id = ntohl(atoi(c));
+                nsr_recon_return_back(priv, txn_id);
+                STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno,
+                                     NULL, NULL, NULL);
+                break;
+        }
+        // client(brick, leader) writes the role of the node
+        case nsr_recon_xlator_sector_1 :
+        {
+                nsr_recon_role_t rr;
+                uint32_t txn = 0;
+
+                if (vector[0].iov_len < sizeof(rr))
+                        goto invalid;
+                memcpy((void *)&rr, (void *)vector[0].iov_base, sizeof(rr));
+                ENDIAN_CONVERSION_RR(rr, _gf_true); //ntohl
+
+                recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called to set role %d\n", rr.role);
+                if ((rr.role != leader) &&
+                    (rr.role != reconciliator) &&
+                    (rr.role != resolutor)) {
+                        recon_main_log (this->name, GF_LOG_ERROR,
+                                        "EIII---nsr_recon_writev cannot set state \n");
+                        goto invalid;
+                }
+
+                GF_ASSERT(rr.num <= MAXIMUM_REPLICA_STRENGTH);
+
+                // Store the stack frame so that when the actual job gets finished
+                // we send the response back to the brick.
+                txn = priv->txn_id;
+                if (nsr_recon_driver_set_role(priv->driver_thread_context,
+                                              &rr,
+                                              txn) == _gf_false) {
+                        recon_main_log (this->name, GF_LOG_ERROR,
+                                        "nsr_recon_writev set_role - cannot seem to set role \n");
+                        // the driver gives no reason; EIO is reported so the
+                        // failure does not travel with errno 0
+                        STACK_UNWIND_STRICT (writev, frame, -1, EIO,
+                                             NULL, NULL, NULL);
+                } else {
+                        atomic_cmpxchg(&priv->txn_id, txn, txn+1);
+                        put_frame(priv, frame, txn);
+                        recon_main_log (this->name, GF_LOG_INFO,
+                                        "nsr_recon_writev set_role - set role succesfully \n");
+                }
+                break;
+        }
+        // client(reconciliator) writes how much it needs for the read
+        case nsr_recon_xlator_sector_2 :
+        {
+                nsr_recon_log_info_t li;
+
+                if (vector[0].iov_len < sizeof(li))
+                        goto invalid;
+                memcpy((void *)&li, (void *)vector[0].iov_base, sizeof(li));
+                ENDIAN_CONVERSION_LI(li, _gf_true); //ntohl
+
+                recon_main_log (this->name, GF_LOG_INFO,
+                                "nsr_recon_writev - setting term info for reconcilation info. term=%d, first_index=%d,start_index=%d \n",
+                                li.term, li.first_index, li.last_index);
+                rfd->term = li.term;
+                rfd->last_index = li.last_index;
+                rfd->first_index = li.first_index;
+                STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno,
+                                     NULL, NULL, NULL);
+                break;
+        }
+        // client(reconciliator) writes term for which it needs info
+        case nsr_recon_xlator_sector_3 :
+        {
+                int32_t term;
+
+                if (vector[0].iov_len < sizeof(term))
+                        goto invalid;
+                memcpy((void *)&term, (void *)vector[0].iov_base, sizeof(term));
+                term = ntohl(term); //ntohl
+                recon_main_log (this->name, GF_LOG_INFO,
+                                "nsr_recon_writev - setting term info for term info. term=%d\n",
+                                term);
+                rfd->term = term;
+                STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno,
+                                     NULL, NULL, NULL);
+                break;
+        }
+        // client(reconciliator) writes current term so that it gets last term info later
+        case nsr_recon_xlator_sector_4 :
+        {
+                int32_t term;
+
+                if (vector[0].iov_len < sizeof(term))
+                        goto invalid;
+                memcpy((void *)&term, (void *)vector[0].iov_base, sizeof(term));
+                term = ntohl(term); //ntohl
+                recon_main_log (this->name, GF_LOG_INFO,
+                                "nsr_recon_writev - setting term info for last term info given current term=%d\n",
+                                term);
+                rfd->term = term;
+                STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno,
+                                     NULL, NULL, NULL);
+                break;
+        }
+        // any other offset is not a command this translator understands
+        default:
+                goto invalid;
+        }
+
+        return 0;
+
+invalid:
+        STACK_UNWIND_STRICT (writev, frame, -1, EINVAL,
+                             NULL, NULL, NULL);
+        return 0;
+}
+
+// readv fop: the read offset selects which piece of changelog information is
+// returned, using the term / index range stored earlier by writev in the fd
+// context.
+//   sector_3: nsr_recon_last_term_info_t for rfd->term.
+//   sector_2: record details for rfd->first_index..rfd->last_index.
+//   sector_4: nsr_recon_last_term_info_t for the last term at or below
+//             rfd->term.
+//
+// Fixes relative to the previous version:
+//  - the iobuf is requested with `size` bytes (it used to be requested with
+//    op_ret, i.e. 0);
+//  - a request size that does not match the reply is rejected with EINVAL
+//    instead of only being asserted, so no more than the reply is copied;
+//  - unknown offsets fail with EINVAL instead of returning uninitialised
+//    buffer contents as success.
+int
+nsr_recon_readv (call_frame_t *frame, xlator_t *this,
+                 fd_t *fd, size_t size, off_t offset, uint32_t flags, dict_t *xdata)
+{
+        nsr_recon_fd_t *rfd = NULL;
+        int32_t op_errno = 0;
+        // copied stuff from quick-read.c and posix.c
+        struct iobuf *iobuf = NULL;
+        struct iobref *iobref = NULL;
+        struct iovec iov = {0, };
+        int32_t ret = -1;
+        nsr_recon_private_t *priv = NULL;
+
+        iobuf = iobuf_get2 (this->ctx->iobuf_pool, size);
+        if (!iobuf) {
+                op_errno = ENOMEM;
+                goto out;
+        }
+
+        iobref = iobref_new ();
+        if (!iobref) {
+                op_errno = ENOMEM;
+                goto out;
+        }
+
+        iobref_add (iobref, iobuf);
+
+        ret = this_fd_ctx_get (fd, this, &rfd);
+        if (ret < 0) {
+                op_errno = -ret;
+                goto out;
+        }
+        priv = (nsr_recon_private_t *)this->private;
+
+        recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_readv called for offset %d \n",(unsigned int)offset );
+        switch (offset) {
+        // client(leader) reads from here to get info for this term on this node
+        // invole libchagelog to get the information
+        case nsr_recon_xlator_sector_3 :
+        {
+                nsr_recon_last_term_info_t lt;
+                GF_ASSERT(size == sizeof(lt));
+                if (size != sizeof(lt)) {
+                        op_errno = EINVAL;
+                        goto out;
+                }
+                nsr_recon_libchangelog_get_this_term_info(this,priv->changelog_base_path, rfd->term, &lt);
+                recon_main_log (this->name, GF_LOG_INFO,
+                                "nsr_recon_readv - getting term info for term=%d, ops=%d, first=%d, last=%d\n",
+                                rfd->term, lt.commited_ops, lt.first_index, lt.last_index);
+                ENDIAN_CONVERSION_LT(lt, _gf_false); //htonl
+                memcpy(iobuf->ptr, &lt, size);
+                goto out;
+        }
+        // client(reconciliator) reads individual record information
+        case nsr_recon_xlator_sector_2 :
+        {
+                uint32_t num = (rfd->last_index - rfd->first_index + 1);
+                size_t expected = (size_t)num * sizeof(nsr_recon_record_details_t);
+
+                recon_main_log (this->name, GF_LOG_INFO,
+                                "nsr_recon_readv - expected size %lu got size %lu\n",
+                                (unsigned long)expected, (unsigned long)size);
+
+                GF_ASSERT(size == expected);
+                // the decoder writes `num` records into the iobuf, so the
+                // range must be sane and the buffer exactly that large
+                if ((rfd->last_index < rfd->first_index) || (size != expected)) {
+                        op_errno = EINVAL;
+                        goto out;
+                }
+                recon_main_log (this->name, GF_LOG_INFO,
+                                "nsr_recon_readv - getting records for term=%d from %d to %d\n",
+                                rfd->term, rfd->first_index, rfd->last_index);
+                nsr_recon_libchangelog_get_records(this, priv->changelog_base_path,
+                                                   rfd->term, rfd->first_index, rfd->last_index, iobuf->ptr);
+                goto out;
+        }
+        // read last term info
+        case nsr_recon_xlator_sector_4 :
+        {
+                nsr_recon_last_term_info_t lt;
+                GF_ASSERT(size == sizeof(lt));
+                if (size != sizeof(lt)) {
+                        op_errno = EINVAL;
+                        goto out;
+                }
+                nsr_recon_libchangelog_get_last_term_info(this, priv->changelog_base_path, rfd->term, &lt);
+                recon_main_log (this->name, GF_LOG_INFO,
+                                "nsr_recon_readv - getting last term info given current term=%d. last term = %d ops=%d, first=%d, last=%d\n",
+                                rfd->term, lt.last_term, lt.commited_ops, lt.first_index, lt.last_index);
+                ENDIAN_CONVERSION_LT(lt, _gf_false); //htonl
+                memcpy(iobuf->ptr, &lt, size);
+                goto out;
+        }
+        // nothing is defined for any other offset
+        default:
+                op_errno = EINVAL;
+                goto out;
+        }
+
+out:
+        if (op_errno == 0) {
+                iov.iov_base = iobuf->ptr;
+                ret = iov.iov_len = size;
+        } else {
+                ret = -1;
+        }
+
+        STACK_UNWIND_STRICT (readv, frame, ret, op_errno, &iov, 1, NULL, iobref , NULL);
+
+        if (iobref)
+                iobref_unref (iobref);
+        if (iobuf)
+                iobuf_unref (iobuf);
+        return 0;
+}
+
+// lookup fop: always succeeds, answering with the inode table's root inode
+// and an otherwise zeroed iatt whose type is "regular file".
+int
+nsr_recon_lookup (call_frame_t *frame, xlator_t *this,
+                  loc_t *loc, dict_t *xdata)
+{
+        // dirty hack to set root as regular but seems to work.
+        struct iatt root_attr = { .ia_type = IA_IFREG, };
+
+        recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_lookup called \n");
+
+        STACK_UNWIND_STRICT (lookup, frame, 0, 0, this->itable->root, &root_attr, NULL, NULL);
+        return 0;
+}
+
+
+// flush fop: nothing to flush in this translator, so report success at once.
+int32_t
+nsr_recon_flush (call_frame_t *frame, xlator_t *this,
+                 fd_t *fd, dict_t *xdata)
+{
+        const int32_t op_ret   = 0;
+        const int32_t op_errno = 0;
+
+        STACK_UNWIND_STRICT (flush, frame, op_ret, op_errno, NULL);
+        return 0;
+}
+
+// Translator init: read the options, build the replica member table (slot 0
+// is the local member, the rest come from the comma separated
+// "replica-group-members" option) and start the reconciliation driver thread.
+// Returns 0 on success and -1 on any failure.
+//
+// Fixes relative to the previous version:
+//  - the copy of `local` is allocated with room for its terminator;
+//  - the member array is checked before slot 0 is written;
+//  - members are split without strtok(), so the option string is not
+//    modified and a short list is reported instead of crashing on NULL;
+//  - the frame list head is initialised before the driver thread starts;
+//  - every failure path releases what was allocated and clears this->private;
+//  - this->private is set right after allocation because the debug build of
+//    recon_main_log dereferences it.
+int32_t
+init (xlator_t *this)
+{
+        nsr_recon_private_t *priv = NULL;
+        char *local = NULL, *members = NULL;
+        const char *cursor = NULL;
+        unsigned int i=0;
+
+        priv = GF_CALLOC (1, sizeof (*priv), gf_mt_recon_private_t);
+        if (!priv) {
+                // plain gf_log: recon_main_log needs this->private in debug builds
+                gf_log (this->name, GF_LOG_ERROR,
+                        "priv allocation error\n");
+                return -1;
+        }
+        priv->this = this;
+        this->private = (void *)priv;
+        INIT_LIST_HEAD(&(priv->list));
+
+        GF_OPTION_INIT ("replica-group-size", priv->replica_group_size, uint32, err);
+        if (priv->replica_group_size < 1) {
+                recon_main_log (this->name, GF_LOG_ERROR,
+                                "invalid replica group size\n");
+                goto err;
+        }
+        GF_OPTION_INIT ("vol-name", priv->volname, str, err);
+        if (!priv->volname) {
+                recon_main_log (this->name, GF_LOG_ERROR,
+                                "missing volname option (required)");
+                goto err;
+        }
+        GF_OPTION_INIT ("changelog-dir", priv->changelog_base_path, str, err);
+        if (!priv->changelog_base_path) {
+                recon_main_log (this->name, GF_LOG_ERROR,
+                                "missing changelog directory option (required)");
+                goto err;
+        }
+        GF_OPTION_INIT ("base-dir", priv->base_dir, str, err);
+        if (!priv->base_dir) {
+                recon_main_log (this->name, GF_LOG_ERROR,
+                                "missing brick base directory option (required)");
+                goto err;
+        }
+        GF_OPTION_INIT ("replica-group-members", members, str, err);
+        if (!members) {
+                recon_main_log (this->name, GF_LOG_ERROR,
+                                "missing membership option (required)");
+                goto err;
+        }
+        GF_OPTION_INIT ("local-member", local, str, err);
+        if (!local) {
+                recon_main_log (this->name, GF_LOG_ERROR,
+                                "missing local member option (required)");
+                goto err;
+        }
+
+        priv->replica_group_members = GF_CALLOC (priv->replica_group_size,
+                                                 sizeof(char *),
+                                                 gf_mt_recon_private_t);
+        if (!priv->replica_group_members) {
+                recon_main_log (this->name, GF_LOG_ERROR,
+                                "str allocation error\n");
+                goto err;
+        }
+        priv->replica_group_members[0] = GF_CALLOC (1,
+                                                    strlen(local) + 1,
+                                                    gf_mt_recon_private_t);
+        if (!(priv->replica_group_members[0])) {
+                recon_main_log (this->name, GF_LOG_ERROR,
+                                "str allocation error\n");
+                goto err;
+        }
+        strcpy(priv->replica_group_members[0], local);
+
+        // Walk the comma separated list; empty fields are skipped, which is
+        // what strtok() did as well.
+        cursor = members;
+        for (i=1; i < priv->replica_group_size; i++) {
+                size_t len = 0;
+
+                cursor += strspn(cursor, ",");
+                len = strcspn(cursor, ",");
+                if (len == 0) {
+                        recon_main_log (this->name, GF_LOG_ERROR,
+                                        "too few replica group members\n");
+                        goto err;
+                }
+                priv->replica_group_members[i] = GF_CALLOC (1, len + 1, gf_mt_recon_private_t);
+                if (!priv->replica_group_members[i]) {
+                        recon_main_log (this->name, GF_LOG_ERROR,
+                                        "str allocation error\n");
+                        goto err;
+                }
+                memcpy(priv->replica_group_members[i], cursor, len);
+                cursor += len;
+        }
+
+        recon_main_log (this->name, GF_LOG_INFO, "creating reconciliation driver \n");
+
+        if (pthread_create(&priv->thread_id, NULL, nsr_reconciliation_driver, priv)) {
+                recon_main_log (this->name, GF_LOG_ERROR,
+                                "pthread creation error \n");
+                goto err;
+        }
+
+        return 0;
+
+err:
+        if (priv->replica_group_members) {
+                for (i=0; i < priv->replica_group_size; i++)
+                        GF_FREE (priv->replica_group_members[i]);
+                GF_FREE (priv->replica_group_members);
+        }
+        GF_FREE (priv);
+        this->private = NULL;
+        return -1;
+}
+
+
+// Translator teardown: stop the reconciliation driver thread, then release
+// the private structure and the member table that init() allocated.
+// Does nothing when this->private is NULL (it used to be dereferenced
+// unconditionally, and nothing was ever freed).
+// NOTE(review): frames still queued on priv->list are not unwound here.
+void
+fini (xlator_t *this)
+{
+        nsr_recon_private_t *priv = NULL;
+        void *ret = NULL;
+        unsigned int i = 0;
+
+        priv = (nsr_recon_private_t *)this->private;
+        if (!priv)
+                return;
+
+        pthread_cancel(priv->thread_id);
+        pthread_join(priv->thread_id, &ret);
+
+        if (priv->replica_group_members) {
+                for (i = 0; i < priv->replica_group_size; i++)
+                        GF_FREE (priv->replica_group_members[i]);
+                GF_FREE (priv->replica_group_members);
+        }
+        GF_FREE (priv);
+        this->private = NULL;
+}
+
+
+// File operations this translator implements itself.
+struct xlator_fops fops = {
+        .lookup = nsr_recon_lookup,
+        .open   = nsr_recon_open,
+        .readv  = nsr_recon_readv,
+        .writev = nsr_recon_writev,
+        .flush  = nsr_recon_flush,
+};
+
+struct xlator_cbks cbks = { /* intentionally empty: no callbacks are registered by this translator */
+};
+
+// Options accepted by this translator; init() reads every one of them.
+struct volume_options options[] = {
+        {
+                .key           = {"replica-group-size"},
+                .type          = GF_OPTION_TYPE_INT,
+                .default_value = "2",
+                .min           = 2,
+                .max           = INT_MAX,
+                .description   = "Number of bricks in replica group. can be derived but putting it here for testing."
+        },
+        {
+                .key         = {"vol-name"},
+                .type        = GF_OPTION_TYPE_STR,
+                .description = "volume name"
+        },
+        {
+                .key         = {"local-member"},
+                .type        = GF_OPTION_TYPE_STR,
+                .description = "member(brick) for which this translator is responsible."
+        },
+        {
+                .key         = {"replica-group-members"},
+                .type        = GF_OPTION_TYPE_STR,
+                .description = "Comma seperated member names other than local."
+        },
+        {
+                .key         = {"changelog-dir"},
+                .type        = GF_OPTION_TYPE_STR,
+                .description = "Base directory where per term changelogs are maintained."
+        },
+        {
+                .key         = {"base-dir"},
+                .type        = GF_OPTION_TYPE_STR,
+                .description = "Base directory for this brick. This should go away once we fix gfid based lookups"
+        },
+        // terminator entry
+        { .key = {NULL} },
+};
diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.h b/xlators/cluster/nsr-recon/src/recon_xlator.h
new file mode 100644
index 000000000..c0f1e2145
--- /dev/null
+++ b/xlators/cluster/nsr-recon/src/recon_xlator.h
@@ -0,0 +1,78 @@
+/*
+ Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __RECON_XLATOR_H__
+#define __RECON_XLATOR_H__
+
+#include <semaphore.h>
+#include <pthread.h>
+
+enum gf_dht_mem_types_ { /* memory accounting types of this translator; NOTE(review): the tag says "dht", presumably a copy/paste leftover */
+ gf_mt_recon_private_t = gf_common_mt_end + 1, /* passed to every GF_CALLOC in recon_xlator.c */
+};
+
+enum nsr_recon_xlator_sector_t { /* file offsets used as command selectors by nsr_recon_writev / nsr_recon_readv */
+ nsr_recon_xlator_sector_0 = 0, // write: a transaction id; the frame parked for that transaction is unwound
+ nsr_recon_xlator_sector_1 = 512, // write: an nsr_recon_role_t handed to the driver; the frame is parked until the role work is reported done
+ nsr_recon_xlator_sector_2 = (512 * 2), // write: term + first/last index to fetch; read: the record details for that range
+ nsr_recon_xlator_sector_3 = (512 *3), // write: a term; read: term info for that term
+ nsr_recon_xlator_sector_4 = (512 * 4), // write: the current term; read: info about the last term found at or below it
+};
+
+
+typedef struct _nsr_recon_private_s { /* per-xlator private state, allocated in init() */
+ xlator_t *this; //back pointer
+ unsigned int replica_group_size; // number of static members of replica group ("replica-group-size" option)
+ char **replica_group_members; // replica group members (including itself in first slot)
+ pthread_t thread_id; // driver thread id
+ nsr_recon_driver_ctx_t *driver_thread_context; //driver thread context; passed to nsr_recon_driver_set_role, not assigned in recon_xlator.c
+ unsigned int outstanding; // for communicating with driver thread
+ call_frame_t *frame; // old frame that is pending (just one as of now)
+ struct list_head list; // queue of nsr_txn_id_t nodes (put_frame / get_frame)
+ char *volname; // "vol-name" option
+ uint32_t txn_id; // id given to the next role transaction; advanced in nsr_recon_writev
+ char *changelog_base_path; // "changelog-dir" option; directory holding the TERM.<n> files
+ char *base_dir; // "base-dir" option; prefix of the .glusterfs gfid paths
+#ifdef NSR_DEBUG
+ uint32_t recon_main_log_fd; // debug log descriptor, opened lazily by recon_main_log
+#endif
+} nsr_recon_private_t;
+
+#define atomic_cmpxchg __sync_val_compare_and_swap /* plain alias for the GCC builtin; used in nsr_recon_writev to advance txn_id */
+
+/*
+ * REVIEW
+ * Ideally, use gf_log like everyone else. Failing that, at least put the logs
+ * with all the others in /var/log instead of /tmp.
+ */
+#ifdef NSR_DEBUG
+/*
+ * Debug-build logger for the xlator side: writes the formatted message to
+ * /tmp/nsr-logs/recon-main-log, created lazily on first use.  A variable
+ * named `this` whose ->private is a valid nsr_recon_private_t must be in
+ * scope at the call site; `dom` and `levl` are ignored here.
+ * The brace-block form is kept so existing call sites expand as before.
+ */
+#define recon_main_log(dom, levl, fmt...) \
+        { \
+                nsr_recon_private_t *priv = this->private; \
+                char c[255]; \
+                if (!priv->recon_main_log_fd) { \
+                        mkdir("/tmp/nsr-logs/", 0777); \
+                        /* O_CREAT requires an explicit mode argument */ \
+                        priv->recon_main_log_fd = open("/tmp/nsr-logs/recon-main-log", \
+                                                       O_RDWR|O_CREAT|O_TRUNC, 0644); \
+                } \
+                /* bounded: long messages are truncated instead of overrunning c[] */ \
+                snprintf(c, sizeof(c), fmt); \
+                write(priv->recon_main_log_fd, c, strlen(c)); \
+        }
+#else
+/* Non-debug builds forward the message to gf_log unchanged. */
+#define recon_main_log(dom, levl, fmt...) gf_log(dom, levl, fmt)
+#endif
+
+
+/* Fill *lt with index information for changelog "<bp>/TERM.<term>". */
+void nsr_recon_libchangelog_get_this_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt);
+/* Fill *lt for the newest term at or below `term` that has a changelog file. */
+void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt);
+/* Unwind the write frame parked for transaction `txn_id` (name now matches the definition). */
+void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t txn_id);
+/* Decode changelog records first..last of `term` into the record array at buf. */
+void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf);
+
+
+#endif /* #ifndef __RECON_XLATOR_H__ */