From c0b8e886cac4ef0f16d5f93adab02229bb1414cd Mon Sep 17 00:00:00 2001 From: Amar Tumballi Date: Tue, 14 Feb 2012 15:05:19 +0530 Subject: iobuf: use 'iobuf_get2()' to get variable sized buffers added 'TODO' in places where it is missing. Change-Id: Ia802c94e3bb76930f7c88c990f078525be5459f5 Signed-off-by: Amar Tumballi BUG: 765264 Reviewed-on: http://review.gluster.com/388 Tested-by: Gluster Build System Reviewed-by: Vijay Bellur --- rpc/rpc-lib/src/rpc-clnt.c | 25 +++++------ rpc/rpc-lib/src/rpcsvc.c | 49 +++++++++++++--------- rpc/rpc-transport/rdma/src/rdma.c | 23 ++++------ rpc/rpc-transport/socket/src/socket.c | 15 ++++++- xlators/cluster/stripe/src/stripe.c | 3 +- xlators/mount/fuse/src/fuse-bridge.c | 5 +++ xlators/nfs/server/src/mount3.c | 1 + xlators/nfs/server/src/nfs3.c | 2 + xlators/performance/quick-read/src/quick-read.c | 3 ++ .../performance/write-behind/src/write-behind.c | 1 + xlators/protocol/client/src/client3_1-fops.c | 30 +++++++------ 11 files changed, 92 insertions(+), 65 deletions(-) diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 631d7fcf7..540c72c82 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -1117,7 +1117,7 @@ ret: int -rpc_clnt_fill_request (int prognum, int progver, int procnum, int payload, +rpc_clnt_fill_request (int prognum, int progver, int procnum, uint64_t xid, struct auth_glusterfs_parms_v2 *au, struct rpc_msg *request, char *auth_data) { @@ -1197,7 +1197,7 @@ out: struct iobuf * rpc_clnt_record_build_record (struct rpc_clnt *clnt, int prognum, int progver, - int procnum, size_t payload, uint64_t xid, + int procnum, size_t hdrsize, uint64_t xid, struct auth_glusterfs_parms_v2 *au, struct iovec *recbuf) { @@ -1207,16 +1207,19 @@ rpc_clnt_record_build_record (struct rpc_clnt *clnt, int prognum, int progver, struct iovec recordhdr = {0, }; size_t pagesize = 0; int ret = -1; + size_t xdr_size = 0; char auth_data[GF_MAX_AUTH_BYTES] = {0, }; if ((!clnt) || (!recbuf) || (!au)) { goto out; } + xdr_size = xdr_sizeof ((xdrproc_t)xdr_callmsg, &request); + /* First, try to get a pointer into the buffer which the RPC * layer can use. */ - request_iob = iobuf_get (clnt->ctx->iobuf_pool); + request_iob = iobuf_get2 (clnt->ctx->iobuf_pool, (xdr_size + hdrsize)); if (!request_iob) { goto out; } @@ -1226,7 +1229,7 @@ rpc_clnt_record_build_record (struct rpc_clnt *clnt, int prognum, int progver, record = iobuf_ptr (request_iob); /* Now we have it. */ /* Fill the rpc structure and XDR it into the buffer got above. */ - ret = rpc_clnt_fill_request (prognum, progver, procnum, payload, xid, + ret = rpc_clnt_fill_request (prognum, progver, procnum, xid, au, &request, auth_data); if (ret == -1) { gf_log (clnt->conn.trans->name, GF_LOG_WARNING, @@ -1235,7 +1238,7 @@ rpc_clnt_record_build_record (struct rpc_clnt *clnt, int prognum, int progver, } recordhdr = rpc_clnt_record_build_header (record, pagesize, &request, - payload); + hdrsize); if (!recordhdr.iov_base) { gf_log (clnt->conn.trans->name, GF_LOG_ERROR, @@ -1256,7 +1259,7 @@ out: struct iobuf * rpc_clnt_record (struct rpc_clnt *clnt, call_frame_t *call_frame, - rpc_clnt_prog_t *prog,int procnum, size_t payload_len, + rpc_clnt_prog_t *prog, int procnum, size_t hdrlen, struct iovec *rpchdr, uint64_t callid) { struct auth_glusterfs_parms_v2 au = {0, }; @@ -1292,12 +1295,9 @@ rpc_clnt_record (struct rpc_clnt *clnt, call_frame_t *call_frame, ", gid: %d, owner: %s", au.pid, au.uid, au.gid, lkowner_utoa (&call_frame->root->lk_owner)); - /* Assuming the client program would like to speak to the same version of - * program on server. - */ request_iob = rpc_clnt_record_build_record (clnt, prog->prognum, prog->progver, - procnum, payload_len, + procnum, hdrlen, callid, &au, rpchdr); if (!request_iob) { @@ -1432,11 +1432,6 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, proglen += iov_length (proghdr, proghdrcount); } - if (progpayload) { - proglen += iov_length (progpayload, - progpayloadcount); - } - request_iob = rpc_clnt_record (rpc, frame, prog, procnum, proglen, &rpchdr, callid); diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index e0fc29184..ca6a6ca4c 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -768,16 +768,28 @@ rpcsvc_callback_build_record (rpcsvc_t *rpc, int prognum, int progver, char *record = NULL; struct iovec recordhdr = {0, }; size_t pagesize = 0; + size_t xdr_size = 0; int ret = -1; if ((!rpc) || (!recbuf)) { goto out; } + /* Fill the rpc structure and XDR it into the buffer got above. */ + ret = rpcsvc_fill_callback (prognum, progver, procnum, payload, xid, + &request); + if (ret == -1) { + gf_log ("rpcsvc", GF_LOG_WARNING, "cannot build a rpc-request " + "xid (%"PRIu64")", xid); + goto out; + } + /* First, try to get a pointer into the buffer which the RPC * layer can use. */ - request_iob = iobuf_get (rpc->ctx->iobuf_pool); + xdr_size = xdr_sizeof ((xdrproc_t)xdr_callmsg, &request); + + request_iob = iobuf_get2 (rpc->ctx->iobuf_pool, (xdr_size + payload)); if (!request_iob) { goto out; } @@ -786,15 +798,6 @@ rpcsvc_callback_build_record (rpcsvc_t *rpc, int prognum, int progver, record = iobuf_ptr (request_iob); /* Now we have it. */ - /* Fill the rpc structure and XDR it into the buffer got above. */ - ret = rpcsvc_fill_callback (prognum, progver, procnum, payload, xid, - &request); - if (ret == -1) { - gf_log ("rpcsvc", GF_LOG_WARNING, "cannot build a rpc-request " - "xid (%"PRIu64")", xid); - goto out; - } - recordhdr = rpcsvc_callback_build_header (record, pagesize, &request, payload); @@ -938,13 +941,14 @@ out: */ struct iobuf * rpcsvc_record_build_record (rpcsvc_request_t *req, size_t payload, - struct iovec *recbuf) + size_t hdrlen, struct iovec *recbuf) { struct rpc_msg reply; struct iobuf *replyiob = NULL; char *record = NULL; struct iovec recordhdr = {0, }; size_t pagesize = 0; + size_t xdr_size = 0; rpcsvc_t *svc = NULL; int ret = -1; @@ -952,19 +956,25 @@ rpcsvc_record_build_record (rpcsvc_request_t *req, size_t payload, return NULL; svc = req->svc; - replyiob = iobuf_get (svc->ctx->iobuf_pool); - pagesize = iobuf_pagesize (replyiob); - if (!replyiob) { - goto err_exit; - } - - record = iobuf_ptr (replyiob); /* Now we have it. */ /* Fill the rpc structure and XDR it into the buffer got above. */ ret = rpcsvc_fill_reply (req, &reply); if (ret) goto err_exit; + xdr_size = xdr_sizeof ((xdrproc_t)xdr_replymsg, &reply); + + /* Payload would include 'readv' size etc too, where as + that comes as another payload iobuf */ + replyiob = iobuf_get2 (svc->ctx->iobuf_pool, (xdr_size + hdrlen)); + if (!replyiob) { + goto err_exit; + } + + pagesize = iobuf_pagesize (replyiob); + + record = iobuf_ptr (replyiob); /* Now we have it. */ + recordhdr = rpcsvc_record_build_header (record, pagesize, reply, payload); if (!recordhdr.iov_base) { @@ -1019,6 +1029,7 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, struct iovec recordhdr = {0, }; rpc_transport_t *trans = NULL; size_t msglen = 0; + size_t hdrlen = 0; char new_iobref = 0; if ((!req) || (!req->trans)) @@ -1037,7 +1048,7 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, gf_log (GF_RPCSVC, GF_LOG_TRACE, "Tx message: %zu", msglen); /* Build the buffer containing the encoded RPC reply. */ - replyiob = rpcsvc_record_build_record (req, msglen, &recordhdr); + replyiob = rpcsvc_record_build_record (req, msglen, hdrlen, &recordhdr); if (!replyiob) { gf_log (GF_RPCSVC, GF_LOG_ERROR,"Reply record creation failed"); goto disconnect_exit; diff --git a/rpc/rpc-transport/rdma/src/rdma.c b/rpc/rpc-transport/rdma/src/rdma.c index d3a8e9c8d..04531cda0 100644 --- a/rpc/rpc-transport/rdma/src/rdma.c +++ b/rpc/rpc-transport/rdma/src/rdma.c @@ -1992,7 +1992,7 @@ gf_rdma_receive (rpc_transport_t *this, char **hdr_p, size_t *hdrlen_p, *hdrlen_p = size1; if (size2) { - iobuf = iobuf_get (this->ctx->iobuf_pool); + iobuf = iobuf_get2 (this->ctx->iobuf_pool, size2); if (!iobuf) { gf_log (this->name, GF_LOG_ERROR, "unable to allocate IO buffer for peer %s", @@ -2716,7 +2716,7 @@ gf_rdma_decode_error_msg (gf_rdma_peer_t *peer, gf_rdma_post_t *post, ntoh32 (header->rm_body.rm_error.rm_version.gf_rdma_vers_high); } - iobuf = iobuf_get (peer->trans->ctx->iobuf_pool); + iobuf = iobuf_get2 (peer->trans->ctx->iobuf_pool, bytes_in_post); if (iobuf == NULL) { ret = -1; goto out; @@ -2822,15 +2822,17 @@ gf_rdma_decode_msg (gf_rdma_peer_t *peer, gf_rdma_post_t *post, /* skip terminator of reply chunk */ ptr = ptr + sizeof (uint32_t); if (header->rm_type != GF_RDMA_NOMSG) { - post->ctx.hdr_iobuf = iobuf_get (peer->trans->ctx->iobuf_pool); + header_len = (long)ptr - (long)post->buf; + post->ctx.vector[0].iov_len = (bytes_in_post - header_len); + + post->ctx.hdr_iobuf = iobuf_get2 (peer->trans->ctx->iobuf_pool, + (bytes_in_post - header_len)); if (post->ctx.hdr_iobuf == NULL) { ret = -1; goto out; } - header_len = (long)ptr - (long)post->buf; post->ctx.vector[0].iov_base = iobuf_ptr (post->ctx.hdr_iobuf); - post->ctx.vector[0].iov_len = bytes_in_post - header_len; memcpy (post->ctx.vector[0].iov_base, ptr, post->ctx.vector[0].iov_len); post->ctx.count = 1; @@ -2965,16 +2967,7 @@ gf_rdma_do_reads (gf_rdma_peer_t *peer, gf_rdma_post_t *post, post->ctx.gf_rdma_reads = i; - if (size > peer->trans->ctx->page_size) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "total size of rdma-read (%lu) is greater than " - "page-size (%lu). This is not supported till variable " - "sized iobufs are implemented", (unsigned long)size, - (unsigned long)peer->trans->ctx->page_size); - goto out; - } - - iobuf = iobuf_get (peer->trans->ctx->iobuf_pool); + iobuf = iobuf_get2 (peer->trans->ctx->iobuf_pool, size); if (iobuf == NULL) { goto out; } diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 3b2d05c45..120e193dd 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -823,6 +823,7 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto struct iobuf *iobuf = NULL; uint32_t remaining_size = 0; ssize_t readsize = 0; + size_t size = 0; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); @@ -907,7 +908,10 @@ sp_state_reading_proghdr: case SP_STATE_READ_PROGHDR: if (priv->incoming.payload_vector.iov_base == NULL) { - iobuf = iobuf_get (this->ctx->iobuf_pool); + + size = RPC_FRAGSIZE (priv->incoming.fraghdr) - + priv->incoming.frag.bytes_read; + iobuf = iobuf_get2 (this->ctx->iobuf_pool, size); if (!iobuf) { ret = -1; break; @@ -1048,6 +1052,7 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this) struct iobuf *iobuf = NULL; uint32_t gluster_read_rsp_hdr_len = 0; gfs3_read_rsp read_rsp = {0, }; + size_t size = 0; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); @@ -1080,7 +1085,11 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this) = SP_STATE_READ_PROC_HEADER; if (priv->incoming.payload_vector.iov_base == NULL) { - iobuf = iobuf_get (this->ctx->iobuf_pool); + + size = (RPC_FRAGSIZE (priv->incoming.fraghdr) - + priv->incoming.frag.bytes_read); + + iobuf = iobuf_get2 (this->ctx->iobuf_pool, size); if (iobuf == NULL) { ret = -1; goto out; @@ -1100,6 +1109,8 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this) priv->incoming.payload_vector.iov_base = iobuf_ptr (iobuf); + + priv->incoming.payload_vector.iov_len = size; } priv->incoming.frag.fragcurrent diff --git a/xlators/cluster/stripe/src/stripe.c b/xlators/cluster/stripe/src/stripe.c index 5e369d64b..ec9b6a744 100644 --- a/xlators/cluster/stripe/src/stripe.c +++ b/xlators/cluster/stripe/src/stripe.c @@ -3372,7 +3372,8 @@ stripe_readv_fstat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, vec[count].iov_len = (local->replies[i].requested_size - local->replies[i].op_ret); - iobuf = iobuf_get (this->ctx->iobuf_pool); + iobuf = iobuf_get2 (this->ctx->iobuf_pool, + vec[count].iov_len); if (!iobuf) { gf_log (this->name, GF_LOG_ERROR, "Out of memory."); diff --git a/xlators/mount/fuse/src/fuse-bridge.c b/xlators/mount/fuse/src/fuse-bridge.c index 1da2dcdb2..e644290e4 100644 --- a/xlators/mount/fuse/src/fuse-bridge.c +++ b/xlators/mount/fuse/src/fuse-bridge.c @@ -3474,7 +3474,12 @@ fuse_thread_proc (void *data) if (priv->init_recvd) fuse_graph_sync (this); + /* TODO: This place should always get maximum supported buffer + size from 'fuse', which is as of today 128KB. If we bring in + support for higher block sizes support, then we should be + changing this one too */ iobuf = iobuf_get (this->ctx->iobuf_pool); + /* Add extra 128 byte to the first iov so that it can * accommodate "ordinary" non-write requests. It's not * guaranteed to be big enough, as SETXATTR and namespace diff --git a/xlators/nfs/server/src/mount3.c b/xlators/nfs/server/src/mount3.c index 0f14c8d5a..cebdf5270 100644 --- a/xlators/nfs/server/src/mount3.c +++ b/xlators/nfs/server/src/mount3.c @@ -70,6 +70,7 @@ mnt3svc_submit_reply (rpcsvc_request_t *req, void *arg, mnt3_serializer sfunc) /* First, get the io buffer into which the reply in arg will * be serialized. */ + /* TODO: use 'xdrproc_t' instead of 'sfunc' to get the xdr-size */ iob = iobuf_get (ms->iobpool); if (!iob) { gf_log (GF_MNT, GF_LOG_ERROR, "Failed to get iobuf"); diff --git a/xlators/nfs/server/src/nfs3.c b/xlators/nfs/server/src/nfs3.c index 422646bc0..527101de2 100644 --- a/xlators/nfs/server/src/nfs3.c +++ b/xlators/nfs/server/src/nfs3.c @@ -487,6 +487,8 @@ nfs3_serialize_reply (rpcsvc_request_t *req, void *arg, nfs3_serializer sfunc, /* First, get the io buffer into which the reply in arg will * be serialized. */ + /* TODO: get rid of 'sfunc' and use 'xdrproc_t' so we + can have 'xdr_sizeof' */ iob = iobuf_get (nfs3->iobpool); if (!iob) { gf_log (GF_NFS3, GF_LOG_ERROR, "Failed to get iobuf"); diff --git a/xlators/performance/quick-read/src/quick-read.c b/xlators/performance/quick-read/src/quick-read.c index 14ff58b51..c1460b1de 100644 --- a/xlators/performance/quick-read/src/quick-read.c +++ b/xlators/performance/quick-read/src/quick-read.c @@ -1180,6 +1180,9 @@ qr_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, } for (i = 0; i < count; i++) { + /* TODO: Now that we have support for variable + io-buf-sizes, i guess we need to get rid of + default size here */ iobuf = iobuf_get (iobuf_pool); if (iobuf == NULL) { op_ret = -1; diff --git a/xlators/performance/write-behind/src/write-behind.c b/xlators/performance/write-behind/src/write-behind.c index 7c666b403..be7da2f44 100644 --- a/xlators/performance/write-behind/src/write-behind.c +++ b/xlators/performance/write-behind/src/write-behind.c @@ -1901,6 +1901,7 @@ __wb_copy_into_holder (wb_request_t *holder, wb_request_t *request) int ret = -1; if (holder->flags.write_request.virgin) { + /* TODO: check the required size */ iobuf = iobuf_get (request->file->this->ctx->iobuf_pool); if (iobuf == NULL) { goto out; diff --git a/xlators/protocol/client/src/client3_1-fops.c b/xlators/protocol/client/src/client3_1-fops.c index 234b0dd6e..e3d6d36db 100644 --- a/xlators/protocol/client/src/client3_1-fops.c +++ b/xlators/protocol/client/src/client3_1-fops.c @@ -2493,6 +2493,8 @@ client3_1_lookup (call_frame_t *frame, xlator_t *this, } /* TODO: what is the size we should send ? */ + /* This change very much depends on quick-read + changes */ rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); if (rsp_iobuf == NULL) { goto unwind; @@ -3409,8 +3411,7 @@ client3_1_readv (call_frame_t *frame, xlator_t *this, memcpy (req.gfid, args->fd->inode->gfid, 16); - /* TODO: what is the size we should send ? */ - rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); + rsp_iobuf = iobuf_get2 (this->ctx->iobuf_pool, args->size); if (rsp_iobuf == NULL) { op_errno = ENOMEM; goto unwind; @@ -3959,8 +3960,8 @@ client3_1_fgetxattr (call_frame_t *frame, xlator_t *this, goto unwind; } - /* TODO: what is the size we should send ? */ - rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); + /* TODO: what is the size we should send ? */ + rsp_iobuf = iobuf_get2 (this->ctx->iobuf_pool, 8 * GF_UNIT_KB); if (rsp_iobuf == NULL) { op_errno = ENOMEM; goto unwind; @@ -4062,8 +4063,8 @@ client3_1_getxattr (call_frame_t *frame, xlator_t *this, goto unwind; } - /* TODO: what is the size we should send ? */ - rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); + /* TODO: what is the size we should send ? */ + rsp_iobuf = iobuf_get2 (this->ctx->iobuf_pool, 8 * GF_UNIT_KB); if (rsp_iobuf == NULL) { op_errno = ENOMEM; goto unwind; @@ -4188,8 +4189,8 @@ client3_1_xattrop (call_frame_t *frame, xlator_t *this, goto unwind; } - /* TODO: what is the size we should send ? */ - rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); + /* TODO: what is the size we should send ? */ + rsp_iobuf = iobuf_get2 (this->ctx->iobuf_pool, 8 * GF_UNIT_KB); if (rsp_iobuf == NULL) { op_errno = ENOMEM; goto unwind; @@ -4308,8 +4309,8 @@ client3_1_fxattrop (call_frame_t *frame, xlator_t *this, goto unwind; } - /* TODO: what is the size we should send ? */ - rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); + /* TODO: what is the size we should send ? */ + rsp_iobuf = iobuf_get2 (this->ctx->iobuf_pool, 8 * GF_UNIT_KB); if (rsp_iobuf == NULL) { op_errno = ENOMEM; goto unwind; @@ -4880,7 +4881,9 @@ client3_1_readdir (call_frame_t *frame, xlator_t *this, goto unwind; } - /* TODO: what is the size we should send ? */ + /* TODO: what is the size we should send ? */ + /* This iobuf will live for only receiving the response, + so not harmful */ rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); if (rsp_iobuf == NULL) { goto unwind; @@ -4890,8 +4893,7 @@ client3_1_readdir (call_frame_t *frame, xlator_t *this, iobuf_unref (rsp_iobuf); rsphdr = &vector[0]; rsphdr->iov_base = iobuf_ptr (rsp_iobuf); - rsphdr->iov_len - = iobuf_pagesize (rsp_iobuf); + rsphdr->iov_len = iobuf_pagesize (rsp_iobuf); count = 1; rsp_iobuf = NULL; local->iobref = rsp_iobref; @@ -4983,6 +4985,8 @@ client3_1_readdirp (call_frame_t *frame, xlator_t *this, } /* TODO: what is the size we should send ? */ + /* This iobuf will live for only receiving the response, + so not harmful */ rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); if (rsp_iobuf == NULL) { goto unwind; -- cgit