summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib/src
diff options
context:
space:
mode:
authorRaghavendra G <rgowdapp@redhat.com>2017-04-25 10:43:07 +0530
committerJeff Darcy <jeff@pl.atyp.us>2017-07-18 10:45:29 +0000
commit2e72b24707f1886833db0b09e48b3f48b8d68d37 (patch)
treecb1f83ed4ac84e5390d5bbc54d0bfb649fcead7e /rpc/rpc-lib/src
parent3b069f4d7e2140c1cad8c875a4397a1c90f99551 (diff)
program/GF-DUMP: Shield ping processing from traffic to Glusterfs
Program Since poller thread bears the brunt of execution till the request is handed over to io-threads, poller thread experiencies lock contention(s) in the control flow till io-threads, which slows it down. This delay invariably affects reading ping requests from network and responding to them, resulting in increased ping latencies, which sometimes results in a ping-timer-expiry on client leading to disconnect of transport. So, this patch aims to free up poller thread from executing code of Glusterfs Program. We do this by making * Glusterfs Program registering itself asking rpcsvc to execute its actors in its own threads. * GF-DUMP Program registering itself asking rpcsvc to _NOT_ execute its actors in its own threads. Otherwise program's ownthreads become bottleneck in processing ping traffic. This means that poller thread reads a ping packet, invokes its actor and hands the response msg to transport queue. Change-Id: I526268c10bdd5ef93f322a4f95385137550a6a49 Signed-off-by: Raghavendra G <rgowdapp@redhat.com> BUG: 1421938 Reviewed-on: https://review.gluster.org/17105 NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org> CentOS-regression: Gluster Build System <jenkins@build.gluster.org> Smoke: Gluster Build System <jenkins@build.gluster.org> Reviewed-by: Amar Tumballi <amarts@redhat.com> Reviewed-by: Jeff Darcy <jeff@pl.atyp.us>
Diffstat (limited to 'rpc/rpc-lib/src')
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c89
-rw-r--r--rpc/rpc-lib/src/rpcsvc.h19
2 files changed, 103 insertions, 5 deletions
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index ce4e2bf0dc2..16d76a159e8 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -303,6 +303,7 @@ rpcsvc_program_actor (rpcsvc_request_t *req)
goto err;
}
+ req->ownthread = program->ownthread;
req->synctask = program->synctask;
err = SUCCESS;
@@ -410,6 +411,7 @@ rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans,
req->progver = rpc_call_progver (callmsg);
req->procnum = rpc_call_progproc (callmsg);
req->trans = rpc_transport_ref (trans);
+ gf_client_ref (req->trans->xl_private);
req->count = msg->count;
req->msg[0] = progmsg;
req->iobref = iobref_ref (msg->iobref);
@@ -425,6 +427,7 @@ rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans,
req->trans_private = msg->private;
INIT_LIST_HEAD (&req->txlist);
+ INIT_LIST_HEAD (&req->request_list);
req->payloadsize = 0;
/* By this time, the data bytes for the auth scheme would have already
@@ -575,7 +578,7 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
rpcsvc_request_t *req = NULL;
int ret = -1;
uint16_t port = 0;
- gf_boolean_t is_unix = _gf_false;
+ gf_boolean_t is_unix = _gf_false, empty = _gf_false;
gf_boolean_t unprivileged = _gf_false;
drc_cached_op_t *reply = NULL;
rpcsvc_drc_globals_t *drc = NULL;
@@ -691,6 +694,20 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
(synctask_fn_t) actor_fn,
rpcsvc_check_and_reply_error, NULL,
req);
+ } else if (req->ownthread) {
+ pthread_mutex_lock (&req->prog->queue_lock);
+ {
+ empty = list_empty (&req->prog->request_queue);
+
+ list_add_tail (&req->request_list,
+ &req->prog->request_queue);
+
+ if (empty)
+ pthread_cond_signal (&req->prog->queue_cond);
+ }
+ pthread_mutex_unlock (&req->prog->queue_lock);
+
+ ret = 0;
} else {
ret = actor_fn (req);
}
@@ -1570,6 +1587,12 @@ rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t *program)
" Ver: %d, Port: %d", prog->progname, prog->prognum,
prog->progver, prog->progport);
+ if (prog->ownthread) {
+ prog->alive = _gf_false;
+ ret = 0;
+ goto out;
+ }
+
pthread_mutex_lock (&svc->rpclock);
{
list_del_init (&prog->program);
@@ -1834,6 +1857,55 @@ out:
return ret;
}
+void *
+rpcsvc_request_handler (void *arg)
+{
+ rpcsvc_program_t *program = arg;
+ rpcsvc_request_t *req = NULL;
+ rpcsvc_actor_t *actor = NULL;
+ gf_boolean_t done = _gf_false;
+ int ret = 0;
+
+ if (!program)
+ return NULL;
+
+ while (1) {
+ pthread_mutex_lock (&program->queue_lock);
+ {
+ if (!program->alive
+ && list_empty (&program->request_queue)) {
+ done = 1;
+ goto unlock;
+ }
+
+ while (list_empty (&program->request_queue))
+ pthread_cond_wait (&program->queue_cond,
+ &program->queue_lock);
+
+ req = list_entry (program->request_queue.next,
+ typeof (*req), request_list);
+
+ list_del_init (&req->request_list);
+ }
+ unlock:
+ pthread_mutex_unlock (&program->queue_lock);
+
+ if (done)
+ break;
+
+ THIS = req->svc->xl;
+
+ actor = rpcsvc_program_actor (req);
+
+ ret = actor->actor (req);
+
+ if (ret != 0) {
+ rpcsvc_check_and_reply_error (ret, NULL, req);
+ }
+ }
+
+ return NULL;
+}
int
rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)
@@ -1875,6 +1947,21 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)
memcpy (newprog, program, sizeof (*program));
INIT_LIST_HEAD (&newprog->program);
+ INIT_LIST_HEAD (&newprog->request_queue);
+ pthread_mutex_init (&newprog->queue_lock, NULL);
+ pthread_cond_init (&newprog->queue_cond, NULL);
+
+ newprog->alive = _gf_true;
+
+ /* make sure synctask gets priority over ownthread */
+ if (newprog->synctask)
+ newprog->ownthread = _gf_false;
+
+ if (newprog->ownthread) {
+ gf_thread_create (&newprog->thread, NULL,
+ rpcsvc_request_handler,
+ newprog);
+ }
pthread_mutex_lock (&svc->rpclock);
{
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index cf3e5906de1..73507b6538b 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -233,9 +233,10 @@ struct rpcsvc_request {
*/
rpcsvc_auth_data_t verf;
- /* Execute this request's actor function as a synctask? */
- gf_boolean_t synctask;
+ /* Execute this request's actor function in ownthread of program?*/
+ gf_boolean_t ownthread;
+ gf_boolean_t synctask;
/* Container for a RPC program wanting to store a temp
* request-specific item.
*/
@@ -246,6 +247,9 @@ struct rpcsvc_request {
/* pointer to cached reply for use in DRC */
drc_cached_op_t *reply;
+
+ /* request queue in rpcsvc */
+ struct list_head request_list;
};
#define rpcsvc_request_program(req) ((rpcsvc_program_t *)((req)->prog))
@@ -396,11 +400,18 @@ struct rpcsvc_program {
*/
int min_auth;
- /* Execute actor function as a synctask? */
- gf_boolean_t synctask;
+ /* Execute actor function in program's own thread? This will reduce */
+ /* the workload on poller threads */
+ gf_boolean_t ownthread;
+ gf_boolean_t alive;
+ gf_boolean_t synctask;
/* list member to link to list of registered services with rpcsvc */
struct list_head program;
+ struct list_head request_queue;
+ pthread_mutex_t queue_lock;
+ pthread_cond_t queue_cond;
+ pthread_t thread;
};
typedef struct rpcsvc_cbk_program {