summary | refs | log | tree | commit | diff | stats
path: root/xlators/cluster/nsr-recon/src/recon_driver.c
diff options
context:
space:
mode:
author: Raghavan P <rpichai@redhat.com> 2014-02-19 07:03:26 +0530
committer: Jeff Darcy <jdarcy@redhat.com> 2014-03-03 19:41:32 +0000
commit: c28972ea53cc7cdb91c7aac01754dd7f0b66e1a7 (patch)
tree: fc316e94c6494b282a1179bb97939909e5cbcba0 /xlators/cluster/nsr-recon/src/recon_driver.c
parent: 3bbfebc8dc21c469d47b576069ae137aec4567c9 (diff)
changes to NSR reconciliation code to add error handling.
Description of changes added: 1) In the recon driver, check the return values of all glfs calls. 2) Make the driver send back error values to other drivers or to the main translator. 3) Let the leader retry on errors. Change-Id: I050003a819d2314c8fdfd111df465041c30ee6e3 Signed-off-by: Raghavan P <rpichai@redhat.com>
Diffstat (limited to 'xlators/cluster/nsr-recon/src/recon_driver.c')
-rw-r--r-- xlators/cluster/nsr-recon/src/recon_driver.c 533
1 files changed, 350 insertions, 183 deletions
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.c b/xlators/cluster/nsr-recon/src/recon_driver.c
index 7f92b6578..49ee465c5 100644
--- a/xlators/cluster/nsr-recon/src/recon_driver.c
+++ b/xlators/cluster/nsr-recon/src/recon_driver.c
@@ -254,7 +254,7 @@ out:
* Output Arguments:
* buf - where the values are written one after the other (NULL seperated)
*/
-static void
+static int32_t
get_xattr(struct glfs_fd *fd,
char *keys,
char *buf,
@@ -277,7 +277,7 @@ get_xattr(struct glfs_fd *fd,
// TBD - handle error
if (r == -1)
- return;
+ return -1;
// increment the key to next value
keys += len;
@@ -285,7 +285,7 @@ get_xattr(struct glfs_fd *fd,
// increment buf to hold the next key
buf += strlen(buf) + 1;
}
- return;
+ return 0;
}
/*
@@ -296,7 +296,7 @@ get_xattr(struct glfs_fd *fd,
* keys - bunch of NULL seperated key names
* num - number of keys
*/
-static void delete_xattr(struct glfs_fd *fd,
+static int32_t delete_xattr(struct glfs_fd *fd,
dict_t *dict_t,
char *keys,
uint32_t num)
@@ -304,10 +304,11 @@ static void delete_xattr(struct glfs_fd *fd,
while(num--) {
// get the value and copy the value
// TBD - handle failure cases when calling glfs_fremovexattr_with_xdata()
- glfs_fremovexattr_with_xdata(fd, keys, dict_t);
+ if (glfs_fremovexattr_with_xdata(fd, keys, dict_t) == -1)
+ return -1;
keys += strlen(keys) +1;
}
- return;
+ return 0;
}
/*
@@ -320,7 +321,7 @@ static void delete_xattr(struct glfs_fd *fd,
* Each of the key-value is seperated by NULL in turn.
* num - Number of such key value pairs.
*/
-static void
+static int32_t
fill_xattr(struct glfs_fd *fd,
dict_t *dict,
char *buf,
@@ -336,10 +337,10 @@ fill_xattr(struct glfs_fd *fd,
// TBD - handle failure cases when calling glfs_fsetxattr_with_xdata()
r = glfs_fsetxattr_with_xdata(fd, k, val, strlen(val), 0, dict);
if (r == -1)
- return;
+ return -1;
k = val + strlen(val) + 1;
}
- return;
+ return 0;
}
/*
@@ -408,7 +409,7 @@ nsr_recon_get_file(char *vol, nsr_replica_worker_t *worker)
* ctx - The per worker based context
* control - set to true if this worker is for the control plane
*/
-static int
+static int32_t
nsr_recon_start_work(nsr_per_node_worker_t *ctx,
gf_boolean_t control)
{
@@ -497,6 +498,7 @@ nsr_recon_start_work(nsr_per_node_worker_t *ctx,
if (aux_fd == NULL) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"cannot open aux log file for thread %s\n",ctx->id);
+ return -1;
} else {
nsr_worker_log(this->name, GF_LOG_ERROR,
"---opened aux log file for thread %s\n",ctx->id);
@@ -517,7 +519,7 @@ nsr_recon_start_work(nsr_per_node_worker_t *ctx,
* ctx - The per worker based context
* control - set to true if this worker is for the control plane
*/
-static int
+static int32_t
nsr_recon_end_work(nsr_per_node_worker_t *ctx,
gf_boolean_t control)
{
@@ -648,13 +650,7 @@ control_worker_func_0(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_INFO,
"this message should not be sent \n");
- break;
- }
- case NSR_WORK_ID_END_RECONCILIATION:
- {
- nsr_worker_log(this->name, GF_LOG_INFO,
- "sending reconciliation end message to node %d\n", index);
- nsr_recon_return_back(priv, dr->txn_id);
+ ctx->result = -1;
break;
}
case NSR_WORK_ID_GET_RECONCILATION_WINDOW:
@@ -676,16 +672,29 @@ control_worker_func_0(nsr_per_node_worker_t *ctx,
rd = GF_CALLOC(num,
sizeof(nsr_recon_record_details_t),
gf_mt_recon_private_t);
+ if (rd == NULL) {
+ ctx->result = -1;
+ return;
+ }
+
recon_info->records = GF_CALLOC(num,
sizeof(nsr_reconciliation_record_t),
gf_mt_recon_private_t);
+ if (recon_info->records == NULL) {
+ ctx->result = -1;
+ return;
+ }
// TBD - handle errors
- nsr_recon_libchangelog_get_records(this, priv->changelog_base_path,
+ if (nsr_recon_libchangelog_get_records(this, priv->changelog_base_path,
recon_info->last_term,
recon_info->first_index,
recon_info->last_index,
- rd);
+ rd) == _gf_false) {
+ ctx->result = -1;
+ return;
+ }
+
// The above function writes into rd from 0 to (num -1)
// We need to take care of this whenever we deal with records
for (i=0; i < num; i++) {
@@ -778,7 +787,10 @@ control_worker_func(nsr_per_node_worker_t *ctx,
"calling nsr_recon_start_work\n");
// TBD - handle error in case nsr_recon_start_work gives error
- nsr_recon_start_work(ctx, _gf_true);
+ if (nsr_recon_start_work(ctx, _gf_true) != 0) {
+ ctx->result = -1;
+ return;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"finished nsr_recon_start_work\n");
@@ -790,7 +802,10 @@ control_worker_func(nsr_per_node_worker_t *ctx,
"calling nsr_recon_end_work\n");
// TBD - handle error in case nsr_recon_end_work gives error
- nsr_recon_end_work(ctx, _gf_true);
+ if (nsr_recon_end_work(ctx, _gf_true) != 0) {
+ ctx->result = -1;
+ return;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"finished nsr_recon_end_work\n");
@@ -807,9 +822,18 @@ control_worker_func(nsr_per_node_worker_t *ctx,
// first write the current term term number
// TBD - error handling for all the glfs APIs
- glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET);
- glfs_write(ctx->aux_fd, &term, sizeof(term), 0);
- glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0);
+ if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl
recon_info->last_term = lt.last_term;
recon_info->commited_ops = lt.commited_ops;
@@ -834,9 +858,18 @@ control_worker_func(nsr_per_node_worker_t *ctx,
// first write the term number
// TBD - error handling for all the glfs APIs
- glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET);
- glfs_write(ctx->aux_fd, &term, sizeof(term), 0);
- glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0);
+ if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl
recon_info->last_term = lt.last_term;
recon_info->commited_ops = lt.commited_ops;
@@ -863,9 +896,12 @@ control_worker_func(nsr_per_node_worker_t *ctx,
"trying to make this index %d as reconciliator for term %d\n", index, term);
// TBD - error handling for all the glfs APIs
- glfs_lseek(ctx->aux_fd,
+ if (glfs_lseek(ctx->aux_fd,
nsr_recon_xlator_sector_1,
- SEEK_SET);
+ SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
// We have all the info for all other nodes.
// Fill all that info when sending data to that process.
@@ -888,7 +924,15 @@ control_worker_func(nsr_per_node_worker_t *ctx,
rr.num = num;
rr.role = reconciliator;
ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl
- glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0);
+ if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) {
+ ctx->result = -1;
+ // Put the errno only for this case since we
+ // are bothered about retrying only for this case.
+ // For rest of the cases we will just return EIO
+ // in errno.
+ ctx->op_errno = errno;
+ return;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"sent reconciliator info for term %d with node count as %d\n", term, num);
@@ -905,14 +949,18 @@ control_worker_func(nsr_per_node_worker_t *ctx,
"trying to make this index %d as resolutor with reconciliator as %d\n",index, rec);
// TBD - error handling for all the glfs APIs
- glfs_lseek(ctx->aux_fd,
+ if (glfs_lseek(ctx->aux_fd,
nsr_recon_xlator_sector_1,
- SEEK_SET);
+ SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
+
rr.num = 2;
// Fill in info[0] as info for the node for which we are seeking resolution.
// Fill in info[1] as info of the reconciliator node.
- // The function nsr_recon_driver_set_role() that will be called when
+ // The function nsr_recon_driver_get_role() that will be called when
// this message reaches the node will look at index 1 for term information
// related to the reconciliator.
for (i=0; i < 2; i++) {
@@ -943,33 +991,21 @@ control_worker_func(nsr_per_node_worker_t *ctx,
}
rr.role = resolutor;
ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl
- glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0);
+ if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) {
+ ctx->result = -1;
+ // Put the errno only for this case since we
+ // are bothered about retrying only for this case.
+ // For rest of the cases we will just return EIO
+ // in errno.
+ ctx->op_errno = errno;
+ return;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"sent message to this node %d resolutor with reconciliator as %d\n", index, rec);
break;
}
- case NSR_WORK_ID_END_RECONCILIATION:
- {
- char c[4];
- uint32_t old = htonl(dr->txn_id);
-
- nsr_worker_log(this->name, GF_LOG_INFO,
- "sending reconciliation end message to node %d\n", index);
-
- memcpy(c, &old, sizeof(uint32_t));
- // TBD - error handling for all the glfs APIs
- glfs_lseek(ctx->aux_fd,
- nsr_recon_xlator_sector_0,
- SEEK_SET);
- glfs_write(ctx->aux_fd, c, sizeof(c), 0);
-
- nsr_worker_log(this->name, GF_LOG_INFO,
- "finished sending reconciliation end message to node %d\n", index);
-
- break;
- }
case NSR_WORK_ID_GET_RECONCILATION_WINDOW:
{
nsr_recon_log_info_t li;
@@ -985,24 +1021,42 @@ control_worker_func(nsr_per_node_worker_t *ctx,
// TBD - error handling for all the glfs APIs
- glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET);
+ if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
// write to node what term & indices we are interested
li.term = recon_info->last_term;
li.first_index = recon_info->first_index;
li.last_index = recon_info->last_index;
ENDIAN_CONVERSION_LI(li, _gf_false); //htonl
- glfs_write(ctx->aux_fd, &li, sizeof(li), 0);
+ if (glfs_write(ctx->aux_fd, &li, sizeof(li), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
// then read
rd = GF_CALLOC(num,
sizeof(nsr_recon_record_details_t),
gf_mt_recon_private_t);
+ if (rd == NULL) {
+ ctx->result = -1;
+ return;
+ }
recon_info->records = GF_CALLOC(num,
sizeof(nsr_reconciliation_record_t),
gf_mt_recon_private_t);
+ if (recon_info->records == NULL) {
+ ctx->result = -1;
+ return;
+ }
+
+ if (glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
- glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0);
for (i=0; i < num; i++) {
ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl
memcpy(&(recon_info->records[i].rec),
@@ -1132,7 +1186,7 @@ compute_reconciliation_work(nsr_recon_driver_ctx_t *ctx)
return;
}
-static void
+static int32_t
nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
uint32_t i,
gf_boolean_t in_use);
@@ -1145,14 +1199,14 @@ nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
gf_boolean_t
nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx,
nsr_recon_role_t *rr,
- uint32_t txn_id)
+ uint32_t term)
{
nsr_role_work_t *rw;
nsr_driver_log(this->name, GF_LOG_INFO, "set role called \n");
rw = GF_CALLOC(1, sizeof (nsr_role_work_t), 0);
memcpy(&rw->role, rr, sizeof(nsr_recon_role_t));
- rw->txn_id = txn_id;
+ rw->term = term;
INIT_LIST_HEAD(&(rw->list));
pthread_mutex_lock(&(ctx->mutex));
list_add_tail(&rw->list, &ctx->role_head.list);
@@ -1176,15 +1230,11 @@ nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx,
* If resolution to be done, then rr.info[0] will have this node's info
* which the leader would have got earlier. rr[1].info will have the
* info regarding the reconciliator.
- * txn_id - All role changes(except when leader becomes reconciliator or resolutor)
- * would be initiated as write to the recon xlator which would have got a frame from
- * either the brick process(leader change) or other reconciliation process.
- * The write function would return immediately after storing the frame which
- * needs to be returned back after the actual reconciliation is done.
- * For that we store the frame against this id which acts as a key.
+ * term - leader's term that is causing this role
*/
nsr_recon_driver_state_t
-nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
+nsr_recon_driver_get_role(int32_t *status,
+ nsr_recon_driver_ctx_t *ctx,
nsr_role_work_t *rw)
{
uint8_t i=0, j=0;
@@ -1192,17 +1242,24 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
// First make all the threads uninitialise
for (i = 0; i < ctx->replica_group_size; i++) {
- nsr_recon_in_use(ctx, i, _gf_false);
+ if (nsr_recon_in_use(ctx, i, _gf_false) == -1) {
+ *status = -1;
+ return 0;
+ }
}
if ((rr->role == leader) || (rr->role == joiner)) {
// First set info this node
- nsr_recon_in_use(ctx, 0, _gf_true);
+ if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
ctx->workers[0].recon_info = GF_CALLOC (1,
sizeof (nsr_reconciliator_info_t),
gf_mt_recon_private_t);
if (!ctx->workers[0].recon_info) {
- return _gf_false;
+ *status = -1;
+ return 0;
}
ctx->current_term = rr->current_term;
@@ -1213,11 +1270,14 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
if (!strcmp(ctx->workers[i].name, rr->info[j].name)) {
//if (strstr(ctx->workers[i].name, rr->info[j].name)) {
nsr_driver_log(this->name, GF_LOG_INFO,
- "nsr_recon_driver_set_role: this as %s. found other server %s\n",
+ "nsr_recon_driver_get_role: this as %s. found other server %s\n",
(rr->role == leader) ? "leader" : "joiner",
ctx->workers[i].name);
- nsr_recon_in_use(ctx, i, _gf_true);
+ if (nsr_recon_in_use(ctx, i, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
// Allocate this here. This will get later filled when
// the leader tries to get last term information from all
// the nodes
@@ -1225,7 +1285,8 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
sizeof (nsr_reconciliator_info_t),
gf_mt_recon_private_t);
if (!ctx->workers[i].recon_info) {
- return _gf_false;
+ *status = -1;
+ return 0;
}
break;
}
@@ -1245,13 +1306,14 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
if (!strcmp(rr->info[i].name, ctx->workers[j].name)) {
//if (strstr(ctx->workers[j].name, rr->info[i].name)) {
nsr_driver_log(this->name, GF_LOG_INFO,
- "nsr_recon_driver_set_role: this as reconciliator. found other server %s\n",
+ "nsr_recon_driver_get_role: this as reconciliator. found other server %s\n",
ctx->workers[j].name);
ctx->workers[j].recon_info = GF_CALLOC (1,
sizeof (nsr_reconciliator_info_t),
gf_mt_recon_private_t);
if (!ctx->workers[j].recon_info) {
- return _gf_false;
+ *status = -1;
+ return 0;
}
ctx->workers[j].recon_info->last_term =
rr->info[i].last_term;
@@ -1261,7 +1323,10 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
rr->info[i].last_index;
ctx->workers[j].recon_info->first_index =
rr->info[i].first_index;
- nsr_recon_in_use(ctx, j, _gf_true);
+ if (nsr_recon_in_use(ctx, j, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
break;
}
}
@@ -1272,13 +1337,14 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
//if (strstr(ctx->workers[j].name, rr->info[1].name)) {
if (!strcmp(rr->info[1].name, ctx->workers[j].name)) {
nsr_driver_log(this->name, GF_LOG_INFO,
- "nsr_recon_driver_set_role: this as resolutor. found other server %s as reconciliator\n",
- ctx->workers[1].name);
+ "nsr_recon_driver_get_role: this as resolutor. found other server %s as reconciliator\n",
+ ctx->workers[j].name);
ctx->workers[j].recon_info = GF_CALLOC (1,
sizeof (nsr_reconciliator_info_t),
gf_mt_recon_private_t);
if (!ctx->workers[j].recon_info) {
- return _gf_false;
+ *status = -1;
+ return 0;
}
ctx->workers[j].recon_info->last_term =
rr->info[1].last_term;
@@ -1289,7 +1355,10 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
ctx->workers[j].recon_info->first_index =
rr->info[1].first_index;
ctx->reconciliator_index = j;
- nsr_recon_in_use(ctx, j, _gf_true);
+ if (nsr_recon_in_use(ctx, j, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
GF_ASSERT(ctx->reconciliator_index != 0);
break;
}
@@ -1298,18 +1367,23 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
sizeof (nsr_reconciliator_info_t),
gf_mt_recon_private_t);
if (!ctx->workers[0].recon_info) {
- return _gf_false;
+ *status = -1;
+ return 0;
}
// info[0] has all info for this node
ctx->workers[0].recon_info->last_term = rr->info[0].last_term;
ctx->workers[0].recon_info->commited_ops = rr->info[0].commited_ops;
ctx->workers[0].recon_info->last_index = rr->info[0].last_index;
ctx->workers[0].recon_info->first_index = rr->info[0].first_index;
- nsr_recon_in_use(ctx, 0, _gf_true);
+ if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
}
- ctx->txn_id = rw->txn_id;
+ ctx->term = rw->term;
+ *status = 0;
return rr->role;
}
@@ -1401,7 +1475,7 @@ create_obj(nsr_per_node_worker_t *ctx, char *gfid_str)
* and the changelog translator consumes term and index.
*/
-static void
+static gf_boolean_t
apply_record(nsr_per_node_worker_t *ctx,
nsr_reconciliation_record_t *ri,
dict_t * dict)
@@ -1420,10 +1494,11 @@ apply_record(nsr_per_node_worker_t *ctx,
// TBD - get a way to just stuff the log entry without writing the data so that
// changelogs remain identical.
if (ri->work.data == NULL) {
- return;
+ return _gf_true;
}
- if ((obj = create_obj(ctx,ri->rec.gfid)) == NULL) return;
+ if ((obj = create_obj(ctx,ri->rec.gfid)) == NULL)
+ return _gf_false;;
fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict);
if (fd == NULL) {
@@ -1431,23 +1506,28 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"open for file %s failed\n",
ri->rec.gfid);
- return;
+ return _gf_false;
}
if (glfs_lseek_with_xdata(fd, ri->rec.offset, SEEK_SET, dict) != ri->rec.offset) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"lseek for file %s failed at offset %d\n",
ri->rec.gfid, ri->rec.offset);
- return;
+ return _gf_false;
}
if (glfs_write_with_xdata(fd, ri->work.data, ri->rec.len, 0, dict) != ri->rec.len) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"write for file %s failed for bytes %d\n",
ri->rec.gfid, ri->rec.len);
- return;
+ return _gf_false;
}
- glfs_close_with_xdata(fd, dict);
+ if (glfs_close_with_xdata(fd, dict) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "close failed\n");
+ return _gf_false;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"Finished DOing write for gfid %s @offset %d for len %d\n",
@@ -1459,7 +1539,7 @@ apply_record(nsr_per_node_worker_t *ctx,
"DOing truncate for file %s @offset %d \n",
ri->rec.gfid, ri->rec.offset);
- if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict);
if (fd == NULL) {
@@ -1467,16 +1547,21 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"open for file %s failed\n",
ri->rec.gfid);
- return;
+ return _gf_false;
}
if (glfs_ftruncate_with_xdata(fd, ri->rec.offset, dict) == -1) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"trunctae for file %s failed @offset %d\n",
ri->rec.gfid,ri->rec.offset );
- return;
+ return _gf_false;
}
- glfs_close_with_xdata(fd, dict);
+ if (glfs_close_with_xdata(fd, dict) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "close failed\n");
+ return _gf_false;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"Finished DOing truncate for gfid %s @offset %d \n",
@@ -1499,10 +1584,10 @@ apply_record(nsr_per_node_worker_t *ctx,
// TBD - get a way to just stuff the log entry without writing the data so that
// changelogs remain identical.
if (ri->work.data == NULL) {
- return;
+ return _gf_true;
}
- if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
if (obj->inode->ia_type == IA_IFDIR)
fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict);
@@ -1513,7 +1598,7 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"open for file %s failed\n",
ri->rec.gfid);
- return;
+ return _gf_false;
}
if(get_xattr_total_size(fd, &t_b, &k_s, &v_s, &num, dict) == -1) {
@@ -1521,10 +1606,15 @@ apply_record(nsr_per_node_worker_t *ctx,
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"list of xattr of %s failed\n", ri->rec.gfid);
- return;
+ return _gf_false;
}
- delete_xattr(fd, dict, t_b, num);
+ if (delete_xattr(fd, dict, t_b, num) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "deleting xattrs failed\n");
+ return _gf_false;
+ }
// Set one special dict flag to indicate the opcode so that
// the opcode gets set to this
@@ -1532,12 +1622,22 @@ apply_record(nsr_per_node_worker_t *ctx,
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"setting opcode to %d failed\n",ri->rec.op);
- return;
+ return _gf_false;
}
- fill_xattr(fd, dict, ri->work.data, ri->work.num);
+ if (fill_xattr(fd, dict, ri->work.data, ri->work.num) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "filling xattrs failed\n");
+ return _gf_false;
+ }
- glfs_close_with_xdata(fd, dict);
+ if (glfs_close_with_xdata(fd, dict) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "close failed\n");
+ return _gf_false;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"Finsihed Doing set extended attr for %s \n",
@@ -1553,7 +1653,7 @@ apply_record(nsr_per_node_worker_t *ctx,
// TBD - add mode and flags later
uuid_parse(ri->rec.gfid, gfid);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
nsr_worker_log (this->name, GF_LOG_INFO,
"creating with mode 0%o", ri->rec.mode);
@@ -1562,7 +1662,7 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failure for Doing create for file %s\n",
ri->rec.entry);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1579,14 +1679,14 @@ apply_record(nsr_per_node_worker_t *ctx,
// TBD - add mode and flags later
uuid_parse(ri->rec.gfid, gfid);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
if (glfs_h_mknod_with_xdata(ctx->fs, obj, ri->rec.entry, O_RDWR, 0777, NULL, gfid, dict) == NULL) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failure for Doing mknod for file %s\n",
ri->rec.entry);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1603,14 +1703,14 @@ apply_record(nsr_per_node_worker_t *ctx,
// TBD - add mode and flags later
uuid_parse(ri->rec.gfid, gfid);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
if (glfs_h_mkdir_with_xdata(ctx->fs, obj, ri->rec.entry, 0777, NULL, gfid, dict) != 0) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failure for Doing mkdir for file %s\n",
ri->rec.entry);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1623,13 +1723,13 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing rmdir/ublink for dir %s \n",
ri->rec.entry);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
if (glfs_h_unlink_with_xdata(ctx->fs, obj, ri->rec.entry, dict) != 0) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failure for Doing rmdir/unlink for file %s\n",
ri->rec.entry);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1644,7 +1744,7 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing symlink for file %s to file %s \n",
ri->rec.entry, ri->rec.link_path);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
uuid_parse(ri->rec.gfid, gfid);
if (glfs_h_symlink_with_xdata(ctx->fs, obj, ri->rec.entry, ri->rec.link_path, NULL, gfid, dict) == NULL) {
@@ -1652,7 +1752,7 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failed to Doing symlink for file %s to file %s \n",
ri->rec.entry, ri->rec.link_path);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1667,15 +1767,15 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing hard link for file %s to file %s \n",
ri->rec.entry, ri->rec.gfid);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
- if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
+ if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
if (glfs_h_link_with_xdata(ctx->fs, to_obj, obj, ri->rec.entry, dict) == -1) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failed to Doing hard link for file %s to file %s \n",
ri->rec.entry, ri->rec.gfid);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1690,15 +1790,15 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing rename for file %s to file %s \n",
ri->rec.entry, ri->rec.newloc);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
- if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
+ if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
if (glfs_h_rename_with_xdata(ctx->fs, obj, ri->rec.entry, to_obj, ri->rec.newloc, dict) == -1) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failed to Doing rename for file %s to file %s \n",
ri->rec.entry, ri->rec.newloc);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1719,7 +1819,7 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing attr for file %s \n",
ri->rec.gfid);
- if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
if (obj->inode->ia_type == IA_IFDIR)
fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict);
@@ -1730,7 +1830,7 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"open for file %s failed\n",
ri->rec.gfid);
- return;
+ return _gf_false;
}
iatt.ia_prot = ia_prot_from_st_mode(777);
@@ -1743,7 +1843,7 @@ apply_record(nsr_per_node_worker_t *ctx,
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"setting opcode to %d failed\n",ri->rec.op);
- return;
+ return _gf_false;
}
ret = glfs_fsetattr_with_xdata(fd, &iatt, valid, dict);
@@ -1752,17 +1852,22 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_INFO,
"failed Doing attr for file %s \n",
ri->rec.gfid);
- return;
+ return _gf_false;
}
- glfs_close_with_xdata(fd, dict);
+ if (glfs_close_with_xdata(fd, dict) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "close failed\n");
+ return _gf_false;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"Doing attr for file %s \n",
ri->rec.gfid);
}
- return;
+ return _gf_true;
}
//return back opcodes that requires reading from source
@@ -1816,7 +1921,10 @@ data_worker_func(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_INFO,
"started data ini \n");
- nsr_recon_start_work(ctx, _gf_false);
+ if (nsr_recon_start_work(ctx, _gf_false) != 0) {
+ ctx->result = -1;
+ return;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"finished data ini \n");
@@ -1825,7 +1933,10 @@ data_worker_func(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_INFO,
"started data fini \n");
- nsr_recon_end_work(ctx, _gf_false);
+ if (nsr_recon_end_work(ctx, _gf_false) != 0) {
+ ctx->result = -1;
+ return;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"finished data fini \n");
@@ -1838,18 +1949,21 @@ data_worker_func(nsr_per_node_worker_t *ctx,
dict = dict_new ();
if (!dict) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"failed allocating for dictionary\n");
break;
}
if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"error setting term in dict\n");
break;
}
if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"error setting term in dict\n");
@@ -1978,7 +2092,12 @@ data_worker_func(nsr_per_node_worker_t *ctx,
}
ri->work.data = GF_CALLOC ((k_s + v_s) , sizeof(char),
gf_mt_recon_private_t);
- get_xattr(fd, t_b, ri->work.data, v_s, num, dict);
+ if (get_xattr(fd, t_b, ri->work.data, v_s, num, dict) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "get xattr of gfid %s failed\n", rd->gfid);
+ break;
+ }
ri->work.num = num;
nsr_worker_log(this->name, GF_LOG_INFO,
"finished getattr for gfid %s \n",
@@ -2014,24 +2133,30 @@ data_worker_func(nsr_per_node_worker_t *ctx,
wip, rd->gfid);
dict = dict_new ();
if (!dict) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"failed allocating for dictionary\n");
break;
}
if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"error setting term in dict\n");
break;
}
if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"error setting term in dict\n");
break;
}
- apply_record(ctx, ri, dict);
+ if (apply_record(ctx, ri, dict) == _gf_false) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "apply_record fails\n");
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"finished recon commit for gfid %s \n",
@@ -2041,18 +2166,21 @@ data_worker_func(nsr_per_node_worker_t *ctx,
case NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH:
dict = dict_new ();
if (!dict) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"failed allocating for dictionary\n");
break;
}
if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"error setting term in dict\n");
break;
}
if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"error setting term in dict\n");
@@ -2198,7 +2326,9 @@ create_worker_threads(nsr_recon_private_t *priv,
* misc - used to overload such as index.
*/
static void
-send_and_wait(int32_t bm,
+send_and_wait(int32_t *result,
+ int32_t *op_errno,
+ int32_t bm,
uint32_t num,
nsr_recon_driver_ctx_t *ctx,
nsr_recon_work_req_id_t id,
@@ -2208,6 +2338,18 @@ send_and_wait(int32_t bm,
uint32_t i = 0;
nsr_recon_work_t *work;
+#define CONTROL_WORKER(i) ctx->workers[i].control_worker
+#define DATA_WORKER(i) ctx->workers[i].data_worker
+#define WORKER(i) ((q == NSR_RECON_QUEUE_TO_CONTROL) ? (CONTROL_WORKER(i)) : (DATA_WORKER(i)))
+
+ *result = *op_errno = 0;
+
+ for (i=0; i < num; i++) {
+ if ((bm & (1 << i)) && ctx->workers[i].in_use) {
+ WORKER(i)->result = 0;
+ WORKER(i)->op_errno = 0;
+ }
+ }
if (ctx->mode == NSR_SEQ) {
for (i=0; i < num; i++) {
if ((bm & (1 << i)) && ctx->workers[i].in_use) {
@@ -2222,8 +2364,7 @@ send_and_wait(int32_t bm,
}
}
}
- nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned\n");
- return;
+ goto out;
}
for (i=0; i < num; i++) {
@@ -2238,41 +2379,40 @@ send_and_wait(int32_t bm,
while (ctx->outstanding) {
pthread_yield();
}
- nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned\n");
- return;
-}
-
-#if 0
-static void
-send_and_do_not_wait(int32_t bm,
- uint32_t num,
- nsr_recon_driver_ctx_t *ctx,
- nsr_recon_work_req_id_t id,
- nsr_recon_queue_type_t q,
- int32_t misc)
-{
- uint32_t i = 0;
-
- for (i=0; i < num; i++) {
+out:
+ for (i=0; i < num; i++) {
if ((bm & (1 << i)) && ctx->workers[i].in_use) {
- nsr_recon_work_t *work;
- recon_make_work(&work, id, misc);
- recon_queue_to_worker(ctx, work, i, q);
- }
- }
+ if (WORKER(i)->result == -1) {
+ *result = -1;
+ }
+ }
+ }
+ if (*result == -1) {
+ for (i=0; i < num; i++) {
+ if ((bm & (1 << i)) && ctx->workers[i].in_use) {
+ if (WORKER(i)->op_errno == EAGAIN) {
+ *op_errno = EAGAIN;
+ break;
+ } else {
+ *op_errno = EIO;
+ }
+ }
+ }
+ }
+ nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned with result: %d errno:%d\n", *result, *op_errno);
return;
}
-#endif
// send INI or FINI
-static void
+static int32_t
nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
uint32_t i,
gf_boolean_t in_use)
{
uint32_t bm = 1 << i;
gf_boolean_t send = _gf_false;
+ int32_t status =0, op_errno = 0;
if (in_use == _gf_false) {
if (ctx->workers[i].in_use == _gf_true)
@@ -2284,25 +2424,28 @@ nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
send = _gf_true;
}
}
-#if 1
if (send == _gf_true) {
if (in_use == _gf_true) {
nsr_driver_log(this->name, GF_LOG_INFO, "sending INI to index %d\n",i);
} else {
nsr_driver_log(this->name, GF_LOG_INFO, "sending FINI to index %d\n",i);
}
- send_and_wait(bm, ctx->replica_group_size, ctx,
+ send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx,
(in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI,
NSR_RECON_QUEUE_TO_CONTROL, -1);
- send_and_wait(bm, ctx->replica_group_size, ctx,
+ if (status == -1)
+ return -1;
+ send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx,
(in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI,
NSR_RECON_QUEUE_TO_DATA, -1);
+ if (status == -1)
+ return -1;
if (in_use == _gf_false) {
//GF_FREE(ctx->workers[i].recon_info->records);
GF_FREE(ctx->workers[i].recon_info);
}
}
-#endif
+ return 0;
}
// main recon driver thread
@@ -2317,6 +2460,8 @@ nsr_reconciliation_driver(void *arg)
int32_t bm;
xlator_t *this = priv->this;
char *con_name, *data_name;
+ int32_t status = 0;
+ int32_t op_errno = 0;
driver_ctx = &priv->driver_thread_context;
(*driver_ctx) = GF_CALLOC (1,
@@ -2414,7 +2559,12 @@ nsr_reconciliation_driver(void *arg)
list_for_each_entry(rr, &(ctx->role_head.list), list) {
nsr_recon_driver_state_t state;
- state = nsr_recon_driver_get_role(ctx, rr);
+ state = nsr_recon_driver_get_role(&status, ctx, rr);
+
+ if (status == -1) {
+ op_errno = EIO;
+ goto out;
+ }
if (state == leader) {
@@ -2423,11 +2573,13 @@ nsr_reconciliation_driver(void *arg)
nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n");
// Get last term info from all members for this group
- send_and_wait(-1,
+ send_and_wait(&status, &op_errno, -1,
replica_group_size,
ctx,
NSR_WORK_ID_GET_LAST_TERM_INFO,
NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term);
+ if (status == -1)
+ goto out;
// compare all the info received and choose the reconciliator
@@ -2469,11 +2621,13 @@ nsr_reconciliation_driver(void *arg)
if (chosen != 0) {
nsr_driver_log (this->name, GF_LOG_INFO, "sending reconciliation work to %d\n", chosen);
bm = 1 << ctx->reconciliator_index;
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_RECONCILIATOR_DO_WORK,
NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work to %d\n", chosen);
} else {
nsr_driver_log (this->name, GF_LOG_INFO, "local node is reconciliator. before set jmp\n");
@@ -2517,11 +2671,13 @@ nsr_reconciliation_driver(void *arg)
nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this node and reconciliator\n");
// Bit 0 is this node; clear it and the reconciliator's bit. This must be a bitwise OR: a logical OR collapses to 1 and leaves the reconciliator's bit set.
bm = ~((1 << ctx->reconciliator_index) | 1);
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_RESOLUTION_DO_WORK,
NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work as leader \n");
@@ -2564,11 +2720,13 @@ i_am_reconciliator:
// We have set the bm in the above for loop where
// we go thru all nodes including this node that
// have seen the last term.
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_GET_RECONCILATION_WINDOW,
NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished getting reconciliation window for term %d from %dto %d \n",
ctx->workers[0].recon_info->last_term,
@@ -2604,12 +2762,14 @@ i_am_reconciliator:
bm = (1 << record->work.source);
nsr_driver_log (this->name, GF_LOG_INFO,
"reading data from source %d\n",record->work.source);
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_SINGLE_RECONCILIATION_READ,
NSR_RECON_QUEUE_TO_DATA,
i);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"got data from source %d\n",record->work.source);
}
@@ -2618,12 +2778,14 @@ i_am_reconciliator:
"fixing local data as part of reconciliation\n");
bm = 1;
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT,
NSR_RECON_QUEUE_TO_DATA,
i);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished fixing local data as part of reconciliation\n");
@@ -2634,12 +2796,14 @@ i_am_reconciliator:
nsr_driver_log (this->name, GF_LOG_INFO,
"fixing this record as a fill\n");
bm = 1;
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH,
NSR_RECON_QUEUE_TO_DATA,
i);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished fixing this record as a fill\n");
}
@@ -2691,11 +2855,13 @@ i_am_resolutor:
nsr_driver_log (this->name, GF_LOG_INFO,
"getting info from reconciliator for term %d \n", my_info->last_term);
bm = (1 << recon_index);
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_GET_GIVEN_TERM_INFO,
NSR_RECON_QUEUE_TO_CONTROL, his_info->last_term);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished getting info from reconciliator for term %d \n", my_info->last_term);
@@ -2739,11 +2905,13 @@ i_am_resolutor:
"getting reconciliation window for term %d from %d to %d \n",
my_info->last_term,
my_info->first_index, my_info->last_index);
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_GET_RECONCILATION_WINDOW,
NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished getting reconciliation window for term %d from %d to %d \n",
my_info->last_term,
@@ -2771,12 +2939,14 @@ i_am_resolutor:
nsr_driver_log (this->name, GF_LOG_INFO,
"reading data from source %d\n",recon_index);
bm = (1 << recon_index);
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_SINGLE_RECONCILIATION_READ,
NSR_RECON_QUEUE_TO_DATA,
i);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished reading data from source %d\n",recon_index);
}
@@ -2785,12 +2955,14 @@ i_am_resolutor:
"fixing local data as part of resolutor\n");
bm = 1;
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT,
NSR_RECON_QUEUE_TO_DATA,
i);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished fixing local data as part of resolutor\n");
@@ -2816,21 +2988,25 @@ i_am_resolutor:
nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n");
// Get last term info from all members for this group
// which will be the leader(this node) and the node that wants to join.
- send_and_wait(-1,
+ send_and_wait(&status, &op_errno, -1,
replica_group_size,
ctx,
NSR_WORK_ID_GET_LAST_TERM_INFO,
NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term);
+ if (status == -1)
+ goto out;
// send message to other node that just joined to sync up with this node which is also the leader
nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this\n");
bm = ~(1);
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_RESOLUTION_DO_WORK,
NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished recon work as joiner \n");
@@ -2843,16 +3019,7 @@ i_am_resolutor:
out:
nsr_driver_log (this->name, GF_LOG_INFO,
"sending end of reconciliation message \n");
- nsr_recon_return_back(priv, ctx->txn_id);
-#if 0
- // send message that job is done by writing to local recon translator
- bm = 1;
- send_and_wait(bm,
- replica_group_size,
- ctx,
- NSR_WORK_ID_END_RECONCILIATION,
- NSR_RECON_QUEUE_TO_CONTROL, -1);
-#endif
+ nsr_recon_return_back(priv, ctx->term, status, op_errno);
nsr_driver_log (this->name, GF_LOG_INFO,
"finished sending end of reconciliation message \n");
}