summaryrefslogtreecommitdiffstats
path: root/xlators
diff options
context:
space:
mode:
authorVenky Shankar <vshankar@redhat.com>2015-02-03 19:22:16 +0530
committerVijay Bellur <vbellur@redhat.com>2015-03-18 18:22:36 -0700
commit4737584fffcd25dbe35d17b076c95bf90a422cf2 (patch)
tree9f30e0e90c88c245787b78af3ca78d7ae05e30f2 /xlators
parent728fcd41eb39f66744d84b979dd8195fd47313ed (diff)
features/changelog: RPC'fy {libgf}changelog
This patch introduces RPC based communication between the changelog translator and libgfchangelog. It replaces the old pathetic stream based interaction that existed earlier (due to time constraints :-/). Changelog, upon initialization starts a RPC server (rpcsvc) allowing clients to invoke a probe API as a bootup mechanism to request for event notifications. During probe, clients can choose an event filter specifying the type(s) of events they are interested in. As of now there is no way to change the event notification set once the probe RPC call is made, but that is easier to implement. The actual event notifications is done on a separate RPC session. The client (libgfchangelog) itself starts and RPC server which the changelog translator "connects back" during probe. Notifications are dispatched by a bunch of threads from the server (translator) and the client optionally orders them if ordered notifications are requried. FOPs fill in their respective event details in a buffer (rot-buffs to be particular) and a bunch of threads (consumers) swap the buffers out of roatation and dispatch them via RPC. To avoid writer starvation, then number of dispatcher threads is one less than the number of buffer list in rot-buffs.x libgfchangelog becomes purely callback based -- upon event notification from the server (and re-ordering them if required) invoke a callback routine specified by consumer(s). A major part of the patch is also aimed at providing backward compatibility for geo-replication, which was one of the main consumer of the stream based API. Also, this patch does not\ "turn on" event notifications for all fops, just a bunch which is currently in requirement. Another pain point is that the server does not filter events before dispatching it to the clients. That load is taken up by the client itself (although it's done at the library layer rather than making it hard on the callback implementor). This needs improvement and care needs to be taken to not load the server up with expensive filtering mechanisms. Change-Id: Ibf60a432b68f2dfa60c6f9add2bcfd37a9c41395 BUG: 1170075 Signed-off-by: Venky Shankar <vshankar@redhat.com> Reviewed-on: http://review.gluster.org/9708 Reviewed-by: Jeff Darcy <jdarcy@redhat.com> Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators')
-rw-r--r--xlators/features/changelog/lib/examples/c/get-changes-multi.c84
-rw-r--r--xlators/features/changelog/lib/examples/c/get-changes.c2
-rw-r--r--xlators/features/changelog/lib/examples/c/get-history.c7
-rw-r--r--xlators/features/changelog/lib/src/Makefile.am30
-rw-r--r--xlators/features/changelog/lib/src/changelog.h72
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-api.c224
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-helpers.c32
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-helpers.h194
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-journal-handler.c (renamed from xlators/features/changelog/lib/src/gf-changelog-process.c)520
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-journal.h114
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-reborp.c381
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-rpc.c105
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-rpc.h (renamed from xlators/features/changelog/src/changelog-notifier.h)17
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog.c862
-rw-r--r--xlators/features/changelog/lib/src/gf-history-changelog.c161
-rw-r--r--xlators/features/changelog/src/Makefile.am18
-rw-r--r--xlators/features/changelog/src/changelog-encoders.c2
-rw-r--r--xlators/features/changelog/src/changelog-ev-handle.c382
-rw-r--r--xlators/features/changelog/src/changelog-ev-handle.h140
-rw-r--r--xlators/features/changelog/src/changelog-helpers.c210
-rw-r--r--xlators/features/changelog/src/changelog-helpers.h107
-rw-r--r--xlators/features/changelog/src/changelog-mem-types.h25
-rw-r--r--xlators/features/changelog/src/changelog-misc.h14
-rw-r--r--xlators/features/changelog/src/changelog-notifier.c314
-rw-r--r--xlators/features/changelog/src/changelog-rpc-common.c334
-rw-r--r--xlators/features/changelog/src/changelog-rpc-common.h84
-rw-r--r--xlators/features/changelog/src/changelog-rpc.c300
-rw-r--r--xlators/features/changelog/src/changelog-rpc.h29
-rw-r--r--xlators/features/changelog/src/changelog.c473
29 files changed, 3936 insertions, 1301 deletions
diff --git a/xlators/features/changelog/lib/examples/c/get-changes-multi.c b/xlators/features/changelog/lib/examples/c/get-changes-multi.c
new file mode 100644
index 00000000000..8f23c81c2a0
--- /dev/null
+++ b/xlators/features/changelog/lib/examples/c/get-changes-multi.c
@@ -0,0 +1,84 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+/**
+ * Compile it using:
+ * gcc -o getchanges-multi `pkg-config --cflags libgfchangelog` \
+ * get-changes-multi.c `pkg-config --libs libgfchangelog`
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/un.h>
+#include <limits.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <errno.h>
+
+#include "changelog.h"
+
+void *brick_init (void *xl, struct gf_brick_spec *brick)
+{
+ return brick;
+}
+
+void brick_fini (void *xl, char *brick, void *data)
+{
+ return;
+}
+
+void brick_callback (void *xl, char *brick,
+ void *data, changelog_event_t *ev)
+{
+ printf ("->callback: (brick,type) [%s:%d]\n", brick, ev->ev_type);
+}
+
+void fill_brick_spec (struct gf_brick_spec *brick, char *path)
+{
+ brick->brick_path = strdup (path);
+ brick->filter = CHANGELOG_OP_TYPE_RELEASE;
+
+ brick->init = brick_init;
+ brick->fini = brick_fini;
+ brick->callback = brick_callback;
+ brick->connected = NULL;
+ brick->disconnected = NULL;
+}
+
+int
+main (int argc, char **argv)
+{
+ int ret = 0;
+ void *bricks = NULL;
+ struct gf_brick_spec *brick = NULL;
+
+ bricks = calloc (2, sizeof (struct gf_brick_spec));
+ if (!bricks)
+ goto error_return;
+
+ brick = (struct gf_brick_spec *)bricks;
+ fill_brick_spec (brick, "/export/z1/zwoop");
+
+ brick++;
+ fill_brick_spec (brick, "/export/z2/zwoop");
+
+ ret = gf_changelog_register_generic ((struct gf_brick_spec *)bricks, 2,
+ 1, "/tmp/multi-changes.log", 9,
+ NULL);
+ if (ret)
+ goto error_return;
+
+ /* let callbacks do the job */
+ select (0, NULL, NULL, NULL, NULL);
+
+ error_return:
+ return -1;
+}
diff --git a/xlators/features/changelog/lib/examples/c/get-changes.c b/xlators/features/changelog/lib/examples/c/get-changes.c
index 6d0d0357db9..0b2808c7e35 100644
--- a/xlators/features/changelog/lib/examples/c/get-changes.c
+++ b/xlators/features/changelog/lib/examples/c/get-changes.c
@@ -40,7 +40,7 @@ main (int argc, char ** argv)
char fbuf[PATH_MAX] = {0,};
/* get changes for brick "/home/vshankar/export/yow/yow-1" */
- ret = gf_changelog_register ("/home/vshankar/exports/yow/yow-1",
+ ret = gf_changelog_register ("/export/z1/zwoop",
"/tmp/scratch", "/tmp/change.log", 9, 5);
if (ret) {
handle_error ("register failed");
diff --git a/xlators/features/changelog/lib/examples/c/get-history.c b/xlators/features/changelog/lib/examples/c/get-history.c
index 33eb8c32d4d..2e1ff3c767f 100644
--- a/xlators/features/changelog/lib/examples/c/get-history.c
+++ b/xlators/features/changelog/lib/examples/c/get-history.c
@@ -40,8 +40,8 @@ main (int argc, char ** argv)
char fbuf[PATH_MAX] = {0,};
unsigned long end_ts = 0;
- ret = gf_changelog_register ("/export1/v1/b1",
- "/tmp/scratch_v1", "/tmp/scratch_v1/changes.log",
+ ret = gf_changelog_register ("/export/z1/zwoop",
+ "/tmp/scratch_v1", "/tmp/changes.log",
9, 5);
if (ret) {
handle_error ("register failed");
@@ -51,7 +51,8 @@ main (int argc, char ** argv)
int a, b;
printf ("give the two numbers start and end\t");
scanf ("%d%d", &a, &b);
- ret = gf_history_changelog ("/export1/v1/b1/.glusterfs/changelogs",a, b, 3, &end_ts);
+ ret = gf_history_changelog ("/export/z1/zwoop/.glusterfs/changelogs",
+ a, b, 3, &end_ts);
if (ret == -1) {
printf ("history failed");
goto out;
diff --git a/xlators/features/changelog/lib/src/Makefile.am b/xlators/features/changelog/lib/src/Makefile.am
index 1ae919bfb38..306306bd585 100644
--- a/xlators/features/changelog/lib/src/Makefile.am
+++ b/xlators/features/changelog/lib/src/Makefile.am
@@ -4,9 +4,13 @@ libgfchangelog_la_CFLAGS = -Wall $(GF_CFLAGS) $(GF_DARWIN_LIBGLUSTERFS_CFLAGS) \
libgfchangelog_la_CPPFLAGS = $(GF_CPPFLAGS) -D__USE_FILE_OFFSET64 -fpic \
-I../../../src/ -I$(top_srcdir)/libglusterfs/src \
-I$(top_srcdir)/xlators/features/changelog/src \
+ -I$(top_srcdir)/rpc/xdr/src -I$(top_srcdir)/rpc/rpc-lib/src \
+ -I$(top_srcdir)/rpc/rpc-transport/socket/src \
-DDATADIR=\"$(localstatedir)\"
-libgfchangelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+libgfchangelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \
+ $(top_builddir)/rpc/xdr/src/libgfxdr.la \
+ $(top_builddir)/rpc/rpc-lib/src/libgfrpc.la
libgfchangelog_la_LDFLAGS = $(GF_LDFLAGS) -version-info $(LIBGFCHANGELOG_LT_VERSION)
@@ -15,18 +19,18 @@ lib_LTLIBRARIES = libgfchangelog.la
CONTRIB_BUILDDIR = $(top_builddir)/contrib
-libgfchangelog_la_SOURCES = gf-changelog.c gf-changelog-process.c \
- gf-changelog-helpers.c gf-history-changelog.c \
- $(CONTRIBDIR)/uuid/clear.c \
- $(CONTRIBDIR)/uuid/copy.c $(CONTRIBDIR)/uuid/gen_uuid.c \
- $(CONTRIBDIR)/uuid/pack.c $(CONTRIBDIR)/uuid/parse.c \
- $(CONTRIBDIR)/uuid/unparse.c $(CONTRIBDIR)/uuid/uuid_time.c \
- $(CONTRIBDIR)/uuid/compare.c $(CONTRIBDIR)/uuid/isnull.c \
- $(CONTRIBDIR)/uuid/unpack.c
-
-noinst_HEADERS = gf-changelog-helpers.h $(CONTRIBDIR)/uuid/uuidd.h \
- $(CONTRIBDIR)/uuid/uuid.h $(CONTRIBDIR)/uuid/uuidP.h \
- $(CONTRIB_BUILDDIR)/uuid/uuid_types.h
+libgfchangelog_la_SOURCES = gf-changelog.c gf-changelog-journal-handler.c gf-changelog-helpers.c \
+ gf-changelog-api.c gf-history-changelog.c gf-changelog-rpc.c gf-changelog-reborp.c \
+ $(top_srcdir)/xlators/features/changelog/src/changelog-rpc-common.c \
+ $(CONTRIBDIR)/uuid/clear.c $(CONTRIBDIR)/uuid/copy.c \
+ $(CONTRIBDIR)/uuid/gen_uuid.c $(CONTRIBDIR)/uuid/pack.c \
+ $(CONTRIBDIR)/uuid/parse.c $(CONTRIBDIR)/uuid/unparse.c \
+ $(CONTRIBDIR)/uuid/uuid_time.c $(CONTRIBDIR)/uuid/compare.c \
+ $(CONTRIBDIR)/uuid/isnull.c $(CONTRIBDIR)/uuid/unpack.c
+
+noinst_HEADERS = gf-changelog-helpers.h gf-changelog-rpc.h gf-changelog-journal.h \
+ $(CONTRIBDIR)/uuid/uuidd.h $(CONTRIBDIR)/uuid/uuid.h \
+ $(CONTRIBDIR)/uuid/uuidP.h $(CONTRIB_BUILDDIR)/uuid/uuid_types.h
libgfchangelog_HEADERS = changelog.h
diff --git a/xlators/features/changelog/lib/src/changelog.h b/xlators/features/changelog/lib/src/changelog.h
index 5cddfb5839c..d7048ff2508 100644
--- a/xlators/features/changelog/lib/src/changelog.h
+++ b/xlators/features/changelog/lib/src/changelog.h
@@ -11,6 +11,73 @@
#ifndef _GF_CHANGELOG_H
#define _GF_CHANGELOG_H
+struct gf_brick_spec;
+
+/**
+ * Max bit shiter for event selection
+ */
+#define CHANGELOG_EV_SELECTION_RANGE 4
+
+#define CHANGELOG_OP_TYPE_JOURNAL (1<<0)
+#define CHANGELOG_OP_TYPE_OPEN (1<<1)
+#define CHANGELOG_OP_TYPE_CREATE (1<<2)
+#define CHANGELOG_OP_TYPE_RELEASE (1<<3)
+#define CHANGELOG_OP_TYPE_MAX (1<<CHANGELOG_EV_SELECTION_RANGE)
+
+
+struct ev_open {
+ unsigned char gfid[16];
+ int32_t flags;
+};
+
+struct ev_creat {
+ unsigned char gfid[16];
+ int32_t flags;
+};
+
+struct ev_release {
+ unsigned char gfid[16];
+};
+
+struct ev_changelog {
+ char path[PATH_MAX];
+};
+
+typedef struct changelog_event {
+ unsigned int ev_type;
+ union {
+ struct ev_open open;
+ struct ev_creat create;
+ struct ev_release release;
+ struct ev_changelog journal;
+ } u;
+} changelog_event_t;
+
+#define CHANGELOG_EV_SIZE (sizeof (changelog_event_t))
+
+/**
+ * event callback, connected & disconnection defs
+ */
+typedef void (CALLBACK) (void *, char *,
+ void *, changelog_event_t *);
+typedef void *(INIT) (void *, struct gf_brick_spec *);
+typedef void (FINI) (void *, char *, void *);
+typedef void (CONNECT) (void *, char *, void *);
+typedef void (DISCONNECT) (void *, char *, void *);
+
+struct gf_brick_spec {
+ char *brick_path;
+ unsigned int filter;
+
+ INIT *init;
+ FINI *fini;
+ CALLBACK *callback;
+ CONNECT *connected;
+ DISCONNECT *disconnected;
+
+ void *ptr;
+};
+
/* API set */
int
@@ -28,4 +95,9 @@ gf_changelog_next_change (char *bufptr, size_t maxlen);
int
gf_changelog_done (char *file);
+/* newer flexible API */
+int
+gf_changelog_register_generic (struct gf_brick_spec *bricks, int count,
+ int ordered, char *logfile, int lvl, void *xl);
+
#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog-api.c b/xlators/features/changelog/lib/src/gf-changelog-api.c
new file mode 100644
index 00000000000..cea2ff01988
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-api.c
@@ -0,0 +1,224 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "uuid.h"
+#include "globals.h"
+#include "glusterfs.h"
+
+#include "gf-changelog-helpers.h"
+#include "gf-changelog-journal.h"
+#include "changelog-mem-types.h"
+
+int
+gf_changelog_done (char *file)
+{
+ int ret = -1;
+ char *buffer = NULL;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ char to_path[PATH_MAX] = {0,};
+
+ errno = EINVAL;
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
+ goto out;
+
+ if (!file || !strlen (file))
+ goto out;
+
+ /* make sure 'file' is inside ->jnl_working_dir */
+ buffer = realpath (file, NULL);
+ if (!buffer)
+ goto out;
+
+ if (strncmp (jnl->jnl_working_dir,
+ buffer, strlen (jnl->jnl_working_dir)))
+ goto out;
+
+ (void) snprintf (to_path, PATH_MAX, "%s%s",
+ jnl->jnl_processed_dir, basename (buffer));
+ gf_log (this->name, GF_LOG_DEBUG,
+ "moving %s to processed directory", file);
+ ret = rename (buffer, to_path);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "cannot move %s to %s (reason: %s)",
+ file, to_path, strerror (errno));
+ goto out;
+ }
+
+ ret = 0;
+
+ out:
+ if (buffer)
+ free (buffer); /* allocated by realpath() */
+ return ret;
+}
+
+/**
+ * @API
+ * for a set of changelogs, start from the beginning
+ */
+int
+gf_changelog_start_fresh ()
+{
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ errno = EINVAL;
+
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
+ goto out;
+
+ if (gf_ftruncate (jnl->jnl_fd, 0))
+ goto out;
+
+ return 0;
+
+ out:
+ return -1;
+}
+
+/**
+ * @API
+ * return the next changelog file entry. zero means all chanelogs
+ * consumed.
+ */
+ssize_t
+gf_changelog_next_change (char *bufptr, size_t maxlen)
+{
+ ssize_t size = -1;
+ int tracker_fd = 0;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ char buffer[PATH_MAX] = {0,};
+
+ errno = EINVAL;
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
+ goto out;
+
+ tracker_fd = jnl->jnl_fd;
+
+ size = gf_readline (tracker_fd, buffer, maxlen);
+ if (size < 0) {
+ size = -1;
+ goto out;
+ }
+
+ if (size == 0)
+ goto out;
+
+ memcpy (bufptr, buffer, size - 1);
+ bufptr[size - 1] = '\0';
+
+out:
+ return size;
+}
+
+/**
+ * @API
+ * gf_changelog_scan() - scan and generate a list of change entries
+ *
+ * calling this api multiple times (without calling gf_changlog_done())
+ * would result new changelogs(s) being refreshed in the tracker file.
+ * This call also acts as a cancellation point for the consumer.
+ */
+ssize_t
+gf_changelog_scan ()
+{
+ int ret = 0;
+ int tracker_fd = 0;
+ size_t len = 0;
+ size_t off = 0;
+ xlator_t *this = NULL;
+ size_t nr_entries = 0;
+ gf_changelog_journal_t *jnl = NULL;
+ struct dirent *entryp = NULL;
+ struct dirent *result = NULL;
+ char buffer[PATH_MAX] = {0,};
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
+ goto out;
+ if (JNL_IS_API_DISCONNECTED (jnl)) {
+ errno = ENOTCONN;
+ goto out;
+ }
+
+ errno = EINVAL;
+
+ tracker_fd = jnl->jnl_fd;
+ if (gf_ftruncate (tracker_fd, 0))
+ goto out;
+
+ len = offsetof(struct dirent, d_name)
+ + pathconf(jnl->jnl_processing_dir, _PC_NAME_MAX) + 1;
+ entryp = GF_CALLOC (1, len,
+ gf_changelog_mt_libgfchangelog_dirent_t);
+ if (!entryp)
+ goto out;
+
+ rewinddir (jnl->jnl_dir);
+ while (1) {
+ ret = readdir_r (jnl->jnl_dir, entryp, &result);
+ if (ret || !result)
+ break;
+
+ if (!strcmp (basename (entryp->d_name), ".")
+ || !strcmp (basename (entryp->d_name), ".."))
+ continue;
+
+ nr_entries++;
+
+ GF_CHANGELOG_FILL_BUFFER (jnl->jnl_processing_dir,
+ buffer, off,
+ strlen (jnl->jnl_processing_dir));
+ GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer,
+ off, strlen (entryp->d_name));
+ GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1);
+
+ if (gf_changelog_write (tracker_fd, buffer, off) != off) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "error writing changelog filename"
+ " to tracker file");
+ break;
+ }
+ off = 0;
+ }
+
+ GF_FREE (entryp);
+
+ if (!result) {
+ if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1)
+ return nr_entries;
+ }
+ out:
+ return -1;
+}
diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.c b/xlators/features/changelog/lib/src/gf-changelog-helpers.c
index f071b057d59..6bf709dc664 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-helpers.c
+++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.c
@@ -178,3 +178,35 @@ gf_ftruncate (int fd, off_t length)
return 0;
}
+
+int
+gf_thread_cleanup (xlator_t *this, pthread_t thread)
+{
+ int ret = 0;
+ void *res = NULL;
+
+ ret = pthread_cancel (thread);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Failed to send cancellation to thread");
+ goto error_return;
+ }
+
+ ret = pthread_join (thread, &res);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "failed to join thread");
+ goto error_return;
+ }
+
+ if (res != PTHREAD_CANCELED) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Thread could not be cleaned up");
+ goto error_return;
+ }
+
+ return 0;
+
+ error_return:
+ return -1;
+}
diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
index 9b875d45dcc..17b8862a89b 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h
+++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
@@ -18,6 +18,11 @@
#include <xlator.h>
+#include "changelog.h"
+
+#include "changelog-rpc-common.h"
+#include "gf-changelog-journal.h"
+
#define GF_CHANGELOG_TRACKER "tracker"
#define GF_CHANGELOG_CURRENT_DIR ".current"
@@ -41,79 +46,143 @@ typedef struct read_line {
char rl_buf[MAXLINE];
} read_line_t;
+struct gf_changelog;
+
+/**
+ * Event list for ordered event notification
+ *
+ * ->next_seq holds the next _expected_ sequence number.
+ */
+struct gf_event_list {
+ pthread_mutex_t lock; /* protects this structure */
+ pthread_cond_t cond;
+
+ pthread_t invoker;
+
+ unsigned long next_seq; /* next sequence number expected:
+ zero during bootstrap */
+
+ struct gf_changelog *entry; /* backpointer to it's brick
+ encapsulator (entry) */
+ struct list_head events; /* list of events (ordered) */
+};
+
+/**
+ * include a refcount if it's of use by additional layers
+ */
+struct gf_event {
+ int count;
+
+ unsigned long seq;
+
+ struct list_head list;
+
+ struct iovec iov[0];
+};
+#define GF_EVENT_CALLOC_SIZE(cnt, len) \
+ (sizeof (struct gf_event) + (cnt * sizeof (struct iovec)) + len)
+
+/**
+ * assign the base address of the IO vector to the correct memory
+ * area and set it's addressable length.
+ */
+#define GF_EVENT_ASSIGN_IOVEC(vec, event, len, pos) \
+ do { \
+ vec->iov_base = ((char *)event) + \
+ sizeof (struct gf_event) + \
+ (event->count * sizeof (struct iovec)) + pos; \
+ vec->iov_len = len; \
+ pos += len; \
+ } while (0)
+
+/**
+ * An instance of this structure is allocated for each brick for which
+ * notifications are streamed.
+ */
typedef struct gf_changelog {
xlator_t *this;
- /* 'processing' directory stream */
- DIR *gfc_dir;
-
- /* fd to the tracker file */
- int gfc_fd;
-
- /* connection retries */
- int gfc_connretries;
+ struct list_head list; /* list of instances */
- char gfc_sockpath[UNIX_PATH_MAX];
+ char brick[PATH_MAX]; /* brick path for this end-point */
- char gfc_brickpath[PATH_MAX];
+ changelog_rpc_t grpc; /* rpc{-clnt,svc} for this brick */
+#define RPC_PROBER(ent) ent->grpc.rpc
+#define RPC_REBORP(ent) ent->grpc.svc
+#define RPC_SOCK(ent) ent->grpc.sock
- /* socket for receiving notifications */
- int gfc_sockfd;
+ unsigned int notify; /* notification flag(s) */
- char *gfc_working_dir;
+ FINI *fini; /* destructor callback */
+ CALLBACK *callback; /* event callback dispatcher */
+ CONNECT *connected; /* connect callback */
+ DISCONNECT *disconnected; /* disconnection callback */
- /* RFC 3986 string encoding */
- char rfc3986[256];
+ void *ptr; /* owner specific private data */
+ xlator_t *invokerxl; /* consumers _this_, if valid,
+ assigned to THIS before cbk is
+ invoked */
- char gfc_current_dir[PATH_MAX];
- char gfc_processed_dir[PATH_MAX];
- char gfc_processing_dir[PATH_MAX];
+ gf_boolean_t ordered;
- pthread_t gfc_changelog_processor;
-
- /* Holds gfc for History API */
- struct gf_changelog *hist_gfc;
-
- /* holds 0 done scanning, 1 keep scanning and -1 error */
- int hist_done;
+ struct gf_event_list event;
} gf_changelog_t;
-typedef struct gf_changelog_history_data {
- int len;
-
- int htime_fd;
-
- /* parallelism count */
- int n_parallel;
-
- /* history from, to indexes */
- unsigned long from;
- unsigned long to;
-} gf_changelog_history_data_t;
-
-typedef struct gf_changelog_consume_data {
- /** set of inputs */
-
- /* fd to read from */
- int fd;
-
- /* from @offset */
- off_t offset;
-
- xlator_t *this;
- gf_changelog_t *gfc;
-
- /** set of outputs */
+static inline int
+gf_changelog_filter_check (gf_changelog_t *entry, changelog_event_t *event)
+{
+ if (event->ev_type & entry->notify)
+ return 1;
+ return 0;
+}
+
+#define GF_NEED_ORDERED_EVENTS(ent) (ent->ordered == _gf_true)
+
+/** private structure */
+typedef struct gf_private {
+ gf_lock_t lock; /* protects ->connections */
+
+ void *api; /* pointer for API access */
+
+ pthread_t poller; /* event poller thread */
+
+ struct list_head connections; /* list of connections */
+} gf_private_t;
+
+#define GF_CHANGELOG_GET_API_PTR(this) (((gf_private_t *) this->private)->api)
+
+/**
+ * upcall: invoke callback with _correct_ THIS
+ */
+#define GF_CHANGELOG_INVOKE_CBK(this, cbk, brick, args ...) \
+ do { \
+ xlator_t *old_this = NULL; \
+ xlator_t *invokerxl = NULL; \
+ \
+ invokerxl = entry->invokerxl; \
+ old_this = this; \
+ \
+ if (invokerxl) { \
+ THIS = invokerxl; \
+ } \
+ \
+ cbk (invokerxl, brick, args); \
+ THIS = old_this; \
+ \
+ } while (0)
- /* return value */
- int retval;
+#define SAVE_THIS(xl) \
+ do { \
+ old_this = xl; \
+ THIS = master; \
+ } while (0)
- /* journal processed */
- char changelog[PATH_MAX];
-} gf_changelog_consume_data_t;
+#define RESTORE_THIS() \
+ do { \
+ THIS = old_this; \
+ } while (0)
-int
-gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc);
+/** APIs and the rest */
void *
gf_changelog_process (void *data);
@@ -138,9 +207,14 @@ gf_lseek (int fd, off_t offset, int whence);
int
gf_changelog_consume (xlator_t *this,
- gf_changelog_t *gfc,
+ gf_changelog_journal_t *jnl,
char *from_path, gf_boolean_t no_publish);
int
-gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path);
+gf_changelog_publish (xlator_t *this,
+ gf_changelog_journal_t *jnl, char *from_path);
+int
+gf_thread_cleanup (xlator_t *this, pthread_t thread);
+void *
+gf_changelog_callback_invoker (void *arg);
#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog-process.c b/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c
index 1a275e676fb..6ee6f9f074f 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-process.c
+++ b/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
@@ -8,9 +8,6 @@
cases as published by the Free Software Foundation.
*/
-#include <unistd.h>
-#include <pthread.h>
-
#include "uuid.h"
#include "globals.h"
#include "glusterfs.h"
@@ -19,6 +16,9 @@
/* from the changelog translator */
#include "changelog-misc.h"
+#include "changelog-mem-types.h"
+
+#include "gf-changelog-journal.h"
extern int byebye;
@@ -98,7 +98,7 @@ conv_noop (char *ptr) { return ptr; }
MOVER_MOVE (mover, nleft, sizeof (uuid_t)); \
} \
-#define LINE_BUFSIZE 3*PATH_MAX /* enough buffer for extra chars too */
+#define LINE_BUFSIZE (3*PATH_MAX) /* enough buffer for extra chars too */
/**
* using mmap() makes parsing easy. fgets() cannot be used here as
@@ -114,7 +114,8 @@ conv_noop (char *ptr) { return ptr; }
static int
gf_changelog_parse_binary (xlator_t *this,
- gf_changelog_t *gfc, int from_fd, int to_fd,
+ gf_changelog_journal_t *jnl,
+ int from_fd, int to_fd,
size_t start_offset, struct stat *stbuf)
{
@@ -165,7 +166,8 @@ gf_changelog_parse_binary (xlator_t *this,
PARSE_GFID_MOVE (ptr, uuid, mover, nleft, parse_err);
bname_start = mover;
- if ( (bname_end = strchr (mover, '\n')) == NULL ) {
+ bname_end = strchr (mover, '\n');
+ if (bname_end == NULL) {
parse_err = 1;
break;
}
@@ -201,7 +203,7 @@ gf_changelog_parse_binary (xlator_t *this,
MOVER_MOVE (mover, nleft, 1);
}
- if ( (nleft == 0) && (!parse_err))
+ if ((nleft == 0) && (!parse_err))
ret = 0;
if (munmap (start, stbuf->st_size))
@@ -218,7 +220,8 @@ gf_changelog_parse_binary (xlator_t *this,
*/
static int
gf_changelog_parse_ascii (xlator_t *this,
- gf_changelog_t *gfc, int from_fd, int to_fd,
+ gf_changelog_journal_t *jnl,
+ int from_fd, int to_fd,
size_t start_offset, struct stat *stbuf)
{
int ng = 0;
@@ -281,7 +284,8 @@ gf_changelog_parse_ascii (xlator_t *this,
VERIFY_SEPARATOR (mover, len, parse_err);
fop = atoi (mover);
- if ( (fopname = gf_fop_list[fop]) == NULL) {
+ fopname = gf_fop_list[fop];
+ if (fopname == NULL) {
parse_err = 1;
break;
}
@@ -309,7 +313,8 @@ gf_changelog_parse_ascii (xlator_t *this,
VERIFY_SEPARATOR (mover, len, parse_err);
fop = atoi (mover);
- if ( (fopname = gf_fop_list[fop]) == NULL) {
+ fopname = gf_fop_list[fop];
+ if (fopname == NULL) {
parse_err = 1;
break;
}
@@ -320,7 +325,7 @@ gf_changelog_parse_ascii (xlator_t *this,
GF_CHANGELOG_FILL_BUFFER (fopname, ascii, off, len);
ng = nr_extra_recs[fop];
- for (;ng > 0; ng--) {
+ for (; ng > 0; ng--) {
MOVER_MOVE (mover, nleft, 1);
len = strlen (mover);
VERIFY_SEPARATOR (mover, len, parse_err);
@@ -346,7 +351,7 @@ gf_changelog_parse_ascii (xlator_t *this,
}
gf_rfc3986_encode ((unsigned char *) ptr,
- eptr, gfc->rfc3986);
+ eptr, jnl->rfc3986);
FILL_AND_MOVE (eptr, ascii, off,
mover, nleft, len);
free (eptr);
@@ -374,7 +379,7 @@ gf_changelog_parse_ascii (xlator_t *this,
}
- if ( (nleft == 0) && (!parse_err))
+ if ((nleft == 0) && (!parse_err))
ret = 0;
if (munmap (start, stbuf->st_size))
@@ -410,8 +415,8 @@ gf_changelog_copy (xlator_t *this, int from_fd, int to_fd)
}
static int
-gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd,
- int to_fd, struct stat *stbuf, int *zerob)
+gf_changelog_decode (xlator_t *this, gf_changelog_journal_t *jnl,
+ int from_fd, int to_fd, struct stat *stbuf, int *zerob)
{
int ret = -1;
int encoding = -1;
@@ -441,12 +446,12 @@ gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd,
* this ideally should have been a part of changelog-encoders.c
* (ie. part of the changelog translator).
*/
- ret = gf_changelog_parse_binary (this, gfc, from_fd,
+ ret = gf_changelog_parse_binary (this, jnl, from_fd,
to_fd, elen, stbuf);
break;
case CHANGELOG_ENCODE_ASCII:
- ret = gf_changelog_parse_ascii (this, gfc, from_fd,
+ ret = gf_changelog_parse_ascii (this, jnl, from_fd,
to_fd, elen, stbuf);
break;
default:
@@ -458,7 +463,8 @@ gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd,
}
int
-gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path)
+gf_changelog_publish (xlator_t *this,
+ gf_changelog_journal_t *jnl, char *from_path)
{
int ret = 0;
char dest[PATH_MAX] = {0,};
@@ -466,21 +472,21 @@ gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path)
struct stat stbuf = {0,};
(void) snprintf (to_path, PATH_MAX, "%s%s",
- gfc->gfc_current_dir, basename (from_path));
+ jnl->jnl_current_dir, basename (from_path));
/* handle zerob file that wont exist in current */
ret = stat (to_path, &stbuf);
- if (ret){
+ if (ret) {
if (errno == ENOENT)
ret = 0;
goto out;
}
(void) snprintf (dest, PATH_MAX, "%s%s",
- gfc->gfc_processing_dir, basename (from_path));
+ jnl->jnl_processing_dir, basename (from_path));
ret = rename (to_path, dest);
- if (ret){
+ if (ret) {
gf_log (this->name, GF_LOG_ERROR,
"error moving %s to processing dir"
" (reason: %s)", to_path, strerror (errno));
@@ -492,7 +498,7 @@ out:
int
gf_changelog_consume (xlator_t *this,
- gf_changelog_t *gfc,
+ gf_changelog_journal_t *jnl,
char *from_path, gf_boolean_t no_publish)
{
int ret = -1;
@@ -520,9 +526,9 @@ gf_changelog_consume (xlator_t *this,
}
(void) snprintf (to_path, PATH_MAX, "%s%s",
- gfc->gfc_current_dir, basename (from_path));
+ jnl->jnl_current_dir, basename (from_path));
(void) snprintf (dest, PATH_MAX, "%s%s",
- gfc->gfc_processing_dir, basename (from_path));
+ jnl->jnl_processing_dir, basename (from_path));
fd2 = open (to_path, O_CREAT | O_TRUNC | O_RDWR,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
@@ -532,7 +538,7 @@ gf_changelog_consume (xlator_t *this,
to_path, strerror (errno));
goto close_fd;
} else {
- ret = gf_changelog_decode (this, gfc, fd1,
+ ret = gf_changelog_decode (this, jnl, fd1,
fd2, &stbuf, &zerob);
close (fd2);
@@ -568,88 +574,412 @@ gf_changelog_consume (xlator_t *this,
return ret;
}
-static char *
-gf_changelog_ext_change (xlator_t *this,
- gf_changelog_t *gfc, char *path, size_t readlen)
+void *
+gf_changelog_process (void *data)
{
- int alo = 0;
- int ret = 0;
- size_t len = 0;
- char *buf = NULL;
-
- buf = path;
- while (len < readlen) {
- if (*buf == '\0') {
- alo = 1;
- gf_log (this->name, GF_LOG_DEBUG,
- "processing changelog: %s", path);
- ret = gf_changelog_consume (this, gfc, path, _gf_false);
- }
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_entry_t *entry = NULL;
+ gf_changelog_processor_t *jnl_proc = NULL;
- if (ret)
- break;
+ this = THIS;
- len++; buf++;
- if (alo) {
- alo = 0;
- path = buf;
+ jnl = data;
+ jnl_proc = jnl->jnl_proc;
+
+ while (1) {
+ pthread_mutex_lock (&jnl_proc->lock);
+ {
+ while (list_empty (&jnl_proc->entries)) {
+ jnl_proc->waiting = _gf_true;
+ pthread_cond_wait
+ (&jnl_proc->cond, &jnl_proc->lock);
+ }
+
+ entry = list_first_entry (&jnl_proc->entries,
+ gf_changelog_entry_t, list);
+ list_del (&entry->list);
+ jnl_proc->waiting = _gf_false;
}
+ pthread_mutex_unlock (&jnl_proc->lock);
+
+ if (entry) {
+ ret = gf_changelog_consume (this, jnl,
+ entry->path, _gf_false);
+ GF_FREE (entry);
+ }
+ }
+
+ return NULL;
+}
+
+inline void
+gf_changelog_queue_journal (gf_changelog_processor_t *jnl_proc,
+ changelog_event_t *event)
+{
+ size_t len = 0;
+ gf_changelog_entry_t *entry = NULL;
+
+ entry = GF_CALLOC (1, sizeof (gf_changelog_entry_t),
+ gf_changelog_mt_libgfchangelog_entry_t);
+ if (!entry)
+ return;
+ INIT_LIST_HEAD (&entry->list);
+
+ len = strlen (event->u.journal.path);
+ (void)memcpy (entry->path, event->u.journal.path, len+1);
+
+ pthread_mutex_lock (&jnl_proc->lock);
+ {
+ list_add_tail (&entry->list, &jnl_proc->entries);
+ if (jnl_proc->waiting)
+ pthread_cond_signal (&jnl_proc->cond);
+ }
+ pthread_mutex_unlock (&jnl_proc->lock);
+
+ return;
+}
+
+void
+gf_changelog_handle_journal (void *xl, char *brick,
+ void *cbkdata, changelog_event_t *event)
+{
+ int ret = 0;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_processor_t *jnl_proc = NULL;
+
+ jnl = cbkdata;
+ jnl_proc = jnl->jnl_proc;
+
+ gf_changelog_queue_journal (jnl_proc, event);
+}
+
+void
+gf_changelog_journal_disconnect (void *xl, char *brick, void *data)
+{
+ gf_changelog_journal_t *jnl = NULL;
+
+ jnl = data;
+
+ pthread_spin_lock (&jnl->lock);
+ {
+ JNL_SET_API_STATE (jnl, JNL_API_DISCONNECTED);
+ };
+ pthread_spin_unlock (&jnl->lock);
+}
+
+void
+gf_changelog_journal_connect (void *xl, char *brick, void *data)
+{
+ gf_changelog_journal_t *jnl = NULL;
+
+ jnl = data;
+
+ pthread_spin_lock (&jnl->lock);
+ {
+ JNL_SET_API_STATE (jnl, JNL_API_CONNECTED);
+ };
+ pthread_spin_unlock (&jnl->lock);
+
+ return;
+}
+
+void
+gf_changelog_cleanup_processor (gf_changelog_journal_t *jnl)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_changelog_processor_t *jnl_proc = NULL;
+
+ this = THIS;
+ if (!this || !jnl || !jnl->jnl_proc)
+ goto error_return;
+
+ jnl_proc = jnl->jnl_proc;
+
+ ret = gf_thread_cleanup (this, jnl_proc->processor);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to cleanup processor thread");
+ goto error_return;
+ }
+
+ (void)pthread_mutex_destroy (&jnl_proc->lock);
+ (void)pthread_cond_destroy (&jnl_proc->cond);
+
+ GF_FREE (jnl_proc);
+
+ error_return:
+ return;
+}
+
+int
+gf_changelog_init_processor (gf_changelog_journal_t *jnl)
+{
+ int ret = -1;
+ gf_changelog_processor_t *jnl_proc = NULL;
+
+ jnl_proc = GF_CALLOC (1, sizeof (gf_changelog_processor_t),
+ gf_changelog_mt_libgfchangelog_t);
+ if (!jnl_proc)
+ goto error_return;
+
+ ret = pthread_mutex_init (&jnl_proc->lock, NULL);
+ if (ret != 0)
+ goto free_jnl_proc;
+ ret = pthread_cond_init (&jnl_proc->cond, NULL);
+ if (ret != 0)
+ goto cleanup_mutex;
+
+ INIT_LIST_HEAD (&jnl_proc->entries);
+ ret = pthread_create (&jnl_proc->processor,
+ NULL, gf_changelog_process, jnl);
+ if (ret != 0)
+ goto cleanup_cond;
+ jnl_proc->waiting = _gf_false;
+
+ jnl->jnl_proc = jnl_proc;
+ return 0;
+
+ cleanup_cond:
+ (void) pthread_cond_destroy (&jnl_proc->cond);
+ cleanup_mutex:
+ (void) pthread_mutex_destroy (&jnl_proc->lock);
+ free_jnl_proc:
+ GF_FREE (jnl_proc);
+ error_return:
+ return -1;
+}
+
+static void
+gf_changelog_cleanup_fds (gf_changelog_journal_t *jnl)
+{
+ /* tracker fd */
+ if (jnl->jnl_fd != -1)
+ close (jnl->jnl_fd);
+ /* processing dir */
+ if (jnl->jnl_dir)
+ closedir (jnl->jnl_dir);
+
+ if (jnl->jnl_working_dir)
+ free (jnl->jnl_working_dir); /* allocated by realpath */
+}
+
+static int
+gf_changelog_open_dirs (xlator_t *this, gf_changelog_journal_t *jnl)
+{
+ int ret = -1;
+ DIR *dir = NULL;
+ int tracker_fd = 0;
+ char tracker_path[PATH_MAX] = {0,};
+
+ /* .current */
+ (void) snprintf (jnl->jnl_current_dir, PATH_MAX,
+ "%s/"GF_CHANGELOG_CURRENT_DIR"/",
+ jnl->jnl_working_dir);
+ ret = recursive_rmdir (jnl->jnl_current_dir);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to rmdir: %s, err: %s",
+ jnl->jnl_current_dir, strerror (errno));
+ goto out;
+ }
+ ret = mkdir_p (jnl->jnl_current_dir, 0600, _gf_false);
+ if (ret)
+ goto out;
+
+ /* .processed */
+ (void) snprintf (jnl->jnl_processed_dir, PATH_MAX,
+ "%s/"GF_CHANGELOG_PROCESSED_DIR"/",
+ jnl->jnl_working_dir);
+ ret = mkdir_p (jnl->jnl_processed_dir, 0600, _gf_false);
+ if (ret)
+ goto out;
+
+ /* .processing */
+ (void) snprintf (jnl->jnl_processing_dir, PATH_MAX,
+ "%s/"GF_CHANGELOG_PROCESSING_DIR"/",
+ jnl->jnl_working_dir);
+ ret = recursive_rmdir (jnl->jnl_processing_dir);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to rmdir: %s, err: %s",
+ jnl->jnl_processing_dir, strerror (errno));
+ goto out;
+ }
+
+ ret = mkdir_p (jnl->jnl_processing_dir, 0600, _gf_false);
+ if (ret)
+ goto out;
+
+ dir = opendir (jnl->jnl_processing_dir);
+ if (!dir) {
+ gf_log ("", GF_LOG_ERROR,
+ "opendir() error [reason: %s]", strerror (errno));
+ goto out;
}
- return (ret) ? NULL : path;
+ jnl->jnl_dir = dir;
+
+ (void) snprintf (tracker_path, PATH_MAX,
+ "%s/"GF_CHANGELOG_TRACKER, jnl->jnl_working_dir);
+
+ tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+ if (tracker_fd < 0) {
+ closedir (jnl->jnl_dir);
+ ret = -1;
+ goto out;
+ }
+
+ jnl->jnl_fd = tracker_fd;
+ ret = 0;
+ out:
+ return ret;
+}
+
+int
+gf_changelog_init_history (xlator_t *this,
+ gf_changelog_journal_t *jnl,
+ char *brick_path, char *scratch_dir)
+{
+ int i = 0;
+ int ret = 0;
+ char hist_scratch_dir[PATH_MAX] = {0,};
+
+ jnl->hist_jnl = GF_CALLOC (1, sizeof (*jnl),
+ gf_changelog_mt_libgfchangelog_t);
+ if (!jnl->hist_jnl)
+ goto error_return;
+
+ jnl->hist_jnl->jnl_dir = NULL;
+ jnl->hist_jnl->jnl_fd = -1;
+
+ (void) strncpy (hist_scratch_dir, scratch_dir, PATH_MAX);
+ (void) snprintf (hist_scratch_dir, PATH_MAX,
+ "%s/"GF_CHANGELOG_HISTORY_DIR"/",
+ jnl->jnl_working_dir);
+
+ ret = mkdir_p (hist_scratch_dir, 0600, _gf_false);
+ if (ret)
+ goto dealloc_hist;
+
+ jnl->hist_jnl->jnl_working_dir = realpath (hist_scratch_dir, NULL);
+ if (!jnl->hist_jnl->jnl_working_dir)
+ goto dealloc_hist;
+
+ ret = gf_changelog_open_dirs (this, jnl->hist_jnl);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not create entries in history scratch dir");
+ goto dealloc_hist;
+ }
+
+ (void) strncpy (jnl->hist_jnl->jnl_brickpath, brick_path, PATH_MAX);
+
+ for (i = 0; i < 256; i++) {
+ jnl->hist_jnl->rfc3986[i] =
+ (isalnum(i) || i == '~' ||
+ i == '-' || i == '.' || i == '_') ? i : 0;
+ }
+
+ return 0;
+
+ dealloc_hist:
+ GF_FREE (jnl->hist_jnl);
+ jnl->hist_jnl = NULL;
+ error_return:
+ return -1;
+}
+
+void
+gf_changelog_journal_fini (void *xl, char *brick, void *data)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+
+ this = xl;
+ jnl = data;
+
+ gf_changelog_cleanup_processor (jnl);
+
+ gf_changelog_cleanup_fds (jnl);
+ if (jnl->hist_jnl)
+ gf_changelog_cleanup_fds (jnl->hist_jnl);
+
+ GF_FREE (jnl);
}
void *
-gf_changelog_process (void *data)
+gf_changelog_journal_init (void *xl, struct gf_brick_spec *brick)
{
- ssize_t len = 0;
- ssize_t offlen = 0;
- xlator_t *this = NULL;
- char *sbuf = NULL;
- gf_changelog_t *gfc = NULL;
- char from_path[PATH_MAX] = {0,};
-
- gfc = (gf_changelog_t *) data;
- this = gfc->this;
-
- pthread_detach (pthread_self());
-
- for (;;) {
- len = gf_changelog_read_path (gfc->gfc_sockfd,
- from_path + offlen,
- PATH_MAX - offlen);
- if (len < 0)
- continue; /* ignore it for now */
-
- if (len == 0) { /* close() from the changelog translator */
- gf_log (this->name, GF_LOG_INFO, "close from changelog"
- " notification translator.");
-
- if (gfc->gfc_connretries != 1) {
- if (!gf_changelog_notification_init(this, gfc))
- continue;
- }
+ int i = 0;
+ int ret = 0;
+ xlator_t *this = NULL;
+ struct stat buf = {0,};
+ char *scratch_dir = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+
+ this = xl;
+ scratch_dir = (char *) brick->ptr;
+
+ jnl = GF_CALLOC (1, sizeof (gf_changelog_journal_t),
+ gf_changelog_mt_libgfchangelog_t);
+ if (!jnl)
+ goto error_return;
+
+ if (stat (scratch_dir, &buf) && errno == ENOENT) {
+ ret = mkdir_p (scratch_dir, 0600, _gf_true);
+ if (ret)
+ goto dealloc_private;
+ }
- byebye = 1;
- break;
- }
+ jnl->jnl_working_dir = realpath (scratch_dir, NULL);
+ if (!jnl->jnl_working_dir)
+ goto dealloc_private;
- len += offlen;
- sbuf = gf_changelog_ext_change (this, gfc, from_path, len);
- if (!sbuf) {
- gf_log (this->name, GF_LOG_ERROR,
- "could not extract changelog filename");
- continue;
- }
+ ret = gf_changelog_open_dirs (this, jnl);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not create entries in scratch dir");
+ goto dealloc_private;
+ }
- offlen = 0;
- if (sbuf != (from_path + len)) {
- offlen = from_path + len - sbuf;
- memmove (from_path, sbuf, offlen);
- }
+ (void) strncpy (jnl->jnl_brickpath, brick->brick_path, PATH_MAX);
+
+ /* RFC 3986 {de,en}coding */
+ for (i = 0; i < 256; i++) {
+ jnl->rfc3986[i] =
+ (isalnum(i) || i == '~' ||
+ i == '-' || i == '.' || i == '_') ? i : 0;
}
- gf_log (this->name, GF_LOG_DEBUG,
- "byebye (%d) from processing thread...", byebye);
+ ret = gf_changelog_init_history (this, jnl,
+ brick->brick_path, scratch_dir);
+ if (ret)
+ goto cleanup_fds;
+
+ /* initialize journal processor */
+ ret = gf_changelog_init_processor (jnl);
+ if (ret)
+ goto cleanup_fds;
+
+ JNL_SET_API_STATE (jnl, JNL_API_CONN_INPROGESS);
+ ret = pthread_spin_init (&jnl->lock, 0);
+ if (ret != 0)
+ goto cleanup_processor;
+ return jnl;
+
+ cleanup_processor:
+ gf_changelog_cleanup_processor (jnl);
+ cleanup_fds:
+ gf_changelog_cleanup_fds (jnl);
+ if (jnl->hist_jnl)
+ gf_changelog_cleanup_fds (jnl->hist_jnl);
+ dealloc_private:
+ GF_FREE (jnl);
+ error_return:
return NULL;
}
diff --git a/xlators/features/changelog/lib/src/gf-changelog-journal.h b/xlators/features/changelog/lib/src/gf-changelog-journal.h
new file mode 100644
index 00000000000..9a0f0b28956
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-journal.h
@@ -0,0 +1,114 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __GF_CHANGELOG_JOURNAL_H
+#define __GF_CHANGELOG_JOURNAL_H
+
+#include <unistd.h>
+#include <pthread.h>
+
+#include "changelog.h"
+
+enum api_conn {
+ JNL_API_CONNECTED,
+ JNL_API_CONN_INPROGESS,
+ JNL_API_DISCONNECTED,
+};
+
+typedef struct gf_changelog_entry {
+ char path[PATH_MAX];
+
+ struct list_head list;
+} gf_changelog_entry_t;
+
+typedef struct gf_changelog_processor {
+ pthread_mutex_t lock; /* protects ->entries */
+ pthread_cond_t cond; /* waiter during empty list */
+ gf_boolean_t waiting;
+
+ pthread_t processor; /* thread-id of journal processing thread */
+
+ struct list_head entries;
+} gf_changelog_processor_t;
+
+typedef struct gf_changelog_journal {
+ DIR *jnl_dir; /* 'processing' directory stream */
+
+ int jnl_fd; /* fd to the tracker file */
+
+ char jnl_brickpath[PATH_MAX]; /* brick path for this end-point */
+
+ gf_changelog_processor_t *jnl_proc;
+
+ char *jnl_working_dir; /* scratch directory */
+
+ char jnl_current_dir[PATH_MAX];
+ char jnl_processed_dir[PATH_MAX];
+ char jnl_processing_dir[PATH_MAX];
+
+ char rfc3986[256]; /* RFC 3986 string encoding */
+
+ struct gf_changelog_journal *hist_jnl;
+ int hist_done; /* holds 0 done scanning,
+ 1 keep scanning and -1 error */
+
+ pthread_spinlock_t lock;
+ int connected;
+} gf_changelog_journal_t;
+
+#define JNL_SET_API_STATE(jnl, state) (jnl->connected = state)
+#define JNL_IS_API_DISCONNECTED(jnl) (jnl->connected == JNL_API_DISCONNECTED)
+
+/* History API */
+typedef struct gf_changelog_history_data {
+ int len;
+
+ int htime_fd;
+
+ /* parallelism count */
+ int n_parallel;
+
+ /* history from, to indexes */
+ unsigned long from;
+ unsigned long to;
+} gf_changelog_history_data_t;
+
+typedef struct gf_changelog_consume_data {
+ /** set of inputs */
+
+ /* fd to read from */
+ int fd;
+
+ /* from @offset */
+ off_t offset;
+
+ xlator_t *this;
+
+ gf_changelog_journal_t *jnl;
+
+ /** set of outputs */
+
+ /* return value */
+ int retval;
+
+ /* journal processed */
+ char changelog[PATH_MAX];
+} gf_changelog_consume_data_t;
+
+/* event handler */
+CALLBACK gf_changelog_handle_journal;
+
+/* init, connect & disconnect handler */
+INIT gf_changelog_journal_init;
+FINI gf_changelog_journal_fini;
+CONNECT gf_changelog_journal_connect;
+DISCONNECT gf_changelog_journal_disconnect;
+
+#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog-reborp.c b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
new file mode 100644
index 00000000000..d7e60fb9634
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
@@ -0,0 +1,381 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-misc.h"
+#include "changelog-mem-types.h"
+
+#include "gf-changelog-helpers.h"
+#include "changelog-rpc-common.h"
+
+/**
+ * Reverse socket: actual data transfer handler. Connection
+ * initiator is PROBER, data transfer is REBORP.
+ */
+
+struct rpcsvc_program *gf_changelog_reborp_programs[];
+
+/**
+ * On a reverse connection, unlink the socket file.
+ */
+int
+gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata,
+ rpcsvc_event_t event, void *data)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_private_t *priv = NULL;
+ gf_changelog_t *entry = NULL;
+ char sock[UNIX_PATH_MAX] = {0,};
+
+ entry = mydata;
+ this = entry->this;
+ priv = this->private;
+
+ switch (event) {
+ case RPCSVC_EVENT_ACCEPT:
+ ret = unlink (RPC_SOCK(entry));
+ if (ret != 0)
+ gf_log (this->name, GF_LOG_WARNING, "failed to unlink"
+ " reverse socket file %s", RPC_SOCK (entry));
+ if (entry->connected)
+ GF_CHANGELOG_INVOKE_CBK (this, entry->connected,
+ entry->brick, entry->ptr);
+ break;
+ case RPCSVC_EVENT_DISCONNECT:
+ LOCK (&priv->lock);
+ {
+ list_del (&entry->list);
+ }
+ UNLOCK (&priv->lock);
+
+ if (entry->disconnected)
+ GF_CHANGELOG_INVOKE_CBK (this, entry->disconnected,
+ entry->brick, entry->ptr);
+
+ GF_FREE (entry);
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+}
+
+rpcsvc_t *
+gf_changelog_reborp_init_rpc_listner (xlator_t *this,
+ char *path, char *sock, void *cbkdata)
+{
+ CHANGELOG_MAKE_TMP_SOCKET_PATH (path, sock, UNIX_PATH_MAX);
+ return changelog_rpc_server_init (this, sock, cbkdata,
+ gf_changelog_reborp_rpcsvc_notify,
+ gf_changelog_reborp_programs);
+}
+
+/**
+ * This is dirty and painful as of now untill there is event filtering in the
+ * server. The entire event buffer is scanned and interested events are picked,
+ * whereas we _should_ be notified with the events we were interested in
+ * (selected at the time of probe). As of now this is complete BS and needs
+ * fixture ASAP. I just made it work, it needs to be better.
+ *
+ * @FIXME: cleanup this bugger once server filters events.
+ */
+inline void
+gf_changelog_invoke_callback (gf_changelog_t *entry,
+ struct iovec **vec, int payloadcnt)
+{
+ int i = 0;
+ int evsize = 0;
+ xlator_t *this = NULL;
+ changelog_event_t *event = NULL;
+
+ this = entry->this;
+
+ for (; i < payloadcnt; i++) {
+ event = (changelog_event_t *)vec[i]->iov_base;
+ evsize = vec[i]->iov_len / CHANGELOG_EV_SIZE;
+
+ for (; evsize > 0; evsize--, event++) {
+ if (gf_changelog_filter_check (entry, event)) {
+ GF_CHANGELOG_INVOKE_CBK (this,
+ entry->callback,
+ entry->brick,
+ entry->ptr, event);
+ }
+ }
+ }
+}
+
+/**
+ * Ordered event handler is self-adaptive.. if the event sequence number
+ * is what's expected (->next_seq) there is no ordering list that's
+ * maintained. On out-of-order event notifications, event buffers are
+ * dynamically allocated and ordered.
+ */
+
+inline int
+__is_expected_sequence (struct gf_event_list *ev, struct gf_event *event)
+{
+ return (ev->next_seq == event->seq);
+}
+
+inline int
+__can_process_event (struct gf_event_list *ev, struct gf_event **event)
+{
+ *event = list_first_entry (&ev->events, struct gf_event, list);
+
+ if (__is_expected_sequence (ev, *event)) {
+ list_del (&(*event)->list);
+ ev->next_seq++;
+ return 1;
+ }
+
+ return 0;
+}
+
+inline void
+__process_event_list (struct gf_event_list *ev, struct gf_event **event)
+{
+ while (list_empty (&ev->events)
+ || !__can_process_event (ev, event))
+ pthread_cond_wait (&ev->cond, &ev->lock);
+}
+
+void *
+gf_changelog_callback_invoker (void *arg)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_changelog_t *entry = NULL;
+ struct iovec *vec = NULL;
+ struct gf_event *event = NULL;
+ struct gf_event_list *ev = NULL;
+
+ ev = arg;
+ entry = ev->entry;
+ this = entry->this;
+
+ while (1) {
+ pthread_mutex_lock (&ev->lock);
+ {
+ __process_event_list (ev, &event);
+ }
+ pthread_mutex_unlock (&ev->lock);
+
+ vec = (struct iovec *) &event->iov;
+ gf_changelog_invoke_callback (entry, &vec, event->count);
+
+ GF_FREE (event);
+ }
+
+ return NULL;
+}
+
+static int
+orderfn (struct list_head *pos1, struct list_head *pos2)
+{
+ struct gf_event *event1 = NULL;
+ struct gf_event *event2 = NULL;
+
+ event1 = list_entry (pos1, struct gf_event, list);
+ event2 = list_entry (pos2, struct gf_event, list);
+
+ if (event1->seq > event2->seq)
+ return 1;
+ return -1;
+}
+
+int
+gf_changelog_ordered_event_handler (rpcsvc_request_t *req,
+ xlator_t *this, gf_changelog_t *entry)
+{
+ int i = 0;
+ size_t payloadlen = 0;
+ ssize_t len = 0;
+ int payloadcnt = 0;
+ changelog_event_req rpc_req = {0,};
+ changelog_event_rsp rpc_rsp = {0,};
+ struct iovec *vec = NULL;
+ struct gf_event *event = NULL;
+ struct gf_event_list *ev = NULL;
+
+ ev = &entry->event;
+
+ len = xdr_to_generic (req->msg[0],
+ &rpc_req, (xdrproc_t)xdr_changelog_event_req);
+ if (len < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed");
+ req->rpc_err = GARBAGE_ARGS;
+ goto handle_xdr_error;
+ }
+
+ if (len < req->msg[0].iov_len) {
+ payloadcnt = 1;
+ payloadlen = (req->msg[0].iov_len - len);
+ }
+ for (i = 1; i < req->count; i++) {
+ payloadcnt++;
+ payloadlen += req->msg[i].iov_len;
+ }
+
+ event = GF_CALLOC (1, GF_EVENT_CALLOC_SIZE (payloadcnt, payloadlen),
+ gf_changelog_mt_libgfchangelog_event_t);
+ if (!event)
+ goto handle_xdr_error;
+ INIT_LIST_HEAD (&event->list);
+
+ payloadlen = 0;
+ event->seq = rpc_req.seq;
+ event->count = payloadcnt;
+
+ /* deep copy IO vectors */
+ vec = &event->iov[0];
+ GF_EVENT_ASSIGN_IOVEC (vec, event,
+ (req->msg[0].iov_len - len), payloadlen);
+ (void) memcpy (vec->iov_base,
+ req->msg[0].iov_base + len, vec->iov_len);
+
+ for (i = 1; i < req->count; i++) {
+ vec = &event->iov[i];
+ GF_EVENT_ASSIGN_IOVEC (vec, event,
+ req->msg[i].iov_len, payloadlen);
+ (void) memcpy (event->iov[i].iov_base,
+ req->msg[i].iov_base, req->msg[i].iov_len);
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "seq: %lu [%s] (time: %lu.%lu), (vec: %d, len: %ld)",
+ rpc_req.seq, entry->brick, rpc_req.tv_sec,
+ rpc_req.tv_usec, payloadcnt, payloadlen);
+
+ /* add it to the ordered event list and wake up listner(s) */
+ pthread_mutex_lock (&ev->lock);
+ {
+ list_add_order (&event->list, &ev->events, orderfn);
+ if (!ev->next_seq)
+ ev->next_seq = event->seq;
+ if (ev->next_seq == event->seq)
+ pthread_cond_signal (&ev->cond);
+ }
+ pthread_mutex_unlock (&ev->lock);
+
+ /* ack sequence number */
+ rpc_rsp.op_ret = 0;
+ rpc_rsp.seq = rpc_req.seq;
+
+ goto submit_rpc;
+
+ handle_xdr_error:
+ rpc_rsp.op_ret = -1;
+ rpc_rsp.seq = 0; /* invalid */
+ submit_rpc:
+ return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL,
+ (xdrproc_t)xdr_changelog_event_rsp);
+}
+
+int
+gf_changelog_unordered_event_handler (rpcsvc_request_t *req,
+ xlator_t *this, gf_changelog_t *entry)
+{
+ int i = 0;
+ int ret = 0;
+ ssize_t len = 0;
+ int payloadcnt = 0;
+ struct iovec vector[MAX_IOVEC] = {{0,}, };
+ changelog_event_req rpc_req = {0,};
+ changelog_event_rsp rpc_rsp = {0,};
+
+ len = xdr_to_generic (req->msg[0],
+ &rpc_req, (xdrproc_t)xdr_changelog_event_req);
+ if (len < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed");
+ req->rpc_err = GARBAGE_ARGS;
+ goto handle_xdr_error;
+ }
+
+ /* prepare payload */
+ if (len < req->msg[0].iov_len) {
+ payloadcnt = 1;
+ vector[0].iov_base = (req->msg[0].iov_base + len);
+ vector[0].iov_len = (req->msg[0].iov_len - len);
+ }
+
+ for (i = 1; i < req->count; i++) {
+ vector[payloadcnt++] = req->msg[i];
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "seq: %lu (time: %lu.%lu), (vec: %d)",
+ rpc_req.seq, rpc_req.tv_sec, rpc_req.tv_usec, payloadcnt);
+
+ /* invoke callback */
+ struct iovec *vec = (struct iovec *) &vector;
+ gf_changelog_invoke_callback (entry, &vec, payloadcnt);
+
+ /* ack sequence number */
+ rpc_rsp.op_ret = 0;
+ rpc_rsp.seq = rpc_req.seq;
+
+ goto submit_rpc;
+
+ handle_xdr_error:
+ rpc_rsp.op_ret = -1;
+ rpc_rsp.seq = 0; /* invalid */
+ submit_rpc:
+ return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL,
+ (xdrproc_t)xdr_changelog_event_rsp);
+}
+
+int
+gf_changelog_reborp_handle_event (rpcsvc_request_t *req)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ rpcsvc_t *svc = NULL;
+ gf_changelog_t *entry = NULL;
+
+ svc = rpcsvc_request_service (req);
+ entry = svc->mydata;
+
+ this = THIS = entry->this;
+
+ ret = GF_NEED_ORDERED_EVENTS (entry)
+ ? gf_changelog_ordered_event_handler (req, this, entry)
+ : gf_changelog_unordered_event_handler (req, this, entry);
+
+ return ret;
+}
+
+rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = {
+ [CHANGELOG_REV_PROC_EVENT] = {
+ "CHANGELOG EVENT HANDLER", CHANGELOG_REV_PROC_EVENT,
+ gf_changelog_reborp_handle_event, NULL, 0, DRC_NA
+ },
+};
+
+/**
+ * Do not use synctask as the RPC layer dereferences ->mydata as THIS.
+ * In gf_changelog_setup_rpc(), @cbkdata is of type @gf_changelog_t,
+ * and that's required to invoke the callback with the appropriate
+ * brick path and it's private data.
+ */
+struct rpcsvc_program gf_changelog_reborp_prog = {
+ .progname = "LIBGFCHANGELOG REBORP",
+ .prognum = CHANGELOG_REV_RPC_PROCNUM,
+ .progver = CHANGELOG_REV_RPC_PROCVER,
+ .numactors = CHANGELOG_REV_PROC_MAX,
+ .actors = gf_changelog_reborp_actors,
+ .synctask = _gf_false,
+};
+
+struct rpcsvc_program *gf_changelog_reborp_programs[] = {
+ &gf_changelog_reborp_prog,
+ NULL,
+};
diff --git a/xlators/features/changelog/lib/src/gf-changelog-rpc.c b/xlators/features/changelog/lib/src/gf-changelog-rpc.c
new file mode 100644
index 00000000000..c2a4c044d23
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.c
@@ -0,0 +1,105 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "gf-changelog-rpc.h"
+#include "changelog-misc.h"
+#include "changelog-mem-types.h"
+
+struct rpc_clnt_program gf_changelog_clnt;
+
+/* TODO: piggyback reconnect to called (upcall) */
+int
+gf_changelog_rpc_notify (struct rpc_clnt *rpc,
+ void *mydata, rpc_clnt_event_t event, void *data)
+{
+ xlator_t *this = NULL;
+
+ this = mydata;
+
+ switch (event) {
+ case RPC_CLNT_CONNECT:
+ rpc_clnt_set_connected (&rpc->conn);
+ break;
+ case RPC_CLNT_DISCONNECT:
+ rpc_clnt_unset_connected (&rpc->conn);
+ break;
+ case RPC_CLNT_MSG:
+ case RPC_CLNT_DESTROY:
+ break;
+ }
+
+ return 0;
+}
+
+struct rpc_clnt *
+gf_changelog_rpc_init (xlator_t *this, gf_changelog_t *entry)
+{
+ char sockfile[UNIX_PATH_MAX] = {0,};
+
+ CHANGELOG_MAKE_SOCKET_PATH (entry->brick,
+ sockfile, UNIX_PATH_MAX);
+ return changelog_rpc_client_init (this, entry,
+ sockfile, gf_changelog_rpc_notify);
+}
+
+/**
+ * remote procedure calls declarations.
+ */
+
+int
+gf_probe_changelog_cbk (struct rpc_req *req,
+ struct iovec *iovec, int count, void *myframe)
+{
+ return 0;
+}
+
+int
+gf_probe_changelog_filter (call_frame_t *frame, xlator_t *this, void *data)
+{
+ int ret = 0;
+ char *sock = NULL;
+ gf_changelog_t *entry = NULL;
+ changelog_probe_req req = {0,};
+
+ entry = data;
+ sock = RPC_SOCK (entry);
+
+ (void) memcpy (&req.sock, sock, strlen (sock));
+ req.filter = entry->notify;
+
+ /* invoke RPC */
+ return changelog_rpc_sumbit_req (RPC_PROBER (entry), (void *) &req,
+ frame, &gf_changelog_clnt,
+ CHANGELOG_RPC_PROBE_FILTER, NULL, 0,
+ NULL, this, gf_probe_changelog_cbk,
+ (xdrproc_t) xdr_changelog_probe_req);
+}
+
+int
+gf_changelog_invoke_rpc (xlator_t *this, gf_changelog_t *entry, int procidx)
+{
+ return changelog_invoke_rpc (this, RPC_PROBER (entry),
+ &gf_changelog_clnt, procidx, entry);
+}
+
+struct rpc_clnt_procedure gf_changelog_procs[CHANGELOG_RPC_PROC_MAX] = {
+ [CHANGELOG_RPC_PROC_NULL] = {"NULL", NULL},
+ [CHANGELOG_RPC_PROBE_FILTER] = {
+ "PROBE FILTER", gf_probe_changelog_filter
+ },
+};
+
+struct rpc_clnt_program gf_changelog_clnt = {
+ .progname = "LIBGFCHANGELOG",
+ .prognum = CHANGELOG_RPC_PROGNUM,
+ .progver = CHANGELOG_RPC_PROGVER,
+ .numproc = CHANGELOG_RPC_PROC_MAX,
+ .proctable = gf_changelog_procs,
+};
diff --git a/xlators/features/changelog/src/changelog-notifier.h b/xlators/features/changelog/lib/src/gf-changelog-rpc.h
index 55e728356e6..1c982eef809 100644
--- a/xlators/features/changelog/src/changelog-notifier.h
+++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.h
@@ -8,12 +8,19 @@
cases as published by the Free Software Foundation.
*/
-#ifndef _CHANGELOG_NOTIFIER_H
-#define _CHANGELOG_NOTIFIER_H
+#ifndef __GF_CHANGELOG_RPC_H
+#define __GF_CHANGELOG_RPC_H
-#include "changelog-helpers.h"
+#include "xlator.h"
-void *
-changelog_notifier (void *data);
+#include "gf-changelog-helpers.h"
+#include "changelog-rpc-common.h"
+
+struct rpc_clnt *gf_changelog_rpc_init (xlator_t *, gf_changelog_t *);
+
+int gf_changelog_invoke_rpc (xlator_t *, gf_changelog_t *, int);
+
+rpcsvc_t *
+gf_changelog_reborp_init_rpc_listner (xlator_t *, char *, char *, void *);
#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c
index fb2d9037ffb..8f33eb01013 100644
--- a/xlators/features/changelog/lib/src/gf-changelog.c
+++ b/xlators/features/changelog/lib/src/gf-changelog.c
@@ -14,6 +14,8 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
+#include <sys/time.h>
+#include <sys/resource.h>
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
@@ -24,589 +26,523 @@
#include "glusterfs.h"
#include "logging.h"
#include "defaults.h"
+#include "syncop.h"
+#include "gf-changelog-rpc.h"
#include "gf-changelog-helpers.h"
/* from the changelog translator */
#include "changelog-misc.h"
#include "changelog-mem-types.h"
-int byebye = 0;
+/**
+ * Global singleton xlator pointer for the library, initialized
+ * during library load. This should probably be hidden inside
+ * an initialized object which is an handle for the consumer.
+ *
+ * TODO: do away with the global..
+ */
+xlator_t *master = NULL;
-static void
-gf_changelog_cleanup (gf_changelog_t *gfc)
+static inline
+gf_private_t *gf_changelog_alloc_priv ()
{
- /* socket */
- if (gfc->gfc_sockfd != -1)
- close (gfc->gfc_sockfd);
- /* tracker fd */
- if (gfc->gfc_fd != -1)
- close (gfc->gfc_fd);
- /* processing dir */
- if (gfc->gfc_dir)
- closedir (gfc->gfc_dir);
-
- if (gfc->gfc_working_dir)
- free (gfc->gfc_working_dir); /* allocated by realpath */
-}
+ int ret = 0;
+ gf_private_t *priv = NULL;
-void
-__attribute__ ((constructor)) gf_changelog_ctor (void)
-{
- glusterfs_ctx_t *ctx = NULL;
+ priv = calloc (1, sizeof (gf_private_t));
+ if (!priv)
+ goto error_return;
+ INIT_LIST_HEAD (&priv->connections);
- ctx = glusterfs_ctx_new ();
- if (!ctx)
- return;
+ ret = LOCK_INIT (&priv->lock);
+ if (ret != 0)
+ goto free_priv;
+ priv->api = NULL;
- if (glusterfs_globals_init (ctx)) {
- free (ctx);
- ctx = NULL;
- return;
- }
+ return priv;
- THIS->ctx = ctx;
- if (xlator_mem_acct_init (THIS, gf_changelog_mt_end))
- return;
+ free_priv:
+ free (priv);
+ error_return:
+ return NULL;
}
-void
-__attribute__ ((destructor)) gf_changelog_dtor (void)
-{
- xlator_t *this = NULL;
- glusterfs_ctx_t *ctx = NULL;
- gf_changelog_t *gfc = NULL;
+#define GF_CHANGELOG_EVENT_POOL_SIZE 16384
+#define GF_CHANGELOG_EVENT_THREAD_COUNT 4
- this = THIS;
- if (!this)
- return;
-
- ctx = this->ctx;
- gfc = this->private;
-
- if (gfc) {
- if (gfc->hist_gfc) {
- gf_changelog_cleanup(gfc->hist_gfc);
- GF_FREE (gfc->hist_gfc);
- }
- gf_changelog_cleanup (gfc);
- GF_FREE (gfc);
+static int
+gf_changelog_ctx_defaults_init (glusterfs_ctx_t *ctx)
+{
+ cmd_args_t *cmd_args = NULL;
+ struct rlimit lim = {0, };
+ call_pool_t *pool = NULL;
+ int ret = -1;
+
+ ret = xlator_mem_acct_init (THIS, gf_changelog_mt_end);
+ if (ret != 0) {
+ return ret;
}
- if (ctx) {
- pthread_mutex_destroy (&ctx->lock);
- free (ctx);
- ctx = NULL;
- }
-}
+ ctx->process_uuid = generate_glusterfs_ctx_id ();
+ if (!ctx->process_uuid)
+ return -1;
+ ctx->page_size = 128 * GF_UNIT_KB;
-static int
-gf_changelog_open_dirs (gf_changelog_t *gfc)
-{
- int ret = -1;
- DIR *dir = NULL;
- int tracker_fd = 0;
- char tracker_path[PATH_MAX] = {0,};
- xlator_t *this = NULL;
+ ctx->iobuf_pool = iobuf_pool_new ();
+ if (!ctx->iobuf_pool)
+ return -1;
- this = THIS;
- GF_ASSERT (this);
+ ctx->event_pool = event_pool_new (GF_CHANGELOG_EVENT_POOL_SIZE,
+ GF_CHANGELOG_EVENT_THREAD_COUNT);
+ if (!ctx->event_pool)
+ return -1;
- (void) snprintf (gfc->gfc_current_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_CURRENT_DIR"/",
- gfc->gfc_working_dir);
+ pool = GF_CALLOC (1, sizeof (call_pool_t),
+ gf_changelog_mt_libgfchangelog_call_pool_t);
+ if (!pool)
+ return -1;
- ret = recursive_rmdir (gfc->gfc_current_dir);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "Failed to rmdir: %s, err: %s",
- gfc->gfc_current_dir, strerror (errno));
- goto out;
- }
+ /* frame_mem_pool size 112 * 64 */
+ pool->frame_mem_pool = mem_pool_new (call_frame_t, 32);
+ if (!pool->frame_mem_pool)
+ return -1;
- ret = mkdir_p (gfc->gfc_current_dir, 0600, _gf_false);
- if (ret)
- goto out;
+ /* stack_mem_pool size 256 * 128 */
+ pool->stack_mem_pool = mem_pool_new (call_stack_t, 16);
- (void) snprintf (gfc->gfc_processed_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_PROCESSED_DIR"/",
- gfc->gfc_working_dir);
+ if (!pool->stack_mem_pool)
+ return -1;
- ret = mkdir_p (gfc->gfc_processed_dir, 0600, _gf_false);
- if (ret)
- goto out;
+ ctx->stub_mem_pool = mem_pool_new (call_stub_t, 16);
+ if (!ctx->stub_mem_pool)
+ return -1;
- (void) snprintf (gfc->gfc_processing_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_PROCESSING_DIR"/",
- gfc->gfc_working_dir);
+ ctx->dict_pool = mem_pool_new (dict_t, 32);
+ if (!ctx->dict_pool)
+ return -1;
- ret = recursive_rmdir (gfc->gfc_processing_dir);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "Failed to rmdir: %s, err: %s",
- gfc->gfc_processing_dir, strerror (errno));
- goto out;
- }
+ ctx->dict_pair_pool = mem_pool_new (data_pair_t, 512);
+ if (!ctx->dict_pair_pool)
+ return -1;
- ret = mkdir_p (gfc->gfc_processing_dir, 0600, _gf_false);
- if (ret)
- goto out;
+ ctx->dict_data_pool = mem_pool_new (data_t, 512);
+ if (!ctx->dict_data_pool)
+ return -1;
- dir = opendir (gfc->gfc_processing_dir);
- if (!dir) {
- gf_log ("", GF_LOG_ERROR,
- "opendir() error [reason: %s]", strerror (errno));
- goto out;
- }
+ INIT_LIST_HEAD (&pool->all_frames);
+ LOCK_INIT (&pool->lock);
+ ctx->pool = pool;
- gfc->gfc_dir = dir;
+ pthread_mutex_init (&(ctx->lock), NULL);
- (void) snprintf (tracker_path, PATH_MAX,
- "%s/"GF_CHANGELOG_TRACKER, gfc->gfc_working_dir);
+ cmd_args = &ctx->cmd_args;
- tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
- if (tracker_fd < 0) {
- closedir (gfc->gfc_dir);
- ret = -1;
- goto out;
- }
+ INIT_LIST_HEAD (&cmd_args->xlator_options);
+
+ lim.rlim_cur = RLIM_INFINITY;
+ lim.rlim_max = RLIM_INFINITY;
+ setrlimit (RLIMIT_CORE, &lim);
- gfc->gfc_fd = tracker_fd;
- ret = 0;
- out:
- return ret;
+ return 0;
}
-int
-gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc)
+/* TODO: cleanup ctx defaults */
+static void
+gf_changelog_cleanup_this (xlator_t *this)
{
- int ret = 0;
- int len = 0;
- int tries = 0;
- int sockfd = 0;
- struct sockaddr_un remote;
-
- this = gfc->this;
+ glusterfs_ctx_t *ctx = NULL;
- if (gfc->gfc_sockfd != -1) {
- gf_log (this->name, GF_LOG_INFO,
- "Reconnecting...");
- close (gfc->gfc_sockfd);
- }
+ if (!this)
+ return;
- sockfd = socket (AF_UNIX, SOCK_STREAM, 0);
- if (sockfd < 0) {
- ret = -1;
- goto out;
- }
+ ctx = this->ctx;
+ syncenv_destroy (ctx->env);
+ free (ctx);
- CHANGELOG_MAKE_SOCKET_PATH (gfc->gfc_brickpath,
- gfc->gfc_sockpath, UNIX_PATH_MAX);
- gf_log (this->name, GF_LOG_INFO,
- "connecting to changelog socket: %s (brick: %s)",
- gfc->gfc_sockpath, gfc->gfc_brickpath);
+ if (this->private)
+ free (this->private);
- remote.sun_family = AF_UNIX;
- strcpy (remote.sun_path, gfc->gfc_sockpath);
+ this->private = NULL;
+ this->ctx = NULL;
+}
- len = strlen (remote.sun_path) + sizeof (remote.sun_family);
+static int
+gf_changelog_init_this ()
+{
+ glusterfs_ctx_t *ctx = NULL;
- while (tries < gfc->gfc_connretries) {
- gf_log (this->name, GF_LOG_WARNING,
- "connection attempt %d/%d...",
- tries + 1, gfc->gfc_connretries);
+ ctx = glusterfs_ctx_new ();
+ if (!ctx)
+ goto error_return;
- /* initiate a connect */
- if (connect (sockfd, (struct sockaddr *) &remote, len) == 0) {
- gfc->gfc_sockfd = sockfd;
- break;
- }
+ if (glusterfs_globals_init (ctx))
+ goto free_ctx;
- tries++;
- sleep (2);
- }
+ THIS->ctx = ctx;
+ if (gf_changelog_ctx_defaults_init (ctx))
+ goto free_ctx;
- if (tries == gfc->gfc_connretries) {
- gf_log (this->name, GF_LOG_ERROR,
- "could not connect to changelog socket!"
- " bailing out...");
- close (sockfd);
- ret = -1;
- } else
- gf_log (this->name, GF_LOG_INFO,
- "connection successful");
+ ctx->env = syncenv_new (0, 0, 0);
+ if (!ctx->env)
+ goto free_ctx;
+ return 0;
- out:
- return ret;
+ free_ctx:
+ free (ctx);
+ THIS->ctx = NULL;
+ error_return:
+ return -1;
}
-int
-gf_changelog_done (char *file)
+static int
+gf_changelog_init_master ()
{
- int ret = -1;
- char *buffer = NULL;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- char to_path[PATH_MAX] = {0,};
-
- errno = EINVAL;
-
- this = THIS;
- if (!this)
- goto out;
-
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ int ret = 0;
+ gf_private_t *priv = NULL;
+ glusterfs_ctx_t *ctx = NULL;
- if (!file || !strlen (file))
- goto out;
+ ret = gf_changelog_init_this ();
+ if (ret != 0)
+ goto error_return;
+ master = THIS;
+
+ priv = gf_changelog_alloc_priv ();
+ if (!priv)
+ goto cleanup_master;
+ master->private = priv;
+
+ /* poller thread */
+ ret = pthread_create (&priv->poller,
+ NULL, changelog_rpc_poller, master);
+ if (ret != 0) {
+ gf_log (master->name, GF_LOG_ERROR,
+ "failed to spawn poller thread");
+ goto cleanup_master;
+ }
- /* make sure 'file' is inside ->gfc_working_dir */
- buffer = realpath (file, NULL);
- if (!buffer)
- goto out;
+ return 0;
- if (strncmp (gfc->gfc_working_dir,
- buffer, strlen (gfc->gfc_working_dir)))
- goto out;
+ cleanup_master:
+ master->private = NULL;
+ gf_changelog_cleanup_this (master);
+ error_return:
+ return -1;
+}
- (void) snprintf (to_path, PATH_MAX, "%s%s",
- gfc->gfc_processed_dir, basename (buffer));
- gf_log (this->name, GF_LOG_DEBUG,
- "moving %s to processed directory", file);
- ret = rename (buffer, to_path);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot move %s to %s (reason: %s)",
- file, to_path, strerror (errno));
- goto out;
- }
+/* ctor/dtor */
- ret = 0;
+void
+__attribute__ ((constructor)) gf_changelog_ctor (void)
+{
+ (void) gf_changelog_init_master ();
+}
- out:
- if (buffer)
- free (buffer); /* allocated by realpath() */
- return ret;
+void
+__attribute__ ((destructor)) gf_changelog_dtor (void)
+{
+ gf_changelog_cleanup_this (master);
}
-/**
- * @API
- * for a set of changelogs, start from the beginning
- */
+/* TODO: cleanup clnt/svc on failure */
int
-gf_changelog_start_fresh ()
+gf_changelog_setup_rpc (xlator_t *this,
+ gf_changelog_t *entry, int proc)
{
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
-
- this = THIS;
- if (!this)
- goto out;
+ int ret = 0;
+ rpcsvc_t *svc = NULL;
+ struct rpc_clnt *rpc = NULL;
- errno = EINVAL;
+ /**
+ * Initialize a connect back socket. A probe() RPC call to the server
+ * triggers a reverse connect.
+ */
+ svc = gf_changelog_reborp_init_rpc_listner (this, entry->brick,
+ RPC_SOCK (entry), entry);
+ if (!svc)
+ goto error_return;
+ RPC_REBORP (entry) = svc;
+
+ /* Initialize an RPC client */
+ rpc = gf_changelog_rpc_init (this, entry);
+ if (!rpc)
+ goto error_return;
+ RPC_PROBER (entry) = rpc;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ /**
+ * @FIXME
+ * till we have connection state machine, let's delay the RPC call
+ * for now..
+ */
+ sleep (2);
- if (gf_ftruncate (gfc->gfc_fd, 0))
- goto out;
+ /**
+ * Probe changelog translator for reverse connection. After a successful
+ * call, there's less use of the client and can be disconnected, but
+ * let's leave the connection active for any future RPC calls.
+ */
+ ret = gf_changelog_invoke_rpc (this, entry, proc);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Could not initiate probe RPC, bailing out!!!");
+ goto error_return;
+ }
return 0;
- out:
+ error_return:
return -1;
}
-/**
- * @API
- * return the next changelog file entry. zero means all chanelogs
- * consumed.
- */
-ssize_t
-gf_changelog_next_change (char *bufptr, size_t maxlen)
+static void
+gf_cleanup_event (gf_changelog_t *entry)
{
- ssize_t size = -1;
- int tracker_fd = 0;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- char buffer[PATH_MAX] = {0,};
-
- if (maxlen > PATH_MAX) {
- errno = ENAMETOOLONG;
- goto out;
- }
+ xlator_t *this = NULL;
+ struct gf_event_list *ev = NULL;
- errno = EINVAL;
+ this = entry->this;
+ ev = &entry->event;
- this = THIS;
- if (!this)
- goto out;
+ (void) gf_thread_cleanup (this, ev->invoker);
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ (void) pthread_mutex_destroy (&ev->lock);
+ (void) pthread_cond_destroy (&ev->cond);
- tracker_fd = gfc->gfc_fd;
+ ev->entry = NULL;
+}
- size = gf_readline (tracker_fd, buffer, maxlen);
- if (size < 0) {
- size = -1;
- goto out;
+static int
+gf_init_event (gf_changelog_t *entry)
+{
+ int ret = 0;
+ struct gf_event_list *ev = NULL;
+
+ ev = &entry->event;
+ ev->entry = entry;
+
+ ret = pthread_mutex_init (&ev->lock, NULL);
+ if (ret != 0)
+ goto error_return;
+ ret = pthread_cond_init (&ev->cond, NULL);
+ if (ret != 0)
+ goto cleanup_mutex;
+ INIT_LIST_HEAD (&ev->events);
+
+ ev->next_seq = 0; /* bootstrap sequencing */
+
+ if (entry->ordered) {
+ ret = pthread_create (&ev->invoker, NULL,
+ gf_changelog_callback_invoker, ev);
+ if (ret != 0)
+ goto cleanup_cond;
}
- if (size == 0)
- goto out;
-
- memcpy (bufptr, buffer, size - 1);
- bufptr[size - 1] = '\0';
+ return 0;
-out:
- return size;
+ cleanup_cond:
+ (void) pthread_cond_destroy (&ev->cond);
+ cleanup_mutex:
+ (void) pthread_mutex_destroy (&ev->lock);
+ error_return:
+ return -1;
}
/**
- * @API
- * gf_changelog_scan() - scan and generate a list of change entries
- *
- * calling this api multiple times (without calling gf_changlog_done())
- * would result new changelogs(s) being refreshed in the tracker file.
- * This call also acts as a cancellation point for the consumer.
+ * TODO:
+ * - cleanup invoker thread (if ordered mode)
+ * - cleanup event list
+ * - destroy rpc{-clnt, svc}
*/
-ssize_t
-gf_changelog_scan ()
+int
+gf_cleanup_brick_connection (xlator_t *this, gf_changelog_t *entry)
{
- int ret = 0;
- int tracker_fd = 0;
- size_t len = 0;
- size_t off = 0;
- xlator_t *this = NULL;
- size_t nr_entries = 0;
- gf_changelog_t *gfc = NULL;
- struct dirent *entryp = NULL;
- struct dirent *result = NULL;
- char buffer[PATH_MAX] = {0,};
-
- this = THIS;
- if (!this)
- goto out;
+ return 0;
+}
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+int
+gf_cleanup_connections (xlator_t *this)
+{
+ return 0;
+}
- /**
- * do we need to protect 'byebye' with locks? worst, the
- * consumer would get notified during next scan().
- */
- if (byebye) {
- errno = ECONNREFUSED;
- goto out;
- }
+static int
+gf_setup_brick_connection (xlator_t *this,
+ struct gf_brick_spec *brick,
+ gf_boolean_t ordered, void *xl)
+{
+ int ret = 0;
+ gf_private_t *priv = NULL;
+ gf_changelog_t *entry = NULL;
- errno = EINVAL;
+ priv = this->private;
- tracker_fd = gfc->gfc_fd;
+ if (!brick->callback || !brick->init || !brick->fini)
+ goto error_return;
- if (gf_ftruncate (tracker_fd, 0))
- goto out;
+ entry = GF_CALLOC (1, sizeof (*entry),
+ gf_changelog_mt_libgfchangelog_t);
+ if (!entry)
+ goto error_return;
+ INIT_LIST_HEAD (&entry->list);
- len = offsetof(struct dirent, d_name)
- + pathconf(gfc->gfc_processing_dir, _PC_NAME_MAX) + 1;
- entryp = GF_CALLOC (1, len,
- gf_changelog_mt_libgfchangelog_dirent_t);
- if (!entryp)
- goto out;
+ entry->notify = brick->filter;
+ (void) strncpy (entry->brick, brick->brick_path, PATH_MAX);
- rewinddir (gfc->gfc_dir);
- while (1) {
- ret = readdir_r (gfc->gfc_dir, entryp, &result);
- if (ret || !result)
- break;
+ entry->this = this;
+ entry->invokerxl = xl;
- if ( !strcmp (basename (entryp->d_name), ".")
- || !strcmp (basename (entryp->d_name), "..") )
- continue;
+ entry->ordered = ordered;
+ if (ordered) {
+ ret = gf_init_event (entry);
+ if (ret)
+ goto free_entry;
+ }
- nr_entries++;
+ entry->fini = brick->fini;
+ entry->callback = brick->callback;
+ entry->connected = brick->connected;
+ entry->disconnected = brick->disconnected;
- GF_CHANGELOG_FILL_BUFFER (gfc->gfc_processing_dir,
- buffer, off,
- strlen (gfc->gfc_processing_dir));
- GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer,
- off, strlen (entryp->d_name));
- GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1);
+ entry->ptr = brick->init (this, brick);
+ if (!entry->ptr)
+ goto cleanup_event;
+ priv->api = entry->ptr; /* pointer to API, if required */
- if (gf_changelog_write (tracker_fd, buffer, off) != off) {
- gf_log (this->name, GF_LOG_ERROR,
- "error writing changelog filename"
- " to tracker file");
- break;
- }
- off = 0;
+ LOCK (&priv->lock);
+ {
+ list_add_tail (&entry->list, &priv->connections);
}
+ UNLOCK (&priv->lock);
- GF_FREE (entryp);
+ ret = gf_changelog_setup_rpc (this, entry, CHANGELOG_RPC_PROBE_FILTER);
+ if (ret)
+ goto cleanup_event;
+ return 0;
- if (!result) {
- if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1)
- return nr_entries;
- }
- out:
+ cleanup_event:
+ if (ordered)
+ gf_cleanup_event (entry);
+ free_entry:
+ list_del (&entry->list); /* FIXME: kludge for now */
+ GF_FREE (entry);
+ error_return:
return -1;
}
-/**
- * @API
- * gf_changelog_register() - register a client for updates.
- */
int
-gf_changelog_register (char *brick_path, char *scratch_dir,
- char *log_file, int log_level, int max_reconnects)
+gf_changelog_register_brick (xlator_t *this,
+ struct gf_brick_spec *brick,
+ gf_boolean_t ordered, void *xl)
{
- int i = 0;
- int ret = -1;
- int errn = 0;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- char hist_scratch_dir[PATH_MAX] = {0,};
- struct stat buf = {0,};
-
- this = THIS;
- if (!this->ctx)
- goto out;
-
- errno = ENOMEM;
-
- gfc = GF_CALLOC (1, sizeof (*gfc),
- gf_changelog_mt_libgfchangelog_t);
- if (!gfc)
- goto out;
-
- gfc->this = this;
-
- gfc->gfc_dir = NULL;
- gfc->gfc_fd = gfc->gfc_sockfd = -1;
-
- if (stat (scratch_dir, &buf) && errno == ENOENT) {
- ret = mkdir_p (scratch_dir, 0600, _gf_true);
- if (ret) {
- errn = errno;
- goto cleanup;
- }
- }
-
- gfc->gfc_working_dir = realpath (scratch_dir, NULL);
- if (!gfc->gfc_working_dir) {
- errn = errno;
- goto cleanup;
- }
+ return gf_setup_brick_connection (this, brick, ordered, xl);
+}
- /* Begin: Changes for History API */
- gfc->hist_gfc = NULL;
+static int
+gf_changelog_setup_logging (xlator_t *this, char *logfile, int loglevel)
+{
+ /* passing ident as NULL means to use default ident for syslog */
+ if (gf_log_init (this->ctx, logfile, NULL))
+ return -1;
- gfc->hist_gfc = GF_CALLOC (1, sizeof (*gfc),
- gf_changelog_mt_libgfchangelog_t);
- if (!gfc->hist_gfc)
- goto cleanup;
+ gf_log_set_loglevel ((loglevel == -1) ? GF_LOG_INFO :
+ loglevel);
+ return 0;
+}
- gfc->hist_gfc->gfc_dir = NULL;
- gfc->hist_gfc->gfc_fd = gfc->hist_gfc->gfc_sockfd = -1;
- gfc->hist_gfc->this = NULL;
+int
+gf_changelog_register_generic (struct gf_brick_spec *bricks, int count,
+ int ordered, char *logfile, int lvl, void *xl)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ xlator_t *old_this = NULL;
+ struct gf_brick_spec *brick = NULL;
+ gf_boolean_t need_order = _gf_false;
- (void) strncpy (hist_scratch_dir, scratch_dir, PATH_MAX);
- (void) snprintf (hist_scratch_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_HISTORY_DIR"/",
- gfc->gfc_working_dir);
+ SAVE_THIS (xl);
- ret = mkdir_p (hist_scratch_dir, 0600, _gf_false);
- if (ret) {
- errn = errno;
- goto cleanup;
- }
+ this = THIS;
+ if (!this)
+ goto error_return;
- gfc->hist_gfc->gfc_working_dir = realpath (hist_scratch_dir, NULL);
- if (!gfc->hist_gfc->gfc_working_dir) {
- errn = errno;
- goto cleanup;
- }
+ ret = gf_changelog_setup_logging (this, logfile, lvl);
+ if (ret)
+ goto error_return;
- ret = gf_changelog_open_dirs (gfc->hist_gfc);
- if (ret) {
- errn = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "could not create entries in history scratch dir");
- goto cleanup;
- }
+ need_order = (ordered) ? _gf_true : _gf_false;
- (void) strncpy (gfc->hist_gfc->gfc_brickpath, brick_path, PATH_MAX);
+ brick = bricks;
+ while (count--) {
+ gf_log (this->name, GF_LOG_INFO,
+ "Registering brick: %s [notify filter: %d]",
+ brick->brick_path, brick->filter);
- for (i=0; i < 256; i++) {
- gfc->hist_gfc->rfc3986[i] =
- (isalnum(i) || i == '~' ||
- i == '-' || i == '.' || i == '_') ? i : 0;
- }
- /* End: Changes for History API*/
+ ret = gf_changelog_register_brick (this, brick, need_order, xl);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Error registering with changelog xlator");
+ break;
+ }
- ret = gf_changelog_open_dirs (gfc);
- if (ret) {
- errn = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "could not create entries in scratch dir");
- goto cleanup;
+ brick++;
}
- /* passing ident as NULL means to use default ident for syslog */
- if (gf_log_init (this->ctx, log_file, NULL))
- goto cleanup;
-
- gf_log_set_loglevel ((log_level == -1) ? GF_LOG_INFO :
- log_level);
+ if (ret != 0)
+ goto cleanup_inited_bricks;
- gfc->gfc_connretries = (max_reconnects <= 0) ? 1 : max_reconnects;
- (void) strncpy (gfc->gfc_brickpath, brick_path, PATH_MAX);
+ RESTORE_THIS();
+ return 0;
- ret = gf_changelog_notification_init (this, gfc);
- if (ret) {
- errn = errno;
- goto cleanup;
- }
+ cleanup_inited_bricks:
+ gf_cleanup_connections (this);
+ error_return:
+ RESTORE_THIS();
+ return -1;
+}
- ret = gf_thread_create (&gfc->gfc_changelog_processor,
- NULL, gf_changelog_process, gfc);
- if (ret) {
- errn = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "error creating changelog processor thread"
- " new changes won't be recorded!!!");
- goto cleanup;
- }
+/**
+ * @API
+ * gf_changelog_register()
+ *
+ * This is _NOT_ a generic register API. It's a special API to handle
+ * updates at a journal granulality. This is used by consumers wanting
+ * to process persistent journal such as geo-replication via a set of
+ * APIs. All of this is required to maintain backward compatibility.
+ * Owner specific private data is stored in ->api (in gf_private_t),
+ * which is used by APIs to access it's private data. This limits
+ * the library access to a single brick, but that's how it used to
+ * be anyway. Furthermore, this API solely _owns_ "this", therefore
+ * callers already having a notion of "this" are expected to use the
+ * newer API.
+ *
+ * Newer applications wanting to use this library need not face this
+ * limitation and reply of the much more feature rich generic register
+ * API, which is purely callback based.
+ *
+ * NOTE: @max_reconnects is not used but required for backward compat.
+ *
+ * For generic API, refer gf_changelog_register_generic().
+ */
+int
+gf_changelog_register (char *brick_path, char *scratch_dir,
+ char *log_file, int log_level, int max_reconnects)
+{
+ struct gf_brick_spec brick = {0,};
- for (i=0; i < 256; i++) {
- gfc->rfc3986[i] =
- (isalnum(i) || i == '~' ||
- i == '-' || i == '.' || i == '_') ? i : 0;
- }
+ THIS = master;
- ret = 0;
- this->private = gfc;
+ brick.brick_path = brick_path;
+ brick.filter = CHANGELOG_OP_TYPE_JOURNAL;
- goto out;
+ brick.init = gf_changelog_journal_init;
+ brick.fini = gf_changelog_journal_fini;
+ brick.callback = gf_changelog_handle_journal;
+ brick.connected = gf_changelog_journal_connect;
+ brick.disconnected = gf_changelog_journal_disconnect;
- cleanup:
- if (gfc->hist_gfc) {
- gf_changelog_cleanup (gfc->hist_gfc);
- GF_FREE (gfc->hist_gfc);
- }
- gf_changelog_cleanup (gfc);
- GF_FREE (gfc);
- this->private = NULL;
- errno = errn;
+ brick.ptr = scratch_dir;
- out:
- return ret;
+ return gf_changelog_register_generic (&brick, 1, 1,
+ log_file, log_level, NULL);
}
diff --git a/xlators/features/changelog/lib/src/gf-history-changelog.c b/xlators/features/changelog/lib/src/gf-history-changelog.c
index 8a527dd6e4b..12f51da8fa2 100644
--- a/xlators/features/changelog/lib/src/gf-history-changelog.c
+++ b/xlators/features/changelog/lib/src/gf-history-changelog.c
@@ -14,6 +14,7 @@
#include "syscall.h"
#include "gf-changelog-helpers.h"
+#include "gf-changelog-journal.h"
/* from the changelog translator */
#include "changelog-misc.h"
@@ -36,12 +37,12 @@
int
gf_history_changelog_done (char *file)
{
- int ret = -1;
- char *buffer = NULL;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
- char to_path[PATH_MAX] = {0,};
+ int ret = -1;
+ char *buffer = NULL;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
+ char to_path[PATH_MAX] = {0,};
errno = EINVAL;
@@ -49,28 +50,28 @@ gf_history_changelog_done (char *file)
if (!this)
goto out;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
goto out;
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc)
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl)
goto out;
if (!file || !strlen (file))
goto out;
- /* make sure 'file' is inside ->gfc_working_dir */
+ /* make sure 'file' is inside ->jnl_working_dir */
buffer = realpath (file, NULL);
if (!buffer)
goto out;
- if (strncmp (hist_gfc->gfc_working_dir,
- buffer, strlen (hist_gfc->gfc_working_dir)))
+ if (strncmp (hist_jnl->jnl_working_dir,
+ buffer, strlen (hist_jnl->jnl_working_dir)))
goto out;
(void) snprintf (to_path, PATH_MAX, "%s%s",
- hist_gfc->gfc_processed_dir, basename (buffer));
+ hist_jnl->jnl_processed_dir, basename (buffer));
gf_log (this->name, GF_LOG_DEBUG,
"moving %s to processed directory", file);
ret = rename (buffer, to_path);
@@ -102,9 +103,9 @@ gf_history_changelog_done (char *file)
int
gf_history_changelog_start_fresh ()
{
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
this = THIS;
if (!this)
@@ -112,15 +113,15 @@ gf_history_changelog_start_fresh ()
errno = EINVAL;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
goto out;
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc)
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl)
goto out;
- if (gf_ftruncate (hist_gfc->gfc_fd, 0))
+ if (gf_ftruncate (hist_jnl->jnl_fd, 0))
goto out;
return 0;
@@ -147,12 +148,17 @@ gf_history_changelog_start_fresh ()
ssize_t
gf_history_changelog_next_change (char *bufptr, size_t maxlen)
{
- ssize_t size = -1;
- int tracker_fd = 0;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
- char buffer[PATH_MAX] = {0,};
+ ssize_t size = -1;
+ int tracker_fd = 0;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
+ char buffer[PATH_MAX] = {0,};
+
+ if (maxlen > PATH_MAX) {
+ errno = ENAMETOOLONG;
+ goto out;
+ }
errno = EINVAL;
@@ -160,15 +166,15 @@ gf_history_changelog_next_change (char *bufptr, size_t maxlen)
if (!this)
goto out;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
goto out;
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc)
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl)
goto out;
- tracker_fd = hist_gfc->gfc_fd;
+ tracker_fd = hist_jnl->jnl_fd;
size = gf_readline (tracker_fd, buffer, maxlen);
if (size < 0) {
@@ -206,56 +212,60 @@ out:
ssize_t
gf_history_changelog_scan ()
{
- int ret = 0;
- int tracker_fd = 0;
- size_t len = 0;
- size_t off = 0;
- xlator_t *this = NULL;
- size_t nr_entries = 0;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
- struct dirent *entryp = NULL;
- struct dirent *result = NULL;
- char buffer[PATH_MAX] = {0,};
- static int is_last_scan = 0;
+ int ret = 0;
+ int tracker_fd = 0;
+ size_t len = 0;
+ size_t off = 0;
+ xlator_t *this = NULL;
+ size_t nr_entries = 0;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
+ struct dirent *entryp = NULL;
+ struct dirent *result = NULL;
+ char buffer[PATH_MAX] = {0,};
+ static int is_last_scan;
this = THIS;
if (!this)
goto out;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
goto out;
+ if (JNL_IS_API_DISCONNECTED (jnl)) {
+ errno = ENOTCONN;
+ goto out;
+ }
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc)
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl)
goto out;
retry:
if (is_last_scan == 1)
return 0;
- if (hist_gfc->hist_done == 0)
+ if (hist_jnl->hist_done == 0)
is_last_scan = 1;
errno = EINVAL;
- if (hist_gfc->hist_done == -1)
+ if (hist_jnl->hist_done == -1)
goto out;
- tracker_fd = hist_gfc->gfc_fd;
+ tracker_fd = hist_jnl->jnl_fd;
if (gf_ftruncate (tracker_fd, 0))
goto out;
len = offsetof (struct dirent, d_name)
- + pathconf (hist_gfc->gfc_processing_dir, _PC_NAME_MAX) + 1;
+ + pathconf (hist_jnl->jnl_processing_dir, _PC_NAME_MAX) + 1;
entryp = GF_CALLOC (1, len,
gf_changelog_mt_libgfchangelog_dirent_t);
if (!entryp)
goto out;
- rewinddir (hist_gfc->gfc_dir);
+ rewinddir (hist_jnl->jnl_dir);
while (1) {
- ret = readdir_r (hist_gfc->gfc_dir, entryp, &result);
+ ret = readdir_r (hist_jnl->jnl_dir, entryp, &result);
if (ret || !result)
break;
@@ -265,9 +275,9 @@ gf_history_changelog_scan ()
nr_entries++;
- GF_CHANGELOG_FILL_BUFFER (hist_gfc->gfc_processing_dir,
+ GF_CHANGELOG_FILL_BUFFER (hist_jnl->jnl_processing_dir,
buffer, off,
- strlen (hist_gfc->gfc_processing_dir));
+ strlen (hist_jnl->jnl_processing_dir));
GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer,
off, strlen (entryp->d_name));
GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1);
@@ -284,7 +294,8 @@ gf_history_changelog_scan ()
GF_FREE (entryp);
gf_log (this->name, GF_LOG_DEBUG,
- "hist_done %d, is_last_scan: %d", hist_gfc->hist_done, is_last_scan);
+ "hist_done %d, is_last_scan: %d", hist_jnl->hist_done,
+ is_last_scan);
if (!result) {
if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) {
@@ -490,7 +501,7 @@ gf_changelog_consume_wrap (void* data)
/* TODO: handle short reads and EOF. */
ret = gf_changelog_consume (ccd->this,
- ccd->gfc, ccd->changelog, _gf_true);
+ ccd->jnl, ccd->changelog, _gf_true);
if (ret) {
gf_log (this->name, GF_LOG_ERROR,
"could not parse changelog: %s", ccd->changelog);
@@ -515,8 +526,8 @@ void *
gf_history_consume (void * data)
{
xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
int ret = 0;
int iter = 0;
int fd = -1;
@@ -549,14 +560,14 @@ gf_history_consume (void * data)
goto out;
}
- gfc = (gf_changelog_t *) this->private;
- if (!gfc) {
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl) {
ret = -1;
goto out;
}
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc) {
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl) {
ret = -1;
goto out;
}
@@ -568,7 +579,7 @@ gf_history_consume (void * data)
curr = &ccd[iter];
curr->this = this;
- curr->gfc = hist_gfc;
+ curr->jnl = hist_jnl;
curr->fd = fd;
curr->offset = from * (len + 1);
@@ -613,7 +624,7 @@ gf_history_consume (void * data)
}
ret = gf_changelog_publish (curr->this,
- curr->gfc, curr->changelog);
+ curr->jnl, curr->changelog);
if (ret) {
publish = _gf_false;
gf_log (this->name, GF_LOG_ERROR,
@@ -623,7 +634,7 @@ gf_history_consume (void * data)
}
/* informing "parsing done". */
- hist_gfc->hist_done = (publish == _gf_true) ? 0 : -1;
+ hist_jnl->hist_done = (publish == _gf_true) ? 0 : -1;
out:
if (fd != -1)
@@ -740,8 +751,8 @@ gf_history_changelog (char* changelog_dir, unsigned long start,
unsigned long from = 0;
unsigned long total_changelog = 0;
xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
gf_changelog_history_data_t *hist_data = NULL;
DIR *dirp = NULL;
struct dirent *dp = NULL;
@@ -762,14 +773,14 @@ gf_history_changelog (char* changelog_dir, unsigned long start,
goto out;
}
- gfc = (gf_changelog_t *) this->private;
- if (!gfc) {
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl) {
ret = -1;
goto out;
}
- hist_gfc = (gf_changelog_t *) gfc->hist_gfc;
- if (!hist_gfc) {
+ hist_jnl = (gf_changelog_journal_t *) jnl->hist_jnl;
+ if (!hist_jnl) {
ret = -1;
goto out;
}
@@ -917,7 +928,7 @@ out:
return ret;
}
- hist_gfc->hist_done = 1;
+ hist_jnl->hist_done = 1;
*actual_end = ts2;
return ret;
diff --git a/xlators/features/changelog/src/Makefile.am b/xlators/features/changelog/src/Makefile.am
index 18c41e7d7d1..8712b9d059f 100644
--- a/xlators/features/changelog/src/Makefile.am
+++ b/xlators/features/changelog/src/Makefile.am
@@ -3,16 +3,24 @@ xlator_LTLIBRARIES = changelog.la
xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features
noinst_HEADERS = changelog-helpers.h changelog-mem-types.h changelog-rt.h \
- changelog-misc.h changelog-encoders.h changelog-notifier.h
+ changelog-rpc-common.h changelog-misc.h changelog-encoders.h \
+ changelog-rpc-common.h changelog-rpc.h changelog-ev-handle.h
changelog_la_LDFLAGS = -module -avoid-version
changelog_la_SOURCES = changelog.c changelog-rt.c changelog-helpers.c \
- changelog-encoders.c changelog-notifier.c changelog-barrier.c
-changelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+ changelog-encoders.c changelog-rpc.c changelog-barrier.c \
+ changelog-rpc-common.c changelog-ev-handle.c
+changelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \
+ $(top_builddir)/rpc/xdr/src/libgfxdr.la \
+ $(top_builddir)/rpc/rpc-lib/src/libgfrpc.la
-AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src -fPIC -D_FILE_OFFSET_BITS=64 \
- -D_GNU_SOURCE -D$(GF_HOST_OS) -DDATADIR=\"$(localstatedir)\"
+AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \
+ -I$(top_srcdir)/rpc/xdr/src -I$(top_srcdir)/rpc/rpc-lib/src \
+ -I$(top_srcdir)/rpc/rpc-transport/socket/src \
+ -I$(top_srcdir)/xlators/features/changelog/lib/src/ \
+ -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -D$(GF_HOST_OS) \
+ -DDATADIR=\"$(localstatedir)\"
AM_CFLAGS = -Wall $(GF_CFLAGS)
diff --git a/xlators/features/changelog/src/changelog-encoders.c b/xlators/features/changelog/src/changelog-encoders.c
index 08626ee2f22..ea9db4061ca 100644
--- a/xlators/features/changelog/src/changelog-encoders.c
+++ b/xlators/features/changelog/src/changelog-encoders.c
@@ -191,7 +191,7 @@ cb_encoder[] = {
};
void
-changelog_encode_change( changelog_priv_t * priv)
+changelog_encode_change(changelog_priv_t *priv)
{
priv->ce = &cb_encoder[priv->encode_mode];
}
diff --git a/xlators/features/changelog/src/changelog-ev-handle.c b/xlators/features/changelog/src/changelog-ev-handle.c
new file mode 100644
index 00000000000..ca7443cfd22
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-ev-handle.c
@@ -0,0 +1,382 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-ev-handle.h"
+#include "changelog-rpc-common.h"
+#include "changelog-helpers.h"
+
+struct rpc_clnt_program changelog_ev_program;
+
+#define NR_IOVEC (MAX_IOVEC - 3)
+struct ev_rpc_vec {
+ int count;
+ struct iovec vector[NR_IOVEC];
+
+ /* sequence number */
+ unsigned long seq;
+};
+
+struct ev_rpc {
+ rbuf_list_t *rlist;
+ struct rpc_clnt *rpc;
+ struct ev_rpc_vec vec;
+};
+
+/**
+ * As of now this just does the minimal (retval logging). Going further
+ * un-acknowledges sequence numbers can be retransmitted and other
+ * intelligence can be built into the server.
+ */
+int
+changelog_event_dispatch_cbk (struct rpc_req *req,
+ struct iovec *iov, int count, void *myframe)
+{
+ return 0;
+}
+
+/* dispatcher RPC */
+inline int
+changelog_dispatch_vec (call_frame_t *frame, xlator_t *this,
+ struct rpc_clnt *rpc, struct ev_rpc_vec *vec)
+{
+ struct timeval tv = {0,};
+ changelog_event_req req = {0,};
+
+ (void) gettimeofday (&tv, NULL);
+
+ /**
+ * Event dispatch RPC header contains a sequence number for each
+ * dispatch. This allows the reciever to order the request before
+ * processing.
+ */
+ req.seq = vec->seq;
+ req.tv_sec = tv.tv_sec;
+ req.tv_usec = tv.tv_usec;
+
+ return changelog_rpc_sumbit_req (rpc, (void *)&req,
+ frame, &changelog_ev_program,
+ CHANGELOG_REV_PROC_EVENT,
+ vec->vector, vec->count, NULL,
+ this, changelog_event_dispatch_cbk,
+ (xdrproc_t) xdr_changelog_event_req);
+ }
+
+ int
+ changelog_event_dispatch_rpc (call_frame_t *frame, xlator_t *this, void *data)
+ {
+ int idx = 0;
+ int count = 0;
+ int ret = 0;
+ unsigned long range = 0;
+ unsigned long sequence = 0;
+ rbuf_iovec_t *rvec = NULL;
+ struct ev_rpc *erpc = NULL;
+ struct rlist_iter riter = {{0,},};
+
+ /* dispatch NR_IOVEC IO vectors at a time. */
+
+ erpc = data;
+ RLIST_GET_SEQ (erpc->rlist, sequence, range);
+
+ rlist_iter_init (&riter, erpc->rlist);
+
+ rvec_for_each_entry (rvec, &riter) {
+ idx = count % NR_IOVEC;
+ if (++count == NR_IOVEC) {
+ erpc->vec.vector[idx] = rvec->iov;
+ erpc->vec.seq = sequence++;
+ erpc->vec.count = NR_IOVEC;
+
+ ret = changelog_dispatch_vec (frame, this,
+ erpc->rpc, &erpc->vec);
+ if (ret)
+ break;
+ count = 0;
+ continue;
+ }
+
+ erpc->vec.vector[idx] = rvec->iov;
+ }
+
+ if (ret)
+ goto error_return;
+
+ idx = count % NR_IOVEC;
+ if (idx) {
+ erpc->vec.seq = sequence;
+ erpc->vec.count = idx;
+
+ ret = changelog_dispatch_vec (frame, this,
+ erpc->rpc, &erpc->vec);
+ }
+
+ error_return:
+ return ret;
+}
+
+int
+changelog_rpc_notify (struct rpc_clnt *rpc,
+ void *mydata, rpc_clnt_event_t event, void *data)
+{
+ xlator_t *this = NULL;
+ changelog_rpc_clnt_t *crpc = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_ev_selector_t *selection = NULL;
+
+ crpc = mydata;
+ this = crpc->this;
+ c_clnt = crpc->c_clnt;
+
+ priv = this->private;
+
+ switch (event) {
+ case RPC_CLNT_CONNECT:
+ rpc_clnt_set_connected (&rpc->conn);
+ selection = &priv->ev_selection;
+
+ LOCK (&c_clnt->wait_lock);
+ {
+ LOCK (&c_clnt->active_lock);
+ {
+ changelog_select_event (this, selection,
+ crpc->filter);
+ list_move_tail (&crpc->list, &c_clnt->active);
+ }
+ UNLOCK (&c_clnt->active_lock);
+ }
+ UNLOCK (&c_clnt->wait_lock);
+
+ break;
+ case RPC_CLNT_DISCONNECT:
+ rpc_clnt_disable (crpc->rpc);
+ selection = &priv->ev_selection;
+
+ LOCK (&crpc->lock);
+ {
+ changelog_deselect_event (this, selection,
+ crpc->filter);
+ changelog_set_disconnect_flag (crpc, _gf_true);
+ }
+ UNLOCK (&crpc->lock);
+
+ break;
+ case RPC_CLNT_MSG:
+ case RPC_CLNT_DESTROY:
+ break;
+ }
+
+ return 0;
+}
+
+void *
+changelog_ev_connector (void *data)
+{
+ xlator_t *this = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ changelog_rpc_clnt_t *crpc = NULL;
+
+ c_clnt = data;
+ this = c_clnt->this;
+
+ while (1) {
+ pthread_mutex_lock (&c_clnt->pending_lock);
+ {
+ while (list_empty (&c_clnt->pending))
+ pthread_cond_wait (&c_clnt->pending_cond,
+ &c_clnt->pending_lock);
+ crpc = list_first_entry (&c_clnt->pending,
+ changelog_rpc_clnt_t, list);
+ crpc->rpc =
+ changelog_rpc_client_init (this, crpc,
+ crpc->sock,
+ changelog_rpc_notify);
+ if (!crpc->rpc) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to "
+ "connect back.. <%s>", crpc->sock);
+ crpc->cleanup (crpc);
+ goto mutex_unlock;
+ }
+
+ LOCK (&c_clnt->wait_lock);
+ {
+ list_move_tail (&crpc->list, &c_clnt->waitq);
+ }
+ UNLOCK (&c_clnt->wait_lock);
+ }
+ mutex_unlock:
+ pthread_mutex_unlock (&c_clnt->pending_lock);
+ }
+
+ return NULL;
+}
+
+void
+changelog_ev_cleanup_connections (xlator_t *this, changelog_clnt_t *c_clnt)
+{
+ int ret = 0;
+ changelog_rpc_clnt_t *crpc = NULL;
+
+ /* cleanup active connections */
+ LOCK (&c_clnt->active_lock);
+ {
+ list_for_each_entry (crpc, &c_clnt->active, list) {
+ rpc_clnt_disable (crpc->rpc);
+ }
+ }
+ UNLOCK (&c_clnt->active_lock);
+}
+
+/**
+ * TODO: granularize lock
+ *
+ * If we have multiple threads dispatching events, doing it this way is
+ * a performance bottleneck.
+ */
+
+static inline changelog_rpc_clnt_t *
+get_client (changelog_clnt_t *c_clnt, struct list_head **next)
+{
+ changelog_rpc_clnt_t *crpc = NULL;
+
+ LOCK (&c_clnt->active_lock);
+ {
+ if (*next == &c_clnt->active)
+ goto unblock;
+ crpc = list_entry (*next, changelog_rpc_clnt_t, list);
+ changelog_rpc_clnt_ref (crpc);
+ *next = (*next)->next;
+ }
+ unblock:
+ UNLOCK (&c_clnt->active_lock);
+
+ return crpc;
+}
+
+static inline void
+put_client (changelog_clnt_t *c_clnt, changelog_rpc_clnt_t *crpc)
+{
+ LOCK (&c_clnt->active_lock);
+ {
+ changelog_rpc_clnt_unref (crpc);
+ }
+ UNLOCK (&c_clnt->active_lock);
+}
+
+void
+_dispatcher (rbuf_list_t *rlist, void *arg)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ changelog_rpc_clnt_t *crpc = NULL;
+ changelog_rpc_clnt_t *tmp = NULL;
+ struct ev_rpc erpc = {0,};
+ struct list_head *next = NULL;
+
+ c_clnt = arg;
+ this = c_clnt->this;
+
+ erpc.rlist = rlist;
+ next = c_clnt->active.next;
+
+ while (1) {
+ crpc = get_client (c_clnt, &next);
+ if (!crpc)
+ break;
+ erpc.rpc = crpc->rpc;
+ ret = changelog_invoke_rpc (this, crpc->rpc,
+ &changelog_ev_program,
+ CHANGELOG_REV_PROC_EVENT, &erpc);
+ put_client (c_clnt, crpc);
+ }
+}
+
+/** this is called under rotbuff's lock */
+void
+sequencer (rbuf_list_t *rlist, void *mydata)
+{
+ unsigned long range = 0;
+ changelog_clnt_t *c_clnt = 0;
+
+ c_clnt = mydata;
+
+ range = (RLIST_ENTRY_COUNT (rlist)) / NR_IOVEC;
+ if ((RLIST_ENTRY_COUNT (rlist)) % NR_IOVEC)
+ range++;
+ RLIST_STORE_SEQ (rlist, c_clnt->sequence, range);
+
+ c_clnt->sequence += range;
+}
+
+void *
+changelog_ev_dispatch (void *data)
+{
+ int ret = 0;
+ void *opaque = NULL;
+ xlator_t *this = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ struct timeval tv = {0,};
+
+ c_clnt = data;
+ this = c_clnt->this;
+
+ while (1) {
+ /* TODO: change this to be pthread cond based.. later */
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ select (0, NULL, NULL, NULL, &tv);
+
+ ret = rbuf_get_buffer (c_clnt->rbuf,
+ &opaque, sequencer, c_clnt);
+ if (ret != RBUF_CONSUMABLE) {
+ if (ret != RBUF_EMPTY)
+ gf_log (this->name, GF_LOG_WARNING,
+ "Failed to get buffer for RPC dispatch "
+ "[rbuf retval: %d]", ret);
+ continue;
+ }
+
+ ret = rbuf_wait_for_completion (c_clnt->rbuf,
+ opaque, _dispatcher, c_clnt);
+ if (ret)
+ gf_log (this->name, GF_LOG_WARNING,
+ "failed to put buffer after consumption");
+ }
+
+ return NULL;
+}
+
+void
+changelog_ev_queue_connection (changelog_clnt_t *c_clnt,
+ changelog_rpc_clnt_t *crpc)
+{
+ pthread_mutex_lock (&c_clnt->pending_lock);
+ {
+ list_add_tail (&crpc->list, &c_clnt->pending);
+ pthread_cond_signal (&c_clnt->pending_cond);
+ }
+ pthread_mutex_unlock (&c_clnt->pending_lock);
+}
+
+struct rpc_clnt_procedure changelog_ev_procs[CHANGELOG_REV_PROC_MAX] = {
+ [CHANGELOG_REV_PROC_NULL] = {"NULL", NULL},
+ [CHANGELOG_REV_PROC_EVENT] = {
+ "EVENT DISPATCH", changelog_event_dispatch_rpc
+ },
+};
+
+struct rpc_clnt_program changelog_ev_program = {
+ .progname = "CHANGELOG EVENT DISPATCHER",
+ .prognum = CHANGELOG_REV_RPC_PROCNUM,
+ .progver = CHANGELOG_REV_RPC_PROCVER,
+ .numproc = CHANGELOG_REV_PROC_MAX,
+ .proctable = changelog_ev_procs,
+};
diff --git a/xlators/features/changelog/src/changelog-ev-handle.h b/xlators/features/changelog/src/changelog-ev-handle.h
new file mode 100644
index 00000000000..eef0492a9ee
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-ev-handle.h
@@ -0,0 +1,140 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __CHANGELOG_EV_HANDLE_H
+#define __CHANGELOG_EV_HANDLE_H
+
+#include "list.h"
+#include "xlator.h"
+#include "rpc-clnt.h"
+
+#include "rot-buffs.h"
+
+struct changelog_clnt;
+
+typedef struct changelog_rpc_clnt {
+ xlator_t *this;
+
+ gf_lock_t lock;
+
+ unsigned long ref;
+ gf_boolean_t disconnected;
+
+ unsigned int filter;
+ char sock[UNIX_PATH_MAX];
+
+ struct changelog_clnt *c_clnt; /* back pointer to list holder */
+
+ struct rpc_clnt *rpc; /* RPC client endpoint */
+
+ struct list_head list; /* ->pending, ->waitq, ->active */
+
+ void (*cleanup)
+ (struct changelog_rpc_clnt *); /* cleanup handler */
+} changelog_rpc_clnt_t;
+
+static inline void
+changelog_rpc_clnt_ref (changelog_rpc_clnt_t *crpc)
+{
+ LOCK (&crpc->lock);
+ {
+ ++crpc->ref;
+ }
+ UNLOCK (&crpc->lock);
+}
+
+static inline void
+changelog_set_disconnect_flag (changelog_rpc_clnt_t *crpc, gf_boolean_t flag)
+{
+ crpc->disconnected = flag;
+}
+
+static inline int
+changelog_rpc_clnt_is_disconnected (changelog_rpc_clnt_t *crpc)
+{
+ return (crpc->disconnected == _gf_true);
+}
+
+static inline void
+changelog_rpc_clnt_unref (changelog_rpc_clnt_t *crpc)
+{
+ gf_boolean_t gone = _gf_false;
+
+ LOCK (&crpc->lock);
+ {
+ if (!(--crpc->ref)
+ && changelog_rpc_clnt_is_disconnected (crpc)) {
+ list_del (&crpc->list);
+ gone = _gf_true;
+ }
+ }
+ UNLOCK (&crpc->lock);
+
+ if (gone)
+ crpc->cleanup (crpc);
+}
+
+/**
+ * This structure holds pending and active clients. On probe RPC all
+ * an instance of the above structure (@changelog_rpc_clnt) is placed
+ * in ->pending and gets moved to ->active on a successful connect.
+ *
+ * locking rules:
+ *
+ * Manipulating ->pending
+ * ->pending_lock
+ * ->pending
+ *
+ * Manipulating ->active
+ * ->active_lock
+ * ->active
+ *
+ * Moving object from ->pending to ->active
+ * ->pending_lock
+ * ->active_lock
+ *
+ * Objects are _never_ moved from ->active to ->pending, i.e., during
+ * disconnection, the object is destroyed. Well, we could have tried
+ * to reconnect, but that's pure waste.. let the other end reconnect.
+ */
+
+typedef struct changelog_clnt {
+ xlator_t *this;
+
+ /* pending connections */
+ pthread_mutex_t pending_lock;
+ pthread_cond_t pending_cond;
+ struct list_head pending;
+
+ /* current active connections */
+ gf_lock_t active_lock;
+ struct list_head active;
+
+ gf_lock_t wait_lock;
+ struct list_head waitq;
+
+ /* consumer part of rot-buffs */
+ rbuf_t *rbuf;
+ unsigned long sequence;
+} changelog_clnt_t;
+
+void *changelog_ev_connector (void *);
+
+void *changelog_ev_dispatch (void *);
+
+/* APIs */
+void
+changelog_ev_queue_connection (changelog_clnt_t *, changelog_rpc_clnt_t *);
+
+void
+changelog_ev_cleanup_connections (xlator_t *, changelog_clnt_t *);
+
+#endif
+
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c
index 3af2938190d..5c755d76d69 100644
--- a/xlators/features/changelog/src/changelog-helpers.c
+++ b/xlators/features/changelog/src/changelog-helpers.c
@@ -24,6 +24,7 @@
#include "changelog-mem-types.h"
#include "changelog-encoders.h"
+#include "changelog-rpc-common.h"
#include <pthread.h>
static inline void
@@ -57,7 +58,7 @@ changelog_cleanup_free_mutex (void *arg_mutex)
pthread_mutex_unlock(p_mutex);
}
-void
+int
changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)
{
int ret = 0;
@@ -65,7 +66,7 @@ changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)
/* send a cancel request to the thread */
ret = pthread_cancel (thr_id);
- if (ret) {
+ if (ret != 0) {
gf_log (this->name, GF_LOG_ERROR,
"could not cancel thread (reason: %s)",
strerror (errno));
@@ -73,14 +74,14 @@ changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)
}
ret = pthread_join (thr_id, &retval);
- if (ret || (retval != PTHREAD_CANCELED)) {
+ if ((ret != 0) || (retval != PTHREAD_CANCELED)) {
gf_log (this->name, GF_LOG_ERROR,
"cancel request not adhered as expected"
" (reason: %s)", strerror (errno));
}
out:
- return;
+ return ret;
}
inline void *
@@ -98,6 +99,145 @@ changelog_get_usable_buffer (changelog_local_t *local)
return cld->cld_iobuf->ptr;
}
+static inline int
+changelog_selector_index (unsigned int selector)
+{
+ return (ffs (selector) - 1);
+}
+
+inline int
+changelog_ev_selected (xlator_t *this,
+ changelog_ev_selector_t *selection,
+ unsigned int selector)
+{
+ int idx = 0;
+
+ idx = changelog_selector_index (selector);
+ gf_log (this->name, GF_LOG_DEBUG,
+ "selector ref count for %d (idx: %d): %d",
+ selector, idx, selection->ref[idx]);
+ /* this can be lockless */
+ return (idx < CHANGELOG_EV_SELECTION_RANGE
+ && (selection->ref[idx] > 0));
+}
+
+inline void
+changelog_select_event (xlator_t *this,
+ changelog_ev_selector_t *selection,
+ unsigned int selector)
+{
+ int idx = 0;
+
+ LOCK (&selection->reflock);
+ {
+ while (selector) {
+ idx = changelog_selector_index (selector);
+ if (idx < CHANGELOG_EV_SELECTION_RANGE) {
+ selection->ref[idx]++;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "selecting event %d", idx);
+ }
+ selector &= ~(1 << idx);
+ }
+ }
+ UNLOCK (&selection->reflock);
+}
+
+inline void
+changelog_deselect_event (xlator_t *this,
+ changelog_ev_selector_t *selection,
+ unsigned int selector)
+{
+ int idx = 0;
+
+ LOCK (&selection->reflock);
+ {
+ while (selector) {
+ idx = changelog_selector_index (selector);
+ if (idx < CHANGELOG_EV_SELECTION_RANGE) {
+ selection->ref[idx]--;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "de-selecting event %d", idx);
+ }
+ selector &= ~(1 << idx);
+ }
+ }
+ UNLOCK (&selection->reflock);
+}
+
+inline int
+changelog_init_event_selection (xlator_t *this,
+ changelog_ev_selector_t *selection)
+{
+ int ret = 0;
+ int j = CHANGELOG_EV_SELECTION_RANGE;
+
+ ret = LOCK_INIT (&selection->reflock);
+ if (ret != 0)
+ return -1;
+
+ LOCK (&selection->reflock);
+ {
+ while (j--) {
+ selection->ref[j] = 0;
+ }
+ }
+ UNLOCK (&selection->reflock);
+
+ return 0;
+}
+
+inline int
+changelog_cleanup_event_selection (xlator_t *this,
+ changelog_ev_selector_t *selection)
+{
+ int ret = 0;
+ int j = CHANGELOG_EV_SELECTION_RANGE;
+
+ LOCK (&selection->reflock);
+ {
+ while (j--) {
+ if (selection->ref[j] > 0)
+ gf_log (this->name, GF_LOG_WARNING,
+ "changelog event selection cleaning up "
+ " on active references");
+ }
+ }
+ UNLOCK (&selection->reflock);
+
+ return LOCK_DESTROY (&selection->reflock);
+}
+
+static inline void
+changelog_perform_dispatch (xlator_t *this,
+ changelog_priv_t *priv, void *mem, size_t size)
+{
+ char *buf = NULL;
+ void *opaque = NULL;
+
+ buf = rbuf_reserve_write_area (priv->rbuf, size, &opaque);
+ if (!buf) {
+ gf_log_callingfn (this->name,
+ GF_LOG_WARNING, "failed to dispatch event");
+ return;
+ }
+
+ memcpy (buf, mem, size);
+ rbuf_write_complete (opaque);
+}
+
+inline void
+changelog_dispatch_event (xlator_t *this,
+ changelog_priv_t *priv, changelog_event_t *ev)
+{
+ changelog_ev_selector_t *selection = NULL;
+
+ selection = &priv->ev_selection;
+ if (changelog_ev_selected (this, selection, ev->ev_type)) {
+ changelog_perform_dispatch (this, priv, ev, CHANGELOG_EV_SIZE);
+ }
+}
+
inline void
changelog_set_usable_record_and_length (changelog_local_t *local,
size_t len, int xr)
@@ -206,9 +346,9 @@ changelog_rollover_changelog (xlator_t *this,
{
int ret = -1;
int notify = 0;
- char *bname = NULL;
char ofile[PATH_MAX] = {0,};
char nfile[PATH_MAX] = {0,};
+ changelog_event_t ev = {0,};
if (priv->changelog_fd != -1) {
ret = fsync (priv->changelog_fd);
@@ -252,40 +392,32 @@ changelog_rollover_changelog (xlator_t *this,
}
if (notify) {
- bname = basename (nfile);
- gf_log (this->name, GF_LOG_DEBUG, "notifying: %s", bname);
- ret = changelog_write (priv->wfd, bname, strlen (bname) + 1);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "Failed to send file name to notify thread"
- " (reason: %s)", strerror (errno));
- } else {
- /* If this is explicit rollover initiated by snapshot,
- * wakeup reconfigure thread waiting for changelog to
- * rollover
- */
- if (priv->explicit_rollover) {
- priv->explicit_rollover = _gf_false;
- ret = pthread_mutex_lock (
- &priv->bn.bnotify_mutex);
- CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
- {
- priv->bn.bnotify = _gf_false;
- ret = pthread_cond_signal (
- &priv->bn.bnotify_cond);
- CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret,
- out);
- gf_log (this->name, GF_LOG_INFO,
- "Changelog published: %s and"
- " signalled bnotify", bname);
- }
- ret = pthread_mutex_unlock (
- &priv->bn.bnotify_mutex);
+ ev.ev_type = CHANGELOG_OP_TYPE_JOURNAL;
+ memcpy (ev.u.journal.path, nfile, strlen (nfile) + 1);
+ changelog_dispatch_event (this, priv, &ev);
+
+ /* If this is explicit rollover initiated by snapshot,
+ * wakeup reconfigure thread waiting for changelog to
+ * rollover
+ */
+ if (priv->explicit_rollover) {
+ priv->explicit_rollover = _gf_false;
+
+ ret = pthread_mutex_lock (&priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ {
+ priv->bn.bnotify = _gf_false;
+ ret = pthread_cond_signal
+ (&priv->bn.bnotify_cond);
CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ gf_log (this->name, GF_LOG_INFO,
+ "Changelog published: %s signalled"
+ " bnotify", nfile);
}
+ ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
}
}
-
out:
return ret;
}
@@ -434,8 +566,8 @@ changelog_snap_logging_stop (xlator_t *this,
}
int
-changelog_open (xlator_t *this,
- changelog_priv_t *priv)
+changelog_open_journal (xlator_t *this,
+ changelog_priv_t *priv)
{
int fd = 0;
int ret = -1;
@@ -490,7 +622,7 @@ changelog_start_next_change (xlator_t *this,
ret = changelog_rollover_changelog (this, priv, ts);
if (!ret && !finale)
- ret = changelog_open (this, priv);
+ ret = changelog_open_journal (this, priv);
return ret;
}
@@ -975,7 +1107,7 @@ __changelog_inode_ctx_set (xlator_t *this,
* one shot routine to get the address and the value of a inode version
* for a particular type.
*/
-static changelog_inode_ctx_t *
+changelog_inode_ctx_t *
__changelog_inode_ctx_get (xlator_t *this,
inode_t *inode, unsigned long **iver,
unsigned long *version, changelog_log_type type)
diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h
index 03a795369d1..33a99ee4eed 100644
--- a/xlators/features/changelog/src/changelog-helpers.h
+++ b/xlators/features/changelog/src/changelog-helpers.h
@@ -15,10 +15,16 @@
#include "timer.h"
#include "pthread.h"
#include "iobuf.h"
+#include "rot-buffs.h"
#include "changelog-misc.h"
#include "call-stub.h"
+#include "rpcsvc.h"
+#include "changelog-ev-handle.h"
+
+#include "changelog.h"
+
/**
* the changelog entry
*/
@@ -120,29 +126,6 @@ typedef struct changelog_fsync {
xlator_t *this;
} changelog_fsync_t;
-# define CHANGELOG_MAX_CLIENTS 5
-typedef struct changelog_notify {
- /* reader end of the pipe */
- int rfd;
-
- /* notifier thread */
- pthread_t notify_th;
-
- /* unique socket path */
- char sockpath[UNIX_PATH_MAX];
-
- int socket_fd;
-
- /**
- * simple array of accept()'ed fds. Not scalable at all
- * for large number of clients, but it's okay as we have
- * a ahrd limit in this version (@CHANGELOG_MAX_CLIENTS).
- */
- int client_fd[CHANGELOG_MAX_CLIENTS];
-
- xlator_t *this;
-} changelog_notify_t;
-
/* Draining during changelog rollover (for geo-rep snapshot dependency):
* --------------------------------------------------------------------
* The introduction of draining of in-transit fops during changelog rollover
@@ -162,14 +145,14 @@ typedef struct changelog_notify {
typedef enum chlog_fop_color {
FOP_COLOR_BLACK,
FOP_COLOR_WHITE
-}chlog_fop_color_t;
+} chlog_fop_color_t;
/* Barrier notify variable */
typedef struct barrier_notify {
pthread_mutex_t bnotify_mutex;
pthread_cond_t bnotify_cond;
gf_boolean_t bnotify;
-}barrier_notify_t;
+} barrier_notify_t;
/* Two separate mutex and conditional variable set is used
* to drain white and black fops. */
@@ -185,15 +168,26 @@ typedef struct drain_mgmt {
unsigned long white_fop_cnt;
gf_boolean_t drain_wait_black;
gf_boolean_t drain_wait_white;
-}drain_mgmt_t;
+} drain_mgmt_t;
/* External barrier as a result of snap on/off indicating flag*/
typedef struct barrier_flags {
gf_lock_t lock;
gf_boolean_t barrier_ext;
-}barrier_flags_t;
+} barrier_flags_t;
+/* Event selection */
+typedef struct changelog_ev_selector {
+ gf_lock_t reflock;
+ /**
+ * Array of references for each selection bit.
+ */
+ unsigned int ref[CHANGELOG_EV_SELECTION_RANGE];
+} changelog_ev_selector_t;
+
+
+/* changelog's private structure */
struct changelog_priv {
gf_boolean_t active;
@@ -223,9 +217,6 @@ struct changelog_priv {
/* lock to synchronize CSNAP updation */
gf_lock_t c_snap_lock;
- /* writen end of the pipe */
- int wfd;
-
/* rollover time */
int32_t rollover_time;
@@ -247,9 +238,6 @@ struct changelog_priv {
/* context of fsync thread */
changelog_fsync_t cf;
- /* context of the notifier thread */
- changelog_notify_t cn;
-
/* operation mode */
changelog_mode_t op_mode;
@@ -262,7 +250,9 @@ struct changelog_priv {
/* encoder */
struct changelog_encoder *ce;
- /* snapshot dependency changes */
+ /**
+ * snapshot dependency changes
+ */
/* Draining of fops*/
drain_mgmt_t dm;
@@ -289,6 +279,30 @@ struct changelog_priv {
gf_timer_t *timer;
struct timespec timeout;
+ /**
+ * buffers, RPC, event selection, notifications and other
+ * beasts.
+ */
+
+ /* epoll pthread */
+ pthread_t poller;
+
+ /* rotational buffer */
+ rbuf_t *rbuf;
+
+ /* changelog RPC server */
+ rpcsvc_t *rpc;
+
+ /* event selection */
+ changelog_ev_selector_t ev_selection;
+
+ /* client handling (reverse connection) */
+ pthread_t connector;
+
+ int nr_dispatchers;
+ pthread_t *ev_dispatcher;
+
+ changelog_clnt_t connections;
};
struct changelog_local {
@@ -367,7 +381,7 @@ typedef struct {
* helpers routines
*/
-void
+int
changelog_thread_cleanup (xlator_t *this, pthread_t thr_id);
void *
@@ -386,7 +400,7 @@ changelog_start_next_change (xlator_t *this,
changelog_priv_t *priv,
unsigned long ts, gf_boolean_t finale);
int
-changelog_open (xlator_t *this, changelog_priv_t *priv);
+changelog_open_journal (xlator_t *this, changelog_priv_t *priv);
int
changelog_fill_rollover_data (changelog_log_data_t *cld, gf_boolean_t is_last);
int
@@ -449,6 +463,7 @@ changelog_snap_handle_ascii_change (xlator_t *this,
changelog_log_data_t *cld);
int
changelog_snap_write_change (changelog_priv_t *priv, char *buffer, size_t len);
+
/* Changelog barrier routines */
void __chlog_barrier_enqueue (xlator_t *this, call_stub_t *stub);
void __chlog_barrier_disable (xlator_t *this, struct list_head *queue);
@@ -460,6 +475,24 @@ int32_t
changelog_fill_entry_buf (call_frame_t *frame, xlator_t *this,
loc_t *loc, changelog_local_t **local);
+/* event selection routines */
+inline void changelog_select_event (xlator_t *,
+ changelog_ev_selector_t *, unsigned int);
+inline void changelog_deselect_event (xlator_t *,
+ changelog_ev_selector_t *, unsigned int);
+inline int changelog_init_event_selection (xlator_t *,
+ changelog_ev_selector_t *);
+inline int changelog_cleanup_event_selection (xlator_t *,
+ changelog_ev_selector_t *);
+inline int changelog_ev_selected (xlator_t *,
+ changelog_ev_selector_t *, unsigned int);
+inline void
+changelog_dispatch_event (xlator_t *, changelog_priv_t *, changelog_event_t *);
+
+changelog_inode_ctx_t *
+__changelog_inode_ctx_get (xlator_t *, inode_t *, unsigned long **,
+ unsigned long *, changelog_log_type);
+
/* macros */
#define CHANGELOG_STACK_UNWIND(fop, frame, params ...) do { \
@@ -471,10 +504,10 @@ changelog_fill_entry_buf (call_frame_t *frame, xlator_t *this,
frame->local = NULL; \
} \
STACK_UNWIND_STRICT (fop, frame, params); \
- changelog_local_cleanup (__xl, __local); \
if (__local && __local->prev_entry) \
changelog_local_cleanup (__xl, \
__local->prev_entry); \
+ changelog_local_cleanup (__xl, __local); \
} while (0)
#define CHANGELOG_IOBUF_REF(iobuf) do { \
diff --git a/xlators/features/changelog/src/changelog-mem-types.h b/xlators/features/changelog/src/changelog-mem-types.h
index e1fa319a715..1618f722f6c 100644
--- a/xlators/features/changelog/src/changelog-mem-types.h
+++ b/xlators/features/changelog/src/changelog-mem-types.h
@@ -14,16 +14,21 @@
#include "mem-types.h"
enum gf_changelog_mem_types {
- gf_changelog_mt_priv_t = gf_common_mt_end + 1,
- gf_changelog_mt_str_t = gf_common_mt_end + 2,
- gf_changelog_mt_batch_t = gf_common_mt_end + 3,
- gf_changelog_mt_rt_t = gf_common_mt_end + 4,
- gf_changelog_mt_inode_ctx_t = gf_common_mt_end + 5,
- gf_changelog_mt_libgfchangelog_t = gf_common_mt_end + 6,
- gf_changelog_mt_libgfchangelog_rl_t = gf_common_mt_end + 7,
- gf_changelog_mt_libgfchangelog_dirent_t = gf_common_mt_end + 8,
- gf_changelog_mt_changelog_buffer_t = gf_common_mt_end + 9,
- gf_changelog_mt_history_data_t = gf_common_mt_end + 10,
+ gf_changelog_mt_priv_t = gf_common_mt_end + 1,
+ gf_changelog_mt_str_t = gf_common_mt_end + 2,
+ gf_changelog_mt_batch_t = gf_common_mt_end + 3,
+ gf_changelog_mt_rt_t = gf_common_mt_end + 4,
+ gf_changelog_mt_inode_ctx_t = gf_common_mt_end + 5,
+ gf_changelog_mt_rpc_clnt_t = gf_common_mt_end + 6,
+ gf_changelog_mt_libgfchangelog_t = gf_common_mt_end + 7,
+ gf_changelog_mt_libgfchangelog_entry_t = gf_common_mt_end + 8,
+ gf_changelog_mt_libgfchangelog_rl_t = gf_common_mt_end + 9,
+ gf_changelog_mt_libgfchangelog_dirent_t = gf_common_mt_end + 10,
+ gf_changelog_mt_changelog_buffer_t = gf_common_mt_end + 11,
+ gf_changelog_mt_history_data_t = gf_common_mt_end + 12,
+ gf_changelog_mt_libgfchangelog_call_pool_t = gf_common_mt_end + 13,
+ gf_changelog_mt_libgfchangelog_event_t = gf_common_mt_end + 14,
+ gf_changelog_mt_ev_dispatcher_t = gf_common_mt_end + 15,
gf_changelog_mt_end
};
diff --git a/xlators/features/changelog/src/changelog-misc.h b/xlators/features/changelog/src/changelog-misc.h
index 58b10961463..b45302ad099 100644
--- a/xlators/features/changelog/src/changelog-misc.h
+++ b/xlators/features/changelog/src/changelog-misc.h
@@ -25,6 +25,7 @@
#define CHANGELOG_VERSION_MINOR 1
#define CHANGELOG_UNIX_SOCK DEFAULT_VAR_RUN_DIRECTORY"/changelog-%s.sock"
+#define CHANGELOG_TMP_UNIX_SOCK DEFAULT_VAR_RUN_DIRECTORY"/.%s%lu.sock"
/**
* header starts with the version and the format of the changelog.
@@ -42,6 +43,19 @@
CHANGELOG_UNIX_SOCK, md5_sum); \
} while (0)
+#define CHANGELOG_MAKE_TMP_SOCKET_PATH(brick_path, sockpath, len) do { \
+ unsigned long pid = 0; \
+ char md5_sum[MD5_DIGEST_LENGTH*2+1] = {0,}; \
+ pid = (unsigned long) getpid (); \
+ md5_wrapper((unsigned char *) brick_path, \
+ strlen(brick_path), \
+ md5_sum); \
+ (void) snprintf (sockpath, \
+ len, CHANGELOG_TMP_UNIX_SOCK, \
+ md5_sum, pid); \
+ } while (0)
+
+
/**
* ... used by libgfchangelog.
*/
diff --git a/xlators/features/changelog/src/changelog-notifier.c b/xlators/features/changelog/src/changelog-notifier.c
deleted file mode 100644
index 5f3d063a8ad..00000000000
--- a/xlators/features/changelog/src/changelog-notifier.c
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
- This file is part of GlusterFS.
-
- This file is licensed to you under your choice of the GNU Lesser
- General Public License, version 3 or any later version (LGPLv3 or
- later), or the GNU General Public License, version 2 (GPLv2), in all
- cases as published by the Free Software Foundation.
-*/
-
-#include "changelog-notifier.h"
-
-#include <pthread.h>
-
-inline static void
-changelog_notify_clear_fd (changelog_notify_t *cn, int i)
-{
- cn->client_fd[i] = -1;
-}
-
-inline static void
-changelog_notify_save_fd (changelog_notify_t *cn, int i, int fd)
-{
- cn->client_fd[i] = fd;
-}
-
-static int
-changelog_notify_insert_fd (xlator_t *this, changelog_notify_t *cn, int fd)
-{
- int i = 0;
- int ret = 0;
-
- for (; i < CHANGELOG_MAX_CLIENTS; i++) {
- if (cn->client_fd[i] == -1)
- break;
- }
-
- if (i == CHANGELOG_MAX_CLIENTS) {
- /**
- * this case should not be hit as listen() would limit
- * the number of completely established connections.
- */
- gf_log (this->name, GF_LOG_WARNING,
- "hit max client limit (%d)", CHANGELOG_MAX_CLIENTS);
- ret = -1;
- }
- else
- changelog_notify_save_fd (cn, i, fd);
-
- return ret;
-}
-
-static void
-changelog_notify_fill_rset (changelog_notify_t *cn, fd_set *rset, int *maxfd)
-{
- int i = 0;
-
- FD_ZERO (rset);
-
- FD_SET (cn->socket_fd, rset);
- *maxfd = cn->socket_fd;
-
- FD_SET (cn->rfd, rset);
- *maxfd = max (*maxfd, cn->rfd);
-
- for (; i < CHANGELOG_MAX_CLIENTS; i++) {
- if (cn->client_fd[i] != -1) {
- FD_SET (cn->client_fd[i], rset);
- *maxfd = max (*maxfd, cn->client_fd[i]);
- }
- }
-
- *maxfd = *maxfd + 1;
-}
-
-static int
-changelog_notify_client (changelog_notify_t *cn, char *path, ssize_t len)
-{
- int i = 0;
- int ret = 0;
-
- for (; i < CHANGELOG_MAX_CLIENTS; i++) {
- if (cn->client_fd[i] == -1)
- continue;
-
- if (changelog_write (cn->client_fd[i],
- path, len)) {
- ret = -1;
-
- close (cn->client_fd[i]);
- changelog_notify_clear_fd (cn, i);
- }
- }
-
- return ret;
-}
-
-static void
-changelog_notifier_init (changelog_notify_t *cn)
-{
- int i = 0;
-
- cn->socket_fd = -1;
-
- for (; i < CHANGELOG_MAX_CLIENTS; i++) {
- changelog_notify_clear_fd (cn, i);
- }
-}
-
-static void
-changelog_close_client_conn (changelog_notify_t *cn)
-{
- int i = 0;
-
- for (; i < CHANGELOG_MAX_CLIENTS; i++) {
- if (cn->client_fd[i] == -1)
- continue;
-
- close (cn->client_fd[i]);
- changelog_notify_clear_fd (cn, i);
- }
-}
-
-static void
-changelog_notifier_cleanup (void *arg)
-{
- changelog_notify_t *cn = NULL;
-
- cn = (changelog_notify_t *) arg;
-
- changelog_close_client_conn (cn);
-
- if (cn->socket_fd != -1)
- close (cn->socket_fd);
-
- if (cn->rfd)
- close (cn->rfd);
-
- if (unlink (cn->sockpath))
- gf_log ("", GF_LOG_WARNING,
- "could not unlink changelog socket file"
- " %s (reason: %s", cn->sockpath, strerror (errno));
-}
-
-void *
-changelog_notifier (void *data)
-{
- int i = 0;
- int fd = 0;
- int max_fd = 0;
- int len = 0;
- ssize_t readlen = 0;
- xlator_t *this = NULL;
- changelog_priv_t *priv = NULL;
- changelog_notify_t *cn = NULL;
- struct sockaddr_un local = {0,};
- char path[PATH_MAX] = {0,};
- char abspath[PATH_MAX] = {0,};
-
- char buffer;
- fd_set rset;
-
- priv = (changelog_priv_t *) data;
-
- cn = &priv->cn;
- this = cn->this;
-
- pthread_cleanup_push (changelog_notifier_cleanup, cn);
-
- changelog_notifier_init (cn);
-
- cn->socket_fd = socket (AF_UNIX, SOCK_STREAM, 0);
- if (cn->socket_fd < 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "changelog socket error (reason: %s)",
- strerror (errno));
- goto out;
- }
-
- CHANGELOG_MAKE_SOCKET_PATH (priv->changelog_brick,
- cn->sockpath, UNIX_PATH_MAX);
- if (unlink (cn->sockpath) < 0) {
- if (errno != ENOENT) {
- gf_log (this->name, GF_LOG_ERROR,
- "Could not unlink changelog socket file (%s)"
- " (reason: %s)",
- CHANGELOG_UNIX_SOCK, strerror (errno));
- goto cleanup;
- }
- }
-
- local.sun_family = AF_UNIX;
- strcpy (local.sun_path, cn->sockpath);
-
- len = strlen (local.sun_path) + sizeof (local.sun_family);
-
- /* bind to the unix domain socket */
- if (bind (cn->socket_fd, (struct sockaddr *) &local, len) < 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "Could not bind to changelog socket (reason: %s)",
- strerror (errno));
- goto cleanup;
- }
-
- /* listen for incoming connections */
- if (listen (cn->socket_fd, CHANGELOG_MAX_CLIENTS) < 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "listen() error on changelog socket (reason: %s)",
- strerror (errno));
- goto cleanup;
- }
-
- /**
- * simple select() on all to-be-read file descriptors. This method
- * though old school works pretty well when you have a handfull of
- * fd's to be watched (clients).
- *
- * Future TODO: move this to epoll based notification facility if
- * number of clients increase.
- */
- for (;;) {
- changelog_notify_fill_rset (cn, &rset, &max_fd);
-
- if (select (max_fd, &rset, NULL, NULL, NULL) < 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "select() returned -1 (reason: %s)",
- strerror (errno));
- sleep (2);
- continue;
- }
-
- if (FD_ISSET (cn->socket_fd, &rset)) {
- fd = accept (cn->socket_fd, NULL, NULL);
- if (fd < 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "accept error on changelog socket"
- " (reason: %s)", strerror (errno));
- } else if (changelog_notify_insert_fd (this, cn, fd)) {
- gf_log (this->name, GF_LOG_ERROR,
- "hit max client limit");
- }
- }
-
- if (FD_ISSET (cn->rfd, &rset)) {
- /**
- * read changelog filename and notify all connected
- * clients.
- */
- readlen = 0;
- while (readlen < PATH_MAX) {
- len = read (cn->rfd, &path[readlen++], 1);
- if (len == -1) {
- break;
- }
-
- if (len == 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "rollover thread sent EOF"
- " on pipe - possibly a crash.");
- /* be blunt and close all connections */
- pthread_exit(NULL);
- }
-
- if (path[readlen - 1] == '\0')
- break;
- }
-
- /* should we close all client connections here too? */
- if (len < 0 || readlen == PATH_MAX) {
- gf_log (this->name, GF_LOG_ERROR,
- "Could not get pathname from rollover"
- " thread or pathname too long");
- goto process_rest;
- }
-
- (void) snprintf (abspath, PATH_MAX,
- "%s/%s", priv->changelog_dir, path);
- if (changelog_notify_client (cn, abspath,
- strlen (abspath) + 1))
- gf_log (this->name, GF_LOG_ERROR,
- "could not notify some clients with new"
- " changelogs");
- }
-
- process_rest:
- for (i = 0; i < CHANGELOG_MAX_CLIENTS; i++) {
- if ( (fd = cn->client_fd[i]) == -1 )
- continue;
-
- if (FD_ISSET (fd, &rset)) {
- /**
- * the only data we accept from the client is a
- * disconnect. Anything else is treated as bogus
- * and is silently discarded (also warned!!!).
- */
- if ( (readlen = read (fd, &buffer, 1)) <= 0 ) {
- close (fd);
- changelog_notify_clear_fd (cn, i);
- } else {
- /* silently discard data and log */
- gf_log (this->name, GF_LOG_WARNING,
- "misbehaving changelog client");
- }
- }
- }
-
- }
-
- cleanup:;
- pthread_cleanup_pop (1);
-
- out:
- return NULL;
-}
diff --git a/xlators/features/changelog/src/changelog-rpc-common.c b/xlators/features/changelog/src/changelog-rpc-common.c
new file mode 100644
index 00000000000..76db6696ae8
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-rpc-common.c
@@ -0,0 +1,334 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-rpc-common.h"
+
+/**
+*****************************************************
+ Client Interface
+*****************************************************
+*/
+
+/**
+ * Initialize and return an RPC client object for a given unix
+ * domain socket.
+ */
+
+void *
+changelog_rpc_poller (void *arg)
+{
+ xlator_t *this = arg;
+
+ (void) event_dispatch (this->ctx->event_pool);
+ return NULL;
+}
+
+struct rpc_clnt *
+changelog_rpc_client_init (xlator_t *this, void *cbkdata,
+ char *sockfile, rpc_clnt_notify_t fn)
+{
+ int ret = 0;
+ struct rpc_clnt *rpc = NULL;
+ dict_t *options = NULL;
+
+ if (!cbkdata)
+ cbkdata = this;
+
+ options = dict_new ();
+ if (!options)
+ goto error_return;
+
+ ret = rpc_transport_unix_options_build (&options, sockfile, 0);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to build rpc options");
+ goto dealloc_dict;
+ }
+
+ rpc = rpc_clnt_new (options, this->ctx, this->name, 16);
+ if (!rpc)
+ goto dealloc_dict;
+
+ ret = rpc_clnt_register_notify (rpc, fn, cbkdata);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to register notify");
+ goto dealloc_rpc_clnt;
+ }
+
+ ret = rpc_clnt_start (rpc);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to start rpc");
+ goto dealloc_rpc_clnt;
+ }
+
+ return rpc;
+
+ dealloc_rpc_clnt:
+ rpc_clnt_unref (rpc);
+ dealloc_dict:
+ dict_unref (options);
+ error_return:
+ return NULL;
+}
+
+/**
+ * Generic RPC client routine to dispatch a request to an
+ * RPC server.
+ */
+int
+changelog_rpc_sumbit_req (struct rpc_clnt *rpc, void *req,
+ call_frame_t *frame, rpc_clnt_prog_t *prog,
+ int procnum, struct iovec *payload, int payloadcnt,
+ struct iobref *iobref, xlator_t *this,
+ fop_cbk_fn_t cbkfn, xdrproc_t xdrproc)
+{
+ int ret = 0;
+ int count = 0;
+ struct iovec iov = {0, };
+ struct iobuf *iobuf = NULL;
+ char new_iobref = 0;
+ ssize_t xdr_size = 0;
+
+ GF_ASSERT (this);
+
+ if (req) {
+ xdr_size = xdr_sizeof (xdrproc, req);
+
+ iobuf = iobuf_get2 (this->ctx->iobuf_pool, xdr_size);
+ if (!iobuf) {
+ goto out;
+ };
+
+ if (!iobref) {
+ iobref = iobref_new ();
+ if (!iobref) {
+ goto out;
+ }
+
+ new_iobref = 1;
+ }
+
+ iobref_add (iobref, iobuf);
+
+ iov.iov_base = iobuf->ptr;
+ iov.iov_len = iobuf_size (iobuf);
+
+ /* Create the xdr payload */
+ ret = xdr_serialize_generic (iov, req, xdrproc);
+ if (ret == -1) {
+ goto out;
+ }
+
+ iov.iov_len = ret;
+ count = 1;
+ }
+
+ ret = rpc_clnt_submit (rpc, prog, procnum, cbkfn, &iov, count,
+ payload, payloadcnt, iobref, frame, NULL,
+ 0, NULL, 0, NULL);
+
+ out:
+ if (new_iobref)
+ iobref_unref (iobref);
+ if (iobuf)
+ iobuf_unref (iobuf);
+ return ret;
+}
+
+/**
+ * Entry point to perform a remote procedure call
+ */
+int
+changelog_invoke_rpc (xlator_t *this, struct rpc_clnt *rpc,
+ rpc_clnt_prog_t *prog, int procidx, void *arg)
+{
+ int ret = 0;
+ call_frame_t *frame = NULL;
+ rpc_clnt_procedure_t *proc = NULL;
+
+ if (!this || !prog)
+ goto error_return;
+
+ frame = create_frame (this, this->ctx->pool);
+ if (!frame) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to create frame");
+ goto error_return;
+ }
+
+ proc = &prog->proctable[procidx];
+ if (proc->fn)
+ ret = proc->fn (frame, this, arg);
+
+ STACK_DESTROY (frame->root);
+ return ret;
+
+ error_return:
+ return -1;
+}
+
+/**
+*****************************************************
+ Server Interface
+*****************************************************
+*/
+
+struct iobuf *
+__changelog_rpc_serialize_reply (rpcsvc_request_t *req, void *arg,
+ struct iovec *outmsg, xdrproc_t xdrproc)
+{
+ struct iobuf *iob = NULL;
+ ssize_t retlen = 0;
+ ssize_t rsp_size = 0;
+
+ rsp_size = xdr_sizeof (xdrproc, arg);
+ iob = iobuf_get2 (req->svc->ctx->iobuf_pool, rsp_size);
+ if (!iob)
+ goto error_return;
+
+ iobuf_to_iovec (iob, outmsg);
+
+ retlen = xdr_serialize_generic (*outmsg, arg, xdrproc);
+ if (retlen == -1)
+ goto unref_iob;
+
+ outmsg->iov_len = retlen;
+ return iob;
+
+ unref_iob:
+ iobuf_unref (iob);
+ error_return:
+ return NULL;
+}
+
+int
+changelog_rpc_sumbit_reply (rpcsvc_request_t *req,
+ void *arg, struct iovec *payload, int payloadcount,
+ struct iobref *iobref, xdrproc_t xdrproc)
+{
+ int ret = -1;
+ struct iobuf *iob = NULL;
+ struct iovec iov = {0,};
+ char new_iobref = 0;
+
+ if (!req)
+ goto return_ret;
+
+ if (!iobref) {
+ iobref = iobref_new ();
+ if (!iobref)
+ goto return_ret;
+ new_iobref = 1;
+ }
+
+ iob = __changelog_rpc_serialize_reply (req, arg, &iov, xdrproc);
+ if (!iob)
+ gf_log ("", GF_LOG_ERROR, "failed to serialize reply");
+ else
+ iobref_add (iobref, iob);
+
+ ret = rpcsvc_submit_generic (req, &iov,
+ 1, payload, payloadcount, iobref);
+
+ if (new_iobref)
+ iobref_unref (iobref);
+ if (iob)
+ iobuf_unref (iob);
+ return_ret:
+ return ret;
+}
+
+void
+changelog_rpc_server_destroy (xlator_t *this, rpcsvc_t *rpc, char *sockfile,
+ rpcsvc_notify_t fn, struct rpcsvc_program **progs)
+{
+ rpcsvc_listener_t *listener = NULL;
+ rpcsvc_listener_t *next = NULL;
+ struct rpcsvc_program *prog = NULL;
+
+ while (*progs) {
+ prog = *progs;
+ (void) rpcsvc_program_unregister (rpc, prog);
+ }
+
+ list_for_each_entry_safe (listener, next, &rpc->listeners, list) {
+ rpcsvc_listener_destroy (listener);
+ }
+
+ (void) rpcsvc_unregister_notify (rpc, fn, this);
+ unlink (sockfile);
+
+ GF_FREE (rpc);
+}
+
+rpcsvc_t *
+changelog_rpc_server_init (xlator_t *this, char *sockfile, void *cbkdata,
+ rpcsvc_notify_t fn, struct rpcsvc_program **progs)
+{
+ int j = 0;
+ int ret = 0;
+ rpcsvc_t *rpc = NULL;
+ dict_t *options = NULL;
+ struct rpcsvc_program *prog = NULL;
+
+ if (!cbkdata)
+ cbkdata = this;
+
+ options = dict_new ();
+ if (!options)
+ goto error_return;
+
+ ret = rpcsvc_transport_unix_options_build (&options, sockfile);
+ if (ret)
+ goto dealloc_dict;
+
+ rpc = rpcsvc_init (this, this->ctx, options, 8);
+ if (rpc == NULL) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to init rpc");
+ goto dealloc_dict;
+ }
+
+ ret = rpcsvc_register_notify (rpc, fn, cbkdata);
+ if (ret) {
+ gf_log (this->name,
+ GF_LOG_ERROR, "failed to register notify function");
+ goto dealloc_rpc;
+ }
+
+ ret = rpcsvc_create_listeners (rpc, options, this->name);
+ if (ret != 1) {
+ gf_log (this->name,
+ GF_LOG_DEBUG, "failed to create listeners");
+ goto dealloc_rpc;
+ }
+
+ while (*progs) {
+ prog = *progs;
+ ret = rpcsvc_program_register (rpc, prog);
+ if (ret) {
+ gf_log (this->name,
+ GF_LOG_ERROR, "cannot register program "
+ "(name: %s, prognum: %d, pogver: %d)",
+ prog->progname, prog->prognum, prog->progver);
+ goto dealloc_rpc;
+ }
+
+ progs++;
+ }
+
+ dict_unref (options);
+ return rpc;
+
+ dealloc_rpc:
+ GF_FREE (rpc);
+ dealloc_dict:
+ dict_unref (options);
+ error_return:
+ return NULL;
+}
diff --git a/xlators/features/changelog/src/changelog-rpc-common.h b/xlators/features/changelog/src/changelog-rpc-common.h
new file mode 100644
index 00000000000..95c850c9400
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-rpc-common.h
@@ -0,0 +1,84 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __CHANGELOG_RPC_COMMON_H
+#define __CHANGELOG_RPC_COMMON_H
+
+#include "rpcsvc.h"
+#include "rpc-clnt.h"
+#include "event.h"
+#include "call-stub.h"
+
+#include "changelog-xdr.h"
+#include "xdr-generic.h"
+
+#include "changelog.h"
+
+/**
+ * Let's keep this non-configurable for now.
+ */
+#define NR_ROTT_BUFFS 4
+#define NR_DISPATCHERS (NR_ROTT_BUFFS - 1)
+
+enum changelog_rpc_procnum {
+ CHANGELOG_RPC_PROC_NULL = 0,
+ CHANGELOG_RPC_PROBE_FILTER = 1,
+ CHANGELOG_RPC_PROC_MAX = 2,
+};
+
+#define CHANGELOG_RPC_PROGNUM 1885957735
+#define CHANGELOG_RPC_PROGVER 1
+
+/**
+ * reverse connection: data xfer path
+ */
+enum changelog_reverse_rpc_procnum {
+ CHANGELOG_REV_PROC_NULL = 0,
+ CHANGELOG_REV_PROC_EVENT = 1,
+ CHANGELOG_REV_PROC_MAX = 2,
+};
+
+#define CHANGELOG_REV_RPC_PROCNUM 1886350951
+#define CHANGELOG_REV_RPC_PROCVER 1
+
+typedef struct changelog_rpc {
+ rpcsvc_t *svc;
+ struct rpc_clnt *rpc;
+ char sock[UNIX_PATH_MAX]; /* tied to server */
+} changelog_rpc_t;
+
+/* event poller */
+void *changelog_rpc_poller (void *);
+
+/* CLIENT API */
+struct rpc_clnt *
+changelog_rpc_client_init (xlator_t *, void *, char *, rpc_clnt_notify_t);
+
+int
+changelog_rpc_sumbit_req (struct rpc_clnt *, void *, call_frame_t *,
+ rpc_clnt_prog_t *, int , struct iovec *, int,
+ struct iobref *, xlator_t *, fop_cbk_fn_t, xdrproc_t);
+
+int
+changelog_invoke_rpc (xlator_t *, struct rpc_clnt *,
+ rpc_clnt_prog_t *, int , void *);
+
+/* SERVER API */
+int
+changelog_rpc_sumbit_reply (rpcsvc_request_t *, void *,
+ struct iovec *, int, struct iobref *, xdrproc_t);
+rpcsvc_t *
+changelog_rpc_server_init (xlator_t *, char *, void*,
+ rpcsvc_notify_t, struct rpcsvc_program **);
+void
+changelog_rpc_server_destroy (xlator_t *, rpcsvc_t *, char *,
+ rpcsvc_notify_t, struct rpcsvc_program **);
+
+#endif
diff --git a/xlators/features/changelog/src/changelog-rpc.c b/xlators/features/changelog/src/changelog-rpc.c
new file mode 100644
index 00000000000..04326456d31
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-rpc.c
@@ -0,0 +1,300 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-rpc.h"
+#include "changelog-mem-types.h"
+#include "changelog-ev-handle.h"
+
+struct rpcsvc_program *changelog_programs[];
+
+static void
+changelog_cleanup_dispatchers (xlator_t *this,
+ changelog_priv_t *priv, int count)
+{
+ for (; count >= 0; count--) {
+ (void) changelog_thread_cleanup
+ (this, priv->ev_dispatcher[count]);
+ }
+}
+
+static int
+changelog_cleanup_rpc_threads (xlator_t *this, changelog_priv_t *priv)
+{
+ int ret = 0;
+ changelog_clnt_t *conn = NULL;
+
+ conn = &priv->connections;
+ if (!conn)
+ return 0;
+
+ /** terminate RPC thread(s) */
+ ret = changelog_thread_cleanup (this, priv->connector);
+ if (ret != 0)
+ goto error_return;
+ /** terminate dispatcher thread(s) */
+ changelog_cleanup_dispatchers (this, priv, priv->nr_dispatchers);
+
+ /* TODO: what about pending and waiting connections? */
+ changelog_ev_cleanup_connections (this, conn);
+
+ /* destroy locks */
+ ret = pthread_mutex_destroy (&conn->pending_lock);
+ if (ret != 0)
+ goto error_return;
+ ret = pthread_cond_destroy (&conn->pending_cond);
+ if (ret != 0)
+ goto error_return;
+ ret = LOCK_DESTROY (&conn->active_lock);
+ if (ret != 0)
+ goto error_return;
+ ret = LOCK_DESTROY (&conn->wait_lock);
+ if (ret != 0)
+ goto error_return;
+ return 0;
+
+ error_return:
+ return -1;
+}
+
+static int
+changelog_init_rpc_threads (xlator_t *this, changelog_priv_t *priv,
+ rbuf_t *rbuf, int nr_dispatchers)
+{
+ int j = 0;
+ int ret = 0;
+ changelog_clnt_t *conn = NULL;
+
+ conn = &priv->connections;
+
+ conn->this = this;
+ conn->rbuf = rbuf;
+ conn->sequence = 1; /* start with sequence number one */
+
+ INIT_LIST_HEAD (&conn->pending);
+ INIT_LIST_HEAD (&conn->active);
+ INIT_LIST_HEAD (&conn->waitq);
+
+ ret = pthread_mutex_init (&conn->pending_lock, NULL);
+ if (ret)
+ goto error_return;
+ ret = pthread_cond_init (&conn->pending_cond, NULL);
+ if (ret)
+ goto cleanup_pending_lock;
+
+ ret = LOCK_INIT (&conn->active_lock);
+ if (ret)
+ goto cleanup_pending_cond;
+ ret = LOCK_INIT (&conn->wait_lock);
+ if (ret)
+ goto cleanup_active_lock;
+
+ /* spawn reverse connection thread */
+ ret = pthread_create (&priv->connector,
+ NULL, changelog_ev_connector, conn);
+ if (ret != 0)
+ goto cleanup_wait_lock;
+
+ /* spawn dispatcher thread(s) */
+ priv->ev_dispatcher = GF_CALLOC (nr_dispatchers, sizeof(pthread_t),
+ gf_changelog_mt_ev_dispatcher_t);
+ if (!priv->ev_dispatcher)
+ goto cleanup_connector;
+
+ /* spawn dispatcher threads */
+ for (; j < nr_dispatchers; j++) {
+ ret = pthread_create (&priv->ev_dispatcher[j],
+ NULL, changelog_ev_dispatch, conn);
+ if (ret != 0) {
+ changelog_cleanup_dispatchers (this, priv, --j);
+ break;
+ }
+ }
+
+ if (ret != 0)
+ goto cleanup_connector;
+
+ priv->nr_dispatchers = nr_dispatchers;
+ return 0;
+
+ cleanup_connector:
+ (void) pthread_cancel (priv->connector);
+ cleanup_wait_lock:
+ (void) LOCK_DESTROY (&conn->wait_lock);
+ cleanup_active_lock:
+ (void) LOCK_DESTROY (&conn->active_lock);
+ cleanup_pending_cond:
+ (void) pthread_cond_destroy (&conn->pending_cond);
+ cleanup_pending_lock:
+ (void) pthread_mutex_destroy (&conn->pending_lock);
+ error_return:
+ return -1;
+}
+
+int
+changelog_rpcsvc_notify (rpcsvc_t *rpc,
+ void *xl, rpcsvc_event_t event, void *data)
+{
+ return 0;
+}
+
+void
+changelog_destroy_rpc_listner (xlator_t *this, changelog_priv_t *priv)
+{
+ char sockfile[UNIX_PATH_MAX] = {0,};
+
+ /* sockfile path could have been saved to avoid this */
+ CHANGELOG_MAKE_SOCKET_PATH (priv->changelog_brick,
+ sockfile, UNIX_PATH_MAX);
+ changelog_rpc_server_destroy (this,
+ priv->rpc, sockfile,
+ changelog_rpcsvc_notify,
+ changelog_programs);
+ (void) changelog_cleanup_rpc_threads (this, priv);
+}
+
+rpcsvc_t *
+changelog_init_rpc_listner (xlator_t *this, changelog_priv_t *priv,
+ rbuf_t *rbuf, int nr_dispatchers)
+{
+ int ret = 0;
+ char sockfile[UNIX_PATH_MAX] = {0,};
+
+ ret = changelog_init_rpc_threads (this, priv, rbuf, nr_dispatchers);
+ if (ret)
+ return NULL;
+
+ CHANGELOG_MAKE_SOCKET_PATH (priv->changelog_brick,
+ sockfile, UNIX_PATH_MAX);
+ return changelog_rpc_server_init (this, sockfile, NULL,
+ changelog_rpcsvc_notify,
+ changelog_programs);
+}
+
+void
+changelog_rpc_clnt_cleanup (changelog_rpc_clnt_t *crpc)
+{
+ if (!crpc)
+ return;
+ crpc->c_clnt = NULL;
+ (void) LOCK_DESTROY (&crpc->lock);
+ GF_FREE (crpc);
+}
+
+inline changelog_rpc_clnt_t *
+changelog_rpc_clnt_init (xlator_t *this,
+ changelog_probe_req *rpc_req, changelog_clnt_t *c_clnt)
+{
+ int ret = 0;
+ changelog_rpc_clnt_t *crpc = NULL;
+
+ crpc = GF_CALLOC (1, sizeof (*crpc), gf_changelog_mt_rpc_clnt_t);
+ if (!crpc)
+ goto error_return;
+ INIT_LIST_HEAD (&crpc->list);
+
+ crpc->ref = 0;
+ changelog_set_disconnect_flag (crpc, _gf_false);
+
+ crpc->filter = rpc_req->filter;
+ (void) memcpy (crpc->sock, rpc_req->sock, strlen (rpc_req->sock));
+
+ crpc->this = this;
+ crpc->c_clnt = c_clnt;
+ crpc->cleanup = changelog_rpc_clnt_cleanup;
+
+ ret = LOCK_INIT (&crpc->lock);
+ if (ret != 0)
+ goto dealloc_crpc;
+ return crpc;
+
+ dealloc_crpc:
+ GF_FREE (crpc);
+ error_return:
+ return NULL;
+}
+
+/**
+ * Actor declarations
+ */
+
+/**
+ * @probe_handler
+ * A probe RPC call spawns a connect back to the caller. Caller also
+ * passes an hint which acts as a filter for selecting updates.
+ */
+
+int
+changelog_handle_probe (rpcsvc_request_t *req)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ rpcsvc_t *svc = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ changelog_rpc_clnt_t *crpc = NULL;
+
+ changelog_probe_req rpc_req = {0,};
+ changelog_probe_rsp rpc_rsp = {0,};
+
+ ret = xdr_to_generic (req->msg[0],
+ &rpc_req, (xdrproc_t)xdr_changelog_probe_req);
+ if (ret < 0) {
+ gf_log ("", GF_LOG_ERROR, "xdr decoding error");
+ req->rpc_err = GARBAGE_ARGS;
+ goto handle_xdr_error;
+ }
+
+ /* ->xl hidden in rpcsvc */
+ svc = rpcsvc_request_service (req);
+ this = svc->mydata;
+ priv = this->private;
+ c_clnt = &priv->connections;
+
+ crpc = changelog_rpc_clnt_init (this, &rpc_req, c_clnt);
+ if (!crpc)
+ goto handle_xdr_error;
+
+ changelog_ev_queue_connection (c_clnt, crpc);
+ rpc_rsp.op_ret = 0;
+
+ goto submit_rpc;
+
+ handle_xdr_error:
+ rpc_rsp.op_ret = -1;
+ submit_rpc:
+ (void) changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL,
+ (xdrproc_t)xdr_changelog_probe_rsp);
+ return 0;
+}
+
+/**
+ * RPC declarations
+ */
+
+rpcsvc_actor_t changelog_svc_actors[CHANGELOG_RPC_PROC_MAX] = {
+ [CHANGELOG_RPC_PROBE_FILTER] = {
+ "CHANGELOG PROBE FILTER", CHANGELOG_RPC_PROBE_FILTER,
+ changelog_handle_probe, NULL, 0, DRC_NA
+ },
+};
+
+struct rpcsvc_program changelog_svc_prog = {
+ .progname = CHANGELOG_RPC_PROGNAME,
+ .prognum = CHANGELOG_RPC_PROGNUM,
+ .progver = CHANGELOG_RPC_PROGVER,
+ .numactors = CHANGELOG_RPC_PROC_MAX,
+ .actors = changelog_svc_actors,
+ .synctask = _gf_true,
+};
+
+struct rpcsvc_program *changelog_programs[] = {
+ &changelog_svc_prog,
+ NULL,
+};
diff --git a/xlators/features/changelog/src/changelog-rpc.h b/xlators/features/changelog/src/changelog-rpc.h
new file mode 100644
index 00000000000..0df96684b6c
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-rpc.h
@@ -0,0 +1,29 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __CHANGELOG_RPC_H
+#define __CHANGELOG_RPC_H
+
+#include "xlator.h"
+#include "changelog-helpers.h"
+
+/* one time */
+#include "socket.h"
+#include "changelog-rpc-common.h"
+
+#define CHANGELOG_RPC_PROGNAME "GlusterFS Changelog"
+
+rpcsvc_t *
+changelog_init_rpc_listner (xlator_t *, changelog_priv_t *, rbuf_t *, int);
+
+void
+changelog_destroy_rpc_listner (xlator_t *, changelog_priv_t *);
+
+#endif
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c
index 4263a462ad7..e7d8522ae8c 100644
--- a/xlators/features/changelog/src/changelog.c
+++ b/xlators/features/changelog/src/changelog.c
@@ -19,14 +19,13 @@
#include "iobuf.h"
#include "changelog-rt.h"
-#include "changelog-helpers.h"
#include "changelog-encoders.h"
#include "changelog-mem-types.h"
#include <pthread.h>
-#include "changelog-notifier.h"
+#include "changelog-rpc.h"
static struct changelog_bootstrap
cb_bootstrap[] = {
@@ -912,14 +911,30 @@ changelog_create_cbk (call_frame_t *frame,
struct iatt *preparent,
struct iatt *postparent, dict_t *xdata)
{
+ int32_t ret = 0;
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
+ changelog_event_t ev = {0,};
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+ /* fill the event structure.. similar to open() */
+ ev.ev_type = CHANGELOG_OP_TYPE_CREATE;
+ uuid_copy (ev.u.create.gfid, buf->ia_gfid);
+ ev.u.create.flags = fd->flags;
+ changelog_dispatch_event (this, priv, &ev);
+
+ if (changelog_ev_selected
+ (this, &priv->ev_selection, CHANGELOG_OP_TYPE_RELEASE)) {
+ ret = fd_ctx_set (fd, this, (uint64_t)(long) 0x1);
+ if (ret)
+ gf_log (this->name, GF_LOG_WARNING,
+ "could not set fd context (for release cbk)");
+ }
+
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
@@ -1633,6 +1648,92 @@ changelog_writev (call_frame_t *frame,
/* }}} */
+/* open, release and other beasts */
+
+/* {{{ */
+
+
+
+int
+changelog_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, fd_t *fd, dict_t *xdata)
+{
+ int ret = 0;
+ void *opaque = NULL;
+ char *buf = NULL;
+ ssize_t buflen = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_event_t ev = {0,};
+ gf_boolean_t logopen = _gf_false;
+
+ priv = this->private;
+ if (frame->local) {
+ frame->local = NULL;
+ logopen = _gf_true;
+ }
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !logopen), unwind);
+
+ /* fill the event structure */
+ ev.ev_type = CHANGELOG_OP_TYPE_OPEN;
+ uuid_copy (ev.u.open.gfid, fd->inode->gfid);
+ ev.u.open.flags = fd->flags;
+ changelog_dispatch_event (this, priv, &ev);
+
+ if (changelog_ev_selected
+ (this, &priv->ev_selection, CHANGELOG_OP_TYPE_RELEASE)) {
+ ret = fd_ctx_set (fd, this, (uint64_t)(long) 0x1);
+ if (ret)
+ gf_log (this->name, GF_LOG_WARNING,
+ "could not set fd context (for release cbk)");
+ }
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (open, frame, op_ret, op_errno, fd, xdata);
+ return 0;
+}
+
+int
+changelog_open (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int flags, fd_t *fd, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind);
+
+ frame->local = (void *)0x1; /* do not dereference in ->cbk */
+
+ wind:
+ STACK_WIND (frame, changelog_open_cbk, FIRST_CHILD (this),
+ FIRST_CHILD (this)->fops->open, loc, flags, fd, xdata);
+ return 0;
+}
+
+/* }}} */
+
+/* {{{ */
+
+int32_t
+changelog_release (xlator_t *this, fd_t *fd)
+{
+ changelog_event_t ev = {0,};
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+
+ ev.ev_type = CHANGELOG_OP_TYPE_RELEASE;
+ uuid_copy (ev.u.release.gfid, fd->inode->gfid);
+ changelog_dispatch_event (this, priv, &ev);
+
+ (void) fd_ctx_del (fd, this, NULL);
+
+ return 0;
+}
+
+
+/* }}} */
+
/**
* The
* - @init ()
@@ -1679,7 +1780,7 @@ changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv)
int ret = 0;
if (priv->cr.rollover_th) {
- changelog_thread_cleanup (this, priv->cr.rollover_th);
+ (void) changelog_thread_cleanup (this, priv->cr.rollover_th);
priv->cr.rollover_th = 0;
ret = close (priv->cr_wfd);
if (ret)
@@ -1689,7 +1790,7 @@ changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv)
}
if (priv->cf.fsync_th) {
- changelog_thread_cleanup (this, priv->cf.fsync_th);
+ (void) changelog_thread_cleanup (this, priv->cf.fsync_th);
priv->cf.fsync_th = 0;
}
}
@@ -1754,67 +1855,6 @@ changelog_spawn_helper_threads (xlator_t *this, changelog_priv_t *priv)
return ret;
}
-/* cleanup the notifier thread */
-static int
-changelog_cleanup_notifier (xlator_t *this, changelog_priv_t *priv)
-{
- int ret = 0;
-
- if (priv->cn.notify_th) {
- changelog_thread_cleanup (this, priv->cn.notify_th);
- priv->cn.notify_th = 0;
-
- ret = close (priv->wfd);
- if (ret)
- gf_log (this->name, GF_LOG_ERROR,
- "error closing writer end of notifier pipe"
- " (reason: %s)", strerror (errno));
- }
-
- return ret;
-}
-
-/* spawn the notifier thread - nop if already running */
-static int
-changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv)
-{
- int ret = 0;
- int flags = 0;
- int pipe_fd[2] = {0, 0};
-
- if (priv->cn.notify_th)
- goto out; /* notifier thread already running */
-
- ret = pipe (pipe_fd);
- if (ret == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "Cannot create pipe (reason: %s)", strerror (errno));
- goto out;
- }
-
- /* writer is non-blocking */
- flags = fcntl (pipe_fd[1], F_GETFL);
- flags |= O_NONBLOCK;
-
- ret = fcntl (pipe_fd[1], F_SETFL, flags);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to set O_NONBLOCK flag");
- goto out;
- }
-
- priv->wfd = pipe_fd[1];
-
- priv->cn.this = this;
- priv->cn.rfd = pipe_fd[0];
-
- ret = gf_thread_create (&priv->cn.notify_th,
- NULL, changelog_notifier, priv);
-
- out:
- return ret;
-}
-
int
notify (xlator_t *this, int event, void *data, ...)
{
@@ -2054,11 +2094,6 @@ changelog_init (xlator_t *this, changelog_priv_t *priv)
if (!priv->active)
return ret;
- /* spawn the notifier thread */
- ret = changelog_spawn_notifier (this, priv);
- if (ret)
- goto out;
-
/**
* start with a fresh changelog file every time. this is done
* in case there was an encoding change. so... things are kept
@@ -2086,9 +2121,11 @@ changelog_init (xlator_t *this, changelog_priv_t *priv)
return ret;
}
-/* Init all pthread condition variables and locks in changelog*/
+/**
+ * Init barrier related condition variables and locks
+ */
static int
-changelog_pthread_init (xlator_t *this, changelog_priv_t *priv)
+changelog_barrier_pthread_init (xlator_t *this, changelog_priv_t *priv)
{
gf_boolean_t bn_mutex_init = _gf_false;
gf_boolean_t bn_cond_init = _gf_false;
@@ -2165,9 +2202,9 @@ changelog_pthread_init (xlator_t *this, changelog_priv_t *priv)
return ret;
}
-/* Destroy all pthread condition variables and locks in changelog */
+/* Destroy barrier related condition variables and locks */
static inline void
-changelog_pthread_destroy (changelog_priv_t *priv)
+changelog_barrier_pthread_destroy (changelog_priv_t *priv)
{
pthread_mutex_destroy (&priv->bn.bnotify_mutex);
pthread_cond_destroy (&priv->bn.bnotify_cond);
@@ -2284,17 +2321,13 @@ reconfigure (xlator_t *this, dict_t *options)
}
htime_open(this, priv, tv.tv_sec);
}
- ret = changelog_spawn_notifier (this, priv);
- if (!ret)
- ret = changelog_spawn_helper_threads (this,
- priv);
- } else
- ret = changelog_cleanup_notifier (this, priv);
+ ret = changelog_spawn_helper_threads (this, priv);
+ }
}
out:
if (ret) {
- ret = changelog_cleanup_notifier (this, priv);
+ /* TODO */
} else {
gf_log (this->name, GF_LOG_DEBUG,
"changelog reconfigured");
@@ -2305,67 +2338,40 @@ reconfigure (xlator_t *this, dict_t *options)
return ret;
}
-int32_t
-init (xlator_t *this)
+static void
+changelog_freeup_options (xlator_t *this, changelog_priv_t *priv)
{
- int ret = -1;
- char *tmp = NULL;
- changelog_priv_t *priv = NULL;
- gf_boolean_t cond_lock_init = _gf_false;
- char htime_dir[PATH_MAX] = {0,};
- char csnap_dir[PATH_MAX] = {0,};
- uint32_t timeout = 0;
-
- GF_VALIDATE_OR_GOTO ("changelog", this, out);
-
- if (!this->children || this->children->next) {
- gf_log (this->name, GF_LOG_ERROR,
- "translator needs a single subvolume");
- goto out;
- }
-
- if (!this->parents) {
- gf_log (this->name, GF_LOG_ERROR,
- "dangling volume. please check volfile");
- goto out;
- }
-
- priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t);
- if (!priv)
- goto out;
+ int ret = 0;
- this->local_pool = mem_pool_new (changelog_local_t, 64);
- if (!this->local_pool) {
+ ret = priv->cb->dtor (this, &priv->cd);
+ if (ret)
gf_log (this->name, GF_LOG_ERROR,
- "failed to create local memory pool");
- goto out;
- }
-
- LOCK_INIT (&priv->lock);
- LOCK_INIT (&priv->c_snap_lock);
+ "could not cleanup bootstrapper");
+ GF_FREE (priv->changelog_brick);
+ GF_FREE (priv->changelog_dir);
+}
- GF_OPTION_INIT ("changelog-brick", tmp, str, out);
- if (!tmp) {
- gf_log (this->name, GF_LOG_ERROR,
- "\"changelog-brick\" option is not set");
- goto out;
- }
+static int
+changelog_init_options (xlator_t *this, changelog_priv_t *priv)
+{
+ int ret = 0;
+ char *tmp = NULL;
+ uint32_t timeout = 0;
+ char htime_dir[PATH_MAX] = {0,};
+ char csnap_dir[PATH_MAX] = {0,};
+ GF_OPTION_INIT ("changelog-brick", tmp, str, error_return);
priv->changelog_brick = gf_strdup (tmp);
if (!priv->changelog_brick)
- goto out;
- tmp = NULL;
+ goto error_return;
- GF_OPTION_INIT ("changelog-dir", tmp, str, out);
- if (!tmp) {
- gf_log (this->name, GF_LOG_ERROR,
- "\"changelog-dir\" option is not set");
- goto out;
- }
+ tmp = NULL;
+ GF_OPTION_INIT ("changelog-dir", tmp, str, dealloc_1);
priv->changelog_dir = gf_strdup (tmp);
if (!priv->changelog_dir)
- goto out;
+ goto dealloc_1;
+
tmp = NULL;
/**
@@ -2375,35 +2381,38 @@ init (xlator_t *this)
ret = mkdir_p (priv->changelog_dir, 0600, _gf_true);
if (ret)
- goto out;
+ goto dealloc_2;
- CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, htime_dir);
+ CHANGELOG_FILL_HTIME_DIR (priv->changelog_dir, htime_dir);
ret = mkdir_p (htime_dir, 0600, _gf_true);
if (ret)
- goto out;
+ goto dealloc_2;
- CHANGELOG_FILL_CSNAP_DIR(priv->changelog_dir, csnap_dir);
+ CHANGELOG_FILL_CSNAP_DIR (priv->changelog_dir, csnap_dir);
ret = mkdir_p (csnap_dir, 0600, _gf_true);
if (ret)
- goto out;
+ goto dealloc_2;
- GF_OPTION_INIT ("changelog", priv->active, bool, out);
+ GF_OPTION_INIT ("changelog", priv->active, bool, dealloc_2);
- GF_OPTION_INIT ("op-mode", tmp, str, out);
+ GF_OPTION_INIT ("op-mode", tmp, str, dealloc_2);
changelog_assign_opmode (priv, tmp);
tmp = NULL;
- GF_OPTION_INIT ("encoding", tmp, str, out);
+ GF_OPTION_INIT ("encoding", tmp, str, dealloc_2);
changelog_assign_encoding (priv, tmp);
+ changelog_encode_change (priv);
- GF_OPTION_INIT ("rollover-time", priv->rollover_time, int32, out);
+ GF_OPTION_INIT ("rollover-time",
+ priv->rollover_time, int32, dealloc_2);
- GF_OPTION_INIT ("fsync-interval", priv->fsync_interval, int32, out);
- GF_OPTION_INIT ("changelog-barrier-timeout", timeout, time, out);
- priv->timeout.tv_sec = timeout;
+ GF_OPTION_INIT ("fsync-interval",
+ priv->fsync_interval, int32, dealloc_2);
- changelog_encode_change(priv);
+ GF_OPTION_INIT ("changelog-barrier-timeout",
+ timeout, time, dealloc_2);
+ changelog_assign_barrier_timeout (priv, timeout);
GF_ASSERT (cb_bootstrap[priv->op_mode].mode == priv->op_mode);
priv->cb = &cb_bootstrap[priv->op_mode];
@@ -2411,10 +2420,111 @@ init (xlator_t *this)
/* ... now bootstrap the logger */
ret = priv->cb->ctor (this, &priv->cd);
if (ret)
- goto out;
+ goto dealloc_2;
priv->changelog_fd = -1;
+ return 0;
+
+ dealloc_2:
+ GF_FREE (priv->changelog_dir);
+ dealloc_1:
+ GF_FREE (priv->changelog_brick);
+ error_return:
+ return -1;
+}
+
+static void
+changelog_cleanup_rpc (xlator_t *this, changelog_priv_t *priv)
+{
+ /* terminate rpc server */
+ changelog_destroy_rpc_listner (this, priv);
+
+ /* cleanup rot buffs */
+ rbuf_dtor (priv->rbuf);
+
+ /* cleanup poller thread */
+ (void) changelog_thread_cleanup (this, priv->poller);
+}
+
+static int
+changelog_init_rpc (xlator_t *this, changelog_priv_t *priv)
+{
+ int ret = 0;
+ rpcsvc_t *rpc = NULL;
+ changelog_ev_selector_t *selection = NULL;
+
+ selection = &priv->ev_selection;
+
+ /* initialize event selection */
+ changelog_init_event_selection (this, selection);
+
+ ret = pthread_create (&priv->poller, NULL, changelog_rpc_poller, this);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to spawn poller thread");
+ goto error_return;
+ }
+
+ priv->rbuf = rbuf_init (NR_ROTT_BUFFS);
+ if (!priv->rbuf)
+ goto cleanup_thread;
+
+ rpc = changelog_init_rpc_listner (this, priv,
+ priv->rbuf, NR_DISPATCHERS);
+ if (!rpc)
+ goto cleanup_rbuf;
+ priv->rpc = rpc;
+
+ return 0;
+
+ cleanup_rbuf:
+ rbuf_dtor (priv->rbuf);
+ cleanup_thread:
+ (void) changelog_thread_cleanup (this, priv->poller);
+ error_return:
+ return -1;
+}
+
+int32_t
+init (xlator_t *this)
+{
+ int ret = -1;
+ char *tmp = NULL;
+ changelog_priv_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO ("changelog", this, error_return);
+
+ if (!this->children || this->children->next) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "translator needs a single subvolume");
+ goto error_return;
+ }
+
+ if (!this->parents) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "dangling volume. please check volfile");
+ goto error_return;
+ }
+
+ priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t);
+ if (!priv)
+ goto error_return;
+
+ this->local_pool = mem_pool_new (changelog_local_t, 64);
+ if (!this->local_pool) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to create local memory pool");
+ goto cleanup_priv;
+ }
+
+ LOCK_INIT (&priv->lock);
+ LOCK_INIT (&priv->c_snap_lock);
+
+ ret = changelog_init_options (this, priv);
+ if (ret)
+ goto cleanup_mempool;
+
/* snap dependency changes */
priv->dm.black_fop_cnt = 0;
priv->dm.white_fop_cnt = 0;
@@ -2422,67 +2532,68 @@ init (xlator_t *this)
priv->dm.drain_wait_white = _gf_false;
priv->current_color = FOP_COLOR_BLACK;
priv->explicit_rollover = _gf_false;
+
/* Mutex is not needed as threads are not spawned yet */
priv->bn.bnotify = _gf_false;
- ret = changelog_pthread_init (this, priv);
+ ret = changelog_barrier_pthread_init (this, priv);
if (ret)
- goto out;
-
+ goto cleanup_options;
LOCK_INIT (&priv->bflags.lock);
- cond_lock_init = _gf_true;
priv->bflags.barrier_ext = _gf_false;
/* Changelog barrier init */
INIT_LIST_HEAD (&priv->queue);
priv->barrier_enabled = _gf_false;
- ret = changelog_init (this, priv);
+ /* RPC ball rolling.. */
+ ret = changelog_init_rpc (this, priv);
if (ret)
- goto out;
+ goto cleanup_barrier;
+ ret = changelog_init (this, priv);
+ if (ret)
+ goto cleanup_rpc;
gf_log (this->name, GF_LOG_DEBUG, "changelog translator loaded");
- out:
- if (ret) {
- if (this && this->local_pool)
- mem_pool_destroy (this->local_pool);
- if (priv) {
- if (priv->cb) {
- ret = priv->cb->dtor (this, &priv->cd);
- if (ret)
- gf_log (this->name, GF_LOG_ERROR,
- "error in cleanup during init()");
- }
- GF_FREE (priv->changelog_brick);
- GF_FREE (priv->changelog_dir);
- if (cond_lock_init)
- changelog_pthread_destroy (priv);
- GF_FREE (priv);
- }
- this->private = NULL;
- } else
- this->private = priv;
+ this->private = priv;
+ return 0;
- return ret;
+ cleanup_rpc:
+ changelog_cleanup_rpc (this, priv);
+ cleanup_barrier:
+ changelog_barrier_pthread_destroy (priv);
+ cleanup_options:
+ changelog_freeup_options (this, priv);
+ cleanup_mempool:
+ mem_pool_destroy (this->local_pool);
+ cleanup_priv:
+ GF_FREE (priv);
+ error_return:
+ this->private = NULL;
+ return -1;
}
void
fini (xlator_t *this)
{
- int ret = -1;
changelog_priv_t *priv = NULL;
priv = this->private;
if (priv) {
- ret = priv->cb->dtor (this, &priv->cd);
- if (ret)
- gf_log (this->name, GF_LOG_ERROR,
- "error in fini");
+ /* terminate RPC server/threads */
+ changelog_cleanup_rpc (this, priv);
+
+ /* cleanup barrier related objects */
+ changelog_barrier_pthread_destroy (priv);
+
+ /* cleanup allocated options */
+ changelog_freeup_options (this, priv);
+
+ /* deallocate mempool */
mem_pool_destroy (this->local_pool);
- GF_FREE (priv->changelog_brick);
- GF_FREE (priv->changelog_dir);
- changelog_pthread_destroy (priv);
+
+ /* finally, dealloac private variable */
GF_FREE (priv);
}
@@ -2492,6 +2603,7 @@ fini (xlator_t *this)
}
struct xlator_fops fops = {
+ .open = changelog_open,
.mknod = changelog_mknod,
.mkdir = changelog_mkdir,
.create = changelog_create,
@@ -2513,6 +2625,7 @@ struct xlator_fops fops = {
struct xlator_cbks cbks = {
.forget = changelog_forget,
+ .release = changelog_release,
};
struct volume_options options[] = {