summaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'rpc')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c90
-rw-r--r--rpc/rpc-transport/socket/src/socket.h3
2 files changed, 58 insertions, 35 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 6261e564f91..820683d2e8c 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -1185,7 +1185,8 @@ socket_event_poll_err (rpc_transport_t *this, int gen, int idx)
priv = this->private;
- pthread_mutex_lock (&priv->lock);
+ pthread_mutex_lock (&priv->in_lock);
+ pthread_mutex_lock (&priv->out_lock);
{
if ((priv->gen == gen) && (priv->idx == idx)
&& (priv->sock != -1)) {
@@ -1194,7 +1195,8 @@ socket_event_poll_err (rpc_transport_t *this, int gen, int idx)
socket_closed = _gf_true;
}
}
- pthread_mutex_unlock (&priv->lock);
+ pthread_mutex_unlock (&priv->out_lock);
+ pthread_mutex_unlock (&priv->in_lock);
if (socket_closed) {
pthread_mutex_lock (&priv->notify.lock);
@@ -1224,7 +1226,7 @@ socket_event_poll_out (rpc_transport_t *this)
priv = this->private;
- pthread_mutex_lock (&priv->lock);
+ pthread_mutex_lock (&priv->out_lock);
{
if (priv->connected == 1) {
ret = __socket_ioq_churn (this);
@@ -1237,7 +1239,7 @@ socket_event_poll_out (rpc_transport_t *this)
}
}
}
- pthread_mutex_unlock (&priv->lock);
+ pthread_mutex_unlock (&priv->out_lock);
if (ret == 0)
ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_SENT, NULL);
@@ -1934,13 +1936,13 @@ __socket_read_reply (rpc_transport_t *this)
* and priv->lock, since we are doing an upcall here.
*/
frag->state = SP_STATE_NOTIFYING_XID;
- pthread_mutex_unlock (&priv->lock);
+ pthread_mutex_unlock (&priv->in_lock);
{
ret = rpc_transport_notify (this,
RPC_TRANSPORT_MAP_XID_REQUEST,
in->request_info);
}
- pthread_mutex_lock (&priv->lock);
+ pthread_mutex_lock (&priv->in_lock);
/* Transition back to externally visible state. */
frag->state = SP_STATE_READ_MSGTYPE;
@@ -2275,11 +2277,11 @@ socket_proto_state_machine (rpc_transport_t *this,
priv = this->private;
- pthread_mutex_lock (&priv->lock);
+ pthread_mutex_lock (&priv->in_lock);
{
ret = __socket_proto_state_machine (this, pollin);
}
- pthread_mutex_unlock (&priv->lock);
+ pthread_mutex_unlock (&priv->in_lock);
out:
return ret;
@@ -2350,7 +2352,8 @@ socket_connect_finish (rpc_transport_t *this)
priv = this->private;
- pthread_mutex_lock (&priv->lock);
+ pthread_mutex_lock (&priv->in_lock);
+ pthread_mutex_lock (&priv->out_lock);
{
if (priv->connected != 0)
goto unlock;
@@ -2400,7 +2403,8 @@ socket_connect_finish (rpc_transport_t *this)
}
}
unlock:
- pthread_mutex_unlock (&priv->lock);
+ pthread_mutex_unlock (&priv->out_lock);
+ pthread_mutex_unlock (&priv->in_lock);
if (notify_rpc) {
rpc_transport_notify (this, event, this);
@@ -2432,12 +2436,14 @@ socket_event_handler (int fd, int idx, int gen, void *data,
priv = this->private;
ctx = this->ctx;
- pthread_mutex_lock (&priv->lock);
+ pthread_mutex_lock (&priv->in_lock);
+ pthread_mutex_lock (&priv->out_lock);
{
priv->idx = idx;
priv->gen = gen;
}
- pthread_mutex_unlock (&priv->lock);
+ pthread_mutex_unlock (&priv->out_lock);
+ pthread_mutex_unlock (&priv->in_lock);
if (priv->connected != 1) {
if (priv->connect_failed) {
@@ -2556,9 +2562,9 @@ socket_poller (void *ctx)
gen = priv->ot_gen;
for (;;) {
- pthread_mutex_lock(&priv->lock);
+ pthread_mutex_lock(&priv->out_lock);
to_write = !list_empty(&priv->ioq);
- pthread_mutex_unlock(&priv->lock);
+ pthread_mutex_unlock(&priv->out_lock);
pfd[0].fd = priv->pipe[0];
pfd[0].events = POLL_MASK_ERROR;
pfd[0].revents = 0;
@@ -2652,7 +2658,8 @@ socket_poller (void *ctx)
err:
/* All (and only) I/O errors should come here. */
- pthread_mutex_lock(&priv->lock);
+ pthread_mutex_lock(&priv->in_lock);
+ pthread_mutex_lock(&priv->out_lock);
{
gf_log (this->name, GF_LOG_TRACE, "disconnecting socket");
__socket_teardown_connection (this);
@@ -2666,7 +2673,8 @@ err:
priv->ot_state = OT_IDLE;
}
- pthread_mutex_unlock(&priv->lock);
+ pthread_mutex_unlock(&priv->out_lock);
+ pthread_mutex_unlock(&priv->in_lock);
rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
@@ -3014,11 +3022,13 @@ socket_disconnect (rpc_transport_t *this, gf_boolean_t wait)
pthread_mutex_unlock (&priv->cond_lock);
}
- pthread_mutex_lock (&priv->lock);
+ pthread_mutex_lock (&priv->in_lock);
+ pthread_mutex_lock (&priv->out_lock);
{
ret = __socket_disconnect (this);
}
- pthread_mutex_unlock (&priv->lock);
+ pthread_mutex_unlock (&priv->out_lock);
+ pthread_mutex_unlock (&priv->in_lock);
out:
return ret;
@@ -3118,7 +3128,8 @@ socket_connect (rpc_transport_t *this, int port)
goto err;
}
- pthread_mutex_lock (&priv->lock);
+ pthread_mutex_lock (&priv->in_lock);
+ pthread_mutex_lock (&priv->out_lock);
{
priv->own_thread_done = _gf_false;
if (priv->sock != -1) {
@@ -3395,7 +3406,8 @@ handler:
unlock:
sock = priv->sock;
}
- pthread_mutex_unlock (&priv->lock);
+ pthread_mutex_unlock (&priv->out_lock);
+ pthread_mutex_unlock (&priv->in_lock);
err:
/* if sock != -1, then cleanup is done from the event handler */
@@ -3445,11 +3457,13 @@ socket_listen (rpc_transport_t *this)
myinfo = &this->myinfo;
ctx = this->ctx;
- pthread_mutex_lock (&priv->lock);
+ pthread_mutex_lock (&priv->in_lock);
+ pthread_mutex_lock (&priv->out_lock);
{
sock = priv->sock;
}
- pthread_mutex_unlock (&priv->lock);
+ pthread_mutex_unlock (&priv->out_lock);
+ pthread_mutex_unlock (&priv->in_lock);
if (sock != -1) {
gf_log_callingfn (this->name, GF_LOG_DEBUG,
@@ -3463,7 +3477,8 @@ socket_listen (rpc_transport_t *this)
return ret;
}
- pthread_mutex_lock (&priv->lock);
+ pthread_mutex_lock (&priv->in_lock);
+ pthread_mutex_lock (&priv->out_lock);
{
if (priv->sock != -1) {
gf_log (this->name, GF_LOG_DEBUG,
@@ -3567,7 +3582,8 @@ socket_listen (rpc_transport_t *this)
}
}
unlock:
- pthread_mutex_unlock (&priv->lock);
+ pthread_mutex_unlock (&priv->out_lock);
+ pthread_mutex_unlock (&priv->in_lock);
out:
return ret;
@@ -3591,7 +3607,7 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
priv = this->private;
ctx = this->ctx;
- pthread_mutex_lock (&priv->lock);
+ pthread_mutex_lock (&priv->out_lock);
{
if (priv->connected != 1) {
if (!priv->submit_log && !priv->connect_finish_log) {
@@ -3641,7 +3657,7 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
}
}
unlock:
- pthread_mutex_unlock (&priv->lock);
+ pthread_mutex_unlock (&priv->out_lock);
out:
return ret;
@@ -3665,7 +3681,7 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
priv = this->private;
ctx = this->ctx;
- pthread_mutex_lock (&priv->lock);
+ pthread_mutex_lock (&priv->out_lock);
{
if (priv->connected != 1) {
if (!priv->submit_log && !priv->connect_finish_log) {
@@ -3715,7 +3731,7 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
}
}
unlock:
- pthread_mutex_unlock (&priv->lock);
+ pthread_mutex_unlock (&priv->out_lock);
out:
return ret;
@@ -3814,7 +3830,8 @@ socket_throttle (rpc_transport_t *this, gf_boolean_t onoff)
will never read() any more data until throttling
is turned off.
*/
- pthread_mutex_lock (&priv->lock);
+ pthread_mutex_lock (&priv->in_lock);
+ pthread_mutex_lock (&priv->out_lock);
{
/* Throttling is useless on a disconnected transport. In fact,
@@ -3828,7 +3845,8 @@ socket_throttle (rpc_transport_t *this, gf_boolean_t onoff)
priv->idx, (int) !onoff,
-1);
}
- pthread_mutex_unlock (&priv->lock);
+ pthread_mutex_unlock (&priv->out_lock);
+ pthread_mutex_unlock (&priv->in_lock);
return 0;
}
@@ -4108,7 +4126,8 @@ socket_init (rpc_transport_t *this)
}
memset(priv,0,sizeof(*priv));
- pthread_mutex_init (&priv->lock, NULL);
+ pthread_mutex_init (&priv->in_lock, NULL);
+ pthread_mutex_init (&priv->out_lock, NULL);
pthread_mutex_init (&priv->cond_lock, NULL);
pthread_cond_init (&priv->cond, NULL);
@@ -4530,17 +4549,20 @@ fini (rpc_transport_t *this)
priv = this->private;
if (priv) {
if (priv->sock != -1) {
- pthread_mutex_lock (&priv->lock);
+ pthread_mutex_lock (&priv->in_lock);
+ pthread_mutex_lock (&priv->out_lock);
{
__socket_ioq_flush (this);
__socket_reset (this);
}
- pthread_mutex_unlock (&priv->lock);
+ pthread_mutex_unlock (&priv->out_lock);
+ pthread_mutex_unlock (&priv->in_lock);
}
gf_log (this->name, GF_LOG_TRACE,
"transport %p destroyed", this);
- pthread_mutex_destroy (&priv->lock);
+ pthread_mutex_destroy (&priv->in_lock);
+ pthread_mutex_destroy (&priv->out_lock);
pthread_mutex_destroy (&priv->cond_lock);
pthread_cond_destroy (&priv->cond);
if (priv->ssl_private_key) {
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index e299a3d7bd5..59110b5043a 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -220,7 +220,8 @@ typedef struct {
};
};
struct gf_sock_incoming incoming;
- pthread_mutex_t lock;
+ pthread_mutex_t in_lock;
+ pthread_mutex_t out_lock;
pthread_mutex_t cond_lock;
pthread_cond_t cond;
int windowsize;