summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--rpc/rpc-transport/socket/src/socket.c74
-rw-r--r--rpc/rpc-transport/socket/src/socket.h2
-rw-r--r--xlators/protocol/client/src/client3_1-fops.c15
-rw-r--r--xlators/protocol/server/src/server3_1-fops.c60
4 files changed, 123 insertions, 28 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 6c2d909e43a..5e65755d813 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -815,6 +815,7 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto
uint32_t remaining_size = 0;
ssize_t readsize = 0;
size_t size = 0;
+ char *proghdr_buf = NULL;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -871,10 +872,10 @@ __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vecto
case SP_STATE_READ_VERFBYTES:
sp_state_read_verfbytes:
+ proghdr_buf = priv->incoming.frag.fragcurrent;
priv->incoming.frag.call_body.request.vector_sizer_state =
vector_sizer (priv->incoming.frag.call_body.request.vector_sizer_state,
- &readsize,
- priv->incoming.frag.fragcurrent);
+ &readsize, proghdr_buf);
__socket_proto_init_pending (priv, readsize);
priv->incoming.frag.call_body.request.vector_state
= SP_STATE_READING_PROGHDR;
@@ -886,8 +887,7 @@ sp_state_read_verfbytes:
sp_state_reading_proghdr:
priv->incoming.frag.call_body.request.vector_sizer_state =
vector_sizer (priv->incoming.frag.call_body.request.vector_sizer_state,
- &readsize,
- priv->incoming.frag.fragcurrent);
+ &readsize, proghdr_buf);
if (readsize == 0) {
priv->incoming.frag.call_body.request.vector_state =
SP_STATE_READ_PROGHDR;
@@ -1038,12 +1038,14 @@ out:
inline int
__socket_read_accepted_successful_reply (rpc_transport_t *this)
{
- socket_private_t *priv = NULL;
- int ret = 0;
- struct iobuf *iobuf = NULL;
- uint32_t gluster_read_rsp_hdr_len = 0;
- gfs3_read_rsp read_rsp = {0, };
- size_t size = 0;
+ socket_private_t *priv = NULL;
+ int ret = 0;
+ struct iobuf *iobuf = NULL;
+ gfs3_read_rsp read_rsp = {0, };
+ ssize_t size = 0;
+ ssize_t default_read_size = 0;
+ char *proghdr_buf = NULL;
+ XDR xdr;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -1053,16 +1055,12 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
switch (priv->incoming.frag.call_body.reply.accepted_success_state) {
case SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT:
- gluster_read_rsp_hdr_len = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp,
- &read_rsp);
+ default_read_size = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp,
+ &read_rsp);
- if (gluster_read_rsp_hdr_len == 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "xdr_sizeof on gfs3_read_rsp failed");
- ret = -1;
- goto out;
- }
- __socket_proto_init_pending (priv, gluster_read_rsp_hdr_len);
+ proghdr_buf = priv->incoming.frag.fragcurrent;
+
+ __socket_proto_init_pending (priv, default_read_size);
priv->incoming.frag.call_body.reply.accepted_success_state
= SP_STATE_READING_PROC_HEADER;
@@ -1072,9 +1070,40 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
case SP_STATE_READING_PROC_HEADER:
__socket_proto_read (priv, ret);
+ /* there can be 'xdata' in read response, figure it out */
+ xdrmem_create (&xdr, proghdr_buf, default_read_size,
+ XDR_DECODE);
+
+ /* This will fail if there is xdata sent from server, if not,
+ well and good, we don't need to worry about */
+ xdr_gfs3_read_rsp (&xdr, &read_rsp);
+
+ if (read_rsp.xdata.xdata_val)
+ free (read_rsp.xdata.xdata_val);
+
+ /* need to round off to proper roof (%4), as XDR packing pads
+ the end of opaque object with '0' */
+ size = roof (read_rsp.xdata.xdata_len, 4);
+
+ if (!size) {
+ priv->incoming.frag.call_body.reply.accepted_success_state
+ = SP_STATE_READ_PROC_OPAQUE;
+ goto read_proc_opaque;
+ }
+
+ __socket_proto_init_pending (priv, size);
+
priv->incoming.frag.call_body.reply.accepted_success_state
- = SP_STATE_READ_PROC_HEADER;
+ = SP_STATE_READING_PROC_OPAQUE;
+
+ case SP_STATE_READING_PROC_OPAQUE:
+ __socket_proto_read (priv, ret);
+
+ priv->incoming.frag.call_body.reply.accepted_success_state
+ = SP_STATE_READ_PROC_OPAQUE;
+ case SP_STATE_READ_PROC_OPAQUE:
+ read_proc_opaque:
if (priv->incoming.payload_vector.iov_base == NULL) {
size = (RPC_FRAGSIZE (priv->incoming.fraghdr) -
@@ -1107,6 +1136,9 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
priv->incoming.frag.fragcurrent
= priv->incoming.payload_vector.iov_base;
+ priv->incoming.frag.call_body.reply.accepted_success_state
+ = SP_STATE_READ_PROC_HEADER;
+
/* fall through */
case SP_STATE_READ_PROC_HEADER:
@@ -1529,7 +1561,6 @@ __socket_proto_state_machine (rpc_transport_t *this,
case SP_STATE_READ_FRAGHDR:
priv->incoming.fraghdr = ntoh32 (priv->incoming.fraghdr);
- priv->incoming.record_state = SP_STATE_READING_FRAG;
priv->incoming.total_bytes_read
+= RPC_FRAGSIZE(priv->incoming.fraghdr);
iobuf = iobuf_get2 (this->ctx->iobuf_pool,
@@ -1543,6 +1574,7 @@ __socket_proto_state_machine (rpc_transport_t *this,
priv->incoming.iobuf = iobuf;
priv->incoming.iobuf_size = 0;
priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf);
+ priv->incoming.record_state = SP_STATE_READING_FRAG;
/* fall through */
case SP_STATE_READING_FRAG:
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index 0c897bd2ec0..6d6802a541f 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -115,6 +115,8 @@ typedef enum {
typedef enum {
SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT,
SP_STATE_READING_PROC_HEADER,
+ SP_STATE_READING_PROC_OPAQUE,
+ SP_STATE_READ_PROC_OPAQUE,
SP_STATE_READ_PROC_HEADER,
} sp_rpcfrag_vectored_reply_accepted_success_state_t;
diff --git a/xlators/protocol/client/src/client3_1-fops.c b/xlators/protocol/client/src/client3_1-fops.c
index e17a650d3c4..07d55c8956b 100644
--- a/xlators/protocol/client/src/client3_1-fops.c
+++ b/xlators/protocol/client/src/client3_1-fops.c
@@ -2693,6 +2693,10 @@ client3_1_readv_cbk (struct rpc_req *req, struct iovec *iov, int count,
(rsp.xdata.xdata_len), ret,
rsp.op_errno, out);
+#ifdef GF_TESTING_IO_XDATA
+ dict_dump (xdata);
+#endif
+
out:
if (rsp.op_ret == -1) {
gf_log (this->name, GF_LOG_WARNING,
@@ -4010,6 +4014,17 @@ client3_1_writev (call_frame_t *frame, xlator_t *this, void *data)
memcpy (req.gfid, args->fd->inode->gfid, 16);
+#ifdef GF_TESTING_IO_XDATA
+ if (!args->xdata)
+ args->xdata = dict_new ();
+
+ ret = dict_set_str (args->xdata, "testing-the-xdata-key",
+ "testing-the-xdata-value");
+#endif
+
+ GF_PROTOCOL_DICT_SERIALIZE (this, args->xdata, (&req.xdata.xdata_val),
+ req.xdata.xdata_len, op_errno, unwind);
+
ret = client_submit_vec_request (this, &req, frame, conf->fops,
GFS3_OP_WRITE, client3_1_writev_cbk,
args->vector, args->count,
diff --git a/xlators/protocol/server/src/server3_1-fops.c b/xlators/protocol/server/src/server3_1-fops.c
index 2e0bbb4c8cd..06283461d06 100644
--- a/xlators/protocol/server/src/server3_1-fops.c
+++ b/xlators/protocol/server/src/server3_1-fops.c
@@ -1455,6 +1455,16 @@ server_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
op_ret, strerror (op_errno));
}
+#ifdef GF_TESTING_IO_XDATA
+ {
+ int ret = 0;
+ if (!xdata)
+ xdata = dict_new ();
+
+ ret = dict_set_str (xdata, "testing-the-xdata-key",
+ "testing-xdata-value");
+ }
+#endif
GF_PROTOCOL_DICT_SERIALIZE (this, xdata, (&rsp.xdata.xdata_val),
rsp.xdata.xdata_len, op_errno, out);
@@ -3392,6 +3402,10 @@ server_writev (rpcsvc_request_t *req)
(args.xdata.xdata_len), ret,
op_errno, out);
+#ifdef GF_TESTING_IO_XDATA
+ dict_dump (state->xdata);
+#endif
+
ret = 0;
resolve_and_resume (frame, server_writev_resume);
out:
@@ -3413,26 +3427,58 @@ server_writev_vec (rpcsvc_request_t *req, struct iovec *payload,
}
#define SERVER3_1_VECWRITE_START 0
-#define SERVER3_1_VECWRITE_READINGHDR 1
+#define SERVER3_1_VECWRITE_READING_HDR 1
+#define SERVER3_1_VECWRITE_READING_OPAQUE 2
int
server_writev_vecsizer (int state, ssize_t *readsize, char *addr)
{
- int nextstate = 0;
- gfs3_write_req write_req = {{0,},};
+ ssize_t size = 0;
+ int nextstate = 0;
+ gfs3_write_req write_req = {{0,},};
+ XDR xdr;
switch (state) {
case SERVER3_1_VECWRITE_START:
- *readsize = xdr_sizeof ((xdrproc_t) xdr_gfs3_write_req, &write_req);
- nextstate = SERVER3_1_VECWRITE_READINGHDR;
+ size = xdr_sizeof ((xdrproc_t) xdr_gfs3_write_req,
+ &write_req);
+ *readsize = size;
+ nextstate = SERVER3_1_VECWRITE_READING_HDR;
+ break;
+ case SERVER3_1_VECWRITE_READING_HDR:
+ size = xdr_sizeof ((xdrproc_t) xdr_gfs3_write_req,
+ &write_req);
+
+ xdrmem_create (&xdr, addr, size, XDR_DECODE);
+
+ /* This will fail if there is xdata sent from client, if not,
+ well and good */
+ xdr_gfs3_write_req (&xdr, &write_req);
+
+ /* need to round off to proper roof (%4), as XDR packing pads
+ the end of opaque object with '0' */
+ size = roof (write_req.xdata.xdata_len, 4);
+
+ *readsize = size;
+
+ if (!size)
+ nextstate = SERVER3_1_VECWRITE_START;
+ else
+ nextstate = SERVER3_1_VECWRITE_READING_OPAQUE;
+
+ if (write_req.xdata.xdata_val)
+ free (write_req.xdata.xdata_val);
+
break;
- case SERVER3_1_VECWRITE_READINGHDR:
+
+ case SERVER3_1_VECWRITE_READING_OPAQUE:
*readsize = 0;
nextstate = SERVER3_1_VECWRITE_START;
break;
default:
- gf_log ("server3_1", GF_LOG_ERROR, "wrong state: %d", state);
+ gf_log ("server", GF_LOG_ERROR, "wrong state: %d", state);
}
+
return nextstate;
}