summaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
authorRaghavendra G <raghavendra@gluster.com>2010-07-28 06:23:31 +0000
committerAnand V. Avati <avati@dev.gluster.com>2010-07-28 05:08:26 -0700
commit40d3ad15856c88d93d16264aa1f6bb55806aafde (patch)
tree1290d311c9001e3954176f005b89a2e438321bd9 /rpc
parentb8692a3c3cc8e0dab404664e0aeb6ebaea6ab6e5 (diff)
changes to rpc
- use mem-pool for requests and saved_frames. - preserve the rpc_req structure till rpc invokes program's reply. This will enable us to store transport specific data that has to last till reply has come (eg., memory regions of chunk lists in case of rdma). - change signature of rpc_clnt_submit to accept rsphdr_vector and rsppayload_vector. The buffers pointed to by these vectors will be from iobufs and these iobufs are added to an iobref which should also be passed as an argument to rpc_clnt_submit. Signed-off-by: Raghavendra G <raghavendra@gluster.com> Signed-off-by: Anand V. Avati <avati@dev.gluster.com> BUG: 875 (Implement a new protocol to provide proper backward/forward compatibility) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=875
Diffstat (limited to 'rpc')
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c219
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.h36
-rw-r--r--rpc/rpc-lib/src/rpc-transport.c96
-rw-r--r--rpc/rpc-lib/src/rpc-transport.h44
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c33
-rw-r--r--rpc/rpc-lib/src/rpcsvc.h20
-rw-r--r--rpc/rpc-lib/src/xdr-common.h8
-rw-r--r--rpc/rpc-transport/socket/src/socket.c129
-rw-r--r--rpc/rpc-transport/socket/src/socket.h4
9 files changed, 359 insertions, 230 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index 9b0bfe33d5b..fce3e8200fe 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -23,10 +23,16 @@
#include "config.h"
#endif
+#define RPC_CLNT_DEFAULT_REQUEST_COUNT 4096
+
#include "rpc-clnt.h"
#include "xdr-rpcclnt.h"
#include "rpc-transport.h"
#include "protocol-common.h"
+#include "mem-pool.h"
+
+void
+rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool);
uint64_t
rpc_clnt_new_callid (struct rpc_clnt *clnt)
@@ -63,28 +69,24 @@ __saved_frames_get_timedout (struct saved_frames *frames, uint32_t timeout,
struct saved_frame *
-__saved_frames_put (struct saved_frames *frames, void *frame, int32_t procnum,
- rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, int64_t callid)
+__saved_frames_put (struct saved_frames *frames, void *frame,
+ struct rpc_req *rpcreq)
{
struct saved_frame *saved_frame = NULL;
- saved_frame = GF_CALLOC (1, sizeof (*saved_frame),
- gf_common_mt_rpcclnt_savedframe_t);
+ saved_frame = mem_get (rpcreq->conn->rpc_clnt->saved_frames_pool);
if (!saved_frame) {
gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
goto out;
}
/* THIS should be saved and set back */
+ memset (saved_frame, 0, sizeof (*saved_frame));
INIT_LIST_HEAD (&saved_frame->list);
saved_frame->capital_this = THIS;
saved_frame->frame = frame;
- saved_frame->procnum = procnum;
- saved_frame->callid = callid;
- saved_frame->prog = prog;
- saved_frame->cbkfn = cbk;
-
+ saved_frame->rpcreq = rpcreq;
gettimeofday (&saved_frame->saved_at, NULL);
list_add_tail (&saved_frame->list, &frames->sf.list);
@@ -110,7 +112,12 @@ saved_frames_delete (struct saved_frame *saved_frame,
}
pthread_mutex_unlock (&conn->lock);
- GF_FREE (saved_frame);
+ if (saved_frame->rpcreq != NULL) {
+ rpc_clnt_reply_deinit (saved_frame->rpcreq,
+ conn->rpc_clnt->reqpool);
+ }
+
+ mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame);
out:
return;
}
@@ -129,7 +136,6 @@ call_bail (void *data)
struct tm frame_sent_tm;
char frame_sent[32] = {0,};
struct timeval timeout = {0,};
- struct rpc_req req;
struct iovec iov = {0,};
GF_VALIDATE_OR_GOTO ("client", data, out);
@@ -180,15 +186,19 @@ call_bail (void *data)
gf_log (conn->trans->name, GF_LOG_ERROR,
"bailing out frame type(%s) op(%s(%d)) sent = %s. "
"timeout = %d",
- trav->prog->progname, (trav->prog->procnames) ?
- trav->prog->procnames[trav->procnum] : "--",
- trav->procnum, frame_sent,
+ trav->rpcreq->prog->progname,
+ (trav->rpcreq->prog->procnames) ?
+ trav->rpcreq->prog->procnames[trav->rpcreq->procnum] :
+ "--",
+ trav->rpcreq->procnum, frame_sent,
conn->frame_timeout);
- trav->cbkfn (&req, &iov, 1, trav->frame);
+ trav->rpcreq->rpc_status = -1;
+ trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame);
+ rpc_clnt_reply_deinit (trav->rpcreq, clnt->reqpool);
list_del_init (&trav->list);
- GF_FREE (trav);
+ mem_put (conn->rpc_clnt->saved_frames_pool, trav);
}
out:
return;
@@ -197,8 +207,8 @@ out:
/* to be called with conn->lock held */
struct saved_frame *
-__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum,
- rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, uint64_t callid)
+__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame,
+ struct rpc_req *rpcreq)
{
rpc_clnt_connection_t *conn = NULL;
struct timeval timeout = {0, };
@@ -206,8 +216,8 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum,
conn = &rpc_clnt->conn;
- saved_frame = __saved_frames_put (conn->saved_frames, frame,
- procnum, prog, cbk, callid);
+ saved_frame = __saved_frames_put (conn->saved_frames, frame, rpcreq);
+
if (saved_frame == NULL) {
goto out;
}
@@ -258,7 +268,7 @@ __saved_frame_copy (struct saved_frames *frames, int64_t callid,
}
list_for_each_entry (tmp, &frames->sf.list, list) {
- if (tmp->callid == callid) {
+ if (tmp->rpcreq->xid == callid) {
*saved_frame = *tmp;
ret = 0;
break;
@@ -277,7 +287,7 @@ __saved_frame_get (struct saved_frames *frames, int64_t callid)
struct saved_frame *tmp = NULL;
list_for_each_entry (tmp, &frames->sf.list, list) {
- if (tmp->callid == callid) {
+ if (tmp->rpcreq->xid == callid) {
list_del_init (&tmp->list);
frames->count--;
saved_frame = tmp;
@@ -300,32 +310,34 @@ saved_frames_unwind (struct saved_frames *saved_frames)
struct tm *frame_sent_tm = NULL;
char timestr[256] = {0,};
- struct rpc_req req;
struct iovec iov = {0,};
- memset (&req, 0, sizeof (req));
-
- req.rpc_status = -1;
-
list_for_each_entry_safe (trav, tmp, &saved_frames->sf.list, list) {
frame_sent_tm = localtime (&trav->saved_at.tv_sec);
strftime (timestr, sizeof(timestr), "%Y-%m-%d %H:%M:%S",
frame_sent_tm);
- snprintf (timestr + strlen (timestr), sizeof(timestr) - strlen (timestr),
+ snprintf (timestr + strlen (timestr),
+ sizeof(timestr) - strlen (timestr),
".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec);
gf_log ("rpc-clnt", GF_LOG_ERROR,
- "forced unwinding frame type(%s) op(%s(%d)) called at %s",
- trav->prog->progname, (trav->prog->procnames) ?
- trav->prog->procnames[trav->procnum] : "--",
- trav->procnum, timestr);
+ "forced unwinding frame type(%s) op(%s(%d)) "
+ "called at %s",
+ trav->rpcreq->prog->progname,
+ (trav->rpcreq->prog->procnames) ?
+ trav->rpcreq->prog->procnames[trav->rpcreq->procnum]
+ : "--",
+ trav->rpcreq->procnum, timestr);
saved_frames->count--;
- trav->cbkfn (&req, &iov, 1, trav->frame);
+ trav->rpcreq->rpc_status = -1;
+ trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame);
+ rpc_clnt_reply_deinit (trav->rpcreq,
+ trav->rpcreq->conn->rpc_clnt->reqpool);
list_del_init (&trav->list);
- GF_FREE (trav);
+ mem_put (trav->rpcreq->conn->rpc_clnt->saved_frames_pool, trav);
}
}
@@ -407,9 +419,9 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info)
goto out;
}
- info->prognum = saved_frame.prog->prognum;
- info->procnum = saved_frame.procnum;
- info->progver = saved_frame.prog->progver;
+ info->prognum = saved_frame.rpcreq->prog->prognum;
+ info->procnum = saved_frame.rpcreq->procnum;
+ info->progver = saved_frame.rpcreq->prog->progver;
info->rsp = saved_frame.rsp;
ret = 0;
@@ -490,9 +502,10 @@ int
rpc_clnt_reply_fill (rpc_transport_pollin_t *msg,
rpc_clnt_connection_t *conn,
struct rpc_msg *replymsg, struct iovec progmsg,
- struct rpc_req *req, struct saved_frame *saved_frame)
+ struct rpc_req *req,
+ struct saved_frame *saved_frame)
{
- int ret = -1;
+ int ret = -1;
if ((!conn) || (!replymsg)|| (!req) || (!saved_frame) || (!msg)) {
goto out;
@@ -504,25 +517,14 @@ rpc_clnt_reply_fill (rpc_transport_pollin_t *msg,
req->rpc_status = -1;
}
- req->xid = rpc_reply_xid (replymsg);
- req->prog = saved_frame->prog;
- req->procnum = saved_frame->procnum;
- req->conn = conn;
-
req->rsp[0] = progmsg;
+ req->rsp_iobref = iobref_ref (msg->iobref);
if (msg->vectored) {
- req->rsp[1].iov_base = iobuf_ptr (msg->data.vector.iobuf2);
- req->rsp[1].iov_len = msg->data.vector.size2;
-
+ req->rsp[1] = msg->vector[1];
req->rspcnt = 2;
-
- req->rsp_prochdr = iobuf_ref (msg->data.vector.iobuf1);
- req->rsp_procpayload = iobuf_ref (msg->data.vector.iobuf2);
} else {
req->rspcnt = 1;
-
- req->rsp_prochdr = iobuf_ref (msg->data.simple.iobuf);
}
/* By this time, the data bytes for the auth scheme would have already
@@ -544,20 +546,17 @@ out:
void
-rpc_clnt_reply_deinit (struct rpc_req *req)
+rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool)
{
if (!req) {
goto out;
}
- if (req->rsp_prochdr) {
- iobuf_unref (req->rsp_prochdr);
- }
-
- if (req->rsp_procpayload) {
- iobuf_unref (req->rsp_procpayload);
+ if (req->rsp_iobref) {
+ iobref_unref (req->rsp_iobref);
}
+ mem_put (pool, req);
out:
return;
}
@@ -574,13 +573,8 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,
size_t msglen = 0;
int ret = -1;
- if (msg->vectored) {
- msgbuf = iobuf_ptr (msg->data.vector.iobuf1);
- msglen = msg->data.vector.size1;
- } else {
- msgbuf = iobuf_ptr (msg->data.simple.iobuf);
- msglen = msg->data.simple.size;
- }
+ msgbuf = msg->vector[0].iov_base;
+ msglen = msg->vector[0].iov_len;
ret = xdr_to_rpc_reply (msgbuf, msglen, &rpcmsg, &progmsg,
req->verf.authdata);
@@ -595,10 +589,11 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,
goto out;
}
- gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %"PRIx64", Program: %s,"
- " ProgVers: %d, Proc: %d", saved_frame->callid,
- saved_frame->prog->progname, saved_frame->prog->progver,
- saved_frame->procnum);
+ gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %d Program: %s,"
+ " ProgVers: %d, Proc: %d", saved_frame->rpcreq->xid,
+ saved_frame->rpcreq->prog->progname,
+ saved_frame->rpcreq->prog->progver,
+ saved_frame->rpcreq->procnum);
/* TODO: */
/* TODO: AUTH */
/* The verifier that is sent in a reply is a string that can be used as
@@ -639,8 +634,7 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
struct saved_frame *saved_frame = NULL;
rpc_request_info_t *request_info = NULL;
int ret = -1;
- struct rpc_req req = {0, };
- int cbk_ret = -1;
+ struct rpc_req *req = NULL;
conn = &clnt->conn;
@@ -657,26 +651,36 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
goto out;
}
- ret = rpc_clnt_reply_init (conn, pollin, &req, saved_frame);
+ req = saved_frame->rpcreq;
+ if (req == NULL) {
+ gf_log ("rpc-clnt", GF_LOG_CRITICAL,
+ "saved_frame for reply with xid (%d), "
+ "prog-version (%d), prog-num (%d), procnum (%d)"
+ "does not contain rpc-req", request_info->xid,
+ request_info->progver, request_info->prognum,
+ request_info->procnum);
+ goto out;
+ }
+
+ ret = rpc_clnt_reply_init (conn, pollin, req, saved_frame);
if (ret != 0) {
- req.rpc_status = -1;
+ req->rpc_status = -1;
gf_log ("rpc-clnt", GF_LOG_DEBUG, "initialising rpc reply "
"failed");
}
- cbk_ret = saved_frame->cbkfn (&req, req.rsp, req.rspcnt,
- saved_frame->frame);
-
- if (ret == 0) {
- rpc_clnt_reply_deinit (&req);
+ req->cbkfn (req, req->rsp, req->rspcnt, saved_frame->frame);
+
+ if (req) {
+ rpc_clnt_reply_deinit (req, conn->rpc_clnt->reqpool);
}
out:
if (saved_frame) {
- GF_FREE (saved_frame);
+ mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame);
}
- return cbk_ret;
+ return ret;
}
@@ -894,6 +898,25 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,
pthread_mutex_init (&rpc->lock, NULL);
rpc->ctx = ctx;
+ rpc->reqpool = mem_pool_new (struct rpc_req,
+ RPC_CLNT_DEFAULT_REQUEST_COUNT);
+ if (rpc->reqpool == NULL) {
+ pthread_mutex_destroy (&rpc->lock);
+ GF_FREE (rpc);
+ rpc = NULL;
+ goto out;
+ }
+
+ rpc->saved_frames_pool = mem_pool_new (struct saved_frame,
+ RPC_CLNT_DEFAULT_REQUEST_COUNT);
+ if (rpc->saved_frames_pool == NULL) {
+ pthread_mutex_destroy (&rpc->lock);
+ mem_pool_destroy (rpc->reqpool);
+ GF_FREE (rpc);
+ rpc = NULL;
+ goto out;
+ }
+
ret = rpc_clnt_connection_init (rpc, ctx, options, name);
if (ret == -1) {
pthread_mutex_destroy (&rpc->lock);
@@ -901,6 +924,7 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,
rpc = NULL;
goto out;
}
+
out:
return rpc;
}
@@ -1167,12 +1191,14 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
int procnum, fop_cbk_fn_t cbkfn,
struct iovec *proghdr, int proghdrcount,
struct iovec *progpayload, int progpayloadcount,
- struct iobref *iobref, void *frame)
+ struct iobref *iobref, void *frame, struct iovec *rsphdr,
+ int rsphdr_count, struct iovec *rsp_payload,
+ int rsp_payload_count, struct iobref *rsp_iobref)
{
rpc_clnt_connection_t *conn = NULL;
struct iobuf *request_iob = NULL;
struct iovec rpchdr = {0,};
- struct rpc_req rpcreq = {0,};
+ struct rpc_req *rpcreq = NULL;
rpc_transport_req_t req;
int ret = -1;
int proglen = 0;
@@ -1183,6 +1209,13 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
goto out;
}
+ rpcreq = mem_get (rpc->reqpool);
+ if (rpcreq == NULL) {
+ gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
+ goto out;
+ }
+
+ memset (rpcreq, 0, sizeof (*rpcreq));
memset (&req, 0, sizeof (req));
if (!iobref) {
@@ -1199,6 +1232,12 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
conn = &rpc->conn;
+ rpcreq->prog = prog;
+ rpcreq->procnum = procnum;
+ rpcreq->conn = conn;
+ rpcreq->xid = callid;
+ rpcreq->cbkfn = cbkfn;
+
pthread_mutex_lock (&conn->lock);
{
if (conn->connected == 0) {
@@ -1235,6 +1274,12 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
req.msg.progpayloadcount = progpayloadcount;
req.msg.iobref = iobref;
+ req.rsp.rsphdr = rsphdr;
+ req.rsp.rsphdr_count = rsphdr_count;
+ req.rsp.rsp_payload = rsp_payload;
+ req.rsp.rsp_payload_count = rsp_payload_count;
+ req.rsp.rsp_iobref = rsp_iobref;
+
ret = rpc_transport_submit_request (rpc->conn.trans,
&req);
if (ret == -1) {
@@ -1245,9 +1290,8 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
if ((ret >= 0) && frame) {
gettimeofday (&conn->last_sent, NULL);
/* Save the frame in queue */
- __save_frame (rpc, frame, procnum, prog, cbkfn, callid);
+ __save_frame (rpc, frame, rpcreq);
}
-
}
unlock:
pthread_mutex_unlock (&conn->lock);
@@ -1266,8 +1310,9 @@ out:
}
if (frame && (ret == -1)) {
- rpcreq.rpc_status = -1;
- cbkfn (&rpcreq, NULL, 0, frame);
+ rpcreq->rpc_status = -1;
+ cbkfn (rpcreq, NULL, 0, frame);
+ mem_put (rpc->reqpool, rpcreq);
}
return ret;
}
diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h
index efc256cd261..b9d39b3320a 100644
--- a/rpc/rpc-lib/src/rpc-clnt.h
+++ b/rpc/rpc-lib/src/rpc-clnt.h
@@ -58,10 +58,7 @@ struct saved_frame {
void *capital_this;
void *frame;
struct timeval saved_at;
- int32_t procnum;
- struct rpc_clnt_program *prog;
- fop_cbk_fn_t cbkfn;
- uint64_t callid;
+ struct rpc_req *rpcreq;
rpc_transport_rsp_t rsp;
};
@@ -116,14 +113,16 @@ struct rpc_req {
uint32_t xid;
struct iovec req[2];
int reqcnt;
+ struct iobref *req_iobref;
struct iovec rsp[2];
int rspcnt;
- struct iobuf *rsp_prochdr;
- struct iobuf *rsp_procpayload;
+ struct iobref *rsp_iobref;
int rpc_status;
rpc_auth_data_t verf;
rpc_clnt_prog_t *prog;
int procnum;
+ fop_cbk_fn_t cbkfn;
+ void *conn_private;
};
struct rpc_clnt {
@@ -132,6 +131,12 @@ struct rpc_clnt {
rpc_clnt_connection_t conn;
void *mydata;
uint64_t xid;
+
+ /* Memory pool for rpc_req_t */
+ struct mem_pool *reqpool;
+
+ struct mem_pool *saved_frames_pool;
+
glusterfs_ctx_t *ctx;
};
@@ -149,11 +154,28 @@ struct rpc_clnt * rpc_clnt_init (struct rpc_clnt_config *config,
int rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn,
void *mydata);
+/* Some preconditions related to vectors holding responses.
+ * @rsphdr: should contain pointer to buffer which can hold response header
+ * and length of the program header. In case of procedures whose
+ * response size is not bounded (eg., glusterfs lookup), the length
+ * should be equal to size of buffer.
+ * @rsp_payload: should contain pointer and length of the buffer which can hold the response payload.
+ *
+ * 1. Both @rsphdr and @rsp_payload are optional.
+ * 2. If the user of rpc_clnt_submit wants the response hdr and payload in its
+ * own buffers, it has to populate @rsphdr and @rsp_payload.
+ * 3. when @rsp_payload is not NULL, @rsphdr should
+ * also be filled with pointer to buffer to hold header and length
+ * of the header.
+ */
+
int rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
int procnum, fop_cbk_fn_t cbkfn,
struct iovec *proghdr, int proghdrcount,
struct iovec *progpayload, int progpayloadcount,
- struct iobref *iobref, void *frame);
+ struct iobref *iobref, void *frame, struct iovec *rsphdr,
+ int rsphdr_count, struct iovec *rsp_payload,
+ int rsp_payload_count, struct iobref *rsp_iobref);
void rpc_clnt_destroy (struct rpc_clnt *rpc);
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index b77ea2aa553..50379c14950 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -628,20 +628,10 @@ rpc_transport_pollin_destroy (rpc_transport_pollin_t *pollin)
goto out;
}
- if (pollin->vectored) {
- if (pollin->data.vector.iobuf1) {
- iobuf_unref (pollin->data.vector.iobuf1);
- }
-
- if (pollin->data.vector.iobuf2) {
- iobuf_unref (pollin->data.vector.iobuf2);
- }
- } else {
- if (pollin->data.simple.iobuf) {
- iobuf_unref (pollin->data.simple.iobuf);
- }
+ if (pollin->iobref) {
+ iobref_unref (pollin->iobref);
}
-
+
if (pollin->private) {
/* */
GF_FREE (pollin->private);
@@ -654,9 +644,8 @@ out:
rpc_transport_pollin_t *
-rpc_transport_pollin_alloc (rpc_transport_t *this, struct iobuf *iobuf,
- size_t size, struct iobuf *vectored_buf,
- size_t vectored_size, void *private)
+rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector,
+ int count, struct iobref *iobref, void *private)
{
rpc_transport_pollin_t *msg = NULL;
msg = GF_CALLOC (1, sizeof (*msg), gf_common_mt_rpc_trans_pollin_t);
@@ -665,19 +654,15 @@ rpc_transport_pollin_alloc (rpc_transport_t *this, struct iobuf *iobuf,
goto out;
}
- if (vectored_buf) {
+ if (count == 2) {
msg->vectored = 1;
- msg->data.vector.iobuf1 = iobuf_ref (iobuf);
- msg->data.vector.size1 = size;
-
- msg->data.vector.iobuf2 = iobuf_ref (vectored_buf);
- msg->data.vector.size2 = vectored_size;
- } else {
- msg->data.simple.iobuf = iobuf_ref (iobuf);
- msg->data.simple.size = size;
}
+ memcpy (msg->vector, vector, count * sizeof (*vector));
+ msg->count = count;
+ msg->iobref = iobref_ref (iobref);
msg->private = private;
+
out:
return msg;
}
@@ -698,6 +683,7 @@ rpc_transport_same_process_pollin_alloc (rpc_transport_t *this,
int progpayloadlen = 0;
char vectored = 0;
char *hdr = NULL, *progpayloadbuf = NULL;
+ struct iobuf *iobuf = NULL;
if (!rpchdr || !proghdr) {
goto err;
@@ -729,47 +715,72 @@ rpc_transport_same_process_pollin_alloc (rpc_transport_t *this,
}
if (vectored) {
- msg->data.vector.iobuf1 = iobuf_get (this->ctx->iobuf_pool);
- if (!msg->data.vector.iobuf1) {
+ msg->iobref = iobref_new ();
+ if (!msg->iobref) {
+ gf_log ("rpc-transport", GF_LOG_ERROR,
+ "out of memory");
+ goto err;
+ }
+
+ iobuf = iobuf_get (this->ctx->iobuf_pool);
+ if (!iobuf) {
gf_log ("rpc_transport", GF_LOG_ERROR,
"out of memory");
goto err;
}
- msg->data.vector.size1 = rpchdrlen + proghdrlen;
- hdr = iobuf_ptr (msg->data.vector.iobuf1);
+ iobref_add (msg->iobref, iobuf);
+ iobuf_unref (iobuf);
+
+ msg->vector[0].iov_len = rpchdrlen + proghdrlen;
+ msg->vector[0].iov_base = hdr = iobuf_ptr (iobuf);
if (!is_request && rsp) {
- msg->data.vector.iobuf2 = rsp->rspbuf;
- progpayloadbuf = rsp->rspvec->iov_base;
+ msg->vector[1] = rsp->rsp_payload[0];
+ progpayloadbuf = rsp->rsp_payload[0].iov_base;
} else {
- msg->data.vector.iobuf2 = iobuf_get (this->ctx->iobuf_pool);
- if (!msg->data.vector.iobuf2) {
+ iobuf = iobuf_get (this->ctx->iobuf_pool);
+ if (!iobuf) {
gf_log ("rpc_transport", GF_LOG_ERROR,
"out of memory");
goto err;
}
- progpayloadbuf = iobuf_ptr (msg->data.vector.iobuf2);
+ iobref_add (msg->iobref, iobuf);
+ iobuf_unref (iobuf);
+
+ msg->vector[1].iov_base
+ = progpayloadbuf = iobuf_ptr (iobuf);
}
- msg->data.vector.size2 = progpayloadlen;
+ msg->vector[1].iov_len = progpayloadlen;
} else {
if (!is_request && rsp) {
/* FIXME: Assuming rspvec contains only one vector */
- hdr = rsp->rspvec->iov_base;
- msg->data.simple.iobuf = rsp->rspbuf;
+ hdr = rsp->rsphdr[0].iov_base;
+ msg->vector[0] = rsp->rsphdr[0];
} else {
- msg->data.simple.iobuf = iobuf_get (this->ctx->iobuf_pool);
- if (!msg->data.simple.iobuf) {
+ msg->iobref = iobref_new ();
+ if (!msg->iobref) {
+ gf_log ("rpc-transport", GF_LOG_ERROR,
+ "out of memory");
+ goto err;
+ }
+
+ iobuf = iobuf_get (this->ctx->iobuf_pool);
+ if (!iobuf) {
gf_log ("rpc_transport", GF_LOG_ERROR,
"out of memory");
goto err;
}
- hdr = iobuf_ptr (msg->data.simple.iobuf);
+ iobref_add (msg->iobref, iobuf);
+ iobuf_unref (iobuf);
+
+ hdr = iobuf_ptr (iobuf);
+ msg->vector[0].iov_base = hdr;
}
- msg->data.simple.size = rpchdrlen + proghdrlen;
+ msg->vector[0].iov_len = rpchdrlen + proghdrlen;
}
iov_unload (hdr, rpchdr, rpchdrcount);
@@ -1253,7 +1264,8 @@ rpc_transport_peerproc (void *trans_data)
}
pthread_mutex_unlock (&trans->handover.mutex);
- rpc_transport_notify (trans, RPC_TRANSPORT_MSG_RECEIVED, msg->pollin);
+ rpc_transport_notify (trans, RPC_TRANSPORT_MSG_RECEIVED,
+ msg->pollin);
rpc_transport_handover_destroy (msg);
}
}
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index b3c7985e095..5698b287cce 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -30,6 +30,10 @@
#include <rpc/auth.h>
#include <rpc/rpc_msg.h>
+#ifndef MAX_IOVEC
+#define MAX_IOVEC 16
+#endif
+
/* Given the 4-byte fragment header, returns non-zero if this fragment
* is the last fragment for the RPC record being assemebled.
* RPC Record marking standard defines a 32 bit value as the fragment
@@ -108,12 +112,11 @@ struct rpc_transport_msg {
typedef struct rpc_transport_msg rpc_transport_msg_t;
struct rpc_transport_rsp {
- /* as of now, the entire rsp payload is read into rspbuf and hence
- * rspcount is always set to one.
- */
- struct iovec *rspvec;
- int rspcount;
- struct iobuf *rspbuf;
+ struct iovec *rsphdr;
+ int rsphdr_count;
+ struct iovec *rsp_payload;
+ int rsp_payload_count;
+ struct iobref *rsp_iobref;
};
typedef struct rpc_transport_rsp rpc_transport_rsp_t;
@@ -129,6 +132,15 @@ struct rpc_transport_reply {
};
typedef struct rpc_transport_reply rpc_transport_reply_t;
+struct rpc_transport_data {
+ char is_request;
+ union {
+ rpc_transport_req_t req;
+ rpc_transport_reply_t reply;
+ } data;
+};
+typedef struct rpc_transport_data rpc_transport_data_t;
+
struct rpc_request_info {
uint32_t xid;
int prognum;
@@ -140,20 +152,11 @@ typedef struct rpc_request_info rpc_request_info_t;
struct rpc_transport_pollin {
- union {
- struct vectored {
- struct iobuf *iobuf1;
- size_t size1;
- struct iobuf *iobuf2;
- size_t size2;
- } vector;
- struct simple {
- struct iobuf *iobuf;
- size_t size;
- } simple;
- } data;
+ struct iovec vector[2];
+ int count;
char vectored;
void *private;
+ struct iobref *iobref;
};
typedef struct rpc_transport_pollin rpc_transport_pollin_t;
@@ -279,9 +282,8 @@ rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen,
struct sockaddr *sa, size_t salen);
rpc_transport_pollin_t *
-rpc_transport_pollin_alloc (rpc_transport_t *this, struct iobuf *iobuf,
- size_t iobuf_size, struct iobuf *vectoriob,
- size_t vectoriob_size, void *private);
+rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector,
+ int count, struct iobref *iobref, void *private);
void
rpc_transport_pollin_destroy (rpc_transport_pollin_t *pollin);
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 9bb7e0e4c9e..5bb908cec0a 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -1021,12 +1021,8 @@ rpcsvc_request_destroy (rpcsvc_conn_t *conn, rpcsvc_request_t *req)
goto out;
}
- if (req->recordiob) {
- iobuf_unref (req->recordiob);
- }
-
- if (req->vectorediob) {
- iobuf_unref (req->vectorediob);
+ if (req->iobref) {
+ iobref_unref (req->iobref);
}
mem_put (conn->rxpool, req);
@@ -1050,15 +1046,11 @@ rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg,
req->progver = rpc_call_progver (callmsg);
req->procnum = rpc_call_progproc (callmsg);
req->conn = conn;
+ req->count = msg->count;
req->msg[0] = progmsg;
+ req->iobref = iobref_ref (msg->iobref);
if (msg->vectored) {
- req->msg[1].iov_base = iobuf_ptr (msg->data.vector.iobuf2);
- req->msg[1].iov_len = msg->data.vector.size2;
-
- req->recordiob = iobuf_ref (msg->data.vector.iobuf1);
- req->vectorediob = iobuf_ref (msg->data.vector.iobuf2);
- } else {
- req->recordiob = iobuf_ref (msg->data.simple.iobuf);
+ req->msg[1] = msg->vector[1];
}
req->trans_private = msg->private;
@@ -1106,13 +1098,8 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)
goto err;
}
- if (msg->vectored) {
- msgbuf = iobuf_ptr (msg->data.vector.iobuf1);
- msglen = msg->data.vector.size1;
- } else {
- msgbuf = iobuf_ptr (msg->data.simple.iobuf);
- msglen = msg->data.simple.size;
- }
+ msgbuf = msg->vector[0].iov_base;
+ msglen = msg->vector[0].iov_len;
ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg,
req->cred.authdata,req->verf.authdata);
@@ -1190,11 +1177,11 @@ rpcsvc_handle_rpc_call (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)
goto err_reply;
if (actor && (req->rpc_err == SUCCESS)) {
- if (req->vectorediob) {
+ if (req->count == 2) {
if (actor->vector_actor) {
rpcsvc_conn_ref (conn);
- ret = actor->vector_actor (req,
- req->vectorediob);
+ ret = actor->vector_actor (req, &req->msg[1], 1,
+ req->iobref);
} else {
rpcsvc_request_seterr (req, PROC_UNAVAIL);
gf_log (GF_RPCSVC, GF_LOG_ERROR,
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index 10dc32698ad..5a3f3cd3449 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -237,21 +237,9 @@ struct rpcsvc_request {
* be de-xdred by the actor.
*/
struct iovec msg[2];
+ int count;
- /* The full message buffer allocated to store the RPC headers.
- * This buffer is ref'd when allocated why RPC svc and unref'd after
- * the buffer is handed to the actor. That means if the actor or any
- * higher layer wants to keep this buffer around, they too must ref it
- * right after entering the program actor.
- */
- struct iobuf *recordiob;
-
- /* iobuf to hold payload of calls like write. By storing large payloads
- * starting from page-aligned addresses, performance increases while
- * accessing the payload
- */
- struct iobuf *vectorediob;
-
+ struct iobref *iobref;
/* Status of the RPC call, whether it was accepted or denied. */
int rpc_status;
@@ -317,7 +305,6 @@ struct rpcsvc_request {
#define rpcsvc_request_private(req) ((req)->private)
#define rpcsvc_request_xid(req) ((req)->xid)
#define rpcsvc_request_set_private(req,prv) (req)->private = (void *)(prv)
-#define rpcsvc_request_record_iob(rq) ((rq)->recordiob)
#define rpcsvc_request_record_ref(req) (iobuf_ref ((req)->recordiob))
#define rpcsvc_request_record_unref(req) (iobuf_unref ((req)->recordiob))
@@ -338,7 +325,8 @@ struct rpcsvc_request {
*
*/
typedef int (*rpcsvc_actor) (rpcsvc_request_t *req);
-typedef int (*rpcsvc_vector_actor) (rpcsvc_request_t *req, struct iobuf *iob);
+typedef int (*rpcsvc_vector_actor) (rpcsvc_request_t *req, struct iovec *vec,
+ int count, struct iobref *iobref);
typedef int (*rpcsvc_vector_sizer) (rpcsvc_request_t *req, ssize_t *readsize,
int *newiob);
diff --git a/rpc/rpc-lib/src/xdr-common.h b/rpc/rpc-lib/src/xdr-common.h
index 7ba1372529c..b3ce29e5dbe 100644
--- a/rpc/rpc-lib/src/xdr-common.h
+++ b/rpc/rpc-lib/src/xdr-common.h
@@ -50,12 +50,12 @@ struct auth_glusterfs_parms {
u_int gid;
u_int ngrps;
u_int groups[16];
-};
+} __attribute__((packed));
typedef struct auth_glusterfs_parms auth_glusterfs_parms;
struct gf_dump_req {
u_quad_t gfs_id;
-};
+} __attribute__((packed));
typedef struct gf_dump_req gf_dump_req;
struct gf_prog_detail {
@@ -63,7 +63,7 @@ struct gf_prog_detail {
u_quad_t prognum;
u_quad_t progver;
struct gf_prog_detail *next;
-};
+} __attribute__((packed));
typedef struct gf_prog_detail gf_prog_detail;
struct gf_dump_rsp {
@@ -71,7 +71,7 @@ struct gf_dump_rsp {
int op_ret;
int op_errno;
struct gf_prog_detail *prog;
-};
+}__attribute__((packed));
typedef struct gf_dump_rsp gf_dump_rsp;
extern bool_t
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 4396454a33a..baeee735bb8 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -370,12 +370,13 @@ __socket_reset (rpc_transport_t *this)
/* TODO: use mem-pool on incoming data */
- if (priv->incoming.iobuf) {
- iobuf_unref (priv->incoming.iobuf);
+ if (priv->incoming.iobref) {
+ iobref_unref (priv->incoming.iobref);
+ priv->incoming.iobref = NULL;
}
- if (priv->incoming.vectoriob) {
- iobuf_unref (priv->incoming.vectoriob);
+ if (priv->incoming.iobuf) {
+ iobuf_unref (priv->incoming.iobuf);
}
memset (&priv->incoming, 0, sizeof (priv->incoming));
@@ -699,7 +700,7 @@ __socket_read_vectored_request (rpc_transport_t *this)
/* fall through */
case SP_STATE_READ_VERFBYTES:
- if (priv->incoming.vectoriob == NULL) {
+ if (priv->incoming.payload_vector.iov_base == NULL) {
iobuf = iobuf_get (this->ctx->iobuf_pool);
if (!iobuf) {
gf_log (this->name, GF_LOG_ERROR,
@@ -710,7 +711,23 @@ __socket_read_vectored_request (rpc_transport_t *this)
break;
}
- priv->incoming.vectoriob = iobuf;
+ if (priv->incoming.iobref == NULL) {
+ priv->incoming.iobref = iobref_new ();
+ if (priv->incoming.iobref == NULL) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "out of memory");
+ ret = -1;
+ iobuf_unref (iobuf);
+ break;
+ }
+ }
+
+ iobref_add (priv->incoming.iobref, iobuf);
+ iobuf_unref (iobuf);
+
+ priv->incoming.payload_vector.iov_base
+ = iobuf_ptr (iobuf);
+
priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf);
}
@@ -735,9 +752,10 @@ __socket_read_vectored_request (rpc_transport_t *this)
&& RPC_LASTFRAG (priv->incoming.fraghdr))) {
priv->incoming.frag.call_body.request.vector_state
= SP_STATE_VECTORED_REQUEST_INIT;
- priv->incoming.vectoriob_size
+ priv->incoming.payload_vector.iov_len
= (unsigned long)priv->incoming.frag.fragcurrent
- - (unsigned long)iobuf_ptr (priv->incoming.vectoriob);
+ - (unsigned long)
+ priv->incoming.payload_vector.iov_base;
}
break;
}
@@ -846,18 +864,31 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
/* fall through */
case SP_STATE_READ_PROC_HEADER:
- if (priv->incoming.vectoriob == NULL) {
+ if (priv->incoming.payload_vector.iov_base == NULL) {
iobuf = iobuf_get (this->ctx->iobuf_pool);
if (iobuf == NULL) {
ret = -1;
goto out;
}
- priv->incoming.vectoriob = iobuf;
+ if (priv->incoming.iobref == NULL) {
+ priv->incoming.iobref = iobref_new ();
+ if (priv->incoming.iobref == NULL) {
+ ret = -1;
+ iobuf_unref (iobuf);
+ goto out;
+ }
+ }
+
+ iobref_add (priv->incoming.iobref, iobuf);
+ iobuf_unref (iobuf);
+
+ priv->incoming.payload_vector.iov_base
+ = iobuf_ptr (iobuf);
}
priv->incoming.frag.fragcurrent
- = iobuf_ptr (priv->incoming.vectoriob);
+ = priv->incoming.payload_vector.iov_base;
/* now read the entire remaining msg into new iobuf */
ret = __socket_read_simple_msg (this);
@@ -1080,9 +1111,11 @@ __socket_read_reply (rpc_transport_t *this)
if ((request_info->prognum == GLUSTER3_1_FOP_PROGRAM)
&& (request_info->procnum == GF_FOP_READ)) {
- if (request_info->rsp.rspbuf != NULL) {
- priv->incoming.vectoriob
- = iobuf_ref (request_info->rsp.rspbuf);
+ if (request_info->rsp.rsp_payload_count != 0) {
+ priv->incoming.iobref
+ = iobref_ref (request_info->rsp.rsp_iobref);
+ priv->incoming.payload_vector
+ = *request_info->rsp.rsp_payload;
}
ret = __socket_read_vectored_reply (this);
@@ -1154,15 +1187,19 @@ __socket_read_frag (rpc_transport_t *this)
inline
void __socket_reset_priv (socket_private_t *priv)
{
+ if (priv->incoming.iobref) {
+ iobref_unref (priv->incoming.iobref);
+ priv->incoming.iobref = NULL;
+ }
+
if (priv->incoming.iobuf) {
iobuf_unref (priv->incoming.iobuf);
- priv->incoming.iobuf = NULL;
}
- if (priv->incoming.vectoriob) {
- iobuf_unref (priv->incoming.vectoriob);
- priv->incoming.vectoriob = NULL;
- }
+ memset (&priv->incoming.payload_vector, 0,
+ sizeof (priv->incoming.payload_vector));
+
+ priv->incoming.iobuf = NULL;
}
@@ -1170,9 +1207,11 @@ int
__socket_proto_state_machine (rpc_transport_t *this,
rpc_transport_pollin_t **pollin)
{
- int ret = -1;
- socket_private_t *priv = NULL;
- struct iobuf *iobuf = NULL;
+ int ret = -1;
+ socket_private_t *priv = NULL;
+ struct iobuf *iobuf = NULL;
+ struct iobref *iobref = NULL;
+ struct iovec vector[2];
priv = this->private;
while (priv->incoming.record_state != SP_STATE_COMPLETE) {
@@ -1191,7 +1230,7 @@ __socket_proto_state_machine (rpc_transport_t *this,
priv->incoming.iobuf = iobuf;
priv->incoming.iobuf_size = 0;
- priv->incoming.vectoriob_size = 0;
+ priv->incoming.payload_vector.iov_len = 0;
priv->incoming.pending_vector = priv->incoming.vector;
priv->incoming.pending_vector->iov_base =
@@ -1260,15 +1299,49 @@ __socket_proto_state_machine (rpc_transport_t *this,
* upper layers.
*/
if (pollin != NULL) {
+ int count = 0;
priv->incoming.iobuf_size
= priv->incoming.total_bytes_read
- - priv->incoming.vectoriob_size;
+ - priv->incoming.payload_vector.iov_len;
+
+ memset (vector, 0, sizeof (vector));
+
+ if (priv->incoming.iobref == NULL) {
+ priv->incoming.iobref = iobref_new ();
+ if (priv->incoming.iobref == NULL) {
+ gf_log (this->name,
+ GF_LOG_ERROR,
+ "out of memory");
+ ret = -1;
+ goto out;
+ }
+ }
+
+ vector[count].iov_base
+ = iobuf_ptr (priv->incoming.iobuf);
+ vector[count].iov_len
+ = priv->incoming.iobuf_size;
+
+ iobref = priv->incoming.iobref;
+
+ iobref_add (iobref,
+ priv->incoming.iobuf);
+ iobuf_unref (priv->incoming.iobuf);
+ priv->incoming.iobuf = NULL;
+
+ count++;
+
+ if (priv->incoming.payload_vector.iov_base
+ != NULL) {
+ vector[count]
+ = priv->incoming.payload_vector;
+ count++;
+ }
*pollin = rpc_transport_pollin_alloc (this,
- priv->incoming.iobuf,
- priv->incoming.iobuf_size,
- priv->incoming.vectoriob,
- priv->incoming.vectoriob_size,
+ vector,
+ count,
+ iobref,
priv->incoming.request_info);
if (*pollin == NULL) {
ret = -1;
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index aa31ee2a7ef..5078b161e29 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -170,8 +170,8 @@ typedef struct {
size_t iobuf_size;
struct iovec vector[2];
int count;
- struct iobuf *vectoriob;
- size_t vectoriob_size;
+ struct iovec payload_vector;
+ struct iobref *iobref;
rpc_request_info_t *request_info;
struct iovec *pending_vector;
int pending_count;