summaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'rpc')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c96
-rw-r--r--rpc/rpc-transport/socket/src/socket.h6
2 files changed, 83 insertions, 19 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 8ba2692cdc6..e14152c5822 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -1172,11 +1172,11 @@ out:
}
-static int
-socket_event_poll_err (rpc_transport_t *this)
+static gf_boolean_t
+socket_event_poll_err (rpc_transport_t *this, int gen, int idx)
{
- socket_private_t *priv = NULL;
- int ret = -1;
+ socket_private_t *priv = NULL;
+ gf_boolean_t socket_closed = _gf_false;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -1185,15 +1185,29 @@ socket_event_poll_err (rpc_transport_t *this)
pthread_mutex_lock (&priv->lock);
{
- __socket_ioq_flush (this);
- __socket_reset (this);
+ if ((priv->gen == gen) && (priv->idx == idx)
+ && (priv->sock != -1)) {
+ __socket_ioq_flush (this);
+ __socket_reset (this);
+ socket_closed = _gf_true;
+ }
}
pthread_mutex_unlock (&priv->lock);
- rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
+ if (socket_closed) {
+ pthread_mutex_lock (&priv->notify.lock);
+ {
+ while (priv->notify.in_progress)
+ pthread_cond_wait (&priv->notify.cond,
+ &priv->notify.lock);
+ }
+ pthread_mutex_unlock (&priv->notify.lock);
+
+ rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
+ }
out:
- return ret;
+ return socket_closed;
}
@@ -2271,22 +2285,50 @@ out:
static int
-socket_event_poll_in (rpc_transport_t *this)
+socket_event_poll_in (rpc_transport_t *this, gf_boolean_t notify_handled)
{
int ret = -1;
rpc_transport_pollin_t *pollin = NULL;
socket_private_t *priv = this->private;
+ glusterfs_ctx_t *ctx = NULL;
+
+ ctx = this->ctx;
ret = socket_proto_state_machine (this, &pollin);
+ if (pollin) {
+ pthread_mutex_lock (&priv->notify.lock);
+ {
+ priv->notify.in_progress++;
+ }
+ pthread_mutex_unlock (&priv->notify.lock);
+ }
+
+
+ if (notify_handled && (ret != -1))
+ event_handled (ctx->event_pool, priv->sock, priv->idx,
+ priv->gen);
+
if (pollin) {
priv->ot_state = OT_CALLBACK;
+
ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
pollin);
+
if (priv->ot_state == OT_CALLBACK) {
priv->ot_state = OT_RUNNING;
}
+
rpc_transport_pollin_destroy (pollin);
+
+ pthread_mutex_lock (&priv->notify.lock);
+ {
+ --priv->notify.in_progress;
+
+ if (!priv->notify.in_progress)
+ pthread_cond_signal (&priv->notify.cond);
+ }
+ pthread_mutex_unlock (&priv->notify.lock);
}
return ret;
@@ -2369,24 +2411,29 @@ static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait);
/* reads rpc_requests during pollin */
static int
-socket_event_handler (int fd, int idx, void *data,
+socket_event_handler (int fd, int idx, int gen, void *data,
int poll_in, int poll_out, int poll_err)
{
- rpc_transport_t *this = NULL;
- socket_private_t *priv = NULL;
- int ret = -1;
+ rpc_transport_t *this = NULL;
+ socket_private_t *priv = NULL;
+ int ret = -1;
+ glusterfs_ctx_t *ctx = NULL;
+ gf_boolean_t socket_closed = _gf_false, notify_handled = _gf_false;
this = data;
+
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
THIS = this->xl;
priv = this->private;
+ ctx = this->ctx;
pthread_mutex_lock (&priv->lock);
{
priv->idx = idx;
+ priv->gen = gen;
}
pthread_mutex_unlock (&priv->lock);
@@ -2417,16 +2464,23 @@ socket_event_handler (int fd, int idx, void *data,
}
if (!ret && poll_in) {
- ret = socket_event_poll_in (this);
+ ret = socket_event_poll_in (this, !poll_err);
+ notify_handled = _gf_true;
}
if ((ret < 0) || poll_err) {
/* Logging has happened already in earlier cases */
gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG),
"EPOLLERR - disconnecting now");
- socket_event_poll_err (this);
- rpc_transport_unref (this);
- }
+
+ socket_closed = socket_event_poll_err (this, gen, idx);
+
+ if (socket_closed)
+ rpc_transport_unref (this);
+
+ } else if (!notify_handled) {
+ event_handled (ctx->event_pool, fd, idx, gen);
+ }
out:
return ret;
@@ -2533,7 +2587,7 @@ socket_poller (void *ctx)
}
if (pfd[1].revents & POLL_MASK_INPUT) {
- ret = socket_event_poll_in(this);
+ ret = socket_event_poll_in(this, 0);
if (ret >= 0) {
/* Suppress errors while making progress. */
pfd[1].revents &= ~POLL_MASK_ERROR;
@@ -2657,7 +2711,7 @@ socket_spawn (rpc_transport_t *this)
}
static int
-socket_server_event_handler (int fd, int idx, void *data,
+socket_server_event_handler (int fd, int idx, int gen, void *data,
int poll_in, int poll_out, int poll_err)
{
rpc_transport_t *this = NULL;
@@ -2913,6 +2967,8 @@ socket_server_event_handler (int fd, int idx, void *data,
}
}
out:
+ event_handled (ctx->event_pool, fd, idx, gen);
+
if (cname && (cname != this->ssl_name)) {
GF_FREE(cname);
}
@@ -4024,6 +4080,8 @@ socket_init (rpc_transport_t *this)
priv->bio = 0;
priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
INIT_LIST_HEAD (&priv->ioq);
+ pthread_mutex_init (&priv->notify.lock, NULL);
+ pthread_cond_init (&priv->notify.cond, NULL);
/* All the below section needs 'this->options' to be present */
if (!this->options)
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index 6c8875f7fb7..e299a3d7bd5 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -203,6 +203,7 @@ typedef enum {
typedef struct {
int32_t sock;
int32_t idx;
+ int32_t gen;
/* -1 = not connected. 0 = in progress. 1 = connected */
char connected;
/* 1 = connect failed for reasons other than EINPROGRESS/ENOENT
@@ -254,6 +255,11 @@ typedef struct {
int log_ctr;
GF_REF_DECL; /* refcount to keep track of socket_poller
threads */
+ struct {
+ pthread_mutex_t lock;
+ pthread_cond_t cond;
+ uint64_t in_progress;
+ } notify;
} socket_private_t;