summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--rpc/rpc-lib/src/rpc-transport.c13
-rw-r--r--rpc/rpc-lib/src/rpc-transport.h6
-rw-r--r--rpc/rpc-lib/src/rpcsvc-common.h3
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c62
-rw-r--r--rpc/rpc-lib/src/rpcsvc.h3
-rw-r--r--rpc/rpc-transport/socket/src/socket.c20
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-volume-set.c12
-rw-r--r--xlators/nfs/server/src/nfs.c9
8 files changed, 128 insertions, 0 deletions
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index 89f3b3e8a0a..5f2e91c7099 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -69,6 +69,19 @@ out:
return ret;
}
+int
+rpc_transport_throttle (rpc_transport_t *this, gf_boolean_t onoff)
+{
+ int ret = 0;
+
+ if (!this->ops->throttle)
+ return -ENOSYS;
+
+ ret = this->ops->throttle (this, onoff);
+
+ return ret;
+}
+
int32_t
rpc_transport_get_peeraddr (rpc_transport_t *this, char *peeraddr, int addrlen,
struct sockaddr_storage *sa, size_t salen)
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index aa9df72b2dd..2db9072ae49 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -192,6 +192,8 @@ struct rpc_transport {
pthread_mutex_t lock;
int32_t refcount;
+ int32_t outstanding_rpc_count;
+
glusterfs_ctx_t *ctx;
dict_t *options;
char *name;
@@ -235,6 +237,7 @@ struct rpc_transport_ops {
int32_t (*get_myaddr) (rpc_transport_t *this, char *peeraddr,
int addrlen, struct sockaddr_storage *sa,
socklen_t sasize);
+ int32_t (*throttle) (rpc_transport_t *this, gf_boolean_t onoff);
};
@@ -288,6 +291,9 @@ int32_t
rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen,
struct sockaddr_storage *sa, size_t salen);
+int
+rpc_transport_throttle (rpc_transport_t *this, gf_boolean_t onoff);
+
rpc_transport_pollin_t *
rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector,
int count, struct iobuf *hdr_iobuf,
diff --git a/rpc/rpc-lib/src/rpcsvc-common.h b/rpc/rpc-lib/src/rpcsvc-common.h
index 054e187c96d..53c1a8fe3b2 100644
--- a/rpc/rpc-lib/src/rpcsvc-common.h
+++ b/rpc/rpc-lib/src/rpcsvc-common.h
@@ -71,6 +71,9 @@ typedef struct rpcsvc_state {
rpcsvc_notify_t notifyfn;
struct mem_pool *rxpool;
rpcsvc_drc_globals_t *drc;
+
+ /* per-client limit of outstanding rpc requests */
+ int outstanding_rpc_limit;
} rpcsvc_t;
/* DRC START */
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 254a05d664d..8fe2e52bcdc 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -129,6 +129,37 @@ rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum,
return NULL;
}
+int
+rpcsvc_request_outstanding (rpcsvc_t *svc, rpc_transport_t *trans, int delta)
+{
+ int ret = 0;
+ int old_count = 0;
+ int new_count = 0;
+ int limit = 0;
+
+ pthread_mutex_lock (&trans->lock);
+ {
+ limit = svc->outstanding_rpc_limit;
+ if (!limit)
+ goto unlock;
+
+ old_count = trans->outstanding_rpc_count;
+ trans->outstanding_rpc_count += delta;
+ new_count = trans->outstanding_rpc_count;
+
+ if (old_count <= limit && new_count > limit)
+ ret = rpc_transport_throttle (trans, _gf_true);
+
+ if (old_count > limit && new_count <= limit)
+ ret = rpc_transport_throttle (trans, _gf_false);
+ }
+unlock:
+ pthread_mutex_unlock (&trans->lock);
+
+ return ret;
+}
+
+
/* This needs to change to returning errors, since
* we need to return RPC specific error messages when some
* of the pointers below are NULL.
@@ -279,6 +310,13 @@ rpcsvc_request_destroy (rpcsvc_request_t *req)
if (req->hdr_iobuf)
iobuf_unref (req->hdr_iobuf);
+ /* This marks the "end" of an RPC request. Reply is
+ completely written to the socket and is on the way
+ to the client. It is time to decrement the
+ outstanding request counter by 1.
+ */
+ rpcsvc_request_outstanding (req->svc, req->trans, -1);
+
rpc_transport_unref (req->trans);
GF_FREE (req->auxgidlarge);
@@ -365,6 +403,12 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans,
goto err;
}
+ /* We just received a new request from the wire. Account for
+ it in the outsanding request counter to make sure we don't
+ ingest too many concurrent requests from the same client.
+ */
+ ret = rpcsvc_request_outstanding (svc, trans, +1);
+
msgbuf = msg->vector[0].iov_base;
msglen = msg->vector[0].iov_len;
@@ -1845,6 +1889,24 @@ rpcsvc_init_options (rpcsvc_t *svc, dict_t *options)
gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Portmap registration "
"disabled");
+ svc->outstanding_rpc_limit = RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT;
+
+ if (dict_get (options, "rpc.outstanding-rpc-limit")) {
+ ret = dict_get_str (options, "rpc.oustanding-rpc-limit",
+ &optstr);
+ if (ret < 0) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Value went missing");
+ goto out;
+ }
+
+ ret = gf_string2int32 (optstr, &svc->outstanding_rpc_limit);
+ if (ret < 0) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Invalid RPC limit %s",
+ optstr);
+ goto out;
+ }
+ }
+
ret = 0;
out:
return ret;
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index a08ee4b57d4..ac2f09beac5 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -38,6 +38,9 @@
#define MAX_IOVEC 16
#endif
+#define RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT 64
+#define RPCSVC_MAX_OUTSTANDING_RPC_LIMIT 65536
+
#define GF_RPCSVC "rpc-service"
#define RPCSVC_THREAD_STACK_SIZE ((size_t)(1024 * GF_UNIT_KB))
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 8883ccbb4d0..93da3f29690 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -3261,6 +3261,25 @@ out:
}
+static int
+socket_throttle (rpc_transport_t *this, gf_boolean_t onoff)
+{
+ socket_private_t *priv = NULL;
+
+ priv = this->private;
+
+ /* The way we implement throttling is by taking off
+ POLLIN event from the polled flags. This way we
+ never get called with the POLLIN event and therefore
+ will never read() any more data until throttling
+ is turned off.
+ */
+ priv->idx = event_select_on (this->ctx->event_pool, priv->sock,
+ priv->idx, (int) !onoff, -1);
+ return 0;
+}
+
+
struct rpc_transport_ops tops = {
.listen = socket_listen,
.connect = socket_connect,
@@ -3271,6 +3290,7 @@ struct rpc_transport_ops tops = {
.get_peeraddr = socket_getpeeraddr,
.get_myname = socket_getmyname,
.get_myaddr = socket_getmyaddr,
+ .throttle = socket_throttle,
};
int
diff --git a/xlators/mgmt/glusterd/src/glusterd-volume-set.c b/xlators/mgmt/glusterd/src/glusterd-volume-set.c
index 76d2333d7db..adf0f2922ee 100644
--- a/xlators/mgmt/glusterd/src/glusterd-volume-set.c
+++ b/xlators/mgmt/glusterd/src/glusterd-volume-set.c
@@ -808,6 +808,12 @@ struct volopt_map_entry glusterd_volopt_map[] = {
.option = "statedump-path",
.op_version = 1
},
+ { .key = "server.outstanding-rpc-limit",
+ .voltype = "protocol/server",
+ .option = "rpc.outstanding-rpc-limit",
+ .type = GLOBAL_DOC,
+ .op_version = 3
+ },
{ .key = "features.lock-heal",
.voltype = "protocol/server",
.option = "lk-heal",
@@ -1142,6 +1148,12 @@ struct volopt_map_entry glusterd_volopt_map[] = {
.type = GLOBAL_DOC,
.op_version = 1
},
+ { .key = "nfs.outstanding-rpc-limit",
+ .voltype = "nfs/server",
+ .option = "rpc.outstanding-rpc-limit",
+ .type = GLOBAL_DOC,
+ .op_version = 3
+ },
{ .key = "nfs.port",
.voltype = "nfs/server",
.option = "nfs.port",
diff --git a/xlators/nfs/server/src/nfs.c b/xlators/nfs/server/src/nfs.c
index 8158f954f22..33bd36e7461 100644
--- a/xlators/nfs/server/src/nfs.c
+++ b/xlators/nfs/server/src/nfs.c
@@ -1359,6 +1359,15 @@ struct volume_options options[] = {
"portmap service. Use this option to turn off portmap "
"registration for Gluster NFS. On by default"
},
+ { .key = {"rpc.outstanding-rpc-limit"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = 0,
+ .max = RPCSVC_MAX_OUTSTANDING_RPC_LIMIT,
+ .default_value = TOSTRING(RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT),
+ .description = "Parameter to throttle the number of incoming RPC "
+ "requests from a client. 0 means no limit (can "
+ "potentially run out of memory)"
+ },
{ .key = {"nfs.port"},
.type = GF_OPTION_TYPE_INT,
.min = 1,