summaryrefslogtreecommitdiffstats
path: root/transport
diff options
context:
space:
mode:
authorVikas Gorur <vikas@zresearch.com>2009-02-18 17:36:07 +0530
committerVikas Gorur <vikas@zresearch.com>2009-02-18 17:36:07 +0530
commit77adf4cd648dce41f89469dd185deec6b6b53a0b (patch)
tree02e155a5753b398ee572b45793f889b538efab6b /transport
parentf3b2e6580e5663292ee113c741343c8a43ee133f (diff)
Added all files
Diffstat (limited to 'transport')
-rw-r--r--transport/Makefile.am3
-rw-r--r--transport/ib-verbs/Makefile.am1
-rw-r--r--transport/ib-verbs/src/Makefile.am15
-rw-r--r--transport/ib-verbs/src/ib-verbs.c2392
-rw-r--r--transport/ib-verbs/src/ib-verbs.h215
-rw-r--r--transport/ib-verbs/src/name.c682
-rw-r--r--transport/ib-verbs/src/name.h47
-rw-r--r--transport/socket/Makefile.am1
-rw-r--r--transport/socket/src/Makefile.am14
-rw-r--r--transport/socket/src/name.c677
-rw-r--r--transport/socket/src/name.h44
-rw-r--r--transport/socket/src/socket.c1370
-rw-r--r--transport/socket/src/socket.h106
13 files changed, 5567 insertions, 0 deletions
diff --git a/transport/Makefile.am b/transport/Makefile.am
new file mode 100644
index 00000000000..e2f97437c12
--- /dev/null
+++ b/transport/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = socket $(IBVERBS_SUBDIR)
+
+CLEANFILES =
diff --git a/transport/ib-verbs/Makefile.am b/transport/ib-verbs/Makefile.am
new file mode 100644
index 00000000000..f963effea22
--- /dev/null
+++ b/transport/ib-verbs/Makefile.am
@@ -0,0 +1 @@
+SUBDIRS = src \ No newline at end of file
diff --git a/transport/ib-verbs/src/Makefile.am b/transport/ib-verbs/src/Makefile.am
new file mode 100644
index 00000000000..e6240090e92
--- /dev/null
+++ b/transport/ib-verbs/src/Makefile.am
@@ -0,0 +1,15 @@
+noinst_HEADERS = ib-verbs.h name.h
+
+transport_LTLIBRARIES = ib-verbs.la
+transportdir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/transport/
+
+ib_verbs_la_LDFLAGS = -module -avoidversion
+
+ib_verbs_la_SOURCES = ib-verbs.c name.c
+ib_verbs_la_LIBADD = -libverbs $(top_builddir)/libglusterfs/src/libglusterfs.la
+
+AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\
+ -I$(top_srcdir)/libglusterfs/src -I$(top_srcdir)/transport/ib-verbs \
+ -shared -nostartfiles
+
+CLEANFILES = *~
diff --git a/transport/ib-verbs/src/ib-verbs.c b/transport/ib-verbs/src/ib-verbs.c
new file mode 100644
index 00000000000..b9329588eb3
--- /dev/null
+++ b/transport/ib-verbs/src/ib-verbs.c
@@ -0,0 +1,2392 @@
+/*
+ Copyright (c) 2006, 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "dict.h"
+#include "glusterfs.h"
+#include "transport.h"
+#include "protocol.h"
+#include "logging.h"
+#include "xlator.h"
+#include "name.h"
+#include "ib-verbs.h"
+#include <signal.h>
+
+int32_t
+gf_resolve_ip6 (const char *hostname,
+ uint16_t port,
+ int family,
+ void **dnscache,
+ struct addrinfo **addr_info);
+
+static uint16_t
+ib_verbs_get_local_lid (struct ibv_context *context,
+ int32_t port)
+{
+ struct ibv_port_attr attr;
+
+ if (ibv_query_port (context, port, &attr))
+ return 0;
+
+ return attr.lid;
+}
+
+
+static void
+ib_verbs_put_post (ib_verbs_queue_t *queue,
+ ib_verbs_post_t *post)
+{
+ pthread_mutex_lock (&queue->lock);
+ if (post->prev) {
+ queue->active_count--;
+ post->prev->next = post->next;
+ }
+ if (post->next)
+ post->next->prev = post->prev;
+ post->prev = &queue->passive_posts;
+ post->next = post->prev->next;
+ post->prev->next = post;
+ post->next->prev = post;
+ queue->passive_count++;
+ pthread_mutex_unlock (&queue->lock);
+}
+
+
+static ib_verbs_post_t *
+ib_verbs_new_post (ib_verbs_device_t *device, int32_t len)
+{
+ ib_verbs_post_t *post;
+
+ post = (ib_verbs_post_t *) CALLOC (1, sizeof (*post));
+ if (!post)
+ return NULL;
+
+ post->buf_size = len;
+
+ post->buf = valloc (len);
+ if (!post->buf) {
+ free (post);
+ return NULL;
+ }
+
+ post->mr = ibv_reg_mr (device->pd,
+ post->buf,
+ post->buf_size,
+ IBV_ACCESS_LOCAL_WRITE);
+ if (!post->mr) {
+ free (post->buf);
+ free (post);
+ return NULL;
+ }
+
+ return post;
+}
+
+
+static ib_verbs_post_t *
+ib_verbs_get_post (ib_verbs_queue_t *queue)
+{
+ ib_verbs_post_t *post;
+
+ pthread_mutex_lock (&queue->lock);
+ {
+ post = queue->passive_posts.next;
+ if (post == &queue->passive_posts)
+ post = NULL;
+
+ if (post) {
+ if (post->prev)
+ post->prev->next = post->next;
+ if (post->next)
+ post->next->prev = post->prev;
+ post->prev = &queue->active_posts;
+ post->next = post->prev->next;
+ post->prev->next = post;
+ post->next->prev = post;
+ post->reused++;
+ queue->active_count++;
+ }
+ }
+ pthread_mutex_unlock (&queue->lock);
+
+ return post;
+}
+
+void
+ib_verbs_destroy_post (ib_verbs_post_t *post)
+{
+ ibv_dereg_mr (post->mr);
+ free (post->buf);
+ free (post);
+}
+
+
+static int32_t
+__ib_verbs_quota_get (ib_verbs_peer_t *peer)
+{
+ int32_t ret = -1;
+ ib_verbs_private_t *priv = peer->trans->private;
+
+ if (priv->connected && peer->quota > 0) {
+ ret = peer->quota--;
+ }
+
+ return ret;
+}
+
+/*
+ static int32_t
+ ib_verbs_quota_get (ib_verbs_peer_t *peer)
+ {
+ int32_t ret = -1;
+ ib_verbs_private_t *priv = peer->trans->private;
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ ret = __ib_verbs_quota_get (peer);
+ }
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ return ret;
+ }
+*/
+
+static void
+__ib_verbs_ioq_entry_free (ib_verbs_ioq_t *entry)
+{
+ list_del_init (&entry->list);
+ if (entry->refs)
+ dict_unref (entry->refs);
+
+ /* TODO: use mem-pool */
+ free (entry->buf);
+
+ /* TODO: use mem-pool */
+ free (entry);
+}
+
+
+static void
+__ib_verbs_ioq_flush (ib_verbs_peer_t *peer)
+{
+ ib_verbs_ioq_t *entry = NULL, *dummy = NULL;
+
+ list_for_each_entry_safe (entry, dummy, &peer->ioq, list) {
+ __ib_verbs_ioq_entry_free (entry);
+ }
+}
+
+
+static int32_t
+__ib_verbs_disconnect (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ int32_t ret = 0;
+
+ if (priv->connected || priv->tcp_connected) {
+ fcntl (priv->sock, F_SETFL, O_NONBLOCK);
+ if (shutdown (priv->sock, SHUT_RDWR) != 0) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "shutdown () - error: %s",
+ strerror (errno));
+ ret = -errno;
+ priv->tcp_connected = 0;
+ }
+ }
+
+ return ret;
+}
+
+
+static int32_t
+ib_verbs_post_send (struct ibv_qp *qp,
+ ib_verbs_post_t *post,
+ int32_t len)
+{
+ struct ibv_sge list = {
+ .addr = (unsigned long) post->buf,
+ .length = len,
+ .lkey = post->mr->lkey
+ };
+
+ struct ibv_send_wr wr = {
+ .wr_id = (unsigned long) post,
+ .sg_list = &list,
+ .num_sge = 1,
+ .opcode = IBV_WR_SEND,
+ .send_flags = IBV_SEND_SIGNALED,
+ }, *bad_wr;
+
+ if (!qp)
+ return -1;
+
+ return ibv_post_send (qp, &wr, &bad_wr);
+}
+
+
+static int32_t
+__ib_verbs_ioq_churn_entry (ib_verbs_peer_t *peer, ib_verbs_ioq_t *entry)
+{
+ int32_t ret = 0, quota = 0;
+ ib_verbs_private_t *priv = peer->trans->private;
+ ib_verbs_device_t *device = priv->device;
+ ib_verbs_options_t *options = &priv->options;
+ ib_verbs_post_t *post = NULL;
+ int32_t len = 0;
+
+ quota = __ib_verbs_quota_get (peer);
+ if (quota > 0) {
+ post = ib_verbs_get_post (&device->sendq);
+ if (!post)
+ post = ib_verbs_new_post (device,
+ (options->send_size + 2048));
+
+ len = iov_length ((const struct iovec *)&entry->vector,
+ entry->count);
+ if (len >= (options->send_size + 2048)) {
+ gf_log ("transport/ib-verbs", GF_LOG_CRITICAL,
+ "increase value of option 'transport.ib-verbs."
+ "work-request-send-size' (given=> %d) to send "
+ "bigger (%d) messages",
+ (options->send_size + 2048), len);
+ return -1;
+ }
+
+ iov_unload (post->buf,
+ (const struct iovec *)&entry->vector,
+ entry->count);
+
+ ret = ib_verbs_post_send (peer->qp, post, len);
+ if (!ret) {
+ __ib_verbs_ioq_entry_free (entry);
+ ret = len;
+ } else {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "ibv_post_send failed with ret = %d", ret);
+ ib_verbs_put_post (&device->sendq, post);
+ __ib_verbs_disconnect (peer->trans);
+ ret = -1;
+ }
+ }
+
+ return ret;
+}
+
+
+static int32_t
+__ib_verbs_ioq_churn (ib_verbs_peer_t *peer)
+{
+ ib_verbs_ioq_t *entry = NULL;
+ int32_t ret = 0;
+
+ while (!list_empty (&peer->ioq))
+ {
+ /* pick next entry */
+ entry = peer->ioq_next;
+
+ ret = __ib_verbs_ioq_churn_entry (peer, entry);
+
+ if (ret <= 0)
+ break;
+ }
+
+ /*
+ list_for_each_entry_safe (entry, dummy, &peer->ioq, list) {
+ ret = __ib_verbs_ioq_churn_entry (peer, entry);
+ if (ret <= 0) {
+ break;
+ }
+ }
+ */
+
+ return ret;
+}
+
+static int32_t
+__ib_verbs_quota_put (ib_verbs_peer_t *peer)
+{
+ int32_t ret;
+
+ peer->quota++;
+ ret = peer->quota;
+
+ if (!list_empty (&peer->ioq)) {
+ ret = __ib_verbs_ioq_churn (peer);
+ }
+
+ return ret;
+}
+
+
+static int32_t
+ib_verbs_quota_put (ib_verbs_peer_t *peer)
+{
+ int32_t ret;
+ ib_verbs_private_t *priv = peer->trans->private;
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ ret = __ib_verbs_quota_put (peer);
+ }
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ return ret;
+}
+
+
+static int32_t
+ib_verbs_post_recv (struct ibv_srq *srq,
+ ib_verbs_post_t *post)
+{
+ struct ibv_sge list = {
+ .addr = (unsigned long) post->buf,
+ .length = post->buf_size,
+ .lkey = post->mr->lkey
+ };
+
+ struct ibv_recv_wr wr = {
+ .wr_id = (unsigned long) post,
+ .sg_list = &list,
+ .num_sge = 1,
+ }, *bad_wr;
+
+ return ibv_post_srq_recv (srq, &wr, &bad_wr);
+}
+
+
+static int32_t
+ib_verbs_writev (transport_t *this,
+ ib_verbs_ioq_t *entry)
+{
+ int32_t ret = 0, need_append = 1;
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_peer_t *peer = NULL;
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ if (!priv->connected) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "ib-verbs is not connected to post a "
+ "send request");
+ ret = -1;
+ goto unlock;
+ }
+
+ peer = &priv->peer;
+ if (list_empty (&peer->ioq)) {
+ ret = __ib_verbs_ioq_churn_entry (peer, entry);
+ if (ret > 0) {
+ need_append = 0;
+ }
+ }
+
+ if (need_append) {
+ list_add_tail (&entry->list, &peer->ioq);
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->write_mutex);
+ return ret;
+}
+
+
+static ib_verbs_ioq_t *
+ib_verbs_ioq_new (char *buf, int len, struct iovec *vector,
+ int count, dict_t *refs)
+{
+ ib_verbs_ioq_t *entry = NULL;
+
+ /* TODO: use mem-pool */
+ entry = CALLOC (1, sizeof (*entry));
+
+ assert (count <= (MAX_IOVEC-2));
+
+ entry->header.colonO[0] = ':';
+ entry->header.colonO[1] = 'O';
+ entry->header.colonO[2] = '\0';
+ entry->header.version = 42;
+ entry->header.size1 = hton32 (len);
+ entry->header.size2 = hton32 (iov_length (vector, count));
+
+ entry->vector[0].iov_base = &entry->header;
+ entry->vector[0].iov_len = sizeof (entry->header);
+ entry->count++;
+
+ entry->vector[1].iov_base = buf;
+ entry->vector[1].iov_len = len;
+ entry->count++;
+
+ if (vector && count)
+ {
+ memcpy (&entry->vector[2], vector, sizeof (*vector) * count);
+ entry->count += count;
+ }
+
+ if (refs)
+ entry->refs = dict_ref (refs);
+
+ entry->buf = buf;
+
+ INIT_LIST_HEAD (&entry->list);
+
+ return entry;
+}
+
+
+static int32_t
+ib_verbs_submit (transport_t *this, char *buf, int32_t len,
+ struct iovec *vector, int count, dict_t *refs)
+{
+ int32_t ret = 0;
+ ib_verbs_ioq_t *entry = NULL;
+
+ entry = ib_verbs_ioq_new (buf, len, vector, count, refs);
+ ret = ib_verbs_writev (this, entry);
+
+ if (ret > 0) {
+ ret = 0;
+ }
+
+ return ret;
+}
+
+static int
+ib_verbs_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p,
+ char **buf_p, size_t *buflen_p)
+{
+ ib_verbs_private_t *priv = this->private;
+ /* TODO: return error if !priv->connected, check with locks */
+ /* TODO: boundry checks for data_ptr/offset */
+ char *copy_from = NULL;
+ ib_verbs_header_t *header = NULL;
+ uint32_t size1, size2, data_len = 0;
+ char *hdr = NULL, *buf = NULL;
+ int32_t ret = 0;
+
+ pthread_mutex_lock (&priv->recv_mutex);
+ {
+/*
+ while (!priv->data_ptr)
+ pthread_cond_wait (&priv->recv_cond, &priv->recv_mutex);
+*/
+
+ copy_from = priv->data_ptr + priv->data_offset;
+
+ priv->data_ptr = NULL;
+ data_len = priv->data_len;
+ /* pthread_cond_broadcast (&priv->recv_cond); */
+ }
+ pthread_mutex_unlock (&priv->recv_mutex);
+
+ header = (ib_verbs_header_t *)copy_from;
+ if (strcmp (header->colonO, ":O")) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "%s: corrupt header received", this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ size1 = ntoh32 (header->size1);
+ size2 = ntoh32 (header->size2);
+
+ if (data_len != (size1 + size2 + sizeof (*header))) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "%s: sizeof data read from transport is not equal "
+ "to the size specified in the header",
+ this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ copy_from += sizeof (*header);
+
+ if (size1) {
+ hdr = CALLOC (1, size1);
+ memcpy (hdr, copy_from, size1);
+ copy_from += size1;
+ *hdr_p = hdr;
+ }
+ *hdrlen_p = size1;
+
+ if (size2) {
+ buf = CALLOC (1, size2);
+ memcpy (buf, copy_from, size2);
+ *buf_p = buf;
+ }
+ *buflen_p = size2;
+
+err:
+ return ret;
+}
+
+
+static void
+ib_verbs_destroy_cq (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_device_t *device = priv->device;
+
+ if (device->recv_cq)
+ ibv_destroy_cq (device->recv_cq);
+ device->recv_cq = NULL;
+
+ if (device->send_cq)
+ ibv_destroy_cq (device->send_cq);
+ device->send_cq = NULL;
+
+ return;
+}
+
+
+static int32_t
+ib_verbs_create_cq (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_options_t *options = &priv->options;
+ ib_verbs_device_t *device = priv->device;
+ int32_t ret = 0;
+
+ device->recv_cq = ibv_create_cq (priv->device->context,
+ options->recv_count * 2,
+ device,
+ device->recv_chan,
+ 0);
+ if (!device->recv_cq) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "%s: creation of CQ failed",
+ this->xl->name);
+ ret = -1;
+ } else if (ibv_req_notify_cq (device->recv_cq, 0)) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "%s: ibv_req_notify_cq on CQ failed",
+ this->xl->name);
+ ret = -1;
+ }
+
+ do {
+ /* TODO: make send_cq size dynamically adaptive */
+ device->send_cq = ibv_create_cq (priv->device->context,
+ options->send_count * 1024,
+ device,
+ device->send_chan,
+ 0);
+ if (!device->send_cq) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "%s: creation of send_cq failed",
+ this->xl->name);
+ ret = -1;
+ break;
+ }
+
+ if (ibv_req_notify_cq (device->send_cq, 0)) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "%s: ibv_req_notify_cq on send_cq failed",
+ this->xl->name);
+ ret = -1;
+ break;
+ }
+ } while (0);
+
+ if (ret != 0)
+ ib_verbs_destroy_cq (this);
+
+ return ret;
+}
+
+
+static void
+ib_verbs_register_peer (ib_verbs_device_t *device,
+ int32_t qp_num,
+ ib_verbs_peer_t *peer)
+{
+ struct _qpent *ent;
+ ib_verbs_qpreg_t *qpreg = &device->qpreg;
+ int32_t hash = qp_num % 42;
+
+ pthread_mutex_lock (&qpreg->lock);
+ ent = qpreg->ents[hash].next;
+ while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num))
+ ent = ent->next;
+ if (ent->qp_num == qp_num) {
+ pthread_mutex_unlock (&qpreg->lock);
+ return;
+ }
+ ent = (struct _qpent *) CALLOC (1, sizeof (*ent));
+ ERR_ABORT (ent);
+ /* TODO: ref reg->peer */
+ ent->peer = peer;
+ ent->next = &qpreg->ents[hash];
+ ent->prev = ent->next->prev;
+ ent->next->prev = ent;
+ ent->prev->next = ent;
+ ent->qp_num = qp_num;
+ qpreg->count++;
+ pthread_mutex_unlock (&qpreg->lock);
+}
+
+
+static void
+ib_verbs_unregister_peer (ib_verbs_device_t *device,
+ int32_t qp_num)
+{
+ struct _qpent *ent;
+ ib_verbs_qpreg_t *qpreg = &device->qpreg;
+ int32_t hash = qp_num % 42;
+
+ pthread_mutex_lock (&qpreg->lock);
+ ent = qpreg->ents[hash].next;
+ while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num))
+ ent = ent->next;
+ if (ent->qp_num != qp_num) {
+ pthread_mutex_unlock (&qpreg->lock);
+ return;
+ }
+ ent->prev->next = ent->next;
+ ent->next->prev = ent->prev;
+ /* TODO: unref reg->peer */
+ free (ent);
+ qpreg->count--;
+ pthread_mutex_unlock (&qpreg->lock);
+}
+
+
+static ib_verbs_peer_t *
+ib_verbs_lookup_peer (ib_verbs_device_t *device,
+ int32_t qp_num)
+{
+ struct _qpent *ent;
+ ib_verbs_qpreg_t *qpreg = &device->qpreg;
+ ib_verbs_peer_t *peer;
+ int32_t hash = qp_num % 42;
+
+ pthread_mutex_lock (&qpreg->lock);
+ ent = qpreg->ents[hash].next;
+ while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num))
+ ent = ent->next;
+ peer = ent->peer;
+ pthread_mutex_unlock (&qpreg->lock);
+ return peer;
+}
+
+
+static void
+__ib_verbs_destroy_qp (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+
+ if (priv->peer.qp) {
+ ib_verbs_unregister_peer (priv->device, priv->peer.qp->qp_num);
+ ibv_destroy_qp (priv->peer.qp);
+ }
+ priv->peer.qp = NULL;
+
+ return;
+}
+
+
+static int32_t
+ib_verbs_create_qp (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_options_t *options = &priv->options;
+ ib_verbs_device_t *device = priv->device;
+ int32_t ret = 0;
+ ib_verbs_peer_t *peer;
+
+ peer = &priv->peer;
+ struct ibv_qp_init_attr init_attr = {
+ .send_cq = device->send_cq,
+ .recv_cq = device->recv_cq,
+ .srq = device->srq,
+ .cap = {
+ .max_send_wr = peer->send_count,
+ .max_recv_wr = peer->recv_count,
+ .max_send_sge = 1,
+ .max_recv_sge = 1
+ },
+ .qp_type = IBV_QPT_RC
+ };
+
+ struct ibv_qp_attr attr = {
+ .qp_state = IBV_QPS_INIT,
+ .pkey_index = 0,
+ .port_num = options->port,
+ .qp_access_flags = 0
+ };
+
+ peer->qp = ibv_create_qp (device->pd, &init_attr);
+ if (!peer->qp) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_CRITICAL,
+ "%s: could not create QP",
+ this->xl->name);
+ ret = -1;
+ } else if (ibv_modify_qp (peer->qp, &attr,
+ IBV_QP_STATE |
+ IBV_QP_PKEY_INDEX |
+ IBV_QP_PORT |
+ IBV_QP_ACCESS_FLAGS)) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "%s: failed to modify QP to INIT state",
+ this->xl->name);
+ ret = -1;
+ }
+
+ peer->local_lid = ib_verbs_get_local_lid (device->context,
+ options->port);
+ peer->local_qpn = peer->qp->qp_num;
+ peer->local_psn = lrand48 () & 0xffffff;
+
+ ib_verbs_register_peer (device, peer->qp->qp_num, peer);
+
+ if (ret == -1)
+ __ib_verbs_destroy_qp (this);
+
+ return ret;
+}
+
+
+static void
+ib_verbs_destroy_posts (transport_t *this)
+{
+
+}
+
+
+static int32_t
+__ib_verbs_create_posts (transport_t *this,
+ int32_t count,
+ int32_t size,
+ ib_verbs_queue_t *q)
+{
+ int32_t i;
+ int32_t ret = 0;
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_device_t *device = priv->device;
+
+ for (i=0 ; i<count ; i++) {
+ ib_verbs_post_t *post;
+
+ post = ib_verbs_new_post (device, size + 2048);
+ if (!post) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "%s: post creation failed",
+ this->xl->name);
+ ret = -1;
+ break;
+ }
+
+ ib_verbs_put_post (q, post);
+ }
+ return ret;
+}
+
+
+static int32_t
+ib_verbs_create_posts (transport_t *this)
+{
+ int32_t i, ret;
+ ib_verbs_post_t *post = NULL;
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_options_t *options = &priv->options;
+ ib_verbs_device_t *device = priv->device;
+
+ ret = __ib_verbs_create_posts (this, options->send_count,
+ options->send_size,
+ &device->sendq);
+ if (!ret)
+ ret = __ib_verbs_create_posts (this, options->recv_count,
+ options->recv_size,
+ &device->recvq);
+
+ if (!ret) {
+ for (i=0 ; i<options->recv_count ; i++) {
+ post = ib_verbs_get_post (&device->recvq);
+ if (ib_verbs_post_recv (device->srq, post) != 0) {
+ ret = -1;
+ break;
+ }
+ }
+ }
+
+ if (ret)
+ ib_verbs_destroy_posts (this);
+
+ return ret;
+}
+
+
+static int32_t
+ib_verbs_connect_qp (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_options_t *options = &priv->options;
+ struct ibv_qp_attr attr = {
+ .qp_state = IBV_QPS_RTR,
+ .path_mtu = options->mtu,
+ .dest_qp_num = priv->peer.remote_qpn,
+ .rq_psn = priv->peer.remote_psn,
+ .max_dest_rd_atomic = 1,
+ .min_rnr_timer = 12,
+ .ah_attr = {
+ .is_global = 0,
+ .dlid = priv->peer.remote_lid,
+ .sl = 0,
+ .src_path_bits = 0,
+ .port_num = options->port
+ }
+ };
+ if (ibv_modify_qp (priv->peer.qp, &attr,
+ IBV_QP_STATE |
+ IBV_QP_AV |
+ IBV_QP_PATH_MTU |
+ IBV_QP_DEST_QPN |
+ IBV_QP_RQ_PSN |
+ IBV_QP_MAX_DEST_RD_ATOMIC |
+ IBV_QP_MIN_RNR_TIMER)) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_CRITICAL,
+ "Failed to modify QP to RTR\n");
+ return -1;
+ }
+
+ /* TODO: make timeout and retry_cnt configurable from options */
+ attr.qp_state = IBV_QPS_RTS;
+ attr.timeout = 14;
+ attr.retry_cnt = 7;
+ attr.rnr_retry = 7;
+ attr.sq_psn = priv->peer.local_psn;
+ attr.max_rd_atomic = 1;
+ if (ibv_modify_qp (priv->peer.qp, &attr,
+ IBV_QP_STATE |
+ IBV_QP_TIMEOUT |
+ IBV_QP_RETRY_CNT |
+ IBV_QP_RNR_RETRY |
+ IBV_QP_SQ_PSN |
+ IBV_QP_MAX_QP_RD_ATOMIC)) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_CRITICAL,
+ "Failed to modify QP to RTS\n");
+ return -1;
+ }
+
+ return 0;
+}
+
+static int32_t
+__ib_verbs_teardown (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+
+ __ib_verbs_destroy_qp (this);
+
+ if (!list_empty (&priv->peer.ioq)) {
+ __ib_verbs_ioq_flush (&priv->peer);
+ }
+
+ /* TODO: decrement cq size */
+ return 0;
+}
+
+/*
+ * return value:
+ * 0 = success (completed)
+ * -1 = error
+ * > 0 = incomplete
+ */
+
+static int
+__tcp_rwv (transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count,
+ int write)
+{
+ ib_verbs_private_t *priv = NULL;
+ int sock = -1;
+ int ret = -1;
+ struct iovec *opvector = vector;
+ int opcount = count;
+ int moved = 0;
+
+ priv = this->private;
+ sock = priv->sock;
+
+ while (opcount)
+ {
+ if (write)
+ {
+ ret = writev (sock, opvector, opcount);
+
+ if (ret == 0 || (ret == -1 && errno == EAGAIN))
+ {
+ /* done for now */
+ break;
+ }
+ }
+ else
+ {
+ ret = readv (sock, opvector, opcount);
+
+ if (ret == -1 && errno == EAGAIN)
+ {
+ /* done for now */
+ break;
+ }
+ }
+
+ if (ret == 0)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR, "EOF from peer");
+ opcount = -1;
+ errno = ENOTCONN;
+ break;
+ }
+
+ if (ret == -1)
+ {
+ if (errno == EINTR)
+ continue;
+
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "%s failed (%s)", write ? "writev" : "readv",
+ strerror (errno));
+ if (write && !priv->connected &&
+ (errno == ECONNREFUSED))
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "possible mismatch of 'transport-type'"
+ " in protocol server and client. "
+ "check volume file");
+ opcount = -1;
+ break;
+ }
+
+ moved = 0;
+
+ while (moved < ret)
+ {
+ if ((ret - moved) >= opvector[0].iov_len)
+ {
+ moved += opvector[0].iov_len;
+ opvector++;
+ opcount--;
+ }
+ else
+ {
+ opvector[0].iov_len -= (ret - moved);
+ opvector[0].iov_base += (ret - moved);
+ moved += (ret - moved);
+ }
+ while (opcount && !opvector[0].iov_len)
+ {
+ opvector++;
+ opcount--;
+ }
+ }
+ }
+
+ if (pending_vector)
+ *pending_vector = opvector;
+
+ if (pending_count)
+ *pending_count = opcount;
+
+ return opcount;
+}
+
+
+static int
+__tcp_readv (transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count)
+{
+ int ret = -1;
+
+ ret = __tcp_rwv (this, vector, count,
+ pending_vector, pending_count, 0);
+
+ return ret;
+}
+
+
+static int
+__tcp_writev (transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count)
+{
+ int ret = -1;
+ ib_verbs_private_t *priv = this->private;
+
+ ret = __tcp_rwv (this, vector, count, pending_vector,
+ pending_count, 1);
+
+ if (ret > 0) {
+ /* TODO: Avoid multiple calls when socket is already
+ registered for POLLOUT */
+ priv->idx = event_select_on (this->xl->ctx->event_pool,
+ priv->sock, priv->idx, -1, 1);
+ } else if (ret == 0) {
+ priv->idx = event_select_on (this->xl->ctx->event_pool,
+ priv->sock,
+ priv->idx, -1, 0);
+ }
+
+ return ret;
+}
+
+
+static void *
+ib_verbs_recv_completion_proc (void *data)
+{
+ struct ibv_comp_channel *chan = data;
+ ib_verbs_private_t *priv = NULL;
+ ib_verbs_device_t *device;
+ ib_verbs_post_t *post;
+ ib_verbs_peer_t *peer;
+ struct ibv_cq *event_cq;
+ struct ibv_wc wc;
+ void *event_ctx;
+ int32_t ret = 0;
+
+
+ while (1) {
+ ret = ibv_get_cq_event (chan, &event_cq, &event_ctx);
+ if (ret) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "ibv_get_cq_event failed, terminating recv "
+ "thread %d (%d)", ret, errno);
+ continue;
+ }
+
+ device = event_ctx;
+
+ ret = ibv_req_notify_cq (event_cq, 0);
+ if (ret) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "ibv_req_notify_cq on %s failed, terminating "
+ "recv thread: %d (%d)",
+ device->device_name, ret, errno);
+ continue;
+ }
+
+ device = (ib_verbs_device_t *) event_ctx;
+
+ while ((ret = ibv_poll_cq (event_cq, 1, &wc)) > 0) {
+ post = (ib_verbs_post_t *) (long) wc.wr_id;
+ peer = ib_verbs_lookup_peer (device, wc.qp_num);
+
+ if (wc.status != IBV_WC_SUCCESS) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "recv work request on `%s' returned "
+ "error (%d)",
+ device->device_name,
+ wc.status);
+ if (peer)
+ transport_disconnect (peer->trans);
+
+ if (post) {
+ ib_verbs_post_recv (device->srq, post);
+ }
+ continue;
+ }
+
+ if (peer) {
+ priv = peer->trans->private;
+
+ pthread_mutex_lock (&priv->recv_mutex);
+ {
+/* while (priv->data_ptr)
+ pthread_cond_wait (&priv->recv_cond, &priv->recv_mutex);
+*/
+
+ priv->data_ptr = post->buf;
+ priv->data_offset = 0;
+ priv->data_len = wc.byte_len;
+
+ /*pthread_cond_broadcast (&priv->recv_cond);*/
+ }
+ pthread_mutex_unlock (&priv->recv_mutex);
+
+ if ((ret = peer->trans->xl->notify (peer->trans->xl, GF_EVENT_POLLIN,
+ peer->trans, NULL)) == -1) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "pollin notification to %s "
+ "failed, disconnecting "
+ "transport",
+ peer->trans->xl->name);
+ transport_disconnect (peer->trans);
+ }
+ } else {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_DEBUG,
+ "could not lookup peer for qp_num: %d",
+ wc.qp_num);
+ }
+ ib_verbs_post_recv (device->srq, post);
+ }
+
+ if (ret < 0) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "ibv_poll_cq on `%s' returned error "
+ "(ret = %d, errno = %d)",
+ device->device_name, ret, errno);
+ continue;
+ }
+ ibv_ack_cq_events (event_cq, 1);
+ }
+ return NULL;
+}
+
+
+static void *
+ib_verbs_send_completion_proc (void *data)
+{
+ struct ibv_comp_channel *chan = data;
+ ib_verbs_post_t *post;
+ ib_verbs_peer_t *peer;
+ struct ibv_cq *event_cq;
+ void *event_ctx;
+ ib_verbs_device_t *device;
+ struct ibv_wc wc;
+ int32_t ret;
+
+ while (1) {
+ ret = ibv_get_cq_event (chan, &event_cq, &event_ctx);
+ if (ret) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "ibv_get_cq_event on failed, terminating "
+ "send thread: %d (%d)", ret, errno);
+ continue;
+ }
+
+ device = event_ctx;
+
+ ret = ibv_req_notify_cq (event_cq, 0);
+ if (ret) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "ibv_req_notify_cq on %s failed, terminating "
+ "send thread: %d (%d)",
+ device->device_name, ret, errno);
+ continue;
+ }
+
+ while ((ret = ibv_poll_cq (event_cq, 1, &wc)) > 0) {
+ post = (ib_verbs_post_t *) (long) wc.wr_id;
+ peer = ib_verbs_lookup_peer (device, wc.qp_num);
+
+ if (wc.status != IBV_WC_SUCCESS) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "send work request on `%s' returned "
+ "error wc.status = %d, wc.vendor_err "
+ "= %d, post->buf = %p, wc.byte_len = "
+ "%d, post->reused = %d",
+ device->device_name, wc.status,
+ wc.vendor_err,
+ post->buf, wc.byte_len, post->reused);
+ if (peer)
+ transport_disconnect (peer->trans);
+ }
+
+ if (post) {
+ ib_verbs_put_post (&device->sendq, post);
+ }
+
+ if (peer) {
+ int quota_ret = ib_verbs_quota_put (peer);
+ if (quota_ret < 0) {
+ gf_log ("ib-verbs", GF_LOG_WARNING,
+ "failed to send message");
+
+ }
+ } else {
+ gf_log ("transport/ib-verbs", GF_LOG_DEBUG,
+ "could not lookup peer for qp_num: %d",
+ wc.qp_num);
+ }
+ }
+
+ if (ret < 0) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "ibv_poll_cq on `%s' returned error (ret = %d,"
+ " errno = %d)",
+ device->device_name, ret, errno);
+ continue;
+ }
+ ibv_ack_cq_events (event_cq, 1);
+ }
+
+ return NULL;
+}
+
+static void
+ib_verbs_options_init (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_options_t *options = &priv->options;
+ int32_t mtu;
+ data_t *temp;
+
+ /* TODO: validate arguments from options below */
+
+ options->send_size = 1048576;
+ options->recv_size = 1048576;
+ options->send_count = 16;
+ options->recv_count = 16;
+
+ temp = dict_get (this->xl->options,
+ "transport.ib-verbs.work-request-send-count");
+ if (temp)
+ options->send_count = data_to_int32 (temp);
+
+ temp = dict_get (this->xl->options,
+ "transport.ib-verbs.work-request-recv-count");
+ if (temp)
+ options->recv_count = data_to_int32 (temp);
+
+ temp = dict_get (this->xl->options,
+ "transport.ib-verbs.work-request-send-size");
+ if (temp)
+ options->send_size = data_to_int32 (temp);
+
+ temp = dict_get (this->xl->options,
+ "transport.ib-verbs.work-request-recv-size");
+ if (temp)
+ options->recv_size = data_to_int32 (temp);
+
+ options->port = 1;
+ temp = dict_get (this->xl->options,
+ "transport.ib-verbs.port");
+ if (temp)
+ options->port = data_to_uint64 (temp);
+
+ options->mtu = mtu = IBV_MTU_2048;
+ temp = dict_get (this->xl->options,
+ "transport.ib-verbs.mtu");
+ if (temp)
+ mtu = data_to_int32 (temp);
+ switch (mtu) {
+ case 256: options->mtu = IBV_MTU_256;
+ break;
+ case 512: options->mtu = IBV_MTU_512;
+ break;
+ case 1024: options->mtu = IBV_MTU_1024;
+ break;
+ case 2048: options->mtu = IBV_MTU_2048;
+ break;
+ case 4096: options->mtu = IBV_MTU_4096;
+ break;
+ default:
+ if (temp)
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "%s: unrecognized MTU value '%s', defaulting "
+ "to '2048'", this->xl->name,
+ data_to_str (temp));
+ else
+ gf_log ("transport/ib-verbs", GF_LOG_DEBUG,
+ "%s: defaulting MTU to '2048'",
+ this->xl->name);
+ options->mtu = IBV_MTU_2048;
+ break;
+ }
+
+ temp = dict_get (this->xl->options,
+ "transport.ib-verbs.device-name");
+ if (temp)
+ options->device_name = strdup (temp->data);
+
+ return;
+}
+
+static void
+ib_verbs_queue_init (ib_verbs_queue_t *queue)
+{
+ pthread_mutex_init (&queue->lock, NULL);
+
+ queue->active_posts.next = &queue->active_posts;
+ queue->active_posts.prev = &queue->active_posts;
+ queue->passive_posts.next = &queue->passive_posts;
+ queue->passive_posts.prev = &queue->passive_posts;
+}
+
+
+static ib_verbs_device_t *
+ib_verbs_get_device (transport_t *this,
+ struct ibv_device *ib_dev,
+ int32_t port)
+{
+ glusterfs_ctx_t *ctx = this->xl->ctx;
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_options_t *options = &priv->options;
+ char *device_name = priv->options.device_name;
+ int32_t ret = 0, i = 0;
+
+ ib_verbs_device_t *trav;
+
+ trav = ctx->ib;
+ while (trav) {
+ if ((!strcmp (trav->device_name, device_name)) &&
+ (trav->port == port))
+ break;
+ trav = trav->next;
+ }
+
+ if (!trav) {
+ struct ibv_context *ibctx = ibv_open_device (ib_dev);
+
+ if (!ibctx) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "cannot open device `%s'",
+ device_name);
+ return NULL;
+ }
+
+ trav = CALLOC (1, sizeof (*trav));
+ ERR_ABORT (trav);
+ priv->device = trav;
+
+ trav->context = ibctx;
+ trav->device_name = strdup (device_name);
+ trav->port = port;
+
+ trav->next = ctx->ib;
+ ctx->ib = trav;
+
+ trav->send_chan = ibv_create_comp_channel (trav->context);
+ if (!trav->send_chan) {
+ gf_log ("transport/ib-verbs", GF_LOG_CRITICAL,
+ "%s: could not create send completion channel",
+ device_name);
+ /* TODO: cleanup current mess */
+ return NULL;
+ }
+
+ trav->recv_chan = ibv_create_comp_channel (trav->context);
+ if (!trav->recv_chan) {
+ gf_log ("transport/ib-verbs", GF_LOG_CRITICAL,
+ "could not create recv completion channel");
+ /* TODO: cleanup current mess */
+ return NULL;
+ }
+
+ if (ib_verbs_create_cq (this) < 0) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "%s: could not create CQ",
+ this->xl->name);
+ return NULL;
+ }
+
+ /* protection domain */
+ trav->pd = ibv_alloc_pd (trav->context);
+
+ if (!trav->pd) {
+ gf_log ("transport/ib-verbs", GF_LOG_CRITICAL,
+ "%s: could not allocate protection domain",
+ this->xl->name);
+ return NULL;
+ }
+
+ struct ibv_srq_init_attr attr = {
+ .attr = {
+ .max_wr = options->recv_count,
+ .max_sge = 1
+ }
+ };
+ trav->srq = ibv_create_srq (trav->pd, &attr);
+
+ if (!trav->srq) {
+ gf_log ("transport/ib-verbs", GF_LOG_CRITICAL,
+ "%s: could not create SRQ",
+ this->xl->name);
+ return NULL;
+ }
+
+ /* queue init */
+ ib_verbs_queue_init (&trav->sendq);
+ ib_verbs_queue_init (&trav->recvq);
+
+ if (ib_verbs_create_posts (this) < 0) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "%s: could not allocate posts",
+ this->xl->name);
+ return NULL;
+ }
+
+ /* completion threads */
+ ret = pthread_create (&trav->send_thread,
+ NULL,
+ ib_verbs_send_completion_proc,
+ trav->send_chan);
+ if (ret) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "could not create send completion thread");
+ return NULL;
+ }
+ ret = pthread_create (&trav->recv_thread,
+ NULL,
+ ib_verbs_recv_completion_proc,
+ trav->recv_chan);
+ if (ret) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "could not create recv completion thread");
+ return NULL;
+ }
+
+ /* qpreg */
+ pthread_mutex_init (&trav->qpreg.lock, NULL);
+ for (i=0; i<42; i++) {
+ trav->qpreg.ents[i].next = &trav->qpreg.ents[i];
+ trav->qpreg.ents[i].prev = &trav->qpreg.ents[i];
+ }
+ }
+ return trav;
+}
+
+static int32_t
+ib_verbs_init (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_options_t *options = &priv->options;
+ struct ibv_device **dev_list;
+ struct ibv_device *ib_dev = NULL;
+ int32_t i;
+
+ ib_verbs_options_init (this);
+
+ {
+ dev_list = ibv_get_device_list (NULL);
+
+ if (!dev_list) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_CRITICAL,
+ "No IB devices found");
+ return -1;
+ }
+
+ if (!options->device_name) {
+ if (*dev_list) {
+ options->device_name =
+ strdup (ibv_get_device_name (*dev_list));
+ } else {
+ gf_log ("transport/ib-verbs", GF_LOG_CRITICAL,
+ "IB device list is empty. Check for "
+ "'ib_uverbs' module");
+ return -1;
+ }
+ }
+
+ for (i = 0; dev_list[i]; i++) {
+ if (!strcmp (ibv_get_device_name (dev_list[i]),
+ options->device_name)) {
+ ib_dev = dev_list[i];
+ break;
+ }
+ }
+
+ if (!ib_dev) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "could not open device `%s' (does not exist)",
+ options->device_name);
+ ibv_free_device_list (dev_list);
+ return -1;
+ }
+
+ priv->device = ib_verbs_get_device (this, ib_dev,
+ options->port);
+
+ if (!priv->device) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "could not create ib_verbs device for %s",
+ options->device_name);
+ ibv_free_device_list (dev_list);
+ return -1;
+ }
+ ibv_free_device_list (dev_list);
+ }
+
+ priv->peer.trans = this;
+ INIT_LIST_HEAD (&priv->peer.ioq);
+
+ pthread_mutex_init (&priv->read_mutex, NULL);
+ pthread_mutex_init (&priv->write_mutex, NULL);
+ pthread_mutex_init (&priv->recv_mutex, NULL);
+ /* pthread_cond_init (&priv->recv_cond, NULL); */
+
+ return 0;
+}
+
+
+static int32_t
+ib_verbs_disconnect (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ int32_t ret = 0;
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ ret = __ib_verbs_disconnect (this);
+ }
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ return ret;
+}
+
+
+static int32_t
+__tcp_connect_finish (int fd)
+{
+ int ret = -1;
+ int optval = 0;
+ socklen_t optlen = sizeof (int);
+
+ ret = getsockopt (fd, SOL_SOCKET, SO_ERROR,
+ (void *)&optval, &optlen);
+
+ if (ret == 0 && optval)
+ {
+ errno = optval;
+ ret = -1;
+ }
+
+ return ret;
+}
+
+static inline void
+ib_verbs_fill_handshake_data (char *buf, struct ib_verbs_nbio *nbio,
+ ib_verbs_private_t *priv)
+{
+ sprintf (buf,
+ "QP1:RECV_BLKSIZE=%08x:SEND_BLKSIZE=%08x\n"
+ "QP1:LID=%04x:QPN=%06x:PSN=%06x\n",
+ priv->peer.recv_size,
+ priv->peer.send_size,
+ priv->peer.local_lid,
+ priv->peer.local_qpn,
+ priv->peer.local_psn);
+
+ nbio->vector.iov_base = buf;
+ nbio->vector.iov_len = strlen (buf) + 1;
+ nbio->count = 1;
+ return;
+}
+
+static inline void
+ib_verbs_fill_handshake_ack (char *buf, struct ib_verbs_nbio *nbio)
+{
+ sprintf (buf, "DONE\n");
+ nbio->vector.iov_base = buf;
+ nbio->vector.iov_len = strlen (buf) + 1;
+ nbio->count = 1;
+ return;
+}
+
+static int
+ib_verbs_handshake_pollin (transport_t *this)
+{
+ int ret = 0;
+ ib_verbs_private_t *priv = this->private;
+ char *buf = priv->handshake.incoming.buf;
+ int32_t recv_buf_size, send_buf_size;
+ socklen_t sock_len;
+
+ if (priv->handshake.incoming.state == IB_VERBS_HANDSHAKE_COMPLETE) {
+ return -1;
+ }
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ while (priv->handshake.incoming.state != IB_VERBS_HANDSHAKE_COMPLETE)
+ {
+ switch (priv->handshake.incoming.state)
+ {
+ case IB_VERBS_HANDSHAKE_START:
+ buf = priv->handshake.incoming.buf = CALLOC (1, 256);
+ ib_verbs_fill_handshake_data (buf, &priv->handshake.incoming, priv);
+ buf[0] = 0;
+ priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_RECEIVING_DATA;
+ break;
+
+ case IB_VERBS_HANDSHAKE_RECEIVING_DATA:
+ ret = __tcp_readv (this,
+ &priv->handshake.incoming.vector,
+ priv->handshake.incoming.count,
+ &priv->handshake.incoming.pending_vector,
+ &priv->handshake.incoming.pending_count);
+ if (ret == -1) {
+ goto unlock;
+ }
+
+ if (ret > 0) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "partial header read on NB socket. continue later");
+ goto unlock;
+ }
+
+ if (!ret) {
+ priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_RECEIVED_DATA;
+ }
+ break;
+
+ case IB_VERBS_HANDSHAKE_RECEIVED_DATA:
+ ret = sscanf (buf,
+ "QP1:RECV_BLKSIZE=%08x:SEND_BLKSIZE=%08x\n"
+ "QP1:LID=%04x:QPN=%06x:PSN=%06x\n",
+ &recv_buf_size,
+ &send_buf_size,
+ &priv->peer.remote_lid,
+ &priv->peer.remote_qpn,
+ &priv->peer.remote_psn);
+
+ if ((ret != 5) && (strncmp (buf, "QP1:", 4))) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_CRITICAL,
+ "%s: remote-host(%s)'s "
+ "transport type is different",
+ this->xl->name,
+ this->peerinfo.identifier);
+ ret = -1;
+ goto unlock;
+ }
+
+ if (recv_buf_size < priv->peer.recv_size)
+ priv->peer.recv_size = recv_buf_size;
+ if (send_buf_size < priv->peer.send_size)
+ priv->peer.send_size = send_buf_size;
+
+ gf_log ("transport/ib-verbs", GF_LOG_DEBUG,
+ "%s: transacted recv_size=%d "
+ "send_size=%d",
+ this->xl->name, priv->peer.recv_size,
+ priv->peer.send_size);
+
+ priv->peer.quota = priv->peer.send_count;
+
+ if (ib_verbs_connect_qp (this)) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "%s: failed to connect with "
+ "remote QP", this->xl->name);
+ ret = -1;
+ goto unlock;
+ }
+ ib_verbs_fill_handshake_ack (buf, &priv->handshake.incoming);
+ buf[0] = 0;
+ priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_RECEIVING_ACK;
+ break;
+
+ case IB_VERBS_HANDSHAKE_RECEIVING_ACK:
+ ret = __tcp_readv (this,
+ &priv->handshake.incoming.vector,
+ priv->handshake.incoming.count,
+ &priv->handshake.incoming.pending_vector,
+ &priv->handshake.incoming.pending_count);
+ if (ret == -1) {
+ goto unlock;
+ }
+
+ if (ret > 0) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "partial header read on NB "
+ "socket. continue later");
+ goto unlock;
+ }
+
+ if (!ret) {
+ priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_RECEIVED_ACK;
+ }
+ break;
+
+ case IB_VERBS_HANDSHAKE_RECEIVED_ACK:
+ if (strncmp (buf, "DONE", 4)) {
+ gf_log ("transport/ib-verbs",
+ GF_LOG_ERROR,
+ "%s: handshake-3 did not "
+ "return 'DONE' (%s)",
+ this->xl->name, buf);
+ ret = -1;
+ goto unlock;
+ }
+ ret = 0;
+ priv->connected = 1;
+ sock_len = sizeof (struct sockaddr_storage);
+ getpeername (priv->sock,
+ (struct sockaddr *) &this->peerinfo.sockaddr,
+ &sock_len);
+
+ FREE (priv->handshake.incoming.buf);
+ priv->handshake.incoming.buf = NULL;
+ priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_COMPLETE;
+ }
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ if (ret == -1) {
+ transport_disconnect (this);
+ } else {
+ ret = 0;
+ }
+
+ if (!ret && priv->connected) {
+ ret = this->xl->notify (this->xl, GF_EVENT_CHILD_UP, this);
+ }
+
+ return ret;
+}
+
+static int
+ib_verbs_handshake_pollout (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ char *buf = priv->handshake.outgoing.buf;
+ int32_t ret = 0;
+
+ if (priv->handshake.outgoing.state == IB_VERBS_HANDSHAKE_COMPLETE) {
+ return 0;
+ }
+
+ pthread_mutex_unlock (&priv->write_mutex);
+ {
+ while (priv->handshake.outgoing.state != IB_VERBS_HANDSHAKE_COMPLETE)
+ {
+ switch (priv->handshake.outgoing.state)
+ {
+ case IB_VERBS_HANDSHAKE_START:
+ buf = priv->handshake.outgoing.buf = CALLOC (1, 256);
+ ib_verbs_fill_handshake_data (buf, &priv->handshake.outgoing, priv);
+ priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_SENDING_DATA;
+ break;
+
+ case IB_VERBS_HANDSHAKE_SENDING_DATA:
+ ret = __tcp_writev (this,
+ &priv->handshake.outgoing.vector,
+ priv->handshake.outgoing.count,
+ &priv->handshake.outgoing.pending_vector,
+ &priv->handshake.outgoing.pending_count);
+ if (ret == -1) {
+ goto unlock;
+ }
+
+ if (ret > 0) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "partial header read on NB socket. continue later");
+ goto unlock;
+ }
+
+ if (!ret) {
+ priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_SENT_DATA;
+ }
+ break;
+
+ case IB_VERBS_HANDSHAKE_SENT_DATA:
+ ib_verbs_fill_handshake_ack (buf, &priv->handshake.outgoing);
+ priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_SENDING_ACK;
+ break;
+
+ case IB_VERBS_HANDSHAKE_SENDING_ACK:
+ ret = __tcp_writev (this,
+ &priv->handshake.outgoing.vector,
+ priv->handshake.outgoing.count,
+ &priv->handshake.outgoing.pending_vector,
+ &priv->handshake.outgoing.pending_count);
+
+ if (ret == -1) {
+ goto unlock;
+ }
+
+ if (ret > 0) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "partial header read on NB "
+ "socket. continue later");
+ goto unlock;
+ }
+
+ if (!ret) {
+ FREE (priv->handshake.outgoing.buf);
+ priv->handshake.outgoing.buf = NULL;
+ priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_COMPLETE;
+ }
+ break;
+ }
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ if (ret == -1) {
+ transport_disconnect (this);
+ } else {
+ ret = 0;
+ }
+
+ return ret;
+}
+
+static int
+ib_verbs_handshake_pollerr (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ int32_t ret = 0;
+ char need_unref = 0;
+
+ gf_log ("transport/ib-verbs", GF_LOG_DEBUG,
+ "%s: peer disconnected, cleaning up",
+ this->xl->name);
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ __ib_verbs_teardown (this);
+
+ if (priv->sock != -1) {
+ event_unregister (this->xl->ctx->event_pool,
+ priv->sock, priv->idx);
+ need_unref = 1;
+
+ if (close (priv->sock) != 0) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "close () - error: %s",
+ strerror (errno));
+ ret = -errno;
+ }
+ priv->tcp_connected = priv->connected = 0;
+ priv->sock = -1;
+ }
+
+ if (priv->handshake.incoming.buf) {
+ FREE (priv->handshake.incoming.buf);
+ priv->handshake.incoming.buf = NULL;
+ }
+
+ priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_START;
+
+ if (priv->handshake.outgoing.buf) {
+ FREE (priv->handshake.outgoing.buf);
+ priv->handshake.outgoing.buf = NULL;
+ }
+
+ priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_START;
+ }
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ this->xl->notify (this->xl, GF_EVENT_POLLERR, this, NULL);
+
+ if (need_unref)
+ transport_unref (this);
+
+ return 0;
+}
+
+
+static int
+tcp_connect_finish (transport_t *this)
+{
+ ib_verbs_private_t *priv = this->private;
+ int error = 0, ret = 0;
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ ret = __tcp_connect_finish (priv->sock);
+
+ if (!ret) {
+ this->myinfo.sockaddr_len =
+ sizeof (this->myinfo.sockaddr);
+ ret = getsockname (priv->sock,
+ (struct sockaddr *)&this->myinfo.sockaddr,
+ &this->myinfo.sockaddr_len);
+ if (ret == -1)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "getsockname on new client-socket %d "
+ "failed (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ error = 1;
+ goto unlock;
+ }
+
+ get_transport_identifiers (this);
+ priv->tcp_connected = 1;
+ }
+
+ if (ret == -1 && errno != EINPROGRESS) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "tcp connect to %s failed (%s)",
+ this->peerinfo.identifier, strerror (errno));
+ error = 1;
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ if (error) {
+ transport_disconnect (this);
+ }
+
+ return ret;
+}
+
+static int
+ib_verbs_event_handler (int fd, int idx, void *data,
+ int poll_in, int poll_out, int poll_err)
+{
+ transport_t *this = data;
+ ib_verbs_private_t *priv = this->private;
+ ib_verbs_options_t *options = NULL;
+ int ret = 0;
+
+ if (!priv->tcp_connected) {
+ ret = tcp_connect_finish (this);
+ if (priv->tcp_connected) {
+ options = &priv->options;
+
+ priv->peer.send_count = options->send_count;
+ priv->peer.recv_count = options->recv_count;
+ priv->peer.send_size = options->send_size;
+ priv->peer.recv_size = options->recv_size;
+
+ if ((ret = ib_verbs_create_qp (this)) < 0) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "%s: could not create QP",
+ this->xl->name);
+ transport_disconnect (this);
+ }
+ }
+ }
+
+ if (!ret && poll_out && priv->tcp_connected) {
+ ret = ib_verbs_handshake_pollout (this);
+ }
+
+ if (!ret && poll_in && priv->tcp_connected) {
+ if (priv->handshake.incoming.state == IB_VERBS_HANDSHAKE_COMPLETE) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "%s: pollin received on tcp socket (peer: %s) "
+ "after handshake is complete",
+ this->xl->name, this->peerinfo.identifier);
+ ib_verbs_handshake_pollerr (this);
+ return 0;
+ }
+ ret = ib_verbs_handshake_pollin (this);
+ }
+
+ if (poll_err) {
+ ret = ib_verbs_handshake_pollerr (this);
+ }
+
+ return 0;
+}
+
+static int
+__tcp_nonblock (int fd)
+{
+ int flags = 0;
+ int ret = -1;
+
+ flags = fcntl (fd, F_GETFL);
+
+ if (flags != -1)
+ ret = fcntl (fd, F_SETFL, flags | O_NONBLOCK);
+
+ return ret;
+}
+
+static int32_t
+ib_verbs_connect (struct transport *this)
+{
+ dict_t *options = this->xl->options;
+
+ ib_verbs_private_t *priv = this->private;
+
+ int32_t ret = 0;
+ gf_boolean_t non_blocking = 1;
+ struct sockaddr_storage sockaddr;
+ socklen_t sockaddr_len = 0;
+
+ if (priv->connected) {
+ return 0;
+ }
+
+ if (dict_get (options, "non-blocking-io")) {
+ char *nb_connect = data_to_str (dict_get (this->xl->options,
+ "non-blocking-io"));
+
+ if (gf_string2boolean (nb_connect, &non_blocking) == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "'non-blocking-io' takes only boolean "
+ "options, not taking any action");
+ non_blocking = 1;
+ }
+ }
+
+ ret = ibverbs_client_get_remote_sockaddr (this, (struct sockaddr *)&sockaddr,
+ &sockaddr_len);
+ if (ret != 0) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "cannot get remote address to connect");
+ return ret;
+ }
+
+ pthread_mutex_lock (&priv->write_mutex);
+ {
+ if (priv->sock != -1) {
+ ret = 0;
+ goto unlock;
+ }
+
+ priv->sock = socket (((struct sockaddr *)&sockaddr)->sa_family,
+ SOCK_STREAM, 0);
+
+ if (priv->sock == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket () - error: %s", strerror (errno));
+ ret = -errno;
+ goto unlock;
+ }
+
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "socket fd = %d", priv->sock);
+
+ memcpy (&this->peerinfo.sockaddr, &sockaddr, sockaddr_len);
+ this->peerinfo.sockaddr_len = sockaddr_len;
+
+ ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family =
+ ((struct sockaddr *)&this->peerinfo.sockaddr)->sa_family;
+
+ if (non_blocking)
+ {
+ ret = __tcp_nonblock (priv->sock);
+
+ if (ret == -1)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "could not set socket %d to non "
+ "blocking mode (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+
+ ret = client_bind (this,
+ (struct sockaddr *)&this->myinfo.sockaddr,
+ &this->myinfo.sockaddr_len, priv->sock);
+ if (ret == -1)
+ {
+ gf_log (this->xl->name, GF_LOG_WARNING,
+ "client bind failed: %s", strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ ret = connect (priv->sock,
+ (struct sockaddr *)&this->peerinfo.sockaddr,
+ this->peerinfo.sockaddr_len);
+ if (ret == -1 && errno != EINPROGRESS)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "connection attempt failed (%s)",
+ strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ priv->tcp_connected = priv->connected = 0;
+
+ transport_ref (this);
+
+ priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_START;
+ priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_START;
+
+ priv->idx = event_register (this->xl->ctx->event_pool,
+ priv->sock, ib_verbs_event_handler,
+ this, 1, 1);
+ }
+unlock:
+ pthread_mutex_unlock (&priv->write_mutex);
+
+ return ret;
+}
+
+static int
+ib_verbs_server_event_handler (int fd, int idx, void *data,
+ int poll_in, int poll_out, int poll_err)
+{
+ int32_t main_sock = -1;
+ transport_t *this, *trans = data;
+ ib_verbs_private_t *priv = NULL;
+ ib_verbs_private_t *trans_priv = (ib_verbs_private_t *) trans->private;
+ ib_verbs_options_t *options = NULL;
+
+ if (!poll_in)
+ return 0;
+
+ this = CALLOC (1, sizeof (transport_t));
+ ERR_ABORT (this);
+ priv = CALLOC (1, sizeof (ib_verbs_private_t));
+ ERR_ABORT (priv);
+ this->private = priv;
+ /* Copy all the ib_verbs related values in priv, from trans_priv
+ as other than QP, all the values remain same */
+ priv->device = trans_priv->device;
+ priv->options = trans_priv->options;
+ options = &priv->options;
+
+ this->ops = trans->ops;
+ this->xl = trans->xl;
+
+ memcpy (&this->myinfo.sockaddr, &trans->myinfo.sockaddr,
+ trans->myinfo.sockaddr_len);
+ this->myinfo.sockaddr_len = trans->myinfo.sockaddr_len;
+
+ main_sock = (trans_priv)->sock;
+ this->peerinfo.sockaddr_len = sizeof (this->peerinfo.sockaddr);
+ priv->sock = accept (main_sock,
+ (struct sockaddr *)&this->peerinfo.sockaddr,
+ &this->peerinfo.sockaddr_len);
+ if (priv->sock == -1) {
+ gf_log ("ib-verbs/server", GF_LOG_ERROR,
+ "accept() failed: %s",
+ strerror (errno));
+ free (this->private);
+ free (this);
+ return -1;
+ }
+
+ priv->peer.trans = this;
+ transport_ref (this);
+
+ get_transport_identifiers (this);
+
+ priv->tcp_connected = 1;
+ priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_START;
+ priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_START;
+
+ priv->peer.send_count = options->send_count;
+ priv->peer.recv_count = options->recv_count;
+ priv->peer.send_size = options->send_size;
+ priv->peer.recv_size = options->recv_size;
+ INIT_LIST_HEAD (&priv->peer.ioq);
+
+ if (ib_verbs_create_qp (this) < 0) {
+ gf_log ("transport/ib-verbs", GF_LOG_ERROR,
+ "%s: could not create QP",
+ this->xl->name);
+ transport_disconnect (this);
+ return -1;
+ }
+
+ priv->idx = event_register (this->xl->ctx->event_pool, priv->sock,
+ ib_verbs_event_handler, this, 1, 1);
+
+ pthread_mutex_init (&priv->read_mutex, NULL);
+ pthread_mutex_init (&priv->write_mutex, NULL);
+ pthread_mutex_init (&priv->recv_mutex, NULL);
+ /* pthread_cond_init (&priv->recv_cond, NULL); */
+
+ return 0;
+}
+
+static int32_t
+ib_verbs_listen (transport_t *this)
+{
+ struct sockaddr_storage sockaddr;
+ socklen_t sockaddr_len;
+ ib_verbs_private_t *priv = this->private;
+ int opt = 1, ret = 0;
+ char service[NI_MAXSERV], host[NI_MAXHOST];
+
+ memset (&sockaddr, 0, sizeof (sockaddr));
+ ret = ibverbs_server_get_local_sockaddr (this,
+ (struct sockaddr *)&sockaddr,
+ &sockaddr_len);
+ if (ret != 0) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "cannot find network address of server to bind to");
+ goto err;
+ }
+
+ priv->sock = socket (((struct sockaddr *)&sockaddr)->sa_family,
+ SOCK_STREAM, 0);
+ if (priv->sock == -1) {
+ gf_log ("ib-verbs/server", GF_LOG_CRITICAL,
+ "init: failed to create socket, error: %s",
+ strerror (errno));
+ free (this->private);
+ ret = -1;
+ goto err;
+ }
+
+ memcpy (&this->myinfo.sockaddr, &sockaddr, sockaddr_len);
+ this->myinfo.sockaddr_len = sockaddr_len;
+
+ ret = getnameinfo ((struct sockaddr *)&this->myinfo.sockaddr,
+ this->myinfo.sockaddr_len,
+ host, sizeof (host),
+ service, sizeof (service),
+ NI_NUMERICHOST);
+ if (ret != 0) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "getnameinfo failed (%s)", gai_strerror (ret));
+ goto err;
+ }
+ sprintf (this->myinfo.identifier, "%s:%s", host, service);
+
+ setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt));
+ if (bind (priv->sock,
+ (struct sockaddr *)&sockaddr,
+ sockaddr_len) != 0) {
+ ret = -1;
+ gf_log ("ib-verbs/server", GF_LOG_CRITICAL,
+ "init: failed to bind to socket for %s (%s)",
+ this->myinfo.identifier, strerror (errno));
+ goto err;
+ }
+
+ if (listen (priv->sock, 10) != 0) {
+ gf_log ("ib-verbs/server", GF_LOG_CRITICAL,
+ "init: listen () failed on socket for %s (%s)",
+ this->myinfo.identifier, strerror (errno));
+ ret = -1;
+ goto err;
+ }
+
+ /* Register the main socket */
+ priv->idx = event_register (this->xl->ctx->event_pool, priv->sock,
+ ib_verbs_server_event_handler,
+ transport_ref (this), 1, 0);
+
+err:
+ return ret;
+}
+
+struct transport_ops tops = {
+ .receive = ib_verbs_receive,
+ .submit = ib_verbs_submit,
+ .connect = ib_verbs_connect,
+ .disconnect = ib_verbs_disconnect,
+ .listen = ib_verbs_listen,
+};
+
+int32_t
+init (transport_t *this)
+{
+ ib_verbs_private_t *priv = CALLOC (1, sizeof (*priv));
+ this->private = priv;
+ priv->sock = -1;
+
+ if (ib_verbs_init (this)) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "Failed to initialize IB Device");
+ return -1;
+ }
+
+ return 0;
+}
+
+void
+fini (struct transport *this)
+{
+ /* TODO: verify this function does graceful finish */
+ ib_verbs_private_t *priv = this->private;
+ this->private = NULL;
+
+ pthread_mutex_destroy (&priv->recv_mutex);
+ pthread_mutex_destroy (&priv->write_mutex);
+ pthread_mutex_destroy (&priv->read_mutex);
+ /* pthread_cond_destroy (&priv->recv_cond); */
+
+ gf_log (this->xl->name, GF_LOG_CRITICAL,
+ "called fini on transport: %p",
+ this);
+ free (priv);
+ return;
+}
+
+/* TODO: expand each option */
+struct volume_options options[] = {
+ { .key = {"transport.ib-verbs.port",
+ "ib-verbs-port"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = 1,
+ .max = 4,
+ .description = "check the option by 'ibv_devinfo'"
+ },
+ { .key = {"transport.ib-verbs.mtu",
+ "ib-verbs-mtu"},
+ .type = GF_OPTION_TYPE_INT,
+ },
+ { .key = {"transport.ib-verbs.device-name",
+ "ib-verbs-device-name"},
+ .type = GF_OPTION_TYPE_ANY,
+ .description = "check by 'ibv_devinfo'"
+ },
+ { .key = {"transport.ib-verbs.work-request-send-size",
+ "ib-verbs-work-request-send-size"},
+ .type = GF_OPTION_TYPE_INT,
+ },
+ { .key = {"transport.ib-verbs.work-request-recv-size",
+ "ib-verbs-work-request-recv-size"},
+ .type = GF_OPTION_TYPE_INT,
+ },
+ { .key = {"transport.ib-verbs.work-request-send-count",
+ "ib-verbs-work-request-send-count"},
+ .type = GF_OPTION_TYPE_INT,
+ },
+ { .key = {"transport.ib-verbs.work-request-recv-count",
+ "ib-verbs-work-request-recv-count"},
+ .type = GF_OPTION_TYPE_INT,
+ },
+ { .key = {"remote-port",
+ "transport.remote-port",
+ "transport.ib-verbs.remote-port"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.ib-verbs.listen-port", "listen-port"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.ib-verbs.connect-path", "connect-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.ib-verbs.bind-path", "bind-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.ib-verbs.listen-path", "listen-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.address-family",
+ "address-family"},
+ .value = {"inet", "inet6", "inet/inet6", "inet6/inet",
+ "unix", "inet-sdp" },
+ .type = GF_OPTION_TYPE_STR
+ },
+ { .key = {NULL} }
+};
diff --git a/transport/ib-verbs/src/ib-verbs.h b/transport/ib-verbs/src/ib-verbs.h
new file mode 100644
index 00000000000..56b717865bf
--- /dev/null
+++ b/transport/ib-verbs/src/ib-verbs.h
@@ -0,0 +1,215 @@
+/*
+ Copyright (c) 2006, 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _XPORT_IB_VERBS_H
+#define _XPORT_IB_VERBS_H
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#ifndef MAX_IOVEC
+#define MAX_IOVEC 16
+#endif /* MAX_IOVEC */
+
+#include "xlator.h"
+#include "event.h"
+
+#include <stdio.h>
+#include <list.h>
+#include <arpa/inet.h>
+#include <infiniband/verbs.h>
+
+#define GF_DEFAULT_IBVERBS_LISTEN_PORT 6997
+
+/* options per transport end point */
+struct _ib_verbs_options {
+ int32_t port;
+ char *device_name;
+ enum ibv_mtu mtu;
+ int32_t send_count, send_size, recv_count, recv_size;
+};
+typedef struct _ib_verbs_options ib_verbs_options_t;
+
+
+struct _ib_verbs_header {
+ char colonO[3];
+ uint32_t size1;
+ uint32_t size2;
+ char version;
+} __attribute__((packed));
+typedef struct _ib_verbs_header ib_verbs_header_t;
+
+struct _ib_verbs_ioq {
+ union {
+ struct list_head list;
+ struct {
+ struct _ib_verbs_ioq *next;
+ struct _ib_verbs_ioq *prev;
+ };
+ };
+ ib_verbs_header_t header;
+ struct iovec vector[MAX_IOVEC];
+ int count;
+ char *buf;
+ dict_t *refs;
+};
+typedef struct _ib_verbs_ioq ib_verbs_ioq_t;
+
+/* represents one communication peer, two per transport_t */
+struct _ib_verbs_peer {
+ transport_t *trans;
+ struct ibv_qp *qp;
+
+ int32_t recv_count;
+ int32_t send_count;
+ int32_t recv_size;
+ int32_t send_size;
+
+ int32_t quota;
+ union {
+ struct list_head ioq;
+ struct {
+ ib_verbs_ioq_t *ioq_next;
+ ib_verbs_ioq_t *ioq_prev;
+ };
+ };
+
+ /* QP attributes, needed to connect with remote QP */
+ int32_t local_lid;
+ int32_t local_psn;
+ int32_t local_qpn;
+ int32_t remote_lid;
+ int32_t remote_psn;
+ int32_t remote_qpn;
+};
+typedef struct _ib_verbs_peer ib_verbs_peer_t;
+
+
+struct _ib_verbs_post {
+ struct _ib_verbs_post *next, *prev;
+ struct ibv_mr *mr;
+ char *buf;
+ int32_t buf_size;
+ char aux;
+ int32_t reused;
+ pthread_barrier_t wait;
+};
+typedef struct _ib_verbs_post ib_verbs_post_t;
+
+
+struct _ib_verbs_queue {
+ ib_verbs_post_t active_posts, passive_posts;
+ int32_t active_count, passive_count;
+ pthread_mutex_t lock;
+};
+typedef struct _ib_verbs_queue ib_verbs_queue_t;
+
+
+struct _ib_verbs_qpreg {
+ pthread_mutex_t lock;
+ int32_t count;
+ struct _qpent {
+ struct _qpent *next, *prev;
+ int32_t qp_num;
+ ib_verbs_peer_t *peer;
+ } ents[42];
+};
+typedef struct _ib_verbs_qpreg ib_verbs_qpreg_t;
+
+/* context per device, stored in global glusterfs_ctx_t->ib */
+struct _ib_verbs_device {
+ struct _ib_verbs_device *next;
+ const char *device_name;
+ struct ibv_context *context;
+ int32_t port;
+ struct ibv_pd *pd;
+ struct ibv_srq *srq;
+ ib_verbs_qpreg_t qpreg;
+ struct ibv_comp_channel *send_chan, *recv_chan;
+ struct ibv_cq *send_cq, *recv_cq;
+ ib_verbs_queue_t sendq, recvq;
+ pthread_t send_thread, recv_thread;
+};
+typedef struct _ib_verbs_device ib_verbs_device_t;
+
+typedef enum {
+ IB_VERBS_HANDSHAKE_START = 0,
+ IB_VERBS_HANDSHAKE_SENDING_DATA,
+ IB_VERBS_HANDSHAKE_RECEIVING_DATA,
+ IB_VERBS_HANDSHAKE_SENT_DATA,
+ IB_VERBS_HANDSHAKE_RECEIVED_DATA,
+ IB_VERBS_HANDSHAKE_SENDING_ACK,
+ IB_VERBS_HANDSHAKE_RECEIVING_ACK,
+ IB_VERBS_HANDSHAKE_RECEIVED_ACK,
+ IB_VERBS_HANDSHAKE_COMPLETE,
+} ib_verbs_handshake_state_t;
+
+struct ib_verbs_nbio {
+ int state;
+ char *buf;
+ int count;
+ struct iovec vector;
+ struct iovec *pending_vector;
+ int pending_count;
+};
+
+
+struct _ib_verbs_private {
+ int32_t sock;
+ int32_t idx;
+ unsigned char connected;
+ unsigned char tcp_connected;
+ unsigned char ib_connected;
+ in_addr_t addr;
+ unsigned short port;
+
+ /* IB Verbs Driver specific variables, pointers */
+ ib_verbs_peer_t peer;
+ ib_verbs_device_t *device;
+ ib_verbs_options_t options;
+
+ /* Used by trans->op->receive */
+ char *data_ptr;
+ int32_t data_offset;
+ int32_t data_len;
+
+ /* Mutex */
+ pthread_mutex_t read_mutex;
+ pthread_mutex_t write_mutex;
+ pthread_barrier_t handshake_barrier;
+ char handshake_ret;
+
+ pthread_mutex_t recv_mutex;
+
+ /* used during ib_verbs_handshake */
+ struct {
+ struct ib_verbs_nbio incoming;
+ struct ib_verbs_nbio outgoing;
+ int state;
+ ib_verbs_header_t header;
+ char *buf;
+ size_t size;
+ } handshake;
+};
+typedef struct _ib_verbs_private ib_verbs_private_t;
+
+#endif /* _XPORT_IB_VERBS_H */
diff --git a/transport/ib-verbs/src/name.c b/transport/ib-verbs/src/name.c
new file mode 100644
index 00000000000..697344987b4
--- /dev/null
+++ b/transport/ib-verbs/src/name.c
@@ -0,0 +1,682 @@
+/*
+ Copyright (c) 2008 Z RESEARCH, Inc. <http://www.zresearch.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <errno.h>
+#include <netdb.h>
+#include <string.h>
+
+#ifdef CLIENT_PORT_CEILING
+#undef CLIENT_PORT_CEILING
+#endif
+
+#define CLIENT_PORT_CEILING 1024
+
+#ifndef AF_INET_SDP
+#define AF_INET_SDP 27
+#endif
+
+#include "transport.h"
+#include "ib-verbs.h"
+
+int32_t
+gf_resolve_ip6 (const char *hostname,
+ uint16_t port,
+ int family,
+ void **dnscache,
+ struct addrinfo **addr_info);
+
+static int32_t
+af_inet_bind_to_port_lt_ceiling (int fd, struct sockaddr *sockaddr,
+ socklen_t sockaddr_len, int ceiling)
+{
+ int32_t ret = -1;
+ /* struct sockaddr_in sin = {0, }; */
+ uint16_t port = ceiling - 1;
+
+ while (port)
+ {
+ switch (sockaddr->sa_family)
+ {
+ case AF_INET6:
+ ((struct sockaddr_in6 *)sockaddr)->sin6_port = htons (port);
+ break;
+
+ case AF_INET_SDP:
+ case AF_INET:
+ ((struct sockaddr_in *)sockaddr)->sin_port = htons (port);
+ break;
+ }
+
+ ret = bind (fd, sockaddr, sockaddr_len);
+
+ if (ret == 0)
+ break;
+
+ if (ret == -1 && errno == EACCES)
+ break;
+
+ port--;
+ }
+
+ return ret;
+}
+
+static int32_t
+af_unix_client_bind (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t sockaddr_len,
+ int sock)
+{
+ data_t *path_data = NULL;
+ struct sockaddr_un *addr = NULL;
+ int32_t ret = -1;
+
+ path_data = dict_get (this->xl->options,
+ "transport.ib-verbs.bind-path");
+ if (path_data) {
+ char *path = data_to_str (path_data);
+ if (!path || strlen (path) > UNIX_PATH_MAX) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "transport.ib-verbs.bind-path not specfied "
+ "for unix socket, letting connect to assign "
+ "default value");
+ goto err;
+ }
+
+ addr = (struct sockaddr_un *) sockaddr;
+ strcpy (addr->sun_path, path);
+ ret = bind (sock, (struct sockaddr *)addr, sockaddr_len);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "cannot bind to unix-domain socket %d (%s)",
+ sock, strerror (errno));
+ goto err;
+ }
+ }
+
+err:
+ return ret;
+}
+
+static int32_t
+client_fill_address_family (transport_t *this, struct sockaddr *sockaddr)
+{
+ data_t *address_family_data = NULL;
+
+ address_family_data = dict_get (this->xl->options,
+ "transport.address-family");
+ if (!address_family_data) {
+ data_t *remote_host_data = NULL, *connect_path_data = NULL;
+ remote_host_data = dict_get (this->xl->options, "remote-host");
+ connect_path_data = dict_get (this->xl->options,
+ "transport.ib-verbs.connect-path");
+
+ if (!(remote_host_data || connect_path_data) ||
+ (remote_host_data && connect_path_data)) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "address-family not specified and not able to "
+ "determine the same from other options "
+ "(remote-host:%s and connect-path:%s)",
+ data_to_str (remote_host_data),
+ data_to_str (connect_path_data));
+ return -1;
+ }
+
+ if (remote_host_data) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "address-family not specified, guessing it "
+ "to be inet/inet6");
+ sockaddr->sa_family = AF_UNSPEC;
+ } else {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "address-family not specified, guessing it "
+ "to be unix");
+ sockaddr->sa_family = AF_UNIX;
+ }
+
+ } else {
+ char *address_family = data_to_str (address_family_data);
+ if (!strcasecmp (address_family, "unix")) {
+ sockaddr->sa_family = AF_UNIX;
+ } else if (!strcasecmp (address_family, "inet")) {
+ sockaddr->sa_family = AF_INET;
+ } else if (!strcasecmp (address_family, "inet6")) {
+ sockaddr->sa_family = AF_INET6;
+ } else if (!strcasecmp (address_family, "inet-sdp")) {
+ sockaddr->sa_family = AF_INET_SDP;
+ } else if (!strcasecmp (address_family, "inet/inet6")
+ || !strcasecmp (address_family, "inet6/inet")) {
+ sockaddr->sa_family = AF_UNSPEC;
+ } else {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address-family (%s) specified",
+ address_family);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static int32_t
+af_inet_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len)
+{
+ dict_t *options = this->xl->options;
+ data_t *remote_host_data = NULL;
+ data_t *remote_port_data = NULL;
+ char *remote_host = NULL;
+ uint16_t remote_port = 0;
+ struct addrinfo *addr_info = NULL;
+ int32_t ret = 0;
+
+ remote_host_data = dict_get (options, "remote-host");
+ if (remote_host_data == NULL)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option remote-host missing in volume %s",
+ this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ remote_host = data_to_str (remote_host_data);
+ if (remote_host == NULL)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option remote-host has data NULL in volume %s",
+ this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ remote_port_data = dict_get (options, "remote-port");
+ if (remote_port_data == NULL)
+ {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "option remote-port missing in volume %s. "
+ "Defaulting to %d",
+ this->xl->name, GF_DEFAULT_IBVERBS_LISTEN_PORT);
+
+ remote_port = GF_DEFAULT_IBVERBS_LISTEN_PORT;
+ }
+ else
+ {
+ remote_port = data_to_uint16 (remote_port_data);
+ }
+
+ if (remote_port == (uint16_t)-1)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option remote-port has invalid port in volume %s",
+ this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ /* TODO: gf_resolve is a blocking call. kick in some
+ non blocking dns techniques */
+ ret = gf_resolve_ip6 (remote_host, remote_port,
+ sockaddr->sa_family,
+ &this->dnscache, &addr_info);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "DNS resolution failed on host %s", remote_host);
+ goto err;
+ }
+
+ memcpy (sockaddr, addr_info->ai_addr, addr_info->ai_addrlen);
+ *sockaddr_len = addr_info->ai_addrlen;
+
+err:
+ return ret;
+}
+
+static int32_t
+af_unix_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len)
+{
+ struct sockaddr_un *sockaddr_un = NULL;
+ char *connect_path = NULL;
+ data_t *connect_path_data = NULL;
+ int32_t ret = 0;
+
+ connect_path_data = dict_get (this->xl->options,
+ "transport.ib-verbs.connect-path");
+ if (!connect_path_data) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option transport.ib-verbs.connect-path not "
+ "specified for address-family unix");
+ ret = -1;
+ goto err;
+ }
+
+ connect_path = data_to_str (connect_path_data);
+ if (!connect_path) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "connect-path is null-string");
+ ret = -1;
+ goto err;
+ }
+
+ if (strlen (connect_path) > UNIX_PATH_MAX) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "connect-path value length %"GF_PRI_SIZET" > "
+ "%d octets", strlen (connect_path), UNIX_PATH_MAX);
+ ret = -1;
+ goto err;
+ }
+
+ gf_log (this->xl->name,
+ GF_LOG_DEBUG,
+ "using connect-path %s", connect_path);
+ sockaddr_un = (struct sockaddr_un *)sockaddr;
+ strcpy (sockaddr_un->sun_path, connect_path);
+ *sockaddr_len = sizeof (struct sockaddr_un);
+
+err:
+ return ret;
+}
+
+static int32_t
+af_unix_server_get_local_sockaddr (transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len)
+{
+ data_t *listen_path_data = NULL;
+ char *listen_path = NULL;
+ int32_t ret = 0;
+ struct sockaddr_un *sunaddr = (struct sockaddr_un *)addr;
+
+
+ listen_path_data = dict_get (this->xl->options,
+ "transport.ib-verbs.listen-path");
+ if (!listen_path_data) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "missing option listen-path");
+ ret = -1;
+ goto err;
+ }
+
+ listen_path = data_to_str (listen_path_data);
+
+#ifndef UNIX_PATH_MAX
+#define UNIX_PATH_MAX 108
+#endif
+
+ if (strlen (listen_path) > UNIX_PATH_MAX) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option listen-path has value length %"GF_PRI_SIZET" > %d",
+ strlen (listen_path), UNIX_PATH_MAX);
+ ret = -1;
+ goto err;
+ }
+
+ sunaddr->sun_family = AF_UNIX;
+ strcpy (sunaddr->sun_path, listen_path);
+ *addr_len = sizeof (struct sockaddr_un);
+
+err:
+ return ret;
+}
+
+static int32_t
+af_inet_server_get_local_sockaddr (transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len)
+{
+ struct addrinfo hints, *res = 0;
+ data_t *listen_port_data = NULL, *listen_host_data = NULL;
+ uint16_t listen_port = -1;
+ char service[NI_MAXSERV], *listen_host = NULL;
+ dict_t *options = NULL;
+ int32_t ret = 0;
+
+ options = this->xl->options;
+
+ listen_port_data = dict_get (options, "transport.ib-verbs.listen-port");
+ listen_host_data = dict_get (options, "transport.ib-verbs.bind-address");
+
+ if (listen_port_data)
+ {
+ listen_port = data_to_uint16 (listen_port_data);
+ }
+
+ if (listen_port == (uint16_t) -1)
+ listen_port = GF_DEFAULT_IBVERBS_LISTEN_PORT;
+
+
+ if (listen_host_data)
+ {
+ listen_host = data_to_str (listen_host_data);
+ }
+
+ memset (service, 0, sizeof (service));
+ sprintf (service, "%d", listen_port);
+
+ memset (&hints, 0, sizeof (hints));
+ hints.ai_family = addr->sa_family;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_ADDRCONFIG | AI_PASSIVE;
+
+ ret = getaddrinfo(listen_host, service, &hints, &res);
+ if (ret != 0) {
+ gf_log (this->xl->name,
+ GF_LOG_ERROR,
+ "getaddrinfo failed for host %s, service %s (%s)",
+ listen_host, service, gai_strerror (ret));
+ ret = -1;
+ goto err;
+ }
+
+ memcpy (addr, res->ai_addr, res->ai_addrlen);
+ *addr_len = res->ai_addrlen;
+
+ freeaddrinfo (res);
+
+err:
+ return ret;
+}
+
+int32_t
+client_bind (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len,
+ int sock)
+{
+ int ret = 0;
+
+ *sockaddr_len = sizeof (struct sockaddr_in6);
+ switch (sockaddr->sa_family)
+ {
+ case AF_INET_SDP:
+ case AF_INET:
+ *sockaddr_len = sizeof (struct sockaddr_in);
+
+ case AF_INET6:
+ ret = af_inet_bind_to_port_lt_ceiling (sock, sockaddr,
+ *sockaddr_len,
+ CLIENT_PORT_CEILING);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "cannot bind inet socket (%d) to port "
+ "less than %d (%s)",
+ sock, CLIENT_PORT_CEILING, strerror (errno));
+ ret = 0;
+ }
+ break;
+
+ case AF_UNIX:
+ *sockaddr_len = sizeof (struct sockaddr_un);
+ ret = af_unix_client_bind (this, (struct sockaddr *)sockaddr,
+ *sockaddr_len, sock);
+ break;
+
+ default:
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address family %d", sockaddr->sa_family);
+ ret = -1;
+ break;
+ }
+
+ return ret;
+}
+
+int32_t
+ibverbs_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len)
+{
+ int32_t ret = 0;
+ char is_inet_sdp = 0;
+
+ ret = client_fill_address_family (this, sockaddr);
+ if (ret) {
+ ret = -1;
+ goto err;
+ }
+
+ switch (sockaddr->sa_family)
+ {
+ case AF_INET_SDP:
+ sockaddr->sa_family = AF_INET;
+ is_inet_sdp = 1;
+
+ case AF_INET:
+ case AF_INET6:
+ case AF_UNSPEC:
+ ret = af_inet_client_get_remote_sockaddr (this,
+ sockaddr,
+ sockaddr_len);
+
+ if (is_inet_sdp) {
+ sockaddr->sa_family = AF_INET_SDP;
+ }
+
+ break;
+
+ case AF_UNIX:
+ ret = af_unix_client_get_remote_sockaddr (this,
+ sockaddr,
+ sockaddr_len);
+ break;
+
+ default:
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address-family %d", sockaddr->sa_family);
+ ret = -1;
+ }
+
+err:
+ return ret;
+}
+
+int32_t
+ibverbs_server_get_local_sockaddr (transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len)
+{
+ data_t *address_family_data = NULL;
+ int32_t ret = 0;
+ char is_inet_sdp = 0;
+
+ address_family_data = dict_get (this->xl->options,
+ "transport.address-family");
+ if (address_family_data) {
+ char *address_family = NULL;
+ address_family = data_to_str (address_family_data);
+
+ if (!strcasecmp (address_family, "inet")) {
+ addr->sa_family = AF_INET;
+ } else if (!strcasecmp (address_family, "inet6")) {
+ addr->sa_family = AF_INET6;
+ } else if (!strcasecmp (address_family, "inet-sdp")) {
+ addr->sa_family = AF_INET_SDP;
+ } else if (!strcasecmp (address_family, "unix")) {
+ addr->sa_family = AF_UNIX;
+ } else if (!strcasecmp (address_family, "inet/inet6")
+ || !strcasecmp (address_family, "inet6/inet")) {
+ addr->sa_family = AF_UNSPEC;
+ } else {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address family (%s) specified",
+ address_family);
+ ret = -1;
+ goto err;
+ }
+ } else {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "option address-family not specified, defaulting "
+ "to inet/inet6");
+ addr->sa_family = AF_UNSPEC;
+ }
+
+ switch (addr->sa_family)
+ {
+ case AF_INET_SDP:
+ is_inet_sdp = 1;
+ addr->sa_family = AF_INET;
+
+ case AF_INET:
+ case AF_INET6:
+ case AF_UNSPEC:
+ ret = af_inet_server_get_local_sockaddr (this, addr, addr_len);
+ if (is_inet_sdp && !ret) {
+ addr->sa_family = AF_INET_SDP;
+ }
+ break;
+
+ case AF_UNIX:
+ ret = af_unix_server_get_local_sockaddr (this, addr, addr_len);
+ break;
+ }
+
+err:
+ return ret;
+}
+
+int32_t
+fill_inet6_inet_identifiers (transport_t *this, struct sockaddr_storage *addr,
+ int32_t addr_len, char *identifier)
+{
+ int32_t ret = 0, tmpaddr_len = 0;
+ char service[NI_MAXSERV], host[NI_MAXHOST];
+ struct sockaddr_storage tmpaddr;
+
+ memset (&tmpaddr, 0, sizeof (tmpaddr));
+ tmpaddr = *addr;
+ tmpaddr_len = addr_len;
+
+ if (((struct sockaddr *) &tmpaddr)->sa_family == AF_INET6) {
+ int32_t one_to_four, four_to_eight, twelve_to_sixteen;
+ int16_t eight_to_ten, ten_to_twelve;
+
+ one_to_four = four_to_eight = twelve_to_sixteen = 0;
+ eight_to_ten = ten_to_twelve = 0;
+
+ one_to_four = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[0];
+ four_to_eight = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[1];
+ eight_to_ten = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr16[4];
+ ten_to_twelve = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr16[5];
+ twelve_to_sixteen = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[3];
+
+ /* ipv4 mapped ipv6 address has
+ bits 0-80: 0
+ bits 80-96: 0xffff
+ bits 96-128: ipv4 address
+ */
+
+ if (one_to_four == 0 &&
+ four_to_eight == 0 &&
+ eight_to_ten == 0 &&
+ ten_to_twelve == -1) {
+ struct sockaddr_in *in_ptr = (struct sockaddr_in *)&tmpaddr;
+ memset (&tmpaddr, 0, sizeof (tmpaddr));
+
+ in_ptr->sin_family = AF_INET;
+ in_ptr->sin_port = ((struct sockaddr_in6 *)addr)->sin6_port;
+ in_ptr->sin_addr.s_addr = twelve_to_sixteen;
+ tmpaddr_len = sizeof (*in_ptr);
+ }
+ }
+
+ ret = getnameinfo ((struct sockaddr *) &tmpaddr,
+ tmpaddr_len,
+ host, sizeof (host),
+ service, sizeof (service),
+ NI_NUMERICHOST | NI_NUMERICSERV);
+ if (ret != 0) {
+ gf_log (this->xl->name,
+ GF_LOG_ERROR,
+ "getnameinfo failed (%s)", gai_strerror (ret));
+ }
+
+ sprintf (identifier, "%s:%s", host, service);
+
+ return ret;
+}
+
+int32_t
+get_transport_identifiers (transport_t *this)
+{
+ int32_t ret = 0;
+ char is_inet_sdp = 0;
+
+ switch (((struct sockaddr *) &this->myinfo.sockaddr)->sa_family)
+ {
+ case AF_INET_SDP:
+ is_inet_sdp = 1;
+ ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET;
+
+ case AF_INET:
+ case AF_INET6:
+ {
+ ret = fill_inet6_inet_identifiers (this,
+ &this->myinfo.sockaddr,
+ this->myinfo.sockaddr_len,
+ this->myinfo.identifier);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "can't fill inet/inet6 identifier for server");
+ goto err;
+ }
+
+ ret = fill_inet6_inet_identifiers (this,
+ &this->peerinfo.sockaddr,
+ this->peerinfo.sockaddr_len,
+ this->peerinfo.identifier);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "can't fill inet/inet6 identifier for client");
+ goto err;
+ }
+
+ if (is_inet_sdp) {
+ ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET_SDP;
+ }
+ }
+ break;
+
+ case AF_UNIX:
+ {
+ struct sockaddr_un *sunaddr = NULL;
+
+ sunaddr = (struct sockaddr_un *) &this->myinfo.sockaddr;
+ strcpy (this->myinfo.identifier, sunaddr->sun_path);
+
+ sunaddr = (struct sockaddr_un *) &this->peerinfo.sockaddr;
+ strcpy (this->peerinfo.identifier, sunaddr->sun_path);
+ }
+ break;
+
+ default:
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address family (%d)",
+ ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family);
+ ret = -1;
+ break;
+ }
+
+err:
+ return ret;
+}
diff --git a/transport/ib-verbs/src/name.h b/transport/ib-verbs/src/name.h
new file mode 100644
index 00000000000..1b0f378b94a
--- /dev/null
+++ b/transport/ib-verbs/src/name.h
@@ -0,0 +1,47 @@
+/*
+ Copyright (c) 2008 Z RESEARCH, Inc. <http://www.zresearch.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _IB_VERBS_NAME_H
+#define _IB_VERBS_NAME_H
+
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include "compat.h"
+
+int32_t
+client_bind (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len,
+ int sock);
+
+int32_t
+ibverbs_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len);
+
+int32_t
+ibverbs_server_get_local_sockaddr (transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len);
+
+int32_t
+get_transport_identifiers (transport_t *this);
+
+#endif /* _IB_VERBS_NAME_H */
diff --git a/transport/socket/Makefile.am b/transport/socket/Makefile.am
new file mode 100644
index 00000000000..f963effea22
--- /dev/null
+++ b/transport/socket/Makefile.am
@@ -0,0 +1 @@
+SUBDIRS = src \ No newline at end of file
diff --git a/transport/socket/src/Makefile.am b/transport/socket/src/Makefile.am
new file mode 100644
index 00000000000..e112921232c
--- /dev/null
+++ b/transport/socket/src/Makefile.am
@@ -0,0 +1,14 @@
+noinst_HEADERS = socket.h name.h
+
+transport_LTLIBRARIES = socket.la
+transportdir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/transport/
+
+socket_la_LDFLAGS = -module -avoidversion
+
+socket_la_SOURCES = socket.c name.c
+socket_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+
+AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\
+ -I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS)
+
+CLEANFILES = *~
diff --git a/transport/socket/src/name.c b/transport/socket/src/name.c
new file mode 100644
index 00000000000..a599b00cced
--- /dev/null
+++ b/transport/socket/src/name.c
@@ -0,0 +1,677 @@
+/*
+ Copyright (c) 2008 Z RESEARCH, Inc. <http://www.zresearch.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <errno.h>
+#include <netdb.h>
+#include <string.h>
+
+#ifdef CLIENT_PORT_CEILING
+#undef CLIENT_PORT_CEILING
+#endif
+
+#define CLIENT_PORT_CEILING 1024
+
+#ifndef AF_INET_SDP
+#define AF_INET_SDP 27
+#endif
+
+#include "transport.h"
+#include "socket.h"
+
+int32_t
+gf_resolve_ip6 (const char *hostname,
+ uint16_t port,
+ int family,
+ void **dnscache,
+ struct addrinfo **addr_info);
+
+static int32_t
+af_inet_bind_to_port_lt_ceiling (int fd, struct sockaddr *sockaddr,
+ socklen_t sockaddr_len, int ceiling)
+{
+ int32_t ret = -1;
+ /* struct sockaddr_in sin = {0, }; */
+ uint16_t port = ceiling - 1;
+
+ while (port)
+ {
+ switch (sockaddr->sa_family)
+ {
+ case AF_INET6:
+ ((struct sockaddr_in6 *)sockaddr)->sin6_port = htons (port);
+ break;
+
+ case AF_INET_SDP:
+ case AF_INET:
+ ((struct sockaddr_in *)sockaddr)->sin_port = htons (port);
+ break;
+ }
+
+ ret = bind (fd, sockaddr, sockaddr_len);
+
+ if (ret == 0)
+ break;
+
+ if (ret == -1 && errno == EACCES)
+ break;
+
+ port--;
+ }
+
+ return ret;
+}
+
+static int32_t
+af_unix_client_bind (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t sockaddr_len,
+ int sock)
+{
+ data_t *path_data = NULL;
+ struct sockaddr_un *addr = NULL;
+ int32_t ret = -1;
+
+ path_data = dict_get (this->xl->options, "transport.socket.bind-path");
+ if (path_data) {
+ char *path = data_to_str (path_data);
+ if (!path || strlen (path) > UNIX_PATH_MAX) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "bind-path not specfied for unix socket, "
+ "letting connect to assign default value");
+ goto err;
+ }
+
+ addr = (struct sockaddr_un *) sockaddr;
+ strcpy (addr->sun_path, path);
+ ret = bind (sock, (struct sockaddr *)addr, sockaddr_len);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "cannot bind to unix-domain socket %d (%s)",
+ sock, strerror (errno));
+ goto err;
+ }
+ }
+
+err:
+ return ret;
+}
+
+static int32_t
+client_fill_address_family (transport_t *this, struct sockaddr *sockaddr)
+{
+ data_t *address_family_data = NULL;
+
+ address_family_data = dict_get (this->xl->options,
+ "transport.address-family");
+ if (!address_family_data) {
+ data_t *remote_host_data = NULL, *connect_path_data = NULL;
+ remote_host_data = dict_get (this->xl->options, "remote-host");
+ connect_path_data = dict_get (this->xl->options,
+ "transport.socket.connect-path");
+
+ if (!(remote_host_data || connect_path_data) ||
+ (remote_host_data && connect_path_data)) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "transport.address-family not specified and "
+ "not able to determine the "
+ "same from other options (remote-host:%s and "
+ "transport.unix.connect-path:%s)",
+ data_to_str (remote_host_data),
+ data_to_str (connect_path_data));
+ return -1;
+ }
+
+ if (remote_host_data) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "address-family not specified, guessing it "
+ "to be inet/inet6");
+ sockaddr->sa_family = AF_UNSPEC;
+ } else {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "address-family not specified, guessing it "
+ "to be unix");
+ sockaddr->sa_family = AF_UNIX;
+ }
+
+ } else {
+ char *address_family = data_to_str (address_family_data);
+ if (!strcasecmp (address_family, "unix")) {
+ sockaddr->sa_family = AF_UNIX;
+ } else if (!strcasecmp (address_family, "inet")) {
+ sockaddr->sa_family = AF_INET;
+ } else if (!strcasecmp (address_family, "inet6")) {
+ sockaddr->sa_family = AF_INET6;
+ } else if (!strcasecmp (address_family, "inet-sdp")) {
+ sockaddr->sa_family = AF_INET_SDP;
+ } else if (!strcasecmp (address_family, "inet/inet6")
+ || !strcasecmp (address_family, "inet6/inet")) {
+ sockaddr->sa_family = AF_UNSPEC;
+ } else {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address-family (%s) specified",
+ address_family);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static int32_t
+af_inet_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len)
+{
+ dict_t *options = this->xl->options;
+ data_t *remote_host_data = NULL;
+ data_t *remote_port_data = NULL;
+ char *remote_host = NULL;
+ uint16_t remote_port = 0;
+ struct addrinfo *addr_info = NULL;
+ int32_t ret = 0;
+
+ remote_host_data = dict_get (options, "remote-host");
+ if (remote_host_data == NULL)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option remote-host missing in volume %s", this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ remote_host = data_to_str (remote_host_data);
+ if (remote_host == NULL)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option remote-host has data NULL in volume %s", this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ remote_port_data = dict_get (options, "remote-port");
+ if (remote_port_data == NULL)
+ {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "option remote-port missing in volume %s. Defaulting to %d",
+ this->xl->name, GF_DEFAULT_SOCKET_LISTEN_PORT);
+
+ remote_port = GF_DEFAULT_SOCKET_LISTEN_PORT;
+ }
+ else
+ {
+ remote_port = data_to_uint16 (remote_port_data);
+ }
+
+ if (remote_port == (uint16_t)-1)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option remote-port has invalid port in volume %s",
+ this->xl->name);
+ ret = -1;
+ goto err;
+ }
+
+ /* TODO: gf_resolve is a blocking call. kick in some
+ non blocking dns techniques */
+ ret = gf_resolve_ip6 (remote_host, remote_port,
+ sockaddr->sa_family, &this->dnscache, &addr_info);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "DNS resolution failed on host %s", remote_host);
+ goto err;
+ }
+
+ memcpy (sockaddr, addr_info->ai_addr, addr_info->ai_addrlen);
+ *sockaddr_len = addr_info->ai_addrlen;
+
+err:
+ return ret;
+}
+
+static int32_t
+af_unix_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len)
+{
+ struct sockaddr_un *sockaddr_un = NULL;
+ char *connect_path = NULL;
+ data_t *connect_path_data = NULL;
+ int32_t ret = 0;
+
+ connect_path_data = dict_get (this->xl->options,
+ "transport.socket.connect-path");
+ if (!connect_path_data) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option transport.unix.connect-path not specified for "
+ "address-family unix");
+ ret = -1;
+ goto err;
+ }
+
+ connect_path = data_to_str (connect_path_data);
+ if (!connect_path) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "transport.unix.connect-path is null-string");
+ ret = -1;
+ goto err;
+ }
+
+ if (strlen (connect_path) > UNIX_PATH_MAX) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "connect-path value length %"GF_PRI_SIZET" > %d octets",
+ strlen (connect_path), UNIX_PATH_MAX);
+ ret = -1;
+ goto err;
+ }
+
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "using connect-path %s", connect_path);
+ sockaddr_un = (struct sockaddr_un *)sockaddr;
+ strcpy (sockaddr_un->sun_path, connect_path);
+ *sockaddr_len = sizeof (struct sockaddr_un);
+
+err:
+ return ret;
+}
+
+static int32_t
+af_unix_server_get_local_sockaddr (transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len)
+{
+ data_t *listen_path_data = NULL;
+ char *listen_path = NULL;
+ int32_t ret = 0;
+ struct sockaddr_un *sunaddr = (struct sockaddr_un *)addr;
+
+
+ listen_path_data = dict_get (this->xl->options,
+ "transport.socket.listen-path");
+ if (!listen_path_data) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "missing option transport.socket.listen-path");
+ ret = -1;
+ goto err;
+ }
+
+ listen_path = data_to_str (listen_path_data);
+
+#ifndef UNIX_PATH_MAX
+#define UNIX_PATH_MAX 108
+#endif
+
+ if (strlen (listen_path) > UNIX_PATH_MAX) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "option transport.unix.listen-path has value length "
+ "%"GF_PRI_SIZET" > %d",
+ strlen (listen_path), UNIX_PATH_MAX);
+ ret = -1;
+ goto err;
+ }
+
+ sunaddr->sun_family = AF_UNIX;
+ strcpy (sunaddr->sun_path, listen_path);
+ *addr_len = sizeof (struct sockaddr_un);
+
+err:
+ return ret;
+}
+
+static int32_t
+af_inet_server_get_local_sockaddr (transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len)
+{
+ struct addrinfo hints, *res = 0;
+ data_t *listen_port_data = NULL, *listen_host_data = NULL;
+ uint16_t listen_port = -1;
+ char service[NI_MAXSERV], *listen_host = NULL;
+ dict_t *options = NULL;
+ int32_t ret = 0;
+
+ options = this->xl->options;
+
+ listen_port_data = dict_get (options, "transport.socket.listen-port");
+ listen_host_data = dict_get (options, "transport.socket.bind-address");
+
+ if (listen_port_data)
+ {
+ listen_port = data_to_uint16 (listen_port_data);
+ }
+
+ if (listen_port == (uint16_t) -1)
+ listen_port = GF_DEFAULT_SOCKET_LISTEN_PORT;
+
+
+ if (listen_host_data)
+ {
+ listen_host = data_to_str (listen_host_data);
+ }
+
+ memset (service, 0, sizeof (service));
+ sprintf (service, "%d", listen_port);
+
+ memset (&hints, 0, sizeof (hints));
+ hints.ai_family = addr->sa_family;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_ADDRCONFIG | AI_PASSIVE;
+
+ ret = getaddrinfo(listen_host, service, &hints, &res);
+ if (ret != 0) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "getaddrinfo failed for host %s, service %s (%s)",
+ listen_host, service, gai_strerror (ret));
+ ret = -1;
+ goto err;
+ }
+
+ memcpy (addr, res->ai_addr, res->ai_addrlen);
+ *addr_len = res->ai_addrlen;
+
+ freeaddrinfo (res);
+
+err:
+ return ret;
+}
+
+int32_t
+client_bind (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len,
+ int sock)
+{
+ int ret = 0;
+
+ *sockaddr_len = sizeof (struct sockaddr_in6);
+ switch (sockaddr->sa_family)
+ {
+ case AF_INET_SDP:
+ case AF_INET:
+ *sockaddr_len = sizeof (struct sockaddr_in);
+
+ case AF_INET6:
+ ret = af_inet_bind_to_port_lt_ceiling (sock, sockaddr,
+ *sockaddr_len, CLIENT_PORT_CEILING);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "cannot bind inet socket (%d) to port less than %d (%s)",
+ sock, CLIENT_PORT_CEILING, strerror (errno));
+ ret = 0;
+ }
+ break;
+
+ case AF_UNIX:
+ *sockaddr_len = sizeof (struct sockaddr_un);
+ ret = af_unix_client_bind (this, (struct sockaddr *)sockaddr,
+ *sockaddr_len, sock);
+ break;
+
+ default:
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address family %d", sockaddr->sa_family);
+ ret = -1;
+ break;
+ }
+
+ return ret;
+}
+
+int32_t
+socket_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len)
+{
+ int32_t ret = 0;
+ char is_inet_sdp = 0;
+
+ ret = client_fill_address_family (this, sockaddr);
+ if (ret) {
+ ret = -1;
+ goto err;
+ }
+
+ switch (sockaddr->sa_family)
+ {
+ case AF_INET_SDP:
+ sockaddr->sa_family = AF_INET;
+ is_inet_sdp = 1;
+
+ case AF_INET:
+ case AF_INET6:
+ case AF_UNSPEC:
+ ret = af_inet_client_get_remote_sockaddr (this, sockaddr, sockaddr_len);
+
+ if (is_inet_sdp) {
+ sockaddr->sa_family = AF_INET_SDP;
+ }
+
+ break;
+
+ case AF_UNIX:
+ ret = af_unix_client_get_remote_sockaddr (this, sockaddr, sockaddr_len);
+ break;
+
+ default:
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address-family %d", sockaddr->sa_family);
+ ret = -1;
+ }
+
+err:
+ return ret;
+}
+
+int32_t
+socket_server_get_local_sockaddr (transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len)
+{
+ data_t *address_family_data = NULL;
+ int32_t ret = 0;
+ char is_inet_sdp = 0;
+
+ address_family_data = dict_get (this->xl->options,
+ "transport.address-family");
+ if (address_family_data) {
+ char *address_family = NULL;
+ address_family = data_to_str (address_family_data);
+
+ if (!strcasecmp (address_family, "inet")) {
+ addr->sa_family = AF_INET;
+ } else if (!strcasecmp (address_family, "inet6")) {
+ addr->sa_family = AF_INET6;
+ } else if (!strcasecmp (address_family, "inet-sdp")) {
+ addr->sa_family = AF_INET_SDP;
+ } else if (!strcasecmp (address_family, "unix")) {
+ addr->sa_family = AF_UNIX;
+ } else if (!strcasecmp (address_family, "inet/inet6")
+ || !strcasecmp (address_family, "inet6/inet")) {
+ addr->sa_family = AF_UNSPEC;
+ } else {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address family (%s) specified", address_family);
+ ret = -1;
+ goto err;
+ }
+ } else {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "option address-family not specified, defaulting to inet/inet6");
+ addr->sa_family = AF_UNSPEC;
+ }
+
+ switch (addr->sa_family)
+ {
+ case AF_INET_SDP:
+ is_inet_sdp = 1;
+ addr->sa_family = AF_INET;
+
+ case AF_INET:
+ case AF_INET6:
+ case AF_UNSPEC:
+ ret = af_inet_server_get_local_sockaddr (this, addr, addr_len);
+ if (is_inet_sdp && !ret) {
+ addr->sa_family = AF_INET_SDP;
+ }
+ break;
+
+ case AF_UNIX:
+ ret = af_unix_server_get_local_sockaddr (this, addr, addr_len);
+ break;
+ }
+
+err:
+ return ret;
+}
+
+int32_t
+fill_inet6_inet_identifiers (transport_t *this, struct sockaddr_storage *addr,
+ int32_t addr_len, char *identifier)
+{
+ int32_t ret = 0, tmpaddr_len = 0;
+ char service[NI_MAXSERV], host[NI_MAXHOST];
+ struct sockaddr_storage tmpaddr;
+
+ memset (&tmpaddr, 0, sizeof (tmpaddr));
+ tmpaddr = *addr;
+ tmpaddr_len = addr_len;
+
+ if (((struct sockaddr *) &tmpaddr)->sa_family == AF_INET6) {
+ int32_t one_to_four, four_to_eight, twelve_to_sixteen;
+ int16_t eight_to_ten, ten_to_twelve;
+
+ one_to_four = four_to_eight = twelve_to_sixteen = 0;
+ eight_to_ten = ten_to_twelve = 0;
+
+ one_to_four = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[0];
+ four_to_eight = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[1];
+#ifdef GF_SOLARIS_HOST_OS
+ eight_to_ten = S6_ADDR16(((struct sockaddr_in6 *) &tmpaddr)->sin6_addr)[4];
+#else
+ eight_to_ten = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr16[4];
+#endif
+
+#ifdef GF_SOLARIS_HOST_OS
+ ten_to_twelve = S6_ADDR16(((struct sockaddr_in6 *) &tmpaddr)->sin6_addr)[5];
+#else
+ ten_to_twelve = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr16[5];
+#endif
+
+ twelve_to_sixteen = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[3];
+
+ /* ipv4 mapped ipv6 address has
+ bits 0-80: 0
+ bits 80-96: 0xffff
+ bits 96-128: ipv4 address
+ */
+
+ if (one_to_four == 0 &&
+ four_to_eight == 0 &&
+ eight_to_ten == 0 &&
+ ten_to_twelve == -1) {
+ struct sockaddr_in *in_ptr = (struct sockaddr_in *)&tmpaddr;
+ memset (&tmpaddr, 0, sizeof (tmpaddr));
+
+ in_ptr->sin_family = AF_INET;
+ in_ptr->sin_port = ((struct sockaddr_in6 *)addr)->sin6_port;
+ in_ptr->sin_addr.s_addr = twelve_to_sixteen;
+ tmpaddr_len = sizeof (*in_ptr);
+ }
+ }
+
+ ret = getnameinfo ((struct sockaddr *) &tmpaddr,
+ tmpaddr_len,
+ host, sizeof (host),
+ service, sizeof (service),
+ NI_NUMERICHOST | NI_NUMERICSERV);
+ if (ret != 0) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "getnameinfo failed (%s)", gai_strerror (ret));
+ }
+
+ sprintf (identifier, "%s:%s", host, service);
+
+ return ret;
+}
+
+int32_t
+get_transport_identifiers (transport_t *this)
+{
+ int32_t ret = 0;
+ char is_inet_sdp = 0;
+
+ switch (((struct sockaddr *) &this->myinfo.sockaddr)->sa_family)
+ {
+ case AF_INET_SDP:
+ is_inet_sdp = 1;
+ ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET;
+
+ case AF_INET:
+ case AF_INET6:
+ {
+ ret = fill_inet6_inet_identifiers (this,
+ &this->myinfo.sockaddr,
+ this->myinfo.sockaddr_len,
+ this->myinfo.identifier);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "cannot fill inet/inet6 identifier for server");
+ goto err;
+ }
+
+ ret = fill_inet6_inet_identifiers (this,
+ &this->peerinfo.sockaddr,
+ this->peerinfo.sockaddr_len,
+ this->peerinfo.identifier);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "cannot fill inet/inet6 identifier for client");
+ goto err;
+ }
+
+ if (is_inet_sdp) {
+ ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET_SDP;
+ }
+ }
+ break;
+
+ case AF_UNIX:
+ {
+ struct sockaddr_un *sunaddr = NULL;
+
+ sunaddr = (struct sockaddr_un *) &this->myinfo.sockaddr;
+ strcpy (this->myinfo.identifier, sunaddr->sun_path);
+
+ sunaddr = (struct sockaddr_un *) &this->peerinfo.sockaddr;
+ strcpy (this->peerinfo.identifier, sunaddr->sun_path);
+ }
+ break;
+
+ default:
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "unknown address family (%d)",
+ ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family);
+ ret = -1;
+ break;
+ }
+
+err:
+ return ret;
+}
diff --git a/transport/socket/src/name.h b/transport/socket/src/name.h
new file mode 100644
index 00000000000..552037bcc89
--- /dev/null
+++ b/transport/socket/src/name.h
@@ -0,0 +1,44 @@
+/*
+ Copyright (c) 2008 Z RESEARCH, Inc. <http://www.zresearch.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _SOCKET_NAME_H
+#define _SOCKET_NAME_H
+
+#include "compat.h"
+
+int32_t
+client_bind (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len,
+ int sock);
+
+int32_t
+socket_client_get_remote_sockaddr (transport_t *this,
+ struct sockaddr *sockaddr,
+ socklen_t *sockaddr_len);
+
+int32_t
+socket_server_get_local_sockaddr (transport_t *this,
+ struct sockaddr *addr,
+ socklen_t *addr_len);
+
+int32_t
+get_transport_identifiers (transport_t *this);
+
+#endif /* _SOCKET_NAME_H */
diff --git a/transport/socket/src/socket.c b/transport/socket/src/socket.c
new file mode 100644
index 00000000000..066da782250
--- /dev/null
+++ b/transport/socket/src/socket.c
@@ -0,0 +1,1370 @@
+/*
+ Copyright (c) 2008 Z RESEARCH, Inc. <http://www.zresearch.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "socket.h"
+#include "name.h"
+#include "dict.h"
+#include "transport.h"
+#include "logging.h"
+#include "xlator.h"
+#include "byte-order.h"
+#include "common-utils.h"
+#include "compat-errno.h"
+
+#include <fcntl.h>
+#include <errno.h>
+
+
+#define GF_LOG_ERRNO(errno) ((errno == ENOTCONN) ? GF_LOG_DEBUG : GF_LOG_ERROR)
+#define SA(ptr) ((struct sockaddr *)ptr)
+
+int socket_init (transport_t *this);
+
+/*
+ * return value:
+ * 0 = success (completed)
+ * -1 = error
+ * > 0 = incomplete
+ */
+
+int
+__socket_rwv (transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count,
+ int write)
+{
+ socket_private_t *priv = NULL;
+ int sock = -1;
+ int ret = -1;
+ struct iovec *opvector = NULL;
+ int opcount = 0;
+ int moved = 0;
+
+ priv = this->private;
+ sock = priv->sock;
+
+ opvector = vector;
+ opcount = count;
+
+ while (opcount) {
+ if (write) {
+ ret = writev (sock, opvector, opcount);
+
+ if (ret == 0 || (ret == -1 && errno == EAGAIN)) {
+ /* done for now */
+ break;
+ }
+ } else {
+ ret = readv (sock, opvector, opcount);
+
+ if (ret == -1 && errno == EAGAIN) {
+ /* done for now */
+ break;
+ }
+ }
+
+ if (ret == 0) {
+ /* Mostly due to 'umount' in client */
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "EOF from peer %s", this->peerinfo.identifier);
+ opcount = -1;
+ errno = ENOTCONN;
+ break;
+ }
+
+ if (ret == -1) {
+ if (errno == EINTR)
+ continue;
+
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "%s failed (%s)", write ? "writev" : "readv",
+ strerror (errno));
+ opcount = -1;
+ break;
+ }
+
+ moved = 0;
+
+ while (moved < ret) {
+ if ((ret - moved) >= opvector[0].iov_len) {
+ moved += opvector[0].iov_len;
+ opvector++;
+ opcount--;
+ } else {
+ opvector[0].iov_len -= (ret - moved);
+ opvector[0].iov_base += (ret - moved);
+ moved += (ret - moved);
+ }
+ while (opcount && !opvector[0].iov_len) {
+ opvector++;
+ opcount--;
+ }
+ }
+ }
+
+ if (pending_vector)
+ *pending_vector = opvector;
+
+ if (pending_count)
+ *pending_count = opcount;
+
+ return opcount;
+}
+
+
+int
+__socket_readv (transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count)
+{
+ int ret = -1;
+
+ ret = __socket_rwv (this, vector, count,
+ pending_vector, pending_count, 0);
+
+ return ret;
+}
+
+
+int
+__socket_writev (transport_t *this, struct iovec *vector, int count,
+ struct iovec **pending_vector, int *pending_count)
+{
+ int ret = -1;
+
+ ret = __socket_rwv (this, vector, count,
+ pending_vector, pending_count, 1);
+
+ return ret;
+}
+
+
+int
+__socket_disconnect (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ priv = this->private;
+
+ if (priv->sock != -1) {
+ ret = shutdown (priv->sock, SHUT_RDWR);
+ priv->connected = -1;
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "shutdown() returned %d. set connection state to -1",
+ ret);
+ }
+
+ return ret;
+}
+
+
+int
+__socket_server_bind (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+ int opt = 1;
+
+ priv = this->private;
+
+ ret = setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR,
+ &opt, sizeof (opt));
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "setsockopt() for SO_REUSEADDR failed (%s)",
+ strerror (errno));
+ }
+
+ ret = bind (priv->sock, (struct sockaddr *)&this->myinfo.sockaddr,
+ this->myinfo.sockaddr_len);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "binding to %s failed: %s",
+ this->myinfo.identifier, strerror (errno));
+ if (errno == EADDRINUSE) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "Port is already in use");
+ }
+ }
+
+ return ret;
+}
+
+
+int
+__socket_nonblock (int fd)
+{
+ int flags = 0;
+ int ret = -1;
+
+ flags = fcntl (fd, F_GETFL);
+
+ if (flags != -1)
+ ret = fcntl (fd, F_SETFL, flags | O_NONBLOCK);
+
+ return ret;
+}
+
+
+int
+__socket_connect_finish (int fd)
+{
+ int ret = -1;
+ int optval = 0;
+ socklen_t optlen = sizeof (int);
+
+ ret = getsockopt (fd, SOL_SOCKET, SO_ERROR, (void *)&optval, &optlen);
+
+ if (ret == 0 && optval) {
+ errno = optval;
+ ret = -1;
+ }
+
+ return ret;
+}
+
+
+void
+__socket_reset (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+
+ priv = this->private;
+
+ /* TODO: use mem-pool on incoming data */
+
+ if (priv->incoming.hdr_p)
+ free (priv->incoming.hdr_p);
+
+ if (priv->incoming.buf_p)
+ free (priv->incoming.buf_p);
+
+ memset (&priv->incoming, 0, sizeof (priv->incoming));
+
+ event_unregister (this->xl->ctx->event_pool, priv->sock, priv->idx);
+ close (priv->sock);
+ priv->sock = -1;
+ priv->idx = -1;
+ priv->connected = -1;
+}
+
+
+struct ioq *
+__socket_ioq_new (transport_t *this, char *buf, int len,
+ struct iovec *vector, int count, dict_t *refs)
+{
+ socket_private_t *priv = NULL;
+ struct ioq *entry = NULL;
+
+ priv = this->private;
+
+ /* TODO: use mem-pool */
+ entry = CALLOC (1, sizeof (*entry));
+
+ assert (count <= (MAX_IOVEC-2));
+
+ entry->header.colonO[0] = ':';
+ entry->header.colonO[1] = 'O';
+ entry->header.colonO[2] = '\0';
+ entry->header.version = 42;
+ entry->header.size1 = hton32 (len);
+ entry->header.size2 = hton32 (iov_length (vector, count));
+
+ entry->vector[0].iov_base = &entry->header;
+ entry->vector[0].iov_len = sizeof (entry->header);
+ entry->count++;
+
+ entry->vector[1].iov_base = buf;
+ entry->vector[1].iov_len = len;
+ entry->count++;
+
+ if (vector && count) {
+ memcpy (&entry->vector[2], vector, sizeof (*vector) * count);
+ entry->count += count;
+ }
+
+ entry->pending_vector = entry->vector;
+ entry->pending_count = entry->count;
+
+ if (refs)
+ entry->refs = dict_ref (refs);
+
+ entry->buf = buf;
+
+ INIT_LIST_HEAD (&entry->list);
+
+ return entry;
+}
+
+
+void
+__socket_ioq_entry_free (struct ioq *entry)
+{
+ list_del_init (&entry->list);
+ if (entry->refs)
+ dict_unref (entry->refs);
+
+ /* TODO: use mem-pool */
+ free (entry->buf);
+
+ /* TODO: use mem-pool */
+ free (entry);
+}
+
+
+void
+__socket_ioq_flush (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ struct ioq *entry = NULL;
+
+ priv = this->private;
+
+ while (!list_empty (&priv->ioq)) {
+ entry = priv->ioq_next;
+ __socket_ioq_entry_free (entry);
+ }
+
+ return;
+}
+
+
+int
+__socket_ioq_churn_entry (transport_t *this, struct ioq *entry)
+{
+ int ret = -1;
+
+ ret = __socket_writev (this, entry->pending_vector,
+ entry->pending_count,
+ &entry->pending_vector,
+ &entry->pending_count);
+
+ if (ret == 0) {
+ /* current entry was completely written */
+ assert (entry->pending_count == 0);
+ __socket_ioq_entry_free (entry);
+ }
+
+ return ret;
+}
+
+
+int
+__socket_ioq_churn (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = 0;
+ struct ioq *entry = NULL;
+
+ priv = this->private;
+
+ while (!list_empty (&priv->ioq)) {
+ /* pick next entry */
+ entry = priv->ioq_next;
+
+ ret = __socket_ioq_churn_entry (this, entry);
+
+ if (ret != 0)
+ break;
+ }
+
+ if (list_empty (&priv->ioq)) {
+ /* all pending writes done, not interested in POLLOUT */
+ priv->idx = event_select_on (this->xl->ctx->event_pool,
+ priv->sock, priv->idx, -1, 0);
+ }
+
+ return ret;
+}
+
+
+int
+socket_event_poll_err (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ __socket_ioq_flush (this);
+ __socket_reset (this);
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ this->xl->notify (this->xl, GF_EVENT_POLLERR, this);
+
+ return ret;
+}
+
+
+int
+socket_event_poll_out (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->connected == 1) {
+ ret = __socket_ioq_churn (this);
+
+ if (ret == -1) {
+ __socket_disconnect (this);
+ }
+ }
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ this->xl->notify (this->xl, GF_EVENT_POLLOUT, this);
+
+ return ret;
+}
+
+
+int
+__socket_proto_validate_header (transport_t *this,
+ struct socket_header *header,
+ size_t *size1_p, size_t *size2_p)
+{
+ size_t size1 = 0;
+ size_t size2 = 0;
+
+ if (strcmp (header->colonO, ":O")) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket header signature does not match :O (%x.%x.%x)",
+ header->colonO[0], header->colonO[1],
+ header->colonO[2]);
+ return -1;
+ }
+
+ if (header->version != 42) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket header version does not match 42 != %d",
+ header->version);
+ return -1;
+ }
+
+ size1 = ntoh32 (header->size1);
+ size2 = ntoh32 (header->size2);
+
+ if (size1 <= 0 || size1 > 1048576) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket header has incorrect size1=%"GF_PRI_SIZET,
+ size1);
+ return -1;
+ }
+
+ if (size2 > (1048576 * 4)) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket header has incorrect size2=%"GF_PRI_SIZET,
+ size2);
+ return -1;
+ }
+
+ if (size1_p)
+ *size1_p = size1;
+
+ if (size2_p)
+ *size2_p = size2;
+
+ return 0;
+}
+
+
+
+/* socket protocol state machine */
+
+int
+__socket_proto_state_machine (transport_t *this)
+{
+ int ret = -1;
+ socket_private_t *priv = NULL;
+ size_t size1 = 0;
+ size_t size2 = 0;
+ int previous_state = -1;
+ struct socket_header *hdr = NULL;
+
+
+ priv = this->private;
+
+ while (priv->incoming.state != SOCKET_PROTO_STATE_COMPLETE) {
+ /* debug check against infinite loops */
+ if (previous_state == priv->incoming.state) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "state did not change! (%d) breaking",
+ previous_state);
+ ret = -1;
+ goto unlock;
+ }
+ previous_state = priv->incoming.state;
+
+ switch (priv->incoming.state) {
+
+ case SOCKET_PROTO_STATE_NADA:
+ priv->incoming.pending_vector =
+ priv->incoming.vector;
+
+ priv->incoming.pending_vector->iov_base =
+ &priv->incoming.header;
+
+ priv->incoming.pending_vector->iov_len =
+ sizeof (struct socket_header);
+
+ priv->incoming.state =
+ SOCKET_PROTO_STATE_HEADER_COMING;
+ break;
+
+ case SOCKET_PROTO_STATE_HEADER_COMING:
+
+ ret = __socket_readv (this,
+ priv->incoming.pending_vector, 1,
+ &priv->incoming.pending_vector,
+ NULL);
+ if (ret == 0) {
+ priv->incoming.state =
+ SOCKET_PROTO_STATE_HEADER_CAME;
+ break;
+ }
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERRNO (errno),
+ "read (%s) in state %d (%s)",
+ strerror (errno),
+ SOCKET_PROTO_STATE_HEADER_COMING,
+ this->peerinfo.identifier);
+ goto unlock;
+ }
+
+ if (ret > 0) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "partial header read on NB socket.");
+ goto unlock;
+ }
+ break;
+
+ case SOCKET_PROTO_STATE_HEADER_CAME:
+ hdr = &priv->incoming.header;
+ ret = __socket_proto_validate_header (this, hdr,
+ &size1, &size2);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket header validate failed (%s). "
+ "possible mismatch of transport-type "
+ "between server and client volumes, "
+ "or version mismatch",
+ this->peerinfo.identifier);
+ goto unlock;
+ }
+
+ priv->incoming.hdrlen = size1;
+ priv->incoming.buflen = size2;
+
+ /* TODO: use mem-pool */
+ priv->incoming.hdr_p = MALLOC (size1);
+ if (size2)
+ priv->incoming.buf_p = MALLOC (size2);
+
+ priv->incoming.vector[0].iov_base =
+ priv->incoming.hdr_p;
+
+ priv->incoming.vector[0].iov_len = size1;
+
+ priv->incoming.vector[1].iov_base =
+ priv->incoming.buf_p;
+
+ priv->incoming.vector[1].iov_len = size2;
+ priv->incoming.count = size2 ? 2 : 1;
+
+ priv->incoming.pending_vector =
+ priv->incoming.vector;
+
+ priv->incoming.pending_count =
+ priv->incoming.count;
+
+ priv->incoming.state =
+ SOCKET_PROTO_STATE_DATA_COMING;
+ break;
+
+ case SOCKET_PROTO_STATE_DATA_COMING:
+
+ ret = __socket_readv (this,
+ priv->incoming.pending_vector,
+ priv->incoming.pending_count,
+ &priv->incoming.pending_vector,
+ &priv->incoming.pending_count);
+ if (ret == 0) {
+ priv->incoming.state =
+ SOCKET_PROTO_STATE_DATA_CAME;
+ break;
+ }
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "read (%s) in state %d (%s)",
+ strerror (errno),
+ SOCKET_PROTO_STATE_DATA_COMING,
+ this->peerinfo.identifier);
+ goto unlock;
+ }
+
+ if (ret > 0) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "partial data read on NB socket");
+ goto unlock;
+ }
+ break;
+
+ case SOCKET_PROTO_STATE_DATA_CAME:
+ memset (&priv->incoming.vector, 0,
+ sizeof (priv->incoming.vector));
+ priv->incoming.pending_vector = NULL;
+ priv->incoming.pending_count = 0;
+ priv->incoming.state = SOCKET_PROTO_STATE_COMPLETE;
+ break;
+
+ case SOCKET_PROTO_STATE_COMPLETE:
+ /* not reached */
+ break;
+
+ default:
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "undefined state reached: %d",
+ priv->incoming.state);
+ goto unlock;
+ }
+ }
+unlock:
+
+ return ret;
+}
+
+
+int
+socket_proto_state_machine (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = 0;
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ ret = __socket_proto_state_machine (this);
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ return ret;
+}
+
+
+int
+socket_event_poll_in (transport_t *this)
+{
+ int ret = -1;
+
+ ret = socket_proto_state_machine (this);
+
+ /* call POLLIN on xlator even if complete block is not received,
+ just to keep the last_received timestamp ticking */
+
+ if (ret == 0)
+ ret = this->xl->notify (this->xl, GF_EVENT_POLLIN, this);
+
+ return ret;
+}
+
+
+int
+socket_connect_finish (transport_t *this)
+{
+ int ret = -1;
+ socket_private_t *priv = NULL;
+ int event = -1;
+ char notify_xlator = 0;
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->connected)
+ goto unlock;
+
+ ret = __socket_connect_finish (priv->sock);
+
+ if (ret == -1 && errno == EINPROGRESS)
+ ret = 1;
+
+ if (ret == -1 && errno != EINPROGRESS) {
+ if (!priv->connect_finish_log) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "connection failed (%s)",
+ strerror (errno));
+ priv->connect_finish_log = 1;
+ }
+ __socket_disconnect (this);
+ notify_xlator = 1;
+ event = GF_EVENT_POLLERR;
+ goto unlock;
+ }
+
+ if (ret == 0) {
+ notify_xlator = 1;
+
+ this->myinfo.sockaddr_len =
+ sizeof (this->myinfo.sockaddr);
+
+ ret = getsockname (priv->sock,
+ SA (&this->myinfo.sockaddr),
+ &this->myinfo.sockaddr_len);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "getsockname on (%d) failed (%s)",
+ priv->sock, strerror (errno));
+ __socket_disconnect (this);
+ event = GF_EVENT_POLLERR;
+ goto unlock;
+ }
+
+ priv->connected = 1;
+ priv->connect_finish_log = 0;
+ event = GF_EVENT_CHILD_UP;
+ get_transport_identifiers (this);
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+ if (notify_xlator)
+ this->xl->notify (this->xl, event, this);
+
+ return 0;
+}
+
+
+int
+socket_event_handler (int fd, int idx, void *data,
+ int poll_in, int poll_out, int poll_err)
+{
+ transport_t *this = NULL;
+ socket_private_t *priv = NULL;
+ int ret = 0;
+
+ this = data;
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ priv->idx = idx;
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ if (!priv->connected) {
+ ret = socket_connect_finish (this);
+ }
+
+ if (!ret && poll_out) {
+ ret = socket_event_poll_out (this);
+ }
+
+ if (!ret && poll_in) {
+ ret = socket_event_poll_in (this);
+ }
+
+ if (ret < 0 || poll_err) {
+ socket_event_poll_err (this);
+ transport_unref (this);
+ }
+
+ return 0;
+}
+
+
+int
+socket_server_event_handler (int fd, int idx, void *data,
+ int poll_in, int poll_out, int poll_err)
+{
+ transport_t *this = NULL;
+ socket_private_t *priv = NULL;
+ int ret = 0;
+ int new_sock = -1;
+ transport_t *new_trans = NULL;
+ struct sockaddr_storage new_sockaddr = {0, };
+ socklen_t addrlen = sizeof (new_sockaddr);
+ socket_private_t *new_priv = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+
+ this = data;
+ priv = this->private;
+ ctx = this->xl->ctx;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ priv->idx = idx;
+
+ if (poll_in) {
+ new_sock = accept (priv->sock, SA (&new_sockaddr),
+ &addrlen);
+
+ if (new_sock == -1)
+ goto unlock;
+
+ if (!priv->bio) {
+ ret = __socket_nonblock (new_sock);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "NBIO on %d failed (%s)",
+ new_sock, strerror (errno));
+ close (new_sock);
+ goto unlock;
+ }
+ }
+
+ new_trans = CALLOC (1, sizeof (*new_trans));
+ new_trans->xl = this->xl;
+ new_trans->fini = this->fini;
+
+ memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr,
+ addrlen);
+ new_trans->peerinfo.sockaddr_len = addrlen;
+
+ new_trans->myinfo.sockaddr_len =
+ sizeof (new_trans->myinfo.sockaddr);
+
+ ret = getsockname (new_sock,
+ SA (&new_trans->myinfo.sockaddr),
+ &new_trans->myinfo.sockaddr_len);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "getsockname on %d failed (%s)",
+ new_sock, strerror (errno));
+ close (new_sock);
+ goto unlock;
+ }
+
+ get_transport_identifiers (new_trans);
+ socket_init (new_trans);
+ new_trans->ops = this->ops;
+ new_trans->init = this->init;
+ new_trans->fini = this->fini;
+
+ new_priv = new_trans->private;
+
+ pthread_mutex_lock (&new_priv->lock);
+ {
+ new_priv->sock = new_sock;
+ new_priv->connected = 1;
+
+ transport_ref (new_trans);
+ new_priv->idx =
+ event_register (ctx->event_pool,
+ new_sock,
+ socket_event_handler,
+ new_trans, 1, 0);
+
+ if (new_priv->idx == -1)
+ ret = -1;
+ }
+ pthread_mutex_unlock (&new_priv->lock);
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+ return ret;
+}
+
+
+int
+socket_disconnect (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ ret = __socket_disconnect (this);
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ return ret;
+}
+
+
+int
+socket_connect (transport_t *this)
+{
+ int ret = -1;
+ int sock = -1;
+ socket_private_t *priv = NULL;
+ struct sockaddr_storage sockaddr = {0, };
+ socklen_t sockaddr_len = 0;
+ glusterfs_ctx_t *ctx = NULL;
+
+ priv = this->private;
+ ctx = this->xl->ctx;
+
+ if (!priv) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "connect() called on uninitialized transport");
+ goto err;
+ }
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ sock = priv->sock;
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ if (sock != -1) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "connect () called on transport already connected");
+ goto err;
+ }
+
+ ret = socket_client_get_remote_sockaddr (this, SA (&sockaddr),
+ &sockaddr_len);
+ if (ret == -1) {
+ /* logged inside client_get_remote_sockaddr */
+ goto err;
+ }
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->sock != -1) {
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "connect() -- already connected");
+ goto unlock;
+ }
+
+ memcpy (&this->peerinfo.sockaddr, &sockaddr, sockaddr_len);
+ this->peerinfo.sockaddr_len = sockaddr_len;
+
+ priv->sock = socket (SA (&sockaddr)->sa_family,
+ SOCK_STREAM, 0);
+
+ if (priv->sock == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket creation failed (%s)",
+ strerror (errno));
+ goto unlock;
+ }
+
+ if (!priv->bio) {
+ ret = __socket_nonblock (priv->sock);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+
+ SA (&this->myinfo.sockaddr)->sa_family =
+ SA (&this->peerinfo.sockaddr)->sa_family;
+
+ ret = client_bind (this, SA (&this->myinfo.sockaddr),
+ &this->myinfo.sockaddr_len, priv->sock);
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_WARNING,
+ "client bind failed: %s", strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ ret = connect (priv->sock, SA (&this->peerinfo.sockaddr),
+ this->peerinfo.sockaddr_len);
+
+ if (ret == -1 && errno != EINPROGRESS) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "connection attempt failed (%s)",
+ strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ priv->connected = 0;
+
+ transport_ref (this);
+
+ priv->idx = event_register (ctx->event_pool, priv->sock,
+ socket_event_handler, this, 1, 1);
+ if (priv->idx == -1)
+ ret = -1;
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+err:
+ return ret;
+}
+
+
+int
+socket_listen (transport_t *this)
+{
+ socket_private_t * priv = NULL;
+ int ret = -1;
+ int sock = -1;
+ struct sockaddr_storage sockaddr;
+ socklen_t sockaddr_len;
+ peer_info_t *myinfo = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+
+ priv = this->private;
+ myinfo = &this->myinfo;
+ ctx = this->xl->ctx;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ sock = priv->sock;
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ if (sock != -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "alreading listening");
+ return ret;
+ }
+
+ ret = socket_server_get_local_sockaddr (this, SA (&sockaddr),
+ &sockaddr_len);
+
+ if (ret == -1) {
+ return ret;
+ }
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->sock != -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "already listening");
+ goto unlock;
+ }
+
+ memcpy (&myinfo->sockaddr, &sockaddr, sockaddr_len);
+ myinfo->sockaddr_len = sockaddr_len;
+
+ priv->sock = socket (SA (&sockaddr)->sa_family,
+ SOCK_STREAM, 0);
+
+ if (priv->sock == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket creation failed (%s)",
+ strerror (errno));
+ goto unlock;
+ }
+
+ if (!priv->bio) {
+ ret = __socket_nonblock (priv->sock);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "NBIO on %d failed (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+
+ ret = __socket_server_bind (this);
+
+ if (ret == -1) {
+ /* logged inside __socket_server_bind() */
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ ret = listen (priv->sock, 10);
+
+ if (ret == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "could not set socket %d to listen mode (%s)",
+ priv->sock, strerror (errno));
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+
+ transport_ref (this);
+
+ priv->idx = event_register (ctx->event_pool, priv->sock,
+ socket_server_event_handler,
+ this, 1, 0);
+
+ if (priv->idx == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "could not register socket %d with events",
+ priv->sock);
+ ret = -1;
+ close (priv->sock);
+ priv->sock = -1;
+ goto unlock;
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+ return ret;
+}
+
+
+int
+socket_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p,
+ char **buf_p, size_t *buflen_p)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+
+ priv = this->private;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->connected != 1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "socket not connected to receive");
+ goto unlock;
+ }
+
+ if (!hdr_p || !hdrlen_p || !buf_p || !buflen_p) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "bad parameters %p %p %p %p",
+ hdr_p, hdrlen_p, buf_p, buflen_p);
+ goto unlock;
+ }
+
+ if (priv->incoming.state == SOCKET_PROTO_STATE_COMPLETE) {
+ *hdr_p = priv->incoming.hdr_p;
+ *hdrlen_p = priv->incoming.hdrlen;
+ *buf_p = priv->incoming.buf_p;
+ *buflen_p = priv->incoming.buflen;
+
+ memset (&priv->incoming, 0, sizeof (priv->incoming));
+ priv->incoming.state = SOCKET_PROTO_STATE_NADA;
+
+ ret = 0;
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+ return ret;
+}
+
+
+/* TODO: implement per transfer limit */
+int
+socket_submit (transport_t *this, char *buf, int len,
+ struct iovec *vector, int count,
+ dict_t *refs)
+{
+ socket_private_t *priv = NULL;
+ int ret = -1;
+ char need_poll_out = 0;
+ char need_append = 1;
+ struct ioq *entry = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+
+ priv = this->private;
+ ctx = this->xl->ctx;
+
+ pthread_mutex_lock (&priv->lock);
+ {
+ if (priv->connected != 1) {
+ if (!priv->submit_log && !priv->connect_finish_log) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "not connected (priv->connected = %d)",
+ priv->connected);
+ priv->submit_log = 1;
+ }
+ goto unlock;
+ }
+
+ priv->submit_log = 0;
+ entry = __socket_ioq_new (this, buf, len, vector, count, refs);
+
+ if (list_empty (&priv->ioq)) {
+ ret = __socket_ioq_churn_entry (this, entry);
+
+ if (ret == 0)
+ need_append = 0;
+
+ if (ret > 0)
+ need_poll_out = 1;
+ }
+
+ if (need_append) {
+ list_add_tail (&entry->list, &priv->ioq);
+ ret = 0;
+ }
+
+ if (need_poll_out) {
+ /* first entry to wait. continue writing on POLLOUT */
+ priv->idx = event_select_on (ctx->event_pool,
+ priv->sock,
+ priv->idx, -1, 1);
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&priv->lock);
+
+ return ret;
+}
+
+
+struct transport_ops tops = {
+ .listen = socket_listen,
+ .connect = socket_connect,
+ .disconnect = socket_disconnect,
+ .submit = socket_submit,
+ .receive = socket_receive
+};
+
+
+int
+socket_init (transport_t *this)
+{
+ socket_private_t *priv = NULL;
+
+ if (this->private) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "double init attempted");
+ return -1;
+ }
+
+ priv = CALLOC (1, sizeof (*priv));
+ if (!priv) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "calloc (1, %"GF_PRI_SIZET") returned NULL",
+ sizeof (*priv));
+ return -1;
+ }
+
+ pthread_mutex_init (&priv->lock, NULL);
+
+ priv->sock = -1;
+ priv->idx = -1;
+ priv->connected = -1;
+
+ INIT_LIST_HEAD (&priv->ioq);
+
+ if (dict_get (this->xl->options, "non-blocking-io")) {
+ gf_boolean_t tmp_bool = 0;
+ char *nb_connect = data_to_str (dict_get (this->xl->options,
+ "non-blocking-io"));
+
+ if (gf_string2boolean (nb_connect, &tmp_bool) == -1) {
+ gf_log (this->xl->name, GF_LOG_ERROR,
+ "'non-blocking-io' takes only boolean options,"
+ " not taking any action");
+ tmp_bool = 1;
+ }
+ priv->bio = 0;
+ if (!tmp_bool) {
+ priv->bio = 1;
+ gf_log (this->xl->name, GF_LOG_WARNING,
+ "disabling non-blocking IO");
+ }
+ }
+
+ this->private = priv;
+
+ return 0;
+}
+
+
+void
+fini (transport_t *this)
+{
+ socket_private_t *priv = this->private;
+ gf_log (this->xl->name, GF_LOG_DEBUG,
+ "transport %p destroyed", this);
+
+ pthread_mutex_destroy (&priv->lock);
+ FREE (priv);
+}
+
+
+int32_t
+init (transport_t *this)
+{
+ int ret = -1;
+
+ ret = socket_init (this);
+
+ if (ret == -1)
+ {
+ gf_log (this->xl->name, GF_LOG_ERROR, "socket_init() failed");
+ }
+
+ return ret;
+}
+
+struct volume_options options[] = {
+ { .key = {"remote-port",
+ "transport.remote-port",
+ "transport.socket.remote-port"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.socket.listen-port", "listen-port"},
+ .type = GF_OPTION_TYPE_INT
+ },
+ { .key = {"transport.socket.bind-address", "bind-address" },
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.socket.connect-path", "connect-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.socket.bind-path", "bind-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = {"transport.socket.listen-path", "listen-path"},
+ .type = GF_OPTION_TYPE_ANY
+ },
+ { .key = { "transport.address-family",
+ "address-family" },
+ .value = {"inet", "inet6", "inet/inet6", "inet6/inet",
+ "unix", "inet-sdp" },
+ .type = GF_OPTION_TYPE_STR
+ },
+
+ { .key = {NULL} }
+};
+
diff --git a/transport/socket/src/socket.h b/transport/socket/src/socket.h
new file mode 100644
index 00000000000..070e69d088b
--- /dev/null
+++ b/transport/socket/src/socket.h
@@ -0,0 +1,106 @@
+/*
+ Copyright (c) 2006, 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _SOCKET_H
+#define _SOCKET_H
+
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "event.h"
+#include "transport.h"
+#include "logging.h"
+#include "dict.h"
+#include "mem-pool.h"
+
+#ifndef MAX_IOVEC
+#define MAX_IOVEC 16
+#endif /* MAX_IOVEC */
+
+#define GF_DEFAULT_SOCKET_LISTEN_PORT 6996
+
+typedef enum {
+ SOCKET_PROTO_STATE_NADA = 0,
+ SOCKET_PROTO_STATE_HEADER_COMING,
+ SOCKET_PROTO_STATE_HEADER_CAME,
+ SOCKET_PROTO_STATE_DATA_COMING,
+ SOCKET_PROTO_STATE_DATA_CAME,
+ SOCKET_PROTO_STATE_COMPLETE,
+} socket_proto_state_t;
+
+struct socket_header {
+ char colonO[3];
+ uint32_t size1;
+ uint32_t size2;
+ char version;
+} __attribute__((packed));
+
+
+struct ioq {
+ union {
+ struct list_head list;
+ struct {
+ struct ioq *next;
+ struct ioq *prev;
+ };
+ };
+ struct socket_header header;
+ struct iovec vector[MAX_IOVEC];
+ int count;
+ struct iovec *pending_vector;
+ int pending_count;
+ char *buf;
+ dict_t *refs;
+};
+
+
+typedef struct {
+ int32_t sock;
+ int32_t idx;
+ unsigned char connected; // -1 = not connected. 0 = in progress. 1 = connected
+ char bio;
+ char connect_finish_log;
+ char submit_log;
+ union {
+ struct list_head ioq;
+ struct {
+ struct ioq *ioq_next;
+ struct ioq *ioq_prev;
+ };
+ };
+ struct {
+ int state;
+ struct socket_header header;
+ char *hdr_p;
+ size_t hdrlen;
+ char *buf_p;
+ size_t buflen;
+ struct iovec vector[2];
+ int count;
+ struct iovec *pending_vector;
+ int pending_count;
+ } incoming;
+ pthread_mutex_t lock;
+} socket_private_t;
+
+
+#endif