diff options
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 90 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 3 | 
2 files changed, 58 insertions, 35 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 6261e564f91..820683d2e8c 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -1185,7 +1185,8 @@ socket_event_poll_err (rpc_transport_t *this, int gen, int idx)          priv = this->private; -        pthread_mutex_lock (&priv->lock); +        pthread_mutex_lock (&priv->in_lock); +        pthread_mutex_lock (&priv->out_lock);          {                  if ((priv->gen == gen) && (priv->idx == idx)                      && (priv->sock != -1)) { @@ -1194,7 +1195,8 @@ socket_event_poll_err (rpc_transport_t *this, int gen, int idx)                          socket_closed = _gf_true;                  }          } -        pthread_mutex_unlock (&priv->lock); +        pthread_mutex_unlock (&priv->out_lock); +        pthread_mutex_unlock (&priv->in_lock);          if (socket_closed) {                  pthread_mutex_lock (&priv->notify.lock); @@ -1224,7 +1226,7 @@ socket_event_poll_out (rpc_transport_t *this)          priv = this->private; -        pthread_mutex_lock (&priv->lock); +        pthread_mutex_lock (&priv->out_lock);          {                  if (priv->connected == 1) {                          ret = __socket_ioq_churn (this); @@ -1237,7 +1239,7 @@ socket_event_poll_out (rpc_transport_t *this)                          }                  }          } -        pthread_mutex_unlock (&priv->lock); +        pthread_mutex_unlock (&priv->out_lock);          if (ret == 0)                  ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_SENT, NULL); @@ -1934,13 +1936,13 @@ __socket_read_reply (rpc_transport_t *this)                   * and priv->lock, since we are doing an upcall here.                   */                  frag->state = SP_STATE_NOTIFYING_XID; -                pthread_mutex_unlock (&priv->lock); +                pthread_mutex_unlock (&priv->in_lock);                  {                          ret = rpc_transport_notify (this,                                                      RPC_TRANSPORT_MAP_XID_REQUEST,                                                      in->request_info);                  } -                pthread_mutex_lock (&priv->lock); +                pthread_mutex_lock (&priv->in_lock);                  /* Transition back to externally visible state. */                  frag->state = SP_STATE_READ_MSGTYPE; @@ -2275,11 +2277,11 @@ socket_proto_state_machine (rpc_transport_t *this,          priv = this->private; -        pthread_mutex_lock (&priv->lock); +        pthread_mutex_lock (&priv->in_lock);          {                  ret = __socket_proto_state_machine (this, pollin);          } -        pthread_mutex_unlock (&priv->lock); +        pthread_mutex_unlock (&priv->in_lock);  out:          return ret; @@ -2350,7 +2352,8 @@ socket_connect_finish (rpc_transport_t *this)          priv = this->private; -        pthread_mutex_lock (&priv->lock); +        pthread_mutex_lock (&priv->in_lock); +        pthread_mutex_lock (&priv->out_lock);          {                  if (priv->connected != 0)                          goto unlock; @@ -2400,7 +2403,8 @@ socket_connect_finish (rpc_transport_t *this)                  }          }  unlock: -        pthread_mutex_unlock (&priv->lock); +        pthread_mutex_unlock (&priv->out_lock); +        pthread_mutex_unlock (&priv->in_lock);          if (notify_rpc) {                  rpc_transport_notify (this, event, this); @@ -2432,12 +2436,14 @@ socket_event_handler (int fd, int idx, int gen, void *data,          priv = this->private;          ctx = this->ctx; -        pthread_mutex_lock (&priv->lock); +        pthread_mutex_lock (&priv->in_lock); +        pthread_mutex_lock (&priv->out_lock);          {                  priv->idx = idx;                  priv->gen = gen;          } -        pthread_mutex_unlock (&priv->lock); +        pthread_mutex_unlock (&priv->out_lock); +        pthread_mutex_unlock (&priv->in_lock);          if (priv->connected != 1) {                  if (priv->connect_failed) { @@ -2556,9 +2562,9 @@ socket_poller (void *ctx)          gen = priv->ot_gen;  	for (;;) { -		pthread_mutex_lock(&priv->lock); +		pthread_mutex_lock(&priv->out_lock);  		to_write = !list_empty(&priv->ioq); -		pthread_mutex_unlock(&priv->lock); +		pthread_mutex_unlock(&priv->out_lock);  		pfd[0].fd = priv->pipe[0];  		pfd[0].events = POLL_MASK_ERROR;  		pfd[0].revents = 0; @@ -2652,7 +2658,8 @@ socket_poller (void *ctx)  err:  	/* All (and only) I/O errors should come here. */ -        pthread_mutex_lock(&priv->lock); +        pthread_mutex_lock(&priv->in_lock); +        pthread_mutex_lock(&priv->out_lock);          {                  gf_log (this->name, GF_LOG_TRACE, "disconnecting socket");                  __socket_teardown_connection (this); @@ -2666,7 +2673,8 @@ err:                  priv->ot_state = OT_IDLE;          } -        pthread_mutex_unlock(&priv->lock); +        pthread_mutex_unlock(&priv->out_lock); +        pthread_mutex_unlock(&priv->in_lock);          rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); @@ -3014,11 +3022,13 @@ socket_disconnect (rpc_transport_t *this, gf_boolean_t wait)                  pthread_mutex_unlock (&priv->cond_lock);          } -        pthread_mutex_lock (&priv->lock); +        pthread_mutex_lock (&priv->in_lock); +        pthread_mutex_lock (&priv->out_lock);          {                  ret = __socket_disconnect (this);          } -        pthread_mutex_unlock (&priv->lock); +        pthread_mutex_unlock (&priv->out_lock); +        pthread_mutex_unlock (&priv->in_lock);  out:          return ret; @@ -3118,7 +3128,8 @@ socket_connect (rpc_transport_t *this, int port)                  goto err;          } -        pthread_mutex_lock (&priv->lock); +        pthread_mutex_lock (&priv->in_lock); +        pthread_mutex_lock (&priv->out_lock);          {                  priv->own_thread_done = _gf_false;                  if (priv->sock != -1) { @@ -3395,7 +3406,8 @@ handler:  unlock:                  sock = priv->sock;          } -        pthread_mutex_unlock (&priv->lock); +        pthread_mutex_unlock (&priv->out_lock); +        pthread_mutex_unlock (&priv->in_lock);  err:          /* if sock != -1, then cleanup is done from the event handler */ @@ -3445,11 +3457,13 @@ socket_listen (rpc_transport_t *this)          myinfo = &this->myinfo;          ctx    = this->ctx; -        pthread_mutex_lock (&priv->lock); +        pthread_mutex_lock (&priv->in_lock); +        pthread_mutex_lock (&priv->out_lock);          {                  sock = priv->sock;          } -        pthread_mutex_unlock (&priv->lock); +        pthread_mutex_unlock (&priv->out_lock); +        pthread_mutex_unlock (&priv->in_lock);          if (sock != -1)  {                  gf_log_callingfn (this->name, GF_LOG_DEBUG, @@ -3463,7 +3477,8 @@ socket_listen (rpc_transport_t *this)                  return ret;          } -        pthread_mutex_lock (&priv->lock); +        pthread_mutex_lock (&priv->in_lock); +        pthread_mutex_lock (&priv->out_lock);          {                  if (priv->sock != -1) {                          gf_log (this->name, GF_LOG_DEBUG, @@ -3567,7 +3582,8 @@ socket_listen (rpc_transport_t *this)                  }          }  unlock: -        pthread_mutex_unlock (&priv->lock); +        pthread_mutex_unlock (&priv->out_lock); +        pthread_mutex_unlock (&priv->in_lock);  out:          return ret; @@ -3591,7 +3607,7 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)          priv = this->private;          ctx  = this->ctx; -        pthread_mutex_lock (&priv->lock); +        pthread_mutex_lock (&priv->out_lock);          {                  if (priv->connected != 1) {                          if (!priv->submit_log && !priv->connect_finish_log) { @@ -3641,7 +3657,7 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)                  }          }  unlock: -        pthread_mutex_unlock (&priv->lock); +        pthread_mutex_unlock (&priv->out_lock);  out:          return ret; @@ -3665,7 +3681,7 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)          priv = this->private;          ctx  = this->ctx; -        pthread_mutex_lock (&priv->lock); +        pthread_mutex_lock (&priv->out_lock);          {                  if (priv->connected != 1) {                          if (!priv->submit_log && !priv->connect_finish_log) { @@ -3715,7 +3731,7 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)                  }          }  unlock: -        pthread_mutex_unlock (&priv->lock); +        pthread_mutex_unlock (&priv->out_lock);  out:          return ret; @@ -3814,7 +3830,8 @@ socket_throttle (rpc_transport_t *this, gf_boolean_t onoff)             will never read() any more data until throttling             is turned off.          */ -        pthread_mutex_lock (&priv->lock); +        pthread_mutex_lock (&priv->in_lock); +        pthread_mutex_lock (&priv->out_lock);          {                  /* Throttling is useless on a disconnected transport. In fact, @@ -3828,7 +3845,8 @@ socket_throttle (rpc_transport_t *this, gf_boolean_t onoff)                                                       priv->idx, (int) !onoff,                                                       -1);          } -        pthread_mutex_unlock (&priv->lock); +        pthread_mutex_unlock (&priv->out_lock); +        pthread_mutex_unlock (&priv->in_lock);          return 0;  } @@ -4108,7 +4126,8 @@ socket_init (rpc_transport_t *this)          }          memset(priv,0,sizeof(*priv)); -        pthread_mutex_init (&priv->lock, NULL); +        pthread_mutex_init (&priv->in_lock, NULL); +        pthread_mutex_init (&priv->out_lock, NULL);          pthread_mutex_init (&priv->cond_lock, NULL);          pthread_cond_init (&priv->cond, NULL); @@ -4530,17 +4549,20 @@ fini (rpc_transport_t *this)          priv = this->private;          if (priv) {                  if (priv->sock != -1) { -                        pthread_mutex_lock (&priv->lock); +                        pthread_mutex_lock (&priv->in_lock); +                        pthread_mutex_lock (&priv->out_lock);                          {                                  __socket_ioq_flush (this);                                  __socket_reset (this);                          } -                        pthread_mutex_unlock (&priv->lock); +                        pthread_mutex_unlock (&priv->out_lock); +                        pthread_mutex_unlock (&priv->in_lock);                  }                  gf_log (this->name, GF_LOG_TRACE,                          "transport %p destroyed", this); -                pthread_mutex_destroy (&priv->lock); +                pthread_mutex_destroy (&priv->in_lock); +                pthread_mutex_destroy (&priv->out_lock);                  pthread_mutex_destroy (&priv->cond_lock);                  pthread_cond_destroy (&priv->cond);  		if (priv->ssl_private_key) { diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index e299a3d7bd5..59110b5043a 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -220,7 +220,8 @@ typedef struct {                  };          };          struct gf_sock_incoming incoming; -        pthread_mutex_t        lock; +        pthread_mutex_t        in_lock; +        pthread_mutex_t        out_lock;          pthread_mutex_t        cond_lock;          pthread_cond_t         cond;          int                    windowsize;  | 
