summaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
authorAmar Tumballi <amar@gluster.com>2010-08-27 06:45:38 +0000
committerVijay Bellur <vijay@dev.gluster.com>2010-08-27 05:53:56 -0700
commit8071909e84b6a479a6b5be1eddd15e8b16fc1a80 (patch)
treedb3a56cef557a3c86fd983e778927c70a8665363 /rpc
parentfd282c4299a5354aa272345e312ac600d9dcc680 (diff)
rpc: server to client callback mechanism
Signed-off-by: Amar Tumballi <amar@gluster.com> Signed-off-by: Vijay Bellur <vijay@dev.gluster.com>
Diffstat (limited to 'rpc')
-rw-r--r--rpc/rpc-lib/src/protocol-common.h18
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c82
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.h43
-rw-r--r--rpc/rpc-lib/src/rpc-transport.h3
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c185
-rw-r--r--rpc/rpc-lib/src/rpcsvc.h11
-rw-r--r--rpc/rpc-transport/socket/src/socket.c2
7 files changed, 337 insertions, 7 deletions
diff --git a/rpc/rpc-lib/src/protocol-common.h b/rpc/rpc-lib/src/protocol-common.h
index 4df5a554fec..fdb42dfe663 100644
--- a/rpc/rpc-lib/src/protocol-common.h
+++ b/rpc/rpc-lib/src/protocol-common.h
@@ -140,6 +140,13 @@ enum gf_probe_resp {
GF_PROBE_FRIEND,
};
+enum gf_cbk_procnum {
+ GF_CBK_NULL = 0,
+ GF_CBK_FETCHSPEC,
+ GF_CBK_INO_FLUSH,
+ GF_CBK_MAXVALUE,
+};
+
#define GLUSTER3_1_FOP_PROGRAM 1298437 /* Completely random */
#define GLUSTER3_1_FOP_VERSION 310 /* 3.1.0 */
#define GLUSTER3_1_FOP_PROCCNT GFS3_OP_MAXVALUE
@@ -152,10 +159,13 @@ enum gf_probe_resp {
#define GLUSTER3_1_CLI_VERSION 1 /* 0.0.1 */
#define GLUSTER3_1_CLI_PROCCNT GF1_CLI_MAXVALUE
-#define GLUSTER_HNDSK_PROGRAM 14398633 /* Completely random */
-#define GLUSTER_HNDSK_VERSION 1 /* 0.0.1 */
+#define GLUSTER_HNDSK_PROGRAM 14398633 /* Completely random */
+#define GLUSTER_HNDSK_VERSION 1 /* 0.0.1 */
+
+#define GLUSTER_PMAP_PROGRAM 34123456
+#define GLUSTER_PMAP_VERSION 1
-#define GLUSTER_PMAP_PROGRAM 34123456
-#define GLUSTER_PMAP_VERSION 1
+#define GLUSTER_CBK_PROGRAM 52743234 /* Completely random */
+#define GLUSTER_CBK_VERSION 1 /* 0.0.1 */
#endif /* !_PROTOCOL_COMMON_H */
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index eac9f875066..8d923ed5f43 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -31,6 +31,7 @@
#include "rpc-transport.h"
#include "protocol-common.h"
#include "mem-pool.h"
+#include "xdr-rpc.h"
void
rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool);
@@ -653,6 +654,49 @@ out:
return ret;
}
+int
+rpc_clnt_handle_cbk (struct rpc_clnt *clnt, rpc_transport_pollin_t *msg)
+{
+ char *msgbuf = NULL;
+ rpcclnt_cb_program_t *program = NULL;
+ struct rpc_msg rpcmsg;
+ struct iovec progmsg; /* RPC Program payload */
+ size_t msglen = 0;
+ int found = 0;
+ int ret = -1;
+ int procnum = 0;
+
+ msgbuf = msg->vector[0].iov_base;
+ msglen = msg->vector[0].iov_len;
+
+ ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg, NULL,NULL);
+ if (ret == -1) {
+ gf_log ("rpc-clnt", GF_LOG_ERROR, "RPC call decoding failed");
+ goto out;
+ }
+
+ gf_log ("rpc-clnt", GF_LOG_INFO, "RPC XID: %lx, Ver: %ld, Program: %ld,"
+ " ProgVers: %ld, Proc: %ld", rpc_call_xid (&rpcmsg),
+ rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg),
+ rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg));
+
+ procnum = rpc_call_progproc (&rpcmsg);
+
+ list_for_each_entry (program, &clnt->programs, program) {
+ if ((program->prognum == rpc_call_program (&rpcmsg))
+ && (program->progver == rpc_call_progver (&rpcmsg))) {
+ found = 1;
+ break;
+ }
+ }
+ if (found && (procnum < program->numactors) &&
+ (program->actors[procnum].actor)) {
+ program->actors[procnum].actor (&progmsg);
+ }
+
+out:
+ return ret;
+}
int
rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
@@ -796,7 +840,10 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
case RPC_TRANSPORT_MSG_RECEIVED:
{
pollin = data;
- ret = rpc_clnt_handle_reply (clnt, pollin);
+ if (pollin->is_reply)
+ ret = rpc_clnt_handle_reply (clnt, pollin);
+ else
+ ret = rpc_clnt_handle_cbk (clnt, pollin);
/* ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_MSG,
* data);
*/
@@ -943,6 +990,8 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,
goto out;
}
+ INIT_LIST_HEAD (&rpc->programs);
+
out:
return rpc;
}
@@ -1168,6 +1217,37 @@ out:
return request_iob;
}
+int
+rpcclnt_cbk_program_register (struct rpc_clnt *clnt,
+ rpcclnt_cb_program_t *program)
+{
+ int ret = -1;
+
+ if (!clnt)
+ goto out;
+
+ if (program->actors == NULL)
+ goto out;
+
+ INIT_LIST_HEAD (&program->program);
+
+ list_add_tail (&program->program, &clnt->programs);
+
+ ret = 0;
+ gf_log ("rpc-clnt", GF_LOG_DEBUG, "New program registered: %s, Num: %d,"
+ " Ver: %d", program->progname, program->prognum,
+ program->progver);
+
+out:
+ if (ret == -1) {
+ gf_log ("rpc-clnt", GF_LOG_ERROR, "Program registration failed:"
+ " %s, Num: %d, Ver: %d", program->progname,
+ program->prognum, program->progver);
+ }
+
+ return ret;
+}
+
int
rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h
index 2381aaa087c..a0251c7c551 100644
--- a/rpc/rpc-lib/src/rpc-clnt.h
+++ b/rpc/rpc-lib/src/rpc-clnt.h
@@ -83,6 +83,38 @@ typedef struct rpc_clnt_program {
int numproc;
} rpc_clnt_prog_t;
+typedef int (*rpcclnt_cb_fn) (void *data);
+
+/* The descriptor for each procedure/actor that runs
+ * over the RPC service.
+ */
+typedef struct rpcclnt_actor_desc {
+ char procname[32];
+ int procnum;
+ rpcclnt_cb_fn actor;
+} rpcclnt_cb_actor_t;
+
+/* Describes a program and its version along with the function pointers
+ * required to handle the procedures/actors of each program/version.
+ * Never changed ever by any thread so no need for a lock.
+ */
+typedef struct rpcclnt_cb_program {
+ char progname[32];
+ int prognum;
+ int progver;
+ rpcclnt_cb_actor_t *actors; /* All procedure handlers */
+ int numactors; /* Num actors in actor array */
+
+ /* Program specific state handed to actors */
+ void *private;
+
+
+ /* list member to link to list of registered services with rpc_clnt */
+ struct list_head program;
+} rpcclnt_cb_program_t;
+
+
+
#define RPC_MAX_AUTH_BYTES 400
typedef struct rpc_auth_data {
int flavour;
@@ -141,6 +173,9 @@ struct rpc_clnt {
void *mydata;
uint64_t xid;
+ /* list of cb programs registered with rpc-clnt */
+ struct list_head programs;
+
/* Memory pool for rpc_req_t */
struct mem_pool *reqpool;
@@ -171,7 +206,7 @@ int rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn,
* also be filled with pointer to buffer to hold header and length
* of the header.
*/
-
+
int rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
int procnum, fop_cbk_fn_t cbkfn,
struct iovec *proghdr, int proghdrcount,
@@ -190,4 +225,10 @@ void rpc_clnt_reconnect (void *trans_ptr);
void rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config);
+/* All users of RPC services should use this API to register their
+ * procedure handlers.
+ */
+int rpcclnt_cbk_program_register (struct rpc_clnt *svc,
+ rpcclnt_cb_program_t *program);
+
#endif /* !_RPC_CLNT_H */
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index c5b6f382e7b..cccae5f261d 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -166,6 +166,7 @@ struct rpc_transport_pollin {
char vectored;
void *private;
struct iobref *iobref;
+ char is_reply;
};
typedef struct rpc_transport_pollin rpc_transport_pollin_t;
@@ -196,6 +197,8 @@ struct rpc_transport {
void *notify_data;
peer_info_t peerinfo;
peer_info_t myinfo;
+
+ struct list_head list;
};
struct rpc_transport_ops {
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 5b5c2998c5e..ee3d674c2eb 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -46,6 +46,8 @@
#include <stdarg.h>
#include <stdio.h>
+#include "xdr-rpcclnt.h"
+
struct rpcsvc_program gluster_dump_prog;
#define rpcsvc_alloc_request(svc, request) \
@@ -1119,6 +1121,189 @@ err:
return txrecord;
}
+inline int
+rpcsvc_get_callid (rpcsvc_t *rpc)
+{
+ return GF_UNIVERSAL_ANSWER;
+}
+
+int
+rpcsvc_fill_callback (int prognum, int progver, int procnum, int payload,
+ uint64_t xid, struct rpc_msg *request)
+{
+ int ret = -1;
+
+ if (!request) {
+ goto out;
+ }
+
+ memset (request, 0, sizeof (*request));
+
+ request->rm_xid = xid;
+ request->rm_direction = CALL;
+
+ request->rm_call.cb_rpcvers = 2;
+ request->rm_call.cb_prog = prognum;
+ request->rm_call.cb_vers = progver;
+ request->rm_call.cb_proc = procnum;
+
+ request->rm_call.cb_cred.oa_flavor = AUTH_NONE;
+ request->rm_call.cb_cred.oa_base = NULL;
+ request->rm_call.cb_cred.oa_length = 0;
+
+ request->rm_call.cb_verf.oa_flavor = AUTH_NONE;
+ request->rm_call.cb_verf.oa_base = NULL;
+ request->rm_call.cb_verf.oa_length = 0;
+
+ ret = 0;
+out:
+ return ret;
+}
+
+
+struct iovec
+rpcsvc_callback_build_header (char *recordstart, size_t rlen,
+ struct rpc_msg *request, size_t payload)
+{
+ struct iovec requesthdr = {0, };
+ struct iovec txrecord = {0, 0};
+ int ret = -1;
+ size_t fraglen = 0;
+
+ ret = rpc_request_to_xdr (request, recordstart, rlen, &requesthdr);
+ if (ret == -1) {
+ gf_log ("rpcsvc", GF_LOG_DEBUG,
+ "Failed to create RPC request");
+ goto out;
+ }
+
+ fraglen = payload + requesthdr.iov_len;
+ gf_log ("rpcsvc", GF_LOG_TRACE, "Request fraglen %zu, payload: %zu, "
+ "rpc hdr: %zu", fraglen, payload, requesthdr.iov_len);
+
+ txrecord.iov_base = recordstart;
+
+ /* Remember, this is only the vec for the RPC header and does not
+ * include the payload above. We needed the payload only to calculate
+ * the size of the full fragment. This size is sent in the fragment
+ * header.
+ */
+ txrecord.iov_len = requesthdr.iov_len;
+
+out:
+ return txrecord;
+}
+
+struct iobuf *
+rpcsvc_callback_build_record (rpcsvc_t *rpc, int prognum, int progver,
+ int procnum, size_t payload, uint64_t xid,
+ struct iovec *recbuf)
+{
+ struct rpc_msg request = {0, };
+ struct iobuf *request_iob = NULL;
+ char *record = NULL;
+ struct iovec recordhdr = {0, };
+ size_t pagesize = 0;
+ int ret = -1;
+
+ if ((!rpc) || (!recbuf)) {
+ goto out;
+ }
+
+ /* First, try to get a pointer into the buffer which the RPC
+ * layer can use.
+ */
+ request_iob = iobuf_get (rpc->ctx->iobuf_pool);
+ if (!request_iob) {
+ gf_log ("rpcsvc", GF_LOG_ERROR, "Failed to get iobuf");
+ goto out;
+ }
+
+ pagesize = ((struct iobuf_pool *)rpc->ctx->iobuf_pool)->page_size;
+
+ record = iobuf_ptr (request_iob); /* Now we have it. */
+
+ /* Fill the rpc structure and XDR it into the buffer got above. */
+ ret = rpcsvc_fill_callback (prognum, progver, procnum, payload, xid,
+ &request);
+ if (ret == -1) {
+ gf_log ("rpcsvc", GF_LOG_DEBUG, "cannot build a rpc-request "
+ "xid (%"PRIu64")", xid);
+ goto out;
+ }
+
+ recordhdr = rpcsvc_callback_build_header (record, pagesize, &request,
+ payload);
+
+ if (!recordhdr.iov_base) {
+ gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to build record "
+ " header");
+ iobuf_unref (request_iob);
+ request_iob = NULL;
+ recbuf->iov_base = NULL;
+ goto out;
+ }
+
+ recbuf->iov_base = recordhdr.iov_base;
+ recbuf->iov_len = recordhdr.iov_len;
+
+out:
+ return request_iob;
+}
+
+int
+rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans,
+ rpcsvc_cbk_program_t *prog, int procnum,
+ struct iovec *proghdr, int proghdrcount)
+{
+ struct iobuf *request_iob = NULL;
+ struct iovec rpchdr = {0,};
+ rpc_transport_req_t req;
+ int ret = -1;
+ int proglen = 0;
+ uint64_t callid = 0;
+
+ if (!rpc) {
+ goto out;
+ }
+
+ memset (&req, 0, sizeof (req));
+
+ callid = rpcsvc_get_callid (rpc);
+
+ if (proghdr) {
+ proglen += iov_length (proghdr, proghdrcount);
+ }
+
+ request_iob = rpcsvc_callback_build_record (rpc, prog->prognum,
+ prog->progver, procnum,
+ proglen, callid,
+ &rpchdr);
+ if (!request_iob) {
+ gf_log ("rpcsvc", GF_LOG_DEBUG,
+ "cannot build rpc-record");
+ goto out;
+ }
+
+ req.msg.rpchdr = &rpchdr;
+ req.msg.rpchdrcount = 1;
+ req.msg.proghdr = proghdr;
+ req.msg.proghdrcount = proghdrcount;
+
+ ret = rpc_transport_submit_request (trans, &req);
+ if (ret == -1) {
+ gf_log ("rpc-clnt", GF_LOG_DEBUG,
+ "transmission of rpc-request failed");
+ goto out;
+ }
+
+ ret = 0;
+
+out:
+ iobuf_unref (request_iob);
+
+ return ret;
+}
inline int
rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *hdrvec,
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index 10b20af0a88..fca7d047a7b 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -375,7 +375,11 @@ struct rpcsvc_program {
struct list_head program;
};
-
+typedef struct rpcsvc_cbk_program {
+ char *progname;
+ int prognum;
+ int progver;
+} rpcsvc_cbk_program_t;
/* All users of RPC services should use this API to register their
* procedure handlers.
*/
@@ -525,4 +529,9 @@ rpcsvc_combine_gen_spec_volume_checks (int gen, int spec);
extern char *
rpcsvc_volume_allowed (dict_t *options, char *volname);
+
+int rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans,
+ rpcsvc_cbk_program_t *prog, int procnum,
+ struct iovec *proghdr, int proghdrcount);
+
#endif
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 6d2d584d111..26b56fad577 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -1477,6 +1477,8 @@ __socket_proto_state_machine (rpc_transport_t *this,
ret = -1;
goto out;
}
+ if (priv->incoming.msg_type == REPLY)
+ (*pollin)->is_reply = 1;
priv->incoming.request_info = NULL;
}