summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-transport
diff options
context:
space:
mode:
authorMilind Changire <mchangir@redhat.com>2018-01-05 11:17:46 +0530
committerRaghavendra G <rgowdapp@redhat.com>2018-01-07 03:55:51 +0000
commit9b12157fea5ac054106ed14f9d91cdb8bad665c7 (patch)
treed7b6701996424111ce8e39c5f2d03239830fae3c /rpc/rpc-transport
parent515a832de0e761639b1d076a59bf918070ec3130 (diff)
Revert "rpc: merge ssl infra with epoll infra"
This reverts commit 56e5fdae74845dfec0ff7ad0c8fee77695d36ad5. Change-Id: Ia62cee5440bbe8e23f5da9cff692d792091d544a Signed-off-by: Milind Changire <mchangir@redhat.com>
Diffstat (limited to 'rpc/rpc-transport')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c1569
-rw-r--r--rpc/rpc-transport/socket/src/socket.h32
2 files changed, 766 insertions, 835 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 7ffebad555f..65d0b641333 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -90,7 +90,6 @@
typedef int SSL_unary_func (SSL *);
typedef int SSL_trinary_func (SSL *, void *, int);
-static int ssl_setup_connection_params(rpc_transport_t *this);
#define __socket_proto_reset_pending(priv) do { \
struct gf_sock_incoming_frag *frag; \
@@ -189,63 +188,18 @@ struct socket_connect_error_state_ {
typedef struct socket_connect_error_state_ socket_connect_error_state_t;
static int socket_init (rpc_transport_t *this);
-static int __socket_nonblock (int fd);
-
-static void
-socket_dump_info (struct sockaddr *sa, int is_server, int is_ssl, int sock,
- char *log_domain, char *log_label)
-{
- char addr_buf[INET6_ADDRSTRLEN+1] = {0, };
- char *addr = NULL;
- char *peer_type = NULL;
- int af = sa->sa_family;
- int so_error = -1;
- socklen_t slen = sizeof(so_error);
-
- if (af == AF_UNIX) {
- addr = ((struct sockaddr_un *)(sa))->sun_path;
- } else {
- if (af == AF_INET6) {
- struct sockaddr_in6 *sin6 =
- (struct sockaddr_in6 *)(sa);
-
- inet_ntop (af, &sin6->sin6_addr, addr_buf,
- sizeof (addr_buf));
- addr = addr_buf;
- } else {
- struct sockaddr_in *sin =
- (struct sockaddr_in *)(sa);
-
- inet_ntop (af, &sin->sin_addr, addr_buf,
- sizeof (addr_buf));
- addr = addr_buf;
- }
- }
- if (is_server)
- peer_type = "server";
- else
- peer_type = "client";
-
- getsockopt (sock, SOL_SOCKET, SO_ERROR, &so_error, &slen);
-
- gf_log (log_domain, GF_LOG_TRACE,
- "$$$ %s: %s (af:%d,sock:%d) %s %s (errno:%d:%s)",
- peer_type, log_label, af, sock, addr,
- (is_ssl ? "SSL" : "non-SSL"),
- so_error, strerror (so_error));
-}
static void
ssl_dump_error_stack (const char *caller)
{
unsigned long errnum = 0;
- char errbuf[120] = {0,};
+ char errbuf[120] = {0, };
/* OpenSSL docs explicitly give 120 as the error-string length. */
while ((errnum = ERR_get_error())) {
- ERR_error_string(errnum,errbuf);
- gf_log(caller,GF_LOG_ERROR," %s",errbuf);
+ ERR_error_string(errnum, errbuf);
+ gf_log(caller, GF_LOG_ERROR, " %s", errbuf);
}
}
@@ -253,119 +207,128 @@ static int
ssl_do (rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func)
{
int r = (-1);
+ struct pollfd pfd = {-1, };
socket_private_t *priv = NULL;
- GF_VALIDATE_OR_GOTO(this->name,this->private,out);
+ GF_VALIDATE_OR_GOTO(this->name, this->private, out);
priv = this->private;
- if (buf) {
- if (priv->connected == -1) {
+ for (;;) {
+ if (buf) {
+ if (priv->connected == -1) {
+ /*
+ * Fields in the SSL structure (especially
+ * the BIO pointers) are not valid at this
+ * point, so we'll segfault if we pass them
+ * to SSL_read/SSL_write.
+ */
+ gf_log(this->name, GF_LOG_INFO,
+ "lost connection in %s", __func__);
+ break;
+ }
+ r = func(priv->ssl_ssl, buf, len);
+ } else {
/*
- * Fields in the SSL structure (especially
- * the BIO pointers) are not valid at this
- * point, so we'll segfault if we pass them
- * to SSL_read/SSL_write.
+ * We actually need these functions to get to
+ * priv->connected == 1.
*/
- gf_log (this->name, GF_LOG_INFO,
- "lost connection in %s", __func__);
- return -1;
- }
- r = func (priv->ssl_ssl, buf, len);
- } else {
- /*
- * We actually need these functions to get to
- * priv->connected == 1.
- */
- r = ((SSL_unary_func *)func)(priv->ssl_ssl);
- }
- switch (SSL_get_error (priv->ssl_ssl, r)) {
- case SSL_ERROR_NONE:
- /* fall through */
- case SSL_ERROR_WANT_READ:
- /* fall through */
- case SSL_ERROR_WANT_WRITE:
- errno = EAGAIN;
- return r;
+ r = ((SSL_unary_func *)func)(priv->ssl_ssl);
+ }
+ switch (SSL_get_error(priv->ssl_ssl, r)) {
+ case SSL_ERROR_NONE:
+ return r;
+ case SSL_ERROR_WANT_READ:
+ /* If we are attempting to connect/accept then we
+ * should wait here on the poll, for the SSL
+ * (re)negotiation to complete, else we would error out
+ * on the accept/connect.
+ * If we are here when attempting to read/write
+ * then we return r (or -1) as the socket is always
+ * primed for the read event, and it would eventually
+ * call one of the SSL routines */
+ /* NOTE: Only way to determine this is a accept/connect
+ * is to examine buf or func, which is not very
+ * clean */
+ if ((func == (SSL_trinary_func *)SSL_read)
+ || (func == (SSL_trinary_func *) SSL_write)) {
+ return r;
+ }
- case SSL_ERROR_SYSCALL:
- /* Sometimes SSL_ERROR_SYSCALL returns errno as
- * EAGAIN. In such a case we should reattempt operation
- * So, for now, just return the return value and the
- * errno as is.
- */
- gf_log (this->name, GF_LOG_DEBUG,
- "syscall error (probably remote disconnect) "
- "errno:%d:%s", errno, strerror(errno));
- return r;
- default:
- errno = EIO;
- goto out; /* "break" would just loop again */
+ pfd.fd = priv->sock;
+ pfd.events = POLLIN;
+ if (poll(&pfd, 1, -1) < 0) {
+ gf_log(this->name, GF_LOG_ERROR, "poll error %d",
+ errno);
+ }
+ break;
+ case SSL_ERROR_WANT_WRITE:
+ if ((func == (SSL_trinary_func *)SSL_read)
+ || (func == (SSL_trinary_func *) SSL_write)) {
+ errno = EAGAIN;
+ return r;
+ }
+ pfd.fd = priv->sock;
+ pfd.events = POLLOUT;
+ if (poll(&pfd, 1, -1) < 0) {
+ gf_log(this->name, GF_LOG_ERROR, "poll error %d",
+ errno);
+ }
+ break;
+ case SSL_ERROR_SYSCALL:
+ /* This is what we get when remote disconnects. */
+ gf_log(this->name, GF_LOG_DEBUG,
+ "syscall error (probably remote disconnect)");
+ errno = ENODATA;
+ goto out;
+ default:
+ errno = EIO;
+ goto out; /* "break" would just loop again */
+ }
}
out:
return -1;
}
-#define ssl_read_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_read)
-#define ssl_write_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_write)
-
+#define ssl_connect_one(t) ssl_do((t), NULL, 0, (SSL_trinary_func *)SSL_connect)
+#define ssl_accept_one(t) ssl_do((t), NULL, 0, (SSL_trinary_func *)SSL_accept)
+#define ssl_read_one(t, b, l) ssl_do((t), (b), (l), (SSL_trinary_func *)SSL_read)
+#define ssl_write_one(t, b, l) ssl_do((t), (b), (l), (SSL_trinary_func *)SSL_write)
-int
-ssl_setup_connection_prefix (rpc_transport_t *this)
+static char *
+ssl_setup_connection (rpc_transport_t *this, int server)
{
+ X509 *peer = NULL;
+ char peer_CN[256] = "";
int ret = -1;
socket_private_t *priv = NULL;
- GF_VALIDATE_OR_GOTO(this->name,this->private,done);
-
+ GF_VALIDATE_OR_GOTO(this->name, this->private, done);
priv = this->private;
- if (ssl_setup_connection_params (this) < 0) {
- gf_log (this->name, GF_LOG_TRACE,
- "+ ssl_setup_connection_params() failed!");
- goto done;
- } else {
- gf_log (this->name, GF_LOG_TRACE,
- "+ ssl_setup_connection_params() done!");
- }
-
- priv->ssl_error_required = SSL_ERROR_NONE;
- priv->ssl_connected = _gf_false;
- priv->ssl_accepted = _gf_false;
- priv->ssl_context_created = _gf_false;
-
priv->ssl_ssl = SSL_new(priv->ssl_ctx);
if (!priv->ssl_ssl) {
- gf_log(this->name,GF_LOG_ERROR,"SSL_new failed");
+ gf_log(this->name, GF_LOG_ERROR, "SSL_new failed");
ssl_dump_error_stack(this->name);
goto done;
}
-
priv->ssl_sbio = BIO_new_socket(priv->sock, BIO_NOCLOSE);
if (!priv->ssl_sbio) {
- gf_log(this->name,GF_LOG_ERROR,"BIO_new_socket failed");
+ gf_log(this->name, GF_LOG_ERROR, "BIO_new_socket failed");
ssl_dump_error_stack(this->name);
goto free_ssl;
}
- SSL_set_bio (priv->ssl_ssl, priv->ssl_sbio, priv->ssl_sbio);
- ret = 0;
- goto done;
+ SSL_set_bio(priv->ssl_ssl, priv->ssl_sbio, priv->ssl_sbio);
-free_ssl:
- SSL_free(priv->ssl_ssl);
- priv->ssl_ssl = NULL;
-done:
- return ret;
-}
-
-static char *
-ssl_setup_connection_postfix (rpc_transport_t *this)
-{
- X509 *peer = NULL;
- char peer_CN[256] = "";
- socket_private_t *priv = NULL;
+ if (server) {
+ ret = ssl_accept_one(this);
+ } else {
+ ret = ssl_connect_one(this);
+ }
- GF_VALIDATE_OR_GOTO(this->name, this->private, done);
- priv = this->private;
+ /* Make sure _the call_ succeeded. */
+ if (ret < 0) {
+ goto ssl_error;
+ }
/* Make sure _SSL verification_ succeeded, yielding an identity. */
if (SSL_get_verify_result(priv->ssl_ssl) != X509_V_OK) {
@@ -376,8 +339,6 @@ ssl_setup_connection_postfix (rpc_transport_t *this)
goto ssl_error;
}
- SSL_set_mode(priv->ssl_ssl, SSL_MODE_ENABLE_PARTIAL_WRITE);
-
/* Finally, everything seems OK. */
X509_NAME_get_text_by_NID(X509_get_subject_name(peer),
NID_commonName, peer_CN, sizeof(peer_CN)-1);
@@ -394,7 +355,7 @@ ssl_error:
"SSL connect error (client: %s) (server: %s)",
this->peerinfo.identifier, this->myinfo.identifier);
ssl_dump_error_stack(this->name);
-
+free_ssl:
SSL_free(priv->ssl_ssl);
priv->ssl_ssl = NULL;
done:
@@ -402,86 +363,6 @@ done:
}
-int
-ssl_complete_connection (rpc_transport_t *this)
-{
- int ret = -1; /* 1 : implies go back to epoll_wait()
- * 0 : implies successful ssl connection
- * -1: implies continue processing current event
- * as if EPOLLERR has been encountered
- */
- char *cname = NULL;
- int r = -1;
- int ssl_error = -1;
- socket_private_t *priv = NULL;
-
-
- priv = this->private;
-
- if (priv->is_server) {
- r = SSL_accept (priv->ssl_ssl);
- } else {
- r = SSL_connect (priv->ssl_ssl);
- }
-
- ssl_error = SSL_get_error (priv->ssl_ssl, r);
- priv->ssl_error_required = ssl_error;
-
- switch (ssl_error) {
- case SSL_ERROR_NONE:
- cname = ssl_setup_connection_postfix (this);
- if (!cname) {
- /* we've failed to get the cname so
- * we must close the connection
- *
- * treat this as EPOLLERR
- */
- gf_log (this->name, GF_LOG_TRACE,
- "error getting cname");
- errno = ECONNRESET;
- ret = -1;
- } else {
- this->ssl_name = cname;
- if (priv->is_server) {
- priv->ssl_accepted = _gf_true;
- gf_log (this->name, GF_LOG_TRACE,
- "ssl_accepted!");
- } else {
- priv->ssl_connected = _gf_true;
- gf_log (this->name, GF_LOG_TRACE,
- "ssl_connected!");
- }
- ret = 0;
- }
- break;
-
- case SSL_ERROR_WANT_READ:
- /* fall through */
- case SSL_ERROR_WANT_WRITE:
- errno = EAGAIN;
- break;
-
- case SSL_ERROR_SYSCALL:
- /* Sometimes SSL_ERROR_SYSCALL returns with errno as EAGAIN
- * So, we should retry the operation.
- * So, for now, we just return the return value and errno as is.
- */
- break;
-
- case SSL_ERROR_SSL:
- /* treat this as EPOLLERR */
- ret = -1;
- break;
-
- default:
- /* treat this as EPOLLERR */
- errno = EIO;
- ret = -1;
- break;
- }
- return ret;
-}
-
static void
ssl_teardown_connection (socket_private_t *priv)
{
@@ -489,21 +370,7 @@ ssl_teardown_connection (socket_private_t *priv)
SSL_shutdown(priv->ssl_ssl);
SSL_clear(priv->ssl_ssl);
SSL_free(priv->ssl_ssl);
- SSL_CTX_free(priv->ssl_ctx);
priv->ssl_ssl = NULL;
- priv->ssl_ctx = NULL;
- if (priv->ssl_private_key) {
- GF_FREE (priv->ssl_private_key);
- priv->ssl_private_key = NULL;
- }
- if (priv->ssl_own_cert) {
- GF_FREE (priv->ssl_own_cert);
- priv->ssl_own_cert = NULL;
- }
- if (priv->ssl_ca_list) {
- GF_FREE (priv->ssl_ca_list);
- priv->ssl_ca_list = NULL;
- }
}
priv->use_ssl = _gf_false;
}
@@ -520,10 +387,8 @@ __socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount)
sock = priv->sock;
if (priv->use_ssl) {
- gf_log (this->name, GF_LOG_TRACE, "***** reading over SSL");
ret = ssl_read_one (this, opvector->iov_base, opvector->iov_len);
} else {
- gf_log (this->name, GF_LOG_TRACE, "***** reading over non-SSL");
ret = sys_readv (sock, opvector, IOV_MIN(opcount));
}
@@ -661,7 +526,7 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
while (opcount > 0) {
if (opvector->iov_len == 0) {
- gf_log(this->name,GF_LOG_DEBUG,
+ gf_log(this->name, GF_LOG_DEBUG,
"would have passed zero length to read/write");
++opvector;
--opcount;
@@ -675,8 +540,6 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
* non-SSL might be insecure, so just fail it outright.
*/
ret = -1;
- gf_log (this->name, GF_LOG_TRACE,
- "### no priv->ssl_ssl yet; ret = -1;");
} else if (write) {
if (priv->use_ssl) {
ret = ssl_write_one (this, opvector->iov_base,
@@ -692,10 +555,9 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
this->total_bytes_write += ret;
} else {
ret = __socket_cached_read (this, opvector, opcount);
+
if (ret == 0) {
- gf_log (this->name, GF_LOG_DEBUG,
- "EOF on socket (errno:%d:%s)",
- errno, strerror (errno));
+ gf_log(this->name, GF_LOG_DEBUG, "EOF on socket");
errno = ENODATA;
ret = -1;
}
@@ -744,7 +606,7 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
while (moved < ret) {
if (!opcount) {
- gf_log(this->name,GF_LOG_DEBUG,
+ gf_log(this->name, GF_LOG_DEBUG,
"ran out of iov, moved %d/%d",
moved, ret);
goto ran_out;
@@ -855,8 +717,9 @@ __socket_disconnect (rpc_transport_t *this)
priv = this->private;
- gf_log (this->name, GF_LOG_TRACE, "disconnecting %p, sock=%d",
- this, priv->sock);
+ gf_log (this->name, GF_LOG_TRACE,
+ "disconnecting %p, state=%u gen=%u sock=%d", this,
+ priv->ot_state, priv->ot_gen, priv->sock);
if (priv->sock != -1) {
gf_log_callingfn (this->name, GF_LOG_TRACE,
@@ -867,6 +730,16 @@ __socket_disconnect (rpc_transport_t *this)
"__socket_teardown_connection () failed: %s",
strerror (errno));
}
+
+ if (priv->own_thread) {
+ /*
+ * Without this, reconnect (= disconnect + connect)
+ * won't work except by accident.
+ */
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_PLEASE_DIE on %p", this);
+ priv->ot_state = OT_PLEASE_DIE;
+ }
}
out:
@@ -1100,22 +973,7 @@ __socket_reset (rpc_transport_t *this)
priv->sock = -1;
priv->idx = -1;
priv->connected = -1;
- priv->ssl_connected = _gf_false;
- priv->ssl_accepted = _gf_false;
- priv->ssl_context_created = _gf_false;
- if (priv->ssl_private_key) {
- GF_FREE (priv->ssl_private_key);
- priv->ssl_private_key = NULL;
- }
- if (priv->ssl_own_cert) {
- GF_FREE (priv->ssl_own_cert);
- priv->ssl_own_cert = NULL;
- }
- if (priv->ssl_ca_list) {
- GF_FREE (priv->ssl_ca_list);
- priv->ssl_ca_list = NULL;
- }
out:
return;
}
@@ -1251,6 +1109,8 @@ static int
__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct)
{
int ret = -1;
+ socket_private_t *priv = NULL;
+ char a_byte = 0;
ret = __socket_writev (this, entry->pending_vector,
entry->pending_count,
@@ -1261,6 +1121,18 @@ __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct)
/* current entry was completely written */
GF_ASSERT (entry->pending_count == 0);
__socket_ioq_entry_free (entry);
+ priv = this->private;
+ if (priv->own_thread) {
+ /*
+ * The pipe should only remain readable if there are
+ * more entries after this, so drain the byte
+ * representing this entry.
+ */
+ if (!direct && sys_read (priv->pipe[0], &a_byte, 1) < 1) {
+ gf_log(this->name, GF_LOG_WARNING,
+ "read error on pipe");
+ }
+ }
}
return ret;
@@ -1289,7 +1161,7 @@ __socket_ioq_churn (rpc_transport_t *this)
break;
}
- if (list_empty (&priv->ioq)) {
+ if (!priv->own_thread && list_empty (&priv->ioq)) {
/* all pending writes done, not interested in POLLOUT */
priv->idx = event_select_on (this->ctx->event_pool,
priv->sock, priv->idx, -1, 0);
@@ -2444,9 +2316,15 @@ socket_event_poll_in (rpc_transport_t *this, gf_boolean_t notify_handled)
priv->gen);
if (pollin) {
+ priv->ot_state = OT_CALLBACK;
+
ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
pollin);
+ if (priv->ot_state == OT_CALLBACK) {
+ priv->ot_state = OT_RUNNING;
+ }
+
rpc_transport_pollin_destroy (pollin);
pthread_mutex_lock (&priv->notify.lock);
@@ -2539,355 +2417,303 @@ out:
static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait);
-/* socket_is_connected() is for use only in socket_event_handler() */
-static inline gf_boolean_t
-socket_is_connected (rpc_transport_t *this)
+/* reads rpc_requests during pollin */
+static int
+socket_event_handler (int fd, int idx, int gen, void *data,
+ int poll_in, int poll_out, int poll_err)
{
+ rpc_transport_t *this = NULL;
socket_private_t *priv = NULL;
+ int ret = -1;
+ glusterfs_ctx_t *ctx = NULL;
+ gf_boolean_t socket_closed = _gf_false, notify_handled = _gf_false;
- priv = this->private;
-
- if (priv->use_ssl) {
- return priv->is_server ? priv->ssl_accepted :
- priv->ssl_connected;
- } else {
- return priv->is_server ? priv->accepted :
- priv->connected;
- }
-}
-
-static void
-ssl_rearm_event_fd (rpc_transport_t *this)
-{
- socket_private_t *priv = NULL;
- glusterfs_ctx_t *ctx = NULL;
- int idx = -1;
- int gen = -1;
- int fd = -1;
-
- priv = this->private;
- ctx = this->ctx;
-
- idx = priv->idx;
- gen = priv->gen;
- fd = priv->sock;
-
- if (priv->ssl_error_required == SSL_ERROR_WANT_READ)
- event_select_on (ctx->event_pool, fd, idx, 1, -1);
- if (priv->ssl_error_required == SSL_ERROR_WANT_WRITE)
- event_select_on (ctx->event_pool, fd, idx, -1, 1);
- event_handled (ctx->event_pool, fd, idx, gen);
-}
+ this = data;
-static int
-ssl_handle_server_connection_attempt (rpc_transport_t *this)
-{
- socket_private_t *priv = NULL;
- glusterfs_ctx_t *ctx = NULL;
- int idx = -1;
- int gen = -1;
- int ret = -1;
- int fd = -1;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
+ THIS = this->xl;
priv = this->private;
ctx = this->ctx;
- idx = priv->idx;
- gen = priv->gen;
- fd = priv->sock;
+ pthread_mutex_lock (&priv->in_lock);
+ pthread_mutex_lock (&priv->out_lock);
+ {
+ priv->idx = idx;
+ priv->gen = gen;
+ }
+ pthread_mutex_unlock (&priv->out_lock);
+ pthread_mutex_unlock (&priv->in_lock);
- if (!priv->ssl_context_created) {
- ret = ssl_setup_connection_prefix (this);
- if (ret < 0) {
- gf_log (this->name, GF_LOG_TRACE,
- "> ssl_setup_connection_prefix() failed!");
+ if (priv->connected != 1) {
+ if (priv->connect_failed) {
+ /* connect failed with some other error than
+ EINPROGRESS or ENOENT, so nothing more to do, fail
+ reading/writing anything even if poll_in or poll_out
+ is set */
+ gf_log ("transport", GF_LOG_DEBUG,
+ "connect failed with some other error than "
+ "EINPROGRESS or ENOENT, so nothing more to "
+ "do; disconnecting socket");
+ ret = socket_disconnect (this, _gf_false);
+
+ /* Force ret to be -1, as we are officially done with
+ this socket */
ret = -1;
- goto out;
} else {
- priv->ssl_context_created = _gf_true;
+ ret = socket_connect_finish (this);
}
- }
- ret = ssl_complete_connection (this);
- if (ret == 0) {
- /* nothing to do */
- event_select_on (ctx->event_pool, fd, idx, 1, 0);
- event_handled (ctx->event_pool, fd, idx, gen);
- ret = 1;
} else {
- if (errno == EAGAIN) {
- ssl_rearm_event_fd (this);
- ret = 1;
- } else {
- ret = -1;
- gf_log (this->name, GF_LOG_TRACE,
- "ssl_complete_connection returned error");
- }
+ ret = 0;
}
-out:
- return ret;
-}
-static int
-ssl_handle_client_connection_attempt (rpc_transport_t *this)
-{
- socket_private_t *priv = NULL;
- glusterfs_ctx_t *ctx = NULL;
- int idx = -1;
- int ret = -1;
- int fd = -1;
+ if (!ret && poll_out) {
+ ret = socket_event_poll_out (this);
+ }
- priv = this->private;
- ctx = this->ctx;
+ if (!ret && poll_in) {
+ ret = socket_event_poll_in (this, !poll_err);
+ notify_handled = _gf_true;
+ }
- idx = priv->idx;
- fd = priv->sock;
+ if ((ret < 0) || poll_err) {
+ /* Logging has happened already in earlier cases */
+ gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG),
+ "EPOLLERR - disconnecting now");
- /* SSL client */
- if (priv->connect_failed) {
- gf_log (this->name, GF_LOG_TRACE,
- ">>> disconnecting SSL socket");
- ret = socket_disconnect (this, _gf_false);
- /* Force ret to be -1, as we are officially done with
- this socket */
- ret = -1;
- } else {
- if (!priv->ssl_context_created) {
- ret = ssl_setup_connection_prefix (this);
- if (ret < 0) {
- gf_log (this->name, GF_LOG_TRACE,
- "> ssl_setup_connection_prefix() "
- "failed!");
- ret = -1;
- goto out;
- } else {
- priv->ssl_context_created = _gf_true;
- }
- }
- ret = ssl_complete_connection (this);
- if (ret == 0) {
- ret = socket_connect_finish (this);
- event_select_on (ctx->event_pool, fd, idx, 1, 0);
- gf_log (this->name, GF_LOG_TRACE,
- ">>> completed client connect");
- } else {
- if (errno == EAGAIN) {
- gf_log (this->name, GF_LOG_TRACE,
- ">>> retrying client connect 2");
- ssl_rearm_event_fd (this);
- ret = 1;
- } else {
- /* this is a connection failure */
- ret = socket_connect_finish (this);
- gf_log (this->name, GF_LOG_TRACE,
- "ssl_complete_connection "
- "returned error");
- ret = -1;
- }
- }
+ socket_closed = socket_event_poll_err (this, gen, idx);
+
+ if (socket_closed)
+ rpc_transport_unref (this);
+
+ } else if (!notify_handled) {
+ event_handled (ctx->event_pool, fd, idx, gen);
}
+
out:
return ret;
}
-static int
-socket_handle_client_connection_attempt (rpc_transport_t *this)
+static int poll_err_cnt;
+static void *
+socket_poller (void *ctx)
{
- socket_private_t *priv = NULL;
- glusterfs_ctx_t *ctx = NULL;
- int idx = -1;
- int gen = -1;
- int ret = -1;
- int fd = -1;
+ rpc_transport_t *this = ctx;
+ socket_private_t *priv = this->private;
+ struct pollfd pfd[2] = {{0, }, };
+ gf_boolean_t to_write = _gf_false;
+ int ret = 0;
+ uint32_t gen = 0;
+ char *cname = NULL;
- priv = this->private;
- ctx = this->ctx;
+ GF_ASSERT (this);
+ /* Set THIS early on in the life of this thread, instead of setting it
+ * conditionally
+ */
+ THIS = this->xl;
- idx = priv->idx;
- gen = priv->gen;
- fd = priv->sock;
-
- /* non-SSL client */
- if (priv->connect_failed) {
- /* connect failed with some other error than
- EINPROGRESS or ENOENT, so nothing more to
- do, fail reading/writing anything even if
- poll_in or poll_out
- is set
- */
- gf_log ("transport", GF_LOG_DEBUG,
- "connect failed with some other error "
- "than EINPROGRESS or ENOENT, so "
- "nothing more to do; disconnecting "
- "socket");
- (void)socket_disconnect (this, _gf_false);
-
- /* Force ret to be -1, as we are officially
- * done with this socket
- */
- ret = -1;
- } else {
- ret = socket_connect_finish (this);
- gf_log (this->name, GF_LOG_TRACE,
- "socket_connect_finish() returned %d",
- ret);
- if (ret == 0 || ret == 1) {
- /* we don't want to do any reads or
- * writes on the connection yet in
- * socket_event_handler, so just
- * return 1
- */
- ret = 1;
- event_handled (ctx->event_pool, fd, idx, gen);
- }
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_DEBUG, "socket_poller is exiting "
+ "because socket state is OT_PLEASE_DIE");
+ goto err;
}
- return ret;
-}
-static int
-socket_complete_connection (rpc_transport_t *this)
-{
- socket_private_t *priv = NULL;
- glusterfs_ctx_t *ctx = NULL;
- int idx = -1;
- int gen = -1;
- int ret = -1;
- int fd = -1;
-
- priv = this->private;
- ctx = this->ctx;
-
- idx = priv->idx;
- gen = priv->gen;
- fd = priv->sock;
+ priv->ot_state = OT_RUNNING;
if (priv->use_ssl) {
- if (priv->is_server) {
- ret = ssl_handle_server_connection_attempt (this);
- } else {
- ret = ssl_handle_client_connection_attempt (this);
+ cname = ssl_setup_connection(this, priv->connected);
+ if (!cname) {
+ gf_log (this->name, GF_LOG_ERROR, "%s setup failed",
+ priv->connected ? "server" : "client");
+ goto err;
}
- } else {
- if (priv->is_server) {
- /* non-SSL server: nothing much to do
- * connection has already been accepted in
- * socket_server_event_handler()
- */
- priv->accepted = _gf_true;
- event_handled (ctx->event_pool, fd, idx, gen);
- ret = 1;
+ if (priv->connected) {
+ this->ssl_name = cname;
} else {
- ret = socket_handle_client_connection_attempt (this);
+ GF_FREE(cname);
}
}
- return ret;
-}
-/* reads rpc_requests during pollin */
-static int
-socket_event_handler (int fd, int idx, int gen, void *data,
- int poll_in, int poll_out, int poll_err)
-{
- rpc_transport_t *this = NULL;
- socket_private_t *priv = NULL;
- int ret = -1;
- glusterfs_ctx_t *ctx = NULL;
- gf_boolean_t socket_closed = _gf_false, notify_handled = _gf_false;
-
-
- this = data;
-
- GF_VALIDATE_OR_GOTO ("socket", this, out);
- GF_VALIDATE_OR_GOTO ("socket", this->private, out);
- GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
-
- THIS = this->xl;
- priv = this->private;
- ctx = this->ctx;
-
- pthread_mutex_lock (&priv->in_lock);
- pthread_mutex_lock (&priv->out_lock);
- {
- priv->idx = idx;
- priv->gen = gen;
+ if (!priv->bio) {
+ ret = __socket_nonblock (priv->sock);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ goto err;
+ }
}
- pthread_mutex_unlock (&priv->out_lock);
- pthread_mutex_unlock (&priv->in_lock);
- gf_log (this->name, GF_LOG_TRACE, "%s (sock:%d) in:%d, out:%d, err:%d",
- (priv->is_server ? "server" : "client"),
- priv->sock, poll_in, poll_out, poll_err);
+ if (priv->connected == 0) {
+ ret = socket_connect_finish (this);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "asynchronous socket_connect_finish failed");
+ }
+ }
- if (!poll_err) {
- if (!socket_is_connected (this)) {
- gf_log (this->name, GF_LOG_TRACE,
- "%s (sock:%d) socket is not connected, "
- "completing connection",
- (priv->is_server ? "server" : "client"),
- priv->sock);
+ ret = rpc_transport_notify (this->listener,
+ RPC_TRANSPORT_ACCEPT, this);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "asynchronous rpc_transport_notify failed");
+ }
- ret = socket_complete_connection (this);
+ gen = priv->ot_gen;
+ for (;;) {
+ pthread_mutex_lock(&priv->out_lock);
+ to_write = !list_empty(&priv->ioq);
+ pthread_mutex_unlock(&priv->out_lock);
+ pfd[0].fd = priv->pipe[0];
+ pfd[0].events = POLL_MASK_ERROR;
+ pfd[0].revents = 0;
+ pfd[1].fd = priv->sock;
+ pfd[1].events = POLL_MASK_INPUT | POLL_MASK_ERROR;
+ pfd[1].revents = 0;
+ if (to_write) {
+ pfd[1].events |= POLL_MASK_OUTPUT;
+ } else {
+ pfd[0].events |= POLL_MASK_INPUT;
+ }
+ if (poll(pfd, 2, -1) < 0) {
+ gf_log(this->name, GF_LOG_ERROR, "poll failed");
+ break;
+ }
+ if (pfd[0].revents & POLL_MASK_ERROR) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "poll error on pipe");
+ break;
+ }
- gf_log (this->name, GF_LOG_TRACE, "(sock:%d) "
- "socket_complete_connection() returned %d",
- priv->sock, ret);
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "OT_PLEASE_DIE on %p (exiting socket_poller)",
+ this);
+ break;
+ }
- if (ret > 0) {
+ if (pfd[1].revents & POLL_MASK_INPUT) {
+ ret = socket_event_poll_in(this, 0);
+ if (ret >= 0) {
+ /* Suppress errors while making progress. */
+ pfd[1].revents &= ~POLL_MASK_ERROR;
+ } else if (errno == ENOTCONN) {
+ ret = 0;
+ }
+ if (priv->ot_state == OT_PLEASE_DIE) {
gf_log (this->name, GF_LOG_TRACE,
- "(sock:%d) returning to wait on socket",
- priv->sock);
- return 0;
+ "OT_IDLE on %p (input request)",
+ this);
+ break;
+ }
+ } else if (pfd[1].revents & POLL_MASK_OUTPUT) {
+ ret = socket_event_poll_out(this);
+ if (ret >= 0) {
+ /* Suppress errors while making progress. */
+ pfd[1].revents &= ~POLL_MASK_ERROR;
+ } else if (errno == ENOTCONN) {
+ ret = 0;
+ }
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_IDLE on %p (output request)",
+ this);
+ break;
}
} else {
- char *sock_type = (priv->is_server ? "Server" :
- "Client");
-
- gf_log (this->name, GF_LOG_TRACE,
- "%s socket (%d) is already connected",
- sock_type, priv->sock);
+ /*
+ * This usually means that we left poll() because
+ * somebody pushed a byte onto our pipe. That wakeup
+ * is why the pipe is there, but once awake we can do
+ * all the checking we need on the next iteration.
+ */
ret = 0;
}
+ if (pfd[1].revents & POLL_MASK_ERROR) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "poll error on socket");
+ break;
+ }
+ if (ret < 0) {
+ GF_LOG_OCCASIONALLY (poll_err_cnt, this->name,
+ GF_LOG_ERROR,
+ "socket_poller %s failed (%s)",
+ this->peerinfo.identifier,
+ strerror (errno));
+ break;
+ }
+ if (priv->ot_gen != gen) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "generation mismatch, my %u != %u",
+ gen, priv->ot_gen);
+ return NULL;
+ }
}
- if (!ret && poll_out) {
- ret = socket_event_poll_out (this);
- gf_log (this->name, GF_LOG_TRACE, "(sock:%d) "
- "socket_event_poll_out returned %d", priv->sock, ret);
- }
+err:
+ /* All (and only) I/O errors should come here. */
+ pthread_mutex_lock(&priv->in_lock);
+ pthread_mutex_lock(&priv->out_lock);
+ {
+ gf_log (this->name, GF_LOG_TRACE, "disconnecting socket");
+ __socket_teardown_connection (this);
+ sys_close (priv->sock);
+ priv->sock = -1;
- if (!ret && poll_in) {
- ret = socket_event_poll_in (this, !poll_err);
- gf_log (this->name, GF_LOG_TRACE, "(sock:%d) "
- "socket_event_poll_in returned %d", priv->sock, ret);
- notify_handled = _gf_true;
+ sys_close (priv->pipe[0]);
+ sys_close (priv->pipe[1]);
+ priv->pipe[0] = -1;
+ priv->pipe[1] = -1;
+
+ priv->ot_state = OT_IDLE;
}
+ pthread_mutex_unlock(&priv->out_lock);
+ pthread_mutex_unlock(&priv->in_lock);
- if ((ret < 0) || poll_err) {
- struct sockaddr *sa = SA(&this->peerinfo.sockaddr);
+ rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
- if (priv->is_server &&
- SA(&this->myinfo.sockaddr)->sa_family == AF_UNIX) {
- sa = SA(&this->myinfo.sockaddr);
- }
+ GF_REF_PUT (priv);
- socket_dump_info (sa, priv->is_server, priv->use_ssl,
- priv->sock, this->name,
- "disconnecting from");
+ rpc_transport_unref (this);
- /* Logging has happened already in earlier cases */
- gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG),
- "EPOLLERR - disconnecting (sock:%d) (%s)",
- priv->sock, (priv->use_ssl ? "SSL" : "non-SSL"));
+ return NULL;
+}
- socket_closed = socket_event_poll_err (this, gen, idx);
- if (socket_closed)
- rpc_transport_unref (this);
+static int
+socket_spawn (rpc_transport_t *this)
+{
+ socket_private_t *priv = this->private;
+ int ret = -1;
+ switch (priv->ot_state) {
+ case OT_IDLE:
+ case OT_PLEASE_DIE:
+ break;
+ default:
+ gf_log (this->name, GF_LOG_WARNING,
+ "refusing to start redundant poller");
+ return ret;
+ }
- } else if (!notify_handled) {
- event_handled (ctx->event_pool, fd, idx, gen);
+ priv->ot_gen += 7;
+ priv->ot_state = OT_SPAWNING;
+ gf_log (this->name, GF_LOG_TRACE,
+ "spawning %p with gen %u", this, priv->ot_gen);
+
+ GF_REF_GET (priv);
+
+ /* Create thread after enable detach flag */
+
+ ret = gf_thread_create_detached (&priv->thread, socket_poller, this,
+ "spoller");
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not create poll thread");
+ ret = -1;
}
-out:
return ret;
}
@@ -2904,6 +2730,7 @@ socket_server_event_handler (int fd, int idx, int gen, void *data,
socklen_t addrlen = sizeof (new_sockaddr);
socket_private_t *new_priv = NULL;
glusterfs_ctx_t *ctx = NULL;
+ char *cname = NULL;
this = data;
GF_VALIDATE_OR_GOTO ("socket", this, out);
@@ -2994,10 +2821,6 @@ socket_server_event_handler (int fd, int idx, int gen, void *data,
}
get_transport_identifiers (new_trans);
- gf_log (this->name, GF_LOG_TRACE, "XXX server:%s, client:%s",
- new_trans->myinfo.identifier,
- new_trans->peerinfo.identifier);
-
ret = socket_init(new_trans);
if (ret != 0) {
sys_close (new_sock);
@@ -3033,16 +2856,23 @@ socket_server_event_handler (int fd, int idx, int gen, void *data,
}
new_priv->sock = new_sock;
+ new_priv->own_thread = priv->own_thread;
- new_priv->ssl_enabled = priv->ssl_enabled;
new_priv->ssl_ctx = priv->ssl_ctx;
- new_priv->connected = 1;
- new_priv->is_server = _gf_true;
+ if (new_priv->use_ssl && !new_priv->own_thread) {
+ cname = ssl_setup_connection(new_trans, 1);
+ if (!cname) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "server setup failed");
+ sys_close (new_sock);
+ GF_FREE (new_trans->name);
+ GF_FREE (new_trans);
+ goto out;
+ }
+ this->ssl_name = cname;
+ }
- /* set O_NONBLOCK for plain text as well as ssl connections */
- if (!priv->bio) {
- gf_log (this->name, GF_LOG_TRACE,
- "### use non-blocking IO");
+ if (!priv->bio && !priv->own_thread) {
ret = __socket_nonblock (new_sock);
if (ret == -1) {
@@ -3056,13 +2886,29 @@ socket_server_event_handler (int fd, int idx, int gen, void *data,
goto out;
}
}
+
/*
- * This is the first ref on the newly accepted
- * transport.
+ * In the own_thread case, this is used to
+ * indicate that we're initializing a server
+ * connection.
*/
+ new_priv->connected = 1;
+ new_priv->is_server = _gf_true;
rpc_transport_ref (new_trans);
- {
+ if (new_priv->own_thread) {
+ if (pipe(new_priv->pipe) < 0) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "could not create pipe");
+ }
+ ret = socket_spawn(new_trans);
+ if (ret) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "could not spawn thread");
+ sys_close (new_priv->pipe[0]);
+ sys_close (new_priv->pipe[1]);
+ }
+ } else {
/* Take a ref on the new_trans to avoid
* getting deleted when event_register()
* causes socket_event_handler() to race
@@ -3131,6 +2977,9 @@ out:
if (ctx)
event_handled (ctx->event_pool, fd, idx, gen);
+ if (cname && (cname != this->ssl_name)) {
+ GF_FREE(cname);
+ }
return ret;
}
@@ -3140,12 +2989,36 @@ socket_disconnect (rpc_transport_t *this, gf_boolean_t wait)
{
socket_private_t *priv = NULL;
int ret = -1;
+ char a_byte = 'r';
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
+ if (wait && priv->own_thread) {
+ GF_REF_PUT (priv);
+
+ pthread_mutex_lock (&priv->cond_lock);
+ {
+ /* Change the state to OT_PLEASE_DIE so that
+ * socket_poller can exit. */
+ priv->ot_state = OT_PLEASE_DIE;
+ /* Write something into the pipe so that poller
+ * thread can wake up.*/
+ if (sys_write (priv->pipe[1], &a_byte, 1) < 1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "write error on pipe");
+ }
+
+ /* Wait for socket_poller to exit */
+ if (!priv->own_thread_done)
+ pthread_cond_wait (&priv->cond,
+ &priv->cond_lock);
+ }
+ pthread_mutex_unlock (&priv->cond_lock);
+ }
+
pthread_mutex_lock (&priv->in_lock);
pthread_mutex_lock (&priv->out_lock);
{
@@ -3186,9 +3059,8 @@ socket_fix_ssl_opts (rpc_transport_t *this, socket_private_t *priv,
"%s SSL for portmapper connection",
priv->mgmt_ssl ? "enabling" : "disabling");
priv->use_ssl = priv->mgmt_ssl;
- }
- else if (priv->ssl_enabled && !priv->use_ssl) {
- gf_log(this->name,GF_LOG_DEBUG,
+ } else if (priv->ssl_enabled && !priv->use_ssl) {
+ gf_log(this->name, GF_LOG_DEBUG,
"re-enabling SSL for I/O connection");
priv->use_ssl = _gf_true;
}
@@ -3237,8 +3109,8 @@ socket_connect (rpc_transport_t *this, int port)
gf_boolean_t refd = _gf_false;
socket_connect_error_state_t *arg = NULL;
pthread_t th_id = {0, };
+ char *cname = NULL;
gf_boolean_t ign_enoent = _gf_false;
- gf_boolean_t connect_attempted = _gf_false;
GF_VALIDATE_OR_GOTO ("socket", this, err);
GF_VALIDATE_OR_GOTO ("socket", this->private, err);
@@ -3255,6 +3127,7 @@ socket_connect (rpc_transport_t *this, int port)
pthread_mutex_lock (&priv->in_lock);
pthread_mutex_lock (&priv->out_lock);
{
+ priv->own_thread_done = _gf_false;
if (priv->sock != -1) {
gf_log_callingfn (this->name, GF_LOG_TRACE,
"connect () called on transport "
@@ -3265,7 +3138,8 @@ socket_connect (rpc_transport_t *this, int port)
}
gf_log (this->name, GF_LOG_TRACE,
- "connecting %p, sock=%d", this, priv->sock);
+ "connecting %p, state=%u gen=%u sock=%d", this,
+ priv->ot_state, priv->ot_gen, priv->sock);
ret = socket_client_get_remote_sockaddr (this, &sock_union.sa,
&sockaddr_len, &sa_family);
@@ -3277,8 +3151,7 @@ socket_connect (rpc_transport_t *this, int port)
if (sa_family == AF_UNIX) {
priv->ssl_enabled = _gf_false;
priv->mgmt_ssl = _gf_false;
- }
- else {
+ } else {
if (port > 0) {
sock_union.sin.sin_port = htons (port);
}
@@ -3376,9 +3249,8 @@ socket_connect (rpc_transport_t *this, int port)
}
/* If client wants ENOENT to be ignored */
- ign_enoent = dict_get_str_boolean (this->options,
- "transport.socket.ignore-enoent",
- _gf_false);
+ ign_enoent = dict_get_str_boolean (this->options,
+ "transport.socket.ignore-enoent", _gf_false);
ret = client_bind (this, SA (&this->myinfo.sockaddr),
&this->myinfo.sockaddr_len, priv->sock);
@@ -3388,27 +3260,17 @@ socket_connect (rpc_transport_t *this, int port)
goto handler;
}
- /* make socket non-blocking for all types of sockets */
- if (!priv->bio) {
+ if (!priv->use_ssl && !priv->bio && !priv->own_thread) {
ret = __socket_nonblock (priv->sock);
if (ret == -1) {
gf_log (this->name, GF_LOG_ERROR,
"NBIO on %d failed (%s)",
priv->sock, strerror (errno));
goto handler;
- } else {
- gf_log (this->name, GF_LOG_TRACE,
- ">>> connect() with non-blocking IO for ALL");
}
}
- this->connect_failed = _gf_false;
- priv->connect_failed = 0;
- priv->connected = 0;
-
- socket_dump_info (SA(&this->peerinfo.sockaddr), priv->is_server,
- priv->use_ssl, priv->sock, this->name,
- "connecting to");
+ this->connect_failed = _gf_false;
if (ign_enoent) {
ret = connect_loop (priv->sock,
SA (&this->peerinfo.sockaddr),
@@ -3419,8 +3281,6 @@ socket_connect (rpc_transport_t *this, int port)
this->peerinfo.sockaddr_len);
}
- connect_attempted = _gf_true;
-
if (ret == -1 && errno == ENOENT && ign_enoent) {
gf_log (this->name, GF_LOG_WARNING,
"Ignore failed connection attempt on %s, (%s) ",
@@ -3459,41 +3319,94 @@ socket_connect (rpc_transport_t *this, int port)
priv->connect_failed = 1;
goto handler;
- }
- else {
+ } else {
/* reset connect_failed so that any previous attempts
state is not carried forward */
priv->connect_failed = 0;
ret = 0;
}
+ if (priv->use_ssl && !priv->own_thread) {
+ cname = ssl_setup_connection(this, 0);
+ if (!cname) {
+ errno = ENOTCONN;
+ ret = -1;
+ gf_log(this->name, GF_LOG_ERROR,
+ "client setup failed");
+ goto handler;
+ }
+ if (priv->connected) {
+ this->ssl_name = cname;
+ } else {
+ GF_FREE(cname);
+ }
+ }
+
+ if (!priv->bio && !priv->own_thread) {
+ ret = __socket_nonblock (priv->sock);
+
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ goto handler;
+ }
+ }
+
handler:
- if (ret < 0 && !connect_attempted) {
+ if (ret < 0) {
/* Ignore error from connect. epoll events
should be handled in the socket handler. shutdown(2)
will result in EPOLLERR, so cleanup is done in
socket_event_handler or socket_poller */
shutdown (priv->sock, SHUT_RDWR);
- gf_log (this->name, GF_LOG_TRACE,
- "@@@ client shutdown(%d, SHUT_RDWR)",
- priv->sock);
}
+ /*
+ * In the own_thread case, this is used to indicate that we're
+ * initializing a client connection.
+ */
priv->connected = 0;
priv->is_server = _gf_false;
rpc_transport_ref (this);
refd = _gf_true;
- this->listener = this;
- priv->idx = event_register (ctx->event_pool, priv->sock,
- socket_event_handler,
- this, 1, 1);
- if (priv->idx == -1) {
- gf_log ("", GF_LOG_WARNING,
- "failed to register the event");
- sys_close (priv->sock);
- priv->sock = -1;
- ret = -1;
+ if (priv->own_thread) {
+ if (priv->connect_failed) {
+ gf_msg_debug (this->name, 0,
+ "socket connect is failed so close it");
+ sys_close (priv->sock);
+ priv->sock = -1;
+ ret = -1;
+ goto unlock;
+ }
+
+ if (pipe(priv->pipe) < 0) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "could not create pipe");
+ }
+
+ this->listener = this;
+ ret = socket_spawn(this);
+ if (ret) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "could not spawn thread");
+ sys_close (priv->pipe[0]);
+ sys_close (priv->pipe[1]);
+ sys_close (priv->sock);
+ priv->sock = -1;
+ }
+ } else {
+ priv->idx = event_register (ctx->event_pool, priv->sock,
+ socket_event_handler,
+ this, 1, 1);
+ if (priv->idx == -1) {
+ gf_log ("", GF_LOG_WARNING,
+ "failed to register the event");
+ sys_close (priv->sock);
+ priv->sock = -1;
+ ret = -1;
+ }
}
unlock:
@@ -3647,10 +3560,6 @@ socket_listen (rpc_transport_t *this)
goto unlock;
}
- socket_dump_info (SA(&this->myinfo.sockaddr), priv->is_server,
- priv->use_ssl, priv->sock, this->name,
- "listening on");
-
ret = listen (priv->sock, priv->backlog);
if (ret == -1) {
@@ -3692,11 +3601,11 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
{
socket_private_t *priv = NULL;
int ret = -1;
- struct ioq *entry = NULL;
- glusterfs_ctx_t *ctx = NULL;
char need_poll_out = 0;
char need_append = 1;
-
+ struct ioq *entry = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+ char a_byte = 'j';
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -3734,9 +3643,19 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
if (need_append) {
list_add_tail (&entry->list, &priv->ioq);
+ if (priv->own_thread) {
+ /*
+ * Make sure the polling thread wakes up, by
+ * writing a byte to represent this entry.
+ */
+ if (sys_write (priv->pipe[1], &a_byte, 1) < 1) {
+ gf_log(this->name, GF_LOG_WARNING,
+ "write error on pipe");
+ }
+ }
ret = 0;
}
- if (need_poll_out) {
+ if (!priv->own_thread && need_poll_out) {
/* first entry to wait. continue writing on POLLOUT */
priv->idx = event_select_on (ctx->event_pool,
priv->sock,
@@ -3756,11 +3675,11 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
{
socket_private_t *priv = NULL;
int ret = -1;
- struct ioq *entry = NULL;
- glusterfs_ctx_t *ctx = NULL;
char need_poll_out = 0;
char need_append = 1;
-
+ struct ioq *entry = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+ char a_byte = 'd';
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -3782,7 +3701,6 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
priv->submit_log = 0;
entry = __socket_ioq_new (this, &reply->msg);
-
if (!entry)
goto unlock;
@@ -3799,9 +3717,19 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
if (need_append) {
list_add_tail (&entry->list, &priv->ioq);
+ if (priv->own_thread) {
+ /*
+ * Make sure the polling thread wakes up, by
+ * writing a byte to represent this entry.
+ */
+ if (sys_write (priv->pipe[1], &a_byte, 1) < 1) {
+ gf_log(this->name, GF_LOG_WARNING,
+ "write error on pipe");
+ }
+ }
+ ret = 0;
}
-
- if (need_poll_out) {
+ if (!priv->own_thread && need_poll_out) {
/* first entry to wait. continue writing on POLLOUT */
priv->idx = event_select_on (ctx->event_pool,
priv->sock,
@@ -3978,8 +3906,7 @@ reconfigure (rpc_transport_t *this, dict_t *options)
gf_log (this->name, GF_LOG_DEBUG, "Reconfigured transport.socket.keepalive");
priv->keepalive = tmp_bool;
- }
- else
+ } else
priv->keepalive = 1;
if (dict_get_int32 (options, "transport.tcp-user-timeout",
@@ -4154,25 +4081,204 @@ fini_openssl_mt (void)
ERR_free_strings();
}
+static void
+socket_poller_mayday (socket_private_t *priv)
+{
+ if (priv == NULL)
+ return;
+
+ pthread_mutex_lock (&priv->cond_lock);
+ {
+ /* Signal waiting threads before exiting from socket_poller */
+ if (!priv->own_thread_done) {
+ gf_log ("socket", GF_LOG_TRACE, "priv->cond SIGNALED");
+ pthread_cond_signal (&priv->cond);
+ priv->own_thread_done = _gf_true;
+ }
+ }
+ pthread_mutex_unlock (&priv->cond_lock);
+}
+
static int
-ssl_setup_connection_params(rpc_transport_t *this)
+socket_init (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
+ gf_boolean_t tmp_bool = 0;
+ uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
char *optstr = NULL;
- static int session_id = 1;
+ uint32_t timeout = 0;
+ int keepaliveidle = GF_KEEPALIVE_TIME;
+ int keepaliveintvl = GF_KEEPALIVE_INTERVAL;
+ int keepalivecnt = GF_KEEPALIVE_COUNT;
+ uint32_t backlog = 0;
+ int session_id = 0;
int32_t cert_depth = DEFAULT_VERIFY_DEPTH;
char *cipher_list = DEFAULT_CIPHER_LIST;
char *dh_param = DEFAULT_DH_PARAM;
char *ec_curve = DEFAULT_EC_CURVE;
char *crl_path = NULL;
- priv = this->private;
+ if (this->private) {
+ gf_log_callingfn (this->name, GF_LOG_ERROR,
+ "double init attempted");
+ return -1;
+ }
+
+ priv = GF_CALLOC (1, sizeof (*priv), gf_common_mt_socket_private_t);
+ if (!priv) {
+ return -1;
+ }
+ memset(priv, 0, sizeof(*priv));
+
+ pthread_mutex_init (&priv->in_lock, NULL);
+ pthread_mutex_init (&priv->out_lock, NULL);
+ pthread_mutex_init (&priv->cond_lock, NULL);
+ pthread_cond_init (&priv->cond, NULL);
+
+ GF_REF_INIT (priv, socket_poller_mayday);
+
+ priv->sock = -1;
+ priv->idx = -1;
+ priv->connected = -1;
+ priv->nodelay = 1;
+ priv->bio = 0;
+ priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
+ INIT_LIST_HEAD (&priv->ioq);
+ pthread_mutex_init (&priv->notify.lock, NULL);
+ pthread_cond_init (&priv->notify.cond, NULL);
+
+ /* All the below section needs 'this->options' to be present */
+ if (!this->options)
+ goto out;
+
+ if (dict_get (this->options, "non-blocking-io")) {
+ optstr = data_to_str (dict_get (this->options,
+ "non-blocking-io"));
+
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'non-blocking-io' takes only boolean options,"
+ " not taking any action");
+ tmp_bool = 1;
+ }
+
+ if (!tmp_bool) {
+ priv->bio = 1;
+ gf_log (this->name, GF_LOG_WARNING,
+ "disabling non-blocking IO");
+ }
+ }
- if (priv->ssl_ctx != NULL) {
- gf_log (this->name, GF_LOG_TRACE, "found old SSL context!");
- return 0;
+ optstr = NULL;
+
+ /* By default, we enable NODELAY */
+ if (dict_get (this->options, "transport.socket.nodelay")) {
+ optstr = data_to_str (dict_get (this->options,
+ "transport.socket.nodelay"));
+
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'transport.socket.nodelay' takes only "
+ "boolean options, not taking any action");
+ tmp_bool = 1;
+ }
+ if (!tmp_bool) {
+ priv->nodelay = 0;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "disabling nodelay");
+ }
}
+ optstr = NULL;
+ if (dict_get_str (this->options, "tcp-window-size",
+ &optstr) == 0) {
+ if (gf_string2uint64 (optstr, &windowsize) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid number format: %s", optstr);
+ return -1;
+ }
+ }
+
+ priv->windowsize = (int)windowsize;
+
+ optstr = NULL;
+ /* Enable Keep-alive by default. */
+ priv->keepalive = 1;
+ priv->keepaliveintvl = GF_KEEPALIVE_INTERVAL;
+ priv->keepaliveidle = GF_KEEPALIVE_TIME;
+ priv->keepalivecnt = GF_KEEPALIVE_COUNT;
+ if (dict_get_str (this->options, "transport.socket.keepalive",
+ &optstr) == 0) {
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'transport.socket.keepalive' takes only "
+ "boolean options, not taking any action");
+ tmp_bool = 1;
+ }
+
+ if (!tmp_bool)
+ priv->keepalive = 0;
+ }
+
+ if (dict_get_int32 (this->options, "transport.tcp-user-timeout",
+ &(priv->timeout)) != 0)
+ priv->timeout = timeout;
+ gf_log (this->name, GF_LOG_DEBUG, "Configued "
+ "transport.tcp-user-timeout=%d", priv->timeout);
+
+ if (dict_get_int32 (this->options,
+ "transport.socket.keepalive-time",
+ &(priv->keepaliveidle)) != 0) {
+ priv->keepaliveidle = keepaliveidle;
+ }
+
+ if (dict_get_int32 (this->options,
+ "transport.socket.keepalive-interval",
+ &(priv->keepaliveintvl)) != 0) {
+ priv->keepaliveintvl = keepaliveintvl;
+ }
+
+ if (dict_get_int32 (this->options, "transport.socket.keepalive-count",
+ &(priv->keepalivecnt)) != 0)
+ priv->keepalivecnt = keepalivecnt;
+ gf_log (this->name, GF_LOG_DEBUG, "Reconfigued "
+ "transport.keepalivecnt=%d", keepalivecnt);
+
+ if (dict_get_uint32 (this->options,
+ "transport.listen-backlog",
+ &backlog) != 0) {
+
+ backlog = GLUSTERFS_SOCKET_LISTEN_BACKLOG;
+ }
+ priv->backlog = backlog;
+
+ optstr = NULL;
+
+ /* Check if socket read failures are to be logged */
+ priv->read_fail_log = 1;
+ if (dict_get (this->options, "transport.socket.read-fail-log")) {
+ optstr = data_to_str (dict_get (this->options, "transport.socket.read-fail-log"));
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "'transport.socket.read-fail-log' takes only "
+ "boolean options; logging socket read fails");
+ } else if (tmp_bool == _gf_false) {
+ priv->read_fail_log = 0;
+ }
+ }
+
+ priv->windowsize = (int)windowsize;
+
+ priv->ssl_enabled = _gf_false;
+ if (dict_get_str(this->options, SSL_ENABLED_OPT, &optstr) == 0) {
+ if (gf_string2boolean (optstr, &priv->ssl_enabled) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid value given for ssl-enabled boolean");
+ }
+ }
+ priv->mgmt_ssl = this->ctx->secure_mgmt;
+ priv->srvr_ssl = this->ctx->secure_srvr;
+
priv->ssl_own_cert = DEFAULT_CERT_PATH;
if (dict_get_str(this->options, SSL_OWN_CERT_OPT, &optstr) == 0) {
if (!priv->ssl_enabled) {
@@ -4224,6 +4330,23 @@ ssl_setup_connection_params(rpc_transport_t *this)
gf_log(this->name, priv->mgmt_ssl ? GF_LOG_INFO: GF_LOG_DEBUG,
"SSL support for glusterd is %s",
priv->mgmt_ssl ? "ENABLED" : "NOT enabled");
+ /*
+ * This might get overridden temporarily in socket_connect (q.v.)
+ * if we're using the glusterd portmapper.
+ */
+ priv->use_ssl = priv->ssl_enabled;
+
+ priv->own_thread = priv->use_ssl;
+ if (dict_get_str(this->options, OWN_THREAD_OPT, &optstr) == 0) {
+ gf_log (this->name, GF_LOG_INFO, "OWN_THREAD_OPT found");
+ if (gf_string2boolean (optstr, &priv->own_thread) != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "invalid value given for own-thread boolean");
+ }
+ }
+ gf_log(this->name, priv->own_thread ? GF_LOG_INFO : GF_LOG_DEBUG,
+ "using %s polling thread",
+ priv->own_thread ? "private" : "system");
if (!dict_get_int32 (this->options, SSL_CERT_DEPTH_OPT, &cert_depth)) {
gf_log (this->name, GF_LOG_INFO,
@@ -4258,7 +4381,7 @@ ssl_setup_connection_params(rpc_transport_t *this)
#error Old and insecure OpenSSL, use -DUSE_INSECURE_OPENSSL to use it anyway
#endif
/* SSLv23_method uses highest available protocol */
- priv->ssl_meth = SSLv23_method();
+ priv->ssl_meth = (SSL_METHOD *)SSLv23_method();
#endif
priv->ssl_ctx = SSL_CTX_new(priv->ssl_meth);
@@ -4333,7 +4456,7 @@ ssl_setup_connection_params(rpc_transport_t *this)
/* This must be done after DH and ECDH setups */
if (SSL_CTX_set_cipher_list(priv->ssl_ctx, cipher_list) == 0) {
- gf_log(this->name,GF_LOG_ERROR,
+ gf_log(this->name, GF_LOG_ERROR,
"failed to find any valid ciphers");
goto err;
}
@@ -4343,7 +4466,7 @@ ssl_setup_connection_params(rpc_transport_t *this)
if (!SSL_CTX_use_certificate_chain_file(priv->ssl_ctx,
priv->ssl_own_cert)) {
- gf_log(this->name,GF_LOG_ERROR,
+ gf_log(this->name, GF_LOG_ERROR,
"could not load our cert");
goto err;
}
@@ -4374,15 +4497,14 @@ ssl_setup_connection_params(rpc_transport_t *this)
x509store = SSL_CTX_get_cert_store(priv->ssl_ctx);
X509_STORE_set_flags(x509store,
- X509_V_FLAG_CRL_CHECK |
- X509_V_FLAG_CRL_CHECK_ALL);
+ X509_V_FLAG_CRL_CHECK|X509_V_FLAG_CRL_CHECK_ALL);
#else
gf_log(this->name, GF_LOG_ERROR,
"OpenSSL version does not support CRL");
#endif
}
- priv->ssl_session_id = session_id++;
+ priv->ssl_session_id = ++session_id;
SSL_CTX_set_session_id_context(priv->ssl_ctx,
(void *)&priv->ssl_session_id,
sizeof(priv->ssl_session_id));
@@ -4397,194 +4519,27 @@ ssl_setup_connection_params(rpc_transport_t *this)
*/
SSL_CTX_set_purpose(priv->ssl_ctx, X509_PURPOSE_ANY);
}
- return 0;
-err:
- return -1;
-}
-
-static int
-socket_init (rpc_transport_t *this)
-{
- socket_private_t *priv = NULL;
- gf_boolean_t tmp_bool = 0;
- uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
- char *optstr = NULL;
- uint32_t timeout = 0;
- int keepaliveidle = GF_KEEPALIVE_TIME;
- int keepaliveintvl = GF_KEEPALIVE_INTERVAL;
- int keepalivecnt = GF_KEEPALIVE_COUNT;
- uint32_t backlog = 0;
-
-
- if (this->private) {
- gf_log_callingfn (this->name, GF_LOG_ERROR,
- "double init attempted");
- return -1;
- }
-
- priv = GF_CALLOC (1, sizeof (*priv), gf_common_mt_socket_private_t);
- if (!priv) {
- return -1;
+ if (priv->own_thread) {
+ priv->ot_state = OT_IDLE;
}
- memset(priv, 0, sizeof(*priv));
+out:
this->private = priv;
- pthread_mutex_init (&priv->in_lock, NULL);
- pthread_mutex_init (&priv->out_lock, NULL);
- pthread_mutex_init (&priv->cond_lock, NULL);
- pthread_cond_init (&priv->cond, NULL);
-
- /*GF_REF_INIT (priv, socket_poller_mayday);*/
-
- priv->sock = -1;
- priv->idx = -1;
- priv->connected = -1;
- priv->nodelay = 1;
- priv->bio = 0;
- priv->ssl_accepted = _gf_false;
- priv->ssl_connected = _gf_false;
- priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
- INIT_LIST_HEAD (&priv->ioq);
- pthread_mutex_init (&priv->notify.lock, NULL);
- pthread_cond_init (&priv->notify.cond, NULL);
-
- /* All the below section needs 'this->options' to be present */
- if (!this->options)
- goto out;
-
- if (dict_get (this->options, "non-blocking-io")) {
- optstr = data_to_str (dict_get (this->options,
- "non-blocking-io"));
-
- if (gf_string2boolean (optstr, &tmp_bool) == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "'non-blocking-io' takes only boolean options,"
- " not taking any action");
- tmp_bool = 1;
- }
-
- if (!tmp_bool) {
- priv->bio = 1;
- gf_log (this->name, GF_LOG_WARNING,
- "disabling non-blocking IO");
- }
- }
-
- optstr = NULL;
-
- /* By default, we enable NODELAY */
- if (dict_get (this->options, "transport.socket.nodelay")) {
- optstr = data_to_str (dict_get (this->options,
- "transport.socket.nodelay"));
-
- if (gf_string2boolean (optstr, &tmp_bool) == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "'transport.socket.nodelay' takes only "
- "boolean options, not taking any action");
- tmp_bool = 1;
- }
- if (!tmp_bool) {
- priv->nodelay = 0;
- gf_log (this->name, GF_LOG_DEBUG,
- "disabling nodelay");
- }
- }
-
- optstr = NULL;
- if (dict_get_str (this->options, "tcp-window-size",
- &optstr) == 0) {
- if (gf_string2uint64 (optstr, &windowsize) != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "invalid number format: %s", optstr);
- return -1;
- }
- }
-
- priv->windowsize = (int)windowsize;
-
- optstr = NULL;
- /* Enable Keep-alive by default. */
- priv->keepalive = 1;
- priv->keepaliveintvl = GF_KEEPALIVE_INTERVAL;
- priv->keepaliveidle = GF_KEEPALIVE_TIME;
- priv->keepalivecnt = GF_KEEPALIVE_COUNT;
- if (dict_get_str (this->options, "transport.socket.keepalive",
- &optstr) == 0) {
- if (gf_string2boolean (optstr, &tmp_bool) == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "'transport.socket.keepalive' takes only "
- "boolean options, not taking any action");
- tmp_bool = 1;
- }
-
- if (!tmp_bool)
- priv->keepalive = 0;
- }
-
- if (dict_get_int32 (this->options, "transport.tcp-user-timeout",
- &(priv->timeout)) != 0)
- priv->timeout = timeout;
- gf_log (this->name, GF_LOG_DEBUG, "Configued "
- "transport.tcp-user-timeout=%d", priv->timeout);
-
- if (dict_get_int32 (this->options,
- "transport.socket.keepalive-time",
- &(priv->keepaliveidle)) != 0) {
- priv->keepaliveidle = keepaliveidle;
- }
-
- if (dict_get_int32 (this->options,
- "transport.socket.keepalive-interval",
- &(priv->keepaliveintvl)) != 0) {
- priv->keepaliveintvl = keepaliveintvl;
- }
-
- if (dict_get_int32 (this->options, "transport.socket.keepalive-count",
- &(priv->keepalivecnt)) != 0)
- priv->keepalivecnt = keepalivecnt;
- gf_log (this->name, GF_LOG_DEBUG, "Reconfigued "
- "transport.keepalivecnt=%d", keepalivecnt);
+ return 0;
- if (dict_get_uint32 (this->options,
- "transport.listen-backlog",
- &backlog) != 0) {
- backlog = GLUSTERFS_SOCKET_LISTEN_BACKLOG;
+err:
+ if (priv->ssl_own_cert) {
+ GF_FREE(priv->ssl_own_cert);
}
- priv->backlog = backlog;
-
- optstr = NULL;
-
- /* Check if socket read failures are to be logged */
- priv->read_fail_log = 1;
- if (dict_get (this->options, "transport.socket.read-fail-log")) {
- optstr = data_to_str (dict_get (this->options,
- "transport.socket.read-fail-log"));
- if (gf_string2boolean (optstr, &tmp_bool) == -1) {
- gf_log (this->name, GF_LOG_WARNING,
- "'transport.socket.read-fail-log' takes only "
- "boolean options; logging socket read fails");
- } else if (tmp_bool == _gf_false) {
- priv->read_fail_log = 0;
- }
+ if (priv->ssl_private_key) {
+ GF_FREE(priv->ssl_private_key);
}
-
- priv->windowsize = (int)windowsize;
-
- priv->ssl_enabled = _gf_false;
- if (dict_get_str(this->options, SSL_ENABLED_OPT, &optstr) == 0) {
- if (gf_string2boolean (optstr, &priv->ssl_enabled) != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "invalid value given for ssl-enabled boolean");
- }
+ if (priv->ssl_ca_list) {
+ GF_FREE(priv->ssl_ca_list);
}
- priv->mgmt_ssl = this->ctx->secure_mgmt;
- priv->srvr_ssl = this->ctx->secure_srvr;
-
- ssl_setup_connection_params(this);
-out:
- this->private = priv;
- return 0;
+ GF_FREE(priv);
+ return -1;
}
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index fdfc20774a8..ccc2a84cb35 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -248,36 +248,12 @@ typedef struct {
char *ssl_ca_list;
pthread_t thread;
int pipe[2];
+ gf_boolean_t own_thread;
+ gf_boolean_t own_thread_done;
+ ot_state_t ot_state;
+ uint32_t ot_gen;
gf_boolean_t is_server;
int log_ctr;
- gf_boolean_t ssl_accepted; /* To indicate SSL_accept() */
- gf_boolean_t ssl_connected;/* or SSL_connect() has been
- * been completed on this socket.
- * These are valid only when
- * use_ssl is true.
- */
- /* SSL_CTX is created for each transport. Since we are now using non-
- * blocking mechanism for SSL_accept() and SSL_connect(), the SSL
- * context is created on the first EPOLLIN event which may lead to
- * SSL_ERROR_WANT_READ/SSL_ERROR_WANT_WRITE and may not complete the
- * SSL connection at the first attempt.
- * ssl_context_created is a flag to note that we've created the SSL
- * context for the connection so that we don't blindly create any more
- * while !ssl_accepted or !ssl_connected.
- */
- gf_boolean_t ssl_context_created;
- gf_boolean_t accepted; /* explicit flag to be set in
- * socket_event_handler() for
- * newly accepted socket
- */
-
- /* ssl_error_required is used only during the SSL connection setup
- * phase.
- * It holds the error code returned by SSL_get_error() and is used to
- * arm the epoll event set for the required event for the specific fd.
- */
- int ssl_error_required;
-
GF_REF_DECL; /* refcount to keep track of socket_poller
threads */
struct {