diff options
Diffstat (limited to 'libglusterfs/src/event-epoll.c')
| -rw-r--r-- | libglusterfs/src/event-epoll.c | 1032 |
1 files changed, 1032 insertions, 0 deletions
diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c new file mode 100644 index 00000000000..fb4fb845b40 --- /dev/null +++ b/libglusterfs/src/event-epoll.c @@ -0,0 +1,1032 @@ +/* + Copyright (c) 2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include <pthread.h> +#include <stdlib.h> +#include <errno.h> + +#include "glusterfs/gf-event.h" +#include "glusterfs/common-utils.h" +#include "glusterfs/syscall.h" +#include "glusterfs/libglusterfs-messages.h" + +#ifdef HAVE_SYS_EPOLL_H +#include <sys/epoll.h> + +struct event_slot_epoll { + int fd; + int events; + int gen; + int idx; + gf_atomic_t ref; + int do_close; + int in_handler; + int handled_error; + void *data; + event_handler_t handler; + gf_lock_t lock; + struct list_head poller_death; +}; + +struct event_thread_data { + struct event_pool *event_pool; + int event_index; +}; + +static struct event_slot_epoll * +__event_newtable(struct event_pool *event_pool, int table_idx) +{ + struct event_slot_epoll *table = NULL; + int i = -1; + + table = GF_CALLOC(sizeof(*table), EVENT_EPOLL_SLOTS, gf_common_mt_ereg); + if (!table) + return NULL; + + for (i = 0; i < EVENT_EPOLL_SLOTS; i++) { + table[i].fd = -1; + LOCK_INIT(&table[i].lock); + INIT_LIST_HEAD(&table[i].poller_death); + } + + event_pool->ereg[table_idx] = table; + event_pool->slots_used[table_idx] = 0; + + return table; +} + +static int +event_slot_ref(struct event_slot_epoll *slot) +{ + if (!slot) + return -1; + + return GF_ATOMIC_INC(slot->ref); +} + +static int +__event_slot_alloc(struct event_pool *event_pool, int fd, + char notify_poller_death, struct event_slot_epoll **slot) +{ + int i = 0; + int j = 0; + int table_idx = -1; + int gen = -1; + struct event_slot_epoll *table = NULL; + +retry: + + while (i < EVENT_EPOLL_TABLES) { + switch (event_pool->slots_used[i]) { + case EVENT_EPOLL_SLOTS: + break; + case 0: + if (!event_pool->ereg[i]) { + table = __event_newtable(event_pool, i); + if (!table) + return -1; + } else { + table = event_pool->ereg[i]; + } + break; + default: + table = event_pool->ereg[i]; + break; + } + + if (table) + /* break out of the loop */ + break; + i++; + } + + if (!table) + return -1; + + table_idx = i; + + for (j = 0; j < EVENT_EPOLL_SLOTS; j++) { + if (table[j].fd == -1) { + /* wipe everything except bump the generation */ + gen = table[j].gen; + memset(&table[j], 0, sizeof(table[j])); + table[j].gen = gen + 1; + + LOCK_INIT(&table[j].lock); + INIT_LIST_HEAD(&table[j].poller_death); + + table[j].fd = fd; + if (notify_poller_death) { + table[j].idx = table_idx * EVENT_EPOLL_SLOTS + j; + list_add_tail(&table[j].poller_death, + &event_pool->poller_death); + } + + event_pool->slots_used[table_idx]++; + + break; + } + } + + if (j == EVENT_EPOLL_SLOTS) { + table = NULL; + i++; + goto retry; + } else { + (*slot) = &table[j]; + event_slot_ref(*slot); + return table_idx * EVENT_EPOLL_SLOTS + j; + } +} + +static int +event_slot_alloc(struct event_pool *event_pool, int fd, + char notify_poller_death, struct event_slot_epoll **slot) +{ + int idx = -1; + + pthread_mutex_lock(&event_pool->mutex); + { + idx = __event_slot_alloc(event_pool, fd, notify_poller_death, slot); + } + pthread_mutex_unlock(&event_pool->mutex); + + return idx; +} + +static void +__event_slot_dealloc(struct event_pool *event_pool, int idx) +{ + int table_idx = 0; + int offset = 0; + struct event_slot_epoll *table = NULL; + struct event_slot_epoll *slot = NULL; + int fd = -1; + + table_idx = idx / EVENT_EPOLL_SLOTS; + offset = idx % EVENT_EPOLL_SLOTS; + + table = event_pool->ereg[table_idx]; + if (!table) + return; + + slot = &table[offset]; + slot->gen++; + + fd = slot->fd; + slot->fd = -1; + slot->handled_error = 0; + slot->in_handler = 0; + list_del_init(&slot->poller_death); + if (fd != -1) + event_pool->slots_used[table_idx]--; + + return; +} + +static void +event_slot_dealloc(struct event_pool *event_pool, int idx) +{ + pthread_mutex_lock(&event_pool->mutex); + { + __event_slot_dealloc(event_pool, idx); + } + pthread_mutex_unlock(&event_pool->mutex); + + return; +} + +static struct event_slot_epoll * +event_slot_get(struct event_pool *event_pool, int idx) +{ + struct event_slot_epoll *slot = NULL; + struct event_slot_epoll *table = NULL; + int table_idx = 0; + int offset = 0; + + table_idx = idx / EVENT_EPOLL_SLOTS; + offset = idx % EVENT_EPOLL_SLOTS; + + table = event_pool->ereg[table_idx]; + if (!table) + return NULL; + + slot = &table[offset]; + + event_slot_ref(slot); + return slot; +} + +static void +__event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot, + int idx) +{ + int ref = -1; + int fd = -1; + int do_close = 0; + + ref = GF_ATOMIC_DEC(slot->ref); + if (ref) + /* slot still alive */ + goto done; + + LOCK(&slot->lock); + { + fd = slot->fd; + do_close = slot->do_close; + slot->do_close = 0; + } + UNLOCK(&slot->lock); + + __event_slot_dealloc(event_pool, idx); + + if (do_close) + sys_close(fd); +done: + return; +} + +static void +event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot, + int idx) +{ + int ref = -1; + int fd = -1; + int do_close = 0; + + ref = GF_ATOMIC_DEC(slot->ref); + if (ref) + /* slot still alive */ + goto done; + + LOCK(&slot->lock); + { + fd = slot->fd; + do_close = slot->do_close; + slot->do_close = 0; + } + UNLOCK(&slot->lock); + + event_slot_dealloc(event_pool, idx); + + if (do_close) + sys_close(fd); +done: + return; +} + +static struct event_pool * +event_pool_new_epoll(int count, int eventthreadcount) +{ + struct event_pool *event_pool = NULL; + int epfd = -1; + + event_pool = GF_CALLOC(1, sizeof(*event_pool), gf_common_mt_event_pool); + + if (!event_pool) + goto out; + + epfd = epoll_create(count); + + if (epfd == -1) { + gf_smsg("epoll", GF_LOG_ERROR, errno, LG_MSG_EPOLL_FD_CREATE_FAILED, + NULL); + GF_FREE(event_pool->reg); + GF_FREE(event_pool); + event_pool = NULL; + goto out; + } + + event_pool->fd = epfd; + + event_pool->count = count; + INIT_LIST_HEAD(&event_pool->poller_death); + event_pool->eventthreadcount = eventthreadcount; + event_pool->auto_thread_count = 0; + + pthread_mutex_init(&event_pool->mutex, NULL); + +out: + return event_pool; +} + +static void +__slot_update_events(struct event_slot_epoll *slot, int poll_in, int poll_out) +{ + switch (poll_in) { + case 1: + slot->events |= EPOLLIN; + break; + case 0: + slot->events &= ~EPOLLIN; + break; + case -1: + /* do nothing */ + break; + default: + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_INVALID_POLL_IN, + "value=%d", poll_in, NULL); + break; + } + + switch (poll_out) { + case 1: + slot->events |= EPOLLOUT; + break; + case 0: + slot->events &= ~EPOLLOUT; + break; + case -1: + /* do nothing */ + break; + default: + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_INVALID_POLL_OUT, + "value=%d", poll_out, NULL); + break; + } +} + +int +event_register_epoll(struct event_pool *event_pool, int fd, + event_handler_t handler, void *data, int poll_in, + int poll_out, char notify_poller_death) +{ + int idx = -1; + int ret = -1; + int destroy = 0; + struct epoll_event epoll_event = { + 0, + }; + struct event_data *ev_data = (void *)&epoll_event.data; + struct event_slot_epoll *slot = NULL; + + GF_VALIDATE_OR_GOTO("event", event_pool, out); + + /* TODO: Even with the below check, there is a possibility of race, + * What if the destroy mode is set after the check is done. + * Not sure of the best way to prevent this race, ref counting + * is one possibility. + * There is no harm in registering and unregistering the fd + * even after destroy mode is set, just that such fds will remain + * open until unregister is called, also the events on that fd will be + * notified, until one of the poller thread is alive. + */ + pthread_mutex_lock(&event_pool->mutex); + { + destroy = event_pool->destroy; + } + pthread_mutex_unlock(&event_pool->mutex); + + if (destroy == 1) + goto out; + + idx = event_slot_alloc(event_pool, fd, notify_poller_death, &slot); + if (idx == -1) { + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND, "fd=%d", fd, + NULL); + return -1; + } + + assert(slot->fd == fd); + + LOCK(&slot->lock); + { + /* make epoll 'singleshot', which + means we need to re-add the fd with + epoll_ctl(EPOLL_CTL_MOD) after delivery of every + single event. This assures us that while a poller + thread has picked up and is processing an event, + another poller will not try to pick this at the same + time as well. + */ + + slot->events = EPOLLPRI | EPOLLHUP | EPOLLERR | EPOLLONESHOT; + slot->handler = handler; + slot->data = data; + + __slot_update_events(slot, poll_in, poll_out); + + epoll_event.events = slot->events; + ev_data->idx = idx; + ev_data->gen = slot->gen; + + ret = epoll_ctl(event_pool->fd, EPOLL_CTL_ADD, fd, &epoll_event); + /* check ret after UNLOCK() to avoid deadlock in + event_slot_unref() + */ + } + UNLOCK(&slot->lock); + + if (ret == -1) { + gf_smsg("epoll", GF_LOG_ERROR, errno, LG_MSG_EPOLL_FD_ADD_FAILED, + "fd=%d", fd, "epoll_fd=%d", event_pool->fd, NULL); + event_slot_unref(event_pool, slot, idx); + idx = -1; + } + + /* keep slot->ref (do not event_slot_unref) if successful */ +out: + return idx; +} + +static int +event_unregister_epoll_common(struct event_pool *event_pool, int fd, int idx, + int do_close) +{ + int ret = -1; + struct event_slot_epoll *slot = NULL; + + GF_VALIDATE_OR_GOTO("event", event_pool, out); + + /* During shutdown, it may happen that a socket registration with + * the event sub-system may fail and an rpc_transport_unref() may + * be called for such an unregistered socket with idx == -1. This + * may cause the following assert(slot->fd == fd) to fail. + */ + if (idx < 0) + goto out; + + slot = event_slot_get(event_pool, idx); + if (!slot) { + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND, "fd=%d", fd, + "idx=%d", idx, NULL); + return -1; + } + + assert(slot->fd == fd); + + LOCK(&slot->lock); + { + ret = epoll_ctl(event_pool->fd, EPOLL_CTL_DEL, fd, NULL); + + if (ret == -1) { + gf_smsg("epoll", GF_LOG_ERROR, errno, LG_MSG_EPOLL_FD_DEL_FAILED, + "fd=%d", fd, "epoll_fd=%d", event_pool->fd, NULL); + goto unlock; + } + + slot->do_close = do_close; + slot->gen++; /* detect unregister in dispatch_handler() */ + } +unlock: + UNLOCK(&slot->lock); + + event_slot_unref(event_pool, slot, idx); /* one for event_register() */ + event_slot_unref(event_pool, slot, idx); /* one for event_slot_get() */ +out: + return ret; +} + +static int +event_unregister_epoll(struct event_pool *event_pool, int fd, int idx_hint) +{ + int ret = -1; + + ret = event_unregister_epoll_common(event_pool, fd, idx_hint, 0); + + return ret; +} + +static int +event_unregister_close_epoll(struct event_pool *event_pool, int fd, + int idx_hint) +{ + int ret = -1; + + ret = event_unregister_epoll_common(event_pool, fd, idx_hint, 1); + + return ret; +} + +static int +event_select_on_epoll(struct event_pool *event_pool, int fd, int idx, + int poll_in, int poll_out) +{ + int ret = -1; + struct event_slot_epoll *slot = NULL; + struct epoll_event epoll_event = { + 0, + }; + struct event_data *ev_data = (void *)&epoll_event.data; + + GF_VALIDATE_OR_GOTO("event", event_pool, out); + + slot = event_slot_get(event_pool, idx); + if (!slot) { + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND, "fd=%d", fd, + "idx=%d", idx, NULL); + return -1; + } + + assert(slot->fd == fd); + + LOCK(&slot->lock); + { + __slot_update_events(slot, poll_in, poll_out); + + epoll_event.events = slot->events; + ev_data->idx = idx; + ev_data->gen = slot->gen; + + if (slot->in_handler) + /* + * in_handler indicates at least one thread + * executing event_dispatch_epoll_handler() + * which will perform epoll_ctl(EPOLL_CTL_MOD) + * anyways (because of EPOLLET) + * + * This not only saves a system call, but also + * avoids possibility of another epoll thread + * picking up the next event while the ongoing + * handler is still in progress (and resulting + * in unnecessary contention on rpc_transport_t->mutex). + */ + goto unlock; + + ret = epoll_ctl(event_pool->fd, EPOLL_CTL_MOD, fd, &epoll_event); + if (ret == -1) { + gf_smsg("epoll", GF_LOG_ERROR, errno, LG_MSG_EPOLL_FD_MODIFY_FAILED, + "fd=%d", fd, "events=%d", epoll_event.events, NULL); + } + } +unlock: + UNLOCK(&slot->lock); + + event_slot_unref(event_pool, slot, idx); + +out: + return idx; +} + +static int +event_dispatch_epoll_handler(struct event_pool *event_pool, + struct epoll_event *event) +{ + struct event_data *ev_data = NULL; + struct event_slot_epoll *slot = NULL; + event_handler_t handler = NULL; + void *data = NULL; + int idx = -1; + int gen = -1; + int ret = -1; + int fd = -1; + gf_boolean_t handled_error_previously = _gf_false; + + ev_data = (void *)&event->data; + handler = NULL; + data = NULL; + + idx = ev_data->idx; + gen = ev_data->gen; + + slot = event_slot_get(event_pool, idx); + if (!slot) { + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND, "idx=%d", idx, + NULL); + return -1; + } + + LOCK(&slot->lock); + { + fd = slot->fd; + if (fd == -1) { + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_STALE_FD_FOUND, "idx=%d", + idx, "gen=%d", gen, "events=%d", event->events, + "slot->gen=%d", slot->gen, NULL); + /* fd got unregistered in another thread */ + goto pre_unlock; + } + + if (gen != slot->gen) { + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_GENERATION_MISMATCH, + "idx=%d", idx, "gen=%d", gen, "slot->gen=%d", slot->gen, + "slot->fd=%d", slot->fd, NULL); + /* slot was re-used and therefore is another fd! */ + goto pre_unlock; + } + + handler = slot->handler; + data = slot->data; + + if (slot->in_handler > 0) { + /* Another handler is inprogress, skip this one. */ + handler = NULL; + goto pre_unlock; + } + + if (slot->handled_error) { + handled_error_previously = _gf_true; + } else { + slot->handled_error = (event->events & (EPOLLERR | EPOLLHUP)); + slot->in_handler++; + } + } +pre_unlock: + UNLOCK(&slot->lock); + + ret = 0; + + if (!handler) + goto out; + + if (!handled_error_previously) { + handler(fd, idx, gen, data, (event->events & (EPOLLIN | EPOLLPRI)), + (event->events & (EPOLLOUT)), + (event->events & (EPOLLERR | EPOLLHUP)), 0); + } +out: + event_slot_unref(event_pool, slot, idx); + + return ret; +} + +static void * +event_dispatch_epoll_worker(void *data) +{ + struct epoll_event event; + int ret = -1; + struct event_thread_data *ev_data = data; + struct event_pool *event_pool; + int myindex = -1; + int timetodie = 0, gen = 0; + struct list_head poller_death_notify; + struct event_slot_epoll *slot = NULL, *tmp = NULL; + + GF_VALIDATE_OR_GOTO("event", ev_data, out); + + event_pool = ev_data->event_pool; + myindex = ev_data->event_index; + + GF_VALIDATE_OR_GOTO("event", event_pool, out); + + gf_smsg("epoll", GF_LOG_INFO, 0, LG_MSG_STARTED_EPOLL_THREAD, "index=%d", + myindex - 1, NULL); + + pthread_mutex_lock(&event_pool->mutex); + { + event_pool->activethreadcount++; + } + pthread_mutex_unlock(&event_pool->mutex); + + for (;;) { + if (event_pool->eventthreadcount < myindex) { + /* ...time to die, thread count was decreased below + * this threads index */ + /* Start with extra safety at this point, reducing + * lock conention in normal case when threads are not + * reconfigured always */ + pthread_mutex_lock(&event_pool->mutex); + { + if (event_pool->eventthreadcount < myindex) { + while (event_pool->poller_death_sliced) { + pthread_cond_wait(&event_pool->cond, + &event_pool->mutex); + } + + INIT_LIST_HEAD(&poller_death_notify); + /* if found true in critical section, + * die */ + event_pool->pollers[myindex - 1] = 0; + event_pool->activethreadcount--; + timetodie = 1; + gen = ++event_pool->poller_gen; + list_for_each_entry(slot, &event_pool->poller_death, + poller_death) + { + event_slot_ref(slot); + } + + list_splice_init(&event_pool->poller_death, + &poller_death_notify); + event_pool->poller_death_sliced = 1; + pthread_cond_broadcast(&event_pool->cond); + } + } + pthread_mutex_unlock(&event_pool->mutex); + if (timetodie) { + list_for_each_entry(slot, &poller_death_notify, poller_death) + { + slot->handler(slot->fd, 0, gen, slot->data, 0, 0, 0, 1); + } + + pthread_mutex_lock(&event_pool->mutex); + { + list_for_each_entry_safe(slot, tmp, &poller_death_notify, + poller_death) + { + __event_slot_unref(event_pool, slot, slot->idx); + } + + list_splice(&poller_death_notify, + &event_pool->poller_death); + event_pool->poller_death_sliced = 0; + pthread_cond_broadcast(&event_pool->cond); + } + pthread_mutex_unlock(&event_pool->mutex); + + gf_smsg("epoll", GF_LOG_INFO, 0, LG_MSG_EXITED_EPOLL_THREAD, + "index=%d", myindex, NULL); + + goto out; + } + } + + ret = epoll_wait(event_pool->fd, &event, 1, -1); + + if (ret == 0) + /* timeout */ + continue; + + if (ret == -1 && errno == EINTR) + /* sys call */ + continue; + + ret = event_dispatch_epoll_handler(event_pool, &event); + if (ret) { + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_DISPATCH_HANDLER_FAILED, + NULL); + } + } +out: + if (ev_data) + GF_FREE(ev_data); + return NULL; +} + +/* Attempts to start the # of configured pollers, ensuring at least the first + * is started in a joinable state */ +static int +event_dispatch_epoll(struct event_pool *event_pool) +{ + int i = 0; + pthread_t t_id; + int pollercount = 0; + int ret = -1; + struct event_thread_data *ev_data = NULL; + + /* Start the configured number of pollers */ + pthread_mutex_lock(&event_pool->mutex); + { + pollercount = event_pool->eventthreadcount; + + /* Set to MAX if greater */ + if (pollercount > EVENT_MAX_THREADS) + pollercount = EVENT_MAX_THREADS; + + /* Default pollers to 1 in case this is incorrectly set */ + if (pollercount <= 0) + pollercount = 1; + + event_pool->activethreadcount++; + + for (i = 0; i < pollercount; i++) { + ev_data = GF_CALLOC(1, sizeof(*ev_data), gf_common_mt_event_pool); + if (!ev_data) { + if (i == 0) { + /* Need to succeed creating 0'th + * thread, to joinable and wait */ + break; + } else { + /* Inability to create other threads + * are a lesser evil, and ignored */ + continue; + } + } + + ev_data->event_pool = event_pool; + ev_data->event_index = i + 1; + + ret = gf_thread_create(&t_id, NULL, event_dispatch_epoll_worker, + ev_data, "epoll%03hx", i & 0x3ff); + if (!ret) { + event_pool->pollers[i] = t_id; + + /* mark all threads other than one in index 0 + * as detachable. Errors can be ignored, they + * spend their time as zombies if not detched + * and the thread counts are decreased */ + if (i != 0) + pthread_detach(event_pool->pollers[i]); + } else { + gf_smsg("epoll", GF_LOG_WARNING, 0, + LG_MSG_START_EPOLL_THREAD_FAILED, "index=%d", i, NULL); + if (i == 0) { + GF_FREE(ev_data); + break; + } else { + GF_FREE(ev_data); + continue; + } + } + } + } + pthread_mutex_unlock(&event_pool->mutex); + + /* Just wait for the first thread, that is created in a joinable state + * and will never die, ensuring this function never returns */ + if (event_pool->pollers[0] != 0) + pthread_join(event_pool->pollers[0], NULL); + + pthread_mutex_lock(&event_pool->mutex); + { + event_pool->activethreadcount--; + } + pthread_mutex_unlock(&event_pool->mutex); + + return ret; +} + +/** + * @param event_pool event_pool on which fds of interest are registered for + * events. + * + * @return 1 if at least one epoll worker thread is spawned, 0 otherwise + * + * NB This function SHOULD be called under event_pool->mutex. + */ + +static int +event_pool_dispatched_unlocked(struct event_pool *event_pool) +{ + return (event_pool->pollers[0] != 0); +} + +int +event_reconfigure_threads_epoll(struct event_pool *event_pool, int value) +{ + int i; + int ret = 0; + pthread_t t_id; + int oldthreadcount; + struct event_thread_data *ev_data = NULL; + + pthread_mutex_lock(&event_pool->mutex); + { + /* Reconfigure to 0 threads is allowed only in destroy mode */ + if (event_pool->destroy == 1) { + value = 0; + } else { + /* Set to MAX if greater */ + if (value > EVENT_MAX_THREADS) + value = EVENT_MAX_THREADS; + + /* Default pollers to 1 in case this is set incorrectly */ + if (value <= 0) + value = 1; + } + + oldthreadcount = event_pool->eventthreadcount; + + /* Start 'worker' threads as necessary only if event_dispatch() + * was called before. If event_dispatch() was not called, there + * will be no epoll 'worker' threads running yet. */ + + if (event_pool_dispatched_unlocked(event_pool) && + (oldthreadcount < value)) { + /* create more poll threads */ + for (i = oldthreadcount; i < value; i++) { + /* Start a thread if the index at this location + * is a 0, so that the older thread is confirmed + * as dead */ + if (event_pool->pollers[i] == 0) { + ev_data = GF_CALLOC(1, sizeof(*ev_data), + gf_common_mt_event_pool); + if (!ev_data) { + continue; + } + + ev_data->event_pool = event_pool; + ev_data->event_index = i + 1; + + ret = gf_thread_create(&t_id, NULL, + event_dispatch_epoll_worker, ev_data, + "epoll%03hx", i & 0x3ff); + if (ret) { + gf_smsg("epoll", GF_LOG_WARNING, 0, + LG_MSG_START_EPOLL_THREAD_FAILED, "index=%d", i, + NULL); + GF_FREE(ev_data); + } else { + pthread_detach(t_id); + event_pool->pollers[i] = t_id; + } + } + } + } + + /* if value decreases, threads will terminate, themselves */ + event_pool->eventthreadcount = value; + } + pthread_mutex_unlock(&event_pool->mutex); + + return 0; +} + +/* This function is the destructor for the event_pool data structure + * Should be called only after poller_threads_destroy() is called, + * else will lead to crashes. + */ +static int +event_pool_destroy_epoll(struct event_pool *event_pool) +{ + int ret = 0, i = 0, j = 0; + struct event_slot_epoll *table = NULL; + + ret = sys_close(event_pool->fd); + + for (i = 0; i < EVENT_EPOLL_TABLES; i++) { + if (event_pool->ereg[i]) { + table = event_pool->ereg[i]; + event_pool->ereg[i] = NULL; + for (j = 0; j < EVENT_EPOLL_SLOTS; j++) { + LOCK_DESTROY(&table[j].lock); + } + GF_FREE(table); + } + } + + pthread_mutex_destroy(&event_pool->mutex); + pthread_cond_destroy(&event_pool->cond); + + GF_FREE(event_pool->evcache); + GF_FREE(event_pool->reg); + GF_FREE(event_pool); + + return ret; +} + +static int +event_handled_epoll(struct event_pool *event_pool, int fd, int idx, int gen) +{ + struct event_slot_epoll *slot = NULL; + struct epoll_event epoll_event = { + 0, + }; + struct event_data *ev_data = (void *)&epoll_event.data; + int ret = 0; + + slot = event_slot_get(event_pool, idx); + if (!slot) { + gf_smsg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND, "fd=%d", fd, + "idx=%d", idx, NULL); + return -1; + } + + assert(slot->fd == fd); + + LOCK(&slot->lock); + { + slot->in_handler--; + + if (gen != slot->gen) { + /* event_unregister() happened while we were + in handler() + */ + gf_msg_debug("epoll", 0, + "generation bumped on idx=%d" + " from gen=%d to slot->gen=%d, fd=%d, " + "slot->fd=%d", + idx, gen, slot->gen, fd, slot->fd); + goto unlock; + } + + /* This call also picks up the changes made by another + thread calling event_select_on_epoll() while this + thread was busy in handler() + */ + if (slot->in_handler == 0) { + epoll_event.events = slot->events; + ev_data->idx = idx; + ev_data->gen = gen; + + ret = epoll_ctl(event_pool->fd, EPOLL_CTL_MOD, fd, &epoll_event); + } + } +unlock: + UNLOCK(&slot->lock); + + event_slot_unref(event_pool, slot, idx); + + return ret; +} + +struct event_ops event_ops_epoll = { + .new = event_pool_new_epoll, + .event_register = event_register_epoll, + .event_select_on = event_select_on_epoll, + .event_unregister = event_unregister_epoll, + .event_unregister_close = event_unregister_close_epoll, + .event_dispatch = event_dispatch_epoll, + .event_reconfigure_threads = event_reconfigure_threads_epoll, + .event_pool_destroy = event_pool_destroy_epoll, + .event_handled = event_handled_epoll, +}; + +#endif |
