diff options
Diffstat (limited to 'libglusterfs/src')
-rw-r--r-- | libglusterfs/src/transport.c | 101 | ||||
-rw-r--r-- | libglusterfs/src/transport.h | 19 |
2 files changed, 118 insertions, 2 deletions
diff --git a/libglusterfs/src/transport.c b/libglusterfs/src/transport.c index 2c2894d90..244aa960b 100644 --- a/libglusterfs/src/transport.c +++ b/libglusterfs/src/transport.c @@ -214,7 +214,43 @@ transport_submit (transport_t *this, char *buf, int32_t len, struct iovec *vector, int count, struct iobref *iobref) { - int32_t ret = -1; + int32_t ret = -1; + transport_t *peer_trans = NULL; + struct iobuf *iobuf = NULL; + struct transport_msg *msg = NULL; + + if (this->peer_trans) { + peer_trans = this->peer_trans; + + msg = CALLOC (1, sizeof (*msg)); + if (!msg) { + return -ENOMEM; + } + + msg->hdr = buf; + msg->hdrlen = len; + + if (vector) { + iobuf = iobuf_get (this->xl->ctx->iobuf_pool); + if (!iobuf) { + FREE (msg->hdr); + FREE (msg); + return -ENOMEM; + } + + iov_unload (iobuf->ptr, vector, count); + msg->iobuf = iobuf; + } + + pthread_mutex_lock (&peer_trans->handover.mutex); + { + list_add_tail (&msg->list, &peer_trans->handover.msgs); + pthread_cond_broadcast (&peer_trans->handover.cond); + } + pthread_mutex_unlock (&peer_trans->handover.mutex); + + return 0; + } GF_VALIDATE_OR_GOTO("transport", this, fail); GF_VALIDATE_OR_GOTO("transport", this->ops, fail); @@ -307,7 +343,15 @@ transport_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p, int32_t ret = -1; GF_VALIDATE_OR_GOTO("transport", this, fail); - + + if (this->peer_trans) { + *hdr_p = this->handover.msg->hdr; + *hdrlen_p = this->handover.msg->hdrlen; + *iobuf_p = this->handover.msg->iobuf; + + return 0; + } + ret = this->ops->receive (this, hdr_p, hdrlen_p, iobuf_p); fail: return ret; @@ -338,3 +382,56 @@ fail: return ret; } + +void * +transport_peerproc (void *trans_data) +{ + transport_t *trans = NULL; + struct transport_msg *msg = NULL; + + trans = trans_data; + + while (1) { + pthread_mutex_lock (&trans->handover.mutex); + { + while (list_empty (&trans->handover.msgs)) + pthread_cond_wait (&trans->handover.cond, + &trans->handover.mutex); + + msg = list_entry (trans->handover.msgs.next, + struct transport_msg, list); + + list_del_init (&msg->list); + } + pthread_mutex_unlock (&trans->handover.mutex); + + trans->handover.msg = msg; + + trans->xl->notify (trans->xl, GF_EVENT_POLLIN, trans); + + FREE (msg); + } +} + + +int +transport_setpeer (transport_t *trans, transport_t *peer_trans) +{ + trans->peer_trans = peer_trans; + + INIT_LIST_HEAD (&trans->handover.msgs); + pthread_cond_init (&trans->handover.cond, NULL); + pthread_mutex_init (&trans->handover.mutex, NULL); + pthread_create (&trans->handover.thread, NULL, + transport_peerproc, trans); + + peer_trans->peer_trans = trans; + + INIT_LIST_HEAD (&peer_trans->handover.msgs); + pthread_cond_init (&peer_trans->handover.cond, NULL); + pthread_mutex_init (&peer_trans->handover.mutex, NULL); + pthread_create (&peer_trans->handover.thread, NULL, + transport_peerproc, peer_trans); + + return 0; +} diff --git a/libglusterfs/src/transport.h b/libglusterfs/src/transport.h index a7cd5ec8f..2d85c76cc 100644 --- a/libglusterfs/src/transport.h +++ b/libglusterfs/src/transport.h @@ -40,6 +40,13 @@ typedef struct peer_info { char identifier[UNIX_PATH_MAX]; }peer_info_t; +struct transport_msg { + struct list_head list; + char *hdr; + int hdrlen; + struct iobuf *iobuf; +}; + struct transport { struct transport_ops *ops; void *private; @@ -55,6 +62,16 @@ struct transport { /* int (*notify) (transport_t *this, int event, void *data); */ peer_info_t peerinfo; peer_info_t myinfo; + + transport_t *peer_trans; + struct { + pthread_mutex_t mutex; + pthread_cond_t cond; + pthread_t thread; + struct list_head msgs; + struct transport_msg *msg; + } handover; + }; struct transport_ops { @@ -84,4 +101,6 @@ transport_t *transport_load (dict_t *options, xlator_t *xl); transport_t *transport_ref (transport_t *trans); int32_t transport_unref (transport_t *trans); +int transport_setpeer (transport_t *trans, transport_t *trans_peer); + #endif /* __TRANSPORT_H__ */ |