summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- xlators/cluster/nsr-recon/src/recon_driver.c 533
-rw-r--r-- xlators/cluster/nsr-recon/src/recon_driver.h 8
-rw-r--r-- xlators/cluster/nsr-recon/src/recon_xlator.c 138
-rw-r--r-- xlators/cluster/nsr-recon/src/recon_xlator.h 4
-rw-r--r-- xlators/cluster/nsr-server/src/recon_notify.c 43
5 files changed, 468 insertions, 258 deletions
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.c b/xlators/cluster/nsr-recon/src/recon_driver.c
index 7f92b6578..49ee465c5 100644
--- a/xlators/cluster/nsr-recon/src/recon_driver.c
+++ b/xlators/cluster/nsr-recon/src/recon_driver.c
@@ -254,7 +254,7 @@ out:
* Output Arguments:
* buf - where the values are written one after the other (NULL seperated)
*/
-static void
+static int32_t
get_xattr(struct glfs_fd *fd,
char *keys,
char *buf,
@@ -277,7 +277,7 @@ get_xattr(struct glfs_fd *fd,
// TBD - handle error
if (r == -1)
- return;
+ return -1;
// increment the key to next value
keys += len;
@@ -285,7 +285,7 @@ get_xattr(struct glfs_fd *fd,
// increment buf to hold the next key
buf += strlen(buf) + 1;
}
- return;
+ return 0;
}
/*
@@ -296,7 +296,7 @@ get_xattr(struct glfs_fd *fd,
* keys - bunch of NULL seperated key names
* num - number of keys
*/
-static void delete_xattr(struct glfs_fd *fd,
+static int32_t delete_xattr(struct glfs_fd *fd,
dict_t *dict_t,
char *keys,
uint32_t num)
@@ -304,10 +304,11 @@ static void delete_xattr(struct glfs_fd *fd,
while(num--) {
// get the value and copy the value
// TBD - handle failure cases when calling glfs_fremovexattr_with_xdata()
- glfs_fremovexattr_with_xdata(fd, keys, dict_t);
+ if (glfs_fremovexattr_with_xdata(fd, keys, dict_t) == -1)
+ return -1;
keys += strlen(keys) +1;
}
- return;
+ return 0;
}
/*
@@ -320,7 +321,7 @@ static void delete_xattr(struct glfs_fd *fd,
* Each of the key-value is seperated by NULL in turn.
* num - Number of such key value pairs.
*/
-static void
+static int32_t
fill_xattr(struct glfs_fd *fd,
dict_t *dict,
char *buf,
@@ -336,10 +337,10 @@ fill_xattr(struct glfs_fd *fd,
// TBD - handle failure cases when calling glfs_fsetxattr_with_xdata()
r = glfs_fsetxattr_with_xdata(fd, k, val, strlen(val), 0, dict);
if (r == -1)
- return;
+ return -1;
k = val + strlen(val) + 1;
}
- return;
+ return 0;
}
/*
@@ -408,7 +409,7 @@ nsr_recon_get_file(char *vol, nsr_replica_worker_t *worker)
* ctx - The per worker based context
* control - set to true if this worker is for the control plane
*/
-static int
+static int32_t
nsr_recon_start_work(nsr_per_node_worker_t *ctx,
gf_boolean_t control)
{
@@ -497,6 +498,7 @@ nsr_recon_start_work(nsr_per_node_worker_t *ctx,
if (aux_fd == NULL) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"cannot open aux log file for thread %s\n",ctx->id);
+ return -1;
} else {
nsr_worker_log(this->name, GF_LOG_ERROR,
"---opened aux log file for thread %s\n",ctx->id);
@@ -517,7 +519,7 @@ nsr_recon_start_work(nsr_per_node_worker_t *ctx,
* ctx - The per worker based context
* control - set to true if this worker is for the control plane
*/
-static int
+static int32_t
nsr_recon_end_work(nsr_per_node_worker_t *ctx,
gf_boolean_t control)
{
@@ -648,13 +650,7 @@ control_worker_func_0(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_INFO,
"this message should not be sent \n");
- break;
- }
- case NSR_WORK_ID_END_RECONCILIATION:
- {
- nsr_worker_log(this->name, GF_LOG_INFO,
- "sending reconciliation end message to node %d\n", index);
- nsr_recon_return_back(priv, dr->txn_id);
+ ctx->result = -1;
break;
}
case NSR_WORK_ID_GET_RECONCILATION_WINDOW:
@@ -676,16 +672,29 @@ control_worker_func_0(nsr_per_node_worker_t *ctx,
rd = GF_CALLOC(num,
sizeof(nsr_recon_record_details_t),
gf_mt_recon_private_t);
+ if (rd == NULL) {
+ ctx->result = -1;
+ return;
+ }
+
recon_info->records = GF_CALLOC(num,
sizeof(nsr_reconciliation_record_t),
gf_mt_recon_private_t);
+ if (recon_info->records == NULL) {
+ ctx->result = -1;
+ return;
+ }
// TBD - handle errors
- nsr_recon_libchangelog_get_records(this, priv->changelog_base_path,
+ if (nsr_recon_libchangelog_get_records(this, priv->changelog_base_path,
recon_info->last_term,
recon_info->first_index,
recon_info->last_index,
- rd);
+ rd) == _gf_false) {
+ ctx->result = -1;
+ return;
+ }
+
// The above function writes into rd from 0 to (num -1)
// We need to take care of this whenever we deal with records
for (i=0; i < num; i++) {
@@ -778,7 +787,10 @@ control_worker_func(nsr_per_node_worker_t *ctx,
"calling nsr_recon_start_work\n");
// TBD - handle error in case nsr_recon_start_work gives error
- nsr_recon_start_work(ctx, _gf_true);
+ if (nsr_recon_start_work(ctx, _gf_true) != 0) {
+ ctx->result = -1;
+ return;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"finished nsr_recon_start_work\n");
@@ -790,7 +802,10 @@ control_worker_func(nsr_per_node_worker_t *ctx,
"calling nsr_recon_end_work\n");
// TBD - handle error in case nsr_recon_end_work gives error
- nsr_recon_end_work(ctx, _gf_true);
+ if (nsr_recon_end_work(ctx, _gf_true) != 0) {
+ ctx->result = -1;
+ return;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"finished nsr_recon_end_work\n");
@@ -807,9 +822,18 @@ control_worker_func(nsr_per_node_worker_t *ctx,
// first write the current term term number
// TBD - error handling for all the glfs APIs
- glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET);
- glfs_write(ctx->aux_fd, &term, sizeof(term), 0);
- glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0);
+ if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_4, SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl
recon_info->last_term = lt.last_term;
recon_info->commited_ops = lt.commited_ops;
@@ -834,9 +858,18 @@ control_worker_func(nsr_per_node_worker_t *ctx,
// first write the term number
// TBD - error handling for all the glfs APIs
- glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET);
- glfs_write(ctx->aux_fd, &term, sizeof(term), 0);
- glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0);
+ if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_3, SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_write(ctx->aux_fd, &term, sizeof(term), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
+ if (glfs_read(ctx->aux_fd, &lt, sizeof(lt), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
ENDIAN_CONVERSION_LT(lt, _gf_true); //ntohl
recon_info->last_term = lt.last_term;
recon_info->commited_ops = lt.commited_ops;
@@ -863,9 +896,12 @@ control_worker_func(nsr_per_node_worker_t *ctx,
"trying to make this index %d as reconciliator for term %d\n", index, term);
// TBD - error handling for all the glfs APIs
- glfs_lseek(ctx->aux_fd,
+ if (glfs_lseek(ctx->aux_fd,
nsr_recon_xlator_sector_1,
- SEEK_SET);
+ SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
// We have all the info for all other nodes.
// Fill all that info when sending data to that process.
@@ -888,7 +924,15 @@ control_worker_func(nsr_per_node_worker_t *ctx,
rr.num = num;
rr.role = reconciliator;
ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl
- glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0);
+ if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) {
+ ctx->result = -1;
+ // Record the errno only for this failure, since it is
+ // the only case for which we care about retrying.
+ // For all other failures we simply report EIO
+ // as the errno.
+ ctx->op_errno = errno;
+ return;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"sent reconciliator info for term %d with node count as %d\n", term, num);
@@ -905,14 +949,18 @@ control_worker_func(nsr_per_node_worker_t *ctx,
"trying to make this index %d as resolutor with reconciliator as %d\n",index, rec);
// TBD - error handling for all the glfs APIs
- glfs_lseek(ctx->aux_fd,
+ if (glfs_lseek(ctx->aux_fd,
nsr_recon_xlator_sector_1,
- SEEK_SET);
+ SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
+
rr.num = 2;
// Fill in info[0] as info for the node for which we are seeking resolution.
// Fill in info[1] as info of the reconciliator node.
- // The function nsr_recon_driver_set_role() that will be called when
+ // The function nsr_recon_driver_get_role() that will be called when
// this message reaches the node will look at index 1 for term information
// related to the reconciliator.
for (i=0; i < 2; i++) {
@@ -943,33 +991,21 @@ control_worker_func(nsr_per_node_worker_t *ctx,
}
rr.role = resolutor;
ENDIAN_CONVERSION_RR(rr, _gf_false); //htonl
- glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0);
+ if (glfs_write(ctx->aux_fd, &rr, sizeof(rr), 0) == -1) {
+ ctx->result = -1;
+ // Record the errno only for this failure, since it is
+ // the only case for which we care about retrying.
+ // For all other failures we simply report EIO
+ // as the errno.
+ ctx->op_errno = errno;
+ return;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"sent message to this node %d resolutor with reconciliator as %d\n", index, rec);
break;
}
- case NSR_WORK_ID_END_RECONCILIATION:
- {
- char c[4];
- uint32_t old = htonl(dr->txn_id);
-
- nsr_worker_log(this->name, GF_LOG_INFO,
- "sending reconciliation end message to node %d\n", index);
-
- memcpy(c, &old, sizeof(uint32_t));
- // TBD - error handling for all the glfs APIs
- glfs_lseek(ctx->aux_fd,
- nsr_recon_xlator_sector_0,
- SEEK_SET);
- glfs_write(ctx->aux_fd, c, sizeof(c), 0);
-
- nsr_worker_log(this->name, GF_LOG_INFO,
- "finished sending reconciliation end message to node %d\n", index);
-
- break;
- }
case NSR_WORK_ID_GET_RECONCILATION_WINDOW:
{
nsr_recon_log_info_t li;
@@ -985,24 +1021,42 @@ control_worker_func(nsr_per_node_worker_t *ctx,
// TBD - error handling for all the glfs APIs
- glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET);
+ if (glfs_lseek(ctx->aux_fd, nsr_recon_xlator_sector_2, SEEK_SET) == -1) {
+ ctx->result = -1;
+ return;
+ }
// write to node what term & indices we are interested
li.term = recon_info->last_term;
li.first_index = recon_info->first_index;
li.last_index = recon_info->last_index;
ENDIAN_CONVERSION_LI(li, _gf_false); //htonl
- glfs_write(ctx->aux_fd, &li, sizeof(li), 0);
+ if (glfs_write(ctx->aux_fd, &li, sizeof(li), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
// then read
rd = GF_CALLOC(num,
sizeof(nsr_recon_record_details_t),
gf_mt_recon_private_t);
+ if (rd == NULL) {
+ ctx->result = -1;
+ return;
+ }
recon_info->records = GF_CALLOC(num,
sizeof(nsr_reconciliation_record_t),
gf_mt_recon_private_t);
+ if (recon_info->records == NULL) {
+ ctx->result = -1;
+ return;
+ }
+
+ if (glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0) == -1) {
+ ctx->result = -1;
+ return;
+ }
- glfs_read(ctx->aux_fd, rd, num * sizeof(nsr_recon_record_details_t), 0);
for (i=0; i < num; i++) {
ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl
memcpy(&(recon_info->records[i].rec),
@@ -1132,7 +1186,7 @@ compute_reconciliation_work(nsr_recon_driver_ctx_t *ctx)
return;
}
-static void
+static int32_t
nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
uint32_t i,
gf_boolean_t in_use);
@@ -1145,14 +1199,14 @@ nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
gf_boolean_t
nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx,
nsr_recon_role_t *rr,
- uint32_t txn_id)
+ uint32_t term)
{
nsr_role_work_t *rw;
nsr_driver_log(this->name, GF_LOG_INFO, "set role called \n");
rw = GF_CALLOC(1, sizeof (nsr_role_work_t), 0);
memcpy(&rw->role, rr, sizeof(nsr_recon_role_t));
- rw->txn_id = txn_id;
+ rw->term = term;
INIT_LIST_HEAD(&(rw->list));
pthread_mutex_lock(&(ctx->mutex));
list_add_tail(&rw->list, &ctx->role_head.list);
@@ -1176,15 +1230,11 @@ nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx,
* If resolution to be done, then rr.info[0] will have this node's info
* which the leader would have got earlier. rr[1].info will have the
* info regarding the reconciliator.
- * txn_id - All role changes(except when leader becomes reconciliator or resolutor)
- * would be initiated as write to the recon xlator which would have got a frame from
- * either the brick process(leader change) or other reconciliation process.
- * The write function would return immediately after storing the frame which
- * needs to be returned back after the actual reconciliation is done.
- * For that we store the frame against this id which acts as a key.
+ * term - leader's term that is causing this role
*/
nsr_recon_driver_state_t
-nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
+nsr_recon_driver_get_role(int32_t *status,
+ nsr_recon_driver_ctx_t *ctx,
nsr_role_work_t *rw)
{
uint8_t i=0, j=0;
@@ -1192,17 +1242,24 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
// First make all the threads uninitialise
for (i = 0; i < ctx->replica_group_size; i++) {
- nsr_recon_in_use(ctx, i, _gf_false);
+ if (nsr_recon_in_use(ctx, i, _gf_false) == -1) {
+ *status = -1;
+ return 0;
+ }
}
if ((rr->role == leader) || (rr->role == joiner)) {
// First set info this node
- nsr_recon_in_use(ctx, 0, _gf_true);
+ if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
ctx->workers[0].recon_info = GF_CALLOC (1,
sizeof (nsr_reconciliator_info_t),
gf_mt_recon_private_t);
if (!ctx->workers[0].recon_info) {
- return _gf_false;
+ *status = -1;
+ return 0;
}
ctx->current_term = rr->current_term;
@@ -1213,11 +1270,14 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
if (!strcmp(ctx->workers[i].name, rr->info[j].name)) {
//if (strstr(ctx->workers[i].name, rr->info[j].name)) {
nsr_driver_log(this->name, GF_LOG_INFO,
- "nsr_recon_driver_set_role: this as %s. found other server %s\n",
+ "nsr_recon_driver_get_role: this as %s. found other server %s\n",
(rr->role == leader) ? "leader" : "joiner",
ctx->workers[i].name);
- nsr_recon_in_use(ctx, i, _gf_true);
+ if (nsr_recon_in_use(ctx, i, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
// Allocate this here. This will get later filled when
// the leader tries to get last term information from all
// the nodes
@@ -1225,7 +1285,8 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
sizeof (nsr_reconciliator_info_t),
gf_mt_recon_private_t);
if (!ctx->workers[i].recon_info) {
- return _gf_false;
+ *status = -1;
+ return 0;
}
break;
}
@@ -1245,13 +1306,14 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
if (!strcmp(rr->info[i].name, ctx->workers[j].name)) {
//if (strstr(ctx->workers[j].name, rr->info[i].name)) {
nsr_driver_log(this->name, GF_LOG_INFO,
- "nsr_recon_driver_set_role: this as reconciliator. found other server %s\n",
+ "nsr_recon_driver_get_role: this as reconciliator. found other server %s\n",
ctx->workers[j].name);
ctx->workers[j].recon_info = GF_CALLOC (1,
sizeof (nsr_reconciliator_info_t),
gf_mt_recon_private_t);
if (!ctx->workers[j].recon_info) {
- return _gf_false;
+ *status = -1;
+ return 0;
}
ctx->workers[j].recon_info->last_term =
rr->info[i].last_term;
@@ -1261,7 +1323,10 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
rr->info[i].last_index;
ctx->workers[j].recon_info->first_index =
rr->info[i].first_index;
- nsr_recon_in_use(ctx, j, _gf_true);
+ if (nsr_recon_in_use(ctx, j, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
break;
}
}
@@ -1272,13 +1337,14 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
//if (strstr(ctx->workers[j].name, rr->info[1].name)) {
if (!strcmp(rr->info[1].name, ctx->workers[j].name)) {
nsr_driver_log(this->name, GF_LOG_INFO,
- "nsr_recon_driver_set_role: this as resolutor. found other server %s as reconciliator\n",
- ctx->workers[1].name);
+ "nsr_recon_driver_get_role: this as resolutor. found other server %s as reconciliator\n",
+ ctx->workers[j].name);
ctx->workers[j].recon_info = GF_CALLOC (1,
sizeof (nsr_reconciliator_info_t),
gf_mt_recon_private_t);
if (!ctx->workers[j].recon_info) {
- return _gf_false;
+ *status = -1;
+ return 0;
}
ctx->workers[j].recon_info->last_term =
rr->info[1].last_term;
@@ -1289,7 +1355,10 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
ctx->workers[j].recon_info->first_index =
rr->info[1].first_index;
ctx->reconciliator_index = j;
- nsr_recon_in_use(ctx, j, _gf_true);
+ if (nsr_recon_in_use(ctx, j, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
GF_ASSERT(ctx->reconciliator_index != 0);
break;
}
@@ -1298,18 +1367,23 @@ nsr_recon_driver_get_role(nsr_recon_driver_ctx_t *ctx,
sizeof (nsr_reconciliator_info_t),
gf_mt_recon_private_t);
if (!ctx->workers[0].recon_info) {
- return _gf_false;
+ *status = -1;
+ return 0;
}
// info[0] has all info for this node
ctx->workers[0].recon_info->last_term = rr->info[0].last_term;
ctx->workers[0].recon_info->commited_ops = rr->info[0].commited_ops;
ctx->workers[0].recon_info->last_index = rr->info[0].last_index;
ctx->workers[0].recon_info->first_index = rr->info[0].first_index;
- nsr_recon_in_use(ctx, 0, _gf_true);
+ if (nsr_recon_in_use(ctx, 0, _gf_true) == -1) {
+ *status = -1;
+ return 0;
+ }
}
- ctx->txn_id = rw->txn_id;
+ ctx->term = rw->term;
+ *status = 0;
return rr->role;
}
@@ -1401,7 +1475,7 @@ create_obj(nsr_per_node_worker_t *ctx, char *gfid_str)
* and the changelog translator consumes term and index.
*/
-static void
+static gf_boolean_t
apply_record(nsr_per_node_worker_t *ctx,
nsr_reconciliation_record_t *ri,
dict_t * dict)
@@ -1420,10 +1494,11 @@ apply_record(nsr_per_node_worker_t *ctx,
// TBD - get a way to just stuff the log entry without writing the data so that
// changelogs remain identical.
if (ri->work.data == NULL) {
- return;
+ return _gf_true;
}
- if ((obj = create_obj(ctx,ri->rec.gfid)) == NULL) return;
+ if ((obj = create_obj(ctx,ri->rec.gfid)) == NULL)
+ return _gf_false;;
fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict);
if (fd == NULL) {
@@ -1431,23 +1506,28 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"open for file %s failed\n",
ri->rec.gfid);
- return;
+ return _gf_false;
}
if (glfs_lseek_with_xdata(fd, ri->rec.offset, SEEK_SET, dict) != ri->rec.offset) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"lseek for file %s failed at offset %d\n",
ri->rec.gfid, ri->rec.offset);
- return;
+ return _gf_false;
}
if (glfs_write_with_xdata(fd, ri->work.data, ri->rec.len, 0, dict) != ri->rec.len) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"write for file %s failed for bytes %d\n",
ri->rec.gfid, ri->rec.len);
- return;
+ return _gf_false;
}
- glfs_close_with_xdata(fd, dict);
+ if (glfs_close_with_xdata(fd, dict) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "close failed\n");
+ return _gf_false;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"Finished DOing write for gfid %s @offset %d for len %d\n",
@@ -1459,7 +1539,7 @@ apply_record(nsr_per_node_worker_t *ctx,
"DOing truncate for file %s @offset %d \n",
ri->rec.gfid, ri->rec.offset);
- if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDWR, dict);
if (fd == NULL) {
@@ -1467,16 +1547,21 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"open for file %s failed\n",
ri->rec.gfid);
- return;
+ return _gf_false;
}
if (glfs_ftruncate_with_xdata(fd, ri->rec.offset, dict) == -1) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"trunctae for file %s failed @offset %d\n",
ri->rec.gfid,ri->rec.offset );
- return;
+ return _gf_false;
}
- glfs_close_with_xdata(fd, dict);
+ if (glfs_close_with_xdata(fd, dict) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "close failed\n");
+ return _gf_false;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"Finished DOing truncate for gfid %s @offset %d \n",
@@ -1499,10 +1584,10 @@ apply_record(nsr_per_node_worker_t *ctx,
// TBD - get a way to just stuff the log entry without writing the data so that
// changelogs remain identical.
if (ri->work.data == NULL) {
- return;
+ return _gf_true;
}
- if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
if (obj->inode->ia_type == IA_IFDIR)
fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict);
@@ -1513,7 +1598,7 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"open for file %s failed\n",
ri->rec.gfid);
- return;
+ return _gf_false;
}
if(get_xattr_total_size(fd, &t_b, &k_s, &v_s, &num, dict) == -1) {
@@ -1521,10 +1606,15 @@ apply_record(nsr_per_node_worker_t *ctx,
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"list of xattr of %s failed\n", ri->rec.gfid);
- return;
+ return _gf_false;
}
- delete_xattr(fd, dict, t_b, num);
+ if (delete_xattr(fd, dict, t_b, num) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "deleting xattrs failed\n");
+ return _gf_false;
+ }
// Set one special dict flag to indicate the opcode so that
// the opcode gets set to this
@@ -1532,12 +1622,22 @@ apply_record(nsr_per_node_worker_t *ctx,
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"setting opcode to %d failed\n",ri->rec.op);
- return;
+ return _gf_false;
}
- fill_xattr(fd, dict, ri->work.data, ri->work.num);
+ if (fill_xattr(fd, dict, ri->work.data, ri->work.num) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "filling xattrs failed\n");
+ return _gf_false;
+ }
- glfs_close_with_xdata(fd, dict);
+ if (glfs_close_with_xdata(fd, dict) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "close failed\n");
+ return _gf_false;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"Finsihed Doing set extended attr for %s \n",
@@ -1553,7 +1653,7 @@ apply_record(nsr_per_node_worker_t *ctx,
// TBD - add mode and flags later
uuid_parse(ri->rec.gfid, gfid);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
nsr_worker_log (this->name, GF_LOG_INFO,
"creating with mode 0%o", ri->rec.mode);
@@ -1562,7 +1662,7 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failure for Doing create for file %s\n",
ri->rec.entry);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1579,14 +1679,14 @@ apply_record(nsr_per_node_worker_t *ctx,
// TBD - add mode and flags later
uuid_parse(ri->rec.gfid, gfid);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
if (glfs_h_mknod_with_xdata(ctx->fs, obj, ri->rec.entry, O_RDWR, 0777, NULL, gfid, dict) == NULL) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failure for Doing mknod for file %s\n",
ri->rec.entry);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1603,14 +1703,14 @@ apply_record(nsr_per_node_worker_t *ctx,
// TBD - add mode and flags later
uuid_parse(ri->rec.gfid, gfid);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
if (glfs_h_mkdir_with_xdata(ctx->fs, obj, ri->rec.entry, 0777, NULL, gfid, dict) != 0) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failure for Doing mkdir for file %s\n",
ri->rec.entry);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1623,13 +1723,13 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing rmdir/ublink for dir %s \n",
ri->rec.entry);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
if (glfs_h_unlink_with_xdata(ctx->fs, obj, ri->rec.entry, dict) != 0) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failure for Doing rmdir/unlink for file %s\n",
ri->rec.entry);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1644,7 +1744,7 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing symlink for file %s to file %s \n",
ri->rec.entry, ri->rec.link_path);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
uuid_parse(ri->rec.gfid, gfid);
if (glfs_h_symlink_with_xdata(ctx->fs, obj, ri->rec.entry, ri->rec.link_path, NULL, gfid, dict) == NULL) {
@@ -1652,7 +1752,7 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failed to Doing symlink for file %s to file %s \n",
ri->rec.entry, ri->rec.link_path);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1667,15 +1767,15 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing hard link for file %s to file %s \n",
ri->rec.entry, ri->rec.gfid);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
- if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
+ if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
if (glfs_h_link_with_xdata(ctx->fs, to_obj, obj, ri->rec.entry, dict) == -1) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failed to Doing hard link for file %s to file %s \n",
ri->rec.entry, ri->rec.gfid);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1690,15 +1790,15 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing rename for file %s to file %s \n",
ri->rec.entry, ri->rec.newloc);
- if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return;
- if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.pargfid)) == NULL) return _gf_false;
+ if ((to_obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
if (glfs_h_rename_with_xdata(ctx->fs, obj, ri->rec.entry, to_obj, ri->rec.newloc, dict) == -1) {
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"Failed to Doing rename for file %s to file %s \n",
ri->rec.entry, ri->rec.newloc);
- return;
+ return _gf_false;
}
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -1719,7 +1819,7 @@ apply_record(nsr_per_node_worker_t *ctx,
"Doing attr for file %s \n",
ri->rec.gfid);
- if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return;
+ if ((obj = create_obj(ctx, ri->rec.gfid)) == NULL) return _gf_false;
if (obj->inode->ia_type == IA_IFDIR)
fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict);
@@ -1730,7 +1830,7 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_ERROR,
"open for file %s failed\n",
ri->rec.gfid);
- return;
+ return _gf_false;
}
iatt.ia_prot = ia_prot_from_st_mode(777);
@@ -1743,7 +1843,7 @@ apply_record(nsr_per_node_worker_t *ctx,
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"setting opcode to %d failed\n",ri->rec.op);
- return;
+ return _gf_false;
}
ret = glfs_fsetattr_with_xdata(fd, &iatt, valid, dict);
@@ -1752,17 +1852,22 @@ apply_record(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_INFO,
"failed Doing attr for file %s \n",
ri->rec.gfid);
- return;
+ return _gf_false;
}
- glfs_close_with_xdata(fd, dict);
+ if (glfs_close_with_xdata(fd, dict) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "close failed\n");
+ return _gf_false;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"Doing attr for file %s \n",
ri->rec.gfid);
}
- return;
+ return _gf_true;
}
//return back opcodes that requires reading from source
@@ -1816,7 +1921,10 @@ data_worker_func(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_INFO,
"started data ini \n");
- nsr_recon_start_work(ctx, _gf_false);
+ if (nsr_recon_start_work(ctx, _gf_false) != 0) {
+ ctx->result = -1;
+ return;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"finished data ini \n");
@@ -1825,7 +1933,10 @@ data_worker_func(nsr_per_node_worker_t *ctx,
nsr_worker_log(this->name, GF_LOG_INFO,
"started data fini \n");
- nsr_recon_end_work(ctx, _gf_false);
+ if (nsr_recon_end_work(ctx, _gf_false) != 0) {
+ ctx->result = -1;
+ return;
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"finished data fini \n");
@@ -1838,18 +1949,21 @@ data_worker_func(nsr_per_node_worker_t *ctx,
dict = dict_new ();
if (!dict) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"failed allocating for dictionary\n");
break;
}
if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"error setting term in dict\n");
break;
}
if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"error setting term in dict\n");
@@ -1978,7 +2092,12 @@ data_worker_func(nsr_per_node_worker_t *ctx,
}
ri->work.data = GF_CALLOC ((k_s + v_s) , sizeof(char),
gf_mt_recon_private_t);
- get_xattr(fd, t_b, ri->work.data, v_s, num, dict);
+ if (get_xattr(fd, t_b, ri->work.data, v_s, num, dict) == -1) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "get xattr of gfid %s failed\n", rd->gfid);
+ break;
+ }
ri->work.num = num;
nsr_worker_log(this->name, GF_LOG_INFO,
"finished getattr for gfid %s \n",
@@ -2014,24 +2133,30 @@ data_worker_func(nsr_per_node_worker_t *ctx,
wip, rd->gfid);
dict = dict_new ();
if (!dict) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"failed allocating for dictionary\n");
break;
}
if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"error setting term in dict\n");
break;
}
if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"error setting term in dict\n");
break;
}
- apply_record(ctx, ri, dict);
+ if (apply_record(ctx, ri, dict) == _gf_false) {
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "apply_record fails\n");
+ }
nsr_worker_log(this->name, GF_LOG_INFO,
"finished recon commit for gfid %s \n",
@@ -2041,18 +2166,21 @@ data_worker_func(nsr_per_node_worker_t *ctx,
case NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH:
dict = dict_new ();
if (!dict) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"failed allocating for dictionary\n");
break;
}
if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"error setting term in dict\n");
break;
}
if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+ ctx->result = -1;
GF_ASSERT(0);
nsr_worker_log(this->name, GF_LOG_ERROR,
"error setting term in dict\n");
@@ -2198,7 +2326,9 @@ create_worker_threads(nsr_recon_private_t *priv,
* misc - used to overload such as index.
*/
static void
-send_and_wait(int32_t bm,
+send_and_wait(int32_t *result,
+ int32_t *op_errno,
+ int32_t bm,
uint32_t num,
nsr_recon_driver_ctx_t *ctx,
nsr_recon_work_req_id_t id,
@@ -2208,6 +2338,18 @@ send_and_wait(int32_t bm,
uint32_t i = 0;
nsr_recon_work_t *work;
+#define CONTROL_WORKER(i) ctx->workers[i].control_worker
+#define DATA_WORKER(i) ctx->workers[i].data_worker
+#define WORKER(i) ((q == NSR_RECON_QUEUE_TO_CONTROL) ? (CONTROL_WORKER(i)) : (DATA_WORKER(i)))
+
+ *result = *op_errno = 0;
+
+ for (i=0; i < num; i++) {
+ if ((bm & (1 << i)) && ctx->workers[i].in_use) {
+ WORKER(i)->result = 0;
+ WORKER(i)->op_errno = 0;
+ }
+ }
if (ctx->mode == NSR_SEQ) {
for (i=0; i < num; i++) {
if ((bm & (1 << i)) && ctx->workers[i].in_use) {
@@ -2222,8 +2364,7 @@ send_and_wait(int32_t bm,
}
}
}
- nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned\n");
- return;
+ goto out;
}
for (i=0; i < num; i++) {
@@ -2238,41 +2379,40 @@ send_and_wait(int32_t bm,
while (ctx->outstanding) {
pthread_yield();
}
- nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned\n");
- return;
-}
-
-#if 0
-static void
-send_and_do_not_wait(int32_t bm,
- uint32_t num,
- nsr_recon_driver_ctx_t *ctx,
- nsr_recon_work_req_id_t id,
- nsr_recon_queue_type_t q,
- int32_t misc)
-{
- uint32_t i = 0;
-
- for (i=0; i < num; i++) {
+out:
+ for (i=0; i < num; i++) {
if ((bm & (1 << i)) && ctx->workers[i].in_use) {
- nsr_recon_work_t *work;
- recon_make_work(&work, id, misc);
- recon_queue_to_worker(ctx, work, i, q);
- }
- }
+ if (WORKER(i)->result == -1) {
+ *result = -1;
+ }
+ }
+ }
+ if (*result == -1) {
+ for (i=0; i < num; i++) {
+ if ((bm & (1 << i)) && ctx->workers[i].in_use) {
+ if (WORKER(i)->op_errno == EAGAIN) {
+ *op_errno = EAGAIN;
+ break;
+ } else {
+ *op_errno = EIO;
+ }
+ }
+ }
+ }
+ nsr_driver_log(this->name, GF_LOG_INFO, "send_and_wait: all workers have returned with result: %d errno:%d\n", *result, *op_errno);
return;
}
-#endif
// send INI or FINI
-static void
+static int32_t
nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
uint32_t i,
gf_boolean_t in_use)
{
uint32_t bm = 1 << i;
gf_boolean_t send = _gf_false;
+ int32_t status =0, op_errno = 0;
if (in_use == _gf_false) {
if (ctx->workers[i].in_use == _gf_true)
@@ -2284,25 +2424,28 @@ nsr_recon_in_use(nsr_recon_driver_ctx_t *ctx,
send = _gf_true;
}
}
-#if 1
if (send == _gf_true) {
if (in_use == _gf_true) {
nsr_driver_log(this->name, GF_LOG_INFO, "sending INI to index %d\n",i);
} else {
nsr_driver_log(this->name, GF_LOG_INFO, "sending FINI to index %d\n",i);
}
- send_and_wait(bm, ctx->replica_group_size, ctx,
+ send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx,
(in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI,
NSR_RECON_QUEUE_TO_CONTROL, -1);
- send_and_wait(bm, ctx->replica_group_size, ctx,
+ if (status == -1)
+ return -1;
+ send_and_wait(&status, &op_errno, bm, ctx->replica_group_size, ctx,
(in_use == _gf_true) ? NSR_WORK_ID_INI : NSR_WORK_ID_FINI,
NSR_RECON_QUEUE_TO_DATA, -1);
+ if (status == -1)
+ return -1;
if (in_use == _gf_false) {
//GF_FREE(ctx->workers[i].recon_info->records);
GF_FREE(ctx->workers[i].recon_info);
}
}
-#endif
+ return 0;
}
// main recon driver thread
@@ -2317,6 +2460,8 @@ nsr_reconciliation_driver(void *arg)
int32_t bm;
xlator_t *this = priv->this;
char *con_name, *data_name;
+ int32_t status = 0;
+ int32_t op_errno = 0;
driver_ctx = &priv->driver_thread_context;
(*driver_ctx) = GF_CALLOC (1,
@@ -2414,7 +2559,12 @@ nsr_reconciliation_driver(void *arg)
list_for_each_entry(rr, &(ctx->role_head.list), list) {
nsr_recon_driver_state_t state;
- state = nsr_recon_driver_get_role(ctx, rr);
+ state = nsr_recon_driver_get_role(&status, ctx, rr);
+
+ if (status == -1) {
+ op_errno = EIO;
+ goto out;
+ }
if (state == leader) {
@@ -2423,11 +2573,13 @@ nsr_reconciliation_driver(void *arg)
nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n");
// Get last term info from all members for this group
- send_and_wait(-1,
+ send_and_wait(&status, &op_errno, -1,
replica_group_size,
ctx,
NSR_WORK_ID_GET_LAST_TERM_INFO,
NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term);
+ if (status == -1)
+ goto out;
// compare all the info received and choose the reconciliator
@@ -2469,11 +2621,13 @@ nsr_reconciliation_driver(void *arg)
if (chosen != 0) {
nsr_driver_log (this->name, GF_LOG_INFO, "sending reconciliation work to %d\n", chosen);
bm = 1 << ctx->reconciliator_index;
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_RECONCILIATOR_DO_WORK,
NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work to %d\n", chosen);
} else {
nsr_driver_log (this->name, GF_LOG_INFO, "local node is reconciliator. before set jmp\n");
@@ -2517,11 +2671,13 @@ nsr_reconciliation_driver(void *arg)
nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this node and reconciliator\n");
- bm = ~((1 << ctx->reconciliator_index) || 1);
+ bm = ~((1 << ctx->reconciliator_index) | 1); // bitwise OR: clear both the reconciliator bit and bit 0 (this node)
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_RESOLUTION_DO_WORK,
NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO, "finished reconciliation work as leader \n");
@@ -2564,11 +2720,13 @@ i_am_reconciliator:
// We have set the bm in the above for loop where
// we go thru all nodes including this node that
// have seen the last term.
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_GET_RECONCILATION_WINDOW,
NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished getting reconciliation window for term %d from %dto %d \n",
ctx->workers[0].recon_info->last_term,
@@ -2604,12 +2762,14 @@ i_am_reconciliator:
bm = (1 << record->work.source);
nsr_driver_log (this->name, GF_LOG_INFO,
"reading data from source %d\n",record->work.source);
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_SINGLE_RECONCILIATION_READ,
NSR_RECON_QUEUE_TO_DATA,
i);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"got data from source %d\n",record->work.source);
}
@@ -2618,12 +2778,14 @@ i_am_reconciliator:
"fixing local data as part of reconciliation\n");
bm = 1;
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT,
NSR_RECON_QUEUE_TO_DATA,
i);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished fixing local data as part of reconciliation\n");
@@ -2634,12 +2796,14 @@ i_am_reconciliator:
nsr_driver_log (this->name, GF_LOG_INFO,
"fixing this record as a fill\n");
bm = 1;
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH,
NSR_RECON_QUEUE_TO_DATA,
i);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished fixing this record as a fill\n");
}
@@ -2691,11 +2855,13 @@ i_am_resolutor:
nsr_driver_log (this->name, GF_LOG_INFO,
"getting info from reconciliator for term %d \n", my_info->last_term);
bm = (1 << recon_index);
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_GET_GIVEN_TERM_INFO,
NSR_RECON_QUEUE_TO_CONTROL, his_info->last_term);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished getting info from reconciliator for term %d \n", my_info->last_term);
@@ -2739,11 +2905,13 @@ i_am_resolutor:
"getting reconciliation window for term %d from %d to %d \n",
my_info->last_term,
my_info->first_index, my_info->last_index);
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_GET_RECONCILATION_WINDOW,
NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished getting reconciliation window for term %d from %d to %d \n",
my_info->last_term,
@@ -2771,12 +2939,14 @@ i_am_resolutor:
nsr_driver_log (this->name, GF_LOG_INFO,
"reading data from source %d\n",recon_index);
bm = (1 << recon_index);
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_SINGLE_RECONCILIATION_READ,
NSR_RECON_QUEUE_TO_DATA,
i);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished reading data from source %d\n",recon_index);
}
@@ -2785,12 +2955,14 @@ i_am_resolutor:
"fixing local data as part of resolutor\n");
bm = 1;
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT,
NSR_RECON_QUEUE_TO_DATA,
i);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished fixing local data as part of resolutor\n");
@@ -2816,21 +2988,25 @@ i_am_resolutor:
nsr_driver_log (this->name, GF_LOG_INFO, "getting last term info from all members of this group\n");
// Get last term info from all members for this group
// which will be the leader(this node) and the node that wants to join.
- send_and_wait(-1,
+ send_and_wait(&status, &op_errno, -1,
replica_group_size,
ctx,
NSR_WORK_ID_GET_LAST_TERM_INFO,
NSR_RECON_QUEUE_TO_CONTROL, ctx->current_term);
+ if (status == -1)
+ goto out;
// send message to other node that just joined to sync up with this node which is also the leader
nsr_driver_log (this->name, GF_LOG_INFO, "sending resolution work to all nodes except this\n");
bm = ~(1);
- send_and_wait(bm,
+ send_and_wait(&status, &op_errno, bm,
replica_group_size,
ctx,
NSR_WORK_ID_RESOLUTION_DO_WORK,
NSR_RECON_QUEUE_TO_CONTROL, -1);
+ if (status == -1)
+ goto out;
nsr_driver_log (this->name, GF_LOG_INFO,
"finished recon work as joiner \n");
@@ -2843,16 +3019,7 @@ i_am_resolutor:
out:
nsr_driver_log (this->name, GF_LOG_INFO,
"sending end of reconciliation message \n");
- nsr_recon_return_back(priv, ctx->txn_id);
-#if 0
- // send message that job is done by writing to local recon translator
- bm = 1;
- send_and_wait(bm,
- replica_group_size,
- ctx,
- NSR_WORK_ID_END_RECONCILIATION,
- NSR_RECON_QUEUE_TO_CONTROL, -1);
-#endif
+ nsr_recon_return_back(priv, ctx->term, status, op_errno);
nsr_driver_log (this->name, GF_LOG_INFO,
"finished sending end of reconciliation message \n");
}
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.h b/xlators/cluster/nsr-recon/src/recon_driver.h
index 4030c9d73..8d87e29af 100644
--- a/xlators/cluster/nsr-recon/src/recon_driver.h
+++ b/xlators/cluster/nsr-recon/src/recon_driver.h
@@ -181,7 +181,7 @@ typedef struct nsr_recon_record_details_s {
typedef struct _nsr_role_work_s {
nsr_recon_role_t role;
- uint32_t txn_id;
+ uint32_t term;
struct list_head list;
} nsr_role_work_t;
@@ -236,6 +236,8 @@ typedef struct _nsr_per_node_worker_s {
#if defined(NSR_DEBUG)
FILE *fp;
#endif
+ int32_t result; // result of latest work
+ int32_t op_errno; // errno
} nsr_per_node_worker_t;
typedef struct _nsr_replica_worker_s {
@@ -256,7 +258,7 @@ typedef struct _nsr_recon_driver_ctxt {
nsr_role_work_t role_head;
volatile int32_t outstanding;
uint32_t reconciliator_index;
- uint32_t txn_id;
+ uint32_t term;
uint32_t current_term;
jmp_buf *env;
nsr_mode_t mode; // default set to seq
@@ -269,7 +271,7 @@ void *
nsr_reconciliation_driver(void *);
gf_boolean_t
-nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, nsr_recon_role_t *rr, uint32_t txn_id);
+nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx, nsr_recon_role_t *rr, uint32_t term);
#define atomic_inc(ptr) ((void) __sync_fetch_and_add(ptr, 1))
#define atomic_dec(ptr) ((void) __sync_fetch_and_add(ptr, -1))
diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.c b/xlators/cluster/nsr-recon/src/recon_xlator.c
index c3c8d4d55..868377bd2 100644
--- a/xlators/cluster/nsr-recon/src/recon_xlator.c
+++ b/xlators/cluster/nsr-recon/src/recon_xlator.c
@@ -32,13 +32,6 @@ typedef struct _nsr_recon_fd_s {
call_frame_t *frame;
} nsr_recon_fd_t;
-
-typedef struct _nsr_txn_id_s {
- uint32_t txn_id;
- call_frame_t *frame;
- struct list_head list;
-} nsr_txn_id_t;
-
#if defined(NSR_DEBUG)
void
@@ -81,37 +74,34 @@ static int32_t this_fd_ctx_get(fd_t *fd, xlator_t *this, nsr_recon_fd_t **rfd)
}
}
-// Add the frame in q after associating with txn_id
+// Remember the frame of the role request being serviced (single slot, priv->frame).
+// term is only logged for now; further usage tbd
static void put_frame(nsr_recon_private_t *priv,
call_frame_t *frame,
- uint32_t txn_id)
+ uint32_t term)
{
- xlator_t *this = priv->this;
- nsr_txn_id_t * tid = GF_CALLOC(1, sizeof(nsr_txn_id_t), gf_mt_recon_private_t);
- tid->txn_id = txn_id;
- tid->frame = frame;
- INIT_LIST_HEAD(&(tid->list));
- list_add_tail(&(tid->list), &(priv->list));
- recon_main_log (this->name, GF_LOG_INFO, "adding framef or txn id %d into queue \n", txn_id);
+ xlator_t *this = priv->this;
+ recon_main_log (this->name, GF_LOG_INFO, "adding frame for term %d \n", term);
+ priv->frame = frame;
+ return;
}
-// get the frame from the queue given the txn id
+// Hand back the stored frame (when the caller passes a non-NULL out pointer) and clear the slot.
+// term is currently unused; usage tbd
static void get_frame(nsr_recon_private_t *priv,
call_frame_t **frame,
- uint32_t txn_id)
+ uint32_t term)
{
- nsr_txn_id_t *tid = NULL;
- xlator_t *this = priv->this;
+ if (frame != NULL)
+ *frame = priv->frame;
+ priv->frame = NULL;
+ return;
+}
- list_for_each_entry(tid, &(priv->list), list) {
- if (tid->txn_id == txn_id) {
- *frame = tid->frame;
- recon_main_log (this->name, GF_LOG_INFO, "got frame for txn id %d into queue \n", txn_id);
- return;
- }
- }
- recon_main_log (this->name, GF_LOG_INFO, "got no frame for txn id %d into queue \n", txn_id);
- GF_ASSERT(0);
+// check if there are outstanding frames
+static gf_boolean_t is_frame(nsr_recon_private_t *priv)
+{
+ return((priv->frame != NULL) ? _gf_true : _gf_false);
}
#define ENTRY_SIZE 128
@@ -215,19 +205,17 @@ void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t
return;
}
-// Return back the frame stored against the txn_id
-void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t txn_id)
+// Return back the frame stored against the term
+void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term, int32_t status, int32_t op_errno)
{
call_frame_t *old_frame = NULL;
xlator_t *this = priv->this;
- int32_t op_ret = 0;
- int32_t op_errno = 0;
- get_frame(priv, &old_frame, txn_id);
+ get_frame(priv, &old_frame, term);
if (old_frame) {
recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev returns old frame \n");
// first return the original write for which this ack was sent
- STACK_UNWIND_STRICT (writev, old_frame, op_ret, op_errno, NULL, NULL, NULL);
+ STACK_UNWIND_STRICT (writev, old_frame, status, op_errno, NULL, NULL, NULL);
} else {
recon_main_log (this->name, GF_LOG_ERROR, "EIII---nsr_recon_writev cnnot return old frame \n");
}
@@ -289,7 +277,7 @@ get_link_using_gfid(nsr_recon_private_t *priv, char *gfid, char *path)
//
// Really, 90% of this code should just GO AWAY in favor of using
// libgfchangelog, enhanced as necessary to support our needs.
-void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf)
+gf_boolean_t nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf)
{
// do a mmap; seek into the first and read all records till last.
// TBD - right now all records are pseudo holes but mark them as fills.
@@ -307,7 +295,9 @@ void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term,
sprintf(path,"%s/%s%d",bp,"TERM.",term);
fd = open(path, O_RDONLY);
- if (fd != -1) {
+ if (fd == -1) {
+ return _gf_false;
+ } else {
char *start = NULL;
nsr_recon_record_details_t * rec = (nsr_recon_record_details_t *)buf;
@@ -315,7 +305,9 @@ void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term,
lseek(fd, 128, SEEK_SET);
else
lseek(fd, first * 128, SEEK_SET);
- read(fd, rb, (last - first + 1) * 128);
+ if (read(fd, rb, (last - first + 1) * 128) == -1) {
+ return _gf_false;
+ }
start = rb;
index = first;
do {
@@ -532,7 +524,7 @@ finish:
recon_main_log (this->name, GF_LOG_INFO,
"libchangelog_get_records finsihed inspecting records for term %d \n",
term);
- return;
+ return _gf_true;
}
int32_t
@@ -580,20 +572,6 @@ nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called for offset %d \n",(unsigned int)offset );
GF_ASSERT(count == 1);
switch (offset) {
- // gets called to return back
- case nsr_recon_xlator_sector_0:
- {
- char c[4];
- uint32_t txn_id;
-
- recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev clled to return back \n");
- memcpy((void *)c, (void *)vector[0].iov_base, 4);
- txn_id = ntohl(atoi(c));
- nsr_recon_return_back(priv, txn_id);
- STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno,
- NULL, NULL, NULL);
- break;
- }
// client(brick, leader) writes the role of the node
case nsr_recon_xlator_sector_1 :
{
@@ -614,22 +592,33 @@ nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
GF_ASSERT(rr.num <= MAXIMUM_REPLICA_STRENGTH);
- // Store the stack frame so that when the actual job gets finished
- // we send the response back to the brick.
- if (nsr_recon_driver_set_role(priv->driver_thread_context,
- &rr,
- priv->txn_id) == _gf_false) {
+ // Check if already a role play is going on. If yes return with EAGAIN.
+ // Ideally we should check if we have got a higher term number while
+ // servicing a lower term number; if so abort the older one.
+ // However the abort infrastructure needs to be sketched properly; TBD.
+ if (is_frame(priv) == _gf_true) {
recon_main_log (this->name, GF_LOG_ERROR,
- "nsr_recon_writev set_role - cannot seem to set role \n");
- STACK_UNWIND_STRICT (writev, frame, -1, op_errno,
+ "nsr_recon_writev set_role - already role play \n");
+ STACK_UNWIND_STRICT (writev, frame, -1, EAGAIN,
NULL, NULL, NULL);
- } else {
- uint32_t old = priv->txn_id;
- atomic_cmpxchg(&priv->txn_id, old,old+1);
- put_frame(priv, frame, old);
- recon_main_log (this->name, GF_LOG_INFO,
- "nsr_recon_writev set_role - set role succesfully \n");
- }
+ } else {
+
+ // Store the stack frame so that when the actual job gets finished
+ // we send the response back to the brick.
+ put_frame(priv, frame, rr.current_term);
+ if (nsr_recon_driver_set_role(priv->driver_thread_context,
+ &rr,
+ rr.current_term) == _gf_false) {
+ get_frame(priv, NULL, rr.current_term);
+ recon_main_log (this->name, GF_LOG_ERROR,
+ "nsr_recon_writev set_role - cannot seem to set role \n");
+ STACK_UNWIND_STRICT (writev, frame, -1, op_errno,
+ NULL, NULL, NULL);
+ } else {
+ recon_main_log (this->name, GF_LOG_INFO,
+ "nsr_recon_writev set_role - set role succesfully \n");
+ }
+ }
break;
}
// client(reconciliator) writes how much it needs for the read
@@ -679,6 +668,14 @@ nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
NULL, NULL, NULL);
break;
}
+ default:
+ {
+ recon_main_log (this->name, GF_LOG_ERROR,
+ "nsr_recon_writev called with wrong offset\n");
+ STACK_UNWIND_STRICT (writev, frame, -1, op_errno,
+ NULL, NULL, NULL);
+ break;
+ }
}
return 0;
@@ -764,6 +761,13 @@ nsr_recon_readv (call_frame_t *frame, xlator_t *this,
memcpy(iobuf->ptr, &lt, size);
goto out;
}
+ default:
+ {
+ recon_main_log (this->name, GF_LOG_ERROR,
+ "nsr_recon_readv called with wrong offset\n");
+ op_errno = -1;
+ break;
+ }
}
out:
diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.h b/xlators/cluster/nsr-recon/src/recon_xlator.h
index 8c48f6ff6..c92489db1 100644
--- a/xlators/cluster/nsr-recon/src/recon_xlator.h
+++ b/xlators/cluster/nsr-recon/src/recon_xlator.h
@@ -70,8 +70,8 @@ _recon_main_log (const char *func, int line, char *member, FILE *fp,
void nsr_recon_libchangelog_get_this_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt);
void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt);
-void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term_id);
-void nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf);
+void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term, int32_t status, int32_t op_errno);
+gf_boolean_t nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf);
#endif /* #ifndef __RECON_XLATOR_H__ */
diff --git a/xlators/cluster/nsr-server/src/recon_notify.c b/xlators/cluster/nsr-server/src/recon_notify.c
index 7a0de85b1..7397192ae 100644
--- a/xlators/cluster/nsr-server/src/recon_notify.c
+++ b/xlators/cluster/nsr-server/src/recon_notify.c
@@ -120,12 +120,49 @@ nsr_recon_set_leader (xlator_t *this)
// in the callback (once reconciliation is done),
// we will unfence the IOs.
// TBD - error handling later.
- glfs_lseek(ctx->fd, nsr_recon_xlator_sector_1, SEEK_SET);
+ if (glfs_lseek(ctx->fd, nsr_recon_xlator_sector_1, SEEK_SET) == -1) {
+ glusterfs_this_set(old); // restore the saved 'this' before logging/returning, like every other exit path below
+ gf_log (this->name, GF_LOG_ERROR, "doing lseek failed\n");
+ return;
+ }
+
glusterfs_this_set(old);
gf_log (this->name, GF_LOG_INFO,
"Writing to local node to set leader");
- glfs_write(ctx->fd, &role,
- sizeof(role), 0);
+ do {
+ if (priv->leader != _gf_true) {
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_ERROR, "no longer leader\n");
+ return;
+ }
+ if (glfs_write(ctx->fd, &role, sizeof(role), 0) == -1) {
+ if (errno == EAGAIN) {
+ // Wait for old reconciliation to bail out.
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_ERROR,
+ "write failed with retry. retrying after some time\n");
+ sleep(5);
+ continue;
+ }
+ else{
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_ERROR,
+ "doing write failed\n");
+ // This is because reconciliation has returned with error
+ // because some node has died in between.
+ // What should be done? Either we retry being leader
+ // or hook to CHILD_DOWN notification.
+ // Put that logic later. As of now we will just retry.
+ // This is easier.
+ sleep(5);
+ continue;
+ }
+ } else {
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_INFO, "doing write with success\n");
+ break;
+ }
+ } while(1);
glusterfs_this_set(old);
gf_log (this->name, GF_LOG_INFO,
"glfs_write returned. unfencing IO\n");