diff options
Diffstat (limited to 'rpc/rpc-transport/rdma/src/rdma.c')
| -rw-r--r-- | rpc/rpc-transport/rdma/src/rdma.c | 2738 | 
1 files changed, 1088 insertions, 1650 deletions
diff --git a/rpc/rpc-transport/rdma/src/rdma.c b/rpc/rpc-transport/rdma/src/rdma.c index a44e8995f11..135fbdf2868 100644 --- a/rpc/rpc-transport/rdma/src/rdma.c +++ b/rpc/rpc-transport/rdma/src/rdma.c @@ -8,7 +8,6 @@    cases as published by the Free Software Foundation.  */ -  #ifndef _CONFIG_H  #define _CONFIG_H  #include "config.h" @@ -35,99 +34,26 @@ gf_rdma_post_ref (gf_rdma_post_t *post);  int  gf_rdma_post_unref (gf_rdma_post_t *post); -int32_t -gf_resolve_ip6 (const char *hostname, -                uint16_t port, -                int family, -                void **dnscache, -                struct addrinfo **addr_info); - -static uint16_t -gf_rdma_get_local_lid (struct ibv_context *context, -                       int32_t port) -{ -        struct ibv_port_attr attr; - -        if (ibv_query_port (context, port, &attr)) -                return 0; - -        return attr.lid; -} +static void * +gf_rdma_send_completion_proc (void *data); -static const char * -get_port_state_str(enum ibv_port_state pstate) -{ -        switch (pstate) { -        case IBV_PORT_DOWN:          return "PORT_DOWN"; -        case IBV_PORT_INIT:          return "PORT_INIT"; -        case IBV_PORT_ARMED:         return "PORT_ARMED"; -        case IBV_PORT_ACTIVE:        return "PORT_ACTIVE"; -        case IBV_PORT_ACTIVE_DEFER:  return "PORT_ACTIVE_DEFER"; -        default:                     return "invalid state"; -        } -} +static void * +gf_rdma_recv_completion_proc (void *data);  static int32_t -ib_check_active_port (struct ibv_context *ctx, uint8_t port) -{ -        struct ibv_port_attr  port_attr = {0, }; -        int32_t               ret       = 0; -        const char           *state_str = NULL; - -        if (!ctx) { -                gf_log_callingfn (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                  "Error in supplied context"); -                return -1; -        } - -        ret = ibv_query_port (ctx, port, &port_attr); - -        if (ret) { -                gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                        "Failed to query port %u properties", port); -                return -1; -        } - -        state_str = get_port_state_str (port_attr.state); -        gf_log (GF_RDMA_LOG_NAME, GF_LOG_TRACE, -                "Infiniband PORT: (%u) STATE: (%s)", -                port, state_str); - -        if (port_attr.state == IBV_PORT_ACTIVE) -                return 0; - -        return -1; -} +gf_rdma_create_qp (rpc_transport_t *this);  static int32_t -ib_get_active_port (struct ibv_context *ib_ctx) -{ -        struct ibv_device_attr ib_device_attr = {{0, }, }; -        int32_t                ret            = -1; -        uint8_t                ib_port        = 0; +__gf_rdma_teardown (rpc_transport_t *this); -        if (!ib_ctx) { -                gf_log_callingfn (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                  "Error in supplied context"); -                return -1; -        } -        if (ibv_query_device (ib_ctx, &ib_device_attr)) { -                gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                        "Failed to query device properties"); -                return -1; -        } +static int32_t +gf_rdma_teardown (rpc_transport_t *this); -        for (ib_port = 1; ib_port <= ib_device_attr.phys_port_cnt; ++ib_port) { -                ret = ib_check_active_port (ib_ctx, ib_port); -                if (ret == 0) -                        return ib_port; +static int32_t +gf_rdma_disconnect (rpc_transport_t *this); -                gf_log (GF_RDMA_LOG_NAME, GF_LOG_TRACE, -                        "Port:(%u) not active", ib_port); -                continue; -        } -        return ret; -} +static void +gf_rdma_cm_handle_disconnect (rpc_transport_t *this);  static void @@ -157,7 +83,7 @@ gf_rdma_put_post (gf_rdma_queue_t *queue, gf_rdma_post_t *post)  static gf_rdma_post_t * -gf_rdma_new_post (gf_rdma_device_t *device, int32_t len, +gf_rdma_new_post (rpc_transport_t *this, gf_rdma_device_t *device, int32_t len,                    gf_rdma_post_type_t type)  {          gf_rdma_post_t *post = NULL; @@ -184,7 +110,7 @@ gf_rdma_new_post (gf_rdma_device_t *device, int32_t len,                                 post->buf_size,                                 IBV_ACCESS_LOCAL_WRITE);          if (!post->mr) { -                gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, +                gf_log (this->name, GF_LOG_WARNING,                          "memory registration failed (%s)",                          strerror (errno));                  goto out; @@ -259,22 +185,6 @@ __gf_rdma_quota_get (gf_rdma_peer_t *peer)          return ret;  } -/* -  static int32_t -  gf_rdma_quota_get (gf_rdma_peer_t *peer) -  { -  int32_t ret = -1; -  gf_rdma_private_t *priv = peer->trans->private; - -  pthread_mutex_lock (&priv->write_mutex); -  { -  ret = __gf_rdma_quota_get (peer); -  } -  pthread_mutex_unlock (&priv->write_mutex); - -  return ret; -  } -*/  static void  __gf_rdma_ioq_entry_free (gf_rdma_ioq_t *entry) @@ -290,6 +200,7 @@ __gf_rdma_ioq_entry_free (gf_rdma_ioq_t *entry)                  iobref_unref (entry->msg.request.rsp_iobref);                  entry->msg.request.rsp_iobref = NULL;          } +          mem_put (entry);  } @@ -309,26 +220,890 @@ static int32_t  __gf_rdma_disconnect (rpc_transport_t *this)  {          gf_rdma_private_t *priv = NULL; -        int32_t            ret  = 0;          priv = this->private; -        if (priv->connected || priv->tcp_connected) { -                fcntl (priv->sock, F_SETFL, O_NONBLOCK); -                if (shutdown (priv->sock, SHUT_RDWR) != 0) { -                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_DEBUG, -                                "shutdown () - error: %s", -                                strerror (errno)); -                        ret = -errno; -                        priv->tcp_connected = 0; +        if (priv->connected) { +                rdma_disconnect (priv->peer.cm_id); +        } + +        return 0; +} + + +static void +gf_rdma_queue_init (gf_rdma_queue_t *queue) +{ +        pthread_mutex_init (&queue->lock, NULL); + +        queue->active_posts.next = &queue->active_posts; +        queue->active_posts.prev = &queue->active_posts; +        queue->passive_posts.next = &queue->passive_posts; +        queue->passive_posts.prev = &queue->passive_posts; +} + + +static void +__gf_rdma_destroy_queue (gf_rdma_post_t *post) +{ +        gf_rdma_post_t *tmp = NULL; + +        while (post->next != post) { +                tmp = post->next; + +                post->next = post->next->next; +                post->next->prev = post; + +                gf_rdma_destroy_post (tmp); +        } +} + + +static void +gf_rdma_destroy_queue (gf_rdma_queue_t *queue) +{ +        if (queue == NULL) { +                goto out; +        } + +        pthread_mutex_lock (&queue->lock); +        { +                if (queue->passive_count > 0) { +                        __gf_rdma_destroy_queue (&queue->passive_posts); +                        queue->passive_count = 0; +                } + +                if (queue->active_count > 0) { +                        __gf_rdma_destroy_queue (&queue->active_posts); +                        queue->active_count = 0; +                } +        } +        pthread_mutex_unlock (&queue->lock); + +out: +        return; +} + + +static void +gf_rdma_destroy_posts (rpc_transport_t *this) +{ +        gf_rdma_device_t  *device = NULL; +        gf_rdma_private_t *priv   = NULL; + +        if (this == NULL) { +                goto out; +        } + +        priv = this->private; +        device = priv->device; + +        gf_rdma_destroy_queue (&device->sendq); +        gf_rdma_destroy_queue (&device->recvq); + +out: +        return; +} + + +static int32_t +__gf_rdma_create_posts (rpc_transport_t *this, int32_t count, int32_t size, +                        gf_rdma_queue_t *q, gf_rdma_post_type_t type) +{ +        int32_t            i      = 0; +        int32_t            ret    = 0; +        gf_rdma_private_t *priv   = NULL; +        gf_rdma_device_t  *device = NULL; + +        priv = this->private; +        device = priv->device; + +        for (i=0 ; i<count ; i++) { +                gf_rdma_post_t *post = NULL; + +                post = gf_rdma_new_post (this, device, size + 2048, type); +                if (!post) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "post creation failed"); +                        ret = -1; +                        break; +                } + +                gf_rdma_put_post (q, post); +        } +        return ret; +} + + +static int32_t +gf_rdma_post_recv (struct ibv_srq *srq, +                   gf_rdma_post_t *post) +{ +        struct ibv_sge list = { +                .addr   = (unsigned long) post->buf, +                .length = post->buf_size, +                .lkey   = post->mr->lkey +        }; + +        struct ibv_recv_wr wr = { +                .wr_id  = (unsigned long) post, +                .sg_list = &list, +                .num_sge = 1, +        }, *bad_wr; + +        gf_rdma_post_ref (post); + +        return ibv_post_srq_recv (srq, &wr, &bad_wr); +} + + +static int32_t +gf_rdma_create_posts (rpc_transport_t *this) +{ +        int32_t            i       = 0, ret = 0; +        gf_rdma_post_t    *post    = NULL; +        gf_rdma_private_t *priv    = NULL; +        gf_rdma_options_t *options = NULL; +        gf_rdma_device_t  *device  = NULL; + +        priv = this->private; +        options = &priv->options; +        device = priv->device; + +        ret =  __gf_rdma_create_posts (this, options->send_count, +                                       options->send_size, +                                       &device->sendq, GF_RDMA_SEND_POST); +        if (!ret) +                ret =  __gf_rdma_create_posts (this, options->recv_count, +                                               options->recv_size, +                                               &device->recvq, +                                               GF_RDMA_RECV_POST); + +        if (!ret) { +                for (i=0 ; i<options->recv_count ; i++) { +                        post = gf_rdma_get_post (&device->recvq); +                        if (gf_rdma_post_recv (device->srq, post) != 0) { +                                ret = -1; +                                break; +                        } +                } +        } + +        if (ret) +                gf_rdma_destroy_posts (this); + +        return ret; +} + + +static void +gf_rdma_destroy_cq (rpc_transport_t *this) +{ +        gf_rdma_private_t *priv   = NULL; +        gf_rdma_device_t  *device = NULL; + +        priv = this->private; +        device = priv->device; + +        if (device->recv_cq) +                ibv_destroy_cq (device->recv_cq); +        device->recv_cq = NULL; + +        if (device->send_cq) +                ibv_destroy_cq (device->send_cq); +        device->send_cq = NULL; + +        return; +} + + +static int32_t +gf_rdma_create_cq (rpc_transport_t *this) +{ +        gf_rdma_private_t      *priv        = NULL; +        gf_rdma_options_t      *options     = NULL; +        gf_rdma_device_t       *device      = NULL; +        uint64_t                send_cqe    = 0; +        int32_t                 ret         = 0; +        struct ibv_device_attr  device_attr = {{0}, }; + +        priv = this->private; +        options = &priv->options; +        device = priv->device; + +        device->recv_cq = ibv_create_cq (priv->device->context, +                                         options->recv_count * 2, +                                         device, +                                         device->recv_chan, +                                         0); +        if (!device->recv_cq) { +                gf_log (this->name, GF_LOG_ERROR, +                        "creation of CQ for device %s failed", +                        device->device_name); +                ret = -1; +                goto out; +        } else if (ibv_req_notify_cq (device->recv_cq, 0)) { +                gf_log (this->name, GF_LOG_ERROR, +                        "ibv_req_notify_cq on recv CQ of device %s failed", +                        device->device_name); +                ret = -1; +                goto out; +        } + +        do { +                ret = ibv_query_device (priv->device->context, &device_attr); +                if (ret != 0) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "ibv_query_device on %s returned %d (%s)", +                                priv->device->device_name, ret, +                                (ret > 0) ? strerror (ret) : ""); +                        ret = -1; +                        goto out; +                } + +                send_cqe = options->send_count * 128; +                send_cqe = (send_cqe > device_attr.max_cqe) +                        ? device_attr.max_cqe : send_cqe; + +                /* TODO: make send_cq size dynamically adaptive */ +                device->send_cq = ibv_create_cq (priv->device->context, +                                                 send_cqe, device, +                                                 device->send_chan, 0); +                if (!device->send_cq) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "creation of send_cq for device %s failed", +                                device->device_name); +                        ret = -1; +                        goto out; +                } + +                if (ibv_req_notify_cq (device->send_cq, 0)) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "ibv_req_notify_cq on send_cq for device %s" +                                " failed",  device->device_name); +                        ret = -1; +                        goto out; +                } +        } while (0); + +out: +        if (ret != 0) +                gf_rdma_destroy_cq (this); + +        return ret; +} + + +static gf_rdma_device_t * +gf_rdma_get_device (rpc_transport_t *this, struct ibv_context *ibctx, +                    char *device_name) +{ +        glusterfs_ctx_t   *ctx      = NULL; +        gf_rdma_private_t *priv     = NULL; +        gf_rdma_options_t *options  = NULL; +        int32_t            ret      = 0; +        int32_t            i        = 0; +        gf_rdma_device_t  *trav     = NULL, *device = NULL; +        gf_rdma_ctx_t     *rdma_ctx = NULL; + +        priv        = this->private; +        options     = &priv->options; +        ctx         = this->ctx; +        rdma_ctx    = ctx->ib; + +        trav = rdma_ctx->device; + +        while (trav) { +                if (!strcmp (trav->device_name, device_name)) +                        break; +                trav = trav->next; +        } + +        if (!trav) { +                trav = GF_CALLOC (1, sizeof (*trav), +                                  gf_common_mt_rdma_device_t); +                if (trav == NULL) { +                        goto out; +                } + +                priv->device = trav; +                trav->context = ibctx; + +                trav->request_ctx_pool +                        = mem_pool_new (gf_rdma_request_context_t, +                                        GF_RDMA_POOL_SIZE); +                if (trav->request_ctx_pool == NULL) { +                        goto out; +                } + +                trav->ioq_pool +                        = mem_pool_new (gf_rdma_ioq_t, GF_RDMA_POOL_SIZE); +                if (trav->ioq_pool == NULL) { +                        goto out; +                } + +                trav->reply_info_pool = mem_pool_new (gf_rdma_reply_info_t, +                                                      GF_RDMA_POOL_SIZE); +                if (trav->reply_info_pool == NULL) { +                        goto out; +                } + +                trav->device_name = gf_strdup (device_name); + +                trav->next = rdma_ctx->device; +                rdma_ctx->device = trav; + +                trav->send_chan = ibv_create_comp_channel (trav->context); +                if (!trav->send_chan) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "could not create send completion channel for " +                                "device (%s)", device_name); +                        goto out; +                } + +                trav->recv_chan = ibv_create_comp_channel (trav->context); +                if (!trav->recv_chan) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "could not create recv completion channel for " +                                "device (%s)", device_name); + +                        /* TODO: cleanup current mess */ +                        goto out; +                } + +                if (gf_rdma_create_cq (this) < 0) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "could not create CQ for device (%s)", +                                device_name); +                        goto out; +                } + +                /* protection domain */ +                trav->pd = ibv_alloc_pd (trav->context); + +                if (!trav->pd) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "could not allocate protection domain for " +                                "device (%s)", device_name); +                        goto out; +                } + +                struct ibv_srq_init_attr attr = { +                        .attr = { +                                .max_wr = options->recv_count, +                                .max_sge = 1 +                        } +                }; +                trav->srq = ibv_create_srq (trav->pd, &attr); + +                if (!trav->srq) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "could not create SRQ for device (%s)", +                                device_name); +                        goto out; +                } + +                /* queue init */ +                gf_rdma_queue_init (&trav->sendq); +                gf_rdma_queue_init (&trav->recvq); + +                if (gf_rdma_create_posts (this) < 0) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "could not allocate posts for device (%s)", +                                device_name); +                        goto out; +                } + +                /* completion threads */ +                ret = pthread_create (&trav->send_thread, +                                      NULL, +                                      gf_rdma_send_completion_proc, +                                      trav->send_chan); +                if (ret) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "could not create send completion thread for " +                                "device (%s)", device_name); +                        goto out; +                } + +                ret = pthread_create (&trav->recv_thread, +                                      NULL, +                                      gf_rdma_recv_completion_proc, +                                      trav->recv_chan); +                if (ret) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "could not create recv completion thread " +                                "for device (%s)", device_name); +                        return NULL; +                } + +                /* qpreg */ +                pthread_mutex_init (&trav->qpreg.lock, NULL); +                for (i=0; i<42; i++) { +                        trav->qpreg.ents[i].next = &trav->qpreg.ents[i]; +                        trav->qpreg.ents[i].prev = &trav->qpreg.ents[i]; +                } +        } + +        device = trav; +        trav = NULL; +out: + +        if (trav != NULL) { +                gf_rdma_destroy_posts (this); +                mem_pool_destroy (trav->ioq_pool); +                mem_pool_destroy (trav->request_ctx_pool); +                mem_pool_destroy (trav->reply_info_pool); +                ibv_dealloc_pd (trav->pd); +                gf_rdma_destroy_cq (this); +                ibv_destroy_comp_channel (trav->recv_chan); +                ibv_destroy_comp_channel (trav->send_chan); +                GF_FREE ((char *)trav->device_name); +                GF_FREE (trav); +        } + +        return device; +} + + +static rpc_transport_t * +gf_rdma_transport_new (rpc_transport_t *listener, struct rdma_cm_id *cm_id) +{ +        gf_rdma_private_t *listener_priv = NULL, *priv = NULL; +        rpc_transport_t   *this          = NULL, *new = NULL; +        gf_rdma_options_t *options       = NULL; +        char              *device_name   = NULL; + +        listener_priv = listener->private; + +        this = GF_CALLOC (1, sizeof (rpc_transport_t), +                          gf_common_mt_rpc_transport_t); +        if (this == NULL) { +                goto out; +        } + +        this->listener = listener; + +        priv = GF_CALLOC (1, sizeof (gf_rdma_private_t), +                          gf_common_mt_rdma_private_t); +        if (priv == NULL) { +                goto out; +        } + +        this->private = priv; +        priv->options = listener_priv->options; + +        priv->listener = listener; +        priv->entity = GF_RDMA_SERVER; + +        options = &priv->options; + +        this->ops = listener->ops; +        this->init = listener->init; +        this->fini = listener->fini; +        this->ctx = listener->ctx; +        this->name = gf_strdup (listener->name); +        this->notify = listener->notify; +        this->mydata = listener->mydata; + +        this->myinfo.sockaddr_len = sizeof (cm_id->route.addr.src_addr); +        memcpy (&this->myinfo.sockaddr, &cm_id->route.addr.src_addr, +                this->myinfo.sockaddr_len); + +        this->peerinfo.sockaddr_len = sizeof (cm_id->route.addr.dst_addr); +        memcpy (&this->peerinfo.sockaddr, &cm_id->route.addr.dst_addr, +                this->peerinfo.sockaddr_len); + +        priv->peer.trans = this; +        gf_rdma_get_transport_identifiers (this); + +        device_name = (char *)ibv_get_device_name (cm_id->verbs->device); +        if (device_name == NULL) { +                gf_log (listener->name, GF_LOG_WARNING, +                        "cannot get device name (peer:%s me:%s)", +                        this->peerinfo.identifier, this->myinfo.identifier); +                goto out; +        } + +        priv->device = gf_rdma_get_device (this, cm_id->verbs, +                                           device_name); +        if (priv->device == NULL) { +                gf_log (listener->name, GF_LOG_WARNING, +                        "cannot get infiniband device %s (peer:%s me:%s)", +                        device_name, this->peerinfo.identifier, +                        this->myinfo.identifier); +                goto out; +        } + +        priv->peer.send_count = options->send_count; +        priv->peer.recv_count = options->recv_count; +        priv->peer.send_size = options->send_size; +        priv->peer.recv_size = options->recv_size; +        priv->peer.cm_id = cm_id; +        INIT_LIST_HEAD (&priv->peer.ioq); + +        pthread_mutex_init (&priv->write_mutex, NULL); +        pthread_mutex_init (&priv->recv_mutex, NULL); + +        cm_id->context = this; + +        new = rpc_transport_ref (this); +        this = NULL; +out: +        if (this != NULL) { +                if (this->private != NULL) { +                        GF_FREE (this->private); +                } + +                if (this->name != NULL) { +                        GF_FREE (this->name); +                } + +                GF_FREE (this); +        } + +        return new; +} + + +static int +gf_rdma_cm_handle_connect_request (struct rdma_cm_event *event) +{ +        int                     ret         = -1; +        rpc_transport_t        *this        = NULL, *listener = NULL; +        struct rdma_cm_id      *child_cm_id = NULL, *listener_cm_id = NULL; +	struct rdma_conn_param  conn_param  = {0, }; +        gf_rdma_private_t      *priv        = NULL; +        gf_rdma_options_t      *options     = NULL; + +        child_cm_id = event->id; +        listener_cm_id = event->listen_id; + +        listener = listener_cm_id->context; +        priv = listener->private; +        options = &priv->options; + +        this = gf_rdma_transport_new (listener, child_cm_id); +        if (this == NULL) { +                gf_log (listener->name, GF_LOG_WARNING, +                        "could not create a transport for incoming connection" +                        " (me.name:%s me.identifier:%s)", listener->name, +                        listener->myinfo.identifier); +                rdma_destroy_id (child_cm_id); +                goto out; +        } + +        gf_log (listener->name, GF_LOG_TRACE, +                "got a connect request (me:%s peer:%s)", +                listener->myinfo.identifier, this->peerinfo.identifier); + +        ret = gf_rdma_create_qp (this); +        if (ret < 0) { +                gf_log (listener->name, GF_LOG_WARNING, +                        "could not create QP (peer:%s me:%s)", +                        this->peerinfo.identifier, this->myinfo.identifier); +                gf_rdma_cm_handle_disconnect (this); +                goto out; +        } + +	conn_param.responder_resources = 1; +	conn_param.initiator_depth = 1; +        conn_param.retry_count = options->attr_retry_cnt; +        conn_param.rnr_retry_count = options->attr_rnr_retry; + +	ret = rdma_accept(child_cm_id, &conn_param); +	if (ret < 0) { +                gf_log (listener->name, GF_LOG_WARNING, "rdma_accept failed " +                        "peer:%s me:%s (%s)", this->peerinfo.identifier, +                        this->myinfo.identifier, strerror (errno)); +                gf_rdma_cm_handle_disconnect (this); +                goto out; +	} + +        ret = 0; + +out: +        return ret; +} + + +static int +gf_rdma_cm_handle_route_resolved (struct rdma_cm_event *event) +{ +	struct rdma_conn_param  conn_param = {0, }; +	int                     ret        = 0; +        rpc_transport_t        *this       = NULL; +        gf_rdma_private_t      *priv       = NULL; +        gf_rdma_peer_t         *peer       = NULL; +        gf_rdma_options_t      *options    = NULL; + +        if (event == NULL) { +                goto out; +        } + +        this = event->id->context; + +        priv = this->private; +        peer = &priv->peer; +        options = &priv->options; + +        ret = gf_rdma_create_qp (this); +        if (ret != 0) { +                gf_log (this->name, GF_LOG_WARNING, +                        "could not create QP (peer:%s me:%s)", +                        this->peerinfo.identifier, this->myinfo.identifier); +                gf_rdma_cm_handle_disconnect (this); +                goto out; +        } + +	memset(&conn_param, 0, sizeof conn_param); +	conn_param.responder_resources = 1; +	conn_param.initiator_depth = 1; +	conn_param.retry_count = options->attr_retry_cnt; +        conn_param.rnr_retry_count = options->attr_rnr_retry; + +	ret = rdma_connect(peer->cm_id, &conn_param); +	if (ret != 0) { +                gf_log (this->name, GF_LOG_WARNING, +                        "rdma_connect failed (%s)", strerror (errno)); +                gf_rdma_cm_handle_disconnect (this); +                goto out; +	} + +        gf_log (this->name, GF_LOG_TRACE, "route resolved (me:%s peer:%s)", +                this->myinfo.identifier, this->peerinfo.identifier); + +        ret = 0; +out: +        return ret; +} + + +static int +gf_rdma_cm_handle_addr_resolved (struct rdma_cm_event *event) +{ +        rpc_transport_t   *this = NULL; +        gf_rdma_peer_t    *peer = NULL; +        gf_rdma_private_t *priv = NULL; +        int                ret  = 0; + +        this = event->id->context; + +        priv = this->private; +        peer = &priv->peer; + +        GF_ASSERT (peer->cm_id == event->id); + +        this->myinfo.sockaddr_len = sizeof (peer->cm_id->route.addr.src_addr); +        memcpy (&this->myinfo.sockaddr, &peer->cm_id->route.addr.src_addr, +                this->myinfo.sockaddr_len); + +        this->peerinfo.sockaddr_len = sizeof (peer->cm_id->route.addr.dst_addr); +        memcpy (&this->peerinfo.sockaddr, &peer->cm_id->route.addr.dst_addr, +                this->peerinfo.sockaddr_len); + +        gf_rdma_get_transport_identifiers (this); + +        ret = rdma_resolve_route(peer->cm_id, 2000); +        if (ret != 0) { +                gf_log (this->name, GF_LOG_WARNING, +                        "rdma_resolve_route failed (me:%s peer:%s) (%s)", +                        this->myinfo.identifier, this->peerinfo.identifier, +                        strerror (errno)); +                gf_rdma_cm_handle_disconnect (this); +        } + +        gf_log (this->name, GF_LOG_TRACE, "Address resolved (me:%s peer:%s)", +                this->myinfo.identifier, this->peerinfo.identifier); + +        return ret; +} + + +static void +gf_rdma_cm_handle_disconnect (rpc_transport_t *this) +{ +        gf_rdma_private_t *priv       = NULL; +        char               need_unref = 0, connected = 0; + +        priv = this->private; +        gf_log (this->name, GF_LOG_DEBUG, +                "peer disconnected, cleaning up"); + +        pthread_mutex_lock (&priv->write_mutex); +        { +                if (priv->peer.cm_id != NULL) { +                        need_unref = 1; +                        connected = priv->connected;                          priv->connected = 0;                  } + +                __gf_rdma_teardown (this); +        } +        pthread_mutex_unlock (&priv->write_mutex); + +        if (connected) { +                rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); +        } + +        if (need_unref) +                rpc_transport_unref (this); + +} + + +static int +gf_rdma_cm_handle_event_established (struct rdma_cm_event *event) +{ +        rpc_transport_t   *this  = NULL; +        gf_rdma_private_t *priv  = NULL; +        struct rdma_cm_id *cm_id = NULL; +        int                ret   = 0; + +        cm_id = event->id; +        this = cm_id->context; +        priv = this->private; + +        priv->connected = 1; + +        pthread_mutex_lock (&priv->write_mutex); +        { +                priv->peer.quota = 1; +                priv->peer.quota_set = 0; +        } +        pthread_mutex_unlock (&priv->write_mutex); + +        if (priv->entity == GF_RDMA_CLIENT) { +                ret = rpc_transport_notify (this, RPC_TRANSPORT_CONNECT, this); + +        } else if (priv->entity == GF_RDMA_SERVER) { +                ret = rpc_transport_notify (priv->listener, +                                            RPC_TRANSPORT_ACCEPT, this); +        } + +        if (ret < 0) { +                gf_rdma_disconnect (this);          } +        gf_log (this->name, GF_LOG_TRACE, +                "recieved event RDMA_CM_EVENT_ESTABLISHED (me:%s peer:%s)", +                this->myinfo.identifier, this->peerinfo.identifier); +          return ret;  } +static int +gf_rdma_cm_handle_event_error (rpc_transport_t *this) +{ +        gf_rdma_private_t *priv  = NULL; + +        priv = this->private; + +        if (priv->entity != GF_RDMA_SERVER_LISTENER) { +                gf_rdma_cm_handle_disconnect (this); +        } + +        return 0; +} + + +static int +gf_rdma_cm_handle_device_removal (struct rdma_cm_event *event) +{ +        return 0; +} + + +static void * +gf_rdma_cm_event_handler (void *data) +{ +        struct rdma_cm_event      *event         = NULL; +        int                        ret           = 0; +        rpc_transport_t           *this          = NULL; +        struct rdma_event_channel *event_channel = NULL; + +        event_channel = data; + +        while (1) { +                ret = rdma_get_cm_event (event_channel, &event); +                if (ret != 0) { +                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, +                                "rdma_cm_get_event failed (%s)", +                                strerror (errno)); +                        break; +                } + +                switch (event->event) { +                case RDMA_CM_EVENT_ADDR_RESOLVED: +                        gf_rdma_cm_handle_addr_resolved (event); +                        break; + +                case RDMA_CM_EVENT_ROUTE_RESOLVED: +                        gf_rdma_cm_handle_route_resolved (event); +                        break; + +                case RDMA_CM_EVENT_CONNECT_REQUEST: +                        gf_rdma_cm_handle_connect_request (event); +                        break; + +                case RDMA_CM_EVENT_ESTABLISHED: +                        gf_rdma_cm_handle_event_established (event); +                        break; + +                case RDMA_CM_EVENT_ADDR_ERROR: +                case RDMA_CM_EVENT_ROUTE_ERROR: +                case RDMA_CM_EVENT_CONNECT_ERROR: +                case RDMA_CM_EVENT_UNREACHABLE: +                case RDMA_CM_EVENT_REJECTED: +                        this = event->id->context; + +                        gf_log (this->name, GF_LOG_WARNING, +                                "cma event %s, error %d (me:%s peer:%s)\n", +                                rdma_event_str(event->event), event->status, +                                this->myinfo.identifier, +                                this->peerinfo.identifier); + +                        rdma_ack_cm_event (event); +                        event = NULL; + +                        gf_rdma_cm_handle_event_error (this); +                        continue; + +                case RDMA_CM_EVENT_DISCONNECTED: +                        this = event->id->context; + +                        gf_log (this->name, GF_LOG_DEBUG, +                                "recieved disconnect (me:%s peer:%s)\n", +                                this->myinfo.identifier, +                                this->peerinfo.identifier); + +                        rdma_ack_cm_event (event); +                        event = NULL; + +                        gf_rdma_cm_handle_disconnect (this); +                        continue; + +                case RDMA_CM_EVENT_DEVICE_REMOVAL: +                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, +                                "device removed"); +                        gf_rdma_cm_handle_device_removal (event); +                        break; + +                default: +                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, +                                "unhandled event: %s, ignoring", +                                rdma_event_str(event->event)); +                        break; +                } + +                rdma_ack_cm_event (event); +        } + +        return NULL; +} + +  static int32_t  gf_rdma_post_send (struct ibv_qp *qp, gf_rdma_post_t *post, int32_t len)  { @@ -832,28 +1607,6 @@ out:  } -static int32_t -gf_rdma_post_recv (struct ibv_srq *srq, -                   gf_rdma_post_t *post) -{ -        struct ibv_sge list = { -                .addr   = (unsigned long) post->buf, -                .length = post->buf_size, -                .lkey   = post->mr->lkey -        }; - -        struct ibv_recv_wr wr = { -                .wr_id  = (unsigned long) post, -                .sg_list = &list, -                .num_sge = 1, -        }, *bad_wr; - -        gf_rdma_post_ref (post); - -        return ibv_post_srq_recv (srq, &wr, &bad_wr); -} - -  int  gf_rdma_post_unref (gf_rdma_post_t *post)  { @@ -1263,7 +2016,7 @@ out:  } -static inline int32_t +inline int32_t  __gf_rdma_register_local_mr_for_rdma (gf_rdma_peer_t *peer,                                        struct iovec *vector, int count,                                        gf_rdma_post_context_t *ctx) @@ -1715,7 +2468,7 @@ __gf_rdma_ioq_churn_entry (gf_rdma_peer_t *peer, gf_rdma_ioq_t *entry)          if (quota > 0) {                  post = gf_rdma_get_post (&device->sendq);                  if (post == NULL) { -                        post = gf_rdma_new_post (device, +                        post = gf_rdma_new_post (peer->trans, device,                                                   (options->send_size + 2048),                                                   GF_RDMA_SEND_POST);                  } @@ -1992,189 +2745,6 @@ out:          return ret;  } -#if 0 -static int -gf_rdma_receive (rpc_transport_t *this, char **hdr_p, size_t *hdrlen_p, -                 struct iobuf **iobuf_p) -{ -        gf_rdma_private_t *priv                   = this->private; -        /* TODO: return error if !priv->connected, check with locks */ -        /* TODO: boundry checks for data_ptr/offset */ -        char              *copy_from              = NULL; -        gf_rdma_header_t  *header                 = NULL; -        uint32_t           size1, size2, data_len = 0; -        char              *hdr                    = NULL; -        struct iobuf      *iobuf                  = NULL; -        int32_t            ret                    = 0; - -        pthread_mutex_lock (&priv->recv_mutex); -        { -/* -  while (!priv->data_ptr) -  pthread_cond_wait (&priv->recv_cond, &priv->recv_mutex); -*/ - -                copy_from = priv->data_ptr + priv->data_offset; - -                priv->data_ptr = NULL; -                data_len = priv->data_len; -                pthread_cond_broadcast (&priv->recv_cond); -        } -        pthread_mutex_unlock (&priv->recv_mutex); - -        header = (gf_rdma_header_t *)copy_from; -        if (strcmp (header->colonO, ":O")) { -                gf_log (GF_RDMA_LOG_NAME, GF_LOG_DEBUG, -                        "%s: corrupt header received", this->name); -                ret = -1; -                goto err; -        } - -        size1 = ntoh32 (header->size1); -        size2 = ntoh32 (header->size2); - -        if (data_len != (size1 + size2 + sizeof (*header))) { -                gf_log (GF_RDMA_LOG_NAME, GF_LOG_DEBUG, -                        "%s: sizeof data read from transport is not equal " -                        "to the size specified in the header", -                        this->name); -                ret = -1; -                goto err; -        } - -        copy_from += sizeof (*header); - -        if (size1) { -                hdr = GF_CALLOC (1, size1, gf_common_mt_char); -                if (!hdr) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "unable to allocate header for peer %s", -                                this->peerinfo.identifier); -                        ret = -ENOMEM; -                        goto err; -                } -                memcpy (hdr, copy_from, size1); -                copy_from += size1; -                *hdr_p = hdr; -        } -        *hdrlen_p = size1; - -        if (size2) { -                iobuf = iobuf_get2 (this->ctx->iobuf_pool, size2); -                if (!iobuf) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "unable to allocate IO buffer for peer %s", -                                this->peerinfo.identifier); -                        ret = -ENOMEM; -                        goto err; -                } -                memcpy (iobuf->ptr, copy_from, size2); -                *iobuf_p = iobuf; -        } - -err: -        return ret; -} -#endif - - -static void -gf_rdma_destroy_cq (rpc_transport_t *this) -{ -        gf_rdma_private_t *priv   = NULL; -        gf_rdma_device_t  *device = NULL; - -        priv = this->private; -        device = priv->device; - -        if (device->recv_cq) -                ibv_destroy_cq (device->recv_cq); -        device->recv_cq = NULL; - -        if (device->send_cq) -                ibv_destroy_cq (device->send_cq); -        device->send_cq = NULL; - -        return; -} - - -static int32_t -gf_rdma_create_cq (rpc_transport_t *this) -{ -        gf_rdma_private_t      *priv        = NULL; -        gf_rdma_options_t      *options     = NULL; -        gf_rdma_device_t       *device      = NULL; -        uint64_t                send_cqe    = 0; -        int32_t                 ret         = 0; -        struct ibv_device_attr  device_attr = {{0}, }; - -        priv = this->private; -        options = &priv->options; -        device = priv->device; - -        device->recv_cq = ibv_create_cq (priv->device->context, -                                         options->recv_count * 2, -                                         device, -                                         device->recv_chan, -                                         0); -        if (!device->recv_cq) { -                gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                        "%s: creation of CQ for device %s failed", -                        this->name, device->device_name); -                ret = -1; -                goto out; -        } else if (ibv_req_notify_cq (device->recv_cq, 0)) { -                gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                        "%s: ibv_req_notify_cq on recv CQ of device %s failed", -                        this->name, device->device_name); -                ret = -1; -                goto out; -        } - -        do { -                ret = ibv_query_device (priv->device->context, &device_attr); -                if (ret != 0) { -                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                "%s: ibv_query_device on %s returned %d (%s)", -                                this->name, priv->device->device_name, ret, -                                (ret > 0) ? strerror (ret) : ""); -                        ret = -1; -                        goto out; -                } - -                send_cqe = options->send_count * 128; -                send_cqe = (send_cqe > device_attr.max_cqe) -                        ? device_attr.max_cqe : send_cqe; - -                /* TODO: make send_cq size dynamically adaptive */ -                device->send_cq = ibv_create_cq (priv->device->context, -                                                 send_cqe, device, -                                                 device->send_chan, 0); -                if (!device->send_cq) { -                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                "%s: creation of send_cq for device %s failed", -                                this->name, device->device_name); -                        ret = -1; -                        goto out; -                } - -                if (ibv_req_notify_cq (device->send_cq, 0)) { -                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                "%s: ibv_req_notify_cq on send_cq for device %s" -                                " failed", this->name, device->device_name); -                        ret = -1; -                        goto out; -                } -        } while (0); - -out: -        if (ret != 0) -                gf_rdma_destroy_cq (this); - -        return ret; -} -  static int  gf_rdma_register_peer (gf_rdma_device_t *device, int32_t qp_num, @@ -2273,25 +2843,6 @@ __gf_rdma_lookup_peer (gf_rdma_device_t *device, int32_t qp_num)          return peer;  } -/* -  static gf_rdma_peer_t * -  gf_rdma_lookup_peer (gf_rdma_device_t *device, -  int32_t qp_num) -  { -  gf_rdma_qpreg_t *qpreg = NULL; -  gf_rdma_peer_t  *peer  = NULL; - -  qpreg = &device->qpreg; -  pthread_mutex_lock (&qpreg->lock); -  { -  peer = __gf_rdma_lookup_peer (device, qp_num); -  } -  pthread_mutex_unlock (&qpreg->lock); - -  return peer; -  } -*/ -  static void  __gf_rdma_destroy_qp (rpc_transport_t *this) @@ -2301,7 +2852,7 @@ __gf_rdma_destroy_qp (rpc_transport_t *this)          priv = this->private;          if (priv->peer.qp) {                  gf_rdma_unregister_peer (priv->device, priv->peer.qp->qp_num); -                ibv_destroy_qp (priv->peer.qp); +                rdma_destroy_qp (priv->peer.cm_id);          }          priv->peer.qp = NULL; @@ -2312,18 +2863,36 @@ __gf_rdma_destroy_qp (rpc_transport_t *this)  static int32_t  gf_rdma_create_qp (rpc_transport_t *this)  { -        gf_rdma_private_t *priv    = NULL; -        gf_rdma_options_t *options = NULL; -        gf_rdma_device_t  *device  = NULL; -        int32_t            ret     = 0; -        gf_rdma_peer_t    *peer    = NULL; +        gf_rdma_private_t *priv        = NULL; +        gf_rdma_device_t  *device      = NULL; +        int32_t            ret         = 0; +        gf_rdma_peer_t    *peer        = NULL; +        char              *device_name = NULL;          priv = this->private; -        options = &priv->options; -        device = priv->device;          peer = &priv->peer; +        device_name = (char *)ibv_get_device_name (peer->cm_id->verbs->device); +        if (device_name == NULL) { +                ret = -1; +                gf_log (this->name, GF_LOG_WARNING, "cannot get device_name"); +                goto out; +        } + +        device = gf_rdma_get_device (this, peer->cm_id->verbs, +                                     device_name); +        if (device == NULL) { +                ret = -1; +                gf_log (this->name, GF_LOG_WARNING, "cannot get device for " +                        "device %s", device_name); +                goto out; +        } + +        if (priv->device == NULL) { +                priv->device = device; +        } +          struct ibv_qp_init_attr init_attr = {                  .send_cq        = device->send_cq,                  .recv_cq        = device->recv_cq, @@ -2337,39 +2906,16 @@ gf_rdma_create_qp (rpc_transport_t *this)                  .qp_type = IBV_QPT_RC          }; -        struct ibv_qp_attr attr = { -                .qp_state        = IBV_QPS_INIT, -                .pkey_index      = 0, -                .port_num        = options->port, -                .qp_access_flags -                = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE -        }; - -        peer->qp = ibv_create_qp (device->pd, &init_attr); -        if (!peer->qp) { -                gf_log (GF_RDMA_LOG_NAME, -                        GF_LOG_CRITICAL, -                        "%s: could not create QP", -                        this->name); -                ret = -1; -                goto out; -        } else if (ibv_modify_qp (peer->qp, &attr, -                                  IBV_QP_STATE              | -                                  IBV_QP_PKEY_INDEX         | -                                  IBV_QP_PORT               | -                                  IBV_QP_ACCESS_FLAGS)) { -                gf_log (GF_RDMA_LOG_NAME, -                        GF_LOG_ERROR, -                        "%s: failed to modify QP to INIT state", -                        this->name); +        ret = rdma_create_qp(peer->cm_id, device->pd, &init_attr); +        if (ret != 0) { +                gf_log (peer->trans->name, GF_LOG_CRITICAL, +                        "%s: could not create QP (%s)", this->name, +                        strerror (errno));                  ret = -1;                  goto out;          } -        peer->local_lid = gf_rdma_get_local_lid (device->context, -                                                 options->port); -        peer->local_qpn = peer->qp->qp_num; -        peer->local_psn = lrand48 () & 0xffffff; +        peer->qp = peer->cm_id->qp;          ret = gf_rdma_register_peer (device, peer->qp->qp_num, peer); @@ -2381,300 +2927,52 @@ out:  } -static void -gf_rdma_destroy_posts (rpc_transport_t *this) -{ - -} - -  static int32_t -__gf_rdma_create_posts (rpc_transport_t *this, int32_t count, int32_t size, -                        gf_rdma_queue_t *q, gf_rdma_post_type_t type) -{ -        int32_t            i      = 0; -        int32_t            ret    = 0; -        gf_rdma_private_t *priv   = NULL; -        gf_rdma_device_t  *device = NULL; - -        priv = this->private; -        device = priv->device; - -        for (i=0 ; i<count ; i++) { -                gf_rdma_post_t *post = NULL; - -                post = gf_rdma_new_post (device, size + 2048, type); -                if (!post) { -                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                "%s: post creation failed", -                                this->name); -                        ret = -1; -                        break; -                } - -                gf_rdma_put_post (q, post); -        } -        return ret; -} - - -static int32_t -gf_rdma_create_posts (rpc_transport_t *this) +__gf_rdma_teardown (rpc_transport_t *this)  { -        int32_t            i       = 0, ret = 0; -        gf_rdma_post_t    *post    = NULL; -        gf_rdma_private_t *priv    = NULL; -        gf_rdma_options_t *options = NULL; -        gf_rdma_device_t  *device  = NULL; +        gf_rdma_private_t *priv = NULL; +        gf_rdma_peer_t    *peer = NULL;          priv = this->private; -        options = &priv->options; -        device = priv->device; - -        ret =  __gf_rdma_create_posts (this, options->send_count, -                                       options->send_size, -                                       &device->sendq, GF_RDMA_SEND_POST); -        if (!ret) -                ret =  __gf_rdma_create_posts (this, options->recv_count, -                                               options->recv_size, -                                               &device->recvq, -                                               GF_RDMA_RECV_POST); +        peer = &priv->peer; -        if (!ret) { -                for (i=0 ; i<options->recv_count ; i++) { -                        post = gf_rdma_get_post (&device->recvq); -                        if (gf_rdma_post_recv (device->srq, post) != 0) { -                                ret = -1; -                                break; -                        } -                } +        if (peer->cm_id->qp != NULL) { +                __gf_rdma_destroy_qp (this);          } -        if (ret) -                gf_rdma_destroy_posts (this); - -        return ret; -} - - -static int32_t -gf_rdma_connect_qp (rpc_transport_t *this) -{ -        gf_rdma_private_t *priv = this->private; -        gf_rdma_options_t *options = &priv->options; -        struct ibv_qp_attr attr = { -                .qp_state               = IBV_QPS_RTR, -                .path_mtu               = options->mtu, -                .dest_qp_num            = priv->peer.remote_qpn, -                .rq_psn                 = priv->peer.remote_psn, -                .max_dest_rd_atomic     = 1, -                .min_rnr_timer          = 12, -                .qp_access_flags -                = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE, -                .ah_attr                = { -                        .is_global      = 0, -                        .dlid           = priv->peer.remote_lid, -                        .sl             = 0, -                        .src_path_bits  = 0, -                        .port_num       = options->port -                } -        }; -        if (ibv_modify_qp (priv->peer.qp, &attr, -                           IBV_QP_STATE              | -                           IBV_QP_AV                 | -                           IBV_QP_PATH_MTU           | -                           IBV_QP_DEST_QPN           | -                           IBV_QP_RQ_PSN             | -                           IBV_QP_MAX_DEST_RD_ATOMIC | -                           IBV_QP_MIN_RNR_TIMER)) { -                gf_log (GF_RDMA_LOG_NAME, -                        GF_LOG_CRITICAL, -                        "Failed to modify QP to RTR\n"); -                return -1; +        if (!list_empty (&priv->peer.ioq)) { +                __gf_rdma_ioq_flush (peer);          } -        attr.qp_state       = IBV_QPS_RTS; -        attr.timeout        = options->attr_timeout; -        attr.retry_cnt      = options->attr_retry_cnt; -        attr.rnr_retry      = options->attr_rnr_retry; -        attr.sq_psn         = priv->peer.local_psn; -        attr.max_rd_atomic  = 1; -        if (ibv_modify_qp (priv->peer.qp, &attr, -                           IBV_QP_STATE              | -                           IBV_QP_TIMEOUT            | -                           IBV_QP_RETRY_CNT          | -                           IBV_QP_RNR_RETRY          | -                           IBV_QP_SQ_PSN             | -                           IBV_QP_MAX_QP_RD_ATOMIC)) { -                gf_log (GF_RDMA_LOG_NAME, -                        GF_LOG_CRITICAL, -                        "Failed to modify QP to RTS\n"); -                return -1; +        if (peer->cm_id != NULL) { +                rdma_destroy_id (peer->cm_id); +                peer->cm_id = NULL;          } +        /* TODO: decrement cq size */          return 0;  } +  static int32_t -__gf_rdma_teardown (rpc_transport_t *this) +gf_rdma_teardown (rpc_transport_t *this)  { +        int32_t ret = 0;          gf_rdma_private_t *priv = NULL; -        priv = this->private; -        __gf_rdma_destroy_qp (this); - -        if (!list_empty (&priv->peer.ioq)) { -                __gf_rdma_ioq_flush (&priv->peer); +        if (this == NULL) { +                goto out;          } -        /* TODO: decrement cq size */ -        return 0; -} - -/* - * return value: - *   0 = success (completed) - *  -1 = error - * > 0 = incomplete - */ - -static int -__tcp_rwv (rpc_transport_t *this, struct iovec *vector, int count, -           struct iovec **pending_vector, int *pending_count, -           int write) -{ -        gf_rdma_private_t *priv     = NULL; -        int                sock     = -1; -        int                ret      = -1; -        struct iovec      *opvector = NULL; -        int                opcount  = 0; -        int                moved    = 0; -          priv = this->private; -        sock = priv->sock; -        opvector = vector; -        opcount = count; -        while (opcount) +        pthread_mutex_lock (&priv->write_mutex);          { -                if (write) -                { -                        ret = writev (sock, opvector, opcount); - -                        if (ret == 0 || (ret == -1 && errno == EAGAIN)) -                        { -                                /* done for now */ -                                break; -                        } -                } -                else -                { -                        ret = readv (sock, opvector, opcount); - -                        if (ret == -1 && errno == EAGAIN) -                        { -                                /* done for now */ -                                break; -                        } -                } - -                if (ret == 0) -                { -                        gf_log (this->name, GF_LOG_DEBUG, -                                "EOF from peer %s", this->peerinfo.identifier); -                        opcount = -1; -                        errno = ENOTCONN; -                        break; -                } - -                if (ret == -1) -                { -                        if (errno == EINTR) -                                continue; - -                        gf_log (this->name, GF_LOG_DEBUG, -                                "%s failed (%s)", write ? "writev" : "readv", -                                strerror (errno)); -                        if (write && !priv->connected && -                            (errno == ECONNREFUSED)) -                                gf_log (this->name, GF_LOG_ERROR, -                                        "possible mismatch of 'rpc-transport-type'" -                                        " in protocol server and client. " -                                        "check volume file"); -                        opcount = -1; -                        break; -                } - -                moved = 0; - -                while (moved < ret) -                { -                        if ((ret - moved) >= opvector[0].iov_len) -                        { -                                moved += opvector[0].iov_len; -                                opvector++; -                                opcount--; -                        } -                        else -                        { -                                opvector[0].iov_len -= (ret - moved); -                                opvector[0].iov_base += (ret - moved); -                                moved += (ret - moved); -                        } -                        while (opcount && !opvector[0].iov_len) -                        { -                                opvector++; -                                opcount--; -                        } -                } -        } - -        if (pending_vector) -                *pending_vector = opvector; - -        if (pending_count) -                *pending_count = opcount; - -        return opcount; -} - - -static int -__tcp_readv (rpc_transport_t *this, struct iovec *vector, int count, -             struct iovec **pending_vector, int *pending_count) -{ -        int ret = -1; - -        ret = __tcp_rwv (this, vector, count, -                         pending_vector, pending_count, 0); - -        return ret; -} - - -static int -__tcp_writev (rpc_transport_t *this, struct iovec *vector, int count, -              struct iovec **pending_vector, int *pending_count) -{ -        int                ret  = -1; -        gf_rdma_private_t *priv = NULL; - -        priv = this->private; - -        ret = __tcp_rwv (this, vector, count, pending_vector, -                         pending_count, 1); - -        if (ret > 0) { -                /* TODO: Avoid multiple calls when socket is already -                   registered for POLLOUT */ -                priv->idx = event_select_on (this->ctx->event_pool, -                                             priv->sock, priv->idx, -1, 1); -        } else if (ret == 0) { -                priv->idx = event_select_on (this->ctx->event_pool, -                                             priv->sock, -                                             priv->idx, -1, 0); +                ret = __gf_rdma_teardown (this);          } +        pthread_mutex_unlock (&priv->write_mutex); +out:          return ret;  } @@ -2765,7 +3063,7 @@ out:  } -static inline int32_t +inline int32_t  gf_rdma_decode_error_msg (gf_rdma_peer_t *peer, gf_rdma_post_t *post,                            size_t bytes_in_post)  { @@ -3307,7 +3605,7 @@ out:  } -static inline int32_t +inline int32_t  gf_rdma_recv_request (gf_rdma_peer_t *peer, gf_rdma_post_t *post,                        gf_rdma_read_chunk_t *readch)  { @@ -3340,6 +3638,7 @@ gf_rdma_process_recv (gf_rdma_peer_t *peer, struct ibv_wc *wc)          uint32_t             *ptr      = NULL;          enum msg_type         msg_type = 0;          gf_rdma_header_t     *header   = NULL; +        gf_rdma_private_t    *priv     = NULL;          post = (gf_rdma_post_t *) (long) wc->wr_id;          if (post == NULL) { @@ -3357,6 +3656,26 @@ gf_rdma_process_recv (gf_rdma_peer_t *peer, struct ibv_wc *wc)          header = (gf_rdma_header_t *)post->buf; +	priv = peer->trans->private; + +        pthread_mutex_lock (&priv->write_mutex); +        { +                if (!priv->peer.quota_set) { +                        priv->peer.quota_set = 1; + +                        /* Initially peer.quota is set to 1 as per RFC 5666. We +                         * have to account for the quota used while sending +                         * first msg (which may or may not be returned to pool +                         * at this point) while deriving peer.quota from +                         * header->rm_credit. Hence the arithmatic below, +                         * instead of directly setting it to header->rm_credit. +                         */ +                        priv->peer.quota = header->rm_credit +                                - ( 1 - priv->peer.quota); +                } +        } +        pthread_mutex_unlock (&priv->write_mutex); +          switch (header->rm_type) {          case GF_RDMA_MSG:                  ptr = (uint32_t *)post->ctx.vector[0].iov_base; @@ -3809,364 +4128,87 @@ gf_rdma_options_init (rpc_transport_t *this)          return;  } -static void -gf_rdma_queue_init (gf_rdma_queue_t *queue) -{ -        pthread_mutex_init (&queue->lock, NULL); - -        queue->active_posts.next = &queue->active_posts; -        queue->active_posts.prev = &queue->active_posts; -        queue->passive_posts.next = &queue->passive_posts; -        queue->passive_posts.prev = &queue->passive_posts; -} - -static gf_rdma_device_t * -gf_rdma_get_device (rpc_transport_t *this, struct ibv_context *ibctx) +gf_rdma_ctx_t * +__gf_rdma_ctx_create (void)  { -        glusterfs_ctx_t   *ctx         = NULL; -        gf_rdma_private_t *priv        = NULL; -        gf_rdma_options_t *options     = NULL; -        char              *device_name = NULL; -        uint32_t           port        = 0; -        uint8_t            active_port = 0; -        int32_t            ret         = 0; -        int32_t            i           = 0; -        gf_rdma_device_t  *trav        = NULL; +        gf_rdma_ctx_t *rdma_ctx = NULL; +        int            ret      = -1; -        priv        = this->private; -        options     = &priv->options; -        device_name = priv->options.device_name; -        ctx         = this->ctx; -        trav        = ctx->ib; -        port        = priv->options.port; - -        while (trav) { -                if ((!strcmp (trav->device_name, device_name)) && -                    (trav->port == port)) -                        break; -                trav = trav->next; +        rdma_ctx = GF_CALLOC (1, sizeof (*rdma_ctx), gf_common_mt_char); +        if (rdma_ctx == NULL) { +                goto out;          } -        if (!trav) { - -                trav = GF_CALLOC (1, sizeof (*trav), -                                  gf_common_mt_rdma_device_t); -                if (trav == NULL) { -                        return NULL; -                } - -                priv->device = trav; - -                trav->context = ibctx; - -                ret = ib_get_active_port (trav->context); - -                if (ret < 0) { -                        if (!port) { -                                gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                        "Failed to find any active ports and " -                                        "none specified in volume file," -                                        " exiting"); -                                GF_FREE (trav); -                                return NULL; -                        } -                } - -                trav->request_ctx_pool = mem_pool_new (gf_rdma_request_context_t, -                                                       GF_RDMA_POOL_SIZE); -                if (trav->request_ctx_pool == NULL) { -                        return NULL; -                } - -                trav->ioq_pool = mem_pool_new (gf_rdma_ioq_t, GF_RDMA_POOL_SIZE); -                if (trav->ioq_pool == NULL) { -                        mem_pool_destroy (trav->request_ctx_pool); -                        return NULL; -                } - -                trav->reply_info_pool = mem_pool_new (gf_rdma_reply_info_t, -                                                      GF_RDMA_POOL_SIZE); -                if (trav->reply_info_pool == NULL) { -                        mem_pool_destroy (trav->request_ctx_pool); -                        mem_pool_destroy (trav->ioq_pool); -                        return NULL; -                } - - -                active_port = ret; - -                if (port) { -                        ret = ib_check_active_port (trav->context, port); -                        if (ret < 0) { -                                gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, -                                        "On device %s: provided port:%u is " -                                        "found to be offline, continuing to " -                                        "use the same port", device_name, port); -                        } -                } else { -                        priv->options.port = active_port; -                        port = active_port; -                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_TRACE, -                                "Port unspecified in volume file using active " -                                "port: %u", port); -                } - -                trav->device_name = gf_strdup (device_name); -                trav->port = port; - -                trav->next = ctx->ib; -                ctx->ib = trav; - -                trav->send_chan = ibv_create_comp_channel (trav->context); -                if (!trav->send_chan) { -                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                "%s: could not create send completion channel", -                                device_name); -                        mem_pool_destroy (trav->ioq_pool); -                        mem_pool_destroy (trav->request_ctx_pool); -                        mem_pool_destroy (trav->reply_info_pool); -                        GF_FREE ((char *)trav->device_name); -                        GF_FREE (trav); -                        return NULL; -                } - -                trav->recv_chan = ibv_create_comp_channel (trav->context); -                if (!trav->recv_chan) { -                        mem_pool_destroy (trav->ioq_pool); -                        mem_pool_destroy (trav->request_ctx_pool); -                        mem_pool_destroy (trav->reply_info_pool); -                        ibv_destroy_comp_channel (trav->send_chan); -                        GF_FREE ((char *)trav->device_name); -                        GF_FREE (trav); -                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                "could not create recv completion channel"); -                        /* TODO: cleanup current mess */ -                        return NULL; -                } - -                if (gf_rdma_create_cq (this) < 0) { -                        mem_pool_destroy (trav->ioq_pool); -                        mem_pool_destroy (trav->request_ctx_pool); -                        mem_pool_destroy (trav->reply_info_pool); -                        ibv_destroy_comp_channel (trav->recv_chan); -                        ibv_destroy_comp_channel (trav->send_chan); -                        GF_FREE ((char *)trav->device_name); -                        GF_FREE (trav); -                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                "%s: could not create CQ", -                                this->name); -                        return NULL; -                } - -                /* protection domain */ -                trav->pd = ibv_alloc_pd (trav->context); - -                if (!trav->pd) { -                        mem_pool_destroy (trav->ioq_pool); -                        mem_pool_destroy (trav->request_ctx_pool); -                        mem_pool_destroy (trav->reply_info_pool); -                        gf_rdma_destroy_cq (this); -                        ibv_destroy_comp_channel (trav->recv_chan); -                        ibv_destroy_comp_channel (trav->send_chan); -                        GF_FREE ((char *)trav->device_name); -                        GF_FREE (trav); -                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                "%s: could not allocate protection domain", -                                this->name); -                        return NULL; -                } - -                struct ibv_srq_init_attr attr = { -                        .attr = { -                                .max_wr = options->recv_count, -                                .max_sge = 1 -                        } -                }; -                trav->srq = ibv_create_srq (trav->pd, &attr); - -                if (!trav->srq) { -                        mem_pool_destroy (trav->ioq_pool); -                        mem_pool_destroy (trav->request_ctx_pool); -                        mem_pool_destroy (trav->reply_info_pool); -                        ibv_dealloc_pd (trav->pd); -                        gf_rdma_destroy_cq (this); -                        ibv_destroy_comp_channel (trav->recv_chan); -                        ibv_destroy_comp_channel (trav->send_chan); -                        GF_FREE ((char *)trav->device_name); -                        GF_FREE (trav); - -                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                "%s: could not create SRQ", -                                this->name); -                        return NULL; -                } - -                /* queue init */ -                gf_rdma_queue_init (&trav->sendq); -                gf_rdma_queue_init (&trav->recvq); - -                if (gf_rdma_create_posts (this) < 0) { -                        mem_pool_destroy (trav->ioq_pool); -                        mem_pool_destroy (trav->request_ctx_pool); -                        mem_pool_destroy (trav->reply_info_pool); -                        ibv_dealloc_pd (trav->pd); -                        gf_rdma_destroy_cq (this); -                        ibv_destroy_comp_channel (trav->recv_chan); -                        ibv_destroy_comp_channel (trav->send_chan); -                        GF_FREE ((char *)trav->device_name); -                        GF_FREE (trav); - -                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                "%s: could not allocate posts", -                                this->name); -                        return NULL; -                } - -                /* completion threads */ -                ret = pthread_create (&trav->send_thread, -                                      NULL, -                                      gf_rdma_send_completion_proc, -                                      trav->send_chan); -                if (ret) { -                        gf_rdma_destroy_posts (this); -                        mem_pool_destroy (trav->ioq_pool); -                        mem_pool_destroy (trav->request_ctx_pool); -                        mem_pool_destroy (trav->reply_info_pool); -                        ibv_dealloc_pd (trav->pd); -                        gf_rdma_destroy_cq (this); -                        ibv_destroy_comp_channel (trav->recv_chan); -                        ibv_destroy_comp_channel (trav->send_chan); -                        GF_FREE ((char *)trav->device_name); -                        GF_FREE (trav); +        rdma_ctx->rdma_cm_event_channel = rdma_create_event_channel (); +        if (rdma_ctx->rdma_cm_event_channel == NULL) { +                gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, +                        "rdma_cm event channel creation failed (%s)", +                        strerror (errno)); +                goto out; +        } -                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                "could not create send completion thread"); -                        return NULL; -                } +        ret = pthread_create (&rdma_ctx->rdma_cm_thread, NULL, +                              gf_rdma_cm_event_handler, +                              rdma_ctx->rdma_cm_event_channel); +        if (ret != 0) { +                gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, +                        "creation of thread to handle rdma-cm events " +                        "failed (%s)", strerror (ret)); +                goto out; +        } -                ret = pthread_create (&trav->recv_thread, -                                      NULL, -                                      gf_rdma_recv_completion_proc, -                                      trav->recv_chan); -                if (ret) { -                        gf_rdma_destroy_posts (this); -                        mem_pool_destroy (trav->ioq_pool); -                        mem_pool_destroy (trav->request_ctx_pool); -                        mem_pool_destroy (trav->reply_info_pool); -                        ibv_dealloc_pd (trav->pd); -                        gf_rdma_destroy_cq (this); -                        ibv_destroy_comp_channel (trav->recv_chan); -                        ibv_destroy_comp_channel (trav->send_chan); -                        GF_FREE ((char *)trav->device_name); -                        GF_FREE (trav); -                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                "could not create recv completion thread"); -                        return NULL; +out: +        if (ret < 0) { +                if (rdma_ctx->rdma_cm_event_channel != NULL) { +                        rdma_destroy_event_channel (rdma_ctx->rdma_cm_event_channel);                  } -                /* qpreg */ -                pthread_mutex_init (&trav->qpreg.lock, NULL); -                for (i=0; i<42; i++) { -                        trav->qpreg.ents[i].next = &trav->qpreg.ents[i]; -                        trav->qpreg.ents[i].prev = &trav->qpreg.ents[i]; -                } +                GF_FREE (rdma_ctx); +                rdma_ctx = NULL;          } -        return trav; + +        return rdma_ctx;  }  static int32_t  gf_rdma_init (rpc_transport_t *this)  {          gf_rdma_private_t   *priv    = NULL; -        gf_rdma_options_t   *options = NULL; -        struct ibv_device  **dev_list; -        struct ibv_context  *ib_ctx  = NULL;          int32_t              ret     = 0; +        glusterfs_ctx_t     *ctx     = NULL; +        gf_rdma_options_t   *options = NULL; + +        ctx= this->ctx;          priv = this->private; -        options = &priv->options;          ibv_fork_init ();          gf_rdma_options_init (this); -        { -                dev_list = ibv_get_device_list (NULL); - -                if (!dev_list) { -                        gf_log (GF_RDMA_LOG_NAME, -                                GF_LOG_CRITICAL, -                                "Failed to get IB devices"); -                        ret = -1; -                        goto cleanup; -                } - -                if (!*dev_list) { -                        gf_log (GF_RDMA_LOG_NAME, -                                GF_LOG_CRITICAL, -                                "No IB devices found"); -                        ret = -1; -                        goto cleanup; -                } - -                if (!options->device_name) { -                        if (*dev_list) { -                                options->device_name = -                                        gf_strdup (ibv_get_device_name (*dev_list)); -                        } else { -                                gf_log (GF_RDMA_LOG_NAME, GF_LOG_CRITICAL, -                                        "IB device list is empty. Check for " -                                        "'ib_uverbs' module"); -                                return -1; -                                goto cleanup; -                        } -                } - -                while (*dev_list) { -                        if (!strcmp (ibv_get_device_name (*dev_list), -                                     options->device_name)) { -                                ib_ctx = ibv_open_device (*dev_list); - -                                if (!ib_ctx) { -                                        gf_log (GF_RDMA_LOG_NAME, -                                                GF_LOG_ERROR, -                                                "Failed to get infiniband" -                                                "device context"); -                                        ret = -1; -                                        goto cleanup; -                                } -                                break; -                        } -                        ++dev_list; -                } - -                priv->device = gf_rdma_get_device (this, ib_ctx); - -                if (!priv->device) { -                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                "could not create rdma device for %s", -                                options->device_name); -                        ret = -1; -                        goto cleanup; -                } -        } +        options = &priv->options; +        priv->peer.send_count = options->send_count; +        priv->peer.recv_count = options->recv_count; +        priv->peer.send_size = options->send_size; +        priv->peer.recv_size = options->recv_size;          priv->peer.trans = this;          INIT_LIST_HEAD (&priv->peer.ioq); -        pthread_mutex_init (&priv->read_mutex, NULL);          pthread_mutex_init (&priv->write_mutex, NULL);          pthread_mutex_init (&priv->recv_mutex, NULL);          pthread_cond_init (&priv->recv_cond, NULL); -cleanup: -        if (-1 == ret) { -                if (ib_ctx) -                        ibv_close_device (ib_ctx); +        pthread_mutex_lock (&ctx->lock); +        { +                if (ctx->ib == NULL) { +                        ctx->ib = __gf_rdma_ctx_create (); +                        if (ctx->ib == NULL) { +                                ret = -1; +                        } +                }          } - -        if (dev_list) -                ibv_free_device_list (dev_list); +        pthread_mutex_unlock (&ctx->lock);          return ret;  } @@ -4194,537 +4236,54 @@ gf_rdma_disconnect (rpc_transport_t *this)  static int32_t -__tcp_connect_finish (int fd) -{ -        int       ret    = -1; -        int       optval = 0; -        socklen_t optlen = sizeof (int); - -        ret = getsockopt (fd, SOL_SOCKET, SO_ERROR, -                          (void *)&optval, &optlen); - -        if (ret == 0 && optval) -        { -                errno = optval; -                ret = -1; -        } - -        return ret; -} - -static inline void -gf_rdma_fill_handshake_data (char *buf, struct gf_rdma_nbio *nbio, -                             gf_rdma_private_t *priv) -{ -        sprintf (buf, -                 "QP1:RECV_BLKSIZE=%08x:SEND_BLKSIZE=%08x\n" -                 "QP1:LID=%04x:QPN=%06x:PSN=%06x\n", -                 priv->peer.recv_size, -                 priv->peer.send_size, -                 priv->peer.local_lid, -                 priv->peer.local_qpn, -                 priv->peer.local_psn); - -        nbio->vector.iov_base = buf; -        nbio->vector.iov_len = strlen (buf) + 1; -        nbio->count = 1; -        return; -} - -static inline void -gf_rdma_fill_handshake_ack (char *buf, struct gf_rdma_nbio *nbio) -{ -        sprintf (buf, "DONE\n"); -        nbio->vector.iov_base = buf; -        nbio->vector.iov_len = strlen (buf) + 1; -        nbio->count = 1; -        return; -} - -static int -gf_rdma_handshake_pollin (rpc_transport_t *this) -{ -        int                ret           = 0; -        gf_rdma_private_t *priv          = NULL; -        char              *buf           = NULL; -        int32_t            recv_buf_size = 0, send_buf_size; -        socklen_t          sock_len      = 0; - -        priv = this->private; -	buf = priv->handshake.incoming.buf; - -        if (priv->handshake.incoming.state == GF_RDMA_HANDSHAKE_COMPLETE) { -                return -1; -        } - -        pthread_mutex_lock (&priv->write_mutex); -        { -                while (priv->handshake.incoming.state != GF_RDMA_HANDSHAKE_COMPLETE) -                { -                        switch (priv->handshake.incoming.state) -                        { -                        case GF_RDMA_HANDSHAKE_START: -                                buf = priv->handshake.incoming.buf = GF_CALLOC (1, 256, gf_common_mt_char); -                                gf_rdma_fill_handshake_data (buf, &priv->handshake.incoming, priv); -                                buf[0] = 0; -                                priv->handshake.incoming.state = GF_RDMA_HANDSHAKE_RECEIVING_DATA; -                                break; - -                        case GF_RDMA_HANDSHAKE_RECEIVING_DATA: -                                ret = __tcp_readv (this, -                                                   &priv->handshake.incoming.vector, -                                                   priv->handshake.incoming.count, -                                                   &priv->handshake.incoming.pending_vector, -                                                   &priv->handshake.incoming.pending_count); -                                if (ret == -1) { -                                        goto unlock; -                                } - -                                if (ret > 0) { -                                        gf_log (this->name, GF_LOG_TRACE, -                                                "partial header read on NB socket. continue later"); -                                        goto unlock; -                                } - -                                if (!ret) { -                                        priv->handshake.incoming.state = GF_RDMA_HANDSHAKE_RECEIVED_DATA; -                                } -                                break; - -                        case GF_RDMA_HANDSHAKE_RECEIVED_DATA: -                                ret = sscanf (buf, -                                              "QP1:RECV_BLKSIZE=%08x:SEND_BLKSIZE=%08x\n" -                                              "QP1:LID=%04x:QPN=%06x:PSN=%06x\n", -                                              &recv_buf_size, -                                              &send_buf_size, -                                              &priv->peer.remote_lid, -                                              &priv->peer.remote_qpn, -                                              &priv->peer.remote_psn); - -                                if ((ret != 5) && (strncmp (buf, "QP1:", 4))) { -                                        gf_log (GF_RDMA_LOG_NAME, -                                                GF_LOG_CRITICAL, -                                                "%s: remote-host(%s)'s " -                                                "transport type is different", -                                                this->name, -                                                this->peerinfo.identifier); -                                        ret = -1; -                                        goto unlock; -                                } - -                                if (recv_buf_size < priv->peer.recv_size) -                                        priv->peer.recv_size = recv_buf_size; -                                if (send_buf_size < priv->peer.send_size) -                                        priv->peer.send_size = send_buf_size; - -                                gf_log (GF_RDMA_LOG_NAME, GF_LOG_TRACE, -                                        "%s: transacted recv_size=%d " -                                        "send_size=%d", -                                        this->name, priv->peer.recv_size, -                                        priv->peer.send_size); - -                                priv->peer.quota = priv->peer.send_count; - -                                if (gf_rdma_connect_qp (this)) { -                                        gf_log (GF_RDMA_LOG_NAME, -                                                GF_LOG_ERROR, -                                                "%s: failed to connect with " -                                                "remote QP", this->name); -                                        ret = -1; -                                        goto unlock; -                                } -                                gf_rdma_fill_handshake_ack (buf, &priv->handshake.incoming); -                                buf[0] = 0; -                                priv->handshake.incoming.state = GF_RDMA_HANDSHAKE_RECEIVING_ACK; -                                break; - -                        case GF_RDMA_HANDSHAKE_RECEIVING_ACK: -                                ret = __tcp_readv (this, -                                                   &priv->handshake.incoming.vector, -                                                   priv->handshake.incoming.count, -                                                   &priv->handshake.incoming.pending_vector, -                                                   &priv->handshake.incoming.pending_count); -                                if (ret == -1) { -                                        goto unlock; -                                } - -                                if (ret > 0) { -                                        gf_log (this->name, GF_LOG_TRACE, -                                                "partial header read on NB " -                                                "socket. continue later"); -                                        goto unlock; -                                } - -                                if (!ret) { -                                        priv->handshake.incoming.state = GF_RDMA_HANDSHAKE_RECEIVED_ACK; -                                } -                                break; - -                        case GF_RDMA_HANDSHAKE_RECEIVED_ACK: -                                if (strncmp (buf, "DONE", 4)) { -                                        gf_log (GF_RDMA_LOG_NAME, -                                                GF_LOG_DEBUG, -                                                "%s: handshake-3 did not " -                                                "return 'DONE' (%s)", -                                                this->name, buf); -                                        ret = -1; -                                        goto unlock; -                                } -                                ret = 0; -                                priv->connected = 1; -                                sock_len = sizeof (struct sockaddr_storage); -                                getpeername (priv->sock, -                                             (struct sockaddr *) &this->peerinfo.sockaddr, -                                             &sock_len); - -                                GF_FREE (priv->handshake.incoming.buf); -                                priv->handshake.incoming.buf = NULL; -                                priv->handshake.incoming.state = GF_RDMA_HANDSHAKE_COMPLETE; -                        } -                } -        } -unlock: -        pthread_mutex_unlock (&priv->write_mutex); - -        if (ret == -1) { -                rpc_transport_disconnect (this); -        } else { -                ret = 0; -        } - - -        if (!ret && priv->connected) { -                if (priv->is_server) { -                        ret = rpc_transport_notify (priv->listener, -                                                    RPC_TRANSPORT_ACCEPT, -                                                    this); -                } else { -                        ret = rpc_transport_notify (this, RPC_TRANSPORT_CONNECT, -                                                    this); -                } -        } - -        return ret; -} - -static int -gf_rdma_handshake_pollout (rpc_transport_t *this) -{ -        gf_rdma_private_t *priv = NULL; -        char              *buf  = NULL; -        int32_t            ret  = 0; - -        priv = this->private; -        buf = priv->handshake.outgoing.buf; - -        if (priv->handshake.outgoing.state == GF_RDMA_HANDSHAKE_COMPLETE) { -                return 0; -        } - -        pthread_mutex_unlock (&priv->write_mutex); -        { -                while (priv->handshake.outgoing.state -                       != GF_RDMA_HANDSHAKE_COMPLETE) -                { -                        switch (priv->handshake.outgoing.state) -                        { -                        case GF_RDMA_HANDSHAKE_START: -                                buf = priv->handshake.outgoing.buf -                                        = GF_CALLOC (1, 256, gf_common_mt_char); -                                gf_rdma_fill_handshake_data (buf, -                                                             &priv->handshake.outgoing, priv); -                                priv->handshake.outgoing.state -                                        = GF_RDMA_HANDSHAKE_SENDING_DATA; -                                break; - -                        case GF_RDMA_HANDSHAKE_SENDING_DATA: -                                ret = __tcp_writev (this, -                                                    &priv->handshake.outgoing.vector, -                                                    priv->handshake.outgoing.count, -                                                    &priv->handshake.outgoing.pending_vector, -                                                    &priv->handshake.outgoing.pending_count); -                                if (ret == -1) { -                                        goto unlock; -                                } - -                                if (ret > 0) { -                                        gf_log (this->name, GF_LOG_TRACE, -                                                "partial header read on NB " -                                                "socket. continue later"); -                                        goto unlock; -                                } - -                                if (!ret) { -                                        priv->handshake.outgoing.state -                                                = GF_RDMA_HANDSHAKE_SENT_DATA; -                                } -                                break; - -                        case GF_RDMA_HANDSHAKE_SENT_DATA: -                                gf_rdma_fill_handshake_ack (buf, -                                                            &priv->handshake.outgoing); -                                priv->handshake.outgoing.state -                                        = GF_RDMA_HANDSHAKE_SENDING_ACK; -                                break; - -                        case GF_RDMA_HANDSHAKE_SENDING_ACK: -                                ret = __tcp_writev (this, -                                                    &priv->handshake.outgoing.vector, -                                                    priv->handshake.outgoing.count, -                                                    &priv->handshake.outgoing.pending_vector, -                                                    &priv->handshake.outgoing.pending_count); - -                                if (ret == -1) { -                                        goto unlock; -                                } - -                                if (ret > 0) { -                                        gf_log (this->name, GF_LOG_TRACE, -                                                "partial header read on NB " -                                                "socket. continue later"); -                                        goto unlock; -                                } - -                                if (!ret) { -                                        GF_FREE (priv->handshake.outgoing.buf); -                                        priv->handshake.outgoing.buf = NULL; -                                        priv->handshake.outgoing.state -                                                = GF_RDMA_HANDSHAKE_COMPLETE; -                                } -                                break; -                        } -                } -        } -unlock: -        pthread_mutex_unlock (&priv->write_mutex); - -        if (ret == -1) { -                rpc_transport_disconnect (this); -        } else { -                ret = 0; -        } - -        return ret; -} - -static int -gf_rdma_handshake_pollerr (rpc_transport_t *this) -{ -        gf_rdma_private_t *priv       = this->private; -        char               need_unref = 0, connected = 0; - -        gf_log_callingfn (GF_RDMA_LOG_NAME, GF_LOG_WARNING, -                          "%s: peer (%s) disconnected, cleaning up", -                          this->name, this->peerinfo.identifier); - -        pthread_mutex_lock (&priv->write_mutex); -        { -                __gf_rdma_teardown (this); - -                connected = priv->connected; -                if (priv->sock != -1) { -                        event_unregister (this->ctx->event_pool, -                                          priv->sock, priv->idx); -                        need_unref = 1; - -                        if (close (priv->sock) != 0) { -                                gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                        "close () - error: %s", -                                        strerror (errno)); -                        } -                        priv->tcp_connected = priv->connected = 0; -                        priv->sock = -1; -                } - -                if (priv->handshake.incoming.buf) { -                        GF_FREE (priv->handshake.incoming.buf); -                        priv->handshake.incoming.buf = NULL; -                } - -                priv->handshake.incoming.state = GF_RDMA_HANDSHAKE_START; - -                if (priv->handshake.outgoing.buf) { -                        GF_FREE (priv->handshake.outgoing.buf); -                        priv->handshake.outgoing.buf = NULL; -                } - -                priv->handshake.outgoing.state = GF_RDMA_HANDSHAKE_START; -        } -        pthread_mutex_unlock (&priv->write_mutex); - -        if (connected) { -                rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); -        } - -        if (need_unref) -                rpc_transport_unref (this); - -        return 0; -} - - -static int -tcp_connect_finish (rpc_transport_t *this) -{ -        gf_rdma_private_t *priv  = NULL; -        int                error = 0, ret = 0; - -        priv = this->private; -        pthread_mutex_lock (&priv->write_mutex); -        { -                ret = __tcp_connect_finish (priv->sock); - -                if (!ret) { -                        this->myinfo.sockaddr_len = -                                sizeof (this->myinfo.sockaddr); -                        ret = getsockname (priv->sock, -                                           (struct sockaddr *)&this->myinfo.sockaddr, -                                           &this->myinfo.sockaddr_len); -                        if (ret == -1) -                        { -                                gf_log (this->name, GF_LOG_ERROR, -                                        "getsockname on new client-socket %d " -                                        "failed (%s)", -                                        priv->sock, strerror (errno)); -                                close (priv->sock); -                                error = 1; -                                goto unlock; -                        } - -                        gf_rdma_get_transport_identifiers (this); -                        priv->tcp_connected = 1; -                } - -                if (ret == -1 && errno != EINPROGRESS) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "tcp connect to %s failed (%s)", -                                this->peerinfo.identifier, strerror (errno)); -                        error = 1; -                } -        } -unlock: -        pthread_mutex_unlock (&priv->write_mutex); - -        if (error) { -                rpc_transport_disconnect (this); -        } - -        return ret; -} - -static int -gf_rdma_event_handler (int fd, int idx, void *data, -                       int poll_in, int poll_out, int poll_err) -{ -        rpc_transport_t   *this    = NULL; -        gf_rdma_private_t *priv    = NULL; -        gf_rdma_options_t *options = NULL; -        int                ret     = 0; - -        this = data; -        priv = this->private; -        if (!priv->tcp_connected) { -                ret = tcp_connect_finish (this); -                if (priv->tcp_connected) { -                        options = &priv->options; - -                        priv->peer.send_count = options->send_count; -                        priv->peer.recv_count = options->recv_count; -                        priv->peer.send_size = options->send_size; -                        priv->peer.recv_size = options->recv_size; - -                        if ((ret = gf_rdma_create_qp (this)) < 0) { -                                gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                        "%s: could not create QP", -                                        this->name); -                                rpc_transport_disconnect (this); -                        } -                } -        } - -        if (!ret && poll_out && priv->tcp_connected) { -                ret = gf_rdma_handshake_pollout (this); -        } - -        if (!ret && !poll_err && poll_in && priv->tcp_connected) { -                if (priv->handshake.incoming.state -                    == GF_RDMA_HANDSHAKE_COMPLETE) { -                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                                "%s: pollin received on tcp socket (peer: %s) " -                                "after handshake is complete", -                                this->name, this->peerinfo.identifier); -                        gf_rdma_handshake_pollerr (this); -                        return 0; -                } - -                ret = gf_rdma_handshake_pollin (this); -                if (ret < 0) { -                        gf_log (GF_RDMA_LOG_NAME, GF_LOG_WARNING, -                                "handshake pollin failed"); -                } -        } - -        if (ret < 0 || poll_err) { -                ret = gf_rdma_handshake_pollerr (this); -        } - -        return 0; -} - -static int -__tcp_nonblock (int fd) -{ -        int flags = 0; -        int ret   = -1; - -        flags = fcntl (fd, F_GETFL); - -        if (flags != -1) -                ret = fcntl (fd, F_SETFL, flags | O_NONBLOCK); - -        return ret; -} - -static int32_t  gf_rdma_connect (struct rpc_transport *this, int port)  {          gf_rdma_private_t   *priv         = NULL;          int32_t              ret          = 0; -        gf_boolean_t         non_blocking = 1;          union gf_sock_union  sock_union   = {{0, }, };          socklen_t            sockaddr_len = 0; +        gf_rdma_peer_t      *peer         = NULL; +        gf_rdma_ctx_t       *rdma_ctx     = NULL; +        gf_boolean_t         connected    = _gf_false;          priv = this->private; +        peer = &priv->peer; + +        rpc_transport_ref (this); +          ret = gf_rdma_client_get_remote_sockaddr (this,                                                    &sock_union.sa,                                                    &sockaddr_len, port);          if (ret != 0) {                  gf_log (this->name, GF_LOG_DEBUG,                          "cannot get remote address to connect"); -                return ret; +                goto out;          } +        rdma_ctx = this->ctx->ib;          pthread_mutex_lock (&priv->write_mutex);          { -                if (priv->sock != -1) { -                        ret = 0; +                if (peer->cm_id != NULL) { +                        ret = -1; +                        errno = EINPROGRESS; +                        connected = _gf_true;                          goto unlock;                  } -                priv->sock = socket (sock_union.sa.sa_family, SOCK_STREAM, 0); +                priv->entity = GF_RDMA_CLIENT; -                if (priv->sock == -1) { +                ret = rdma_create_id (rdma_ctx->rdma_cm_event_channel, +                                      &peer->cm_id, this, RDMA_PS_TCP); +                if (ret != 0) {                          gf_log (this->name, GF_LOG_ERROR, -                                "socket () - error: %s", strerror (errno)); +                                "creation of rdma_cm_id failed (%s)", +                                strerror (errno));                          ret = -errno;                          goto unlock;                  } -                gf_log (this->name, GF_LOG_TRACE, -                        "socket fd = %d", priv->sock); -                  memcpy (&this->peerinfo.sockaddr, &sock_union.storage,                          sockaddr_len);                  this->peerinfo.sockaddr_len = sockaddr_len; @@ -4735,201 +4294,84 @@ gf_rdma_connect (struct rpc_transport *this, int port)                  ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family =                          ((struct sockaddr *)&this->peerinfo.sockaddr)->sa_family; -                if (non_blocking) -                { -                        ret = __tcp_nonblock (priv->sock); - -                        if (ret == -1) -                        { -                                gf_log (this->name, GF_LOG_ERROR, -                                        "could not set socket %d to non " -                                        "blocking mode (%s)", -                                        priv->sock, strerror (errno)); -                                close (priv->sock); -                                priv->sock = -1; -                                goto unlock; -                        } -                } -                  ret = gf_rdma_client_bind (this,                                             (struct sockaddr *)&this->myinfo.sockaddr,                                             &this->myinfo.sockaddr_len, -                                           priv->sock); -                if (ret == -1) -                { +                                           peer->cm_id); +                if (ret != 0) {                          gf_log (this->name, GF_LOG_WARNING,                                  "client bind failed: %s", strerror (errno)); -                        close (priv->sock); -                        priv->sock = -1;                          goto unlock;                  } -                ret = connect (priv->sock, -                               (struct sockaddr *)&this->peerinfo.sockaddr, -                               this->peerinfo.sockaddr_len); -                if (ret == -1 && errno != EINPROGRESS) -                { -                        gf_log (this->name, GF_LOG_ERROR, -                                "connection attempt failed (%s)", +                ret = rdma_resolve_addr (peer->cm_id, NULL, &sock_union.sa, +                                         2000); +                if (ret != 0) { +                        gf_log (this->name, GF_LOG_WARNING, +                                "rdma_resolve_addr failed (%s)",                                  strerror (errno)); -                        close (priv->sock); -                        priv->sock = -1;                          goto unlock;                  } -                priv->tcp_connected = priv->connected = 0; - -                rpc_transport_ref (this); - -                priv->handshake.incoming.state = GF_RDMA_HANDSHAKE_START; -                priv->handshake.outgoing.state = GF_RDMA_HANDSHAKE_START; - -                priv->idx = event_register (this->ctx->event_pool, -                                            priv->sock, gf_rdma_event_handler, -                                            this, 1, 1); +                priv->connected = 0;          }  unlock:          pthread_mutex_unlock (&priv->write_mutex); -        return ret; -} - -static int -gf_rdma_server_event_handler (int fd, int idx, void *data, -                              int poll_in, int poll_out, int poll_err) -{ -        int32_t            main_sock    = -1; -        rpc_transport_t   *this         = NULL, *trans = NULL; -        gf_rdma_private_t *priv         = NULL; -        gf_rdma_private_t *trans_priv   = NULL; -        gf_rdma_options_t *options      = NULL; - -        if (!poll_in) { -                return 0; -        } - -        trans = data; -        trans_priv = (gf_rdma_private_t *) trans->private; - -        this = GF_CALLOC (1, sizeof (rpc_transport_t), -                          gf_common_mt_rpc_transport_t); -        if (this == NULL) { -                return -1; -        } - -        this->listener = trans; - -        priv = GF_CALLOC (1, sizeof (gf_rdma_private_t), -                          gf_common_mt_rdma_private_t); -        if (priv == NULL) { -                GF_FREE (priv); -                return -1; -        } -        this->private = priv; -        /* Copy all the rdma related values in priv, from trans_priv -           as other than QP, all the values remain same */ -        priv->device = trans_priv->device; -        priv->options = trans_priv->options; -        priv->is_server = 1; -        priv->listener = trans; - -        options = &priv->options; - -        this->ops = trans->ops; -        this->init = trans->init; -        this->fini = trans->fini; -        this->ctx = trans->ctx; -        this->name = gf_strdup (trans->name); -        this->notify = trans->notify; -        this->mydata = trans->mydata; - -        memcpy (&this->myinfo.sockaddr, &trans->myinfo.sockaddr, -                trans->myinfo.sockaddr_len); -        this->myinfo.sockaddr_len = trans->myinfo.sockaddr_len; - -        main_sock = (trans_priv)->sock; -        this->peerinfo.sockaddr_len = sizeof (this->peerinfo.sockaddr); -        priv->sock = accept (main_sock, -                             (struct sockaddr *)&this->peerinfo.sockaddr, -                             &this->peerinfo.sockaddr_len); -        if (priv->sock == -1) { -                gf_log ("rdma/server", GF_LOG_ERROR, -                        "accept() failed: %s", -                        strerror (errno)); -                GF_FREE (this->private); -                GF_FREE (this); -                return -1; -        } - -        priv->peer.trans = this; -        rpc_transport_ref (this); - -        gf_rdma_get_transport_identifiers (this); - -        priv->tcp_connected = 1; -        priv->handshake.incoming.state = GF_RDMA_HANDSHAKE_START; -        priv->handshake.outgoing.state = GF_RDMA_HANDSHAKE_START; - -        priv->peer.send_count = options->send_count; -        priv->peer.recv_count = options->recv_count; -        priv->peer.send_size = options->send_size; -        priv->peer.recv_size = options->recv_size; -        INIT_LIST_HEAD (&priv->peer.ioq); +out: +        if (ret != 0) { +                if (!connected) { +                        gf_rdma_teardown (this); +                } -        if (gf_rdma_create_qp (this) < 0) { -                gf_log (GF_RDMA_LOG_NAME, GF_LOG_ERROR, -                        "%s: could not create QP", -                        this->name); -                rpc_transport_disconnect (this); -                return -1; +                rpc_transport_unref (this);          } -        priv->idx = event_register (this->ctx->event_pool, priv->sock, -                                    gf_rdma_event_handler, this, 1, 1); - -        pthread_mutex_init (&priv->read_mutex, NULL); -        pthread_mutex_init (&priv->write_mutex, NULL); -        pthread_mutex_init (&priv->recv_mutex, NULL); -        /*  pthread_cond_init (&priv->recv_cond, NULL); */ -        return 0; +        return ret;  } +  static int32_t  gf_rdma_listen (rpc_transport_t *this)  {          union gf_sock_union  sock_union   = {{0, }, };          socklen_t            sockaddr_len = 0;          gf_rdma_private_t   *priv         = NULL; -        int                  opt          = 1, ret = 0; +        gf_rdma_peer_t      *peer         = NULL; +        int                  ret          = 0; +        gf_rdma_ctx_t       *rdma_ctx     = NULL;          char                 service[NI_MAXSERV], host[NI_MAXHOST];          priv = this->private; -        memset (&sock_union, 0, sizeof (sock_union)); -        ret = gf_rdma_server_get_local_sockaddr (this, -                                                 &sock_union.sa, +        peer = &priv->peer; + +        priv->entity = GF_RDMA_SERVER_LISTENER; + +        rdma_ctx = this->ctx->ib; + +        ret = gf_rdma_server_get_local_sockaddr (this, &sock_union.sa,                                                   &sockaddr_len);          if (ret != 0) { -                gf_log (this->name, GF_LOG_DEBUG, +                gf_log (this->name, GF_LOG_WARNING,                          "cannot find network address of server to bind to");                  goto err;          } -        priv->sock = socket (sock_union.sa.sa_family, SOCK_STREAM, 0); -        if (priv->sock == -1) { -                gf_log ("rdma/server", GF_LOG_CRITICAL, -                        "init: failed to create socket, error: %s", +        ret = rdma_create_id (rdma_ctx->rdma_cm_event_channel, +                              &peer->cm_id, this, RDMA_PS_TCP); +        if (ret != 0) { +                gf_log (this->name, GF_LOG_WARNING, +                        "creation of rdma_cm_id failed (%s)",                          strerror (errno)); -                GF_FREE (this->private); -                ret = -1;                  goto err;          } -        memcpy (&this->myinfo.sockaddr, &sock_union.storage, sockaddr_len); +        memcpy (&this->myinfo.sockaddr, &sock_union.storage, +                sockaddr_len);          this->myinfo.sockaddr_len = sockaddr_len;          ret = getnameinfo ((struct sockaddr *)&this->myinfo.sockaddr, -                           this->myinfo.sockaddr_len, -                           host, sizeof (host), +                           this->myinfo.sockaddr_len, host, sizeof (host),                             service, sizeof (service),                             NI_NUMERICHOST);          if (ret != 0) { @@ -4937,34 +4379,38 @@ gf_rdma_listen (rpc_transport_t *this)                          "getnameinfo failed (%s)", gai_strerror (ret));                  goto err;          } +          sprintf (this->myinfo.identifier, "%s:%s", host, service); -        setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt)); -        if (bind (priv->sock, &sock_union.sa, sockaddr_len) != 0) { -                ret = -1; -                gf_log ("rdma/server", GF_LOG_ERROR, -                        "init: failed to bind to socket for %s (%s)", -                        this->myinfo.identifier, strerror (errno)); +        ret = rdma_bind_addr (peer->cm_id, &sock_union.sa); +        if (ret != 0) { +                gf_log (this->name, GF_LOG_WARNING, +                        "rdma_bind_addr failed (%s)", strerror (errno));                  goto err;          } -        if (listen (priv->sock, 10) != 0) { -                gf_log ("rdma/server", GF_LOG_ERROR, -                        "init: listen () failed on socket for %s (%s)", -                        this->myinfo.identifier, strerror (errno)); -                ret = -1; +        ret = rdma_listen (peer->cm_id, 10); +        if (ret != 0) { +                gf_log (this->name, GF_LOG_WARNING, +                        "rdma_listen failed (%s)", strerror (errno));                  goto err;          } -        /* Register the main socket */ -        priv->idx = event_register (this->ctx->event_pool, priv->sock, -                                    gf_rdma_server_event_handler, -                                    rpc_transport_ref (this), 1, 0); +        rpc_transport_ref (this); +        ret = 0;  err: +        if (ret < 0) { +                if (peer->cm_id != NULL) { +                        rdma_destroy_id (peer->cm_id); +                        peer->cm_id = NULL; +                } +        } +          return ret;  } +  struct rpc_transport_ops tops = {          .submit_request = gf_rdma_submit_request,          .submit_reply   = gf_rdma_submit_reply, @@ -4983,7 +4429,6 @@ init (rpc_transport_t *this)                  return -1;          this->private = priv; -        priv->sock = -1;          if (gf_rdma_init (this)) {                  gf_log (this->name, GF_LOG_ERROR, @@ -5007,13 +4452,6 @@ fini (struct rpc_transport *this)          if (priv) {                  pthread_mutex_destroy (&priv->recv_mutex);                  pthread_mutex_destroy (&priv->write_mutex); -                pthread_mutex_destroy (&priv->read_mutex); - -                /*  pthread_cond_destroy (&priv->recv_cond); */ -                if (priv->sock != -1) { -                        event_unregister (this->ctx->event_pool, -                                          priv->sock, priv->idx); -                }                  gf_log (this->name, GF_LOG_TRACE,                          "called fini on transport: %p", this);  | 
