 api/src/glfs.c                                    |   3
 cli/src/cli.c                                     |   3
 glusterfsd/src/glusterfsd.c                       |   3
 libglusterfs/src/event-epoll.c                    | 210
 libglusterfs/src/event-poll.c                     |  17
 libglusterfs/src/event.c                          |  21
 libglusterfs/src/event.h                          |  15
 libglusterfs/src/glusterfs.h                      |   2
 xlators/mgmt/glusterd/src/glusterd-volume-set.c   |   8
 xlators/protocol/client/src/client.c              |  39
 xlators/protocol/client/src/client.h              |   3
 xlators/protocol/server/src/server.c              |  42
 xlators/protocol/server/src/server.h              |   3
13 files changed, 336 insertions, 33 deletions
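
The patch below replaces the hard-coded GLUSTERFS_EPOLL_MAXTHREADS (2) epoll pollers with a configurable number of worker threads, capped at EVENT_MAX_THREADS (32). event_pool_new() gains an eventthreadcount argument, and a new event_reconfigure_threads() entry point grows the poller set on demand; when the count is lowered, surplus workers notice that their index exceeds eventthreadcount and exit on their own. The count is exposed to the client and server xlators as an "event-threads" option; the poll backend accepts the argument but ignores it. The fragment below is a minimal sketch of the intended call pattern, assuming the GlusterFS headers and glusterfs_ctx_t from this tree; it is illustrative only and not part of the patch:

    /* Create an event pool with one starting poller, then grow it to four.
     * Values passed to event_reconfigure_threads() are clamped by the epoll
     * backend to the 1..EVENT_MAX_THREADS range. */
    #include "event.h"
    #include "glusterfs.h"

    static int
    setup_event_pool (glusterfs_ctx_t *ctx)
    {
            ctx->event_pool = event_pool_new (DEFAULT_EVENT_POOL_SIZE,
                                              STARTING_EVENT_THREADS);
            if (!ctx->event_pool)
                    return -1;

            /* e.g. from a reconfigure() path, after reading "event-threads" */
            return event_reconfigure_threads (ctx->event_pool, 4);
    }
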
diff --git a/api/src/glfs.c b/api/src/glfs.c index 7542d8b9fcd..48af2412b8b 100644 --- a/api/src/glfs.c +++ b/api/src/glfs.c @@ -95,7 +95,8 @@ glusterfs_ctx_defaults_init (glusterfs_ctx_t *ctx)  		goto err;  	} -	ctx->event_pool = event_pool_new (DEFAULT_EVENT_POOL_SIZE); +	ctx->event_pool = event_pool_new (DEFAULT_EVENT_POOL_SIZE, +                                          STARTING_EVENT_THREADS);  	if (!ctx->event_pool) {  		goto err;  	} diff --git a/cli/src/cli.c b/cli/src/cli.c index b33ce950e11..cd2825e9c4a 100644 --- a/cli/src/cli.c +++ b/cli/src/cli.c @@ -114,7 +114,8 @@ glusterfs_ctx_defaults_init (glusterfs_ctx_t *ctx)          if (!ctx->iobuf_pool)                  return -1; -        ctx->event_pool = event_pool_new (DEFAULT_EVENT_POOL_SIZE); +        ctx->event_pool = event_pool_new (DEFAULT_EVENT_POOL_SIZE, +                                          STARTING_EVENT_THREADS);          if (!ctx->event_pool)                  return -1; diff --git a/glusterfsd/src/glusterfsd.c b/glusterfsd/src/glusterfsd.c index e750d68dd98..a46385aa292 100644 --- a/glusterfsd/src/glusterfsd.c +++ b/glusterfsd/src/glusterfsd.c @@ -1340,7 +1340,8 @@ glusterfs_ctx_defaults_init (glusterfs_ctx_t *ctx)                  goto out;          } -        ctx->event_pool = event_pool_new (DEFAULT_EVENT_POOL_SIZE); +        ctx->event_pool = event_pool_new (DEFAULT_EVENT_POOL_SIZE, +                                          STARTING_EVENT_THREADS);          if (!ctx->event_pool) {                  gf_msg ("", GF_LOG_CRITICAL, 0, glusterfsd_msg_14, "event");                  goto out; diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c index 9082954e4e4..8d42fa71fb6 100644 --- a/libglusterfs/src/event-epoll.c +++ b/libglusterfs/src/event-epoll.c @@ -43,6 +43,10 @@ struct event_slot_epoll {  	gf_lock_t lock;  }; +struct event_thread_data { +        struct event_pool *event_pool; +        int    event_index; +};  static struct event_slot_epoll *  __event_newtable (struct event_pool *event_pool, int table_idx) @@ -232,7 +236,7 @@ done:  static struct event_pool * -event_pool_new_epoll (int count) +event_pool_new_epoll (int count, int eventthreadcount)  {          struct event_pool *event_pool = NULL;          int                epfd = -1; @@ -258,6 +262,8 @@ event_pool_new_epoll (int count)          event_pool->count = count; +        event_pool->eventthreadcount = eventthreadcount; +          pthread_mutex_init (&event_pool->mutex, NULL);  out: @@ -585,11 +591,45 @@ event_dispatch_epoll_worker (void *data)  {          struct epoll_event  event;          int                 ret = -1; -	struct event_pool  *event_pool = data; +        struct event_thread_data *ev_data = data; +	struct event_pool  *event_pool; +        int                 myindex = -1; +        int                 timetodie = 0; + +        GF_VALIDATE_OR_GOTO ("event", ev_data, out); + +        event_pool = ev_data->event_pool; +        myindex = ev_data->event_index;          GF_VALIDATE_OR_GOTO ("event", event_pool, out); +        gf_log ("epoll", GF_LOG_INFO, "Started thread with index %d", myindex); +  	for (;;) { +                if (event_pool->eventthreadcount < myindex) { +                        /* ...time to die, thread count was decreased below +                         * this threads index */ +                        /* Start with extra safety at this point, reducing +                         * lock conention in normal case when threads are not +                         * reconfigured always */ +                  
      pthread_mutex_lock (&event_pool->mutex); +                        { +                                if (event_pool->eventthreadcount < +                                    myindex) { +                                        /* if found true in critical section, +                                         * die */ +                                        event_pool->pollers[myindex - 1] = 0; +                                        timetodie = 1; +                                } +                        } +                        pthread_mutex_unlock (&event_pool->mutex); +                        if (timetodie) { +                                gf_log ("epoll", GF_LOG_INFO, +                                        "Exited thread with index %d", myindex); +                                goto out; +                        } +                } +                  ret = epoll_wait (event_pool->fd, &event, 1, -1);                  if (ret == 0) @@ -603,40 +643,164 @@ event_dispatch_epoll_worker (void *data)  		ret = event_dispatch_epoll_handler (event_pool, &event);          }  out: +        if (ev_data) +                GF_FREE (ev_data);          return NULL;  } - -#define GLUSTERFS_EPOLL_MAXTHREADS 2 - - +/* Attempts to start the # of configured pollers, ensuring at least the first + * is started in a joinable state */  static int  event_dispatch_epoll (struct event_pool *event_pool)  { -	int               i = 0; -	pthread_t         pollers[GLUSTERFS_EPOLL_MAXTHREADS]; -	int               ret = -1; - -	for (i = 0; i < GLUSTERFS_EPOLL_MAXTHREADS; i++) { -		ret = pthread_create (&pollers[i], NULL, -				      event_dispatch_epoll_worker, -				      event_pool); -	} +	int                       i = 0; +        pthread_t                 t_id; +        int                       pollercount = 0; +	int                       ret = -1; +        struct event_thread_data *ev_data = NULL; + +        /* Start the configured number of pollers */ +        pthread_mutex_lock (&event_pool->mutex); +        { +                pollercount = event_pool->eventthreadcount; + +                /* Set to MAX if greater */ +                if (pollercount > EVENT_MAX_THREADS) +                        pollercount = EVENT_MAX_THREADS; + +                /* Default pollers to 1 in case this is incorrectly set */ +                if (pollercount <= 0) +                        pollercount = 1; + +                for (i = 0; i < pollercount; i++) { +                        ev_data = GF_CALLOC (1, sizeof (*ev_data), +                                     gf_common_mt_event_pool); +                        if (!ev_data) { +                                gf_log ("epoll", GF_LOG_WARNING, +                                        "Allocation failure for index %d", i); +                                if (i == 0) { +                                        /* Need to suceed creating 0'th +                                         * thread, to joinable and wait */ +                                        break; +                                } else { +                                        /* Inability to create other threads +                                         * are a lesser evil, and ignored */ +                                        continue; +                                } +                        } + +                        ev_data->event_pool = event_pool; +                        ev_data->event_index = i + 1; + +                        ret = pthread_create (&t_id, NULL, +                                      
        event_dispatch_epoll_worker, +                                              ev_data); +                        if (!ret) { +                                event_pool->pollers[i] = t_id; + +                                /* mark all threads other than one in index 0 +                                 * as detachable. Errors can be ignored, they +                                 * spend their time as zombies if not detched +                                 * and the thread counts are decreased */ +                                if (i != 0) +                                        pthread_detach (event_pool->pollers[i]); +                        } else { +                                gf_log ("epoll", GF_LOG_WARNING, +                                        "Failed to start thread for index %d", +                                        i); +                                if (i == 0) { +                                        GF_FREE (ev_data); +                                        break; +                                } else { +                                        GF_FREE (ev_data); +                                        continue; +                                } +                        } +                } +        } +        pthread_mutex_unlock (&event_pool->mutex); -	for (i = 0; i < GLUSTERFS_EPOLL_MAXTHREADS; i++) -		pthread_join (pollers[i], NULL); +        /* Just wait for the first thread, that is created in a joinable state +         * and will never die, ensuring this function never returns */ +        if (event_pool->pollers[0] != 0) +		pthread_join (event_pool->pollers[0], NULL);  	return ret;  } +int +event_reconfigure_threads_epoll (struct event_pool *event_pool, int value) +{ +        int                              i; +        int                              ret; +        pthread_t                        t_id; +        int                              oldthreadcount; +        struct event_thread_data        *ev_data = NULL; + +        /* Set to MAX if greater */ +        if (value > EVENT_MAX_THREADS) +                value = EVENT_MAX_THREADS; + +        /* Default pollers to 1 in case this is set incorrectly */ +        if (value <= 0) +                value = 1; + +        pthread_mutex_lock (&event_pool->mutex); +        { +                oldthreadcount = event_pool->eventthreadcount; + +                if (oldthreadcount < value) { +                        /* create more poll threads */ +                        for (i = oldthreadcount; i < value; i++) { +                                /* Start a thread if the index at this location +                                 * is a 0, so that the older thread is confirmed +                                 * as dead */ +                                if (event_pool->pollers[i] == 0) { +                                        ev_data = GF_CALLOC (1, +                                                      sizeof (*ev_data), +                                                      gf_common_mt_event_pool); +                                        if (!ev_data) { +                                                gf_log ("epoll", GF_LOG_WARNING, +                                                  "Allocation failure for" +                                                  " index %d", i); +                                                continue; +                                        } + +                                        ev_data->event_pool = event_pool; +                                        
ev_data->event_index = i + 1; + +                                        ret = pthread_create (&t_id, NULL, +                                                event_dispatch_epoll_worker, +                                                ev_data); +                                        if (ret) { +                                                gf_log ("epoll", GF_LOG_WARNING, +                                                  "Failed to start thread for" +                                                  " index %d", i); +                                                GF_FREE (ev_data); +                                        } else { +                                                pthread_detach (t_id); +                                                event_pool->pollers[i] = t_id; +                                        } +                                } +                        } +                } + +                /* if value decreases, threads will terminate, themselves */ +                event_pool->eventthreadcount = value; +        } +        pthread_mutex_unlock (&event_pool->mutex); + +        return 0; +}  struct event_ops event_ops_epoll = { -        .new                    = event_pool_new_epoll, -        .event_register         = event_register_epoll, -        .event_select_on        = event_select_on_epoll, -        .event_unregister       = event_unregister_epoll, -        .event_unregister_close = event_unregister_close_epoll, -        .event_dispatch         = event_dispatch_epoll +        .new                       = event_pool_new_epoll, +        .event_register            = event_register_epoll, +        .event_select_on           = event_select_on_epoll, +        .event_unregister          = event_unregister_epoll, +        .event_unregister_close    = event_unregister_close_epoll, +        .event_dispatch            = event_dispatch_epoll, +        .event_reconfigure_threads = event_reconfigure_threads_epoll  };  #endif diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c index a7e2e663103..c91fa8487b5 100644 --- a/libglusterfs/src/event-poll.c +++ b/libglusterfs/src/event-poll.c @@ -95,7 +95,7 @@ out:  static struct event_pool * -event_pool_new_poll (int count) +event_pool_new_poll (int count, int eventthreadcount)  {          struct event_pool *event_pool = NULL;          int                ret = -1; @@ -171,6 +171,12 @@ event_pool_new_poll (int count)                  return NULL;          } +        if (eventthreadcount > 1) { +                gf_log ("poll", GF_LOG_INFO, +                        "Currently poll does not use multiple event processing" +                        " threads, thread count (%d) ignored", eventthreadcount); +        } +          return event_pool;  } @@ -469,6 +475,12 @@ out:          return -1;  } +int +event_reconfigure_threads_poll (struct event_pool *event_pool, int value) +{ +        /* No-op for poll */ +        return 0; +}  struct event_ops event_ops_poll = {          .new                    = event_pool_new_poll, @@ -476,5 +488,6 @@ struct event_ops event_ops_poll = {          .event_select_on        = event_select_on_poll,          .event_unregister       = event_unregister_poll,          .event_unregister_close = event_unregister_close_poll, -        .event_dispatch         = event_dispatch_poll +        .event_dispatch         = event_dispatch_poll, +        .event_reconfigure_threads = event_reconfigure_threads_poll  }; diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c index 
6c253df3c1a..4dd0f991700 100644 --- a/libglusterfs/src/event.c +++ b/libglusterfs/src/event.c @@ -29,7 +29,7 @@  struct event_pool * -event_pool_new (int count) +event_pool_new (int count, int eventthreadcount)  {          struct event_pool *event_pool = NULL;  	extern struct event_ops event_ops_poll; @@ -37,7 +37,7 @@ event_pool_new (int count)  #ifdef HAVE_SYS_EPOLL_H  	extern struct event_ops event_ops_epoll; -        event_pool = event_ops_epoll.new (count); +        event_pool = event_ops_epoll.new (count, eventthreadcount);          if (event_pool) {                  event_pool->ops = &event_ops_epoll; @@ -48,7 +48,7 @@ event_pool_new (int count)  #endif          if (!event_pool) { -                event_pool = event_ops_poll.new (count); +                event_pool = event_ops_poll.new (count, eventthreadcount);                  if (event_pool)                          event_pool->ops = &event_ops_poll; @@ -129,3 +129,18 @@ event_dispatch (struct event_pool *event_pool)  out:          return ret;  } + +int +event_reconfigure_threads (struct event_pool *event_pool, int value) +{ +        int ret = -1; + +        GF_VALIDATE_OR_GOTO ("event", event_pool, out); + +        /* call event refresh function */ +        ret = event_pool->ops->event_reconfigure_threads (event_pool, +                                                          value); + +out: +        return ret; +} diff --git a/libglusterfs/src/event.h b/libglusterfs/src/event.h index 3b3ab0e4b2f..930a7d1e28b 100644 --- a/libglusterfs/src/event.h +++ b/libglusterfs/src/event.h @@ -33,6 +33,7 @@ typedef int (*event_handler_t) (int fd, int idx, void *data,  #define EVENT_EPOLL_TABLES 1024  #define EVENT_EPOLL_SLOTS 1024 +#define EVENT_MAX_THREADS  32  struct event_pool {  	struct event_ops *ops; @@ -53,10 +54,16 @@ struct event_pool {  	void *evcache;  	int evcache_size; + +        /* NOTE: Currently used only when event processing is done using +         * epoll. */ +        int eventthreadcount; /* number of event threads to execute. 
*/ +        pthread_t pollers[EVENT_MAX_THREADS]; /* poller thread_id store, +                                                     * and live status */  };  struct event_ops { -        struct event_pool * (*new) (int count); +        struct event_pool * (*new) (int count, int eventthreadcount);          int (*event_register) (struct event_pool *event_pool, int fd,                                 event_handler_t handler, @@ -71,9 +78,12 @@ struct event_ops {  				       int idx);          int (*event_dispatch) (struct event_pool *event_pool); + +        int (*event_reconfigure_threads) (struct event_pool *event_pool, +                                          int newcount);  }; -struct event_pool * event_pool_new (int count); +struct event_pool *event_pool_new (int count, int eventthreadcount);  int event_select_on (struct event_pool *event_pool, int fd, int idx,  		     int poll_in, int poll_out);  int event_register (struct event_pool *event_pool, int fd, @@ -82,5 +92,6 @@ int event_register (struct event_pool *event_pool, int fd,  int event_unregister (struct event_pool *event_pool, int fd, int idx);  int event_unregister_close (struct event_pool *event_pool, int fd, int idx);  int event_dispatch (struct event_pool *event_pool); +int event_reconfigure_threads (struct event_pool *event_pool, int value);  #endif /* _EVENT_H_ */ diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h index 8059c976368..9c078e1d5f9 100644 --- a/libglusterfs/src/glusterfs.h +++ b/libglusterfs/src/glusterfs.h @@ -157,6 +157,8 @@  #define GLUSTERFS_RPC_REPLY_SIZE               24 +#define STARTING_EVENT_THREADS                 1 +  #define ZR_FILE_CONTENT_REQUEST(key) (!strncmp(key, ZR_FILE_CONTENT_STR, \                                                 ZR_FILE_CONTENT_STRLEN)) diff --git a/xlators/mgmt/glusterd/src/glusterd-volume-set.c b/xlators/mgmt/glusterd/src/glusterd-volume-set.c index 1d015a94698..e9473658176 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volume-set.c +++ b/xlators/mgmt/glusterd/src/glusterd-volume-set.c @@ -836,6 +836,10 @@ struct volopt_map_entry glusterd_volopt_map[] = {            .type        = NO_DOC,            .op_version  = GD_OP_VERSION_3_7_0,          }, +        { .key         = "client.event-threads", +          .voltype     = "protocol/client", +          .op_version  = GD_OP_VERSION_3_7_0, +        },          /* Server xlator options */          { .key         = "network.tcp-window-size", @@ -939,6 +943,10 @@ struct volopt_map_entry glusterd_volopt_map[] = {            .type        = NO_DOC,            .op_version  = GD_OP_VERSION_3_7_0,          }, +        { .key         = "server.event-threads", +          .voltype     = "protocol/server", +          .op_version  = GD_OP_VERSION_3_7_0, +        },          /* Generic transport options */          { .key         = SSL_CERT_DEPTH_OPT, diff --git a/xlators/protocol/client/src/client.c b/xlators/protocol/client/src/client.c index fbd0ff22737..999a4a5c836 100644 --- a/xlators/protocol/client/src/client.c +++ b/xlators/protocol/client/src/client.c @@ -20,6 +20,7 @@  #include "glusterfs.h"  #include "statedump.h"  #include "compat-errno.h" +#include "event.h"  #include "xdr-rpc.h"  #include "glusterfs3.h" @@ -2513,6 +2514,23 @@ out:  }  int +client_check_event_threads (xlator_t *this, dict_t *options, clnt_conf_t *conf) +{ +        int          ret = -1; +        int          eventthreads = 0; + +        /* Read event-threads from the new configuration */ +        ret = dict_get_int32 (options, 
"event-threads", &eventthreads); +        if (!ret) { +                conf->event_threads = eventthreads; +        } +        ret = event_reconfigure_threads (this->ctx->event_pool, +                                         conf->event_threads); + +        return ret; +} + +int  reconfigure (xlator_t *this, dict_t *options)  {  	clnt_conf_t *conf              = NULL; @@ -2531,6 +2549,10 @@ reconfigure (xlator_t *this, dict_t *options)          GF_OPTION_RECONF ("ping-timeout", conf->opt.ping_timeout,                            options, int32, out); +        ret = client_check_event_threads (this, options, conf); +        if (ret) +                goto out; +          ret = client_check_remote_host (this, options);          if (ret)                  goto out; @@ -2609,6 +2631,13 @@ init (xlator_t *this)          conf->grace_timer        = NULL;          conf->grace_timer_needed = _gf_true; +        /* Set event threads to a default */ +        conf->event_threads = STARTING_EVENT_THREADS; + +        ret = client_check_event_threads (this, this->options, conf); +        if (ret) +                goto out; +          ret = client_init_grace_timer (this, this->options, conf);          if (ret)                  goto out; @@ -2936,5 +2965,15 @@ struct volume_options options[] = {            .type  = GF_OPTION_TYPE_BOOL,            .default_value = "on",          }, +        { .key   = {"event-threads"}, +          .type  = GF_OPTION_TYPE_INT, +          .min   = 1, +          .max   = 32, +          .default_value = "2", +          .description = "Specifies the number of event threads to execute in" +                         "in parallel. Larger values would help process" +                         " responses faster, depending on available processing" +                         " power. Range 1-32 threads." 
+        },          { .key   = {NULL} },  }; diff --git a/xlators/protocol/client/src/client.h b/xlators/protocol/client/src/client.h index b4809310939..af70926b178 100644 --- a/xlators/protocol/client/src/client.h +++ b/xlators/protocol/client/src/client.h @@ -125,6 +125,9 @@ typedef struct clnt_conf {          uint64_t               setvol_count;          gf_boolean_t           send_gids; /* let the server resolve gids */ + +        int                     event_threads; /* # of event threads +                                                * configured */  } clnt_conf_t;  typedef struct _client_fd_ctx { diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c index 7a2b7fa3297..92113c7c28b 100644 --- a/xlators/protocol/server/src/server.c +++ b/xlators/protocol/server/src/server.c @@ -25,6 +25,7 @@  #include "statedump.h"  #include "defaults.h"  #include "authenticate.h" +#include "event.h"  void  grace_time_handler (void *data) @@ -674,6 +675,24 @@ out:  }  int +server_check_event_threads (xlator_t *this, dict_t *options, +                            server_conf_t *conf) +{ +        int              ret = -1; +        int              eventthreads = 0; + +        /* Read event-threads from the new configuration */ +        ret = dict_get_int32 (options, "event-threads", &eventthreads); +        if (!ret) { +                conf->event_threads = eventthreads; +        } +        ret = event_reconfigure_threads (this->ctx->event_pool, +                                         conf->event_threads); + +        return ret; +} + +int  reconfigure (xlator_t *this, dict_t *options)  { @@ -693,6 +712,7 @@ reconfigure (xlator_t *this, dict_t *options)                  gf_log_callingfn (this->name, GF_LOG_DEBUG, "conf == null!!!");                  goto out;          } +          if (dict_get_int32 ( options, "inode-lru-limit", &inode_lru_limit) == 0){                  conf->inode_lru_limit = inode_lru_limit;                  gf_log (this->name, GF_LOG_TRACE, "Reconfigured inode-lru-limit" @@ -790,6 +810,11 @@ reconfigure (xlator_t *this, dict_t *options)                                          "Reconfigure not found for transport" );                  }          } + +        ret = server_check_event_threads (this, options, conf); +        if (ret) +                goto out; +          ret = server_init_grace_timer (this, options, conf);  out: @@ -846,6 +871,13 @@ init (xlator_t *this)          INIT_LIST_HEAD (&conf->xprt_list);          pthread_mutex_init (&conf->mutex, NULL); +         /* Set event threads to a default */ +        conf->event_threads = STARTING_EVENT_THREADS; + +        ret = server_check_event_threads (this, this->options, conf); +        if (ret) +                goto out; +          ret = server_init_grace_timer (this, this->options, conf);          if (ret)                  goto out; @@ -1199,6 +1231,16 @@ struct volume_options options[] = {            .default_value = "2",            .description = "Timeout in seconds for the cached groups to expire."          }, +        { .key   = {"event-threads"}, +          .type  = GF_OPTION_TYPE_INT, +          .min   = 1, +          .max   = 32, +          .default_value = "2", +          .description = "Specifies the number of event threads to execute in" +                         "in parallel. Larger values would help process" +                         " responses faster, depending on available processing" +                         " power. Range 1-32 threads." 
+        },          { .key   = {NULL} },  }; diff --git a/xlators/protocol/server/src/server.h b/xlators/protocol/server/src/server.h index 3e1feacb94b..dc64edd0ab2 100644 --- a/xlators/protocol/server/src/server.h +++ b/xlators/protocol/server/src/server.h @@ -63,6 +63,9 @@ struct server_conf {          gf_boolean_t            server_manage_gids; /* resolve gids on brick */          gid_cache_t             gid_cache;          int32_t                 gid_cache_timeout; + +        int                     event_threads; /* # of event threads +                                                * configured */  };  typedef struct server_conf server_conf_t;  | 
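Usage note: the new thread counts surface as the volume options client.event-threads and server.event-threads (integer, range 1-32, default 2 per the option tables above). The client and server xlators read "event-threads" in init() and reconfigure() and hand the value to event_reconfigure_threads(), so on a running volume they would typically be tuned with something like: gluster volume set <VOLNAME> client.event-threads 4.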
