Diffstat (limited to 'rpc/rpc-lib')
-rw-r--r--  rpc/rpc-lib/src/autoscale-threads.c |   1
-rw-r--r--  rpc/rpc-lib/src/libgfrpc.sym        |   1
-rw-r--r--  rpc/rpc-lib/src/rpc-clnt.c          |   6
-rw-r--r--  rpc/rpc-lib/src/rpc-transport.c     |   4
-rw-r--r--  rpc/rpc-lib/src/rpc-transport.h     |   3
-rw-r--r--  rpc/rpc-lib/src/rpcsvc.c            | 412
-rw-r--r--  rpc/rpc-lib/src/rpcsvc.h            |  34
7 files changed, 327 insertions(+), 134 deletions(-)
diff --git a/rpc/rpc-lib/src/autoscale-threads.c b/rpc/rpc-lib/src/autoscale-threads.c
index 337f002df10..d629a1cd430 100644
--- a/rpc/rpc-lib/src/autoscale-threads.c
+++ b/rpc/rpc-lib/src/autoscale-threads.c
@@ -19,5 +19,4 @@ rpcsvc_autoscale_threads(glusterfs_ctx_t *ctx, rpcsvc_t *rpc, int incr)
pool->auto_thread_count += incr;
(void)event_reconfigure_threads(pool, thread_count + incr);
- rpcsvc_ownthread_reconf(rpc, pool->eventthreadcount);
}
diff --git a/rpc/rpc-lib/src/libgfrpc.sym b/rpc/rpc-lib/src/libgfrpc.sym
index a7cb5f6b5cb..4f42485044f 100644
--- a/rpc/rpc-lib/src/libgfrpc.sym
+++ b/rpc/rpc-lib/src/libgfrpc.sym
@@ -51,7 +51,6 @@ rpcsvc_transport_connect
rpcsvc_transport_getpeeraddr
rpcsvc_unregister_notify
rpcsvc_volume_allowed
-rpcsvc_ownthread_reconf
rpc_transport_count
rpc_transport_connect
rpc_transport_disconnect
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index 2505998b3d4..b26d645bb12 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -969,6 +969,12 @@ rpc_clnt_notify(rpc_transport_t *trans, void *mydata,
*/
ret = 0;
break;
+
+ case RPC_TRANSPORT_EVENT_THREAD_DIED:
+ /* only meaningful on a server; no need to handle this event on a
+ * client */
+ ret = 0;
+ break;
}
out:
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index d70334476c7..54636dcbf00 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -266,6 +266,10 @@ rpc_transport_load(glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
goto fail;
}
+ if (dict_get(options, "notify-poller-death")) {
+ trans->notify_poller_death = 1;
+ }
+
gf_log("rpc-transport", GF_LOG_DEBUG, "attempt to load file %s", name);
handle = dlopen(name, RTLD_NOW);
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index c238501b5c7..fd737d0c764 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -97,6 +97,7 @@ typedef enum {
RPC_TRANSPORT_MSG_RECEIVED, /* Complete rpc msg has been read */
RPC_TRANSPORT_CONNECT, /* client is connected to server */
RPC_TRANSPORT_MSG_SENT,
+ RPC_TRANSPORT_EVENT_THREAD_DIED /* event-thread has died */
} rpc_transport_event_t;
struct rpc_transport_msg {
@@ -213,6 +214,8 @@ struct rpc_transport {
* layer or in client management notification handler functions
*/
gf_boolean_t connect_failed;
+ char notify_poller_death;
+ char poller_death_accept;
};
struct rpc_transport_ops {
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index c6545193a11..d678bca43a8 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -36,6 +36,7 @@
#include <fnmatch.h>
#include <stdarg.h>
#include <stdio.h>
+#include <math.h>
#ifdef IPV6_DEFAULT
#include <netconfig.h>
@@ -63,10 +64,76 @@ rpcsvc_get_listener(rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans);
int
rpcsvc_notify(rpc_transport_t *trans, void *mydata, rpc_transport_event_t event,
void *data, ...);
+void *
+rpcsvc_request_handler(void *arg);
static int
rpcsvc_match_subnet_v4(const char *addrtok, const char *ipaddr);
+void
+rpcsvc_toggle_queue_status(rpcsvc_program_t *prog,
+ rpcsvc_request_queue_t *queue, char status[])
+{
+ int queue_index = 0, status_index = 0, set_bit = 0;
+
+ if (queue != &prog->request_queue[0]) {
+ queue_index = (queue - &prog->request_queue[0]);
+ }
+
+ status_index = queue_index / 8;
+ set_bit = queue_index % 8;
+
+ status[status_index] ^= (1 << set_bit);
+
+ return;
+}
+
+static int
+get_rightmost_set_bit(int n)
+{
+ return log2(n & -n);
+}
+
+int
+rpcsvc_get_free_queue_index(rpcsvc_program_t *prog)
+{
+ int queue_index = 0, max_index = 0, i = 0;
+ unsigned int right_most_unset_bit = 0;
+
+ right_most_unset_bit = 8;
+
+ max_index = gf_roof(EVENT_MAX_THREADS, 8) / 8;
+ for (i = 0; i < max_index; i++) {
+ if (prog->request_queue_status[i] == 0) {
+ right_most_unset_bit = 0;
+ break;
+ } else {
+ right_most_unset_bit = get_rightmost_set_bit(
+ ~prog->request_queue_status[i]);
+ if (right_most_unset_bit < 8) {
+ break;
+ }
+ }
+ }
+
+ if (right_most_unset_bit > 7) {
+ queue_index = -1;
+ } else {
+ queue_index = i * 8;
+ queue_index += right_most_unset_bit;
+
+ if (queue_index > EVENT_MAX_THREADS) {
+ queue_index = -1;
+ }
+ }
+
+ if (queue_index != -1) {
+ prog->request_queue_status[i] |= (0x1 << right_most_unset_bit);
+ }
+
+ return queue_index;
+}
+
rpcsvc_notify_wrapper_t *
rpcsvc_notify_wrapper_alloc(void)
{
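The two helpers above maintain a per-program bitmap with one status bit per
request queue, eight queues to a byte: rpcsvc_get_free_queue_index() claims
the rightmost clear bit and rpcsvc_toggle_queue_status() flips a bit back on
release. A minimal standalone sketch of the same bookkeeping follows; the
names (MAX_QUEUES, claim_slot, release_slot) are illustrative only, and it
uses ffs() from <strings.h> where the patch computes log2(n & -n) via
<math.h>, both of which locate the rightmost set bit:

    #include <stdio.h>
    #include <strings.h> /* ffs() */

    #define MAX_QUEUES 32 /* stand-in for EVENT_MAX_THREADS */
    static unsigned char status[MAX_QUEUES / 8]; /* one bit per queue */

    /* Find the lowest clear bit, mark it used, return its index (or -1). */
    static int claim_slot(void)
    {
        for (int i = 0; i < MAX_QUEUES / 8; i++) {
            if (status[i] != 0xff) {
                int bit = ffs(~status[i] & 0xff) - 1; /* rightmost clear bit */
                status[i] |= (1 << bit);
                return i * 8 + bit;
            }
        }
        return -1; /* all queues taken */
    }

    static void release_slot(int idx)
    {
        status[idx / 8] ^= (1 << (idx % 8)); /* same XOR toggle as the patch */
    }

    int main(void)
    {
        int a = claim_slot(), b = claim_slot();
        printf("claimed %d and %d\n", a, b); /* 0 and 1 */
        release_slot(a);
        printf("reclaimed %d\n", claim_slot()); /* 0 again */
        return 0;
    }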
@@ -575,6 +642,73 @@ rpcsvc_check_and_reply_error(int ret, call_frame_t *frame, void *opaque)
return 0;
}
+void
+rpcsvc_queue_event_thread_death(rpcsvc_t *svc, rpcsvc_program_t *prog, int gen)
+{
+ rpcsvc_request_queue_t *queue = NULL;
+ int num = 0;
+ void *value = NULL;
+ rpcsvc_request_t *req = NULL;
+ char empty = 0;
+
+ value = pthread_getspecific(prog->req_queue_key);
+ if (value == NULL) {
+ return;
+ }
+
+ num = ((unsigned long)value) - 1;
+
+ queue = &prog->request_queue[num];
+
+ if (queue->gen == gen) {
+ /* duplicate event */
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "not queuing duplicate event thread death. "
+ "queue %d program %s",
+ num, prog->progname);
+ return;
+ }
+
+ rpcsvc_alloc_request(svc, req);
+ req->prognum = RPCSVC_INFRA_PROGRAM;
+ req->procnum = RPCSVC_PROC_EVENT_THREAD_DEATH;
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "queuing event thread death request to queue %d of program %s", num,
+ prog->progname);
+
+ pthread_mutex_lock(&queue->queue_lock);
+ {
+ empty = list_empty(&queue->request_queue);
+
+ list_add_tail(&req->request_list, &queue->request_queue);
+ queue->gen = gen;
+
+ if (empty && queue->waiting)
+ pthread_cond_signal(&queue->queue_cond);
+ }
+ pthread_mutex_unlock(&queue->queue_lock);
+
+ return;
+}
+
+int
+rpcsvc_handle_event_thread_death(rpcsvc_t *svc, rpc_transport_t *trans, int gen)
+{
+ rpcsvc_program_t *prog = NULL;
+
+ pthread_rwlock_rdlock(&svc->rpclock);
+ {
+ list_for_each_entry(prog, &svc->programs, program)
+ {
+ if (prog->ownthread)
+ rpcsvc_queue_event_thread_death(svc, prog, gen);
+ }
+ }
+ pthread_rwlock_unlock(&svc->rpclock);
+
+ return 0;
+}
+
int
rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans,
rpc_transport_pollin_t *msg)
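The gen field checked above acts as an idempotency guard: every poller-death
notification carries a generation number, and a queue that has already
recorded that generation treats the replay as a duplicate and drops it. A
small sketch of the pattern, with illustrative names (queue_t,
should_queue_death):

    #include <stdio.h>

    typedef struct {
        int gen; /* last generation a death request was queued for */
    } queue_t;

    /* Returns 1 if the event should be queued, 0 if it is a duplicate. */
    static int should_queue_death(queue_t *q, int gen)
    {
        if (q->gen == gen)
            return 0; /* same poller death already queued here */
        q->gen = gen; /* record it before queuing */
        return 1;
    }

    int main(void)
    {
        queue_t q = {.gen = -1};
        printf("%d %d\n", should_queue_death(&q, 1),
               should_queue_death(&q, 1)); /* prints "1 0" */
        return 0;
    }

In the patch itself the comparison and the update happen under queue_lock,
together with the list insertion; the sketch collapses that into one step.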
@@ -585,9 +719,12 @@ rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans,
int ret = -1;
uint16_t port = 0;
gf_boolean_t is_unix = _gf_false, empty = _gf_false;
- gf_boolean_t unprivileged = _gf_false;
+ gf_boolean_t unprivileged = _gf_false, spawn_request_handler = 0;
drc_cached_op_t *reply = NULL;
rpcsvc_drc_globals_t *drc = NULL;
+ rpcsvc_request_queue_t *queue = NULL;
+ long num = 0;
+ void *value = NULL;
if (!trans || !svc)
return -1;
@@ -700,19 +837,81 @@ rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans,
ret = synctask_new(THIS->ctx->env, (synctask_fn_t)actor_fn,
rpcsvc_check_and_reply_error, NULL, req);
} else if (req->ownthread) {
- pthread_mutex_lock(&req->prog->queue_lock);
+ value = pthread_getspecific(req->prog->req_queue_key);
+ if (value == NULL) {
+ pthread_mutex_lock(&req->prog->thr_lock);
+ {
+ num = rpcsvc_get_free_queue_index(req->prog);
+ if (num != -1) {
+ num++;
+ value = (void *)num;
+ ret = pthread_setspecific(req->prog->req_queue_key,
+ value);
+ if (ret < 0) {
+ gf_log(GF_RPCSVC, GF_LOG_WARNING,
+ "setting request queue in TLS failed");
+ rpcsvc_toggle_queue_status(
+ req->prog, &req->prog->request_queue[num - 1],
+ req->prog->request_queue_status);
+ num = -1;
+ } else {
+ spawn_request_handler = 1;
+ }
+ }
+ }
+ pthread_mutex_unlock(&req->prog->thr_lock);
+ }
+
+ if (num == -1)
+ goto noqueue;
+
+ num = ((unsigned long)value) - 1;
+
+ queue = &req->prog->request_queue[num];
+
+ if (spawn_request_handler) {
+ ret = gf_thread_create(&queue->thread, NULL,
+ rpcsvc_request_handler, queue,
+ "rpcrqhnd");
+ if (!ret) {
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "spawned a request handler thread for queue %d",
+ (int)num);
+
+ req->prog->threadcount++;
+ } else {
+ gf_log(
+ GF_RPCSVC, GF_LOG_INFO,
+ "spawning a request handler thread for queue %d failed",
+ (int)num);
+ ret = pthread_setspecific(req->prog->req_queue_key, 0);
+ if (ret < 0) {
+ gf_log(GF_RPCSVC, GF_LOG_WARNING,
+ "resetting request queue in TLS failed");
+ }
+
+ rpcsvc_toggle_queue_status(
+ req->prog, &req->prog->request_queue[num],
+ req->prog->request_queue_status);
+
+ goto noqueue;
+ }
+ }
+
+ pthread_mutex_lock(&queue->queue_lock);
{
- empty = list_empty(&req->prog->request_queue);
+ empty = list_empty(&queue->request_queue);
- list_add_tail(&req->request_list, &req->prog->request_queue);
+ list_add_tail(&req->request_list, &queue->request_queue);
- if (empty)
- pthread_cond_signal(&req->prog->queue_cond);
+ if (empty && queue->waiting)
+ pthread_cond_signal(&queue->queue_cond);
}
- pthread_mutex_unlock(&req->prog->queue_lock);
+ pthread_mutex_unlock(&queue->queue_lock);
ret = 0;
} else {
+ noqueue:
ret = actor_fn(req);
}
}
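The queue selection above relies on POSIX thread-specific data: the first
request an event thread delivers claims a free queue index under thr_lock,
stores it in TLS as index + 1 (so that 0 still reads as "unassigned"), and
every later request from that thread reuses the cached index without taking
the lock. A self-contained sketch of the pattern, assuming illustrative
names (my_queue, next_index, worker):

    #include <pthread.h>
    #include <stdio.h>

    static pthread_key_t queue_key;
    static long next_index; /* stand-in for rpcsvc_get_free_queue_index() */
    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

    static int my_queue(void)
    {
        void *value = pthread_getspecific(queue_key);
        if (value == NULL) { /* first request on this thread */
            pthread_mutex_lock(&lock);
            long num = next_index++; /* claim a slot */
            pthread_mutex_unlock(&lock);
            value = (void *)(num + 1); /* store index + 1, never NULL */
            pthread_setspecific(queue_key, value);
        }
        return (int)((long)value - 1);
    }

    static void *worker(void *arg)
    {
        (void)arg;
        printf("thread %lu -> queue %d\n",
               (unsigned long)pthread_self(), my_queue());
        return NULL;
    }

    int main(void)
    {
        pthread_t t1, t2;
        pthread_key_create(&queue_key, NULL);
        pthread_create(&t1, NULL, worker, NULL);
        pthread_create(&t2, NULL, worker, NULL);
        pthread_join(t1, NULL);
        pthread_join(t2, NULL);
        return 0;
    }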
@@ -839,6 +1038,12 @@ rpcsvc_notify(rpc_transport_t *trans, void *mydata, rpc_transport_event_t event,
"got MAP_XID event, which should have not come");
ret = 0;
break;
+
+ case RPC_TRANSPORT_EVENT_THREAD_DIED:
+ rpcsvc_handle_event_thread_death(svc, trans,
+ (int)(unsigned long)data);
+ ret = 0;
+ break;
}
out:
@@ -1877,6 +2082,7 @@ rpcsvc_create_listeners(rpcsvc_t *svc, dict_t *options, char *name)
goto out;
}
+ dict_del(options, "notify-poller-death");
GF_FREE(transport_name);
transport_name = NULL;
count++;
@@ -1961,55 +2167,86 @@ out:
void *
rpcsvc_request_handler(void *arg)
{
- rpcsvc_program_t *program = arg;
- rpcsvc_request_t *req = NULL;
+ rpcsvc_request_queue_t *queue = NULL;
+ rpcsvc_program_t *program = NULL;
+ rpcsvc_request_t *req = NULL, *tmp_req = NULL;
rpcsvc_actor_t *actor = NULL;
gf_boolean_t done = _gf_false;
int ret = 0;
+ struct list_head tmp_list = {
+ 0,
+ };
+
+ queue = arg;
+ program = queue->program;
+
+ INIT_LIST_HEAD(&tmp_list);
if (!program)
return NULL;
while (1) {
- pthread_mutex_lock(&program->queue_lock);
+ pthread_mutex_lock(&queue->queue_lock);
{
- if (!program->alive && list_empty(&program->request_queue)) {
+ if (!program->alive && list_empty(&queue->request_queue)) {
done = 1;
goto unlock;
}
- while (list_empty(&program->request_queue) &&
- (program->threadcount <= program->eventthreadcount)) {
- pthread_cond_wait(&program->queue_cond, &program->queue_lock);
+ while (list_empty(&queue->request_queue)) {
+ queue->waiting = _gf_true;
+ pthread_cond_wait(&queue->queue_cond, &queue->queue_lock);
}
- if (program->threadcount > program->eventthreadcount) {
- done = 1;
- program->threadcount--;
+ queue->waiting = _gf_false;
- gf_log(GF_RPCSVC, GF_LOG_INFO,
- "program '%s' thread terminated; "
- "total count:%d",
- program->progname, program->threadcount);
- } else if (!list_empty(&program->request_queue)) {
- req = list_entry(program->request_queue.next, typeof(*req),
- request_list);
-
- list_del_init(&req->request_list);
+ if (!list_empty(&queue->request_queue)) {
+ INIT_LIST_HEAD(&tmp_list);
+ list_splice_init(&queue->request_queue, &tmp_list);
}
}
unlock:
- pthread_mutex_unlock(&program->queue_lock);
-
- if (req) {
- THIS = req->svc->xl;
- actor = rpcsvc_program_actor(req);
- ret = actor->actor(req);
+ pthread_mutex_unlock(&queue->queue_lock);
- if (ret != 0) {
- rpcsvc_check_and_reply_error(ret, NULL, req);
+ list_for_each_entry_safe(req, tmp_req, &tmp_list, request_list)
+ {
+ list_del_init(&req->request_list);
+
+ if (req) {
+ if (req->prognum == RPCSVC_INFRA_PROGRAM) {
+ switch (req->procnum) {
+ case RPCSVC_PROC_EVENT_THREAD_DEATH:
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "event thread died, exiting request handler "
+ "thread for queue %d of program %s",
+ (int)(queue - &program->request_queue[0]),
+ program->progname);
+ done = 1;
+ pthread_mutex_lock(&program->thr_lock);
+ {
+ rpcsvc_toggle_queue_status(
+ program, queue,
+ program->request_queue_status);
+ program->threadcount--;
+ }
+ pthread_mutex_unlock(&program->thr_lock);
+ rpcsvc_request_destroy(req);
+ break;
+
+ default:
+ break;
+ }
+ } else {
+ THIS = req->svc->xl;
+ actor = rpcsvc_program_actor(req);
+ ret = actor->actor(req);
+
+ if (ret != 0) {
+ rpcsvc_check_and_reply_error(ret, NULL, req);
+ }
+ req = NULL;
+ }
}
- req = NULL;
}
if (done)
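The rewritten loop drains in batches: list_splice_init() moves the entire
pending list onto the private tmp_list while queue_lock is held, and the
actors then run with no lock taken, so poller threads can keep enqueueing in
parallel. The sketch below shows the same steal-the-whole-list idea on a
plain singly linked list (struct req, enqueue, drain are illustrative
stand-ins; unlike the FIFO list in the patch, this toy queue is LIFO for
brevity):

    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>

    struct req {
        int id;
        struct req *next;
    };

    static struct req *pending; /* shared, protected by qlock */
    static pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;

    static void enqueue(int id)
    {
        struct req *r = malloc(sizeof(*r));
        r->id = id;
        pthread_mutex_lock(&qlock);
        r->next = pending;
        pending = r;
        pthread_mutex_unlock(&qlock);
    }

    static void drain(void (*handle)(struct req *))
    {
        pthread_mutex_lock(&qlock);
        struct req *batch = pending; /* steal the whole list... */
        pending = NULL;              /* ...leaving an empty queue behind */
        pthread_mutex_unlock(&qlock);

        while (batch) { /* no lock held while the handler runs */
            struct req *next = batch->next;
            handle(batch);
            batch = next;
        }
    }

    static void handle_req(struct req *r)
    {
        printf("handled request %d\n", r->id);
        free(r);
    }

    int main(void)
    {
        enqueue(1);
        enqueue(2);
        drain(handle_req);
        return 0;
    }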
@@ -2020,59 +2257,10 @@ rpcsvc_request_handler(void *arg)
}
int
-rpcsvc_spawn_threads(rpcsvc_t *svc, rpcsvc_program_t *program)
-{
- int ret = 0, delta = 0, creates = 0;
-
- if (!program || !svc)
- goto out;
-
- pthread_mutex_lock(&program->queue_lock);
- {
- delta = program->eventthreadcount - program->threadcount;
-
- if (delta >= 0) {
- while (delta--) {
- ret = gf_thread_create(&program->thread, NULL,
- rpcsvc_request_handler, program,
- "rpcrqhnd");
- if (!ret) {
- program->threadcount++;
- creates++;
- }
- }
-
- if (creates) {
- gf_log(GF_RPCSVC, GF_LOG_INFO,
- "spawned %d threads for program '%s'; "
- "total count:%d",
- creates, program->progname, program->threadcount);
- }
- } else {
- gf_log(GF_RPCSVC, GF_LOG_INFO,
- "terminating %d threads for program '%s'", -delta,
- program->progname);
-
- /* this signal is to just wake up the threads so they
- * test for the change in eventthreadcount and kill
- * themselves until the program thread count becomes
- * equal to the event thread count
- */
- pthread_cond_broadcast(&program->queue_cond);
- }
- }
- pthread_mutex_unlock(&program->queue_lock);
-
-out:
- return creates;
-}
-
-int
rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program,
gf_boolean_t add_to_head)
{
- int ret = -1;
- int creates = -1;
+ int ret = -1, i = 0;
rpcsvc_program_t *newprog = NULL;
char already_registered = 0;
@@ -2110,9 +2298,16 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program,
memcpy(newprog, program, sizeof(*program));
INIT_LIST_HEAD(&newprog->program);
- INIT_LIST_HEAD(&newprog->request_queue);
- pthread_mutex_init(&newprog->queue_lock, NULL);
- pthread_cond_init(&newprog->queue_cond, NULL);
+
+ for (i = 0; i < EVENT_MAX_THREADS; i++) {
+ INIT_LIST_HEAD(&newprog->request_queue[i].request_queue);
+ pthread_mutex_init(&newprog->request_queue[i].queue_lock, NULL);
+ pthread_cond_init(&newprog->request_queue[i].queue_cond, NULL);
+ newprog->request_queue[i].program = newprog;
+ }
+
+ pthread_mutex_init(&newprog->thr_lock, NULL);
+ pthread_cond_init(&newprog->thr_cond, NULL);
newprog->alive = _gf_true;
@@ -2121,12 +2316,11 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program,
newprog->ownthread = _gf_false;
if (newprog->ownthread) {
- newprog->eventthreadcount = 1;
- creates = rpcsvc_spawn_threads(svc, newprog);
+ struct event_pool *ep = svc->ctx->event_pool;
+ newprog->eventthreadcount = ep->eventthreadcount;
- if (creates < 1) {
- goto out;
- }
+ pthread_key_create(&newprog->req_queue_key, NULL);
+ newprog->thr_queue = 1;
}
pthread_rwlock_wrlock(&svc->rpclock);
@@ -3003,38 +3197,6 @@ out:
return ret;
}
-/* During reconfigure, Make sure to call this function after event-threads are
- * reconfigured as programs' threadcount will be made equal to event threads.
- */
-
-int
-rpcsvc_ownthread_reconf(rpcsvc_t *svc, int new_eventthreadcount)
-{
- int ret = -1;
- rpcsvc_program_t *program = NULL;
-
- if (!svc) {
- ret = 0;
- goto out;
- }
-
- pthread_rwlock_wrlock(&svc->rpclock);
- {
- list_for_each_entry(program, &svc->programs, program)
- {
- if (program->ownthread) {
- program->eventthreadcount = new_eventthreadcount;
- rpcsvc_spawn_threads(svc, program);
- }
- }
- }
- pthread_rwlock_unlock(&svc->rpclock);
-
- ret = 0;
-out:
- return ret;
-}
-
rpcsvc_actor_t gluster_dump_actors[GF_DUMP_MAXVALUE] = {
[GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA},
[GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0, DRC_NA},
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index ebb836fba3f..8388dd404c5 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -33,6 +33,16 @@
#define MAX_IOVEC 16
#endif
+/* TODO: prognums should be stored at a centralized location, or generated
+   by a robust random-number generator, to avoid conflicts.
+*/
+
+#define RPCSVC_INFRA_PROGRAM 7712846 /* random number */
+
+typedef enum {
+ RPCSVC_PROC_EVENT_THREAD_DEATH = 0,
+} rpcsvc_infra_procnum_t;
+
#define RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT \
64 /* Default for protocol/server */
#define RPCSVC_DEF_NFS_OUTSTANDING_RPC_LIMIT 16 /* Default for nfs/server */
@@ -362,6 +372,16 @@ typedef struct rpcsvc_actor_desc {
drc_op_type_t op_type;
} rpcsvc_actor_t;
+typedef struct rpcsvc_request_queue {
+ int gen;
+ struct list_head request_queue;
+ pthread_mutex_t queue_lock;
+ pthread_cond_t queue_cond;
+ pthread_t thread;
+ struct rpcsvc_program *program;
+ gf_boolean_t waiting;
+} rpcsvc_request_queue_t;
+
/* Describes a program and its version along with the function pointers
* required to handle the procedures/actors of each program/version.
* Never changed ever by any thread so no need for a lock.
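The waiting flag in rpcsvc_request_queue_t above enables the cheap wakeup
the enqueue sites rely on: the producer signals the condition variable only
on the empty-to-non-empty transition, and only when the consumer is actually
blocked in pthread_cond_wait(). A minimal producer/consumer sketch of that
handshake (produce, consume, items are illustrative names):

    #include <pthread.h>
    #include <stdio.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    static int items;   /* stands in for the request list */
    static int waiting; /* consumer is blocked in cond_wait */

    static void produce(void)
    {
        pthread_mutex_lock(&lock);
        int was_empty = (items == 0);
        items++;
        /* signal only on the empty -> non-empty edge, and only if the
         * consumer actually sleeps; otherwise the wakeup is wasted work */
        if (was_empty && waiting)
            pthread_cond_signal(&cond);
        pthread_mutex_unlock(&lock);
    }

    static void *consume(void *arg)
    {
        (void)arg;
        pthread_mutex_lock(&lock);
        while (items == 0) {
            waiting = 1;
            pthread_cond_wait(&cond, &lock);
        }
        waiting = 0;
        printf("consumed %d item(s)\n", items);
        items = 0;
        pthread_mutex_unlock(&lock);
        return NULL;
    }

    int main(void)
    {
        pthread_t t;
        pthread_create(&t, NULL, consume, NULL);
        produce();
        pthread_join(t, NULL);
        return 0;
    }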
@@ -421,11 +441,14 @@ struct rpcsvc_program {
gf_boolean_t synctask;
/* list member to link to list of registered services with rpcsvc */
struct list_head program;
- struct list_head request_queue;
- pthread_mutex_t queue_lock;
- pthread_cond_t queue_cond;
- pthread_t thread;
+ rpcsvc_request_queue_t request_queue[EVENT_MAX_THREADS];
+ char request_queue_status[EVENT_MAX_THREADS / 8 + 1];
+ pthread_mutex_t thr_lock;
+ pthread_cond_t thr_cond;
int threadcount;
+ int thr_queue;
+ pthread_key_t req_queue_key;
+
/* eventthreadcount is just a readonly copy of the actual value
* owned by the event sub-system
* It is used to control the scaling of rpcsvc_request_handler threads
@@ -652,9 +675,6 @@ rpcsvc_auth_array(rpcsvc_t *svc, char *volname, int *autharr, int arrlen);
rpcsvc_vector_sizer
rpcsvc_get_program_vector_sizer(rpcsvc_t *svc, uint32_t prognum,
uint32_t progver, int procnum);
-extern int
-rpcsvc_ownthread_reconf(rpcsvc_t *svc, int new_eventthreadcount);
-
void
rpcsvc_autoscale_threads(glusterfs_ctx_t *ctx, rpcsvc_t *rpc, int incr);
#endif