diff options
Diffstat (limited to 'xlators/cluster/nsr-recon/src')
-rw-r--r-- | xlators/cluster/nsr-recon/src/recon_driver.c | 473 |
1 files changed, 245 insertions, 228 deletions
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.c b/xlators/cluster/nsr-recon/src/recon_driver.c index 084b7f5ce..00e79b9d3 100644 --- a/xlators/cluster/nsr-recon/src/recon_driver.c +++ b/xlators/cluster/nsr-recon/src/recon_driver.c @@ -1776,262 +1776,279 @@ static void data_worker_func(nsr_per_node_worker_t *ctx, nsr_recon_work_t *work) { - nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; - nsr_reconciliation_record_t *ri = NULL; - nsr_recon_record_details_t *rd = NULL; - glfs_fd_t *fd = NULL; - int wip = 0; + nsr_recon_driver_ctx_t *dr = ctx->driver_ctx; + nsr_reconciliation_record_t *ri = NULL; + nsr_recon_record_details_t *rd = NULL; + int wip = 0; + dict_t * dict = NULL; + struct glfs_fd *fd = NULL; + struct glfs_object *obj = NULL; + uuid_t gfid; + uint32_t k_s = 0, v_s = 0; + char *t_b= NULL; + uint32_t num=0; - switch (work->req_id){ - case NSR_WORK_ID_INI: - { - nsr_worker_log(this->name, GF_LOG_INFO, - "started data ini \n"); - nsr_recon_start_work(ctx, _gf_false); + switch (work->req_id){ + case NSR_WORK_ID_INI: + nsr_worker_log(this->name, GF_LOG_INFO, + "started data ini \n"); - nsr_worker_log(this->name, GF_LOG_INFO, - "finished data ini \n"); - break; - } - case NSR_WORK_ID_FINI: - { - nsr_worker_log(this->name, GF_LOG_INFO, - "started data fini \n"); + nsr_recon_start_work(ctx, _gf_false); - nsr_recon_end_work(ctx, _gf_false); + nsr_worker_log(this->name, GF_LOG_INFO, + "finished data ini \n"); + break; + case NSR_WORK_ID_FINI: + nsr_worker_log(this->name, GF_LOG_INFO, + "started data fini \n"); - nsr_worker_log(this->name, GF_LOG_INFO, - "finished data fini \n"); - break; - } - case NSR_WORK_ID_SINGLE_RECONCILIATION_READ: - { - dict_t * dict = NULL; - // first_index always starts with 1 but records starts at 0. - wip = work->index - (dr->workers[0].recon_info->first_index); - ri = &(dr->workers[0].recon_info->records[wip]); - rd = &(ri->rec); - - dict = dict_new (); - if (!dict) { - GF_ASSERT(0); - nsr_worker_log(this->name, GF_LOG_ERROR, - "failed allocating for dictionary\n"); - goto commit_out; - } - if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { - GF_ASSERT(0); - nsr_worker_log(this->name, GF_LOG_ERROR, - "error setting term in dict\n"); - goto commit_out; - } - if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { - GF_ASSERT(0); - nsr_worker_log(this->name, GF_LOG_ERROR, - "error setting term in dict\n"); - goto commit_out; - } + nsr_recon_end_work(ctx, _gf_false); - if (rd->op == GF_FOP_WRITE) { + nsr_worker_log(this->name, GF_LOG_INFO, + "finished data fini \n"); + break; + case NSR_WORK_ID_SINGLE_RECONCILIATION_READ: + // first_index always starts with 1 but records starts at 0. + wip = work->index - (dr->workers[0].recon_info->first_index); + ri = &(dr->workers[0].recon_info->records[wip]); + rd = &(ri->rec); + + dict = dict_new (); + if (!dict) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "failed allocating for dictionary\n"); + break; + } + if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "error setting term in dict\n"); + break; + } + if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "error setting term in dict\n"); + break; + } - // record already copied. - // copy data to this node's info. - struct glfs_fd *fd = NULL; - struct glfs_object *obj = NULL; - uuid_t gfid; + switch (rd->op) { + case GF_FOP_WRITE: - uuid_parse(ri->rec.gfid, gfid); + // record already copied. + // copy data to this node's info. - nsr_worker_log(this->name, GF_LOG_INFO, - "started recon read for file %s at offset %d at len %d\n", - ri->rec.gfid, rd->offset, rd->len); - - obj = glfs_h_create_from_handle(ctx->fs, gfid, GFAPI_HANDLE_LENGTH, NULL); - if (obj == NULL) { - GF_ASSERT(obj != NULL); - nsr_worker_log(this->name, GF_LOG_ERROR, - "creating of handle failed\n"); - goto read_out; - } + uuid_parse(ri->rec.gfid, gfid); - // The file has probably got deleted. - fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDONLY, dict); - if (fd == NULL) { - GF_ASSERT(fd != NULL); - nsr_worker_log(this->name, GF_LOG_ERROR, - "opening of file failed\n"); - goto read_out; - } + nsr_worker_log(this->name, GF_LOG_INFO, + "started recon read for file %s at offset %d at len %d\n", + ri->rec.gfid, rd->offset, rd->len); + + obj = glfs_h_create_from_handle (ctx->fs, gfid, + GFAPI_HANDLE_LENGTH, + NULL); + if (obj == NULL) { + GF_ASSERT(obj != NULL); + nsr_worker_log(this->name, GF_LOG_ERROR, + "creating of handle failed\n"); + break; + } - if (glfs_lseek_with_xdata(fd, rd->offset, SEEK_SET, dict) != rd->offset) { - GF_ASSERT(0); - nsr_worker_log(this->name, GF_LOG_ERROR, - "lseek of file failed to offset %d\n", rd->offset); - goto read_out; - } + // The file has probably got deleted. + fd = glfs_h_open_with_xdata (ctx->fs, obj, O_RDONLY, + dict); + if (fd == NULL) { + GF_ASSERT(fd != NULL); + nsr_worker_log(this->name, GF_LOG_ERROR, + "opening of file failed\n"); + break; + } - ri->work.data = GF_CALLOC(rd->len , sizeof(char), gf_mt_recon_private_t); - if (glfs_read_with_xdata(fd, ri->work.data, rd->len, 0, dict) != rd->len) { - GF_ASSERT(0); - nsr_worker_log(this->name, GF_LOG_ERROR, - "read of file failed to offset %d for bytes %d\n", rd->offset, rd->len); - goto read_out; - } + if (glfs_lseek_with_xdata (fd, rd->offset, SEEK_SET, + dict) != rd->offset) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "lseek of file failed to offset %d\n", + rd->offset); + break; + } - glfs_close_with_xdata(fd, dict); - glfs_h_close(obj); + ri->work.data = GF_CALLOC (rd->len , sizeof(char), + gf_mt_recon_private_t); + if (glfs_read_with_xdata (fd, ri->work.data, rd->len, + 0, dict) != rd->len) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "read of file failed to offset %d for bytes %d\n", + rd->offset, rd->len); + break; + } - } else if (rd->op == GF_FOP_FTRUNCATE) { - } else if (rd->op == GF_FOP_SYMLINK) { - } else if ((rd->op == GF_FOP_RMDIR) || (rd->op == GF_FOP_UNLINK) || - (rd->op == GF_FOP_MKNOD) || (rd->op == GF_FOP_CREATE) || - (rd->op == GF_FOP_LINK) || (rd->op == GF_FOP_MKDIR)) { - } else if (rd->op == GF_FOP_RENAME) { - } else if ((rd->op == GF_FOP_FREMOVEXATTR) || - (rd->op == GF_FOP_REMOVEXATTR) || - (rd->op == GF_FOP_SETXATTR) || - (rd->op == GF_FOP_FSETXATTR)) { + glfs_close_with_xdata(fd, dict); + glfs_h_close(obj); + break; - struct glfs_fd *fd = NULL; - struct glfs_object *obj = NULL; - uuid_t gfid; + case GF_FOP_FTRUNCATE: + case GF_FOP_SYMLINK: + case GF_FOP_RMDIR: + case GF_FOP_UNLINK: + case GF_FOP_MKNOD: + case GF_FOP_CREATE: + case GF_FOP_LINK: + case GF_FOP_MKDIR: + case GF_FOP_RENAME: + nsr_worker_log (this->name, GF_LOG_ERROR, + "unimplemented fop %u\n", rd->op); + break; - uuid_parse(ri->rec.gfid, gfid); + case GF_FOP_FREMOVEXATTR: + case GF_FOP_REMOVEXATTR: + case GF_FOP_SETXATTR: + case GF_FOP_FSETXATTR: + uuid_parse(ri->rec.gfid, gfid); - // This is for all the set attribute/extended attributes commands. - // Get all the attributes from the source and fill it in the buffer - // as a NULL seperated key and value which are in turn seperated by - // NULL. - uint32_t k_s = 0, v_s = 0; - char *t_b= NULL; - uint32_t num=0; + // This is for all the set attribute/extended + // attributes commands. Get all the attributes from + // the source and fill it in the buffer as a NULL + // seperated key and value which are in turn seperated + // by NULL. - nsr_worker_log(this->name, GF_LOG_INFO, - "doing getattr for gfid %s \n", - ri->rec.gfid); - - obj = glfs_h_create_from_handle(ctx->fs, gfid, GFAPI_HANDLE_LENGTH, NULL); - if (obj == NULL) { - GF_ASSERT(fd != NULL); - nsr_worker_log(this->name, GF_LOG_ERROR, - "creating of handle failed\n"); - goto read_out; - } + nsr_worker_log(this->name, GF_LOG_INFO, + "doing getattr for gfid %s \n", + ri->rec.gfid); - if (obj->inode->ia_type == IA_IFDIR) - fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict); - else - fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDONLY, dict); + obj = glfs_h_create_from_handle (ctx->fs, gfid, + GFAPI_HANDLE_LENGTH, + NULL); + if (obj == NULL) { + GF_ASSERT(fd != NULL); + nsr_worker_log(this->name, GF_LOG_ERROR, + "creating of handle failed\n"); + break; + } - if (fd == NULL) { - GF_ASSERT(fd != NULL); - nsr_worker_log(this->name, GF_LOG_ERROR, - "opening of file failed\n"); - goto read_out; - } + if (obj->inode->ia_type == IA_IFDIR) + fd = glfs_h_opendir_with_xdata (ctx->fs, obj, + dict); + else + fd = glfs_h_open_with_xdata (ctx->fs, obj, + O_RDONLY, dict); + + if (fd == NULL) { + GF_ASSERT(fd != NULL); + nsr_worker_log(this->name, GF_LOG_ERROR, + "opening of file failed\n"); + break; + } - if(get_xattr_total_size(fd, &t_b, &k_s, &v_s, &num, dict) == -1) { - if (t_b) free(t_b); - GF_ASSERT(0); - nsr_worker_log(this->name, GF_LOG_ERROR, - "list of xattr of gfid %s failed\n", rd->gfid); - goto read_out; - } - ri->work.data = GF_CALLOC((k_s + v_s) , sizeof(char), gf_mt_recon_private_t); - get_xattr(fd, t_b, ri->work.data, v_s, num, dict); - ri->work.num = num; - nsr_worker_log(this->name, GF_LOG_INFO, - "finished getattr for gfid %s \n", - ri->rec.gfid); - free(t_b); - goto read_out; + if (get_xattr_total_size (fd, &t_b, &k_s, &v_s, &num, + dict) == -1) { + if (t_b) free(t_b); + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "list of xattr of gfid %s failed\n", + rd->gfid); + break; + } + ri->work.data = GF_CALLOC ((k_s + v_s) , sizeof(char), + gf_mt_recon_private_t); + get_xattr(fd, t_b, ri->work.data, v_s, num, dict); + ri->work.num = num; + nsr_worker_log(this->name, GF_LOG_INFO, + "finished getattr for gfid %s \n", + ri->rec.gfid); + free(t_b); + break; - } else if ((rd->op == GF_FOP_FSETATTR) || - (rd->op == GF_FOP_SETATTR)) { + case GF_FOP_FSETATTR: + case GF_FOP_SETATTR: + //TBD - to get the actual attrbutes and fill + // mode, uid, gid, size, atime, mtime + nsr_worker_log (this->name, GF_LOG_ERROR, + "unimplemented fop %u\n", rd->op); + break; + default: + nsr_worker_log (this->name, GF_LOG_ERROR, + "unrecognized fop %u\n", rd->op); - //TBD - to get the actual attrbutes and fill - // mode, uid, gid, size, atime, mtime - } -read_out: - nsr_worker_log(this->name, GF_LOG_INFO, - "finished recon read for gfid %s at offset %d for %d bytes \n", - rd->gfid, rd->offset, rd->len); - break; - } - case NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT: - { - dict_t * dict = NULL; - // first_index always starts with 1 but records starts at 0. - wip = work->index - (dr->workers[0].recon_info->first_index); - ri = &(dr->workers[0].recon_info->records[wip]); - rd = &(ri->rec); + } + nsr_worker_log(this->name, GF_LOG_INFO, + "finished recon read for gfid %s at offset %d for %d bytes \n", + rd->gfid, rd->offset, rd->len); + break; - nsr_worker_log(this->name, GF_LOG_INFO, - "got recon commit for index %d that has gfid %s \n", - wip, rd->gfid); - dict = dict_new (); - if (!dict) { - GF_ASSERT(0); - nsr_worker_log(this->name, GF_LOG_ERROR, - "failed allocating for dictionary\n"); - goto commit_out; - } - if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { - GF_ASSERT(0); - nsr_worker_log(this->name, GF_LOG_ERROR, - "error setting term in dict\n"); - goto commit_out; - } - if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { - GF_ASSERT(0); - nsr_worker_log(this->name, GF_LOG_ERROR, - "error setting term in dict\n"); - goto commit_out; - } - apply_record(ctx, ri, dict); -commit_out: - nsr_worker_log(this->name, GF_LOG_INFO, - "finished recon commit for gfid %s \n", - rd->gfid); - break; - } - case NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH: - { - dict_t * dict = NULL; - dict = dict_new (); - if (!dict) { - GF_ASSERT(0); - nsr_worker_log(this->name, GF_LOG_ERROR, - "failed allocating for dictionary\n"); - goto commit_out; - } - if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { - GF_ASSERT(0); - nsr_worker_log(this->name, GF_LOG_ERROR, - "error setting term in dict\n"); - goto commit_out; - } - if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { - GF_ASSERT(0); - nsr_worker_log(this->name, GF_LOG_ERROR, - "error setting term in dict\n"); - goto commit_out; - } + case NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT: + // first_index always starts with 1 but records starts at 0. + wip = work->index - (dr->workers[0].recon_info->first_index); + ri = &(dr->workers[0].recon_info->records[wip]); + rd = &(ri->rec); - // Increment work index with the start index - wip = work->index - (dr->workers[0].recon_info->first_index); - ri = &(dr->workers[0].recon_info->records[wip]); - rd = &(ri->rec); - //fd = glfs_open(ctx->fs, rd->gfid, O_RDONLY); //TBD - using gfid + nsr_worker_log(this->name, GF_LOG_INFO, + "got recon commit for index %d that has gfid %s \n", + wip, rd->gfid); + dict = dict_new (); + if (!dict) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "failed allocating for dictionary\n"); + break; + } + if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "error setting term in dict\n"); + break; + } + if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "error setting term in dict\n"); + break; + } + apply_record(ctx, ri, dict); - glfs_fsync_with_xdata(fd, dict); - break; - } - } - return; + nsr_worker_log(this->name, GF_LOG_INFO, + "finished recon commit for gfid %s \n", + rd->gfid); + break; + + case NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH: + dict = dict_new (); + if (!dict) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "failed allocating for dictionary\n"); + break; + } + if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "error setting term in dict\n"); + break; + } + if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) { + GF_ASSERT(0); + nsr_worker_log(this->name, GF_LOG_ERROR, + "error setting term in dict\n"); + break; + } + + // Increment work index with the start index + wip = work->index - (dr->workers[0].recon_info->first_index); + ri = &(dr->workers[0].recon_info->records[wip]); + rd = &(ri->rec); + + glfs_fsync_with_xdata(fd, dict); + break; + + default: + nsr_worker_log (this->name, GF_LOG_ERROR, + "unrecognized request id %u\n", work->req_id); + } } // thread for doing data work |