diff options
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 288 |
1 files changed, 94 insertions, 194 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 414952818aa..a295e6a9bab 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -11,7 +11,6 @@ #include "socket.h" #include "name.h" #include <glusterfs/dict.h> -#include "rpc-transport.h" #include <glusterfs/syscall.h> #include <glusterfs/byte-order.h> #include <glusterfs/compat-errno.h> @@ -30,7 +29,6 @@ #include <netinet/tcp.h> #endif -#include <fcntl.h> #include <errno.h> #include <rpc/xdr.h> #include <sys/ioctl.h> @@ -248,7 +246,6 @@ ssl_do(rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func) int r = (-1); socket_private_t *priv = NULL; - GF_VALIDATE_OR_GOTO(this->name, this->private, out); priv = this->private; if (buf) { @@ -358,14 +355,12 @@ ssl_set_crl_verify_flags(SSL_CTX *ssl_ctx) #endif } -int +static int ssl_setup_connection_prefix(rpc_transport_t *this, gf_boolean_t server) { int ret = -1; socket_private_t *priv = NULL; - GF_VALIDATE_OR_GOTO(this->name, this->private, done); - priv = this->private; if (ssl_setup_connection_params(this) < 0) { @@ -417,7 +412,6 @@ ssl_setup_connection_postfix(rpc_transport_t *this) char peer_CN[256] = ""; socket_private_t *priv = NULL; - GF_VALIDATE_OR_GOTO(this->name, this->private, done); priv = this->private; /* Make sure _SSL verification_ succeeded, yielding an identity. */ @@ -451,11 +445,10 @@ ssl_error: SSL_free(priv->ssl_ssl); priv->ssl_ssl = NULL; -done: return NULL; } -int +static int ssl_complete_connection(rpc_transport_t *this) { int ret = -1; /* 1 : implies go back to epoll_wait() @@ -694,7 +687,6 @@ __socket_rwv(rpc_transport_t *this, struct iovec *vector, int count, int opcount = 0; int moved = 0; - GF_VALIDATE_OR_GOTO("socket", this, out); GF_VALIDATE_OR_GOTO("socket", this->private, out); priv = this->private; @@ -827,24 +819,16 @@ static int __socket_readv(rpc_transport_t *this, struct iovec *vector, int count, struct iovec **pending_vector, int *pending_count, size_t *bytes) { - int ret = -1; - - ret = __socket_rwv(this, vector, count, pending_vector, pending_count, - bytes, 0); - - return ret; + return __socket_rwv(this, vector, count, pending_vector, pending_count, + bytes, 0); } static int __socket_writev(rpc_transport_t *this, struct iovec *vector, int count, struct iovec **pending_vector, int *pending_count) { - int ret = -1; - - ret = __socket_rwv(this, vector, count, pending_vector, pending_count, NULL, - 1); - - return ret; + return __socket_rwv(this, vector, count, pending_vector, pending_count, + NULL, 1); } static int @@ -871,20 +855,14 @@ __socket_shutdown(rpc_transport_t *this) static int __socket_teardown_connection(rpc_transport_t *this) { - int ret = -1; socket_private_t *priv = NULL; - GF_VALIDATE_OR_GOTO("socket", this, out); - GF_VALIDATE_OR_GOTO("socket", this->private, out); - priv = this->private; if (priv->use_ssl) ssl_teardown_connection(priv); - ret = __socket_shutdown(this); -out: - return ret; + return __socket_shutdown(this); } static int @@ -893,9 +871,6 @@ __socket_disconnect(rpc_transport_t *this) int ret = -1; socket_private_t *priv = NULL; - GF_VALIDATE_OR_GOTO("socket", this, out); - GF_VALIDATE_OR_GOTO("socket", this->private, out); - priv = this->private; gf_log(this->name, GF_LOG_TRACE, "disconnecting %p, sock=%d", this, @@ -912,7 +887,6 @@ __socket_disconnect(rpc_transport_t *this) } } -out: return ret; } @@ -929,9 +903,6 @@ __socket_server_bind(rpc_transport_t *this) uint16_t sin_port = 0; int retries = 0; - GF_VALIDATE_OR_GOTO("socket", this, out); - GF_VALIDATE_OR_GOTO("socket", this->private, out); - priv = this->private; ctx = this->ctx; cmd_args = &ctx->cmd_args; @@ -1159,9 +1130,6 @@ __socket_reset(rpc_transport_t *this) { socket_private_t *priv = NULL; - GF_VALIDATE_OR_GOTO("socket", this, out); - GF_VALIDATE_OR_GOTO("socket", this->private, out); - priv = this->private; /* TODO: use mem-pool on incoming data */ @@ -1209,8 +1177,6 @@ __socket_reset(rpc_transport_t *this) GF_FREE(priv->ssl_ca_list); priv->ssl_ca_list = NULL; } -out: - return; } static void @@ -1240,8 +1206,6 @@ __socket_ioq_new(rpc_transport_t *this, rpc_transport_msg_t *msg) int count = 0; uint32_t size = 0; - GF_VALIDATE_OR_GOTO("socket", this, out); - /* TODO: use mem-pool */ entry = GF_CALLOC(1, sizeof(*entry), gf_common_mt_ioq); if (!entry) @@ -1296,7 +1260,6 @@ __socket_ioq_new(rpc_transport_t *this, rpc_transport_msg_t *msg) INIT_LIST_HEAD(&entry->list); -out: return entry; } @@ -1317,27 +1280,18 @@ out: } static void -__socket_ioq_flush(rpc_transport_t *this) +__socket_ioq_flush(socket_private_t *priv) { - socket_private_t *priv = NULL; struct ioq *entry = NULL; - GF_VALIDATE_OR_GOTO("socket", this, out); - GF_VALIDATE_OR_GOTO("socket", this->private, out); - - priv = this->private; - while (!list_empty(&priv->ioq)) { entry = priv->ioq_next; __socket_ioq_entry_free(entry); } - -out: - return; } static int -__socket_ioq_churn_entry(rpc_transport_t *this, struct ioq *entry, int direct) +__socket_ioq_churn_entry(rpc_transport_t *this, struct ioq *entry) { int ret = -1; @@ -1360,16 +1314,13 @@ __socket_ioq_churn(rpc_transport_t *this) int ret = 0; struct ioq *entry = NULL; - GF_VALIDATE_OR_GOTO("socket", this, out); - GF_VALIDATE_OR_GOTO("socket", this->private, out); - priv = this->private; while (!list_empty(&priv->ioq)) { /* pick next entry */ entry = priv->ioq_next; - ret = __socket_ioq_churn_entry(this, entry, 0); + ret = __socket_ioq_churn_entry(this, entry); if (ret != 0) break; @@ -1381,7 +1332,6 @@ __socket_ioq_churn(rpc_transport_t *this) priv->idx, -1, 0); } -out: return ret; } @@ -1391,15 +1341,12 @@ socket_event_poll_err(rpc_transport_t *this, int gen, int idx) socket_private_t *priv = NULL; gf_boolean_t socket_closed = _gf_false; - GF_VALIDATE_OR_GOTO("socket", this, out); - GF_VALIDATE_OR_GOTO("socket", this->private, out); - priv = this->private; pthread_mutex_lock(&priv->out_lock); { if ((priv->gen == gen) && (priv->idx == idx) && (priv->sock >= 0)) { - __socket_ioq_flush(this); + __socket_ioq_flush(priv); __socket_reset(this); socket_closed = _gf_true; } @@ -1417,7 +1364,6 @@ socket_event_poll_err(rpc_transport_t *this, int gen, int idx) rpc_transport_notify(this, RPC_TRANSPORT_DISCONNECT, this); } -out: return socket_closed; } @@ -1427,9 +1373,6 @@ socket_event_poll_out(rpc_transport_t *this) socket_private_t *priv = NULL; int ret = -1; - GF_VALIDATE_OR_GOTO("socket", this, out); - GF_VALIDATE_OR_GOTO("socket", this->private, out); - priv = this->private; pthread_mutex_lock(&priv->out_lock); @@ -1452,7 +1395,7 @@ socket_event_poll_out(rpc_transport_t *this) if (ret > 0) ret = 0; -out: + return ret; } @@ -1522,12 +1465,6 @@ out: return ret; } -static int -__socket_read_simple_request(rpc_transport_t *this) -{ - return __socket_read_simple_msg(this); -} - #define rpc_cred_addr(buf) (buf + RPC_MSGTYPE_SIZE + RPC_CALL_BODY_SIZE - 4) #define rpc_verf_addr(fragcurrent) (fragcurrent - 4) @@ -1554,9 +1491,6 @@ __socket_read_vectored_request(rpc_transport_t *this, struct gf_sock_incoming_frag *frag = NULL; sp_rpcfrag_request_state_t *request = NULL; - GF_VALIDATE_OR_GOTO("socket", this, out); - GF_VALIDATE_OR_GOTO("socket", this->private, out); - priv = this->private; /* used to reduce the indirection */ @@ -1703,7 +1637,6 @@ __socket_read_vectored_request(rpc_transport_t *this, break; } -out: return ret; } @@ -1720,9 +1653,6 @@ __socket_read_request(rpc_transport_t *this) struct gf_sock_incoming_frag *frag = NULL; sp_rpcfrag_request_state_t *request = NULL; - GF_VALIDATE_OR_GOTO("socket", this, out); - GF_VALIDATE_OR_GOTO("socket", this->private, out); - priv = this->private; /* used to reduce the indirection */ @@ -1766,7 +1696,7 @@ __socket_read_request(rpc_transport_t *this) if (vector_sizer) { ret = __socket_read_vectored_request(this, vector_sizer); } else { - ret = __socket_read_simple_request(this); + ret = __socket_read_simple_msg(this); } remaining_size = RPC_FRAGSIZE(in->fraghdr) - frag->bytes_read; @@ -1779,7 +1709,6 @@ __socket_read_request(rpc_transport_t *this) break; } -out: return ret; } @@ -1799,9 +1728,6 @@ __socket_read_accepted_successful_reply(rpc_transport_t *this) struct gf_sock_incoming_frag *frag = NULL; uint32_t remaining_size = 0; - GF_VALIDATE_OR_GOTO("socket", this, out); - GF_VALIDATE_OR_GOTO("socket", this->private, out); - priv = this->private; /* used to reduce the indirection */ @@ -1931,9 +1857,6 @@ __socket_read_accepted_successful_reply_v2(rpc_transport_t *this) struct gf_sock_incoming_frag *frag = NULL; uint32_t remaining_size = 0; - GF_VALIDATE_OR_GOTO("socket", this, out); - GF_VALIDATE_OR_GOTO("socket", this->private, out); - priv = this->private; /* used to reduce the indirection */ @@ -2062,9 +1985,6 @@ __socket_read_accepted_reply(rpc_transport_t *this) struct gf_sock_incoming *in = NULL; struct gf_sock_incoming_frag *frag = NULL; - GF_VALIDATE_OR_GOTO("socket", this, out); - GF_VALIDATE_OR_GOTO("socket", this->private, out); - priv = this->private; /* used to reduce the indirection */ in = &priv->incoming; @@ -2144,7 +2064,6 @@ __socket_read_accepted_reply(rpc_transport_t *this) break; } -out: return ret; } @@ -2166,9 +2085,6 @@ __socket_read_vectored_reply(rpc_transport_t *this) struct gf_sock_incoming *in = NULL; struct gf_sock_incoming_frag *frag = NULL; - GF_VALIDATE_OR_GOTO("socket", this, out); - GF_VALIDATE_OR_GOTO("socket", this->private, out); - priv = this->private; in = &priv->incoming; frag = &in->frag; @@ -2212,7 +2128,6 @@ __socket_read_vectored_reply(rpc_transport_t *this) break; } -out: return ret; } @@ -2235,9 +2150,6 @@ __socket_read_reply(rpc_transport_t *this) struct gf_sock_incoming *in = NULL; struct gf_sock_incoming_frag *frag = NULL; - GF_VALIDATE_OR_GOTO("socket", this, out); - GF_VALIDATE_OR_GOTO("socket", this->private, out); - priv = this->private; in = &priv->incoming; frag = &in->frag; @@ -2303,9 +2215,6 @@ __socket_read_frag(rpc_transport_t *this) struct gf_sock_incoming *in = NULL; struct gf_sock_incoming_frag *frag = NULL; - GF_VALIDATE_OR_GOTO("socket", this, out); - GF_VALIDATE_OR_GOTO("socket", this->private, out); - priv = this->private; /* used to reduce the indirection */ in = &priv->incoming; @@ -2363,7 +2272,6 @@ __socket_read_frag(rpc_transport_t *this) break; } -out: return ret; } @@ -2577,14 +2485,7 @@ static int socket_proto_state_machine(rpc_transport_t *this, rpc_transport_pollin_t **pollin) { - int ret = -1; - - GF_VALIDATE_OR_GOTO("socket", this, out); - - ret = __socket_proto_state_machine(this, pollin); - -out: - return ret; + return __socket_proto_state_machine(this, pollin); } static void @@ -2653,9 +2554,6 @@ socket_connect_finish(rpc_transport_t *this) rpc_transport_event_t event = 0; char notify_rpc = 0; - GF_VALIDATE_OR_GOTO("socket", this, out); - GF_VALIDATE_OR_GOTO("socket", this->private, out); - priv = this->private; pthread_mutex_lock(&priv->out_lock); @@ -2710,7 +2608,7 @@ unlock: if (notify_rpc) { rpc_transport_notify(this, event, this); } -out: + return ret; } @@ -2719,12 +2617,8 @@ socket_disconnect(rpc_transport_t *this, gf_boolean_t wait); /* socket_is_connected() is for use only in socket_event_handler() */ static inline gf_boolean_t -socket_is_connected(rpc_transport_t *this) +socket_is_connected(socket_private_t *priv) { - socket_private_t *priv = NULL; - - priv = this->private; - if (priv->use_ssl) { return priv->is_server ? priv->ssl_accepted : priv->ssl_connected; } else { @@ -2821,7 +2715,7 @@ ssl_handle_client_connection_attempt(rpc_transport_t *this) /* SSL client */ if (priv->connect_failed) { gf_log(this->name, GF_LOG_TRACE, ">>> disconnecting SSL socket"); - ret = socket_disconnect(this, _gf_false); + (void)socket_disconnect(this, _gf_false); /* Force ret to be -1, as we are officially done with this socket */ ret = -1; @@ -2851,7 +2745,7 @@ ssl_handle_client_connection_attempt(rpc_transport_t *this) ret = 1; } else { /* this is a connection failure */ - ret = socket_connect_finish(this); + (void)socket_connect_finish(this); gf_log(this->name, GF_LOG_TRACE, "ssl_complete_connection " "returned error"); @@ -3000,7 +2894,7 @@ socket_event_handler(int fd, int idx, int gen, void *data, int poll_in, poll_out, poll_err); if (!poll_err) { - if (!socket_is_connected(this)) { + if (!socket_is_connected(priv)) { gf_log(this->name, GF_LOG_TRACE, "%s (sock:%d) socket is not connected, " "completing connection", @@ -3136,23 +3030,25 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in, goto out; } - if (priv->nodelay && (new_sockaddr.ss_family != AF_UNIX)) { - ret = __socket_nodelay(new_sock); - if (ret != 0) { - gf_log(this->name, GF_LOG_WARNING, - "setsockopt() failed for " - "NODELAY (%s)", - strerror(errno)); + if (new_sockaddr.ss_family != AF_UNIX) { + if (priv->nodelay) { + ret = __socket_nodelay(new_sock); + if (ret != 0) { + gf_log(this->name, GF_LOG_WARNING, + "setsockopt() failed for " + "NODELAY (%s)", + strerror(errno)); + } } - } - if (priv->keepalive && new_sockaddr.ss_family != AF_UNIX) { - ret = __socket_keepalive(new_sock, new_sockaddr.ss_family, - priv->keepaliveintvl, priv->keepaliveidle, - priv->keepalivecnt, priv->timeout); - if (ret != 0) - gf_log(this->name, GF_LOG_WARNING, - "Failed to set keep-alive: %s", strerror(errno)); + if (priv->keepalive) { + ret = __socket_keepalive( + new_sock, new_sockaddr.ss_family, priv->keepaliveintvl, + priv->keepaliveidle, priv->keepalivecnt, priv->timeout); + if (ret != 0) + gf_log(this->name, GF_LOG_WARNING, + "Failed to set keep-alive: %s", strerror(errno)); + } } new_trans = GF_CALLOC(1, sizeof(*new_trans), gf_common_mt_rpc_trans_t); @@ -3349,9 +3245,6 @@ socket_disconnect(rpc_transport_t *this, gf_boolean_t wait) socket_private_t *priv = NULL; int ret = -1; - GF_VALIDATE_OR_GOTO("socket", this, out); - GF_VALIDATE_OR_GOTO("socket", this->private, out); - priv = this->private; pthread_mutex_lock(&priv->out_lock); @@ -3360,7 +3253,6 @@ socket_disconnect(rpc_transport_t *this, gf_boolean_t wait) } pthread_mutex_unlock(&priv->out_lock); -out: return ret; } @@ -3536,30 +3428,32 @@ socket_connect(rpc_transport_t *this, int port) } #endif - if (priv->nodelay && (sa_family != AF_UNIX)) { - ret = __socket_nodelay(priv->sock); - - if (ret != 0) { - gf_log(this->name, GF_LOG_ERROR, "NODELAY on %d failed (%s)", - priv->sock, strerror(errno)); + if (sa_family != AF_UNIX) { + if (priv->nodelay) { + ret = __socket_nodelay(priv->sock); + if (ret != 0) { + gf_log(this->name, GF_LOG_ERROR, + "NODELAY on %d failed (%s)", priv->sock, + strerror(errno)); + } } - } - if (priv->keepalive && sa_family != AF_UNIX) { - ret = __socket_keepalive(priv->sock, sa_family, - priv->keepaliveintvl, priv->keepaliveidle, - priv->keepalivecnt, priv->timeout); - if (ret != 0) - gf_log(this->name, GF_LOG_ERROR, "Failed to set keep-alive: %s", - strerror(errno)); + if (priv->keepalive) { + ret = __socket_keepalive( + priv->sock, sa_family, priv->keepaliveintvl, + priv->keepaliveidle, priv->keepalivecnt, priv->timeout); + if (ret != 0) + gf_log(this->name, GF_LOG_ERROR, + "Failed to set keep-alive: %s", strerror(errno)); + } } SA(&this->myinfo.sockaddr)->sa_family = SA(&this->peerinfo.sockaddr) ->sa_family; /* If a source addr is explicitly specified, use it */ - ret = dict_get_str(this->options, "transport.socket.source-addr", - &local_addr); + ret = dict_get_str_sizen(this->options, "transport.socket.source-addr", + &local_addr); if (!ret && SA(&this->myinfo.sockaddr)->sa_family == AF_INET) { addr = (struct sockaddr_in *)(&this->myinfo.sockaddr); ret = inet_pton(AF_INET, local_addr, &(addr->sin_addr.s_addr)); @@ -3902,7 +3796,7 @@ socket_submit_outgoing_msg(rpc_transport_t *this, rpc_transport_msg_t *msg) goto unlock; if (list_empty(&priv->ioq)) { - ret = __socket_ioq_churn_entry(this, entry, 1); + ret = __socket_ioq_churn_entry(this, entry); if (ret == 0) { need_append = 0; @@ -4100,26 +3994,28 @@ reconfigure(rpc_transport_t *this, dict_t *options) "Reconfigured transport.listen-backlog=%d", priv->backlog); } - if (dict_get_int32_sizen(options, "transport.socket.keepalive-time", - &(priv->keepaliveidle)) != 0) - priv->keepaliveidle = GF_KEEPALIVE_TIME; - gf_log(this->name, GF_LOG_DEBUG, - "Reconfigured transport.socket.keepalive-time=%d", - priv->keepaliveidle); + if (priv->keepalive) { + if (dict_get_int32_sizen(options, "transport.socket.keepalive-time", + &(priv->keepaliveidle)) != 0) + priv->keepaliveidle = GF_KEEPALIVE_TIME; + gf_log(this->name, GF_LOG_DEBUG, + "Reconfigured transport.socket.keepalive-time=%d", + priv->keepaliveidle); - if (dict_get_int32_sizen(options, "transport.socket.keepalive-interval", - &(priv->keepaliveintvl)) != 0) - priv->keepaliveintvl = GF_KEEPALIVE_INTERVAL; - gf_log(this->name, GF_LOG_DEBUG, - "Reconfigured transport.socket.keepalive-interval=%d", - priv->keepaliveintvl); + if (dict_get_int32_sizen(options, "transport.socket.keepalive-interval", + &(priv->keepaliveintvl)) != 0) + priv->keepaliveintvl = GF_KEEPALIVE_INTERVAL; + gf_log(this->name, GF_LOG_DEBUG, + "Reconfigured transport.socket.keepalive-interval=%d", + priv->keepaliveintvl); - if (dict_get_int32_sizen(options, "transport.socket.keepalive-count", - &(priv->keepalivecnt)) != 0) - priv->keepalivecnt = GF_KEEPALIVE_COUNT; - gf_log(this->name, GF_LOG_DEBUG, - "Reconfigured transport.socket.keepalive-count=%d", - priv->keepalivecnt); + if (dict_get_int32_sizen(options, "transport.socket.keepalive-count", + &(priv->keepalivecnt)) != 0) + priv->keepalivecnt = GF_KEEPALIVE_COUNT; + gf_log(this->name, GF_LOG_DEBUG, + "Reconfigured transport.socket.keepalive-count=%d", + priv->keepalivecnt); + } optstr = NULL; if (dict_get_str_sizen(options, "tcp-window-size", &optstr) == 0) { @@ -4614,22 +4510,26 @@ socket_init(rpc_transport_t *this) gf_log(this->name, GF_LOG_DEBUG, "Configured transport.tcp-user-timeout=%d", priv->timeout); - if (dict_get_int32_sizen(this->options, "transport.socket.keepalive-time", - &(priv->keepaliveidle)) != 0) { - priv->keepaliveidle = GF_KEEPALIVE_TIME; - } + if (priv->keepalive) { + if (dict_get_int32_sizen(this->options, + "transport.socket.keepalive-time", + &(priv->keepaliveidle)) != 0) { + priv->keepaliveidle = GF_KEEPALIVE_TIME; + } - if (dict_get_int32_sizen(this->options, - "transport.socket.keepalive-interval", - &(priv->keepaliveintvl)) != 0) { - priv->keepaliveintvl = GF_KEEPALIVE_INTERVAL; - } + if (dict_get_int32_sizen(this->options, + "transport.socket.keepalive-interval", + &(priv->keepaliveintvl)) != 0) { + priv->keepaliveintvl = GF_KEEPALIVE_INTERVAL; + } - if (dict_get_int32_sizen(this->options, "transport.socket.keepalive-count", - &(priv->keepalivecnt)) != 0) - priv->keepalivecnt = GF_KEEPALIVE_COUNT; - gf_log(this->name, GF_LOG_DEBUG, "Reconfigured transport.keepalivecnt=%d", - priv->keepalivecnt); + if (dict_get_int32_sizen(this->options, + "transport.socket.keepalive-count", + &(priv->keepalivecnt)) != 0) + priv->keepalivecnt = GF_KEEPALIVE_COUNT; + gf_log(this->name, GF_LOG_DEBUG, + "Reconfigured transport.keepalivecnt=%d", priv->keepalivecnt); + } if (dict_get_uint32(this->options, "transport.listen-backlog", &(priv->backlog)) != 0) { @@ -4683,7 +4583,7 @@ fini(rpc_transport_t *this) if (priv->sock >= 0) { pthread_mutex_lock(&priv->out_lock); { - __socket_ioq_flush(this); + __socket_ioq_flush(priv); __socket_reset(this); } pthread_mutex_unlock(&priv->out_lock); |