diff options
author | Raghavan P <rpichai@redhat.com> | 2014-01-03 16:09:04 +0530 |
---|---|---|
committer | Raghavan P <rpichai@redhat.com> | 2014-01-08 14:48:21 +0530 |
commit | e0cce4cf7c22d5cd8ab6c2aff4ecf28c18c6a469 (patch) | |
tree | 5e30d20eaf43c77f77d5aa9d4351492af659b39f | |
parent | 82ce8acfdfb141c6b34b6b6b43ef78eee891f9e8 (diff) |
Changes to NSR reconciliation code.
Following is list of changes:
1) Simulation of etcd using a file as a registry protected using locks.
2) Implement notifications for child up and child down.
3) Join a new brick into quorum.
4) add support for proper fencing and draining of IO required for reconciliaiton
5) misc changes and addressed review comments.
Change-Id: Iddd1137ad6205252ed03301888bb1e83fa2221e0
Signed-off-by: Raghavan P <rpichai@redhat.com>
-rw-r--r-- | configure.ac | 13 | ||||
-rw-r--r-- | xlators/cluster/nsr-recon/src/recon_driver.c | 172 | ||||
-rw-r--r-- | xlators/cluster/nsr-recon/src/recon_driver.h | 46 | ||||
-rw-r--r-- | xlators/cluster/nsr-recon/src/recon_xlator.c | 13 | ||||
-rw-r--r-- | xlators/cluster/nsr-recon/src/recon_xlator.h | 16 | ||||
-rw-r--r-- | xlators/cluster/nsr-server/src/Makefile.am | 8 | ||||
-rw-r--r-- | xlators/cluster/nsr-server/src/all-templates.c | 8 | ||||
-rw-r--r-- | xlators/cluster/nsr-server/src/etcd-sim.c | 222 | ||||
-rw-r--r-- | xlators/cluster/nsr-server/src/leader.c | 173 | ||||
-rw-r--r-- | xlators/cluster/nsr-server/src/nsr-internal.h | 20 | ||||
-rw-r--r-- | xlators/cluster/nsr-server/src/nsr.c | 45 | ||||
-rw-r--r-- | xlators/cluster/nsr-server/src/recon_notify.c | 345 | ||||
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-volgen.c | 9 |
13 files changed, 849 insertions, 241 deletions
diff --git a/configure.ac b/configure.ac index 581b976a0..89aba7781 100644 --- a/configure.ac +++ b/configure.ac @@ -774,6 +774,19 @@ fi AM_CONDITIONAL([ENABLE_SYSLOG], [test x$USE_SYSLOG = xyes]) #end syslog section + +#etcd section +AC_CHECK_PROG(ETCD,etcd,yes) + +ETCD_SIM=yes +if test "x${ETCD}" = "xyes"; then + ETCD_SIM=no + AC_DEFINE(HAVE_ETCD, 1, [define if found etcd]) +fi +AM_CONDITIONAL([ENABLE_ETCD_SIM], [test x$ETCD_SIM = xyes]) +#end etcd section + + BUILD_READLINE=no AC_CHECK_LIB([readline -lcurses],[readline],[RLLIBS="-lreadline -lcurses"]) AC_CHECK_LIB([readline -ltermcap],[readline],[RLLIBS="-lreadline -ltermcap"]) diff --git a/xlators/cluster/nsr-recon/src/recon_driver.c b/xlators/cluster/nsr-recon/src/recon_driver.c index 1328d52dc..2e2299ad1 100644 --- a/xlators/cluster/nsr-recon/src/recon_driver.c +++ b/xlators/cluster/nsr-recon/src/recon_driver.c @@ -495,7 +495,7 @@ control_worker_func_0(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_INFO, "out of get term info for term %d. got ops %d with first %d and last %d \n", recon_info->last_term, recon_info->commited_ops, - recon_info->last_index, recon_info->first_index); + recon_info->first_index, recon_info->last_index); break; } @@ -538,12 +538,14 @@ control_worker_func_0(nsr_per_node_worker_t *ctx, "trying to get reconciliation window records for node %d for term %d with first %d last %d\n", index, recon_info->last_term, recon_info->first_index, recon_info->last_index); - GF_ASSERT(num <= MAX_RECONCILIATION_WINDOW_SIZE); // TBD - handle buffer allocation errors rd = GF_CALLOC(num, sizeof(nsr_recon_record_details_t), gf_mt_recon_private_t); + recon_info->records = GF_CALLOC(num, + sizeof(nsr_reconciliation_record_t), + gf_mt_recon_private_t); // TBD - handle errors nsr_recon_libchangelog_get_records(this, priv->changelog_base_path, @@ -684,7 +686,7 @@ control_worker_func(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_INFO, "out of get last term info with current term %d. got ops %d with first %d and last %d \n", recon_info->last_term, recon_info->commited_ops, - recon_info->last_index, recon_info->first_index); + recon_info->first_index, recon_info->last_index); break; } @@ -711,7 +713,7 @@ control_worker_func(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_INFO, "out of get term info for term %d. got ops %d with first %d and last %d \n", recon_info->last_term, recon_info->commited_ops, - recon_info->last_index, recon_info->first_index); + recon_info->first_index, recon_info->last_index); break; } @@ -848,7 +850,6 @@ control_worker_func(nsr_per_node_worker_t *ctx, "trying to get reconciliation window records for node %d for term %d with first %d last %d\n", index, recon_info->last_term, recon_info->first_index, recon_info->last_index); - GF_ASSERT(num <= MAX_RECONCILIATION_WINDOW_SIZE); // TBD - error handling for all the glfs APIs glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET); @@ -864,6 +865,10 @@ control_worker_func(nsr_per_node_worker_t *ctx, rd = GF_CALLOC(num, sizeof(nsr_recon_record_details_t), gf_mt_recon_private_t); + recon_info->records = GF_CALLOC(num, + sizeof(nsr_reconciliation_record_t), + gf_mt_recon_private_t); + glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0); for (i=0; i < num; i++) { ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl @@ -875,7 +880,7 @@ control_worker_func(nsr_per_node_worker_t *ctx, recon_info->records[i].rec.type, i + recon_info->first_index); } - free(rd); + GF_FREE(rd); nsr_worker_log(this->name, GF_LOG_INFO, "got reconciliation window records for node %d for term %d \n", @@ -1003,6 +1008,28 @@ nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx, * Write the role and associated information to the node. * This gets called from recon xlator indicating node is either * leader, reconciliator or should do resolution. + */ +gf_boolean_t +nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, + nsr_recon_role_t *rr, + uint32_t txn_id) +{ + nsr_role_work_t *rw; + + nsr_driver_log(this->name, GF_LOG_INFO, "set role called \n"); + rw = GF_CALLOC(1, sizeof (nsr_role_work_t), 0); + memcpy(&rw->role, rr, sizeof(nsr_recon_role_t)); + rw->txn_id = txn_id; + INIT_LIST_HEAD(&(rw->list)); + pthread_mutex_lock(&(ctx->mutex)); + list_add_tail(&rw->list, &ctx->role_head.list); + pthread_cond_signal(&(ctx->cv)); + pthread_mutex_unlock(&(ctx->mutex)); + nsr_driver_log(this->name, GF_LOG_INFO, "set role returns \n"); + return _gf_true; +} + +/* * First we undo the last role to make sure we clean up. * * Input arguments @@ -1023,19 +1050,18 @@ nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx, * needs to be returned back after the actual reconciliation is done. * For that we store the frame against this id which acts as a key. */ -gf_boolean_t -nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, - nsr_recon_role_t *rr, - uint32_t txn_id) +nsr_recon_driver_state_t +nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, + nsr_role_work_t *rw) { uint8_t i=0, j=0; - pthread_mutex_lock(&(ctx->mutex)); - ctx->state = rr->role; + nsr_recon_role_t *rr = &(rw->role); + // First make all the threads uninitialise for (i = 0; i < ctx->replica_group_size; i++) { nsr_recon_in_use(ctx, i, _gf_false); } - if (rr->role == leader) { + if ((rr->role == leader) || (rr->role == joiner)) { // First set info this node nsr_recon_in_use(ctx, 0, _gf_true); @@ -1051,10 +1077,11 @@ nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, for (i=1; i < ctx->replica_group_size; i++) { for (j=0 ; j < rr->num; j++) { // TBD - make this strcmp later when etcd servers set properly - //if (!strcmp(ctx->workers[i].name, rr->info[j].name)) { - if (strstr(ctx->workers[i].name, rr->info[j].name)) { + if (!strcmp(ctx->workers[i].name, rr->info[j].name)) { + //if (strstr(ctx->workers[i].name, rr->info[j].name)) { nsr_driver_log(this->name, GF_LOG_INFO, - "nsr_recon_driver_set_role: this as leader. found other server %s\n", + "nsr_recon_driver_set_role: this as %s. found other server %s\n", + (rr->role == leader) ? "leader" : "joiner", ctx->workers[i].name); nsr_recon_in_use(ctx, i, _gf_true); @@ -1071,14 +1098,19 @@ nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, } } } - ctx->reconciliator_index = -1; + // If leader, reconciliator has to be chosen. + // If joiner, we are the reconciliator. + if (rr->role == leader) + ctx->reconciliator_index = -1; + else + ctx->reconciliator_index = 0; } else if (rr->role == reconciliator) { ctx->reconciliator_index = 0; // Copy information about all the other members which had the same term for (i=0; i < rr->num; i++) { for (j=0; j < ctx->replica_group_size; j++) { - //if (!strcmp(rr->info[i].name, ctx->workers[j].name)) { - if (strstr(ctx->workers[j].name, rr->info[i].name)) { + if (!strcmp(rr->info[i].name, ctx->workers[j].name)) { + //if (strstr(ctx->workers[j].name, rr->info[i].name)) { nsr_driver_log(this->name, GF_LOG_INFO, "nsr_recon_driver_set_role: this as reconciliator. found other server %s\n", ctx->workers[j].name); @@ -1104,8 +1136,8 @@ nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, } else if (rr->role == resolutor) { for (j=0; j < ctx->replica_group_size; j++) { // info[1] has the information regarding the reconciliator - if (strstr(ctx->workers[j].name, rr->info[1].name)) { - //if (!strcmp(rr->info[1].name, ctx->workers[j].name)) { + //if (strstr(ctx->workers[j].name, rr->info[1].name)) { + if (!strcmp(rr->info[1].name, ctx->workers[j].name)) { nsr_driver_log(this->name, GF_LOG_INFO, "nsr_recon_driver_set_role: this as resolutor. found other server %s as reconciliator\n", ctx->workers[1].name); @@ -1143,11 +1175,9 @@ nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, nsr_recon_in_use(ctx, 0, _gf_true); } - ctx->txn_id = txn_id; - // Signal the main driver thread - pthread_cond_signal(&(ctx->cv)); - pthread_mutex_unlock(&(ctx->mutex)); - return _gf_true; + ctx->txn_id = rw->txn_id; + + return rr->role; } @@ -1171,6 +1201,15 @@ compute_resolution_work(nsr_recon_driver_ctx_t *ctx, uint32_t i=0; uint32_t num = (my_info->last_index - my_info->first_index + 1); + if (invalidate) { + if (my_info->records) { + GF_FREE(my_info->records); + my_info->records = GF_CALLOC(num, + sizeof(nsr_reconciliation_record_t), + gf_mt_recon_private_t); + } + } + for (i=0; i < num; i++) { nsr_log_type_t orig, new; nsr_recon_work_type_t tw = NSR_RECON_WORK_NONE; @@ -1841,7 +1880,6 @@ read_out: } apply_record(ctx, ri, dict); commit_out: - dict_unref (dict); nsr_worker_log(this->name, GF_LOG_INFO, "finished recon commit for gfid %s \n", rd->gfid); @@ -2107,6 +2145,10 @@ nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx, send_and_wait(bm, ctx->replica_group_size, ctx, (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI, NSR_RECON_QUEUE_TO_DATA, -1); + if (in_use == _gf_false) { + //GF_FREE(ctx->workers[i].recon_info->records); + GF_FREE(ctx->workers[i].recon_info); + } } #endif } @@ -2139,6 +2181,7 @@ nsr_reconciliation_driver(void *arg) nsr_driver_log (this->name, GF_LOG_ERROR, "mutex init error \n"); return NULL; } + INIT_LIST_HEAD(&(ctx->role_head.list)); ctx->workers = GF_CALLOC (replica_group_size, sizeof(nsr_replica_worker_t), @@ -2188,24 +2231,20 @@ nsr_reconciliation_driver(void *arg) while (1) { - nsr_driver_log (this->name, GF_LOG_INFO, "waiting for state change \n"); - pthread_mutex_lock(&(ctx->mutex)); - while ((*driver_ctx)->state == 0) { - pthread_cond_wait(&(ctx->cv), &(ctx->mutex)); - } - pthread_mutex_unlock(&(ctx->mutex)); + nsr_role_work_t *rr; - nsr_driver_log (this->name, GF_LOG_INFO, " state changed to %d \n", ctx->state); -#if 0 - for (i=0; i < replica_group_size; i++) { - if (ctx->workers[i].in_use) { - nsr_recon_start_work(ctx->workers[i].control_worker, _gf_true); - nsr_recon_start_work(ctx->workers[i].data_worker, _gf_false); - } - } -#endif + nsr_driver_log (this->name, GF_LOG_INFO, "waiting for role to be queued \n"); + pthread_mutex_lock(&(ctx->mutex)); + while (list_empty(&(ctx->role_head.list))) { + pthread_cond_wait(&(ctx->cv), &(ctx->mutex)); + } + pthread_mutex_unlock(&(ctx->mutex)); + + list_for_each_entry(rr, &(ctx->role_head.list), list) { + nsr_recon_driver_state_t state; + state = nsr_recon_driver_get_role(ctx, rr); - if (ctx->state == leader) { + if (state == leader) { int32_t chosen = -1; int32_t last_term = -1, last_ops = -1; @@ -2278,13 +2317,13 @@ nsr_reconciliation_driver(void *arg) * file almost unreadable. */ if (!setjmp(*(ctx->env))) { - ctx->state = reconciliator; + state = reconciliator; goto i_am_reconciliator; } else { nsr_driver_log (this->name, GF_LOG_INFO, "long jmp return to leader\n"); free(ctx->env); ctx->env = NULL; - ctx->state = leader; + state = leader; } } @@ -2294,13 +2333,13 @@ nsr_reconciliation_driver(void *arg) nsr_driver_log (this->name, GF_LOG_INFO, "local node resolution needs to be done. before set jmp\n"); ctx->env = calloc(1,sizeof(jmp_buf)); if (!setjmp(*(ctx->env))) { - ctx->state = resolutor; + state = resolutor; goto i_am_resolutor; } else { nsr_driver_log (this->name, GF_LOG_INFO, "long jmp return to leader\n"); free(ctx->env); ctx->env = NULL; - ctx->state = leader; + state = leader; } } @@ -2316,7 +2355,7 @@ nsr_reconciliation_driver(void *arg) } i_am_reconciliator: - if (ctx->state == reconciliator) { + if (state == reconciliator) { gf_boolean_t do_recon = _gf_false; uint32_t start_index = ctx->workers[0].recon_info->first_index; uint32_t end_index = ctx->workers[0].recon_info->last_index; @@ -2331,7 +2370,7 @@ i_am_reconciliator: (ctx->workers[0].recon_info->last_term == ctx->workers[i].recon_info->last_term)) { ctx->workers[i].recon_info->last_index = end_index; ctx->workers[i].recon_info->first_index = start_index; - bm = (1 << i); + bm |= (1 << i); do_recon = _gf_true; } } @@ -2445,7 +2484,7 @@ i_am_reconciliator: } i_am_resolutor: - if (ctx->state == resolutor) { + if (state == resolutor) { // This node's last term is filled when it gets a message // from the leader to act as a reconciliator. @@ -2600,6 +2639,36 @@ i_am_resolutor: } + if (state == joiner) { + + int32_t chosen = -1; + int32_t last_term = -1, last_ops = -1; + + nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n"); + // Get last term info from all members for this group + // which will be the leader(this node) and the node that wants to join. + send_and_wait(-1, + replica_group_size, + ctx, + NSR_WORK_ID_GET_LAST_TERM_INFO, + NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term); + + + // send message to other node that just joined to sync up with this node which is also the leader + nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this\n"); + bm = ~(1); + send_and_wait(bm, + replica_group_size, + ctx, + NSR_WORK_ID_RESOLUTION_DO_WORK, + NSR_RECON_QUEUE_TO_CONTROL, -1); + + nsr_driver_log (this->name, GF_LOG_INFO, + "finished recon work as joiner \n"); + + } + + // free the asasociated recon_info contexts created as part of this role out: @@ -2617,7 +2686,8 @@ out: #endif nsr_driver_log (this->name, GF_LOG_INFO, "finished sending end of reconciliation message \n"); - ctx->state = 0; + } + list_del_init (&rr->list); } return NULL; diff --git a/xlators/cluster/nsr-recon/src/recon_driver.h b/xlators/cluster/nsr-recon/src/recon_driver.h index 67f4d6014..a9a9a9182 100644 --- a/xlators/cluster/nsr-recon/src/recon_driver.h +++ b/xlators/cluster/nsr-recon/src/recon_driver.h @@ -78,6 +78,7 @@ typedef enum nsr_recon_driver_state_t { leader = 1, reconciliator = 2, resolutor = 3, + joiner = 4, } nsr_recon_driver_state_t; // role structure @@ -177,6 +178,12 @@ typedef struct nsr_recon_record_details_s { rd.len = f(rd.len); \ } +typedef struct _nsr_role_work_s { + nsr_recon_role_t role; + uint32_t txn_id; + struct list_head list; +} nsr_role_work_t; + typedef struct _nsr_recon_work_s { gf_boolean_t in_use; uint32_t index; @@ -206,7 +213,8 @@ typedef struct _nsr_reconciliator_info { int32_t commited_ops; uint32_t last_index; uint32_t first_index; - nsr_reconciliation_record_t records[MAX_RECONCILIATION_WINDOW_SIZE]; + //nsr_reconciliation_record_t records[MAX_RECONCILIATION_WINDOW_SIZE]; + nsr_reconciliation_record_t *records; } nsr_reconciliator_info_t; typedef struct _nsr_per_node_worker_s { @@ -221,7 +229,7 @@ typedef struct _nsr_per_node_worker_s { char local; // local data worker //struct list_head list; //list of work items nsr_recon_work_t head; - pthread_mutex_t mutex; //mutex to gaurd the above list + pthread_mutex_t mutex; //mutex to guard the state pthread_cond_t cv; //condition variable for signaling the worker thread gf_boolean_t is_control; #ifdef NSR_DEBUG @@ -242,9 +250,9 @@ typedef struct _nsr_recon_driver_ctxt { uint32_t replica_group_size; // number of static members of replica group nsr_replica_worker_t *workers; // worker info int32_t reconciliator; - pthread_mutex_t mutex; //mutex to gaurd the state - pthread_cond_t cv; //condition variable for signaling the driver thread - uint32_t state; //driver state + pthread_mutex_t mutex; + pthread_cond_t cv; + nsr_role_work_t role_head; volatile int32_t outstanding; uint32_t reconciliator_index; uint32_t txn_id; @@ -278,8 +286,20 @@ nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, nsr_recon_role_t *rr, uin { \ char c[255]; \ if (!ctx->driver_log_fd) { \ + char str[255], b[255]; \ + char *ptr; \ + nsr_recon_private_t *priv = ctx->this->private; \ + strcpy(b, priv->replica_group_members[0]); \ + ptr = strchr (b, '/'); \ + while (ptr) { \ + *ptr = '-'; \ + ptr = strchr (b, '/'); \ + } \ + sprintf(str,"/tmp/nsr-logs/%s",b); \ mkdir("/tmp/nsr-logs/", 0777); \ - ctx->driver_log_fd = open("/tmp/nsr-logs/nsr-driver-log", O_RDWR|O_CREAT|O_TRUNC); \ + mkdir(str, 0777); \ + sprintf(str,"/tmp/nsr-logs/%s/nsr-driver-log",b); \ + ctx->driver_log_fd = open(str, O_RDWR|O_CREAT|O_TRUNC); \ } \ sprintf(c, fmt); \ write(ctx->driver_log_fd, c, strlen(c)); \ @@ -293,9 +313,19 @@ nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, nsr_recon_role_t *rr, uin { \ char c[255]; \ if (!ctx->worker_log_fd) { \ - char str[255]; \ - sprintf(str,"/tmp/nsr-logs/%s-%d",ctx->is_control? "con" : "data",ctx->index); \ + char str[255], b[255]; \ + char *ptr; \ + nsr_recon_private_t *priv = ctx->driver_ctx->this->private; \ + strcpy(b, priv->replica_group_members[0]); \ + ptr = strchr (b, '/'); \ + while (ptr) { \ + *ptr = '-'; \ + ptr = strchr (b, '/'); \ + } \ + sprintf(str,"/tmp/nsr-logs/%s",b); \ mkdir("/tmp/nsr-logs/", 0777); \ + mkdir(str, 0777); \ + sprintf(str,"/tmp/nsr-logs/%s/%s-%d",b,ctx->is_control?"con":"data",ctx->index); \ ctx->worker_log_fd = open(str, O_RDWR|O_CREAT|O_TRUNC); \ } \ sprintf(c, fmt); \ diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.c b/xlators/cluster/nsr-recon/src/recon_xlator.c index 62583d526..5f63f6671 100644 --- a/xlators/cluster/nsr-recon/src/recon_xlator.c +++ b/xlators/cluster/nsr-recon/src/recon_xlator.c @@ -196,8 +196,7 @@ void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, // do a mmap; seek into the first and read all records till last. // TBD - right now all records are pseudo holes but mark them as fills. // TBD - pseudo hole to be implemented when actual fsync gets done on data. - char read_buf[((last - first) + 1) * 128]; - char *rb = &(read_buf[0]); + char *rb = NULL, *orig = NULL; char path[PATH_MAX]; int fd; uint32_t index = 0; @@ -206,11 +205,14 @@ void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, "libchangelog_get_records called for term %d index from %d to %d \n", term, first, last ); + orig = rb = GF_CALLOC(128, ((last - first) + 1), gf_mt_recon_private_t); + sprintf(path,"%s/%s%d",bp,"TERM.",term); fd = open(path, O_RDONLY); if (fd != -1) { char *start = NULL; nsr_recon_record_details_t * rec = (nsr_recon_record_details_t *)buf; + if (first == 0) lseek(fd, 128, SEEK_SET); else @@ -407,6 +409,7 @@ finish: rec++; } while(1); } + GF_FREE(orig); close(fd); recon_main_log (this->name, GF_LOG_INFO, @@ -484,7 +487,8 @@ nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called to set role %d\n", rr.role); if ((rr.role != leader) && (rr.role != reconciliator) && - (rr.role != resolutor)) { + (rr.role != resolutor) && + (rr.role != joiner)) { recon_main_log (this->name, GF_LOG_ERROR, "EIII---nsr_recon_writev cannot set state \n"); STACK_UNWIND_STRICT (writev, frame, -1, op_errno, @@ -577,7 +581,7 @@ nsr_recon_readv (call_frame_t *frame, xlator_t *this, int32_t ret = -1; nsr_recon_private_t *priv = NULL; - iobuf = iobuf_get2 (this->ctx->iobuf_pool, op_ret); + iobuf = iobuf_get2 (this->ctx->iobuf_pool, size); if (!iobuf) { op_errno = ENOMEM; goto out; @@ -623,6 +627,7 @@ nsr_recon_readv (call_frame_t *frame, xlator_t *this, (num * sizeof(nsr_recon_record_details_t)), size); GF_ASSERT(size == (num * sizeof(nsr_recon_record_details_t))); + bzero(iobuf->ptr, size); recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_readv - getting records for term=%d from %d to %d\n", rfd->term, rfd->first_index, rfd->last_index); diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.h b/xlators/cluster/nsr-recon/src/recon_xlator.h index c0f1e2145..168db518b 100644 --- a/xlators/cluster/nsr-recon/src/recon_xlator.h +++ b/xlators/cluster/nsr-recon/src/recon_xlator.h @@ -55,11 +55,22 @@ typedef struct _nsr_recon_private_s { #ifdef NSR_DEBUG #define recon_main_log(dom, levl, fmt...) \ { \ - nsr_recon_private_t *priv = this->private; \ char c[255]; \ + nsr_recon_private_t *priv = this->private; \ if (!priv->recon_main_log_fd) { \ + char str[255], b[255]; \ + char *ptr; \ + strcpy(b, priv->replica_group_members[0]); \ + ptr = strchr (b, '/'); \ + while (ptr) { \ + *ptr = '-'; \ + ptr = strchr (b, '/'); \ + } \ + sprintf(str,"/tmp/nsr-logs/%s",b); \ mkdir("/tmp/nsr-logs/", 0777); \ - priv->recon_main_log_fd = open("/tmp/nsr-logs/recon-main-log", O_RDWR|O_CREAT|O_TRUNC); \ + mkdir(str, 0777); \ + sprintf(str,"/tmp/nsr-logs/%s/recon-main-log",b); \ + priv->recon_main_log_fd = open(str, O_RDWR|O_CREAT|O_TRUNC); \ } \ sprintf(c, fmt); \ write(priv->recon_main_log_fd, c, strlen(c)); \ @@ -68,7 +79,6 @@ typedef struct _nsr_recon_private_s { #define recon_main_log(dom, levl, fmt...) gf_log(dom, levl, fmt) #endif - void nsr_recon_libchangelog_get_this_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt); void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt); void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term_id); diff --git a/xlators/cluster/nsr-server/src/Makefile.am b/xlators/cluster/nsr-server/src/Makefile.am index df0d68539..85a560d09 100644 --- a/xlators/cluster/nsr-server/src/Makefile.am +++ b/xlators/cluster/nsr-server/src/Makefile.am @@ -4,9 +4,15 @@ xlator_LTLIBRARIES = nsr.la xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/cluster nsr_la_LDFLAGS = -module -avoid-version -lgfapi -lcurl -nsr_la_SOURCES = nsr.c leader.c etcd-api.c \ + +if ENABLE_ETCD_SIM +nsr_la_SOURCES = nsr.c leader.c recon_notify.c etcd-sim.c +else +nsr_la_SOURCES = nsr.c leader.c recon_notify.c etcd-api.c \ yajl.c yajl_alloc.c yajl_buf.c yajl_encode.c yajl_gen.c \ yajl_lex.c yajl_parser.c yajl_tree.c yajl_version.c +endif + nsr_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la diff --git a/xlators/cluster/nsr-server/src/all-templates.c b/xlators/cluster/nsr-server/src/all-templates.c index 541653029..7300973d5 100644 --- a/xlators/cluster/nsr-server/src/all-templates.c +++ b/xlators/cluster/nsr-server/src/all-templates.c @@ -83,17 +83,20 @@ nsr_$NAME$ (call_frame_t *frame, xlator_t *this, // follower/recon path // just send it to local node if (from_leader || from_recon) { + atomic_inc(&priv->ops_in_flight); STACK_WIND (frame, nsr_$NAME$_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->$NAME$, $ARGS_SHORT$); return 0; } + if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } + if (!xdata) { xdata = dict_new(); if (!xdata) { @@ -115,6 +118,7 @@ nsr_$NAME$ (call_frame_t *frame, xlator_t *this, goto err; } + #if defined(NSR_CG_QUEUE) nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd->inode); if (!ictx) { @@ -188,6 +192,8 @@ nsr_$NAME$_dispatch (call_frame_t *frame, xlator_t *this, nsr_private_t *priv = this->private; xlator_list_t *trav; + atomic_inc(&priv->ops_in_flight); + /* * TBD: unblock pending request(s) if we fail after this point but * before we get to nsr_$NAME$_complete (where that code currently @@ -246,6 +252,7 @@ nsr_$NAME$_complete (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, $ARGS_LONG$) { + nsr_private_t *priv = this->private; #if defined(NSR_CG_NEED_FD) nsr_local_t *local = frame->local; #endif @@ -294,6 +301,7 @@ nsr_$NAME$_complete (call_frame_t *frame, void *cookie, xlator_t *this, STACK_UNWIND_STRICT ($NAME$, frame, op_ret, op_errno, $ARGS_SHORT$); + atomic_dec(&priv->ops_in_flight); return 0; } diff --git a/xlators/cluster/nsr-server/src/etcd-sim.c b/xlators/cluster/nsr-server/src/etcd-sim.c new file mode 100644 index 000000000..5c5cdcec0 --- /dev/null +++ b/xlators/cluster/nsr-server/src/etcd-sim.c @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2013, Red Hat + * All rights reserved. + + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. Redistributions in binary + * form must reproduce the above copyright notice, this list of conditions and + * the following disclaimer in the documentation and/or other materials + * provided with the distribution. + + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include "call-stub.h" +#include "defaults.h" +#include "xlator.h" +#include "api/src/glfs.h" +#include "api/src/glfs-internal.h" +#include "run.h" + + +/* + * Mock implementation of etcd + * The etcd file is simulated in /tmp/<server-names> + * Writes from Multiple writers are protected using file lock. +*/ + +#include "etcd-api.h" +#define T_FORMAT "%d-%b-%Y,%H:%M:%S" +#define MAX_KEY_LEN 64 +#define MAX_VALUE_LEN 64 +#define MAX_TTL_LEN 12 + +etcd_session +etcd_open (etcd_server *server_list) +{ + return NULL; +} + +typedef struct _etcd_sim_s { + int fd; + FILE *stream; +} etcd_sim_t; + +void +etcd_close (etcd_session this) +{ + etcd_sim_t *sim = (etcd_sim_t *)this; + fflush(sim->stream); + fclose(sim->stream); + close(sim->fd); + free(this); +} + + +char * +etcd_get (etcd_session this, char *key) +{ + char *str = NULL; + size_t len; + etcd_sim_t *sim = (etcd_sim_t *)this; + struct tm tm; + time_t old, new; + lockf(sim->fd, F_LOCK, 0 ); + if (fseek(sim->stream, 0, SEEK_SET) == -1) { + lockf(sim->fd, F_ULOCK, 0 ); + return NULL; + } + // Read the file + while(1) { + if(str) { + free(str); + str = NULL; + } + if (getline((char **)&str, &len,sim->stream) == -1) { + break; + } + if (!strncmp(str, key, strlen(key))) { + char k[256], s[256], *ret, past[256]; + unsigned int ttl; + double delta; + sscanf(str,"%s %s %d %s",k, s, &ttl, past); + strptime(past, T_FORMAT, &tm); + old = mktime(&tm); + new = time(NULL); + delta = difftime(new, old); + // check if key is expired. + // If ttl is 0, it means key has infinite ttk=l. + if ((!ttl) || ((delta >= 0) && (delta < ttl))) { + ret = calloc(1, strlen(s) + 1); + strcpy(ret,s); + free(str); + lockf(sim->fd, F_ULOCK, 0 ); + return(ret); + } + } + } + lockf(sim->fd, F_ULOCK, 0 ); + return NULL; +} + + +etcd_result +etcd_set (etcd_session this, char *key, char *value, + char *precond, unsigned int ttl) +{ + char *str = NULL; + char buf[255]; + char tp[255]; + char s[255]; + size_t len; + etcd_sim_t *sim = (etcd_sim_t *)this; + struct tm tm; + time_t old, new; + lockf(sim->fd, F_LOCK, 0 ); + if (fseek(sim->stream, 0, SEEK_SET) == -1) { + lockf(sim->fd, F_ULOCK, 0 ); + return ETCD_WTF; + } + while(1) { + if(str) { + free(str); + str = NULL; + } + if (getline((char **)&str, &len,sim->stream) == -1) { + break; + } + if (!strncmp(str, key, strlen(key))) { + char k[256], s[256], past[256]; + unsigned int ttl; + double delta; + sscanf(str,"%s %s %d %s",k, s, &ttl, past); + strptime(past, T_FORMAT, &tm); + old = mktime(&tm); + new = time(NULL); + delta = difftime(new, old); + // check if the present key is expired + if ( (!ttl) || ((delta >= 0) && (delta < ttl))) { + // present key not expired. In case of precondition, + // check if it matches. If not return with error + // In case of no precond, return error since + // present key not yet expired. + if ((!precond) || (strcmp(precond, s))) { + free(str); + lockf(sim->fd, F_ULOCK, 0 ); + return ETCD_WTF; + } + } + fseek(sim->stream, -strlen(str), SEEK_CUR); + free(str); + goto here; + } + } +here: + memset(tp, 0, 255); + new = time(NULL); + memcpy(&tm, localtime(&new), sizeof(struct tm)); + strftime(buf, sizeof(buf), T_FORMAT, &tm); + // what we want to print in the file is something like this + // key value(at offset of 64) ttl(offset to 128) time(left offset to 140) + // hence we would want to create a format buf as follows: + // "%-64s%-64s%-16d%-18s" + // Hence we construct this first (in string s) and use that to print into tp + // which gets written to the registry file. + sprintf(s,"%%-%ds%%-%ds%%-%dd%%s\n", + MAX_KEY_LEN, MAX_VALUE_LEN, MAX_TTL_LEN); + sprintf(tp,s,key, value, ttl, buf); + if (fwrite(tp, 1,strlen(tp), sim->stream) != strlen(tp)) { + lockf(sim->fd, F_ULOCK, 0 ); + return ETCD_WTF; + } + fflush(sim->stream); + lockf(sim->fd, F_ULOCK, 0 ); + return ETCD_OK; +} + + + +etcd_session +etcd_open_str (char *server_names) +{ + etcd_sim_t *sim; + char name[256]; + + sim = calloc(1, sizeof(etcd_sim_t)); + sprintf(name, "/tmp/%s", server_names); + sim->fd = open(name, O_RDWR | O_CREAT); + if (sim->fd == -1) + return NULL; + sim->stream = fopen(name, "r+"); + if (sim->stream == NULL) + return NULL; + + return ((void *)sim); +} + + +void +etcd_close_str (etcd_session this) +{ + etcd_close(this); +} diff --git a/xlators/cluster/nsr-server/src/leader.c b/xlators/cluster/nsr-server/src/leader.c index bb0dbabe7..319f99317 100644 --- a/xlators/cluster/nsr-server/src/leader.c +++ b/xlators/cluster/nsr-server/src/leader.c @@ -23,11 +23,14 @@ #include "api/src/glfs.h" #include "api/src/glfs-internal.h" +#ifndef NSR_SIM_ETCD #include "etcd-api.h" +#endif #include "nsr-internal.h" #include "../../nsr-recon/src/recon_driver.h" #include "../../nsr-recon/src/recon_xlator.h" + /* Vote format: UUID,vote_status,fitness,term_number */ #define VOTE_ELEMS 4 /* Whole match plus four actual pieces. */ #define DEFAULT_FITNESS 42 @@ -39,6 +42,10 @@ enum { NO_LEADER, TENTATIVE, CONFIRMED }; regex_t vote_re; +// Simulation of etcd routines +#ifndef NSR_SIM_ETCD +#endif + long nsr_get_fitness (xlator_t *this) { @@ -46,69 +53,14 @@ nsr_get_fitness (xlator_t *this) return 42; } -long -nsr_get_term (xlator_t *this) -{ - nsr_private_t *priv = this->private; - char *text = NULL; - etcd_session etcd = priv->etcd; - - text = etcd_get(etcd, priv->term_uuid); - // first time and hence no key at all. - // this should ideally be done at vol creation time - // by glusterd. Move it there later - if(text == NULL) { - gf_log (this->name, GF_LOG_TRACE, "nsr_get_term returns 1"); - return 0; - } else { - gf_log (this->name, GF_LOG_TRACE, - "nsr_get_term returns %ld", strtol(text, NULL, 10)); - return (strtol(text, NULL, 10)); - } -} - - -// in etcd-api-master. -// send a patch to this package to expose this -extern size_t -parse_get_response (void *ptr, size_t size, size_t nmemb, void *stream); -typedef struct { - etcd_server *servers; -} _etcd_session; -typedef size_t curl_callback_t (void *, size_t, size_t, void *); -extern etcd_result etcd_get_one (_etcd_session *this, char *key, etcd_server *srv, char *prefix, - char *post, curl_callback_t cb, char **stream); - - - -void -nsr_leader_cb(glfs_fd_t *fd, ssize_t ret, void *data) -{ - xlator_t *this = (xlator_t *) data; - nsr_private_t *priv = this->private; - - gf_log (this->name, GF_LOG_INFO, - "nsr_leader_cb arrived with return value %d", (int)ret); - - // TBD - error handling; look at ret - atomic_fetch_and(&(priv->fence_io), 0); - - return; -} - -void -nsr_set_leader (xlator_t *this) +static void +nsr_set_leader (xlator_t *this, etcd_session etcd) { long term = 0; - etcd_server *srv; etcd_result res; - char *value = NULL; nsr_private_t *priv = this->private; - _etcd_session *etcd = priv->etcd; char *term_key = priv->term_uuid; - char *master_key = priv->vol_uuid; char n_t[sizeof(long)+1]; - nsr_recon_role_t role; char *text = NULL; gf_log (this->name, GF_LOG_INFO, "Just became leader"); @@ -134,45 +86,12 @@ nsr_set_leader (xlator_t *this) priv->current_term = term + 1; + // Move this inside recon notify??? atomic_fetch_or(&(priv->fence_io), 1); - role.num = 0; - role.role = leader; - // Get the rest of nodes for this term. - // TBD: fix this so that it uses per-brick keys instead of violating - // modularity and making bad assumptions about etcd behavior. - for (srv = etcd->servers; srv->host; ++srv) { - res = etcd_get_one(etcd,master_key,srv,"keys/",NULL, - parse_get_response,&value); - gf_log (this->name, GF_LOG_INFO, - "Probing for %s, got %d, value:%s", - srv->host, res, value); - if ((res == ETCD_OK) && value) { - gf_log (this->name, GF_LOG_INFO, - "Found for %s", srv->host); - strcpy(role.info[role.num].name, srv->host); - (role.num)++; - } - value = NULL; - } - gf_log (this->name, GF_LOG_INFO, - "Discovered %d nodes that has key %s", role.num, master_key); - - gf_log (this->name, GF_LOG_INFO, - "setting current term as %ld", term + 1); - role.current_term = term + 1; - ENDIAN_CONVERSION_RR(role, _gf_false); - - // inform the reconciliator that this is leader - // in the callback (once reconciliation is done), - // we will unfence the IOs. - // TBD - error handling later. - glfs_lseek(priv->fd, nsr_recon_xlator_sector_1, SEEK_SET); - gf_log (this->name, GF_LOG_INFO, - "Writing to local node to set leader"); - glfs_write_async(priv->fd, &role, - sizeof(role),nsr_recon_xlator_sector_1, - nsr_leader_cb, this); + nsr_recon_notify_event_set_leader(priv); + + return; } @@ -218,7 +137,7 @@ nsr_get_leader (xlator_t *this, etcd_session etcd, char *key) gf_log (this->name, GF_LOG_TRACE, "leader is %s\n",nominee); if (strcmp(nominee,priv->brick_uuid) == 0) { - nsr_set_leader(this); + nsr_set_leader(this, etcd); retval = LS_SUCCESS; } else { @@ -316,60 +235,10 @@ nsr_init_re (xlator_t *this) } -uint32_t -nsr_leader_setup_recon (xlator_t *this) -{ - nsr_private_t *priv = this->private; - xlator_t *old = this; - uint32_t ret = 0; - - if (priv->nsr_recon_start == _gf_false) - return 0; - - priv->fs = glfs_new(priv->vol_uuid); - if (!priv->fs) { - ret = 1; - gf_log (this->name, GF_LOG_ERROR, "failed to initialise glfs \n"); - goto done; - } - - glusterfs_this_set(old); - ret = glfs_set_volfile(priv->fs, priv->vol_file); - if (ret != 0) { - gf_log (this->name, GF_LOG_ERROR, "failed to set volfile \n"); - goto done; - } - - glusterfs_this_set(old); - /* - * REVIEW - * Logs belong in /var/log not /tmp. - */ - glfs_set_logging (priv->fs,"/tmp/glfs-log", 7); - if (glfs_init(priv->fs) < 0) { - gf_log (this->name, GF_LOG_ERROR, "failed to init volfile \n"); - ret = 1; - goto done; - } - - glusterfs_this_set(old); - priv->fd = glfs_open (priv->fs, "/", O_RDWR); - if (priv->fd == NULL) { - ret = 1; - gf_log (this->name, GF_LOG_ERROR, - "failed to open fd to communicate with recon process \n"); - goto done; - } - - -done: - glusterfs_this_set(old); - return ret; -} - void * -nsr_leader_thread (xlator_t *this) +nsr_leader_thread (void *arg) { + xlator_t *this = (xlator_t *) arg; leader_retval_t retval; nsr_private_t *priv = this->private; @@ -378,14 +247,6 @@ nsr_leader_thread (xlator_t *this) return NULL; } - if (nsr_leader_setup_recon(this)) { - gf_log (this->name, GF_LOG_ERROR, - "failed to do glfs initialisation inside leader thread"); - return NULL; - } - - priv->leader_inited = 1; - gf_log (this->name, GF_LOG_INFO, "calling glfs_opens_str on servers %s", priv->etcd_servers); @@ -396,6 +257,8 @@ nsr_leader_thread (xlator_t *this) return NULL; } + priv->leader_inited = 1; + for (;;) { if (nsr_get_leader(this,priv->etcd,priv->vol_uuid) == LS_ERROR) { break; diff --git a/xlators/cluster/nsr-server/src/nsr-internal.h b/xlators/cluster/nsr-server/src/nsr-internal.h index 282247a47..4382f5426 100644 --- a/xlators/cluster/nsr-server/src/nsr-internal.h +++ b/xlators/cluster/nsr-server/src/nsr-internal.h @@ -22,6 +22,16 @@ enum { gf_mt_nsr_end }; +typedef enum nsr_recon_notify_ev_id_t { + NSR_RECON_SET_LEADER = 1, + NSR_RECON_ADD_CHILD = 2 +} nsr_recon_notify_ev_id_t; + +typedef struct _nsr_recon_notify_ev_s { + nsr_recon_notify_ev_id_t id; + uint32_t index; // in case of add + struct list_head list; +} nsr_recon_notify_ev_t; typedef struct { char *etcd_servers; @@ -29,21 +39,23 @@ typedef struct { char *term_uuid; char *brick_uuid; gf_boolean_t leader; + uint8_t up_children; uint8_t n_children; char *vol_file; - glfs_t *fs; etcd_session etcd; volatile unsigned int fence_io; - glfs_fd_t *fd; uint32_t current_term; #ifdef NSR_DEBUG uint32_t leader_log_fd; #endif + volatile int recon_notify_inited; volatile int leader_inited; uint32_t kid_state; gf_lock_t dirty_lock; struct list_head dirty_fds; gf_boolean_t nsr_recon_start; + void * recon_ctx; + volatile uint32_t ops_in_flight; } nsr_private_t; typedef struct { @@ -79,3 +91,7 @@ typedef struct { struct list_head pqueue; } nsr_inode_ctx_t; +void nsr_recon_notify_event_set_leader(nsr_private_t *priv); +void nsr_recon_notify_event_add_child(nsr_private_t *priv, uint32_t index); +void* nsr_recon_notify_thread (void *this); + diff --git a/xlators/cluster/nsr-server/src/nsr.c b/xlators/cluster/nsr-server/src/nsr.c index 3707b3003..f85368456 100644 --- a/xlators/cluster/nsr-server/src/nsr.c +++ b/xlators/cluster/nsr-server/src/nsr.c @@ -258,7 +258,6 @@ nsr_getxattr_special (call_frame_t *frame, xlator_t *this, loc_t *loc, const char *name, dict_t *xdata) { dict_t *result; - uint8_t up; nsr_private_t *priv = this->private; if (!priv->leader) { @@ -279,8 +278,8 @@ nsr_getxattr_special (call_frame_t *frame, xlator_t *this, loc_t *loc, goto dn_failed; } - up = nsr_count_up_kids(this->private); - if (dict_set_uint32(result,NSR_REP_COUNT_XATTR,up) != 0) { + priv->up_children = nsr_count_up_kids(this->private); + if (dict_set_uint32(result,NSR_REP_COUNT_XATTR,priv->up_children) != 0) { goto dsu_failed; } @@ -399,6 +398,8 @@ nsr_reconfigure (xlator_t *this, dict_t *options) nsr_private_t *priv = this->private; GF_OPTION_RECONF ("leader", priv->leader, options, bool, err); + gf_log (this->name, GF_LOG_INFO, + "reconfigure called. setting priv->leader to %d\n", priv->leader); return 0; err: @@ -440,20 +441,25 @@ nsr_notify (xlator_t *this, int event, void *data, ...) index = nsr_get_child_index(this,data); if (index >= 0) { priv->kid_state |= (1 << index); + priv->up_children = nsr_count_up_kids(priv); gf_log (this->name, GF_LOG_INFO, "got CHILD_UP for %s, now %u kids", ((xlator_t *)data)->name, - nsr_count_up_kids(priv)); + priv->up_children); + if (priv->nsr_recon_start == _gf_true) { + nsr_recon_notify_event_add_child(priv, index); + } } break; case GF_EVENT_CHILD_DOWN: index = nsr_get_child_index(this,data); if (index >= 0) { priv->kid_state &= ~(1 << index); + priv->up_children = nsr_count_up_kids(priv); gf_log (this->name, GF_LOG_INFO, "got CHILD_DOWN for %s, now %u kids", ((xlator_t *)data)->name, - nsr_count_up_kids(priv)); + priv->up_children); } break; default: @@ -475,7 +481,7 @@ nsr_init (xlator_t *this) xlator_list_t *trav; pthread_t kid; uuid_t tmp_uuid; - char *my_name = NULL, *recon_file = NULL, *recon_pid_file = NULL, *ptr = NULL; + char *my_name = NULL, *morph_name = NULL, *recon_file = NULL, *recon_pid_file = NULL, *ptr = NULL; char *volname; extern xlator_t global_xlator; glusterfs_ctx_t *oldctx = global_xlator.ctx; @@ -552,34 +558,36 @@ nsr_init (xlator_t *this) gf_log (this->name, GF_LOG_ERROR, "vol name not generated. ???"); goto err; } - - recon_file = GF_CALLOC (1,PATH_MAX + strlen(my_name) + strlen("con") +1, gf_mt_nsr_private_t); - recon_pid_file = GF_CALLOC (1,PATH_MAX + strlen(my_name) + strlen("recon") +1, gf_mt_nsr_private_t); + + morph_name = GF_CALLOC (1, strlen(my_name) + 1, gf_mt_nsr_private_t); + strcpy(morph_name, my_name); + recon_file = GF_CALLOC (1,PATH_MAX + strlen(morph_name) + strlen("con") +1, gf_mt_nsr_private_t); + recon_pid_file = GF_CALLOC (1,PATH_MAX + strlen(morph_name) + strlen("recon") +1, gf_mt_nsr_private_t); if ((!recon_file) || (!recon_pid_file)) { gf_log (this->name, GF_LOG_ERROR, "could not allocate reconciliation file name"); goto err; } - ptr = strchr (my_name, '/'); + ptr = strchr (morph_name, '/'); while (ptr) { *ptr = '-'; - ptr = strchr (my_name, '/'); + ptr = strchr (morph_name, '/'); } sprintf(recon_file,"/%s/%s/%s/%s/",GLUSTERD_DEFAULT_WORKDIR, GLUSTERD_VOLUME_DIR_PREFIX, volname, GLUSTERD_BRICK_INFO_DIR); - strcat(recon_file, my_name); + strcat(recon_file, morph_name); strcat(recon_file, "-nsr-recon.vol"); sprintf(recon_pid_file,"/%s/%s/%s/%s/",GLUSTERD_DEFAULT_WORKDIR, GLUSTERD_VOLUME_DIR_PREFIX, volname, "run"); - strcat(recon_pid_file, my_name); + strcat(recon_pid_file, morph_name); strcat(recon_pid_file, "-recon.pid"); - priv->vol_file = GF_CALLOC (1,PATH_MAX + strlen(my_name) + strlen("con") +1, gf_mt_nsr_private_t); + priv->vol_file = GF_CALLOC (1,PATH_MAX + strlen(morph_name) + strlen("con") +1, gf_mt_nsr_private_t); if (!priv->vol_file) { gf_log (this->name, GF_LOG_ERROR, "could not allocate reconciliation file name"); goto err; @@ -590,7 +598,7 @@ nsr_init (xlator_t *this) volname, GLUSTERD_BRICK_INFO_DIR); strcat(priv->vol_file, "con:"); - strcat(priv->vol_file, my_name); + strcat(priv->vol_file, morph_name); if (pthread_create(&kid,NULL,nsr_flush_thread,this) != 0) { gf_log (this->name, GF_LOG_ERROR, @@ -622,10 +630,17 @@ nsr_init (xlator_t *this) } + (void)pthread_create(&kid,NULL,nsr_recon_notify_thread,this); + while (priv->recon_notify_inited == 0) { + sleep(1); + } + (void)pthread_create(&kid,NULL,nsr_leader_thread,this); while (priv->leader_inited == 0) { sleep(1); } + + /* * Calling glfs_new changes old->ctx, even if THIS still points * to global_xlator. That causes problems later in the main diff --git a/xlators/cluster/nsr-server/src/recon_notify.c b/xlators/cluster/nsr-server/src/recon_notify.c new file mode 100644 index 000000000..9cf2fce5d --- /dev/null +++ b/xlators/cluster/nsr-server/src/recon_notify.c @@ -0,0 +1,345 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include <string.h> + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "call-stub.h" +#include "defaults.h" +#include "xlator.h" +#include "api/src/glfs.h" +#include "api/src/glfs-internal.h" +#include "etcd-api.h" +#include "nsr-internal.h" +#include "../../nsr-recon/src/recon_driver.h" +#include "../../nsr-recon/src/recon_xlator.h" + + + +typedef struct _nsr_recon_notify_ctx_t { + nsr_recon_notify_ev_t recon_head; + pthread_mutex_t recon_mutex; + pthread_cond_t recon_cv; + char **hosts; // list of hosts ordered depending on child indices + uint32_t current_term; + uint32_t last_reconciled_term; + glfs_t *fs; + glfs_fd_t *fd; +} nsr_recon_notify_ctx_t; + +static int +xlator_get_option (xlator_t *xl, char *key, char **value) +{ + GF_ASSERT (xl); + return dict_get_str (xl->options, key, value); +} + +void nsr_recon_notify_event_set_leader(nsr_private_t *priv) +{ + nsr_recon_notify_ev_t *ev; + nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx; + + ev = GF_CALLOC (1, sizeof (nsr_recon_notify_ev_t), 0); + ev->id = NSR_RECON_SET_LEADER; + INIT_LIST_HEAD(&(ev->list)); + pthread_mutex_lock(&ctx->recon_mutex); + list_add_tail(&ev->list, &ctx->recon_head.list); + pthread_cond_signal(&ctx->recon_cv); + pthread_mutex_unlock(&ctx->recon_mutex); +} + +void nsr_recon_notify_event_add_child(nsr_private_t *priv, uint32_t index) +{ + nsr_recon_notify_ev_t *ev; + nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx; + + ev = GF_CALLOC (1, sizeof (nsr_recon_notify_ev_t), 0); + ev->id = NSR_RECON_ADD_CHILD; + ev->index = index; + INIT_LIST_HEAD(&(ev->list)); + pthread_mutex_lock(&ctx->recon_mutex); + list_add_tail(&ev->list, &ctx->recon_head.list); + pthread_cond_signal(&ctx->recon_cv); + pthread_mutex_unlock(&ctx->recon_mutex); +} + + +static void +nsr_recon_set_leader (xlator_t *this) +{ + + nsr_private_t *priv = this->private; + nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx; + nsr_recon_role_t role; + xlator_t *old = this; + uint32_t i=0; + + if (priv->leader != _gf_true) + return; + + if (ctx->last_reconciled_term == priv->current_term) + return; + + // No majority as of yet + if (priv->up_children <= (priv->n_children / 2)) + return; + + gf_log (this->name, GF_LOG_INFO, + "Sending message to do recon with %d nodes\n", + priv->up_children); + + role.num = 0; + role.role = leader; + for (i = 0; i < priv->n_children; ++i) { + if (priv->kid_state & (1 << i)) { + gf_log (this->name, GF_LOG_INFO, + "Recon using host %s", + ctx->hosts[i]); + strcpy(role.info[role.num].name, ctx->hosts[i]); + (role.num)++; + } + } + + gf_log (this->name, GF_LOG_INFO, + "setting current term as %d", priv->current_term); + role.current_term = priv->current_term; + ENDIAN_CONVERSION_RR(role, _gf_false); + + // inform the reconciliator that this is leader + // in the callback (once reconciliation is done), + // we will unfence the IOs. + // TBD - error handling later. + glfs_lseek(ctx->fd, nsr_recon_xlator_sector_1, SEEK_SET); + glusterfs_this_set(old); + gf_log (this->name, GF_LOG_INFO, + "Writing to local node to set leader"); + glfs_write(ctx->fd, &role, + sizeof(role), 0); + glusterfs_this_set(old); + gf_log (this->name, GF_LOG_INFO, + "glfs_write returned. unfencing IO\n"); + + // TBD - error handling + + ctx->last_reconciled_term = priv->current_term; + atomic_fetch_and(&(priv->fence_io), 0); + + return; +} + +static void +nsr_recon_add_child (xlator_t *this, uint32_t index) +{ + nsr_private_t *priv = this->private; + nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx; + nsr_recon_role_t role; + xlator_t *old = this; + + if (priv->leader != _gf_true) + return; + + // reconciliation still pending. + // Check if we have majority + if (ctx->last_reconciled_term != priv->current_term) { + nsr_recon_set_leader(this); + } else { + // Reconciliation done. + // new child joining the majority/ + // Do reconciliation only fot this child but after fencing new IO and draining old IO + role.num = 1; + role.role = joiner; + + atomic_fetch_or(&(priv->fence_io), 1); + while(priv->ops_in_flight) { + sleep(1); + } + + strcpy(role.info[0].name, ctx->hosts[index]); + role.current_term = priv->current_term; + ENDIAN_CONVERSION_RR(role, _gf_false); + glfs_lseek(ctx->fd, nsr_recon_xlator_sector_1, SEEK_SET); + glusterfs_this_set(old); + gf_log (this->name, GF_LOG_INFO, + "Writing to local node to join %s\n", role.info[0].name); + glfs_write(ctx->fd, &role, + sizeof(role), 0); + glusterfs_this_set(old); + gf_log (this->name, GF_LOG_INFO, + "Write to local node to set joiner returned\n"); + + // TBD - error handling + atomic_fetch_and(&(priv->fence_io), 0); + } + + return; +} + +static uint32_t +nsr_setup_recon (xlator_t *this) +{ + nsr_private_t *priv = this->private; + xlator_t *old = this; + uint32_t ret = 0; + nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx; + + if (priv->nsr_recon_start == _gf_false) + return 0; + + ctx->fs = glfs_new(priv->vol_uuid); + if (!ctx->fs) { + ret = 1; + gf_log (this->name, GF_LOG_ERROR, "failed to initialise glfs \n"); + goto done; + } + + glusterfs_this_set(old); + ret = glfs_set_volfile(ctx->fs, priv->vol_file); + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, "failed to set volfile \n"); + goto done; + } + + glusterfs_this_set(old); + /* + * REVIEW + * Logs belong in /var/log not /tmp. + */ + glfs_set_logging (ctx->fs,"/tmp/glfs-log", 7); + if (glfs_init(ctx->fs) < 0) { + gf_log (this->name, GF_LOG_ERROR, "failed to init volfile \n"); + ret = 1; + goto done; + } + + glusterfs_this_set(old); + ctx->fd = glfs_open (ctx->fs, "/", O_RDWR); + if (ctx->fd == NULL) { + ret = 1; + gf_log (this->name, GF_LOG_ERROR, + "failed to open fd to communicate with recon process \n"); + goto done; + } + + +done: + glusterfs_this_set(old); + return ret; +} + + +static void +nsr_setup_hosts(xlator_t *this) +{ + xlator_list_t *trav; + nsr_private_t *priv = this->private; + uint32_t i = 0; + nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx; + + ctx->hosts = GF_CALLOC(sizeof(char *), priv->n_children, gf_mt_nsr_private_t); + // Iterate thru all the children + for (trav = this->children; trav; trav = trav->next) { + char *hostname = NULL, *vol = NULL; + int ret1 = 0, ret2 = 0, ret = 0; + xlator_t *xl = trav->xlator; + // If the child type is that of protocol/client + if (!strcmp(trav->xlator->type, "protocol/client")) { + ret1 = xlator_get_option (xl, "remote-host", &hostname); + ret2 = xlator_get_option (xl, "remote-subvolume", &vol); + if (!ret1 && !ret2) { + // add the name of that host to the hosts + ctx->hosts[i] = GF_CALLOC(sizeof(char), strlen(hostname) + strlen(vol) + 2, 0); + strcpy(ctx->hosts[i], hostname); + strcat(ctx->hosts[i], ":"); + strcat(ctx->hosts[i], vol); + gf_log (this->name, GF_LOG_INFO, + "adding hosts %s to recon notfiy list", ctx->hosts[i]); + } else { + gf_log (this->name, GF_LOG_ERROR, + "CANNOT FIND HOSTNAME FOR A CHILD"); + GF_ASSERT(0); + } + // local brick + } else { + ret = xlator_get_option (this, "my-name", &hostname); + if (!ret) { + uint32_t len = strlen(hostname); + ctx->hosts[i] = GF_CALLOC(sizeof(char), + len+1, + gf_mt_nsr_private_t); + strcpy(ctx->hosts[i], hostname); + gf_log (this->name, GF_LOG_INFO, + "adding my host %s to recon notfiy list", ctx->hosts[i]); + } else { + gf_log (this->name, GF_LOG_ERROR, + "CANNOT FIND MY HOSTNAME"); + GF_ASSERT(0); + } + } + i++; + } +} + +void * +nsr_recon_notify_thread (void *arg) +{ + xlator_t *this = (xlator_t *)arg; + nsr_private_t *priv = this->private; + nsr_recon_notify_ev_t *ev; + nsr_recon_notify_ctx_t *ctx; + + priv->recon_ctx = GF_CALLOC(1, sizeof(nsr_recon_notify_ctx_t), gf_mt_nsr_private_t); + if (!priv->recon_ctx) { + gf_log (this->name, GF_LOG_ERROR, "calloc error"); + return NULL; + } + ctx = priv->recon_ctx; + + pthread_mutex_init(&(ctx->recon_mutex), NULL); + pthread_cond_init(&(ctx->recon_cv), NULL); + INIT_LIST_HEAD(&(ctx->recon_head.list)); + + nsr_setup_hosts(this); + + if (nsr_setup_recon(this)) { + gf_log (this->name, GF_LOG_ERROR, "recon notify thread : initing glfs error"); + return NULL; + } + + priv->recon_notify_inited = 1; + + while(1) { + pthread_mutex_lock(&ctx->recon_mutex); + while (list_empty(&(ctx->recon_head.list))) { + pthread_cond_wait(&ctx->recon_cv, &ctx->recon_mutex); + } + pthread_mutex_unlock(&ctx->recon_mutex); + + list_for_each_entry(ev, &(ctx->recon_head.list), list) { + + if (ev->id == NSR_RECON_SET_LEADER) { + gf_log (this->name, GF_LOG_INFO, + "got add leader notfiy event"); + nsr_recon_set_leader(this); + } else if (ev->id == NSR_RECON_ADD_CHILD) { + gf_log (this->name, GF_LOG_INFO, + "got add child notify event"); + nsr_recon_add_child(this, ev->index); + } + } + list_del_init (&ev->list); + } + + return NULL; +} + diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.c b/xlators/mgmt/glusterd/src/glusterd-volgen.c index 59bc7bcd5..fe33c8d7d 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volgen.c +++ b/xlators/mgmt/glusterd/src/glusterd-volgen.c @@ -1468,6 +1468,11 @@ add_nsr_stuff (volgen_graph_t *graph, char *volname, char c_d[NSR_MAX_PATH_SIZE]; char *username = NULL, *password = NULL; gf_boolean_t enable_recon = _gf_false; +#ifdef HAVE_ETCD + uint32_t nsr_port = 27000; +#else + static uint32_t nsr_port = 27000; +#endif if (glusterd_volinfo_get_boolean(volinfo,"cluster.nsr.recon") > 0) { @@ -1623,7 +1628,7 @@ add_nsr_stuff (volgen_graph_t *graph, char *volname, get_vol_transport_type (volinfo, transt); if(xlator_set_option (xl, "transport-type", transt) == -1) return -1; - sprintf(s,"%d",27000); + sprintf(s,"%d",nsr_port); if(xlator_set_option (xl, "transport.socket.listen-port", s) == -1) return -1; strcpy(auth, "auth.addr."); @@ -1655,7 +1660,7 @@ add_nsr_stuff (volgen_graph_t *graph, char *volname, return -1; if(xlator_set_option (kid, "transport-type", transt) == -1) return -1; - sprintf(s,"%d",27000); + sprintf(s,"%d",nsr_port++); if(xlator_set_option (kid, "remote-port", s) == -1) return -1; snprintf (c_d, PATH_MAX, |