summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libglusterfs/src/common-utils.c30
-rw-r--r--libglusterfs/src/common-utils.h3
-rw-r--r--libglusterfs/src/libglusterfs-messages.h10
-rw-r--r--rpc/rpc-transport/socket/src/socket.c186
4 files changed, 87 insertions, 142 deletions
diff --git a/libglusterfs/src/common-utils.c b/libglusterfs/src/common-utils.c
index a225c957f2e..97d9ef8da1a 100644
--- a/libglusterfs/src/common-utils.c
+++ b/libglusterfs/src/common-utils.c
@@ -3555,6 +3555,36 @@ gf_thread_create (pthread_t *thread, const pthread_attr_t *attr,
}
int
+gf_thread_create_detached (pthread_t *thread,
+ void *(*start_routine)(void *), void *arg)
+{
+ pthread_attr_t attr;
+ int ret = -1;
+
+ ret = pthread_attr_init (&attr);
+ if (ret) {
+ gf_msg (THIS->name, GF_LOG_ERROR, ret,
+ LG_MSG_PTHREAD_ATTR_INIT_FAILED,
+ "Thread attribute initialization failed");
+ return -1;
+ }
+
+ pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
+
+ ret = gf_thread_create (thread, &attr, start_routine, arg);
+ if (ret) {
+ gf_msg (THIS->name, GF_LOG_ERROR, ret,
+ LG_MSG_PTHREAD_FAILED,
+ "Thread creation failed");
+ ret = -1;
+ }
+
+ pthread_attr_destroy (&attr);
+
+ return ret;
+}
+
+int
gf_skip_header_section (int fd, int header_len)
{
int ret = -1;
diff --git a/libglusterfs/src/common-utils.h b/libglusterfs/src/common-utils.h
index 09d585ad9c3..4741d430a03 100644
--- a/libglusterfs/src/common-utils.h
+++ b/libglusterfs/src/common-utils.h
@@ -766,6 +766,9 @@ int gf_set_timestamp (const char *src, const char* dest);
int gf_thread_create (pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine)(void *), void *arg);
+int gf_thread_create_detached (pthread_t *thread,
+ void *(*start_routine)(void *), void *arg);
+
gf_boolean_t
gf_is_service_running (char *pidfile, int *pid);
int
diff --git a/libglusterfs/src/libglusterfs-messages.h b/libglusterfs/src/libglusterfs-messages.h
index c0bcabac798..d18f4cb3112 100644
--- a/libglusterfs/src/libglusterfs-messages.h
+++ b/libglusterfs/src/libglusterfs-messages.h
@@ -36,7 +36,7 @@
*/
#define GLFS_LG_BASE GLFS_MSGID_COMP_LIBGLUSTERFS
-#define GLFS_LG_NUM_MESSAGES 205
+#define GLFS_LG_NUM_MESSAGES 206
#define GLFS_LG_MSGID_END (GLFS_LG_BASE + GLFS_LG_NUM_MESSAGES + 1)
/* Messaged with message IDs */
#define glfs_msg_start_lg GLFS_LG_BASE, "Invalid: Start of messages"
@@ -1754,6 +1754,14 @@
* @recommendedaction
*
*/
+#define LG_MSG_PTHREAD_ATTR_INIT_FAILED (GLFS_LG_BASE + 206)
+
+/*!
+ * @messageid
+ * @diagnosis
+ * @recommendedaction
+ *
+ */
/*------------*/
#define glfs_msg_end_lg GLFS_LG_MSGID_END, "Invalid: End of messages"
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 209d89a225b..030a37961b6 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -187,117 +187,6 @@ struct socket_connect_error_state_ {
};
typedef struct socket_connect_error_state_ socket_connect_error_state_t;
-
-/* This timer and queue are used to reap dead threads. The timer triggers every
- * minute and pthread_joins any threads that added themselves to the reap queue
- *
- * TODO: Make the timer configurable? (Not sure if required)
- */
-static gf_timer_t *reap_timer;
-static struct list_head reap_queue;
-static pthread_mutex_t reap_lock = PTHREAD_MUTEX_INITIALIZER;
-const struct timespec reap_ts = {60, 0};
-
-struct tid_wrap {
- struct list_head list;
- pthread_t tid;
-};
-
-/* _socket_reap_own_threads iterated over the queue of tid's and pthread_joins
- * them. If a thread join fails, it logs the failure and continues
- */
-static void
-_socket_reap_own_threads() {
- struct tid_wrap *node = NULL;
- struct tid_wrap *tmp = NULL;
- pthread_t tid = 0;
- int i = 0;
-
- list_for_each_entry_safe (node, tmp, &reap_queue, list) {
- list_del_init (&node->list);
- if (pthread_join (node->tid, NULL)) {
- gf_log (THIS->name, GF_LOG_ERROR,
- "own-thread: failed to join thread (tid: %zu)",
- tid);
- }
- node->tid = 0;
- GF_FREE (node);
- node = NULL;
- i++;
- }
-
- if (i) {
- gf_log (THIS->name, GF_LOG_TRACE, "reaped %d own-threads", i);
- }
-
- return;
-}
-
-/* socket_thread_reaper reaps threads and restarts the reap_timer
- */
-static void
-socket_thread_reaper () {
-
- pthread_mutex_lock (&reap_lock);
-
- gf_timer_call_cancel (THIS->ctx, reap_timer);
- reap_timer = 0;
-
- _socket_reap_own_threads();
-
- reap_timer = gf_timer_call_after (THIS->ctx, reap_ts,
- socket_thread_reaper, NULL);
- if (!reap_timer)
- gf_log (THIS->name, GF_LOG_ERROR,
- "failed to restart socket own-thread reap timer");
-
- pthread_mutex_unlock (&reap_lock);
-
- return;
-}
-
-/* socket_thread_reaper_init initializes reap_timer and reap_queue.
- * Initializations are done only the first time this is called.
- *
- * To make sure that the reap_timer is always run, reaper_init it is better to
- * call this whenever an own-thread is launched
- */
-static void
-socket_thread_reaper_init () {
- pthread_mutex_lock (&reap_lock);
-
- if (reap_timer == NULL) {
- reap_timer = gf_timer_call_after (THIS->ctx, reap_ts,
- socket_thread_reaper, NULL);
- INIT_LIST_HEAD (&reap_queue);
- }
-
- pthread_mutex_unlock (&reap_lock);
-
- return;
-}
-
-/* socket_thread_reaper_add adds the given thread id to the queue of threads
- * that will be reaped by socket_thread_reaper
- * own-threads need to call this with their thread-ids before dying
- */
-static int
-socket_thread_reaper_add (pthread_t tid) {
- struct tid_wrap *node = NULL;
-
- pthread_mutex_lock (&reap_lock);
-
- node = GF_CALLOC (1, sizeof (*node), gf_sock_mt_tid_wrap);
- node->tid = tid;
- INIT_LIST_HEAD (&node->list);
- list_add_tail (&node->list, &reap_queue);
-
- pthread_mutex_unlock (&reap_lock);
-
- return 0;
-}
-
-
static int socket_init (rpc_transport_t *this);
static void
@@ -2640,10 +2529,6 @@ err:
pthread_mutex_unlock(&priv->lock);
rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
- /* Add the thread to the reap_queue before freeing up the transport and
- * dying
- */
- socket_thread_reaper_add (priv->thread);
rpc_transport_unref (this);
@@ -2651,11 +2536,11 @@ err:
}
-static void
+static int
socket_spawn (rpc_transport_t *this)
{
socket_private_t *priv = this->private;
-
+ int ret = -1;
switch (priv->ot_state) {
case OT_IDLE:
case OT_PLEASE_DIE:
@@ -2663,7 +2548,7 @@ socket_spawn (rpc_transport_t *this)
default:
gf_log (this->name, GF_LOG_WARNING,
"refusing to start redundant poller");
- return;
+ return ret;
}
priv->ot_gen += 7;
@@ -2671,12 +2556,16 @@ socket_spawn (rpc_transport_t *this)
gf_log (this->name, GF_LOG_TRACE,
"spawning %p with gen %u", this, priv->ot_gen);
- if (gf_thread_create(&priv->thread,NULL,socket_poller,this) != 0) {
+ /* Create thread after enable detach flag */
+
+ ret = gf_thread_create_detached (&priv->thread, socket_poller, this);
+ if (ret) {
gf_log (this->name, GF_LOG_ERROR,
"could not create poll thread");
+ ret = -1;
}
- /* start the reaper thread */
- socket_thread_reaper_init();
+
+ return ret;
}
static int
@@ -2860,30 +2749,38 @@ socket_server_event_handler (int fd, int idx, void *data,
new_priv->is_server = _gf_true;
rpc_transport_ref (new_trans);
- if (new_priv->own_thread) {
- if (pipe(new_priv->pipe) < 0) {
- gf_log(this->name,GF_LOG_ERROR,
- "could not create pipe");
- }
- socket_spawn(new_trans);
- }
- else {
- new_priv->idx =
- event_register (ctx->event_pool,
- new_sock,
- socket_event_handler,
- new_trans,
- 1, 0);
- if (new_priv->idx == -1)
- ret = -1;
- }
+ if (new_priv->own_thread) {
+ if (pipe(new_priv->pipe) < 0) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "could not create pipe");
+ }
+ ret = socket_spawn(new_trans);
+ if (ret) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "could not spawn thread");
+ sys_close (new_priv->pipe[0]);
+ sys_close (new_priv->pipe[1]);
+ }
+ } else {
+ new_priv->idx =
+ event_register (ctx->event_pool,
+ new_sock,
+ socket_event_handler,
+ new_trans,
+ 1, 0);
+ if (new_priv->idx == -1) {
+ ret = -1;
+ gf_log(this->name, GF_LOG_ERROR,
+ "failed to register the socket with event");
+ }
+ }
}
pthread_mutex_unlock (&new_priv->lock);
if (ret == -1) {
- gf_log (this->name, GF_LOG_WARNING,
- "failed to register the socket with event");
sys_close (new_sock);
+ GF_FREE (new_trans->name);
+ GF_FREE (new_trans);
rpc_transport_unref (new_trans);
goto unlock;
}
@@ -3200,7 +3097,14 @@ handler:
}
this->listener = this;
- socket_spawn(this);
+ ret = socket_spawn(this);
+ if (ret) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "could not spawn thread");
+ sys_close(priv->pipe[0]);
+ sys_close(priv->pipe[1]);
+ priv->sock = -1;
+ }
}
else {
priv->idx = event_register (ctx->event_pool, priv->sock,