summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-transport/socket/src/socket.c
diff options
context:
space:
mode:
authorJeff Darcy <jdarcy@redhat.com>2013-06-04 15:20:45 -0400
committerAnand Avati <avati@redhat.com>2013-06-04 15:37:54 -0700
commit5c1710ed60ccb151ccd7a2890b24bb99518d36da (patch)
treee267cd0208f99d5b3b6420c9e8eff3e029d862ca /rpc/rpc-transport/socket/src/socket.c
parentdbfe779f3049e6fbc2394bdacdb57165d51dc3f3 (diff)
transport/socket: fix connect/disconnect races
We might receive a connect request while a disconnect is still in progress, requiring more states and (the return of) poller generation numbers to avoid redundant pollers. We might also get either kind of request from within our own rpc_transport_notify upcall, so we have to avoid locking and use the PLEASE_DIE state instead. Change-Id: Icbaacf96c516b607a79ff62c90b74d42b241780f BUG: 970194 Signed-off-by: Jeff Darcy <jdarcy@redhat.com> Reviewed-on: http://review.gluster.org/5137 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Anand Avati <avati@redhat.com>
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c99
1 files changed, 63 insertions, 36 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 97c0dc57d..d33146ab5 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -629,25 +629,23 @@ __socket_disconnect (rpc_transport_t *this)
priv = this->private;
+ gf_log (this->name, GF_LOG_TRACE,
+ "disconnecting %p, state=%u gen=%u sock=%d", this,
+ priv->ot_state, priv->ot_gen, priv->sock);
+
if (priv->sock != -1) {
ret = __socket_shutdown(this);
if (priv->own_thread) {
- /*
- * Without this, reconnect (= disconnect + connect)
- * won't work except by accident.
- */
- close(priv->sock);
- priv->sock = -1;
/*
- * Closing the socket forces an error that will wake
- * up the polling thread. Wait for it to notice and
- * respond.
+ * Without this, reconnect (= disconnect + connect)
+ * won't work except by accident.
*/
- if (priv->ot_state == OT_ALIVE) {
- priv->ot_state = OT_DYING;
- pthread_cond_wait(&priv->ot_event,&priv->lock);
- }
- }
+ close(priv->sock);
+ priv->sock = -1;
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_PLEASE_DIE on %p", this);
+ priv->ot_state = OT_PLEASE_DIE;
+ }
else if (priv->use_ssl) {
ssl_teardown_connection(priv);
}
@@ -1415,7 +1413,7 @@ __socket_read_request (rpc_transport_t *this)
buf = rpc_procnum_addr (iobuf_ptr (in->iobuf));
procnum = ntoh32 (*((uint32_t *)buf));
- if (this->listener) {
+ if (priv->is_server) {
/* this check is needed as rpcsvc and rpc-clnt
* actor structures are not same */
vector_sizer =
@@ -2112,12 +2110,17 @@ socket_event_poll_in (rpc_transport_t *this)
{
int ret = -1;
rpc_transport_pollin_t *pollin = NULL;
+ socket_private_t *priv = this->private;
ret = socket_proto_state_machine (this, &pollin);
if (pollin != NULL) {
+ priv->ot_state = OT_CALLBACK;
ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
pollin);
+ if (priv->ot_state == OT_CALLBACK) {
+ priv->ot_state = OT_RUNNING;
+ }
rpc_transport_pollin_destroy (pollin);
}
@@ -2252,6 +2255,9 @@ socket_poller (void *ctx)
struct pollfd pfd[2] = {{0,},};
gf_boolean_t to_write = _gf_false;
int ret = 0;
+ uint32_t gen = 0;
+
+ priv->ot_state = OT_RUNNING;
if (priv->use_ssl) {
if (ssl_setup_connection(this,priv->connected) < 0) {
@@ -2287,6 +2293,7 @@ socket_poller (void *ctx)
"asynchronous rpc_transport_notify failed");
}
+ gen = priv->ot_gen;
for (;;) {
pthread_mutex_lock(&priv->lock);
to_write = !list_empty(&priv->ioq);
@@ -2323,6 +2330,13 @@ socket_poller (void *ctx)
else if (errno == ENOTCONN) {
ret = 0;
}
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_IDLE on %p (input request)",
+ this);
+ priv->ot_state = OT_IDLE;
+ break;
+ }
}
else if (pfd[1].revents & POLL_MASK_OUTPUT) {
ret = socket_event_poll_out(this);
@@ -2333,6 +2347,13 @@ socket_poller (void *ctx)
else if (errno == ENOTCONN) {
ret = 0;
}
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_IDLE on %p (output request)",
+ this);
+ priv->ot_state = OT_IDLE;
+ break;
+ }
}
else {
/*
@@ -2353,21 +2374,17 @@ socket_poller (void *ctx)
"error in polling loop");
break;
}
+ if (priv->ot_gen != gen) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "generation mismatch, my %u != %u",
+ gen, priv->ot_gen);
+ return NULL;
+ }
}
err:
/* All (and only) I/O errors should come here. */
pthread_mutex_lock(&priv->lock);
- if (priv->ot_state == OT_ALIVE) {
- /*
- * We have to do this if we're here because of an error we
- * detected ourselves, but need to avoid a recursive call
- * if our death is the result of an external disconnect.
- */
- __socket_shutdown(this);
- close(priv->sock);
- priv->sock = -1;
- }
if (priv->ssl_ssl) {
/*
* We're always responsible for this part, but only actually
@@ -2376,16 +2393,14 @@ err:
*/
ssl_teardown_connection(priv);
}
+ __socket_shutdown(this);
+ close(priv->sock);
+ priv->sock = -1;
priv->ot_state = OT_IDLE;
- /*
- * We expect there to be only one waiter, but if there do happen to
- * be multiple it's probably better to unblock them than to let them
- * hang. If there are none, this is a harmless no-op.
- */
- pthread_cond_broadcast(&priv->ot_event);
pthread_mutex_unlock(&priv->lock);
- rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, this);
- rpc_transport_unref (this);
+ rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT,
+ this);
+ rpc_transport_unref (this);
return NULL;
}
@@ -2395,13 +2410,20 @@ socket_spawn (rpc_transport_t *this)
{
socket_private_t *priv = this->private;
- if (priv->ot_state == OT_ALIVE) {
+ switch (priv->ot_state) {
+ case OT_IDLE:
+ case OT_PLEASE_DIE:
+ break;
+ default:
gf_log (this->name, GF_LOG_WARNING,
"refusing to start redundant poller");
return;
}
- priv->ot_state = OT_ALIVE;
+ priv->ot_gen += 7;
+ priv->ot_state = OT_SPAWNING;
+ gf_log (this->name, GF_LOG_TRACE,
+ "spawning %p with gen %u", this, priv->ot_gen);
if (pthread_create(&priv->thread,NULL,socket_poller,this) != 0) {
gf_log (this->name, GF_LOG_ERROR,
@@ -2554,6 +2576,7 @@ socket_server_event_handler (int fd, int idx, void *data,
* connection.
*/
new_priv->connected = 1;
+ new_priv->is_server = _gf_true;
rpc_transport_ref (new_trans);
if (new_priv->own_thread) {
@@ -2657,6 +2680,10 @@ socket_connect (rpc_transport_t *this, int port)
goto err;
}
+ gf_log (this->name, GF_LOG_TRACE,
+ "connecting %p, state=%u gen=%u sock=%d", this,
+ priv->ot_state, priv->ot_gen, priv->sock);
+
ret = socket_client_get_remote_sockaddr (this, &sock_union.sa,
&sockaddr_len, &sa_family);
if (ret == -1) {
@@ -2822,6 +2849,7 @@ socket_connect (rpc_transport_t *this, int port)
* initializing a client connection.
*/
priv->connected = 0;
+ priv->is_server = _gf_false;
rpc_transport_ref (this);
if (priv->own_thread) {
@@ -3545,7 +3573,6 @@ socket_init (rpc_transport_t *this)
if (priv->own_thread) {
priv->ot_state = OT_IDLE;
- pthread_cond_init (&priv->ot_event, NULL);
}
out: