From 05d2d4a401cb2497185b091e666638e01f1d7f1d Mon Sep 17 00:00:00 2001 From: Raghavendra G Date: Tue, 18 Dec 2012 12:47:43 +0530 Subject: rpc-transport/rdma: use rdma-cm for connection establishment. Till now gluster used tcp/ip based communication channel with gluster specific protocol to exchange infiniband addresses. Change-Id: I9de4db398a0e2af51d3d2d68c2fe42168102b190 BUG: 765051 Signed-off-by: Raghavendra G Reviewed-on: http://review.gluster.org/149 Tested-by: Gluster Build System Reviewed-by: Vijay Bellur --- rpc/rpc-transport/rdma/src/Makefile.am | 2 +- rpc/rpc-transport/rdma/src/name.c | 36 +- rpc/rpc-transport/rdma/src/name.h | 9 +- rpc/rpc-transport/rdma/src/rdma.c | 6714 +++++++++++++++----------------- rpc/rpc-transport/rdma/src/rdma.h | 103 +- 5 files changed, 3141 insertions(+), 3723 deletions(-) (limited to 'rpc') diff --git a/rpc/rpc-transport/rdma/src/Makefile.am b/rpc/rpc-transport/rdma/src/Makefile.am index 817925dec..2bf7cf238 100644 --- a/rpc/rpc-transport/rdma/src/Makefile.am +++ b/rpc/rpc-transport/rdma/src/Makefile.am @@ -7,7 +7,7 @@ rdma_la_LDFLAGS = -module -avoid-version rdma_la_SOURCES = rdma.c name.c rdma_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \ - -libverbs + -libverbs -lrdmacm noinst_HEADERS = rdma.h name.h -I$(top_srcdir)/libglusterfs/src -I$(top_srcdir)/rpc/rpc-lib/src/ \ diff --git a/rpc/rpc-transport/rdma/src/name.c b/rpc/rpc-transport/rdma/src/name.c index f6ae818b1..c57428ad6 100644 --- a/rpc/rpc-transport/rdma/src/name.c +++ b/rpc/rpc-transport/rdma/src/name.c @@ -13,6 +13,7 @@ #include #include #include +#include #ifndef AF_INET_SDP #define AF_INET_SDP 27 @@ -31,7 +32,8 @@ gf_resolve_ip6 (const char *hostname, struct addrinfo **addr_info); static int32_t -af_inet_bind_to_port_lt_ceiling (int fd, struct sockaddr *sockaddr, +af_inet_bind_to_port_lt_ceiling (struct rdma_cm_id *cm_id, + struct sockaddr *sockaddr, socklen_t sockaddr_len, int ceiling) { int32_t ret = -1; @@ -51,12 +53,14 @@ af_inet_bind_to_port_lt_ceiling (int fd, struct sockaddr *sockaddr, switch (sockaddr->sa_family) { case AF_INET6: - ((struct sockaddr_in6 *)sockaddr)->sin6_port = htons (port); + ((struct sockaddr_in6 *)sockaddr)->sin6_port + = htons (port); break; case AF_INET_SDP: case AF_INET: - ((struct sockaddr_in *)sockaddr)->sin_port = htons (port); + ((struct sockaddr_in *)sockaddr)->sin_port + = htons (port); break; } // ignore the reserved ports @@ -64,7 +68,7 @@ af_inet_bind_to_port_lt_ceiling (int fd, struct sockaddr *sockaddr, port--; continue; } - ret = bind (fd, sockaddr, sockaddr_len); + ret = rdma_bind_addr (cm_id, sockaddr); if (ret == 0) break; @@ -78,11 +82,10 @@ af_inet_bind_to_port_lt_ceiling (int fd, struct sockaddr *sockaddr, return ret; } +#if 0 static int32_t -af_unix_client_bind (rpc_transport_t *this, - struct sockaddr *sockaddr, - socklen_t sockaddr_len, - int sock) +af_unix_client_bind (rpc_transport_t *this, struct sockaddr *sockaddr, + socklen_t sockaddr_len, struct rdma_cm_id *cm_id) { data_t *path_data = NULL; struct sockaddr_un *addr = NULL; @@ -114,6 +117,7 @@ af_unix_client_bind (rpc_transport_t *this, err: return ret; } +#endif static int32_t client_fill_address_family (rpc_transport_t *this, struct sockaddr *sockaddr) @@ -412,10 +416,8 @@ out: } int32_t -gf_rdma_client_bind (rpc_transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len, - int sock) +gf_rdma_client_bind (rpc_transport_t *this, struct sockaddr *sockaddr, + socklen_t *sockaddr_len, struct rdma_cm_id *cm_id) { int ret = 0; @@ -427,22 +429,24 @@ gf_rdma_client_bind (rpc_transport_t *this, *sockaddr_len = sizeof (struct sockaddr_in); case AF_INET6: - ret = af_inet_bind_to_port_lt_ceiling (sock, sockaddr, + ret = af_inet_bind_to_port_lt_ceiling (cm_id, sockaddr, *sockaddr_len, GF_CLIENT_PORT_CEILING); if (ret == -1) { gf_log (this->name, GF_LOG_WARNING, - "cannot bind inet socket (%d) to port " - "less than %d (%s)", - sock, GF_CLIENT_PORT_CEILING, strerror (errno)); + "cannot bind rdma_cm_id to port " + "less than %d (%s)", GF_CLIENT_PORT_CEILING, + strerror (errno)); ret = 0; } break; case AF_UNIX: *sockaddr_len = sizeof (struct sockaddr_un); +#if 0 ret = af_unix_client_bind (this, (struct sockaddr *)sockaddr, *sockaddr_len, sock); +#endif break; default: diff --git a/rpc/rpc-transport/rdma/src/name.h b/rpc/rpc-transport/rdma/src/name.h index 114ed1661..742fc5fc3 100644 --- a/rpc/rpc-transport/rdma/src/name.h +++ b/rpc/rpc-transport/rdma/src/name.h @@ -11,16 +11,13 @@ #ifndef _IB_VERBS_NAME_H #define _IB_VERBS_NAME_H -#include -#include +#include #include "compat.h" int32_t -gf_rdma_client_bind (rpc_transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len, - int sock); +gf_rdma_client_bind (rpc_transport_t *this, struct sockaddr *sockaddr, + socklen_t *sockaddr_len, struct rdma_cm_id *cm_id); int32_t gf_rdma_client_get_remote_sockaddr (rpc_transport_t *this, diff --git a/rpc/rpc-transport/rdma/src/rdma.c b/rpc/rpc-transport/rdma/src/rdma.c index a44e8995f..135fbdf28 100644 --- a/rpc/rpc-transport/rdma/src/rdma.c +++ b/rpc/rpc-transport/rdma/src/rdma.c @@ -8,7 +8,6 @@ cases as published by the Free Software Foundation. */ - #ifndef _CONFIG_H #define _CONFIG_H #include "config.h" @@ -35,99 +34,26 @@ gf_rdma_post_ref (gf_rdma_post_t *post); int gf_rdma_post_unref (gf_rdma_post_t *post); -int32_t -gf_resolve_ip6 (const char *hostname, - uint16_t port, - int family, - void **dnscache, - struct addrinfo **addr_info); - -static uint16_t -gf_rdma_get_local_lid (struct ibv_context *context, - int32_t port) -{ - struct ibv_port_attr attr; - - if (ibv_query_port (context, port, &attr)) - return 0; - - return attr.lid; -} +static void * +gf_rdma_send_completion_proc (void *data); -static const char * -get_port_state_str(enum ibv_port_state pstate) -{ - switch (pstate) { - case IBV_PORT_DOWN: return "PORT_DOWN"; - case IBV_PORT_INIT: return "PORT_INIT"; - case IBV_PORT_ARMED: return "PORT_ARMED"; - case IBV_PORT_ACTIVE: return "PORT_ACTIVE"; - case IBV_PORT_ACTIVE_DEFER: return "PORT_ACTIVE_DEFER"; - default: return "invalid state"; - } -} +static void * +gf_rdma_recv_completion_proc (void *data); static int32_t -ib_check_active_port (struct ibv_context *ctx, uint8_t port) -{ - struct ibv_port_attr port_attr = {0, }; - int32_t ret = 0; - const char *state_str = NULL; - - if (!ctx) { - gf_log_callingfn (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "Error in supplied context"); - return -1; - } - - ret = ibv_query_port (ctx, port, &port_attr); - - if (ret) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "Failed to query port %u properties", port); - return -1; - } - - state_str = get_port_state_str (port_attr.state); - gf_log (GF_RDMA_LOG_NAME, GF_LOG_TRACE, - "Infiniband PORT: (%u) STATE: (%s)", - port, state_str); - - if (port_attr.state == IBV_PORT_ACTIVE) - return 0; - - return -1; -} +gf_rdma_create_qp (rpc_transport_t *this); static int32_t -ib_get_active_port (struct ibv_context *ib_ctx) -{ - struct ibv_device_attr ib_device_attr = {{0, }, }; - int32_t ret = -1; - uint8_t ib_port = 0; +__gf_rdma_teardown (rpc_transport_t *this); - if (!ib_ctx) { - gf_log_callingfn (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "Error in supplied context"); - return -1; - } - if (ibv_query_device (ib_ctx, &ib_device_attr)) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "Failed to query device properties"); - return -1; - } +static int32_t +gf_rdma_teardown (rpc_transport_t *this); - for (ib_port = 1; ib_port <= ib_device_attr.phys_port_cnt; ++ib_port) { - ret = ib_check_active_port (ib_ctx, ib_port); - if (ret == 0) - return ib_port; +static int32_t +gf_rdma_disconnect (rpc_transport_t *this); - gf_log (GF_RDMA_LOG_NAME, GF_LOG_TRACE, - "Port:(%u) not active", ib_port); - continue; - } - return ret; -} +static void +gf_rdma_cm_handle_disconnect (rpc_transport_t *this); static void @@ -157,7 +83,7 @@ gf_rdma_put_post (gf_rdma_queue_t *queue, gf_rdma_post_t *post) static gf_rdma_post_t * -gf_rdma_new_post (gf_rdma_device_t *device, int32_t len, +gf_rdma_new_post (rpc_transport_t *this, gf_rdma_device_t *device, int32_t len, gf_rdma_post_type_t type) { gf_rdma_post_t *post = NULL; @@ -184,7 +110,7 @@ gf_rdma_new_post (gf_rdma_device_t *device, int32_t len, post->buf_size, IBV_ACCESS_LOCAL_WRITE); if (!post->mr) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + gf_log (this->name, GF_LOG_WARNING, "memory registration failed (%s)", strerror (errno)); goto out; @@ -259,22 +185,6 @@ __gf_rdma_quota_get (gf_rdma_peer_t *peer) return ret; } -/* - static int32_t - gf_rdma_quota_get (gf_rdma_peer_t *peer) - { - int32_t ret = -1; - gf_rdma_private_t *priv = peer->trans->private; - - pthread_mutex_lock (&priv->write_mutex); - { - ret = __gf_rdma_quota_get (peer); - } - pthread_mutex_unlock (&priv->write_mutex); - - return ret; - } -*/ static void __gf_rdma_ioq_entry_free (gf_rdma_ioq_t *entry) @@ -290,6 +200,7 @@ __gf_rdma_ioq_entry_free (gf_rdma_ioq_t *entry) iobref_unref (entry->msg.request.rsp_iobref); entry->msg.request.rsp_iobref = NULL; } + mem_put (entry); } @@ -309,898 +220,968 @@ static int32_t __gf_rdma_disconnect (rpc_transport_t *this) { gf_rdma_private_t *priv = NULL; - int32_t ret = 0; priv = this->private; - if (priv->connected || priv->tcp_connected) { - fcntl (priv->sock, F_SETFL, O_NONBLOCK); - if (shutdown (priv->sock, SHUT_RDWR) != 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_DEBUG, - "shutdown () - error: %s", - strerror (errno)); - ret = -errno; - priv->tcp_connected = 0; - priv->connected = 0; - } + if (priv->connected) { + rdma_disconnect (priv->peer.cm_id); } - return ret; + return 0; } -static int32_t -gf_rdma_post_send (struct ibv_qp *qp, gf_rdma_post_t *post, int32_t len) +static void +gf_rdma_queue_init (gf_rdma_queue_t *queue) { - struct ibv_sge list = { - .addr = (unsigned long) post->buf, - .length = len, - .lkey = post->mr->lkey - }; + pthread_mutex_init (&queue->lock, NULL); - struct ibv_send_wr wr = { - .wr_id = (unsigned long) post, - .sg_list = &list, - .num_sge = 1, - .opcode = IBV_WR_SEND, - .send_flags = IBV_SEND_SIGNALED, - }, *bad_wr; + queue->active_posts.next = &queue->active_posts; + queue->active_posts.prev = &queue->active_posts; + queue->passive_posts.next = &queue->passive_posts; + queue->passive_posts.prev = &queue->passive_posts; +} - if (!qp) - return EINVAL; - return ibv_post_send (qp, &wr, &bad_wr); +static void +__gf_rdma_destroy_queue (gf_rdma_post_t *post) +{ + gf_rdma_post_t *tmp = NULL; + + while (post->next != post) { + tmp = post->next; + + post->next = post->next->next; + post->next->prev = post; + + gf_rdma_destroy_post (tmp); + } } -int -__gf_rdma_encode_error(gf_rdma_peer_t *peer, gf_rdma_reply_info_t *reply_info, - struct iovec *rpchdr, gf_rdma_header_t *hdr, - gf_rdma_errcode_t err) -{ - struct rpc_msg *rpc_msg = NULL; - if (reply_info != NULL) { - hdr->rm_xid = hton32(reply_info->rm_xid); - } else { - rpc_msg = rpchdr[0].iov_base; /* assume rpchdr contains - * only one vector. - * (which is true) - */ - hdr->rm_xid = rpc_msg->rm_xid; +static void +gf_rdma_destroy_queue (gf_rdma_queue_t *queue) +{ + if (queue == NULL) { + goto out; } - hdr->rm_vers = hton32(GF_RDMA_VERSION); - hdr->rm_credit = hton32(peer->send_count); - hdr->rm_type = hton32(GF_RDMA_ERROR); - hdr->rm_body.rm_error.rm_type = hton32(err); - if (err == ERR_VERS) { - hdr->rm_body.rm_error.rm_version.gf_rdma_vers_low - = hton32(GF_RDMA_VERSION); - hdr->rm_body.rm_error.rm_version.gf_rdma_vers_high - = hton32(GF_RDMA_VERSION); + pthread_mutex_lock (&queue->lock); + { + if (queue->passive_count > 0) { + __gf_rdma_destroy_queue (&queue->passive_posts); + queue->passive_count = 0; + } + + if (queue->active_count > 0) { + __gf_rdma_destroy_queue (&queue->active_posts); + queue->active_count = 0; + } } + pthread_mutex_unlock (&queue->lock); - return sizeof (*hdr); +out: + return; } -int32_t -__gf_rdma_send_error (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry, - gf_rdma_post_t *post, gf_rdma_reply_info_t *reply_info, - gf_rdma_errcode_t err) +static void +gf_rdma_destroy_posts (rpc_transport_t *this) { - int32_t ret = -1, len = 0; + gf_rdma_device_t *device = NULL; + gf_rdma_private_t *priv = NULL; - len = __gf_rdma_encode_error (peer, reply_info, entry->rpchdr, - (gf_rdma_header_t *)post->buf, err); - if (len == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "encode error returned -1"); + if (this == NULL) { goto out; } - gf_rdma_post_ref (post); + priv = this->private; + device = priv->device; - ret = gf_rdma_post_send (peer->qp, post, len); - if (!ret) { - ret = len; - } else { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "gf_rdma_post_send (to %s) failed with ret = %d (%s)", - peer->trans->peerinfo.identifier, ret, - (ret > 0) ? strerror (ret) : ""); - gf_rdma_post_unref (post); - __gf_rdma_disconnect (peer->trans); - ret = -1; - } + gf_rdma_destroy_queue (&device->sendq); + gf_rdma_destroy_queue (&device->recvq); out: - return ret; + return; } -int32_t -__gf_rdma_create_read_chunks_from_vector (gf_rdma_peer_t *peer, - gf_rdma_read_chunk_t **readch_ptr, - int32_t *pos, struct iovec *vector, - int count, - gf_rdma_request_context_t *request_ctx) +static int32_t +__gf_rdma_create_posts (rpc_transport_t *this, int32_t count, int32_t size, + gf_rdma_queue_t *q, gf_rdma_post_type_t type) { - int i = 0; - gf_rdma_private_t *priv = NULL; - gf_rdma_device_t *device = NULL; - struct ibv_mr *mr = NULL; - gf_rdma_read_chunk_t *readch = NULL; - int32_t ret = -1; - - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, peer, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, readch_ptr, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, *readch_ptr, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, request_ctx, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, vector, out); + int32_t i = 0; + int32_t ret = 0; + gf_rdma_private_t *priv = NULL; + gf_rdma_device_t *device = NULL; - priv = peer->trans->private; + priv = this->private; device = priv->device; - readch = *readch_ptr; - for (i = 0; i < count; i++) { - readch->rc_discrim = hton32 (1); - readch->rc_position = hton32 (*pos); + for (i=0 ; ipd, vector[i].iov_base, - vector[i].iov_len, - IBV_ACCESS_REMOTE_READ); - if (!mr) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "memory registration failed (%s) (peer:%s)", - strerror (errno), - peer->trans->peerinfo.identifier); - goto out; + post = gf_rdma_new_post (this, device, size + 2048, type); + if (!post) { + gf_log (this->name, GF_LOG_ERROR, + "post creation failed"); + ret = -1; + break; } - request_ctx->mr[request_ctx->mr_count++] = mr; + gf_rdma_put_post (q, post); + } + return ret; +} - readch->rc_target.rs_handle = hton32 (mr->rkey); - readch->rc_target.rs_length - = hton32 (vector[i].iov_len); - readch->rc_target.rs_offset - = hton64 ((uint64_t)(unsigned long)vector[i].iov_base); - *pos = *pos + vector[i].iov_len; - readch++; - } +static int32_t +gf_rdma_post_recv (struct ibv_srq *srq, + gf_rdma_post_t *post) +{ + struct ibv_sge list = { + .addr = (unsigned long) post->buf, + .length = post->buf_size, + .lkey = post->mr->lkey + }; - *readch_ptr = readch; + struct ibv_recv_wr wr = { + .wr_id = (unsigned long) post, + .sg_list = &list, + .num_sge = 1, + }, *bad_wr; - ret = 0; -out: - return ret; + gf_rdma_post_ref (post); + + return ibv_post_srq_recv (srq, &wr, &bad_wr); } -int32_t -__gf_rdma_create_read_chunks (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry, - gf_rdma_chunktype_t type, uint32_t **ptr, - gf_rdma_request_context_t *request_ctx) +static int32_t +gf_rdma_create_posts (rpc_transport_t *this) { - int32_t ret = -1; - int pos = 0; - - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, peer, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, entry, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, ptr, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, *ptr, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, request_ctx, out); + int32_t i = 0, ret = 0; + gf_rdma_post_t *post = NULL; + gf_rdma_private_t *priv = NULL; + gf_rdma_options_t *options = NULL; + gf_rdma_device_t *device = NULL; - request_ctx->iobref = iobref_ref (entry->iobref); + priv = this->private; + options = &priv->options; + device = priv->device; - if (type == gf_rdma_areadch) { - pos = 0; - ret = __gf_rdma_create_read_chunks_from_vector (peer, - (gf_rdma_read_chunk_t **)ptr, - &pos, - entry->rpchdr, - entry->rpchdr_count, - request_ctx); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "cannot create read chunks from vector " - "entry->rpchdr"); - goto out; - } - - ret = __gf_rdma_create_read_chunks_from_vector (peer, - (gf_rdma_read_chunk_t **)ptr, - &pos, - entry->proghdr, - entry->proghdr_count, - request_ctx); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "cannot create read chunks from vector " - "entry->proghdr"); - } + ret = __gf_rdma_create_posts (this, options->send_count, + options->send_size, + &device->sendq, GF_RDMA_SEND_POST); + if (!ret) + ret = __gf_rdma_create_posts (this, options->recv_count, + options->recv_size, + &device->recvq, + GF_RDMA_RECV_POST); - if (entry->prog_payload_count != 0) { - ret = __gf_rdma_create_read_chunks_from_vector (peer, - (gf_rdma_read_chunk_t **)ptr, - &pos, - entry->prog_payload, - entry->prog_payload_count, - request_ctx); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "cannot create read chunks from vector" - " entry->prog_payload"); + if (!ret) { + for (i=0 ; irecv_count ; i++) { + post = gf_rdma_get_post (&device->recvq); + if (gf_rdma_post_recv (device->srq, post) != 0) { + ret = -1; + break; } } - } else { - pos = iov_length (entry->rpchdr, entry->rpchdr_count); - ret = __gf_rdma_create_read_chunks_from_vector (peer, - (gf_rdma_read_chunk_t **)ptr, - &pos, - entry->prog_payload, - entry->prog_payload_count, - request_ctx); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "cannot create read chunks from vector " - "entry->prog_payload"); - } } - /* terminate read-chunk list*/ - **ptr = 0; - *ptr = *ptr + 1; -out: + if (ret) + gf_rdma_destroy_posts (this); + return ret; } -int32_t -__gf_rdma_create_write_chunks_from_vector (gf_rdma_peer_t *peer, - gf_rdma_write_chunk_t **writech_ptr, - struct iovec *vector, int count, - gf_rdma_request_context_t *request_ctx) +static void +gf_rdma_destroy_cq (rpc_transport_t *this) { - int i = 0; - gf_rdma_private_t *priv = NULL; - gf_rdma_device_t *device = NULL; - struct ibv_mr *mr = NULL; - gf_rdma_write_chunk_t *writech = NULL; - int32_t ret = -1; + gf_rdma_private_t *priv = NULL; + gf_rdma_device_t *device = NULL; - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, peer, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, writech_ptr, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, *writech_ptr, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, request_ctx, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, vector, out); + priv = this->private; + device = priv->device; - writech = *writech_ptr; + if (device->recv_cq) + ibv_destroy_cq (device->recv_cq); + device->recv_cq = NULL; - priv = peer->trans->private; + if (device->send_cq) + ibv_destroy_cq (device->send_cq); + device->send_cq = NULL; + + return; +} + + +static int32_t +gf_rdma_create_cq (rpc_transport_t *this) +{ + gf_rdma_private_t *priv = NULL; + gf_rdma_options_t *options = NULL; + gf_rdma_device_t *device = NULL; + uint64_t send_cqe = 0; + int32_t ret = 0; + struct ibv_device_attr device_attr = {{0}, }; + + priv = this->private; + options = &priv->options; device = priv->device; - for (i = 0; i < count; i++) { - mr = ibv_reg_mr (device->pd, vector[i].iov_base, - vector[i].iov_len, - IBV_ACCESS_REMOTE_WRITE - | IBV_ACCESS_LOCAL_WRITE); - if (!mr) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "memory registration failed (%s) (peer:%s)", - strerror (errno), - peer->trans->peerinfo.identifier); + device->recv_cq = ibv_create_cq (priv->device->context, + options->recv_count * 2, + device, + device->recv_chan, + 0); + if (!device->recv_cq) { + gf_log (this->name, GF_LOG_ERROR, + "creation of CQ for device %s failed", + device->device_name); + ret = -1; + goto out; + } else if (ibv_req_notify_cq (device->recv_cq, 0)) { + gf_log (this->name, GF_LOG_ERROR, + "ibv_req_notify_cq on recv CQ of device %s failed", + device->device_name); + ret = -1; + goto out; + } + + do { + ret = ibv_query_device (priv->device->context, &device_attr); + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, + "ibv_query_device on %s returned %d (%s)", + priv->device->device_name, ret, + (ret > 0) ? strerror (ret) : ""); + ret = -1; goto out; } - request_ctx->mr[request_ctx->mr_count++] = mr; - - writech->wc_target.rs_handle = hton32 (mr->rkey); - writech->wc_target.rs_length = hton32 (vector[i].iov_len); - writech->wc_target.rs_offset - = hton64 (((uint64_t)(unsigned long)vector[i].iov_base)); + send_cqe = options->send_count * 128; + send_cqe = (send_cqe > device_attr.max_cqe) + ? device_attr.max_cqe : send_cqe; - writech++; - } + /* TODO: make send_cq size dynamically adaptive */ + device->send_cq = ibv_create_cq (priv->device->context, + send_cqe, device, + device->send_chan, 0); + if (!device->send_cq) { + gf_log (this->name, GF_LOG_ERROR, + "creation of send_cq for device %s failed", + device->device_name); + ret = -1; + goto out; + } - *writech_ptr = writech; + if (ibv_req_notify_cq (device->send_cq, 0)) { + gf_log (this->name, GF_LOG_ERROR, + "ibv_req_notify_cq on send_cq for device %s" + " failed", device->device_name); + ret = -1; + goto out; + } + } while (0); - ret = 0; out: + if (ret != 0) + gf_rdma_destroy_cq (this); + return ret; } -int32_t -__gf_rdma_create_write_chunks (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry, - gf_rdma_chunktype_t chunk_type, uint32_t **ptr, - gf_rdma_request_context_t *request_ctx) +static gf_rdma_device_t * +gf_rdma_get_device (rpc_transport_t *this, struct ibv_context *ibctx, + char *device_name) { - int32_t ret = -1; - gf_rdma_write_array_t *warray = NULL; + glusterfs_ctx_t *ctx = NULL; + gf_rdma_private_t *priv = NULL; + gf_rdma_options_t *options = NULL; + int32_t ret = 0; + int32_t i = 0; + gf_rdma_device_t *trav = NULL, *device = NULL; + gf_rdma_ctx_t *rdma_ctx = NULL; - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, peer, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, ptr, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, *ptr, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, request_ctx, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, entry, out); + priv = this->private; + options = &priv->options; + ctx = this->ctx; + rdma_ctx = ctx->ib; - if ((chunk_type == gf_rdma_replych) - && ((entry->msg.request.rsphdr_count != 1) || - (entry->msg.request.rsphdr_vec[0].iov_base == NULL))) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - (entry->msg.request.rsphdr_count == 1) - ? "chunktype specified as reply chunk but the vector " - "specifying the buffer to be used for holding reply" - " header is not correct" : - "chunktype specified as reply chunk, but more than one " - "buffer provided for holding reply"); - goto out; - } + trav = rdma_ctx->device; -/* - if ((chunk_type == gf_rdma_writech) - && ((entry->msg.request.rsphdr_count == 0) - || (entry->msg.request.rsphdr_vec[0].iov_base == NULL))) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_DEBUG, - "vector specifying buffer to hold the program's reply " - "header should also be provided when buffers are " - "provided for holding the program's payload in reply"); - goto out; - } -*/ + while (trav) { + if (!strcmp (trav->device_name, device_name)) + break; + trav = trav->next; + } - if (chunk_type == gf_rdma_writech) { - warray = (gf_rdma_write_array_t *)*ptr; - warray->wc_discrim = hton32 (1); - warray->wc_nchunks - = hton32 (entry->msg.request.rsp_payload_count); + if (!trav) { + trav = GF_CALLOC (1, sizeof (*trav), + gf_common_mt_rdma_device_t); + if (trav == NULL) { + goto out; + } - *ptr = (uint32_t *)&warray->wc_array[0]; + priv->device = trav; + trav->context = ibctx; - ret = __gf_rdma_create_write_chunks_from_vector (peer, - (gf_rdma_write_chunk_t **)ptr, - entry->msg.request.rsp_payload, - entry->msg.request.rsp_payload_count, - request_ctx); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "cannot create write chunks from vector " - "entry->rpc_payload"); + trav->request_ctx_pool + = mem_pool_new (gf_rdma_request_context_t, + GF_RDMA_POOL_SIZE); + if (trav->request_ctx_pool == NULL) { goto out; } - /* terminate write chunklist */ - **ptr = 0; - *ptr = *ptr + 1; + trav->ioq_pool + = mem_pool_new (gf_rdma_ioq_t, GF_RDMA_POOL_SIZE); + if (trav->ioq_pool == NULL) { + goto out; + } - /* no reply chunklist */ - **ptr = 0; - *ptr = *ptr + 1; - } else { - /* no write chunklist */ - **ptr = 0; - *ptr = *ptr + 1; + trav->reply_info_pool = mem_pool_new (gf_rdma_reply_info_t, + GF_RDMA_POOL_SIZE); + if (trav->reply_info_pool == NULL) { + goto out; + } - warray = (gf_rdma_write_array_t *)*ptr; - warray->wc_discrim = hton32 (1); - warray->wc_nchunks = hton32 (entry->msg.request.rsphdr_count); + trav->device_name = gf_strdup (device_name); - *ptr = (uint32_t *)&warray->wc_array[0]; + trav->next = rdma_ctx->device; + rdma_ctx->device = trav; - ret = __gf_rdma_create_write_chunks_from_vector (peer, - (gf_rdma_write_chunk_t **)ptr, - entry->msg.request.rsphdr_vec, - entry->msg.request.rsphdr_count, - request_ctx); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "cannot create write chunks from vector " - "entry->rpchdr"); + trav->send_chan = ibv_create_comp_channel (trav->context); + if (!trav->send_chan) { + gf_log (this->name, GF_LOG_ERROR, + "could not create send completion channel for " + "device (%s)", device_name); goto out; } - /* terminate reply chunklist */ - **ptr = 0; - *ptr = *ptr + 1; + trav->recv_chan = ibv_create_comp_channel (trav->context); + if (!trav->recv_chan) { + gf_log (this->name, GF_LOG_ERROR, + "could not create recv completion channel for " + "device (%s)", device_name); + + /* TODO: cleanup current mess */ + goto out; + } + + if (gf_rdma_create_cq (this) < 0) { + gf_log (this->name, GF_LOG_ERROR, + "could not create CQ for device (%s)", + device_name); + goto out; + } + + /* protection domain */ + trav->pd = ibv_alloc_pd (trav->context); + + if (!trav->pd) { + gf_log (this->name, GF_LOG_ERROR, + "could not allocate protection domain for " + "device (%s)", device_name); + goto out; + } + + struct ibv_srq_init_attr attr = { + .attr = { + .max_wr = options->recv_count, + .max_sge = 1 + } + }; + trav->srq = ibv_create_srq (trav->pd, &attr); + + if (!trav->srq) { + gf_log (this->name, GF_LOG_ERROR, + "could not create SRQ for device (%s)", + device_name); + goto out; + } + + /* queue init */ + gf_rdma_queue_init (&trav->sendq); + gf_rdma_queue_init (&trav->recvq); + + if (gf_rdma_create_posts (this) < 0) { + gf_log (this->name, GF_LOG_ERROR, + "could not allocate posts for device (%s)", + device_name); + goto out; + } + + /* completion threads */ + ret = pthread_create (&trav->send_thread, + NULL, + gf_rdma_send_completion_proc, + trav->send_chan); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "could not create send completion thread for " + "device (%s)", device_name); + goto out; + } + + ret = pthread_create (&trav->recv_thread, + NULL, + gf_rdma_recv_completion_proc, + trav->recv_chan); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "could not create recv completion thread " + "for device (%s)", device_name); + return NULL; + } + + /* qpreg */ + pthread_mutex_init (&trav->qpreg.lock, NULL); + for (i=0; i<42; i++) { + trav->qpreg.ents[i].next = &trav->qpreg.ents[i]; + trav->qpreg.ents[i].prev = &trav->qpreg.ents[i]; + } } + device = trav; + trav = NULL; out: - return ret; + + if (trav != NULL) { + gf_rdma_destroy_posts (this); + mem_pool_destroy (trav->ioq_pool); + mem_pool_destroy (trav->request_ctx_pool); + mem_pool_destroy (trav->reply_info_pool); + ibv_dealloc_pd (trav->pd); + gf_rdma_destroy_cq (this); + ibv_destroy_comp_channel (trav->recv_chan); + ibv_destroy_comp_channel (trav->send_chan); + GF_FREE ((char *)trav->device_name); + GF_FREE (trav); + } + + return device; } -static inline void -__gf_rdma_deregister_mr (struct ibv_mr **mr, int count) +static rpc_transport_t * +gf_rdma_transport_new (rpc_transport_t *listener, struct rdma_cm_id *cm_id) { - int i = 0; + gf_rdma_private_t *listener_priv = NULL, *priv = NULL; + rpc_transport_t *this = NULL, *new = NULL; + gf_rdma_options_t *options = NULL; + char *device_name = NULL; - if (mr == NULL) { + listener_priv = listener->private; + + this = GF_CALLOC (1, sizeof (rpc_transport_t), + gf_common_mt_rpc_transport_t); + if (this == NULL) { goto out; } - for (i = 0; i < count; i++) { - ibv_dereg_mr (mr[i]); + this->listener = listener; + + priv = GF_CALLOC (1, sizeof (gf_rdma_private_t), + gf_common_mt_rdma_private_t); + if (priv == NULL) { + goto out; } -out: - return; -} + this->private = priv; + priv->options = listener_priv->options; + priv->listener = listener; + priv->entity = GF_RDMA_SERVER; -static int32_t -__gf_rdma_quota_put (gf_rdma_peer_t *peer) -{ - int32_t ret = 0; + options = &priv->options; - peer->quota++; - ret = peer->quota; + this->ops = listener->ops; + this->init = listener->init; + this->fini = listener->fini; + this->ctx = listener->ctx; + this->name = gf_strdup (listener->name); + this->notify = listener->notify; + this->mydata = listener->mydata; - if (!list_empty (&peer->ioq)) { - ret = __gf_rdma_ioq_churn (peer); + this->myinfo.sockaddr_len = sizeof (cm_id->route.addr.src_addr); + memcpy (&this->myinfo.sockaddr, &cm_id->route.addr.src_addr, + this->myinfo.sockaddr_len); + + this->peerinfo.sockaddr_len = sizeof (cm_id->route.addr.dst_addr); + memcpy (&this->peerinfo.sockaddr, &cm_id->route.addr.dst_addr, + this->peerinfo.sockaddr_len); + + priv->peer.trans = this; + gf_rdma_get_transport_identifiers (this); + + device_name = (char *)ibv_get_device_name (cm_id->verbs->device); + if (device_name == NULL) { + gf_log (listener->name, GF_LOG_WARNING, + "cannot get device name (peer:%s me:%s)", + this->peerinfo.identifier, this->myinfo.identifier); + goto out; } - return ret; -} + priv->device = gf_rdma_get_device (this, cm_id->verbs, + device_name); + if (priv->device == NULL) { + gf_log (listener->name, GF_LOG_WARNING, + "cannot get infiniband device %s (peer:%s me:%s)", + device_name, this->peerinfo.identifier, + this->myinfo.identifier); + goto out; + } + priv->peer.send_count = options->send_count; + priv->peer.recv_count = options->recv_count; + priv->peer.send_size = options->send_size; + priv->peer.recv_size = options->recv_size; + priv->peer.cm_id = cm_id; + INIT_LIST_HEAD (&priv->peer.ioq); -static int32_t -gf_rdma_quota_put (gf_rdma_peer_t *peer) -{ - int32_t ret = 0; - gf_rdma_private_t *priv = NULL; + pthread_mutex_init (&priv->write_mutex, NULL); + pthread_mutex_init (&priv->recv_mutex, NULL); - priv = peer->trans->private; - pthread_mutex_lock (&priv->write_mutex); - { - ret = __gf_rdma_quota_put (peer); + cm_id->context = this; + + new = rpc_transport_ref (this); + this = NULL; +out: + if (this != NULL) { + if (this->private != NULL) { + GF_FREE (this->private); + } + + if (this->name != NULL) { + GF_FREE (this->name); + } + + GF_FREE (this); } - pthread_mutex_unlock (&priv->write_mutex); - return ret; + return new; } -/* to be called with priv->mutex held */ -void -__gf_rdma_request_context_destroy (gf_rdma_request_context_t *context) +static int +gf_rdma_cm_handle_connect_request (struct rdma_cm_event *event) { - gf_rdma_peer_t *peer = NULL; - gf_rdma_private_t *priv = NULL; - int32_t ret = 0; + int ret = -1; + rpc_transport_t *this = NULL, *listener = NULL; + struct rdma_cm_id *child_cm_id = NULL, *listener_cm_id = NULL; + struct rdma_conn_param conn_param = {0, }; + gf_rdma_private_t *priv = NULL; + gf_rdma_options_t *options = NULL; - if (context == NULL) { - goto out; - } + child_cm_id = event->id; + listener_cm_id = event->listen_id; - peer = context->peer; + listener = listener_cm_id->context; + priv = listener->private; + options = &priv->options; - __gf_rdma_deregister_mr (context->mr, context->mr_count); + this = gf_rdma_transport_new (listener, child_cm_id); + if (this == NULL) { + gf_log (listener->name, GF_LOG_WARNING, + "could not create a transport for incoming connection" + " (me.name:%s me.identifier:%s)", listener->name, + listener->myinfo.identifier); + rdma_destroy_id (child_cm_id); + goto out; + } - priv = peer->trans->private; + gf_log (listener->name, GF_LOG_TRACE, + "got a connect request (me:%s peer:%s)", + listener->myinfo.identifier, this->peerinfo.identifier); - if (priv->connected) { - ret = __gf_rdma_quota_put (peer); - if (ret < 0) { - gf_log ("rdma", GF_LOG_DEBUG, - "failed to send " - "message"); - mem_put (context); - __gf_rdma_disconnect (peer->trans); - goto out; - } + ret = gf_rdma_create_qp (this); + if (ret < 0) { + gf_log (listener->name, GF_LOG_WARNING, + "could not create QP (peer:%s me:%s)", + this->peerinfo.identifier, this->myinfo.identifier); + gf_rdma_cm_handle_disconnect (this); + goto out; } - if (context->iobref != NULL) { - iobref_unref (context->iobref); - context->iobref = NULL; - } + conn_param.responder_resources = 1; + conn_param.initiator_depth = 1; + conn_param.retry_count = options->attr_retry_cnt; + conn_param.rnr_retry_count = options->attr_rnr_retry; - if (context->rsp_iobref != NULL) { - iobref_unref (context->rsp_iobref); - context->rsp_iobref = NULL; - } + ret = rdma_accept(child_cm_id, &conn_param); + if (ret < 0) { + gf_log (listener->name, GF_LOG_WARNING, "rdma_accept failed " + "peer:%s me:%s (%s)", this->peerinfo.identifier, + this->myinfo.identifier, strerror (errno)); + gf_rdma_cm_handle_disconnect (this); + goto out; + } - mem_put (context); + ret = 0; out: - return; + return ret; } -void -gf_rdma_post_context_destroy (gf_rdma_post_context_t *ctx) +static int +gf_rdma_cm_handle_route_resolved (struct rdma_cm_event *event) { - if (ctx == NULL) { + struct rdma_conn_param conn_param = {0, }; + int ret = 0; + rpc_transport_t *this = NULL; + gf_rdma_private_t *priv = NULL; + gf_rdma_peer_t *peer = NULL; + gf_rdma_options_t *options = NULL; + + if (event == NULL) { goto out; } - __gf_rdma_deregister_mr (ctx->mr, ctx->mr_count); + this = event->id->context; - if (ctx->iobref != NULL) { - iobref_unref (ctx->iobref); - } + priv = this->private; + peer = &priv->peer; + options = &priv->options; - if (ctx->hdr_iobuf != NULL) { - iobuf_unref (ctx->hdr_iobuf); + ret = gf_rdma_create_qp (this); + if (ret != 0) { + gf_log (this->name, GF_LOG_WARNING, + "could not create QP (peer:%s me:%s)", + this->peerinfo.identifier, this->myinfo.identifier); + gf_rdma_cm_handle_disconnect (this); + goto out; } - memset (ctx, 0, sizeof (*ctx)); + memset(&conn_param, 0, sizeof conn_param); + conn_param.responder_resources = 1; + conn_param.initiator_depth = 1; + conn_param.retry_count = options->attr_retry_cnt; + conn_param.rnr_retry_count = options->attr_rnr_retry; + + ret = rdma_connect(peer->cm_id, &conn_param); + if (ret != 0) { + gf_log (this->name, GF_LOG_WARNING, + "rdma_connect failed (%s)", strerror (errno)); + gf_rdma_cm_handle_disconnect (this); + goto out; + } + + gf_log (this->name, GF_LOG_TRACE, "route resolved (me:%s peer:%s)", + this->myinfo.identifier, this->peerinfo.identifier); + + ret = 0; out: - return; + return ret; } -static int32_t -gf_rdma_post_recv (struct ibv_srq *srq, - gf_rdma_post_t *post) +static int +gf_rdma_cm_handle_addr_resolved (struct rdma_cm_event *event) { - struct ibv_sge list = { - .addr = (unsigned long) post->buf, - .length = post->buf_size, - .lkey = post->mr->lkey - }; + rpc_transport_t *this = NULL; + gf_rdma_peer_t *peer = NULL; + gf_rdma_private_t *priv = NULL; + int ret = 0; - struct ibv_recv_wr wr = { - .wr_id = (unsigned long) post, - .sg_list = &list, - .num_sge = 1, - }, *bad_wr; + this = event->id->context; - gf_rdma_post_ref (post); + priv = this->private; + peer = &priv->peer; - return ibv_post_srq_recv (srq, &wr, &bad_wr); + GF_ASSERT (peer->cm_id == event->id); + + this->myinfo.sockaddr_len = sizeof (peer->cm_id->route.addr.src_addr); + memcpy (&this->myinfo.sockaddr, &peer->cm_id->route.addr.src_addr, + this->myinfo.sockaddr_len); + + this->peerinfo.sockaddr_len = sizeof (peer->cm_id->route.addr.dst_addr); + memcpy (&this->peerinfo.sockaddr, &peer->cm_id->route.addr.dst_addr, + this->peerinfo.sockaddr_len); + + gf_rdma_get_transport_identifiers (this); + + ret = rdma_resolve_route(peer->cm_id, 2000); + if (ret != 0) { + gf_log (this->name, GF_LOG_WARNING, + "rdma_resolve_route failed (me:%s peer:%s) (%s)", + this->myinfo.identifier, this->peerinfo.identifier, + strerror (errno)); + gf_rdma_cm_handle_disconnect (this); + } + + gf_log (this->name, GF_LOG_TRACE, "Address resolved (me:%s peer:%s)", + this->myinfo.identifier, this->peerinfo.identifier); + + return ret; } -int -gf_rdma_post_unref (gf_rdma_post_t *post) +static void +gf_rdma_cm_handle_disconnect (rpc_transport_t *this) { - int refcount = -1; + gf_rdma_private_t *priv = NULL; + char need_unref = 0, connected = 0; - if (post == NULL) { - goto out; - } + priv = this->private; + gf_log (this->name, GF_LOG_DEBUG, + "peer disconnected, cleaning up"); - pthread_mutex_lock (&post->lock); + pthread_mutex_lock (&priv->write_mutex); { - refcount = --post->refcount; + if (priv->peer.cm_id != NULL) { + need_unref = 1; + connected = priv->connected; + priv->connected = 0; + } + + __gf_rdma_teardown (this); } - pthread_mutex_unlock (&post->lock); + pthread_mutex_unlock (&priv->write_mutex); - if (refcount == 0) { - gf_rdma_post_context_destroy (&post->ctx); - if (post->type == GF_RDMA_SEND_POST) { - gf_rdma_put_post (&post->device->sendq, post); - } else { - gf_rdma_post_recv (post->device->srq, post); - } + if (connected) { + rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); } -out: - return refcount; + + if (need_unref) + rpc_transport_unref (this); + } -int -gf_rdma_post_get_refcount (gf_rdma_post_t *post) +static int +gf_rdma_cm_handle_event_established (struct rdma_cm_event *event) { - int refcount = -1; + rpc_transport_t *this = NULL; + gf_rdma_private_t *priv = NULL; + struct rdma_cm_id *cm_id = NULL; + int ret = 0; - if (post == NULL) { - goto out; - } + cm_id = event->id; + this = cm_id->context; + priv = this->private; - pthread_mutex_lock (&post->lock); + priv->connected = 1; + + pthread_mutex_lock (&priv->write_mutex); { - refcount = post->refcount; + priv->peer.quota = 1; + priv->peer.quota_set = 0; } - pthread_mutex_unlock (&post->lock); + pthread_mutex_unlock (&priv->write_mutex); -out: - return refcount; -} + if (priv->entity == GF_RDMA_CLIENT) { + ret = rpc_transport_notify (this, RPC_TRANSPORT_CONNECT, this); -gf_rdma_post_t * -gf_rdma_post_ref (gf_rdma_post_t *post) -{ - if (post == NULL) { - goto out; + } else if (priv->entity == GF_RDMA_SERVER) { + ret = rpc_transport_notify (priv->listener, + RPC_TRANSPORT_ACCEPT, this); } - pthread_mutex_lock (&post->lock); - { - post->refcount++; + if (ret < 0) { + gf_rdma_disconnect (this); } - pthread_mutex_unlock (&post->lock); -out: - return post; + gf_log (this->name, GF_LOG_TRACE, + "recieved event RDMA_CM_EVENT_ESTABLISHED (me:%s peer:%s)", + this->myinfo.identifier, this->peerinfo.identifier); + + return ret; } -int32_t -__gf_rdma_ioq_churn_request (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry, - gf_rdma_post_t *post) +static int +gf_rdma_cm_handle_event_error (rpc_transport_t *this) { - gf_rdma_chunktype_t rtype = gf_rdma_noch; - gf_rdma_chunktype_t wtype = gf_rdma_noch; - uint64_t send_size = 0; - gf_rdma_header_t *hdr = NULL; - struct rpc_msg *rpc_msg = NULL; - uint32_t *chunkptr = NULL; - char *buf = NULL; - int32_t ret = 0; - gf_rdma_private_t *priv = NULL; - gf_rdma_device_t *device = NULL; - int chunk_count = 0; - gf_rdma_request_context_t *request_ctx = NULL; - uint32_t prog_payload_length = 0, len = 0; - struct rpc_req *rpc_req = NULL; + gf_rdma_private_t *priv = NULL; - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, peer, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, entry, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, post, out); + priv = this->private; - if ((entry->msg.request.rsphdr_count != 0) - && (entry->msg.request.rsp_payload_count != 0)) { - ret = -1; - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "both write-chunklist and reply-chunk cannot be " - "present"); - goto out; + if (priv->entity != GF_RDMA_SERVER_LISTENER) { + gf_rdma_cm_handle_disconnect (this); } - post->ctx.is_request = 1; - priv = peer->trans->private; - device = priv->device; - - hdr = (gf_rdma_header_t *)post->buf; + return 0; +} - send_size = iov_length (entry->rpchdr, entry->rpchdr_count) - + iov_length (entry->proghdr, entry->proghdr_count) - + GLUSTERFS_RDMA_MAX_HEADER_SIZE; - if (entry->prog_payload_count != 0) { - prog_payload_length - = iov_length (entry->prog_payload, - entry->prog_payload_count); - } +static int +gf_rdma_cm_handle_device_removal (struct rdma_cm_event *event) +{ + return 0; +} - if (send_size > GLUSTERFS_RDMA_INLINE_THRESHOLD) { - rtype = gf_rdma_areadch; - } else if ((send_size + prog_payload_length) - < GLUSTERFS_RDMA_INLINE_THRESHOLD) { - rtype = gf_rdma_noch; - } else if (entry->prog_payload_count != 0) { - rtype = gf_rdma_readch; - } - if (entry->msg.request.rsphdr_count != 0) { - wtype = gf_rdma_replych; - } else if (entry->msg.request.rsp_payload_count != 0) { - wtype = gf_rdma_writech; - } +static void * +gf_rdma_cm_event_handler (void *data) +{ + struct rdma_cm_event *event = NULL; + int ret = 0; + rpc_transport_t *this = NULL; + struct rdma_event_channel *event_channel = NULL; - if (rtype == gf_rdma_readch) { - chunk_count += entry->prog_payload_count; - } else if (rtype == gf_rdma_areadch) { - chunk_count += entry->rpchdr_count; - chunk_count += entry->proghdr_count; - } + event_channel = data; - if (wtype == gf_rdma_writech) { - chunk_count += entry->msg.request.rsp_payload_count; - } else if (wtype == gf_rdma_replych) { - chunk_count += entry->msg.request.rsphdr_count; - } + while (1) { + ret = rdma_get_cm_event (event_channel, &event); + if (ret != 0) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "rdma_cm_get_event failed (%s)", + strerror (errno)); + break; + } - if (chunk_count > GF_RDMA_MAX_SEGMENTS) { - ret = -1; - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "chunk count(%d) exceeding maximum allowed RDMA " - "segment count(%d)", chunk_count, GF_RDMA_MAX_SEGMENTS); - goto out; - } + switch (event->event) { + case RDMA_CM_EVENT_ADDR_RESOLVED: + gf_rdma_cm_handle_addr_resolved (event); + break; - if ((wtype != gf_rdma_noch) || (rtype != gf_rdma_noch)) { - request_ctx = mem_get (device->request_ctx_pool); - if (request_ctx == NULL) { - ret = -1; - goto out; - } + case RDMA_CM_EVENT_ROUTE_RESOLVED: + gf_rdma_cm_handle_route_resolved (event); + break; - memset (request_ctx, 0, sizeof (*request_ctx)); + case RDMA_CM_EVENT_CONNECT_REQUEST: + gf_rdma_cm_handle_connect_request (event); + break; - request_ctx->pool = device->request_ctx_pool; - request_ctx->peer = peer; + case RDMA_CM_EVENT_ESTABLISHED: + gf_rdma_cm_handle_event_established (event); + break; - entry->msg.request.rpc_req->conn_private = request_ctx; + case RDMA_CM_EVENT_ADDR_ERROR: + case RDMA_CM_EVENT_ROUTE_ERROR: + case RDMA_CM_EVENT_CONNECT_ERROR: + case RDMA_CM_EVENT_UNREACHABLE: + case RDMA_CM_EVENT_REJECTED: + this = event->id->context; - if (entry->msg.request.rsp_iobref != NULL) { - request_ctx->rsp_iobref - = iobref_ref (entry->msg.request.rsp_iobref); - } - } + gf_log (this->name, GF_LOG_WARNING, + "cma event %s, error %d (me:%s peer:%s)\n", + rdma_event_str(event->event), event->status, + this->myinfo.identifier, + this->peerinfo.identifier); - rpc_msg = (struct rpc_msg *) entry->rpchdr[0].iov_base; + rdma_ack_cm_event (event); + event = NULL; - hdr->rm_xid = rpc_msg->rm_xid; /* no need of hton32(rpc_msg->rm_xid), - * since rpc_msg->rm_xid is already - * hton32ed value of actual xid - */ - hdr->rm_vers = hton32 (GF_RDMA_VERSION); - hdr->rm_credit = hton32 (peer->send_count); - if (rtype == gf_rdma_areadch) { - hdr->rm_type = hton32 (GF_RDMA_NOMSG); - } else { - hdr->rm_type = hton32 (GF_RDMA_MSG); - } + gf_rdma_cm_handle_event_error (this); + continue; - chunkptr = &hdr->rm_body.rm_chunks[0]; - if (rtype != gf_rdma_noch) { - ret = __gf_rdma_create_read_chunks (peer, entry, rtype, - &chunkptr, - request_ctx); - if (ret != 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "creation of read chunks failed"); - goto out; - } - } else { - *chunkptr++ = 0; /* no read chunks */ - } + case RDMA_CM_EVENT_DISCONNECTED: + this = event->id->context; - if (wtype != gf_rdma_noch) { - ret = __gf_rdma_create_write_chunks (peer, entry, wtype, - &chunkptr, - request_ctx); - if (ret != 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "creation of write/reply chunk failed"); - goto out; - } - } else { - *chunkptr++ = 0; /* no write chunks */ - *chunkptr++ = 0; /* no reply chunk */ - } + gf_log (this->name, GF_LOG_DEBUG, + "recieved disconnect (me:%s peer:%s)\n", + this->myinfo.identifier, + this->peerinfo.identifier); - buf = (char *)chunkptr; + rdma_ack_cm_event (event); + event = NULL; - if (rtype != gf_rdma_areadch) { - iov_unload (buf, entry->rpchdr, entry->rpchdr_count); - buf += iov_length (entry->rpchdr, entry->rpchdr_count); + gf_rdma_cm_handle_disconnect (this); + continue; - iov_unload (buf, entry->proghdr, entry->proghdr_count); - buf += iov_length (entry->proghdr, entry->proghdr_count); + case RDMA_CM_EVENT_DEVICE_REMOVAL: + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "device removed"); + gf_rdma_cm_handle_device_removal (event); + break; - if (rtype != gf_rdma_readch) { - iov_unload (buf, entry->prog_payload, - entry->prog_payload_count); - buf += iov_length (entry->prog_payload, - entry->prog_payload_count); + default: + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "unhandled event: %s, ignoring", + rdma_event_str(event->event)); + break; } + + rdma_ack_cm_event (event); } - len = buf - post->buf; + return NULL; +} - gf_rdma_post_ref (post); - ret = gf_rdma_post_send (peer->qp, post, len); - if (!ret) { - ret = len; - } else { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "gf_rdma_post_send (to %s) failed with ret = %d (%s)", - peer->trans->peerinfo.identifier, ret, - (ret > 0) ? strerror (ret) : ""); - gf_rdma_post_unref (post); - __gf_rdma_disconnect (peer->trans); - ret = -1; - } - -out: - if (ret == -1) { - rpc_req = entry->msg.request.rpc_req; +static int32_t +gf_rdma_post_send (struct ibv_qp *qp, gf_rdma_post_t *post, int32_t len) +{ + struct ibv_sge list = { + .addr = (unsigned long) post->buf, + .length = len, + .lkey = post->mr->lkey + }; - if (request_ctx != NULL) { - __gf_rdma_request_context_destroy (rpc_req->conn_private); - } + struct ibv_send_wr wr = { + .wr_id = (unsigned long) post, + .sg_list = &list, + .num_sge = 1, + .opcode = IBV_WR_SEND, + .send_flags = IBV_SEND_SIGNALED, + }, *bad_wr; - rpc_req->conn_private = NULL; - } + if (!qp) + return EINVAL; - return ret; + return ibv_post_send (qp, &wr, &bad_wr); } - -static inline void -__gf_rdma_fill_reply_header (gf_rdma_header_t *header, struct iovec *rpchdr, - gf_rdma_reply_info_t *reply_info, int credits) +int +__gf_rdma_encode_error(gf_rdma_peer_t *peer, gf_rdma_reply_info_t *reply_info, + struct iovec *rpchdr, gf_rdma_header_t *hdr, + gf_rdma_errcode_t err) { struct rpc_msg *rpc_msg = NULL; if (reply_info != NULL) { - header->rm_xid = hton32 (reply_info->rm_xid); + hdr->rm_xid = hton32(reply_info->rm_xid); } else { rpc_msg = rpchdr[0].iov_base; /* assume rpchdr contains * only one vector. * (which is true) */ - header->rm_xid = rpc_msg->rm_xid; + hdr->rm_xid = rpc_msg->rm_xid; } - header->rm_type = hton32 (GF_RDMA_MSG); - header->rm_vers = hton32 (GF_RDMA_VERSION); - header->rm_credit = hton32 (credits); - - header->rm_body.rm_chunks[0] = 0; /* no read chunks */ - header->rm_body.rm_chunks[1] = 0; /* no write chunks */ - header->rm_body.rm_chunks[2] = 0; /* no reply chunks */ + hdr->rm_vers = hton32(GF_RDMA_VERSION); + hdr->rm_credit = hton32(peer->send_count); + hdr->rm_type = hton32(GF_RDMA_ERROR); + hdr->rm_body.rm_error.rm_type = hton32(err); + if (err == ERR_VERS) { + hdr->rm_body.rm_error.rm_version.gf_rdma_vers_low + = hton32(GF_RDMA_VERSION); + hdr->rm_body.rm_error.rm_version.gf_rdma_vers_high + = hton32(GF_RDMA_VERSION); + } - return; + return sizeof (*hdr); } int32_t -__gf_rdma_send_reply_inline (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry, - gf_rdma_post_t *post, - gf_rdma_reply_info_t *reply_info) +__gf_rdma_send_error (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry, + gf_rdma_post_t *post, gf_rdma_reply_info_t *reply_info, + gf_rdma_errcode_t err) { - gf_rdma_header_t *header = NULL; - int32_t send_size = 0, ret = 0; - char *buf = NULL; - - send_size = iov_length (entry->rpchdr, entry->rpchdr_count) - + iov_length (entry->proghdr, entry->proghdr_count) - + iov_length (entry->prog_payload, entry->prog_payload_count) - + sizeof (gf_rdma_header_t); /* - * remember, no chunklists in the - * reply - */ + int32_t ret = -1, len = 0; - if (send_size > GLUSTERFS_RDMA_INLINE_THRESHOLD) { - ret = __gf_rdma_send_error (peer, entry, post, reply_info, - ERR_CHUNK); - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "msg size (%d) is greater than maximum size " - "of msg that can be sent inlined (%d)", - send_size, GLUSTERFS_RDMA_INLINE_THRESHOLD); + len = __gf_rdma_encode_error (peer, reply_info, entry->rpchdr, + (gf_rdma_header_t *)post->buf, err); + if (len == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, + "encode error returned -1"); goto out; } - header = (gf_rdma_header_t *)post->buf; - - __gf_rdma_fill_reply_header (header, entry->rpchdr, reply_info, - peer->send_count); - - buf = (char *)&header->rm_body.rm_chunks[3]; - - if (entry->rpchdr_count != 0) { - iov_unload (buf, entry->rpchdr, entry->rpchdr_count); - buf += iov_length (entry->rpchdr, entry->rpchdr_count); - } - - if (entry->proghdr_count != 0) { - iov_unload (buf, entry->proghdr, entry->proghdr_count); - buf += iov_length (entry->proghdr, entry->proghdr_count); - } - - if (entry->prog_payload_count != 0) { - iov_unload (buf, entry->prog_payload, - entry->prog_payload_count); - buf += iov_length (entry->prog_payload, - entry->prog_payload_count); - } - gf_rdma_post_ref (post); - ret = gf_rdma_post_send (peer->qp, post, (buf - post->buf)); + ret = gf_rdma_post_send (peer->qp, post, len); if (!ret) { - ret = send_size; + ret = len; } else { gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "posting send (to %s) failed with ret = %d (%s)", + "gf_rdma_post_send (to %s) failed with ret = %d (%s)", peer->trans->peerinfo.identifier, ret, (ret > 0) ? strerror (ret) : ""); gf_rdma_post_unref (post); @@ -1214,3517 +1195,3095 @@ out: int32_t -__gf_rdma_reply_encode_write_chunks (gf_rdma_peer_t *peer, - uint32_t payload_size, - gf_rdma_post_t *post, - gf_rdma_reply_info_t *reply_info, - uint32_t **ptr) +__gf_rdma_create_read_chunks_from_vector (gf_rdma_peer_t *peer, + gf_rdma_read_chunk_t **readch_ptr, + int32_t *pos, struct iovec *vector, + int count, + gf_rdma_request_context_t *request_ctx) { - uint32_t chunk_size = 0; - int32_t ret = -1; - gf_rdma_write_array_t *target_array = NULL; - int i = 0; + int i = 0; + gf_rdma_private_t *priv = NULL; + gf_rdma_device_t *device = NULL; + struct ibv_mr *mr = NULL; + gf_rdma_read_chunk_t *readch = NULL; + int32_t ret = -1; - target_array = (gf_rdma_write_array_t *)*ptr; + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, peer, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, readch_ptr, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, *readch_ptr, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, request_ctx, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, vector, out); - for (i = 0; i < reply_info->wc_array->wc_nchunks; i++) { - chunk_size += - reply_info->wc_array->wc_array[i].wc_target.rs_length; - } + priv = peer->trans->private; + device = priv->device; + readch = *readch_ptr; - if (chunk_size < payload_size) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_DEBUG, - "length of payload (%d) is exceeding the total " - "write chunk length (%d)", payload_size, chunk_size); - goto out; - } + for (i = 0; i < count; i++) { + readch->rc_discrim = hton32 (1); + readch->rc_position = hton32 (*pos); - target_array->wc_discrim = hton32 (1); - for (i = 0; (i < reply_info->wc_array->wc_nchunks) - && (payload_size != 0); - i++) { - target_array->wc_array[i].wc_target.rs_offset - = hton64 (reply_info->wc_array->wc_array[i].wc_target.rs_offset); + mr = ibv_reg_mr (device->pd, vector[i].iov_base, + vector[i].iov_len, + IBV_ACCESS_REMOTE_READ); + if (!mr) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "memory registration failed (%s) (peer:%s)", + strerror (errno), + peer->trans->peerinfo.identifier); + goto out; + } - target_array->wc_array[i].wc_target.rs_length - = hton32 (min (payload_size, - reply_info->wc_array->wc_array[i].wc_target.rs_length)); + request_ctx->mr[request_ctx->mr_count++] = mr; + + readch->rc_target.rs_handle = hton32 (mr->rkey); + readch->rc_target.rs_length + = hton32 (vector[i].iov_len); + readch->rc_target.rs_offset + = hton64 ((uint64_t)(unsigned long)vector[i].iov_base); + + *pos = *pos + vector[i].iov_len; + readch++; } - target_array->wc_nchunks = hton32 (i); - target_array->wc_array[i].wc_target.rs_handle = 0; /* terminate - chunklist */ + *readch_ptr = readch; ret = 0; - - *ptr = &target_array->wc_array[i].wc_target.rs_length; out: return ret; } -static inline int32_t -__gf_rdma_register_local_mr_for_rdma (gf_rdma_peer_t *peer, - struct iovec *vector, int count, - gf_rdma_post_context_t *ctx) +int32_t +__gf_rdma_create_read_chunks (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry, + gf_rdma_chunktype_t type, uint32_t **ptr, + gf_rdma_request_context_t *request_ctx) { - int i = 0; - int32_t ret = -1; - gf_rdma_private_t *priv = NULL; - gf_rdma_device_t *device = NULL; + int32_t ret = -1; + int pos = 0; - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, ctx, out); - GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, vector, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, peer, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, entry, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, ptr, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, *ptr, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, request_ctx, out); - priv = peer->trans->private; - device = priv->device; + request_ctx->iobref = iobref_ref (entry->iobref); - for (i = 0; i < count; i++) { - /* what if the memory is registered more than once? - * Assume that a single write buffer is passed to afr, which - * then passes it to its children. If more than one children - * happen to use rdma, then the buffer is registered more than - * once. - * Ib-verbs specification says that multiple registrations of - * same memory location is allowed. Refer to 10.6.3.8 of - * Infiniband Architecture Specification Volume 1 - * (Release 1.2.1) - */ - ctx->mr[ctx->mr_count] = ibv_reg_mr (device->pd, - vector[i].iov_base, - vector[i].iov_len, - IBV_ACCESS_LOCAL_WRITE); - if (ctx->mr[ctx->mr_count] == NULL) { + if (type == gf_rdma_areadch) { + pos = 0; + ret = __gf_rdma_create_read_chunks_from_vector (peer, + (gf_rdma_read_chunk_t **)ptr, + &pos, + entry->rpchdr, + entry->rpchdr_count, + request_ctx); + if (ret == -1) { gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "registering memory for IBV_ACCESS_LOCAL_WRITE " - "failed (%s)", strerror (errno)); + "cannot create read chunks from vector " + "entry->rpchdr"); goto out; } - ctx->mr_count++; + ret = __gf_rdma_create_read_chunks_from_vector (peer, + (gf_rdma_read_chunk_t **)ptr, + &pos, + entry->proghdr, + entry->proghdr_count, + request_ctx); + if (ret == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "cannot create read chunks from vector " + "entry->proghdr"); + } + + if (entry->prog_payload_count != 0) { + ret = __gf_rdma_create_read_chunks_from_vector (peer, + (gf_rdma_read_chunk_t **)ptr, + &pos, + entry->prog_payload, + entry->prog_payload_count, + request_ctx); + if (ret == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "cannot create read chunks from vector" + " entry->prog_payload"); + } + } + } else { + pos = iov_length (entry->rpchdr, entry->rpchdr_count); + ret = __gf_rdma_create_read_chunks_from_vector (peer, + (gf_rdma_read_chunk_t **)ptr, + &pos, + entry->prog_payload, + entry->prog_payload_count, + request_ctx); + if (ret == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "cannot create read chunks from vector " + "entry->prog_payload"); + } } - ret = 0; + /* terminate read-chunk list*/ + **ptr = 0; + *ptr = *ptr + 1; out: return ret; } -/* 1. assumes xfer_len of data is pointed by vector(s) starting from vec[*idx] - * 2. modifies vec - */ + int32_t -__gf_rdma_write (gf_rdma_peer_t *peer, gf_rdma_post_t *post, struct iovec *vec, - uint32_t xfer_len, int *idx, gf_rdma_write_chunk_t *writech) +__gf_rdma_create_write_chunks_from_vector (gf_rdma_peer_t *peer, + gf_rdma_write_chunk_t **writech_ptr, + struct iovec *vector, int count, + gf_rdma_request_context_t *request_ctx) { - int size = 0, num_sge = 0, i = 0; - int32_t ret = -1; - struct ibv_sge *sg_list = NULL; - struct ibv_send_wr wr = { - .opcode = IBV_WR_RDMA_WRITE, - .send_flags = IBV_SEND_SIGNALED, - }, *bad_wr; - - if ((peer == NULL) || (writech == NULL) || (idx == NULL) - || (post == NULL) || (vec == NULL) || (xfer_len == 0)) { - goto out; - } - - for (i = *idx; size < xfer_len; i++) { - size += vec[i].iov_len; - } + int i = 0; + gf_rdma_private_t *priv = NULL; + gf_rdma_device_t *device = NULL; + struct ibv_mr *mr = NULL; + gf_rdma_write_chunk_t *writech = NULL; + int32_t ret = -1; - num_sge = i - *idx; + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, peer, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, writech_ptr, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, *writech_ptr, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, request_ctx, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, vector, out); - sg_list = GF_CALLOC (num_sge, sizeof (struct ibv_sge), - gf_common_mt_sge); - if (sg_list == NULL) { - ret = -1; - goto out; - } + writech = *writech_ptr; - for ((i = *idx), (num_sge = 0); (xfer_len != 0); i++, num_sge++) { - size = min (xfer_len, vec[i].iov_len); + priv = peer->trans->private; + device = priv->device; - sg_list [num_sge].addr = (unsigned long)vec[i].iov_base; - sg_list [num_sge].length = size; - sg_list [num_sge].lkey = post->ctx.mr[i]->lkey; + for (i = 0; i < count; i++) { + mr = ibv_reg_mr (device->pd, vector[i].iov_base, + vector[i].iov_len, + IBV_ACCESS_REMOTE_WRITE + | IBV_ACCESS_LOCAL_WRITE); + if (!mr) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "memory registration failed (%s) (peer:%s)", + strerror (errno), + peer->trans->peerinfo.identifier); + goto out; + } - xfer_len -= size; - } + request_ctx->mr[request_ctx->mr_count++] = mr; - *idx = i; + writech->wc_target.rs_handle = hton32 (mr->rkey); + writech->wc_target.rs_length = hton32 (vector[i].iov_len); + writech->wc_target.rs_offset + = hton64 (((uint64_t)(unsigned long)vector[i].iov_base)); - if (size < vec[i - 1].iov_len) { - vec[i - 1].iov_base += size; - vec[i - 1].iov_len -= size; - *idx = i - 1; + writech++; } - wr.sg_list = sg_list; - wr.num_sge = num_sge; - wr.wr_id = (unsigned long) gf_rdma_post_ref (post); - wr.wr.rdma.rkey = writech->wc_target.rs_handle; - wr.wr.rdma.remote_addr = writech->wc_target.rs_offset; - - ret = ibv_post_send(peer->qp, &wr, &bad_wr); - if (ret) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "rdma write to " - "client (%s) failed with ret = %d (%s)", - peer->trans->peerinfo.identifier, ret, - (ret > 0) ? strerror (ret) : ""); - ret = -1; - } + *writech_ptr = writech; - GF_FREE (sg_list); + ret = 0; out: return ret; } int32_t -__gf_rdma_do_gf_rdma_write (gf_rdma_peer_t *peer, gf_rdma_post_t *post, - struct iovec *vector, int count, - struct iobref *iobref, - gf_rdma_reply_info_t *reply_info) +__gf_rdma_create_write_chunks (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry, + gf_rdma_chunktype_t chunk_type, uint32_t **ptr, + gf_rdma_request_context_t *request_ctx) { - int i = 0, payload_idx = 0; - uint32_t payload_size = 0, xfer_len = 0; - int32_t ret = -1; - - if (count != 0) { - payload_size = iov_length (vector, count); - } + int32_t ret = -1; + gf_rdma_write_array_t *warray = NULL; - if (payload_size == 0) { - ret = 0; - goto out; - } + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, peer, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, ptr, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, *ptr, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, request_ctx, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, entry, out); - ret = __gf_rdma_register_local_mr_for_rdma (peer, vector, count, - &post->ctx); - if (ret == -1) { + if ((chunk_type == gf_rdma_replych) + && ((entry->msg.request.rsphdr_count != 1) || + (entry->msg.request.rsphdr_vec[0].iov_base == NULL))) { gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "registering memory region for rdma failed"); + (entry->msg.request.rsphdr_count == 1) + ? "chunktype specified as reply chunk but the vector " + "specifying the buffer to be used for holding reply" + " header is not correct" : + "chunktype specified as reply chunk, but more than one " + "buffer provided for holding reply"); goto out; } - post->ctx.iobref = iobref_ref (iobref); +/* + if ((chunk_type == gf_rdma_writech) + && ((entry->msg.request.rsphdr_count == 0) + || (entry->msg.request.rsphdr_vec[0].iov_base == NULL))) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_DEBUG, + "vector specifying buffer to hold the program's reply " + "header should also be provided when buffers are " + "provided for holding the program's payload in reply"); + goto out; + } +*/ - for (i = 0; (i < reply_info->wc_array->wc_nchunks) - && (payload_size != 0); - i++) { - xfer_len = min (payload_size, - reply_info->wc_array->wc_array[i].wc_target.rs_length); + if (chunk_type == gf_rdma_writech) { + warray = (gf_rdma_write_array_t *)*ptr; + warray->wc_discrim = hton32 (1); + warray->wc_nchunks + = hton32 (entry->msg.request.rsp_payload_count); - ret = __gf_rdma_write (peer, post, vector, xfer_len, - &payload_idx, - &reply_info->wc_array->wc_array[i]); + *ptr = (uint32_t *)&warray->wc_array[0]; + + ret = __gf_rdma_create_write_chunks_from_vector (peer, + (gf_rdma_write_chunk_t **)ptr, + entry->msg.request.rsp_payload, + entry->msg.request.rsp_payload_count, + request_ctx); if (ret == -1) { gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "rdma write to client (%s) failed", - peer->trans->peerinfo.identifier); + "cannot create write chunks from vector " + "entry->rpc_payload"); goto out; } - payload_size -= xfer_len; + /* terminate write chunklist */ + **ptr = 0; + *ptr = *ptr + 1; + + /* no reply chunklist */ + **ptr = 0; + *ptr = *ptr + 1; + } else { + /* no write chunklist */ + **ptr = 0; + *ptr = *ptr + 1; + + warray = (gf_rdma_write_array_t *)*ptr; + warray->wc_discrim = hton32 (1); + warray->wc_nchunks = hton32 (entry->msg.request.rsphdr_count); + + *ptr = (uint32_t *)&warray->wc_array[0]; + + ret = __gf_rdma_create_write_chunks_from_vector (peer, + (gf_rdma_write_chunk_t **)ptr, + entry->msg.request.rsphdr_vec, + entry->msg.request.rsphdr_count, + request_ctx); + if (ret == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "cannot create write chunks from vector " + "entry->rpchdr"); + goto out; + } + + /* terminate reply chunklist */ + **ptr = 0; + *ptr = *ptr + 1; } - ret = 0; out: - return ret; } -int32_t -__gf_rdma_send_reply_type_nomsg (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry, - gf_rdma_post_t *post, - gf_rdma_reply_info_t *reply_info) +static inline void +__gf_rdma_deregister_mr (struct ibv_mr **mr, int count) { - gf_rdma_header_t *header = NULL; - char *buf = NULL; - uint32_t payload_size = 0; - int count = 0, i = 0; - int32_t ret = 0; - struct iovec vector[MAX_IOVEC]; + int i = 0; - header = (gf_rdma_header_t *)post->buf; + if (mr == NULL) { + goto out; + } - __gf_rdma_fill_reply_header (header, entry->rpchdr, reply_info, - peer->send_count); + for (i = 0; i < count; i++) { + ibv_dereg_mr (mr[i]); + } - header->rm_type = hton32 (GF_RDMA_NOMSG); +out: + return; +} - payload_size = iov_length (entry->rpchdr, entry->rpchdr_count) + - iov_length (entry->proghdr, entry->proghdr_count); - /* encode reply chunklist */ - buf = (char *)&header->rm_body.rm_chunks[2]; - ret = __gf_rdma_reply_encode_write_chunks (peer, payload_size, post, - reply_info, - (uint32_t **)&buf); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "encoding write chunks failed"); - ret = __gf_rdma_send_error (peer, entry, post, reply_info, - ERR_CHUNK); - goto out; - } +static int32_t +__gf_rdma_quota_put (gf_rdma_peer_t *peer) +{ + int32_t ret = 0; - gf_rdma_post_ref (post); + peer->quota++; + ret = peer->quota; - for (i = 0; i < entry->rpchdr_count; i++) { - vector[count++] = entry->rpchdr[i]; + if (!list_empty (&peer->ioq)) { + ret = __gf_rdma_ioq_churn (peer); } - for (i = 0; i < entry->proghdr_count; i++) { - vector[count++] = entry->proghdr[i]; - } + return ret; +} - ret = __gf_rdma_do_gf_rdma_write (peer, post, vector, count, - entry->iobref, reply_info); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "rdma write to peer (%s) failed", - peer->trans->peerinfo.identifier); - gf_rdma_post_unref (post); - goto out; - } - ret = gf_rdma_post_send (peer->qp, post, (buf - post->buf)); - if (ret) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "posting a send request to client (%s) failed with " - "ret = %d (%s)", peer->trans->peerinfo.identifier, ret, - (ret > 0) ? strerror (ret) : ""); - ret = -1; - gf_rdma_post_unref (post); - } else { - ret = payload_size; +static int32_t +gf_rdma_quota_put (gf_rdma_peer_t *peer) +{ + int32_t ret = 0; + gf_rdma_private_t *priv = NULL; + + priv = peer->trans->private; + pthread_mutex_lock (&priv->write_mutex); + { + ret = __gf_rdma_quota_put (peer); } + pthread_mutex_unlock (&priv->write_mutex); -out: return ret; } -int32_t -__gf_rdma_send_reply_type_msg (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry, - gf_rdma_post_t *post, - gf_rdma_reply_info_t *reply_info) +/* to be called with priv->mutex held */ +void +__gf_rdma_request_context_destroy (gf_rdma_request_context_t *context) { - gf_rdma_header_t *header = NULL; - int32_t send_size = 0, ret = 0; - char *ptr = NULL; - uint32_t payload_size = 0; - - send_size = iov_length (entry->rpchdr, entry->rpchdr_count) - + iov_length (entry->proghdr, entry->proghdr_count) - + GLUSTERFS_RDMA_MAX_HEADER_SIZE; - - if (send_size > GLUSTERFS_RDMA_INLINE_THRESHOLD) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "client has provided only write chunks, but the " - "combined size of rpc and program header (%d) is " - "exceeding the size of msg that can be sent using " - "RDMA send (%d)", send_size, - GLUSTERFS_RDMA_INLINE_THRESHOLD); + gf_rdma_peer_t *peer = NULL; + gf_rdma_private_t *priv = NULL; + int32_t ret = 0; - ret = __gf_rdma_send_error (peer, entry, post, reply_info, - ERR_CHUNK); + if (context == NULL) { goto out; } - header = (gf_rdma_header_t *)post->buf; + peer = context->peer; - __gf_rdma_fill_reply_header (header, entry->rpchdr, reply_info, - peer->send_count); + __gf_rdma_deregister_mr (context->mr, context->mr_count); - payload_size = iov_length (entry->prog_payload, - entry->prog_payload_count); - ptr = (char *)&header->rm_body.rm_chunks[1]; + priv = peer->trans->private; - ret = __gf_rdma_reply_encode_write_chunks (peer, payload_size, post, - reply_info, - (uint32_t **)&ptr); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "encoding write chunks failed"); - ret = __gf_rdma_send_error (peer, entry, post, reply_info, - ERR_CHUNK); - goto out; + if (priv->connected) { + ret = __gf_rdma_quota_put (peer); + if (ret < 0) { + gf_log ("rdma", GF_LOG_DEBUG, + "failed to send " + "message"); + mem_put (context); + __gf_rdma_disconnect (peer->trans); + goto out; + } } - *(uint32_t *)ptr = 0; /* terminate reply chunklist */ - ptr += sizeof (uint32_t); + if (context->iobref != NULL) { + iobref_unref (context->iobref); + context->iobref = NULL; + } - gf_rdma_post_ref (post); + if (context->rsp_iobref != NULL) { + iobref_unref (context->rsp_iobref); + context->rsp_iobref = NULL; + } - ret = __gf_rdma_do_gf_rdma_write (peer, post, entry->prog_payload, - entry->prog_payload_count, - entry->iobref, reply_info); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, "rdma write to peer " - "(%s) failed", peer->trans->peerinfo.identifier); - gf_rdma_post_unref (post); + mem_put (context); + +out: + return; +} + + +void +gf_rdma_post_context_destroy (gf_rdma_post_context_t *ctx) +{ + if (ctx == NULL) { goto out; } - iov_unload (ptr, entry->rpchdr, entry->rpchdr_count); - ptr += iov_length (entry->rpchdr, entry->rpchdr_count); + __gf_rdma_deregister_mr (ctx->mr, ctx->mr_count); - iov_unload (ptr, entry->proghdr, entry->proghdr_count); - ptr += iov_length (entry->proghdr, entry->proghdr_count); + if (ctx->iobref != NULL) { + iobref_unref (ctx->iobref); + } - ret = gf_rdma_post_send (peer->qp, post, (ptr - post->buf)); - if (ret) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "rdma send to client (%s) failed with ret = %d (%s)", - peer->trans->peerinfo.identifier, ret, - (ret > 0) ? strerror (ret) : ""); - gf_rdma_post_unref (post); - ret = -1; - } else { - ret = send_size + payload_size; + if (ctx->hdr_iobuf != NULL) { + iobuf_unref (ctx->hdr_iobuf); } + memset (ctx, 0, sizeof (*ctx)); out: - return ret; + return; } -void -gf_rdma_reply_info_destroy (gf_rdma_reply_info_t *reply_info) +int +gf_rdma_post_unref (gf_rdma_post_t *post) { - if (reply_info == NULL) { + int refcount = -1; + + if (post == NULL) { goto out; } - if (reply_info->wc_array != NULL) { - GF_FREE (reply_info->wc_array); - reply_info->wc_array = NULL; + pthread_mutex_lock (&post->lock); + { + refcount = --post->refcount; } + pthread_mutex_unlock (&post->lock); - mem_put (reply_info); + if (refcount == 0) { + gf_rdma_post_context_destroy (&post->ctx); + if (post->type == GF_RDMA_SEND_POST) { + gf_rdma_put_post (&post->device->sendq, post); + } else { + gf_rdma_post_recv (post->device->srq, post); + } + } out: - return; + return refcount; } -gf_rdma_reply_info_t * -gf_rdma_reply_info_alloc (gf_rdma_peer_t *peer) +int +gf_rdma_post_get_refcount (gf_rdma_post_t *post) { - gf_rdma_reply_info_t *reply_info = NULL; - gf_rdma_private_t *priv = NULL; + int refcount = -1; - priv = peer->trans->private; + if (post == NULL) { + goto out; + } - reply_info = mem_get (priv->device->reply_info_pool); - if (reply_info == NULL) { + pthread_mutex_lock (&post->lock); + { + refcount = post->refcount; + } + pthread_mutex_unlock (&post->lock); + +out: + return refcount; +} + +gf_rdma_post_t * +gf_rdma_post_ref (gf_rdma_post_t *post) +{ + if (post == NULL) { goto out; } - memset (reply_info, 0, sizeof (*reply_info)); - reply_info->pool = priv->device->reply_info_pool; + pthread_mutex_lock (&post->lock); + { + post->refcount++; + } + pthread_mutex_unlock (&post->lock); out: - return reply_info; + return post; } int32_t -__gf_rdma_ioq_churn_reply (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry, - gf_rdma_post_t *post) +__gf_rdma_ioq_churn_request (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry, + gf_rdma_post_t *post) { - gf_rdma_reply_info_t *reply_info = NULL; - int32_t ret = -1; - gf_rdma_chunktype_t type = gf_rdma_noch; + gf_rdma_chunktype_t rtype = gf_rdma_noch; + gf_rdma_chunktype_t wtype = gf_rdma_noch; + uint64_t send_size = 0; + gf_rdma_header_t *hdr = NULL; + struct rpc_msg *rpc_msg = NULL; + uint32_t *chunkptr = NULL; + char *buf = NULL; + int32_t ret = 0; + gf_rdma_private_t *priv = NULL; + gf_rdma_device_t *device = NULL; + int chunk_count = 0; + gf_rdma_request_context_t *request_ctx = NULL; + uint32_t prog_payload_length = 0, len = 0; + struct rpc_req *rpc_req = NULL; GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, peer, out); GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, entry, out); GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, post, out); - reply_info = entry->msg.reply_info; - if (reply_info != NULL) { - type = reply_info->type; + if ((entry->msg.request.rsphdr_count != 0) + && (entry->msg.request.rsp_payload_count != 0)) { + ret = -1; + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "both write-chunklist and reply-chunk cannot be " + "present"); + goto out; } - switch (type) { - case gf_rdma_noch: - ret = __gf_rdma_send_reply_inline (peer, entry, post, - reply_info); - if (ret < 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "failed to send reply to peer (%s) as an " - "inlined rdma msg", - peer->trans->peerinfo.identifier); - } - break; + post->ctx.is_request = 1; + priv = peer->trans->private; + device = priv->device; - case gf_rdma_replych: - ret = __gf_rdma_send_reply_type_nomsg (peer, entry, post, - reply_info); - if (ret < 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "failed to send reply to peer (%s) as " - "RDMA_NOMSG", peer->trans->peerinfo.identifier); - } - break; + hdr = (gf_rdma_header_t *)post->buf; - case gf_rdma_writech: - ret = __gf_rdma_send_reply_type_msg (peer, entry, post, - reply_info); - if (ret < 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "failed to send reply with write chunks " - "to peer (%s)", - peer->trans->peerinfo.identifier); - } - break; + send_size = iov_length (entry->rpchdr, entry->rpchdr_count) + + iov_length (entry->proghdr, entry->proghdr_count) + + GLUSTERFS_RDMA_MAX_HEADER_SIZE; - default: - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "invalid chunktype (%d) specified for sending reply " - " (peer:%s)", type, peer->trans->peerinfo.identifier); - break; + if (entry->prog_payload_count != 0) { + prog_payload_length + = iov_length (entry->prog_payload, + entry->prog_payload_count); } - if (reply_info != NULL) { - gf_rdma_reply_info_destroy (reply_info); + if (send_size > GLUSTERFS_RDMA_INLINE_THRESHOLD) { + rtype = gf_rdma_areadch; + } else if ((send_size + prog_payload_length) + < GLUSTERFS_RDMA_INLINE_THRESHOLD) { + rtype = gf_rdma_noch; + } else if (entry->prog_payload_count != 0) { + rtype = gf_rdma_readch; } -out: - return ret; -} + if (entry->msg.request.rsphdr_count != 0) { + wtype = gf_rdma_replych; + } else if (entry->msg.request.rsp_payload_count != 0) { + wtype = gf_rdma_writech; + } -int32_t -__gf_rdma_ioq_churn_entry (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry) -{ - int32_t ret = 0, quota = 0; - gf_rdma_private_t *priv = NULL; - gf_rdma_device_t *device = NULL; - gf_rdma_options_t *options = NULL; - gf_rdma_post_t *post = NULL; + if (rtype == gf_rdma_readch) { + chunk_count += entry->prog_payload_count; + } else if (rtype == gf_rdma_areadch) { + chunk_count += entry->rpchdr_count; + chunk_count += entry->proghdr_count; + } - priv = peer->trans->private; - options = &priv->options; - device = priv->device; + if (wtype == gf_rdma_writech) { + chunk_count += entry->msg.request.rsp_payload_count; + } else if (wtype == gf_rdma_replych) { + chunk_count += entry->msg.request.rsphdr_count; + } - quota = __gf_rdma_quota_get (peer); - if (quota > 0) { - post = gf_rdma_get_post (&device->sendq); - if (post == NULL) { - post = gf_rdma_new_post (device, - (options->send_size + 2048), - GF_RDMA_SEND_POST); - } + if (chunk_count > GF_RDMA_MAX_SEGMENTS) { + ret = -1; + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "chunk count(%d) exceeding maximum allowed RDMA " + "segment count(%d)", chunk_count, GF_RDMA_MAX_SEGMENTS); + goto out; + } - if (post == NULL) { + if ((wtype != gf_rdma_noch) || (rtype != gf_rdma_noch)) { + request_ctx = mem_get (device->request_ctx_pool); + if (request_ctx == NULL) { ret = -1; - gf_log_callingfn (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "not able to get a post to send msg"); goto out; } - if (entry->is_request) { - ret = __gf_rdma_ioq_churn_request (peer, entry, post); - if (ret < 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "failed to process request ioq entry " - "to peer(%s)", - peer->trans->peerinfo.identifier); - } - } else { - ret = __gf_rdma_ioq_churn_reply (peer, entry, post); - if (ret < 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "failed to process reply ioq entry " - "to peer (%s)", - peer->trans->peerinfo.identifier); - } + memset (request_ctx, 0, sizeof (*request_ctx)); + + request_ctx->pool = device->request_ctx_pool; + request_ctx->peer = peer; + + entry->msg.request.rpc_req->conn_private = request_ctx; + + if (entry->msg.request.rsp_iobref != NULL) { + request_ctx->rsp_iobref + = iobref_ref (entry->msg.request.rsp_iobref); } + } + + rpc_msg = (struct rpc_msg *) entry->rpchdr[0].iov_base; + + hdr->rm_xid = rpc_msg->rm_xid; /* no need of hton32(rpc_msg->rm_xid), + * since rpc_msg->rm_xid is already + * hton32ed value of actual xid + */ + hdr->rm_vers = hton32 (GF_RDMA_VERSION); + hdr->rm_credit = hton32 (peer->send_count); + if (rtype == gf_rdma_areadch) { + hdr->rm_type = hton32 (GF_RDMA_NOMSG); + } else { + hdr->rm_type = hton32 (GF_RDMA_MSG); + } + chunkptr = &hdr->rm_body.rm_chunks[0]; + if (rtype != gf_rdma_noch) { + ret = __gf_rdma_create_read_chunks (peer, entry, rtype, + &chunkptr, + request_ctx); if (ret != 0) { - __gf_rdma_ioq_entry_free (entry); + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "creation of read chunks failed"); + goto out; } } else { - ret = 0; + *chunkptr++ = 0; /* no read chunks */ } -out: - return ret; -} + if (wtype != gf_rdma_noch) { + ret = __gf_rdma_create_write_chunks (peer, entry, wtype, + &chunkptr, + request_ctx); + if (ret != 0) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "creation of write/reply chunk failed"); + goto out; + } + } else { + *chunkptr++ = 0; /* no write chunks */ + *chunkptr++ = 0; /* no reply chunk */ + } + buf = (char *)chunkptr; -static int32_t -__gf_rdma_ioq_churn (gf_rdma_peer_t *peer) -{ - gf_rdma_ioq_t *entry = NULL; - int32_t ret = 0; + if (rtype != gf_rdma_areadch) { + iov_unload (buf, entry->rpchdr, entry->rpchdr_count); + buf += iov_length (entry->rpchdr, entry->rpchdr_count); - while (!list_empty (&peer->ioq)) - { - /* pick next entry */ - entry = peer->ioq_next; + iov_unload (buf, entry->proghdr, entry->proghdr_count); + buf += iov_length (entry->proghdr, entry->proghdr_count); - ret = __gf_rdma_ioq_churn_entry (peer, entry); + if (rtype != gf_rdma_readch) { + iov_unload (buf, entry->prog_payload, + entry->prog_payload_count); + buf += iov_length (entry->prog_payload, + entry->prog_payload_count); + } + } - if (ret <= 0) - break; + len = buf - post->buf; + + gf_rdma_post_ref (post); + + ret = gf_rdma_post_send (peer->qp, post, len); + if (!ret) { + ret = len; + } else { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "gf_rdma_post_send (to %s) failed with ret = %d (%s)", + peer->trans->peerinfo.identifier, ret, + (ret > 0) ? strerror (ret) : ""); + gf_rdma_post_unref (post); + __gf_rdma_disconnect (peer->trans); + ret = -1; } - /* - list_for_each_entry_safe (entry, dummy, &peer->ioq, list) { - ret = __gf_rdma_ioq_churn_entry (peer, entry); - if (ret <= 0) { - break; - } - } - */ +out: + if (ret == -1) { + rpc_req = entry->msg.request.rpc_req; + + if (request_ctx != NULL) { + __gf_rdma_request_context_destroy (rpc_req->conn_private); + } + + rpc_req->conn_private = NULL; + } return ret; } -static int32_t -gf_rdma_writev (rpc_transport_t *this, gf_rdma_ioq_t *entry) +static inline void +__gf_rdma_fill_reply_header (gf_rdma_header_t *header, struct iovec *rpchdr, + gf_rdma_reply_info_t *reply_info, int credits) { - int32_t ret = 0, need_append = 1; - gf_rdma_private_t *priv = NULL; - gf_rdma_peer_t *peer = NULL; + struct rpc_msg *rpc_msg = NULL; - priv = this->private; - pthread_mutex_lock (&priv->write_mutex); - { - if (!priv->connected) { - gf_log (this->name, GF_LOG_WARNING, - "rdma is not connected to peer (%s)", - this->peerinfo.identifier); - ret = -1; - goto unlock; - } + if (reply_info != NULL) { + header->rm_xid = hton32 (reply_info->rm_xid); + } else { + rpc_msg = rpchdr[0].iov_base; /* assume rpchdr contains + * only one vector. + * (which is true) + */ + header->rm_xid = rpc_msg->rm_xid; + } - peer = &priv->peer; - if (list_empty (&peer->ioq)) { - ret = __gf_rdma_ioq_churn_entry (peer, entry); - if (ret != 0) { - need_append = 0; + header->rm_type = hton32 (GF_RDMA_MSG); + header->rm_vers = hton32 (GF_RDMA_VERSION); + header->rm_credit = hton32 (credits); - if (ret < 0) { - gf_log (this->name, GF_LOG_WARNING, - "processing ioq entry destined " - "to (%s) failed", - this->peerinfo.identifier); - } - } - } + header->rm_body.rm_chunks[0] = 0; /* no read chunks */ + header->rm_body.rm_chunks[1] = 0; /* no write chunks */ + header->rm_body.rm_chunks[2] = 0; /* no reply chunks */ - if (need_append) { - list_add_tail (&entry->list, &peer->ioq); - } - } -unlock: - pthread_mutex_unlock (&priv->write_mutex); - return ret; + return; } -gf_rdma_ioq_t * -gf_rdma_ioq_new (rpc_transport_t *this, rpc_transport_data_t *data) +int32_t +__gf_rdma_send_reply_inline (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry, + gf_rdma_post_t *post, + gf_rdma_reply_info_t *reply_info) { - gf_rdma_ioq_t *entry = NULL; - int count = 0, i = 0; - rpc_transport_msg_t *msg = NULL; - gf_rdma_private_t *priv = NULL; - - if ((data == NULL) || (this == NULL)) { - goto out; - } + gf_rdma_header_t *header = NULL; + int32_t send_size = 0, ret = 0; + char *buf = NULL; - priv = this->private; + send_size = iov_length (entry->rpchdr, entry->rpchdr_count) + + iov_length (entry->proghdr, entry->proghdr_count) + + iov_length (entry->prog_payload, entry->prog_payload_count) + + sizeof (gf_rdma_header_t); /* + * remember, no chunklists in the + * reply + */ - entry = mem_get (priv->device->ioq_pool); - if (entry == NULL) { + if (send_size > GLUSTERFS_RDMA_INLINE_THRESHOLD) { + ret = __gf_rdma_send_error (peer, entry, post, reply_info, + ERR_CHUNK); + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "msg size (%d) is greater than maximum size " + "of msg that can be sent inlined (%d)", + send_size, GLUSTERFS_RDMA_INLINE_THRESHOLD); goto out; } - memset (entry, 0, sizeof (*entry)); - entry->pool = priv->device->ioq_pool; - - if (data->is_request) { - msg = &data->data.req.msg; - if (data->data.req.rsp.rsphdr_count != 0) { - for (i = 0; i < data->data.req.rsp.rsphdr_count; i++) { - entry->msg.request.rsphdr_vec[i] - = data->data.req.rsp.rsphdr[i]; - } - - entry->msg.request.rsphdr_count = - data->data.req.rsp.rsphdr_count; - } - if (data->data.req.rsp.rsp_payload_count != 0) { - for (i = 0; i < data->data.req.rsp.rsp_payload_count; - i++) { - entry->msg.request.rsp_payload[i] - = data->data.req.rsp.rsp_payload[i]; - } + header = (gf_rdma_header_t *)post->buf; - entry->msg.request.rsp_payload_count = - data->data.req.rsp.rsp_payload_count; - } + __gf_rdma_fill_reply_header (header, entry->rpchdr, reply_info, + peer->send_count); - entry->msg.request.rpc_req = data->data.req.rpc_req; + buf = (char *)&header->rm_body.rm_chunks[3]; - if (data->data.req.rsp.rsp_iobref != NULL) { - entry->msg.request.rsp_iobref - = iobref_ref (data->data.req.rsp.rsp_iobref); - } - } else { - msg = &data->data.reply.msg; - entry->msg.reply_info = data->data.reply.private; + if (entry->rpchdr_count != 0) { + iov_unload (buf, entry->rpchdr, entry->rpchdr_count); + buf += iov_length (entry->rpchdr, entry->rpchdr_count); } - entry->is_request = data->is_request; - - count = msg->rpchdrcount + msg->proghdrcount + msg->progpayloadcount; - - GF_ASSERT (count <= MAX_IOVEC); - - if (msg->rpchdr != NULL) { - memcpy (&entry->rpchdr[0], msg->rpchdr, - sizeof (struct iovec) * msg->rpchdrcount); - entry->rpchdr_count = msg->rpchdrcount; + if (entry->proghdr_count != 0) { + iov_unload (buf, entry->proghdr, entry->proghdr_count); + buf += iov_length (entry->proghdr, entry->proghdr_count); } - if (msg->proghdr != NULL) { - memcpy (&entry->proghdr[0], msg->proghdr, - sizeof (struct iovec) * msg->proghdrcount); - entry->proghdr_count = msg->proghdrcount; + if (entry->prog_payload_count != 0) { + iov_unload (buf, entry->prog_payload, + entry->prog_payload_count); + buf += iov_length (entry->prog_payload, + entry->prog_payload_count); } - if (msg->progpayload != NULL) { - memcpy (&entry->prog_payload[0], msg->progpayload, - sizeof (struct iovec) * msg->progpayloadcount); - entry->prog_payload_count = msg->progpayloadcount; - } + gf_rdma_post_ref (post); - if (msg->iobref != NULL) { - entry->iobref = iobref_ref (msg->iobref); + ret = gf_rdma_post_send (peer->qp, post, (buf - post->buf)); + if (!ret) { + ret = send_size; + } else { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "posting send (to %s) failed with ret = %d (%s)", + peer->trans->peerinfo.identifier, ret, + (ret > 0) ? strerror (ret) : ""); + gf_rdma_post_unref (post); + __gf_rdma_disconnect (peer->trans); + ret = -1; } - INIT_LIST_HEAD (&entry->list); - out: - return entry; + return ret; } int32_t -gf_rdma_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) +__gf_rdma_reply_encode_write_chunks (gf_rdma_peer_t *peer, + uint32_t payload_size, + gf_rdma_post_t *post, + gf_rdma_reply_info_t *reply_info, + uint32_t **ptr) { - int32_t ret = 0; - gf_rdma_ioq_t *entry = NULL; - rpc_transport_data_t data = {0, }; + uint32_t chunk_size = 0; + int32_t ret = -1; + gf_rdma_write_array_t *target_array = NULL; + int i = 0; - if (req == NULL) { - goto out; - } + target_array = (gf_rdma_write_array_t *)*ptr; - data.is_request = 1; - data.data.req = *req; + for (i = 0; i < reply_info->wc_array->wc_nchunks; i++) { + chunk_size += + reply_info->wc_array->wc_array[i].wc_target.rs_length; + } - entry = gf_rdma_ioq_new (this, &data); - if (entry == NULL) { - gf_log (this->name, GF_LOG_WARNING, - "getting a new ioq entry failed (peer:%s)", - this->peerinfo.identifier); + if (chunk_size < payload_size) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_DEBUG, + "length of payload (%d) is exceeding the total " + "write chunk length (%d)", payload_size, chunk_size); goto out; } - ret = gf_rdma_writev (this, entry); + target_array->wc_discrim = hton32 (1); + for (i = 0; (i < reply_info->wc_array->wc_nchunks) + && (payload_size != 0); + i++) { + target_array->wc_array[i].wc_target.rs_offset + = hton64 (reply_info->wc_array->wc_array[i].wc_target.rs_offset); - if (ret > 0) { - ret = 0; - } else if (ret < 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "sending request to peer (%s) failed", - this->peerinfo.identifier); - rpc_transport_disconnect (this); + target_array->wc_array[i].wc_target.rs_length + = hton32 (min (payload_size, + reply_info->wc_array->wc_array[i].wc_target.rs_length)); } + target_array->wc_nchunks = hton32 (i); + target_array->wc_array[i].wc_target.rs_handle = 0; /* terminate + chunklist */ + + ret = 0; + + *ptr = &target_array->wc_array[i].wc_target.rs_length; out: return ret; } -int32_t -gf_rdma_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) + +inline int32_t +__gf_rdma_register_local_mr_for_rdma (gf_rdma_peer_t *peer, + struct iovec *vector, int count, + gf_rdma_post_context_t *ctx) { - int32_t ret = 0; - gf_rdma_ioq_t *entry = NULL; - rpc_transport_data_t data = {0, }; + int i = 0; + int32_t ret = -1; + gf_rdma_private_t *priv = NULL; + gf_rdma_device_t *device = NULL; - if (reply == NULL) { - goto out; - } + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, ctx, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, vector, out); - data.data.reply = *reply; + priv = peer->trans->private; + device = priv->device; - entry = gf_rdma_ioq_new (this, &data); - if (entry == NULL) { - gf_log (this->name, GF_LOG_WARNING, - "getting a new ioq entry failed (peer:%s)", - this->peerinfo.identifier); - goto out; - } + for (i = 0; i < count; i++) { + /* what if the memory is registered more than once? + * Assume that a single write buffer is passed to afr, which + * then passes it to its children. If more than one children + * happen to use rdma, then the buffer is registered more than + * once. + * Ib-verbs specification says that multiple registrations of + * same memory location is allowed. Refer to 10.6.3.8 of + * Infiniband Architecture Specification Volume 1 + * (Release 1.2.1) + */ + ctx->mr[ctx->mr_count] = ibv_reg_mr (device->pd, + vector[i].iov_base, + vector[i].iov_len, + IBV_ACCESS_LOCAL_WRITE); + if (ctx->mr[ctx->mr_count] == NULL) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "registering memory for IBV_ACCESS_LOCAL_WRITE " + "failed (%s)", strerror (errno)); + goto out; + } - ret = gf_rdma_writev (this, entry); - if (ret > 0) { - ret = 0; - } else if (ret < 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "sending request to peer (%s) failed", - this->peerinfo.identifier); - rpc_transport_disconnect (this); + ctx->mr_count++; } + ret = 0; out: return ret; } -#if 0 -static int -gf_rdma_receive (rpc_transport_t *this, char **hdr_p, size_t *hdrlen_p, - struct iobuf **iobuf_p) +/* 1. assumes xfer_len of data is pointed by vector(s) starting from vec[*idx] + * 2. modifies vec + */ +int32_t +__gf_rdma_write (gf_rdma_peer_t *peer, gf_rdma_post_t *post, struct iovec *vec, + uint32_t xfer_len, int *idx, gf_rdma_write_chunk_t *writech) { - gf_rdma_private_t *priv = this->private; - /* TODO: return error if !priv->connected, check with locks */ - /* TODO: boundry checks for data_ptr/offset */ - char *copy_from = NULL; - gf_rdma_header_t *header = NULL; - uint32_t size1, size2, data_len = 0; - char *hdr = NULL; - struct iobuf *iobuf = NULL; - int32_t ret = 0; - - pthread_mutex_lock (&priv->recv_mutex); - { -/* - while (!priv->data_ptr) - pthread_cond_wait (&priv->recv_cond, &priv->recv_mutex); -*/ - - copy_from = priv->data_ptr + priv->data_offset; + int size = 0, num_sge = 0, i = 0; + int32_t ret = -1; + struct ibv_sge *sg_list = NULL; + struct ibv_send_wr wr = { + .opcode = IBV_WR_RDMA_WRITE, + .send_flags = IBV_SEND_SIGNALED, + }, *bad_wr; - priv->data_ptr = NULL; - data_len = priv->data_len; - pthread_cond_broadcast (&priv->recv_cond); + if ((peer == NULL) || (writech == NULL) || (idx == NULL) + || (post == NULL) || (vec == NULL) || (xfer_len == 0)) { + goto out; } - pthread_mutex_unlock (&priv->recv_mutex); - header = (gf_rdma_header_t *)copy_from; - if (strcmp (header->colonO, ":O")) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_DEBUG, - "%s: corrupt header received", this->name); - ret = -1; - goto err; + for (i = *idx; size < xfer_len; i++) { + size += vec[i].iov_len; } - size1 = ntoh32 (header->size1); - size2 = ntoh32 (header->size2); + num_sge = i - *idx; - if (data_len != (size1 + size2 + sizeof (*header))) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_DEBUG, - "%s: sizeof data read from transport is not equal " - "to the size specified in the header", - this->name); + sg_list = GF_CALLOC (num_sge, sizeof (struct ibv_sge), + gf_common_mt_sge); + if (sg_list == NULL) { ret = -1; - goto err; + goto out; } - copy_from += sizeof (*header); + for ((i = *idx), (num_sge = 0); (xfer_len != 0); i++, num_sge++) { + size = min (xfer_len, vec[i].iov_len); - if (size1) { - hdr = GF_CALLOC (1, size1, gf_common_mt_char); - if (!hdr) { - gf_log (this->name, GF_LOG_ERROR, - "unable to allocate header for peer %s", - this->peerinfo.identifier); - ret = -ENOMEM; - goto err; - } - memcpy (hdr, copy_from, size1); - copy_from += size1; - *hdr_p = hdr; - } - *hdrlen_p = size1; + sg_list [num_sge].addr = (unsigned long)vec[i].iov_base; + sg_list [num_sge].length = size; + sg_list [num_sge].lkey = post->ctx.mr[i]->lkey; - if (size2) { - iobuf = iobuf_get2 (this->ctx->iobuf_pool, size2); - if (!iobuf) { - gf_log (this->name, GF_LOG_ERROR, - "unable to allocate IO buffer for peer %s", - this->peerinfo.identifier); - ret = -ENOMEM; - goto err; - } - memcpy (iobuf->ptr, copy_from, size2); - *iobuf_p = iobuf; + xfer_len -= size; } -err: - return ret; -} -#endif - - -static void -gf_rdma_destroy_cq (rpc_transport_t *this) -{ - gf_rdma_private_t *priv = NULL; - gf_rdma_device_t *device = NULL; + *idx = i; - priv = this->private; - device = priv->device; + if (size < vec[i - 1].iov_len) { + vec[i - 1].iov_base += size; + vec[i - 1].iov_len -= size; + *idx = i - 1; + } - if (device->recv_cq) - ibv_destroy_cq (device->recv_cq); - device->recv_cq = NULL; + wr.sg_list = sg_list; + wr.num_sge = num_sge; + wr.wr_id = (unsigned long) gf_rdma_post_ref (post); + wr.wr.rdma.rkey = writech->wc_target.rs_handle; + wr.wr.rdma.remote_addr = writech->wc_target.rs_offset; - if (device->send_cq) - ibv_destroy_cq (device->send_cq); - device->send_cq = NULL; + ret = ibv_post_send(peer->qp, &wr, &bad_wr); + if (ret) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "rdma write to " + "client (%s) failed with ret = %d (%s)", + peer->trans->peerinfo.identifier, ret, + (ret > 0) ? strerror (ret) : ""); + ret = -1; + } - return; + GF_FREE (sg_list); +out: + return ret; } -static int32_t -gf_rdma_create_cq (rpc_transport_t *this) +int32_t +__gf_rdma_do_gf_rdma_write (gf_rdma_peer_t *peer, gf_rdma_post_t *post, + struct iovec *vector, int count, + struct iobref *iobref, + gf_rdma_reply_info_t *reply_info) { - gf_rdma_private_t *priv = NULL; - gf_rdma_options_t *options = NULL; - gf_rdma_device_t *device = NULL; - uint64_t send_cqe = 0; - int32_t ret = 0; - struct ibv_device_attr device_attr = {{0}, }; + int i = 0, payload_idx = 0; + uint32_t payload_size = 0, xfer_len = 0; + int32_t ret = -1; - priv = this->private; - options = &priv->options; - device = priv->device; + if (count != 0) { + payload_size = iov_length (vector, count); + } - device->recv_cq = ibv_create_cq (priv->device->context, - options->recv_count * 2, - device, - device->recv_chan, - 0); - if (!device->recv_cq) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "%s: creation of CQ for device %s failed", - this->name, device->device_name); - ret = -1; + if (payload_size == 0) { + ret = 0; goto out; - } else if (ibv_req_notify_cq (device->recv_cq, 0)) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "%s: ibv_req_notify_cq on recv CQ of device %s failed", - this->name, device->device_name); - ret = -1; + } + + ret = __gf_rdma_register_local_mr_for_rdma (peer, vector, count, + &post->ctx); + if (ret == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "registering memory region for rdma failed"); goto out; } - do { - ret = ibv_query_device (priv->device->context, &device_attr); - if (ret != 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "%s: ibv_query_device on %s returned %d (%s)", - this->name, priv->device->device_name, ret, - (ret > 0) ? strerror (ret) : ""); - ret = -1; - goto out; - } + post->ctx.iobref = iobref_ref (iobref); - send_cqe = options->send_count * 128; - send_cqe = (send_cqe > device_attr.max_cqe) - ? device_attr.max_cqe : send_cqe; + for (i = 0; (i < reply_info->wc_array->wc_nchunks) + && (payload_size != 0); + i++) { + xfer_len = min (payload_size, + reply_info->wc_array->wc_array[i].wc_target.rs_length); - /* TODO: make send_cq size dynamically adaptive */ - device->send_cq = ibv_create_cq (priv->device->context, - send_cqe, device, - device->send_chan, 0); - if (!device->send_cq) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "%s: creation of send_cq for device %s failed", - this->name, device->device_name); - ret = -1; + ret = __gf_rdma_write (peer, post, vector, xfer_len, + &payload_idx, + &reply_info->wc_array->wc_array[i]); + if (ret == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "rdma write to client (%s) failed", + peer->trans->peerinfo.identifier); goto out; } - if (ibv_req_notify_cq (device->send_cq, 0)) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "%s: ibv_req_notify_cq on send_cq for device %s" - " failed", this->name, device->device_name); - ret = -1; - goto out; - } - } while (0); + payload_size -= xfer_len; + } + ret = 0; out: - if (ret != 0) - gf_rdma_destroy_cq (this); return ret; } -static int -gf_rdma_register_peer (gf_rdma_device_t *device, int32_t qp_num, - gf_rdma_peer_t *peer) +int32_t +__gf_rdma_send_reply_type_nomsg (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry, + gf_rdma_post_t *post, + gf_rdma_reply_info_t *reply_info) { - struct _qpent *ent = NULL; - gf_rdma_qpreg_t *qpreg = NULL; - int32_t hash = 0; - int ret = -1; + gf_rdma_header_t *header = NULL; + char *buf = NULL; + uint32_t payload_size = 0; + int count = 0, i = 0; + int32_t ret = 0; + struct iovec vector[MAX_IOVEC]; - qpreg = &device->qpreg; - hash = qp_num % 42; + header = (gf_rdma_header_t *)post->buf; - pthread_mutex_lock (&qpreg->lock); - { - ent = qpreg->ents[hash].next; - while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num)) { - ent = ent->next; - } + __gf_rdma_fill_reply_header (header, entry->rpchdr, reply_info, + peer->send_count); - if (ent->qp_num == qp_num) { - ret = 0; - goto unlock; - } + header->rm_type = hton32 (GF_RDMA_NOMSG); - ent = (struct _qpent *) GF_CALLOC (1, sizeof (*ent), - gf_common_mt_qpent); - if (ent == NULL) { - goto unlock; - } + payload_size = iov_length (entry->rpchdr, entry->rpchdr_count) + + iov_length (entry->proghdr, entry->proghdr_count); - /* TODO: ref reg->peer */ - ent->peer = peer; - ent->next = &qpreg->ents[hash]; - ent->prev = ent->next->prev; - ent->next->prev = ent; - ent->prev->next = ent; - ent->qp_num = qp_num; - qpreg->count++; - ret = 0; - } -unlock: - pthread_mutex_unlock (&qpreg->lock); - - return ret; -} - - -static void -gf_rdma_unregister_peer (gf_rdma_device_t *device, int32_t qp_num) -{ - struct _qpent *ent = NULL; - gf_rdma_qpreg_t *qpreg = NULL; - int32_t hash = 0; + /* encode reply chunklist */ + buf = (char *)&header->rm_body.rm_chunks[2]; + ret = __gf_rdma_reply_encode_write_chunks (peer, payload_size, post, + reply_info, + (uint32_t **)&buf); + if (ret == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "encoding write chunks failed"); + ret = __gf_rdma_send_error (peer, entry, post, reply_info, + ERR_CHUNK); + goto out; + } - qpreg = &device->qpreg; - hash = qp_num % 42; + gf_rdma_post_ref (post); - pthread_mutex_lock (&qpreg->lock); - { - ent = qpreg->ents[hash].next; - while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num)) - ent = ent->next; - if (ent->qp_num != qp_num) { - pthread_mutex_unlock (&qpreg->lock); - return; - } - ent->prev->next = ent->next; - ent->next->prev = ent->prev; - /* TODO: unref reg->peer */ - GF_FREE (ent); - qpreg->count--; + for (i = 0; i < entry->rpchdr_count; i++) { + vector[count++] = entry->rpchdr[i]; } - pthread_mutex_unlock (&qpreg->lock); -} + for (i = 0; i < entry->proghdr_count; i++) { + vector[count++] = entry->proghdr[i]; + } -static gf_rdma_peer_t * -__gf_rdma_lookup_peer (gf_rdma_device_t *device, int32_t qp_num) -{ - struct _qpent *ent = NULL; - gf_rdma_peer_t *peer = NULL; - gf_rdma_qpreg_t *qpreg = NULL; - int32_t hash = 0; - - qpreg = &device->qpreg; - hash = qp_num % 42; - ent = qpreg->ents[hash].next; - while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num)) - ent = ent->next; + ret = __gf_rdma_do_gf_rdma_write (peer, post, vector, count, + entry->iobref, reply_info); + if (ret == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "rdma write to peer (%s) failed", + peer->trans->peerinfo.identifier); + gf_rdma_post_unref (post); + goto out; + } - if (ent != &qpreg->ents[hash]) { - peer = ent->peer; + ret = gf_rdma_post_send (peer->qp, post, (buf - post->buf)); + if (ret) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "posting a send request to client (%s) failed with " + "ret = %d (%s)", peer->trans->peerinfo.identifier, ret, + (ret > 0) ? strerror (ret) : ""); + ret = -1; + gf_rdma_post_unref (post); + } else { + ret = payload_size; } - return peer; +out: + return ret; } -/* - static gf_rdma_peer_t * - gf_rdma_lookup_peer (gf_rdma_device_t *device, - int32_t qp_num) - { - gf_rdma_qpreg_t *qpreg = NULL; - gf_rdma_peer_t *peer = NULL; - - qpreg = &device->qpreg; - pthread_mutex_lock (&qpreg->lock); - { - peer = __gf_rdma_lookup_peer (device, qp_num); - } - pthread_mutex_unlock (&qpreg->lock); - return peer; - } -*/ +int32_t +__gf_rdma_send_reply_type_msg (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry, + gf_rdma_post_t *post, + gf_rdma_reply_info_t *reply_info) +{ + gf_rdma_header_t *header = NULL; + int32_t send_size = 0, ret = 0; + char *ptr = NULL; + uint32_t payload_size = 0; + send_size = iov_length (entry->rpchdr, entry->rpchdr_count) + + iov_length (entry->proghdr, entry->proghdr_count) + + GLUSTERFS_RDMA_MAX_HEADER_SIZE; -static void -__gf_rdma_destroy_qp (rpc_transport_t *this) -{ - gf_rdma_private_t *priv = NULL; + if (send_size > GLUSTERFS_RDMA_INLINE_THRESHOLD) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "client has provided only write chunks, but the " + "combined size of rpc and program header (%d) is " + "exceeding the size of msg that can be sent using " + "RDMA send (%d)", send_size, + GLUSTERFS_RDMA_INLINE_THRESHOLD); - priv = this->private; - if (priv->peer.qp) { - gf_rdma_unregister_peer (priv->device, priv->peer.qp->qp_num); - ibv_destroy_qp (priv->peer.qp); + ret = __gf_rdma_send_error (peer, entry, post, reply_info, + ERR_CHUNK); + goto out; } - priv->peer.qp = NULL; - - return; -} + header = (gf_rdma_header_t *)post->buf; -static int32_t -gf_rdma_create_qp (rpc_transport_t *this) -{ - gf_rdma_private_t *priv = NULL; - gf_rdma_options_t *options = NULL; - gf_rdma_device_t *device = NULL; - int32_t ret = 0; - gf_rdma_peer_t *peer = NULL; + __gf_rdma_fill_reply_header (header, entry->rpchdr, reply_info, + peer->send_count); - priv = this->private; - options = &priv->options; - device = priv->device; + payload_size = iov_length (entry->prog_payload, + entry->prog_payload_count); + ptr = (char *)&header->rm_body.rm_chunks[1]; - peer = &priv->peer; + ret = __gf_rdma_reply_encode_write_chunks (peer, payload_size, post, + reply_info, + (uint32_t **)&ptr); + if (ret == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "encoding write chunks failed"); + ret = __gf_rdma_send_error (peer, entry, post, reply_info, + ERR_CHUNK); + goto out; + } - struct ibv_qp_init_attr init_attr = { - .send_cq = device->send_cq, - .recv_cq = device->recv_cq, - .srq = device->srq, - .cap = { - .max_send_wr = peer->send_count, - .max_recv_wr = peer->recv_count, - .max_send_sge = 2, - .max_recv_sge = 1 - }, - .qp_type = IBV_QPT_RC - }; + *(uint32_t *)ptr = 0; /* terminate reply chunklist */ + ptr += sizeof (uint32_t); - struct ibv_qp_attr attr = { - .qp_state = IBV_QPS_INIT, - .pkey_index = 0, - .port_num = options->port, - .qp_access_flags - = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE - }; + gf_rdma_post_ref (post); - peer->qp = ibv_create_qp (device->pd, &init_attr); - if (!peer->qp) { - gf_log (GF_RDMA_LOG_NAME, - GF_LOG_CRITICAL, - "%s: could not create QP", - this->name); - ret = -1; - goto out; - } else if (ibv_modify_qp (peer->qp, &attr, - IBV_QP_STATE | - IBV_QP_PKEY_INDEX | - IBV_QP_PORT | - IBV_QP_ACCESS_FLAGS)) { - gf_log (GF_RDMA_LOG_NAME, - GF_LOG_ERROR, - "%s: failed to modify QP to INIT state", - this->name); - ret = -1; + ret = __gf_rdma_do_gf_rdma_write (peer, post, entry->prog_payload, + entry->prog_payload_count, + entry->iobref, reply_info); + if (ret == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, "rdma write to peer " + "(%s) failed", peer->trans->peerinfo.identifier); + gf_rdma_post_unref (post); goto out; } - peer->local_lid = gf_rdma_get_local_lid (device->context, - options->port); - peer->local_qpn = peer->qp->qp_num; - peer->local_psn = lrand48 () & 0xffffff; + iov_unload (ptr, entry->rpchdr, entry->rpchdr_count); + ptr += iov_length (entry->rpchdr, entry->rpchdr_count); - ret = gf_rdma_register_peer (device, peer->qp->qp_num, peer); + iov_unload (ptr, entry->proghdr, entry->proghdr_count); + ptr += iov_length (entry->proghdr, entry->proghdr_count); -out: - if (ret == -1) - __gf_rdma_destroy_qp (this); + ret = gf_rdma_post_send (peer->qp, post, (ptr - post->buf)); + if (ret) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "rdma send to client (%s) failed with ret = %d (%s)", + peer->trans->peerinfo.identifier, ret, + (ret > 0) ? strerror (ret) : ""); + gf_rdma_post_unref (post); + ret = -1; + } else { + ret = send_size + payload_size; + } +out: return ret; } -static void -gf_rdma_destroy_posts (rpc_transport_t *this) +void +gf_rdma_reply_info_destroy (gf_rdma_reply_info_t *reply_info) { + if (reply_info == NULL) { + goto out; + } + if (reply_info->wc_array != NULL) { + GF_FREE (reply_info->wc_array); + reply_info->wc_array = NULL; + } + + mem_put (reply_info); +out: + return; } -static int32_t -__gf_rdma_create_posts (rpc_transport_t *this, int32_t count, int32_t size, - gf_rdma_queue_t *q, gf_rdma_post_type_t type) +gf_rdma_reply_info_t * +gf_rdma_reply_info_alloc (gf_rdma_peer_t *peer) { - int32_t i = 0; - int32_t ret = 0; - gf_rdma_private_t *priv = NULL; - gf_rdma_device_t *device = NULL; + gf_rdma_reply_info_t *reply_info = NULL; + gf_rdma_private_t *priv = NULL; - priv = this->private; - device = priv->device; + priv = peer->trans->private; - for (i=0 ; idevice->reply_info_pool); + if (reply_info == NULL) { + goto out; + } - post = gf_rdma_new_post (device, size + 2048, type); - if (!post) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "%s: post creation failed", - this->name); - ret = -1; - break; - } + memset (reply_info, 0, sizeof (*reply_info)); + reply_info->pool = priv->device->reply_info_pool; - gf_rdma_put_post (q, post); - } - return ret; +out: + return reply_info; } -static int32_t -gf_rdma_create_posts (rpc_transport_t *this) +int32_t +__gf_rdma_ioq_churn_reply (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry, + gf_rdma_post_t *post) { - int32_t i = 0, ret = 0; - gf_rdma_post_t *post = NULL; - gf_rdma_private_t *priv = NULL; - gf_rdma_options_t *options = NULL; - gf_rdma_device_t *device = NULL; + gf_rdma_reply_info_t *reply_info = NULL; + int32_t ret = -1; + gf_rdma_chunktype_t type = gf_rdma_noch; - priv = this->private; - options = &priv->options; - device = priv->device; - - ret = __gf_rdma_create_posts (this, options->send_count, - options->send_size, - &device->sendq, GF_RDMA_SEND_POST); - if (!ret) - ret = __gf_rdma_create_posts (this, options->recv_count, - options->recv_size, - &device->recvq, - GF_RDMA_RECV_POST); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, peer, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, entry, out); + GF_VALIDATE_OR_GOTO (GF_RDMA_LOG_NAME, post, out); - if (!ret) { - for (i=0 ; irecv_count ; i++) { - post = gf_rdma_get_post (&device->recvq); - if (gf_rdma_post_recv (device->srq, post) != 0) { - ret = -1; - break; - } - } + reply_info = entry->msg.reply_info; + if (reply_info != NULL) { + type = reply_info->type; } - if (ret) - gf_rdma_destroy_posts (this); - - return ret; -} + switch (type) { + case gf_rdma_noch: + ret = __gf_rdma_send_reply_inline (peer, entry, post, + reply_info); + if (ret < 0) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "failed to send reply to peer (%s) as an " + "inlined rdma msg", + peer->trans->peerinfo.identifier); + } + break; + case gf_rdma_replych: + ret = __gf_rdma_send_reply_type_nomsg (peer, entry, post, + reply_info); + if (ret < 0) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "failed to send reply to peer (%s) as " + "RDMA_NOMSG", peer->trans->peerinfo.identifier); + } + break; -static int32_t -gf_rdma_connect_qp (rpc_transport_t *this) -{ - gf_rdma_private_t *priv = this->private; - gf_rdma_options_t *options = &priv->options; - struct ibv_qp_attr attr = { - .qp_state = IBV_QPS_RTR, - .path_mtu = options->mtu, - .dest_qp_num = priv->peer.remote_qpn, - .rq_psn = priv->peer.remote_psn, - .max_dest_rd_atomic = 1, - .min_rnr_timer = 12, - .qp_access_flags - = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE, - .ah_attr = { - .is_global = 0, - .dlid = priv->peer.remote_lid, - .sl = 0, - .src_path_bits = 0, - .port_num = options->port + case gf_rdma_writech: + ret = __gf_rdma_send_reply_type_msg (peer, entry, post, + reply_info); + if (ret < 0) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "failed to send reply with write chunks " + "to peer (%s)", + peer->trans->peerinfo.identifier); } - }; - if (ibv_modify_qp (priv->peer.qp, &attr, - IBV_QP_STATE | - IBV_QP_AV | - IBV_QP_PATH_MTU | - IBV_QP_DEST_QPN | - IBV_QP_RQ_PSN | - IBV_QP_MAX_DEST_RD_ATOMIC | - IBV_QP_MIN_RNR_TIMER)) { - gf_log (GF_RDMA_LOG_NAME, - GF_LOG_CRITICAL, - "Failed to modify QP to RTR\n"); - return -1; - } + break; - attr.qp_state = IBV_QPS_RTS; - attr.timeout = options->attr_timeout; - attr.retry_cnt = options->attr_retry_cnt; - attr.rnr_retry = options->attr_rnr_retry; - attr.sq_psn = priv->peer.local_psn; - attr.max_rd_atomic = 1; - if (ibv_modify_qp (priv->peer.qp, &attr, - IBV_QP_STATE | - IBV_QP_TIMEOUT | - IBV_QP_RETRY_CNT | - IBV_QP_RNR_RETRY | - IBV_QP_SQ_PSN | - IBV_QP_MAX_QP_RD_ATOMIC)) { - gf_log (GF_RDMA_LOG_NAME, - GF_LOG_CRITICAL, - "Failed to modify QP to RTS\n"); - return -1; + default: + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "invalid chunktype (%d) specified for sending reply " + " (peer:%s)", type, peer->trans->peerinfo.identifier); + break; } - return 0; -} - -static int32_t -__gf_rdma_teardown (rpc_transport_t *this) -{ - gf_rdma_private_t *priv = NULL; - - priv = this->private; - __gf_rdma_destroy_qp (this); - - if (!list_empty (&priv->peer.ioq)) { - __gf_rdma_ioq_flush (&priv->peer); + if (reply_info != NULL) { + gf_rdma_reply_info_destroy (reply_info); } - - /* TODO: decrement cq size */ - return 0; +out: + return ret; } -/* - * return value: - * 0 = success (completed) - * -1 = error - * > 0 = incomplete - */ -static int -__tcp_rwv (rpc_transport_t *this, struct iovec *vector, int count, - struct iovec **pending_vector, int *pending_count, - int write) +int32_t +__gf_rdma_ioq_churn_entry (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry) { - gf_rdma_private_t *priv = NULL; - int sock = -1; - int ret = -1; - struct iovec *opvector = NULL; - int opcount = 0; - int moved = 0; - - priv = this->private; - sock = priv->sock; - opvector = vector; - opcount = count; - - while (opcount) - { - if (write) - { - ret = writev (sock, opvector, opcount); - - if (ret == 0 || (ret == -1 && errno == EAGAIN)) - { - /* done for now */ - break; - } - } - else - { - ret = readv (sock, opvector, opcount); + int32_t ret = 0, quota = 0; + gf_rdma_private_t *priv = NULL; + gf_rdma_device_t *device = NULL; + gf_rdma_options_t *options = NULL; + gf_rdma_post_t *post = NULL; - if (ret == -1 && errno == EAGAIN) - { - /* done for now */ - break; - } - } + priv = peer->trans->private; + options = &priv->options; + device = priv->device; - if (ret == 0) - { - gf_log (this->name, GF_LOG_DEBUG, - "EOF from peer %s", this->peerinfo.identifier); - opcount = -1; - errno = ENOTCONN; - break; + quota = __gf_rdma_quota_get (peer); + if (quota > 0) { + post = gf_rdma_get_post (&device->sendq); + if (post == NULL) { + post = gf_rdma_new_post (peer->trans, device, + (options->send_size + 2048), + GF_RDMA_SEND_POST); } - if (ret == -1) - { - if (errno == EINTR) - continue; - - gf_log (this->name, GF_LOG_DEBUG, - "%s failed (%s)", write ? "writev" : "readv", - strerror (errno)); - if (write && !priv->connected && - (errno == ECONNREFUSED)) - gf_log (this->name, GF_LOG_ERROR, - "possible mismatch of 'rpc-transport-type'" - " in protocol server and client. " - "check volume file"); - opcount = -1; - break; + if (post == NULL) { + ret = -1; + gf_log_callingfn (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "not able to get a post to send msg"); + goto out; } - moved = 0; - - while (moved < ret) - { - if ((ret - moved) >= opvector[0].iov_len) - { - moved += opvector[0].iov_len; - opvector++; - opcount--; - } - else - { - opvector[0].iov_len -= (ret - moved); - opvector[0].iov_base += (ret - moved); - moved += (ret - moved); + if (entry->is_request) { + ret = __gf_rdma_ioq_churn_request (peer, entry, post); + if (ret < 0) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "failed to process request ioq entry " + "to peer(%s)", + peer->trans->peerinfo.identifier); } - while (opcount && !opvector[0].iov_len) - { - opvector++; - opcount--; + } else { + ret = __gf_rdma_ioq_churn_reply (peer, entry, post); + if (ret < 0) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "failed to process reply ioq entry " + "to peer (%s)", + peer->trans->peerinfo.identifier); } } - } - - if (pending_vector) - *pending_vector = opvector; - if (pending_count) - *pending_count = opcount; + if (ret != 0) { + __gf_rdma_ioq_entry_free (entry); + } + } else { + ret = 0; + } - return opcount; +out: + return ret; } -static int -__tcp_readv (rpc_transport_t *this, struct iovec *vector, int count, - struct iovec **pending_vector, int *pending_count) +static int32_t +__gf_rdma_ioq_churn (gf_rdma_peer_t *peer) { - int ret = -1; + gf_rdma_ioq_t *entry = NULL; + int32_t ret = 0; + + while (!list_empty (&peer->ioq)) + { + /* pick next entry */ + entry = peer->ioq_next; + + ret = __gf_rdma_ioq_churn_entry (peer, entry); + + if (ret <= 0) + break; + } - ret = __tcp_rwv (this, vector, count, - pending_vector, pending_count, 0); + /* + list_for_each_entry_safe (entry, dummy, &peer->ioq, list) { + ret = __gf_rdma_ioq_churn_entry (peer, entry); + if (ret <= 0) { + break; + } + } + */ return ret; } -static int -__tcp_writev (rpc_transport_t *this, struct iovec *vector, int count, - struct iovec **pending_vector, int *pending_count) +static int32_t +gf_rdma_writev (rpc_transport_t *this, gf_rdma_ioq_t *entry) { - int ret = -1; + int32_t ret = 0, need_append = 1; gf_rdma_private_t *priv = NULL; + gf_rdma_peer_t *peer = NULL; priv = this->private; + pthread_mutex_lock (&priv->write_mutex); + { + if (!priv->connected) { + gf_log (this->name, GF_LOG_WARNING, + "rdma is not connected to peer (%s)", + this->peerinfo.identifier); + ret = -1; + goto unlock; + } - ret = __tcp_rwv (this, vector, count, pending_vector, - pending_count, 1); + peer = &priv->peer; + if (list_empty (&peer->ioq)) { + ret = __gf_rdma_ioq_churn_entry (peer, entry); + if (ret != 0) { + need_append = 0; - if (ret > 0) { - /* TODO: Avoid multiple calls when socket is already - registered for POLLOUT */ - priv->idx = event_select_on (this->ctx->event_pool, - priv->sock, priv->idx, -1, 1); - } else if (ret == 0) { - priv->idx = event_select_on (this->ctx->event_pool, - priv->sock, - priv->idx, -1, 0); - } + if (ret < 0) { + gf_log (this->name, GF_LOG_WARNING, + "processing ioq entry destined " + "to (%s) failed", + this->peerinfo.identifier); + } + } + } + if (need_append) { + list_add_tail (&entry->list, &peer->ioq); + } + } +unlock: + pthread_mutex_unlock (&priv->write_mutex); return ret; } -/* - * allocates new memory to hold write-chunklist. New memory is needed since - * write-chunklist will be used while sending reply and the post holding initial - * write-chunklist sent from client will be put back to srq before a pollin - * event is sent to upper layers. - */ -int32_t -gf_rdma_get_write_chunklist (char **ptr, gf_rdma_write_array_t **write_ary) +gf_rdma_ioq_t * +gf_rdma_ioq_new (rpc_transport_t *this, rpc_transport_data_t *data) { - gf_rdma_write_array_t *from = NULL, *to = NULL; - int32_t ret = -1, size = 0, i = 0; + gf_rdma_ioq_t *entry = NULL; + int count = 0, i = 0; + rpc_transport_msg_t *msg = NULL; + gf_rdma_private_t *priv = NULL; - from = (gf_rdma_write_array_t *) *ptr; - if (from->wc_discrim == 0) { - ret = 0; + if ((data == NULL) || (this == NULL)) { goto out; } - from->wc_nchunks = ntoh32 (from->wc_nchunks); - - size = sizeof (*from) - + (sizeof (gf_rdma_write_chunk_t) * from->wc_nchunks); + priv = this->private; - to = GF_CALLOC (1, size, gf_common_mt_char); - if (to == NULL) { - ret = -1; + entry = mem_get (priv->device->ioq_pool); + if (entry == NULL) { goto out; } + memset (entry, 0, sizeof (*entry)); + entry->pool = priv->device->ioq_pool; - to->wc_discrim = ntoh32 (from->wc_discrim); - to->wc_nchunks = from->wc_nchunks; - - for (i = 0; i < to->wc_nchunks; i++) { - to->wc_array[i].wc_target.rs_handle - = ntoh32 (from->wc_array[i].wc_target.rs_handle); - to->wc_array[i].wc_target.rs_length - = ntoh32 (from->wc_array[i].wc_target.rs_length); - to->wc_array[i].wc_target.rs_offset - = ntoh64 (from->wc_array[i].wc_target.rs_offset); - } + if (data->is_request) { + msg = &data->data.req.msg; + if (data->data.req.rsp.rsphdr_count != 0) { + for (i = 0; i < data->data.req.rsp.rsphdr_count; i++) { + entry->msg.request.rsphdr_vec[i] + = data->data.req.rsp.rsphdr[i]; + } - *write_ary = to; - ret = 0; - *ptr = (char *)&from->wc_array[i].wc_target.rs_handle; -out: - return ret; -} + entry->msg.request.rsphdr_count = + data->data.req.rsp.rsphdr_count; + } + if (data->data.req.rsp.rsp_payload_count != 0) { + for (i = 0; i < data->data.req.rsp.rsp_payload_count; + i++) { + entry->msg.request.rsp_payload[i] + = data->data.req.rsp.rsp_payload[i]; + } -/* - * does not allocate new memory to hold read-chunklist. New memory is not - * needed, since post is not put back to srq till we've completed all the - * rdma-reads and hence readchunk-list can point to memory held by post. - */ -int32_t -gf_rdma_get_read_chunklist (char **ptr, gf_rdma_read_chunk_t **readch) -{ - int32_t ret = -1; - gf_rdma_read_chunk_t *chunk = NULL; - int i = 0; + entry->msg.request.rsp_payload_count = + data->data.req.rsp.rsp_payload_count; + } - chunk = (gf_rdma_read_chunk_t *)*ptr; - if (chunk[0].rc_discrim == 0) { - ret = 0; - goto out; - } + entry->msg.request.rpc_req = data->data.req.rpc_req; - for (i = 0; chunk[i].rc_discrim != 0; i++) { - chunk[i].rc_discrim = ntoh32 (chunk[i].rc_discrim); - chunk[i].rc_position = ntoh32 (chunk[i].rc_position); - chunk[i].rc_target.rs_handle - = ntoh32 (chunk[i].rc_target.rs_handle); - chunk[i].rc_target.rs_length - = ntoh32 (chunk[i].rc_target.rs_length); - chunk[i].rc_target.rs_offset - = ntoh64 (chunk[i].rc_target.rs_offset); + if (data->data.req.rsp.rsp_iobref != NULL) { + entry->msg.request.rsp_iobref + = iobref_ref (data->data.req.rsp.rsp_iobref); + } + } else { + msg = &data->data.reply.msg; + entry->msg.reply_info = data->data.reply.private; } - *readch = &chunk[0]; - ret = 0; - *ptr = (char *)&chunk[i].rc_discrim; -out: - return ret; -} + entry->is_request = data->is_request; + count = msg->rpchdrcount + msg->proghdrcount + msg->progpayloadcount; -static inline int32_t -gf_rdma_decode_error_msg (gf_rdma_peer_t *peer, gf_rdma_post_t *post, - size_t bytes_in_post) -{ - gf_rdma_header_t *header = NULL; - struct iobuf *iobuf = NULL; - struct iobref *iobref = NULL; - int32_t ret = -1; - struct rpc_msg rpc_msg = {0, }; + GF_ASSERT (count <= MAX_IOVEC); - header = (gf_rdma_header_t *)post->buf; - header->rm_body.rm_error.rm_type - = ntoh32 (header->rm_body.rm_error.rm_type); - if (header->rm_body.rm_error.rm_type == ERR_VERS) { - header->rm_body.rm_error.rm_version.gf_rdma_vers_low = - ntoh32 (header->rm_body.rm_error.rm_version.gf_rdma_vers_low); - header->rm_body.rm_error.rm_version.gf_rdma_vers_high = - ntoh32 (header->rm_body.rm_error.rm_version.gf_rdma_vers_high); + if (msg->rpchdr != NULL) { + memcpy (&entry->rpchdr[0], msg->rpchdr, + sizeof (struct iovec) * msg->rpchdrcount); + entry->rpchdr_count = msg->rpchdrcount; } - rpc_msg.rm_xid = header->rm_xid; - rpc_msg.rm_direction = REPLY; - rpc_msg.rm_reply.rp_stat = MSG_DENIED; + if (msg->proghdr != NULL) { + memcpy (&entry->proghdr[0], msg->proghdr, + sizeof (struct iovec) * msg->proghdrcount); + entry->proghdr_count = msg->proghdrcount; + } - iobuf = iobuf_get2 (peer->trans->ctx->iobuf_pool, bytes_in_post); - if (iobuf == NULL) { - ret = -1; - goto out; + if (msg->progpayload != NULL) { + memcpy (&entry->prog_payload[0], msg->progpayload, + sizeof (struct iovec) * msg->progpayloadcount); + entry->prog_payload_count = msg->progpayloadcount; } - post->ctx.iobref = iobref = iobref_new (); - if (iobref == NULL) { - ret = -1; - goto out; + if (msg->iobref != NULL) { + entry->iobref = iobref_ref (msg->iobref); } - iobref_add (iobref, iobuf); - iobuf_unref (iobuf); + INIT_LIST_HEAD (&entry->list); - ret = rpc_reply_to_xdr (&rpc_msg, iobuf_ptr (iobuf), - iobuf_pagesize (iobuf), &post->ctx.vector[0]); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "Failed to create RPC reply"); +out: + return entry; +} + + +int32_t +gf_rdma_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) +{ + int32_t ret = 0; + gf_rdma_ioq_t *entry = NULL; + rpc_transport_data_t data = {0, }; + + if (req == NULL) { goto out; } - post->ctx.count = 1; + data.is_request = 1; + data.data.req = *req; - iobuf = NULL; - iobref = NULL; + entry = gf_rdma_ioq_new (this, &data); + if (entry == NULL) { + gf_log (this->name, GF_LOG_WARNING, + "getting a new ioq entry failed (peer:%s)", + this->peerinfo.identifier); + goto out; + } -out: - if (ret == -1) { - if (iobuf != NULL) { - iobuf_unref (iobuf); - } + ret = gf_rdma_writev (this, entry); - if (iobref != NULL) { - iobref_unref (iobref); - } + if (ret > 0) { + ret = 0; + } else if (ret < 0) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "sending request to peer (%s) failed", + this->peerinfo.identifier); + rpc_transport_disconnect (this); } - return 0; +out: + return ret; } - int32_t -gf_rdma_decode_msg (gf_rdma_peer_t *peer, gf_rdma_post_t *post, - gf_rdma_read_chunk_t **readch, size_t bytes_in_post) +gf_rdma_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) { - int32_t ret = -1; - gf_rdma_header_t *header = NULL; - gf_rdma_reply_info_t *reply_info = NULL; - char *ptr = NULL; - gf_rdma_write_array_t *write_ary = NULL; - size_t header_len = 0; + int32_t ret = 0; + gf_rdma_ioq_t *entry = NULL; + rpc_transport_data_t data = {0, }; - header = (gf_rdma_header_t *)post->buf; + if (reply == NULL) { + goto out; + } - ptr = (char *)&header->rm_body.rm_chunks[0]; + data.data.reply = *reply; - ret = gf_rdma_get_read_chunklist (&ptr, readch); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "cannot get read chunklist from msg"); + entry = gf_rdma_ioq_new (this, &data); + if (entry == NULL) { + gf_log (this->name, GF_LOG_WARNING, + "getting a new ioq entry failed (peer:%s)", + this->peerinfo.identifier); goto out; } - /* skip terminator of read-chunklist */ - ptr = ptr + sizeof (uint32_t); - - ret = gf_rdma_get_write_chunklist (&ptr, &write_ary); - if (ret == -1) { + ret = gf_rdma_writev (this, entry); + if (ret > 0) { + ret = 0; + } else if (ret < 0) { gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "cannot get write chunklist from msg"); - goto out; + "sending request to peer (%s) failed", + this->peerinfo.identifier); + rpc_transport_disconnect (this); } - /* skip terminator of write-chunklist */ - ptr = ptr + sizeof (uint32_t); +out: + return ret; +} - if (write_ary != NULL) { - reply_info = gf_rdma_reply_info_alloc (peer); - if (reply_info == NULL) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "reply_info_alloc failed"); - ret = -1; - goto out; + +static int +gf_rdma_register_peer (gf_rdma_device_t *device, int32_t qp_num, + gf_rdma_peer_t *peer) +{ + struct _qpent *ent = NULL; + gf_rdma_qpreg_t *qpreg = NULL; + int32_t hash = 0; + int ret = -1; + + qpreg = &device->qpreg; + hash = qp_num % 42; + + pthread_mutex_lock (&qpreg->lock); + { + ent = qpreg->ents[hash].next; + while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num)) { + ent = ent->next; } - reply_info->type = gf_rdma_writech; - reply_info->wc_array = write_ary; - reply_info->rm_xid = header->rm_xid; - } else { - ret = gf_rdma_get_write_chunklist (&ptr, &write_ary); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "cannot get reply chunklist from msg"); - goto out; + if (ent->qp_num == qp_num) { + ret = 0; + goto unlock; } - if (write_ary != NULL) { - reply_info = gf_rdma_reply_info_alloc (peer); - if (reply_info == NULL) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "reply_info_alloc_failed"); - ret = -1; - goto out; - } - - reply_info->type = gf_rdma_replych; - reply_info->wc_array = write_ary; - reply_info->rm_xid = header->rm_xid; + ent = (struct _qpent *) GF_CALLOC (1, sizeof (*ent), + gf_common_mt_qpent); + if (ent == NULL) { + goto unlock; } + + /* TODO: ref reg->peer */ + ent->peer = peer; + ent->next = &qpreg->ents[hash]; + ent->prev = ent->next->prev; + ent->next->prev = ent; + ent->prev->next = ent; + ent->qp_num = qp_num; + qpreg->count++; + ret = 0; } +unlock: + pthread_mutex_unlock (&qpreg->lock); - /* skip terminator of reply chunk */ - ptr = ptr + sizeof (uint32_t); - if (header->rm_type != GF_RDMA_NOMSG) { - header_len = (long)ptr - (long)post->buf; - post->ctx.vector[0].iov_len = (bytes_in_post - header_len); + return ret; +} - post->ctx.hdr_iobuf = iobuf_get2 (peer->trans->ctx->iobuf_pool, - (bytes_in_post - header_len)); - if (post->ctx.hdr_iobuf == NULL) { - ret = -1; - goto out; - } - post->ctx.vector[0].iov_base = iobuf_ptr (post->ctx.hdr_iobuf); - memcpy (post->ctx.vector[0].iov_base, ptr, - post->ctx.vector[0].iov_len); - post->ctx.count = 1; - } +static void +gf_rdma_unregister_peer (gf_rdma_device_t *device, int32_t qp_num) +{ + struct _qpent *ent = NULL; + gf_rdma_qpreg_t *qpreg = NULL; + int32_t hash = 0; - post->ctx.reply_info = reply_info; -out: - if (ret == -1) { - if (*readch != NULL) { - GF_FREE (*readch); - *readch = NULL; - } + qpreg = &device->qpreg; + hash = qp_num % 42; - GF_FREE (write_ary); + pthread_mutex_lock (&qpreg->lock); + { + ent = qpreg->ents[hash].next; + while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num)) + ent = ent->next; + if (ent->qp_num != qp_num) { + pthread_mutex_unlock (&qpreg->lock); + return; + } + ent->prev->next = ent->next; + ent->next->prev = ent->prev; + /* TODO: unref reg->peer */ + GF_FREE (ent); + qpreg->count--; } - - return ret; + pthread_mutex_unlock (&qpreg->lock); } -/* Assumes only one of either write-chunklist or a reply chunk is present */ -int32_t -gf_rdma_decode_header (gf_rdma_peer_t *peer, gf_rdma_post_t *post, - gf_rdma_read_chunk_t **readch, size_t bytes_in_post) +static gf_rdma_peer_t * +__gf_rdma_lookup_peer (gf_rdma_device_t *device, int32_t qp_num) { - int32_t ret = -1; - gf_rdma_header_t *header = NULL; - - header = (gf_rdma_header_t *)post->buf; - - header->rm_xid = ntoh32 (header->rm_xid); - header->rm_vers = ntoh32 (header->rm_vers); - header->rm_credit = ntoh32 (header->rm_credit); - header->rm_type = ntoh32 (header->rm_type); + struct _qpent *ent = NULL; + gf_rdma_peer_t *peer = NULL; + gf_rdma_qpreg_t *qpreg = NULL; + int32_t hash = 0; - switch (header->rm_type) { - case GF_RDMA_MSG: - case GF_RDMA_NOMSG: - ret = gf_rdma_decode_msg (peer, post, readch, bytes_in_post); - if (ret < 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "cannot decode msg of type (%d)", - header->rm_type); - } + qpreg = &device->qpreg; + hash = qp_num % 42; + ent = qpreg->ents[hash].next; + while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num)) + ent = ent->next; - break; + if (ent != &qpreg->ents[hash]) { + peer = ent->peer; + } - case GF_RDMA_MSGP: - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "rdma msg of msg-type GF_RDMA_MSGP should not have " - "been received"); - ret = -1; - break; + return peer; +} - case GF_RDMA_DONE: - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "rdma msg of msg-type GF_RDMA_DONE should not have " - "been received"); - ret = -1; - break; - case GF_RDMA_ERROR: - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "received a msg of type RDMA_ERROR"); - ret = gf_rdma_decode_error_msg (peer, post, bytes_in_post); - break; +static void +__gf_rdma_destroy_qp (rpc_transport_t *this) +{ + gf_rdma_private_t *priv = NULL; - default: - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "unknown rdma msg-type (%d)", header->rm_type); + priv = this->private; + if (priv->peer.qp) { + gf_rdma_unregister_peer (priv->device, priv->peer.qp->qp_num); + rdma_destroy_qp (priv->peer.cm_id); } + priv->peer.qp = NULL; - return ret; + return; } -int32_t -__gf_rdma_read (gf_rdma_peer_t *peer, gf_rdma_post_t *post, struct iovec *to, - gf_rdma_read_chunk_t *readch) +static int32_t +gf_rdma_create_qp (rpc_transport_t *this) { - int32_t ret = -1; - struct ibv_sge list = {0, }; - struct ibv_send_wr wr = {0, }, *bad_wr = NULL; + gf_rdma_private_t *priv = NULL; + gf_rdma_device_t *device = NULL; + int32_t ret = 0; + gf_rdma_peer_t *peer = NULL; + char *device_name = NULL; - ret = __gf_rdma_register_local_mr_for_rdma (peer, to, 1, &post->ctx); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "registering local memory for rdma read failed"); + priv = this->private; + + peer = &priv->peer; + + device_name = (char *)ibv_get_device_name (peer->cm_id->verbs->device); + if (device_name == NULL) { + ret = -1; + gf_log (this->name, GF_LOG_WARNING, "cannot get device_name"); goto out; } - list.addr = (unsigned long) to->iov_base; - list.length = to->iov_len; - list.lkey = post->ctx.mr[post->ctx.mr_count - 1]->lkey; + device = gf_rdma_get_device (this, peer->cm_id->verbs, + device_name); + if (device == NULL) { + ret = -1; + gf_log (this->name, GF_LOG_WARNING, "cannot get device for " + "device %s", device_name); + goto out; + } - wr.wr_id = (unsigned long) gf_rdma_post_ref (post); - wr.sg_list = &list; - wr.num_sge = 1; - wr.opcode = IBV_WR_RDMA_READ; - wr.send_flags = IBV_SEND_SIGNALED; - wr.wr.rdma.remote_addr = readch->rc_target.rs_offset; - wr.wr.rdma.rkey = readch->rc_target.rs_handle; + if (priv->device == NULL) { + priv->device = device; + } - ret = ibv_post_send (peer->qp, &wr, &bad_wr); - if (ret) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "rdma read from client " - "(%s) failed with ret = %d (%s)", - peer->trans->peerinfo.identifier, - ret, (ret > 0) ? strerror (ret) : ""); + struct ibv_qp_init_attr init_attr = { + .send_cq = device->send_cq, + .recv_cq = device->recv_cq, + .srq = device->srq, + .cap = { + .max_send_wr = peer->send_count, + .max_recv_wr = peer->recv_count, + .max_send_sge = 2, + .max_recv_sge = 1 + }, + .qp_type = IBV_QPT_RC + }; + + ret = rdma_create_qp(peer->cm_id, device->pd, &init_attr); + if (ret != 0) { + gf_log (peer->trans->name, GF_LOG_CRITICAL, + "%s: could not create QP (%s)", this->name, + strerror (errno)); ret = -1; - gf_rdma_post_unref (post); + goto out; } + + peer->qp = peer->cm_id->qp; + + ret = gf_rdma_register_peer (device, peer->qp->qp_num, peer); + out: + if (ret == -1) + __gf_rdma_destroy_qp (this); + return ret; } -int32_t -gf_rdma_do_reads (gf_rdma_peer_t *peer, gf_rdma_post_t *post, - gf_rdma_read_chunk_t *readch) +static int32_t +__gf_rdma_teardown (rpc_transport_t *this) { - int32_t ret = -1, i = 0, count = 0; - size_t size = 0; - char *ptr = NULL; - struct iobuf *iobuf = NULL; - gf_rdma_private_t *priv = NULL; - - priv = peer->trans->private; + gf_rdma_private_t *priv = NULL; + gf_rdma_peer_t *peer = NULL; - for (i = 0; readch[i].rc_discrim != 0; i++) { - size += readch[i].rc_target.rs_length; - } + priv = this->private; + peer = &priv->peer; - if (i == 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "message type specified as rdma-read but there are no " - "rdma read-chunks present"); - goto out; + if (peer->cm_id->qp != NULL) { + __gf_rdma_destroy_qp (this); } - post->ctx.gf_rdma_reads = i; - - iobuf = iobuf_get2 (peer->trans->ctx->iobuf_pool, size); - if (iobuf == NULL) { - goto out; + if (!list_empty (&priv->peer.ioq)) { + __gf_rdma_ioq_flush (peer); } - if (post->ctx.iobref == NULL) { - post->ctx.iobref = iobref_new (); - if (post->ctx.iobref == NULL) { - iobuf_unref (iobuf); - goto out; - } + if (peer->cm_id != NULL) { + rdma_destroy_id (peer->cm_id); + peer->cm_id = NULL; } - iobref_add (post->ctx.iobref, iobuf); - iobuf_unref (iobuf); + /* TODO: decrement cq size */ + return 0; +} - ptr = iobuf_ptr (iobuf); - iobuf = NULL; - pthread_mutex_lock (&priv->write_mutex); - { - if (!priv->connected) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "transport not connected to peer (%s), " - "not doing rdma reads", - peer->trans->peerinfo.identifier); - goto unlock; - } - - for (i = 0; readch[i].rc_discrim != 0; i++) { - count = post->ctx.count++; - post->ctx.vector[count].iov_base = ptr; - post->ctx.vector[count].iov_len - = readch[i].rc_target.rs_length; +static int32_t +gf_rdma_teardown (rpc_transport_t *this) +{ + int32_t ret = 0; + gf_rdma_private_t *priv = NULL; - ret = __gf_rdma_read (peer, post, - &post->ctx.vector[count], - &readch[i]); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "rdma read from peer (%s) failed", - peer->trans->peerinfo.identifier); - goto unlock; - } + if (this == NULL) { + goto out; + } - ptr += readch[i].rc_target.rs_length; - } + priv = this->private; - ret = 0; + pthread_mutex_lock (&priv->write_mutex); + { + ret = __gf_rdma_teardown (this); } -unlock: pthread_mutex_unlock (&priv->write_mutex); -out: - - if (ret == -1) { - if (iobuf != NULL) { - iobuf_unref (iobuf); - } - } +out: return ret; } +/* + * allocates new memory to hold write-chunklist. New memory is needed since + * write-chunklist will be used while sending reply and the post holding initial + * write-chunklist sent from client will be put back to srq before a pollin + * event is sent to upper layers. + */ int32_t -gf_rdma_pollin_notify (gf_rdma_peer_t *peer, gf_rdma_post_t *post) +gf_rdma_get_write_chunklist (char **ptr, gf_rdma_write_array_t **write_ary) { - int32_t ret = -1; - enum msg_type msg_type = 0; - struct rpc_req *rpc_req = NULL; - gf_rdma_request_context_t *request_context = NULL; - rpc_request_info_t request_info = {0, }; - gf_rdma_private_t *priv = NULL; - uint32_t *ptr = NULL; - rpc_transport_pollin_t *pollin = NULL; + gf_rdma_write_array_t *from = NULL, *to = NULL; + int32_t ret = -1, size = 0, i = 0; - if ((peer == NULL) || (post == NULL)) { + from = (gf_rdma_write_array_t *) *ptr; + if (from->wc_discrim == 0) { + ret = 0; goto out; } - if (post->ctx.iobref == NULL) { - post->ctx.iobref = iobref_new (); - if (post->ctx.iobref == NULL) { - goto out; - } + from->wc_nchunks = ntoh32 (from->wc_nchunks); - /* handling the case where both hdr and payload of - * GF_FOP_READ_CBK were received in a single iobuf - * because of server sending entire msg as inline without - * doing rdma writes. - */ - if (post->ctx.hdr_iobuf) - iobref_add (post->ctx.iobref, post->ctx.hdr_iobuf); - } + size = sizeof (*from) + + (sizeof (gf_rdma_write_chunk_t) * from->wc_nchunks); - pollin = rpc_transport_pollin_alloc (peer->trans, - post->ctx.vector, - post->ctx.count, - post->ctx.hdr_iobuf, - post->ctx.iobref, - post->ctx.reply_info); - if (pollin == NULL) { + to = GF_CALLOC (1, size, gf_common_mt_char); + if (to == NULL) { + ret = -1; goto out; } - ptr = (uint32_t *)pollin->vector[0].iov_base; - - request_info.xid = ntoh32 (*ptr); - msg_type = ntoh32 (*(ptr + 1)); - - if (msg_type == REPLY) { - ret = rpc_transport_notify (peer->trans, - RPC_TRANSPORT_MAP_XID_REQUEST, - &request_info); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_DEBUG, - "cannot get request information from rpc " - "layer"); - goto out; - } - - rpc_req = request_info.rpc_req; - if (rpc_req == NULL) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_DEBUG, - "rpc request structure not found"); - ret = -1; - goto out; - } - - request_context = rpc_req->conn_private; - rpc_req->conn_private = NULL; - - priv = peer->trans->private; - if (request_context != NULL) { - pthread_mutex_lock (&priv->write_mutex); - { - __gf_rdma_request_context_destroy (request_context); - } - pthread_mutex_unlock (&priv->write_mutex); - } else { - gf_rdma_quota_put (peer); - } - - pollin->is_reply = 1; - } + to->wc_discrim = ntoh32 (from->wc_discrim); + to->wc_nchunks = from->wc_nchunks; - ret = rpc_transport_notify (peer->trans, RPC_TRANSPORT_MSG_RECEIVED, - pollin); - if (ret < 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "transport_notify failed"); + for (i = 0; i < to->wc_nchunks; i++) { + to->wc_array[i].wc_target.rs_handle + = ntoh32 (from->wc_array[i].wc_target.rs_handle); + to->wc_array[i].wc_target.rs_length + = ntoh32 (from->wc_array[i].wc_target.rs_length); + to->wc_array[i].wc_target.rs_offset + = ntoh64 (from->wc_array[i].wc_target.rs_offset); } + *write_ary = to; + ret = 0; + *ptr = (char *)&from->wc_array[i].wc_target.rs_handle; out: - if (pollin != NULL) { - pollin->private = NULL; - rpc_transport_pollin_destroy (pollin); - } - return ret; } +/* + * does not allocate new memory to hold read-chunklist. New memory is not + * needed, since post is not put back to srq till we've completed all the + * rdma-reads and hence readchunk-list can point to memory held by post. + */ int32_t -gf_rdma_recv_reply (gf_rdma_peer_t *peer, gf_rdma_post_t *post) +gf_rdma_get_read_chunklist (char **ptr, gf_rdma_read_chunk_t **readch) { - int32_t ret = -1; - gf_rdma_header_t *header = NULL; - gf_rdma_reply_info_t *reply_info = NULL; - gf_rdma_write_array_t *wc_array = NULL; - int i = 0; - uint32_t *ptr = NULL; - gf_rdma_request_context_t *ctx = NULL; - rpc_request_info_t request_info = {0, }; - struct rpc_req *rpc_req = NULL; - - header = (gf_rdma_header_t *)post->buf; - reply_info = post->ctx.reply_info; + int32_t ret = -1; + gf_rdma_read_chunk_t *chunk = NULL; + int i = 0; - /* no write chunklist, just notify upper layers */ - if (reply_info == NULL) { + chunk = (gf_rdma_read_chunk_t *)*ptr; + if (chunk[0].rc_discrim == 0) { ret = 0; goto out; } - wc_array = reply_info->wc_array; + for (i = 0; chunk[i].rc_discrim != 0; i++) { + chunk[i].rc_discrim = ntoh32 (chunk[i].rc_discrim); + chunk[i].rc_position = ntoh32 (chunk[i].rc_position); + chunk[i].rc_target.rs_handle + = ntoh32 (chunk[i].rc_target.rs_handle); + chunk[i].rc_target.rs_length + = ntoh32 (chunk[i].rc_target.rs_length); + chunk[i].rc_target.rs_offset + = ntoh64 (chunk[i].rc_target.rs_offset); + } - if (header->rm_type == GF_RDMA_NOMSG) { - post->ctx.vector[0].iov_base - = (void *)(long)wc_array->wc_array[0].wc_target.rs_offset; - post->ctx.vector[0].iov_len - = wc_array->wc_array[0].wc_target.rs_length; + *readch = &chunk[0]; + ret = 0; + *ptr = (char *)&chunk[i].rc_discrim; +out: + return ret; +} - post->ctx.count = 1; - } else { - for (i = 0; i < wc_array->wc_nchunks; i++) { - post->ctx.vector[i + 1].iov_base - = (void *)(long)wc_array->wc_array[i].wc_target.rs_offset; - post->ctx.vector[i + 1].iov_len - = wc_array->wc_array[i].wc_target.rs_length; - } - post->ctx.count += wc_array->wc_nchunks; +inline int32_t +gf_rdma_decode_error_msg (gf_rdma_peer_t *peer, gf_rdma_post_t *post, + size_t bytes_in_post) +{ + gf_rdma_header_t *header = NULL; + struct iobuf *iobuf = NULL; + struct iobref *iobref = NULL; + int32_t ret = -1; + struct rpc_msg rpc_msg = {0, }; + + header = (gf_rdma_header_t *)post->buf; + header->rm_body.rm_error.rm_type + = ntoh32 (header->rm_body.rm_error.rm_type); + if (header->rm_body.rm_error.rm_type == ERR_VERS) { + header->rm_body.rm_error.rm_version.gf_rdma_vers_low = + ntoh32 (header->rm_body.rm_error.rm_version.gf_rdma_vers_low); + header->rm_body.rm_error.rm_version.gf_rdma_vers_high = + ntoh32 (header->rm_body.rm_error.rm_version.gf_rdma_vers_high); } - ptr = (uint32_t *)post->ctx.vector[0].iov_base; - request_info.xid = ntoh32 (*ptr); + rpc_msg.rm_xid = header->rm_xid; + rpc_msg.rm_direction = REPLY; + rpc_msg.rm_reply.rp_stat = MSG_DENIED; - ret = rpc_transport_notify (peer->trans, - RPC_TRANSPORT_MAP_XID_REQUEST, - &request_info); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "cannot get request information (peer:%s) from rpc " - "layer", peer->trans->peerinfo.identifier); + iobuf = iobuf_get2 (peer->trans->ctx->iobuf_pool, bytes_in_post); + if (iobuf == NULL) { + ret = -1; goto out; } - rpc_req = request_info.rpc_req; - if (rpc_req == NULL) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "rpc request structure not found"); + post->ctx.iobref = iobref = iobref_new (); + if (iobref == NULL) { ret = -1; goto out; } - ctx = rpc_req->conn_private; - if ((post->ctx.iobref == NULL) && ctx->rsp_iobref) { - post->ctx.iobref = iobref_ref (ctx->rsp_iobref); - } + iobref_add (iobref, iobuf); + iobuf_unref (iobuf); - ret = 0; + ret = rpc_reply_to_xdr (&rpc_msg, iobuf_ptr (iobuf), + iobuf_pagesize (iobuf), &post->ctx.vector[0]); + if (ret == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "Failed to create RPC reply"); + goto out; + } - gf_rdma_reply_info_destroy (reply_info); + post->ctx.count = 1; + + iobuf = NULL; + iobref = NULL; out: - if (ret == 0) { - ret = gf_rdma_pollin_notify (peer, post); - if (ret < 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "pollin notify failed"); + if (ret == -1) { + if (iobuf != NULL) { + iobuf_unref (iobuf); + } + + if (iobref != NULL) { + iobref_unref (iobref); } } - return ret; + return 0; } -static inline int32_t -gf_rdma_recv_request (gf_rdma_peer_t *peer, gf_rdma_post_t *post, - gf_rdma_read_chunk_t *readch) +int32_t +gf_rdma_decode_msg (gf_rdma_peer_t *peer, gf_rdma_post_t *post, + gf_rdma_read_chunk_t **readch, size_t bytes_in_post) { - int32_t ret = -1; - - if (readch != NULL) { - ret = gf_rdma_do_reads (peer, post, readch); - if (ret < 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "rdma read from peer (%s) failed", - peer->trans->peerinfo.identifier); - } - } else { - ret = gf_rdma_pollin_notify (peer, post); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "pollin notification failed"); - } - } + int32_t ret = -1; + gf_rdma_header_t *header = NULL; + gf_rdma_reply_info_t *reply_info = NULL; + char *ptr = NULL; + gf_rdma_write_array_t *write_ary = NULL; + size_t header_len = 0; - return ret; -} + header = (gf_rdma_header_t *)post->buf; -void -gf_rdma_process_recv (gf_rdma_peer_t *peer, struct ibv_wc *wc) -{ - gf_rdma_post_t *post = NULL; - gf_rdma_read_chunk_t *readch = NULL; - int ret = -1; - uint32_t *ptr = NULL; - enum msg_type msg_type = 0; - gf_rdma_header_t *header = NULL; + ptr = (char *)&header->rm_body.rm_chunks[0]; - post = (gf_rdma_post_t *) (long) wc->wr_id; - if (post == NULL) { + ret = gf_rdma_get_read_chunklist (&ptr, readch); + if (ret == -1) { gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "no post found in successful work completion element"); + "cannot get read chunklist from msg"); goto out; } - ret = gf_rdma_decode_header (peer, post, &readch, wc->byte_len); + /* skip terminator of read-chunklist */ + ptr = ptr + sizeof (uint32_t); + + ret = gf_rdma_get_write_chunklist (&ptr, &write_ary); if (ret == -1) { gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "decoding of header failed"); + "cannot get write chunklist from msg"); goto out; } - header = (gf_rdma_header_t *)post->buf; - - switch (header->rm_type) { - case GF_RDMA_MSG: - ptr = (uint32_t *)post->ctx.vector[0].iov_base; - msg_type = ntoh32 (*(ptr + 1)); - break; + /* skip terminator of write-chunklist */ + ptr = ptr + sizeof (uint32_t); - case GF_RDMA_NOMSG: - if (readch != NULL) { - msg_type = CALL; - } else { - msg_type = REPLY; + if (write_ary != NULL) { + reply_info = gf_rdma_reply_info_alloc (peer); + if (reply_info == NULL) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "reply_info_alloc failed"); + ret = -1; + goto out; } - break; - case GF_RDMA_ERROR: - if (header->rm_body.rm_error.rm_type == ERR_CHUNK) { + reply_info->type = gf_rdma_writech; + reply_info->wc_array = write_ary; + reply_info->rm_xid = header->rm_xid; + } else { + ret = gf_rdma_get_write_chunklist (&ptr, &write_ary); + if (ret == -1) { gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "peer (%s), couldn't encode or decode the msg " - "properly or write chunks were not provided " - "for replies that were bigger than " - "RDMA_INLINE_THRESHOLD (%d)", - peer->trans->peerinfo.identifier, - GLUSTERFS_RDMA_INLINE_THRESHOLD); - ret = gf_rdma_pollin_notify (peer, post); - if (ret == -1) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_DEBUG, - "pollin notification failed"); - } - goto out; - } else { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "an error has happened while transmission of " - "msg, disconnecting the transport"); - ret = -1; + "cannot get reply chunklist from msg"); goto out; } - default: - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "invalid rdma msg-type (%d)", header->rm_type); - goto out; - } + if (write_ary != NULL) { + reply_info = gf_rdma_reply_info_alloc (peer); + if (reply_info == NULL) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "reply_info_alloc_failed"); + ret = -1; + goto out; + } - if (msg_type == CALL) { - ret = gf_rdma_recv_request (peer, post, readch); - if (ret < 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "receiving a request from peer (%s) failed", - peer->trans->peerinfo.identifier); + reply_info->type = gf_rdma_replych; + reply_info->wc_array = write_ary; + reply_info->rm_xid = header->rm_xid; } - } else { - ret = gf_rdma_recv_reply (peer, post); - if (ret < 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "receiving a reply from peer (%s) failed", - peer->trans->peerinfo.identifier); + } + + /* skip terminator of reply chunk */ + ptr = ptr + sizeof (uint32_t); + if (header->rm_type != GF_RDMA_NOMSG) { + header_len = (long)ptr - (long)post->buf; + post->ctx.vector[0].iov_len = (bytes_in_post - header_len); + + post->ctx.hdr_iobuf = iobuf_get2 (peer->trans->ctx->iobuf_pool, + (bytes_in_post - header_len)); + if (post->ctx.hdr_iobuf == NULL) { + ret = -1; + goto out; } + + post->ctx.vector[0].iov_base = iobuf_ptr (post->ctx.hdr_iobuf); + memcpy (post->ctx.vector[0].iov_base, ptr, + post->ctx.vector[0].iov_len); + post->ctx.count = 1; } + post->ctx.reply_info = reply_info; out: if (ret == -1) { - rpc_transport_disconnect (peer->trans); + if (*readch != NULL) { + GF_FREE (*readch); + *readch = NULL; + } + + GF_FREE (write_ary); } - return; + return ret; } -static void * -gf_rdma_recv_completion_proc (void *data) +/* Assumes only one of either write-chunklist or a reply chunk is present */ +int32_t +gf_rdma_decode_header (gf_rdma_peer_t *peer, gf_rdma_post_t *post, + gf_rdma_read_chunk_t **readch, size_t bytes_in_post) { - struct ibv_comp_channel *chan = NULL; - gf_rdma_device_t *device = NULL;; - gf_rdma_post_t *post = NULL; - gf_rdma_peer_t *peer = NULL; - struct ibv_cq *event_cq = NULL; - struct ibv_wc wc = {0, }; - void *event_ctx = NULL; - int32_t ret = 0; - - chan = data; + int32_t ret = -1; + gf_rdma_header_t *header = NULL; - while (1) { - ret = ibv_get_cq_event (chan, &event_cq, &event_ctx); - if (ret) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "ibv_get_cq_event failed, terminating recv " - "thread %d (%d)", ret, errno); - continue; - } + header = (gf_rdma_header_t *)post->buf; - device = event_ctx; + header->rm_xid = ntoh32 (header->rm_xid); + header->rm_vers = ntoh32 (header->rm_vers); + header->rm_credit = ntoh32 (header->rm_credit); + header->rm_type = ntoh32 (header->rm_type); - ret = ibv_req_notify_cq (event_cq, 0); - if (ret) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "ibv_req_notify_cq on %s failed, terminating " - "recv thread: %d (%d)", - device->device_name, ret, errno); - continue; + switch (header->rm_type) { + case GF_RDMA_MSG: + case GF_RDMA_NOMSG: + ret = gf_rdma_decode_msg (peer, post, readch, bytes_in_post); + if (ret < 0) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "cannot decode msg of type (%d)", + header->rm_type); } - device = (gf_rdma_device_t *) event_ctx; + break; - while ((ret = ibv_poll_cq (event_cq, 1, &wc)) > 0) { - post = (gf_rdma_post_t *) (long) wc.wr_id; + case GF_RDMA_MSGP: + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "rdma msg of msg-type GF_RDMA_MSGP should not have " + "been received"); + ret = -1; + break; - pthread_mutex_lock (&device->qpreg.lock); - { - peer = __gf_rdma_lookup_peer (device, - wc.qp_num); + case GF_RDMA_DONE: + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "rdma msg of msg-type GF_RDMA_DONE should not have " + "been received"); + ret = -1; + break; - /* - * keep a refcount on transport so that it - * does not get freed because of some error - * indicated by wc.status till we are done - * with usage of peer and thereby that of trans. - */ - if (peer != NULL) { - rpc_transport_ref (peer->trans); - } - } - pthread_mutex_unlock (&device->qpreg.lock); - - if (wc.status != IBV_WC_SUCCESS) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "recv work request on `%s' returned " - "error (%d)", device->device_name, - wc.status); - if (peer) { - rpc_transport_unref (peer->trans); - rpc_transport_disconnect (peer->trans); - } - - if (post) { - gf_rdma_post_unref (post); - } - continue; - } - - if (peer) { - gf_rdma_process_recv (peer, &wc); - rpc_transport_unref (peer->trans); - } else { - gf_log (GF_RDMA_LOG_NAME, - GF_LOG_DEBUG, - "could not lookup peer for qp_num: %d", - wc.qp_num); - } - - gf_rdma_post_unref (post); - } + case GF_RDMA_ERROR: + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "received a msg of type RDMA_ERROR"); + ret = gf_rdma_decode_error_msg (peer, post, bytes_in_post); + break; - if (ret < 0) { - gf_log (GF_RDMA_LOG_NAME, - GF_LOG_ERROR, - "ibv_poll_cq on `%s' returned error " - "(ret = %d, errno = %d)", - device->device_name, ret, errno); - continue; - } - ibv_ack_cq_events (event_cq, 1); + default: + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "unknown rdma msg-type (%d)", header->rm_type); } - return NULL; + return ret; } -void -gf_rdma_handle_failed_send_completion (gf_rdma_peer_t *peer, struct ibv_wc *wc) +int32_t +__gf_rdma_read (gf_rdma_peer_t *peer, gf_rdma_post_t *post, struct iovec *to, + gf_rdma_read_chunk_t *readch) { - gf_rdma_post_t *post = NULL; - gf_rdma_device_t *device = NULL; - gf_rdma_private_t *priv = NULL; + int32_t ret = -1; + struct ibv_sge list = {0, }; + struct ibv_send_wr wr = {0, }, *bad_wr = NULL; - if (peer != NULL) { - priv = peer->trans->private; - if (priv != NULL) { - device = priv->device; - } + ret = __gf_rdma_register_local_mr_for_rdma (peer, to, 1, &post->ctx); + if (ret == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "registering local memory for rdma read failed"); + goto out; } + list.addr = (unsigned long) to->iov_base; + list.length = to->iov_len; + list.lkey = post->ctx.mr[post->ctx.mr_count - 1]->lkey; - post = (gf_rdma_post_t *) (long) wc->wr_id; - - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "send work request on `%s' returned error " - "wc.status = %d, wc.vendor_err = %d, post->buf = %p, " - "wc.byte_len = %d, post->reused = %d", - (device != NULL) ? device->device_name : NULL, wc->status, - wc->vendor_err, post->buf, wc->byte_len, post->reused); - - if (wc->status == IBV_WC_RETRY_EXC_ERR) { - gf_log ("rdma", GF_LOG_ERROR, "connection between client and" - " server not working. check by running " - "'ibv_srq_pingpong'. also make sure subnet manager" - " is running (eg: 'opensm'), or check if rdma port is " - "valid (or active) by running 'ibv_devinfo'. contact " - "Gluster Support Team if the problem persists."); - } + wr.wr_id = (unsigned long) gf_rdma_post_ref (post); + wr.sg_list = &list; + wr.num_sge = 1; + wr.opcode = IBV_WR_RDMA_READ; + wr.send_flags = IBV_SEND_SIGNALED; + wr.wr.rdma.remote_addr = readch->rc_target.rs_offset; + wr.wr.rdma.rkey = readch->rc_target.rs_handle; - if (peer) { - rpc_transport_disconnect (peer->trans); + ret = ibv_post_send (peer->qp, &wr, &bad_wr); + if (ret) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "rdma read from client " + "(%s) failed with ret = %d (%s)", + peer->trans->peerinfo.identifier, + ret, (ret > 0) ? strerror (ret) : ""); + ret = -1; + gf_rdma_post_unref (post); } - - return; +out: + return ret; } -void -gf_rdma_handle_successful_send_completion (gf_rdma_peer_t *peer, - struct ibv_wc *wc) +int32_t +gf_rdma_do_reads (gf_rdma_peer_t *peer, gf_rdma_post_t *post, + gf_rdma_read_chunk_t *readch) { - gf_rdma_post_t *post = NULL; - int reads = 0, ret = 0; - gf_rdma_header_t *header = NULL; - - if (wc->opcode != IBV_WC_RDMA_READ) { - goto out; - } + int32_t ret = -1, i = 0, count = 0; + size_t size = 0; + char *ptr = NULL; + struct iobuf *iobuf = NULL; + gf_rdma_private_t *priv = NULL; - post = (gf_rdma_post_t *)(long) wc->wr_id; + priv = peer->trans->private; - pthread_mutex_lock (&post->lock); - { - reads = --post->ctx.gf_rdma_reads; + for (i = 0; readch[i].rc_discrim != 0; i++) { + size += readch[i].rc_target.rs_length; } - pthread_mutex_unlock (&post->lock); - if (reads != 0) { - /* if it is not the last rdma read, we've got nothing to do */ + if (i == 0) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "message type specified as rdma-read but there are no " + "rdma read-chunks present"); goto out; } - header = (gf_rdma_header_t *)post->buf; - - if (header->rm_type == GF_RDMA_NOMSG) { - post->ctx.count = 1; - post->ctx.vector[0].iov_len += post->ctx.vector[1].iov_len; - } + post->ctx.gf_rdma_reads = i; - ret = gf_rdma_pollin_notify (peer, post); - if ((ret == -1) && (peer != NULL)) { - rpc_transport_disconnect (peer->trans); + iobuf = iobuf_get2 (peer->trans->ctx->iobuf_pool, size); + if (iobuf == NULL) { + goto out; } -out: - return; -} - - -static void * -gf_rdma_send_completion_proc (void *data) -{ - struct ibv_comp_channel *chan = NULL; - gf_rdma_post_t *post = NULL; - gf_rdma_peer_t *peer = NULL; - struct ibv_cq *event_cq = NULL; - void *event_ctx = NULL; - gf_rdma_device_t *device = NULL; - struct ibv_wc wc = {0, }; - char is_request = 0; - int32_t ret = 0, quota_ret = 0; - - chan = data; - while (1) { - ret = ibv_get_cq_event (chan, &event_cq, &event_ctx); - if (ret) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "ibv_get_cq_event on failed, terminating " - "send thread: %d (%d)", ret, errno); - continue; - } - - device = event_ctx; - - ret = ibv_req_notify_cq (event_cq, 0); - if (ret) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "ibv_req_notify_cq on %s failed, terminating " - "send thread: %d (%d)", - device->device_name, ret, errno); - continue; - } - - while ((ret = ibv_poll_cq (event_cq, 1, &wc)) > 0) { - post = (gf_rdma_post_t *) (long) wc.wr_id; - - pthread_mutex_lock (&device->qpreg.lock); - { - peer = __gf_rdma_lookup_peer (device, wc.qp_num); - - /* - * keep a refcount on transport so that it - * does not get freed because of some error - * indicated by wc.status, till we are done - * with usage of peer and thereby that of trans. - */ - if (peer != NULL) { - rpc_transport_ref (peer->trans); - } - } - pthread_mutex_unlock (&device->qpreg.lock); - - if (wc.status != IBV_WC_SUCCESS) { - gf_rdma_handle_failed_send_completion (peer, &wc); - } else { - gf_rdma_handle_successful_send_completion (peer, - &wc); - } - - if (post) { - is_request = post->ctx.is_request; - - ret = gf_rdma_post_unref (post); - if ((ret == 0) - && (wc.status == IBV_WC_SUCCESS) - && !is_request - && (post->type == GF_RDMA_SEND_POST) - && (peer != NULL)) { - /* An GF_RDMA_RECV_POST can end up in - * gf_rdma_send_completion_proc for - * rdma-reads, and we do not take - * quota for getting an GF_RDMA_RECV_POST. - */ - - /* - * if it is request, quota is returned - * after reply has come. - */ - quota_ret = gf_rdma_quota_put (peer); - if (quota_ret < 0) { - gf_log ("rdma", GF_LOG_DEBUG, - "failed to send " - "message"); - } - } - } - - if (peer) { - rpc_transport_unref (peer->trans); - } else { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_DEBUG, - "could not lookup peer for qp_num: %d", - wc.qp_num); - } - } - - if (ret < 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "ibv_poll_cq on `%s' returned error (ret = %d," - " errno = %d)", - device->device_name, ret, errno); - continue; + if (post->ctx.iobref == NULL) { + post->ctx.iobref = iobref_new (); + if (post->ctx.iobref == NULL) { + iobuf_unref (iobuf); + goto out; } - - ibv_ack_cq_events (event_cq, 1); } - return NULL; -} - - -static void -gf_rdma_options_init (rpc_transport_t *this) -{ - gf_rdma_private_t *priv = NULL; - gf_rdma_options_t *options = NULL; - int32_t mtu = 0; - data_t *temp = NULL; - - /* TODO: validate arguments from options below */ - - priv = this->private; - options = &priv->options; - options->send_size = GLUSTERFS_RDMA_INLINE_THRESHOLD;/*this->ctx->page_size * 4; 512 KB*/ - options->recv_size = GLUSTERFS_RDMA_INLINE_THRESHOLD;/*this->ctx->page_size * 4; 512 KB*/ - options->send_count = 4096; - options->recv_count = 4096; - options->attr_timeout = GF_RDMA_TIMEOUT; - options->attr_retry_cnt = GF_RDMA_RETRY_CNT; - options->attr_rnr_retry = GF_RDMA_RNR_RETRY; - - temp = dict_get (this->options, - "transport.rdma.work-request-send-count"); - if (temp) - options->send_count = data_to_int32 (temp); - - temp = dict_get (this->options, - "transport.rdma.work-request-recv-count"); - if (temp) - options->recv_count = data_to_int32 (temp); - - temp = dict_get (this->options, "transport.rdma.attr-timeout"); - - if (temp) - options->attr_timeout = data_to_uint8 (temp); - - temp = dict_get (this->options, "transport.rdma.attr-retry-cnt"); - - if (temp) - options->attr_retry_cnt = data_to_uint8 (temp); - - temp = dict_get (this->options, "transport.rdma.attr-rnr-retry"); - - if (temp) - options->attr_rnr_retry = data_to_uint8 (temp); + iobref_add (post->ctx.iobref, iobuf); + iobuf_unref (iobuf); - options->port = 1; - temp = dict_get (this->options, - "transport.rdma.port"); - if (temp) - options->port = data_to_uint64 (temp); + ptr = iobuf_ptr (iobuf); + iobuf = NULL; - options->mtu = mtu = IBV_MTU_2048; - temp = dict_get (this->options, - "transport.rdma.mtu"); - if (temp) - mtu = data_to_int32 (temp); - switch (mtu) { - case 256: options->mtu = IBV_MTU_256; - break; - case 512: options->mtu = IBV_MTU_512; - break; - case 1024: options->mtu = IBV_MTU_1024; - break; - case 2048: options->mtu = IBV_MTU_2048; - break; - case 4096: options->mtu = IBV_MTU_4096; - break; - default: - if (temp) + pthread_mutex_lock (&priv->write_mutex); + { + if (!priv->connected) { gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "%s: unrecognized MTU value '%s', defaulting " - "to '2048'", this->name, - data_to_str (temp)); - else - gf_log (GF_RDMA_LOG_NAME, GF_LOG_TRACE, - "%s: defaulting MTU to '2048'", - this->name); - options->mtu = IBV_MTU_2048; - break; - } + "transport not connected to peer (%s), " + "not doing rdma reads", + peer->trans->peerinfo.identifier); + goto unlock; + } - temp = dict_get (this->options, - "transport.rdma.device-name"); - if (temp) - options->device_name = gf_strdup (temp->data); + for (i = 0; readch[i].rc_discrim != 0; i++) { + count = post->ctx.count++; + post->ctx.vector[count].iov_base = ptr; + post->ctx.vector[count].iov_len + = readch[i].rc_target.rs_length; - return; -} + ret = __gf_rdma_read (peer, post, + &post->ctx.vector[count], + &readch[i]); + if (ret == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "rdma read from peer (%s) failed", + peer->trans->peerinfo.identifier); + goto unlock; + } -static void -gf_rdma_queue_init (gf_rdma_queue_t *queue) -{ - pthread_mutex_init (&queue->lock, NULL); + ptr += readch[i].rc_target.rs_length; + } - queue->active_posts.next = &queue->active_posts; - queue->active_posts.prev = &queue->active_posts; - queue->passive_posts.next = &queue->passive_posts; - queue->passive_posts.prev = &queue->passive_posts; + ret = 0; + } +unlock: + pthread_mutex_unlock (&priv->write_mutex); +out: + + if (ret == -1) { + if (iobuf != NULL) { + iobuf_unref (iobuf); + } + } + + return ret; } -static gf_rdma_device_t * -gf_rdma_get_device (rpc_transport_t *this, struct ibv_context *ibctx) +int32_t +gf_rdma_pollin_notify (gf_rdma_peer_t *peer, gf_rdma_post_t *post) { - glusterfs_ctx_t *ctx = NULL; - gf_rdma_private_t *priv = NULL; - gf_rdma_options_t *options = NULL; - char *device_name = NULL; - uint32_t port = 0; - uint8_t active_port = 0; - int32_t ret = 0; - int32_t i = 0; - gf_rdma_device_t *trav = NULL; - - priv = this->private; - options = &priv->options; - device_name = priv->options.device_name; - ctx = this->ctx; - trav = ctx->ib; - port = priv->options.port; + int32_t ret = -1; + enum msg_type msg_type = 0; + struct rpc_req *rpc_req = NULL; + gf_rdma_request_context_t *request_context = NULL; + rpc_request_info_t request_info = {0, }; + gf_rdma_private_t *priv = NULL; + uint32_t *ptr = NULL; + rpc_transport_pollin_t *pollin = NULL; - while (trav) { - if ((!strcmp (trav->device_name, device_name)) && - (trav->port == port)) - break; - trav = trav->next; + if ((peer == NULL) || (post == NULL)) { + goto out; } - if (!trav) { - - trav = GF_CALLOC (1, sizeof (*trav), - gf_common_mt_rdma_device_t); - if (trav == NULL) { - return NULL; + if (post->ctx.iobref == NULL) { + post->ctx.iobref = iobref_new (); + if (post->ctx.iobref == NULL) { + goto out; } - priv->device = trav; - - trav->context = ibctx; + /* handling the case where both hdr and payload of + * GF_FOP_READ_CBK were received in a single iobuf + * because of server sending entire msg as inline without + * doing rdma writes. + */ + if (post->ctx.hdr_iobuf) + iobref_add (post->ctx.iobref, post->ctx.hdr_iobuf); + } - ret = ib_get_active_port (trav->context); + pollin = rpc_transport_pollin_alloc (peer->trans, + post->ctx.vector, + post->ctx.count, + post->ctx.hdr_iobuf, + post->ctx.iobref, + post->ctx.reply_info); + if (pollin == NULL) { + goto out; + } - if (ret < 0) { - if (!port) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "Failed to find any active ports and " - "none specified in volume file," - " exiting"); - GF_FREE (trav); - return NULL; - } - } + ptr = (uint32_t *)pollin->vector[0].iov_base; - trav->request_ctx_pool = mem_pool_new (gf_rdma_request_context_t, - GF_RDMA_POOL_SIZE); - if (trav->request_ctx_pool == NULL) { - return NULL; - } + request_info.xid = ntoh32 (*ptr); + msg_type = ntoh32 (*(ptr + 1)); - trav->ioq_pool = mem_pool_new (gf_rdma_ioq_t, GF_RDMA_POOL_SIZE); - if (trav->ioq_pool == NULL) { - mem_pool_destroy (trav->request_ctx_pool); - return NULL; + if (msg_type == REPLY) { + ret = rpc_transport_notify (peer->trans, + RPC_TRANSPORT_MAP_XID_REQUEST, + &request_info); + if (ret == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_DEBUG, + "cannot get request information from rpc " + "layer"); + goto out; } - trav->reply_info_pool = mem_pool_new (gf_rdma_reply_info_t, - GF_RDMA_POOL_SIZE); - if (trav->reply_info_pool == NULL) { - mem_pool_destroy (trav->request_ctx_pool); - mem_pool_destroy (trav->ioq_pool); - return NULL; + rpc_req = request_info.rpc_req; + if (rpc_req == NULL) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_DEBUG, + "rpc request structure not found"); + ret = -1; + goto out; } + request_context = rpc_req->conn_private; + rpc_req->conn_private = NULL; - active_port = ret; - - if (port) { - ret = ib_check_active_port (trav->context, port); - if (ret < 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "On device %s: provided port:%u is " - "found to be offline, continuing to " - "use the same port", device_name, port); + priv = peer->trans->private; + if (request_context != NULL) { + pthread_mutex_lock (&priv->write_mutex); + { + __gf_rdma_request_context_destroy (request_context); } + pthread_mutex_unlock (&priv->write_mutex); } else { - priv->options.port = active_port; - port = active_port; - gf_log (GF_RDMA_LOG_NAME, GF_LOG_TRACE, - "Port unspecified in volume file using active " - "port: %u", port); + gf_rdma_quota_put (peer); } - trav->device_name = gf_strdup (device_name); - trav->port = port; + pollin->is_reply = 1; + } - trav->next = ctx->ib; - ctx->ib = trav; + ret = rpc_transport_notify (peer->trans, RPC_TRANSPORT_MSG_RECEIVED, + pollin); + if (ret < 0) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "transport_notify failed"); + } - trav->send_chan = ibv_create_comp_channel (trav->context); - if (!trav->send_chan) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "%s: could not create send completion channel", - device_name); - mem_pool_destroy (trav->ioq_pool); - mem_pool_destroy (trav->request_ctx_pool); - mem_pool_destroy (trav->reply_info_pool); - GF_FREE ((char *)trav->device_name); - GF_FREE (trav); - return NULL; - } +out: + if (pollin != NULL) { + pollin->private = NULL; + rpc_transport_pollin_destroy (pollin); + } - trav->recv_chan = ibv_create_comp_channel (trav->context); - if (!trav->recv_chan) { - mem_pool_destroy (trav->ioq_pool); - mem_pool_destroy (trav->request_ctx_pool); - mem_pool_destroy (trav->reply_info_pool); - ibv_destroy_comp_channel (trav->send_chan); - GF_FREE ((char *)trav->device_name); - GF_FREE (trav); - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "could not create recv completion channel"); - /* TODO: cleanup current mess */ - return NULL; - } + return ret; +} - if (gf_rdma_create_cq (this) < 0) { - mem_pool_destroy (trav->ioq_pool); - mem_pool_destroy (trav->request_ctx_pool); - mem_pool_destroy (trav->reply_info_pool); - ibv_destroy_comp_channel (trav->recv_chan); - ibv_destroy_comp_channel (trav->send_chan); - GF_FREE ((char *)trav->device_name); - GF_FREE (trav); - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "%s: could not create CQ", - this->name); - return NULL; - } - /* protection domain */ - trav->pd = ibv_alloc_pd (trav->context); +int32_t +gf_rdma_recv_reply (gf_rdma_peer_t *peer, gf_rdma_post_t *post) +{ + int32_t ret = -1; + gf_rdma_header_t *header = NULL; + gf_rdma_reply_info_t *reply_info = NULL; + gf_rdma_write_array_t *wc_array = NULL; + int i = 0; + uint32_t *ptr = NULL; + gf_rdma_request_context_t *ctx = NULL; + rpc_request_info_t request_info = {0, }; + struct rpc_req *rpc_req = NULL; - if (!trav->pd) { - mem_pool_destroy (trav->ioq_pool); - mem_pool_destroy (trav->request_ctx_pool); - mem_pool_destroy (trav->reply_info_pool); - gf_rdma_destroy_cq (this); - ibv_destroy_comp_channel (trav->recv_chan); - ibv_destroy_comp_channel (trav->send_chan); - GF_FREE ((char *)trav->device_name); - GF_FREE (trav); - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "%s: could not allocate protection domain", - this->name); - return NULL; - } + header = (gf_rdma_header_t *)post->buf; + reply_info = post->ctx.reply_info; - struct ibv_srq_init_attr attr = { - .attr = { - .max_wr = options->recv_count, - .max_sge = 1 - } - }; - trav->srq = ibv_create_srq (trav->pd, &attr); + /* no write chunklist, just notify upper layers */ + if (reply_info == NULL) { + ret = 0; + goto out; + } - if (!trav->srq) { - mem_pool_destroy (trav->ioq_pool); - mem_pool_destroy (trav->request_ctx_pool); - mem_pool_destroy (trav->reply_info_pool); - ibv_dealloc_pd (trav->pd); - gf_rdma_destroy_cq (this); - ibv_destroy_comp_channel (trav->recv_chan); - ibv_destroy_comp_channel (trav->send_chan); - GF_FREE ((char *)trav->device_name); - GF_FREE (trav); + wc_array = reply_info->wc_array; - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "%s: could not create SRQ", - this->name); - return NULL; + if (header->rm_type == GF_RDMA_NOMSG) { + post->ctx.vector[0].iov_base + = (void *)(long)wc_array->wc_array[0].wc_target.rs_offset; + post->ctx.vector[0].iov_len + = wc_array->wc_array[0].wc_target.rs_length; + + post->ctx.count = 1; + } else { + for (i = 0; i < wc_array->wc_nchunks; i++) { + post->ctx.vector[i + 1].iov_base + = (void *)(long)wc_array->wc_array[i].wc_target.rs_offset; + post->ctx.vector[i + 1].iov_len + = wc_array->wc_array[i].wc_target.rs_length; } - /* queue init */ - gf_rdma_queue_init (&trav->sendq); - gf_rdma_queue_init (&trav->recvq); + post->ctx.count += wc_array->wc_nchunks; + } + + ptr = (uint32_t *)post->ctx.vector[0].iov_base; + request_info.xid = ntoh32 (*ptr); + + ret = rpc_transport_notify (peer->trans, + RPC_TRANSPORT_MAP_XID_REQUEST, + &request_info); + if (ret == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "cannot get request information (peer:%s) from rpc " + "layer", peer->trans->peerinfo.identifier); + goto out; + } + + rpc_req = request_info.rpc_req; + if (rpc_req == NULL) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "rpc request structure not found"); + ret = -1; + goto out; + } + + ctx = rpc_req->conn_private; + if ((post->ctx.iobref == NULL) && ctx->rsp_iobref) { + post->ctx.iobref = iobref_ref (ctx->rsp_iobref); + } - if (gf_rdma_create_posts (this) < 0) { - mem_pool_destroy (trav->ioq_pool); - mem_pool_destroy (trav->request_ctx_pool); - mem_pool_destroy (trav->reply_info_pool); - ibv_dealloc_pd (trav->pd); - gf_rdma_destroy_cq (this); - ibv_destroy_comp_channel (trav->recv_chan); - ibv_destroy_comp_channel (trav->send_chan); - GF_FREE ((char *)trav->device_name); - GF_FREE (trav); + ret = 0; - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "%s: could not allocate posts", - this->name); - return NULL; + gf_rdma_reply_info_destroy (reply_info); + +out: + if (ret == 0) { + ret = gf_rdma_pollin_notify (peer, post); + if (ret < 0) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "pollin notify failed"); } + } - /* completion threads */ - ret = pthread_create (&trav->send_thread, - NULL, - gf_rdma_send_completion_proc, - trav->send_chan); - if (ret) { - gf_rdma_destroy_posts (this); - mem_pool_destroy (trav->ioq_pool); - mem_pool_destroy (trav->request_ctx_pool); - mem_pool_destroy (trav->reply_info_pool); - ibv_dealloc_pd (trav->pd); - gf_rdma_destroy_cq (this); - ibv_destroy_comp_channel (trav->recv_chan); - ibv_destroy_comp_channel (trav->send_chan); - GF_FREE ((char *)trav->device_name); - GF_FREE (trav); + return ret; +} - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "could not create send completion thread"); - return NULL; - } - ret = pthread_create (&trav->recv_thread, - NULL, - gf_rdma_recv_completion_proc, - trav->recv_chan); - if (ret) { - gf_rdma_destroy_posts (this); - mem_pool_destroy (trav->ioq_pool); - mem_pool_destroy (trav->request_ctx_pool); - mem_pool_destroy (trav->reply_info_pool); - ibv_dealloc_pd (trav->pd); - gf_rdma_destroy_cq (this); - ibv_destroy_comp_channel (trav->recv_chan); - ibv_destroy_comp_channel (trav->send_chan); - GF_FREE ((char *)trav->device_name); - GF_FREE (trav); - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "could not create recv completion thread"); - return NULL; - } +inline int32_t +gf_rdma_recv_request (gf_rdma_peer_t *peer, gf_rdma_post_t *post, + gf_rdma_read_chunk_t *readch) +{ + int32_t ret = -1; - /* qpreg */ - pthread_mutex_init (&trav->qpreg.lock, NULL); - for (i=0; i<42; i++) { - trav->qpreg.ents[i].next = &trav->qpreg.ents[i]; - trav->qpreg.ents[i].prev = &trav->qpreg.ents[i]; + if (readch != NULL) { + ret = gf_rdma_do_reads (peer, post, readch); + if (ret < 0) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "rdma read from peer (%s) failed", + peer->trans->peerinfo.identifier); + } + } else { + ret = gf_rdma_pollin_notify (peer, post); + if (ret == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "pollin notification failed"); } } - return trav; + + return ret; } -static int32_t -gf_rdma_init (rpc_transport_t *this) +void +gf_rdma_process_recv (gf_rdma_peer_t *peer, struct ibv_wc *wc) { - gf_rdma_private_t *priv = NULL; - gf_rdma_options_t *options = NULL; - struct ibv_device **dev_list; - struct ibv_context *ib_ctx = NULL; - int32_t ret = 0; + gf_rdma_post_t *post = NULL; + gf_rdma_read_chunk_t *readch = NULL; + int ret = -1; + uint32_t *ptr = NULL; + enum msg_type msg_type = 0; + gf_rdma_header_t *header = NULL; + gf_rdma_private_t *priv = NULL; - priv = this->private; - options = &priv->options; + post = (gf_rdma_post_t *) (long) wc->wr_id; + if (post == NULL) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "no post found in successful work completion element"); + goto out; + } - ibv_fork_init (); - gf_rdma_options_init (this); + ret = gf_rdma_decode_header (peer, post, &readch, wc->byte_len); + if (ret == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "decoding of header failed"); + goto out; + } - { - dev_list = ibv_get_device_list (NULL); + header = (gf_rdma_header_t *)post->buf; - if (!dev_list) { - gf_log (GF_RDMA_LOG_NAME, - GF_LOG_CRITICAL, - "Failed to get IB devices"); - ret = -1; - goto cleanup; - } + priv = peer->trans->private; - if (!*dev_list) { - gf_log (GF_RDMA_LOG_NAME, - GF_LOG_CRITICAL, - "No IB devices found"); - ret = -1; - goto cleanup; + pthread_mutex_lock (&priv->write_mutex); + { + if (!priv->peer.quota_set) { + priv->peer.quota_set = 1; + + /* Initially peer.quota is set to 1 as per RFC 5666. We + * have to account for the quota used while sending + * first msg (which may or may not be returned to pool + * at this point) while deriving peer.quota from + * header->rm_credit. Hence the arithmatic below, + * instead of directly setting it to header->rm_credit. + */ + priv->peer.quota = header->rm_credit + - ( 1 - priv->peer.quota); } + } + pthread_mutex_unlock (&priv->write_mutex); - if (!options->device_name) { - if (*dev_list) { - options->device_name = - gf_strdup (ibv_get_device_name (*dev_list)); - } else { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_CRITICAL, - "IB device list is empty. Check for " - "'ib_uverbs' module"); - return -1; - goto cleanup; - } - } + switch (header->rm_type) { + case GF_RDMA_MSG: + ptr = (uint32_t *)post->ctx.vector[0].iov_base; + msg_type = ntoh32 (*(ptr + 1)); + break; - while (*dev_list) { - if (!strcmp (ibv_get_device_name (*dev_list), - options->device_name)) { - ib_ctx = ibv_open_device (*dev_list); - - if (!ib_ctx) { - gf_log (GF_RDMA_LOG_NAME, - GF_LOG_ERROR, - "Failed to get infiniband" - "device context"); - ret = -1; - goto cleanup; - } - break; - } - ++dev_list; + case GF_RDMA_NOMSG: + if (readch != NULL) { + msg_type = CALL; + } else { + msg_type = REPLY; } + break; - priv->device = gf_rdma_get_device (this, ib_ctx); - - if (!priv->device) { + case GF_RDMA_ERROR: + if (header->rm_body.rm_error.rm_type == ERR_CHUNK) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "peer (%s), couldn't encode or decode the msg " + "properly or write chunks were not provided " + "for replies that were bigger than " + "RDMA_INLINE_THRESHOLD (%d)", + peer->trans->peerinfo.identifier, + GLUSTERFS_RDMA_INLINE_THRESHOLD); + ret = gf_rdma_pollin_notify (peer, post); + if (ret == -1) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_DEBUG, + "pollin notification failed"); + } + goto out; + } else { gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "could not create rdma device for %s", - options->device_name); + "an error has happened while transmission of " + "msg, disconnecting the transport"); ret = -1; - goto cleanup; + goto out; } - } - - priv->peer.trans = this; - INIT_LIST_HEAD (&priv->peer.ioq); - pthread_mutex_init (&priv->read_mutex, NULL); - pthread_mutex_init (&priv->write_mutex, NULL); - pthread_mutex_init (&priv->recv_mutex, NULL); - pthread_cond_init (&priv->recv_cond, NULL); + default: + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "invalid rdma msg-type (%d)", header->rm_type); + goto out; + } -cleanup: - if (-1 == ret) { - if (ib_ctx) - ibv_close_device (ib_ctx); + if (msg_type == CALL) { + ret = gf_rdma_recv_request (peer, post, readch); + if (ret < 0) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "receiving a request from peer (%s) failed", + peer->trans->peerinfo.identifier); + } + } else { + ret = gf_rdma_recv_reply (peer, post); + if (ret < 0) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "receiving a reply from peer (%s) failed", + peer->trans->peerinfo.identifier); + } } - if (dev_list) - ibv_free_device_list (dev_list); +out: + if (ret == -1) { + rpc_transport_disconnect (peer->trans); + } - return ret; + return; } -static int32_t -gf_rdma_disconnect (rpc_transport_t *this) +static void * +gf_rdma_recv_completion_proc (void *data) { - gf_rdma_private_t *priv = NULL; - int32_t ret = 0; + struct ibv_comp_channel *chan = NULL; + gf_rdma_device_t *device = NULL;; + gf_rdma_post_t *post = NULL; + gf_rdma_peer_t *peer = NULL; + struct ibv_cq *event_cq = NULL; + struct ibv_wc wc = {0, }; + void *event_ctx = NULL; + int32_t ret = 0; - priv = this->private; - gf_log_callingfn (this->name, GF_LOG_WARNING, - "disconnect called (peer:%s)", - this->peerinfo.identifier); + chan = data; - pthread_mutex_lock (&priv->write_mutex); - { - ret = __gf_rdma_disconnect (this); - } - pthread_mutex_unlock (&priv->write_mutex); + while (1) { + ret = ibv_get_cq_event (chan, &event_cq, &event_ctx); + if (ret) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, + "ibv_get_cq_event failed, terminating recv " + "thread %d (%d)", ret, errno); + continue; + } - return ret; -} + device = event_ctx; + + ret = ibv_req_notify_cq (event_cq, 0); + if (ret) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, + "ibv_req_notify_cq on %s failed, terminating " + "recv thread: %d (%d)", + device->device_name, ret, errno); + continue; + } + device = (gf_rdma_device_t *) event_ctx; -static int32_t -__tcp_connect_finish (int fd) -{ - int ret = -1; - int optval = 0; - socklen_t optlen = sizeof (int); + while ((ret = ibv_poll_cq (event_cq, 1, &wc)) > 0) { + post = (gf_rdma_post_t *) (long) wc.wr_id; + + pthread_mutex_lock (&device->qpreg.lock); + { + peer = __gf_rdma_lookup_peer (device, + wc.qp_num); + + /* + * keep a refcount on transport so that it + * does not get freed because of some error + * indicated by wc.status till we are done + * with usage of peer and thereby that of trans. + */ + if (peer != NULL) { + rpc_transport_ref (peer->trans); + } + } + pthread_mutex_unlock (&device->qpreg.lock); + + if (wc.status != IBV_WC_SUCCESS) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, + "recv work request on `%s' returned " + "error (%d)", device->device_name, + wc.status); + if (peer) { + rpc_transport_unref (peer->trans); + rpc_transport_disconnect (peer->trans); + } + + if (post) { + gf_rdma_post_unref (post); + } + continue; + } + + if (peer) { + gf_rdma_process_recv (peer, &wc); + rpc_transport_unref (peer->trans); + } else { + gf_log (GF_RDMA_LOG_NAME, + GF_LOG_DEBUG, + "could not lookup peer for qp_num: %d", + wc.qp_num); + } - ret = getsockopt (fd, SOL_SOCKET, SO_ERROR, - (void *)&optval, &optlen); + gf_rdma_post_unref (post); + } - if (ret == 0 && optval) - { - errno = optval; - ret = -1; + if (ret < 0) { + gf_log (GF_RDMA_LOG_NAME, + GF_LOG_ERROR, + "ibv_poll_cq on `%s' returned error " + "(ret = %d, errno = %d)", + device->device_name, ret, errno); + continue; + } + ibv_ack_cq_events (event_cq, 1); } - return ret; -} - -static inline void -gf_rdma_fill_handshake_data (char *buf, struct gf_rdma_nbio *nbio, - gf_rdma_private_t *priv) -{ - sprintf (buf, - "QP1:RECV_BLKSIZE=%08x:SEND_BLKSIZE=%08x\n" - "QP1:LID=%04x:QPN=%06x:PSN=%06x\n", - priv->peer.recv_size, - priv->peer.send_size, - priv->peer.local_lid, - priv->peer.local_qpn, - priv->peer.local_psn); - - nbio->vector.iov_base = buf; - nbio->vector.iov_len = strlen (buf) + 1; - nbio->count = 1; - return; + return NULL; } -static inline void -gf_rdma_fill_handshake_ack (char *buf, struct gf_rdma_nbio *nbio) -{ - sprintf (buf, "DONE\n"); - nbio->vector.iov_base = buf; - nbio->vector.iov_len = strlen (buf) + 1; - nbio->count = 1; - return; -} -static int -gf_rdma_handshake_pollin (rpc_transport_t *this) +void +gf_rdma_handle_failed_send_completion (gf_rdma_peer_t *peer, struct ibv_wc *wc) { - int ret = 0; - gf_rdma_private_t *priv = NULL; - char *buf = NULL; - int32_t recv_buf_size = 0, send_buf_size; - socklen_t sock_len = 0; - - priv = this->private; - buf = priv->handshake.incoming.buf; + gf_rdma_post_t *post = NULL; + gf_rdma_device_t *device = NULL; + gf_rdma_private_t *priv = NULL; - if (priv->handshake.incoming.state == GF_RDMA_HANDSHAKE_COMPLETE) { - return -1; + if (peer != NULL) { + priv = peer->trans->private; + if (priv != NULL) { + device = priv->device; + } } - pthread_mutex_lock (&priv->write_mutex); - { - while (priv->handshake.incoming.state != GF_RDMA_HANDSHAKE_COMPLETE) - { - switch (priv->handshake.incoming.state) - { - case GF_RDMA_HANDSHAKE_START: - buf = priv->handshake.incoming.buf = GF_CALLOC (1, 256, gf_common_mt_char); - gf_rdma_fill_handshake_data (buf, &priv->handshake.incoming, priv); - buf[0] = 0; - priv->handshake.incoming.state = GF_RDMA_HANDSHAKE_RECEIVING_DATA; - break; - case GF_RDMA_HANDSHAKE_RECEIVING_DATA: - ret = __tcp_readv (this, - &priv->handshake.incoming.vector, - priv->handshake.incoming.count, - &priv->handshake.incoming.pending_vector, - &priv->handshake.incoming.pending_count); - if (ret == -1) { - goto unlock; - } + post = (gf_rdma_post_t *) (long) wc->wr_id; - if (ret > 0) { - gf_log (this->name, GF_LOG_TRACE, - "partial header read on NB socket. continue later"); - goto unlock; - } + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "send work request on `%s' returned error " + "wc.status = %d, wc.vendor_err = %d, post->buf = %p, " + "wc.byte_len = %d, post->reused = %d", + (device != NULL) ? device->device_name : NULL, wc->status, + wc->vendor_err, post->buf, wc->byte_len, post->reused); - if (!ret) { - priv->handshake.incoming.state = GF_RDMA_HANDSHAKE_RECEIVED_DATA; - } - break; + if (wc->status == IBV_WC_RETRY_EXC_ERR) { + gf_log ("rdma", GF_LOG_ERROR, "connection between client and" + " server not working. check by running " + "'ibv_srq_pingpong'. also make sure subnet manager" + " is running (eg: 'opensm'), or check if rdma port is " + "valid (or active) by running 'ibv_devinfo'. contact " + "Gluster Support Team if the problem persists."); + } - case GF_RDMA_HANDSHAKE_RECEIVED_DATA: - ret = sscanf (buf, - "QP1:RECV_BLKSIZE=%08x:SEND_BLKSIZE=%08x\n" - "QP1:LID=%04x:QPN=%06x:PSN=%06x\n", - &recv_buf_size, - &send_buf_size, - &priv->peer.remote_lid, - &priv->peer.remote_qpn, - &priv->peer.remote_psn); - - if ((ret != 5) && (strncmp (buf, "QP1:", 4))) { - gf_log (GF_RDMA_LOG_NAME, - GF_LOG_CRITICAL, - "%s: remote-host(%s)'s " - "transport type is different", - this->name, - this->peerinfo.identifier); - ret = -1; - goto unlock; - } + if (peer) { + rpc_transport_disconnect (peer->trans); + } - if (recv_buf_size < priv->peer.recv_size) - priv->peer.recv_size = recv_buf_size; - if (send_buf_size < priv->peer.send_size) - priv->peer.send_size = send_buf_size; - - gf_log (GF_RDMA_LOG_NAME, GF_LOG_TRACE, - "%s: transacted recv_size=%d " - "send_size=%d", - this->name, priv->peer.recv_size, - priv->peer.send_size); - - priv->peer.quota = priv->peer.send_count; - - if (gf_rdma_connect_qp (this)) { - gf_log (GF_RDMA_LOG_NAME, - GF_LOG_ERROR, - "%s: failed to connect with " - "remote QP", this->name); - ret = -1; - goto unlock; - } - gf_rdma_fill_handshake_ack (buf, &priv->handshake.incoming); - buf[0] = 0; - priv->handshake.incoming.state = GF_RDMA_HANDSHAKE_RECEIVING_ACK; - break; + return; +} - case GF_RDMA_HANDSHAKE_RECEIVING_ACK: - ret = __tcp_readv (this, - &priv->handshake.incoming.vector, - priv->handshake.incoming.count, - &priv->handshake.incoming.pending_vector, - &priv->handshake.incoming.pending_count); - if (ret == -1) { - goto unlock; - } - if (ret > 0) { - gf_log (this->name, GF_LOG_TRACE, - "partial header read on NB " - "socket. continue later"); - goto unlock; - } +void +gf_rdma_handle_successful_send_completion (gf_rdma_peer_t *peer, + struct ibv_wc *wc) +{ + gf_rdma_post_t *post = NULL; + int reads = 0, ret = 0; + gf_rdma_header_t *header = NULL; - if (!ret) { - priv->handshake.incoming.state = GF_RDMA_HANDSHAKE_RECEIVED_ACK; - } - break; + if (wc->opcode != IBV_WC_RDMA_READ) { + goto out; + } - case GF_RDMA_HANDSHAKE_RECEIVED_ACK: - if (strncmp (buf, "DONE", 4)) { - gf_log (GF_RDMA_LOG_NAME, - GF_LOG_DEBUG, - "%s: handshake-3 did not " - "return 'DONE' (%s)", - this->name, buf); - ret = -1; - goto unlock; - } - ret = 0; - priv->connected = 1; - sock_len = sizeof (struct sockaddr_storage); - getpeername (priv->sock, - (struct sockaddr *) &this->peerinfo.sockaddr, - &sock_len); - - GF_FREE (priv->handshake.incoming.buf); - priv->handshake.incoming.buf = NULL; - priv->handshake.incoming.state = GF_RDMA_HANDSHAKE_COMPLETE; - } - } + post = (gf_rdma_post_t *)(long) wc->wr_id; + + pthread_mutex_lock (&post->lock); + { + reads = --post->ctx.gf_rdma_reads; } -unlock: - pthread_mutex_unlock (&priv->write_mutex); + pthread_mutex_unlock (&post->lock); - if (ret == -1) { - rpc_transport_disconnect (this); - } else { - ret = 0; + if (reads != 0) { + /* if it is not the last rdma read, we've got nothing to do */ + goto out; } + header = (gf_rdma_header_t *)post->buf; - if (!ret && priv->connected) { - if (priv->is_server) { - ret = rpc_transport_notify (priv->listener, - RPC_TRANSPORT_ACCEPT, - this); - } else { - ret = rpc_transport_notify (this, RPC_TRANSPORT_CONNECT, - this); - } + if (header->rm_type == GF_RDMA_NOMSG) { + post->ctx.count = 1; + post->ctx.vector[0].iov_len += post->ctx.vector[1].iov_len; } - return ret; + ret = gf_rdma_pollin_notify (peer, post); + if ((ret == -1) && (peer != NULL)) { + rpc_transport_disconnect (peer->trans); + } + +out: + return; } -static int -gf_rdma_handshake_pollout (rpc_transport_t *this) + +static void * +gf_rdma_send_completion_proc (void *data) { - gf_rdma_private_t *priv = NULL; - char *buf = NULL; - int32_t ret = 0; + struct ibv_comp_channel *chan = NULL; + gf_rdma_post_t *post = NULL; + gf_rdma_peer_t *peer = NULL; + struct ibv_cq *event_cq = NULL; + void *event_ctx = NULL; + gf_rdma_device_t *device = NULL; + struct ibv_wc wc = {0, }; + char is_request = 0; + int32_t ret = 0, quota_ret = 0; - priv = this->private; - buf = priv->handshake.outgoing.buf; + chan = data; + while (1) { + ret = ibv_get_cq_event (chan, &event_cq, &event_ctx); + if (ret) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, + "ibv_get_cq_event on failed, terminating " + "send thread: %d (%d)", ret, errno); + continue; + } - if (priv->handshake.outgoing.state == GF_RDMA_HANDSHAKE_COMPLETE) { - return 0; - } + device = event_ctx; - pthread_mutex_unlock (&priv->write_mutex); - { - while (priv->handshake.outgoing.state - != GF_RDMA_HANDSHAKE_COMPLETE) - { - switch (priv->handshake.outgoing.state) - { - case GF_RDMA_HANDSHAKE_START: - buf = priv->handshake.outgoing.buf - = GF_CALLOC (1, 256, gf_common_mt_char); - gf_rdma_fill_handshake_data (buf, - &priv->handshake.outgoing, priv); - priv->handshake.outgoing.state - = GF_RDMA_HANDSHAKE_SENDING_DATA; - break; + ret = ibv_req_notify_cq (event_cq, 0); + if (ret) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, + "ibv_req_notify_cq on %s failed, terminating " + "send thread: %d (%d)", + device->device_name, ret, errno); + continue; + } - case GF_RDMA_HANDSHAKE_SENDING_DATA: - ret = __tcp_writev (this, - &priv->handshake.outgoing.vector, - priv->handshake.outgoing.count, - &priv->handshake.outgoing.pending_vector, - &priv->handshake.outgoing.pending_count); - if (ret == -1) { - goto unlock; - } + while ((ret = ibv_poll_cq (event_cq, 1, &wc)) > 0) { + post = (gf_rdma_post_t *) (long) wc.wr_id; - if (ret > 0) { - gf_log (this->name, GF_LOG_TRACE, - "partial header read on NB " - "socket. continue later"); - goto unlock; - } + pthread_mutex_lock (&device->qpreg.lock); + { + peer = __gf_rdma_lookup_peer (device, wc.qp_num); - if (!ret) { - priv->handshake.outgoing.state - = GF_RDMA_HANDSHAKE_SENT_DATA; + /* + * keep a refcount on transport so that it + * does not get freed because of some error + * indicated by wc.status, till we are done + * with usage of peer and thereby that of trans. + */ + if (peer != NULL) { + rpc_transport_ref (peer->trans); } - break; + } + pthread_mutex_unlock (&device->qpreg.lock); - case GF_RDMA_HANDSHAKE_SENT_DATA: - gf_rdma_fill_handshake_ack (buf, - &priv->handshake.outgoing); - priv->handshake.outgoing.state - = GF_RDMA_HANDSHAKE_SENDING_ACK; - break; + if (wc.status != IBV_WC_SUCCESS) { + gf_rdma_handle_failed_send_completion (peer, &wc); + } else { + gf_rdma_handle_successful_send_completion (peer, + &wc); + } - case GF_RDMA_HANDSHAKE_SENDING_ACK: - ret = __tcp_writev (this, - &priv->handshake.outgoing.vector, - priv->handshake.outgoing.count, - &priv->handshake.outgoing.pending_vector, - &priv->handshake.outgoing.pending_count); + if (post) { + is_request = post->ctx.is_request; - if (ret == -1) { - goto unlock; - } + ret = gf_rdma_post_unref (post); + if ((ret == 0) + && (wc.status == IBV_WC_SUCCESS) + && !is_request + && (post->type == GF_RDMA_SEND_POST) + && (peer != NULL)) { + /* An GF_RDMA_RECV_POST can end up in + * gf_rdma_send_completion_proc for + * rdma-reads, and we do not take + * quota for getting an GF_RDMA_RECV_POST. + */ - if (ret > 0) { - gf_log (this->name, GF_LOG_TRACE, - "partial header read on NB " - "socket. continue later"); - goto unlock; + /* + * if it is request, quota is returned + * after reply has come. + */ + quota_ret = gf_rdma_quota_put (peer); + if (quota_ret < 0) { + gf_log ("rdma", GF_LOG_DEBUG, + "failed to send " + "message"); + } } + } - if (!ret) { - GF_FREE (priv->handshake.outgoing.buf); - priv->handshake.outgoing.buf = NULL; - priv->handshake.outgoing.state - = GF_RDMA_HANDSHAKE_COMPLETE; - } - break; + if (peer) { + rpc_transport_unref (peer->trans); + } else { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_DEBUG, + "could not lookup peer for qp_num: %d", + wc.qp_num); } } - } -unlock: - pthread_mutex_unlock (&priv->write_mutex); - if (ret == -1) { - rpc_transport_disconnect (this); - } else { - ret = 0; + if (ret < 0) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, + "ibv_poll_cq on `%s' returned error (ret = %d," + " errno = %d)", + device->device_name, ret, errno); + continue; + } + + ibv_ack_cq_events (event_cq, 1); } - return ret; + return NULL; } -static int -gf_rdma_handshake_pollerr (rpc_transport_t *this) + +static void +gf_rdma_options_init (rpc_transport_t *this) { - gf_rdma_private_t *priv = this->private; - char need_unref = 0, connected = 0; + gf_rdma_private_t *priv = NULL; + gf_rdma_options_t *options = NULL; + int32_t mtu = 0; + data_t *temp = NULL; + + /* TODO: validate arguments from options below */ + + priv = this->private; + options = &priv->options; + options->send_size = GLUSTERFS_RDMA_INLINE_THRESHOLD;/*this->ctx->page_size * 4; 512 KB*/ + options->recv_size = GLUSTERFS_RDMA_INLINE_THRESHOLD;/*this->ctx->page_size * 4; 512 KB*/ + options->send_count = 4096; + options->recv_count = 4096; + options->attr_timeout = GF_RDMA_TIMEOUT; + options->attr_retry_cnt = GF_RDMA_RETRY_CNT; + options->attr_rnr_retry = GF_RDMA_RNR_RETRY; + + temp = dict_get (this->options, + "transport.rdma.work-request-send-count"); + if (temp) + options->send_count = data_to_int32 (temp); - gf_log_callingfn (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "%s: peer (%s) disconnected, cleaning up", - this->name, this->peerinfo.identifier); + temp = dict_get (this->options, + "transport.rdma.work-request-recv-count"); + if (temp) + options->recv_count = data_to_int32 (temp); - pthread_mutex_lock (&priv->write_mutex); - { - __gf_rdma_teardown (this); + temp = dict_get (this->options, "transport.rdma.attr-timeout"); - connected = priv->connected; - if (priv->sock != -1) { - event_unregister (this->ctx->event_pool, - priv->sock, priv->idx); - need_unref = 1; + if (temp) + options->attr_timeout = data_to_uint8 (temp); - if (close (priv->sock) != 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "close () - error: %s", - strerror (errno)); - } - priv->tcp_connected = priv->connected = 0; - priv->sock = -1; - } + temp = dict_get (this->options, "transport.rdma.attr-retry-cnt"); - if (priv->handshake.incoming.buf) { - GF_FREE (priv->handshake.incoming.buf); - priv->handshake.incoming.buf = NULL; - } + if (temp) + options->attr_retry_cnt = data_to_uint8 (temp); - priv->handshake.incoming.state = GF_RDMA_HANDSHAKE_START; + temp = dict_get (this->options, "transport.rdma.attr-rnr-retry"); - if (priv->handshake.outgoing.buf) { - GF_FREE (priv->handshake.outgoing.buf); - priv->handshake.outgoing.buf = NULL; - } + if (temp) + options->attr_rnr_retry = data_to_uint8 (temp); - priv->handshake.outgoing.state = GF_RDMA_HANDSHAKE_START; - } - pthread_mutex_unlock (&priv->write_mutex); + options->port = 1; + temp = dict_get (this->options, + "transport.rdma.port"); + if (temp) + options->port = data_to_uint64 (temp); - if (connected) { - rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); + options->mtu = mtu = IBV_MTU_2048; + temp = dict_get (this->options, + "transport.rdma.mtu"); + if (temp) + mtu = data_to_int32 (temp); + switch (mtu) { + case 256: options->mtu = IBV_MTU_256; + break; + case 512: options->mtu = IBV_MTU_512; + break; + case 1024: options->mtu = IBV_MTU_1024; + break; + case 2048: options->mtu = IBV_MTU_2048; + break; + case 4096: options->mtu = IBV_MTU_4096; + break; + default: + if (temp) + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "%s: unrecognized MTU value '%s', defaulting " + "to '2048'", this->name, + data_to_str (temp)); + else + gf_log (GF_RDMA_LOG_NAME, GF_LOG_TRACE, + "%s: defaulting MTU to '2048'", + this->name); + options->mtu = IBV_MTU_2048; + break; } - if (need_unref) - rpc_transport_unref (this); + temp = dict_get (this->options, + "transport.rdma.device-name"); + if (temp) + options->device_name = gf_strdup (temp->data); - return 0; + return; } -static int -tcp_connect_finish (rpc_transport_t *this) +gf_rdma_ctx_t * +__gf_rdma_ctx_create (void) { - gf_rdma_private_t *priv = NULL; - int error = 0, ret = 0; + gf_rdma_ctx_t *rdma_ctx = NULL; + int ret = -1; - priv = this->private; - pthread_mutex_lock (&priv->write_mutex); - { - ret = __tcp_connect_finish (priv->sock); + rdma_ctx = GF_CALLOC (1, sizeof (*rdma_ctx), gf_common_mt_char); + if (rdma_ctx == NULL) { + goto out; + } - if (!ret) { - this->myinfo.sockaddr_len = - sizeof (this->myinfo.sockaddr); - ret = getsockname (priv->sock, - (struct sockaddr *)&this->myinfo.sockaddr, - &this->myinfo.sockaddr_len); - if (ret == -1) - { - gf_log (this->name, GF_LOG_ERROR, - "getsockname on new client-socket %d " - "failed (%s)", - priv->sock, strerror (errno)); - close (priv->sock); - error = 1; - goto unlock; - } + rdma_ctx->rdma_cm_event_channel = rdma_create_event_channel (); + if (rdma_ctx->rdma_cm_event_channel == NULL) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "rdma_cm event channel creation failed (%s)", + strerror (errno)); + goto out; + } - gf_rdma_get_transport_identifiers (this); - priv->tcp_connected = 1; - } + ret = pthread_create (&rdma_ctx->rdma_cm_thread, NULL, + gf_rdma_cm_event_handler, + rdma_ctx->rdma_cm_event_channel); + if (ret != 0) { + gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, + "creation of thread to handle rdma-cm events " + "failed (%s)", strerror (ret)); + goto out; + } - if (ret == -1 && errno != EINPROGRESS) { - gf_log (this->name, GF_LOG_ERROR, - "tcp connect to %s failed (%s)", - this->peerinfo.identifier, strerror (errno)); - error = 1; +out: + if (ret < 0) { + if (rdma_ctx->rdma_cm_event_channel != NULL) { + rdma_destroy_event_channel (rdma_ctx->rdma_cm_event_channel); } - } -unlock: - pthread_mutex_unlock (&priv->write_mutex); - if (error) { - rpc_transport_disconnect (this); + GF_FREE (rdma_ctx); + rdma_ctx = NULL; } - return ret; + return rdma_ctx; } -static int -gf_rdma_event_handler (int fd, int idx, void *data, - int poll_in, int poll_out, int poll_err) +static int32_t +gf_rdma_init (rpc_transport_t *this) { - rpc_transport_t *this = NULL; - gf_rdma_private_t *priv = NULL; - gf_rdma_options_t *options = NULL; - int ret = 0; + gf_rdma_private_t *priv = NULL; + int32_t ret = 0; + glusterfs_ctx_t *ctx = NULL; + gf_rdma_options_t *options = NULL; + + ctx= this->ctx; - this = data; priv = this->private; - if (!priv->tcp_connected) { - ret = tcp_connect_finish (this); - if (priv->tcp_connected) { - options = &priv->options; - priv->peer.send_count = options->send_count; - priv->peer.recv_count = options->recv_count; - priv->peer.send_size = options->send_size; - priv->peer.recv_size = options->recv_size; + ibv_fork_init (); + gf_rdma_options_init (this); - if ((ret = gf_rdma_create_qp (this)) < 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "%s: could not create QP", - this->name); - rpc_transport_disconnect (this); - } - } - } + options = &priv->options; + priv->peer.send_count = options->send_count; + priv->peer.recv_count = options->recv_count; + priv->peer.send_size = options->send_size; + priv->peer.recv_size = options->recv_size; - if (!ret && poll_out && priv->tcp_connected) { - ret = gf_rdma_handshake_pollout (this); - } + priv->peer.trans = this; + INIT_LIST_HEAD (&priv->peer.ioq); - if (!ret && !poll_err && poll_in && priv->tcp_connected) { - if (priv->handshake.incoming.state - == GF_RDMA_HANDSHAKE_COMPLETE) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "%s: pollin received on tcp socket (peer: %s) " - "after handshake is complete", - this->name, this->peerinfo.identifier); - gf_rdma_handshake_pollerr (this); - return 0; - } + pthread_mutex_init (&priv->write_mutex, NULL); + pthread_mutex_init (&priv->recv_mutex, NULL); + pthread_cond_init (&priv->recv_cond, NULL); - ret = gf_rdma_handshake_pollin (this); - if (ret < 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, - "handshake pollin failed"); + pthread_mutex_lock (&ctx->lock); + { + if (ctx->ib == NULL) { + ctx->ib = __gf_rdma_ctx_create (); + if (ctx->ib == NULL) { + ret = -1; + } } } + pthread_mutex_unlock (&ctx->lock); - if (ret < 0 || poll_err) { - ret = gf_rdma_handshake_pollerr (this); - } - - return 0; + return ret; } -static int -__tcp_nonblock (int fd) + +static int32_t +gf_rdma_disconnect (rpc_transport_t *this) { - int flags = 0; - int ret = -1; + gf_rdma_private_t *priv = NULL; + int32_t ret = 0; - flags = fcntl (fd, F_GETFL); + priv = this->private; + gf_log_callingfn (this->name, GF_LOG_WARNING, + "disconnect called (peer:%s)", + this->peerinfo.identifier); - if (flags != -1) - ret = fcntl (fd, F_SETFL, flags | O_NONBLOCK); + pthread_mutex_lock (&priv->write_mutex); + { + ret = __gf_rdma_disconnect (this); + } + pthread_mutex_unlock (&priv->write_mutex); return ret; } + static int32_t gf_rdma_connect (struct rpc_transport *this, int port) { gf_rdma_private_t *priv = NULL; int32_t ret = 0; - gf_boolean_t non_blocking = 1; union gf_sock_union sock_union = {{0, }, }; socklen_t sockaddr_len = 0; + gf_rdma_peer_t *peer = NULL; + gf_rdma_ctx_t *rdma_ctx = NULL; + gf_boolean_t connected = _gf_false; priv = this->private; + peer = &priv->peer; + + rpc_transport_ref (this); + ret = gf_rdma_client_get_remote_sockaddr (this, &sock_union.sa, &sockaddr_len, port); if (ret != 0) { gf_log (this->name, GF_LOG_DEBUG, "cannot get remote address to connect"); - return ret; + goto out; } + rdma_ctx = this->ctx->ib; pthread_mutex_lock (&priv->write_mutex); { - if (priv->sock != -1) { - ret = 0; + if (peer->cm_id != NULL) { + ret = -1; + errno = EINPROGRESS; + connected = _gf_true; goto unlock; } - priv->sock = socket (sock_union.sa.sa_family, SOCK_STREAM, 0); + priv->entity = GF_RDMA_CLIENT; - if (priv->sock == -1) { + ret = rdma_create_id (rdma_ctx->rdma_cm_event_channel, + &peer->cm_id, this, RDMA_PS_TCP); + if (ret != 0) { gf_log (this->name, GF_LOG_ERROR, - "socket () - error: %s", strerror (errno)); + "creation of rdma_cm_id failed (%s)", + strerror (errno)); ret = -errno; goto unlock; } - gf_log (this->name, GF_LOG_TRACE, - "socket fd = %d", priv->sock); - memcpy (&this->peerinfo.sockaddr, &sock_union.storage, sockaddr_len); this->peerinfo.sockaddr_len = sockaddr_len; @@ -4735,201 +4294,84 @@ gf_rdma_connect (struct rpc_transport *this, int port) ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = ((struct sockaddr *)&this->peerinfo.sockaddr)->sa_family; - if (non_blocking) - { - ret = __tcp_nonblock (priv->sock); - - if (ret == -1) - { - gf_log (this->name, GF_LOG_ERROR, - "could not set socket %d to non " - "blocking mode (%s)", - priv->sock, strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - } - ret = gf_rdma_client_bind (this, (struct sockaddr *)&this->myinfo.sockaddr, &this->myinfo.sockaddr_len, - priv->sock); - if (ret == -1) - { + peer->cm_id); + if (ret != 0) { gf_log (this->name, GF_LOG_WARNING, "client bind failed: %s", strerror (errno)); - close (priv->sock); - priv->sock = -1; goto unlock; } - ret = connect (priv->sock, - (struct sockaddr *)&this->peerinfo.sockaddr, - this->peerinfo.sockaddr_len); - if (ret == -1 && errno != EINPROGRESS) - { - gf_log (this->name, GF_LOG_ERROR, - "connection attempt failed (%s)", + ret = rdma_resolve_addr (peer->cm_id, NULL, &sock_union.sa, + 2000); + if (ret != 0) { + gf_log (this->name, GF_LOG_WARNING, + "rdma_resolve_addr failed (%s)", strerror (errno)); - close (priv->sock); - priv->sock = -1; goto unlock; } - priv->tcp_connected = priv->connected = 0; - - rpc_transport_ref (this); - - priv->handshake.incoming.state = GF_RDMA_HANDSHAKE_START; - priv->handshake.outgoing.state = GF_RDMA_HANDSHAKE_START; - - priv->idx = event_register (this->ctx->event_pool, - priv->sock, gf_rdma_event_handler, - this, 1, 1); + priv->connected = 0; } unlock: pthread_mutex_unlock (&priv->write_mutex); - return ret; -} - -static int -gf_rdma_server_event_handler (int fd, int idx, void *data, - int poll_in, int poll_out, int poll_err) -{ - int32_t main_sock = -1; - rpc_transport_t *this = NULL, *trans = NULL; - gf_rdma_private_t *priv = NULL; - gf_rdma_private_t *trans_priv = NULL; - gf_rdma_options_t *options = NULL; - - if (!poll_in) { - return 0; - } - - trans = data; - trans_priv = (gf_rdma_private_t *) trans->private; - - this = GF_CALLOC (1, sizeof (rpc_transport_t), - gf_common_mt_rpc_transport_t); - if (this == NULL) { - return -1; - } - - this->listener = trans; - - priv = GF_CALLOC (1, sizeof (gf_rdma_private_t), - gf_common_mt_rdma_private_t); - if (priv == NULL) { - GF_FREE (priv); - return -1; - } - this->private = priv; - /* Copy all the rdma related values in priv, from trans_priv - as other than QP, all the values remain same */ - priv->device = trans_priv->device; - priv->options = trans_priv->options; - priv->is_server = 1; - priv->listener = trans; - - options = &priv->options; - - this->ops = trans->ops; - this->init = trans->init; - this->fini = trans->fini; - this->ctx = trans->ctx; - this->name = gf_strdup (trans->name); - this->notify = trans->notify; - this->mydata = trans->mydata; - - memcpy (&this->myinfo.sockaddr, &trans->myinfo.sockaddr, - trans->myinfo.sockaddr_len); - this->myinfo.sockaddr_len = trans->myinfo.sockaddr_len; - - main_sock = (trans_priv)->sock; - this->peerinfo.sockaddr_len = sizeof (this->peerinfo.sockaddr); - priv->sock = accept (main_sock, - (struct sockaddr *)&this->peerinfo.sockaddr, - &this->peerinfo.sockaddr_len); - if (priv->sock == -1) { - gf_log ("rdma/server", GF_LOG_ERROR, - "accept() failed: %s", - strerror (errno)); - GF_FREE (this->private); - GF_FREE (this); - return -1; - } - - priv->peer.trans = this; - rpc_transport_ref (this); - - gf_rdma_get_transport_identifiers (this); - - priv->tcp_connected = 1; - priv->handshake.incoming.state = GF_RDMA_HANDSHAKE_START; - priv->handshake.outgoing.state = GF_RDMA_HANDSHAKE_START; - - priv->peer.send_count = options->send_count; - priv->peer.recv_count = options->recv_count; - priv->peer.send_size = options->send_size; - priv->peer.recv_size = options->recv_size; - INIT_LIST_HEAD (&priv->peer.ioq); +out: + if (ret != 0) { + if (!connected) { + gf_rdma_teardown (this); + } - if (gf_rdma_create_qp (this) < 0) { - gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, - "%s: could not create QP", - this->name); - rpc_transport_disconnect (this); - return -1; + rpc_transport_unref (this); } - priv->idx = event_register (this->ctx->event_pool, priv->sock, - gf_rdma_event_handler, this, 1, 1); - - pthread_mutex_init (&priv->read_mutex, NULL); - pthread_mutex_init (&priv->write_mutex, NULL); - pthread_mutex_init (&priv->recv_mutex, NULL); - /* pthread_cond_init (&priv->recv_cond, NULL); */ - return 0; + return ret; } + static int32_t gf_rdma_listen (rpc_transport_t *this) { union gf_sock_union sock_union = {{0, }, }; socklen_t sockaddr_len = 0; gf_rdma_private_t *priv = NULL; - int opt = 1, ret = 0; + gf_rdma_peer_t *peer = NULL; + int ret = 0; + gf_rdma_ctx_t *rdma_ctx = NULL; char service[NI_MAXSERV], host[NI_MAXHOST]; priv = this->private; - memset (&sock_union, 0, sizeof (sock_union)); - ret = gf_rdma_server_get_local_sockaddr (this, - &sock_union.sa, + peer = &priv->peer; + + priv->entity = GF_RDMA_SERVER_LISTENER; + + rdma_ctx = this->ctx->ib; + + ret = gf_rdma_server_get_local_sockaddr (this, &sock_union.sa, &sockaddr_len); if (ret != 0) { - gf_log (this->name, GF_LOG_DEBUG, + gf_log (this->name, GF_LOG_WARNING, "cannot find network address of server to bind to"); goto err; } - priv->sock = socket (sock_union.sa.sa_family, SOCK_STREAM, 0); - if (priv->sock == -1) { - gf_log ("rdma/server", GF_LOG_CRITICAL, - "init: failed to create socket, error: %s", + ret = rdma_create_id (rdma_ctx->rdma_cm_event_channel, + &peer->cm_id, this, RDMA_PS_TCP); + if (ret != 0) { + gf_log (this->name, GF_LOG_WARNING, + "creation of rdma_cm_id failed (%s)", strerror (errno)); - GF_FREE (this->private); - ret = -1; goto err; } - memcpy (&this->myinfo.sockaddr, &sock_union.storage, sockaddr_len); + memcpy (&this->myinfo.sockaddr, &sock_union.storage, + sockaddr_len); this->myinfo.sockaddr_len = sockaddr_len; ret = getnameinfo ((struct sockaddr *)&this->myinfo.sockaddr, - this->myinfo.sockaddr_len, - host, sizeof (host), + this->myinfo.sockaddr_len, host, sizeof (host), service, sizeof (service), NI_NUMERICHOST); if (ret != 0) { @@ -4937,34 +4379,38 @@ gf_rdma_listen (rpc_transport_t *this) "getnameinfo failed (%s)", gai_strerror (ret)); goto err; } + sprintf (this->myinfo.identifier, "%s:%s", host, service); - setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt)); - if (bind (priv->sock, &sock_union.sa, sockaddr_len) != 0) { - ret = -1; - gf_log ("rdma/server", GF_LOG_ERROR, - "init: failed to bind to socket for %s (%s)", - this->myinfo.identifier, strerror (errno)); + ret = rdma_bind_addr (peer->cm_id, &sock_union.sa); + if (ret != 0) { + gf_log (this->name, GF_LOG_WARNING, + "rdma_bind_addr failed (%s)", strerror (errno)); goto err; } - if (listen (priv->sock, 10) != 0) { - gf_log ("rdma/server", GF_LOG_ERROR, - "init: listen () failed on socket for %s (%s)", - this->myinfo.identifier, strerror (errno)); - ret = -1; + ret = rdma_listen (peer->cm_id, 10); + if (ret != 0) { + gf_log (this->name, GF_LOG_WARNING, + "rdma_listen failed (%s)", strerror (errno)); goto err; } - /* Register the main socket */ - priv->idx = event_register (this->ctx->event_pool, priv->sock, - gf_rdma_server_event_handler, - rpc_transport_ref (this), 1, 0); + rpc_transport_ref (this); + ret = 0; err: + if (ret < 0) { + if (peer->cm_id != NULL) { + rdma_destroy_id (peer->cm_id); + peer->cm_id = NULL; + } + } + return ret; } + struct rpc_transport_ops tops = { .submit_request = gf_rdma_submit_request, .submit_reply = gf_rdma_submit_reply, @@ -4983,7 +4429,6 @@ init (rpc_transport_t *this) return -1; this->private = priv; - priv->sock = -1; if (gf_rdma_init (this)) { gf_log (this->name, GF_LOG_ERROR, @@ -5007,13 +4452,6 @@ fini (struct rpc_transport *this) if (priv) { pthread_mutex_destroy (&priv->recv_mutex); pthread_mutex_destroy (&priv->write_mutex); - pthread_mutex_destroy (&priv->read_mutex); - - /* pthread_cond_destroy (&priv->recv_cond); */ - if (priv->sock != -1) { - event_unregister (this->ctx->event_pool, - priv->sock, priv->idx); - } gf_log (this->name, GF_LOG_TRACE, "called fini on transport: %p", this); diff --git a/rpc/rpc-transport/rdma/src/rdma.h b/rpc/rpc-transport/rdma/src/rdma.h index 687d6005f..6a2951b89 100644 --- a/rpc/rpc-transport/rdma/src/rdma.h +++ b/rpc/rpc-transport/rdma/src/rdma.h @@ -29,6 +29,7 @@ #include #include #include +#include /* FIXME: give appropriate values to these macros */ #define GF_DEFAULT_RDMA_LISTEN_PORT (GF_DEFAULT_BASE_PORT + 1) @@ -230,30 +231,33 @@ typedef enum __gf_rdma_send_post_type { /* represents one communication peer, two per transport_t */ struct __gf_rdma_peer { - rpc_transport_t *trans; - struct ibv_qp *qp; + rpc_transport_t *trans; + struct rdma_cm_id *cm_id; + struct ibv_qp *qp; + pthread_t rdma_event_thread; + char quota_set; int32_t recv_count; int32_t send_count; int32_t recv_size; int32_t send_size; - int32_t quota; + int32_t quota; union { - struct list_head ioq; + struct list_head ioq; struct { - gf_rdma_ioq_t *ioq_next; - gf_rdma_ioq_t *ioq_prev; + gf_rdma_ioq_t *ioq_next; + gf_rdma_ioq_t *ioq_prev; }; }; /* QP attributes, needed to connect with remote QP */ - int32_t local_lid; - int32_t local_psn; - int32_t local_qpn; - int32_t remote_lid; - int32_t remote_psn; - int32_t remote_qpn; + int32_t local_lid; + int32_t local_psn; + int32_t local_qpn; + int32_t remote_lid; + int32_t remote_psn; + int32_t remote_qpn; }; typedef struct __gf_rdma_peer gf_rdma_peer_t; @@ -327,26 +331,12 @@ struct __gf_rdma_device { }; typedef struct __gf_rdma_device gf_rdma_device_t; -typedef enum { - GF_RDMA_HANDSHAKE_START = 0, - GF_RDMA_HANDSHAKE_SENDING_DATA, - GF_RDMA_HANDSHAKE_RECEIVING_DATA, - GF_RDMA_HANDSHAKE_SENT_DATA, - GF_RDMA_HANDSHAKE_RECEIVED_DATA, - GF_RDMA_HANDSHAKE_SENDING_ACK, - GF_RDMA_HANDSHAKE_RECEIVING_ACK, - GF_RDMA_HANDSHAKE_RECEIVED_ACK, - GF_RDMA_HANDSHAKE_COMPLETE, -} gf_rdma_handshake_state_t; - -struct gf_rdma_nbio { - int state; - char *buf; - int count; - struct iovec vector; - struct iovec *pending_vector; - int pending_count; +struct __gf_rdma_ctx { + gf_rdma_device_t *device; + struct rdma_event_channel *rdma_cm_event_channel; + pthread_t rdma_cm_thread; }; +typedef struct __gf_rdma_ctx gf_rdma_ctx_t; struct __gf_rdma_request_context { struct ibv_mr *mr[GF_RDMA_MAX_SEGMENTS]; @@ -358,46 +348,35 @@ struct __gf_rdma_request_context { }; typedef struct __gf_rdma_request_context gf_rdma_request_context_t; +typedef enum { + GF_RDMA_SERVER_LISTENER, + GF_RDMA_SERVER, + GF_RDMA_CLIENT, +} gf_rdma_transport_entity_t; + struct __gf_rdma_private { - int32_t sock; - int32_t idx; - unsigned char connected; - unsigned char tcp_connected; - unsigned char ib_connected; - in_addr_t addr; + int32_t idx; + unsigned char connected; + in_addr_t addr; unsigned short port; /* IB Verbs Driver specific variables, pointers */ - gf_rdma_peer_t peer; + gf_rdma_peer_t peer; struct __gf_rdma_device *device; - gf_rdma_options_t options; + gf_rdma_options_t options; /* Used by trans->op->receive */ - char *data_ptr; - int32_t data_offset; - int32_t data_len; + char *data_ptr; + int32_t data_offset; + int32_t data_len; /* Mutex */ - pthread_mutex_t read_mutex; - pthread_mutex_t write_mutex; - pthread_barrier_t handshake_barrier; - char handshake_ret; - char is_server; - rpc_transport_t *listener; - - pthread_mutex_t recv_mutex; - pthread_cond_t recv_cond; - - /* used during gf_rdma_handshake */ - struct { - struct gf_rdma_nbio incoming; - struct gf_rdma_nbio outgoing; - int state; - gf_rdma_header_t header; - char *buf; - size_t size; - } handshake; + pthread_mutex_t write_mutex; + rpc_transport_t *listener; + pthread_mutex_t recv_mutex; + pthread_cond_t recv_cond; + gf_rdma_transport_entity_t entity; }; -typedef struct __gf_rdma_private gf_rdma_private_t; +typedef struct __gf_rdma_private gf_rdma_private_t; #endif /* _XPORT_GF_RDMA_H */ -- cgit