diff options
-rw-r--r-- | cli/src/cli-rl.c | 5 | ||||
-rw-r--r-- | libglusterfs/src/event-epoll.c | 114 | ||||
-rw-r--r-- | libglusterfs/src/event-poll.c | 10 | ||||
-rw-r--r-- | libglusterfs/src/event.c | 10 | ||||
-rw-r--r-- | libglusterfs/src/gf-event.h | 19 | ||||
-rw-r--r-- | rpc/rpc-lib/src/autoscale-threads.c | 1 | ||||
-rw-r--r-- | rpc/rpc-lib/src/libgfrpc.sym | 1 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 6 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.c | 4 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 3 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 412 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 34 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 28 | ||||
-rw-r--r-- | xlators/protocol/server/src/server.c | 9 |
14 files changed, 479 insertions, 177 deletions
diff --git a/cli/src/cli-rl.c b/cli/src/cli-rl.c index 7831d0bcb40..38aa6f4b7ae 100644 --- a/cli/src/cli-rl.c +++ b/cli/src/cli-rl.c @@ -104,7 +104,7 @@ cli_rl_process_line(char *line) int cli_rl_stdin(int fd, int idx, int gen, void *data, int poll_out, int poll_in, - int poll_err) + int poll_err, char event_thread_died) { struct cli_state *state = NULL; @@ -376,7 +376,8 @@ cli_rl_enable(struct cli_state *state) goto out; } - ret = event_register(state->ctx->event_pool, 0, cli_rl_stdin, state, 1, 0); + ret = event_register(state->ctx->event_pool, 0, cli_rl_stdin, state, 1, 0, + 0); if (ret == -1) goto out; diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c index 9826cc9e275..041a7e6c583 100644 --- a/libglusterfs/src/event-epoll.c +++ b/libglusterfs/src/event-epoll.c @@ -30,6 +30,7 @@ struct event_slot_epoll { int fd; int events; int gen; + int idx; gf_atomic_t ref; int do_close; int in_handler; @@ -37,6 +38,7 @@ struct event_slot_epoll { void *data; event_handler_t handler; gf_lock_t lock; + struct list_head poller_death; }; struct event_thread_data { @@ -57,6 +59,7 @@ __event_newtable(struct event_pool *event_pool, int table_idx) for (i = 0; i < EVENT_EPOLL_SLOTS; i++) { table[i].fd = -1; LOCK_INIT(&table[i].lock); + INIT_LIST_HEAD(&table[i].poller_death); } event_pool->ereg[table_idx] = table; @@ -66,7 +69,8 @@ __event_newtable(struct event_pool *event_pool, int table_idx) } static int -__event_slot_alloc(struct event_pool *event_pool, int fd) +__event_slot_alloc(struct event_pool *event_pool, int fd, + char notify_poller_death) { int i = 0; int table_idx = -1; @@ -109,8 +113,15 @@ __event_slot_alloc(struct event_pool *event_pool, int fd) table[i].gen = gen + 1; LOCK_INIT(&table[i].lock); + INIT_LIST_HEAD(&table[i].poller_death); table[i].fd = fd; + if (notify_poller_death) { + table[i].idx = table_idx * EVENT_EPOLL_SLOTS + i; + list_add_tail(&table[i].poller_death, + &event_pool->poller_death); + } + event_pool->slots_used[table_idx]++; break; @@ -121,13 +132,14 @@ __event_slot_alloc(struct event_pool *event_pool, int fd) } static int -event_slot_alloc(struct event_pool *event_pool, int fd) +event_slot_alloc(struct event_pool *event_pool, int fd, + char notify_poller_death) { int idx = -1; pthread_mutex_lock(&event_pool->mutex); { - idx = __event_slot_alloc(event_pool, fd); + idx = __event_slot_alloc(event_pool, fd, notify_poller_death); } pthread_mutex_unlock(&event_pool->mutex); @@ -155,6 +167,7 @@ __event_slot_dealloc(struct event_pool *event_pool, int idx) slot->fd = -1; slot->handled_error = 0; slot->in_handler = 0; + list_del_init(&slot->poller_death); event_pool->slots_used[table_idx]--; return; @@ -172,6 +185,15 @@ event_slot_dealloc(struct event_pool *event_pool, int idx) return; } +static int +event_slot_ref(struct event_slot_epoll *slot) +{ + if (!slot) + return -1; + + return GF_ATOMIC_INC(slot->ref); +} + static struct event_slot_epoll * event_slot_get(struct event_pool *event_pool, int idx) { @@ -188,12 +210,41 @@ event_slot_get(struct event_pool *event_pool, int idx) return NULL; slot = &table[offset]; - GF_ATOMIC_INC(slot->ref); + event_slot_ref(slot); return slot; } static void +__event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot, + int idx) +{ + int ref = -1; + int fd = -1; + int do_close = 0; + + ref = GF_ATOMIC_DEC(slot->ref); + if (ref) + /* slot still alive */ + goto done; + + LOCK(&slot->lock); + { + fd = slot->fd; + do_close = slot->do_close; + slot->do_close = 0; + } + UNLOCK(&slot->lock); + + __event_slot_dealloc(event_pool, idx); + + if (do_close) + sys_close(fd); +done: + return; +} + +static void event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot, int idx) { @@ -248,7 +299,7 @@ event_pool_new_epoll(int count, int eventthreadcount) event_pool->fd = epfd; event_pool->count = count; - + INIT_LIST_HEAD(&event_pool->poller_death); event_pool->eventthreadcount = eventthreadcount; event_pool->auto_thread_count = 0; @@ -297,7 +348,7 @@ __slot_update_events(struct event_slot_epoll *slot, int poll_in, int poll_out) int event_register_epoll(struct event_pool *event_pool, int fd, event_handler_t handler, void *data, int poll_in, - int poll_out) + int poll_out, char notify_poller_death) { int idx = -1; int ret = -1; @@ -328,7 +379,7 @@ event_register_epoll(struct event_pool *event_pool, int fd, if (destroy == 1) goto out; - idx = event_slot_alloc(event_pool, fd); + idx = event_slot_alloc(event_pool, fd, notify_poller_death); if (idx == -1) { gf_msg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND, "could not find slot for fd=%d", fd); @@ -591,7 +642,7 @@ pre_unlock: ret = handler(fd, idx, gen, data, (event->events & (EPOLLIN | EPOLLPRI)), (event->events & (EPOLLOUT)), - (event->events & (EPOLLERR | EPOLLHUP))); + (event->events & (EPOLLERR | EPOLLHUP)), 0); } out: event_slot_unref(event_pool, slot, idx); @@ -607,7 +658,9 @@ event_dispatch_epoll_worker(void *data) struct event_thread_data *ev_data = data; struct event_pool *event_pool; int myindex = -1; - int timetodie = 0; + int timetodie = 0, gen = 0; + struct list_head poller_death_notify; + struct event_slot_epoll *slot = NULL, *tmp = NULL; GF_VALIDATE_OR_GOTO("event", ev_data, out); @@ -619,7 +672,7 @@ event_dispatch_epoll_worker(void *data) gf_msg("epoll", GF_LOG_INFO, 0, LG_MSG_STARTED_EPOLL_THREAD, "Started" " thread with index %d", - myindex); + myindex - 1); pthread_mutex_lock(&event_pool->mutex); { @@ -637,20 +690,55 @@ event_dispatch_epoll_worker(void *data) pthread_mutex_lock(&event_pool->mutex); { if (event_pool->eventthreadcount < myindex) { + while (event_pool->poller_death_sliced) { + pthread_cond_wait(&event_pool->cond, + &event_pool->mutex); + } + + INIT_LIST_HEAD(&poller_death_notify); /* if found true in critical section, * die */ event_pool->pollers[myindex - 1] = 0; event_pool->activethreadcount--; timetodie = 1; + gen = ++event_pool->poller_gen; + list_for_each_entry(slot, &event_pool->poller_death, + poller_death) + { + event_slot_ref(slot); + } + + list_splice_init(&event_pool->poller_death, + &poller_death_notify); + event_pool->poller_death_sliced = 1; pthread_cond_broadcast(&event_pool->cond); } } pthread_mutex_unlock(&event_pool->mutex); if (timetodie) { + list_for_each_entry(slot, &poller_death_notify, poller_death) + { + slot->handler(slot->fd, 0, gen, slot->data, 0, 0, 0, 1); + } + + pthread_mutex_lock(&event_pool->mutex); + { + list_for_each_entry_safe(slot, tmp, &poller_death_notify, + poller_death) + { + __event_slot_unref(event_pool, slot, slot->idx); + } + + list_splice(&poller_death_notify, + &event_pool->poller_death); + event_pool->poller_death_sliced = 0; + pthread_cond_broadcast(&event_pool->cond); + } + pthread_mutex_unlock(&event_pool->mutex); + gf_msg("epoll", GF_LOG_INFO, 0, LG_MSG_EXITED_EPOLL_THREAD, - "Exited " - "thread with index %d", - myindex); + "Exited thread with index %d", myindex); + goto out; } } diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c index 727d2a000a2..5bac4291c47 100644 --- a/libglusterfs/src/event-poll.c +++ b/libglusterfs/src/event-poll.c @@ -33,11 +33,11 @@ struct event_slot_poll { static int event_register_poll(struct event_pool *event_pool, int fd, event_handler_t handler, void *data, int poll_in, - int poll_out); + int poll_out, char notify_poller_death); static int __flush_fd(int fd, int idx, int gen, void *data, int poll_in, int poll_out, - int poll_err) + int poll_err, char event_thread_died) { char buf[64]; int ret = -1; @@ -146,7 +146,7 @@ event_pool_new_poll(int count, int eventthreadcount) } ret = event_register_poll(event_pool, event_pool->breaker[0], __flush_fd, - NULL, 1, 0); + NULL, 1, 0, 0); if (ret == -1) { gf_msg("poll", GF_LOG_ERROR, 0, LG_MSG_REGISTER_PIPE_FAILED, "could not register pipe fd with poll event loop"); @@ -180,7 +180,7 @@ event_pool_new_poll(int count, int eventthreadcount) static int event_register_poll(struct event_pool *event_pool, int fd, event_handler_t handler, void *data, int poll_in, - int poll_out) + int poll_out, char notify_poller_death) { int idx = -1; @@ -378,7 +378,7 @@ unlock: ret = handler(ufds[i].fd, idx, 0, data, (ufds[i].revents & (POLLIN | POLLPRI)), (ufds[i].revents & (POLLOUT)), - (ufds[i].revents & (POLLERR | POLLHUP | POLLNVAL))); + (ufds[i].revents & (POLLERR | POLLHUP | POLLNVAL)), 0); return ret; } diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c index 49f70c83366..ddba9810b0b 100644 --- a/libglusterfs/src/event.c +++ b/libglusterfs/src/event.c @@ -54,14 +54,14 @@ event_pool_new(int count, int eventthreadcount) int event_register(struct event_pool *event_pool, int fd, event_handler_t handler, - void *data, int poll_in, int poll_out) + void *data, int poll_in, int poll_out, char notify_poller_death) { int ret = -1; GF_VALIDATE_OR_GOTO("event", event_pool, out); - ret = event_pool->ops->event_register(event_pool, fd, handler, data, - poll_in, poll_out); + ret = event_pool->ops->event_register( + event_pool, fd, handler, data, poll_in, poll_out, notify_poller_death); out: return ret; } @@ -161,7 +161,7 @@ out: int poller_destroy_handler(int fd, int idx, int gen, void *data, int poll_out, - int poll_in, int poll_err) + int poll_in, int poll_err, char event_thread_exit) { struct event_destroy_data *destroy = NULL; int readfd = -1, ret = -1; @@ -233,7 +233,7 @@ event_dispatch_destroy(struct event_pool *event_pool) /* From the main thread register an event on the pipe fd[0], */ - idx = event_register(event_pool, fd[0], poller_destroy_handler, &data, 1, + idx = event_register(event_pool, fd[0], poller_destroy_handler, &data, 1, 0, 0); if (idx < 0) goto out; diff --git a/libglusterfs/src/gf-event.h b/libglusterfs/src/gf-event.h index 5c3724cc953..5d92a2dd285 100644 --- a/libglusterfs/src/gf-event.h +++ b/libglusterfs/src/gf-event.h @@ -12,6 +12,7 @@ #define _GF_EVENT_H_ #include <pthread.h> +#include "list.h" struct event_pool; struct event_ops; @@ -23,7 +24,8 @@ struct event_data { } __attribute__((__packed__, __may_alias__)); typedef int (*event_handler_t)(int fd, int idx, int gen, void *data, - int poll_in, int poll_out, int poll_err); + int poll_in, int poll_out, int poll_err, + char event_thread_exit); #define EVENT_EPOLL_TABLES 1024 #define EVENT_EPOLL_SLOTS 1024 @@ -40,6 +42,13 @@ struct event_pool { struct event_slot_epoll *ereg[EVENT_EPOLL_TABLES]; int slots_used[EVENT_EPOLL_TABLES]; + struct list_head poller_death; + int poller_death_sliced; /* track whether the list of fds interested + * poller_death is sliced. If yes, new thread death + * notification has to wait till the list is added + * back + */ + int poller_gen; int used; int changed; @@ -52,8 +61,8 @@ struct event_pool { /* NOTE: Currently used only when event processing is done using * epoll. */ int eventthreadcount; /* number of event threads to execute. */ - pthread_t pollers[EVENT_MAX_THREADS]; /* poller thread_id store, - * and live status */ + pthread_t pollers[EVENT_MAX_THREADS]; /* poller thread_id store, and live + status */ int destroy; int activethreadcount; @@ -81,7 +90,7 @@ struct event_ops { int (*event_register)(struct event_pool *event_pool, int fd, event_handler_t handler, void *data, int poll_in, - int poll_out); + int poll_out, char notify_poller_death); int (*event_select_on)(struct event_pool *event_pool, int fd, int idx, int poll_in, int poll_out); @@ -107,7 +116,7 @@ event_select_on(struct event_pool *event_pool, int fd, int idx, int poll_in, int poll_out); int event_register(struct event_pool *event_pool, int fd, event_handler_t handler, - void *data, int poll_in, int poll_out); + void *data, int poll_in, int poll_out, char notify_poller_death); int event_unregister(struct event_pool *event_pool, int fd, int idx); int diff --git a/rpc/rpc-lib/src/autoscale-threads.c b/rpc/rpc-lib/src/autoscale-threads.c index 337f002df10..d629a1cd430 100644 --- a/rpc/rpc-lib/src/autoscale-threads.c +++ b/rpc/rpc-lib/src/autoscale-threads.c @@ -19,5 +19,4 @@ rpcsvc_autoscale_threads(glusterfs_ctx_t *ctx, rpcsvc_t *rpc, int incr) pool->auto_thread_count += incr; (void)event_reconfigure_threads(pool, thread_count + incr); - rpcsvc_ownthread_reconf(rpc, pool->eventthreadcount); } diff --git a/rpc/rpc-lib/src/libgfrpc.sym b/rpc/rpc-lib/src/libgfrpc.sym index a7cb5f6b5cb..4f42485044f 100644 --- a/rpc/rpc-lib/src/libgfrpc.sym +++ b/rpc/rpc-lib/src/libgfrpc.sym @@ -51,7 +51,6 @@ rpcsvc_transport_connect rpcsvc_transport_getpeeraddr rpcsvc_unregister_notify rpcsvc_volume_allowed -rpcsvc_ownthread_reconf rpc_transport_count rpc_transport_connect rpc_transport_disconnect diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 2505998b3d4..b26d645bb12 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -969,6 +969,12 @@ rpc_clnt_notify(rpc_transport_t *trans, void *mydata, */ ret = 0; break; + + case RPC_TRANSPORT_EVENT_THREAD_DIED: + /* only meaningful on a server, no need of handling this event on a + * client */ + ret = 0; + break; } out: diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index d70334476c7..54636dcbf00 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -266,6 +266,10 @@ rpc_transport_load(glusterfs_ctx_t *ctx, dict_t *options, char *trans_name) goto fail; } + if (dict_get(options, "notify-poller-death")) { + trans->notify_poller_death = 1; + } + gf_log("rpc-transport", GF_LOG_DEBUG, "attempt to load file %s", name); handle = dlopen(name, RTLD_NOW); diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index c238501b5c7..fd737d0c764 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -97,6 +97,7 @@ typedef enum { RPC_TRANSPORT_MSG_RECEIVED, /* Complete rpc msg has been read */ RPC_TRANSPORT_CONNECT, /* client is connected to server */ RPC_TRANSPORT_MSG_SENT, + RPC_TRANSPORT_EVENT_THREAD_DIED /* event-thread has died */ } rpc_transport_event_t; struct rpc_transport_msg { @@ -213,6 +214,8 @@ struct rpc_transport { * layer or in client management notification handler functions */ gf_boolean_t connect_failed; + char notify_poller_death; + char poller_death_accept; }; struct rpc_transport_ops { diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index c6545193a11..d678bca43a8 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -36,6 +36,7 @@ #include <fnmatch.h> #include <stdarg.h> #include <stdio.h> +#include <math.h> #ifdef IPV6_DEFAULT #include <netconfig.h> @@ -63,10 +64,76 @@ rpcsvc_get_listener(rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans); int rpcsvc_notify(rpc_transport_t *trans, void *mydata, rpc_transport_event_t event, void *data, ...); +void * +rpcsvc_request_handler(void *arg); static int rpcsvc_match_subnet_v4(const char *addrtok, const char *ipaddr); +void +rpcsvc_toggle_queue_status(rpcsvc_program_t *prog, + rpcsvc_request_queue_t *queue, char status[]) +{ + int queue_index = 0, status_index = 0, set_bit = 0; + + if (queue != &prog->request_queue[0]) { + queue_index = (queue - &prog->request_queue[0]); + } + + status_index = queue_index / 8; + set_bit = queue_index % 8; + + status[status_index] ^= (1 << set_bit); + + return; +} + +static int +get_rightmost_set_bit(int n) +{ + return log2(n & -n); +} + +int +rpcsvc_get_free_queue_index(rpcsvc_program_t *prog) +{ + int queue_index = 0, max_index = 0, i = 0; + unsigned int right_most_unset_bit = 0; + + right_most_unset_bit = 8; + + max_index = gf_roof(EVENT_MAX_THREADS, 8) / 8; + for (i = 0; i < max_index; i++) { + if (prog->request_queue_status[i] == 0) { + right_most_unset_bit = 0; + break; + } else { + right_most_unset_bit = get_rightmost_set_bit( + ~prog->request_queue_status[i]); + if (right_most_unset_bit < 8) { + break; + } + } + } + + if (right_most_unset_bit > 7) { + queue_index = -1; + } else { + queue_index = i * 8; + queue_index += right_most_unset_bit; + + if (queue_index > EVENT_MAX_THREADS) { + queue_index = -1; + } + } + + if (queue_index != -1) { + prog->request_queue_status[i] |= (0x1 << right_most_unset_bit); + } + + return queue_index; +} + rpcsvc_notify_wrapper_t * rpcsvc_notify_wrapper_alloc(void) { @@ -575,6 +642,73 @@ rpcsvc_check_and_reply_error(int ret, call_frame_t *frame, void *opaque) return 0; } +void +rpcsvc_queue_event_thread_death(rpcsvc_t *svc, rpcsvc_program_t *prog, int gen) +{ + rpcsvc_request_queue_t *queue = NULL; + int num = 0; + void *value = NULL; + rpcsvc_request_t *req = NULL; + char empty = 0; + + value = pthread_getspecific(prog->req_queue_key); + if (value == NULL) { + return; + } + + num = ((unsigned long)value) - 1; + + queue = &prog->request_queue[num]; + + if (queue->gen == gen) { + /* duplicate event */ + gf_log(GF_RPCSVC, GF_LOG_INFO, + "not queuing duplicate event thread death. " + "queue %d program %s", + num, prog->progname); + return; + } + + rpcsvc_alloc_request(svc, req); + req->prognum = RPCSVC_INFRA_PROGRAM; + req->procnum = RPCSVC_PROC_EVENT_THREAD_DEATH; + gf_log(GF_RPCSVC, GF_LOG_INFO, + "queuing event thread death request to queue %d of program %s", num, + prog->progname); + + pthread_mutex_lock(&queue->queue_lock); + { + empty = list_empty(&queue->request_queue); + + list_add_tail(&req->request_list, &queue->request_queue); + queue->gen = gen; + + if (empty && queue->waiting) + pthread_cond_signal(&queue->queue_cond); + } + pthread_mutex_unlock(&queue->queue_lock); + + return; +} + +int +rpcsvc_handle_event_thread_death(rpcsvc_t *svc, rpc_transport_t *trans, int gen) +{ + rpcsvc_program_t *prog = NULL; + + pthread_rwlock_rdlock(&svc->rpclock); + { + list_for_each_entry(prog, &svc->programs, program) + { + if (prog->ownthread) + rpcsvc_queue_event_thread_death(svc, prog, gen); + } + } + pthread_rwlock_unlock(&svc->rpclock); + + return 0; +} + int rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans, rpc_transport_pollin_t *msg) @@ -585,9 +719,12 @@ rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans, int ret = -1; uint16_t port = 0; gf_boolean_t is_unix = _gf_false, empty = _gf_false; - gf_boolean_t unprivileged = _gf_false; + gf_boolean_t unprivileged = _gf_false, spawn_request_handler = 0; drc_cached_op_t *reply = NULL; rpcsvc_drc_globals_t *drc = NULL; + rpcsvc_request_queue_t *queue = NULL; + long num = 0; + void *value = NULL; if (!trans || !svc) return -1; @@ -700,19 +837,81 @@ rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans, ret = synctask_new(THIS->ctx->env, (synctask_fn_t)actor_fn, rpcsvc_check_and_reply_error, NULL, req); } else if (req->ownthread) { - pthread_mutex_lock(&req->prog->queue_lock); + value = pthread_getspecific(req->prog->req_queue_key); + if (value == NULL) { + pthread_mutex_lock(&req->prog->thr_lock); + { + num = rpcsvc_get_free_queue_index(req->prog); + if (num != -1) { + num++; + value = (void *)num; + ret = pthread_setspecific(req->prog->req_queue_key, + value); + if (ret < 0) { + gf_log(GF_RPCSVC, GF_LOG_WARNING, + "setting request queue in TLS failed"); + rpcsvc_toggle_queue_status( + req->prog, &req->prog->request_queue[num - 1], + req->prog->request_queue_status); + num = -1; + } else { + spawn_request_handler = 1; + } + } + } + pthread_mutex_unlock(&req->prog->thr_lock); + } + + if (num == -1) + goto noqueue; + + num = ((unsigned long)value) - 1; + + queue = &req->prog->request_queue[num]; + + if (spawn_request_handler) { + ret = gf_thread_create(&queue->thread, NULL, + rpcsvc_request_handler, queue, + "rpcrqhnd"); + if (!ret) { + gf_log(GF_RPCSVC, GF_LOG_INFO, + "spawned a request handler thread for queue %d", + (int)num); + + req->prog->threadcount++; + } else { + gf_log( + GF_RPCSVC, GF_LOG_INFO, + "spawning a request handler thread for queue %d failed", + (int)num); + ret = pthread_setspecific(req->prog->req_queue_key, 0); + if (ret < 0) { + gf_log(GF_RPCSVC, GF_LOG_WARNING, + "resetting request queue in TLS failed"); + } + + rpcsvc_toggle_queue_status( + req->prog, &req->prog->request_queue[num - 1], + req->prog->request_queue_status); + + goto noqueue; + } + } + + pthread_mutex_lock(&queue->queue_lock); { - empty = list_empty(&req->prog->request_queue); + empty = list_empty(&queue->request_queue); - list_add_tail(&req->request_list, &req->prog->request_queue); + list_add_tail(&req->request_list, &queue->request_queue); - if (empty) - pthread_cond_signal(&req->prog->queue_cond); + if (empty && queue->waiting) + pthread_cond_signal(&queue->queue_cond); } - pthread_mutex_unlock(&req->prog->queue_lock); + pthread_mutex_unlock(&queue->queue_lock); ret = 0; } else { + noqueue: ret = actor_fn(req); } } @@ -839,6 +1038,12 @@ rpcsvc_notify(rpc_transport_t *trans, void *mydata, rpc_transport_event_t event, "got MAP_XID event, which should have not come"); ret = 0; break; + + case RPC_TRANSPORT_EVENT_THREAD_DIED: + rpcsvc_handle_event_thread_death(svc, trans, + (int)(unsigned long)data); + ret = 0; + break; } out: @@ -1877,6 +2082,7 @@ rpcsvc_create_listeners(rpcsvc_t *svc, dict_t *options, char *name) goto out; } + dict_del(options, "notify-poller-death"); GF_FREE(transport_name); transport_name = NULL; count++; @@ -1961,55 +2167,86 @@ out: void * rpcsvc_request_handler(void *arg) { - rpcsvc_program_t *program = arg; - rpcsvc_request_t *req = NULL; + rpcsvc_request_queue_t *queue = NULL; + rpcsvc_program_t *program = NULL; + rpcsvc_request_t *req = NULL, *tmp_req = NULL; rpcsvc_actor_t *actor = NULL; gf_boolean_t done = _gf_false; int ret = 0; + struct list_head tmp_list = { + 0, + }; + + queue = arg; + program = queue->program; + + INIT_LIST_HEAD(&tmp_list); if (!program) return NULL; while (1) { - pthread_mutex_lock(&program->queue_lock); + pthread_mutex_lock(&queue->queue_lock); { - if (!program->alive && list_empty(&program->request_queue)) { + if (!program->alive && list_empty(&queue->request_queue)) { done = 1; goto unlock; } - while (list_empty(&program->request_queue) && - (program->threadcount <= program->eventthreadcount)) { - pthread_cond_wait(&program->queue_cond, &program->queue_lock); + while (list_empty(&queue->request_queue)) { + queue->waiting = _gf_true; + pthread_cond_wait(&queue->queue_cond, &queue->queue_lock); } - if (program->threadcount > program->eventthreadcount) { - done = 1; - program->threadcount--; + queue->waiting = _gf_false; - gf_log(GF_RPCSVC, GF_LOG_INFO, - "program '%s' thread terminated; " - "total count:%d", - program->progname, program->threadcount); - } else if (!list_empty(&program->request_queue)) { - req = list_entry(program->request_queue.next, typeof(*req), - request_list); - - list_del_init(&req->request_list); + if (!list_empty(&queue->request_queue)) { + INIT_LIST_HEAD(&tmp_list); + list_splice_init(&queue->request_queue, &tmp_list); } } unlock: - pthread_mutex_unlock(&program->queue_lock); - - if (req) { - THIS = req->svc->xl; - actor = rpcsvc_program_actor(req); - ret = actor->actor(req); + pthread_mutex_unlock(&queue->queue_lock); - if (ret != 0) { - rpcsvc_check_and_reply_error(ret, NULL, req); + list_for_each_entry_safe(req, tmp_req, &tmp_list, request_list) + { + list_del_init(&req->request_list); + + if (req) { + if (req->prognum == RPCSVC_INFRA_PROGRAM) { + switch (req->procnum) { + case RPCSVC_PROC_EVENT_THREAD_DEATH: + gf_log(GF_RPCSVC, GF_LOG_INFO, + "event thread died, exiting request handler " + "thread for queue %d of program %s", + (int)(queue - &program->request_queue[0]), + program->progname); + done = 1; + pthread_mutex_lock(&program->thr_lock); + { + rpcsvc_toggle_queue_status( + program, queue, + program->request_queue_status); + program->threadcount--; + } + pthread_mutex_unlock(&program->thr_lock); + rpcsvc_request_destroy(req); + break; + + default: + break; + } + } else { + THIS = req->svc->xl; + actor = rpcsvc_program_actor(req); + ret = actor->actor(req); + + if (ret != 0) { + rpcsvc_check_and_reply_error(ret, NULL, req); + } + req = NULL; + } } - req = NULL; } if (done) @@ -2020,59 +2257,10 @@ rpcsvc_request_handler(void *arg) } int -rpcsvc_spawn_threads(rpcsvc_t *svc, rpcsvc_program_t *program) -{ - int ret = 0, delta = 0, creates = 0; - - if (!program || !svc) - goto out; - - pthread_mutex_lock(&program->queue_lock); - { - delta = program->eventthreadcount - program->threadcount; - - if (delta >= 0) { - while (delta--) { - ret = gf_thread_create(&program->thread, NULL, - rpcsvc_request_handler, program, - "rpcrqhnd"); - if (!ret) { - program->threadcount++; - creates++; - } - } - - if (creates) { - gf_log(GF_RPCSVC, GF_LOG_INFO, - "spawned %d threads for program '%s'; " - "total count:%d", - creates, program->progname, program->threadcount); - } - } else { - gf_log(GF_RPCSVC, GF_LOG_INFO, - "terminating %d threads for program '%s'", -delta, - program->progname); - - /* this signal is to just wake up the threads so they - * test for the change in eventthreadcount and kill - * themselves until the program thread count becomes - * equal to the event thread count - */ - pthread_cond_broadcast(&program->queue_cond); - } - } - pthread_mutex_unlock(&program->queue_lock); - -out: - return creates; -} - -int rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program, gf_boolean_t add_to_head) { - int ret = -1; - int creates = -1; + int ret = -1, i = 0; rpcsvc_program_t *newprog = NULL; char already_registered = 0; @@ -2110,9 +2298,16 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program, memcpy(newprog, program, sizeof(*program)); INIT_LIST_HEAD(&newprog->program); - INIT_LIST_HEAD(&newprog->request_queue); - pthread_mutex_init(&newprog->queue_lock, NULL); - pthread_cond_init(&newprog->queue_cond, NULL); + + for (i = 0; i < EVENT_MAX_THREADS; i++) { + INIT_LIST_HEAD(&newprog->request_queue[i].request_queue); + pthread_mutex_init(&newprog->request_queue[i].queue_lock, NULL); + pthread_cond_init(&newprog->request_queue[i].queue_cond, NULL); + newprog->request_queue[i].program = newprog; + } + + pthread_mutex_init(&newprog->thr_lock, NULL); + pthread_cond_init(&newprog->thr_cond, NULL); newprog->alive = _gf_true; @@ -2121,12 +2316,11 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program, newprog->ownthread = _gf_false; if (newprog->ownthread) { - newprog->eventthreadcount = 1; - creates = rpcsvc_spawn_threads(svc, newprog); + struct event_pool *ep = svc->ctx->event_pool; + newprog->eventthreadcount = ep->eventthreadcount; - if (creates < 1) { - goto out; - } + pthread_key_create(&newprog->req_queue_key, NULL); + newprog->thr_queue = 1; } pthread_rwlock_wrlock(&svc->rpclock); @@ -3003,38 +3197,6 @@ out: return ret; } -/* During reconfigure, Make sure to call this function after event-threads are - * reconfigured as programs' threadcount will be made equal to event threads. - */ - -int -rpcsvc_ownthread_reconf(rpcsvc_t *svc, int new_eventthreadcount) -{ - int ret = -1; - rpcsvc_program_t *program = NULL; - - if (!svc) { - ret = 0; - goto out; - } - - pthread_rwlock_wrlock(&svc->rpclock); - { - list_for_each_entry(program, &svc->programs, program) - { - if (program->ownthread) { - program->eventthreadcount = new_eventthreadcount; - rpcsvc_spawn_threads(svc, program); - } - } - } - pthread_rwlock_unlock(&svc->rpclock); - - ret = 0; -out: - return ret; -} - rpcsvc_actor_t gluster_dump_actors[GF_DUMP_MAXVALUE] = { [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA}, [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0, DRC_NA}, diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index ebb836fba3f..8388dd404c5 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -33,6 +33,16 @@ #define MAX_IOVEC 16 #endif +/* TODO: we should store prognums at a centralized location to avoid conflict + or use a robust random number generator to avoid conflicts +*/ + +#define RPCSVC_INFRA_PROGRAM 7712846 /* random number */ + +typedef enum { + RPCSVC_PROC_EVENT_THREAD_DEATH = 0, +} rpcsvc_infra_procnum_t; + #define RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT \ 64 /* Default for protocol/server */ #define RPCSVC_DEF_NFS_OUTSTANDING_RPC_LIMIT 16 /* Default for nfs/server */ @@ -362,6 +372,16 @@ typedef struct rpcsvc_actor_desc { drc_op_type_t op_type; } rpcsvc_actor_t; +typedef struct rpcsvc_request_queue { + int gen; + struct list_head request_queue; + pthread_mutex_t queue_lock; + pthread_cond_t queue_cond; + pthread_t thread; + struct rpcsvc_program *program; + gf_boolean_t waiting; +} rpcsvc_request_queue_t; + /* Describes a program and its version along with the function pointers * required to handle the procedures/actors of each program/version. * Never changed ever by any thread so no need for a lock. @@ -421,11 +441,14 @@ struct rpcsvc_program { gf_boolean_t synctask; /* list member to link to list of registered services with rpcsvc */ struct list_head program; - struct list_head request_queue; - pthread_mutex_t queue_lock; - pthread_cond_t queue_cond; - pthread_t thread; + rpcsvc_request_queue_t request_queue[EVENT_MAX_THREADS]; + char request_queue_status[EVENT_MAX_THREADS / 8 + 1]; + pthread_mutex_t thr_lock; + pthread_cond_t thr_cond; int threadcount; + int thr_queue; + pthread_key_t req_queue_key; + /* eventthreadcount is just a readonly copy of the actual value * owned by the event sub-system * It is used to control the scaling of rpcsvc_request_handler threads @@ -652,9 +675,6 @@ rpcsvc_auth_array(rpcsvc_t *svc, char *volname, int *autharr, int arrlen); rpcsvc_vector_sizer rpcsvc_get_program_vector_sizer(rpcsvc_t *svc, uint32_t prognum, uint32_t progver, int procnum); -extern int -rpcsvc_ownthread_reconf(rpcsvc_t *svc, int new_eventthreadcount); - void rpcsvc_autoscale_threads(glusterfs_ctx_t *ctx, rpcsvc_t *rpc, int incr); #endif diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index dc227137d57..776e647d4f6 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -2859,7 +2859,7 @@ socket_complete_connection(rpc_transport_t *this) /* reads rpc_requests during pollin */ static int socket_event_handler(int fd, int idx, int gen, void *data, int poll_in, - int poll_out, int poll_err) + int poll_out, int poll_err, char event_thread_died) { rpc_transport_t *this = NULL; socket_private_t *priv = NULL; @@ -2869,6 +2869,11 @@ socket_event_handler(int fd, int idx, int gen, void *data, int poll_in, this = data; + if (event_thread_died) { + /* to avoid duplicate notifications, notify only for listener sockets */ + return 0; + } + GF_VALIDATE_OR_GOTO("socket", this, out); GF_VALIDATE_OR_GOTO("socket", this->private, out); GF_VALIDATE_OR_GOTO("socket", this->xl, out); @@ -2967,7 +2972,7 @@ out: static int socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in, - int poll_out, int poll_err) + int poll_out, int poll_err, char event_thread_died) { rpc_transport_t *this = NULL; socket_private_t *priv = NULL; @@ -2991,6 +2996,12 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in, priv = this->private; ctx = this->ctx; + if (event_thread_died) { + rpc_transport_notify(this, RPC_TRANSPORT_EVENT_THREAD_DIED, + (void *)(unsigned long)gen); + return 0; + } + /* NOTE: * We have done away with the critical section in this function. since * there's little that it helps with. There's no other code that @@ -3099,6 +3110,7 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in, new_trans->mydata = this->mydata; new_trans->notify = this->notify; new_trans->listener = this; + new_trans->notify_poller_death = this->poller_death_accept; new_priv = new_trans->private; if (new_sockaddr.ss_family == AF_UNIX) { @@ -3149,9 +3161,9 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in, ret = rpc_transport_notify(this, RPC_TRANSPORT_ACCEPT, new_trans); if (ret != -1) { - new_priv->idx = event_register(ctx->event_pool, new_sock, - socket_event_handler, new_trans, - 1, 0); + new_priv->idx = event_register( + ctx->event_pool, new_sock, socket_event_handler, new_trans, + 1, 0, new_trans->notify_poller_death); if (new_priv->idx == -1) { ret = -1; gf_log(this->name, GF_LOG_ERROR, @@ -3530,7 +3542,8 @@ socket_connect(rpc_transport_t *this, int port) this->listener = this; priv->idx = event_register(ctx->event_pool, priv->sock, - socket_event_handler, this, 1, 1); + socket_event_handler, this, 1, 1, + this->notify_poller_death); if (priv->idx == -1) { gf_log("", GF_LOG_WARNING, "failed to register the event; " @@ -3709,7 +3722,8 @@ socket_listen(rpc_transport_t *this) rpc_transport_ref(this); priv->idx = event_register(ctx->event_pool, priv->sock, - socket_server_event_handler, this, 1, 0); + socket_server_event_handler, this, 1, 0, + this->notify_poller_death); if (priv->idx == -1) { gf_log(this->name, GF_LOG_WARNING, diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c index 8d8e8fc5718..77e5d74e7c5 100644 --- a/xlators/protocol/server/src/server.c +++ b/xlators/protocol/server/src/server.c @@ -929,12 +929,6 @@ do_rpc: if (ret) goto out; - /* rpcsvc thread reconfigure should be after events thread - * reconfigure - */ - new_nthread = ((struct event_pool *)(this->ctx->event_pool)) - ->eventthreadcount; - ret = rpcsvc_ownthread_reconf(rpc_conf, new_nthread); out: THIS = oldTHIS; gf_msg_debug("", 0, "returning %d", ret); @@ -1133,6 +1127,9 @@ server_init(xlator_t *this) ret = -1; goto out; } + + ret = dict_set_int32(this->options, "notify-poller-death", 1); + ret = rpcsvc_create_listeners(conf->rpc, this->options, this->name); if (ret < 1) { gf_msg(this->name, GF_LOG_WARNING, 0, |