diff options
Diffstat (limited to 'libglusterfs/src')
| -rw-r--r-- | libglusterfs/src/event-epoll.c | 114 | ||||
| -rw-r--r-- | libglusterfs/src/event-poll.c | 10 | ||||
| -rw-r--r-- | libglusterfs/src/event.c | 10 | ||||
| -rw-r--r-- | libglusterfs/src/gf-event.h | 19 | 
4 files changed, 125 insertions, 28 deletions
diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c index 9826cc9e275..041a7e6c583 100644 --- a/libglusterfs/src/event-epoll.c +++ b/libglusterfs/src/event-epoll.c @@ -30,6 +30,7 @@ struct event_slot_epoll {      int fd;      int events;      int gen; +    int idx;      gf_atomic_t ref;      int do_close;      int in_handler; @@ -37,6 +38,7 @@ struct event_slot_epoll {      void *data;      event_handler_t handler;      gf_lock_t lock; +    struct list_head poller_death;  };  struct event_thread_data { @@ -57,6 +59,7 @@ __event_newtable(struct event_pool *event_pool, int table_idx)      for (i = 0; i < EVENT_EPOLL_SLOTS; i++) {          table[i].fd = -1;          LOCK_INIT(&table[i].lock); +        INIT_LIST_HEAD(&table[i].poller_death);      }      event_pool->ereg[table_idx] = table; @@ -66,7 +69,8 @@ __event_newtable(struct event_pool *event_pool, int table_idx)  }  static int -__event_slot_alloc(struct event_pool *event_pool, int fd) +__event_slot_alloc(struct event_pool *event_pool, int fd, +                   char notify_poller_death)  {      int i = 0;      int table_idx = -1; @@ -109,8 +113,15 @@ __event_slot_alloc(struct event_pool *event_pool, int fd)              table[i].gen = gen + 1;              LOCK_INIT(&table[i].lock); +            INIT_LIST_HEAD(&table[i].poller_death);              table[i].fd = fd; +            if (notify_poller_death) { +                table[i].idx = table_idx * EVENT_EPOLL_SLOTS + i; +                list_add_tail(&table[i].poller_death, +                              &event_pool->poller_death); +            } +              event_pool->slots_used[table_idx]++;              break; @@ -121,13 +132,14 @@ __event_slot_alloc(struct event_pool *event_pool, int fd)  }  static int -event_slot_alloc(struct event_pool *event_pool, int fd) +event_slot_alloc(struct event_pool *event_pool, int fd, +                 char notify_poller_death)  {      int idx = -1;      pthread_mutex_lock(&event_pool->mutex);      { -        idx = __event_slot_alloc(event_pool, fd); +        idx = __event_slot_alloc(event_pool, fd, notify_poller_death);      }      pthread_mutex_unlock(&event_pool->mutex); @@ -155,6 +167,7 @@ __event_slot_dealloc(struct event_pool *event_pool, int idx)      slot->fd = -1;      slot->handled_error = 0;      slot->in_handler = 0; +    list_del_init(&slot->poller_death);      event_pool->slots_used[table_idx]--;      return; @@ -172,6 +185,15 @@ event_slot_dealloc(struct event_pool *event_pool, int idx)      return;  } +static int +event_slot_ref(struct event_slot_epoll *slot) +{ +    if (!slot) +        return -1; + +    return GF_ATOMIC_INC(slot->ref); +} +  static struct event_slot_epoll *  event_slot_get(struct event_pool *event_pool, int idx)  { @@ -188,12 +210,41 @@ event_slot_get(struct event_pool *event_pool, int idx)          return NULL;      slot = &table[offset]; -    GF_ATOMIC_INC(slot->ref); +    event_slot_ref(slot);      return slot;  }  static void +__event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot, +                   int idx) +{ +    int ref = -1; +    int fd = -1; +    int do_close = 0; + +    ref = GF_ATOMIC_DEC(slot->ref); +    if (ref) +        /* slot still alive */ +        goto done; + +    LOCK(&slot->lock); +    { +        fd = slot->fd; +        do_close = slot->do_close; +        slot->do_close = 0; +    } +    UNLOCK(&slot->lock); + +    __event_slot_dealloc(event_pool, idx); + +    if (do_close) +        sys_close(fd); +done: +    return; +} + +static void  event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot,                   int idx)  { @@ -248,7 +299,7 @@ event_pool_new_epoll(int count, int eventthreadcount)      event_pool->fd = epfd;      event_pool->count = count; - +    INIT_LIST_HEAD(&event_pool->poller_death);      event_pool->eventthreadcount = eventthreadcount;      event_pool->auto_thread_count = 0; @@ -297,7 +348,7 @@ __slot_update_events(struct event_slot_epoll *slot, int poll_in, int poll_out)  int  event_register_epoll(struct event_pool *event_pool, int fd,                       event_handler_t handler, void *data, int poll_in, -                     int poll_out) +                     int poll_out, char notify_poller_death)  {      int idx = -1;      int ret = -1; @@ -328,7 +379,7 @@ event_register_epoll(struct event_pool *event_pool, int fd,      if (destroy == 1)          goto out; -    idx = event_slot_alloc(event_pool, fd); +    idx = event_slot_alloc(event_pool, fd, notify_poller_death);      if (idx == -1) {          gf_msg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND,                 "could not find slot for fd=%d", fd); @@ -591,7 +642,7 @@ pre_unlock:          ret = handler(fd, idx, gen, data,                        (event->events & (EPOLLIN | EPOLLPRI)),                        (event->events & (EPOLLOUT)), -                      (event->events & (EPOLLERR | EPOLLHUP))); +                      (event->events & (EPOLLERR | EPOLLHUP)), 0);      }  out:      event_slot_unref(event_pool, slot, idx); @@ -607,7 +658,9 @@ event_dispatch_epoll_worker(void *data)      struct event_thread_data *ev_data = data;      struct event_pool *event_pool;      int myindex = -1; -    int timetodie = 0; +    int timetodie = 0, gen = 0; +    struct list_head poller_death_notify; +    struct event_slot_epoll *slot = NULL, *tmp = NULL;      GF_VALIDATE_OR_GOTO("event", ev_data, out); @@ -619,7 +672,7 @@ event_dispatch_epoll_worker(void *data)      gf_msg("epoll", GF_LOG_INFO, 0, LG_MSG_STARTED_EPOLL_THREAD,             "Started"             " thread with index %d", -           myindex); +           myindex - 1);      pthread_mutex_lock(&event_pool->mutex);      { @@ -637,20 +690,55 @@ event_dispatch_epoll_worker(void *data)              pthread_mutex_lock(&event_pool->mutex);              {                  if (event_pool->eventthreadcount < myindex) { +                    while (event_pool->poller_death_sliced) { +                        pthread_cond_wait(&event_pool->cond, +                                          &event_pool->mutex); +                    } + +                    INIT_LIST_HEAD(&poller_death_notify);                      /* if found true in critical section,                       * die */                      event_pool->pollers[myindex - 1] = 0;                      event_pool->activethreadcount--;                      timetodie = 1; +                    gen = ++event_pool->poller_gen; +                    list_for_each_entry(slot, &event_pool->poller_death, +                                        poller_death) +                    { +                        event_slot_ref(slot); +                    } + +                    list_splice_init(&event_pool->poller_death, +                                     &poller_death_notify); +                    event_pool->poller_death_sliced = 1;                      pthread_cond_broadcast(&event_pool->cond);                  }              }              pthread_mutex_unlock(&event_pool->mutex);              if (timetodie) { +                list_for_each_entry(slot, &poller_death_notify, poller_death) +                { +                    slot->handler(slot->fd, 0, gen, slot->data, 0, 0, 0, 1); +                } + +                pthread_mutex_lock(&event_pool->mutex); +                { +                    list_for_each_entry_safe(slot, tmp, &poller_death_notify, +                                             poller_death) +                    { +                        __event_slot_unref(event_pool, slot, slot->idx); +                    } + +                    list_splice(&poller_death_notify, +                                &event_pool->poller_death); +                    event_pool->poller_death_sliced = 0; +                    pthread_cond_broadcast(&event_pool->cond); +                } +                pthread_mutex_unlock(&event_pool->mutex); +                  gf_msg("epoll", GF_LOG_INFO, 0, LG_MSG_EXITED_EPOLL_THREAD, -                       "Exited " -                       "thread with index %d", -                       myindex); +                       "Exited thread with index %d", myindex); +                  goto out;              }          } diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c index 727d2a000a2..5bac4291c47 100644 --- a/libglusterfs/src/event-poll.c +++ b/libglusterfs/src/event-poll.c @@ -33,11 +33,11 @@ struct event_slot_poll {  static int  event_register_poll(struct event_pool *event_pool, int fd,                      event_handler_t handler, void *data, int poll_in, -                    int poll_out); +                    int poll_out, char notify_poller_death);  static int  __flush_fd(int fd, int idx, int gen, void *data, int poll_in, int poll_out, -           int poll_err) +           int poll_err, char event_thread_died)  {      char buf[64];      int ret = -1; @@ -146,7 +146,7 @@ event_pool_new_poll(int count, int eventthreadcount)      }      ret = event_register_poll(event_pool, event_pool->breaker[0], __flush_fd, -                              NULL, 1, 0); +                              NULL, 1, 0, 0);      if (ret == -1) {          gf_msg("poll", GF_LOG_ERROR, 0, LG_MSG_REGISTER_PIPE_FAILED,                 "could not register pipe fd with poll event loop"); @@ -180,7 +180,7 @@ event_pool_new_poll(int count, int eventthreadcount)  static int  event_register_poll(struct event_pool *event_pool, int fd,                      event_handler_t handler, void *data, int poll_in, -                    int poll_out) +                    int poll_out, char notify_poller_death)  {      int idx = -1; @@ -378,7 +378,7 @@ unlock:          ret = handler(ufds[i].fd, idx, 0, data,                        (ufds[i].revents & (POLLIN | POLLPRI)),                        (ufds[i].revents & (POLLOUT)), -                      (ufds[i].revents & (POLLERR | POLLHUP | POLLNVAL))); +                      (ufds[i].revents & (POLLERR | POLLHUP | POLLNVAL)), 0);      return ret;  } diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c index 49f70c83366..ddba9810b0b 100644 --- a/libglusterfs/src/event.c +++ b/libglusterfs/src/event.c @@ -54,14 +54,14 @@ event_pool_new(int count, int eventthreadcount)  int  event_register(struct event_pool *event_pool, int fd, event_handler_t handler, -               void *data, int poll_in, int poll_out) +               void *data, int poll_in, int poll_out, char notify_poller_death)  {      int ret = -1;      GF_VALIDATE_OR_GOTO("event", event_pool, out); -    ret = event_pool->ops->event_register(event_pool, fd, handler, data, -                                          poll_in, poll_out); +    ret = event_pool->ops->event_register( +        event_pool, fd, handler, data, poll_in, poll_out, notify_poller_death);  out:      return ret;  } @@ -161,7 +161,7 @@ out:  int  poller_destroy_handler(int fd, int idx, int gen, void *data, int poll_out, -                       int poll_in, int poll_err) +                       int poll_in, int poll_err, char event_thread_exit)  {      struct event_destroy_data *destroy = NULL;      int readfd = -1, ret = -1; @@ -233,7 +233,7 @@ event_dispatch_destroy(struct event_pool *event_pool)      /* From the main thread register an event on the pipe fd[0],       */ -    idx = event_register(event_pool, fd[0], poller_destroy_handler, &data, 1, +    idx = event_register(event_pool, fd[0], poller_destroy_handler, &data, 1, 0,                           0);      if (idx < 0)          goto out; diff --git a/libglusterfs/src/gf-event.h b/libglusterfs/src/gf-event.h index 5c3724cc953..5d92a2dd285 100644 --- a/libglusterfs/src/gf-event.h +++ b/libglusterfs/src/gf-event.h @@ -12,6 +12,7 @@  #define _GF_EVENT_H_  #include <pthread.h> +#include "list.h"  struct event_pool;  struct event_ops; @@ -23,7 +24,8 @@ struct event_data {  } __attribute__((__packed__, __may_alias__));  typedef int (*event_handler_t)(int fd, int idx, int gen, void *data, -                               int poll_in, int poll_out, int poll_err); +                               int poll_in, int poll_out, int poll_err, +                               char event_thread_exit);  #define EVENT_EPOLL_TABLES 1024  #define EVENT_EPOLL_SLOTS 1024 @@ -40,6 +42,13 @@ struct event_pool {      struct event_slot_epoll *ereg[EVENT_EPOLL_TABLES];      int slots_used[EVENT_EPOLL_TABLES]; +    struct list_head poller_death; +    int poller_death_sliced; /* track whether the list of fds interested +                              * poller_death is sliced. If yes, new thread death +                              * notification has to wait till the list is added +                              * back +                              */ +    int poller_gen;      int used;      int changed; @@ -52,8 +61,8 @@ struct event_pool {      /* NOTE: Currently used only when event processing is done using       * epoll. */      int eventthreadcount; /* number of event threads to execute. */ -    pthread_t pollers[EVENT_MAX_THREADS]; /* poller thread_id store, -                                           * and live status */ +    pthread_t pollers[EVENT_MAX_THREADS]; /* poller thread_id store, and live +                                             status */      int destroy;      int activethreadcount; @@ -81,7 +90,7 @@ struct event_ops {      int (*event_register)(struct event_pool *event_pool, int fd,                            event_handler_t handler, void *data, int poll_in, -                          int poll_out); +                          int poll_out, char notify_poller_death);      int (*event_select_on)(struct event_pool *event_pool, int fd, int idx,                             int poll_in, int poll_out); @@ -107,7 +116,7 @@ event_select_on(struct event_pool *event_pool, int fd, int idx, int poll_in,                  int poll_out);  int  event_register(struct event_pool *event_pool, int fd, event_handler_t handler, -               void *data, int poll_in, int poll_out); +               void *data, int poll_in, int poll_out, char notify_poller_death);  int  event_unregister(struct event_pool *event_pool, int fd, int idx);  int  | 
