summaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'rpc')
-rw-r--r--rpc/rpc-lib/src/rpc-transport.c2
-rw-r--r--rpc/rpc-lib/src/rpc-transport.h23
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c5
-rw-r--r--rpc/rpc-transport/socket/src/socket.c41
4 files changed, 49 insertions, 22 deletions
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index f9cbdf133c7..ce984426cbe 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -144,6 +144,8 @@ rpc_transport_pollin_alloc(rpc_transport_t *this, struct iovec *vector,
goto out;
}
+ msg->trans = this;
+
if (count > 1) {
msg->vectored = 1;
}
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index 9e75d1a2bbb..6830279f07e 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -58,6 +58,7 @@ typedef struct rpc_transport rpc_transport_t;
#include <glusterfs/dict.h>
#include <glusterfs/compat.h>
+#include <glusterfs/async.h>
#include "rpcsvc-common.h"
struct peer_info {
@@ -155,16 +156,6 @@ struct rpc_request_info {
};
typedef struct rpc_request_info rpc_request_info_t;
-struct rpc_transport_pollin {
- int count;
- void *private;
- struct iobref *iobref;
- struct iovec vector[MAX_IOVEC];
- char is_reply;
- char vectored;
-};
-typedef struct rpc_transport_pollin rpc_transport_pollin_t;
-
typedef int (*rpc_transport_notify_t)(rpc_transport_t *, void *mydata,
rpc_transport_event_t, void *data, ...);
@@ -217,6 +208,18 @@ struct rpc_transport {
gf_atomic_t disconnect_progress;
};
+struct rpc_transport_pollin {
+ struct rpc_transport *trans;
+ int count;
+ void *private;
+ struct iobref *iobref;
+ struct iovec vector[MAX_IOVEC];
+ char is_reply;
+ char vectored;
+ gf_async_t async;
+};
+typedef struct rpc_transport_pollin rpc_transport_pollin_t;
+
struct rpc_transport_ops {
/* no need of receive op, msg will be delivered through an event
* notification
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 74373c44f91..c38a675b8c2 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -2314,6 +2314,11 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program,
newprog->alive = _gf_true;
+ if (gf_async_ctrl.enabled) {
+ newprog->ownthread = _gf_false;
+ newprog->synctask = _gf_false;
+ }
+
/* make sure synctask gets priority over ownthread */
if (newprog->synctask)
newprog->ownthread = _gf_false;
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index fa0e0f20901..26dbe0b706a 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -2495,6 +2495,33 @@ out:
return ret;
}
+static void
+socket_event_poll_in_async(xlator_t *xl, gf_async_t *async)
+{
+ rpc_transport_pollin_t *pollin;
+ rpc_transport_t *this;
+ socket_private_t *priv;
+
+ pollin = caa_container_of(async, rpc_transport_pollin_t, async);
+ this = pollin->trans;
+ priv = this->private;
+
+ rpc_transport_notify(this, RPC_TRANSPORT_MSG_RECEIVED, pollin);
+
+ rpc_transport_unref(this);
+
+ rpc_transport_pollin_destroy(pollin);
+
+ pthread_mutex_lock(&priv->notify.lock);
+ {
+ --priv->notify.in_progress;
+
+ if (!priv->notify.in_progress)
+ pthread_cond_signal(&priv->notify.cond);
+ }
+ pthread_mutex_unlock(&priv->notify.lock);
+}
+
static int
socket_event_poll_in(rpc_transport_t *this, gf_boolean_t notify_handled)
{
@@ -2519,18 +2546,8 @@ socket_event_poll_in(rpc_transport_t *this, gf_boolean_t notify_handled)
event_handled(ctx->event_pool, priv->sock, priv->idx, priv->gen);
if (pollin) {
- rpc_transport_notify(this, RPC_TRANSPORT_MSG_RECEIVED, pollin);
-
- rpc_transport_pollin_destroy(pollin);
-
- pthread_mutex_lock(&priv->notify.lock);
- {
- --priv->notify.in_progress;
-
- if (!priv->notify.in_progress)
- pthread_cond_signal(&priv->notify.cond);
- }
- pthread_mutex_unlock(&priv->notify.lock);
+ rpc_transport_ref(this);
+ gf_async(&pollin->async, THIS, socket_event_poll_in_async);
}
return ret;