summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnand V. Avati <avati@amp.gluster.com>2009-05-07 01:24:41 +0530
committerAnand V. Avati <avati@amp.gluster.com>2009-05-07 01:24:41 +0530
commitf9f5519b66a25651eb03de577f68d481abdd4c40 (patch)
treeb87c19fa62033da1dbc8a11ce6dbd5a9cb3b3ad8
parent12eb832e255a1abb90434ab3e0d1e1632ae7ce03 (diff)
transport shortcut b/w client and server
This patch makes the server pass back the transport pointer of the client. If the UUID matches, the client makes the local transport 'shortcut' with the remote transport (pointer received from server) The shortcut simulates a socket queue. Instead of serialized messages going over the network and getting queued in the tcp socket queue, the messages get queued in a transport specific queue picked by a polling thread.
-rw-r--r--libglusterfs/src/transport.c101
-rw-r--r--libglusterfs/src/transport.h19
-rw-r--r--xlators/protocol/client/src/client-protocol.c10
-rw-r--r--xlators/protocol/server/src/server-protocol.c3
4 files changed, 131 insertions, 2 deletions
diff --git a/libglusterfs/src/transport.c b/libglusterfs/src/transport.c
index 2c2894d90..244aa960b 100644
--- a/libglusterfs/src/transport.c
+++ b/libglusterfs/src/transport.c
@@ -214,7 +214,43 @@ transport_submit (transport_t *this, char *buf, int32_t len,
struct iovec *vector, int count,
struct iobref *iobref)
{
- int32_t ret = -1;
+ int32_t ret = -1;
+ transport_t *peer_trans = NULL;
+ struct iobuf *iobuf = NULL;
+ struct transport_msg *msg = NULL;
+
+ if (this->peer_trans) {
+ peer_trans = this->peer_trans;
+
+ msg = CALLOC (1, sizeof (*msg));
+ if (!msg) {
+ return -ENOMEM;
+ }
+
+ msg->hdr = buf;
+ msg->hdrlen = len;
+
+ if (vector) {
+ iobuf = iobuf_get (this->xl->ctx->iobuf_pool);
+ if (!iobuf) {
+ FREE (msg->hdr);
+ FREE (msg);
+ return -ENOMEM;
+ }
+
+ iov_unload (iobuf->ptr, vector, count);
+ msg->iobuf = iobuf;
+ }
+
+ pthread_mutex_lock (&peer_trans->handover.mutex);
+ {
+ list_add_tail (&msg->list, &peer_trans->handover.msgs);
+ pthread_cond_broadcast (&peer_trans->handover.cond);
+ }
+ pthread_mutex_unlock (&peer_trans->handover.mutex);
+
+ return 0;
+ }
GF_VALIDATE_OR_GOTO("transport", this, fail);
GF_VALIDATE_OR_GOTO("transport", this->ops, fail);
@@ -307,7 +343,15 @@ transport_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p,
int32_t ret = -1;
GF_VALIDATE_OR_GOTO("transport", this, fail);
-
+
+ if (this->peer_trans) {
+ *hdr_p = this->handover.msg->hdr;
+ *hdrlen_p = this->handover.msg->hdrlen;
+ *iobuf_p = this->handover.msg->iobuf;
+
+ return 0;
+ }
+
ret = this->ops->receive (this, hdr_p, hdrlen_p, iobuf_p);
fail:
return ret;
@@ -338,3 +382,56 @@ fail:
return ret;
}
+
+void *
+transport_peerproc (void *trans_data)
+{
+ transport_t *trans = NULL;
+ struct transport_msg *msg = NULL;
+
+ trans = trans_data;
+
+ while (1) {
+ pthread_mutex_lock (&trans->handover.mutex);
+ {
+ while (list_empty (&trans->handover.msgs))
+ pthread_cond_wait (&trans->handover.cond,
+ &trans->handover.mutex);
+
+ msg = list_entry (trans->handover.msgs.next,
+ struct transport_msg, list);
+
+ list_del_init (&msg->list);
+ }
+ pthread_mutex_unlock (&trans->handover.mutex);
+
+ trans->handover.msg = msg;
+
+ trans->xl->notify (trans->xl, GF_EVENT_POLLIN, trans);
+
+ FREE (msg);
+ }
+}
+
+
+int
+transport_setpeer (transport_t *trans, transport_t *peer_trans)
+{
+ trans->peer_trans = peer_trans;
+
+ INIT_LIST_HEAD (&trans->handover.msgs);
+ pthread_cond_init (&trans->handover.cond, NULL);
+ pthread_mutex_init (&trans->handover.mutex, NULL);
+ pthread_create (&trans->handover.thread, NULL,
+ transport_peerproc, trans);
+
+ peer_trans->peer_trans = trans;
+
+ INIT_LIST_HEAD (&peer_trans->handover.msgs);
+ pthread_cond_init (&peer_trans->handover.cond, NULL);
+ pthread_mutex_init (&peer_trans->handover.mutex, NULL);
+ pthread_create (&peer_trans->handover.thread, NULL,
+ transport_peerproc, peer_trans);
+
+ return 0;
+}
diff --git a/libglusterfs/src/transport.h b/libglusterfs/src/transport.h
index a7cd5ec8f..2d85c76cc 100644
--- a/libglusterfs/src/transport.h
+++ b/libglusterfs/src/transport.h
@@ -40,6 +40,13 @@ typedef struct peer_info {
char identifier[UNIX_PATH_MAX];
}peer_info_t;
+struct transport_msg {
+ struct list_head list;
+ char *hdr;
+ int hdrlen;
+ struct iobuf *iobuf;
+};
+
struct transport {
struct transport_ops *ops;
void *private;
@@ -55,6 +62,16 @@ struct transport {
/* int (*notify) (transport_t *this, int event, void *data); */
peer_info_t peerinfo;
peer_info_t myinfo;
+
+ transport_t *peer_trans;
+ struct {
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ pthread_t thread;
+ struct list_head msgs;
+ struct transport_msg *msg;
+ } handover;
+
};
struct transport_ops {
@@ -84,4 +101,6 @@ transport_t *transport_load (dict_t *options, xlator_t *xl);
transport_t *transport_ref (transport_t *trans);
int32_t transport_unref (transport_t *trans);
+int transport_setpeer (transport_t *trans, transport_t *trans_peer);
+
#endif /* __TRANSPORT_H__ */
diff --git a/xlators/protocol/client/src/client-protocol.c b/xlators/protocol/client/src/client-protocol.c
index 7a2326afd..a0a0f06fd 100644
--- a/xlators/protocol/client/src/client-protocol.c
+++ b/xlators/protocol/client/src/client-protocol.c
@@ -6254,6 +6254,8 @@ client_setvolume_cbk (call_frame_t *frame,
int32_t op_ret = -1;
int32_t op_errno = EINVAL;
int32_t dict_len = 0;
+ transport_t *peer_trans = NULL;
+ uint64_t peer_trans_int = 0;
trans = frame->local; frame->local = NULL;
@@ -6321,14 +6323,22 @@ client_setvolume_cbk (call_frame_t *frame,
ctx = get_global_ctx_ptr ();
if (process_uuid && !strcmp (ctx->process_uuid,process_uuid)) {
+ ret = dict_get_uint64 (reply, "transport-ptr",
+ &peer_trans_int);
+
+ peer_trans = (void *) (long) (peer_trans_int);
gf_log (this->name, GF_LOG_WARNING,
"attaching to the local volume '%s'",
remote_subvol);
+ transport_setpeer (trans, peer_trans);
+
/* TODO: */
+ /*
conf->child = xlator_search_by_name (this,
remote_subvol);
+ */
}
gf_log (trans->xl->name, GF_LOG_NORMAL,
diff --git a/xlators/protocol/server/src/server-protocol.c b/xlators/protocol/server/src/server-protocol.c
index 92cdda2bf..7fc379efb 100644
--- a/xlators/protocol/server/src/server-protocol.c
+++ b/xlators/protocol/server/src/server-protocol.c
@@ -7085,6 +7085,9 @@ mop_setvolume (call_frame_t *frame, xlator_t *bound_xl,
ret = dict_set_str (reply, "process-uuid",
xl->ctx->process_uuid);
+ ret = dict_set_uint64 (reply, "transport-ptr",
+ ((uint64_t) (long) trans));
+
fail:
dict_len = dict_serialized_length (reply);
if (dict_len < 0) {