summaryrefslogtreecommitdiffstats
path: root/xlators/cluster
diff options
context:
space:
mode:
authorJeff Darcy <jdarcy@redhat.com>2014-02-03 20:04:39 +0000
committerJeff Darcy <jdarcy@redhat.com>2014-02-04 19:46:52 +0000
commita3388532787ec42a8cd3fbb4cc88b84fcf0b5cc3 (patch)
tree109a17c8445f44c6f0490884ce776699ecd821b3 /xlators/cluster
parent00eab2c50adfb440354b4c5b650fdbf961901c50 (diff)
nsr-recon: bring data_worker_fun in line with coding standard
...or any reasonable one. OK, not in line, but closer. This introduces only minor functional change, in the form of new error messages for cases that haven't been implemented yet. Mostly it's just a matter of making the code readable and reviewable. Change-Id: Ide76f8b9ca1fb79f8f2772bc70a6f39656f49816 Signed-off-by: Jeff Darcy <jdarcy@redhat.com>
Diffstat (limited to 'xlators/cluster')
-rw-r--r--xlators/cluster/nsr-recon/src/recon_driver.c473
1 files changed, 245 insertions, 228 deletions
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.c b/xlators/cluster/nsr-recon/src/recon_driver.c
index 084b7f5ce..00e79b9d3 100644
--- a/xlators/cluster/nsr-recon/src/recon_driver.c
+++ b/xlators/cluster/nsr-recon/src/recon_driver.c
@@ -1776,262 +1776,279 @@ static void
data_worker_func(nsr_per_node_worker_t *ctx,
nsr_recon_work_t *work)
{
- nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
- nsr_reconciliation_record_t *ri = NULL;
- nsr_recon_record_details_t *rd = NULL;
- glfs_fd_t *fd = NULL;
- int wip = 0;
+ nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+ nsr_reconciliation_record_t *ri = NULL;
+ nsr_recon_record_details_t *rd = NULL;
+ int wip = 0;
+ dict_t * dict = NULL;
+ struct glfs_fd *fd = NULL;
+ struct glfs_object *obj = NULL;
+ uuid_t gfid;
+ uint32_t k_s = 0, v_s = 0;
+ char *t_b= NULL;
+ uint32_t num=0;
- switch (work->req_id){
- case NSR_WORK_ID_INI:
- {
- nsr_worker_log(this->name, GF_LOG_INFO,
- "started data ini \n");
- nsr_recon_start_work(ctx, _gf_false);
+ switch (work->req_id){
+ case NSR_WORK_ID_INI:
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "started data ini \n");
- nsr_worker_log(this->name, GF_LOG_INFO,
- "finished data ini \n");
- break;
- }
- case NSR_WORK_ID_FINI:
- {
- nsr_worker_log(this->name, GF_LOG_INFO,
- "started data fini \n");
+ nsr_recon_start_work(ctx, _gf_false);
- nsr_recon_end_work(ctx, _gf_false);
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished data ini \n");
+ break;
+ case NSR_WORK_ID_FINI:
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "started data fini \n");
- nsr_worker_log(this->name, GF_LOG_INFO,
- "finished data fini \n");
- break;
- }
- case NSR_WORK_ID_SINGLE_RECONCILIATION_READ:
- {
- dict_t * dict = NULL;
- // first_index always starts with 1 but records starts at 0.
- wip = work->index - (dr->workers[0].recon_info->first_index);
- ri = &(dr->workers[0].recon_info->records[wip]);
- rd = &(ri->rec);
-
- dict = dict_new ();
- if (!dict) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "failed allocating for dictionary\n");
- goto commit_out;
- }
- if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "error setting term in dict\n");
- goto commit_out;
- }
- if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "error setting term in dict\n");
- goto commit_out;
- }
+ nsr_recon_end_work(ctx, _gf_false);
- if (rd->op == GF_FOP_WRITE) {
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished data fini \n");
+ break;
+ case NSR_WORK_ID_SINGLE_RECONCILIATION_READ:
+ // first_index always starts with 1 but records starts at 0.
+ wip = work->index - (dr->workers[0].recon_info->first_index);
+ ri = &(dr->workers[0].recon_info->records[wip]);
+ rd = &(ri->rec);
+
+ dict = dict_new ();
+ if (!dict) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "failed allocating for dictionary\n");
+ break;
+ }
+ if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "error setting term in dict\n");
+ break;
+ }
+ if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "error setting term in dict\n");
+ break;
+ }
- // record already copied.
- // copy data to this node's info.
- struct glfs_fd *fd = NULL;
- struct glfs_object *obj = NULL;
- uuid_t gfid;
+ switch (rd->op) {
+ case GF_FOP_WRITE:
- uuid_parse(ri->rec.gfid, gfid);
+ // record already copied.
+ // copy data to this node's info.
- nsr_worker_log(this->name, GF_LOG_INFO,
- "started recon read for file %s at offset %d at len %d\n",
- ri->rec.gfid, rd->offset, rd->len);
-
- obj = glfs_h_create_from_handle(ctx->fs, gfid, GFAPI_HANDLE_LENGTH, NULL);
- if (obj == NULL) {
- GF_ASSERT(obj != NULL);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "creating of handle failed\n");
- goto read_out;
- }
+ uuid_parse(ri->rec.gfid, gfid);
- // The file has probably got deleted.
- fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDONLY, dict);
- if (fd == NULL) {
- GF_ASSERT(fd != NULL);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "opening of file failed\n");
- goto read_out;
- }
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "started recon read for file %s at offset %d at len %d\n",
+ ri->rec.gfid, rd->offset, rd->len);
+
+ obj = glfs_h_create_from_handle (ctx->fs, gfid,
+ GFAPI_HANDLE_LENGTH,
+ NULL);
+ if (obj == NULL) {
+ GF_ASSERT(obj != NULL);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "creating of handle failed\n");
+ break;
+ }
- if (glfs_lseek_with_xdata(fd, rd->offset, SEEK_SET, dict) != rd->offset) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "lseek of file failed to offset %d\n", rd->offset);
- goto read_out;
- }
+ // The file has probably got deleted.
+ fd = glfs_h_open_with_xdata (ctx->fs, obj, O_RDONLY,
+ dict);
+ if (fd == NULL) {
+ GF_ASSERT(fd != NULL);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "opening of file failed\n");
+ break;
+ }
- ri->work.data = GF_CALLOC(rd->len , sizeof(char), gf_mt_recon_private_t);
- if (glfs_read_with_xdata(fd, ri->work.data, rd->len, 0, dict) != rd->len) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "read of file failed to offset %d for bytes %d\n", rd->offset, rd->len);
- goto read_out;
- }
+ if (glfs_lseek_with_xdata (fd, rd->offset, SEEK_SET,
+ dict) != rd->offset) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "lseek of file failed to offset %d\n",
+ rd->offset);
+ break;
+ }
- glfs_close_with_xdata(fd, dict);
- glfs_h_close(obj);
+ ri->work.data = GF_CALLOC (rd->len , sizeof(char),
+ gf_mt_recon_private_t);
+ if (glfs_read_with_xdata (fd, ri->work.data, rd->len,
+ 0, dict) != rd->len) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "read of file failed to offset %d for bytes %d\n",
+ rd->offset, rd->len);
+ break;
+ }
- } else if (rd->op == GF_FOP_FTRUNCATE) {
- } else if (rd->op == GF_FOP_SYMLINK) {
- } else if ((rd->op == GF_FOP_RMDIR) || (rd->op == GF_FOP_UNLINK) ||
- (rd->op == GF_FOP_MKNOD) || (rd->op == GF_FOP_CREATE) ||
- (rd->op == GF_FOP_LINK) || (rd->op == GF_FOP_MKDIR)) {
- } else if (rd->op == GF_FOP_RENAME) {
- } else if ((rd->op == GF_FOP_FREMOVEXATTR) ||
- (rd->op == GF_FOP_REMOVEXATTR) ||
- (rd->op == GF_FOP_SETXATTR) ||
- (rd->op == GF_FOP_FSETXATTR)) {
+ glfs_close_with_xdata(fd, dict);
+ glfs_h_close(obj);
+ break;
- struct glfs_fd *fd = NULL;
- struct glfs_object *obj = NULL;
- uuid_t gfid;
+ case GF_FOP_FTRUNCATE:
+ case GF_FOP_SYMLINK:
+ case GF_FOP_RMDIR:
+ case GF_FOP_UNLINK:
+ case GF_FOP_MKNOD:
+ case GF_FOP_CREATE:
+ case GF_FOP_LINK:
+ case GF_FOP_MKDIR:
+ case GF_FOP_RENAME:
+ nsr_worker_log (this->name, GF_LOG_ERROR,
+ "unimplemented fop %u\n", rd->op);
+ break;
- uuid_parse(ri->rec.gfid, gfid);
+ case GF_FOP_FREMOVEXATTR:
+ case GF_FOP_REMOVEXATTR:
+ case GF_FOP_SETXATTR:
+ case GF_FOP_FSETXATTR:
+ uuid_parse(ri->rec.gfid, gfid);
- // This is for all the set attribute/extended attributes commands.
- // Get all the attributes from the source and fill it in the buffer
- // as a NULL seperated key and value which are in turn seperated by
- // NULL.
- uint32_t k_s = 0, v_s = 0;
- char *t_b= NULL;
- uint32_t num=0;
+ // This is for all the set attribute/extended
+ // attributes commands. Get all the attributes from
+ // the source and fill it in the buffer as a NULL
+ // seperated key and value which are in turn seperated
+ // by NULL.
- nsr_worker_log(this->name, GF_LOG_INFO,
- "doing getattr for gfid %s \n",
- ri->rec.gfid);
-
- obj = glfs_h_create_from_handle(ctx->fs, gfid, GFAPI_HANDLE_LENGTH, NULL);
- if (obj == NULL) {
- GF_ASSERT(fd != NULL);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "creating of handle failed\n");
- goto read_out;
- }
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "doing getattr for gfid %s \n",
+ ri->rec.gfid);
- if (obj->inode->ia_type == IA_IFDIR)
- fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict);
- else
- fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDONLY, dict);
+ obj = glfs_h_create_from_handle (ctx->fs, gfid,
+ GFAPI_HANDLE_LENGTH,
+ NULL);
+ if (obj == NULL) {
+ GF_ASSERT(fd != NULL);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "creating of handle failed\n");
+ break;
+ }
- if (fd == NULL) {
- GF_ASSERT(fd != NULL);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "opening of file failed\n");
- goto read_out;
- }
+ if (obj->inode->ia_type == IA_IFDIR)
+ fd = glfs_h_opendir_with_xdata (ctx->fs, obj,
+ dict);
+ else
+ fd = glfs_h_open_with_xdata (ctx->fs, obj,
+ O_RDONLY, dict);
+
+ if (fd == NULL) {
+ GF_ASSERT(fd != NULL);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "opening of file failed\n");
+ break;
+ }
- if(get_xattr_total_size(fd, &t_b, &k_s, &v_s, &num, dict) == -1) {
- if (t_b) free(t_b);
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "list of xattr of gfid %s failed\n", rd->gfid);
- goto read_out;
- }
- ri->work.data = GF_CALLOC((k_s + v_s) , sizeof(char), gf_mt_recon_private_t);
- get_xattr(fd, t_b, ri->work.data, v_s, num, dict);
- ri->work.num = num;
- nsr_worker_log(this->name, GF_LOG_INFO,
- "finished getattr for gfid %s \n",
- ri->rec.gfid);
- free(t_b);
- goto read_out;
+ if (get_xattr_total_size (fd, &t_b, &k_s, &v_s, &num,
+ dict) == -1) {
+ if (t_b) free(t_b);
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "list of xattr of gfid %s failed\n",
+ rd->gfid);
+ break;
+ }
+ ri->work.data = GF_CALLOC ((k_s + v_s) , sizeof(char),
+ gf_mt_recon_private_t);
+ get_xattr(fd, t_b, ri->work.data, v_s, num, dict);
+ ri->work.num = num;
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished getattr for gfid %s \n",
+ ri->rec.gfid);
+ free(t_b);
+ break;
- } else if ((rd->op == GF_FOP_FSETATTR) ||
- (rd->op == GF_FOP_SETATTR)) {
+ case GF_FOP_FSETATTR:
+ case GF_FOP_SETATTR:
+ //TBD - to get the actual attrbutes and fill
+ // mode, uid, gid, size, atime, mtime
+ nsr_worker_log (this->name, GF_LOG_ERROR,
+ "unimplemented fop %u\n", rd->op);
+ break;
+ default:
+ nsr_worker_log (this->name, GF_LOG_ERROR,
+ "unrecognized fop %u\n", rd->op);
- //TBD - to get the actual attrbutes and fill
- // mode, uid, gid, size, atime, mtime
- }
-read_out:
- nsr_worker_log(this->name, GF_LOG_INFO,
- "finished recon read for gfid %s at offset %d for %d bytes \n",
- rd->gfid, rd->offset, rd->len);
- break;
- }
- case NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT:
- {
- dict_t * dict = NULL;
- // first_index always starts with 1 but records starts at 0.
- wip = work->index - (dr->workers[0].recon_info->first_index);
- ri = &(dr->workers[0].recon_info->records[wip]);
- rd = &(ri->rec);
+ }
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished recon read for gfid %s at offset %d for %d bytes \n",
+ rd->gfid, rd->offset, rd->len);
+ break;
- nsr_worker_log(this->name, GF_LOG_INFO,
- "got recon commit for index %d that has gfid %s \n",
- wip, rd->gfid);
- dict = dict_new ();
- if (!dict) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "failed allocating for dictionary\n");
- goto commit_out;
- }
- if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "error setting term in dict\n");
- goto commit_out;
- }
- if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "error setting term in dict\n");
- goto commit_out;
- }
- apply_record(ctx, ri, dict);
-commit_out:
- nsr_worker_log(this->name, GF_LOG_INFO,
- "finished recon commit for gfid %s \n",
- rd->gfid);
- break;
- }
- case NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH:
- {
- dict_t * dict = NULL;
- dict = dict_new ();
- if (!dict) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "failed allocating for dictionary\n");
- goto commit_out;
- }
- if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "error setting term in dict\n");
- goto commit_out;
- }
- if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "error setting term in dict\n");
- goto commit_out;
- }
+ case NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT:
+ // first_index always starts with 1 but records starts at 0.
+ wip = work->index - (dr->workers[0].recon_info->first_index);
+ ri = &(dr->workers[0].recon_info->records[wip]);
+ rd = &(ri->rec);
- // Increment work index with the start index
- wip = work->index - (dr->workers[0].recon_info->first_index);
- ri = &(dr->workers[0].recon_info->records[wip]);
- rd = &(ri->rec);
- //fd = glfs_open(ctx->fs, rd->gfid, O_RDONLY); //TBD - using gfid
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "got recon commit for index %d that has gfid %s \n",
+ wip, rd->gfid);
+ dict = dict_new ();
+ if (!dict) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "failed allocating for dictionary\n");
+ break;
+ }
+ if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "error setting term in dict\n");
+ break;
+ }
+ if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "error setting term in dict\n");
+ break;
+ }
+ apply_record(ctx, ri, dict);
- glfs_fsync_with_xdata(fd, dict);
- break;
- }
- }
- return;
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished recon commit for gfid %s \n",
+ rd->gfid);
+ break;
+
+ case NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH:
+ dict = dict_new ();
+ if (!dict) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "failed allocating for dictionary\n");
+ break;
+ }
+ if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "error setting term in dict\n");
+ break;
+ }
+ if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "error setting term in dict\n");
+ break;
+ }
+
+ // Increment work index with the start index
+ wip = work->index - (dr->workers[0].recon_info->first_index);
+ ri = &(dr->workers[0].recon_info->records[wip]);
+ rd = &(ri->rec);
+
+ glfs_fsync_with_xdata(fd, dict);
+ break;
+
+ default:
+ nsr_worker_log (this->name, GF_LOG_ERROR,
+ "unrecognized request id %u\n", work->req_id);
+ }
}
// thread for doing data work