summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/lib/src/gf-changelog-reborp.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog-reborp.c')
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-reborp.c381
1 files changed, 381 insertions, 0 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog-reborp.c b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
new file mode 100644
index 00000000000..d7e60fb9634
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
@@ -0,0 +1,381 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-misc.h"
+#include "changelog-mem-types.h"
+
+#include "gf-changelog-helpers.h"
+#include "changelog-rpc-common.h"
+
+/**
+ * Reverse socket: actual data transfer handler. Connection
+ * initiator is PROBER, data transfer is REBORP.
+ */
+
+struct rpcsvc_program *gf_changelog_reborp_programs[];
+
+/**
+ * On a reverse connection, unlink the socket file.
+ */
+int
+gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata,
+ rpcsvc_event_t event, void *data)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_private_t *priv = NULL;
+ gf_changelog_t *entry = NULL;
+ char sock[UNIX_PATH_MAX] = {0,};
+
+ entry = mydata;
+ this = entry->this;
+ priv = this->private;
+
+ switch (event) {
+ case RPCSVC_EVENT_ACCEPT:
+ ret = unlink (RPC_SOCK(entry));
+ if (ret != 0)
+ gf_log (this->name, GF_LOG_WARNING, "failed to unlink"
+ " reverse socket file %s", RPC_SOCK (entry));
+ if (entry->connected)
+ GF_CHANGELOG_INVOKE_CBK (this, entry->connected,
+ entry->brick, entry->ptr);
+ break;
+ case RPCSVC_EVENT_DISCONNECT:
+ LOCK (&priv->lock);
+ {
+ list_del (&entry->list);
+ }
+ UNLOCK (&priv->lock);
+
+ if (entry->disconnected)
+ GF_CHANGELOG_INVOKE_CBK (this, entry->disconnected,
+ entry->brick, entry->ptr);
+
+ GF_FREE (entry);
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+}
+
+rpcsvc_t *
+gf_changelog_reborp_init_rpc_listner (xlator_t *this,
+ char *path, char *sock, void *cbkdata)
+{
+ CHANGELOG_MAKE_TMP_SOCKET_PATH (path, sock, UNIX_PATH_MAX);
+ return changelog_rpc_server_init (this, sock, cbkdata,
+ gf_changelog_reborp_rpcsvc_notify,
+ gf_changelog_reborp_programs);
+}
+
+/**
+ * This is dirty and painful as of now untill there is event filtering in the
+ * server. The entire event buffer is scanned and interested events are picked,
+ * whereas we _should_ be notified with the events we were interested in
+ * (selected at the time of probe). As of now this is complete BS and needs
+ * fixture ASAP. I just made it work, it needs to be better.
+ *
+ * @FIXME: cleanup this bugger once server filters events.
+ */
+inline void
+gf_changelog_invoke_callback (gf_changelog_t *entry,
+ struct iovec **vec, int payloadcnt)
+{
+ int i = 0;
+ int evsize = 0;
+ xlator_t *this = NULL;
+ changelog_event_t *event = NULL;
+
+ this = entry->this;
+
+ for (; i < payloadcnt; i++) {
+ event = (changelog_event_t *)vec[i]->iov_base;
+ evsize = vec[i]->iov_len / CHANGELOG_EV_SIZE;
+
+ for (; evsize > 0; evsize--, event++) {
+ if (gf_changelog_filter_check (entry, event)) {
+ GF_CHANGELOG_INVOKE_CBK (this,
+ entry->callback,
+ entry->brick,
+ entry->ptr, event);
+ }
+ }
+ }
+}
+
+/**
+ * Ordered event handler is self-adaptive.. if the event sequence number
+ * is what's expected (->next_seq) there is no ordering list that's
+ * maintained. On out-of-order event notifications, event buffers are
+ * dynamically allocated and ordered.
+ */
+
+inline int
+__is_expected_sequence (struct gf_event_list *ev, struct gf_event *event)
+{
+ return (ev->next_seq == event->seq);
+}
+
+inline int
+__can_process_event (struct gf_event_list *ev, struct gf_event **event)
+{
+ *event = list_first_entry (&ev->events, struct gf_event, list);
+
+ if (__is_expected_sequence (ev, *event)) {
+ list_del (&(*event)->list);
+ ev->next_seq++;
+ return 1;
+ }
+
+ return 0;
+}
+
+inline void
+__process_event_list (struct gf_event_list *ev, struct gf_event **event)
+{
+ while (list_empty (&ev->events)
+ || !__can_process_event (ev, event))
+ pthread_cond_wait (&ev->cond, &ev->lock);
+}
+
+void *
+gf_changelog_callback_invoker (void *arg)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_changelog_t *entry = NULL;
+ struct iovec *vec = NULL;
+ struct gf_event *event = NULL;
+ struct gf_event_list *ev = NULL;
+
+ ev = arg;
+ entry = ev->entry;
+ this = entry->this;
+
+ while (1) {
+ pthread_mutex_lock (&ev->lock);
+ {
+ __process_event_list (ev, &event);
+ }
+ pthread_mutex_unlock (&ev->lock);
+
+ vec = (struct iovec *) &event->iov;
+ gf_changelog_invoke_callback (entry, &vec, event->count);
+
+ GF_FREE (event);
+ }
+
+ return NULL;
+}
+
+static int
+orderfn (struct list_head *pos1, struct list_head *pos2)
+{
+ struct gf_event *event1 = NULL;
+ struct gf_event *event2 = NULL;
+
+ event1 = list_entry (pos1, struct gf_event, list);
+ event2 = list_entry (pos2, struct gf_event, list);
+
+ if (event1->seq > event2->seq)
+ return 1;
+ return -1;
+}
+
+int
+gf_changelog_ordered_event_handler (rpcsvc_request_t *req,
+ xlator_t *this, gf_changelog_t *entry)
+{
+ int i = 0;
+ size_t payloadlen = 0;
+ ssize_t len = 0;
+ int payloadcnt = 0;
+ changelog_event_req rpc_req = {0,};
+ changelog_event_rsp rpc_rsp = {0,};
+ struct iovec *vec = NULL;
+ struct gf_event *event = NULL;
+ struct gf_event_list *ev = NULL;
+
+ ev = &entry->event;
+
+ len = xdr_to_generic (req->msg[0],
+ &rpc_req, (xdrproc_t)xdr_changelog_event_req);
+ if (len < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed");
+ req->rpc_err = GARBAGE_ARGS;
+ goto handle_xdr_error;
+ }
+
+ if (len < req->msg[0].iov_len) {
+ payloadcnt = 1;
+ payloadlen = (req->msg[0].iov_len - len);
+ }
+ for (i = 1; i < req->count; i++) {
+ payloadcnt++;
+ payloadlen += req->msg[i].iov_len;
+ }
+
+ event = GF_CALLOC (1, GF_EVENT_CALLOC_SIZE (payloadcnt, payloadlen),
+ gf_changelog_mt_libgfchangelog_event_t);
+ if (!event)
+ goto handle_xdr_error;
+ INIT_LIST_HEAD (&event->list);
+
+ payloadlen = 0;
+ event->seq = rpc_req.seq;
+ event->count = payloadcnt;
+
+ /* deep copy IO vectors */
+ vec = &event->iov[0];
+ GF_EVENT_ASSIGN_IOVEC (vec, event,
+ (req->msg[0].iov_len - len), payloadlen);
+ (void) memcpy (vec->iov_base,
+ req->msg[0].iov_base + len, vec->iov_len);
+
+ for (i = 1; i < req->count; i++) {
+ vec = &event->iov[i];
+ GF_EVENT_ASSIGN_IOVEC (vec, event,
+ req->msg[i].iov_len, payloadlen);
+ (void) memcpy (event->iov[i].iov_base,
+ req->msg[i].iov_base, req->msg[i].iov_len);
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "seq: %lu [%s] (time: %lu.%lu), (vec: %d, len: %ld)",
+ rpc_req.seq, entry->brick, rpc_req.tv_sec,
+ rpc_req.tv_usec, payloadcnt, payloadlen);
+
+ /* add it to the ordered event list and wake up listner(s) */
+ pthread_mutex_lock (&ev->lock);
+ {
+ list_add_order (&event->list, &ev->events, orderfn);
+ if (!ev->next_seq)
+ ev->next_seq = event->seq;
+ if (ev->next_seq == event->seq)
+ pthread_cond_signal (&ev->cond);
+ }
+ pthread_mutex_unlock (&ev->lock);
+
+ /* ack sequence number */
+ rpc_rsp.op_ret = 0;
+ rpc_rsp.seq = rpc_req.seq;
+
+ goto submit_rpc;
+
+ handle_xdr_error:
+ rpc_rsp.op_ret = -1;
+ rpc_rsp.seq = 0; /* invalid */
+ submit_rpc:
+ return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL,
+ (xdrproc_t)xdr_changelog_event_rsp);
+}
+
+int
+gf_changelog_unordered_event_handler (rpcsvc_request_t *req,
+ xlator_t *this, gf_changelog_t *entry)
+{
+ int i = 0;
+ int ret = 0;
+ ssize_t len = 0;
+ int payloadcnt = 0;
+ struct iovec vector[MAX_IOVEC] = {{0,}, };
+ changelog_event_req rpc_req = {0,};
+ changelog_event_rsp rpc_rsp = {0,};
+
+ len = xdr_to_generic (req->msg[0],
+ &rpc_req, (xdrproc_t)xdr_changelog_event_req);
+ if (len < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed");
+ req->rpc_err = GARBAGE_ARGS;
+ goto handle_xdr_error;
+ }
+
+ /* prepare payload */
+ if (len < req->msg[0].iov_len) {
+ payloadcnt = 1;
+ vector[0].iov_base = (req->msg[0].iov_base + len);
+ vector[0].iov_len = (req->msg[0].iov_len - len);
+ }
+
+ for (i = 1; i < req->count; i++) {
+ vector[payloadcnt++] = req->msg[i];
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "seq: %lu (time: %lu.%lu), (vec: %d)",
+ rpc_req.seq, rpc_req.tv_sec, rpc_req.tv_usec, payloadcnt);
+
+ /* invoke callback */
+ struct iovec *vec = (struct iovec *) &vector;
+ gf_changelog_invoke_callback (entry, &vec, payloadcnt);
+
+ /* ack sequence number */
+ rpc_rsp.op_ret = 0;
+ rpc_rsp.seq = rpc_req.seq;
+
+ goto submit_rpc;
+
+ handle_xdr_error:
+ rpc_rsp.op_ret = -1;
+ rpc_rsp.seq = 0; /* invalid */
+ submit_rpc:
+ return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL,
+ (xdrproc_t)xdr_changelog_event_rsp);
+}
+
+int
+gf_changelog_reborp_handle_event (rpcsvc_request_t *req)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ rpcsvc_t *svc = NULL;
+ gf_changelog_t *entry = NULL;
+
+ svc = rpcsvc_request_service (req);
+ entry = svc->mydata;
+
+ this = THIS = entry->this;
+
+ ret = GF_NEED_ORDERED_EVENTS (entry)
+ ? gf_changelog_ordered_event_handler (req, this, entry)
+ : gf_changelog_unordered_event_handler (req, this, entry);
+
+ return ret;
+}
+
+rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = {
+ [CHANGELOG_REV_PROC_EVENT] = {
+ "CHANGELOG EVENT HANDLER", CHANGELOG_REV_PROC_EVENT,
+ gf_changelog_reborp_handle_event, NULL, 0, DRC_NA
+ },
+};
+
+/**
+ * Do not use synctask as the RPC layer dereferences ->mydata as THIS.
+ * In gf_changelog_setup_rpc(), @cbkdata is of type @gf_changelog_t,
+ * and that's required to invoke the callback with the appropriate
+ * brick path and it's private data.
+ */
+struct rpcsvc_program gf_changelog_reborp_prog = {
+ .progname = "LIBGFCHANGELOG REBORP",
+ .prognum = CHANGELOG_REV_RPC_PROCNUM,
+ .progver = CHANGELOG_REV_RPC_PROCVER,
+ .numactors = CHANGELOG_REV_PROC_MAX,
+ .actors = gf_changelog_reborp_actors,
+ .synctask = _gf_false,
+};
+
+struct rpcsvc_program *gf_changelog_reborp_programs[] = {
+ &gf_changelog_reborp_prog,
+ NULL,
+};