/* Copyright (c) 2008-2012 Red Hat, Inc. This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser General Public License, version 3 or any later version (LGPLv3 or later), or the GNU General Public License, version 2 (GPLv2), in all cases as published by the Free Software Foundation. */ #include #include #include #include #include #include #include #include #include #include #include #include #include "afr.h" #include "afr-transaction.h" int32_t afr_opendir_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, fd_t *fd, dict_t *xdata) { afr_local_t *local = NULL; int call_count = -1; int32_t child_index = 0; afr_fd_ctx_t *fd_ctx = NULL; local = frame->local; fd_ctx = local->fd_ctx; child_index = (long)cookie; local->replies[child_index].valid = 1; local->replies[child_index].op_ret = op_ret; local->replies[child_index].op_errno = op_errno; LOCK(&frame->lock); { if (op_ret == -1) { local->op_errno = op_errno; fd_ctx->opened_on[child_index] = AFR_FD_NOT_OPENED; } else { local->op_ret = op_ret; fd_ctx->opened_on[child_index] = AFR_FD_OPENED; if (!local->xdata_rsp && xdata) local->xdata_rsp = dict_ref(xdata); } call_count = --local->call_count; } UNLOCK(&frame->lock); if (call_count == 0) { afr_handle_replies_quorum(frame, this); AFR_STACK_UNWIND(opendir, frame, local->op_ret, local->op_errno, local->fd, NULL); } return 0; } int afr_opendir(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd) { afr_private_t *priv = NULL; afr_local_t *local = NULL; int i = 0; int call_count = -1; int32_t op_errno = ENOMEM; afr_fd_ctx_t *fd_ctx = NULL; priv = this->private; local = AFR_FRAME_INIT(frame, op_errno); if (!local) goto out; local->op = GF_FOP_OPENDIR; if (priv->quorum_count && !afr_has_quorum(local->child_up, this, NULL)) { op_errno = afr_quorum_errno(priv); goto out; } if (!afr_is_consistent_io_possible(local, priv, &op_errno)) goto out; fd_ctx = afr_fd_ctx_get(fd, this); if (!fd_ctx) goto out; loc_copy(&local->loc, loc); local->fd = fd_ref(fd); local->fd_ctx = fd_ctx; call_count = local->call_count; for (i = 0; i < priv->child_count; i++) { if (local->child_up[i]) { STACK_WIND_COOKIE(frame, afr_opendir_cbk, (void *)(long)i, priv->children[i], priv->children[i]->fops->opendir, loc, fd, NULL); if (!--call_count) break; } } return 0; out: AFR_STACK_UNWIND(opendir, frame, -1, op_errno, fd, NULL); return 0; } static int afr_validate_read_subvol(inode_t *inode, xlator_t *this, int par_read_subvol) { int gen = 0; int entry_read_subvol = 0; unsigned char *data_readable = NULL; unsigned char *metadata_readable = NULL; afr_private_t *priv = NULL; priv = this->private; data_readable = alloca0(priv->child_count); metadata_readable = alloca0(priv->child_count); afr_inode_read_subvol_get(inode, this, data_readable, metadata_readable, &gen); if (gen != priv->event_generation || !data_readable[par_read_subvol] || !metadata_readable[par_read_subvol]) return -1; /* Once the control reaches the following statement, it means that the * parent's read subvol is perfectly readable. So calling * either afr_data_subvol_get() or afr_metadata_subvol_get() would * yield the same result. Hence, choosing afr_data_subvol_get() below. */ if (!priv->consistent_metadata) return 0; /* For an inode fetched through readdirp which is yet to be linked, * inode ctx would not be initialised (yet). So this function returns * -1 above due to gen being 0, which is why it is OK to pass NULL for * read_subvol_args here. */ entry_read_subvol = afr_data_subvol_get(inode, this, NULL, NULL, NULL, NULL); if (entry_read_subvol != par_read_subvol) return -1; return 0; } static void afr_readdir_transform_entries(gf_dirent_t *subvol_entries, int subvol, gf_dirent_t *entries, fd_t *fd) { int ret = -1; gf_dirent_t *entry = NULL; gf_dirent_t *tmp = NULL; xlator_t *this = NULL; afr_private_t *priv = NULL; gf_boolean_t need_heal = _gf_false; gf_boolean_t validate_subvol = _gf_false; this = THIS; priv = this->private; need_heal = afr_get_need_heal(this); validate_subvol = need_heal | priv->consistent_metadata; list_for_each_entry_safe(entry, tmp, &subvol_entries->list, list) { if (__is_root_gfid(fd->inode->gfid) && !strcmp(entry->d_name, GF_REPLICATE_TRASH_DIR)) { continue; } list_del_init(&entry->list); list_add_tail(&entry->list, &entries->list); if (!validate_subvol) continue; if (entry->inode) { ret = afr_validate_read_subvol(entry->inode, this, subvol); if (ret == -1) { inode_unref(entry->inode); entry->inode = NULL; continue; } } } } int32_t afr_readdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, gf_dirent_t *subvol_entries, dict_t *xdata) { afr_local_t *local = NULL; gf_dirent_t entries; INIT_LIST_HEAD(&entries.list); local = frame->local; if (op_ret < 0 && !local->cont.readdir.offset) { /* failover only if this was first readdir, detected by offset == 0 */ local->op_ret = op_ret; local->op_errno = op_errno; afr_read_txn_continue(frame, this, (long)cookie); return 0; } if (op_ret >= 0) afr_readdir_transform_entries(subvol_entries, (long)cookie, &entries, local->fd); AFR_STACK_UNWIND(readdir, frame, op_ret, op_errno, &entries, xdata); gf_dirent_free(&entries); return 0; } int afr_readdir_wind(call_frame_t *frame, xlator_t *this, int subvol) { afr_local_t *local = NULL; afr_private_t *priv = NULL; afr_fd_ctx_t *fd_ctx = NULL; priv = this->private; local = frame->local; fd_ctx = afr_fd_ctx_get(local->fd, this); if (!fd_ctx) { local->op_errno = EINVAL; local->op_ret = -1; } if (subvol == -1 || !fd_ctx) { AFR_STACK_UNWIND(readdir, frame, local->op_ret, local->op_errno, 0, 0); return 0; } fd_ctx->readdir_subvol = subvol; if (local->op == GF_FOP_READDIR) STACK_WIND_COOKIE(frame, afr_readdir_cbk, (void *)(long)subvol, priv->children[subvol], priv->children[subvol]->fops->readdir, local->fd, local->cont.readdir.size, local->cont.readdir.offset, local->xdata_req); else STACK_WIND_COOKIE(frame, afr_readdir_cbk, (void *)(long)subvol, priv->children[subvol], priv->children[subvol]->fops->readdirp, local->fd, local->cont.readdir.size, local->cont.readdir.offset, local->xdata_req); return 0; } int afr_do_readdir(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, off_t offset, int whichop, dict_t *dict) { afr_local_t *local = NULL; int32_t op_errno = 0; int subvol = -1; afr_fd_ctx_t *fd_ctx = NULL; local = AFR_FRAME_INIT(frame, op_errno); if (!local) goto out; fd_ctx = afr_fd_ctx_get(fd, this); if (!fd_ctx) { op_errno = EINVAL; goto out; } local->op = whichop; local->fd = fd_ref(fd); local->cont.readdir.size = size; local->cont.readdir.offset = offset; local->xdata_req = (dict) ? dict_ref(dict) : NULL; subvol = fd_ctx->readdir_subvol; if (offset == 0 || subvol == -1) { /* First readdir has option of failing over and selecting an appropriate read subvolume */ afr_read_txn(frame, this, fd->inode, afr_readdir_wind, AFR_DATA_TRANSACTION); } else { /* But continued readdirs MUST stick to the same subvolume without an option to failover */ afr_readdir_wind(frame, this, subvol); } return 0; out: AFR_STACK_UNWIND(readdir, frame, -1, op_errno, NULL, NULL); return 0; } int32_t afr_readdir(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, off_t offset, dict_t *xdata) { afr_do_readdir(frame, this, fd, size, offset, GF_FOP_READDIR, xdata); return 0; } int32_t afr_readdirp(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, off_t offset, dict_t *dict) { afr_do_readdir(frame, this, fd, size, offset, GF_FOP_READDIRP, dict); return 0; } int32_t afr_releasedir(xlator_t *this, fd_t *fd) { afr_cleanup_fd_ctx(this, fd); return 0; }