diff options
Diffstat (limited to 'xlators/performance/io-threads/src')
| -rw-r--r-- | xlators/performance/io-threads/src/Makefile.am | 14 | ||||
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 1254 | ||||
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.h | 99 | 
3 files changed, 1367 insertions, 0 deletions
diff --git a/xlators/performance/io-threads/src/Makefile.am b/xlators/performance/io-threads/src/Makefile.am new file mode 100644 index 00000000000..38dea3eb7fc --- /dev/null +++ b/xlators/performance/io-threads/src/Makefile.am @@ -0,0 +1,14 @@ +xlator_LTLIBRARIES = io-threads.la +xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/performance + +io_threads_la_LDFLAGS = -module -avoidversion  + +io_threads_la_SOURCES = io-threads.c +io_threads_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +noinst_HEADERS = io-threads.h + +AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\ +	-I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) + +CLEANFILES =  diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c new file mode 100644 index 00000000000..5acdd627da4 --- /dev/null +++ b/xlators/performance/io-threads/src/io-threads.c @@ -0,0 +1,1254 @@ +/* +  Copyright (c) 2006, 2007, 2008, 2009 Z RESEARCH, Inc. <http://www.zresearch.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "call-stub.h" +#include "glusterfs.h" +#include "logging.h" +#include "dict.h" +#include "xlator.h" +#include "io-threads.h" + +static void +iot_queue (iot_worker_t *worker, +           call_stub_t *stub); + +static call_stub_t * +iot_dequeue (iot_worker_t *worker); + +static iot_worker_t *  +iot_schedule (iot_conf_t *conf, +              iot_file_t *file, +              ino_t ino) +{ +	int32_t cnt = (ino % conf->thread_count); +	iot_worker_t *trav = conf->workers.next; + +	for (; cnt; cnt--) +		trav = trav->next; +   +	if (file) +		file->worker = trav; +	trav->fd_count++; +	return trav; +} + +int32_t +iot_open_cbk (call_frame_t *frame, +              void *cookie, +              xlator_t *this, +              int32_t op_ret, +              int32_t op_errno, +              fd_t *fd) +{ +	iot_conf_t *conf = this->private; + +	if (op_ret >= 0) { +		iot_file_t *file = CALLOC (1, sizeof (*file)); +		ERR_ABORT (file); + +		iot_schedule (conf, file, fd->inode->ino); +		file->fd = fd; + +		fd_ctx_set (fd, this, (uint64_t)(long)file); + +		pthread_mutex_lock (&conf->files_lock); +		file->next = &conf->files; +		file->prev = file->next->prev; +		file->next->prev = file; +		file->prev->next = file; +		pthread_mutex_unlock (&conf->files_lock); +	} +	STACK_UNWIND (frame, op_ret, op_errno, fd); +	return 0; +} + +int32_t +iot_open (call_frame_t *frame, +          xlator_t *this, +          loc_t *loc, +          int32_t flags, +	  fd_t *fd) +{ +	STACK_WIND (frame, +		    iot_open_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->open, +		    loc, +		    flags, +		    fd); +	return 0; +} + + +int32_t +iot_create_cbk (call_frame_t *frame, +		void *cookie, +		xlator_t *this, +		int32_t op_ret, +		int32_t op_errno, +		fd_t *fd, +		inode_t *inode, +		struct stat *stbuf) +{ +	iot_conf_t *conf = this->private; + +	if (op_ret >= 0) { +		iot_file_t *file = CALLOC (1, sizeof (*file)); +		ERR_ABORT (file); + +		iot_schedule (conf, file, fd->inode->ino); +		file->fd = fd; + +		fd_ctx_set (fd, this, (uint64_t)(long)file); + +		pthread_mutex_lock (&conf->files_lock); +		file->next = &conf->files; +		file->prev = file->next->prev; +		file->next->prev = file; +		file->prev->next = file; +		pthread_mutex_unlock (&conf->files_lock); +	} +	STACK_UNWIND (frame, op_ret, op_errno, fd, inode, stbuf); +	return 0; +} + +int32_t +iot_create (call_frame_t *frame, +            xlator_t *this, +	    loc_t *loc, +            int32_t flags, +            mode_t mode, +	    fd_t *fd) +{ +	STACK_WIND (frame, +		    iot_create_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->create, +		    loc, +		    flags, +		    mode, +		    fd); +	return 0; +} + + + +int32_t +iot_readv_cbk (call_frame_t *frame, +               void *cookie, +               xlator_t *this, +               int32_t op_ret, +               int32_t op_errno, +               struct iovec *vector, +               int32_t count, +	       struct stat *stbuf) +{ +	iot_local_t *local = frame->local; + +	local->frame_size = 0; //iov_length (vector, count); + +	STACK_UNWIND (frame, op_ret, op_errno, vector, count, stbuf); + +	return 0; +} + +static int32_t +iot_readv_wrapper (call_frame_t *frame, +                   xlator_t *this, +                   fd_t *fd, +                   size_t size, +                   off_t offset) +{ +	STACK_WIND (frame, +		    iot_readv_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->readv, +		    fd, +		    size, +		    offset); +	return 0; +} + +int32_t +iot_readv (call_frame_t *frame, +           xlator_t *this, +           fd_t *fd, +           size_t size, +           off_t offset) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_file_t *file = NULL; +	iot_worker_t *worker = NULL; +	uint64_t tmp_file = 0; + +	if (fd_ctx_get (fd, this, &tmp_file)) { +		gf_log (this->name, GF_LOG_ERROR,  +			"fd context is NULL, returning EBADFD"); +		STACK_UNWIND (frame, -1, EBADFD); +		return 0; +	} + +	file = (iot_file_t *)(long)tmp_file; +	worker = file->worker; + +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); +	frame->local = local; +   +	stub = fop_readv_stub (frame,  +			       iot_readv_wrapper, +			       fd, +			       size, +			       offset); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR,  +			"cannot get readv call stub"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL, 0); +		return 0; +	} + +	iot_queue (worker, stub); + +	return 0; +} + +int32_t +iot_flush_cbk (call_frame_t *frame, +               void *cookie, +               xlator_t *this, +               int32_t op_ret, +               int32_t op_errno) +{ +	STACK_UNWIND (frame, op_ret, op_errno); +	return 0; +} + +static int32_t +iot_flush_wrapper (call_frame_t *frame, +                   xlator_t *this, +                   fd_t *fd) +{ +	STACK_WIND (frame, +		    iot_flush_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->flush, +		    fd); +	return 0; +} + +int32_t +iot_flush (call_frame_t *frame, +           xlator_t *this, +           fd_t *fd) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_file_t *file = NULL; +	iot_worker_t *worker = NULL; +	uint64_t tmp_file = 0; + +	if (fd_ctx_get (fd, this, &tmp_file)) { +		gf_log (this->name, GF_LOG_ERROR,  +			"fd context is NULL, returning EBADFD"); +		STACK_UNWIND (frame, -1, EBADFD); +		return 0; +	} + +	file = (iot_file_t *)(long)tmp_file; +	worker = file->worker; + +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); + +	frame->local = local; +   +	stub = fop_flush_stub (frame, +			       iot_flush_wrapper, +			       fd); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get flush_cbk call stub"); +		STACK_UNWIND (frame, -1, ENOMEM); +		return 0; +	} +	iot_queue (worker, stub); + +	return 0; +} + +int32_t +iot_fsync_cbk (call_frame_t *frame, +               void *cookie, +               xlator_t *this, +               int32_t op_ret, +               int32_t op_errno) +{ +	STACK_UNWIND (frame, op_ret, op_errno); +	return 0; +} + +static int32_t +iot_fsync_wrapper (call_frame_t *frame, +                   xlator_t *this, +                   fd_t *fd, +                   int32_t datasync) +{ +	STACK_WIND (frame, +		    iot_fsync_cbk, +		    FIRST_CHILD (this), +		    FIRST_CHILD (this)->fops->fsync, +		    fd, +		    datasync); +	return 0; +} + +int32_t +iot_fsync (call_frame_t *frame, +           xlator_t *this, +           fd_t *fd, +           int32_t datasync) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_file_t *file = NULL; +	iot_worker_t *worker = NULL; +	uint64_t tmp_file = 0; + +	if (fd_ctx_get (fd, this, &tmp_file)) { +		gf_log (this->name, GF_LOG_ERROR,  +			"fd context is NULL, returning EBADFD"); +		STACK_UNWIND (frame, -1, EBADFD); +		return 0; +	} + +	file = (iot_file_t *)(long)tmp_file; +	worker = file->worker; + +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); + +	frame->local = local; +   +	stub = fop_fsync_stub (frame, +			       iot_fsync_wrapper, +			       fd, +			       datasync); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get fsync_cbk call stub"); +		STACK_UNWIND (frame, -1, ENOMEM); +		return 0; +	} +	iot_queue (worker, stub); + +	return 0; +} + +int32_t +iot_writev_cbk (call_frame_t *frame, +                void *cookie, +                xlator_t *this, +                int32_t op_ret, +                int32_t op_errno, +		struct stat *stbuf) +{ +	iot_local_t *local = frame->local; + +	local->frame_size = 0; /* hehe, caught me! */ + +	STACK_UNWIND (frame, op_ret, op_errno, stbuf); +	return 0; +} + +static int32_t +iot_writev_wrapper (call_frame_t *frame, +                    xlator_t *this, +                    fd_t *fd, +                    struct iovec *vector, +                    int32_t count, +                    off_t offset) +{ +	STACK_WIND (frame, +		    iot_writev_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->writev, +		    fd, +		    vector, +		    count, +		    offset); +	return 0; +} + +int32_t +iot_writev (call_frame_t *frame, +            xlator_t *this, +            fd_t *fd, +            struct iovec *vector, +            int32_t count, +            off_t offset) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_file_t *file = NULL; +	iot_worker_t *worker = NULL; +	uint64_t tmp_file = 0; + +	if (fd_ctx_get (fd, this, &tmp_file)) { +		gf_log (this->name, GF_LOG_ERROR,  +			"fd context is NULL, returning EBADFD"); +		STACK_UNWIND (frame, -1, EBADFD); +		return 0; +	} + +	file = (iot_file_t *)(long)tmp_file; +	worker = file->worker; + +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); + +	if (frame->root->req_refs) +		local->frame_size = dict_serialized_length (frame->root->req_refs); +	else +		local->frame_size = iov_length (vector, count); +	frame->local = local; +   +	stub = fop_writev_stub (frame, iot_writev_wrapper, +				fd, vector, count, offset); + +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get writev call stub"); +		STACK_UNWIND (frame, -1, ENOMEM); +		return 0; +	} + +	iot_queue (worker, stub); + +	return 0; +} + + +int32_t +iot_lk_cbk (call_frame_t *frame, +            void *cookie, +            xlator_t *this, +            int32_t op_ret, +            int32_t op_errno, +            struct flock *flock) +{ +	STACK_UNWIND (frame, op_ret, op_errno, flock); +	return 0; +} + + +static int32_t +iot_lk_wrapper (call_frame_t *frame, +                xlator_t *this, +                fd_t *fd, +                int32_t cmd, +                struct flock *flock) +{ +	STACK_WIND (frame, +		    iot_lk_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->lk, +		    fd, +		    cmd, +		    flock); +	return 0; +} + + +int32_t +iot_lk (call_frame_t *frame, +	xlator_t *this, +	fd_t *fd, +	int32_t cmd, +	struct flock *flock) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_file_t *file = NULL; +	iot_worker_t *worker = NULL; +	uint64_t tmp_file = 0; + +	if (fd_ctx_get (fd, this, &tmp_file)) { +		gf_log (this->name, GF_LOG_ERROR,  +			"fd context is NULL, returning EBADFD"); +		STACK_UNWIND (frame, -1, EBADFD); +		return 0; +	} + +	file = (iot_file_t *)(long)tmp_file; +	worker = file->worker; + +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); +	frame->local = local; + +	stub = fop_lk_stub (frame, iot_lk_wrapper, +			    fd, cmd, flock); + +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get fop_lk call stub"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL); +		return 0; +	} +     +	iot_queue (worker, stub); + +	return 0; +} + + +int32_t  +iot_stat_cbk (call_frame_t *frame, +              void *cookie, +              xlator_t *this, +              int32_t op_ret, +              int32_t op_errno, +              struct stat *buf) +{ +	STACK_UNWIND (frame, op_ret, op_errno, buf); +	return 0; +} + + +static int32_t  +iot_stat_wrapper (call_frame_t *frame, +                  xlator_t *this, +                  loc_t *loc) +{ +	STACK_WIND (frame, +		    iot_stat_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->stat, +		    loc); +	return 0; +} + +int32_t  +iot_stat (call_frame_t *frame, +          xlator_t *this, +          loc_t *loc) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_worker_t *worker = NULL; +	iot_conf_t *conf; +	fd_t *fd = NULL; + +	conf = this->private; + +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); +	frame->local = local; + +	fd = fd_lookup (loc->inode, frame->root->pid); + +	if (fd == NULL) { +		STACK_WIND(frame, +			   iot_stat_cbk, +			   FIRST_CHILD(this), +			   FIRST_CHILD(this)->fops->stat, +			   loc); +		return 0; +	}  +   +	fd_unref (fd); + +	worker = iot_schedule (conf, NULL, loc->inode->ino); + +	stub = fop_stat_stub (frame, +			      iot_stat_wrapper, +			      loc); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get fop_stat call stub"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL); +		return 0; +	} +	iot_queue (worker, stub); + +	return 0; +} + + +int32_t  +iot_fstat_cbk (call_frame_t *frame, +               void *cookie, +               xlator_t *this, +               int32_t op_ret, +               int32_t op_errno, +               struct stat *buf) +{ +	STACK_UNWIND (frame, op_ret, op_errno, buf); +	return 0; +} + +static int32_t  +iot_fstat_wrapper (call_frame_t *frame, +                   xlator_t *this, +                   fd_t *fd) +{ +	STACK_WIND (frame, +		    iot_fstat_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->fstat, +		    fd); +	return 0; +} + +int32_t  +iot_fstat (call_frame_t *frame, +           xlator_t *this, +           fd_t *fd) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_file_t *file = NULL; +	iot_worker_t *worker = NULL; +	uint64_t tmp_file = 0; + +	if (fd_ctx_get (fd, this, &tmp_file)) { +		gf_log (this->name, GF_LOG_ERROR,  +			"fd context is NULL, returning EBADFD"); +		STACK_UNWIND (frame, -1, EBADFD); +		return 0; +	} + +	file = (iot_file_t *)(long)tmp_file; +	worker = file->worker; + +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); +	frame->local = local; +	stub = fop_fstat_stub (frame, +			       iot_fstat_wrapper, +			       fd); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get fop_fstat call stub"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL); +		return 0; +	} + +	iot_queue (worker, stub); + +	return 0; +} + +int32_t  +iot_truncate_cbk (call_frame_t *frame, +                  void *cookie, +                  xlator_t *this, +                  int32_t op_ret, +                  int32_t op_errno, +                  struct stat *buf) +{ +	STACK_UNWIND (frame, op_ret, op_errno, buf); +	return 0; +} + +static int32_t  +iot_truncate_wrapper (call_frame_t *frame, +                      xlator_t *this, +                      loc_t *loc, +                      off_t offset) +{ +	STACK_WIND (frame, +		    iot_truncate_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->truncate, +		    loc, +		    offset); +	return 0; +} + +int32_t  +iot_truncate (call_frame_t *frame, +              xlator_t *this, +              loc_t *loc, +              off_t offset) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_worker_t *worker = NULL; +	iot_conf_t *conf; +	fd_t *fd = NULL; +   +	conf = this->private; +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); +	frame->local = local; + +	fd = fd_lookup (loc->inode, frame->root->pid); + +	if (fd == NULL) { +		STACK_WIND(frame, +			   iot_truncate_cbk, +			   FIRST_CHILD(this), +			   FIRST_CHILD(this)->fops->truncate, +			   loc, +			   offset); +		return 0; +	}  +   +	fd_unref (fd); + +	worker = iot_schedule (conf, NULL, loc->inode->ino); + +	stub = fop_truncate_stub (frame, +				  iot_truncate_wrapper, +				  loc, +				  offset); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get fop_stat call stub"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL); +		return 0; +	} +	iot_queue (worker, stub); + +	return 0; +} + +int32_t  +iot_ftruncate_cbk (call_frame_t *frame, +                   void *cookie, +                   xlator_t *this, +                   int32_t op_ret, +                   int32_t op_errno, +                   struct stat *buf) +{ +	STACK_UNWIND (frame, op_ret, op_errno, buf); +	return 0; +} + +static int32_t  +iot_ftruncate_wrapper (call_frame_t *frame, +                       xlator_t *this, +                       fd_t *fd, +                       off_t offset) +{ +	STACK_WIND (frame, +		    iot_ftruncate_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->ftruncate, +		    fd, +		    offset); +	return 0; +} + +int32_t  +iot_ftruncate (call_frame_t *frame, +               xlator_t *this, +               fd_t *fd, +               off_t offset) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_file_t *file = NULL; +	iot_worker_t *worker = NULL; +	uint64_t tmp_file = 0; + +	if (fd_ctx_get (fd, this, &tmp_file)) { +		gf_log (this->name, GF_LOG_ERROR,  +			"fd context is NULL, returning EBADFD"); +		STACK_UNWIND (frame, -1, EBADFD); +		return 0; +	} + +	file = (iot_file_t *)(long)tmp_file; +	worker = file->worker; + +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); +	frame->local = local; + +	stub = fop_ftruncate_stub (frame, +				   iot_ftruncate_wrapper, +				   fd, +				   offset); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get fop_ftruncate call stub"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL); +		return 0; +	} +	iot_queue (worker, stub); + +	return 0; +} + +int32_t  +iot_utimens_cbk (call_frame_t *frame, +                 void *cookie, +                 xlator_t *this, +                 int32_t op_ret, +                 int32_t op_errno, +                 struct stat *buf) +{ +	STACK_UNWIND (frame, op_ret, op_errno, buf); +	return 0; +} + +static int32_t  +iot_utimens_wrapper (call_frame_t *frame, +                     xlator_t *this, +                     loc_t *loc, +                     struct timespec tv[2]) +{ +	STACK_WIND (frame, +		    iot_utimens_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->utimens, +		    loc, +		    tv); +   +	return 0; +} + +int32_t  +iot_utimens (call_frame_t *frame, +             xlator_t *this, +             loc_t *loc, +             struct timespec tv[2]) +{ +	call_stub_t *stub; +	iot_local_t *local = NULL; +	iot_worker_t *worker = NULL; +	iot_conf_t *conf; +	fd_t *fd = NULL; +   +	conf = this->private; + +	local = CALLOC (1, sizeof (*local)); +	ERR_ABORT (local); +	frame->local = local; +   +	fd = fd_lookup (loc->inode, frame->root->pid); + +	if (fd == NULL) { +		STACK_WIND(frame, +			   iot_utimens_cbk, +			   FIRST_CHILD(this), +			   FIRST_CHILD(this)->fops->utimens, +			   loc, +			   tv); +		return 0; +	}  +   +	fd_unref (fd); + +	worker = iot_schedule (conf, NULL, loc->inode->ino); + +	stub = fop_utimens_stub (frame, +				 iot_utimens_wrapper, +				 loc, +				 tv); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get fop_utimens call stub"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL); +		return 0; +	} +	iot_queue (worker, stub); + +	return 0; +} + + +int32_t  +iot_checksum_cbk (call_frame_t *frame, +		  void *cookie, +		  xlator_t *this, +		  int32_t op_ret, +		  int32_t op_errno, +		  uint8_t *file_checksum, +		  uint8_t *dir_checksum) +{ +	STACK_UNWIND (frame, op_ret, op_errno, file_checksum, dir_checksum); +	return 0; +} + +static int32_t  +iot_checksum_wrapper (call_frame_t *frame, +		      xlator_t *this, +		      loc_t *loc, +		      int32_t flags) +{ +	STACK_WIND (frame, +		    iot_checksum_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->checksum, +		    loc, +		    flags); +   +	return 0; +} + +int32_t  +iot_checksum (call_frame_t *frame, +	      xlator_t *this, +	      loc_t *loc, +	      int32_t flags) +{ +	call_stub_t *stub = NULL; +	iot_local_t *local = NULL; +	iot_worker_t *worker = NULL; +	iot_conf_t *conf = NULL; +   +	conf = this->private; + +	local = CALLOC (1, sizeof (*local)); +	frame->local = local; + +	worker = iot_schedule (conf, NULL, conf->misc_thread_index++); + +	stub = fop_checksum_stub (frame, +				  iot_checksum_wrapper, +				  loc, +				  flags); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get fop_checksum call stub"); +		STACK_UNWIND (frame, -1, ENOMEM, NULL, NULL); +		return 0; +	} +	iot_queue (worker, stub); + +	return 0; +} + + +int32_t  +iot_unlink_cbk (call_frame_t *frame, +		void *cookie, +		xlator_t *this, +		int32_t op_ret, +		int32_t op_errno) +{ +	STACK_UNWIND (frame, op_ret, op_errno); +	return 0; +} + +static int32_t  +iot_unlink_wrapper (call_frame_t *frame, +		    xlator_t *this, +		    loc_t *loc) +{ +	STACK_WIND (frame, +		    iot_unlink_cbk, +		    FIRST_CHILD(this), +		    FIRST_CHILD(this)->fops->unlink, +		    loc); +   +	return 0; +} + +int32_t  +iot_unlink (call_frame_t *frame, +	    xlator_t *this, +	    loc_t *loc) +{ +	call_stub_t *stub = NULL; +	iot_local_t *local = NULL; +	iot_worker_t *worker = NULL; +	iot_conf_t *conf = NULL; + +	conf = this->private; + +	local = CALLOC (1, sizeof (*local)); +	frame->local = local; + +	worker = iot_schedule (conf, NULL, conf->misc_thread_index++); + +	stub = fop_unlink_stub (frame, iot_unlink_wrapper, loc); +	if (!stub) { +		gf_log (this->name, GF_LOG_ERROR, "cannot get fop_unlink call stub"); +		STACK_UNWIND (frame, -1, ENOMEM); +		return 0; +	} +	iot_queue (worker, stub); + +	return 0; +} + +int32_t +iot_release (xlator_t *this, +	     fd_t *fd) +{ +	iot_file_t *file = NULL; +	iot_conf_t *conf = NULL; +	uint64_t tmp_file = 0; +	int ret = 0; + +	conf = this->private; +	ret = fd_ctx_del (fd, this, &tmp_file); +	if (ret) +		return 0; + +	file = (iot_file_t *)(long)tmp_file; + +	pthread_mutex_lock (&conf->files_lock); +	{ +		(file->prev)->next = file->next; +		(file->next)->prev = file->prev; +	} +	pthread_mutex_unlock (&conf->files_lock); + +	FREE (file); +	return 0; +} + + +static void +iot_queue (iot_worker_t *worker, +           call_stub_t *stub) +{ +	iot_queue_t *queue; +	iot_conf_t *conf = worker->conf; +	iot_local_t *local = stub->frame->local; +	size_t frame_size = local->frame_size; + +	queue = CALLOC (1, sizeof (*queue)); +	ERR_ABORT (queue); +	queue->stub = stub; + +	pthread_mutex_lock (&conf->lock); + +	/* +	  while (worker->queue_size >= worker->queue_limit) +	  pthread_cond_wait (&worker->q_cond, &worker->lock); +	*/ +	if (conf->cache_size) { +		while (frame_size && (conf->current_size >= conf->cache_size)) +			pthread_cond_wait (&conf->q_cond, &conf->lock); +	} + +	queue->next = &worker->queue; +	queue->prev = worker->queue.prev; + +	queue->next->prev = queue; +	queue->prev->next = queue; + +	/* dq_cond */ +	worker->queue_size++; +	worker->q++; + +	conf->current_size += local->frame_size; + +	pthread_cond_broadcast (&worker->dq_cond); + +	pthread_mutex_unlock (&conf->lock); +} + +static call_stub_t * +iot_dequeue (iot_worker_t *worker) +{ +	call_stub_t *stub = NULL; +	iot_queue_t *queue = NULL; +	iot_conf_t *conf = worker->conf; +	iot_local_t *local = NULL; + + +	pthread_mutex_lock (&conf->lock); + +	while (!worker->queue_size) +		/* +		  pthread_cond_wait (&worker->dq_cond, &worker->lock); +		*/ +		pthread_cond_wait (&worker->dq_cond, &conf->lock); + +	queue = worker->queue.next; + +	queue->next->prev = queue->prev; +	queue->prev->next = queue->next; + +	stub = queue->stub; +	local = stub->frame->local; + +	worker->queue_size--; +	worker->dq++; + +	/* q_cond */ +	conf->current_size -= local->frame_size; + +	pthread_cond_broadcast (&conf->q_cond); + +	pthread_mutex_unlock (&conf->lock); + +	FREE (queue); + +	return stub; +} + +static void * +iot_worker (void *arg) +{ +	iot_worker_t *worker = arg; + +	while (1) { +		call_stub_t *stub; + +		stub = iot_dequeue (worker); +		call_resume (stub); +	} +} + +#if 0 +static void * +iot_reply (void *arg) +{ +	iot_worker_t *reply = arg; + +	while (1) { +		call_stub_t *stub; + +		stub = iot_dequeue (reply); +		FREE (stub->frame->local); +		stub->frame->local = NULL; +		call_resume (stub); +	} +} +#endif + +static void +workers_init (iot_conf_t *conf) +{ +	int i; + +	conf->workers.next = &conf->workers; +	conf->workers.prev = &conf->workers; + +	for (i=0; i<conf->thread_count; i++) { + +		iot_worker_t *worker = CALLOC (1, sizeof (*worker)); +		ERR_ABORT (worker); + +		worker->next = &conf->workers; +		worker->prev = conf->workers.prev; +		worker->next->prev = worker; +		worker->prev->next = worker; + +		worker->queue.next = &worker->queue; +		worker->queue.prev = &worker->queue; + +		/* +		  pthread_mutex_init (&worker->lock, NULL); +		  pthread_cond_init (&worker->q_cond, NULL); +		*/ +		pthread_cond_init (&worker->dq_cond, NULL); + +		/* +		  worker->queue_limit = conf->queue_limit; +		*/ + +		worker->conf = conf; + +		pthread_create (&worker->thread, NULL, iot_worker, worker); +	} +} + +int32_t  +init (xlator_t *this) +{ +	iot_conf_t *conf; +	dict_t *options = this->options; + +	if (!this->children || this->children->next) { +		gf_log ("io-threads", +			GF_LOG_ERROR, +			"FATAL: iot not configured with exactly one child"); +		return -1; +	} + +	if (!this->parents) { +		gf_log (this->name, GF_LOG_WARNING, +			"dangling volume. check volfile "); +	} + +	conf = (void *) CALLOC (1, sizeof (*conf)); +	ERR_ABORT (conf); + +	conf->thread_count = 1; + +	if (dict_get (options, "thread-count")) { +		conf->thread_count = data_to_int32 (dict_get (options, +							      "thread-count")); +		gf_log ("io-threads", +			GF_LOG_DEBUG, +			"Using conf->thread_count = %d", +			conf->thread_count); +	} + +	pthread_mutex_init (&conf->lock, NULL); +	pthread_cond_init (&conf->q_cond, NULL); + +	conf->files.next = &conf->files; +	conf->files.prev = &conf->files; +	pthread_mutex_init (&conf->files_lock, NULL); + +	workers_init (conf); + +	this->private = conf; +	return 0; +} + +void +fini (xlator_t *this) +{ +	iot_conf_t *conf = this->private; + +	FREE (conf); + +	this->private = NULL; +	return; +} + +struct xlator_fops fops = { +	.open        = iot_open, +	.create      = iot_create, +	.readv       = iot_readv, +	.writev      = iot_writev, +	.flush       = iot_flush, +	.fsync       = iot_fsync, +	.lk          = iot_lk, +	.stat        = iot_stat, +	.fstat       = iot_fstat, +	.truncate    = iot_truncate, +	.ftruncate   = iot_ftruncate, +	.utimens     = iot_utimens, +	.checksum    = iot_checksum, +	.unlink      = iot_unlink, +}; + +struct xlator_mops mops = { +}; + +struct xlator_cbks cbks = { +	.release = iot_release, +}; + +struct volume_options options[] = { +	{ .key  = {"thread-count"},  +	  .type = GF_OPTION_TYPE_INT,  +	  .min  = 1,  +	  .max  = 32 +	}, +	{ .key  = {NULL} }, +}; diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h new file mode 100644 index 00000000000..6595d3e277b --- /dev/null +++ b/xlators/performance/io-threads/src/io-threads.h @@ -0,0 +1,99 @@ +/* +   Copyright (c) 2006, 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com> +   This file is part of GlusterFS. + +   GlusterFS is free software; you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published +   by the Free Software Foundation; either version 3 of the License, +   or (at your option) any later version. + +   GlusterFS is distributed in the hope that it will be useful, but +   WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +   General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see +   <http://www.gnu.org/licenses/>. +*/ + +#ifndef __IOT_H +#define __IOT_H + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + + +#include "compat-errno.h" +#include "glusterfs.h" +#include "logging.h" +#include "dict.h" +#include "xlator.h" +#include "common-utils.h" + +#define min(a,b) ((a)<(b)?(a):(b)) +#define max(a,b) ((a)>(b)?(a):(b)) + +struct iot_conf; +struct iot_worker; +struct iot_queue; +struct iot_local; +struct iot_file; + +struct iot_local { +  struct iot_file *file; +  size_t frame_size; +}; + +struct iot_queue { +  struct iot_queue *next, *prev; +  call_stub_t *stub; +}; + +struct iot_worker { +  struct iot_worker *next, *prev; +  struct iot_queue queue; +  struct iot_conf *conf; +  int64_t q,dq; +  pthread_cond_t dq_cond; +  /* +    pthread_cond_t q_cond; +    pthread_mutex_t lock; +  */ +  int32_t fd_count; +  int32_t queue_size; +  /* +    int32_t queue_limit; +  */ +  pthread_t thread; +}; + +struct iot_file { +  struct iot_file *next, *prev; /* all open files via this xlator */ +  struct iot_worker *worker; +  fd_t *fd; +  int32_t pending_ops; +}; + +struct iot_conf { +  int32_t thread_count; +  int32_t misc_thread_index;  /* Used to schedule the miscellaneous calls like checksum */ +  struct iot_worker workers; +  struct iot_file files; +  pthread_mutex_t files_lock; + +  uint64_t cache_size; +  off_t current_size; +  pthread_cond_t q_cond; +  pthread_mutex_t lock; +}; + +typedef struct iot_file iot_file_t; +typedef struct iot_conf iot_conf_t; +typedef struct iot_local iot_local_t; +typedef struct iot_worker iot_worker_t; +typedef struct iot_queue iot_queue_t; + +#endif /* __IOT_H */  | 
