diff options
| author | Raghavendra G <raghavendra@gluster.com> | 2010-07-28 06:23:31 +0000 | 
|---|---|---|
| committer | Anand V. Avati <avati@dev.gluster.com> | 2010-07-28 05:08:26 -0700 | 
| commit | 40d3ad15856c88d93d16264aa1f6bb55806aafde (patch) | |
| tree | 1290d311c9001e3954176f005b89a2e438321bd9 /rpc/rpc-transport | |
| parent | b8692a3c3cc8e0dab404664e0aeb6ebaea6ab6e5 (diff) | |
changes to rpc
- use mem-pool for requests and saved_frames.
  - preserve the rpc_req structure till rpc invokes program's reply.
    This will enable us to store transport specific data that has to
    last till reply has come (eg., memory regions of chunk lists in
    case of rdma).
  - change signature of rpc_clnt_submit to accept rsphdr_vector and
    rsppayload_vector. The buffers pointed by these vectors will be
    from iobufs and these iobufs are added to an iobref which should
    also be passed as an arguement to rpc_clnt_submit.
Signed-off-by: Raghavendra G <raghavendra@gluster.com>
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
BUG: 875 (Implement a new protocol to provide proper backward/forward compatibility)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=875
Diffstat (limited to 'rpc/rpc-transport')
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 129 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 4 | 
2 files changed, 103 insertions, 30 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 4396454a33a..baeee735bb8 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -370,12 +370,13 @@ __socket_reset (rpc_transport_t *this)          /* TODO: use mem-pool on incoming data */ -        if (priv->incoming.iobuf) { -                iobuf_unref (priv->incoming.iobuf); +        if (priv->incoming.iobref) { +                iobref_unref (priv->incoming.iobref); +                priv->incoming.iobref = NULL;          } -        if (priv->incoming.vectoriob) { -                iobuf_unref (priv->incoming.vectoriob); +        if (priv->incoming.iobuf) { +                iobuf_unref (priv->incoming.iobuf);          }          memset (&priv->incoming, 0, sizeof (priv->incoming)); @@ -699,7 +700,7 @@ __socket_read_vectored_request (rpc_transport_t *this)                  /* fall through */          case SP_STATE_READ_VERFBYTES: -                if (priv->incoming.vectoriob == NULL) { +                if (priv->incoming.payload_vector.iov_base == NULL) {                          iobuf = iobuf_get (this->ctx->iobuf_pool);                          if (!iobuf) {                                  gf_log (this->name, GF_LOG_ERROR, @@ -710,7 +711,23 @@ __socket_read_vectored_request (rpc_transport_t *this)                                  break;                          } -                        priv->incoming.vectoriob = iobuf; +                        if (priv->incoming.iobref == NULL) { +                                priv->incoming.iobref = iobref_new (); +                                if (priv->incoming.iobref == NULL) { +                                        gf_log (this->name, GF_LOG_ERROR, +                                                "out of memory"); +                                        ret = -1; +                                        iobuf_unref (iobuf); +                                        break; +                                } +                        } + +                        iobref_add (priv->incoming.iobref, iobuf); +                        iobuf_unref (iobuf); + +                        priv->incoming.payload_vector.iov_base +                                = iobuf_ptr (iobuf); +                          priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf);                  } @@ -735,9 +752,10 @@ __socket_read_vectored_request (rpc_transport_t *this)                          && RPC_LASTFRAG (priv->incoming.fraghdr))) {                          priv->incoming.frag.call_body.request.vector_state                                  = SP_STATE_VECTORED_REQUEST_INIT; -                        priv->incoming.vectoriob_size +                        priv->incoming.payload_vector.iov_len                                  = (unsigned long)priv->incoming.frag.fragcurrent -                                - (unsigned long)iobuf_ptr (priv->incoming.vectoriob); +                                - (unsigned long) +                                priv->incoming.payload_vector.iov_base;                  }                  break;          } @@ -846,18 +864,31 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)                  /* fall through */          case SP_STATE_READ_PROC_HEADER: -                if (priv->incoming.vectoriob == NULL) { +                if (priv->incoming.payload_vector.iov_base == NULL) {                          iobuf = iobuf_get (this->ctx->iobuf_pool);                          if (iobuf == NULL) {                                  ret = -1;                                  goto out;                          } -                        priv->incoming.vectoriob = iobuf; +                        if (priv->incoming.iobref == NULL) { +                                priv->incoming.iobref = iobref_new (); +                                if (priv->incoming.iobref == NULL) { +                                        ret = -1; +                                        iobuf_unref (iobuf); +                                        goto out; +                                } +                        } + +                        iobref_add (priv->incoming.iobref, iobuf); +                        iobuf_unref (iobuf); + +                        priv->incoming.payload_vector.iov_base +                                = iobuf_ptr (iobuf);                  }                  priv->incoming.frag.fragcurrent -                        = iobuf_ptr (priv->incoming.vectoriob); +                        = priv->incoming.payload_vector.iov_base;                  /* now read the entire remaining msg into new iobuf */                  ret = __socket_read_simple_msg (this); @@ -1080,9 +1111,11 @@ __socket_read_reply (rpc_transport_t *this)          if ((request_info->prognum == GLUSTER3_1_FOP_PROGRAM)              && (request_info->procnum == GF_FOP_READ)) { -                if (request_info->rsp.rspbuf != NULL) { -                        priv->incoming.vectoriob -                                = iobuf_ref (request_info->rsp.rspbuf); +                if (request_info->rsp.rsp_payload_count != 0) { +                        priv->incoming.iobref +                                = iobref_ref (request_info->rsp.rsp_iobref); +                        priv->incoming.payload_vector +                                = *request_info->rsp.rsp_payload;                  }                  ret = __socket_read_vectored_reply (this); @@ -1154,15 +1187,19 @@ __socket_read_frag (rpc_transport_t *this)  inline  void __socket_reset_priv (socket_private_t *priv)  { +        if (priv->incoming.iobref) { +                iobref_unref (priv->incoming.iobref); +                priv->incoming.iobref = NULL; +        } +          if (priv->incoming.iobuf) {                  iobuf_unref (priv->incoming.iobuf); -                priv->incoming.iobuf = NULL;          } -        if (priv->incoming.vectoriob) { -                iobuf_unref (priv->incoming.vectoriob); -                priv->incoming.vectoriob = NULL; -        } +        memset (&priv->incoming.payload_vector, 0, +                sizeof (priv->incoming.payload_vector)); + +        priv->incoming.iobuf = NULL;  } @@ -1170,9 +1207,11 @@ int  __socket_proto_state_machine (rpc_transport_t *this,                                rpc_transport_pollin_t **pollin)  { -        int               ret   = -1; -        socket_private_t *priv  = NULL; -        struct iobuf     *iobuf = NULL; +        int               ret    = -1; +        socket_private_t *priv   = NULL; +        struct iobuf     *iobuf  = NULL; +        struct iobref    *iobref = NULL; +        struct iovec      vector[2];          priv = this->private;          while (priv->incoming.record_state != SP_STATE_COMPLETE) { @@ -1191,7 +1230,7 @@ __socket_proto_state_machine (rpc_transport_t *this,                          priv->incoming.iobuf = iobuf;                          priv->incoming.iobuf_size = 0; -                        priv->incoming.vectoriob_size = 0; +                        priv->incoming.payload_vector.iov_len = 0;                          priv->incoming.pending_vector = priv->incoming.vector;                          priv->incoming.pending_vector->iov_base = @@ -1260,15 +1299,49 @@ __socket_proto_state_machine (rpc_transport_t *this,                           * upper layers.                           */                          if (pollin != NULL) { +                                int count = 0;                                  priv->incoming.iobuf_size                                          = priv->incoming.total_bytes_read -                                        - priv->incoming.vectoriob_size; +                                        - priv->incoming.payload_vector.iov_len; + +                                memset (vector, 0, sizeof (vector)); + +                                if (priv->incoming.iobref == NULL) { +                                        priv->incoming.iobref = iobref_new (); +                                        if (priv->incoming.iobref == NULL) { +                                                gf_log (this->name, +                                                        GF_LOG_ERROR, +                                                        "out of memory"); +                                                ret = -1; +                                                goto out; +                                        } +                                } + +                                vector[count].iov_base +                                        = iobuf_ptr (priv->incoming.iobuf); +                                vector[count].iov_len +                                        = priv->incoming.iobuf_size; + +                                iobref = priv->incoming.iobref; + +                                iobref_add (iobref, +                                            priv->incoming.iobuf); +                                iobuf_unref (priv->incoming.iobuf); +                                priv->incoming.iobuf = NULL;  + +                                count++; + +                                if (priv->incoming.payload_vector.iov_base +                                    != NULL) { +                                        vector[count] +                                                = priv->incoming.payload_vector; +                                        count++; +                                }                                  *pollin = rpc_transport_pollin_alloc (this, -                                                                      priv->incoming.iobuf, -                                                                      priv->incoming.iobuf_size, -                                                                      priv->incoming.vectoriob, -                                                                      priv->incoming.vectoriob_size, +                                                                      vector, +                                                                      count, +                                                                      iobref,                                                                        priv->incoming.request_info);                                  if (*pollin == NULL) {                                          ret = -1; diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index aa31ee2a7ef..5078b161e29 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -170,8 +170,8 @@ typedef struct {                  size_t               iobuf_size;                  struct iovec         vector[2];                  int                  count; -                struct iobuf        *vectoriob; -                size_t               vectoriob_size; +                struct iovec         payload_vector; +                struct iobref       *iobref;                  rpc_request_info_t  *request_info;                  struct iovec        *pending_vector;                  int                  pending_count;  | 
