diff options
Diffstat (limited to 'rpc/rpc-transport/socket/src')
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 99 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 9 | 
2 files changed, 69 insertions, 39 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 97c0dc57d..d33146ab5 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -629,25 +629,23 @@ __socket_disconnect (rpc_transport_t *this)          priv = this->private; +        gf_log (this->name, GF_LOG_TRACE, +                "disconnecting %p, state=%u gen=%u sock=%d", this, +                priv->ot_state, priv->ot_gen, priv->sock); +          if (priv->sock != -1) {                  ret = __socket_shutdown(this);  		if (priv->own_thread) { -			/* -			 * Without this, reconnect (= disconnect + connect) -			 * won't work except by accident. -			 */ -			close(priv->sock); -			priv->sock = -1;                          /* -                         * Closing the socket forces an error that will wake -                         * up the polling thread.  Wait for it to notice and -                         * respond. +                         * Without this, reconnect (= disconnect + connect) +                         * won't work except by accident.                           */ -                        if (priv->ot_state == OT_ALIVE) { -                                priv->ot_state = OT_DYING; -                                pthread_cond_wait(&priv->ot_event,&priv->lock); -                        } -		} +                        close(priv->sock); +                        priv->sock = -1; +                        gf_log (this->name, GF_LOG_TRACE, +                                "OT_PLEASE_DIE on %p", this); +                        priv->ot_state = OT_PLEASE_DIE; +                }                  else if (priv->use_ssl) {                          ssl_teardown_connection(priv);                  } @@ -1415,7 +1413,7 @@ __socket_read_request (rpc_transport_t *this)                  buf = rpc_procnum_addr (iobuf_ptr (in->iobuf));                  procnum = ntoh32 (*((uint32_t *)buf)); -                if (this->listener) { +                if (priv->is_server) {                          /* this check is needed as rpcsvc and rpc-clnt                           * actor structures are not same */                          vector_sizer = @@ -2112,12 +2110,17 @@ socket_event_poll_in (rpc_transport_t *this)  {          int                     ret    = -1;          rpc_transport_pollin_t *pollin = NULL; +        socket_private_t       *priv = this->private;          ret = socket_proto_state_machine (this, &pollin);          if (pollin != NULL) { +                priv->ot_state = OT_CALLBACK;                  ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,                                              pollin); +                if (priv->ot_state == OT_CALLBACK) { +                        priv->ot_state = OT_RUNNING; +                }                  rpc_transport_pollin_destroy (pollin);          } @@ -2252,6 +2255,9 @@ socket_poller (void *ctx)  	struct pollfd     pfd[2] = {{0,},};  	gf_boolean_t      to_write = _gf_false;  	int               ret = 0; +        uint32_t          gen = 0; + +        priv->ot_state = OT_RUNNING;          if (priv->use_ssl) {                  if (ssl_setup_connection(this,priv->connected) < 0) { @@ -2287,6 +2293,7 @@ socket_poller (void *ctx)                          "asynchronous rpc_transport_notify failed");          } +        gen = priv->ot_gen;  	for (;;) {  		pthread_mutex_lock(&priv->lock);  		to_write = !list_empty(&priv->ioq); @@ -2323,6 +2330,13 @@ socket_poller (void *ctx)  			else if (errno == ENOTCONN) {  				ret = 0;  			} +                        if (priv->ot_state == OT_PLEASE_DIE) { +                                gf_log (this->name, GF_LOG_TRACE, +                                        "OT_IDLE on %p (input request)", +                                        this); +                                priv->ot_state = OT_IDLE; +                                break; +                        }  		}  		else if (pfd[1].revents & POLL_MASK_OUTPUT) {  			ret = socket_event_poll_out(this); @@ -2333,6 +2347,13 @@ socket_poller (void *ctx)  			else if (errno == ENOTCONN) {  				ret = 0;  			} +                        if (priv->ot_state == OT_PLEASE_DIE) { +                                gf_log (this->name, GF_LOG_TRACE, +                                        "OT_IDLE on %p (output request)", +                                        this); +                                priv->ot_state = OT_IDLE; +                                break; +                        }  		}  		else {  			/* @@ -2353,21 +2374,17 @@ socket_poller (void *ctx)  			       "error in polling loop");  			break;  		} +                if (priv->ot_gen != gen) { +                        gf_log (this->name, GF_LOG_TRACE, +                                "generation mismatch, my %u != %u", +                                gen, priv->ot_gen); +                        return NULL; +                }  	}  err:  	/* All (and only) I/O errors should come here. */          pthread_mutex_lock(&priv->lock); -        if (priv->ot_state == OT_ALIVE) { -                /* -                 * We have to do this if we're here because of an error we -                 * detected ourselves, but need to avoid a recursive call -                 * if our death is the result of an external disconnect. -                 */ -                __socket_shutdown(this); -                close(priv->sock); -                priv->sock = -1; -        }          if (priv->ssl_ssl) {                  /*                   * We're always responsible for this part, but only actually @@ -2376,16 +2393,14 @@ err:                   */                  ssl_teardown_connection(priv);          } +        __socket_shutdown(this); +        close(priv->sock); +        priv->sock = -1;          priv->ot_state = OT_IDLE; -        /* -         * We expect there to be only one waiter, but if there do happen to -         * be multiple it's probably better to unblock them than to let them -         * hang.  If there are none, this is a harmless no-op. -         */ -        pthread_cond_broadcast(&priv->ot_event);          pthread_mutex_unlock(&priv->lock); -        rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, this); -	rpc_transport_unref (this); +        rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, +                              this); +        rpc_transport_unref (this);  	return NULL;  } @@ -2395,13 +2410,20 @@ socket_spawn (rpc_transport_t *this)  {          socket_private_t        *priv   = this->private; -        if (priv->ot_state == OT_ALIVE) { +        switch (priv->ot_state) { +        case OT_IDLE: +        case OT_PLEASE_DIE: +                break; +        default:                  gf_log (this->name, GF_LOG_WARNING,                          "refusing to start redundant poller");                  return;          } -        priv->ot_state = OT_ALIVE; +        priv->ot_gen += 7; +        priv->ot_state = OT_SPAWNING; +        gf_log (this->name, GF_LOG_TRACE, +                "spawning %p with gen %u", this, priv->ot_gen);          if (pthread_create(&priv->thread,NULL,socket_poller,this) != 0) {                  gf_log (this->name, GF_LOG_ERROR, @@ -2554,6 +2576,7 @@ socket_server_event_handler (int fd, int idx, void *data,                                   * connection.                                   */                                  new_priv->connected = 1; +                                new_priv->is_server = _gf_true;                                  rpc_transport_ref (new_trans);  				if (new_priv->own_thread) { @@ -2657,6 +2680,10 @@ socket_connect (rpc_transport_t *this, int port)                  goto err;          } +        gf_log (this->name, GF_LOG_TRACE, +                "connecting %p, state=%u gen=%u sock=%d", this, +                priv->ot_state, priv->ot_gen, priv->sock); +          ret = socket_client_get_remote_sockaddr (this, &sock_union.sa,                                                   &sockaddr_len, &sa_family);          if (ret == -1) { @@ -2822,6 +2849,7 @@ socket_connect (rpc_transport_t *this, int port)                   * initializing a client connection.                   */                  priv->connected = 0; +                priv->is_server = _gf_false;                  rpc_transport_ref (this);  		if (priv->own_thread) { @@ -3545,7 +3573,6 @@ socket_init (rpc_transport_t *this)          if (priv->own_thread) {                  priv->ot_state = OT_IDLE; -                pthread_cond_init (&priv->ot_event, NULL);          }  out: diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index bb342d998..e0b412fcc 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -186,8 +186,10 @@ struct gf_sock_incoming {  typedef enum {          OT_IDLE,        /* Uninitialized or termination complete. */ -        OT_ALIVE,       /* Past pthread_create, no error/disconnect. */ -        OT_DYING,       /* Disconnect in progress. */ +        OT_SPAWNING,    /* Past pthread_create but not in thread yet. */ +        OT_RUNNING,     /* Poller thread running normally. */ +        OT_CALLBACK,    /* Poller thread in the middle of a callback. */ +        OT_PLEASE_DIE,  /* Poller termination requested. */  } ot_state_t;  typedef struct { @@ -229,7 +231,8 @@ typedef struct {  	int                    pipe[2];  	gf_boolean_t           own_thread;          ot_state_t             ot_state; -        pthread_cond_t         ot_event; +        uint32_t               ot_gen; +        gf_boolean_t           is_server;  } socket_private_t;  | 
