/* Copyright (c) 2012 Red Hat, Inc. This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser General Public License, version 3 or any later version (LGPLv3 or later), or the GNU General Public License, version 2 (GPLv2), in all cases as published by the Free Software Foundation. */ #include #include #include #include #include #include #include #include "glusterfs/logging.h" #include "glusterfs/gf-event.h" #include "glusterfs/mem-pool.h" #include "glusterfs/common-utils.h" #include "glusterfs/syscall.h" #include "glusterfs/libglusterfs-messages.h" struct event_slot_poll { int fd; int events; void *data; event_handler_t handler; }; static int event_register_poll(struct event_pool *event_pool, int fd, event_handler_t handler, void *data, int poll_in, int poll_out, char notify_poller_death); static void __flush_fd(int fd, int idx, int gen, void *data, int poll_in, int poll_out, int poll_err, char event_thread_died) { char buf[64]; int ret = -1; if (!poll_in) return; do { ret = sys_read(fd, buf, 64); if (ret == -1 && errno != EAGAIN) { gf_msg("poll", GF_LOG_ERROR, errno, LG_MSG_FILE_OP_FAILED, "read on %d returned " "error", fd); } } while (ret == 64); return; } static int __event_getindex(struct event_pool *event_pool, int fd, int idx) { int ret = -1; int i = 0; GF_VALIDATE_OR_GOTO("event", event_pool, out); /* lookup in used space based on index provided */ if (idx > -1 && idx < event_pool->used) { if (event_pool->reg[idx].fd == fd) { ret = idx; goto out; } } /* search in used space, if lookup fails */ for (i = 0; i < event_pool->used; i++) { if (event_pool->reg[i].fd == fd) { ret = i; break; } } out: return ret; } static struct event_pool * event_pool_new_poll(int count, int eventthreadcount) { struct event_pool *event_pool = NULL; int ret = -1; event_pool = GF_CALLOC(1, sizeof(*event_pool), gf_common_mt_event_pool); if (!event_pool) return NULL; event_pool->count = count; event_pool->reg = GF_CALLOC(event_pool->count, sizeof(*event_pool->reg), gf_common_mt_reg); if (!event_pool->reg) { GF_FREE(event_pool); return NULL; } pthread_mutex_init(&event_pool->mutex, NULL); ret = pipe(event_pool->breaker); if (ret == -1) { gf_msg("poll", GF_LOG_ERROR, errno, LG_MSG_PIPE_CREATE_FAILED, "pipe creation failed"); GF_FREE(event_pool->reg); GF_FREE(event_pool); return NULL; } ret = fcntl(event_pool->breaker[0], F_SETFL, O_NONBLOCK); if (ret == -1) { gf_msg("poll", GF_LOG_ERROR, errno, LG_MSG_SET_PIPE_FAILED, "could not set pipe to non blocking mode"); sys_close(event_pool->breaker[0]); sys_close(event_pool->breaker[1]); event_pool->breaker[0] = event_pool->breaker[1] = -1; GF_FREE(event_pool->reg); GF_FREE(event_pool); return NULL; } ret = fcntl(event_pool->breaker[1], F_SETFL, O_NONBLOCK); if (ret == -1) { gf_msg("poll", GF_LOG_ERROR, errno, LG_MSG_SET_PIPE_FAILED, "could not set pipe to non blocking mode"); sys_close(event_pool->breaker[0]); sys_close(event_pool->breaker[1]); event_pool->breaker[0] = event_pool->breaker[1] = -1; GF_FREE(event_pool->reg); GF_FREE(event_pool); return NULL; } ret = event_register_poll(event_pool, event_pool->breaker[0], __flush_fd, NULL, 1, 0, 0); if (ret == -1) { gf_msg("poll", GF_LOG_ERROR, 0, LG_MSG_REGISTER_PIPE_FAILED, "could not register pipe fd with poll event loop"); sys_close(event_pool->breaker[0]); sys_close(event_pool->breaker[1]); event_pool->breaker[0] = event_pool->breaker[1] = -1; GF_FREE(event_pool->reg); GF_FREE(event_pool); return NULL; } if (eventthreadcount > 1) { gf_msg("poll", GF_LOG_INFO, 0, LG_MSG_POLL_IGNORE_MULTIPLE_THREADS, "Currently poll " "does not use multiple event processing threads, " "thread count (%d) ignored", eventthreadcount); } /* although, eventhreadcount for poll implementation is always * going to be 1, eventthreadcount needs to be set to 1 so that * rpcsvc_request_handler() thread scaling works flawlessly in * both epoll and poll models */ event_pool->eventthreadcount = 1; return event_pool; } static int event_register_poll(struct event_pool *event_pool, int fd, event_handler_t handler, void *data, int poll_in, int poll_out, char notify_poller_death) { int idx = -1; GF_VALIDATE_OR_GOTO("event", event_pool, out); pthread_mutex_lock(&event_pool->mutex); { if (event_pool->count == event_pool->used) { event_pool->count += 256; event_pool->reg = GF_REALLOC( event_pool->reg, event_pool->count * sizeof(*event_pool->reg)); if (!event_pool->reg) goto unlock; } idx = event_pool->used++; event_pool->reg[idx].fd = fd; event_pool->reg[idx].events = POLLPRI; event_pool->reg[idx].handler = handler; event_pool->reg[idx].data = data; switch (poll_in) { case 1: event_pool->reg[idx].events |= POLLIN; break; case 0: event_pool->reg[idx].events &= ~POLLIN; break; case -1: /* do nothing */ break; default: gf_msg("poll", GF_LOG_ERROR, 0, LG_MSG_INVALID_POLL_IN, "invalid poll_in value %d", poll_in); break; } switch (poll_out) { case 1: event_pool->reg[idx].events |= POLLOUT; break; case 0: event_pool->reg[idx].events &= ~POLLOUT; break; case -1: /* do nothing */ break; default: gf_msg("poll", GF_LOG_ERROR, 0, LG_MSG_INVALID_POLL_OUT, "invalid poll_out value %d", poll_out); break; } event_pool->changed = 1; } unlock: pthread_mutex_unlock(&event_pool->mutex); out: return idx; } static int event_unregister_poll(struct event_pool *event_pool, int fd, int idx_hint) { int idx = -1; GF_VALIDATE_OR_GOTO("event", event_pool, out); pthread_mutex_lock(&event_pool->mutex); { idx = __event_getindex(event_pool, fd, idx_hint); if (idx == -1) { gf_msg("poll", GF_LOG_ERROR, 0, LG_MSG_INDEX_NOT_FOUND, "index not found for fd=%d (idx_hint=%d)", fd, idx_hint); errno = ENOENT; goto unlock; } event_pool->reg[idx] = event_pool->reg[--event_pool->used]; event_pool->changed = 1; } unlock: pthread_mutex_unlock(&event_pool->mutex); out: return idx; } static int event_unregister_close_poll(struct event_pool *event_pool, int fd, int idx_hint) { int ret = -1; ret = event_unregister_poll(event_pool, fd, idx_hint); sys_close(fd); return ret; } static int event_select_on_poll(struct event_pool *event_pool, int fd, int idx_hint, int poll_in, int poll_out) { int idx = -1; GF_VALIDATE_OR_GOTO("event", event_pool, out); pthread_mutex_lock(&event_pool->mutex); { idx = __event_getindex(event_pool, fd, idx_hint); if (idx == -1) { gf_msg("poll", GF_LOG_ERROR, 0, LG_MSG_INDEX_NOT_FOUND, "index not found for fd=%d (idx_hint=%d)", fd, idx_hint); errno = ENOENT; goto unlock; } switch (poll_in) { case 1: event_pool->reg[idx].events |= POLLIN; break; case 0: event_pool->reg[idx].events &= ~POLLIN; break; case -1: /* do nothing */ break; default: /* TODO: log error */ break; } switch (poll_out) { case 1: event_pool->reg[idx].events |= POLLOUT; break; case 0: event_pool->reg[idx].events &= ~POLLOUT; break; case -1: /* do nothing */ break; default: /* TODO: log error */ break; } if (poll_in + poll_out > -2) event_pool->changed = 1; } unlock: pthread_mutex_unlock(&event_pool->mutex); out: return idx; } static int event_dispatch_poll_handler(struct event_pool *event_pool, struct pollfd *ufds, int i) { event_handler_t handler = NULL; void *data = NULL; int idx = -1; int ret = 0; handler = NULL; data = NULL; pthread_mutex_lock(&event_pool->mutex); { idx = __event_getindex(event_pool, ufds[i].fd, i); if (idx == -1) { gf_msg("poll", GF_LOG_ERROR, 0, LG_MSG_INDEX_NOT_FOUND, "index not found for " "fd=%d (idx_hint=%d)", ufds[i].fd, i); goto unlock; } handler = event_pool->reg[idx].handler; data = event_pool->reg[idx].data; } unlock: pthread_mutex_unlock(&event_pool->mutex); if (handler) handler(ufds[i].fd, idx, 0, data, (ufds[i].revents & (POLLIN | POLLPRI)), (ufds[i].revents & (POLLOUT)), (ufds[i].revents & (POLLERR | POLLHUP | POLLNVAL)), 0); return ret; } static int event_dispatch_poll_resize(struct event_pool *event_pool, struct pollfd *ufds, int size) { int i = 0; pthread_mutex_lock(&event_pool->mutex); { if (event_pool->changed == 0) { goto unlock; } if (event_pool->used > event_pool->evcache_size) { GF_FREE(event_pool->evcache); event_pool->evcache = ufds = NULL; event_pool->evcache_size = event_pool->used; ufds = GF_CALLOC(sizeof(struct pollfd), event_pool->evcache_size, gf_common_mt_pollfd); if (!ufds) goto unlock; event_pool->evcache = ufds; } if (ufds == NULL) { goto unlock; } for (i = 0; i < event_pool->used; i++) { ufds[i].fd = event_pool->reg[i].fd; ufds[i].events = event_pool->reg[i].events; ufds[i].revents = 0; } size = i; } unlock: pthread_mutex_unlock(&event_pool->mutex); return size; } static int event_dispatch_poll(struct event_pool *event_pool) { struct pollfd *ufds = NULL; int size = 0; int i = 0; int ret = -1; GF_VALIDATE_OR_GOTO("event", event_pool, out); pthread_mutex_lock(&event_pool->mutex); { event_pool->activethreadcount = 1; } pthread_mutex_unlock(&event_pool->mutex); while (1) { pthread_mutex_lock(&event_pool->mutex); { if (event_pool->destroy == 1) { event_pool->activethreadcount = 0; pthread_cond_broadcast(&event_pool->cond); pthread_mutex_unlock(&event_pool->mutex); return 0; } } pthread_mutex_unlock(&event_pool->mutex); size = event_dispatch_poll_resize(event_pool, ufds, size); ufds = event_pool->evcache; ret = poll(ufds, size, 1); if (ret == 0) /* timeout */ continue; if (ret == -1 && errno == EINTR) /* sys call */ continue; for (i = 0; i < size; i++) { if (!ufds[i].revents) continue; event_dispatch_poll_handler(event_pool, ufds, i); } } out: return -1; } int event_reconfigure_threads_poll(struct event_pool *event_pool, int value) { /* No-op for poll */ return 0; } /* This function is the destructor for the event_pool data structure * Should be called only after poller_threads_destroy() is called, * else will lead to crashes. */ static int event_pool_destroy_poll(struct event_pool *event_pool) { int ret = 0; ret = sys_close(event_pool->breaker[0]); if (ret) return ret; ret = sys_close(event_pool->breaker[1]); if (ret) return ret; event_pool->breaker[0] = event_pool->breaker[1] = -1; GF_FREE(event_pool->reg); GF_FREE(event_pool); return ret; } struct event_ops event_ops_poll = { .new = event_pool_new_poll, .event_register = event_register_poll, .event_select_on = event_select_on_poll, .event_unregister = event_unregister_poll, .event_unregister_close = event_unregister_close_poll, .event_dispatch = event_dispatch_poll, .event_reconfigure_threads = event_reconfigure_threads_poll, .event_pool_destroy = event_pool_destroy_poll};