diff options
Diffstat (limited to 'xlators/cluster/ec/src/ec-dir-read.c')
| -rw-r--r-- | xlators/cluster/ec/src/ec-dir-read.c | 647 |
1 files changed, 647 insertions, 0 deletions
diff --git a/xlators/cluster/ec/src/ec-dir-read.c b/xlators/cluster/ec/src/ec-dir-read.c new file mode 100644 index 00000000000..f71dcfac293 --- /dev/null +++ b/xlators/cluster/ec/src/ec-dir-read.c @@ -0,0 +1,647 @@ +/* + Copyright (c) 2012-2014 DataLab, s.l. <http://www.datalab.es> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "ec.h" +#include "ec-messages.h" +#include "ec-helpers.h" +#include "ec-common.h" +#include "ec-combine.h" +#include "ec-fops.h" + +/**************************************************************** + * + * File Operation: opendir + * + ***************************************************************/ + +int32_t +ec_combine_opendir(ec_fop_data_t *fop, ec_cbk_data_t *dst, ec_cbk_data_t *src) +{ + if (dst->fd != src->fd) { + gf_msg(fop->xl->name, GF_LOG_NOTICE, 0, EC_MSG_FD_MISMATCH, + "Mismatching fd in answers " + "of 'GF_FOP_OPENDIR': %p <-> %p", + dst->fd, src->fd); + + return 0; + } + + return 1; +} + +int32_t +ec_opendir_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, fd_t *fd, dict_t *xdata) +{ + ec_fop_data_t *fop = NULL; + ec_cbk_data_t *cbk = NULL; + int32_t idx = (int32_t)(uintptr_t)cookie; + + VALIDATE_OR_GOTO(this, out); + GF_VALIDATE_OR_GOTO(this->name, frame, out); + GF_VALIDATE_OR_GOTO(this->name, frame->local, out); + GF_VALIDATE_OR_GOTO(this->name, this->private, out); + + fop = frame->local; + + ec_trace("CBK", fop, "idx=%d, frame=%p, op_ret=%d, op_errno=%d", idx, frame, + op_ret, op_errno); + + cbk = ec_cbk_data_allocate(frame, this, fop, GF_FOP_OPENDIR, idx, op_ret, + op_errno); + if (cbk != NULL) { + if (op_ret >= 0) { + if (fd != NULL) { + cbk->fd = fd_ref(fd); + if (cbk->fd == NULL) { + gf_msg(this->name, GF_LOG_ERROR, 0, + EC_MSG_FILE_DESC_REF_FAIL, + "Failed to reference a " + "file descriptor."); + + goto out; + } + } + } + if (xdata != NULL) { + cbk->xdata = dict_ref(xdata); + if (cbk->xdata == NULL) { + gf_msg(this->name, GF_LOG_ERROR, 0, EC_MSG_DICT_REF_FAIL, + "Failed to reference a " + "dictionary."); + + goto out; + } + } + + ec_combine(cbk, ec_combine_opendir); + + ec_update_fd_status(fd, this, idx, op_ret); + } + +out: + if (fop != NULL) { + ec_complete(fop); + } + + return 0; +} + +void +ec_wind_opendir(ec_t *ec, ec_fop_data_t *fop, int32_t idx) +{ + ec_trace("WIND", fop, "idx=%d", idx); + + STACK_WIND_COOKIE(fop->frame, ec_opendir_cbk, (void *)(uintptr_t)idx, + ec->xl_list[idx], ec->xl_list[idx]->fops->opendir, + &fop->loc[0], fop->fd, fop->xdata); +} + +int32_t +ec_manager_opendir(ec_fop_data_t *fop, int32_t state) +{ + ec_cbk_data_t *cbk; + ec_fd_t *ctx; + int32_t err; + + switch (state) { + case EC_STATE_INIT: + LOCK(&fop->fd->lock); + + ctx = __ec_fd_get(fop->fd, fop->xl); + if (ctx == NULL) { + UNLOCK(&fop->fd->lock); + + fop->error = ENOMEM; + + return EC_STATE_REPORT; + } + if (!ctx->loc.inode) { + err = ec_loc_from_loc(fop->xl, &ctx->loc, &fop->loc[0]); + if (err != 0) { + UNLOCK(&fop->fd->lock); + + fop->error = -err; + + return EC_STATE_REPORT; + } + } + + UNLOCK(&fop->fd->lock); + + /* Fall through */ + + case EC_STATE_LOCK: + ec_lock_prepare_inode(fop, &fop->loc[0], EC_QUERY_INFO, 0, + EC_RANGE_FULL); + ec_lock(fop); + + return EC_STATE_DISPATCH; + + case EC_STATE_DISPATCH: + ec_dispatch_all(fop); + + return EC_STATE_PREPARE_ANSWER; + + case EC_STATE_PREPARE_ANSWER: + cbk = ec_fop_prepare_answer(fop, _gf_true); + if (cbk != NULL) { + /* Save which subvolumes successfully opened the directory. + * If ctx->open is 0, it means that readdir cannot be + * processed in this directory. + */ + LOCK(&fop->fd->lock); + + ctx = __ec_fd_get(fop->fd, fop->xl); + if (ctx != NULL) { + ctx->open |= cbk->mask; + } + + UNLOCK(&fop->fd->lock); + } + + return EC_STATE_REPORT; + + case EC_STATE_REPORT: + cbk = fop->answer; + + GF_ASSERT(cbk != NULL); + + if (fop->cbks.opendir != NULL) { + fop->cbks.opendir(fop->req_frame, fop, fop->xl, cbk->op_ret, + cbk->op_errno, cbk->fd, cbk->xdata); + } + + return EC_STATE_LOCK_REUSE; + + case -EC_STATE_INIT: + case -EC_STATE_LOCK: + case -EC_STATE_DISPATCH: + case -EC_STATE_PREPARE_ANSWER: + case -EC_STATE_REPORT: + GF_ASSERT(fop->error != 0); + + if (fop->cbks.opendir != NULL) { + fop->cbks.opendir(fop->req_frame, fop, fop->xl, -1, fop->error, + NULL, NULL); + } + + return EC_STATE_LOCK_REUSE; + + case -EC_STATE_LOCK_REUSE: + case EC_STATE_LOCK_REUSE: + ec_lock_reuse(fop); + + return EC_STATE_UNLOCK; + + case -EC_STATE_UNLOCK: + case EC_STATE_UNLOCK: + ec_unlock(fop); + + return EC_STATE_END; + + default: + gf_msg(fop->xl->name, GF_LOG_ERROR, EINVAL, EC_MSG_UNHANDLED_STATE, + "Unhandled state %d for %s", state, ec_fop_name(fop->id)); + + return EC_STATE_END; + } +} + +void +ec_opendir(call_frame_t *frame, xlator_t *this, uintptr_t target, + uint32_t fop_flags, fop_opendir_cbk_t func, void *data, loc_t *loc, + fd_t *fd, dict_t *xdata) +{ + ec_cbk_t callback = {.opendir = func}; + ec_fop_data_t *fop = NULL; + int32_t error = ENOMEM; + + gf_msg_trace("ec", 0, "EC(OPENDIR) %p", frame); + + VALIDATE_OR_GOTO(this, out); + GF_VALIDATE_OR_GOTO(this->name, frame, out); + GF_VALIDATE_OR_GOTO(this->name, this->private, out); + + fop = ec_fop_data_allocate(frame, this, GF_FOP_OPENDIR, EC_FLAG_LOCK_SHARED, + target, fop_flags, ec_wind_opendir, + ec_manager_opendir, callback, data); + if (fop == NULL) { + goto out; + } + + if (loc != NULL) { + if (loc_copy(&fop->loc[0], loc) != 0) { + gf_msg(this->name, GF_LOG_ERROR, ENOMEM, EC_MSG_LOC_COPY_FAIL, + "Failed to copy a location."); + + goto out; + } + } + if (fd != NULL) { + fop->fd = fd_ref(fd); + if (fop->fd == NULL) { + gf_msg(this->name, GF_LOG_ERROR, 0, EC_MSG_FILE_DESC_REF_FAIL, + "Failed to reference a " + "file descriptor."); + + goto out; + } + } + if (xdata != NULL) { + fop->xdata = dict_ref(xdata); + if (fop->xdata == NULL) { + gf_msg(this->name, GF_LOG_ERROR, 0, EC_MSG_DICT_REF_FAIL, + "Failed to reference a " + "dictionary."); + + goto out; + } + } + + error = 0; + +out: + if (fop != NULL) { + ec_manager(fop, error); + } else { + func(frame, NULL, this, -1, error, NULL, NULL); + } +} + +/* Returns -1 if client_id is invalid else index of child subvol in xl_list */ +int +ec_deitransform(xlator_t *this, off_t offset) +{ + int idx = -1; + int client_id = -1; + ec_t *ec = this->private; + char id[32] = {0}; + int err; + + client_id = gf_deitransform(this, offset); + sprintf(id, "%d", client_id); + err = dict_get_int32(ec->leaf_to_subvolid, id, &idx); + if (err < 0) { + idx = err; + goto out; + } + +out: + if (idx < 0) { + gf_msg(this->name, GF_LOG_ERROR, EINVAL, EC_MSG_INVALID_REQUEST, + "Invalid index %d in readdirp request", client_id); + idx = -EINVAL; + } + return idx; +} + +/* FOP: readdir */ + +void +ec_adjust_readdirp(ec_t *ec, int32_t idx, gf_dirent_t *entries) +{ + gf_dirent_t *entry; + + list_for_each_entry(entry, &entries->list, list) + { + if (!entry->inode) + continue; + + if (entry->d_stat.ia_type == IA_IFREG) { + if ((entry->dict == NULL) || + (ec_dict_del_number(entry->dict, EC_XATTR_SIZE, + &entry->d_stat.ia_size) != 0)) { + inode_unref(entry->inode); + entry->inode = NULL; + } else { + ec_iatt_rebuild(ec, &entry->d_stat, 1, 1); + } + } + } +} + +int32_t +ec_common_readdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, gf_dirent_t *entries, + dict_t *xdata) +{ + ec_fop_data_t *fop = NULL; + ec_cbk_data_t *cbk = NULL; + int32_t idx = (int32_t)(uintptr_t)cookie; + + VALIDATE_OR_GOTO(this, out); + GF_VALIDATE_OR_GOTO(this->name, frame, out); + GF_VALIDATE_OR_GOTO(this->name, frame->local, out); + GF_VALIDATE_OR_GOTO(this->name, this->private, out); + + fop = frame->local; + + ec_trace("CBK", fop, "idx=%d, frame=%p, op_ret=%d, op_errno=%d", idx, frame, + op_ret, op_errno); + + cbk = ec_cbk_data_allocate(frame, this, fop, fop->id, idx, op_ret, + op_errno); + if (cbk) { + if (xdata) + cbk->xdata = dict_ref(xdata); + if (cbk->op_ret >= 0) + list_splice_init(&entries->list, &cbk->entries.list); + ec_combine(cbk, NULL); + } + +out: + if (fop != NULL) { + ec_complete(fop); + } + + return 0; +} + +void +ec_wind_readdir(ec_t *ec, ec_fop_data_t *fop, int32_t idx) +{ + ec_trace("WIND", fop, "idx=%d", idx); + + STACK_WIND_COOKIE(fop->frame, ec_common_readdir_cbk, (void *)(uintptr_t)idx, + ec->xl_list[idx], ec->xl_list[idx]->fops->readdir, + fop->fd, fop->size, fop->offset, fop->xdata); +} + +int32_t +ec_manager_readdir(ec_fop_data_t *fop, int32_t state) +{ + ec_fd_t *ctx = NULL; + ec_cbk_data_t *cbk = NULL; + + switch (state) { + case EC_STATE_INIT: + /* Return error if opendir has not been successfully called on + * any subvolume. */ + ctx = ec_fd_get(fop->fd, fop->xl); + if (ctx == NULL) { + fop->error = ENOMEM; + } else if (ctx->open == 0) { + fop->error = EBADFD; + } + + if (fop->error) { + gf_msg(fop->xl->name, GF_LOG_ERROR, fop->error, + EC_MSG_INVALID_REQUEST, "EC is not winding readdir: %s", + ec_msg_str(fop)); + return EC_STATE_REPORT; + } + + if (fop->id == GF_FOP_READDIRP) { + int32_t err; + + if (fop->xdata == NULL) { + fop->xdata = dict_new(); + if (fop->xdata == NULL) { + fop->error = ENOMEM; + + return EC_STATE_REPORT; + } + } + + err = dict_set_uint64(fop->xdata, EC_XATTR_SIZE, 0); + if (err != 0) { + fop->error = -err; + + return EC_STATE_REPORT; + } + } + + if (fop->offset != 0) { + /* Non-zero offset is irrecoverable error as the offset may not + * be valid on other bricks*/ + int32_t idx = -1; + + idx = ec_deitransform(fop->xl, fop->offset); + + if (idx < 0) { + fop->error = -idx; + return EC_STATE_REPORT; + } + fop->mask &= 1ULL << idx; + } else { + ec_lock_prepare_fd(fop, fop->fd, EC_QUERY_INFO, 0, + EC_RANGE_FULL); + ec_lock(fop); + } + + return EC_STATE_DISPATCH; + + case EC_STATE_DISPATCH: + ec_dispatch_one(fop); + + return EC_STATE_PREPARE_ANSWER; + + case EC_STATE_PREPARE_ANSWER: + if (ec_dispatch_one_retry(fop, &cbk)) { + return EC_STATE_DISPATCH; + } + + if ((cbk != NULL) && (cbk->op_ret > 0) && + (fop->id == GF_FOP_READDIRP)) { + ec_adjust_readdirp(fop->xl->private, cbk->idx, &cbk->entries); + } + + return EC_STATE_REPORT; + + case EC_STATE_REPORT: + cbk = fop->answer; + GF_ASSERT(cbk); + if (fop->id == GF_FOP_READDIR) { + if (fop->cbks.readdir != NULL) { + fop->cbks.readdir(fop->req_frame, fop, fop->xl, cbk->op_ret, + cbk->op_errno, &cbk->entries, cbk->xdata); + } + } else { + if (fop->cbks.readdirp != NULL) { + fop->cbks.readdirp(fop->req_frame, fop, fop->xl, + cbk->op_ret, cbk->op_errno, + &cbk->entries, cbk->xdata); + } + } + if (fop->offset == 0) + return EC_STATE_LOCK_REUSE; + else + return EC_STATE_END; + + case -EC_STATE_INIT: + case -EC_STATE_LOCK: + case -EC_STATE_DISPATCH: + case -EC_STATE_PREPARE_ANSWER: + case -EC_STATE_REPORT: + if (fop->id == GF_FOP_READDIR) { + if (fop->cbks.readdir != NULL) { + fop->cbks.readdir(fop->req_frame, fop, fop->xl, -1, + fop->error, NULL, NULL); + } + } else { + if (fop->cbks.readdirp != NULL) { + fop->cbks.readdirp(fop->req_frame, fop, fop->xl, -1, + fop->error, NULL, NULL); + } + } + if (fop->offset == 0) + return EC_STATE_LOCK_REUSE; + else + return EC_STATE_END; + + case -EC_STATE_LOCK_REUSE: + case EC_STATE_LOCK_REUSE: + GF_ASSERT(fop->offset == 0); + ec_lock_reuse(fop); + + return EC_STATE_UNLOCK; + + case -EC_STATE_UNLOCK: + case EC_STATE_UNLOCK: + GF_ASSERT(fop->offset == 0); + ec_unlock(fop); + + return EC_STATE_END; + default: + gf_msg(fop->xl->name, GF_LOG_ERROR, EINVAL, EC_MSG_UNHANDLED_STATE, + "Unhandled state %d for %s", state, ec_fop_name(fop->id)); + + return EC_STATE_END; + } +} + +void +ec_readdir(call_frame_t *frame, xlator_t *this, uintptr_t target, + uint32_t fop_flags, fop_readdir_cbk_t func, void *data, fd_t *fd, + size_t size, off_t offset, dict_t *xdata) +{ + ec_cbk_t callback = {.readdir = func}; + ec_fop_data_t *fop = NULL; + int32_t error = ENOMEM; + + gf_msg_trace("ec", 0, "EC(READDIR) %p", frame); + + VALIDATE_OR_GOTO(this, out); + GF_VALIDATE_OR_GOTO(this->name, frame, out); + GF_VALIDATE_OR_GOTO(this->name, this->private, out); + + fop = ec_fop_data_allocate(frame, this, GF_FOP_READDIR, EC_FLAG_LOCK_SHARED, + target, fop_flags, ec_wind_readdir, + ec_manager_readdir, callback, data); + if (fop == NULL) { + goto out; + } + + fop->use_fd = 1; + + fop->size = size; + fop->offset = offset; + + if (fd != NULL) { + fop->fd = fd_ref(fd); + if (fop->fd == NULL) { + gf_msg(this->name, GF_LOG_ERROR, 0, EC_MSG_FILE_DESC_REF_FAIL, + "Failed to reference a " + "file descriptor."); + + goto out; + } + } + if (xdata != NULL) { + fop->xdata = dict_ref(xdata); + if (fop->xdata == NULL) { + gf_msg(this->name, GF_LOG_ERROR, 0, EC_MSG_DICT_REF_FAIL, + "Failed to reference a " + "dictionary."); + + goto out; + } + } + + error = 0; + +out: + if (fop != NULL) { + ec_manager(fop, error); + } else { + func(frame, NULL, this, -1, error, NULL, NULL); + } +} + +/* FOP: readdirp */ + +void +ec_wind_readdirp(ec_t *ec, ec_fop_data_t *fop, int32_t idx) +{ + ec_trace("WIND", fop, "idx=%d", idx); + + STACK_WIND_COOKIE(fop->frame, ec_common_readdir_cbk, (void *)(uintptr_t)idx, + ec->xl_list[idx], ec->xl_list[idx]->fops->readdirp, + fop->fd, fop->size, fop->offset, fop->xdata); +} + +void +ec_readdirp(call_frame_t *frame, xlator_t *this, uintptr_t target, + uint32_t fop_flags, fop_readdirp_cbk_t func, void *data, fd_t *fd, + size_t size, off_t offset, dict_t *xdata) +{ + ec_cbk_t callback = {.readdirp = func}; + ec_fop_data_t *fop = NULL; + int32_t error = ENOMEM; + + gf_msg_trace("ec", 0, "EC(READDIRP) %p", frame); + + VALIDATE_OR_GOTO(this, out); + GF_VALIDATE_OR_GOTO(this->name, frame, out); + GF_VALIDATE_OR_GOTO(this->name, this->private, out); + + fop = ec_fop_data_allocate( + frame, this, GF_FOP_READDIRP, EC_FLAG_LOCK_SHARED, target, fop_flags, + ec_wind_readdirp, ec_manager_readdir, callback, data); + if (fop == NULL) { + goto out; + } + + fop->use_fd = 1; + + fop->size = size; + fop->offset = offset; + + if (fd != NULL) { + fop->fd = fd_ref(fd); + if (fop->fd == NULL) { + gf_msg(this->name, GF_LOG_ERROR, 0, EC_MSG_FILE_DESC_REF_FAIL, + "Failed to reference a " + "file descriptor."); + + goto out; + } + } + if (xdata != NULL) { + fop->xdata = dict_ref(xdata); + if (fop->xdata == NULL) { + gf_msg(this->name, GF_LOG_ERROR, 0, EC_MSG_DICT_REF_FAIL, + "Failed to reference a " + "dictionary."); + + goto out; + } + } + + error = 0; + +out: + if (fop != NULL) { + ec_manager(fop, error); + } else { + func(frame, NULL, this, -1, error, NULL, NULL); + } +} |
