From d19a72024c8cd5e40dc48df27c21fda701e76c54 Mon Sep 17 00:00:00 2001 From: Raghavendra G Date: Thu, 4 Nov 2010 02:33:19 +0000 Subject: rpc-transport/rdma: Fix 2KB as threshold size for msgs that can be transferred inline. - Any fop/mop that can result a reply whose size is greater than this threshold allocates and submits a buffer along with the request to receive reply. Signed-off-by: Raghavendra G Signed-off-by: Anand V. Avati BUG: 513 (Introduce 0 copy rdma) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=513 --- xlators/protocol/client/src/Makefile.am | 2 +- xlators/protocol/client/src/client3_1-fops.c | 495 ++++++++++++++++++++++++--- 2 files changed, 439 insertions(+), 58 deletions(-) (limited to 'xlators/protocol') diff --git a/xlators/protocol/client/src/Makefile.am b/xlators/protocol/client/src/Makefile.am index f2dea68d70d..29c6e9dde76 100644 --- a/xlators/protocol/client/src/Makefile.am +++ b/xlators/protocol/client/src/Makefile.am @@ -14,4 +14,4 @@ noinst_HEADERS = client.h client-mem-types.h AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS) \ -I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) \ - -I$(top_srcdir)/rpc/xdr/src -I$(top_srcdir)/rpc/rpc-lib/src/ + -I$(top_srcdir)/rpc/xdr/src -I$(top_srcdir)/rpc/rpc-lib/src/ \ No newline at end of file diff --git a/xlators/protocol/client/src/client3_1-fops.c b/xlators/protocol/client/src/client3_1-fops.c index 184f67d0b7e..86e7fb24e0f 100644 --- a/xlators/protocol/client/src/client3_1-fops.c +++ b/xlators/protocol/client/src/client3_1-fops.c @@ -17,7 +17,6 @@ . */ - #ifndef _CONFIG_H #define _CONFIG_H #include "config.h" @@ -724,8 +723,11 @@ client3_1_getxattr_cbk (struct rpc_req *req, struct iovec *iov, int count, int op_errno = EINVAL; gfs3_getxattr_rsp rsp = {0,}; int ret = 0; + clnt_local_t *local = NULL; frame = myframe; + local = frame->local; + frame->local = NULL; if (-1 == req->rpc_status) { op_ret = -1; @@ -783,6 +785,8 @@ out: if (dict) dict_unref (dict); + client_local_wipe (local); + return 0; } @@ -798,8 +802,11 @@ client3_1_fgetxattr_cbk (struct rpc_req *req, struct iovec *iov, int count, int dict_len = 0; int op_ret = 0; int op_errno = EINVAL; + clnt_local_t *local = NULL; frame = myframe; + local = frame->local; + frame->local = NULL; if (-1 == req->rpc_status) { op_ret = -1; @@ -853,6 +860,9 @@ out: if (dict) dict_unref (dict); + if (local) + client_local_wipe (local); + return 0; } @@ -1155,8 +1165,11 @@ client3_1_xattrop_cbk (struct rpc_req *req, struct iovec *iov, int count, int op_ret = 0; int dict_len = 0; int op_errno = EINVAL; + clnt_local_t *local = NULL; frame = myframe; + local = frame->local; + frame->local = NULL; if (-1 == req->rpc_status) { op_ret = -1; @@ -1214,6 +1227,8 @@ out: if (dict) dict_unref (dict); + client_local_wipe (local); + return 0; } @@ -1229,8 +1244,11 @@ client3_1_fxattrop_cbk (struct rpc_req *req, struct iovec *iov, int count, int op_ret = 0; int dict_len = 0; int op_errno = 0; + clnt_local_t *local = NULL; frame = myframe; + local = frame->local; + frame->local = NULL; if (-1 == req->rpc_status) { op_ret = -1; @@ -1288,6 +1306,7 @@ out: if (dict) dict_unref (dict); + client_local_wipe (local); return 0; } @@ -1575,11 +1594,14 @@ client3_1_readdir_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) { call_frame_t *frame = NULL; - gfs3_readdir_rsp rsp = {0,}; - int32_t ret = 0; + gfs3_readdir_rsp rsp = {0,}; + int32_t ret = 0; + clnt_local_t *local = NULL; gf_dirent_t entries; frame = myframe; + local = frame->local; + frame->local = NULL; if (-1 == req->rpc_status) { rsp.op_ret = -1; @@ -1604,6 +1626,8 @@ out: STACK_UNWIND_STRICT (readdir, frame, rsp.op_ret, gf_error_to_errno (rsp.op_errno), &entries); + client_local_wipe (local); + if (rsp.op_ret != -1) { gf_dirent_free (&entries); } @@ -1619,11 +1643,14 @@ client3_1_readdirp_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) { call_frame_t *frame = NULL; - gfs3_readdirp_rsp rsp = {0,}; - int32_t ret = 0; + gfs3_readdirp_rsp rsp = {0,}; + int32_t ret = 0; + clnt_local_t *local = NULL; gf_dirent_t entries; frame = myframe; + local = frame->local; + frame->local = NULL; if (-1 == req->rpc_status) { rsp.op_ret = -1; @@ -1648,6 +1675,8 @@ out: STACK_UNWIND_STRICT (readdirp, frame, rsp.op_ret, gf_error_to_errno (rsp.op_errno), &entries); + client_local_wipe (local); + if (rsp.op_ret != -1) { gf_dirent_free (&entries); } @@ -1932,7 +1961,7 @@ client3_1_readv_cbk (struct rpc_req *req, struct iovec *iov, int count, { call_frame_t *frame = NULL; struct iobref *iobref = NULL; - struct iovec vector[MAX_IOVEC]; + struct iovec vector[MAX_IOVEC] = {{0}, }; struct iatt stat = {0,}; gfs3_read_rsp rsp = {0,}; int ret = 0, rspcount = 0; @@ -2168,7 +2197,7 @@ client3_1_lookup (call_frame_t *frame, xlator_t *this, size_t dict_len = 0; int op_errno = ESTALE; data_t *content = NULL; - struct iovec vector[MAX_IOVEC]; + struct iovec vector[MAX_IOVEC] = {{0}, }; int count = 0; struct iobref *rsp_iobref = NULL; struct iobuf *rsp_iobuf = NULL; @@ -2245,7 +2274,7 @@ client3_1_lookup (call_frame_t *frame, xlator_t *this, ret = client_submit_request (this, &req, frame, conf->fops, GFS3_OP_LOOKUP, client3_1_lookup_cbk, NULL, xdr_from_lookup_req, rsphdr, count, - NULL, 0, rsp_iobref); + NULL, 0, local->iobref); if (ret) { op_errno = ENOTCONN; @@ -3695,12 +3724,18 @@ int32_t client3_1_fgetxattr (call_frame_t *frame, xlator_t *this, void *data) { - clnt_args_t *args = NULL; - clnt_fd_ctx_t *fdctx = NULL; - clnt_conf_t *conf = NULL; - gfs3_fgetxattr_req req = {{0,},}; - int op_errno = ESTALE; - int ret = 0; + clnt_args_t *args = NULL; + clnt_fd_ctx_t *fdctx = NULL; + clnt_conf_t *conf = NULL; + gfs3_fgetxattr_req req = {{0,},}; + int op_errno = ESTALE; + int ret = 0; + int count = 0; + clnt_local_t *local = NULL; + struct iobref *rsp_iobref = NULL; + struct iobuf *rsp_iobuf = NULL; + struct iovec *rsphdr = NULL; + struct iovec vector[MAX_IOVEC] = {{0}, }; if (!frame || !this || !data) goto unwind; @@ -3729,6 +3764,42 @@ client3_1_fgetxattr (call_frame_t *frame, xlator_t *this, goto unwind; } + local = GF_CALLOC (1, sizeof (*local), + gf_client_mt_clnt_local_t); + if (!local) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + op_errno = ENOMEM; + goto unwind; + } + frame->local = local; + + rsp_iobref = iobref_new (); + if (rsp_iobref == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + op_errno = ENOMEM; + goto unwind; + } + + rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); + if (rsp_iobuf == NULL) { + op_errno = ENOMEM; + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + goto unwind; + } + + iobref_add (rsp_iobref, rsp_iobuf); + iobuf_unref (rsp_iobuf); + rsphdr = &vector[0]; + rsphdr->iov_base = iobuf_ptr (rsp_iobuf); + rsphdr->iov_len = rsp_iobuf->iobuf_arena->iobuf_pool->page_size; + count = 1; + rsp_iobuf = NULL; + local->iobref = rsp_iobref; + rsp_iobref = NULL; + req.namelen = 1; /* Use it as a flag */ req.fd = fdctx->remote_fd; req.name = (char *)args->name; @@ -3740,8 +3811,8 @@ client3_1_fgetxattr (call_frame_t *frame, xlator_t *this, ret = client_submit_request (this, &req, frame, conf->fops, GFS3_OP_FGETXATTR, client3_1_fgetxattr_cbk, NULL, - xdr_from_fgetxattr_req, NULL, 0, - NULL, 0, NULL); + xdr_from_fgetxattr_req, rsphdr, count, + NULL, 0, local->iobref); if (ret) { op_errno = ENOTCONN; goto unwind; @@ -3749,7 +3820,21 @@ client3_1_fgetxattr (call_frame_t *frame, xlator_t *this, return 0; unwind: + local = frame->local; + frame->local = NULL; + STACK_UNWIND_STRICT (fgetxattr, frame, -1, op_errno, NULL); + + client_local_wipe (local); + + if (rsp_iobuf) { + iobuf_unref (rsp_iobuf); + } + + if (rsp_iobref) { + iobref_unref (rsp_iobref); + } + return 0; } @@ -3759,13 +3844,19 @@ int32_t client3_1_getxattr (call_frame_t *frame, xlator_t *this, void *data) { - clnt_conf_t *conf = NULL; - clnt_args_t *args = NULL; - gfs3_getxattr_req req = {{0,},}; - dict_t *dict = NULL; - int ret = 0; - int32_t op_ret = 0; - int op_errno = ESTALE; + clnt_conf_t *conf = NULL; + clnt_args_t *args = NULL; + gfs3_getxattr_req req = {{0,},}; + dict_t *dict = NULL; + int ret = 0; + int32_t op_ret = 0; + int op_errno = ESTALE; + int count = 0; + clnt_local_t *local = NULL; + struct iobref *rsp_iobref = NULL; + struct iobuf *rsp_iobuf = NULL; + struct iovec *rsphdr = NULL; + struct iovec vector[MAX_IOVEC] = {{0}, }; if (!frame || !this || !data) { op_ret = -1; @@ -3780,6 +3871,45 @@ client3_1_getxattr (call_frame_t *frame, xlator_t *this, goto unwind; } + local = GF_CALLOC (1, sizeof (*local), + gf_client_mt_clnt_local_t); + if (!local) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + op_ret = -1; + op_errno = ENOMEM; + goto unwind; + } + frame->local = local; + + rsp_iobref = iobref_new (); + if (rsp_iobref == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + op_ret = -1; + op_errno = ENOMEM; + goto unwind; + } + + rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); + if (rsp_iobuf == NULL) { + op_ret = -1; + op_errno = ENOMEM; + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + goto unwind; + } + + iobref_add (rsp_iobref, rsp_iobuf); + iobuf_unref (rsp_iobuf); + rsphdr = &vector[0]; + rsphdr->iov_base = iobuf_ptr (rsp_iobuf); + rsphdr->iov_len = rsp_iobuf->iobuf_arena->iobuf_pool->page_size; + count = 1; + rsp_iobuf = NULL; + local->iobref = rsp_iobref; + rsp_iobref = NULL; + memcpy (req.gfid, args->loc->inode->gfid, 16); req.namelen = 1; /* Use it as a flag */ req.path = (char *)args->loc->path; @@ -3814,8 +3944,8 @@ client3_1_getxattr (call_frame_t *frame, xlator_t *this, ret = client_submit_request (this, &req, frame, conf->fops, GFS3_OP_GETXATTR, client3_1_getxattr_cbk, NULL, - xdr_from_getxattr_req, NULL, 0, - NULL, 0, NULL); + xdr_from_getxattr_req, rsphdr, count, + NULL, 0, local->iobref); if (ret) { op_ret = -1; op_errno = ENOTCONN; @@ -3824,6 +3954,18 @@ client3_1_getxattr (call_frame_t *frame, xlator_t *this, return 0; unwind: + local = frame->local; + frame->local = NULL; + client_local_wipe (local); + + if (rsp_iobuf) { + iobuf_unref (rsp_iobuf); + } + + if (rsp_iobref) { + iobref_unref (rsp_iobref); + } + STACK_UNWIND_STRICT (getxattr, frame, op_ret, op_errno, dict); return 0; @@ -3835,12 +3977,18 @@ int32_t client3_1_xattrop (call_frame_t *frame, xlator_t *this, void *data) { - clnt_conf_t *conf = NULL; - clnt_args_t *args = NULL; - gfs3_xattrop_req req = {{0,},}; - int ret = 0; - size_t dict_len = 0; - int op_errno = ESTALE; + clnt_conf_t *conf = NULL; + clnt_args_t *args = NULL; + gfs3_xattrop_req req = {{0,},}; + int ret = 0; + size_t dict_len = 0; + int op_errno = ESTALE; + int count = 0; + clnt_local_t *local = NULL; + struct iobref *rsp_iobref = NULL; + struct iobuf *rsp_iobuf = NULL; + struct iovec *rsphdr = NULL; + struct iovec vector[MAX_IOVEC] = {{0}, }; if (!frame || !this || !data) goto unwind; @@ -3850,6 +3998,42 @@ client3_1_xattrop (call_frame_t *frame, xlator_t *this, if (!(args->loc && args->loc->inode)) goto unwind; + local = GF_CALLOC (1, sizeof (*local), + gf_client_mt_clnt_local_t); + if (!local) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + op_errno = ENOMEM; + goto unwind; + } + frame->local = local; + + rsp_iobref = iobref_new (); + if (rsp_iobref == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + op_errno = ENOMEM; + goto unwind; + } + + rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); + if (rsp_iobuf == NULL) { + op_errno = ENOMEM; + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + goto unwind; + } + + iobref_add (rsp_iobref, rsp_iobuf); + iobuf_unref (rsp_iobuf); + rsphdr = &vector[0]; + rsphdr->iov_base = iobuf_ptr (rsp_iobuf); + rsphdr->iov_len = rsp_iobuf->iobuf_arena->iobuf_pool->page_size; + count = 1; + rsp_iobuf = NULL; + local->iobref = rsp_iobref; + rsp_iobref = NULL; + memcpy (req.gfid, args->loc->inode->gfid, 16); if (args->dict) { ret = dict_allocate_and_serialize (args->dict, @@ -3871,8 +4055,8 @@ client3_1_xattrop (call_frame_t *frame, xlator_t *this, ret = client_submit_request (this, &req, frame, conf->fops, GFS3_OP_XATTROP, client3_1_xattrop_cbk, NULL, - xdr_from_xattrop_req, NULL, 0, - NULL, 0, NULL); + xdr_from_xattrop_req, rsphdr, count, + NULL, 0, local->iobref); if (ret) { op_errno = ENOTCONN; goto unwind; @@ -3883,10 +4067,25 @@ client3_1_xattrop (call_frame_t *frame, xlator_t *this, } return 0; unwind: + local = frame->local; + frame->local = NULL; + STACK_UNWIND_STRICT (xattrop, frame, -1, op_errno, NULL); + if (req.dict.dict_val) { GF_FREE (req.dict.dict_val); } + + client_local_wipe (local); + + if (rsp_iobuf) { + iobuf_unref (rsp_iobuf); + } + + if (rsp_iobref) { + iobref_unref (rsp_iobref); + } + return 0; } @@ -3896,13 +4095,19 @@ int32_t client3_1_fxattrop (call_frame_t *frame, xlator_t *this, void *data) { - clnt_args_t *args = NULL; - clnt_fd_ctx_t *fdctx = NULL; - clnt_conf_t *conf = NULL; - gfs3_fxattrop_req req = {{0,},}; - int op_errno = ESTALE; - int ret = 0; - size_t dict_len = 0; + clnt_args_t *args = NULL; + clnt_fd_ctx_t *fdctx = NULL; + clnt_conf_t *conf = NULL; + gfs3_fxattrop_req req = {{0,},}; + int op_errno = ESTALE; + int ret = 0; + size_t dict_len = 0; + int count = 0; + clnt_local_t *local = NULL; + struct iobref *rsp_iobref = NULL; + struct iobuf *rsp_iobuf = NULL; + struct iovec *rsphdr = NULL; + struct iovec vector[MAX_IOVEC] = {{0}, }; if (!frame || !this || !data) goto unwind; @@ -3935,6 +4140,42 @@ client3_1_fxattrop (call_frame_t *frame, xlator_t *this, req.flags = args->flags; memcpy (req.gfid, args->fd->inode->gfid, 16); + local = GF_CALLOC (1, sizeof (*local), + gf_client_mt_clnt_local_t); + if (!local) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + op_errno = ENOMEM; + goto unwind; + } + frame->local = local; + + rsp_iobref = iobref_new (); + if (rsp_iobref == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + op_errno = ENOMEM; + goto unwind; + } + + rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); + if (rsp_iobuf == NULL) { + op_errno = ENOMEM; + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + goto unwind; + } + + iobref_add (rsp_iobref, rsp_iobuf); + iobuf_unref (rsp_iobuf); + rsphdr = &vector[0]; + rsphdr->iov_base = iobuf_ptr (rsp_iobuf); + rsphdr->iov_len = rsp_iobuf->iobuf_arena->iobuf_pool->page_size; + count = 1; + rsp_iobuf = NULL; + local->iobref = rsp_iobref; + rsp_iobref = NULL; + if (args->dict) { ret = dict_allocate_and_serialize (args->dict, &req.dict.dict_val, @@ -3950,8 +4191,8 @@ client3_1_fxattrop (call_frame_t *frame, xlator_t *this, ret = client_submit_request (this, &req, frame, conf->fops, GFS3_OP_FXATTROP, client3_1_fxattrop_cbk, NULL, - xdr_from_fxattrop_req, NULL, 0, NULL, 0, - NULL); + xdr_from_fxattrop_req, rsphdr, count, + NULL, 0, local->iobref); if (ret) { op_errno = ENOTCONN; goto unwind; @@ -3963,10 +4204,25 @@ client3_1_fxattrop (call_frame_t *frame, xlator_t *this, return 0; unwind: + local = frame->local; + frame->local = NULL; + STACK_UNWIND_STRICT (fxattrop, frame, -1, op_errno, NULL); + if (req.dict.dict_val) { GF_FREE (req.dict.dict_val); } + + client_local_wipe (local); + + if (rsp_iobref) { + iobref_unref (rsp_iobref); + } + + if (rsp_iobuf) { + iobuf_unref (rsp_iobuf); + } + return 0; } @@ -4441,12 +4697,20 @@ int32_t client3_1_readdir (call_frame_t *frame, xlator_t *this, void *data) { - clnt_args_t *args = NULL; - clnt_fd_ctx_t *fdctx = NULL; - clnt_conf_t *conf = NULL; - gfs3_readdir_req req = {{0,},}; - int op_errno = ESTALE; + clnt_args_t *args = NULL; + clnt_fd_ctx_t *fdctx = NULL; + clnt_conf_t *conf = NULL; + gfs3_readdir_req req = {{0,},}; + gfs3_readdir_rsp rsp = {0, }; + clnt_local_t *local = NULL; + int op_errno = ESTALE; int ret = 0; + int count = 0; + struct iobref *rsp_iobref = NULL; + struct iobuf *rsp_iobuf = NULL; + struct iovec *rsphdr = NULL; + struct iovec vector[MAX_IOVEC] = {{0}, }; + int readdir_rsp_size = 0; if (!frame || !this || !data) goto unwind; @@ -4475,6 +4739,46 @@ client3_1_readdir (call_frame_t *frame, xlator_t *this, goto unwind; } + readdir_rsp_size = xdr_sizeof ((xdrproc_t) xdr_gfs3_readdir_rsp, &rsp) + + args->size; + + if ((readdir_rsp_size + GLUSTERFS_RPC_REPLY_SIZE + GLUSTERFS_RDMA_MAX_HEADER_SIZE) + > (GLUSTERFS_RDMA_INLINE_THRESHOLD)) { + local = GF_CALLOC (1, sizeof (*local), + gf_client_mt_clnt_local_t); + if (!local) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + op_errno = ENOMEM; + goto unwind; + } + frame->local = local; + + rsp_iobref = iobref_new (); + if (rsp_iobref == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + goto unwind; + } + + rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); + if (rsp_iobuf == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + goto unwind; + } + + iobref_add (rsp_iobref, rsp_iobuf); + iobuf_unref (rsp_iobuf); + rsphdr = &vector[0]; + rsphdr->iov_base = iobuf_ptr (rsp_iobuf); + rsphdr->iov_len + = rsp_iobuf->iobuf_arena->iobuf_pool->page_size; + count = 1; + rsp_iobuf = NULL; + local->iobref = rsp_iobref; + } + req.size = args->size; req.offset = args->offset; req.fd = fdctx->remote_fd; @@ -4482,15 +4786,30 @@ client3_1_readdir (call_frame_t *frame, xlator_t *this, ret = client_submit_request (this, &req, frame, conf->fops, GFS3_OP_READDIR, client3_1_readdir_cbk, NULL, - xdr_from_readdir_req, NULL, 0, NULL, 0, - NULL); + xdr_from_readdir_req, rsphdr, count, + NULL, 0, rsp_iobref); + rsp_iobref = NULL; + if (ret) { op_errno = ENOTCONN; goto unwind; } return 0; + unwind: + local = frame->local; + frame->local = NULL; + client_local_wipe (local); + + if (rsp_iobref != NULL) { + iobref_unref (rsp_iobref); + } + + if (rsp_iobuf != NULL) { + iobuf_unref (rsp_iobuf); + } + STACK_UNWIND_STRICT (readdir, frame, -1, op_errno, NULL); return 0; } @@ -4500,12 +4819,20 @@ int32_t client3_1_readdirp (call_frame_t *frame, xlator_t *this, void *data) { - clnt_args_t *args = NULL; - gfs3_readdirp_req req = {{0,},}; - clnt_fd_ctx_t *fdctx = NULL; - clnt_conf_t *conf = NULL; - int op_errno = ESTALE; - int ret = 0; + clnt_args_t *args = NULL; + gfs3_readdirp_req req = {{0,},}; + gfs3_readdirp_rsp rsp = {0,}; + clnt_fd_ctx_t *fdctx = NULL; + clnt_conf_t *conf = NULL; + int op_errno = ESTALE; + int ret = 0; + int count = 0; + int readdirp_rsp_size = 0; + struct iobref *rsp_iobref = NULL; + struct iobuf *rsp_iobuf = NULL; + struct iovec *rsphdr = NULL; + struct iovec vector[MAX_IOVEC] = {{0}, }; + clnt_local_t *local = NULL; if (!frame || !this || !data) goto unwind; @@ -4534,6 +4861,48 @@ client3_1_readdirp (call_frame_t *frame, xlator_t *this, goto unwind; } + readdirp_rsp_size = xdr_sizeof ((xdrproc_t) xdr_gfs3_readdirp_rsp, &rsp) + + args->size; + + if ((readdirp_rsp_size + GLUSTERFS_RPC_REPLY_SIZE + + GLUSTERFS_RDMA_MAX_HEADER_SIZE) + > (GLUSTERFS_RDMA_INLINE_THRESHOLD)) { + local = GF_CALLOC (1, sizeof (*local), + gf_client_mt_clnt_local_t); + if (!local) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + op_errno = ENOMEM; + goto unwind; + } + frame->local = local; + + rsp_iobref = iobref_new (); + if (rsp_iobref == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + goto unwind; + } + + rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); + if (rsp_iobuf == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + goto unwind; + } + + iobref_add (rsp_iobref, rsp_iobuf); + iobuf_unref (rsp_iobuf); + rsphdr = &vector[0]; + rsphdr->iov_base = iobuf_ptr (rsp_iobuf); + rsphdr->iov_len + = rsp_iobuf->iobuf_arena->iobuf_pool->page_size; + count = 1; + rsp_iobuf = NULL; + local->iobref = rsp_iobref; + rsp_iobref = NULL; + } + req.size = args->size; req.offset = args->offset; req.fd = fdctx->remote_fd; @@ -4541,8 +4910,8 @@ client3_1_readdirp (call_frame_t *frame, xlator_t *this, ret = client_submit_request (this, &req, frame, conf->fops, GFS3_OP_READDIRP, client3_1_readdirp_cbk, NULL, - xdr_from_readdirp_req, NULL, 0, NULL, 0, - NULL); + xdr_from_readdirp_req, rsphdr, count, NULL, + 0, rsp_iobref); if (ret) { op_errno = ENOTCONN; goto unwind; @@ -4550,6 +4919,18 @@ client3_1_readdirp (call_frame_t *frame, xlator_t *this, return 0; unwind: + local = frame->local; + frame->local = NULL; + client_local_wipe (local); + + if (rsp_iobref) { + iobref_unref (rsp_iobref); + } + + if (rsp_iobuf) { + iobuf_unref (rsp_iobuf); + } + STACK_UNWIND_STRICT (readdirp, frame, -1, op_errno, NULL); return 0; } -- cgit