From d19a72024c8cd5e40dc48df27c21fda701e76c54 Mon Sep 17 00:00:00 2001 From: Raghavendra G Date: Thu, 4 Nov 2010 02:33:19 +0000 Subject: rpc-transport/rdma: Fix 2KB as threshold size for msgs that can be transferred inline. - Any fop/mop that can result a reply whose size is greater than this threshold allocates and submits a buffer along with the request to receive reply. Signed-off-by: Raghavendra G Signed-off-by: Anand V. Avati BUG: 513 (Introduce 0 copy rdma) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=513 --- libglusterfs/src/glusterfs.h | 7 + rpc/rpc-transport/rdma/src/rdma.c | 20 +- rpc/rpc-transport/rdma/src/rdma.h | 16 +- xlators/protocol/client/src/Makefile.am | 2 +- xlators/protocol/client/src/client3_1-fops.c | 495 ++++++++++++++++++++++++--- 5 files changed, 469 insertions(+), 71 deletions(-) diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h index bf37f4fa5e0..2acd3d9d589 100644 --- a/libglusterfs/src/glusterfs.h +++ b/libglusterfs/src/glusterfs.h @@ -78,6 +78,13 @@ #define GLUSTERFS_INODELK_COUNT "glusterfs.inodelk-count" #define GLUSTERFS_ENTRYLK_COUNT "glusterfs.entrylk-count" #define GLUSTERFS_POSIXLK_COUNT "glusterfs.posixlk-count" +#define GLUSTERFS_RDMA_INLINE_THRESHOLD (2048) +#define GLUSTERFS_RDMA_MAX_HEADER_SIZE (228) /* (sizeof (rdma_header_t) \ + + RDMA_MAX_SEGMENTS \ + * sizeof (rdma_read_chunk_t)) + */ + +#define GLUSTERFS_RPC_REPLY_SIZE 24 #define ZR_FILE_CONTENT_REQUEST(key) (!strncmp(key, ZR_FILE_CONTENT_STR, \ ZR_FILE_CONTENT_STRLEN)) diff --git a/rpc/rpc-transport/rdma/src/rdma.c b/rpc/rpc-transport/rdma/src/rdma.c index 4301f4119a0..b2258fbb99d 100644 --- a/rpc/rpc-transport/rdma/src/rdma.c +++ b/rpc/rpc-transport/rdma/src/rdma.c @@ -957,7 +957,7 @@ __rdma_ioq_churn_request (rdma_peer_t *peer, rdma_ioq_t *entry, send_size = iov_length (entry->rpchdr, entry->rpchdr_count) + iov_length (entry->proghdr, entry->proghdr_count) - + RDMA_MAX_HEADER_SIZE; + + GLUSTERFS_RDMA_MAX_HEADER_SIZE; if (entry->prog_payload_count != 0) { prog_payload_length @@ -965,9 +965,10 @@ __rdma_ioq_churn_request (rdma_peer_t *peer, rdma_ioq_t *entry, entry->prog_payload_count); } - if (send_size > RDMA_INLINE_THRESHOLD) { + if (send_size > GLUSTERFS_RDMA_INLINE_THRESHOLD) { rtype = rdma_areadch; - } else if ((send_size + prog_payload_length) < RDMA_INLINE_THRESHOLD) { + } else if ((send_size + prog_payload_length) + < GLUSTERFS_RDMA_INLINE_THRESHOLD) { rtype = rdma_noch; } else if (entry->prog_payload_count != 0) { rtype = rdma_readch; @@ -1151,7 +1152,7 @@ __rdma_send_reply_inline (rdma_peer_t *peer, rdma_ioq_t *entry, * reply */ - if (send_size > RDMA_INLINE_THRESHOLD) { + if (send_size > GLUSTERFS_RDMA_INLINE_THRESHOLD) { ret = __rdma_send_error (peer, entry, post, reply_info, ERR_CHUNK); goto out; @@ -1492,14 +1493,15 @@ __rdma_send_reply_type_msg (rdma_peer_t *peer, rdma_ioq_t *entry, send_size = iov_length (entry->rpchdr, entry->rpchdr_count) + iov_length (entry->proghdr, entry->proghdr_count) - + RDMA_MAX_HEADER_SIZE; + + GLUSTERFS_RDMA_MAX_HEADER_SIZE; - if (send_size > RDMA_INLINE_THRESHOLD) { + if (send_size > GLUSTERFS_RDMA_INLINE_THRESHOLD) { gf_log (RDMA_LOG_NAME, GF_LOG_DEBUG, "client has provided only write chunks, but the " "combined size of rpc and program header (%d) is " "exceeding the size of msg that can be sent using " - "RDMA send (%d)", send_size, RDMA_INLINE_THRESHOLD); + "RDMA send (%d)", send_size, + GLUSTERFS_RDMA_INLINE_THRESHOLD); ret = __rdma_send_error (peer, entry, post, reply_info, ERR_CHUNK); @@ -3550,8 +3552,8 @@ rdma_options_init (rpc_transport_t *this) /* TODO: validate arguments from options below */ - options->send_size = this->ctx->page_size * 4; /* 512 KB */ - options->recv_size = this->ctx->page_size * 4; /* 512 KB */ + options->send_size = GLUSTERFS_RDMA_INLINE_THRESHOLD;/*this->ctx->page_size * 4; 512 KB*/ + options->recv_size = GLUSTERFS_RDMA_INLINE_THRESHOLD;/*this->ctx->page_size * 4; 512 KB*/ options->send_count = 32; options->recv_count = 32; diff --git a/rpc/rpc-transport/rdma/src/rdma.h b/rpc/rpc-transport/rdma/src/rdma.h index 4bb5a3759e1..e24ce76ccf1 100644 --- a/rpc/rpc-transport/rdma/src/rdma.h +++ b/rpc/rpc-transport/rdma/src/rdma.h @@ -41,11 +41,12 @@ /* FIXME: give appropriate values to these macros */ #define GF_DEFAULT_RDMA_LISTEN_PORT (GF_DEFAULT_BASE_PORT + 1) + +/* If you are changing RDMA_MAX_SEGMENTS, please make sure to update + * GLUSTERFS_RDMA_MAX_HEADER_SIZE defined in glusterfs.h . + */ #define RDMA_MAX_SEGMENTS 8 -#define RDMA_MAX_HEADER_SIZE (sizeof (rdma_header_t) \ - + RDMA_MAX_SEGMENTS \ - * sizeof (rdma_read_chunk_t)) -#define RDMA_INLINE_THRESHOLD (1024 * 128) + #define RDMA_VERSION 1 #define RDMA_POOL_SIZE 512 @@ -76,6 +77,9 @@ typedef enum rdma_chunktype { rdma_replych /* entire reply through rdma write */ }rdma_chunktype_t; +/* If you are modifying __rdma_header, please make sure to change + * GLUSTERFS_RDMA_MAX_HEADER_SIZE defined in glusterfs.h to reflect your changes + */ struct __rdma_header { uint32_t rm_xid; /* Mirrors the RPC header xid */ uint32_t rm_vers; /* Version of this protocol */ @@ -102,6 +106,10 @@ struct __rdma_header { } __attribute__((packed)); typedef struct __rdma_header rdma_header_t; +/* If you are modifying __rdma_segment or __rdma_read_chunk, please make sure + * to change GLUSTERFS_RDMA_MAX_HEADER_SIZE defined in glusterfs.h to reflect + * your changes. + */ struct __rdma_segment { uint32_t rs_handle; /* Registered memory handle */ uint32_t rs_length; /* Length of the chunk in bytes */ diff --git a/xlators/protocol/client/src/Makefile.am b/xlators/protocol/client/src/Makefile.am index f2dea68d70d..29c6e9dde76 100644 --- a/xlators/protocol/client/src/Makefile.am +++ b/xlators/protocol/client/src/Makefile.am @@ -14,4 +14,4 @@ noinst_HEADERS = client.h client-mem-types.h AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS) \ -I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) \ - -I$(top_srcdir)/rpc/xdr/src -I$(top_srcdir)/rpc/rpc-lib/src/ + -I$(top_srcdir)/rpc/xdr/src -I$(top_srcdir)/rpc/rpc-lib/src/ \ No newline at end of file diff --git a/xlators/protocol/client/src/client3_1-fops.c b/xlators/protocol/client/src/client3_1-fops.c index 184f67d0b7e..86e7fb24e0f 100644 --- a/xlators/protocol/client/src/client3_1-fops.c +++ b/xlators/protocol/client/src/client3_1-fops.c @@ -17,7 +17,6 @@ . */ - #ifndef _CONFIG_H #define _CONFIG_H #include "config.h" @@ -724,8 +723,11 @@ client3_1_getxattr_cbk (struct rpc_req *req, struct iovec *iov, int count, int op_errno = EINVAL; gfs3_getxattr_rsp rsp = {0,}; int ret = 0; + clnt_local_t *local = NULL; frame = myframe; + local = frame->local; + frame->local = NULL; if (-1 == req->rpc_status) { op_ret = -1; @@ -783,6 +785,8 @@ out: if (dict) dict_unref (dict); + client_local_wipe (local); + return 0; } @@ -798,8 +802,11 @@ client3_1_fgetxattr_cbk (struct rpc_req *req, struct iovec *iov, int count, int dict_len = 0; int op_ret = 0; int op_errno = EINVAL; + clnt_local_t *local = NULL; frame = myframe; + local = frame->local; + frame->local = NULL; if (-1 == req->rpc_status) { op_ret = -1; @@ -853,6 +860,9 @@ out: if (dict) dict_unref (dict); + if (local) + client_local_wipe (local); + return 0; } @@ -1155,8 +1165,11 @@ client3_1_xattrop_cbk (struct rpc_req *req, struct iovec *iov, int count, int op_ret = 0; int dict_len = 0; int op_errno = EINVAL; + clnt_local_t *local = NULL; frame = myframe; + local = frame->local; + frame->local = NULL; if (-1 == req->rpc_status) { op_ret = -1; @@ -1214,6 +1227,8 @@ out: if (dict) dict_unref (dict); + client_local_wipe (local); + return 0; } @@ -1229,8 +1244,11 @@ client3_1_fxattrop_cbk (struct rpc_req *req, struct iovec *iov, int count, int op_ret = 0; int dict_len = 0; int op_errno = 0; + clnt_local_t *local = NULL; frame = myframe; + local = frame->local; + frame->local = NULL; if (-1 == req->rpc_status) { op_ret = -1; @@ -1288,6 +1306,7 @@ out: if (dict) dict_unref (dict); + client_local_wipe (local); return 0; } @@ -1575,11 +1594,14 @@ client3_1_readdir_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) { call_frame_t *frame = NULL; - gfs3_readdir_rsp rsp = {0,}; - int32_t ret = 0; + gfs3_readdir_rsp rsp = {0,}; + int32_t ret = 0; + clnt_local_t *local = NULL; gf_dirent_t entries; frame = myframe; + local = frame->local; + frame->local = NULL; if (-1 == req->rpc_status) { rsp.op_ret = -1; @@ -1604,6 +1626,8 @@ out: STACK_UNWIND_STRICT (readdir, frame, rsp.op_ret, gf_error_to_errno (rsp.op_errno), &entries); + client_local_wipe (local); + if (rsp.op_ret != -1) { gf_dirent_free (&entries); } @@ -1619,11 +1643,14 @@ client3_1_readdirp_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) { call_frame_t *frame = NULL; - gfs3_readdirp_rsp rsp = {0,}; - int32_t ret = 0; + gfs3_readdirp_rsp rsp = {0,}; + int32_t ret = 0; + clnt_local_t *local = NULL; gf_dirent_t entries; frame = myframe; + local = frame->local; + frame->local = NULL; if (-1 == req->rpc_status) { rsp.op_ret = -1; @@ -1648,6 +1675,8 @@ out: STACK_UNWIND_STRICT (readdirp, frame, rsp.op_ret, gf_error_to_errno (rsp.op_errno), &entries); + client_local_wipe (local); + if (rsp.op_ret != -1) { gf_dirent_free (&entries); } @@ -1932,7 +1961,7 @@ client3_1_readv_cbk (struct rpc_req *req, struct iovec *iov, int count, { call_frame_t *frame = NULL; struct iobref *iobref = NULL; - struct iovec vector[MAX_IOVEC]; + struct iovec vector[MAX_IOVEC] = {{0}, }; struct iatt stat = {0,}; gfs3_read_rsp rsp = {0,}; int ret = 0, rspcount = 0; @@ -2168,7 +2197,7 @@ client3_1_lookup (call_frame_t *frame, xlator_t *this, size_t dict_len = 0; int op_errno = ESTALE; data_t *content = NULL; - struct iovec vector[MAX_IOVEC]; + struct iovec vector[MAX_IOVEC] = {{0}, }; int count = 0; struct iobref *rsp_iobref = NULL; struct iobuf *rsp_iobuf = NULL; @@ -2245,7 +2274,7 @@ client3_1_lookup (call_frame_t *frame, xlator_t *this, ret = client_submit_request (this, &req, frame, conf->fops, GFS3_OP_LOOKUP, client3_1_lookup_cbk, NULL, xdr_from_lookup_req, rsphdr, count, - NULL, 0, rsp_iobref); + NULL, 0, local->iobref); if (ret) { op_errno = ENOTCONN; @@ -3695,12 +3724,18 @@ int32_t client3_1_fgetxattr (call_frame_t *frame, xlator_t *this, void *data) { - clnt_args_t *args = NULL; - clnt_fd_ctx_t *fdctx = NULL; - clnt_conf_t *conf = NULL; - gfs3_fgetxattr_req req = {{0,},}; - int op_errno = ESTALE; - int ret = 0; + clnt_args_t *args = NULL; + clnt_fd_ctx_t *fdctx = NULL; + clnt_conf_t *conf = NULL; + gfs3_fgetxattr_req req = {{0,},}; + int op_errno = ESTALE; + int ret = 0; + int count = 0; + clnt_local_t *local = NULL; + struct iobref *rsp_iobref = NULL; + struct iobuf *rsp_iobuf = NULL; + struct iovec *rsphdr = NULL; + struct iovec vector[MAX_IOVEC] = {{0}, }; if (!frame || !this || !data) goto unwind; @@ -3729,6 +3764,42 @@ client3_1_fgetxattr (call_frame_t *frame, xlator_t *this, goto unwind; } + local = GF_CALLOC (1, sizeof (*local), + gf_client_mt_clnt_local_t); + if (!local) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + op_errno = ENOMEM; + goto unwind; + } + frame->local = local; + + rsp_iobref = iobref_new (); + if (rsp_iobref == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + op_errno = ENOMEM; + goto unwind; + } + + rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); + if (rsp_iobuf == NULL) { + op_errno = ENOMEM; + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + goto unwind; + } + + iobref_add (rsp_iobref, rsp_iobuf); + iobuf_unref (rsp_iobuf); + rsphdr = &vector[0]; + rsphdr->iov_base = iobuf_ptr (rsp_iobuf); + rsphdr->iov_len = rsp_iobuf->iobuf_arena->iobuf_pool->page_size; + count = 1; + rsp_iobuf = NULL; + local->iobref = rsp_iobref; + rsp_iobref = NULL; + req.namelen = 1; /* Use it as a flag */ req.fd = fdctx->remote_fd; req.name = (char *)args->name; @@ -3740,8 +3811,8 @@ client3_1_fgetxattr (call_frame_t *frame, xlator_t *this, ret = client_submit_request (this, &req, frame, conf->fops, GFS3_OP_FGETXATTR, client3_1_fgetxattr_cbk, NULL, - xdr_from_fgetxattr_req, NULL, 0, - NULL, 0, NULL); + xdr_from_fgetxattr_req, rsphdr, count, + NULL, 0, local->iobref); if (ret) { op_errno = ENOTCONN; goto unwind; @@ -3749,7 +3820,21 @@ client3_1_fgetxattr (call_frame_t *frame, xlator_t *this, return 0; unwind: + local = frame->local; + frame->local = NULL; + STACK_UNWIND_STRICT (fgetxattr, frame, -1, op_errno, NULL); + + client_local_wipe (local); + + if (rsp_iobuf) { + iobuf_unref (rsp_iobuf); + } + + if (rsp_iobref) { + iobref_unref (rsp_iobref); + } + return 0; } @@ -3759,13 +3844,19 @@ int32_t client3_1_getxattr (call_frame_t *frame, xlator_t *this, void *data) { - clnt_conf_t *conf = NULL; - clnt_args_t *args = NULL; - gfs3_getxattr_req req = {{0,},}; - dict_t *dict = NULL; - int ret = 0; - int32_t op_ret = 0; - int op_errno = ESTALE; + clnt_conf_t *conf = NULL; + clnt_args_t *args = NULL; + gfs3_getxattr_req req = {{0,},}; + dict_t *dict = NULL; + int ret = 0; + int32_t op_ret = 0; + int op_errno = ESTALE; + int count = 0; + clnt_local_t *local = NULL; + struct iobref *rsp_iobref = NULL; + struct iobuf *rsp_iobuf = NULL; + struct iovec *rsphdr = NULL; + struct iovec vector[MAX_IOVEC] = {{0}, }; if (!frame || !this || !data) { op_ret = -1; @@ -3780,6 +3871,45 @@ client3_1_getxattr (call_frame_t *frame, xlator_t *this, goto unwind; } + local = GF_CALLOC (1, sizeof (*local), + gf_client_mt_clnt_local_t); + if (!local) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + op_ret = -1; + op_errno = ENOMEM; + goto unwind; + } + frame->local = local; + + rsp_iobref = iobref_new (); + if (rsp_iobref == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + op_ret = -1; + op_errno = ENOMEM; + goto unwind; + } + + rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); + if (rsp_iobuf == NULL) { + op_ret = -1; + op_errno = ENOMEM; + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + goto unwind; + } + + iobref_add (rsp_iobref, rsp_iobuf); + iobuf_unref (rsp_iobuf); + rsphdr = &vector[0]; + rsphdr->iov_base = iobuf_ptr (rsp_iobuf); + rsphdr->iov_len = rsp_iobuf->iobuf_arena->iobuf_pool->page_size; + count = 1; + rsp_iobuf = NULL; + local->iobref = rsp_iobref; + rsp_iobref = NULL; + memcpy (req.gfid, args->loc->inode->gfid, 16); req.namelen = 1; /* Use it as a flag */ req.path = (char *)args->loc->path; @@ -3814,8 +3944,8 @@ client3_1_getxattr (call_frame_t *frame, xlator_t *this, ret = client_submit_request (this, &req, frame, conf->fops, GFS3_OP_GETXATTR, client3_1_getxattr_cbk, NULL, - xdr_from_getxattr_req, NULL, 0, - NULL, 0, NULL); + xdr_from_getxattr_req, rsphdr, count, + NULL, 0, local->iobref); if (ret) { op_ret = -1; op_errno = ENOTCONN; @@ -3824,6 +3954,18 @@ client3_1_getxattr (call_frame_t *frame, xlator_t *this, return 0; unwind: + local = frame->local; + frame->local = NULL; + client_local_wipe (local); + + if (rsp_iobuf) { + iobuf_unref (rsp_iobuf); + } + + if (rsp_iobref) { + iobref_unref (rsp_iobref); + } + STACK_UNWIND_STRICT (getxattr, frame, op_ret, op_errno, dict); return 0; @@ -3835,12 +3977,18 @@ int32_t client3_1_xattrop (call_frame_t *frame, xlator_t *this, void *data) { - clnt_conf_t *conf = NULL; - clnt_args_t *args = NULL; - gfs3_xattrop_req req = {{0,},}; - int ret = 0; - size_t dict_len = 0; - int op_errno = ESTALE; + clnt_conf_t *conf = NULL; + clnt_args_t *args = NULL; + gfs3_xattrop_req req = {{0,},}; + int ret = 0; + size_t dict_len = 0; + int op_errno = ESTALE; + int count = 0; + clnt_local_t *local = NULL; + struct iobref *rsp_iobref = NULL; + struct iobuf *rsp_iobuf = NULL; + struct iovec *rsphdr = NULL; + struct iovec vector[MAX_IOVEC] = {{0}, }; if (!frame || !this || !data) goto unwind; @@ -3850,6 +3998,42 @@ client3_1_xattrop (call_frame_t *frame, xlator_t *this, if (!(args->loc && args->loc->inode)) goto unwind; + local = GF_CALLOC (1, sizeof (*local), + gf_client_mt_clnt_local_t); + if (!local) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + op_errno = ENOMEM; + goto unwind; + } + frame->local = local; + + rsp_iobref = iobref_new (); + if (rsp_iobref == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + op_errno = ENOMEM; + goto unwind; + } + + rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); + if (rsp_iobuf == NULL) { + op_errno = ENOMEM; + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + goto unwind; + } + + iobref_add (rsp_iobref, rsp_iobuf); + iobuf_unref (rsp_iobuf); + rsphdr = &vector[0]; + rsphdr->iov_base = iobuf_ptr (rsp_iobuf); + rsphdr->iov_len = rsp_iobuf->iobuf_arena->iobuf_pool->page_size; + count = 1; + rsp_iobuf = NULL; + local->iobref = rsp_iobref; + rsp_iobref = NULL; + memcpy (req.gfid, args->loc->inode->gfid, 16); if (args->dict) { ret = dict_allocate_and_serialize (args->dict, @@ -3871,8 +4055,8 @@ client3_1_xattrop (call_frame_t *frame, xlator_t *this, ret = client_submit_request (this, &req, frame, conf->fops, GFS3_OP_XATTROP, client3_1_xattrop_cbk, NULL, - xdr_from_xattrop_req, NULL, 0, - NULL, 0, NULL); + xdr_from_xattrop_req, rsphdr, count, + NULL, 0, local->iobref); if (ret) { op_errno = ENOTCONN; goto unwind; @@ -3883,10 +4067,25 @@ client3_1_xattrop (call_frame_t *frame, xlator_t *this, } return 0; unwind: + local = frame->local; + frame->local = NULL; + STACK_UNWIND_STRICT (xattrop, frame, -1, op_errno, NULL); + if (req.dict.dict_val) { GF_FREE (req.dict.dict_val); } + + client_local_wipe (local); + + if (rsp_iobuf) { + iobuf_unref (rsp_iobuf); + } + + if (rsp_iobref) { + iobref_unref (rsp_iobref); + } + return 0; } @@ -3896,13 +4095,19 @@ int32_t client3_1_fxattrop (call_frame_t *frame, xlator_t *this, void *data) { - clnt_args_t *args = NULL; - clnt_fd_ctx_t *fdctx = NULL; - clnt_conf_t *conf = NULL; - gfs3_fxattrop_req req = {{0,},}; - int op_errno = ESTALE; - int ret = 0; - size_t dict_len = 0; + clnt_args_t *args = NULL; + clnt_fd_ctx_t *fdctx = NULL; + clnt_conf_t *conf = NULL; + gfs3_fxattrop_req req = {{0,},}; + int op_errno = ESTALE; + int ret = 0; + size_t dict_len = 0; + int count = 0; + clnt_local_t *local = NULL; + struct iobref *rsp_iobref = NULL; + struct iobuf *rsp_iobuf = NULL; + struct iovec *rsphdr = NULL; + struct iovec vector[MAX_IOVEC] = {{0}, }; if (!frame || !this || !data) goto unwind; @@ -3935,6 +4140,42 @@ client3_1_fxattrop (call_frame_t *frame, xlator_t *this, req.flags = args->flags; memcpy (req.gfid, args->fd->inode->gfid, 16); + local = GF_CALLOC (1, sizeof (*local), + gf_client_mt_clnt_local_t); + if (!local) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + op_errno = ENOMEM; + goto unwind; + } + frame->local = local; + + rsp_iobref = iobref_new (); + if (rsp_iobref == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + op_errno = ENOMEM; + goto unwind; + } + + rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); + if (rsp_iobuf == NULL) { + op_errno = ENOMEM; + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + goto unwind; + } + + iobref_add (rsp_iobref, rsp_iobuf); + iobuf_unref (rsp_iobuf); + rsphdr = &vector[0]; + rsphdr->iov_base = iobuf_ptr (rsp_iobuf); + rsphdr->iov_len = rsp_iobuf->iobuf_arena->iobuf_pool->page_size; + count = 1; + rsp_iobuf = NULL; + local->iobref = rsp_iobref; + rsp_iobref = NULL; + if (args->dict) { ret = dict_allocate_and_serialize (args->dict, &req.dict.dict_val, @@ -3950,8 +4191,8 @@ client3_1_fxattrop (call_frame_t *frame, xlator_t *this, ret = client_submit_request (this, &req, frame, conf->fops, GFS3_OP_FXATTROP, client3_1_fxattrop_cbk, NULL, - xdr_from_fxattrop_req, NULL, 0, NULL, 0, - NULL); + xdr_from_fxattrop_req, rsphdr, count, + NULL, 0, local->iobref); if (ret) { op_errno = ENOTCONN; goto unwind; @@ -3963,10 +4204,25 @@ client3_1_fxattrop (call_frame_t *frame, xlator_t *this, return 0; unwind: + local = frame->local; + frame->local = NULL; + STACK_UNWIND_STRICT (fxattrop, frame, -1, op_errno, NULL); + if (req.dict.dict_val) { GF_FREE (req.dict.dict_val); } + + client_local_wipe (local); + + if (rsp_iobref) { + iobref_unref (rsp_iobref); + } + + if (rsp_iobuf) { + iobuf_unref (rsp_iobuf); + } + return 0; } @@ -4441,12 +4697,20 @@ int32_t client3_1_readdir (call_frame_t *frame, xlator_t *this, void *data) { - clnt_args_t *args = NULL; - clnt_fd_ctx_t *fdctx = NULL; - clnt_conf_t *conf = NULL; - gfs3_readdir_req req = {{0,},}; - int op_errno = ESTALE; + clnt_args_t *args = NULL; + clnt_fd_ctx_t *fdctx = NULL; + clnt_conf_t *conf = NULL; + gfs3_readdir_req req = {{0,},}; + gfs3_readdir_rsp rsp = {0, }; + clnt_local_t *local = NULL; + int op_errno = ESTALE; int ret = 0; + int count = 0; + struct iobref *rsp_iobref = NULL; + struct iobuf *rsp_iobuf = NULL; + struct iovec *rsphdr = NULL; + struct iovec vector[MAX_IOVEC] = {{0}, }; + int readdir_rsp_size = 0; if (!frame || !this || !data) goto unwind; @@ -4475,6 +4739,46 @@ client3_1_readdir (call_frame_t *frame, xlator_t *this, goto unwind; } + readdir_rsp_size = xdr_sizeof ((xdrproc_t) xdr_gfs3_readdir_rsp, &rsp) + + args->size; + + if ((readdir_rsp_size + GLUSTERFS_RPC_REPLY_SIZE + GLUSTERFS_RDMA_MAX_HEADER_SIZE) + > (GLUSTERFS_RDMA_INLINE_THRESHOLD)) { + local = GF_CALLOC (1, sizeof (*local), + gf_client_mt_clnt_local_t); + if (!local) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + op_errno = ENOMEM; + goto unwind; + } + frame->local = local; + + rsp_iobref = iobref_new (); + if (rsp_iobref == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + goto unwind; + } + + rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); + if (rsp_iobuf == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + goto unwind; + } + + iobref_add (rsp_iobref, rsp_iobuf); + iobuf_unref (rsp_iobuf); + rsphdr = &vector[0]; + rsphdr->iov_base = iobuf_ptr (rsp_iobuf); + rsphdr->iov_len + = rsp_iobuf->iobuf_arena->iobuf_pool->page_size; + count = 1; + rsp_iobuf = NULL; + local->iobref = rsp_iobref; + } + req.size = args->size; req.offset = args->offset; req.fd = fdctx->remote_fd; @@ -4482,15 +4786,30 @@ client3_1_readdir (call_frame_t *frame, xlator_t *this, ret = client_submit_request (this, &req, frame, conf->fops, GFS3_OP_READDIR, client3_1_readdir_cbk, NULL, - xdr_from_readdir_req, NULL, 0, NULL, 0, - NULL); + xdr_from_readdir_req, rsphdr, count, + NULL, 0, rsp_iobref); + rsp_iobref = NULL; + if (ret) { op_errno = ENOTCONN; goto unwind; } return 0; + unwind: + local = frame->local; + frame->local = NULL; + client_local_wipe (local); + + if (rsp_iobref != NULL) { + iobref_unref (rsp_iobref); + } + + if (rsp_iobuf != NULL) { + iobuf_unref (rsp_iobuf); + } + STACK_UNWIND_STRICT (readdir, frame, -1, op_errno, NULL); return 0; } @@ -4500,12 +4819,20 @@ int32_t client3_1_readdirp (call_frame_t *frame, xlator_t *this, void *data) { - clnt_args_t *args = NULL; - gfs3_readdirp_req req = {{0,},}; - clnt_fd_ctx_t *fdctx = NULL; - clnt_conf_t *conf = NULL; - int op_errno = ESTALE; - int ret = 0; + clnt_args_t *args = NULL; + gfs3_readdirp_req req = {{0,},}; + gfs3_readdirp_rsp rsp = {0,}; + clnt_fd_ctx_t *fdctx = NULL; + clnt_conf_t *conf = NULL; + int op_errno = ESTALE; + int ret = 0; + int count = 0; + int readdirp_rsp_size = 0; + struct iobref *rsp_iobref = NULL; + struct iobuf *rsp_iobuf = NULL; + struct iovec *rsphdr = NULL; + struct iovec vector[MAX_IOVEC] = {{0}, }; + clnt_local_t *local = NULL; if (!frame || !this || !data) goto unwind; @@ -4534,6 +4861,48 @@ client3_1_readdirp (call_frame_t *frame, xlator_t *this, goto unwind; } + readdirp_rsp_size = xdr_sizeof ((xdrproc_t) xdr_gfs3_readdirp_rsp, &rsp) + + args->size; + + if ((readdirp_rsp_size + GLUSTERFS_RPC_REPLY_SIZE + + GLUSTERFS_RDMA_MAX_HEADER_SIZE) + > (GLUSTERFS_RDMA_INLINE_THRESHOLD)) { + local = GF_CALLOC (1, sizeof (*local), + gf_client_mt_clnt_local_t); + if (!local) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + op_errno = ENOMEM; + goto unwind; + } + frame->local = local; + + rsp_iobref = iobref_new (); + if (rsp_iobref == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + goto unwind; + } + + rsp_iobuf = iobuf_get (this->ctx->iobuf_pool); + if (rsp_iobuf == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "out of memory"); + goto unwind; + } + + iobref_add (rsp_iobref, rsp_iobuf); + iobuf_unref (rsp_iobuf); + rsphdr = &vector[0]; + rsphdr->iov_base = iobuf_ptr (rsp_iobuf); + rsphdr->iov_len + = rsp_iobuf->iobuf_arena->iobuf_pool->page_size; + count = 1; + rsp_iobuf = NULL; + local->iobref = rsp_iobref; + rsp_iobref = NULL; + } + req.size = args->size; req.offset = args->offset; req.fd = fdctx->remote_fd; @@ -4541,8 +4910,8 @@ client3_1_readdirp (call_frame_t *frame, xlator_t *this, ret = client_submit_request (this, &req, frame, conf->fops, GFS3_OP_READDIRP, client3_1_readdirp_cbk, NULL, - xdr_from_readdirp_req, NULL, 0, NULL, 0, - NULL); + xdr_from_readdirp_req, rsphdr, count, NULL, + 0, rsp_iobref); if (ret) { op_errno = ENOTCONN; goto unwind; @@ -4550,6 +4919,18 @@ client3_1_readdirp (call_frame_t *frame, xlator_t *this, return 0; unwind: + local = frame->local; + frame->local = NULL; + client_local_wipe (local); + + if (rsp_iobref) { + iobref_unref (rsp_iobref); + } + + if (rsp_iobuf) { + iobuf_unref (rsp_iobuf); + } + STACK_UNWIND_STRICT (readdirp, frame, -1, op_errno, NULL); return 0; } -- cgit