author     Xavi Hernandez <xhernandez@redhat.com>    2019-01-24 18:44:06 +0100
committer  Amar Tumballi <amarts@redhat.com>         2019-02-18 02:58:24 +0000
commit     dddcf52020004d98f688ebef968de51d76cbf9a6 (patch)
tree       01ee4c39a7859a76562e15aa7045c5bd86417a60 /rpc
parent     ec273a46820ba17f46488c082c65cd1aa6739be3 (diff)
core: implement a global thread pool
This patch implements a thread pool that is wait-free for adding jobs to the queue and uses a very small locked region to get jobs. This makes it possible to decrease contention drastically. It's based on the wfcqueue structure provided by the urcu library.

It automatically enables more threads when load demands it, and stops them when they are not needed. There's a maximum number of threads that can be used. This value can be configured.

Depending on the workload, the maximum number of threads plays an important role, so it needs to be configured for optimal performance. Currently the thread pool doesn't self-adjust the maximum for the workload, so this configuration needs to be changed manually. For this reason, the global thread pool has been made optional, so that volumes can still use the thread pool provided by io-threads.

To enable it for bricks, the following option needs to be set:

    config.global-threading = on

This option has no effect if bricks are already running. A restart is required to activate it. It's recommended to also enable the following option when running bricks with the global thread pool:

    performance.iot-pass-through = on

To enable it for a FUSE mount point, the option '--global-threading' must be added to the mount command. To change it, an umount and remount is needed. It's recommended to disable the following option when using global threading on a mount point:

    performance.client-io-threads = off

To enable it for services managed by glusterd, glusterd needs to be started with the option '--global-threading'. In this case all daemons, like self-heal, will use the global thread pool.

Currently it can only be enabled for bricks, FUSE mounts and glusterd services.

The maximum number of threads for clients and bricks can be configured using the following options:

    config.client-threads
    config.brick-threads

These options can be applied online, and their effect is immediate most of the time. If one of them is set to 0, the maximum number of threads will be calculated as #cores * 2.

Some distributions ship a very old userspace-rcu library (version 0.7). For this reason, some header files from version 0.10 have been copied into contrib/userspace-rcu and are used if the detected version is 0.7 or older.

An additional change has been made to io-threads to prevent threads from being started when iot-pass-through is set.

Change-Id: I09d19e246b9e6d53c6247b29dfca6af6ee00a24b
updates: #532
Signed-off-by: Xavi Hernandez <xhernandez@redhat.com>
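The queueing pattern described above can be illustrated with a minimal standalone sketch (not part of this patch): producers add jobs wait-free with cds_wfcq_enqueue(), while consumers drain them through the queue's small internal lock via cds_wfcq_dequeue_blocking(). The job_t type and its payload here are hypothetical; the actual pool added by this patch is exposed through the gf_async() API visible in the diff below.

    /* Sketch of the wfcqueue usage pattern, assuming liburcu headers are
     * installed. Typically built with: cc demo.c -lurcu-cds -lpthread */
    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <urcu/compiler.h>  /* caa_container_of() */
    #include <urcu/wfcqueue.h>

    typedef struct {
        struct cds_wfcq_node node; /* embedded queue linkage */
        int id;                    /* hypothetical payload */
    } job_t;

    static struct cds_wfcq_head head;
    static struct cds_wfcq_tail tail;

    static void *
    producer(void *arg)
    {
        for (int i = 0; i < 10; i++) {
            job_t *job = malloc(sizeof(*job));
            cds_wfcq_node_init(&job->node);
            job->id = i;
            /* Wait-free: adding a job never takes a lock. */
            cds_wfcq_enqueue(&head, &tail, &job->node);
        }
        return NULL;
    }

    int
    main(void)
    {
        pthread_t t;
        struct cds_wfcq_node *n;

        cds_wfcq_init(&head, &tail);

        pthread_create(&t, NULL, producer, NULL);
        pthread_join(t, NULL);

        /* Dequeuers serialize only on the queue's internal mutex: this is
         * the "very small locked region" used to get jobs. */
        while ((n = cds_wfcq_dequeue_blocking(&head, &tail)) != NULL) {
            job_t *job = caa_container_of(n, job_t, node);
            printf("running job %d\n", job->id);
            free(job);
        }
        return 0;
    }

Because the enqueue path is wait-free, many threads can post work concurrently without blocking each other; only the workers taking jobs touch the lock, which is what keeps contention low.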
Diffstat (limited to 'rpc')
-rw-r--r--  rpc/rpc-lib/src/rpc-transport.c         2
-rw-r--r--  rpc/rpc-lib/src/rpc-transport.h        23
-rw-r--r--  rpc/rpc-lib/src/rpcsvc.c                5
-rw-r--r--  rpc/rpc-transport/socket/src/socket.c  41
4 files changed, 49 insertions, 22 deletions
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index f9cbdf133c7..ce984426cbe 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -144,6 +144,8 @@ rpc_transport_pollin_alloc(rpc_transport_t *this, struct iovec *vector,
goto out;
}
+ msg->trans = this;
+
if (count > 1) {
msg->vectored = 1;
}
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index 9e75d1a2bbb..6830279f07e 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -58,6 +58,7 @@ typedef struct rpc_transport rpc_transport_t;
#include <glusterfs/dict.h>
#include <glusterfs/compat.h>
+#include <glusterfs/async.h>
#include "rpcsvc-common.h"
struct peer_info {
@@ -155,16 +156,6 @@ struct rpc_request_info {
};
typedef struct rpc_request_info rpc_request_info_t;
-struct rpc_transport_pollin {
- int count;
- void *private;
- struct iobref *iobref;
- struct iovec vector[MAX_IOVEC];
- char is_reply;
- char vectored;
-};
-typedef struct rpc_transport_pollin rpc_transport_pollin_t;
-
typedef int (*rpc_transport_notify_t)(rpc_transport_t *, void *mydata,
rpc_transport_event_t, void *data, ...);
@@ -217,6 +208,18 @@ struct rpc_transport {
gf_atomic_t disconnect_progress;
};
+struct rpc_transport_pollin {
+ struct rpc_transport *trans;
+ int count;
+ void *private;
+ struct iobref *iobref;
+ struct iovec vector[MAX_IOVEC];
+ char is_reply;
+ char vectored;
+ gf_async_t async;
+};
+typedef struct rpc_transport_pollin rpc_transport_pollin_t;
+
struct rpc_transport_ops {
/* no need of receive op, msg will be delivered through an event
* notification
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 74373c44f91..c38a675b8c2 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -2314,6 +2314,11 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program,
newprog->alive = _gf_true;
+ if (gf_async_ctrl.enabled) {
+ newprog->ownthread = _gf_false;
+ newprog->synctask = _gf_false;
+ }
+
/* make sure synctask gets priority over ownthread */
if (newprog->synctask)
newprog->ownthread = _gf_false;
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index fa0e0f20901..26dbe0b706a 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -2495,6 +2495,33 @@ out:
return ret;
}
+static void
+socket_event_poll_in_async(xlator_t *xl, gf_async_t *async)
+{
+ rpc_transport_pollin_t *pollin;
+ rpc_transport_t *this;
+ socket_private_t *priv;
+
+ pollin = caa_container_of(async, rpc_transport_pollin_t, async);
+ this = pollin->trans;
+ priv = this->private;
+
+ rpc_transport_notify(this, RPC_TRANSPORT_MSG_RECEIVED, pollin);
+
+ rpc_transport_unref(this);
+
+ rpc_transport_pollin_destroy(pollin);
+
+ pthread_mutex_lock(&priv->notify.lock);
+ {
+ --priv->notify.in_progress;
+
+ if (!priv->notify.in_progress)
+ pthread_cond_signal(&priv->notify.cond);
+ }
+ pthread_mutex_unlock(&priv->notify.lock);
+}
+
static int
socket_event_poll_in(rpc_transport_t *this, gf_boolean_t notify_handled)
{
@@ -2519,18 +2546,8 @@ socket_event_poll_in(rpc_transport_t *this, gf_boolean_t notify_handled)
event_handled(ctx->event_pool, priv->sock, priv->idx, priv->gen);
if (pollin) {
- rpc_transport_notify(this, RPC_TRANSPORT_MSG_RECEIVED, pollin);
-
- rpc_transport_pollin_destroy(pollin);
-
- pthread_mutex_lock(&priv->notify.lock);
- {
- --priv->notify.in_progress;
-
- if (!priv->notify.in_progress)
- pthread_cond_signal(&priv->notify.cond);
- }
- pthread_mutex_unlock(&priv->notify.lock);
+ rpc_transport_ref(this);
+ gf_async(&pollin->async, THIS, socket_event_poll_in_async);
}
return ret;