summaryrefslogtreecommitdiffstats
path: root/xlators
diff options
context:
space:
mode:
authorXavi Hernandez <xhernandez@redhat.com>2019-01-24 18:44:06 +0100
committerAmar Tumballi <amarts@redhat.com>2019-02-18 02:58:24 +0000
commitdddcf52020004d98f688ebef968de51d76cbf9a6 (patch)
tree01ee4c39a7859a76562e15aa7045c5bd86417a60 /xlators
parentec273a46820ba17f46488c082c65cd1aa6739be3 (diff)
core: implement a global thread pool
This patch implements a thread pool that is wait-free for adding jobs to the queue and uses a very small locked region to get jobs. This makes it possible to decrease contention drastically. It's based on wfcqueue structure provided by urcu library. It automatically enables more threads when load demands it, and stops them when not needed. There's a maximum number of threads that can be used. This value can be configured. Depending on the workload, the maximum number of threads plays an important role. So it needs to be configured for optimal performance. Currently the thread pool doesn't self adjust the maximum for the workload, so this configuration needs to be changed manually. For this reason, the global thread pool has been made optional, so that volumes can still use the thread pool provided by io-threads. To enable it for bricks, the following option needs to be set: config.global-threading = on This option has no effect if bricks are already running. A restart is required to activate it. It's recommended to also enable the following option when running bricks with the global thread pool: performance.iot-pass-through = on To enable it for a FUSE mount point, the option '--global-threading' must be added to the mount command. To change it, an umount and remount is needed. It's recommended to disable the following option when using global threading on a mount point: performance.client-io-threads = off To enable it for services managed by glusterd, glusterd needs to be started with option '--global-threading'. In this case all daemons, like self-heal, will be using the global thread pool. Currently it can only be enabled for bricks, FUSE mounts and glusterd services. The maximum number of threads for clients and bricks can be configured using the following options: config.client-threads config.brick-threads These options can be applied online and its effect is immediate most of the times. If one of them is set to 0, the maximum number of threads will be calcutated as #cores * 2. Some distributions use a very old userspace-rcu library (version 0.7) for this reason, some header files from version 0.10 have been copied into contrib/userspace-rcu and are used if the detected version is 0.7 or older. An additional change has been made to io-threads to prevent that threads are started when iot-pass-through is set. Change-Id: I09d19e246b9e6d53c6247b29dfca6af6ee00a24b updates: #532 Signed-off-by: Xavi Hernandez <xhernandez@redhat.com>
Diffstat (limited to 'xlators')
-rw-r--r--xlators/debug/io-stats/src/io-stats.c41
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c4
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-utils.c11
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-volgen.c31
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-volgen.h3
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-volume-set.c15
-rw-r--r--xlators/mount/fuse/src/fuse-bridge.c79
-rwxr-xr-xxlators/mount/fuse/utils/mount.glusterfs.in7
-rw-r--r--xlators/performance/io-threads/src/io-threads.c12
9 files changed, 176 insertions, 27 deletions
diff --git a/xlators/debug/io-stats/src/io-stats.c b/xlators/debug/io-stats/src/io-stats.c
index f12191f..101e403 100644
--- a/xlators/debug/io-stats/src/io-stats.c
+++ b/xlators/debug/io-stats/src/io-stats.c
@@ -40,6 +40,7 @@
#include <pwd.h>
#include <grp.h>
#include <glusterfs/upcall-utils.h>
+#include <glusterfs/async.h>
#define MAX_LIST_MEMBERS 100
#define DEFAULT_PWD_BUF_SZ 16384
@@ -3737,6 +3738,7 @@ reconfigure(xlator_t *this, dict_t *options)
uint32_t log_buf_size = 0;
uint32_t log_flush_timeout = 0;
int32_t old_dump_interval;
+ int32_t threads;
if (!this || !this->private)
goto out;
@@ -3809,6 +3811,9 @@ reconfigure(xlator_t *this, dict_t *options)
out);
gf_log_set_log_flush_timeout(log_flush_timeout);
+ GF_OPTION_RECONF("threads", threads, options, int32, out);
+ gf_async_adjust_threads(threads);
+
ret = 0;
out:
gf_log(this ? this->name : "io-stats", GF_LOG_DEBUG,
@@ -3888,6 +3893,7 @@ init(xlator_t *this)
int ret = -1;
uint32_t log_buf_size = 0;
uint32_t log_flush_timeout = 0;
+ int32_t threads;
if (!this)
return -1;
@@ -3951,6 +3957,7 @@ init(xlator_t *this)
gf_log(this->name, GF_LOG_ERROR, "Out of memory.");
goto out;
}
+ ret = -1;
GF_OPTION_INIT("ios-dnscache-ttl-sec", conf->ios_dnscache_ttl_sec, int32,
out);
@@ -3987,6 +3994,9 @@ init(xlator_t *this)
GF_OPTION_INIT("log-flush-timeout", log_flush_timeout, time, out);
gf_log_set_log_flush_timeout(log_flush_timeout);
+ GF_OPTION_INIT("threads", threads, int32, out);
+ gf_async_adjust_threads(threads);
+
this->private = conf;
if (conf->ios_dump_interval > 0) {
conf->dump_thread_running = _gf_true;
@@ -4430,8 +4440,37 @@ struct volume_options options[] = {
.type = GF_OPTION_TYPE_STR,
.default_value = "/no/such/path",
.description = "Unique ID for our files."},
+ {.key = {"global-threading"},
+ .type = GF_OPTION_TYPE_BOOL,
+ .default_value = "off",
+ .op_version = {GD_OP_VERSION_6_0},
+ .flags = OPT_FLAG_SETTABLE,
+ .tags = {"io-stats", "threading"},
+ .description = "This option enables the global threading support for "
+ "bricks. If enabled, it's recommended to also enable "
+ "'performance.iot-pass-through'"},
+ {.key = {"threads"}, .type = GF_OPTION_TYPE_INT},
+ {.key = {"brick-threads"},
+ .type = GF_OPTION_TYPE_INT,
+ .default_value = "16",
+ .min = 0,
+ .max = GF_ASYNC_MAX_THREADS,
+ .op_version = {GD_OP_VERSION_6_0},
+ .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
+ .tags = {"io-stats", "threading"},
+ .description = "When global threading is used, this value determines the "
+ "maximum amount of threads that can be created on bricks"},
+ {.key = {"client-threads"},
+ .type = GF_OPTION_TYPE_INT,
+ .default_value = "16",
+ .min = 0,
+ .max = GF_ASYNC_MAX_THREADS,
+ .op_version = {GD_OP_VERSION_6_0},
+ .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT,
+ .tags = {"io-stats", "threading"},
+ .description = "When global threading is used, this value determines the "
+ "maximum amount of threads that can be created on clients"},
{.key = {NULL}},
-
};
xlator_api_t xlator_api = {
diff --git a/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c b/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c
index 4cd4cea..6325f60 100644
--- a/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c
+++ b/xlators/mgmt/glusterd/src/glusterd-svc-mgmt.c
@@ -213,6 +213,10 @@ glusterd_svc_start(glusterd_svc_t *svc, int flags, dict_t *cmdline)
runner_add_arg(&runner, daemon_log_level);
}
+ if (this->ctx->cmd_args.global_threading) {
+ runner_add_arg(&runner, "--global-threading");
+ }
+
if (cmdline)
dict_foreach(cmdline, svc_add_args, (void *)&runner);
diff --git a/xlators/mgmt/glusterd/src/glusterd-utils.c b/xlators/mgmt/glusterd/src/glusterd-utils.c
index 1aa6947..85a7884 100644
--- a/xlators/mgmt/glusterd/src/glusterd-utils.c
+++ b/xlators/mgmt/glusterd/src/glusterd-utils.c
@@ -2045,6 +2045,8 @@ glusterd_volume_start_glusterfs(glusterd_volinfo_t *volinfo,
int32_t len = 0;
glusterd_brick_proc_t *brick_proc = NULL;
char *inet_family = NULL;
+ char *global_threading = NULL;
+ bool threading = false;
GF_ASSERT(volinfo);
GF_ASSERT(brickinfo);
@@ -2203,6 +2205,15 @@ retry:
volinfo->volname, rdma_port);
}
+ if (dict_get_strn(volinfo->dict, VKEY_CONFIG_GLOBAL_THREADING,
+ SLEN(VKEY_CONFIG_GLOBAL_THREADING),
+ &global_threading) == 0) {
+ if ((gf_string2boolean(global_threading, &threading) == 0) &&
+ threading) {
+ runner_add_arg(&runner, "--global-threading");
+ }
+ }
+
runner_add_arg(&runner, "--xlator-option");
runner_argprintf(&runner, "%s-server.listen-port=%d", volinfo->volname,
port);
diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.c b/xlators/mgmt/glusterd/src/glusterd-volgen.c
index b7c7bd9..448dd86 100644
--- a/xlators/mgmt/glusterd/src/glusterd-volgen.c
+++ b/xlators/mgmt/glusterd/src/glusterd-volgen.c
@@ -1215,6 +1215,26 @@ loglevel_option_handler(volgen_graph_t *graph, struct volopt_map_entry *vme,
}
static int
+threads_option_handler(volgen_graph_t *graph, struct volopt_map_entry *vme,
+ void *param)
+{
+ char *role = param;
+ struct volopt_map_entry vme2 = {
+ 0,
+ };
+
+ if ((strcmp(vme->option, "!client-threads") != 0 &&
+ strcmp(vme->option, "!brick-threads") != 0) ||
+ !strstr(vme->key, role))
+ return 0;
+
+ memcpy(&vme2, vme, sizeof(vme2));
+ vme2.option = "threads";
+
+ return basic_option_handler(graph, &vme2, NULL);
+}
+
+static int
server_check_changelog_off(volgen_graph_t *graph, struct volopt_map_entry *vme,
glusterd_volinfo_t *volinfo)
{
@@ -1506,6 +1526,9 @@ server_spec_option_handler(volgen_graph_t *graph, struct volopt_map_entry *vme,
if (!ret)
ret = log_localtime_logging_option_handler(graph, vme, "brick");
+ if (!ret)
+ ret = threads_option_handler(graph, vme, "brick");
+
return ret;
}
@@ -4085,6 +4108,14 @@ graph_set_generic_options(xlator_t *this, volgen_graph_t *graph,
gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_GRAPH_SET_OPT_FAIL,
"Failed to change "
"log-localtime-logging option");
+
+ ret = volgen_graph_set_options_generic(graph, set_dict, "client",
+ &threads_option_handler);
+
+ if (ret)
+ gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_GRAPH_SET_OPT_FAIL,
+ "changing %s threads failed", identifier);
+
return 0;
}
diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.h b/xlators/mgmt/glusterd/src/glusterd-volgen.h
index f9fc068..37eecc0 100644
--- a/xlators/mgmt/glusterd/src/glusterd-volgen.h
+++ b/xlators/mgmt/glusterd/src/glusterd-volgen.h
@@ -38,6 +38,9 @@
#define VKEY_RDA_CACHE_LIMIT "performance.rda-cache-limit"
#define VKEY_RDA_REQUEST_SIZE "performance.rda-request-size"
#define VKEY_CONFIG_GFPROXY "config.gfproxyd"
+#define VKEY_CONFIG_GLOBAL_THREADING "config.global-threading"
+#define VKEY_CONFIG_CLIENT_THREADS "config.client-threads"
+#define VKEY_CONFIG_BRICK_THREADS "config.brick-threads"
#define AUTH_ALLOW_MAP_KEY "auth.allow"
#define AUTH_REJECT_MAP_KEY "auth.reject"
diff --git a/xlators/mgmt/glusterd/src/glusterd-volume-set.c b/xlators/mgmt/glusterd/src/glusterd-volume-set.c
index b32b6ce..c8591cf 100644
--- a/xlators/mgmt/glusterd/src/glusterd-volume-set.c
+++ b/xlators/mgmt/glusterd/src/glusterd-volume-set.c
@@ -2961,4 +2961,19 @@ struct volopt_map_entry glusterd_volopt_map[] = {
.validate_fn = validate_boolean,
.description = "option to enforce mandatory lock on a file",
.flags = VOLOPT_FLAG_XLATOR_OPT},
+ {.key = VKEY_CONFIG_GLOBAL_THREADING,
+ .voltype = "debug/io-stats",
+ .option = "global-threading",
+ .value = "off",
+ .op_version = GD_OP_VERSION_6_0},
+ {.key = VKEY_CONFIG_CLIENT_THREADS,
+ .voltype = "debug/io-stats",
+ .option = "!client-threads",
+ .value = "16",
+ .op_version = GD_OP_VERSION_6_0},
+ {.key = VKEY_CONFIG_BRICK_THREADS,
+ .voltype = "debug/io-stats",
+ .option = "!brick-threads",
+ .value = "16",
+ .op_version = GD_OP_VERSION_6_0},
{.key = NULL}};
diff --git a/xlators/mount/fuse/src/fuse-bridge.c b/xlators/mount/fuse/src/fuse-bridge.c
index 3479d40..bd8bc11 100644
--- a/xlators/mount/fuse/src/fuse-bridge.c
+++ b/xlators/mount/fuse/src/fuse-bridge.c
@@ -16,10 +16,17 @@
#include <glusterfs/glusterfs-acl.h>
#include <glusterfs/syscall.h>
#include <glusterfs/timespec.h>
+#include <glusterfs/async.h>
#ifdef __NetBSD__
#undef open /* in perfuse.h, pulled from mount-gluster-compat.h */
#endif
+typedef struct _fuse_async {
+ struct iobuf *iobuf;
+ fuse_in_header_t *finh;
+ void *msg;
+ gf_async_t async;
+} fuse_async_t;
static int gf_fuse_xattr_enotsup_log;
@@ -5810,6 +5817,28 @@ fuse_get_mount_status(xlator_t *this)
return kid_status;
}
+static void
+fuse_dispatch(xlator_t *xl, gf_async_t *async)
+{
+ fuse_async_t *fasync;
+ fuse_private_t *priv;
+ fuse_in_header_t *finh;
+ struct iobuf *iobuf;
+
+ priv = xl->private;
+ fasync = caa_container_of(async, fuse_async_t, async);
+ finh = fasync->finh;
+ iobuf = fasync->iobuf;
+
+ priv->fuse_ops[finh->opcode](xl, finh, fasync->msg, iobuf);
+
+ iobuf_unref(iobuf);
+}
+
+/* We need 512 extra buffer size for BATCH_FORGET fop. By tests, it is
+ * found to be reduces 'REALLOC()' in the loop */
+#define FUSE_EXTRA_ALLOC 512
+
static void *
fuse_thread_proc(void *data)
{
@@ -5821,24 +5850,20 @@ fuse_thread_proc(void *data)
fuse_in_header_t *finh = NULL;
struct iovec iov_in[2];
void *msg = NULL;
- /* we need 512 extra buffer size for BATCH_FORGET fop. By tests, it is
- found to be reduces 'REALLOC()' in the loop */
- const size_t msg0_size = sizeof(*finh) + 512;
- fuse_handler_t **fuse_ops = NULL;
+ size_t msg0_size = sizeof(*finh) + sizeof(struct fuse_write_in);
+ fuse_async_t *fasync;
struct pollfd pfd[2] = {{
0,
}};
+ uint32_t psize;
this = data;
priv = this->private;
- fuse_ops = priv->fuse_ops;
THIS = this;
- iov_in[0].iov_len = sizeof(*finh) + sizeof(struct fuse_write_in);
- iov_in[1].iov_len = ((struct iobuf_pool *)this->ctx->iobuf_pool)
- ->default_page_size;
- priv->msg0_len_p = &iov_in[0].iov_len;
+ psize = ((struct iobuf_pool *)this->ctx->iobuf_pool)->default_page_size;
+ priv->msg0_len_p = &msg0_size;
for (;;) {
/* THIS has to be reset here */
@@ -5895,14 +5920,15 @@ fuse_thread_proc(void *data)
changing this one too */
iobuf = iobuf_get(this->ctx->iobuf_pool);
- /* Add extra 128 byte to the first iov so that it can
+ /* Add extra 512 byte to the first iov so that it can
* accommodate "ordinary" non-write requests. It's not
* guaranteed to be big enough, as SETXATTR and namespace
* operations with very long names may grow behind it,
* but it's good enough in most cases (and we can handle
- * rest via realloc).
- */
- iov_in[0].iov_base = GF_CALLOC(1, msg0_size, gf_fuse_mt_iov_base);
+ * rest via realloc). */
+ iov_in[0].iov_base = GF_MALLOC(
+ sizeof(fuse_async_t) + msg0_size + FUSE_EXTRA_ALLOC,
+ gf_fuse_mt_iov_base);
if (!iobuf || !iov_in[0].iov_base) {
gf_log(this->name, GF_LOG_ERROR, "Out of memory");
@@ -5915,6 +5941,9 @@ fuse_thread_proc(void *data)
iov_in[1].iov_base = iobuf->ptr;
+ iov_in[0].iov_len = msg0_size;
+ iov_in[1].iov_len = psize;
+
res = sys_readv(priv->fd, iov_in, 2);
if (res == -1) {
@@ -5941,7 +5970,7 @@ fuse_thread_proc(void *data)
goto cont_err;
}
- if (res < sizeof(finh)) {
+ if (res < sizeof(*finh)) {
gf_log("glusterfs-fuse", GF_LOG_WARNING, "short read on /dev/fuse");
fuse_log_eh(this,
"glusterfs-fuse: short read on "
@@ -5983,8 +6012,9 @@ fuse_thread_proc(void *data)
if (finh->opcode == FUSE_WRITE)
msg = iov_in[1].iov_base;
else {
- if (res > msg0_size) {
- void *b = GF_REALLOC(iov_in[0].iov_base, res);
+ if (res > msg0_size + FUSE_EXTRA_ALLOC) {
+ void *b = GF_REALLOC(iov_in[0].iov_base,
+ sizeof(fuse_async_t) + res);
if (b) {
iov_in[0].iov_base = b;
finh = (fuse_in_header_t *)iov_in[0].iov_base;
@@ -5996,22 +6026,29 @@ fuse_thread_proc(void *data)
}
}
- if (res > iov_in[0].iov_len)
+ if (res > iov_in[0].iov_len) {
memcpy(iov_in[0].iov_base + iov_in[0].iov_len,
iov_in[1].iov_base, res - iov_in[0].iov_len);
+ iov_in[0].iov_len = res;
+ }
msg = finh + 1;
}
if (priv->uid_map_root && finh->uid == priv->uid_map_root)
finh->uid = 0;
- if (finh->opcode >= FUSE_OP_HIGH)
+ if (finh->opcode >= FUSE_OP_HIGH) {
/* turn down MacFUSE specific messages */
fuse_enosys(this, finh, msg, NULL);
- else
- fuse_ops[finh->opcode](this, finh, msg, iobuf);
+ iobuf_unref(iobuf);
+ } else {
+ fasync = iov_in[0].iov_base + iov_in[0].iov_len;
+ fasync->finh = finh;
+ fasync->msg = msg;
+ fasync->iobuf = iobuf;
+ gf_async(&fasync->async, this, fuse_dispatch);
+ }
- iobuf_unref(iobuf);
continue;
cont_err:
diff --git a/xlators/mount/fuse/utils/mount.glusterfs.in b/xlators/mount/fuse/utils/mount.glusterfs.in
index 3f5d76d..243c9c7 100755
--- a/xlators/mount/fuse/utils/mount.glusterfs.in
+++ b/xlators/mount/fuse/utils/mount.glusterfs.in
@@ -189,6 +189,10 @@ start_glusterfs ()
cmd_line=$(echo "$cmd_line --thin-client");
fi
+ if [ -n "$global_threading" ]; then
+ cmd_line=$(echo "$cmd_line --global-threading");
+ fi
+
#options with values start here
if [ -n "$halo_max_latency" ]; then
cmd_line=$(echo "$cmd_line --xlator-option \
@@ -629,6 +633,9 @@ without_options()
# "mount -t glusterfs" sends this, but it's useless.
"rw")
;;
+ "global-threading")
+ global_threading=1
+ ;;
# TODO: not sure how to handle this yet
"async"|"sync"|"dirsync"|\
"mand"|"nomand"|\
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c
index bf75015..9a4c728 100644
--- a/xlators/performance/io-threads/src/io-threads.c
+++ b/xlators/performance/io-threads/src/io-threads.c
@@ -1255,12 +1255,14 @@ init(xlator_t *this)
INIT_LIST_HEAD(&conf->no_client[i].reqs);
}
- ret = iot_workers_scale(conf);
+ if (!this->pass_through) {
+ ret = iot_workers_scale(conf);
- if (ret == -1) {
- gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED,
- "cannot initialize worker threads, exiting init");
- goto out;
+ if (ret == -1) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED,
+ "cannot initialize worker threads, exiting init");
+ goto out;
+ }
}
this->private = conf;