summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-transport
diff options
context:
space:
mode:
authorJeff Darcy <jdarcy@redhat.com>2013-01-31 14:23:36 -0500
committerAnand Avati <avati@redhat.com>2013-02-03 12:09:54 -0800
commit26d9d2bd27dd9e6ed9a77789afea0944032223d8 (patch)
tree89ba327fc9bf6d0cd8b5427cdf4f3ef5f666cfe1 /rpc/rpc-transport
parent77f86aae037ddbe581aea9829c4f541de47a35d6 (diff)
socket: restructure disconnect/poll-thread interactions
Change-Id: I792c28f52068e4ed666069b740739662685160bc BUG: 906401 Signed-off-by: Jeff Darcy <jdarcy@redhat.com> Reviewed-on: http://review.gluster.org/4456 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Anand Avati <avati@redhat.com>
Diffstat (limited to 'rpc/rpc-transport')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c131
-rw-r--r--rpc/rpc-transport/socket/src/socket.h9
2 files changed, 102 insertions, 38 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 7657e5d35..fffc137f6 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -311,6 +311,15 @@ done:
}
+void
+ssl_teardown_connection (socket_private_t *priv)
+{
+ SSL_shutdown(priv->ssl_ssl);
+ SSL_clear(priv->ssl_ssl);
+ SSL_free(priv->ssl_ssl);
+ priv->ssl_ssl = NULL;
+}
+
ssize_t
__socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount)
@@ -581,10 +590,29 @@ __socket_writev (rpc_transport_t *this, struct iovec *vector, int count,
int
+__socket_shutdown (rpc_transport_t *this)
+{
+ int ret = -1;
+ socket_private_t *priv = this->private;
+
+ priv->connected = -1;
+ ret = shutdown (priv->sock, SHUT_RDWR);
+ if (ret) {
+ /* its already disconnected.. no need to understand
+ why it failed to shutdown in normal cases */
+ gf_log (this->name, GF_LOG_DEBUG,
+ "shutdown() returned %d. %s",
+ ret, strerror (errno));
+ }
+
+ return ret;
+}
+
+int
__socket_disconnect (rpc_transport_t *this)
{
- socket_private_t *priv = NULL;
int ret = -1;
+ socket_private_t *priv = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -592,21 +620,7 @@ __socket_disconnect (rpc_transport_t *this)
priv = this->private;
if (priv->sock != -1) {
- priv->connected = -1;
- ret = shutdown (priv->sock, SHUT_RDWR);
- if (ret) {
- /* its already disconnected.. no need to understand
- why it failed to shutdown in normal cases */
- gf_log (this->name, GF_LOG_DEBUG,
- "shutdown() returned %d. %s",
- ret, strerror (errno));
- }
- if (priv->ssl_ssl) {
- SSL_shutdown(priv->ssl_ssl);
- SSL_clear(priv->ssl_ssl);
- SSL_free(priv->ssl_ssl);
- priv->ssl_ssl = NULL;
- }
+ ret = __socket_shutdown(this);
if (priv->own_thread) {
/*
* Without this, reconnect (= disconnect + connect)
@@ -614,8 +628,19 @@ __socket_disconnect (rpc_transport_t *this)
*/
close(priv->sock);
priv->sock = -1;
- ++(priv->socket_gen);
+ /*
+ * Closing the socket forces an error that will wake
+ * up the polling thread. Wait for it to notice and
+ * respond.
+ */
+ if (priv->ot_state == OT_ALIVE) {
+ priv->ot_state = OT_DYING;
+ pthread_cond_wait(&priv->ot_event,&priv->lock);
+ }
}
+ else if (priv->use_ssl) {
+ ssl_teardown_connection(priv);
+ }
}
out:
@@ -2226,7 +2251,6 @@ socket_poller (void *ctx)
struct pollfd pfd[2] = {{0,},};
gf_boolean_t to_write = _gf_false;
int ret = 0;
- int orig_gen;
if (priv->use_ssl) {
if (ssl_setup_connection(this,priv->connected) < 0) {
@@ -2246,8 +2270,6 @@ socket_poller (void *ctx)
}
}
- orig_gen = ++(priv->socket_gen);
-
if (priv->connected == 0) {
THIS = this->xl;
ret = socket_connect_finish (this);
@@ -2265,11 +2287,6 @@ socket_poller (void *ctx)
}
for (;;) {
- if (priv->socket_gen != orig_gen) {
- gf_log(this->name,GF_LOG_DEBUG,
- "redundant poller exiting");
- return NULL;
- }
pthread_mutex_lock(&priv->lock);
to_write = !list_empty(&priv->ioq);
pthread_mutex_unlock(&priv->lock);
@@ -2339,13 +2356,57 @@ socket_poller (void *ctx)
err:
/* All (and only) I/O errors should come here. */
- __socket_disconnect (this);
+ pthread_mutex_lock(&priv->lock);
+ if (priv->ot_state == OT_ALIVE) {
+ /*
+ * We have to do this if we're here because of an error we
+ * detected ourselves, but need to avoid a recursive call
+ * if our death is the result of an external disconnect.
+ */
+ __socket_shutdown(this);
+ close(priv->sock);
+ priv->sock = -1;
+ }
+ if (priv->ssl_ssl) {
+ /*
+ * We're always responsible for this part, but only actually
+ * have to do it if we got far enough for ssl_ssl to be valid
+ * (i.e. errors in ssl_setup_connection don't count).
+ */
+ ssl_teardown_connection(priv);
+ }
+ priv->ot_state = OT_IDLE;
+ /*
+ * We expect there to be only one waiter, but if there do happen to
+ * be multiple it's probably better to unblock them than to let them
+ * hang. If there are none, this is a harmless no-op.
+ */
+ pthread_cond_broadcast(&priv->ot_event);
+ pthread_mutex_unlock(&priv->lock);
rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, this);
rpc_transport_unref (this);
return NULL;
}
+void
+socket_spawn (rpc_transport_t *this)
+{
+ socket_private_t *priv = this->private;
+
+ if (priv->ot_state == OT_ALIVE) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "refusing to start redundant poller");
+ return;
+ }
+
+ priv->ot_state = OT_ALIVE;
+
+ if (pthread_create(&priv->thread,NULL,socket_poller,this) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not create poll thread");
+ }
+}
int
socket_server_event_handler (int fd, int idx, void *data,
@@ -2489,12 +2550,7 @@ socket_server_event_handler (int fd, int idx, void *data,
gf_log(this->name,GF_LOG_ERROR,
"could not create pipe");
}
- if (pthread_create(&new_priv->thread,
- NULL, socket_poller,
- new_trans) != 0) {
- gf_log(this->name,GF_LOG_ERROR,
- "could not create poll thread");
- }
+ socket_spawn(new_trans);
}
else {
new_priv->idx =
@@ -2752,11 +2808,7 @@ socket_connect (rpc_transport_t *this, int port)
}
this->listener = this;
- if (pthread_create(&priv->thread,NULL,
- socket_poller, this) != 0) {
- gf_log(this->name,GF_LOG_ERROR,
- "could not create poll thread");
- }
+ socket_spawn(this);
}
else {
priv->idx = event_register (ctx->event_pool, priv->sock,
@@ -3468,6 +3520,11 @@ socket_init (rpc_transport_t *this)
SSL_CTX_set_verify(priv->ssl_ctx,SSL_VERIFY_PEER,0);
}
+ if (priv->own_thread) {
+ priv->ot_state = OT_IDLE;
+ pthread_cond_init (&priv->ot_event, NULL);
+ }
+
out:
this->private = priv;
return 0;
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index 78faad903..bb342d998 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -184,6 +184,12 @@ struct gf_sock_incoming {
char *ra_buf;
};
+typedef enum {
+ OT_IDLE, /* Uninitialized or termination complete. */
+ OT_ALIVE, /* Past pthread_create, no error/disconnect. */
+ OT_DYING, /* Disconnect in progress. */
+} ot_state_t;
+
typedef struct {
int32_t sock;
int32_t idx;
@@ -222,7 +228,8 @@ typedef struct {
pthread_t thread;
int pipe[2];
gf_boolean_t own_thread;
- volatile int socket_gen;
+ ot_state_t ot_state;
+ pthread_cond_t ot_event;
} socket_private_t;