diff options
| author | Milind Changire <mchangir@redhat.com> | 2018-03-20 17:47:20 +0530 | 
|---|---|---|
| committer | Milind Changire <mchangir@redhat.com> | 2018-03-20 17:47:40 +0530 | 
| commit | 9d482b4e8ddcc17c39ab45ade6afcfd125bda2bb (patch) | |
| tree | df55b66c321ce88e55e8957bbce49a54df3d3e04 | |
| parent | 2bb0623cfbeca29264a1b8102c738851fa0d9202 (diff) | |
rpcsvc: scale rpcsvc_request_handler threads
Scale rpcsvc_request_handler threads to match the scaling of event
handler threads.
Please refer to https://bugzilla.redhat.com/show_bug.cgi?id=1467614#c51
for a discussion about why we need multi-threaded rpcsvc request
handlers.
mainline:
> Reviewed-on: https://review.gluster.org/19337
> Reviewed-by: Raghavendra G <rgowdapp@redhat.com>
> Signed-off-by: Milind Changire <mchangir@redhat.com>
(cherry picked from commit 7d641313f46789ec0a7ba0cc04f504724c780855)
Change-Id: Ib6838fb8b928e15602a3d36fd66b7ba08999430b
BUG: 1550946
Signed-off-by: Milind Changire <mchangir@redhat.com>
| -rw-r--r-- | glusterfsd/src/Makefile.am | 1 | ||||
| -rw-r--r-- | glusterfsd/src/glusterfsd-mgmt.c | 11 | ||||
| -rw-r--r-- | libglusterfs/src/event-poll.c | 7 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/Makefile.am | 1 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/autoscale-threads.c | 5 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/libgfrpc.sym | 1 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 131 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 10 | ||||
| -rw-r--r-- | xlators/protocol/server/src/server.c | 9 | 
9 files changed, 148 insertions, 28 deletions
diff --git a/glusterfsd/src/Makefile.am b/glusterfsd/src/Makefile.am index 3286e639bcf..eb92e66e989 100644 --- a/glusterfsd/src/Makefile.am +++ b/glusterfsd/src/Makefile.am @@ -25,7 +25,6 @@ AM_CPPFLAGS = $(GF_CPPFLAGS) \  	-I$(top_srcdir)/rpc/xdr/src \  	-I$(top_builddir)/rpc/xdr/src \  	-I$(top_srcdir)/xlators/nfs/server/src \ -	-I$(top_srcdir)/xlators/protocol/server/src \  	-I$(top_srcdir)/api/src  AM_CFLAGS = -Wall $(GF_CFLAGS) diff --git a/glusterfsd/src/glusterfsd-mgmt.c b/glusterfsd/src/glusterfsd-mgmt.c index 3404a5931f0..e007149c08c 100644 --- a/glusterfsd/src/glusterfsd-mgmt.c +++ b/glusterfsd/src/glusterfsd-mgmt.c @@ -33,7 +33,6 @@  #include "xlator.h"  #include "syscall.h"  #include "monitoring.h" -#include "server.h"  static gf_boolean_t is_mgmt_rpc_reconnect = _gf_false;  int need_emancipate = 0; @@ -834,8 +833,7 @@ glusterfs_handle_attach (rpcsvc_request_t *req)          xlator_t                *nextchild      = NULL;          glusterfs_graph_t       *newgraph       = NULL;          glusterfs_ctx_t         *ctx            = NULL; -        xlator_t                *srv_xl         = NULL; -        server_conf_t           *srv_conf       = NULL; +        xlator_t                *protocol_server = NULL;          GF_ASSERT (req);          this = THIS; @@ -876,10 +874,9 @@ glusterfs_handle_attach (rpcsvc_request_t *req)                                  /* we need a protocol/server xlator as                                   * nextchild                                   */ -                                srv_xl = this->ctx->active->first; -                                srv_conf = (server_conf_t *)srv_xl->private; -                                rpcsvc_autoscale_threads (this->ctx, -                                                          srv_conf->rpc, 1); +                                protocol_server = this->ctx->active->first; +                                rpcsvc_autoscale_threads (this->ctx, 1, +                                                          protocol_server);                          }                  } else {                          gf_log (this->name, GF_LOG_WARNING, diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c index 3bffc4784d7..b1aca826759 100644 --- a/libglusterfs/src/event-poll.c +++ b/libglusterfs/src/event-poll.c @@ -173,6 +173,13 @@ event_pool_new_poll (int count, int eventthreadcount)                          "thread count (%d) ignored", eventthreadcount);          } +        /* although, eventhreadcount for poll implementaiton is always +         * going to be 1, eventthreadcount needs to be set to 1 so that +         * rpcsvc_request_handler() thread scaling works flawlessly in +         * both epoll and poll models +         */ +        event_pool->eventthreadcount = 1; +          return event_pool;  } diff --git a/rpc/rpc-lib/src/Makefile.am b/rpc/rpc-lib/src/Makefile.am index 81a96476883..f784c2c1ff3 100644 --- a/rpc/rpc-lib/src/Makefile.am +++ b/rpc/rpc-lib/src/Makefile.am @@ -22,6 +22,7 @@ AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \  	-I$(top_srcdir)/rpc/xdr/src \  	-I$(top_builddir)/rpc/xdr/src \  	-DRPC_TRANSPORTDIR=\"$(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport\" \ +	-I$(top_srcdir)/xlators/protocol/server/src \  	-I$(top_srcdir)/contrib/rbtree  AM_CFLAGS = -Wall $(GF_CFLAGS) diff --git a/rpc/rpc-lib/src/autoscale-threads.c b/rpc/rpc-lib/src/autoscale-threads.c index e0e89586160..9e20b37ac63 100644 --- a/rpc/rpc-lib/src/autoscale-threads.c +++ b/rpc/rpc-lib/src/autoscale-threads.c @@ -10,13 +10,16 @@  #include "event.h"  #include "rpcsvc.h" +#include "server.h"  void -rpcsvc_autoscale_threads (glusterfs_ctx_t *ctx, rpcsvc_t *rpc, int incr) +rpcsvc_autoscale_threads (glusterfs_ctx_t *ctx, int incr, xlator_t *this)  {          struct event_pool       *pool           = ctx->event_pool; +        server_conf_t           *conf           = this->private;          int                      thread_count   = pool->eventthreadcount;          pool->auto_thread_count += incr;          (void) event_reconfigure_threads (pool, thread_count+incr); +        rpcsvc_ownthread_reconf (conf->rpc, pool->eventthreadcount);  } diff --git a/rpc/rpc-lib/src/libgfrpc.sym b/rpc/rpc-lib/src/libgfrpc.sym index 540181dabb6..4fab688c66d 100644 --- a/rpc/rpc-lib/src/libgfrpc.sym +++ b/rpc/rpc-lib/src/libgfrpc.sym @@ -53,6 +53,7 @@ rpcsvc_transport_connect  rpcsvc_transport_getpeeraddr  rpcsvc_unregister_notify  rpcsvc_volume_allowed +rpcsvc_ownthread_reconf  rpc_transport_count  rpc_transport_connect  rpc_transport_disconnect diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 8766da47b7b..34e7563e163 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -1970,33 +1970,98 @@ rpcsvc_request_handler (void *arg)                                  goto unlock;                          } -                        while (list_empty (&program->request_queue)) +                        while (list_empty (&program->request_queue) && +                               (program->threadcount <= +                                        program->eventthreadcount)) {                                  pthread_cond_wait (&program->queue_cond,                                                     &program->queue_lock); +                        } -                        req = list_entry (program->request_queue.next, -                                          typeof (*req), request_list); - -                        list_del_init (&req->request_list); +                        if (program->threadcount > program->eventthreadcount) { +                                done = 1; +                                program->threadcount--; + +                                gf_log (GF_RPCSVC, GF_LOG_INFO, +                                        "program '%s' thread terminated; " +                                        "total count:%d", +                                        program->progname, +                                        program->threadcount); +                        } else if (!list_empty (&program->request_queue)) { +                                req = list_entry (program->request_queue.next, +                                                  typeof (*req), request_list); + +                                list_del_init (&req->request_list); +                        }                  }          unlock:                  pthread_mutex_unlock (&program->queue_lock); +                if (req) { +                        THIS = req->svc->xl; +                        actor = rpcsvc_program_actor (req); +                        ret = actor->actor (req); + +                        if (ret != 0) { +                                rpcsvc_check_and_reply_error (ret, NULL, req); +                        } +                        req = NULL; +                } +                  if (done)                          break; +        } -                THIS = req->svc->xl; +        return NULL; +} -                actor = rpcsvc_program_actor (req); +int +rpcsvc_spawn_threads (rpcsvc_t *svc, rpcsvc_program_t *program) +{ +        int                ret  = 0, delta = 0, creates = 0; -                ret = actor->actor (req); +        if (!program || !svc) +                goto out; -                if (ret != 0) { -                        rpcsvc_check_and_reply_error (ret, NULL, req); +        pthread_mutex_lock (&program->queue_lock); +        { +                delta = program->eventthreadcount - program->threadcount; + +                if (delta >= 0) { +                        while (delta--) { +                                ret = gf_thread_create (&program->thread, NULL, +                                                        rpcsvc_request_handler, +                                                        program, "rpcrqhnd"); +                                if (!ret) { +                                        program->threadcount++; +                                        creates++; +                                } +                        } + +                        if (creates) { +                                gf_log (GF_RPCSVC, GF_LOG_INFO, +                                        "spawned %d threads for program '%s'; " +                                        "total count:%d", +                                        creates, +                                        program->progname, +                                        program->threadcount); +                        } +                } else { +                        gf_log (GF_RPCSVC, GF_LOG_INFO, +                                "terminating %d threads for program '%s'", +                                -delta, program->progname); + +                        /* this signal is to just wake up the threads so they +                         * test for the change in eventthreadcount and kill +                         * themselves until the program thread count becomes +                         * equal to the event thread count +                         */ +                        pthread_cond_broadcast (&program->queue_cond);                  }          } +        pthread_mutex_unlock (&program->queue_lock); -        return NULL; +out: +        return creates;  }  int @@ -2004,6 +2069,7 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program,                           gf_boolean_t add_to_head)  {          int               ret                = -1; +        int               creates            = -1;          rpcsvc_program_t *newprog            = NULL;          char              already_registered = 0; @@ -2051,12 +2117,11 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program,                  newprog->ownthread = _gf_false;          if (newprog->ownthread) { -                ret = gf_thread_create (&newprog->thread, NULL, -                                        rpcsvc_request_handler, -                                        newprog, "rpcsvcrh"); -                if (ret != 0) { -                        gf_log (GF_RPCSVC, GF_LOG_ERROR, -                                "error creating request handler thread"); +                newprog->eventthreadcount = 1; +                creates = rpcsvc_spawn_threads (svc, newprog); + +                if (creates < 1) { +                        goto out;                  }          } @@ -2924,6 +2989,38 @@ out:          return ret;  } +/* During reconfigure, Make sure to call this function after event-threads are + * reconfigured as programs' threadcount will be made equal to event threads. + */ + +int +rpcsvc_ownthread_reconf (rpcsvc_t *svc, int new_eventthreadcount) +{ +        int ret = -1; +        rpcsvc_program_t *program = NULL; + +        if (!svc) { +                ret = 0; +                goto out; +        } + +        pthread_rwlock_wrlock (&svc->rpclock); +        { +                list_for_each_entry (program, &svc->programs, program) { +                        if (program->ownthread) { +                                program->eventthreadcount = +                                        new_eventthreadcount; +                                rpcsvc_spawn_threads (svc, program); +                        } +                } +        } +        pthread_rwlock_unlock (&svc->rpclock); + +        ret = 0; +out: +        return ret; +} +  rpcsvc_actor_t gluster_dump_actors[GF_DUMP_MAXVALUE] = {          [GF_DUMP_NULL]      = {"NULL",     GF_DUMP_NULL,     NULL,        NULL, 0, DRC_NA}, diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index b2814de4c45..578b4e13025 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -422,6 +422,12 @@ struct rpcsvc_program {          pthread_mutex_t         queue_lock;          pthread_cond_t          queue_cond;          pthread_t               thread; +        int                     threadcount; +        /* eventthreadcount is just a readonly copy of the actual value +         * owned by the event sub-system +         * It is used to control the scaling of rpcsvc_request_handler threads +         */ +        int                     eventthreadcount;  };  typedef struct rpcsvc_cbk_program { @@ -642,6 +648,8 @@ rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum,                                   uint32_t progver, int procnum);  void -rpcsvc_autoscale_threads (glusterfs_ctx_t *ctx, rpcsvc_t *rpc, int incr); +rpcsvc_autoscale_threads (glusterfs_ctx_t *ctx, int incr, xlator_t *this); +extern int +rpcsvc_ownthread_reconf (rpcsvc_t *svc, int new_eventthreadcount);  #endif diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c index db96363123b..6f1d2728847 100644 --- a/xlators/protocol/server/src/server.c +++ b/xlators/protocol/server/src/server.c @@ -844,6 +844,12 @@ do_rpc:          if (ret)                  goto out; +        /* rpcsvc thread reconfigure should be after events thread +         * reconfigure +         */ +        new_nthread = +        ((struct event_pool *)(this->ctx->event_pool))->eventthreadcount; +        ret = rpcsvc_ownthread_reconf (rpc_conf, new_nthread);  out:          THIS = oldTHIS;          gf_msg_debug ("", 0, "returning %d", ret); @@ -1499,7 +1505,8 @@ server_notify (xlator_t *this, int32_t event, void *data, ...)                          if (victim_found)                                  (*trav_p) = (*trav_p)->next;                          rpc_clnt_mgmt_pmap_signout (ctx, victim->name); -                        rpcsvc_autoscale_threads (ctx, conf->rpc, -1); +                        /* we need the protocol/server xlator here as 'this' */ +                        rpcsvc_autoscale_threads (ctx, -1, this);                          default_notify (victim, GF_EVENT_CLEANUP, data);                  }                  break;  | 
