summaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'rpc')
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c4
-rw-r--r--rpc/rpc-transport/socket/src/Makefile.am2
-rw-r--r--rpc/rpc-transport/socket/src/socket-mem-types.h21
-rw-r--r--rpc/rpc-transport/socket/src/socket.c236
4 files changed, 169 insertions, 94 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index 44324a80431..8a460cfa617 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -429,10 +429,6 @@ rpc_clnt_reconnect (void *conn_ptr)
}
pthread_mutex_unlock (&conn->lock);
- if ((ret == -1) && (errno != EINPROGRESS) && (clnt->notifyfn)) {
- clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_DISCONNECT, NULL);
- }
-
return;
}
diff --git a/rpc/rpc-transport/socket/src/Makefile.am b/rpc/rpc-transport/socket/src/Makefile.am
index 71e6ed6ff4a..5e909aceac8 100644
--- a/rpc/rpc-transport/socket/src/Makefile.am
+++ b/rpc/rpc-transport/socket/src/Makefile.am
@@ -1,4 +1,4 @@
-noinst_HEADERS = socket.h name.h
+noinst_HEADERS = socket.h name.h socket-mem-types.h
rpctransport_LTLIBRARIES = socket.la
rpctransportdir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport
diff --git a/rpc/rpc-transport/socket/src/socket-mem-types.h b/rpc/rpc-transport/socket/src/socket-mem-types.h
new file mode 100644
index 00000000000..e5553b172a2
--- /dev/null
+++ b/rpc/rpc-transport/socket/src/socket-mem-types.h
@@ -0,0 +1,21 @@
+/*
+ Copyright (c) 2008-2014 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __SOCKET_MEM_TYPES_H__
+#define __SOCKET_MEM_TYPES_H__
+
+#include "mem-types.h"
+
+typedef enum gf_sock_mem_types_ {
+ gf_sock_connect_error_state_t = gf_common_mt_end + 1,
+ gf_sock_mt_end
+} gf_sock_mem_types_t;
+
+#endif
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 6f566e49345..6d4a862aa8d 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -23,7 +23,7 @@
#include "byte-order.h"
#include "common-utils.h"
#include "compat-errno.h"
-
+#include "socket-mem-types.h"
/* ugly #includes below */
#include "protocol-common.h"
@@ -152,6 +152,13 @@ typedef int SSL_trinary_func (SSL *, void *, int);
__socket_proto_update_priv_after_read (priv, ret, bytes_read); \
}
+struct socket_connect_error_state_ {
+ xlator_t *this;
+ rpc_transport_t *trans;
+ gf_boolean_t refd;
+};
+typedef struct socket_connect_error_state_ socket_connect_error_state_t;
+
static int socket_init (rpc_transport_t *this);
static void
@@ -2652,19 +2659,41 @@ out:
return ret;
}
+void*
+socket_connect_error_cbk (void *opaque)
+{
+ socket_connect_error_state_t *arg;
+
+ GF_ASSERT (opaque);
+
+ arg = opaque;
+ THIS = arg->this;
+
+ rpc_transport_notify (arg->trans, RPC_TRANSPORT_DISCONNECT, arg->trans);
+
+ if (arg->refd)
+ rpc_transport_unref (arg->trans);
+
+ GF_FREE (opaque);
+ return NULL;
+}
static int
socket_connect (rpc_transport_t *this, int port)
{
- int ret = -1;
- int sock = -1;
- socket_private_t *priv = NULL;
- socklen_t sockaddr_len = 0;
- glusterfs_ctx_t *ctx = NULL;
- sa_family_t sa_family = {0, };
- char *local_addr = NULL;
- union gf_sock_union sock_union;
- struct sockaddr_in *addr = NULL;
+ int ret = -1;
+ int th_ret = -1;
+ int sock = -1;
+ socket_private_t *priv = NULL;
+ socklen_t sockaddr_len = 0;
+ glusterfs_ctx_t *ctx = NULL;
+ sa_family_t sa_family = {0, };
+ char *local_addr = NULL;
+ union gf_sock_union sock_union;
+ struct sockaddr_in *addr = NULL;
+ gf_boolean_t refd = _gf_false;
+ socket_connect_error_state_t *arg = NULL;
+ pthread_t th_id = {0, };
GF_VALIDATE_OR_GOTO ("socket", this, err);
GF_VALIDATE_OR_GOTO ("socket", this->private, err);
@@ -2680,52 +2709,43 @@ socket_connect (rpc_transport_t *this, int port)
pthread_mutex_lock (&priv->lock);
{
- sock = priv->sock;
- }
- pthread_mutex_unlock (&priv->lock);
-
- if (sock != -1) {
- gf_log_callingfn (this->name, GF_LOG_TRACE,
- "connect () called on transport already connected");
- errno = EINPROGRESS;
- ret = -1;
- goto err;
- }
+ if (priv->sock != -1) {
+ gf_log_callingfn (this->name, GF_LOG_TRACE,
+ "connect () called on transport "
+ "already connected");
+ errno = EINPROGRESS;
+ ret = -1;
+ goto unlock;
+ }
- gf_log (this->name, GF_LOG_TRACE,
- "connecting %p, state=%u gen=%u sock=%d", this,
- priv->ot_state, priv->ot_gen, priv->sock);
+ gf_log (this->name, GF_LOG_TRACE,
+ "connecting %p, state=%u gen=%u sock=%d", this,
+ priv->ot_state, priv->ot_gen, priv->sock);
- ret = socket_client_get_remote_sockaddr (this, &sock_union.sa,
- &sockaddr_len, &sa_family);
- if (ret == -1) {
- /* logged inside client_get_remote_sockaddr */
- goto err;
- }
+ ret = socket_client_get_remote_sockaddr (this, &sock_union.sa,
+ &sockaddr_len, &sa_family);
+ if (ret == -1) {
+ /* logged inside client_get_remote_sockaddr */
+ goto unlock;
+ }
- if (port > 0) {
- sock_union.sin.sin_port = htons (port);
- }
- if (ntohs(sock_union.sin.sin_port) == GF_DEFAULT_SOCKET_LISTEN_PORT) {
- if (priv->use_ssl) {
- gf_log(this->name,GF_LOG_DEBUG,
- "disabling SSL for portmapper connection");
- priv->use_ssl = _gf_false;
+ if (port > 0) {
+ sock_union.sin.sin_port = htons (port);
}
- }
- else {
- if (priv->ssl_enabled && !priv->use_ssl) {
- gf_log(this->name,GF_LOG_DEBUG,
- "re-enabling SSL for I/O connection");
- priv->use_ssl = _gf_true;
+ if (ntohs(sock_union.sin.sin_port) ==
+ GF_DEFAULT_SOCKET_LISTEN_PORT) {
+ if (priv->use_ssl) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "disabling SSL for portmapper connection");
+ priv->use_ssl = _gf_false;
+ }
}
- }
- pthread_mutex_lock (&priv->lock);
- {
- if (priv->sock != -1) {
- gf_log (this->name, GF_LOG_TRACE,
- "connect() -- already connected");
- goto unlock;
+ else {
+ if (priv->ssl_enabled && !priv->use_ssl) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "re-enabling SSL for I/O connection");
+ priv->use_ssl = _gf_true;
+ }
}
memcpy (&this->peerinfo.sockaddr, &sock_union.storage,
@@ -2737,6 +2757,7 @@ socket_connect (rpc_transport_t *this, int port)
gf_log (this->name, GF_LOG_ERROR,
"socket creation failed (%s)",
strerror (errno));
+ ret = -1;
goto unlock;
}
@@ -2795,7 +2816,8 @@ socket_connect (rpc_transport_t *this, int port)
&local_addr);
if (!ret && SA (&this->myinfo.sockaddr)->sa_family == AF_INET) {
addr = (struct sockaddr_in *)(&this->myinfo.sockaddr);
- ret = inet_pton (AF_INET, local_addr, &(addr->sin_addr.s_addr));
+ ret = inet_pton (AF_INET, local_addr,
+ &(addr->sin_addr.s_addr));
}
ret = client_bind (this, SA (&this->myinfo.sockaddr),
@@ -2803,9 +2825,7 @@ socket_connect (rpc_transport_t *this, int port)
if (ret == -1) {
gf_log (this->name, GF_LOG_WARNING,
"client bind failed: %s", strerror (errno));
- close (priv->sock);
- priv->sock = -1;
- goto unlock;
+ goto handler;
}
if (!priv->use_ssl && !priv->bio && !priv->own_thread) {
@@ -2814,9 +2834,7 @@ socket_connect (rpc_transport_t *this, int port)
gf_log (this->name, GF_LOG_ERROR,
"NBIO on %d failed (%s)",
priv->sock, strerror (errno));
- close (priv->sock);
- priv->sock = -1;
- goto unlock;
+ goto handler;
}
}
@@ -2832,21 +2850,20 @@ socket_connect (rpc_transport_t *this, int port)
GF_LOG_DEBUG : GF_LOG_ERROR),
"connection attempt on %s failed, (%s)",
this->peerinfo.identifier, strerror (errno));
- close (priv->sock);
- priv->sock = -1;
- goto unlock;
+ goto handler;
+ }
+ else {
+ ret = 0;
}
- if (priv->use_ssl && !priv->own_thread) {
- ret = ssl_setup_connection(this,0);
- if (ret < 0) {
- gf_log(this->name,GF_LOG_ERROR,
- "client setup failed");
- close(priv->sock);
- priv->sock = -1;
- goto unlock;
- }
- }
+ if (priv->use_ssl && !priv->own_thread) {
+ ret = ssl_setup_connection(this,0);
+ if (ret < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "client setup failed");
+ goto handler;
+ }
+ }
if (!priv->bio && !priv->own_thread) {
ret = __socket_nonblock (priv->sock);
@@ -2855,10 +2872,24 @@ socket_connect (rpc_transport_t *this, int port)
gf_log (this->name, GF_LOG_ERROR,
"NBIO on %d failed (%s)",
priv->sock, strerror (errno));
- close (priv->sock);
+ goto handler;
+ }
+ }
+
+handler:
+ if (ret < 0) {
+ if (priv->own_thread) {
+ close(priv->sock);
priv->sock = -1;
goto unlock;
}
+ else {
+ /* Ignore error from connect. epoll events
+ should be handled in the socket handler.
+ shutdown(2) will result in EPOLLERR, so
+ cleanup is done in socket_event_handler */
+ shutdown (priv->sock, SHUT_RDWR);
+ }
}
/*
@@ -2868,31 +2899,58 @@ socket_connect (rpc_transport_t *this, int port)
priv->connected = 0;
priv->is_server = _gf_false;
rpc_transport_ref (this);
+ refd = _gf_true;
- if (priv->own_thread) {
- if (pipe(priv->pipe) < 0) {
- gf_log(this->name,GF_LOG_ERROR,
- "could not create pipe");
- }
+ if (priv->own_thread) {
+ if (pipe(priv->pipe) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not create pipe");
+ }
this->listener = this;
socket_spawn(this);
- }
- else {
- priv->idx = event_register (ctx->event_pool, priv->sock,
- socket_event_handler,
- this, 1, 1);
- if (priv->idx == -1) {
- gf_log ("", GF_LOG_WARNING,
- "failed to register the event");
- ret = -1;
- }
- }
- }
+ }
+ else {
+ priv->idx = event_register (ctx->event_pool, priv->sock,
+ socket_event_handler,
+ this, 1, 1);
+ if (priv->idx == -1) {
+ gf_log ("", GF_LOG_WARNING,
+ "failed to register the event");
+ close(priv->sock);
+ priv->sock = -1;
+ ret = -1;
+ }
+ }
+
unlock:
+ sock = priv->sock;
+ }
pthread_mutex_unlock (&priv->lock);
err:
+ /* if sock != -1, then cleanup is done from the event handler */
+ if (ret == -1 && sock == -1) {
+ /* Cleaup requires to send notification to upper layer which
+ intern holds the big_lock. There can be dead-lock situation
+ if big_lock is already held by the current thread.
+ So transfer the ownership to seperate thread for cleanup.
+ */
+ arg = GF_CALLOC (1, sizeof (*arg),
+ gf_sock_connect_error_state_t);
+ arg->this = THIS;
+ arg->trans = this;
+ arg->refd = refd;
+ th_ret = pthread_create (&th_id, NULL, socket_connect_error_cbk,
+ arg);
+ if (th_ret) {
+ gf_log (this->name, GF_LOG_ERROR, "pthread_create"
+ "failed: %s", strerror(errno));
+ GF_FREE (arg);
+ GF_ASSERT (0);
+ }
+ }
+
return ret;
}