diff options
| author | Anand Avati <avati@redhat.com> | 2013-10-20 08:45:18 -0700 | 
|---|---|---|
| committer | Vijay Bellur <vbellur@redhat.com> | 2013-10-28 00:33:19 -0700 | 
| commit | a4056292528db49a666422c7f8e0c032441cc83f (patch) | |
| tree | 3d433a1ed29f1a07fdfc1425d2a52018e67328ad /rpc | |
| parent | 0162933589d025ca1812e159368d107cfc355e8e (diff) | |
rpcsvc: implement per-client RPC throttling
Implement a limit on the total number of outstanding RPC requests
from a given cient. Once the limit is reached the client socket
is removed from POLL-IN event polling.
Change-Id: I8071b8c89b78d02e830e6af5a540308199d6bdcd
BUG: 1008301
Signed-off-by: Anand Avati <avati@redhat.com>
Reviewed-on: http://review.gluster.org/6114
Reviewed-by: Santosh Pradhan <spradhan@redhat.com>
Reviewed-by: Rajesh Joseph <rjoseph@redhat.com>
Reviewed-by: Harshavardhana <harsha@harshavardhana.net>
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Diffstat (limited to 'rpc')
| -rw-r--r-- | rpc/rpc-lib/src/rpc-transport.c | 13 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 6 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc-common.h | 3 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 62 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 3 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 20 | 
6 files changed, 107 insertions, 0 deletions
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index 89f3b3e8a0a..5f2e91c7099 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -69,6 +69,19 @@ out:          return ret;  } +int +rpc_transport_throttle (rpc_transport_t *this, gf_boolean_t onoff) +{ +        int ret = 0; + +        if (!this->ops->throttle) +                return -ENOSYS; + +        ret = this->ops->throttle (this, onoff); + +        return ret; +} +  int32_t  rpc_transport_get_peeraddr (rpc_transport_t *this, char *peeraddr, int addrlen,                              struct sockaddr_storage *sa, size_t salen) diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index aa9df72b2dd..2db9072ae49 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -192,6 +192,8 @@ struct rpc_transport {          pthread_mutex_t            lock;          int32_t                    refcount; +        int32_t                    outstanding_rpc_count; +          glusterfs_ctx_t           *ctx;          dict_t                    *options;          char                      *name; @@ -235,6 +237,7 @@ struct rpc_transport_ops {          int32_t (*get_myaddr)     (rpc_transport_t *this, char *peeraddr,                                     int addrlen, struct sockaddr_storage *sa,                                     socklen_t sasize); +        int32_t (*throttle)       (rpc_transport_t *this, gf_boolean_t onoff);  }; @@ -288,6 +291,9 @@ int32_t  rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen,                            struct sockaddr_storage *sa, size_t salen); +int +rpc_transport_throttle (rpc_transport_t *this, gf_boolean_t onoff); +  rpc_transport_pollin_t *  rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector,                              int count, struct iobuf *hdr_iobuf, diff --git a/rpc/rpc-lib/src/rpcsvc-common.h b/rpc/rpc-lib/src/rpcsvc-common.h index 054e187c96d..53c1a8fe3b2 100644 --- a/rpc/rpc-lib/src/rpcsvc-common.h +++ b/rpc/rpc-lib/src/rpcsvc-common.h @@ -71,6 +71,9 @@ typedef struct rpcsvc_state {          rpcsvc_notify_t          notifyfn;          struct mem_pool         *rxpool;          rpcsvc_drc_globals_t    *drc; + +	/* per-client limit of outstanding rpc requests */ +	int                      outstanding_rpc_limit;  } rpcsvc_t;  /* DRC START */ diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 254a05d664d..8fe2e52bcdc 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -129,6 +129,37 @@ rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum,                  return NULL;  } +int +rpcsvc_request_outstanding (rpcsvc_t *svc, rpc_transport_t *trans, int delta) +{ +        int ret = 0; +        int old_count = 0; +        int new_count = 0; +        int limit = 0; + +        pthread_mutex_lock (&trans->lock); +        { +                limit = svc->outstanding_rpc_limit; +                if (!limit) +                        goto unlock; + +                old_count = trans->outstanding_rpc_count; +                trans->outstanding_rpc_count += delta; +                new_count = trans->outstanding_rpc_count; + +                if (old_count <= limit && new_count > limit) +                        ret = rpc_transport_throttle (trans, _gf_true); + +                if (old_count > limit && new_count <= limit) +                        ret = rpc_transport_throttle (trans, _gf_false); +        } +unlock: +        pthread_mutex_unlock (&trans->lock); + +        return ret; +} + +  /* This needs to change to returning errors, since   * we need to return RPC specific error messages when some   * of the pointers below are NULL. @@ -279,6 +310,13 @@ rpcsvc_request_destroy (rpcsvc_request_t *req)          if (req->hdr_iobuf)                  iobuf_unref (req->hdr_iobuf); +        /* This marks the "end" of an RPC request. Reply is +           completely written to the socket and is on the way +           to the client. It is time to decrement the +           outstanding request counter by 1. +        */ +        rpcsvc_request_outstanding (req->svc, req->trans, -1); +          rpc_transport_unref (req->trans);  	GF_FREE (req->auxgidlarge); @@ -365,6 +403,12 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans,                  goto err;          } +        /* We just received a new request from the wire. Account for +           it in the outsanding request counter to make sure we don't +           ingest too many concurrent requests from the same client. +        */ +        ret = rpcsvc_request_outstanding (svc, trans, +1); +          msgbuf = msg->vector[0].iov_base;          msglen = msg->vector[0].iov_len; @@ -1845,6 +1889,24 @@ rpcsvc_init_options (rpcsvc_t *svc, dict_t *options)                  gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Portmap registration "                          "disabled"); +        svc->outstanding_rpc_limit = RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT; + +        if (dict_get (options, "rpc.outstanding-rpc-limit")) { +                ret = dict_get_str (options, "rpc.oustanding-rpc-limit", +                                    &optstr); +                if (ret < 0) { +                        gf_log (GF_RPCSVC, GF_LOG_ERROR, "Value went missing"); +                        goto out; +                } + +                ret = gf_string2int32 (optstr, &svc->outstanding_rpc_limit); +                if (ret < 0) { +                        gf_log (GF_RPCSVC, GF_LOG_ERROR, "Invalid RPC limit %s", +                                optstr); +                        goto out; +                } +        } +          ret = 0;  out:          return ret; diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index a08ee4b57d4..ac2f09beac5 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -38,6 +38,9 @@  #define MAX_IOVEC 16  #endif +#define RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT 64 +#define RPCSVC_MAX_OUTSTANDING_RPC_LIMIT 65536 +  #define GF_RPCSVC       "rpc-service"  #define RPCSVC_THREAD_STACK_SIZE ((size_t)(1024 * GF_UNIT_KB)) diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 8883ccbb4d0..93da3f29690 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -3261,6 +3261,25 @@ out:  } +static int +socket_throttle (rpc_transport_t *this, gf_boolean_t onoff) +{ +        socket_private_t *priv = NULL; + +        priv = this->private; + +        /* The way we implement throttling is by taking off +           POLLIN event from the polled flags. This way we +           never get called with the POLLIN event and therefore +           will never read() any more data until throttling +           is turned off. +        */ +        priv->idx = event_select_on (this->ctx->event_pool, priv->sock, +                                     priv->idx, (int) !onoff, -1); +        return 0; +} + +  struct rpc_transport_ops tops = {          .listen             = socket_listen,          .connect            = socket_connect, @@ -3271,6 +3290,7 @@ struct rpc_transport_ops tops = {          .get_peeraddr       = socket_getpeeraddr,          .get_myname         = socket_getmyname,          .get_myaddr         = socket_getmyaddr, +	.throttle           = socket_throttle,  };  int  | 
