diff options
Diffstat (limited to 'rpc/rpc-transport')
| -rw-r--r-- | rpc/rpc-transport/socket/src/Makefile.am | 2 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 729 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 19 | 
3 files changed, 667 insertions, 83 deletions
diff --git a/rpc/rpc-transport/socket/src/Makefile.am b/rpc/rpc-transport/socket/src/Makefile.am index 2c918c7e313..3d1631a3457 100644 --- a/rpc/rpc-transport/socket/src/Makefile.am +++ b/rpc/rpc-transport/socket/src/Makefile.am @@ -3,7 +3,7 @@ noinst_HEADERS = socket.h name.h  rpctransport_LTLIBRARIES = socket.la  rpctransportdir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport -socket_la_LDFLAGS = -module -avoidversion +socket_la_LDFLAGS = -module -avoidversion -lssl  socket_la_SOURCES = socket.c name.c  socket_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 9db09d77e6a..088ceb460e9 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -35,9 +35,33 @@  #include <errno.h>  #include <netinet/tcp.h>  #include <rpc/xdr.h> +#include <sys/ioctl.h>  #define GF_LOG_ERRNO(errno) ((errno == ENOTCONN) ? GF_LOG_DEBUG : GF_LOG_ERROR)  #define SA(ptr) ((struct sockaddr *)ptr) +#define SSL_ENABLED_OPT     "transport.socket.ssl-enabled" +#define SSL_OWN_CERT_OPT    "transport.socket.ssl-own-cert" +#define SSL_PRIVATE_KEY_OPT "transport.socket.ssl-private-key" +#define SSL_CA_LIST_OPT     "transport.socket.ssl-ca-list" +#define OWN_THREAD_OPT      "transport.socket.own-thread" + +/* TBD: do automake substitutions etc. (ick) to set these. */ +#if !defined(DEFAULT_CERT_PATH) +#define DEFAULT_CERT_PATH   "/etc/ssl/glusterfs.pem" +#endif +#if !defined(DEFAULT_KEY_PATH) +#define DEFAULT_KEY_PATH    "/etc/ssl/glusterfs.key" +#endif +#if !defined(DEFAULT_CA_PATH) +#define DEFAULT_CA_PATH     "/etc/ssl/glusterfs.ca" +#endif + +#define POLL_MASK_INPUT  (POLLIN | POLLPRI) +#define POLL_MASK_OUTPUT (POLLOUT) +#define POLL_MASK_ERROR  (POLLERR | POLLHUP | POLLNVAL) + +typedef int SSL_unary_func (SSL *); +typedef int SSL_trinary_func (SSL *, void *, int);  #define __socket_proto_reset_pending(priv) do {                 \                  memset (&priv->incoming.frag.vector, 0,         \ @@ -124,9 +148,156 @@                  __socket_proto_update_priv_after_read (priv, ret, bytes_read); \          } -  int socket_init (rpc_transport_t *this); +void +ssl_dump_error_stack (const char *caller) +{ +	unsigned long  errnum = 0; +	char           errbuf[120] = {0,}; + +	/* OpenSSL docs explicitly give 120 as the error-string length. */ + +	while ((errnum = ERR_get_error())) { +		ERR_error_string(errnum,errbuf); +		gf_log(caller,GF_LOG_ERROR,"  %s",errbuf); +	} +} + +int +ssl_do (rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func) +{ +	int               r = (-1); +	struct pollfd     pfd = {-1,}; +	socket_private_t *priv = NULL; + +	GF_VALIDATE_OR_GOTO(this->name,this->private,out); +	priv = this->private; + +	for (;;) { +		if (buf) { +                        if (priv->connected == -1) { +                                /* +                                 * Fields in the SSL structure (especially +                                 * the BIO pointers) are not valid at this +                                 * point, so we'll segfault if we pass them +                                 * to SSL_read/SSL_write. +                                 */ +                                gf_log(this->name,GF_LOG_INFO, +                                       "lost connection in %s", __func__); +                                break; +                        } +			r = func(priv->ssl_ssl,buf,len); +		} +		else { +                        /* +                         * We actually need these functions to get to +                         * priv->connected == 1. +                         */ +			r = ((SSL_unary_func *)func)(priv->ssl_ssl); +		} +		switch (SSL_get_error(priv->ssl_ssl,r)) { +		case SSL_ERROR_NONE: +			return r; +		case SSL_ERROR_WANT_READ: +			pfd.fd = priv->sock; +			pfd.events = POLLIN; +			if (poll(&pfd,1,-1) < 0) { +				gf_log(this->name,GF_LOG_ERROR,"poll error %d", +				       errno); +			} +			break; +		case SSL_ERROR_WANT_WRITE: +			pfd.fd = priv->sock; +			pfd.events = POLLOUT; +			if (poll(&pfd,1,-1) < 0) { +				gf_log(this->name,GF_LOG_ERROR,"poll error %d", +				       errno); +			} +			break; +		case SSL_ERROR_SYSCALL: +			/* This is what we get when remote disconnects. */ +			gf_log(this->name,GF_LOG_DEBUG, +			       "syscall error (probably remote disconnect)"); +			errno = ENODATA; +			goto out; +		default: +			errno = EIO; +			goto out;	/* "break" would just loop again */ +		} +	} +out: +	return -1; +} + +#define ssl_connect_one(t)   ssl_do((t),NULL,0,(SSL_trinary_func *)SSL_connect) +#define ssl_accept_one(t)    ssl_do((t),NULL,0,(SSL_trinary_func *)SSL_accept) +#define ssl_read_one(t,b,l)  ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_read) +#define ssl_write_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_write) + +int +ssl_setup_connection (rpc_transport_t *this, int server) +{ +	X509             *peer = NULL; +	char              peer_CN[256] = ""; +	int               ret = -1; +	socket_private_t *priv = NULL; + +	GF_VALIDATE_OR_GOTO(this->name,this->private,done); +	priv = this->private; + +	priv->ssl_ssl = SSL_new(priv->ssl_ctx); +	if (!priv->ssl_ssl) { +		gf_log(this->name,GF_LOG_ERROR,"SSL_new failed"); +		ssl_dump_error_stack(this->name); +		goto done; +	} +	priv->ssl_sbio = BIO_new_socket(priv->sock,BIO_NOCLOSE); +	if (!priv->ssl_sbio) { +		gf_log(this->name,GF_LOG_ERROR,"BIO_new_socket failed"); +		ssl_dump_error_stack(this->name); +		goto free_ssl; +	} +	SSL_set_bio(priv->ssl_ssl,priv->ssl_sbio,priv->ssl_sbio); + +	if (server) { +		ret = ssl_accept_one(this); +	} +	else { +		ret = ssl_connect_one(this); +	} + +	/* Make sure _the call_ succeeded. */ +	if (ret < 0) { +		goto ssl_error; +	} + +	/* Make sure _SSL verification_ succeeded, yielding an identity. */ +	if (SSL_get_verify_result(priv->ssl_ssl) != X509_V_OK) { +		goto ssl_error; +	} +	peer = SSL_get_peer_certificate(priv->ssl_ssl); +	if (!peer) { +		goto ssl_error; +	} + +	/* Finally, everything seems OK. */ +	X509_NAME_get_text_by_NID(X509_get_subject_name(peer), +		NID_commonName, peer_CN, sizeof(peer_CN)-1); +	peer_CN[sizeof(peer_CN)-1] = '\0'; +	gf_log(this->name,GF_LOG_INFO,"peer CN = %s", peer_CN); +	return 0; + +	/* Error paths. */ +ssl_error: +	gf_log(this->name,GF_LOG_ERROR,"SSL connect error"); +	ssl_dump_error_stack(this->name); +free_ssl: +	SSL_free(priv->ssl_ssl); +done: +	return ret; +} +  /*   * return value:   *   0 = success (completed) @@ -159,9 +330,22 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,                  *bytes = 0;          } -        while (opcount) { +        while (opcount > 0) { +                if (opvector->iov_len == 0) { +                        gf_log(this->name,GF_LOG_DEBUG, +                               "would have passed zero length to read/write"); +                        ++opvector; +                        --opcount; +                        continue; +                }                  if (write) { -                        ret = writev (sock, opvector, opcount); +			if (priv->use_ssl) { +				ret = ssl_write_one(this, +					opvector->iov_base, opvector->iov_len); +			} +			else { +				ret = writev (sock, opvector, opcount); +			}                          if (ret == 0 || (ret == -1 && errno == EAGAIN)) {                                  /* done for now */ @@ -169,7 +353,18 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,                          }                          this->total_bytes_write += ret;                  } else { -                        ret = readv (sock, opvector, opcount); +			if (priv->use_ssl) { +				ret = ssl_read_one(this, +					opvector->iov_base, opvector->iov_len); +			} +			else { +				ret = readv (sock, opvector, opcount); +			} +			if (ret == 0) { +				gf_log(this->name,GF_LOG_DEBUG,"EOF on socket"); +				errno = ENODATA; +				ret = -1; +			}                          if (ret == -1 && errno == EAGAIN) {                                  /* done for now */                                  break; @@ -193,6 +388,9 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,                          gf_log (this->name, GF_LOG_WARNING,                                  "%s failed (%s)", write ? "writev" : "readv",                                  strerror (errno)); +			if (priv->use_ssl) { +				ssl_dump_error_stack(this->name); +			}                          opcount = -1;                          break;                  } @@ -204,6 +402,17 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,                  moved = 0;                  while (moved < ret) { +                        if (!opcount) { +                                gf_log(this->name,GF_LOG_DEBUG, +                                       "ran out of iov, moved %d/%d", +                                       moved, ret); +                                goto ran_out; +                        } +                        if (!opvector[0].iov_len) { +                                opvector++; +                                opcount--; +                                continue; +                        }                          if ((ret - moved) >= opvector[0].iov_len) {                                  moved += opvector[0].iov_len;                                  opvector++; @@ -213,13 +422,11 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,                                  opvector[0].iov_base += (ret - moved);                                  moved += (ret - moved);                          } -                        while (opcount && !opvector[0].iov_len) { -                                opvector++; -                                opcount--; -                        }                  }          } +ran_out: +          if (pending_vector)                  *pending_vector = opvector; @@ -279,6 +486,20 @@ __socket_disconnect (rpc_transport_t *this)                                  "shutdown() returned %d. %s",                                  ret, strerror (errno));                  } +		if (priv->use_ssl) { +			SSL_shutdown(priv->ssl_ssl); +			SSL_clear(priv->ssl_ssl); +			SSL_free(priv->ssl_ssl); +		} +		if (priv->own_thread) { +			/* +			 * Without this, reconnect (= disconnect + connect) +			 * won't work except by accident. +			 */ +			close(priv->sock); +			priv->sock = -1; +			++(priv->socket_gen); +		}          }  out: @@ -356,7 +577,6 @@ __socket_nonblock (int fd)          return ret;  } -  int  __socket_nodelay (int fd)  { @@ -610,9 +830,11 @@ out:  int -__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry) +__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct)  { -        int ret = -1; +        int               ret = -1; +	socket_private_t *priv = NULL; +	char              a_byte = 0;          ret = __socket_writev (this, entry->pending_vector,                                 entry->pending_count, @@ -623,6 +845,18 @@ __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry)                  /* current entry was completely written */                  GF_ASSERT (entry->pending_count == 0);                  __socket_ioq_entry_free (entry); +		priv = this->private; +		if (priv->own_thread) { +			/* +			 * The pipe should only remain readable if there are +			 * more entries after this, so drain the byte +			 * representing this entry. +			 */ +			if (!direct && read(priv->pipe[0],&a_byte,1) < 1) { +				gf_log(this->name,GF_LOG_WARNING, +				       "read error on pipe"); +			} +		}          }          return ret; @@ -645,13 +879,13 @@ __socket_ioq_churn (rpc_transport_t *this)                  /* pick next entry */                  entry = priv->ioq_next; -                ret = __socket_ioq_churn_entry (this, entry); +                ret = __socket_ioq_churn_entry (this, entry, 0);                  if (ret != 0)                          break;          } -        if (list_empty (&priv->ioq)) { +        if (!priv->own_thread && list_empty (&priv->ioq)) {                  /* all pending writes done, not interested in POLLOUT */                  priv->idx = event_select_on (this->ctx->event_pool,                                               priv->sock, priv->idx, -1, 0); @@ -1729,7 +1963,6 @@ socket_event_poll_in (rpc_transport_t *this)          if (pollin != NULL) {                  ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,                                              pollin); -                  rpc_transport_pollin_destroy (pollin);          } @@ -1752,7 +1985,7 @@ socket_connect_finish (rpc_transport_t *this)          pthread_mutex_lock (&priv->lock);          { -                if (priv->connected) +                if (priv->connected != 0)                          goto unlock;                  ret = __socket_connect_finish (priv->sock); @@ -1814,9 +2047,9 @@ int  socket_event_handler (int fd, int idx, void *data,                        int poll_in, int poll_out, int poll_err)  { -        rpc_transport_t      *this = NULL; +        rpc_transport_t  *this = NULL;          socket_private_t *priv = NULL; -        int               ret = 0; +	int               ret = -1;          this = data;          GF_VALIDATE_OR_GOTO ("socket", this, out); @@ -1826,16 +2059,13 @@ socket_event_handler (int fd, int idx, void *data,          THIS = this->xl;          priv = this->private; -          pthread_mutex_lock (&priv->lock);          {                  priv->idx = idx;          }          pthread_mutex_unlock (&priv->lock); -        if (!priv->connected) { -                ret = socket_connect_finish (this); -        } +	ret = (priv->connected == 1) ? 0 : socket_connect_finish(this);          if (!ret && poll_out) {                  ret = socket_event_poll_out (this); @@ -1851,13 +2081,112 @@ socket_event_handler (int fd, int idx, void *data,                          "disconnecting now");                  socket_event_poll_err (this);                  rpc_transport_unref (this); -        } +	}  out: -        return 0; +	return ret;  } +void * +socket_poller (void *ctx) +{ +        rpc_transport_t  *this = ctx; +        socket_private_t *priv = this->private; +	struct pollfd     pfd[2] = {{0,},}; +	gf_boolean_t      to_write = _gf_false; +	int               ret = 0; +	int               orig_gen; + +	orig_gen = ++(priv->socket_gen); + +        if (priv->connected == 0) { +		THIS = this->xl; +                ret = socket_connect_finish (this); +        } + +	for (;;) { +		if (priv->socket_gen != orig_gen) { +			gf_log(this->name,GF_LOG_DEBUG, +			       "redundant poller exiting"); +			return NULL; +		} +		pthread_mutex_lock(&priv->lock); +		to_write = !list_empty(&priv->ioq); +		pthread_mutex_unlock(&priv->lock); +		pfd[0].fd = priv->pipe[0]; +		pfd[0].events = POLL_MASK_ERROR; +		pfd[0].revents = 0; +		pfd[1].fd = priv->sock; +		pfd[1].events = POLL_MASK_INPUT | POLL_MASK_ERROR; +		pfd[1].revents = 0; +		if (to_write) { +			pfd[1].events |= POLL_MASK_OUTPUT; +		} +		else { +			pfd[0].events |= POLL_MASK_INPUT; +		} +		if (poll(pfd,2,-1) < 0) { +			gf_log(this->name,GF_LOG_ERROR,"poll failed"); +			break; +		} +		if (pfd[0].revents & POLL_MASK_ERROR) { +			gf_log(this->name,GF_LOG_ERROR, +			       "poll error on pipe"); +			break; +		} +		/* Only glusterd actually seems to need this. */ +		THIS = this->xl; +		if (pfd[1].revents & POLL_MASK_INPUT) { +			ret = socket_event_poll_in(this); +			if (ret >= 0) { +				/* Suppress errors while making progress. */ +				pfd[1].revents &= ~POLL_MASK_ERROR; +			} +			else if (errno == ENOTCONN) { +				ret = 0; +			} +		} +		else if (pfd[1].revents & POLL_MASK_OUTPUT) { +			ret = socket_event_poll_out(this); +			if (ret >= 0) { +				/* Suppress errors while making progress. */ +				pfd[1].revents &= ~POLL_MASK_ERROR; +			} +			else if (errno == ENOTCONN) { +				ret = 0; +			} +		} +		else { +			/* +			 * This usually means that we left poll() because +			 * somebody pushed a byte onto our pipe.  That wakeup +			 * is why the pipe is there, but once awake we can do +			 * all the checking we need on the next iteration. +			 */ +			ret = 0; +		} +		if (pfd[1].revents & POLL_MASK_ERROR) { +			gf_log(this->name,GF_LOG_ERROR, +			       "poll error on socket"); +			break; +		} +		if (ret < 0) { +			gf_log(this->name,GF_LOG_ERROR, +			       "error in polling loop"); +			break; +		} +	} + +	/* All (and only) I/O errors should come here. */ +	__socket_disconnect (this); +        rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); +	rpc_transport_unref (this); +	return NULL; +} + + +  int  socket_server_event_handler (int fd, int idx, void *data,                               int poll_in, int poll_out, int poll_err) @@ -1896,19 +2225,6 @@ socket_server_event_handler (int fd, int idx, void *data,                                  goto unlock;                          } -                        if (!priv->bio) { -                                ret = __socket_nonblock (new_sock); - -                                if (ret == -1) { -                                        gf_log (this->name, GF_LOG_WARNING, -                                                "NBIO on %d failed (%s)", -                                                new_sock, strerror (errno)); - -                                        close (new_sock); -                                        goto unlock; -                                } -                        } -                          if (priv->nodelay) {                                  ret = __socket_nodelay (new_sock);                                  if (ret == -1) { @@ -1955,7 +2271,11 @@ socket_server_event_handler (int fd, int idx, void *data,                          }                          get_transport_identifiers (new_trans); -                        socket_init (new_trans); +                        ret = socket_init(new_trans); +                        if (ret != 0) { +                                close(new_sock); +                                goto unlock; +                        }                          new_trans->ops = this->ops;                          new_trans->init = this->init;                          new_trans->fini = this->fini; @@ -1966,20 +2286,61 @@ socket_server_event_handler (int fd, int idx, void *data,                          new_trans->listener = this;                          new_priv = new_trans->private; +			new_priv->use_ssl = priv->use_ssl; +			new_priv->sock = new_sock; +			new_priv->own_thread = priv->own_thread; + +			if (priv->use_ssl) { +				new_priv->ssl_ctx = priv->ssl_ctx; +				if (ssl_setup_connection(new_trans,1) < 0) { +					gf_log(this->name,GF_LOG_ERROR, +					       "server setup failed"); +					close(new_sock); +					goto unlock; +				} +			} + +                        if (!priv->bio) { +                                ret = __socket_nonblock (new_sock); + +                                if (ret == -1) { +                                        gf_log (this->name, GF_LOG_WARNING, +                                                "NBIO on %d failed (%s)", +                                                new_sock, strerror (errno)); + +                                        close (new_sock); +                                        goto unlock; +                                } +                        } +                          pthread_mutex_lock (&new_priv->lock);                          { -                                new_priv->sock = new_sock;                                  new_priv->connected = 1;                                  rpc_transport_ref (new_trans); -                                new_priv->idx = -                                        event_register (ctx->event_pool, -                                                        new_sock, -                                                        socket_event_handler, -                                                        new_trans, 1, 0); +				if (new_priv->own_thread) { +					if (pipe(new_priv->pipe) < 0) { +						gf_log(this->name,GF_LOG_ERROR, +						       "could not create pipe"); +					} +					if (pthread_create(&new_priv->thread, +							NULL, socket_poller, +							new_trans) != 0) { +						gf_log(this->name,GF_LOG_ERROR, +						       "could not create poll thread"); +					} +				} +				else { +					new_priv->idx = +						event_register (ctx->event_pool, +								new_sock, +								socket_event_handler, +								new_trans, +								1, 0); +					if (new_priv->idx == -1) +						ret = -1; +				} -                                if (new_priv->idx == -1) -                                        ret = -1;                          }                          pthread_mutex_unlock (&new_priv->lock);                          if (ret == -1) { @@ -2069,6 +2430,20 @@ socket_connect (rpc_transport_t *this, int port)          if (port > 0) {                  sock_union.sin.sin_port = htons (port);          } +        if (ntohs(sock_union.sin.sin_port) == GF_DEFAULT_SOCKET_LISTEN_PORT) { +                if (priv->use_ssl) { +                        gf_log(this->name,GF_LOG_DEBUG, +                               "disabling SSL for portmapper connection"); +                        priv->use_ssl = _gf_false; +                } +        } +        else { +                if (priv->ssl_enabled && !priv->use_ssl) { +                        gf_log(this->name,GF_LOG_DEBUG, +                               "re-enabling SSL for I/O connection"); +                        priv->use_ssl = _gf_true; +                } +        }          pthread_mutex_lock (&priv->lock);          {                  if (priv->sock != -1) { @@ -2124,19 +2499,6 @@ socket_connect (rpc_transport_t *this, int port)                          }                  } -                if (!priv->bio) { -                        ret = __socket_nonblock (priv->sock); - -                        if (ret == -1) { -                                gf_log (this->name, GF_LOG_ERROR, -                                        "NBIO on %d failed (%s)", -                                        priv->sock, strerror (errno)); -                                close (priv->sock); -                                priv->sock = -1; -                                goto unlock; -                        } -                } -                  if (priv->keepalive) {                          ret = __socket_keepalive (priv->sock,                                                    priv->keepaliveintvl, @@ -2172,17 +2534,55 @@ socket_connect (rpc_transport_t *this, int port)                          goto unlock;                  } -                priv->connected = 0; +		if (priv->use_ssl) { +			ret = ssl_setup_connection(this,0); +			if (ret < 0) { +				gf_log(this->name,GF_LOG_ERROR, +					"client setup failed"); +				close(priv->sock); +				priv->sock = -1; +				goto unlock; +			} +		} -                rpc_transport_ref (this); +                if (!priv->bio) { +                        ret = __socket_nonblock (priv->sock); -                priv->idx = event_register (ctx->event_pool, priv->sock, -                                            socket_event_handler, this, 1, 1); -                if (priv->idx == -1) { -                        gf_log (this->name, GF_LOG_WARNING, -                                "failed to register the event"); -                        ret = -1; +                        if (ret == -1) { +                                gf_log (this->name, GF_LOG_ERROR, +                                        "NBIO on %d failed (%s)", +                                        priv->sock, strerror (errno)); +                                close (priv->sock); +                                priv->sock = -1; +                                goto unlock; +                        }                  } + +                priv->connected = 0; +                rpc_transport_ref (this); + +		if (priv->own_thread) { +			if (pipe(priv->pipe) < 0) { +				gf_log(this->name,GF_LOG_ERROR, +				       "could not create pipe"); +			} + +			if (pthread_create(&priv->thread,NULL, +					socket_poller, this) != 0) { +				gf_log(this->name,GF_LOG_ERROR, +				       "could not create poll thread"); +			} +		} +		else { +			priv->idx = event_register (ctx->event_pool, priv->sock, +						    socket_event_handler, +						    this, 1, 1); +			if (priv->idx == -1) { +				gf_log ("", GF_LOG_WARNING, +					"failed to register the event"); +				ret = -1; +			} +		}          }  unlock:          pthread_mutex_unlock (&priv->lock); @@ -2352,6 +2752,7 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)          char              need_append = 1;          struct ioq       *entry = NULL;          glusterfs_ctx_t  *ctx = NULL; +	char              a_byte = 'j';          GF_VALIDATE_OR_GOTO ("socket", this, out);          GF_VALIDATE_OR_GOTO ("socket", this->private, out); @@ -2377,21 +2778,31 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)                          goto unlock;                  if (list_empty (&priv->ioq)) { -                        ret = __socket_ioq_churn_entry (this, entry); +                        ret = __socket_ioq_churn_entry (this, entry, 1); -                        if (ret == 0) +                        if (ret == 0) {                                  need_append = 0; - -                        if (ret > 0) +			} +                        if (ret > 0) {                                  need_poll_out = 1; +			}                  }                  if (need_append) {                          list_add_tail (&entry->list, &priv->ioq); +			if (priv->own_thread) { +				/* +				 * Make sure the polling thread wakes up, by +				 * writing a byte to represent this entry. +				 */ +				if (write(priv->pipe[1],&a_byte,1) < 1) { +					gf_log(this->name,GF_LOG_WARNING, +					       "write error on pipe"); +				} +			}                          ret = 0;                  } - -                if (need_poll_out) { +                if (!priv->own_thread && need_poll_out) {                          /* first entry to wait. continue writing on POLLOUT */                          priv->idx = event_select_on (ctx->event_pool,                                                       priv->sock, @@ -2415,6 +2826,7 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)          char              need_append = 1;          struct ioq       *entry = NULL;          glusterfs_ctx_t  *ctx = NULL; +	char              a_byte = 'd';          GF_VALIDATE_OR_GOTO ("socket", this, out);          GF_VALIDATE_OR_GOTO ("socket", this->private, out); @@ -2433,33 +2845,44 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)                          }                          goto unlock;                  } +                  priv->submit_log = 0;                  entry = __socket_ioq_new (this, &reply->msg);                  if (!entry)                          goto unlock; +                  if (list_empty (&priv->ioq)) { -                        ret = __socket_ioq_churn_entry (this, entry); +                        ret = __socket_ioq_churn_entry (this, entry, 1); -                        if (ret == 0) +                        if (ret == 0) {                                  need_append = 0; - -                        if (ret > 0) +			} +                        if (ret > 0) {                                  need_poll_out = 1; +			}                  }                  if (need_append) {                          list_add_tail (&entry->list, &priv->ioq); +			if (priv->own_thread) { +				/* +				 * Make sure the polling thread wakes up, by +				 * writing a byte to represent this entry. +				 */ +				if (write(priv->pipe[1],&a_byte,1) < 1) { +					gf_log(this->name,GF_LOG_WARNING, +					       "write error on pipe"); +				} +			}                          ret = 0;                  } - -                if (need_poll_out) { +                if (!priv->own_thread && need_poll_out) {                          /* first entry to wait. continue writing on POLLOUT */                          priv->idx = event_select_on (ctx->event_pool,                                                       priv->sock,                                                       priv->idx, -1, 1);                  }          } -  unlock:          pthread_mutex_unlock (&priv->lock); @@ -2622,6 +3045,7 @@ socket_init (rpc_transport_t *this)          char             *optstr = NULL;          uint32_t          keepalive = 0;          uint32_t          backlog = 0; +	int               session_id = 0;          if (this->private) {                  gf_log_callingfn (this->name, GF_LOG_ERROR, @@ -2750,11 +3174,130 @@ socket_init (rpc_transport_t *this)                  }          } -        optstr = NULL; +        priv->windowsize = (int)windowsize; + +        priv->ssl_enabled = _gf_false; +	if (dict_get_str(this->options,SSL_ENABLED_OPT,&optstr) == 0) { +                if (gf_string2boolean (optstr, &priv->ssl_enabled) != 0) { +                        gf_log (this->name, GF_LOG_ERROR, +				"invalid value given for ssl-enabled boolean"); +		} +	} + +        priv->ssl_own_cert = DEFAULT_CERT_PATH; +	if (dict_get_str(this->options,SSL_OWN_CERT_OPT,&optstr) == 0) { +                if (!priv->ssl_enabled) { +                        gf_log(this->name,GF_LOG_WARNING, +                               "%s specified without %s (ignored)", +                               SSL_OWN_CERT_OPT, SSL_ENABLED_OPT); +                } +                priv->ssl_own_cert = optstr; +	} +        priv->ssl_own_cert = gf_strdup(priv->ssl_own_cert); + +        priv->ssl_private_key = DEFAULT_KEY_PATH; +	if (dict_get_str(this->options,SSL_PRIVATE_KEY_OPT,&optstr) == 0) { +                if (!priv->ssl_enabled) { +                        gf_log(this->name,GF_LOG_WARNING, +                               "%s specified without %s (ignored)", +                               SSL_PRIVATE_KEY_OPT, SSL_ENABLED_OPT); +                } +                priv->ssl_private_key = optstr; +	} +        priv->ssl_private_key = gf_strdup(priv->ssl_private_key); + +        priv->ssl_ca_list = DEFAULT_CA_PATH; +	if (dict_get_str(this->options,SSL_CA_LIST_OPT,&optstr) == 0) { +                if (!priv->ssl_enabled) { +                        gf_log(this->name,GF_LOG_WARNING, +                               "%s specified without %s (ignored)", +                               SSL_CA_LIST_OPT, SSL_ENABLED_OPT); +                } +                priv->ssl_ca_list = optstr; +	} +        priv->ssl_ca_list = gf_strdup(priv->ssl_ca_list); + +        gf_log(this->name,GF_LOG_INFO,"SSL support is %s", +               priv->ssl_enabled ? "ENABLED" : "NOT enabled"); +        /* +         * This might get overridden temporarily in socket_connect (q.v.) +         * if we're using the glusterd portmapper. +         */ +        priv->use_ssl = priv->ssl_enabled; + +	priv->own_thread = priv->use_ssl; +	if (dict_get_str(this->options,OWN_THREAD_OPT,&optstr) == 0) { +                if (gf_string2boolean (optstr, &priv->own_thread) != 0) { +                        gf_log (this->name, GF_LOG_ERROR, +				"invalid value given for own-thread boolean"); +		} +	} +	gf_log(this->name,GF_LOG_INFO,"using %s polling thread", +	       priv->own_thread ? "private" : "system"); + +	if (priv->use_ssl) { +		SSL_library_init(); +		SSL_load_error_strings(); +		priv->ssl_meth = (SSL_METHOD *)TLSv1_method(); +		priv->ssl_ctx = SSL_CTX_new(priv->ssl_meth); + +                if (SSL_CTX_set_cipher_list(priv->ssl_ctx, +                                            "HIGH:-SSLv2") == 0) { +                        gf_log(this->name,GF_LOG_ERROR, +                               "failed to find any valid ciphers"); +                        goto err; +                } + +		if (!SSL_CTX_use_certificate_chain_file(priv->ssl_ctx, +							priv->ssl_own_cert)) { +			gf_log(this->name,GF_LOG_ERROR, +			       "could not load our cert"); +			goto err; +		} + +		if (!SSL_CTX_use_PrivateKey_file(priv->ssl_ctx, +						 priv->ssl_private_key, +						 SSL_FILETYPE_PEM)) { +			gf_log(this->name,GF_LOG_ERROR, +			       "could not load private key"); +			goto err; +		} + +		if (!SSL_CTX_load_verify_locations(priv->ssl_ctx, +						   priv->ssl_ca_list,0)) { +			gf_log(this->name,GF_LOG_ERROR, +			       "could not load CA list"); +			goto err; +		} + +#if (OPENSSL_VERSION_NUMBER < 0x00905100L) +		SSL_CTX_set_verify_depth(ctx,1); +#endif + +		priv->ssl_session_id = ++session_id; +		SSL_CTX_set_session_id_context(priv->ssl_ctx, +					       (void *)&priv->ssl_session_id, +					       sizeof(priv->ssl_session_id)); + +		SSL_CTX_set_verify(priv->ssl_ctx,SSL_VERIFY_PEER,0); +	} +  out:          this->private = priv; -          return 0; + +err: +        if (priv->ssl_own_cert) { +                GF_FREE(priv->ssl_own_cert); +        } +        if (priv->ssl_private_key) { +                GF_FREE(priv->ssl_private_key); +        } +        if (priv->ssl_ca_list) { +                GF_FREE(priv->ssl_ca_list); +        } +        GF_FREE(priv); +        return -1;  } @@ -2780,6 +3323,15 @@ fini (rpc_transport_t *this)                          "transport %p destroyed", this);                  pthread_mutex_destroy (&priv->lock); +		if (priv->ssl_private_key) { +			GF_FREE(priv->ssl_private_key); +		} +		if (priv->ssl_own_cert) { +			GF_FREE(priv->ssl_own_cert); +		} +		if (priv->ssl_ca_list) { +			GF_FREE(priv->ssl_ca_list); +		}                  GF_FREE (priv);          } @@ -2857,5 +3409,20 @@ struct volume_options options[] = {          { .key   = {"transport.socket.read-fail-log"},            .type  = GF_OPTION_TYPE_BOOL          }, +        { .key   = {SSL_ENABLED_OPT}, +          .type  = GF_OPTION_TYPE_BOOL +        }, +	{ .key   = {SSL_OWN_CERT_OPT}, +	  .type  = GF_OPTION_TYPE_STR +	}, +	{ .key   = {SSL_PRIVATE_KEY_OPT}, +	  .type  = GF_OPTION_TYPE_STR +	}, +	{ .key   = {SSL_CA_LIST_OPT}, +	  .type  = GF_OPTION_TYPE_STR +	}, +	{ .key   = {OWN_THREAD_OPT}, +	  .type  = GF_OPTION_TYPE_BOOL +	},          { .key = {NULL} }  }; diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index 0304f1db1eb..0a407cc1a76 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -11,6 +11,8 @@  #ifndef _SOCKET_H  #define _SOCKET_H +#include <openssl/ssl.h> +#include <openssl/err.h>  #ifndef _CONFIG_H  #define _CONFIG_H @@ -144,7 +146,8 @@ typedef struct {  typedef struct {          int32_t                sock;          int32_t                idx; -        unsigned char          connected; // -1 = not connected. 0 = in progress. 1 = connected +        /* -1 = not connected. 0 = in progress. 1 = connected */ +        char                   connected;          char                   bio;          char                   connect_finish_log;          char                   submit_log; @@ -195,6 +198,20 @@ typedef struct {          int                    keepaliveintvl;          uint32_t               backlog;          gf_boolean_t           read_fail_log; +        gf_boolean_t           ssl_enabled; +	gf_boolean_t           use_ssl; +	SSL_METHOD            *ssl_meth; +	SSL_CTX               *ssl_ctx; +	int                    ssl_session_id; +	BIO                   *ssl_sbio; +	SSL                   *ssl_ssl; +	char                  *ssl_own_cert; +	char                  *ssl_private_key; +	char                  *ssl_ca_list; +	pthread_t              thread; +	int                    pipe[2]; +	gf_boolean_t           own_thread; +	volatile int           socket_gen;  } socket_private_t;  | 
