summaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
authorRaghavendra G <rgowdapp@redhat.com>2017-05-05 15:21:30 +0530
committerRaghavendra G <rgowdapp@redhat.com>2017-05-12 05:26:42 +0000
commitcea8b702506ff914deadd056f4b7dd20a3ca7670 (patch)
tree954ca7e37696d57725d06343168bf7c6ed8bf22d /rpc
parent333474e0d6efe1a2b3a9ecffc9bdff3e49325910 (diff)
event/epoll: Add back socket for polling of events immediately after
reading the entire rpc message from the wire Currently socket is added back for future events after higher layers (rpc, xlators etc) have processed the message. If message processing involves signficant delay (as in writev replies processed by Erasure Coding), performance takes hit. Hence this patch modifies transport/socket to add back the socket for polling of events immediately after reading the entire rpc message, but before notification to higher layers. credits: Thanks to "Kotresh Hiremath Ravishankar" <khiremat@redhat.com> for assitance in fixing a regression in bitrot caused by this patch. Change-Id: I04b6b9d0b51a1cfb86ecac3c3d87a5f388cf5800 BUG: 1448364 Signed-off-by: Raghavendra G <rgowdapp@redhat.com> Reviewed-on: https://review.gluster.org/15036 CentOS-regression: Gluster Build System <jenkins@build.gluster.org> NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org> Smoke: Gluster Build System <jenkins@build.gluster.org> Reviewed-by: Amar Tumballi <amarts@redhat.com>
Diffstat (limited to 'rpc')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c96
-rw-r--r--rpc/rpc-transport/socket/src/socket.h6
2 files changed, 83 insertions, 19 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 8ba2692..e14152c 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -1172,11 +1172,11 @@ out:
}
-static int
-socket_event_poll_err (rpc_transport_t *this)
+static gf_boolean_t
+socket_event_poll_err (rpc_transport_t *this, int gen, int idx)
{
- socket_private_t *priv = NULL;
- int ret = -1;
+ socket_private_t *priv = NULL;
+ gf_boolean_t socket_closed = _gf_false;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -1185,15 +1185,29 @@ socket_event_poll_err (rpc_transport_t *this)
pthread_mutex_lock (&priv->lock);
{
- __socket_ioq_flush (this);
- __socket_reset (this);
+ if ((priv->gen == gen) && (priv->idx == idx)
+ && (priv->sock != -1)) {
+ __socket_ioq_flush (this);
+ __socket_reset (this);
+ socket_closed = _gf_true;
+ }
}
pthread_mutex_unlock (&priv->lock);
- rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
+ if (socket_closed) {
+ pthread_mutex_lock (&priv->notify.lock);
+ {
+ while (priv->notify.in_progress)
+ pthread_cond_wait (&priv->notify.cond,
+ &priv->notify.lock);
+ }
+ pthread_mutex_unlock (&priv->notify.lock);
+
+ rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
+ }
out:
- return ret;
+ return socket_closed;
}
@@ -2271,22 +2285,50 @@ out:
static int
-socket_event_poll_in (rpc_transport_t *this)
+socket_event_poll_in (rpc_transport_t *this, gf_boolean_t notify_handled)
{
int ret = -1;
rpc_transport_pollin_t *pollin = NULL;
socket_private_t *priv = this->private;
+ glusterfs_ctx_t *ctx = NULL;
+
+ ctx = this->ctx;
ret = socket_proto_state_machine (this, &pollin);
+ if (pollin) {
+ pthread_mutex_lock (&priv->notify.lock);
+ {
+ priv->notify.in_progress++;
+ }
+ pthread_mutex_unlock (&priv->notify.lock);
+ }
+
+
+ if (notify_handled && (ret != -1))
+ event_handled (ctx->event_pool, priv->sock, priv->idx,
+ priv->gen);
+
if (pollin) {
priv->ot_state = OT_CALLBACK;
+
ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
pollin);
+
if (priv->ot_state == OT_CALLBACK) {
priv->ot_state = OT_RUNNING;
}
+
rpc_transport_pollin_destroy (pollin);
+
+ pthread_mutex_lock (&priv->notify.lock);
+ {
+ --priv->notify.in_progress;
+
+ if (!priv->notify.in_progress)
+ pthread_cond_signal (&priv->notify.cond);
+ }
+ pthread_mutex_unlock (&priv->notify.lock);
}
return ret;
@@ -2369,24 +2411,29 @@ static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait);
/* reads rpc_requests during pollin */
static int
-socket_event_handler (int fd, int idx, void *data,
+socket_event_handler (int fd, int idx, int gen, void *data,
int poll_in, int poll_out, int poll_err)
{
- rpc_transport_t *this = NULL;
- socket_private_t *priv = NULL;
- int ret = -1;
+ rpc_transport_t *this = NULL;
+ socket_private_t *priv = NULL;
+ int ret = -1;
+ glusterfs_ctx_t *ctx = NULL;
+ gf_boolean_t socket_closed = _gf_false, notify_handled = _gf_false;
this = data;
+
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
THIS = this->xl;
priv = this->private;
+ ctx = this->ctx;
pthread_mutex_lock (&priv->lock);
{
priv->idx = idx;
+ priv->gen = gen;
}
pthread_mutex_unlock (&priv->lock);
@@ -2417,16 +2464,23 @@ socket_event_handler (int fd, int idx, void *data,
}
if (!ret && poll_in) {
- ret = socket_event_poll_in (this);
+ ret = socket_event_poll_in (this, !poll_err);
+ notify_handled = _gf_true;
}
if ((ret < 0) || poll_err) {
/* Logging has happened already in earlier cases */
gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG),
"EPOLLERR - disconnecting now");
- socket_event_poll_err (this);
- rpc_transport_unref (this);
- }
+
+ socket_closed = socket_event_poll_err (this, gen, idx);
+
+ if (socket_closed)
+ rpc_transport_unref (this);
+
+ } else if (!notify_handled) {
+ event_handled (ctx->event_pool, fd, idx, gen);
+ }
out:
return ret;
@@ -2533,7 +2587,7 @@ socket_poller (void *ctx)
}
if (pfd[1].revents & POLL_MASK_INPUT) {
- ret = socket_event_poll_in(this);
+ ret = socket_event_poll_in(this, 0);
if (ret >= 0) {
/* Suppress errors while making progress. */
pfd[1].revents &= ~POLL_MASK_ERROR;
@@ -2657,7 +2711,7 @@ socket_spawn (rpc_transport_t *this)
}
static int
-socket_server_event_handler (int fd, int idx, void *data,
+socket_server_event_handler (int fd, int idx, int gen, void *data,
int poll_in, int poll_out, int poll_err)
{
rpc_transport_t *this = NULL;
@@ -2913,6 +2967,8 @@ socket_server_event_handler (int fd, int idx, void *data,
}
}
out:
+ event_handled (ctx->event_pool, fd, idx, gen);
+
if (cname && (cname != this->ssl_name)) {
GF_FREE(cname);
}
@@ -4024,6 +4080,8 @@ socket_init (rpc_transport_t *this)
priv->bio = 0;
priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
INIT_LIST_HEAD (&priv->ioq);
+ pthread_mutex_init (&priv->notify.lock, NULL);
+ pthread_cond_init (&priv->notify.cond, NULL);
/* All the below section needs 'this->options' to be present */
if (!this->options)
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index 6c8875f..e299a3d 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -203,6 +203,7 @@ typedef enum {
typedef struct {
int32_t sock;
int32_t idx;
+ int32_t gen;
/* -1 = not connected. 0 = in progress. 1 = connected */
char connected;
/* 1 = connect failed for reasons other than EINPROGRESS/ENOENT
@@ -254,6 +255,11 @@ typedef struct {
int log_ctr;
GF_REF_DECL; /* refcount to keep track of socket_poller
threads */
+ struct {
+ pthread_mutex_t lock;
+ pthread_cond_t cond;
+ uint64_t in_progress;
+ } notify;
} socket_private_t;