From f9f5519b66a25651eb03de577f68d481abdd4c40 Mon Sep 17 00:00:00 2001 From: "Anand V. Avati" Date: Thu, 7 May 2009 01:24:41 +0530 Subject: transport shortcut b/w client and server This patch makes the server pass back the transport pointer of the client. If the UUID matches, the client makes the local transport 'shortcut' with the remote transport (pointer received from server) The shortcut simulates a socket queue. Instead of serialized messages going over the network and getting queued in the tcp socket queue, the messages get queued in a transport specific queue picked by a polling thread. --- libglusterfs/src/transport.c | 101 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 99 insertions(+), 2 deletions(-) (limited to 'libglusterfs/src/transport.c') diff --git a/libglusterfs/src/transport.c b/libglusterfs/src/transport.c index 2c2894d9031..244aa960b78 100644 --- a/libglusterfs/src/transport.c +++ b/libglusterfs/src/transport.c @@ -214,7 +214,43 @@ transport_submit (transport_t *this, char *buf, int32_t len, struct iovec *vector, int count, struct iobref *iobref) { - int32_t ret = -1; + int32_t ret = -1; + transport_t *peer_trans = NULL; + struct iobuf *iobuf = NULL; + struct transport_msg *msg = NULL; + + if (this->peer_trans) { + peer_trans = this->peer_trans; + + msg = CALLOC (1, sizeof (*msg)); + if (!msg) { + return -ENOMEM; + } + + msg->hdr = buf; + msg->hdrlen = len; + + if (vector) { + iobuf = iobuf_get (this->xl->ctx->iobuf_pool); + if (!iobuf) { + FREE (msg->hdr); + FREE (msg); + return -ENOMEM; + } + + iov_unload (iobuf->ptr, vector, count); + msg->iobuf = iobuf; + } + + pthread_mutex_lock (&peer_trans->handover.mutex); + { + list_add_tail (&msg->list, &peer_trans->handover.msgs); + pthread_cond_broadcast (&peer_trans->handover.cond); + } + pthread_mutex_unlock (&peer_trans->handover.mutex); + + return 0; + } GF_VALIDATE_OR_GOTO("transport", this, fail); GF_VALIDATE_OR_GOTO("transport", this->ops, fail); @@ -307,7 +343,15 @@ transport_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p, int32_t ret = -1; GF_VALIDATE_OR_GOTO("transport", this, fail); - + + if (this->peer_trans) { + *hdr_p = this->handover.msg->hdr; + *hdrlen_p = this->handover.msg->hdrlen; + *iobuf_p = this->handover.msg->iobuf; + + return 0; + } + ret = this->ops->receive (this, hdr_p, hdrlen_p, iobuf_p); fail: return ret; @@ -338,3 +382,56 @@ fail: return ret; } + +void * +transport_peerproc (void *trans_data) +{ + transport_t *trans = NULL; + struct transport_msg *msg = NULL; + + trans = trans_data; + + while (1) { + pthread_mutex_lock (&trans->handover.mutex); + { + while (list_empty (&trans->handover.msgs)) + pthread_cond_wait (&trans->handover.cond, + &trans->handover.mutex); + + msg = list_entry (trans->handover.msgs.next, + struct transport_msg, list); + + list_del_init (&msg->list); + } + pthread_mutex_unlock (&trans->handover.mutex); + + trans->handover.msg = msg; + + trans->xl->notify (trans->xl, GF_EVENT_POLLIN, trans); + + FREE (msg); + } +} + + +int +transport_setpeer (transport_t *trans, transport_t *peer_trans) +{ + trans->peer_trans = peer_trans; + + INIT_LIST_HEAD (&trans->handover.msgs); + pthread_cond_init (&trans->handover.cond, NULL); + pthread_mutex_init (&trans->handover.mutex, NULL); + pthread_create (&trans->handover.thread, NULL, + transport_peerproc, trans); + + peer_trans->peer_trans = trans; + + INIT_LIST_HEAD (&peer_trans->handover.msgs); + pthread_cond_init (&peer_trans->handover.cond, NULL); + pthread_mutex_init (&peer_trans->handover.mutex, NULL); + pthread_create (&peer_trans->handover.thread, NULL, + transport_peerproc, peer_trans); + + return 0; +} -- cgit