diff options
Diffstat (limited to 'xlators/performance/io-threads/src')
| -rw-r--r-- | xlators/performance/io-threads/src/Makefile.am | 10 | ||||
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads-messages.h | 41 | ||||
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 3726 | ||||
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.h | 229 | ||||
| -rw-r--r-- | xlators/performance/io-threads/src/iot-mem-types.h | 21 |
5 files changed, 1318 insertions, 2709 deletions
diff --git a/xlators/performance/io-threads/src/Makefile.am b/xlators/performance/io-threads/src/Makefile.am index 38dea3eb7fc..7570cf41ed2 100644 --- a/xlators/performance/io-threads/src/Makefile.am +++ b/xlators/performance/io-threads/src/Makefile.am @@ -1,14 +1,16 @@ xlator_LTLIBRARIES = io-threads.la xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/performance -io_threads_la_LDFLAGS = -module -avoidversion +io_threads_la_LDFLAGS = -module $(GF_XLATOR_DEFAULT_LDFLAGS) io_threads_la_SOURCES = io-threads.c io_threads_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la -noinst_HEADERS = io-threads.h +noinst_HEADERS = io-threads.h iot-mem-types.h io-threads-messages.h -AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\ - -I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) +AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \ + -I$(top_srcdir)/rpc/xdr/src -I$(top_builddir)/rpc/xdr/src + +AM_CFLAGS = -Wall $(GF_CFLAGS) CLEANFILES = diff --git a/xlators/performance/io-threads/src/io-threads-messages.h b/xlators/performance/io-threads/src/io-threads-messages.h new file mode 100644 index 00000000000..6229c353f96 --- /dev/null +++ b/xlators/performance/io-threads/src/io-threads-messages.h @@ -0,0 +1,41 @@ +/*Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef _IO_THREADS_MESSAGES_H_ +#define _IO_THREADS_MESSAGES_H_ + +#include <glusterfs/glfs-message-id.h> + +/* To add new message IDs, append new identifiers at the end of the list. + * + * Never remove a message ID. If it's not used anymore, you can rename it or + * leave it as it is, but not delete it. 
This is to prevent reutilization of + * IDs by other messages. + * + * The component name must match one of the entries defined in + * glfs-message-id.h. + */ + +GLFS_MSGID(IO_THREADS, IO_THREADS_MSG_INIT_FAILED, + IO_THREADS_MSG_XLATOR_CHILD_MISCONFIGURED, IO_THREADS_MSG_NO_MEMORY, + IO_THREADS_MSG_VOL_MISCONFIGURED, IO_THREADS_MSG_SIZE_NOT_SET, + IO_THREADS_MSG_OUT_OF_MEMORY, IO_THREADS_MSG_PTHREAD_INIT_FAILED, + IO_THREADS_MSG_WORKER_THREAD_INIT_FAILED); + +#define IO_THREADS_MSG_INIT_FAILED_STR "Thread attribute initialization failed" +#define IO_THREADS_MSG_SIZE_NOT_SET_STR "Using default thread stack size" +#define IO_THREADS_MSG_NO_MEMORY_STR "Memory accounting init failed" +#define IO_THREADS_MSG_XLATOR_CHILD_MISCONFIGURED_STR \ + "FATAL: iot not configured with exactly one child" +#define IO_THREADS_MSG_VOL_MISCONFIGURED_STR "dangling volume. check volfile" +#define IO_THREADS_MSG_OUT_OF_MEMORY_STR "out of memory" +#define IO_THREADS_MSG_PTHREAD_INIT_FAILED_STR "init failed" +#define IO_THREADS_MSG_WORKER_THREAD_INIT_FAILED_STR \ + "cannot initialize worker threads, exiting init" +#endif /* _IO_THREADS_MESSAGES_H_ */ diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index e99012cc0d6..3d24cc97f4b 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -1,2948 +1,1590 @@ /* - Copyright (c) 2006-2009 Gluster, Inc. <http://www.gluster.com> + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> This file is part of GlusterFS. - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. 
- - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ -#ifndef _CONFIG_H -#define _CONFIG_H -#include "config.h" -#endif - -#include "call-stub.h" -#include "glusterfs.h" -#include "logging.h" -#include "dict.h" -#include "xlator.h" +#include <glusterfs/call-stub.h> +#include <glusterfs/defaults.h> +#include <glusterfs/glusterfs.h> +#include <glusterfs/logging.h> +#include <glusterfs/dict.h> +#include <glusterfs/xlator.h> #include "io-threads.h" +#include <signal.h> #include <stdlib.h> #include <sys/time.h> #include <time.h> -#include "locking.h" - -typedef void *(*iot_worker_fn)(void*); - -void -iot_stop_worker (iot_worker_t *worker); - -void -iot_stop_workers (iot_worker_t **workers, int start_idx, int count); - -void -_iot_queue (iot_worker_t *worker, iot_request_t *req); - -iot_request_t * -iot_init_request (iot_worker_t *conf, call_stub_t *stub); - -int -iot_startup_workers (iot_worker_t **workers, int start_idx, int count, - iot_worker_fn workerfunc); +#include <glusterfs/locking.h> +#include "io-threads-messages.h" +#include <glusterfs/timespec.h> void * -iot_worker_unordered (void *arg); - -void * -iot_worker_ordered (void *arg); - -int -iot_startup_worker (iot_worker_t *worker, iot_worker_fn workerfunc); - -void -iot_destroy_request (iot_worker_t *worker, iot_request_t * req); - -void -iot_notify_worker (iot_worker_t *worker) -{ -#ifndef HAVE_SPINLOCK - pthread_cond_broadcast 
(&worker->notifier); -#else - sem_post (&worker->notifier); -#endif - - return; +iot_worker(void *arg); +int +iot_workers_scale(iot_conf_t *conf); +int +__iot_workers_scale(iot_conf_t *conf); +struct volume_options options[]; + +#define IOT_FOP(name, frame, this, args...) \ + do { \ + call_stub_t *__stub = NULL; \ + int __ret = -1; \ + \ + __stub = fop_##name##_stub(frame, default_##name##_resume, args); \ + if (!__stub) { \ + __ret = -ENOMEM; \ + goto out; \ + } \ + \ + __ret = iot_schedule(frame, this, __stub); \ + \ + out: \ + if (__ret < 0) { \ + default_##name##_failure_cbk(frame, -__ret); \ + if (__stub != NULL) { \ + call_stub_destroy(__stub); \ + } \ + } \ + } while (0) + +iot_client_ctx_t * +iot_get_ctx(xlator_t *this, client_t *client) +{ + iot_client_ctx_t *ctx = NULL; + iot_client_ctx_t *setted_ctx = NULL; + int i; + + if (client_ctx_get(client, this, (void **)&ctx) != 0) { + ctx = GF_MALLOC(GF_FOP_PRI_MAX * sizeof(*ctx), gf_iot_mt_client_ctx_t); + if (ctx) { + for (i = 0; i < GF_FOP_PRI_MAX; ++i) { + INIT_LIST_HEAD(&ctx[i].clients); + INIT_LIST_HEAD(&ctx[i].reqs); + } + setted_ctx = client_ctx_set(client, this, ctx); + if (ctx != setted_ctx) { + GF_FREE(ctx); + ctx = setted_ctx; + } + } + } + + return ctx; } -int -iot_notify_wait (iot_worker_t *worker, int idletime) -{ - struct timeval tv; - struct timespec ts = {0, }; - int waitres = 0; - - gettimeofday (&tv, NULL); - /* Slightly skew the idle time for threads so that, we dont - * have all of them rushing to exit at the same time, if - * they've been idle. 
- */ - ts.tv_sec = skew_sec_idle_time (tv.tv_sec + idletime); - -#ifndef HAVE_SPINLOCK - waitres = pthread_cond_timedwait (&worker->notifier, &worker->qlock, - &ts); -#else - UNLOCK (&worker->qlock); - errno = 0; - waitres = sem_timedwait (&worker->notifier, &ts); - LOCK (&worker->qlock); - if (waitres < 0) - waitres = errno; -#endif - - return waitres; -} - -void -iot_notify_init (iot_worker_t *worker) +call_stub_t * +__iot_dequeue(iot_conf_t *conf, int *pri) { - if (worker == NULL) - return; + call_stub_t *stub = NULL; + int i = 0; + iot_client_ctx_t *ctx; - LOCK_INIT (&worker->qlock); - -#ifndef HAVE_SPINLOCK - pthread_cond_init (&worker->notifier, NULL); -#else - sem_init (&worker->notifier, 0, 0); -#endif - - return; -} - -/* I know this function modularizes things a bit too much, - * but it is easier on the eyes to read this than see all that locking, - * queueing, and thread firing in the same curly block, as was the - * case before this function. - */ -int -iot_request_queue_and_thread_fire (iot_worker_t *worker, - iot_worker_fn workerfunc, iot_request_t *req) -{ - int ret = -1; - LOCK (&worker->qlock); - { - if (iot_worker_active (worker)) { - _iot_queue (worker, req); - ret = 0; - }else { - ret = iot_startup_worker (worker, workerfunc); - if (ret < 0) { - goto unlock; - } - _iot_queue (worker, req); - } + *pri = -1; + for (i = 0; i < GF_FOP_PRI_MAX; i++) { + if (conf->ac_iot_count[i] >= conf->ac_iot_limit[i]) { + continue; } -unlock: - UNLOCK (&worker->qlock); - - return ret; -} - - -int -iot_unordered_request_balancer (iot_conf_t *conf) -{ - long int rand = 0; - int idx = 0; - - /* Decide which thread will service the request. - * FIXME: This should change into some form of load-balancing. - * */ - rand = random (); - - /* If scaling is on, we can choose from any thread - * that has been allocated upto, max_o_threads, but - * with scaling off, we'll never have threads more - * than min_o_threads. 
- */ - if (iot_unordered_scaling_on (conf)) - idx = (rand % conf->max_u_threads); - else - idx = (rand % conf->min_u_threads); - return idx; -} - - -int -iot_schedule_unordered (iot_conf_t *conf, inode_t *inode, call_stub_t *stub) -{ - int32_t idx = 0; - iot_worker_t *selected_worker = NULL; - iot_request_t *req = NULL; - int ret = -1; - - idx = iot_unordered_request_balancer (conf); - selected_worker = conf->uworkers[idx]; - - req = iot_init_request (selected_worker, stub); - if (req == NULL) { - ret = -ENOMEM; - goto out; + if (list_empty(&conf->clients[i])) { + continue; } - ret = iot_request_queue_and_thread_fire (selected_worker, - iot_worker_unordered, req); - if (ret < 0) { - iot_destroy_request (selected_worker, req); + /* Get the first per-client queue for this priority. */ + ctx = list_first_entry(&conf->clients[i], iot_client_ctx_t, clients); + if (!ctx) { + continue; } -out: - return ret; -} - - -/* Only to be used with ordered requests. - */ -uint64_t -iot_create_inode_worker_assoc (iot_conf_t * conf, inode_t * inode) -{ - long int rand = 0; - uint64_t idx = 0; - rand = random (); - /* If scaling is on, we can choose from any thread - * that has been allocated upto, max_o_threads, but - * with scaling off, we'll never have threads more - * than min_o_threads. - */ - if (iot_ordered_scaling_on (conf)) - idx = (rand % conf->max_o_threads); - else - idx = (rand % conf->min_o_threads); - - __inode_ctx_put (inode, conf->this, idx); - - return idx; -} - - -/* Assumes inode lock is held. */ -int32_t -iot_ordered_request_balancer (iot_conf_t *conf, inode_t *inode, uint64_t *idx) -{ - int ret = -1; - - if (__inode_ctx_get (inode, conf->this, idx) < 0) - *idx = iot_create_inode_worker_assoc (conf, inode); - else { - /* Sanity check to ensure the idx received from the inode - * context is within bounds. We're a bit optimistic in - * assuming that if an index is within bounds, it is - * not corrupted. idx is uint so we dont check for less - * than 0. 
- */ - if ((*idx >= (uint64_t)conf->max_o_threads)) { - gf_log (conf->this->name, GF_LOG_DEBUG, - "inode context returned insane thread index %" - PRIu64, *idx); - ret = -EINVAL; - goto out; - } + if (list_empty(&ctx->reqs)) { + continue; } - ret = 0; -out: - return ret; -} - -int -iot_schedule_ordered (iot_conf_t *conf, inode_t *inode, call_stub_t *stub) -{ - uint64_t idx = 0; - iot_worker_t *selected_worker = NULL; - iot_request_t *req = NULL; - int balstatus = 0, ret = -1; - - if (inode == NULL) { - gf_log (conf->this->name, GF_LOG_DEBUG, - "Got NULL inode for ordered request"); - ret = -EINVAL; - goto out; + /* Get the first request on that queue. */ + stub = list_first_entry(&ctx->reqs, call_stub_t, list); + list_del_init(&stub->list); + if (list_empty(&ctx->reqs)) { + list_del_init(&ctx->clients); + } else { + list_rotate_left(&conf->clients[i]); } - LOCK (&inode->lock); - { - balstatus = iot_ordered_request_balancer (conf, inode, &idx); - if (balstatus < 0) { - gf_log (conf->this->name, GF_LOG_DEBUG, - "Insane worker index. Unwinding stack"); - ret = -ECANCELED; - goto unlock_out; - } - /* inode lock once acquired, cannot be left here - * because other gluster main threads might be - * contending on it to append a request for this file. - * So we'll also leave the lock only after we've - * added the request to the worker queue. 
- */ - selected_worker = conf->oworkers[idx]; - - req = iot_init_request (selected_worker, stub); - if (req == NULL) { - gf_log (conf->this->name, GF_LOG_ERROR,"out of memory"); - ret = -ENOMEM; - goto unlock_out; - } + conf->ac_iot_count[i]++; + conf->queue_marked[i] = _gf_false; + *pri = i; + break; + } - ret = iot_request_queue_and_thread_fire (selected_worker, - iot_worker_ordered, - req); - } -unlock_out: - UNLOCK (&inode->lock); - -out: - if (ret < 0) { - if (req != NULL) { - iot_destroy_request (selected_worker, req); - } - } - return ret; -} + if (!stub) + return NULL; + conf->queue_size--; + conf->queue_sizes[*pri]--; -int -iot_lookup_cbk (call_frame_t *frame, void * cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, - inode_t *inode, struct stat *buf, dict_t *xattr, - struct stat *postparent) -{ - STACK_UNWIND_STRICT (lookup, frame, op_ret, op_errno, inode, buf, xattr, - postparent); - return 0; + return stub; } - -int -iot_lookup_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, - dict_t *xattr_req) +void +__iot_enqueue(iot_conf_t *conf, call_stub_t *stub, int pri) { - STACK_WIND (frame, iot_lookup_cbk, - FIRST_CHILD (this), - FIRST_CHILD (this)->fops->lookup, - loc, xattr_req); - return 0; -} + client_t *client = stub->frame->root->client; + iot_client_ctx_t *ctx; + if (pri < 0 || pri >= GF_FOP_PRI_MAX) + pri = GF_FOP_PRI_MAX - 1; -int -iot_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xattr_req) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_lookup_stub (frame, iot_lookup_wrapper, loc, xattr_req); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, - "cannot create lookup stub (out of memory)"); - ret = -ENOMEM; - goto out; + if (client) { + ctx = iot_get_ctx(THIS, client); + if (ctx) { + ctx = &ctx[pri]; } + } else { + ctx = NULL; + } + if (!ctx) { + ctx = &conf->no_client[pri]; + } - ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, - stub); - -out: - if (ret < 0) { - if (stub != 
NULL) { - call_stub_destroy (stub); - } - STACK_UNWIND_STRICT (lookup, frame, -1, -ret, NULL, NULL, NULL, - NULL); - } + if (list_empty(&ctx->reqs)) { + list_add_tail(&ctx->clients, &conf->clients[pri]); + } + list_add_tail(&stub->list, &ctx->reqs); - return 0; -} - - -int -iot_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, - struct stat *preop, struct stat *postop) -{ - STACK_UNWIND_STRICT (setattr, frame, op_ret, op_errno, preop, postop); - return 0; + conf->queue_size++; + GF_ATOMIC_INC(conf->stub_cnt); + conf->queue_sizes[pri]++; } - -int -iot_setattr_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, - struct stat *stbuf, int32_t valid) -{ - STACK_WIND (frame, iot_setattr_cbk, - FIRST_CHILD (this), - FIRST_CHILD (this)->fops->setattr, - loc, stbuf, valid); - return 0; -} - - -int -iot_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc, - struct stat *stbuf, int32_t valid) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_setattr_stub (frame, iot_setattr_wrapper, loc, stbuf, valid); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "Cannot create setattr stub" - "(Out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_unordered ((iot_conf_t *)this->private, - loc->inode, stub); - -out: - if (ret < 0) { - if (stub != NULL) { - call_stub_destroy (stub); +void * +iot_worker(void *data) +{ + iot_conf_t *conf = NULL; + xlator_t *this = NULL; + call_stub_t *stub = NULL; + struct timespec sleep_till = { + 0, + }; + int ret = 0; + int pri = -1; + gf_boolean_t bye = _gf_false; + + conf = data; + this = conf->this; + THIS = this; + + for (;;) { + pthread_mutex_lock(&conf->mutex); + { + if (pri != -1) { + conf->ac_iot_count[pri]--; + pri = -1; + } + while (conf->queue_size == 0) { + if (conf->down) { + bye = _gf_true; /*Avoid sleep*/ + break; } - STACK_UNWIND_STRICT (setattr, frame, -1, -ret, NULL, NULL); - } - - return 0; -} + clock_gettime(CLOCK_REALTIME_COARSE, 
&sleep_till); + sleep_till.tv_sec += conf->idle_time; + conf->sleep_count++; + ret = pthread_cond_timedwait(&conf->cond, &conf->mutex, + &sleep_till); + conf->sleep_count--; -int -iot_fsetattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, - struct stat *preop, struct stat *postop) -{ - STACK_UNWIND_STRICT (fsetattr, frame, op_ret, op_errno, preop, postop); - return 0; -} - - -int -iot_fsetattr_wrapper (call_frame_t *frame, xlator_t *this, - fd_t *fd, struct stat *stbuf, int32_t valid) -{ - STACK_WIND (frame, iot_fsetattr_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->fsetattr, fd, stbuf, valid); - return 0; -} - - -int -iot_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd, - struct stat *stbuf, int32_t valid) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_fsetattr_stub (frame, iot_fsetattr_wrapper, fd, stbuf, - valid); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot create fsetattr stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, - stub); - -out: - if (ret < 0) { - STACK_UNWIND_STRICT (fsetattr, frame, -1, -ret, NULL, NULL); - if (stub != NULL) { - call_stub_destroy (stub); + if (conf->down || ret == ETIMEDOUT) { + bye = _gf_true; + break; } - } - return 0; -} - - -int -iot_access_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) -{ - STACK_UNWIND_STRICT (access, frame, op_ret, op_errno); - return 0; -} - - -int -iot_access_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, - int32_t mask) -{ - STACK_WIND (frame, iot_access_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->access, loc, mask); - return 0; -} - - -int -iot_access (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t mask) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_access_stub (frame, iot_access_wrapper, loc, mask); - if (!stub) { - gf_log (this->name, 
GF_LOG_ERROR, "cannot create access stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, - stub); -out: - if (ret < 0) { - STACK_UNWIND_STRICT (access, frame, -1, -ret); - - if (stub != NULL) { - call_stub_destroy (stub); + } + + if (bye) { + if (conf->down || conf->curr_count > IOT_MIN_THREADS) { + conf->curr_count--; + if (conf->curr_count == 0) + pthread_cond_broadcast(&conf->cond); + gf_msg_debug(conf->this->name, 0, + "terminated. " + "conf->curr_count=%d", + conf->curr_count); + } else { + bye = _gf_false; } - } - return 0; -} - - -int -iot_readlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, const char *path, - struct stat *stbuf) -{ - STACK_UNWIND_STRICT (readlink, frame, op_ret, op_errno, path, stbuf); - return 0; -} - - -int -iot_readlink_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, - size_t size) -{ - STACK_WIND (frame, iot_readlink_cbk, - FIRST_CHILD (this), - FIRST_CHILD (this)->fops->readlink, - loc, size); - return 0; -} - - -int -iot_readlink (call_frame_t *frame, xlator_t *this, loc_t *loc, size_t size) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_readlink_stub (frame, iot_readlink_wrapper, loc, size); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot create readlink stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, - stub); - + } + + if (!bye) + stub = __iot_dequeue(conf, &pri); + } + pthread_mutex_unlock(&conf->mutex); + + if (stub) { /* guard against spurious wakeups */ + if (stub->poison) { + gf_log(this->name, GF_LOG_INFO, "Dropping poisoned request %p.", + stub); + call_stub_destroy(stub); + } else { + call_resume(stub); + } + GF_ATOMIC_DEC(conf->stub_cnt); + } + stub = NULL; + + if (bye) + break; + } + + return NULL; +} + +int +do_iot_schedule(iot_conf_t *conf, call_stub_t *stub, int pri) 
+{ + int ret = 0; + + pthread_mutex_lock(&conf->mutex); + { + __iot_enqueue(conf, stub, pri); + + pthread_cond_signal(&conf->cond); + + ret = __iot_workers_scale(conf); + } + pthread_mutex_unlock(&conf->mutex); + + return ret; +} + +char * +iot_get_pri_meaning(gf_fop_pri_t pri) +{ + char *name = NULL; + switch (pri) { + case GF_FOP_PRI_HI: + name = "fast"; + break; + case GF_FOP_PRI_NORMAL: + name = "normal"; + break; + case GF_FOP_PRI_LO: + name = "slow"; + break; + case GF_FOP_PRI_LEAST: + name = "least"; + break; + case GF_FOP_PRI_MAX: + name = "invalid"; + break; + case GF_FOP_PRI_UNSPEC: + name = "unspecified"; + break; + } + return name; +} + +int +iot_schedule(call_frame_t *frame, xlator_t *this, call_stub_t *stub) +{ + int ret = -1; + gf_fop_pri_t pri = GF_FOP_PRI_MAX - 1; + iot_conf_t *conf = this->private; + + if ((frame->root->pid < GF_CLIENT_PID_MAX) && + (frame->root->pid != GF_CLIENT_PID_NO_ROOT_SQUASH) && + conf->least_priority) { + pri = GF_FOP_PRI_LEAST; + goto out; + } + + switch (stub->fop) { + case GF_FOP_OPEN: + case GF_FOP_STAT: + case GF_FOP_FSTAT: + case GF_FOP_LOOKUP: + case GF_FOP_ACCESS: + case GF_FOP_READLINK: + case GF_FOP_OPENDIR: + case GF_FOP_STATFS: + case GF_FOP_READDIR: + case GF_FOP_READDIRP: + case GF_FOP_GETACTIVELK: + case GF_FOP_SETACTIVELK: + case GF_FOP_ICREATE: + case GF_FOP_NAMELINK: + pri = GF_FOP_PRI_HI; + break; + + case GF_FOP_CREATE: + case GF_FOP_FLUSH: + case GF_FOP_LK: + case GF_FOP_INODELK: + case GF_FOP_FINODELK: + case GF_FOP_ENTRYLK: + case GF_FOP_FENTRYLK: + case GF_FOP_LEASE: + case GF_FOP_UNLINK: + case GF_FOP_SETATTR: + case GF_FOP_FSETATTR: + case GF_FOP_MKNOD: + case GF_FOP_MKDIR: + case GF_FOP_RMDIR: + case GF_FOP_SYMLINK: + case GF_FOP_RENAME: + case GF_FOP_LINK: + case GF_FOP_SETXATTR: + case GF_FOP_GETXATTR: + case GF_FOP_FGETXATTR: + case GF_FOP_FSETXATTR: + case GF_FOP_REMOVEXATTR: + case GF_FOP_FREMOVEXATTR: + case GF_FOP_PUT: + pri = GF_FOP_PRI_NORMAL; + break; + + case GF_FOP_READ: + case 
GF_FOP_WRITE: + case GF_FOP_FSYNC: + case GF_FOP_TRUNCATE: + case GF_FOP_FTRUNCATE: + case GF_FOP_FSYNCDIR: + case GF_FOP_XATTROP: + case GF_FOP_FXATTROP: + case GF_FOP_RCHECKSUM: + case GF_FOP_FALLOCATE: + case GF_FOP_DISCARD: + case GF_FOP_ZEROFILL: + case GF_FOP_SEEK: + pri = GF_FOP_PRI_LO; + break; + + case GF_FOP_FORGET: + case GF_FOP_RELEASE: + case GF_FOP_RELEASEDIR: + case GF_FOP_GETSPEC: + break; + case GF_FOP_IPC: + default: + return -EINVAL; + } out: - if (ret < 0) { - STACK_UNWIND_STRICT (readlink, frame, -1, -ret, NULL, NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - - return 0; + gf_msg_debug(this->name, 0, "%s scheduled as %s priority fop", + gf_fop_list[stub->fop], iot_get_pri_meaning(pri)); + if (this->private) + ret = do_iot_schedule(this->private, stub, pri); + return ret; } - int -iot_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, inode_t *inode, - struct stat *buf, struct stat *preparent, - struct stat *postparent) +iot_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) { - STACK_UNWIND_STRICT (mknod, frame, op_ret, op_errno, inode, buf, - preparent, postparent); - return 0; + IOT_FOP(lookup, frame, this, loc, xdata); + return 0; } - int -iot_mknod_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, - dev_t rdev) +iot_setattr(call_frame_t *frame, xlator_t *this, loc_t *loc, struct iatt *stbuf, + int32_t valid, dict_t *xdata) { - STACK_WIND (frame, iot_mknod_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->mknod, loc, mode, rdev); - return 0; + IOT_FOP(setattr, frame, this, loc, stbuf, valid, xdata); + return 0; } - int -iot_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, - dev_t rdev) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_mknod_stub (frame, iot_mknod_wrapper, loc, mode, rdev); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot create mknod stub" - "(out of memory)"); - ret = 
-ENOMEM; - goto out; - } - - ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, - stub); - -out: - if (ret < 0) { - STACK_UNWIND_STRICT (mknod, frame, -1, -ret, NULL, NULL, NULL, - NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; -} - - -int -iot_mkdir_cbk (call_frame_t *frame, void * cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, inode_t *inode, - struct stat *buf, struct stat *preparent, - struct stat *postparent) +iot_fsetattr(call_frame_t *frame, xlator_t *this, fd_t *fd, struct iatt *stbuf, + int32_t valid, dict_t *xdata) { - STACK_UNWIND_STRICT (mkdir, frame, op_ret, op_errno, inode, buf, - preparent, postparent); - return 0; + IOT_FOP(fsetattr, frame, this, fd, stbuf, valid, xdata); + return 0; } - int -iot_mkdir_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode) +iot_access(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t mask, + dict_t *xdata) { - STACK_WIND (frame, iot_mkdir_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->mkdir, loc, mode); - return 0; + IOT_FOP(access, frame, this, loc, mask, xdata); + return 0; } - int -iot_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode) +iot_readlink(call_frame_t *frame, xlator_t *this, loc_t *loc, size_t size, + dict_t *xdata) { - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_mkdir_stub (frame, iot_mkdir_wrapper, loc, mode); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot create mkdir stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, - stub); - -out: - if (ret < 0) { - STACK_UNWIND_STRICT (mkdir, frame, -1, -ret, NULL, NULL, NULL, - NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; + IOT_FOP(readlink, frame, this, loc, size, xdata); + return 0; } - int -iot_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct stat 
*preparent, - struct stat *postparent) +iot_mknod(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, + dev_t rdev, mode_t umask, dict_t *xdata) { - STACK_UNWIND_STRICT (rmdir, frame, op_ret, op_errno, preparent, - postparent); - return 0; + IOT_FOP(mknod, frame, this, loc, mode, rdev, umask, xdata); + return 0; } - int -iot_rmdir_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc) +iot_mkdir(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, + mode_t umask, dict_t *xdata) { - STACK_WIND (frame, iot_rmdir_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->rmdir, loc); - return 0; + IOT_FOP(mkdir, frame, this, loc, mode, umask, xdata); + return 0; } - int -iot_rmdir (call_frame_t *frame, xlator_t *this, loc_t *loc) +iot_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int flags, + dict_t *xdata) { - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_rmdir_stub (frame, iot_rmdir_wrapper, loc); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot create rmdir stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, - stub); -out: - if (ret < 0) { - STACK_UNWIND_STRICT (rmdir, frame, -1, -ret, NULL, NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; + IOT_FOP(rmdir, frame, this, loc, flags, xdata); + return 0; } - int -iot_symlink_cbk (call_frame_t *frame, void * cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, inode_t *inode, - struct stat *buf, struct stat *preparent, - struct stat *postparent) +iot_symlink(call_frame_t *frame, xlator_t *this, const char *linkname, + loc_t *loc, mode_t umask, dict_t *xdata) { - STACK_UNWIND_STRICT (symlink, frame, op_ret, op_errno, inode, buf, - preparent, postparent); - return 0; + IOT_FOP(symlink, frame, this, linkname, loc, umask, xdata); + return 0; } - int -iot_symlink_wrapper (call_frame_t *frame, xlator_t *this, const char *linkname, - loc_t *loc) 
+iot_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, + dict_t *xdata) { - STACK_WIND (frame, iot_symlink_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->symlink, linkname, loc); - return 0; -} - - -int -iot_symlink (call_frame_t *frame, xlator_t *this, const char *linkname, - loc_t *loc) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_symlink_stub (frame, iot_symlink_wrapper, linkname, loc); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot create symlink stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, - stub); - -out: - if (ret < 0) { - STACK_UNWIND_STRICT (symlink, frame, -1, -ret, NULL, NULL, NULL, - NULL); - if (stub != NULL) { - call_stub_destroy (stub); - } - } - - return 0; + IOT_FOP(rename, frame, this, oldloc, newloc, xdata); + return 0; } - int -iot_rename_cbk (call_frame_t *frame, void * cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct stat *buf, - struct stat *preoldparent, struct stat *postoldparent, - struct stat *prenewparent, struct stat *postnewparent) +iot_open(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, + fd_t *fd, dict_t *xdata) { - STACK_UNWIND_STRICT (rename, frame, op_ret, op_errno, buf, preoldparent, - postoldparent, prenewparent, postnewparent); - return 0; + IOT_FOP(open, frame, this, loc, flags, fd, xdata); + return 0; } - int -iot_rename_wrapper (call_frame_t *frame, xlator_t *this, loc_t *oldloc, - loc_t *newloc) +iot_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, + mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata) { - STACK_WIND (frame, iot_rename_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->rename, oldloc, newloc); - return 0; + IOT_FOP(create, frame, this, loc, flags, mode, umask, fd, xdata); + return 0; } - int -iot_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc) +iot_put(call_frame_t *frame, 
xlator_t *this, loc_t *loc, mode_t mode, + mode_t umask, uint32_t flags, struct iovec *vector, int32_t count, + off_t offset, struct iobref *iobref, dict_t *xattr, dict_t *xdata) { - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_rename_stub (frame, iot_rename_wrapper, oldloc, newloc); - if (!stub) { - gf_log (this->name, GF_LOG_DEBUG, "cannot create rename stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_unordered ((iot_conf_t *)this->private, - oldloc->inode, stub); - -out: - if (ret < 0) { - STACK_UNWIND_STRICT (rename, frame, -1, -ret, NULL, NULL, NULL, - NULL, NULL); - if (stub != NULL) { - call_stub_destroy (stub); - } - } - - return 0; -} - - -int -iot_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, - int32_t op_errno, fd_t *fd) -{ - STACK_UNWIND_STRICT (open, frame, op_ret, op_errno, fd); - return 0; -} - - -int -iot_open_wrapper (call_frame_t * frame, xlator_t * this, loc_t *loc, - int32_t flags, fd_t * fd, int32_t wbflags) -{ - STACK_WIND (frame, iot_open_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->open, loc, flags, fd, wbflags); - return 0; -} - - -int -iot_open (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, - fd_t *fd, int32_t wbflags) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_open_stub (frame, iot_open_wrapper, loc, flags, fd, wbflags); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, - "cannot create open call stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, - stub); - -out: - if (ret < 0) { - STACK_UNWIND_STRICT (open, frame, -1, -ret, NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - - return 0; + IOT_FOP(put, frame, this, loc, mode, umask, flags, vector, count, offset, + iobref, xattr, xdata); + return 0; } - int -iot_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, fd_t 
*fd, inode_t *inode, - struct stat *stbuf, struct stat *preparent, - struct stat *postparent) +iot_readv(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, + off_t offset, uint32_t flags, dict_t *xdata) { - STACK_UNWIND_STRICT (create, frame, op_ret, op_errno, fd, inode, stbuf, - preparent, postparent); - return 0; -} - - -int -iot_create_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, - int32_t flags, mode_t mode, fd_t *fd) -{ - STACK_WIND (frame, iot_create_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->create, - loc, flags, mode, fd); - return 0; -} - - -int -iot_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, - mode_t mode, fd_t *fd) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_create_stub (frame, iot_create_wrapper, loc, flags, mode, - fd); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, - "cannot create \"create\" call stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, - stub); - -out: - if (ret < 0) { - STACK_UNWIND_STRICT (create, frame, -1, -ret, NULL, NULL, NULL, - NULL, NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - - return 0; + IOT_FOP(readv, frame, this, fd, size, offset, flags, xdata); + return 0; } - int -iot_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iovec *vector, - int32_t count, struct stat *stbuf, struct iobref *iobref) +iot_flush(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) { - STACK_UNWIND_STRICT (readv, frame, op_ret, op_errno, vector, count, - stbuf, iobref); - - return 0; + IOT_FOP(flush, frame, this, fd, xdata); + return 0; } - int -iot_readv_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, - off_t offset) +iot_fsync(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t datasync, + dict_t *xdata) { - STACK_WIND (frame, iot_readv_cbk, - FIRST_CHILD(this), - 
FIRST_CHILD(this)->fops->readv, - fd, size, offset); - return 0; + IOT_FOP(fsync, frame, this, fd, datasync, xdata); + return 0; } - int -iot_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, - off_t offset) +iot_writev(call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector, + int32_t count, off_t offset, uint32_t flags, struct iobref *iobref, + dict_t *xdata) { - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_readv_stub (frame, iot_readv_wrapper, fd, size, offset); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, - "cannot create readv call stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, - stub); - -out: - if (ret < 0) { - STACK_UNWIND_STRICT (readv, frame, -1, -ret, NULL, -1, NULL, - NULL); - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; + IOT_FOP(writev, frame, this, fd, vector, count, offset, flags, iobref, + xdata); + return 0; } - int -iot_flush_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) +iot_lk(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t cmd, + struct gf_flock *flock, dict_t *xdata) { - STACK_UNWIND_STRICT (flush, frame, op_ret, op_errno); - return 0; + IOT_FOP(lk, frame, this, fd, cmd, flock, xdata); + return 0; } - int -iot_flush_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd) +iot_stat(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) { - STACK_WIND (frame, iot_flush_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->flush, - fd); - return 0; + IOT_FOP(stat, frame, this, loc, xdata); + return 0; } - int -iot_flush (call_frame_t *frame, xlator_t *this, fd_t *fd) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_flush_stub (frame, iot_flush_wrapper, fd); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, - "cannot create flush_cbk call stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = 
iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, - stub); -out: - if (ret < 0) { - STACK_UNWIND_STRICT (flush, frame, -1, -ret); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; -} - - -int -iot_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct stat *prebuf, - struct stat *postbuf) +iot_fstat(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) { - STACK_UNWIND_STRICT (fsync, frame, op_ret, op_errno, prebuf, postbuf); - return 0; + IOT_FOP(fstat, frame, this, fd, xdata); + return 0; } - int -iot_fsync_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd, - int32_t datasync) +iot_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset, + dict_t *xdata) { - STACK_WIND (frame, iot_fsync_cbk, - FIRST_CHILD (this), - FIRST_CHILD (this)->fops->fsync, - fd, datasync); - return 0; + IOT_FOP(truncate, frame, this, loc, offset, xdata); + return 0; } - int -iot_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t datasync) +iot_ftruncate(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, + dict_t *xdata) { - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_fsync_stub (frame, iot_fsync_wrapper, fd, datasync); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, - "cannot create fsync_cbk call stub" - "(out of memory)"); - ret = -1; - goto out; - } - - ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, - stub); - -out: - if (ret < 0) { - STACK_UNWIND_STRICT (fsync, frame, -1, -ret, NULL, NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; + IOT_FOP(ftruncate, frame, this, fd, offset, xdata); + return 0; } - int -iot_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct stat *prebuf, - struct stat *postbuf) +iot_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t xflag, + dict_t *xdata) { - STACK_UNWIND_STRICT (writev, frame, op_ret, 
op_errno, prebuf, postbuf); - return 0; + IOT_FOP(unlink, frame, this, loc, xflag, xdata); + return 0; } - int -iot_writev_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd, - struct iovec *vector, int32_t count, - off_t offset, struct iobref *iobref) +iot_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, + dict_t *xdata) { - STACK_WIND (frame, iot_writev_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->writev, - fd, vector, count, offset, iobref); - return 0; + IOT_FOP(link, frame, this, oldloc, newloc, xdata); + return 0; } - int -iot_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, - struct iovec *vector, int32_t count, off_t offset, - struct iobref *iobref) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_writev_stub (frame, iot_writev_wrapper, - fd, vector, count, offset, iobref); - - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, - "cannot create writev call stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, - stub); -out: - if (ret < 0) { - STACK_UNWIND_STRICT (writev, frame, -1, -ret, NULL, NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - - return 0; -} - - -int32_t -iot_lk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct flock *flock) +iot_opendir(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd, + dict_t *xdata) { - STACK_UNWIND_STRICT (lk, frame, op_ret, op_errno, flock); - return 0; + IOT_FOP(opendir, frame, this, loc, fd, xdata); + return 0; } - int -iot_lk_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd, - int32_t cmd, struct flock *flock) +iot_fsyncdir(call_frame_t *frame, xlator_t *this, fd_t *fd, int datasync, + dict_t *xdata) { - STACK_WIND (frame, iot_lk_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->lk, - fd, cmd, flock); - return 0; + IOT_FOP(fsyncdir, frame, this, fd, datasync, xdata); + return 0; } - int -iot_lk 
(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t cmd, - struct flock *flock) +iot_statfs(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) { - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_lk_stub (frame, iot_lk_wrapper, fd, cmd, flock); - - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, - "cannot create fop_lk call stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, - stub); -out: - if (ret < 0) { - STACK_UNWIND_STRICT (lk, frame, -1, -ret, NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; + IOT_FOP(statfs, frame, this, loc, xdata); + return 0; } - int -iot_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct stat *buf) +iot_setxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict, + int32_t flags, dict_t *xdata) { - STACK_UNWIND_STRICT (stat, frame, op_ret, op_errno, buf); - return 0; + IOT_FOP(setxattr, frame, this, loc, dict, flags, xdata); + return 0; } - int -iot_stat_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc) +iot_getxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, const char *name, + dict_t *xdata) { - STACK_WIND (frame, iot_stat_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->stat, - loc); - return 0; -} + iot_conf_t *conf = NULL; + dict_t *depths = NULL; + int i = 0; + int32_t op_ret = 0; + int32_t op_errno = 0; + conf = this->private; -int -iot_stat (call_frame_t *frame, xlator_t *this, loc_t *loc) -{ - call_stub_t *stub = NULL; - fd_t *fd = NULL; - int ret = -1; - - stub = fop_stat_stub (frame, iot_stat_wrapper, loc); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, - "cannot create fop_stat call stub" - "(out of memory)"); - ret = -1; - goto out; - } - - fd = fd_lookup (loc->inode, frame->root->pid); - /* File is not open, so we can send it through unordered pool. 
+ if (name && strcmp(name, IO_THREADS_QUEUE_SIZE_KEY) == 0) { + /* + * We explicitly do not want a reference count + * for this dict in this translator */ - if (fd == NULL) - ret = iot_schedule_unordered ((iot_conf_t *)this->private, - loc->inode, stub); - else { - ret = iot_schedule_ordered ((iot_conf_t *)this->private, - loc->inode, stub); - fd_unref (fd); + depths = dict_new(); + if (!depths) { + op_ret = -1; + op_errno = ENOMEM; + goto unwind_special_getxattr; } -out: - if (ret < 0) { - STACK_UNWIND_STRICT (stat, frame, -1, -ret, NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } + for (i = 0; i < GF_FOP_PRI_MAX; i++) { + if (dict_set_int32(depths, (char *)fop_pri_to_string(i), + conf->queue_sizes[i]) != 0) { + dict_unref(depths); + depths = NULL; + goto unwind_special_getxattr; + } } - return 0; -} + unwind_special_getxattr: + STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, depths, xdata); + if (depths) + dict_unref(depths); + return 0; + } -int -iot_fstat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct stat *buf) -{ - STACK_UNWIND_STRICT (fstat, frame, op_ret, op_errno, buf); - return 0; + IOT_FOP(getxattr, frame, this, loc, name, xdata); + return 0; } - int -iot_fstat_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd) +iot_fgetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, const char *name, + dict_t *xdata) { - STACK_WIND (frame, iot_fstat_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->fstat, - fd); - return 0; -} - - -int -iot_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_fstat_stub (frame, iot_fstat_wrapper, fd); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, - "cannot create fop_fstat call stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, - stub); -out: - if (ret < 0) { - STACK_UNWIND_STRICT (fstat, frame, -1, -ret, 
NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; + IOT_FOP(fgetxattr, frame, this, fd, name, xdata); + return 0; } - int -iot_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct stat *prebuf, - struct stat *postbuf) +iot_fsetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict, + int32_t flags, dict_t *xdata) { - STACK_UNWIND_STRICT (truncate, frame, op_ret, op_errno, prebuf, - postbuf); - return 0; + IOT_FOP(fsetxattr, frame, this, fd, dict, flags, xdata); + return 0; } - int -iot_truncate_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, - off_t offset) +iot_removexattr(call_frame_t *frame, xlator_t *this, loc_t *loc, + const char *name, dict_t *xdata) { - STACK_WIND (frame, iot_truncate_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->truncate, - loc, offset); - return 0; -} - - -int -iot_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset) -{ - call_stub_t *stub; - fd_t *fd = NULL; - int ret = -1; - - stub = fop_truncate_stub (frame, iot_truncate_wrapper, loc, offset); - - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, - "cannot create fop_stat call stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - fd = fd_lookup (loc->inode, frame->root->pid); - if (fd == NULL) - ret = iot_schedule_unordered ((iot_conf_t *)this->private, - loc->inode, stub); - else { - ret = iot_schedule_ordered ((iot_conf_t *)this->private, - loc->inode, stub); - fd_unref (fd); - } - -out: - if (ret < 0) { - STACK_UNWIND_STRICT (truncate, frame, -1, -ret, NULL, NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - - return 0; + IOT_FOP(removexattr, frame, this, loc, name, xdata); + return 0; } - int -iot_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct stat *prebuf, - struct stat *postbuf) +iot_fremovexattr(call_frame_t *frame, xlator_t *this, fd_t *fd, + const char *name, 
dict_t *xdata) { - STACK_UNWIND_STRICT (ftruncate, frame, op_ret, op_errno, prebuf, - postbuf); - return 0; + IOT_FOP(fremovexattr, frame, this, fd, name, xdata); + return 0; } - int -iot_ftruncate_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd, - off_t offset) +iot_readdirp(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, + off_t offset, dict_t *xdata) { - STACK_WIND (frame, iot_ftruncate_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->ftruncate, - fd, offset); - return 0; -} - - -int -iot_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_ftruncate_stub (frame, iot_ftruncate_wrapper, fd, offset); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, - "cannot create fop_ftruncate call stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, - stub); -out: - if (ret < 0) { - STACK_UNWIND_STRICT (ftruncate, frame, -1, -ret, NULL, NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; + IOT_FOP(readdirp, frame, this, fd, size, offset, xdata); + return 0; } - int -iot_checksum_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, uint8_t *file_checksum, - uint8_t *dir_checksum) +iot_readdir(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, + off_t offset, dict_t *xdata) { - STACK_UNWIND_STRICT (checksum, frame, op_ret, op_errno, file_checksum, - dir_checksum); - return 0; + IOT_FOP(readdir, frame, this, fd, size, offset, xdata); + return 0; } - int -iot_checksum_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, - int32_t flags) +iot_inodelk(call_frame_t *frame, xlator_t *this, const char *volume, loc_t *loc, + int32_t cmd, struct gf_flock *lock, dict_t *xdata) { - STACK_WIND (frame, iot_checksum_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->checksum, - loc, flags); - - return 0; + IOT_FOP(inodelk, frame, 
this, volume, loc, cmd, lock, xdata); + return 0; } - int -iot_checksum (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_checksum_stub (frame, iot_checksum_wrapper, loc, flags); - - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, - "cannot create fop_checksum call stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, - stub); -out: - if (ret < 0) { - STACK_UNWIND_STRICT (checksum, frame, -1, -ret, NULL, NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; -} - - -int -iot_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct stat *preparent, - struct stat *postparent) +iot_finodelk(call_frame_t *frame, xlator_t *this, const char *volume, fd_t *fd, + int32_t cmd, struct gf_flock *lock, dict_t *xdata) { - STACK_UNWIND_STRICT (unlink, frame, op_ret, op_errno, preparent, - postparent); - return 0; + IOT_FOP(finodelk, frame, this, volume, fd, cmd, lock, xdata); + return 0; } - int -iot_unlink_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc) +iot_entrylk(call_frame_t *frame, xlator_t *this, const char *volume, loc_t *loc, + const char *basename, entrylk_cmd cmd, entrylk_type type, + dict_t *xdata) { - STACK_WIND (frame, iot_unlink_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->unlink, - loc); - - return 0; + IOT_FOP(entrylk, frame, this, volume, loc, basename, cmd, type, xdata); + return 0; } - int -iot_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc) +iot_fentrylk(call_frame_t *frame, xlator_t *this, const char *volume, fd_t *fd, + const char *basename, entrylk_cmd cmd, entrylk_type type, + dict_t *xdata) { - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_unlink_stub (frame, iot_unlink_wrapper, loc); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, - "cannot create fop_unlink call stub" - "(out of 
memory)"); - ret = -1; - goto out; - } - - ret = iot_schedule_unordered((iot_conf_t *)this->private, loc->inode, - stub); - -out: - if (ret < 0) { - STACK_UNWIND_STRICT (unlink, frame, -1, -ret, NULL, NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - - return 0; + IOT_FOP(fentrylk, frame, this, volume, fd, basename, cmd, type, xdata); + return 0; } - int -iot_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, inode_t *inode, - struct stat *buf, struct stat *preparent, struct stat *postparent) +iot_xattrop(call_frame_t *frame, xlator_t *this, loc_t *loc, + gf_xattrop_flags_t optype, dict_t *xattr, dict_t *xdata) { - STACK_UNWIND_STRICT (link, frame, op_ret, op_errno, inode, buf, - preparent, postparent); - return 0; + IOT_FOP(xattrop, frame, this, loc, optype, xattr, xdata); + return 0; } - int -iot_link_wrapper (call_frame_t *frame, xlator_t *this, loc_t *old, loc_t *new) +iot_fxattrop(call_frame_t *frame, xlator_t *this, fd_t *fd, + gf_xattrop_flags_t optype, dict_t *xattr, dict_t *xdata) { - STACK_WIND (frame, iot_link_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->link, old, new); - - return 0; + IOT_FOP(fxattrop, frame, this, fd, optype, xattr, xdata); + return 0; } - -int -iot_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc) +int32_t +iot_rchecksum(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, + int32_t len, dict_t *xdata) { - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_link_stub (frame, iot_link_wrapper, oldloc, newloc); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot create link stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_unordered ((iot_conf_t *)this->private, - oldloc->inode, stub); -out: - if (ret < 0) { - STACK_UNWIND_STRICT (link, frame, -1, -ret, NULL, NULL, NULL, - NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; + IOT_FOP(rchecksum, frame, this, fd, 
offset, len, xdata); + return 0; } - int -iot_opendir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, fd_t *fd) +iot_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode, + off_t offset, size_t len, dict_t *xdata) { - STACK_UNWIND_STRICT (opendir, frame, op_ret, op_errno, fd); - return 0; + IOT_FOP(fallocate, frame, this, fd, mode, offset, len, xdata); + return 0; } - int -iot_opendir_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd) +iot_discard(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, + size_t len, dict_t *xdata) { - STACK_WIND (frame, iot_opendir_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->opendir, loc, fd); - return 0; + IOT_FOP(discard, frame, this, fd, offset, len, xdata); + return 0; } - int -iot_opendir (call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd) +iot_zerofill(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, + off_t len, dict_t *xdata) { - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_opendir_stub (frame, iot_opendir_wrapper, loc, fd); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot create opendir stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, - stub); -out: - if (ret < 0) { - STACK_UNWIND_STRICT (opendir, frame, -1, -ret, NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; + IOT_FOP(zerofill, frame, this, fd, offset, len, xdata); + return 0; } - int -iot_fsyncdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) +iot_seek(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, + gf_seek_what_t what, dict_t *xdata) { - STACK_UNWIND_STRICT (fsyncdir, frame, op_ret, op_errno); - return 0; + IOT_FOP(seek, frame, this, fd, offset, what, xdata); + return 0; } - int -iot_fsyncdir_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd, - int 
datasync) +iot_lease(call_frame_t *frame, xlator_t *this, loc_t *loc, + struct gf_lease *lease, dict_t *xdata) { - STACK_WIND (frame, iot_fsyncdir_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->fsyncdir, fd, datasync); - return 0; + IOT_FOP(lease, frame, this, loc, lease, xdata); + return 0; } - int -iot_fsyncdir (call_frame_t *frame, xlator_t *this, fd_t *fd, int datasync) +iot_getactivelk(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) { - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_fsyncdir_stub (frame, iot_fsyncdir_wrapper, fd, datasync); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot create fsyncdir stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, - stub); -out: - if (ret < 0) { - STACK_UNWIND_STRICT (fsyncdir, frame, -1, -ret); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; + IOT_FOP(getactivelk, frame, this, loc, xdata); + return 0; } - int -iot_statfs_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct statvfs *buf) +iot_setactivelk(call_frame_t *frame, xlator_t *this, loc_t *loc, + lock_migration_info_t *locklist, dict_t *xdata) { - STACK_UNWIND_STRICT (statfs, frame, op_ret, op_errno, buf); - return 0; + IOT_FOP(setactivelk, frame, this, loc, locklist, xdata); + return 0; } - int -iot_statfs_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc) +__iot_workers_scale(iot_conf_t *conf) { - STACK_WIND (frame, iot_statfs_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->statfs, loc); - return 0; -} - - -int -iot_statfs (call_frame_t *frame, xlator_t *this, loc_t *loc) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_statfs_stub (frame, iot_statfs_wrapper, loc); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot create statfs stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_unordered ((iot_conf_t 
*)this->private, loc->inode, - stub); -out: - if (ret < 0) { - STACK_UNWIND_STRICT (statfs, frame, -1, -ret, NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; -} + int scale = 0; + int diff = 0; + pthread_t thread; + int ret = 0; + int i = 0; + for (i = 0; i < GF_FOP_PRI_MAX; i++) + scale += min(conf->queue_sizes[i], conf->ac_iot_limit[i]); -int -iot_setxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) -{ - STACK_UNWIND_STRICT (setxattr, frame, op_ret, op_errno); - return 0; -} - - -int -iot_setxattr_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, - dict_t *dict, int32_t flags) -{ - STACK_WIND (frame, iot_setxattr_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->setxattr, loc, dict, flags); - return 0; -} - + if (scale < IOT_MIN_THREADS) + scale = IOT_MIN_THREADS; -int -iot_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict, - int32_t flags) -{ - call_stub_t *stub = NULL; - fd_t *fd = NULL; - int ret = -1; - - stub = fop_setxattr_stub (frame, iot_setxattr_wrapper, loc, dict, - flags); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot create setxattr stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } + if (scale > conf->max_count) + scale = conf->max_count; - fd = fd_lookup (loc->inode, frame->root->pid); - if (fd == NULL) - ret = iot_schedule_unordered ((iot_conf_t *)this->private, - loc->inode, stub); - else { - ret = iot_schedule_ordered ((iot_conf_t *)this->private, - loc->inode, stub); - fd_unref (fd); - } + if (conf->curr_count < scale) { + diff = scale - conf->curr_count; + } -out: - if (ret < 0) { - STACK_UNWIND_STRICT (setxattr, frame, -1, -ret); + while (diff) { + diff--; - if (stub != NULL) { - call_stub_destroy (stub); - } + ret = gf_thread_create(&thread, &conf->w_attr, iot_worker, conf, + "iotwr%03hx", conf->curr_count & 0x3ff); + if (ret == 0) { + pthread_detach(thread); + conf->curr_count++; + 
gf_msg_debug(conf->this->name, 0, + "scaled threads to %d (queue_size=%d/%d)", + conf->curr_count, conf->queue_size, scale); + } else { + break; } - return 0; -} - + } -int -iot_getxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, dict_t *dict) -{ - STACK_UNWIND_STRICT (getxattr, frame, op_ret, op_errno, dict); - return 0; + return diff; } - int -iot_getxattr_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, - const char *name) +iot_workers_scale(iot_conf_t *conf) { - STACK_WIND (frame, iot_getxattr_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->getxattr, loc, name); - return 0; -} + int ret = -1; + if (conf == NULL) { + ret = -EINVAL; + goto out; + } -int -iot_getxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, - const char *name) -{ - call_stub_t *stub = NULL; - fd_t *fd = NULL; - int ret = -1; - - stub = fop_getxattr_stub (frame, iot_getxattr_wrapper, loc, name); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot create getxattr stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - fd = fd_lookup (loc->inode, frame->root->pid); - if (!fd) - ret = iot_schedule_unordered ((iot_conf_t *)this->private, - loc->inode, stub); - else { - ret = iot_schedule_ordered ((iot_conf_t *)this->private, - loc->inode, stub); - fd_unref (fd); - } + pthread_mutex_lock(&conf->mutex); + { + ret = __iot_workers_scale(conf); + } + pthread_mutex_unlock(&conf->mutex); out: - if (ret < 0) { - STACK_UNWIND_STRICT (getxattr, frame, -1, -ret, NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; -} - - -int -iot_fgetxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, dict_t *dict) -{ - STACK_UNWIND_STRICT (fgetxattr, frame, op_ret, op_errno, dict); - return 0; + return ret; } - int -iot_fgetxattr_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd, - const char *name) +set_stack_size(iot_conf_t *conf) { - STACK_WIND (frame, 
iot_fgetxattr_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->fgetxattr, fd, name); - return 0; -} - + int err = 0; + size_t stacksize = IOT_THREAD_STACK_SIZE; + xlator_t *this = NULL; -int -iot_fgetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd, - const char *name) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_fgetxattr_stub (frame, iot_fgetxattr_wrapper, fd, name); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot create fgetxattr stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } + this = THIS; - ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, - stub); -out: - if (ret < 0) { - STACK_UNWIND_STRICT (fgetxattr, frame, -1, -ret, NULL); + err = pthread_attr_init(&conf->w_attr); + if (err != 0) { + gf_smsg(this->name, GF_LOG_ERROR, err, IO_THREADS_MSG_INIT_FAILED, + NULL); + return err; + } - if (stub != NULL) { - call_stub_destroy (stub); - } + err = pthread_attr_setstacksize(&conf->w_attr, stacksize); + if (err == EINVAL) { + err = pthread_attr_getstacksize(&conf->w_attr, &stacksize); + if (!err) { + gf_smsg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_SIZE_NOT_SET, + "size=%zd", stacksize, NULL); + } else { + gf_smsg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_SIZE_NOT_SET, + NULL); + err = 0; } - return 0; -} + } - -int -iot_fsetxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) -{ - STACK_UNWIND_STRICT (fsetxattr, frame, op_ret, op_errno); - return 0; + conf->stack_size = stacksize; + return err; } - -int -iot_fsetxattr_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd, - dict_t *dict, int32_t flags) +int32_t +mem_acct_init(xlator_t *this) { - STACK_WIND (frame, iot_fsetxattr_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->fsetxattr, fd, dict, flags); - return 0; -} + int ret = -1; + if (!this) + return ret; -int -iot_fsetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict, - int32_t flags) -{ - call_stub_t *stub = NULL; - 
int ret = -1; - - stub = fop_fsetxattr_stub (frame, iot_fsetxattr_wrapper, fd, dict, - flags); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot create fsetxattr stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } + ret = xlator_mem_acct_init(this, gf_iot_mt_end + 1); - ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, - stub); -out: - if (ret < 0) { - STACK_UNWIND_STRICT (fsetxattr, frame, -1, -ret); + if (ret != 0) { + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, IO_THREADS_MSG_NO_MEMORY, + NULL); + return ret; + } - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; + return ret; } - int -iot_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) +iot_priv_dump(xlator_t *this) { - STACK_UNWIND_STRICT (removexattr, frame, op_ret, op_errno); - return 0; -} + iot_conf_t *conf = NULL; + char key_prefix[GF_DUMP_MAX_BUF_LEN]; + char key[GF_DUMP_MAX_BUF_LEN]; + int i = 0; - -int -iot_removexattr_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, - const char *name) -{ - STACK_WIND (frame, iot_removexattr_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->removexattr, loc, name); + if (!this) return 0; -} - - -int -iot_removexattr (call_frame_t *frame, xlator_t *this, loc_t *loc, - const char *name) -{ - call_stub_t *stub = NULL; - fd_t *fd = NULL; - int ret = -1; - - stub = fop_removexattr_stub (frame, iot_removexattr_wrapper, loc, - name); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR,"cannot get removexattr fop" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - fd = fd_lookup (loc->inode, frame->root->pid); - if (!fd) - ret = iot_schedule_unordered ((iot_conf_t *)this->private, - loc->inode, stub); - else { - ret = iot_schedule_ordered ((iot_conf_t *)this->private, - loc->inode, stub); - fd_unref (fd); - } -out: - if (ret < 0) { - STACK_UNWIND_STRICT (removexattr, frame, -1, -ret); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } + conf 
= this->private; + if (!conf) return 0; -} + snprintf(key_prefix, GF_DUMP_MAX_BUF_LEN, "%s.%s", this->type, this->name); -int -iot_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, gf_dirent_t *entries) -{ - STACK_UNWIND_STRICT (readdirp, frame, op_ret, op_errno, entries); - return 0; -} + gf_proc_dump_add_section("%s", key_prefix); + gf_proc_dump_write("maximum_threads_count", "%d", conf->max_count); + gf_proc_dump_write("current_threads_count", "%d", conf->curr_count); + gf_proc_dump_write("sleep_count", "%d", conf->sleep_count); + gf_proc_dump_write("idle_time", "%d", conf->idle_time); + gf_proc_dump_write("stack_size", "%zd", conf->stack_size); + gf_proc_dump_write("max_high_priority_threads", "%d", + conf->ac_iot_limit[GF_FOP_PRI_HI]); + gf_proc_dump_write("max_normal_priority_threads", "%d", + conf->ac_iot_limit[GF_FOP_PRI_NORMAL]); + gf_proc_dump_write("max_low_priority_threads", "%d", + conf->ac_iot_limit[GF_FOP_PRI_LO]); + gf_proc_dump_write("max_least_priority_threads", "%d", + conf->ac_iot_limit[GF_FOP_PRI_LEAST]); + gf_proc_dump_write("current_high_priority_threads", "%d", + conf->ac_iot_count[GF_FOP_PRI_HI]); + gf_proc_dump_write("current_normal_priority_threads", "%d", + conf->ac_iot_count[GF_FOP_PRI_NORMAL]); + gf_proc_dump_write("current_low_priority_threads", "%d", + conf->ac_iot_count[GF_FOP_PRI_LO]); + gf_proc_dump_write("current_least_priority_threads", "%d", + conf->ac_iot_count[GF_FOP_PRI_LEAST]); + for (i = 0; i < GF_FOP_PRI_MAX; i++) { + if (!conf->queue_sizes[i]) + continue; + snprintf(key, sizeof(key), "%s_priority_queue_length", + iot_get_pri_meaning(i)); + gf_proc_dump_write(key, "%d", conf->queue_sizes[i]); + } -int -iot_readdirp_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd, - size_t size, off_t offset) -{ - STACK_WIND (frame, iot_readdirp_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->readdirp, fd, size, offset); - return 0; -} - - -int -iot_readdirp (call_frame_t 
*frame, xlator_t *this, fd_t *fd, size_t size, - off_t offset) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_readdirp_stub (frame, iot_readdirp_wrapper, fd, size, - offset); - if (!stub) { - gf_log (this->private, GF_LOG_ERROR,"cannot get readdir stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, - stub); -out: - if (ret < 0) { - STACK_UNWIND_STRICT (readdirp, frame, -1, -ret, NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; + return 0; } +/* + * We use a decay model to keep track and make sure we're not spawning new + * threads too often. Each increment adds a large value to a counter, and that + * counter keeps ticking back down to zero over a fairly long period. For + * example, let's use ONE_WEEK=604800 seconds, and we want to detect when we + * have N=3 increments during that time. Thus, our threshold is + * (N-1)*ONE_WEEK. To see how it works, look at three examples. + * + * (a) Two events close together, then one more almost a week later. The + * first two events push our counter to 2*ONE_WEEK plus a bit. At the third + * event, we decay down to ONE_WEEK plus a bit and then add ONE_WEEK for the + * new event, exceeding our threshold. + * + * (b) One event, then two more almost a week later. At the time of the + * second and third events, the counter is already non-zero, so when we add + * 2*ONE_WEEK we exceed again. + * + * (c) Three events, spaced three days apart. At the time of the second + * event, we decay down to approximately ONE_WEEK*4/7 and then add another + * ONE_WEEK. At the third event, we decay again down to ONE_WEEK*8/7 and add + * another ONE_WEEK, so boom. + * + * Note that in all three cases if that last event came a day later our counter + * would have decayed a bit more and we would *not* exceed our threshold.
It's + * not exactly the same as a precise "three in one week" limit, but it's very + * close and it allows the same kind of tweaking while requiring only constant + * space - no arrays of variable length N to allocate or maintain. All we need + * (for each queue) is the value plus the time of the last update. + */ -int -iot_readdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, gf_dirent_t *entries) -{ - STACK_UNWIND_STRICT (readdir, frame, op_ret, op_errno, entries); - return 0; -} - +typedef struct { + time_t update_time; + uint32_t value; +} threshold_t; +/* + * Variables so that I can hack these for testing. + * TBD: make these tunable? + */ +static uint32_t THRESH_SECONDS = 604800; +static uint32_t THRESH_EVENTS = 3; +static uint32_t THRESH_LIMIT = 1209600; /* SECONDS * (EVENTS-1) */ -int -iot_readdir_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd, - size_t size, off_t offset) +static void +iot_apply_event(xlator_t *this, threshold_t *thresh) { - STACK_WIND (frame, iot_readdir_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->readdir, fd, size, offset); - return 0; -} + time_t delta, now = gf_time(); + /* Refresh for manual testing/debugging. It's cheap. */ + THRESH_LIMIT = THRESH_SECONDS * (THRESH_EVENTS - 1); -int -iot_readdir (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, - off_t offset) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_readdir_stub (frame, iot_readdir_wrapper, fd, size, offset); - if (!stub) { - gf_log (this->private, GF_LOG_ERROR,"cannot get readdir stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, - stub); -out: - if (ret < 0) { - STACK_UNWIND_STRICT (readdir, frame, -1, -ret, NULL); - - if (stub != NULL) { - call_stub_destroy (stub); + if (thresh->value && thresh->update_time) { + delta = now - thresh->update_time; + /* Be careful about underflow. 
*/ + if (thresh->value <= delta) { + thresh->value = 0; + } else { + thresh->value -= delta; + } + } + + thresh->value += THRESH_SECONDS; + if (thresh->value >= THRESH_LIMIT) { + gf_log(this->name, GF_LOG_EMERG, "watchdog firing too often"); + /* + * The default action for SIGTRAP is to dump core, but the fact + * that it's distinct from other signals we use means that + * there are other possibilities as well (e.g. drop into gdb or + * invoke a special handler). + */ + kill(getpid(), SIGTRAP); + } + + thresh->update_time = now; +} + +static void * +iot_watchdog(void *arg) +{ + xlator_t *this = arg; + iot_conf_t *priv = this->private; + int i; + int bad_times[GF_FOP_PRI_MAX] = { + 0, + }; + threshold_t thresholds[GF_FOP_PRI_MAX] = {{ + 0, + }}; + + for (;;) { + sleep(max(priv->watchdog_secs / 5, 1)); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + pthread_mutex_lock(&priv->mutex); + for (i = 0; i < GF_FOP_PRI_MAX; ++i) { + if (priv->queue_marked[i]) { + if (++bad_times[i] >= 5) { + gf_log(this->name, GF_LOG_WARNING, "queue %d stalled", i); + iot_apply_event(this, &thresholds[i]); + /* + * We might not get here if the event + * put us over our threshold. 
+ */ + ++(priv->ac_iot_limit[i]); + bad_times[i] = 0; } + } else { + bad_times[i] = 0; + } + priv->queue_marked[i] = (priv->queue_sizes[i] > 0); } - return 0; -} - + pthread_mutex_unlock(&priv->mutex); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + } -int -iot_xattrop_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, dict_t *xattr) -{ - STACK_UNWIND_STRICT (xattrop, frame, op_ret, op_errno, xattr); - return 0; + /* NOTREACHED */ + return NULL; } - -int -iot_xattrop_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, - gf_xattrop_flags_t optype, dict_t *xattr) +static void +start_iot_watchdog(xlator_t *this) { - STACK_WIND (frame, iot_xattrop_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->xattrop, loc, optype, xattr); - return 0; -} - - -int -iot_xattrop (call_frame_t *frame, xlator_t *this, loc_t *loc, - gf_xattrop_flags_t optype, dict_t *xattr) -{ - call_stub_t *stub = NULL; - fd_t *fd = NULL; - int ret = -1; - - stub = fop_xattrop_stub (frame, iot_xattrop_wrapper, loc, optype, - xattr); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot create xattrop stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } - - fd = fd_lookup (loc->inode, frame->root->pid); - if (!fd) - ret = iot_schedule_unordered ((iot_conf_t *)this->private, - loc->inode, stub); - else { - ret = iot_schedule_ordered ((iot_conf_t *)this->private, - loc->inode, stub); - fd_unref (fd); - } - -out: - if (ret < 0) { - STACK_UNWIND_STRICT (xattrop, frame, -1, -ret, NULL); - - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; -} + iot_conf_t *priv = this->private; + int ret; + if (priv->watchdog_running) { + return; + } -int -iot_fxattrop_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, dict_t *xattr) -{ - STACK_UNWIND_STRICT (fxattrop, frame, op_ret, op_errno, xattr); - return 0; + ret = pthread_create(&priv->watchdog_thread, NULL, iot_watchdog, this); + if (ret == 
0) { + priv->watchdog_running = _gf_true; + } else { + gf_log(this->name, GF_LOG_WARNING, + "pthread_create(iot_watchdog) failed"); + } } -int -iot_fxattrop_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd, - gf_xattrop_flags_t optype, dict_t *xattr) +static void +stop_iot_watchdog(xlator_t *this) { - STACK_WIND (frame, iot_fxattrop_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->fxattrop, fd, optype, xattr); - return 0; -} - -int -iot_fxattrop (call_frame_t *frame, xlator_t *this, fd_t *fd, - gf_xattrop_flags_t optype, dict_t *xattr) -{ - call_stub_t *stub = NULL; - int ret = -1; - - stub = fop_fxattrop_stub (frame, iot_fxattrop_wrapper, fd, optype, - xattr); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot create fxattrop stub" - "(out of memory)"); - ret = -ENOMEM; - goto out; - } + iot_conf_t *priv = this->private; - ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, - stub); -out: - if (ret < 0) { - STACK_UNWIND_STRICT (fxattrop, frame, -1, -ret, NULL); - if (stub != NULL) { - call_stub_destroy (stub); - } - } - return 0; -} + if (!priv->watchdog_running) { + return; + } + if (pthread_cancel(priv->watchdog_thread) != 0) { + gf_log(this->name, GF_LOG_WARNING, + "pthread_cancel(iot_watchdog) failed"); + } -/* Must be called with worker lock held */ -void -_iot_queue (iot_worker_t *worker, iot_request_t *req) -{ - list_add_tail (&req->list, &worker->rqlist); + if (pthread_join(priv->watchdog_thread, NULL) != 0) { + gf_log(this->name, GF_LOG_WARNING, "pthread_join(iot_watchdog) failed"); + } - /* dq_cond */ - worker->queue_size++; - iot_notify_worker(worker); + /* Failure probably means it's already dead. 
*/ + priv->watchdog_running = _gf_false; } - -iot_request_t * -iot_init_request (iot_worker_t *worker, call_stub_t *stub) +int +reconfigure(xlator_t *this, dict_t *options) { - iot_request_t *req = NULL; + iot_conf_t *conf = NULL; + int ret = -1; - req = mem_get (worker->req_pool); - if (req == NULL) { - goto out; - } + conf = this->private; + if (!conf) + goto out; - req->stub = stub; -out: - return req; -} + GF_OPTION_RECONF("thread-count", conf->max_count, options, int32, out); + GF_OPTION_RECONF("high-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_HI], + options, int32, out); -void -iot_destroy_request (iot_worker_t *worker, iot_request_t * req) -{ - if ((req == NULL) || (worker == NULL)) - return; + GF_OPTION_RECONF("normal-prio-threads", + conf->ac_iot_limit[GF_FOP_PRI_NORMAL], options, int32, + out); - mem_put (worker->req_pool, req); -} + GF_OPTION_RECONF("low-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_LO], + options, int32, out); + GF_OPTION_RECONF("least-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_LEAST], + options, int32, out); -/* Must be called with worker lock held. */ -gf_boolean_t -iot_can_ordered_exit (iot_worker_t * worker) -{ - gf_boolean_t allow_exit = _gf_false; - iot_conf_t *conf = NULL; + GF_OPTION_RECONF("enable-least-priority", conf->least_priority, options, + bool, out); - conf = worker->conf; - /* We dont want this thread to exit if its index is - * below the min thread count. - */ - if (worker->thread_idx >= conf->min_o_threads) - allow_exit = _gf_true; + GF_OPTION_RECONF("cleanup-disconnected-reqs", + conf->cleanup_disconnected_reqs, options, bool, out); - return allow_exit; -} + GF_OPTION_RECONF("watchdog-secs", conf->watchdog_secs, options, int32, out); -/* Must be called with worker lock held. 
*/ -gf_boolean_t -iot_ordered_exit (int cond_waitres, iot_worker_t *worker) -{ - gf_boolean_t allow_exit = _gf_false; + GF_OPTION_RECONF("pass-through", this->pass_through, options, bool, out); - if (worker->state == IOT_STATE_EXIT_REQUEST) { - allow_exit = _gf_true; - } else if (cond_waitres == ETIMEDOUT) { - allow_exit = iot_can_ordered_exit (worker); - } - - if (allow_exit) { - worker->state = IOT_STATE_DEAD; - worker->thread = 0; - } + if (conf->watchdog_secs > 0) { + start_iot_watchdog(this); + } else { + stop_iot_watchdog(this); + } - return allow_exit; + ret = 0; +out: + return ret; } - int -iot_ordered_request_wait (iot_worker_t * worker) +init(xlator_t *this) { - int waitres = 0; - int retstat = 0; + iot_conf_t *conf = NULL; + int ret = -1; + int i = 0; - if (worker->state == IOT_STATE_EXIT_REQUEST) { - retstat = -1; - goto out; - } + if (!this->children || this->children->next) { + gf_smsg("io-threads", GF_LOG_ERROR, 0, + IO_THREADS_MSG_XLATOR_CHILD_MISCONFIGURED, NULL); + goto out; + } - waitres = iot_notify_wait (worker, worker->conf->o_idle_time); - if (iot_ordered_exit (waitres, worker)) { - retstat = -1; - } + if (!this->parents) { + gf_smsg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_VOL_MISCONFIGURED, + NULL); + } -out: - return retstat; -} + conf = (void *)GF_CALLOC(1, sizeof(*conf), gf_iot_mt_iot_conf_t); + if (conf == NULL) { + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, IO_THREADS_MSG_OUT_OF_MEMORY, + NULL); + goto out; + } + if ((ret = pthread_cond_init(&conf->cond, NULL)) != 0) { + gf_smsg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_PTHREAD_INIT_FAILED, + "pthread_cond_init ret=%d", ret, NULL); + goto out; + } + conf->cond_inited = _gf_true; -call_stub_t * -iot_dequeue_ordered (iot_worker_t *worker) -{ - call_stub_t *stub = NULL; - iot_request_t *req = NULL; - int waitstat = 0; + if ((ret = pthread_mutex_init(&conf->mutex, NULL)) != 0) { + gf_smsg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_PTHREAD_INIT_FAILED, + "pthread_mutex_init ret=%d", 
ret, NULL); + goto out; + } + conf->mutex_inited = _gf_true; - LOCK (&worker->qlock); - { - while (!worker->queue_size) { - waitstat = 0; - waitstat = iot_ordered_request_wait (worker); - /* We must've timed out and are now required to - * exit. - */ - if (waitstat == -1) - goto out; - } + ret = set_stack_size(conf); - list_for_each_entry (req, &worker->rqlist, list) - break; - list_del (&req->list); - stub = req->stub; + if (ret != 0) + goto out; - worker->queue_size--; - } -out: - UNLOCK (&worker->qlock); - iot_destroy_request (worker, req); + ret = -1; - return stub; -} + GF_OPTION_INIT("thread-count", conf->max_count, int32, out); + GF_OPTION_INIT("high-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_HI], + int32, out); -void * -iot_worker_ordered (void *arg) -{ - iot_worker_t *worker = arg; - call_stub_t *stub = NULL; + GF_OPTION_INIT("normal-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_NORMAL], + int32, out); - while (1) { + GF_OPTION_INIT("low-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_LO], int32, + out); - stub = iot_dequeue_ordered (worker); - /* If stub is NULL, we must've timed out waiting for a - * request and have now been allowed to exit. - */ - if (!stub) - break; - call_resume (stub); - } + GF_OPTION_INIT("least-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_LEAST], + int32, out); - return NULL; -} + GF_OPTION_INIT("idle-time", conf->idle_time, int32, out); + GF_OPTION_INIT("enable-least-priority", conf->least_priority, bool, out); -/* Must be called with worker lock held. */ -gf_boolean_t -iot_can_unordered_exit (iot_worker_t * worker) -{ - gf_boolean_t allow_exit = _gf_false; - iot_conf_t *conf = NULL; + GF_OPTION_INIT("cleanup-disconnected-reqs", conf->cleanup_disconnected_reqs, + bool, out); - conf = worker->conf; - /* We dont want this thread to exit if its index is - * below the min thread count. 
- */ - if (worker->thread_idx >= conf->min_u_threads) - allow_exit = _gf_true; + GF_OPTION_INIT("pass-through", this->pass_through, bool, out); - return allow_exit; -} + conf->this = this; + GF_ATOMIC_INIT(conf->stub_cnt, 0); + for (i = 0; i < GF_FOP_PRI_MAX; i++) { + INIT_LIST_HEAD(&conf->clients[i]); + INIT_LIST_HEAD(&conf->no_client[i].clients); + INIT_LIST_HEAD(&conf->no_client[i].reqs); + } -/* Must be called with worker lock held. */ -gf_boolean_t -iot_unordered_exit (int cond_waitres, iot_worker_t *worker) -{ - gf_boolean_t allow_exit = _gf_false; - - if (worker->state == IOT_STATE_EXIT_REQUEST) { - allow_exit = _gf_true; - } else if (cond_waitres == ETIMEDOUT) { - allow_exit = iot_can_unordered_exit (worker); - } + if (!this->pass_through) { + ret = iot_workers_scale(conf); - if (allow_exit) { - worker->state = IOT_STATE_DEAD; - worker->thread = 0; + if (ret == -1) { + gf_smsg(this->name, GF_LOG_ERROR, 0, + IO_THREADS_MSG_WORKER_THREAD_INIT_FAILED, NULL); + goto out; } + } - return allow_exit; -} - - -int -iot_unordered_request_wait (iot_worker_t * worker) -{ - int waitres = 0; - int retstat = 0; - - if (worker->state == IOT_STATE_EXIT_REQUEST) { - retstat = -1; - goto out; - } + this->private = conf; - waitres = iot_notify_wait (worker, worker->conf->u_idle_time); - if (iot_unordered_exit (waitres, worker)) { - retstat = -1; - } + conf->watchdog_secs = 0; + GF_OPTION_INIT("watchdog-secs", conf->watchdog_secs, int32, out); + if (conf->watchdog_secs > 0) { + start_iot_watchdog(this); + } + ret = 0; out: - return retstat; -} - - -call_stub_t * -iot_dequeue_unordered (iot_worker_t *worker) -{ - call_stub_t *stub= NULL; - iot_request_t *req = NULL; - int waitstat = 0; - - LOCK (&worker->qlock); - { - while (!worker->queue_size) { - waitstat = 0; - waitstat = iot_unordered_request_wait (worker); - /* If -1, request wait must've timed - * out. 
- */ - if (waitstat == -1) - goto out; + if (ret) + GF_FREE(conf); + + return ret; +} + +static void +iot_exit_threads(iot_conf_t *conf) +{ + pthread_mutex_lock(&conf->mutex); + { + conf->down = _gf_true; + /*Let all the threads know that xl is going down*/ + pthread_cond_broadcast(&conf->cond); + while (conf->curr_count) /*Wait for threads to exit*/ + pthread_cond_wait(&conf->cond, &conf->mutex); + } + pthread_mutex_unlock(&conf->mutex); +} + +int +notify(xlator_t *this, int32_t event, void *data, ...) +{ + iot_conf_t *conf = this->private; + xlator_t *victim = data; + uint64_t stub_cnt = 0; + struct timespec sleep_till = { + 0, + }; + + if (GF_EVENT_PARENT_DOWN == event) { + if (victim->cleanup_starting) { + /* Wait for draining stub from queue before notify PARENT_DOWN */ + stub_cnt = GF_ATOMIC_GET(conf->stub_cnt); + if (stub_cnt) { + timespec_now_realtime(&sleep_till); + sleep_till.tv_sec += 1; + pthread_mutex_lock(&conf->mutex); + { + while (stub_cnt) { + (void)pthread_cond_timedwait(&conf->cond, &conf->mutex, + &sleep_till); + stub_cnt = GF_ATOMIC_GET(conf->stub_cnt); + } } + pthread_mutex_unlock(&conf->mutex); + } - list_for_each_entry (req, &worker->rqlist, list) - break; - list_del (&req->list); - stub = req->stub; - - worker->queue_size--; - } -out: - UNLOCK (&worker->qlock); - iot_destroy_request (worker, req); - - return stub; -} - - -void * -iot_worker_unordered (void *arg) -{ - iot_worker_t *worker = arg; - call_stub_t *stub = NULL; - - while (1) { - - stub = iot_dequeue_unordered (worker); - /* If no request was received, we must've timed out, - * and can exit. 
*/ - if (!stub) - break; - - call_resume (stub); - } - - return NULL; -} - - -void -deallocate_worker_array (iot_worker_t **workers) -{ - FREE (workers); -} - -void -deallocate_workers (iot_worker_t **workers, - int start_alloc_idx, int count) -{ - int i; - int end_count; - - end_count = count + start_alloc_idx; - for (i = start_alloc_idx; (i < end_count); i++) { - if (workers[i] != NULL) { - mem_pool_destroy (workers[i]->req_pool); - FREE (workers[i]); - workers[i] = NULL; - } + gf_log(this->name, GF_LOG_INFO, + "Notify GF_EVENT_PARENT_DOWN for brick %s", victim->name); + } else { + iot_exit_threads(conf); } - -} - - -iot_worker_t ** -allocate_worker_array (int count) -{ - iot_worker_t **warr = NULL; - - warr = CALLOC (count, sizeof (iot_worker_t *)); + } - return warr; -} - - -iot_worker_t * -allocate_worker (iot_conf_t * conf) -{ - iot_worker_t *wrk = NULL; - - wrk = CALLOC (1, sizeof (iot_worker_t)); - if (wrk == NULL) { - gf_log (conf->this->name, GF_LOG_ERROR, "out of memory"); - goto out; + if (GF_EVENT_CHILD_DOWN == event) { + if (victim->cleanup_starting) { + iot_exit_threads(conf); + gf_log(this->name, GF_LOG_INFO, + "Notify GF_EVENT_CHILD_DOWN for brick %s", victim->name); } + } - wrk->req_pool = mem_pool_new (iot_request_t, IOT_REQUEST_MEMPOOL_SIZE); - if (wrk->req_pool == NULL) - goto free_wrk; - - INIT_LIST_HEAD (&wrk->rqlist); - wrk->conf = conf; - iot_notify_init (wrk); - wrk->state = IOT_STATE_DEAD; - -out: - return wrk; + default_notify(this, event, data); -free_wrk: - FREE (wrk); - return NULL; + return 0; } - -int -allocate_workers (iot_conf_t *conf, iot_worker_t **workers, int start_alloc_idx, - int count) -{ - int i; - int end_count, ret = -1; - - end_count = count + start_alloc_idx; - for (i = start_alloc_idx; i < end_count; i++) { - workers[i] = allocate_worker (conf); - if (workers[i] == NULL) { - ret = -ENOMEM; - goto out; - } - workers[i]->thread_idx = i; - } - ret = 0; - -out: - return ret; -} - - void -iot_stop_worker (iot_worker_t 
*worker) +fini(xlator_t *this) { - LOCK (&worker->qlock); - { - worker->state = IOT_STATE_EXIT_REQUEST; - } - UNLOCK (&worker->qlock); + iot_conf_t *conf = this->private; - iot_notify_worker (worker); - pthread_join (worker->thread, NULL); -} + if (!conf) + return; + if (conf->mutex_inited && conf->cond_inited) + iot_exit_threads(conf); -void -iot_stop_workers (iot_worker_t **workers, int start_idx, int count) -{ - int i = 0; - int end_idx = 0; + if (conf->cond_inited) + pthread_cond_destroy(&conf->cond); - end_idx = start_idx + count; - for (i = start_idx; i < end_idx; i++) { - if (workers[i] != NULL) { - iot_stop_worker (workers[i]); - } - } -} + if (conf->mutex_inited) + pthread_mutex_destroy(&conf->mutex); + stop_iot_watchdog(this); -int -iot_startup_worker (iot_worker_t *worker, iot_worker_fn workerfunc) -{ - int ret = -1; - ret = pthread_create (&worker->thread, &worker->conf->w_attr, - workerfunc, worker); - if (ret != 0) { - gf_log (worker->conf->this->name, GF_LOG_ERROR, - "cannot start worker (%s)", strerror (errno)); - ret = -ret; - } else { - worker->state = IOT_STATE_ACTIVE; - } + GF_FREE(conf); - return ret; + this->private = NULL; + return; } - int -iot_startup_workers (iot_worker_t **workers, int start_idx, int count, - iot_worker_fn workerfunc) -{ - int i = 0; - int end_idx = 0; - int ret = -1; - - end_idx = start_idx + count; - for (i = start_idx; i < end_idx; i++) { - ret = iot_startup_worker (workers[i], workerfunc); - if (ret < 0) { - goto out; - } - } - - ret = 0; -out: - return ret; -} - - -void -set_stack_size (iot_conf_t *conf) +iot_client_destroy(xlator_t *this, client_t *client) { - int err = 0; - size_t stacksize = IOT_THREAD_STACK_SIZE; + void *tmp = NULL; - pthread_attr_init (&conf->w_attr); - err = pthread_attr_setstacksize (&conf->w_attr, stacksize); - if (err == EINVAL) { - gf_log (conf->this->name, GF_LOG_WARNING, - "Using default thread stack size"); - } -} + if (client_ctx_del(client, this, &tmp) == 0) { + GF_FREE(tmp); + } - 
-void -iot_cleanup_workers (iot_conf_t *conf) -{ - if (conf->uworkers != NULL) { - iot_stop_workers (conf->uworkers, 0, - conf->max_u_threads); - - deallocate_workers (conf->uworkers, 0, - conf->max_u_threads); - - deallocate_worker_array (conf->uworkers); - } - - if (conf->oworkers != NULL) { - iot_stop_workers (conf->oworkers, 0, - conf->max_o_threads); - - deallocate_workers (conf->oworkers, 0, - conf->max_o_threads); - - deallocate_worker_array (conf->oworkers); - } + return 0; } - -int -workers_init (iot_conf_t *conf) +static int +iot_disconnect_cbk(xlator_t *this, client_t *client) { - int ret = -1; + int i; + call_stub_t *curr; + call_stub_t *next; + iot_conf_t *conf = this->private; + iot_client_ctx_t *ctx; - if (conf == NULL) { - ret = -EINVAL; - goto err; - } + if (!conf || !conf->cleanup_disconnected_reqs) { + goto out; + } - /* Initialize un-ordered workers */ - conf->uworkers = allocate_worker_array (conf->max_u_threads); - if (conf->uworkers == NULL) { - gf_log (conf->this->name, GF_LOG_ERROR, "out of memory"); - ret = -ENOMEM; - goto err; - } - - ret = allocate_workers (conf, conf->uworkers, 0, - conf->max_u_threads); - if (ret < 0) { - gf_log (conf->this->name, GF_LOG_ERROR, "out of memory"); - goto err; - } - - /* Initialize ordered workers */ - conf->oworkers = allocate_worker_array (conf->max_o_threads); - if (conf->oworkers == NULL) { - gf_log (conf->this->name, GF_LOG_ERROR, "out of memory"); - ret = -ENOMEM; - goto err; - } - - ret = allocate_workers (conf, conf->oworkers, 0, - conf->max_o_threads); - if (ret < 0) { - gf_log (conf->this->name, GF_LOG_ERROR, "out of memory"); - goto err; - } - - set_stack_size (conf); - ret = iot_startup_workers (conf->oworkers, 0, conf->min_o_threads, - iot_worker_ordered); - if (ret == -1) { - /* logged inside iot_startup_workers */ - goto err; - } - - ret = iot_startup_workers (conf->uworkers, 0, conf->min_u_threads, - iot_worker_unordered); - if (ret == -1) { - /* logged inside iot_startup_workers */ - goto 
err; - } - - return 0; - -err: - if (conf != NULL) { - iot_cleanup_workers (conf); - } - - return ret; -} - - -int -init (xlator_t *this) -{ - iot_conf_t *conf = NULL; - dict_t *options = this->options; - int thread_count = IOT_DEFAULT_THREADS; - gf_boolean_t autoscaling = IOT_SCALING_OFF; - char *scalestr = NULL; - int min_threads, max_threads, ret = -1; - - if (!this->children || this->children->next) { - gf_log ("io-threads", GF_LOG_ERROR, - "FATAL: iot not configured with exactly one child"); - goto out; - } - - if (!this->parents) { - gf_log (this->name, GF_LOG_WARNING, - "dangling volume. check volfile "); - } - - conf = (void *) CALLOC (1, sizeof (*conf)); - if (conf == NULL) { - gf_log (this->name, GF_LOG_ERROR, - "out of memory"); - goto out; - } - - if ((dict_get_str (options, "autoscaling", &scalestr)) == 0) { - if ((gf_string2boolean (scalestr, &autoscaling)) == -1) { - gf_log (this->name, GF_LOG_ERROR, - "'autoscaling' option must be" - " boolean"); - goto out; - } - } - - if (dict_get (options, "thread-count")) { - thread_count = data_to_int32 (dict_get (options, - "thread-count")); - if (scalestr != NULL) - gf_log (this->name, GF_LOG_WARNING, - "'thread-count' is specified with " - "'autoscaling' on. Ignoring" - "'thread-count' option."); - if (thread_count < 2) - thread_count = IOT_MIN_THREADS; - } - - min_threads = IOT_DEFAULT_THREADS; - max_threads = IOT_MAX_THREADS; - if (dict_get (options, "min-threads")) - min_threads = data_to_int32 (dict_get (options, - "min-threads")); - - if (dict_get (options, "max-threads")) - max_threads = data_to_int32 (dict_get (options, - "max-threads")); - - if (min_threads > max_threads) { - gf_log (this->name, GF_LOG_ERROR, " min-threads must be less " - "than max-threads"); - goto out; - } - - /* If autoscaling is off, then adjust the min and max - * threads according to thread-count. 
- * This is based on the assumption that despite autoscaling - * being off, we still want to have separate pools for data - * and meta-data threads. - */ - if (!autoscaling) - max_threads = min_threads = thread_count; - - /* If user specifies an odd number of threads, increase it by - * one. The reason for having an even number of threads is - * explained later. - */ - if (max_threads % 2) - max_threads++; - - if(min_threads % 2) - min_threads++; - - /* If the user wants to have only a single thread for - * some strange reason, make sure we set this count to - * 2. Explained later. - */ - if (min_threads < IOT_MIN_THREADS) - min_threads = IOT_MIN_THREADS; - - /* Again, have atleast two. Read on. */ - if (max_threads < IOT_MIN_THREADS) - max_threads = IOT_MIN_THREADS; - - /* This is why we need atleast two threads. - * We're dividing the specified thread pool into - * 2 halves, equally between ordered and unordered - * pools. - */ - - /* Init params for un-ordered workers. */ - pthread_mutex_init (&conf->utlock, NULL); - conf->max_u_threads = max_threads / 2; - conf->min_u_threads = min_threads / 2; - conf->u_idle_time = IOT_DEFAULT_IDLE; - conf->u_scaling = autoscaling; - - /* Init params for ordered workers. */ - pthread_mutex_init (&conf->otlock, NULL); - conf->max_o_threads = max_threads / 2; - conf->min_o_threads = min_threads / 2; - conf->o_idle_time = IOT_DEFAULT_IDLE; - conf->o_scaling = autoscaling; - - gf_log (this->name, GF_LOG_DEBUG, - "io-threads: Autoscaling: %s, " - "min_threads: %d, max_threads: %d", - (autoscaling) ? 
"on":"off", min_threads, max_threads); - - conf->this = this; - ret = workers_init (conf); - if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "cannot initialize worker threads, exiting init"); - FREE (conf); - goto out; + pthread_mutex_lock(&conf->mutex); + for (i = 0; i < GF_FOP_PRI_MAX; i++) { + ctx = &conf->no_client[i]; + list_for_each_entry_safe(curr, next, &ctx->reqs, list) + { + if (curr->frame->root->client != client) { + continue; + } + gf_log(this->name, GF_LOG_INFO, + "poisoning %s fop at %p for client %s", + gf_fop_list[curr->fop], curr, client->client_uid); + curr->poison = _gf_true; } + } + pthread_mutex_unlock(&conf->mutex); - this->private = conf; - ret = 0; out: - return ret; + return 0; } - -void -fini (xlator_t *this) -{ - iot_conf_t *conf = this->private; - - FREE (conf); - - this->private = NULL; - return; -} - -/* - * O - Goes to ordered threadpool. - * U - Goes to un-ordered threadpool. - * V - Variable, depends on whether the file is open. - * If it is, then goes to ordered, otherwise to - * un-ordered. 
- */ -struct xlator_fops fops = { - .open = iot_open, /* U */ - .create = iot_create, /* U */ - .readv = iot_readv, /* O */ - .writev = iot_writev, /* O */ - .flush = iot_flush, /* O */ - .fsync = iot_fsync, /* O */ - .lk = iot_lk, /* O */ - .stat = iot_stat, /* V */ - .fstat = iot_fstat, /* O */ - .truncate = iot_truncate, /* V */ - .ftruncate = iot_ftruncate, /* O */ - .checksum = iot_checksum, /* U */ - .unlink = iot_unlink, /* U */ - .lookup = iot_lookup, /* U */ - .setattr = iot_setattr, /* U */ - .fsetattr = iot_fsetattr, /* O */ - .access = iot_access, /* U */ - .readlink = iot_readlink, /* U */ - .mknod = iot_mknod, /* U */ - .mkdir = iot_mkdir, /* U */ - .rmdir = iot_rmdir, /* U */ - .symlink = iot_symlink, /* U */ - .rename = iot_rename, /* U */ - .link = iot_link, /* U */ - .opendir = iot_opendir, /* U */ - .fsyncdir = iot_fsyncdir, /* O */ - .statfs = iot_statfs, /* U */ - .setxattr = iot_setxattr, /* U */ - .getxattr = iot_getxattr, /* U */ - .fgetxattr = iot_fgetxattr, /* O */ - .fsetxattr = iot_fsetxattr, /* O */ - .removexattr = iot_removexattr, /* U */ - .readdir = iot_readdir, /* O */ - .readdirp = iot_readdirp, /* O */ - .xattrop = iot_xattrop, /* U */ - .fxattrop = iot_fxattrop, /* O */ +struct xlator_dumpops dumpops = { + .priv = iot_priv_dump, }; -struct xlator_mops mops = { +struct xlator_fops fops = { + .open = iot_open, + .create = iot_create, + .readv = iot_readv, + .writev = iot_writev, + .flush = iot_flush, + .fsync = iot_fsync, + .lk = iot_lk, + .stat = iot_stat, + .fstat = iot_fstat, + .truncate = iot_truncate, + .ftruncate = iot_ftruncate, + .unlink = iot_unlink, + .lookup = iot_lookup, + .setattr = iot_setattr, + .fsetattr = iot_fsetattr, + .access = iot_access, + .readlink = iot_readlink, + .mknod = iot_mknod, + .mkdir = iot_mkdir, + .rmdir = iot_rmdir, + .symlink = iot_symlink, + .rename = iot_rename, + .link = iot_link, + .opendir = iot_opendir, + .fsyncdir = iot_fsyncdir, + .statfs = iot_statfs, + .setxattr = iot_setxattr, + 
.getxattr = iot_getxattr, + .fgetxattr = iot_fgetxattr, + .fsetxattr = iot_fsetxattr, + .removexattr = iot_removexattr, + .fremovexattr = iot_fremovexattr, + .readdir = iot_readdir, + .readdirp = iot_readdirp, + .inodelk = iot_inodelk, + .finodelk = iot_finodelk, + .entrylk = iot_entrylk, + .fentrylk = iot_fentrylk, + .xattrop = iot_xattrop, + .fxattrop = iot_fxattrop, + .rchecksum = iot_rchecksum, + .fallocate = iot_fallocate, + .discard = iot_discard, + .zerofill = iot_zerofill, + .seek = iot_seek, + .lease = iot_lease, + .getactivelk = iot_getactivelk, + .setactivelk = iot_setactivelk, + .put = iot_put, }; struct xlator_cbks cbks = { + .client_destroy = iot_client_destroy, + .client_disconnect = iot_disconnect_cbk, }; struct volume_options options[] = { - { .key = {"thread-count"}, - .type = GF_OPTION_TYPE_INT, - .min = IOT_MIN_THREADS, - .max = IOT_MAX_THREADS - }, - { .key = {"autoscaling"}, - .type = GF_OPTION_TYPE_BOOL - }, - { .key = {"min-threads"}, - .type = GF_OPTION_TYPE_INT, - .min = IOT_MIN_THREADS, - .max = IOT_MAX_THREADS, - .description = "Minimum number of threads must be greater than or " - "equal to 2. If the specified value is less than 2 " - "it is adjusted upwards to 2. This is a requirement" - " for the current model of threading in io-threads." - }, - { .key = {"max-threads"}, - .type = GF_OPTION_TYPE_INT, - .min = IOT_MIN_THREADS, - .max = IOT_MAX_THREADS, - .description = "Maximum number of threads is advisory only so the " - "user specified value will be used." 
- }, - { .key = {NULL} }, + {.key = {"thread-count"}, + .type = GF_OPTION_TYPE_INT, + .min = IOT_MIN_THREADS, + .max = IOT_MAX_THREADS, + .default_value = "16", + .op_version = {1}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, + .tags = {"io-threads"}, + /*.option = "thread-count"*/ + .description = "Number of threads in IO threads translator which " + "perform concurrent IO operations" + + }, + {.key = {"high-prio-threads"}, + .type = GF_OPTION_TYPE_INT, + .min = IOT_MIN_THREADS, + .max = IOT_MAX_THREADS, + .default_value = "16", + .op_version = {1}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, + .tags = {"io-threads"}, + .description = "Max number of threads in IO threads translator which " + "perform high priority IO operations at a given time" + + }, + {.key = {"normal-prio-threads"}, + .type = GF_OPTION_TYPE_INT, + .min = IOT_MIN_THREADS, + .max = IOT_MAX_THREADS, + .default_value = "16", + .op_version = {1}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, + .tags = {"io-threads"}, + .description = "Max number of threads in IO threads translator which " + "perform normal priority IO operations at a given time" + + }, + {.key = {"low-prio-threads"}, + .type = GF_OPTION_TYPE_INT, + .min = IOT_MIN_THREADS, + .max = IOT_MAX_THREADS, + .default_value = "16", + .op_version = {1}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, + .tags = {"io-threads"}, + .description = "Max number of threads in IO threads translator which " + "perform low priority IO operations at a given time" + + }, + {.key = {"least-prio-threads"}, + .type = GF_OPTION_TYPE_INT, + .min = IOT_MIN_THREADS, + .max = IOT_MAX_THREADS, + .default_value = "1", + .op_version = {1}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, + .tags = {"io-threads"}, + .description = "Max number of threads in IO threads translator which " + "perform least priority IO operations at a given time"}, + {.key = {"enable-least-priority"}, + .type = GF_OPTION_TYPE_BOOL, + .default_value = SITE_H_ENABLE_LEAST_PRIORITY, + .op_version = 
{1}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, + .tags = {"io-threads"}, + .description = "Enable/Disable least priority"}, + { + .key = {"idle-time"}, + .type = GF_OPTION_TYPE_INT, + .min = 1, + .max = 0x7fffffff, + .default_value = "120", + }, + {.key = {"watchdog-secs"}, + .type = GF_OPTION_TYPE_INT, + .min = 0, + .default_value = 0, + .op_version = {GD_OP_VERSION_4_1_0}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, + .tags = {"io-threads"}, + .description = "Number of seconds a queue must be stalled before " + "starting an 'emergency' thread."}, + {.key = {"cleanup-disconnected-reqs"}, + .type = GF_OPTION_TYPE_BOOL, + .default_value = "off", + .op_version = {GD_OP_VERSION_4_1_0}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT, + .tags = {"io-threads"}, + .description = "'Poison' queued requests when a client disconnects"}, + {.key = {"pass-through"}, + .type = GF_OPTION_TYPE_BOOL, + .default_value = "false", + .op_version = {GD_OP_VERSION_4_1_0}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT, + .tags = {"io-threads"}, + .description = "Enable/Disable io threads translator"}, + { + .key = {NULL}, + }, +}; + +xlator_api_t xlator_api = { + .init = init, + .fini = fini, + .notify = notify, + .reconfigure = reconfigure, + .mem_acct_init = mem_acct_init, + .op_version = {1}, /* Present from the initial version */ + .dumpops = &dumpops, + .fops = &fops, + .cbks = &cbks, + .options = options, + .identifier = "io-threads", + .category = GF_MAINTAINED, }; diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h index 3843791ed8e..f54d2f4912d 100644 --- a/xlators/performance/io-threads/src/io-threads.h +++ b/xlators/performance/io-threads/src/io-threads.h @@ -1,183 +1,86 @@ /* - Copyright (c) 2006-2009 Gluster, Inc. <http://www.gluster.com> + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> This file is part of GlusterFS. 
- GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ #ifndef __IOT_H #define __IOT_H -#ifndef _CONFIG_H -#define _CONFIG_H -#include "config.h" -#endif - - -#include "compat-errno.h" -#include "glusterfs.h" -#include "logging.h" -#include "dict.h" -#include "xlator.h" -#include "common-utils.h" -#include "list.h" +#include <glusterfs/compat-errno.h> +#include <glusterfs/glusterfs.h> +#include <glusterfs/logging.h> +#include <glusterfs/dict.h> +#include <glusterfs/xlator.h> +#include <glusterfs/common-utils.h> +#include <glusterfs/list.h> #include <stdlib.h> -#include "locking.h" -#include "compat.h" -#ifndef GF_SOLARIS_HOST_OS +#include <glusterfs/locking.h> +#include "iot-mem-types.h" #include <semaphore.h> -#endif - -#define min(a,b) ((a)<(b)?(a):(b)) -#define max(a,b) ((a)>(b)?(a):(b)) +#include <glusterfs/statedump.h> struct iot_conf; -struct iot_worker; -struct iot_request; - -struct iot_request { - struct list_head list; /* Attaches this request to the list of - requests. 
- */ - call_stub_t *stub; -}; -typedef enum { - IOT_STATE_ACTIVE, - IOT_STATE_EXIT_REQUEST, - IOT_STATE_DEAD -}iot_state_t; -#define iot_worker_active(wrk) ((wrk)->state == IOT_STATE_ACTIVE) - -#define MAX_IDLE_SKEW 4 /* In secs */ -#define skew_sec_idle_time(sec) ((sec) + (random () % MAX_IDLE_SKEW)) -#define IOT_DEFAULT_IDLE 180 /* In secs. */ - -#define IOT_MIN_THREADS 2 -#define IOT_DEFAULT_THREADS 16 -#define IOT_MAX_THREADS 64 - -#define IOT_SCALING_OFF _gf_false -#define IOT_SCALING_ON _gf_true -#define iot_ordered_scaling_on(conf) ((conf)->o_scaling == IOT_SCALING_ON) -#define iot_unordered_scaling_on(conf) ((conf)->u_scaling == IOT_SCALING_ON) - -#define IOT_THREAD_STACK_SIZE ((size_t)(1024*1024)) - -/* This signifies the max number of outstanding request we're expecting - * at a point for every worker thread. - * For an idea of the memory foot-print, consider at most 16 Bytes per - * iot_request_t on a 64-bit system with another 16 bytes per chunk in the - * header. For 64 slots in the pool, we'll use up 2 KiB, with 64 threads this - * goes up to 128 KiB. - * - * Note that this size defines the size of the per-worker mem pool. The - * advantage is that, we're not only reducing the rate of small iot_request_t - * allocations from the heap but also reducing the contention on the libc heap - * by having a mem pool, though small, for each worker. - */ -#define IOT_REQUEST_MEMPOOL_SIZE 64 - -struct iot_worker { - struct list_head rqlist; /* List of requests assigned to me. */ - struct iot_conf *conf; -#ifndef HAVE_SPINLOCK - pthread_cond_t notifier; -#else - sem_t notifier; -#endif - int64_t q,dq; - gf_lock_t qlock; - int32_t queue_size; - pthread_t thread; - iot_state_t state; /* What state is the thread in. */ - int thread_idx; /* Thread's index into the worker - array. Since this will be thread - local data, for ensuring that - number of threads dont fall below - a minimum, we just dont allow - threads with specific indices to - exit. 
Helps us in eliminating one - place where otherwise a lock - would have been required to update - centralized state inside conf. - */ - struct mem_pool *req_pool; /* iot_request_t's come from here. */ -}; +#define MAX_IDLE_SKEW 4 /* In secs */ +#define skew_sec_idle_time(sec) ((sec) + (random() % MAX_IDLE_SKEW)) +#define IOT_DEFAULT_IDLE 120 /* In secs. */ + +#define IOT_MIN_THREADS 1 +#define IOT_DEFAULT_THREADS 16 +#define IOT_MAX_THREADS 64 + +#define IOT_THREAD_STACK_SIZE ((size_t)(256 * 1024)) + +typedef struct { + struct list_head clients; + struct list_head reqs; +} iot_client_ctx_t; struct iot_conf { - int32_t thread_count; - struct iot_worker **workers; - - xlator_t *this; - /* Config state for ordered threads. */ - pthread_mutex_t otlock; /* Used to sync any state that needs - to be changed by the ordered - threads. - */ - - int max_o_threads; /* Max. number of ordered threads */ - int min_o_threads; /* Min. number of ordered threads. - Ordered thread count never falls - below this threshold. - */ - - int o_idle_time; /* in Secs. The idle time after - which an ordered thread exits. - */ - gf_boolean_t o_scaling; /* Set to IOT_SCALING_OFF if user - does not want thread scaling on - ordered threads. If scaling is - off, io-threads maintains at - least min_o_threads number of - threads and never lets any thread - exit. - */ - struct iot_worker **oworkers; /* Ordered thread pool. */ - - - /* Config state for unordered threads */ - pthread_mutex_t utlock; /* Used for scaling un-ordered - threads. */ - struct iot_worker **uworkers; /* Un-ordered thread pool. */ - int max_u_threads; /* Number of unordered threads will - not be higher than this. */ - int min_u_threads; /* Number of unordered threads - should not fall below this value. - */ - int u_idle_time; /* If an unordered thread does not - get a request for this amount of - secs, it should try to die. 
- */ - gf_boolean_t u_scaling; /* Set to IOT_SCALING_OFF if user - does not want thread scaling on - unordered threads. If scaling is - off, io-threads maintains at - least min_u_threads number of - threads and never lets any thread - exit. - */ - - pthread_attr_t w_attr; /* Used to reduce the stack size of - the pthread worker down from the - default of 8MiB. - */ + pthread_mutex_t mutex; + pthread_cond_t cond; + + int32_t max_count; /* configured maximum */ + int32_t curr_count; /* actual number of threads running */ + int32_t sleep_count; + + int32_t idle_time; /* in seconds */ + + struct list_head clients[GF_FOP_PRI_MAX]; + /* + * It turns out that there are several ways a frame can get to us + * without having an associated client (server_first_lookup was the + * first one I hit). Instead of trying to update all such callers, + * we use this to queue them. + */ + iot_client_ctx_t no_client[GF_FOP_PRI_MAX]; + + int32_t ac_iot_limit[GF_FOP_PRI_MAX]; + int32_t ac_iot_count[GF_FOP_PRI_MAX]; + int queue_sizes[GF_FOP_PRI_MAX]; + int32_t queue_size; + gf_atomic_t stub_cnt; + pthread_attr_t w_attr; + gf_boolean_t least_priority; /*Enable/Disable least-priority */ + + xlator_t *this; + size_t stack_size; + gf_boolean_t down; /*PARENT_DOWN event is notified*/ + gf_boolean_t mutex_inited; + gf_boolean_t cond_inited; + + int32_t watchdog_secs; + gf_boolean_t watchdog_running; + pthread_t watchdog_thread; + gf_boolean_t queue_marked[GF_FOP_PRI_MAX]; + gf_boolean_t cleanup_disconnected_reqs; }; typedef struct iot_conf iot_conf_t; -typedef struct iot_worker iot_worker_t; -typedef struct iot_request iot_request_t; #endif /* __IOT_H */ diff --git a/xlators/performance/io-threads/src/iot-mem-types.h b/xlators/performance/io-threads/src/iot-mem-types.h new file mode 100644 index 00000000000..29565f34dd4 --- /dev/null +++ b/xlators/performance/io-threads/src/iot-mem-types.h @@ -0,0 +1,21 @@ +/* + Copyright (c) 2008-2012 Red Hat, Inc. 
<http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef __IOT_MEM_TYPES_H__ +#define __IOT_MEM_TYPES_H__ + +#include <glusterfs/mem-types.h> + +enum gf_iot_mem_types_ { + gf_iot_mt_iot_conf_t = gf_common_mt_end + 1, + gf_iot_mt_client_ctx_t, + gf_iot_mt_end +}; +#endif |
