diff options
Diffstat (limited to 'xlators/cluster/nsr-recon/src')
-rw-r--r-- | xlators/cluster/nsr-recon/src/recon_driver.c | 533 | ||||
-rw-r--r-- | xlators/cluster/nsr-recon/src/recon_driver.h | 8 | ||||
-rw-r--r-- | xlators/cluster/nsr-recon/src/recon_xlator.c | 138 | ||||
-rw-r--r-- | xlators/cluster/nsr-recon/src/recon_xlator.h | 4 |
4 files changed, 428 insertions, 255 deletions
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.c b/xlators/cluster/nsr-recon/src/recon_driver.c index 7f92b6578..49ee465c5 100644 --- a/xlators/cluster/nsr-recon/src/recon_driver.c +++ b/xlators/cluster/nsr-recon/src/recon_driver.c @@ -254,7 +254,7 @@ out: * Output Arguments: * buf - where the values are written one after the other (NULL seperated) */ -static void +static int32_t get_xattr(struct glfs_fd *fd, char *keys, char *buf, @@ -277,7 +277,7 @@ get_xattr(struct glfs_fd *fd, // TBD - handle error if (r == -1) - return; + return -1; // increment the key to next value keys += len; @@ -285,7 +285,7 @@ get_xattr(struct glfs_fd *fd, // increment buf to hold the next key buf += strlen(buf) + 1; } - return; + return 0; } /* @@ -296,7 +296,7 @@ get_xattr(struct glfs_fd *fd, * keys - bunch of NULL seperated key names * num - number of keys */ -static void delete_xattr(struct glfs_fd *fd, +static int32_t delete_xattr(struct glfs_fd *fd, dict_t *dict_t, char *keys, uint32_t num) @@ -304,10 +304,11 @@ static void delete_xattr(struct glfs_fd *fd, while(num--) { // get the value and copy the value // TBD - handle failure cases when calling glfs_fremovexattr_with_xdata() - glfs_fremovexattr_with_xdata(fd, keys, dict_t); + if (glfs_fremovexattr_with_xdata(fd, keys, dict_t) == -1) + return -1; keys += strlen(keys) +1; } - return; + return 0; } /* @@ -320,7 +321,7 @@ static void delete_xattr(struct glfs_fd *fd, * Each of the key-value is seperated by NULL in turn. * num - Number of such key value pairs. 
*/ -static void +static int32_t fill_xattr(struct glfs_fd *fd, dict_t *dict, char *buf, @@ -336,10 +337,10 @@ fill_xattr(struct glfs_fd *fd, // TBD - handle failure cases when calling glfs_fsetxattr_with_xdata() r = glfs_fsetxattr_with_xdata(fd, k, val, strlen(val), 0, dict); if (r == -1) - return; + return -1; k = val + strlen(val) + 1; } - return; + return 0; } /* @@ -408,7 +409,7 @@ nsr_recon_get_file(char *vol, nsr_replica_worker_t *worker) * ctx - The per worker based context * control - set to true if this worker is for the control plane */ -static int +static int32_t nsr_recon_start_work(nsr_per_node_worker_t *ctx, gf_boolean_t control) { @@ -497,6 +498,7 @@ nsr_recon_start_work(nsr_per_node_worker_t *ctx, if (aux_fd == NULL) { nsr_worker_log(this->name, GF_LOG_ERROR, "cannot open aux log file for thread %s\n",ctx->id); + return -1; } else { nsr_worker_log(this->name, GF_LOG_ERROR, "---opened aux log file for thread %s\n",ctx->id); @@ -517,7 +519,7 @@ nsr_recon_start_work(nsr_per_node_worker_t *ctx, * ctx - The per worker based context * control - set to true if this worker is for the control plane */ -static int +static int32_t nsr_recon_end_work(nsr_per_node_worker_t *ctx, gf_boolean_t control) { @@ -648,13 +650,7 @@ control_worker_func_0(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_INFO, "this message should not be sent \n"); - break; - } - case NSR_WORK_ID_END_RECONCILIATION: - { - nsr_worker_log(this->name, GF_LOG_INFO, - "sending reconciliation end message to node %d\n", index); - nsr_recon_return_back(priv, dr->txn_id); + ctx->result = -1; break; } case NSR_WORK_ID_GET_RECONCILATION_WINDOW: @@ -676,16 +672,29 @@ control_worker_func_0(nsr_per_node_worker_t *ctx, rd = GF_CALLOC(num, sizeof(nsr_recon_record_details_t), gf_mt_recon_private_t); + if (rd == NULL) { + ctx->result = -1; + return; + } + recon_info->records = GF_CALLOC(num, sizeof(nsr_reconciliation_record_t), gf_mt_recon_private_t); + if (recon_info->records == NULL) { + 
ctx->result = -1; + return; + } // TBD - handle errors - nsr_recon_libchangelog_get_records(this, priv->changelog_base_path, + if (nsr_recon_libchangelog_get_records(this, priv->changelog_base_path, recon_info->last_term, recon_info->first_index, recon_info->last_index, - rd); + rd) == _gf_false) { + ctx->result = -1; + return; + } + // The above function writes into rd from 0 to (num -1) // We need to take care of this whenever we deal with records for (i=0; i < num; i++) { @@ -778,7 +787,10 @@ control_worker_func(nsr_per_node_worker_t *ctx, "calling nsr_recon_start_work\n"); // TBD - handle error in case nsr_recon_start_work gives error - nsr_recon_start_work(ctx, _gf_true); + if (nsr_recon_start_work(ctx, _gf_true) != 0) { + ctx->result = -1; + return; + } nsr_worker_log(this->name, GF_LOG_INFO, "finished nsr_recon_start_work\n"); @@ -790,7 +802,10 @@ control_worker_func(nsr_per_node_worker_t *ctx, "calling nsr_recon_end_work\n"); // TBD - handle error in case nsr_recon_end_work gives error - nsr_recon_end_work(ctx, _gf_true); + if (nsr_recon_end_work(ctx, _gf_true) != 0) { + ctx->result = -1; + return; + } nsr_worker_log(this->name, GF_LOG_INFO, "finished nsr_recon_end_work\n"); @@ -807,9 +822,18 @@ control_worker_func(nsr_per_node_worker_t *ctx, // first write the current term term number // TBD - error handling for all the glfs APIs - glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET); - glfs_write(ctx->aux_fd, &term, sizeof(term), 0); - glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0); + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET) == -1) { + ctx->result = -1; + return; + } + if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) { + ctx->result = -1; + return; + } + if (glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0) == -1) { + ctx->result = -1; + return; + } ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl recon_info->last_term = lt.last_term; recon_info->commited_ops = lt.commited_ops; @@ -834,9 +858,18 @@ 
control_worker_func(nsr_per_node_worker_t *ctx, // first write the term number // TBD - error handling for all the glfs APIs - glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET); - glfs_write(ctx->aux_fd, &term, sizeof(term), 0); - glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0); + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET) == -1) { + ctx->result = -1; + return; + } + if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) { + ctx->result = -1; + return; + } + if (glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0) == -1) { + ctx->result = -1; + return; + } ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl recon_info->last_term = lt.last_term; recon_info->commited_ops = lt.commited_ops; @@ -863,9 +896,12 @@ control_worker_func(nsr_per_node_worker_t *ctx, "trying to make this index %d as reconciliator for term %d\n", index, term); // TBD - error handling for all the glfs APIs - glfs_lseek(ctx->aux_fd, + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_1, - SEEK_SET); + SEEK_SET) == -1) { + ctx->result = -1; + return; + } // We have all the info for all other nodes. // Fill all that info when sending data to that process. @@ -888,7 +924,15 @@ control_worker_func(nsr_per_node_worker_t *ctx, rr.num = num; rr.role = reconciliator; ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl - glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0); + if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) { + ctx->result = -1; + // Put the errno only for this case since we + // are bothered about retrying only for this case. + // For rest of the cases we will just return EIO + // in errno. 
+ ctx->op_errno = errno; + return; + } nsr_worker_log(this->name, GF_LOG_INFO, "sent reconciliator info for term %d with node count as %d\n", term, num); @@ -905,14 +949,18 @@ control_worker_func(nsr_per_node_worker_t *ctx, "trying to make this index %d as resolutor with reconciliator as %d\n",index, rec); // TBD - error handling for all the glfs APIs - glfs_lseek(ctx->aux_fd, + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_1, - SEEK_SET); + SEEK_SET) == -1) { + ctx->result = -1; + return; + } + rr.num = 2; // Fill in info[0] as info for the node for which we are seeking resolution. // Fill in info[1] as info of the reconciliator node. - // The function nsr_recon_driver_set_role() that will be called when + // The function nsr_recon_driver_get_role() that will be called when // this message reaches the node will look at index 1 for term information // related to the reconciliator. for (i=0; i < 2; i++) { @@ -943,33 +991,21 @@ control_worker_func(nsr_per_node_worker_t *ctx, } rr.role = resolutor; ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl - glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0); + if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) { + ctx->result = -1; + // Put the errno only for this case since we + // are bothered about retrying only for this case. + // For rest of the cases we will just return EIO + // in errno. 
+ ctx->op_errno = errno; + return; + } nsr_worker_log(this->name, GF_LOG_INFO, "sent message to this node %d resolutor with reconciliator as %d\n", index, rec); break; } - case NSR_WORK_ID_END_RECONCILIATION: - { - char c[4]; - uint32_t old = htonl(dr->txn_id); - - nsr_worker_log(this->name, GF_LOG_INFO, - "sending reconciliation end message to node %d\n", index); - - memcpy(c, &old, sizeof(uint32_t)); - // TBD - error handling for all the glfs APIs - glfs_lseek(ctx->aux_fd, - nsr_recon_xlator_sector_0, - SEEK_SET); - glfs_write(ctx->aux_fd, c, sizeof(c), 0); - - nsr_worker_log(this->name, GF_LOG_INFO, - "finished sending reconciliation end message to node %d\n", index); - - break; - } case NSR_WORK_ID_GET_RECONCILATION_WINDOW: { nsr_recon_log_info_t li; @@ -985,24 +1021,42 @@ control_worker_func(nsr_per_node_worker_t *ctx, // TBD - error handling for all the glfs APIs - glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET); + if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET) == -1) { + ctx->result = -1; + return; + } // write to node what term & indices we are interested li.term = recon_info->last_term; li.first_index = recon_info->first_index; li.last_index = recon_info->last_index; ENDIAN_CONVERSION_LI(li, _gf_false); //htonl - glfs_write(ctx->aux_fd, &li, sizeof(li), 0); + if (glfs_write(ctx->aux_fd, &li, sizeof(li), 0) == -1) { + ctx->result = -1; + return; + } // then read rd = GF_CALLOC(num, sizeof(nsr_recon_record_details_t), gf_mt_recon_private_t); + if (rd == NULL) { + ctx->result = -1; + return; + } recon_info->records = GF_CALLOC(num, sizeof(nsr_reconciliation_record_t), gf_mt_recon_private_t); + if (recon_info->records == NULL) { + ctx->result = -1; + return; + } + + if (glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0) == -1) { + ctx->result = -1; + return; + } - glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0); for (i=0; i < num; i++) { ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl 
memcpy(&(recon_info->records[i].rec), @@ -1132,7 +1186,7 @@ compute_reconciliation_work(nsr_recon_driver_ctx_t *ctx) return; } -static void +static int32_t nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx, uint32_t i, gf_boolean_t in_use); @@ -1145,14 +1199,14 @@ nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx, gf_boolean_t nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, nsr_recon_role_t *rr, - uint32_t txn_id) + uint32_t term) { nsr_role_work_t *rw; nsr_driver_log(this->name, GF_LOG_INFO, "set role called \n"); rw = GF_CALLOC(1, sizeof (nsr_role_work_t), 0); memcpy(&rw->role, rr, sizeof(nsr_recon_role_t)); - rw->txn_id = txn_id; + rw->term = term; INIT_LIST_HEAD(&(rw->list)); pthread_mutex_lock(&(ctx->mutex)); list_add_tail(&rw->list, &ctx->role_head.list); @@ -1176,15 +1230,11 @@ nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, * If resolution to be done, then rr.info[0] will have this node's info * which the leader would have got earlier. rr[1].info will have the * info regarding the reconciliator. - * txn_id - All role changes(except when leader becomes reconciliator or resolutor) - * would be initiated as write to the recon xlator which would have got a frame from - * either the brick process(leader change) or other reconciliation process. - * The write function would return immediately after storing the frame which - * needs to be returned back after the actual reconciliation is done. - * For that we store the frame against this id which acts as a key. 
+ * term - leader's term that is causing this role */ nsr_recon_driver_state_t -nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, +nsr_recon_driver_get_role(int32_t *status, + nsr_recon_driver_ctx_t *ctx, nsr_role_work_t *rw) { uint8_t i=0, j=0; @@ -1192,17 +1242,24 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, // First make all the threads uninitialise for (i = 0; i < ctx->replica_group_size; i++) { - nsr_recon_in_use(ctx, i, _gf_false); + if (nsr_recon_in_use(ctx, i, _gf_false) == -1) { + *status = -1; + return 0; + } } if ((rr->role == leader) || (rr->role == joiner)) { // First set info this node - nsr_recon_in_use(ctx, 0, _gf_true); + if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) { + *status = -1; + return 0; + } ctx->workers[0].recon_info = GF_CALLOC (1, sizeof (nsr_reconciliator_info_t), gf_mt_recon_private_t); if (!ctx->workers[0].recon_info) { - return _gf_false; + *status = -1; + return 0; } ctx->current_term = rr->current_term; @@ -1213,11 +1270,14 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, if (!strcmp(ctx->workers[i].name, rr->info[j].name)) { //if (strstr(ctx->workers[i].name, rr->info[j].name)) { nsr_driver_log(this->name, GF_LOG_INFO, - "nsr_recon_driver_set_role: this as %s. found other server %s\n", + "nsr_recon_driver_get_role: this as %s. found other server %s\n", (rr->role == leader) ? "leader" : "joiner", ctx->workers[i].name); - nsr_recon_in_use(ctx, i, _gf_true); + if (nsr_recon_in_use(ctx, i, _gf_true) == -1) { + *status = -1; + return 0; + } // Allocate this here. 
This will get later filled when // the leader tries to get last term information from all // the nodes @@ -1225,7 +1285,8 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, sizeof (nsr_reconciliator_info_t), gf_mt_recon_private_t); if (!ctx->workers[i].recon_info) { - return _gf_false; + *status = -1; + return 0; } break; } @@ -1245,13 +1306,14 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, if (!strcmp(rr->info[i].name, ctx->workers[j].name)) { //if (strstr(ctx->workers[j].name, rr->info[i].name)) { nsr_driver_log(this->name, GF_LOG_INFO, - "nsr_recon_driver_set_role: this as reconciliator. found other server %s\n", + "nsr_recon_driver_get_role: this as reconciliator. found other server %s\n", ctx->workers[j].name); ctx->workers[j].recon_info = GF_CALLOC (1, sizeof (nsr_reconciliator_info_t), gf_mt_recon_private_t); if (!ctx->workers[j].recon_info) { - return _gf_false; + *status = -1; + return 0; } ctx->workers[j].recon_info->last_term = rr->info[i].last_term; @@ -1261,7 +1323,10 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, rr->info[i].last_index; ctx->workers[j].recon_info->first_index = rr->info[i].first_index; - nsr_recon_in_use(ctx, j, _gf_true); + if (nsr_recon_in_use(ctx, j, _gf_true) == -1) { + *status = -1; + return 0; + } break; } } @@ -1272,13 +1337,14 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, //if (strstr(ctx->workers[j].name, rr->info[1].name)) { if (!strcmp(rr->info[1].name, ctx->workers[j].name)) { nsr_driver_log(this->name, GF_LOG_INFO, - "nsr_recon_driver_set_role: this as resolutor. found other server %s as reconciliator\n", - ctx->workers[1].name); + "nsr_recon_driver_get_role: this as resolutor. 
found other server %s as reconciliator\n", + ctx->workers[j].name); ctx->workers[j].recon_info = GF_CALLOC (1, sizeof (nsr_reconciliator_info_t), gf_mt_recon_private_t); if (!ctx->workers[j].recon_info) { - return _gf_false; + *status = -1; + return 0; } ctx->workers[j].recon_info->last_term = rr->info[1].last_term; @@ -1289,7 +1355,10 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, ctx->workers[j].recon_info->first_index = rr->info[1].first_index; ctx->reconciliator_index = j; - nsr_recon_in_use(ctx, j, _gf_true); + if (nsr_recon_in_use(ctx, j, _gf_true) == -1) { + *status = -1; + return 0; + } GF_ASSERT(ctx->reconciliator_index != 0); break; } @@ -1298,18 +1367,23 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx, sizeof (nsr_reconciliator_info_t), gf_mt_recon_private_t); if (!ctx->workers[0].recon_info) { - return _gf_false; + *status = -1; + return 0; } // info[0] has all info for this node ctx->workers[0].recon_info->last_term = rr->info[0].last_term; ctx->workers[0].recon_info->commited_ops = rr->info[0].commited_ops; ctx->workers[0].recon_info->last_index = rr->info[0].last_index; ctx->workers[0].recon_info->first_index = rr->info[0].first_index; - nsr_recon_in_use(ctx, 0, _gf_true); + if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) { + *status = -1; + return 0; + } } - ctx->txn_id = rw->txn_id; + ctx->term = rw->term; + *status = 0; return rr->role; } @@ -1401,7 +1475,7 @@ create_obj(nsr_per_node_worker_t *ctx, char *gfid_str) * and the changelog translator consumes term and index. */ -static void +static gf_boolean_t apply_record(nsr_per_node_worker_t *ctx, nsr_reconciliation_record_t *ri, dict_t * dict) @@ -1420,10 +1494,11 @@ apply_record(nsr_per_node_worker_t *ctx, // TBD - get a way to just stuff the log entry without writing the data so that // changelogs remain identical. 
if (ri->work.data == NULL) { - return; + return _gf_true; } - if ((obj = create_obj(ctx,ri->rec.gfid)) == NULL) return; + if ((obj = create_obj(ctx,ri->rec.gfid)) == NULL) + return _gf_false;; fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict); if (fd == NULL) { @@ -1431,23 +1506,28 @@ apply_record(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_ERROR, "open for file %s failed\n", ri->rec.gfid); - return; + return _gf_false; } if (glfs_lseek_with_xdata(fd, ri->rec.offset, SEEK_SET, dict) != ri->rec.offset) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "lseek for file %s failed at offset %d\n", ri->rec.gfid, ri->rec.offset); - return; + return _gf_false; } if (glfs_write_with_xdata(fd, ri->work.data, ri->rec.len, 0, dict) != ri->rec.len) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "write for file %s failed for bytes %d\n", ri->rec.gfid, ri->rec.len); - return; + return _gf_false; } - glfs_close_with_xdata(fd, dict); + if (glfs_close_with_xdata(fd, dict) == -1) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "close failed\n"); + return _gf_false; + } nsr_worker_log(this->name, GF_LOG_INFO, "Finished DOing write for gfid %s @offset %d for len %d\n", @@ -1459,7 +1539,7 @@ apply_record(nsr_per_node_worker_t *ctx, "DOing truncate for file %s @offset %d \n", ri->rec.gfid, ri->rec.offset); - if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false; fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict); if (fd == NULL) { @@ -1467,16 +1547,21 @@ apply_record(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_ERROR, "open for file %s failed\n", ri->rec.gfid); - return; + return _gf_false; } if (glfs_ftruncate_with_xdata(fd, ri->rec.offset, dict) == -1) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "trunctae for file %s failed @offset %d\n", ri->rec.gfid,ri->rec.offset ); - return; + return _gf_false; } - 
glfs_close_with_xdata(fd, dict); + if (glfs_close_with_xdata(fd, dict) == -1) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "close failed\n"); + return _gf_false; + } nsr_worker_log(this->name, GF_LOG_INFO, "Finished DOing truncate for gfid %s @offset %d \n", @@ -1499,10 +1584,10 @@ apply_record(nsr_per_node_worker_t *ctx, // TBD - get a way to just stuff the log entry without writing the data so that // changelogs remain identical. if (ri->work.data == NULL) { - return; + return _gf_true; } - if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false; if (obj->inode->ia_type == IA_IFDIR) fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict); @@ -1513,7 +1598,7 @@ apply_record(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_ERROR, "open for file %s failed\n", ri->rec.gfid); - return; + return _gf_false; } if(get_xattr_total_size(fd, &t_b, &k_s, &v_s, &num, dict) == -1) { @@ -1521,10 +1606,15 @@ apply_record(nsr_per_node_worker_t *ctx, GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "list of xattr of %s failed\n", ri->rec.gfid); - return; + return _gf_false; } - delete_xattr(fd, dict, t_b, num); + if (delete_xattr(fd, dict, t_b, num) == -1) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "deleting xattrs failed\n"); + return _gf_false; + } // Set one special dict flag to indicate the opcode so that // the opcode gets set to this @@ -1532,12 +1622,22 @@ apply_record(nsr_per_node_worker_t *ctx, GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "setting opcode to %d failed\n",ri->rec.op); - return; + return _gf_false; } - fill_xattr(fd, dict, ri->work.data, ri->work.num); + if (fill_xattr(fd, dict, ri->work.data, ri->work.num) == -1) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "filling xattrs failed\n"); + return _gf_false; + } - glfs_close_with_xdata(fd, dict); + if (glfs_close_with_xdata(fd, dict) == -1) { + GF_ASSERT(0); + 
nsr_worker_log(this->name, GF_LOG_ERROR, + "close failed\n"); + return _gf_false; + } nsr_worker_log(this->name, GF_LOG_INFO, "Finsihed Doing set extended attr for %s \n", @@ -1553,7 +1653,7 @@ apply_record(nsr_per_node_worker_t *ctx, // TBD - add mode and flags later uuid_parse(ri->rec.gfid, gfid); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; nsr_worker_log (this->name, GF_LOG_INFO, "creating with mode 0%o", ri->rec.mode); @@ -1562,7 +1662,7 @@ apply_record(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_ERROR, "Failure for Doing create for file %s\n", ri->rec.entry); - return; + return _gf_false; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1579,14 +1679,14 @@ apply_record(nsr_per_node_worker_t *ctx, // TBD - add mode and flags later uuid_parse(ri->rec.gfid, gfid); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; if (glfs_h_mknod_with_xdata(ctx->fs, obj, ri->rec.entry, O_RDWR, 0777, NULL, gfid, dict) == NULL) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "Failure for Doing mknod for file %s\n", ri->rec.entry); - return; + return _gf_false; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1603,14 +1703,14 @@ apply_record(nsr_per_node_worker_t *ctx, // TBD - add mode and flags later uuid_parse(ri->rec.gfid, gfid); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; if (glfs_h_mkdir_with_xdata(ctx->fs, obj, ri->rec.entry, 0777, NULL, gfid, dict) != 0) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "Failure for Doing mkdir for file %s\n", ri->rec.entry); - return; + return _gf_false; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1623,13 +1723,13 @@ apply_record(nsr_per_node_worker_t *ctx, "Doing rmdir/ublink for dir %s \n", ri->rec.entry); - if ((obj = create_obj(ctx, 
ri->rec.pargfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; if (glfs_h_unlink_with_xdata(ctx->fs, obj, ri->rec.entry, dict) != 0) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "Failure for Doing rmdir/unlink for file %s\n", ri->rec.entry); - return; + return _gf_false; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1644,7 +1744,7 @@ apply_record(nsr_per_node_worker_t *ctx, "Doing symlink for file %s to file %s \n", ri->rec.entry, ri->rec.link_path); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; uuid_parse(ri->rec.gfid, gfid); if (glfs_h_symlink_with_xdata(ctx->fs, obj, ri->rec.entry, ri->rec.link_path, NULL, gfid, dict) == NULL) { @@ -1652,7 +1752,7 @@ apply_record(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_ERROR, "Failed to Doing symlink for file %s to file %s \n", ri->rec.entry, ri->rec.link_path); - return; + return _gf_false; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1667,15 +1767,15 @@ apply_record(nsr_per_node_worker_t *ctx, "Doing hard link for file %s to file %s \n", ri->rec.entry, ri->rec.gfid); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; - if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; + if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false; if (glfs_h_link_with_xdata(ctx->fs, to_obj, obj, ri->rec.entry, dict) == -1) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "Failed to Doing hard link for file %s to file %s \n", ri->rec.entry, ri->rec.gfid); - return; + return _gf_false; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1690,15 +1790,15 @@ apply_record(nsr_per_node_worker_t *ctx, "Doing rename for file %s to file %s \n", ri->rec.entry, ri->rec.newloc); - if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return; - if ((to_obj = create_obj(ctx, 
ri->rec.gfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false; + if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false; if (glfs_h_rename_with_xdata(ctx->fs, obj, ri->rec.entry, to_obj, ri->rec.newloc, dict) == -1) { GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "Failed to Doing rename for file %s to file %s \n", ri->rec.entry, ri->rec.newloc); - return; + return _gf_false; } nsr_worker_log(this->name, GF_LOG_INFO, @@ -1719,7 +1819,7 @@ apply_record(nsr_per_node_worker_t *ctx, "Doing attr for file %s \n", ri->rec.gfid); - if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return; + if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false; if (obj->inode->ia_type == IA_IFDIR) fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict); @@ -1730,7 +1830,7 @@ apply_record(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_ERROR, "open for file %s failed\n", ri->rec.gfid); - return; + return _gf_false; } iatt.ia_prot = ia_prot_from_st_mode(777); @@ -1743,7 +1843,7 @@ apply_record(nsr_per_node_worker_t *ctx, GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "setting opcode to %d failed\n",ri->rec.op); - return; + return _gf_false; } ret = glfs_fsetattr_with_xdata(fd, &iatt, valid, dict); @@ -1752,17 +1852,22 @@ apply_record(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_INFO, "failed Doing attr for file %s \n", ri->rec.gfid); - return; + return _gf_false; } - glfs_close_with_xdata(fd, dict); + if (glfs_close_with_xdata(fd, dict) == -1) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "close failed\n"); + return _gf_false; + } nsr_worker_log(this->name, GF_LOG_INFO, "Doing attr for file %s \n", ri->rec.gfid); } - return; + return _gf_true; } //return back opcodes that requires reading from source @@ -1816,7 +1921,10 @@ data_worker_func(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_INFO, "started data ini \n"); - nsr_recon_start_work(ctx, 
_gf_false); + if (nsr_recon_start_work(ctx, _gf_false) != 0) { + ctx->result = -1; + return; + } nsr_worker_log(this->name, GF_LOG_INFO, "finished data ini \n"); @@ -1825,7 +1933,10 @@ data_worker_func(nsr_per_node_worker_t *ctx, nsr_worker_log(this->name, GF_LOG_INFO, "started data fini \n"); - nsr_recon_end_work(ctx, _gf_false); + if (nsr_recon_end_work(ctx, _gf_false) != 0) { + ctx->result = -1; + return; + } nsr_worker_log(this->name, GF_LOG_INFO, "finished data fini \n"); @@ -1838,18 +1949,21 @@ data_worker_func(nsr_per_node_worker_t *ctx, dict = dict_new (); if (!dict) { + ctx->result = -1; GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "failed allocating for dictionary\n"); break; } if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { + ctx->result = -1; GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "error setting term in dict\n"); break; } if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { + ctx->result = -1; GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "error setting term in dict\n"); @@ -1978,7 +2092,12 @@ data_worker_func(nsr_per_node_worker_t *ctx, } ri->work.data = GF_CALLOC ((k_s + v_s) , sizeof(char), gf_mt_recon_private_t); - get_xattr(fd, t_b, ri->work.data, v_s, num, dict); + if (get_xattr(fd, t_b, ri->work.data, v_s, num, dict) == -1) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "get xattr of gfid %s failed\n", rd->gfid); + break; + } ri->work.num = num; nsr_worker_log(this->name, GF_LOG_INFO, "finished getattr for gfid %s \n", @@ -2014,24 +2133,30 @@ data_worker_func(nsr_per_node_worker_t *ctx, wip, rd->gfid); dict = dict_new (); if (!dict) { + ctx->result = -1; GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "failed allocating for dictionary\n"); break; } if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { + ctx->result = -1; GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "error setting term in dict\n"); break; } if 
(dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { + ctx->result = -1; GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "error setting term in dict\n"); break; } - apply_record(ctx, ri, dict); + if (apply_record(ctx, ri, dict) == _gf_false) { + nsr_worker_log(this->name, GF_LOG_ERROR, + "apply_record fails\n"); + } nsr_worker_log(this->name, GF_LOG_INFO, "finished recon commit for gfid %s \n", @@ -2041,18 +2166,21 @@ data_worker_func(nsr_per_node_worker_t *ctx, case NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH: dict = dict_new (); if (!dict) { + ctx->result = -1; GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "failed allocating for dictionary\n"); break; } if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { + ctx->result = -1; GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "error setting term in dict\n"); break; } if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { + ctx->result = -1; GF_ASSERT(0); nsr_worker_log(this->name, GF_LOG_ERROR, "error setting term in dict\n"); @@ -2198,7 +2326,9 @@ create_worker_threads(nsr_recon_private_t *priv, * misc - used to overload such as index. */ static void -send_and_wait(int32_t bm, +send_and_wait(int32_t *result, + int32_t *op_errno, + int32_t bm, uint32_t num, nsr_recon_driver_ctx_t *ctx, nsr_recon_work_req_id_t id, @@ -2208,6 +2338,18 @@ send_and_wait(int32_t bm, uint32_t i = 0; nsr_recon_work_t *work; +#define CONTROL_WORKER(i) ctx->workers[i].control_worker +#define DATA_WORKER(i) ctx->workers[i].data_worker +#define WORKER(i) ((q == NSR_RECON_QUEUE_TO_CONTROL) ? 
(CONTROL_WORKER(i)) : (DATA_WORKER(i))) + + *result = *op_errno = 0; + + for (i=0; i < num; i++) { + if ((bm & (1 << i)) && ctx->workers[i].in_use) { + WORKER(i)->result = 0; + WORKER(i)->op_errno = 0; + } + } if (ctx->mode == NSR_SEQ) { for (i=0; i < num; i++) { if ((bm & (1 << i)) && ctx->workers[i].in_use) { @@ -2222,8 +2364,7 @@ send_and_wait(int32_t bm, } } } - nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned\n"); - return; + goto out; } for (i=0; i < num; i++) { @@ -2238,41 +2379,40 @@ send_and_wait(int32_t bm, while (ctx->outstanding) { pthread_yield(); } - nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned\n"); - return; -} - -#if 0 -static void -send_and_do_not_wait(int32_t bm, - uint32_t num, - nsr_recon_driver_ctx_t *ctx, - nsr_recon_work_req_id_t id, - nsr_recon_queue_type_t q, - int32_t misc) -{ - uint32_t i = 0; - - for (i=0; i < num; i++) { +out: + for (i=0; i < num; i++) { if ((bm & (1 << i)) && ctx->workers[i].in_use) { - nsr_recon_work_t *work; - recon_make_work(&work, id, misc); - recon_queue_to_worker(ctx, work, i, q); - } - } + if (WORKER(i)->result == -1) { + *result = -1; + } + } + } + if (*result == -1) { + for (i=0; i < num; i++) { + if ((bm & (1 << i)) && ctx->workers[i].in_use) { + if (WORKER(i)->op_errno == EAGAIN) { + *op_errno = EAGAIN; + break; + } else { + *op_errno = EIO; + } + } + } + } + nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned with result: %d errno:%d\n", *result, *op_errno); return; } -#endif // send INI or FINI -static void +static int32_t nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx, uint32_t i, gf_boolean_t in_use) { uint32_t bm = 1 << i; gf_boolean_t send = _gf_false; + int32_t status =0, op_errno = 0; if (in_use == _gf_false) { if (ctx->workers[i].in_use == _gf_true) @@ -2284,25 +2424,28 @@ nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx, send = _gf_true; } } -#if 1 if (send == _gf_true) { if (in_use == _gf_true) { 
nsr_driver_log(this->name, GF_LOG_INFO, "sending INI to index %d\n",i); } else { nsr_driver_log(this->name, GF_LOG_INFO, "sending FINI to index %d\n",i); } - send_and_wait(bm, ctx->replica_group_size, ctx, + send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx, (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI, NSR_RECON_QUEUE_TO_CONTROL, -1); - send_and_wait(bm, ctx->replica_group_size, ctx, + if (status == -1) + return -1; + send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx, (in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI, NSR_RECON_QUEUE_TO_DATA, -1); + if (status == -1) + return -1; if (in_use == _gf_false) { //GF_FREE(ctx->workers[i].recon_info->records); GF_FREE(ctx->workers[i].recon_info); } } -#endif + return 0; } // main recon driver thread @@ -2317,6 +2460,8 @@ nsr_reconciliation_driver(void *arg) int32_t bm; xlator_t *this = priv->this; char *con_name, *data_name; + int32_t status = 0; + int32_t op_errno = 0; driver_ctx = &priv->driver_thread_context; (*driver_ctx) = GF_CALLOC (1, @@ -2414,7 +2559,12 @@ nsr_reconciliation_driver(void *arg) list_for_each_entry(rr, &(ctx->role_head.list), list) { nsr_recon_driver_state_t state; - state = nsr_recon_driver_get_role(ctx, rr); + state = nsr_recon_driver_get_role(&status, ctx, rr); + + if (status == -1) { + op_errno = EIO; + goto out; + } if (state == leader) { @@ -2423,11 +2573,13 @@ nsr_reconciliation_driver(void *arg) nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n"); // Get last term info from all members for this group - send_and_wait(-1, + send_and_wait(&status, &op_errno, -1, replica_group_size, ctx, NSR_WORK_ID_GET_LAST_TERM_INFO, NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term); + if (status == -1) + goto out; // compare all the info received and choose the reconciliator @@ -2469,11 +2621,13 @@ nsr_reconciliation_driver(void *arg) if (chosen != 0) { nsr_driver_log (this->name, GF_LOG_INFO, "sending 
reconciliation work to %d\n", chosen); bm = 1 << ctx->reconciliator_index; - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_RECONCILIATOR_DO_WORK, NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work to %d\n", chosen); } else { nsr_driver_log (this->name, GF_LOG_INFO, "local node is reconciliator. before set jmp\n"); @@ -2517,11 +2671,13 @@ nsr_reconciliation_driver(void *arg) nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this node and reconciliator\n"); bm = ~((1 << ctx->reconciliator_index) || 1); - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_RESOLUTION_DO_WORK, NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work as leader \n"); @@ -2564,11 +2720,13 @@ i_am_reconciliator: // We have set the bm in the above for loop where // we go thru all nodes including this node that // have seen the last term. 
- send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_GET_RECONCILATION_WINDOW, NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "finished getting reconciliation window for term %d from %dto %d \n", ctx->workers[0].recon_info->last_term, @@ -2604,12 +2762,14 @@ i_am_reconciliator: bm = (1 << record->work.source); nsr_driver_log (this->name, GF_LOG_INFO, "reading data from source %d\n",record->work.source); - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_SINGLE_RECONCILIATION_READ, NSR_RECON_QUEUE_TO_DATA, i); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "got data from source %d\n",record->work.source); } @@ -2618,12 +2778,14 @@ i_am_reconciliator: "fixing local data as part of reconciliation\n"); bm = 1; - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT, NSR_RECON_QUEUE_TO_DATA, i); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "finished fixing local data as part of reconciliation\n"); @@ -2634,12 +2796,14 @@ i_am_reconciliator: nsr_driver_log (this->name, GF_LOG_INFO, "fixing this record as a fill\n"); bm = 1; - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH, NSR_RECON_QUEUE_TO_DATA, i); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "finished fixing this record as a fill\n"); } @@ -2691,11 +2855,13 @@ i_am_resolutor: nsr_driver_log (this->name, GF_LOG_INFO, "getting info from reconciliator for term %d \n", my_info->last_term); bm = (1 << recon_index); - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_GET_GIVEN_TERM_INFO, NSR_RECON_QUEUE_TO_CONTROL, his_info->last_term); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, 
"finished getting info from reconciliator for term %d \n", my_info->last_term); @@ -2739,11 +2905,13 @@ i_am_resolutor: "getting reconciliation window for term %d from %d to %d \n", my_info->last_term, my_info->first_index, my_info->last_index); - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_GET_RECONCILATION_WINDOW, NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "finished getting reconciliation window for term %d from %d to %d \n", my_info->last_term, @@ -2771,12 +2939,14 @@ i_am_resolutor: nsr_driver_log (this->name, GF_LOG_INFO, "reading data from source %d\n",recon_index); bm = (1 << recon_index); - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_SINGLE_RECONCILIATION_READ, NSR_RECON_QUEUE_TO_DATA, i); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "finished reading data from source %d\n",recon_index); } @@ -2785,12 +2955,14 @@ i_am_resolutor: "fixing local data as part of resolutor\n"); bm = 1; - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT, NSR_RECON_QUEUE_TO_DATA, i); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "finished fixing local data as part of resolutor\n"); @@ -2816,21 +2988,25 @@ i_am_resolutor: nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n"); // Get last term info from all members for this group // which will be the leader(this node) and the node that wants to join. 
- send_and_wait(-1, + send_and_wait(&status, &op_errno, -1, replica_group_size, ctx, NSR_WORK_ID_GET_LAST_TERM_INFO, NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term); + if (status == -1) + goto out; // send message to other node that just joined to sync up with this node which is also the leader nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this\n"); bm = ~(1); - send_and_wait(bm, + send_and_wait(&status, &op_errno, bm, replica_group_size, ctx, NSR_WORK_ID_RESOLUTION_DO_WORK, NSR_RECON_QUEUE_TO_CONTROL, -1); + if (status == -1) + goto out; nsr_driver_log (this->name, GF_LOG_INFO, "finished recon work as joiner \n"); @@ -2843,16 +3019,7 @@ i_am_resolutor: out: nsr_driver_log (this->name, GF_LOG_INFO, "sending end of reconciliation message \n"); - nsr_recon_return_back(priv, ctx->txn_id); -#if 0 - // send message that job is done by writing to local recon translator - bm = 1; - send_and_wait(bm, - replica_group_size, - ctx, - NSR_WORK_ID_END_RECONCILIATION, - NSR_RECON_QUEUE_TO_CONTROL, -1); -#endif + nsr_recon_return_back(priv, ctx->term, status, op_errno); nsr_driver_log (this->name, GF_LOG_INFO, "finished sending end of reconciliation message \n"); } diff --git a/xlators/cluster/nsr-recon/src/recon_driver.h b/xlators/cluster/nsr-recon/src/recon_driver.h index 4030c9d73..8d87e29af 100644 --- a/xlators/cluster/nsr-recon/src/recon_driver.h +++ b/xlators/cluster/nsr-recon/src/recon_driver.h @@ -181,7 +181,7 @@ typedef struct nsr_recon_record_details_s { typedef struct _nsr_role_work_s { nsr_recon_role_t role; - uint32_t txn_id; + uint32_t term; struct list_head list; } nsr_role_work_t; @@ -236,6 +236,8 @@ typedef struct _nsr_per_node_worker_s { #if defined(NSR_DEBUG) FILE *fp; #endif + int32_t result; // result of latest work + int32_t op_errno; // errno } nsr_per_node_worker_t; typedef struct _nsr_replica_worker_s { @@ -256,7 +258,7 @@ typedef struct _nsr_recon_driver_ctxt { nsr_role_work_t role_head; volatile int32_t 
outstanding; uint32_t reconciliator_index; - uint32_t txn_id; + uint32_t term; uint32_t current_term; jmp_buf *env; nsr_mode_t mode; // default set to seq @@ -269,7 +271,7 @@ void * nsr_reconciliation_driver(void *); gf_boolean_t -nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, nsr_recon_role_t *rr, uint32_t txn_id); +nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, nsr_recon_role_t *rr, uint32_t term); #define atomic_inc(ptr) ((void) __sync_fetch_and_add(ptr, 1)) #define atomic_dec(ptr) ((void) __sync_fetch_and_add(ptr, -1)) diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.c b/xlators/cluster/nsr-recon/src/recon_xlator.c index c3c8d4d55..868377bd2 100644 --- a/xlators/cluster/nsr-recon/src/recon_xlator.c +++ b/xlators/cluster/nsr-recon/src/recon_xlator.c @@ -32,13 +32,6 @@ typedef struct _nsr_recon_fd_s { call_frame_t *frame; } nsr_recon_fd_t; - -typedef struct _nsr_txn_id_s { - uint32_t txn_id; - call_frame_t *frame; - struct list_head list; -} nsr_txn_id_t; - #if defined(NSR_DEBUG) void @@ -81,37 +74,34 @@ static int32_t this_fd_ctx_get(fd_t *fd, xlator_t *this, nsr_recon_fd_t **rfd) } } -// Add the frame in q after associating with txn_id +// Add the frame in q after associating with term +// term usage tbd static void put_frame(nsr_recon_private_t *priv, call_frame_t *frame, - uint32_t txn_id) + uint32_t term) { - xlator_t *this = priv->this; - nsr_txn_id_t * tid = GF_CALLOC(1, sizeof(nsr_txn_id_t), gf_mt_recon_private_t); - tid->txn_id = txn_id; - tid->frame = frame; - INIT_LIST_HEAD(&(tid->list)); - list_add_tail(&(tid->list), &(priv->list)); - recon_main_log (this->name, GF_LOG_INFO, "adding framef or txn id %d into queue \n", txn_id); + xlator_t *this = priv->this; + recon_main_log (this->name, GF_LOG_INFO, "adding frame for term %d \n", term); + priv->frame = frame; + return; } -// get the frame from the queue given the txn id +// get the frame from the queue given the term +// term usage tbd static void get_frame(nsr_recon_private_t 
*priv, call_frame_t **frame, - uint32_t txn_id) + uint32_t term) { - nsr_txn_id_t *tid = NULL; - xlator_t *this = priv->this; + if (frame != NULL) + *frame = priv->frame; + priv->frame = NULL; + return; +} - list_for_each_entry(tid, &(priv->list), list) { - if (tid->txn_id == txn_id) { - *frame = tid->frame; - recon_main_log (this->name, GF_LOG_INFO, "got frame for txn id %d into queue \n", txn_id); - return; - } - } - recon_main_log (this->name, GF_LOG_INFO, "got no frame for txn id %d into queue \n", txn_id); - GF_ASSERT(0); +// check if there are outstanding frames +static gf_boolean_t is_frame(nsr_recon_private_t *priv) +{ + return((priv->frame != NULL) ? _gf_true : _gf_false); } #define ENTRY_SIZE 128 @@ -215,19 +205,17 @@ void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t return; } -// Return back the frame stored against the txn_id -void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t txn_id) +// Return back the frame stored against the term +void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term, int32_t status, int32_t op_errno) { call_frame_t *old_frame = NULL; xlator_t *this = priv->this; - int32_t op_ret = 0; - int32_t op_errno = 0; - get_frame(priv, &old_frame, txn_id); + get_frame(priv, &old_frame, term); if (old_frame) { recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev returns old frame \n"); // first return the original write for which this ack was sent - STACK_UNWIND_STRICT (writev, old_frame, op_ret, op_errno, NULL, NULL, NULL); + STACK_UNWIND_STRICT (writev, old_frame, status, op_errno, NULL, NULL, NULL); } else { recon_main_log (this->name, GF_LOG_ERROR, "EIII---nsr_recon_writev cnnot return old frame \n"); } @@ -289,7 +277,7 @@ get_link_using_gfid(nsr_recon_private_t *priv, char *gfid, char *path) // // Really, 90% of this code should just GO AWAY in favor of using // libgfchangelog, enhanced as necessary to support our needs. 
-void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf) +gf_boolean_t nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf) { // do a mmap; seek into the first and read all records till last. // TBD - right now all records are pseudo holes but mark them as fills. @@ -307,7 +295,9 @@ void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, sprintf(path,"%s/%s%d",bp,"TERM.",term); fd = open(path, O_RDONLY); - if (fd != -1) { + if (fd == -1) { + return _gf_false; + } else { char *start = NULL; nsr_recon_record_details_t * rec = (nsr_recon_record_details_t *)buf; @@ -315,7 +305,9 @@ void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, lseek(fd, 128, SEEK_SET); else lseek(fd, first * 128, SEEK_SET); - read(fd, rb, (last - first + 1) * 128); + if (read(fd, rb, (last - first + 1) * 128) == -1) { + return _gf_false; + } start = rb; index = first; do { @@ -532,7 +524,7 @@ finish: recon_main_log (this->name, GF_LOG_INFO, "libchangelog_get_records finsihed inspecting records for term %d \n", term); - return; + return _gf_true; } int32_t @@ -580,20 +572,6 @@ nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called for offset %d \n",(unsigned int)offset ); GF_ASSERT(count == 1); switch (offset) { - // gets called to return back - case nsr_recon_xlator_sector_0: - { - char c[4]; - uint32_t txn_id; - - recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev clled to return back \n"); - memcpy((void *)c, (void *)vector[0].iov_base, 4); - txn_id = ntohl(atoi(c)); - nsr_recon_return_back(priv, txn_id); - STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, - NULL, NULL, NULL); - break; - } // client(brick, leader) writes the role of the node case nsr_recon_xlator_sector_1 : { @@ -614,22 +592,33 @@ nsr_recon_writev (call_frame_t 
*frame, xlator_t *this, fd_t *fd, GF_ASSERT(rr.num <= MAXIMUM_REPLICA_STRENGTH); - // Store the stack frame so that when the actual job gets finished - // we send the response back to the brick. - if (nsr_recon_driver_set_role(priv->driver_thread_context, - &rr, - priv->txn_id) == _gf_false) { + // Check if already a role play is going on. If yes return with EAGAIN. + // Ideally we should check if we have got a higher term number while + // servicing a lower term number; if so abort the older one. + // However the abort infrastructure needs to be sketched properly; TBD. + if (is_frame(priv) == _gf_true) { recon_main_log (this->name, GF_LOG_ERROR, - "nsr_recon_writev set_role - cannot seem to set role \n"); - STACK_UNWIND_STRICT (writev, frame, -1, op_errno, + "nsr_recon_writev set_role - already role play \n"); + STACK_UNWIND_STRICT (writev, frame, -1, EAGAIN, NULL, NULL, NULL); - } else { - uint32_t old = priv->txn_id; - atomic_cmpxchg(&priv->txn_id, old,old+1); - put_frame(priv, frame, old); - recon_main_log (this->name, GF_LOG_INFO, - "nsr_recon_writev set_role - set role succesfully \n"); - } + } else { + + // Store the stack frame so that when the actual job gets finished + // we send the response back to the brick. 
+ put_frame(priv, frame, rr.current_term); + if (nsr_recon_driver_set_role(priv->driver_thread_context, + &rr, + rr.current_term) == _gf_false) { + get_frame(priv, NULL, rr.current_term); + recon_main_log (this->name, GF_LOG_ERROR, + "nsr_recon_writev set_role - cannot seem to set role \n"); + STACK_UNWIND_STRICT (writev, frame, -1, op_errno, + NULL, NULL, NULL); + } else { + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_writev set_role - set role succesfully \n"); + } + } break; } // client(reconciliator) writes how much it needs for the read @@ -679,6 +668,14 @@ nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, NULL, NULL, NULL); break; } + default: + { + recon_main_log (this->name, GF_LOG_ERROR, + "nsr_recon_writev called with wrong offset\n"); + STACK_UNWIND_STRICT (writev, frame, -1, op_errno, + NULL, NULL, NULL); + break; + } } return 0; @@ -764,6 +761,13 @@ nsr_recon_readv (call_frame_t *frame, xlator_t *this, memcpy(iobuf->ptr, <, size); goto out; } + default: + { + recon_main_log (this->name, GF_LOG_ERROR, + "nsr_recon_readv called with wrong offset\n"); + op_errno = -1; + break; + } } out: diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.h b/xlators/cluster/nsr-recon/src/recon_xlator.h index 8c48f6ff6..c92489db1 100644 --- a/xlators/cluster/nsr-recon/src/recon_xlator.h +++ b/xlators/cluster/nsr-recon/src/recon_xlator.h @@ -70,8 +70,8 @@ _recon_main_log (const char *func, int line, char *member, FILE *fp, void nsr_recon_libchangelog_get_this_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt); void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt); -void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term_id); -void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf); +void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term, int32_t status, int32_t 
op_errno); +gf_boolean_t nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf); #endif /* #ifndef __RECON_XLATOR_H__ */ |