diff options
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 370 |
1 files changed, 250 insertions, 120 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 7657e5d35..93da3f296 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -147,21 +147,14 @@ typedef int SSL_trinary_func (SSL *, void *, int); &in->pending_vector, \ &in->pending_count, \ &bytes_read); \ - if (ret == -1) { \ - if (priv->read_fail_log) \ - gf_log (this->name, GF_LOG_WARNING, \ - "reading from socket failed." \ - "Error (%s), peer (%s)", \ - strerror (errno), \ - this->peerinfo.identifier); \ + if (ret == -1) \ break; \ - } \ __socket_proto_update_priv_after_read (priv, ret, bytes_read); \ } -int socket_init (rpc_transport_t *this); +static int socket_init (rpc_transport_t *this); -void +static void ssl_dump_error_stack (const char *caller) { unsigned long errnum = 0; @@ -175,7 +168,7 @@ ssl_dump_error_stack (const char *caller) } } -int +static int ssl_do (rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func) { int r = (-1); @@ -246,7 +239,7 @@ out: #define ssl_read_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_read) #define ssl_write_one(t,b,l) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_write) -int +static int ssl_setup_connection (rpc_transport_t *this, int server) { X509 *peer = NULL; @@ -311,8 +304,17 @@ done: } +static void +ssl_teardown_connection (socket_private_t *priv) +{ + SSL_shutdown(priv->ssl_ssl); + SSL_clear(priv->ssl_ssl); + SSL_free(priv->ssl_ssl); + priv->ssl_ssl = NULL; +} -ssize_t + +static ssize_t __socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount) { socket_private_t *priv = NULL; @@ -332,7 +334,7 @@ __socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount) } -ssize_t +static ssize_t __socket_ssl_read (rpc_transport_t *this, void *buf, size_t count) { struct iovec iov = {0, }; @@ -347,7 +349,7 @@ __socket_ssl_read (rpc_transport_t *this, void *buf, size_t count) } -int +static int __socket_cached_read (rpc_transport_t *this, struct iovec *opvector, int opcount) { socket_private_t *priv = NULL; @@ -415,6 +417,19 @@ out: return ret; } +static gf_boolean_t +__does_socket_rwv_error_need_logging (socket_private_t *priv, int write) +{ + int read = !write; + + if (priv->connected == -1) /* Didn't even connect, of course it fails */ + return _gf_false; + + if (read && (priv->read_fail_log == _gf_false)) + return _gf_false; + + return _gf_true; +} /* * return value: @@ -423,7 +438,7 @@ out: * > 0 = incomplete */ -int +static int __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count, struct iovec **pending_vector, int *pending_count, size_t *bytes, int write) @@ -498,11 +513,15 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count, if (errno == EINTR) continue; - if (write || (!write && priv->read_fail_log)) + if (__does_socket_rwv_error_need_logging (priv, + write)) { gf_log (this->name, GF_LOG_WARNING, - "%s failed (%s)", + "%s on %s failed (%s)", write ? "writev":"readv", + this->peerinfo.identifier, strerror (errno)); + } + if (priv->use_ssl) { ssl_dump_error_stack(this->name); } @@ -553,7 +572,7 @@ out: } -int +static int __socket_readv (rpc_transport_t *this, struct iovec *vector, int count, struct iovec **pending_vector, int *pending_count, size_t *bytes) @@ -567,7 +586,7 @@ __socket_readv (rpc_transport_t *this, struct iovec *vector, int count, } -int +static int __socket_writev (rpc_transport_t *this, struct iovec *vector, int count, struct iovec **pending_vector, int *pending_count) { @@ -580,42 +599,56 @@ __socket_writev (rpc_transport_t *this, struct iovec *vector, int count, } -int +static int +__socket_shutdown (rpc_transport_t *this) +{ + int ret = -1; + socket_private_t *priv = this->private; + + priv->connected = -1; + ret = shutdown (priv->sock, SHUT_RDWR); + if (ret) { + /* its already disconnected.. no need to understand + why it failed to shutdown in normal cases */ + gf_log (this->name, GF_LOG_DEBUG, + "shutdown() returned %d. %s", + ret, strerror (errno)); + } + + return ret; +} + +static int __socket_disconnect (rpc_transport_t *this) { - socket_private_t *priv = NULL; int ret = -1; + socket_private_t *priv = NULL; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); priv = this->private; + gf_log (this->name, GF_LOG_TRACE, + "disconnecting %p, state=%u gen=%u sock=%d", this, + priv->ot_state, priv->ot_gen, priv->sock); + if (priv->sock != -1) { - priv->connected = -1; - ret = shutdown (priv->sock, SHUT_RDWR); - if (ret) { - /* its already disconnected.. no need to understand - why it failed to shutdown in normal cases */ - gf_log (this->name, GF_LOG_DEBUG, - "shutdown() returned %d. %s", - ret, strerror (errno)); - } - if (priv->ssl_ssl) { - SSL_shutdown(priv->ssl_ssl); - SSL_clear(priv->ssl_ssl); - SSL_free(priv->ssl_ssl); - priv->ssl_ssl = NULL; - } + ret = __socket_shutdown(this); if (priv->own_thread) { - /* - * Without this, reconnect (= disconnect + connect) - * won't work except by accident. - */ - close(priv->sock); - priv->sock = -1; - ++(priv->socket_gen); - } + /* + * Without this, reconnect (= disconnect + connect) + * won't work except by accident. + */ + close(priv->sock); + priv->sock = -1; + gf_log (this->name, GF_LOG_TRACE, + "OT_PLEASE_DIE on %p", this); + priv->ot_state = OT_PLEASE_DIE; + } + else if (priv->use_ssl) { + ssl_teardown_connection(priv); + } } out: @@ -623,7 +656,7 @@ out: } -int +static int __socket_server_bind (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -679,7 +712,7 @@ out: } -int +static int __socket_nonblock (int fd) { int flags = 0; @@ -693,7 +726,7 @@ __socket_nonblock (int fd) return ret; } -int +static int __socket_nodelay (int fd) { int on = 1; @@ -769,7 +802,7 @@ err: } -int +static int __socket_connect_finish (int fd) { int ret = -1; @@ -787,7 +820,7 @@ __socket_connect_finish (int fd) } -void +static void __socket_reset (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -824,13 +857,13 @@ out: } -void +static void socket_set_lastfrag (uint32_t *fragsize) { (*fragsize) |= 0x80000000U; } -void +static void socket_set_frag_header_size (uint32_t size, char *haddr) { size = htonl (size); @@ -838,14 +871,14 @@ socket_set_frag_header_size (uint32_t size, char *haddr) } -void +static void socket_set_last_frag_header_size (uint32_t size, char *haddr) { socket_set_lastfrag (&size); socket_set_frag_header_size (size, haddr); } -struct ioq * +static struct ioq * __socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg) { struct ioq *entry = NULL; @@ -912,7 +945,7 @@ out: } -void +static void __socket_ioq_entry_free (struct ioq *entry) { GF_VALIDATE_OR_GOTO ("socket", entry, out); @@ -929,7 +962,7 @@ out: } -void +static void __socket_ioq_flush (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -950,7 +983,7 @@ out: } -int +static int __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct) { int ret = -1; @@ -984,7 +1017,7 @@ __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct) } -int +static int __socket_ioq_churn (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -1017,7 +1050,7 @@ out: } -int +static int socket_event_poll_err (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -1042,7 +1075,7 @@ out: } -int +static int socket_event_poll_out (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -1380,7 +1413,7 @@ __socket_read_request (rpc_transport_t *this) buf = rpc_procnum_addr (iobuf_ptr (in->iobuf)); procnum = ntoh32 (*((uint32_t *)buf)); - if (this->listener) { + if (priv->is_server) { /* this check is needed as rpcsvc and rpc-clnt * actor structures are not same */ vector_sizer = @@ -1886,7 +1919,7 @@ void __socket_reset_priv (socket_private_t *priv) } -int +static int __socket_proto_state_machine (rpc_transport_t *this, rpc_transport_pollin_t **pollin) { @@ -1927,17 +1960,8 @@ __socket_proto_state_machine (rpc_transport_t *this, &in->pending_vector, &in->pending_count, NULL); - if (ret == -1) { - if (priv->read_fail_log == 1) { - gf_log (this->name, - ((priv->connected == 1) ? - GF_LOG_WARNING : GF_LOG_DEBUG), - "reading from socket failed. Error (%s)" - ", peer (%s)", strerror (errno), - this->peerinfo.identifier); - } + if (ret == -1) goto out; - } if (ret > 0) { gf_log (this->name, GF_LOG_TRACE, "partial " @@ -2058,7 +2082,7 @@ out: } -int +static int socket_proto_state_machine (rpc_transport_t *this, rpc_transport_pollin_t **pollin) { @@ -2081,17 +2105,22 @@ out: } -int +static int socket_event_poll_in (rpc_transport_t *this) { int ret = -1; rpc_transport_pollin_t *pollin = NULL; + socket_private_t *priv = this->private; ret = socket_proto_state_machine (this, &pollin); if (pollin != NULL) { + priv->ot_state = OT_CALLBACK; ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED, pollin); + if (priv->ot_state == OT_CALLBACK) { + priv->ot_state = OT_RUNNING; + } rpc_transport_pollin_destroy (pollin); } @@ -2099,7 +2128,7 @@ socket_event_poll_in (rpc_transport_t *this) } -int +static int socket_connect_finish (rpc_transport_t *this) { int ret = -1; @@ -2133,8 +2162,6 @@ socket_connect_finish (rpc_transport_t *this) priv->connect_finish_log = 1; } __socket_disconnect (this); - notify_rpc = 1; - event = RPC_TRANSPORT_DISCONNECT; goto unlock; } @@ -2173,7 +2200,7 @@ out: /* reads rpc_requests during pollin */ -int +static int socket_event_handler (int fd, int idx, void *data, int poll_in, int poll_out, int poll_err) { @@ -2218,7 +2245,7 @@ out: } -void * +static void * socket_poller (void *ctx) { rpc_transport_t *this = ctx; @@ -2226,7 +2253,9 @@ socket_poller (void *ctx) struct pollfd pfd[2] = {{0,},}; gf_boolean_t to_write = _gf_false; int ret = 0; - int orig_gen; + uint32_t gen = 0; + + priv->ot_state = OT_RUNNING; if (priv->use_ssl) { if (ssl_setup_connection(this,priv->connected) < 0) { @@ -2246,8 +2275,6 @@ socket_poller (void *ctx) } } - orig_gen = ++(priv->socket_gen); - if (priv->connected == 0) { THIS = this->xl; ret = socket_connect_finish (this); @@ -2264,12 +2291,8 @@ socket_poller (void *ctx) "asynchronous rpc_transport_notify failed"); } + gen = priv->ot_gen; for (;;) { - if (priv->socket_gen != orig_gen) { - gf_log(this->name,GF_LOG_DEBUG, - "redundant poller exiting"); - return NULL; - } pthread_mutex_lock(&priv->lock); to_write = !list_empty(&priv->ioq); pthread_mutex_unlock(&priv->lock); @@ -2305,6 +2328,13 @@ socket_poller (void *ctx) else if (errno == ENOTCONN) { ret = 0; } + if (priv->ot_state == OT_PLEASE_DIE) { + gf_log (this->name, GF_LOG_TRACE, + "OT_IDLE on %p (input request)", + this); + priv->ot_state = OT_IDLE; + break; + } } else if (pfd[1].revents & POLL_MASK_OUTPUT) { ret = socket_event_poll_out(this); @@ -2315,6 +2345,13 @@ socket_poller (void *ctx) else if (errno == ENOTCONN) { ret = 0; } + if (priv->ot_state == OT_PLEASE_DIE) { + gf_log (this->name, GF_LOG_TRACE, + "OT_IDLE on %p (output request)", + this); + priv->ot_state = OT_IDLE; + break; + } } else { /* @@ -2335,19 +2372,64 @@ socket_poller (void *ctx) "error in polling loop"); break; } + if (priv->ot_gen != gen) { + gf_log (this->name, GF_LOG_TRACE, + "generation mismatch, my %u != %u", + gen, priv->ot_gen); + return NULL; + } } err: /* All (and only) I/O errors should come here. */ - __socket_disconnect (this); - rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, this); - rpc_transport_unref (this); + pthread_mutex_lock(&priv->lock); + if (priv->ssl_ssl) { + /* + * We're always responsible for this part, but only actually + * have to do it if we got far enough for ssl_ssl to be valid + * (i.e. errors in ssl_setup_connection don't count). + */ + ssl_teardown_connection(priv); + } + __socket_shutdown(this); + close(priv->sock); + priv->sock = -1; + priv->ot_state = OT_IDLE; + pthread_mutex_unlock(&priv->lock); + rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, + this); + rpc_transport_unref (this); return NULL; } +static void +socket_spawn (rpc_transport_t *this) +{ + socket_private_t *priv = this->private; -int + switch (priv->ot_state) { + case OT_IDLE: + case OT_PLEASE_DIE: + break; + default: + gf_log (this->name, GF_LOG_WARNING, + "refusing to start redundant poller"); + return; + } + + priv->ot_gen += 7; + priv->ot_state = OT_SPAWNING; + gf_log (this->name, GF_LOG_TRACE, + "spawning %p with gen %u", this, priv->ot_gen); + + if (gf_thread_create(&priv->thread,NULL,socket_poller,this) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "could not create poll thread"); + } +} + +static int socket_server_event_handler (int fd, int idx, void *data, int poll_in, int poll_out, int poll_err) { @@ -2385,7 +2467,7 @@ socket_server_event_handler (int fd, int idx, void *data, goto unlock; } - if (priv->nodelay) { + if (priv->nodelay && (new_sockaddr.ss_family != AF_UNIX)) { ret = __socket_nodelay (new_sock); if (ret == -1) { gf_log (this->name, GF_LOG_WARNING, @@ -2395,7 +2477,8 @@ socket_server_event_handler (int fd, int idx, void *data, } } - if (priv->keepalive) { + if (priv->keepalive && + new_sockaddr.ss_family != AF_UNIX) { ret = __socket_keepalive (new_sock, new_sockaddr.ss_family, priv->keepaliveintvl, @@ -2411,6 +2494,15 @@ socket_server_event_handler (int fd, int idx, void *data, if (!new_trans) goto unlock; + ret = pthread_mutex_init(&new_trans->lock, NULL); + if (ret == -1) { + gf_log (this->name, GF_LOG_WARNING, + "pthread_mutex_init() failed: %s", + strerror (errno)); + close (new_sock); + goto unlock; + } + new_trans->name = gf_strdup (this->name); memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, @@ -2482,6 +2574,7 @@ socket_server_event_handler (int fd, int idx, void *data, * connection. */ new_priv->connected = 1; + new_priv->is_server = _gf_true; rpc_transport_ref (new_trans); if (new_priv->own_thread) { @@ -2489,12 +2582,7 @@ socket_server_event_handler (int fd, int idx, void *data, gf_log(this->name,GF_LOG_ERROR, "could not create pipe"); } - if (pthread_create(&new_priv->thread, - NULL, socket_poller, - new_trans) != 0) { - gf_log(this->name,GF_LOG_ERROR, - "could not create poll thread"); - } + socket_spawn(new_trans); } else { new_priv->idx = @@ -2529,7 +2617,7 @@ out: } -int +static int socket_disconnect (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -2551,7 +2639,7 @@ out: } -int +static int socket_connect (rpc_transport_t *this, int port) { int ret = -1; @@ -2590,6 +2678,10 @@ socket_connect (rpc_transport_t *this, int port) goto err; } + gf_log (this->name, GF_LOG_TRACE, + "connecting %p, state=%u gen=%u sock=%d", this, + priv->ot_state, priv->ot_gen, priv->sock); + ret = socket_client_get_remote_sockaddr (this, &sock_union.sa, &sockaddr_len, &sa_family); if (ret == -1) { @@ -2659,7 +2751,7 @@ socket_connect (rpc_transport_t *this, int port) } } - if (priv->nodelay) { + if (priv->nodelay && (sa_family != AF_UNIX)) { ret = __socket_nodelay (priv->sock); if (ret == -1) { @@ -2669,7 +2761,7 @@ socket_connect (rpc_transport_t *this, int port) } } - if (priv->keepalive) { + if (priv->keepalive && sa_family != AF_UNIX) { ret = __socket_keepalive (priv->sock, sa_family, priv->keepaliveintvl, @@ -2702,13 +2794,30 @@ socket_connect (rpc_transport_t *this, int port) goto unlock; } + if (!priv->use_ssl && !priv->bio && !priv->own_thread) { + ret = __socket_nonblock (priv->sock); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "NBIO on %d failed (%s)", + priv->sock, strerror (errno)); + close (priv->sock); + priv->sock = -1; + goto unlock; + } + } + ret = connect (priv->sock, SA (&this->peerinfo.sockaddr), this->peerinfo.sockaddr_len); if (ret == -1 && ((errno != EINPROGRESS) && (errno != ENOENT))) { - gf_log (this->name, GF_LOG_ERROR, - "connection attempt failed (%s)", - strerror (errno)); + /* For unix path based sockets, the socket path is + * cryptic (md5sum of path) and may not be useful for + * the user in debugging so log it in DEBUG + */ + gf_log (this->name, ((sa_family == AF_UNIX) ? + GF_LOG_DEBUG : GF_LOG_ERROR), + "connection attempt on %s failed, (%s)", + this->peerinfo.identifier, strerror (errno)); close (priv->sock); priv->sock = -1; goto unlock; @@ -2743,6 +2852,7 @@ socket_connect (rpc_transport_t *this, int port) * initializing a client connection. */ priv->connected = 0; + priv->is_server = _gf_false; rpc_transport_ref (this); if (priv->own_thread) { @@ -2752,11 +2862,7 @@ socket_connect (rpc_transport_t *this, int port) } this->listener = this; - if (pthread_create(&priv->thread,NULL, - socket_poller, this) != 0) { - gf_log(this->name,GF_LOG_ERROR, - "could not create poll thread"); - } + socket_spawn(this); } else { priv->idx = event_register (ctx->event_pool, priv->sock, @@ -2777,7 +2883,7 @@ err: } -int +static int socket_listen (rpc_transport_t *this) { socket_private_t * priv = NULL; @@ -2859,7 +2965,7 @@ socket_listen (rpc_transport_t *this) } } - if (priv->nodelay) { + if (priv->nodelay && (sa_family != AF_UNIX)) { ret = __socket_nodelay (priv->sock); if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, @@ -2928,7 +3034,7 @@ out: } -int32_t +static int32_t socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) { socket_private_t *priv = NULL; @@ -3002,7 +3108,7 @@ out: } -int32_t +static int32_t socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) { socket_private_t *priv = NULL; @@ -3076,7 +3182,7 @@ out: } -int32_t +static int32_t socket_getpeername (rpc_transport_t *this, char *hostname, int hostlen) { int32_t ret = -1; @@ -3095,7 +3201,7 @@ out: } -int32_t +static int32_t socket_getpeeraddr (rpc_transport_t *this, char *peeraddr, int addrlen, struct sockaddr_storage *sa, socklen_t salen) { @@ -3116,7 +3222,7 @@ out: } -int32_t +static int32_t socket_getmyname (rpc_transport_t *this, char *hostname, int hostlen) { int32_t ret = -1; @@ -3135,7 +3241,7 @@ out: } -int32_t +static int32_t socket_getmyaddr (rpc_transport_t *this, char *myaddr, int addrlen, struct sockaddr_storage *sa, socklen_t salen) { @@ -3155,6 +3261,25 @@ out: } +static int +socket_throttle (rpc_transport_t *this, gf_boolean_t onoff) +{ + socket_private_t *priv = NULL; + + priv = this->private; + + /* The way we implement throttling is by taking off + POLLIN event from the polled flags. This way we + never get called with the POLLIN event and therefore + will never read() any more data until throttling + is turned off. + */ + priv->idx = event_select_on (this->ctx->event_pool, priv->sock, + priv->idx, (int) !onoff, -1); + return 0; +} + + struct rpc_transport_ops tops = { .listen = socket_listen, .connect = socket_connect, @@ -3165,6 +3290,7 @@ struct rpc_transport_ops tops = { .get_peeraddr = socket_getpeeraddr, .get_myname = socket_getmyname, .get_myaddr = socket_getmyaddr, + .throttle = socket_throttle, }; int @@ -3221,7 +3347,7 @@ out: } -int +static int socket_init (rpc_transport_t *this) { socket_private_t *priv = NULL; @@ -3468,6 +3594,10 @@ socket_init (rpc_transport_t *this) SSL_CTX_set_verify(priv->ssl_ctx,SSL_VERIFY_PEER,0); } + if (priv->own_thread) { + priv->ot_state = OT_IDLE; + } + out: this->private = priv; return 0; |
