summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRaghavan P <rpichai@redhat.com>2014-01-03 16:09:04 +0530
committerRaghavan P <rpichai@redhat.com>2014-01-08 14:48:21 +0530
commite0cce4cf7c22d5cd8ab6c2aff4ecf28c18c6a469 (patch)
tree5e30d20eaf43c77f77d5aa9d4351492af659b39f
parent82ce8acfdfb141c6b34b6b6b43ef78eee891f9e8 (diff)
Changes to NSR reconciliation code.
Following is list of changes: 1) Simulation of etcd using a file as a registry protected using locks. 2) Implement notifications for child up and child down. 3) Join a new brick into quorum. 4) add support for proper fencing and draining of IO required for reconciliaiton 5) misc changes and addressed review comments. Change-Id: Iddd1137ad6205252ed03301888bb1e83fa2221e0 Signed-off-by: Raghavan P <rpichai@redhat.com>
-rw-r--r--configure.ac13
-rw-r--r--xlators/cluster/nsr-recon/src/recon_driver.c172
-rw-r--r--xlators/cluster/nsr-recon/src/recon_driver.h46
-rw-r--r--xlators/cluster/nsr-recon/src/recon_xlator.c13
-rw-r--r--xlators/cluster/nsr-recon/src/recon_xlator.h16
-rw-r--r--xlators/cluster/nsr-server/src/Makefile.am8
-rw-r--r--xlators/cluster/nsr-server/src/all-templates.c8
-rw-r--r--xlators/cluster/nsr-server/src/etcd-sim.c222
-rw-r--r--xlators/cluster/nsr-server/src/leader.c173
-rw-r--r--xlators/cluster/nsr-server/src/nsr-internal.h20
-rw-r--r--xlators/cluster/nsr-server/src/nsr.c45
-rw-r--r--xlators/cluster/nsr-server/src/recon_notify.c345
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-volgen.c9
13 files changed, 849 insertions, 241 deletions
diff --git a/configure.ac b/configure.ac
index 581b976a0..89aba7781 100644
--- a/configure.ac
+++ b/configure.ac
@@ -774,6 +774,19 @@ fi
AM_CONDITIONAL([ENABLE_SYSLOG], [test x$USE_SYSLOG = xyes])
#end syslog section
+
+#etcd section
+AC_CHECK_PROG(ETCD,etcd,yes)
+
+ETCD_SIM=yes
+if test "x${ETCD}" = "xyes"; then
+ ETCD_SIM=no
+ AC_DEFINE(HAVE_ETCD, 1, [define if found etcd])
+fi
+AM_CONDITIONAL([ENABLE_ETCD_SIM], [test x$ETCD_SIM = xyes])
+#end etcd section
+
+
BUILD_READLINE=no
AC_CHECK_LIB([readline -lcurses],[readline],[RLLIBS="-lreadline -lcurses"])
AC_CHECK_LIB([readline -ltermcap],[readline],[RLLIBS="-lreadline -ltermcap"])
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.c b/xlators/cluster/nsr-recon/src/recon_driver.c
index 1328d52dc..2e2299ad1 100644
--- a/xlators/cluster/nsr-recon/src/recon_driver.c
+++ b/xlators/cluster/nsr-recon/src/recon_driver.c
@@ -495,7 +495,7 @@ control_worker_func_0(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_INFO,
"out of get term info for term %d. got ops %d with first %d and last %d \n",
recon_info->last_term, recon_info->commited_ops,
- recon_info->last_index, recon_info->first_index);
+ recon_info->first_index, recon_info->last_index);
break;
}
@@ -538,12 +538,14 @@ control_worker_func_0(nsr_per_node_worker_t *ctx,
"trying to get reconciliation window records for node %d for term %d with first %d last %d\n",
index, recon_info->last_term, recon_info->first_index, recon_info->last_index);
- GF_ASSERT(num <= MAX_RECONCILIATION_WINDOW_SIZE);
// TBD - handle buffer allocation errors
rd = GF_CALLOC(num,
sizeof(nsr_recon_record_details_t),
gf_mt_recon_private_t);
+ recon_info->records = GF_CALLOC(num,
+ sizeof(nsr_reconciliation_record_t),
+ gf_mt_recon_private_t);
// TBD - handle errors
nsr_recon_libchangelog_get_records(this, priv->changelog_base_path,
@@ -684,7 +686,7 @@ control_worker_func(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_INFO,
"out of get last term info with current term %d. got ops %d with first %d and last %d \n",
recon_info->last_term, recon_info->commited_ops,
- recon_info->last_index, recon_info->first_index);
+ recon_info->first_index, recon_info->last_index);
break;
}
@@ -711,7 +713,7 @@ control_worker_func(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_INFO,
"out of get term info for term %d. got ops %d with first %d and last %d \n",
recon_info->last_term, recon_info->commited_ops,
- recon_info->last_index, recon_info->first_index);
+ recon_info->first_index, recon_info->last_index);
break;
}
@@ -848,7 +850,6 @@ control_worker_func(nsr_per_node_worker_t *ctx,
"trying to get reconciliation window records for node %d for term %d with first %d last %d\n",
index, recon_info->last_term, recon_info->first_index, recon_info->last_index);
- GF_ASSERT(num <= MAX_RECONCILIATION_WINDOW_SIZE);
// TBD - error handling for all the glfs APIs
glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET);
@@ -864,6 +865,10 @@ control_worker_func(nsr_per_node_worker_t *ctx,
rd = GF_CALLOC(num,
sizeof(nsr_recon_record_details_t),
gf_mt_recon_private_t);
+ recon_info->records = GF_CALLOC(num,
+ sizeof(nsr_reconciliation_record_t),
+ gf_mt_recon_private_t);
+
glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0);
for (i=0; i < num; i++) {
ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl
@@ -875,7 +880,7 @@ control_worker_func(nsr_per_node_worker_t *ctx,
recon_info->records[i].rec.type,
i + recon_info->first_index);
}
- free(rd);
+ GF_FREE(rd);
nsr_worker_log(this->name, GF_LOG_INFO,
"got reconciliation window records for node %d for term %d \n",
@@ -1003,6 +1008,28 @@ nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
* Write the role and associated information to the node.
* This gets called from recon xlator indicating node is either
* leader, reconciliator or should do resolution.
+ */
+gf_boolean_t
+nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx,
+ nsr_recon_role_t *rr,
+ uint32_t txn_id)
+{
+ nsr_role_work_t *rw;
+
+ nsr_driver_log(this->name, GF_LOG_INFO, "set role called \n");
+ rw = GF_CALLOC(1, sizeof (nsr_role_work_t), 0);
+ memcpy(&rw->role, rr, sizeof(nsr_recon_role_t));
+ rw->txn_id = txn_id;
+ INIT_LIST_HEAD(&(rw->list));
+ pthread_mutex_lock(&(ctx->mutex));
+ list_add_tail(&rw->list, &ctx->role_head.list);
+ pthread_cond_signal(&(ctx->cv));
+ pthread_mutex_unlock(&(ctx->mutex));
+ nsr_driver_log(this->name, GF_LOG_INFO, "set role returns \n");
+ return _gf_true;
+}
+
+/*
* First we undo the last role to make sure we clean up.
*
* Input arguments
@@ -1023,19 +1050,18 @@ nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
* needs to be returned back after the actual reconciliation is done.
* For that we store the frame against this id which acts as a key.
*/
-gf_boolean_t
-nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx,
- nsr_recon_role_t *rr,
- uint32_t txn_id)
+nsr_recon_driver_state_t
+nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
+ nsr_role_work_t *rw)
{
uint8_t i=0, j=0;
- pthread_mutex_lock(&(ctx->mutex));
- ctx->state = rr->role;
+ nsr_recon_role_t *rr = &(rw->role);
+
// First make all the threads uninitialise
for (i = 0; i < ctx->replica_group_size; i++) {
nsr_recon_in_use(ctx, i, _gf_false);
}
- if (rr->role == leader) {
+ if ((rr->role == leader) || (rr->role == joiner)) {
// First set info this node
nsr_recon_in_use(ctx, 0, _gf_true);
@@ -1051,10 +1077,11 @@ nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx,
for (i=1; i < ctx->replica_group_size; i++) {
for (j=0 ; j < rr->num; j++) {
// TBD - make this strcmp later when etcd servers set properly
- //if (!strcmp(ctx->workers[i].name, rr->info[j].name)) {
- if (strstr(ctx->workers[i].name, rr->info[j].name)) {
+ if (!strcmp(ctx->workers[i].name, rr->info[j].name)) {
+ //if (strstr(ctx->workers[i].name, rr->info[j].name)) {
nsr_driver_log(this->name, GF_LOG_INFO,
- "nsr_recon_driver_set_role: this as leader. found other server %s\n",
+ "nsr_recon_driver_set_role: this as %s. found other server %s\n",
+ (rr->role == leader) ? "leader" : "joiner",
ctx->workers[i].name);
nsr_recon_in_use(ctx, i, _gf_true);
@@ -1071,14 +1098,19 @@ nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx,
}
}
}
- ctx->reconciliator_index = -1;
+ // If leader, reconciliator has to be chosen.
+ // If joiner, we are the reconciliator.
+ if (rr->role == leader)
+ ctx->reconciliator_index = -1;
+ else
+ ctx->reconciliator_index = 0;
} else if (rr->role == reconciliator) {
ctx->reconciliator_index = 0;
// Copy information about all the other members which had the same term
for (i=0; i < rr->num; i++) {
for (j=0; j < ctx->replica_group_size; j++) {
- //if (!strcmp(rr->info[i].name, ctx->workers[j].name)) {
- if (strstr(ctx->workers[j].name, rr->info[i].name)) {
+ if (!strcmp(rr->info[i].name, ctx->workers[j].name)) {
+ //if (strstr(ctx->workers[j].name, rr->info[i].name)) {
nsr_driver_log(this->name, GF_LOG_INFO,
"nsr_recon_driver_set_role: this as reconciliator. found other server %s\n",
ctx->workers[j].name);
@@ -1104,8 +1136,8 @@ nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx,
} else if (rr->role == resolutor) {
for (j=0; j < ctx->replica_group_size; j++) {
// info[1] has the information regarding the reconciliator
- if (strstr(ctx->workers[j].name, rr->info[1].name)) {
- //if (!strcmp(rr->info[1].name, ctx->workers[j].name)) {
+ //if (strstr(ctx->workers[j].name, rr->info[1].name)) {
+ if (!strcmp(rr->info[1].name, ctx->workers[j].name)) {
nsr_driver_log(this->name, GF_LOG_INFO,
"nsr_recon_driver_set_role: this as resolutor. found other server %s as reconciliator\n",
ctx->workers[1].name);
@@ -1143,11 +1175,9 @@ nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx,
nsr_recon_in_use(ctx, 0, _gf_true);
}
- ctx->txn_id = txn_id;
- // Signal the main driver thread
- pthread_cond_signal(&(ctx->cv));
- pthread_mutex_unlock(&(ctx->mutex));
- return _gf_true;
+ ctx->txn_id = rw->txn_id;
+
+ return rr->role;
}
@@ -1171,6 +1201,15 @@ compute_resolution_work(nsr_recon_driver_ctx_t *ctx,
uint32_t i=0;
uint32_t num = (my_info->last_index - my_info->first_index + 1);
+ if (invalidate) {
+ if (my_info->records) {
+ GF_FREE(my_info->records);
+ my_info->records = GF_CALLOC(num,
+ sizeof(nsr_reconciliation_record_t),
+ gf_mt_recon_private_t);
+ }
+ }
+
for (i=0; i < num; i++) {
nsr_log_type_t orig, new;
nsr_recon_work_type_t tw = NSR_RECON_WORK_NONE;
@@ -1841,7 +1880,6 @@ read_out:
}
apply_record(ctx, ri, dict);
commit_out:
- dict_unref (dict);
nsr_worker_log(this->name, GF_LOG_INFO,
"finished recon commit for gfid %s \n",
rd->gfid);
@@ -2107,6 +2145,10 @@ nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
send_and_wait(bm, ctx->replica_group_size, ctx,
(in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI,
NSR_RECON_QUEUE_TO_DATA, -1);
+ if (in_use == _gf_false) {
+ //GF_FREE(ctx->workers[i].recon_info->records);
+ GF_FREE(ctx->workers[i].recon_info);
+ }
}
#endif
}
@@ -2139,6 +2181,7 @@ nsr_reconciliation_driver(void *arg)
nsr_driver_log (this->name, GF_LOG_ERROR, "mutex init error \n");
return NULL;
}
+ INIT_LIST_HEAD(&(ctx->role_head.list));
ctx->workers = GF_CALLOC (replica_group_size,
sizeof(nsr_replica_worker_t),
@@ -2188,24 +2231,20 @@ nsr_reconciliation_driver(void *arg)
while (1) {
- nsr_driver_log (this->name, GF_LOG_INFO, "waiting for state change \n");
- pthread_mutex_lock(&(ctx->mutex));
- while ((*driver_ctx)->state == 0) {
- pthread_cond_wait(&(ctx->cv), &(ctx->mutex));
- }
- pthread_mutex_unlock(&(ctx->mutex));
+ nsr_role_work_t *rr;
- nsr_driver_log (this->name, GF_LOG_INFO, " state changed to %d \n", ctx->state);
-#if 0
- for (i=0; i < replica_group_size; i++) {
- if (ctx->workers[i].in_use) {
- nsr_recon_start_work(ctx->workers[i].control_worker, _gf_true);
- nsr_recon_start_work(ctx->workers[i].data_worker, _gf_false);
- }
- }
-#endif
+ nsr_driver_log (this->name, GF_LOG_INFO, "waiting for role to be queued \n");
+ pthread_mutex_lock(&(ctx->mutex));
+ while (list_empty(&(ctx->role_head.list))) {
+ pthread_cond_wait(&(ctx->cv), &(ctx->mutex));
+ }
+ pthread_mutex_unlock(&(ctx->mutex));
+
+ list_for_each_entry(rr, &(ctx->role_head.list), list) {
+ nsr_recon_driver_state_t state;
+ state = nsr_recon_driver_get_role(ctx, rr);
- if (ctx->state == leader) {
+ if (state == leader) {
int32_t chosen = -1;
int32_t last_term = -1, last_ops = -1;
@@ -2278,13 +2317,13 @@ nsr_reconciliation_driver(void *arg)
* file almost unreadable.
*/
if (!setjmp(*(ctx->env))) {
- ctx->state = reconciliator;
+ state = reconciliator;
goto i_am_reconciliator;
} else {
nsr_driver_log (this->name, GF_LOG_INFO, "long jmp return to leader\n");
free(ctx->env);
ctx->env = NULL;
- ctx->state = leader;
+ state = leader;
}
}
@@ -2294,13 +2333,13 @@ nsr_reconciliation_driver(void *arg)
nsr_driver_log (this->name, GF_LOG_INFO, "local node resolution needs to be done. before set jmp\n");
ctx->env = calloc(1,sizeof(jmp_buf));
if (!setjmp(*(ctx->env))) {
- ctx->state = resolutor;
+ state = resolutor;
goto i_am_resolutor;
} else {
nsr_driver_log (this->name, GF_LOG_INFO, "long jmp return to leader\n");
free(ctx->env);
ctx->env = NULL;
- ctx->state = leader;
+ state = leader;
}
}
@@ -2316,7 +2355,7 @@ nsr_reconciliation_driver(void *arg)
}
i_am_reconciliator:
- if (ctx->state == reconciliator) {
+ if (state == reconciliator) {
gf_boolean_t do_recon = _gf_false;
uint32_t start_index = ctx->workers[0].recon_info->first_index;
uint32_t end_index = ctx->workers[0].recon_info->last_index;
@@ -2331,7 +2370,7 @@ i_am_reconciliator:
(ctx->workers[0].recon_info->last_term == ctx->workers[i].recon_info->last_term)) {
ctx->workers[i].recon_info->last_index = end_index;
ctx->workers[i].recon_info->first_index = start_index;
- bm = (1 << i);
+ bm |= (1 << i);
do_recon = _gf_true;
}
}
@@ -2445,7 +2484,7 @@ i_am_reconciliator:
}
i_am_resolutor:
- if (ctx->state == resolutor) {
+ if (state == resolutor) {
// This node's last term is filled when it gets a message
// from the leader to act as a reconciliator.
@@ -2600,6 +2639,36 @@ i_am_resolutor:
}
+ if (state == joiner) {
+
+ int32_t chosen = -1;
+ int32_t last_term = -1, last_ops = -1;
+
+ nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n");
+ // Get last term info from all members for this group
+ // which will be the leader(this node) and the node that wants to join.
+ send_and_wait(-1,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_GET_LAST_TERM_INFO,
+ NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term);
+
+
+ // send message to other node that just joined to sync up with this node which is also the leader
+ nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this\n");
+ bm = ~(1);
+ send_and_wait(bm,
+ replica_group_size,
+ ctx,
+ NSR_WORK_ID_RESOLUTION_DO_WORK,
+ NSR_RECON_QUEUE_TO_CONTROL, -1);
+
+ nsr_driver_log (this->name, GF_LOG_INFO,
+ "finished recon work as joiner \n");
+
+ }
+
+
// free the asasociated recon_info contexts created as part of this role
out:
@@ -2617,7 +2686,8 @@ out:
#endif
nsr_driver_log (this->name, GF_LOG_INFO,
"finished sending end of reconciliation message \n");
- ctx->state = 0;
+ }
+ list_del_init (&rr->list);
}
return NULL;
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.h b/xlators/cluster/nsr-recon/src/recon_driver.h
index 67f4d6014..a9a9a9182 100644
--- a/xlators/cluster/nsr-recon/src/recon_driver.h
+++ b/xlators/cluster/nsr-recon/src/recon_driver.h
@@ -78,6 +78,7 @@ typedef enum nsr_recon_driver_state_t {
leader = 1,
reconciliator = 2,
resolutor = 3,
+ joiner = 4,
} nsr_recon_driver_state_t;
// role structure
@@ -177,6 +178,12 @@ typedef struct nsr_recon_record_details_s {
rd.len = f(rd.len); \
}
+typedef struct _nsr_role_work_s {
+ nsr_recon_role_t role;
+ uint32_t txn_id;
+ struct list_head list;
+} nsr_role_work_t;
+
typedef struct _nsr_recon_work_s {
gf_boolean_t in_use;
uint32_t index;
@@ -206,7 +213,8 @@ typedef struct _nsr_reconciliator_info {
int32_t commited_ops;
uint32_t last_index;
uint32_t first_index;
- nsr_reconciliation_record_t records[MAX_RECONCILIATION_WINDOW_SIZE];
+ //nsr_reconciliation_record_t records[MAX_RECONCILIATION_WINDOW_SIZE];
+ nsr_reconciliation_record_t *records;
} nsr_reconciliator_info_t;
typedef struct _nsr_per_node_worker_s {
@@ -221,7 +229,7 @@ typedef struct _nsr_per_node_worker_s {
char local; // local data worker
//struct list_head list; //list of work items
nsr_recon_work_t head;
- pthread_mutex_t mutex; //mutex to gaurd the above list
+ pthread_mutex_t mutex; //mutex to guard the state
pthread_cond_t cv; //condition variable for signaling the worker thread
gf_boolean_t is_control;
#ifdef NSR_DEBUG
@@ -242,9 +250,9 @@ typedef struct _nsr_recon_driver_ctxt {
uint32_t replica_group_size; // number of static members of replica group
nsr_replica_worker_t *workers; // worker info
int32_t reconciliator;
- pthread_mutex_t mutex; //mutex to gaurd the state
- pthread_cond_t cv; //condition variable for signaling the driver thread
- uint32_t state; //driver state
+ pthread_mutex_t mutex;
+ pthread_cond_t cv;
+ nsr_role_work_t role_head;
volatile int32_t outstanding;
uint32_t reconciliator_index;
uint32_t txn_id;
@@ -278,8 +286,20 @@ nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, nsr_recon_role_t *rr, uin
{ \
char c[255]; \
if (!ctx->driver_log_fd) { \
+ char str[255], b[255]; \
+ char *ptr; \
+ nsr_recon_private_t *priv = ctx->this->private; \
+ strcpy(b, priv->replica_group_members[0]); \
+ ptr = strchr (b, '/'); \
+ while (ptr) { \
+ *ptr = '-'; \
+ ptr = strchr (b, '/'); \
+ } \
+ sprintf(str,"/tmp/nsr-logs/%s",b); \
mkdir("/tmp/nsr-logs/", 0777); \
- ctx->driver_log_fd = open("/tmp/nsr-logs/nsr-driver-log", O_RDWR|O_CREAT|O_TRUNC); \
+ mkdir(str, 0777); \
+ sprintf(str,"/tmp/nsr-logs/%s/nsr-driver-log",b); \
+ ctx->driver_log_fd = open(str, O_RDWR|O_CREAT|O_TRUNC); \
} \
sprintf(c, fmt); \
write(ctx->driver_log_fd, c, strlen(c)); \
@@ -293,9 +313,19 @@ nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, nsr_recon_role_t *rr, uin
{ \
char c[255]; \
if (!ctx->worker_log_fd) { \
- char str[255]; \
- sprintf(str,"/tmp/nsr-logs/%s-%d",ctx->is_control? "con" : "data",ctx->index); \
+ char str[255], b[255]; \
+ char *ptr; \
+ nsr_recon_private_t *priv = ctx->driver_ctx->this->private; \
+ strcpy(b, priv->replica_group_members[0]); \
+ ptr = strchr (b, '/'); \
+ while (ptr) { \
+ *ptr = '-'; \
+ ptr = strchr (b, '/'); \
+ } \
+ sprintf(str,"/tmp/nsr-logs/%s",b); \
mkdir("/tmp/nsr-logs/", 0777); \
+ mkdir(str, 0777); \
+ sprintf(str,"/tmp/nsr-logs/%s/%s-%d",b,ctx->is_control?"con":"data",ctx->index); \
ctx->worker_log_fd = open(str, O_RDWR|O_CREAT|O_TRUNC); \
} \
sprintf(c, fmt); \
diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.c b/xlators/cluster/nsr-recon/src/recon_xlator.c
index 62583d526..5f63f6671 100644
--- a/xlators/cluster/nsr-recon/src/recon_xlator.c
+++ b/xlators/cluster/nsr-recon/src/recon_xlator.c
@@ -196,8 +196,7 @@ void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term,
// do a mmap; seek into the first and read all records till last.
// TBD - right now all records are pseudo holes but mark them as fills.
// TBD - pseudo hole to be implemented when actual fsync gets done on data.
- char read_buf[((last - first) + 1) * 128];
- char *rb = &(read_buf[0]);
+ char *rb = NULL, *orig = NULL;
char path[PATH_MAX];
int fd;
uint32_t index = 0;
@@ -206,11 +205,14 @@ void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term,
"libchangelog_get_records called for term %d index from %d to %d \n",
term, first, last );
+ orig = rb = GF_CALLOC(128, ((last - first) + 1), gf_mt_recon_private_t);
+
sprintf(path,"%s/%s%d",bp,"TERM.",term);
fd = open(path, O_RDONLY);
if (fd != -1) {
char *start = NULL;
nsr_recon_record_details_t * rec = (nsr_recon_record_details_t *)buf;
+
if (first == 0)
lseek(fd, 128, SEEK_SET);
else
@@ -407,6 +409,7 @@ finish:
rec++;
} while(1);
}
+ GF_FREE(orig);
close(fd);
recon_main_log (this->name, GF_LOG_INFO,
@@ -484,7 +487,8 @@ nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called to set role %d\n", rr.role);
if ((rr.role != leader) &&
(rr.role != reconciliator) &&
- (rr.role != resolutor)) {
+ (rr.role != resolutor) &&
+ (rr.role != joiner)) {
recon_main_log (this->name, GF_LOG_ERROR,
"EIII---nsr_recon_writev cannot set state \n");
STACK_UNWIND_STRICT (writev, frame, -1, op_errno,
@@ -577,7 +581,7 @@ nsr_recon_readv (call_frame_t *frame, xlator_t *this,
int32_t ret = -1;
nsr_recon_private_t *priv = NULL;
- iobuf = iobuf_get2 (this->ctx->iobuf_pool, op_ret);
+ iobuf = iobuf_get2 (this->ctx->iobuf_pool, size);
if (!iobuf) {
op_errno = ENOMEM;
goto out;
@@ -623,6 +627,7 @@ nsr_recon_readv (call_frame_t *frame, xlator_t *this,
(num * sizeof(nsr_recon_record_details_t)), size);
GF_ASSERT(size == (num * sizeof(nsr_recon_record_details_t)));
+ bzero(iobuf->ptr, size);
recon_main_log (this->name, GF_LOG_INFO,
"nsr_recon_readv - getting records for term=%d from %d to %d\n",
rfd->term, rfd->first_index, rfd->last_index);
diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.h b/xlators/cluster/nsr-recon/src/recon_xlator.h
index c0f1e2145..168db518b 100644
--- a/xlators/cluster/nsr-recon/src/recon_xlator.h
+++ b/xlators/cluster/nsr-recon/src/recon_xlator.h
@@ -55,11 +55,22 @@ typedef struct _nsr_recon_private_s {
#ifdef NSR_DEBUG
#define recon_main_log(dom, levl, fmt...) \
{ \
- nsr_recon_private_t *priv = this->private; \
char c[255]; \
+ nsr_recon_private_t *priv = this->private; \
if (!priv->recon_main_log_fd) { \
+ char str[255], b[255]; \
+ char *ptr; \
+ strcpy(b, priv->replica_group_members[0]); \
+ ptr = strchr (b, '/'); \
+ while (ptr) { \
+ *ptr = '-'; \
+ ptr = strchr (b, '/'); \
+ } \
+ sprintf(str,"/tmp/nsr-logs/%s",b); \
mkdir("/tmp/nsr-logs/", 0777); \
- priv->recon_main_log_fd = open("/tmp/nsr-logs/recon-main-log", O_RDWR|O_CREAT|O_TRUNC); \
+ mkdir(str, 0777); \
+ sprintf(str,"/tmp/nsr-logs/%s/recon-main-log",b); \
+ priv->recon_main_log_fd = open(str, O_RDWR|O_CREAT|O_TRUNC); \
} \
sprintf(c, fmt); \
write(priv->recon_main_log_fd, c, strlen(c)); \
@@ -68,7 +79,6 @@ typedef struct _nsr_recon_private_s {
#define recon_main_log(dom, levl, fmt...) gf_log(dom, levl, fmt)
#endif
-
void nsr_recon_libchangelog_get_this_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt);
void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt);
void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term_id);
diff --git a/xlators/cluster/nsr-server/src/Makefile.am b/xlators/cluster/nsr-server/src/Makefile.am
index df0d68539..85a560d09 100644
--- a/xlators/cluster/nsr-server/src/Makefile.am
+++ b/xlators/cluster/nsr-server/src/Makefile.am
@@ -4,9 +4,15 @@ xlator_LTLIBRARIES = nsr.la
xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/cluster
nsr_la_LDFLAGS = -module -avoid-version -lgfapi -lcurl
-nsr_la_SOURCES = nsr.c leader.c etcd-api.c \
+
+if ENABLE_ETCD_SIM
+nsr_la_SOURCES = nsr.c leader.c recon_notify.c etcd-sim.c
+else
+nsr_la_SOURCES = nsr.c leader.c recon_notify.c etcd-api.c \
yajl.c yajl_alloc.c yajl_buf.c yajl_encode.c yajl_gen.c \
yajl_lex.c yajl_parser.c yajl_tree.c yajl_version.c
+endif
+
nsr_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
diff --git a/xlators/cluster/nsr-server/src/all-templates.c b/xlators/cluster/nsr-server/src/all-templates.c
index 541653029..7300973d5 100644
--- a/xlators/cluster/nsr-server/src/all-templates.c
+++ b/xlators/cluster/nsr-server/src/all-templates.c
@@ -83,17 +83,20 @@ nsr_$NAME$ (call_frame_t *frame, xlator_t *this,
// follower/recon path
// just send it to local node
if (from_leader || from_recon) {
+ atomic_inc(&priv->ops_in_flight);
STACK_WIND (frame, nsr_$NAME$_complete,
FIRST_CHILD(this), FIRST_CHILD(this)->fops->$NAME$,
$ARGS_SHORT$);
return 0;
}
+
if (!priv->leader || priv->fence_io) {
op_errno = EREMOTE;
goto err;
}
+
if (!xdata) {
xdata = dict_new();
if (!xdata) {
@@ -115,6 +118,7 @@ nsr_$NAME$ (call_frame_t *frame, xlator_t *this,
goto err;
}
+
#if defined(NSR_CG_QUEUE)
nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd->inode);
if (!ictx) {
@@ -188,6 +192,8 @@ nsr_$NAME$_dispatch (call_frame_t *frame, xlator_t *this,
nsr_private_t *priv = this->private;
xlator_list_t *trav;
+ atomic_inc(&priv->ops_in_flight);
+
/*
* TBD: unblock pending request(s) if we fail after this point but
* before we get to nsr_$NAME$_complete (where that code currently
@@ -246,6 +252,7 @@ nsr_$NAME$_complete (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno,
$ARGS_LONG$)
{
+ nsr_private_t *priv = this->private;
#if defined(NSR_CG_NEED_FD)
nsr_local_t *local = frame->local;
#endif
@@ -294,6 +301,7 @@ nsr_$NAME$_complete (call_frame_t *frame, void *cookie, xlator_t *this,
STACK_UNWIND_STRICT ($NAME$, frame, op_ret, op_errno,
$ARGS_SHORT$);
+ atomic_dec(&priv->ops_in_flight);
return 0;
}
diff --git a/xlators/cluster/nsr-server/src/etcd-sim.c b/xlators/cluster/nsr-server/src/etcd-sim.c
new file mode 100644
index 000000000..5c5cdcec0
--- /dev/null
+++ b/xlators/cluster/nsr-server/src/etcd-sim.c
@@ -0,0 +1,222 @@
+/*
+ * Copyright (c) 2013, Red Hat
+ * All rights reserved.
+
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer. Redistributions in binary
+ * form must reproduce the above copyright notice, this list of conditions and
+ * the following disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include "call-stub.h"
+#include "defaults.h"
+#include "xlator.h"
+#include "api/src/glfs.h"
+#include "api/src/glfs-internal.h"
+#include "run.h"
+
+
+/*
+ * Mock implementation of etcd
+ * The etcd file is simulated in /tmp/<server-names>
+ * Writes from Multiple writers are protected using file lock.
+*/
+
+#include "etcd-api.h"
+#define T_FORMAT "%d-%b-%Y,%H:%M:%S"
+#define MAX_KEY_LEN 64
+#define MAX_VALUE_LEN 64
+#define MAX_TTL_LEN 12
+
+etcd_session
+etcd_open (etcd_server *server_list)
+{
+ return NULL;
+}
+
+typedef struct _etcd_sim_s {
+ int fd;
+ FILE *stream;
+} etcd_sim_t;
+
+void
+etcd_close (etcd_session this)
+{
+ etcd_sim_t *sim = (etcd_sim_t *)this;
+ fflush(sim->stream);
+ fclose(sim->stream);
+ close(sim->fd);
+ free(this);
+}
+
+
+char *
+etcd_get (etcd_session this, char *key)
+{
+ char *str = NULL;
+ size_t len;
+ etcd_sim_t *sim = (etcd_sim_t *)this;
+ struct tm tm;
+ time_t old, new;
+ lockf(sim->fd, F_LOCK, 0 );
+ if (fseek(sim->stream, 0, SEEK_SET) == -1) {
+ lockf(sim->fd, F_ULOCK, 0 );
+ return NULL;
+ }
+ // Read the file
+ while(1) {
+ if(str) {
+ free(str);
+ str = NULL;
+ }
+ if (getline((char **)&str, &len,sim->stream) == -1) {
+ break;
+ }
+ if (!strncmp(str, key, strlen(key))) {
+ char k[256], s[256], *ret, past[256];
+ unsigned int ttl;
+ double delta;
+ sscanf(str,"%s %s %d %s",k, s, &ttl, past);
+ strptime(past, T_FORMAT, &tm);
+ old = mktime(&tm);
+ new = time(NULL);
+ delta = difftime(new, old);
+ // check if key is expired.
+ // If ttl is 0, it means key has infinite ttk=l.
+ if ((!ttl) || ((delta >= 0) && (delta < ttl))) {
+ ret = calloc(1, strlen(s) + 1);
+ strcpy(ret,s);
+ free(str);
+ lockf(sim->fd, F_ULOCK, 0 );
+ return(ret);
+ }
+ }
+ }
+ lockf(sim->fd, F_ULOCK, 0 );
+ return NULL;
+}
+
+
+etcd_result
+etcd_set (etcd_session this, char *key, char *value,
+ char *precond, unsigned int ttl)
+{
+ char *str = NULL;
+ char buf[255];
+ char tp[255];
+ char s[255];
+ size_t len;
+ etcd_sim_t *sim = (etcd_sim_t *)this;
+ struct tm tm;
+ time_t old, new;
+ lockf(sim->fd, F_LOCK, 0 );
+ if (fseek(sim->stream, 0, SEEK_SET) == -1) {
+ lockf(sim->fd, F_ULOCK, 0 );
+ return ETCD_WTF;
+ }
+ while(1) {
+ if(str) {
+ free(str);
+ str = NULL;
+ }
+ if (getline((char **)&str, &len,sim->stream) == -1) {
+ break;
+ }
+ if (!strncmp(str, key, strlen(key))) {
+ char k[256], s[256], past[256];
+ unsigned int ttl;
+ double delta;
+ sscanf(str,"%s %s %d %s",k, s, &ttl, past);
+ strptime(past, T_FORMAT, &tm);
+ old = mktime(&tm);
+ new = time(NULL);
+ delta = difftime(new, old);
+ // check if the present key is expired
+ if ( (!ttl) || ((delta >= 0) && (delta < ttl))) {
+ // present key not expired. In case of precondition,
+ // check if it matches. If not return with error
+ // In case of no precond, return error since
+ // present key not yet expired.
+ if ((!precond) || (strcmp(precond, s))) {
+ free(str);
+ lockf(sim->fd, F_ULOCK, 0 );
+ return ETCD_WTF;
+ }
+ }
+ fseek(sim->stream, -strlen(str), SEEK_CUR);
+ free(str);
+ goto here;
+ }
+ }
+here:
+ memset(tp, 0, 255);
+ new = time(NULL);
+ memcpy(&tm, localtime(&new), sizeof(struct tm));
+ strftime(buf, sizeof(buf), T_FORMAT, &tm);
+ // what we want to print in the file is something like this
+ // key value(at offset of 64) ttl(offset to 128) time(left offset to 140)
+ // hence we would want to create a format buf as follows:
+ // "%-64s%-64s%-16d%-18s"
+ // Hence we construct this first (in string s) and use that to print into tp
+ // which gets written to the registry file.
+ sprintf(s,"%%-%ds%%-%ds%%-%dd%%s\n",
+ MAX_KEY_LEN, MAX_VALUE_LEN, MAX_TTL_LEN);
+ sprintf(tp,s,key, value, ttl, buf);
+ if (fwrite(tp, 1,strlen(tp), sim->stream) != strlen(tp)) {
+ lockf(sim->fd, F_ULOCK, 0 );
+ return ETCD_WTF;
+ }
+ fflush(sim->stream);
+ lockf(sim->fd, F_ULOCK, 0 );
+ return ETCD_OK;
+}
+
+
+
+etcd_session
+etcd_open_str (char *server_names)
+{
+ etcd_sim_t *sim;
+ char name[256];
+
+ sim = calloc(1, sizeof(etcd_sim_t));
+ sprintf(name, "/tmp/%s", server_names);
+ sim->fd = open(name, O_RDWR | O_CREAT);
+ if (sim->fd == -1)
+ return NULL;
+ sim->stream = fopen(name, "r+");
+ if (sim->stream == NULL)
+ return NULL;
+
+ return ((void *)sim);
+}
+
+
+void
+etcd_close_str (etcd_session this)
+{
+ etcd_close(this);
+}
diff --git a/xlators/cluster/nsr-server/src/leader.c b/xlators/cluster/nsr-server/src/leader.c
index bb0dbabe7..319f99317 100644
--- a/xlators/cluster/nsr-server/src/leader.c
+++ b/xlators/cluster/nsr-server/src/leader.c
@@ -23,11 +23,14 @@
#include "api/src/glfs.h"
#include "api/src/glfs-internal.h"
+#ifndef NSR_SIM_ETCD
#include "etcd-api.h"
+#endif
#include "nsr-internal.h"
#include "../../nsr-recon/src/recon_driver.h"
#include "../../nsr-recon/src/recon_xlator.h"
+
/* Vote format: UUID,vote_status,fitness,term_number */
#define VOTE_ELEMS 4 /* Whole match plus four actual pieces. */
#define DEFAULT_FITNESS 42
@@ -39,6 +42,10 @@ enum { NO_LEADER, TENTATIVE, CONFIRMED };
regex_t vote_re;
+// Simulation of etcd routines
+#ifndef NSR_SIM_ETCD
+#endif
+
long
nsr_get_fitness (xlator_t *this)
{
@@ -46,69 +53,14 @@ nsr_get_fitness (xlator_t *this)
return 42;
}
-long
-nsr_get_term (xlator_t *this)
-{
- nsr_private_t *priv = this->private;
- char *text = NULL;
- etcd_session etcd = priv->etcd;
-
- text = etcd_get(etcd, priv->term_uuid);
- // first time and hence no key at all.
- // this should ideally be done at vol creation time
- // by glusterd. Move it there later
- if(text == NULL) {
- gf_log (this->name, GF_LOG_TRACE, "nsr_get_term returns 1");
- return 0;
- } else {
- gf_log (this->name, GF_LOG_TRACE,
- "nsr_get_term returns %ld", strtol(text, NULL, 10));
- return (strtol(text, NULL, 10));
- }
-}
-
-
-// in etcd-api-master.
-// send a patch to this package to expose this
-extern size_t
-parse_get_response (void *ptr, size_t size, size_t nmemb, void *stream);
-typedef struct {
- etcd_server *servers;
-} _etcd_session;
-typedef size_t curl_callback_t (void *, size_t, size_t, void *);
-extern etcd_result etcd_get_one (_etcd_session *this, char *key, etcd_server *srv, char *prefix,
- char *post, curl_callback_t cb, char **stream);
-
-
-
-void
-nsr_leader_cb(glfs_fd_t *fd, ssize_t ret, void *data)
-{
- xlator_t *this = (xlator_t *) data;
- nsr_private_t *priv = this->private;
-
- gf_log (this->name, GF_LOG_INFO,
- "nsr_leader_cb arrived with return value %d", (int)ret);
-
- // TBD - error handling; look at ret
- atomic_fetch_and(&(priv->fence_io), 0);
-
- return;
-}
-
-void
-nsr_set_leader (xlator_t *this)
+static void
+nsr_set_leader (xlator_t *this, etcd_session etcd)
{
long term = 0;
- etcd_server *srv;
etcd_result res;
- char *value = NULL;
nsr_private_t *priv = this->private;
- _etcd_session *etcd = priv->etcd;
char *term_key = priv->term_uuid;
- char *master_key = priv->vol_uuid;
char n_t[sizeof(long)+1];
- nsr_recon_role_t role;
char *text = NULL;
gf_log (this->name, GF_LOG_INFO, "Just became leader");
@@ -134,45 +86,12 @@ nsr_set_leader (xlator_t *this)
priv->current_term = term + 1;
+ // Move this inside recon notify???
atomic_fetch_or(&(priv->fence_io), 1);
- role.num = 0;
- role.role = leader;
- // Get the rest of nodes for this term.
- // TBD: fix this so that it uses per-brick keys instead of violating
- // modularity and making bad assumptions about etcd behavior.
- for (srv = etcd->servers; srv->host; ++srv) {
- res = etcd_get_one(etcd,master_key,srv,"keys/",NULL,
- parse_get_response,&value);
- gf_log (this->name, GF_LOG_INFO,
- "Probing for %s, got %d, value:%s",
- srv->host, res, value);
- if ((res == ETCD_OK) && value) {
- gf_log (this->name, GF_LOG_INFO,
- "Found for %s", srv->host);
- strcpy(role.info[role.num].name, srv->host);
- (role.num)++;
- }
- value = NULL;
- }
- gf_log (this->name, GF_LOG_INFO,
- "Discovered %d nodes that has key %s", role.num, master_key);
-
- gf_log (this->name, GF_LOG_INFO,
- "setting current term as %ld", term + 1);
- role.current_term = term + 1;
- ENDIAN_CONVERSION_RR(role, _gf_false);
-
- // inform the reconciliator that this is leader
- // in the callback (once reconciliation is done),
- // we will unfence the IOs.
- // TBD - error handling later.
- glfs_lseek(priv->fd, nsr_recon_xlator_sector_1, SEEK_SET);
- gf_log (this->name, GF_LOG_INFO,
- "Writing to local node to set leader");
- glfs_write_async(priv->fd, &role,
- sizeof(role),nsr_recon_xlator_sector_1,
- nsr_leader_cb, this);
+ nsr_recon_notify_event_set_leader(priv);
+
+ return;
}
@@ -218,7 +137,7 @@ nsr_get_leader (xlator_t *this, etcd_session etcd, char *key)
gf_log (this->name, GF_LOG_TRACE,
"leader is %s\n",nominee);
if (strcmp(nominee,priv->brick_uuid) == 0) {
- nsr_set_leader(this);
+ nsr_set_leader(this, etcd);
retval = LS_SUCCESS;
}
else {
@@ -316,60 +235,10 @@ nsr_init_re (xlator_t *this)
}
-uint32_t
-nsr_leader_setup_recon (xlator_t *this)
-{
- nsr_private_t *priv = this->private;
- xlator_t *old = this;
- uint32_t ret = 0;
-
- if (priv->nsr_recon_start == _gf_false)
- return 0;
-
- priv->fs = glfs_new(priv->vol_uuid);
- if (!priv->fs) {
- ret = 1;
- gf_log (this->name, GF_LOG_ERROR, "failed to initialise glfs \n");
- goto done;
- }
-
- glusterfs_this_set(old);
- ret = glfs_set_volfile(priv->fs, priv->vol_file);
- if (ret != 0) {
- gf_log (this->name, GF_LOG_ERROR, "failed to set volfile \n");
- goto done;
- }
-
- glusterfs_this_set(old);
- /*
- * REVIEW
- * Logs belong in /var/log not /tmp.
- */
- glfs_set_logging (priv->fs,"/tmp/glfs-log", 7);
- if (glfs_init(priv->fs) < 0) {
- gf_log (this->name, GF_LOG_ERROR, "failed to init volfile \n");
- ret = 1;
- goto done;
- }
-
- glusterfs_this_set(old);
- priv->fd = glfs_open (priv->fs, "/", O_RDWR);
- if (priv->fd == NULL) {
- ret = 1;
- gf_log (this->name, GF_LOG_ERROR,
- "failed to open fd to communicate with recon process \n");
- goto done;
- }
-
-
-done:
- glusterfs_this_set(old);
- return ret;
-}
-
void *
-nsr_leader_thread (xlator_t *this)
+nsr_leader_thread (void *arg)
{
+ xlator_t *this = (xlator_t *) arg;
leader_retval_t retval;
nsr_private_t *priv = this->private;
@@ -378,14 +247,6 @@ nsr_leader_thread (xlator_t *this)
return NULL;
}
- if (nsr_leader_setup_recon(this)) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to do glfs initialisation inside leader thread");
- return NULL;
- }
-
- priv->leader_inited = 1;
-
gf_log (this->name, GF_LOG_INFO,
"calling glfs_opens_str on servers %s", priv->etcd_servers);
@@ -396,6 +257,8 @@ nsr_leader_thread (xlator_t *this)
return NULL;
}
+ priv->leader_inited = 1;
+
for (;;) {
if (nsr_get_leader(this,priv->etcd,priv->vol_uuid) == LS_ERROR) {
break;
diff --git a/xlators/cluster/nsr-server/src/nsr-internal.h b/xlators/cluster/nsr-server/src/nsr-internal.h
index 282247a47..4382f5426 100644
--- a/xlators/cluster/nsr-server/src/nsr-internal.h
+++ b/xlators/cluster/nsr-server/src/nsr-internal.h
@@ -22,6 +22,16 @@ enum {
gf_mt_nsr_end
};
+typedef enum nsr_recon_notify_ev_id_t {
+ NSR_RECON_SET_LEADER = 1,
+ NSR_RECON_ADD_CHILD = 2
+} nsr_recon_notify_ev_id_t;
+
+typedef struct _nsr_recon_notify_ev_s {
+ nsr_recon_notify_ev_id_t id;
+ uint32_t index; // in case of add
+ struct list_head list;
+} nsr_recon_notify_ev_t;
typedef struct {
char *etcd_servers;
@@ -29,21 +39,23 @@ typedef struct {
char *term_uuid;
char *brick_uuid;
gf_boolean_t leader;
+ uint8_t up_children;
uint8_t n_children;
char *vol_file;
- glfs_t *fs;
etcd_session etcd;
volatile unsigned int fence_io;
- glfs_fd_t *fd;
uint32_t current_term;
#ifdef NSR_DEBUG
uint32_t leader_log_fd;
#endif
+ volatile int recon_notify_inited;
volatile int leader_inited;
uint32_t kid_state;
gf_lock_t dirty_lock;
struct list_head dirty_fds;
gf_boolean_t nsr_recon_start;
+ void * recon_ctx;
+ volatile uint32_t ops_in_flight;
} nsr_private_t;
typedef struct {
@@ -79,3 +91,7 @@ typedef struct {
struct list_head pqueue;
} nsr_inode_ctx_t;
+void nsr_recon_notify_event_set_leader(nsr_private_t *priv);
+void nsr_recon_notify_event_add_child(nsr_private_t *priv, uint32_t index);
+void* nsr_recon_notify_thread (void *this);
+
diff --git a/xlators/cluster/nsr-server/src/nsr.c b/xlators/cluster/nsr-server/src/nsr.c
index 3707b3003..f85368456 100644
--- a/xlators/cluster/nsr-server/src/nsr.c
+++ b/xlators/cluster/nsr-server/src/nsr.c
@@ -258,7 +258,6 @@ nsr_getxattr_special (call_frame_t *frame, xlator_t *this, loc_t *loc,
const char *name, dict_t *xdata)
{
dict_t *result;
- uint8_t up;
nsr_private_t *priv = this->private;
if (!priv->leader) {
@@ -279,8 +278,8 @@ nsr_getxattr_special (call_frame_t *frame, xlator_t *this, loc_t *loc,
goto dn_failed;
}
- up = nsr_count_up_kids(this->private);
- if (dict_set_uint32(result,NSR_REP_COUNT_XATTR,up) != 0) {
+ priv->up_children = nsr_count_up_kids(this->private);
+ if (dict_set_uint32(result,NSR_REP_COUNT_XATTR,priv->up_children) != 0) {
goto dsu_failed;
}
@@ -399,6 +398,8 @@ nsr_reconfigure (xlator_t *this, dict_t *options)
nsr_private_t *priv = this->private;
GF_OPTION_RECONF ("leader", priv->leader, options, bool, err);
+ gf_log (this->name, GF_LOG_INFO,
+ "reconfigure called. setting priv->leader to %d\n", priv->leader);
return 0;
err:
@@ -440,20 +441,25 @@ nsr_notify (xlator_t *this, int event, void *data, ...)
index = nsr_get_child_index(this,data);
if (index >= 0) {
priv->kid_state |= (1 << index);
+ priv->up_children = nsr_count_up_kids(priv);
gf_log (this->name, GF_LOG_INFO,
"got CHILD_UP for %s, now %u kids",
((xlator_t *)data)->name,
- nsr_count_up_kids(priv));
+ priv->up_children);
+ if (priv->nsr_recon_start == _gf_true) {
+ nsr_recon_notify_event_add_child(priv, index);
+ }
}
break;
case GF_EVENT_CHILD_DOWN:
index = nsr_get_child_index(this,data);
if (index >= 0) {
priv->kid_state &= ~(1 << index);
+ priv->up_children = nsr_count_up_kids(priv);
gf_log (this->name, GF_LOG_INFO,
"got CHILD_DOWN for %s, now %u kids",
((xlator_t *)data)->name,
- nsr_count_up_kids(priv));
+ priv->up_children);
}
break;
default:
@@ -475,7 +481,7 @@ nsr_init (xlator_t *this)
xlator_list_t *trav;
pthread_t kid;
uuid_t tmp_uuid;
- char *my_name = NULL, *recon_file = NULL, *recon_pid_file = NULL, *ptr = NULL;
+ char *my_name = NULL, *morph_name = NULL, *recon_file = NULL, *recon_pid_file = NULL, *ptr = NULL;
char *volname;
extern xlator_t global_xlator;
glusterfs_ctx_t *oldctx = global_xlator.ctx;
@@ -552,34 +558,36 @@ nsr_init (xlator_t *this)
gf_log (this->name, GF_LOG_ERROR, "vol name not generated. ???");
goto err;
}
-
- recon_file = GF_CALLOC (1,PATH_MAX + strlen(my_name) + strlen("con") +1, gf_mt_nsr_private_t);
- recon_pid_file = GF_CALLOC (1,PATH_MAX + strlen(my_name) + strlen("recon") +1, gf_mt_nsr_private_t);
+
+ morph_name = GF_CALLOC (1, strlen(my_name) + 1, gf_mt_nsr_private_t);
+ strcpy(morph_name, my_name);
+ recon_file = GF_CALLOC (1,PATH_MAX + strlen(morph_name) + strlen("con") +1, gf_mt_nsr_private_t);
+ recon_pid_file = GF_CALLOC (1,PATH_MAX + strlen(morph_name) + strlen("recon") +1, gf_mt_nsr_private_t);
if ((!recon_file) || (!recon_pid_file)) {
gf_log (this->name, GF_LOG_ERROR, "could not allocate reconciliation file name");
goto err;
}
- ptr = strchr (my_name, '/');
+ ptr = strchr (morph_name, '/');
while (ptr) {
*ptr = '-';
- ptr = strchr (my_name, '/');
+ ptr = strchr (morph_name, '/');
}
sprintf(recon_file,"/%s/%s/%s/%s/",GLUSTERD_DEFAULT_WORKDIR,
GLUSTERD_VOLUME_DIR_PREFIX,
volname,
GLUSTERD_BRICK_INFO_DIR);
- strcat(recon_file, my_name);
+ strcat(recon_file, morph_name);
strcat(recon_file, "-nsr-recon.vol");
sprintf(recon_pid_file,"/%s/%s/%s/%s/",GLUSTERD_DEFAULT_WORKDIR,
GLUSTERD_VOLUME_DIR_PREFIX,
volname,
"run");
- strcat(recon_pid_file, my_name);
+ strcat(recon_pid_file, morph_name);
strcat(recon_pid_file, "-recon.pid");
- priv->vol_file = GF_CALLOC (1,PATH_MAX + strlen(my_name) + strlen("con") +1, gf_mt_nsr_private_t);
+ priv->vol_file = GF_CALLOC (1,PATH_MAX + strlen(morph_name) + strlen("con") +1, gf_mt_nsr_private_t);
if (!priv->vol_file) {
gf_log (this->name, GF_LOG_ERROR, "could not allocate reconciliation file name");
goto err;
@@ -590,7 +598,7 @@ nsr_init (xlator_t *this)
volname,
GLUSTERD_BRICK_INFO_DIR);
strcat(priv->vol_file, "con:");
- strcat(priv->vol_file, my_name);
+ strcat(priv->vol_file, morph_name);
if (pthread_create(&kid,NULL,nsr_flush_thread,this) != 0) {
gf_log (this->name, GF_LOG_ERROR,
@@ -622,10 +630,17 @@ nsr_init (xlator_t *this)
}
+ (void)pthread_create(&kid,NULL,nsr_recon_notify_thread,this);
+ while (priv->recon_notify_inited == 0) {
+ sleep(1);
+ }
+
(void)pthread_create(&kid,NULL,nsr_leader_thread,this);
while (priv->leader_inited == 0) {
sleep(1);
}
+
+
/*
* Calling glfs_new changes old->ctx, even if THIS still points
* to global_xlator. That causes problems later in the main
diff --git a/xlators/cluster/nsr-server/src/recon_notify.c b/xlators/cluster/nsr-server/src/recon_notify.c
new file mode 100644
index 000000000..9cf2fce5d
--- /dev/null
+++ b/xlators/cluster/nsr-server/src/recon_notify.c
@@ -0,0 +1,345 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <string.h>
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "call-stub.h"
+#include "defaults.h"
+#include "xlator.h"
+#include "api/src/glfs.h"
+#include "api/src/glfs-internal.h"
+#include "etcd-api.h"
+#include "nsr-internal.h"
+#include "../../nsr-recon/src/recon_driver.h"
+#include "../../nsr-recon/src/recon_xlator.h"
+
+
+
+typedef struct _nsr_recon_notify_ctx_t {
+ nsr_recon_notify_ev_t recon_head;
+ pthread_mutex_t recon_mutex;
+ pthread_cond_t recon_cv;
+ char **hosts; // list of hosts ordered depending on child indices
+ uint32_t current_term;
+ uint32_t last_reconciled_term;
+ glfs_t *fs;
+ glfs_fd_t *fd;
+} nsr_recon_notify_ctx_t;
+
+static int
+xlator_get_option (xlator_t *xl, char *key, char **value)
+{
+ GF_ASSERT (xl);
+ return dict_get_str (xl->options, key, value);
+}
+
+void nsr_recon_notify_event_set_leader(nsr_private_t *priv)
+{
+ nsr_recon_notify_ev_t *ev;
+ nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx;
+
+ ev = GF_CALLOC (1, sizeof (nsr_recon_notify_ev_t), 0);
+ ev->id = NSR_RECON_SET_LEADER;
+ INIT_LIST_HEAD(&(ev->list));
+ pthread_mutex_lock(&ctx->recon_mutex);
+ list_add_tail(&ev->list, &ctx->recon_head.list);
+ pthread_cond_signal(&ctx->recon_cv);
+ pthread_mutex_unlock(&ctx->recon_mutex);
+}
+
+void nsr_recon_notify_event_add_child(nsr_private_t *priv, uint32_t index)
+{
+ nsr_recon_notify_ev_t *ev;
+ nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx;
+
+ ev = GF_CALLOC (1, sizeof (nsr_recon_notify_ev_t), 0);
+ ev->id = NSR_RECON_ADD_CHILD;
+ ev->index = index;
+ INIT_LIST_HEAD(&(ev->list));
+ pthread_mutex_lock(&ctx->recon_mutex);
+ list_add_tail(&ev->list, &ctx->recon_head.list);
+ pthread_cond_signal(&ctx->recon_cv);
+ pthread_mutex_unlock(&ctx->recon_mutex);
+}
+
+
+static void
+nsr_recon_set_leader (xlator_t *this)
+{
+
+ nsr_private_t *priv = this->private;
+ nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx;
+ nsr_recon_role_t role;
+ xlator_t *old = this;
+ uint32_t i=0;
+
+ if (priv->leader != _gf_true)
+ return;
+
+ if (ctx->last_reconciled_term == priv->current_term)
+ return;
+
+ // No majority as of yet
+ if (priv->up_children <= (priv->n_children / 2))
+ return;
+
+ gf_log (this->name, GF_LOG_INFO,
+ "Sending message to do recon with %d nodes\n",
+ priv->up_children);
+
+ role.num = 0;
+ role.role = leader;
+ for (i = 0; i < priv->n_children; ++i) {
+ if (priv->kid_state & (1 << i)) {
+ gf_log (this->name, GF_LOG_INFO,
+ "Recon using host %s",
+ ctx->hosts[i]);
+ strcpy(role.info[role.num].name, ctx->hosts[i]);
+ (role.num)++;
+ }
+ }
+
+ gf_log (this->name, GF_LOG_INFO,
+ "setting current term as %d", priv->current_term);
+ role.current_term = priv->current_term;
+ ENDIAN_CONVERSION_RR(role, _gf_false);
+
+ // inform the reconciliator that this is leader
+ // in the callback (once reconciliation is done),
+ // we will unfence the IOs.
+ // TBD - error handling later.
+ glfs_lseek(ctx->fd, nsr_recon_xlator_sector_1, SEEK_SET);
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_INFO,
+ "Writing to local node to set leader");
+ glfs_write(ctx->fd, &role,
+ sizeof(role), 0);
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_INFO,
+ "glfs_write returned. unfencing IO\n");
+
+ // TBD - error handling
+
+ ctx->last_reconciled_term = priv->current_term;
+ atomic_fetch_and(&(priv->fence_io), 0);
+
+ return;
+}
+
+static void
+nsr_recon_add_child (xlator_t *this, uint32_t index)
+{
+ nsr_private_t *priv = this->private;
+ nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx;
+ nsr_recon_role_t role;
+ xlator_t *old = this;
+
+ if (priv->leader != _gf_true)
+ return;
+
+ // reconciliation still pending.
+ // Check if we have majority
+ if (ctx->last_reconciled_term != priv->current_term) {
+ nsr_recon_set_leader(this);
+ } else {
+ // Reconciliation done.
+ // new child joining the majority/
+ // Do reconciliation only fot this child but after fencing new IO and draining old IO
+ role.num = 1;
+ role.role = joiner;
+
+ atomic_fetch_or(&(priv->fence_io), 1);
+ while(priv->ops_in_flight) {
+ sleep(1);
+ }
+
+ strcpy(role.info[0].name, ctx->hosts[index]);
+ role.current_term = priv->current_term;
+ ENDIAN_CONVERSION_RR(role, _gf_false);
+ glfs_lseek(ctx->fd, nsr_recon_xlator_sector_1, SEEK_SET);
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_INFO,
+ "Writing to local node to join %s\n", role.info[0].name);
+ glfs_write(ctx->fd, &role,
+ sizeof(role), 0);
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_INFO,
+ "Write to local node to set joiner returned\n");
+
+ // TBD - error handling
+ atomic_fetch_and(&(priv->fence_io), 0);
+ }
+
+ return;
+}
+
+static uint32_t
+nsr_setup_recon (xlator_t *this)
+{
+ nsr_private_t *priv = this->private;
+ xlator_t *old = this;
+ uint32_t ret = 0;
+ nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx;
+
+ if (priv->nsr_recon_start == _gf_false)
+ return 0;
+
+ ctx->fs = glfs_new(priv->vol_uuid);
+ if (!ctx->fs) {
+ ret = 1;
+ gf_log (this->name, GF_LOG_ERROR, "failed to initialise glfs \n");
+ goto done;
+ }
+
+ glusterfs_this_set(old);
+ ret = glfs_set_volfile(ctx->fs, priv->vol_file);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to set volfile \n");
+ goto done;
+ }
+
+ glusterfs_this_set(old);
+ /*
+ * REVIEW
+ * Logs belong in /var/log not /tmp.
+ */
+ glfs_set_logging (ctx->fs,"/tmp/glfs-log", 7);
+ if (glfs_init(ctx->fs) < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to init volfile \n");
+ ret = 1;
+ goto done;
+ }
+
+ glusterfs_this_set(old);
+ ctx->fd = glfs_open (ctx->fs, "/", O_RDWR);
+ if (ctx->fd == NULL) {
+ ret = 1;
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to open fd to communicate with recon process \n");
+ goto done;
+ }
+
+
+done:
+ glusterfs_this_set(old);
+ return ret;
+}
+
+
+static void
+nsr_setup_hosts(xlator_t *this)
+{
+ xlator_list_t *trav;
+ nsr_private_t *priv = this->private;
+ uint32_t i = 0;
+ nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx;
+
+ ctx->hosts = GF_CALLOC(sizeof(char *), priv->n_children, gf_mt_nsr_private_t);
+ // Iterate thru all the children
+ for (trav = this->children; trav; trav = trav->next) {
+ char *hostname = NULL, *vol = NULL;
+ int ret1 = 0, ret2 = 0, ret = 0;
+ xlator_t *xl = trav->xlator;
+ // If the child type is that of protocol/client
+ if (!strcmp(trav->xlator->type, "protocol/client")) {
+ ret1 = xlator_get_option (xl, "remote-host", &hostname);
+ ret2 = xlator_get_option (xl, "remote-subvolume", &vol);
+ if (!ret1 && !ret2) {
+ // add the name of that host to the hosts
+ ctx->hosts[i] = GF_CALLOC(sizeof(char), strlen(hostname) + strlen(vol) + 2, 0);
+ strcpy(ctx->hosts[i], hostname);
+ strcat(ctx->hosts[i], ":");
+ strcat(ctx->hosts[i], vol);
+ gf_log (this->name, GF_LOG_INFO,
+ "adding hosts %s to recon notfiy list", ctx->hosts[i]);
+ } else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "CANNOT FIND HOSTNAME FOR A CHILD");
+ GF_ASSERT(0);
+ }
+ // local brick
+ } else {
+ ret = xlator_get_option (this, "my-name", &hostname);
+ if (!ret) {
+ uint32_t len = strlen(hostname);
+ ctx->hosts[i] = GF_CALLOC(sizeof(char),
+ len+1,
+ gf_mt_nsr_private_t);
+ strcpy(ctx->hosts[i], hostname);
+ gf_log (this->name, GF_LOG_INFO,
+ "adding my host %s to recon notfiy list", ctx->hosts[i]);
+ } else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "CANNOT FIND MY HOSTNAME");
+ GF_ASSERT(0);
+ }
+ }
+ i++;
+ }
+}
+
+void *
+nsr_recon_notify_thread (void *arg)
+{
+ xlator_t *this = (xlator_t *)arg;
+ nsr_private_t *priv = this->private;
+ nsr_recon_notify_ev_t *ev;
+ nsr_recon_notify_ctx_t *ctx;
+
+ priv->recon_ctx = GF_CALLOC(1, sizeof(nsr_recon_notify_ctx_t), gf_mt_nsr_private_t);
+ if (!priv->recon_ctx) {
+ gf_log (this->name, GF_LOG_ERROR, "calloc error");
+ return NULL;
+ }
+ ctx = priv->recon_ctx;
+
+ pthread_mutex_init(&(ctx->recon_mutex), NULL);
+ pthread_cond_init(&(ctx->recon_cv), NULL);
+ INIT_LIST_HEAD(&(ctx->recon_head.list));
+
+ nsr_setup_hosts(this);
+
+ if (nsr_setup_recon(this)) {
+ gf_log (this->name, GF_LOG_ERROR, "recon notify thread : initing glfs error");
+ return NULL;
+ }
+
+ priv->recon_notify_inited = 1;
+
+ while(1) {
+ pthread_mutex_lock(&ctx->recon_mutex);
+ while (list_empty(&(ctx->recon_head.list))) {
+ pthread_cond_wait(&ctx->recon_cv, &ctx->recon_mutex);
+ }
+ pthread_mutex_unlock(&ctx->recon_mutex);
+
+ list_for_each_entry(ev, &(ctx->recon_head.list), list) {
+
+ if (ev->id == NSR_RECON_SET_LEADER) {
+ gf_log (this->name, GF_LOG_INFO,
+ "got add leader notfiy event");
+ nsr_recon_set_leader(this);
+ } else if (ev->id == NSR_RECON_ADD_CHILD) {
+ gf_log (this->name, GF_LOG_INFO,
+ "got add child notify event");
+ nsr_recon_add_child(this, ev->index);
+ }
+ }
+ list_del_init (&ev->list);
+ }
+
+ return NULL;
+}
+
diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.c b/xlators/mgmt/glusterd/src/glusterd-volgen.c
index 59bc7bcd5..fe33c8d7d 100644
--- a/xlators/mgmt/glusterd/src/glusterd-volgen.c
+++ b/xlators/mgmt/glusterd/src/glusterd-volgen.c
@@ -1468,6 +1468,11 @@ add_nsr_stuff (volgen_graph_t *graph, char *volname,
char c_d[NSR_MAX_PATH_SIZE];
char *username = NULL, *password = NULL;
gf_boolean_t enable_recon = _gf_false;
+#ifdef HAVE_ETCD
+ uint32_t nsr_port = 27000;
+#else
+ static uint32_t nsr_port = 27000;
+#endif
if (glusterd_volinfo_get_boolean(volinfo,"cluster.nsr.recon") > 0) {
@@ -1623,7 +1628,7 @@ add_nsr_stuff (volgen_graph_t *graph, char *volname,
get_vol_transport_type (volinfo, transt);
if(xlator_set_option (xl, "transport-type", transt) == -1)
return -1;
- sprintf(s,"%d",27000);
+ sprintf(s,"%d",nsr_port);
if(xlator_set_option (xl, "transport.socket.listen-port", s) == -1)
return -1;
strcpy(auth, "auth.addr.");
@@ -1655,7 +1660,7 @@ add_nsr_stuff (volgen_graph_t *graph, char *volname,
return -1;
if(xlator_set_option (kid, "transport-type", transt) == -1)
return -1;
- sprintf(s,"%d",27000);
+ sprintf(s,"%d",nsr_port++);
if(xlator_set_option (kid, "remote-port", s) == -1)
return -1;
snprintf (c_d, PATH_MAX,