diff options
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 96 | 
1 files changed, 77 insertions, 19 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 8ba2692cdc6..e14152c5822 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -1172,11 +1172,11 @@ out:  } -static int -socket_event_poll_err (rpc_transport_t *this) +static gf_boolean_t +socket_event_poll_err (rpc_transport_t *this, int gen, int idx)  { -        socket_private_t *priv = NULL; -        int               ret = -1; +        socket_private_t *priv          = NULL; +        gf_boolean_t      socket_closed = _gf_false;          GF_VALIDATE_OR_GOTO ("socket", this, out);          GF_VALIDATE_OR_GOTO ("socket", this->private, out); @@ -1185,15 +1185,29 @@ socket_event_poll_err (rpc_transport_t *this)          pthread_mutex_lock (&priv->lock);          { -                __socket_ioq_flush (this); -                __socket_reset (this); +                if ((priv->gen == gen) && (priv->idx == idx) +                    && (priv->sock != -1)) { +                        __socket_ioq_flush (this); +                        __socket_reset (this); +                        socket_closed = _gf_true; +                }          }          pthread_mutex_unlock (&priv->lock); -        rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); +        if (socket_closed) { +                pthread_mutex_lock (&priv->notify.lock); +                { +                        while (priv->notify.in_progress) +                                pthread_cond_wait (&priv->notify.cond, +                                                   &priv->notify.lock); +                } +                pthread_mutex_unlock (&priv->notify.lock); + +                rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); +        }  out: -        return ret; +        return socket_closed;  } @@ -2271,22 +2285,50 @@ out:  static int -socket_event_poll_in (rpc_transport_t *this) +socket_event_poll_in (rpc_transport_t *this, gf_boolean_t notify_handled)  {          int                     ret    = -1;          rpc_transport_pollin_t *pollin = NULL;          socket_private_t       *priv = this->private; +        glusterfs_ctx_t        *ctx  = NULL; + +        ctx = this->ctx;  	ret = socket_proto_state_machine (this, &pollin); +        if (pollin) { +                pthread_mutex_lock (&priv->notify.lock); +                { +                        priv->notify.in_progress++; +                } +                pthread_mutex_unlock (&priv->notify.lock); +        } + + +        if (notify_handled && (ret != -1)) +                event_handled (ctx->event_pool, priv->sock, priv->idx, +                               priv->gen); +  	if (pollin) {                  priv->ot_state = OT_CALLBACK; +                  ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,                                              pollin); +                  if (priv->ot_state == OT_CALLBACK) {                          priv->ot_state = OT_RUNNING;                  } +                  rpc_transport_pollin_destroy (pollin); + +                pthread_mutex_lock (&priv->notify.lock); +                { +                        --priv->notify.in_progress; + +                        if (!priv->notify.in_progress) +                                pthread_cond_signal (&priv->notify.cond); +                } +                pthread_mutex_unlock (&priv->notify.lock);          }          return ret; @@ -2369,24 +2411,29 @@ static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait);  /* reads rpc_requests during pollin */  static int -socket_event_handler (int fd, int idx, void *data, +socket_event_handler (int fd, int idx, int gen, void *data,                        int poll_in, int poll_out, int poll_err)  { -        rpc_transport_t  *this = NULL; -        socket_private_t *priv = NULL; -	int               ret = -1; +        rpc_transport_t  *this          = NULL; +        socket_private_t *priv          = NULL; +	int               ret           = -1; +        glusterfs_ctx_t  *ctx           = NULL; +        gf_boolean_t      socket_closed = _gf_false, notify_handled = _gf_false;          this = data; +          GF_VALIDATE_OR_GOTO ("socket", this, out);          GF_VALIDATE_OR_GOTO ("socket", this->private, out);          GF_VALIDATE_OR_GOTO ("socket", this->xl, out);          THIS = this->xl;          priv = this->private; +        ctx = this->ctx;          pthread_mutex_lock (&priv->lock);          {                  priv->idx = idx; +                priv->gen = gen;          }          pthread_mutex_unlock (&priv->lock); @@ -2417,16 +2464,23 @@ socket_event_handler (int fd, int idx, void *data,          }          if (!ret && poll_in) { -                ret = socket_event_poll_in (this); +                ret = socket_event_poll_in (this, !poll_err); +                notify_handled = _gf_true;          }          if ((ret < 0) || poll_err) {                  /* Logging has happened already in earlier cases */                  gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG),                          "EPOLLERR - disconnecting now"); -                socket_event_poll_err (this); -                rpc_transport_unref (this); -	} + +                socket_closed = socket_event_poll_err (this, gen, idx); + +                if (socket_closed) +                        rpc_transport_unref (this); + +	} else if (!notify_handled) { +                event_handled (ctx->event_pool, fd, idx, gen); +        }  out:  	return ret; @@ -2533,7 +2587,7 @@ socket_poller (void *ctx)                  }  		if (pfd[1].revents & POLL_MASK_INPUT) { -			ret = socket_event_poll_in(this); +			ret = socket_event_poll_in(this, 0);  			if (ret >= 0) {  				/* Suppress errors while making progress. */  				pfd[1].revents &= ~POLL_MASK_ERROR; @@ -2657,7 +2711,7 @@ socket_spawn (rpc_transport_t *this)  }  static int -socket_server_event_handler (int fd, int idx, void *data, +socket_server_event_handler (int fd, int idx, int gen, void *data,                               int poll_in, int poll_out, int poll_err)  {          rpc_transport_t             *this = NULL; @@ -2913,6 +2967,8 @@ socket_server_event_handler (int fd, int idx, void *data,                  }          }  out: +        event_handled (ctx->event_pool, fd, idx, gen); +          if (cname && (cname != this->ssl_name)) {                  GF_FREE(cname);          } @@ -4024,6 +4080,8 @@ socket_init (rpc_transport_t *this)          priv->bio = 0;          priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;          INIT_LIST_HEAD (&priv->ioq); +        pthread_mutex_init (&priv->notify.lock, NULL); +        pthread_cond_init (&priv->notify.cond, NULL);          /* All the below section needs 'this->options' to be present */          if (!this->options)  | 
