summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog')
-rw-r--r--xlators/features/changelog/lib/examples/c/get-changes-multi.c84
-rw-r--r--xlators/features/changelog/lib/examples/c/get-changes.c2
-rw-r--r--xlators/features/changelog/lib/examples/c/get-history.c7
-rw-r--r--xlators/features/changelog/lib/src/Makefile.am30
-rw-r--r--xlators/features/changelog/lib/src/changelog.h72
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-api.c224
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-helpers.c32
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-helpers.h194
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-journal-handler.c (renamed from xlators/features/changelog/lib/src/gf-changelog-process.c)520
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-journal.h114
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-reborp.c381
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-rpc.c105
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-rpc.h (renamed from xlators/features/changelog/src/changelog-notifier.h)17
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog.c862
-rw-r--r--xlators/features/changelog/lib/src/gf-history-changelog.c161
-rw-r--r--xlators/features/changelog/src/Makefile.am18
-rw-r--r--xlators/features/changelog/src/changelog-encoders.c2
-rw-r--r--xlators/features/changelog/src/changelog-ev-handle.c382
-rw-r--r--xlators/features/changelog/src/changelog-ev-handle.h140
-rw-r--r--xlators/features/changelog/src/changelog-helpers.c210
-rw-r--r--xlators/features/changelog/src/changelog-helpers.h107
-rw-r--r--xlators/features/changelog/src/changelog-mem-types.h25
-rw-r--r--xlators/features/changelog/src/changelog-misc.h14
-rw-r--r--xlators/features/changelog/src/changelog-notifier.c314
-rw-r--r--xlators/features/changelog/src/changelog-rpc-common.c334
-rw-r--r--xlators/features/changelog/src/changelog-rpc-common.h84
-rw-r--r--xlators/features/changelog/src/changelog-rpc.c300
-rw-r--r--xlators/features/changelog/src/changelog-rpc.h29
-rw-r--r--xlators/features/changelog/src/changelog.c473
29 files changed, 3936 insertions, 1301 deletions
diff --git a/xlators/features/changelog/lib/examples/c/get-changes-multi.c b/xlators/features/changelog/lib/examples/c/get-changes-multi.c
new file mode 100644
index 00000000000..8f23c81c2a0
--- /dev/null
+++ b/xlators/features/changelog/lib/examples/c/get-changes-multi.c
@@ -0,0 +1,84 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+/**
+ * Compile it using:
+ * gcc -o getchanges-multi `pkg-config --cflags libgfchangelog` \
+ * get-changes-multi.c `pkg-config --libs libgfchangelog`
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/un.h>
+#include <limits.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <errno.h>
+
+#include "changelog.h"
+
+void *brick_init (void *xl, struct gf_brick_spec *brick)
+{
+ return brick;
+}
+
+void brick_fini (void *xl, char *brick, void *data)
+{
+ return;
+}
+
+void brick_callback (void *xl, char *brick,
+ void *data, changelog_event_t *ev)
+{
+ printf ("->callback: (brick,type) [%s:%d]\n", brick, ev->ev_type);
+}
+
+void fill_brick_spec (struct gf_brick_spec *brick, char *path)
+{
+ brick->brick_path = strdup (path);
+ brick->filter = CHANGELOG_OP_TYPE_RELEASE;
+
+ brick->init = brick_init;
+ brick->fini = brick_fini;
+ brick->callback = brick_callback;
+ brick->connected = NULL;
+ brick->disconnected = NULL;
+}
+
+int
+main (int argc, char **argv)
+{
+ int ret = 0;
+ void *bricks = NULL;
+ struct gf_brick_spec *brick = NULL;
+
+ bricks = calloc (2, sizeof (struct gf_brick_spec));
+ if (!bricks)
+ goto error_return;
+
+ brick = (struct gf_brick_spec *)bricks;
+ fill_brick_spec (brick, "/export/z1/zwoop");
+
+ brick++;
+ fill_brick_spec (brick, "/export/z2/zwoop");
+
+ ret = gf_changelog_register_generic ((struct gf_brick_spec *)bricks, 2,
+ 1, "/tmp/multi-changes.log", 9,
+ NULL);
+ if (ret)
+ goto error_return;
+
+ /* let callbacks do the job */
+ select (0, NULL, NULL, NULL, NULL);
+
+ error_return:
+ return -1;
+}
diff --git a/xlators/features/changelog/lib/examples/c/get-changes.c b/xlators/features/changelog/lib/examples/c/get-changes.c
index 6d0d0357db9..0b2808c7e35 100644
--- a/xlators/features/changelog/lib/examples/c/get-changes.c
+++ b/xlators/features/changelog/lib/examples/c/get-changes.c
@@ -40,7 +40,7 @@ main (int argc, char ** argv)
char fbuf[PATH_MAX] = {0,};
/* get changes for brick "/home/vshankar/export/yow/yow-1" */
- ret = gf_changelog_register ("/home/vshankar/exports/yow/yow-1",
+ ret = gf_changelog_register ("/export/z1/zwoop",
"/tmp/scratch", "/tmp/change.log", 9, 5);
if (ret) {
handle_error ("register failed");
diff --git a/xlators/features/changelog/lib/examples/c/get-history.c b/xlators/features/changelog/lib/examples/c/get-history.c
index 33eb8c32d4d..2e1ff3c767f 100644
--- a/xlators/features/changelog/lib/examples/c/get-history.c
+++ b/xlators/features/changelog/lib/examples/c/get-history.c
@@ -40,8 +40,8 @@ main (int argc, char ** argv)
char fbuf[PATH_MAX] = {0,};
unsigned long end_ts = 0;
- ret = gf_changelog_register ("/export1/v1/b1",
- "/tmp/scratch_v1", "/tmp/scratch_v1/changes.log",
+ ret = gf_changelog_register ("/export/z1/zwoop",
+ "/tmp/scratch_v1", "/tmp/changes.log",
9, 5);
if (ret) {
handle_error ("register failed");
@@ -51,7 +51,8 @@ main (int argc, char ** argv)
int a, b;
printf ("give the two numbers start and end\t");
scanf ("%d%d", &a, &b);
- ret = gf_history_changelog ("/export1/v1/b1/.glusterfs/changelogs",a, b, 3, &end_ts);
+ ret = gf_history_changelog ("/export/z1/zwoop/.glusterfs/changelogs",
+ a, b, 3, &end_ts);
if (ret == -1) {
printf ("history failed");
goto out;
diff --git a/xlators/features/changelog/lib/src/Makefile.am b/xlators/features/changelog/lib/src/Makefile.am
index 1ae919bfb38..306306bd585 100644
--- a/xlators/features/changelog/lib/src/Makefile.am
+++ b/xlators/features/changelog/lib/src/Makefile.am
@@ -4,9 +4,13 @@ libgfchangelog_la_CFLAGS = -Wall $(GF_CFLAGS) $(GF_DARWIN_LIBGLUSTERFS_CFLAGS) \
libgfchangelog_la_CPPFLAGS = $(GF_CPPFLAGS) -D__USE_FILE_OFFSET64 -fpic \
-I../../../src/ -I$(top_srcdir)/libglusterfs/src \
-I$(top_srcdir)/xlators/features/changelog/src \
+ -I$(top_srcdir)/rpc/xdr/src -I$(top_srcdir)/rpc/rpc-lib/src \
+ -I$(top_srcdir)/rpc/rpc-transport/socket/src \
-DDATADIR=\"$(localstatedir)\"
-libgfchangelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+libgfchangelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \
+ $(top_builddir)/rpc/xdr/src/libgfxdr.la \
+ $(top_builddir)/rpc/rpc-lib/src/libgfrpc.la
libgfchangelog_la_LDFLAGS = $(GF_LDFLAGS) -version-info $(LIBGFCHANGELOG_LT_VERSION)
@@ -15,18 +19,18 @@ lib_LTLIBRARIES = libgfchangelog.la
CONTRIB_BUILDDIR = $(top_builddir)/contrib
-libgfchangelog_la_SOURCES = gf-changelog.c gf-changelog-process.c \
- gf-changelog-helpers.c gf-history-changelog.c \
- $(CONTRIBDIR)/uuid/clear.c \
- $(CONTRIBDIR)/uuid/copy.c $(CONTRIBDIR)/uuid/gen_uuid.c \
- $(CONTRIBDIR)/uuid/pack.c $(CONTRIBDIR)/uuid/parse.c \
- $(CONTRIBDIR)/uuid/unparse.c $(CONTRIBDIR)/uuid/uuid_time.c \
- $(CONTRIBDIR)/uuid/compare.c $(CONTRIBDIR)/uuid/isnull.c \
- $(CONTRIBDIR)/uuid/unpack.c
-
-noinst_HEADERS = gf-changelog-helpers.h $(CONTRIBDIR)/uuid/uuidd.h \
- $(CONTRIBDIR)/uuid/uuid.h $(CONTRIBDIR)/uuid/uuidP.h \
- $(CONTRIB_BUILDDIR)/uuid/uuid_types.h
+libgfchangelog_la_SOURCES = gf-changelog.c gf-changelog-journal-handler.c gf-changelog-helpers.c \
+ gf-changelog-api.c gf-history-changelog.c gf-changelog-rpc.c gf-changelog-reborp.c \
+ $(top_srcdir)/xlators/features/changelog/src/changelog-rpc-common.c \
+ $(CONTRIBDIR)/uuid/clear.c $(CONTRIBDIR)/uuid/copy.c \
+ $(CONTRIBDIR)/uuid/gen_uuid.c $(CONTRIBDIR)/uuid/pack.c \
+ $(CONTRIBDIR)/uuid/parse.c $(CONTRIBDIR)/uuid/unparse.c \
+ $(CONTRIBDIR)/uuid/uuid_time.c $(CONTRIBDIR)/uuid/compare.c \
+ $(CONTRIBDIR)/uuid/isnull.c $(CONTRIBDIR)/uuid/unpack.c
+
+noinst_HEADERS = gf-changelog-helpers.h gf-changelog-rpc.h gf-changelog-journal.h \
+ $(CONTRIBDIR)/uuid/uuidd.h $(CONTRIBDIR)/uuid/uuid.h \
+ $(CONTRIBDIR)/uuid/uuidP.h $(CONTRIB_BUILDDIR)/uuid/uuid_types.h
libgfchangelog_HEADERS = changelog.h
diff --git a/xlators/features/changelog/lib/src/changelog.h b/xlators/features/changelog/lib/src/changelog.h
index 5cddfb5839c..d7048ff2508 100644
--- a/xlators/features/changelog/lib/src/changelog.h
+++ b/xlators/features/changelog/lib/src/changelog.h
@@ -11,6 +11,73 @@
#ifndef _GF_CHANGELOG_H
#define _GF_CHANGELOG_H
+struct gf_brick_spec;
+
+/**
+ * Max bit shiter for event selection
+ */
+#define CHANGELOG_EV_SELECTION_RANGE 4
+
+#define CHANGELOG_OP_TYPE_JOURNAL (1<<0)
+#define CHANGELOG_OP_TYPE_OPEN (1<<1)
+#define CHANGELOG_OP_TYPE_CREATE (1<<2)
+#define CHANGELOG_OP_TYPE_RELEASE (1<<3)
+#define CHANGELOG_OP_TYPE_MAX (1<<CHANGELOG_EV_SELECTION_RANGE)
+
+
+struct ev_open {
+ unsigned char gfid[16];
+ int32_t flags;
+};
+
+struct ev_creat {
+ unsigned char gfid[16];
+ int32_t flags;
+};
+
+struct ev_release {
+ unsigned char gfid[16];
+};
+
+struct ev_changelog {
+ char path[PATH_MAX];
+};
+
+typedef struct changelog_event {
+ unsigned int ev_type;
+ union {
+ struct ev_open open;
+ struct ev_creat create;
+ struct ev_release release;
+ struct ev_changelog journal;
+ } u;
+} changelog_event_t;
+
+#define CHANGELOG_EV_SIZE (sizeof (changelog_event_t))
+
+/**
+ * event callback, connected & disconnection defs
+ */
+typedef void (CALLBACK) (void *, char *,
+ void *, changelog_event_t *);
+typedef void *(INIT) (void *, struct gf_brick_spec *);
+typedef void (FINI) (void *, char *, void *);
+typedef void (CONNECT) (void *, char *, void *);
+typedef void (DISCONNECT) (void *, char *, void *);
+
+struct gf_brick_spec {
+ char *brick_path;
+ unsigned int filter;
+
+ INIT *init;
+ FINI *fini;
+ CALLBACK *callback;
+ CONNECT *connected;
+ DISCONNECT *disconnected;
+
+ void *ptr;
+};
+
/* API set */
int
@@ -28,4 +95,9 @@ gf_changelog_next_change (char *bufptr, size_t maxlen);
int
gf_changelog_done (char *file);
+/* newer flexible API */
+int
+gf_changelog_register_generic (struct gf_brick_spec *bricks, int count,
+ int ordered, char *logfile, int lvl, void *xl);
+
#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog-api.c b/xlators/features/changelog/lib/src/gf-changelog-api.c
new file mode 100644
index 00000000000..cea2ff01988
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-api.c
@@ -0,0 +1,224 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "uuid.h"
+#include "globals.h"
+#include "glusterfs.h"
+
+#include "gf-changelog-helpers.h"
+#include "gf-changelog-journal.h"
+#include "changelog-mem-types.h"
+
+int
+gf_changelog_done (char *file)
+{
+ int ret = -1;
+ char *buffer = NULL;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ char to_path[PATH_MAX] = {0,};
+
+ errno = EINVAL;
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
+ goto out;
+
+ if (!file || !strlen (file))
+ goto out;
+
+ /* make sure 'file' is inside ->jnl_working_dir */
+ buffer = realpath (file, NULL);
+ if (!buffer)
+ goto out;
+
+ if (strncmp (jnl->jnl_working_dir,
+ buffer, strlen (jnl->jnl_working_dir)))
+ goto out;
+
+ (void) snprintf (to_path, PATH_MAX, "%s%s",
+ jnl->jnl_processed_dir, basename (buffer));
+ gf_log (this->name, GF_LOG_DEBUG,
+ "moving %s to processed directory", file);
+ ret = rename (buffer, to_path);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "cannot move %s to %s (reason: %s)",
+ file, to_path, strerror (errno));
+ goto out;
+ }
+
+ ret = 0;
+
+ out:
+ if (buffer)
+ free (buffer); /* allocated by realpath() */
+ return ret;
+}
+
+/**
+ * @API
+ * for a set of changelogs, start from the beginning
+ */
+int
+gf_changelog_start_fresh ()
+{
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ errno = EINVAL;
+
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
+ goto out;
+
+ if (gf_ftruncate (jnl->jnl_fd, 0))
+ goto out;
+
+ return 0;
+
+ out:
+ return -1;
+}
+
+/**
+ * @API
+ * return the next changelog file entry. zero means all chanelogs
+ * consumed.
+ */
+ssize_t
+gf_changelog_next_change (char *bufptr, size_t maxlen)
+{
+ ssize_t size = -1;
+ int tracker_fd = 0;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ char buffer[PATH_MAX] = {0,};
+
+ errno = EINVAL;
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
+ goto out;
+
+ tracker_fd = jnl->jnl_fd;
+
+ size = gf_readline (tracker_fd, buffer, maxlen);
+ if (size < 0) {
+ size = -1;
+ goto out;
+ }
+
+ if (size == 0)
+ goto out;
+
+ memcpy (bufptr, buffer, size - 1);
+ bufptr[size - 1] = '\0';
+
+out:
+ return size;
+}
+
+/**
+ * @API
+ * gf_changelog_scan() - scan and generate a list of change entries
+ *
+ * calling this api multiple times (without calling gf_changlog_done())
+ * would result new changelogs(s) being refreshed in the tracker file.
+ * This call also acts as a cancellation point for the consumer.
+ */
+ssize_t
+gf_changelog_scan ()
+{
+ int ret = 0;
+ int tracker_fd = 0;
+ size_t len = 0;
+ size_t off = 0;
+ xlator_t *this = NULL;
+ size_t nr_entries = 0;
+ gf_changelog_journal_t *jnl = NULL;
+ struct dirent *entryp = NULL;
+ struct dirent *result = NULL;
+ char buffer[PATH_MAX] = {0,};
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
+ goto out;
+ if (JNL_IS_API_DISCONNECTED (jnl)) {
+ errno = ENOTCONN;
+ goto out;
+ }
+
+ errno = EINVAL;
+
+ tracker_fd = jnl->jnl_fd;
+ if (gf_ftruncate (tracker_fd, 0))
+ goto out;
+
+ len = offsetof(struct dirent, d_name)
+ + pathconf(jnl->jnl_processing_dir, _PC_NAME_MAX) + 1;
+ entryp = GF_CALLOC (1, len,
+ gf_changelog_mt_libgfchangelog_dirent_t);
+ if (!entryp)
+ goto out;
+
+ rewinddir (jnl->jnl_dir);
+ while (1) {
+ ret = readdir_r (jnl->jnl_dir, entryp, &result);
+ if (ret || !result)
+ break;
+
+ if (!strcmp (basename (entryp->d_name), ".")
+ || !strcmp (basename (entryp->d_name), ".."))
+ continue;
+
+ nr_entries++;
+
+ GF_CHANGELOG_FILL_BUFFER (jnl->jnl_processing_dir,
+ buffer, off,
+ strlen (jnl->jnl_processing_dir));
+ GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer,
+ off, strlen (entryp->d_name));
+ GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1);
+
+ if (gf_changelog_write (tracker_fd, buffer, off) != off) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "error writing changelog filename"
+ " to tracker file");
+ break;
+ }
+ off = 0;
+ }
+
+ GF_FREE (entryp);
+
+ if (!result) {
+ if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1)
+ return nr_entries;
+ }
+ out:
+ return -1;
+}
diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.c b/xlators/features/changelog/lib/src/gf-changelog-helpers.c
index f071b057d59..6bf709dc664 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-helpers.c
+++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.c
@@ -178,3 +178,35 @@ gf_ftruncate (int fd, off_t length)
return 0;
}
+
+int
+gf_thread_cleanup (xlator_t *this, pthread_t thread)
+{
+ int ret = 0;
+ void *res = NULL;
+
+ ret = pthread_cancel (thread);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Failed to send cancellation to thread");
+ goto error_return;
+ }
+
+ ret = pthread_join (thread, &res);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "failed to join thread");
+ goto error_return;
+ }
+
+ if (res != PTHREAD_CANCELED) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Thread could not be cleaned up");
+ goto error_return;
+ }
+
+ return 0;
+
+ error_return:
+ return -1;
+}
diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
index 9b875d45dcc..17b8862a89b 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h
+++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
@@ -18,6 +18,11 @@
#include <xlator.h>
+#include "changelog.h"
+
+#include "changelog-rpc-common.h"
+#include "gf-changelog-journal.h"
+
#define GF_CHANGELOG_TRACKER "tracker"
#define GF_CHANGELOG_CURRENT_DIR ".current"
@@ -41,79 +46,143 @@ typedef struct read_line {
char rl_buf[MAXLINE];
} read_line_t;
+struct gf_changelog;
+
+/**
+ * Event list for ordered event notification
+ *
+ * ->next_seq holds the next _expected_ sequence number.
+ */
+struct gf_event_list {
+ pthread_mutex_t lock; /* protects this structure */
+ pthread_cond_t cond;
+
+ pthread_t invoker;
+
+ unsigned long next_seq; /* next sequence number expected:
+ zero during bootstrap */
+
+ struct gf_changelog *entry; /* backpointer to it's brick
+ encapsulator (entry) */
+ struct list_head events; /* list of events (ordered) */
+};
+
+/**
+ * include a refcount if it's of use by additional layers
+ */
+struct gf_event {
+ int count;
+
+ unsigned long seq;
+
+ struct list_head list;
+
+ struct iovec iov[0];
+};
+#define GF_EVENT_CALLOC_SIZE(cnt, len) \
+ (sizeof (struct gf_event) + (cnt * sizeof (struct iovec)) + len)
+
+/**
+ * assign the base address of the IO vector to the correct memory
+ * area and set it's addressable length.
+ */
+#define GF_EVENT_ASSIGN_IOVEC(vec, event, len, pos) \
+ do { \
+ vec->iov_base = ((char *)event) + \
+ sizeof (struct gf_event) + \
+ (event->count * sizeof (struct iovec)) + pos; \
+ vec->iov_len = len; \
+ pos += len; \
+ } while (0)
+
+/**
+ * An instance of this structure is allocated for each brick for which
+ * notifications are streamed.
+ */
typedef struct gf_changelog {
xlator_t *this;
- /* 'processing' directory stream */
- DIR *gfc_dir;
-
- /* fd to the tracker file */
- int gfc_fd;
-
- /* connection retries */
- int gfc_connretries;
+ struct list_head list; /* list of instances */
- char gfc_sockpath[UNIX_PATH_MAX];
+ char brick[PATH_MAX]; /* brick path for this end-point */
- char gfc_brickpath[PATH_MAX];
+ changelog_rpc_t grpc; /* rpc{-clnt,svc} for this brick */
+#define RPC_PROBER(ent) ent->grpc.rpc
+#define RPC_REBORP(ent) ent->grpc.svc
+#define RPC_SOCK(ent) ent->grpc.sock
- /* socket for receiving notifications */
- int gfc_sockfd;
+ unsigned int notify; /* notification flag(s) */
- char *gfc_working_dir;
+ FINI *fini; /* destructor callback */
+ CALLBACK *callback; /* event callback dispatcher */
+ CONNECT *connected; /* connect callback */
+ DISCONNECT *disconnected; /* disconnection callback */
- /* RFC 3986 string encoding */
- char rfc3986[256];
+ void *ptr; /* owner specific private data */
+ xlator_t *invokerxl; /* consumers _this_, if valid,
+ assigned to THIS before cbk is
+ invoked */
- char gfc_current_dir[PATH_MAX];
- char gfc_processed_dir[PATH_MAX];
- char gfc_processing_dir[PATH_MAX];
+ gf_boolean_t ordered;
- pthread_t gfc_changelog_processor;
-
- /* Holds gfc for History API */
- struct gf_changelog *hist_gfc;
-
- /* holds 0 done scanning, 1 keep scanning and -1 error */
- int hist_done;
+ struct gf_event_list event;
} gf_changelog_t;
-typedef struct gf_changelog_history_data {
- int len;
-
- int htime_fd;
-
- /* parallelism count */
- int n_parallel;
-
- /* history from, to indexes */
- unsigned long from;
- unsigned long to;
-} gf_changelog_history_data_t;
-
-typedef struct gf_changelog_consume_data {
- /** set of inputs */
-
- /* fd to read from */
- int fd;
-
- /* from @offset */
- off_t offset;
-
- xlator_t *this;
- gf_changelog_t *gfc;
-
- /** set of outputs */
+static inline int
+gf_changelog_filter_check (gf_changelog_t *entry, changelog_event_t *event)
+{
+ if (event->ev_type & entry->notify)
+ return 1;
+ return 0;
+}
+
+#define GF_NEED_ORDERED_EVENTS(ent) (ent->ordered == _gf_true)
+
+/** private structure */
+typedef struct gf_private {
+ gf_lock_t lock; /* protects ->connections */
+
+ void *api; /* pointer for API access */
+
+ pthread_t poller; /* event poller thread */
+
+ struct list_head connections; /* list of connections */
+} gf_private_t;
+
+#define GF_CHANGELOG_GET_API_PTR(this) (((gf_private_t *) this->private)->api)
+
+/**
+ * upcall: invoke callback with _correct_ THIS
+ */
+#define GF_CHANGELOG_INVOKE_CBK(this, cbk, brick, args ...) \
+ do { \
+ xlator_t *old_this = NULL; \
+ xlator_t *invokerxl = NULL; \
+ \
+ invokerxl = entry->invokerxl; \
+ old_this = this; \
+ \
+ if (invokerxl) { \
+ THIS = invokerxl; \
+ } \
+ \
+ cbk (invokerxl, brick, args); \
+ THIS = old_this; \
+ \
+ } while (0)
- /* return value */
- int retval;
+#define SAVE_THIS(xl) \
+ do { \
+ old_this = xl; \
+ THIS = master; \
+ } while (0)
- /* journal processed */
- char changelog[PATH_MAX];
-} gf_changelog_consume_data_t;
+#define RESTORE_THIS() \
+ do { \
+ THIS = old_this; \
+ } while (0)
-int
-gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc);
+/** APIs and the rest */
void *
gf_changelog_process (void *data);
@@ -138,9 +207,14 @@ gf_lseek (int fd, off_t offset, int whence);
int
gf_changelog_consume (xlator_t *this,
- gf_changelog_t *gfc,
+ gf_changelog_journal_t *jnl,
char *from_path, gf_boolean_t no_publish);
int
-gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path);
+gf_changelog_publish (xlator_t *this,
+ gf_changelog_journal_t *jnl, char *from_path);
+int
+gf_thread_cleanup (xlator_t *this, pthread_t thread);
+void *
+gf_changelog_callback_invoker (void *arg);
#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog-process.c b/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c
index 1a275e676fb..6ee6f9f074f 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-process.c
+++ b/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
@@ -8,9 +8,6 @@
cases as published by the Free Software Foundation.
*/
-#include <unistd.h>
-#include <pthread.h>
-
#include "uuid.h"
#include "globals.h"
#include "glusterfs.h"
@@ -19,6 +16,9 @@
/* from the changelog translator */
#include "changelog-misc.h"
+#include "changelog-mem-types.h"
+
+#include "gf-changelog-journal.h"
extern int byebye;
@@ -98,7 +98,7 @@ conv_noop (char *ptr) { return ptr; }
MOVER_MOVE (mover, nleft, sizeof (uuid_t)); \
} \
-#define LINE_BUFSIZE 3*PATH_MAX /* enough buffer for extra chars too */
+#define LINE_BUFSIZE (3*PATH_MAX) /* enough buffer for extra chars too */
/**
* using mmap() makes parsing easy. fgets() cannot be used here as
@@ -114,7 +114,8 @@ conv_noop (char *ptr) { return ptr; }
static int
gf_changelog_parse_binary (xlator_t *this,
- gf_changelog_t *gfc, int from_fd, int to_fd,
+ gf_changelog_journal_t *jnl,
+ int from_fd, int to_fd,
size_t start_offset, struct stat *stbuf)
{
@@ -165,7 +166,8 @@ gf_changelog_parse_binary (xlator_t *this,
PARSE_GFID_MOVE (ptr, uuid, mover, nleft, parse_err);
bname_start = mover;
- if ( (bname_end = strchr (mover, '\n')) == NULL ) {
+ bname_end = strchr (mover, '\n');
+ if (bname_end == NULL) {
parse_err = 1;
break;
}
@@ -201,7 +203,7 @@ gf_changelog_parse_binary (xlator_t *this,
MOVER_MOVE (mover, nleft, 1);
}
- if ( (nleft == 0) && (!parse_err))
+ if ((nleft == 0) && (!parse_err))
ret = 0;
if (munmap (start, stbuf->st_size))
@@ -218,7 +220,8 @@ gf_changelog_parse_binary (xlator_t *this,
*/
static int
gf_changelog_parse_ascii (xlator_t *this,
- gf_changelog_t *gfc, int from_fd, int to_fd,
+ gf_changelog_journal_t *jnl,
+ int from_fd, int to_fd,
size_t start_offset, struct stat *stbuf)
{
int ng = 0;
@@ -281,7 +284,8 @@ gf_changelog_parse_ascii (xlator_t *this,
VERIFY_SEPARATOR (mover, len, parse_err);
fop = atoi (mover);
- if ( (fopname = gf_fop_list[fop]) == NULL) {
+ fopname = gf_fop_list[fop];
+ if (fopname == NULL) {
parse_err = 1;
break;
}
@@ -309,7 +313,8 @@ gf_changelog_parse_ascii (xlator_t *this,
VERIFY_SEPARATOR (mover, len, parse_err);
fop = atoi (mover);
- if ( (fopname = gf_fop_list[fop]) == NULL) {
+ fopname = gf_fop_list[fop];
+ if (fopname == NULL) {
parse_err = 1;
break;
}
@@ -320,7 +325,7 @@ gf_changelog_parse_ascii (xlator_t *this,
GF_CHANGELOG_FILL_BUFFER (fopname, ascii, off, len);
ng = nr_extra_recs[fop];
- for (;ng > 0; ng--) {
+ for (; ng > 0; ng--) {
MOVER_MOVE (mover, nleft, 1);
len = strlen (mover);
VERIFY_SEPARATOR (mover, len, parse_err);
@@ -346,7 +351,7 @@ gf_changelog_parse_ascii (xlator_t *this,
}
gf_rfc3986_encode ((unsigned char *) ptr,
- eptr, gfc->rfc3986);
+ eptr, jnl->rfc3986);
FILL_AND_MOVE (eptr, ascii, off,
mover, nleft, len);
free (eptr);
@@ -374,7 +379,7 @@ gf_changelog_parse_ascii (xlator_t *this,
}
- if ( (nleft == 0) && (!parse_err))
+ if ((nleft == 0) && (!parse_err))
ret = 0;
if (munmap (start, stbuf->st_size))
@@ -410,8 +415,8 @@ gf_changelog_copy (xlator_t *this, int from_fd, int to_fd)
}
static int
-gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd,
- int to_fd, struct stat *stbuf, int *zerob)
+gf_changelog_decode (xlator_t *this, gf_changelog_journal_t *jnl,
+ int from_fd, int to_fd, struct stat *stbuf, int *zerob)
{
int ret = -1;
int encoding = -1;
@@ -441,12 +446,12 @@ gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd,
* this ideally should have been a part of changelog-encoders.c
* (ie. part of the changelog translator).
*/
- ret = gf_changelog_parse_binary (this, gfc, from_fd,
+ ret = gf_changelog_parse_binary (this, jnl, from_fd,
to_fd, elen, stbuf);
break;
case CHANGELOG_ENCODE_ASCII:
- ret = gf_changelog_parse_ascii (this, gfc, from_fd,
+ ret = gf_changelog_parse_ascii (this, jnl, from_fd,
to_fd, elen, stbuf);
break;
default:
@@ -458,7 +463,8 @@ gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd,
}
int
-gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path)
+gf_changelog_publish (xlator_t *this,
+ gf_changelog_journal_t *jnl, char *from_path)
{
int ret = 0;
char dest[PATH_MAX] = {0,};
@@ -466,21 +472,21 @@ gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path)
struct stat stbuf = {0,};
(void) snprintf (to_path, PATH_MAX, "%s%s",
- gfc->gfc_current_dir, basename (from_path));
+ jnl->jnl_current_dir, basename (from_path));
/* handle zerob file that wont exist in current */
ret = stat (to_path, &stbuf);
- if (ret){
+ if (ret) {
if (errno == ENOENT)
ret = 0;
goto out;
}
(void) snprintf (dest, PATH_MAX, "%s%s",
- gfc->gfc_processing_dir, basename (from_path));
+ jnl->jnl_processing_dir, basename (from_path));
ret = rename (to_path, dest);
- if (ret){
+ if (ret) {
gf_log (this->name, GF_LOG_ERROR,
"error moving %s to processing dir"
" (reason: %s)", to_path, strerror (errno));
@@ -492,7 +498,7 @@ out:
int
gf_changelog_consume (xlator_t *this,
- gf_changelog_t *gfc,
+ gf_changelog_journal_t *jnl,
char *from_path, gf_boolean_t no_publish)
{
int ret = -1;
@@ -520,9 +526,9 @@ gf_changelog_consume (xlator_t *this,
}
(void) snprintf (to_path, PATH_MAX, "%s%s",
- gfc->gfc_current_dir, basename (from_path));
+ jnl->jnl_current_dir, basename (from_path));
(void) snprintf (dest, PATH_MAX, "%s%s",
- gfc->gfc_processing_dir, basename (from_path));
+ jnl->jnl_processing_dir, basename (from_path));
fd2 = open (to_path, O_CREAT | O_TRUNC | O_RDWR,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
@@ -532,7 +538,7 @@ gf_changelog_consume (xlator_t *this,
to_path, strerror (errno));
goto close_fd;
} else {
- ret = gf_changelog_decode (this, gfc, fd1,
+ ret = gf_changelog_decode (this, jnl, fd1,
fd2, &stbuf, &zerob);
close (fd2);
@@ -568,88 +574,412 @@ gf_changelog_consume (xlator_t *this,
return ret;
}
-static char *
-gf_changelog_ext_change (xlator_t *this,
- gf_changelog_t *gfc, char *path, size_t readlen)
+void *
+gf_changelog_process (void *data)
{
- int alo = 0;
- int ret = 0;
- size_t len = 0;
- char *buf = NULL;
-
- buf = path;
- while (len < readlen) {
- if (*buf == '\0') {
- alo = 1;
- gf_log (this->name, GF_LOG_DEBUG,
- "processing changelog: %s", path);
- ret = gf_changelog_consume (this, gfc, path, _gf_false);
- }
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_entry_t *entry = NULL;
+ gf_changelog_processor_t *jnl_proc = NULL;
- if (ret)
- break;
+ this = THIS;
- len++; buf++;
- if (alo) {
- alo = 0;
- path = buf;
+ jnl = data;
+ jnl_proc = jnl->jnl_proc;
+
+ while (1) {
+ pthread_mutex_lock (&jnl_proc->lock);
+ {
+ while (list_empty (&jnl_proc->entries)) {
+ jnl_proc->waiting = _gf_true;
+ pthread_cond_wait
+ (&jnl_proc->cond, &jnl_proc->lock);
+ }
+
+ entry = list_first_entry (&jnl_proc->entries,
+ gf_changelog_entry_t, list);
+ list_del (&entry->list);
+ jnl_proc->waiting = _gf_false;
}
+ pthread_mutex_unlock (&jnl_proc->lock);
+
+ if (entry) {
+ ret = gf_changelog_consume (this, jnl,
+ entry->path, _gf_false);
+ GF_FREE (entry);
+ }
+ }
+
+ return NULL;
+}
+
+inline void
+gf_changelog_queue_journal (gf_changelog_processor_t *jnl_proc,
+ changelog_event_t *event)
+{
+ size_t len = 0;
+ gf_changelog_entry_t *entry = NULL;
+
+ entry = GF_CALLOC (1, sizeof (gf_changelog_entry_t),
+ gf_changelog_mt_libgfchangelog_entry_t);
+ if (!entry)
+ return;
+ INIT_LIST_HEAD (&entry->list);
+
+ len = strlen (event->u.journal.path);
+ (void)memcpy (entry->path, event->u.journal.path, len+1);
+
+ pthread_mutex_lock (&jnl_proc->lock);
+ {
+ list_add_tail (&entry->list, &jnl_proc->entries);
+ if (jnl_proc->waiting)
+ pthread_cond_signal (&jnl_proc->cond);
+ }
+ pthread_mutex_unlock (&jnl_proc->lock);
+
+ return;
+}
+
+void
+gf_changelog_handle_journal (void *xl, char *brick,
+ void *cbkdata, changelog_event_t *event)
+{
+ int ret = 0;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_processor_t *jnl_proc = NULL;
+
+ jnl = cbkdata;
+ jnl_proc = jnl->jnl_proc;
+
+ gf_changelog_queue_journal (jnl_proc, event);
+}
+
+void
+gf_changelog_journal_disconnect (void *xl, char *brick, void *data)
+{
+ gf_changelog_journal_t *jnl = NULL;
+
+ jnl = data;
+
+ pthread_spin_lock (&jnl->lock);
+ {
+ JNL_SET_API_STATE (jnl, JNL_API_DISCONNECTED);
+ };
+ pthread_spin_unlock (&jnl->lock);
+}
+
+void
+gf_changelog_journal_connect (void *xl, char *brick, void *data)
+{
+ gf_changelog_journal_t *jnl = NULL;
+
+ jnl = data;
+
+ pthread_spin_lock (&jnl->lock);
+ {
+ JNL_SET_API_STATE (jnl, JNL_API_CONNECTED);
+ };
+ pthread_spin_unlock (&jnl->lock);
+
+ return;
+}
+
+void
+gf_changelog_cleanup_processor (gf_changelog_journal_t *jnl)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_changelog_processor_t *jnl_proc = NULL;
+
+ this = THIS;
+ if (!this || !jnl || !jnl->jnl_proc)
+ goto error_return;
+
+ jnl_proc = jnl->jnl_proc;
+
+ ret = gf_thread_cleanup (this, jnl_proc->processor);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to cleanup processor thread");
+ goto error_return;
+ }
+
+ (void)pthread_mutex_destroy (&jnl_proc->lock);
+ (void)pthread_cond_destroy (&jnl_proc->cond);
+
+ GF_FREE (jnl_proc);
+
+ error_return:
+ return;
+}
+
+int
+gf_changelog_init_processor (gf_changelog_journal_t *jnl)
+{
+ int ret = -1;
+ gf_changelog_processor_t *jnl_proc = NULL;
+
+ jnl_proc = GF_CALLOC (1, sizeof (gf_changelog_processor_t),
+ gf_changelog_mt_libgfchangelog_t);
+ if (!jnl_proc)
+ goto error_return;
+
+ ret = pthread_mutex_init (&jnl_proc->lock, NULL);
+ if (ret != 0)
+ goto free_jnl_proc;
+ ret = pthread_cond_init (&jnl_proc->cond, NULL);
+ if (ret != 0)
+ goto cleanup_mutex;
+
+ INIT_LIST_HEAD (&jnl_proc->entries);
+ ret = pthread_create (&jnl_proc->processor,
+ NULL, gf_changelog_process, jnl);
+ if (ret != 0)
+ goto cleanup_cond;
+ jnl_proc->waiting = _gf_false;
+
+ jnl->jnl_proc = jnl_proc;
+ return 0;
+
+ cleanup_cond:
+ (void) pthread_cond_destroy (&jnl_proc->cond);
+ cleanup_mutex:
+ (void) pthread_mutex_destroy (&jnl_proc->lock);
+ free_jnl_proc:
+ GF_FREE (jnl_proc);
+ error_return:
+ return -1;
+}
+
+static void
+gf_changelog_cleanup_fds (gf_changelog_journal_t *jnl)
+{
+ /* tracker fd */
+ if (jnl->jnl_fd != -1)
+ close (jnl->jnl_fd);
+ /* processing dir */
+ if (jnl->jnl_dir)
+ closedir (jnl->jnl_dir);
+
+ if (jnl->jnl_working_dir)
+ free (jnl->jnl_working_dir); /* allocated by realpath */
+}
+
+static int
+gf_changelog_open_dirs (xlator_t *this, gf_changelog_journal_t *jnl)
+{
+ int ret = -1;
+ DIR *dir = NULL;
+ int tracker_fd = 0;
+ char tracker_path[PATH_MAX] = {0,};
+
+ /* .current */
+ (void) snprintf (jnl->jnl_current_dir, PATH_MAX,
+ "%s/"GF_CHANGELOG_CURRENT_DIR"/",
+ jnl->jnl_working_dir);
+ ret = recursive_rmdir (jnl->jnl_current_dir);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to rmdir: %s, err: %s",
+ jnl->jnl_current_dir, strerror (errno));
+ goto out;
+ }
+ ret = mkdir_p (jnl->jnl_current_dir, 0600, _gf_false);
+ if (ret)
+ goto out;
+
+ /* .processed */
+ (void) snprintf (jnl->jnl_processed_dir, PATH_MAX,
+ "%s/"GF_CHANGELOG_PROCESSED_DIR"/",
+ jnl->jnl_working_dir);
+ ret = mkdir_p (jnl->jnl_processed_dir, 0600, _gf_false);
+ if (ret)
+ goto out;
+
+ /* .processing */
+ (void) snprintf (jnl->jnl_processing_dir, PATH_MAX,
+ "%s/"GF_CHANGELOG_PROCESSING_DIR"/",
+ jnl->jnl_working_dir);
+ ret = recursive_rmdir (jnl->jnl_processing_dir);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to rmdir: %s, err: %s",
+ jnl->jnl_processing_dir, strerror (errno));
+ goto out;
+ }
+
+ ret = mkdir_p (jnl->jnl_processing_dir, 0600, _gf_false);
+ if (ret)
+ goto out;
+
+ dir = opendir (jnl->jnl_processing_dir);
+ if (!dir) {
+ gf_log ("", GF_LOG_ERROR,
+ "opendir() error [reason: %s]", strerror (errno));
+ goto out;
}
- return (ret) ? NULL : path;
+ jnl->jnl_dir = dir;
+
+ (void) snprintf (tracker_path, PATH_MAX,
+ "%s/"GF_CHANGELOG_TRACKER, jnl->jnl_working_dir);
+
+ tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+ if (tracker_fd < 0) {
+ closedir (jnl->jnl_dir);
+ ret = -1;
+ goto out;
+ }
+
+ jnl->jnl_fd = tracker_fd;
+ ret = 0;
+ out:
+ return ret;
+}
+
+int
+gf_changelog_init_history (xlator_t *this,
+ gf_changelog_journal_t *jnl,
+ char *brick_path, char *scratch_dir)
+{
+ int i = 0;
+ int ret = 0;
+ char hist_scratch_dir[PATH_MAX] = {0,};
+
+ jnl->hist_jnl = GF_CALLOC (1, sizeof (*jnl),
+ gf_changelog_mt_libgfchangelog_t);
+ if (!jnl->hist_jnl)
+ goto error_return;
+
+ jnl->hist_jnl->jnl_dir = NULL;
+ jnl->hist_jnl->jnl_fd = -1;
+
+ (void) strncpy (hist_scratch_dir, scratch_dir, PATH_MAX);
+ (void) snprintf (hist_scratch_dir, PATH_MAX,
+ "%s/"GF_CHANGELOG_HISTORY_DIR"/",
+ jnl->jnl_working_dir);
+
+ ret = mkdir_p (hist_scratch_dir, 0600, _gf_false);
+ if (ret)
+ goto dealloc_hist;
+
+ jnl->hist_jnl->jnl_working_dir = realpath (hist_scratch_dir, NULL);
+ if (!jnl->hist_jnl->jnl_working_dir)
+ goto dealloc_hist;
+
+ ret = gf_changelog_open_dirs (this, jnl->hist_jnl);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not create entries in history scratch dir");
+ goto dealloc_hist;
+ }
+
+ (void) strncpy (jnl->hist_jnl->jnl_brickpath, brick_path, PATH_MAX);
+
+ for (i = 0; i < 256; i++) {
+ jnl->hist_jnl->rfc3986[i] =
+ (isalnum(i) || i == '~' ||
+ i == '-' || i == '.' || i == '_') ? i : 0;
+ }
+
+ return 0;
+
+ dealloc_hist:
+ GF_FREE (jnl->hist_jnl);
+ jnl->hist_jnl = NULL;
+ error_return:
+ return -1;
+}
+
+void
+gf_changelog_journal_fini (void *xl, char *brick, void *data)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+
+ this = xl;
+ jnl = data;
+
+ gf_changelog_cleanup_processor (jnl);
+
+ gf_changelog_cleanup_fds (jnl);
+ if (jnl->hist_jnl)
+ gf_changelog_cleanup_fds (jnl->hist_jnl);
+
+ GF_FREE (jnl);
}
void *
-gf_changelog_process (void *data)
+gf_changelog_journal_init (void *xl, struct gf_brick_spec *brick)
{
- ssize_t len = 0;
- ssize_t offlen = 0;
- xlator_t *this = NULL;
- char *sbuf = NULL;
- gf_changelog_t *gfc = NULL;
- char from_path[PATH_MAX] = {0,};
-
- gfc = (gf_changelog_t *) data;
- this = gfc->this;
-
- pthread_detach (pthread_self());
-
- for (;;) {
- len = gf_changelog_read_path (gfc->gfc_sockfd,
- from_path + offlen,
- PATH_MAX - offlen);
- if (len < 0)
- continue; /* ignore it for now */
-
- if (len == 0) { /* close() from the changelog translator */
- gf_log (this->name, GF_LOG_INFO, "close from changelog"
- " notification translator.");
-
- if (gfc->gfc_connretries != 1) {
- if (!gf_changelog_notification_init(this, gfc))
- continue;
- }
+ int i = 0;
+ int ret = 0;
+ xlator_t *this = NULL;
+ struct stat buf = {0,};
+ char *scratch_dir = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+
+ this = xl;
+ scratch_dir = (char *) brick->ptr;
+
+ jnl = GF_CALLOC (1, sizeof (gf_changelog_journal_t),
+ gf_changelog_mt_libgfchangelog_t);
+ if (!jnl)
+ goto error_return;
+
+ if (stat (scratch_dir, &buf) && errno == ENOENT) {
+ ret = mkdir_p (scratch_dir, 0600, _gf_true);
+ if (ret)
+ goto dealloc_private;
+ }
- byebye = 1;
- break;
- }
+ jnl->jnl_working_dir = realpath (scratch_dir, NULL);
+ if (!jnl->jnl_working_dir)
+ goto dealloc_private;
- len += offlen;
- sbuf = gf_changelog_ext_change (this, gfc, from_path, len);
- if (!sbuf) {
- gf_log (this->name, GF_LOG_ERROR,
- "could not extract changelog filename");
- continue;
- }
+ ret = gf_changelog_open_dirs (this, jnl);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not create entries in scratch dir");
+ goto dealloc_private;
+ }
- offlen = 0;
- if (sbuf != (from_path + len)) {
- offlen = from_path + len - sbuf;
- memmove (from_path, sbuf, offlen);
- }
+ (void) strncpy (jnl->jnl_brickpath, brick->brick_path, PATH_MAX);
+
+ /* RFC 3986 {de,en}coding */
+ for (i = 0; i < 256; i++) {
+ jnl->rfc3986[i] =
+ (isalnum(i) || i == '~' ||
+ i == '-' || i == '.' || i == '_') ? i : 0;
}
- gf_log (this->name, GF_LOG_DEBUG,
- "byebye (%d) from processing thread...", byebye);
+ ret = gf_changelog_init_history (this, jnl,
+ brick->brick_path, scratch_dir);
+ if (ret)
+ goto cleanup_fds;
+
+ /* initialize journal processor */
+ ret = gf_changelog_init_processor (jnl);
+ if (ret)
+ goto cleanup_fds;
+
+ JNL_SET_API_STATE (jnl, JNL_API_CONN_INPROGESS);
+ ret = pthread_spin_init (&jnl->lock, 0);
+ if (ret != 0)
+ goto cleanup_processor;
+ return jnl;
+
+ cleanup_processor:
+ gf_changelog_cleanup_processor (jnl);
+ cleanup_fds:
+ gf_changelog_cleanup_fds (jnl);
+ if (jnl->hist_jnl)
+ gf_changelog_cleanup_fds (jnl->hist_jnl);
+ dealloc_private:
+ GF_FREE (jnl);
+ error_return:
return NULL;
}
diff --git a/xlators/features/changelog/lib/src/gf-changelog-journal.h b/xlators/features/changelog/lib/src/gf-changelog-journal.h
new file mode 100644
index 00000000000..9a0f0b28956
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-journal.h
@@ -0,0 +1,114 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __GF_CHANGELOG_JOURNAL_H
+#define __GF_CHANGELOG_JOURNAL_H
+
+#include <unistd.h>
+#include <pthread.h>
+
+#include "changelog.h"
+
+enum api_conn {
+ JNL_API_CONNECTED,
+ JNL_API_CONN_INPROGESS,
+ JNL_API_DISCONNECTED,
+};
+
+typedef struct gf_changelog_entry {
+ char path[PATH_MAX];
+
+ struct list_head list;
+} gf_changelog_entry_t;
+
+typedef struct gf_changelog_processor {
+ pthread_mutex_t lock; /* protects ->entries */
+ pthread_cond_t cond; /* waiter during empty list */
+ gf_boolean_t waiting;
+
+ pthread_t processor; /* thread-id of journal processing thread */
+
+ struct list_head entries;
+} gf_changelog_processor_t;
+
+typedef struct gf_changelog_journal {
+ DIR *jnl_dir; /* 'processing' directory stream */
+
+ int jnl_fd; /* fd to the tracker file */
+
+ char jnl_brickpath[PATH_MAX]; /* brick path for this end-point */
+
+ gf_changelog_processor_t *jnl_proc;
+
+ char *jnl_working_dir; /* scratch directory */
+
+ char jnl_current_dir[PATH_MAX];
+ char jnl_processed_dir[PATH_MAX];
+ char jnl_processing_dir[PATH_MAX];
+
+ char rfc3986[256]; /* RFC 3986 string encoding */
+
+ struct gf_changelog_journal *hist_jnl;
+ int hist_done; /* holds 0 done scanning,
+ 1 keep scanning and -1 error */
+
+ pthread_spinlock_t lock;
+ int connected;
+} gf_changelog_journal_t;
+
+#define JNL_SET_API_STATE(jnl, state) (jnl->connected = state)
+#define JNL_IS_API_DISCONNECTED(jnl) (jnl->connected == JNL_API_DISCONNECTED)
+
+/* History API */
+typedef struct gf_changelog_history_data {
+ int len;
+
+ int htime_fd;
+
+ /* parallelism count */
+ int n_parallel;
+
+ /* history from, to indexes */
+ unsigned long from;
+ unsigned long to;
+} gf_changelog_history_data_t;
+
+typedef struct gf_changelog_consume_data {
+ /** set of inputs */
+
+ /* fd to read from */
+ int fd;
+
+ /* from @offset */
+ off_t offset;
+
+ xlator_t *this;
+
+ gf_changelog_journal_t *jnl;
+
+ /** set of outputs */
+
+ /* return value */
+ int retval;
+
+ /* journal processed */
+ char changelog[PATH_MAX];
+} gf_changelog_consume_data_t;
+
+/* event handler */
+CALLBACK gf_changelog_handle_journal;
+
+/* init, connect & disconnect handler */
+INIT gf_changelog_journal_init;
+FINI gf_changelog_journal_fini;
+CONNECT gf_changelog_journal_connect;
+DISCONNECT gf_changelog_journal_disconnect;
+
+#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog-reborp.c b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
new file mode 100644
index 00000000000..d7e60fb9634
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
@@ -0,0 +1,381 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-misc.h"
+#include "changelog-mem-types.h"
+
+#include "gf-changelog-helpers.h"
+#include "changelog-rpc-common.h"
+
+/**
+ * Reverse socket: actual data transfer handler. Connection
+ * initiator is PROBER, data transfer is REBORP.
+ */
+
+struct rpcsvc_program *gf_changelog_reborp_programs[];
+
+/**
+ * On a reverse connection, unlink the socket file.
+ */
+int
+gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata,
+ rpcsvc_event_t event, void *data)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_private_t *priv = NULL;
+ gf_changelog_t *entry = NULL;
+ char sock[UNIX_PATH_MAX] = {0,};
+
+ entry = mydata;
+ this = entry->this;
+ priv = this->private;
+
+ switch (event) {
+ case RPCSVC_EVENT_ACCEPT:
+ ret = unlink (RPC_SOCK(entry));
+ if (ret != 0)
+ gf_log (this->name, GF_LOG_WARNING, "failed to unlink"
+ " reverse socket file %s", RPC_SOCK (entry));
+ if (entry->connected)
+ GF_CHANGELOG_INVOKE_CBK (this, entry->connected,
+ entry->brick, entry->ptr);
+ break;
+ case RPCSVC_EVENT_DISCONNECT:
+ LOCK (&priv->lock);
+ {
+ list_del (&entry->list);
+ }
+ UNLOCK (&priv->lock);
+
+ if (entry->disconnected)
+ GF_CHANGELOG_INVOKE_CBK (this, entry->disconnected,
+ entry->brick, entry->ptr);
+
+ GF_FREE (entry);
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+}
+
+rpcsvc_t *
+gf_changelog_reborp_init_rpc_listner (xlator_t *this,
+ char *path, char *sock, void *cbkdata)
+{
+ CHANGELOG_MAKE_TMP_SOCKET_PATH (path, sock, UNIX_PATH_MAX);
+ return changelog_rpc_server_init (this, sock, cbkdata,
+ gf_changelog_reborp_rpcsvc_notify,
+ gf_changelog_reborp_programs);
+}
+
+/**
+ * This is dirty and painful as of now untill there is event filtering in the
+ * server. The entire event buffer is scanned and interested events are picked,
+ * whereas we _should_ be notified with the events we were interested in
+ * (selected at the time of probe). As of now this is complete BS and needs
+ * fixture ASAP. I just made it work, it needs to be better.
+ *
+ * @FIXME: cleanup this bugger once server filters events.
+ */
+inline void
+gf_changelog_invoke_callback (gf_changelog_t *entry,
+ struct iovec **vec, int payloadcnt)
+{
+ int i = 0;
+ int evsize = 0;
+ xlator_t *this = NULL;
+ changelog_event_t *event = NULL;
+
+ this = entry->this;
+
+ for (; i < payloadcnt; i++) {
+ event = (changelog_event_t *)vec[i]->iov_base;
+ evsize = vec[i]->iov_len / CHANGELOG_EV_SIZE;
+
+ for (; evsize > 0; evsize--, event++) {
+ if (gf_changelog_filter_check (entry, event)) {
+ GF_CHANGELOG_INVOKE_CBK (this,
+ entry->callback,
+ entry->brick,
+ entry->ptr, event);
+ }
+ }
+ }
+}
+
+/**
+ * Ordered event handler is self-adaptive.. if the event sequence number
+ * is what's expected (->next_seq) there is no ordering list that's
+ * maintained. On out-of-order event notifications, event buffers are
+ * dynamically allocated and ordered.
+ */
+
+inline int
+__is_expected_sequence (struct gf_event_list *ev, struct gf_event *event)
+{
+ return (ev->next_seq == event->seq);
+}
+
+inline int
+__can_process_event (struct gf_event_list *ev, struct gf_event **event)
+{
+ *event = list_first_entry (&ev->events, struct gf_event, list);
+
+ if (__is_expected_sequence (ev, *event)) {
+ list_del (&(*event)->list);
+ ev->next_seq++;
+ return 1;
+ }
+
+ return 0;
+}
+
+inline void
+__process_event_list (struct gf_event_list *ev, struct gf_event **event)
+{
+ while (list_empty (&ev->events)
+ || !__can_process_event (ev, event))
+ pthread_cond_wait (&ev->cond, &ev->lock);
+}
+
+void *
+gf_changelog_callback_invoker (void *arg)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_changelog_t *entry = NULL;
+ struct iovec *vec = NULL;
+ struct gf_event *event = NULL;
+ struct gf_event_list *ev = NULL;
+
+ ev = arg;
+ entry = ev->entry;
+ this = entry->this;
+
+ while (1) {
+ pthread_mutex_lock (&ev->lock);
+ {
+ __process_event_list (ev, &event);
+ }
+ pthread_mutex_unlock (&ev->lock);
+
+ vec = (struct iovec *) &event->iov;
+ gf_changelog_invoke_callback (entry, &vec, event->count);
+
+ GF_FREE (event);
+ }
+
+ return NULL;
+}
+
+static int
+orderfn (struct list_head *pos1, struct list_head *pos2)
+{
+ struct gf_event *event1 = NULL;
+ struct gf_event *event2 = NULL;
+
+ event1 = list_entry (pos1, struct gf_event, list);
+ event2 = list_entry (pos2, struct gf_event, list);
+
+ if (event1->seq > event2->seq)
+ return 1;
+ return -1;
+}
+
+int
+gf_changelog_ordered_event_handler (rpcsvc_request_t *req,
+ xlator_t *this, gf_changelog_t *entry)
+{
+ int i = 0;
+ size_t payloadlen = 0;
+ ssize_t len = 0;
+ int payloadcnt = 0;
+ changelog_event_req rpc_req = {0,};
+ changelog_event_rsp rpc_rsp = {0,};
+ struct iovec *vec = NULL;
+ struct gf_event *event = NULL;
+ struct gf_event_list *ev = NULL;
+
+ ev = &entry->event;
+
+ len = xdr_to_generic (req->msg[0],
+ &rpc_req, (xdrproc_t)xdr_changelog_event_req);
+ if (len < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed");
+ req->rpc_err = GARBAGE_ARGS;
+ goto handle_xdr_error;
+ }
+
+ if (len < req->msg[0].iov_len) {
+ payloadcnt = 1;
+ payloadlen = (req->msg[0].iov_len - len);
+ }
+ for (i = 1; i < req->count; i++) {
+ payloadcnt++;
+ payloadlen += req->msg[i].iov_len;
+ }
+
+ event = GF_CALLOC (1, GF_EVENT_CALLOC_SIZE (payloadcnt, payloadlen),
+ gf_changelog_mt_libgfchangelog_event_t);
+ if (!event)
+ goto handle_xdr_error;
+ INIT_LIST_HEAD (&event->list);
+
+ payloadlen = 0;
+ event->seq = rpc_req.seq;
+ event->count = payloadcnt;
+
+ /* deep copy IO vectors */
+ vec = &event->iov[0];
+ GF_EVENT_ASSIGN_IOVEC (vec, event,
+ (req->msg[0].iov_len - len), payloadlen);
+ (void) memcpy (vec->iov_base,
+ req->msg[0].iov_base + len, vec->iov_len);
+
+ for (i = 1; i < req->count; i++) {
+ vec = &event->iov[i];
+ GF_EVENT_ASSIGN_IOVEC (vec, event,
+ req->msg[i].iov_len, payloadlen);
+ (void) memcpy (event->iov[i].iov_base,
+ req->msg[i].iov_base, req->msg[i].iov_len);
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "seq: %lu [%s] (time: %lu.%lu), (vec: %d, len: %ld)",
+ rpc_req.seq, entry->brick, rpc_req.tv_sec,
+ rpc_req.tv_usec, payloadcnt, payloadlen);
+
+ /* add it to the ordered event list and wake up listner(s) */
+ pthread_mutex_lock (&ev->lock);
+ {
+ list_add_order (&event->list, &ev->events, orderfn);
+ if (!ev->next_seq)
+ ev->next_seq = event->seq;
+ if (ev->next_seq == event->seq)
+ pthread_cond_signal (&ev->cond);
+ }
+ pthread_mutex_unlock (&ev->lock);
+
+ /* ack sequence number */
+ rpc_rsp.op_ret = 0;
+ rpc_rsp.seq = rpc_req.seq;
+
+ goto submit_rpc;
+
+ handle_xdr_error:
+ rpc_rsp.op_ret = -1;
+ rpc_rsp.seq = 0; /* invalid */
+ submit_rpc:
+ return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL,
+ (xdrproc_t)xdr_changelog_event_rsp);
+}
+
+int
+gf_changelog_unordered_event_handler (rpcsvc_request_t *req,
+ xlator_t *this, gf_changelog_t *entry)
+{
+ int i = 0;
+ int ret = 0;
+ ssize_t len = 0;
+ int payloadcnt = 0;
+ struct iovec vector[MAX_IOVEC] = {{0,}, };
+ changelog_event_req rpc_req = {0,};
+ changelog_event_rsp rpc_rsp = {0,};
+
+ len = xdr_to_generic (req->msg[0],
+ &rpc_req, (xdrproc_t)xdr_changelog_event_req);
+ if (len < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed");
+ req->rpc_err = GARBAGE_ARGS;
+ goto handle_xdr_error;
+ }
+
+ /* prepare payload */
+ if (len < req->msg[0].iov_len) {
+ payloadcnt = 1;
+ vector[0].iov_base = (req->msg[0].iov_base + len);
+ vector[0].iov_len = (req->msg[0].iov_len - len);
+ }
+
+ for (i = 1; i < req->count; i++) {
+ vector[payloadcnt++] = req->msg[i];
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "seq: %lu (time: %lu.%lu), (vec: %d)",
+ rpc_req.seq, rpc_req.tv_sec, rpc_req.tv_usec, payloadcnt);
+
+ /* invoke callback */
+ struct iovec *vec = (struct iovec *) &vector;
+ gf_changelog_invoke_callback (entry, &vec, payloadcnt);
+
+ /* ack sequence number */
+ rpc_rsp.op_ret = 0;
+ rpc_rsp.seq = rpc_req.seq;
+
+ goto submit_rpc;
+
+ handle_xdr_error:
+ rpc_rsp.op_ret = -1;
+ rpc_rsp.seq = 0; /* invalid */
+ submit_rpc:
+ return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL,
+ (xdrproc_t)xdr_changelog_event_rsp);
+}
+
+int
+gf_changelog_reborp_handle_event (rpcsvc_request_t *req)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ rpcsvc_t *svc = NULL;
+ gf_changelog_t *entry = NULL;
+
+ svc = rpcsvc_request_service (req);
+ entry = svc->mydata;
+
+ this = THIS = entry->this;
+
+ ret = GF_NEED_ORDERED_EVENTS (entry)
+ ? gf_changelog_ordered_event_handler (req, this, entry)
+ : gf_changelog_unordered_event_handler (req, this, entry);
+
+ return ret;
+}
+
+rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = {
+ [CHANGELOG_REV_PROC_EVENT] = {
+ "CHANGELOG EVENT HANDLER", CHANGELOG_REV_PROC_EVENT,
+ gf_changelog_reborp_handle_event, NULL, 0, DRC_NA
+ },
+};
+
+/**
+ * Do not use synctask as the RPC layer dereferences ->mydata as THIS.
+ * In gf_changelog_setup_rpc(), @cbkdata is of type @gf_changelog_t,
+ * and that's required to invoke the callback with the appropriate
+ * brick path and it's private data.
+ */
+struct rpcsvc_program gf_changelog_reborp_prog = {
+ .progname = "LIBGFCHANGELOG REBORP",
+ .prognum = CHANGELOG_REV_RPC_PROCNUM,
+ .progver = CHANGELOG_REV_RPC_PROCVER,
+ .numactors = CHANGELOG_REV_PROC_MAX,
+ .actors = gf_changelog_reborp_actors,
+ .synctask = _gf_false,
+};
+
+struct rpcsvc_program *gf_changelog_reborp_programs[] = {
+ &gf_changelog_reborp_prog,
+ NULL,
+};
diff --git a/xlators/features/changelog/lib/src/gf-changelog-rpc.c b/xlators/features/changelog/lib/src/gf-changelog-rpc.c
new file mode 100644
index 00000000000..c2a4c044d23
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.c
@@ -0,0 +1,105 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "gf-changelog-rpc.h"
+#include "changelog-misc.h"
+#include "changelog-mem-types.h"
+
+struct rpc_clnt_program gf_changelog_clnt;
+
+/* TODO: piggyback reconnect to called (upcall) */
+int
+gf_changelog_rpc_notify (struct rpc_clnt *rpc,
+ void *mydata, rpc_clnt_event_t event, void *data)
+{
+ xlator_t *this = NULL;
+
+ this = mydata;
+
+ switch (event) {
+ case RPC_CLNT_CONNECT:
+ rpc_clnt_set_connected (&rpc->conn);
+ break;
+ case RPC_CLNT_DISCONNECT:
+ rpc_clnt_unset_connected (&rpc->conn);
+ break;
+ case RPC_CLNT_MSG:
+ case RPC_CLNT_DESTROY:
+ break;
+ }
+
+ return 0;
+}
+
+struct rpc_clnt *
+gf_changelog_rpc_init (xlator_t *this, gf_changelog_t *entry)
+{
+ char sockfile[UNIX_PATH_MAX] = {0,};
+
+ CHANGELOG_MAKE_SOCKET_PATH (entry->brick,
+ sockfile, UNIX_PATH_MAX);
+ return changelog_rpc_client_init (this, entry,
+ sockfile, gf_changelog_rpc_notify);
+}
+
+/**
+ * remote procedure calls declarations.
+ */
+
+int
+gf_probe_changelog_cbk (struct rpc_req *req,
+ struct iovec *iovec, int count, void *myframe)
+{
+ return 0;
+}
+
+int
+gf_probe_changelog_filter (call_frame_t *frame, xlator_t *this, void *data)
+{
+ int ret = 0;
+ char *sock = NULL;
+ gf_changelog_t *entry = NULL;
+ changelog_probe_req req = {0,};
+
+ entry = data;
+ sock = RPC_SOCK (entry);
+
+ (void) memcpy (&req.sock, sock, strlen (sock));
+ req.filter = entry->notify;
+
+ /* invoke RPC */
+ return changelog_rpc_sumbit_req (RPC_PROBER (entry), (void *) &req,
+ frame, &gf_changelog_clnt,
+ CHANGELOG_RPC_PROBE_FILTER, NULL, 0,
+ NULL, this, gf_probe_changelog_cbk,
+ (xdrproc_t) xdr_changelog_probe_req);
+}
+
+int
+gf_changelog_invoke_rpc (xlator_t *this, gf_changelog_t *entry, int procidx)
+{
+ return changelog_invoke_rpc (this, RPC_PROBER (entry),
+ &gf_changelog_clnt, procidx, entry);
+}
+
+struct rpc_clnt_procedure gf_changelog_procs[CHANGELOG_RPC_PROC_MAX] = {
+ [CHANGELOG_RPC_PROC_NULL] = {"NULL", NULL},
+ [CHANGELOG_RPC_PROBE_FILTER] = {
+ "PROBE FILTER", gf_probe_changelog_filter
+ },
+};
+
+struct rpc_clnt_program gf_changelog_clnt = {
+ .progname = "LIBGFCHANGELOG",
+ .prognum = CHANGELOG_RPC_PROGNUM,
+ .progver = CHANGELOG_RPC_PROGVER,
+ .numproc = CHANGELOG_RPC_PROC_MAX,
+ .proctable = gf_changelog_procs,
+};
diff --git a/xlators/features/changelog/src/changelog-notifier.h b/xlators/features/changelog/lib/src/gf-changelog-rpc.h
index 55e728356e6..1c982eef809 100644
--- a/xlators/features/changelog/src/changelog-notifier.h
+++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.h
@@ -8,12 +8,19 @@
cases as published by the Free Software Foundation.
*/
-#ifndef _CHANGELOG_NOTIFIER_H
-#define _CHANGELOG_NOTIFIER_H
+#ifndef __GF_CHANGELOG_RPC_H
+#define __GF_CHANGELOG_RPC_H
-#include "changelog-helpers.h"
+#include "xlator.h"
-void *
-changelog_notifier (void *data);
+#include "gf-changelog-helpers.h"
+#include "changelog-rpc-common.h"
+
+struct rpc_clnt *gf_changelog_rpc_init (xlator_t *, gf_changelog_t *);
+
+int gf_changelog_invoke_rpc (xlator_t *, gf_changelog_t *, int);
+
+rpcsvc_t *
+gf_changelog_reborp_init_rpc_listner (xlator_t *, char *, char *, void *);
#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c
index fb2d9037ffb..8f33eb01013 100644
--- a/xlators/features/changelog/lib/src/gf-changelog.c
+++ b/xlators/features/changelog/lib/src/gf-changelog.c
@@ -14,6 +14,8 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
+#include <sys/time.h>
+#include <sys/resource.h>
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
@@ -24,589 +26,523 @@
#include "glusterfs.h"
#include "logging.h"
#include "defaults.h"
+#include "syncop.h"
+#include "gf-changelog-rpc.h"
#include "gf-changelog-helpers.h"
/* from the changelog translator */
#include "changelog-misc.h"
#include "changelog-mem-types.h"
-int byebye = 0;
+/**
+ * Global singleton xlator pointer for the library, initialized
+ * during library load. This should probably be hidden inside
+ * an initialized object which is an handle for the consumer.
+ *
+ * TODO: do away with the global..
+ */
+xlator_t *master = NULL;
-static void
-gf_changelog_cleanup (gf_changelog_t *gfc)
+static inline
+gf_private_t *gf_changelog_alloc_priv ()
{
- /* socket */
- if (gfc->gfc_sockfd != -1)
- close (gfc->gfc_sockfd);
- /* tracker fd */
- if (gfc->gfc_fd != -1)
- close (gfc->gfc_fd);
- /* processing dir */
- if (gfc->gfc_dir)
- closedir (gfc->gfc_dir);
-
- if (gfc->gfc_working_dir)
- free (gfc->gfc_working_dir); /* allocated by realpath */
-}
+ int ret = 0;
+ gf_private_t *priv = NULL;
-void
-__attribute__ ((constructor)) gf_changelog_ctor (void)
-{
- glusterfs_ctx_t *ctx = NULL;
+ priv = calloc (1, sizeof (gf_private_t));
+ if (!priv)
+ goto error_return;
+ INIT_LIST_HEAD (&priv->connections);
- ctx = glusterfs_ctx_new ();
- if (!ctx)
- return;
+ ret = LOCK_INIT (&priv->lock);
+ if (ret != 0)
+ goto free_priv;
+ priv->api = NULL;
- if (glusterfs_globals_init (ctx)) {
- free (ctx);
- ctx = NULL;
- return;
- }
+ return priv;
- THIS->ctx = ctx;
- if (xlator_mem_acct_init (THIS, gf_changelog_mt_end))
- return;
+ free_priv:
+ free (priv);
+ error_return:
+ return NULL;
}
-void
-__attribute__ ((destructor)) gf_changelog_dtor (void)
-{
- xlator_t *this = NULL;
- glusterfs_ctx_t *ctx = NULL;
- gf_changelog_t *gfc = NULL;
+#define GF_CHANGELOG_EVENT_POOL_SIZE 16384
+#define GF_CHANGELOG_EVENT_THREAD_COUNT 4
- this = THIS;
- if (!this)
- return;
-
- ctx = this->ctx;
- gfc = this->private;
-
- if (gfc) {
- if (gfc->hist_gfc) {
- gf_changelog_cleanup(gfc->hist_gfc);
- GF_FREE (gfc->hist_gfc);
- }
- gf_changelog_cleanup (gfc);
- GF_FREE (gfc);
+static int
+gf_changelog_ctx_defaults_init (glusterfs_ctx_t *ctx)
+{
+ cmd_args_t *cmd_args = NULL;
+ struct rlimit lim = {0, };
+ call_pool_t *pool = NULL;
+ int ret = -1;
+
+ ret = xlator_mem_acct_init (THIS, gf_changelog_mt_end);
+ if (ret != 0) {
+ return ret;
}
- if (ctx) {
- pthread_mutex_destroy (&ctx->lock);
- free (ctx);
- ctx = NULL;
- }
-}
+ ctx->process_uuid = generate_glusterfs_ctx_id ();
+ if (!ctx->process_uuid)
+ return -1;
+ ctx->page_size = 128 * GF_UNIT_KB;
-static int
-gf_changelog_open_dirs (gf_changelog_t *gfc)
-{
- int ret = -1;
- DIR *dir = NULL;
- int tracker_fd = 0;
- char tracker_path[PATH_MAX] = {0,};
- xlator_t *this = NULL;
+ ctx->iobuf_pool = iobuf_pool_new ();
+ if (!ctx->iobuf_pool)
+ return -1;
- this = THIS;
- GF_ASSERT (this);
+ ctx->event_pool = event_pool_new (GF_CHANGELOG_EVENT_POOL_SIZE,
+ GF_CHANGELOG_EVENT_THREAD_COUNT);
+ if (!ctx->event_pool)
+ return -1;
- (void) snprintf (gfc->gfc_current_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_CURRENT_DIR"/",
- gfc->gfc_working_dir);
+ pool = GF_CALLOC (1, sizeof (call_pool_t),
+ gf_changelog_mt_libgfchangelog_call_pool_t);
+ if (!pool)
+ return -1;
- ret = recursive_rmdir (gfc->gfc_current_dir);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "Failed to rmdir: %s, err: %s",
- gfc->gfc_current_dir, strerror (errno));
- goto out;
- }
+ /* frame_mem_pool size 112 * 64 */
+ pool->frame_mem_pool = mem_pool_new (call_frame_t, 32);
+ if (!pool->frame_mem_pool)
+ return -1;
- ret = mkdir_p (gfc->gfc_current_dir, 0600, _gf_false);
- if (ret)
- goto out;
+ /* stack_mem_pool size 256 * 128 */
+ pool->stack_mem_pool = mem_pool_new (call_stack_t, 16);
- (void) snprintf (gfc->gfc_processed_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_PROCESSED_DIR"/",
- gfc->gfc_working_dir);
+ if (!pool->stack_mem_pool)
+ return -1;
- ret = mkdir_p (gfc->gfc_processed_dir, 0600, _gf_false);
- if (ret)
- goto out;
+ ctx->stub_mem_pool = mem_pool_new (call_stub_t, 16);
+ if (!ctx->stub_mem_pool)
+ return -1;
- (void) snprintf (gfc->gfc_processing_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_PROCESSING_DIR"/",
- gfc->gfc_working_dir);
+ ctx->dict_pool = mem_pool_new (dict_t, 32);
+ if (!ctx->dict_pool)
+ return -1;
- ret = recursive_rmdir (gfc->gfc_processing_dir);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "Failed to rmdir: %s, err: %s",
- gfc->gfc_processing_dir, strerror (errno));
- goto out;
- }
+ ctx->dict_pair_pool = mem_pool_new (data_pair_t, 512);
+ if (!ctx->dict_pair_pool)
+ return -1;
- ret = mkdir_p (gfc->gfc_processing_dir, 0600, _gf_false);
- if (ret)
- goto out;
+ ctx->dict_data_pool = mem_pool_new (data_t, 512);
+ if (!ctx->dict_data_pool)
+ return -1;
- dir = opendir (gfc->gfc_processing_dir);
- if (!dir) {
- gf_log ("", GF_LOG_ERROR,
- "opendir() error [reason: %s]", strerror (errno));
- goto out;
- }
+ INIT_LIST_HEAD (&pool->all_frames);
+ LOCK_INIT (&pool->lock);
+ ctx->pool = pool;
- gfc->gfc_dir = dir;
+ pthread_mutex_init (&(ctx->lock), NULL);
- (void) snprintf (tracker_path, PATH_MAX,
- "%s/"GF_CHANGELOG_TRACKER, gfc->gfc_working_dir);
+ cmd_args = &ctx->cmd_args;
- tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
- if (tracker_fd < 0) {
- closedir (gfc->gfc_dir);
- ret = -1;
- goto out;
- }
+ INIT_LIST_HEAD (&cmd_args->xlator_options);
+
+ lim.rlim_cur = RLIM_INFINITY;
+ lim.rlim_max = RLIM_INFINITY;
+ setrlimit (RLIMIT_CORE, &lim);
- gfc->gfc_fd = tracker_fd;
- ret = 0;
- out:
- return ret;
+ return 0;
}
-int
-gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc)
+/* TODO: cleanup ctx defaults */
+static void
+gf_changelog_cleanup_this (xlator_t *this)
{
- int ret = 0;
- int len = 0;
- int tries = 0;
- int sockfd = 0;
- struct sockaddr_un remote;
-
- this = gfc->this;
+ glusterfs_ctx_t *ctx = NULL;
- if (gfc->gfc_sockfd != -1) {
- gf_log (this->name, GF_LOG_INFO,
- "Reconnecting...");
- close (gfc->gfc_sockfd);
- }
+ if (!this)
+ return;
- sockfd = socket (AF_UNIX, SOCK_STREAM, 0);
- if (sockfd < 0) {
- ret = -1;
- goto out;
- }
+ ctx = this->ctx;
+ syncenv_destroy (ctx->env);
+ free (ctx);
- CHANGELOG_MAKE_SOCKET_PATH (gfc->gfc_brickpath,
- gfc->gfc_sockpath, UNIX_PATH_MAX);
- gf_log (this->name, GF_LOG_INFO,
- "connecting to changelog socket: %s (brick: %s)",
- gfc->gfc_sockpath, gfc->gfc_brickpath);
+ if (this->private)
+ free (this->private);
- remote.sun_family = AF_UNIX;
- strcpy (remote.sun_path, gfc->gfc_sockpath);
+ this->private = NULL;
+ this->ctx = NULL;
+}
- len = strlen (remote.sun_path) + sizeof (remote.sun_family);
+static int
+gf_changelog_init_this ()
+{
+ glusterfs_ctx_t *ctx = NULL;
- while (tries < gfc->gfc_connretries) {
- gf_log (this->name, GF_LOG_WARNING,
- "connection attempt %d/%d...",
- tries + 1, gfc->gfc_connretries);
+ ctx = glusterfs_ctx_new ();
+ if (!ctx)
+ goto error_return;
- /* initiate a connect */
- if (connect (sockfd, (struct sockaddr *) &remote, len) == 0) {
- gfc->gfc_sockfd = sockfd;
- break;
- }
+ if (glusterfs_globals_init (ctx))
+ goto free_ctx;
- tries++;
- sleep (2);
- }
+ THIS->ctx = ctx;
+ if (gf_changelog_ctx_defaults_init (ctx))
+ goto free_ctx;
- if (tries == gfc->gfc_connretries) {
- gf_log (this->name, GF_LOG_ERROR,
- "could not connect to changelog socket!"
- " bailing out...");
- close (sockfd);
- ret = -1;
- } else
- gf_log (this->name, GF_LOG_INFO,
- "connection successful");
+ ctx->env = syncenv_new (0, 0, 0);
+ if (!ctx->env)
+ goto free_ctx;
+ return 0;
- out:
- return ret;
+ free_ctx:
+ free (ctx);
+ THIS->ctx = NULL;
+ error_return:
+ return -1;
}
-int
-gf_changelog_done (char *file)
+static int
+gf_changelog_init_master ()
{
- int ret = -1;
- char *buffer = NULL;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- char to_path[PATH_MAX] = {0,};
-
- errno = EINVAL;
-
- this = THIS;
- if (!this)
- goto out;
-
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ int ret = 0;
+ gf_private_t *priv = NULL;
+ glusterfs_ctx_t *ctx = NULL;
- if (!file || !strlen (file))
- goto out;
+ ret = gf_changelog_init_this ();
+ if (ret != 0)
+ goto error_return;
+ master = THIS;
+
+ priv = gf_changelog_alloc_priv ();
+ if (!priv)
+ goto cleanup_master;
+ master->private = priv;
+
+ /* poller thread */
+ ret = pthread_create (&priv->poller,
+ NULL, changelog_rpc_poller, master);
+ if (ret != 0) {
+ gf_log (master->name, GF_LOG_ERROR,
+ "failed to spawn poller thread");
+ goto cleanup_master;
+ }
- /* make sure 'file' is inside ->gfc_working_dir */
- buffer = realpath (file, NULL);
- if (!buffer)
- goto out;
+ return 0;
- if (strncmp (gfc->gfc_working_dir,
- buffer, strlen (gfc->gfc_working_dir)))
- goto out;
+ cleanup_master:
+ master->private = NULL;
+ gf_changelog_cleanup_this (master);
+ error_return:
+ return -1;
+}
- (void) snprintf (to_path, PATH_MAX, "%s%s",
- gfc->gfc_processed_dir, basename (buffer));
- gf_log (this->name, GF_LOG_DEBUG,
- "moving %s to processed directory", file);
- ret = rename (buffer, to_path);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot move %s to %s (reason: %s)",
- file, to_path, strerror (errno));
- goto out;
- }
+/* ctor/dtor */
- ret = 0;
+void
+__attribute__ ((constructor)) gf_changelog_ctor (void)
+{
+ (void) gf_changelog_init_master ();
+}
- out:
- if (buffer)
- free (buffer); /* allocated by realpath() */
- return ret;
+void
+__attribute__ ((destructor)) gf_changelog_dtor (void)
+{
+ gf_changelog_cleanup_this (master);
}
-/**
- * @API
- * for a set of changelogs, start from the beginning
- */
+/* TODO: cleanup clnt/svc on failure */
int
-gf_changelog_start_fresh ()
+gf_changelog_setup_rpc (xlator_t *this,
+ gf_changelog_t *entry, int proc)
{
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
-
- this = THIS;
- if (!this)
- goto out;
+ int ret = 0;
+ rpcsvc_t *svc = NULL;
+ struct rpc_clnt *rpc = NULL;
- errno = EINVAL;
+ /**
+ * Initialize a connect back socket. A probe() RPC call to the server
+ * triggers a reverse connect.
+ */
+ svc = gf_changelog_reborp_init_rpc_listner (this, entry->brick,
+ RPC_SOCK (entry), entry);
+ if (!svc)
+ goto error_return;
+ RPC_REBORP (entry) = svc;
+
+ /* Initialize an RPC client */
+ rpc = gf_changelog_rpc_init (this, entry);
+ if (!rpc)
+ goto error_return;
+ RPC_PROBER (entry) = rpc;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ /**
+ * @FIXME
+ * till we have connection state machine, let's delay the RPC call
+ * for now..
+ */
+ sleep (2);
- if (gf_ftruncate (gfc->gfc_fd, 0))
- goto out;
+ /**
+ * Probe changelog translator for reverse connection. After a successful
+ * call, there's less use of the client and can be disconnected, but
+ * let's leave the connection active for any future RPC calls.
+ */
+ ret = gf_changelog_invoke_rpc (this, entry, proc);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Could not initiate probe RPC, bailing out!!!");
+ goto error_return;
+ }
return 0;
- out:
+ error_return:
return -1;
}
-/**
- * @API
- * return the next changelog file entry. zero means all chanelogs
- * consumed.
- */
-ssize_t
-gf_changelog_next_change (char *bufptr, size_t maxlen)
+static void
+gf_cleanup_event (gf_changelog_t *entry)
{
- ssize_t size = -1;
- int tracker_fd = 0;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- char buffer[PATH_MAX] = {0,};
-
- if (maxlen > PATH_MAX) {
- errno = ENAMETOOLONG;
- goto out;
- }
+ xlator_t *this = NULL;
+ struct gf_event_list *ev = NULL;
- errno = EINVAL;
+ this = entry->this;
+ ev = &entry->event;
- this = THIS;
- if (!this)
- goto out;
+ (void) gf_thread_cleanup (this, ev->invoker);
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ (void) pthread_mutex_destroy (&ev->lock);
+ (void) pthread_cond_destroy (&ev->cond);
- tracker_fd = gfc->gfc_fd;
+ ev->entry = NULL;
+}
- size = gf_readline (tracker_fd, buffer, maxlen);
- if (size < 0) {
- size = -1;
- goto out;
+static int
+gf_init_event (gf_changelog_t *entry)
+{
+ int ret = 0;
+ struct gf_event_list *ev = NULL;
+
+ ev = &entry->event;
+ ev->entry = entry;
+
+ ret = pthread_mutex_init (&ev->lock, NULL);
+ if (ret != 0)
+ goto error_return;
+ ret = pthread_cond_init (&ev->cond, NULL);
+ if (ret != 0)
+ goto cleanup_mutex;
+ INIT_LIST_HEAD (&ev->events);
+
+ ev->next_seq = 0; /* bootstrap sequencing */
+
+ if (entry->ordered) {
+ ret = pthread_create (&ev->invoker, NULL,
+ gf_changelog_callback_invoker, ev);
+ if (ret != 0)
+ goto cleanup_cond;
}
- if (size == 0)
- goto out;
-
- memcpy (bufptr, buffer, size - 1);
- bufptr[size - 1] = '\0';
+ return 0;
-out:
- return size;
+ cleanup_cond:
+ (void) pthread_cond_destroy (&ev->cond);
+ cleanup_mutex:
+ (void) pthread_mutex_destroy (&ev->lock);
+ error_return:
+ return -1;
}
/**
- * @API
- * gf_changelog_scan() - scan and generate a list of change entries
- *
- * calling this api multiple times (without calling gf_changlog_done())
- * would result new changelogs(s) being refreshed in the tracker file.
- * This call also acts as a cancellation point for the consumer.
+ * TODO:
+ * - cleanup invoker thread (if ordered mode)
+ * - cleanup event list
+ * - destroy rpc{-clnt, svc}
*/
-ssize_t
-gf_changelog_scan ()
+int
+gf_cleanup_brick_connection (xlator_t *this, gf_changelog_t *entry)
{
- int ret = 0;
- int tracker_fd = 0;
- size_t len = 0;
- size_t off = 0;
- xlator_t *this = NULL;
- size_t nr_entries = 0;
- gf_changelog_t *gfc = NULL;
- struct dirent *entryp = NULL;
- struct dirent *result = NULL;
- char buffer[PATH_MAX] = {0,};
-
- this = THIS;
- if (!this)
- goto out;
+ return 0;
+}
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+int
+gf_cleanup_connections (xlator_t *this)
+{
+ return 0;
+}
- /**
- * do we need to protect 'byebye' with locks? worst, the
- * consumer would get notified during next scan().
- */
- if (byebye) {
- errno = ECONNREFUSED;
- goto out;
- }
+static int
+gf_setup_brick_connection (xlator_t *this,
+ struct gf_brick_spec *brick,
+ gf_boolean_t ordered, void *xl)
+{
+ int ret = 0;
+ gf_private_t *priv = NULL;
+ gf_changelog_t *entry = NULL;
- errno = EINVAL;
+ priv = this->private;
- tracker_fd = gfc->gfc_fd;
+ if (!brick->callback || !brick->init || !brick->fini)
+ goto error_return;
- if (gf_ftruncate (tracker_fd, 0))
- goto out;
+ entry = GF_CALLOC (1, sizeof (*entry),
+ gf_changelog_mt_libgfchangelog_t);
+ if (!entry)
+ goto error_return;
+ INIT_LIST_HEAD (&entry->list);
- len = offsetof(struct dirent, d_name)
- + pathconf(gfc->gfc_processing_dir, _PC_NAME_MAX) + 1;
- entryp = GF_CALLOC (1, len,
- gf_changelog_mt_libgfchangelog_dirent_t);
- if (!entryp)
- goto out;
+ entry->notify = brick->filter;
+ (void) strncpy (entry->brick, brick->brick_path, PATH_MAX);
- rewinddir (gfc->gfc_dir);
- while (1) {
- ret = readdir_r (gfc->gfc_dir, entryp, &result);
- if (ret || !result)
- break;
+ entry->this = this;
+ entry->invokerxl = xl;
- if ( !strcmp (basename (entryp->d_name), ".")
- || !strcmp (basename (entryp->d_name), "..") )
- continue;
+ entry->ordered = ordered;
+ if (ordered) {
+ ret = gf_init_event (entry);
+ if (ret)
+ goto free_entry;
+ }
- nr_entries++;
+ entry->fini = brick->fini;
+ entry->callback = brick->callback;
+ entry->connected = brick->connected;
+ entry->disconnected = brick->disconnected;
- GF_CHANGELOG_FILL_BUFFER (gfc->gfc_processing_dir,
- buffer, off,
- strlen (gfc->gfc_processing_dir));
- GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer,
- off, strlen (entryp->d_name));
- GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1);
+ entry->ptr = brick->init (this, brick);
+ if (!entry->ptr)
+ goto cleanup_event;
+ priv->api = entry->ptr; /* pointer to API, if required */
- if (gf_changelog_write (tracker_fd, buffer, off) != off) {
- gf_log (this->name, GF_LOG_ERROR,
- "error writing changelog filename"
- " to tracker file");
- break;
- }
- off = 0;
+ LOCK (&priv->lock);
+ {
+ list_add_tail (&entry->list, &priv->connections);
}
+ UNLOCK (&priv->lock);
- GF_FREE (entryp);
+ ret = gf_changelog_setup_rpc (this, entry, CHANGELOG_RPC_PROBE_FILTER);
+ if (ret)
+ goto cleanup_event;
+ return 0;
- if (!result) {
- if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1)
- return nr_entries;
- }
- out:
+ cleanup_event:
+ if (ordered)
+ gf_cleanup_event (entry);
+ free_entry:
+ list_del (&entry->list); /* FIXME: kludge for now */
+ GF_FREE (entry);
+ error_return:
return -1;
}
-/**
- * @API
- * gf_changelog_register() - register a client for updates.
- */
int
-gf_changelog_register (char *brick_path, char *scratch_dir,
- char *log_file, int log_level, int max_reconnects)
+gf_changelog_register_brick (xlator_t *this,
+ struct gf_brick_spec *brick,
+ gf_boolean_t ordered, void *xl)
{
- int i = 0;
- int ret = -1;
- int errn = 0;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- char hist_scratch_dir[PATH_MAX] = {0,};
- struct stat buf = {0,};
-
- this = THIS;
- if (!this->ctx)
- goto out;
-
- errno = ENOMEM;
-
- gfc = GF_CALLOC (1, sizeof (*gfc),
- gf_changelog_mt_libgfchangelog_t);
- if (!gfc)
- goto out;
-
- gfc->this = this;
-
- gfc->gfc_dir = NULL;
- gfc->gfc_fd = gfc->gfc_sockfd = -1;
-
- if (stat (scratch_dir, &buf) && errno == ENOENT) {
- ret = mkdir_p (scratch_dir, 0600, _gf_true);
- if (ret) {
- errn = errno;
- goto cleanup;
- }
- }
-
- gfc->gfc_working_dir = realpath (scratch_dir, NULL);
- if (!gfc->gfc_working_dir) {
- errn = errno;
- goto cleanup;
- }
+ return gf_setup_brick_connection (this, brick, ordered, xl);
+}
- /* Begin: Changes for History API */
- gfc->hist_gfc = NULL;
+static int
+gf_changelog_setup_logging (xlator_t *this, char *logfile, int loglevel)
+{
+ /* passing ident as NULL means to use default ident for syslog */
+ if (gf_log_init (this->ctx, logfile, NULL))
+ return -1;
- gfc->hist_gfc = GF_CALLOC (1, sizeof (*gfc),
- gf_changelog_mt_libgfchangelog_t);
- if (!gfc->hist_gfc)
- goto cleanup;
+ gf_log_set_loglevel ((loglevel == -1) ? GF_LOG_INFO :
+ loglevel);
+ return 0;
+}
- gfc->hist_gfc->gfc_dir = NULL;
- gfc->hist_gfc->gfc_fd = gfc->hist_gfc->gfc_sockfd = -1;
- gfc->hist_gfc->this = NULL;
+int
+gf_changelog_register_generic (struct gf_brick_spec *bricks, int count,
+ int ordered, char *logfile, int lvl, void *xl)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ xlator_t *old_this = NULL;
+ struct gf_brick_spec *brick = NULL;
+ gf_boolean_t need_order = _gf_false;
- (void) strncpy (hist_scratch_dir, scratch_dir, PATH_MAX);
- (void) snprintf (hist_scratch_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_HISTORY_DIR"/",
- gfc->gfc_working_dir);
+ SAVE_THIS (xl);
- ret = mkdir_p (hist_scratch_dir, 0600, _gf_false);
- if (ret) {
- errn = errno;
- goto cleanup;
- }
+ this = THIS;
+ if (!this)
+ goto error_return;
- gfc->hist_gfc->gfc_working_dir = realpath (hist_scratch_dir, NULL);
- if (!gfc->hist_gfc->gfc_working_dir) {
- errn = errno;
- goto cleanup;
- }
+ ret = gf_changelog_setup_logging (this, logfile, lvl);
+ if (ret)
+ goto error_return;
- ret = gf_changelog_open_dirs (gfc->hist_gfc);
- if (ret) {
- errn = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "could not create entries in history scratch dir");
- goto cleanup;
- }
+ need_order = (ordered) ? _gf_true : _gf_false;
- (void) strncpy (gfc->hist_gfc->gfc_brickpath, brick_path, PATH_MAX);
+ brick = bricks;
+ while (count--) {
+ gf_log (this->name, GF_LOG_INFO,
+ "Registering brick: %s [notify filter: %d]",
+ brick->brick_path, brick->filter);
- for (i=0; i < 256; i++) {
- gfc->hist_gfc->rfc3986[i] =
- (isalnum(i) || i == '~' ||
- i == '-' || i == '.' || i == '_') ? i : 0;
- }
- /* End: Changes for History API*/
+ ret = gf_changelog_register_brick (this, brick, need_order, xl);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Error registering with changelog xlator");
+ break;
+ }
- ret = gf_changelog_open_dirs (gfc);
- if (ret) {
- errn = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "could not create entries in scratch dir");
- goto cleanup;
+ brick++;
}
- /* passing ident as NULL means to use default ident for syslog */
- if (gf_log_init (this->ctx, log_file, NULL))
- goto cleanup;
-
- gf_log_set_loglevel ((log_level == -1) ? GF_LOG_INFO :
- log_level);
+ if (ret != 0)
+ goto cleanup_inited_bricks;
- gfc->gfc_connretries = (max_reconnects <= 0) ? 1 : max_reconnects;
- (void) strncpy (gfc->gfc_brickpath, brick_path, PATH_MAX);
+ RESTORE_THIS();
+ return 0;
- ret = gf_changelog_notification_init (this, gfc);
- if (ret) {
- errn = errno;
- goto cleanup;
- }
+ cleanup_inited_bricks:
+ gf_cleanup_connections (this);
+ error_return:
+ RESTORE_THIS();
+ return -1;
+}
- ret = gf_thread_create (&gfc->gfc_changelog_processor,
- NULL, gf_changelog_process, gfc);
- if (ret) {
- errn = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "error creating changelog processor thread"
- " new changes won't be recorded!!!");
- goto cleanup;
- }
+/**
+ * @API
+ * gf_changelog_register()
+ *
+ * This is _NOT_ a generic register API. It's a special API to handle
+ * updates at a journal granulality. This is used by consumers wanting
+ * to process persistent journal such as geo-replication via a set of
+ * APIs. All of this is required to maintain backward compatibility.
+ * Owner specific private data is stored in ->api (in gf_private_t),
+ * which is used by APIs to access it's private data. This limits
+ * the library access to a single brick, but that's how it used to
+ * be anyway. Furthermore, this API solely _owns_ "this", therefore
+ * callers already having a notion of "this" are expected to use the
+ * newer API.
+ *
+ * Newer applications wanting to use this library need not face this
+ * limitation and reply of the much more feature rich generic register
+ * API, which is purely callback based.
+ *
+ * NOTE: @max_reconnects is not used but required for backward compat.
+ *
+ * For generic API, refer gf_changelog_register_generic().
+ */
+int
+gf_changelog_register (char *brick_path, char *scratch_dir,
+ char *log_file, int log_level, int max_reconnects)
+{
+ struct gf_brick_spec brick = {0,};
- for (i=0; i < 256; i++) {
- gfc->rfc3986[i] =
- (isalnum(i) || i == '~' ||
- i == '-' || i == '.' || i == '_') ? i : 0;
- }
+ THIS = master;
- ret = 0;
- this->private = gfc;
+ brick.brick_path = brick_path;
+ brick.filter = CHANGELOG_OP_TYPE_JOURNAL;
- goto out;
+ brick.init = gf_changelog_journal_init;
+ brick.fini = gf_changelog_journal_fini;
+ brick.callback = gf_changelog_handle_journal;
+ brick.connected = gf_changelog_journal_connect;
+ brick.disconnected = gf_changelog_journal_disconnect;
- cleanup:
- if (gfc->hist_gfc) {
- gf_changelog_cleanup (gfc->hist_gfc);
- GF_FREE (gfc->hist_gfc);
- }
- gf_changelog_cleanup (gfc);
- GF_FREE (gfc);
- this->private = NULL;
- errno = errn;
+ brick.ptr = scratch_dir;
- out:
- return ret;
+ return gf_changelog_register_generic (&brick, 1, 1,
+ log_file, log_level, NULL);
}
diff --git a/xlators/features/changelog/lib/src/gf-history-changelog.c b/xlators/features/changelog/lib/src/gf-history-changelog.c
index 8a527dd6e4b..12f51da8fa2 100644
--- a/xlators/features/changelog/lib/src/gf-history-changelog.c
+++ b/xlators/features/changelog/lib/src/gf-history-changelog.c
@@ -14,6 +14,7 @@
#include "syscall.h"
#include "gf-changelog-helpers.h"
+#include "gf-changelog-journal.h"
/* from the changelog translator */
#include "changelog-misc.h"
@@ -36,12 +37,12 @@
int
gf_history_changelog_done (char *file)
{
- int ret = -1;
- char *buffer = NULL;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
- char to_path[PATH_MAX] = {0,};
+ int ret = -1;
+ char *buffer = NULL;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
+ char to_path[PATH_MAX] = {0,};
errno = EINVAL;
@@ -49,28 +50,28 @@ gf_history_changelog_done (char *file)
if (!this)
goto out;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
goto out;
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc)
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl)
goto out;
if (!file || !strlen (file))
goto out;
- /* make sure 'file' is inside ->gfc_working_dir */
+ /* make sure 'file' is inside ->jnl_working_dir */
buffer = realpath (file, NULL);
if (!buffer)
goto out;
- if (strncmp (hist_gfc->gfc_working_dir,
- buffer, strlen (hist_gfc->gfc_working_dir)))
+ if (strncmp (hist_jnl->jnl_working_dir,
+ buffer, strlen (hist_jnl->jnl_working_dir)))
goto out;
(void) snprintf (to_path, PATH_MAX, "%s%s",
- hist_gfc->gfc_processed_dir, basename (buffer));
+ hist_jnl->jnl_processed_dir, basename (buffer));
gf_log (this->name, GF_LOG_DEBUG,
"moving %s to processed directory", file);
ret = rename (buffer, to_path);
@@ -102,9 +103,9 @@ gf_history_changelog_done (char *file)
int
gf_history_changelog_start_fresh ()
{
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
this = THIS;
if (!this)
@@ -112,15 +113,15 @@ gf_history_changelog_start_fresh ()
errno = EINVAL;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
goto out;
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc)
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl)
goto out;
- if (gf_ftruncate (hist_gfc->gfc_fd, 0))
+ if (gf_ftruncate (hist_jnl->jnl_fd, 0))
goto out;
return 0;
@@ -147,12 +148,17 @@ gf_history_changelog_start_fresh ()
ssize_t
gf_history_changelog_next_change (char *bufptr, size_t maxlen)
{
- ssize_t size = -1;
- int tracker_fd = 0;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
- char buffer[PATH_MAX] = {0,};
+ ssize_t size = -1;
+ int tracker_fd = 0;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
+ char buffer[PATH_MAX] = {0,};
+
+ if (maxlen > PATH_MAX) {
+ errno = ENAMETOOLONG;
+ goto out;
+ }
errno = EINVAL;
@@ -160,15 +166,15 @@ gf_history_changelog_next_change (char *bufptr, size_t maxlen)
if (!this)
goto out;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
goto out;
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc)
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl)
goto out;
- tracker_fd = hist_gfc->gfc_fd;
+ tracker_fd = hist_jnl->jnl_fd;
size = gf_readline (tracker_fd, buffer, maxlen);
if (size < 0) {
@@ -206,56 +212,60 @@ out:
ssize_t
gf_history_changelog_scan ()
{
- int ret = 0;
- int tracker_fd = 0;
- size_t len = 0;
- size_t off = 0;
- xlator_t *this = NULL;
- size_t nr_entries = 0;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
- struct dirent *entryp = NULL;
- struct dirent *result = NULL;
- char buffer[PATH_MAX] = {0,};
- static int is_last_scan = 0;
+ int ret = 0;
+ int tracker_fd = 0;
+ size_t len = 0;
+ size_t off = 0;
+ xlator_t *this = NULL;
+ size_t nr_entries = 0;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
+ struct dirent *entryp = NULL;
+ struct dirent *result = NULL;
+ char buffer[PATH_MAX] = {0,};
+ static int is_last_scan;
this = THIS;
if (!this)
goto out;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl)
goto out;
+ if (JNL_IS_API_DISCONNECTED (jnl)) {
+ errno = ENOTCONN;
+ goto out;
+ }
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc)
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl)
goto out;
retry:
if (is_last_scan == 1)
return 0;
- if (hist_gfc->hist_done == 0)
+ if (hist_jnl->hist_done == 0)
is_last_scan = 1;
errno = EINVAL;
- if (hist_gfc->hist_done == -1)
+ if (hist_jnl->hist_done == -1)
goto out;
- tracker_fd = hist_gfc->gfc_fd;
+ tracker_fd = hist_jnl->jnl_fd;
if (gf_ftruncate (tracker_fd, 0))
goto out;
len = offsetof (struct dirent, d_name)
- + pathconf (hist_gfc->gfc_processing_dir, _PC_NAME_MAX) + 1;
+ + pathconf (hist_jnl->jnl_processing_dir, _PC_NAME_MAX) + 1;
entryp = GF_CALLOC (1, len,
gf_changelog_mt_libgfchangelog_dirent_t);
if (!entryp)
goto out;
- rewinddir (hist_gfc->gfc_dir);
+ rewinddir (hist_jnl->jnl_dir);
while (1) {
- ret = readdir_r (hist_gfc->gfc_dir, entryp, &result);
+ ret = readdir_r (hist_jnl->jnl_dir, entryp, &result);
if (ret || !result)
break;
@@ -265,9 +275,9 @@ gf_history_changelog_scan ()
nr_entries++;
- GF_CHANGELOG_FILL_BUFFER (hist_gfc->gfc_processing_dir,
+ GF_CHANGELOG_FILL_BUFFER (hist_jnl->jnl_processing_dir,
buffer, off,
- strlen (hist_gfc->gfc_processing_dir));
+ strlen (hist_jnl->jnl_processing_dir));
GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer,
off, strlen (entryp->d_name));
GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1);
@@ -284,7 +294,8 @@ gf_history_changelog_scan ()
GF_FREE (entryp);
gf_log (this->name, GF_LOG_DEBUG,
- "hist_done %d, is_last_scan: %d", hist_gfc->hist_done, is_last_scan);
+ "hist_done %d, is_last_scan: %d", hist_jnl->hist_done,
+ is_last_scan);
if (!result) {
if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) {
@@ -490,7 +501,7 @@ gf_changelog_consume_wrap (void* data)
/* TODO: handle short reads and EOF. */
ret = gf_changelog_consume (ccd->this,
- ccd->gfc, ccd->changelog, _gf_true);
+ ccd->jnl, ccd->changelog, _gf_true);
if (ret) {
gf_log (this->name, GF_LOG_ERROR,
"could not parse changelog: %s", ccd->changelog);
@@ -515,8 +526,8 @@ void *
gf_history_consume (void * data)
{
xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
int ret = 0;
int iter = 0;
int fd = -1;
@@ -549,14 +560,14 @@ gf_history_consume (void * data)
goto out;
}
- gfc = (gf_changelog_t *) this->private;
- if (!gfc) {
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl) {
ret = -1;
goto out;
}
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc) {
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl) {
ret = -1;
goto out;
}
@@ -568,7 +579,7 @@ gf_history_consume (void * data)
curr = &ccd[iter];
curr->this = this;
- curr->gfc = hist_gfc;
+ curr->jnl = hist_jnl;
curr->fd = fd;
curr->offset = from * (len + 1);
@@ -613,7 +624,7 @@ gf_history_consume (void * data)
}
ret = gf_changelog_publish (curr->this,
- curr->gfc, curr->changelog);
+ curr->jnl, curr->changelog);
if (ret) {
publish = _gf_false;
gf_log (this->name, GF_LOG_ERROR,
@@ -623,7 +634,7 @@ gf_history_consume (void * data)
}
/* informing "parsing done". */
- hist_gfc->hist_done = (publish == _gf_true) ? 0 : -1;
+ hist_jnl->hist_done = (publish == _gf_true) ? 0 : -1;
out:
if (fd != -1)
@@ -740,8 +751,8 @@ gf_history_changelog (char* changelog_dir, unsigned long start,
unsigned long from = 0;
unsigned long total_changelog = 0;
xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
gf_changelog_history_data_t *hist_data = NULL;
DIR *dirp = NULL;
struct dirent *dp = NULL;
@@ -762,14 +773,14 @@ gf_history_changelog (char* changelog_dir, unsigned long start,
goto out;
}
- gfc = (gf_changelog_t *) this->private;
- if (!gfc) {
+ jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this);
+ if (!jnl) {
ret = -1;
goto out;
}
- hist_gfc = (gf_changelog_t *) gfc->hist_gfc;
- if (!hist_gfc) {
+ hist_jnl = (gf_changelog_journal_t *) jnl->hist_jnl;
+ if (!hist_jnl) {
ret = -1;
goto out;
}
@@ -917,7 +928,7 @@ out:
return ret;
}
- hist_gfc->hist_done = 1;
+ hist_jnl->hist_done = 1;
*actual_end = ts2;
return ret;
diff --git a/xlators/features/changelog/src/Makefile.am b/xlators/features/changelog/src/Makefile.am
index 18c41e7d7d1..8712b9d059f 100644
--- a/xlators/features/changelog/src/Makefile.am
+++ b/xlators/features/changelog/src/Makefile.am
@@ -3,16 +3,24 @@ xlator_LTLIBRARIES = changelog.la
xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features
noinst_HEADERS = changelog-helpers.h changelog-mem-types.h changelog-rt.h \
- changelog-misc.h changelog-encoders.h changelog-notifier.h
+ changelog-rpc-common.h changelog-misc.h changelog-encoders.h \
+ changelog-rpc-common.h changelog-rpc.h changelog-ev-handle.h
changelog_la_LDFLAGS = -module -avoid-version
changelog_la_SOURCES = changelog.c changelog-rt.c changelog-helpers.c \
- changelog-encoders.c changelog-notifier.c changelog-barrier.c
-changelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+ changelog-encoders.c changelog-rpc.c changelog-barrier.c \
+ changelog-rpc-common.c changelog-ev-handle.c
+changelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \
+ $(top_builddir)/rpc/xdr/src/libgfxdr.la \
+ $(top_builddir)/rpc/rpc-lib/src/libgfrpc.la
-AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src -fPIC -D_FILE_OFFSET_BITS=64 \
- -D_GNU_SOURCE -D$(GF_HOST_OS) -DDATADIR=\"$(localstatedir)\"
+AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \
+ -I$(top_srcdir)/rpc/xdr/src -I$(top_srcdir)/rpc/rpc-lib/src \
+ -I$(top_srcdir)/rpc/rpc-transport/socket/src \
+ -I$(top_srcdir)/xlators/features/changelog/lib/src/ \
+ -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -D$(GF_HOST_OS) \
+ -DDATADIR=\"$(localstatedir)\"
AM_CFLAGS = -Wall $(GF_CFLAGS)
diff --git a/xlators/features/changelog/src/changelog-encoders.c b/xlators/features/changelog/src/changelog-encoders.c
index 08626ee2f22..ea9db4061ca 100644
--- a/xlators/features/changelog/src/changelog-encoders.c
+++ b/xlators/features/changelog/src/changelog-encoders.c
@@ -191,7 +191,7 @@ cb_encoder[] = {
};
void
-changelog_encode_change( changelog_priv_t * priv)
+changelog_encode_change(changelog_priv_t *priv)
{
priv->ce = &cb_encoder[priv->encode_mode];
}
diff --git a/xlators/features/changelog/src/changelog-ev-handle.c b/xlators/features/changelog/src/changelog-ev-handle.c
new file mode 100644
index 00000000000..ca7443cfd22
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-ev-handle.c
@@ -0,0 +1,382 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-ev-handle.h"
+#include "changelog-rpc-common.h"
+#include "changelog-helpers.h"
+
+struct rpc_clnt_program changelog_ev_program;
+
+#define NR_IOVEC (MAX_IOVEC - 3)
+struct ev_rpc_vec {
+ int count;
+ struct iovec vector[NR_IOVEC];
+
+ /* sequence number */
+ unsigned long seq;
+};
+
+struct ev_rpc {
+ rbuf_list_t *rlist;
+ struct rpc_clnt *rpc;
+ struct ev_rpc_vec vec;
+};
+
+/**
+ * As of now this just does the minimal (retval logging). Going further
+ * un-acknowledges sequence numbers can be retransmitted and other
+ * intelligence can be built into the server.
+ */
+int
+changelog_event_dispatch_cbk (struct rpc_req *req,
+ struct iovec *iov, int count, void *myframe)
+{
+ return 0;
+}
+
+/* dispatcher RPC */
+inline int
+changelog_dispatch_vec (call_frame_t *frame, xlator_t *this,
+ struct rpc_clnt *rpc, struct ev_rpc_vec *vec)
+{
+ struct timeval tv = {0,};
+ changelog_event_req req = {0,};
+
+ (void) gettimeofday (&tv, NULL);
+
+ /**
+ * Event dispatch RPC header contains a sequence number for each
+ * dispatch. This allows the reciever to order the request before
+ * processing.
+ */
+ req.seq = vec->seq;
+ req.tv_sec = tv.tv_sec;
+ req.tv_usec = tv.tv_usec;
+
+ return changelog_rpc_sumbit_req (rpc, (void *)&req,
+ frame, &changelog_ev_program,
+ CHANGELOG_REV_PROC_EVENT,
+ vec->vector, vec->count, NULL,
+ this, changelog_event_dispatch_cbk,
+ (xdrproc_t) xdr_changelog_event_req);
+ }
+
+ int
+ changelog_event_dispatch_rpc (call_frame_t *frame, xlator_t *this, void *data)
+ {
+ int idx = 0;
+ int count = 0;
+ int ret = 0;
+ unsigned long range = 0;
+ unsigned long sequence = 0;
+ rbuf_iovec_t *rvec = NULL;
+ struct ev_rpc *erpc = NULL;
+ struct rlist_iter riter = {{0,},};
+
+ /* dispatch NR_IOVEC IO vectors at a time. */
+
+ erpc = data;
+ RLIST_GET_SEQ (erpc->rlist, sequence, range);
+
+ rlist_iter_init (&riter, erpc->rlist);
+
+ rvec_for_each_entry (rvec, &riter) {
+ idx = count % NR_IOVEC;
+ if (++count == NR_IOVEC) {
+ erpc->vec.vector[idx] = rvec->iov;
+ erpc->vec.seq = sequence++;
+ erpc->vec.count = NR_IOVEC;
+
+ ret = changelog_dispatch_vec (frame, this,
+ erpc->rpc, &erpc->vec);
+ if (ret)
+ break;
+ count = 0;
+ continue;
+ }
+
+ erpc->vec.vector[idx] = rvec->iov;
+ }
+
+ if (ret)
+ goto error_return;
+
+ idx = count % NR_IOVEC;
+ if (idx) {
+ erpc->vec.seq = sequence;
+ erpc->vec.count = idx;
+
+ ret = changelog_dispatch_vec (frame, this,
+ erpc->rpc, &erpc->vec);
+ }
+
+ error_return:
+ return ret;
+}
+
+int
+changelog_rpc_notify (struct rpc_clnt *rpc,
+ void *mydata, rpc_clnt_event_t event, void *data)
+{
+ xlator_t *this = NULL;
+ changelog_rpc_clnt_t *crpc = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_ev_selector_t *selection = NULL;
+
+ crpc = mydata;
+ this = crpc->this;
+ c_clnt = crpc->c_clnt;
+
+ priv = this->private;
+
+ switch (event) {
+ case RPC_CLNT_CONNECT:
+ rpc_clnt_set_connected (&rpc->conn);
+ selection = &priv->ev_selection;
+
+ LOCK (&c_clnt->wait_lock);
+ {
+ LOCK (&c_clnt->active_lock);
+ {
+ changelog_select_event (this, selection,
+ crpc->filter);
+ list_move_tail (&crpc->list, &c_clnt->active);
+ }
+ UNLOCK (&c_clnt->active_lock);
+ }
+ UNLOCK (&c_clnt->wait_lock);
+
+ break;
+ case RPC_CLNT_DISCONNECT:
+ rpc_clnt_disable (crpc->rpc);
+ selection = &priv->ev_selection;
+
+ LOCK (&crpc->lock);
+ {
+ changelog_deselect_event (this, selection,
+ crpc->filter);
+ changelog_set_disconnect_flag (crpc, _gf_true);
+ }
+ UNLOCK (&crpc->lock);
+
+ break;
+ case RPC_CLNT_MSG:
+ case RPC_CLNT_DESTROY:
+ break;
+ }
+
+ return 0;
+}
+
+void *
+changelog_ev_connector (void *data)
+{
+ xlator_t *this = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ changelog_rpc_clnt_t *crpc = NULL;
+
+ c_clnt = data;
+ this = c_clnt->this;
+
+ while (1) {
+ pthread_mutex_lock (&c_clnt->pending_lock);
+ {
+ while (list_empty (&c_clnt->pending))
+ pthread_cond_wait (&c_clnt->pending_cond,
+ &c_clnt->pending_lock);
+ crpc = list_first_entry (&c_clnt->pending,
+ changelog_rpc_clnt_t, list);
+ crpc->rpc =
+ changelog_rpc_client_init (this, crpc,
+ crpc->sock,
+ changelog_rpc_notify);
+ if (!crpc->rpc) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to "
+ "connect back.. <%s>", crpc->sock);
+ crpc->cleanup (crpc);
+ goto mutex_unlock;
+ }
+
+ LOCK (&c_clnt->wait_lock);
+ {
+ list_move_tail (&crpc->list, &c_clnt->waitq);
+ }
+ UNLOCK (&c_clnt->wait_lock);
+ }
+ mutex_unlock:
+ pthread_mutex_unlock (&c_clnt->pending_lock);
+ }
+
+ return NULL;
+}
+
+void
+changelog_ev_cleanup_connections (xlator_t *this, changelog_clnt_t *c_clnt)
+{
+ int ret = 0;
+ changelog_rpc_clnt_t *crpc = NULL;
+
+ /* cleanup active connections */
+ LOCK (&c_clnt->active_lock);
+ {
+ list_for_each_entry (crpc, &c_clnt->active, list) {
+ rpc_clnt_disable (crpc->rpc);
+ }
+ }
+ UNLOCK (&c_clnt->active_lock);
+}
+
+/**
+ * TODO: granularize lock
+ *
+ * If we have multiple threads dispatching events, doing it this way is
+ * a performance bottleneck.
+ */
+
+static inline changelog_rpc_clnt_t *
+get_client (changelog_clnt_t *c_clnt, struct list_head **next)
+{
+ changelog_rpc_clnt_t *crpc = NULL;
+
+ LOCK (&c_clnt->active_lock);
+ {
+ if (*next == &c_clnt->active)
+ goto unblock;
+ crpc = list_entry (*next, changelog_rpc_clnt_t, list);
+ changelog_rpc_clnt_ref (crpc);
+ *next = (*next)->next;
+ }
+ unblock:
+ UNLOCK (&c_clnt->active_lock);
+
+ return crpc;
+}
+
+static inline void
+put_client (changelog_clnt_t *c_clnt, changelog_rpc_clnt_t *crpc)
+{
+ LOCK (&c_clnt->active_lock);
+ {
+ changelog_rpc_clnt_unref (crpc);
+ }
+ UNLOCK (&c_clnt->active_lock);
+}
+
+void
+_dispatcher (rbuf_list_t *rlist, void *arg)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ changelog_rpc_clnt_t *crpc = NULL;
+ changelog_rpc_clnt_t *tmp = NULL;
+ struct ev_rpc erpc = {0,};
+ struct list_head *next = NULL;
+
+ c_clnt = arg;
+ this = c_clnt->this;
+
+ erpc.rlist = rlist;
+ next = c_clnt->active.next;
+
+ while (1) {
+ crpc = get_client (c_clnt, &next);
+ if (!crpc)
+ break;
+ erpc.rpc = crpc->rpc;
+ ret = changelog_invoke_rpc (this, crpc->rpc,
+ &changelog_ev_program,
+ CHANGELOG_REV_PROC_EVENT, &erpc);
+ put_client (c_clnt, crpc);
+ }
+}
+
+/** this is called under rotbuff's lock */
+void
+sequencer (rbuf_list_t *rlist, void *mydata)
+{
+ unsigned long range = 0;
+ changelog_clnt_t *c_clnt = 0;
+
+ c_clnt = mydata;
+
+ range = (RLIST_ENTRY_COUNT (rlist)) / NR_IOVEC;
+ if ((RLIST_ENTRY_COUNT (rlist)) % NR_IOVEC)
+ range++;
+ RLIST_STORE_SEQ (rlist, c_clnt->sequence, range);
+
+ c_clnt->sequence += range;
+}
+
+void *
+changelog_ev_dispatch (void *data)
+{
+ int ret = 0;
+ void *opaque = NULL;
+ xlator_t *this = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ struct timeval tv = {0,};
+
+ c_clnt = data;
+ this = c_clnt->this;
+
+ while (1) {
+ /* TODO: change this to be pthread cond based.. later */
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ select (0, NULL, NULL, NULL, &tv);
+
+ ret = rbuf_get_buffer (c_clnt->rbuf,
+ &opaque, sequencer, c_clnt);
+ if (ret != RBUF_CONSUMABLE) {
+ if (ret != RBUF_EMPTY)
+ gf_log (this->name, GF_LOG_WARNING,
+ "Failed to get buffer for RPC dispatch "
+ "[rbuf retval: %d]", ret);
+ continue;
+ }
+
+ ret = rbuf_wait_for_completion (c_clnt->rbuf,
+ opaque, _dispatcher, c_clnt);
+ if (ret)
+ gf_log (this->name, GF_LOG_WARNING,
+ "failed to put buffer after consumption");
+ }
+
+ return NULL;
+}
+
+void
+changelog_ev_queue_connection (changelog_clnt_t *c_clnt,
+ changelog_rpc_clnt_t *crpc)
+{
+ pthread_mutex_lock (&c_clnt->pending_lock);
+ {
+ list_add_tail (&crpc->list, &c_clnt->pending);
+ pthread_cond_signal (&c_clnt->pending_cond);
+ }
+ pthread_mutex_unlock (&c_clnt->pending_lock);
+}
+
+struct rpc_clnt_procedure changelog_ev_procs[CHANGELOG_REV_PROC_MAX] = {
+ [CHANGELOG_REV_PROC_NULL] = {"NULL", NULL},
+ [CHANGELOG_REV_PROC_EVENT] = {
+ "EVENT DISPATCH", changelog_event_dispatch_rpc
+ },
+};
+
+struct rpc_clnt_program changelog_ev_program = {
+ .progname = "CHANGELOG EVENT DISPATCHER",
+ .prognum = CHANGELOG_REV_RPC_PROCNUM,
+ .progver = CHANGELOG_REV_RPC_PROCVER,
+ .numproc = CHANGELOG_REV_PROC_MAX,
+ .proctable = changelog_ev_procs,
+};
diff --git a/xlators/features/changelog/src/changelog-ev-handle.h b/xlators/features/changelog/src/changelog-ev-handle.h
new file mode 100644
index 00000000000..eef0492a9ee
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-ev-handle.h
@@ -0,0 +1,140 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __CHANGELOG_EV_HANDLE_H
+#define __CHANGELOG_EV_HANDLE_H
+
+#include "list.h"
+#include "xlator.h"
+#include "rpc-clnt.h"
+
+#include "rot-buffs.h"
+
+struct changelog_clnt;
+
+typedef struct changelog_rpc_clnt {
+ xlator_t *this;
+
+ gf_lock_t lock;
+
+ unsigned long ref;
+ gf_boolean_t disconnected;
+
+ unsigned int filter;
+ char sock[UNIX_PATH_MAX];
+
+ struct changelog_clnt *c_clnt; /* back pointer to list holder */
+
+ struct rpc_clnt *rpc; /* RPC client endpoint */
+
+ struct list_head list; /* ->pending, ->waitq, ->active */
+
+ void (*cleanup)
+ (struct changelog_rpc_clnt *); /* cleanup handler */
+} changelog_rpc_clnt_t;
+
+static inline void
+changelog_rpc_clnt_ref (changelog_rpc_clnt_t *crpc)
+{
+ LOCK (&crpc->lock);
+ {
+ ++crpc->ref;
+ }
+ UNLOCK (&crpc->lock);
+}
+
+static inline void
+changelog_set_disconnect_flag (changelog_rpc_clnt_t *crpc, gf_boolean_t flag)
+{
+ crpc->disconnected = flag;
+}
+
+static inline int
+changelog_rpc_clnt_is_disconnected (changelog_rpc_clnt_t *crpc)
+{
+ return (crpc->disconnected == _gf_true);
+}
+
+static inline void
+changelog_rpc_clnt_unref (changelog_rpc_clnt_t *crpc)
+{
+ gf_boolean_t gone = _gf_false;
+
+ LOCK (&crpc->lock);
+ {
+ if (!(--crpc->ref)
+ && changelog_rpc_clnt_is_disconnected (crpc)) {
+ list_del (&crpc->list);
+ gone = _gf_true;
+ }
+ }
+ UNLOCK (&crpc->lock);
+
+ if (gone)
+ crpc->cleanup (crpc);
+}
+
+/**
+ * This structure holds pending and active clients. On probe RPC all
+ * an instance of the above structure (@changelog_rpc_clnt) is placed
+ * in ->pending and gets moved to ->active on a successful connect.
+ *
+ * locking rules:
+ *
+ * Manipulating ->pending
+ * ->pending_lock
+ * ->pending
+ *
+ * Manipulating ->active
+ * ->active_lock
+ * ->active
+ *
+ * Moving object from ->pending to ->active
+ * ->pending_lock
+ * ->active_lock
+ *
+ * Objects are _never_ moved from ->active to ->pending, i.e., during
+ * disconnection, the object is destroyed. Well, we could have tried
+ * to reconnect, but that's pure waste.. let the other end reconnect.
+ */
+
+typedef struct changelog_clnt {
+ xlator_t *this;
+
+ /* pending connections */
+ pthread_mutex_t pending_lock;
+ pthread_cond_t pending_cond;
+ struct list_head pending;
+
+ /* current active connections */
+ gf_lock_t active_lock;
+ struct list_head active;
+
+ gf_lock_t wait_lock;
+ struct list_head waitq;
+
+ /* consumer part of rot-buffs */
+ rbuf_t *rbuf;
+ unsigned long sequence;
+} changelog_clnt_t;
+
+void *changelog_ev_connector (void *);
+
+void *changelog_ev_dispatch (void *);
+
+/* APIs */
+void
+changelog_ev_queue_connection (changelog_clnt_t *, changelog_rpc_clnt_t *);
+
+void
+changelog_ev_cleanup_connections (xlator_t *, changelog_clnt_t *);
+
+#endif
+
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c
index 3af2938190d..5c755d76d69 100644
--- a/xlators/features/changelog/src/changelog-helpers.c
+++ b/xlators/features/changelog/src/changelog-helpers.c
@@ -24,6 +24,7 @@
#include "changelog-mem-types.h"
#include "changelog-encoders.h"
+#include "changelog-rpc-common.h"
#include <pthread.h>
static inline void
@@ -57,7 +58,7 @@ changelog_cleanup_free_mutex (void *arg_mutex)
pthread_mutex_unlock(p_mutex);
}
-void
+int
changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)
{
int ret = 0;
@@ -65,7 +66,7 @@ changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)
/* send a cancel request to the thread */
ret = pthread_cancel (thr_id);
- if (ret) {
+ if (ret != 0) {
gf_log (this->name, GF_LOG_ERROR,
"could not cancel thread (reason: %s)",
strerror (errno));
@@ -73,14 +74,14 @@ changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)
}
ret = pthread_join (thr_id, &retval);
- if (ret || (retval != PTHREAD_CANCELED)) {
+ if ((ret != 0) || (retval != PTHREAD_CANCELED)) {
gf_log (this->name, GF_LOG_ERROR,
"cancel request not adhered as expected"
" (reason: %s)", strerror (errno));
}
out:
- return;
+ return ret;
}
inline void *
@@ -98,6 +99,145 @@ changelog_get_usable_buffer (changelog_local_t *local)
return cld->cld_iobuf->ptr;
}
+static inline int
+changelog_selector_index (unsigned int selector)
+{
+ return (ffs (selector) - 1);
+}
+
+inline int
+changelog_ev_selected (xlator_t *this,
+ changelog_ev_selector_t *selection,
+ unsigned int selector)
+{
+ int idx = 0;
+
+ idx = changelog_selector_index (selector);
+ gf_log (this->name, GF_LOG_DEBUG,
+ "selector ref count for %d (idx: %d): %d",
+ selector, idx, selection->ref[idx]);
+ /* this can be lockless */
+ return (idx < CHANGELOG_EV_SELECTION_RANGE
+ && (selection->ref[idx] > 0));
+}
+
+inline void
+changelog_select_event (xlator_t *this,
+ changelog_ev_selector_t *selection,
+ unsigned int selector)
+{
+ int idx = 0;
+
+ LOCK (&selection->reflock);
+ {
+ while (selector) {
+ idx = changelog_selector_index (selector);
+ if (idx < CHANGELOG_EV_SELECTION_RANGE) {
+ selection->ref[idx]++;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "selecting event %d", idx);
+ }
+ selector &= ~(1 << idx);
+ }
+ }
+ UNLOCK (&selection->reflock);
+}
+
+inline void
+changelog_deselect_event (xlator_t *this,
+ changelog_ev_selector_t *selection,
+ unsigned int selector)
+{
+ int idx = 0;
+
+ LOCK (&selection->reflock);
+ {
+ while (selector) {
+ idx = changelog_selector_index (selector);
+ if (idx < CHANGELOG_EV_SELECTION_RANGE) {
+ selection->ref[idx]--;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "de-selecting event %d", idx);
+ }
+ selector &= ~(1 << idx);
+ }
+ }
+ UNLOCK (&selection->reflock);
+}
+
+inline int
+changelog_init_event_selection (xlator_t *this,
+ changelog_ev_selector_t *selection)
+{
+ int ret = 0;
+ int j = CHANGELOG_EV_SELECTION_RANGE;
+
+ ret = LOCK_INIT (&selection->reflock);
+ if (ret != 0)
+ return -1;
+
+ LOCK (&selection->reflock);
+ {
+ while (j--) {
+ selection->ref[j] = 0;
+ }
+ }
+ UNLOCK (&selection->reflock);
+
+ return 0;
+}
+
+inline int
+changelog_cleanup_event_selection (xlator_t *this,
+ changelog_ev_selector_t *selection)
+{
+ int ret = 0;
+ int j = CHANGELOG_EV_SELECTION_RANGE;
+
+ LOCK (&selection->reflock);
+ {
+ while (j--) {
+ if (selection->ref[j] > 0)
+ gf_log (this->name, GF_LOG_WARNING,
+ "changelog event selection cleaning up "
+ " on active references");
+ }
+ }
+ UNLOCK (&selection->reflock);
+
+ return LOCK_DESTROY (&selection->reflock);
+}
+
+static inline void
+changelog_perform_dispatch (xlator_t *this,
+ changelog_priv_t *priv, void *mem, size_t size)
+{
+ char *buf = NULL;
+ void *opaque = NULL;
+
+ buf = rbuf_reserve_write_area (priv->rbuf, size, &opaque);
+ if (!buf) {
+ gf_log_callingfn (this->name,
+ GF_LOG_WARNING, "failed to dispatch event");
+ return;
+ }
+
+ memcpy (buf, mem, size);
+ rbuf_write_complete (opaque);
+}
+
+inline void
+changelog_dispatch_event (xlator_t *this,
+ changelog_priv_t *priv, changelog_event_t *ev)
+{
+ changelog_ev_selector_t *selection = NULL;
+
+ selection = &priv->ev_selection;
+ if (changelog_ev_selected (this, selection, ev->ev_type)) {
+ changelog_perform_dispatch (this, priv, ev, CHANGELOG_EV_SIZE);
+ }
+}
+
inline void
changelog_set_usable_record_and_length (changelog_local_t *local,
size_t len, int xr)
@@ -206,9 +346,9 @@ changelog_rollover_changelog (xlator_t *this,
{
int ret = -1;
int notify = 0;
- char *bname = NULL;
char ofile[PATH_MAX] = {0,};
char nfile[PATH_MAX] = {0,};
+ changelog_event_t ev = {0,};
if (priv->changelog_fd != -1) {
ret = fsync (priv->changelog_fd);
@@ -252,40 +392,32 @@ changelog_rollover_changelog (xlator_t *this,
}
if (notify) {
- bname = basename (nfile);
- gf_log (this->name, GF_LOG_DEBUG, "notifying: %s", bname);
- ret = changelog_write (priv->wfd, bname, strlen (bname) + 1);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "Failed to send file name to notify thread"
- " (reason: %s)", strerror (errno));
- } else {
- /* If this is explicit rollover initiated by snapshot,
- * wakeup reconfigure thread waiting for changelog to
- * rollover
- */
- if (priv->explicit_rollover) {
- priv->explicit_rollover = _gf_false;
- ret = pthread_mutex_lock (
- &priv->bn.bnotify_mutex);
- CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
- {
- priv->bn.bnotify = _gf_false;
- ret = pthread_cond_signal (
- &priv->bn.bnotify_cond);
- CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret,
- out);
- gf_log (this->name, GF_LOG_INFO,
- "Changelog published: %s and"
- " signalled bnotify", bname);
- }
- ret = pthread_mutex_unlock (
- &priv->bn.bnotify_mutex);
+ ev.ev_type = CHANGELOG_OP_TYPE_JOURNAL;
+ memcpy (ev.u.journal.path, nfile, strlen (nfile) + 1);
+ changelog_dispatch_event (this, priv, &ev);
+
+ /* If this is explicit rollover initiated by snapshot,
+ * wakeup reconfigure thread waiting for changelog to
+ * rollover
+ */
+ if (priv->explicit_rollover) {
+ priv->explicit_rollover = _gf_false;
+
+ ret = pthread_mutex_lock (&priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ {
+ priv->bn.bnotify = _gf_false;
+ ret = pthread_cond_signal
+ (&priv->bn.bnotify_cond);
CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+ gf_log (this->name, GF_LOG_INFO,
+ "Changelog published: %s signalled"
+ " bnotify", nfile);
}
+ ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex);
+ CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
}
}
-
out:
return ret;
}
@@ -434,8 +566,8 @@ changelog_snap_logging_stop (xlator_t *this,
}
int
-changelog_open (xlator_t *this,
- changelog_priv_t *priv)
+changelog_open_journal (xlator_t *this,
+ changelog_priv_t *priv)
{
int fd = 0;
int ret = -1;
@@ -490,7 +622,7 @@ changelog_start_next_change (xlator_t *this,
ret = changelog_rollover_changelog (this, priv, ts);
if (!ret && !finale)
- ret = changelog_open (this, priv);
+ ret = changelog_open_journal (this, priv);
return ret;
}
@@ -975,7 +1107,7 @@ __changelog_inode_ctx_set (xlator_t *this,
* one shot routine to get the address and the value of a inode version
* for a particular type.
*/
-static changelog_inode_ctx_t *
+changelog_inode_ctx_t *
__changelog_inode_ctx_get (xlator_t *this,
inode_t *inode, unsigned long **iver,
unsigned long *version, changelog_log_type type)
diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h
index 03a795369d1..33a99ee4eed 100644
--- a/xlators/features/changelog/src/changelog-helpers.h
+++ b/xlators/features/changelog/src/changelog-helpers.h
@@ -15,10 +15,16 @@
#include "timer.h"
#include "pthread.h"
#include "iobuf.h"
+#include "rot-buffs.h"
#include "changelog-misc.h"
#include "call-stub.h"
+#include "rpcsvc.h"
+#include "changelog-ev-handle.h"
+
+#include "changelog.h"
+
/**
* the changelog entry
*/
@@ -120,29 +126,6 @@ typedef struct changelog_fsync {
xlator_t *this;
} changelog_fsync_t;
-# define CHANGELOG_MAX_CLIENTS 5
-typedef struct changelog_notify {
- /* reader end of the pipe */
- int rfd;
-
- /* notifier thread */
- pthread_t notify_th;
-
- /* unique socket path */
- char sockpath[UNIX_PATH_MAX];
-
- int socket_fd;
-
- /**
- * simple array of accept()'ed fds. Not scalable at all
- * for large number of clients, but it's okay as we have
- * a ahrd limit in this version (@CHANGELOG_MAX_CLIENTS).
- */
- int client_fd[CHANGELOG_MAX_CLIENTS];
-
- xlator_t *this;
-} changelog_notify_t;
-
/* Draining during changelog rollover (for geo-rep snapshot dependency):
* --------------------------------------------------------------------
* The introduction of draining of in-transit fops during changelog rollover
@@ -162,14 +145,14 @@ typedef struct changelog_notify {
typedef enum chlog_fop_color {
FOP_COLOR_BLACK,
FOP_COLOR_WHITE
-}chlog_fop_color_t;
+} chlog_fop_color_t;
/* Barrier notify variable */
typedef struct barrier_notify {
pthread_mutex_t bnotify_mutex;
pthread_cond_t bnotify_cond;
gf_boolean_t bnotify;
-}barrier_notify_t;
+} barrier_notify_t;
/* Two separate mutex and conditional variable set is used
* to drain white and black fops. */
@@ -185,15 +168,26 @@ typedef struct drain_mgmt {
unsigned long white_fop_cnt;
gf_boolean_t drain_wait_black;
gf_boolean_t drain_wait_white;
-}drain_mgmt_t;
+} drain_mgmt_t;
/* External barrier as a result of snap on/off indicating flag*/
typedef struct barrier_flags {
gf_lock_t lock;
gf_boolean_t barrier_ext;
-}barrier_flags_t;
+} barrier_flags_t;
+/* Event selection */
+typedef struct changelog_ev_selector {
+ gf_lock_t reflock;
+ /**
+ * Array of references for each selection bit.
+ */
+ unsigned int ref[CHANGELOG_EV_SELECTION_RANGE];
+} changelog_ev_selector_t;
+
+
+/* changelog's private structure */
struct changelog_priv {
gf_boolean_t active;
@@ -223,9 +217,6 @@ struct changelog_priv {
/* lock to synchronize CSNAP updation */
gf_lock_t c_snap_lock;
- /* writen end of the pipe */
- int wfd;
-
/* rollover time */
int32_t rollover_time;
@@ -247,9 +238,6 @@ struct changelog_priv {
/* context of fsync thread */
changelog_fsync_t cf;
- /* context of the notifier thread */
- changelog_notify_t cn;
-
/* operation mode */
changelog_mode_t op_mode;
@@ -262,7 +250,9 @@ struct changelog_priv {
/* encoder */
struct changelog_encoder *ce;
- /* snapshot dependency changes */
+ /**
+ * snapshot dependency changes
+ */
/* Draining of fops*/
drain_mgmt_t dm;
@@ -289,6 +279,30 @@ struct changelog_priv {
gf_timer_t *timer;
struct timespec timeout;
+ /**
+ * buffers, RPC, event selection, notifications and other
+ * beasts.
+ */
+
+ /* epoll pthread */
+ pthread_t poller;
+
+ /* rotational buffer */
+ rbuf_t *rbuf;
+
+ /* changelog RPC server */
+ rpcsvc_t *rpc;
+
+ /* event selection */
+ changelog_ev_selector_t ev_selection;
+
+ /* client handling (reverse connection) */
+ pthread_t connector;
+
+ int nr_dispatchers;
+ pthread_t *ev_dispatcher;
+
+ changelog_clnt_t connections;
};
struct changelog_local {
@@ -367,7 +381,7 @@ typedef struct {
* helpers routines
*/
-void
+int
changelog_thread_cleanup (xlator_t *this, pthread_t thr_id);
void *
@@ -386,7 +400,7 @@ changelog_start_next_change (xlator_t *this,
changelog_priv_t *priv,
unsigned long ts, gf_boolean_t finale);
int
-changelog_open (xlator_t *this, changelog_priv_t *priv);
+changelog_open_journal (xlator_t *this, changelog_priv_t *priv);
int
changelog_fill_rollover_data (changelog_log_data_t *cld, gf_boolean_t is_last);
int
@@ -449,6 +463,7 @@ changelog_snap_handle_ascii_change (xlator_t *this,
changelog_log_data_t *cld);
int
changelog_snap_write_change (changelog_priv_t *priv, char *buffer, size_t len);
+
/* Changelog barrier routines */
void __chlog_barrier_enqueue (xlator_t *this, call_stub_t *stub);
void __chlog_barrier_disable (xlator_t *this, struct list_head *queue);
@@ -460,6 +475,24 @@ int32_t
changelog_fill_entry_buf (call_frame_t *frame, xlator_t *this,
loc_t *loc, changelog_local_t **local);
+/* event selection routines */
+inline void changelog_select_event (xlator_t *,
+ changelog_ev_selector_t *, unsigned int);
+inline void changelog_deselect_event (xlator_t *,
+ changelog_ev_selector_t *, unsigned int);
+inline int changelog_init_event_selection (xlator_t *,
+ changelog_ev_selector_t *);
+inline int changelog_cleanup_event_selection (xlator_t *,
+ changelog_ev_selector_t *);
+inline int changelog_ev_selected (xlator_t *,
+ changelog_ev_selector_t *, unsigned int);
+inline void
+changelog_dispatch_event (xlator_t *, changelog_priv_t *, changelog_event_t *);
+
+changelog_inode_ctx_t *
+__changelog_inode_ctx_get (xlator_t *, inode_t *, unsigned long **,
+ unsigned long *, changelog_log_type);
+
/* macros */
#define CHANGELOG_STACK_UNWIND(fop, frame, params ...) do { \
@@ -471,10 +504,10 @@ changelog_fill_entry_buf (call_frame_t *frame, xlator_t *this,
frame->local = NULL; \
} \
STACK_UNWIND_STRICT (fop, frame, params); \
- changelog_local_cleanup (__xl, __local); \
if (__local && __local->prev_entry) \
changelog_local_cleanup (__xl, \
__local->prev_entry); \
+ changelog_local_cleanup (__xl, __local); \
} while (0)
#define CHANGELOG_IOBUF_REF(iobuf) do { \
diff --git a/xlators/features/changelog/src/changelog-mem-types.h b/xlators/features/changelog/src/changelog-mem-types.h
index e1fa319a715..1618f722f6c 100644
--- a/xlators/features/changelog/src/changelog-mem-types.h
+++ b/xlators/features/changelog/src/changelog-mem-types.h
@@ -14,16 +14,21 @@
#include "mem-types.h"
enum gf_changelog_mem_types {
- gf_changelog_mt_priv_t = gf_common_mt_end + 1,
- gf_changelog_mt_str_t = gf_common_mt_end + 2,
- gf_changelog_mt_batch_t = gf_common_mt_end + 3,
- gf_changelog_mt_rt_t = gf_common_mt_end + 4,
- gf_changelog_mt_inode_ctx_t = gf_common_mt_end + 5,
- gf_changelog_mt_libgfchangelog_t = gf_common_mt_end + 6,
- gf_changelog_mt_libgfchangelog_rl_t = gf_common_mt_end + 7,
- gf_changelog_mt_libgfchangelog_dirent_t = gf_common_mt_end + 8,
- gf_changelog_mt_changelog_buffer_t = gf_common_mt_end + 9,
- gf_changelog_mt_history_data_t = gf_common_mt_end + 10,
+ gf_changelog_mt_priv_t = gf_common_mt_end + 1,
+ gf_changelog_mt_str_t = gf_common_mt_end + 2,
+ gf_changelog_mt_batch_t = gf_common_mt_end + 3,
+ gf_changelog_mt_rt_t = gf_common_mt_end + 4,
+ gf_changelog_mt_inode_ctx_t = gf_common_mt_end + 5,
+ gf_changelog_mt_rpc_clnt_t = gf_common_mt_end + 6,
+ gf_changelog_mt_libgfchangelog_t = gf_common_mt_end + 7,
+ gf_changelog_mt_libgfchangelog_entry_t = gf_common_mt_end + 8,
+ gf_changelog_mt_libgfchangelog_rl_t = gf_common_mt_end + 9,
+ gf_changelog_mt_libgfchangelog_dirent_t = gf_common_mt_end + 10,
+ gf_changelog_mt_changelog_buffer_t = gf_common_mt_end + 11,
+ gf_changelog_mt_history_data_t = gf_common_mt_end + 12,
+ gf_changelog_mt_libgfchangelog_call_pool_t = gf_common_mt_end + 13,
+ gf_changelog_mt_libgfchangelog_event_t = gf_common_mt_end + 14,
+ gf_changelog_mt_ev_dispatcher_t = gf_common_mt_end + 15,
gf_changelog_mt_end
};
diff --git a/xlators/features/changelog/src/changelog-misc.h b/xlators/features/changelog/src/changelog-misc.h
index 58b10961463..b45302ad099 100644
--- a/xlators/features/changelog/src/changelog-misc.h
+++ b/xlators/features/changelog/src/changelog-misc.h
@@ -25,6 +25,7 @@
#define CHANGELOG_VERSION_MINOR 1
#define CHANGELOG_UNIX_SOCK DEFAULT_VAR_RUN_DIRECTORY"/changelog-%s.sock"
+#define CHANGELOG_TMP_UNIX_SOCK DEFAULT_VAR_RUN_DIRECTORY"/.%s%lu.sock"
/**
* header starts with the version and the format of the changelog.
@@ -42,6 +43,19 @@
CHANGELOG_UNIX_SOCK, md5_sum); \
} while (0)
+#define CHANGELOG_MAKE_TMP_SOCKET_PATH(brick_path, sockpath, len) do { \
+ unsigned long pid = 0; \
+ char md5_sum[MD5_DIGEST_LENGTH*2+1] = {0,}; \
+ pid = (unsigned long) getpid (); \
+ md5_wrapper((unsigned char *) brick_path, \
+ strlen(brick_path), \
+ md5_sum); \
+ (void) snprintf (sockpath, \
+ len, CHANGELOG_TMP_UNIX_SOCK, \
+ md5_sum, pid); \
+ } while (0)
+
+
/**
* ... used by libgfchangelog.
*/
diff --git a/xlators/features/changelog/src/changelog-notifier.c b/xlators/features/changelog/src/changelog-notifier.c
deleted file mode 100644
index 5f3d063a8ad..00000000000
--- a/xlators/features/changelog/src/changelog-notifier.c
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
- This file is part of GlusterFS.
-
- This file is licensed to you under your choice of the GNU Lesser
- General Public License, version 3 or any later version (LGPLv3 or
- later), or the GNU General Public License, version 2 (GPLv2), in all
- cases as published by the Free Software Foundation.
-*/
-
-#include "changelog-notifier.h"
-
-#include <pthread.h>
-
-inline static void
-changelog_notify_clear_fd (changelog_notify_t *cn, int i)
-{
- cn->client_fd[i] = -1;
-}
-
-inline static void
-changelog_notify_save_fd (changelog_notify_t *cn, int i, int fd)
-{
- cn->client_fd[i] = fd;
-}
-
-static int
-changelog_notify_insert_fd (xlator_t *this, changelog_notify_t *cn, int fd)
-{
- int i = 0;
- int ret = 0;
-
- for (; i < CHANGELOG_MAX_CLIENTS; i++) {
- if (cn->client_fd[i] == -1)
- break;
- }
-
- if (i == CHANGELOG_MAX_CLIENTS) {
- /**
- * this case should not be hit as listen() would limit
- * the number of completely established connections.
- */
- gf_log (this->name, GF_LOG_WARNING,
- "hit max client limit (%d)", CHANGELOG_MAX_CLIENTS);
- ret = -1;
- }
- else
- changelog_notify_save_fd (cn, i, fd);
-
- return ret;
-}
-
-static void
-changelog_notify_fill_rset (changelog_notify_t *cn, fd_set *rset, int *maxfd)
-{
- int i = 0;
-
- FD_ZERO (rset);
-
- FD_SET (cn->socket_fd, rset);
- *maxfd = cn->socket_fd;
-
- FD_SET (cn->rfd, rset);
- *maxfd = max (*maxfd, cn->rfd);
-
- for (; i < CHANGELOG_MAX_CLIENTS; i++) {
- if (cn->client_fd[i] != -1) {
- FD_SET (cn->client_fd[i], rset);
- *maxfd = max (*maxfd, cn->client_fd[i]);
- }
- }
-
- *maxfd = *maxfd + 1;
-}
-
-static int
-changelog_notify_client (changelog_notify_t *cn, char *path, ssize_t len)
-{
- int i = 0;
- int ret = 0;
-
- for (; i < CHANGELOG_MAX_CLIENTS; i++) {
- if (cn->client_fd[i] == -1)
- continue;
-
- if (changelog_write (cn->client_fd[i],
- path, len)) {
- ret = -1;
-
- close (cn->client_fd[i]);
- changelog_notify_clear_fd (cn, i);
- }
- }
-
- return ret;
-}
-
-static void
-changelog_notifier_init (changelog_notify_t *cn)
-{
- int i = 0;
-
- cn->socket_fd = -1;
-
- for (; i < CHANGELOG_MAX_CLIENTS; i++) {
- changelog_notify_clear_fd (cn, i);
- }
-}
-
-static void
-changelog_close_client_conn (changelog_notify_t *cn)
-{
- int i = 0;
-
- for (; i < CHANGELOG_MAX_CLIENTS; i++) {
- if (cn->client_fd[i] == -1)
- continue;
-
- close (cn->client_fd[i]);
- changelog_notify_clear_fd (cn, i);
- }
-}
-
-static void
-changelog_notifier_cleanup (void *arg)
-{
- changelog_notify_t *cn = NULL;
-
- cn = (changelog_notify_t *) arg;
-
- changelog_close_client_conn (cn);
-
- if (cn->socket_fd != -1)
- close (cn->socket_fd);
-
- if (cn->rfd)
- close (cn->rfd);
-
- if (unlink (cn->sockpath))
- gf_log ("", GF_LOG_WARNING,
- "could not unlink changelog socket file"
- " %s (reason: %s", cn->sockpath, strerror (errno));
-}
-
-void *
-changelog_notifier (void *data)
-{
- int i = 0;
- int fd = 0;
- int max_fd = 0;
- int len = 0;
- ssize_t readlen = 0;
- xlator_t *this = NULL;
- changelog_priv_t *priv = NULL;
- changelog_notify_t *cn = NULL;
- struct sockaddr_un local = {0,};
- char path[PATH_MAX] = {0,};
- char abspath[PATH_MAX] = {0,};
-
- char buffer;
- fd_set rset;
-
- priv = (changelog_priv_t *) data;
-
- cn = &priv->cn;
- this = cn->this;
-
- pthread_cleanup_push (changelog_notifier_cleanup, cn);
-
- changelog_notifier_init (cn);
-
- cn->socket_fd = socket (AF_UNIX, SOCK_STREAM, 0);
- if (cn->socket_fd < 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "changelog socket error (reason: %s)",
- strerror (errno));
- goto out;
- }
-
- CHANGELOG_MAKE_SOCKET_PATH (priv->changelog_brick,
- cn->sockpath, UNIX_PATH_MAX);
- if (unlink (cn->sockpath) < 0) {
- if (errno != ENOENT) {
- gf_log (this->name, GF_LOG_ERROR,
- "Could not unlink changelog socket file (%s)"
- " (reason: %s)",
- CHANGELOG_UNIX_SOCK, strerror (errno));
- goto cleanup;
- }
- }
-
- local.sun_family = AF_UNIX;
- strcpy (local.sun_path, cn->sockpath);
-
- len = strlen (local.sun_path) + sizeof (local.sun_family);
-
- /* bind to the unix domain socket */
- if (bind (cn->socket_fd, (struct sockaddr *) &local, len) < 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "Could not bind to changelog socket (reason: %s)",
- strerror (errno));
- goto cleanup;
- }
-
- /* listen for incoming connections */
- if (listen (cn->socket_fd, CHANGELOG_MAX_CLIENTS) < 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "listen() error on changelog socket (reason: %s)",
- strerror (errno));
- goto cleanup;
- }
-
- /**
- * simple select() on all to-be-read file descriptors. This method
- * though old school works pretty well when you have a handfull of
- * fd's to be watched (clients).
- *
- * Future TODO: move this to epoll based notification facility if
- * number of clients increase.
- */
- for (;;) {
- changelog_notify_fill_rset (cn, &rset, &max_fd);
-
- if (select (max_fd, &rset, NULL, NULL, NULL) < 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "select() returned -1 (reason: %s)",
- strerror (errno));
- sleep (2);
- continue;
- }
-
- if (FD_ISSET (cn->socket_fd, &rset)) {
- fd = accept (cn->socket_fd, NULL, NULL);
- if (fd < 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "accept error on changelog socket"
- " (reason: %s)", strerror (errno));
- } else if (changelog_notify_insert_fd (this, cn, fd)) {
- gf_log (this->name, GF_LOG_ERROR,
- "hit max client limit");
- }
- }
-
- if (FD_ISSET (cn->rfd, &rset)) {
- /**
- * read changelog filename and notify all connected
- * clients.
- */
- readlen = 0;
- while (readlen < PATH_MAX) {
- len = read (cn->rfd, &path[readlen++], 1);
- if (len == -1) {
- break;
- }
-
- if (len == 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "rollover thread sent EOF"
- " on pipe - possibly a crash.");
- /* be blunt and close all connections */
- pthread_exit(NULL);
- }
-
- if (path[readlen - 1] == '\0')
- break;
- }
-
- /* should we close all client connections here too? */
- if (len < 0 || readlen == PATH_MAX) {
- gf_log (this->name, GF_LOG_ERROR,
- "Could not get pathname from rollover"
- " thread or pathname too long");
- goto process_rest;
- }
-
- (void) snprintf (abspath, PATH_MAX,
- "%s/%s", priv->changelog_dir, path);
- if (changelog_notify_client (cn, abspath,
- strlen (abspath) + 1))
- gf_log (this->name, GF_LOG_ERROR,
- "could not notify some clients with new"
- " changelogs");
- }
-
- process_rest:
- for (i = 0; i < CHANGELOG_MAX_CLIENTS; i++) {
- if ( (fd = cn->client_fd[i]) == -1 )
- continue;
-
- if (FD_ISSET (fd, &rset)) {
- /**
- * the only data we accept from the client is a
- * disconnect. Anything else is treated as bogus
- * and is silently discarded (also warned!!!).
- */
- if ( (readlen = read (fd, &buffer, 1)) <= 0 ) {
- close (fd);
- changelog_notify_clear_fd (cn, i);
- } else {
- /* silently discard data and log */
- gf_log (this->name, GF_LOG_WARNING,
- "misbehaving changelog client");
- }
- }
- }
-
- }
-
- cleanup:;
- pthread_cleanup_pop (1);
-
- out:
- return NULL;
-}
diff --git a/xlators/features/changelog/src/changelog-rpc-common.c b/xlators/features/changelog/src/changelog-rpc-common.c
new file mode 100644
index 00000000000..76db6696ae8
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-rpc-common.c
@@ -0,0 +1,334 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-rpc-common.h"
+
+/**
+*****************************************************
+ Client Interface
+*****************************************************
+*/
+
+/**
+ * Initialize and return an RPC client object for a given unix
+ * domain socket.
+ */
+
+void *
+changelog_rpc_poller (void *arg)
+{
+ xlator_t *this = arg;
+
+ (void) event_dispatch (this->ctx->event_pool);
+ return NULL;
+}
+
+struct rpc_clnt *
+changelog_rpc_client_init (xlator_t *this, void *cbkdata,
+ char *sockfile, rpc_clnt_notify_t fn)
+{
+ int ret = 0;
+ struct rpc_clnt *rpc = NULL;
+ dict_t *options = NULL;
+
+ if (!cbkdata)
+ cbkdata = this;
+
+ options = dict_new ();
+ if (!options)
+ goto error_return;
+
+ ret = rpc_transport_unix_options_build (&options, sockfile, 0);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to build rpc options");
+ goto dealloc_dict;
+ }
+
+ rpc = rpc_clnt_new (options, this->ctx, this->name, 16);
+ if (!rpc)
+ goto dealloc_dict;
+
+ ret = rpc_clnt_register_notify (rpc, fn, cbkdata);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to register notify");
+ goto dealloc_rpc_clnt;
+ }
+
+ ret = rpc_clnt_start (rpc);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to start rpc");
+ goto dealloc_rpc_clnt;
+ }
+
+ return rpc;
+
+ dealloc_rpc_clnt:
+ rpc_clnt_unref (rpc);
+ dealloc_dict:
+ dict_unref (options);
+ error_return:
+ return NULL;
+}
+
+/**
+ * Generic RPC client routine to dispatch a request to an
+ * RPC server.
+ */
+int
+changelog_rpc_sumbit_req (struct rpc_clnt *rpc, void *req,
+ call_frame_t *frame, rpc_clnt_prog_t *prog,
+ int procnum, struct iovec *payload, int payloadcnt,
+ struct iobref *iobref, xlator_t *this,
+ fop_cbk_fn_t cbkfn, xdrproc_t xdrproc)
+{
+ int ret = 0;
+ int count = 0;
+ struct iovec iov = {0, };
+ struct iobuf *iobuf = NULL;
+ char new_iobref = 0;
+ ssize_t xdr_size = 0;
+
+ GF_ASSERT (this);
+
+ if (req) {
+ xdr_size = xdr_sizeof (xdrproc, req);
+
+ iobuf = iobuf_get2 (this->ctx->iobuf_pool, xdr_size);
+ if (!iobuf) {
+ goto out;
+ };
+
+ if (!iobref) {
+ iobref = iobref_new ();
+ if (!iobref) {
+ goto out;
+ }
+
+ new_iobref = 1;
+ }
+
+ iobref_add (iobref, iobuf);
+
+ iov.iov_base = iobuf->ptr;
+ iov.iov_len = iobuf_size (iobuf);
+
+ /* Create the xdr payload */
+ ret = xdr_serialize_generic (iov, req, xdrproc);
+ if (ret == -1) {
+ goto out;
+ }
+
+ iov.iov_len = ret;
+ count = 1;
+ }
+
+ ret = rpc_clnt_submit (rpc, prog, procnum, cbkfn, &iov, count,
+ payload, payloadcnt, iobref, frame, NULL,
+ 0, NULL, 0, NULL);
+
+ out:
+ if (new_iobref)
+ iobref_unref (iobref);
+ if (iobuf)
+ iobuf_unref (iobuf);
+ return ret;
+}
+
+/**
+ * Entry point to perform a remote procedure call
+ */
+int
+changelog_invoke_rpc (xlator_t *this, struct rpc_clnt *rpc,
+ rpc_clnt_prog_t *prog, int procidx, void *arg)
+{
+ int ret = 0;
+ call_frame_t *frame = NULL;
+ rpc_clnt_procedure_t *proc = NULL;
+
+ if (!this || !prog)
+ goto error_return;
+
+ frame = create_frame (this, this->ctx->pool);
+ if (!frame) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to create frame");
+ goto error_return;
+ }
+
+ proc = &prog->proctable[procidx];
+ if (proc->fn)
+ ret = proc->fn (frame, this, arg);
+
+ STACK_DESTROY (frame->root);
+ return ret;
+
+ error_return:
+ return -1;
+}
+
+/**
+*****************************************************
+ Server Interface
+*****************************************************
+*/
+
+struct iobuf *
+__changelog_rpc_serialize_reply (rpcsvc_request_t *req, void *arg,
+ struct iovec *outmsg, xdrproc_t xdrproc)
+{
+ struct iobuf *iob = NULL;
+ ssize_t retlen = 0;
+ ssize_t rsp_size = 0;
+
+ rsp_size = xdr_sizeof (xdrproc, arg);
+ iob = iobuf_get2 (req->svc->ctx->iobuf_pool, rsp_size);
+ if (!iob)
+ goto error_return;
+
+ iobuf_to_iovec (iob, outmsg);
+
+ retlen = xdr_serialize_generic (*outmsg, arg, xdrproc);
+ if (retlen == -1)
+ goto unref_iob;
+
+ outmsg->iov_len = retlen;
+ return iob;
+
+ unref_iob:
+ iobuf_unref (iob);
+ error_return:
+ return NULL;
+}
+
+int
+changelog_rpc_sumbit_reply (rpcsvc_request_t *req,
+ void *arg, struct iovec *payload, int payloadcount,
+ struct iobref *iobref, xdrproc_t xdrproc)
+{
+ int ret = -1;
+ struct iobuf *iob = NULL;
+ struct iovec iov = {0,};
+ char new_iobref = 0;
+
+ if (!req)
+ goto return_ret;
+
+ if (!iobref) {
+ iobref = iobref_new ();
+ if (!iobref)
+ goto return_ret;
+ new_iobref = 1;
+ }
+
+ iob = __changelog_rpc_serialize_reply (req, arg, &iov, xdrproc);
+ if (!iob)
+ gf_log ("", GF_LOG_ERROR, "failed to serialize reply");
+ else
+ iobref_add (iobref, iob);
+
+ ret = rpcsvc_submit_generic (req, &iov,
+ 1, payload, payloadcount, iobref);
+
+ if (new_iobref)
+ iobref_unref (iobref);
+ if (iob)
+ iobuf_unref (iob);
+ return_ret:
+ return ret;
+}
+
+void
+changelog_rpc_server_destroy (xlator_t *this, rpcsvc_t *rpc, char *sockfile,
+ rpcsvc_notify_t fn, struct rpcsvc_program **progs)
+{
+ rpcsvc_listener_t *listener = NULL;
+ rpcsvc_listener_t *next = NULL;
+ struct rpcsvc_program *prog = NULL;
+
+ while (*progs) {
+ prog = *progs;
+ (void) rpcsvc_program_unregister (rpc, prog);
+ }
+
+ list_for_each_entry_safe (listener, next, &rpc->listeners, list) {
+ rpcsvc_listener_destroy (listener);
+ }
+
+ (void) rpcsvc_unregister_notify (rpc, fn, this);
+ unlink (sockfile);
+
+ GF_FREE (rpc);
+}
+
+rpcsvc_t *
+changelog_rpc_server_init (xlator_t *this, char *sockfile, void *cbkdata,
+ rpcsvc_notify_t fn, struct rpcsvc_program **progs)
+{
+ int j = 0;
+ int ret = 0;
+ rpcsvc_t *rpc = NULL;
+ dict_t *options = NULL;
+ struct rpcsvc_program *prog = NULL;
+
+ if (!cbkdata)
+ cbkdata = this;
+
+ options = dict_new ();
+ if (!options)
+ goto error_return;
+
+ ret = rpcsvc_transport_unix_options_build (&options, sockfile);
+ if (ret)
+ goto dealloc_dict;
+
+ rpc = rpcsvc_init (this, this->ctx, options, 8);
+ if (rpc == NULL) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to init rpc");
+ goto dealloc_dict;
+ }
+
+ ret = rpcsvc_register_notify (rpc, fn, cbkdata);
+ if (ret) {
+ gf_log (this->name,
+ GF_LOG_ERROR, "failed to register notify function");
+ goto dealloc_rpc;
+ }
+
+ ret = rpcsvc_create_listeners (rpc, options, this->name);
+ if (ret != 1) {
+ gf_log (this->name,
+ GF_LOG_DEBUG, "failed to create listeners");
+ goto dealloc_rpc;
+ }
+
+ while (*progs) {
+ prog = *progs;
+ ret = rpcsvc_program_register (rpc, prog);
+ if (ret) {
+ gf_log (this->name,
+ GF_LOG_ERROR, "cannot register program "
+ "(name: %s, prognum: %d, pogver: %d)",
+ prog->progname, prog->prognum, prog->progver);
+ goto dealloc_rpc;
+ }
+
+ progs++;
+ }
+
+ dict_unref (options);
+ return rpc;
+
+ dealloc_rpc:
+ GF_FREE (rpc);
+ dealloc_dict:
+ dict_unref (options);
+ error_return:
+ return NULL;
+}
diff --git a/xlators/features/changelog/src/changelog-rpc-common.h b/xlators/features/changelog/src/changelog-rpc-common.h
new file mode 100644
index 00000000000..95c850c9400
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-rpc-common.h
@@ -0,0 +1,84 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __CHANGELOG_RPC_COMMON_H
+#define __CHANGELOG_RPC_COMMON_H
+
+#include "rpcsvc.h"
+#include "rpc-clnt.h"
+#include "event.h"
+#include "call-stub.h"
+
+#include "changelog-xdr.h"
+#include "xdr-generic.h"
+
+#include "changelog.h"
+
+/**
+ * Let's keep this non-configurable for now.
+ */
+#define NR_ROTT_BUFFS 4
+#define NR_DISPATCHERS (NR_ROTT_BUFFS - 1)
+
+enum changelog_rpc_procnum {
+ CHANGELOG_RPC_PROC_NULL = 0,
+ CHANGELOG_RPC_PROBE_FILTER = 1,
+ CHANGELOG_RPC_PROC_MAX = 2,
+};
+
+#define CHANGELOG_RPC_PROGNUM 1885957735
+#define CHANGELOG_RPC_PROGVER 1
+
+/**
+ * reverse connection: data xfer path
+ */
+enum changelog_reverse_rpc_procnum {
+ CHANGELOG_REV_PROC_NULL = 0,
+ CHANGELOG_REV_PROC_EVENT = 1,
+ CHANGELOG_REV_PROC_MAX = 2,
+};
+
+#define CHANGELOG_REV_RPC_PROCNUM 1886350951
+#define CHANGELOG_REV_RPC_PROCVER 1
+
+typedef struct changelog_rpc {
+ rpcsvc_t *svc;
+ struct rpc_clnt *rpc;
+ char sock[UNIX_PATH_MAX]; /* tied to server */
+} changelog_rpc_t;
+
+/* event poller */
+void *changelog_rpc_poller (void *);
+
+/* CLIENT API */
+struct rpc_clnt *
+changelog_rpc_client_init (xlator_t *, void *, char *, rpc_clnt_notify_t);
+
+int
+changelog_rpc_sumbit_req (struct rpc_clnt *, void *, call_frame_t *,
+ rpc_clnt_prog_t *, int , struct iovec *, int,
+ struct iobref *, xlator_t *, fop_cbk_fn_t, xdrproc_t);
+
+int
+changelog_invoke_rpc (xlator_t *, struct rpc_clnt *,
+ rpc_clnt_prog_t *, int , void *);
+
+/* SERVER API */
+int
+changelog_rpc_sumbit_reply (rpcsvc_request_t *, void *,
+ struct iovec *, int, struct iobref *, xdrproc_t);
+rpcsvc_t *
+changelog_rpc_server_init (xlator_t *, char *, void*,
+ rpcsvc_notify_t, struct rpcsvc_program **);
+void
+changelog_rpc_server_destroy (xlator_t *, rpcsvc_t *, char *,
+ rpcsvc_notify_t, struct rpcsvc_program **);
+
+#endif
diff --git a/xlators/features/changelog/src/changelog-rpc.c b/xlators/features/changelog/src/changelog-rpc.c
new file mode 100644
index 00000000000..04326456d31
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-rpc.c
@@ -0,0 +1,300 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-rpc.h"
+#include "changelog-mem-types.h"
+#include "changelog-ev-handle.h"
+
+struct rpcsvc_program *changelog_programs[];
+
+static void
+changelog_cleanup_dispatchers (xlator_t *this,
+ changelog_priv_t *priv, int count)
+{
+ for (; count >= 0; count--) {
+ (void) changelog_thread_cleanup
+ (this, priv->ev_dispatcher[count]);
+ }
+}
+
+static int
+changelog_cleanup_rpc_threads (xlator_t *this, changelog_priv_t *priv)
+{
+ int ret = 0;
+ changelog_clnt_t *conn = NULL;
+
+ conn = &priv->connections;
+ if (!conn)
+ return 0;
+
+ /** terminate RPC thread(s) */
+ ret = changelog_thread_cleanup (this, priv->connector);
+ if (ret != 0)
+ goto error_return;
+ /** terminate dispatcher thread(s) */
+ changelog_cleanup_dispatchers (this, priv, priv->nr_dispatchers);
+
+ /* TODO: what about pending and waiting connections? */
+ changelog_ev_cleanup_connections (this, conn);
+
+ /* destroy locks */
+ ret = pthread_mutex_destroy (&conn->pending_lock);
+ if (ret != 0)
+ goto error_return;
+ ret = pthread_cond_destroy (&conn->pending_cond);
+ if (ret != 0)
+ goto error_return;
+ ret = LOCK_DESTROY (&conn->active_lock);
+ if (ret != 0)
+ goto error_return;
+ ret = LOCK_DESTROY (&conn->wait_lock);
+ if (ret != 0)
+ goto error_return;
+ return 0;
+
+ error_return:
+ return -1;
+}
+
+static int
+changelog_init_rpc_threads (xlator_t *this, changelog_priv_t *priv,
+ rbuf_t *rbuf, int nr_dispatchers)
+{
+ int j = 0;
+ int ret = 0;
+ changelog_clnt_t *conn = NULL;
+
+ conn = &priv->connections;
+
+ conn->this = this;
+ conn->rbuf = rbuf;
+ conn->sequence = 1; /* start with sequence number one */
+
+ INIT_LIST_HEAD (&conn->pending);
+ INIT_LIST_HEAD (&conn->active);
+ INIT_LIST_HEAD (&conn->waitq);
+
+ ret = pthread_mutex_init (&conn->pending_lock, NULL);
+ if (ret)
+ goto error_return;
+ ret = pthread_cond_init (&conn->pending_cond, NULL);
+ if (ret)
+ goto cleanup_pending_lock;
+
+ ret = LOCK_INIT (&conn->active_lock);
+ if (ret)
+ goto cleanup_pending_cond;
+ ret = LOCK_INIT (&conn->wait_lock);
+ if (ret)
+ goto cleanup_active_lock;
+
+ /* spawn reverse connection thread */
+ ret = pthread_create (&priv->connector,
+ NULL, changelog_ev_connector, conn);
+ if (ret != 0)
+ goto cleanup_wait_lock;
+
+ /* spawn dispatcher thread(s) */
+ priv->ev_dispatcher = GF_CALLOC (nr_dispatchers, sizeof(pthread_t),
+ gf_changelog_mt_ev_dispatcher_t);
+ if (!priv->ev_dispatcher)
+ goto cleanup_connector;
+
+ /* spawn dispatcher threads */
+ for (; j < nr_dispatchers; j++) {
+ ret = pthread_create (&priv->ev_dispatcher[j],
+ NULL, changelog_ev_dispatch, conn);
+ if (ret != 0) {
+ changelog_cleanup_dispatchers (this, priv, --j);
+ break;
+ }
+ }
+
+ if (ret != 0)
+ goto cleanup_connector;
+
+ priv->nr_dispatchers = nr_dispatchers;
+ return 0;
+
+ cleanup_connector:
+ (void) pthread_cancel (priv->connector);
+ cleanup_wait_lock:
+ (void) LOCK_DESTROY (&conn->wait_lock);
+ cleanup_active_lock:
+ (void) LOCK_DESTROY (&conn->active_lock);
+ cleanup_pending_cond:
+ (void) pthread_cond_destroy (&conn->pending_cond);
+ cleanup_pending_lock:
+ (void) pthread_mutex_destroy (&conn->pending_lock);
+ error_return:
+ return -1;
+}
+
+int
+changelog_rpcsvc_notify (rpcsvc_t *rpc,
+ void *xl, rpcsvc_event_t event, void *data)
+{
+ return 0;
+}
+
+void
+changelog_destroy_rpc_listner (xlator_t *this, changelog_priv_t *priv)
+{
+ char sockfile[UNIX_PATH_MAX] = {0,};
+
+ /* sockfile path could have been saved to avoid this */
+ CHANGELOG_MAKE_SOCKET_PATH (priv->changelog_brick,
+ sockfile, UNIX_PATH_MAX);
+ changelog_rpc_server_destroy (this,
+ priv->rpc, sockfile,
+ changelog_rpcsvc_notify,
+ changelog_programs);
+ (void) changelog_cleanup_rpc_threads (this, priv);
+}
+
+rpcsvc_t *
+changelog_init_rpc_listner (xlator_t *this, changelog_priv_t *priv,
+ rbuf_t *rbuf, int nr_dispatchers)
+{
+ int ret = 0;
+ char sockfile[UNIX_PATH_MAX] = {0,};
+
+ ret = changelog_init_rpc_threads (this, priv, rbuf, nr_dispatchers);
+ if (ret)
+ return NULL;
+
+ CHANGELOG_MAKE_SOCKET_PATH (priv->changelog_brick,
+ sockfile, UNIX_PATH_MAX);
+ return changelog_rpc_server_init (this, sockfile, NULL,
+ changelog_rpcsvc_notify,
+ changelog_programs);
+}
+
+void
+changelog_rpc_clnt_cleanup (changelog_rpc_clnt_t *crpc)
+{
+ if (!crpc)
+ return;
+ crpc->c_clnt = NULL;
+ (void) LOCK_DESTROY (&crpc->lock);
+ GF_FREE (crpc);
+}
+
+inline changelog_rpc_clnt_t *
+changelog_rpc_clnt_init (xlator_t *this,
+ changelog_probe_req *rpc_req, changelog_clnt_t *c_clnt)
+{
+ int ret = 0;
+ changelog_rpc_clnt_t *crpc = NULL;
+
+ crpc = GF_CALLOC (1, sizeof (*crpc), gf_changelog_mt_rpc_clnt_t);
+ if (!crpc)
+ goto error_return;
+ INIT_LIST_HEAD (&crpc->list);
+
+ crpc->ref = 0;
+ changelog_set_disconnect_flag (crpc, _gf_false);
+
+ crpc->filter = rpc_req->filter;
+ (void) memcpy (crpc->sock, rpc_req->sock, strlen (rpc_req->sock));
+
+ crpc->this = this;
+ crpc->c_clnt = c_clnt;
+ crpc->cleanup = changelog_rpc_clnt_cleanup;
+
+ ret = LOCK_INIT (&crpc->lock);
+ if (ret != 0)
+ goto dealloc_crpc;
+ return crpc;
+
+ dealloc_crpc:
+ GF_FREE (crpc);
+ error_return:
+ return NULL;
+}
+
+/**
+ * Actor declarations
+ */
+
+/**
+ * @probe_handler
+ * A probe RPC call spawns a connect back to the caller. Caller also
+ * passes an hint which acts as a filter for selecting updates.
+ */
+
+int
+changelog_handle_probe (rpcsvc_request_t *req)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ rpcsvc_t *svc = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ changelog_rpc_clnt_t *crpc = NULL;
+
+ changelog_probe_req rpc_req = {0,};
+ changelog_probe_rsp rpc_rsp = {0,};
+
+ ret = xdr_to_generic (req->msg[0],
+ &rpc_req, (xdrproc_t)xdr_changelog_probe_req);
+ if (ret < 0) {
+ gf_log ("", GF_LOG_ERROR, "xdr decoding error");
+ req->rpc_err = GARBAGE_ARGS;
+ goto handle_xdr_error;
+ }
+
+ /* ->xl hidden in rpcsvc */
+ svc = rpcsvc_request_service (req);
+ this = svc->mydata;
+ priv = this->private;
+ c_clnt = &priv->connections;
+
+ crpc = changelog_rpc_clnt_init (this, &rpc_req, c_clnt);
+ if (!crpc)
+ goto handle_xdr_error;
+
+ changelog_ev_queue_connection (c_clnt, crpc);
+ rpc_rsp.op_ret = 0;
+
+ goto submit_rpc;
+
+ handle_xdr_error:
+ rpc_rsp.op_ret = -1;
+ submit_rpc:
+ (void) changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL,
+ (xdrproc_t)xdr_changelog_probe_rsp);
+ return 0;
+}
+
+/**
+ * RPC declarations
+ */
+
+rpcsvc_actor_t changelog_svc_actors[CHANGELOG_RPC_PROC_MAX] = {
+ [CHANGELOG_RPC_PROBE_FILTER] = {
+ "CHANGELOG PROBE FILTER", CHANGELOG_RPC_PROBE_FILTER,
+ changelog_handle_probe, NULL, 0, DRC_NA
+ },
+};
+
+struct rpcsvc_program changelog_svc_prog = {
+ .progname = CHANGELOG_RPC_PROGNAME,
+ .prognum = CHANGELOG_RPC_PROGNUM,
+ .progver = CHANGELOG_RPC_PROGVER,
+ .numactors = CHANGELOG_RPC_PROC_MAX,
+ .actors = changelog_svc_actors,
+ .synctask = _gf_true,
+};
+
+struct rpcsvc_program *changelog_programs[] = {
+ &changelog_svc_prog,
+ NULL,
+};
diff --git a/xlators/features/changelog/src/changelog-rpc.h b/xlators/features/changelog/src/changelog-rpc.h
new file mode 100644
index 00000000000..0df96684b6c
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-rpc.h
@@ -0,0 +1,29 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __CHANGELOG_RPC_H
+#define __CHANGELOG_RPC_H
+
+#include "xlator.h"
+#include "changelog-helpers.h"
+
+/* one time */
+#include "socket.h"
+#include "changelog-rpc-common.h"
+
+#define CHANGELOG_RPC_PROGNAME "GlusterFS Changelog"
+
+rpcsvc_t *
+changelog_init_rpc_listner (xlator_t *, changelog_priv_t *, rbuf_t *, int);
+
+void
+changelog_destroy_rpc_listner (xlator_t *, changelog_priv_t *);
+
+#endif
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c
index 4263a462ad7..e7d8522ae8c 100644
--- a/xlators/features/changelog/src/changelog.c
+++ b/xlators/features/changelog/src/changelog.c
@@ -19,14 +19,13 @@
#include "iobuf.h"
#include "changelog-rt.h"
-#include "changelog-helpers.h"
#include "changelog-encoders.h"
#include "changelog-mem-types.h"
#include <pthread.h>
-#include "changelog-notifier.h"
+#include "changelog-rpc.h"
static struct changelog_bootstrap
cb_bootstrap[] = {
@@ -912,14 +911,30 @@ changelog_create_cbk (call_frame_t *frame,
struct iatt *preparent,
struct iatt *postparent, dict_t *xdata)
{
+ int32_t ret = 0;
changelog_priv_t *priv = NULL;
changelog_local_t *local = NULL;
+ changelog_event_t ev = {0,};
priv = this->private;
local = frame->local;
CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+ /* fill the event structure.. similar to open() */
+ ev.ev_type = CHANGELOG_OP_TYPE_CREATE;
+ uuid_copy (ev.u.create.gfid, buf->ia_gfid);
+ ev.u.create.flags = fd->flags;
+ changelog_dispatch_event (this, priv, &ev);
+
+ if (changelog_ev_selected
+ (this, &priv->ev_selection, CHANGELOG_OP_TYPE_RELEASE)) {
+ ret = fd_ctx_set (fd, this, (uint64_t)(long) 0x1);
+ if (ret)
+ gf_log (this->name, GF_LOG_WARNING,
+ "could not set fd context (for release cbk)");
+ }
+
changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
unwind:
@@ -1633,6 +1648,92 @@ changelog_writev (call_frame_t *frame,
/* }}} */
+/* open, release and other beasts */
+
+/* {{{ */
+
+
+
+int
+changelog_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, fd_t *fd, dict_t *xdata)
+{
+ int ret = 0;
+ void *opaque = NULL;
+ char *buf = NULL;
+ ssize_t buflen = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_event_t ev = {0,};
+ gf_boolean_t logopen = _gf_false;
+
+ priv = this->private;
+ if (frame->local) {
+ frame->local = NULL;
+ logopen = _gf_true;
+ }
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !logopen), unwind);
+
+ /* fill the event structure */
+ ev.ev_type = CHANGELOG_OP_TYPE_OPEN;
+ uuid_copy (ev.u.open.gfid, fd->inode->gfid);
+ ev.u.open.flags = fd->flags;
+ changelog_dispatch_event (this, priv, &ev);
+
+ if (changelog_ev_selected
+ (this, &priv->ev_selection, CHANGELOG_OP_TYPE_RELEASE)) {
+ ret = fd_ctx_set (fd, this, (uint64_t)(long) 0x1);
+ if (ret)
+ gf_log (this->name, GF_LOG_WARNING,
+ "could not set fd context (for release cbk)");
+ }
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (open, frame, op_ret, op_errno, fd, xdata);
+ return 0;
+}
+
+int
+changelog_open (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int flags, fd_t *fd, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind);
+
+ frame->local = (void *)0x1; /* do not dereference in ->cbk */
+
+ wind:
+ STACK_WIND (frame, changelog_open_cbk, FIRST_CHILD (this),
+ FIRST_CHILD (this)->fops->open, loc, flags, fd, xdata);
+ return 0;
+}
+
+/* }}} */
+
+/* {{{ */
+
+int32_t
+changelog_release (xlator_t *this, fd_t *fd)
+{
+ changelog_event_t ev = {0,};
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+
+ ev.ev_type = CHANGELOG_OP_TYPE_RELEASE;
+ uuid_copy (ev.u.release.gfid, fd->inode->gfid);
+ changelog_dispatch_event (this, priv, &ev);
+
+ (void) fd_ctx_del (fd, this, NULL);
+
+ return 0;
+}
+
+
+/* }}} */
+
/**
* The
* - @init ()
@@ -1679,7 +1780,7 @@ changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv)
int ret = 0;
if (priv->cr.rollover_th) {
- changelog_thread_cleanup (this, priv->cr.rollover_th);
+ (void) changelog_thread_cleanup (this, priv->cr.rollover_th);
priv->cr.rollover_th = 0;
ret = close (priv->cr_wfd);
if (ret)
@@ -1689,7 +1790,7 @@ changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv)
}
if (priv->cf.fsync_th) {
- changelog_thread_cleanup (this, priv->cf.fsync_th);
+ (void) changelog_thread_cleanup (this, priv->cf.fsync_th);
priv->cf.fsync_th = 0;
}
}
@@ -1754,67 +1855,6 @@ changelog_spawn_helper_threads (xlator_t *this, changelog_priv_t *priv)
return ret;
}
-/* cleanup the notifier thread */
-static int
-changelog_cleanup_notifier (xlator_t *this, changelog_priv_t *priv)
-{
- int ret = 0;
-
- if (priv->cn.notify_th) {
- changelog_thread_cleanup (this, priv->cn.notify_th);
- priv->cn.notify_th = 0;
-
- ret = close (priv->wfd);
- if (ret)
- gf_log (this->name, GF_LOG_ERROR,
- "error closing writer end of notifier pipe"
- " (reason: %s)", strerror (errno));
- }
-
- return ret;
-}
-
-/* spawn the notifier thread - nop if already running */
-static int
-changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv)
-{
- int ret = 0;
- int flags = 0;
- int pipe_fd[2] = {0, 0};
-
- if (priv->cn.notify_th)
- goto out; /* notifier thread already running */
-
- ret = pipe (pipe_fd);
- if (ret == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "Cannot create pipe (reason: %s)", strerror (errno));
- goto out;
- }
-
- /* writer is non-blocking */
- flags = fcntl (pipe_fd[1], F_GETFL);
- flags |= O_NONBLOCK;
-
- ret = fcntl (pipe_fd[1], F_SETFL, flags);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to set O_NONBLOCK flag");
- goto out;
- }
-
- priv->wfd = pipe_fd[1];
-
- priv->cn.this = this;
- priv->cn.rfd = pipe_fd[0];
-
- ret = gf_thread_create (&priv->cn.notify_th,
- NULL, changelog_notifier, priv);
-
- out:
- return ret;
-}
-
int
notify (xlator_t *this, int event, void *data, ...)
{
@@ -2054,11 +2094,6 @@ changelog_init (xlator_t *this, changelog_priv_t *priv)
if (!priv->active)
return ret;
- /* spawn the notifier thread */
- ret = changelog_spawn_notifier (this, priv);
- if (ret)
- goto out;
-
/**
* start with a fresh changelog file every time. this is done
* in case there was an encoding change. so... things are kept
@@ -2086,9 +2121,11 @@ changelog_init (xlator_t *this, changelog_priv_t *priv)
return ret;
}
-/* Init all pthread condition variables and locks in changelog*/
+/**
+ * Init barrier related condition variables and locks
+ */
static int
-changelog_pthread_init (xlator_t *this, changelog_priv_t *priv)
+changelog_barrier_pthread_init (xlator_t *this, changelog_priv_t *priv)
{
gf_boolean_t bn_mutex_init = _gf_false;
gf_boolean_t bn_cond_init = _gf_false;
@@ -2165,9 +2202,9 @@ changelog_pthread_init (xlator_t *this, changelog_priv_t *priv)
return ret;
}
-/* Destroy all pthread condition variables and locks in changelog */
+/* Destroy barrier related condition variables and locks */
static inline void
-changelog_pthread_destroy (changelog_priv_t *priv)
+changelog_barrier_pthread_destroy (changelog_priv_t *priv)
{
pthread_mutex_destroy (&priv->bn.bnotify_mutex);
pthread_cond_destroy (&priv->bn.bnotify_cond);
@@ -2284,17 +2321,13 @@ reconfigure (xlator_t *this, dict_t *options)
}
htime_open(this, priv, tv.tv_sec);
}
- ret = changelog_spawn_notifier (this, priv);
- if (!ret)
- ret = changelog_spawn_helper_threads (this,
- priv);
- } else
- ret = changelog_cleanup_notifier (this, priv);
+ ret = changelog_spawn_helper_threads (this, priv);
+ }
}
out:
if (ret) {
- ret = changelog_cleanup_notifier (this, priv);
+ /* TODO */
} else {
gf_log (this->name, GF_LOG_DEBUG,
"changelog reconfigured");
@@ -2305,67 +2338,40 @@ reconfigure (xlator_t *this, dict_t *options)
return ret;
}
-int32_t
-init (xlator_t *this)
+static void
+changelog_freeup_options (xlator_t *this, changelog_priv_t *priv)
{
- int ret = -1;
- char *tmp = NULL;
- changelog_priv_t *priv = NULL;
- gf_boolean_t cond_lock_init = _gf_false;
- char htime_dir[PATH_MAX] = {0,};
- char csnap_dir[PATH_MAX] = {0,};
- uint32_t timeout = 0;
-
- GF_VALIDATE_OR_GOTO ("changelog", this, out);
-
- if (!this->children || this->children->next) {
- gf_log (this->name, GF_LOG_ERROR,
- "translator needs a single subvolume");
- goto out;
- }
-
- if (!this->parents) {
- gf_log (this->name, GF_LOG_ERROR,
- "dangling volume. please check volfile");
- goto out;
- }
-
- priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t);
- if (!priv)
- goto out;
+ int ret = 0;
- this->local_pool = mem_pool_new (changelog_local_t, 64);
- if (!this->local_pool) {
+ ret = priv->cb->dtor (this, &priv->cd);
+ if (ret)
gf_log (this->name, GF_LOG_ERROR,
- "failed to create local memory pool");
- goto out;
- }
-
- LOCK_INIT (&priv->lock);
- LOCK_INIT (&priv->c_snap_lock);
+ "could not cleanup bootstrapper");
+ GF_FREE (priv->changelog_brick);
+ GF_FREE (priv->changelog_dir);
+}
- GF_OPTION_INIT ("changelog-brick", tmp, str, out);
- if (!tmp) {
- gf_log (this->name, GF_LOG_ERROR,
- "\"changelog-brick\" option is not set");
- goto out;
- }
+static int
+changelog_init_options (xlator_t *this, changelog_priv_t *priv)
+{
+ int ret = 0;
+ char *tmp = NULL;
+ uint32_t timeout = 0;
+ char htime_dir[PATH_MAX] = {0,};
+ char csnap_dir[PATH_MAX] = {0,};
+ GF_OPTION_INIT ("changelog-brick", tmp, str, error_return);
priv->changelog_brick = gf_strdup (tmp);
if (!priv->changelog_brick)
- goto out;
- tmp = NULL;
+ goto error_return;
- GF_OPTION_INIT ("changelog-dir", tmp, str, out);
- if (!tmp) {
- gf_log (this->name, GF_LOG_ERROR,
- "\"changelog-dir\" option is not set");
- goto out;
- }
+ tmp = NULL;
+ GF_OPTION_INIT ("changelog-dir", tmp, str, dealloc_1);
priv->changelog_dir = gf_strdup (tmp);
if (!priv->changelog_dir)
- goto out;
+ goto dealloc_1;
+
tmp = NULL;
/**
@@ -2375,35 +2381,38 @@ init (xlator_t *this)
ret = mkdir_p (priv->changelog_dir, 0600, _gf_true);
if (ret)
- goto out;
+ goto dealloc_2;
- CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, htime_dir);
+ CHANGELOG_FILL_HTIME_DIR (priv->changelog_dir, htime_dir);
ret = mkdir_p (htime_dir, 0600, _gf_true);
if (ret)
- goto out;
+ goto dealloc_2;
- CHANGELOG_FILL_CSNAP_DIR(priv->changelog_dir, csnap_dir);
+ CHANGELOG_FILL_CSNAP_DIR (priv->changelog_dir, csnap_dir);
ret = mkdir_p (csnap_dir, 0600, _gf_true);
if (ret)
- goto out;
+ goto dealloc_2;
- GF_OPTION_INIT ("changelog", priv->active, bool, out);
+ GF_OPTION_INIT ("changelog", priv->active, bool, dealloc_2);
- GF_OPTION_INIT ("op-mode", tmp, str, out);
+ GF_OPTION_INIT ("op-mode", tmp, str, dealloc_2);
changelog_assign_opmode (priv, tmp);
tmp = NULL;
- GF_OPTION_INIT ("encoding", tmp, str, out);
+ GF_OPTION_INIT ("encoding", tmp, str, dealloc_2);
changelog_assign_encoding (priv, tmp);
+ changelog_encode_change (priv);
- GF_OPTION_INIT ("rollover-time", priv->rollover_time, int32, out);
+ GF_OPTION_INIT ("rollover-time",
+ priv->rollover_time, int32, dealloc_2);
- GF_OPTION_INIT ("fsync-interval", priv->fsync_interval, int32, out);
- GF_OPTION_INIT ("changelog-barrier-timeout", timeout, time, out);
- priv->timeout.tv_sec = timeout;
+ GF_OPTION_INIT ("fsync-interval",
+ priv->fsync_interval, int32, dealloc_2);
- changelog_encode_change(priv);
+ GF_OPTION_INIT ("changelog-barrier-timeout",
+ timeout, time, dealloc_2);
+ changelog_assign_barrier_timeout (priv, timeout);
GF_ASSERT (cb_bootstrap[priv->op_mode].mode == priv->op_mode);
priv->cb = &cb_bootstrap[priv->op_mode];
@@ -2411,10 +2420,111 @@ init (xlator_t *this)
/* ... now bootstrap the logger */
ret = priv->cb->ctor (this, &priv->cd);
if (ret)
- goto out;
+ goto dealloc_2;
priv->changelog_fd = -1;
+ return 0;
+
+ dealloc_2:
+ GF_FREE (priv->changelog_dir);
+ dealloc_1:
+ GF_FREE (priv->changelog_brick);
+ error_return:
+ return -1;
+}
+
+static void
+changelog_cleanup_rpc (xlator_t *this, changelog_priv_t *priv)
+{
+ /* terminate rpc server */
+ changelog_destroy_rpc_listner (this, priv);
+
+ /* cleanup rot buffs */
+ rbuf_dtor (priv->rbuf);
+
+ /* cleanup poller thread */
+ (void) changelog_thread_cleanup (this, priv->poller);
+}
+
+static int
+changelog_init_rpc (xlator_t *this, changelog_priv_t *priv)
+{
+ int ret = 0;
+ rpcsvc_t *rpc = NULL;
+ changelog_ev_selector_t *selection = NULL;
+
+ selection = &priv->ev_selection;
+
+ /* initialize event selection */
+ changelog_init_event_selection (this, selection);
+
+ ret = pthread_create (&priv->poller, NULL, changelog_rpc_poller, this);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to spawn poller thread");
+ goto error_return;
+ }
+
+ priv->rbuf = rbuf_init (NR_ROTT_BUFFS);
+ if (!priv->rbuf)
+ goto cleanup_thread;
+
+ rpc = changelog_init_rpc_listner (this, priv,
+ priv->rbuf, NR_DISPATCHERS);
+ if (!rpc)
+ goto cleanup_rbuf;
+ priv->rpc = rpc;
+
+ return 0;
+
+ cleanup_rbuf:
+ rbuf_dtor (priv->rbuf);
+ cleanup_thread:
+ (void) changelog_thread_cleanup (this, priv->poller);
+ error_return:
+ return -1;
+}
+
+int32_t
+init (xlator_t *this)
+{
+ int ret = -1;
+ char *tmp = NULL;
+ changelog_priv_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO ("changelog", this, error_return);
+
+ if (!this->children || this->children->next) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "translator needs a single subvolume");
+ goto error_return;
+ }
+
+ if (!this->parents) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "dangling volume. please check volfile");
+ goto error_return;
+ }
+
+ priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t);
+ if (!priv)
+ goto error_return;
+
+ this->local_pool = mem_pool_new (changelog_local_t, 64);
+ if (!this->local_pool) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to create local memory pool");
+ goto cleanup_priv;
+ }
+
+ LOCK_INIT (&priv->lock);
+ LOCK_INIT (&priv->c_snap_lock);
+
+ ret = changelog_init_options (this, priv);
+ if (ret)
+ goto cleanup_mempool;
+
/* snap dependency changes */
priv->dm.black_fop_cnt = 0;
priv->dm.white_fop_cnt = 0;
@@ -2422,67 +2532,68 @@ init (xlator_t *this)
priv->dm.drain_wait_white = _gf_false;
priv->current_color = FOP_COLOR_BLACK;
priv->explicit_rollover = _gf_false;
+
/* Mutex is not needed as threads are not spawned yet */
priv->bn.bnotify = _gf_false;
- ret = changelog_pthread_init (this, priv);
+ ret = changelog_barrier_pthread_init (this, priv);
if (ret)
- goto out;
-
+ goto cleanup_options;
LOCK_INIT (&priv->bflags.lock);
- cond_lock_init = _gf_true;
priv->bflags.barrier_ext = _gf_false;
/* Changelog barrier init */
INIT_LIST_HEAD (&priv->queue);
priv->barrier_enabled = _gf_false;
- ret = changelog_init (this, priv);
+ /* RPC ball rolling.. */
+ ret = changelog_init_rpc (this, priv);
if (ret)
- goto out;
+ goto cleanup_barrier;
+ ret = changelog_init (this, priv);
+ if (ret)
+ goto cleanup_rpc;
gf_log (this->name, GF_LOG_DEBUG, "changelog translator loaded");
- out:
- if (ret) {
- if (this && this->local_pool)
- mem_pool_destroy (this->local_pool);
- if (priv) {
- if (priv->cb) {
- ret = priv->cb->dtor (this, &priv->cd);
- if (ret)
- gf_log (this->name, GF_LOG_ERROR,
- "error in cleanup during init()");
- }
- GF_FREE (priv->changelog_brick);
- GF_FREE (priv->changelog_dir);
- if (cond_lock_init)
- changelog_pthread_destroy (priv);
- GF_FREE (priv);
- }
- this->private = NULL;
- } else
- this->private = priv;
+ this->private = priv;
+ return 0;
- return ret;
+ cleanup_rpc:
+ changelog_cleanup_rpc (this, priv);
+ cleanup_barrier:
+ changelog_barrier_pthread_destroy (priv);
+ cleanup_options:
+ changelog_freeup_options (this, priv);
+ cleanup_mempool:
+ mem_pool_destroy (this->local_pool);
+ cleanup_priv:
+ GF_FREE (priv);
+ error_return:
+ this->private = NULL;
+ return -1;
}
void
fini (xlator_t *this)
{
- int ret = -1;
changelog_priv_t *priv = NULL;
priv = this->private;
if (priv) {
- ret = priv->cb->dtor (this, &priv->cd);
- if (ret)
- gf_log (this->name, GF_LOG_ERROR,
- "error in fini");
+ /* terminate RPC server/threads */
+ changelog_cleanup_rpc (this, priv);
+
+ /* cleanup barrier related objects */
+ changelog_barrier_pthread_destroy (priv);
+
+ /* cleanup allocated options */
+ changelog_freeup_options (this, priv);
+
+ /* deallocate mempool */
mem_pool_destroy (this->local_pool);
- GF_FREE (priv->changelog_brick);
- GF_FREE (priv->changelog_dir);
- changelog_pthread_destroy (priv);
+
+ /* finally, dealloac private variable */
GF_FREE (priv);
}
@@ -2492,6 +2603,7 @@ fini (xlator_t *this)
}
struct xlator_fops fops = {
+ .open = changelog_open,
.mknod = changelog_mknod,
.mkdir = changelog_mkdir,
.create = changelog_create,
@@ -2513,6 +2625,7 @@ struct xlator_fops fops = {
struct xlator_cbks cbks = {
.forget = changelog_forget,
+ .release = changelog_release,
};
struct volume_options options[] = {