diff options
| -rw-r--r-- | libglusterfs/src/common-utils.h | 22 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 115 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 7 | 
3 files changed, 137 insertions, 7 deletions
| diff --git a/libglusterfs/src/common-utils.h b/libglusterfs/src/common-utils.h index 6d46a997105..75692309e14 100644 --- a/libglusterfs/src/common-utils.h +++ b/libglusterfs/src/common-utils.h @@ -349,6 +349,28 @@ iov_unload (char *buf, const struct iovec *vector, int count)  static inline size_t +iov_load (const struct iovec *vector, int count, char *buf, int size) +{ +	size_t left = size; +	size_t cp = 0; +	int    ret = 0; +	int    i = 0; + +	while (left && i < count) { +		cp = min (vector[i].iov_len, left); +		if (vector[i].iov_base != buf + (size - left)) +			memcpy (vector[i].iov_base, buf + (size - left), cp); +		ret += cp; +		left -= cp; +		if (left) +			i++; +	} + +	return ret; +} + + +static inline size_t  iov_copy (const struct iovec *dst, int dcnt,  	  const struct iovec *src, int scnt)  { diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 97612610975..ae574f5f599 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -308,6 +308,112 @@ done:  	return ret;  } + + +ssize_t +__socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount) +{ +	socket_private_t    *priv = NULL; +	int                  sock = -1; +	int                  ret = -1; + +	priv = this->private; +	sock = priv->sock; + +	if (priv->use_ssl) { +		ret = ssl_read_one (this, opvector->iov_base, opvector->iov_len); +	} else { +		ret = readv (sock, opvector, opcount); +	} + +	return ret; +} + + +ssize_t +__socket_ssl_read (rpc_transport_t *this, void *buf, size_t count) +{ +	struct iovec iov = {0, }; +	int          ret = -1; + +	iov.iov_base = buf; +	iov.iov_len = count; + +	ret = __socket_ssl_readv (this, &iov, 1); + +	return ret; +} + + +int +__socket_cached_read (rpc_transport_t *this, struct iovec *opvector, int opcount) +{ +	socket_private_t   *priv = NULL; +	int                 sock = -1; +	struct gf_sock_incoming *in = NULL; +	int                 req_len = -1; +	int                 ret = -1; + +	priv = this->private; +	sock = priv->sock; +	in = &priv->incoming; +	req_len = iov_length (opvector, opcount); + +	if (in->record_state == SP_STATE_READING_FRAGHDR) { +		in->ra_read = 0; +		in->ra_served = 0; +		in->ra_max = 0; +		in->ra_buf = NULL; +		goto uncached; +	} + +	if (!in->ra_max) { +		/* first call after passing SP_STATE_READING_FRAGHDR */ +		in->ra_max = min (RPC_FRAGSIZE (in->fraghdr), GF_SOCKET_RA_MAX); +		/* Note that the in->iobuf is the primary iobuf into which +		   headers are read into. By using this itself as our +		   read-ahead cache, we can avoid memory copies in iov_load +		*/ +		in->ra_buf = iobuf_ptr (in->iobuf); +	} + +	/* fill read-ahead */ +	if (in->ra_read < in->ra_max) { +		ret = __socket_ssl_read (this, &in->ra_buf[in->ra_read], +					 (in->ra_max - in->ra_read)); +		if (ret > 0) +			in->ra_read += ret; + +		/* we proceed to test if there is still cached data to +		   be served even if readahead could not progress */ +	} + +	/* serve cached */ +	if (in->ra_served < in->ra_read) { +		ret = iov_load (opvector, opcount, &in->ra_buf[in->ra_served], +				min (req_len, (in->ra_read - in->ra_served))); + +		in->ra_served += ret; +		/* Do not read uncached and cached in the same call */ +		goto out; +	} + +	if (in->ra_read < in->ra_max) +		/* If there was no cached data to be served, (and we are +		   guaranteed to have already performed an attempt to progress +		   readahead above), and we have not yet read out the full +		   readahead capacity, then bail out for now without doing +		   the uncached read below (as that will overtake future cached +		   read) +		*/ +		goto out; +uncached: +	ret = __socket_ssl_readv (this, opvector, opcount); +out: +	return ret; +} + +  /*   * return value:   *   0 = success (completed) @@ -363,13 +469,8 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,                          }                          this->total_bytes_write += ret;                  } else { -			if (priv->use_ssl) { -				ret = ssl_read_one(this, -					opvector->iov_base, opvector->iov_len); -			} -			else { -				ret = readv (sock, opvector, opcount); -			} +			ret = __socket_cached_read (this, opvector, opcount); +  			if (ret == 0) {  				gf_log(this->name,GF_LOG_DEBUG,"EOF on socket");  				errno = ENODATA; diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index 2c4b44cf466..78faad9038d 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -158,6 +158,8 @@ struct gf_sock_incoming_frag {          sp_rpcfrag_state_t state;  }; +#define GF_SOCKET_RA_MAX 1024 +  struct gf_sock_incoming {          sp_rpcrecord_state_t  record_state;          struct gf_sock_incoming_frag frag; @@ -175,6 +177,11 @@ struct gf_sock_incoming {          char                 complete_record;          msg_type_t           msg_type;          size_t               total_bytes_read; + +	size_t               ra_read; +	size_t               ra_max; +	size_t               ra_served; +	char                *ra_buf;  };  typedef struct { | 
