diff options
Diffstat (limited to 'xlators/features/changelog/src/changelog-ev-handle.c')
| -rw-r--r-- | xlators/features/changelog/src/changelog-ev-handle.c | 412 |
1 files changed, 412 insertions, 0 deletions
diff --git a/xlators/features/changelog/src/changelog-ev-handle.c b/xlators/features/changelog/src/changelog-ev-handle.c new file mode 100644 index 00000000000..aa94459de5a --- /dev/null +++ b/xlators/features/changelog/src/changelog-ev-handle.c @@ -0,0 +1,412 @@ +/* + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "changelog-ev-handle.h" +#include "changelog-rpc-common.h" +#include "changelog-helpers.h" + +struct rpc_clnt_program changelog_ev_program; + +#define NR_IOVEC (MAX_IOVEC - 3) +struct ev_rpc_vec { + int count; + struct iovec vector[NR_IOVEC]; + + /* sequence number */ + unsigned long seq; +}; + +struct ev_rpc { + rbuf_list_t *rlist; + struct rpc_clnt *rpc; + struct ev_rpc_vec vec; +}; + +/** + * As of now this just does the minimal (retval logging). Going further + * un-acknowledges sequence numbers can be retransmitted and other + * intelligence can be built into the server. + */ +int +changelog_event_dispatch_cbk(struct rpc_req *req, struct iovec *iov, int count, + void *myframe) +{ + return 0; +} + +/* dispatcher RPC */ +int +changelog_dispatch_vec(call_frame_t *frame, xlator_t *this, + struct rpc_clnt *rpc, struct ev_rpc_vec *vec) +{ + struct timeval tv = { + 0, + }; + changelog_event_req req = { + 0, + }; + + (void)gettimeofday(&tv, NULL); + + /** + * Event dispatch RPC header contains a sequence number for each + * dispatch. This allows the receiver to order the request before + * processing. + */ + req.seq = vec->seq; + req.tv_sec = tv.tv_sec; + req.tv_usec = tv.tv_usec; + + return changelog_rpc_sumbit_req( + rpc, (void *)&req, frame, &changelog_ev_program, + CHANGELOG_REV_PROC_EVENT, vec->vector, vec->count, NULL, this, + changelog_event_dispatch_cbk, (xdrproc_t)xdr_changelog_event_req); +} + +int +changelog_event_dispatch_rpc(call_frame_t *frame, xlator_t *this, void *data) +{ + int idx = 0; + int count = 0; + int ret = 0; + unsigned long sequence = 0; + rbuf_iovec_t *rvec = NULL; + struct ev_rpc *erpc = NULL; + struct rlist_iter riter = { + { + 0, + }, + }; + + /* dispatch NR_IOVEC IO vectors at a time. */ + + erpc = data; + sequence = erpc->rlist->seq[0]; + + rlist_iter_init(&riter, erpc->rlist); + + rvec_for_each_entry(rvec, &riter) + { + idx = count % NR_IOVEC; + if (++count == NR_IOVEC) { + erpc->vec.vector[idx] = rvec->iov; + erpc->vec.seq = sequence++; + erpc->vec.count = NR_IOVEC; + + ret = changelog_dispatch_vec(frame, this, erpc->rpc, &erpc->vec); + if (ret) + break; + count = 0; + continue; + } + + erpc->vec.vector[idx] = rvec->iov; + } + + if (ret) + goto error_return; + + idx = count % NR_IOVEC; + if (idx) { + erpc->vec.seq = sequence; + erpc->vec.count = idx; + + ret = changelog_dispatch_vec(frame, this, erpc->rpc, &erpc->vec); + } + +error_return: + return ret; +} + +int +changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event, + void *data) +{ + xlator_t *this = NULL; + changelog_rpc_clnt_t *crpc = NULL; + changelog_clnt_t *c_clnt = NULL; + changelog_priv_t *priv = NULL; + changelog_ev_selector_t *selection = NULL; + uint64_t clntcnt = 0; + uint64_t xprtcnt = 0; + + crpc = mydata; + this = crpc->this; + c_clnt = crpc->c_clnt; + + priv = this->private; + + switch (event) { + case RPC_CLNT_CONNECT: + selection = &priv->ev_selection; + GF_ATOMIC_INC(priv->clntcnt); + + LOCK(&c_clnt->wait_lock); + { + LOCK(&c_clnt->active_lock); + { + changelog_select_event(this, selection, crpc->filter); + list_move_tail(&crpc->list, &c_clnt->active); + } + UNLOCK(&c_clnt->active_lock); + } + UNLOCK(&c_clnt->wait_lock); + + break; + case RPC_CLNT_DISCONNECT: + rpc_clnt_disable(crpc->rpc); + + /* rpc_clnt_disable doesn't unref the rpc. It just marks + * the rpc as disabled and cancels reconnection timer. + * Hence unref the rpc object to free it. + */ + rpc_clnt_unref(crpc->rpc); + + if (priv) + selection = &priv->ev_selection; + + LOCK(&crpc->lock); + { + if (selection) + changelog_deselect_event(this, selection, crpc->filter); + changelog_set_disconnect_flag(crpc, _gf_true); + } + UNLOCK(&crpc->lock); + LOCK(&c_clnt->active_lock); + { + list_del_init(&crpc->list); + } + UNLOCK(&c_clnt->active_lock); + + break; + case RPC_CLNT_MSG: + case RPC_CLNT_DESTROY: + /* Free up mydata */ + changelog_rpc_clnt_unref(crpc); + clntcnt = GF_ATOMIC_DEC(priv->clntcnt); + xprtcnt = GF_ATOMIC_GET(priv->xprtcnt); + if (this->cleanup_starting) { + if (!clntcnt && !xprtcnt) + changelog_process_cleanup_event(this); + } + break; + case RPC_CLNT_PING: + break; + } + + return 0; +} + +void * +changelog_ev_connector(void *data) +{ + xlator_t *this = NULL; + changelog_clnt_t *c_clnt = NULL; + changelog_rpc_clnt_t *crpc = NULL; + + c_clnt = data; + this = c_clnt->this; + + while (1) { + pthread_mutex_lock(&c_clnt->pending_lock); + { + while (list_empty(&c_clnt->pending)) + pthread_cond_wait(&c_clnt->pending_cond, &c_clnt->pending_lock); + crpc = list_first_entry(&c_clnt->pending, changelog_rpc_clnt_t, + list); + crpc->rpc = changelog_rpc_client_init(this, crpc, crpc->sock, + changelog_rpc_notify); + if (!crpc->rpc) { + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_RPC_CONNECT_ERROR, "path=%s", crpc->sock, + NULL); + crpc->cleanup(crpc); + goto mutex_unlock; + } + + LOCK(&c_clnt->wait_lock); + { + list_move_tail(&crpc->list, &c_clnt->waitq); + } + UNLOCK(&c_clnt->wait_lock); + } + mutex_unlock: + pthread_mutex_unlock(&c_clnt->pending_lock); + } + + return NULL; +} + +void +changelog_ev_cleanup_connections(xlator_t *this, changelog_clnt_t *c_clnt) +{ + changelog_rpc_clnt_t *crpc = NULL; + + /* cleanup active connections */ + LOCK(&c_clnt->active_lock); + { + list_for_each_entry(crpc, &c_clnt->active, list) + { + rpc_clnt_disable(crpc->rpc); + } + } + UNLOCK(&c_clnt->active_lock); +} + +/** + * TODO: granularize lock + * + * If we have multiple threads dispatching events, doing it this way is + * a performance bottleneck. + */ + +static changelog_rpc_clnt_t * +get_client(changelog_clnt_t *c_clnt, struct list_head **next) +{ + changelog_rpc_clnt_t *crpc = NULL; + + LOCK(&c_clnt->active_lock); + { + if (*next == &c_clnt->active) + goto unblock; + crpc = list_entry(*next, changelog_rpc_clnt_t, list); + /* ref rpc as DISCONNECT might unref the rpc asynchronously */ + changelog_rpc_clnt_ref(crpc); + rpc_clnt_ref(crpc->rpc); + *next = (*next)->next; + } +unblock: + UNLOCK(&c_clnt->active_lock); + + return crpc; +} + +static void +put_client(changelog_clnt_t *c_clnt, changelog_rpc_clnt_t *crpc) +{ + LOCK(&c_clnt->active_lock); + { + rpc_clnt_unref(crpc->rpc); + changelog_rpc_clnt_unref(crpc); + } + UNLOCK(&c_clnt->active_lock); +} + +void +_dispatcher(rbuf_list_t *rlist, void *arg) +{ + xlator_t *this = NULL; + changelog_clnt_t *c_clnt = NULL; + changelog_rpc_clnt_t *crpc = NULL; + struct ev_rpc erpc = { + 0, + }; + struct list_head *next = NULL; + + c_clnt = arg; + this = c_clnt->this; + + erpc.rlist = rlist; + next = c_clnt->active.next; + + while (1) { + crpc = get_client(c_clnt, &next); + if (!crpc) + break; + erpc.rpc = crpc->rpc; + (void)changelog_invoke_rpc(this, crpc->rpc, &changelog_ev_program, + CHANGELOG_REV_PROC_EVENT, &erpc); + put_client(c_clnt, crpc); + } +} + +/** this is called under rotbuff's lock */ +void +sequencer(rbuf_list_t *rlist, void *mydata) +{ + unsigned long range = 0; + changelog_clnt_t *c_clnt = 0; + + c_clnt = mydata; + + range = (RLIST_ENTRY_COUNT(rlist)) / NR_IOVEC; + if ((RLIST_ENTRY_COUNT(rlist)) % NR_IOVEC) + range++; + RLIST_STORE_SEQ(rlist, c_clnt->sequence, range); + + c_clnt->sequence += range; +} + +void * +changelog_ev_dispatch(void *data) +{ + int ret = 0; + void *opaque = NULL; + xlator_t *this = NULL; + changelog_clnt_t *c_clnt = NULL; + struct timeval tv = { + 0, + }; + + c_clnt = data; + this = c_clnt->this; + + while (1) { + /* TODO: change this to be pthread cond based.. later */ + + tv.tv_sec = 1; + tv.tv_usec = 0; + select(0, NULL, NULL, NULL, &tv); + + ret = rbuf_get_buffer(c_clnt->rbuf, &opaque, sequencer, c_clnt); + if (ret != RBUF_CONSUMABLE) { + if (ret != RBUF_EMPTY) + gf_smsg(this->name, GF_LOG_WARNING, 0, + CHANGELOG_MSG_BUFFER_STARVATION_ERROR, + "Failed to get buffer for RPC dispatch", + "rbuf_retval=%d", ret, NULL); + continue; + } + + ret = rbuf_wait_for_completion(c_clnt->rbuf, opaque, _dispatcher, + c_clnt); + if (ret) + gf_smsg(this->name, GF_LOG_WARNING, 0, + CHANGELOG_MSG_PUT_BUFFER_FAILED, NULL); + } + + return NULL; +} + +void +changelog_ev_queue_connection(changelog_clnt_t *c_clnt, + changelog_rpc_clnt_t *crpc) +{ + pthread_mutex_lock(&c_clnt->pending_lock); + { + list_add_tail(&crpc->list, &c_clnt->pending); + pthread_cond_signal(&c_clnt->pending_cond); + } + pthread_mutex_unlock(&c_clnt->pending_lock); +} + +struct rpc_clnt_procedure changelog_ev_procs[CHANGELOG_REV_PROC_MAX] = { + [CHANGELOG_REV_PROC_NULL] = {"NULL", NULL}, + [CHANGELOG_REV_PROC_EVENT] = {"EVENT DISPATCH", + changelog_event_dispatch_rpc}, +}; + +struct rpc_clnt_program changelog_ev_program = { + .progname = "CHANGELOG EVENT DISPATCHER", + .prognum = CHANGELOG_REV_RPC_PROCNUM, + .progver = CHANGELOG_REV_RPC_PROCVER, + .numproc = CHANGELOG_REV_PROC_MAX, + .proctable = changelog_ev_procs, +}; |
