diff options
Diffstat (limited to 'xlators/cluster/unify/src')
| -rw-r--r-- | xlators/cluster/unify/src/Makefile.am | 16 | ||||
| -rw-r--r-- | xlators/cluster/unify/src/unify-self-heal.c | 1225 | ||||
| -rw-r--r-- | xlators/cluster/unify/src/unify.c | 4451 | ||||
| -rw-r--r-- | xlators/cluster/unify/src/unify.h | 132 | 
4 files changed, 5824 insertions, 0 deletions
diff --git a/xlators/cluster/unify/src/Makefile.am b/xlators/cluster/unify/src/Makefile.am new file mode 100644 index 00000000000..b9e6f63e9d7 --- /dev/null +++ b/xlators/cluster/unify/src/Makefile.am @@ -0,0 +1,16 @@ + +xlator_LTLIBRARIES = unify.la +xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/cluster + +unify_la_LDFLAGS = -module -avoidversion + +unify_la_SOURCES = unify.c unify-self-heal.c +unify_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +noinst_HEADERS = unify.h + +AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS) \ +	-I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) + +CLEANFILES =  + diff --git a/xlators/cluster/unify/src/unify-self-heal.c b/xlators/cluster/unify/src/unify-self-heal.c new file mode 100644 index 00000000000..4885dd91a35 --- /dev/null +++ b/xlators/cluster/unify/src/unify-self-heal.c @@ -0,0 +1,1225 @@ +/* +  Copyright (c) 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +/** + * unify-self-heal.c :  + *   This file implements a few functions which enable the 'unify' translator  + *  to be consistent in its behaviour when  + *     > a node fails,  + *     > a node gets added,  + *     > a failed node comes back + *     > a new namespace server is added (ie, a fresh namespace server). 
+ *  + *  This functionality of 'unify' will enable glusterfs to support storage + *  system failure, and maintain consistency. This works both ways, ie, when + *  an entry (either file or directory) is found on namespace server, and not + *  on storage nodes, it is created in storage nodes and vice versa. + *  + *  The two fops, where it can be implemented are 'getdents ()' and 'lookup ()' + * + */ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "glusterfs.h" +#include "unify.h" +#include "dict.h" +#include "xlator.h" +#include "hashfn.h" +#include "logging.h" +#include "stack.h" +#include "common-utils.h" + +int32_t +unify_sh_getdents_cbk (call_frame_t *frame, +		       void *cookie, +		       xlator_t *this, +		       int32_t op_ret, +		       int32_t op_errno, +		       dir_entry_t *entry, +		       int32_t count); + +int32_t +unify_sh_ns_getdents_cbk (call_frame_t *frame, +			  void *cookie, +			  xlator_t *this, +			  int32_t op_ret, +			  int32_t op_errno, +			  dir_entry_t *entry, +			  int32_t count); + +int32_t +unify_bgsh_getdents_cbk (call_frame_t *frame, +			 void *cookie, +			 xlator_t *this, +			 int32_t op_ret, +			 int32_t op_errno, +			 dir_entry_t *entry, +			 int32_t count); + +int32_t +unify_bgsh_ns_getdents_cbk (call_frame_t *frame, +			    void *cookie, +			    xlator_t *this, +			    int32_t op_ret, +			    int32_t op_errno, +			    dir_entry_t *entry, +			    int32_t count); + +/** + * unify_local_wipe - free all the extra allocation of local->* here. 
+ */ +static void  +unify_local_wipe (unify_local_t *local) +{ +	/* Free the strdup'd variables in the local structure */ +	if (local->name) { +		FREE (local->name); +	} + +	if (local->sh_struct) { +		if (local->sh_struct->offset_list) +			FREE (local->sh_struct->offset_list); + +		if (local->sh_struct->entry_list) +			FREE (local->sh_struct->entry_list); + +		if (local->sh_struct->count_list) +			FREE (local->sh_struct->count_list); + +		FREE (local->sh_struct); +	} + +	loc_wipe (&local->loc1); +	loc_wipe (&local->loc2); +} + +int32_t  +unify_sh_setdents_cbk (call_frame_t *frame, +		       void *cookie, +		       xlator_t *this, +		       int32_t op_ret, +		       int32_t op_errno) +{ +	int32_t callcnt = -1; +	unify_local_t *local = frame->local; +	inode_t *inode = NULL; +	dict_t *tmp_dict = NULL; +	dir_entry_t *prev, *entry, *trav; + +	LOCK (&frame->lock); +	{ +		/* if local->call_count == 0, that means, setdents on  +		 * storagenodes is still pending. +		 */ +		if (local->call_count) +			callcnt = --local->call_count; +	} +	UNLOCK (&frame->lock); + +	if (callcnt == 0) { +		if (local->sh_struct->entry_list[0]) { +			prev = entry = local->sh_struct->entry_list[0]; +			if (!entry) +				return 0; +			trav = entry->next; +			while (trav) { +				prev->next = trav->next; +				FREE (trav->name); +				if (S_ISLNK (trav->buf.st_mode)) +					FREE (trav->link); +				FREE (trav); +				trav = prev->next; +			} +			FREE (entry); +		} + +		if (!local->flags) { +			if (local->sh_struct->count_list[0] >=  +			    UNIFY_SELF_HEAL_GETDENTS_COUNT) { +				/* count == size, that means, there are more entries +				   to read from */ +				//local->call_count = 0; +				local->sh_struct->offset_list[0] +=  +					UNIFY_SELF_HEAL_GETDENTS_COUNT; +				STACK_WIND (frame, +					    unify_sh_ns_getdents_cbk, +					    NS(this), +					    NS(this)->fops->getdents, +					    local->fd, +					    UNIFY_SELF_HEAL_GETDENTS_COUNT, +					    local->sh_struct->offset_list[0], +					    
GF_GET_DIR_ONLY); +			}		 +		} else { +			inode = local->loc1.inode; +			fd_unref (local->fd); +			tmp_dict = local->dict; + +			unify_local_wipe (local); +			 +			STACK_UNWIND (frame, local->op_ret, local->op_errno,  +				      inode, &local->stbuf, local->dict); +			if (tmp_dict) +				dict_unref (local->dict); +		} +	} +   +	return 0; +} + + +int32_t +unify_sh_ns_getdents_cbk (call_frame_t *frame, +			  void *cookie, +			  xlator_t *this, +			  int32_t op_ret, +			  int32_t op_errno, +			  dir_entry_t *entry, +			  int32_t count) +{ +	unify_local_t *local = frame->local; +	unify_private_t *priv = this->private; +	long index = 0; +	unsigned long final = 0; +	dir_entry_t *tmp = CALLOC (1, sizeof (dir_entry_t)); +	 +	local->sh_struct->entry_list[0] = tmp; +	local->sh_struct->count_list[0] = count; +	if (entry) { +		tmp->next = entry->next; +		entry->next = NULL; +	} + +	if ((count < UNIFY_SELF_HEAL_GETDENTS_COUNT) || !entry) { +		final = 1; +	} + +	LOCK (&frame->lock); +	{ +		/* local->call_count will be '0' till now. make it 1 so, it  +		   can be UNWIND'ed for the last call. 
*/ +		local->call_count = priv->child_count; +		if (final) +			local->flags = 1; +	} +	UNLOCK (&frame->lock); + +	for (index = 0; index < priv->child_count; index++)  +	{ +		STACK_WIND_COOKIE (frame, +				   unify_sh_setdents_cbk,  +				   (void *)index, +				   priv->xl_array[index], +				   priv->xl_array[index]->fops->setdents, +				   local->fd, GF_SET_DIR_ONLY, +				   local->sh_struct->entry_list[0], count); +	} + +	return 0; +} + +int32_t  +unify_sh_ns_setdents_cbk (call_frame_t *frame, +			  void *cookie, +			  xlator_t *this, +			  int32_t op_ret, +			  int32_t op_errno) +{ +	int32_t callcnt = -1; +	unify_local_t *local = frame->local; +	unify_private_t *priv = this->private; +	long index = (long)cookie; +	dir_entry_t *prev, *entry, *trav; + +	LOCK (&frame->lock); +	{ +		if (local->sh_struct->entry_list[index]) { +			prev = entry = local->sh_struct->entry_list[index]; +			trav = entry->next; +			while (trav) { +				prev->next = trav->next; +				FREE (trav->name); +				if (S_ISLNK (trav->buf.st_mode)) +					FREE (trav->link); +				FREE (trav); +				trav = prev->next; +			} +			FREE (entry); +		} +	} +	UNLOCK (&frame->lock); + +	if (local->sh_struct->count_list[index] <  +	    UNIFY_SELF_HEAL_GETDENTS_COUNT) { +		LOCK (&frame->lock); +		{ +			callcnt = --local->call_count; +		} +		UNLOCK (&frame->lock); +	} else { +		/* count == size, that means, there are more entries  +		   to read from */ +		local->sh_struct->offset_list[index] +=  +			UNIFY_SELF_HEAL_GETDENTS_COUNT; +		STACK_WIND_COOKIE (frame, +				   unify_sh_getdents_cbk, +				   cookie, +				   priv->xl_array[index], +				   priv->xl_array[index]->fops->getdents, +				   local->fd, +				   UNIFY_SELF_HEAL_GETDENTS_COUNT, +				   local->sh_struct->offset_list[index], +				   GF_GET_ALL); +     +		gf_log (this->name, GF_LOG_DEBUG,  +			"readdir on (%s) with offset %"PRId64"",  +			priv->xl_array[index]->name,  +			local->sh_struct->offset_list[index]); +	} + +	if (!callcnt) { +		/* All storage nodes 
have done unified setdents on NS node. +		 * Now, do getdents from NS and do setdents on storage nodes. +		 */ +     +		/* sh_struct->offset_list is no longer required for +		   storage nodes now */ +		local->sh_struct->offset_list[0] = 0; /* reset */ + +		STACK_WIND (frame, +			    unify_sh_ns_getdents_cbk, +			    NS(this), +			    NS(this)->fops->getdents, +			    local->fd, +			    UNIFY_SELF_HEAL_GETDENTS_COUNT, +			    0, /* In this call, do send '0' as offset */ +			    GF_GET_DIR_ONLY); +	} + +	return 0; +} + + +/** + * unify_sh_getdents_cbk - + */ +int32_t +unify_sh_getdents_cbk (call_frame_t *frame, +		       void *cookie, +		       xlator_t *this, +		       int32_t op_ret, +		       int32_t op_errno, +		       dir_entry_t *entry, +		       int32_t count) +{ +	int32_t callcnt = -1; +	unify_local_t *local = frame->local; +	unify_private_t *priv = this->private; +	long index = (long)cookie; +	dir_entry_t *tmp = NULL;  + +	if (op_ret >= 0 && count > 0) { +		/* There is some dentry found, just send the dentry to NS */ +		tmp = CALLOC (1, sizeof (dir_entry_t)); +		local->sh_struct->entry_list[index] = tmp; +		local->sh_struct->count_list[index] = count; +		if (entry) { +			tmp->next = entry->next; +			entry->next = NULL; +		} +		STACK_WIND_COOKIE (frame, +				   unify_sh_ns_setdents_cbk, +				   cookie, +				   NS(this), +				   NS(this)->fops->setdents, +				   local->fd, +				   GF_SET_IF_NOT_PRESENT, +				   local->sh_struct->entry_list[index], +				   count); +		return 0; +	} +   +	if (count < UNIFY_SELF_HEAL_GETDENTS_COUNT) { +		LOCK (&frame->lock); +		{ +			callcnt = --local->call_count; +		} +		UNLOCK (&frame->lock); +	} else { +		/* count == size, that means, there are more entries  +		   to read from */ +		local->sh_struct->offset_list[index] +=  +			UNIFY_SELF_HEAL_GETDENTS_COUNT; +		STACK_WIND_COOKIE (frame, +				   unify_sh_getdents_cbk, +				   cookie, +				   priv->xl_array[index], +				   priv->xl_array[index]->fops->getdents, +				   local->fd, 
+				   UNIFY_SELF_HEAL_GETDENTS_COUNT, +				   local->sh_struct->offset_list[index], +				   GF_GET_ALL); +     +		gf_log (this->name, GF_LOG_DEBUG,  +			"readdir on (%s) with offset %"PRId64"",  +			priv->xl_array[index]->name,  +			local->sh_struct->offset_list[index]); +	} + +	if (!callcnt) { +		/* All storage nodes have done unified setdents on NS node. +		 * Now, do getdents from NS and do setdents on storage nodes. +		 */ +     +		/* sh_struct->offset_list is no longer required for +		   storage nodes now */ +		local->sh_struct->offset_list[0] = 0; /* reset */ + +		STACK_WIND (frame, +			    unify_sh_ns_getdents_cbk, +			    NS(this), +			    NS(this)->fops->getdents, +			    local->fd, +			    UNIFY_SELF_HEAL_GETDENTS_COUNT, +			    0, /* In this call, do send '0' as offset */ +			    GF_GET_DIR_ONLY); +	} + +	return 0; +} + +/** + * unify_sh_opendir_cbk - + * + * @cookie:  + */ +int32_t  +unify_sh_opendir_cbk (call_frame_t *frame, +		      void *cookie, +		      xlator_t *this, +		      int32_t op_ret, +		      int32_t op_errno, +		      fd_t *fd) +{ +	int32_t callcnt = 0; +	unify_local_t *local = frame->local; +	unify_private_t *priv = this->private; +	int16_t index = 0; +	inode_t *inode = NULL; +	dict_t *tmp_dict = NULL; + +	LOCK (&frame->lock); +	{ +		callcnt = --local->call_count; +     +		if (op_ret >= 0) { +			local->op_ret = op_ret; +		} else { +			gf_log (this->name, GF_LOG_WARNING, "failed"); +			local->failed = 1; +		} +	} +	UNLOCK (&frame->lock); +   +	if (!callcnt) { +		local->call_count = priv->child_count + 1; +     +		if (!local->failed) { +			/* send getdents() namespace after finishing +			   storage nodes */ +			local->call_count--;  +       +			fd_bind (fd); + +			if (local->call_count) { +				/* Used as the offset index. This list keeps +				 * track of offset sent to each node during +				 * STACK_WIND. 
+				 */ +				local->sh_struct->offset_list =  +					calloc (priv->child_count,  +						sizeof (off_t)); +				ERR_ABORT (local->sh_struct->offset_list); +	 +				local->sh_struct->entry_list =  +					calloc (priv->child_count,  +						sizeof (dir_entry_t *)); +				ERR_ABORT (local->sh_struct->entry_list); + +				local->sh_struct->count_list =  +					calloc (priv->child_count,  +						sizeof (int)); +				ERR_ABORT (local->sh_struct->count_list); + +				/* Send getdents on all the fds */ +				for (index = 0;  +				     index < priv->child_count; index++) { +					STACK_WIND_COOKIE (frame, +							   unify_sh_getdents_cbk, +							   (void *)(long)index, +							   priv->xl_array[index], +							   priv->xl_array[index]->fops->getdents, +							   local->fd, +							   UNIFY_SELF_HEAL_GETDENTS_COUNT, +							   0, /* In this call, do send '0' as offset */ +							   GF_GET_ALL); +				} + +				/* did stack wind, so no need to unwind here */ +				return 0; +			} /* (local->call_count) */ +		} /* (!local->failed) */ + +		/* Opendir failed on one node. */  +		inode = local->loc1.inode; +		fd_unref (local->fd); +		tmp_dict = local->dict; + +		unify_local_wipe (local); +		/* Only 'self-heal' failed, lookup() was successful. */ +		local->op_ret = 0; + +		/* This is lookup_cbk ()'s UNWIND. */ +		STACK_UNWIND (frame, local->op_ret, local->op_errno, inode, +			      &local->stbuf, local->dict); +		if (tmp_dict) +			dict_unref (tmp_dict); +	} + +	return 0; +} + +/** + * gf_sh_checksum_cbk -  + *  + * @frame: frame used in lookup. get a copy of it, and use that copy. + * @this: pointer to unify xlator. + * @inode: pointer to inode, for which the consistency check is required. 
+ * + */ +int32_t  +unify_sh_checksum_cbk (call_frame_t *frame, +		       void *cookie, +		       xlator_t *this, +		       int32_t op_ret, +		       int32_t op_errno, +		       uint8_t *file_checksum, +		       uint8_t *dir_checksum) +{ +	unify_local_t *local = frame->local; +	unify_private_t *priv = this->private; +	int16_t index = 0; +	int32_t callcnt = 0; +	inode_t *inode = NULL; +	dict_t *tmp_dict = NULL; + +	LOCK (&frame->lock); +	{ +		callcnt = --local->call_count; +		if (op_ret >= 0) { +			if (NS(this) == (xlator_t *)cookie) { +				memcpy (local->sh_struct->ns_file_checksum,  +					file_checksum, ZR_FILENAME_MAX); +				memcpy (local->sh_struct->ns_dir_checksum,  +					dir_checksum, ZR_FILENAME_MAX); +			} else { +				if (local->entry_count == 0) { +					/* Initialize the dir_checksum to be  +					 * used for comparision with other +					 * storage nodes. Should be done for +					 * the first successful call *only*.  +					 */ +                                        /* Using 'entry_count' as a flag */ +					local->entry_count = 1; +					memcpy (local->sh_struct->dir_checksum, +						dir_checksum, ZR_FILENAME_MAX); +				} + +				/* Reply from the storage nodes */ +				for (index = 0;  +				     index < ZR_FILENAME_MAX; index++) { +					/* Files should be present in +					   only one node */ +					local->sh_struct->file_checksum[index] ^= file_checksum[index]; +	   +					/* directory structure should be +					   same accross */ +					if (local->sh_struct->dir_checksum[index] != dir_checksum[index]) +						local->failed = 1; +				} +			} +		}  +	} +	UNLOCK (&frame->lock); + +	if (!callcnt) { +		for (index = 0; index < ZR_FILENAME_MAX ; index++) { +			if (local->sh_struct->file_checksum[index] !=  +			    local->sh_struct->ns_file_checksum[index]) { +				local->failed = 1; +				break; +			} +			if (local->sh_struct->dir_checksum[index] !=  +			    local->sh_struct->ns_dir_checksum[index]) { +				local->failed = 1; +				break; +			} +		} +	 +		if (local->failed) 
{ +			/* Log it, it should be a rare event */ +			gf_log (this->name, GF_LOG_WARNING,  +				"Self-heal triggered on directory %s",  +				local->loc1.path); + +			/* Any self heal will be done at directory level */ +			local->call_count = 0; +			local->op_ret = -1; +			local->failed = 0; +       +			local->fd = fd_create (local->loc1.inode,  +					       frame->root->pid); + +			local->call_count = priv->child_count + 1; +	 +			for (index = 0;  +			     index < (priv->child_count + 1); index++) { +				STACK_WIND_COOKIE (frame, +						   unify_sh_opendir_cbk, +						   priv->xl_array[index]->name, +						   priv->xl_array[index], +						   priv->xl_array[index]->fops->opendir, +						   &local->loc1, +						   local->fd); +			} +			/* opendir can be done on the directory */ +			return 0; +		} + +		/* no mismatch */ +		inode = local->loc1.inode; +		tmp_dict = local->dict; + +		unify_local_wipe (local); + +		/* This is lookup_cbk ()'s UNWIND. */ +		STACK_UNWIND (frame, +			      local->op_ret, +			      local->op_errno, +			      inode, +			      &local->stbuf, +			      local->dict); +		if (tmp_dict) +			dict_unref (tmp_dict); +	} + +	return 0; +} + +/* Foreground self-heal part over */ + +/* Background self-heal part */ + +int32_t  +unify_bgsh_setdents_cbk (call_frame_t *frame, +			 void *cookie, +			 xlator_t *this, +			 int32_t op_ret, +			 int32_t op_errno) +{ +	int32_t callcnt = -1; +	unify_local_t *local = frame->local; +	dir_entry_t *prev, *entry, *trav; + +	LOCK (&frame->lock); +	{ +		/* if local->call_count == 0, that means, setdents  +		   on storagenodes is still pending. 
*/ +		if (local->call_count) +			callcnt = --local->call_count; +	} +	UNLOCK (&frame->lock); + + +	if (callcnt == 0) { +		if (local->sh_struct->entry_list[0]) { +			prev = entry = local->sh_struct->entry_list[0]; +			trav = entry->next; +			while (trav) { +				prev->next = trav->next; +				FREE (trav->name); +				if (S_ISLNK (trav->buf.st_mode)) +					FREE (trav->link); +				FREE (trav); +				trav = prev->next; +			} +			FREE (entry); +		} + +		if (!local->flags) { +			if (local->sh_struct->count_list[0] >=  +			    UNIFY_SELF_HEAL_GETDENTS_COUNT) { +				/* count == size, that means, there are more +				   entries to read from */ +				//local->call_count = 0; +				local->sh_struct->offset_list[0] +=  +					UNIFY_SELF_HEAL_GETDENTS_COUNT; +				STACK_WIND (frame, +					    unify_bgsh_ns_getdents_cbk, +					    NS(this), +					    NS(this)->fops->getdents, +					    local->fd, +					    UNIFY_SELF_HEAL_GETDENTS_COUNT, +					    local->sh_struct->offset_list[0], +					    GF_GET_DIR_ONLY); +			}		 +		} else { +			fd_unref (local->fd); +			unify_local_wipe (local); +			STACK_DESTROY (frame->root); +		} +	} + +	return 0; +} + + +int32_t +unify_bgsh_ns_getdents_cbk (call_frame_t *frame, +			    void *cookie, +			    xlator_t *this, +			    int32_t op_ret, +			    int32_t op_errno, +			    dir_entry_t *entry, +			    int32_t count) +{ +	unify_local_t *local = frame->local; +	unify_private_t *priv = this->private; +	long index = 0; +	unsigned long final = 0; +	dir_entry_t *tmp = CALLOC (1, sizeof (dir_entry_t)); + +	local->sh_struct->entry_list[0] = tmp; +	local->sh_struct->count_list[0] = count; +	if (entry) { +		tmp->next = entry->next; +		entry->next = NULL; +	} + +	if ((count < UNIFY_SELF_HEAL_GETDENTS_COUNT) || !entry) { +		final = 1; +	} + +	LOCK (&frame->lock); +	{ +		/* local->call_count will be '0' till now. make it 1 so,  +		   it can be UNWIND'ed for the last call. 
*/ +		local->call_count = priv->child_count; +		if (final) +			local->flags = 1; +	} +	UNLOCK (&frame->lock); + +	for (index = 0; index < priv->child_count; index++)  +	{ +		STACK_WIND_COOKIE (frame, +				   unify_bgsh_setdents_cbk,  +				   (void *)index, +				   priv->xl_array[index], +				   priv->xl_array[index]->fops->setdents, +				   local->fd, GF_SET_DIR_ONLY, +				   local->sh_struct->entry_list[0], count); +	} + +	return 0; +} + +int32_t  +unify_bgsh_ns_setdents_cbk (call_frame_t *frame, +			    void *cookie, +			    xlator_t *this, +			    int32_t op_ret, +			    int32_t op_errno) +{ +	int32_t callcnt = -1; +	unify_local_t *local = frame->local; +	unify_private_t *priv = this->private; +	long index = (long)cookie; +	dir_entry_t *prev, *entry, *trav; + +	if (local->sh_struct->entry_list[index]) { +		prev = entry = local->sh_struct->entry_list[index]; +		if (!entry) +			return 0; +		trav = entry->next; +		while (trav) { +			prev->next = trav->next; +			FREE (trav->name); +			if (S_ISLNK (trav->buf.st_mode)) +				FREE (trav->link); +			FREE (trav); +			trav = prev->next; +		} +		FREE (entry); +	} + +	if (local->sh_struct->count_list[index] <  +	    UNIFY_SELF_HEAL_GETDENTS_COUNT) { +		LOCK (&frame->lock); +		{ +			callcnt = --local->call_count; +		} +		UNLOCK (&frame->lock); +	} else { +		/* count == size, that means, there are more entries  +		   to read from */ +		local->sh_struct->offset_list[index] +=  +			UNIFY_SELF_HEAL_GETDENTS_COUNT; +		STACK_WIND_COOKIE (frame, +				   unify_bgsh_getdents_cbk, +				   cookie, +				   priv->xl_array[index], +				   priv->xl_array[index]->fops->getdents, +				   local->fd, +				   UNIFY_SELF_HEAL_GETDENTS_COUNT, +				   local->sh_struct->offset_list[index], +				   GF_GET_ALL); +     +		gf_log (this->name, GF_LOG_DEBUG,  +			"readdir on (%s) with offset %"PRId64"",  +			priv->xl_array[index]->name,  +			local->sh_struct->offset_list[index]); +	} + +	if (!callcnt) { +		/* All storage nodes have done unified setdents 
on NS node. +		 * Now, do getdents from NS and do setdents on storage nodes. +		 */ +     +		/* sh_struct->offset_list is no longer required for +		   storage nodes now */ +		local->sh_struct->offset_list[0] = 0; /* reset */ + +		STACK_WIND (frame, +			    unify_bgsh_ns_getdents_cbk, +			    NS(this), +			    NS(this)->fops->getdents, +			    local->fd, +			    UNIFY_SELF_HEAL_GETDENTS_COUNT, +			    0, /* In this call, do send '0' as offset */ +			    GF_GET_DIR_ONLY); +	} + +	return 0; +} + + +/** + * unify_bgsh_getdents_cbk - + */ +int32_t +unify_bgsh_getdents_cbk (call_frame_t *frame, +			 void *cookie, +			 xlator_t *this, +			 int32_t op_ret, +			 int32_t op_errno, +			 dir_entry_t *entry, +			 int32_t count) +{ +	int32_t callcnt = -1; +	unify_local_t *local = frame->local; +	unify_private_t *priv = this->private; +	long index = (long)cookie; +	dir_entry_t *tmp = NULL;  + +	if (op_ret >= 0 && count > 0) { +		/* There is some dentry found, just send the dentry to NS */ +		tmp = CALLOC (1, sizeof (dir_entry_t)); +		local->sh_struct->entry_list[index] = tmp; +		local->sh_struct->count_list[index] = count; +		if (entry) { +			tmp->next = entry->next; +			entry->next = NULL; +		} +		STACK_WIND_COOKIE (frame, +				   unify_bgsh_ns_setdents_cbk, +				   cookie, +				   NS(this), +				   NS(this)->fops->setdents, +				   local->fd, +				   GF_SET_IF_NOT_PRESENT, +				   local->sh_struct->entry_list[index], +				   count); +		return 0; +	} +   +	if (count < UNIFY_SELF_HEAL_GETDENTS_COUNT) { +		LOCK (&frame->lock); +		{ +			callcnt = --local->call_count; +		} +		UNLOCK (&frame->lock); +	} else { +		/* count == size, that means, there are more entries to read from */ +		local->sh_struct->offset_list[index] +=  +			UNIFY_SELF_HEAL_GETDENTS_COUNT; + +		STACK_WIND_COOKIE (frame, +				   unify_bgsh_getdents_cbk, +				   cookie, +				   priv->xl_array[index], +				   priv->xl_array[index]->fops->getdents, +				   local->fd, +				   UNIFY_SELF_HEAL_GETDENTS_COUNT, +				   
local->sh_struct->offset_list[index], +				   GF_GET_ALL); +     +		gf_log (this->name, GF_LOG_DEBUG,  +			"readdir on (%s) with offset %"PRId64"",  +			priv->xl_array[index]->name,  +			local->sh_struct->offset_list[index]); +	} + +	if (!callcnt) { +		/* All storage nodes have done unified setdents on NS node. +		 * Now, do getdents from NS and do setdents on storage nodes. +		 */ +     +		/* sh_struct->offset_list is no longer required for  +		   storage nodes now */ +		local->sh_struct->offset_list[0] = 0; /* reset */ + +		STACK_WIND (frame, +			    unify_bgsh_ns_getdents_cbk, +			    NS(this), +			    NS(this)->fops->getdents, +			    local->fd, +			    UNIFY_SELF_HEAL_GETDENTS_COUNT, +			    0, /* In this call, do send '0' as offset */ +			    GF_GET_DIR_ONLY); +	} + +	return 0; +} + +/** + * unify_bgsh_opendir_cbk - + * + * @cookie:  + */ +int32_t  +unify_bgsh_opendir_cbk (call_frame_t *frame, +			void *cookie, +			xlator_t *this, +			int32_t op_ret, +			int32_t op_errno, +			fd_t *fd) +{ +	unify_local_t *local = frame->local; +	unify_private_t *priv = this->private; +	int32_t callcnt = 0; +	int16_t index = 0; + +	LOCK (&frame->lock); +	{ +		callcnt = --local->call_count; +     +		if (op_ret >= 0) { +			local->op_ret = op_ret; +		} else { +			local->failed = 1; +		} +	} +	UNLOCK (&frame->lock); +   +	if (!callcnt) { +		local->call_count = priv->child_count + 1; +     +		if (!local->failed) { +			/* send getdents() namespace after finishing  +			   storage nodes */ +			local->call_count--;  +			callcnt = local->call_count; +       +			fd_bind (fd); + +			if (local->call_count) { +				/* Used as the offset index. This list keeps  +				   track of offset sent to each node during  +				   STACK_WIND. 
*/ +				local->sh_struct->offset_list =  +					calloc (priv->child_count,  +						sizeof (off_t)); +				ERR_ABORT (local->sh_struct->offset_list); +	 +				local->sh_struct->entry_list =  +					calloc (priv->child_count,  +						sizeof (dir_entry_t *)); +				ERR_ABORT (local->sh_struct->entry_list); + +				local->sh_struct->count_list =  +					calloc (priv->child_count,  +						sizeof (int)); +				ERR_ABORT (local->sh_struct->count_list); + +				/* Send getdents on all the fds */ +				for (index = 0;  +				     index < priv->child_count; index++) { +					STACK_WIND_COOKIE (frame, +							   unify_bgsh_getdents_cbk, +							   (void *)(long)index, +							   priv->xl_array[index], +							   priv->xl_array[index]->fops->getdents, +							   local->fd, +							   UNIFY_SELF_HEAL_GETDENTS_COUNT, +							   0, /* In this call, do send '0' as offset */ +							   GF_GET_ALL); +				} +				/* did a stack wind, so no need to unwind here */ +				return 0; +			} /* (local->call_count) */ +		} /* (!local->failed) */ + +		/* Opendir failed on one node. 	 */ +		fd_unref (local->fd); +		 +		unify_local_wipe (local); +		STACK_DESTROY (frame->root); +	} + +	return 0; +} + +/** + * gf_bgsh_checksum_cbk -  + *  + * @frame: frame used in lookup. get a copy of it, and use that copy. + * @this: pointer to unify xlator. + * @inode: pointer to inode, for which the consistency check is required. 
+ * + */ +int32_t  +unify_bgsh_checksum_cbk (call_frame_t *frame, +			 void *cookie, +			 xlator_t *this, +			 int32_t op_ret, +			 int32_t op_errno, +			 uint8_t *file_checksum, +			 uint8_t *dir_checksum) +{ +	unify_local_t *local = frame->local; +	unify_private_t *priv = this->private; +	int16_t index = 0; +	int32_t callcnt = 0; +   +	LOCK (&frame->lock); +	{ +		callcnt = --local->call_count; +		if (op_ret >= 0) { +			if (NS(this) == (xlator_t *)cookie) { +				memcpy (local->sh_struct->ns_file_checksum,  +					file_checksum, ZR_FILENAME_MAX); +				memcpy (local->sh_struct->ns_dir_checksum,  +					dir_checksum, ZR_FILENAME_MAX); +			} else { +				if (local->entry_count == 0) { +					/* Initialize the dir_checksum to be  +					 * used for comparision with other  +					 * storage nodes. Should be done for +					 * the first successful call *only*.  +					 */ +					/* Using 'entry_count' as a flag */ +					local->entry_count = 1;  +					memcpy (local->sh_struct->dir_checksum, +						dir_checksum, ZR_FILENAME_MAX); +				} + +				/* Reply from the storage nodes */ +				for (index = 0;  +				     index < ZR_FILENAME_MAX; index++) { +					/* Files should be present in only  +					   one node */ +					local->sh_struct->file_checksum[index] ^= file_checksum[index]; +	   +					/* directory structure should be same  +					   accross */ +					if (local->sh_struct->dir_checksum[index] != dir_checksum[index]) +						local->failed = 1; +				} +			} +		}  +	} +	UNLOCK (&frame->lock); + +	if (!callcnt) { +		for (index = 0; index < ZR_FILENAME_MAX ; index++) { +			if (local->sh_struct->file_checksum[index] !=  +			    local->sh_struct->ns_file_checksum[index]) { +				local->failed = 1; +				break; +			} +			if (local->sh_struct->dir_checksum[index] !=  +			    local->sh_struct->ns_dir_checksum[index]) { +				local->failed = 1; +				break; +			} +		} +	 +		if (local->failed) { +			/* Log it, it should be a rare event */ +			gf_log (this->name, GF_LOG_WARNING,  +				"Self-heal 
triggered on directory %s",  +				local->loc1.path); + +			/* Any self heal will be done at the directory level */ +			local->op_ret = -1; +			local->failed = 0; +       +			local->fd = fd_create (local->loc1.inode,  +					       frame->root->pid); +			local->call_count = priv->child_count + 1; +	 +			for (index = 0;  +			     index < (priv->child_count + 1); index++) { +				STACK_WIND_COOKIE (frame, +						   unify_bgsh_opendir_cbk, +						   priv->xl_array[index]->name, +						   priv->xl_array[index], +						   priv->xl_array[index]->fops->opendir, +						   &local->loc1, +						   local->fd); +			} +       +			/* opendir can be done on the directory */ +			return 0; +		} + +		/* no mismatch */ +		unify_local_wipe (local); +		STACK_DESTROY (frame->root); +	} + +	return 0; +} + +/* Background self-heal part over */ + + + + +/** + * zr_unify_self_heal -  + *  + * @frame: frame used in lookup. get a copy of it, and use that copy. + * @this: pointer to unify xlator. + * @inode: pointer to inode, for which the consistency check is required. + * + */ +int32_t  +zr_unify_self_heal (call_frame_t *frame, +		    xlator_t *this, +		    unify_local_t *local) +{ +	unify_private_t *priv = this->private; +	call_frame_t *bg_frame = NULL; +	unify_local_t *bg_local = NULL; +	inode_t *tmp_inode = NULL; +	dict_t *tmp_dict = NULL; +	int16_t index = 0; +   +	if (local->inode_generation < priv->inode_generation) { +		/* Any self heal will be done at the directory level */ +		/* Update the inode's generation to the current generation +		   value. 
*/ +		local->inode_generation = priv->inode_generation; +		inode_ctx_put (local->loc1.inode, this,  +			  (uint64_t)(long)local->inode_generation); + +		if (priv->self_heal == ZR_UNIFY_FG_SELF_HEAL) { +			local->op_ret = 0; +			local->failed = 0; +			local->call_count = priv->child_count + 1; +			local->sh_struct =  +				calloc (1, sizeof (struct unify_self_heal_struct)); +       +			/* +1 is for NS */ +			for (index = 0;  +			     index < (priv->child_count + 1); index++) { +				STACK_WIND_COOKIE (frame, +						   unify_sh_checksum_cbk, +						   priv->xl_array[index], +						   priv->xl_array[index], +						   priv->xl_array[index]->fops->checksum, +						   &local->loc1, +						   0); +			} + +			/* Self-heal in foreground, hence no need  +			   to UNWIND here */ +			return 0; +		} + +		/* Self Heal done in background */ +		bg_frame = copy_frame (frame); +		INIT_LOCAL (bg_frame, bg_local); +		loc_copy (&bg_local->loc1, &local->loc1); +		bg_local->op_ret = 0; +		bg_local->failed = 0; +		bg_local->call_count = priv->child_count + 1; +		bg_local->sh_struct =  +			calloc (1, sizeof (struct unify_self_heal_struct)); +     +		/* +1 is for NS */ +		for (index = 0; index < (priv->child_count + 1); index++) { +			STACK_WIND_COOKIE (bg_frame, +					   unify_bgsh_checksum_cbk, +					   priv->xl_array[index], +					   priv->xl_array[index], +					   priv->xl_array[index]->fops->checksum, +					   &bg_local->loc1, +					   0); +		} +	} + +	/* generation number matches, self heal already done or +	 * self heal done in background: just do STACK_UNWIND  +	 */ +	tmp_inode = local->loc1.inode; +	tmp_dict = local->dict; + +	unify_local_wipe (local); + +	/* This is lookup_cbk ()'s UNWIND. 
*/ +	STACK_UNWIND (frame, +		      local->op_ret, +		      local->op_errno, +		      tmp_inode, +		      &local->stbuf, +		      local->dict); + +	if (tmp_dict) +		dict_unref (tmp_dict); + +	return 0; +} + diff --git a/xlators/cluster/unify/src/unify.c b/xlators/cluster/unify/src/unify.c new file mode 100644 index 00000000000..e2a5e14b191 --- /dev/null +++ b/xlators/cluster/unify/src/unify.c @@ -0,0 +1,4451 @@ +/* +  Copyright (c) 2006, 2007, 2008, 2009 Z RESEARCH, Inc. <http://www.zresearch.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +/** + * xlators/cluster/unify: + *     - This xlator is one of the main translator in GlusterFS, which + *   actually does the clustering work of the file system. One need to  + *   understand that, unify assumes file to be existing in only one of  + *   the child node, and directories to be present on all the nodes.  + * + * NOTE: + *   Now, unify has support for global namespace, which is used to keep a  + * global view of fs's namespace tree. The stat for directories are taken + * just from the namespace, where as for files, just 'st_ino' is taken from + * Namespace node, and other stat info is taken from the actual storage node. + * Also Namespace node helps to keep consistant inode for files across  + * glusterfs (re-)mounts. 
+ */ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "glusterfs.h" +#include "unify.h" +#include "dict.h" +#include "xlator.h" +#include "hashfn.h" +#include "logging.h" +#include "stack.h" +#include "defaults.h" +#include "common-utils.h" +#include <signal.h> +#include <libgen.h> +#include "compat-errno.h" +#include "compat.h" + +#define UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR(_loc) do { \ +  if (!(_loc && _loc->inode)) {                            \ +    STACK_UNWIND (frame, -1, EINVAL, NULL, NULL, NULL);    \ +    return 0;                                              \ +  }                                                        \ +} while(0) + + +#define UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR(_fd) do { \ +  if (!(_fd && !fd_ctx_get (_fd, this, NULL))) {       \ +    STACK_UNWIND (frame, -1, EBADFD, NULL, NULL);      \ +    return 0;                                          \ +  }                                                    \ +} while(0) + +#define UNIFY_CHECK_FD_AND_UNWIND_ON_ERR(_fd) do { \ +  if (!_fd) {                                      \ +    STACK_UNWIND (frame, -1, EBADFD, NULL, NULL);  \ +    return 0;                                      \ +  }                                                \ +} while(0) + +/** + * unify_local_wipe - free all the extra allocation of local->* here. 
+ */ +static void  +unify_local_wipe (unify_local_t *local) +{ +	/* Free the strdup'd variables in the local structure */ +	if (local->name) { +		FREE (local->name); +	} +	loc_wipe (&local->loc1); +	loc_wipe (&local->loc2); +} + + + +/* + * unify_normalize_stats - + */ +void +unify_normalize_stats (struct statvfs *buf, +		       unsigned long bsize, +		       unsigned long frsize) +{ +	double factor; + +	if (buf->f_bsize != bsize) { +		factor = ((double) buf->f_bsize) / bsize; +		buf->f_bsize  = bsize; +		buf->f_bfree  = (fsblkcnt_t) (factor * buf->f_bfree); +		buf->f_bavail = (fsblkcnt_t) (factor * buf->f_bavail); +	} +   +	if (buf->f_frsize != frsize) { +		factor = ((double) buf->f_frsize) / frsize; +		buf->f_frsize = frsize; +		buf->f_blocks = (fsblkcnt_t) (factor * buf->f_blocks); +	} +} + + +xlator_t * +unify_loc_subvol (loc_t *loc, xlator_t *this) +{ +	unify_private_t *priv = NULL; +	xlator_t        *subvol = NULL; +	int16_t         *list = NULL; +	long             index = 0; +	xlator_t        *subvol_i = NULL; +	int              ret = 0; +	uint64_t         tmp_list = 0; + +	priv   = this->private; +	subvol = NS (this); + +	if (!S_ISDIR (loc->inode->st_mode)) { +		ret = inode_ctx_get (loc->inode, this, &tmp_list); +		list = (int16_t *)(long)tmp_list; +		if (!list) +			goto out; + +		for (index = 0; list[index] != -1; index++) { +			subvol_i = priv->xl_array[list[index]]; +			if (subvol_i != NS (this)) { +				subvol = subvol_i; +				break; +			} +		} +	} +out: +	return subvol; +} + + + +/** + * unify_statfs_cbk - + */ +int32_t +unify_statfs_cbk (call_frame_t *frame, +		  void *cookie, +		  xlator_t *this, +		  int32_t op_ret, +		  int32_t op_errno, +		  struct statvfs *stbuf) +{ +	int32_t callcnt = 0; +	struct statvfs *dict_buf = NULL; +	unsigned long bsize; +	unsigned long frsize; +	unify_local_t *local = (unify_local_t *)frame->local; +	call_frame_t *prev_frame = cookie; + +	LOCK (&frame->lock); +	{ +		if (op_ret >= 0) { +			/* when a call is successfull, 
add it to local->dict */ +			dict_buf = &local->statvfs_buf; + +			if (dict_buf->f_bsize != 0) { +				bsize  = max (dict_buf->f_bsize,  +					      stbuf->f_bsize); + +				frsize = max (dict_buf->f_frsize,  +					      stbuf->f_frsize); +				unify_normalize_stats(dict_buf, bsize, frsize); +				unify_normalize_stats(stbuf, bsize, frsize); +			} else { +				dict_buf->f_bsize   = stbuf->f_bsize; +				dict_buf->f_frsize  = stbuf->f_frsize; +			} +       +			dict_buf->f_blocks += stbuf->f_blocks; +			dict_buf->f_bfree  += stbuf->f_bfree; +			dict_buf->f_bavail += stbuf->f_bavail; +			dict_buf->f_files  += stbuf->f_files; +			dict_buf->f_ffree  += stbuf->f_ffree; +			dict_buf->f_favail += stbuf->f_favail; +			dict_buf->f_fsid    = stbuf->f_fsid; +			dict_buf->f_flag    = stbuf->f_flag; +			dict_buf->f_namemax = stbuf->f_namemax; +			local->op_ret = op_ret; +		} else { +			/* fop on storage node has failed due to some error */ +			if (op_errno != ENOTCONN) { +				gf_log (this->name, GF_LOG_ERROR,  +					"child(%s): %s",  +					prev_frame->this->name,  +					strerror (op_errno)); +			} +			local->op_errno = op_errno; +		} +		callcnt = --local->call_count; +	} +	UNLOCK (&frame->lock); + +	if (!callcnt) { +		STACK_UNWIND (frame, local->op_ret, local->op_errno,  +			      &local->statvfs_buf); +	} + +	return 0; +} + +/** + * unify_statfs - + */ +int32_t +unify_statfs (call_frame_t *frame, +	      xlator_t *this, +	      loc_t *loc) +{ +	unify_local_t *local = NULL; +	xlator_list_t *trav = this->children; + +	INIT_LOCAL (frame, local); +	local->call_count = ((unify_private_t *)this->private)->child_count; + +	while(trav) { +		STACK_WIND (frame, +			    unify_statfs_cbk, +			    trav->xlator, +			    trav->xlator->fops->statfs, +			    loc); +		trav = trav->next; +	} + +	return 0; +} + +/** + * unify_buf_cbk -  + */ +int32_t +unify_buf_cbk (call_frame_t *frame, +	       void *cookie, +	       xlator_t *this, +	       int32_t op_ret, +	       int32_t op_errno, +	       struct 
stat *buf) +{ +	int32_t callcnt = 0; +	unify_private_t *priv = this->private; +	unify_local_t *local = frame->local; +	call_frame_t *prev_frame = cookie; + +	LOCK (&frame->lock); +	{ +		callcnt = --local->call_count; +     +		if (op_ret == -1) { +			gf_log (this->name, GF_LOG_ERROR, +				"%s(): child(%s): path(%s): %s",  +				gf_fop_list[frame->root->op], +				prev_frame->this->name,  +				(local->loc1.path)?local->loc1.path:"",  +				strerror (op_errno)); + +			local->op_errno = op_errno; +			if ((op_errno == ENOENT) && priv->optimist)  +				local->op_ret = 0; +		} + +		if (op_ret >= 0) { +			local->op_ret = 0; + +			if (NS (this) == prev_frame->this) { +				local->st_ino = buf->st_ino; +				/* If the entry is directory, get the stat +				   from NS node */ +				if (S_ISDIR (buf->st_mode) ||  +				    !local->stbuf.st_blksize) { +					local->stbuf = *buf; +				} +			} + +			if ((!S_ISDIR (buf->st_mode)) &&  +			    (NS (this) != prev_frame->this)) { +				/* If file, take the stat info from Storage  +				   node. 
*/ +				local->stbuf = *buf; +			} +		} +	} +	UNLOCK (&frame->lock); +     +	if (!callcnt) { +		/* If the inode number is not filled, operation should +		   fail */ +		if (!local->st_ino) +			local->op_ret = -1; + +		local->stbuf.st_ino = local->st_ino; +		unify_local_wipe (local); +		STACK_UNWIND (frame, local->op_ret, local->op_errno,  +			      &local->stbuf); +	} + +	return 0; +} + +#define check_if_dht_linkfile(s) ((s->st_mode & ~S_IFMT) == S_ISVTX) + +/** + * unify_lookup_cbk -  + */ +int32_t  +unify_lookup_cbk (call_frame_t *frame, +		  void *cookie, +		  xlator_t *this, +		  int32_t op_ret, +		  int32_t op_errno, +		  inode_t *inode, +		  struct stat *buf, +		  dict_t *dict) +{ +	int32_t callcnt = 0; +	unify_private_t *priv = this->private; +	unify_local_t *local = frame->local; +	inode_t *tmp_inode = NULL; +	dict_t *local_dict = NULL; + +	LOCK (&frame->lock); +	{ +		callcnt = --local->call_count; +  +		if (op_ret == -1) { +			if ((op_errno != ENOTCONN) && (op_errno != ENOENT)) { +				gf_log (this->name, GF_LOG_ERROR, +					"child(%s): path(%s): %s",  +					priv->xl_array[(long)cookie]->name,  +					local->loc1.path, strerror (op_errno)); +				local->op_errno = op_errno; +				local->failed = 1; + +			} else if (local->revalidate &&  +				   !(priv->optimist && (op_errno == ENOENT))) { + +				gf_log (this->name,  +					(op_errno == ENOTCONN) ?  
+					GF_LOG_DEBUG:GF_LOG_ERROR, +					"child(%s): path(%s): %s",  +					priv->xl_array[(long)cookie]->name,  +					local->loc1.path, strerror (op_errno)); +				local->op_errno = op_errno; +				local->failed = 1; +			} +		} + +		if (op_ret == 0) { +			local->op_ret = 0;  +			 +			if (check_if_dht_linkfile(buf)) { +				gf_log (this->name, GF_LOG_CRITICAL, +					"file %s may be DHT link file on %s, " +					"make sure the backend is not shared " +					"between unify and DHT",  +					local->loc1.path,  +					priv->xl_array[(long)cookie]->name); +			} + +			if (local->stbuf.st_mode && local->stbuf.st_blksize) { +				/* make sure we already have a stbuf +				   stored in local->stbuf */ +				if (S_ISDIR (local->stbuf.st_mode) &&  +				    !S_ISDIR (buf->st_mode)) { +					gf_log (this->name, GF_LOG_CRITICAL,  +						"[CRITICAL] '%s' is directory " +						"on namespace, non-directory " +						"on node '%s', returning EIO", +						local->loc1.path,  +						priv->xl_array[(long)cookie]->name); +					local->return_eio = 1; +				} +				if (!S_ISDIR (local->stbuf.st_mode) &&  +				    S_ISDIR (buf->st_mode)) { +					gf_log (this->name, GF_LOG_CRITICAL,  +						"[CRITICAL] '%s' is directory " +						"on node '%s', non-directory " +						"on namespace, returning EIO", +						local->loc1.path,  +						priv->xl_array[(long)cookie]->name); +					local->return_eio = 1; +				} +			} +	 +			if (!local->revalidate && !S_ISDIR (buf->st_mode)) { +				/* This is the first time lookup on file*/ +				if (!local->list) { +					/* list is not allocated, allocate  +					   the max possible range */ +					local->list = CALLOC (1, 2 * (priv->child_count + 2)); +					if (!local->list) { +						gf_log (this->name,  +							GF_LOG_CRITICAL,  +							"Not enough memory"); +						STACK_UNWIND (frame, -1,  +							      ENOMEM, inode,  +							      NULL, NULL); +						return 0; +					} +				} +				/* update the index of the list */ +				local->list [local->index++] =  +					(int16_t)(long)cookie; +			
} +       +			if ((!local->dict) && dict && +			    (priv->xl_array[(long)cookie] != NS(this)))	{ +				local->dict = dict_ref (dict); +			} + +			/* index of NS node is == total child count */ +			if (priv->child_count == (int16_t)(long)cookie) { +				/* Take the inode number from namespace */ +				local->st_ino = buf->st_ino; +				if (S_ISDIR (buf->st_mode) ||  +				    !(local->stbuf.st_blksize)) { +					local->stbuf = *buf; +				} +			} else if (!S_ISDIR (buf->st_mode)) { +				/* If file, then get the stat from  +				   storage node */ +				local->stbuf = *buf; +			} + +			if (local->st_nlink < buf->st_nlink) { +				local->st_nlink = buf->st_nlink; +			} +		} +	} +	UNLOCK (&frame->lock); + +	if (!callcnt) { +		local_dict = local->dict; +		if (local->return_eio) { +			gf_log (this->name, GF_LOG_CRITICAL,  +				"[CRITICAL] Unable to fix the path (%s) with " +				"self-heal, try manual verification. " +				"returning EIO.", local->loc1.path); +			unify_local_wipe (local); +			STACK_UNWIND (frame, -1, EIO, inode, NULL, NULL); +			if (local_dict)	{ +				dict_unref (local_dict); +			} +			return 0; +		} + +		if (!local->stbuf.st_blksize) { +			/* Inode not present */ +			local->op_ret = -1; +		} else { +			if (!local->revalidate &&  +			    !S_ISDIR (local->stbuf.st_mode)) {  +				/* If its a file, big array is useless,  +				   allocate the smaller one */ +				int16_t *list = NULL; +				list = CALLOC (1, 2 * (local->index + 1)); +				ERR_ABORT (list); +				memcpy (list, local->list, 2 * local->index); +				/* Make the end of the list as -1 */ +				FREE (local->list); +				local->list = list; +				local->list [local->index] = -1; +				/* Update the inode's ctx with proper array */ +				/* TODO: log on failure */ +				inode_ctx_put (local->loc1.inode, this,  +					       (uint64_t)(long)local->list); +			} + +			if (S_ISDIR(local->loc1.inode->st_mode)) { +				/* lookup is done for directory */ +				if (local->failed && priv->self_heal) { +					/* Triggering self-heal 
*/ +                                        /* means, self-heal required for this  +					   inode */ +					local->inode_generation = 0;  +					priv->inode_generation++; +				} +			} else { +				local->stbuf.st_ino = local->st_ino; +			} +	   +			local->stbuf.st_nlink = local->st_nlink; +		} +		if (local->op_ret == -1) { +			if (!local->revalidate && local->list) +				FREE (local->list); +		} + +		if ((local->op_ret >= 0) && local->failed &&  +		    local->revalidate) { +			/* Done revalidate, but it failed */ +			if (op_errno != ENOTCONN) { +				gf_log (this->name, GF_LOG_ERROR,  +					"Revalidate failed for path(%s): %s",  +					local->loc1.path, strerror (op_errno)); +			} +			local->op_ret = -1; +		} + +		if ((priv->self_heal && !priv->optimist) &&  +		    (!local->revalidate && (local->op_ret == 0) &&  +		     S_ISDIR(local->stbuf.st_mode))) { +			/* Let the self heal be done here */ +			zr_unify_self_heal (frame, this, local); +			local_dict = NULL; +		} else { +			/* either no self heal, or op_ret == -1 (failure) */ +			tmp_inode = local->loc1.inode; +			unify_local_wipe (local); +			STACK_UNWIND (frame, local->op_ret, local->op_errno,  +				      tmp_inode, &local->stbuf, local->dict); +		} +		if (local_dict) { +			dict_unref (local_dict); +		} +	} +   +	return 0; +} + +/** + * unify_lookup -  + */ +int32_t  +unify_lookup (call_frame_t *frame, +	      xlator_t *this, +	      loc_t *loc, +	      dict_t *xattr_req) +{ +	unify_local_t *local = NULL; +	unify_private_t *priv = this->private; +	int16_t *list = NULL; +	long index = 0; + +	if (!(loc && loc->inode)) { +		gf_log (this->name, GF_LOG_ERROR,  +			"%s: Argument not right", loc?loc->path:"(null)"); +		STACK_UNWIND (frame, -1, EINVAL, NULL, NULL, NULL); +		return 0; +	} + +	/* Initialization */ +	INIT_LOCAL (frame, local); +	loc_copy (&local->loc1, loc); +	if (local->loc1.path == NULL) { +		gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O"); +		STACK_UNWIND (frame, -1, ENOMEM, loc->inode, NULL, 
NULL); +		return 0; +	} + +	if (!inode_ctx_get (loc->inode, this, NULL) &&  +	    loc->inode->st_mode &&  +	    !S_ISDIR (loc->inode->st_mode)) { +		uint64_t tmp_list = 0; +		/* check if revalidate or fresh lookup */ +		inode_ctx_get (loc->inode, this, &tmp_list); +		local->list = (int16_t *)(long)tmp_list; +	} + +	if (local->list) { +		list = local->list; +		for (index = 0; list[index] != -1; index++); +		if (index != 2) { +			if (index < 2) { +				gf_log (this->name, GF_LOG_ERROR, +					"returning ESTALE for %s: file " +					"count is %ld", loc->path, index); +				/* Print where all the file is present */ +				for (index = 0;  +				     local->list[index] != -1; index++) { +					gf_log (this->name, GF_LOG_ERROR,  +						"%s: found on %s", loc->path,  +						priv->xl_array[list[index]]->name); +				} +				unify_local_wipe (local); +				STACK_UNWIND (frame, -1, ESTALE,  +					      NULL, NULL, NULL); +				return 0;   +			} else { +				/* There are more than 2 presences */ +				/* Just log and continue */ +				gf_log (this->name, GF_LOG_ERROR, +					"%s: file count is %ld",  +					loc->path, index); +				/* Print where all the file is present */ +				for (index = 0;  +				     local->list[index] != -1; index++) { +					gf_log (this->name, GF_LOG_ERROR,  +						"%s: found on %s", loc->path,  +						priv->xl_array[list[index]]->name); +				} +			} +		} +       +		/* is revalidate */ +		local->revalidate = 1; +       +		for (index = 0; list[index] != -1; index++) +			local->call_count++; +       +		for (index = 0; list[index] != -1; index++) { +			char need_break = (list[index+1] == -1); +			STACK_WIND_COOKIE (frame, +					   unify_lookup_cbk, +					   (void *)(long)list[index], //cookie +					   priv->xl_array [list[index]], +					   priv->xl_array [list[index]]->fops->lookup, +					   loc, +					   xattr_req); +			if (need_break) +				break; +		} +	} else { +		if (loc->inode->st_mode) { +			if (inode_ctx_get (loc->inode, this, NULL)) { +				inode_ctx_get (loc->inode, 
this,  +					       &local->inode_generation); +			} +		} +		/* This is first call, there is no list */ +		/* call count should be all child + 1 namespace */ +		local->call_count = priv->child_count + 1; +       +		for (index = 0; index <= priv->child_count; index++) { +			STACK_WIND_COOKIE (frame, +					   unify_lookup_cbk, +					   (void *)index, //cookie +					   priv->xl_array[index], +					   priv->xl_array[index]->fops->lookup, +					   loc, +					   xattr_req); +		} +	} + +	return 0; +} + +/** + * unify_stat - if directory, get the stat directly from NameSpace child. + *     if file, check for a hint and send it only there (also to NS). + *     if its a fresh stat, then do it on all the nodes. + * + * NOTE: for all the call, sending cookie as xlator pointer, which will be  + *       used in cbk. + */ +int32_t +unify_stat (call_frame_t *frame, +	    xlator_t *this, +	    loc_t *loc) +{ +	unify_local_t *local = NULL; +	unify_private_t *priv = this->private; +	int16_t index = 0; +	int16_t *list = NULL; +	uint64_t tmp_list = 0; + +	UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + +	/* Initialization */ +	INIT_LOCAL (frame, local); +	loc_copy (&local->loc1, loc); +	if (local->loc1.path == NULL) { +		gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL); +		return 0; +	} +	local->st_ino = loc->inode->ino; +	if (S_ISDIR (loc->inode->st_mode)) { +		/* Directory */ +		local->call_count = 1; +		STACK_WIND (frame, unify_buf_cbk, NS(this), +			    NS(this)->fops->stat, loc); +	} else { +		/* File */ +		inode_ctx_get (loc->inode, this, &tmp_list); +    		list = (int16_t *)(long)tmp_list; + +		for (index = 0; list[index] != -1; index++) +			local->call_count++; +     +		for (index = 0; list[index] != -1; index++) { +			char need_break = (list[index+1] == -1); +			STACK_WIND (frame, +				    unify_buf_cbk, +				    priv->xl_array[list[index]], +				    priv->xl_array[list[index]]->fops->stat, +				    loc); +			if 
(need_break) +				break; +		} +	} + +	return 0; +} + +/** + * unify_access_cbk - + */ +int32_t +unify_access_cbk (call_frame_t *frame, +		  void *cookie, +		  xlator_t *this, +		  int32_t op_ret, +		  int32_t op_errno) +{ +	STACK_UNWIND (frame, op_ret, op_errno); +	return 0; +} + + +/** + * unify_access - Send request to only namespace, which has all the  + *      attributes set for the file. + */ +int32_t +unify_access (call_frame_t *frame, +	      xlator_t *this, +	      loc_t *loc, +	      int32_t mask) +{ +	UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + +	STACK_WIND (frame, +		    unify_access_cbk, +		    NS(this), +		    NS(this)->fops->access, +		    loc, +		    mask); + +	return 0; +} + +int32_t +unify_mkdir_cbk (call_frame_t *frame, +		 void *cookie, +		 xlator_t *this, +		 int32_t op_ret, +		 int32_t op_errno, +		 inode_t *inode, +		 struct stat *buf) +{ +	int32_t callcnt = 0; +	unify_private_t *priv = this->private; +	unify_local_t *local = frame->local; +	inode_t *tmp_inode = NULL; + +	LOCK (&frame->lock); +	{ +		callcnt = --local->call_count; +   +		if ((op_ret == -1) && !(priv->optimist &&  +					(op_errno == ENOENT ||  +					 op_errno == EEXIST))) { +			/* TODO: Decrement the inode_generation of  +			 * this->inode's parent inode, hence the missing  +			 * directory is created properly by self-heal.  +			 * Currently, there is no way to get the parent  +			 * inode directly. 
+			 */ +			gf_log (this->name, GF_LOG_ERROR, +				"child(%s): path(%s): %s",  +				priv->xl_array[(long)cookie]->name,  +				local->loc1.path, strerror (op_errno)); +			if (op_errno != EEXIST) +				local->failed = 1; +			local->op_errno = op_errno; +		} +   +		if (op_ret >= 0) +			local->op_ret = 0; + +	} +	UNLOCK (&frame->lock); +   +	if (!callcnt) { +		if (!local->failed) { +			inode_ctx_put (local->loc1.inode, this,  +				       priv->inode_generation); +		} +		 +		tmp_inode = local->loc1.inode; +		unify_local_wipe (local); + +		STACK_UNWIND (frame, local->op_ret, local->op_errno,  +			      tmp_inode, &local->stbuf); +	} + +	return 0; +} + +/** + * unify_ns_mkdir_cbk - + */ +int32_t +unify_ns_mkdir_cbk (call_frame_t *frame, +		    void *cookie, +		    xlator_t *this, +		    int32_t op_ret, +		    int32_t op_errno, +		    inode_t *inode, +		    struct stat *buf) +{ +	unify_private_t *priv = this->private; +	unify_local_t *local = frame->local; +	long index = 0; + +	if (op_ret == -1) { +		/* No need to send mkdir request to other servers,  +		 * as namespace action failed  +		 */ +		gf_log (this->name, GF_LOG_ERROR, +			"namespace: path(%s): %s",  +			local->name, strerror (op_errno)); +		unify_local_wipe (local); +		STACK_UNWIND (frame, op_ret, op_errno, inode, NULL); +		return 0; +	} +   +	/* Create one inode for this entry */ +	local->op_ret = 0; +	local->stbuf = *buf; + +	local->call_count = priv->child_count; + +	/* Send mkdir request to all the nodes now */ +	for (index = 0; index < priv->child_count; index++) { +		STACK_WIND_COOKIE (frame, +				   unify_mkdir_cbk, +				   (void *)index, //cookie +				   priv->xl_array[index], +				   priv->xl_array[index]->fops->mkdir, +				   &local->loc1, +				   local->mode); +	} +   +	return 0; +} + + +/** + * unify_mkdir - + */ +int32_t +unify_mkdir (call_frame_t *frame, +	     xlator_t *this, +	     loc_t *loc, +	     mode_t mode) +{ +	unify_local_t *local = NULL; + +	/* Initialization */ +	INIT_LOCAL (frame, 
local); +	local->mode = mode; + +	loc_copy (&local->loc1, loc); + +	if (local->loc1.path == NULL) { +		gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL, NULL); +		return 0; +	} + +	STACK_WIND (frame, +		    unify_ns_mkdir_cbk, +		    NS(this), +		    NS(this)->fops->mkdir, +		    loc, +		    mode); +	return 0; +} + +/** + * unify_rmdir_cbk - + */ +int32_t +unify_rmdir_cbk (call_frame_t *frame, +		 void *cookie, +		 xlator_t *this, +		 int32_t op_ret, +		 int32_t op_errno) +{ +	int32_t callcnt = 0; +	unify_private_t *priv = this->private; +	unify_local_t *local = frame->local; + +	LOCK (&frame->lock); +	{ +		callcnt = --local->call_count; +		if (op_ret == 0 || (priv->optimist && (op_errno == ENOENT))) +			local->op_ret = 0; +		if (op_ret == -1) +			local->op_errno = op_errno; +	} +	UNLOCK (&frame->lock); + +	if (!callcnt) { +		unify_local_wipe (local); +		STACK_UNWIND (frame, local->op_ret, local->op_errno); +	} + +	return 0; +} + +/** + * unify_ns_rmdir_cbk - + */ +int32_t +unify_ns_rmdir_cbk (call_frame_t *frame, +		    void *cookie, +		    xlator_t *this, +		    int32_t op_ret, +		    int32_t op_errno) +{ +	int16_t index = 0; +	unify_private_t *priv = this->private; +	unify_local_t *local = frame->local; +   +	if (op_ret == -1) { +		/* No need to send rmdir request to other servers,  +		 * as namespace action failed  +		 */ +		gf_log (this->name,  +			((op_errno != ENOTEMPTY) ?  
+			 GF_LOG_ERROR : GF_LOG_DEBUG), +			"namespace: path(%s): %s",  +			local->loc1.path, strerror (op_errno)); +		unify_local_wipe (local); +		STACK_UNWIND (frame, op_ret, op_errno); +		return 0; +	} + +	local->call_count = priv->child_count; + +	for (index = 0; index < priv->child_count; index++) { +		STACK_WIND (frame, +			    unify_rmdir_cbk, +			    priv->xl_array[index], +			    priv->xl_array[index]->fops->rmdir, +			    &local->loc1); +	} + +	return 0; +} + +/** + * unify_rmdir - + */ +int32_t +unify_rmdir (call_frame_t *frame, +	     xlator_t *this, +	     loc_t *loc) +{ +	unify_local_t *local = NULL; + +	UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + +	/* Initialization */ +	INIT_LOCAL (frame, local); + +	loc_copy (&local->loc1, loc); +	if (local->loc1.path == NULL) { +		gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O"); +		STACK_UNWIND (frame, -1, ENOMEM); +		return 0; +	} + +	STACK_WIND (frame, +		    unify_ns_rmdir_cbk, +		    NS(this), +		    NS(this)->fops->rmdir, +		    loc); + +	return 0; +} + +/** + * unify_open_cbk - + */ +int32_t +unify_open_cbk (call_frame_t *frame, +		void *cookie, +		xlator_t *this, +		int32_t op_ret, +		int32_t op_errno, +		fd_t *fd) +{ +	int32_t callcnt = 0; +	unify_local_t *local = frame->local; + +	LOCK (&frame->lock); +	{ +		if (op_ret >= 0) { +			local->op_ret = op_ret; +			if (NS(this) != (xlator_t *)cookie) { +				/* Store child node's ptr, used in  +				   all the f*** / FileIO calls */ +				fd_ctx_set (fd, this, (uint64_t)(long)cookie); +			} +		} +		if (op_ret == -1) { +			local->op_errno = op_errno; +			local->failed = 1; +		} +		callcnt = --local->call_count; +	} +	UNLOCK (&frame->lock); +   +	if (!callcnt) { +		if ((local->failed == 1) && (local->op_ret >= 0)) { +			local->call_count = 1; +			/* return -1 to user */ +			local->op_ret = -1; +			//local->op_errno = EIO;  +       +			if (!fd_ctx_get (local->fd, this, NULL)) { +				gf_log (this->name, GF_LOG_ERROR,  +					"Open success on child node, " +	
				"failed on namespace"); +			} else { +				gf_log (this->name, GF_LOG_ERROR,  +					"Open success on namespace, " +					"failed on child node"); +			} +		} + +		unify_local_wipe (local); +		STACK_UNWIND (frame, local->op_ret,  +			      local->op_errno, local->fd); +	} + +	return 0; +} + +#ifdef GF_DARWIN_HOST_OS +/** + * unify_create_lookup_cbk -  + */ +int32_t  +unify_open_lookup_cbk (call_frame_t *frame, +		       void *cookie, +		       xlator_t *this, +		       int32_t op_ret, +		       int32_t op_errno, +		       inode_t *inode, +		       struct stat *buf, +		       dict_t *dict) +{ +	int32_t callcnt = 0; +	int16_t index = 0; +	unify_private_t *priv = this->private; +	unify_local_t *local = frame->local; + +	LOCK (&frame->lock); +	{ +		callcnt = --local->call_count; +		if ((op_ret == -1) && (op_errno != ENOENT)) { +			gf_log (this->name, GF_LOG_ERROR, +				"child(%s): path(%s): %s",  +				priv->xl_array[(long)cookie]->name,  +				local->loc1.path, strerror (op_errno)); +			local->op_errno = op_errno; +		} +     +		if (op_ret >= 0) { +			local->op_ret = op_ret;  +			local->index++; +			if (NS(this) == priv->xl_array[(long)cookie]) { +				local->list[0] = (int16_t)(long)cookie; +			} else { +				local->list[1] = (int16_t)(long)cookie; +			} +			if (S_ISDIR (buf->st_mode)) +				local->failed = 1; +		} +	} +	UNLOCK (&frame->lock); + +	if (!callcnt) { +		int16_t file_list[3] = {0,}; +		local->op_ret = -1; + +		file_list[0] = local->list[0]; +		file_list[1] = local->list[1]; +		file_list[2] = -1; + +		if (local->index != 2) { +			/* Lookup failed, can't do open */ +			gf_log (this->name, GF_LOG_ERROR, +				"%s: present on %d nodes",  +				local->name, local->index); + +			if (local->index < 2) { +				unify_local_wipe (local); +				gf_log (this->name, GF_LOG_ERROR, +					"returning as file found on less " +					"than 2 nodes"); +				STACK_UNWIND (frame, local->op_ret,  +					      local->op_errno, local->fd); +				return 0; +			} +		} + +		if (local->failed) { +		
	/* Open on directory, return EISDIR */ +			unify_local_wipe (local); +			STACK_UNWIND (frame, -1, EISDIR, local->fd); +			return 0; +		} + +		/* Everything is perfect :) */     +		local->call_count = 2; +     +		for (index = 0; file_list[index] != -1; index++) { +			char need_break = (file_list[index+1] == -1); +			STACK_WIND_COOKIE (frame, +					   unify_open_cbk, +					   priv->xl_array[file_list[index]], +					   priv->xl_array[file_list[index]], +					   priv->xl_array[file_list[index]]->fops->open, +					   &local->loc1, +					   local->flags, +					   local->fd); +			if (need_break) +				break; +		} +	} + +	return 0; +} + + +int32_t +unify_open_readlink_cbk (call_frame_t *frame, +			 void *cookie, +			 xlator_t *this, +			 int32_t op_ret, +			 int32_t op_errno, +			 const char *path) +{ +	int16_t index = 0; +	unify_private_t *priv = this->private; +	unify_local_t *local = frame->local; + +	if (op_ret == -1) { +		STACK_UNWIND (frame, -1, ENOENT); +		return 0; +	} + +	if (path[0] == '/') { +		local->name = strdup (path); +		ERR_ABORT (local->name); +	} else { +		char *tmp_str = strdup (local->loc1.path); +		char *tmp_base = dirname (tmp_str); +		local->name = CALLOC (1, ZR_PATH_MAX); +		strcpy (local->name, tmp_base); +		strncat (local->name, "/", 1); +		strcat (local->name, path); +		FREE (tmp_str); +	} +   +	local->list = CALLOC (1, sizeof (int16_t) * 3); +	ERR_ABORT (local->list); +	local->call_count = priv->child_count + 1; +	local->op_ret = -1; +	for (index = 0; index <= priv->child_count; index++) { +		/* Send the lookup to all the nodes including namespace */ +		STACK_WIND_COOKIE (frame, +				   unify_open_lookup_cbk, +				   (void *)(long)index, +				   priv->xl_array[index], +				   priv->xl_array[index]->fops->lookup, +				   &local->loc1, +				   NULL); +	} + +	return 0; +} +#endif /* GF_DARWIN_HOST_OS */ + +/** + * unify_open -  + */ +int32_t +unify_open (call_frame_t *frame, +	    xlator_t *this, +	    loc_t *loc, +	    int32_t flags, +	    fd_t 
*fd) +{ +	unify_private_t *priv = this->private; +	unify_local_t *local = NULL; +	int16_t *list = NULL; +	int16_t index = 0; +	int16_t file_list[3] = {0,}; +	uint64_t tmp_list = 0; + +	UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + +	/* Init */ +	INIT_LOCAL (frame, local); +	loc_copy (&local->loc1, loc); +	local->fd    = fd; +	local->flags = flags; +	inode_ctx_get (loc->inode, this, &tmp_list); +	list = (int16_t *)(long)tmp_list; + +	local->list = list; +	file_list[0] = priv->child_count; /* Thats namespace */ +	file_list[2] = -1; +	for (index = 0; list[index] != -1; index++) { +		local->call_count++; +		if (list[index] != priv->child_count) +			file_list[1] = list[index]; +	} + +	if (local->call_count != 2) { +		/* If the lookup was done for file */ +		gf_log (this->name, GF_LOG_ERROR, +			"%s: entry_count is %d", +			loc->path, local->call_count); +		for (index = 0; local->list[index] != -1; index++) +			gf_log (this->name, GF_LOG_ERROR, "%s: found on %s", +				loc->path, priv->xl_array[list[index]]->name); + +		if (local->call_count < 2) { +			gf_log (this->name, GF_LOG_ERROR, +				"returning EIO as file found on onlyone node"); +			STACK_UNWIND (frame, -1, EIO, fd); +			return 0; +		} +	} + +#ifdef GF_DARWIN_HOST_OS +	/* Handle symlink here */ +	if (S_ISLNK (loc->inode->st_mode)) { +		/* Callcount doesn't matter here */ +		STACK_WIND (frame, +			    unify_open_readlink_cbk, +			    NS(this), +			    NS(this)->fops->readlink, +			    loc, ZR_PATH_MAX); +		return 0; +	} +#endif /* GF_DARWIN_HOST_OS */ + +	local->call_count = 2; +	for (index = 0; file_list[index] != -1; index++) { +		char need_break = (file_list[index+1] == -1); +		STACK_WIND_COOKIE (frame, +				   unify_open_cbk, +				   priv->xl_array[file_list[index]], //cookie +				   priv->xl_array[file_list[index]], +				   priv->xl_array[file_list[index]]->fops->open, +				   loc, +				   flags, +				   fd); +		if (need_break) +			break; +	} + +	return 0; +} + + +int32_t  +unify_create_unlink_cbk 
(call_frame_t *frame, +			 void *cookie, +			 xlator_t *this, +			 int32_t op_ret, +			 int32_t op_errno) +{ +	unify_local_t *local = frame->local; +	inode_t *inode = local->loc1.inode; + +	unify_local_wipe (local); + +	STACK_UNWIND (frame, local->op_ret, local->op_errno, local->fd,  +		      inode, &local->stbuf); +   +	return 0; +} + +/** + * unify_create_open_cbk - + */ +int32_t +unify_create_open_cbk (call_frame_t *frame, +		       void *cookie, +		       xlator_t *this, +		       int32_t op_ret, +		       int32_t op_errno, +		       fd_t *fd) +{ +	int ret = 0; +	int32_t callcnt = 0; +	unify_local_t *local = frame->local; +	inode_t *inode = NULL; +	xlator_t *child = NULL; +	uint64_t tmp_value = 0; + +	LOCK (&frame->lock); +	{ +		if (op_ret >= 0) { +			local->op_ret = op_ret; +			if (NS(this) != (xlator_t *)cookie) { +				/* Store child node's ptr, used in all  +				   the f*** / FileIO calls */ +				/* TODO: log on failure */ +				ret = fd_ctx_get (fd, this, &tmp_value); +				cookie = (void *)(long)tmp_value; +			} else { +				/* NOTE: open successful on namespace. +				 *       fd's ctx can be used to identify open  +				 *       failure on storage subvolume. 
cool  +				 *       ide ;) */ +				local->failed = 0; +			} +		} else { +			gf_log (this->name, GF_LOG_ERROR, +				"child(%s): path(%s): %s",  +				((xlator_t *)cookie)->name, +				local->loc1.path, strerror (op_errno)); +			local->op_errno = op_errno; +			local->failed = 1; +		} +		callcnt = --local->call_count; +	} +	UNLOCK (&frame->lock); +   +	if (!callcnt) { +		if (local->failed == 1 && (local->op_ret >= 0)) { +			local->call_count = 1; +			/* return -1 to user */ +			local->op_ret = -1; +			local->op_errno = EIO; +			local->fd = fd; +			local->call_count = 1; + +			if (!fd_ctx_get (local->fd, this, &tmp_value)) { +				child = (xlator_t *)(long)tmp_value; + +				gf_log (this->name, GF_LOG_ERROR,  +					"Create success on child node, " +					"failed on namespace"); + +				STACK_WIND (frame, +					    unify_create_unlink_cbk, +					    child, +					    child->fops->unlink, +					    &local->loc1); +			} else { +				gf_log (this->name, GF_LOG_ERROR,  +					"Create success on namespace, " +					"failed on child node"); + +				STACK_WIND (frame, +					    unify_create_unlink_cbk, +					    NS(this), +					    NS(this)->fops->unlink, +					    &local->loc1); +			} +			return 0; +		} +		inode = local->loc1.inode; +		unify_local_wipe (local); +		STACK_UNWIND (frame, local->op_ret, local->op_errno, fd, +			      inode, &local->stbuf); +	} +	return 0; +} + +/** + * unify_create_lookup_cbk -  + */ +int32_t  +unify_create_lookup_cbk (call_frame_t *frame, +			 void *cookie, +			 xlator_t *this, +			 int32_t op_ret, +			 int32_t op_errno, +			 inode_t *inode, +			 struct stat *buf, +			 dict_t *dict) +{ +	int32_t callcnt = 0; +	int16_t index = 0; +	unify_private_t *priv = this->private; +	unify_local_t *local = frame->local; + +	LOCK (&frame->lock); +	{ +		callcnt = --local->call_count; +		if (op_ret == -1) { +			gf_log (this->name, GF_LOG_ERROR, +				"child(%s): path(%s): %s",  +				priv->xl_array[(long)cookie]->name,  +				local->loc1.path, strerror (op_errno)); +			
local->op_errno = op_errno; +			local->failed = 1; +		} + +		if (op_ret >= 0) { +			local->op_ret = op_ret;  +			local->list[local->index++] = (int16_t)(long)cookie; +			if (NS(this) == priv->xl_array[(long)cookie]) { +				local->st_ino = buf->st_ino; +			} else { +				local->stbuf = *buf; +			} +		} +	} +	UNLOCK (&frame->lock); + +	if (!callcnt) { +		int16_t *list = local->list; +		int16_t file_list[3] = {0,}; +		local->op_ret = -1; + +		local->list [local->index] = -1; +		file_list[0] = list[0]; +		file_list[1] = list[1]; +		file_list[2] = -1; + +		local->stbuf.st_ino = local->st_ino; +		/* TODO: log on failure */ +		inode_ctx_put (local->loc1.inode, this,  +			       (uint64_t)(long)local->list); + +		if (local->index != 2) { +			/* Lookup failed, can't do open */ +			gf_log (this->name, GF_LOG_ERROR, +				"%s: present on %d nodes",  +				local->loc1.path, local->index); +			file_list[0] = priv->child_count; +			for (index = 0; list[index] != -1; index++) { +				gf_log (this->name, GF_LOG_ERROR,  +					"%s: found on %s", local->loc1.path,  +					priv->xl_array[list[index]]->name); +				if (list[index] != priv->child_count) +					file_list[1] = list[index]; +			} + +			if (local->index < 2) { +				unify_local_wipe (local); +				gf_log (this->name, GF_LOG_ERROR, +					"returning EIO as file found on " +					"only one node"); +				STACK_UNWIND (frame, -1, EIO,  +					      local->fd, inode, NULL); +				return 0; +			} +		} +		/* Everything is perfect :) */     +		local->call_count = 2; +     +		for (index = 0; file_list[index] != -1; index++) { +			char need_break = (file_list[index+1] == -1); +			STACK_WIND_COOKIE (frame, +					   unify_create_open_cbk, +					   priv->xl_array[file_list[index]], +					   priv->xl_array[file_list[index]], +					   priv->xl_array[file_list[index]]->fops->open, +					   &local->loc1, +					   local->flags, +					   local->fd); +			if (need_break) +				break; +		} +	} + +	return 0; +} + + +/** + * unify_create_cbk - + */ +int32_t 
+unify_create_cbk (call_frame_t *frame, +		  void *cookie, +		  xlator_t *this, +		  int32_t op_ret, +		  int32_t op_errno, +		  fd_t *fd, +		  inode_t *inode, +		  struct stat *buf) +{ +	int ret = 0; +	unify_local_t *local = frame->local; +	call_frame_t *prev_frame = cookie; +	inode_t *tmp_inode = NULL; + +	if (op_ret == -1) { +		/* send unlink () on Namespace */ +		local->op_errno = op_errno; +		local->op_ret = -1; +		local->call_count = 1; +		gf_log (this->name, GF_LOG_ERROR, +			"create failed on %s (file %s, error %s), " +			"sending unlink to namespace",  +			prev_frame->this->name,  +			local->loc1.path, strerror (op_errno)); + +		STACK_WIND (frame, +			    unify_create_unlink_cbk, +			    NS(this), +			    NS(this)->fops->unlink, +			    &local->loc1); + +		return 0; +	} + +	if (op_ret >= 0) { +		local->op_ret = op_ret; +		local->stbuf = *buf; +		/* Just inode number should be from NS node */ +		local->stbuf.st_ino = local->st_ino; + +		/* TODO: log on failure */ +		ret = fd_ctx_set (fd, this, (uint64_t)(long)prev_frame->this); +	} +   +	tmp_inode = local->loc1.inode; +	unify_local_wipe (local); +	STACK_UNWIND (frame, local->op_ret, local->op_errno, local->fd,  +		      tmp_inode, &local->stbuf); + +	return 0; +} + +/** + * unify_ns_create_cbk - + *  + */ +int32_t +unify_ns_create_cbk (call_frame_t *frame, +		     void *cookie, +		     xlator_t *this, +		     int32_t op_ret, +		     int32_t op_errno, +		     fd_t *fd, +		     inode_t *inode, +		     struct stat *buf) +{ +	struct sched_ops *sched_ops = NULL; +	xlator_t *sched_xl = NULL; +	unify_local_t *local = frame->local; +	unify_private_t *priv = this->private; +	int16_t *list = NULL; +	int16_t index = 0; + +	if (op_ret == -1) { +		/* No need to send create request to other servers, as  +		   namespace action failed. Handle exclusive create here. 
*/ +		if ((op_errno != EEXIST) ||  +		    ((op_errno == EEXIST) &&  +		     ((local->flags & O_EXCL) == O_EXCL))) { +			/* If its just a create call without O_EXCL,  +			   don't do this */ +			gf_log (this->name, GF_LOG_ERROR, +				"namespace: path(%s): %s",  +				local->loc1.path, strerror (op_errno)); +			unify_local_wipe (local); +			STACK_UNWIND (frame, op_ret, op_errno, fd, inode, buf); +			return 0; +		} +	} +   +	if (op_ret >= 0) { +		/* Get the inode number from the NS node */ +		local->st_ino = buf->st_ino; +   +		local->op_ret = -1; + +		/* Start the mapping list */ +		list = CALLOC (1, sizeof (int16_t) * 3); +		ERR_ABORT (list); +		inode_ctx_put (inode, this, (uint64_t)(long)list); +		list[0] = priv->child_count; +		list[2] = -1; + +		/* This means, file doesn't exist anywhere in the Filesystem */ +		sched_ops = priv->sched_ops; + +		/* Send create request to the scheduled node now */ +		sched_xl = sched_ops->schedule (this, local->loc1.path); +		if (sched_xl == NULL) +		{ +			/* send unlink () on Namespace */ +			local->op_errno = ENOTCONN; +			local->op_ret = -1; +			local->call_count = 1; +			gf_log (this->name, GF_LOG_ERROR, +				"no node online to schedule create:(file %s) " +				"sending unlink to namespace",  +				(local->loc1.path)?local->loc1.path:""); + +			STACK_WIND (frame, +				    unify_create_unlink_cbk, +				    NS(this), +				    NS(this)->fops->unlink, +				    &local->loc1); +	 +			return 0; +		} + +		for (index = 0; index < priv->child_count; index++) +			if (sched_xl == priv->xl_array[index]) +				break; +		list[1] = index; + +		STACK_WIND (frame, unify_create_cbk, +			    sched_xl, sched_xl->fops->create, +			    &local->loc1, local->flags, local->mode, fd); +	} else { +		/* File already exists, and there is no O_EXCL flag */ + +		gf_log (this->name, GF_LOG_DEBUG,  +			"File(%s) already exists on namespace, sending " +			"open instead", local->loc1.path); + +		local->list = CALLOC (1, sizeof (int16_t) * 3); +		ERR_ABORT 
(local->list); +		local->call_count = priv->child_count + 1; +		local->op_ret = -1; +		for (index = 0; index <= priv->child_count; index++) { +			/* Send lookup() to all nodes including namespace */ +			STACK_WIND_COOKIE (frame, +					   unify_create_lookup_cbk, +					   (void *)(long)index, +					   priv->xl_array[index], +					   priv->xl_array[index]->fops->lookup, +					   &local->loc1, +					   NULL); +		} +	} +	return 0; +} + +/** + * unify_create - create a file in global namespace first, so other  + *    clients can see them. Create the file in storage nodes in background. + */ +int32_t +unify_create (call_frame_t *frame, +	      xlator_t *this, +	      loc_t *loc, +	      int32_t flags, +	      mode_t mode, +	      fd_t *fd) +{ +	unify_local_t *local = NULL; +   +	/* Initialization */ +	INIT_LOCAL (frame, local); +	local->mode = mode; +	local->flags = flags; +	local->fd = fd; + +	loc_copy (&local->loc1, loc); +	if (local->loc1.path == NULL) { +		gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O"); +		STACK_UNWIND (frame, -1, ENOMEM, fd, loc->inode, NULL); +		return 0; +	} + +	STACK_WIND (frame, +		    unify_ns_create_cbk, +		    NS(this), +		    NS(this)->fops->create, +		    loc, +		    flags | O_EXCL, +		    mode, +		    fd); +   +	return 0; +} + + +/** + * unify_opendir_cbk -  + */ +int32_t +unify_opendir_cbk (call_frame_t *frame, +		   void *cookie, +		   xlator_t *this, +		   int32_t op_ret, +		   int32_t op_errno, +		   fd_t *fd) +{ +	STACK_UNWIND (frame, op_ret, op_errno, fd); + +	return 0; +} + +/**  + * unify_opendir - + */ +int32_t +unify_opendir (call_frame_t *frame, +	       xlator_t *this, +	       loc_t *loc, +	       fd_t *fd) +{ +	UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + +	STACK_WIND (frame, unify_opendir_cbk, +		    NS(this), NS(this)->fops->opendir, loc, fd); + +	return 0; +} + + +/** + * unify_chmod -  + */ +int32_t +unify_chmod (call_frame_t *frame, +	     xlator_t *this, +	     loc_t *loc, +	     mode_t mode) +{ +	
unify_local_t *local = NULL; +	unify_private_t *priv = this->private; +	int32_t index = 0; +	int32_t callcnt = 0; +	uint64_t tmp_list = 0; +		 +	UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + +	/* Initialization */ +	INIT_LOCAL (frame, local); + +	loc_copy (&local->loc1, loc); +	local->st_ino = loc->inode->ino; + +	if (S_ISDIR (loc->inode->st_mode)) { +		local->call_count = priv->child_count + 1; +       +		for (index = 0; index < (priv->child_count + 1); index++) { +			STACK_WIND (frame, +				    unify_buf_cbk, +				    priv->xl_array[index], +				    priv->xl_array[index]->fops->chmod, +				    loc, mode); +		}     +	} else { +		inode_ctx_get (loc->inode, this, &tmp_list); +		local->list = (int16_t *)(long)tmp_list; + +		for (index = 0; local->list[index] != -1; index++) { +			local->call_count++; +			callcnt++; +		} +       +		for (index = 0; local->list[index] != -1; index++) { +			STACK_WIND (frame, +				    unify_buf_cbk, +				    priv->xl_array[local->list[index]], +				    priv->xl_array[local->list[index]]->fops->chmod, +				    loc, +				    mode); +			if (!--callcnt) +				break; +		} +	} + +	return 0; +} + +/** + * unify_chown -  + */ +int32_t +unify_chown (call_frame_t *frame, +	     xlator_t *this, +	     loc_t *loc, +	     uid_t uid, +	     gid_t gid) +{ +	unify_local_t *local = NULL; +	unify_private_t *priv = this->private; +	int32_t index = 0; +	int32_t callcnt = 0; +  	uint64_t tmp_list = 0; + +	UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + +	/* Initialization */ +	INIT_LOCAL (frame, local); +	loc_copy (&local->loc1, loc); +	local->st_ino = loc->inode->ino; + +	if (S_ISDIR (loc->inode->st_mode)) { +		local->call_count = priv->child_count + 1; +       +		for (index = 0; index < (priv->child_count + 1); index++) { +			STACK_WIND (frame, +				    unify_buf_cbk, +				    priv->xl_array[index], +				    priv->xl_array[index]->fops->chown, +				    loc, uid, gid); +		}     +	} else { +		inode_ctx_get (loc->inode, this, &tmp_list); +		local->list = 
(int16_t *)(long)tmp_list; + +		for (index = 0; local->list[index] != -1; index++) { +			local->call_count++; +			callcnt++; +		} +       +		for (index = 0; local->list[index] != -1; index++) { +			STACK_WIND (frame, +				    unify_buf_cbk, +				    priv->xl_array[local->list[index]], +				    priv->xl_array[local->list[index]]->fops->chown, +				    loc, uid, gid); +			if (!--callcnt) +				break; +		} +	} + +	return 0; +} + + +/** + * unify_truncate_cbk -  + */ +int32_t +unify_truncate_cbk (call_frame_t *frame, +		    void *cookie, +		    xlator_t *this, +		    int32_t op_ret, +		    int32_t op_errno, +		    struct stat *buf) +{ +	int32_t callcnt = 0; +	unify_private_t *priv = this->private; +	unify_local_t *local = frame->local; +	call_frame_t *prev_frame = cookie; + +	LOCK (&frame->lock); +	{ +		callcnt = --local->call_count; +     +		if (op_ret == -1) { +			gf_log (this->name, GF_LOG_ERROR, +				"child(%s): path(%s): %s",  +				prev_frame->this->name,  +				(local->loc1.path)?local->loc1.path:"",  +				strerror (op_errno)); +			local->op_errno = op_errno; +			if (!((op_errno == ENOENT) && priv->optimist)) +				local->op_ret = -1; +		} + +		if (op_ret >= 0) { +			if (NS (this) == prev_frame->this) { +				local->st_ino = buf->st_ino; +				/* If the entry is directory, get the  +				   stat from NS node */ +				if (S_ISDIR (buf->st_mode) ||  +				    !local->stbuf.st_blksize) { +					local->stbuf = *buf; +				} +			} + +			if ((!S_ISDIR (buf->st_mode)) &&  +			    (NS (this) != prev_frame->this)) { +				/* If file, take the stat info from  +				   Storage node. 
*/ +				local->stbuf = *buf; +			} +		} +	} +	UNLOCK (&frame->lock); +     +	if (!callcnt) { +		if (local->st_ino) +			local->stbuf.st_ino = local->st_ino; +		else +			local->op_ret = -1; +		unify_local_wipe (local); +		STACK_UNWIND (frame, local->op_ret, local->op_errno,  +			      &local->stbuf); +	} + +	return 0; +} + +/** + * unify_truncate -  + */ +int32_t +unify_truncate (call_frame_t *frame, +		xlator_t *this, +		loc_t *loc, +		off_t offset) +{ +	unify_local_t *local = NULL; +	unify_private_t *priv = this->private; +	int32_t index = 0; +	int32_t callcnt = 0; +  	uint64_t tmp_list = 0; + +	UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + +	/* Initialization */ +	INIT_LOCAL (frame, local); +	loc_copy (&local->loc1, loc); +	local->st_ino = loc->inode->ino; + +	if (S_ISDIR (loc->inode->st_mode)) { +		local->call_count = 1; +       +		STACK_WIND (frame, +			    unify_buf_cbk, +			    NS(this), +			    NS(this)->fops->stat, +			    loc); +	} else { +		local->op_ret = 0; +		inode_ctx_get (loc->inode, this, &tmp_list); +		local->list = (int16_t *)(long)tmp_list; + +		for (index = 0; local->list[index] != -1; index++) { +			local->call_count++; +			callcnt++; +		} +       +		/* Don't send truncate to NS node */ +		STACK_WIND (frame, unify_truncate_cbk, NS(this), +			    NS(this)->fops->stat, loc); +		callcnt--; + +		for (index = 0; local->list[index] != -1; index++) { +			if (NS(this) != priv->xl_array[local->list[index]]) { +				STACK_WIND (frame, +					    unify_truncate_cbk, +					    priv->xl_array[local->list[index]], +					    priv->xl_array[local->list[index]]->fops->truncate, +					    loc, +					    offset); +				if (!--callcnt) +					break; +			} +		} +	} + +	return 0; +} + +/** + * unify_utimens -  + */ +int32_t  +unify_utimens (call_frame_t *frame, +	       xlator_t *this, +	       loc_t *loc, +	       struct timespec tv[2]) +{ +	unify_local_t *local = NULL; +	unify_private_t *priv = this->private; +	int32_t index = 0; +	int32_t callcnt = 0; +  	uint64_t 
tmp_list = 0; +   +	UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + +	/* Initialization */ +	INIT_LOCAL (frame, local); +	loc_copy (&local->loc1, loc); +	local->st_ino = loc->inode->ino; + +	if (S_ISDIR (loc->inode->st_mode)) { +		local->call_count = priv->child_count + 1; +       +		for (index = 0; index < (priv->child_count + 1); index++) { +			STACK_WIND (frame, +				    unify_buf_cbk, +				    priv->xl_array[index], +				    priv->xl_array[index]->fops->utimens, +				    loc, tv); +		} +	} else { +		inode_ctx_get (loc->inode, this, &tmp_list); +		local->list = (int16_t *)(long)tmp_list; + +		for (index = 0; local->list[index] != -1; index++) { +			local->call_count++; +			callcnt++; +		} +       +		for (index = 0; local->list[index] != -1; index++) { +			STACK_WIND (frame, +				    unify_buf_cbk, +				    priv->xl_array[local->list[index]], +				    priv->xl_array[local->list[index]]->fops->utimens, +				    loc, +				    tv); +			if (!--callcnt) +				break; +		} +	} +   +	return 0; +} + +/** + * unify_readlink_cbk -  + */ +int32_t +unify_readlink_cbk (call_frame_t *frame, +		    void *cookie, +		    xlator_t *this, +		    int32_t op_ret, +		    int32_t op_errno, +		    const char *path) +{ +	STACK_UNWIND (frame, op_ret, op_errno, path); +	return 0; +} + +/** + * unify_readlink - Read the link only from the storage node. 
+ */ +int32_t +unify_readlink (call_frame_t *frame, +		xlator_t *this, +		loc_t *loc, +		size_t size) +{ +	unify_private_t *priv = this->private; +	int32_t entry_count = 0; +	int16_t *list = NULL; +	int16_t index = 0; +  	uint64_t tmp_list = 0; +   +	UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); +	 +	inode_ctx_get (loc->inode, this, &tmp_list); +	list = (int16_t *)(long)tmp_list; + +	for (index = 0; list[index] != -1; index++) +		entry_count++; + +	if (entry_count >= 2) { +		for (index = 0; list[index] != -1; index++) { +			if (priv->xl_array[list[index]] != NS(this)) { +				STACK_WIND (frame, +					    unify_readlink_cbk, +					    priv->xl_array[list[index]], +					    priv->xl_array[list[index]]->fops->readlink, +					    loc, +					    size); +				break; +			} +		} +	} else { +		gf_log (this->name, GF_LOG_ERROR,  +			"returning ENOENT, no softlink files found " +			"on storage node"); +		STACK_UNWIND (frame, -1, ENOENT, NULL); +	} + +	return 0; +} + + +/** + * unify_unlink_cbk -  + */ +int32_t +unify_unlink_cbk (call_frame_t *frame, +		  void *cookie, +		  xlator_t *this, +		  int32_t op_ret, +		  int32_t op_errno) +{ +	int32_t callcnt = 0; +	unify_private_t *priv = this->private; +	unify_local_t *local = frame->local; + +	LOCK (&frame->lock); +	{ +		callcnt = --local->call_count; +		if (op_ret == 0  || ((op_errno == ENOENT) && priv->optimist)) +			local->op_ret = 0; +		if (op_ret == -1) +			local->op_errno = op_errno; +	} +	UNLOCK (&frame->lock); + +	if (!callcnt) { +		unify_local_wipe (local); +		STACK_UNWIND (frame, local->op_ret, local->op_errno); +	} + +	return 0; +} + + +/** + * unify_unlink -  + */ +int32_t +unify_unlink (call_frame_t *frame, +	      xlator_t *this, +	      loc_t *loc) +{ +	unify_private_t *priv = this->private; +	unify_local_t *local = NULL; +	int16_t *list = NULL; +	int16_t index = 0; +  	uint64_t tmp_list = 0; + +	UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + +	/* Initialization */ +	INIT_LOCAL (frame, local); +	loc_copy 
(&local->loc1, loc); + +	inode_ctx_get (loc->inode, this, &tmp_list); +	list = (int16_t *)(long)tmp_list; + +	for (index = 0; list[index] != -1; index++) +		local->call_count++; + +	if (local->call_count) { +		for (index = 0; list[index] != -1; index++) { +			char need_break = (list[index+1] == -1); +			STACK_WIND (frame, +				    unify_unlink_cbk, +				    priv->xl_array[list[index]], +				    priv->xl_array[list[index]]->fops->unlink, +				    loc); +			if (need_break) +				break; +		} +	} else { +		gf_log (this->name, GF_LOG_ERROR, +			"%s: returning ENOENT", loc->path); +		STACK_UNWIND (frame, -1, ENOENT); +	} + +	return 0; +} + + +/** + * unify_readv_cbk -  + */ +int32_t +unify_readv_cbk (call_frame_t *frame, +		 void *cookie, +		 xlator_t *this, +		 int32_t op_ret, +		 int32_t op_errno, +		 struct iovec *vector, +		 int32_t count, +		 struct stat *stbuf) +{ +	STACK_UNWIND (frame, op_ret, op_errno, vector, count, stbuf); +	return 0; +} + +/** + * unify_readv -  + */ +int32_t +unify_readv (call_frame_t *frame, +	     xlator_t *this, +	     fd_t *fd, +	     size_t size, +	     off_t offset) +{ +	UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd); +	xlator_t *child = NULL; +	uint64_t tmp_child = 0; + +	fd_ctx_get (fd, this, &tmp_child); +	child = (xlator_t *)(long)tmp_child;		      + +	STACK_WIND (frame, +		    unify_readv_cbk, +		    child, +		    child->fops->readv, +		    fd, +		    size, +		    offset); + + +	return 0; +} + +/** + * unify_writev_cbk -  + */ +int32_t +unify_writev_cbk (call_frame_t *frame, +		  void *cookie, +		  xlator_t *this, +		  int32_t op_ret, +		  int32_t op_errno, +		  struct stat *stbuf) +{ +	STACK_UNWIND (frame, op_ret, op_errno, stbuf); +	return 0; +} + +/** + * unify_writev -  + */ +int32_t +unify_writev (call_frame_t *frame, +	      xlator_t *this, +	      fd_t *fd, +	      struct iovec *vector, +	      int32_t count, +	      off_t off) +{ +	UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd); +	xlator_t *child = NULL; +	uint64_t tmp_child = 0; + +	
fd_ctx_get (fd, this, &tmp_child); +	child = (xlator_t *)(long)tmp_child;		      + +	STACK_WIND (frame, +		    unify_writev_cbk, +		    child, +		    child->fops->writev, +		    fd, +		    vector, +		    count, +		    off); + +	return 0; +} + +/** + * unify_ftruncate - + */ +int32_t +unify_ftruncate (call_frame_t *frame, +		 xlator_t *this, +		 fd_t *fd, +		 off_t offset) +{ +	xlator_t *child = NULL; +	unify_local_t *local = NULL; +	uint64_t tmp_child = 0; + +	UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR(fd); + +	/* Initialization */ +	INIT_LOCAL (frame, local); +	local->op_ret = 0; + +	fd_ctx_get (fd, this, &tmp_child); +	child = (xlator_t *)(long)tmp_child;		      + +	local->call_count = 2; +   +	STACK_WIND (frame, unify_truncate_cbk,  +		    child, child->fops->ftruncate, +		    fd, offset); +   +	STACK_WIND (frame, unify_truncate_cbk,  +		    NS(this), NS(this)->fops->fstat, +		    fd); +   +	return 0; +} + + +/** + * unify_fchmod -  + */ +int32_t  +unify_fchmod (call_frame_t *frame, +	      xlator_t *this, +	      fd_t *fd, +	      mode_t mode) +{ +	unify_local_t *local = NULL; +	xlator_t *child = NULL;	 +	uint64_t tmp_child = 0; + +	UNIFY_CHECK_FD_AND_UNWIND_ON_ERR(fd); + +	/* Initialization */ +	INIT_LOCAL (frame, local); +	local->st_ino = fd->inode->ino; + +	if (!fd_ctx_get (fd, this, &tmp_child)) { +		/* If its set, then its file */ +		child = (xlator_t *)(long)tmp_child;		      + +		local->call_count = 2; + +		STACK_WIND (frame, unify_buf_cbk, child,  +			    child->fops->fchmod, fd, mode); + +		STACK_WIND (frame, unify_buf_cbk, NS(this),	 +			    NS(this)->fops->fchmod, fd, mode); + +	} else { +		/* this is an directory */ +		local->call_count = 1; +     +		STACK_WIND (frame, unify_buf_cbk, +			    NS(this), NS(this)->fops->fchmod, fd, mode); +	} + +	return 0; +} + +/** + * unify_fchown -  + */ +int32_t  +unify_fchown (call_frame_t *frame, +	      xlator_t *this, +	      fd_t *fd, +	      uid_t uid, +	      gid_t gid) +{ +	unify_local_t *local = NULL; +	xlator_t 
*child = NULL; +	uint64_t tmp_child = 0; + +	UNIFY_CHECK_FD_AND_UNWIND_ON_ERR(fd); + +	/* Initialization */ +	INIT_LOCAL (frame, local); +	local->st_ino = fd->inode->ino; + +	if (!fd_ctx_get (fd, this, &tmp_child)) { +		/* If its set, then its file */ +		child = (xlator_t *)(long)tmp_child;		      + +		local->call_count = 2; + +		STACK_WIND (frame, unify_buf_cbk, child, +			    child->fops->fchown, fd, uid, gid); + +		STACK_WIND (frame, unify_buf_cbk, NS(this), +			    NS(this)->fops->fchown,	fd, uid, gid); +	} else { +		local->call_count = 1; +     +		STACK_WIND (frame, unify_buf_cbk, +			    NS(this), NS(this)->fops->fchown, +			    fd, uid, gid); +	} +   +	return 0; +} + +/** + * unify_flush_cbk -  + */ +int32_t +unify_flush_cbk (call_frame_t *frame, +		 void *cookie, +		 xlator_t *this, +		 int32_t op_ret, +		 int32_t op_errno) +{ +	STACK_UNWIND (frame, op_ret, op_errno); +	return 0; +} + +/** + * unify_flush - + */ +int32_t +unify_flush (call_frame_t *frame, +	     xlator_t *this, +	     fd_t *fd) +{ +	UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd); +	xlator_t *child = NULL; +	uint64_t tmp_child = 0; + +	fd_ctx_get (fd, this, &tmp_child); +	child = (xlator_t *)(long)tmp_child;		      + +	STACK_WIND (frame, unify_flush_cbk, child,  +		    child->fops->flush, fd); + +	return 0; +} + + +/** + * unify_fsync_cbk -  + */ +int32_t +unify_fsync_cbk (call_frame_t *frame, +		 void *cookie, +		 xlator_t *this, +		 int32_t op_ret, +		 int32_t op_errno) +{ +	STACK_UNWIND (frame, op_ret, op_errno); +	return 0; +} + +/** + * unify_fsync -  + */ +int32_t +unify_fsync (call_frame_t *frame, +	     xlator_t *this, +	     fd_t *fd, +	     int32_t flags) +{ +	UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd); +	xlator_t *child = NULL; +	uint64_t tmp_child = 0; + +	fd_ctx_get (fd, this, &tmp_child); +	child = (xlator_t *)(long)tmp_child;		      + +	STACK_WIND (frame, unify_fsync_cbk, child, +		    child->fops->fsync, fd, flags); + +	return 0; +} + +/** + * unify_fstat - Send fstat FOP to 
Namespace only if its directory, and to  + *     both namespace and the storage node if its a file. + */ +int32_t +unify_fstat (call_frame_t *frame, +	     xlator_t *this, +	     fd_t *fd) +{ +	unify_local_t *local = NULL; +	xlator_t *child = NULL; +	uint64_t tmp_child = 0; + +	UNIFY_CHECK_FD_AND_UNWIND_ON_ERR(fd); + +	INIT_LOCAL (frame, local); +	local->st_ino = fd->inode->ino; + +	if (!fd_ctx_get (fd, this, &tmp_child)) { +		/* If its set, then its file */ +		child = (xlator_t *)(long)tmp_child;		      +		local->call_count = 2; + +		STACK_WIND (frame, unify_buf_cbk, child, +			    child->fops->fstat, fd); + +		STACK_WIND (frame, unify_buf_cbk, NS(this), +			    NS(this)->fops->fstat, fd); + +	} else { +		/* this is an directory */ +		local->call_count = 1; +		STACK_WIND (frame, unify_buf_cbk, NS(this), +			    NS(this)->fops->fstat, fd); +	} + +	return 0; +} + +/** + * unify_getdents_cbk -  + */ +int32_t +unify_getdents_cbk (call_frame_t *frame, +		    void *cookie, +		    xlator_t *this, +		    int32_t op_ret, +		    int32_t op_errno, +		    dir_entry_t *entry, +		    int32_t count) +{ +	STACK_UNWIND (frame, op_ret, op_errno, entry, count); +	return 0; +} + +/** + * unify_getdents - send the FOP request to all the nodes. + */ +int32_t +unify_getdents (call_frame_t *frame, +		xlator_t *this, +		fd_t *fd, +		size_t size, +		off_t offset, +		int32_t flag) +{ +	UNIFY_CHECK_FD_AND_UNWIND_ON_ERR (fd); + +	STACK_WIND (frame, unify_getdents_cbk, NS(this), +		    NS(this)->fops->getdents, fd, size, offset, flag); + +	return 0; +} + + +/** + * unify_readdir_cbk -  + */ +int32_t +unify_readdir_cbk (call_frame_t *frame, +		   void *cookie, +		   xlator_t *this, +		   int32_t op_ret, +		   int32_t op_errno, +		   gf_dirent_t *buf) +{ +	STACK_UNWIND (frame, op_ret, op_errno, buf); + +	return 0; +} + +/** + * unify_readdir - send the FOP request to all the nodes. 
+ */ +int32_t +unify_readdir (call_frame_t *frame, +	       xlator_t *this, +	       fd_t *fd, +	       size_t size, +	       off_t offset) +{ +	UNIFY_CHECK_FD_AND_UNWIND_ON_ERR (fd); + +	STACK_WIND (frame, unify_readdir_cbk, NS(this), +		    NS(this)->fops->readdir, fd, size, offset); + +	return 0; +} + + +/** + * unify_fsyncdir_cbk -  + */ +int32_t +unify_fsyncdir_cbk (call_frame_t *frame, +		    void *cookie, +		    xlator_t *this, +		    int32_t op_ret, +		    int32_t op_errno) +{ +	STACK_UNWIND (frame, op_ret, op_errno); + +	return 0; +} + +/** + * unify_fsyncdir - + */ +int32_t +unify_fsyncdir (call_frame_t *frame, +		xlator_t *this, +		fd_t *fd, +		int32_t flags) +{ +	UNIFY_CHECK_FD_AND_UNWIND_ON_ERR (fd); + +	STACK_WIND (frame, unify_fsyncdir_cbk, +		    NS(this), NS(this)->fops->fsyncdir, fd, flags); + +	return 0; +} + +/** + * unify_lk_cbk - UNWIND frame with the proper return arguments. + */ +int32_t +unify_lk_cbk (call_frame_t *frame, +	      void *cookie, +	      xlator_t *this, +	      int32_t op_ret, +	      int32_t op_errno, +	      struct flock *lock) +{ +	STACK_UNWIND (frame, op_ret, op_errno, lock); +	return 0; +} + +/** + * unify_lk - Send it to all the storage nodes, (should be 1) which has file. 
+ */ +int32_t +unify_lk (call_frame_t *frame, +	  xlator_t *this, +	  fd_t *fd, +	  int32_t cmd, +	  struct flock *lock) +{ +	UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd); +	xlator_t *child = NULL; +	uint64_t tmp_child = 0; + +	fd_ctx_get (fd, this, &tmp_child); +	child = (xlator_t *)(long)tmp_child;		      + +	STACK_WIND (frame, unify_lk_cbk, child, +		    child->fops->lk, fd, cmd, lock); + +	return 0; +} + + +int32_t +unify_setxattr_cbk (call_frame_t *frame, +		    void *cookie, +		    xlator_t *this, +		    int32_t op_ret, +		    int32_t op_errno); + +static int32_t +unify_setxattr_file_cbk (call_frame_t *frame, +			 void *cookie, +			 xlator_t *this, +			 int32_t op_ret, +			 int32_t op_errno) +{ +	unify_private_t *private = this->private; +	unify_local_t *local = frame->local; +	xlator_t *sched_xl = NULL; +	struct sched_ops *sched_ops = NULL; + +	if (op_ret == -1) { +		if (!ENOTSUP) +			gf_log (this->name, GF_LOG_ERROR, +				"setxattr with XATTR_CREATE on ns: " +				"path(%s) key(%s): %s", +				local->loc1.path, local->name,  +				strerror (op_errno)); +		unify_local_wipe (local); +		STACK_UNWIND (frame, op_ret, op_errno); +		return 0; +	}  + +	LOCK (&frame->lock); +	{ +		local->failed = 0; +		local->op_ret = 0; +		local->op_errno = 0; +		local->call_count = 1; +	} +	UNLOCK (&frame->lock); + +	/* schedule XATTR_CREATE on one of the child node */ +	sched_ops = private->sched_ops; +     +	/* Send create request to the scheduled node now */ +	sched_xl = sched_ops->schedule (this, local->name);  +	if (!sched_xl) { +		STACK_UNWIND (frame, -1, ENOTCONN); +		return 0; +	} + +	STACK_WIND (frame, +		    unify_setxattr_cbk, +		    sched_xl, +		    sched_xl->fops->setxattr, +		    &local->loc1, +		    local->dict, +		    local->flags); +	return 0; +} + +/** + * unify_setxattr_cbk - When all the child nodes return, UNWIND frame. 
+ */ +int32_t +unify_setxattr_cbk (call_frame_t *frame, +		    void *cookie, +		    xlator_t *this, +		    int32_t op_ret, +		    int32_t op_errno) +{ +	int32_t callcnt = 0; +	unify_local_t *local = frame->local; +	call_frame_t *prev_frame = cookie; +	dict_t *dict = NULL; + +	LOCK (&frame->lock); +	{ +		callcnt = --local->call_count; +     +		if (op_ret == -1) { +			gf_log (this->name, (((op_errno == ENOENT) ||  +					      (op_errno == ENOTSUP))?  +					     GF_LOG_DEBUG : GF_LOG_ERROR),  +				"child(%s): path(%s): %s",  +				prev_frame->this->name,  +				(local->loc1.path)?local->loc1.path:"",  +				strerror (op_errno)); +			if (local->failed == -1) { +				local->failed = 1; +			} +			local->op_errno = op_errno; +		} else { +			local->failed = 0; +			local->op_ret = op_ret; +		} +	} +	UNLOCK (&frame->lock); +   +	if (!callcnt) { +		if (local->failed && local->name &&  +		    ZR_FILE_CONTENT_REQUEST(local->name)) {       +			dict = get_new_dict (); +			dict_set (dict, local->dict->members_list->key,  +				  data_from_dynptr(NULL, 0)); +			dict_ref (dict); + +			local->call_count = 1; + +			STACK_WIND (frame, +				    unify_setxattr_file_cbk, +				    NS(this), +				    NS(this)->fops->setxattr, +				    &local->loc1, +				    dict, +				    XATTR_CREATE); + +			dict_unref (dict); +			return 0; +		} +     +		unify_local_wipe (local); +		STACK_UNWIND (frame, local->op_ret, local->op_errno); +	} + +	return 0; +} + +/** + * unify_sexattr - This function should be sent to all the storage nodes,  + *       which contains the file, (excluding namespace). 
+ */ +int32_t +unify_setxattr (call_frame_t *frame, +		xlator_t *this, +		loc_t *loc, +		dict_t *dict, +		int32_t flags) +{ +	unify_private_t *priv = this->private; +	unify_local_t *local = NULL; +	int16_t *list = NULL; +	int16_t index = 0; +	int32_t call_count = 0; +  	uint64_t tmp_list = 0; +	data_pair_t *trav = dict->members_list; + +	UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + +	/* Initialization */ +	INIT_LOCAL (frame, local); +	local->failed = -1; +	loc_copy (&local->loc1, loc); + +	if (S_ISDIR (loc->inode->st_mode)) { + +		if (trav && trav->key && ZR_FILE_CONTENT_REQUEST(trav->key)) { +			/* direct the storage xlators to change file  +			   content only if file exists */ +			local->flags = flags; +			local->dict = dict; +			local->name = strdup (trav->key); +			flags |= XATTR_REPLACE; +		} + +		local->call_count = priv->child_count; +		for (index = 0; index < priv->child_count; index++) { +			STACK_WIND (frame, +				    unify_setxattr_cbk, +				    priv->xl_array[index], +				    priv->xl_array[index]->fops->setxattr, +				    loc, dict, flags); +		} +		return 0; +	} + +	inode_ctx_get (loc->inode, this, &tmp_list); +	list = (int16_t *)(long)tmp_list; + +	for (index = 0; list[index] != -1; index++) { +		if (NS(this) != priv->xl_array[list[index]]) { +			local->call_count++; +			call_count++; +		} +	} +   +	if (local->call_count) { +		for (index = 0; list[index] != -1; index++) { +			if (priv->xl_array[list[index]] != NS(this)) { +				STACK_WIND (frame, +					    unify_setxattr_cbk, +					    priv->xl_array[list[index]], +					    priv->xl_array[list[index]]->fops->setxattr, +					    loc, +					    dict, +					    flags); +				if (!--call_count) +					break; +			} +		} +		return 0; +	} + +	/* No entry in storage nodes */ +	gf_log (this->name, GF_LOG_DEBUG,  +		"returning ENOENT, file not found on storage node."); +	STACK_UNWIND (frame, -1, ENOENT); + +	return 0; +} + + +/** + * unify_getxattr_cbk - This function is called from only one child, so, no + *  
   need of any lock or anything else, just send it to above layer
 */
/* NOTE(review): the header above looks stale -- the body below counts
 * replies in local->call_count under frame->lock and unwinds only when the
 * count reaches zero.
 */
int32_t
unify_getxattr_cbk (call_frame_t *frame,
		    void *cookie,
		    xlator_t *this,
		    int32_t op_ret,
		    int32_t op_errno,
		    dict_t *value)
{
	unify_local_t *local = frame->local;
	call_frame_t *prev_frame = cookie;
	dict_t *reply_dict = NULL;
	int32_t pending = 0;

	LOCK (&frame->lock);
	{
		pending = --local->call_count;

		if (op_ret != -1) {
			/* Keep a reference to the first successful reply's
			   dictionary only; later ones are ignored. */
			if (!local->dict)
				local->dict = dict_ref (value);
			local->op_ret = op_ret;
		} else {
			local->op_errno = op_errno;
			/* ENOENT, ENODATA and ENOTSUP go to the debug log. */
			gf_log (this->name,
				(((op_errno == ENOENT) ||
				  (op_errno == ENODATA) ||
				  (op_errno == ENOTSUP)) ?
				 GF_LOG_DEBUG : GF_LOG_ERROR),
				"child(%s): path(%s): %s",
				prev_frame->this->name,
				(local->loc1.path) ? local->loc1.path : "",
				strerror (op_errno));
		}
	}
	UNLOCK (&frame->lock);

	/* Still waiting for other children. */
	if (pending)
		return 0;

	/* Detach the dictionary from 'local' first, so that only the stack
	   copy of the pointer is used once the frame has been unwound. */
	reply_dict = local->dict;
	local->dict = NULL;

	STACK_UNWIND (frame, local->op_ret, local->op_errno,
		      reply_dict);

	if (reply_dict)
		dict_unref (reply_dict);

	return 0;
}


/**
 * unify_getxattr - This FOP is sent to only the storage node.
+ */ +int32_t +unify_getxattr (call_frame_t *frame, +		xlator_t *this, +		loc_t *loc, +		const char *name) +{ +	unify_private_t *priv = this->private; +	int16_t *list = NULL; +	int16_t index = 0; +	int16_t count = 0; +	unify_local_t *local = NULL; +  	uint64_t tmp_list = 0; + +	UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); +	INIT_LOCAL (frame, local); + +	if (S_ISDIR (loc->inode->st_mode)) { +		local->call_count = priv->child_count; +		for (index = 0; index < priv->child_count; index++) +			STACK_WIND (frame, +				    unify_getxattr_cbk, +				    priv->xl_array[index], +				    priv->xl_array[index]->fops->getxattr, +				    loc, +				    name); +		return 0; +	} + +	inode_ctx_get (loc->inode, this, &tmp_list); +	list = (int16_t *)(long)tmp_list; + +	for (index = 0; list[index] != -1; index++) { +		if (NS(this) != priv->xl_array[list[index]]) { +			local->call_count++; +			count++; +		} +	} + +	if (count) { +		for (index = 0; list[index] != -1; index++) { +			if (priv->xl_array[list[index]] != NS(this)) { +				STACK_WIND (frame, +					    unify_getxattr_cbk, +					    priv->xl_array[list[index]], +					    priv->xl_array[list[index]]->fops->getxattr, +					    loc, +					    name); +				if (!--count) +					break; +			} +		} +	} else { +		dict_t *tmp_dict = get_new_dict (); +		gf_log (this->name, GF_LOG_DEBUG,  +			"%s: returning ENODATA, no file found on storage node", +			loc->path); +		STACK_UNWIND (frame, -1, ENODATA, tmp_dict); +		dict_destroy (tmp_dict); +	} + +	return 0; +} + +/** + * unify_removexattr_cbk - Wait till all the child node returns the call + *      and then UNWIND to above layer. 
+ */ +int32_t +unify_removexattr_cbk (call_frame_t *frame, +		       void *cookie, +		       xlator_t *this, +		       int32_t op_ret, +		       int32_t op_errno) +{ +	int32_t callcnt = 0; +	unify_local_t *local = frame->local; +	call_frame_t *prev_frame = cookie; + +	LOCK (&frame->lock); +	{  +		callcnt = --local->call_count; +		if (op_ret == -1) { +			local->op_errno = op_errno; +			if (op_errno != ENOTSUP) +				gf_log (this->name, GF_LOG_ERROR,  +					"child(%s): path(%s): %s",  +					prev_frame->this->name,  +					local->loc1.path, strerror (op_errno)); +		} else { +			local->op_ret = op_ret; +		} +	} +	UNLOCK (&frame->lock);   + +	if (!callcnt) { +		STACK_UNWIND (frame, local->op_ret, local->op_errno); +	} + +	return 0; +} + +/** + * unify_removexattr - Send it to all the child nodes which has the files. + */ +int32_t +unify_removexattr (call_frame_t *frame, +		   xlator_t *this, +		   loc_t *loc, +		   const char *name) +{ +	unify_private_t *priv = this->private; +	unify_local_t *local = NULL; +	int16_t *list = NULL; +	int16_t index = 0; +	int32_t call_count = 0; +  	uint64_t tmp_list = 0; + +	UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc); + +	/* Initialization */ +	INIT_LOCAL (frame, local); + +	if (S_ISDIR (loc->inode->st_mode)) { +		local->call_count = priv->child_count; +		for (index = 0; index < priv->child_count; index++) +			STACK_WIND (frame, 		     +				    unify_removexattr_cbk, +				    priv->xl_array[index], +				    priv->xl_array[index]->fops->removexattr, +				    loc, +				    name); + +		return 0; +	} + +	inode_ctx_get (loc->inode, this, &tmp_list); +	list = (int16_t *)(long)tmp_list; + +	for (index = 0; list[index] != -1; index++) { +		if (NS(this) != priv->xl_array[list[index]]) { +			local->call_count++; +			call_count++; +		} +	} + +	if (local->call_count) { +		for (index = 0; list[index] != -1; index++) { +			if (priv->xl_array[list[index]] != NS(this)) { +				STACK_WIND (frame, +					    unify_removexattr_cbk, +					    
priv->xl_array[list[index]], +					    priv->xl_array[list[index]]->fops->removexattr, +					    loc, +					    name); +				if (!--call_count) +					break; +			} +		} +		return 0; +	}  + +	gf_log (this->name, GF_LOG_DEBUG,  +		"%s: returning ENOENT, not found on storage node.", loc->path); +	STACK_UNWIND (frame, -1, ENOENT); + +	return 0; +} + + +int32_t  +unify_mknod_unlink_cbk (call_frame_t *frame, +			void *cookie, +			xlator_t *this, +			int32_t op_ret, +			int32_t op_errno) +{ +	unify_local_t *local = frame->local; + +	if (op_ret == -1) +		gf_log (this->name, GF_LOG_ERROR,  +			"%s: %s", local->loc1.path, strerror (op_errno)); +   +	unify_local_wipe (local); +	/* No log required here as this -1 is for mknod call */ +	STACK_UNWIND (frame, -1, local->op_errno, NULL, NULL); +	return 0; +} + +/** + * unify_mknod_cbk -  + */ +int32_t +unify_mknod_cbk (call_frame_t *frame, +		 void *cookie, +		 xlator_t *this, +		 int32_t op_ret, +		 int32_t op_errno, +		 inode_t *inode, +		 struct stat *buf) +{ +	unify_local_t *local = frame->local; + +	if (op_ret == -1) { +		gf_log (this->name, GF_LOG_ERROR,  +			"mknod failed on storage node, sending unlink to " +			"namespace"); +		local->op_errno = op_errno; +		STACK_WIND (frame, +			    unify_mknod_unlink_cbk, +			    NS(this), +			    NS(this)->fops->unlink, +			    &local->loc1); +		return 0; +	} +   +	local->stbuf = *buf; +	local->stbuf.st_ino = local->st_ino; +	unify_local_wipe (local); +	STACK_UNWIND (frame, op_ret, op_errno, inode, &local->stbuf); +	return 0; +} + +/** + * unify_ns_mknod_cbk -  + */ +int32_t +unify_ns_mknod_cbk (call_frame_t *frame, +		    void *cookie, +		    xlator_t *this, +		    int32_t op_ret, +		    int32_t op_errno, +		    inode_t *inode, +		    struct stat *buf) +{ +	struct sched_ops *sched_ops = NULL; +	xlator_t *sched_xl = NULL; +	unify_local_t *local = frame->local; +	unify_private_t *priv = this->private; +	int16_t *list = NULL; +	int16_t index = 0; +	call_frame_t *prev_frame = cookie; + +	if 
(op_ret == -1) { +		/* No need to send mknod request to other servers,  +		 * as namespace action failed  +		 */ +		gf_log (this->name, GF_LOG_ERROR,  +			"child(%s): path(%s): %s",  +			prev_frame->this->name, local->loc1.path,  +			strerror (op_errno)); +		unify_local_wipe (local); +		STACK_UNWIND (frame, op_ret, op_errno, inode, buf); +		return 0; +	} +   +	/* Create one inode for this entry */ +	local->op_ret = 0; +	local->stbuf = *buf; +	local->st_ino = buf->st_ino; + +	list = CALLOC (1, sizeof (int16_t) * 3); +	ERR_ABORT (list); +	list[0] = priv->child_count; +	list[2] = -1; +	inode_ctx_put (inode, this, (uint64_t)(long)list); + +	sched_ops = priv->sched_ops; + +	/* Send mknod request to scheduled node now */ +	sched_xl = sched_ops->schedule (this, local->loc1.path);  +	if (!sched_xl) { +		gf_log (this->name, GF_LOG_ERROR,  +			"mknod failed on storage node, no node online " +			"at the moment, sending unlink to NS"); +		local->op_errno = ENOTCONN; +		STACK_WIND (frame, +			    unify_mknod_unlink_cbk, +			    NS(this), +			    NS(this)->fops->unlink, +			    &local->loc1); +       +		return 0; +	} + +	for (index = 0; index < priv->child_count; index++) +		if (sched_xl == priv->xl_array[index]) +			break; +	list[1] = index; +   +	STACK_WIND (frame,  unify_mknod_cbk, +		    sched_xl,  sched_xl->fops->mknod, +		    &local->loc1, local->mode, local->dev); + +	return 0; +} + +/** + * unify_mknod - Create a device on namespace first, and later create on  + *       the storage node. 
+ */ +int32_t +unify_mknod (call_frame_t *frame, +	     xlator_t *this, +	     loc_t *loc, +	     mode_t mode, +	     dev_t rdev) +{ +	unify_local_t *local = NULL; +   +	/* Initialization */ +	INIT_LOCAL (frame, local); +	local->mode = mode; +	local->dev = rdev; +	loc_copy (&local->loc1, loc); +	if (local->loc1.path == NULL) { +		gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O"); +		STACK_UNWIND (frame, -1, ENOMEM, loc->inode, NULL); +		return 0; +	} + +	STACK_WIND (frame, +		    unify_ns_mknod_cbk, +		    NS(this), +		    NS(this)->fops->mknod, +		    loc, +		    mode, +		    rdev); + +	return 0; +} + +int32_t  +unify_symlink_unlink_cbk (call_frame_t *frame, +			  void *cookie, +			  xlator_t *this, +			  int32_t op_ret, +			  int32_t op_errno) +{ +	unify_local_t *local = frame->local; +	if (op_ret == -1) +		gf_log (this->name, GF_LOG_ERROR,  +			"%s: %s", local->loc1.path, strerror (op_errno)); + +	unify_local_wipe (local); +	STACK_UNWIND (frame, -1, local->op_errno, NULL, NULL); +	return 0; +} + +/** + * unify_symlink_cbk -  + */ +int32_t +unify_symlink_cbk (call_frame_t *frame, +		   void *cookie, +		   xlator_t *this, +		   int32_t op_ret, +		   int32_t op_errno, +		   inode_t *inode, +		   struct stat *buf) +{ +	unify_local_t *local = frame->local; + +	if (op_ret == -1) { +		/* Symlink on storage node failed, hence send unlink  +		   to the NS node */ +		local->op_errno = op_errno; +		gf_log (this->name, GF_LOG_ERROR,  +			"symlink on storage node failed, sending unlink " +			"to namespace"); + +		STACK_WIND (frame, +			    unify_symlink_unlink_cbk, +			    NS(this), +			    NS(this)->fops->unlink, +			    &local->loc1); +     +		return 0; +	} +   +	local->stbuf = *buf; +	local->stbuf.st_ino = local->st_ino; +	unify_local_wipe (local); +	STACK_UNWIND (frame, op_ret, op_errno, inode, &local->stbuf); + +	return 0; +} + +/** + * unify_ns_symlink_cbk -  + */ +int32_t +unify_ns_symlink_cbk (call_frame_t *frame, +		      void *cookie, +		      xlator_t 
*this, +		      int32_t op_ret, +		      int32_t op_errno, +		      inode_t *inode, +		      struct stat *buf) +{ + +	struct sched_ops *sched_ops = NULL; +	xlator_t *sched_xl = NULL; +	int16_t *list = NULL; +	unify_local_t *local = frame->local; +	unify_private_t *priv = this->private; +	int16_t index = 0; + +	if (op_ret == -1) { +		/* No need to send symlink request to other servers,  +		 * as namespace action failed  +		 */ +		gf_log (this->name, GF_LOG_ERROR,  +			"namespace: path(%s): %s",  +			local->loc1.path, strerror (op_errno)); +		unify_local_wipe (local); +		STACK_UNWIND (frame, op_ret, op_errno, NULL, buf); +		return 0; +	} +   +	/* Create one inode for this entry */ +	local->op_ret = 0; +	local->st_ino = buf->st_ino; +   +	/* Start the mapping list */ + +	list = CALLOC (1, sizeof (int16_t) * 3); +	ERR_ABORT (list); +	list[0] = priv->child_count; //namespace's index +	list[2] = -1; +	inode_ctx_put (inode, this, (uint64_t)(long)list); + +	sched_ops = priv->sched_ops; + +	/* Send symlink request to all the nodes now */ +	sched_xl = sched_ops->schedule (this, local->loc1.path);  +	if (!sched_xl) { +		/* Symlink on storage node failed, hence send unlink  +		   to the NS node */ +		local->op_errno = ENOTCONN; +		gf_log (this->name, GF_LOG_ERROR,  +			"symlink on storage node failed, no node online, " +			"sending unlink to namespace"); +       +		STACK_WIND (frame, +			    unify_symlink_unlink_cbk, +			    NS(this), +			    NS(this)->fops->unlink, +			    &local->loc1); +       +		return 0; +	} + +	for (index = 0; index < priv->child_count; index++) +		if (sched_xl == priv->xl_array[index]) +			break; +	list[1] = index; +	 +	STACK_WIND (frame, +		    unify_symlink_cbk, +		    sched_xl, +		    sched_xl->fops->symlink, +		    local->name, +		    &local->loc1); + +	return 0; +} + +/** + * unify_symlink -  + */ +int32_t +unify_symlink (call_frame_t *frame, +	       xlator_t *this, +	       const char *linkpath, +	       loc_t *loc) +{ +	unify_local_t *local = 
NULL; +   +	/* Initialization */ +	INIT_LOCAL (frame, local); +	loc_copy (&local->loc1, loc); +	local->name = strdup (linkpath); + +	if ((local->name == NULL) ||  +	    (local->loc1.path == NULL)) { +		gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O"); +		STACK_UNWIND (frame, -1, ENOMEM, loc->inode, NULL); +		return 0; +	} + +	STACK_WIND (frame, +		    unify_ns_symlink_cbk, +		    NS(this), +		    NS(this)->fops->symlink, +		    linkpath, +		    loc); + +	return 0; +} + + +int32_t  +unify_rename_unlink_cbk (call_frame_t *frame, +			 void *cookie, +			 xlator_t *this, +			 int32_t op_ret, +			 int32_t op_errno) +{ +	int32_t callcnt = 0; +	unify_local_t *local = frame->local; +	call_frame_t *prev_frame = cookie; +   +	if (op_ret == -1) { +		gf_log (this->name, GF_LOG_ERROR,  +			"child(%s): path(%s -> %s): %s",  +			prev_frame->this->name,  +			local->loc1.path, local->loc2.path,  +			strerror (op_errno)); +       +	} +	LOCK (&frame->lock); +	{ +		callcnt = --local->call_count; +	} +	UNLOCK (&frame->lock); + +	if (!callcnt) { +		local->stbuf.st_ino = local->st_ino; +		unify_local_wipe (local); +		STACK_UNWIND (frame, local->op_ret, local->op_errno,  +			      &local->stbuf); +	} +	return 0; +} + +int32_t  +unify_ns_rename_undo_cbk (call_frame_t *frame, +			  void *cookie, +			  xlator_t *this, +			  int32_t op_ret, +			  int32_t op_errno, +			  struct stat *buf) +{ +	unify_local_t *local = frame->local; + +	if (op_ret == -1) { +		gf_log (this->name, GF_LOG_ERROR,  +			"namespace: path(%s -> %s): %s",  +			local->loc1.path, local->loc2.path,  +			strerror (op_errno)); +	} + +	local->stbuf.st_ino = local->st_ino; +	unify_local_wipe (local); +	STACK_UNWIND (frame, local->op_ret, local->op_errno, &local->stbuf); +	return 0; +} + +int32_t  +unify_rename_cbk (call_frame_t *frame, +		  void *cookie, +		  xlator_t *this, +		  int32_t op_ret, +		  int32_t op_errno, +		  struct stat *buf) +{ +	int32_t index = 0; +	int32_t callcnt = 0; +	int16_t *list = NULL; +	
unify_private_t *priv = this->private; +	unify_local_t *local = frame->local; +	call_frame_t *prev_frame = cookie; +   +	LOCK (&frame->lock); +	{ +		callcnt = --local->call_count; +		if (op_ret >= 0) { +			if (!S_ISDIR (buf->st_mode)) +				local->stbuf = *buf; +			local->op_ret = op_ret; +		} else { +			gf_log (this->name, GF_LOG_ERROR,  +				"child(%s): path(%s -> %s): %s",  +				prev_frame->this->name,  +				local->loc1.path, local->loc2.path,  +				strerror (op_errno)); +			local->op_errno = op_errno; +		} +	} +	UNLOCK (&frame->lock); + +	if (!callcnt) { +		local->stbuf.st_ino = local->st_ino; +		if (S_ISDIR (local->loc1.inode->st_mode)) { +			unify_local_wipe (local); +			STACK_UNWIND (frame, local->op_ret, local->op_errno, &local->stbuf); +			return 0; +		} + +		if (local->op_ret == -1) { +			/* TODO: check this logic */ + +			/* Rename failed in storage node, successful on NS,  +			 * hence, rename back the entries in NS */ +			/* NOTE: this will be done only if the destination  +			 * doesn't exists, if  the destination exists, the  +			 * job of correcting NS is left to self-heal +			 */ +			if (!local->index) { +				loc_t tmp_oldloc = { +                                        /* its actual 'newloc->path' */ +					.path = local->loc2.path,  +					.inode = local->loc1.inode, +					.parent = local->loc2.parent +				}; +	 +				loc_t tmp_newloc = { +					/* Actual 'oldloc->path' */ +					.path = local->loc1.path, +					.parent = local->loc1.parent +				}; + +				gf_log (this->name, GF_LOG_ERROR,  +					"rename succussful on namespace, on " +					"stroage node failed, reverting back"); + +				STACK_WIND (frame, +					    unify_ns_rename_undo_cbk, +					    NS(this), +					    NS(this)->fops->rename, +					    &tmp_oldloc, +					    &tmp_newloc); +				return 0; +			} +		} else { +			/* Rename successful on storage nodes */ + +			int32_t idx = 0; +			int16_t *tmp_list = NULL; +			uint64_t tmp_list_int64 = 0; +			if (local->loc2.inode) { +				inode_ctx_get 
(local->loc2.inode,  +					       this, &tmp_list_int64); +				list = (int16_t *)(long)tmp_list_int64; + +			} + +			if (list) {				 +				for (index = 0; list[index] != -1; index++); +				tmp_list = CALLOC (1, index * 2); +				memcpy (tmp_list, list, index * 2); + +				for (index = 0; list[index] != -1; index++) { +					/* TODO: Check this logic. */ +					/* If the destination file exists in  +					 * the same storage node where we sent +					 * 'rename' call, no need to send  +					 * unlink  +					 */ +					for (idx = 0;  +					     local->list[idx] != -1; idx++) { +						if (tmp_list[index] == local->list[idx]) { +							tmp_list[index] = priv->child_count; +							continue; +						} +					} +	   +					if (NS(this) != priv->xl_array[tmp_list[index]]) { +						local->call_count++; +						callcnt++; +					} +				} + +				if (local->call_count) { +					if (callcnt > 1) +						gf_log (this->name,  +							GF_LOG_ERROR,  +							"%s->%s: more (%d) " +							"subvolumes have the " +							"newloc entry",  +							local->loc1.path,  +							local->loc2.path,  +							callcnt); + +					for (index=0;  +					     tmp_list[index] != -1; index++) { +						if (NS(this) != priv->xl_array[tmp_list[index]]) {		     +							STACK_WIND (frame, +								    unify_rename_unlink_cbk, +								    priv->xl_array[tmp_list[index]], +								    priv->xl_array[tmp_list[index]]->fops->unlink, +								    &local->loc2); +							if (!--callcnt) +								break; +						} +					} + +					FREE (tmp_list); +					return 0; +				} +				if (tmp_list) +					FREE (tmp_list); +			} +		} +     +		/* Need not send 'unlink' to storage node */ +		unify_local_wipe (local); +		STACK_UNWIND (frame, local->op_ret,  +			      local->op_errno, &local->stbuf); +	} + +	return 0; +} + +int32_t  +unify_ns_rename_cbk (call_frame_t *frame, +		     void *cookie, +		     xlator_t *this, +		     int32_t op_ret, +		     int32_t op_errno, +		     struct stat *buf) +{ +	int32_t index = 0; +	int32_t callcnt = 0; +	int16_t 
*list = NULL; +	unify_private_t *priv = this->private; +	unify_local_t *local = frame->local; + +	if (op_ret == -1) { +		/* Free local->new_inode */ +		gf_log (this->name, GF_LOG_ERROR,  +			"namespace: path(%s -> %s): %s",  +			local->loc1.path, local->loc2.path,  +			strerror (op_errno)); + +		unify_local_wipe (local); +		STACK_UNWIND (frame, op_ret, op_errno, buf); +		return 0; +	} + +	local->stbuf = *buf; +	local->st_ino = buf->st_ino; + +	/* Everything is fine. */ +	if (S_ISDIR (buf->st_mode)) { +		local->call_count = priv->child_count; +		for (index=0; index < priv->child_count; index++) { +			STACK_WIND (frame, +				    unify_rename_cbk, +				    priv->xl_array[index], +				    priv->xl_array[index]->fops->rename, +				    &local->loc1, +				    &local->loc2); +		} + +		return 0; +	} + +	local->call_count = 0;   +	/* send rename */ +	list = local->list; +	for (index=0; list[index] != -1; index++) { +		if (NS(this) != priv->xl_array[list[index]]) { +			local->call_count++; +			callcnt++; +		} +	} + +	if (local->call_count) { +		for (index=0; list[index] != -1; index++) { +			if (NS(this) != priv->xl_array[list[index]]) { +				STACK_WIND (frame, +					    unify_rename_cbk, +					    priv->xl_array[list[index]], +					    priv->xl_array[list[index]]->fops->rename, +					    &local->loc1, +					    &local->loc2); +				if (!--callcnt) +					break; +			} +		} +	} else { +		/* file doesn't seem to be present in storage nodes */ +		gf_log (this->name, GF_LOG_CRITICAL, +			"CRITICAL: source file not in storage node, " +			"rename successful on namespace :O"); +		unify_local_wipe (local); +		STACK_UNWIND (frame, -1, EIO, NULL); +	} +	return 0; +} + + +/** + * unify_rename - One of the tricky function. 
The deadliest of all :O + */ +int32_t +unify_rename (call_frame_t *frame, +	      xlator_t *this, +	      loc_t *oldloc, +	      loc_t *newloc) +{ +	unify_local_t *local = NULL; +  	uint64_t tmp_list = 0; + +	/* Initialization */ +	INIT_LOCAL (frame, local); +	loc_copy (&local->loc1, oldloc); +	loc_copy (&local->loc2, newloc); + +	if ((local->loc1.path == NULL) ||  +	    (local->loc2.path == NULL)) { +		gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL); +		return 0; +	} +   +	inode_ctx_get (oldloc->inode, this, &tmp_list); +	local->list = (int16_t *)(long)tmp_list; + +	STACK_WIND (frame, +		    unify_ns_rename_cbk, +		    NS(this), +		    NS(this)->fops->rename, +		    oldloc, +		    newloc); +	return 0; +} + +/** + * unify_link_cbk - + */ +int32_t +unify_link_cbk (call_frame_t *frame, +		void *cookie, +		xlator_t *this, +		int32_t op_ret, +		int32_t op_errno, +		inode_t *inode, +		struct stat *buf) +{ +	unify_local_t *local = frame->local; + +	if (op_ret >= 0)  +		local->stbuf = *buf; +	local->stbuf.st_ino = local->st_ino; + +	unify_local_wipe (local); +	STACK_UNWIND (frame, op_ret, op_errno, inode, &local->stbuf); + +	return 0; +} + +/** + * unify_ns_link_cbk -  + */ +int32_t +unify_ns_link_cbk (call_frame_t *frame, +		   void *cookie, +		   xlator_t *this, +		   int32_t op_ret, +		   int32_t op_errno, +		   inode_t *inode, +		   struct stat *buf) +{ +	unify_private_t *priv = this->private; +	unify_local_t *local = frame->local; +	int16_t *list = local->list; +	int16_t index = 0; + +	if (op_ret == -1) { +		/* No need to send link request to other servers,  +		 * as namespace action failed  +		 */ +		gf_log (this->name, GF_LOG_ERROR,  +			"namespace: path(%s -> %s): %s",  +			local->loc1.path, local->loc2.path,  +			strerror (op_errno)); +		unify_local_wipe (local); +		STACK_UNWIND (frame, op_ret, op_errno, inode, buf); +		return 0; +	} + +	/* Update inode for this entry */ +	local->op_ret = 0; +	local->st_ino 
= buf->st_ino; + +	/* Send link request to the node now */ +	for (index = 0; list[index] != -1; index++) { +		char need_break = (list[index+1] == -1); +		if (priv->xl_array[list[index]] != NS (this)) { +			STACK_WIND (frame, +				    unify_link_cbk, +				    priv->xl_array[list[index]], +				    priv->xl_array[list[index]]->fops->link, +				    &local->loc1, +				    &local->loc2); +		} +		if (need_break) +			break; +	} + +	return 0; +} + +/** + * unify_link -  + */ +int32_t +unify_link (call_frame_t *frame, +	    xlator_t *this, +	    loc_t *oldloc, +	    loc_t *newloc) +{ +	unify_local_t *local = NULL; +  	uint64_t tmp_list = 0; + +	UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (oldloc); +	UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (newloc); + +	/* Initialization */ +	INIT_LOCAL (frame, local); + +	loc_copy (&local->loc1, oldloc); +	loc_copy (&local->loc2, newloc); + +	inode_ctx_get (oldloc->inode, this, &tmp_list); +	local->list = (int16_t *)(long)tmp_list; + +	STACK_WIND (frame, +		    unify_ns_link_cbk, +		    NS(this), +		    NS(this)->fops->link, +		    oldloc, +		    newloc); + +	return 0; +} + + +/** + * unify_checksum_cbk -  + */ +int32_t +unify_checksum_cbk (call_frame_t *frame, +		    void *cookie, +		    xlator_t *this, +		    int32_t op_ret, +		    int32_t op_errno, +		    uint8_t *fchecksum, +		    uint8_t *dchecksum) +{ +	STACK_UNWIND (frame, op_ret, op_errno, fchecksum, dchecksum); + +	return 0; +} + +/** + * unify_checksum -  + */ +int32_t +unify_checksum (call_frame_t *frame, +		xlator_t *this, +		loc_t *loc, +		int32_t flag) +{ +	STACK_WIND (frame, +		    unify_checksum_cbk, +		    NS(this), +		    NS(this)->fops->checksum, +		    loc, +		    flag); + +	return 0; +} + + +/** + * unify_finodelk_cbk -  + */ +int +unify_finodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +		    int32_t op_ret, int32_t op_errno) +{ +	STACK_UNWIND (frame, op_ret, op_errno); +	return 0; +} + +/** + * unify_finodelk + */ +int +unify_finodelk (call_frame_t *frame, 
xlator_t *this, +		fd_t *fd, int cmd, struct flock *flock) +{ +	UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd); +	xlator_t *child = NULL; +	uint64_t tmp_child = 0; + +	fd_ctx_get (fd, this, &tmp_child); +	child = (xlator_t *)(long)tmp_child;		      + +	STACK_WIND (frame, unify_finodelk_cbk, +		    child, child->fops->finodelk, +		    fd, cmd, flock); + +	return 0; +} + + + +/** + * unify_fentrylk_cbk -  + */ +int +unify_fentrylk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +		    int32_t op_ret, int32_t op_errno) +{ +	STACK_UNWIND (frame, op_ret, op_errno); +	return 0; +} + +/** + * unify_fentrylk + */ +int +unify_fentrylk (call_frame_t *frame, xlator_t *this, +		fd_t *fd, const char *basename, +		entrylk_cmd cmd, entrylk_type type) +		 +{ +	UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd); +	xlator_t *child = NULL; +	uint64_t tmp_child = 0; + +	fd_ctx_get (fd, this, &tmp_child); +	child = (xlator_t *)(long)tmp_child;		      + +	STACK_WIND (frame, unify_fentrylk_cbk, +		    child, child->fops->fentrylk, +		    fd, basename, cmd, type); + +	return 0; +} + + + +/** + * unify_fxattrop_cbk -  + */ +int +unify_fxattrop_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +		    int32_t op_ret, int32_t op_errno, dict_t *xattr) +{ +	STACK_UNWIND (frame, op_ret, op_errno, xattr); +	return 0; +} + +/** + * unify_fxattrop + */ +int +unify_fxattrop (call_frame_t *frame, xlator_t *this, +		fd_t *fd, gf_xattrop_flags_t optype, dict_t *xattr) +{ +	UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd); +	xlator_t *child = NULL; +	uint64_t tmp_child = 0; + +	fd_ctx_get (fd, this, &tmp_child); +	child = (xlator_t *)(long)tmp_child;		      + +	STACK_WIND (frame, unify_fxattrop_cbk, +		    child, child->fops->fxattrop, +		    fd, optype, xattr); + +	return 0; +} + + +/** + * unify_inodelk_cbk -  + */ +int +unify_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +		   int32_t op_ret, int32_t op_errno) +{ +	STACK_UNWIND (frame, op_ret, op_errno); +	return 0; +} + + +/** + * 
unify_inodelk + */ +int +unify_inodelk (call_frame_t *frame, xlator_t *this, +	       loc_t *loc, int cmd, struct flock *flock) +{ +	xlator_t *child = NULL; + +	child = unify_loc_subvol (loc, this); + +	STACK_WIND (frame, unify_inodelk_cbk, +		    child, child->fops->inodelk, +		    loc, cmd, flock); + +	return 0; +} + + + +/** + * unify_entrylk_cbk -  + */ +int +unify_entrylk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +		   int32_t op_ret, int32_t op_errno) +{ +	STACK_UNWIND (frame, op_ret, op_errno); +	return 0; +} + +/** + * unify_entrylk + */ +int +unify_entrylk (call_frame_t *frame, xlator_t *this, +	       loc_t *loc, const char *basename, +	       entrylk_cmd cmd, entrylk_type type) +		 +{ +	xlator_t *child = NULL; + +	child = unify_loc_subvol (loc, this); + +	STACK_WIND (frame, unify_entrylk_cbk, +		    child, child->fops->entrylk, +		    loc, basename, cmd, type); + +	return 0; +} + + + +/** + * unify_xattrop_cbk -  + */ +int +unify_xattrop_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +		   int32_t op_ret, int32_t op_errno, dict_t *xattr) +{ +	STACK_UNWIND (frame, op_ret, op_errno, xattr); +	return 0; +} + +/** + * unify_xattrop + */ +int +unify_xattrop (call_frame_t *frame, xlator_t *this, +		loc_t *loc, gf_xattrop_flags_t optype, dict_t *xattr) +{ +	xlator_t *child = NULL; + +	child = unify_loc_subvol (loc, this); + +	STACK_WIND (frame, unify_xattrop_cbk, +		    child, child->fops->xattrop, +		    loc, optype, xattr); + +	return 0; +} + + +/** + * notify + */ +int32_t +notify (xlator_t *this, +        int32_t event, +        void *data, +        ...) 
+{ +	unify_private_t *priv = this->private; +	struct sched_ops *sched = NULL; + +	if (!priv) { +		return 0; +	} + +	sched = priv->sched_ops;     +	if (!sched) { +		gf_log (this->name, GF_LOG_CRITICAL, "No scheduler :O"); +		raise (SIGTERM); +		return 0; +	} +	if (priv->namespace == data) { +		if (event == GF_EVENT_CHILD_UP) { +			sched->notify (this, event, data); +		} +		return 0; +	} + +	switch (event) +	{ +	case GF_EVENT_CHILD_UP: +	{ +		/* Call scheduler's update () to enable it for scheduling */ +		sched->notify (this, event, data); +	 +		LOCK (&priv->lock); +		{ +			/* Increment the inode's generation, which is  +			   used for self_heal */ +			++priv->inode_generation; +			++priv->num_child_up; +		} +		UNLOCK (&priv->lock); + +		if (!priv->is_up) { +			default_notify (this, event, data); +			priv->is_up = 1; +		} +	} +	break; +	case GF_EVENT_CHILD_DOWN: +	{ +		/* Call scheduler's update () to disable the child node  +		 * for scheduling +		 */ +		sched->notify (this, event, data); +		LOCK (&priv->lock); +		{ +			--priv->num_child_up; +		} +		UNLOCK (&priv->lock); + +		if (priv->num_child_up == 0) { +			/* Send CHILD_DOWN to upper layer */ +			default_notify (this, event, data); +			priv->is_up = 0; +		} +	} +	break; + +	default: +	{ +		default_notify (this, event, data); +	} +	break; +	} + +	return 0; +} + +/**  + * init - This function is called first in the xlator, while initializing. + *   All the config file options are checked and appropriate flags are set. 
+ * + * @this -  + */ +int32_t  +init (xlator_t *this) +{ +	int32_t          ret       = 0; +	int32_t          count     = 0; +	data_t          *scheduler = NULL; +	data_t          *data      = NULL; +	xlator_t        *ns_xl     = NULL; +	xlator_list_t   *trav      = NULL; +	xlator_list_t   *xlparent  = NULL; +	xlator_list_t   *parent    = NULL; +	unify_private_t *_private  = NULL;  + +	/* Check for number of child nodes, if there is no child nodes, exit */ +	if (!this->children) { +		gf_log (this->name, GF_LOG_ERROR, +			"No child nodes specified. check \"subvolumes \" " +			"option in volfile"); +		return -1; +	} + +  	if (!this->parents) { +		gf_log (this->name, GF_LOG_WARNING, +			"dangling volume. check volfile "); +	} +   +	/* Check for 'scheduler' in volume */ +	scheduler = dict_get (this->options, "scheduler"); +	if (!scheduler) { +		gf_log (this->name, GF_LOG_ERROR,  +			"\"option scheduler <x>\" is missing in volfile"); +		return -1; +	} + +	/* Setting "option namespace <node>" */ +	data = dict_get (this->options, "namespace"); +	if(!data) { +		gf_log (this->name, GF_LOG_CRITICAL,  +			"namespace option not specified, Exiting"); +		return -1; +	} +	/* Search namespace in the child node, if found, exit */ +	trav = this->children; +	while (trav) { +		if (strcmp (trav->xlator->name, data->data) == 0) +			break; +		trav = trav->next; +	} +	if (trav) { +		gf_log (this->name, GF_LOG_CRITICAL,  +			"namespace node used as a subvolume, Exiting"); +		return -1; +	} +	 +	/* Search for the namespace node, if found, continue */ +	ns_xl = this->next; +	while (ns_xl) { +		if (strcmp (ns_xl->name, data->data) == 0) +			break; +		ns_xl = ns_xl->next; +	} +	if (!ns_xl) { +		gf_log (this->name, GF_LOG_CRITICAL,  +			"namespace node not found in volfile, Exiting"); +		return -1; +	} +	 +	gf_log (this->name, GF_LOG_DEBUG,  +		"namespace node specified as %s", data->data); +	 +	_private = CALLOC (1, sizeof (*_private)); +	ERR_ABORT (_private); +	_private->sched_ops = 
get_scheduler (this, scheduler->data); +	if (!_private->sched_ops) { +		gf_log (this->name, GF_LOG_CRITICAL,  +			"Error while loading scheduler. Exiting"); +		FREE (_private); +		return -1; +	} +	 +	if (ns_xl->parents) { +		gf_log (this->name, GF_LOG_CRITICAL, +			"Namespace node should not be a child of any other node. Exiting"); +		FREE (_private); +		return -1; +	} + +	_private->namespace = ns_xl; +	 +	/* update _private structure */ +	{ +		count = 0; +		trav = this->children; +		/* Get the number of child count */ +		while (trav) { +			count++; +			trav = trav->next; +		} +		 +		gf_log (this->name, GF_LOG_DEBUG,  +			"Child node count is %d", count);     + +		_private->child_count = count; +		if (count == 1) { +			/* TODO: Should I error out here? */ +			gf_log (this->name, GF_LOG_CRITICAL,  +				"WARNING: You have defined only one " +				"\"subvolumes\" for unify volume. It may not " +				"be the desired config, review your volume " +				"volfile. If this is how you are testing it," +				" you may hit some performance penalty"); +		} +		 +		_private->xl_array = CALLOC (1,  +					     sizeof (xlator_t) * (count + 1)); +		ERR_ABORT (_private->xl_array); +		 +		count = 0; +		trav = this->children; +		while (trav) { +			_private->xl_array[count++] = trav->xlator; +			trav = trav->next; +		} +		_private->xl_array[count] = _private->namespace; +		 +		/* self-heal part, start with generation '1' */ +		_private->inode_generation = 1;  +                /* Because, Foreground part is tested well */ +		_private->self_heal = ZR_UNIFY_FG_SELF_HEAL;  +		data = dict_get (this->options, "self-heal"); +		if (data) { +			if (strcasecmp (data->data, "off") == 0)  +				_private->self_heal = ZR_UNIFY_SELF_HEAL_OFF; + +			if (strcasecmp (data->data, "foreground") == 0) +				_private->self_heal = ZR_UNIFY_FG_SELF_HEAL; + +			if (strcasecmp (data->data, "background") == 0) +				_private->self_heal = ZR_UNIFY_BG_SELF_HEAL; +		} +     +		/* optimist - ask bulde for more about it */ +		
data = dict_get (this->options, "optimist"); +		if (data) { +			if (gf_string2boolean (data->data,  +					       &_private->optimist) == -1) { +				gf_log (this->name, GF_LOG_ERROR,  +					"optimist excepts only boolean " +					"options"); +			} +		} + +		LOCK_INIT (&_private->lock); +	} + +	/* Now that everything is fine. */ +	this->private = (void *)_private; +	{ +		/* Initialize scheduler, if everything else is successful */ +		ret = _private->sched_ops->init (this);  +		if (ret == -1) { +			gf_log (this->name, GF_LOG_CRITICAL, +				"Initializing scheduler failed, Exiting"); +			FREE (_private); +			return -1; +		} + +		ret = 0; + +		/* This section is required because some fops may look  +		 * for 'xl->parent' variable  +		 */ +		xlparent = CALLOC (1, sizeof (*xlparent)); +		xlparent->xlator = this; +		if (!ns_xl->parents) { +			ns_xl->parents = xlparent; +		} else { +			parent = ns_xl->parents; +			while (parent->next) +				parent = parent->next; +			parent->next = xlparent; +		} +		/* Initialize the namespace volume */ +		if (!ns_xl->ready) { +			ret = xlator_tree_init (ns_xl); +			if (ret) { +				gf_log (this->name, GF_LOG_ERROR,  +					"initializing namespace node failed, " +					"Exiting"); +			FREE (_private); +			return -1; +			} +		} +	} + +	/* Tell namespace node that init is done */ +	ns_xl->notify (ns_xl, GF_EVENT_PARENT_UP, this); + +	return 0; +} + +/**  + * fini  - Free all the allocated memory  + */ +void +fini (xlator_t *this) +{ +	unify_private_t *priv = this->private; +	priv->sched_ops->fini (this); +	this->private = NULL; +	LOCK_DESTROY (&priv->lock); +	FREE (priv->xl_array); +	FREE (priv); +	return; +} + + +struct xlator_fops fops = { +	.stat        = unify_stat, +	.chmod       = unify_chmod, +	.readlink    = unify_readlink, +	.mknod       = unify_mknod, +	.mkdir       = unify_mkdir, +	.unlink      = unify_unlink, +	.rmdir       = unify_rmdir, +	.symlink     = unify_symlink, +	.rename      = unify_rename, +	.link        = unify_link, +	
.chown       = unify_chown, +	.truncate    = unify_truncate, +	.create      = unify_create, +	.open        = unify_open, +	.readv       = unify_readv, +	.writev      = unify_writev, +	.statfs      = unify_statfs, +	.flush       = unify_flush, +	.fsync       = unify_fsync, +	.setxattr    = unify_setxattr, +	.getxattr    = unify_getxattr, +	.removexattr = unify_removexattr, +	.opendir     = unify_opendir, +	.readdir     = unify_readdir, +	.fsyncdir    = unify_fsyncdir, +	.access      = unify_access, +	.ftruncate   = unify_ftruncate, +	.fstat       = unify_fstat, +	.lk          = unify_lk, +	.fchown      = unify_fchown, +	.fchmod      = unify_fchmod, +	.utimens     = unify_utimens, +	.lookup      = unify_lookup, +	.getdents    = unify_getdents, +	.checksum    = unify_checksum, +	.inodelk     = unify_inodelk, +	.finodelk    = unify_finodelk, +	.entrylk     = unify_entrylk, +	.fentrylk    = unify_fentrylk, +	.xattrop     = unify_xattrop, +	.fxattrop    = unify_fxattrop +}; + +struct xlator_mops mops = { +}; + +struct xlator_cbks cbks = { +}; + +struct volume_options options[] = { +	{ .key   = { "namespace" },   +	  .type  = GF_OPTION_TYPE_XLATOR  +	}, +	{ .key   = { "scheduler" },   +	  .value = { "alu", "rr", "random", "nufa", "switch" }, +	  .type  = GF_OPTION_TYPE_STR +	}, +	{ .key   = {"self-heal"},   +	  .value = { "foreground", "background", "off" }, +	  .type  = GF_OPTION_TYPE_STR +	}, +	/* TODO: remove it some time later */ +	{ .key   = {"optimist"},   +	  .type  = GF_OPTION_TYPE_BOOL  +	}, + +	{ .key   = {NULL} }, +}; diff --git a/xlators/cluster/unify/src/unify.h b/xlators/cluster/unify/src/unify.h new file mode 100644 index 00000000000..bc18dc53f52 --- /dev/null +++ b/xlators/cluster/unify/src/unify.h @@ -0,0 +1,132 @@ +/* +  Copyright (c) 2006, 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com> +  This file is part of GlusterFS. 
+ +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#ifndef _UNIFY_H +#define _UNIFY_H + +#include "scheduler.h" +#include "list.h" + +#define MAX_DIR_ENTRY_STRING     (32 * 1024) + +#define ZR_UNIFY_SELF_HEAL_OFF 0 +#define ZR_UNIFY_FG_SELF_HEAL  1 +#define ZR_UNIFY_BG_SELF_HEAL  2 + +/* Sometimes one should use completely random numbers.. 
its good :p */ +#define UNIFY_SELF_HEAL_GETDENTS_COUNT 1024 + +#define NS(xl)          (((unify_private_t *)xl->private)->namespace) + +/* This is used to allocate memory for local structure */ +#define INIT_LOCAL(fr, loc)                   \ +do {                                          \ +  loc = CALLOC (1, sizeof (unify_local_t));   \ +  ERR_ABORT (loc);			      \ +  if (!loc) {                                 \ +    STACK_UNWIND (fr, -1, ENOMEM);            \ +    return 0;                                 \ +  }                                           \ +  fr->local = loc;                            \ +  loc->op_ret = -1;                           \ +  loc->op_errno = ENOENT;                     \ +} while (0) + + + +struct unify_private { +	/* Update this structure depending on requirement */ +	void *scheduler;               /* THIS SHOULD BE THE FIRST VARIABLE,  +					  if xlator is using scheduler */ +	struct sched_ops *sched_ops;   /* Scheduler options  */ +	xlator_t *namespace;           /* ptr to namespace xlator */ +	xlator_t **xl_array; +	gf_boolean_t optimist; +	int16_t child_count; +	int16_t num_child_up; +	uint8_t self_heal; +	uint8_t is_up; +	uint64_t inode_generation; +	gf_lock_t lock; +}; +typedef struct unify_private unify_private_t; + +struct unify_self_heal_struct { +	uint8_t dir_checksum[ZR_FILENAME_MAX]; +	uint8_t ns_dir_checksum[ZR_FILENAME_MAX]; +	uint8_t file_checksum[ZR_FILENAME_MAX]; +	uint8_t ns_file_checksum[ZR_FILENAME_MAX]; +	off_t *offset_list; +	int   *count_list; +	dir_entry_t **entry_list; +}; + + +struct _unify_local_t { +	int32_t call_count; +	int32_t op_ret; +	int32_t op_errno; +	mode_t mode; +	off_t offset; +	dev_t dev; +	uid_t uid; +	gid_t gid; +	int32_t flags; +	int32_t entry_count; +	int32_t count;    // dir_entry_t count; +	fd_t *fd; +	struct stat stbuf; +	struct statvfs statvfs_buf; +	struct timespec tv[2]; +	char *name; +	int32_t revalidate; + +	ino_t st_ino; +	nlink_t st_nlink; +   +	dict_t *dict; + +	int16_t *list; 
+	int16_t *new_list; /* Used only in case of rename */ +	int16_t index; + +	int32_t failed; +	int32_t return_eio;  /* Used in case of different st-mode  +				present for a given path */ + +	uint64_t inode_generation; /* used to store the per directory  +				    * inode_generation. Got from inode's ctx  +				    * of directory inodes +				    */ + +	struct unify_self_heal_struct *sh_struct; +	loc_t loc1, loc2; +}; +typedef struct _unify_local_t unify_local_t; + +int32_t zr_unify_self_heal (call_frame_t *frame, +			    xlator_t *this, +			    unify_local_t *local); + +#endif /* _UNIFY_H */  | 
