summaryrefslogtreecommitdiffstats
path: root/libglusterfs
diff options
context:
space:
mode:
Diffstat (limited to 'libglusterfs')
-rw-r--r--libglusterfs/src/event-epoll.c114
-rw-r--r--libglusterfs/src/event-poll.c10
-rw-r--r--libglusterfs/src/event.c10
-rw-r--r--libglusterfs/src/gf-event.h19
4 files changed, 125 insertions, 28 deletions
diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c
index 9826cc9e275..041a7e6c583 100644
--- a/libglusterfs/src/event-epoll.c
+++ b/libglusterfs/src/event-epoll.c
@@ -30,6 +30,7 @@ struct event_slot_epoll {
int fd;
int events;
int gen;
+ int idx;
gf_atomic_t ref;
int do_close;
int in_handler;
@@ -37,6 +38,7 @@ struct event_slot_epoll {
void *data;
event_handler_t handler;
gf_lock_t lock;
+ struct list_head poller_death;
};
struct event_thread_data {
@@ -57,6 +59,7 @@ __event_newtable(struct event_pool *event_pool, int table_idx)
for (i = 0; i < EVENT_EPOLL_SLOTS; i++) {
table[i].fd = -1;
LOCK_INIT(&table[i].lock);
+ INIT_LIST_HEAD(&table[i].poller_death);
}
event_pool->ereg[table_idx] = table;
@@ -66,7 +69,8 @@ __event_newtable(struct event_pool *event_pool, int table_idx)
}
static int
-__event_slot_alloc(struct event_pool *event_pool, int fd)
+__event_slot_alloc(struct event_pool *event_pool, int fd,
+ char notify_poller_death)
{
int i = 0;
int table_idx = -1;
@@ -109,8 +113,15 @@ __event_slot_alloc(struct event_pool *event_pool, int fd)
table[i].gen = gen + 1;
LOCK_INIT(&table[i].lock);
+ INIT_LIST_HEAD(&table[i].poller_death);
table[i].fd = fd;
+ if (notify_poller_death) {
+ table[i].idx = table_idx * EVENT_EPOLL_SLOTS + i;
+ list_add_tail(&table[i].poller_death,
+ &event_pool->poller_death);
+ }
+
event_pool->slots_used[table_idx]++;
break;
@@ -121,13 +132,14 @@ __event_slot_alloc(struct event_pool *event_pool, int fd)
}
static int
-event_slot_alloc(struct event_pool *event_pool, int fd)
+event_slot_alloc(struct event_pool *event_pool, int fd,
+ char notify_poller_death)
{
int idx = -1;
pthread_mutex_lock(&event_pool->mutex);
{
- idx = __event_slot_alloc(event_pool, fd);
+ idx = __event_slot_alloc(event_pool, fd, notify_poller_death);
}
pthread_mutex_unlock(&event_pool->mutex);
@@ -155,6 +167,7 @@ __event_slot_dealloc(struct event_pool *event_pool, int idx)
slot->fd = -1;
slot->handled_error = 0;
slot->in_handler = 0;
+ list_del_init(&slot->poller_death);
event_pool->slots_used[table_idx]--;
return;
@@ -172,6 +185,15 @@ event_slot_dealloc(struct event_pool *event_pool, int idx)
return;
}
+static int
+event_slot_ref(struct event_slot_epoll *slot)
+{
+ if (!slot)
+ return -1;
+
+ return GF_ATOMIC_INC(slot->ref);
+}
+
static struct event_slot_epoll *
event_slot_get(struct event_pool *event_pool, int idx)
{
@@ -188,12 +210,41 @@ event_slot_get(struct event_pool *event_pool, int idx)
return NULL;
slot = &table[offset];
- GF_ATOMIC_INC(slot->ref);
+ event_slot_ref(slot);
return slot;
}
static void
+__event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot,
+ int idx)
+{
+ int ref = -1;
+ int fd = -1;
+ int do_close = 0;
+
+ ref = GF_ATOMIC_DEC(slot->ref);
+ if (ref)
+ /* slot still alive */
+ goto done;
+
+ LOCK(&slot->lock);
+ {
+ fd = slot->fd;
+ do_close = slot->do_close;
+ slot->do_close = 0;
+ }
+ UNLOCK(&slot->lock);
+
+ __event_slot_dealloc(event_pool, idx);
+
+ if (do_close)
+ sys_close(fd);
+done:
+ return;
+}
+
+static void
event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot,
int idx)
{
@@ -248,7 +299,7 @@ event_pool_new_epoll(int count, int eventthreadcount)
event_pool->fd = epfd;
event_pool->count = count;
-
+ INIT_LIST_HEAD(&event_pool->poller_death);
event_pool->eventthreadcount = eventthreadcount;
event_pool->auto_thread_count = 0;
@@ -297,7 +348,7 @@ __slot_update_events(struct event_slot_epoll *slot, int poll_in, int poll_out)
int
event_register_epoll(struct event_pool *event_pool, int fd,
event_handler_t handler, void *data, int poll_in,
- int poll_out)
+ int poll_out, char notify_poller_death)
{
int idx = -1;
int ret = -1;
@@ -328,7 +379,7 @@ event_register_epoll(struct event_pool *event_pool, int fd,
if (destroy == 1)
goto out;
- idx = event_slot_alloc(event_pool, fd);
+ idx = event_slot_alloc(event_pool, fd, notify_poller_death);
if (idx == -1) {
gf_msg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND,
"could not find slot for fd=%d", fd);
@@ -591,7 +642,7 @@ pre_unlock:
ret = handler(fd, idx, gen, data,
(event->events & (EPOLLIN | EPOLLPRI)),
(event->events & (EPOLLOUT)),
- (event->events & (EPOLLERR | EPOLLHUP)));
+ (event->events & (EPOLLERR | EPOLLHUP)), 0);
}
out:
event_slot_unref(event_pool, slot, idx);
@@ -607,7 +658,9 @@ event_dispatch_epoll_worker(void *data)
struct event_thread_data *ev_data = data;
struct event_pool *event_pool;
int myindex = -1;
- int timetodie = 0;
+ int timetodie = 0, gen = 0;
+ struct list_head poller_death_notify;
+ struct event_slot_epoll *slot = NULL, *tmp = NULL;
GF_VALIDATE_OR_GOTO("event", ev_data, out);
@@ -619,7 +672,7 @@ event_dispatch_epoll_worker(void *data)
gf_msg("epoll", GF_LOG_INFO, 0, LG_MSG_STARTED_EPOLL_THREAD,
"Started"
" thread with index %d",
- myindex);
+ myindex - 1);
pthread_mutex_lock(&event_pool->mutex);
{
@@ -637,20 +690,55 @@ event_dispatch_epoll_worker(void *data)
pthread_mutex_lock(&event_pool->mutex);
{
if (event_pool->eventthreadcount < myindex) {
+ while (event_pool->poller_death_sliced) {
+ pthread_cond_wait(&event_pool->cond,
+ &event_pool->mutex);
+ }
+
+ INIT_LIST_HEAD(&poller_death_notify);
/* if found true in critical section,
* die */
event_pool->pollers[myindex - 1] = 0;
event_pool->activethreadcount--;
timetodie = 1;
+ gen = ++event_pool->poller_gen;
+ list_for_each_entry(slot, &event_pool->poller_death,
+ poller_death)
+ {
+ event_slot_ref(slot);
+ }
+
+ list_splice_init(&event_pool->poller_death,
+ &poller_death_notify);
+ event_pool->poller_death_sliced = 1;
pthread_cond_broadcast(&event_pool->cond);
}
}
pthread_mutex_unlock(&event_pool->mutex);
if (timetodie) {
+ list_for_each_entry(slot, &poller_death_notify, poller_death)
+ {
+ slot->handler(slot->fd, 0, gen, slot->data, 0, 0, 0, 1);
+ }
+
+ pthread_mutex_lock(&event_pool->mutex);
+ {
+ list_for_each_entry_safe(slot, tmp, &poller_death_notify,
+ poller_death)
+ {
+ __event_slot_unref(event_pool, slot, slot->idx);
+ }
+
+ list_splice(&poller_death_notify,
+ &event_pool->poller_death);
+ event_pool->poller_death_sliced = 0;
+ pthread_cond_broadcast(&event_pool->cond);
+ }
+ pthread_mutex_unlock(&event_pool->mutex);
+
gf_msg("epoll", GF_LOG_INFO, 0, LG_MSG_EXITED_EPOLL_THREAD,
- "Exited "
- "thread with index %d",
- myindex);
+ "Exited thread with index %d", myindex);
+
goto out;
}
}
diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c
index 727d2a000a2..5bac4291c47 100644
--- a/libglusterfs/src/event-poll.c
+++ b/libglusterfs/src/event-poll.c
@@ -33,11 +33,11 @@ struct event_slot_poll {
static int
event_register_poll(struct event_pool *event_pool, int fd,
event_handler_t handler, void *data, int poll_in,
- int poll_out);
+ int poll_out, char notify_poller_death);
static int
__flush_fd(int fd, int idx, int gen, void *data, int poll_in, int poll_out,
- int poll_err)
+ int poll_err, char event_thread_died)
{
char buf[64];
int ret = -1;
@@ -146,7 +146,7 @@ event_pool_new_poll(int count, int eventthreadcount)
}
ret = event_register_poll(event_pool, event_pool->breaker[0], __flush_fd,
- NULL, 1, 0);
+ NULL, 1, 0, 0);
if (ret == -1) {
gf_msg("poll", GF_LOG_ERROR, 0, LG_MSG_REGISTER_PIPE_FAILED,
"could not register pipe fd with poll event loop");
@@ -180,7 +180,7 @@ event_pool_new_poll(int count, int eventthreadcount)
static int
event_register_poll(struct event_pool *event_pool, int fd,
event_handler_t handler, void *data, int poll_in,
- int poll_out)
+ int poll_out, char notify_poller_death)
{
int idx = -1;
@@ -378,7 +378,7 @@ unlock:
ret = handler(ufds[i].fd, idx, 0, data,
(ufds[i].revents & (POLLIN | POLLPRI)),
(ufds[i].revents & (POLLOUT)),
- (ufds[i].revents & (POLLERR | POLLHUP | POLLNVAL)));
+ (ufds[i].revents & (POLLERR | POLLHUP | POLLNVAL)), 0);
return ret;
}
diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c
index 49f70c83366..ddba9810b0b 100644
--- a/libglusterfs/src/event.c
+++ b/libglusterfs/src/event.c
@@ -54,14 +54,14 @@ event_pool_new(int count, int eventthreadcount)
int
event_register(struct event_pool *event_pool, int fd, event_handler_t handler,
- void *data, int poll_in, int poll_out)
+ void *data, int poll_in, int poll_out, char notify_poller_death)
{
int ret = -1;
GF_VALIDATE_OR_GOTO("event", event_pool, out);
- ret = event_pool->ops->event_register(event_pool, fd, handler, data,
- poll_in, poll_out);
+ ret = event_pool->ops->event_register(
+ event_pool, fd, handler, data, poll_in, poll_out, notify_poller_death);
out:
return ret;
}
@@ -161,7 +161,7 @@ out:
int
poller_destroy_handler(int fd, int idx, int gen, void *data, int poll_out,
- int poll_in, int poll_err)
+ int poll_in, int poll_err, char event_thread_exit)
{
struct event_destroy_data *destroy = NULL;
int readfd = -1, ret = -1;
@@ -233,7 +233,7 @@ event_dispatch_destroy(struct event_pool *event_pool)
/* From the main thread register an event on the pipe fd[0],
*/
- idx = event_register(event_pool, fd[0], poller_destroy_handler, &data, 1,
+ idx = event_register(event_pool, fd[0], poller_destroy_handler, &data, 1, 0,
0);
if (idx < 0)
goto out;
diff --git a/libglusterfs/src/gf-event.h b/libglusterfs/src/gf-event.h
index 5c3724cc953..5d92a2dd285 100644
--- a/libglusterfs/src/gf-event.h
+++ b/libglusterfs/src/gf-event.h
@@ -12,6 +12,7 @@
#define _GF_EVENT_H_
#include <pthread.h>
+#include "list.h"
struct event_pool;
struct event_ops;
@@ -23,7 +24,8 @@ struct event_data {
} __attribute__((__packed__, __may_alias__));
typedef int (*event_handler_t)(int fd, int idx, int gen, void *data,
- int poll_in, int poll_out, int poll_err);
+ int poll_in, int poll_out, int poll_err,
+ char event_thread_exit);
#define EVENT_EPOLL_TABLES 1024
#define EVENT_EPOLL_SLOTS 1024
@@ -40,6 +42,13 @@ struct event_pool {
struct event_slot_epoll *ereg[EVENT_EPOLL_TABLES];
int slots_used[EVENT_EPOLL_TABLES];
+ struct list_head poller_death;
+ int poller_death_sliced; /* track whether the list of fds interested
+ * poller_death is sliced. If yes, new thread death
+ * notification has to wait till the list is added
+ * back
+ */
+ int poller_gen;
int used;
int changed;
@@ -52,8 +61,8 @@ struct event_pool {
/* NOTE: Currently used only when event processing is done using
* epoll. */
int eventthreadcount; /* number of event threads to execute. */
- pthread_t pollers[EVENT_MAX_THREADS]; /* poller thread_id store,
- * and live status */
+ pthread_t pollers[EVENT_MAX_THREADS]; /* poller thread_id store, and live
+ status */
int destroy;
int activethreadcount;
@@ -81,7 +90,7 @@ struct event_ops {
int (*event_register)(struct event_pool *event_pool, int fd,
event_handler_t handler, void *data, int poll_in,
- int poll_out);
+ int poll_out, char notify_poller_death);
int (*event_select_on)(struct event_pool *event_pool, int fd, int idx,
int poll_in, int poll_out);
@@ -107,7 +116,7 @@ event_select_on(struct event_pool *event_pool, int fd, int idx, int poll_in,
int poll_out);
int
event_register(struct event_pool *event_pool, int fd, event_handler_t handler,
- void *data, int poll_in, int poll_out);
+ void *data, int poll_in, int poll_out, char notify_poller_death);
int
event_unregister(struct event_pool *event_pool, int fd, int idx);
int