summaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'rpc')
-rw-r--r--rpc/rpc-lib/src/rpc-clnt-ping.c2
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c4
-rw-r--r--rpc/rpc-lib/src/rpc-transport.c5
-rw-r--r--rpc/rpc-lib/src/rpc-transport.h4
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c2
-rw-r--r--rpc/rpc-transport/rdma/src/rdma.c19
-rw-r--r--rpc/rpc-transport/socket/src/socket.c98
-rw-r--r--rpc/rpc-transport/socket/src/socket.h6
8 files changed, 106 insertions, 34 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt-ping.c b/rpc/rpc-lib/src/rpc-clnt-ping.c
index 3eb7e90cb01..e042121ad47 100644
--- a/rpc/rpc-lib/src/rpc-clnt-ping.c
+++ b/rpc/rpc-lib/src/rpc-clnt-ping.c
@@ -159,7 +159,7 @@ rpc_clnt_ping_timer_expired (void *rpc_ptr)
trans->peerinfo.identifier,
conn->ping_timeout);
- rpc_transport_disconnect (conn->trans);
+ rpc_transport_disconnect (conn->trans, _gf_false);
}
out:
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index b868f56bdb3..d39b5236b91 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -1854,7 +1854,7 @@ rpc_clnt_disable (struct rpc_clnt *rpc)
pthread_mutex_unlock (&conn->lock);
if (trans) {
- rpc_transport_disconnect (trans);
+ rpc_transport_disconnect (trans, _gf_true);
}
if (unref)
@@ -1913,7 +1913,7 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)
pthread_mutex_unlock (&conn->lock);
if (trans) {
- rpc_transport_disconnect (trans);
+ rpc_transport_disconnect (trans, _gf_true);
}
if (unref)
rpc_clnt_unref (rpc);
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index 6ee5e15ede4..33e94450d9c 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -435,13 +435,14 @@ fail:
int32_t
-rpc_transport_disconnect (rpc_transport_t *this)
+rpc_transport_disconnect (rpc_transport_t *this, gf_boolean_t wait)
{
int32_t ret = -1;
GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
- ret = this->ops->disconnect (this);
+ ret = this->ops->disconnect (this, wait);
+
fail:
return ret;
}
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index 4e7a8c46fae..717c40af13a 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -224,7 +224,7 @@ struct rpc_transport_ops {
rpc_transport_reply_t *reply);
int32_t (*connect) (rpc_transport_t *this, int port);
int32_t (*listen) (rpc_transport_t *this);
- int32_t (*disconnect) (rpc_transport_t *this);
+ int32_t (*disconnect) (rpc_transport_t *this, gf_boolean_t wait);
int32_t (*get_peername) (rpc_transport_t *this, char *hostname,
int hostlen);
int32_t (*get_peeraddr) (rpc_transport_t *this, char *peeraddr,
@@ -248,7 +248,7 @@ int32_t
rpc_transport_connect (rpc_transport_t *this, int port);
int32_t
-rpc_transport_disconnect (rpc_transport_t *this);
+rpc_transport_disconnect (rpc_transport_t *this, gf_boolean_t wait);
int32_t
rpc_transport_destroy (rpc_transport_t *this);
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index c792909cb87..52b57205f6d 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -1654,7 +1654,7 @@ rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name)
ret = 0;
out:
if (!listener && trans) {
- rpc_transport_disconnect (trans);
+ rpc_transport_disconnect (trans, _gf_true);
}
return ret;
diff --git a/rpc/rpc-transport/rdma/src/rdma.c b/rpc/rpc-transport/rdma/src/rdma.c
index 551ac072079..d2f04bd6d0c 100644
--- a/rpc/rpc-transport/rdma/src/rdma.c
+++ b/rpc/rpc-transport/rdma/src/rdma.c
@@ -51,7 +51,7 @@ static int32_t
gf_rdma_teardown (rpc_transport_t *this);
static int32_t
-gf_rdma_disconnect (rpc_transport_t *this);
+gf_rdma_disconnect (rpc_transport_t *this, gf_boolean_t wait);
static void
gf_rdma_cm_handle_disconnect (rpc_transport_t *this);
@@ -1209,7 +1209,7 @@ gf_rdma_cm_handle_connect_init (struct rdma_cm_event *event)
}
if (ret < 0) {
- gf_rdma_disconnect (this);
+ gf_rdma_disconnect (this, _gf_false);
}
return ret;
@@ -3014,7 +3014,7 @@ gf_rdma_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
RDMA_MSG_WRITE_PEER_FAILED,
"sending request to peer (%s) failed",
this->peerinfo.identifier);
- rpc_transport_disconnect (this);
+ rpc_transport_disconnect (this, _gf_false);
}
out:
@@ -3051,7 +3051,7 @@ gf_rdma_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
RDMA_MSG_WRITE_PEER_FAILED,
"sending request to peer (%s) failed",
this->peerinfo.identifier);
- rpc_transport_disconnect (this);
+ rpc_transport_disconnect (this, _gf_false);
}
out:
@@ -4095,7 +4095,7 @@ gf_rdma_process_recv (gf_rdma_peer_t *peer, struct ibv_wc *wc)
out:
if (ret == -1) {
- rpc_transport_disconnect (peer->trans);
+ rpc_transport_disconnect (peer->trans, _gf_false);
}
return;
@@ -4216,7 +4216,8 @@ gf_rdma_recv_completion_proc (void *data)
if (peer) {
ibv_ack_cq_events (event_cq, num_wr);
rpc_transport_unref (peer->trans);
- rpc_transport_disconnect (peer->trans);
+ rpc_transport_disconnect (peer->trans,
+ _gf_false);
}
if (post) {
@@ -4292,7 +4293,7 @@ gf_rdma_handle_failed_send_completion (gf_rdma_peer_t *peer, struct ibv_wc *wc)
}
if (peer) {
- rpc_transport_disconnect (peer->trans);
+ rpc_transport_disconnect (peer->trans, _gf_false);
}
return;
@@ -4343,7 +4344,7 @@ gf_rdma_handle_successful_send_completion (gf_rdma_peer_t *peer,
ret = gf_rdma_pollin_notify (peer, post);
if ((ret == -1) && (peer != NULL)) {
- rpc_transport_disconnect (peer->trans);
+ rpc_transport_disconnect (peer->trans, _gf_false);
}
out:
@@ -4657,7 +4658,7 @@ gf_rdma_init (rpc_transport_t *this)
static int32_t
-gf_rdma_disconnect (rpc_transport_t *this)
+gf_rdma_disconnect (rpc_transport_t *this, gf_boolean_t wait)
{
gf_rdma_private_t *priv = NULL;
int32_t ret = 0;
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 36388548937..d05dc4189aa 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -2344,7 +2344,7 @@ out:
return ret;
}
-static int socket_disconnect (rpc_transport_t *this);
+static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait);
/* reads rpc_requests during pollin */
static int
@@ -2375,7 +2375,7 @@ socket_event_handler (int fd, int idx, void *data,
EINPROGRESS or ENOENT, so nothing more to do, fail
reading/writing anything even if poll_in or poll_out
is set */
- ret = socket_disconnect (this);
+ ret = socket_disconnect (this, _gf_false);
/* Force ret to be -1, as we are officially done with
this socket */
@@ -2424,6 +2424,13 @@ socket_poller (void *ctx)
* conditionally
*/
THIS = this->xl;
+ GF_REF_GET (priv);
+
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_DEBUG, "socket_poller is exiting "
+ "because socket state is OT_PLEASE_DIE");
+ goto err;
+ }
priv->ot_state = OT_RUNNING;
@@ -2494,6 +2501,13 @@ socket_poller (void *ctx)
break;
}
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "OT_PLEASE_DIE on %p (exiting socket_poller)",
+ this);
+ break;
+ }
+
if (pfd[1].revents & POLL_MASK_INPUT) {
ret = socket_event_poll_in(this);
if (ret >= 0) {
@@ -2507,7 +2521,6 @@ socket_poller (void *ctx)
gf_log (this->name, GF_LOG_TRACE,
"OT_IDLE on %p (input request)",
this);
- priv->ot_state = OT_IDLE;
break;
}
}
@@ -2524,7 +2537,6 @@ socket_poller (void *ctx)
gf_log (this->name, GF_LOG_TRACE,
"OT_IDLE on %p (output request)",
this);
- priv->ot_state = OT_IDLE;
break;
}
}
@@ -2561,22 +2573,24 @@ socket_poller (void *ctx)
err:
/* All (and only) I/O errors should come here. */
pthread_mutex_lock(&priv->lock);
+ {
+ __socket_teardown_connection (this);
+ sys_close (priv->sock);
+ priv->sock = -1;
- __socket_teardown_connection (this);
- sys_close (priv->sock);
- priv->sock = -1;
-
- sys_close (priv->pipe[0]);
- sys_close (priv->pipe[1]);
- priv->pipe[0] = -1;
- priv->pipe[1] = -1;
-
- priv->ot_state = OT_IDLE;
+ sys_close (priv->pipe[0]);
+ sys_close (priv->pipe[1]);
+ priv->pipe[0] = -1;
+ priv->pipe[1] = -1;
+ priv->ot_state = OT_IDLE;
+ }
pthread_mutex_unlock(&priv->lock);
rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
+ GF_REF_PUT (priv);
+
rpc_transport_unref (this);
return NULL;
@@ -2848,16 +2862,39 @@ out:
static int
-socket_disconnect (rpc_transport_t *this)
+socket_disconnect (rpc_transport_t *this, gf_boolean_t wait)
{
- socket_private_t *priv = NULL;
- int ret = -1;
+ socket_private_t *priv = NULL;
+ int ret = -1;
+ char a_byte = 'r';
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
+ if (wait && priv->own_thread) {
+ pthread_mutex_lock (&priv->cond_lock);
+ {
+ GF_REF_PUT (priv);
+ /* Change the state to OT_PLEASE_DIE so that
+ * socket_poller can exit. */
+ priv->ot_state = OT_PLEASE_DIE;
+ /* Write something into the pipe so that poller
+ * thread can wake up.*/
+ if (sys_write (priv->pipe[1], &a_byte, 1) < 1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "write error on pipe");
+ }
+
+ /* Wait for socket_poller to exit */
+ if (!priv->own_thread_done)
+ pthread_cond_wait (&priv->cond,
+ &priv->cond_lock);
+ }
+ pthread_mutex_unlock (&priv->cond_lock);
+ }
+
pthread_mutex_lock (&priv->lock);
{
ret = __socket_disconnect (this);
@@ -2937,6 +2974,7 @@ socket_connect (rpc_transport_t *this, int port)
pthread_mutex_lock (&priv->lock);
{
+ priv->own_thread_done = _gf_false;
if (priv->sock != -1) {
gf_log_callingfn (this->name, GF_LOG_TRACE,
"connect () called on transport "
@@ -3805,6 +3843,26 @@ init_openssl_mt (void)
SSL_load_error_strings();
}
+void
+socket_poller_mayday (void *data)
+{
+ socket_private_t *priv = (socket_private_t *)data;
+
+ if (priv == NULL)
+ return;
+
+ pthread_mutex_lock (&priv->cond_lock);
+ {
+ /* Signal waiting threads before exiting from socket_poller */
+ if (!priv->own_thread_done) {
+ gf_log ("socket", GF_LOG_TRACE, "priv->cond SIGNALED");
+ pthread_cond_signal (&priv->cond);
+ priv->own_thread_done = _gf_true;
+ }
+ }
+ pthread_mutex_unlock (&priv->cond_lock);
+}
+
static int
socket_init (rpc_transport_t *this)
{
@@ -3835,6 +3893,10 @@ socket_init (rpc_transport_t *this)
memset(priv,0,sizeof(*priv));
pthread_mutex_init (&priv->lock, NULL);
+ pthread_mutex_init (&priv->cond_lock, NULL);
+ pthread_cond_init (&priv->cond, NULL);
+
+ GF_REF_INIT (priv, socket_poller_mayday);
priv->sock = -1;
priv->idx = -1;
@@ -4265,6 +4327,8 @@ fini (rpc_transport_t *this)
"transport %p destroyed", this);
pthread_mutex_destroy (&priv->lock);
+ pthread_mutex_destroy (&priv->cond_lock);
+ pthread_cond_destroy (&priv->cond);
if (priv->ssl_private_key) {
GF_FREE(priv->ssl_private_key);
}
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index 7c7005b59e7..8528bdeba8d 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -27,6 +27,7 @@
#include "dict.h"
#include "mem-pool.h"
#include "globals.h"
+#include "refcount.h"
#ifndef MAX_IOVEC
#define MAX_IOVEC 16
@@ -215,6 +216,8 @@ typedef struct {
};
struct gf_sock_incoming incoming;
pthread_mutex_t lock;
+ pthread_mutex_t cond_lock;
+ pthread_cond_t cond;
int windowsize;
char lowlat;
char nodelay;
@@ -239,10 +242,13 @@ typedef struct {
pthread_t thread;
int pipe[2];
gf_boolean_t own_thread;
+ gf_boolean_t own_thread_done;
ot_state_t ot_state;
uint32_t ot_gen;
gf_boolean_t is_server;
int log_ctr;
+ GF_REF_DECL; /* refcount to keep track of socket_poller
+ threads */
} socket_private_t;