diff options
Diffstat (limited to 'libglusterfs/src/event.c')
| -rw-r--r-- | libglusterfs/src/event.c | 1136 |
1 files changed, 227 insertions, 909 deletions
diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c index cb6abd4180e..402c253ca25 100644 --- a/libglusterfs/src/event.c +++ b/libglusterfs/src/event.c @@ -1,20 +1,11 @@ /* - Copyright (c) 2008-2010 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ #include <sys/poll.h> @@ -25,963 +16,290 @@ #include <errno.h> #include <string.h> -#include "logging.h" -#include "event.h" -#include "mem-pool.h" - - -#ifndef _CONFIG_H -#define _CONFIG_H -#include "config.h" -#endif - -static int -event_register_poll (struct event_pool *event_pool, int fd, - event_handler_t handler, - void *data, int poll_in, int poll_out); - +#include "glusterfs/gf-event.h" +#include "glusterfs/timespec.h" +#include "glusterfs/common-utils.h" +#include "glusterfs/libglusterfs-messages.h" +#include "glusterfs/syscall.h" -static int -__flush_fd (int fd, int idx, void *data, - int poll_in, int poll_out, int poll_err) -{ - char buf[64]; - int ret = -1; - - if (!poll_in) - return ret; - - do { - ret = read (fd, buf, 64); - if (ret == -1 && errno != EAGAIN) { - gf_log ("poll", GF_LOG_ERROR, - "read on %d returned error (%s)", - fd, strerror (errno)); - } - } while (ret == 64); - - return ret; -} - - -static int -__event_getindex (struct event_pool *event_pool, int fd, int idx) +struct event_pool * +gf_event_pool_new(int count, int eventthreadcount) { - int ret = -1; - int i = 0; - - if (event_pool == NULL) { - gf_log ("event", GF_LOG_ERROR, "invalid argument"); - return -1; - } - - if (idx > -1 && idx < event_pool->used) { - if (event_pool->reg[idx].fd == fd) - ret = idx; - } - - for (i=0; ret == -1 && i<event_pool->used; i++) { - if (event_pool->reg[i].fd == fd) { - ret = i; - break; - } - } - - return ret; -} + struct event_pool *event_pool = NULL; + extern struct event_ops event_ops_poll; +#ifdef HAVE_SYS_EPOLL_H + extern struct event_ops event_ops_epoll; -static struct event_pool * -event_pool_new_poll (int count) -{ - struct event_pool *event_pool = NULL; - int ret = -1; - - event_pool = GF_CALLOC (1, sizeof (*event_pool), - gf_common_mt_event_pool); - - if (!event_pool) - return NULL; - - event_pool->count = count; - event_pool->reg = GF_CALLOC (event_pool->count, - sizeof (*event_pool->reg), - gf_common_mt_reg); - - if (!event_pool->reg) { - gf_log ("poll", GF_LOG_CRITICAL, - "failed to allocate event registry"); - GF_FREE (event_pool); - return NULL; - } - - pthread_mutex_init (&event_pool->mutex, NULL); - - ret = pipe (event_pool->breaker); - - if (ret == -1) { - gf_log ("poll", GF_LOG_ERROR, - "pipe creation failed (%s)", strerror (errno)); - GF_FREE (event_pool->reg); - GF_FREE (event_pool); - return NULL; - } - - ret = fcntl (event_pool->breaker[0], F_SETFL, O_NONBLOCK); - if (ret == -1) { - gf_log ("poll", GF_LOG_ERROR, - "could not set pipe to non blocking mode (%s)", - strerror (errno)); - close (event_pool->breaker[0]); - close (event_pool->breaker[1]); - event_pool->breaker[0] = event_pool->breaker[1] = -1; - - GF_FREE (event_pool->reg); - GF_FREE (event_pool); - return NULL; - } - - ret = fcntl (event_pool->breaker[1], F_SETFL, O_NONBLOCK); - if (ret == -1) { - gf_log ("poll", GF_LOG_ERROR, - "could not set pipe to non blocking mode (%s)", - strerror (errno)); - - close (event_pool->breaker[0]); - close (event_pool->breaker[1]); - event_pool->breaker[0] = event_pool->breaker[1] = -1; - - GF_FREE (event_pool->reg); - GF_FREE (event_pool); - return NULL; - } - - ret = event_register_poll (event_pool, event_pool->breaker[0], - __flush_fd, NULL, 1, 0); - if (ret == -1) { - gf_log ("poll", GF_LOG_ERROR, - "could not register pipe fd with poll event loop"); - close (event_pool->breaker[0]); - close (event_pool->breaker[1]); - event_pool->breaker[0] = event_pool->breaker[1] = -1; - - GF_FREE (event_pool->reg); - GF_FREE (event_pool); - return NULL; - } - - return event_pool; -} + event_pool = event_ops_epoll.new(count, eventthreadcount); + if (event_pool) { + event_pool->ops = &event_ops_epoll; + } else { + gf_msg("event", GF_LOG_WARNING, 0, LG_MSG_FALLBACK_TO_POLL, + "falling back to poll based event handling"); + } +#endif -static int -event_register_poll (struct event_pool *event_pool, int fd, - event_handler_t handler, - void *data, int poll_in, int poll_out) -{ - int idx = -1; - - if (event_pool == NULL) { - gf_log ("event", GF_LOG_ERROR, "invalid argument"); - return -1; - } - - pthread_mutex_lock (&event_pool->mutex); - { - if (event_pool->count == event_pool->used) - { - event_pool->count += 256; - - event_pool->reg = GF_REALLOC (event_pool->reg, - event_pool->count * - sizeof (*event_pool->reg)); - if (!event_pool->reg) - goto unlock; - } - - idx = event_pool->used++; - - event_pool->reg[idx].fd = fd; - event_pool->reg[idx].events = POLLPRI; - event_pool->reg[idx].handler = handler; - event_pool->reg[idx].data = data; - - switch (poll_in) { - case 1: - event_pool->reg[idx].events |= POLLIN; - break; - case 0: - event_pool->reg[idx].events &= ~POLLIN; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("poll", GF_LOG_ERROR, - "invalid poll_in value %d", poll_in); - break; - } - - switch (poll_out) { - case 1: - event_pool->reg[idx].events |= POLLOUT; - break; - case 0: - event_pool->reg[idx].events &= ~POLLOUT; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("poll", GF_LOG_ERROR, - "invalid poll_out value %d", poll_out); - break; - } - - event_pool->changed = 1; - - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - - return idx; -} + if (!event_pool) { + event_pool = event_ops_poll.new(count, eventthreadcount); + if (event_pool) + event_pool->ops = &event_ops_poll; + } -static int -event_unregister_poll (struct event_pool *event_pool, int fd, int idx_hint) -{ - int idx = -1; - - if (event_pool == NULL) { - gf_log ("event", GF_LOG_ERROR, "invalid argument"); - return -1; - } - - pthread_mutex_lock (&event_pool->mutex); - { - idx = __event_getindex (event_pool, fd, idx_hint); - - if (idx == -1) { - gf_log ("poll", GF_LOG_ERROR, - "index not found for fd=%d (idx_hint=%d)", - fd, idx_hint); - errno = ENOENT; - goto unlock; - } - - event_pool->reg[idx] = event_pool->reg[--event_pool->used]; - event_pool->changed = 1; - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - - return idx; + return event_pool; } - -static int -event_select_on_poll (struct event_pool *event_pool, int fd, int idx_hint, - int poll_in, int poll_out) -{ - int idx = -1; - - if (event_pool == NULL) { - gf_log ("event", GF_LOG_ERROR, "invalid argument"); - return -1; - } - - pthread_mutex_lock (&event_pool->mutex); - { - idx = __event_getindex (event_pool, fd, idx_hint); - - if (idx == -1) { - gf_log ("poll", GF_LOG_ERROR, - "index not found for fd=%d (idx_hint=%d)", - fd, idx_hint); - errno = ENOENT; - goto unlock; - } - - switch (poll_in) { - case 1: - event_pool->reg[idx].events |= POLLIN; - break; - case 0: - event_pool->reg[idx].events &= ~POLLIN; - break; - case -1: - /* do nothing */ - break; - default: - /* TODO: log error */ - break; - } - - switch (poll_out) { - case 1: - event_pool->reg[idx].events |= POLLOUT; - break; - case 0: - event_pool->reg[idx].events &= ~POLLOUT; - break; - case -1: - /* do nothing */ - break; - default: - /* TODO: log error */ - break; - } - - if (poll_in + poll_out > -2) - event_pool->changed = 1; - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - - return idx; -} - - -static int -event_dispatch_poll_handler (struct event_pool *event_pool, - struct pollfd *ufds, int i) +int +gf_event_register(struct event_pool *event_pool, int fd, + event_handler_t handler, void *data, int poll_in, + int poll_out, char notify_poller_death) { - event_handler_t handler = NULL; - void *data = NULL; - int idx = -1; - int ret = 0; - - handler = NULL; - data = NULL; - - pthread_mutex_lock (&event_pool->mutex); - { - idx = __event_getindex (event_pool, ufds[i].fd, i); - - if (idx == -1) { - gf_log ("poll", GF_LOG_ERROR, - "index not found for fd=%d (idx_hint=%d)", - ufds[i].fd, i); - goto unlock; - } - - handler = event_pool->reg[idx].handler; - data = event_pool->reg[idx].data; - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - - if (handler) - ret = handler (ufds[i].fd, idx, data, - (ufds[i].revents & (POLLIN|POLLPRI)), - (ufds[i].revents & (POLLOUT)), - (ufds[i].revents & (POLLERR|POLLHUP|POLLNVAL))); - - return ret; -} + int ret = -1; + GF_VALIDATE_OR_GOTO("event", event_pool, out); -static int -event_dispatch_poll_resize (struct event_pool *event_pool, - struct pollfd *ufds, int size) -{ - int i = 0; - - pthread_mutex_lock (&event_pool->mutex); - { - if (event_pool->changed == 0) { - goto unlock; - } - - if (event_pool->used > event_pool->evcache_size) { - if (event_pool->evcache) - GF_FREE (event_pool->evcache); - - event_pool->evcache = ufds = NULL; - - event_pool->evcache_size = event_pool->used; - - ufds = GF_CALLOC (sizeof (struct pollfd), - event_pool->evcache_size, - gf_common_mt_pollfd); - if (!ufds) - goto unlock; - event_pool->evcache = ufds; - } - - for (i = 0; i < event_pool->used; i++) { - ufds[i].fd = event_pool->reg[i].fd; - ufds[i].events = event_pool->reg[i].events; - ufds[i].revents = 0; - } - - size = i; - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - - return size; + ret = event_pool->ops->event_register( + event_pool, fd, handler, data, poll_in, poll_out, notify_poller_death); +out: + return ret; } - -static int -event_dispatch_poll (struct event_pool *event_pool) +int +gf_event_unregister(struct event_pool *event_pool, int fd, int idx) { - struct pollfd *ufds = NULL; - int size = 0; - int i = 0; - int ret = -1; - - - if (event_pool == NULL) { - gf_log ("event", GF_LOG_ERROR, "invalid argument"); - return -1; - } - - while (1) { - size = event_dispatch_poll_resize (event_pool, ufds, size); - ufds = event_pool->evcache; - - ret = poll (ufds, size, 1); - - if (ret == 0) - /* timeout */ - continue; + int ret = -1; - if (ret == -1 && errno == EINTR) - /* sys call */ - continue; + GF_VALIDATE_OR_GOTO("event", event_pool, out); - for (i = 0; i < size; i++) { - if (!ufds[i].revents) - continue; + ret = event_pool->ops->event_unregister(event_pool, fd, idx); - event_dispatch_poll_handler (event_pool, ufds, i); - } - } - - return -1; +out: + return ret; } - -static struct event_ops event_ops_poll = { - .new = event_pool_new_poll, - .event_register = event_register_poll, - .event_select_on = event_select_on_poll, - .event_unregister = event_unregister_poll, - .event_dispatch = event_dispatch_poll -}; - - - -#ifdef HAVE_SYS_EPOLL_H -#include <sys/epoll.h> - - -static struct event_pool * -event_pool_new_epoll (int count) +int +gf_event_unregister_close(struct event_pool *event_pool, int fd, int idx) { - struct event_pool *event_pool = NULL; - int epfd = -1; - - event_pool = GF_CALLOC (1, sizeof (*event_pool), - gf_common_mt_event_pool); + int ret = -1; - if (!event_pool) - return NULL; + GF_VALIDATE_OR_GOTO("event", event_pool, out); - event_pool->count = count; - event_pool->reg = GF_CALLOC (event_pool->count, - sizeof (*event_pool->reg), - gf_common_mt_reg); + ret = event_pool->ops->event_unregister_close(event_pool, fd, idx); - if (!event_pool->reg) { - gf_log ("epoll", GF_LOG_CRITICAL, - "event registry allocation failed"); - GF_FREE (event_pool); - return NULL; - } - - epfd = epoll_create (count); - - if (epfd == -1) { - gf_log ("epoll", GF_LOG_ERROR, "epoll fd creation failed (%s)", - strerror (errno)); - GF_FREE (event_pool->reg); - GF_FREE (event_pool); - return NULL; - } - - event_pool->fd = epfd; - - event_pool->count = count; - - pthread_mutex_init (&event_pool->mutex, NULL); - pthread_cond_init (&event_pool->cond, NULL); - - return event_pool; +out: + return ret; } - int -event_register_epoll (struct event_pool *event_pool, int fd, - event_handler_t handler, - void *data, int poll_in, int poll_out) +gf_event_select_on(struct event_pool *event_pool, int fd, int idx_hint, + int poll_in, int poll_out) { - int idx = -1; - int ret = -1; - struct epoll_event epoll_event = {0, }; - struct event_data *ev_data = (void *)&epoll_event.data; - - - if (event_pool == NULL) { - gf_log ("event", GF_LOG_ERROR, "invalid argument"); - return -1; - } - - pthread_mutex_lock (&event_pool->mutex); - { - if (event_pool->count == event_pool->used) { - event_pool->count *= 2; - - event_pool->reg = GF_REALLOC (event_pool->reg, - event_pool->count * - sizeof (*event_pool->reg)); - - if (!event_pool->reg) { - gf_log ("epoll", GF_LOG_ERROR, - "event registry re-allocation failed"); - goto unlock; - } - } - - idx = event_pool->used; - event_pool->used++; - - event_pool->reg[idx].fd = fd; - event_pool->reg[idx].events = EPOLLPRI; - event_pool->reg[idx].handler = handler; - event_pool->reg[idx].data = data; - - switch (poll_in) { - case 1: - event_pool->reg[idx].events |= EPOLLIN; - break; - case 0: - event_pool->reg[idx].events &= ~EPOLLIN; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("epoll", GF_LOG_ERROR, - "invalid poll_in value %d", poll_in); - break; - } - - switch (poll_out) { - case 1: - event_pool->reg[idx].events |= EPOLLOUT; - break; - case 0: - event_pool->reg[idx].events &= ~EPOLLOUT; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("epoll", GF_LOG_ERROR, - "invalid poll_out value %d", poll_out); - break; - } - - event_pool->changed = 1; - - epoll_event.events = event_pool->reg[idx].events; - ev_data->fd = fd; - ev_data->idx = idx; - - ret = epoll_ctl (event_pool->fd, EPOLL_CTL_ADD, fd, - &epoll_event); - - if (ret == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "failed to add fd(=%d) to epoll fd(=%d) (%s)", - fd, event_pool->fd, strerror (errno)); - goto unlock; - } - - pthread_cond_broadcast (&event_pool->cond); - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - - return ret; -} + int ret = -1; + GF_VALIDATE_OR_GOTO("event", event_pool, out); -static int -event_unregister_epoll (struct event_pool *event_pool, int fd, int idx_hint) -{ - int idx = -1; - int ret = -1; - - struct epoll_event epoll_event = {0, }; - struct event_data *ev_data = (void *)&epoll_event.data; - int lastidx = -1; - - if (event_pool == NULL) { - gf_log ("event", GF_LOG_ERROR, "invalid argument"); - return -1; - } - - pthread_mutex_lock (&event_pool->mutex); - { - idx = __event_getindex (event_pool, fd, idx_hint); - - if (idx == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "index not found for fd=%d (idx_hint=%d)", - fd, idx_hint); - errno = ENOENT; - goto unlock; - } - - ret = epoll_ctl (event_pool->fd, EPOLL_CTL_DEL, fd, NULL); - - /* if ret is -1, this array member should never be accessed */ - /* if it is 0, the array member might be used by idx_cache - * in which case the member should not be accessed till - * it is reallocated - */ - - event_pool->reg[idx].fd = -1; - - if (ret == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "fail to del fd(=%d) from epoll fd(=%d) (%s)", - fd, event_pool->fd, strerror (errno)); - goto unlock; - } - - lastidx = event_pool->used - 1; - if (lastidx == idx) { - event_pool->used--; - goto unlock; - } - - epoll_event.events = event_pool->reg[lastidx].events; - ev_data->fd = event_pool->reg[lastidx].fd; - ev_data->idx = idx; - - ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, ev_data->fd, - &epoll_event); - if (ret == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "fail to modify fd(=%d) index %d to %d (%s)", - ev_data->fd, event_pool->used, idx, - strerror (errno)); - goto unlock; - } - - /* just replace the unregistered idx by last one */ - event_pool->reg[idx] = event_pool->reg[lastidx]; - event_pool->used--; - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - - return ret; + ret = event_pool->ops->event_select_on(event_pool, fd, idx_hint, poll_in, + poll_out); +out: + return ret; } - -static int -event_select_on_epoll (struct event_pool *event_pool, int fd, int idx_hint, - int poll_in, int poll_out) +int +gf_event_dispatch(struct event_pool *event_pool) { - int idx = -1; - int ret = -1; - - struct epoll_event epoll_event = {0, }; - struct event_data *ev_data = (void *)&epoll_event.data; - - - if (event_pool == NULL) { - gf_log ("event", GF_LOG_ERROR, "invalid argument"); - return -1; - } - - pthread_mutex_lock (&event_pool->mutex); - { - idx = __event_getindex (event_pool, fd, idx_hint); - - if (idx == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "index not found for fd=%d (idx_hint=%d)", - fd, idx_hint); - errno = ENOENT; - goto unlock; - } - - switch (poll_in) { - case 1: - event_pool->reg[idx].events |= EPOLLIN; - break; - case 0: - event_pool->reg[idx].events &= ~EPOLLIN; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("epoll", GF_LOG_ERROR, - "invalid poll_in value %d", poll_in); - break; - } - - switch (poll_out) { - case 1: - event_pool->reg[idx].events |= EPOLLOUT; - break; - case 0: - event_pool->reg[idx].events &= ~EPOLLOUT; - break; - case -1: - /* do nothing */ - break; - default: - gf_log ("epoll", GF_LOG_ERROR, - "invalid poll_out value %d", poll_out); - break; - } - - epoll_event.events = event_pool->reg[idx].events; - ev_data->fd = fd; - ev_data->idx = idx; - - ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, fd, - &epoll_event); - if (ret == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "failed to modify fd(=%d) events to %d", - fd, epoll_event.events); - } - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - - return ret; -} + int ret = -1; + GF_VALIDATE_OR_GOTO("event", event_pool, out); -static int -event_dispatch_epoll_handler (struct event_pool *event_pool, - struct epoll_event *events, int i) -{ - struct event_data *event_data = NULL; - event_handler_t handler = NULL; - void *data = NULL; - int idx = -1; - int ret = -1; - - - event_data = (void *)&events[i].data; - handler = NULL; - data = NULL; - - pthread_mutex_lock (&event_pool->mutex); - { - idx = __event_getindex (event_pool, event_data->fd, - event_data->idx); - - if (idx == -1) { - gf_log ("epoll", GF_LOG_ERROR, - "index not found for fd(=%d) (idx_hint=%d)", - event_data->fd, event_data->idx); - goto unlock; - } - - handler = event_pool->reg[idx].handler; - data = event_pool->reg[idx].data; - } -unlock: - pthread_mutex_unlock (&event_pool->mutex); - - if (handler) - ret = handler (event_data->fd, event_data->idx, data, - (events[i].events & (EPOLLIN|EPOLLPRI)), - (events[i].events & (EPOLLOUT)), - (events[i].events & (EPOLLERR|EPOLLHUP))); - return ret; -} + ret = event_pool->ops->event_dispatch(event_pool); + if (ret) + goto out; +out: + return ret; +} -static int -event_dispatch_epoll (struct event_pool *event_pool) +int +gf_event_reconfigure_threads(struct event_pool *event_pool, int value) { - struct epoll_event *events = NULL; - int size = 0; - int i = 0; - int ret = -1; - - - if (event_pool == NULL) { - gf_log ("event", GF_LOG_ERROR, "invalid argument"); - return -1; - } - - while (1) { - pthread_mutex_lock (&event_pool->mutex); - { - while (event_pool->used == 0) - pthread_cond_wait (&event_pool->cond, - &event_pool->mutex); - - if (event_pool->used > event_pool->evcache_size) { - if (event_pool->evcache) - GF_FREE (event_pool->evcache); - - event_pool->evcache = events = NULL; - - event_pool->evcache_size = - event_pool->used + 256; - - events = GF_CALLOC (event_pool->evcache_size, - sizeof (struct epoll_event), - gf_common_mt_epoll_event); + int ret = -1; - event_pool->evcache = events; - } - } - pthread_mutex_unlock (&event_pool->mutex); + GF_VALIDATE_OR_GOTO("event", event_pool, out); - ret = epoll_wait (event_pool->fd, event_pool->evcache, - event_pool->evcache_size, -1); + /* call event refresh function */ + ret = event_pool->ops->event_reconfigure_threads(event_pool, value); - if (ret == 0) - /* timeout */ - continue; - - if (ret == -1 && errno == EINTR) - /* sys call */ - continue; - - size = ret; - - for (i = 0; i < size; i++) { - if (!events || !events[i].events) - continue; - - ret = event_dispatch_epoll_handler (event_pool, - events, i); - } - } - - return -1; +out: + return ret; } - -static struct event_ops event_ops_epoll = { - .new = event_pool_new_epoll, - .event_register = event_register_epoll, - .event_select_on = event_select_on_epoll, - .event_unregister = event_unregister_epoll, - .event_dispatch = event_dispatch_epoll -}; - -#endif - - -struct event_pool * -event_pool_new (int count) +int +gf_event_pool_destroy(struct event_pool *event_pool) { - struct event_pool *event_pool = NULL; + int ret = -1; + int destroy = 0, activethreadcount = 0; -#ifdef HAVE_SYS_EPOLL_H - event_pool = event_ops_epoll.new (count); - - if (event_pool) { - event_pool->ops = &event_ops_epoll; - } else { - gf_log ("event", GF_LOG_WARNING, - "failing back to poll based event handling"); - } -#endif + GF_VALIDATE_OR_GOTO("event", event_pool, out); - if (!event_pool) { - event_pool = event_ops_poll.new (count); + pthread_mutex_lock(&event_pool->mutex); + { + destroy = event_pool->destroy; + activethreadcount = event_pool->activethreadcount; + } + pthread_mutex_unlock(&event_pool->mutex); - if (event_pool) - event_pool->ops = &event_ops_poll; - } + if (!destroy || (activethreadcount > 0)) { + goto out; + } - return event_pool; + ret = event_pool->ops->event_pool_destroy(event_pool); +out: + return ret; } - -int -event_register (struct event_pool *event_pool, int fd, - event_handler_t handler, - void *data, int poll_in, int poll_out) +void +poller_destroy_handler(int fd, int idx, int gen, void *data, int poll_out, + int poll_in, int poll_err, char event_thread_exit) { - int ret = -1; - - if (event_pool == NULL) { - gf_log ("event", GF_LOG_ERROR, "invalid argument"); - return -1; - } - - ret = event_pool->ops->event_register (event_pool, fd, handler, data, - poll_in, poll_out); - return ret; -} + struct event_destroy_data *destroy = NULL; + int readfd = -1; + char buf = '\0'; + destroy = data; + readfd = destroy->readfd; + if (readfd < 0) { + goto out; + } -int -event_unregister (struct event_pool *event_pool, int fd, int idx) -{ - int ret = -1; + while (sys_read(readfd, &buf, 1) > 0) { + } - if (event_pool == NULL) { - gf_log ("event", GF_LOG_ERROR, "invalid argument"); - return -1; - } - - ret = event_pool->ops->event_unregister (event_pool, fd, idx); +out: + gf_event_handled(destroy->pool, fd, idx, gen); - return ret; + return; } - +/* This function destroys all the poller threads. + * Note: to be called before gf_event_pool_destroy is called. + * The order in which cleaning is performed: + * - Register a pipe fd(this is for waking threads in poll()/epoll_wait()) + * - Set the destroy mode, which this no new event registration will succeed + * - Reconfigure the thread count to 0(this will succeed only in destroy mode) + * - Wake up all the threads in poll() or epoll_wait(), so that they can + * destroy themselves. + * - Wait for the thread to join(which will happen only after all the other + * threads are destroyed) + */ int -event_select_on (struct event_pool *event_pool, int fd, int idx_hint, - int poll_in, int poll_out) +gf_event_dispatch_destroy(struct event_pool *event_pool) { - int ret = -1; - - if (event_pool == NULL) { - gf_log ("event", GF_LOG_ERROR, "invalid argument"); - return -1; - } - - ret = event_pool->ops->event_select_on (event_pool, fd, idx_hint, - poll_in, poll_out); - return ret; + int ret = -1, threadcount = 0; + int fd[2] = {-1}; + int idx = -1; + int flags = 0; + struct timespec sleep_till = { + 0, + }; + struct event_destroy_data data = { + 0, + }; + + GF_VALIDATE_OR_GOTO("event", event_pool, out); + + ret = pipe(fd); + if (ret < 0) + goto out; + + /* Make the read end of the pipe nonblocking */ + flags = fcntl(fd[0], F_GETFL); + flags |= O_NONBLOCK; + ret = fcntl(fd[0], F_SETFL, flags); + if (ret < 0) + goto out; + + /* Make the write end of the pipe nonblocking */ + flags = fcntl(fd[1], F_GETFL); + flags |= O_NONBLOCK; + ret = fcntl(fd[1], F_SETFL, flags); + if (ret < 0) + goto out; + + data.pool = event_pool; + data.readfd = fd[1]; + + /* From the main thread register an event on the pipe fd[0], + */ + idx = gf_event_register(event_pool, fd[0], poller_destroy_handler, &data, 1, + 0, 0); + if (idx < 0) + goto out; + + /* Enter the destroy mode first, set this before reconfiguring to 0 + * threads, to prevent further reconfigure to thread count > 0. + */ + pthread_mutex_lock(&event_pool->mutex); + { + threadcount = event_pool->eventthreadcount; + event_pool->destroy = 1; + } + pthread_mutex_unlock(&event_pool->mutex); + + ret = gf_event_reconfigure_threads(event_pool, 0); + if (ret < 0) + goto out; + + /* Write something onto the write end of the pipe(fd[1]) so that + * poll wakes up and calls the handler, poller_destroy_handler() + */ + pthread_mutex_lock(&event_pool->mutex); + { + /* Write to pipe(fd[1]) and then wait for 1 second or until + * a poller thread that is dying, broadcasts. Make sure we + * do not loop forever by limiting to 10 retries + */ + int retry = 0; + + while (event_pool->activethreadcount > 0 && + (retry++ < (threadcount + 10))) { + if (sys_write(fd[1], "dummy", 6) == -1) { + break; + } + timespec_now_realtime(&sleep_till); + sleep_till.tv_sec += 1; + ret = pthread_cond_timedwait(&event_pool->cond, &event_pool->mutex, + &sleep_till); + if (ret) { + gf_msg_debug("event", 0, + "thread cond-timedwait failed " + "active-thread-count: %d, " + "retry: %d", + event_pool->activethreadcount, retry); + } + } + } + pthread_mutex_unlock(&event_pool->mutex); + + ret = gf_event_unregister(event_pool, fd[0], idx); + +out: + if (fd[0] != -1) + sys_close(fd[0]); + if (fd[1] != -1) + sys_close(fd[1]); + + return ret; } - int -event_dispatch (struct event_pool *event_pool) +gf_event_handled(struct event_pool *event_pool, int fd, int idx, int gen) { - int ret = -1; + int ret = 0; - if (event_pool == NULL) { - gf_log ("event", GF_LOG_ERROR, "invalid argument"); - return -1; - } - - ret = event_pool->ops->event_dispatch (event_pool); + if (event_pool->ops->event_handled) + ret = event_pool->ops->event_handled(event_pool, fd, idx, gen); - return ret; + return ret; } |
