diff options
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 1128 |
1 files changed, 702 insertions, 426 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index f2db84d34..93da3f296 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -63,94 +63,98 @@ typedef int SSL_unary_func (SSL *); typedef int SSL_trinary_func (SSL *, void *, int); -#define __socket_proto_reset_pending(priv) do { \ - memset (&priv->incoming.frag.vector, 0, \ - sizeof (priv->incoming.frag.vector)); \ - priv->incoming.frag.pending_vector = \ - &priv->incoming.frag.vector; \ - priv->incoming.frag.pending_vector->iov_base = \ - priv->incoming.frag.fragcurrent; \ - priv->incoming.pending_vector = \ - priv->incoming.frag.pending_vector; \ - } while (0); +#define __socket_proto_reset_pending(priv) do { \ + struct gf_sock_incoming_frag *frag; \ + frag = &priv->incoming.frag; \ + \ + memset (&frag->vector, 0, sizeof (frag->vector)); \ + frag->pending_vector = &frag->vector; \ + frag->pending_vector->iov_base = frag->fragcurrent; \ + priv->incoming.pending_vector = frag->pending_vector; \ + } while (0) #define __socket_proto_update_pending(priv) \ do { \ - uint32_t remaining_fragsize = 0; \ - if (priv->incoming.frag.pending_vector->iov_len == 0) { \ - remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \ - - priv->incoming.frag.bytes_read; \ + uint32_t remaining; \ + struct gf_sock_incoming_frag *frag; \ + frag = &priv->incoming.frag; \ + if (frag->pending_vector->iov_len == 0) { \ + remaining = (RPC_FRAGSIZE (priv->incoming.fraghdr) \ + - frag->bytes_read); \ \ - priv->incoming.frag.pending_vector->iov_len = \ - remaining_fragsize > priv->incoming.frag.remaining_size \ - ? priv->incoming.frag.remaining_size : remaining_fragsize; \ + frag->pending_vector->iov_len = \ + (remaining > frag->remaining_size) \ + ? frag->remaining_size : remaining; \ \ - priv->incoming.frag.remaining_size -= \ - priv->incoming.frag.pending_vector->iov_len; \ + frag->remaining_size -= \ + frag->pending_vector->iov_len; \ } \ - } while (0); + } while (0) #define __socket_proto_update_priv_after_read(priv, ret, bytes_read) \ { \ - priv->incoming.frag.fragcurrent += bytes_read; \ - priv->incoming.frag.bytes_read += bytes_read; \ + struct gf_sock_incoming_frag *frag; \ + frag = &priv->incoming.frag; \ + \ + frag->fragcurrent += bytes_read; \ + frag->bytes_read += bytes_read; \ \ - if ((ret > 0) || (priv->incoming.frag.remaining_size != 0)) { \ - if (priv->incoming.frag.remaining_size != 0 && ret == 0) { \ + if ((ret > 0) || (frag->remaining_size != 0)) { \ + if (frag->remaining_size != 0 && ret == 0) { \ __socket_proto_reset_pending (priv); \ } \ \ - gf_log (this->name, GF_LOG_TRACE, "partial read on non-blocking socket"); \ + gf_log (this->name, GF_LOG_TRACE, \ + "partial read on non-blocking socket"); \ \ break; \ } \ } -#define __socket_proto_init_pending(priv, size) \ +#define __socket_proto_init_pending(priv,size) \ do { \ - uint32_t remaining_fragsize = 0; \ - remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \ - - priv->incoming.frag.bytes_read; \ + uint32_t remaining = 0; \ + struct gf_sock_incoming_frag *frag; \ + frag = &priv->incoming.frag; \ + \ + remaining = (RPC_FRAGSIZE (priv->incoming.fraghdr) \ + - frag->bytes_read); \ \ - __socket_proto_reset_pending (priv); \ + __socket_proto_reset_pending (priv); \ \ - priv->incoming.frag.pending_vector->iov_len = \ - remaining_fragsize > size ? size : remaining_fragsize; \ + frag->pending_vector->iov_len = \ + (remaining > size) ? size : remaining; \ \ - priv->incoming.frag.remaining_size = \ - size - priv->incoming.frag.pending_vector->iov_len; \ + frag->remaining_size = (size - frag->pending_vector->iov_len); \ \ - } while (0); + } while(0) /* This will be used in a switch case and breaks from the switch case if all * the pending data is not read. */ #define __socket_proto_read(priv, ret) \ - { \ + { \ size_t bytes_read = 0; \ + struct gf_sock_incoming *in; \ + in = &priv->incoming; \ \ __socket_proto_update_pending (priv); \ \ ret = __socket_readv (this, \ - priv->incoming.pending_vector, 1, \ - &priv->incoming.pending_vector, \ - &priv->incoming.pending_count, \ + in->pending_vector, 1, \ + &in->pending_vector, \ + &in->pending_count, \ &bytes_read); \ - if (ret == -1) { \ - gf_log (this->name, GF_LOG_WARNING, \ - "reading from socket failed. Error (%s), " \ - "peer (%s)", strerror (errno), \ - this->peerinfo.identifier); \ + if (ret == -1) \ break; \ - } \ __socket_proto_update_priv_after_read (priv, ret, bytes_read); \ } -int socket_init (rpc_transport_t *this); +static int socket_init (rpc_transport_t *this); -void +static void ssl_dump_error_stack (const char *caller) { unsigned long errnum = 0; @@ -164,7 +168,7 @@ ssl_dump_error_stack (const char *caller) } } -int +static int ssl_do (rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func) { int r = (-1); @@ -235,7 +239,7 @@ out: #define ssl_read_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_read) #define ssl_write_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_write) -int +static int ssl_setup_connection (rpc_transport_t *this, int server) { X509 *peer = NULL; @@ -294,10 +298,139 @@ ssl_error: ssl_dump_error_stack(this->name); free_ssl: SSL_free(priv->ssl_ssl); + priv->ssl_ssl = NULL; done: return ret; } + +static void +ssl_teardown_connection (socket_private_t *priv) +{ + SSL_shutdown(priv->ssl_ssl); + SSL_clear(priv->ssl_ssl); + SSL_free(priv->ssl_ssl); + priv->ssl_ssl = NULL; +} + + +static ssize_t +__socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount) +{ + socket_private_t *priv = NULL; + int sock = -1; + int ret = -1; + + priv = this->private; + sock = priv->sock; + + if (priv->use_ssl) { + ret = ssl_read_one (this, opvector->iov_base, opvector->iov_len); + } else { + ret = readv (sock, opvector, opcount); + } + + return ret; +} + + +static ssize_t +__socket_ssl_read (rpc_transport_t *this, void *buf, size_t count) +{ + struct iovec iov = {0, }; + int ret = -1; + + iov.iov_base = buf; + iov.iov_len = count; + + ret = __socket_ssl_readv (this, &iov, 1); + + return ret; +} + + +static int +__socket_cached_read (rpc_transport_t *this, struct iovec *opvector, int opcount) +{ + socket_private_t *priv = NULL; + int sock = -1; + struct gf_sock_incoming *in = NULL; + int req_len = -1; + int ret = -1; + + priv = this->private; + sock = priv->sock; + in = &priv->incoming; + req_len = iov_length (opvector, opcount); + + if (in->record_state == SP_STATE_READING_FRAGHDR) { + in->ra_read = 0; + in->ra_served = 0; + in->ra_max = 0; + in->ra_buf = NULL; + goto uncached; + } + + if (!in->ra_max) { + /* first call after passing SP_STATE_READING_FRAGHDR */ + in->ra_max = min (RPC_FRAGSIZE (in->fraghdr), GF_SOCKET_RA_MAX); + /* Note that the in->iobuf is the primary iobuf into which + headers are read into. By using this itself as our + read-ahead cache, we can avoid memory copies in iov_load + */ + in->ra_buf = iobuf_ptr (in->iobuf); + } + + /* fill read-ahead */ + if (in->ra_read < in->ra_max) { + ret = __socket_ssl_read (this, &in->ra_buf[in->ra_read], + (in->ra_max - in->ra_read)); + if (ret > 0) + in->ra_read += ret; + + /* we proceed to test if there is still cached data to + be served even if readahead could not progress */ + } + + /* serve cached */ + if (in->ra_served < in->ra_read) { + ret = iov_load (opvector, opcount, &in->ra_buf[in->ra_served], + min (req_len, (in->ra_read - in->ra_served))); + + in->ra_served += ret; + /* Do not read uncached and cached in the same call */ + goto out; + } + + if (in->ra_read < in->ra_max) + /* If there was no cached data to be served, (and we are + guaranteed to have already performed an attempt to progress + readahead above), and we have not yet read out the full + readahead capacity, then bail out for now without doing + the uncached read below (as that will overtake future cached + read) + */ + goto out; +uncached: + ret = __socket_ssl_readv (this, opvector, opcount); +out: + return ret; +} + +static gf_boolean_t +__does_socket_rwv_error_need_logging (socket_private_t *priv, int write) +{ + int read = !write; + + if (priv->connected == -1) /* Didn't even connect, of course it fails */ + return _gf_false; + + if (read && (priv->read_fail_log == _gf_false)) + return _gf_false; + + return _gf_true; +} + /* * return value: * 0 = success (completed) @@ -305,7 +438,7 @@ done: * > 0 = incomplete */ -int +static int __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count, struct iovec **pending_vector, int *pending_count, size_t *bytes, int write) @@ -353,13 +486,8 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count, } this->total_bytes_write += ret; } else { - if (priv->use_ssl) { - ret = ssl_read_one(this, - opvector->iov_base, opvector->iov_len); - } - else { - ret = readv (sock, opvector, opcount); - } + ret = __socket_cached_read (this, opvector, opcount); + if (ret == 0) { gf_log(this->name,GF_LOG_DEBUG,"EOF on socket"); errno = ENODATA; @@ -385,9 +513,15 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count, if (errno == EINTR) continue; - gf_log (this->name, GF_LOG_WARNING, - "%s failed (%s)", write ? "writev" : "readv", - strerror (errno)); + if (__does_socket_rwv_error_need_logging (priv, + write)) { + gf_log (this->name, GF_LOG_WARNING, + "%s on %s failed (%s)", + write ? "writev":"readv", + this->peerinfo.identifier, + strerror (errno)); + } + if (priv->use_ssl) { ssl_dump_error_stack(this->name); } @@ -438,7 +572,7 @@ out: } -int +static int __socket_readv (rpc_transport_t *this, struct iovec *vector, int count, struct iovec **pending_vector, int *pending_count, size_t *bytes) @@ -452,7 +586,7 @@ __socket_readv (rpc_transport_t *this, struct iovec *vector, int count, } -int +static int __socket_writev (rpc_transport_t *this, struct iovec *vector, int count, struct iovec **pending_vector, int *pending_count) { @@ -465,41 +599,56 @@ __socket_writev (rpc_transport_t *this, struct iovec *vector, int count, } -int +static int +__socket_shutdown (rpc_transport_t *this) +{ + int ret = -1; + socket_private_t *priv = this->private; + + priv->connected = -1; + ret = shutdown (priv->sock, SHUT_RDWR); + if (ret) { + /* its already disconnected.. no need to understand + why it failed to shutdown in normal cases */ + gf_log (this->name, GF_LOG_DEBUG, + "shutdown() returned %d. %s", + ret, strerror (errno)); + } + + return ret; +} + +static int __socket_disconnect (rpc_transport_t *this) { - socket_private_t *priv = NULL; int ret = -1; + socket_private_t *priv = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; + gf_log (this->name, GF_LOG_TRACE, + "disconnecting %p, state=%u gen=%u sock=%d", this, + priv->ot_state, priv->ot_gen, priv->sock); + if (priv->sock != -1) { - priv->connected = -1; - ret = shutdown (priv->sock, SHUT_RDWR); - if (ret) { - /* its already disconnected.. no need to understand - why it failed to shutdown in normal cases */ - gf_log (this->name, GF_LOG_DEBUG, - "shutdown() returned %d. %s", - ret, strerror (errno)); - } - if (priv->use_ssl) { - SSL_shutdown(priv->ssl_ssl); - SSL_clear(priv->ssl_ssl); - SSL_free(priv->ssl_ssl); - } + ret = __socket_shutdown(this); if (priv->own_thread) { - /* - * Without this, reconnect (= disconnect + connect) - * won't work except by accident. - */ - close(priv->sock); - priv->sock = -1; - ++(priv->socket_gen); - } + /* + * Without this, reconnect (= disconnect + connect) + * won't work except by accident. + */ + close(priv->sock); + priv->sock = -1; + gf_log (this->name, GF_LOG_TRACE, + "OT_PLEASE_DIE on %p", this); + priv->ot_state = OT_PLEASE_DIE; + } + else if (priv->use_ssl) { + ssl_teardown_connection(priv); + } } out: @@ -507,7 +656,7 @@ out: } -int +static int __socket_server_bind (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -563,7 +712,7 @@ out: } -int +static int __socket_nonblock (int fd) { int flags = 0; @@ -577,7 +726,7 @@ __socket_nonblock (int fd) return ret; } -int +static int __socket_nodelay (int fd) { int on = 1; @@ -594,7 +743,7 @@ __socket_nodelay (int fd) static int -__socket_keepalive (int fd, int keepalive_intvl, int keepalive_idle) +__socket_keepalive (int fd, int family, int keepalive_intvl, int keepalive_idle) { int on = 1; int ret = -1; @@ -623,18 +772,23 @@ __socket_keepalive (int fd, int keepalive_intvl, int keepalive_idle) goto err; } #else + if (family != AF_INET) + goto done; + ret = setsockopt (fd, IPPROTO_TCP, TCP_KEEPIDLE, &keepalive_idle, sizeof (keepalive_intvl)); if (ret == -1) { gf_log ("socket", GF_LOG_WARNING, - "failed to set keep idle on socket %d", fd); + "failed to set keep idle %d on socket %d, %s", + keepalive_idle, fd, strerror(errno)); goto err; } - ret = setsockopt (fd, IPPROTO_TCP, TCP_KEEPINTVL, &keepalive_intvl, + ret = setsockopt (fd, IPPROTO_TCP , TCP_KEEPINTVL, &keepalive_intvl, sizeof (keepalive_intvl)); if (ret == -1) { gf_log ("socket", GF_LOG_WARNING, - "failed to set keep alive interval on socket %d", fd); + "failed to set keep interval %d on socket %d, %s", + keepalive_intvl, fd, strerror(errno)); goto err; } #endif @@ -648,7 +802,7 @@ err: } -int +static int __socket_connect_finish (int fd) { int ret = -1; @@ -666,7 +820,7 @@ __socket_connect_finish (int fd) } -void +static void __socket_reset (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -703,13 +857,13 @@ out: } -void +static void socket_set_lastfrag (uint32_t *fragsize) { (*fragsize) |= 0x80000000U; } -void +static void socket_set_frag_header_size (uint32_t size, char *haddr) { size = htonl (size); @@ -717,14 +871,14 @@ socket_set_frag_header_size (uint32_t size, char *haddr) } -void +static void socket_set_last_frag_header_size (uint32_t size, char *haddr) { socket_set_lastfrag (&size); socket_set_frag_header_size (size, haddr); } -struct ioq * +static struct ioq * __socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg) { struct ioq *entry = NULL; @@ -791,7 +945,7 @@ out: } -void +static void __socket_ioq_entry_free (struct ioq *entry) { GF_VALIDATE_OR_GOTO ("socket", entry, out); @@ -808,7 +962,7 @@ out: } -void +static void __socket_ioq_flush (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -829,7 +983,7 @@ out: } -int +static int __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct) { int ret = -1; @@ -863,7 +1017,7 @@ __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct) } -int +static int __socket_ioq_churn (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -896,7 +1050,7 @@ out: } -int +static int socket_event_poll_err (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -921,7 +1075,7 @@ out: } -int +static int socket_event_poll_out (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -951,43 +1105,45 @@ out: } -inline int +static inline int __socket_read_simple_msg (rpc_transport_t *this) { - socket_private_t *priv = NULL; - int ret = 0; - uint32_t remaining_size = 0; - size_t bytes_read = 0; + int ret = 0; + uint32_t remaining_size = 0; + size_t bytes_read = 0; + socket_private_t *priv = NULL; + struct gf_sock_incoming *in = NULL; + struct gf_sock_incoming_frag *frag = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; - switch (priv->incoming.frag.simple_state) { + in = &priv->incoming; + frag = &in->frag; + + switch (frag->simple_state) { case SP_STATE_SIMPLE_MSG_INIT: - remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) - - priv->incoming.frag.bytes_read; + remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read; __socket_proto_init_pending (priv, remaining_size); - priv->incoming.frag.simple_state = - SP_STATE_READING_SIMPLE_MSG; + frag->simple_state = SP_STATE_READING_SIMPLE_MSG; /* fall through */ case SP_STATE_READING_SIMPLE_MSG: ret = 0; - remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) - - priv->incoming.frag.bytes_read; + remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read; if (remaining_size > 0) { ret = __socket_readv (this, - priv->incoming.pending_vector, 1, - &priv->incoming.pending_vector, - &priv->incoming.pending_count, + in->pending_vector, 1, + &in->pending_vector, + &in->pending_count, &bytes_read); } @@ -999,8 +1155,8 @@ __socket_read_simple_msg (rpc_transport_t *this) break; } - priv->incoming.frag.bytes_read += bytes_read; - priv->incoming.frag.fragcurrent += bytes_read; + frag->bytes_read += bytes_read; + frag->fragcurrent += bytes_read; if (ret > 0) { gf_log (this->name, GF_LOG_TRACE, @@ -1009,8 +1165,7 @@ __socket_read_simple_msg (rpc_transport_t *this) } if (ret == 0) { - priv->incoming.frag.simple_state - = SP_STATE_SIMPLE_MSG_INIT; + frag->simple_state = SP_STATE_SIMPLE_MSG_INIT; } } @@ -1019,7 +1174,7 @@ out: } -inline int +static inline int __socket_read_simple_request (rpc_transport_t *this) { return __socket_read_simple_msg (this); @@ -1036,7 +1191,7 @@ __socket_read_simple_request (rpc_transport_t *this) #define rpc_progver_addr(buf) (buf + RPC_MSGTYPE_SIZE + 8) #define rpc_procnum_addr(buf) (buf + RPC_MSGTYPE_SIZE + 12) -inline int +static inline int __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vector_sizer) { socket_private_t *priv = NULL; @@ -1046,17 +1201,26 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto struct iobuf *iobuf = NULL; uint32_t remaining_size = 0; ssize_t readsize = 0; - size_t size = 0; + size_t size = 0; + struct gf_sock_incoming *in = NULL; + struct gf_sock_incoming_frag *frag = NULL; + sp_rpcfrag_request_state_t *request = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; - switch (priv->incoming.frag.call_body.request.vector_state) { + /* used to reduce the indirection */ + in = &priv->incoming; + frag = &in->frag; + request = &frag->call_body.request; + + switch (request->vector_state) { case SP_STATE_VECTORED_REQUEST_INIT: - priv->incoming.frag.call_body.request.vector_sizer_state = 0; - addr = rpc_cred_addr (iobuf_ptr (priv->incoming.iobuf)); + request->vector_sizer_state = 0; + + addr = rpc_cred_addr (iobuf_ptr (in->iobuf)); /* also read verf flavour and verflen */ credlen = ntoh32 (*((uint32_t *)addr)) @@ -1064,40 +1228,35 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto __socket_proto_init_pending (priv, credlen); - priv->incoming.frag.call_body.request.vector_state = - SP_STATE_READING_CREDBYTES; + request->vector_state = SP_STATE_READING_CREDBYTES; /* fall through */ case SP_STATE_READING_CREDBYTES: __socket_proto_read (priv, ret); - priv->incoming.frag.call_body.request.vector_state = - SP_STATE_READ_CREDBYTES; + request->vector_state = SP_STATE_READ_CREDBYTES; /* fall through */ case SP_STATE_READ_CREDBYTES: - addr = rpc_verf_addr (priv->incoming.frag.fragcurrent); + addr = rpc_verf_addr (frag->fragcurrent); verflen = ntoh32 (*((uint32_t *)addr)); if (verflen == 0) { - priv->incoming.frag.call_body.request.vector_state - = SP_STATE_READ_VERFBYTES; + request->vector_state = SP_STATE_READ_VERFBYTES; goto sp_state_read_verfbytes; } __socket_proto_init_pending (priv, verflen); - priv->incoming.frag.call_body.request.vector_state - = SP_STATE_READING_VERFBYTES; + request->vector_state = SP_STATE_READING_VERFBYTES; /* fall through */ case SP_STATE_READING_VERFBYTES: __socket_proto_read (priv, ret); - priv->incoming.frag.call_body.request.vector_state = - SP_STATE_READ_VERFBYTES; + request->vector_state = SP_STATE_READ_VERFBYTES; /* fall through */ @@ -1105,85 +1264,78 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto sp_state_read_verfbytes: /* set the base_addr 'persistently' across multiple calls into the state machine */ - priv->incoming.proghdr_base_addr = priv->incoming.frag.fragcurrent; + in->proghdr_base_addr = frag->fragcurrent; - priv->incoming.frag.call_body.request.vector_sizer_state = - vector_sizer (priv->incoming.frag.call_body.request.vector_sizer_state, - &readsize, priv->incoming.proghdr_base_addr, - priv->incoming.frag.fragcurrent); + request->vector_sizer_state = + vector_sizer (request->vector_sizer_state, + &readsize, in->proghdr_base_addr, + frag->fragcurrent); __socket_proto_init_pending (priv, readsize); - priv->incoming.frag.call_body.request.vector_state - = SP_STATE_READING_PROGHDR; + + request->vector_state = SP_STATE_READING_PROGHDR; /* fall through */ case SP_STATE_READING_PROGHDR: __socket_proto_read (priv, ret); - priv->incoming.frag.call_body.request.vector_state = - SP_STATE_READ_PROGHDR; + + request->vector_state = SP_STATE_READ_PROGHDR; /* fall through */ case SP_STATE_READ_PROGHDR: sp_state_read_proghdr: - priv->incoming.frag.call_body.request.vector_sizer_state = - vector_sizer (priv->incoming.frag.call_body.request.vector_sizer_state, - &readsize, - priv->incoming.proghdr_base_addr, - priv->incoming.frag.fragcurrent); + request->vector_sizer_state = + vector_sizer (request->vector_sizer_state, + &readsize, in->proghdr_base_addr, + frag->fragcurrent); if (readsize == 0) { - priv->incoming.frag.call_body.request.vector_state = - SP_STATE_READ_PROGHDR_XDATA; + request->vector_state = SP_STATE_READ_PROGHDR_XDATA; goto sp_state_read_proghdr_xdata; } __socket_proto_init_pending (priv, readsize); - priv->incoming.frag.call_body.request.vector_state = - SP_STATE_READING_PROGHDR_XDATA; + request->vector_state = SP_STATE_READING_PROGHDR_XDATA; /* fall through */ case SP_STATE_READING_PROGHDR_XDATA: __socket_proto_read (priv, ret); - priv->incoming.frag.call_body.request.vector_state = - SP_STATE_READ_PROGHDR; + request->vector_state = SP_STATE_READ_PROGHDR; /* check if the vector_sizer() has more to say */ goto sp_state_read_proghdr; case SP_STATE_READ_PROGHDR_XDATA: sp_state_read_proghdr_xdata: - if (priv->incoming.payload_vector.iov_base == NULL) { + if (in->payload_vector.iov_base == NULL) { - size = RPC_FRAGSIZE (priv->incoming.fraghdr) - - priv->incoming.frag.bytes_read; + size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read; iobuf = iobuf_get2 (this->ctx->iobuf_pool, size); if (!iobuf) { ret = -1; break; } - if (priv->incoming.iobref == NULL) { - priv->incoming.iobref = iobref_new (); - if (priv->incoming.iobref == NULL) { + if (in->iobref == NULL) { + in->iobref = iobref_new (); + if (in->iobref == NULL) { ret = -1; iobuf_unref (iobuf); break; } } - iobref_add (priv->incoming.iobref, iobuf); + iobref_add (in->iobref, iobuf); iobuf_unref (iobuf); - priv->incoming.payload_vector.iov_base - = iobuf_ptr (iobuf); + in->payload_vector.iov_base = iobuf_ptr (iobuf); - priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf); + frag->fragcurrent = iobuf_ptr (iobuf); } - priv->incoming.frag.call_body.request.vector_state = - SP_STATE_READING_PROG; + request->vector_state = SP_STATE_READING_PROG; /* fall through */ @@ -1194,19 +1346,15 @@ sp_state_read_proghdr_xdata: ret = __socket_read_simple_msg (this); - remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) - - priv->incoming.frag.bytes_read; + remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read; - if ((ret == -1) - || ((ret == 0) - && (remaining_size == 0) - && RPC_LASTFRAG (priv->incoming.fraghdr))) { - priv->incoming.frag.call_body.request.vector_state - = SP_STATE_VECTORED_REQUEST_INIT; - priv->incoming.payload_vector.iov_len - = (unsigned long)priv->incoming.frag.fragcurrent - - (unsigned long) - priv->incoming.payload_vector.iov_base; + if ((ret == -1) || + ((ret == 0) && (remaining_size == 0) + && RPC_LASTFRAG (in->fraghdr))) { + request->vector_state = SP_STATE_VECTORED_REQUEST_INIT; + in->payload_vector.iov_len + = ((unsigned long)frag->fragcurrent + - (unsigned long)in->payload_vector.iov_base); } break; } @@ -1215,7 +1363,7 @@ out: return ret; } -inline int +static inline int __socket_read_request (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -1224,46 +1372,53 @@ __socket_read_request (rpc_transport_t *this) int ret = -1; char *buf = NULL; rpcsvc_vector_sizer vector_sizer = NULL; + struct gf_sock_incoming *in = NULL; + struct gf_sock_incoming_frag *frag = NULL; + sp_rpcfrag_request_state_t *request = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; - switch (priv->incoming.frag.call_body.request.header_state) { + /* used to reduce the indirection */ + in = &priv->incoming; + frag = &in->frag; + request = &frag->call_body.request; + + switch (request->header_state) { case SP_STATE_REQUEST_HEADER_INIT: __socket_proto_init_pending (priv, RPC_CALL_BODY_SIZE); - priv->incoming.frag.call_body.request.header_state - = SP_STATE_READING_RPCHDR1; + request->header_state = SP_STATE_READING_RPCHDR1; /* fall through */ case SP_STATE_READING_RPCHDR1: __socket_proto_read (priv, ret); - priv->incoming.frag.call_body.request.header_state = - SP_STATE_READ_RPCHDR1; + request->header_state = SP_STATE_READ_RPCHDR1; /* fall through */ case SP_STATE_READ_RPCHDR1: - buf = rpc_prognum_addr (iobuf_ptr (priv->incoming.iobuf)); + buf = rpc_prognum_addr (iobuf_ptr (in->iobuf)); prognum = ntoh32 (*((uint32_t *)buf)); - buf = rpc_progver_addr (iobuf_ptr (priv->incoming.iobuf)); + buf = rpc_progver_addr (iobuf_ptr (in->iobuf)); progver = ntoh32 (*((uint32_t *)buf)); - buf = rpc_procnum_addr (iobuf_ptr (priv->incoming.iobuf)); + buf = rpc_procnum_addr (iobuf_ptr (in->iobuf)); procnum = ntoh32 (*((uint32_t *)buf)); - if (this->listener) { - /* this check is needed as rpcsvc and rpc-clnt actor structures are - * not same */ - vector_sizer = rpcsvc_get_program_vector_sizer ((rpcsvc_t *)this->mydata, - prognum, progver, procnum); + if (priv->is_server) { + /* this check is needed as rpcsvc and rpc-clnt + * actor structures are not same */ + vector_sizer = + rpcsvc_get_program_vector_sizer ((rpcsvc_t *)this->mydata, + prognum, progver, procnum); } if (vector_sizer) { @@ -1272,15 +1427,13 @@ __socket_read_request (rpc_transport_t *this) ret = __socket_read_simple_request (this); } - remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) - - priv->incoming.frag.bytes_read; + remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read; if ((ret == -1) || ((ret == 0) && (remaining_size == 0) - && (RPC_LASTFRAG (priv->incoming.fraghdr)))) { - priv->incoming.frag.call_body.request.header_state = - SP_STATE_REQUEST_HEADER_INIT; + && (RPC_LASTFRAG (in->fraghdr)))) { + request->header_state = SP_STATE_REQUEST_HEADER_INIT; } break; @@ -1291,7 +1444,7 @@ out: } -inline int +static inline int __socket_read_accepted_successful_reply (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -1302,23 +1455,29 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this) ssize_t default_read_size = 0; char *proghdr_buf = NULL; XDR xdr; + struct gf_sock_incoming *in = NULL; + struct gf_sock_incoming_frag *frag = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; - switch (priv->incoming.frag.call_body.reply.accepted_success_state) { + /* used to reduce the indirection */ + in = &priv->incoming; + frag = &in->frag; + + switch (frag->call_body.reply.accepted_success_state) { case SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT: default_read_size = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp, &read_rsp); - proghdr_buf = priv->incoming.frag.fragcurrent; + proghdr_buf = frag->fragcurrent; __socket_proto_init_pending (priv, default_read_size); - priv->incoming.frag.call_body.reply.accepted_success_state + frag->call_body.reply.accepted_success_state = SP_STATE_READING_PROC_HEADER; /* fall through */ @@ -1341,28 +1500,27 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this) size = roof (read_rsp.xdata.xdata_len, 4); if (!size) { - priv->incoming.frag.call_body.reply.accepted_success_state + frag->call_body.reply.accepted_success_state = SP_STATE_READ_PROC_OPAQUE; goto read_proc_opaque; } __socket_proto_init_pending (priv, size); - priv->incoming.frag.call_body.reply.accepted_success_state + frag->call_body.reply.accepted_success_state = SP_STATE_READING_PROC_OPAQUE; case SP_STATE_READING_PROC_OPAQUE: __socket_proto_read (priv, ret); - priv->incoming.frag.call_body.reply.accepted_success_state + frag->call_body.reply.accepted_success_state = SP_STATE_READ_PROC_OPAQUE; case SP_STATE_READ_PROC_OPAQUE: read_proc_opaque: - if (priv->incoming.payload_vector.iov_base == NULL) { + if (in->payload_vector.iov_base == NULL) { - size = (RPC_FRAGSIZE (priv->incoming.fraghdr) - - priv->incoming.frag.bytes_read); + size = (RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read); iobuf = iobuf_get2 (this->ctx->iobuf_pool, size); if (iobuf == NULL) { @@ -1370,28 +1528,26 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this) goto out; } - if (priv->incoming.iobref == NULL) { - priv->incoming.iobref = iobref_new (); - if (priv->incoming.iobref == NULL) { + if (in->iobref == NULL) { + in->iobref = iobref_new (); + if (in->iobref == NULL) { ret = -1; iobuf_unref (iobuf); goto out; } } - iobref_add (priv->incoming.iobref, iobuf); + iobref_add (in->iobref, iobuf); iobuf_unref (iobuf); - priv->incoming.payload_vector.iov_base - = iobuf_ptr (iobuf); + in->payload_vector.iov_base = iobuf_ptr (iobuf); - priv->incoming.payload_vector.iov_len = size; + in->payload_vector.iov_len = size; } - priv->incoming.frag.fragcurrent - = priv->incoming.payload_vector.iov_base; + frag->fragcurrent = in->payload_vector.iov_base; - priv->incoming.frag.call_body.reply.accepted_success_state + frag->call_body.reply.accepted_success_state = SP_STATE_READ_PROC_HEADER; /* fall through */ @@ -1400,9 +1556,8 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this) /* now read the entire remaining msg into new iobuf */ ret = __socket_read_simple_msg (this); if ((ret == -1) - || ((ret == 0) - && RPC_LASTFRAG (priv->incoming.fraghdr))) { - priv->incoming.frag.call_body.reply.accepted_success_state + || ((ret == 0) && RPC_LASTFRAG (in->fraghdr))) { + frag->call_body.reply.accepted_success_state = SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT; } @@ -1416,7 +1571,7 @@ out: #define rpc_reply_verflen_addr(fragcurrent) ((char *)fragcurrent - 4) #define rpc_reply_accept_status_addr(fragcurrent) ((char *)fragcurrent - 4) -inline int +static inline int __socket_read_accepted_reply (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -1424,19 +1579,24 @@ __socket_read_accepted_reply (rpc_transport_t *this) char *buf = NULL; uint32_t verflen = 0, len = 0; uint32_t remaining_size = 0; + struct gf_sock_incoming *in = NULL; + struct gf_sock_incoming_frag *frag = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; + /* used to reduce the indirection */ + in = &priv->incoming; + frag = &in->frag; - switch (priv->incoming.frag.call_body.reply.accepted_state) { + switch (frag->call_body.reply.accepted_state) { case SP_STATE_ACCEPTED_REPLY_INIT: __socket_proto_init_pending (priv, RPC_AUTH_FLAVOUR_N_LENGTH_SIZE); - priv->incoming.frag.call_body.reply.accepted_state + frag->call_body.reply.accepted_state = SP_STATE_READING_REPLY_VERFLEN; /* fall through */ @@ -1444,13 +1604,13 @@ __socket_read_accepted_reply (rpc_transport_t *this) case SP_STATE_READING_REPLY_VERFLEN: __socket_proto_read (priv, ret); - priv->incoming.frag.call_body.reply.accepted_state + frag->call_body.reply.accepted_state = SP_STATE_READ_REPLY_VERFLEN; /* fall through */ case SP_STATE_READ_REPLY_VERFLEN: - buf = rpc_reply_verflen_addr (priv->incoming.frag.fragcurrent); + buf = rpc_reply_verflen_addr (frag->fragcurrent); verflen = ntoh32 (*((uint32_t *) buf)); @@ -1459,7 +1619,7 @@ __socket_read_accepted_reply (rpc_transport_t *this) __socket_proto_init_pending (priv, len); - priv->incoming.frag.call_body.reply.accepted_state + frag->call_body.reply.accepted_state = SP_STATE_READING_REPLY_VERFBYTES; /* fall through */ @@ -1467,19 +1627,19 @@ __socket_read_accepted_reply (rpc_transport_t *this) case SP_STATE_READING_REPLY_VERFBYTES: __socket_proto_read (priv, ret); - priv->incoming.frag.call_body.reply.accepted_state + frag->call_body.reply.accepted_state = SP_STATE_READ_REPLY_VERFBYTES; - buf = rpc_reply_accept_status_addr (priv->incoming.frag.fragcurrent); + buf = rpc_reply_accept_status_addr (frag->fragcurrent); - priv->incoming.frag.call_body.reply.accept_status + frag->call_body.reply.accept_status = ntoh32 (*(uint32_t *) buf); /* fall through */ case SP_STATE_READ_REPLY_VERFBYTES: - if (priv->incoming.frag.call_body.reply.accept_status + if (frag->call_body.reply.accept_status == SUCCESS) { ret = __socket_read_accepted_successful_reply (this); } else { @@ -1489,14 +1649,13 @@ __socket_read_accepted_reply (rpc_transport_t *this) ret = __socket_read_simple_msg (this); } - remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) - - priv->incoming.frag.bytes_read; + remaining_size = RPC_FRAGSIZE (in->fraghdr) + - frag->bytes_read; if ((ret == -1) - || ((ret == 0) - && (remaining_size == 0) - && (RPC_LASTFRAG (priv->incoming.fraghdr)))) { - priv->incoming.frag.call_body.reply.accepted_state + || ((ret == 0) && (remaining_size == 0) + && (RPC_LASTFRAG (in->fraghdr)))) { + frag->call_body.reply.accepted_state = SP_STATE_ACCEPTED_REPLY_INIT; } @@ -1508,7 +1667,7 @@ out: } -inline int +static inline int __socket_read_denied_reply (rpc_transport_t *this) { return __socket_read_simple_msg (this); @@ -1518,25 +1677,29 @@ __socket_read_denied_reply (rpc_transport_t *this) #define rpc_reply_status_addr(fragcurrent) ((char *)fragcurrent - 4) -inline int +static inline int __socket_read_vectored_reply (rpc_transport_t *this) { socket_private_t *priv = NULL; int ret = 0; char *buf = NULL; uint32_t remaining_size = 0; + struct gf_sock_incoming *in = NULL; + struct gf_sock_incoming_frag *frag = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; + in = &priv->incoming; + frag = &in->frag; - switch (priv->incoming.frag.call_body.reply.status_state) { + switch (frag->call_body.reply.status_state) { case SP_STATE_ACCEPTED_REPLY_INIT: __socket_proto_init_pending (priv, RPC_REPLY_STATUS_SIZE); - priv->incoming.frag.call_body.reply.status_state + frag->call_body.reply.status_state = SP_STATE_READING_REPLY_STATUS; /* fall through */ @@ -1544,37 +1707,33 @@ __socket_read_vectored_reply (rpc_transport_t *this) case SP_STATE_READING_REPLY_STATUS: __socket_proto_read (priv, ret); - buf = rpc_reply_status_addr (priv->incoming.frag.fragcurrent); + buf = rpc_reply_status_addr (frag->fragcurrent); - priv->incoming.frag.call_body.reply.accept_status + frag->call_body.reply.accept_status = ntoh32 (*((uint32_t *) buf)); - priv->incoming.frag.call_body.reply.status_state + frag->call_body.reply.status_state = SP_STATE_READ_REPLY_STATUS; /* fall through */ case SP_STATE_READ_REPLY_STATUS: - if (priv->incoming.frag.call_body.reply.accept_status - == MSG_ACCEPTED) { + if (frag->call_body.reply.accept_status == MSG_ACCEPTED) { ret = __socket_read_accepted_reply (this); } else { ret = __socket_read_denied_reply (this); } - remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) - - priv->incoming.frag.bytes_read; + remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read; if ((ret == -1) - || ((ret == 0) - && (remaining_size == 0) - && (RPC_LASTFRAG (priv->incoming.fraghdr)))) { - priv->incoming.frag.call_body.reply.status_state + || ((ret == 0) && (remaining_size == 0) + && (RPC_LASTFRAG (in->fraghdr)))) { + frag->call_body.reply.status_state = SP_STATE_ACCEPTED_REPLY_INIT; - priv->incoming.payload_vector.iov_len - = (unsigned long)priv->incoming.frag.fragcurrent - - (unsigned long) - priv->incoming.payload_vector.iov_base; + in->payload_vector.iov_len + = (unsigned long)frag->fragcurrent + - (unsigned long)in->payload_vector.iov_base; } break; } @@ -1584,7 +1743,7 @@ out: } -inline int +static inline int __socket_read_simple_reply (rpc_transport_t *this) { return __socket_read_simple_msg (this); @@ -1592,7 +1751,7 @@ __socket_read_simple_reply (rpc_transport_t *this) #define rpc_xid_addr(buf) (buf) -inline int +static inline int __socket_read_reply (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -1600,26 +1759,29 @@ __socket_read_reply (rpc_transport_t *this) int32_t ret = -1; rpc_request_info_t *request_info = NULL; char map_xid = 0; + struct gf_sock_incoming *in = NULL; + struct gf_sock_incoming_frag *frag = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; + in = &priv->incoming; + frag = &in->frag; - buf = rpc_xid_addr (iobuf_ptr (priv->incoming.iobuf)); + buf = rpc_xid_addr (iobuf_ptr (in->iobuf)); - if (priv->incoming.request_info == NULL) { - priv->incoming.request_info = GF_CALLOC (1, - sizeof (*request_info), - gf_common_mt_rpc_trans_reqinfo_t); - if (priv->incoming.request_info == NULL) { + if (in->request_info == NULL) { + in->request_info = GF_CALLOC (1, sizeof (*request_info), + gf_common_mt_rpc_trans_reqinfo_t); + if (in->request_info == NULL) { goto out; } map_xid = 1; } - request_info = priv->incoming.request_info; + request_info = in->request_info; if (map_xid) { request_info->xid = ntoh32 (*((uint32_t *) buf)); @@ -1631,7 +1793,7 @@ __socket_read_reply (rpc_transport_t *this) { ret = rpc_transport_notify (this, RPC_TRANSPORT_MAP_XID_REQUEST, - priv->incoming.request_info); + in->request_info); } pthread_mutex_lock (&priv->lock); @@ -1645,10 +1807,8 @@ __socket_read_reply (rpc_transport_t *this) if ((request_info->prognum == GLUSTER_FOP_PROGRAM) && (request_info->procnum == GF_FOP_READ)) { if (map_xid && request_info->rsp.rsp_payload_count != 0) { - priv->incoming.iobref - = iobref_ref (request_info->rsp.rsp_iobref); - priv->incoming.payload_vector - = *request_info->rsp.rsp_payload; + in->iobref = iobref_ref (request_info->rsp.rsp_iobref); + in->payload_vector = *request_info->rsp.rsp_payload; } ret = __socket_read_vectored_reply (this); @@ -1661,42 +1821,47 @@ out: /* returns the number of bytes yet to be read in a fragment */ -inline int +static inline int __socket_read_frag (rpc_transport_t *this) { socket_private_t *priv = NULL; int32_t ret = 0; char *buf = NULL; uint32_t remaining_size = 0; + struct gf_sock_incoming *in = NULL; + struct gf_sock_incoming_frag *frag = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; + /* used to reduce the indirection */ + in = &priv->incoming; + frag = &in->frag; - switch (priv->incoming.frag.state) { + switch (frag->state) { case SP_STATE_NADA: __socket_proto_init_pending (priv, RPC_MSGTYPE_SIZE); - priv->incoming.frag.state = SP_STATE_READING_MSGTYPE; + frag->state = SP_STATE_READING_MSGTYPE; /* fall through */ case SP_STATE_READING_MSGTYPE: __socket_proto_read (priv, ret); - priv->incoming.frag.state = SP_STATE_READ_MSGTYPE; + frag->state = SP_STATE_READ_MSGTYPE; /* fall through */ case SP_STATE_READ_MSGTYPE: - buf = rpc_msgtype_addr (iobuf_ptr (priv->incoming.iobuf)); - priv->incoming.msg_type = ntoh32 (*((uint32_t *)buf)); + buf = rpc_msgtype_addr (iobuf_ptr (in->iobuf)); + in->msg_type = ntoh32 (*((uint32_t *)buf)); - if (priv->incoming.msg_type == CALL) { + if (in->msg_type == CALL) { ret = __socket_read_request (this); - } else if (priv->incoming.msg_type == REPLY) { + } else if (in->msg_type == REPLY) { ret = __socket_read_reply (this); - } else if (priv->incoming.msg_type == GF_UNIVERSAL_ANSWER) { + } else if (in->msg_type == GF_UNIVERSAL_ANSWER) { gf_log ("rpc", GF_LOG_ERROR, "older version of protocol/process trying to " "connect from %s. use newer version on that node", @@ -1704,19 +1869,17 @@ __socket_read_frag (rpc_transport_t *this) } else { gf_log ("rpc", GF_LOG_ERROR, "wrong MSG-TYPE (%d) received from %s", - priv->incoming.msg_type, + in->msg_type, this->peerinfo.identifier); ret = -1; } - remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) - - priv->incoming.frag.bytes_read; + remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read; if ((ret == -1) - || ((ret == 0) - && (remaining_size == 0) - && (RPC_LASTFRAG (priv->incoming.fraghdr)))) { - priv->incoming.frag.state = SP_STATE_NADA; + || ((ret == 0) && (remaining_size == 0) + && (RPC_LASTFRAG (in->fraghdr)))) { + frag->state = SP_STATE_NADA; } break; @@ -1727,31 +1890,36 @@ out: } -inline +static inline void __socket_reset_priv (socket_private_t *priv) { - if (priv->incoming.iobref) { - iobref_unref (priv->incoming.iobref); - priv->incoming.iobref = NULL; + struct gf_sock_incoming *in = NULL; + + /* used to reduce the indirection */ + in = &priv->incoming; + + if (in->iobref) { + iobref_unref (in->iobref); + in->iobref = NULL; } - if (priv->incoming.iobuf) { - iobuf_unref (priv->incoming.iobuf); + if (in->iobuf) { + iobuf_unref (in->iobuf); } - if (priv->incoming.request_info != NULL) { - GF_FREE (priv->incoming.request_info); - priv->incoming.request_info = NULL; + if (in->request_info != NULL) { + GF_FREE (in->request_info); + in->request_info = NULL; } - memset (&priv->incoming.payload_vector, 0, - sizeof (priv->incoming.payload_vector)); + memset (&in->payload_vector, 0, + sizeof (in->payload_vector)); - priv->incoming.iobuf = NULL; + in->iobuf = NULL; } -int +static int __socket_proto_state_machine (rpc_transport_t *this, rpc_transport_pollin_t **pollin) { @@ -1760,46 +1928,40 @@ __socket_proto_state_machine (rpc_transport_t *this, struct iobuf *iobuf = NULL; struct iobref *iobref = NULL; struct iovec vector[2]; + struct gf_sock_incoming *in = NULL; + struct gf_sock_incoming_frag *frag = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; - while (priv->incoming.record_state != SP_STATE_COMPLETE) { - switch (priv->incoming.record_state) { + /* used to reduce the indirection */ + in = &priv->incoming; + frag = &in->frag; + + while (in->record_state != SP_STATE_COMPLETE) { + switch (in->record_state) { case SP_STATE_NADA: - priv->incoming.total_bytes_read = 0; - priv->incoming.payload_vector.iov_len = 0; + in->total_bytes_read = 0; + in->payload_vector.iov_len = 0; - priv->incoming.pending_vector = priv->incoming.vector; - priv->incoming.pending_vector->iov_base = - &priv->incoming.fraghdr; + in->pending_vector = in->vector; + in->pending_vector->iov_base = &in->fraghdr; - priv->incoming.pending_vector->iov_len = - sizeof (priv->incoming.fraghdr); + in->pending_vector->iov_len = sizeof (in->fraghdr); - priv->incoming.record_state = SP_STATE_READING_FRAGHDR; + in->record_state = SP_STATE_READING_FRAGHDR; /* fall through */ case SP_STATE_READING_FRAGHDR: - ret = __socket_readv (this, - priv->incoming.pending_vector, 1, - &priv->incoming.pending_vector, - &priv->incoming.pending_count, + ret = __socket_readv (this, in->pending_vector, 1, + &in->pending_vector, + &in->pending_count, NULL); - if (ret == -1) { - if (priv->read_fail_log == 1) { - gf_log (this->name, - ((priv->connected == 1) ? - GF_LOG_WARNING : GF_LOG_DEBUG), - "reading from socket failed. Error (%s)" - ", peer (%s)", strerror (errno), - this->peerinfo.identifier); - } + if (ret == -1) goto out; - } if (ret > 0) { gf_log (this->name, GF_LOG_TRACE, "partial " @@ -1808,44 +1970,40 @@ __socket_proto_state_machine (rpc_transport_t *this, } if (ret == 0) { - priv->incoming.record_state = - SP_STATE_READ_FRAGHDR; + in->record_state = SP_STATE_READ_FRAGHDR; } /* fall through */ case SP_STATE_READ_FRAGHDR: - priv->incoming.fraghdr = ntoh32 (priv->incoming.fraghdr); - priv->incoming.total_bytes_read - += RPC_FRAGSIZE(priv->incoming.fraghdr); + in->fraghdr = ntoh32 (in->fraghdr); + in->total_bytes_read += RPC_FRAGSIZE(in->fraghdr); iobuf = iobuf_get2 (this->ctx->iobuf_pool, - priv->incoming.total_bytes_read + - sizeof (priv->incoming.fraghdr)); + (in->total_bytes_read + + sizeof (in->fraghdr))); if (!iobuf) { ret = -ENOMEM; goto out; } - priv->incoming.iobuf = iobuf; - priv->incoming.iobuf_size = 0; - priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf); - priv->incoming.record_state = SP_STATE_READING_FRAG; + in->iobuf = iobuf; + in->iobuf_size = 0; + frag->fragcurrent = iobuf_ptr (iobuf); + in->record_state = SP_STATE_READING_FRAG; /* fall through */ case SP_STATE_READING_FRAG: ret = __socket_read_frag (this); - if ((ret == -1) - || (priv->incoming.frag.bytes_read != - RPC_FRAGSIZE (priv->incoming.fraghdr))) { + if ((ret == -1) || + (frag->bytes_read != RPC_FRAGSIZE (in->fraghdr))) { goto out; } - priv->incoming.frag.bytes_read = 0; + frag->bytes_read = 0; - if (!RPC_LASTFRAG (priv->incoming.fraghdr)) { - priv->incoming.record_state = - SP_STATE_READING_FRAGHDR; + if (!RPC_LASTFRAG (in->fraghdr)) { + in->record_state = SP_STATE_READING_FRAGHDR; break; } @@ -1854,44 +2012,39 @@ __socket_proto_state_machine (rpc_transport_t *this, */ if (pollin != NULL) { int count = 0; - priv->incoming.iobuf_size - = priv->incoming.total_bytes_read - - priv->incoming.payload_vector.iov_len; + in->iobuf_size = (in->total_bytes_read - + in->payload_vector.iov_len); memset (vector, 0, sizeof (vector)); - if (priv->incoming.iobref == NULL) { - priv->incoming.iobref = iobref_new (); - if (priv->incoming.iobref == NULL) { + if (in->iobref == NULL) { + in->iobref = iobref_new (); + if (in->iobref == NULL) { ret = -1; goto out; } } - vector[count].iov_base - = iobuf_ptr (priv->incoming.iobuf); - vector[count].iov_len - = priv->incoming.iobuf_size; + vector[count].iov_base = iobuf_ptr (in->iobuf); + vector[count].iov_len = in->iobuf_size; - iobref = priv->incoming.iobref; + iobref = in->iobref; count++; - if (priv->incoming.payload_vector.iov_base - != NULL) { - vector[count] - = priv->incoming.payload_vector; + if (in->payload_vector.iov_base != NULL) { + vector[count] = in->payload_vector; count++; } *pollin = rpc_transport_pollin_alloc (this, vector, count, - priv->incoming.iobuf, + in->iobuf, iobref, - priv->incoming.request_info); - iobuf_unref (priv->incoming.iobuf); - priv->incoming.iobuf = NULL; + in->request_info); + iobuf_unref (in->iobuf); + in->iobuf = NULL; if (*pollin == NULL) { gf_log (this->name, GF_LOG_WARNING, @@ -1899,12 +2052,12 @@ __socket_proto_state_machine (rpc_transport_t *this, ret = -1; goto out; } - if (priv->incoming.msg_type == REPLY) + if (in->msg_type == REPLY) (*pollin)->is_reply = 1; - priv->incoming.request_info = NULL; + in->request_info = NULL; } - priv->incoming.record_state = SP_STATE_COMPLETE; + in->record_state = SP_STATE_COMPLETE; break; case SP_STATE_COMPLETE: @@ -1916,8 +2069,8 @@ __socket_proto_state_machine (rpc_transport_t *this, } } - if (priv->incoming.record_state == SP_STATE_COMPLETE) { - priv->incoming.record_state = SP_STATE_NADA; + if (in->record_state == SP_STATE_COMPLETE) { + in->record_state = SP_STATE_NADA; __socket_reset_priv (priv); } @@ -1929,7 +2082,7 @@ out: } -int +static int socket_proto_state_machine (rpc_transport_t *this, rpc_transport_pollin_t **pollin) { @@ -1952,17 +2105,22 @@ out: } -int +static int socket_event_poll_in (rpc_transport_t *this) { int ret = -1; rpc_transport_pollin_t *pollin = NULL; + socket_private_t *priv = this->private; ret = socket_proto_state_machine (this, &pollin); if (pollin != NULL) { + priv->ot_state = OT_CALLBACK; ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED, pollin); + if (priv->ot_state == OT_CALLBACK) { + priv->ot_state = OT_RUNNING; + } rpc_transport_pollin_destroy (pollin); } @@ -1970,7 +2128,7 @@ socket_event_poll_in (rpc_transport_t *this) } -int +static int socket_connect_finish (rpc_transport_t *this) { int ret = -1; @@ -1988,6 +2146,8 @@ socket_connect_finish (rpc_transport_t *this) if (priv->connected != 0) goto unlock; + get_transport_identifiers (this); + ret = __socket_connect_finish (priv->sock); if (ret == -1 && errno == EINPROGRESS) @@ -2002,8 +2162,6 @@ socket_connect_finish (rpc_transport_t *this) priv->connect_finish_log = 1; } __socket_disconnect (this); - notify_rpc = 1; - event = RPC_TRANSPORT_DISCONNECT; goto unlock; } @@ -2028,7 +2186,6 @@ socket_connect_finish (rpc_transport_t *this) priv->connected = 1; priv->connect_finish_log = 0; event = RPC_TRANSPORT_CONNECT; - get_transport_identifiers (this); } } unlock: @@ -2043,7 +2200,7 @@ out: /* reads rpc_requests during pollin */ -int +static int socket_event_handler (int fd, int idx, void *data, int poll_in, int poll_out, int poll_err) { @@ -2088,7 +2245,7 @@ out: } -void * +static void * socket_poller (void *ctx) { rpc_transport_t *this = ctx; @@ -2096,7 +2253,9 @@ socket_poller (void *ctx) struct pollfd pfd[2] = {{0,},}; gf_boolean_t to_write = _gf_false; int ret = 0; - int orig_gen; + uint32_t gen = 0; + + priv->ot_state = OT_RUNNING; if (priv->use_ssl) { if (ssl_setup_connection(this,priv->connected) < 0) { @@ -2116,8 +2275,6 @@ socket_poller (void *ctx) } } - orig_gen = ++(priv->socket_gen); - if (priv->connected == 0) { THIS = this->xl; ret = socket_connect_finish (this); @@ -2125,20 +2282,17 @@ socket_poller (void *ctx) gf_log (this->name, GF_LOG_WARNING, "asynchronous socket_connect_finish failed"); } - ret = rpc_transport_notify (this->listener, - RPC_TRANSPORT_ACCEPT, this); - if (ret != 0) { - gf_log (this->name, GF_LOG_WARNING, - "asynchronous rpc_transport_notify failed"); - } } + ret = rpc_transport_notify (this->listener, + RPC_TRANSPORT_ACCEPT, this); + if (ret != 0) { + gf_log (this->name, GF_LOG_WARNING, + "asynchronous rpc_transport_notify failed"); + } + + gen = priv->ot_gen; for (;;) { - if (priv->socket_gen != orig_gen) { - gf_log(this->name,GF_LOG_DEBUG, - "redundant poller exiting"); - return NULL; - } pthread_mutex_lock(&priv->lock); to_write = !list_empty(&priv->ioq); pthread_mutex_unlock(&priv->lock); @@ -2174,6 +2328,13 @@ socket_poller (void *ctx) else if (errno == ENOTCONN) { ret = 0; } + if (priv->ot_state == OT_PLEASE_DIE) { + gf_log (this->name, GF_LOG_TRACE, + "OT_IDLE on %p (input request)", + this); + priv->ot_state = OT_IDLE; + break; + } } else if (pfd[1].revents & POLL_MASK_OUTPUT) { ret = socket_event_poll_out(this); @@ -2184,6 +2345,13 @@ socket_poller (void *ctx) else if (errno == ENOTCONN) { ret = 0; } + if (priv->ot_state == OT_PLEASE_DIE) { + gf_log (this->name, GF_LOG_TRACE, + "OT_IDLE on %p (output request)", + this); + priv->ot_state = OT_IDLE; + break; + } } else { /* @@ -2204,19 +2372,64 @@ socket_poller (void *ctx) "error in polling loop"); break; } + if (priv->ot_gen != gen) { + gf_log (this->name, GF_LOG_TRACE, + "generation mismatch, my %u != %u", + gen, priv->ot_gen); + return NULL; + } } err: /* All (and only) I/O errors should come here. */ - __socket_disconnect (this); - rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); - rpc_transport_unref (this); + pthread_mutex_lock(&priv->lock); + if (priv->ssl_ssl) { + /* + * We're always responsible for this part, but only actually + * have to do it if we got far enough for ssl_ssl to be valid + * (i.e. errors in ssl_setup_connection don't count). + */ + ssl_teardown_connection(priv); + } + __socket_shutdown(this); + close(priv->sock); + priv->sock = -1; + priv->ot_state = OT_IDLE; + pthread_mutex_unlock(&priv->lock); + rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, + this); + rpc_transport_unref (this); return NULL; } +static void +socket_spawn (rpc_transport_t *this) +{ + socket_private_t *priv = this->private; -int + switch (priv->ot_state) { + case OT_IDLE: + case OT_PLEASE_DIE: + break; + default: + gf_log (this->name, GF_LOG_WARNING, + "refusing to start redundant poller"); + return; + } + + priv->ot_gen += 7; + priv->ot_state = OT_SPAWNING; + gf_log (this->name, GF_LOG_TRACE, + "spawning %p with gen %u", this, priv->ot_gen); + + if (gf_thread_create(&priv->thread,NULL,socket_poller,this) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "could not create poll thread"); + } +} + +static int socket_server_event_handler (int fd, int idx, void *data, int poll_in, int poll_out, int poll_err) { @@ -2254,7 +2467,7 @@ socket_server_event_handler (int fd, int idx, void *data, goto unlock; } - if (priv->nodelay) { + if (priv->nodelay && (new_sockaddr.ss_family != AF_UNIX)) { ret = __socket_nodelay (new_sock); if (ret == -1) { gf_log (this->name, GF_LOG_WARNING, @@ -2264,8 +2477,10 @@ socket_server_event_handler (int fd, int idx, void *data, } } - if (priv->keepalive) { + if (priv->keepalive && + new_sockaddr.ss_family != AF_UNIX) { ret = __socket_keepalive (new_sock, + new_sockaddr.ss_family, priv->keepaliveintvl, priv->keepaliveidle); if (ret == -1) @@ -2279,6 +2494,15 @@ socket_server_event_handler (int fd, int idx, void *data, if (!new_trans) goto unlock; + ret = pthread_mutex_init(&new_trans->lock, NULL); + if (ret == -1) { + gf_log (this->name, GF_LOG_WARNING, + "pthread_mutex_init() failed: %s", + strerror (errno)); + close (new_sock); + goto unlock; + } + new_trans->name = gf_strdup (this->name); memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, @@ -2350,6 +2574,7 @@ socket_server_event_handler (int fd, int idx, void *data, * connection. */ new_priv->connected = 1; + new_priv->is_server = _gf_true; rpc_transport_ref (new_trans); if (new_priv->own_thread) { @@ -2357,12 +2582,7 @@ socket_server_event_handler (int fd, int idx, void *data, gf_log(this->name,GF_LOG_ERROR, "could not create pipe"); } - if (pthread_create(&new_priv->thread, - NULL, socket_poller, - new_trans) != 0) { - gf_log(this->name,GF_LOG_ERROR, - "could not create poll thread"); - } + socket_spawn(new_trans); } else { new_priv->idx = @@ -2397,7 +2617,7 @@ out: } -int +static int socket_disconnect (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -2419,7 +2639,7 @@ out: } -int +static int socket_connect (rpc_transport_t *this, int port) { int ret = -1; @@ -2428,7 +2648,9 @@ socket_connect (rpc_transport_t *this, int port) socklen_t sockaddr_len = 0; glusterfs_ctx_t *ctx = NULL; sa_family_t sa_family = {0, }; + char *local_addr = NULL; union gf_sock_union sock_union; + struct sockaddr_in *addr = NULL; GF_VALIDATE_OR_GOTO ("socket", this, err); GF_VALIDATE_OR_GOTO ("socket", this->private, err); @@ -2456,6 +2678,10 @@ socket_connect (rpc_transport_t *this, int port) goto err; } + gf_log (this->name, GF_LOG_TRACE, + "connecting %p, state=%u gen=%u sock=%d", this, + priv->ot_state, priv->ot_gen, priv->sock); + ret = socket_client_get_remote_sockaddr (this, &sock_union.sa, &sockaddr_len, &sa_family); if (ret == -1) { @@ -2525,7 +2751,7 @@ socket_connect (rpc_transport_t *this, int port) } } - if (priv->nodelay) { + if (priv->nodelay && (sa_family != AF_UNIX)) { ret = __socket_nodelay (priv->sock); if (ret == -1) { @@ -2535,8 +2761,9 @@ socket_connect (rpc_transport_t *this, int port) } } - if (priv->keepalive) { + if (priv->keepalive && sa_family != AF_UNIX) { ret = __socket_keepalive (priv->sock, + sa_family, priv->keepaliveintvl, priv->keepaliveidle); if (ret == -1) @@ -2548,6 +2775,15 @@ socket_connect (rpc_transport_t *this, int port) SA (&this->myinfo.sockaddr)->sa_family = SA (&this->peerinfo.sockaddr)->sa_family; + /* If a source addr is explicitly specified, use it */ + ret = dict_get_str (this->options, + "transport.socket.source-addr", + &local_addr); + if (!ret && SA (&this->myinfo.sockaddr)->sa_family == AF_INET) { + addr = (struct sockaddr_in *)(&this->myinfo.sockaddr); + ret = inet_pton (AF_INET, local_addr, &(addr->sin_addr.s_addr)); + } + ret = client_bind (this, SA (&this->myinfo.sockaddr), &this->myinfo.sockaddr_len, priv->sock); if (ret == -1) { @@ -2558,13 +2794,30 @@ socket_connect (rpc_transport_t *this, int port) goto unlock; } + if (!priv->use_ssl && !priv->bio && !priv->own_thread) { + ret = __socket_nonblock (priv->sock); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "NBIO on %d failed (%s)", + priv->sock, strerror (errno)); + close (priv->sock); + priv->sock = -1; + goto unlock; + } + } + ret = connect (priv->sock, SA (&this->peerinfo.sockaddr), this->peerinfo.sockaddr_len); if (ret == -1 && ((errno != EINPROGRESS) && (errno != ENOENT))) { - gf_log (this->name, GF_LOG_ERROR, - "connection attempt failed (%s)", - strerror (errno)); + /* For unix path based sockets, the socket path is + * cryptic (md5sum of path) and may not be useful for + * the user in debugging so log it in DEBUG + */ + gf_log (this->name, ((sa_family == AF_UNIX) ? + GF_LOG_DEBUG : GF_LOG_ERROR), + "connection attempt on %s failed, (%s)", + this->peerinfo.identifier, strerror (errno)); close (priv->sock); priv->sock = -1; goto unlock; @@ -2599,6 +2852,7 @@ socket_connect (rpc_transport_t *this, int port) * initializing a client connection. */ priv->connected = 0; + priv->is_server = _gf_false; rpc_transport_ref (this); if (priv->own_thread) { @@ -2607,11 +2861,8 @@ socket_connect (rpc_transport_t *this, int port) "could not create pipe"); } - if (pthread_create(&priv->thread,NULL, - socket_poller, this) != 0) { - gf_log(this->name,GF_LOG_ERROR, - "could not create poll thread"); - } + this->listener = this; + socket_spawn(this); } else { priv->idx = event_register (ctx->event_pool, priv->sock, @@ -2632,7 +2883,7 @@ err: } -int +static int socket_listen (rpc_transport_t *this) { socket_private_t * priv = NULL; @@ -2714,7 +2965,7 @@ socket_listen (rpc_transport_t *this) } } - if (priv->nodelay) { + if (priv->nodelay && (sa_family != AF_UNIX)) { ret = __socket_nodelay (priv->sock); if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, @@ -2783,7 +3034,7 @@ out: } -int32_t +static int32_t socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) { socket_private_t *priv = NULL; @@ -2857,7 +3108,7 @@ out: } -int32_t +static int32_t socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) { socket_private_t *priv = NULL; @@ -2931,7 +3182,7 @@ out: } -int32_t +static int32_t socket_getpeername (rpc_transport_t *this, char *hostname, int hostlen) { int32_t ret = -1; @@ -2950,7 +3201,7 @@ out: } -int32_t +static int32_t socket_getpeeraddr (rpc_transport_t *this, char *peeraddr, int addrlen, struct sockaddr_storage *sa, socklen_t salen) { @@ -2971,7 +3222,7 @@ out: } -int32_t +static int32_t socket_getmyname (rpc_transport_t *this, char *hostname, int hostlen) { int32_t ret = -1; @@ -2990,7 +3241,7 @@ out: } -int32_t +static int32_t socket_getmyaddr (rpc_transport_t *this, char *myaddr, int addrlen, struct sockaddr_storage *sa, socklen_t salen) { @@ -3010,6 +3261,25 @@ out: } +static int +socket_throttle (rpc_transport_t *this, gf_boolean_t onoff) +{ + socket_private_t *priv = NULL; + + priv = this->private; + + /* The way we implement throttling is by taking off + POLLIN event from the polled flags. This way we + never get called with the POLLIN event and therefore + will never read() any more data until throttling + is turned off. + */ + priv->idx = event_select_on (this->ctx->event_pool, priv->sock, + priv->idx, (int) !onoff, -1); + return 0; +} + + struct rpc_transport_ops tops = { .listen = socket_listen, .connect = socket_connect, @@ -3020,6 +3290,7 @@ struct rpc_transport_ops tops = { .get_peeraddr = socket_getpeeraddr, .get_myname = socket_getmyname, .get_myaddr = socket_getmyaddr, + .throttle = socket_throttle, }; int @@ -3076,7 +3347,7 @@ out: } -int +static int socket_init (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -3097,6 +3368,7 @@ socket_init (rpc_transport_t *this) if (!priv) { return -1; } + memset(priv,0,sizeof(*priv)); pthread_mutex_init (&priv->lock, NULL); @@ -3322,6 +3594,10 @@ socket_init (rpc_transport_t *this) SSL_CTX_set_verify(priv->ssl_ctx,SSL_VERIFY_PEER,0); } + if (priv->own_thread) { + priv->ot_state = OT_IDLE; + } + out: this->private = priv; return 0; |
