diff options
Diffstat (limited to 'rpc/rpc-transport/socket/src')
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 98 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 6 | 
2 files changed, 87 insertions, 17 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 36388548937..d05dc4189aa 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -2344,7 +2344,7 @@ out:          return ret;  } -static int socket_disconnect (rpc_transport_t *this); +static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait);  /* reads rpc_requests during pollin */  static int @@ -2375,7 +2375,7 @@ socket_event_handler (int fd, int idx, void *data,                          EINPROGRESS or ENOENT, so nothing more to do, fail                          reading/writing anything even if poll_in or poll_out                          is set */ -                        ret = socket_disconnect (this); +                        ret = socket_disconnect (this, _gf_false);                          /* Force ret to be -1, as we are officially done with                          this socket */ @@ -2424,6 +2424,13 @@ socket_poller (void *ctx)           * conditionally           */          THIS = this->xl; +        GF_REF_GET (priv); + +        if (priv->ot_state == OT_PLEASE_DIE) { +                gf_log (this->name, GF_LOG_DEBUG, "socket_poller is exiting " +                        "because socket state is OT_PLEASE_DIE"); +                goto err; +        }          priv->ot_state = OT_RUNNING; @@ -2494,6 +2501,13 @@ socket_poller (void *ctx)  			break;  		} +                if (priv->ot_state == OT_PLEASE_DIE) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "OT_PLEASE_DIE on %p (exiting socket_poller)", +                                this); +                        break; +                } +  		if (pfd[1].revents & POLL_MASK_INPUT) {  			ret = socket_event_poll_in(this);  			if (ret >= 0) { @@ -2507,7 +2521,6 @@ socket_poller (void *ctx)                                  gf_log (this->name, GF_LOG_TRACE,                                          "OT_IDLE on %p (input request)",                                          this); -                                priv->ot_state = OT_IDLE;                                  break;                          }  		} @@ -2524,7 +2537,6 @@ socket_poller (void *ctx)                                  gf_log (this->name, GF_LOG_TRACE,                                          "OT_IDLE on %p (output request)",                                          this); -                                priv->ot_state = OT_IDLE;                                  break;                          }  		} @@ -2561,22 +2573,24 @@ socket_poller (void *ctx)  err:  	/* All (and only) I/O errors should come here. */          pthread_mutex_lock(&priv->lock); +        { +                __socket_teardown_connection (this); +                sys_close (priv->sock); +                priv->sock = -1; -        __socket_teardown_connection (this); -        sys_close (priv->sock); -        priv->sock = -1; - -        sys_close (priv->pipe[0]); -        sys_close (priv->pipe[1]); -        priv->pipe[0] = -1; -        priv->pipe[1] = -1; - -        priv->ot_state = OT_IDLE; +                sys_close (priv->pipe[0]); +                sys_close (priv->pipe[1]); +                priv->pipe[0] = -1; +                priv->pipe[1] = -1; +                priv->ot_state = OT_IDLE; +        }          pthread_mutex_unlock(&priv->lock);          rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); +        GF_REF_PUT (priv); +          rpc_transport_unref (this);  	return NULL; @@ -2848,16 +2862,39 @@ out:  static int -socket_disconnect (rpc_transport_t *this) +socket_disconnect (rpc_transport_t *this, gf_boolean_t wait)  { -        socket_private_t *priv = NULL; -        int               ret = -1; +        socket_private_t *priv   = NULL; +        int               ret    = -1; +        char              a_byte = 'r';          GF_VALIDATE_OR_GOTO ("socket", this, out);          GF_VALIDATE_OR_GOTO ("socket", this->private, out);          priv = this->private; +        if (wait && priv->own_thread) { +                pthread_mutex_lock (&priv->cond_lock); +                { +                        GF_REF_PUT (priv); +                        /* Change the state to OT_PLEASE_DIE so that +                         * socket_poller can exit. */ +                        priv->ot_state = OT_PLEASE_DIE; +                        /* Write something into the pipe so that poller +                         * thread can wake up.*/ +                        if (sys_write (priv->pipe[1], &a_byte, 1) < 1) { +                                gf_log (this->name, GF_LOG_WARNING, +                                                "write error on pipe"); +                        } + +                        /* Wait for socket_poller to exit */ +                        if (!priv->own_thread_done) +                                pthread_cond_wait (&priv->cond, +                                                   &priv->cond_lock); +                } +                pthread_mutex_unlock (&priv->cond_lock); +        } +          pthread_mutex_lock (&priv->lock);          {                  ret = __socket_disconnect (this); @@ -2937,6 +2974,7 @@ socket_connect (rpc_transport_t *this, int port)          pthread_mutex_lock (&priv->lock);          { +                priv->own_thread_done = _gf_false;                  if (priv->sock != -1) {                          gf_log_callingfn (this->name, GF_LOG_TRACE,                                            "connect () called on transport " @@ -3805,6 +3843,26 @@ init_openssl_mt (void)          SSL_load_error_strings();  } +void +socket_poller_mayday (void *data) +{ +        socket_private_t *priv = (socket_private_t *)data; + +        if (priv == NULL) +                return; + +        pthread_mutex_lock (&priv->cond_lock); +        { +                /* Signal waiting threads before exiting from socket_poller */ +                if (!priv->own_thread_done) { +                        gf_log ("socket", GF_LOG_TRACE, "priv->cond SIGNALED"); +                        pthread_cond_signal (&priv->cond); +                        priv->own_thread_done = _gf_true; +                } +        } +        pthread_mutex_unlock (&priv->cond_lock); +} +  static int  socket_init (rpc_transport_t *this)  { @@ -3835,6 +3893,10 @@ socket_init (rpc_transport_t *this)          memset(priv,0,sizeof(*priv));          pthread_mutex_init (&priv->lock, NULL); +        pthread_mutex_init (&priv->cond_lock, NULL); +        pthread_cond_init (&priv->cond, NULL); + +        GF_REF_INIT (priv, socket_poller_mayday);          priv->sock = -1;          priv->idx = -1; @@ -4265,6 +4327,8 @@ fini (rpc_transport_t *this)                          "transport %p destroyed", this);                  pthread_mutex_destroy (&priv->lock); +                pthread_mutex_destroy (&priv->cond_lock); +                pthread_cond_destroy (&priv->cond);  		if (priv->ssl_private_key) {  			GF_FREE(priv->ssl_private_key);  		} diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index 7c7005b59e7..8528bdeba8d 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -27,6 +27,7 @@  #include "dict.h"  #include "mem-pool.h"  #include "globals.h" +#include "refcount.h"  #ifndef MAX_IOVEC  #define MAX_IOVEC 16 @@ -215,6 +216,8 @@ typedef struct {          };          struct gf_sock_incoming incoming;          pthread_mutex_t        lock; +        pthread_mutex_t        cond_lock; +        pthread_cond_t         cond;          int                    windowsize;          char                   lowlat;          char                   nodelay; @@ -239,10 +242,13 @@ typedef struct {  	pthread_t              thread;  	int                    pipe[2];  	gf_boolean_t           own_thread; +        gf_boolean_t           own_thread_done;          ot_state_t             ot_state;          uint32_t               ot_gen;          gf_boolean_t           is_server;          int                    log_ctr; +        GF_REF_DECL;           /* refcount to keep track of socket_poller +                                  threads */  } socket_private_t;  | 
