summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-transport/socket/src/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
-rw-r--r--rpc/rpc-transport/socket/src/socket.c361
1 files changed, 196 insertions, 165 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index a4791437e6d..8ba2692cdc6 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -2680,108 +2680,107 @@ socket_server_event_handler (int fd, int idx, void *data,
priv = this->private;
ctx = this->ctx;
- pthread_mutex_lock (&priv->lock);
- {
- priv->idx = idx;
-
- if (poll_in) {
- new_sock = accept (priv->sock, SA (&new_sockaddr),
- &addrlen);
-
- if (new_sock == -1) {
- gf_log (this->name, GF_LOG_WARNING,
- "accept on %d failed (%s)",
- priv->sock, strerror (errno));
- goto unlock;
- }
-
- if (priv->nodelay && (new_sockaddr.ss_family != AF_UNIX)) {
- ret = __socket_nodelay (new_sock);
- if (ret == -1) {
- gf_log (this->name, GF_LOG_WARNING,
- "setsockopt() failed for "
- "NODELAY (%s)",
- strerror (errno));
- }
- }
+ /* NOTE:
+ * We have done away with the critical section in this function. since
+ * there's little that it helps with. There's no other code that
+ * attempts to unref the listener socket/transport from any other
+ * thread context while we are using it here.
+ */
+ priv->idx = idx;
- if (priv->keepalive &&
- new_sockaddr.ss_family != AF_UNIX) {
- ret = __socket_keepalive (new_sock,
- new_sockaddr.ss_family,
- priv->keepaliveintvl,
- priv->keepaliveidle,
- priv->keepalivecnt,
- priv->timeout);
- if (ret == -1)
- gf_log (this->name, GF_LOG_WARNING,
- "Failed to set keep-alive: %s",
- strerror (errno));
- }
+ if (poll_in) {
+ new_sock = accept (priv->sock, SA (&new_sockaddr), &addrlen);
- new_trans = GF_CALLOC (1, sizeof (*new_trans),
- gf_common_mt_rpc_trans_t);
- if (!new_trans) {
- sys_close (new_sock);
- goto unlock;
- }
+ if (new_sock == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "accept on %d failed (%s)",
+ priv->sock, strerror (errno));
+ goto out;
+ }
- ret = pthread_mutex_init(&new_trans->lock, NULL);
+ if (priv->nodelay && (new_sockaddr.ss_family != AF_UNIX)) {
+ ret = __socket_nodelay (new_sock);
if (ret == -1) {
gf_log (this->name, GF_LOG_WARNING,
- "pthread_mutex_init() failed: %s",
+ "setsockopt() failed for "
+ "NODELAY (%s)",
strerror (errno));
- sys_close (new_sock);
- GF_FREE (new_trans);
- goto unlock;
}
- INIT_LIST_HEAD (&new_trans->list);
+ }
- new_trans->name = gf_strdup (this->name);
+ if (priv->keepalive &&
+ new_sockaddr.ss_family != AF_UNIX) {
+ ret = __socket_keepalive (new_sock,
+ new_sockaddr.ss_family,
+ priv->keepaliveintvl,
+ priv->keepaliveidle,
+ priv->keepalivecnt,
+ priv->timeout);
+ if (ret == -1)
+ gf_log (this->name, GF_LOG_WARNING,
+ "Failed to set keep-alive: %s",
+ strerror (errno));
+ }
- memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr,
- addrlen);
- new_trans->peerinfo.sockaddr_len = addrlen;
+ new_trans = GF_CALLOC (1, sizeof (*new_trans),
+ gf_common_mt_rpc_trans_t);
+ if (!new_trans) {
+ sys_close (new_sock);
+ goto out;
+ }
- new_trans->myinfo.sockaddr_len =
- sizeof (new_trans->myinfo.sockaddr);
+ ret = pthread_mutex_init(&new_trans->lock, NULL);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "pthread_mutex_init() failed: %s",
+ strerror (errno));
+ sys_close (new_sock);
+ GF_FREE (new_trans);
+ goto out;
+ }
+ INIT_LIST_HEAD (&new_trans->list);
- ret = getsockname (new_sock,
- SA (&new_trans->myinfo.sockaddr),
- &new_trans->myinfo.sockaddr_len);
- if (ret == -1) {
- gf_log (this->name, GF_LOG_WARNING,
- "getsockname on %d failed (%s)",
- new_sock, strerror (errno));
- sys_close (new_sock);
- GF_FREE (new_trans->name);
- GF_FREE (new_trans);
- goto unlock;
- }
+ new_trans->name = gf_strdup (this->name);
- get_transport_identifiers (new_trans);
- ret = socket_init(new_trans);
- if (ret != 0) {
- sys_close (new_sock);
- GF_FREE (new_trans->name);
- GF_FREE (new_trans);
- goto unlock;
- }
- new_trans->ops = this->ops;
- new_trans->init = this->init;
- new_trans->fini = this->fini;
- new_trans->ctx = ctx;
- new_trans->xl = this->xl;
- new_trans->mydata = this->mydata;
- new_trans->notify = this->notify;
- new_trans->listener = this;
- new_priv = new_trans->private;
-
- if (new_sockaddr.ss_family == AF_UNIX) {
- new_priv->use_ssl = _gf_false;
- }
- else {
- switch (priv->srvr_ssl) {
+ memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, addrlen);
+ new_trans->peerinfo.sockaddr_len = addrlen;
+
+ new_trans->myinfo.sockaddr_len = sizeof (new_trans->myinfo.sockaddr);
+
+ ret = getsockname (new_sock, SA (&new_trans->myinfo.sockaddr),
+ &new_trans->myinfo.sockaddr_len);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "getsockname on %d failed (%s)",
+ new_sock, strerror (errno));
+ sys_close (new_sock);
+ GF_FREE (new_trans->name);
+ GF_FREE (new_trans);
+ goto out;
+ }
+
+ get_transport_identifiers (new_trans);
+ ret = socket_init(new_trans);
+ if (ret != 0) {
+ sys_close (new_sock);
+ GF_FREE (new_trans->name);
+ GF_FREE (new_trans);
+ goto out;
+ }
+ new_trans->ops = this->ops;
+ new_trans->init = this->init;
+ new_trans->fini = this->fini;
+ new_trans->ctx = ctx;
+ new_trans->xl = this->xl;
+ new_trans->mydata = this->mydata;
+ new_trans->notify = this->notify;
+ new_trans->listener = this;
+ new_priv = new_trans->private;
+
+ if (new_sockaddr.ss_family == AF_UNIX) {
+ new_priv->use_ssl = _gf_false;
+ } else {
+ switch (priv->srvr_ssl) {
case MGMT_SSL_ALWAYS:
/* Glusterd with secure_mgmt. */
new_priv->use_ssl = _gf_true;
@@ -2792,95 +2791,127 @@ socket_server_event_handler (int fd, int idx, void *data,
break;
default:
new_priv->use_ssl = _gf_false;
- }
}
+ }
- new_priv->sock = new_sock;
- new_priv->own_thread = priv->own_thread;
-
- new_priv->ssl_ctx = priv->ssl_ctx;
- if (new_priv->use_ssl && !new_priv->own_thread) {
- cname = ssl_setup_connection(new_trans,1);
- if (!cname) {
- gf_log(this->name,GF_LOG_ERROR,
- "server setup failed");
- sys_close (new_sock);
- GF_FREE (new_trans->name);
- GF_FREE (new_trans);
- goto unlock;
- }
- this->ssl_name = cname;
- }
+ new_priv->sock = new_sock;
+ new_priv->own_thread = priv->own_thread;
+
+ new_priv->ssl_ctx = priv->ssl_ctx;
+ if (new_priv->use_ssl && !new_priv->own_thread) {
+ cname = ssl_setup_connection(new_trans, 1);
+ if (!cname) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "server setup failed");
+ sys_close (new_sock);
+ GF_FREE (new_trans->name);
+ GF_FREE (new_trans);
+ goto out;
+ }
+ this->ssl_name = cname;
+ }
- if (!priv->bio && !priv->own_thread) {
- ret = __socket_nonblock (new_sock);
+ if (!priv->bio && !priv->own_thread) {
+ ret = __socket_nonblock (new_sock);
- if (ret == -1) {
- gf_log (this->name, GF_LOG_WARNING,
- "NBIO on %d failed (%s)",
- new_sock, strerror (errno));
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "NBIO on %d failed (%s)",
+ new_sock, strerror (errno));
- sys_close (new_sock);
- GF_FREE (new_trans->name);
- GF_FREE (new_trans);
- goto unlock;
- }
+ sys_close (new_sock);
+ GF_FREE (new_trans->name);
+ GF_FREE (new_trans);
+ goto out;
}
+ }
- pthread_mutex_lock (&new_priv->lock);
- {
- /*
- * In the own_thread case, this is used to
- * indicate that we're initializing a server
- * connection.
- */
- new_priv->connected = 1;
- new_priv->is_server = _gf_true;
- rpc_transport_ref (new_trans);
-
- if (new_priv->own_thread) {
- if (pipe(new_priv->pipe) < 0) {
- gf_log(this->name, GF_LOG_ERROR,
- "could not create pipe");
- }
- ret = socket_spawn(new_trans);
- if (ret) {
- gf_log(this->name, GF_LOG_ERROR,
- "could not spawn thread");
- sys_close (new_priv->pipe[0]);
- sys_close (new_priv->pipe[1]);
- }
- } else {
- new_priv->idx =
- event_register (ctx->event_pool,
- new_sock,
- socket_event_handler,
- new_trans,
- 1, 0);
- if (new_priv->idx == -1) {
- ret = -1;
- gf_log(this->name, GF_LOG_ERROR,
- "failed to register the socket with event");
- }
- }
+ /*
+ * In the own_thread case, this is used to
+ * indicate that we're initializing a server
+ * connection.
+ */
+ new_priv->connected = 1;
+ new_priv->is_server = _gf_true;
+ rpc_transport_ref (new_trans);
+ if (new_priv->own_thread) {
+ if (pipe(new_priv->pipe) < 0) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "could not create pipe");
}
- pthread_mutex_unlock (&new_priv->lock);
- if (ret == -1) {
- sys_close (new_sock);
- rpc_transport_unref (new_trans);
- goto unlock;
+ ret = socket_spawn(new_trans);
+ if (ret) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "could not spawn thread");
+ sys_close (new_priv->pipe[0]);
+ sys_close (new_priv->pipe[1]);
}
+ } else {
+ /* Take a ref on the new_trans to avoid
+ * getting deleted when event_register()
+ * causes socket_event_handler() to race
+ * ahead of this path to eventually find
+ * a disconnect and unref the transport
+ */
+ rpc_transport_ref (new_trans);
- if (!priv->own_thread) {
- ret = rpc_transport_notify (this,
- RPC_TRANSPORT_ACCEPT, new_trans);
+ /* Send a notification to RPCSVC layer
+ * to save the new_trans in its service
+ * list before we register the new_sock
+ * with epoll to begin receiving notifications
+ * for data handling.
+ */
+ ret = rpc_transport_notify (this, RPC_TRANSPORT_ACCEPT, new_trans);
+
+ if (ret != -1) {
+ new_priv->idx =
+ event_register (ctx->event_pool,
+ new_sock,
+ socket_event_handler,
+ new_trans,
+ 1, 0);
+ if (new_priv->idx == -1) {
+ ret = -1;
+ gf_log(this->name, GF_LOG_ERROR,
+ "failed to register the socket "
+ "with event");
+
+ /* event_register() could have failed for some
+ * reason, implying that the new_sock cannot be
+ * added to the epoll set. If we wont get any
+ * more notifications for new_sock from epoll,
+ * then we better remove the corresponding
+ * new_trans object from the RPCSVC service list.
+ * Since we've notified RPC service of new_trans
+ * before we attempted event_register(), we better
+ * unlink the new_trans from the RPCSVC service list
+ * to cleanup the stateby sending out a DISCONNECT
+ * notification.
+ */
+ rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, new_trans);
+ }
}
+
+ /* this rpc_transport_unref() is for managing race between
+ * 1. socket_server_event_handler and
+ * 2. socket_event_handler
+ * trying to add and remove new_trans from the rpcsvc
+ * service list
+ * now that we are done with the notifications, lets
+ * reduce the reference
+ */
+ rpc_transport_unref (new_trans);
}
- }
-unlock:
- pthread_mutex_unlock (&priv->lock);
+ if (ret == -1) {
+ sys_close (new_sock);
+ /* this unref is to actually cause the destruction of
+ * the new_trans since we've failed at everything so far
+ */
+ rpc_transport_unref (new_trans);
+ }
+ }
out:
if (cname && (cname != this->ssl_name)) {
GF_FREE(cname);