diff options
Diffstat (limited to 'rpc')
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 219 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.h | 36 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-transport.c | 96 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 44 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 33 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 20 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/xdr-common.h | 8 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 129 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 4 | 
9 files changed, 359 insertions, 230 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 9b0bfe33d5b..fce3e8200fe 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -23,10 +23,16 @@  #include "config.h"  #endif +#define RPC_CLNT_DEFAULT_REQUEST_COUNT 4096 +  #include "rpc-clnt.h"  #include "xdr-rpcclnt.h"  #include "rpc-transport.h"  #include "protocol-common.h" +#include "mem-pool.h" + +void +rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool);  uint64_t  rpc_clnt_new_callid (struct rpc_clnt *clnt) @@ -63,28 +69,24 @@ __saved_frames_get_timedout (struct saved_frames *frames, uint32_t timeout,  struct saved_frame * -__saved_frames_put (struct saved_frames *frames, void *frame, int32_t procnum, -                    rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, int64_t callid) +__saved_frames_put (struct saved_frames *frames, void *frame, +                    struct rpc_req *rpcreq)  {  	struct saved_frame *saved_frame = NULL; -	saved_frame = GF_CALLOC (1, sizeof (*saved_frame), -                                 gf_common_mt_rpcclnt_savedframe_t); +        saved_frame = mem_get (rpcreq->conn->rpc_clnt->saved_frames_pool);  	if (!saved_frame) {                  gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");                  goto out;  	}          /* THIS should be saved and set back */ +        memset (saved_frame, 0, sizeof (*saved_frame));  	INIT_LIST_HEAD (&saved_frame->list);  	saved_frame->capital_this = THIS;  	saved_frame->frame        = frame; -	saved_frame->procnum      = procnum; -	saved_frame->callid       = callid; -        saved_frame->prog         = prog; -        saved_frame->cbkfn        = cbk; - +        saved_frame->rpcreq       = rpcreq;  	gettimeofday (&saved_frame->saved_at, NULL);  	list_add_tail (&saved_frame->list, &frames->sf.list); @@ -110,7 +112,12 @@ saved_frames_delete (struct saved_frame *saved_frame,          }          pthread_mutex_unlock (&conn->lock); -        GF_FREE (saved_frame); +        if 
(saved_frame->rpcreq != NULL) { +                rpc_clnt_reply_deinit (saved_frame->rpcreq, +                                       conn->rpc_clnt->reqpool); +        } + +        mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame);  out:          return;  } @@ -129,7 +136,6 @@ call_bail (void *data)          struct tm              frame_sent_tm;          char                   frame_sent[32] = {0,};          struct timeval         timeout = {0,}; -        struct rpc_req         req;          struct iovec           iov = {0,};          GF_VALIDATE_OR_GOTO ("client", data, out); @@ -180,15 +186,19 @@ call_bail (void *data)  		gf_log (conn->trans->name, GF_LOG_ERROR,  			"bailing out frame type(%s) op(%s(%d)) sent = %s. "                          "timeout = %d", -			trav->prog->progname, (trav->prog->procnames) ? -                        trav->prog->procnames[trav->procnum] : "--", -                        trav->procnum, frame_sent, +			trav->rpcreq->prog->progname, +                        (trav->rpcreq->prog->procnames) ? 
+                        trav->rpcreq->prog->procnames[trav->rpcreq->procnum] : +                        "--", +                        trav->rpcreq->procnum, frame_sent,                          conn->frame_timeout); -		trav->cbkfn (&req, &iov, 1, trav->frame); +                trav->rpcreq->rpc_status = -1; +		trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame); +                rpc_clnt_reply_deinit (trav->rpcreq, clnt->reqpool);                  list_del_init (&trav->list); -                GF_FREE (trav); +                mem_put (conn->rpc_clnt->saved_frames_pool, trav);          }  out:          return; @@ -197,8 +207,8 @@ out:  /* to be called with conn->lock held */  struct saved_frame * -__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum, -              rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, uint64_t callid) +__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, +              struct rpc_req *rpcreq)  {          rpc_clnt_connection_t *conn        = NULL;          struct timeval         timeout     = {0, }; @@ -206,8 +216,8 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum,          conn = &rpc_clnt->conn; -        saved_frame = __saved_frames_put (conn->saved_frames, frame, -                                          procnum, prog, cbk, callid); +        saved_frame = __saved_frames_put (conn->saved_frames, frame, rpcreq); +          if (saved_frame == NULL) {                  goto out;          } @@ -258,7 +268,7 @@ __saved_frame_copy (struct saved_frames *frames, int64_t callid,          }  	list_for_each_entry (tmp, &frames->sf.list, list) { -		if (tmp->callid == callid) { +		if (tmp->rpcreq->xid == callid) {  			*saved_frame = *tmp;                          ret = 0;  			break; @@ -277,7 +287,7 @@ __saved_frame_get (struct saved_frames *frames, int64_t callid)  	struct saved_frame *tmp = NULL;  	list_for_each_entry (tmp, &frames->sf.list, list) { -		if (tmp->callid == callid) { +		if 
(tmp->rpcreq->xid == callid) {  			list_del_init (&tmp->list);  			frames->count--;  			saved_frame = tmp; @@ -300,32 +310,34 @@ saved_frames_unwind (struct saved_frames *saved_frames)          struct tm            *frame_sent_tm = NULL;          char                 timestr[256] = {0,}; -        struct rpc_req        req;          struct iovec          iov = {0,}; -        memset (&req, 0, sizeof (req)); - -        req.rpc_status = -1; -  	list_for_each_entry_safe (trav, tmp, &saved_frames->sf.list, list) {                  frame_sent_tm = localtime (&trav->saved_at.tv_sec);                  strftime (timestr, sizeof(timestr), "%Y-%m-%d %H:%M:%S",                            frame_sent_tm); -                snprintf (timestr + strlen (timestr), sizeof(timestr) - strlen (timestr), +                snprintf (timestr + strlen (timestr), +                          sizeof(timestr) - strlen (timestr),                            ".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec);  		gf_log ("rpc-clnt", GF_LOG_ERROR, -			"forced unwinding frame type(%s) op(%s(%d)) called at %s", -			trav->prog->progname, (trav->prog->procnames) ? -                        trav->prog->procnames[trav->procnum] : "--", -                        trav->procnum, timestr); +			"forced unwinding frame type(%s) op(%s(%d)) " +                        "called at %s", +			trav->rpcreq->prog->progname, +                        (trav->rpcreq->prog->procnames) ? 
+                        trav->rpcreq->prog->procnames[trav->rpcreq->procnum] +                        : "--", +                        trav->rpcreq->procnum, timestr);  		saved_frames->count--; -		trav->cbkfn (&req, &iov, 1, trav->frame); +                trav->rpcreq->rpc_status = -1; +                trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame); +                rpc_clnt_reply_deinit (trav->rpcreq, +                                       trav->rpcreq->conn->rpc_clnt->reqpool);  		list_del_init (&trav->list); -		GF_FREE (trav); +                mem_put (trav->rpcreq->conn->rpc_clnt->saved_frames_pool, trav);  	}  } @@ -407,9 +419,9 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info)                  goto out;          } -        info->prognum = saved_frame.prog->prognum; -        info->procnum = saved_frame.procnum; -        info->progver = saved_frame.prog->progver; +        info->prognum = saved_frame.rpcreq->prog->prognum; +        info->procnum = saved_frame.rpcreq->procnum; +        info->progver = saved_frame.rpcreq->prog->progver;          info->rsp     = saved_frame.rsp;          ret = 0; @@ -490,9 +502,10 @@ int  rpc_clnt_reply_fill (rpc_transport_pollin_t *msg,                       rpc_clnt_connection_t *conn,                       struct rpc_msg *replymsg, struct iovec progmsg, -                     struct rpc_req *req, struct saved_frame *saved_frame) +                     struct rpc_req *req, +                     struct saved_frame *saved_frame)  { -        int           ret   = -1; +        int             ret   = -1;          if ((!conn) || (!replymsg)|| (!req) || (!saved_frame) || (!msg)) {                  goto out; @@ -504,25 +517,14 @@ rpc_clnt_reply_fill (rpc_transport_pollin_t *msg,                  req->rpc_status = -1;          } -        req->xid = rpc_reply_xid (replymsg); -        req->prog = saved_frame->prog; -        req->procnum = saved_frame->procnum; -        req->conn = conn; -          
req->rsp[0] = progmsg; +        req->rsp_iobref = iobref_ref (msg->iobref);          if (msg->vectored) { -                req->rsp[1].iov_base = iobuf_ptr (msg->data.vector.iobuf2); -                req->rsp[1].iov_len = msg->data.vector.size2; - +                req->rsp[1] = msg->vector[1];                  req->rspcnt = 2; - -                req->rsp_prochdr = iobuf_ref (msg->data.vector.iobuf1); -                req->rsp_procpayload = iobuf_ref (msg->data.vector.iobuf2);          } else {                  req->rspcnt = 1; - -                req->rsp_prochdr = iobuf_ref (msg->data.simple.iobuf);          }          /* By this time, the data bytes for the auth scheme would have already @@ -544,20 +546,17 @@ out:  void -rpc_clnt_reply_deinit (struct rpc_req *req) +rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool)  {          if (!req) {                  goto out;          } -        if (req->rsp_prochdr) { -                iobuf_unref (req->rsp_prochdr); -        } - -        if (req->rsp_procpayload) { -                iobuf_unref (req->rsp_procpayload); +        if (req->rsp_iobref) { +                iobref_unref (req->rsp_iobref);          } +        mem_put (pool, req);  out:          return;  } @@ -574,13 +573,8 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,          size_t                  msglen  = 0;          int                     ret     = -1; -        if (msg->vectored) { -                msgbuf = iobuf_ptr (msg->data.vector.iobuf1); -                msglen = msg->data.vector.size1; -        } else { -                msgbuf = iobuf_ptr (msg->data.simple.iobuf); -                msglen = msg->data.simple.size; -        } +        msgbuf = msg->vector[0].iov_base; +        msglen = msg->vector[0].iov_len;          ret = xdr_to_rpc_reply (msgbuf, msglen, &rpcmsg, &progmsg,                                  req->verf.authdata); @@ -595,10 +589,11 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, 
rpc_transport_pollin_t *msg,                  goto out;          } -        gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %"PRIx64", Program: %s," -                " ProgVers: %d, Proc: %d", saved_frame->callid, -                saved_frame->prog->progname, saved_frame->prog->progver, -                saved_frame->procnum); +        gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %d Program: %s," +                " ProgVers: %d, Proc: %d", saved_frame->rpcreq->xid, +                saved_frame->rpcreq->prog->progname, +                saved_frame->rpcreq->prog->progver, +                saved_frame->rpcreq->procnum);  /* TODO: */          /* TODO: AUTH */          /* The verifier that is sent in a reply is a string that can be used as @@ -639,8 +634,7 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)          struct saved_frame    *saved_frame  = NULL;          rpc_request_info_t    *request_info = NULL;          int                    ret          = -1; -        struct rpc_req         req          = {0, }; -        int                    cbk_ret      = -1; +        struct rpc_req        *req          = NULL;          conn = &clnt->conn; @@ -657,26 +651,36 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)                  goto out;          } -        ret = rpc_clnt_reply_init (conn, pollin, &req, saved_frame); +        req = saved_frame->rpcreq; +        if (req == NULL) { +                gf_log ("rpc-clnt", GF_LOG_CRITICAL, +                        "saved_frame for reply with xid (%d), " +                        "prog-version (%d), prog-num (%d), procnum (%d)" +                        "does not contain rpc-req", request_info->xid, +                        request_info->progver, request_info->prognum, +                        request_info->procnum); +                goto out; +        } +  +        ret = rpc_clnt_reply_init (conn, pollin, req, saved_frame);          if (ret != 0) { -                
req.rpc_status = -1; +                req->rpc_status = -1;                  gf_log ("rpc-clnt", GF_LOG_DEBUG, "initialising rpc reply "                          "failed");          } -        cbk_ret = saved_frame->cbkfn (&req, req.rsp, req.rspcnt, -                                      saved_frame->frame); - -        if (ret == 0) { -                rpc_clnt_reply_deinit (&req); +        req->cbkfn (req, req->rsp, req->rspcnt, saved_frame->frame); +          +        if (req) { +                rpc_clnt_reply_deinit (req, conn->rpc_clnt->reqpool);          }  out:          if (saved_frame) { -                GF_FREE (saved_frame); +                mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame);          } -        return cbk_ret; +        return ret;  } @@ -894,6 +898,25 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,          pthread_mutex_init (&rpc->lock, NULL);          rpc->ctx = ctx; +        rpc->reqpool = mem_pool_new (struct rpc_req, +                                     RPC_CLNT_DEFAULT_REQUEST_COUNT); +        if (rpc->reqpool == NULL) { +                pthread_mutex_destroy (&rpc->lock); +                GF_FREE (rpc); +                rpc = NULL; +                goto out; +        } + +        rpc->saved_frames_pool = mem_pool_new (struct saved_frame, +                                              RPC_CLNT_DEFAULT_REQUEST_COUNT); +        if (rpc->saved_frames_pool == NULL) { +                pthread_mutex_destroy (&rpc->lock); +                mem_pool_destroy (rpc->reqpool); +                GF_FREE (rpc); +                rpc = NULL; +                goto out; +        } +          ret = rpc_clnt_connection_init (rpc, ctx, options, name);          if (ret == -1) {                  pthread_mutex_destroy (&rpc->lock); @@ -901,6 +924,7 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,                  rpc = NULL;                  goto out;          } +  out:          return rpc;  } @@ -1167,12 +1191,14 
@@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,                   int procnum, fop_cbk_fn_t cbkfn,                   struct iovec *proghdr, int proghdrcount,                   struct iovec *progpayload, int progpayloadcount, -                 struct iobref *iobref, void *frame) +                 struct iobref *iobref, void *frame, struct iovec *rsphdr, +                 int rsphdr_count, struct iovec *rsp_payload, +                 int rsp_payload_count, struct iobref *rsp_iobref)  {          rpc_clnt_connection_t *conn        = NULL;          struct iobuf          *request_iob = NULL;          struct iovec           rpchdr      = {0,}; -        struct rpc_req         rpcreq      = {0,}; +        struct rpc_req        *rpcreq      = NULL;          rpc_transport_req_t    req;          int                    ret         = -1;          int                    proglen     = 0; @@ -1183,6 +1209,13 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,                  goto out;          } +        rpcreq = mem_get (rpc->reqpool); +        if (rpcreq == NULL) { +                gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); +                goto out; +        } + +        memset (rpcreq, 0, sizeof (*rpcreq));          memset (&req, 0, sizeof (req));          if (!iobref) { @@ -1199,6 +1232,12 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,          conn = &rpc->conn; +        rpcreq->prog = prog; +        rpcreq->procnum = procnum; +        rpcreq->conn = conn; +        rpcreq->xid = callid; +        rpcreq->cbkfn = cbkfn; +          pthread_mutex_lock (&conn->lock);          {                  if (conn->connected == 0) { @@ -1235,6 +1274,12 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,                  req.msg.progpayloadcount = progpayloadcount;                  req.msg.iobref = iobref; +                req.rsp.rsphdr = rsphdr; +                req.rsp.rsphdr_count = rsphdr_count; +                
req.rsp.rsp_payload = rsp_payload; +                req.rsp.rsp_payload_count = rsp_payload_count; +                req.rsp.rsp_iobref = rsp_iobref; +                  ret = rpc_transport_submit_request (rpc->conn.trans,                                                      &req);                  if (ret == -1) { @@ -1245,9 +1290,8 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,                  if ((ret >= 0) && frame) {                          gettimeofday (&conn->last_sent, NULL);                          /* Save the frame in queue */ -                        __save_frame (rpc, frame, procnum, prog, cbkfn, callid); +                        __save_frame (rpc, frame, rpcreq);                  } -          }  unlock:          pthread_mutex_unlock (&conn->lock); @@ -1266,8 +1310,9 @@ out:          }          if (frame && (ret == -1)) { -                rpcreq.rpc_status = -1; -                cbkfn (&rpcreq, NULL, 0, frame); +                rpcreq->rpc_status = -1; +                cbkfn (rpcreq, NULL, 0, frame); +                mem_put (rpc->reqpool, rpcreq);          }          return ret;  } diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h index efc256cd261..b9d39b3320a 100644 --- a/rpc/rpc-lib/src/rpc-clnt.h +++ b/rpc/rpc-lib/src/rpc-clnt.h @@ -58,10 +58,7 @@ struct saved_frame {          void                    *capital_this;  	void                    *frame;  	struct timeval           saved_at; -	int32_t                  procnum; -        struct rpc_clnt_program *prog; -        fop_cbk_fn_t             cbkfn; -	uint64_t                 callid; +        struct rpc_req          *rpcreq;          rpc_transport_rsp_t      rsp;  }; @@ -116,14 +113,16 @@ struct rpc_req {          uint32_t               xid;          struct iovec           req[2];          int                    reqcnt; +        struct iobref         *req_iobref;          struct iovec           rsp[2];          int                    rspcnt; -        struct 
iobuf          *rsp_prochdr; -        struct iobuf          *rsp_procpayload; +        struct iobref         *rsp_iobref;          int                    rpc_status;          rpc_auth_data_t        verf;          rpc_clnt_prog_t       *prog;          int                    procnum; +        fop_cbk_fn_t           cbkfn; +        void                  *conn_private;  };  struct rpc_clnt { @@ -132,6 +131,12 @@ struct rpc_clnt {          rpc_clnt_connection_t  conn;          void                  *mydata;          uint64_t               xid; + +        /* Memory pool for rpc_req_t */ +        struct mem_pool       *reqpool; + +        struct mem_pool       *saved_frames_pool; +          glusterfs_ctx_t       *ctx;  }; @@ -149,11 +154,28 @@ struct rpc_clnt * rpc_clnt_init (struct rpc_clnt_config *config,  int rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn,                                void *mydata); +/* Some preconditions related to vectors holding responses. + * @rsphdr: should contain pointer to buffer which can hold response header + *          and length of the program header. In case of procedures whose + *          response size is not bounded (e.g., glusterfs lookup), the length + *          should be equal to size of buffer. + * @rsp_payload: should contain pointer and length of the buffer which can + *          hold the response payload. + * + * 1. Both @rsphdr and @rsp_payload are optional. + * 2. The user of rpc_clnt_submit, if it wants response hdr and payload in its + *    own buffers, has to populate @rsphdr and @rsp_payload. + * 3. When @rsp_payload is not NULL, @rsphdr should + *    also be filled with pointer to buffer to hold header and length + *    of the header. 
+ */ +   int rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,                       int procnum, fop_cbk_fn_t cbkfn,                       struct iovec *proghdr, int proghdrcount,                       struct iovec *progpayload, int progpayloadcount, -                     struct iobref *iobref, void *frame); +                     struct iobref *iobref, void *frame, struct iovec *rsphdr, +                     int rsphdr_count, struct iovec *rsp_payload, +                     int rsp_payload_count, struct iobref *rsp_iobref);  void rpc_clnt_destroy (struct rpc_clnt *rpc); diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index b77ea2aa553..50379c14950 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -628,20 +628,10 @@ rpc_transport_pollin_destroy (rpc_transport_pollin_t *pollin)                  goto out;          } -        if (pollin->vectored) { -                if (pollin->data.vector.iobuf1) { -                        iobuf_unref (pollin->data.vector.iobuf1); -                } - -                if (pollin->data.vector.iobuf2) { -                        iobuf_unref (pollin->data.vector.iobuf2); -                } -        } else { -                if (pollin->data.simple.iobuf) { -                        iobuf_unref (pollin->data.simple.iobuf); -                } +        if (pollin->iobref) { +                iobref_unref (pollin->iobref);          } - +           if (pollin->private) {                  /* */                  GF_FREE (pollin->private); @@ -654,9 +644,8 @@ out:  rpc_transport_pollin_t * -rpc_transport_pollin_alloc (rpc_transport_t *this, struct iobuf *iobuf, -                            size_t size, struct iobuf *vectored_buf, -                            size_t vectored_size, void *private) +rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector, +                            int count, struct iobref *iobref, void *private)  {          
rpc_transport_pollin_t *msg = NULL;          msg = GF_CALLOC (1, sizeof (*msg), gf_common_mt_rpc_trans_pollin_t); @@ -665,19 +654,15 @@ rpc_transport_pollin_alloc (rpc_transport_t *this, struct iobuf *iobuf,                  goto out;          } -        if (vectored_buf) { +        if (count == 2) {                  msg->vectored = 1; -                msg->data.vector.iobuf1 = iobuf_ref (iobuf); -                msg->data.vector.size1  = size; - -                msg->data.vector.iobuf2 = iobuf_ref (vectored_buf); -                msg->data.vector.size2  = vectored_size; -        } else { -                msg->data.simple.iobuf = iobuf_ref (iobuf); -                msg->data.simple.size = size;          } +        memcpy (msg->vector, vector, count * sizeof (*vector)); +        msg->count = count; +        msg->iobref = iobref_ref (iobref);          msg->private = private; +  out:          return msg;  } @@ -698,6 +683,7 @@ rpc_transport_same_process_pollin_alloc (rpc_transport_t *this,          int                     progpayloadlen = 0;          char                    vectored       = 0;          char                   *hdr            = NULL, *progpayloadbuf = NULL; +        struct iobuf           *iobuf          = NULL;          if (!rpchdr || !proghdr) {                  goto err; @@ -729,47 +715,72 @@ rpc_transport_same_process_pollin_alloc (rpc_transport_t *this,          }          if (vectored) { -                msg->data.vector.iobuf1 = iobuf_get (this->ctx->iobuf_pool); -                if (!msg->data.vector.iobuf1) { +                msg->iobref = iobref_new (); +                if (!msg->iobref) { +                        gf_log ("rpc-transport", GF_LOG_ERROR, +                                "out of memory"); +                        goto err; +                } + +                iobuf = iobuf_get (this->ctx->iobuf_pool); +                if (!iobuf) {                          gf_log ("rpc_transport", GF_LOG_ERROR,                                  
"out of memory");                          goto err;                  } -                msg->data.vector.size1 = rpchdrlen + proghdrlen; -                hdr = iobuf_ptr (msg->data.vector.iobuf1); +                iobref_add (msg->iobref, iobuf); +                iobuf_unref (iobuf); + +                msg->vector[0].iov_len = rpchdrlen + proghdrlen; +                msg->vector[0].iov_base = hdr = iobuf_ptr (iobuf);                  if (!is_request && rsp) { -                        msg->data.vector.iobuf2 = rsp->rspbuf; -                        progpayloadbuf = rsp->rspvec->iov_base; +                        msg->vector[1] = rsp->rsp_payload[0]; +                        progpayloadbuf = rsp->rsp_payload[0].iov_base;                  } else { -                        msg->data.vector.iobuf2 = iobuf_get (this->ctx->iobuf_pool); -                        if (!msg->data.vector.iobuf2) { +                        iobuf = iobuf_get (this->ctx->iobuf_pool); +                        if (!iobuf) {                                  gf_log ("rpc_transport", GF_LOG_ERROR,                                          "out of memory");                                  goto err;                          } -                        progpayloadbuf = iobuf_ptr (msg->data.vector.iobuf2); +                        iobref_add (msg->iobref, iobuf); +                        iobuf_unref (iobuf); +  +                        msg->vector[1].iov_base +                                = progpayloadbuf = iobuf_ptr (iobuf);                  } -                msg->data.vector.size2 = progpayloadlen; +                msg->vector[1].iov_len = progpayloadlen;          } else {                  if (!is_request && rsp) {                          /* FIXME: Assuming rspvec contains only one vector */ -                        hdr = rsp->rspvec->iov_base; -                        msg->data.simple.iobuf = rsp->rspbuf; +                        hdr = rsp->rsphdr[0].iov_base; +                        msg->vector[0] 
= rsp->rsphdr[0];                  } else { -                        msg->data.simple.iobuf = iobuf_get (this->ctx->iobuf_pool); -                        if (!msg->data.simple.iobuf) { +                        msg->iobref = iobref_new (); +                        if (!msg->iobref) { +                                gf_log ("rpc-transport", GF_LOG_ERROR, +                                        "out of memory"); +                                goto err; +                        } + +                        iobuf = iobuf_get (this->ctx->iobuf_pool); +                        if (!iobuf) {                                  gf_log ("rpc_transport", GF_LOG_ERROR,                                          "out of memory");                                  goto err;                          } -                        hdr = iobuf_ptr (msg->data.simple.iobuf); +                        iobref_add (msg->iobref, iobuf); +                        iobuf_unref (iobuf); + +                        hdr = iobuf_ptr (iobuf); +                        msg->vector[0].iov_base = hdr;                  } -                msg->data.simple.size = rpchdrlen + proghdrlen; +                msg->vector[0].iov_len = rpchdrlen + proghdrlen;          }          iov_unload (hdr, rpchdr, rpchdrcount); @@ -1253,7 +1264,8 @@ rpc_transport_peerproc (void *trans_data)                  }                  pthread_mutex_unlock (&trans->handover.mutex); -                rpc_transport_notify (trans, RPC_TRANSPORT_MSG_RECEIVED, msg->pollin); +                rpc_transport_notify (trans, RPC_TRANSPORT_MSG_RECEIVED, +                                      msg->pollin);                  rpc_transport_handover_destroy (msg);          }  } diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index b3c7985e095..5698b287cce 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -30,6 +30,10 @@  #include <rpc/auth.h>  #include <rpc/rpc_msg.h> +#ifndef MAX_IOVEC 
+#define MAX_IOVEC 16 +#endif +  /* Given the 4-byte fragment header, returns non-zero if this fragment   * is the last fragment for the RPC record being assemebled.   * RPC Record marking standard defines a 32 bit value as the fragment @@ -108,12 +112,11 @@ struct rpc_transport_msg {  typedef struct rpc_transport_msg rpc_transport_msg_t;  struct rpc_transport_rsp { -        /* as of now, the entire rsp payload is read into rspbuf and hence -         * rspcount is always set to one. -         */ -        struct iovec    *rspvec; -        int              rspcount; -        struct iobuf    *rspbuf; +        struct iovec   *rsphdr; +        int             rsphdr_count; +        struct iovec   *rsp_payload; +        int             rsp_payload_count; +        struct iobref  *rsp_iobref;  };  typedef struct rpc_transport_rsp rpc_transport_rsp_t; @@ -129,6 +132,15 @@ struct rpc_transport_reply {  };  typedef struct rpc_transport_reply rpc_transport_reply_t; +struct rpc_transport_data { +        char is_request; +        union { +                rpc_transport_req_t   req; +                rpc_transport_reply_t reply; +        } data; +}; +typedef struct rpc_transport_data rpc_transport_data_t; +  struct rpc_request_info {          uint32_t            xid;          int                 prognum; @@ -140,20 +152,11 @@ typedef struct rpc_request_info rpc_request_info_t;  struct rpc_transport_pollin { -        union { -                struct vectored { -                        struct iobuf *iobuf1; -                        size_t        size1; -                        struct iobuf *iobuf2; -                        size_t        size2; -                } vector; -                struct simple { -                        struct iobuf *iobuf; -                        size_t        size; -                } simple; -        } data; +        struct iovec vector[2]; +        int count;          char vectored;          void *private; +        struct iobref *iobref;  };  typedef struct 
rpc_transport_pollin rpc_transport_pollin_t; @@ -279,9 +282,8 @@ rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen,                            struct sockaddr *sa, size_t salen);  rpc_transport_pollin_t * -rpc_transport_pollin_alloc (rpc_transport_t *this, struct iobuf *iobuf, -                            size_t iobuf_size, struct iobuf *vectoriob, -                            size_t vectoriob_size, void *private); +rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector, +                            int count, struct iobref *iobref, void *private);  void  rpc_transport_pollin_destroy (rpc_transport_pollin_t *pollin); diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 9bb7e0e4c9e..5bb908cec0a 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -1021,12 +1021,8 @@ rpcsvc_request_destroy (rpcsvc_conn_t *conn, rpcsvc_request_t *req)                  goto out;          } -        if (req->recordiob) { -                iobuf_unref (req->recordiob); -        } - -        if (req->vectorediob) { -                iobuf_unref (req->vectorediob); +        if (req->iobref) { +                iobref_unref (req->iobref);          }          mem_put (conn->rxpool, req); @@ -1050,15 +1046,11 @@ rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg,          req->progver = rpc_call_progver (callmsg);          req->procnum = rpc_call_progproc (callmsg);          req->conn = conn; +        req->count = msg->count;          req->msg[0] = progmsg; +        req->iobref = iobref_ref (msg->iobref);          if (msg->vectored) { -                req->msg[1].iov_base = iobuf_ptr (msg->data.vector.iobuf2); -                req->msg[1].iov_len = msg->data.vector.size2; - -                req->recordiob = iobuf_ref (msg->data.vector.iobuf1); -                req->vectorediob = iobuf_ref (msg->data.vector.iobuf2); -        } else { -                req->recordiob = iobuf_ref (msg->data.simple.iobuf); 
+                req->msg[1] = msg->vector[1];          }          req->trans_private = msg->private; @@ -1106,13 +1098,8 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)                  goto err;          } -        if (msg->vectored) { -                msgbuf = iobuf_ptr (msg->data.vector.iobuf1); -                msglen = msg->data.vector.size1; -        } else { -                msgbuf = iobuf_ptr (msg->data.simple.iobuf); -                msglen = msg->data.simple.size; -        } +        msgbuf = msg->vector[0].iov_base; +        msglen = msg->vector[0].iov_len;          ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg,                                 req->cred.authdata,req->verf.authdata); @@ -1190,11 +1177,11 @@ rpcsvc_handle_rpc_call (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)                  goto err_reply;          if (actor && (req->rpc_err == SUCCESS)) { -                if (req->vectorediob) { +                if (req->count == 2) {                          if (actor->vector_actor) {                                  rpcsvc_conn_ref (conn); -                                ret = actor->vector_actor (req, -                                                           req->vectorediob); +                                ret = actor->vector_actor (req, &req->msg[1], 1, +                                                           req->iobref);                          } else {                                  rpcsvc_request_seterr (req, PROC_UNAVAIL);                                  gf_log (GF_RPCSVC, GF_LOG_ERROR, diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index 10dc32698ad..5a3f3cd3449 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -237,21 +237,9 @@ struct rpcsvc_request {           * be de-xdred by the actor.           
*/          struct iovec            msg[2]; +        int                     count; -        /* The full message buffer allocated to store the RPC headers. -         * This buffer is ref'd when allocated why RPC svc and unref'd after -         * the buffer is handed to the actor. That means if the actor or any -         * higher layer wants to keep this buffer around, they too must ref it -         * right after entering the program actor. -         */ -        struct iobuf            *recordiob; - -        /* iobuf to hold payload of calls like write. By storing large payloads -         * starting from page-aligned addresses, performance increases while -         * accessing the payload -         */ -        struct iobuf            *vectorediob; - +        struct iobref          *iobref;          /* Status of the RPC call, whether it was accepted or denied. */          int                     rpc_status; @@ -317,7 +305,6 @@ struct rpcsvc_request {  #define rpcsvc_request_private(req)     ((req)->private)  #define rpcsvc_request_xid(req)         ((req)->xid)  #define rpcsvc_request_set_private(req,prv)  (req)->private = (void *)(prv) -#define rpcsvc_request_record_iob(rq)   ((rq)->recordiob)  #define rpcsvc_request_record_ref(req)  (iobuf_ref ((req)->recordiob))  #define rpcsvc_request_record_unref(req) (iobuf_unref ((req)->recordiob)) @@ -338,7 +325,8 @@ struct rpcsvc_request {   *   */  typedef int (*rpcsvc_actor) (rpcsvc_request_t *req); -typedef int (*rpcsvc_vector_actor) (rpcsvc_request_t *req, struct iobuf *iob); +typedef int (*rpcsvc_vector_actor) (rpcsvc_request_t *req, struct iovec *vec, +                                    int count, struct iobref *iobref);  typedef int (*rpcsvc_vector_sizer) (rpcsvc_request_t *req, ssize_t *readsize,                                      int *newiob); diff --git a/rpc/rpc-lib/src/xdr-common.h b/rpc/rpc-lib/src/xdr-common.h index 7ba1372529c..b3ce29e5dbe 100644 --- a/rpc/rpc-lib/src/xdr-common.h +++ 
b/rpc/rpc-lib/src/xdr-common.h @@ -50,12 +50,12 @@ struct auth_glusterfs_parms {  	u_int gid;  	u_int ngrps;  	u_int groups[16]; -}; +} __attribute__((packed));  typedef struct auth_glusterfs_parms auth_glusterfs_parms;  struct gf_dump_req {  	u_quad_t gfs_id; -}; +} __attribute__((packed));  typedef struct gf_dump_req gf_dump_req;  struct gf_prog_detail { @@ -63,7 +63,7 @@ struct gf_prog_detail {  	u_quad_t prognum;  	u_quad_t progver;  	struct gf_prog_detail *next; -}; +} __attribute__((packed));  typedef struct gf_prog_detail gf_prog_detail;  struct gf_dump_rsp { @@ -71,7 +71,7 @@ struct gf_dump_rsp {  	int op_ret;  	int op_errno;  	struct gf_prog_detail *prog; -}; +}__attribute__((packed));  typedef struct gf_dump_rsp gf_dump_rsp;  extern bool_t diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 4396454a33a..baeee735bb8 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -370,12 +370,13 @@ __socket_reset (rpc_transport_t *this)          /* TODO: use mem-pool on incoming data */ -        if (priv->incoming.iobuf) { -                iobuf_unref (priv->incoming.iobuf); +        if (priv->incoming.iobref) { +                iobref_unref (priv->incoming.iobref); +                priv->incoming.iobref = NULL;          } -        if (priv->incoming.vectoriob) { -                iobuf_unref (priv->incoming.vectoriob); +        if (priv->incoming.iobuf) { +                iobuf_unref (priv->incoming.iobuf);          }          memset (&priv->incoming, 0, sizeof (priv->incoming)); @@ -699,7 +700,7 @@ __socket_read_vectored_request (rpc_transport_t *this)                  /* fall through */          case SP_STATE_READ_VERFBYTES: -                if (priv->incoming.vectoriob == NULL) { +                if (priv->incoming.payload_vector.iov_base == NULL) {                          iobuf = iobuf_get (this->ctx->iobuf_pool);                          if (!iobuf) {                        
          gf_log (this->name, GF_LOG_ERROR, @@ -710,7 +711,23 @@ __socket_read_vectored_request (rpc_transport_t *this)                                  break;                          } -                        priv->incoming.vectoriob = iobuf; +                        if (priv->incoming.iobref == NULL) { +                                priv->incoming.iobref = iobref_new (); +                                if (priv->incoming.iobref == NULL) { +                                        gf_log (this->name, GF_LOG_ERROR, +                                                "out of memory"); +                                        ret = -1; +                                        iobuf_unref (iobuf); +                                        break; +                                } +                        } + +                        iobref_add (priv->incoming.iobref, iobuf); +                        iobuf_unref (iobuf); + +                        priv->incoming.payload_vector.iov_base +                                = iobuf_ptr (iobuf); +                          priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf);                  } @@ -735,9 +752,10 @@ __socket_read_vectored_request (rpc_transport_t *this)                          && RPC_LASTFRAG (priv->incoming.fraghdr))) {                          priv->incoming.frag.call_body.request.vector_state                                  = SP_STATE_VECTORED_REQUEST_INIT; -                        priv->incoming.vectoriob_size +                        priv->incoming.payload_vector.iov_len                                  = (unsigned long)priv->incoming.frag.fragcurrent -                                - (unsigned long)iobuf_ptr (priv->incoming.vectoriob); +                                - (unsigned long) +                                priv->incoming.payload_vector.iov_base;                  }                  break;          } @@ -846,18 +864,31 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)          
        /* fall through */          case SP_STATE_READ_PROC_HEADER: -                if (priv->incoming.vectoriob == NULL) { +                if (priv->incoming.payload_vector.iov_base == NULL) {                          iobuf = iobuf_get (this->ctx->iobuf_pool);                          if (iobuf == NULL) {                                  ret = -1;                                  goto out;                          } -                        priv->incoming.vectoriob = iobuf; +                        if (priv->incoming.iobref == NULL) { +                                priv->incoming.iobref = iobref_new (); +                                if (priv->incoming.iobref == NULL) { +                                        ret = -1; +                                        iobuf_unref (iobuf); +                                        goto out; +                                } +                        } + +                        iobref_add (priv->incoming.iobref, iobuf); +                        iobuf_unref (iobuf); + +                        priv->incoming.payload_vector.iov_base +                                = iobuf_ptr (iobuf);                  }                  priv->incoming.frag.fragcurrent -                        = iobuf_ptr (priv->incoming.vectoriob); +                        = priv->incoming.payload_vector.iov_base;                  /* now read the entire remaining msg into new iobuf */                  ret = __socket_read_simple_msg (this); @@ -1080,9 +1111,11 @@ __socket_read_reply (rpc_transport_t *this)          if ((request_info->prognum == GLUSTER3_1_FOP_PROGRAM)              && (request_info->procnum == GF_FOP_READ)) { -                if (request_info->rsp.rspbuf != NULL) { -                        priv->incoming.vectoriob -                                = iobuf_ref (request_info->rsp.rspbuf); +                if (request_info->rsp.rsp_payload_count != 0) { +                        priv->incoming.iobref +                                = 
iobref_ref (request_info->rsp.rsp_iobref); +                        priv->incoming.payload_vector +                                = *request_info->rsp.rsp_payload;                  }                  ret = __socket_read_vectored_reply (this); @@ -1154,15 +1187,19 @@ __socket_read_frag (rpc_transport_t *this)  inline  void __socket_reset_priv (socket_private_t *priv)  { +        if (priv->incoming.iobref) { +                iobref_unref (priv->incoming.iobref); +                priv->incoming.iobref = NULL; +        } +          if (priv->incoming.iobuf) {                  iobuf_unref (priv->incoming.iobuf); -                priv->incoming.iobuf = NULL;          } -        if (priv->incoming.vectoriob) { -                iobuf_unref (priv->incoming.vectoriob); -                priv->incoming.vectoriob = NULL; -        } +        memset (&priv->incoming.payload_vector, 0, +                sizeof (priv->incoming.payload_vector)); + +        priv->incoming.iobuf = NULL;  } @@ -1170,9 +1207,11 @@ int  __socket_proto_state_machine (rpc_transport_t *this,                                rpc_transport_pollin_t **pollin)  { -        int               ret   = -1; -        socket_private_t *priv  = NULL; -        struct iobuf     *iobuf = NULL; +        int               ret    = -1; +        socket_private_t *priv   = NULL; +        struct iobuf     *iobuf  = NULL; +        struct iobref    *iobref = NULL; +        struct iovec      vector[2];          priv = this->private;          while (priv->incoming.record_state != SP_STATE_COMPLETE) { @@ -1191,7 +1230,7 @@ __socket_proto_state_machine (rpc_transport_t *this,                          priv->incoming.iobuf = iobuf;                          priv->incoming.iobuf_size = 0; -                        priv->incoming.vectoriob_size = 0; +                        priv->incoming.payload_vector.iov_len = 0;                          priv->incoming.pending_vector = priv->incoming.vector;                          
priv->incoming.pending_vector->iov_base = @@ -1260,15 +1299,49 @@ __socket_proto_state_machine (rpc_transport_t *this,                           * upper layers.                           */                          if (pollin != NULL) { +                                int count = 0;                                  priv->incoming.iobuf_size                                          = priv->incoming.total_bytes_read -                                        - priv->incoming.vectoriob_size; +                                        - priv->incoming.payload_vector.iov_len; + +                                memset (vector, 0, sizeof (vector)); + +                                if (priv->incoming.iobref == NULL) { +                                        priv->incoming.iobref = iobref_new (); +                                        if (priv->incoming.iobref == NULL) { +                                                gf_log (this->name, +                                                        GF_LOG_ERROR, +                                                        "out of memory"); +                                                ret = -1; +                                                goto out; +                                        } +                                } + +                                vector[count].iov_base +                                        = iobuf_ptr (priv->incoming.iobuf); +                                vector[count].iov_len +                                        = priv->incoming.iobuf_size; + +                                iobref = priv->incoming.iobref; + +                                iobref_add (iobref, +                                            priv->incoming.iobuf); +                                iobuf_unref (priv->incoming.iobuf); +                                priv->incoming.iobuf = NULL;  + +                                count++; + +                                if (priv->incoming.payload_vector.iov_base +       
                             != NULL) { +                                        vector[count] +                                                = priv->incoming.payload_vector; +                                        count++; +                                }                                  *pollin = rpc_transport_pollin_alloc (this, -                                                                      priv->incoming.iobuf, -                                                                      priv->incoming.iobuf_size, -                                                                      priv->incoming.vectoriob, -                                                                      priv->incoming.vectoriob_size, +                                                                      vector, +                                                                      count, +                                                                      iobref,                                                                        priv->incoming.request_info);                                  if (*pollin == NULL) {                                          ret = -1; diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index aa31ee2a7ef..5078b161e29 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -170,8 +170,8 @@ typedef struct {                  size_t               iobuf_size;                  struct iovec         vector[2];                  int                  count; -                struct iobuf        *vectoriob; -                size_t               vectoriob_size; +                struct iovec         payload_vector; +                struct iobref       *iobref;                  rpc_request_info_t  *request_info;                  struct iovec        *pending_vector;                  int                  pending_count;  | 
