diff options
Diffstat (limited to 'xlators/cluster/nsr-recon/src/recon_driver.c')
-rw-r--r-- | xlators/cluster/nsr-recon/src/recon_driver.c | 533 |
1 files changed, 350 insertions, 183 deletions
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.c b/xlators/cluster/nsr-recon/src/recon_driver.c index 7f92b6578..49ee465c5 100644 --- a/xlators/cluster/nsr-recon/src/recon_driver.c +++ b/xlators/cluster/nsr-recon/src/recon_driver.c @@ -254,7 +254,7 @@ out: * Output Arguments: * buf - where the values are written one after the other (NULL seperated) */ -static void +static int32_t get_xattr(struct glfs_fd *fd, char *keys, char *buf, @@ -277,7 +277,7 @@ get_xattr(struct glfs_fd *fd, // TBD - handle error if (r == -1) - return; + return -1; // increment the key to next value keys += len; @@ -285,7 +285,7 @@ get_xattr(struct glfs_fd *fd, // increment buf to hold the next key buf += strlen(buf) + 1; } - return; + return 0; } /* @@ -296,7 +296,7 @@ get_xattr(struct glfs_fd *fd, * keys - bunch of NULL seperated key names * num - number of keys */ -static void delete_xattr(struct glfs_fd *fd, +static int32_t delete_xattr(struct glfs_fd *fd, dict_t *dict_t, char *keys, uint32_t num) @@ -304,10 +304,11 @@ static void delete_xattr(struct glfs_fd *fd, while(num--) { // get the value and copy the value // TBD - handle failure cases when calling glfs_fremovexattr_with_xdata() - glfs_fremovexattr_with_xdata(fd, keys, dict_t); + if (glfs_fremovexattr_with_xdata(fd, keys, dict_t) == -1) + return -1; keys += strlen(keys) +1; } - return; + return 0; } /* @@ -320,7 +321,7 @@ static void delete_xattr(struct glfs_fd *fd, * Each of the key-value is seperated by NULL in turn. * num - Number of such key value pairs. 
*/ -static void +static int32_t fill_xattr(struct glfs_fd *fd, dict_t *dict, char *buf, @@ -336,10 +337,10 @@ fill_xattr(struct glfs_fd *fd, // TBD - handle failure cases when calling glfs_fsetxattr_with_xdata() r = glfs_fsetxattr_with_xdata(fd, k, val, strlen(val), 0, dict); if (r == -1) - return; + return -1; k = val + strlen(val) + 1; } - return; + return 0; } /* @@ -408,7 +409,7 @@ nsr_recon_get_file(char *vol, nsr_replica_worker_t *worker) * ctx - The per worker based context * control - set to true if this worker is for the control plane */ -static int +static int32_t nsr_recon_start_work(nsr_per_node_worker_t *ctx, gf_boolean_t control) { @@ -497,6 +498,7 @@ nsr_recon_start_work(nsr_per_node_worker_t *ctx, if (aux_fd == NULL) { nsr_worker_log(this->name, GF_LOG_ERROR, "cannot open aux log file for thread %s\n",ctx->id); + return -1; } else { nsr_worker_log(this->name, GF_LOG_ERROR, "---opened aux log file for thread %s\n",ctx->id); @@ -517,7 +519,7 @@ nsr_recon_start_work(nsr_per_node_worker_t *ctx, * ctx - The per worker based context * control - set to true if this worker is for the control plane */ -static int +static int32_t nsr_recon_end_work(nsr_per_node_worker_t *ctx, gf_boolean_t control) { @@ -648,13 +650,7 @@ control_worker_func_0(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_INFO, "this message should not be sent \n"); - break; - } - case NSR_WORK_ID_END_RECONCILIATION: - { - nsr_worker_log(this->name, GF_LOG_INFO, - "sending reconciliation end message to node %d\n", index); - nsr_recon_return_back(priv, dr->txn_id); + ctx->result = -1; break; } case NSR_WORK_ID_GET_RECONCILATION_WINDOW: @@ -676,16 +672,29 @@ control_worker_func_0(nsr_per_node_worker_t *ctx, rd = GF_CALLOC(num, sizeof(nsr_recon_record_details_t), gf_mt_recon_private_t); + if (rd == NULL) { + ctx->result = -1; + return; + } + recon_info->records = GF_CALLOC(num, sizeof(nsr_reconciliation_record_t), gf_mt_recon_private_t); + if (recon_info->records == NULL) { + 
ctx->result = -1; + return; + } // TBD - handle errors - nsr_recon_libchangelog_get_records(this, priv->changelog_base_path, + if (nsr_recon_libchangelog_get_records(this, priv->changelog_base_path, recon_info->last_term, recon_info->first_index, recon_info->last_index, - rd); + rd) == _gf_false) { + ctx->result = -1; + return; + } + // The above function writes into rd from 0 to (num -1) // We need to take care of this whenever we deal with records for (i=0; i < num; i++) { @@ -778,7 +787,10 @@ control_worker_func(nsr_per_node_worker_t *ctx, "calling nsr_recon_start_work\n"); // TBD - handle error in case nsr_recon_start_work gives error - nsr_recon_start_work(ctx, _gf_true); + if (nsr_recon_start_work(ctx, _gf_true) != 0) { + ctx->result = -1; + return; + } nsr_worker_log(this->name, GF_LOG_INFO, "finished nsr_recon_start_work\n"); @@ -790,7 +802,10 @@ control_worker_func(nsr_per_node_worker_t *ctx, "calling nsr_recon_end_work\n"); // TBD - handle error in case nsr_recon_end_work gives error - nsr_recon_end_work(ctx, _gf_true); + if (nsr_recon_end_work(ctx, _gf_true) != 0) { + ctx->result = -1; + return; + } nsr_worker_log(this->name, GF_LOG_INFO, "finished nsr_recon_end_work\n"); @@ -807,9 +822,18 @@ control_worker_func(nsr_per_node_worker_t *ctx, // first write the current term term number // TBD - error handling for all the glfs APIs - glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET); - glfs_write(ctx->aux_fd, &term, sizeof(term), 0); - glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0); + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET) == -1) { + ctx->result = -1; + return; + } + if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) { + ctx->result = -1; + return; + } + if (glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0) == -1) { + ctx->result = -1; + return; + } ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl recon_info->last_term = lt.last_term; recon_info->commited_ops = lt.commited_ops; @@ -834,9 +858,18 @@ 
control_worker_func(nsr_per_node_worker_t *ctx, // first write the term number // TBD - error handling for all the glfs APIs - glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET); - glfs_write(ctx->aux_fd, &term, sizeof(term), 0); - glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0); + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET) == -1) { + ctx->result = -1; + return; + } + if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) { + ctx->result = -1; + return; + } + if (glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0) == -1) { + ctx->result = -1; + return; + } ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl recon_info->last_term = lt.last_term; recon_info->commited_ops = lt.commited_ops; @@ -863,9 +896,12 @@ control_worker_func(nsr_per_node_worker_t *ctx, "trying to make this index %d as reconciliator for term %d\n", index, term); // TBD - error handling for all the glfs APIs - glfs_lseek(ctx->aux_fd, + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_1, - SEEK_SET); + SEEK_SET) == -1) { + ctx->result = -1; + return; + } // We have all the info for all other nodes. // Fill all that info when sending data to that process. @@ -888,7 +924,15 @@ control_worker_func(nsr_per_node_worker_t *ctx, rr.num = num; rr.role = reconciliator; ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl - glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0); + if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) { + ctx->result = -1; + // Put the errno only for this case since we + // are bothered about retrying only for this case. + // For rest of the cases we will just return EIO + // in errno. 
+ ctx->op_errno = errno; + return; + } nsr_worker_log(this->name, GF_LOG_INFO, "sent reconciliator info for term %d with node count as %d\n", term, num); @@ -905,14 +949,18 @@ control_worker_func(nsr_per_node_worker_t *ctx, "trying to make this index %d as resolutor with reconciliator as %d\n",index, rec); // TBD - error handling for all the glfs APIs - glfs_lseek(ctx->aux_fd, + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_1, - SEEK_SET); + SEEK_SET) == -1) { + ctx->result = -1; + return; + } + rr.num = 2; // Fill in info[0] as info for the node for which we are seeking resolution. // Fill in info[1] as info of the reconciliator node. - // The function nsr_recon_driver_set_role() that will be called when + // The function nsr_recon_driver_get_role() that will be called when // this message reaches the node will look at index 1 for term information // related to the reconciliator. for (i=0; i < 2; i++) { @@ -943,33 +991,21 @@ control_worker_func(nsr_per_node_worker_t *ctx, } rr.role = resolutor; ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl - glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0); + if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) { + ctx->result = -1; + // Put the errno only for this case since we + // are bothered about retrying only for this case. + // For rest of the cases we will just return EIO + // in errno. 
+ ctx->op_errno = errno; + return; + } nsr_worker_log(this->name, GF_LOG_INFO, "sent message to this node %d resolutor with reconciliator as %d\n", index, rec); break; } - case NSR_WORK_ID_END_RECONCILIATION: - { - char c[4]; - uint32_t old = htonl(dr->txn_id); - - nsr_worker_log(this->name, GF_LOG_INFO, - "sending reconciliation end message to node %d\n", index); - - memcpy(c, &old, sizeof(uint32_t)); - // TBD - error handling for all the glfs APIs - glfs_lseek(ctx->aux_fd, - nsr_recon_xlator_sector_0, - SEEK_SET); - glfs_write(ctx->aux_fd, c, sizeof(c), 0); - - nsr_worker_log(this->name, GF_LOG_INFO, - "finished sending reconciliation end message to node %d\n", index); - - break; - } case NSR_WORK_ID_GET_RECONCILATION_WINDOW: { nsr_recon_log_info_t li; @@ -985,24 +1021,42 @@ control_worker_func(nsr_per_node_worker_t *ctx, // TBD - error handling for all the glfs APIs - glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET); + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET) == -1) { + ctx->result = -1; + return; + } // write to node what term & indices we are interested li.term = recon_info->last_term; li.first_index = recon_info->first_index; li.last_index = recon_info->last_index; ENDIAN_CONVERSION_LI(li, _gf_false); //htonl - glfs_write(ctx->aux_fd, &li, sizeof(li), 0); + if (glfs_write(ctx->aux_fd, &li, sizeof(li), 0) == -1) { + ctx->result = -1; + return; + } // then read rd = GF_CALLOC(num, sizeof(nsr_recon_record_details_t), gf_mt_recon_private_t); + if (rd == NULL) { + ctx->result = -1; + return; + } recon_info->records = GF_CALLOC(num, sizeof(nsr_reconciliation_record_t), gf_mt_recon_private_t); + if (recon_info->records == NULL) { + ctx->result = -1; + return; + } + + if (glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0) == -1) { + ctx->result = -1; + return; + } - glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0); for (i=0; i < num; i++) { ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl 
memcpy(&(recon_info->records[i].rec), @@ -1132,7 +1186,7 @@ compute_reconciliation_work(nsr_recon_driver_ctx_t *ctx) return; } -static void +static int32_t nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx, uint32_t i, gf_boolean_t in_use); @@ -1145,14 +1199,14 @@ nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx, gf_boolean_t nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, nsr_recon_role_t *rr, - uint32_t txn_id) + uint32_t term) { nsr_role_work_t *rw; nsr_driver_log(this->name, GF_LOG_INFO, "set role called \n"); rw = GF_CALLOC(1, sizeof (nsr_role_work_t), 0); memcpy(&rw->role, rr, sizeof(nsr_recon_role_t)); - rw->txn_id = txn_id; + rw->term = term; INIT_LIST_HEAD(&(rw->list)); pthread_mutex_lock(&(ctx->mutex)); list_add_tail(&rw->list, &ctx->role_head.list); @@ -1176,15 +1230,11 @@ nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, * If resolution to be done, then rr.info[0] will have this node's info * which the leader would have got earlier. rr[1].info will have the * info regarding the reconciliator. - * txn_id - All role changes(except when leader becomes reconciliator or resolutor) - * would be initiated as write to the recon xlator which would have got a frame from - * either the brick process(leader change) or other reconciliation process. - * The write function would return immediately after storing the frame which - * needs to be returned back after the actual reconciliation is done. - * For that we store the frame against this id which acts as a key. 
+ * term - leader's term that is causing this role */ nsr_recon_driver_state_t -nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, +nsr_recon_driver_get_role(int32_t *status, + nsr_recon_driver_ctx_t *ctx, nsr_role_work_t *rw) { uint8_t i=0, j=0; @@ -1192,17 +1242,24 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, // First make all the threads uninitialise for (i = 0; i < ctx->replica_group_size; i++) { - nsr_recon_in_use(ctx, i, _gf_false); + if (nsr_recon_in_use(ctx, i, _gf_false) == -1) { + *status = -1; + return 0; + } } if ((rr->role == leader) || (rr->role == joiner)) { // First set info this node - nsr_recon_in_use(ctx, 0, _gf_true); + if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) { + *status = -1; + return 0; + } ctx->workers[0].recon_info = GF_CALLOC (1, sizeof (nsr_reconciliator_info_t), gf_mt_recon_private_t); if (!ctx->workers[0].recon_info) { - return _gf_false; + *status = -1; + return 0; } ctx->current_term = rr->current_term; @@ -1213,11 +1270,14 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, if (!strcmp(ctx->workers[i].name, rr->info[j].name)) { //if (strstr(ctx->workers[i].name, rr->info[j].name)) { nsr_driver_log(this->name, GF_LOG_INFO, - "nsr_recon_driver_set_role: this as %s. found other server %s\n", + "nsr_recon_driver_get_role: this as %s. found other server %s\n", (rr->role == leader) ? "leader" : "joiner", ctx->workers[i].name); - nsr_recon_in_use(ctx, i, _gf_true); + if (nsr_recon_in_use(ctx, i, _gf_true) == -1) { + *status = -1; + return 0; + } // Allocate this here. 
This will get later filled when // the leader tries to get last term information from all // the nodes @@ -1225,7 +1285,8 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, sizeof (nsr_reconciliator_info_t), gf_mt_recon_private_t); if (!ctx->workers[i].recon_info) { - return _gf_false; + *status = -1; + return 0; } break; } @@ -1245,13 +1306,14 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, if (!strcmp(rr->info[i].name, ctx->workers[j].name)) { //if (strstr(ctx->workers[j].name, rr->info[i].name)) { nsr_driver_log(this->name, GF_LOG_INFO, - "nsr_recon_driver_set_role: this as reconciliator. found other server %s\n", + "nsr_recon_driver_get_role: this as reconciliator. found other server %s\n", ctx->workers[j].name); ctx->workers[j].recon_info = GF_CALLOC (1, sizeof (nsr_reconciliator_info_t), gf_mt_recon_private_t); if (!ctx->workers[j].recon_info) { - return _gf_false; + *status = -1; + return 0; } ctx->workers[j].recon_info->last_term = rr->info[i].last_term; @@ -1261,7 +1323,10 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, rr->info[i].last_index; ctx->workers[j].recon_info->first_index = rr->info[i].first_index; - nsr_recon_in_use(ctx, j, _gf_true); + if (nsr_recon_in_use(ctx, j, _gf_true) == -1) { + *status = -1; + return 0; + } break; } } @@ -1272,13 +1337,14 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, //if (strstr(ctx->workers[j].name, rr->info[1].name)) { if (!strcmp(rr->info[1].name, ctx->workers[j].name)) { nsr_driver_log(this->name, GF_LOG_INFO, - "nsr_recon_driver_set_role: this as resolutor. found other server %s as reconciliator\n", - ctx->workers[1].name); + "nsr_recon_driver_get_role: this as resolutor. 
found other server %s as reconciliator\n", + ctx->workers[j].name); ctx->workers[j].recon_info = GF_CALLOC (1, sizeof (nsr_reconciliator_info_t), gf_mt_recon_private_t); if (!ctx->workers[j].recon_info) { - return _gf_false; + *status = -1; + return 0; } ctx->workers[j].recon_info->last_term = rr->info[1].last_term; @@ -1289,7 +1355,10 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, ctx->workers[j].recon_info->first_index = rr->info[1].first_index; ctx->reconciliator_index = j; - nsr_recon_in_use(ctx, j, _gf_true); + if (nsr_recon_in_use(ctx, j, _gf_true) == -1) { + *status = -1; + return 0; + } GF_ASSERT(ctx->reconciliator_index != 0); break; } @@ -1298,18 +1367,23 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, sizeof (nsr_reconciliator_info_t), gf_mt_recon_private_t); if (!ctx->workers[0].recon_info) { - return _gf_false; + *status = -1; + return 0; } // info[0] has all info for this node ctx->workers[0].recon_info->last_term = rr->info[0].last_term; ctx->workers[0].recon_info->commited_ops = rr->info[0].commited_ops; ctx->workers[0].recon_info->last_index = rr->info[0].last_index; ctx->workers[0].recon_info->first_index = rr->info[0].first_index; - nsr_recon_in_use(ctx, 0, _gf_true); + if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) { + *status = -1; + return 0; + } } - ctx->txn_id = rw->txn_id; + ctx->term = rw->term; + *status = 0; return rr->role; } @@ -1401,7 +1475,7 @@ create_obj(nsr_per_node_worker_t *ctx, char *gfid_str) * and the changelog translator consumes term and index. */ -static void +static gf_boolean_t apply_record(nsr_per_node_worker_t *ctx, nsr_reconciliation_record_t *ri, dict_t * dict) @@ -1420,10 +1494,11 @@ apply_record(nsr_per_node_worker_t *ctx, // TBD - get a way to just stuff the log entry without writing the data so that // changelogs remain identical. 
if (ri->work.data == NULL) { - return; + return _gf_true; } - if ((obj = create_obj(ctx,ri->rec.gfid)) == NULL) return; + if ((obj = create_obj(ctx,ri->rec.gfid)) == NULL) + return _gf_false;; fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict); if (fd == NULL) { @@ -1431,23 +1506,28 @@ apply_record(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_ERROR, "open for file %s failed\n", ri->rec.gfid); - return; + return _gf_false; } if (glfs_lseek_with_xdata(fd, ri->rec.offset, SEEK_SET, dict) != ri->rec.offset) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "lseek for file %s failed at offset %d\n", ri->rec.gfid, ri->rec.offset); - return; + return _gf_false; } if (glfs_write_with_xdata(fd, ri->work.data, ri->rec.len, 0, dict) != ri->rec.len) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "write for file %s failed for bytes %d\n", ri->rec.gfid, ri->rec.len); - return; + return _gf_false; } - glfs_close_with_xdata(fd, dict); + if (glfs_close_with_xdata(fd, dict) == -1) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "close failed\n"); + return _gf_false; + } nsr_worker_log(this->name, GF_LOG_INFO, "Finished DOing write for gfid %s @offset %d for len %d\n", @@ -1459,7 +1539,7 @@ apply_record(nsr_per_node_worker_t *ctx, "DOing truncate for file %s @offset %d \n", ri->rec.gfid, ri->rec.offset); - if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false; fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict); if (fd == NULL) { @@ -1467,16 +1547,21 @@ apply_record(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_ERROR, "open for file %s failed\n", ri->rec.gfid); - return; + return _gf_false; } if (glfs_ftruncate_with_xdata(fd, ri->rec.offset, dict) == -1) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "trunctae for file %s failed @offset %d\n", ri->rec.gfid,ri->rec.offset ); - return; + return _gf_false; } - 
glfs_close_with_xdata(fd, dict); + if (glfs_close_with_xdata(fd, dict) == -1) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "close failed\n"); + return _gf_false; + } nsr_worker_log(this->name, GF_LOG_INFO, "Finished DOing truncate for gfid %s @offset %d \n", @@ -1499,10 +1584,10 @@ apply_record(nsr_per_node_worker_t *ctx, // TBD - get a way to just stuff the log entry without writing the data so that // changelogs remain identical. if (ri->work.data == NULL) { - return; + return _gf_true; } - if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false; if (obj->inode->ia_type == IA_IFDIR) fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict); @@ -1513,7 +1598,7 @@ apply_record(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_ERROR, "open for file %s failed\n", ri->rec.gfid); - return; + return _gf_false; } if(get_xattr_total_size(fd, &t_b, &k_s, &v_s, &num, dict) == -1) { @@ -1521,10 +1606,15 @@ apply_record(nsr_per_node_worker_t *ctx, GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "list of xattr of %s failed\n", ri->rec.gfid); - return; + return _gf_false; } - delete_xattr(fd, dict, t_b, num); + if (delete_xattr(fd, dict, t_b, num) == -1) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "deleting xattrs failed\n"); + return _gf_false; + } // Set one special dict flag to indicate the opcode so that // the opcode gets set to this @@ -1532,12 +1622,22 @@ apply_record(nsr_per_node_worker_t *ctx, GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "setting opcode to %d failed\n",ri->rec.op); - return; + return _gf_false; } - fill_xattr(fd, dict, ri->work.data, ri->work.num); + if (fill_xattr(fd, dict, ri->work.data, ri->work.num) == -1) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "filling xattrs failed\n"); + return _gf_false; + } - glfs_close_with_xdata(fd, dict); + if (glfs_close_with_xdata(fd, dict) == -1) { + GF_ASSERT(0); + 
nsr_worker_log(this->name, GF_LOG_ERROR, + "close failed\n"); + return _gf_false; + } nsr_worker_log(this->name, GF_LOG_INFO, "Finsihed Doing set extended attr for %s \n", @@ -1553,7 +1653,7 @@ apply_record(nsr_per_node_worker_t *ctx, // TBD - add mode and flags later uuid_parse(ri->rec.gfid, gfid); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; nsr_worker_log (this->name, GF_LOG_INFO, "creating with mode 0%o", ri->rec.mode); @@ -1562,7 +1662,7 @@ apply_record(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_ERROR, "Failure for Doing create for file %s\n", ri->rec.entry); - return; + return _gf_false; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1579,14 +1679,14 @@ apply_record(nsr_per_node_worker_t *ctx, // TBD - add mode and flags later uuid_parse(ri->rec.gfid, gfid); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; if (glfs_h_mknod_with_xdata(ctx->fs, obj, ri->rec.entry, O_RDWR, 0777, NULL, gfid, dict) == NULL) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "Failure for Doing mknod for file %s\n", ri->rec.entry); - return; + return _gf_false; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1603,14 +1703,14 @@ apply_record(nsr_per_node_worker_t *ctx, // TBD - add mode and flags later uuid_parse(ri->rec.gfid, gfid); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; if (glfs_h_mkdir_with_xdata(ctx->fs, obj, ri->rec.entry, 0777, NULL, gfid, dict) != 0) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "Failure for Doing mkdir for file %s\n", ri->rec.entry); - return; + return _gf_false; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1623,13 +1723,13 @@ apply_record(nsr_per_node_worker_t *ctx, "Doing rmdir/ublink for dir %s \n", ri->rec.entry); - if ((obj = create_obj(ctx, 
ri->rec.pargfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; if (glfs_h_unlink_with_xdata(ctx->fs, obj, ri->rec.entry, dict) != 0) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "Failure for Doing rmdir/unlink for file %s\n", ri->rec.entry); - return; + return _gf_false; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1644,7 +1744,7 @@ apply_record(nsr_per_node_worker_t *ctx, "Doing symlink for file %s to file %s \n", ri->rec.entry, ri->rec.link_path); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; uuid_parse(ri->rec.gfid, gfid); if (glfs_h_symlink_with_xdata(ctx->fs, obj, ri->rec.entry, ri->rec.link_path, NULL, gfid, dict) == NULL) { @@ -1652,7 +1752,7 @@ apply_record(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_ERROR, "Failed to Doing symlink for file %s to file %s \n", ri->rec.entry, ri->rec.link_path); - return; + return _gf_false; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1667,15 +1767,15 @@ apply_record(nsr_per_node_worker_t *ctx, "Doing hard link for file %s to file %s \n", ri->rec.entry, ri->rec.gfid); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; - if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; + if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false; if (glfs_h_link_with_xdata(ctx->fs, to_obj, obj, ri->rec.entry, dict) == -1) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "Failed to Doing hard link for file %s to file %s \n", ri->rec.entry, ri->rec.gfid); - return; + return _gf_false; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1690,15 +1790,15 @@ apply_record(nsr_per_node_worker_t *ctx, "Doing rename for file %s to file %s \n", ri->rec.entry, ri->rec.newloc); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; - if ((to_obj = create_obj(ctx, 
ri->rec.gfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; + if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false; if (glfs_h_rename_with_xdata(ctx->fs, obj, ri->rec.entry, to_obj, ri->rec.newloc, dict) == -1) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "Failed to Doing rename for file %s to file %s \n", ri->rec.entry, ri->rec.newloc); - return; + return _gf_false; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1719,7 +1819,7 @@ apply_record(nsr_per_node_worker_t *ctx, "Doing attr for file %s \n", ri->rec.gfid); - if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false; if (obj->inode->ia_type == IA_IFDIR) fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict); @@ -1730,7 +1830,7 @@ apply_record(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_ERROR, "open for file %s failed\n", ri->rec.gfid); - return; + return _gf_false; } iatt.ia_prot = ia_prot_from_st_mode(777); @@ -1743,7 +1843,7 @@ apply_record(nsr_per_node_worker_t *ctx, GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "setting opcode to %d failed\n",ri->rec.op); - return; + return _gf_false; } ret = glfs_fsetattr_with_xdata(fd, &iatt, valid, dict); @@ -1752,17 +1852,22 @@ apply_record(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_INFO, "failed Doing attr for file %s \n", ri->rec.gfid); - return; + return _gf_false; } - glfs_close_with_xdata(fd, dict); + if (glfs_close_with_xdata(fd, dict) == -1) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "close failed\n"); + return _gf_false; + } nsr_worker_log(this->name, GF_LOG_INFO, "Doing attr for file %s \n", ri->rec.gfid); } - return; + return _gf_true; } //return back opcodes that requires reading from source @@ -1816,7 +1921,10 @@ data_worker_func(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_INFO, "started data ini \n"); - nsr_recon_start_work(ctx, 
_gf_false); + if (nsr_recon_start_work(ctx, _gf_false) != 0) { + ctx->result = -1; + return; + } nsr_worker_log(this->name, GF_LOG_INFO, "finished data ini \n"); @@ -1825,7 +1933,10 @@ data_worker_func(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_INFO, "started data fini \n"); - nsr_recon_end_work(ctx, _gf_false); + if (nsr_recon_end_work(ctx, _gf_false) != 0) { + ctx->result = -1; + return; + } nsr_worker_log(this->name, GF_LOG_INFO, "finished data fini \n"); @@ -1838,18 +1949,21 @@ data_worker_func(nsr_per_node_worker_t *ctx, dict = dict_new (); if (!dict) { + ctx->result = -1; GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "failed allocating for dictionary\n"); break; } if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { + ctx->result = -1; GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "error setting term in dict\n"); break; } if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { + ctx->result = -1; GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "error setting term in dict\n"); @@ -1978,7 +2092,12 @@ data_worker_func(nsr_per_node_worker_t *ctx, } ri->work.data = GF_CALLOC ((k_s + v_s) , sizeof(char), gf_mt_recon_private_t); - get_xattr(fd, t_b, ri->work.data, v_s, num, dict); + if (get_xattr(fd, t_b, ri->work.data, v_s, num, dict) == -1) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "get xattr of gfid %s failed\n", rd->gfid); + break; + } ri->work.num = num; nsr_worker_log(this->name, GF_LOG_INFO, "finished getattr for gfid %s \n", @@ -2014,24 +2133,30 @@ data_worker_func(nsr_per_node_worker_t *ctx, wip, rd->gfid); dict = dict_new (); if (!dict) { + ctx->result = -1; GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "failed allocating for dictionary\n"); break; } if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { + ctx->result = -1; GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "error setting term in dict\n"); break; } if 
(dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { + ctx->result = -1; GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "error setting term in dict\n"); break; } - apply_record(ctx, ri, dict); + if (apply_record(ctx, ri, dict) == _gf_false) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "apply_record fails\n"); + } nsr_worker_log(this->name, GF_LOG_INFO, "finished recon commit for gfid %s \n", @@ -2041,18 +2166,21 @@ data_worker_func(nsr_per_node_worker_t *ctx, case NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH: dict = dict_new (); if (!dict) { + ctx->result = -1; GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "failed allocating for dictionary\n"); break; } if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { + ctx->result = -1; GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "error setting term in dict\n"); break; } if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { + ctx->result = -1; GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "error setting term in dict\n"); @@ -2198,7 +2326,9 @@ create_worker_threads(nsr_recon_private_t *priv, * misc - used to overload such as index. */ static void -send_and_wait(int32_t bm, +send_and_wait(int32_t *result, + int32_t *op_errno, + int32_t bm, uint32_t num, nsr_recon_driver_ctx_t *ctx, nsr_recon_work_req_id_t id, @@ -2208,6 +2338,18 @@ send_and_wait(int32_t bm, uint32_t i = 0; nsr_recon_work_t *work; +#define CONTROL_WORKER(i) ctx->workers[i].control_worker +#define DATA_WORKER(i) ctx->workers[i].data_worker +#define WORKER(i) ((q == NSR_RECON_QUEUE_TO_CONTROL) ? 
(CONTROL_WORKER(i)) : (DATA_WORKER(i))) + + *result = *op_errno = 0; + + for (i=0; i < num; i++) { + if ((bm & (1 << i)) && ctx->workers[i].in_use) { + WORKER(i)->result = 0; + WORKER(i)->op_errno = 0; + } + } if (ctx->mode == NSR_SEQ) { for (i=0; i < num; i++) { if ((bm & (1 << i)) && ctx->workers[i].in_use) { @@ -2222,8 +2364,7 @@ send_and_wait(int32_t bm, } } } - nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned\n"); - return; + goto out; } for (i=0; i < num; i++) { @@ -2238,41 +2379,40 @@ send_and_wait(int32_t bm, while (ctx->outstanding) { pthread_yield(); } - nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned\n"); - return; -} - -#if 0 -static void -send_and_do_not_wait(int32_t bm, - uint32_t num, - nsr_recon_driver_ctx_t *ctx, - nsr_recon_work_req_id_t id, - nsr_recon_queue_type_t q, - int32_t misc) -{ - uint32_t i = 0; - - for (i=0; i < num; i++) { +out: + for (i=0; i < num; i++) { if ((bm & (1 << i)) && ctx->workers[i].in_use) { - nsr_recon_work_t *work; - recon_make_work(&work, id, misc); - recon_queue_to_worker(ctx, work, i, q); - } - } + if (WORKER(i)->result == -1) { + *result = -1; + } + } + } + if (*result == -1) { + for (i=0; i < num; i++) { + if ((bm & (1 << i)) && ctx->workers[i].in_use) { + if (WORKER(i)->op_errno == EAGAIN) { + *op_errno = EAGAIN; + break; + } else { + *op_errno = EIO; + } + } + } + } + nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned with result: %d errno:%d\n", *result, *op_errno); return; } -#endif // send INI or FINI -static void +static int32_t nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx, uint32_t i, gf_boolean_t in_use) { uint32_t bm = 1 << i; gf_boolean_t send = _gf_false; + int32_t status =0, op_errno = 0; if (in_use == _gf_false) { if (ctx->workers[i].in_use == _gf_true) @@ -2284,25 +2424,28 @@ nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx, send = _gf_true; } } -#if 1 if (send == _gf_true) { if (in_use == _gf_true) { 
nsr_driver_log(this->name, GF_LOG_INFO, "sending INI to index %d\n",i); } else { nsr_driver_log(this->name, GF_LOG_INFO, "sending FINI to index %d\n",i); } - send_and_wait(bm, ctx->replica_group_size, ctx, + send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx, (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI, NSR_RECON_QUEUE_TO_CONTROL, -1); - send_and_wait(bm, ctx->replica_group_size, ctx, + if (status == -1) + return -1; + send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx, (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI, NSR_RECON_QUEUE_TO_DATA, -1); + if (status == -1) + return -1; if (in_use == _gf_false) { //GF_FREE(ctx->workers[i].recon_info->records); GF_FREE(ctx->workers[i].recon_info); } } -#endif + return 0; } // main recon driver thread @@ -2317,6 +2460,8 @@ nsr_reconciliation_driver(void *arg) int32_t bm; xlator_t *this = priv->this; char *con_name, *data_name; + int32_t status = 0; + int32_t op_errno = 0; driver_ctx = &priv->driver_thread_context; (*driver_ctx) = GF_CALLOC (1, @@ -2414,7 +2559,12 @@ nsr_reconciliation_driver(void *arg) list_for_each_entry(rr, &(ctx->role_head.list), list) { nsr_recon_driver_state_t state; - state = nsr_recon_driver_get_role(ctx, rr); + state = nsr_recon_driver_get_role(&status, ctx, rr); + + if (status == -1) { + op_errno = EIO; + goto out; + } if (state == leader) { @@ -2423,11 +2573,13 @@ nsr_reconciliation_driver(void *arg) nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n"); // Get last term info from all members for this group - send_and_wait(-1, + send_and_wait(&status, &op_errno, -1, replica_group_size, ctx, NSR_WORK_ID_GET_LAST_TERM_INFO, NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term); + if (status == -1) + goto out; // compare all the info received and choose the reconciliator @@ -2469,11 +2621,13 @@ nsr_reconciliation_driver(void *arg) if (chosen != 0) { nsr_driver_log (this->name, GF_LOG_INFO, "sending 
reconciliation work to %d\n", chosen); bm = 1 << ctx->reconciliator_index; - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_RECONCILIATOR_DO_WORK, NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work to %d\n", chosen); } else { nsr_driver_log (this->name, GF_LOG_INFO, "local node is reconciliator. before set jmp\n"); @@ -2517,11 +2671,13 @@ nsr_reconciliation_driver(void *arg) nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this node and reconciliator\n"); bm = ~((1 << ctx->reconciliator_index) || 1); - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_RESOLUTION_DO_WORK, NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work as leader \n"); @@ -2564,11 +2720,13 @@ i_am_reconciliator: // We have set the bm in the above for loop where // we go thru all nodes including this node that // have seen the last term. 
- send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_GET_RECONCILATION_WINDOW, NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "finished getting reconciliation window for term %d from %dto %d \n", ctx->workers[0].recon_info->last_term, @@ -2604,12 +2762,14 @@ i_am_reconciliator: bm = (1 << record->work.source); nsr_driver_log (this->name, GF_LOG_INFO, "reading data from source %d\n",record->work.source); - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_SINGLE_RECONCILIATION_READ, NSR_RECON_QUEUE_TO_DATA, i); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "got data from source %d\n",record->work.source); } @@ -2618,12 +2778,14 @@ i_am_reconciliator: "fixing local data as part of reconciliation\n"); bm = 1; - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT, NSR_RECON_QUEUE_TO_DATA, i); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "finished fixing local data as part of reconciliation\n"); @@ -2634,12 +2796,14 @@ i_am_reconciliator: nsr_driver_log (this->name, GF_LOG_INFO, "fixing this record as a fill\n"); bm = 1; - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH, NSR_RECON_QUEUE_TO_DATA, i); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "finished fixing this record as a fill\n"); } @@ -2691,11 +2855,13 @@ i_am_resolutor: nsr_driver_log (this->name, GF_LOG_INFO, "getting info from reconciliator for term %d \n", my_info->last_term); bm = (1 << recon_index); - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_GET_GIVEN_TERM_INFO, NSR_RECON_QUEUE_TO_CONTROL, his_info->last_term); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, 
"finished getting info from reconciliator for term %d \n", my_info->last_term); @@ -2739,11 +2905,13 @@ i_am_resolutor: "getting reconciliation window for term %d from %d to %d \n", my_info->last_term, my_info->first_index, my_info->last_index); - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_GET_RECONCILATION_WINDOW, NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "finished getting reconciliation window for term %d from %d to %d \n", my_info->last_term, @@ -2771,12 +2939,14 @@ i_am_resolutor: nsr_driver_log (this->name, GF_LOG_INFO, "reading data from source %d\n",recon_index); bm = (1 << recon_index); - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_SINGLE_RECONCILIATION_READ, NSR_RECON_QUEUE_TO_DATA, i); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "finished reading data from source %d\n",recon_index); } @@ -2785,12 +2955,14 @@ i_am_resolutor: "fixing local data as part of resolutor\n"); bm = 1; - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT, NSR_RECON_QUEUE_TO_DATA, i); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "finished fixing local data as part of resolutor\n"); @@ -2816,21 +2988,25 @@ i_am_resolutor: nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n"); // Get last term info from all members for this group // which will be the leader(this node) and the node that wants to join. 
- send_and_wait(-1, + send_and_wait(&status, &op_errno, -1, replica_group_size, ctx, NSR_WORK_ID_GET_LAST_TERM_INFO, NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term); + if (status == -1) + goto out; // send message to other node that just joined to sync up with this node which is also the leader nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this\n"); bm = ~(1); - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_RESOLUTION_DO_WORK, NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "finished recon work as joiner \n"); @@ -2843,16 +3019,7 @@ i_am_resolutor: out: nsr_driver_log (this->name, GF_LOG_INFO, "sending end of reconciliation message \n"); - nsr_recon_return_back(priv, ctx->txn_id); -#if 0 - // send message that job is done by writing to local recon translator - bm = 1; - send_and_wait(bm, - replica_group_size, - ctx, - NSR_WORK_ID_END_RECONCILIATION, - NSR_RECON_QUEUE_TO_CONTROL, -1); -#endif + nsr_recon_return_back(priv, ctx->term, status, op_errno); nsr_driver_log (this->name, GF_LOG_INFO, "finished sending end of reconciliation message \n"); } |