summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/lib/src/gf-changelog-reborp.c
diff options
context:
space:
mode:
authorVenky Shankar <vshankar@redhat.com>2015-02-03 19:22:16 +0530
committerVijay Bellur <vbellur@redhat.com>2015-03-18 18:22:36 -0700
commit4737584fffcd25dbe35d17b076c95bf90a422cf2 (patch)
tree9f30e0e90c88c245787b78af3ca78d7ae05e30f2 /xlators/features/changelog/lib/src/gf-changelog-reborp.c
parent728fcd41eb39f66744d84b979dd8195fd47313ed (diff)
features/changelog: RPC'fy {libgf}changelog
This patch introduces RPC based communication between the changelog translator and libgfchangelog. It replaces the old pathetic stream based interaction that existed earlier (due to time constraints :-/). Changelog, upon initialization, starts an RPC server (rpcsvc) allowing clients to invoke a probe API as a bootup mechanism to request event notifications. During probe, clients can choose an event filter specifying the type(s) of events they are interested in. As of now there is no way to change the event notification set once the probe RPC call is made, but that is easy to implement. The actual event notification is done on a separate RPC session. The client (libgfchangelog) itself starts an RPC server to which the changelog translator "connects back" during probe. Notifications are dispatched by a bunch of threads from the server (translator) and the client optionally orders them if ordered notifications are required. FOPs fill in their respective event details in a buffer (rot-buffs to be particular) and a bunch of threads (consumers) swap the buffers out of rotation and dispatch them via RPC. To avoid writer starvation, the number of dispatcher threads is one less than the number of buffer lists in rot-buffs. libgfchangelog becomes purely callback based -- upon event notification from the server (and re-ordering them if required) it invokes a callback routine specified by consumer(s). A major part of the patch is also aimed at providing backward compatibility for geo-replication, which was one of the main consumers of the stream based API. Also, this patch does not "turn on" event notifications for all fops, just the ones that are currently required. Another pain point is that the server does not filter events before dispatching them to the clients. That load is taken up by the client itself (although it's done at the library layer rather than making it hard on the callback implementor). 
This needs improvement and care needs to be taken to not load the server up with expensive filtering mechanisms. Change-Id: Ibf60a432b68f2dfa60c6f9add2bcfd37a9c41395 BUG: 1170075 Signed-off-by: Venky Shankar <vshankar@redhat.com> Reviewed-on: http://review.gluster.org/9708 Reviewed-by: Jeff Darcy <jdarcy@redhat.com> Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog-reborp.c')
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-reborp.c381
1 files changed, 381 insertions, 0 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog-reborp.c b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
new file mode 100644
index 00000000000..d7e60fb9634
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
@@ -0,0 +1,381 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-misc.h"
+#include "changelog-mem-types.h"
+
+#include "gf-changelog-helpers.h"
+#include "changelog-rpc-common.h"
+
+/**
+ * Reverse socket: actual data transfer handler. Connection
+ * initiator is PROBER, data transfer is REBORP.
+ */
+
+/*
+ * Forward declaration: the program table is defined later in this file
+ * but is already referenced by gf_changelog_reborp_init_rpc_listner().
+ */
+struct rpcsvc_program *gf_changelog_reborp_programs[];
+
+/**
+ * RPC service notification handler for the reverse (REBORP) connection.
+ *
+ * @rpc:    RPC service the event was raised on (not used here).
+ * @mydata: callback data registered with the service; it is treated as
+ *          the gf_changelog_t entry this connection belongs to.
+ * @event:  rpcsvc event being reported.
+ * @data:   event specific payload (not used here).
+ *
+ * RPCSVC_EVENT_ACCEPT: unlink the reverse socket file (a failure is only
+ * logged) and invoke the consumer's ->connected callback, if one is set.
+ *
+ * RPCSVC_EVENT_DISCONNECT: remove the entry from its list under
+ * priv->lock, invoke the consumer's ->disconnected callback, if one is
+ * set, and free the entry.
+ *
+ * All other events are ignored. Always returns 0.
+ */
+int
+gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata,
+                                   rpcsvc_event_t event, void *data)
+{
+        int             ret   = 0;
+        xlator_t       *this  = NULL;
+        gf_private_t   *priv  = NULL;
+        gf_changelog_t *entry = NULL;
+
+        entry = mydata;
+        this = entry->this;
+        priv = this->private;
+
+        switch (event) {
+        case RPCSVC_EVENT_ACCEPT:
+                ret = unlink (RPC_SOCK(entry));
+                if (ret != 0)
+                        gf_log (this->name, GF_LOG_WARNING, "failed to unlink"
+                                " reverse socket file %s", RPC_SOCK (entry));
+                if (entry->connected)
+                        GF_CHANGELOG_INVOKE_CBK (this, entry->connected,
+                                                 entry->brick, entry->ptr);
+                break;
+        case RPCSVC_EVENT_DISCONNECT:
+                LOCK (&priv->lock);
+                {
+                        list_del (&entry->list);
+                }
+                UNLOCK (&priv->lock);
+
+                if (entry->disconnected)
+                        GF_CHANGELOG_INVOKE_CBK (this, entry->disconnected,
+                                                 entry->brick, entry->ptr);
+
+                /* entry must not be touched past this point */
+                GF_FREE (entry);
+                break;
+        default:
+                break;
+        }
+
+        return 0;
+}
+
+/**
+ * Set up the RPC listener for the reverse connection.
+ *
+ * @this:    translator instance handed to the RPC server initializer.
+ * @path:    input to CHANGELOG_MAKE_TMP_SOCKET_PATH.
+ * @sock:    buffer handed to CHANGELOG_MAKE_TMP_SOCKET_PATH together with
+ *           UNIX_PATH_MAX and then used as the server's socket
+ *           (presumably the macro fills it from @path -- confirm).
+ * @cbkdata: opaque data passed along to changelog_rpc_server_init().
+ *
+ * Returns whatever changelog_rpc_server_init() returns.
+ */
+rpcsvc_t *
+gf_changelog_reborp_init_rpc_listner (xlator_t *this,
+                                      char *path, char *sock, void *cbkdata)
+{
+        rpcsvc_t *svc = NULL;
+
+        CHANGELOG_MAKE_TMP_SOCKET_PATH (path, sock, UNIX_PATH_MAX);
+
+        svc = changelog_rpc_server_init (this, sock, cbkdata,
+                                         gf_changelog_reborp_rpcsvc_notify,
+                                         gf_changelog_reborp_programs);
+        return svc;
+}
+
+/**
+ * Walk every event in the payload and hand the ones that pass the
+ * entry's filter to the consumer callback.
+ *
+ * @entry:      consumer entry; supplies the filter, the callback, the
+ *              brick and the consumer's private pointer.
+ * @vec:        address of a pointer to a contiguous array of
+ *              @payloadcnt iovecs (both callers in this file pass it
+ *              that way).
+ * @payloadcnt: number of iovecs in that array.
+ *
+ * Each iovec is treated as an array of changelog_event_t records of
+ * CHANGELOG_EV_SIZE bytes each.
+ *
+ * This is dirty and painful as of now, until there is event filtering
+ * in the server. The entire event buffer is scanned and interesting
+ * events are picked, whereas we _should_ only be notified of the events
+ * we were interested in (selected at the time of probe).
+ *
+ * @FIXME: clean this up once the server filters events.
+ *
+ * Not declared "inline": a bare "inline" on an externally visible
+ * function provides no external definition under C99/C11 semantics.
+ */
+void
+gf_changelog_invoke_callback (gf_changelog_t *entry,
+                              struct iovec **vec, int payloadcnt)
+{
+        int                i      = 0;
+        int                evsize = 0;
+        xlator_t          *this   = NULL;
+        struct iovec      *iov    = NULL;
+        changelog_event_t *event  = NULL;
+
+        this = entry->this;
+
+        /*
+         * @vec points at ONE pointer to the iovec array, so the array
+         * has to be indexed as (*vec)[i]. Indexing vec[i] would read
+         * past that single pointer for every i >= 1.
+         */
+        iov = *vec;
+
+        for (; i < payloadcnt; i++) {
+                event = (changelog_event_t *)iov[i].iov_base;
+                evsize = iov[i].iov_len / CHANGELOG_EV_SIZE;
+
+                for (; evsize > 0; evsize--, event++) {
+                        if (gf_changelog_filter_check (entry, event)) {
+                                GF_CHANGELOG_INVOKE_CBK (this,
+                                                         entry->callback,
+                                                         entry->brick,
+                                                         entry->ptr, event);
+                        }
+                }
+        }
+}
+
+/**
+ * Ordered event handler is self-adaptive.. if the event sequence number
+ * is what's expected (->next_seq) there is no ordering list that's
+ * maintained. On out-of-order event notifications, event buffers are
+ * dynamically allocated and ordered.
+ */
+
+/**
+ * Returns non-zero when @event carries the sequence number that @ev
+ * expects next (->next_seq), zero otherwise.
+ *
+ * Not declared "inline": a bare "inline" on an externally visible
+ * function provides no external definition under C99/C11 semantics, so
+ * calls the compiler chooses not to inline would fail to link.
+ */
+int
+__is_expected_sequence (struct gf_event_list *ev, struct gf_event *event)
+{
+        return (ev->next_seq == event->seq);
+}
+
+/**
+ * Look at the first event queued on @ev->events and store it in @event.
+ *
+ * If it carries the expected sequence number it is unlinked from the
+ * list, ->next_seq is advanced and 1 is returned. Otherwise the list is
+ * left untouched and 0 is returned (@event is still overwritten).
+ *
+ * The list must not be empty; the only caller in this file checks
+ * list_empty() first.
+ *
+ * Not declared "inline": a bare "inline" on an externally visible
+ * function provides no external definition under C99/C11 semantics.
+ */
+int
+__can_process_event (struct gf_event_list *ev, struct gf_event **event)
+{
+        *event = list_first_entry (&ev->events, struct gf_event, list);
+
+        if (__is_expected_sequence (ev, *event)) {
+                list_del (&(*event)->list);
+                ev->next_seq++;
+                return 1;
+        }
+
+        return 0;
+}
+
+/**
+ * Block until the event with the expected sequence number is at the
+ * head of @ev->events, then return it (already unlinked) in @event.
+ *
+ * Waits on @ev->cond with @ev->lock, so the caller must hold @ev->lock.
+ * The condition is re-evaluated after every wakeup.
+ *
+ * Not declared "inline": a bare "inline" on an externally visible
+ * function provides no external definition under C99/C11 semantics.
+ */
+void
+__process_event_list (struct gf_event_list *ev, struct gf_event **event)
+{
+        while (list_empty (&ev->events)
+               || !__can_process_event (ev, event))
+                pthread_cond_wait (&ev->cond, &ev->lock);
+}
+
+/**
+ * Consumer loop for ordered events (the signature suggests it is run as
+ * a thread routine -- confirm at the creation site).
+ *
+ * @arg: the struct gf_event_list to drain.
+ *
+ * Forever: under ev->lock wait for the next in-sequence event, then,
+ * with the lock released, run the consumer callback over its payload
+ * and free the event. The loop has no exit condition, so the trailing
+ * return is never reached.
+ */
+void *
+gf_changelog_callback_invoker (void *arg)
+{
+        gf_changelog_t       *entry = NULL;
+        struct iovec         *vec   = NULL;
+        struct gf_event      *event = NULL;
+        struct gf_event_list *ev    = NULL;
+
+        ev = arg;
+        entry = ev->entry;
+
+        while (1) {
+                pthread_mutex_lock (&ev->lock);
+                {
+                        __process_event_list (ev, &event);
+                }
+                pthread_mutex_unlock (&ev->lock);
+
+                /* the event is off the list now; no lock needed */
+                vec = event->iov;
+                gf_changelog_invoke_callback (entry, &vec, event->count);
+
+                GF_FREE (event);
+        }
+
+        return NULL;
+}
+
+/**
+ * Comparison routine handed to list_add_order(): orders events by their
+ * sequence number. Yields 1 when the first event's sequence number is
+ * strictly greater than the second's and -1 in every other case
+ * (equality included).
+ */
+static int
+orderfn (struct list_head *pos1, struct list_head *pos2)
+{
+        struct gf_event *lhs = list_entry (pos1, struct gf_event, list);
+        struct gf_event *rhs = list_entry (pos2, struct gf_event, list);
+
+        return (lhs->seq > rhs->seq) ? 1 : -1;
+}
+
+/**
+ * Handle an event notification for a consumer that wants ordered
+ * delivery.
+ *
+ * @req:   incoming RPC request; msg[0] starts with the XDR encoded
+ *         changelog_event_req, anything after it and all further
+ *         msg[] vectors are event payload.
+ * @this:  translator instance (used for logging).
+ * @entry: consumer entry whose event list receives the notification.
+ *
+ * The payload is deep-copied into a freshly allocated gf_event, which is
+ * inserted into the entry's event list in sequence order; the waiter on
+ * ev->cond is signalled when the event is the one expected next. The
+ * callback itself is not invoked here.
+ *
+ * The reply acks the request's sequence number with op_ret 0. On XDR
+ * decode failure or allocation failure the reply carries op_ret -1 and
+ * sequence number 0.
+ *
+ * Returns the result of changelog_rpc_sumbit_reply().
+ */
+int
+gf_changelog_ordered_event_handler (rpcsvc_request_t *req,
+                                    xlator_t *this, gf_changelog_t *entry)
+{
+        int                   i          = 0;
+        size_t                payloadlen = 0;
+        ssize_t               len        = 0;
+        int                   payloadcnt = 0;
+        changelog_event_req   rpc_req    = {0,};
+        changelog_event_rsp   rpc_rsp    = {0,};
+        struct iovec         *vec        = NULL;
+        struct gf_event      *event      = NULL;
+        struct gf_event_list *ev         = NULL;
+
+        ev = &entry->event;
+
+        len = xdr_to_generic (req->msg[0],
+                              &rpc_req, (xdrproc_t)xdr_changelog_event_req);
+        if (len < 0) {
+                gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed");
+                req->rpc_err = GARBAGE_ARGS;
+                goto handle_xdr_error;
+        }
+
+        /* size up the payload: the tail of msg[0] plus msg[1..count-1] */
+        if (len < req->msg[0].iov_len) {
+                payloadcnt = 1;
+                payloadlen = (req->msg[0].iov_len - len);
+        }
+        for (i = 1; i < req->count; i++) {
+                payloadcnt++;
+                payloadlen += req->msg[i].iov_len;
+        }
+
+        event = GF_CALLOC (1, GF_EVENT_CALLOC_SIZE (payloadcnt, payloadlen),
+                           gf_changelog_mt_libgfchangelog_event_t);
+        if (!event)
+                goto handle_xdr_error;
+        INIT_LIST_HEAD (&event->list);
+
+        /* reset; handed to GF_EVENT_ASSIGN_IOVEC for each vector below */
+        payloadlen = 0;
+        event->seq = rpc_req.seq;
+        event->count = payloadcnt;
+
+        /*
+         * deep copy IO vectors
+         *
+         * NOTE(review): iov[0] is filled from msg[0] even when msg[0]
+         * carries no payload, in which case payloadcnt did not account
+         * for it while msg[i] is still stored at iov[i]. Confirm that
+         * GF_EVENT_CALLOC_SIZE leaves room for that.
+         */
+        vec = &event->iov[0];
+        GF_EVENT_ASSIGN_IOVEC (vec, event,
+                               (req->msg[0].iov_len - len), payloadlen);
+        (void) memcpy (vec->iov_base,
+                       req->msg[0].iov_base + len, vec->iov_len);
+
+        for (i = 1; i < req->count; i++) {
+                vec = &event->iov[i];
+                GF_EVENT_ASSIGN_IOVEC (vec, event,
+                                       req->msg[i].iov_len, payloadlen);
+                (void) memcpy (event->iov[i].iov_base,
+                               req->msg[i].iov_base, req->msg[i].iov_len);
+        }
+
+        /* payloadlen is a size_t, hence %zu */
+        gf_log (this->name, GF_LOG_DEBUG,
+                "seq: %lu [%s] (time: %lu.%lu), (vec: %d, len: %zu)",
+                rpc_req.seq, entry->brick, rpc_req.tv_sec,
+                rpc_req.tv_usec, payloadcnt, payloadlen);
+
+        /* add it to the ordered event list and wake up listener(s) */
+        pthread_mutex_lock (&ev->lock);
+        {
+                list_add_order (&event->list, &ev->events, orderfn);
+                /* a next_seq of 0 is taken to mean "nothing seen yet" */
+                if (!ev->next_seq)
+                        ev->next_seq = event->seq;
+                if (ev->next_seq == event->seq)
+                        pthread_cond_signal (&ev->cond);
+        }
+        pthread_mutex_unlock (&ev->lock);
+
+        /* ack sequence number */
+        rpc_rsp.op_ret = 0;
+        rpc_rsp.seq = rpc_req.seq;
+
+        goto submit_rpc;
+
+ handle_xdr_error:
+        rpc_rsp.op_ret = -1;
+        rpc_rsp.seq = 0;        /* invalid */
+ submit_rpc:
+        return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL,
+                                           (xdrproc_t)xdr_changelog_event_rsp);
+}
+
+/**
+ * Handle an event notification for a consumer that does not need
+ * ordered delivery.
+ *
+ * @req:   incoming RPC request; msg[0] starts with the XDR encoded
+ *         changelog_event_req, anything after it and all further
+ *         msg[] vectors are event payload.
+ * @this:  translator instance (used for logging).
+ * @entry: consumer entry whose callback is to be invoked.
+ *
+ * The payload is not copied: a local iovec array is pointed at the
+ * request's buffers and the consumer callback is run on it right here,
+ * before the reply is submitted.
+ *
+ * The reply acks the request's sequence number with op_ret 0; on XDR
+ * decode failure it carries op_ret -1 and sequence number 0.
+ *
+ * Returns the result of changelog_rpc_sumbit_reply().
+ */
+int
+gf_changelog_unordered_event_handler (rpcsvc_request_t *req,
+                                      xlator_t *this, gf_changelog_t *entry)
+{
+        int                  i                 = 0;
+        ssize_t              len               = 0;
+        int                  payloadcnt        = 0;
+        struct iovec         vector[MAX_IOVEC] = {{0,}, };
+        struct iovec        *vec               = NULL;
+        changelog_event_req  rpc_req           = {0,};
+        changelog_event_rsp  rpc_rsp           = {0,};
+
+        len = xdr_to_generic (req->msg[0],
+                              &rpc_req, (xdrproc_t)xdr_changelog_event_req);
+        if (len < 0) {
+                gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed");
+                req->rpc_err = GARBAGE_ARGS;
+                goto handle_xdr_error;
+        }
+
+        /* prepare payload: tail of msg[0] (if any), then msg[1..count-1] */
+        if (len < req->msg[0].iov_len) {
+                payloadcnt = 1;
+                vector[0].iov_base = (req->msg[0].iov_base + len);
+                vector[0].iov_len = (req->msg[0].iov_len - len);
+        }
+
+        for (i = 1; i < req->count; i++) {
+                vector[payloadcnt++] = req->msg[i];
+        }
+
+        gf_log (this->name, GF_LOG_DEBUG,
+                "seq: %lu (time: %lu.%lu), (vec: %d)",
+                rpc_req.seq, rpc_req.tv_sec, rpc_req.tv_usec, payloadcnt);
+
+        /* invoke callback */
+        vec = vector;
+        gf_changelog_invoke_callback (entry, &vec, payloadcnt);
+
+        /* ack sequence number */
+        rpc_rsp.op_ret = 0;
+        rpc_rsp.seq = rpc_req.seq;
+
+        goto submit_rpc;
+
+ handle_xdr_error:
+        rpc_rsp.op_ret = -1;
+        rpc_rsp.seq = 0;        /* invalid */
+ submit_rpc:
+        return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL,
+                                           (xdrproc_t)xdr_changelog_event_rsp);
+}
+
+/**
+ * Actor for CHANGELOG_REV_PROC_EVENT.
+ *
+ * Recovers the consumer entry from the service's ->mydata, makes the
+ * entry's translator the current THIS and dispatches the request to the
+ * ordered or the unordered handler, depending on what
+ * GF_NEED_ORDERED_EVENTS() says about the entry. The handler's return
+ * value is passed through.
+ */
+int
+gf_changelog_reborp_handle_event (rpcsvc_request_t *req)
+{
+        rpcsvc_t       *service = rpcsvc_request_service (req);
+        gf_changelog_t *entry   = service->mydata;
+        xlator_t       *this    = NULL;
+
+        THIS = entry->this;
+        this = entry->this;
+
+        if (GF_NEED_ORDERED_EVENTS (entry))
+                return gf_changelog_ordered_event_handler (req, this, entry);
+
+        return gf_changelog_unordered_event_handler (req, this, entry);
+}
+
+/*
+ * Actor table of the reverse program, indexed by procedure number and
+ * sized CHANGELOG_REV_PROC_MAX. The only slot filled in here is
+ * CHANGELOG_REV_PROC_EVENT, which is served by
+ * gf_changelog_reborp_handle_event().
+ */
+rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = {
+        [CHANGELOG_REV_PROC_EVENT] = {
+                "CHANGELOG EVENT HANDLER", CHANGELOG_REV_PROC_EVENT,
+                gf_changelog_reborp_handle_event, NULL, 0, DRC_NA
+        },
+};
+
+/**
+ * Program descriptor for the reverse connection.
+ *
+ * synctask stays disabled on purpose: the RPC layer dereferences
+ * ->mydata as THIS, but in gf_changelog_setup_rpc() @cbkdata is a
+ * @gf_changelog_t. That type is what the event handlers need in order
+ * to invoke the callback with the right brick path and private data.
+ */
+struct rpcsvc_program gf_changelog_reborp_prog = {
+        .synctask  = _gf_false,
+        .progname  = "LIBGFCHANGELOG REBORP",
+        .prognum   = CHANGELOG_REV_RPC_PROCNUM,
+        .progver   = CHANGELOG_REV_RPC_PROCVER,
+        .actors    = gf_changelog_reborp_actors,
+        .numactors = CHANGELOG_REV_PROC_MAX,
+};
+
+/* NULL terminated list of programs handed to changelog_rpc_server_init() */
+struct rpcsvc_program *gf_changelog_reborp_programs[] = {
+        &gf_changelog_reborp_prog, NULL
+};