diff options
Diffstat (limited to 'libglusterfs/src/event-epoll.c')
| -rw-r--r-- | libglusterfs/src/event-epoll.c | 1249 |
1 files changed, 909 insertions, 340 deletions
diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c index 06b3236246a..fb4fb845b40 100644 --- a/libglusterfs/src/event-epoll.c +++ b/libglusterfs/src/event-epoll.c @@ -8,456 +8,1025 @@ cases as published by the Free Software Foundation. */ -#include <sys/poll.h> #include <pthread.h> -#include <unistd.h> -#include <fcntl.h> #include <stdlib.h> #include <errno.h> -#include <string.h> - -#include "logging.h" -#include "event.h" -#include "mem-pool.h" -#include "common-utils.h" - -#ifndef _CONFIG_H -#define _CONFIG_H -#include "config.h" -#endif +#include "glusterfs/gf-event.h" +#include "glusterfs/common-utils.h" +#include "glusterfs/syscall.h" +#include "glusterfs/libglusterfs-messages.h" #ifdef HAVE_SYS_EPOLL_H #include <sys/epoll.h> +struct event_slot_epoll { + int fd; + int events; + int gen; + int idx; + gf_atomic_t ref; + int do_close; + int in_handler; + int handled_error; + void *data; + event_handler_t handler; + gf_lock_t lock; + struct list_head poller_death; +}; -static int -__event_getindex (struct event_pool *event_pool, int fd, int idx) +struct event_thread_data { + struct event_pool *event_pool; + int event_index; +}; + +static struct event_slot_epoll * +__event_newtable(struct event_pool *event_pool, int table_idx) { - int ret = -1; - int i = 0; + struct event_slot_epoll *table = NULL; + int i = -1; - GF_VALIDATE_OR_GOTO ("event", event_pool, out); + table = GF_CALLOC(sizeof(*table), EVENT_EPOLL_SLOTS, gf_common_mt_ereg); + if (!table) + return NULL; - if (idx > -1 && idx < event_pool->used) { - if (event_pool->reg[idx].fd == fd) - ret = idx; - } + for (i = 0; i < EVENT_EPOLL_SLOTS; i++) { + table[i].fd = -1; + LOCK_INIT(&table[i].lock); + INIT_LIST_HEAD(&table[i].poller_death); + } - for (i=0; ret == -1 && i<event_pool->used; i++) { - if (event_pool->reg[i].fd == fd) { - ret = i; - break; - } - } + event_pool->ereg[table_idx] = table; + event_pool->slots_used[table_idx] = 0; -out: - return ret; + return table; } +static int +event_slot_ref(struct event_slot_epoll *slot) +{ + if (!slot) + return -1; + + return GF_ATOMIC_INC(slot->ref); +} -static struct event_pool * -event_pool_new_epoll (int count) +static int +__event_slot_alloc(struct event_pool *event_pool, int fd, + char notify_poller_death, struct event_slot_epoll **slot) { - struct event_pool *event_pool = NULL; - int epfd = -1; + int i = 0; + int j = 0; + int table_idx = -1; + int gen = -1; + struct event_slot_epoll *table = NULL; + +retry: + + while (i < EVENT_EPOLL_TABLES) { + switch (event_pool->slots_used[i]) { + case EVENT_EPOLL_SLOTS: + break; + case 0: + if (!event_pool->ereg[i]) { + table = __event_newtable(event_pool, i); + if (!table) + return -1; + } else { + table = event_pool->ereg[i]; + } + break; + default: + table = event_pool->ereg[i]; + break; + } - event_pool = GF_CALLOC (1, sizeof (*event_pool), - gf_common_mt_event_pool); + if (table) + /* break out of the loop */ + break; + i++; + } - if (!event_pool) - goto out; + if (!table) + return -1; - event_pool->count = count; - event_pool->reg = GF_CALLOC (event_pool->count, - sizeof (*event_pool->reg), - gf_common_mt_reg); + table_idx = i; - if (!event_pool->reg) { - GF_FREE (event_pool); - event_pool = NULL; - goto out; - } + for (j = 0; j < EVENT_EPOLL_SLOTS; j++) { + if (table[j].fd == -1) { + /* wipe everything except bump the generation */ + gen = table[j].gen; + memset(&table[j], 0, sizeof(table[j])); + table[j].gen = gen + 1; - epfd = epoll_create (count); + LOCK_INIT(&table[j].lock); + INIT_LIST_HEAD(&table[j].poller_death); - if (epfd == -1) { - gf_log ("epoll", GF_LOG_ERROR, "epoll fd creation failed (%s)", - strerror (errno)); - GF_FREE (event_pool->reg); - GF_FREE (event_pool); - event_pool = NULL; - goto out; + table[j].fd = fd; + if (notify_poller_death) { + table[j].idx = table_idx * EVENT_EPOLL_SLOTS + j; + list_add_tail(&table[j].poller_death, + &event_pool->poller_death); + } + + event_pool->slots_used[table_idx]++; + + break; } + } + + if (j == EVENT_EPOLL_SLOTS) { + table = NULL; + i++; + goto retry; + } else { + (*slot) = &table[j]; + event_slot_ref(*slot); + return table_idx * EVENT_EPOLL_SLOTS + j; + } +} - event_pool->fd = epfd; +static int +event_slot_alloc(struct event_pool *event_pool, int fd, + char notify_poller_death, struct event_slot_epoll **slot) +{ + int idx = -1; - event_pool->count = count; + pthread_mutex_lock(&event_pool->mutex); + { + idx = __event_slot_alloc(event_pool, fd, notify_poller_death, slot); + } + pthread_mutex_unlock(&event_pool->mutex); - pthread_mutex_init (&event_pool->mutex, NULL); - pthread_cond_init (&event_pool->cond, NULL); + return idx; +} -out: - return event_pool; +static void +__event_slot_dealloc(struct event_pool *event_pool, int idx) +{ + int table_idx = 0; + int offset = 0; + struct event_slot_epoll *table = NULL; + struct event_slot_epoll *slot = NULL; + int fd = -1; + + table_idx = idx / EVENT_EPOLL_SLOTS; + offset = idx % EVENT_EPOLL_SLOTS; + + table = event_pool->ereg[table_idx]; + if (!table) + return; + + slot = &table[offset]; + slot->gen++; + + fd = slot->fd; + slot->fd = -1; + slot->handled_error = 0; + slot->in_handler = 0; + list_del_init(&slot->poller_death); + if (fd != -1) + event_pool->slots_used[table_idx]--; + + return; } +static void +event_slot_dealloc(struct event_pool *event_pool, int idx) +{ + pthread_mutex_lock(&event_pool->mutex); + { + __event_slot_dealloc(event_pool, idx); + } + pthread_mutex_unlock(&event_pool->mutex); -int -event_register_epoll (struct event_pool *event_pool, int fd, - event_handler_t handler, - void *data, int poll_in, int poll_out) + return; +} + +static struct event_slot_epoll * +event_slot_get(struct event_pool *event_pool, int idx) { - int idx = -1; - int ret = -1; - struct epoll_event epoll_event = {0, }; - struct event_data *ev_data = (void *)&epoll_event.data; + struct event_slot_epoll *slot = NULL; + struct event_slot_epoll *table = NULL; + int table_idx = 0; + int offset = 0; + table_idx = idx / EVENT_EPOLL_SLOTS; + offset = idx % EVENT_EPOLL_SLOTS; - GF_VALIDATE_OR_GOTO ("event", event_pool, out); + table = event_pool->ereg[table_idx]; + if (!table) + return NULL; - pthread_mutex_lock (&event_pool->mutex); - { - if (event_pool->count == event_pool->used) { - event_pool->count *= 2; + slot = &table[offset]; - event_pool->reg = GF_REALLOC (event_pool->reg, - event_pool->count * - sizeof (*event_pool->reg)); + event_slot_ref(slot); + return slot; +} - if (!event_pool->reg) { - gf_log ("epoll", GF_LOG_ERROR, - "event registry re-allocation failed"); - goto unlock; - } - } +static void +__event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot, + int idx) +{ + int ref = -1; + int fd = -1; + int do_close = 0; + + ref = GF_ATOMIC_DEC(slot->ref); + if (ref) + /* slot still alive */ + goto done; + + LOCK(&slot->lock); + { + fd = slot->fd; + do_close = slot->do_close; + slot->do_close = 0; + } + UNLOCK(&slot->lock); + + __event_slot_dealloc(event_pool, idx); + + if (do_close) + sys_close(fd); +done: + return; +} - idx = event_pool->used; - event_pool->used++; - - event_pool->reg[idx].fd = fd; - event_pool->reg[idx].events = EPOLLPRI; - event_pool->reg[idx].handler = handler; - event_pool->reg[idx].data = data; - - switch (poll_in) { - case 1: - event_pool->reg[idx].events |= EPOLLIN; - break; - case 0: - event_pool->reg[idx].events &= ~EPOLLIN; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("epoll", GF_LOG_ERROR, - "invalid poll_in value %d", poll_in); - break; - } +static void +event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot, + int idx) +{ + int ref = -1; + int fd = -1; + int do_close = 0; + + ref = GF_ATOMIC_DEC(slot->ref); + if (ref) + /* slot still alive */ + goto done; + + LOCK(&slot->lock); + { + fd = slot->fd; + do_close = slot->do_close; + slot->do_close = 0; + } + UNLOCK(&slot->lock); + + event_slot_dealloc(event_pool, idx); + + if (do_close) + sys_close(fd); +done: + return; +} - switch (poll_out) { - case 1: - event_pool->reg[idx].events |= EPOLLOUT; - break; - case 0: - event_pool->reg[idx].events &= ~EPOLLOUT; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("epoll", GF_LOG_ERROR, - "invalid poll_out value %d", poll_out); - break; - } +static struct event_pool * +event_pool_new_epoll(int count, int eventthreadcount) +{ + struct event_pool *event_pool = NULL; + int epfd = -1; - event_pool->changed = 1; + event_pool = GF_CALLOC(1, sizeof(*event_pool), gf_common_mt_event_pool); - epoll_event.events = event_pool->reg[idx].events; - ev_data->fd = fd; - ev_data->idx = idx; + if (!event_pool) + goto out; - ret = epoll_ctl (event_pool->fd, EPOLL_CTL_ADD, fd, - &epoll_event); + epfd = epoll_create(count); - if (ret == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "failed to add fd(=%d) to epoll fd(=%d) (%s)", - fd, event_pool->fd, strerror (errno)); - goto unlock; - } + if (epfd == -1) { + gf_smsg("epoll", GF_LOG_ERROR, errno, LG_MSG_EPOLL_FD_CREATE_FAILED, + NULL); + GF_FREE(event_pool->reg); + GF_FREE(event_pool); + event_pool = NULL; + goto out; + } - pthread_cond_broadcast (&event_pool->cond); - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); + event_pool->fd = epfd; + + event_pool->count = count; + INIT_LIST_HEAD(&event_pool->poller_death); + event_pool->eventthreadcount = eventthreadcount; + event_pool->auto_thread_count = 0; + + pthread_mutex_init(&event_pool->mutex, NULL); out: - return ret; + return event_pool; } - -static int -event_unregister_epoll (struct event_pool *event_pool, int fd, int idx_hint) +static void +__slot_update_events(struct event_slot_epoll *slot, int poll_in, int poll_out) { - int idx = -1; - int ret = -1; + switch (poll_in) { + case 1: + slot->events |= EPOLLIN; + break; + case 0: + slot->events &= ~EPOLLIN; + break; + case -1: + /* do nothing */ + break; + default: + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_INVALID_POLL_IN, + "value=%d", poll_in, NULL); + break; + } + + switch (poll_out) { + case 1: + slot->events |= EPOLLOUT; + break; + case 0: + slot->events &= ~EPOLLOUT; + break; + case -1: + /* do nothing */ + break; + default: + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_INVALID_POLL_OUT, + "value=%d", poll_out, NULL); + break; + } +} - struct epoll_event epoll_event = {0, }; - struct event_data *ev_data = (void *)&epoll_event.data; - int lastidx = -1; +int +event_register_epoll(struct event_pool *event_pool, int fd, + event_handler_t handler, void *data, int poll_in, + int poll_out, char notify_poller_death) +{ + int idx = -1; + int ret = -1; + int destroy = 0; + struct epoll_event epoll_event = { + 0, + }; + struct event_data *ev_data = (void *)&epoll_event.data; + struct event_slot_epoll *slot = NULL; + + GF_VALIDATE_OR_GOTO("event", event_pool, out); + + /* TODO: Even with the below check, there is a possibility of race, + * What if the destroy mode is set after the check is done. + * Not sure of the best way to prevent this race, ref counting + * is one possibility. + * There is no harm in registering and unregistering the fd + * even after destroy mode is set, just that such fds will remain + * open until unregister is called, also the events on that fd will be + * notified, until one of the poller thread is alive. + */ + pthread_mutex_lock(&event_pool->mutex); + { + destroy = event_pool->destroy; + } + pthread_mutex_unlock(&event_pool->mutex); + + if (destroy == 1) + goto out; + + idx = event_slot_alloc(event_pool, fd, notify_poller_death, &slot); + if (idx == -1) { + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND, "fd=%d", fd, + NULL); + return -1; + } + + assert(slot->fd == fd); + + LOCK(&slot->lock); + { + /* make epoll 'singleshot', which + means we need to re-add the fd with + epoll_ctl(EPOLL_CTL_MOD) after delivery of every + single event. This assures us that while a poller + thread has picked up and is processing an event, + another poller will not try to pick this at the same + time as well. + */ + + slot->events = EPOLLPRI | EPOLLHUP | EPOLLERR | EPOLLONESHOT; + slot->handler = handler; + slot->data = data; + + __slot_update_events(slot, poll_in, poll_out); + + epoll_event.events = slot->events; + ev_data->idx = idx; + ev_data->gen = slot->gen; + + ret = epoll_ctl(event_pool->fd, EPOLL_CTL_ADD, fd, &epoll_event); + /* check ret after UNLOCK() to avoid deadlock in + event_slot_unref() + */ + } + UNLOCK(&slot->lock); + + if (ret == -1) { + gf_smsg("epoll", GF_LOG_ERROR, errno, LG_MSG_EPOLL_FD_ADD_FAILED, + "fd=%d", fd, "epoll_fd=%d", event_pool->fd, NULL); + event_slot_unref(event_pool, slot, idx); + idx = -1; + } + + /* keep slot->ref (do not event_slot_unref) if successful */ +out: + return idx; +} - GF_VALIDATE_OR_GOTO ("event", event_pool, out); +static int +event_unregister_epoll_common(struct event_pool *event_pool, int fd, int idx, + int do_close) +{ + int ret = -1; + struct event_slot_epoll *slot = NULL; + + GF_VALIDATE_OR_GOTO("event", event_pool, out); + + /* During shutdown, it may happen that a socket registration with + * the event sub-system may fail and an rpc_transport_unref() may + * be called for such an unregistered socket with idx == -1. This + * may cause the following assert(slot->fd == fd) to fail. + */ + if (idx < 0) + goto out; + + slot = event_slot_get(event_pool, idx); + if (!slot) { + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND, "fd=%d", fd, + "idx=%d", idx, NULL); + return -1; + } + + assert(slot->fd == fd); + + LOCK(&slot->lock); + { + ret = epoll_ctl(event_pool->fd, EPOLL_CTL_DEL, fd, NULL); + + if (ret == -1) { + gf_smsg("epoll", GF_LOG_ERROR, errno, LG_MSG_EPOLL_FD_DEL_FAILED, + "fd=%d", fd, "epoll_fd=%d", event_pool->fd, NULL); + goto unlock; + } - pthread_mutex_lock (&event_pool->mutex); - { - idx = __event_getindex (event_pool, fd, idx_hint); + slot->do_close = do_close; + slot->gen++; /* detect unregister in dispatch_handler() */ + } +unlock: + UNLOCK(&slot->lock); - if (idx == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "index not found for fd=%d (idx_hint=%d)", - fd, idx_hint); - errno = ENOENT; - goto unlock; - } + event_slot_unref(event_pool, slot, idx); /* one for event_register() */ + event_slot_unref(event_pool, slot, idx); /* one for event_slot_get() */ +out: + return ret; +} - ret = epoll_ctl (event_pool->fd, EPOLL_CTL_DEL, fd, NULL); +static int +event_unregister_epoll(struct event_pool *event_pool, int fd, int idx_hint) +{ + int ret = -1; - /* if ret is -1, this array member should never be accessed */ - /* if it is 0, the array member might be used by idx_cache - * in which case the member should not be accessed till - * it is reallocated - */ + ret = event_unregister_epoll_common(event_pool, fd, idx_hint, 0); - event_pool->reg[idx].fd = -1; + return ret; +} - if (ret == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "fail to del fd(=%d) from epoll fd(=%d) (%s)", - fd, event_pool->fd, strerror (errno)); - goto unlock; - } +static int +event_unregister_close_epoll(struct event_pool *event_pool, int fd, + int idx_hint) +{ + int ret = -1; - lastidx = event_pool->used - 1; - if (lastidx == idx) { - event_pool->used--; - goto unlock; - } + ret = event_unregister_epoll_common(event_pool, fd, idx_hint, 1); - epoll_event.events = event_pool->reg[lastidx].events; - ev_data->fd = event_pool->reg[lastidx].fd; - ev_data->idx = idx; - - ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, ev_data->fd, - &epoll_event); - if (ret == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "fail to modify fd(=%d) index %d to %d (%s)", - ev_data->fd, event_pool->used, idx, - strerror (errno)); - goto unlock; - } + return ret; +} - /* just replace the unregistered idx by last one */ - event_pool->reg[idx] = event_pool->reg[lastidx]; - event_pool->used--; +static int +event_select_on_epoll(struct event_pool *event_pool, int fd, int idx, + int poll_in, int poll_out) +{ + int ret = -1; + struct event_slot_epoll *slot = NULL; + struct epoll_event epoll_event = { + 0, + }; + struct event_data *ev_data = (void *)&epoll_event.data; + + GF_VALIDATE_OR_GOTO("event", event_pool, out); + + slot = event_slot_get(event_pool, idx); + if (!slot) { + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND, "fd=%d", fd, + "idx=%d", idx, NULL); + return -1; + } + + assert(slot->fd == fd); + + LOCK(&slot->lock); + { + __slot_update_events(slot, poll_in, poll_out); + + epoll_event.events = slot->events; + ev_data->idx = idx; + ev_data->gen = slot->gen; + + if (slot->in_handler) + /* + * in_handler indicates at least one thread + * executing event_dispatch_epoll_handler() + * which will perform epoll_ctl(EPOLL_CTL_MOD) + * anyways (because of EPOLLET) + * + * This not only saves a system call, but also + * avoids possibility of another epoll thread + * picking up the next event while the ongoing + * handler is still in progress (and resulting + * in unnecessary contention on rpc_transport_t->mutex). + */ + goto unlock; + + ret = epoll_ctl(event_pool->fd, EPOLL_CTL_MOD, fd, &epoll_event); + if (ret == -1) { + gf_smsg("epoll", GF_LOG_ERROR, errno, LG_MSG_EPOLL_FD_MODIFY_FAILED, + "fd=%d", fd, "events=%d", epoll_event.events, NULL); } + } unlock: - pthread_mutex_unlock (&event_pool->mutex); + UNLOCK(&slot->lock); + + event_slot_unref(event_pool, slot, idx); out: - return ret; + return idx; } - static int -event_select_on_epoll (struct event_pool *event_pool, int fd, int idx_hint, - int poll_in, int poll_out) +event_dispatch_epoll_handler(struct event_pool *event_pool, + struct epoll_event *event) { - int idx = -1; - int ret = -1; + struct event_data *ev_data = NULL; + struct event_slot_epoll *slot = NULL; + event_handler_t handler = NULL; + void *data = NULL; + int idx = -1; + int gen = -1; + int ret = -1; + int fd = -1; + gf_boolean_t handled_error_previously = _gf_false; + + ev_data = (void *)&event->data; + handler = NULL; + data = NULL; + + idx = ev_data->idx; + gen = ev_data->gen; + + slot = event_slot_get(event_pool, idx); + if (!slot) { + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND, "idx=%d", idx, + NULL); + return -1; + } + + LOCK(&slot->lock); + { + fd = slot->fd; + if (fd == -1) { + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_STALE_FD_FOUND, "idx=%d", + idx, "gen=%d", gen, "events=%d", event->events, + "slot->gen=%d", slot->gen, NULL); + /* fd got unregistered in another thread */ + goto pre_unlock; + } + + if (gen != slot->gen) { + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_GENERATION_MISMATCH, + "idx=%d", idx, "gen=%d", gen, "slot->gen=%d", slot->gen, + "slot->fd=%d", slot->fd, NULL); + /* slot was re-used and therefore is another fd! */ + goto pre_unlock; + } - struct epoll_event epoll_event = {0, }; - struct event_data *ev_data = (void *)&epoll_event.data; + handler = slot->handler; + data = slot->data; + if (slot->in_handler > 0) { + /* Another handler is inprogress, skip this one. */ + handler = NULL; + goto pre_unlock; + } - GF_VALIDATE_OR_GOTO ("event", event_pool, out); + if (slot->handled_error) { + handled_error_previously = _gf_true; + } else { + slot->handled_error = (event->events & (EPOLLERR | EPOLLHUP)); + slot->in_handler++; + } + } +pre_unlock: + UNLOCK(&slot->lock); - pthread_mutex_lock (&event_pool->mutex); - { - idx = __event_getindex (event_pool, fd, idx_hint); + ret = 0; - if (idx == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "index not found for fd=%d (idx_hint=%d)", - fd, idx_hint); - errno = ENOENT; - goto unlock; - } + if (!handler) + goto out; - switch (poll_in) { - case 1: - event_pool->reg[idx].events |= EPOLLIN; - break; - case 0: - event_pool->reg[idx].events &= ~EPOLLIN; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("epoll", GF_LOG_ERROR, - "invalid poll_in value %d", poll_in); - break; + if (!handled_error_previously) { + handler(fd, idx, gen, data, (event->events & (EPOLLIN | EPOLLPRI)), + (event->events & (EPOLLOUT)), + (event->events & (EPOLLERR | EPOLLHUP)), 0); + } +out: + event_slot_unref(event_pool, slot, idx); + + return ret; +} + +static void * +event_dispatch_epoll_worker(void *data) +{ + struct epoll_event event; + int ret = -1; + struct event_thread_data *ev_data = data; + struct event_pool *event_pool; + int myindex = -1; + int timetodie = 0, gen = 0; + struct list_head poller_death_notify; + struct event_slot_epoll *slot = NULL, *tmp = NULL; + + GF_VALIDATE_OR_GOTO("event", ev_data, out); + + event_pool = ev_data->event_pool; + myindex = ev_data->event_index; + + GF_VALIDATE_OR_GOTO("event", event_pool, out); + + gf_smsg("epoll", GF_LOG_INFO, 0, LG_MSG_STARTED_EPOLL_THREAD, "index=%d", + myindex - 1, NULL); + + pthread_mutex_lock(&event_pool->mutex); + { + event_pool->activethreadcount++; + } + pthread_mutex_unlock(&event_pool->mutex); + + for (;;) { + if (event_pool->eventthreadcount < myindex) { + /* ...time to die, thread count was decreased below + * this threads index */ + /* Start with extra safety at this point, reducing + * lock conention in normal case when threads are not + * reconfigured always */ + pthread_mutex_lock(&event_pool->mutex); + { + if (event_pool->eventthreadcount < myindex) { + while (event_pool->poller_death_sliced) { + pthread_cond_wait(&event_pool->cond, + &event_pool->mutex); + } + + INIT_LIST_HEAD(&poller_death_notify); + /* if found true in critical section, + * die */ + event_pool->pollers[myindex - 1] = 0; + event_pool->activethreadcount--; + timetodie = 1; + gen = ++event_pool->poller_gen; + list_for_each_entry(slot, &event_pool->poller_death, + poller_death) + { + event_slot_ref(slot); + } + + list_splice_init(&event_pool->poller_death, + &poller_death_notify); + event_pool->poller_death_sliced = 1; + pthread_cond_broadcast(&event_pool->cond); + } + } + pthread_mutex_unlock(&event_pool->mutex); + if (timetodie) { + list_for_each_entry(slot, &poller_death_notify, poller_death) + { + slot->handler(slot->fd, 0, gen, slot->data, 0, 0, 0, 1); } - switch (poll_out) { - case 1: - event_pool->reg[idx].events |= EPOLLOUT; - break; - case 0: - event_pool->reg[idx].events &= ~EPOLLOUT; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("epoll", GF_LOG_ERROR, - "invalid poll_out value %d", poll_out); - break; + pthread_mutex_lock(&event_pool->mutex); + { + list_for_each_entry_safe(slot, tmp, &poller_death_notify, + poller_death) + { + __event_slot_unref(event_pool, slot, slot->idx); + } + + list_splice(&poller_death_notify, + &event_pool->poller_death); + event_pool->poller_death_sliced = 0; + pthread_cond_broadcast(&event_pool->cond); } + pthread_mutex_unlock(&event_pool->mutex); - epoll_event.events = event_pool->reg[idx].events; - ev_data->fd = fd; - ev_data->idx = idx; + gf_smsg("epoll", GF_LOG_INFO, 0, LG_MSG_EXITED_EPOLL_THREAD, + "index=%d", myindex, NULL); - ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, fd, - &epoll_event); - if (ret == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "failed to modify fd(=%d) events to %d", - fd, epoll_event.events); - } + goto out; + } } -unlock: - pthread_mutex_unlock (&event_pool->mutex); + ret = epoll_wait(event_pool->fd, &event, 1, -1); + + if (ret == 0) + /* timeout */ + continue; + + if (ret == -1 && errno == EINTR) + /* sys call */ + continue; + + ret = event_dispatch_epoll_handler(event_pool, &event); + if (ret) { + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_DISPATCH_HANDLER_FAILED, + NULL); + } + } out: - return ret; + if (ev_data) + GF_FREE(ev_data); + return NULL; } - +/* Attempts to start the # of configured pollers, ensuring at least the first + * is started in a joinable state */ static int -event_dispatch_epoll_handler (struct event_pool *event_pool, - struct epoll_event *events, int i) +event_dispatch_epoll(struct event_pool *event_pool) { - struct event_data *event_data = NULL; - event_handler_t handler = NULL; - void *data = NULL; - int idx = -1; - int ret = -1; - - - event_data = (void *)&events[i].data; - handler = NULL; - data = NULL; - - pthread_mutex_lock (&event_pool->mutex); - { - idx = __event_getindex (event_pool, event_data->fd, - event_data->idx); - - if (idx == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "index not found for fd(=%d) (idx_hint=%d)", - event_data->fd, event_data->idx); - goto unlock; + int i = 0; + pthread_t t_id; + int pollercount = 0; + int ret = -1; + struct event_thread_data *ev_data = NULL; + + /* Start the configured number of pollers */ + pthread_mutex_lock(&event_pool->mutex); + { + pollercount = event_pool->eventthreadcount; + + /* Set to MAX if greater */ + if (pollercount > EVENT_MAX_THREADS) + pollercount = EVENT_MAX_THREADS; + + /* Default pollers to 1 in case this is incorrectly set */ + if (pollercount <= 0) + pollercount = 1; + + event_pool->activethreadcount++; + + for (i = 0; i < pollercount; i++) { + ev_data = GF_CALLOC(1, sizeof(*ev_data), gf_common_mt_event_pool); + if (!ev_data) { + if (i == 0) { + /* Need to succeed creating 0'th + * thread, to joinable and wait */ + break; + } else { + /* Inability to create other threads + * are a lesser evil, and ignored */ + continue; } - - handler = event_pool->reg[idx].handler; - data = event_pool->reg[idx].data; + } + + ev_data->event_pool = event_pool; + ev_data->event_index = i + 1; + + ret = gf_thread_create(&t_id, NULL, event_dispatch_epoll_worker, + ev_data, "epoll%03hx", i & 0x3ff); + if (!ret) { + event_pool->pollers[i] = t_id; + + /* mark all threads other than one in index 0 + * as detachable. Errors can be ignored, they + * spend their time as zombies if not detched + * and the thread counts are decreased */ + if (i != 0) + pthread_detach(event_pool->pollers[i]); + } else { + gf_smsg("epoll", GF_LOG_WARNING, 0, + LG_MSG_START_EPOLL_THREAD_FAILED, "index=%d", i, NULL); + if (i == 0) { + GF_FREE(ev_data); + break; + } else { + GF_FREE(ev_data); + continue; + } + } } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - - if (handler) - ret = handler (event_data->fd, event_data->idx, data, - (events[i].events & (EPOLLIN|EPOLLPRI)), - (events[i].events & (EPOLLOUT)), - (events[i].events & (EPOLLERR|EPOLLHUP))); - return ret; + } + pthread_mutex_unlock(&event_pool->mutex); + + /* Just wait for the first thread, that is created in a joinable state + * and will never die, ensuring this function never returns */ + if (event_pool->pollers[0] != 0) + pthread_join(event_pool->pollers[0], NULL); + + pthread_mutex_lock(&event_pool->mutex); + { + event_pool->activethreadcount--; + } + pthread_mutex_unlock(&event_pool->mutex); + + return ret; } +/** + * @param event_pool event_pool on which fds of interest are registered for + * events. + * + * @return 1 if at least one epoll worker thread is spawned, 0 otherwise + * + * NB This function SHOULD be called under event_pool->mutex. + */ static int -event_dispatch_epoll (struct event_pool *event_pool) +event_pool_dispatched_unlocked(struct event_pool *event_pool) { - struct epoll_event *events = NULL; - int size = 0; - int i = 0; - int ret = -1; - - GF_VALIDATE_OR_GOTO ("event", event_pool, out); - - while (1) { - pthread_mutex_lock (&event_pool->mutex); - { - while (event_pool->used == 0) - pthread_cond_wait (&event_pool->cond, - &event_pool->mutex); + return (event_pool->pollers[0] != 0); +} - if (event_pool->used > event_pool->evcache_size) { - GF_FREE (event_pool->evcache); +int +event_reconfigure_threads_epoll(struct event_pool *event_pool, int value) +{ + int i; + int ret = 0; + pthread_t t_id; + int oldthreadcount; + struct event_thread_data *ev_data = NULL; + + pthread_mutex_lock(&event_pool->mutex); + { + /* Reconfigure to 0 threads is allowed only in destroy mode */ + if (event_pool->destroy == 1) { + value = 0; + } else { + /* Set to MAX if greater */ + if (value > EVENT_MAX_THREADS) + value = EVENT_MAX_THREADS; + + /* Default pollers to 1 in case this is set incorrectly */ + if (value <= 0) + value = 1; + } - event_pool->evcache = events = NULL; + oldthreadcount = event_pool->eventthreadcount; + + /* Start 'worker' threads as necessary only if event_dispatch() + * was called before. If event_dispatch() was not called, there + * will be no epoll 'worker' threads running yet. */ + + if (event_pool_dispatched_unlocked(event_pool) && + (oldthreadcount < value)) { + /* create more poll threads */ + for (i = oldthreadcount; i < value; i++) { + /* Start a thread if the index at this location + * is a 0, so that the older thread is confirmed + * as dead */ + if (event_pool->pollers[i] == 0) { + ev_data = GF_CALLOC(1, sizeof(*ev_data), + gf_common_mt_event_pool); + if (!ev_data) { + continue; + } + + ev_data->event_pool = event_pool; + ev_data->event_index = i + 1; + + ret = gf_thread_create(&t_id, NULL, + event_dispatch_epoll_worker, ev_data, + "epoll%03hx", i & 0x3ff); + if (ret) { + gf_smsg("epoll", GF_LOG_WARNING, 0, + LG_MSG_START_EPOLL_THREAD_FAILED, "index=%d", i, + NULL); + GF_FREE(ev_data); + } else { + pthread_detach(t_id); + event_pool->pollers[i] = t_id; + } + } + } + } - event_pool->evcache_size = - event_pool->used + 256; + /* if value decreases, threads will terminate, themselves */ + event_pool->eventthreadcount = value; + } + pthread_mutex_unlock(&event_pool->mutex); - events = GF_CALLOC (event_pool->evcache_size, - sizeof (struct epoll_event), - gf_common_mt_epoll_event); - if (!events) - break; + return 0; +} - event_pool->evcache = events; - } - } - pthread_mutex_unlock (&event_pool->mutex); +/* This function is the destructor for the event_pool data structure + * Should be called only after poller_threads_destroy() is called, + * else will lead to crashes. + */ +static int +event_pool_destroy_epoll(struct event_pool *event_pool) +{ + int ret = 0, i = 0, j = 0; + struct event_slot_epoll *table = NULL; + + ret = sys_close(event_pool->fd); + + for (i = 0; i < EVENT_EPOLL_TABLES; i++) { + if (event_pool->ereg[i]) { + table = event_pool->ereg[i]; + event_pool->ereg[i] = NULL; + for (j = 0; j < EVENT_EPOLL_SLOTS; j++) { + LOCK_DESTROY(&table[j].lock); + } + GF_FREE(table); + } + } - ret = epoll_wait (event_pool->fd, event_pool->evcache, - event_pool->evcache_size, -1); + pthread_mutex_destroy(&event_pool->mutex); + pthread_cond_destroy(&event_pool->cond); - if (ret == 0) - /* timeout */ - continue; + GF_FREE(event_pool->evcache); + GF_FREE(event_pool->reg); + GF_FREE(event_pool); - if (ret == -1 && errno == EINTR) - /* sys call */ - continue; + return ret; +} - size = ret; +static int +event_handled_epoll(struct event_pool *event_pool, int fd, int idx, int gen) +{ + struct event_slot_epoll *slot = NULL; + struct epoll_event epoll_event = { + 0, + }; + struct event_data *ev_data = (void *)&epoll_event.data; + int ret = 0; + + slot = event_slot_get(event_pool, idx); + if (!slot) { + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND, "fd=%d", fd, + "idx=%d", idx, NULL); + return -1; + } + + assert(slot->fd == fd); + + LOCK(&slot->lock); + { + slot->in_handler--; + + if (gen != slot->gen) { + /* event_unregister() happened while we were + in handler() + */ + gf_msg_debug("epoll", 0, + "generation bumped on idx=%d" + " from gen=%d to slot->gen=%d, fd=%d, " + "slot->fd=%d", + idx, gen, slot->gen, fd, slot->fd); + goto unlock; + } - for (i = 0; i < size; i++) { - if (!events || !events[i].events) - continue; + /* This call also picks up the changes made by another + thread calling event_select_on_epoll() while this + thread was busy in handler() + */ + if (slot->in_handler == 0) { + epoll_event.events = slot->events; + ev_data->idx = idx; + ev_data->gen = gen; - ret = event_dispatch_epoll_handler (event_pool, - events, i); - } + ret = epoll_ctl(event_pool->fd, EPOLL_CTL_MOD, fd, &epoll_event); } + } +unlock: + UNLOCK(&slot->lock); -out: - return ret; -} + event_slot_unref(event_pool, slot, idx); + return ret; +} struct event_ops event_ops_epoll = { - .new = event_pool_new_epoll, - .event_register = event_register_epoll, - .event_select_on = event_select_on_epoll, - .event_unregister = event_unregister_epoll, - .event_dispatch = event_dispatch_epoll + .new = event_pool_new_epoll, + .event_register = event_register_epoll, + .event_select_on = event_select_on_epoll, + .event_unregister = event_unregister_epoll, + .event_unregister_close = event_unregister_close_epoll, + .event_dispatch = event_dispatch_epoll, + .event_reconfigure_threads = event_reconfigure_threads_epoll, + .event_pool_destroy = event_pool_destroy_epoll, + .event_handled = event_handled_epoll, }; #endif |
