From dddcf52020004d98f688ebef968de51d76cbf9a6 Mon Sep 17 00:00:00 2001 From: Xavi Hernandez Date: Thu, 24 Jan 2019 18:44:06 +0100 Subject: core: implement a global thread pool This patch implements a thread pool that is wait-free for adding jobs to the queue and uses a very small locked region to get jobs. This makes it possible to decrease contention drastically. It's based on wfcqueue structure provided by urcu library. It automatically enables more threads when load demands it, and stops them when not needed. There's a maximum number of threads that can be used. This value can be configured. Depending on the workload, the maximum number of threads plays an important role. So it needs to be configured for optimal performance. Currently the thread pool doesn't self adjust the maximum for the workload, so this configuration needs to be changed manually. For this reason, the global thread pool has been made optional, so that volumes can still use the thread pool provided by io-threads. To enable it for bricks, the following option needs to be set: config.global-threading = on This option has no effect if bricks are already running. A restart is required to activate it. It's recommended to also enable the following option when running bricks with the global thread pool: performance.iot-pass-through = on To enable it for a FUSE mount point, the option '--global-threading' must be added to the mount command. To change it, an umount and remount is needed. It's recommended to disable the following option when using global threading on a mount point: performance.client-io-threads = off To enable it for services managed by glusterd, glusterd needs to be started with option '--global-threading'. In this case all daemons, like self-heal, will be using the global thread pool. Currently it can only be enabled for bricks, FUSE mounts and glusterd services. The maximum number of threads for clients and bricks can be configured using the following options: config.client-threads config.brick-threads These options can be applied online and its effect is immediate most of the times. If one of them is set to 0, the maximum number of threads will be calcutated as #cores * 2. Some distributions use a very old userspace-rcu library (version 0.7) for this reason, some header files from version 0.10 have been copied into contrib/userspace-rcu and are used if the detected version is 0.7 or older. An additional change has been made to io-threads to prevent that threads are started when iot-pass-through is set. Change-Id: I09d19e246b9e6d53c6247b29dfca6af6ee00a24b updates: #532 Signed-off-by: Xavi Hernandez --- xlators/debug/io-stats/src/io-stats.c | 41 ++++++++++++- xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c | 4 ++ xlators/mgmt/glusterd/src/glusterd-utils.c | 11 ++++ xlators/mgmt/glusterd/src/glusterd-volgen.c | 31 ++++++++++ xlators/mgmt/glusterd/src/glusterd-volgen.h | 3 + xlators/mgmt/glusterd/src/glusterd-volume-set.c | 15 +++++ xlators/mount/fuse/src/fuse-bridge.c | 79 ++++++++++++++++++------- xlators/mount/fuse/utils/mount.glusterfs.in | 7 +++ xlators/performance/io-threads/src/io-threads.c | 12 ++-- 9 files changed, 176 insertions(+), 27 deletions(-) (limited to 'xlators') diff --git a/xlators/debug/io-stats/src/io-stats.c b/xlators/debug/io-stats/src/io-stats.c index f12191fb8df..101e403d39a 100644 --- a/xlators/debug/io-stats/src/io-stats.c +++ b/xlators/debug/io-stats/src/io-stats.c @@ -40,6 +40,7 @@ #include #include #include +#include #define MAX_LIST_MEMBERS 100 #define DEFAULT_PWD_BUF_SZ 16384 @@ -3737,6 +3738,7 @@ reconfigure(xlator_t *this, dict_t *options) uint32_t log_buf_size = 0; uint32_t log_flush_timeout = 0; int32_t old_dump_interval; + int32_t threads; if (!this || !this->private) goto out; @@ -3809,6 +3811,9 @@ reconfigure(xlator_t *this, dict_t *options) out); gf_log_set_log_flush_timeout(log_flush_timeout); + GF_OPTION_RECONF("threads", threads, options, int32, out); + gf_async_adjust_threads(threads); + ret = 0; out: gf_log(this ? this->name : "io-stats", GF_LOG_DEBUG, @@ -3888,6 +3893,7 @@ init(xlator_t *this) int ret = -1; uint32_t log_buf_size = 0; uint32_t log_flush_timeout = 0; + int32_t threads; if (!this) return -1; @@ -3951,6 +3957,7 @@ init(xlator_t *this) gf_log(this->name, GF_LOG_ERROR, "Out of memory."); goto out; } + ret = -1; GF_OPTION_INIT("ios-dnscache-ttl-sec", conf->ios_dnscache_ttl_sec, int32, out); @@ -3987,6 +3994,9 @@ init(xlator_t *this) GF_OPTION_INIT("log-flush-timeout", log_flush_timeout, time, out); gf_log_set_log_flush_timeout(log_flush_timeout); + GF_OPTION_INIT("threads", threads, int32, out); + gf_async_adjust_threads(threads); + this->private = conf; if (conf->ios_dump_interval > 0) { conf->dump_thread_running = _gf_true; @@ -4430,8 +4440,37 @@ struct volume_options options[] = { .type = GF_OPTION_TYPE_STR, .default_value = "/no/such/path", .description = "Unique ID for our files."}, + {.key = {"global-threading"}, + .type = GF_OPTION_TYPE_BOOL, + .default_value = "off", + .op_version = {GD_OP_VERSION_6_0}, + .flags = OPT_FLAG_SETTABLE, + .tags = {"io-stats", "threading"}, + .description = "This option enables the global threading support for " + "bricks. If enabled, it's recommended to also enable " + "'performance.iot-pass-through'"}, + {.key = {"threads"}, .type = GF_OPTION_TYPE_INT}, + {.key = {"brick-threads"}, + .type = GF_OPTION_TYPE_INT, + .default_value = "16", + .min = 0, + .max = GF_ASYNC_MAX_THREADS, + .op_version = {GD_OP_VERSION_6_0}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, + .tags = {"io-stats", "threading"}, + .description = "When global threading is used, this value determines the " + "maximum amount of threads that can be created on bricks"}, + {.key = {"client-threads"}, + .type = GF_OPTION_TYPE_INT, + .default_value = "16", + .min = 0, + .max = GF_ASYNC_MAX_THREADS, + .op_version = {GD_OP_VERSION_6_0}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT, + .tags = {"io-stats", "threading"}, + .description = "When global threading is used, this value determines the " + "maximum amount of threads that can be created on clients"}, {.key = {NULL}}, - }; xlator_api_t xlator_api = { diff --git a/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c b/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c index 4cd4cea15e4..6325f60f94a 100644 --- a/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c +++ b/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c @@ -213,6 +213,10 @@ glusterd_svc_start(glusterd_svc_t *svc, int flags, dict_t *cmdline) runner_add_arg(&runner, daemon_log_level); } + if (this->ctx->cmd_args.global_threading) { + runner_add_arg(&runner, "--global-threading"); + } + if (cmdline) dict_foreach(cmdline, svc_add_args, (void *)&runner); diff --git a/xlators/mgmt/glusterd/src/glusterd-utils.c b/xlators/mgmt/glusterd/src/glusterd-utils.c index 1aa6947fbba..85a7884b51a 100644 --- a/xlators/mgmt/glusterd/src/glusterd-utils.c +++ b/xlators/mgmt/glusterd/src/glusterd-utils.c @@ -2045,6 +2045,8 @@ glusterd_volume_start_glusterfs(glusterd_volinfo_t *volinfo, int32_t len = 0; glusterd_brick_proc_t *brick_proc = NULL; char *inet_family = NULL; + char *global_threading = NULL; + bool threading = false; GF_ASSERT(volinfo); GF_ASSERT(brickinfo); @@ -2203,6 +2205,15 @@ retry: volinfo->volname, rdma_port); } + if (dict_get_strn(volinfo->dict, VKEY_CONFIG_GLOBAL_THREADING, + SLEN(VKEY_CONFIG_GLOBAL_THREADING), + &global_threading) == 0) { + if ((gf_string2boolean(global_threading, &threading) == 0) && + threading) { + runner_add_arg(&runner, "--global-threading"); + } + } + runner_add_arg(&runner, "--xlator-option"); runner_argprintf(&runner, "%s-server.listen-port=%d", volinfo->volname, port); diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.c b/xlators/mgmt/glusterd/src/glusterd-volgen.c index b7c7bd9b638..448dd8669a1 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volgen.c +++ b/xlators/mgmt/glusterd/src/glusterd-volgen.c @@ -1214,6 +1214,26 @@ loglevel_option_handler(volgen_graph_t *graph, struct volopt_map_entry *vme, return basic_option_handler(graph, &vme2, NULL); } +static int +threads_option_handler(volgen_graph_t *graph, struct volopt_map_entry *vme, + void *param) +{ + char *role = param; + struct volopt_map_entry vme2 = { + 0, + }; + + if ((strcmp(vme->option, "!client-threads") != 0 && + strcmp(vme->option, "!brick-threads") != 0) || + !strstr(vme->key, role)) + return 0; + + memcpy(&vme2, vme, sizeof(vme2)); + vme2.option = "threads"; + + return basic_option_handler(graph, &vme2, NULL); +} + static int server_check_changelog_off(volgen_graph_t *graph, struct volopt_map_entry *vme, glusterd_volinfo_t *volinfo) @@ -1506,6 +1526,9 @@ server_spec_option_handler(volgen_graph_t *graph, struct volopt_map_entry *vme, if (!ret) ret = log_localtime_logging_option_handler(graph, vme, "brick"); + if (!ret) + ret = threads_option_handler(graph, vme, "brick"); + return ret; } @@ -4085,6 +4108,14 @@ graph_set_generic_options(xlator_t *this, volgen_graph_t *graph, gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_GRAPH_SET_OPT_FAIL, "Failed to change " "log-localtime-logging option"); + + ret = volgen_graph_set_options_generic(graph, set_dict, "client", + &threads_option_handler); + + if (ret) + gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_GRAPH_SET_OPT_FAIL, + "changing %s threads failed", identifier); + return 0; } diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.h b/xlators/mgmt/glusterd/src/glusterd-volgen.h index f9fc068931b..37eecc04bef 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volgen.h +++ b/xlators/mgmt/glusterd/src/glusterd-volgen.h @@ -38,6 +38,9 @@ #define VKEY_RDA_CACHE_LIMIT "performance.rda-cache-limit" #define VKEY_RDA_REQUEST_SIZE "performance.rda-request-size" #define VKEY_CONFIG_GFPROXY "config.gfproxyd" +#define VKEY_CONFIG_GLOBAL_THREADING "config.global-threading" +#define VKEY_CONFIG_CLIENT_THREADS "config.client-threads" +#define VKEY_CONFIG_BRICK_THREADS "config.brick-threads" #define AUTH_ALLOW_MAP_KEY "auth.allow" #define AUTH_REJECT_MAP_KEY "auth.reject" diff --git a/xlators/mgmt/glusterd/src/glusterd-volume-set.c b/xlators/mgmt/glusterd/src/glusterd-volume-set.c index b32b6ce0ec4..c8591cf0487 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volume-set.c +++ b/xlators/mgmt/glusterd/src/glusterd-volume-set.c @@ -2961,4 +2961,19 @@ struct volopt_map_entry glusterd_volopt_map[] = { .validate_fn = validate_boolean, .description = "option to enforce mandatory lock on a file", .flags = VOLOPT_FLAG_XLATOR_OPT}, + {.key = VKEY_CONFIG_GLOBAL_THREADING, + .voltype = "debug/io-stats", + .option = "global-threading", + .value = "off", + .op_version = GD_OP_VERSION_6_0}, + {.key = VKEY_CONFIG_CLIENT_THREADS, + .voltype = "debug/io-stats", + .option = "!client-threads", + .value = "16", + .op_version = GD_OP_VERSION_6_0}, + {.key = VKEY_CONFIG_BRICK_THREADS, + .voltype = "debug/io-stats", + .option = "!brick-threads", + .value = "16", + .op_version = GD_OP_VERSION_6_0}, {.key = NULL}}; diff --git a/xlators/mount/fuse/src/fuse-bridge.c b/xlators/mount/fuse/src/fuse-bridge.c index 3479d40ceeb..bd8bc114a32 100644 --- a/xlators/mount/fuse/src/fuse-bridge.c +++ b/xlators/mount/fuse/src/fuse-bridge.c @@ -16,10 +16,17 @@ #include #include #include +#include #ifdef __NetBSD__ #undef open /* in perfuse.h, pulled from mount-gluster-compat.h */ #endif +typedef struct _fuse_async { + struct iobuf *iobuf; + fuse_in_header_t *finh; + void *msg; + gf_async_t async; +} fuse_async_t; static int gf_fuse_xattr_enotsup_log; @@ -5810,6 +5817,28 @@ fuse_get_mount_status(xlator_t *this) return kid_status; } +static void +fuse_dispatch(xlator_t *xl, gf_async_t *async) +{ + fuse_async_t *fasync; + fuse_private_t *priv; + fuse_in_header_t *finh; + struct iobuf *iobuf; + + priv = xl->private; + fasync = caa_container_of(async, fuse_async_t, async); + finh = fasync->finh; + iobuf = fasync->iobuf; + + priv->fuse_ops[finh->opcode](xl, finh, fasync->msg, iobuf); + + iobuf_unref(iobuf); +} + +/* We need 512 extra buffer size for BATCH_FORGET fop. By tests, it is + * found to be reduces 'REALLOC()' in the loop */ +#define FUSE_EXTRA_ALLOC 512 + static void * fuse_thread_proc(void *data) { @@ -5821,24 +5850,20 @@ fuse_thread_proc(void *data) fuse_in_header_t *finh = NULL; struct iovec iov_in[2]; void *msg = NULL; - /* we need 512 extra buffer size for BATCH_FORGET fop. By tests, it is - found to be reduces 'REALLOC()' in the loop */ - const size_t msg0_size = sizeof(*finh) + 512; - fuse_handler_t **fuse_ops = NULL; + size_t msg0_size = sizeof(*finh) + sizeof(struct fuse_write_in); + fuse_async_t *fasync; struct pollfd pfd[2] = {{ 0, }}; + uint32_t psize; this = data; priv = this->private; - fuse_ops = priv->fuse_ops; THIS = this; - iov_in[0].iov_len = sizeof(*finh) + sizeof(struct fuse_write_in); - iov_in[1].iov_len = ((struct iobuf_pool *)this->ctx->iobuf_pool) - ->default_page_size; - priv->msg0_len_p = &iov_in[0].iov_len; + psize = ((struct iobuf_pool *)this->ctx->iobuf_pool)->default_page_size; + priv->msg0_len_p = &msg0_size; for (;;) { /* THIS has to be reset here */ @@ -5895,14 +5920,15 @@ fuse_thread_proc(void *data) changing this one too */ iobuf = iobuf_get(this->ctx->iobuf_pool); - /* Add extra 128 byte to the first iov so that it can + /* Add extra 512 byte to the first iov so that it can * accommodate "ordinary" non-write requests. It's not * guaranteed to be big enough, as SETXATTR and namespace * operations with very long names may grow behind it, * but it's good enough in most cases (and we can handle - * rest via realloc). - */ - iov_in[0].iov_base = GF_CALLOC(1, msg0_size, gf_fuse_mt_iov_base); + * rest via realloc). */ + iov_in[0].iov_base = GF_MALLOC( + sizeof(fuse_async_t) + msg0_size + FUSE_EXTRA_ALLOC, + gf_fuse_mt_iov_base); if (!iobuf || !iov_in[0].iov_base) { gf_log(this->name, GF_LOG_ERROR, "Out of memory"); @@ -5915,6 +5941,9 @@ fuse_thread_proc(void *data) iov_in[1].iov_base = iobuf->ptr; + iov_in[0].iov_len = msg0_size; + iov_in[1].iov_len = psize; + res = sys_readv(priv->fd, iov_in, 2); if (res == -1) { @@ -5941,7 +5970,7 @@ fuse_thread_proc(void *data) goto cont_err; } - if (res < sizeof(finh)) { + if (res < sizeof(*finh)) { gf_log("glusterfs-fuse", GF_LOG_WARNING, "short read on /dev/fuse"); fuse_log_eh(this, "glusterfs-fuse: short read on " @@ -5983,8 +6012,9 @@ fuse_thread_proc(void *data) if (finh->opcode == FUSE_WRITE) msg = iov_in[1].iov_base; else { - if (res > msg0_size) { - void *b = GF_REALLOC(iov_in[0].iov_base, res); + if (res > msg0_size + FUSE_EXTRA_ALLOC) { + void *b = GF_REALLOC(iov_in[0].iov_base, + sizeof(fuse_async_t) + res); if (b) { iov_in[0].iov_base = b; finh = (fuse_in_header_t *)iov_in[0].iov_base; @@ -5996,22 +6026,29 @@ fuse_thread_proc(void *data) } } - if (res > iov_in[0].iov_len) + if (res > iov_in[0].iov_len) { memcpy(iov_in[0].iov_base + iov_in[0].iov_len, iov_in[1].iov_base, res - iov_in[0].iov_len); + iov_in[0].iov_len = res; + } msg = finh + 1; } if (priv->uid_map_root && finh->uid == priv->uid_map_root) finh->uid = 0; - if (finh->opcode >= FUSE_OP_HIGH) + if (finh->opcode >= FUSE_OP_HIGH) { /* turn down MacFUSE specific messages */ fuse_enosys(this, finh, msg, NULL); - else - fuse_ops[finh->opcode](this, finh, msg, iobuf); + iobuf_unref(iobuf); + } else { + fasync = iov_in[0].iov_base + iov_in[0].iov_len; + fasync->finh = finh; + fasync->msg = msg; + fasync->iobuf = iobuf; + gf_async(&fasync->async, this, fuse_dispatch); + } - iobuf_unref(iobuf); continue; cont_err: diff --git a/xlators/mount/fuse/utils/mount.glusterfs.in b/xlators/mount/fuse/utils/mount.glusterfs.in index 3f5d76d2e93..243c9c71af4 100755 --- a/xlators/mount/fuse/utils/mount.glusterfs.in +++ b/xlators/mount/fuse/utils/mount.glusterfs.in @@ -189,6 +189,10 @@ start_glusterfs () cmd_line=$(echo "$cmd_line --thin-client"); fi + if [ -n "$global_threading" ]; then + cmd_line=$(echo "$cmd_line --global-threading"); + fi + #options with values start here if [ -n "$halo_max_latency" ]; then cmd_line=$(echo "$cmd_line --xlator-option \ @@ -629,6 +633,9 @@ without_options() # "mount -t glusterfs" sends this, but it's useless. "rw") ;; + "global-threading") + global_threading=1 + ;; # TODO: not sure how to handle this yet "async"|"sync"|"dirsync"|\ "mand"|"nomand"|\ diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index bf75015eda8..9a4c728ae02 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -1255,12 +1255,14 @@ init(xlator_t *this) INIT_LIST_HEAD(&conf->no_client[i].reqs); } - ret = iot_workers_scale(conf); + if (!this->pass_through) { + ret = iot_workers_scale(conf); - if (ret == -1) { - gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED, - "cannot initialize worker threads, exiting init"); - goto out; + if (ret == -1) { + gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED, + "cannot initialize worker threads, exiting init"); + goto out; + } } this->private = conf; -- cgit