summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib/src/rpc-clnt.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c219
1 files changed, 132 insertions, 87 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index 9b0bfe33d5b..fce3e8200fe 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -23,10 +23,16 @@
#include "config.h"
#endif
+#define RPC_CLNT_DEFAULT_REQUEST_COUNT 4096
+
#include "rpc-clnt.h"
#include "xdr-rpcclnt.h"
#include "rpc-transport.h"
#include "protocol-common.h"
+#include "mem-pool.h"
+
+void
+rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool);
uint64_t
rpc_clnt_new_callid (struct rpc_clnt *clnt)
@@ -63,28 +69,24 @@ __saved_frames_get_timedout (struct saved_frames *frames, uint32_t timeout,
struct saved_frame *
-__saved_frames_put (struct saved_frames *frames, void *frame, int32_t procnum,
- rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, int64_t callid)
+__saved_frames_put (struct saved_frames *frames, void *frame,
+ struct rpc_req *rpcreq)
{
struct saved_frame *saved_frame = NULL;
- saved_frame = GF_CALLOC (1, sizeof (*saved_frame),
- gf_common_mt_rpcclnt_savedframe_t);
+ saved_frame = mem_get (rpcreq->conn->rpc_clnt->saved_frames_pool);
if (!saved_frame) {
gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
goto out;
}
/* THIS should be saved and set back */
+ memset (saved_frame, 0, sizeof (*saved_frame));
INIT_LIST_HEAD (&saved_frame->list);
saved_frame->capital_this = THIS;
saved_frame->frame = frame;
- saved_frame->procnum = procnum;
- saved_frame->callid = callid;
- saved_frame->prog = prog;
- saved_frame->cbkfn = cbk;
-
+ saved_frame->rpcreq = rpcreq;
gettimeofday (&saved_frame->saved_at, NULL);
list_add_tail (&saved_frame->list, &frames->sf.list);
@@ -110,7 +112,12 @@ saved_frames_delete (struct saved_frame *saved_frame,
}
pthread_mutex_unlock (&conn->lock);
- GF_FREE (saved_frame);
+ if (saved_frame->rpcreq != NULL) {
+ rpc_clnt_reply_deinit (saved_frame->rpcreq,
+ conn->rpc_clnt->reqpool);
+ }
+
+ mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame);
out:
return;
}
@@ -129,7 +136,6 @@ call_bail (void *data)
struct tm frame_sent_tm;
char frame_sent[32] = {0,};
struct timeval timeout = {0,};
- struct rpc_req req;
struct iovec iov = {0,};
GF_VALIDATE_OR_GOTO ("client", data, out);
@@ -180,15 +186,19 @@ call_bail (void *data)
gf_log (conn->trans->name, GF_LOG_ERROR,
"bailing out frame type(%s) op(%s(%d)) sent = %s. "
"timeout = %d",
- trav->prog->progname, (trav->prog->procnames) ?
- trav->prog->procnames[trav->procnum] : "--",
- trav->procnum, frame_sent,
+ trav->rpcreq->prog->progname,
+ (trav->rpcreq->prog->procnames) ?
+ trav->rpcreq->prog->procnames[trav->rpcreq->procnum] :
+ "--",
+ trav->rpcreq->procnum, frame_sent,
conn->frame_timeout);
- trav->cbkfn (&req, &iov, 1, trav->frame);
+ trav->rpcreq->rpc_status = -1;
+ trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame);
+ rpc_clnt_reply_deinit (trav->rpcreq, clnt->reqpool);
list_del_init (&trav->list);
- GF_FREE (trav);
+ mem_put (conn->rpc_clnt->saved_frames_pool, trav);
}
out:
return;
@@ -197,8 +207,8 @@ out:
/* to be called with conn->lock held */
struct saved_frame *
-__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum,
- rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, uint64_t callid)
+__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame,
+ struct rpc_req *rpcreq)
{
rpc_clnt_connection_t *conn = NULL;
struct timeval timeout = {0, };
@@ -206,8 +216,8 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum,
conn = &rpc_clnt->conn;
- saved_frame = __saved_frames_put (conn->saved_frames, frame,
- procnum, prog, cbk, callid);
+ saved_frame = __saved_frames_put (conn->saved_frames, frame, rpcreq);
+
if (saved_frame == NULL) {
goto out;
}
@@ -258,7 +268,7 @@ __saved_frame_copy (struct saved_frames *frames, int64_t callid,
}
list_for_each_entry (tmp, &frames->sf.list, list) {
- if (tmp->callid == callid) {
+ if (tmp->rpcreq->xid == callid) {
*saved_frame = *tmp;
ret = 0;
break;
@@ -277,7 +287,7 @@ __saved_frame_get (struct saved_frames *frames, int64_t callid)
struct saved_frame *tmp = NULL;
list_for_each_entry (tmp, &frames->sf.list, list) {
- if (tmp->callid == callid) {
+ if (tmp->rpcreq->xid == callid) {
list_del_init (&tmp->list);
frames->count--;
saved_frame = tmp;
@@ -300,32 +310,34 @@ saved_frames_unwind (struct saved_frames *saved_frames)
struct tm *frame_sent_tm = NULL;
char timestr[256] = {0,};
- struct rpc_req req;
struct iovec iov = {0,};
- memset (&req, 0, sizeof (req));
-
- req.rpc_status = -1;
-
list_for_each_entry_safe (trav, tmp, &saved_frames->sf.list, list) {
frame_sent_tm = localtime (&trav->saved_at.tv_sec);
strftime (timestr, sizeof(timestr), "%Y-%m-%d %H:%M:%S",
frame_sent_tm);
- snprintf (timestr + strlen (timestr), sizeof(timestr) - strlen (timestr),
+ snprintf (timestr + strlen (timestr),
+ sizeof(timestr) - strlen (timestr),
".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec);
gf_log ("rpc-clnt", GF_LOG_ERROR,
- "forced unwinding frame type(%s) op(%s(%d)) called at %s",
- trav->prog->progname, (trav->prog->procnames) ?
- trav->prog->procnames[trav->procnum] : "--",
- trav->procnum, timestr);
+ "forced unwinding frame type(%s) op(%s(%d)) "
+ "called at %s",
+ trav->rpcreq->prog->progname,
+ (trav->rpcreq->prog->procnames) ?
+ trav->rpcreq->prog->procnames[trav->rpcreq->procnum]
+ : "--",
+ trav->rpcreq->procnum, timestr);
saved_frames->count--;
- trav->cbkfn (&req, &iov, 1, trav->frame);
+ trav->rpcreq->rpc_status = -1;
+ trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame);
+ rpc_clnt_reply_deinit (trav->rpcreq,
+ trav->rpcreq->conn->rpc_clnt->reqpool);
list_del_init (&trav->list);
- GF_FREE (trav);
+ mem_put (trav->rpcreq->conn->rpc_clnt->saved_frames_pool, trav);
}
}
@@ -407,9 +419,9 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info)
goto out;
}
- info->prognum = saved_frame.prog->prognum;
- info->procnum = saved_frame.procnum;
- info->progver = saved_frame.prog->progver;
+ info->prognum = saved_frame.rpcreq->prog->prognum;
+ info->procnum = saved_frame.rpcreq->procnum;
+ info->progver = saved_frame.rpcreq->prog->progver;
info->rsp = saved_frame.rsp;
ret = 0;
@@ -490,9 +502,10 @@ int
rpc_clnt_reply_fill (rpc_transport_pollin_t *msg,
rpc_clnt_connection_t *conn,
struct rpc_msg *replymsg, struct iovec progmsg,
- struct rpc_req *req, struct saved_frame *saved_frame)
+ struct rpc_req *req,
+ struct saved_frame *saved_frame)
{
- int ret = -1;
+ int ret = -1;
if ((!conn) || (!replymsg)|| (!req) || (!saved_frame) || (!msg)) {
goto out;
@@ -504,25 +517,14 @@ rpc_clnt_reply_fill (rpc_transport_pollin_t *msg,
req->rpc_status = -1;
}
- req->xid = rpc_reply_xid (replymsg);
- req->prog = saved_frame->prog;
- req->procnum = saved_frame->procnum;
- req->conn = conn;
-
req->rsp[0] = progmsg;
+ req->rsp_iobref = iobref_ref (msg->iobref);
if (msg->vectored) {
- req->rsp[1].iov_base = iobuf_ptr (msg->data.vector.iobuf2);
- req->rsp[1].iov_len = msg->data.vector.size2;
-
+ req->rsp[1] = msg->vector[1];
req->rspcnt = 2;
-
- req->rsp_prochdr = iobuf_ref (msg->data.vector.iobuf1);
- req->rsp_procpayload = iobuf_ref (msg->data.vector.iobuf2);
} else {
req->rspcnt = 1;
-
- req->rsp_prochdr = iobuf_ref (msg->data.simple.iobuf);
}
/* By this time, the data bytes for the auth scheme would have already
@@ -544,20 +546,17 @@ out:
void
-rpc_clnt_reply_deinit (struct rpc_req *req)
+rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool)
{
if (!req) {
goto out;
}
- if (req->rsp_prochdr) {
- iobuf_unref (req->rsp_prochdr);
- }
-
- if (req->rsp_procpayload) {
- iobuf_unref (req->rsp_procpayload);
+ if (req->rsp_iobref) {
+ iobref_unref (req->rsp_iobref);
}
+ mem_put (pool, req);
out:
return;
}
@@ -574,13 +573,8 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,
size_t msglen = 0;
int ret = -1;
- if (msg->vectored) {
- msgbuf = iobuf_ptr (msg->data.vector.iobuf1);
- msglen = msg->data.vector.size1;
- } else {
- msgbuf = iobuf_ptr (msg->data.simple.iobuf);
- msglen = msg->data.simple.size;
- }
+ msgbuf = msg->vector[0].iov_base;
+ msglen = msg->vector[0].iov_len;
ret = xdr_to_rpc_reply (msgbuf, msglen, &rpcmsg, &progmsg,
req->verf.authdata);
@@ -595,10 +589,11 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,
goto out;
}
- gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %"PRIx64", Program: %s,"
- " ProgVers: %d, Proc: %d", saved_frame->callid,
- saved_frame->prog->progname, saved_frame->prog->progver,
- saved_frame->procnum);
+ gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %d Program: %s,"
+ " ProgVers: %d, Proc: %d", saved_frame->rpcreq->xid,
+ saved_frame->rpcreq->prog->progname,
+ saved_frame->rpcreq->prog->progver,
+ saved_frame->rpcreq->procnum);
/* TODO: */
/* TODO: AUTH */
/* The verifier that is sent in a reply is a string that can be used as
@@ -639,8 +634,7 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
struct saved_frame *saved_frame = NULL;
rpc_request_info_t *request_info = NULL;
int ret = -1;
- struct rpc_req req = {0, };
- int cbk_ret = -1;
+ struct rpc_req *req = NULL;
conn = &clnt->conn;
@@ -657,26 +651,36 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
goto out;
}
- ret = rpc_clnt_reply_init (conn, pollin, &req, saved_frame);
+ req = saved_frame->rpcreq;
+ if (req == NULL) {
+ gf_log ("rpc-clnt", GF_LOG_CRITICAL,
+ "saved_frame for reply with xid (%d), "
+ "prog-version (%d), prog-num (%d), procnum (%d)"
+ "does not contain rpc-req", request_info->xid,
+ request_info->progver, request_info->prognum,
+ request_info->procnum);
+ goto out;
+ }
+
+ ret = rpc_clnt_reply_init (conn, pollin, req, saved_frame);
if (ret != 0) {
- req.rpc_status = -1;
+ req->rpc_status = -1;
gf_log ("rpc-clnt", GF_LOG_DEBUG, "initialising rpc reply "
"failed");
}
- cbk_ret = saved_frame->cbkfn (&req, req.rsp, req.rspcnt,
- saved_frame->frame);
-
- if (ret == 0) {
- rpc_clnt_reply_deinit (&req);
+ req->cbkfn (req, req->rsp, req->rspcnt, saved_frame->frame);
+
+ if (req) {
+ rpc_clnt_reply_deinit (req, conn->rpc_clnt->reqpool);
}
out:
if (saved_frame) {
- GF_FREE (saved_frame);
+ mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame);
}
- return cbk_ret;
+ return ret;
}
@@ -894,6 +898,25 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,
pthread_mutex_init (&rpc->lock, NULL);
rpc->ctx = ctx;
+ rpc->reqpool = mem_pool_new (struct rpc_req,
+ RPC_CLNT_DEFAULT_REQUEST_COUNT);
+ if (rpc->reqpool == NULL) {
+ pthread_mutex_destroy (&rpc->lock);
+ GF_FREE (rpc);
+ rpc = NULL;
+ goto out;
+ }
+
+ rpc->saved_frames_pool = mem_pool_new (struct saved_frame,
+ RPC_CLNT_DEFAULT_REQUEST_COUNT);
+ if (rpc->saved_frames_pool == NULL) {
+ pthread_mutex_destroy (&rpc->lock);
+ mem_pool_destroy (rpc->reqpool);
+ GF_FREE (rpc);
+ rpc = NULL;
+ goto out;
+ }
+
ret = rpc_clnt_connection_init (rpc, ctx, options, name);
if (ret == -1) {
pthread_mutex_destroy (&rpc->lock);
@@ -901,6 +924,7 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,
rpc = NULL;
goto out;
}
+
out:
return rpc;
}
@@ -1167,12 +1191,14 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
int procnum, fop_cbk_fn_t cbkfn,
struct iovec *proghdr, int proghdrcount,
struct iovec *progpayload, int progpayloadcount,
- struct iobref *iobref, void *frame)
+ struct iobref *iobref, void *frame, struct iovec *rsphdr,
+ int rsphdr_count, struct iovec *rsp_payload,
+ int rsp_payload_count, struct iobref *rsp_iobref)
{
rpc_clnt_connection_t *conn = NULL;
struct iobuf *request_iob = NULL;
struct iovec rpchdr = {0,};
- struct rpc_req rpcreq = {0,};
+ struct rpc_req *rpcreq = NULL;
rpc_transport_req_t req;
int ret = -1;
int proglen = 0;
@@ -1183,6 +1209,13 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
goto out;
}
+ rpcreq = mem_get (rpc->reqpool);
+ if (rpcreq == NULL) {
+ gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
+ goto out;
+ }
+
+ memset (rpcreq, 0, sizeof (*rpcreq));
memset (&req, 0, sizeof (req));
if (!iobref) {
@@ -1199,6 +1232,12 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
conn = &rpc->conn;
+ rpcreq->prog = prog;
+ rpcreq->procnum = procnum;
+ rpcreq->conn = conn;
+ rpcreq->xid = callid;
+ rpcreq->cbkfn = cbkfn;
+
pthread_mutex_lock (&conn->lock);
{
if (conn->connected == 0) {
@@ -1235,6 +1274,12 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
req.msg.progpayloadcount = progpayloadcount;
req.msg.iobref = iobref;
+ req.rsp.rsphdr = rsphdr;
+ req.rsp.rsphdr_count = rsphdr_count;
+ req.rsp.rsp_payload = rsp_payload;
+ req.rsp.rsp_payload_count = rsp_payload_count;
+ req.rsp.rsp_iobref = rsp_iobref;
+
ret = rpc_transport_submit_request (rpc->conn.trans,
&req);
if (ret == -1) {
@@ -1245,9 +1290,8 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
if ((ret >= 0) && frame) {
gettimeofday (&conn->last_sent, NULL);
/* Save the frame in queue */
- __save_frame (rpc, frame, procnum, prog, cbkfn, callid);
+ __save_frame (rpc, frame, rpcreq);
}
-
}
unlock:
pthread_mutex_unlock (&conn->lock);
@@ -1266,8 +1310,9 @@ out:
}
if (frame && (ret == -1)) {
- rpcreq.rpc_status = -1;
- cbkfn (&rpcreq, NULL, 0, frame);
+ rpcreq->rpc_status = -1;
+ cbkfn (rpcreq, NULL, 0, frame);
+ mem_put (rpc->reqpool, rpcreq);
}
return ret;
}