summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib/src/rpcsvc.c
diff options
context:
space:
mode:
authorRaghavendra Gowdappa <rgowdapp@redhat.com>2018-10-31 16:10:58 +0530
committerRaghavendra G <rgowdapp@redhat.com>2018-11-29 01:19:12 +0000
commit95e380eca19b9f0d03a53429535f15556e5724ad (patch)
treebe32fca4fbfa7d29e8571545af26e784d34e294c /rpc/rpc-lib/src/rpcsvc.c
parentf0232d07f7e6543b56830be28f6e80f9085e6241 (diff)
rpcsvc: provide each request handler thread its own queue
A single global per program queue is contended by all request handler threads and event threads. This can lead to high contention. So, reduce the contention by providing each request handler thread its own private queue. Thanks to "Manoj Pillai"<mpillai@redhat.com> for the idea of pairing a single queue with a fixed request-handler-thread and event-thread, which brought down the performance regression due to overhead of queuing significantly. Thanks to "Xavi Hernandez"<xhernandez@redhat.com> for discussion on how to communicate the event-thread death to request-handler-thread. Thanks to "Karan Sandha"<ksandha@redhat.com> for voluntarily running the perf benchmarks to qualify that performance regression introduced by ping-timer-fixes is fixed with this patch and patiently running many iterations of regression tests while RCAing the issue. Thanks to "Milind Changire"<mchangir@redhat.com> for patiently running the many iterations of perf benchmarking tests while RCAing the regression caused by ping-timer-expiry fixes. Change-Id: I578c3fc67713f4234bd3abbec5d3fbba19059ea5 Fixes: bz#1644629 Signed-off-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
Diffstat (limited to 'rpc/rpc-lib/src/rpcsvc.c')
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c412
1 files changed, 287 insertions, 125 deletions
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index c6545193a11..d678bca43a8 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -36,6 +36,7 @@
#include <fnmatch.h>
#include <stdarg.h>
#include <stdio.h>
+#include <math.h>
#ifdef IPV6_DEFAULT
#include <netconfig.h>
@@ -63,10 +64,76 @@ rpcsvc_get_listener(rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans);
int
rpcsvc_notify(rpc_transport_t *trans, void *mydata, rpc_transport_event_t event,
void *data, ...);
+void *
+rpcsvc_request_handler(void *arg);
static int
rpcsvc_match_subnet_v4(const char *addrtok, const char *ipaddr);
+void
+rpcsvc_toggle_queue_status(rpcsvc_program_t *prog,
+ rpcsvc_request_queue_t *queue, char status[])
+{
+ int queue_index = 0, status_index = 0, set_bit = 0;
+
+ if (queue != &prog->request_queue[0]) {
+ queue_index = (queue - &prog->request_queue[0]);
+ }
+
+ status_index = queue_index / 8;
+ set_bit = queue_index % 8;
+
+ status[status_index] ^= (1 << set_bit);
+
+ return;
+}
+
+static int
+get_rightmost_set_bit(int n)
+{
+ return log2(n & -n);
+}
+
+int
+rpcsvc_get_free_queue_index(rpcsvc_program_t *prog)
+{
+ int queue_index = 0, max_index = 0, i = 0;
+ unsigned int right_most_unset_bit = 0;
+
+ right_most_unset_bit = 8;
+
+ max_index = gf_roof(EVENT_MAX_THREADS, 8) / 8;
+ for (i = 0; i < max_index; i++) {
+ if (prog->request_queue_status[i] == 0) {
+ right_most_unset_bit = 0;
+ break;
+ } else {
+ right_most_unset_bit = get_rightmost_set_bit(
+ ~prog->request_queue_status[i]);
+ if (right_most_unset_bit < 8) {
+ break;
+ }
+ }
+ }
+
+ if (right_most_unset_bit > 7) {
+ queue_index = -1;
+ } else {
+ queue_index = i * 8;
+ queue_index += right_most_unset_bit;
+
+ if (queue_index > EVENT_MAX_THREADS) {
+ queue_index = -1;
+ }
+ }
+
+ if (queue_index != -1) {
+ prog->request_queue_status[i] |= (0x1 << right_most_unset_bit);
+ }
+
+ return queue_index;
+}
+
rpcsvc_notify_wrapper_t *
rpcsvc_notify_wrapper_alloc(void)
{
@@ -575,6 +642,73 @@ rpcsvc_check_and_reply_error(int ret, call_frame_t *frame, void *opaque)
return 0;
}
+void
+rpcsvc_queue_event_thread_death(rpcsvc_t *svc, rpcsvc_program_t *prog, int gen)
+{
+ rpcsvc_request_queue_t *queue = NULL;
+ int num = 0;
+ void *value = NULL;
+ rpcsvc_request_t *req = NULL;
+ char empty = 0;
+
+ value = pthread_getspecific(prog->req_queue_key);
+ if (value == NULL) {
+ return;
+ }
+
+ num = ((unsigned long)value) - 1;
+
+ queue = &prog->request_queue[num];
+
+ if (queue->gen == gen) {
+ /* duplicate event */
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "not queuing duplicate event thread death. "
+ "queue %d program %s",
+ num, prog->progname);
+ return;
+ }
+
+ rpcsvc_alloc_request(svc, req);
+ req->prognum = RPCSVC_INFRA_PROGRAM;
+ req->procnum = RPCSVC_PROC_EVENT_THREAD_DEATH;
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "queuing event thread death request to queue %d of program %s", num,
+ prog->progname);
+
+ pthread_mutex_lock(&queue->queue_lock);
+ {
+ empty = list_empty(&queue->request_queue);
+
+ list_add_tail(&req->request_list, &queue->request_queue);
+ queue->gen = gen;
+
+ if (empty && queue->waiting)
+ pthread_cond_signal(&queue->queue_cond);
+ }
+ pthread_mutex_unlock(&queue->queue_lock);
+
+ return;
+}
+
+int
+rpcsvc_handle_event_thread_death(rpcsvc_t *svc, rpc_transport_t *trans, int gen)
+{
+ rpcsvc_program_t *prog = NULL;
+
+ pthread_rwlock_rdlock(&svc->rpclock);
+ {
+ list_for_each_entry(prog, &svc->programs, program)
+ {
+ if (prog->ownthread)
+ rpcsvc_queue_event_thread_death(svc, prog, gen);
+ }
+ }
+ pthread_rwlock_unlock(&svc->rpclock);
+
+ return 0;
+}
+
int
rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans,
rpc_transport_pollin_t *msg)
@@ -585,9 +719,12 @@ rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans,
int ret = -1;
uint16_t port = 0;
gf_boolean_t is_unix = _gf_false, empty = _gf_false;
- gf_boolean_t unprivileged = _gf_false;
+ gf_boolean_t unprivileged = _gf_false, spawn_request_handler = 0;
drc_cached_op_t *reply = NULL;
rpcsvc_drc_globals_t *drc = NULL;
+ rpcsvc_request_queue_t *queue = NULL;
+ long num = 0;
+ void *value = NULL;
if (!trans || !svc)
return -1;
@@ -700,19 +837,81 @@ rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans,
ret = synctask_new(THIS->ctx->env, (synctask_fn_t)actor_fn,
rpcsvc_check_and_reply_error, NULL, req);
} else if (req->ownthread) {
- pthread_mutex_lock(&req->prog->queue_lock);
+ value = pthread_getspecific(req->prog->req_queue_key);
+ if (value == NULL) {
+ pthread_mutex_lock(&req->prog->thr_lock);
+ {
+ num = rpcsvc_get_free_queue_index(req->prog);
+ if (num != -1) {
+ num++;
+ value = (void *)num;
+ ret = pthread_setspecific(req->prog->req_queue_key,
+ value);
+ if (ret < 0) {
+ gf_log(GF_RPCSVC, GF_LOG_WARNING,
+ "setting request queue in TLS failed");
+ rpcsvc_toggle_queue_status(
+ req->prog, &req->prog->request_queue[num - 1],
+ req->prog->request_queue_status);
+ num = -1;
+ } else {
+ spawn_request_handler = 1;
+ }
+ }
+ }
+ pthread_mutex_unlock(&req->prog->thr_lock);
+ }
+
+ if (num == -1)
+ goto noqueue;
+
+ num = ((unsigned long)value) - 1;
+
+ queue = &req->prog->request_queue[num];
+
+ if (spawn_request_handler) {
+ ret = gf_thread_create(&queue->thread, NULL,
+ rpcsvc_request_handler, queue,
+ "rpcrqhnd");
+ if (!ret) {
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "spawned a request handler thread for queue %d",
+ (int)num);
+
+ req->prog->threadcount++;
+ } else {
+ gf_log(
+ GF_RPCSVC, GF_LOG_INFO,
+ "spawning a request handler thread for queue %d failed",
+ (int)num);
+ ret = pthread_setspecific(req->prog->req_queue_key, 0);
+ if (ret < 0) {
+ gf_log(GF_RPCSVC, GF_LOG_WARNING,
+ "resetting request queue in TLS failed");
+ }
+
+ rpcsvc_toggle_queue_status(
+ req->prog, &req->prog->request_queue[num - 1],
+ req->prog->request_queue_status);
+
+ goto noqueue;
+ }
+ }
+
+ pthread_mutex_lock(&queue->queue_lock);
{
- empty = list_empty(&req->prog->request_queue);
+ empty = list_empty(&queue->request_queue);
- list_add_tail(&req->request_list, &req->prog->request_queue);
+ list_add_tail(&req->request_list, &queue->request_queue);
- if (empty)
- pthread_cond_signal(&req->prog->queue_cond);
+ if (empty && queue->waiting)
+ pthread_cond_signal(&queue->queue_cond);
}
- pthread_mutex_unlock(&req->prog->queue_lock);
+ pthread_mutex_unlock(&queue->queue_lock);
ret = 0;
} else {
+ noqueue:
ret = actor_fn(req);
}
}
@@ -839,6 +1038,12 @@ rpcsvc_notify(rpc_transport_t *trans, void *mydata, rpc_transport_event_t event,
"got MAP_XID event, which should have not come");
ret = 0;
break;
+
+ case RPC_TRANSPORT_EVENT_THREAD_DIED:
+ rpcsvc_handle_event_thread_death(svc, trans,
+ (int)(unsigned long)data);
+ ret = 0;
+ break;
}
out:
@@ -1877,6 +2082,7 @@ rpcsvc_create_listeners(rpcsvc_t *svc, dict_t *options, char *name)
goto out;
}
+ dict_del(options, "notify-poller-death");
GF_FREE(transport_name);
transport_name = NULL;
count++;
@@ -1961,55 +2167,86 @@ out:
void *
rpcsvc_request_handler(void *arg)
{
- rpcsvc_program_t *program = arg;
- rpcsvc_request_t *req = NULL;
+ rpcsvc_request_queue_t *queue = NULL;
+ rpcsvc_program_t *program = NULL;
+ rpcsvc_request_t *req = NULL, *tmp_req = NULL;
rpcsvc_actor_t *actor = NULL;
gf_boolean_t done = _gf_false;
int ret = 0;
+ struct list_head tmp_list = {
+ 0,
+ };
+
+ queue = arg;
+ program = queue->program;
+
+ INIT_LIST_HEAD(&tmp_list);
if (!program)
return NULL;
while (1) {
- pthread_mutex_lock(&program->queue_lock);
+ pthread_mutex_lock(&queue->queue_lock);
{
- if (!program->alive && list_empty(&program->request_queue)) {
+ if (!program->alive && list_empty(&queue->request_queue)) {
done = 1;
goto unlock;
}
- while (list_empty(&program->request_queue) &&
- (program->threadcount <= program->eventthreadcount)) {
- pthread_cond_wait(&program->queue_cond, &program->queue_lock);
+ while (list_empty(&queue->request_queue)) {
+ queue->waiting = _gf_true;
+ pthread_cond_wait(&queue->queue_cond, &queue->queue_lock);
}
- if (program->threadcount > program->eventthreadcount) {
- done = 1;
- program->threadcount--;
+ queue->waiting = _gf_false;
- gf_log(GF_RPCSVC, GF_LOG_INFO,
- "program '%s' thread terminated; "
- "total count:%d",
- program->progname, program->threadcount);
- } else if (!list_empty(&program->request_queue)) {
- req = list_entry(program->request_queue.next, typeof(*req),
- request_list);
-
- list_del_init(&req->request_list);
+ if (!list_empty(&queue->request_queue)) {
+ INIT_LIST_HEAD(&tmp_list);
+ list_splice_init(&queue->request_queue, &tmp_list);
}
}
unlock:
- pthread_mutex_unlock(&program->queue_lock);
-
- if (req) {
- THIS = req->svc->xl;
- actor = rpcsvc_program_actor(req);
- ret = actor->actor(req);
+ pthread_mutex_unlock(&queue->queue_lock);
- if (ret != 0) {
- rpcsvc_check_and_reply_error(ret, NULL, req);
+ list_for_each_entry_safe(req, tmp_req, &tmp_list, request_list)
+ {
+ list_del_init(&req->request_list);
+
+ if (req) {
+ if (req->prognum == RPCSVC_INFRA_PROGRAM) {
+ switch (req->procnum) {
+ case RPCSVC_PROC_EVENT_THREAD_DEATH:
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "event thread died, exiting request handler "
+ "thread for queue %d of program %s",
+ (int)(queue - &program->request_queue[0]),
+ program->progname);
+ done = 1;
+ pthread_mutex_lock(&program->thr_lock);
+ {
+ rpcsvc_toggle_queue_status(
+ program, queue,
+ program->request_queue_status);
+ program->threadcount--;
+ }
+ pthread_mutex_unlock(&program->thr_lock);
+ rpcsvc_request_destroy(req);
+ break;
+
+ default:
+ break;
+ }
+ } else {
+ THIS = req->svc->xl;
+ actor = rpcsvc_program_actor(req);
+ ret = actor->actor(req);
+
+ if (ret != 0) {
+ rpcsvc_check_and_reply_error(ret, NULL, req);
+ }
+ req = NULL;
+ }
}
- req = NULL;
}
if (done)
@@ -2020,59 +2257,10 @@ rpcsvc_request_handler(void *arg)
}
int
-rpcsvc_spawn_threads(rpcsvc_t *svc, rpcsvc_program_t *program)
-{
- int ret = 0, delta = 0, creates = 0;
-
- if (!program || !svc)
- goto out;
-
- pthread_mutex_lock(&program->queue_lock);
- {
- delta = program->eventthreadcount - program->threadcount;
-
- if (delta >= 0) {
- while (delta--) {
- ret = gf_thread_create(&program->thread, NULL,
- rpcsvc_request_handler, program,
- "rpcrqhnd");
- if (!ret) {
- program->threadcount++;
- creates++;
- }
- }
-
- if (creates) {
- gf_log(GF_RPCSVC, GF_LOG_INFO,
- "spawned %d threads for program '%s'; "
- "total count:%d",
- creates, program->progname, program->threadcount);
- }
- } else {
- gf_log(GF_RPCSVC, GF_LOG_INFO,
- "terminating %d threads for program '%s'", -delta,
- program->progname);
-
- /* this signal is to just wake up the threads so they
- * test for the change in eventthreadcount and kill
- * themselves until the program thread count becomes
- * equal to the event thread count
- */
- pthread_cond_broadcast(&program->queue_cond);
- }
- }
- pthread_mutex_unlock(&program->queue_lock);
-
-out:
- return creates;
-}
-
-int
rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program,
gf_boolean_t add_to_head)
{
- int ret = -1;
- int creates = -1;
+ int ret = -1, i = 0;
rpcsvc_program_t *newprog = NULL;
char already_registered = 0;
@@ -2110,9 +2298,16 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program,
memcpy(newprog, program, sizeof(*program));
INIT_LIST_HEAD(&newprog->program);
- INIT_LIST_HEAD(&newprog->request_queue);
- pthread_mutex_init(&newprog->queue_lock, NULL);
- pthread_cond_init(&newprog->queue_cond, NULL);
+
+ for (i = 0; i < EVENT_MAX_THREADS; i++) {
+ INIT_LIST_HEAD(&newprog->request_queue[i].request_queue);
+ pthread_mutex_init(&newprog->request_queue[i].queue_lock, NULL);
+ pthread_cond_init(&newprog->request_queue[i].queue_cond, NULL);
+ newprog->request_queue[i].program = newprog;
+ }
+
+ pthread_mutex_init(&newprog->thr_lock, NULL);
+ pthread_cond_init(&newprog->thr_cond, NULL);
newprog->alive = _gf_true;
@@ -2121,12 +2316,11 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program,
newprog->ownthread = _gf_false;
if (newprog->ownthread) {
- newprog->eventthreadcount = 1;
- creates = rpcsvc_spawn_threads(svc, newprog);
+ struct event_pool *ep = svc->ctx->event_pool;
+ newprog->eventthreadcount = ep->eventthreadcount;
- if (creates < 1) {
- goto out;
- }
+ pthread_key_create(&newprog->req_queue_key, NULL);
+ newprog->thr_queue = 1;
}
pthread_rwlock_wrlock(&svc->rpclock);
@@ -3003,38 +3197,6 @@ out:
return ret;
}
-/* During reconfigure, Make sure to call this function after event-threads are
- * reconfigured as programs' threadcount will be made equal to event threads.
- */
-
-int
-rpcsvc_ownthread_reconf(rpcsvc_t *svc, int new_eventthreadcount)
-{
- int ret = -1;
- rpcsvc_program_t *program = NULL;
-
- if (!svc) {
- ret = 0;
- goto out;
- }
-
- pthread_rwlock_wrlock(&svc->rpclock);
- {
- list_for_each_entry(program, &svc->programs, program)
- {
- if (program->ownthread) {
- program->eventthreadcount = new_eventthreadcount;
- rpcsvc_spawn_threads(svc, program);
- }
- }
- }
- pthread_rwlock_unlock(&svc->rpclock);
-
- ret = 0;
-out:
- return ret;
-}
-
rpcsvc_actor_t gluster_dump_actors[GF_DUMP_MAXVALUE] = {
[GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA},
[GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0, DRC_NA},