summaryrefslogtreecommitdiffstats
path: root/libglusterfs/src/event-epoll.c
diff options
context:
space:
mode:
Diffstat (limited to 'libglusterfs/src/event-epoll.c')
-rw-r--r--libglusterfs/src/event-epoll.c210
1 files changed, 187 insertions, 23 deletions
diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c
index 9082954e4e4..8d42fa71fb6 100644
--- a/libglusterfs/src/event-epoll.c
+++ b/libglusterfs/src/event-epoll.c
@@ -43,6 +43,10 @@ struct event_slot_epoll {
gf_lock_t lock;
};
+struct event_thread_data {
+ struct event_pool *event_pool;
+ int event_index;
+};
static struct event_slot_epoll *
__event_newtable (struct event_pool *event_pool, int table_idx)
@@ -232,7 +236,7 @@ done:
static struct event_pool *
-event_pool_new_epoll (int count)
+event_pool_new_epoll (int count, int eventthreadcount)
{
struct event_pool *event_pool = NULL;
int epfd = -1;
@@ -258,6 +262,8 @@ event_pool_new_epoll (int count)
event_pool->count = count;
+ event_pool->eventthreadcount = eventthreadcount;
+
pthread_mutex_init (&event_pool->mutex, NULL);
out:
@@ -585,11 +591,45 @@ event_dispatch_epoll_worker (void *data)
{
struct epoll_event event;
int ret = -1;
- struct event_pool *event_pool = data;
+ struct event_thread_data *ev_data = data;
+ struct event_pool *event_pool;
+ int myindex = -1;
+ int timetodie = 0;
+
+ GF_VALIDATE_OR_GOTO ("event", ev_data, out);
+
+ event_pool = ev_data->event_pool;
+ myindex = ev_data->event_index;
GF_VALIDATE_OR_GOTO ("event", event_pool, out);
+ gf_log ("epoll", GF_LOG_INFO, "Started thread with index %d", myindex);
+
for (;;) {
+ if (event_pool->eventthreadcount < myindex) {
+ /* ...time to die, thread count was decreased below
+ * this threads index */
+ /* Start with extra safety at this point, reducing
+ * lock conention in normal case when threads are not
+ * reconfigured always */
+ pthread_mutex_lock (&event_pool->mutex);
+ {
+ if (event_pool->eventthreadcount <
+ myindex) {
+ /* if found true in critical section,
+ * die */
+ event_pool->pollers[myindex - 1] = 0;
+ timetodie = 1;
+ }
+ }
+ pthread_mutex_unlock (&event_pool->mutex);
+ if (timetodie) {
+ gf_log ("epoll", GF_LOG_INFO,
+ "Exited thread with index %d", myindex);
+ goto out;
+ }
+ }
+
ret = epoll_wait (event_pool->fd, &event, 1, -1);
if (ret == 0)
@@ -603,40 +643,164 @@ event_dispatch_epoll_worker (void *data)
ret = event_dispatch_epoll_handler (event_pool, &event);
}
out:
+ if (ev_data)
+ GF_FREE (ev_data);
return NULL;
}
-
-#define GLUSTERFS_EPOLL_MAXTHREADS 2
-
-
+/* Attempts to start the # of configured pollers, ensuring at least the first
+ * is started in a joinable state */
static int
event_dispatch_epoll (struct event_pool *event_pool)
{
- int i = 0;
- pthread_t pollers[GLUSTERFS_EPOLL_MAXTHREADS];
- int ret = -1;
-
- for (i = 0; i < GLUSTERFS_EPOLL_MAXTHREADS; i++) {
- ret = pthread_create (&pollers[i], NULL,
- event_dispatch_epoll_worker,
- event_pool);
- }
+ int i = 0;
+ pthread_t t_id;
+ int pollercount = 0;
+ int ret = -1;
+ struct event_thread_data *ev_data = NULL;
+
+ /* Start the configured number of pollers */
+ pthread_mutex_lock (&event_pool->mutex);
+ {
+ pollercount = event_pool->eventthreadcount;
+
+ /* Set to MAX if greater */
+ if (pollercount > EVENT_MAX_THREADS)
+ pollercount = EVENT_MAX_THREADS;
+
+ /* Default pollers to 1 in case this is incorrectly set */
+ if (pollercount <= 0)
+ pollercount = 1;
+
+ for (i = 0; i < pollercount; i++) {
+ ev_data = GF_CALLOC (1, sizeof (*ev_data),
+ gf_common_mt_event_pool);
+ if (!ev_data) {
+ gf_log ("epoll", GF_LOG_WARNING,
+ "Allocation failure for index %d", i);
+ if (i == 0) {
+ /* Need to suceed creating 0'th
+ * thread, to joinable and wait */
+ break;
+ } else {
+ /* Inability to create other threads
+ * are a lesser evil, and ignored */
+ continue;
+ }
+ }
+
+ ev_data->event_pool = event_pool;
+ ev_data->event_index = i + 1;
+
+ ret = pthread_create (&t_id, NULL,
+ event_dispatch_epoll_worker,
+ ev_data);
+ if (!ret) {
+ event_pool->pollers[i] = t_id;
+
+ /* mark all threads other than one in index 0
+ * as detachable. Errors can be ignored, they
+ * spend their time as zombies if not detched
+ * and the thread counts are decreased */
+ if (i != 0)
+ pthread_detach (event_pool->pollers[i]);
+ } else {
+ gf_log ("epoll", GF_LOG_WARNING,
+ "Failed to start thread for index %d",
+ i);
+ if (i == 0) {
+ GF_FREE (ev_data);
+ break;
+ } else {
+ GF_FREE (ev_data);
+ continue;
+ }
+ }
+ }
+ }
+ pthread_mutex_unlock (&event_pool->mutex);
- for (i = 0; i < GLUSTERFS_EPOLL_MAXTHREADS; i++)
- pthread_join (pollers[i], NULL);
+ /* Just wait for the first thread, that is created in a joinable state
+ * and will never die, ensuring this function never returns */
+ if (event_pool->pollers[0] != 0)
+ pthread_join (event_pool->pollers[0], NULL);
return ret;
}
+int
+event_reconfigure_threads_epoll (struct event_pool *event_pool, int value)
+{
+ int i;
+ int ret;
+ pthread_t t_id;
+ int oldthreadcount;
+ struct event_thread_data *ev_data = NULL;
+
+ /* Set to MAX if greater */
+ if (value > EVENT_MAX_THREADS)
+ value = EVENT_MAX_THREADS;
+
+ /* Default pollers to 1 in case this is set incorrectly */
+ if (value <= 0)
+ value = 1;
+
+ pthread_mutex_lock (&event_pool->mutex);
+ {
+ oldthreadcount = event_pool->eventthreadcount;
+
+ if (oldthreadcount < value) {
+ /* create more poll threads */
+ for (i = oldthreadcount; i < value; i++) {
+ /* Start a thread if the index at this location
+ * is a 0, so that the older thread is confirmed
+ * as dead */
+ if (event_pool->pollers[i] == 0) {
+ ev_data = GF_CALLOC (1,
+ sizeof (*ev_data),
+ gf_common_mt_event_pool);
+ if (!ev_data) {
+ gf_log ("epoll", GF_LOG_WARNING,
+ "Allocation failure for"
+ " index %d", i);
+ continue;
+ }
+
+ ev_data->event_pool = event_pool;
+ ev_data->event_index = i + 1;
+
+ ret = pthread_create (&t_id, NULL,
+ event_dispatch_epoll_worker,
+ ev_data);
+ if (ret) {
+ gf_log ("epoll", GF_LOG_WARNING,
+ "Failed to start thread for"
+ " index %d", i);
+ GF_FREE (ev_data);
+ } else {
+ pthread_detach (t_id);
+ event_pool->pollers[i] = t_id;
+ }
+ }
+ }
+ }
+
+ /* if value decreases, threads will terminate, themselves */
+ event_pool->eventthreadcount = value;
+ }
+ pthread_mutex_unlock (&event_pool->mutex);
+
+ return 0;
+}
struct event_ops event_ops_epoll = {
- .new = event_pool_new_epoll,
- .event_register = event_register_epoll,
- .event_select_on = event_select_on_epoll,
- .event_unregister = event_unregister_epoll,
- .event_unregister_close = event_unregister_close_epoll,
- .event_dispatch = event_dispatch_epoll
+ .new = event_pool_new_epoll,
+ .event_register = event_register_epoll,
+ .event_select_on = event_select_on_epoll,
+ .event_unregister = event_unregister_epoll,
+ .event_unregister_close = event_unregister_close_epoll,
+ .event_dispatch = event_dispatch_epoll,
+ .event_reconfigure_threads = event_reconfigure_threads_epoll
};
#endif