summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-transport/socket/src/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c2699
1 files changed, 1947 insertions, 752 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 9e85c3ca9..93da3f296 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -1,20 +1,11 @@
/*
- Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com>
+ Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
- GlusterFS is free software; you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as published
- by the Free Software Foundation; either version 3 of the License,
- or (at your option) any later version.
-
- GlusterFS is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see
- <http://www.gnu.org/licenses/>.
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
*/
@@ -37,104 +28,408 @@
/* ugly #includes below */
#include "protocol-common.h"
#include "glusterfs3-xdr.h"
-#include "glusterfs3.h"
+#include "xdr-nfs3.h"
+#include "rpcsvc.h"
#include <fcntl.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <rpc/xdr.h>
-
+#include <sys/ioctl.h>
#define GF_LOG_ERRNO(errno) ((errno == ENOTCONN) ? GF_LOG_DEBUG : GF_LOG_ERROR)
#define SA(ptr) ((struct sockaddr *)ptr)
+#define SSL_ENABLED_OPT "transport.socket.ssl-enabled"
+#define SSL_OWN_CERT_OPT "transport.socket.ssl-own-cert"
+#define SSL_PRIVATE_KEY_OPT "transport.socket.ssl-private-key"
+#define SSL_CA_LIST_OPT "transport.socket.ssl-ca-list"
+#define OWN_THREAD_OPT "transport.socket.own-thread"
+
+/* TBD: do automake substitutions etc. (ick) to set these. */
+#if !defined(DEFAULT_CERT_PATH)
+#define DEFAULT_CERT_PATH "/etc/ssl/glusterfs.pem"
+#endif
+#if !defined(DEFAULT_KEY_PATH)
+#define DEFAULT_KEY_PATH "/etc/ssl/glusterfs.key"
+#endif
+#if !defined(DEFAULT_CA_PATH)
+#define DEFAULT_CA_PATH "/etc/ssl/glusterfs.ca"
+#endif
-#define __socket_proto_reset_pending(priv) do { \
- memset (&priv->incoming.frag.vector, 0, \
- sizeof (priv->incoming.frag.vector)); \
- priv->incoming.frag.pending_vector = \
- &priv->incoming.frag.vector; \
- priv->incoming.frag.pending_vector->iov_base = \
- priv->incoming.frag.fragcurrent; \
- priv->incoming.pending_vector = \
- priv->incoming.frag.pending_vector; \
- } while (0);
-
-
-#define __socket_proto_update_pending(priv) \
- do { \
- uint32_t remaining_fragsize = 0; \
- if (priv->incoming.frag.pending_vector->iov_len == 0) { \
- remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \
- - priv->incoming.frag.bytes_read; \
- \
- priv->incoming.frag.pending_vector->iov_len = \
- remaining_fragsize > priv->incoming.frag.remaining_size \
- ? priv->incoming.frag.remaining_size : remaining_fragsize; \
- \
- priv->incoming.frag.remaining_size -= \
- priv->incoming.frag.pending_vector->iov_len; \
- } \
- } while (0);
-
-#define __socket_proto_update_priv_after_read(priv, ret, bytes_read) \
- { \
- priv->incoming.frag.fragcurrent += bytes_read; \
- priv->incoming.frag.bytes_read += bytes_read; \
- \
- if ((ret > 0) || (priv->incoming.frag.remaining_size != 0)) { \
- if (priv->incoming.frag.remaining_size != 0) { \
- __socket_proto_reset_pending (priv); \
- } \
- \
- gf_log (this->name, GF_LOG_TRACE, "partial read on non-blocking socket"); \
- \
- break; \
- } \
- }
-
-#define __socket_proto_init_pending(priv, size) \
- do { \
- uint32_t remaining_fragsize = 0; \
- remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \
- - priv->incoming.frag.bytes_read; \
- \
- __socket_proto_reset_pending (priv); \
- \
- priv->incoming.frag.pending_vector->iov_len = \
- remaining_fragsize > size ? size : remaining_fragsize; \
- \
- priv->incoming.frag.remaining_size = \
- size - priv->incoming.frag.pending_vector->iov_len; \
- \
-} while (0);
+#define POLL_MASK_INPUT (POLLIN | POLLPRI)
+#define POLL_MASK_OUTPUT (POLLOUT)
+#define POLL_MASK_ERROR (POLLERR | POLLHUP | POLLNVAL)
+
+typedef int SSL_unary_func (SSL *);
+typedef int SSL_trinary_func (SSL *, void *, int);
+
+#define __socket_proto_reset_pending(priv) do { \
+ struct gf_sock_incoming_frag *frag; \
+ frag = &priv->incoming.frag; \
+ \
+ memset (&frag->vector, 0, sizeof (frag->vector)); \
+ frag->pending_vector = &frag->vector; \
+ frag->pending_vector->iov_base = frag->fragcurrent; \
+ priv->incoming.pending_vector = frag->pending_vector; \
+ } while (0)
+
+
+#define __socket_proto_update_pending(priv) \
+ do { \
+ uint32_t remaining; \
+ struct gf_sock_incoming_frag *frag; \
+ frag = &priv->incoming.frag; \
+ if (frag->pending_vector->iov_len == 0) { \
+ remaining = (RPC_FRAGSIZE (priv->incoming.fraghdr) \
+ - frag->bytes_read); \
+ \
+ frag->pending_vector->iov_len = \
+ (remaining > frag->remaining_size) \
+ ? frag->remaining_size : remaining; \
+ \
+ frag->remaining_size -= \
+ frag->pending_vector->iov_len; \
+ } \
+ } while (0)
+
+#define __socket_proto_update_priv_after_read(priv, ret, bytes_read) \
+ { \
+ struct gf_sock_incoming_frag *frag; \
+ frag = &priv->incoming.frag; \
+ \
+ frag->fragcurrent += bytes_read; \
+ frag->bytes_read += bytes_read; \
+ \
+ if ((ret > 0) || (frag->remaining_size != 0)) { \
+ if (frag->remaining_size != 0 && ret == 0) { \
+ __socket_proto_reset_pending (priv); \
+ } \
+ \
+ gf_log (this->name, GF_LOG_TRACE, \
+ "partial read on non-blocking socket"); \
+ \
+ break; \
+ } \
+ }
+
+#define __socket_proto_init_pending(priv,size) \
+ do { \
+ uint32_t remaining = 0; \
+ struct gf_sock_incoming_frag *frag; \
+ frag = &priv->incoming.frag; \
+ \
+ remaining = (RPC_FRAGSIZE (priv->incoming.fraghdr) \
+ - frag->bytes_read); \
+ \
+ __socket_proto_reset_pending (priv); \
+ \
+ frag->pending_vector->iov_len = \
+ (remaining > size) ? size : remaining; \
+ \
+ frag->remaining_size = (size - frag->pending_vector->iov_len); \
+ \
+ } while(0)
/* This will be used in a switch case and breaks from the switch case if all
* the pending data is not read.
*/
-#define __socket_proto_read(priv, ret) \
- { \
- size_t bytes_read = 0; \
- \
- __socket_proto_update_pending (priv); \
- \
- ret = __socket_readv (this, \
- priv->incoming.pending_vector, 1, \
- &priv->incoming.pending_vector, \
- &priv->incoming.pending_count, \
- &bytes_read); \
- if (ret == -1) { \
- gf_log (this->name, GF_LOG_TRACE, \
- "reading from socket failed. Error (%s), " \
- "peer (%s)", strerror (errno), \
- this->peerinfo.identifier); \
- break; \
- } \
+#define __socket_proto_read(priv, ret) \
+ { \
+ size_t bytes_read = 0; \
+ struct gf_sock_incoming *in; \
+ in = &priv->incoming; \
+ \
+ __socket_proto_update_pending (priv); \
+ \
+ ret = __socket_readv (this, \
+ in->pending_vector, 1, \
+ &in->pending_vector, \
+ &in->pending_count, \
+ &bytes_read); \
+ if (ret == -1) \
+ break; \
__socket_proto_update_priv_after_read (priv, ret, bytes_read); \
- }
+ }
+
+static int socket_init (rpc_transport_t *this);
+
+static void
+ssl_dump_error_stack (const char *caller)
+{
+ unsigned long errnum = 0;
+ char errbuf[120] = {0,};
+
+ /* OpenSSL docs explicitly give 120 as the error-string length. */
+
+ while ((errnum = ERR_get_error())) {
+ ERR_error_string(errnum,errbuf);
+ gf_log(caller,GF_LOG_ERROR," %s",errbuf);
+ }
+}
+
+static int
+ssl_do (rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func)
+{
+ int r = (-1);
+ struct pollfd pfd = {-1,};
+ socket_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO(this->name,this->private,out);
+ priv = this->private;
+
+ for (;;) {
+ if (buf) {
+ if (priv->connected == -1) {
+ /*
+ * Fields in the SSL structure (especially
+ * the BIO pointers) are not valid at this
+ * point, so we'll segfault if we pass them
+ * to SSL_read/SSL_write.
+ */
+ gf_log(this->name,GF_LOG_INFO,
+ "lost connection in %s", __func__);
+ break;
+ }
+ r = func(priv->ssl_ssl,buf,len);
+ }
+ else {
+ /*
+ * We actually need these functions to get to
+ * priv->connected == 1.
+ */
+ r = ((SSL_unary_func *)func)(priv->ssl_ssl);
+ }
+ switch (SSL_get_error(priv->ssl_ssl,r)) {
+ case SSL_ERROR_NONE:
+ return r;
+ case SSL_ERROR_WANT_READ:
+ pfd.fd = priv->sock;
+ pfd.events = POLLIN;
+ if (poll(&pfd,1,-1) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,"poll error %d",
+ errno);
+ }
+ break;
+ case SSL_ERROR_WANT_WRITE:
+ pfd.fd = priv->sock;
+ pfd.events = POLLOUT;
+ if (poll(&pfd,1,-1) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,"poll error %d",
+ errno);
+ }
+ break;
+ case SSL_ERROR_SYSCALL:
+ /* This is what we get when remote disconnects. */
+ gf_log(this->name,GF_LOG_DEBUG,
+ "syscall error (probably remote disconnect)");
+ errno = ENODATA;
+ goto out;
+ default:
+ errno = EIO;
+ goto out; /* "break" would just loop again */
+ }
+ }
+out:
+ return -1;
+}
+
+#define ssl_connect_one(t) ssl_do((t),NULL,0,(SSL_trinary_func *)SSL_connect)
+#define ssl_accept_one(t) ssl_do((t),NULL,0,(SSL_trinary_func *)SSL_accept)
+#define ssl_read_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_read)
+#define ssl_write_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_write)
+
+static int
+ssl_setup_connection (rpc_transport_t *this, int server)
+{
+ X509 *peer = NULL;
+ char peer_CN[256] = "";
+ int ret = -1;
+ socket_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO(this->name,this->private,done);
+ priv = this->private;
+
+ priv->ssl_ssl = SSL_new(priv->ssl_ctx);
+ if (!priv->ssl_ssl) {
+ gf_log(this->name,GF_LOG_ERROR,"SSL_new failed");
+ ssl_dump_error_stack(this->name);
+ goto done;
+ }
+ priv->ssl_sbio = BIO_new_socket(priv->sock,BIO_NOCLOSE);
+ if (!priv->ssl_sbio) {
+ gf_log(this->name,GF_LOG_ERROR,"BIO_new_socket failed");
+ ssl_dump_error_stack(this->name);
+ goto free_ssl;
+ }
+ SSL_set_bio(priv->ssl_ssl,priv->ssl_sbio,priv->ssl_sbio);
+
+ if (server) {
+ ret = ssl_accept_one(this);
+ }
+ else {
+ ret = ssl_connect_one(this);
+ }
+
+ /* Make sure _the call_ succeeded. */
+ if (ret < 0) {
+ goto ssl_error;
+ }
+
+ /* Make sure _SSL verification_ succeeded, yielding an identity. */
+ if (SSL_get_verify_result(priv->ssl_ssl) != X509_V_OK) {
+ goto ssl_error;
+ }
+ peer = SSL_get_peer_certificate(priv->ssl_ssl);
+ if (!peer) {
+ goto ssl_error;
+ }
+ /* Finally, everything seems OK. */
+ X509_NAME_get_text_by_NID(X509_get_subject_name(peer),
+ NID_commonName, peer_CN, sizeof(peer_CN)-1);
+ peer_CN[sizeof(peer_CN)-1] = '\0';
+ gf_log(this->name,GF_LOG_INFO,"peer CN = %s", peer_CN);
+ return 0;
+
+ /* Error paths. */
+ssl_error:
+ gf_log(this->name,GF_LOG_ERROR,"SSL connect error");
+ ssl_dump_error_stack(this->name);
+free_ssl:
+ SSL_free(priv->ssl_ssl);
+ priv->ssl_ssl = NULL;
+done:
+ return ret;
+}
+
+
+static void
+ssl_teardown_connection (socket_private_t *priv)
+{
+ SSL_shutdown(priv->ssl_ssl);
+ SSL_clear(priv->ssl_ssl);
+ SSL_free(priv->ssl_ssl);
+ priv->ssl_ssl = NULL;
+}
+
+
+static ssize_t
+__socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount)
+{
+ socket_private_t *priv = NULL;
+ int sock = -1;
+ int ret = -1;
+
+ priv = this->private;
+ sock = priv->sock;
+
+ if (priv->use_ssl) {
+ ret = ssl_read_one (this, opvector->iov_base, opvector->iov_len);
+ } else {
+ ret = readv (sock, opvector, opcount);
+ }
+
+ return ret;
+}
+
+
+static ssize_t
+__socket_ssl_read (rpc_transport_t *this, void *buf, size_t count)
+{
+ struct iovec iov = {0, };
+ int ret = -1;
+
+ iov.iov_base = buf;
+ iov.iov_len = count;
+
+ ret = __socket_ssl_readv (this, &iov, 1);
+
+ return ret;
+}
-int socket_init (rpc_transport_t *this);
+
+static int
+__socket_cached_read (rpc_transport_t *this, struct iovec *opvector, int opcount)
+{
+ socket_private_t *priv = NULL;
+ int sock = -1;
+ struct gf_sock_incoming *in = NULL;
+ int req_len = -1;
+ int ret = -1;
+
+ priv = this->private;
+ sock = priv->sock;
+ in = &priv->incoming;
+ req_len = iov_length (opvector, opcount);
+
+ if (in->record_state == SP_STATE_READING_FRAGHDR) {
+ in->ra_read = 0;
+ in->ra_served = 0;
+ in->ra_max = 0;
+ in->ra_buf = NULL;
+ goto uncached;
+ }
+
+ if (!in->ra_max) {
+ /* first call after passing SP_STATE_READING_FRAGHDR */
+ in->ra_max = min (RPC_FRAGSIZE (in->fraghdr), GF_SOCKET_RA_MAX);
+ /* Note that the in->iobuf is the primary iobuf into which
+ headers are read into. By using this itself as our
+ read-ahead cache, we can avoid memory copies in iov_load
+ */
+ in->ra_buf = iobuf_ptr (in->iobuf);
+ }
+
+ /* fill read-ahead */
+ if (in->ra_read < in->ra_max) {
+ ret = __socket_ssl_read (this, &in->ra_buf[in->ra_read],
+ (in->ra_max - in->ra_read));
+ if (ret > 0)
+ in->ra_read += ret;
+
+ /* we proceed to test if there is still cached data to
+ be served even if readahead could not progress */
+ }
+
+ /* serve cached */
+ if (in->ra_served < in->ra_read) {
+ ret = iov_load (opvector, opcount, &in->ra_buf[in->ra_served],
+ min (req_len, (in->ra_read - in->ra_served)));
+
+ in->ra_served += ret;
+ /* Do not read uncached and cached in the same call */
+ goto out;
+ }
+
+ if (in->ra_read < in->ra_max)
+ /* If there was no cached data to be served, (and we are
+ guaranteed to have already performed an attempt to progress
+ readahead above), and we have not yet read out the full
+ readahead capacity, then bail out for now without doing
+ the uncached read below (as that will overtake future cached
+ read)
+ */
+ goto out;
+uncached:
+ ret = __socket_ssl_readv (this, opvector, opcount);
+out:
+ return ret;
+}
+
+static gf_boolean_t
+__does_socket_rwv_error_need_logging (socket_private_t *priv, int write)
+{
+ int read = !write;
+
+ if (priv->connected == -1) /* Didn't even connect, of course it fails */
+ return _gf_false;
+
+ if (read && (priv->read_fail_log == _gf_false))
+ return _gf_false;
+
+ return _gf_true;
+}
/*
* return value:
@@ -143,7 +438,7 @@ int socket_init (rpc_transport_t *this);
* > 0 = incomplete
*/
-int
+static int
__socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
struct iovec **pending_vector, int *pending_count, size_t *bytes,
int write)
@@ -155,39 +450,60 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
int opcount = 0;
int moved = 0;
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
sock = priv->sock;
- opvector = vector;
- opcount = count;
+ opvector = vector;
+ opcount = count;
if (bytes != NULL) {
*bytes = 0;
}
- while (opcount) {
+ while (opcount > 0) {
+ if (opvector->iov_len == 0) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "would have passed zero length to read/write");
+ ++opvector;
+ --opcount;
+ continue;
+ }
if (write) {
- ret = writev (sock, opvector, opcount);
+ if (priv->use_ssl) {
+ ret = ssl_write_one(this,
+ opvector->iov_base, opvector->iov_len);
+ }
+ else {
+ ret = writev (sock, opvector, opcount);
+ }
if (ret == 0 || (ret == -1 && errno == EAGAIN)) {
/* done for now */
break;
}
+ this->total_bytes_write += ret;
} else {
- ret = readv (sock, opvector, opcount);
+ ret = __socket_cached_read (this, opvector, opcount);
+
+ if (ret == 0) {
+ gf_log(this->name,GF_LOG_DEBUG,"EOF on socket");
+ errno = ENODATA;
+ ret = -1;
+ }
if (ret == -1 && errno == EAGAIN) {
/* done for now */
break;
}
+ this->total_bytes_read += ret;
}
if (ret == 0) {
/* Mostly due to 'umount' in client */
- gf_log (this->name, GF_LOG_TRACE,
+ gf_log (this->name, GF_LOG_DEBUG,
"EOF from peer %s", this->peerinfo.identifier);
opcount = -1;
errno = ENOTCONN;
@@ -197,9 +513,18 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
if (errno == EINTR)
continue;
- gf_log (this->name, GF_LOG_TRACE,
- "%s failed (%s)", write ? "writev" : "readv",
- strerror (errno));
+ if (__does_socket_rwv_error_need_logging (priv,
+ write)) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "%s on %s failed (%s)",
+ write ? "writev":"readv",
+ this->peerinfo.identifier,
+ strerror (errno));
+ }
+
+ if (priv->use_ssl) {
+ ssl_dump_error_stack(this->name);
+ }
opcount = -1;
break;
}
@@ -211,6 +536,17 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
moved = 0;
while (moved < ret) {
+ if (!opcount) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "ran out of iov, moved %d/%d",
+ moved, ret);
+ goto ran_out;
+ }
+ if (!opvector[0].iov_len) {
+ opvector++;
+ opcount--;
+ continue;
+ }
if ((ret - moved) >= opvector[0].iov_len) {
moved += opvector[0].iov_len;
opvector++;
@@ -220,13 +556,11 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
opvector[0].iov_base += (ret - moved);
moved += (ret - moved);
}
- while (opcount && !opvector[0].iov_len) {
- opvector++;
- opcount--;
- }
}
}
+ran_out:
+
if (pending_vector)
*pending_vector = opvector;
@@ -238,7 +572,7 @@ out:
}
-int
+static int
__socket_readv (rpc_transport_t *this, struct iovec *vector, int count,
struct iovec **pending_vector, int *pending_count,
size_t *bytes)
@@ -246,43 +580,75 @@ __socket_readv (rpc_transport_t *this, struct iovec *vector, int count,
int ret = -1;
ret = __socket_rwv (this, vector, count,
- pending_vector, pending_count, bytes, 0);
+ pending_vector, pending_count, bytes, 0);
return ret;
}
-int
+static int
__socket_writev (rpc_transport_t *this, struct iovec *vector, int count,
struct iovec **pending_vector, int *pending_count)
{
int ret = -1;
ret = __socket_rwv (this, vector, count,
- pending_vector, pending_count, NULL, 1);
+ pending_vector, pending_count, NULL, 1);
return ret;
}
-int
-__socket_disconnect (rpc_transport_t *this)
+static int
+__socket_shutdown (rpc_transport_t *this)
{
- socket_private_t *priv = NULL;
int ret = -1;
+ socket_private_t *priv = this->private;
- if (!this || !this->private)
- goto out;
+ priv->connected = -1;
+ ret = shutdown (priv->sock, SHUT_RDWR);
+ if (ret) {
+ /* its already disconnected.. no need to understand
+ why it failed to shutdown in normal cases */
+ gf_log (this->name, GF_LOG_DEBUG,
+ "shutdown() returned %d. %s",
+ ret, strerror (errno));
+ }
+ return ret;
+}
+
+static int
+__socket_disconnect (rpc_transport_t *this)
+{
+ int ret = -1;
+ socket_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
+ gf_log (this->name, GF_LOG_TRACE,
+ "disconnecting %p, state=%u gen=%u sock=%d", this,
+ priv->ot_state, priv->ot_gen, priv->sock);
+
if (priv->sock != -1) {
- ret = shutdown (priv->sock, SHUT_RDWR);
- priv->connected = -1;
- gf_log (this->name, GF_LOG_TRACE,
- "shutdown() returned %d. set connection state to -1",
- ret);
+ ret = __socket_shutdown(this);
+ if (priv->own_thread) {
+ /*
+ * Without this, reconnect (= disconnect + connect)
+ * won't work except by accident.
+ */
+ close(priv->sock);
+ priv->sock = -1;
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_PLEASE_DIE on %p", this);
+ priv->ot_state = OT_PLEASE_DIE;
+ }
+ else if (priv->use_ssl) {
+ ssl_teardown_connection(priv);
+ }
}
out:
@@ -290,20 +656,22 @@ out:
}
-int
+static int
__socket_server_bind (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
int ret = -1;
- int opt = 1;
+ int opt = 1;
+ int reuse_check_sock = -1;
+ struct sockaddr_storage unix_addr = {0};
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
- priv = this->private;
+ priv = this->private;
ret = setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR,
- &opt, sizeof (opt));
+ &opt, sizeof (opt));
if (ret == -1) {
gf_log (this->name, GF_LOG_ERROR,
@@ -311,8 +679,23 @@ __socket_server_bind (rpc_transport_t *this)
strerror (errno));
}
+ /* reuse-address doesn't work for unix type sockets */
+ if (AF_UNIX == SA (&this->myinfo.sockaddr)->sa_family) {
+ memcpy (&unix_addr, SA (&this->myinfo.sockaddr),
+ this->myinfo.sockaddr_len);
+ reuse_check_sock = socket (AF_UNIX, SOCK_STREAM, 0);
+ if (reuse_check_sock >= 0) {
+ ret = connect (reuse_check_sock, SA (&unix_addr),
+ this->myinfo.sockaddr_len);
+ if ((ret == -1) && (ECONNREFUSED == errno)) {
+ unlink (((struct sockaddr_un*)&unix_addr)->sun_path);
+ }
+ close (reuse_check_sock);
+ }
+ }
+
ret = bind (priv->sock, (struct sockaddr *)&this->myinfo.sockaddr,
- this->myinfo.sockaddr_len);
+ this->myinfo.sockaddr_len);
if (ret == -1) {
gf_log (this->name, GF_LOG_ERROR,
@@ -329,7 +712,7 @@ out:
}
-int
+static int
__socket_nonblock (int fd)
{
int flags = 0;
@@ -343,23 +726,83 @@ __socket_nonblock (int fd)
return ret;
}
-
-int
+static int
__socket_nodelay (int fd)
{
int on = 1;
int ret = -1;
ret = setsockopt (fd, IPPROTO_TCP, TCP_NODELAY,
- &on, sizeof (on));
+ &on, sizeof (on));
if (!ret)
- gf_log ("", GF_LOG_TRACE,
+ gf_log (THIS->name, GF_LOG_TRACE,
"NODELAY enabled for socket %d", fd);
return ret;
}
-int
+
+static int
+__socket_keepalive (int fd, int family, int keepalive_intvl, int keepalive_idle)
+{
+ int on = 1;
+ int ret = -1;
+
+ ret = setsockopt (fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof (on));
+ if (ret == -1) {
+ gf_log ("socket", GF_LOG_WARNING,
+ "failed to set keep alive option on socket %d", fd);
+ goto err;
+ }
+
+ if (keepalive_intvl == GF_USE_DEFAULT_KEEPALIVE)
+ goto done;
+
+#if !defined(GF_LINUX_HOST_OS) && !defined(__NetBSD__)
+#ifdef GF_SOLARIS_HOST_OS
+ ret = setsockopt (fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive_intvl,
+ sizeof (keepalive_intvl));
+#else
+ ret = setsockopt (fd, IPPROTO_TCP, TCP_KEEPALIVE, &keepalive_intvl,
+ sizeof (keepalive_intvl));
+#endif
+ if (ret == -1) {
+ gf_log ("socket", GF_LOG_WARNING,
+ "failed to set keep alive interval on socket %d", fd);
+ goto err;
+ }
+#else
+ if (family != AF_INET)
+ goto done;
+
+ ret = setsockopt (fd, IPPROTO_TCP, TCP_KEEPIDLE, &keepalive_idle,
+ sizeof (keepalive_intvl));
+ if (ret == -1) {
+ gf_log ("socket", GF_LOG_WARNING,
+ "failed to set keep idle %d on socket %d, %s",
+ keepalive_idle, fd, strerror(errno));
+ goto err;
+ }
+ ret = setsockopt (fd, IPPROTO_TCP , TCP_KEEPINTVL, &keepalive_intvl,
+ sizeof (keepalive_intvl));
+ if (ret == -1) {
+ gf_log ("socket", GF_LOG_WARNING,
+ "failed to set keep interval %d on socket %d, %s",
+ keepalive_intvl, fd, strerror(errno));
+ goto err;
+ }
+#endif
+
+done:
+ gf_log (THIS->name, GF_LOG_TRACE, "Keep-alive enabled for socket %d, interval "
+ "%d, idle: %d", fd, keepalive_intvl, keepalive_idle);
+
+err:
+ return ret;
+}
+
+
+static int
__socket_connect_finish (int fd)
{
int ret = -1;
@@ -377,13 +820,13 @@ __socket_connect_finish (int fd)
}
-void
+static void
__socket_reset (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
@@ -398,9 +841,7 @@ __socket_reset (rpc_transport_t *this)
iobuf_unref (priv->incoming.iobuf);
}
- if (priv->incoming.request_info != NULL) {
- GF_FREE (priv->incoming.request_info);
- }
+ GF_FREE (priv->incoming.request_info);
memset (&priv->incoming, 0, sizeof (priv->incoming));
@@ -416,13 +857,13 @@ out:
}
-void
+static void
socket_set_lastfrag (uint32_t *fragsize) {
(*fragsize) |= 0x80000000U;
}
-void
+static void
socket_set_frag_header_size (uint32_t size, char *haddr)
{
size = htonl (size);
@@ -430,22 +871,21 @@ socket_set_frag_header_size (uint32_t size, char *haddr)
}
-void
+static void
socket_set_last_frag_header_size (uint32_t size, char *haddr)
{
socket_set_lastfrag (&size);
socket_set_frag_header_size (size, haddr);
}
-struct ioq *
+static struct ioq *
__socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg)
{
struct ioq *entry = NULL;
int count = 0;
uint32_t size = 0;
- if (!this)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
/* TODO: use mem-pool */
entry = GF_CALLOC (1, sizeof (*entry), gf_common_mt_ioq);
@@ -454,7 +894,7 @@ __socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg)
count = msg->rpchdrcount + msg->proghdrcount + msg->progpayloadcount;
- assert (count <= (MAX_IOVEC - 1));
+ GF_ASSERT (count <= (MAX_IOVEC - 1));
size = iov_length (msg->rpchdr, msg->rpchdrcount)
+ iov_length (msg->proghdr, msg->proghdrcount)
@@ -505,11 +945,10 @@ out:
}
-void
+static void
__socket_ioq_entry_free (struct ioq *entry)
{
- if (!entry)
- return;
+ GF_VALIDATE_OR_GOTO ("socket", entry, out);
list_del_init (&entry->list);
if (entry->iobref)
@@ -517,17 +956,20 @@ __socket_ioq_entry_free (struct ioq *entry)
/* TODO: use mem-pool */
GF_FREE (entry);
+
+out:
+ return;
}
-void
+static void
__socket_ioq_flush (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
struct ioq *entry = NULL;
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
@@ -541,35 +983,49 @@ out:
}
-int
-__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry)
+static int
+__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct)
{
- int ret = -1;
+ int ret = -1;
+ socket_private_t *priv = NULL;
+ char a_byte = 0;
ret = __socket_writev (this, entry->pending_vector,
- entry->pending_count,
+ entry->pending_count,
&entry->pending_vector,
- &entry->pending_count);
+ &entry->pending_count);
if (ret == 0) {
/* current entry was completely written */
- assert (entry->pending_count == 0);
+ GF_ASSERT (entry->pending_count == 0);
__socket_ioq_entry_free (entry);
+ priv = this->private;
+ if (priv->own_thread) {
+ /*
+ * The pipe should only remain readable if there are
+ * more entries after this, so drain the byte
+ * representing this entry.
+ */
+ if (!direct && read(priv->pipe[0],&a_byte,1) < 1) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "read error on pipe");
+ }
+ }
}
return ret;
}
-int
+static int
__socket_ioq_churn (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
int ret = 0;
struct ioq *entry = NULL;
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
@@ -577,16 +1033,16 @@ __socket_ioq_churn (rpc_transport_t *this)
/* pick next entry */
entry = priv->ioq_next;
- ret = __socket_ioq_churn_entry (this, entry);
+ ret = __socket_ioq_churn_entry (this, entry, 0);
if (ret != 0)
break;
}
- if (list_empty (&priv->ioq)) {
+ if (!priv->own_thread && list_empty (&priv->ioq)) {
/* all pending writes done, not interested in POLLOUT */
priv->idx = event_select_on (this->ctx->event_pool,
- priv->sock, priv->idx, -1, 0);
+ priv->sock, priv->idx, -1, 0);
}
out:
@@ -594,14 +1050,14 @@ out:
}
-int
+static int
socket_event_poll_err (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
int ret = -1;
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
@@ -619,14 +1075,14 @@ out:
}
-int
+static int
socket_event_poll_out (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
int ret = -1;
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
@@ -649,56 +1105,58 @@ out:
}
-inline int
+static inline int
__socket_read_simple_msg (rpc_transport_t *this)
{
- socket_private_t *priv = NULL;
- int ret = 0;
- uint32_t remaining_size = 0;
- size_t bytes_read = 0;
+ int ret = 0;
+ uint32_t remaining_size = 0;
+ size_t bytes_read = 0;
+ socket_private_t *priv = NULL;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
- switch (priv->incoming.frag.simple_state) {
+ in = &priv->incoming;
+ frag = &in->frag;
+
+ switch (frag->simple_state) {
case SP_STATE_SIMPLE_MSG_INIT:
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
__socket_proto_init_pending (priv, remaining_size);
- priv->incoming.frag.simple_state =
- SP_STATE_READING_SIMPLE_MSG;
+ frag->simple_state = SP_STATE_READING_SIMPLE_MSG;
/* fall through */
case SP_STATE_READING_SIMPLE_MSG:
ret = 0;
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
if (remaining_size > 0) {
ret = __socket_readv (this,
- priv->incoming.pending_vector, 1,
- &priv->incoming.pending_vector,
- &priv->incoming.pending_count,
+ in->pending_vector, 1,
+ &in->pending_vector,
+ &in->pending_count,
&bytes_read);
}
if (ret == -1) {
- gf_log (this->name, GF_LOG_TRACE,
+ gf_log (this->name, GF_LOG_WARNING,
"reading from socket failed. Error (%s), "
"peer (%s)", strerror (errno),
this->peerinfo.identifier);
break;
}
- priv->incoming.frag.bytes_read += bytes_read;
- priv->incoming.frag.fragcurrent += bytes_read;
+ frag->bytes_read += bytes_read;
+ frag->fragcurrent += bytes_read;
if (ret > 0) {
gf_log (this->name, GF_LOG_TRACE,
@@ -707,8 +1165,7 @@ __socket_read_simple_msg (rpc_transport_t *this)
}
if (ret == 0) {
- priv->incoming.frag.simple_state
- = SP_STATE_SIMPLE_MSG_INIT;
+ frag->simple_state = SP_STATE_SIMPLE_MSG_INIT;
}
}
@@ -717,7 +1174,7 @@ out:
}
-inline int
+static inline int
__socket_read_simple_request (rpc_transport_t *this)
{
return __socket_read_simple_msg (this);
@@ -728,9 +1185,14 @@ __socket_read_simple_request (rpc_transport_t *this)
#define rpc_verf_addr(fragcurrent) (fragcurrent - 4)
+#define rpc_msgtype_addr(buf) (buf + 4)
+
+#define rpc_prognum_addr(buf) (buf + RPC_MSGTYPE_SIZE + 4)
+#define rpc_progver_addr(buf) (buf + RPC_MSGTYPE_SIZE + 8)
+#define rpc_procnum_addr(buf) (buf + RPC_MSGTYPE_SIZE + 12)
-inline int
-__socket_read_vectored_request (rpc_transport_t *this)
+static inline int
+__socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vector_sizer)
{
socket_private_t *priv = NULL;
int ret = 0;
@@ -738,17 +1200,27 @@ __socket_read_vectored_request (rpc_transport_t *this)
char *addr = NULL;
struct iobuf *iobuf = NULL;
uint32_t remaining_size = 0;
- uint32_t gluster_write_proc_len = 0;
- gfs3_write_req write_req = {0, };
+ ssize_t readsize = 0;
+ size_t size = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
+ sp_rpcfrag_request_state_t *request = NULL;
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
- switch (priv->incoming.frag.call_body.request.vector_state) {
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
+ request = &frag->call_body.request;
+
+ switch (request->vector_state) {
case SP_STATE_VECTORED_REQUEST_INIT:
- addr = rpc_cred_addr (iobuf_ptr (priv->incoming.iobuf));
+ request->vector_sizer_state = 0;
+
+ addr = rpc_cred_addr (iobuf_ptr (in->iobuf));
/* also read verf flavour and verflen */
credlen = ntoh32 (*((uint32_t *)addr))
@@ -756,88 +1228,114 @@ __socket_read_vectored_request (rpc_transport_t *this)
__socket_proto_init_pending (priv, credlen);
- priv->incoming.frag.call_body.request.vector_state =
- SP_STATE_READING_CREDBYTES;
+ request->vector_state = SP_STATE_READING_CREDBYTES;
/* fall through */
case SP_STATE_READING_CREDBYTES:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.request.vector_state =
- SP_STATE_READ_CREDBYTES;
+ request->vector_state = SP_STATE_READ_CREDBYTES;
/* fall through */
case SP_STATE_READ_CREDBYTES:
- addr = rpc_verf_addr (priv->incoming.frag.fragcurrent);
-
- /* FIXME: Also handle procedures other than glusterfs-write
- * here
- */
- /* also read proc-header */
- gluster_write_proc_len = xdr_sizeof ((xdrproc_t) xdr_gfs3_write_req,
- &write_req);
+ addr = rpc_verf_addr (frag->fragcurrent);
+ verflen = ntoh32 (*((uint32_t *)addr));
- if (gluster_write_proc_len == 0) {
- gf_log (this->name, GF_LOG_DEBUG,
- "xdr_sizeof on gfs3_write_req failed");
- ret = -1;
- goto out;
+ if (verflen == 0) {
+ request->vector_state = SP_STATE_READ_VERFBYTES;
+ goto sp_state_read_verfbytes;
}
-
- verflen = ntoh32 (*((uint32_t *)addr))
- + gluster_write_proc_len;
-
__socket_proto_init_pending (priv, verflen);
- priv->incoming.frag.call_body.request.vector_state
- = SP_STATE_READING_VERFBYTES;
+ request->vector_state = SP_STATE_READING_VERFBYTES;
/* fall through */
case SP_STATE_READING_VERFBYTES:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.request.vector_state =
- SP_STATE_READ_VERFBYTES;
+ request->vector_state = SP_STATE_READ_VERFBYTES;
/* fall through */
case SP_STATE_READ_VERFBYTES:
- if (priv->incoming.payload_vector.iov_base == NULL) {
- iobuf = iobuf_get (this->ctx->iobuf_pool);
+sp_state_read_verfbytes:
+ /* set the base_addr 'persistently' across multiple calls
+ into the state machine */
+ in->proghdr_base_addr = frag->fragcurrent;
+
+ request->vector_sizer_state =
+ vector_sizer (request->vector_sizer_state,
+ &readsize, in->proghdr_base_addr,
+ frag->fragcurrent);
+ __socket_proto_init_pending (priv, readsize);
+
+ request->vector_state = SP_STATE_READING_PROGHDR;
+
+ /* fall through */
+
+ case SP_STATE_READING_PROGHDR:
+ __socket_proto_read (priv, ret);
+
+ request->vector_state = SP_STATE_READ_PROGHDR;
+
+ /* fall through */
+
+ case SP_STATE_READ_PROGHDR:
+sp_state_read_proghdr:
+ request->vector_sizer_state =
+ vector_sizer (request->vector_sizer_state,
+ &readsize, in->proghdr_base_addr,
+ frag->fragcurrent);
+ if (readsize == 0) {
+ request->vector_state = SP_STATE_READ_PROGHDR_XDATA;
+ goto sp_state_read_proghdr_xdata;
+ }
+
+ __socket_proto_init_pending (priv, readsize);
+
+ request->vector_state = SP_STATE_READING_PROGHDR_XDATA;
+
+ /* fall through */
+
+ case SP_STATE_READING_PROGHDR_XDATA:
+ __socket_proto_read (priv, ret);
+
+ request->vector_state = SP_STATE_READ_PROGHDR;
+ /* check if the vector_sizer() has more to say */
+ goto sp_state_read_proghdr;
+
+ case SP_STATE_READ_PROGHDR_XDATA:
+sp_state_read_proghdr_xdata:
+ if (in->payload_vector.iov_base == NULL) {
+
+ size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
+ iobuf = iobuf_get2 (this->ctx->iobuf_pool, size);
if (!iobuf) {
- gf_log (this->name, GF_LOG_ERROR,
- "unable to allocate IO buffer "
- "for peer %s",
- this->peerinfo.identifier);
ret = -1;
break;
}
- if (priv->incoming.iobref == NULL) {
- priv->incoming.iobref = iobref_new ();
- if (priv->incoming.iobref == NULL) {
- gf_log (this->name, GF_LOG_ERROR,
- "out of memory");
+ if (in->iobref == NULL) {
+ in->iobref = iobref_new ();
+ if (in->iobref == NULL) {
ret = -1;
iobuf_unref (iobuf);
break;
}
}
- iobref_add (priv->incoming.iobref, iobuf);
+ iobref_add (in->iobref, iobuf);
iobuf_unref (iobuf);
- priv->incoming.payload_vector.iov_base
- = iobuf_ptr (iobuf);
+ in->payload_vector.iov_base = iobuf_ptr (iobuf);
- priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf);
+ frag->fragcurrent = iobuf_ptr (iobuf);
}
- priv->incoming.frag.call_body.request.vector_state =
- SP_STATE_READING_PROG;
+ request->vector_state = SP_STATE_READING_PROG;
/* fall through */
@@ -848,19 +1346,15 @@ __socket_read_vectored_request (rpc_transport_t *this)
ret = __socket_read_simple_msg (this);
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
- if ((ret == -1)
- || ((ret == 0)
- && (remaining_size == 0)
- && RPC_LASTFRAG (priv->incoming.fraghdr))) {
- priv->incoming.frag.call_body.request.vector_state
- = SP_STATE_VECTORED_REQUEST_INIT;
- priv->incoming.payload_vector.iov_len
- = (unsigned long)priv->incoming.frag.fragcurrent
- - (unsigned long)
- priv->incoming.payload_vector.iov_base;
+ if ((ret == -1) ||
+ ((ret == 0) && (remaining_size == 0)
+ && RPC_LASTFRAG (in->fraghdr))) {
+ request->vector_state = SP_STATE_VECTORED_REQUEST_INIT;
+ in->payload_vector.iov_len
+ = ((unsigned long)frag->fragcurrent
+ - (unsigned long)in->payload_vector.iov_base);
}
break;
}
@@ -869,70 +1363,77 @@ out:
return ret;
}
-
-#define rpc_msgtype_addr(buf) (buf + 4)
-
-#define rpc_prognum_addr(buf) (buf + RPC_MSGTYPE_SIZE + 4)
-
-#define rpc_procnum_addr(buf) (buf + RPC_MSGTYPE_SIZE + 12)
-
-
-inline int
+static inline int
__socket_read_request (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
- uint32_t prognum = 0, procnum = 0;
+ uint32_t prognum = 0, procnum = 0, progver = 0;
uint32_t remaining_size = 0;
int ret = -1;
char *buf = NULL;
+ rpcsvc_vector_sizer vector_sizer = NULL;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
+ sp_rpcfrag_request_state_t *request = NULL;
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
- switch (priv->incoming.frag.call_body.request.header_state) {
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
+ request = &frag->call_body.request;
+
+ switch (request->header_state) {
case SP_STATE_REQUEST_HEADER_INIT:
__socket_proto_init_pending (priv, RPC_CALL_BODY_SIZE);
- priv->incoming.frag.call_body.request.header_state
- = SP_STATE_READING_RPCHDR1;
+ request->header_state = SP_STATE_READING_RPCHDR1;
/* fall through */
case SP_STATE_READING_RPCHDR1:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.request.header_state =
- SP_STATE_READ_RPCHDR1;
+ request->header_state = SP_STATE_READ_RPCHDR1;
/* fall through */
case SP_STATE_READ_RPCHDR1:
- buf = rpc_prognum_addr (iobuf_ptr (priv->incoming.iobuf));
+ buf = rpc_prognum_addr (iobuf_ptr (in->iobuf));
prognum = ntoh32 (*((uint32_t *)buf));
- buf = rpc_procnum_addr (iobuf_ptr (priv->incoming.iobuf));
+ buf = rpc_progver_addr (iobuf_ptr (in->iobuf));
+ progver = ntoh32 (*((uint32_t *)buf));
+
+ buf = rpc_procnum_addr (iobuf_ptr (in->iobuf));
procnum = ntoh32 (*((uint32_t *)buf));
- if ((prognum == GLUSTER3_1_FOP_PROGRAM)
- && (procnum == GF_FOP_WRITE)) {
- ret = __socket_read_vectored_request (this);
+ if (priv->is_server) {
+ /* this check is needed as rpcsvc and rpc-clnt
+ * actor structures are not same */
+ vector_sizer =
+ rpcsvc_get_program_vector_sizer ((rpcsvc_t *)this->mydata,
+ prognum, progver, procnum);
+ }
+
+ if (vector_sizer) {
+ ret = __socket_read_vectored_request (this, vector_sizer);
} else {
ret = __socket_read_simple_request (this);
}
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
if ((ret == -1)
|| ((ret == 0)
&& (remaining_size == 0)
- && (RPC_LASTFRAG (priv->incoming.fraghdr)))) {
- priv->incoming.frag.call_body.request.header_state =
- SP_STATE_REQUEST_HEADER_INIT;
+ && (RPC_LASTFRAG (in->fraghdr)))) {
+ request->header_state = SP_STATE_REQUEST_HEADER_INIT;
}
break;
@@ -943,35 +1444,40 @@ out:
}
-inline int
+static inline int
__socket_read_accepted_successful_reply (rpc_transport_t *this)
{
- socket_private_t *priv = NULL;
- int ret = 0;
- struct iobuf *iobuf = NULL;
- uint32_t gluster_read_rsp_hdr_len = 0;
- gfs3_read_rsp read_rsp = {0, };
-
- if (!this || !this->private)
- goto out;
+ socket_private_t *priv = NULL;
+ int ret = 0;
+ struct iobuf *iobuf = NULL;
+ gfs3_read_rsp read_rsp = {0, };
+ ssize_t size = 0;
+ ssize_t default_read_size = 0;
+ char *proghdr_buf = NULL;
+ XDR xdr;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
- switch (priv->incoming.frag.call_body.reply.accepted_success_state) {
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
+
+ switch (frag->call_body.reply.accepted_success_state) {
case SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT:
- gluster_read_rsp_hdr_len = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp,
- &read_rsp);
+ default_read_size = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp,
+ &read_rsp);
- if (gluster_read_rsp_hdr_len == 0) {
- gf_log (this->name, GF_LOG_DEBUG,
- "xdr_sizeof on gfs3_read_rsp failed");
- ret = -1;
- goto out;
- }
- __socket_proto_init_pending (priv, gluster_read_rsp_hdr_len);
+ proghdr_buf = frag->fragcurrent;
- priv->incoming.frag.call_body.reply.accepted_success_state
+ __socket_proto_init_pending (priv, default_read_size);
+
+ frag->call_body.reply.accepted_success_state
= SP_STATE_READING_PROC_HEADER;
/* fall through */
@@ -979,44 +1485,79 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
case SP_STATE_READING_PROC_HEADER:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.reply.accepted_success_state
- = SP_STATE_READ_PROC_HEADER;
+ /* there can be 'xdata' in read response, figure it out */
+ xdrmem_create (&xdr, proghdr_buf, default_read_size,
+ XDR_DECODE);
- /* fall through */
+ /* This will fail if there is xdata sent from server, if not,
+ well and good, we don't need to worry about */
+ xdr_gfs3_read_rsp (&xdr, &read_rsp);
- case SP_STATE_READ_PROC_HEADER:
- if (priv->incoming.payload_vector.iov_base == NULL) {
- iobuf = iobuf_get (this->ctx->iobuf_pool);
+ free (read_rsp.xdata.xdata_val);
+
+ /* need to round off to proper roof (%4), as XDR packing pads
+ the end of opaque object with '0' */
+ size = roof (read_rsp.xdata.xdata_len, 4);
+
+ if (!size) {
+ frag->call_body.reply.accepted_success_state
+ = SP_STATE_READ_PROC_OPAQUE;
+ goto read_proc_opaque;
+ }
+
+ __socket_proto_init_pending (priv, size);
+
+ frag->call_body.reply.accepted_success_state
+ = SP_STATE_READING_PROC_OPAQUE;
+
+ case SP_STATE_READING_PROC_OPAQUE:
+ __socket_proto_read (priv, ret);
+
+ frag->call_body.reply.accepted_success_state
+ = SP_STATE_READ_PROC_OPAQUE;
+
+ case SP_STATE_READ_PROC_OPAQUE:
+ read_proc_opaque:
+ if (in->payload_vector.iov_base == NULL) {
+
+ size = (RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read);
+
+ iobuf = iobuf_get2 (this->ctx->iobuf_pool, size);
if (iobuf == NULL) {
ret = -1;
goto out;
}
- if (priv->incoming.iobref == NULL) {
- priv->incoming.iobref = iobref_new ();
- if (priv->incoming.iobref == NULL) {
+ if (in->iobref == NULL) {
+ in->iobref = iobref_new ();
+ if (in->iobref == NULL) {
ret = -1;
iobuf_unref (iobuf);
goto out;
}
}
- iobref_add (priv->incoming.iobref, iobuf);
+ iobref_add (in->iobref, iobuf);
iobuf_unref (iobuf);
- priv->incoming.payload_vector.iov_base
- = iobuf_ptr (iobuf);
+ in->payload_vector.iov_base = iobuf_ptr (iobuf);
+
+ in->payload_vector.iov_len = size;
}
- priv->incoming.frag.fragcurrent
- = priv->incoming.payload_vector.iov_base;
+ frag->fragcurrent = in->payload_vector.iov_base;
+
+ frag->call_body.reply.accepted_success_state
+ = SP_STATE_READ_PROC_HEADER;
+
+ /* fall through */
+ case SP_STATE_READ_PROC_HEADER:
/* now read the entire remaining msg into new iobuf */
ret = __socket_read_simple_msg (this);
if ((ret == -1)
- || ((ret == 0)
- && RPC_LASTFRAG (priv->incoming.fraghdr))) {
- priv->incoming.frag.call_body.reply.accepted_success_state
+ || ((ret == 0) && RPC_LASTFRAG (in->fraghdr))) {
+ frag->call_body.reply.accepted_success_state
= SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT;
}
@@ -1030,7 +1571,7 @@ out:
#define rpc_reply_verflen_addr(fragcurrent) ((char *)fragcurrent - 4)
#define rpc_reply_accept_status_addr(fragcurrent) ((char *)fragcurrent - 4)
-inline int
+static inline int
__socket_read_accepted_reply (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -1038,19 +1579,24 @@ __socket_read_accepted_reply (rpc_transport_t *this)
char *buf = NULL;
uint32_t verflen = 0, len = 0;
uint32_t remaining_size = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
- switch (priv->incoming.frag.call_body.reply.accepted_state) {
+ switch (frag->call_body.reply.accepted_state) {
case SP_STATE_ACCEPTED_REPLY_INIT:
__socket_proto_init_pending (priv,
RPC_AUTH_FLAVOUR_N_LENGTH_SIZE);
- priv->incoming.frag.call_body.reply.accepted_state
+ frag->call_body.reply.accepted_state
= SP_STATE_READING_REPLY_VERFLEN;
/* fall through */
@@ -1058,13 +1604,13 @@ __socket_read_accepted_reply (rpc_transport_t *this)
case SP_STATE_READING_REPLY_VERFLEN:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.reply.accepted_state
+ frag->call_body.reply.accepted_state
= SP_STATE_READ_REPLY_VERFLEN;
/* fall through */
case SP_STATE_READ_REPLY_VERFLEN:
- buf = rpc_reply_verflen_addr (priv->incoming.frag.fragcurrent);
+ buf = rpc_reply_verflen_addr (frag->fragcurrent);
verflen = ntoh32 (*((uint32_t *) buf));
@@ -1073,7 +1619,7 @@ __socket_read_accepted_reply (rpc_transport_t *this)
__socket_proto_init_pending (priv, len);
- priv->incoming.frag.call_body.reply.accepted_state
+ frag->call_body.reply.accepted_state
= SP_STATE_READING_REPLY_VERFBYTES;
/* fall through */
@@ -1081,19 +1627,19 @@ __socket_read_accepted_reply (rpc_transport_t *this)
case SP_STATE_READING_REPLY_VERFBYTES:
__socket_proto_read (priv, ret);
- priv->incoming.frag.call_body.reply.accepted_state
+ frag->call_body.reply.accepted_state
= SP_STATE_READ_REPLY_VERFBYTES;
- buf = rpc_reply_accept_status_addr (priv->incoming.frag.fragcurrent);
+ buf = rpc_reply_accept_status_addr (frag->fragcurrent);
- priv->incoming.frag.call_body.reply.accept_status
+ frag->call_body.reply.accept_status
= ntoh32 (*(uint32_t *) buf);
/* fall through */
case SP_STATE_READ_REPLY_VERFBYTES:
- if (priv->incoming.frag.call_body.reply.accept_status
+ if (frag->call_body.reply.accept_status
== SUCCESS) {
ret = __socket_read_accepted_successful_reply (this);
} else {
@@ -1103,14 +1649,13 @@ __socket_read_accepted_reply (rpc_transport_t *this)
ret = __socket_read_simple_msg (this);
}
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr)
+ - frag->bytes_read;
if ((ret == -1)
- || ((ret == 0)
- && (remaining_size == 0)
- && (RPC_LASTFRAG (priv->incoming.fraghdr)))) {
- priv->incoming.frag.call_body.reply.accepted_state
+ || ((ret == 0) && (remaining_size == 0)
+ && (RPC_LASTFRAG (in->fraghdr)))) {
+ frag->call_body.reply.accepted_state
= SP_STATE_ACCEPTED_REPLY_INIT;
}
@@ -1122,7 +1667,7 @@ out:
}
-inline int
+static inline int
__socket_read_denied_reply (rpc_transport_t *this)
{
return __socket_read_simple_msg (this);
@@ -1132,25 +1677,29 @@ __socket_read_denied_reply (rpc_transport_t *this)
#define rpc_reply_status_addr(fragcurrent) ((char *)fragcurrent - 4)
-inline int
+static inline int
__socket_read_vectored_reply (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
int ret = 0;
char *buf = NULL;
uint32_t remaining_size = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
+ in = &priv->incoming;
+ frag = &in->frag;
- switch (priv->incoming.frag.call_body.reply.status_state) {
+ switch (frag->call_body.reply.status_state) {
case SP_STATE_ACCEPTED_REPLY_INIT:
__socket_proto_init_pending (priv, RPC_REPLY_STATUS_SIZE);
- priv->incoming.frag.call_body.reply.status_state
+ frag->call_body.reply.status_state
= SP_STATE_READING_REPLY_STATUS;
/* fall through */
@@ -1158,37 +1707,33 @@ __socket_read_vectored_reply (rpc_transport_t *this)
case SP_STATE_READING_REPLY_STATUS:
__socket_proto_read (priv, ret);
- buf = rpc_reply_status_addr (priv->incoming.frag.fragcurrent);
+ buf = rpc_reply_status_addr (frag->fragcurrent);
- priv->incoming.frag.call_body.reply.accept_status
+ frag->call_body.reply.accept_status
= ntoh32 (*((uint32_t *) buf));
- priv->incoming.frag.call_body.reply.status_state
+ frag->call_body.reply.status_state
= SP_STATE_READ_REPLY_STATUS;
/* fall through */
case SP_STATE_READ_REPLY_STATUS:
- if (priv->incoming.frag.call_body.reply.accept_status
- == MSG_ACCEPTED) {
+ if (frag->call_body.reply.accept_status == MSG_ACCEPTED) {
ret = __socket_read_accepted_reply (this);
} else {
ret = __socket_read_denied_reply (this);
}
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
if ((ret == -1)
- || ((ret == 0)
- && (remaining_size == 0)
- && (RPC_LASTFRAG (priv->incoming.fraghdr)))) {
- priv->incoming.frag.call_body.reply.status_state
+ || ((ret == 0) && (remaining_size == 0)
+ && (RPC_LASTFRAG (in->fraghdr)))) {
+ frag->call_body.reply.status_state
= SP_STATE_ACCEPTED_REPLY_INIT;
- priv->incoming.payload_vector.iov_len
- = (unsigned long)priv->incoming.frag.fragcurrent
- - (unsigned long)
- priv->incoming.payload_vector.iov_base;
+ in->payload_vector.iov_len
+ = (unsigned long)frag->fragcurrent
+ - (unsigned long)in->payload_vector.iov_base;
}
break;
}
@@ -1198,7 +1743,7 @@ out:
}
-inline int
+static inline int
__socket_read_simple_reply (rpc_transport_t *this)
{
return __socket_read_simple_msg (this);
@@ -1206,7 +1751,7 @@ __socket_read_simple_reply (rpc_transport_t *this)
#define rpc_xid_addr(buf) (buf)
-inline int
+static inline int
__socket_read_reply (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
@@ -1214,27 +1759,29 @@ __socket_read_reply (rpc_transport_t *this)
int32_t ret = -1;
rpc_request_info_t *request_info = NULL;
char map_xid = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
+ in = &priv->incoming;
+ frag = &in->frag;
- buf = rpc_xid_addr (iobuf_ptr (priv->incoming.iobuf));
+ buf = rpc_xid_addr (iobuf_ptr (in->iobuf));
- if (priv->incoming.request_info == NULL) {
- priv->incoming.request_info = GF_CALLOC (1,
- sizeof (*request_info),
- gf_common_mt_rpc_trans_reqinfo_t);
- if (priv->incoming.request_info == NULL) {
- gf_log (this->name, GF_LOG_ERROR, "out of memory");
+ if (in->request_info == NULL) {
+ in->request_info = GF_CALLOC (1, sizeof (*request_info),
+ gf_common_mt_rpc_trans_reqinfo_t);
+ if (in->request_info == NULL) {
goto out;
}
map_xid = 1;
}
- request_info = priv->incoming.request_info;
+ request_info = in->request_info;
if (map_xid) {
request_info->xid = ntoh32 (*((uint32_t *) buf));
@@ -1246,22 +1793,22 @@ __socket_read_reply (rpc_transport_t *this)
{
ret = rpc_transport_notify (this,
RPC_TRANSPORT_MAP_XID_REQUEST,
- priv->incoming.request_info);
+ in->request_info);
}
pthread_mutex_lock (&priv->lock);
if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "notify for event MAP_XID failed");
goto out;
}
}
- if ((request_info->prognum == GLUSTER3_1_FOP_PROGRAM)
+ if ((request_info->prognum == GLUSTER_FOP_PROGRAM)
&& (request_info->procnum == GF_FOP_READ)) {
if (map_xid && request_info->rsp.rsp_payload_count != 0) {
- priv->incoming.iobref
- = iobref_ref (request_info->rsp.rsp_iobref);
- priv->incoming.payload_vector
- = *request_info->rsp.rsp_payload;
+ in->iobref = iobref_ref (request_info->rsp.rsp_iobref);
+ in->payload_vector = *request_info->rsp.rsp_payload;
}
ret = __socket_read_vectored_reply (this);
@@ -1274,62 +1821,65 @@ out:
/* returns the number of bytes yet to be read in a fragment */
-inline int
+static inline int
__socket_read_frag (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
int32_t ret = 0;
char *buf = NULL;
uint32_t remaining_size = 0;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
- switch (priv->incoming.frag.state) {
+ switch (frag->state) {
case SP_STATE_NADA:
__socket_proto_init_pending (priv, RPC_MSGTYPE_SIZE);
- priv->incoming.frag.state = SP_STATE_READING_MSGTYPE;
+ frag->state = SP_STATE_READING_MSGTYPE;
/* fall through */
case SP_STATE_READING_MSGTYPE:
__socket_proto_read (priv, ret);
- priv->incoming.frag.state = SP_STATE_READ_MSGTYPE;
+ frag->state = SP_STATE_READ_MSGTYPE;
/* fall through */
case SP_STATE_READ_MSGTYPE:
- buf = rpc_msgtype_addr (iobuf_ptr (priv->incoming.iobuf));
- priv->incoming.msg_type = ntoh32 (*((uint32_t *)buf));
+ buf = rpc_msgtype_addr (iobuf_ptr (in->iobuf));
+ in->msg_type = ntoh32 (*((uint32_t *)buf));
- if (priv->incoming.msg_type == CALL) {
+ if (in->msg_type == CALL) {
ret = __socket_read_request (this);
- } else if (priv->incoming.msg_type == REPLY) {
+ } else if (in->msg_type == REPLY) {
ret = __socket_read_reply (this);
- } else if (priv->incoming.msg_type == GF_UNIVERSAL_ANSWER) {
+ } else if (in->msg_type == GF_UNIVERSAL_ANSWER) {
gf_log ("rpc", GF_LOG_ERROR,
"older version of protocol/process trying to "
- "connect from %s. use newer verison on that node",
+ "connect from %s. use newer version on that node",
this->peerinfo.identifier);
} else {
gf_log ("rpc", GF_LOG_ERROR,
"wrong MSG-TYPE (%d) received from %s",
- priv->incoming.msg_type,
+ in->msg_type,
this->peerinfo.identifier);
ret = -1;
}
- remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr)
- - priv->incoming.frag.bytes_read;
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
if ((ret == -1)
- || ((ret == 0)
- && (remaining_size == 0)
- && (RPC_LASTFRAG (priv->incoming.fraghdr)))) {
- priv->incoming.frag.state = SP_STATE_NADA;
+ || ((ret == 0) && (remaining_size == 0)
+ && (RPC_LASTFRAG (in->fraghdr)))) {
+ frag->state = SP_STATE_NADA;
}
break;
@@ -1340,31 +1890,36 @@ out:
}
-inline
+static inline
void __socket_reset_priv (socket_private_t *priv)
{
- if (priv->incoming.iobref) {
- iobref_unref (priv->incoming.iobref);
- priv->incoming.iobref = NULL;
+ struct gf_sock_incoming *in = NULL;
+
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+
+ if (in->iobref) {
+ iobref_unref (in->iobref);
+ in->iobref = NULL;
}
- if (priv->incoming.iobuf) {
- iobuf_unref (priv->incoming.iobuf);
+ if (in->iobuf) {
+ iobuf_unref (in->iobuf);
}
- if (priv->incoming.request_info != NULL) {
- GF_FREE (priv->incoming.request_info);
- priv->incoming.request_info = NULL;
+ if (in->request_info != NULL) {
+ GF_FREE (in->request_info);
+ in->request_info = NULL;
}
- memset (&priv->incoming.payload_vector, 0,
- sizeof (priv->incoming.payload_vector));
+ memset (&in->payload_vector, 0,
+ sizeof (in->payload_vector));
- priv->incoming.iobuf = NULL;
+ in->iobuf = NULL;
}
-int
+static int
__socket_proto_state_machine (rpc_transport_t *this,
rpc_transport_pollin_t **pollin)
{
@@ -1373,55 +1928,40 @@ __socket_proto_state_machine (rpc_transport_t *this,
struct iobuf *iobuf = NULL;
struct iobref *iobref = NULL;
struct iovec vector[2];
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
- while (priv->incoming.record_state != SP_STATE_COMPLETE) {
- switch (priv->incoming.record_state) {
+ /* used to reduce the indirection */
+ in = &priv->incoming;
+ frag = &in->frag;
- case SP_STATE_NADA:
- iobuf = iobuf_get (this->ctx->iobuf_pool);
- if (!iobuf) {
- gf_log (this->name, GF_LOG_ERROR,
- "unable to allocate IO buffer "
- "for peer %s",
- this->peerinfo.identifier);
- ret = -ENOMEM;
- goto out;
- }
+ while (in->record_state != SP_STATE_COMPLETE) {
+ switch (in->record_state) {
- priv->incoming.iobuf = iobuf;
- priv->incoming.iobuf_size = 0;
- priv->incoming.total_bytes_read = 0;
- priv->incoming.payload_vector.iov_len = 0;
+ case SP_STATE_NADA:
+ in->total_bytes_read = 0;
+ in->payload_vector.iov_len = 0;
- priv->incoming.pending_vector = priv->incoming.vector;
- priv->incoming.pending_vector->iov_base =
- &priv->incoming.fraghdr;
+ in->pending_vector = in->vector;
+ in->pending_vector->iov_base = &in->fraghdr;
- priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf);
- priv->incoming.pending_vector->iov_len =
- sizeof (priv->incoming.fraghdr);
+ in->pending_vector->iov_len = sizeof (in->fraghdr);
- priv->incoming.record_state = SP_STATE_READING_FRAGHDR;
+ in->record_state = SP_STATE_READING_FRAGHDR;
/* fall through */
case SP_STATE_READING_FRAGHDR:
- ret = __socket_readv (this,
- priv->incoming.pending_vector, 1,
- &priv->incoming.pending_vector,
- &priv->incoming.pending_count,
+ ret = __socket_readv (this, in->pending_vector, 1,
+ &in->pending_vector,
+ &in->pending_count,
NULL);
- if (ret == -1) {
- gf_log (this->name, GF_LOG_TRACE,
- "reading from socket failed. Error (%s), "
- "peer (%s)", strerror (errno),
- this->peerinfo.identifier);
+ if (ret == -1)
goto out;
- }
if (ret > 0) {
gf_log (this->name, GF_LOG_TRACE, "partial "
@@ -1430,33 +1970,40 @@ __socket_proto_state_machine (rpc_transport_t *this,
}
if (ret == 0) {
- priv->incoming.record_state =
- SP_STATE_READ_FRAGHDR;
+ in->record_state = SP_STATE_READ_FRAGHDR;
}
/* fall through */
case SP_STATE_READ_FRAGHDR:
- priv->incoming.fraghdr = ntoh32 (priv->incoming.fraghdr);
- priv->incoming.record_state = SP_STATE_READING_FRAG;
- priv->incoming.total_bytes_read
- += RPC_FRAGSIZE(priv->incoming.fraghdr);
+ in->fraghdr = ntoh32 (in->fraghdr);
+ in->total_bytes_read += RPC_FRAGSIZE(in->fraghdr);
+ iobuf = iobuf_get2 (this->ctx->iobuf_pool,
+ (in->total_bytes_read +
+ sizeof (in->fraghdr)));
+ if (!iobuf) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ in->iobuf = iobuf;
+ in->iobuf_size = 0;
+ frag->fragcurrent = iobuf_ptr (iobuf);
+ in->record_state = SP_STATE_READING_FRAG;
/* fall through */
case SP_STATE_READING_FRAG:
ret = __socket_read_frag (this);
- if ((ret == -1)
- || (priv->incoming.frag.bytes_read !=
- RPC_FRAGSIZE (priv->incoming.fraghdr))) {
+ if ((ret == -1) ||
+ (frag->bytes_read != RPC_FRAGSIZE (in->fraghdr))) {
goto out;
}
- priv->incoming.frag.bytes_read = 0;
+ frag->bytes_read = 0;
- if (!RPC_LASTFRAG (priv->incoming.fraghdr)) {
- priv->incoming.record_state =
- SP_STATE_READING_FRAGHDR;
+ if (!RPC_LASTFRAG (in->fraghdr)) {
+ in->record_state = SP_STATE_READING_FRAGHDR;
break;
}
@@ -1465,72 +2012,65 @@ __socket_proto_state_machine (rpc_transport_t *this,
*/
if (pollin != NULL) {
int count = 0;
- priv->incoming.iobuf_size
- = priv->incoming.total_bytes_read
- - priv->incoming.payload_vector.iov_len;
+ in->iobuf_size = (in->total_bytes_read -
+ in->payload_vector.iov_len);
memset (vector, 0, sizeof (vector));
- if (priv->incoming.iobref == NULL) {
- priv->incoming.iobref = iobref_new ();
- if (priv->incoming.iobref == NULL) {
- gf_log (this->name,
- GF_LOG_ERROR,
- "out of memory");
+ if (in->iobref == NULL) {
+ in->iobref = iobref_new ();
+ if (in->iobref == NULL) {
ret = -1;
goto out;
}
}
- vector[count].iov_base
- = iobuf_ptr (priv->incoming.iobuf);
- vector[count].iov_len
- = priv->incoming.iobuf_size;
-
- iobref = priv->incoming.iobref;
+ vector[count].iov_base = iobuf_ptr (in->iobuf);
+ vector[count].iov_len = in->iobuf_size;
- iobref_add (iobref,
- priv->incoming.iobuf);
- iobuf_unref (priv->incoming.iobuf);
- priv->incoming.iobuf = NULL;
+ iobref = in->iobref;
count++;
- if (priv->incoming.payload_vector.iov_base
- != NULL) {
- vector[count]
- = priv->incoming.payload_vector;
+ if (in->payload_vector.iov_base != NULL) {
+ vector[count] = in->payload_vector;
count++;
}
*pollin = rpc_transport_pollin_alloc (this,
vector,
count,
+ in->iobuf,
iobref,
- priv->incoming.request_info);
+ in->request_info);
+ iobuf_unref (in->iobuf);
+ in->iobuf = NULL;
+
if (*pollin == NULL) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "transport pollin allocation failed");
ret = -1;
goto out;
}
- if (priv->incoming.msg_type == REPLY)
+ if (in->msg_type == REPLY)
(*pollin)->is_reply = 1;
- priv->incoming.request_info = NULL;
+ in->request_info = NULL;
}
- priv->incoming.record_state = SP_STATE_COMPLETE;
+ in->record_state = SP_STATE_COMPLETE;
break;
case SP_STATE_COMPLETE:
/* control should not reach here */
- gf_log (this->name, GF_LOG_DEBUG, "control reached to "
+ gf_log (this->name, GF_LOG_WARNING, "control reached to "
"SP_STATE_COMPLETE, which should not have "
"happened");
break;
}
}
- if (priv->incoming.record_state == SP_STATE_COMPLETE) {
- priv->incoming.record_state = SP_STATE_NADA;
+ if (in->record_state == SP_STATE_COMPLETE) {
+ in->record_state = SP_STATE_NADA;
__socket_reset_priv (priv);
}
@@ -1542,41 +2082,45 @@ out:
}
-int
+static int
socket_proto_state_machine (rpc_transport_t *this,
rpc_transport_pollin_t **pollin)
{
socket_private_t *priv = NULL;
- int ret = 0;
+ int ret = 0;
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
- priv = this->private;
+ priv = this->private;
- pthread_mutex_lock (&priv->lock);
- {
- ret = __socket_proto_state_machine (this, pollin);
- }
+ pthread_mutex_lock (&priv->lock);
+ {
+ ret = __socket_proto_state_machine (this, pollin);
+ }
pthread_mutex_unlock (&priv->lock);
out:
- return ret;
+ return ret;
}
-int
+static int
socket_event_poll_in (rpc_transport_t *this)
{
int ret = -1;
rpc_transport_pollin_t *pollin = NULL;
+ socket_private_t *priv = this->private;
ret = socket_proto_state_machine (this, &pollin);
if (pollin != NULL) {
+ priv->ot_state = OT_CALLBACK;
ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
pollin);
-
+ if (priv->ot_state == OT_CALLBACK) {
+ priv->ot_state = OT_RUNNING;
+ }
rpc_transport_pollin_destroy (pollin);
}
@@ -1584,7 +2128,7 @@ socket_event_poll_in (rpc_transport_t *this)
}
-int
+static int
socket_connect_finish (rpc_transport_t *this)
{
int ret = -1;
@@ -1592,58 +2136,57 @@ socket_connect_finish (rpc_transport_t *this)
rpc_transport_event_t event = 0;
char notify_rpc = 0;
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
pthread_mutex_lock (&priv->lock);
{
- if (priv->connected)
- goto unlock;
+ if (priv->connected != 0)
+ goto unlock;
+
+ get_transport_identifiers (this);
- ret = __socket_connect_finish (priv->sock);
+ ret = __socket_connect_finish (priv->sock);
- if (ret == -1 && errno == EINPROGRESS)
- ret = 1;
+ if (ret == -1 && errno == EINPROGRESS)
+ ret = 1;
- if (ret == -1 && errno != EINPROGRESS) {
- if (!priv->connect_finish_log) {
- gf_log (this->name, GF_LOG_ERROR,
- "connection to %s failed (%s)",
+ if (ret == -1 && errno != EINPROGRESS) {
+ if (!priv->connect_finish_log) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "connection to %s failed (%s)",
this->peerinfo.identifier,
- strerror (errno));
- priv->connect_finish_log = 1;
- }
- __socket_disconnect (this);
- notify_rpc = 1;
- event = RPC_TRANSPORT_DISCONNECT;
- goto unlock;
- }
+ strerror (errno));
+ priv->connect_finish_log = 1;
+ }
+ __socket_disconnect (this);
+ goto unlock;
+ }
- if (ret == 0) {
- notify_rpc = 1;
-
- this->myinfo.sockaddr_len =
- sizeof (this->myinfo.sockaddr);
-
- ret = getsockname (priv->sock,
- SA (&this->myinfo.sockaddr),
- &this->myinfo.sockaddr_len);
- if (ret == -1) {
- gf_log (this->name, GF_LOG_DEBUG,
- "getsockname on (%d) failed (%s)",
- priv->sock, strerror (errno));
- __socket_disconnect (this);
- event = GF_EVENT_POLLERR;
- goto unlock;
- }
+ if (ret == 0) {
+ notify_rpc = 1;
- priv->connected = 1;
- priv->connect_finish_log = 0;
- event = RPC_TRANSPORT_CONNECT;
- get_transport_identifiers (this);
- }
+ this->myinfo.sockaddr_len =
+ sizeof (this->myinfo.sockaddr);
+
+ ret = getsockname (priv->sock,
+ SA (&this->myinfo.sockaddr),
+ &this->myinfo.sockaddr_len);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "getsockname on (%d) failed (%s)",
+ priv->sock, strerror (errno));
+ __socket_disconnect (this);
+ event = GF_EVENT_POLLERR;
+ goto unlock;
+ }
+
+ priv->connected = 1;
+ priv->connect_finish_log = 0;
+ event = RPC_TRANSPORT_CONNECT;
+ }
}
unlock:
pthread_mutex_unlock (&priv->lock);
@@ -1657,31 +2200,29 @@ out:
/* reads rpc_requests during pollin */
-int
+static int
socket_event_handler (int fd, int idx, void *data,
int poll_in, int poll_out, int poll_err)
{
- rpc_transport_t *this = NULL;
+ rpc_transport_t *this = NULL;
socket_private_t *priv = NULL;
- int ret = 0;
+ int ret = -1;
this = data;
- if (!this || !this->private || !this->xl)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
THIS = this->xl;
priv = this->private;
-
pthread_mutex_lock (&priv->lock);
{
priv->idx = idx;
}
pthread_mutex_unlock (&priv->lock);
- if (!priv->connected) {
- ret = socket_connect_finish (this);
- }
+ ret = (priv->connected == 1) ? 0 : socket_connect_finish(this);
if (!ret && poll_out) {
ret = socket_event_poll_out (this);
@@ -1692,17 +2233,203 @@ socket_event_handler (int fd, int idx, void *data,
}
if ((ret < 0) || poll_err) {
- gf_log ("transport", GF_LOG_TRACE, "disconnecting now");
+ /* Logging has happened already in earlier cases */
+ gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG),
+ "disconnecting now");
socket_event_poll_err (this);
rpc_transport_unref (this);
- }
+ }
out:
- return 0;
+ return ret;
}
-int
+static void *
+socket_poller (void *ctx)
+{
+ rpc_transport_t *this = ctx;
+ socket_private_t *priv = this->private;
+ struct pollfd pfd[2] = {{0,},};
+ gf_boolean_t to_write = _gf_false;
+ int ret = 0;
+ uint32_t gen = 0;
+
+ priv->ot_state = OT_RUNNING;
+
+ if (priv->use_ssl) {
+ if (ssl_setup_connection(this,priv->connected) < 0) {
+ gf_log (this->name,GF_LOG_ERROR, "%s setup failed",
+ priv->connected ? "server" : "client");
+ goto err;
+ }
+ }
+
+ if (!priv->bio) {
+ ret = __socket_nonblock (priv->sock);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ goto err;
+ }
+ }
+
+ if (priv->connected == 0) {
+ THIS = this->xl;
+ ret = socket_connect_finish (this);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "asynchronous socket_connect_finish failed");
+ }
+ }
+
+ ret = rpc_transport_notify (this->listener,
+ RPC_TRANSPORT_ACCEPT, this);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "asynchronous rpc_transport_notify failed");
+ }
+
+ gen = priv->ot_gen;
+ for (;;) {
+ pthread_mutex_lock(&priv->lock);
+ to_write = !list_empty(&priv->ioq);
+ pthread_mutex_unlock(&priv->lock);
+ pfd[0].fd = priv->pipe[0];
+ pfd[0].events = POLL_MASK_ERROR;
+ pfd[0].revents = 0;
+ pfd[1].fd = priv->sock;
+ pfd[1].events = POLL_MASK_INPUT | POLL_MASK_ERROR;
+ pfd[1].revents = 0;
+ if (to_write) {
+ pfd[1].events |= POLL_MASK_OUTPUT;
+ }
+ else {
+ pfd[0].events |= POLL_MASK_INPUT;
+ }
+ if (poll(pfd,2,-1) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,"poll failed");
+ break;
+ }
+ if (pfd[0].revents & POLL_MASK_ERROR) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "poll error on pipe");
+ break;
+ }
+ /* Only glusterd actually seems to need this. */
+ THIS = this->xl;
+ if (pfd[1].revents & POLL_MASK_INPUT) {
+ ret = socket_event_poll_in(this);
+ if (ret >= 0) {
+ /* Suppress errors while making progress. */
+ pfd[1].revents &= ~POLL_MASK_ERROR;
+ }
+ else if (errno == ENOTCONN) {
+ ret = 0;
+ }
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_IDLE on %p (input request)",
+ this);
+ priv->ot_state = OT_IDLE;
+ break;
+ }
+ }
+ else if (pfd[1].revents & POLL_MASK_OUTPUT) {
+ ret = socket_event_poll_out(this);
+ if (ret >= 0) {
+ /* Suppress errors while making progress. */
+ pfd[1].revents &= ~POLL_MASK_ERROR;
+ }
+ else if (errno == ENOTCONN) {
+ ret = 0;
+ }
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_IDLE on %p (output request)",
+ this);
+ priv->ot_state = OT_IDLE;
+ break;
+ }
+ }
+ else {
+ /*
+ * This usually means that we left poll() because
+ * somebody pushed a byte onto our pipe. That wakeup
+ * is why the pipe is there, but once awake we can do
+ * all the checking we need on the next iteration.
+ */
+ ret = 0;
+ }
+ if (pfd[1].revents & POLL_MASK_ERROR) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "poll error on socket");
+ break;
+ }
+ if (ret < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "error in polling loop");
+ break;
+ }
+ if (priv->ot_gen != gen) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "generation mismatch, my %u != %u",
+ gen, priv->ot_gen);
+ return NULL;
+ }
+ }
+
+err:
+ /* All (and only) I/O errors should come here. */
+ pthread_mutex_lock(&priv->lock);
+ if (priv->ssl_ssl) {
+ /*
+ * We're always responsible for this part, but only actually
+ * have to do it if we got far enough for ssl_ssl to be valid
+ * (i.e. errors in ssl_setup_connection don't count).
+ */
+ ssl_teardown_connection(priv);
+ }
+ __socket_shutdown(this);
+ close(priv->sock);
+ priv->sock = -1;
+ priv->ot_state = OT_IDLE;
+ pthread_mutex_unlock(&priv->lock);
+ rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT,
+ this);
+ rpc_transport_unref (this);
+ return NULL;
+}
+
+
+static void
+socket_spawn (rpc_transport_t *this)
+{
+ socket_private_t *priv = this->private;
+
+ switch (priv->ot_state) {
+ case OT_IDLE:
+ case OT_PLEASE_DIE:
+ break;
+ default:
+ gf_log (this->name, GF_LOG_WARNING,
+ "refusing to start redundant poller");
+ return;
+ }
+
+ priv->ot_gen += 7;
+ priv->ot_state = OT_SPAWNING;
+ gf_log (this->name, GF_LOG_TRACE,
+ "spawning %p with gen %u", this, priv->ot_gen);
+
+ if (gf_thread_create(&priv->thread,NULL,socket_poller,this) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not create poll thread");
+ }
+}
+
+static int
socket_server_event_handler (int fd, int idx, void *data,
int poll_in, int poll_out, int poll_err)
{
@@ -1714,15 +2441,16 @@ socket_server_event_handler (int fd, int idx, void *data,
struct sockaddr_storage new_sockaddr = {0, };
socklen_t addrlen = sizeof (new_sockaddr);
socket_private_t *new_priv = NULL;
- glusterfs_ctx_t *ctx = NULL;
+ glusterfs_ctx_t *ctx = NULL;
this = data;
- if (!this || !this->private || !this->xl)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
THIS = this->xl;
priv = this->private;
- ctx = this->ctx;
+ ctx = this->ctx;
pthread_mutex_lock (&priv->lock);
{
@@ -1730,53 +2458,65 @@ socket_server_event_handler (int fd, int idx, void *data,
if (poll_in) {
new_sock = accept (priv->sock, SA (&new_sockaddr),
- &addrlen);
+ &addrlen);
- if (new_sock == -1)
+ if (new_sock == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "accept on %d failed (%s)",
+ priv->sock, strerror (errno));
goto unlock;
-
- if (!priv->bio) {
- ret = __socket_nonblock (new_sock);
-
- if (ret == -1) {
- gf_log (this->name, GF_LOG_DEBUG,
- "NBIO on %d failed (%s)",
- new_sock, strerror (errno));
-
- close (new_sock);
- goto unlock;
- }
}
- if (priv->nodelay) {
+ if (priv->nodelay && (new_sockaddr.ss_family != AF_UNIX)) {
ret = __socket_nodelay (new_sock);
if (ret == -1) {
- gf_log (this->name, GF_LOG_ERROR,
+ gf_log (this->name, GF_LOG_WARNING,
"setsockopt() failed for "
"NODELAY (%s)",
strerror (errno));
}
}
+ if (priv->keepalive &&
+ new_sockaddr.ss_family != AF_UNIX) {
+ ret = __socket_keepalive (new_sock,
+ new_sockaddr.ss_family,
+ priv->keepaliveintvl,
+ priv->keepaliveidle);
+ if (ret == -1)
+ gf_log (this->name, GF_LOG_WARNING,
+ "Failed to set keep-alive: %s",
+ strerror (errno));
+ }
+
new_trans = GF_CALLOC (1, sizeof (*new_trans),
gf_common_mt_rpc_trans_t);
if (!new_trans)
goto unlock;
+ ret = pthread_mutex_init(&new_trans->lock, NULL);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "pthread_mutex_init() failed: %s",
+ strerror (errno));
+ close (new_sock);
+ goto unlock;
+ }
+
new_trans->name = gf_strdup (this->name);
memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr,
- addrlen);
+ addrlen);
new_trans->peerinfo.sockaddr_len = addrlen;
new_trans->myinfo.sockaddr_len =
- sizeof (new_trans->myinfo.sockaddr);
+ sizeof (new_trans->myinfo.sockaddr);
ret = getsockname (new_sock,
SA (&new_trans->myinfo.sockaddr),
&new_trans->myinfo.sockaddr_len);
if (ret == -1) {
- gf_log (this->name, GF_LOG_DEBUG,
+ gf_log (this->name, GF_LOG_WARNING,
"getsockname on %d failed (%s)",
new_sock, strerror (errno));
close (new_sock);
@@ -1784,7 +2524,11 @@ socket_server_event_handler (int fd, int idx, void *data,
}
get_transport_identifiers (new_trans);
- socket_init (new_trans);
+ ret = socket_init(new_trans);
+ if (ret != 0) {
+ close(new_sock);
+ goto unlock;
+ }
new_trans->ops = this->ops;
new_trans->init = this->init;
new_trans->fini = this->fini;
@@ -1795,27 +2539,74 @@ socket_server_event_handler (int fd, int idx, void *data,
new_trans->listener = this;
new_priv = new_trans->private;
+ new_priv->use_ssl = priv->use_ssl;
+ new_priv->sock = new_sock;
+ new_priv->own_thread = priv->own_thread;
+
+ new_priv->ssl_ctx = priv->ssl_ctx;
+ if (priv->use_ssl && !priv->own_thread) {
+ if (ssl_setup_connection(new_trans,1) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "server setup failed");
+ close(new_sock);
+ goto unlock;
+ }
+ }
+
+ if (!priv->bio && !priv->own_thread) {
+ ret = __socket_nonblock (new_sock);
+
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "NBIO on %d failed (%s)",
+ new_sock, strerror (errno));
+
+ close (new_sock);
+ goto unlock;
+ }
+ }
+
pthread_mutex_lock (&new_priv->lock);
{
- new_priv->sock = new_sock;
+ /*
+ * In the own_thread case, this is used to
+ * indicate that we're initializing a server
+ * connection.
+ */
new_priv->connected = 1;
+ new_priv->is_server = _gf_true;
rpc_transport_ref (new_trans);
- new_priv->idx =
- event_register (ctx->event_pool,
- new_sock,
- socket_event_handler,
- new_trans, 1, 0);
+ if (new_priv->own_thread) {
+ if (pipe(new_priv->pipe) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not create pipe");
+ }
+ socket_spawn(new_trans);
+ }
+ else {
+ new_priv->idx =
+ event_register (ctx->event_pool,
+ new_sock,
+ socket_event_handler,
+ new_trans,
+ 1, 0);
+ if (new_priv->idx == -1)
+ ret = -1;
+ }
- if (new_priv->idx == -1)
- ret = -1;
}
pthread_mutex_unlock (&new_priv->lock);
- if (ret == -1)
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "failed to register the socket with event");
goto unlock;
+ }
- ret = rpc_transport_notify (this, RPC_TRANSPORT_ACCEPT,
- new_trans);
+ if (!priv->own_thread) {
+ ret = rpc_transport_notify (this,
+ RPC_TRANSPORT_ACCEPT, new_trans);
+ }
}
}
unlock:
@@ -1826,14 +2617,14 @@ out:
}
-int
+static int
socket_disconnect (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
int ret = -1;
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
@@ -1848,25 +2639,27 @@ out:
}
-int
+static int
socket_connect (rpc_transport_t *this, int port)
{
int ret = -1;
- int sock = -1;
+ int sock = -1;
socket_private_t *priv = NULL;
- struct sockaddr_storage sockaddr = {0, };
socklen_t sockaddr_len = 0;
- glusterfs_ctx_t *ctx = NULL;
+ glusterfs_ctx_t *ctx = NULL;
sa_family_t sa_family = {0, };
+ char *local_addr = NULL;
+ union gf_sock_union sock_union;
+ struct sockaddr_in *addr = NULL;
- if (!this || !this->private)
- goto err;
+ GF_VALIDATE_OR_GOTO ("socket", this, err);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, err);
priv = this->private;
- ctx = this->ctx;
+ ctx = this->ctx;
if (!priv) {
- gf_log (this->name, GF_LOG_DEBUG,
+ gf_log_callingfn (this->name, GF_LOG_WARNING,
"connect() called on uninitialized transport");
goto err;
}
@@ -1878,22 +2671,41 @@ socket_connect (rpc_transport_t *this, int port)
pthread_mutex_unlock (&priv->lock);
if (sock != -1) {
- gf_log (this->name, GF_LOG_TRACE,
- "connect () called on transport already connected");
- ret = 0;
+ gf_log_callingfn (this->name, GF_LOG_TRACE,
+ "connect () called on transport already connected");
+ errno = EINPROGRESS;
+ ret = -1;
goto err;
}
- ret = socket_client_get_remote_sockaddr (this, SA (&sockaddr),
+ gf_log (this->name, GF_LOG_TRACE,
+ "connecting %p, state=%u gen=%u sock=%d", this,
+ priv->ot_state, priv->ot_gen, priv->sock);
+
+ ret = socket_client_get_remote_sockaddr (this, &sock_union.sa,
&sockaddr_len, &sa_family);
if (ret == -1) {
/* logged inside client_get_remote_sockaddr */
goto err;
}
- if (port > 0)
- ((struct sockaddr_in *) (&sockaddr))->sin_port = htons (port);
-
+ if (port > 0) {
+ sock_union.sin.sin_port = htons (port);
+ }
+ if (ntohs(sock_union.sin.sin_port) == GF_DEFAULT_SOCKET_LISTEN_PORT) {
+ if (priv->use_ssl) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "disabling SSL for portmapper connection");
+ priv->use_ssl = _gf_false;
+ }
+ }
+ else {
+ if (priv->ssl_enabled && !priv->use_ssl) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "re-enabling SSL for I/O connection");
+ priv->use_ssl = _gf_true;
+ }
+ }
pthread_mutex_lock (&priv->lock);
{
if (priv->sock != -1) {
@@ -1902,66 +2714,78 @@ socket_connect (rpc_transport_t *this, int port)
goto unlock;
}
- memcpy (&this->peerinfo.sockaddr, &sockaddr, sockaddr_len);
+ memcpy (&this->peerinfo.sockaddr, &sock_union.storage,
+ sockaddr_len);
this->peerinfo.sockaddr_len = sockaddr_len;
priv->sock = socket (sa_family, SOCK_STREAM, 0);
if (priv->sock == -1) {
gf_log (this->name, GF_LOG_ERROR,
"socket creation failed (%s)",
- strerror (errno));
+ strerror (errno));
goto unlock;
}
/* Cant help if setting socket options fails. We can continue
* working nonetheless.
*/
- if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF,
- &priv->windowsize,
- sizeof (priv->windowsize)) < 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "setting receive window size failed: %d: %d: "
- "%s", priv->sock, priv->windowsize,
- strerror (errno));
- }
-
- if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF,
- &priv->windowsize,
- sizeof (priv->windowsize)) < 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "setting send window size failed: %d: %d: "
- "%s", priv->sock, priv->windowsize,
- strerror (errno));
- }
-
+ if (priv->windowsize != 0) {
+ if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF,
+ &priv->windowsize,
+ sizeof (priv->windowsize)) < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "setting receive window "
+ "size failed: %d: %d: %s",
+ priv->sock, priv->windowsize,
+ strerror (errno));
+ }
- if (priv->nodelay && priv->lowlat) {
- ret = __socket_nodelay (priv->sock);
- if (ret == -1) {
+ if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF,
+ &priv->windowsize,
+ sizeof (priv->windowsize)) < 0) {
gf_log (this->name, GF_LOG_ERROR,
- "setsockopt() failed for NODELAY (%s)",
+ "setting send window size "
+ "failed: %d: %d: %s",
+ priv->sock, priv->windowsize,
strerror (errno));
}
}
- if (!priv->bio) {
- ret = __socket_nonblock (priv->sock);
+ if (priv->nodelay && (sa_family != AF_UNIX)) {
+ ret = __socket_nodelay (priv->sock);
if (ret == -1) {
gf_log (this->name, GF_LOG_ERROR,
- "NBIO on %d failed (%s)",
+ "NODELAY on %d failed (%s)",
priv->sock, strerror (errno));
- close (priv->sock);
- priv->sock = -1;
- goto unlock;
}
}
+ if (priv->keepalive && sa_family != AF_UNIX) {
+ ret = __socket_keepalive (priv->sock,
+ sa_family,
+ priv->keepaliveintvl,
+ priv->keepaliveidle);
+ if (ret == -1)
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to set keep-alive: %s",
+ strerror (errno));
+ }
+
SA (&this->myinfo.sockaddr)->sa_family =
- SA (&this->peerinfo.sockaddr)->sa_family;
+ SA (&this->peerinfo.sockaddr)->sa_family;
+
+ /* If a source addr is explicitly specified, use it */
+ ret = dict_get_str (this->options,
+ "transport.socket.source-addr",
+ &local_addr);
+ if (!ret && SA (&this->myinfo.sockaddr)->sa_family == AF_INET) {
+ addr = (struct sockaddr_in *)(&this->myinfo.sockaddr);
+ ret = inet_pton (AF_INET, local_addr, &(addr->sin_addr.s_addr));
+ }
ret = client_bind (this, SA (&this->myinfo.sockaddr),
- &this->myinfo.sockaddr_len, priv->sock);
+ &this->myinfo.sockaddr_len, priv->sock);
if (ret == -1) {
gf_log (this->name, GF_LOG_WARNING,
"client bind failed: %s", strerror (errno));
@@ -1970,26 +2794,86 @@ socket_connect (rpc_transport_t *this, int port)
goto unlock;
}
+ if (!priv->use_ssl && !priv->bio && !priv->own_thread) {
+ ret = __socket_nonblock (priv->sock);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+
ret = connect (priv->sock, SA (&this->peerinfo.sockaddr),
- this->peerinfo.sockaddr_len);
+ this->peerinfo.sockaddr_len);
- if (ret == -1 && errno != EINPROGRESS) {
- gf_log (this->name, GF_LOG_ERROR,
- "connection attempt failed (%s)",
- strerror (errno));
+ if (ret == -1 && ((errno != EINPROGRESS) && (errno != ENOENT))) {
+ /* For unix path based sockets, the socket path is
+ * cryptic (md5sum of path) and may not be useful for
+ * the user in debugging so log it in DEBUG
+ */
+ gf_log (this->name, ((sa_family == AF_UNIX) ?
+ GF_LOG_DEBUG : GF_LOG_ERROR),
+ "connection attempt on %s failed, (%s)",
+ this->peerinfo.identifier, strerror (errno));
close (priv->sock);
priv->sock = -1;
goto unlock;
}
- priv->connected = 0;
+ if (priv->use_ssl && !priv->own_thread) {
+ ret = ssl_setup_connection(this,0);
+ if (ret < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "client setup failed");
+ close(priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+
+ if (!priv->bio && !priv->own_thread) {
+ ret = __socket_nonblock (priv->sock);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+
+ /*
+ * In the own_thread case, this is used to indicate that we're
+ * initializing a client connection.
+ */
+ priv->connected = 0;
+ priv->is_server = _gf_false;
rpc_transport_ref (this);
- priv->idx = event_register (ctx->event_pool, priv->sock,
- socket_event_handler, this, 1, 1);
- if (priv->idx == -1)
- ret = -1;
+ if (priv->own_thread) {
+ if (pipe(priv->pipe) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not create pipe");
+ }
+
+ this->listener = this;
+ socket_spawn(this);
+ }
+ else {
+ priv->idx = event_register (ctx->event_pool, priv->sock,
+ socket_event_handler,
+ this, 1, 1);
+ if (priv->idx == -1) {
+ gf_log ("", GF_LOG_WARNING,
+ "failed to register the event");
+ ret = -1;
+ }
+ }
}
unlock:
pthread_mutex_unlock (&priv->lock);
@@ -1999,24 +2883,24 @@ err:
}
-int
+static int
socket_listen (rpc_transport_t *this)
{
socket_private_t * priv = NULL;
int ret = -1;
- int sock = -1;
+ int sock = -1;
struct sockaddr_storage sockaddr;
- socklen_t sockaddr_len;
+ socklen_t sockaddr_len = 0;
peer_info_t *myinfo = NULL;
- glusterfs_ctx_t *ctx = NULL;
+ glusterfs_ctx_t *ctx = NULL;
sa_family_t sa_family = {0, };
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
- priv = this->private;
- myinfo = &this->myinfo;
- ctx = this->ctx;
+ priv = this->private;
+ myinfo = &this->myinfo;
+ ctx = this->ctx;
pthread_mutex_lock (&priv->lock);
{
@@ -2025,8 +2909,8 @@ socket_listen (rpc_transport_t *this)
pthread_mutex_unlock (&priv->lock);
if (sock != -1) {
- gf_log (this->name, GF_LOG_DEBUG,
- "alreading listening");
+ gf_log_callingfn (this->name, GF_LOG_DEBUG,
+ "already listening");
return ret;
}
@@ -2052,32 +2936,36 @@ socket_listen (rpc_transport_t *this)
if (priv->sock == -1) {
gf_log (this->name, GF_LOG_ERROR,
"socket creation failed (%s)",
- strerror (errno));
+ strerror (errno));
goto unlock;
}
/* Cant help if setting socket options fails. We can continue
* working nonetheless.
*/
- if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF,
- &priv->windowsize,
- sizeof (priv->windowsize)) < 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "setting receive window size failed: %d: %d: "
- "%s", priv->sock, priv->windowsize,
- strerror (errno));
- }
+ if (priv->windowsize != 0) {
+ if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF,
+ &priv->windowsize,
+ sizeof (priv->windowsize)) < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "setting receive window size "
+ "failed: %d: %d: %s", priv->sock,
+ priv->windowsize,
+ strerror (errno));
+ }
- if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF,
- &priv->windowsize,
- sizeof (priv->windowsize)) < 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "setting send window size failed: %d: %d: "
- "%s", priv->sock, priv->windowsize,
- strerror (errno));
+ if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF,
+ &priv->windowsize,
+ sizeof (priv->windowsize)) < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "setting send window size failed:"
+ " %d: %d: %s", priv->sock,
+ priv->windowsize,
+ strerror (errno));
+ }
}
- if (priv->nodelay) {
+ if (priv->nodelay && (sa_family != AF_UNIX)) {
ret = __socket_nodelay (priv->sock);
if (ret == -1) {
gf_log (this->name, GF_LOG_ERROR,
@@ -2108,12 +2996,15 @@ socket_listen (rpc_transport_t *this)
goto unlock;
}
- ret = listen (priv->sock, 10);
+ if (priv->backlog)
+ ret = listen (priv->sock, priv->backlog);
+ else
+ ret = listen (priv->sock, 10);
if (ret == -1) {
gf_log (this->name, GF_LOG_ERROR,
"could not set socket %d to listen mode (%s)",
- priv->sock, strerror (errno));
+ priv->sock, strerror (errno));
close (priv->sock);
priv->sock = -1;
goto unlock;
@@ -2123,12 +3014,12 @@ socket_listen (rpc_transport_t *this)
priv->idx = event_register (ctx->event_pool, priv->sock,
socket_server_event_handler,
- this, 1, 0);
+ this, 1, 0);
if (priv->idx == -1) {
- gf_log (this->name, GF_LOG_DEBUG,
+ gf_log (this->name, GF_LOG_WARNING,
"could not register socket %d with events",
- priv->sock);
+ priv->sock);
ret = -1;
close (priv->sock);
priv->sock = -1;
@@ -2143,7 +3034,7 @@ out:
}
-int32_t
+static int32_t
socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
{
socket_private_t *priv = NULL;
@@ -2151,19 +3042,20 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
char need_poll_out = 0;
char need_append = 1;
struct ioq *entry = NULL;
- glusterfs_ctx_t *ctx = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+ char a_byte = 'j';
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
- ctx = this->ctx;
+ ctx = this->ctx;
pthread_mutex_lock (&priv->lock);
{
if (priv->connected != 1) {
if (!priv->submit_log && !priv->connect_finish_log) {
- gf_log (this->name, GF_LOG_DEBUG,
+ gf_log (this->name, GF_LOG_INFO,
"not connected (priv->connected = %d)",
priv->connected);
priv->submit_log = 1;
@@ -2177,24 +3069,34 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
goto unlock;
if (list_empty (&priv->ioq)) {
- ret = __socket_ioq_churn_entry (this, entry);
+ ret = __socket_ioq_churn_entry (this, entry, 1);
- if (ret == 0)
+ if (ret == 0) {
need_append = 0;
-
- if (ret > 0)
+ }
+ if (ret > 0) {
need_poll_out = 1;
+ }
}
if (need_append) {
list_add_tail (&entry->list, &priv->ioq);
+ if (priv->own_thread) {
+ /*
+ * Make sure the polling thread wakes up, by
+ * writing a byte to represent this entry.
+ */
+ if (write(priv->pipe[1],&a_byte,1) < 1) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "write error on pipe");
+ }
+ }
ret = 0;
}
-
- if (need_poll_out) {
+ if (!priv->own_thread && need_poll_out) {
/* first entry to wait. continue writing on POLLOUT */
priv->idx = event_select_on (ctx->event_pool,
- priv->sock,
+ priv->sock,
priv->idx, -1, 1);
}
}
@@ -2206,7 +3108,7 @@ out:
}
-int32_t
+static int32_t
socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
{
socket_private_t *priv = NULL;
@@ -2214,52 +3116,64 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
char need_poll_out = 0;
char need_append = 1;
struct ioq *entry = NULL;
- glusterfs_ctx_t *ctx = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+ char a_byte = 'd';
- if (!this || !this->private)
- goto out;
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
- ctx = this->ctx;
+ ctx = this->ctx;
pthread_mutex_lock (&priv->lock);
{
if (priv->connected != 1) {
if (!priv->submit_log && !priv->connect_finish_log) {
- gf_log (this->name, GF_LOG_DEBUG,
+ gf_log (this->name, GF_LOG_INFO,
"not connected (priv->connected = %d)",
priv->connected);
priv->submit_log = 1;
}
goto unlock;
}
+
priv->submit_log = 0;
entry = __socket_ioq_new (this, &reply->msg);
if (!entry)
goto unlock;
+
if (list_empty (&priv->ioq)) {
- ret = __socket_ioq_churn_entry (this, entry);
+ ret = __socket_ioq_churn_entry (this, entry, 1);
- if (ret == 0)
+ if (ret == 0) {
need_append = 0;
-
- if (ret > 0)
+ }
+ if (ret > 0) {
need_poll_out = 1;
+ }
}
if (need_append) {
list_add_tail (&entry->list, &priv->ioq);
+ if (priv->own_thread) {
+ /*
+ * Make sure the polling thread wakes up, by
+ * writing a byte to represent this entry.
+ */
+ if (write(priv->pipe[1],&a_byte,1) < 1) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "write error on pipe");
+ }
+ }
ret = 0;
}
-
- if (need_poll_out) {
+ if (!priv->own_thread && need_poll_out) {
/* first entry to wait. continue writing on POLLOUT */
priv->idx = event_select_on (ctx->event_pool,
- priv->sock,
+ priv->sock,
priv->idx, -1, 1);
}
}
-
unlock:
pthread_mutex_unlock (&priv->lock);
@@ -2268,14 +3182,13 @@ out:
}
-int32_t
+static int32_t
socket_getpeername (rpc_transport_t *this, char *hostname, int hostlen)
{
int32_t ret = -1;
- if ((this == NULL) || (hostname == NULL)) {
- goto out;
- }
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", hostname, out);
if (hostlen < (strlen (this->peerinfo.identifier) + 1)) {
goto out;
@@ -2288,35 +3201,34 @@ out:
}
-int32_t
+static int32_t
socket_getpeeraddr (rpc_transport_t *this, char *peeraddr, int addrlen,
struct sockaddr_storage *sa, socklen_t salen)
{
int32_t ret = -1;
- if ((this == NULL) || (sa == NULL)) {
- goto out;
- }
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", sa, out);
*sa = this->peerinfo.sockaddr;
if (peeraddr != NULL) {
ret = socket_getpeername (this, peeraddr, addrlen);
}
+ ret = 0;
out:
return ret;
}
-int32_t
+static int32_t
socket_getmyname (rpc_transport_t *this, char *hostname, int hostlen)
{
int32_t ret = -1;
- if ((this == NULL) || (hostname == NULL)) {
- goto out;
- }
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", hostname, out);
if (hostlen < (strlen (this->myinfo.identifier) + 1)) {
goto out;
@@ -2329,15 +3241,14 @@ out:
}
-int32_t
+static int32_t
socket_getmyaddr (rpc_transport_t *this, char *myaddr, int addrlen,
struct sockaddr_storage *sa, socklen_t salen)
{
int32_t ret = 0;
- if ((this == NULL) || (sa == NULL)) {
- goto out;
- }
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", sa, out);
*sa = this->myinfo.sockaddr;
@@ -2350,6 +3261,25 @@ out:
}
+static int
+socket_throttle (rpc_transport_t *this, gf_boolean_t onoff)
+{
+ socket_private_t *priv = NULL;
+
+ priv = this->private;
+
+ /* The way we implement throttling is by taking off
+ POLLIN event from the polled flags. This way we
+ never get called with the POLLIN event and therefore
+ will never read() any more data until throttling
+ is turned off.
+ */
+ priv->idx = event_select_on (this->ctx->event_pool, priv->sock,
+ priv->idx, (int) !onoff, -1);
+ return 0;
+}
+
+
struct rpc_transport_ops tops = {
.listen = socket_listen,
.connect = socket_connect,
@@ -2360,39 +3290,100 @@ struct rpc_transport_ops tops = {
.get_peeraddr = socket_getpeeraddr,
.get_myname = socket_getmyname,
.get_myaddr = socket_getmyaddr,
+ .throttle = socket_throttle,
};
-
int
+reconfigure (rpc_transport_t *this, dict_t *options)
+{
+ socket_private_t *priv = NULL;
+ gf_boolean_t tmp_bool = _gf_false;
+ char *optstr = NULL;
+ int ret = 0;
+ uint64_t windowsize = 0;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ if (!this || !this->private) {
+ ret =-1;
+ goto out;
+ }
+
+ priv = this->private;
+
+ if (dict_get_str (this->options, "transport.socket.keepalive",
+ &optstr) == 0) {
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'transport.socket.keepalive' takes only "
+ "boolean options, not taking any action");
+ priv->keepalive = 1;
+ ret = -1;
+ goto out;
+ }
+ gf_log (this->name, GF_LOG_DEBUG, "Reconfigured transport.socket.keepalive");
+
+ priv->keepalive = tmp_bool;
+ }
+ else
+ priv->keepalive = 1;
+
+ optstr = NULL;
+ if (dict_get_str (this->options, "tcp-window-size",
+ &optstr) == 0) {
+ if (gf_string2bytesize (optstr, &windowsize) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid number format: %s", optstr);
+ goto out;
+ }
+ }
+
+ priv->windowsize = (int)windowsize;
+
+ ret = 0;
+out:
+ return ret;
+
+}
+
+static int
socket_init (rpc_transport_t *this)
{
socket_private_t *priv = NULL;
gf_boolean_t tmp_bool = 0;
uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
char *optstr = NULL;
+ uint32_t keepalive = 0;
+ uint32_t backlog = 0;
+ int session_id = 0;
if (this->private) {
- gf_log (this->name, GF_LOG_DEBUG,
- "double init attempted");
+ gf_log_callingfn (this->name, GF_LOG_ERROR,
+ "double init attempted");
return -1;
}
priv = GF_CALLOC (1, sizeof (*priv), gf_common_mt_socket_private_t);
if (!priv) {
- gf_log (this->name, GF_LOG_ERROR,
- "calloc (1, %"GF_PRI_SIZET") returned NULL",
- sizeof (*priv));
return -1;
}
+ memset(priv,0,sizeof(*priv));
pthread_mutex_init (&priv->lock, NULL);
priv->sock = -1;
priv->idx = -1;
priv->connected = -1;
-
+ priv->nodelay = 1;
+ priv->bio = 0;
+ priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
INIT_LIST_HEAD (&priv->ioq);
+ /* All the below section needs 'this->options' to be present */
+ if (!this->options)
+ goto out;
+
if (dict_get (this->options, "non-blocking-io")) {
optstr = data_to_str (dict_get (this->options,
"non-blocking-io"));
@@ -2400,10 +3391,10 @@ socket_init (rpc_transport_t *this)
if (gf_string2boolean (optstr, &tmp_bool) == -1) {
gf_log (this->name, GF_LOG_ERROR,
"'non-blocking-io' takes only boolean options,"
- " not taking any action");
+ " not taking any action");
tmp_bool = 1;
}
- priv->bio = 0;
+
if (!tmp_bool) {
priv->bio = 1;
gf_log (this->name, GF_LOG_WARNING,
@@ -2414,7 +3405,6 @@ socket_init (rpc_transport_t *this)
optstr = NULL;
// By default, we enable NODELAY
- priv->nodelay = 1;
if (dict_get (this->options, "transport.socket.nodelay")) {
optstr = data_to_str (dict_get (this->options,
"transport.socket.nodelay"));
@@ -2432,9 +3422,8 @@ socket_init (rpc_transport_t *this)
}
}
-
optstr = NULL;
- if (dict_get_str (this->options, "transport.window-size",
+ if (dict_get_str (this->options, "tcp-window-size",
&optstr) == 0) {
if (gf_string2bytesize (optstr, &windowsize) != 0) {
gf_log (this->name, GF_LOG_ERROR,
@@ -2443,17 +3432,188 @@ socket_init (rpc_transport_t *this)
}
}
- optstr = NULL;
+ priv->windowsize = (int)windowsize;
- if (dict_get_str (this->options, "transport.socket.lowlat",
+ optstr = NULL;
+ /* Enable Keep-alive by default. */
+ priv->keepalive = 1;
+ priv->keepaliveintvl = 2;
+ priv->keepaliveidle = 20;
+ if (dict_get_str (this->options, "transport.socket.keepalive",
&optstr) == 0) {
- priv->lowlat = 1;
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'transport.socket.keepalive' takes only "
+ "boolean options, not taking any action");
+ tmp_bool = 1;
+ }
+
+ if (!tmp_bool)
+ priv->keepalive = 0;
+ }
+
+ if (dict_get_uint32 (this->options,
+ "transport.socket.keepalive-interval",
+ &keepalive) == 0) {
+ priv->keepaliveintvl = keepalive;
+ }
+
+ if (dict_get_uint32 (this->options,
+ "transport.socket.keepalive-time",
+ &keepalive) == 0) {
+ priv->keepaliveidle = keepalive;
+ }
+
+ if (dict_get_uint32 (this->options,
+ "transport.socket.listen-backlog",
+ &backlog) == 0) {
+ priv->backlog = backlog;
+ }
+
+ optstr = NULL;
+
+ /* Check if socket read failures are to be logged */
+ priv->read_fail_log = 1;
+ if (dict_get (this->options, "transport.socket.read-fail-log")) {
+ optstr = data_to_str (dict_get (this->options, "transport.socket.read-fail-log"));
+ if (gf_string2boolean (optstr, &tmp_bool) == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "'transport.socket.read-fail-log' takes only "
+ "boolean options; logging socket read fails");
+ }
+ else if (tmp_bool == _gf_false) {
+ priv->read_fail_log = 0;
+ }
}
priv->windowsize = (int)windowsize;
- this->private = priv;
+ priv->ssl_enabled = _gf_false;
+ if (dict_get_str(this->options,SSL_ENABLED_OPT,&optstr) == 0) {
+ if (gf_string2boolean (optstr, &priv->ssl_enabled) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid value given for ssl-enabled boolean");
+ }
+ }
+
+ priv->ssl_own_cert = DEFAULT_CERT_PATH;
+ if (dict_get_str(this->options,SSL_OWN_CERT_OPT,&optstr) == 0) {
+ if (!priv->ssl_enabled) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "%s specified without %s (ignored)",
+ SSL_OWN_CERT_OPT, SSL_ENABLED_OPT);
+ }
+ priv->ssl_own_cert = optstr;
+ }
+ priv->ssl_own_cert = gf_strdup(priv->ssl_own_cert);
+
+ priv->ssl_private_key = DEFAULT_KEY_PATH;
+ if (dict_get_str(this->options,SSL_PRIVATE_KEY_OPT,&optstr) == 0) {
+ if (!priv->ssl_enabled) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "%s specified without %s (ignored)",
+ SSL_PRIVATE_KEY_OPT, SSL_ENABLED_OPT);
+ }
+ priv->ssl_private_key = optstr;
+ }
+ priv->ssl_private_key = gf_strdup(priv->ssl_private_key);
+
+ priv->ssl_ca_list = DEFAULT_CA_PATH;
+ if (dict_get_str(this->options,SSL_CA_LIST_OPT,&optstr) == 0) {
+ if (!priv->ssl_enabled) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "%s specified without %s (ignored)",
+ SSL_CA_LIST_OPT, SSL_ENABLED_OPT);
+ }
+ priv->ssl_ca_list = optstr;
+ }
+ priv->ssl_ca_list = gf_strdup(priv->ssl_ca_list);
+
+ gf_log(this->name,GF_LOG_INFO,"SSL support is %s",
+ priv->ssl_enabled ? "ENABLED" : "NOT enabled");
+ /*
+ * This might get overridden temporarily in socket_connect (q.v.)
+ * if we're using the glusterd portmapper.
+ */
+ priv->use_ssl = priv->ssl_enabled;
+
+ priv->own_thread = priv->use_ssl;
+ if (dict_get_str(this->options,OWN_THREAD_OPT,&optstr) == 0) {
+ if (gf_string2boolean (optstr, &priv->own_thread) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid value given for own-thread boolean");
+ }
+ }
+ gf_log(this->name,GF_LOG_INFO,"using %s polling thread",
+ priv->own_thread ? "private" : "system");
+
+ if (priv->use_ssl) {
+ SSL_library_init();
+ SSL_load_error_strings();
+ priv->ssl_meth = (SSL_METHOD *)TLSv1_method();
+ priv->ssl_ctx = SSL_CTX_new(priv->ssl_meth);
+
+ if (SSL_CTX_set_cipher_list(priv->ssl_ctx,
+ "HIGH:-SSLv2") == 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "failed to find any valid ciphers");
+ goto err;
+ }
+
+ if (!SSL_CTX_use_certificate_chain_file(priv->ssl_ctx,
+ priv->ssl_own_cert)) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not load our cert");
+ goto err;
+ }
+
+ if (!SSL_CTX_use_PrivateKey_file(priv->ssl_ctx,
+ priv->ssl_private_key,
+ SSL_FILETYPE_PEM)) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not load private key");
+ goto err;
+ }
+
+ if (!SSL_CTX_load_verify_locations(priv->ssl_ctx,
+ priv->ssl_ca_list,0)) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not load CA list");
+ goto err;
+ }
+
+#if (OPENSSL_VERSION_NUMBER < 0x00905100L)
+ SSL_CTX_set_verify_depth(ctx,1);
+#endif
+
+ priv->ssl_session_id = ++session_id;
+ SSL_CTX_set_session_id_context(priv->ssl_ctx,
+ (void *)&priv->ssl_session_id,
+ sizeof(priv->ssl_session_id));
+
+ SSL_CTX_set_verify(priv->ssl_ctx,SSL_VERIFY_PEER,0);
+ }
+
+ if (priv->own_thread) {
+ priv->ot_state = OT_IDLE;
+ }
+
+out:
+ this->private = priv;
return 0;
+
+err:
+ if (priv->ssl_own_cert) {
+ GF_FREE(priv->ssl_own_cert);
+ }
+ if (priv->ssl_private_key) {
+ GF_FREE(priv->ssl_private_key);
+ }
+ if (priv->ssl_ca_list) {
+ GF_FREE(priv->ssl_ca_list);
+ }
+ GF_FREE(priv);
+ return -1;
}
@@ -2479,12 +3639,18 @@ fini (rpc_transport_t *this)
"transport %p destroyed", this);
pthread_mutex_destroy (&priv->lock);
+ if (priv->ssl_private_key) {
+ GF_FREE(priv->ssl_private_key);
+ }
+ if (priv->ssl_own_cert) {
+ GF_FREE(priv->ssl_own_cert);
+ }
+ if (priv->ssl_ca_list) {
+ GF_FREE(priv->ssl_ca_list);
+ }
GF_FREE (priv);
}
- if (this->name)
- GF_FREE (this->name);
-
this->private = NULL;
}
@@ -2526,18 +3692,17 @@ struct volume_options options[] = {
},
{ .key = { "transport.address-family",
"address-family" },
- .value = {"inet", "inet6", "inet/inet6", "inet6/inet",
- "unix", "inet-sdp" },
+ .value = {"inet", "inet6", "unix", "inet-sdp" },
.type = GF_OPTION_TYPE_STR
},
{ .key = {"non-blocking-io"},
.type = GF_OPTION_TYPE_BOOL
},
- { .key = {"transport.window-size"},
+ { .key = {"tcp-window-size"},
.type = GF_OPTION_TYPE_SIZET,
.min = GF_MIN_SOCKET_WINDOW_SIZE,
- .max = GF_MAX_SOCKET_WINDOW_SIZE,
+ .max = GF_MAX_SOCKET_WINDOW_SIZE
},
{ .key = {"transport.socket.nodelay"},
.type = GF_OPTION_TYPE_BOOL
@@ -2545,5 +3710,35 @@ struct volume_options options[] = {
{ .key = {"transport.socket.lowlat"},
.type = GF_OPTION_TYPE_BOOL
},
+ { .key = {"transport.socket.keepalive"},
+ .type = GF_OPTION_TYPE_BOOL
+ },
+ { .key = {"transport.socket.keepalive-interval"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.socket.keepalive-time"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.socket.listen-backlog"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.socket.read-fail-log"},
+ .type = GF_OPTION_TYPE_BOOL
+ },
+ { .key = {SSL_ENABLED_OPT},
+ .type = GF_OPTION_TYPE_BOOL
+ },
+ { .key = {SSL_OWN_CERT_OPT},
+ .type = GF_OPTION_TYPE_STR
+ },
+ { .key = {SSL_PRIVATE_KEY_OPT},
+ .type = GF_OPTION_TYPE_STR
+ },
+ { .key = {SSL_CA_LIST_OPT},
+ .type = GF_OPTION_TYPE_STR
+ },
+ { .key = {OWN_THREAD_OPT},
+ .type = GF_OPTION_TYPE_BOOL
+ },
{ .key = {NULL} }
};