summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/ec/src/ec-dir-read.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/ec/src/ec-dir-read.c')
-rw-r--r--xlators/cluster/ec/src/ec-dir-read.c647
1 files changed, 647 insertions, 0 deletions
diff --git a/xlators/cluster/ec/src/ec-dir-read.c b/xlators/cluster/ec/src/ec-dir-read.c
new file mode 100644
index 00000000000..f71dcfac293
--- /dev/null
+++ b/xlators/cluster/ec/src/ec-dir-read.c
@@ -0,0 +1,647 @@
+/*
+ Copyright (c) 2012-2014 DataLab, s.l. <http://www.datalab.es>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "ec.h"
+#include "ec-messages.h"
+#include "ec-helpers.h"
+#include "ec-common.h"
+#include "ec-combine.h"
+#include "ec-fops.h"
+
+/****************************************************************
+ *
+ * File Operation: opendir
+ *
+ ***************************************************************/
+
+int32_t
+ec_combine_opendir(ec_fop_data_t *fop, ec_cbk_data_t *dst, ec_cbk_data_t *src)
+{
+ if (dst->fd != src->fd) {
+ gf_msg(fop->xl->name, GF_LOG_NOTICE, 0, EC_MSG_FD_MISMATCH,
+ "Mismatching fd in answers "
+ "of 'GF_FOP_OPENDIR': %p <-> %p",
+ dst->fd, src->fd);
+
+ return 0;
+ }
+
+ return 1;
+}
+
+int32_t
+ec_opendir_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, fd_t *fd, dict_t *xdata)
+{
+ ec_fop_data_t *fop = NULL;
+ ec_cbk_data_t *cbk = NULL;
+ int32_t idx = (int32_t)(uintptr_t)cookie;
+
+ VALIDATE_OR_GOTO(this, out);
+ GF_VALIDATE_OR_GOTO(this->name, frame, out);
+ GF_VALIDATE_OR_GOTO(this->name, frame->local, out);
+ GF_VALIDATE_OR_GOTO(this->name, this->private, out);
+
+ fop = frame->local;
+
+ ec_trace("CBK", fop, "idx=%d, frame=%p, op_ret=%d, op_errno=%d", idx, frame,
+ op_ret, op_errno);
+
+ cbk = ec_cbk_data_allocate(frame, this, fop, GF_FOP_OPENDIR, idx, op_ret,
+ op_errno);
+ if (cbk != NULL) {
+ if (op_ret >= 0) {
+ if (fd != NULL) {
+ cbk->fd = fd_ref(fd);
+ if (cbk->fd == NULL) {
+ gf_msg(this->name, GF_LOG_ERROR, 0,
+ EC_MSG_FILE_DESC_REF_FAIL,
+ "Failed to reference a "
+ "file descriptor.");
+
+ goto out;
+ }
+ }
+ }
+ if (xdata != NULL) {
+ cbk->xdata = dict_ref(xdata);
+ if (cbk->xdata == NULL) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, EC_MSG_DICT_REF_FAIL,
+ "Failed to reference a "
+ "dictionary.");
+
+ goto out;
+ }
+ }
+
+ ec_combine(cbk, ec_combine_opendir);
+
+ ec_update_fd_status(fd, this, idx, op_ret);
+ }
+
+out:
+ if (fop != NULL) {
+ ec_complete(fop);
+ }
+
+ return 0;
+}
+
+void
+ec_wind_opendir(ec_t *ec, ec_fop_data_t *fop, int32_t idx)
+{
+ ec_trace("WIND", fop, "idx=%d", idx);
+
+ STACK_WIND_COOKIE(fop->frame, ec_opendir_cbk, (void *)(uintptr_t)idx,
+ ec->xl_list[idx], ec->xl_list[idx]->fops->opendir,
+ &fop->loc[0], fop->fd, fop->xdata);
+}
+
+int32_t
+ec_manager_opendir(ec_fop_data_t *fop, int32_t state)
+{
+ ec_cbk_data_t *cbk;
+ ec_fd_t *ctx;
+ int32_t err;
+
+ switch (state) {
+ case EC_STATE_INIT:
+ LOCK(&fop->fd->lock);
+
+ ctx = __ec_fd_get(fop->fd, fop->xl);
+ if (ctx == NULL) {
+ UNLOCK(&fop->fd->lock);
+
+ fop->error = ENOMEM;
+
+ return EC_STATE_REPORT;
+ }
+ if (!ctx->loc.inode) {
+ err = ec_loc_from_loc(fop->xl, &ctx->loc, &fop->loc[0]);
+ if (err != 0) {
+ UNLOCK(&fop->fd->lock);
+
+ fop->error = -err;
+
+ return EC_STATE_REPORT;
+ }
+ }
+
+ UNLOCK(&fop->fd->lock);
+
+ /* Fall through */
+
+ case EC_STATE_LOCK:
+ ec_lock_prepare_inode(fop, &fop->loc[0], EC_QUERY_INFO, 0,
+ EC_RANGE_FULL);
+ ec_lock(fop);
+
+ return EC_STATE_DISPATCH;
+
+ case EC_STATE_DISPATCH:
+ ec_dispatch_all(fop);
+
+ return EC_STATE_PREPARE_ANSWER;
+
+ case EC_STATE_PREPARE_ANSWER:
+ cbk = ec_fop_prepare_answer(fop, _gf_true);
+ if (cbk != NULL) {
+ /* Save which subvolumes successfully opened the directory.
+ * If ctx->open is 0, it means that readdir cannot be
+ * processed in this directory.
+ */
+ LOCK(&fop->fd->lock);
+
+ ctx = __ec_fd_get(fop->fd, fop->xl);
+ if (ctx != NULL) {
+ ctx->open |= cbk->mask;
+ }
+
+ UNLOCK(&fop->fd->lock);
+ }
+
+ return EC_STATE_REPORT;
+
+ case EC_STATE_REPORT:
+ cbk = fop->answer;
+
+ GF_ASSERT(cbk != NULL);
+
+ if (fop->cbks.opendir != NULL) {
+ fop->cbks.opendir(fop->req_frame, fop, fop->xl, cbk->op_ret,
+ cbk->op_errno, cbk->fd, cbk->xdata);
+ }
+
+ return EC_STATE_LOCK_REUSE;
+
+ case -EC_STATE_INIT:
+ case -EC_STATE_LOCK:
+ case -EC_STATE_DISPATCH:
+ case -EC_STATE_PREPARE_ANSWER:
+ case -EC_STATE_REPORT:
+ GF_ASSERT(fop->error != 0);
+
+ if (fop->cbks.opendir != NULL) {
+ fop->cbks.opendir(fop->req_frame, fop, fop->xl, -1, fop->error,
+ NULL, NULL);
+ }
+
+ return EC_STATE_LOCK_REUSE;
+
+ case -EC_STATE_LOCK_REUSE:
+ case EC_STATE_LOCK_REUSE:
+ ec_lock_reuse(fop);
+
+ return EC_STATE_UNLOCK;
+
+ case -EC_STATE_UNLOCK:
+ case EC_STATE_UNLOCK:
+ ec_unlock(fop);
+
+ return EC_STATE_END;
+
+ default:
+ gf_msg(fop->xl->name, GF_LOG_ERROR, EINVAL, EC_MSG_UNHANDLED_STATE,
+ "Unhandled state %d for %s", state, ec_fop_name(fop->id));
+
+ return EC_STATE_END;
+ }
+}
+
+void
+ec_opendir(call_frame_t *frame, xlator_t *this, uintptr_t target,
+ uint32_t fop_flags, fop_opendir_cbk_t func, void *data, loc_t *loc,
+ fd_t *fd, dict_t *xdata)
+{
+ ec_cbk_t callback = {.opendir = func};
+ ec_fop_data_t *fop = NULL;
+ int32_t error = ENOMEM;
+
+ gf_msg_trace("ec", 0, "EC(OPENDIR) %p", frame);
+
+ VALIDATE_OR_GOTO(this, out);
+ GF_VALIDATE_OR_GOTO(this->name, frame, out);
+ GF_VALIDATE_OR_GOTO(this->name, this->private, out);
+
+ fop = ec_fop_data_allocate(frame, this, GF_FOP_OPENDIR, EC_FLAG_LOCK_SHARED,
+ target, fop_flags, ec_wind_opendir,
+ ec_manager_opendir, callback, data);
+ if (fop == NULL) {
+ goto out;
+ }
+
+ if (loc != NULL) {
+ if (loc_copy(&fop->loc[0], loc) != 0) {
+ gf_msg(this->name, GF_LOG_ERROR, ENOMEM, EC_MSG_LOC_COPY_FAIL,
+ "Failed to copy a location.");
+
+ goto out;
+ }
+ }
+ if (fd != NULL) {
+ fop->fd = fd_ref(fd);
+ if (fop->fd == NULL) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, EC_MSG_FILE_DESC_REF_FAIL,
+ "Failed to reference a "
+ "file descriptor.");
+
+ goto out;
+ }
+ }
+ if (xdata != NULL) {
+ fop->xdata = dict_ref(xdata);
+ if (fop->xdata == NULL) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, EC_MSG_DICT_REF_FAIL,
+ "Failed to reference a "
+ "dictionary.");
+
+ goto out;
+ }
+ }
+
+ error = 0;
+
+out:
+ if (fop != NULL) {
+ ec_manager(fop, error);
+ } else {
+ func(frame, NULL, this, -1, error, NULL, NULL);
+ }
+}
+
+/* Returns -1 if client_id is invalid else index of child subvol in xl_list */
+int
+ec_deitransform(xlator_t *this, off_t offset)
+{
+ int idx = -1;
+ int client_id = -1;
+ ec_t *ec = this->private;
+ char id[32] = {0};
+ int err;
+
+ client_id = gf_deitransform(this, offset);
+ sprintf(id, "%d", client_id);
+ err = dict_get_int32(ec->leaf_to_subvolid, id, &idx);
+ if (err < 0) {
+ idx = err;
+ goto out;
+ }
+
+out:
+ if (idx < 0) {
+ gf_msg(this->name, GF_LOG_ERROR, EINVAL, EC_MSG_INVALID_REQUEST,
+ "Invalid index %d in readdirp request", client_id);
+ idx = -EINVAL;
+ }
+ return idx;
+}
+
+/* FOP: readdir */
+
+void
+ec_adjust_readdirp(ec_t *ec, int32_t idx, gf_dirent_t *entries)
+{
+ gf_dirent_t *entry;
+
+ list_for_each_entry(entry, &entries->list, list)
+ {
+ if (!entry->inode)
+ continue;
+
+ if (entry->d_stat.ia_type == IA_IFREG) {
+ if ((entry->dict == NULL) ||
+ (ec_dict_del_number(entry->dict, EC_XATTR_SIZE,
+ &entry->d_stat.ia_size) != 0)) {
+ inode_unref(entry->inode);
+ entry->inode = NULL;
+ } else {
+ ec_iatt_rebuild(ec, &entry->d_stat, 1, 1);
+ }
+ }
+ }
+}
+
+int32_t
+ec_common_readdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, gf_dirent_t *entries,
+ dict_t *xdata)
+{
+ ec_fop_data_t *fop = NULL;
+ ec_cbk_data_t *cbk = NULL;
+ int32_t idx = (int32_t)(uintptr_t)cookie;
+
+ VALIDATE_OR_GOTO(this, out);
+ GF_VALIDATE_OR_GOTO(this->name, frame, out);
+ GF_VALIDATE_OR_GOTO(this->name, frame->local, out);
+ GF_VALIDATE_OR_GOTO(this->name, this->private, out);
+
+ fop = frame->local;
+
+ ec_trace("CBK", fop, "idx=%d, frame=%p, op_ret=%d, op_errno=%d", idx, frame,
+ op_ret, op_errno);
+
+ cbk = ec_cbk_data_allocate(frame, this, fop, fop->id, idx, op_ret,
+ op_errno);
+ if (cbk) {
+ if (xdata)
+ cbk->xdata = dict_ref(xdata);
+ if (cbk->op_ret >= 0)
+ list_splice_init(&entries->list, &cbk->entries.list);
+ ec_combine(cbk, NULL);
+ }
+
+out:
+ if (fop != NULL) {
+ ec_complete(fop);
+ }
+
+ return 0;
+}
+
+void
+ec_wind_readdir(ec_t *ec, ec_fop_data_t *fop, int32_t idx)
+{
+ ec_trace("WIND", fop, "idx=%d", idx);
+
+ STACK_WIND_COOKIE(fop->frame, ec_common_readdir_cbk, (void *)(uintptr_t)idx,
+ ec->xl_list[idx], ec->xl_list[idx]->fops->readdir,
+ fop->fd, fop->size, fop->offset, fop->xdata);
+}
+
+int32_t
+ec_manager_readdir(ec_fop_data_t *fop, int32_t state)
+{
+ ec_fd_t *ctx = NULL;
+ ec_cbk_data_t *cbk = NULL;
+
+ switch (state) {
+ case EC_STATE_INIT:
+ /* Return error if opendir has not been successfully called on
+ * any subvolume. */
+ ctx = ec_fd_get(fop->fd, fop->xl);
+ if (ctx == NULL) {
+ fop->error = ENOMEM;
+ } else if (ctx->open == 0) {
+ fop->error = EBADFD;
+ }
+
+ if (fop->error) {
+ gf_msg(fop->xl->name, GF_LOG_ERROR, fop->error,
+ EC_MSG_INVALID_REQUEST, "EC is not winding readdir: %s",
+ ec_msg_str(fop));
+ return EC_STATE_REPORT;
+ }
+
+ if (fop->id == GF_FOP_READDIRP) {
+ int32_t err;
+
+ if (fop->xdata == NULL) {
+ fop->xdata = dict_new();
+ if (fop->xdata == NULL) {
+ fop->error = ENOMEM;
+
+ return EC_STATE_REPORT;
+ }
+ }
+
+ err = dict_set_uint64(fop->xdata, EC_XATTR_SIZE, 0);
+ if (err != 0) {
+ fop->error = -err;
+
+ return EC_STATE_REPORT;
+ }
+ }
+
+ if (fop->offset != 0) {
+ /* Non-zero offset is irrecoverable error as the offset may not
+ * be valid on other bricks*/
+ int32_t idx = -1;
+
+ idx = ec_deitransform(fop->xl, fop->offset);
+
+ if (idx < 0) {
+ fop->error = -idx;
+ return EC_STATE_REPORT;
+ }
+ fop->mask &= 1ULL << idx;
+ } else {
+ ec_lock_prepare_fd(fop, fop->fd, EC_QUERY_INFO, 0,
+ EC_RANGE_FULL);
+ ec_lock(fop);
+ }
+
+ return EC_STATE_DISPATCH;
+
+ case EC_STATE_DISPATCH:
+ ec_dispatch_one(fop);
+
+ return EC_STATE_PREPARE_ANSWER;
+
+ case EC_STATE_PREPARE_ANSWER:
+ if (ec_dispatch_one_retry(fop, &cbk)) {
+ return EC_STATE_DISPATCH;
+ }
+
+ if ((cbk != NULL) && (cbk->op_ret > 0) &&
+ (fop->id == GF_FOP_READDIRP)) {
+ ec_adjust_readdirp(fop->xl->private, cbk->idx, &cbk->entries);
+ }
+
+ return EC_STATE_REPORT;
+
+ case EC_STATE_REPORT:
+ cbk = fop->answer;
+ GF_ASSERT(cbk);
+ if (fop->id == GF_FOP_READDIR) {
+ if (fop->cbks.readdir != NULL) {
+ fop->cbks.readdir(fop->req_frame, fop, fop->xl, cbk->op_ret,
+ cbk->op_errno, &cbk->entries, cbk->xdata);
+ }
+ } else {
+ if (fop->cbks.readdirp != NULL) {
+ fop->cbks.readdirp(fop->req_frame, fop, fop->xl,
+ cbk->op_ret, cbk->op_errno,
+ &cbk->entries, cbk->xdata);
+ }
+ }
+ if (fop->offset == 0)
+ return EC_STATE_LOCK_REUSE;
+ else
+ return EC_STATE_END;
+
+ case -EC_STATE_INIT:
+ case -EC_STATE_LOCK:
+ case -EC_STATE_DISPATCH:
+ case -EC_STATE_PREPARE_ANSWER:
+ case -EC_STATE_REPORT:
+ if (fop->id == GF_FOP_READDIR) {
+ if (fop->cbks.readdir != NULL) {
+ fop->cbks.readdir(fop->req_frame, fop, fop->xl, -1,
+ fop->error, NULL, NULL);
+ }
+ } else {
+ if (fop->cbks.readdirp != NULL) {
+ fop->cbks.readdirp(fop->req_frame, fop, fop->xl, -1,
+ fop->error, NULL, NULL);
+ }
+ }
+ if (fop->offset == 0)
+ return EC_STATE_LOCK_REUSE;
+ else
+ return EC_STATE_END;
+
+ case -EC_STATE_LOCK_REUSE:
+ case EC_STATE_LOCK_REUSE:
+ GF_ASSERT(fop->offset == 0);
+ ec_lock_reuse(fop);
+
+ return EC_STATE_UNLOCK;
+
+ case -EC_STATE_UNLOCK:
+ case EC_STATE_UNLOCK:
+ GF_ASSERT(fop->offset == 0);
+ ec_unlock(fop);
+
+ return EC_STATE_END;
+ default:
+ gf_msg(fop->xl->name, GF_LOG_ERROR, EINVAL, EC_MSG_UNHANDLED_STATE,
+ "Unhandled state %d for %s", state, ec_fop_name(fop->id));
+
+ return EC_STATE_END;
+ }
+}
+
+void
+ec_readdir(call_frame_t *frame, xlator_t *this, uintptr_t target,
+ uint32_t fop_flags, fop_readdir_cbk_t func, void *data, fd_t *fd,
+ size_t size, off_t offset, dict_t *xdata)
+{
+ ec_cbk_t callback = {.readdir = func};
+ ec_fop_data_t *fop = NULL;
+ int32_t error = ENOMEM;
+
+ gf_msg_trace("ec", 0, "EC(READDIR) %p", frame);
+
+ VALIDATE_OR_GOTO(this, out);
+ GF_VALIDATE_OR_GOTO(this->name, frame, out);
+ GF_VALIDATE_OR_GOTO(this->name, this->private, out);
+
+ fop = ec_fop_data_allocate(frame, this, GF_FOP_READDIR, EC_FLAG_LOCK_SHARED,
+ target, fop_flags, ec_wind_readdir,
+ ec_manager_readdir, callback, data);
+ if (fop == NULL) {
+ goto out;
+ }
+
+ fop->use_fd = 1;
+
+ fop->size = size;
+ fop->offset = offset;
+
+ if (fd != NULL) {
+ fop->fd = fd_ref(fd);
+ if (fop->fd == NULL) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, EC_MSG_FILE_DESC_REF_FAIL,
+ "Failed to reference a "
+ "file descriptor.");
+
+ goto out;
+ }
+ }
+ if (xdata != NULL) {
+ fop->xdata = dict_ref(xdata);
+ if (fop->xdata == NULL) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, EC_MSG_DICT_REF_FAIL,
+ "Failed to reference a "
+ "dictionary.");
+
+ goto out;
+ }
+ }
+
+ error = 0;
+
+out:
+ if (fop != NULL) {
+ ec_manager(fop, error);
+ } else {
+ func(frame, NULL, this, -1, error, NULL, NULL);
+ }
+}
+
+/* FOP: readdirp */
+
+void
+ec_wind_readdirp(ec_t *ec, ec_fop_data_t *fop, int32_t idx)
+{
+ ec_trace("WIND", fop, "idx=%d", idx);
+
+ STACK_WIND_COOKIE(fop->frame, ec_common_readdir_cbk, (void *)(uintptr_t)idx,
+ ec->xl_list[idx], ec->xl_list[idx]->fops->readdirp,
+ fop->fd, fop->size, fop->offset, fop->xdata);
+}
+
+void
+ec_readdirp(call_frame_t *frame, xlator_t *this, uintptr_t target,
+ uint32_t fop_flags, fop_readdirp_cbk_t func, void *data, fd_t *fd,
+ size_t size, off_t offset, dict_t *xdata)
+{
+ ec_cbk_t callback = {.readdirp = func};
+ ec_fop_data_t *fop = NULL;
+ int32_t error = ENOMEM;
+
+ gf_msg_trace("ec", 0, "EC(READDIRP) %p", frame);
+
+ VALIDATE_OR_GOTO(this, out);
+ GF_VALIDATE_OR_GOTO(this->name, frame, out);
+ GF_VALIDATE_OR_GOTO(this->name, this->private, out);
+
+ fop = ec_fop_data_allocate(
+ frame, this, GF_FOP_READDIRP, EC_FLAG_LOCK_SHARED, target, fop_flags,
+ ec_wind_readdirp, ec_manager_readdir, callback, data);
+ if (fop == NULL) {
+ goto out;
+ }
+
+ fop->use_fd = 1;
+
+ fop->size = size;
+ fop->offset = offset;
+
+ if (fd != NULL) {
+ fop->fd = fd_ref(fd);
+ if (fop->fd == NULL) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, EC_MSG_FILE_DESC_REF_FAIL,
+ "Failed to reference a "
+ "file descriptor.");
+
+ goto out;
+ }
+ }
+ if (xdata != NULL) {
+ fop->xdata = dict_ref(xdata);
+ if (fop->xdata == NULL) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, EC_MSG_DICT_REF_FAIL,
+ "Failed to reference a "
+ "dictionary.");
+
+ goto out;
+ }
+ }
+
+ error = 0;
+
+out:
+ if (fop != NULL) {
+ ec_manager(fop, error);
+ } else {
+ func(frame, NULL, this, -1, error, NULL, NULL);
+ }
+}