diff options
Diffstat (limited to 'rpc')
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 186 | 
1 files changed, 45 insertions, 141 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 209d89a225b..030a37961b6 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -187,117 +187,6 @@ struct socket_connect_error_state_ {  };  typedef struct socket_connect_error_state_ socket_connect_error_state_t; - -/* This timer and queue are used to reap dead threads. The timer triggers every - * minute and pthread_joins any threads that added themselves to the reap queue - * - * TODO: Make the timer configurable? (Not sure if required) - */ -static gf_timer_t *reap_timer; -static struct list_head reap_queue; -static pthread_mutex_t reap_lock = PTHREAD_MUTEX_INITIALIZER; -const struct timespec reap_ts = {60, 0}; - -struct tid_wrap { -        struct list_head list; -        pthread_t tid; -}; - -/* _socket_reap_own_threads iterated over the queue of tid's and pthread_joins - * them.  If a thread join fails, it logs the failure and continues - */ -static void -_socket_reap_own_threads() { -        struct tid_wrap *node = NULL; -        struct tid_wrap *tmp = NULL; -        pthread_t tid = 0; -        int i = 0; - -        list_for_each_entry_safe (node, tmp, &reap_queue, list) { -                list_del_init (&node->list); -                if (pthread_join (node->tid, NULL)) { -                        gf_log (THIS->name, GF_LOG_ERROR, -                                "own-thread: failed to join thread (tid: %zu)", -                                tid); -                } -                node->tid = 0; -                GF_FREE (node); -                node = NULL; -                i++; -        } - -        if (i) { -                gf_log (THIS->name, GF_LOG_TRACE, "reaped %d own-threads", i); -        } - -        return; -} - -/* socket_thread_reaper reaps threads and restarts the reap_timer - */ -static void -socket_thread_reaper () { - -        pthread_mutex_lock (&reap_lock); - -        gf_timer_call_cancel (THIS->ctx, reap_timer); -        reap_timer = 0; - -        _socket_reap_own_threads(); - -        reap_timer = gf_timer_call_after (THIS->ctx, reap_ts, -                                          socket_thread_reaper, NULL); -        if (!reap_timer) -                gf_log (THIS->name, GF_LOG_ERROR, -                        "failed to restart socket own-thread reap timer"); - -        pthread_mutex_unlock (&reap_lock); - -        return; -} - -/* socket_thread_reaper_init initializes reap_timer and reap_queue. - * Initializations are done only the first time this is called. - * - * To make sure that the reap_timer is always run, reaper_init it is better to - * call this whenever an own-thread is launched - */ -static void -socket_thread_reaper_init () { -        pthread_mutex_lock (&reap_lock); - -        if (reap_timer == NULL) { -                reap_timer = gf_timer_call_after (THIS->ctx, reap_ts, -                                                  socket_thread_reaper, NULL); -                INIT_LIST_HEAD (&reap_queue); -        } - -        pthread_mutex_unlock (&reap_lock); - -        return; -} - -/* socket_thread_reaper_add adds the given thread id to the queue of threads - * that will be reaped by socket_thread_reaper - * own-threads need to call this with their thread-ids before dying - */ -static int -socket_thread_reaper_add (pthread_t tid) { -        struct tid_wrap *node = NULL; - -        pthread_mutex_lock (&reap_lock); - -        node = GF_CALLOC (1, sizeof (*node), gf_sock_mt_tid_wrap); -        node->tid = tid; -        INIT_LIST_HEAD (&node->list); -        list_add_tail (&node->list, &reap_queue); - -        pthread_mutex_unlock (&reap_lock); - -        return 0; -} - -  static int socket_init (rpc_transport_t *this);  static void @@ -2640,10 +2529,6 @@ err:          pthread_mutex_unlock(&priv->lock);          rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); -        /* Add the thread to the reap_queue before freeing up the transport and -         * dying -         */ -        socket_thread_reaper_add (priv->thread);          rpc_transport_unref (this); @@ -2651,11 +2536,11 @@ err:  } -static void +static int  socket_spawn (rpc_transport_t *this)  {          socket_private_t        *priv   = this->private; - +        int ret = -1;          switch (priv->ot_state) {          case OT_IDLE:          case OT_PLEASE_DIE: @@ -2663,7 +2548,7 @@ socket_spawn (rpc_transport_t *this)          default:                  gf_log (this->name, GF_LOG_WARNING,                          "refusing to start redundant poller"); -                return; +                return ret;          }          priv->ot_gen += 7; @@ -2671,12 +2556,16 @@ socket_spawn (rpc_transport_t *this)          gf_log (this->name, GF_LOG_TRACE,                  "spawning %p with gen %u", this, priv->ot_gen); -        if (gf_thread_create(&priv->thread,NULL,socket_poller,this) != 0) { +        /* Create thread after enable detach flag */ + +        ret = gf_thread_create_detached (&priv->thread, socket_poller, this); +        if (ret) {                  gf_log (this->name, GF_LOG_ERROR,                          "could not create poll thread"); +                ret = -1;          } -        /* start the reaper thread */ -        socket_thread_reaper_init(); + +        return ret;  }  static int @@ -2860,30 +2749,38 @@ socket_server_event_handler (int fd, int idx, void *data,                                  new_priv->is_server = _gf_true;                                  rpc_transport_ref (new_trans); -				if (new_priv->own_thread) { -					if (pipe(new_priv->pipe) < 0) { -						gf_log(this->name,GF_LOG_ERROR, -						       "could not create pipe"); -					} -                                        socket_spawn(new_trans); -				} -				else { -					new_priv->idx = -						event_register (ctx->event_pool, -								new_sock, -								socket_event_handler, -								new_trans, -								1, 0); -					if (new_priv->idx == -1) -						ret = -1; -				} +                                if (new_priv->own_thread) { +                                        if (pipe(new_priv->pipe) < 0) { +                                                gf_log(this->name, GF_LOG_ERROR, +                                                       "could not create pipe"); +                                        } +                                        ret = socket_spawn(new_trans); +                                        if (ret) { +                                                gf_log(this->name, GF_LOG_ERROR, +                                                       "could not spawn thread"); +                                                sys_close (new_priv->pipe[0]); +                                                sys_close (new_priv->pipe[1]); +                                        } +                                }  else { +                                        new_priv->idx = +                                                event_register (ctx->event_pool, +                                                                new_sock, +                                                           socket_event_handler, +                                                                new_trans, +                                                                1, 0); +                                        if (new_priv->idx == -1) { +                                                ret = -1; +                                                gf_log(this->name, GF_LOG_ERROR, +                                                       "failed to register the socket with event"); +                                        } +                                }                          }                          pthread_mutex_unlock (&new_priv->lock);                          if (ret == -1) { -                                gf_log (this->name, GF_LOG_WARNING, -                                        "failed to register the socket with event");                                  sys_close (new_sock); +                                GF_FREE (new_trans->name); +                                GF_FREE (new_trans);                                  rpc_transport_unref (new_trans);                                  goto unlock;                          } @@ -3200,7 +3097,14 @@ handler:                          }                          this->listener = this; -                        socket_spawn(this); +                        ret =  socket_spawn(this); +                        if (ret) { +                                gf_log(this->name, GF_LOG_ERROR, +                                       "could not spawn thread"); +                                sys_close(priv->pipe[0]); +                                sys_close(priv->pipe[1]); +                                priv->sock = -1; +                        }                  }                  else {                          priv->idx = event_register (ctx->event_pool, priv->sock,  | 
