diff options
Diffstat (limited to 'xlators/performance/io-threads/src/io-threads.c')
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 360 |
1 files changed, 323 insertions, 37 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index c2d660d4f..bbcf4ed26 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -1,20 +1,11 @@ /* - Copyright (c) 2006-2011 Gluster, Inc. <http://www.gluster.com> + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> This file is part of GlusterFS. - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ #ifndef _CONFIG_H @@ -39,16 +30,69 @@ int __iot_workers_scale (iot_conf_t *conf); struct volume_options options[]; call_stub_t * -__iot_dequeue (iot_conf_t *conf, int *pri) +__iot_dequeue (iot_conf_t *conf, int *pri, struct timespec *sleep) { call_stub_t *stub = NULL; int i = 0; + struct timeval curtv = {0,}, difftv = {0,}; *pri = -1; + sleep->tv_sec = 0; + sleep->tv_nsec = 0; for (i = 0; i < IOT_PRI_MAX; i++) { if (list_empty (&conf->reqs[i]) || (conf->ac_iot_count[i] >= conf->ac_iot_limit[i])) continue; + + if (i == IOT_PRI_LEAST) { + pthread_mutex_lock(&conf->throttle.lock); + if (!conf->throttle.sample_time.tv_sec) { + /* initialize */ + gettimeofday(&conf->throttle.sample_time, NULL); + } else { + /* + * Maintain a running count of least priority + * operations that are handled over a particular + * time interval. The count is provided via + * state dump and is used as a measure against + * least priority op throttling. + */ + gettimeofday(&curtv, NULL); + timersub(&curtv, &conf->throttle.sample_time, + &difftv); + if (difftv.tv_sec >= IOT_LEAST_THROTTLE_DELAY) { + conf->throttle.cached_rate = + conf->throttle.sample_cnt; + conf->throttle.sample_cnt = 0; + conf->throttle.sample_time = curtv; + } + + /* + * If we're over the configured rate limit, + * provide an absolute time to the caller that + * represents the soonest we're allowed to + * return another least priority request. + */ + if (conf->throttle.rate_limit && + conf->throttle.sample_cnt >= + conf->throttle.rate_limit) { + struct timeval delay; + delay.tv_sec = IOT_LEAST_THROTTLE_DELAY; + delay.tv_usec = 0; + + timeradd(&conf->throttle.sample_time, + &delay, &curtv); + TIMEVAL_TO_TIMESPEC(&curtv, sleep); + + pthread_mutex_unlock( + &conf->throttle.lock); + break; + } + } + conf->throttle.sample_cnt++; + pthread_mutex_unlock(&conf->throttle.lock); + } + stub = list_entry (conf->reqs[i].next, call_stub_t, list); conf->ac_iot_count[i]++; *pri = i; @@ -59,6 +103,7 @@ __iot_dequeue (iot_conf_t *conf, int *pri) return NULL; conf->queue_size--; + conf->queue_sizes[*pri]--; list_del_init (&stub->list); return stub; @@ -74,6 +119,7 @@ __iot_enqueue (iot_conf_t *conf, call_stub_t *stub, int pri) list_add_tail (&stub->list, &conf->reqs[pri]); conf->queue_size++; + conf->queue_sizes[pri]++; return; } @@ -90,6 +136,7 @@ iot_worker (void *data) int pri = -1; char timeout = 0; char bye = 0; + struct timespec sleep = {0,}; conf = data; this = conf->this; @@ -130,7 +177,13 @@ iot_worker (void *data) } } - stub = __iot_dequeue (conf, &pri); + stub = __iot_dequeue (conf, &pri, &sleep); + if (!stub && (sleep.tv_sec || sleep.tv_nsec)) { + pthread_cond_timedwait(&conf->cond, + &conf->mutex, &sleep); + pthread_mutex_unlock(&conf->mutex); + continue; + } } pthread_mutex_unlock (&conf->mutex); @@ -199,8 +252,9 @@ iot_schedule (call_frame_t *frame, xlator_t *this, call_stub_t *stub) { int ret = -1; iot_pri_t pri = IOT_PRI_MAX - 1; + iot_conf_t *conf = this->private; - if (frame->root->pid < 0) { + if ((frame->root->pid < GF_CLIENT_PID_MAX) && conf->least_priority) { pri = IOT_PRI_LEAST; goto out; } @@ -252,11 +306,11 @@ iot_schedule (call_frame_t *frame, xlator_t *this, call_stub_t *stub) case GF_FOP_FSYNCDIR: case GF_FOP_XATTROP: case GF_FOP_FXATTROP: - pri = IOT_PRI_LO; - break; - case GF_FOP_RCHECKSUM: - pri = IOT_PRI_LEAST; + case GF_FOP_FALLOCATE: + case GF_FOP_DISCARD: + case GF_FOP_ZEROFILL: + pri = IOT_PRI_LO; break; case GF_FOP_NULL: @@ -270,9 +324,9 @@ iot_schedule (call_frame_t *frame, xlator_t *this, call_stub_t *stub) break; } out: - ret = do_iot_schedule (this->private, stub, pri); gf_log (this->name, GF_LOG_DEBUG, "%s scheduled as %s fop", gf_fop_list[stub->fop], iot_get_pri_meaning (pri)); + ret = do_iot_schedule (this->private, stub, pri); return ret; } @@ -2355,24 +2409,172 @@ out: return 0; } +int +iot_fallocate_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt *preop, struct iatt *postop, dict_t *xdata) +{ + STACK_UNWIND_STRICT (fallocate, frame, op_ret, op_errno, preop, postop, + xdata); + return 0; +} + + +int +iot_fallocate_wrapper(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode, + off_t offset, size_t len, dict_t *xdata) +{ + STACK_WIND (frame, iot_fallocate_cbk, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->fallocate, fd, mode, offset, len, + xdata); + return 0; +} + + +int +iot_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode, + off_t offset, size_t len, dict_t *xdata) +{ + call_stub_t *stub = NULL; + int ret = -1; + + stub = fop_fallocate_stub(frame, iot_fallocate_wrapper, fd, mode, offset, + len, xdata); + if (!stub) { + gf_log (this->name, GF_LOG_ERROR, "cannot create fallocate stub" + "(out of memory)"); + ret = -ENOMEM; + goto out; + } + + ret = iot_schedule (frame, this, stub); + +out: + if (ret < 0) { + STACK_UNWIND_STRICT (fallocate, frame, -1, -ret, NULL, NULL, + NULL); + if (stub != NULL) { + call_stub_destroy (stub); + } + } + return 0; +} + +int +iot_discard_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt *preop, struct iatt *postop, dict_t *xdata) +{ + STACK_UNWIND_STRICT (discard, frame, op_ret, op_errno, preop, postop, + xdata); + return 0; +} + + +int +iot_discard_wrapper(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, + size_t len, dict_t *xdata) +{ + STACK_WIND (frame, iot_discard_cbk, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->discard, fd, offset, len, xdata); + return 0; +} + + +int +iot_discard(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, + size_t len, dict_t *xdata) +{ + call_stub_t *stub = NULL; + int ret = -1; + + stub = fop_discard_stub(frame, iot_discard_wrapper, fd, offset, len, + xdata); + if (!stub) { + gf_log (this->name, GF_LOG_ERROR, "cannot create discard stub" + "(out of memory)"); + ret = -ENOMEM; + goto out; + } + + ret = iot_schedule (frame, this, stub); + +out: + if (ret < 0) { + STACK_UNWIND_STRICT (discard, frame, -1, -ret, NULL, NULL, + NULL); + if (stub != NULL) { + call_stub_destroy (stub); + } + } + return 0; +} + +int +iot_zerofill_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + struct iatt *preop, struct iatt *postop, dict_t *xdata) +{ + STACK_UNWIND_STRICT (zerofill, frame, op_ret, op_errno, preop, postop, + xdata); + return 0; +} + +int +iot_zerofill_wrapper(call_frame_t *frame, xlator_t *this, fd_t *fd, + off_t offset, size_t len, dict_t *xdata) +{ + STACK_WIND (frame, iot_zerofill_cbk, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->zerofill, fd, offset, len, xdata); + return 0; +} + +int +iot_zerofill(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, + size_t len, dict_t *xdata) +{ + call_stub_t *stub = NULL; + int ret = -1; + + stub = fop_zerofill_stub(frame, iot_zerofill_wrapper, fd, + offset, len, xdata); + if (!stub) { + gf_log (this->name, GF_LOG_ERROR, "cannot create zerofill stub" + "(out of memory)"); + ret = -ENOMEM; + goto out; + } + + ret = iot_schedule (frame, this, stub); + +out: + if (ret < 0) { + STACK_UNWIND_STRICT (zerofill, frame, -1, -ret, NULL, NULL, + NULL); + if (stub != NULL) { + call_stub_destroy (stub); + } + } + return 0; +} + int __iot_workers_scale (iot_conf_t *conf) { - int log2 = 0; int scale = 0; int diff = 0; pthread_t thread; int ret = 0; + int i = 0; - log2 = log_base2 (conf->queue_size); - - scale = log2; + for (i = 0; i < IOT_PRI_MAX; i++) + scale += min (conf->queue_sizes[i], conf->ac_iot_limit[i]); - if (log2 < IOT_MIN_THREADS) + if (scale < IOT_MIN_THREADS) scale = IOT_MIN_THREADS; - if (log2 > conf->max_count) + if (scale > conf->max_count) scale = conf->max_count; if (conf->curr_count < scale) { @@ -2382,7 +2584,7 @@ __iot_workers_scale (iot_conf_t *conf) while (diff) { diff --; - ret = pthread_create (&thread, &conf->w_attr, iot_worker, conf); + ret = gf_thread_create (&thread, &conf->w_attr, iot_worker, conf); if (ret == 0) { conf->curr_count++; gf_log (conf->this->name, GF_LOG_DEBUG, @@ -2423,13 +2625,24 @@ set_stack_size (iot_conf_t *conf) { int err = 0; size_t stacksize = IOT_THREAD_STACK_SIZE; + xlator_t *this = NULL; + + this = THIS; pthread_attr_init (&conf->w_attr); err = pthread_attr_setstacksize (&conf->w_attr, stacksize); if (err == EINVAL) { - gf_log (conf->this->name, GF_LOG_WARNING, - "Using default thread stack size"); + err = pthread_attr_getstacksize (&conf->w_attr, &stacksize); + if (!err) + gf_log (this->name, GF_LOG_WARNING, + "Using default thread stack size %zd", + stacksize); + else + gf_log (this->name, GF_LOG_WARNING, + "Using default thread stack size"); } + + conf->stack_size = stacksize; } @@ -2452,6 +2665,44 @@ mem_acct_init (xlator_t *this) return ret; } +int +iot_priv_dump (xlator_t *this) +{ + iot_conf_t *conf = NULL; + char key_prefix[GF_DUMP_MAX_BUF_LEN]; + + if (!this) + return 0; + + conf = this->private; + if (!conf) + return 0; + + snprintf (key_prefix, GF_DUMP_MAX_BUF_LEN, "%s.%s", this->type, + this->name); + + gf_proc_dump_add_section(key_prefix); + + gf_proc_dump_write("maximum_threads_count", "%d", conf->max_count); + gf_proc_dump_write("current_threads_count", "%d", conf->curr_count); + gf_proc_dump_write("sleep_count", "%d", conf->sleep_count); + gf_proc_dump_write("idle_time", "%d", conf->idle_time); + gf_proc_dump_write("stack_size", "%zd", conf->stack_size); + gf_proc_dump_write("high_priority_threads", "%d", + conf->ac_iot_limit[IOT_PRI_HI]); + gf_proc_dump_write("normal_priority_threads", "%d", + conf->ac_iot_limit[IOT_PRI_NORMAL]); + gf_proc_dump_write("low_priority_threads", "%d", + conf->ac_iot_limit[IOT_PRI_LO]); + gf_proc_dump_write("least_priority_threads", "%d", + conf->ac_iot_limit[IOT_PRI_LEAST]); + + gf_proc_dump_write("cached least rate", "%u", + conf->throttle.cached_rate); + gf_proc_dump_write("least rate limit", "%u", conf->throttle.rate_limit); + + return 0; +} int reconfigure (xlator_t *this, dict_t *options) @@ -2478,6 +2729,11 @@ reconfigure (xlator_t *this, dict_t *options) GF_OPTION_RECONF ("least-prio-threads", conf->ac_iot_limit[IOT_PRI_LEAST], options, int32, out); + GF_OPTION_RECONF ("enable-least-priority", conf->least_priority, + options, bool, out); + + GF_OPTION_RECONF("least-rate-limit", conf->throttle.rate_limit, options, + int32, out); ret = 0; out: @@ -2488,9 +2744,9 @@ out: int init (xlator_t *this) { - iot_conf_t *conf = NULL; - int ret = -1; - int i = 0; + iot_conf_t *conf = NULL; + int ret = -1; + int i = 0; if (!this->children || this->children->next) { gf_log ("io-threads", GF_LOG_ERROR, @@ -2540,6 +2796,16 @@ init (xlator_t *this) conf->ac_iot_limit[IOT_PRI_LEAST], int32, out); GF_OPTION_INIT ("idle-time", conf->idle_time, int32, out); + GF_OPTION_INIT ("enable-least-priority", conf->least_priority, + bool, out); + + GF_OPTION_INIT("least-rate-limit", conf->throttle.rate_limit, int32, + out); + if ((ret = pthread_mutex_init(&conf->throttle.lock, NULL)) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "pthread_mutex_init failed (%d)", ret); + goto out; + } conf->this = this; @@ -2552,13 +2818,15 @@ init (xlator_t *this) if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, "cannot initialize worker threads, exiting init"); - GF_FREE (conf); goto out; } this->private = conf; ret = 0; out: + if (ret) + GF_FREE (conf); + return ret; } @@ -2574,6 +2842,9 @@ fini (xlator_t *this) return; } +struct xlator_dumpops dumpops = { + .priv = iot_priv_dump, +}; struct xlator_fops fops = { .open = iot_open, @@ -2617,10 +2888,12 @@ struct xlator_fops fops = { .xattrop = iot_xattrop, .fxattrop = iot_fxattrop, .rchecksum = iot_rchecksum, + .fallocate = iot_fallocate, + .discard = iot_discard, + .zerofill = iot_zerofill, }; -struct xlator_cbks cbks = { -}; +struct xlator_cbks cbks; struct volume_options options[] = { { .key = {"thread-count"}, @@ -2667,12 +2940,25 @@ struct volume_options options[] = { .description = "Max number of threads in IO threads translator which " "perform least priority IO operations at a given time" }, + { .key = {"enable-least-priority"}, + .type = GF_OPTION_TYPE_BOOL, + .default_value = "on", + .description = "Enable/Disable least priority" + }, {.key = {"idle-time"}, .type = GF_OPTION_TYPE_INT, .min = 1, .max = 0x7fffffff, .default_value = "120", }, + {.key = {"least-rate-limit"}, + .type = GF_OPTION_TYPE_INT, + .min = 0, + .max = INT_MAX, + .default_value = "0", + .description = "Max number of least priority operations to handle " + "per-second" + }, { .key = {NULL}, }, }; |
