summaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
authorJeff Darcy <jdarcy@redhat.com>2013-06-04 15:20:45 -0400
committerAnand Avati <avati@redhat.com>2013-06-04 15:37:54 -0700
commit5c1710ed60ccb151ccd7a2890b24bb99518d36da (patch)
treee267cd0208f99d5b3b6420c9e8eff3e029d862ca /rpc
parentdbfe779f3049e6fbc2394bdacdb57165d51dc3f3 (diff)
transport/socket: fix connect/disconnect races
We might receive a connect request while a disconnect is still in progress, requiring more states and (the return of) poller generation numbers to avoid redundant pollers. We might also get either kind of request from within our own rpc_transport_notify upcall, so we have to avoid locking and use the PLEASE_DIE state instead. Change-Id: Icbaacf96c516b607a79ff62c90b74d42b241780f BUG: 970194 Signed-off-by: Jeff Darcy <jdarcy@redhat.com> Reviewed-on: http://review.gluster.org/5137 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Anand Avati <avati@redhat.com>
Diffstat (limited to 'rpc')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c99
-rw-r--r--rpc/rpc-transport/socket/src/socket.h9
2 files changed, 69 insertions, 39 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 97c0dc5..d33146a 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -629,25 +629,23 @@ __socket_disconnect (rpc_transport_t *this)
priv = this->private;
+ gf_log (this->name, GF_LOG_TRACE,
+ "disconnecting %p, state=%u gen=%u sock=%d", this,
+ priv->ot_state, priv->ot_gen, priv->sock);
+
if (priv->sock != -1) {
ret = __socket_shutdown(this);
if (priv->own_thread) {
- /*
- * Without this, reconnect (= disconnect + connect)
- * won't work except by accident.
- */
- close(priv->sock);
- priv->sock = -1;
/*
- * Closing the socket forces an error that will wake
- * up the polling thread. Wait for it to notice and
- * respond.
+ * Without this, reconnect (= disconnect + connect)
+ * won't work except by accident.
*/
- if (priv->ot_state == OT_ALIVE) {
- priv->ot_state = OT_DYING;
- pthread_cond_wait(&priv->ot_event,&priv->lock);
- }
- }
+ close(priv->sock);
+ priv->sock = -1;
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_PLEASE_DIE on %p", this);
+ priv->ot_state = OT_PLEASE_DIE;
+ }
else if (priv->use_ssl) {
ssl_teardown_connection(priv);
}
@@ -1415,7 +1413,7 @@ __socket_read_request (rpc_transport_t *this)
buf = rpc_procnum_addr (iobuf_ptr (in->iobuf));
procnum = ntoh32 (*((uint32_t *)buf));
- if (this->listener) {
+ if (priv->is_server) {
/* this check is needed as rpcsvc and rpc-clnt
* actor structures are not same */
vector_sizer =
@@ -2112,12 +2110,17 @@ socket_event_poll_in (rpc_transport_t *this)
{
int ret = -1;
rpc_transport_pollin_t *pollin = NULL;
+ socket_private_t *priv = this->private;
ret = socket_proto_state_machine (this, &pollin);
if (pollin != NULL) {
+ priv->ot_state = OT_CALLBACK;
ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
pollin);
+ if (priv->ot_state == OT_CALLBACK) {
+ priv->ot_state = OT_RUNNING;
+ }
rpc_transport_pollin_destroy (pollin);
}
@@ -2252,6 +2255,9 @@ socket_poller (void *ctx)
struct pollfd pfd[2] = {{0,},};
gf_boolean_t to_write = _gf_false;
int ret = 0;
+ uint32_t gen = 0;
+
+ priv->ot_state = OT_RUNNING;
if (priv->use_ssl) {
if (ssl_setup_connection(this,priv->connected) < 0) {
@@ -2287,6 +2293,7 @@ socket_poller (void *ctx)
"asynchronous rpc_transport_notify failed");
}
+ gen = priv->ot_gen;
for (;;) {
pthread_mutex_lock(&priv->lock);
to_write = !list_empty(&priv->ioq);
@@ -2323,6 +2330,13 @@ socket_poller (void *ctx)
else if (errno == ENOTCONN) {
ret = 0;
}
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_IDLE on %p (input request)",
+ this);
+ priv->ot_state = OT_IDLE;
+ break;
+ }
}
else if (pfd[1].revents & POLL_MASK_OUTPUT) {
ret = socket_event_poll_out(this);
@@ -2333,6 +2347,13 @@ socket_poller (void *ctx)
else if (errno == ENOTCONN) {
ret = 0;
}
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_IDLE on %p (output request)",
+ this);
+ priv->ot_state = OT_IDLE;
+ break;
+ }
}
else {
/*
@@ -2353,21 +2374,17 @@ socket_poller (void *ctx)
"error in polling loop");
break;
}
+ if (priv->ot_gen != gen) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "generation mismatch, my %u != %u",
+ gen, priv->ot_gen);
+ return NULL;
+ }
}
err:
/* All (and only) I/O errors should come here. */
pthread_mutex_lock(&priv->lock);
- if (priv->ot_state == OT_ALIVE) {
- /*
- * We have to do this if we're here because of an error we
- * detected ourselves, but need to avoid a recursive call
- * if our death is the result of an external disconnect.
- */
- __socket_shutdown(this);
- close(priv->sock);
- priv->sock = -1;
- }
if (priv->ssl_ssl) {
/*
* We're always responsible for this part, but only actually
@@ -2376,16 +2393,14 @@ err:
*/
ssl_teardown_connection(priv);
}
+ __socket_shutdown(this);
+ close(priv->sock);
+ priv->sock = -1;
priv->ot_state = OT_IDLE;
- /*
- * We expect there to be only one waiter, but if there do happen to
- * be multiple it's probably better to unblock them than to let them
- * hang. If there are none, this is a harmless no-op.
- */
- pthread_cond_broadcast(&priv->ot_event);
pthread_mutex_unlock(&priv->lock);
- rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, this);
- rpc_transport_unref (this);
+ rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT,
+ this);
+ rpc_transport_unref (this);
return NULL;
}
@@ -2395,13 +2410,20 @@ socket_spawn (rpc_transport_t *this)
{
socket_private_t *priv = this->private;
- if (priv->ot_state == OT_ALIVE) {
+ switch (priv->ot_state) {
+ case OT_IDLE:
+ case OT_PLEASE_DIE:
+ break;
+ default:
gf_log (this->name, GF_LOG_WARNING,
"refusing to start redundant poller");
return;
}
- priv->ot_state = OT_ALIVE;
+ priv->ot_gen += 7;
+ priv->ot_state = OT_SPAWNING;
+ gf_log (this->name, GF_LOG_TRACE,
+ "spawning %p with gen %u", this, priv->ot_gen);
if (pthread_create(&priv->thread,NULL,socket_poller,this) != 0) {
gf_log (this->name, GF_LOG_ERROR,
@@ -2554,6 +2576,7 @@ socket_server_event_handler (int fd, int idx, void *data,
* connection.
*/
new_priv->connected = 1;
+ new_priv->is_server = _gf_true;
rpc_transport_ref (new_trans);
if (new_priv->own_thread) {
@@ -2657,6 +2680,10 @@ socket_connect (rpc_transport_t *this, int port)
goto err;
}
+ gf_log (this->name, GF_LOG_TRACE,
+ "connecting %p, state=%u gen=%u sock=%d", this,
+ priv->ot_state, priv->ot_gen, priv->sock);
+
ret = socket_client_get_remote_sockaddr (this, &sock_union.sa,
&sockaddr_len, &sa_family);
if (ret == -1) {
@@ -2822,6 +2849,7 @@ socket_connect (rpc_transport_t *this, int port)
* initializing a client connection.
*/
priv->connected = 0;
+ priv->is_server = _gf_false;
rpc_transport_ref (this);
if (priv->own_thread) {
@@ -3545,7 +3573,6 @@ socket_init (rpc_transport_t *this)
if (priv->own_thread) {
priv->ot_state = OT_IDLE;
- pthread_cond_init (&priv->ot_event, NULL);
}
out:
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index bb342d9..e0b412f 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -186,8 +186,10 @@ struct gf_sock_incoming {
typedef enum {
OT_IDLE, /* Uninitialized or termination complete. */
- OT_ALIVE, /* Past pthread_create, no error/disconnect. */
- OT_DYING, /* Disconnect in progress. */
+ OT_SPAWNING, /* Past pthread_create but not in thread yet. */
+ OT_RUNNING, /* Poller thread running normally. */
+ OT_CALLBACK, /* Poller thread in the middle of a callback. */
+ OT_PLEASE_DIE, /* Poller termination requested. */
} ot_state_t;
typedef struct {
@@ -229,7 +231,8 @@ typedef struct {
int pipe[2];
gf_boolean_t own_thread;
ot_state_t ot_state;
- pthread_cond_t ot_event;
+ uint32_t ot_gen;
+ gf_boolean_t is_server;
} socket_private_t;