From 77adf4cd648dce41f89469dd185deec6b6b53a0b Mon Sep 17 00:00:00 2001 From: Vikas Gorur Date: Wed, 18 Feb 2009 17:36:07 +0530 Subject: Added all files --- xlators/cluster/unify/src/unify.c | 4451 +++++++++++++++++++++++++++++++++++++ 1 file changed, 4451 insertions(+) create mode 100644 xlators/cluster/unify/src/unify.c (limited to 'xlators/cluster/unify/src/unify.c') diff --git a/xlators/cluster/unify/src/unify.c b/xlators/cluster/unify/src/unify.c new file mode 100644 index 000000000..e2a5e14b1 --- /dev/null +++ b/xlators/cluster/unify/src/unify.c @@ -0,0 +1,4451 @@ +/* + Copyright (c) 2006, 2007, 2008, 2009 Z RESEARCH, Inc. + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + . +*/ + +/** + * xlators/cluster/unify: + * - This xlator is one of the main translator in GlusterFS, which + * actually does the clustering work of the file system. One need to + * understand that, unify assumes file to be existing in only one of + * the child node, and directories to be present on all the nodes. + * + * NOTE: + * Now, unify has support for global namespace, which is used to keep a + * global view of fs's namespace tree. The stat for directories are taken + * just from the namespace, where as for files, just 'st_ino' is taken from + * Namespace node, and other stat info is taken from the actual storage node. + * Also Namespace node helps to keep consistant inode for files across + * glusterfs (re-)mounts. + */ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "glusterfs.h" +#include "unify.h" +#include "dict.h" +#include "xlator.h" +#include "hashfn.h" +#include "logging.h" +#include "stack.h" +#include "defaults.h" +#include "common-utils.h" +#include +#include +#include "compat-errno.h" +#include "compat.h" + +#define UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR(_loc) do { \ + if (!(_loc && _loc->inode)) { \ + STACK_UNWIND (frame, -1, EINVAL, NULL, NULL, NULL); \ + return 0; \ + } \ +} while(0) + + +#define UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR(_fd) do { \ + if (!(_fd && !fd_ctx_get (_fd, this, NULL))) { \ + STACK_UNWIND (frame, -1, EBADFD, NULL, NULL); \ + return 0; \ + } \ +} while(0) + +#define UNIFY_CHECK_FD_AND_UNWIND_ON_ERR(_fd) do { \ + if (!_fd) { \ + STACK_UNWIND (frame, -1, EBADFD, NULL, NULL); \ + return 0; \ + } \ +} while(0) + +/** + * unify_local_wipe - free all the extra allocation of local->* here. + */ +static void +unify_local_wipe (unify_local_t *local) +{ + /* Free the strdup'd variables in the local structure */ + if (local->name) { + FREE (local->name); + } + loc_wipe (&local->loc1); + loc_wipe (&local->loc2); +} + + + +/* + * unify_normalize_stats - + */ +void +unify_normalize_stats (struct statvfs *buf, + unsigned long bsize, + unsigned long frsize) +{ + double factor; + + if (buf->f_bsize != bsize) { + factor = ((double) buf->f_bsize) / bsize; + buf->f_bsize = bsize; + buf->f_bfree = (fsblkcnt_t) (factor * buf->f_bfree); + buf->f_bavail = (fsblkcnt_t) (factor * buf->f_bavail); + } + + if (buf->f_frsize != frsize) { + factor = ((double) buf->f_frsize) / frsize; + buf->f_frsize = frsize; + buf->f_blocks = (fsblkcnt_t) (factor * buf->f_blocks); + } +} + + +xlator_t * +unify_loc_subvol (loc_t *loc, xlator_t *this) +{ + unify_private_t *priv = NULL; + xlator_t *subvol = NULL; + int16_t *list = NULL; + long index = 0; + xlator_t *subvol_i = NULL; + int ret = 0; + uint64_t tmp_list = 0; + + priv = this->private; + subvol = NS (this); + + if (!S_ISDIR (loc->inode->st_mode)) { + ret = inode_ctx_get (loc->inode, this, &tmp_list); + list = (int16_t *)(long)tmp_list; + if (!list) + goto out; + + for (index = 0; list[index] != -1; index++) { + subvol_i = priv->xl_array[list[index]]; + if (subvol_i != NS (this)) { + subvol = subvol_i; + break; + } + } + } +out: + return subvol; +} + + + +/** + * unify_statfs_cbk - + */ +int32_t +unify_statfs_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + struct statvfs *stbuf) +{ + int32_t callcnt = 0; + struct statvfs *dict_buf = NULL; + unsigned long bsize; + unsigned long frsize; + unify_local_t *local = (unify_local_t *)frame->local; + call_frame_t *prev_frame = cookie; + + LOCK (&frame->lock); + { + if (op_ret >= 0) { + /* when a call is successfull, add it to local->dict */ + dict_buf = &local->statvfs_buf; + + if (dict_buf->f_bsize != 0) { + bsize = max (dict_buf->f_bsize, + stbuf->f_bsize); + + frsize = max (dict_buf->f_frsize, + stbuf->f_frsize); + unify_normalize_stats(dict_buf, bsize, frsize); + unify_normalize_stats(stbuf, bsize, frsize); + } else { + dict_buf->f_bsize = stbuf->f_bsize; + dict_buf->f_frsize = stbuf->f_frsize; + } + + dict_buf->f_blocks += stbuf->f_blocks; + dict_buf->f_bfree += stbuf->f_bfree; + dict_buf->f_bavail += stbuf->f_bavail; + dict_buf->f_files += stbuf->f_files; + dict_buf->f_ffree += stbuf->f_ffree; + dict_buf->f_favail += stbuf->f_favail; + dict_buf->f_fsid = stbuf->f_fsid; + dict_buf->f_flag = stbuf->f_flag; + dict_buf->f_namemax = stbuf->f_namemax; + local->op_ret = op_ret; + } else { + /* fop on storage node has failed due to some error */ + if (op_errno != ENOTCONN) { + gf_log (this->name, GF_LOG_ERROR, + "child(%s): %s", + prev_frame->this->name, + strerror (op_errno)); + } + local->op_errno = op_errno; + } + callcnt = --local->call_count; + } + UNLOCK (&frame->lock); + + if (!callcnt) { + STACK_UNWIND (frame, local->op_ret, local->op_errno, + &local->statvfs_buf); + } + + return 0; +} + +/** + * unify_statfs - + */ +int32_t +unify_statfs (call_frame_t *frame, + xlator_t *this, + loc_t *loc) +{ + unify_local_t *local = NULL; + xlator_list_t *trav = this->children; + + INIT_LOCAL (frame, local); + local->call_count = ((unify_private_t *)this->private)->child_count; + + while(trav) { + STACK_WIND (frame, + unify_statfs_cbk, + trav->xlator, + trav->xlator->fops->statfs, + loc); + trav = trav->next; + } + + return 0; +} + +/** + * unify_buf_cbk - + */ +int32_t +unify_buf_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + struct stat *buf) +{ + int32_t callcnt = 0; + unify_private_t *priv = this->private; + unify_local_t *local = frame->local; + call_frame_t *prev_frame = cookie; + + LOCK (&frame->lock); + { + callcnt = --local->call_count; + + if (op_ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "%s(): child(%s): path(%s): %s", + gf_fop_list[frame->root->op], + prev_frame->this->name, + (local->loc1.path)?local->loc1.path:"", + strerror (op_errno)); + + local->op_errno = op_errno; + if ((op_errno == ENOENT) && priv->optimist) + local->op_ret = 0; + } + + if (op_ret >= 0) { + local->op_ret = 0; + + if (NS (this) == prev_frame->this) { + local->st_ino = buf->st_ino; + /* If the entry is directory, get the stat + from NS node */ + if (S_ISDIR (buf->st_mode) || + !local->stbuf.st_blksize) { + local->stbuf = *buf; + } + } + + if ((!S_ISDIR (buf->st_mode)) && + (NS (this) != prev_frame->this)) { + /* If file, take the stat info from Storage + node. */ + local->stbuf = *buf; + } + } + } + UNLOCK (&frame->lock); + + if (!callcnt) { + /* If the inode number is not filled, operation should + fail */ + if (!local->st_ino) + local->op_ret = -1; + + local->stbuf.st_ino = local->st_ino; + unify_local_wipe (local); + STACK_UNWIND (frame, local->op_ret, local->op_errno, + &local->stbuf); + } + + return 0; +} + +#define check_if_dht_linkfile(s) ((s->st_mode & ~S_IFMT) == S_ISVTX) + +/** + * unify_lookup_cbk - + */ +int32_t +unify_lookup_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + inode_t *inode, + struct stat *buf, + dict_t *dict) +{ + int32_t callcnt = 0; + unify_private_t *priv = this->private; + unify_local_t *local = frame->local; + inode_t *tmp_inode = NULL; + dict_t *local_dict = NULL; + + LOCK (&frame->lock); + { + callcnt = --local->call_count; + + if (op_ret == -1) { + if ((op_errno != ENOTCONN) && (op_errno != ENOENT)) { + gf_log (this->name, GF_LOG_ERROR, + "child(%s): path(%s): %s", + priv->xl_array[(long)cookie]->name, + local->loc1.path, strerror (op_errno)); + local->op_errno = op_errno; + local->failed = 1; + + } else if (local->revalidate && + !(priv->optimist && (op_errno == ENOENT))) { + + gf_log (this->name, + (op_errno == ENOTCONN) ? + GF_LOG_DEBUG:GF_LOG_ERROR, + "child(%s): path(%s): %s", + priv->xl_array[(long)cookie]->name, + local->loc1.path, strerror (op_errno)); + local->op_errno = op_errno; + local->failed = 1; + } + } + + if (op_ret == 0) { + local->op_ret = 0; + + if (check_if_dht_linkfile(buf)) { + gf_log (this->name, GF_LOG_CRITICAL, + "file %s may be DHT link file on %s, " + "make sure the backend is not shared " + "between unify and DHT", + local->loc1.path, + priv->xl_array[(long)cookie]->name); + } + + if (local->stbuf.st_mode && local->stbuf.st_blksize) { + /* make sure we already have a stbuf + stored in local->stbuf */ + if (S_ISDIR (local->stbuf.st_mode) && + !S_ISDIR (buf->st_mode)) { + gf_log (this->name, GF_LOG_CRITICAL, + "[CRITICAL] '%s' is directory " + "on namespace, non-directory " + "on node '%s', returning EIO", + local->loc1.path, + priv->xl_array[(long)cookie]->name); + local->return_eio = 1; + } + if (!S_ISDIR (local->stbuf.st_mode) && + S_ISDIR (buf->st_mode)) { + gf_log (this->name, GF_LOG_CRITICAL, + "[CRITICAL] '%s' is directory " + "on node '%s', non-directory " + "on namespace, returning EIO", + local->loc1.path, + priv->xl_array[(long)cookie]->name); + local->return_eio = 1; + } + } + + if (!local->revalidate && !S_ISDIR (buf->st_mode)) { + /* This is the first time lookup on file*/ + if (!local->list) { + /* list is not allocated, allocate + the max possible range */ + local->list = CALLOC (1, 2 * (priv->child_count + 2)); + if (!local->list) { + gf_log (this->name, + GF_LOG_CRITICAL, + "Not enough memory"); + STACK_UNWIND (frame, -1, + ENOMEM, inode, + NULL, NULL); + return 0; + } + } + /* update the index of the list */ + local->list [local->index++] = + (int16_t)(long)cookie; + } + + if ((!local->dict) && dict && + (priv->xl_array[(long)cookie] != NS(this))) { + local->dict = dict_ref (dict); + } + + /* index of NS node is == total child count */ + if (priv->child_count == (int16_t)(long)cookie) { + /* Take the inode number from namespace */ + local->st_ino = buf->st_ino; + if (S_ISDIR (buf->st_mode) || + !(local->stbuf.st_blksize)) { + local->stbuf = *buf; + } + } else if (!S_ISDIR (buf->st_mode)) { + /* If file, then get the stat from + storage node */ + local->stbuf = *buf; + } + + if (local->st_nlink < buf->st_nlink) { + local->st_nlink = buf->st_nlink; + } + } + } + UNLOCK (&frame->lock); + + if (!callcnt) { + local_dict = local->dict; + if (local->return_eio) { + gf_log (this->name, GF_LOG_CRITICAL, + "[CRITICAL] Unable to fix the path (%s) with " + "self-heal, try manual verification. " + "returning EIO.", local->loc1.path); + unify_local_wipe (local); + STACK_UNWIND (frame, -1, EIO, inode, NULL, NULL); + if (local_dict) { + dict_unref (local_dict); + } + return 0; + } + + if (!local->stbuf.st_blksize) { + /* Inode not present */ + local->op_ret = -1; + } else { + if (!local->revalidate && + !S_ISDIR (local->stbuf.st_mode)) { + /* If its a file, big array is useless, + allocate the smaller one */ + int16_t *list = NULL; + list = CALLOC (1, 2 * (local->index + 1)); + ERR_ABORT (list); + memcpy (list, local->list, 2 * local->index); + /* Make the end of the list as -1 */ + FREE (local->list); + local->list = list; + local->list [local->index] = -1; + /* Update the inode's ctx with proper array */ + /* TODO: log on failure */ + inode_ctx_put (local->loc1.inode, this, + (uint64_t)(long)local->list); + } + + if (S_ISDIR(local->loc1.inode->st_mode)) { + /* lookup is done for directory */ + if (local->failed && priv->self_heal) { + /* Triggering self-heal */ + /* means, self-heal required for this + inode */ + local->inode_generation = 0; + priv->inode_generation++; + } + } else { + local->stbuf.st_ino = local->st_ino; + } + + local->stbuf.st_nlink = local->st_nlink; + } + if (local->op_ret == -1) { + if (!local->revalidate && local->list) + FREE (local->list); + } + + if ((local->op_ret >= 0) && local->failed && + local->revalidate) { + /* Done revalidate, but it failed */ + if (op_errno != ENOTCONN) { + gf_log (this->name, GF_LOG_ERROR, + "Revalidate failed for path(%s): %s", + local->loc1.path, strerror (op_errno)); + } + local->op_ret = -1; + } + + if ((priv->self_heal && !priv->optimist) && + (!local->revalidate && (local->op_ret == 0) && + S_ISDIR(local->stbuf.st_mode))) { + /* Let the self heal be done here */ + zr_unify_self_heal (frame, this, local); + local_dict = NULL; + } else { + /* either no self heal, or op_ret == -1 (failure) */ + tmp_inode = local->loc1.inode; + unify_local_wipe (local); + STACK_UNWIND (frame, local->op_ret, local->op_errno, + tmp_inode, &local->stbuf, local->dict); + } + if (local_dict) { + dict_unref (local_dict); + } + } + + return 0; +} + +/** + * unify_lookup - + */ +int32_t +unify_lookup (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + dict_t *xattr_req) +{ + unify_local_t *local = NULL; + unify_private_t *priv = this->private; + int16_t *list = NULL; + long index = 0; + + if (!(loc && loc->inode)) { + gf_log (this->name, GF_LOG_ERROR, + "%s: Argument not right", loc?loc->path:"(null)"); + STACK_UNWIND (frame, -1, EINVAL, NULL, NULL, NULL); + return 0; + } + + /* Initialization */ + INIT_LOCAL (frame, local); + loc_copy (&local->loc1, loc); + if (local->loc1.path == NULL) { + gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O"); + STACK_UNWIND (frame, -1, ENOMEM, loc->inode, NULL, NULL); + return 0; + } + + if (!inode_ctx_get (loc->inode, this, NULL) && + loc->inode->st_mode && + !S_ISDIR (loc->inode->st_mode)) { + uint64_t tmp_list = 0; + /* check if revalidate or fresh lookup */ + inode_ctx_get (loc->inode, this, &tmp_list); + local->list = (int16_t *)(long)tmp_list; + } + + if (local->list) { + list = local->list; + for (index = 0; list[index] != -1; index++); + if (index != 2) { + if (index < 2) { + gf_log (this->name, GF_LOG_ERROR, + "returning ESTALE for %s: file " + "count is %ld", loc->path, index); + /* Print where all the file is present */ + for (index = 0; + local->list[index] != -1; index++) { + gf_log (this->name, GF_LOG_ERROR, + "%s: found on %s", loc->path, + priv->xl_array[list[index]]->name); + } + unify_local_wipe (local); + STACK_UNWIND (frame, -1, ESTALE, + NULL, NULL, NULL); + return 0; + } else { + /* There are more than 2 presences */ + /* Just log and continue */ + gf_log (this->name, GF_LOG_ERROR, + "%s: file count is %ld", + loc->path, index); + /* Print where all the file is present */ + for (index = 0; + local->list[index] != -1; index++) { + gf_log (this->name, GF_LOG_ERROR, + "%s: found on %s", loc->path, + priv->xl_array[list[index]]->name); + } + } + } + + /* is revalidate */ + local->revalidate = 1; + + for (index = 0; list[index] != -1; index++) + local->call_count++; + + for (index = 0; list[index] != -1; index++) { + char need_break = (list[index+1] == -1); + STACK_WIND_COOKIE (frame, + unify_lookup_cbk, + (void *)(long)list[index], //cookie + priv->xl_array [list[index]], + priv->xl_array [list[index]]->fops->lookup, + loc, + xattr_req); + if (need_break) + break; + } + } else { + if (loc->inode->st_mode) { + if (inode_ctx_get (loc->inode, this, NULL)) { + inode_ctx_get (loc->inode, this, + &local->inode_generation); + } + } + /* This is first call, there is no list */ + /* call count should be all child + 1 namespace */ + local->call_count = priv->child_count + 1; + + for (index = 0; index <= priv->child_count; index++) { + STACK_WIND_COOKIE (frame, + unify_lookup_cbk, + (void *)index, //cookie + priv->xl_array[index], + priv->xl_array[index]->fops->lookup, + loc, + xattr_req); + } + } + + return 0; +} + +/** + * unify_stat - if directory, get the stat directly from NameSpace child. + * if file, check for a hint and send it only there (also to NS). + * if its a fresh stat, then do it on all the nodes. + * + * NOTE: for all the call, sending cookie as xlator pointer, which will be + * used in cbk. + */ +int32_t +unify_stat (call_frame_t *frame, + xlator_t *this, + loc_t *loc) +{ + unify_local_t *local = NULL; + unify_private_t *priv = this->private; + int16_t index = 0; + int16_t *list = NULL; + uint64_t tmp_list = 0; + + UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + + /* Initialization */ + INIT_LOCAL (frame, local); + loc_copy (&local->loc1, loc); + if (local->loc1.path == NULL) { + gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O"); + STACK_UNWIND (frame, -1, ENOMEM, NULL); + return 0; + } + local->st_ino = loc->inode->ino; + if (S_ISDIR (loc->inode->st_mode)) { + /* Directory */ + local->call_count = 1; + STACK_WIND (frame, unify_buf_cbk, NS(this), + NS(this)->fops->stat, loc); + } else { + /* File */ + inode_ctx_get (loc->inode, this, &tmp_list); + list = (int16_t *)(long)tmp_list; + + for (index = 0; list[index] != -1; index++) + local->call_count++; + + for (index = 0; list[index] != -1; index++) { + char need_break = (list[index+1] == -1); + STACK_WIND (frame, + unify_buf_cbk, + priv->xl_array[list[index]], + priv->xl_array[list[index]]->fops->stat, + loc); + if (need_break) + break; + } + } + + return 0; +} + +/** + * unify_access_cbk - + */ +int32_t +unify_access_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + STACK_UNWIND (frame, op_ret, op_errno); + return 0; +} + + +/** + * unify_access - Send request to only namespace, which has all the + * attributes set for the file. + */ +int32_t +unify_access (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + int32_t mask) +{ + UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + + STACK_WIND (frame, + unify_access_cbk, + NS(this), + NS(this)->fops->access, + loc, + mask); + + return 0; +} + +int32_t +unify_mkdir_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + inode_t *inode, + struct stat *buf) +{ + int32_t callcnt = 0; + unify_private_t *priv = this->private; + unify_local_t *local = frame->local; + inode_t *tmp_inode = NULL; + + LOCK (&frame->lock); + { + callcnt = --local->call_count; + + if ((op_ret == -1) && !(priv->optimist && + (op_errno == ENOENT || + op_errno == EEXIST))) { + /* TODO: Decrement the inode_generation of + * this->inode's parent inode, hence the missing + * directory is created properly by self-heal. + * Currently, there is no way to get the parent + * inode directly. + */ + gf_log (this->name, GF_LOG_ERROR, + "child(%s): path(%s): %s", + priv->xl_array[(long)cookie]->name, + local->loc1.path, strerror (op_errno)); + if (op_errno != EEXIST) + local->failed = 1; + local->op_errno = op_errno; + } + + if (op_ret >= 0) + local->op_ret = 0; + + } + UNLOCK (&frame->lock); + + if (!callcnt) { + if (!local->failed) { + inode_ctx_put (local->loc1.inode, this, + priv->inode_generation); + } + + tmp_inode = local->loc1.inode; + unify_local_wipe (local); + + STACK_UNWIND (frame, local->op_ret, local->op_errno, + tmp_inode, &local->stbuf); + } + + return 0; +} + +/** + * unify_ns_mkdir_cbk - + */ +int32_t +unify_ns_mkdir_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + inode_t *inode, + struct stat *buf) +{ + unify_private_t *priv = this->private; + unify_local_t *local = frame->local; + long index = 0; + + if (op_ret == -1) { + /* No need to send mkdir request to other servers, + * as namespace action failed + */ + gf_log (this->name, GF_LOG_ERROR, + "namespace: path(%s): %s", + local->name, strerror (op_errno)); + unify_local_wipe (local); + STACK_UNWIND (frame, op_ret, op_errno, inode, NULL); + return 0; + } + + /* Create one inode for this entry */ + local->op_ret = 0; + local->stbuf = *buf; + + local->call_count = priv->child_count; + + /* Send mkdir request to all the nodes now */ + for (index = 0; index < priv->child_count; index++) { + STACK_WIND_COOKIE (frame, + unify_mkdir_cbk, + (void *)index, //cookie + priv->xl_array[index], + priv->xl_array[index]->fops->mkdir, + &local->loc1, + local->mode); + } + + return 0; +} + + +/** + * unify_mkdir - + */ +int32_t +unify_mkdir (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + mode_t mode) +{ + unify_local_t *local = NULL; + + /* Initialization */ + INIT_LOCAL (frame, local); + local->mode = mode; + + loc_copy (&local->loc1, loc); + + if (local->loc1.path == NULL) { + gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O"); + STACK_UNWIND (frame, -1, ENOMEM, NULL, NULL); + return 0; + } + + STACK_WIND (frame, + unify_ns_mkdir_cbk, + NS(this), + NS(this)->fops->mkdir, + loc, + mode); + return 0; +} + +/** + * unify_rmdir_cbk - + */ +int32_t +unify_rmdir_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + int32_t callcnt = 0; + unify_private_t *priv = this->private; + unify_local_t *local = frame->local; + + LOCK (&frame->lock); + { + callcnt = --local->call_count; + if (op_ret == 0 || (priv->optimist && (op_errno == ENOENT))) + local->op_ret = 0; + if (op_ret == -1) + local->op_errno = op_errno; + } + UNLOCK (&frame->lock); + + if (!callcnt) { + unify_local_wipe (local); + STACK_UNWIND (frame, local->op_ret, local->op_errno); + } + + return 0; +} + +/** + * unify_ns_rmdir_cbk - + */ +int32_t +unify_ns_rmdir_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + int16_t index = 0; + unify_private_t *priv = this->private; + unify_local_t *local = frame->local; + + if (op_ret == -1) { + /* No need to send rmdir request to other servers, + * as namespace action failed + */ + gf_log (this->name, + ((op_errno != ENOTEMPTY) ? + GF_LOG_ERROR : GF_LOG_DEBUG), + "namespace: path(%s): %s", + local->loc1.path, strerror (op_errno)); + unify_local_wipe (local); + STACK_UNWIND (frame, op_ret, op_errno); + return 0; + } + + local->call_count = priv->child_count; + + for (index = 0; index < priv->child_count; index++) { + STACK_WIND (frame, + unify_rmdir_cbk, + priv->xl_array[index], + priv->xl_array[index]->fops->rmdir, + &local->loc1); + } + + return 0; +} + +/** + * unify_rmdir - + */ +int32_t +unify_rmdir (call_frame_t *frame, + xlator_t *this, + loc_t *loc) +{ + unify_local_t *local = NULL; + + UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + + /* Initialization */ + INIT_LOCAL (frame, local); + + loc_copy (&local->loc1, loc); + if (local->loc1.path == NULL) { + gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O"); + STACK_UNWIND (frame, -1, ENOMEM); + return 0; + } + + STACK_WIND (frame, + unify_ns_rmdir_cbk, + NS(this), + NS(this)->fops->rmdir, + loc); + + return 0; +} + +/** + * unify_open_cbk - + */ +int32_t +unify_open_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + fd_t *fd) +{ + int32_t callcnt = 0; + unify_local_t *local = frame->local; + + LOCK (&frame->lock); + { + if (op_ret >= 0) { + local->op_ret = op_ret; + if (NS(this) != (xlator_t *)cookie) { + /* Store child node's ptr, used in + all the f*** / FileIO calls */ + fd_ctx_set (fd, this, (uint64_t)(long)cookie); + } + } + if (op_ret == -1) { + local->op_errno = op_errno; + local->failed = 1; + } + callcnt = --local->call_count; + } + UNLOCK (&frame->lock); + + if (!callcnt) { + if ((local->failed == 1) && (local->op_ret >= 0)) { + local->call_count = 1; + /* return -1 to user */ + local->op_ret = -1; + //local->op_errno = EIO; + + if (!fd_ctx_get (local->fd, this, NULL)) { + gf_log (this->name, GF_LOG_ERROR, + "Open success on child node, " + "failed on namespace"); + } else { + gf_log (this->name, GF_LOG_ERROR, + "Open success on namespace, " + "failed on child node"); + } + } + + unify_local_wipe (local); + STACK_UNWIND (frame, local->op_ret, + local->op_errno, local->fd); + } + + return 0; +} + +#ifdef GF_DARWIN_HOST_OS +/** + * unify_create_lookup_cbk - + */ +int32_t +unify_open_lookup_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + inode_t *inode, + struct stat *buf, + dict_t *dict) +{ + int32_t callcnt = 0; + int16_t index = 0; + unify_private_t *priv = this->private; + unify_local_t *local = frame->local; + + LOCK (&frame->lock); + { + callcnt = --local->call_count; + if ((op_ret == -1) && (op_errno != ENOENT)) { + gf_log (this->name, GF_LOG_ERROR, + "child(%s): path(%s): %s", + priv->xl_array[(long)cookie]->name, + local->loc1.path, strerror (op_errno)); + local->op_errno = op_errno; + } + + if (op_ret >= 0) { + local->op_ret = op_ret; + local->index++; + if (NS(this) == priv->xl_array[(long)cookie]) { + local->list[0] = (int16_t)(long)cookie; + } else { + local->list[1] = (int16_t)(long)cookie; + } + if (S_ISDIR (buf->st_mode)) + local->failed = 1; + } + } + UNLOCK (&frame->lock); + + if (!callcnt) { + int16_t file_list[3] = {0,}; + local->op_ret = -1; + + file_list[0] = local->list[0]; + file_list[1] = local->list[1]; + file_list[2] = -1; + + if (local->index != 2) { + /* Lookup failed, can't do open */ + gf_log (this->name, GF_LOG_ERROR, + "%s: present on %d nodes", + local->name, local->index); + + if (local->index < 2) { + unify_local_wipe (local); + gf_log (this->name, GF_LOG_ERROR, + "returning as file found on less " + "than 2 nodes"); + STACK_UNWIND (frame, local->op_ret, + local->op_errno, local->fd); + return 0; + } + } + + if (local->failed) { + /* Open on directory, return EISDIR */ + unify_local_wipe (local); + STACK_UNWIND (frame, -1, EISDIR, local->fd); + return 0; + } + + /* Everything is perfect :) */ + local->call_count = 2; + + for (index = 0; file_list[index] != -1; index++) { + char need_break = (file_list[index+1] == -1); + STACK_WIND_COOKIE (frame, + unify_open_cbk, + priv->xl_array[file_list[index]], + priv->xl_array[file_list[index]], + priv->xl_array[file_list[index]]->fops->open, + &local->loc1, + local->flags, + local->fd); + if (need_break) + break; + } + } + + return 0; +} + + +int32_t +unify_open_readlink_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + const char *path) +{ + int16_t index = 0; + unify_private_t *priv = this->private; + unify_local_t *local = frame->local; + + if (op_ret == -1) { + STACK_UNWIND (frame, -1, ENOENT); + return 0; + } + + if (path[0] == '/') { + local->name = strdup (path); + ERR_ABORT (local->name); + } else { + char *tmp_str = strdup (local->loc1.path); + char *tmp_base = dirname (tmp_str); + local->name = CALLOC (1, ZR_PATH_MAX); + strcpy (local->name, tmp_base); + strncat (local->name, "/", 1); + strcat (local->name, path); + FREE (tmp_str); + } + + local->list = CALLOC (1, sizeof (int16_t) * 3); + ERR_ABORT (local->list); + local->call_count = priv->child_count + 1; + local->op_ret = -1; + for (index = 0; index <= priv->child_count; index++) { + /* Send the lookup to all the nodes including namespace */ + STACK_WIND_COOKIE (frame, + unify_open_lookup_cbk, + (void *)(long)index, + priv->xl_array[index], + priv->xl_array[index]->fops->lookup, + &local->loc1, + NULL); + } + + return 0; +} +#endif /* GF_DARWIN_HOST_OS */ + +/** + * unify_open - + */ +int32_t +unify_open (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + int32_t flags, + fd_t *fd) +{ + unify_private_t *priv = this->private; + unify_local_t *local = NULL; + int16_t *list = NULL; + int16_t index = 0; + int16_t file_list[3] = {0,}; + uint64_t tmp_list = 0; + + UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + + /* Init */ + INIT_LOCAL (frame, local); + loc_copy (&local->loc1, loc); + local->fd = fd; + local->flags = flags; + inode_ctx_get (loc->inode, this, &tmp_list); + list = (int16_t *)(long)tmp_list; + + local->list = list; + file_list[0] = priv->child_count; /* Thats namespace */ + file_list[2] = -1; + for (index = 0; list[index] != -1; index++) { + local->call_count++; + if (list[index] != priv->child_count) + file_list[1] = list[index]; + } + + if (local->call_count != 2) { + /* If the lookup was done for file */ + gf_log (this->name, GF_LOG_ERROR, + "%s: entry_count is %d", + loc->path, local->call_count); + for (index = 0; local->list[index] != -1; index++) + gf_log (this->name, GF_LOG_ERROR, "%s: found on %s", + loc->path, priv->xl_array[list[index]]->name); + + if (local->call_count < 2) { + gf_log (this->name, GF_LOG_ERROR, + "returning EIO as file found on onlyone node"); + STACK_UNWIND (frame, -1, EIO, fd); + return 0; + } + } + +#ifdef GF_DARWIN_HOST_OS + /* Handle symlink here */ + if (S_ISLNK (loc->inode->st_mode)) { + /* Callcount doesn't matter here */ + STACK_WIND (frame, + unify_open_readlink_cbk, + NS(this), + NS(this)->fops->readlink, + loc, ZR_PATH_MAX); + return 0; + } +#endif /* GF_DARWIN_HOST_OS */ + + local->call_count = 2; + for (index = 0; file_list[index] != -1; index++) { + char need_break = (file_list[index+1] == -1); + STACK_WIND_COOKIE (frame, + unify_open_cbk, + priv->xl_array[file_list[index]], //cookie + priv->xl_array[file_list[index]], + priv->xl_array[file_list[index]]->fops->open, + loc, + flags, + fd); + if (need_break) + break; + } + + return 0; +} + + +int32_t +unify_create_unlink_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + unify_local_t *local = frame->local; + inode_t *inode = local->loc1.inode; + + unify_local_wipe (local); + + STACK_UNWIND (frame, local->op_ret, local->op_errno, local->fd, + inode, &local->stbuf); + + return 0; +} + +/** + * unify_create_open_cbk - + */ +int32_t +unify_create_open_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + fd_t *fd) +{ + int ret = 0; + int32_t callcnt = 0; + unify_local_t *local = frame->local; + inode_t *inode = NULL; + xlator_t *child = NULL; + uint64_t tmp_value = 0; + + LOCK (&frame->lock); + { + if (op_ret >= 0) { + local->op_ret = op_ret; + if (NS(this) != (xlator_t *)cookie) { + /* Store child node's ptr, used in all + the f*** / FileIO calls */ + /* TODO: log on failure */ + ret = fd_ctx_get (fd, this, &tmp_value); + cookie = (void *)(long)tmp_value; + } else { + /* NOTE: open successful on namespace. + * fd's ctx can be used to identify open + * failure on storage subvolume. cool + * ide ;) */ + local->failed = 0; + } + } else { + gf_log (this->name, GF_LOG_ERROR, + "child(%s): path(%s): %s", + ((xlator_t *)cookie)->name, + local->loc1.path, strerror (op_errno)); + local->op_errno = op_errno; + local->failed = 1; + } + callcnt = --local->call_count; + } + UNLOCK (&frame->lock); + + if (!callcnt) { + if (local->failed == 1 && (local->op_ret >= 0)) { + local->call_count = 1; + /* return -1 to user */ + local->op_ret = -1; + local->op_errno = EIO; + local->fd = fd; + local->call_count = 1; + + if (!fd_ctx_get (local->fd, this, &tmp_value)) { + child = (xlator_t *)(long)tmp_value; + + gf_log (this->name, GF_LOG_ERROR, + "Create success on child node, " + "failed on namespace"); + + STACK_WIND (frame, + unify_create_unlink_cbk, + child, + child->fops->unlink, + &local->loc1); + } else { + gf_log (this->name, GF_LOG_ERROR, + "Create success on namespace, " + "failed on child node"); + + STACK_WIND (frame, + unify_create_unlink_cbk, + NS(this), + NS(this)->fops->unlink, + &local->loc1); + } + return 0; + } + inode = local->loc1.inode; + unify_local_wipe (local); + STACK_UNWIND (frame, local->op_ret, local->op_errno, fd, + inode, &local->stbuf); + } + return 0; +} + +/** + * unify_create_lookup_cbk - + */ +int32_t +unify_create_lookup_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + inode_t *inode, + struct stat *buf, + dict_t *dict) +{ + int32_t callcnt = 0; + int16_t index = 0; + unify_private_t *priv = this->private; + unify_local_t *local = frame->local; + + LOCK (&frame->lock); + { + callcnt = --local->call_count; + if (op_ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "child(%s): path(%s): %s", + priv->xl_array[(long)cookie]->name, + local->loc1.path, strerror (op_errno)); + local->op_errno = op_errno; + local->failed = 1; + } + + if (op_ret >= 0) { + local->op_ret = op_ret; + local->list[local->index++] = (int16_t)(long)cookie; + if (NS(this) == priv->xl_array[(long)cookie]) { + local->st_ino = buf->st_ino; + } else { + local->stbuf = *buf; + } + } + } + UNLOCK (&frame->lock); + + if (!callcnt) { + int16_t *list = local->list; + int16_t file_list[3] = {0,}; + local->op_ret = -1; + + local->list [local->index] = -1; + file_list[0] = list[0]; + file_list[1] = list[1]; + file_list[2] = -1; + + local->stbuf.st_ino = local->st_ino; + /* TODO: log on failure */ + inode_ctx_put (local->loc1.inode, this, + (uint64_t)(long)local->list); + + if (local->index != 2) { + /* Lookup failed, can't do open */ + gf_log (this->name, GF_LOG_ERROR, + "%s: present on %d nodes", + local->loc1.path, local->index); + file_list[0] = priv->child_count; + for (index = 0; list[index] != -1; index++) { + gf_log (this->name, GF_LOG_ERROR, + "%s: found on %s", local->loc1.path, + priv->xl_array[list[index]]->name); + if (list[index] != priv->child_count) + file_list[1] = list[index]; + } + + if (local->index < 2) { + unify_local_wipe (local); + gf_log (this->name, GF_LOG_ERROR, + "returning EIO as file found on " + "only one node"); + STACK_UNWIND (frame, -1, EIO, + local->fd, inode, NULL); + return 0; + } + } + /* Everything is perfect :) */ + local->call_count = 2; + + for (index = 0; file_list[index] != -1; index++) { + char need_break = (file_list[index+1] == -1); + STACK_WIND_COOKIE (frame, + unify_create_open_cbk, + priv->xl_array[file_list[index]], + priv->xl_array[file_list[index]], + priv->xl_array[file_list[index]]->fops->open, + &local->loc1, + local->flags, + local->fd); + if (need_break) + break; + } + } + + return 0; +} + + +/** + * unify_create_cbk - + */ +int32_t +unify_create_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + fd_t *fd, + inode_t *inode, + struct stat *buf) +{ + int ret = 0; + unify_local_t *local = frame->local; + call_frame_t *prev_frame = cookie; + inode_t *tmp_inode = NULL; + + if (op_ret == -1) { + /* send unlink () on Namespace */ + local->op_errno = op_errno; + local->op_ret = -1; + local->call_count = 1; + gf_log (this->name, GF_LOG_ERROR, + "create failed on %s (file %s, error %s), " + "sending unlink to namespace", + prev_frame->this->name, + local->loc1.path, strerror (op_errno)); + + STACK_WIND (frame, + unify_create_unlink_cbk, + NS(this), + NS(this)->fops->unlink, + &local->loc1); + + return 0; + } + + if (op_ret >= 0) { + local->op_ret = op_ret; + local->stbuf = *buf; + /* Just inode number should be from NS node */ + local->stbuf.st_ino = local->st_ino; + + /* TODO: log on failure */ + ret = fd_ctx_set (fd, this, (uint64_t)(long)prev_frame->this); + } + + tmp_inode = local->loc1.inode; + unify_local_wipe (local); + STACK_UNWIND (frame, local->op_ret, local->op_errno, local->fd, + tmp_inode, &local->stbuf); + + return 0; +} + +/** + * unify_ns_create_cbk - + * + */ +int32_t +unify_ns_create_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + fd_t *fd, + inode_t *inode, + struct stat *buf) +{ + struct sched_ops *sched_ops = NULL; + xlator_t *sched_xl = NULL; + unify_local_t *local = frame->local; + unify_private_t *priv = this->private; + int16_t *list = NULL; + int16_t index = 0; + + if (op_ret == -1) { + /* No need to send create request to other servers, as + namespace action failed. Handle exclusive create here. */ + if ((op_errno != EEXIST) || + ((op_errno == EEXIST) && + ((local->flags & O_EXCL) == O_EXCL))) { + /* If its just a create call without O_EXCL, + don't do this */ + gf_log (this->name, GF_LOG_ERROR, + "namespace: path(%s): %s", + local->loc1.path, strerror (op_errno)); + unify_local_wipe (local); + STACK_UNWIND (frame, op_ret, op_errno, fd, inode, buf); + return 0; + } + } + + if (op_ret >= 0) { + /* Get the inode number from the NS node */ + local->st_ino = buf->st_ino; + + local->op_ret = -1; + + /* Start the mapping list */ + list = CALLOC (1, sizeof (int16_t) * 3); + ERR_ABORT (list); + inode_ctx_put (inode, this, (uint64_t)(long)list); + list[0] = priv->child_count; + list[2] = -1; + + /* This means, file doesn't exist anywhere in the Filesystem */ + sched_ops = priv->sched_ops; + + /* Send create request to the scheduled node now */ + sched_xl = sched_ops->schedule (this, local->loc1.path); + if (sched_xl == NULL) + { + /* send unlink () on Namespace */ + local->op_errno = ENOTCONN; + local->op_ret = -1; + local->call_count = 1; + gf_log (this->name, GF_LOG_ERROR, + "no node online to schedule create:(file %s) " + "sending unlink to namespace", + (local->loc1.path)?local->loc1.path:""); + + STACK_WIND (frame, + unify_create_unlink_cbk, + NS(this), + NS(this)->fops->unlink, + &local->loc1); + + return 0; + } + + for (index = 0; index < priv->child_count; index++) + if (sched_xl == priv->xl_array[index]) + break; + list[1] = index; + + STACK_WIND (frame, unify_create_cbk, + sched_xl, sched_xl->fops->create, + &local->loc1, local->flags, local->mode, fd); + } else { + /* File already exists, and there is no O_EXCL flag */ + + gf_log (this->name, GF_LOG_DEBUG, + "File(%s) already exists on namespace, sending " + "open instead", local->loc1.path); + + local->list = CALLOC (1, sizeof (int16_t) * 3); + ERR_ABORT (local->list); + local->call_count = priv->child_count + 1; + local->op_ret = -1; + for (index = 0; index <= priv->child_count; index++) { + /* Send lookup() to all nodes including namespace */ + STACK_WIND_COOKIE (frame, + unify_create_lookup_cbk, + (void *)(long)index, + priv->xl_array[index], + priv->xl_array[index]->fops->lookup, + &local->loc1, + NULL); + } + } + return 0; +} + +/** + * unify_create - create a file in global namespace first, so other + * clients can see them. Create the file in storage nodes in background. + */ +int32_t +unify_create (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + int32_t flags, + mode_t mode, + fd_t *fd) +{ + unify_local_t *local = NULL; + + /* Initialization */ + INIT_LOCAL (frame, local); + local->mode = mode; + local->flags = flags; + local->fd = fd; + + loc_copy (&local->loc1, loc); + if (local->loc1.path == NULL) { + gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O"); + STACK_UNWIND (frame, -1, ENOMEM, fd, loc->inode, NULL); + return 0; + } + + STACK_WIND (frame, + unify_ns_create_cbk, + NS(this), + NS(this)->fops->create, + loc, + flags | O_EXCL, + mode, + fd); + + return 0; +} + + +/** + * unify_opendir_cbk - + */ +int32_t +unify_opendir_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + fd_t *fd) +{ + STACK_UNWIND (frame, op_ret, op_errno, fd); + + return 0; +} + +/** + * unify_opendir - + */ +int32_t +unify_opendir (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + fd_t *fd) +{ + UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + + STACK_WIND (frame, unify_opendir_cbk, + NS(this), NS(this)->fops->opendir, loc, fd); + + return 0; +} + + +/** + * unify_chmod - + */ +int32_t +unify_chmod (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + mode_t mode) +{ + unify_local_t *local = NULL; + unify_private_t *priv = this->private; + int32_t index = 0; + int32_t callcnt = 0; + uint64_t tmp_list = 0; + + UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + + /* Initialization */ + INIT_LOCAL (frame, local); + + loc_copy (&local->loc1, loc); + local->st_ino = loc->inode->ino; + + if (S_ISDIR (loc->inode->st_mode)) { + local->call_count = priv->child_count + 1; + + for (index = 0; index < (priv->child_count + 1); index++) { + STACK_WIND (frame, + unify_buf_cbk, + priv->xl_array[index], + priv->xl_array[index]->fops->chmod, + loc, mode); + } + } else { + inode_ctx_get (loc->inode, this, &tmp_list); + local->list = (int16_t *)(long)tmp_list; + + for (index = 0; local->list[index] != -1; index++) { + local->call_count++; + callcnt++; + } + + for (index = 0; local->list[index] != -1; index++) { + STACK_WIND (frame, + unify_buf_cbk, + priv->xl_array[local->list[index]], + priv->xl_array[local->list[index]]->fops->chmod, + loc, + mode); + if (!--callcnt) + break; + } + } + + return 0; +} + +/** + * unify_chown - + */ +int32_t +unify_chown (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + uid_t uid, + gid_t gid) +{ + unify_local_t *local = NULL; + unify_private_t *priv = this->private; + int32_t index = 0; + int32_t callcnt = 0; + uint64_t tmp_list = 0; + + UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + + /* Initialization */ + INIT_LOCAL (frame, local); + loc_copy (&local->loc1, loc); + local->st_ino = loc->inode->ino; + + if (S_ISDIR (loc->inode->st_mode)) { + local->call_count = priv->child_count + 1; + + for (index = 0; index < (priv->child_count + 1); index++) { + STACK_WIND (frame, + unify_buf_cbk, + priv->xl_array[index], + priv->xl_array[index]->fops->chown, + loc, uid, gid); + } + } else { + inode_ctx_get (loc->inode, this, &tmp_list); + local->list = (int16_t *)(long)tmp_list; + + for (index = 0; local->list[index] != -1; index++) { + local->call_count++; + callcnt++; + } + + for (index = 0; local->list[index] != -1; index++) { + STACK_WIND (frame, + unify_buf_cbk, + priv->xl_array[local->list[index]], + priv->xl_array[local->list[index]]->fops->chown, + loc, uid, gid); + if (!--callcnt) + break; + } + } + + return 0; +} + + +/** + * unify_truncate_cbk - + */ +int32_t +unify_truncate_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + struct stat *buf) +{ + int32_t callcnt = 0; + unify_private_t *priv = this->private; + unify_local_t *local = frame->local; + call_frame_t *prev_frame = cookie; + + LOCK (&frame->lock); + { + callcnt = --local->call_count; + + if (op_ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "child(%s): path(%s): %s", + prev_frame->this->name, + (local->loc1.path)?local->loc1.path:"", + strerror (op_errno)); + local->op_errno = op_errno; + if (!((op_errno == ENOENT) && priv->optimist)) + local->op_ret = -1; + } + + if (op_ret >= 0) { + if (NS (this) == prev_frame->this) { + local->st_ino = buf->st_ino; + /* If the entry is directory, get the + stat from NS node */ + if (S_ISDIR (buf->st_mode) || + !local->stbuf.st_blksize) { + local->stbuf = *buf; + } + } + + if ((!S_ISDIR (buf->st_mode)) && + (NS (this) != prev_frame->this)) { + /* If file, take the stat info from + Storage node. */ + local->stbuf = *buf; + } + } + } + UNLOCK (&frame->lock); + + if (!callcnt) { + if (local->st_ino) + local->stbuf.st_ino = local->st_ino; + else + local->op_ret = -1; + unify_local_wipe (local); + STACK_UNWIND (frame, local->op_ret, local->op_errno, + &local->stbuf); + } + + return 0; +} + +/** + * unify_truncate - + */ +int32_t +unify_truncate (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + off_t offset) +{ + unify_local_t *local = NULL; + unify_private_t *priv = this->private; + int32_t index = 0; + int32_t callcnt = 0; + uint64_t tmp_list = 0; + + UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + + /* Initialization */ + INIT_LOCAL (frame, local); + loc_copy (&local->loc1, loc); + local->st_ino = loc->inode->ino; + + if (S_ISDIR (loc->inode->st_mode)) { + local->call_count = 1; + + STACK_WIND (frame, + unify_buf_cbk, + NS(this), + NS(this)->fops->stat, + loc); + } else { + local->op_ret = 0; + inode_ctx_get (loc->inode, this, &tmp_list); + local->list = (int16_t *)(long)tmp_list; + + for (index = 0; local->list[index] != -1; index++) { + local->call_count++; + callcnt++; + } + + /* Don't send truncate to NS node */ + STACK_WIND (frame, unify_truncate_cbk, NS(this), + NS(this)->fops->stat, loc); + callcnt--; + + for (index = 0; local->list[index] != -1; index++) { + if (NS(this) != priv->xl_array[local->list[index]]) { + STACK_WIND (frame, + unify_truncate_cbk, + priv->xl_array[local->list[index]], + priv->xl_array[local->list[index]]->fops->truncate, + loc, + offset); + if (!--callcnt) + break; + } + } + } + + return 0; +} + +/** + * unify_utimens - + */ +int32_t +unify_utimens (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + struct timespec tv[2]) +{ + unify_local_t *local = NULL; + unify_private_t *priv = this->private; + int32_t index = 0; + int32_t callcnt = 0; + uint64_t tmp_list = 0; + + UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + + /* Initialization */ + INIT_LOCAL (frame, local); + loc_copy (&local->loc1, loc); + local->st_ino = loc->inode->ino; + + if (S_ISDIR (loc->inode->st_mode)) { + local->call_count = priv->child_count + 1; + + for (index = 0; index < (priv->child_count + 1); index++) { + STACK_WIND (frame, + unify_buf_cbk, + priv->xl_array[index], + priv->xl_array[index]->fops->utimens, + loc, tv); + } + } else { + inode_ctx_get (loc->inode, this, &tmp_list); + local->list = (int16_t *)(long)tmp_list; + + for (index = 0; local->list[index] != -1; index++) { + local->call_count++; + callcnt++; + } + + for (index = 0; local->list[index] != -1; index++) { + STACK_WIND (frame, + unify_buf_cbk, + priv->xl_array[local->list[index]], + priv->xl_array[local->list[index]]->fops->utimens, + loc, + tv); + if (!--callcnt) + break; + } + } + + return 0; +} + +/** + * unify_readlink_cbk - + */ +int32_t +unify_readlink_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + const char *path) +{ + STACK_UNWIND (frame, op_ret, op_errno, path); + return 0; +} + +/** + * unify_readlink - Read the link only from the storage node. + */ +int32_t +unify_readlink (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + size_t size) +{ + unify_private_t *priv = this->private; + int32_t entry_count = 0; + int16_t *list = NULL; + int16_t index = 0; + uint64_t tmp_list = 0; + + UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + + inode_ctx_get (loc->inode, this, &tmp_list); + list = (int16_t *)(long)tmp_list; + + for (index = 0; list[index] != -1; index++) + entry_count++; + + if (entry_count >= 2) { + for (index = 0; list[index] != -1; index++) { + if (priv->xl_array[list[index]] != NS(this)) { + STACK_WIND (frame, + unify_readlink_cbk, + priv->xl_array[list[index]], + priv->xl_array[list[index]]->fops->readlink, + loc, + size); + break; + } + } + } else { + gf_log (this->name, GF_LOG_ERROR, + "returning ENOENT, no softlink files found " + "on storage node"); + STACK_UNWIND (frame, -1, ENOENT, NULL); + } + + return 0; +} + + +/** + * unify_unlink_cbk - + */ +int32_t +unify_unlink_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + int32_t callcnt = 0; + unify_private_t *priv = this->private; + unify_local_t *local = frame->local; + + LOCK (&frame->lock); + { + callcnt = --local->call_count; + if (op_ret == 0 || ((op_errno == ENOENT) && priv->optimist)) + local->op_ret = 0; + if (op_ret == -1) + local->op_errno = op_errno; + } + UNLOCK (&frame->lock); + + if (!callcnt) { + unify_local_wipe (local); + STACK_UNWIND (frame, local->op_ret, local->op_errno); + } + + return 0; +} + + +/** + * unify_unlink - + */ +int32_t +unify_unlink (call_frame_t *frame, + xlator_t *this, + loc_t *loc) +{ + unify_private_t *priv = this->private; + unify_local_t *local = NULL; + int16_t *list = NULL; + int16_t index = 0; + uint64_t tmp_list = 0; + + UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + + /* Initialization */ + INIT_LOCAL (frame, local); + loc_copy (&local->loc1, loc); + + inode_ctx_get (loc->inode, this, &tmp_list); + list = (int16_t *)(long)tmp_list; + + for (index = 0; list[index] != -1; index++) + local->call_count++; + + if (local->call_count) { + for (index = 0; list[index] != -1; index++) { + char need_break = (list[index+1] == -1); + STACK_WIND (frame, + unify_unlink_cbk, + priv->xl_array[list[index]], + priv->xl_array[list[index]]->fops->unlink, + loc); + if (need_break) + break; + } + } else { + gf_log (this->name, GF_LOG_ERROR, + "%s: returning ENOENT", loc->path); + STACK_UNWIND (frame, -1, ENOENT); + } + + return 0; +} + + +/** + * unify_readv_cbk - + */ +int32_t +unify_readv_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + struct iovec *vector, + int32_t count, + struct stat *stbuf) +{ + STACK_UNWIND (frame, op_ret, op_errno, vector, count, stbuf); + return 0; +} + +/** + * unify_readv - + */ +int32_t +unify_readv (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + size_t size, + off_t offset) +{ + UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd); + xlator_t *child = NULL; + uint64_t tmp_child = 0; + + fd_ctx_get (fd, this, &tmp_child); + child = (xlator_t *)(long)tmp_child; + + STACK_WIND (frame, + unify_readv_cbk, + child, + child->fops->readv, + fd, + size, + offset); + + + return 0; +} + +/** + * unify_writev_cbk - + */ +int32_t +unify_writev_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + struct stat *stbuf) +{ + STACK_UNWIND (frame, op_ret, op_errno, stbuf); + return 0; +} + +/** + * unify_writev - + */ +int32_t +unify_writev (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + struct iovec *vector, + int32_t count, + off_t off) +{ + UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd); + xlator_t *child = NULL; + uint64_t tmp_child = 0; + + fd_ctx_get (fd, this, &tmp_child); + child = (xlator_t *)(long)tmp_child; + + STACK_WIND (frame, + unify_writev_cbk, + child, + child->fops->writev, + fd, + vector, + count, + off); + + return 0; +} + +/** + * unify_ftruncate - + */ +int32_t +unify_ftruncate (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + off_t offset) +{ + xlator_t *child = NULL; + unify_local_t *local = NULL; + uint64_t tmp_child = 0; + + UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR(fd); + + /* Initialization */ + INIT_LOCAL (frame, local); + local->op_ret = 0; + + fd_ctx_get (fd, this, &tmp_child); + child = (xlator_t *)(long)tmp_child; + + local->call_count = 2; + + STACK_WIND (frame, unify_truncate_cbk, + child, child->fops->ftruncate, + fd, offset); + + STACK_WIND (frame, unify_truncate_cbk, + NS(this), NS(this)->fops->fstat, + fd); + + return 0; +} + + +/** + * unify_fchmod - + */ +int32_t +unify_fchmod (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + mode_t mode) +{ + unify_local_t *local = NULL; + xlator_t *child = NULL; + uint64_t tmp_child = 0; + + UNIFY_CHECK_FD_AND_UNWIND_ON_ERR(fd); + + /* Initialization */ + INIT_LOCAL (frame, local); + local->st_ino = fd->inode->ino; + + if (!fd_ctx_get (fd, this, &tmp_child)) { + /* If its set, then its file */ + child = (xlator_t *)(long)tmp_child; + + local->call_count = 2; + + STACK_WIND (frame, unify_buf_cbk, child, + child->fops->fchmod, fd, mode); + + STACK_WIND (frame, unify_buf_cbk, NS(this), + NS(this)->fops->fchmod, fd, mode); + + } else { + /* this is an directory */ + local->call_count = 1; + + STACK_WIND (frame, unify_buf_cbk, + NS(this), NS(this)->fops->fchmod, fd, mode); + } + + return 0; +} + +/** + * unify_fchown - + */ +int32_t +unify_fchown (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + uid_t uid, + gid_t gid) +{ + unify_local_t *local = NULL; + xlator_t *child = NULL; + uint64_t tmp_child = 0; + + UNIFY_CHECK_FD_AND_UNWIND_ON_ERR(fd); + + /* Initialization */ + INIT_LOCAL (frame, local); + local->st_ino = fd->inode->ino; + + if (!fd_ctx_get (fd, this, &tmp_child)) { + /* If its set, then its file */ + child = (xlator_t *)(long)tmp_child; + + local->call_count = 2; + + STACK_WIND (frame, unify_buf_cbk, child, + child->fops->fchown, fd, uid, gid); + + STACK_WIND (frame, unify_buf_cbk, NS(this), + NS(this)->fops->fchown, fd, uid, gid); + } else { + local->call_count = 1; + + STACK_WIND (frame, unify_buf_cbk, + NS(this), NS(this)->fops->fchown, + fd, uid, gid); + } + + return 0; +} + +/** + * unify_flush_cbk - + */ +int32_t +unify_flush_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + STACK_UNWIND (frame, op_ret, op_errno); + return 0; +} + +/** + * unify_flush - + */ +int32_t +unify_flush (call_frame_t *frame, + xlator_t *this, + fd_t *fd) +{ + UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd); + xlator_t *child = NULL; + uint64_t tmp_child = 0; + + fd_ctx_get (fd, this, &tmp_child); + child = (xlator_t *)(long)tmp_child; + + STACK_WIND (frame, unify_flush_cbk, child, + child->fops->flush, fd); + + return 0; +} + + +/** + * unify_fsync_cbk - + */ +int32_t +unify_fsync_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + STACK_UNWIND (frame, op_ret, op_errno); + return 0; +} + +/** + * unify_fsync - + */ +int32_t +unify_fsync (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + int32_t flags) +{ + UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd); + xlator_t *child = NULL; + uint64_t tmp_child = 0; + + fd_ctx_get (fd, this, &tmp_child); + child = (xlator_t *)(long)tmp_child; + + STACK_WIND (frame, unify_fsync_cbk, child, + child->fops->fsync, fd, flags); + + return 0; +} + +/** + * unify_fstat - Send fstat FOP to Namespace only if its directory, and to + * both namespace and the storage node if its a file. + */ +int32_t +unify_fstat (call_frame_t *frame, + xlator_t *this, + fd_t *fd) +{ + unify_local_t *local = NULL; + xlator_t *child = NULL; + uint64_t tmp_child = 0; + + UNIFY_CHECK_FD_AND_UNWIND_ON_ERR(fd); + + INIT_LOCAL (frame, local); + local->st_ino = fd->inode->ino; + + if (!fd_ctx_get (fd, this, &tmp_child)) { + /* If its set, then its file */ + child = (xlator_t *)(long)tmp_child; + local->call_count = 2; + + STACK_WIND (frame, unify_buf_cbk, child, + child->fops->fstat, fd); + + STACK_WIND (frame, unify_buf_cbk, NS(this), + NS(this)->fops->fstat, fd); + + } else { + /* this is an directory */ + local->call_count = 1; + STACK_WIND (frame, unify_buf_cbk, NS(this), + NS(this)->fops->fstat, fd); + } + + return 0; +} + +/** + * unify_getdents_cbk - + */ +int32_t +unify_getdents_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + dir_entry_t *entry, + int32_t count) +{ + STACK_UNWIND (frame, op_ret, op_errno, entry, count); + return 0; +} + +/** + * unify_getdents - send the FOP request to all the nodes. + */ +int32_t +unify_getdents (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + size_t size, + off_t offset, + int32_t flag) +{ + UNIFY_CHECK_FD_AND_UNWIND_ON_ERR (fd); + + STACK_WIND (frame, unify_getdents_cbk, NS(this), + NS(this)->fops->getdents, fd, size, offset, flag); + + return 0; +} + + +/** + * unify_readdir_cbk - + */ +int32_t +unify_readdir_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + gf_dirent_t *buf) +{ + STACK_UNWIND (frame, op_ret, op_errno, buf); + + return 0; +} + +/** + * unify_readdir - send the FOP request to all the nodes. + */ +int32_t +unify_readdir (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + size_t size, + off_t offset) +{ + UNIFY_CHECK_FD_AND_UNWIND_ON_ERR (fd); + + STACK_WIND (frame, unify_readdir_cbk, NS(this), + NS(this)->fops->readdir, fd, size, offset); + + return 0; +} + + +/** + * unify_fsyncdir_cbk - + */ +int32_t +unify_fsyncdir_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + STACK_UNWIND (frame, op_ret, op_errno); + + return 0; +} + +/** + * unify_fsyncdir - + */ +int32_t +unify_fsyncdir (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + int32_t flags) +{ + UNIFY_CHECK_FD_AND_UNWIND_ON_ERR (fd); + + STACK_WIND (frame, unify_fsyncdir_cbk, + NS(this), NS(this)->fops->fsyncdir, fd, flags); + + return 0; +} + +/** + * unify_lk_cbk - UNWIND frame with the proper return arguments. + */ +int32_t +unify_lk_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + struct flock *lock) +{ + STACK_UNWIND (frame, op_ret, op_errno, lock); + return 0; +} + +/** + * unify_lk - Send it to all the storage nodes, (should be 1) which has file. + */ +int32_t +unify_lk (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + int32_t cmd, + struct flock *lock) +{ + UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd); + xlator_t *child = NULL; + uint64_t tmp_child = 0; + + fd_ctx_get (fd, this, &tmp_child); + child = (xlator_t *)(long)tmp_child; + + STACK_WIND (frame, unify_lk_cbk, child, + child->fops->lk, fd, cmd, lock); + + return 0; +} + + +int32_t +unify_setxattr_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno); + +static int32_t +unify_setxattr_file_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + unify_private_t *private = this->private; + unify_local_t *local = frame->local; + xlator_t *sched_xl = NULL; + struct sched_ops *sched_ops = NULL; + + if (op_ret == -1) { + if (!ENOTSUP) + gf_log (this->name, GF_LOG_ERROR, + "setxattr with XATTR_CREATE on ns: " + "path(%s) key(%s): %s", + local->loc1.path, local->name, + strerror (op_errno)); + unify_local_wipe (local); + STACK_UNWIND (frame, op_ret, op_errno); + return 0; + } + + LOCK (&frame->lock); + { + local->failed = 0; + local->op_ret = 0; + local->op_errno = 0; + local->call_count = 1; + } + UNLOCK (&frame->lock); + + /* schedule XATTR_CREATE on one of the child node */ + sched_ops = private->sched_ops; + + /* Send create request to the scheduled node now */ + sched_xl = sched_ops->schedule (this, local->name); + if (!sched_xl) { + STACK_UNWIND (frame, -1, ENOTCONN); + return 0; + } + + STACK_WIND (frame, + unify_setxattr_cbk, + sched_xl, + sched_xl->fops->setxattr, + &local->loc1, + local->dict, + local->flags); + return 0; +} + +/** + * unify_setxattr_cbk - When all the child nodes return, UNWIND frame. + */ +int32_t +unify_setxattr_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + int32_t callcnt = 0; + unify_local_t *local = frame->local; + call_frame_t *prev_frame = cookie; + dict_t *dict = NULL; + + LOCK (&frame->lock); + { + callcnt = --local->call_count; + + if (op_ret == -1) { + gf_log (this->name, (((op_errno == ENOENT) || + (op_errno == ENOTSUP))? + GF_LOG_DEBUG : GF_LOG_ERROR), + "child(%s): path(%s): %s", + prev_frame->this->name, + (local->loc1.path)?local->loc1.path:"", + strerror (op_errno)); + if (local->failed == -1) { + local->failed = 1; + } + local->op_errno = op_errno; + } else { + local->failed = 0; + local->op_ret = op_ret; + } + } + UNLOCK (&frame->lock); + + if (!callcnt) { + if (local->failed && local->name && + ZR_FILE_CONTENT_REQUEST(local->name)) { + dict = get_new_dict (); + dict_set (dict, local->dict->members_list->key, + data_from_dynptr(NULL, 0)); + dict_ref (dict); + + local->call_count = 1; + + STACK_WIND (frame, + unify_setxattr_file_cbk, + NS(this), + NS(this)->fops->setxattr, + &local->loc1, + dict, + XATTR_CREATE); + + dict_unref (dict); + return 0; + } + + unify_local_wipe (local); + STACK_UNWIND (frame, local->op_ret, local->op_errno); + } + + return 0; +} + +/** + * unify_sexattr - This function should be sent to all the storage nodes, + * which contains the file, (excluding namespace). + */ +int32_t +unify_setxattr (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + dict_t *dict, + int32_t flags) +{ + unify_private_t *priv = this->private; + unify_local_t *local = NULL; + int16_t *list = NULL; + int16_t index = 0; + int32_t call_count = 0; + uint64_t tmp_list = 0; + data_pair_t *trav = dict->members_list; + + UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + + /* Initialization */ + INIT_LOCAL (frame, local); + local->failed = -1; + loc_copy (&local->loc1, loc); + + if (S_ISDIR (loc->inode->st_mode)) { + + if (trav && trav->key && ZR_FILE_CONTENT_REQUEST(trav->key)) { + /* direct the storage xlators to change file + content only if file exists */ + local->flags = flags; + local->dict = dict; + local->name = strdup (trav->key); + flags |= XATTR_REPLACE; + } + + local->call_count = priv->child_count; + for (index = 0; index < priv->child_count; index++) { + STACK_WIND (frame, + unify_setxattr_cbk, + priv->xl_array[index], + priv->xl_array[index]->fops->setxattr, + loc, dict, flags); + } + return 0; + } + + inode_ctx_get (loc->inode, this, &tmp_list); + list = (int16_t *)(long)tmp_list; + + for (index = 0; list[index] != -1; index++) { + if (NS(this) != priv->xl_array[list[index]]) { + local->call_count++; + call_count++; + } + } + + if (local->call_count) { + for (index = 0; list[index] != -1; index++) { + if (priv->xl_array[list[index]] != NS(this)) { + STACK_WIND (frame, + unify_setxattr_cbk, + priv->xl_array[list[index]], + priv->xl_array[list[index]]->fops->setxattr, + loc, + dict, + flags); + if (!--call_count) + break; + } + } + return 0; + } + + /* No entry in storage nodes */ + gf_log (this->name, GF_LOG_DEBUG, + "returning ENOENT, file not found on storage node."); + STACK_UNWIND (frame, -1, ENOENT); + + return 0; +} + + +/** + * unify_getxattr_cbk - This function is called from only one child, so, no + * need of any lock or anything else, just send it to above layer + */ +int32_t +unify_getxattr_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + dict_t *value) +{ + int32_t callcnt = 0; + dict_t *local_value = NULL; + unify_local_t *local = frame->local; + call_frame_t *prev_frame = cookie; + + LOCK (&frame->lock); + { + callcnt = --local->call_count; + + if (op_ret == -1) { + local->op_errno = op_errno; + gf_log (this->name, + (((op_errno == ENOENT) || + (op_errno == ENODATA) || + (op_errno == ENOTSUP)) ? + GF_LOG_DEBUG : GF_LOG_ERROR), + "child(%s): path(%s): %s", + prev_frame->this->name, + (local->loc1.path)?local->loc1.path:"", + strerror (op_errno)); + } else { + if (!local->dict) + local->dict = dict_ref (value); + local->op_ret = op_ret; + } + } + UNLOCK (&frame->lock); + + if (!callcnt) { + local_value = local->dict; + local->dict = NULL; + + STACK_UNWIND (frame, local->op_ret, local->op_errno, + local_value); + + if (local_value) + dict_unref (local_value); + } + + return 0; +} + + +/** + * unify_getxattr - This FOP is sent to only the storage node. + */ +int32_t +unify_getxattr (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + const char *name) +{ + unify_private_t *priv = this->private; + int16_t *list = NULL; + int16_t index = 0; + int16_t count = 0; + unify_local_t *local = NULL; + uint64_t tmp_list = 0; + + UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + INIT_LOCAL (frame, local); + + if (S_ISDIR (loc->inode->st_mode)) { + local->call_count = priv->child_count; + for (index = 0; index < priv->child_count; index++) + STACK_WIND (frame, + unify_getxattr_cbk, + priv->xl_array[index], + priv->xl_array[index]->fops->getxattr, + loc, + name); + return 0; + } + + inode_ctx_get (loc->inode, this, &tmp_list); + list = (int16_t *)(long)tmp_list; + + for (index = 0; list[index] != -1; index++) { + if (NS(this) != priv->xl_array[list[index]]) { + local->call_count++; + count++; + } + } + + if (count) { + for (index = 0; list[index] != -1; index++) { + if (priv->xl_array[list[index]] != NS(this)) { + STACK_WIND (frame, + unify_getxattr_cbk, + priv->xl_array[list[index]], + priv->xl_array[list[index]]->fops->getxattr, + loc, + name); + if (!--count) + break; + } + } + } else { + dict_t *tmp_dict = get_new_dict (); + gf_log (this->name, GF_LOG_DEBUG, + "%s: returning ENODATA, no file found on storage node", + loc->path); + STACK_UNWIND (frame, -1, ENODATA, tmp_dict); + dict_destroy (tmp_dict); + } + + return 0; +} + +/** + * unify_removexattr_cbk - Wait till all the child node returns the call + * and then UNWIND to above layer. + */ +int32_t +unify_removexattr_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + int32_t callcnt = 0; + unify_local_t *local = frame->local; + call_frame_t *prev_frame = cookie; + + LOCK (&frame->lock); + { + callcnt = --local->call_count; + if (op_ret == -1) { + local->op_errno = op_errno; + if (op_errno != ENOTSUP) + gf_log (this->name, GF_LOG_ERROR, + "child(%s): path(%s): %s", + prev_frame->this->name, + local->loc1.path, strerror (op_errno)); + } else { + local->op_ret = op_ret; + } + } + UNLOCK (&frame->lock); + + if (!callcnt) { + STACK_UNWIND (frame, local->op_ret, local->op_errno); + } + + return 0; +} + +/** + * unify_removexattr - Send it to all the child nodes which has the files. + */ +int32_t +unify_removexattr (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + const char *name) +{ + unify_private_t *priv = this->private; + unify_local_t *local = NULL; + int16_t *list = NULL; + int16_t index = 0; + int32_t call_count = 0; + uint64_t tmp_list = 0; + + UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + + /* Initialization */ + INIT_LOCAL (frame, local); + + if (S_ISDIR (loc->inode->st_mode)) { + local->call_count = priv->child_count; + for (index = 0; index < priv->child_count; index++) + STACK_WIND (frame, + unify_removexattr_cbk, + priv->xl_array[index], + priv->xl_array[index]->fops->removexattr, + loc, + name); + + return 0; + } + + inode_ctx_get (loc->inode, this, &tmp_list); + list = (int16_t *)(long)tmp_list; + + for (index = 0; list[index] != -1; index++) { + if (NS(this) != priv->xl_array[list[index]]) { + local->call_count++; + call_count++; + } + } + + if (local->call_count) { + for (index = 0; list[index] != -1; index++) { + if (priv->xl_array[list[index]] != NS(this)) { + STACK_WIND (frame, + unify_removexattr_cbk, + priv->xl_array[list[index]], + priv->xl_array[list[index]]->fops->removexattr, + loc, + name); + if (!--call_count) + break; + } + } + return 0; + } + + gf_log (this->name, GF_LOG_DEBUG, + "%s: returning ENOENT, not found on storage node.", loc->path); + STACK_UNWIND (frame, -1, ENOENT); + + return 0; +} + + +int32_t +unify_mknod_unlink_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + unify_local_t *local = frame->local; + + if (op_ret == -1) + gf_log (this->name, GF_LOG_ERROR, + "%s: %s", local->loc1.path, strerror (op_errno)); + + unify_local_wipe (local); + /* No log required here as this -1 is for mknod call */ + STACK_UNWIND (frame, -1, local->op_errno, NULL, NULL); + return 0; +} + +/** + * unify_mknod_cbk - + */ +int32_t +unify_mknod_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + inode_t *inode, + struct stat *buf) +{ + unify_local_t *local = frame->local; + + if (op_ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "mknod failed on storage node, sending unlink to " + "namespace"); + local->op_errno = op_errno; + STACK_WIND (frame, + unify_mknod_unlink_cbk, + NS(this), + NS(this)->fops->unlink, + &local->loc1); + return 0; + } + + local->stbuf = *buf; + local->stbuf.st_ino = local->st_ino; + unify_local_wipe (local); + STACK_UNWIND (frame, op_ret, op_errno, inode, &local->stbuf); + return 0; +} + +/** + * unify_ns_mknod_cbk - + */ +int32_t +unify_ns_mknod_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + inode_t *inode, + struct stat *buf) +{ + struct sched_ops *sched_ops = NULL; + xlator_t *sched_xl = NULL; + unify_local_t *local = frame->local; + unify_private_t *priv = this->private; + int16_t *list = NULL; + int16_t index = 0; + call_frame_t *prev_frame = cookie; + + if (op_ret == -1) { + /* No need to send mknod request to other servers, + * as namespace action failed + */ + gf_log (this->name, GF_LOG_ERROR, + "child(%s): path(%s): %s", + prev_frame->this->name, local->loc1.path, + strerror (op_errno)); + unify_local_wipe (local); + STACK_UNWIND (frame, op_ret, op_errno, inode, buf); + return 0; + } + + /* Create one inode for this entry */ + local->op_ret = 0; + local->stbuf = *buf; + local->st_ino = buf->st_ino; + + list = CALLOC (1, sizeof (int16_t) * 3); + ERR_ABORT (list); + list[0] = priv->child_count; + list[2] = -1; + inode_ctx_put (inode, this, (uint64_t)(long)list); + + sched_ops = priv->sched_ops; + + /* Send mknod request to scheduled node now */ + sched_xl = sched_ops->schedule (this, local->loc1.path); + if (!sched_xl) { + gf_log (this->name, GF_LOG_ERROR, + "mknod failed on storage node, no node online " + "at the moment, sending unlink to NS"); + local->op_errno = ENOTCONN; + STACK_WIND (frame, + unify_mknod_unlink_cbk, + NS(this), + NS(this)->fops->unlink, + &local->loc1); + + return 0; + } + + for (index = 0; index < priv->child_count; index++) + if (sched_xl == priv->xl_array[index]) + break; + list[1] = index; + + STACK_WIND (frame, unify_mknod_cbk, + sched_xl, sched_xl->fops->mknod, + &local->loc1, local->mode, local->dev); + + return 0; +} + +/** + * unify_mknod - Create a device on namespace first, and later create on + * the storage node. + */ +int32_t +unify_mknod (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + mode_t mode, + dev_t rdev) +{ + unify_local_t *local = NULL; + + /* Initialization */ + INIT_LOCAL (frame, local); + local->mode = mode; + local->dev = rdev; + loc_copy (&local->loc1, loc); + if (local->loc1.path == NULL) { + gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O"); + STACK_UNWIND (frame, -1, ENOMEM, loc->inode, NULL); + return 0; + } + + STACK_WIND (frame, + unify_ns_mknod_cbk, + NS(this), + NS(this)->fops->mknod, + loc, + mode, + rdev); + + return 0; +} + +int32_t +unify_symlink_unlink_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + unify_local_t *local = frame->local; + if (op_ret == -1) + gf_log (this->name, GF_LOG_ERROR, + "%s: %s", local->loc1.path, strerror (op_errno)); + + unify_local_wipe (local); + STACK_UNWIND (frame, -1, local->op_errno, NULL, NULL); + return 0; +} + +/** + * unify_symlink_cbk - + */ +int32_t +unify_symlink_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + inode_t *inode, + struct stat *buf) +{ + unify_local_t *local = frame->local; + + if (op_ret == -1) { + /* Symlink on storage node failed, hence send unlink + to the NS node */ + local->op_errno = op_errno; + gf_log (this->name, GF_LOG_ERROR, + "symlink on storage node failed, sending unlink " + "to namespace"); + + STACK_WIND (frame, + unify_symlink_unlink_cbk, + NS(this), + NS(this)->fops->unlink, + &local->loc1); + + return 0; + } + + local->stbuf = *buf; + local->stbuf.st_ino = local->st_ino; + unify_local_wipe (local); + STACK_UNWIND (frame, op_ret, op_errno, inode, &local->stbuf); + + return 0; +} + +/** + * unify_ns_symlink_cbk - + */ +int32_t +unify_ns_symlink_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + inode_t *inode, + struct stat *buf) +{ + + struct sched_ops *sched_ops = NULL; + xlator_t *sched_xl = NULL; + int16_t *list = NULL; + unify_local_t *local = frame->local; + unify_private_t *priv = this->private; + int16_t index = 0; + + if (op_ret == -1) { + /* No need to send symlink request to other servers, + * as namespace action failed + */ + gf_log (this->name, GF_LOG_ERROR, + "namespace: path(%s): %s", + local->loc1.path, strerror (op_errno)); + unify_local_wipe (local); + STACK_UNWIND (frame, op_ret, op_errno, NULL, buf); + return 0; + } + + /* Create one inode for this entry */ + local->op_ret = 0; + local->st_ino = buf->st_ino; + + /* Start the mapping list */ + + list = CALLOC (1, sizeof (int16_t) * 3); + ERR_ABORT (list); + list[0] = priv->child_count; //namespace's index + list[2] = -1; + inode_ctx_put (inode, this, (uint64_t)(long)list); + + sched_ops = priv->sched_ops; + + /* Send symlink request to all the nodes now */ + sched_xl = sched_ops->schedule (this, local->loc1.path); + if (!sched_xl) { + /* Symlink on storage node failed, hence send unlink + to the NS node */ + local->op_errno = ENOTCONN; + gf_log (this->name, GF_LOG_ERROR, + "symlink on storage node failed, no node online, " + "sending unlink to namespace"); + + STACK_WIND (frame, + unify_symlink_unlink_cbk, + NS(this), + NS(this)->fops->unlink, + &local->loc1); + + return 0; + } + + for (index = 0; index < priv->child_count; index++) + if (sched_xl == priv->xl_array[index]) + break; + list[1] = index; + + STACK_WIND (frame, + unify_symlink_cbk, + sched_xl, + sched_xl->fops->symlink, + local->name, + &local->loc1); + + return 0; +} + +/** + * unify_symlink - + */ +int32_t +unify_symlink (call_frame_t *frame, + xlator_t *this, + const char *linkpath, + loc_t *loc) +{ + unify_local_t *local = NULL; + + /* Initialization */ + INIT_LOCAL (frame, local); + loc_copy (&local->loc1, loc); + local->name = strdup (linkpath); + + if ((local->name == NULL) || + (local->loc1.path == NULL)) { + gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O"); + STACK_UNWIND (frame, -1, ENOMEM, loc->inode, NULL); + return 0; + } + + STACK_WIND (frame, + unify_ns_symlink_cbk, + NS(this), + NS(this)->fops->symlink, + linkpath, + loc); + + return 0; +} + + +int32_t +unify_rename_unlink_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + int32_t callcnt = 0; + unify_local_t *local = frame->local; + call_frame_t *prev_frame = cookie; + + if (op_ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "child(%s): path(%s -> %s): %s", + prev_frame->this->name, + local->loc1.path, local->loc2.path, + strerror (op_errno)); + + } + LOCK (&frame->lock); + { + callcnt = --local->call_count; + } + UNLOCK (&frame->lock); + + if (!callcnt) { + local->stbuf.st_ino = local->st_ino; + unify_local_wipe (local); + STACK_UNWIND (frame, local->op_ret, local->op_errno, + &local->stbuf); + } + return 0; +} + +int32_t +unify_ns_rename_undo_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + struct stat *buf) +{ + unify_local_t *local = frame->local; + + if (op_ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "namespace: path(%s -> %s): %s", + local->loc1.path, local->loc2.path, + strerror (op_errno)); + } + + local->stbuf.st_ino = local->st_ino; + unify_local_wipe (local); + STACK_UNWIND (frame, local->op_ret, local->op_errno, &local->stbuf); + return 0; +} + +int32_t +unify_rename_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + struct stat *buf) +{ + int32_t index = 0; + int32_t callcnt = 0; + int16_t *list = NULL; + unify_private_t *priv = this->private; + unify_local_t *local = frame->local; + call_frame_t *prev_frame = cookie; + + LOCK (&frame->lock); + { + callcnt = --local->call_count; + if (op_ret >= 0) { + if (!S_ISDIR (buf->st_mode)) + local->stbuf = *buf; + local->op_ret = op_ret; + } else { + gf_log (this->name, GF_LOG_ERROR, + "child(%s): path(%s -> %s): %s", + prev_frame->this->name, + local->loc1.path, local->loc2.path, + strerror (op_errno)); + local->op_errno = op_errno; + } + } + UNLOCK (&frame->lock); + + if (!callcnt) { + local->stbuf.st_ino = local->st_ino; + if (S_ISDIR (local->loc1.inode->st_mode)) { + unify_local_wipe (local); + STACK_UNWIND (frame, local->op_ret, local->op_errno, &local->stbuf); + return 0; + } + + if (local->op_ret == -1) { + /* TODO: check this logic */ + + /* Rename failed in storage node, successful on NS, + * hence, rename back the entries in NS */ + /* NOTE: this will be done only if the destination + * doesn't exists, if the destination exists, the + * job of correcting NS is left to self-heal + */ + if (!local->index) { + loc_t tmp_oldloc = { + /* its actual 'newloc->path' */ + .path = local->loc2.path, + .inode = local->loc1.inode, + .parent = local->loc2.parent + }; + + loc_t tmp_newloc = { + /* Actual 'oldloc->path' */ + .path = local->loc1.path, + .parent = local->loc1.parent + }; + + gf_log (this->name, GF_LOG_ERROR, + "rename succussful on namespace, on " + "stroage node failed, reverting back"); + + STACK_WIND (frame, + unify_ns_rename_undo_cbk, + NS(this), + NS(this)->fops->rename, + &tmp_oldloc, + &tmp_newloc); + return 0; + } + } else { + /* Rename successful on storage nodes */ + + int32_t idx = 0; + int16_t *tmp_list = NULL; + uint64_t tmp_list_int64 = 0; + if (local->loc2.inode) { + inode_ctx_get (local->loc2.inode, + this, &tmp_list_int64); + list = (int16_t *)(long)tmp_list_int64; + + } + + if (list) { + for (index = 0; list[index] != -1; index++); + tmp_list = CALLOC (1, index * 2); + memcpy (tmp_list, list, index * 2); + + for (index = 0; list[index] != -1; index++) { + /* TODO: Check this logic. */ + /* If the destination file exists in + * the same storage node where we sent + * 'rename' call, no need to send + * unlink + */ + for (idx = 0; + local->list[idx] != -1; idx++) { + if (tmp_list[index] == local->list[idx]) { + tmp_list[index] = priv->child_count; + continue; + } + } + + if (NS(this) != priv->xl_array[tmp_list[index]]) { + local->call_count++; + callcnt++; + } + } + + if (local->call_count) { + if (callcnt > 1) + gf_log (this->name, + GF_LOG_ERROR, + "%s->%s: more (%d) " + "subvolumes have the " + "newloc entry", + local->loc1.path, + local->loc2.path, + callcnt); + + for (index=0; + tmp_list[index] != -1; index++) { + if (NS(this) != priv->xl_array[tmp_list[index]]) { + STACK_WIND (frame, + unify_rename_unlink_cbk, + priv->xl_array[tmp_list[index]], + priv->xl_array[tmp_list[index]]->fops->unlink, + &local->loc2); + if (!--callcnt) + break; + } + } + + FREE (tmp_list); + return 0; + } + if (tmp_list) + FREE (tmp_list); + } + } + + /* Need not send 'unlink' to storage node */ + unify_local_wipe (local); + STACK_UNWIND (frame, local->op_ret, + local->op_errno, &local->stbuf); + } + + return 0; +} + +int32_t +unify_ns_rename_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + struct stat *buf) +{ + int32_t index = 0; + int32_t callcnt = 0; + int16_t *list = NULL; + unify_private_t *priv = this->private; + unify_local_t *local = frame->local; + + if (op_ret == -1) { + /* Free local->new_inode */ + gf_log (this->name, GF_LOG_ERROR, + "namespace: path(%s -> %s): %s", + local->loc1.path, local->loc2.path, + strerror (op_errno)); + + unify_local_wipe (local); + STACK_UNWIND (frame, op_ret, op_errno, buf); + return 0; + } + + local->stbuf = *buf; + local->st_ino = buf->st_ino; + + /* Everything is fine. */ + if (S_ISDIR (buf->st_mode)) { + local->call_count = priv->child_count; + for (index=0; index < priv->child_count; index++) { + STACK_WIND (frame, + unify_rename_cbk, + priv->xl_array[index], + priv->xl_array[index]->fops->rename, + &local->loc1, + &local->loc2); + } + + return 0; + } + + local->call_count = 0; + /* send rename */ + list = local->list; + for (index=0; list[index] != -1; index++) { + if (NS(this) != priv->xl_array[list[index]]) { + local->call_count++; + callcnt++; + } + } + + if (local->call_count) { + for (index=0; list[index] != -1; index++) { + if (NS(this) != priv->xl_array[list[index]]) { + STACK_WIND (frame, + unify_rename_cbk, + priv->xl_array[list[index]], + priv->xl_array[list[index]]->fops->rename, + &local->loc1, + &local->loc2); + if (!--callcnt) + break; + } + } + } else { + /* file doesn't seem to be present in storage nodes */ + gf_log (this->name, GF_LOG_CRITICAL, + "CRITICAL: source file not in storage node, " + "rename successful on namespace :O"); + unify_local_wipe (local); + STACK_UNWIND (frame, -1, EIO, NULL); + } + return 0; +} + + +/** + * unify_rename - One of the tricky function. The deadliest of all :O + */ +int32_t +unify_rename (call_frame_t *frame, + xlator_t *this, + loc_t *oldloc, + loc_t *newloc) +{ + unify_local_t *local = NULL; + uint64_t tmp_list = 0; + + /* Initialization */ + INIT_LOCAL (frame, local); + loc_copy (&local->loc1, oldloc); + loc_copy (&local->loc2, newloc); + + if ((local->loc1.path == NULL) || + (local->loc2.path == NULL)) { + gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O"); + STACK_UNWIND (frame, -1, ENOMEM, NULL); + return 0; + } + + inode_ctx_get (oldloc->inode, this, &tmp_list); + local->list = (int16_t *)(long)tmp_list; + + STACK_WIND (frame, + unify_ns_rename_cbk, + NS(this), + NS(this)->fops->rename, + oldloc, + newloc); + return 0; +} + +/** + * unify_link_cbk - + */ +int32_t +unify_link_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + inode_t *inode, + struct stat *buf) +{ + unify_local_t *local = frame->local; + + if (op_ret >= 0) + local->stbuf = *buf; + local->stbuf.st_ino = local->st_ino; + + unify_local_wipe (local); + STACK_UNWIND (frame, op_ret, op_errno, inode, &local->stbuf); + + return 0; +} + +/** + * unify_ns_link_cbk - + */ +int32_t +unify_ns_link_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + inode_t *inode, + struct stat *buf) +{ + unify_private_t *priv = this->private; + unify_local_t *local = frame->local; + int16_t *list = local->list; + int16_t index = 0; + + if (op_ret == -1) { + /* No need to send link request to other servers, + * as namespace action failed + */ + gf_log (this->name, GF_LOG_ERROR, + "namespace: path(%s -> %s): %s", + local->loc1.path, local->loc2.path, + strerror (op_errno)); + unify_local_wipe (local); + STACK_UNWIND (frame, op_ret, op_errno, inode, buf); + return 0; + } + + /* Update inode for this entry */ + local->op_ret = 0; + local->st_ino = buf->st_ino; + + /* Send link request to the node now */ + for (index = 0; list[index] != -1; index++) { + char need_break = (list[index+1] == -1); + if (priv->xl_array[list[index]] != NS (this)) { + STACK_WIND (frame, + unify_link_cbk, + priv->xl_array[list[index]], + priv->xl_array[list[index]]->fops->link, + &local->loc1, + &local->loc2); + } + if (need_break) + break; + } + + return 0; +} + +/** + * unify_link - + */ +int32_t +unify_link (call_frame_t *frame, + xlator_t *this, + loc_t *oldloc, + loc_t *newloc) +{ + unify_local_t *local = NULL; + uint64_t tmp_list = 0; + + UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (oldloc); + UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (newloc); + + /* Initialization */ + INIT_LOCAL (frame, local); + + loc_copy (&local->loc1, oldloc); + loc_copy (&local->loc2, newloc); + + inode_ctx_get (oldloc->inode, this, &tmp_list); + local->list = (int16_t *)(long)tmp_list; + + STACK_WIND (frame, + unify_ns_link_cbk, + NS(this), + NS(this)->fops->link, + oldloc, + newloc); + + return 0; +} + + +/** + * unify_checksum_cbk - + */ +int32_t +unify_checksum_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + uint8_t *fchecksum, + uint8_t *dchecksum) +{ + STACK_UNWIND (frame, op_ret, op_errno, fchecksum, dchecksum); + + return 0; +} + +/** + * unify_checksum - + */ +int32_t +unify_checksum (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + int32_t flag) +{ + STACK_WIND (frame, + unify_checksum_cbk, + NS(this), + NS(this)->fops->checksum, + loc, + flag); + + return 0; +} + + +/** + * unify_finodelk_cbk - + */ +int +unify_finodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno) +{ + STACK_UNWIND (frame, op_ret, op_errno); + return 0; +} + +/** + * unify_finodelk + */ +int +unify_finodelk (call_frame_t *frame, xlator_t *this, + fd_t *fd, int cmd, struct flock *flock) +{ + UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd); + xlator_t *child = NULL; + uint64_t tmp_child = 0; + + fd_ctx_get (fd, this, &tmp_child); + child = (xlator_t *)(long)tmp_child; + + STACK_WIND (frame, unify_finodelk_cbk, + child, child->fops->finodelk, + fd, cmd, flock); + + return 0; +} + + + +/** + * unify_fentrylk_cbk - + */ +int +unify_fentrylk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno) +{ + STACK_UNWIND (frame, op_ret, op_errno); + return 0; +} + +/** + * unify_fentrylk + */ +int +unify_fentrylk (call_frame_t *frame, xlator_t *this, + fd_t *fd, const char *basename, + entrylk_cmd cmd, entrylk_type type) + +{ + UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd); + xlator_t *child = NULL; + uint64_t tmp_child = 0; + + fd_ctx_get (fd, this, &tmp_child); + child = (xlator_t *)(long)tmp_child; + + STACK_WIND (frame, unify_fentrylk_cbk, + child, child->fops->fentrylk, + fd, basename, cmd, type); + + return 0; +} + + + +/** + * unify_fxattrop_cbk - + */ +int +unify_fxattrop_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xattr) +{ + STACK_UNWIND (frame, op_ret, op_errno, xattr); + return 0; +} + +/** + * unify_fxattrop + */ +int +unify_fxattrop (call_frame_t *frame, xlator_t *this, + fd_t *fd, gf_xattrop_flags_t optype, dict_t *xattr) +{ + UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd); + xlator_t *child = NULL; + uint64_t tmp_child = 0; + + fd_ctx_get (fd, this, &tmp_child); + child = (xlator_t *)(long)tmp_child; + + STACK_WIND (frame, unify_fxattrop_cbk, + child, child->fops->fxattrop, + fd, optype, xattr); + + return 0; +} + + +/** + * unify_inodelk_cbk - + */ +int +unify_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno) +{ + STACK_UNWIND (frame, op_ret, op_errno); + return 0; +} + + +/** + * unify_inodelk + */ +int +unify_inodelk (call_frame_t *frame, xlator_t *this, + loc_t *loc, int cmd, struct flock *flock) +{ + xlator_t *child = NULL; + + child = unify_loc_subvol (loc, this); + + STACK_WIND (frame, unify_inodelk_cbk, + child, child->fops->inodelk, + loc, cmd, flock); + + return 0; +} + + + +/** + * unify_entrylk_cbk - + */ +int +unify_entrylk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno) +{ + STACK_UNWIND (frame, op_ret, op_errno); + return 0; +} + +/** + * unify_entrylk + */ +int +unify_entrylk (call_frame_t *frame, xlator_t *this, + loc_t *loc, const char *basename, + entrylk_cmd cmd, entrylk_type type) + +{ + xlator_t *child = NULL; + + child = unify_loc_subvol (loc, this); + + STACK_WIND (frame, unify_entrylk_cbk, + child, child->fops->entrylk, + loc, basename, cmd, type); + + return 0; +} + + + +/** + * unify_xattrop_cbk - + */ +int +unify_xattrop_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xattr) +{ + STACK_UNWIND (frame, op_ret, op_errno, xattr); + return 0; +} + +/** + * unify_xattrop + */ +int +unify_xattrop (call_frame_t *frame, xlator_t *this, + loc_t *loc, gf_xattrop_flags_t optype, dict_t *xattr) +{ + xlator_t *child = NULL; + + child = unify_loc_subvol (loc, this); + + STACK_WIND (frame, unify_xattrop_cbk, + child, child->fops->xattrop, + loc, optype, xattr); + + return 0; +} + + +/** + * notify + */ +int32_t +notify (xlator_t *this, + int32_t event, + void *data, + ...) +{ + unify_private_t *priv = this->private; + struct sched_ops *sched = NULL; + + if (!priv) { + return 0; + } + + sched = priv->sched_ops; + if (!sched) { + gf_log (this->name, GF_LOG_CRITICAL, "No scheduler :O"); + raise (SIGTERM); + return 0; + } + if (priv->namespace == data) { + if (event == GF_EVENT_CHILD_UP) { + sched->notify (this, event, data); + } + return 0; + } + + switch (event) + { + case GF_EVENT_CHILD_UP: + { + /* Call scheduler's update () to enable it for scheduling */ + sched->notify (this, event, data); + + LOCK (&priv->lock); + { + /* Increment the inode's generation, which is + used for self_heal */ + ++priv->inode_generation; + ++priv->num_child_up; + } + UNLOCK (&priv->lock); + + if (!priv->is_up) { + default_notify (this, event, data); + priv->is_up = 1; + } + } + break; + case GF_EVENT_CHILD_DOWN: + { + /* Call scheduler's update () to disable the child node + * for scheduling + */ + sched->notify (this, event, data); + LOCK (&priv->lock); + { + --priv->num_child_up; + } + UNLOCK (&priv->lock); + + if (priv->num_child_up == 0) { + /* Send CHILD_DOWN to upper layer */ + default_notify (this, event, data); + priv->is_up = 0; + } + } + break; + + default: + { + default_notify (this, event, data); + } + break; + } + + return 0; +} + +/** + * init - This function is called first in the xlator, while initializing. + * All the config file options are checked and appropriate flags are set. + * + * @this - + */ +int32_t +init (xlator_t *this) +{ + int32_t ret = 0; + int32_t count = 0; + data_t *scheduler = NULL; + data_t *data = NULL; + xlator_t *ns_xl = NULL; + xlator_list_t *trav = NULL; + xlator_list_t *xlparent = NULL; + xlator_list_t *parent = NULL; + unify_private_t *_private = NULL; + + /* Check for number of child nodes, if there is no child nodes, exit */ + if (!this->children) { + gf_log (this->name, GF_LOG_ERROR, + "No child nodes specified. check \"subvolumes \" " + "option in volfile"); + return -1; + } + + if (!this->parents) { + gf_log (this->name, GF_LOG_WARNING, + "dangling volume. check volfile "); + } + + /* Check for 'scheduler' in volume */ + scheduler = dict_get (this->options, "scheduler"); + if (!scheduler) { + gf_log (this->name, GF_LOG_ERROR, + "\"option scheduler \" is missing in volfile"); + return -1; + } + + /* Setting "option namespace " */ + data = dict_get (this->options, "namespace"); + if(!data) { + gf_log (this->name, GF_LOG_CRITICAL, + "namespace option not specified, Exiting"); + return -1; + } + /* Search namespace in the child node, if found, exit */ + trav = this->children; + while (trav) { + if (strcmp (trav->xlator->name, data->data) == 0) + break; + trav = trav->next; + } + if (trav) { + gf_log (this->name, GF_LOG_CRITICAL, + "namespace node used as a subvolume, Exiting"); + return -1; + } + + /* Search for the namespace node, if found, continue */ + ns_xl = this->next; + while (ns_xl) { + if (strcmp (ns_xl->name, data->data) == 0) + break; + ns_xl = ns_xl->next; + } + if (!ns_xl) { + gf_log (this->name, GF_LOG_CRITICAL, + "namespace node not found in volfile, Exiting"); + return -1; + } + + gf_log (this->name, GF_LOG_DEBUG, + "namespace node specified as %s", data->data); + + _private = CALLOC (1, sizeof (*_private)); + ERR_ABORT (_private); + _private->sched_ops = get_scheduler (this, scheduler->data); + if (!_private->sched_ops) { + gf_log (this->name, GF_LOG_CRITICAL, + "Error while loading scheduler. Exiting"); + FREE (_private); + return -1; + } + + if (ns_xl->parents) { + gf_log (this->name, GF_LOG_CRITICAL, + "Namespace node should not be a child of any other node. Exiting"); + FREE (_private); + return -1; + } + + _private->namespace = ns_xl; + + /* update _private structure */ + { + count = 0; + trav = this->children; + /* Get the number of child count */ + while (trav) { + count++; + trav = trav->next; + } + + gf_log (this->name, GF_LOG_DEBUG, + "Child node count is %d", count); + + _private->child_count = count; + if (count == 1) { + /* TODO: Should I error out here? */ + gf_log (this->name, GF_LOG_CRITICAL, + "WARNING: You have defined only one " + "\"subvolumes\" for unify volume. It may not " + "be the desired config, review your volume " + "volfile. If this is how you are testing it," + " you may hit some performance penalty"); + } + + _private->xl_array = CALLOC (1, + sizeof (xlator_t) * (count + 1)); + ERR_ABORT (_private->xl_array); + + count = 0; + trav = this->children; + while (trav) { + _private->xl_array[count++] = trav->xlator; + trav = trav->next; + } + _private->xl_array[count] = _private->namespace; + + /* self-heal part, start with generation '1' */ + _private->inode_generation = 1; + /* Because, Foreground part is tested well */ + _private->self_heal = ZR_UNIFY_FG_SELF_HEAL; + data = dict_get (this->options, "self-heal"); + if (data) { + if (strcasecmp (data->data, "off") == 0) + _private->self_heal = ZR_UNIFY_SELF_HEAL_OFF; + + if (strcasecmp (data->data, "foreground") == 0) + _private->self_heal = ZR_UNIFY_FG_SELF_HEAL; + + if (strcasecmp (data->data, "background") == 0) + _private->self_heal = ZR_UNIFY_BG_SELF_HEAL; + } + + /* optimist - ask bulde for more about it */ + data = dict_get (this->options, "optimist"); + if (data) { + if (gf_string2boolean (data->data, + &_private->optimist) == -1) { + gf_log (this->name, GF_LOG_ERROR, + "optimist excepts only boolean " + "options"); + } + } + + LOCK_INIT (&_private->lock); + } + + /* Now that everything is fine. */ + this->private = (void *)_private; + { + /* Initialize scheduler, if everything else is successful */ + ret = _private->sched_ops->init (this); + if (ret == -1) { + gf_log (this->name, GF_LOG_CRITICAL, + "Initializing scheduler failed, Exiting"); + FREE (_private); + return -1; + } + + ret = 0; + + /* This section is required because some fops may look + * for 'xl->parent' variable + */ + xlparent = CALLOC (1, sizeof (*xlparent)); + xlparent->xlator = this; + if (!ns_xl->parents) { + ns_xl->parents = xlparent; + } else { + parent = ns_xl->parents; + while (parent->next) + parent = parent->next; + parent->next = xlparent; + } + /* Initialize the namespace volume */ + if (!ns_xl->ready) { + ret = xlator_tree_init (ns_xl); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "initializing namespace node failed, " + "Exiting"); + FREE (_private); + return -1; + } + } + } + + /* Tell namespace node that init is done */ + ns_xl->notify (ns_xl, GF_EVENT_PARENT_UP, this); + + return 0; +} + +/** + * fini - Free all the allocated memory + */ +void +fini (xlator_t *this) +{ + unify_private_t *priv = this->private; + priv->sched_ops->fini (this); + this->private = NULL; + LOCK_DESTROY (&priv->lock); + FREE (priv->xl_array); + FREE (priv); + return; +} + + +struct xlator_fops fops = { + .stat = unify_stat, + .chmod = unify_chmod, + .readlink = unify_readlink, + .mknod = unify_mknod, + .mkdir = unify_mkdir, + .unlink = unify_unlink, + .rmdir = unify_rmdir, + .symlink = unify_symlink, + .rename = unify_rename, + .link = unify_link, + .chown = unify_chown, + .truncate = unify_truncate, + .create = unify_create, + .open = unify_open, + .readv = unify_readv, + .writev = unify_writev, + .statfs = unify_statfs, + .flush = unify_flush, + .fsync = unify_fsync, + .setxattr = unify_setxattr, + .getxattr = unify_getxattr, + .removexattr = unify_removexattr, + .opendir = unify_opendir, + .readdir = unify_readdir, + .fsyncdir = unify_fsyncdir, + .access = unify_access, + .ftruncate = unify_ftruncate, + .fstat = unify_fstat, + .lk = unify_lk, + .fchown = unify_fchown, + .fchmod = unify_fchmod, + .utimens = unify_utimens, + .lookup = unify_lookup, + .getdents = unify_getdents, + .checksum = unify_checksum, + .inodelk = unify_inodelk, + .finodelk = unify_finodelk, + .entrylk = unify_entrylk, + .fentrylk = unify_fentrylk, + .xattrop = unify_xattrop, + .fxattrop = unify_fxattrop +}; + +struct xlator_mops mops = { +}; + +struct xlator_cbks cbks = { +}; + +struct volume_options options[] = { + { .key = { "namespace" }, + .type = GF_OPTION_TYPE_XLATOR + }, + { .key = { "scheduler" }, + .value = { "alu", "rr", "random", "nufa", "switch" }, + .type = GF_OPTION_TYPE_STR + }, + { .key = {"self-heal"}, + .value = { "foreground", "background", "off" }, + .type = GF_OPTION_TYPE_STR + }, + /* TODO: remove it some time later */ + { .key = {"optimist"}, + .type = GF_OPTION_TYPE_BOOL + }, + + { .key = {NULL} }, +}; -- cgit