summaryrefslogtreecommitdiffstats
path: root/xlators/performance/io-threads/src/io-threads.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/performance/io-threads/src/io-threads.c')
-rw-r--r--xlators/performance/io-threads/src/io-threads.c1254
1 files changed, 1254 insertions, 0 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c
new file mode 100644
index 000000000..5acdd627d
--- /dev/null
+++ b/xlators/performance/io-threads/src/io-threads.c
@@ -0,0 +1,1254 @@
+/*
+ Copyright (c) 2006, 2007, 2008, 2009 Z RESEARCH, Inc. <http://www.zresearch.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "call-stub.h"
+#include "glusterfs.h"
+#include "logging.h"
+#include "dict.h"
+#include "xlator.h"
+#include "io-threads.h"
+
+static void
+iot_queue (iot_worker_t *worker,
+ call_stub_t *stub);
+
+static call_stub_t *
+iot_dequeue (iot_worker_t *worker);
+
+static iot_worker_t *
+iot_schedule (iot_conf_t *conf,
+ iot_file_t *file,
+ ino_t ino)
+{
+ int32_t cnt = (ino % conf->thread_count);
+ iot_worker_t *trav = conf->workers.next;
+
+ for (; cnt; cnt--)
+ trav = trav->next;
+
+ if (file)
+ file->worker = trav;
+ trav->fd_count++;
+ return trav;
+}
+
+int32_t
+iot_open_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ fd_t *fd)
+{
+ iot_conf_t *conf = this->private;
+
+ if (op_ret >= 0) {
+ iot_file_t *file = CALLOC (1, sizeof (*file));
+ ERR_ABORT (file);
+
+ iot_schedule (conf, file, fd->inode->ino);
+ file->fd = fd;
+
+ fd_ctx_set (fd, this, (uint64_t)(long)file);
+
+ pthread_mutex_lock (&conf->files_lock);
+ file->next = &conf->files;
+ file->prev = file->next->prev;
+ file->next->prev = file;
+ file->prev->next = file;
+ pthread_mutex_unlock (&conf->files_lock);
+ }
+ STACK_UNWIND (frame, op_ret, op_errno, fd);
+ return 0;
+}
+
+int32_t
+iot_open (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ int32_t flags,
+ fd_t *fd)
+{
+ STACK_WIND (frame,
+ iot_open_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->open,
+ loc,
+ flags,
+ fd);
+ return 0;
+}
+
+
+int32_t
+iot_create_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ fd_t *fd,
+ inode_t *inode,
+ struct stat *stbuf)
+{
+ iot_conf_t *conf = this->private;
+
+ if (op_ret >= 0) {
+ iot_file_t *file = CALLOC (1, sizeof (*file));
+ ERR_ABORT (file);
+
+ iot_schedule (conf, file, fd->inode->ino);
+ file->fd = fd;
+
+ fd_ctx_set (fd, this, (uint64_t)(long)file);
+
+ pthread_mutex_lock (&conf->files_lock);
+ file->next = &conf->files;
+ file->prev = file->next->prev;
+ file->next->prev = file;
+ file->prev->next = file;
+ pthread_mutex_unlock (&conf->files_lock);
+ }
+ STACK_UNWIND (frame, op_ret, op_errno, fd, inode, stbuf);
+ return 0;
+}
+
+int32_t
+iot_create (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ int32_t flags,
+ mode_t mode,
+ fd_t *fd)
+{
+ STACK_WIND (frame,
+ iot_create_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->create,
+ loc,
+ flags,
+ mode,
+ fd);
+ return 0;
+}
+
+
+
+int32_t
+iot_readv_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct iovec *vector,
+ int32_t count,
+ struct stat *stbuf)
+{
+ iot_local_t *local = frame->local;
+
+ local->frame_size = 0; //iov_length (vector, count);
+
+ STACK_UNWIND (frame, op_ret, op_errno, vector, count, stbuf);
+
+ return 0;
+}
+
+static int32_t
+iot_readv_wrapper (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ size_t size,
+ off_t offset)
+{
+ STACK_WIND (frame,
+ iot_readv_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->readv,
+ fd,
+ size,
+ offset);
+ return 0;
+}
+
+int32_t
+iot_readv (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ size_t size,
+ off_t offset)
+{
+ call_stub_t *stub;
+ iot_local_t *local = NULL;
+ iot_file_t *file = NULL;
+ iot_worker_t *worker = NULL;
+ uint64_t tmp_file = 0;
+
+ if (fd_ctx_get (fd, this, &tmp_file)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "fd context is NULL, returning EBADFD");
+ STACK_UNWIND (frame, -1, EBADFD);
+ return 0;
+ }
+
+ file = (iot_file_t *)(long)tmp_file;
+ worker = file->worker;
+
+ local = CALLOC (1, sizeof (*local));
+ ERR_ABORT (local);
+ frame->local = local;
+
+ stub = fop_readv_stub (frame,
+ iot_readv_wrapper,
+ fd,
+ size,
+ offset);
+ if (!stub) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "cannot get readv call stub");
+ STACK_UNWIND (frame, -1, ENOMEM, NULL, 0);
+ return 0;
+ }
+
+ iot_queue (worker, stub);
+
+ return 0;
+}
+
+int32_t
+iot_flush_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ STACK_UNWIND (frame, op_ret, op_errno);
+ return 0;
+}
+
+static int32_t
+iot_flush_wrapper (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd)
+{
+ STACK_WIND (frame,
+ iot_flush_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->flush,
+ fd);
+ return 0;
+}
+
+int32_t
+iot_flush (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd)
+{
+ call_stub_t *stub;
+ iot_local_t *local = NULL;
+ iot_file_t *file = NULL;
+ iot_worker_t *worker = NULL;
+ uint64_t tmp_file = 0;
+
+ if (fd_ctx_get (fd, this, &tmp_file)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "fd context is NULL, returning EBADFD");
+ STACK_UNWIND (frame, -1, EBADFD);
+ return 0;
+ }
+
+ file = (iot_file_t *)(long)tmp_file;
+ worker = file->worker;
+
+ local = CALLOC (1, sizeof (*local));
+ ERR_ABORT (local);
+
+ frame->local = local;
+
+ stub = fop_flush_stub (frame,
+ iot_flush_wrapper,
+ fd);
+ if (!stub) {
+ gf_log (this->name, GF_LOG_ERROR, "cannot get flush_cbk call stub");
+ STACK_UNWIND (frame, -1, ENOMEM);
+ return 0;
+ }
+ iot_queue (worker, stub);
+
+ return 0;
+}
+
+int32_t
+iot_fsync_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ STACK_UNWIND (frame, op_ret, op_errno);
+ return 0;
+}
+
+static int32_t
+iot_fsync_wrapper (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ int32_t datasync)
+{
+ STACK_WIND (frame,
+ iot_fsync_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD (this)->fops->fsync,
+ fd,
+ datasync);
+ return 0;
+}
+
+int32_t
+iot_fsync (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ int32_t datasync)
+{
+ call_stub_t *stub;
+ iot_local_t *local = NULL;
+ iot_file_t *file = NULL;
+ iot_worker_t *worker = NULL;
+ uint64_t tmp_file = 0;
+
+ if (fd_ctx_get (fd, this, &tmp_file)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "fd context is NULL, returning EBADFD");
+ STACK_UNWIND (frame, -1, EBADFD);
+ return 0;
+ }
+
+ file = (iot_file_t *)(long)tmp_file;
+ worker = file->worker;
+
+ local = CALLOC (1, sizeof (*local));
+ ERR_ABORT (local);
+
+ frame->local = local;
+
+ stub = fop_fsync_stub (frame,
+ iot_fsync_wrapper,
+ fd,
+ datasync);
+ if (!stub) {
+ gf_log (this->name, GF_LOG_ERROR, "cannot get fsync_cbk call stub");
+ STACK_UNWIND (frame, -1, ENOMEM);
+ return 0;
+ }
+ iot_queue (worker, stub);
+
+ return 0;
+}
+
+int32_t
+iot_writev_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *stbuf)
+{
+ iot_local_t *local = frame->local;
+
+ local->frame_size = 0; /* hehe, caught me! */
+
+ STACK_UNWIND (frame, op_ret, op_errno, stbuf);
+ return 0;
+}
+
+static int32_t
+iot_writev_wrapper (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ struct iovec *vector,
+ int32_t count,
+ off_t offset)
+{
+ STACK_WIND (frame,
+ iot_writev_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->writev,
+ fd,
+ vector,
+ count,
+ offset);
+ return 0;
+}
+
+int32_t
+iot_writev (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ struct iovec *vector,
+ int32_t count,
+ off_t offset)
+{
+ call_stub_t *stub;
+ iot_local_t *local = NULL;
+ iot_file_t *file = NULL;
+ iot_worker_t *worker = NULL;
+ uint64_t tmp_file = 0;
+
+ if (fd_ctx_get (fd, this, &tmp_file)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "fd context is NULL, returning EBADFD");
+ STACK_UNWIND (frame, -1, EBADFD);
+ return 0;
+ }
+
+ file = (iot_file_t *)(long)tmp_file;
+ worker = file->worker;
+
+ local = CALLOC (1, sizeof (*local));
+ ERR_ABORT (local);
+
+ if (frame->root->req_refs)
+ local->frame_size = dict_serialized_length (frame->root->req_refs);
+ else
+ local->frame_size = iov_length (vector, count);
+ frame->local = local;
+
+ stub = fop_writev_stub (frame, iot_writev_wrapper,
+ fd, vector, count, offset);
+
+ if (!stub) {
+ gf_log (this->name, GF_LOG_ERROR, "cannot get writev call stub");
+ STACK_UNWIND (frame, -1, ENOMEM);
+ return 0;
+ }
+
+ iot_queue (worker, stub);
+
+ return 0;
+}
+
+
+int32_t
+iot_lk_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct flock *flock)
+{
+ STACK_UNWIND (frame, op_ret, op_errno, flock);
+ return 0;
+}
+
+
+static int32_t
+iot_lk_wrapper (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ int32_t cmd,
+ struct flock *flock)
+{
+ STACK_WIND (frame,
+ iot_lk_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->lk,
+ fd,
+ cmd,
+ flock);
+ return 0;
+}
+
+
+int32_t
+iot_lk (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ int32_t cmd,
+ struct flock *flock)
+{
+ call_stub_t *stub;
+ iot_local_t *local = NULL;
+ iot_file_t *file = NULL;
+ iot_worker_t *worker = NULL;
+ uint64_t tmp_file = 0;
+
+ if (fd_ctx_get (fd, this, &tmp_file)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "fd context is NULL, returning EBADFD");
+ STACK_UNWIND (frame, -1, EBADFD);
+ return 0;
+ }
+
+ file = (iot_file_t *)(long)tmp_file;
+ worker = file->worker;
+
+ local = CALLOC (1, sizeof (*local));
+ ERR_ABORT (local);
+ frame->local = local;
+
+ stub = fop_lk_stub (frame, iot_lk_wrapper,
+ fd, cmd, flock);
+
+ if (!stub) {
+ gf_log (this->name, GF_LOG_ERROR, "cannot get fop_lk call stub");
+ STACK_UNWIND (frame, -1, ENOMEM, NULL);
+ return 0;
+ }
+
+ iot_queue (worker, stub);
+
+ return 0;
+}
+
+
+int32_t
+iot_stat_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *buf)
+{
+ STACK_UNWIND (frame, op_ret, op_errno, buf);
+ return 0;
+}
+
+
+static int32_t
+iot_stat_wrapper (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc)
+{
+ STACK_WIND (frame,
+ iot_stat_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->stat,
+ loc);
+ return 0;
+}
+
+int32_t
+iot_stat (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc)
+{
+ call_stub_t *stub;
+ iot_local_t *local = NULL;
+ iot_worker_t *worker = NULL;
+ iot_conf_t *conf;
+ fd_t *fd = NULL;
+
+ conf = this->private;
+
+ local = CALLOC (1, sizeof (*local));
+ ERR_ABORT (local);
+ frame->local = local;
+
+ fd = fd_lookup (loc->inode, frame->root->pid);
+
+ if (fd == NULL) {
+ STACK_WIND(frame,
+ iot_stat_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->stat,
+ loc);
+ return 0;
+ }
+
+ fd_unref (fd);
+
+ worker = iot_schedule (conf, NULL, loc->inode->ino);
+
+ stub = fop_stat_stub (frame,
+ iot_stat_wrapper,
+ loc);
+ if (!stub) {
+ gf_log (this->name, GF_LOG_ERROR, "cannot get fop_stat call stub");
+ STACK_UNWIND (frame, -1, ENOMEM, NULL);
+ return 0;
+ }
+ iot_queue (worker, stub);
+
+ return 0;
+}
+
+
+int32_t
+iot_fstat_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *buf)
+{
+ STACK_UNWIND (frame, op_ret, op_errno, buf);
+ return 0;
+}
+
+static int32_t
+iot_fstat_wrapper (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd)
+{
+ STACK_WIND (frame,
+ iot_fstat_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->fstat,
+ fd);
+ return 0;
+}
+
+int32_t
+iot_fstat (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd)
+{
+ call_stub_t *stub;
+ iot_local_t *local = NULL;
+ iot_file_t *file = NULL;
+ iot_worker_t *worker = NULL;
+ uint64_t tmp_file = 0;
+
+ if (fd_ctx_get (fd, this, &tmp_file)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "fd context is NULL, returning EBADFD");
+ STACK_UNWIND (frame, -1, EBADFD);
+ return 0;
+ }
+
+ file = (iot_file_t *)(long)tmp_file;
+ worker = file->worker;
+
+ local = CALLOC (1, sizeof (*local));
+ ERR_ABORT (local);
+ frame->local = local;
+ stub = fop_fstat_stub (frame,
+ iot_fstat_wrapper,
+ fd);
+ if (!stub) {
+ gf_log (this->name, GF_LOG_ERROR, "cannot get fop_fstat call stub");
+ STACK_UNWIND (frame, -1, ENOMEM, NULL);
+ return 0;
+ }
+
+ iot_queue (worker, stub);
+
+ return 0;
+}
+
+int32_t
+iot_truncate_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *buf)
+{
+ STACK_UNWIND (frame, op_ret, op_errno, buf);
+ return 0;
+}
+
+static int32_t
+iot_truncate_wrapper (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ off_t offset)
+{
+ STACK_WIND (frame,
+ iot_truncate_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->truncate,
+ loc,
+ offset);
+ return 0;
+}
+
+int32_t
+iot_truncate (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ off_t offset)
+{
+ call_stub_t *stub;
+ iot_local_t *local = NULL;
+ iot_worker_t *worker = NULL;
+ iot_conf_t *conf;
+ fd_t *fd = NULL;
+
+ conf = this->private;
+ local = CALLOC (1, sizeof (*local));
+ ERR_ABORT (local);
+ frame->local = local;
+
+ fd = fd_lookup (loc->inode, frame->root->pid);
+
+ if (fd == NULL) {
+ STACK_WIND(frame,
+ iot_truncate_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->truncate,
+ loc,
+ offset);
+ return 0;
+ }
+
+ fd_unref (fd);
+
+ worker = iot_schedule (conf, NULL, loc->inode->ino);
+
+ stub = fop_truncate_stub (frame,
+ iot_truncate_wrapper,
+ loc,
+ offset);
+ if (!stub) {
+ gf_log (this->name, GF_LOG_ERROR, "cannot get fop_stat call stub");
+ STACK_UNWIND (frame, -1, ENOMEM, NULL);
+ return 0;
+ }
+ iot_queue (worker, stub);
+
+ return 0;
+}
+
+int32_t
+iot_ftruncate_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *buf)
+{
+ STACK_UNWIND (frame, op_ret, op_errno, buf);
+ return 0;
+}
+
+static int32_t
+iot_ftruncate_wrapper (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ off_t offset)
+{
+ STACK_WIND (frame,
+ iot_ftruncate_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->ftruncate,
+ fd,
+ offset);
+ return 0;
+}
+
+int32_t
+iot_ftruncate (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ off_t offset)
+{
+ call_stub_t *stub;
+ iot_local_t *local = NULL;
+ iot_file_t *file = NULL;
+ iot_worker_t *worker = NULL;
+ uint64_t tmp_file = 0;
+
+ if (fd_ctx_get (fd, this, &tmp_file)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "fd context is NULL, returning EBADFD");
+ STACK_UNWIND (frame, -1, EBADFD);
+ return 0;
+ }
+
+ file = (iot_file_t *)(long)tmp_file;
+ worker = file->worker;
+
+ local = CALLOC (1, sizeof (*local));
+ ERR_ABORT (local);
+ frame->local = local;
+
+ stub = fop_ftruncate_stub (frame,
+ iot_ftruncate_wrapper,
+ fd,
+ offset);
+ if (!stub) {
+ gf_log (this->name, GF_LOG_ERROR, "cannot get fop_ftruncate call stub");
+ STACK_UNWIND (frame, -1, ENOMEM, NULL);
+ return 0;
+ }
+ iot_queue (worker, stub);
+
+ return 0;
+}
+
+int32_t
+iot_utimens_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *buf)
+{
+ STACK_UNWIND (frame, op_ret, op_errno, buf);
+ return 0;
+}
+
+static int32_t
+iot_utimens_wrapper (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ struct timespec tv[2])
+{
+ STACK_WIND (frame,
+ iot_utimens_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->utimens,
+ loc,
+ tv);
+
+ return 0;
+}
+
+int32_t
+iot_utimens (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ struct timespec tv[2])
+{
+ call_stub_t *stub;
+ iot_local_t *local = NULL;
+ iot_worker_t *worker = NULL;
+ iot_conf_t *conf;
+ fd_t *fd = NULL;
+
+ conf = this->private;
+
+ local = CALLOC (1, sizeof (*local));
+ ERR_ABORT (local);
+ frame->local = local;
+
+ fd = fd_lookup (loc->inode, frame->root->pid);
+
+ if (fd == NULL) {
+ STACK_WIND(frame,
+ iot_utimens_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->utimens,
+ loc,
+ tv);
+ return 0;
+ }
+
+ fd_unref (fd);
+
+ worker = iot_schedule (conf, NULL, loc->inode->ino);
+
+ stub = fop_utimens_stub (frame,
+ iot_utimens_wrapper,
+ loc,
+ tv);
+ if (!stub) {
+ gf_log (this->name, GF_LOG_ERROR, "cannot get fop_utimens call stub");
+ STACK_UNWIND (frame, -1, ENOMEM, NULL);
+ return 0;
+ }
+ iot_queue (worker, stub);
+
+ return 0;
+}
+
+
+int32_t
+iot_checksum_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ uint8_t *file_checksum,
+ uint8_t *dir_checksum)
+{
+ STACK_UNWIND (frame, op_ret, op_errno, file_checksum, dir_checksum);
+ return 0;
+}
+
+static int32_t
+iot_checksum_wrapper (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ int32_t flags)
+{
+ STACK_WIND (frame,
+ iot_checksum_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->checksum,
+ loc,
+ flags);
+
+ return 0;
+}
+
+int32_t
+iot_checksum (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ int32_t flags)
+{
+ call_stub_t *stub = NULL;
+ iot_local_t *local = NULL;
+ iot_worker_t *worker = NULL;
+ iot_conf_t *conf = NULL;
+
+ conf = this->private;
+
+ local = CALLOC (1, sizeof (*local));
+ frame->local = local;
+
+ worker = iot_schedule (conf, NULL, conf->misc_thread_index++);
+
+ stub = fop_checksum_stub (frame,
+ iot_checksum_wrapper,
+ loc,
+ flags);
+ if (!stub) {
+ gf_log (this->name, GF_LOG_ERROR, "cannot get fop_checksum call stub");
+ STACK_UNWIND (frame, -1, ENOMEM, NULL, NULL);
+ return 0;
+ }
+ iot_queue (worker, stub);
+
+ return 0;
+}
+
+
+int32_t
+iot_unlink_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ STACK_UNWIND (frame, op_ret, op_errno);
+ return 0;
+}
+
+static int32_t
+iot_unlink_wrapper (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc)
+{
+ STACK_WIND (frame,
+ iot_unlink_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->unlink,
+ loc);
+
+ return 0;
+}
+
+int32_t
+iot_unlink (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc)
+{
+ call_stub_t *stub = NULL;
+ iot_local_t *local = NULL;
+ iot_worker_t *worker = NULL;
+ iot_conf_t *conf = NULL;
+
+ conf = this->private;
+
+ local = CALLOC (1, sizeof (*local));
+ frame->local = local;
+
+ worker = iot_schedule (conf, NULL, conf->misc_thread_index++);
+
+ stub = fop_unlink_stub (frame, iot_unlink_wrapper, loc);
+ if (!stub) {
+ gf_log (this->name, GF_LOG_ERROR, "cannot get fop_unlink call stub");
+ STACK_UNWIND (frame, -1, ENOMEM);
+ return 0;
+ }
+ iot_queue (worker, stub);
+
+ return 0;
+}
+
+int32_t
+iot_release (xlator_t *this,
+ fd_t *fd)
+{
+ iot_file_t *file = NULL;
+ iot_conf_t *conf = NULL;
+ uint64_t tmp_file = 0;
+ int ret = 0;
+
+ conf = this->private;
+ ret = fd_ctx_del (fd, this, &tmp_file);
+ if (ret)
+ return 0;
+
+ file = (iot_file_t *)(long)tmp_file;
+
+ pthread_mutex_lock (&conf->files_lock);
+ {
+ (file->prev)->next = file->next;
+ (file->next)->prev = file->prev;
+ }
+ pthread_mutex_unlock (&conf->files_lock);
+
+ FREE (file);
+ return 0;
+}
+
+
+static void
+iot_queue (iot_worker_t *worker,
+ call_stub_t *stub)
+{
+ iot_queue_t *queue;
+ iot_conf_t *conf = worker->conf;
+ iot_local_t *local = stub->frame->local;
+ size_t frame_size = local->frame_size;
+
+ queue = CALLOC (1, sizeof (*queue));
+ ERR_ABORT (queue);
+ queue->stub = stub;
+
+ pthread_mutex_lock (&conf->lock);
+
+ /*
+ while (worker->queue_size >= worker->queue_limit)
+ pthread_cond_wait (&worker->q_cond, &worker->lock);
+ */
+ if (conf->cache_size) {
+ while (frame_size && (conf->current_size >= conf->cache_size))
+ pthread_cond_wait (&conf->q_cond, &conf->lock);
+ }
+
+ queue->next = &worker->queue;
+ queue->prev = worker->queue.prev;
+
+ queue->next->prev = queue;
+ queue->prev->next = queue;
+
+ /* dq_cond */
+ worker->queue_size++;
+ worker->q++;
+
+ conf->current_size += local->frame_size;
+
+ pthread_cond_broadcast (&worker->dq_cond);
+
+ pthread_mutex_unlock (&conf->lock);
+}
+
+static call_stub_t *
+iot_dequeue (iot_worker_t *worker)
+{
+ call_stub_t *stub = NULL;
+ iot_queue_t *queue = NULL;
+ iot_conf_t *conf = worker->conf;
+ iot_local_t *local = NULL;
+
+
+ pthread_mutex_lock (&conf->lock);
+
+ while (!worker->queue_size)
+ /*
+ pthread_cond_wait (&worker->dq_cond, &worker->lock);
+ */
+ pthread_cond_wait (&worker->dq_cond, &conf->lock);
+
+ queue = worker->queue.next;
+
+ queue->next->prev = queue->prev;
+ queue->prev->next = queue->next;
+
+ stub = queue->stub;
+ local = stub->frame->local;
+
+ worker->queue_size--;
+ worker->dq++;
+
+ /* q_cond */
+ conf->current_size -= local->frame_size;
+
+ pthread_cond_broadcast (&conf->q_cond);
+
+ pthread_mutex_unlock (&conf->lock);
+
+ FREE (queue);
+
+ return stub;
+}
+
+static void *
+iot_worker (void *arg)
+{
+ iot_worker_t *worker = arg;
+
+ while (1) {
+ call_stub_t *stub;
+
+ stub = iot_dequeue (worker);
+ call_resume (stub);
+ }
+}
+
+#if 0
+static void *
+iot_reply (void *arg)
+{
+ iot_worker_t *reply = arg;
+
+ while (1) {
+ call_stub_t *stub;
+
+ stub = iot_dequeue (reply);
+ FREE (stub->frame->local);
+ stub->frame->local = NULL;
+ call_resume (stub);
+ }
+}
+#endif
+
+static void
+workers_init (iot_conf_t *conf)
+{
+ int i;
+
+ conf->workers.next = &conf->workers;
+ conf->workers.prev = &conf->workers;
+
+ for (i=0; i<conf->thread_count; i++) {
+
+ iot_worker_t *worker = CALLOC (1, sizeof (*worker));
+ ERR_ABORT (worker);
+
+ worker->next = &conf->workers;
+ worker->prev = conf->workers.prev;
+ worker->next->prev = worker;
+ worker->prev->next = worker;
+
+ worker->queue.next = &worker->queue;
+ worker->queue.prev = &worker->queue;
+
+ /*
+ pthread_mutex_init (&worker->lock, NULL);
+ pthread_cond_init (&worker->q_cond, NULL);
+ */
+ pthread_cond_init (&worker->dq_cond, NULL);
+
+ /*
+ worker->queue_limit = conf->queue_limit;
+ */
+
+ worker->conf = conf;
+
+ pthread_create (&worker->thread, NULL, iot_worker, worker);
+ }
+}
+
+int32_t
+init (xlator_t *this)
+{
+ iot_conf_t *conf;
+ dict_t *options = this->options;
+
+ if (!this->children || this->children->next) {
+ gf_log ("io-threads",
+ GF_LOG_ERROR,
+ "FATAL: iot not configured with exactly one child");
+ return -1;
+ }
+
+ if (!this->parents) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "dangling volume. check volfile ");
+ }
+
+ conf = (void *) CALLOC (1, sizeof (*conf));
+ ERR_ABORT (conf);
+
+ conf->thread_count = 1;
+
+ if (dict_get (options, "thread-count")) {
+ conf->thread_count = data_to_int32 (dict_get (options,
+ "thread-count"));
+ gf_log ("io-threads",
+ GF_LOG_DEBUG,
+ "Using conf->thread_count = %d",
+ conf->thread_count);
+ }
+
+ pthread_mutex_init (&conf->lock, NULL);
+ pthread_cond_init (&conf->q_cond, NULL);
+
+ conf->files.next = &conf->files;
+ conf->files.prev = &conf->files;
+ pthread_mutex_init (&conf->files_lock, NULL);
+
+ workers_init (conf);
+
+ this->private = conf;
+ return 0;
+}
+
+void
+fini (xlator_t *this)
+{
+ iot_conf_t *conf = this->private;
+
+ FREE (conf);
+
+ this->private = NULL;
+ return;
+}
+
+struct xlator_fops fops = {
+ .open = iot_open,
+ .create = iot_create,
+ .readv = iot_readv,
+ .writev = iot_writev,
+ .flush = iot_flush,
+ .fsync = iot_fsync,
+ .lk = iot_lk,
+ .stat = iot_stat,
+ .fstat = iot_fstat,
+ .truncate = iot_truncate,
+ .ftruncate = iot_ftruncate,
+ .utimens = iot_utimens,
+ .checksum = iot_checksum,
+ .unlink = iot_unlink,
+};
+
+struct xlator_mops mops = {
+};
+
+struct xlator_cbks cbks = {
+ .release = iot_release,
+};
+
+struct volume_options options[] = {
+ { .key = {"thread-count"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = 1,
+ .max = 32
+ },
+ { .key = {NULL} },
+};