diff options
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.c | 2 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 23 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 5 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 41 |
4 files changed, 49 insertions, 22 deletions
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index f9cbdf133c7..ce984426cbe 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -144,6 +144,8 @@ rpc_transport_pollin_alloc(rpc_transport_t *this, struct iovec *vector, goto out; } + msg->trans = this; + if (count > 1) { msg->vectored = 1; } diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index 9e75d1a2bbb..6830279f07e 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -58,6 +58,7 @@ typedef struct rpc_transport rpc_transport_t; #include <glusterfs/dict.h> #include <glusterfs/compat.h> +#include <glusterfs/async.h> #include "rpcsvc-common.h" struct peer_info { @@ -155,16 +156,6 @@ struct rpc_request_info { }; typedef struct rpc_request_info rpc_request_info_t; -struct rpc_transport_pollin { - int count; - void *private; - struct iobref *iobref; - struct iovec vector[MAX_IOVEC]; - char is_reply; - char vectored; -}; -typedef struct rpc_transport_pollin rpc_transport_pollin_t; - typedef int (*rpc_transport_notify_t)(rpc_transport_t *, void *mydata, rpc_transport_event_t, void *data, ...); @@ -217,6 +208,18 @@ struct rpc_transport { gf_atomic_t disconnect_progress; }; +struct rpc_transport_pollin { + struct rpc_transport *trans; + int count; + void *private; + struct iobref *iobref; + struct iovec vector[MAX_IOVEC]; + char is_reply; + char vectored; + gf_async_t async; +}; +typedef struct rpc_transport_pollin rpc_transport_pollin_t; + struct rpc_transport_ops { /* no need of receive op, msg will be delivered through an event * notification diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 74373c44f91..c38a675b8c2 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -2314,6 +2314,11 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program, newprog->alive = _gf_true; + if (gf_async_ctrl.enabled) { + newprog->ownthread = _gf_false; + newprog->synctask = _gf_false; + } + /* make sure synctask gets priority over ownthread */ if (newprog->synctask) newprog->ownthread = _gf_false; diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index fa0e0f20901..26dbe0b706a 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -2495,6 +2495,33 @@ out: return ret; } +static void +socket_event_poll_in_async(xlator_t *xl, gf_async_t *async) +{ + rpc_transport_pollin_t *pollin; + rpc_transport_t *this; + socket_private_t *priv; + + pollin = caa_container_of(async, rpc_transport_pollin_t, async); + this = pollin->trans; + priv = this->private; + + rpc_transport_notify(this, RPC_TRANSPORT_MSG_RECEIVED, pollin); + + rpc_transport_unref(this); + + rpc_transport_pollin_destroy(pollin); + + pthread_mutex_lock(&priv->notify.lock); + { + --priv->notify.in_progress; + + if (!priv->notify.in_progress) + pthread_cond_signal(&priv->notify.cond); + } + pthread_mutex_unlock(&priv->notify.lock); +} + static int socket_event_poll_in(rpc_transport_t *this, gf_boolean_t notify_handled) { @@ -2519,18 +2546,8 @@ socket_event_poll_in(rpc_transport_t *this, gf_boolean_t notify_handled) event_handled(ctx->event_pool, priv->sock, priv->idx, priv->gen); if (pollin) { - rpc_transport_notify(this, RPC_TRANSPORT_MSG_RECEIVED, pollin); - - rpc_transport_pollin_destroy(pollin); - - pthread_mutex_lock(&priv->notify.lock); - { - --priv->notify.in_progress; - - if (!priv->notify.in_progress) - pthread_cond_signal(&priv->notify.cond); - } - pthread_mutex_unlock(&priv->notify.lock); + rpc_transport_ref(this); + gf_async(&pollin->async, THIS, socket_event_poll_in_async); } return ret; |