diff options
Diffstat (limited to 'xlators/performance')
26 files changed, 8190 insertions, 0 deletions
diff --git a/xlators/performance/Makefile.am b/xlators/performance/Makefile.am new file mode 100644 index 00000000000..f7504bbe8f3 --- /dev/null +++ b/xlators/performance/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = write-behind read-ahead io-threads io-cache symlink-cache + +CLEANFILES =  diff --git a/xlators/performance/io-cache/Makefile.am b/xlators/performance/io-cache/Makefile.am new file mode 100644 index 00000000000..d471a3f9243 --- /dev/null +++ b/xlators/performance/io-cache/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = src + +CLEANFILES =  diff --git a/xlators/performance/io-cache/src/Makefile.am b/xlators/performance/io-cache/src/Makefile.am new file mode 100644 index 00000000000..b1bf5bfbf71 --- /dev/null +++ b/xlators/performance/io-cache/src/Makefile.am @@ -0,0 +1,14 @@ +xlator_LTLIBRARIES = io-cache.la +xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/performance + +io_cache_la_LDFLAGS = -module -avoidversion  + +io_cache_la_SOURCES = io-cache.c page.c ioc-inode.c +io_cache_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +noinst_HEADERS = io-cache.h + +AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS) \ +	-I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) + +CLEANFILES =  diff --git a/xlators/performance/io-cache/src/io-cache.c b/xlators/performance/io-cache/src/io-cache.c new file mode 100644 index 00000000000..f367cdb88de --- /dev/null +++ b/xlators/performance/io-cache/src/io-cache.c @@ -0,0 +1,1478 @@
/*
  Copyright (c) 2007, 2008, 2009 Z RESEARCH, Inc. <http://www.zresearch.com>
  This file is part of GlusterFS.

  GlusterFS is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published
  by the Free Software Foundation; either version 3 of the License,
  or (at your option) any later version.

  GlusterFS is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program.  If not, see
  <http://www.gnu.org/licenses/>.
*/

#ifndef _CONFIG_H
#define _CONFIG_H
#include "config.h"
#endif

#include "glusterfs.h"
#include "logging.h"
#include "dict.h"
#include "xlator.h"
#include "io-cache.h"
#include <assert.h>
#include <sys/time.h>

/* Defined further down; needed earlier by the lookup/open/create callbacks. */
static uint32_t
ioc_get_priority (ioc_table_t *table, const char *path);

/*
 * ioc_inode_reupdate - append the inode to the tail of the table's LRU list
 *                      that corresponds to the inode's weight
 *
 * @ioc_inode: inode to (re)insert
 *
 * Returns @ioc_inode unchanged. No lock is taken here; the only caller in
 * this file (ioc_get_inode) holds the table lock around the call.
 */
static inline ioc_inode_t *
ioc_inode_reupdate (ioc_inode_t *ioc_inode)
{
        ioc_table_t *table = ioc_inode->table;

        list_add_tail (&ioc_inode->inode_lru,
                       &table->inode_lru[ioc_inode->weight]);

        return ioc_inode;
}

/*
 * ioc_get_inode - fetch the ioc_inode stored in @dict under key @name
 *
 * @dict: dictionary holding the pointer
 * @name: key to look up
 *
 * If the inode is found and is currently on no LRU list it is put back on
 * the list for its weight, under the table lock.
 *
 * Returns the ioc_inode, or NULL when @dict has no entry for @name.
 */
static inline ioc_inode_t *
ioc_get_inode (dict_t *dict, char *name)
{
        ioc_inode_t *ioc_inode      = NULL;
        ioc_table_t *table          = NULL;
        data_t      *ioc_inode_data = dict_get (dict, name);

        if (!ioc_inode_data)
                return NULL;

        ioc_inode = data_to_ptr (ioc_inode_data);
        table = ioc_inode->table;

        ioc_table_lock (table);
        {
                if (list_empty (&ioc_inode->inode_lru))
                        ioc_inode = ioc_inode_reupdate (ioc_inode);
        }
        ioc_table_unlock (table);

        return ioc_inode;
}

/*
 * ioc_inode_need_revalidate - has the inode's cache outlived cache_timeout?
 *
 * @ioc_inode: inode to check
 *
 * Compares the current time against ioc_inode->tv (the time-stamp of the
 * last revalidation) using time_elapsed().
 *
 * Returns 1 when at least table->cache_timeout has elapsed, 0 otherwise.
 * If the current time cannot be read the cache is reported as stale, since
 * revalidating is always the safe answer.
 *
 * NOTE(review): ioc_inode->tv is read without the inode lock although the
 * writers in this file take it - confirm whether callers hold the lock.
 */
int32_t
ioc_inode_need_revalidate (ioc_inode_t *ioc_inode)
{
        struct timeval  now   = {0,};
        ioc_table_t    *table = ioc_inode->table;

        if (gettimeofday (&now, NULL) != 0)
                return 1;

        if (time_elapsed (&now, &ioc_inode->tv) >= table->cache_timeout)
                return 1;

        return 0;
}

/*
 * __ioc_inode_flush - flush all the cached pages of the given inode
 *
 * @ioc_inode: inode whose page list is walked and destroyed
 *
 * Returns the sum of the sizes reported by ioc_page_destroy().
 *
 * assumes lock is held
 */
+int32_t +__ioc_inode_flush (ioc_inode_t *ioc_inode) +{ +	ioc_page_t *curr = NULL, *next = NULL; +	int32_t destroy_size = 0; +	int32_t ret = 0; + +	list_for_each_entry_safe (curr, next, &ioc_inode->pages, pages) { +		ret = ioc_page_destroy (curr); +     +		if (ret != -1)  +			destroy_size += ret; +	} +   +	return destroy_size; +} + +void +ioc_inode_flush (ioc_inode_t *ioc_inode) +{ +	int32_t destroy_size = 0;     + +	ioc_inode_lock (ioc_inode); +	{ +		destroy_size = __ioc_inode_flush (ioc_inode); +	} +	ioc_inode_unlock (ioc_inode); +   +	if (destroy_size) { +		ioc_table_lock (ioc_inode->table); +		{ +			ioc_inode->table->cache_used -= destroy_size; +		} +		ioc_table_unlock (ioc_inode->table); +	} + +	return; +} + +/*  + * ioc_utimens_cbk - + *  + * @frame: + * @cookie: + * @this: + * @op_ret: + * @op_errno: + * @stbuf: + * + */ +int32_t +ioc_utimens_cbk (call_frame_t *frame, +		 void *cookie, +		 xlator_t *this, +		 int32_t op_ret, +		 int32_t op_errno, +		 struct stat *stbuf) +{ +	STACK_UNWIND (frame, op_ret, op_errno, stbuf); +	return 0; +} + +/*  + * ioc_utimens - + *  + * @frame: + * @this: + * @loc: + * @tv: + * + */ +int32_t +ioc_utimens (call_frame_t *frame, +	     xlator_t *this, +	     loc_t *loc, +	     struct timespec *tv) +{ +	uint64_t ioc_inode = 0; +	inode_ctx_get (loc->inode, this, &ioc_inode); + +	if (ioc_inode) +		ioc_inode_flush ((ioc_inode_t *)(long)ioc_inode); + +	STACK_WIND (frame, ioc_utimens_cbk, +		    FIRST_CHILD (this), +		    FIRST_CHILD (this)->fops->utimens, +		    loc, tv); +	return 0; +} + +int32_t +ioc_lookup_cbk (call_frame_t *frame, +		void *cookie, +		xlator_t *this, +		int32_t op_ret, +		int32_t op_errno, +		inode_t *inode, +		struct stat *stbuf, +		dict_t *dict) +{ +	ioc_inode_t *ioc_inode = NULL; +	ioc_local_t *local = frame->local; +	ioc_table_t *table = this->private; +	ioc_page_t  *page = NULL; +	data_t      *page_data = NULL; +	data_t      *content_data = NULL; +	char        *src = NULL; +	char        *dst = NULL; +	char    
     need_unref = 0; +	uint8_t      cache_still_valid = 0; +	uint32_t     weight = 0; +	uint64_t     tmp_ioc_inode = 0; +	char        *buf = NULL; +	char        *tmp = NULL; +	int          i; +	 +	if (op_ret != 0)  +		goto out; + +	inode_ctx_get (inode, this, &tmp_ioc_inode); +	ioc_inode = (ioc_inode_t *)(long)tmp_ioc_inode; +	if (ioc_inode) { +		cache_still_valid = ioc_cache_still_valid (ioc_inode,  +							   stbuf); +		 +		if (!cache_still_valid) { +			ioc_inode_flush (ioc_inode); +		}  +		/* update the time-stamp of revalidation */ +		ioc_inode_lock (ioc_inode); +		{ +			gettimeofday (&ioc_inode->tv, NULL); +		} +		ioc_inode_unlock (ioc_inode); +		 +		ioc_table_lock (ioc_inode->table); +		{ +			list_move_tail (&ioc_inode->inode_lru, +					&table->inode_lru[ioc_inode->weight]); +		} +		ioc_table_unlock (ioc_inode->table); +	} +	 +	if (local && stbuf->st_size &&  +	    local->need_xattr >= stbuf->st_size) { +		if (!ioc_inode) { +			weight = ioc_get_priority (table,  +						   local->file_loc.path); +			ioc_inode = ioc_inode_update (table,  +						      inode, weight); +			inode_ctx_put (inode, this,  +				       (uint64_t)(long)ioc_inode); +		} +		 +		ioc_inode_lock (ioc_inode); +		{ +			content_data = dict_get (dict, "glusterfs.content"); +			page = ioc_page_get (ioc_inode, 0); +			 +			if (content_data) { +				if (page) { +					dict_unref (page->ref); +					free (page->vector); +					page->vector = NULL; +					 +					ioc_table_lock (table); +					{ +						table->cache_used -=  +							page->size; +					} +					ioc_table_unlock (table); +				} else { +					page = ioc_page_create (ioc_inode, 0); +				} +				 +				dst = CALLOC (1, stbuf->st_size); +				page->ref = dict_ref (get_new_dict ()); +				page_data = data_from_dynptr (dst,  +							      stbuf->st_size); +				dict_set (page->ref, NULL, page_data); +				 +				src = data_to_ptr (content_data); +				memcpy (dst, src, stbuf->st_size); + +				page->vector = CALLOC (1,  +						       sizeof (*page->vector)); +			
	page->vector->iov_base = dst; +				page->vector->iov_len = stbuf->st_size; +				page->count = 1; +       +				page->waitq = NULL; +				page->size = stbuf->st_size; +				page->ready = 1; + +				ioc_table_lock (table); +				{ +					table->cache_used += page->size; +				} +				ioc_table_unlock (table); +				 +			} else { +				if (!(page && page->ready)) { +					gf_log (this->name, GF_LOG_DEBUG, +						"page not present"); +					 +					ioc_inode_unlock (ioc_inode); +					STACK_WIND (frame, +						    ioc_lookup_cbk, +						    FIRST_CHILD (this), +						    FIRST_CHILD (this)->fops->lookup, +						    &local->file_loc, +						    local->xattr_req); +					return 0; +				}  +				buf = CALLOC (1, stbuf->st_size); +				tmp = buf; + +				for (i = 0; i < page->count; i++) { +					memcpy (tmp, page->vector[i].iov_base,  +						page->vector[i].iov_len); +					tmp += page->vector[i].iov_len; +				} +				 +				gf_log (this->name, GF_LOG_DEBUG, +					"serving file %s from cache",  +					local->file_loc.path); +				 +				if (!dict) { +					need_unref = 1; +					dict = dict_ref ( +						get_new_dict ()); +				} +				dict_set (dict, "glusterfs.content", +					  data_from_dynptr (buf,  +							    stbuf->st_size)); +			} + +			ioc_inode->mtime = stbuf->st_mtime; +			gettimeofday (&ioc_inode->tv, NULL); +		} +		ioc_inode_unlock (ioc_inode); +		 +		if (content_data &&  +		    ioc_need_prune (ioc_inode->table)) { +			ioc_prune (ioc_inode->table); +		} +	} + + out: +	STACK_UNWIND (frame, op_ret, op_errno, inode, stbuf, dict); + +	if (need_unref) { +		dict_unref (dict); +	} + +	return 0; +} + +int32_t  +ioc_lookup (call_frame_t *frame, +	    xlator_t *this, +	    loc_t *loc, +	    dict_t *xattr_req) +{ +	uint64_t content_limit = 0; + +	if (GF_FILE_CONTENT_REQUESTED(xattr_req, &content_limit)) { +		uint64_t     tmp_ioc_inode = 0; +		ioc_inode_t *ioc_inode = NULL; +		ioc_page_t  *page = NULL; +		ioc_local_t *local = CALLOC (1, sizeof (*local)); + +		local->need_xattr = content_limit; +		
local->file_loc.path = loc->path; +		local->file_loc.inode = loc->inode; +		frame->local = local; + +		inode_ctx_get (loc->inode, this, &tmp_ioc_inode); +		ioc_inode = (ioc_inode_t *)(long)tmp_ioc_inode; + +		if (ioc_inode) { +			ioc_inode_lock (ioc_inode); +			{ +				page = ioc_page_get (ioc_inode, 0); +				if ((content_limit <=  +				     ioc_inode->table->page_size) &&  +				    page && page->ready) { +					local->need_xattr = -1; +				} +			} +			ioc_inode_unlock (ioc_inode); +		} +	} + +	STACK_WIND (frame, +		    ioc_lookup_cbk, +		    FIRST_CHILD (this), +		    FIRST_CHILD (this)->fops->lookup, +		    loc, +		    xattr_req); +	return 0; +} + +/* + * ioc_forget -  + * + * @frame: + * @this: + * @inode: + * + */ +int32_t +ioc_forget (xlator_t *this, +	    inode_t *inode) +{ +	uint64_t ioc_inode = 0; + +	inode_ctx_get (inode, this, &ioc_inode); + +	if (ioc_inode) +		ioc_inode_destroy ((ioc_inode_t *)(long)ioc_inode); +	     +	return 0; +} + + +/*  + * ioc_cache_validate_cbk -  + * + * @frame: + * @cookie: + * @this: + * @op_ret: + * @op_errno: + * @buf + * + */ +int32_t +ioc_cache_validate_cbk (call_frame_t *frame, +			void *cookie, +			xlator_t *this, +			int32_t op_ret, +			int32_t op_errno, +			struct stat *stbuf) +{ +	ioc_local_t *local = frame->local; +	ioc_inode_t *ioc_inode = NULL; +	size_t destroy_size = 0; +	struct stat *local_stbuf = stbuf; + +	ioc_inode = local->inode; + +	if ((op_ret == -1) ||  +	    ((op_ret >= 0) && !ioc_cache_still_valid(ioc_inode, stbuf))) { +		gf_log (ioc_inode->table->xl->name, GF_LOG_DEBUG, +			"cache for inode(%p) is invalid. flushing all pages", +			ioc_inode); +		/* NOTE: only pages with no waiting frames are flushed by  +		 * ioc_inode_flush. 
page_fault will be generated for all  +		 * the pages which have waiting frames by ioc_inode_wakeup() +		 */ +		ioc_inode_lock (ioc_inode); +		{ +			destroy_size = __ioc_inode_flush (ioc_inode); +			if (op_ret >= 0) +				ioc_inode->mtime = stbuf->st_mtime; +		} +		ioc_inode_unlock (ioc_inode); +		local_stbuf = NULL; +	} + +	if (destroy_size) { +		ioc_table_lock (ioc_inode->table); +		{ +			ioc_inode->table->cache_used -= destroy_size; +		} +		ioc_table_unlock (ioc_inode->table); +	} + +	if (op_ret < 0) +		local_stbuf = NULL; +   +	ioc_inode_lock (ioc_inode); +	{ +		gettimeofday (&ioc_inode->tv, NULL); +	} +	ioc_inode_unlock (ioc_inode); + +	ioc_inode_wakeup (frame, ioc_inode, local_stbuf); +   +	/* any page-fault initiated by ioc_inode_wakeup() will have its own  +	 * fd_ref on fd, safe to unref validate frame's private copy  +	 */ +	fd_unref (local->fd); + +	STACK_DESTROY (frame->root); + +	return 0; +} + +static int32_t +ioc_wait_on_inode (ioc_inode_t *ioc_inode,  +		   ioc_page_t *page) +{ +	ioc_waitq_t *waiter = NULL, *trav = NULL; +	uint32_t page_found = 0; + +	trav = ioc_inode->waitq; + +	while (trav) { +		if (trav->data == page) { +			page_found = 1; +			break; +		} +		trav = trav->next; +	} +   +	if (!page_found) { +		waiter = CALLOC (1, sizeof (ioc_waitq_t)); +		ERR_ABORT (waiter); +		waiter->data = page; +		waiter->next = ioc_inode->waitq; +		ioc_inode->waitq = waiter; +	} +   +	return 0; +} + +/* + * ioc_cache_validate - + * + * @frame: + * @ioc_inode: + * @fd: + * + */ +static int32_t +ioc_cache_validate (call_frame_t *frame, +		    ioc_inode_t *ioc_inode, +		    fd_t *fd, +		    ioc_page_t *page) +{ +	call_frame_t *validate_frame = NULL; +	ioc_local_t *validate_local = NULL; + +	validate_local = CALLOC (1, sizeof (ioc_local_t)); +	ERR_ABORT (validate_local); +	validate_frame = copy_frame (frame); +	validate_local->fd = fd_ref (fd); +	validate_local->inode = ioc_inode; +	validate_frame->local = validate_local; +     +	STACK_WIND (validate_frame, +		    
ioc_cache_validate_cbk, +		    FIRST_CHILD (frame->this), +		    FIRST_CHILD (frame->this)->fops->fstat, +		    fd); + +	return 0; +} + +static inline uint32_t +is_match (const char *path, +	  const char *pattern) +{ +	char *pathname = strdup (path); +	int32_t ret = 0; + +	ret = fnmatch (pattern, path, FNM_NOESCAPE); +   +	free (pathname); +   +	return (ret == 0); +} + +static uint32_t +ioc_get_priority (ioc_table_t *table,  +		  const char *path) +{ +	uint32_t priority = 0; +	struct ioc_priority *curr = NULL; +   +	list_for_each_entry (curr, &table->priority_list, list) { +		if (is_match (path, curr->pattern))  +			priority = curr->priority; +	} + +	return priority; +} + +/*  + * ioc_open_cbk - open callback for io cache + * + * @frame: call frame + * @cookie: + * @this: + * @op_ret: + * @op_errno: + * @fd: + * + */ +int32_t +ioc_open_cbk (call_frame_t *frame, +	      void *cookie, +	      xlator_t *this, +	      int32_t op_ret, +	      int32_t op_errno, +	      fd_t *fd) +{ +	uint64_t     tmp_ioc_inode = 0; +	ioc_local_t *local = frame->local; +	ioc_table_t *table = this->private; +	ioc_inode_t *ioc_inode = NULL; +	inode_t *inode = local->file_loc.inode; +	uint32_t weight = 0; +	const char *path = local->file_loc.path; + +	if (op_ret != -1) { +		/* look for ioc_inode corresponding to this fd */ +		LOCK (&fd->inode->lock); +		//{ + +		inode_ctx_get (fd->inode, this, &tmp_ioc_inode); +		ioc_inode = (ioc_inode_t *)(long)tmp_ioc_inode; +       +		if (!ioc_inode) { +			/* this is the first time someone is opening this  +			   file, assign weight  +			*/ +			weight = ioc_get_priority (table, path); +  +			ioc_inode = ioc_inode_update (table, inode, weight); +			inode_ctx_put (fd->inode, this,  +				       (uint64_t)(long)ioc_inode); +		} else { +			ioc_table_lock (ioc_inode->table); +			//{ +			list_move_tail (&ioc_inode->inode_lru, +					&table->inode_lru[ioc_inode->weight]); +			//} +			ioc_table_unlock (ioc_inode->table); +		} + +		//} +		UNLOCK (&fd->inode->lock); 
+ +		/* If mandatory locking has been enabled on this file, +		   we disable caching on it */ +		if (((inode->st_mode & S_ISGID) &&  +		     !(inode->st_mode & S_IXGRP))) { +			fd_ctx_set (fd, this, 1); +		} +   +		/* If O_DIRECT open, we disable caching on it */ +		if ((local->flags & O_DIRECT)){ +			/* O_DIRECT is only for one fd, not the inode  +			 * as a whole  +			 */ +			fd_ctx_set (fd, this, 1); +		} +	} + +	FREE (local); +	frame->local = NULL; + +	STACK_UNWIND (frame, op_ret, op_errno, fd); + +	return 0; +} + +/* + * ioc_create_cbk - create callback for io cache + * + * @frame: call frame + * @cookie: + * @this: + * @op_ret: + * @op_errno: + * @fd: + * @inode: + * @buf: + * + */ +int32_t +ioc_create_cbk (call_frame_t *frame, +		void *cookie, +		xlator_t *this, +		int32_t op_ret, +		int32_t op_errno, +		fd_t *fd, +		inode_t *inode, +		struct stat *buf) +{ +	ioc_local_t *local = frame->local; +	ioc_table_t *table = this->private; +	ioc_inode_t *ioc_inode = NULL; +	uint32_t weight = 0; +	const char *path = local->file_loc.path; + +	if (op_ret != -1) { +		{ +			/* assign weight */ +			weight = ioc_get_priority (table, path); + +			ioc_inode = ioc_inode_update (table, inode, weight); +			LOCK (&fd->inode->lock); +			{ +				inode_ctx_put (fd->inode, this,  +					       (uint64_t)(long)ioc_inode); +			} +			UNLOCK (&fd->inode->lock); +		} +		/* If mandatory locking has been enabled on this file, +		   we disable caching on it */ +		if ((inode->st_mode & S_ISGID) &&  +		    !(inode->st_mode & S_IXGRP)) { +			fd_ctx_set (fd, this, 1); +		} + +		/* If O_DIRECT open, we disable caching on it */ +		if (local->flags & O_DIRECT){ +			/* O_DIRECT is only for one fd, not the inode  +			 * as a whole  +			 */ +			fd_ctx_set (fd, this, 1); +		} +     +	} +   +	frame->local = NULL; +	FREE (local); + +	STACK_UNWIND (frame, op_ret, op_errno, fd, inode, buf); + +	return 0; +} + +/* + * ioc_open - open fop for io cache + * @frame: + * @this: + * @loc: + * @flags: + * + */ 
+int32_t +ioc_open (call_frame_t *frame, +	  xlator_t *this, +	  loc_t *loc, +	  int32_t flags, +	  fd_t *fd) +{ +   +	ioc_local_t *local = CALLOC (1, sizeof (ioc_local_t)); +	ERR_ABORT (local); + +	local->flags = flags; +	local->file_loc.path = loc->path; +	local->file_loc.inode = loc->inode; +   +	frame->local = local; +   +	STACK_WIND (frame, +		    ioc_open_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->open, +		    loc, +		    flags, +		    fd); + +	return 0; +} + +/* + * ioc_create - create fop for io cache + *  + * @frame: + * @this: + * @pathname: + * @flags: + * @mode: + * + */ +int32_t +ioc_create (call_frame_t *frame, +	    xlator_t *this, +	    loc_t *loc, +	    int32_t flags, +	    mode_t mode, +	    fd_t *fd) +{ +	ioc_local_t *local = CALLOC (1, sizeof (ioc_local_t)); +	ERR_ABORT (local); + +	local->flags = flags; +	local->file_loc.path = loc->path; +	frame->local = local; + +	STACK_WIND (frame, ioc_create_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->create, +		    loc, flags, mode, fd); +	return 0; +} + + + + +/* + * ioc_release - release fop for io cache + *  + * @frame: + * @this: + * @fd: + * + */ +int32_t +ioc_release (xlator_t *this, +	     fd_t *fd) +{ +	return 0; +} + +/*  + * ioc_readv_disabled_cbk  + * @frame: + * @cookie: + * @this: + * @op_ret: + * @op_errno: + * @vector: + * @count: + * + */  +int32_t +ioc_readv_disabled_cbk (call_frame_t *frame,  +			void *cookie, +			xlator_t *this, +			int32_t op_ret, +			int32_t op_errno, +			struct iovec *vector, +			int32_t count, +			struct stat *stbuf) +{ +	STACK_UNWIND (frame, op_ret, op_errno, vector, count, stbuf); +	return 0; +} + + +int32_t +ioc_need_prune (ioc_table_t *table) +{ +	int64_t cache_difference = 0; +   +	ioc_table_lock (table); +	{ +		cache_difference = table->cache_used - table->cache_size; +	} +	ioc_table_unlock (table); + +	if (cache_difference > 0) +		return 1; +	else  +		return 0; +} + +/* + * dispatch_requests - + *  + * @frame: + * @inode: 
+ * + *  + */ +static void +dispatch_requests (call_frame_t *frame, +		   ioc_inode_t *ioc_inode, +		   fd_t *fd, +		   off_t offset, +		   size_t size) +{ +	ioc_local_t *local = frame->local; +	ioc_table_t *table = ioc_inode->table; +	ioc_page_t  *trav = NULL; +	ioc_waitq_t *waitq = NULL; +	off_t   rounded_offset = 0; +	off_t   rounded_end = 0; +	off_t   trav_offset = 0; +	int32_t fault = 0; +	int8_t  need_validate = 0; +	int8_t  might_need_validate = 0;  /* if a page exists, do we need  +					    to validate it? */ + +	rounded_offset = floor (offset, table->page_size); +	rounded_end = roof (offset + size, table->page_size); +	trav_offset = rounded_offset; + +	/* once a frame does read, it should be waiting on something */ +	local->wait_count++; + +	/* Requested region can fall in three different pages, +	 * 1. Ready - region is already in cache, we just have to serve it. +	 * 2. In-transit - page fault has been generated on this page, we need +	 *    to wait till the page is ready +	 * 3. 
Fault - page is not in cache, we have to generate a page fault +	 */ + +	might_need_validate = ioc_inode_need_revalidate (ioc_inode); + +	while (trav_offset < rounded_end) { +		size_t trav_size = 0; +		off_t local_offset = 0; + +		ioc_inode_lock (ioc_inode); +		//{ + +		/* look for requested region in the cache */ +		trav = ioc_page_get (ioc_inode, trav_offset); + +		local_offset = max (trav_offset, offset); +		trav_size = min (((offset+size) - local_offset),  +				 table->page_size); + +		if (!trav) { +			/* page not in cache, we need to generate page fault */ +			trav = ioc_page_create (ioc_inode, trav_offset); +			fault = 1; +			if (!trav) { +				gf_log (frame->this->name, GF_LOG_CRITICAL, +					"ioc_page_create returned NULL"); +			} +		}  + +		ioc_wait_on_page (trav, frame, local_offset, trav_size); + +		if (trav->ready) { +			/* page found in cache */ +			if (!might_need_validate) { +				/* fresh enough */ +				gf_log (frame->this->name, GF_LOG_DEBUG, +					"cache hit for trav_offset=%"PRId64"" +					"/local_offset=%"PRId64"", +					trav_offset, local_offset); +				waitq = ioc_page_wakeup (trav); +			} else { +				/* if waitq already exists, fstat revalidate is +				   already on the way */ +				if (!ioc_inode->waitq) { +					need_validate = 1; +				} +				ioc_wait_on_inode (ioc_inode, trav); +			} +		} + +		//} +		ioc_inode_unlock (ioc_inode); +     +		ioc_waitq_return (waitq); +		waitq = NULL; + +		if (fault) { +			fault = 0; +			/* new page created, increase the table->cache_used */ +			ioc_page_fault (ioc_inode, frame, fd, trav_offset); +		} + +		if (need_validate) { +			need_validate = 0; +			gf_log (frame->this->name, GF_LOG_DEBUG, +				"sending validate request for " +				"inode(%"PRId64") at offset=%"PRId64"", +				fd->inode->ino, trav_offset); +			ioc_cache_validate (frame, ioc_inode, fd, trav); +		} +     +		trav_offset += table->page_size; +	} + +	ioc_frame_return (frame); + +	if (ioc_need_prune (ioc_inode->table)) { +		ioc_prune 
(ioc_inode->table); +	} + +	return; +} + + +/* + * ioc_readv - + *  + * @frame: + * @this: + * @fd: + * @size: + * @offset: + * + */ +int32_t +ioc_readv (call_frame_t *frame, +	   xlator_t *this, +	   fd_t *fd, +	   size_t size, +	   off_t offset) +{ +	uint64_t     tmp_ioc_inode = 0; +	ioc_inode_t *ioc_inode = NULL; +	ioc_local_t *local = NULL; +	uint32_t     weight = 0; + +	inode_ctx_get (fd->inode, this, &tmp_ioc_inode); +	ioc_inode = (ioc_inode_t *)(long)tmp_ioc_inode; +	if (!ioc_inode) { +		/* caching disabled, go ahead with normal readv */ +		STACK_WIND (frame,  +			    ioc_readv_disabled_cbk, +			    FIRST_CHILD (frame->this),  +			    FIRST_CHILD (frame->this)->fops->readv, +			    fd,  +			    size,  +			    offset); +		return 0; +	} + +	if (!fd_ctx_get (fd, this, NULL)) { +		/* disable caching for this fd, go ahead with normal readv */ +		STACK_WIND (frame,  +			    ioc_readv_disabled_cbk, +			    FIRST_CHILD (frame->this),  +			    FIRST_CHILD (frame->this)->fops->readv, +			    fd,  +			    size,  +			    offset); +		return 0; +	} + +	local = (ioc_local_t *) CALLOC (1, sizeof (ioc_local_t)); +	ERR_ABORT (local); +	INIT_LIST_HEAD (&local->fill_list); + +	frame->local = local;   +	local->pending_offset = offset; +	local->pending_size = size; +	local->offset = offset; +	local->size = size; +	local->inode = ioc_inode; + +	gf_log (this->name, GF_LOG_DEBUG, +		"NEW REQ (%p) offset = %"PRId64" && size = %"GF_PRI_SIZET"",  +		frame, offset, size); + +	weight = ioc_inode->weight; + +	ioc_table_lock (ioc_inode->table); +	{ +		list_move_tail (&ioc_inode->inode_lru,  +				&ioc_inode->table->inode_lru[weight]); +	} +	ioc_table_unlock (ioc_inode->table); + +	dispatch_requests (frame, ioc_inode, fd, offset, size); +   +	return 0; +} + +/* + * ioc_writev_cbk - + *  + * @frame: + * @cookie: + * @this: + * @op_ret: + * @op_errno: + * + */ +int32_t +ioc_writev_cbk (call_frame_t *frame, +		void *cookie, +		xlator_t *this, +		int32_t op_ret, +		int32_t op_errno, +		struct 
stat *stbuf) +{ +	ioc_local_t *local     = frame->local; +	uint64_t     ioc_inode = 0; + +	inode_ctx_get (local->fd->inode, this, &ioc_inode); +   +	if (ioc_inode) +		ioc_inode_flush ((ioc_inode_t *)(long)ioc_inode); + +	STACK_UNWIND (frame, op_ret, op_errno, stbuf); +	return 0; +} + +/* + * ioc_writev + *  + * @frame: + * @this: + * @fd: + * @vector: + * @count: + * @offset: + * + */ +int32_t +ioc_writev (call_frame_t *frame, +	    xlator_t *this, +	    fd_t *fd, +	    struct iovec *vector, +	    int32_t count, +	    off_t offset) +{ +	ioc_local_t *local     = NULL; +	uint64_t     ioc_inode = 0; +	 +	local = CALLOC (1, sizeof (ioc_local_t)); +	ERR_ABORT (local); + +	/* TODO: why is it not fd_ref'ed */ +	local->fd = fd; +	frame->local = local; + +	inode_ctx_get (fd->inode, this, &ioc_inode); +	if (ioc_inode) +		ioc_inode_flush ((ioc_inode_t *)(long)ioc_inode); + +	STACK_WIND (frame, +		    ioc_writev_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->writev, +		    fd, +		    vector, +		    count, +		    offset); + +	return 0; +} + +/* + * ioc_truncate_cbk - + *  + * @frame: + * @cookie: + * @this: + * @op_ret: + * @op_errno: + * @buf: + * + */ +int32_t  +ioc_truncate_cbk (call_frame_t *frame, +		  void *cookie, +		  xlator_t *this, +		  int32_t op_ret, +		  int32_t op_errno, +		  struct stat *buf) +{ + +	STACK_UNWIND (frame, op_ret, op_errno, buf); +	return 0; +} + +/* + * ioc_truncate - + *  + * @frame: + * @this: + * @loc: + * @offset: + * + */ +int32_t  +ioc_truncate (call_frame_t *frame, +	      xlator_t *this, +	      loc_t *loc, +	      off_t offset) +{ +	uint64_t ioc_inode = 0; +	inode_ctx_get (loc->inode, this, &ioc_inode); + +	if (ioc_inode) +		ioc_inode_flush ((ioc_inode_t *)(long)ioc_inode); + +	STACK_WIND (frame, +		    ioc_truncate_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->truncate, +		    loc, +		    offset); +	return 0; +} + +/* + * ioc_ftruncate - + *  + * @frame: + * @this: + * @fd: + * @offset: + * + */ +int32_t 
+ioc_ftruncate (call_frame_t *frame, +	       xlator_t *this, +	       fd_t *fd, +	       off_t offset) +{ +	uint64_t ioc_inode = 0; +	inode_ctx_get (fd->inode, this, &ioc_inode); + +	if (ioc_inode) +		ioc_inode_flush ((ioc_inode_t *)(long)ioc_inode); + +	STACK_WIND (frame, +		    ioc_truncate_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->ftruncate, +		    fd, +		    offset); +	return 0; +} + +int32_t +ioc_lk_cbk (call_frame_t *frame, +	    void *cookie, +	    xlator_t *this, +	    int32_t op_ret, +	    int32_t op_errno, +	    struct flock *lock) +{ +	STACK_UNWIND (frame, op_ret, op_errno, lock); +	return 0; +} + +int32_t  +ioc_lk (call_frame_t *frame, +	xlator_t *this, +	fd_t *fd, +	int32_t cmd, +	struct flock *lock) +{ +	ioc_inode_t *ioc_inode = NULL; +	uint64_t     tmp_inode = 0; + +	inode_ctx_get (fd->inode, this, &tmp_inode); +	ioc_inode = (ioc_inode_t *)(long)tmp_inode; +	if (!ioc_inode) { +		gf_log (this->name, GF_LOG_ERROR, +			"inode context is NULL: returning EBADFD"); +		STACK_UNWIND (frame, -1, EBADFD, NULL); +		return 0; +	} + +	ioc_inode_lock (ioc_inode); +	{ +		gettimeofday (&ioc_inode->tv, NULL); +	} +	ioc_inode_unlock (ioc_inode); + +	STACK_WIND (frame, ioc_lk_cbk,  +		    FIRST_CHILD (this), +		    FIRST_CHILD (this)->fops->lk, fd, cmd, lock); +	return 0; +} + +int32_t +ioc_get_priority_list (const char *opt_str, struct list_head *first) +{ +	int32_t max_pri = 0; +	char *tmp_str = NULL; +	char *tmp_str1 = NULL; +	char *tmp_str2 = NULL; +	char *dup_str = NULL; +	char *stripe_str = NULL; +	char *pattern = NULL; +	char *priority = NULL; +	char *string = strdup (opt_str); +	struct ioc_priority *curr = NULL; + +	/* Get the pattern for cache priority.  
+	 * "option priority *.jpg:1,abc*:2" etc  +	 */ +	/* TODO: inode_lru in table is statically hard-coded to 5,  +	 * should be changed to run-time configuration  +	 */ +	stripe_str = strtok_r (string, ",", &tmp_str); +	while (stripe_str) { +		curr = CALLOC (1, sizeof (struct ioc_priority)); +		ERR_ABORT (curr); +		list_add_tail (&curr->list, first); + +		dup_str = strdup (stripe_str); +		pattern = strtok_r (dup_str, ":", &tmp_str1); +		if (!pattern) +			return -1; +		priority = strtok_r (NULL, ":", &tmp_str1); +		if (!priority) +			return -1; +		gf_log ("io-cache",  +			GF_LOG_DEBUG,  +			"ioc priority : pattern %s : priority %s",  +			pattern, +			priority); +		curr->pattern = strdup (pattern); +		curr->priority = strtol (priority, &tmp_str2, 0); +		if (tmp_str2 && (*tmp_str2)) +			return -1; +		else +			max_pri = max (max_pri, curr->priority); +		stripe_str = strtok_r (NULL, ",", &tmp_str); +	} + +	return max_pri; +} + +/* + * init -  + * @this: + * + */ +int32_t  +init (xlator_t *this) +{ +	ioc_table_t *table; +	dict_t *options = this->options; +	uint32_t index = 0; +	char *page_size_string = NULL; +	char *cache_size_string = NULL; + +	if (!this->children || this->children->next) { +		gf_log (this->name, GF_LOG_ERROR, +			"FATAL: io-cache not configured with exactly " +			"one child"); +		return -1; +	} + +	if (!this->parents) { +		gf_log (this->name, GF_LOG_WARNING, +			"dangling volume. 
check volfile "); +	} + +	table = (void *) CALLOC (1, sizeof (*table)); +	ERR_ABORT (table); +   +	table->xl = this; +	table->page_size = IOC_PAGE_SIZE; +	table->cache_size = IOC_CACHE_SIZE; + +	if (dict_get (options, "page-size")) +		page_size_string = data_to_str (dict_get (options,  +							  "page-size")); + +	if (page_size_string) { +		if (gf_string2bytesize (page_size_string,  +					&table->page_size) != 0) { +			gf_log ("io-cache", GF_LOG_ERROR,  +				"invalid number format \"%s\" of " +				"\"option page-size\"",  +				page_size_string); +			return -1; +		} +		gf_log (this->name, GF_LOG_DEBUG,  +			"using page-size %"PRIu64"",  table->page_size); +	} +   +	if (dict_get (options, "cache-size")) +		cache_size_string = data_to_str (dict_get (options,  +							   "cache-size")); +	if (cache_size_string) { +		if (gf_string2bytesize (cache_size_string,  +					&table->cache_size) != 0) { +			gf_log ("io-cache", GF_LOG_ERROR,  +				"invalid number format \"%s\" of " +				"\"option cache-size\"",  +				cache_size_string); +			return -1; +		} +       +		gf_log (this->name, GF_LOG_DEBUG,  +			"using cache-size %"PRIu64"", table->cache_size); +	} +   +	table->cache_timeout = 1; + +	if (dict_get (options, "cache-timeout")) { +		table->cache_timeout =  +			data_to_uint32 (dict_get (options, +						  "cache-timeout")); +		gf_log (this->name, GF_LOG_DEBUG, +			"Using %d seconds to revalidate cache", +			table->cache_timeout); +	} + +	INIT_LIST_HEAD (&table->priority_list); +	if (dict_get (options, "priority")) { +		char *option_list = data_to_str (dict_get (options,  +							   "priority")); +		gf_log (this->name, GF_LOG_DEBUG, +			"option path %s", option_list); +		/* parse the list of pattern:priority */ +		table->max_pri = ioc_get_priority_list (option_list,  +							&table->priority_list); +     +		if (table->max_pri == -1) +			return -1; +	} +	table->max_pri ++; +	INIT_LIST_HEAD (&table->inodes); +   +	table->inode_lru = CALLOC (table->max_pri, sizeof (struct 
list_head)); +	ERR_ABORT (table->inode_lru); +	for (index = 0; index < (table->max_pri); index++) +		INIT_LIST_HEAD (&table->inode_lru[index]); + +	pthread_mutex_init (&table->table_lock, NULL); +	this->private = table; +	return 0; +} + +/* + * fini - + *  + * @this: + * + */ +void +fini (xlator_t *this) +{ +	ioc_table_t *table = this->private; + +	pthread_mutex_destroy (&table->table_lock); +	FREE (table); + +	this->private = NULL; +	return; +} + +struct xlator_fops fops = { +	.open        = ioc_open, +	.create      = ioc_create, +	.readv       = ioc_readv, +	.writev      = ioc_writev, +	.truncate    = ioc_truncate, +	.ftruncate   = ioc_ftruncate, +	.utimens     = ioc_utimens, +	.lookup      = ioc_lookup, +	.lk          = ioc_lk +}; + +struct xlator_mops mops = { +}; + +struct xlator_cbks cbks = { +	.forget      = ioc_forget, +  	.release     = ioc_release +}; + +struct volume_options options[] = { +	{ .key  = {"priority"},  +	  .type = GF_OPTION_TYPE_ANY  +	}, +	{ .key  = {"cache-timeout", "force-revalidate-timeout"}, +	  .type = GF_OPTION_TYPE_INT, +	  .min  = 0,  +	  .max  = 60  +	},  +	{ .key  = {"page-size"},  +	  .type = GF_OPTION_TYPE_SIZET,  +	  .min  = 16 * GF_UNIT_KB,  +	  .max  =  4 * GF_UNIT_MB  +	}, +	{ .key  = {"cache-size"},  +	  .type = GF_OPTION_TYPE_SIZET, +	  .min  = 4 * GF_UNIT_MB,  +	  .max  = 6 * GF_UNIT_GB  +	}, +	{ .key = {NULL} }, +}; diff --git a/xlators/performance/io-cache/src/io-cache.h b/xlators/performance/io-cache/src/io-cache.h new file mode 100644 index 00000000000..e997f6e7ca6 --- /dev/null +++ b/xlators/performance/io-cache/src/io-cache.h @@ -0,0 +1,330 @@ +/* +   Copyright (c) 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com> +   This file is part of GlusterFS. + +   GlusterFS is free software; you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published +   by the Free Software Foundation; either version 3 of the License, +   or (at your option) any later version. 
+ +   GlusterFS is distributed in the hope that it will be useful, but +   WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +   General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see +   <http://www.gnu.org/licenses/>. +*/ + +#ifndef __IO_CACHE_H +#define __IO_CACHE_H + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include <sys/types.h> +#include "compat-errno.h" + +#include "glusterfs.h" +#include "logging.h" +#include "dict.h" +#include "xlator.h" +#include "common-utils.h" +#include "call-stub.h" +#include <sys/time.h> +#include <fnmatch.h> + +#define IOC_PAGE_SIZE    (1024 * 128)   /* 128KB */ +#define IOC_CACHE_SIZE   (32 * 1024 * 1024) + +struct ioc_table; +struct ioc_local; +struct ioc_page; +struct ioc_inode; + +struct ioc_priority { +	struct list_head list; +	char *pattern; +	uint32_t priority; +}; + +/* + * ioc_waitq - this structure is used to represents the waiting  + *             frames on a page + * + * @next: pointer to next object in waitq + * @data: pointer to the frame which is waiting + */ +struct ioc_waitq { +	struct ioc_waitq *next; +	void *data; +	off_t pending_offset; +	size_t pending_size; +}; + +/* + * ioc_fill -  + * + */ +struct ioc_fill { +	struct list_head list;  /* list of ioc_fill structures of a frame */ +	off_t offset;           +	size_t size;            +	struct iovec *vector;   +	int32_t count; +	dict_t *refs; +}; + +struct ioc_local { +	mode_t mode; +	int32_t flags; +	loc_t file_loc; +	off_t offset; +	size_t size; +	int32_t op_ret; +	int32_t op_errno; +	struct list_head fill_list;      /* list of ioc_fill structures */ +	off_t pending_offset;            /* offset from this frame should continue */ +	size_t pending_size;             /* size of data this frame is waiting on */ +	struct ioc_inode *inode; +	int32_t wait_count; +	
pthread_mutex_t local_lock; +	struct ioc_waitq *waitq; +	void *stub; +	fd_t *fd; +	int32_t need_xattr; +	dict_t *xattr_req; +}; + +/* + * ioc_page - structure to store page of data from file  + * + */ +struct ioc_page { +	struct list_head pages; +	struct list_head page_lru; +	struct ioc_inode *inode;   /* inode this page belongs to */ +	struct ioc_priority *priority; +	char dirty; +	char ready; +	struct iovec *vector; +	int32_t count; +	off_t offset; +	size_t size; +	struct ioc_waitq *waitq; +	dict_t *ref; +	pthread_mutex_t page_lock; +}; + +struct ioc_inode { +	struct ioc_table *table; +	struct list_head pages;      /* list of pages of this inode */ +	struct list_head inode_list; /* list of inodes, maintained by io-cache translator */ +	struct list_head inode_lru; +	struct list_head page_lru; +	struct ioc_waitq *waitq; +	pthread_mutex_t inode_lock; +	uint32_t weight;             /* weight of the inode, increases on each read */ +	time_t mtime;             /* mtime of the server file when last cached */ +	struct timeval tv;           /* time-stamp at last re-validate */ +}; + +struct ioc_table { +	uint64_t page_size; +	uint64_t cache_size; +	uint64_t cache_used; +	struct list_head inodes; /* list of inodes cached */ +	struct list_head active;  +	struct list_head *inode_lru; +	struct list_head priority_list; +	int32_t readv_count; +	pthread_mutex_t table_lock; +	xlator_t *xl; +	uint32_t inode_count; +	int32_t cache_timeout; +	int32_t max_pri; +}; + +typedef struct ioc_table ioc_table_t; +typedef struct ioc_local ioc_local_t; +typedef struct ioc_page ioc_page_t; +typedef struct ioc_inode ioc_inode_t; +typedef struct ioc_waitq ioc_waitq_t; +typedef struct ioc_fill ioc_fill_t; + +void * +str_to_ptr (char *string); + +char * +ptr_to_str (void *ptr); + +int32_t  +ioc_readv_disabled_cbk (call_frame_t *frame, +			void *cookie, +			xlator_t *this, +			int32_t op_ret, +			int32_t op_errno, +			struct iovec *vector, +			int32_t count, +			struct stat *stbuf); + +ioc_page_t * 
+ioc_page_get (ioc_inode_t *ioc_inode, +	      off_t offset); + +ioc_page_t * +ioc_page_create (ioc_inode_t *ioc_inode, +		 off_t offset); + +void +ioc_page_fault (ioc_inode_t *ioc_inode, +		call_frame_t *frame, +		fd_t *fd, +		off_t offset); +void +ioc_wait_on_page (ioc_page_t *page, +		  call_frame_t *frame, +		  off_t offset, +		  size_t size); + +ioc_waitq_t * +ioc_page_wakeup (ioc_page_t *page); + +void +ioc_page_flush (ioc_page_t *page); + +ioc_waitq_t * +ioc_page_error (ioc_page_t *page, +		int32_t op_ret, +		int32_t op_errno); +void +ioc_page_purge (ioc_page_t *page); + +void +ioc_frame_return (call_frame_t *frame); + +void +ioc_waitq_return (ioc_waitq_t *waitq); + +void +ioc_frame_fill (ioc_page_t *page, +		call_frame_t *frame, +		off_t offset, +		size_t size); + +#define ioc_inode_lock(ioc_inode)					\ +	do {								\ +		gf_log (ioc_inode->table->xl->name, GF_LOG_DEBUG,	\ +			"locked inode(%p)", ioc_inode);			\ +		pthread_mutex_lock (&ioc_inode->inode_lock);		\ +	} while (0) + + +#define ioc_inode_unlock(ioc_inode)					\ +	do {								\ +		gf_log (ioc_inode->table->xl->name, GF_LOG_DEBUG,	\ +			"unlocked inode(%p)", ioc_inode);		\ +		pthread_mutex_unlock (&ioc_inode->inode_lock);		\ +	} while (0) + + +#define ioc_table_lock(table)					\ +	do {							\ +		gf_log (table->xl->name, GF_LOG_DEBUG,		\ +			"locked table(%p)", table);		\ +		pthread_mutex_lock (&table->table_lock);	\ +	} while (0) + + +#define ioc_table_unlock(table)					\ +	do {							\ +		gf_log (table->xl->name, GF_LOG_DEBUG,		\ +			"unlocked table(%p)", table);		\ +		pthread_mutex_unlock (&table->table_lock);	\ +	} while (0) + + +#define ioc_local_lock(local)						\ +	do {								\ +		gf_log (local->inode->table->xl->name, GF_LOG_DEBUG,	\ +			"locked local(%p)", local);			\ +		pthread_mutex_lock (&local->local_lock);		\ +	} while (0) + + +#define ioc_local_unlock(local)						\ +	do {								\ +		gf_log (local->inode->table->xl->name, GF_LOG_DEBUG,	\ +			"unlocked local(%p)", local);			\ +		
pthread_mutex_unlock (&local->local_lock);		\ +	} while (0) + + +#define ioc_page_lock(page)						\ +	do {								\ +		gf_log (page->inode->table->xl->name, GF_LOG_DEBUG,	\ +			"locked page(%p)", page);			\ +		pthread_mutex_lock (&page->page_lock);			\ +	} while (0) + + +#define ioc_page_unlock(page)						\ +	do {								\ +		gf_log (page->inode->table->xl->name, GF_LOG_DEBUG,	\ +			"unlocked page(%p)", page);			\ +		pthread_mutex_unlock (&page->page_lock);		\ +	} while (0) + + +static inline uint64_t +time_elapsed (struct timeval *now, +	      struct timeval *then) +{ +	uint64_t sec = now->tv_sec - then->tv_sec; + +	if (sec) +		return sec; +   +	return 0; +} + +ioc_inode_t * +ioc_inode_search (ioc_table_t *table, +		  inode_t *inode); + +void  +ioc_inode_destroy (ioc_inode_t *ioc_inode); + +ioc_inode_t * +ioc_inode_update (ioc_table_t *table, +		  inode_t *inode, +		  uint32_t weight); + +int64_t  +ioc_page_destroy (ioc_page_t *page); + +int32_t +__ioc_inode_flush (ioc_inode_t *ioc_inode); + +void +ioc_inode_flush (ioc_inode_t *ioc_inode); + +void +ioc_inode_wakeup (call_frame_t *frame,  +		  ioc_inode_t *ioc_inode,  +		  struct stat *stbuf); + +int8_t +ioc_cache_still_valid (ioc_inode_t *ioc_inode, +		       struct stat *stbuf); + +int32_t +ioc_prune (ioc_table_t *table); + +int32_t +ioc_need_prune (ioc_table_t *table); + +#endif /* __READ_AHEAD_H */ diff --git a/xlators/performance/io-cache/src/ioc-inode.c b/xlators/performance/io-cache/src/ioc-inode.c new file mode 100644 index 00000000000..2e2e561dd23 --- /dev/null +++ b/xlators/performance/io-cache/src/ioc-inode.c @@ -0,0 +1,201 @@ +/* +  Copyright (c) 2007, 2008, 2009 Z RESEARCH, Inc. <http://www.zresearch.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. 
+ +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "io-cache.h" + + +/* + * str_to_ptr - convert a string to pointer + * @string: string + * + */ +void * +str_to_ptr (char *string) +{ +	void *ptr = (void *)strtoul (string, NULL, 16); +	return ptr; +} + + +/* + * ptr_to_str - convert a pointer to string + * @ptr: pointer + * + */ +char * +ptr_to_str (void *ptr) +{ +	char *str; +	asprintf (&str, "%p", ptr); +	return str; +} + +void +ioc_inode_wakeup (call_frame_t *frame, +		  ioc_inode_t *ioc_inode,  +		  struct stat *stbuf) +{ +	ioc_waitq_t *waiter = NULL, *waited = NULL; +	ioc_waitq_t *page_waitq = NULL; +	int8_t cache_still_valid = 1; +	ioc_local_t *local = frame->local; +	int8_t need_fault = 0; +	ioc_page_t *waiter_page = NULL; + +	ioc_inode_lock (ioc_inode); +	{ +		waiter = ioc_inode->waitq; +		ioc_inode->waitq = NULL; +	} +	ioc_inode_unlock (ioc_inode); + +	if (stbuf) +		cache_still_valid = ioc_cache_still_valid (ioc_inode, stbuf); +	else +		cache_still_valid = 0; + +	if (!waiter) { +		gf_log (frame->this->name, GF_LOG_DEBUG, +			"cache validate called without any " +			"page waiting to be validated"); +	} + +	while (waiter) { +		waiter_page = waiter->data; +		page_waitq = NULL; +     +		if (waiter_page) { +			if (cache_still_valid) { +				/* cache valid, wake up page */ +				ioc_inode_lock (ioc_inode); +				{ +					page_waitq =  +						ioc_page_wakeup (waiter_page); +				} +				ioc_inode_unlock (ioc_inode); +				if (page_waitq) +					ioc_waitq_return (page_waitq); +			} else { +				/* cache invalid, generate page fault and set  +				 * 
page->ready = 0, to avoid double faults   +				 */ +				ioc_inode_lock (ioc_inode); +	 +				if (waiter_page->ready) { +					waiter_page->ready = 0; +					need_fault = 1; +				} else { +					gf_log (frame->this->name,  +						GF_LOG_DEBUG, +						"validate frame(%p) is waiting" +						"for in-transit page = %p", +						frame, waiter_page); +				} +	 +				ioc_inode_unlock (ioc_inode); +       +				if (need_fault) { +					need_fault = 0; +					ioc_page_fault (ioc_inode, frame,  +							local->fd,  +							waiter_page->offset); +				} +			} +		} + +		waited = waiter; +		waiter = waiter->next; +     +		waited->data = NULL; +		free (waited); +	} +} + +/*  + * ioc_inode_update - create a new ioc_inode_t structure and add it to  + *                    the table table. fill in the fields which are derived  + *                    from inode_t corresponding to the file + *  + * @table: io-table structure + * @inode: inode structure + * + * not for external reference + */ +ioc_inode_t * +ioc_inode_update (ioc_table_t *table,  +		  inode_t *inode, +		  uint32_t weight) +{ +	ioc_inode_t *ioc_inode = CALLOC (1, sizeof (ioc_inode_t)); +	ERR_ABORT (ioc_inode); +   +	ioc_inode->table = table; +  +	/* initialize the list for pages */ +	INIT_LIST_HEAD (&ioc_inode->pages); +	INIT_LIST_HEAD (&ioc_inode->page_lru); + +	ioc_table_lock (table); + +	table->inode_count++; +	list_add (&ioc_inode->inode_list, &table->inodes); +	list_add_tail (&ioc_inode->inode_lru, &table->inode_lru[weight]); + +	gf_log (table->xl->name, +		GF_LOG_DEBUG, +		"adding to inode_lru[%d]", weight); + +	ioc_table_unlock (table); + +	pthread_mutex_init (&ioc_inode->inode_lock, NULL); +	ioc_inode->weight = weight; +   +	return ioc_inode; +} + + +/*  + * ioc_inode_destroy - destroy an ioc_inode_t object. + * + * @inode: inode to destroy + * + * to be called only from ioc_forget.  
+ */ +void +ioc_inode_destroy (ioc_inode_t *ioc_inode) +{ +	ioc_table_t *table = ioc_inode->table; + +	ioc_table_lock (table); +	table->inode_count--; +	list_del (&ioc_inode->inode_list); +	list_del (&ioc_inode->inode_lru); +	ioc_table_unlock (table); +   +	ioc_inode_flush (ioc_inode); + +	pthread_mutex_destroy (&ioc_inode->inode_lock); +	free (ioc_inode); +} + diff --git a/xlators/performance/io-cache/src/page.c b/xlators/performance/io-cache/src/page.c new file mode 100644 index 00000000000..e549f0bb547 --- /dev/null +++ b/xlators/performance/io-cache/src/page.c @@ -0,0 +1,778 @@ +/* +   Copyright (c) 2007, 2008, 2009 Z RESEARCH, Inc. <http://www.zresearch.com> +   This file is part of GlusterFS. + +   GlusterFS is free software; you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published +   by the Free Software Foundation; either version 3 of the License, +   or (at your option) any later version. + +   GlusterFS is distributed in the hope that it will be useful, but +   WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +   General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see +   <http://www.gnu.org/licenses/>. 
+*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "glusterfs.h" +#include "logging.h" +#include "dict.h" +#include "xlator.h" +#include "io-cache.h" +#include <assert.h> +#include <sys/time.h> + +ioc_page_t * +ioc_page_get (ioc_inode_t *ioc_inode, +	      off_t offset) +{ +	int8_t       found = 0; +	ioc_page_t  *page = NULL; +	ioc_table_t *table = ioc_inode->table; +	off_t        rounded_offset = floor (offset, table->page_size); + +	if (list_empty (&ioc_inode->pages)) { +		return NULL; +	} + +	list_for_each_entry (page, &ioc_inode->pages, pages) { +		if (page->offset == rounded_offset) { +			found = 1; +			break; +		} +	} + +	/* was previously returning ioc_inode itself..,  +	 * 1st of its type and found one more downstairs :O */ +	if (!found){ +		page = NULL; +	} else { +		/* push the page to the end of the lru list */ +		list_move_tail (&page->page_lru, &ioc_inode->page_lru); +	} + +	return page; +} + + +/* + * ioc_page_destroy - + * + * @page: + * + */ +int64_t +ioc_page_destroy (ioc_page_t *page) +{ +	int64_t page_size = 0; + +	page_size = page->size; + +	if (page->waitq) { +		/* frames waiting on this page, do not destroy this page */ +		page_size = -1; +	} else { + +		list_del (&page->pages); +		list_del (&page->page_lru); +     +		gf_log (page->inode->table->xl->name, GF_LOG_DEBUG, +			"destroying page = %p, offset = %"PRId64" " +			"&& inode = %p", +			page, page->offset, page->inode); +     +		if (page->vector){ +			dict_unref (page->ref); +			free (page->vector); +			page->vector = NULL; +		} +     +		page->inode = NULL; +     +	} + +	if (page_size != -1) { +		pthread_mutex_destroy (&page->page_lock); +		free (page); +	} + +	return page_size; +} + +/* + * ioc_prune - prune the cache. we have a limit to the number of pages we + *             can have in-memory. 
+ * + * @table: ioc_table_t of this translator + * + */ +int32_t +ioc_prune (ioc_table_t *table) +{ +	ioc_inode_t *curr = NULL, *next_ioc_inode = NULL; +	ioc_page_t *page = NULL, *next = NULL; +	int32_t ret = -1; +	int32_t index = 0; +	uint64_t size_to_prune = 0; +	uint64_t size_pruned = 0; + +	ioc_table_lock (table); +	{ +		size_to_prune = table->cache_used - table->cache_size; +		/* take out the least recently used inode */ +		for (index=0; index < table->max_pri; index++) { +			list_for_each_entry_safe (curr, next_ioc_inode,  +						  &table->inode_lru[index],  +						  inode_lru) { +				/* prune page-by-page for this inode, till  +				 * we reach the equilibrium */ +				ioc_inode_lock (curr); +				/* { */ + +				list_for_each_entry_safe (page, next,  +							  &curr->page_lru,  +							  page_lru) { +					/* done with all pages, and not  +					 * reached equilibrium yet?? +					 * continue with next inode in  +					 * lru_list */ +					size_pruned += page->size; +					ret = ioc_page_destroy (page); + +					if (ret != -1) +						table->cache_used -= ret; +	     +					gf_log (table->xl->name, +						GF_LOG_DEBUG, +						"index = %d && table->cache_" +						"used = %"PRIu64" && table->" +						"cache_size = %"PRIu64,  +						index, table->cache_used,  +						table->cache_size); +	     +					if (size_pruned >= size_to_prune) +						break; +				} /* list_for_each_entry_safe(page...) */ +				if (list_empty (&curr->pages)) { +					list_del_init (&curr->inode_lru); +				} + +				/* } */  +				ioc_inode_unlock (curr); +	 +				if (size_pruned >= size_to_prune) +					break; +			} /* list_for_each_entry_safe (curr...) */ +       +			if (size_pruned >= size_to_prune) +				break; +		} /* for(index=0;...) */ + +	} /* ioc_inode_table locked region end */ +	ioc_table_unlock (table); + +	return 0; +} + +/* + * ioc_page_create - create a new page.  
+ * + * @ioc_inode:  + * @offset: + * + */ +ioc_page_t * +ioc_page_create (ioc_inode_t *ioc_inode, +		 off_t offset) +{ +	ioc_table_t *table = ioc_inode->table; +	ioc_page_t *page = NULL; +	off_t rounded_offset = floor (offset, table->page_size); +	ioc_page_t *newpage = CALLOC (1, sizeof (*newpage)); +	ERR_ABORT (newpage); +   +	if (ioc_inode) +		table = ioc_inode->table; +	else { +		return NULL; +	} +    +	newpage->offset = rounded_offset; +	newpage->inode = ioc_inode; +	pthread_mutex_init (&newpage->page_lock, NULL); + +	list_add_tail (&newpage->page_lru, &ioc_inode->page_lru); +	list_add_tail (&newpage->pages, &ioc_inode->pages); + +	page = newpage; + +	gf_log ("io-cache", GF_LOG_DEBUG, +		"returning new page %p", page); +	return page; +} + +/*  + * ioc_wait_on_page - pause a frame to wait till the arrival of a page.  + * here we need to handle the case when the frame who calls wait_on_page  + * himself has caused page_fault  + * + * @page: page to wait on + * @frame: call frame who is waiting on page + * + */ +void +ioc_wait_on_page (ioc_page_t *page, +		  call_frame_t *frame, +		  off_t offset, +		  size_t size) +{ +	ioc_waitq_t *waitq = NULL; +	ioc_local_t *local = frame->local; + +	waitq = CALLOC (1, sizeof (*waitq)); +	ERR_ABORT (waitq); +   +	gf_log (frame->this->name, GF_LOG_DEBUG, +		"frame(%p) waiting on page = %p, offset=%"PRId64", " +		"size=%"GF_PRI_SIZET"", +		frame, page, offset, size); + +	waitq->data = frame; +	waitq->next = page->waitq; +	waitq->pending_offset = offset; +	waitq->pending_size = size; +	page->waitq = waitq; +	/* one frame can wait only once on a given page,  +	 * local->wait_count is number of pages a frame is waiting on */ +	ioc_local_lock (local); +	{ +		local->wait_count++; +	} +	ioc_local_unlock (local); +} + + +/* + * ioc_cache_still_valid - see if cached pages ioc_inode are still valid  + * against given stbuf + * + * @ioc_inode: + * @stbuf: + * + * assumes ioc_inode is locked + */ +int8_t +ioc_cache_still_valid (ioc_inode_t 
*ioc_inode, +		       struct stat *stbuf) +{ +	int8_t cache_still_valid = 1; +   +#if 0 +	if (!stbuf || (stbuf->st_mtime != ioc_inode->mtime) ||  +	    (stbuf->st_mtim.tv_nsec != ioc_inode->stbuf.st_mtim.tv_nsec)) +		cache_still_valid = 0; + +#else +	if (!stbuf || (stbuf->st_mtime != ioc_inode->mtime)) +		cache_still_valid = 0; + +#endif + +#if 0 +	/* talk with avati@zresearch.com to enable this section */ +	if (!ioc_inode->mtime && stbuf) { +		cache_still_valid = 1; +		ioc_inode->mtime = stbuf->st_mtime; +	} +#endif + +	return cache_still_valid; +} + + +void +ioc_waitq_return (ioc_waitq_t *waitq) +{ +	ioc_waitq_t  *trav   = NULL; +	ioc_waitq_t  *next   = NULL; +	call_frame_t *frame = NULL; + +	for (trav = waitq; trav; trav = next) { +		next = trav->next; + +		frame = trav->data; +		ioc_frame_return (frame); +		free (trav); +	} +} + + +int +ioc_fault_cbk (call_frame_t *frame, +	       void *cookie, +	       xlator_t *this, +	       int32_t op_ret, +	       int32_t op_errno, +	       struct iovec *vector, +	       int32_t count, +	       struct stat *stbuf) +{ +	ioc_local_t *local = frame->local; +	off_t offset = local->pending_offset; +	ioc_inode_t *ioc_inode = local->inode; +	ioc_table_t *table = ioc_inode->table; +	ioc_page_t *page = NULL; +	off_t trav_offset = 0; +	size_t payload_size = 0; +	int32_t destroy_size = 0; +	size_t page_size = 0; +	ioc_waitq_t *waitq = NULL; + +	trav_offset = offset;   +	payload_size = op_ret; + +	ioc_inode_lock (ioc_inode); +	{ +		if (op_ret == -1 ||  +		    (op_ret >= 0 &&  +		     !ioc_cache_still_valid(ioc_inode, stbuf))) { +			gf_log (ioc_inode->table->xl->name, GF_LOG_DEBUG, +				"cache for inode(%p) is invalid. 
flushing " +				"all pages", ioc_inode); +			destroy_size = __ioc_inode_flush (ioc_inode); +		}  +     +		if (op_ret >= 0) +			ioc_inode->mtime = stbuf->st_mtime; +     +		gettimeofday (&ioc_inode->tv, NULL); +     +		if (op_ret < 0) { +			/* error, readv returned -1 */ +			page = ioc_page_get (ioc_inode, offset); +			if (page) +				waitq = ioc_page_error (page, op_ret,  +							op_errno); +		} else { +			gf_log (ioc_inode->table->xl->name, GF_LOG_DEBUG, +				"op_ret = %d", op_ret); +			page = ioc_page_get (ioc_inode, offset); +			if (!page) { +				/* page was flushed */ +				/* some serious bug ? */ +				gf_log (this->name, GF_LOG_DEBUG, +					"wasted copy: %"PRId64"[+%"PRId64"] " +					"ioc_inode=%p", offset,  +					table->page_size, ioc_inode); +			} else { +				if (page->vector) { +					dict_unref (page->ref); +					free (page->vector); +					page->vector = NULL; +				} +	 +				/* keep a copy of the page for our cache */ +				page->vector = iov_dup (vector, count); +				page->count = count; +				if (frame->root->rsp_refs) { +					dict_ref (frame->root->rsp_refs); +					page->ref = frame->root->rsp_refs; +				} else { +					/* TODO: we have got a response to  +					 * our request and no data */ +					gf_log (this->name, GF_LOG_CRITICAL, +						"frame>root>rsp_refs is null"); +				} /* if(frame->root->rsp_refs) */ +	 +				/* page->size should indicate exactly how  +				 * much the readv call to the child +				 * translator returned. 
earlier op_ret  +				 * from child translator was used, which  +				 * gave rise to a bug where reads from  +				 * io-cached volume were resulting in 0  +				 * byte replies */ +				page_size = iov_length(vector, count); +	 +				page->size = page_size; + +				if (page->waitq) { +					/* wake up all the frames waiting on  +					 * this page, including  +					 * the frame which triggered fault */ +					waitq = ioc_page_wakeup (page); +				} /* if(page->waitq) */ +			} /* if(!page)...else */ +		} /* if(op_ret < 0)...else */ +	} /* ioc_inode locked region end */ +	ioc_inode_unlock (ioc_inode); + +	ioc_waitq_return (waitq); + +	if (page_size) { +		ioc_table_lock (table); +		{ +			table->cache_used += page_size; +		} +		ioc_table_unlock (table); +	} + +	if (destroy_size) { +		ioc_table_lock (table); +		{ +			table->cache_used -= destroy_size; +		} +		ioc_table_unlock (table); +	} + +	if (ioc_need_prune (ioc_inode->table)) { +		ioc_prune (ioc_inode->table); +	} + +	gf_log (this->name, GF_LOG_DEBUG, "fault frame %p returned", frame); +	pthread_mutex_destroy (&local->local_lock); + +	fd_unref (local->fd); + +	STACK_DESTROY (frame->root); +	return 0; +} + +/* + * ioc_page_fault - + *  + * @ioc_inode: + * @frame: + * @fd: + * @offset: + * + */ +void +ioc_page_fault (ioc_inode_t *ioc_inode, +		call_frame_t *frame, +		fd_t *fd, +		off_t offset) +{ +	ioc_table_t *table = ioc_inode->table; +	call_frame_t *fault_frame = copy_frame (frame); +	ioc_local_t *fault_local = CALLOC (1, sizeof (ioc_local_t)); +	ERR_ABORT (fault_local); + +	/* NOTE: copy_frame() means, the frame the fop whose fd_ref we  +	 * are using till now won't be valid till we get reply from server.  
+	 * we unref this fd, in fault_cbk */ +	fault_local->fd = fd_ref (fd); + +	fault_frame->local = fault_local; +	pthread_mutex_init (&fault_local->local_lock, NULL); + +	INIT_LIST_HEAD (&fault_local->fill_list); +	fault_local->pending_offset = offset; +	fault_local->pending_size = table->page_size; +	fault_local->inode = ioc_inode; + +	gf_log (frame->this->name, GF_LOG_DEBUG, +		"stack winding page fault for offset = %"PRId64" with " +		"frame %p", offset, fault_frame); +   +	STACK_WIND (fault_frame, ioc_fault_cbk, +		    FIRST_CHILD(fault_frame->this), +		    FIRST_CHILD(fault_frame->this)->fops->readv, +		    fd, table->page_size, offset); +	return; +} + +void +ioc_frame_fill (ioc_page_t *page, +		call_frame_t *frame, +		off_t offset, +		size_t size) +{ +	ioc_local_t *local = frame->local; +	ioc_fill_t *fill = NULL; +	off_t src_offset = 0; +	off_t dst_offset = 0; +	ssize_t copy_size = 0; +	ioc_inode_t *ioc_inode = page->inode; +   +	gf_log (frame->this->name, GF_LOG_DEBUG, +		"frame (%p) offset = %"PRId64" && size = %"GF_PRI_SIZET" " +		"&& page->size = %"GF_PRI_SIZET" && wait_count = %d",  +		frame, offset, size, page->size, local->wait_count); + +	/* immediately move this page to the end of the page_lru list */ +	list_move_tail (&page->page_lru, &ioc_inode->page_lru); +	/* fill local->pending_size bytes from local->pending_offset */ +	if (local->op_ret != -1 && page->size) { +		if (offset > page->offset) +			/* offset is offset in file, convert it to offset in  +			 * page */ +			src_offset = offset - page->offset; +		/*FIXME: since offset is the offset within page is the  +		 * else case valid? */ +		else +			/* local->pending_offset is in previous page. 
do not +			 * fill until we have filled all previous pages */ +			dst_offset = page->offset - offset; + +		/* we have to copy from offset to either end of this page  +		 * or till the requested size */ +		copy_size = min (page->size - src_offset, +				 size - dst_offset); + +		if (copy_size < 0) { +			/* if page contains fewer bytes and the required offset +			   is beyond the page size in the page */ +			copy_size = src_offset = 0; +		} +     +		gf_log (page->inode->table->xl->name, GF_LOG_DEBUG, +			"copy_size = %"GF_PRI_SIZET" && src_offset = " +			"%"PRId64" && dst_offset = %"PRId64"", +			copy_size, src_offset, dst_offset); + +		{ +			ioc_fill_t *new = CALLOC (1, sizeof (*new)); +			ERR_ABORT (new); +			new->offset = page->offset; +			new->size = copy_size; +			new->refs = dict_ref (page->ref); +			new->count = iov_subset (page->vector, +						 page->count, +						 src_offset, +						 src_offset + copy_size, +						 NULL); +			new->vector = CALLOC (new->count,  +					      sizeof (struct iovec)); +			ERR_ABORT (new->vector); +			new->count = iov_subset (page->vector, +						 page->count, +						 src_offset, +						 src_offset + copy_size, +						 new->vector); + + + +			/* add the ioc_fill to fill_list for this frame */ +			if (list_empty (&local->fill_list)) { +				/* if list is empty, then this is the first  +				 * time we are filling frame, add the  +				 * ioc_fill_t to the end of list */ +				list_add_tail (&new->list, &local->fill_list); +			} else { +				int8_t found = 0; +				/* list is not empty, we need to look for  +				 * where this offset fits in list */ +				list_for_each_entry (fill, &local->fill_list,  +						     list) { +					if (fill->offset > new->offset) { +						found = 1; +						break; +					} +				} + +				if (found) { +					found = 0; +					list_add_tail (&new->list,  +						       &fill->list); +				} else { +					list_add_tail (&new->list,  +						       &local->fill_list); +				} +			} +		} +		local->op_ret += copy_size; +	} +} + 
+/* + * ioc_frame_unwind - frame unwinds only from here  + * + * @frame: call frame to unwind + * + * to be used only by ioc_frame_return(), when a frame has + * finished waiting on all pages, required + * + */ +static void +ioc_frame_unwind (call_frame_t *frame) +{ +	ioc_local_t *local = frame->local; +	ioc_fill_t *fill = NULL, *next = NULL; +	int32_t count = 0; +	struct iovec *vector = NULL; +	int32_t copied = 0; +	dict_t *refs = NULL; +	struct stat stbuf = {0,}; +	int32_t op_ret = 0; + +	//  ioc_local_lock (local); +	refs = get_new_dict (); + +	frame->local = NULL; + +	if (list_empty (&local->fill_list)) { +		gf_log (frame->this->name, GF_LOG_DEBUG, +			"frame(%p) has 0 entries in local->fill_list " +			"(offset = %"PRId64" && size = %"GF_PRI_SIZET")", +			frame, local->offset, local->size); +	} + +	list_for_each_entry (fill, &local->fill_list, list) { +		count += fill->count; +	} + +	vector = CALLOC (count, sizeof (*vector)); +	ERR_ABORT (vector); +   +	list_for_each_entry_safe (fill, next, &local->fill_list, list) { +		memcpy (((char *)vector) + copied, +			fill->vector, +			fill->count * sizeof (*vector)); +     +		copied += (fill->count * sizeof (*vector)); + +		dict_copy (fill->refs, refs); + +		list_del (&fill->list); +		dict_unref (fill->refs); +		free (fill->vector); +		free (fill); +	} +   +	frame->root->rsp_refs = dict_ref (refs); +   +	op_ret = iov_length (vector, count); +	gf_log (frame->this->name, GF_LOG_DEBUG, +		"frame(%p) unwinding with op_ret=%d", frame, op_ret); + +	//  ioc_local_unlock (local); + +	STACK_UNWIND (frame, +		      op_ret, +		      local->op_errno, +		      vector, +		      count, +		      &stbuf); + +	dict_unref (refs); +     +	pthread_mutex_destroy (&local->local_lock); +	free (local); +	free (vector); + +	return; +} + +/* + * ioc_frame_return - + * @frame: + * + * to be called only when a frame is waiting on an in-transit page + */ +void +ioc_frame_return (call_frame_t *frame) +{ +	ioc_local_t *local = frame->local; +	int32_t 
wait_count; +	assert (local->wait_count > 0); + +	ioc_local_lock (local); +	{ +		wait_count = --local->wait_count; +	} +	ioc_local_unlock (local); + +	if (!wait_count) { +		ioc_frame_unwind (frame); +	}  + +	return; +} + +/*  + * ioc_page_wakeup - + * @page: + * + * to be called only when a frame is waiting on an in-transit page + */ +ioc_waitq_t * +ioc_page_wakeup (ioc_page_t *page) +{ +	ioc_waitq_t *waitq = NULL, *trav = NULL; +	call_frame_t *frame = NULL; + +	waitq = page->waitq; +	page->waitq = NULL; + +	trav = waitq; +	page->ready = 1; + +	gf_log (page->inode->table->xl->name, GF_LOG_DEBUG, +		"page is %p && waitq = %p", page, waitq); +   +	for (trav = waitq; trav; trav = trav->next) { +		frame = trav->data;  +		ioc_frame_fill (page, frame, trav->pending_offset,  +				trav->pending_size); +	} +	 +	return waitq; +} + + +/* + * ioc_page_error - + * @page: + * @op_ret: + * @op_errno: + * + */ +ioc_waitq_t * +ioc_page_error (ioc_page_t *page, +		int32_t op_ret, +		int32_t op_errno) +{ +	ioc_waitq_t *waitq = NULL, *trav = NULL; +	call_frame_t *frame = NULL; +	int64_t ret = 0; +	ioc_table_t *table = NULL; +	ioc_local_t *local = NULL; + +	waitq = page->waitq; +	page->waitq = NULL; +   +	gf_log (page->inode->table->xl->name, GF_LOG_DEBUG, +		"page error for page = %p & waitq = %p", page, waitq); + +	for (trav = waitq; trav; trav = trav->next) { + +		frame = trav->data; + +		local = frame->local; +		ioc_local_lock (local); +		{ +			if (local->op_ret != -1) { +				local->op_ret = op_ret; +				local->op_errno = op_errno; +			} +		} +		ioc_local_unlock (local); +	} + +	table = page->inode->table; +	ret = ioc_page_destroy (page); + +	if (ret != -1) { +		table->cache_used -= ret; +	} + +	return waitq; +} diff --git a/xlators/performance/io-threads/Makefile.am b/xlators/performance/io-threads/Makefile.am new file mode 100644 index 00000000000..d471a3f9243 --- /dev/null +++ b/xlators/performance/io-threads/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = src + +CLEANFILES =  diff --git 
a/xlators/performance/io-threads/src/Makefile.am b/xlators/performance/io-threads/src/Makefile.am new file mode 100644 index 00000000000..38dea3eb7fc --- /dev/null +++ b/xlators/performance/io-threads/src/Makefile.am @@ -0,0 +1,14 @@ +xlator_LTLIBRARIES = io-threads.la +xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/performance + +io_threads_la_LDFLAGS = -module -avoidversion  + +io_threads_la_SOURCES = io-threads.c +io_threads_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +noinst_HEADERS = io-threads.h + +AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\ +	-I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) + +CLEANFILES =  diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c new file mode 100644 index 00000000000..5acdd627da4 --- /dev/null +++ b/xlators/performance/io-threads/src/io-threads.c @@ -0,0 +1,1254 @@ +/* +  Copyright (c) 2006, 2007, 2008, 2009 Z RESEARCH, Inc. <http://www.zresearch.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. 
+*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "call-stub.h" +#include "glusterfs.h" +#include "logging.h" +#include "dict.h" +#include "xlator.h" +#include "io-threads.h" + +static void +iot_queue (iot_worker_t *worker, +           call_stub_t *stub); + +static call_stub_t * +iot_dequeue (iot_worker_t *worker); + +static iot_worker_t *  +iot_schedule (iot_conf_t *conf, +              iot_file_t *file, +              ino_t ino) +{ +	int32_t cnt = (ino % conf->thread_count); +	iot_worker_t *trav = conf->workers.next; + +	for (; cnt; cnt--) +		trav = trav->next; +   +	if (file) +		file->worker = trav; +	trav->fd_count++; +	return trav; +} + +int32_t +iot_open_cbk (call_frame_t *frame, +              void *cookie, +              xlator_t *this, +              int32_t op_ret, +              int32_t op_errno, +              fd_t *fd) +{ +	iot_conf_t *conf = this->private; + +	if (op_ret >= 0) { +		iot_file_t *file = CALLOC (1, sizeof (*file)); +		ERR_ABORT (file); + +		iot_schedule (conf, file, fd->inode->ino); +		file->fd = fd; + +		fd_ctx_set (fd, this, (uint64_t)(long)file); + +		pthread_mutex_lock (&conf->files_lock); +		file->next = &conf->files; +		file->prev = file->next->prev; +		file->next->prev = file; +		file->prev->next = file; +		pthread_mutex_unlock (&conf->files_lock); +	} +	STACK_UNWIND (frame, op_ret, op_errno, fd); +	return 0; +} + +int32_t +iot_open (call_frame_t *frame, +          xlator_t *this, +          loc_t *loc, +          int32_t flags, +	  fd_t *fd) +{ +	STACK_WIND (frame, +		    iot_open_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->open, +		    loc, +		    flags, +		    fd); +	return 0; +} + + +int32_t +iot_create_cbk (call_frame_t *frame, +		void *cookie, +		xlator_t *this, +		int32_t op_ret, +		int32_t op_errno, +		fd_t *fd, +		inode_t *inode, +		struct stat *stbuf) +{ +	iot_conf_t *conf = this->private; + +	if (op_ret >= 0) { +		iot_file_t *file = CALLOC (1, sizeof (*file)); +		
ERR_ABORT (file); + +		iot_schedule (conf, file, fd->inode->ino); +		file->fd = fd; + +		fd_ctx_set (fd, this, (uint64_t)(long)file); + +		pthread_mutex_lock (&conf->files_lock); +		file->next = &conf->files; +		file->prev = file->next->prev; +		file->next->prev = file; +		file->prev->next = file; +		pthread_mutex_unlock (&conf->files_lock); +	} +	STACK_UNWIND (frame, op_ret, op_errno, fd, inode, stbuf); +	return 0; +} + +int32_t +iot_create (call_frame_t *frame, +            xlator_t *this, +	    loc_t *loc, +            int32_t flags, +            mode_t mode, +	    fd_t *fd) +{ +	STACK_WIND (frame, +		    iot_create_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->create, +		    loc, +		    flags, +		    mode, +		    fd); +	return 0; +} + + + +int32_t +iot_readv_cbk (call_frame_t *frame, +               void *cookie, +               xlator_t *this, +               int32_t op_ret, +               int32_t op_errno, +               struct iovec *vector, +               int32_t count, +	       struct stat *stbuf) +{ +	iot_local_t *local = frame->local; + +	local->frame_size = 0; //iov_length (vector, count); + +	STACK_UNWIND (frame, op_ret, op_errno, vector, count, stbuf); + +	return 0; +} + +static int32_t +iot_readv_wrapper (call_frame_t *frame, +                   xlator_t *this, +                   fd_t *fd, +                   size_t size, +                   off_t offset) +{ +	STACK_WIND (frame, +		    iot_readv_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->readv, +		    fd, +		    size, +		    offset); +	return 0; +} + +int32_t +iot_readv (call_frame_t *frame, +           xlator_t *this, +           fd_t *fd, +           size_t size, +           off_t offset) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_file_t *file = NULL; +	iot_worker_t *worker = NULL; +	uint64_t tmp_file = 0; + +	if (fd_ctx_get (fd, this, &tmp_file)) { +		gf_log (this->name, GF_LOG_ERROR,  +			"fd context is NULL, returning EBADFD"); +		
STACK_UNWIND (frame, -1, EBADFD); +		return 0; +	} + +	file = (iot_file_t *)(long)tmp_file; +	worker = file->worker; + +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); +	frame->local = local; +   +	stub = fop_readv_stub (frame,  +			       iot_readv_wrapper, +			       fd, +			       size, +			       offset); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR,  +			"cannot get readv call stub"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL, 0); +		return 0; +	} + +	iot_queue (worker, stub); + +	return 0; +} + +int32_t +iot_flush_cbk (call_frame_t *frame, +               void *cookie, +               xlator_t *this, +               int32_t op_ret, +               int32_t op_errno) +{ +	STACK_UNWIND (frame, op_ret, op_errno); +	return 0; +} + +static int32_t +iot_flush_wrapper (call_frame_t *frame, +                   xlator_t *this, +                   fd_t *fd) +{ +	STACK_WIND (frame, +		    iot_flush_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->flush, +		    fd); +	return 0; +} + +int32_t +iot_flush (call_frame_t *frame, +           xlator_t *this, +           fd_t *fd) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_file_t *file = NULL; +	iot_worker_t *worker = NULL; +	uint64_t tmp_file = 0; + +	if (fd_ctx_get (fd, this, &tmp_file)) { +		gf_log (this->name, GF_LOG_ERROR,  +			"fd context is NULL, returning EBADFD"); +		STACK_UNWIND (frame, -1, EBADFD); +		return 0; +	} + +	file = (iot_file_t *)(long)tmp_file; +	worker = file->worker; + +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); + +	frame->local = local; +   +	stub = fop_flush_stub (frame, +			       iot_flush_wrapper, +			       fd); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get flush_cbk call stub"); +		STACK_UNWIND (frame, -1, ENOMEM); +		return 0; +	} +	iot_queue (worker, stub); + +	return 0; +} + +int32_t +iot_fsync_cbk (call_frame_t *frame, +               void *cookie, +               xlator_t *this, +               int32_t op_ret, +     
          int32_t op_errno) +{ +	STACK_UNWIND (frame, op_ret, op_errno); +	return 0; +} + +static int32_t +iot_fsync_wrapper (call_frame_t *frame, +                   xlator_t *this, +                   fd_t *fd, +                   int32_t datasync) +{ +	STACK_WIND (frame, +		    iot_fsync_cbk, +		    FIRST_CHILD (this), +		    FIRST_CHILD (this)->fops->fsync, +		    fd, +		    datasync); +	return 0; +} + +int32_t +iot_fsync (call_frame_t *frame, +           xlator_t *this, +           fd_t *fd, +           int32_t datasync) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_file_t *file = NULL; +	iot_worker_t *worker = NULL; +	uint64_t tmp_file = 0; + +	if (fd_ctx_get (fd, this, &tmp_file)) { +		gf_log (this->name, GF_LOG_ERROR,  +			"fd context is NULL, returning EBADFD"); +		STACK_UNWIND (frame, -1, EBADFD); +		return 0; +	} + +	file = (iot_file_t *)(long)tmp_file; +	worker = file->worker; + +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); + +	frame->local = local; +   +	stub = fop_fsync_stub (frame, +			       iot_fsync_wrapper, +			       fd, +			       datasync); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get fsync_cbk call stub"); +		STACK_UNWIND (frame, -1, ENOMEM); +		return 0; +	} +	iot_queue (worker, stub); + +	return 0; +} + +int32_t +iot_writev_cbk (call_frame_t *frame, +                void *cookie, +                xlator_t *this, +                int32_t op_ret, +                int32_t op_errno, +		struct stat *stbuf) +{ +	iot_local_t *local = frame->local; + +	local->frame_size = 0; /* hehe, caught me! 
*/ + +	STACK_UNWIND (frame, op_ret, op_errno, stbuf); +	return 0; +} + +static int32_t +iot_writev_wrapper (call_frame_t *frame, +                    xlator_t *this, +                    fd_t *fd, +                    struct iovec *vector, +                    int32_t count, +                    off_t offset) +{ +	STACK_WIND (frame, +		    iot_writev_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->writev, +		    fd, +		    vector, +		    count, +		    offset); +	return 0; +} + +int32_t +iot_writev (call_frame_t *frame, +            xlator_t *this, +            fd_t *fd, +            struct iovec *vector, +            int32_t count, +            off_t offset) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_file_t *file = NULL; +	iot_worker_t *worker = NULL; +	uint64_t tmp_file = 0; + +	if (fd_ctx_get (fd, this, &tmp_file)) { +		gf_log (this->name, GF_LOG_ERROR,  +			"fd context is NULL, returning EBADFD"); +		STACK_UNWIND (frame, -1, EBADFD); +		return 0; +	} + +	file = (iot_file_t *)(long)tmp_file; +	worker = file->worker; + +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); + +	if (frame->root->req_refs) +		local->frame_size = dict_serialized_length (frame->root->req_refs); +	else +		local->frame_size = iov_length (vector, count); +	frame->local = local; +   +	stub = fop_writev_stub (frame, iot_writev_wrapper, +				fd, vector, count, offset); + +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get writev call stub"); +		STACK_UNWIND (frame, -1, ENOMEM); +		return 0; +	} + +	iot_queue (worker, stub); + +	return 0; +} + + +int32_t +iot_lk_cbk (call_frame_t *frame, +            void *cookie, +            xlator_t *this, +            int32_t op_ret, +            int32_t op_errno, +            struct flock *flock) +{ +	STACK_UNWIND (frame, op_ret, op_errno, flock); +	return 0; +} + + +static int32_t +iot_lk_wrapper (call_frame_t *frame, +                xlator_t *this, +                fd_t *fd, +                int32_t cmd, 
+                struct flock *flock) +{ +	STACK_WIND (frame, +		    iot_lk_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->lk, +		    fd, +		    cmd, +		    flock); +	return 0; +} + + +int32_t +iot_lk (call_frame_t *frame, +	xlator_t *this, +	fd_t *fd, +	int32_t cmd, +	struct flock *flock) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_file_t *file = NULL; +	iot_worker_t *worker = NULL; +	uint64_t tmp_file = 0; + +	if (fd_ctx_get (fd, this, &tmp_file)) { +		gf_log (this->name, GF_LOG_ERROR,  +			"fd context is NULL, returning EBADFD"); +		STACK_UNWIND (frame, -1, EBADFD); +		return 0; +	} + +	file = (iot_file_t *)(long)tmp_file; +	worker = file->worker; + +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); +	frame->local = local; + +	stub = fop_lk_stub (frame, iot_lk_wrapper, +			    fd, cmd, flock); + +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get fop_lk call stub"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL); +		return 0; +	} +     +	iot_queue (worker, stub); + +	return 0; +} + + +int32_t  +iot_stat_cbk (call_frame_t *frame, +              void *cookie, +              xlator_t *this, +              int32_t op_ret, +              int32_t op_errno, +              struct stat *buf) +{ +	STACK_UNWIND (frame, op_ret, op_errno, buf); +	return 0; +} + + +static int32_t  +iot_stat_wrapper (call_frame_t *frame, +                  xlator_t *this, +                  loc_t *loc) +{ +	STACK_WIND (frame, +		    iot_stat_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->stat, +		    loc); +	return 0; +} + +int32_t  +iot_stat (call_frame_t *frame, +          xlator_t *this, +          loc_t *loc) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_worker_t *worker = NULL; +	iot_conf_t *conf; +	fd_t *fd = NULL; + +	conf = this->private; + +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); +	frame->local = local; + +	fd = fd_lookup (loc->inode, frame->root->pid); + +	if (fd == NULL) { +		
STACK_WIND(frame, +			   iot_stat_cbk, +			   FIRST_CHILD(this), +			   FIRST_CHILD(this)->fops->stat, +			   loc); +		return 0; +	}  +   +	fd_unref (fd); + +	worker = iot_schedule (conf, NULL, loc->inode->ino); + +	stub = fop_stat_stub (frame, +			      iot_stat_wrapper, +			      loc); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get fop_stat call stub"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL); +		return 0; +	} +	iot_queue (worker, stub); + +	return 0; +} + + +int32_t  +iot_fstat_cbk (call_frame_t *frame, +               void *cookie, +               xlator_t *this, +               int32_t op_ret, +               int32_t op_errno, +               struct stat *buf) +{ +	STACK_UNWIND (frame, op_ret, op_errno, buf); +	return 0; +} + +static int32_t  +iot_fstat_wrapper (call_frame_t *frame, +                   xlator_t *this, +                   fd_t *fd) +{ +	STACK_WIND (frame, +		    iot_fstat_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->fstat, +		    fd); +	return 0; +} + +int32_t  +iot_fstat (call_frame_t *frame, +           xlator_t *this, +           fd_t *fd) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_file_t *file = NULL; +	iot_worker_t *worker = NULL; +	uint64_t tmp_file = 0; + +	if (fd_ctx_get (fd, this, &tmp_file)) { +		gf_log (this->name, GF_LOG_ERROR,  +			"fd context is NULL, returning EBADFD"); +		STACK_UNWIND (frame, -1, EBADFD); +		return 0; +	} + +	file = (iot_file_t *)(long)tmp_file; +	worker = file->worker; + +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); +	frame->local = local; +	stub = fop_fstat_stub (frame, +			       iot_fstat_wrapper, +			       fd); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get fop_fstat call stub"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL); +		return 0; +	} + +	iot_queue (worker, stub); + +	return 0; +} + +int32_t  +iot_truncate_cbk (call_frame_t *frame, +                  void *cookie, +                  xlator_t *this, +                  
int32_t op_ret, +                  int32_t op_errno, +                  struct stat *buf) +{ +	STACK_UNWIND (frame, op_ret, op_errno, buf); +	return 0; +} + +static int32_t  +iot_truncate_wrapper (call_frame_t *frame, +                      xlator_t *this, +                      loc_t *loc, +                      off_t offset) +{ +	STACK_WIND (frame, +		    iot_truncate_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->truncate, +		    loc, +		    offset); +	return 0; +} + +int32_t  +iot_truncate (call_frame_t *frame, +              xlator_t *this, +              loc_t *loc, +              off_t offset) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_worker_t *worker = NULL; +	iot_conf_t *conf; +	fd_t *fd = NULL; +   +	conf = this->private; +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); +	frame->local = local; + +	fd = fd_lookup (loc->inode, frame->root->pid); + +	if (fd == NULL) { +		STACK_WIND(frame, +			   iot_truncate_cbk, +			   FIRST_CHILD(this), +			   FIRST_CHILD(this)->fops->truncate, +			   loc, +			   offset); +		return 0; +	}  +   +	fd_unref (fd); + +	worker = iot_schedule (conf, NULL, loc->inode->ino); + +	stub = fop_truncate_stub (frame, +				  iot_truncate_wrapper, +				  loc, +				  offset); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get fop_stat call stub"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL); +		return 0; +	} +	iot_queue (worker, stub); + +	return 0; +} + +int32_t  +iot_ftruncate_cbk (call_frame_t *frame, +                   void *cookie, +                   xlator_t *this, +                   int32_t op_ret, +                   int32_t op_errno, +                   struct stat *buf) +{ +	STACK_UNWIND (frame, op_ret, op_errno, buf); +	return 0; +} + +static int32_t  +iot_ftruncate_wrapper (call_frame_t *frame, +                       xlator_t *this, +                       fd_t *fd, +                       off_t offset) +{ +	STACK_WIND (frame, +		    iot_ftruncate_cbk, +		    
FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->ftruncate, +		    fd, +		    offset); +	return 0; +} + +int32_t  +iot_ftruncate (call_frame_t *frame, +               xlator_t *this, +               fd_t *fd, +               off_t offset) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_file_t *file = NULL; +	iot_worker_t *worker = NULL; +	uint64_t tmp_file = 0; + +	if (fd_ctx_get (fd, this, &tmp_file)) { +		gf_log (this->name, GF_LOG_ERROR,  +			"fd context is NULL, returning EBADFD"); +		STACK_UNWIND (frame, -1, EBADFD); +		return 0; +	} + +	file = (iot_file_t *)(long)tmp_file; +	worker = file->worker; + +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); +	frame->local = local; + +	stub = fop_ftruncate_stub (frame, +				   iot_ftruncate_wrapper, +				   fd, +				   offset); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get fop_ftruncate call stub"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL); +		return 0; +	} +	iot_queue (worker, stub); + +	return 0; +} + +int32_t  +iot_utimens_cbk (call_frame_t *frame, +                 void *cookie, +                 xlator_t *this, +                 int32_t op_ret, +                 int32_t op_errno, +                 struct stat *buf) +{ +	STACK_UNWIND (frame, op_ret, op_errno, buf); +	return 0; +} + +static int32_t  +iot_utimens_wrapper (call_frame_t *frame, +                     xlator_t *this, +                     loc_t *loc, +                     struct timespec tv[2]) +{ +	STACK_WIND (frame, +		    iot_utimens_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->utimens, +		    loc, +		    tv); +   +	return 0; +} + +int32_t  +iot_utimens (call_frame_t *frame, +             xlator_t *this, +             loc_t *loc, +             struct timespec tv[2]) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_worker_t *worker = NULL; +	iot_conf_t *conf; +	fd_t *fd = NULL; +   +	conf = this->private; + +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); +	frame->local = 
local; +   +	fd = fd_lookup (loc->inode, frame->root->pid); + +	if (fd == NULL) { +		STACK_WIND(frame, +			   iot_utimens_cbk, +			   FIRST_CHILD(this), +			   FIRST_CHILD(this)->fops->utimens, +			   loc, +			   tv); +		return 0; +	}  +   +	fd_unref (fd); + +	worker = iot_schedule (conf, NULL, loc->inode->ino); + +	stub = fop_utimens_stub (frame, +				 iot_utimens_wrapper, +				 loc, +				 tv); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get fop_utimens call stub"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL); +		return 0; +	} +	iot_queue (worker, stub); + +	return 0; +} + + +int32_t  +iot_checksum_cbk (call_frame_t *frame, +		  void *cookie, +		  xlator_t *this, +		  int32_t op_ret, +		  int32_t op_errno, +		  uint8_t *file_checksum, +		  uint8_t *dir_checksum) +{ +	STACK_UNWIND (frame, op_ret, op_errno, file_checksum, dir_checksum); +	return 0; +} + +static int32_t  +iot_checksum_wrapper (call_frame_t *frame, +		      xlator_t *this, +		      loc_t *loc, +		      int32_t flags) +{ +	STACK_WIND (frame, +		    iot_checksum_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->checksum, +		    loc, +		    flags); +   +	return 0; +} + +int32_t  +iot_checksum (call_frame_t *frame, +	      xlator_t *this, +	      loc_t *loc, +	      int32_t flags) +{ +	call_stub_t *stub = NULL; +	iot_local_t *local = NULL; +	iot_worker_t *worker = NULL; +	iot_conf_t *conf = NULL; +   +	conf = this->private; + +	local = CALLOC (1, sizeof (*local)); +	frame->local = local; + +	worker = iot_schedule (conf, NULL, conf->misc_thread_index++); + +	stub = fop_checksum_stub (frame, +				  iot_checksum_wrapper, +				  loc, +				  flags); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get fop_checksum call stub"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL, NULL); +		return 0; +	} +	iot_queue (worker, stub); + +	return 0; +} + + +int32_t  +iot_unlink_cbk (call_frame_t *frame, +		void *cookie, +		xlator_t *this, +		int32_t op_ret, +		int32_t op_errno) +{ +	
STACK_UNWIND (frame, op_ret, op_errno); +	return 0; +} + +static int32_t  +iot_unlink_wrapper (call_frame_t *frame, +		    xlator_t *this, +		    loc_t *loc) +{ +	STACK_WIND (frame, +		    iot_unlink_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->unlink, +		    loc); +   +	return 0; +} + +int32_t  +iot_unlink (call_frame_t *frame, +	    xlator_t *this, +	    loc_t *loc) +{ +	call_stub_t *stub = NULL; +	iot_local_t *local = NULL; +	iot_worker_t *worker = NULL; +	iot_conf_t *conf = NULL; + +	conf = this->private; + +	local = CALLOC (1, sizeof (*local)); +	frame->local = local; + +	worker = iot_schedule (conf, NULL, conf->misc_thread_index++); + +	stub = fop_unlink_stub (frame, iot_unlink_wrapper, loc); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get fop_unlink call stub"); +		STACK_UNWIND (frame, -1, ENOMEM); +		return 0; +	} +	iot_queue (worker, stub); + +	return 0; +} + +int32_t +iot_release (xlator_t *this, +	     fd_t *fd) +{ +	iot_file_t *file = NULL; +	iot_conf_t *conf = NULL; +	uint64_t tmp_file = 0; +	int ret = 0; + +	conf = this->private; +	ret = fd_ctx_del (fd, this, &tmp_file); +	if (ret) +		return 0; + +	file = (iot_file_t *)(long)tmp_file; + +	pthread_mutex_lock (&conf->files_lock); +	{ +		(file->prev)->next = file->next; +		(file->next)->prev = file->prev; +	} +	pthread_mutex_unlock (&conf->files_lock); + +	FREE (file); +	return 0; +} + + +static void +iot_queue (iot_worker_t *worker, +           call_stub_t *stub) +{ +	iot_queue_t *queue; +	iot_conf_t *conf = worker->conf; +	iot_local_t *local = stub->frame->local; +	size_t frame_size = local->frame_size; + +	queue = CALLOC (1, sizeof (*queue)); +	ERR_ABORT (queue); +	queue->stub = stub; + +	pthread_mutex_lock (&conf->lock); + +	/* +	  while (worker->queue_size >= worker->queue_limit) +	  pthread_cond_wait (&worker->q_cond, &worker->lock); +	*/ +	if (conf->cache_size) { +		while (frame_size && (conf->current_size >= conf->cache_size)) +			pthread_cond_wait 
(&conf->q_cond, &conf->lock); +	} + +	queue->next = &worker->queue; +	queue->prev = worker->queue.prev; + +	queue->next->prev = queue; +	queue->prev->next = queue; + +	/* dq_cond */ +	worker->queue_size++; +	worker->q++; + +	conf->current_size += local->frame_size; + +	pthread_cond_broadcast (&worker->dq_cond); + +	pthread_mutex_unlock (&conf->lock); +} + +static call_stub_t * +iot_dequeue (iot_worker_t *worker) +{ +	call_stub_t *stub = NULL; +	iot_queue_t *queue = NULL; +	iot_conf_t *conf = worker->conf; +	iot_local_t *local = NULL; + + +	pthread_mutex_lock (&conf->lock); + +	while (!worker->queue_size) +		/* +		  pthread_cond_wait (&worker->dq_cond, &worker->lock); +		*/ +		pthread_cond_wait (&worker->dq_cond, &conf->lock); + +	queue = worker->queue.next; + +	queue->next->prev = queue->prev; +	queue->prev->next = queue->next; + +	stub = queue->stub; +	local = stub->frame->local; + +	worker->queue_size--; +	worker->dq++; + +	/* q_cond */ +	conf->current_size -= local->frame_size; + +	pthread_cond_broadcast (&conf->q_cond); + +	pthread_mutex_unlock (&conf->lock); + +	FREE (queue); + +	return stub; +} + +static void * +iot_worker (void *arg) +{ +	iot_worker_t *worker = arg; + +	while (1) { +		call_stub_t *stub; + +		stub = iot_dequeue (worker); +		call_resume (stub); +	} +} + +#if 0 +static void * +iot_reply (void *arg) +{ +	iot_worker_t *reply = arg; + +	while (1) { +		call_stub_t *stub; + +		stub = iot_dequeue (reply); +		FREE (stub->frame->local); +		stub->frame->local = NULL; +		call_resume (stub); +	} +} +#endif + +static void +workers_init (iot_conf_t *conf) +{ +	int i; + +	conf->workers.next = &conf->workers; +	conf->workers.prev = &conf->workers; + +	for (i=0; i<conf->thread_count; i++) { + +		iot_worker_t *worker = CALLOC (1, sizeof (*worker)); +		ERR_ABORT (worker); + +		worker->next = &conf->workers; +		worker->prev = conf->workers.prev; +		worker->next->prev = worker; +		worker->prev->next = worker; + +		worker->queue.next = &worker->queue; +		
worker->queue.prev = &worker->queue; + +		/* +		  pthread_mutex_init (&worker->lock, NULL); +		  pthread_cond_init (&worker->q_cond, NULL); +		*/ +		pthread_cond_init (&worker->dq_cond, NULL); + +		/* +		  worker->queue_limit = conf->queue_limit; +		*/ + +		worker->conf = conf; + +		pthread_create (&worker->thread, NULL, iot_worker, worker); +	} +} + +int32_t  +init (xlator_t *this) +{ +	iot_conf_t *conf; +	dict_t *options = this->options; + +	if (!this->children || this->children->next) { +		gf_log ("io-threads", +			GF_LOG_ERROR, +			"FATAL: iot not configured with exactly one child"); +		return -1; +	} + +	if (!this->parents) { +		gf_log (this->name, GF_LOG_WARNING, +			"dangling volume. check volfile "); +	} + +	conf = (void *) CALLOC (1, sizeof (*conf)); +	ERR_ABORT (conf); + +	conf->thread_count = 1; + +	if (dict_get (options, "thread-count")) { +		conf->thread_count = data_to_int32 (dict_get (options, +							      "thread-count")); +		gf_log ("io-threads", +			GF_LOG_DEBUG, +			"Using conf->thread_count = %d", +			conf->thread_count); +	} + +	pthread_mutex_init (&conf->lock, NULL); +	pthread_cond_init (&conf->q_cond, NULL); + +	conf->files.next = &conf->files; +	conf->files.prev = &conf->files; +	pthread_mutex_init (&conf->files_lock, NULL); + +	workers_init (conf); + +	this->private = conf; +	return 0; +} + +void +fini (xlator_t *this) +{ +	iot_conf_t *conf = this->private; + +	FREE (conf); + +	this->private = NULL; +	return; +} + +struct xlator_fops fops = { +	.open        = iot_open, +	.create      = iot_create, +	.readv       = iot_readv, +	.writev      = iot_writev, +	.flush       = iot_flush, +	.fsync       = iot_fsync, +	.lk          = iot_lk, +	.stat        = iot_stat, +	.fstat       = iot_fstat, +	.truncate    = iot_truncate, +	.ftruncate   = iot_ftruncate, +	.utimens     = iot_utimens, +	.checksum    = iot_checksum, +	.unlink      = iot_unlink, +}; + +struct xlator_mops mops = { +}; + +struct xlator_cbks cbks = { +	.release = iot_release, +}; + 
+struct volume_options options[] = { +	{ .key  = {"thread-count"},  +	  .type = GF_OPTION_TYPE_INT,  +	  .min  = 1,  +	  .max  = 32 +	}, +	{ .key  = {NULL} }, +}; diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h new file mode 100644 index 00000000000..6595d3e277b --- /dev/null +++ b/xlators/performance/io-threads/src/io-threads.h @@ -0,0 +1,99 @@ +/* +   Copyright (c) 2006, 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com> +   This file is part of GlusterFS. + +   GlusterFS is free software; you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published +   by the Free Software Foundation; either version 3 of the License, +   or (at your option) any later version. + +   GlusterFS is distributed in the hope that it will be useful, but +   WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +   General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see +   <http://www.gnu.org/licenses/>. 
+*/ + +#ifndef __IOT_H +#define __IOT_H + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + + +#include "compat-errno.h" +#include "glusterfs.h" +#include "logging.h" +#include "dict.h" +#include "xlator.h" +#include "common-utils.h" + +#define min(a,b) ((a)<(b)?(a):(b)) +#define max(a,b) ((a)>(b)?(a):(b)) + +struct iot_conf; +struct iot_worker; +struct iot_queue; +struct iot_local; +struct iot_file; + +struct iot_local { +  struct iot_file *file; +  size_t frame_size; +}; + +struct iot_queue { +  struct iot_queue *next, *prev; +  call_stub_t *stub; +}; + +struct iot_worker { +  struct iot_worker *next, *prev; +  struct iot_queue queue; +  struct iot_conf *conf; +  int64_t q,dq; +  pthread_cond_t dq_cond; +  /* +    pthread_cond_t q_cond; +    pthread_mutex_t lock; +  */ +  int32_t fd_count; +  int32_t queue_size; +  /* +    int32_t queue_limit; +  */ +  pthread_t thread; +}; + +struct iot_file { +  struct iot_file *next, *prev; /* all open files via this xlator */ +  struct iot_worker *worker; +  fd_t *fd; +  int32_t pending_ops; +}; + +struct iot_conf { +  int32_t thread_count; +  int32_t misc_thread_index;  /* Used to schedule the miscellaneous calls like checksum */ +  struct iot_worker workers; +  struct iot_file files; +  pthread_mutex_t files_lock; + +  uint64_t cache_size; +  off_t current_size; +  pthread_cond_t q_cond; +  pthread_mutex_t lock; +}; + +typedef struct iot_file iot_file_t; +typedef struct iot_conf iot_conf_t; +typedef struct iot_local iot_local_t; +typedef struct iot_worker iot_worker_t; +typedef struct iot_queue iot_queue_t; + +#endif /* __IOT_H */ diff --git a/xlators/performance/read-ahead/Makefile.am b/xlators/performance/read-ahead/Makefile.am new file mode 100644 index 00000000000..d471a3f9243 --- /dev/null +++ b/xlators/performance/read-ahead/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = src + +CLEANFILES =  diff --git a/xlators/performance/read-ahead/src/Makefile.am b/xlators/performance/read-ahead/src/Makefile.am new file 
mode 100644 index 00000000000..7bb90228227 --- /dev/null +++ b/xlators/performance/read-ahead/src/Makefile.am @@ -0,0 +1,14 @@ +xlator_LTLIBRARIES = read-ahead.la +xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/performance + +read_ahead_la_LDFLAGS = -module -avoidversion + +read_ahead_la_SOURCES = read-ahead.c page.c +read_ahead_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +noinst_HEADERS = read-ahead.h + +AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\ +	-I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) + +CLEANFILES =  diff --git a/xlators/performance/read-ahead/src/page.c b/xlators/performance/read-ahead/src/page.c new file mode 100644 index 00000000000..3b8d4d2093e --- /dev/null +++ b/xlators/performance/read-ahead/src/page.c @@ -0,0 +1,487 @@ +/* +  Copyright (c) 2006, 2007, 2008, 2009 Z RESEARCH, Inc. <http://www.zresearch.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. 
+*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "glusterfs.h" +#include "logging.h" +#include "dict.h" +#include "xlator.h" +#include "read-ahead.h" +#include <assert.h> + + +ra_page_t * +ra_page_get (ra_file_t *file, +	     off_t offset) +{ +	ra_page_t *page = NULL; +	off_t      rounded_offset = 0; + +	page = file->pages.next; +	rounded_offset = floor (offset, file->page_size); + +	while (page != &file->pages && page->offset < rounded_offset) +		page = page->next; + +	if (page == &file->pages || page->offset != rounded_offset) +		page = NULL; + +	return page; +} + + +ra_page_t * +ra_page_create (ra_file_t *file, off_t offset) +{ +	ra_page_t *page      = NULL; +	off_t      rounded_offset = 0; +	ra_page_t *newpage   = NULL; + +	page           = file->pages.next; +	rounded_offset = floor (offset, file->page_size); + +	while (page != &file->pages && page->offset < rounded_offset) +		page = page->next; + +	if (page == &file->pages || page->offset != rounded_offset) { +		newpage = CALLOC (1, sizeof (*newpage)); +		if (!newpage) +			return NULL; + +		newpage->offset = rounded_offset; +		newpage->prev = page->prev; +		newpage->next = page; +		newpage->file = file; +		page->prev->next = newpage; +		page->prev = newpage; + +		page = newpage; +	} + +	return page; +} + + +void +ra_wait_on_page (ra_page_t *page, call_frame_t *frame) +{ +	ra_waitq_t *waitq = NULL; +	ra_local_t *local = NULL; + + +	local = frame->local; +	waitq = CALLOC (1, sizeof (*waitq)); +	if (!waitq) { +		gf_log (frame->this->name, GF_LOG_ERROR, +			"out of memory :("); +		return; +	} + +	waitq->data = frame; +	waitq->next = page->waitq; +	page->waitq = waitq; + +	ra_local_lock (local); +	{ +		local->wait_count++; +	} +	ra_local_unlock (local); +} + + +void +ra_waitq_return (ra_waitq_t *waitq) +{ +	ra_waitq_t   *trav = NULL; +	ra_waitq_t   *next = NULL; +	call_frame_t *frame = NULL; + +	for (trav = waitq; trav; trav = next) { +		next = trav->next; + +		frame = 
trav->data; +		ra_frame_return (frame); +		free (trav); +	} +} + + +int +ra_fault_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +	      int32_t op_ret, int32_t op_errno, struct iovec *vector, +	      int32_t count, struct stat *stbuf) +{ +	ra_local_t   *local = NULL; +	off_t         pending_offset = 0; +	ra_file_t    *file = NULL; +	ra_page_t    *page = NULL; +	off_t         trav_offset = 0; +	size_t        payload_size = 0; +	ra_waitq_t   *waitq = NULL; +	fd_t         *fd = NULL; +	int           ret = 0; +	uint64_t      tmp_file = 0; + +	local = frame->local; +	fd  = local->fd; + +	ret = fd_ctx_get (fd, this, &tmp_file); + +	file = (ra_file_t *)(long)tmp_file; +	pending_offset = local->pending_offset; +	trav_offset    = pending_offset;   +	payload_size   = op_ret; + +	ra_file_lock (file); +	{ +		if (op_ret >= 0) +			file->stbuf = *stbuf; + +		if (op_ret < 0) { +			page = ra_page_get (file, pending_offset); +			if (page) +				waitq = ra_page_error (page, op_ret, op_errno); +			goto unlock; +		} + +		page = ra_page_get (file, pending_offset); +		if (!page) { +			gf_log (this->name, GF_LOG_DEBUG, +				"wasted copy: %"PRId64"[+%"PRId64"] file=%p",  +				pending_offset, file->page_size, file); +			goto unlock; +		} + +		if (page->vector) { +			dict_unref (page->ref); +			free (page->vector); +		} + +		page->vector = iov_dup (vector, count); +		page->count = count; +		page->ref = dict_ref (frame->root->rsp_refs); +		page->ready = 1; + +		page->size = iov_length (vector, count); + +		waitq = ra_page_wakeup (page); +	} +unlock: +	ra_file_unlock (file); + +	ra_waitq_return (waitq); + +	fd_unref (local->fd); + +	free (frame->local); +	frame->local = NULL; + +	STACK_DESTROY (frame->root); +	return 0; +} + + +void +ra_page_fault (ra_file_t *file, +	       call_frame_t *frame, +	       off_t offset) +{ +	call_frame_t *fault_frame = NULL; +	ra_local_t   *fault_local = NULL; +     +	fault_frame = copy_frame (frame); +	fault_local = CALLOC (1, sizeof (ra_local_t)); + +	
fault_frame->local = fault_local; +	fault_local->pending_offset = offset; +	fault_local->pending_size = file->page_size; + +	fault_local->fd = fd_ref (file->fd); + +	STACK_WIND (fault_frame, ra_fault_cbk, +		    FIRST_CHILD (fault_frame->this), +		    FIRST_CHILD (fault_frame->this)->fops->readv, +		    file->fd, file->page_size, offset); +	return; +} + +void +ra_frame_fill (ra_page_t *page, call_frame_t *frame) +{ +	ra_local_t *local = NULL; +	ra_fill_t  *fill = NULL; +	off_t       src_offset = 0; +	off_t       dst_offset = 0; +	ssize_t     copy_size = 0; +	ra_fill_t  *new = NULL; + + +	local = frame->local; +	fill  = &local->fill; + +	if (local->op_ret != -1 && page->size) { +		if (local->offset > page->offset) +			src_offset = local->offset - page->offset; +		else +			dst_offset = page->offset - local->offset; + +		copy_size = min (page->size - src_offset, +				 local->size - dst_offset); + +		if (copy_size < 0) { +			/* if page contains fewer bytes and the required offset +			   is beyond the page size in the page */ +			copy_size = src_offset = 0; +		} + +		fill = fill->next; +		while (fill != &local->fill) { +			if (fill->offset > page->offset) { +				break; +			} +			fill = fill->next; +		} + +		new = CALLOC (1, sizeof (*new)); + +		new->offset = page->offset; +		new->size = copy_size; +		new->refs = dict_ref (page->ref); +		new->count = iov_subset (page->vector, page->count, +					 src_offset, src_offset+copy_size, +					 NULL); +		new->vector = CALLOC (new->count, sizeof (struct iovec)); + +		new->count = iov_subset (page->vector, page->count, +					 src_offset, src_offset+copy_size, +					 new->vector); + +		new->next = fill; +		new->prev = new->next->prev; +		new->next->prev = new; +		new->prev->next = new; + +		local->op_ret += copy_size; +	} +} + + +void +ra_frame_unwind (call_frame_t *frame) +{ +	ra_local_t   *local = NULL; +	ra_fill_t    *fill = NULL; +	int32_t       count = 0; +	struct iovec *vector; +	int32_t       copied = 0; +	dict_t       *refs 
= NULL; +	ra_fill_t    *next = NULL; +	fd_t         *fd = NULL; +	ra_file_t    *file = NULL; +	int           ret = 0; +	uint64_t      tmp_file = 0; + +	local = frame->local; +	fill  = local->fill.next; + +	refs  = get_new_dict (); + +	frame->local = NULL; + +	while (fill != &local->fill) { +		count += fill->count; +		fill = fill->next; +	} + +	vector = CALLOC (count, sizeof (*vector)); + +	fill = local->fill.next; + +	while (fill != &local->fill) { +		next = fill->next; + +		memcpy (((char *)vector) + copied, fill->vector, +			fill->count * sizeof (*vector)); + +		copied += (fill->count * sizeof (*vector)); +		dict_copy (fill->refs, refs); + +		fill->next->prev = fill->prev; +		fill->prev->next = fill->prev; + +		dict_unref (fill->refs); +		free (fill->vector); +		free (fill); + +		fill = next; +	} + +	frame->root->rsp_refs = dict_ref (refs); + +	fd = local->fd; +	ret = fd_ctx_get (fd, frame->this, &tmp_file); +	file = (ra_file_t *)(long)tmp_file; +	 +	STACK_UNWIND (frame, local->op_ret, local->op_errno, +		      vector, count, &file->stbuf); +   +	dict_unref (refs); +	pthread_mutex_destroy (&local->local_lock); +	free (local); +	free (vector); + +	return; +} + +/* + * ra_frame_return - + * @frame: + * + */ +void +ra_frame_return (call_frame_t *frame) +{ +	ra_local_t *local = NULL; +	int32_t     wait_count = 0; + +	local = frame->local; +	assert (local->wait_count > 0); + +	ra_local_lock (local); +	{ +		wait_count = --local->wait_count; +	} +	ra_local_unlock (local); + +	if (!wait_count) +		ra_frame_unwind (frame); + +	return; +} + +/*  + * ra_page_wakeup - + * @page: + * + */ +ra_waitq_t * +ra_page_wakeup (ra_page_t *page) +{ +	ra_waitq_t *waitq = NULL, *trav = NULL; +	call_frame_t *frame; + +	waitq = page->waitq; +	page->waitq = NULL; + +	trav = waitq; +	for (trav = waitq; trav; trav = trav->next) { +		frame = trav->data; +		ra_frame_fill (page, frame); +	} + +	return waitq; +} + +/* + * ra_page_purge - + * @page: + * + */ +void +ra_page_purge (ra_page_t *page) 
+{ +	page->prev->next = page->next; +	page->next->prev = page->prev; + +	if (page->ref) { +		dict_unref (page->ref); +	} +	free (page->vector); +	free (page); +} + +/* + * ra_page_error - + * @page: + * @op_ret: + * @op_errno: + * + */ +ra_waitq_t * +ra_page_error (ra_page_t *page, int32_t op_ret, int32_t op_errno) +{ + +	ra_waitq_t   *waitq = NULL; +	ra_waitq_t   *trav = NULL; +	call_frame_t *frame = NULL; +	ra_local_t   *local = NULL; + +	waitq = page->waitq; +	page->waitq = NULL; + +	trav = waitq; +	for (trav = waitq; trav; trav = trav->next) { +		frame = trav->data; + +		local = frame->local; +		if (local->op_ret != -1) { +			local->op_ret   = op_ret; +			local->op_errno = op_errno; +		} +	} + +	ra_page_purge (page); + +	return waitq; +} + +/*  + * ra_file_destroy - + * @file: + * + */ +void +ra_file_destroy (ra_file_t *file) +{ +	ra_conf_t *conf = NULL; +	ra_page_t *trav = NULL; + +	conf = file->conf; + +	ra_conf_lock (conf); +	{ +		file->prev->next = file->next; +		file->next->prev = file->prev; +	} +	ra_conf_unlock (conf); + +	trav = file->pages.next; +	while (trav != &file->pages) { +		ra_page_error (trav, -1, EINVAL); +		trav = file->pages.next; +	} + +	pthread_mutex_destroy (&file->file_lock); +	free (file); +} + diff --git a/xlators/performance/read-ahead/src/read-ahead.c b/xlators/performance/read-ahead/src/read-ahead.c new file mode 100644 index 00000000000..0060e00fd41 --- /dev/null +++ b/xlators/performance/read-ahead/src/read-ahead.c @@ -0,0 +1,890 @@ +/* +  Copyright (c) 2006, 2007, 2008, 2009 Z RESEARCH, Inc. <http://www.zresearch.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. 
+ +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +/*  +   TODO: +   - handle O_DIRECT +   - maintain offset, flush on lseek +   - ensure efficient memory managment in case of random seek +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "glusterfs.h" +#include "logging.h" +#include "dict.h" +#include "xlator.h" +#include "read-ahead.h" +#include <assert.h> +#include <sys/time.h> + + +static void +read_ahead (call_frame_t *frame, +            ra_file_t *file); + + +int +ra_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +             int32_t op_ret, int32_t op_errno, fd_t *fd) +{ +	ra_conf_t  *conf = NULL; +	ra_file_t  *file = NULL; +	int         ret = 0; + +	conf  = this->private; + +	if (op_ret == -1) { +		goto unwind; +	} + +	file = CALLOC (1, sizeof (*file)); +	if (!file) { +		gf_log (this->name, GF_LOG_ERROR, +			"out of memory :("); +		goto unwind; +	} + +	ret = fd_ctx_set (fd, this, (uint64_t)(long)file); + +	/* If mandatory locking has been enabled on this file, +	   we disable caching on it */ + +	if ((fd->inode->st_mode & S_ISGID) && !(fd->inode->st_mode & S_IXGRP)) +		file->disabled = 1; + +	/* If O_DIRECT open, we disable caching on it */ + +	if ((fd->flags & O_DIRECT) || (fd->flags & O_WRONLY)) +		file->disabled = 1; + +	file->offset = (unsigned long long) 0; +	file->conf = conf; +	file->pages.next = &file->pages; +	file->pages.prev = &file->pages; +	file->pages.offset = (unsigned long long) 0; +	file->pages.file = file; + +	ra_conf_lock (conf); +	{ +		file->next = conf->files.next; +		conf->files.next = file; +		file->next->prev = file; +		file->prev = 
&conf->files; +	} +	ra_conf_unlock (conf); + +	file->fd = fd; +	file->page_count = conf->page_count; +	file->page_size = conf->page_size; +	pthread_mutex_init (&file->file_lock, NULL); + +	if (!file->disabled) { +		file->page_count = 1; +	} + +unwind: +	STACK_UNWIND (frame, op_ret, op_errno, fd); + +	return 0; +} + + +int +ra_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +               int32_t op_ret, int32_t op_errno, +	       fd_t *fd, inode_t *inode, struct stat *buf) +{ +	ra_conf_t  *conf = NULL; +	ra_file_t  *file = NULL; +	int         ret = 0; + +	conf  = this->private; + +	if (op_ret == -1) { +		goto unwind; +	} + +	file = CALLOC (1, sizeof (*file)); +	if (!file) { +		gf_log (this->name, GF_LOG_ERROR, +			"out of memory :("); +		goto unwind; +	} + +	ret = fd_ctx_set (fd, this, (uint64_t)(long)file); + +	/* If mandatory locking has been enabled on this file, +	   we disable caching on it */ + +	if ((fd->inode->st_mode & S_ISGID) && !(fd->inode->st_mode & S_IXGRP)) +		file->disabled = 1; + +	/* If O_DIRECT open, we disable caching on it */ + +	if ((fd->flags & O_DIRECT) || (fd->flags & O_WRONLY)) +			file->disabled = 1; + +	file->offset = (unsigned long long) 0; +	//file->size = fd->inode->buf.st_size; +	file->conf = conf; +	file->pages.next = &file->pages; +	file->pages.prev = &file->pages; +	file->pages.offset = (unsigned long long) 0; +	file->pages.file = file; + +	ra_conf_lock (conf); +	{ +		file->next = conf->files.next; +		conf->files.next = file; +		file->next->prev = file; +		file->prev = &conf->files; +	} +	ra_conf_unlock (conf); + +	file->fd = fd; +	file->page_count = conf->page_count; +	file->page_size = conf->page_size; +	pthread_mutex_init (&file->file_lock, NULL); + +unwind: +	STACK_UNWIND (frame, op_ret, op_errno, fd, inode, buf); + +	return 0; +} + + +int +ra_open (call_frame_t *frame, xlator_t *this, +         loc_t *loc, int32_t flags, fd_t *fd) +{ +	STACK_WIND (frame, ra_open_cbk, +		    FIRST_CHILD (this), +		    
FIRST_CHILD (this)->fops->open, +		    loc, flags, fd); + +	return 0; +} + +int +ra_create (call_frame_t *frame, xlator_t *this, +	   loc_t *loc, int32_t flags, mode_t mode, fd_t *fd) +{ +	STACK_WIND (frame, ra_create_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->create, +		    loc, flags, mode, fd); + +	return 0; +} + +/* free cache pages between offset and offset+size, +   does not touch pages with frames waiting on it +*/ + +static void +flush_region (call_frame_t *frame, +              ra_file_t *file, +              off_t offset, +              off_t size) +{ +	ra_page_t *trav = NULL; +	ra_page_t *next = NULL; + + +	ra_file_lock (file); +	{ +		trav = file->pages.next; +		while (trav != &file->pages +		       && trav->offset < (offset + size)) { + +			next = trav->next; +			if (trav->offset >= offset && !trav->waitq) { +				ra_page_purge (trav); +			} +			trav = next; +		} +	} +	ra_file_unlock (file); +} + + + +int +ra_release (xlator_t *this, +	    fd_t *fd) +{ +	uint64_t tmp_file = 0; +	int      ret = 0; + +	ret = fd_ctx_del (fd, this, &tmp_file); +	 +	if (!ret) { +		ra_file_destroy ((ra_file_t *)(long)tmp_file); +	} + +	return 0; +} + + +void +read_ahead (call_frame_t *frame, ra_file_t *file) +{ +	off_t      ra_offset = 0; +	size_t     ra_size = 0; +	off_t      trav_offset = 0; +	ra_page_t *trav = NULL; +	off_t      cap = 0; +	char       fault = 0; + +	if (!file->page_count) +		return; + +	ra_size   = file->page_size * file->page_count; +	ra_offset = floor (file->offset, file->page_size); +	cap       = file->size ? file->size : file->offset + ra_size; + +	while (ra_offset < min (file->offset + ra_size, cap)) { + +		ra_file_lock (file); +		{ +			trav = ra_page_get (file, ra_offset); +		} +		ra_file_unlock (file); + +		if (!trav) +			break; + +		ra_offset += file->page_size; +	} + +	if (trav) +		/* comfortable enough */ +		return; + +	trav_offset = ra_offset; + +	trav = file->pages.next; +	cap  = file->size ? 
file->size : ra_offset + ra_size; + +	while (trav_offset < min(ra_offset + ra_size, cap)) { +		fault = 0; +		ra_file_lock (file); +		{ +			trav = ra_page_get (file, trav_offset); +			if (!trav) { +				fault = 1; +				trav = ra_page_create (file, trav_offset); +				if (trav)  +					trav->dirty = 1; +			} +		} +		ra_file_unlock (file); + +		if (!trav) { +			/* OUT OF MEMORY */ +			break; +		} + +		if (fault) { +			gf_log (frame->this->name, GF_LOG_DEBUG, +				"RA at offset=%"PRId64, trav_offset); +			ra_page_fault (file, frame, trav_offset); +		} +		trav_offset += file->page_size; +	} + +	return; +} + + +int +ra_need_atime_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                   int32_t op_ret, int32_t op_errno, struct iovec *vector, +                   int32_t count, struct stat *stbuf) +{ +	STACK_DESTROY (frame->root); +	return 0; +} + + +static void +dispatch_requests (call_frame_t *frame, +                   ra_file_t *file) +{ +	ra_local_t   *local = NULL; +	ra_conf_t    *conf = NULL; +	off_t         rounded_offset = 0; +	off_t         rounded_end = 0; +	off_t         trav_offset = 0; +	ra_page_t    *trav = NULL; +	call_frame_t *ra_frame = NULL; +	char          need_atime_update = 1; +	char          fault = 0; + + +	local = frame->local; +	conf  = file->conf; + +	rounded_offset = floor (local->offset, file->page_size); +	rounded_end    = roof (local->offset + local->size, file->page_size); + +	trav_offset = rounded_offset; +	trav        = file->pages.next; + +	while (trav_offset < rounded_end) { +		fault = 0; + +		ra_file_lock (file); +		{ +			trav = ra_page_get (file, trav_offset); +			if (!trav) { +				trav = ra_page_create (file, trav_offset); +				fault = 1; +				need_atime_update = 0; +			} + +			if (!trav) +				goto unlock; + +			if (trav->ready) { +				gf_log (frame->this->name, GF_LOG_DEBUG, +					"HIT at offset=%"PRId64".", +					trav_offset); +				ra_frame_fill (trav, frame); +			} else { +				gf_log (frame->this->name, GF_LOG_DEBUG, 
+					"IN-TRANSIT at offset=%"PRId64".", +					trav_offset); +				ra_wait_on_page (trav, frame); +				need_atime_update = 0; +			} +		} +	unlock: +		ra_file_unlock (file); + +		if (fault) { +			gf_log (frame->this->name, GF_LOG_DEBUG, +				"MISS at offset=%"PRId64".", +				trav_offset); +			ra_page_fault (file, frame, trav_offset); +		} + +		trav_offset += file->page_size; +	} + +	if (need_atime_update && conf->force_atime_update) { +		/* TODO: use untimens() since readv() can confuse underlying +		   io-cache and others */ +		ra_frame = copy_frame (frame); +		STACK_WIND (ra_frame, ra_need_atime_cbk, +			    FIRST_CHILD (frame->this),  +			    FIRST_CHILD (frame->this)->fops->readv, +			    file->fd, 1, 1); +	} + +	return ; +} + + +int +ra_readv_disabled_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                       int32_t op_ret, int32_t op_errno, +		       struct iovec *vector, int32_t count, struct stat *stbuf) +{ +	STACK_UNWIND (frame, op_ret, op_errno, vector, count, stbuf); + +	return 0; +} + + +int +ra_readv (call_frame_t *frame, xlator_t *this, +	  fd_t *fd, size_t size, off_t offset) +{ +	ra_file_t    *file = NULL; +	ra_local_t   *local = NULL; +	ra_conf_t    *conf = NULL; +	int           op_errno = 0; +	int           ret = 0; +	char expected_offset = 1; +	uint64_t tmp_file = 0; + +	conf = this->private; + +	gf_log (this->name, GF_LOG_DEBUG, +		"NEW REQ at offset=%"PRId64" for size=%"GF_PRI_SIZET"", +		offset, size); + +	ret = fd_ctx_get (fd, this, &tmp_file); +	file = (ra_file_t *)(long)tmp_file; + +	if (file->offset != offset) { +		gf_log (this->name, GF_LOG_DEBUG, +			"unexpected offset (%"PRId64" != %"PRId64") resetting", +			file->offset, offset); + +		expected_offset = file->expected = file->page_count = 0; +	} else { +		gf_log (this->name, GF_LOG_DEBUG, +			"expected offset (%"PRId64") when page_count=%d", +			offset, file->page_count); + +		if (file->expected < (conf->page_size * conf->page_count)) { +			file->expected += size; +		
	file->page_count = min ((file->expected / file->page_size), +						conf->page_count); +		} +	} + +	if (!expected_offset) { +		flush_region (frame, file, 0, file->pages.prev->offset + 1); +	} + +	if (file->disabled) { +		STACK_WIND (frame, ra_readv_disabled_cbk, +			    FIRST_CHILD (frame->this),  +			    FIRST_CHILD (frame->this)->fops->readv, +			    file->fd, size, offset); +		return 0; +	} + +	local = (void *) CALLOC (1, sizeof (*local)); +	if (!local) { +		gf_log (this->name, GF_LOG_ERROR, +			"out of memory :("); +		op_errno = ENOMEM; +		goto unwind; +	} + +	local->fd         = fd; +	local->offset     = offset; +	local->size       = size; +	local->wait_count = 1; + +	local->fill.next  = &local->fill; +	local->fill.prev  = &local->fill; + +	pthread_mutex_init (&local->local_lock, NULL); + +	frame->local = local; + +	dispatch_requests (frame, file); + +	flush_region (frame, file, 0, floor (offset, file->page_size)); + +	read_ahead (frame, file); + +	ra_frame_return (frame); + +	file->offset = offset + size; + +	return 0; + +unwind: +	STACK_UNWIND (frame, -1, op_errno, NULL, 0, NULL); + +	return 0; +} + + +int +ra_flush_cbk (call_frame_t *frame, +              void *cookie, +              xlator_t *this, +              int32_t op_ret, +              int32_t op_errno) +{ +	STACK_UNWIND (frame, op_ret, op_errno); +	return 0; +} + + +int +ra_flush (call_frame_t *frame, xlator_t *this, fd_t *fd) +{ +	ra_file_t *file = NULL; +	int        ret = 0; +	uint64_t tmp_file = 0; + +	ret = fd_ctx_get (fd, this, &tmp_file); +	file = (ra_file_t *)(long)tmp_file; + +	if (file) { +		flush_region (frame, file, 0, file->pages.prev->offset+1); +	} + +	STACK_WIND (frame, ra_flush_cbk, +		    FIRST_CHILD (this), +		    FIRST_CHILD (this)->fops->flush, +		    fd); +	return 0; +} + + +int +ra_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, +          int32_t datasync) +{ +	ra_file_t *file = NULL; +	int        ret = 0; +	uint64_t tmp_file = 0; + +	ret = fd_ctx_get (fd, this, 
&tmp_file); +	file = (ra_file_t *)(long)tmp_file; + +	if (file) { +		flush_region (frame, file, 0, file->pages.prev->offset+1); +	} + +	STACK_WIND (frame, ra_flush_cbk, +		    FIRST_CHILD (this), +		    FIRST_CHILD (this)->fops->fsync, +		    fd, datasync); +	return 0; +} + + +int +ra_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +               int32_t op_ret, int32_t op_errno, struct stat *stbuf) +{ +	fd_t      *fd = NULL; +	ra_file_t *file = NULL; +	int        ret = 0; +	uint64_t tmp_file = 0; + +	fd = frame->local; + +	ret = fd_ctx_get (fd, this, &tmp_file); +	file = (ra_file_t *)(long)tmp_file; + +	if (file) { +		flush_region (frame, file, 0, file->pages.prev->offset+1); +	} + +	frame->local = NULL; +	STACK_UNWIND (frame, op_ret, op_errno, stbuf); +	return 0; +} + + +int +ra_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, +           struct iovec *vector, int32_t count, off_t offset) +{ +	ra_file_t *file = NULL; +	int        ret = 0; +	uint64_t tmp_file = 0; + +	ret = fd_ctx_get (fd, this, &tmp_file); +	file = (ra_file_t *)(long)tmp_file; + +	if (file) { +		flush_region (frame, file, 0, file->pages.prev->offset+1); + +		/* reset the read-ahead counters too */ +		file->expected = file->page_count = 0; +	} + +	frame->local = fd; + +	STACK_WIND (frame, ra_writev_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->writev, +		    fd, vector, count, offset); + +	return 0; +} + + +int +ra_attr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +	     int32_t op_ret, int32_t op_errno, struct stat *buf) +{ +	STACK_UNWIND (frame, op_ret, op_errno, buf); +	return 0; +} + + +int +ra_truncate (call_frame_t *frame, xlator_t *this, +             loc_t *loc, off_t offset) +{ +	ra_file_t *file = NULL; +	fd_t      *iter_fd = NULL; +	inode_t   *inode = NULL; +	int        ret = 0; +	uint64_t tmp_file = 0; + +	inode = loc->inode; + +	LOCK (&inode->lock); +	{ +		list_for_each_entry (iter_fd, &inode->fd_list, inode_list) { +			ret = fd_ctx_get 
(iter_fd, this, &tmp_file); +			file = (ra_file_t *)(long)tmp_file; + +			if (!file) +				continue; +			flush_region (frame, file, 0, +				      file->pages.prev->offset + 1); +		} +	} +	UNLOCK (&inode->lock); + +	STACK_WIND (frame, ra_attr_cbk, +		    FIRST_CHILD (this), +		    FIRST_CHILD (this)->fops->truncate, +		    loc, offset); +	return 0; +} + + +int +ra_fstat (call_frame_t *frame, xlator_t *this, +	  fd_t *fd) +{ +	ra_file_t *file = NULL; +	fd_t      *iter_fd = NULL; +	inode_t   *inode = NULL; +	int        ret = 0; +	uint64_t tmp_file = 0; + +	inode = fd->inode; + +	LOCK (&inode->lock); +	{ +		list_for_each_entry (iter_fd, &inode->fd_list, inode_list) { +			ret = fd_ctx_get (iter_fd, this, &tmp_file); +			file = (ra_file_t *)(long)tmp_file; + +			if (!file) +				continue; +			flush_region (frame, file, 0, +				      file->pages.prev->offset + 1); +		} +	} +	UNLOCK (&inode->lock); + +	STACK_WIND (frame, ra_attr_cbk, +		    FIRST_CHILD (this), +		    FIRST_CHILD (this)->fops->fstat, +		    fd); +	return 0; +} + + +int +ra_fchown (call_frame_t *frame, xlator_t *this, +	   fd_t *fd, uid_t uid, gid_t gid) +{ +	ra_file_t *file = NULL; +	fd_t      *iter_fd = NULL; +	inode_t   *inode = NULL; +	int        ret = 0; +	uint64_t tmp_file = 0; + +	inode = fd->inode; + +	LOCK (&inode->lock); +	{ +		list_for_each_entry (iter_fd, &inode->fd_list, inode_list) { +			ret = fd_ctx_get (iter_fd, this, &tmp_file); +			file = (ra_file_t *)(long)tmp_file; + +			if (!file) +				continue; +			flush_region (frame, file, 0, +				      file->pages.prev->offset + 1); +		} +	} +	UNLOCK (&inode->lock); + +	STACK_WIND (frame, ra_attr_cbk, +		    FIRST_CHILD (this), +		    FIRST_CHILD (this)->fops->fchown, +		    fd, uid, gid); +	return 0; +} + + +int +ra_ftruncate (call_frame_t *frame, xlator_t *this, +              fd_t *fd, off_t offset) +{ +	ra_file_t *file = NULL; +	fd_t      *iter_fd = NULL; +	inode_t   *inode = NULL; +	int        ret = 0; +	uint64_t tmp_file = 0; + +	inode = 
fd->inode; + +	LOCK (&inode->lock); +	{ +		list_for_each_entry (iter_fd, &inode->fd_list, inode_list) { +			ret = fd_ctx_get (iter_fd, this, &tmp_file); +			file = (ra_file_t *)(long)tmp_file; +			if (!file) +				continue; +			flush_region (frame, file, 0, +				      file->pages.prev->offset + 1); +		} +	} +	UNLOCK (&inode->lock); + +	STACK_WIND (frame, ra_attr_cbk, +		    FIRST_CHILD (this), +		    FIRST_CHILD (this)->fops->ftruncate, +		    fd, offset); +	return 0; +} + + +int +init (xlator_t *this) +{ +	ra_conf_t *conf; +	dict_t *options = this->options; +	char *page_size_string = NULL; +	char *page_count_string = NULL; + +	if (!this->children || this->children->next) { +		gf_log (this->name,  GF_LOG_ERROR, +			"FATAL: read-ahead not configured with exactly one child"); +		return -1; +	} + +	if (!this->parents) { +		gf_log (this->name, GF_LOG_WARNING, +			"dangling volume. check volfile "); +	} +  +	conf = (void *) CALLOC (1, sizeof (*conf)); +	ERR_ABORT (conf); +	conf->page_size = 256 * 1024; +	conf->page_count = 2; + +	if (dict_get (options, "page-size")) +		page_size_string = data_to_str (dict_get (options, +							  "page-size")); +	if (page_size_string) +	{ +		if (gf_string2bytesize (page_size_string, &conf->page_size) != 0) +		{ +			gf_log ("read-ahead",  +				GF_LOG_ERROR,  +				"invalid number format \"%s\" of \"option page-size\"",  +				page_size_string); +			return -1; +		} +       +		gf_log (this->name, GF_LOG_DEBUG, "Using conf->page_size = %"PRIu64"", +			conf->page_size); +	} +   +	if (dict_get (options, "page-count")) +		page_count_string = data_to_str (dict_get (options,  +							   "page-count")); +	if (page_count_string) +	{ +		if (gf_string2uint_base10 (page_count_string, &conf->page_count) != 0) +		{ +			gf_log ("read-ahead",  +				GF_LOG_ERROR,  +				"invalid number format \"%s\" of \"option page-count\"",  +				page_count_string); +			return -1; +		} +		gf_log (this->name, GF_LOG_DEBUG, "Using conf->page_count = %u", +			conf->page_count); 
+	} +   +	if (dict_get (options, "force-atime-update")) { +		char *force_atime_update_str = data_to_str (dict_get (options, +								      "force-atime-update")); +		if (gf_string2boolean (force_atime_update_str, &conf->force_atime_update) == -1) { +			gf_log (this->name, GF_LOG_ERROR, +				"'force-atime-update' takes only boolean options"); +			return -1; +		} +		if (conf->force_atime_update) +			gf_log (this->name, GF_LOG_DEBUG, "Forcing atime updates on cache hit"); +	} + +	conf->files.next = &conf->files; +	conf->files.prev = &conf->files; + +	pthread_mutex_init (&conf->conf_lock, NULL); +	this->private = conf; +	return 0; +} + +void +fini (xlator_t *this) +{ +	ra_conf_t *conf = this->private; + +	pthread_mutex_destroy (&conf->conf_lock); +	FREE (conf); + +	this->private = NULL; +	return; +} + +struct xlator_fops fops = { +	.open        = ra_open, +	.create      = ra_create, +	.readv       = ra_readv, +	.writev      = ra_writev, +	.flush       = ra_flush, +	.fsync       = ra_fsync, +	.truncate    = ra_truncate, +	.ftruncate   = ra_ftruncate, +	.fstat       = ra_fstat, +	.fchown      = ra_fchown, +}; + +struct xlator_mops mops = { +}; + +struct xlator_cbks cbks = { +	.release       = ra_release, +}; + +struct volume_options options[] = { +	{ .key  = {"force-atime-update"},  +	  .type = GF_OPTION_TYPE_BOOL  +	}, +	{ .key  = {"page-size"},  +	  .type = GF_OPTION_TYPE_SIZET,  +	  .min  = 64 * GF_UNIT_KB,  +	  .max  = 2 * GF_UNIT_MB  +	}, +	{ .key  = {"page-count"},  +	  .type = GF_OPTION_TYPE_INT,  +	  .min  = 1,  +	  .max  = 16  +	}, +	{ .key = {NULL} }, +}; diff --git a/xlators/performance/read-ahead/src/read-ahead.h b/xlators/performance/read-ahead/src/read-ahead.h new file mode 100644 index 00000000000..d624ca8abc8 --- /dev/null +++ b/xlators/performance/read-ahead/src/read-ahead.h @@ -0,0 +1,194 @@ +/* +  Copyright (c) 2006, 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com> +  This file is part of GlusterFS. 

  GlusterFS is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published
  by the Free Software Foundation; either version 3 of the License,
  or (at your option) any later version.

  GlusterFS is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program.  If not, see
  <http://www.gnu.org/licenses/>.
*/

#ifndef __READ_AHEAD_H
#define __READ_AHEAD_H

#ifndef _CONFIG_H
#define _CONFIG_H
#include "config.h"
#endif


#include "glusterfs.h"
#include "logging.h"
#include "dict.h"
#include "xlator.h"
#include "common-utils.h"

struct ra_conf;
struct ra_local;
struct ra_page;
struct ra_file;
struct ra_waitq;


/* Singly linked list node carrying one opaque data pointer. */
struct ra_waitq {
	struct ra_waitq *next;
	void            *data;
};


/*
 * Doubly linked (next/prev) node describing one offset/size region
 * together with an iovec array of `count' entries and a dict
 * reference.
 * NOTE(review): presumably one filled fragment of a read reply --
 * confirm against page.c.
 */
struct ra_fill {
	struct ra_fill *next;
	struct ra_fill *prev;
	off_t           offset;
	size_t          size;
	struct iovec   *vector;
	int32_t         count;
	dict_t         *refs;
};


/*
 * Per-call state.  `fill' is an embedded list head of ra_fill nodes;
 * `local_lock' is the mutex taken by ra_local_lock()/ra_local_unlock().
 */
struct ra_local {
	mode_t            mode;
	struct ra_fill    fill;
	off_t             offset;
	size_t            size;
	int32_t           op_ret;
	int32_t           op_errno;
	off_t             pending_offset;
	size_t            pending_size;
	fd_t             *fd;
	int32_t           wait_count;
	pthread_mutex_t   local_lock;
};


/*
 * One page belonging to `file', linked through next/prev, with a
 * singly linked wait queue hanging off `waitq'.
 * NOTE(review): exact meaning of `dirty' and `ready' is not visible
 * in this header -- confirm in page.c.
 */
struct ra_page {
	struct ra_page   *next;
	struct ra_page   *prev;
	struct ra_file   *file;
	char              dirty;
	char              ready;
	struct iovec     *vector;
	int32_t           count;
	off_t             offset;
	size_t            size;
	struct ra_waitq  *waitq;
	dict_t           *ref;
};


/*
 * Per-fd state.  `pages' is an embedded list head of ra_page nodes;
 * `file_lock' is the mutex taken by ra_file_lock()/ra_file_unlock().
 * page_size/page_count mirror the fields of the same name in ra_conf.
 */
struct ra_file {
	struct ra_file    *next;
	struct ra_file    *prev;
	struct ra_conf    *conf;
	fd_t              *fd;
	int                disabled;
	size_t             expected;
	struct ra_page     pages;
	off_t              offset;
	size_t             size;
	int32_t            refcount;
	pthread_mutex_t    file_lock;
	struct stat        stbuf;
	uint64_t           page_size;
	uint32_t           page_count;
};


/*
 * Translator-wide configuration.  `files' is an embedded list head
 * (init() points its next/prev at itself); `conf_lock' is the mutex
 * taken by ra_conf_lock()/ra_conf_unlock().
 */
struct ra_conf {
	uint64_t          page_size;
	uint32_t          page_count;
	void             *cache_block;
	struct ra_file    files;
	gf_boolean_t      force_atime_update;
	pthread_mutex_t   conf_lock;
};


typedef struct ra_conf ra_conf_t;
typedef struct ra_local ra_local_t;
typedef struct ra_page ra_page_t;
typedef struct ra_file ra_file_t;
typedef struct ra_waitq ra_waitq_t;
typedef struct ra_fill ra_fill_t;

/* Page helpers; the definitions are not part of this header. */
ra_page_t *
ra_page_get (ra_file_t *file,
	     off_t offset);
ra_page_t *
ra_page_create (ra_file_t *file,
		off_t offset);
void
ra_page_fault (ra_file_t *file,
	       call_frame_t *frame,
	       off_t offset);
void
ra_wait_on_page (ra_page_t *page,
		 call_frame_t *frame);
ra_waitq_t *
ra_page_wakeup (ra_page_t *page);

void
ra_page_flush (ra_page_t *page);

ra_waitq_t *
ra_page_error (ra_page_t *page,
	       int32_t op_ret,
	       int32_t op_errno);
void
ra_page_purge (ra_page_t *page);

void
ra_frame_return (call_frame_t *frame);
void
ra_frame_fill (ra_page_t *page,
	       call_frame_t *frame);

void
ra_file_destroy (ra_file_t *file);

/* Acquire file->file_lock. */
static inline void
ra_file_lock (ra_file_t *file)
{
	pthread_mutex_lock (&file->file_lock);
}

/* Release file->file_lock. */
static inline void
ra_file_unlock (ra_file_t *file)
{
	pthread_mutex_unlock (&file->file_lock);
}

/* Acquire conf->conf_lock. */
static inline void
ra_conf_lock (ra_conf_t *conf)
{
	pthread_mutex_lock (&conf->conf_lock);
}

/* Release conf->conf_lock. */
static inline void
ra_conf_unlock (ra_conf_t *conf)
{
	pthread_mutex_unlock (&conf->conf_lock);
}
/* Acquire local->local_lock. */
static inline void
ra_local_lock (ra_local_t *local)
{
	pthread_mutex_lock (&local->local_lock);
}

/* Release local->local_lock. */
static inline void
ra_local_unlock (ra_local_t *local)
{
	pthread_mutex_unlock (&local->local_lock);
}

#endif /* __READ_AHEAD_H */
diff --git a/xlators/performance/stat-prefetch/Makefile.am b/xlators/performance/stat-prefetch/Makefile.am new file mode 100644 index 00000000000..af437a64d6d --- /dev/null +++ b/xlators/performance/stat-prefetch/Makefile.am @@ -0,0 +1 @@ +SUBDIRS = src diff --git a/xlators/performance/stat-prefetch/src/Makefile.am b/xlators/performance/stat-prefetch/src/Makefile.am new file mode 100644 index 00000000000..e52f2df48fd --- /dev/null +++ b/xlators/performance/stat-prefetch/src/Makefile.am @@ -0,0 +1,11 @@ +xlator_PROGRAMS = stat-prefetch.so +xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/performance + +stat_prefetch_so_SOURCES = stat-prefetch.c +noinst_HEADERS = stat-prefetch.h + +AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall \ +	-I$(top_srcdir)/libglusterfs/src -shared -nostartfiles + +CLEANFILES =  + diff --git a/xlators/performance/stat-prefetch/src/stat-prefetch.c b/xlators/performance/stat-prefetch/src/stat-prefetch.c new file mode 100644 index 00000000000..f2a78f676f9 --- /dev/null +++ b/xlators/performance/stat-prefetch/src/stat-prefetch.c @@ -0,0 +1,508 @@ +/* +   Copyright (c) 2006, 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com> +   This file is part of GlusterFS. + +   GlusterFS is free software; you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published +   by the Free Software Foundation; either version 3 of the License, +   or (at your option) any later version. + +   GlusterFS is distributed in the hope that it will be useful, but +   WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +   General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  
If not, see +   <http://www.gnu.org/licenses/>. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "glusterfs.h" +#include "stat-prefetch.h" +#include "dict.h" +#include "xlator.h" +#include <sys/time.h> + +struct sp_cache { +  struct sp_cache *next; +  struct sp_cache *prev; +  pid_t pid; +  long long tv_time; +  char *dirname; +  dir_entry_t entries; +  int32_t count; +  pthread_mutex_t lock; +}; + +static void +stat_prefetch_cache_flush (struct sp_cache *cache, int32_t force) +{ +  struct sp_cache *trav; +  struct timeval tv; +  long long tv_time; + +  gettimeofday (&tv, NULL); +  tv_time = (tv.tv_usec + (tv.tv_sec * 1000000)); + +  pthread_mutex_lock (&cache->lock); + +  trav = cache->next; +  while (trav != cache) { +    struct sp_cache *next = trav->next; +    { +      if (tv_time > trav->tv_time || force) { +	gf_log ("stat-prefetch", +		GF_LOG_DEBUG, +		"flush on: %s", +		trav->dirname); +	dir_entry_t *entries; + +	trav->prev->next = trav->next; +	trav->next->prev = trav->prev; + +	entries = trav->entries.next; + +	while (entries) { +	  dir_entry_t *nextentry = entries->next; +	  { +	    free (entries->name); +	    free (entries); +	  } +	  entries = nextentry; +	} +	free (trav->dirname); +	free (trav); +      } +    } +    trav = next; +  } + +  pthread_mutex_unlock (&cache->lock); +} + +static int32_t +stat_prefetch_cache_fill (struct sp_cache *cache, +			  pid_t pid, +			  char *dirname, +			  dir_entry_t *entries) +{ +  struct sp_cache *trav; +  struct timeval tv; + +  pthread_mutex_unlock (&cache->lock); +  trav = cache->next; +  while (trav != cache) { +    //    if (trav->pid == pid && !strcmp (trav->dirname, dirname)) { +    if (!strcmp (trav->dirname, dirname)) { +      break; +    } +    trav = trav->next; +  } + +  if (trav == cache) { +    trav = CALLOC (1, sizeof (*trav)); +    ERR_ABORT (trav); +    trav->pid = pid; +    trav->dirname = dirname; + +    trav->prev = cache->prev; +    trav->next = cache; +    
trav->next->prev = trav; +    trav->prev->next = trav; +  } else { +    free (dirname); +  } + +  while (trav->entries.next) { +    dir_entry_t *tmp = trav->entries.next; + +    trav->entries.next = trav->entries.next->next; +    free (tmp->name); +    free (tmp); +  } +  trav->entries.next = entries->next; +  entries->next = NULL; + +  gettimeofday (&tv, NULL); +  trav->tv_time = (tv.tv_usec + (tv.tv_sec * 1000000)) + cache->tv_time; + +  pthread_mutex_unlock (&cache->lock); +  return 0; +} + +static int32_t +stat_prefetch_cache_lookup (struct sp_cache *cache, +			    pid_t pid, +			    const char *path, +			    struct stat *buf) +{ +  struct sp_cache *trav; +  char *dirname = strdup (path); +  char *filename = strrchr (dirname, '/'); +  dir_entry_t *entries; +  dir_entry_t *prev = NULL; + +  *filename = '\0'; +  filename ++; + +  pthread_mutex_lock (&cache->lock); +  trav = cache->next; +  while (trav != cache) { +    //    if ((trav->pid == pid) && !strcmp (dirname, trav->dirname)) +    if (!strcmp (dirname, trav->dirname)) +      break; +    trav = trav->next; +  } +  if (trav == cache) { +    free (dirname); +    pthread_mutex_unlock (&cache->lock); +    return -1; +  } + +  entries = trav->entries.next; +  prev = &trav->entries; +  while (entries) { +    if (!strcmp (entries->name, filename)) +      break; +    prev = entries; +    entries = entries->next; +  } +  if (!entries) { +    free (dirname); +    pthread_mutex_unlock (&cache->lock); +    return -1; +  } + +  *buf = entries->buf; +  prev->next = entries->next; +  free (entries->name); +  free (entries); +  free (dirname); + +  pthread_mutex_unlock (&cache->lock); + +  return 0; +} + +			     +int32_t +stat_prefetch_readdir_cbk (call_frame_t *frame, +			   void *cookie, +			   xlator_t *this, +			   int32_t op_ret, +			   int32_t op_errno, +			   dir_entry_t *entries, +			   int32_t count) +{ +  char *path = frame->local; +  pid_t pid = frame->root->pid; +  frame->local = NULL; + +  STACK_UNWIND 
(frame, op_ret, op_errno, entries, count); + +  if (op_ret == 0) +    stat_prefetch_cache_fill (this->private, +			      pid, +			      path, +			      entries); +  else +    free (path); + +  return 0; +} + +int32_t +stat_prefetch_readdir (call_frame_t *frame, +		       xlator_t *this, +		       const char *path) +{ +  stat_prefetch_cache_flush (this->private, 0); + +  frame->local = strdup (path); +  STACK_WIND (frame, +	      stat_prefetch_readdir_cbk, +	      FIRST_CHILD(this), +	      FIRST_CHILD(this)->fops->readdir, +	      path); +  return 0; +} + + +int32_t +stat_prefetch_getattr_cbk (call_frame_t *frame, +			   void *cookie, +			   xlator_t *this, +			   int32_t op_ret, +			   int32_t op_errno, +			   struct stat *buf) +{ +  STACK_UNWIND (frame, op_ret, op_errno, buf); +  return 0; +} + +int32_t +stat_prefetch_getattr (call_frame_t *frame, +		       struct xlator *this, +		       const char *path) +{ +  struct stat buf; +  pid_t pid = frame->root->pid; +  stat_prefetch_cache_flush (this->private, 0); + +  if (stat_prefetch_cache_lookup (this->private, +				  pid, +				  path, +				  &buf) == 0) { +    STACK_UNWIND (frame, 0, 0, &buf); +    return 0; +  } + +  STACK_WIND (frame, +	      stat_prefetch_getattr_cbk, +	      FIRST_CHILD(this), +	      FIRST_CHILD(this)->fops->getattr, +	      path); + +  return 0; +} + + +int32_t +stat_prefetch_unlink_cbk (call_frame_t *frame, +                          void *cookie, +                          xlator_t *this, +                          int32_t op_ret, +                          int32_t op_errno) +{ +  STACK_UNWIND (frame, op_ret, op_errno); +  return 0; +} + +int32_t +stat_prefetch_unlink (call_frame_t *frame, +                      struct xlator *this, +                      const char *path) +{ +  stat_prefetch_cache_flush (this->private, 1); + +  STACK_WIND (frame, +              stat_prefetch_unlink_cbk, +              FIRST_CHILD(this), +              FIRST_CHILD(this)->fops->unlink, +              path); 
+ +  return 0; +} + + +int32_t +stat_prefetch_chmod_cbk (call_frame_t *frame, +			 void *cookie, +			 xlator_t *this, +			 int32_t op_ret, +			 int32_t op_errno, +			 struct stat *buf) +{ +  STACK_UNWIND (frame, op_ret, op_errno, buf); +  return 0; +} + +int32_t +stat_prefetch_chmod (call_frame_t *frame, +		     struct xlator *this, +		     const char *path, +		     mode_t mode) +{ +  stat_prefetch_cache_flush (this->private, 1); + +  STACK_WIND (frame, +              stat_prefetch_chmod_cbk, +              FIRST_CHILD(this), +              FIRST_CHILD(this)->fops->chmod, +              path, +	      mode); + +  return 0; +} + + +int32_t +stat_prefetch_chown_cbk (call_frame_t *frame, +			 void *cookie, +			 xlator_t *this, +			 int32_t op_ret, +			 int32_t op_errno, +			 struct stat *buf) +{ +  STACK_UNWIND (frame, op_ret, op_errno, buf); +  return 0; +} + +int32_t +stat_prefetch_chown (call_frame_t *frame, +		     struct xlator *this, +		     const char *path, +		     uid_t uid, +		     gid_t gid) +{ +  stat_prefetch_cache_flush (this->private, 1); + +  STACK_WIND (frame, +              stat_prefetch_chown_cbk, +              FIRST_CHILD(this), +              FIRST_CHILD(this)->fops->chown, +              path, +	      uid, +	      gid); + +  return 0; +} + + +int32_t +stat_prefetch_utimes_cbk (call_frame_t *frame, +                          void *cookie, +                          xlator_t *this, +                          int32_t op_ret, +                          int32_t op_errno, +			  struct stat *buf) +{ +  STACK_UNWIND (frame, op_ret, op_errno, buf); +  return 0; +} + +int32_t +stat_prefetch_utimes (call_frame_t *frame, +		      struct xlator *this, +		      const char *path, +		      struct timespec *tvp) +{ +  stat_prefetch_cache_flush (this->private, 1); + +  STACK_WIND (frame, +              stat_prefetch_utimes_cbk, +              FIRST_CHILD(this), +              FIRST_CHILD(this)->fops->utimes, +              path, +	      tvp); + +  return 0; +} + + 
+int32_t +stat_prefetch_truncate_cbk (call_frame_t *frame, +			    void *cookie, +			    xlator_t *this, +			    int32_t op_ret, +			    int32_t op_errno, +			    struct stat *buf) +{ +  STACK_UNWIND (frame, op_ret, op_errno, buf); +  return 0; +} + +int32_t +stat_prefetch_truncate (call_frame_t *frame, +			struct xlator *this, +			const char *path, +			off_t offset) +{ +  stat_prefetch_cache_flush (this->private, 1); + +  STACK_WIND (frame, +              stat_prefetch_truncate_cbk, +              FIRST_CHILD(this), +              FIRST_CHILD(this)->fops->truncate, +              path, +	      offset); + +  return 0; +} + + +int32_t +stat_prefetch_rename_cbk (call_frame_t *frame, +                          void *cookie, +                          xlator_t *this, +                          int32_t op_ret, +                          int32_t op_errno) +{ +  STACK_UNWIND (frame, op_ret, op_errno); +  return 0; +} + +int32_t +stat_prefetch_rename (call_frame_t *frame, +                      struct xlator *this, +                      const char *oldpath, +		      const char *newpath) +{ +  stat_prefetch_cache_flush (this->private, 1); + +  STACK_WIND (frame, +              stat_prefetch_rename_cbk, +              FIRST_CHILD(this), +              FIRST_CHILD(this)->fops->rename, +              oldpath, +	      newpath); + +  return 0; +} + +int32_t  +init (struct xlator *this) +{ +  struct sp_cache *cache; +  dict_t *options = this->options; + +  if (!this->children || this->children->next) { +    gf_log ("stat-prefetch", +	    GF_LOG_ERROR, +	    "FATAL: translator %s does not have exactly one child node", +	    this->name); +    return -1; +  } + +  cache = (void *) CALLOC (1, sizeof (*cache)); +  ERR_ABORT (cache); +  cache->next = cache->prev = cache; + +  cache->tv_time = 1 * 1000000; + +  if (dict_get (options, "cache-seconds")) { +    cache->tv_time = (data_to_int64 (dict_get (options, "cache-seconds")) * +		      1000000); +  } + +  pthread_mutex_init 
(&cache->lock, NULL); + +  this->private = cache; +  return 0; +} + +void +fini (struct xlator *this) +{ +  return; +} + + +struct xlator_fops fops = { +  .getattr     = stat_prefetch_getattr, +  .readdir     = stat_prefetch_readdir, +  .unlink      = stat_prefetch_unlink, +  .chmod       = stat_prefetch_chmod, +  .chown       = stat_prefetch_chown, +  .rename      = stat_prefetch_rename, +  .utimes      = stat_prefetch_utimes, +  .truncate    = stat_prefetch_truncate, +}; + +struct xlator_mops mops = { +}; diff --git a/xlators/performance/stat-prefetch/src/stat-prefetch.h b/xlators/performance/stat-prefetch/src/stat-prefetch.h new file mode 100644 index 00000000000..7d9645a2a81 --- /dev/null +++ b/xlators/performance/stat-prefetch/src/stat-prefetch.h @@ -0,0 +1,32 @@ +/* +   Copyright (c) 2006, 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com> +   This file is part of GlusterFS. + +   GlusterFS is free software; you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published +   by the Free Software Foundation; either version 3 of the License, +   or (at your option) any later version. + +   GlusterFS is distributed in the hope that it will be useful, but +   WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +   General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see +   <http://www.gnu.org/licenses/>. 
+*/ + +#ifndef _STAT_PREFETCH_H_ +#define _STAT_PREFETCH_H_ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include <stdio.h> +#include <sys/time.h> +#include "xlator.h" + +#endif /* _STAT_PREFETCH_H_ */ diff --git a/xlators/performance/symlink-cache/Makefile.am b/xlators/performance/symlink-cache/Makefile.am new file mode 100644 index 00000000000..d471a3f9243 --- /dev/null +++ b/xlators/performance/symlink-cache/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = src + +CLEANFILES =  diff --git a/xlators/performance/symlink-cache/src/Makefile.am b/xlators/performance/symlink-cache/src/Makefile.am new file mode 100644 index 00000000000..b8b257c186c --- /dev/null +++ b/xlators/performance/symlink-cache/src/Makefile.am @@ -0,0 +1,12 @@ +xlator_LTLIBRARIES = symlink-cache.la +xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/performance + +symlink_cache_la_LDFLAGS = -module -avoidversion  + +symlink_cache_la_SOURCES = symlink-cache.c +symlink_cache_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\ +	-I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) + +CLEANFILES =  diff --git a/xlators/performance/symlink-cache/src/symlink-cache.c b/xlators/performance/symlink-cache/src/symlink-cache.c new file mode 100644 index 00000000000..fc207a6272e --- /dev/null +++ b/xlators/performance/symlink-cache/src/symlink-cache.c @@ -0,0 +1,399 @@ +/* +  Copyright (c) 2008 Z RESEARCH, Inc. <http://www.zresearch.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. 
+ +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "glusterfs.h" +#include "logging.h" +#include "dict.h" +#include "xlator.h" +#include "list.h" +#include "compat.h" +#include "compat-errno.h" +#include "common-utils.h" + +struct symlink_cache { +	time_t ctime; +	char   *readlink; +}; + + +static int +symlink_inode_ctx_get (inode_t *inode, xlator_t *this, void **ctx) +{ +	int ret = 0; +	uint64_t tmp_ctx = 0; +	ret = inode_ctx_get (inode, this, &tmp_ctx); +	if (-1 == ret) +		gf_log (this->name, GF_LOG_ERROR, "dict get failed"); +	else +		*ctx = (void *)(long)tmp_ctx; + +	return 0; +} + + +static int +symlink_inode_ctx_set (inode_t *inode, xlator_t *this, void *ctx) +{ +	int ret = 0; +	ret = inode_ctx_put (inode, this, (uint64_t)(long) ctx); +	if (-1 == ret) +		gf_log (this->name, GF_LOG_ERROR, "dict set failed"); + +	return 0; +} + + +int +sc_cache_update (xlator_t *this, inode_t *inode, const char *link) +{ +	struct symlink_cache *sc = NULL; + +	symlink_inode_ctx_get (inode, this, VOID(&sc)); +	if (!sc) +		return 0; + +	if (!sc->readlink) { +		gf_log (this->name, GF_LOG_DEBUG, +			"updating cache: %s", link); + +		sc->readlink = strdup (link); +	} else { +		gf_log (this->name, GF_LOG_DEBUG, +			"not updating existing cache: %s with %s", +			sc->readlink, link); +	} + +	return 0; +} + + +int +sc_cache_set (xlator_t *this, inode_t *inode, struct stat *buf, +	      const char *link) +{ +	struct symlink_cache *sc = NULL; +	int                   ret = -1; +	int                   need_set = 0; + + +	symlink_inode_ctx_get (inode, this, VOID(&sc)); +	
if (!sc) { +		need_set = 1; +		sc = CALLOC (1, sizeof (*sc)); +		if (!sc) { +			gf_log (this->name, GF_LOG_ERROR, +				"out of memory :("); +			goto err; +		} +	} + +	if (sc->readlink) { +		gf_log (this->name, GF_LOG_DEBUG, +			"replacing old cache: %s with new cache: %s", +			sc->readlink, link); +		FREE (sc->readlink); +		sc->readlink = NULL; +	} + +	if (link) { +		sc->readlink = strdup (link); +		if (!sc->readlink) { +			gf_log (this->name, GF_LOG_ERROR, +				"out of memory :("); +			goto err; +		} +	} + +	sc->ctime = buf->st_ctime; + +	gf_log (this->name, GF_LOG_DEBUG, +		"setting symlink cache: %s", link); + +	if (need_set) { +		ret = symlink_inode_ctx_set (inode, this, sc); + +		if (ret < 0) { +			gf_log (this->name, GF_LOG_ERROR, +				"could not set inode context (%s)", +				strerror (-ret)); +			goto err; +		} +	} + +	return 0; +err: + +	if (sc) { +		if (sc->readlink) +			FREE (sc->readlink); +		sc->readlink = NULL; +		FREE (sc); +	} + +	return -1; +} + + +int +sc_cache_flush (xlator_t *this, inode_t *inode) +{ +	struct symlink_cache *sc = NULL; + +	symlink_inode_ctx_get (inode, this, VOID(&sc)); +	if (!sc) +		return 0; + +	if (sc->readlink) { +		gf_log (this->name, GF_LOG_DEBUG, +			"flushing cache: %s", sc->readlink); + +		FREE (sc->readlink); +		sc->readlink = NULL; +	} + +	FREE (sc); + +	return 0; +} + + +int +sc_cache_validate (xlator_t *this, inode_t *inode, struct stat *buf) +{ +	struct symlink_cache *sc = NULL; +	uint64_t tmp_sc = 0; + +	if (!S_ISLNK (buf->st_mode)) { +		sc_cache_flush (this, inode); +		return 0; +	} + +	symlink_inode_ctx_get (inode, this, VOID(&sc)); + +	if (!sc) { +		sc_cache_set (this, inode, buf, NULL); +		inode_ctx_get (inode, this, &tmp_sc); + +		if (!sc) { +			gf_log (this->name, GF_LOG_ERROR, +				"out of memory :("); +			return 0; +		} +		sc = (struct symlink_cache *)(long)tmp_sc; +	} + +	if (sc->ctime == buf->st_ctime) +		return 0; + +	/* STALE */ +	if (sc->readlink) { +		gf_log (this->name, GF_LOG_DEBUG, +			"flushing 
cache: %s", sc->readlink); + +		FREE (sc->readlink); +		sc->readlink = NULL; +	} + +	sc->ctime = buf->st_ctime; + +	return 0; +} + + + +int +sc_cache_get (xlator_t *this, inode_t *inode, char **link) +{ +	struct symlink_cache *sc = NULL; + +	symlink_inode_ctx_get (inode, this, VOID(&sc)); + +	if (!sc) +		return 0; + +	if (link && sc->readlink) +		*link = strdup (sc->readlink); +	return 0; +} + + +int +sc_readlink_cbk (call_frame_t *frame, void *cookie, +		 xlator_t *this, int op_ret, int op_errno, +		 const char *link) +{ +	if (op_ret > 0) +		sc_cache_update (this, frame->local, link); + +	inode_unref (frame->local); +	frame->local = NULL; + +        STACK_UNWIND (frame, op_ret, op_errno, link); +        return 0; +} + + +int +sc_readlink (call_frame_t *frame, xlator_t *this, +	     loc_t *loc, size_t size) +{ +	char *link = NULL; + +	sc_cache_get (this, loc->inode, &link); + +	if (link) { +		/* cache hit */ +		gf_log (this->name, GF_LOG_DEBUG, +			"cache hit %s -> %s", +			loc->path, link); +		STACK_UNWIND (frame, strlen (link) + 1, 0, link); +		FREE (link); +		return 0; +	} + +	frame->local = inode_ref (loc->inode); + +        STACK_WIND (frame, sc_readlink_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->readlink, +                    loc, size); + +	return 0; +} + + +int +sc_symlink_cbk (call_frame_t *frame, void *cookie, +		xlator_t *this, int op_ret, int op_errno, +		inode_t *inode, struct stat *buf) +{ +	if (op_ret == 0) { +		if (frame->local) { +			sc_cache_set (this, inode, buf, frame->local); +		} +	} + +        STACK_UNWIND (frame, op_ret, op_errno, inode, buf); +        return 0; +} + + +int +sc_symlink (call_frame_t *frame, xlator_t *this, +	    const char *dst, loc_t *src) +{ +	frame->local = strdup (dst); + +        STACK_WIND (frame, sc_symlink_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->symlink, +                    dst, src); + +	return 0; +} + + +int 
+sc_lookup_cbk (call_frame_t *frame, void *cookie, +	       xlator_t *this, int op_ret, int op_errno, +	       inode_t *inode, struct stat *buf, dict_t *xattr) +{ +	if (op_ret == 0) +		sc_cache_validate (this, inode, buf); +	else +		sc_cache_flush (this, inode); + +        STACK_UNWIND (frame, op_ret, op_errno, inode, buf, xattr); +        return 0; +} + + +int +sc_lookup (call_frame_t *frame, xlator_t *this, +	   loc_t *loc, dict_t *xattr_req) +{ +        STACK_WIND (frame, sc_lookup_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->lookup, +                    loc, xattr_req); + +        return 0; +} + + +int +sc_forget (xlator_t *this, +	   inode_t *inode) +{ +	sc_cache_flush (this, inode); + +        return 0; +} + + +int32_t  +init (xlator_t *this) +{ +	 +        if (!this->children || this->children->next) +        { +                gf_log (this->name, GF_LOG_ERROR, +                        "FATAL: volume (%s) not configured with exactly one " +			"child", this->name); +                return -1; +        } + +	if (!this->parents) { +		gf_log (this->name, GF_LOG_WARNING, +			"dangling volume. 
check volfile "); +	} + +        return 0; +} + + +void +fini (xlator_t *this) +{ +        return; +} + + +struct xlator_fops fops = { +	.lookup      = sc_lookup, +	.symlink     = sc_symlink, +	.readlink    = sc_readlink, +}; + +struct xlator_mops mops = { +}; + +struct xlator_cbks cbks = { +        .forget  = sc_forget, +}; + +struct volume_options options[] = { +	{ .key = {NULL} }, +}; diff --git a/xlators/performance/write-behind/Makefile.am b/xlators/performance/write-behind/Makefile.am new file mode 100644 index 00000000000..d471a3f9243 --- /dev/null +++ b/xlators/performance/write-behind/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = src + +CLEANFILES =  diff --git a/xlators/performance/write-behind/src/Makefile.am b/xlators/performance/write-behind/src/Makefile.am new file mode 100644 index 00000000000..f800abad50d --- /dev/null +++ b/xlators/performance/write-behind/src/Makefile.am @@ -0,0 +1,12 @@ +xlator_LTLIBRARIES = write-behind.la +xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/performance + +write_behind_la_LDFLAGS = -module -avoidversion  + +write_behind_la_SOURCES = write-behind.c +write_behind_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\ +	-I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) + +CLEANFILES =  diff --git a/xlators/performance/write-behind/src/write-behind.c b/xlators/performance/write-behind/src/write-behind.c new file mode 100644 index 00000000000..04a447d49e9 --- /dev/null +++ b/xlators/performance/write-behind/src/write-behind.c @@ -0,0 +1,1444 @@ +/* +  Copyright (c) 2006, 2007, 2008, 2009 Z RESEARCH, Inc. <http://www.zresearch.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. 
+ +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +/*TODO: check for non null wb_file_data before getting wb_file */ + + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "glusterfs.h" +#include "logging.h" +#include "dict.h" +#include "xlator.h" +#include "list.h" +#include "compat.h" +#include "compat-errno.h" +#include "common-utils.h" + +#define MAX_VECTOR_COUNT 8 +  +typedef struct list_head list_head_t; +struct wb_conf; +struct wb_page; +struct wb_file; + + +struct wb_conf { +        uint64_t aggregate_size; +        uint64_t window_size; +        uint64_t disable_till; +        gf_boolean_t enable_O_SYNC; +        gf_boolean_t flush_behind; +}; + + +typedef struct wb_local { +        list_head_t winds; +        struct wb_file *file; +        list_head_t unwind_frames; +        int op_ret; +        int op_errno; +        call_frame_t *frame; +} wb_local_t; + + +typedef struct write_request { +        call_frame_t *frame; +        off_t offset; +        /*  int32_t op_ret; +            int32_t op_errno; */ +        struct iovec *vector; +        int32_t count; +        dict_t *refs; +        char write_behind; +        char stack_wound; +        char got_reply; +        list_head_t list; +        list_head_t winds; +        /*  list_head_t unwinds;*/ +} wb_write_request_t; + + +struct wb_file { +        int disabled; +        uint64_t disable_till; +        off_t offset; +        size_t window_size; +        int32_t refcount; +        int32_t op_ret; +        int32_t op_errno; +        list_head_t request; +        fd_t *fd; +        gf_lock_t lock; +        xlator_t *this; +}; + + +typedef 
struct wb_conf wb_conf_t; +typedef struct wb_page wb_page_t; +typedef struct wb_file wb_file_t; + + +int32_t  +wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all); + +int32_t +wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds); + +int32_t +wb_sync_all (call_frame_t *frame, wb_file_t *file); + +int32_t  +__wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_size); + + +wb_file_t * +wb_file_create (xlator_t *this, +                fd_t *fd) +{ +        wb_file_t *file = NULL; +        wb_conf_t *conf = this->private;  + +        file = CALLOC (1, sizeof (*file)); +        INIT_LIST_HEAD (&file->request); + +        /* fd_ref() not required, file should never decide the existance of +         * an fd */ +        file->fd= fd; +        file->disable_till = conf->disable_till; +        file->this = this; +        file->refcount = 1; + +        fd_ctx_set (fd, this, (uint64_t)(long)file); +         +        return file; +} + +void +wb_file_destroy (wb_file_t *file) +{ +        int32_t refcount = 0; + +        LOCK (&file->lock); +        { +                refcount = --file->refcount; +        } +        UNLOCK (&file->lock); + +        if (!refcount){ +                LOCK_DESTROY (&file->lock); +                FREE (file); +        } + +        return; +} + + +int32_t +wb_sync_cbk (call_frame_t *frame, +             void *cookie, +             xlator_t *this, +             int32_t op_ret, +             int32_t op_errno, +             struct stat *stbuf) +{ +        wb_local_t *local = NULL; +        list_head_t *winds = NULL; +        wb_file_t *file = NULL; +        wb_write_request_t *request = NULL, *dummy = NULL; + +        local = frame->local; +        winds = &local->winds; +        file = local->file; + +        LOCK (&file->lock); +        { +                list_for_each_entry_safe (request, dummy, winds, winds) { +                        request->got_reply = 1; +                        if 
(!request->write_behind && (op_ret == -1)) { +                                wb_local_t *per_request_local = request->frame->local; +                                per_request_local->op_ret = op_ret; +                                per_request_local->op_errno = op_errno; +                        } + +                        /* +                          request->op_ret = op_ret; +                          request->op_errno = op_errno;  +                        */ +                } +        } +        UNLOCK (&file->lock); + +        if (op_ret == -1) +        { +                file->op_ret = op_ret; +                file->op_errno = op_errno; +        } + +        wb_process_queue (frame, file, 0);   +   +        /* safe place to do fd_unref */ +        fd_unref (file->fd); + +        STACK_DESTROY (frame->root); + +        return 0; +} + +int32_t +wb_sync_all (call_frame_t *frame, wb_file_t *file)  +{ +        list_head_t winds; +        int32_t bytes = 0; + +        INIT_LIST_HEAD (&winds); + +        LOCK (&file->lock); +        { +                bytes = __wb_mark_winds (&file->request, &winds, 0); +        } +        UNLOCK (&file->lock); + +        wb_sync (frame, file, &winds); + +        return bytes; +} + + +int32_t +wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds) +{ +        wb_write_request_t *dummy = NULL, *request = NULL, *first_request = NULL, *next = NULL; +        size_t total_count = 0, count = 0; +        size_t copied = 0; +        call_frame_t *sync_frame = NULL; +        dict_t *refs = NULL; +        wb_local_t *local = NULL; +        struct iovec *vector = NULL; +        int32_t bytes = 0; +        size_t bytecount = 0; + +        list_for_each_entry (request, winds, winds) +        { +                total_count += request->count; +                bytes += iov_length (request->vector, request->count); +        } + +        if (!total_count) { +                return 0; +        } +   +        list_for_each_entry_safe 
(request, dummy, winds, winds) { +                if (!vector) { +                        vector = MALLOC (VECTORSIZE (MAX_VECTOR_COUNT)); +                        refs = get_new_dict (); +         +                        local = CALLOC (1, sizeof (*local)); +                        INIT_LIST_HEAD (&local->winds); +             +                        first_request = request; +                } + +                count += request->count; +                bytecount = VECTORSIZE (request->count); +                memcpy (((char *)vector)+copied, +                        request->vector, +                        bytecount); +                copied += bytecount; +       +                if (request->refs) { +                        dict_copy (request->refs, refs); +                } + +                next = NULL; +                if (request->winds.next != winds) {     +                        next = list_entry (request->winds.next, struct write_request, winds); +                } + +                list_del_init (&request->winds); +                list_add_tail (&request->winds, &local->winds); + +                if (!next || ((count + next->count) > MAX_VECTOR_COUNT)) { +                        sync_frame = copy_frame (frame);   +                        sync_frame->local = local; +                        local->file = file; +                        sync_frame->root->req_refs = dict_ref (refs); +                        fd_ref (file->fd); +                        STACK_WIND (sync_frame, +                                    wb_sync_cbk, +                                    FIRST_CHILD(sync_frame->this), +                                    FIRST_CHILD(sync_frame->this)->fops->writev, +                                    file->fd, vector, +                                    count, first_request->offset); +         +                        dict_unref (refs); +                        FREE (vector); +                        first_request = NULL; +                        
refs = NULL; +                        vector = NULL; +                        copied = count = 0; +                } +        } + +        return bytes; +} + + +int32_t  +wb_stat_cbk (call_frame_t *frame, +             void *cookie, +             xlator_t *this, +             int32_t op_ret, +             int32_t op_errno, +             struct stat *buf) +{ +        wb_local_t *local = NULL; +   +        local = frame->local; +   +        if (local->file) +                fd_unref (local->file->fd); + +        STACK_UNWIND (frame, op_ret, op_errno, buf); + +        return 0; +} + + +int32_t +wb_stat (call_frame_t *frame, +         xlator_t *this, +         loc_t *loc) +{ +        wb_file_t *file = NULL; +        fd_t *iter_fd = NULL; +        wb_local_t *local = NULL; +	uint64_t tmp_file = 0; + +        if (loc->inode) +        { +                iter_fd = fd_lookup (loc->inode, frame->root->pid); +                if (iter_fd) { +                        if (!fd_ctx_get (iter_fd, this, &tmp_file)) { +				file = (wb_file_t *)(long)tmp_file; +                        } else { +                                fd_unref (iter_fd); +                        } +                } +                if (file) { +                        wb_sync_all (frame, file); +                } +        } + +        local = CALLOC (1, sizeof (*local)); +        local->file = file; + +        frame->local = local; + +        STACK_WIND (frame, wb_stat_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->stat, +                    loc); +        return 0; +} + + +int32_t  +wb_fstat (call_frame_t *frame, +          xlator_t *this, +          fd_t *fd) +{ +        wb_file_t *file = NULL; +        wb_local_t *local = NULL; +  	uint64_t tmp_file = 0; + +        if (fd_ctx_get (fd, this, &tmp_file)) { +                gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); +                STACK_UNWIND (frame, -1, EBADFD, NULL); +                return 0; +       
 } + +	file = (wb_file_t *)(long)tmp_file; +        if (file) { +                fd_ref (file->fd); +                wb_sync_all (frame, file); +        } + +        local = CALLOC (1, sizeof (*local)); +        local->file = file; + +        frame->local = local; +   +        STACK_WIND (frame, +                    wb_stat_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->fstat, +                    fd); +        return 0; +} + + +int32_t +wb_truncate_cbk (call_frame_t *frame, +                 void *cookie, +                 xlator_t *this, +                 int32_t op_ret, +                 int32_t op_errno, +                 struct stat *buf) +{ +        wb_local_t *local = NULL;  +   +        local = frame->local; +        if (local->file) +                fd_unref (local->file->fd); + +        STACK_UNWIND (frame, op_ret, op_errno, buf); +        return 0; +} + + +int32_t  +wb_truncate (call_frame_t *frame, +             xlator_t *this, +             loc_t *loc, +             off_t offset) +{ +        wb_file_t *file = NULL; +        fd_t *iter_fd = NULL; +        wb_local_t *local = NULL; +	uint64_t tmp_file = 0; + +        if (loc->inode) +        { +                iter_fd = fd_lookup (loc->inode, frame->root->pid); +                if (iter_fd) { +                        if (!fd_ctx_get (iter_fd, this, &tmp_file)){ +				file = (wb_file_t *)(long)tmp_file; +                        } else { +                                fd_unref (iter_fd); +                        } +                } +     +                if (file) +                { +                        wb_sync_all (frame, file); +                } +        } +   +        local = CALLOC (1, sizeof (*local)); +        local->file = file; + +        frame->local = local; + +        STACK_WIND (frame, +                    wb_truncate_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->truncate, +                    loc, + 
                   offset); +        return 0; +} + + +int32_t +wb_ftruncate (call_frame_t *frame, +              xlator_t *this, +              fd_t *fd, +              off_t offset) +{ +        wb_file_t *file = NULL; +        wb_local_t *local = NULL; +	uint64_t tmp_file = 0; + +        if (fd_ctx_get (fd, this, &tmp_file)) { +                gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); +                STACK_UNWIND (frame, -1, EBADFD, NULL); +                return 0; +        } + +	file = (wb_file_t *)(long)tmp_file; +        if (file) +                wb_sync_all (frame, file); + +        local = CALLOC (1, sizeof (*local)); +        local->file = file; + +        if (file) +                fd_ref (file->fd); + +        frame->local = local; + +        STACK_WIND (frame, +                    wb_truncate_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->ftruncate, +                    fd, +                    offset); +        return 0; +} + + +int32_t  +wb_utimens_cbk (call_frame_t *frame, +                void *cookie, +                xlator_t *this, +                int32_t op_ret, +                int32_t op_errno, +                struct stat *buf) +{ +        wb_local_t *local = NULL;        +   +        local = frame->local; +        if (local->file) +                fd_unref (local->file->fd); + +        STACK_UNWIND (frame, op_ret, op_errno, buf); +        return 0; +} + + +int32_t  +wb_utimens (call_frame_t *frame, +            xlator_t *this, +            loc_t *loc, +            struct timespec tv[2]) +{ +        wb_file_t *file = NULL; +        fd_t *iter_fd = NULL; +        wb_local_t *local = NULL; +	uint64_t tmp_file = 0; + +        if (loc->inode) { +                iter_fd = fd_lookup (loc->inode, frame->root->pid); +                if (iter_fd) { +                        if (!fd_ctx_get (iter_fd, this, &tmp_file)) { +				file = (wb_file_t *)(long)tmp_file; +                        } else { 
+                                fd_unref (iter_fd); +                        } +                } + +                if (file) +                        wb_sync_all (frame, file); +        } + +        local = CALLOC (1, sizeof (*local)); +        local->file = file; + +        frame->local = local; + +        STACK_WIND (frame, +                    wb_utimens_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->utimens, +                    loc, +                    tv); +        return 0; +} + +int32_t +wb_open_cbk (call_frame_t *frame, +             void *cookie, +             xlator_t *this, +             int32_t op_ret, +             int32_t op_errno, +             fd_t *fd) +{ +        int32_t flags = 0; +        wb_file_t *file = NULL; +        wb_conf_t *conf = this->private; + +        if (op_ret != -1) +        { +                file = wb_file_create (this, fd); + +                /* If mandatory locking has been enabled on this file, +                   we disable caching on it */ + +                if ((fd->inode->st_mode & S_ISGID) && !(fd->inode->st_mode & S_IXGRP)) +                        file->disabled = 1; + +                /* If O_DIRECT then, we disable chaching */ +                if (frame->local) +                { +                        flags = *((int32_t *)frame->local); +                        if (((flags & O_DIRECT) == O_DIRECT) ||  +                            ((flags & O_RDONLY) == O_RDONLY) || +                            (((flags & O_SYNC) == O_SYNC) && +                             conf->enable_O_SYNC == _gf_true)) {  +                                file->disabled = 1; +                        } +                } + +                LOCK_INIT (&file->lock); +        } + +        STACK_UNWIND (frame, op_ret, op_errno, fd); +        return 0; +} + + +int32_t +wb_open (call_frame_t *frame, +         xlator_t *this, +         loc_t *loc, +         int32_t flags, +         fd_t *fd) +{ +        
frame->local = CALLOC (1, sizeof(int32_t)); +        *((int32_t *)frame->local) = flags; + +        STACK_WIND (frame, +                    wb_open_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->open, +                    loc, flags, fd); +        return 0; +} + + +int32_t +wb_create_cbk (call_frame_t *frame, +               void *cookie, +               xlator_t *this, +               int32_t op_ret, +               int32_t op_errno, +               fd_t *fd, +               inode_t *inode, +               struct stat *buf) +{ +        wb_file_t *file = NULL; + +        if (op_ret != -1) +        { +                file = wb_file_create (this, fd); +                /*  +                 * If mandatory locking has been enabled on this file, +                 * we disable caching on it +                 */ +                if ((fd->inode->st_mode & S_ISGID) &&  +                    !(fd->inode->st_mode & S_IXGRP)) +                { +                        file->disabled = 1; +                } + +                LOCK_INIT (&file->lock); +        } + +        STACK_UNWIND (frame, op_ret, op_errno, fd, inode, buf); +        return 0; +} + + +int32_t +wb_create (call_frame_t *frame, +           xlator_t *this, +           loc_t *loc, +           int32_t flags, +           mode_t mode, +           fd_t *fd) +{ +        STACK_WIND (frame, +                    wb_create_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->create, +                    loc, flags, mode, fd); +        return 0; +} + + +int32_t  +__wb_cleanup_queue (wb_file_t *file) +{ +        wb_write_request_t *request = NULL, *dummy = NULL; +        int32_t bytes = 0; + +        list_for_each_entry_safe (request, dummy, &file->request, list) +        { +                if (request->got_reply && request->write_behind) +                { +                        bytes += iov_length (request->vector, request->count); +        
                list_del_init (&request->list); + +                        FREE (request->vector); +                        dict_unref (request->refs); +       +                        FREE (request); +                } +        } + +        return bytes; +} + + +int32_t  +__wb_mark_wind_all (list_head_t *list, list_head_t *winds) +{ +        wb_write_request_t *request = NULL; +        size_t size = 0; + +        list_for_each_entry (request, list, list) +        { +                if (!request->stack_wound) +                { +                        size += iov_length (request->vector, request->count); +                        request->stack_wound = 1; +                        list_add_tail (&request->winds, winds); +                } +        } +   +        return size; +} + + +size_t  +__wb_get_aggregate_size (list_head_t *list) +{ +        wb_write_request_t *request = NULL; +        size_t size = 0; + +        list_for_each_entry (request, list, list) +        { +                if (!request->stack_wound) +                { +                        size += iov_length (request->vector, request->count); +                } +        } + +        return size; +} + +uint32_t +__wb_get_incomplete_writes (list_head_t *list) +{ +        wb_write_request_t *request = NULL; +        uint32_t count = 0; + +        list_for_each_entry (request, list, list) +        { +                if (request->stack_wound && !request->got_reply) +                { +                        count++; +                } +        } + +        return count; +} + +int32_t +__wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf) +{ +        size_t aggregate_current = 0; +        uint32_t incomplete_writes = 0; + +        incomplete_writes = __wb_get_incomplete_writes (list);  + +        aggregate_current = __wb_get_aggregate_size (list); + +        if ((incomplete_writes == 0) || (aggregate_current >= aggregate_conf)) +        { +                __wb_mark_wind_all 
(list, winds); +        } + +        return aggregate_current; +} + + +size_t +__wb_get_window_size (list_head_t *list) +{ +        wb_write_request_t *request = NULL; +        size_t size = 0; + +        list_for_each_entry (request, list, list) +        { +                if (request->write_behind && !request->got_reply) +                { +                        size += iov_length (request->vector, request->count); +                } +        } + +        return size; +} + + +size_t  +__wb_mark_unwind_till (list_head_t *list, list_head_t *unwinds, size_t size) +{ +        size_t written_behind = 0; +        wb_write_request_t *request = NULL; + +        list_for_each_entry (request, list, list) +        { +                if (written_behind <= size) +                { +                        if (!request->write_behind) +                        { +                                wb_local_t *local = request->frame->local; +                                written_behind += iov_length (request->vector, request->count); +                                request->write_behind = 1; +                                list_add_tail (&local->unwind_frames, unwinds); +                        } +                } +                else +                { +                        break; +                } +        } + +        return written_behind; +} + + +int32_t  +__wb_mark_unwinds (list_head_t *list, list_head_t *unwinds, size_t window_conf) +{ +        size_t window_current = 0; + +        window_current = __wb_get_window_size (list); +        if (window_current <= window_conf) +        { +                window_current += __wb_mark_unwind_till (list, unwinds, +                                                         window_conf - window_current); +        } + +        return window_current; +} + + +int32_t +wb_stack_unwind (list_head_t *unwinds) +{ +        struct stat buf = {0,}; +        wb_local_t *local = NULL, *dummy = NULL; + +        list_for_each_entry_safe 
(local, dummy, unwinds, unwind_frames) +        { +                list_del_init (&local->unwind_frames); +                STACK_UNWIND (local->frame, local->op_ret, local->op_errno, &buf); +        } + +        return 0; +} + + +int32_t +wb_do_ops (call_frame_t *frame, wb_file_t *file, list_head_t *winds, list_head_t *unwinds) +{ +        /* copy the frame before calling wb_stack_unwind, since this request containing current frame might get unwound */ +        /*  call_frame_t *sync_frame = copy_frame (frame); */ +  +        wb_stack_unwind (unwinds); +        wb_sync (frame, file, winds); + +        return 0; +} + + +int32_t  +wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all)  +{ +        list_head_t winds, unwinds; +        size_t size = 0; +        wb_conf_t *conf = file->this->private; + +        INIT_LIST_HEAD (&winds); +        INIT_LIST_HEAD (&unwinds); + +        if (!file) +        { +                return -1; +        } + +        size = flush_all ? 0 : conf->aggregate_size; +        LOCK (&file->lock); +        { +                __wb_cleanup_queue (file); +                __wb_mark_winds (&file->request, &winds, size); +                __wb_mark_unwinds (&file->request, &unwinds, conf->window_size); +        } +        UNLOCK (&file->lock); + +        wb_do_ops (frame, file, &winds, &unwinds); +        return 0; +} + + +wb_write_request_t * +wb_enqueue (wb_file_t *file,  +            call_frame_t *frame, +            struct iovec *vector, +            int32_t count, +            off_t offset) +{ +        wb_write_request_t *request = NULL; +        wb_local_t *local = CALLOC (1, sizeof (*local)); + +        request = CALLOC (1, sizeof (*request)); + +        INIT_LIST_HEAD (&request->list); +        INIT_LIST_HEAD (&request->winds); + +        request->frame = frame; +        request->vector = iov_dup (vector, count); +        request->count = count; +        request->offset = offset; +        request->refs = dict_ref 
(frame->root->req_refs); + +        frame->local = local; +        local->frame = frame; +        local->op_ret = iov_length (vector, count); +        local->op_errno = 0; +        INIT_LIST_HEAD (&local->unwind_frames); + +        LOCK (&file->lock); +        { +                list_add_tail (&request->list, &file->request); +                file->offset = offset + iov_length (vector, count); +        } +        UNLOCK (&file->lock); + +        return request; +} + + +int32_t +wb_writev_cbk (call_frame_t *frame, +               void *cookie, +               xlator_t *this, +               int32_t op_ret, +               int32_t op_errno, +               struct stat *stbuf) +{ +        STACK_UNWIND (frame, op_ret, op_errno, stbuf); +        return 0; +} + + +int32_t +wb_writev (call_frame_t *frame, +           xlator_t *this, +           fd_t *fd, +           struct iovec *vector, +           int32_t count, +           off_t offset) +{ +        wb_file_t *file = NULL; +        char offset_expected = 1, wb_disabled = 0;  +        call_frame_t *process_frame = NULL; +        size_t size = 0; +	uint64_t tmp_file = 0; + +        if (vector != NULL)  +                size = iov_length (vector, count); + +        if (fd_ctx_get (fd, this, &tmp_file)) { +                gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); +                STACK_UNWIND (frame, -1, EBADFD, NULL); +                return 0; +        } + +	file = (wb_file_t *)(long)tmp_file; +        if (!file) { +                gf_log (this->name, GF_LOG_ERROR, +                        "wb_file not found for fd %p", fd); +                STACK_UNWIND (frame, -1, EBADFD, NULL); +                return 0; +        } + +        LOCK (&file->lock); +        { +                if (file->disabled || file->disable_till) { +                        if (size > file->disable_till) { +                                file->disable_till = 0; +                        } else { +                                
file->disable_till -= size; +                        } +                        wb_disabled = 1; +                } + +                if (file->offset != offset) +                        offset_expected = 0; +        } +        UNLOCK (&file->lock); + +        if (wb_disabled) { +                STACK_WIND (frame, +                            wb_writev_cbk, +                            FIRST_CHILD (frame->this), +                            FIRST_CHILD (frame->this)->fops->writev, +                            file->fd, +                            vector, +                            count, +                            offset); +                return 0; +        } + +        process_frame = copy_frame (frame); + +        if (!offset_expected) +                wb_process_queue (process_frame, file, 1); + +        wb_enqueue (file, frame, vector, count, offset); +        wb_process_queue (process_frame, file, 0); + +        STACK_DESTROY (process_frame->root); + +        return 0; +} + + +int32_t +wb_readv_cbk (call_frame_t *frame, +              void *cookie, +              xlator_t *this, +              int32_t op_ret, +              int32_t op_errno, +              struct iovec *vector, +              int32_t count, +              struct stat *stbuf) +{ +        wb_local_t *local = NULL; + +        local = frame->local; + +        STACK_UNWIND (frame, op_ret, op_errno, vector, count, stbuf); +        return 0; +} + + +int32_t +wb_readv (call_frame_t *frame, +          xlator_t *this, +          fd_t *fd, +          size_t size, +          off_t offset) +{ +        wb_file_t *file = NULL; +        wb_local_t *local = NULL; +	uint64_t tmp_file = 0; + +        if (fd_ctx_get (fd, this, &tmp_file)) { +                gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); +                STACK_UNWIND (frame, -1, EBADFD, NULL); +                return 0; +        } + +	file = (wb_file_t *)(long)tmp_file; +        if (file) +                wb_sync_all (frame, file); 
+ +        local = CALLOC (1, sizeof (*local)); +        local->file = file; + +        frame->local = local; + +        STACK_WIND (frame, +                    wb_readv_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->readv, +                    fd, size, offset); + +        return 0; +} + + +int32_t +wb_ffr_bg_cbk (call_frame_t *frame, +               void *cookie, +               xlator_t *this, +               int32_t op_ret, +               int32_t op_errno) +{ +        wb_local_t *local = NULL; +        wb_file_t *file = NULL; + +        local = frame->local; +        file = local->file; + +        if (file) { +                fd_unref (file->fd); +        } + +        if (file->op_ret == -1) +        { +                op_ret = file->op_ret; +                op_errno = file->op_errno; + +                file->op_ret = 0; +        } +   +        STACK_DESTROY (frame->root); +        return 0; +} + + +int32_t +wb_ffr_cbk (call_frame_t *frame, +            void *cookie, +            xlator_t *this, +            int32_t op_ret, +            int32_t op_errno) +{ +        wb_local_t *local = NULL; +        wb_file_t *file = NULL; + +        local = frame->local; +        file = local->file; +        if (file) { +                /* corresponds to the fd_ref() done during wb_file_create() */ +                fd_unref (file->fd); +        } + +        if (file->op_ret == -1) +        { +                op_ret = file->op_ret; +                op_errno = file->op_errno; + +                file->op_ret = 0; +        } + +        STACK_UNWIND (frame, op_ret, op_errno); +        return 0; +} + + +int32_t +wb_flush (call_frame_t *frame, +          xlator_t *this, +          fd_t *fd) +{ +        wb_conf_t *conf = NULL; +        wb_file_t *file = NULL; +        call_frame_t *flush_frame = NULL; +        wb_local_t *local = NULL; +	uint64_t tmp_file = 0; + +        conf = this->private; + +        if (fd_ctx_get (fd, this, 
&tmp_file)) { +                gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); +                STACK_UNWIND (frame, -1, EBADFD); +                return 0; +        } + +	file = (wb_file_t *)(long)tmp_file; + +        local = CALLOC (1, sizeof (*local)); +        local->file = file; +        if (file) +                fd_ref (file->fd); + +        if (&file->request != file->request.next) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "request queue is not empty, it has to be synced"); +        } + +        if (conf->flush_behind &&  +	    (!file->disabled) && (file->disable_till == 0)) { +                flush_frame = copy_frame (frame);      +                STACK_UNWIND (frame, file->op_ret,  +			      file->op_errno); // liar! liar! :O + +                flush_frame->local = local; +                wb_sync_all (flush_frame, file); + +                STACK_WIND (flush_frame, +                            wb_ffr_bg_cbk, +                            FIRST_CHILD(this), +                            FIRST_CHILD(this)->fops->flush, +                            fd); +        } else { +                wb_sync_all (frame, file); + +                frame->local = local; +                STACK_WIND (frame, +                            wb_ffr_cbk, +                            FIRST_CHILD(this), +                            FIRST_CHILD(this)->fops->flush, +                            fd); +        } + +        return 0; +} + + +int32_t +wb_fsync_cbk (call_frame_t *frame, +              void *cookie, +              xlator_t *this, +              int32_t op_ret, +              int32_t op_errno) +{ +        wb_local_t *local = NULL; +        wb_file_t *file = NULL; + +        local = frame->local; +        file = local->file; + +        if (file->op_ret == -1) +        { +                op_ret = file->op_ret; +                op_errno = file->op_errno; + +                file->op_ret = 0; +        } + +        STACK_UNWIND (frame, op_ret, 
op_errno); +        return 0; +} + +int32_t +wb_fsync (call_frame_t *frame, +          xlator_t *this, +          fd_t *fd, +          int32_t datasync) +{ +        wb_file_t *file = NULL; +        wb_local_t *local = NULL; +	uint64_t tmp_file = 0; + +        if (fd_ctx_get (fd, this, &tmp_file)) { +                gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); +                STACK_UNWIND (frame, -1, EBADFD); +                return 0; +        } + +	file = (wb_file_t *)(long)tmp_file; +        if (file) +                wb_sync_all (frame, file); + +        local = CALLOC (1, sizeof (*local)); +        local->file = file; + +        frame->local = local; + +        STACK_WIND (frame, +                    wb_fsync_cbk, +                    FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->fsync, +                    fd, datasync); +        return 0; +} + + +int32_t +wb_release (xlator_t *this, +            fd_t *fd) +{ +        uint64_t file = 0; + +	fd_ctx_get (fd, this, &file); +  	wb_file_destroy ((wb_file_t *)(long)file); + +        return 0; +} + + +int32_t  +init (xlator_t *this) +{ +        dict_t *options = NULL; +        wb_conf_t *conf = NULL; +        char *aggregate_size_string = NULL; +        char *window_size_string    = NULL; +        char *flush_behind_string   = NULL; +        char *disable_till_string = NULL; +        char *enable_O_SYNC_string = NULL; +        int32_t ret = -1; + +        if ((this->children == NULL) +            || this->children->next) { +                gf_log (this->name, GF_LOG_ERROR, +                        "FATAL: write-behind (%s) not configured with exactly one child", +                        this->name); +                return -1; +        } + +        if (this->parents == NULL) { +                gf_log (this->name, GF_LOG_WARNING, +                        "dangling volume. 
check volfile"); +        } +         +        options = this->options; + +        conf = CALLOC (1, sizeof (*conf)); +         +        conf->enable_O_SYNC = _gf_false; +        ret = dict_get_str (options, "enable-O_SYNC", +                            &enable_O_SYNC_string); +        if (ret == 0) { +                ret = gf_string2boolean (enable_O_SYNC_string, +                                         &conf->enable_O_SYNC); +                if (ret == -1) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "'enable-O_SYNC' takes only boolean arguments"); +                        return -1; +                } +        } + +        /* configure 'options aggregate-size <size>' */ +        conf->aggregate_size = 0; +        ret = dict_get_str (options, "block-size",  +                            &aggregate_size_string); +        if (ret == 0) { +                ret = gf_string2bytesize (aggregate_size_string,  +                                          &conf->aggregate_size); +                if (ret != 0) { +                        gf_log (this->name, GF_LOG_ERROR,  +                                "invalid number format \"%s\" of \"option aggregate-size\"",  +                                aggregate_size_string); +                        return -1; +                } +        } + +        gf_log (this->name, GF_LOG_DEBUG, +                "using aggregate-size = %"PRIu64"",  +                conf->aggregate_size); +   +        conf->disable_till = 1; +        ret = dict_get_str (options, "disable-for-first-nbytes",  +                            &disable_till_string); +        if (ret == 0) { +                ret = gf_string2bytesize (disable_till_string,  +                                          &conf->disable_till); +                if (ret != 0) { +                        gf_log (this->name, GF_LOG_ERROR,  +                                "invalid number format \"%s\" of \"option disable-for-first-nbytes\"",  +       
                         disable_till_string); +                        return -1; +                } +        } + +        gf_log (this->name, GF_LOG_DEBUG, +                "disabling write-behind for first %"PRIu64" bytes",  +                conf->disable_till); +   +        /* configure 'option window-size <size>' */ +        conf->window_size = 0; +        ret = dict_get_str (options, "cache-size",  +                            &window_size_string); +        if (ret == 0) { +                ret = gf_string2bytesize (window_size_string,  +                                          &conf->window_size); +                if (ret != 0) { +                        gf_log (this->name, GF_LOG_ERROR,  +                                "invalid number format \"%s\" of \"option window-size\"",  +                                window_size_string); +                        FREE (conf); +                        return -1; +                } +        } + +        if (!conf->window_size && conf->aggregate_size) { +                gf_log (this->name, GF_LOG_WARNING, +                        "setting window-size to be equal to aggregate-size(%"PRIu64")", +                        conf->aggregate_size); +                conf->window_size = conf->aggregate_size; +        } + +        if (conf->window_size < conf->aggregate_size) { +                gf_log (this->name, GF_LOG_ERROR, +                        "aggregate-size(%"PRIu64") cannot be more than window-size" +                        "(%"PRIu64")", conf->window_size, conf->aggregate_size); +                FREE (conf); +                return -1; +        } + +        /* configure 'option flush-behind <on/off>' */ +        conf->flush_behind = 0; +        ret = dict_get_str (options, "flush-behind",  +                            &flush_behind_string); +        if (ret == 0) { +                ret = gf_string2boolean (flush_behind_string,  +                                         &conf->flush_behind); +                if (ret 
== -1) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "'flush-behind' takes only boolean arguments"); +                        return -1; +                } + +                if (conf->flush_behind) { +			gf_log (this->name, GF_LOG_DEBUG, +				"enabling flush-behind"); +                } +        } +        this->private = conf; +        return 0; +} + + +void +fini (xlator_t *this) +{ +        wb_conf_t *conf = this->private; + +        FREE (conf); +        return; +} + + +struct xlator_fops fops = { +        .writev      = wb_writev, +        .open        = wb_open, +        .create      = wb_create, +        .readv       = wb_readv, +        .flush       = wb_flush, +        .fsync       = wb_fsync, +        .stat        = wb_stat, +        .fstat       = wb_fstat, +        .truncate    = wb_truncate, +        .ftruncate   = wb_ftruncate, +        .utimens     = wb_utimens, +}; + +struct xlator_mops mops = { +}; + +struct xlator_cbks cbks = { +        .release  = wb_release +}; + +struct volume_options options[] = { +        { .key  = {"flush-behind"},  +          .type = GF_OPTION_TYPE_BOOL +        }, +        { .key  = {"block-size", "aggregate-size"},  +          .type = GF_OPTION_TYPE_SIZET,  +          .min  = 128 * GF_UNIT_KB,  +          .max  = 4 * GF_UNIT_MB  +        }, +        { .key  = {"cache-size", "window-size"},  +          .type = GF_OPTION_TYPE_SIZET,  +          .min  = 512 * GF_UNIT_KB,  +          .max  = 1 * GF_UNIT_GB  +        }, +        { .key = {"disable-for-first-nbytes"}, +          .type = GF_OPTION_TYPE_SIZET, +          .min = 1, +          .max = 1 * GF_UNIT_MB, +        }, +        { .key = {"enable-O_SYNC"}, +          .type = GF_OPTION_TYPE_BOOL, +        },  +        { .key = {NULL} }, +};  | 
