summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/src/changelog-ev-handle.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/src/changelog-ev-handle.c')
-rw-r--r--xlators/features/changelog/src/changelog-ev-handle.c412
1 files changed, 412 insertions, 0 deletions
diff --git a/xlators/features/changelog/src/changelog-ev-handle.c b/xlators/features/changelog/src/changelog-ev-handle.c
new file mode 100644
index 00000000000..aa94459de5a
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-ev-handle.c
@@ -0,0 +1,412 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-ev-handle.h"
+#include "changelog-rpc-common.h"
+#include "changelog-helpers.h"
+
+struct rpc_clnt_program changelog_ev_program;
+
+#define NR_IOVEC (MAX_IOVEC - 3)
+struct ev_rpc_vec {
+ int count;
+ struct iovec vector[NR_IOVEC];
+
+ /* sequence number */
+ unsigned long seq;
+};
+
+struct ev_rpc {
+ rbuf_list_t *rlist;
+ struct rpc_clnt *rpc;
+ struct ev_rpc_vec vec;
+};
+
+/**
+ * As of now this just does the minimal (retval logging). Going further
+ * un-acknowledges sequence numbers can be retransmitted and other
+ * intelligence can be built into the server.
+ */
+int
+changelog_event_dispatch_cbk(struct rpc_req *req, struct iovec *iov, int count,
+ void *myframe)
+{
+ return 0;
+}
+
+/* dispatcher RPC */
+int
+changelog_dispatch_vec(call_frame_t *frame, xlator_t *this,
+ struct rpc_clnt *rpc, struct ev_rpc_vec *vec)
+{
+ struct timeval tv = {
+ 0,
+ };
+ changelog_event_req req = {
+ 0,
+ };
+
+ (void)gettimeofday(&tv, NULL);
+
+ /**
+ * Event dispatch RPC header contains a sequence number for each
+ * dispatch. This allows the receiver to order the request before
+ * processing.
+ */
+ req.seq = vec->seq;
+ req.tv_sec = tv.tv_sec;
+ req.tv_usec = tv.tv_usec;
+
+ return changelog_rpc_sumbit_req(
+ rpc, (void *)&req, frame, &changelog_ev_program,
+ CHANGELOG_REV_PROC_EVENT, vec->vector, vec->count, NULL, this,
+ changelog_event_dispatch_cbk, (xdrproc_t)xdr_changelog_event_req);
+}
+
+int
+changelog_event_dispatch_rpc(call_frame_t *frame, xlator_t *this, void *data)
+{
+ int idx = 0;
+ int count = 0;
+ int ret = 0;
+ unsigned long sequence = 0;
+ rbuf_iovec_t *rvec = NULL;
+ struct ev_rpc *erpc = NULL;
+ struct rlist_iter riter = {
+ {
+ 0,
+ },
+ };
+
+ /* dispatch NR_IOVEC IO vectors at a time. */
+
+ erpc = data;
+ sequence = erpc->rlist->seq[0];
+
+ rlist_iter_init(&riter, erpc->rlist);
+
+ rvec_for_each_entry(rvec, &riter)
+ {
+ idx = count % NR_IOVEC;
+ if (++count == NR_IOVEC) {
+ erpc->vec.vector[idx] = rvec->iov;
+ erpc->vec.seq = sequence++;
+ erpc->vec.count = NR_IOVEC;
+
+ ret = changelog_dispatch_vec(frame, this, erpc->rpc, &erpc->vec);
+ if (ret)
+ break;
+ count = 0;
+ continue;
+ }
+
+ erpc->vec.vector[idx] = rvec->iov;
+ }
+
+ if (ret)
+ goto error_return;
+
+ idx = count % NR_IOVEC;
+ if (idx) {
+ erpc->vec.seq = sequence;
+ erpc->vec.count = idx;
+
+ ret = changelog_dispatch_vec(frame, this, erpc->rpc, &erpc->vec);
+ }
+
+error_return:
+ return ret;
+}
+
+int
+changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event,
+ void *data)
+{
+ xlator_t *this = NULL;
+ changelog_rpc_clnt_t *crpc = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_ev_selector_t *selection = NULL;
+ uint64_t clntcnt = 0;
+ uint64_t xprtcnt = 0;
+
+ crpc = mydata;
+ this = crpc->this;
+ c_clnt = crpc->c_clnt;
+
+ priv = this->private;
+
+ switch (event) {
+ case RPC_CLNT_CONNECT:
+ selection = &priv->ev_selection;
+ GF_ATOMIC_INC(priv->clntcnt);
+
+ LOCK(&c_clnt->wait_lock);
+ {
+ LOCK(&c_clnt->active_lock);
+ {
+ changelog_select_event(this, selection, crpc->filter);
+ list_move_tail(&crpc->list, &c_clnt->active);
+ }
+ UNLOCK(&c_clnt->active_lock);
+ }
+ UNLOCK(&c_clnt->wait_lock);
+
+ break;
+ case RPC_CLNT_DISCONNECT:
+ rpc_clnt_disable(crpc->rpc);
+
+ /* rpc_clnt_disable doesn't unref the rpc. It just marks
+ * the rpc as disabled and cancels reconnection timer.
+ * Hence unref the rpc object to free it.
+ */
+ rpc_clnt_unref(crpc->rpc);
+
+ if (priv)
+ selection = &priv->ev_selection;
+
+ LOCK(&crpc->lock);
+ {
+ if (selection)
+ changelog_deselect_event(this, selection, crpc->filter);
+ changelog_set_disconnect_flag(crpc, _gf_true);
+ }
+ UNLOCK(&crpc->lock);
+ LOCK(&c_clnt->active_lock);
+ {
+ list_del_init(&crpc->list);
+ }
+ UNLOCK(&c_clnt->active_lock);
+
+ break;
+ case RPC_CLNT_MSG:
+ case RPC_CLNT_DESTROY:
+ /* Free up mydata */
+ changelog_rpc_clnt_unref(crpc);
+ clntcnt = GF_ATOMIC_DEC(priv->clntcnt);
+ xprtcnt = GF_ATOMIC_GET(priv->xprtcnt);
+ if (this->cleanup_starting) {
+ if (!clntcnt && !xprtcnt)
+ changelog_process_cleanup_event(this);
+ }
+ break;
+ case RPC_CLNT_PING:
+ break;
+ }
+
+ return 0;
+}
+
+void *
+changelog_ev_connector(void *data)
+{
+ xlator_t *this = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ changelog_rpc_clnt_t *crpc = NULL;
+
+ c_clnt = data;
+ this = c_clnt->this;
+
+ while (1) {
+ pthread_mutex_lock(&c_clnt->pending_lock);
+ {
+ while (list_empty(&c_clnt->pending))
+ pthread_cond_wait(&c_clnt->pending_cond, &c_clnt->pending_lock);
+ crpc = list_first_entry(&c_clnt->pending, changelog_rpc_clnt_t,
+ list);
+ crpc->rpc = changelog_rpc_client_init(this, crpc, crpc->sock,
+ changelog_rpc_notify);
+ if (!crpc->rpc) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ CHANGELOG_MSG_RPC_CONNECT_ERROR, "path=%s", crpc->sock,
+ NULL);
+ crpc->cleanup(crpc);
+ goto mutex_unlock;
+ }
+
+ LOCK(&c_clnt->wait_lock);
+ {
+ list_move_tail(&crpc->list, &c_clnt->waitq);
+ }
+ UNLOCK(&c_clnt->wait_lock);
+ }
+ mutex_unlock:
+ pthread_mutex_unlock(&c_clnt->pending_lock);
+ }
+
+ return NULL;
+}
+
+void
+changelog_ev_cleanup_connections(xlator_t *this, changelog_clnt_t *c_clnt)
+{
+ changelog_rpc_clnt_t *crpc = NULL;
+
+ /* cleanup active connections */
+ LOCK(&c_clnt->active_lock);
+ {
+ list_for_each_entry(crpc, &c_clnt->active, list)
+ {
+ rpc_clnt_disable(crpc->rpc);
+ }
+ }
+ UNLOCK(&c_clnt->active_lock);
+}
+
+/**
+ * TODO: granularize lock
+ *
+ * If we have multiple threads dispatching events, doing it this way is
+ * a performance bottleneck.
+ */
+
+static changelog_rpc_clnt_t *
+get_client(changelog_clnt_t *c_clnt, struct list_head **next)
+{
+ changelog_rpc_clnt_t *crpc = NULL;
+
+ LOCK(&c_clnt->active_lock);
+ {
+ if (*next == &c_clnt->active)
+ goto unblock;
+ crpc = list_entry(*next, changelog_rpc_clnt_t, list);
+ /* ref rpc as DISCONNECT might unref the rpc asynchronously */
+ changelog_rpc_clnt_ref(crpc);
+ rpc_clnt_ref(crpc->rpc);
+ *next = (*next)->next;
+ }
+unblock:
+ UNLOCK(&c_clnt->active_lock);
+
+ return crpc;
+}
+
+static void
+put_client(changelog_clnt_t *c_clnt, changelog_rpc_clnt_t *crpc)
+{
+ LOCK(&c_clnt->active_lock);
+ {
+ rpc_clnt_unref(crpc->rpc);
+ changelog_rpc_clnt_unref(crpc);
+ }
+ UNLOCK(&c_clnt->active_lock);
+}
+
+void
+_dispatcher(rbuf_list_t *rlist, void *arg)
+{
+ xlator_t *this = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ changelog_rpc_clnt_t *crpc = NULL;
+ struct ev_rpc erpc = {
+ 0,
+ };
+ struct list_head *next = NULL;
+
+ c_clnt = arg;
+ this = c_clnt->this;
+
+ erpc.rlist = rlist;
+ next = c_clnt->active.next;
+
+ while (1) {
+ crpc = get_client(c_clnt, &next);
+ if (!crpc)
+ break;
+ erpc.rpc = crpc->rpc;
+ (void)changelog_invoke_rpc(this, crpc->rpc, &changelog_ev_program,
+ CHANGELOG_REV_PROC_EVENT, &erpc);
+ put_client(c_clnt, crpc);
+ }
+}
+
+/** this is called under rotbuff's lock */
+void
+sequencer(rbuf_list_t *rlist, void *mydata)
+{
+ unsigned long range = 0;
+ changelog_clnt_t *c_clnt = 0;
+
+ c_clnt = mydata;
+
+ range = (RLIST_ENTRY_COUNT(rlist)) / NR_IOVEC;
+ if ((RLIST_ENTRY_COUNT(rlist)) % NR_IOVEC)
+ range++;
+ RLIST_STORE_SEQ(rlist, c_clnt->sequence, range);
+
+ c_clnt->sequence += range;
+}
+
+void *
+changelog_ev_dispatch(void *data)
+{
+ int ret = 0;
+ void *opaque = NULL;
+ xlator_t *this = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ struct timeval tv = {
+ 0,
+ };
+
+ c_clnt = data;
+ this = c_clnt->this;
+
+ while (1) {
+ /* TODO: change this to be pthread cond based.. later */
+
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ select(0, NULL, NULL, NULL, &tv);
+
+ ret = rbuf_get_buffer(c_clnt->rbuf, &opaque, sequencer, c_clnt);
+ if (ret != RBUF_CONSUMABLE) {
+ if (ret != RBUF_EMPTY)
+ gf_smsg(this->name, GF_LOG_WARNING, 0,
+ CHANGELOG_MSG_BUFFER_STARVATION_ERROR,
+ "Failed to get buffer for RPC dispatch",
+ "rbuf_retval=%d", ret, NULL);
+ continue;
+ }
+
+ ret = rbuf_wait_for_completion(c_clnt->rbuf, opaque, _dispatcher,
+ c_clnt);
+ if (ret)
+ gf_smsg(this->name, GF_LOG_WARNING, 0,
+ CHANGELOG_MSG_PUT_BUFFER_FAILED, NULL);
+ }
+
+ return NULL;
+}
+
+void
+changelog_ev_queue_connection(changelog_clnt_t *c_clnt,
+ changelog_rpc_clnt_t *crpc)
+{
+ pthread_mutex_lock(&c_clnt->pending_lock);
+ {
+ list_add_tail(&crpc->list, &c_clnt->pending);
+ pthread_cond_signal(&c_clnt->pending_cond);
+ }
+ pthread_mutex_unlock(&c_clnt->pending_lock);
+}
+
+struct rpc_clnt_procedure changelog_ev_procs[CHANGELOG_REV_PROC_MAX] = {
+ [CHANGELOG_REV_PROC_NULL] = {"NULL", NULL},
+ [CHANGELOG_REV_PROC_EVENT] = {"EVENT DISPATCH",
+ changelog_event_dispatch_rpc},
+};
+
+struct rpc_clnt_program changelog_ev_program = {
+ .progname = "CHANGELOG EVENT DISPATCHER",
+ .prognum = CHANGELOG_REV_RPC_PROCNUM,
+ .progver = CHANGELOG_REV_RPC_PROCVER,
+ .numproc = CHANGELOG_REV_PROC_MAX,
+ .proctable = changelog_ev_procs,
+};