diff options
Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 219 | 
1 files changed, 132 insertions, 87 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 9b0bfe33d5b..fce3e8200fe 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -23,10 +23,16 @@  #include "config.h"  #endif +#define RPC_CLNT_DEFAULT_REQUEST_COUNT 4096 +  #include "rpc-clnt.h"  #include "xdr-rpcclnt.h"  #include "rpc-transport.h"  #include "protocol-common.h" +#include "mem-pool.h" + +void +rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool);  uint64_t  rpc_clnt_new_callid (struct rpc_clnt *clnt) @@ -63,28 +69,24 @@ __saved_frames_get_timedout (struct saved_frames *frames, uint32_t timeout,  struct saved_frame * -__saved_frames_put (struct saved_frames *frames, void *frame, int32_t procnum, -                    rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, int64_t callid) +__saved_frames_put (struct saved_frames *frames, void *frame, +                    struct rpc_req *rpcreq)  {  	struct saved_frame *saved_frame = NULL; -	saved_frame = GF_CALLOC (1, sizeof (*saved_frame), -                                 gf_common_mt_rpcclnt_savedframe_t); +        saved_frame = mem_get (rpcreq->conn->rpc_clnt->saved_frames_pool);  	if (!saved_frame) {                  gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");                  goto out;  	}          /* THIS should be saved and set back */ +        memset (saved_frame, 0, sizeof (*saved_frame));  	INIT_LIST_HEAD (&saved_frame->list);  	saved_frame->capital_this = THIS;  	saved_frame->frame        = frame; -	saved_frame->procnum      = procnum; -	saved_frame->callid       = callid; -        saved_frame->prog         = prog; -        saved_frame->cbkfn        = cbk; - +        saved_frame->rpcreq       = rpcreq;  	gettimeofday (&saved_frame->saved_at, NULL);  	list_add_tail (&saved_frame->list, &frames->sf.list); @@ -110,7 +112,12 @@ saved_frames_delete (struct saved_frame *saved_frame,          }          pthread_mutex_unlock (&conn->lock); -        GF_FREE (saved_frame); +        if (saved_frame->rpcreq != NULL) { +                rpc_clnt_reply_deinit (saved_frame->rpcreq, +                                       conn->rpc_clnt->reqpool); +        } + +        mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame);  out:          return;  } @@ -129,7 +136,6 @@ call_bail (void *data)          struct tm              frame_sent_tm;          char                   frame_sent[32] = {0,};          struct timeval         timeout = {0,}; -        struct rpc_req         req;          struct iovec           iov = {0,};          GF_VALIDATE_OR_GOTO ("client", data, out); @@ -180,15 +186,19 @@ call_bail (void *data)  		gf_log (conn->trans->name, GF_LOG_ERROR,  			"bailing out frame type(%s) op(%s(%d)) sent = %s. "                          "timeout = %d", -			trav->prog->progname, (trav->prog->procnames) ? -                        trav->prog->procnames[trav->procnum] : "--", -                        trav->procnum, frame_sent, +			trav->rpcreq->prog->progname, +                        (trav->rpcreq->prog->procnames) ? +                        trav->rpcreq->prog->procnames[trav->rpcreq->procnum] : +                        "--", +                        trav->rpcreq->procnum, frame_sent,                          conn->frame_timeout); -		trav->cbkfn (&req, &iov, 1, trav->frame); +                trav->rpcreq->rpc_status = -1; +		trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame); +                rpc_clnt_reply_deinit (trav->rpcreq, clnt->reqpool);                  list_del_init (&trav->list); -                GF_FREE (trav); +                mem_put (conn->rpc_clnt->saved_frames_pool, trav);          }  out:          return; @@ -197,8 +207,8 @@ out:  /* to be called with conn->lock held */  struct saved_frame * -__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum, -              rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, uint64_t callid) +__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, +              struct rpc_req *rpcreq)  {          rpc_clnt_connection_t *conn        = NULL;          struct timeval         timeout     = {0, }; @@ -206,8 +216,8 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum,          conn = &rpc_clnt->conn; -        saved_frame = __saved_frames_put (conn->saved_frames, frame, -                                          procnum, prog, cbk, callid); +        saved_frame = __saved_frames_put (conn->saved_frames, frame, rpcreq); +          if (saved_frame == NULL) {                  goto out;          } @@ -258,7 +268,7 @@ __saved_frame_copy (struct saved_frames *frames, int64_t callid,          }  	list_for_each_entry (tmp, &frames->sf.list, list) { -		if (tmp->callid == callid) { +		if (tmp->rpcreq->xid == callid) {  			*saved_frame = *tmp;                          ret = 0;  			break; @@ -277,7 +287,7 @@ __saved_frame_get (struct saved_frames *frames, int64_t callid)  	struct saved_frame *tmp = NULL;  	list_for_each_entry (tmp, &frames->sf.list, list) { -		if (tmp->callid == callid) { +		if (tmp->rpcreq->xid == callid) {  			list_del_init (&tmp->list);  			frames->count--;  			saved_frame = tmp; @@ -300,32 +310,34 @@ saved_frames_unwind (struct saved_frames *saved_frames)          struct tm            *frame_sent_tm = NULL;          char                 timestr[256] = {0,}; -        struct rpc_req        req;          struct iovec          iov = {0,}; -        memset (&req, 0, sizeof (req)); - -        req.rpc_status = -1; -  	list_for_each_entry_safe (trav, tmp, &saved_frames->sf.list, list) {                  frame_sent_tm = localtime (&trav->saved_at.tv_sec);                  strftime (timestr, sizeof(timestr), "%Y-%m-%d %H:%M:%S",                            frame_sent_tm); -                snprintf (timestr + strlen (timestr), sizeof(timestr) - strlen (timestr), +                snprintf (timestr + strlen (timestr), +                          sizeof(timestr) - strlen (timestr),                            ".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec);  		gf_log ("rpc-clnt", GF_LOG_ERROR, -			"forced unwinding frame type(%s) op(%s(%d)) called at %s", -			trav->prog->progname, (trav->prog->procnames) ? -                        trav->prog->procnames[trav->procnum] : "--", -                        trav->procnum, timestr); +			"forced unwinding frame type(%s) op(%s(%d)) " +                        "called at %s", +			trav->rpcreq->prog->progname, +                        (trav->rpcreq->prog->procnames) ? +                        trav->rpcreq->prog->procnames[trav->rpcreq->procnum] +                        : "--", +                        trav->rpcreq->procnum, timestr);  		saved_frames->count--; -		trav->cbkfn (&req, &iov, 1, trav->frame); +                trav->rpcreq->rpc_status = -1; +                trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame); +                rpc_clnt_reply_deinit (trav->rpcreq, +                                       trav->rpcreq->conn->rpc_clnt->reqpool);  		list_del_init (&trav->list); -		GF_FREE (trav); +                mem_put (trav->rpcreq->conn->rpc_clnt->saved_frames_pool, trav);  	}  } @@ -407,9 +419,9 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info)                  goto out;          } -        info->prognum = saved_frame.prog->prognum; -        info->procnum = saved_frame.procnum; -        info->progver = saved_frame.prog->progver; +        info->prognum = saved_frame.rpcreq->prog->prognum; +        info->procnum = saved_frame.rpcreq->procnum; +        info->progver = saved_frame.rpcreq->prog->progver;          info->rsp     = saved_frame.rsp;          ret = 0; @@ -490,9 +502,10 @@ int  rpc_clnt_reply_fill (rpc_transport_pollin_t *msg,                       rpc_clnt_connection_t *conn,                       struct rpc_msg *replymsg, struct iovec progmsg, -                     struct rpc_req *req, struct saved_frame *saved_frame) +                     struct rpc_req *req, +                     struct saved_frame *saved_frame)  { -        int           ret   = -1; +        int             ret   = -1;          if ((!conn) || (!replymsg)|| (!req) || (!saved_frame) || (!msg)) {                  goto out; @@ -504,25 +517,14 @@ rpc_clnt_reply_fill (rpc_transport_pollin_t *msg,                  req->rpc_status = -1;          } -        req->xid = rpc_reply_xid (replymsg); -        req->prog = saved_frame->prog; -        req->procnum = saved_frame->procnum; -        req->conn = conn; -          req->rsp[0] = progmsg; +        req->rsp_iobref = iobref_ref (msg->iobref);          if (msg->vectored) { -                req->rsp[1].iov_base = iobuf_ptr (msg->data.vector.iobuf2); -                req->rsp[1].iov_len = msg->data.vector.size2; - +                req->rsp[1] = msg->vector[1];                  req->rspcnt = 2; - -                req->rsp_prochdr = iobuf_ref (msg->data.vector.iobuf1); -                req->rsp_procpayload = iobuf_ref (msg->data.vector.iobuf2);          } else {                  req->rspcnt = 1; - -                req->rsp_prochdr = iobuf_ref (msg->data.simple.iobuf);          }          /* By this time, the data bytes for the auth scheme would have already @@ -544,20 +546,17 @@ out:  void -rpc_clnt_reply_deinit (struct rpc_req *req) +rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool)  {          if (!req) {                  goto out;          } -        if (req->rsp_prochdr) { -                iobuf_unref (req->rsp_prochdr); -        } - -        if (req->rsp_procpayload) { -                iobuf_unref (req->rsp_procpayload); +        if (req->rsp_iobref) { +                iobref_unref (req->rsp_iobref);          } +        mem_put (pool, req);  out:          return;  } @@ -574,13 +573,8 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,          size_t                  msglen  = 0;          int                     ret     = -1; -        if (msg->vectored) { -                msgbuf = iobuf_ptr (msg->data.vector.iobuf1); -                msglen = msg->data.vector.size1; -        } else { -                msgbuf = iobuf_ptr (msg->data.simple.iobuf); -                msglen = msg->data.simple.size; -        } +        msgbuf = msg->vector[0].iov_base; +        msglen = msg->vector[0].iov_len;          ret = xdr_to_rpc_reply (msgbuf, msglen, &rpcmsg, &progmsg,                                  req->verf.authdata); @@ -595,10 +589,11 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,                  goto out;          } -        gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %"PRIx64", Program: %s," -                " ProgVers: %d, Proc: %d", saved_frame->callid, -                saved_frame->prog->progname, saved_frame->prog->progver, -                saved_frame->procnum); +        gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %d Program: %s," +                " ProgVers: %d, Proc: %d", saved_frame->rpcreq->xid, +                saved_frame->rpcreq->prog->progname, +                saved_frame->rpcreq->prog->progver, +                saved_frame->rpcreq->procnum);  /* TODO: */          /* TODO: AUTH */          /* The verifier that is sent in a reply is a string that can be used as @@ -639,8 +634,7 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)          struct saved_frame    *saved_frame  = NULL;          rpc_request_info_t    *request_info = NULL;          int                    ret          = -1; -        struct rpc_req         req          = {0, }; -        int                    cbk_ret      = -1; +        struct rpc_req        *req          = NULL;          conn = &clnt->conn; @@ -657,26 +651,36 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)                  goto out;          } -        ret = rpc_clnt_reply_init (conn, pollin, &req, saved_frame); +        req = saved_frame->rpcreq; +        if (req == NULL) { +                gf_log ("rpc-clnt", GF_LOG_CRITICAL, +                        "saved_frame for reply with xid (%d), " +                        "prog-version (%d), prog-num (%d), procnum (%d)" +                        "does not contain rpc-req", request_info->xid, +                        request_info->progver, request_info->prognum, +                        request_info->procnum); +                goto out; +        } +  +        ret = rpc_clnt_reply_init (conn, pollin, req, saved_frame);          if (ret != 0) { -                req.rpc_status = -1; +                req->rpc_status = -1;                  gf_log ("rpc-clnt", GF_LOG_DEBUG, "initialising rpc reply "                          "failed");          } -        cbk_ret = saved_frame->cbkfn (&req, req.rsp, req.rspcnt, -                                      saved_frame->frame); - -        if (ret == 0) { -                rpc_clnt_reply_deinit (&req); +        req->cbkfn (req, req->rsp, req->rspcnt, saved_frame->frame); +          +        if (req) { +                rpc_clnt_reply_deinit (req, conn->rpc_clnt->reqpool);          }  out:          if (saved_frame) { -                GF_FREE (saved_frame); +                mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame);          } -        return cbk_ret; +        return ret;  } @@ -894,6 +898,25 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,          pthread_mutex_init (&rpc->lock, NULL);          rpc->ctx = ctx; +        rpc->reqpool = mem_pool_new (struct rpc_req, +                                     RPC_CLNT_DEFAULT_REQUEST_COUNT); +        if (rpc->reqpool == NULL) { +                pthread_mutex_destroy (&rpc->lock); +                GF_FREE (rpc); +                rpc = NULL; +                goto out; +        } + +        rpc->saved_frames_pool = mem_pool_new (struct saved_frame, +                                              RPC_CLNT_DEFAULT_REQUEST_COUNT); +        if (rpc->saved_frames_pool == NULL) { +                pthread_mutex_destroy (&rpc->lock); +                mem_pool_destroy (rpc->reqpool); +                GF_FREE (rpc); +                rpc = NULL; +                goto out; +        } +          ret = rpc_clnt_connection_init (rpc, ctx, options, name);          if (ret == -1) {                  pthread_mutex_destroy (&rpc->lock); @@ -901,6 +924,7 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,                  rpc = NULL;                  goto out;          } +  out:          return rpc;  } @@ -1167,12 +1191,14 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,                   int procnum, fop_cbk_fn_t cbkfn,                   struct iovec *proghdr, int proghdrcount,                   struct iovec *progpayload, int progpayloadcount, -                 struct iobref *iobref, void *frame) +                 struct iobref *iobref, void *frame, struct iovec *rsphdr, +                 int rsphdr_count, struct iovec *rsp_payload, +                 int rsp_payload_count, struct iobref *rsp_iobref)  {          rpc_clnt_connection_t *conn        = NULL;          struct iobuf          *request_iob = NULL;          struct iovec           rpchdr      = {0,}; -        struct rpc_req         rpcreq      = {0,}; +        struct rpc_req        *rpcreq      = NULL;          rpc_transport_req_t    req;          int                    ret         = -1;          int                    proglen     = 0; @@ -1183,6 +1209,13 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,                  goto out;          } +        rpcreq = mem_get (rpc->reqpool); +        if (rpcreq == NULL) { +                gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); +                goto out; +        } + +        memset (rpcreq, 0, sizeof (*rpcreq));          memset (&req, 0, sizeof (req));          if (!iobref) { @@ -1199,6 +1232,12 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,          conn = &rpc->conn; +        rpcreq->prog = prog; +        rpcreq->procnum = procnum; +        rpcreq->conn = conn; +        rpcreq->xid = callid; +        rpcreq->cbkfn = cbkfn; +          pthread_mutex_lock (&conn->lock);          {                  if (conn->connected == 0) { @@ -1235,6 +1274,12 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,                  req.msg.progpayloadcount = progpayloadcount;                  req.msg.iobref = iobref; +                req.rsp.rsphdr = rsphdr; +                req.rsp.rsphdr_count = rsphdr_count; +                req.rsp.rsp_payload = rsp_payload; +                req.rsp.rsp_payload_count = rsp_payload_count; +                req.rsp.rsp_iobref = rsp_iobref; +                  ret = rpc_transport_submit_request (rpc->conn.trans,                                                      &req);                  if (ret == -1) { @@ -1245,9 +1290,8 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,                  if ((ret >= 0) && frame) {                          gettimeofday (&conn->last_sent, NULL);                          /* Save the frame in queue */ -                        __save_frame (rpc, frame, procnum, prog, cbkfn, callid); +                        __save_frame (rpc, frame, rpcreq);                  } -          }  unlock:          pthread_mutex_unlock (&conn->lock); @@ -1266,8 +1310,9 @@ out:          }          if (frame && (ret == -1)) { -                rpcreq.rpc_status = -1; -                cbkfn (&rpcreq, NULL, 0, frame); +                rpcreq->rpc_status = -1; +                cbkfn (rpcreq, NULL, 0, frame); +                mem_put (rpc->reqpool, rpcreq);          }          return ret;  }  | 
