summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog')
-rw-r--r--xlators/features/changelog/Makefile.am3
-rw-r--r--xlators/features/changelog/lib/Makefile.am3
-rw-r--r--xlators/features/changelog/lib/examples/c/get-changes.c87
-rw-r--r--xlators/features/changelog/lib/examples/python/changes.py32
-rw-r--r--xlators/features/changelog/lib/examples/python/libgfchangelog.py64
-rw-r--r--xlators/features/changelog/lib/src/Makefile.am37
-rw-r--r--xlators/features/changelog/lib/src/changelog.h31
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-helpers.c180
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-helpers.h97
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-process.c571
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog.c514
-rw-r--r--xlators/features/changelog/src/Makefile.am19
-rw-r--r--xlators/features/changelog/src/changelog-encoders.c156
-rw-r--r--xlators/features/changelog/src/changelog-encoders.h44
-rw-r--r--xlators/features/changelog/src/changelog-helpers.c691
-rw-r--r--xlators/features/changelog/src/changelog-helpers.h386
-rw-r--r--xlators/features/changelog/src/changelog-mem-types.h29
-rw-r--r--xlators/features/changelog/src/changelog-misc.h101
-rw-r--r--xlators/features/changelog/src/changelog-notifier.c314
-rw-r--r--xlators/features/changelog/src/changelog-notifier.h19
-rw-r--r--xlators/features/changelog/src/changelog-rt.c72
-rw-r--r--xlators/features/changelog/src/changelog-rt.h33
-rw-r--r--xlators/features/changelog/src/changelog.c1487
23 files changed, 4970 insertions, 0 deletions
diff --git a/xlators/features/changelog/Makefile.am b/xlators/features/changelog/Makefile.am
new file mode 100644
index 00000000000..153bb685076
--- /dev/null
+++ b/xlators/features/changelog/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = src lib
+
+CLEANFILES =
diff --git a/xlators/features/changelog/lib/Makefile.am b/xlators/features/changelog/lib/Makefile.am
new file mode 100644
index 00000000000..a985f42a877
--- /dev/null
+++ b/xlators/features/changelog/lib/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = src
+
+CLEANFILES =
diff --git a/xlators/features/changelog/lib/examples/c/get-changes.c b/xlators/features/changelog/lib/examples/c/get-changes.c
new file mode 100644
index 00000000000..14562585aa9
--- /dev/null
+++ b/xlators/features/changelog/lib/examples/c/get-changes.c
@@ -0,0 +1,87 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+/**
+ * get set of new changes every 10 seconds (just print the file names)
+ *
+ * Compile it using:
+ * gcc -o getchanges `pkg-config --cflags libgfchangelog` get-changes.c \
+ * `pkg-config --libs libgfchangelog`
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/un.h>
+#include <limits.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <errno.h>
+
+#include "changelog.h"
+
+#define handle_error(fn) \
+ printf ("%s (reason: %s)\n", fn, strerror (errno))
+
+int
+main (int argc, char ** argv)
+{
+ int i = 0;
+ int ret = 0;
+ ssize_t nr_changes = 0;
+ ssize_t changes = 0;
+ char fbuf[PATH_MAX] = {0,};
+
+ /* get changes for brick "/home/vshankar/export/yow/yow-1" */
+ ret = gf_changelog_register ("/home/vshankar/export/yow/yow-1",
+ "/tmp/scratch", "/tmp/change.log", 9, 5);
+ if (ret) {
+ handle_error ("register failed");
+ goto out;
+ }
+
+ while (1) {
+ i = 0;
+ nr_changes = gf_changelog_scan ();
+ if (nr_changes < 0) {
+ handle_error ("scan(): ");
+ break;
+ }
+
+ if (nr_changes == 0)
+ goto next;
+
+ printf ("Got %ld changelog files\n", nr_changes);
+
+ while ( (changes =
+ gf_changelog_next_change (fbuf, PATH_MAX)) > 0) {
+ printf ("changelog file [%d]: %s\n", ++i, fbuf);
+
+ /* process changelog */
+ /* ... */
+ /* ... */
+ /* ... */
+ /* done processing */
+
+ ret = gf_changelog_done (fbuf);
+ if (ret)
+ handle_error ("gf_changelog_done");
+ }
+
+ if (changes == -1)
+ handle_error ("gf_changelog_next_change");
+
+ next:
+ sleep (10);
+ }
+
+ out:
+ return ret;
+}
diff --git a/xlators/features/changelog/lib/examples/python/changes.py b/xlators/features/changelog/lib/examples/python/changes.py
new file mode 100644
index 00000000000..d21db8eab2e
--- /dev/null
+++ b/xlators/features/changelog/lib/examples/python/changes.py
@@ -0,0 +1,32 @@
+#!/usr/bin/python
+
+import os
+import sys
+import time
+import libgfchangelog
+
+cl = libgfchangelog.Changes()
+
+def get_changes(brick, scratch_dir, log_file, log_level, interval):
+ change_list = []
+ try:
+ cl.cl_register(brick, scratch_dir, log_file, log_level)
+ while True:
+ cl.cl_scan()
+ change_list = cl.cl_getchanges()
+ if change_list:
+ print change_list
+ for change in change_list:
+ print('done with %s' % (change))
+ cl.cl_done(change)
+ time.sleep(interval)
+ except OSError:
+ ex = sys.exc_info()[1]
+ print ex
+
+if __name__ == '__main__':
+ if len(sys.argv) != 5:
+ print("usage: %s <brick> <scratch-dir> <log-file> <fetch-interval>"
+ % (sys.argv[0]))
+ sys.exit(1)
+ get_changes(sys.argv[1], sys.argv[2], sys.argv[3], 9, int(sys.argv[4]))
diff --git a/xlators/features/changelog/lib/examples/python/libgfchangelog.py b/xlators/features/changelog/lib/examples/python/libgfchangelog.py
new file mode 100644
index 00000000000..68ec3baf144
--- /dev/null
+++ b/xlators/features/changelog/lib/examples/python/libgfchangelog.py
@@ -0,0 +1,64 @@
+import os
+from ctypes import *
+from ctypes.util import find_library
+
+class Changes(object):
+ libgfc = CDLL(find_library("gfchangelog"), use_errno=True)
+
+ @classmethod
+ def geterrno(cls):
+ return get_errno()
+
+ @classmethod
+ def raise_oserr(cls):
+ errn = cls.geterrno()
+ raise OSError(errn, os.strerror(errn))
+
+ @classmethod
+ def _get_api(cls, call):
+ return getattr(cls.libgfc, call)
+
+ @classmethod
+ def cl_register(cls, brick, path, log_file, log_level, retries = 0):
+ ret = cls._get_api('gf_changelog_register')(brick, path,
+ log_file, log_level, retries)
+ if ret == -1:
+ cls.raise_oserr()
+
+ @classmethod
+ def cl_scan(cls):
+ ret = cls._get_api('gf_changelog_scan')()
+ if ret == -1:
+ cls.raise_oserr()
+
+ @classmethod
+ def cl_startfresh(cls):
+ ret = cls._get_api('gf_changelog_start_fresh')()
+ if ret == -1:
+ cls.raise_oserr()
+
+ @classmethod
+ def cl_getchanges(cls):
+ """ remove hardcoding for path name length """
+ def clsort(f):
+ return f.split('.')[-1]
+ changes = []
+ buf = create_string_buffer('\0', 4096)
+ call = cls._get_api('gf_changelog_next_change')
+
+ while True:
+ ret = call(buf, 4096)
+ if ret in (0, -1):
+ break;
+ changes.append(buf.raw[:ret-1])
+ if ret == -1:
+ cls.raise_oserr()
+ # cleanup tracker
+ cls.cl_startfresh()
+ return sorted(changes, key=clsort)
+
+ @classmethod
+ def cl_done(cls, clfile):
+ ret = cls._get_api('gf_changelog_done')(clfile)
+ if ret == -1:
+ cls.raise_oserr()
diff --git a/xlators/features/changelog/lib/src/Makefile.am b/xlators/features/changelog/lib/src/Makefile.am
new file mode 100644
index 00000000000..fbaaea628b7
--- /dev/null
+++ b/xlators/features/changelog/lib/src/Makefile.am
@@ -0,0 +1,37 @@
+libgfchangelog_la_CFLAGS = -Wall $(GF_CFLAGS) $(GF_DARWIN_LIBGLUSTERFS_CFLAGS) \
+ -DDATADIR=\"$(localstatedir)\"
+
+libgfchangelog_la_CPPFLAGS = $(GF_CPPFLAGS) -D__USE_FILE_OFFSET64 -fpic \
+ -I../../../src/ -I$(top_srcdir)/libglusterfs/src \
+ -I$(top_srcdir)/xlators/features/changelog/src \
+ -DDATADIR=\"$(localstatedir)\"
+
+libgfchangelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \
+ $(GF_GLUSTERFS_LIBS)
+
+libgfchangelog_la_LDFLAGS = $(GF_LDFLAGS)
+
+libgfchangelogdir = $(includedir)/glusterfs/gfchangelog
+lib_LTLIBRARIES = libgfchangelog.la
+
+CONTRIB_BUILDDIR = $(top_builddir)/contrib
+
+libgfchangelog_la_SOURCES = gf-changelog.c gf-changelog-process.c \
+ gf-changelog-helpers.c $(CONTRIBDIR)/uuid/clear.c \
+ $(CONTRIBDIR)/uuid/copy.c $(CONTRIBDIR)/uuid/gen_uuid.c \
+ $(CONTRIBDIR)/uuid/pack.c $(CONTRIBDIR)/uuid/parse.c \
+ $(CONTRIBDIR)/uuid/unparse.c $(CONTRIBDIR)/uuid/uuid_time.c \
+ $(CONTRIBDIR)/uuid/compare.c $(CONTRIBDIR)/uuid/isnull.c \
+ $(CONTRIBDIR)/uuid/unpack.c
+
+noinst_HEADERS = gf-changelog-helpers.h $(CONTRIBDIR)/uuid/uuidd.h \
+ $(CONTRIBDIR)/uuid/uuid.h $(CONTRIBDIR)/uuid/uuidP.h \
+ $(CONTRIB_BUILDDIR)/uuid/uuid_types.h
+
+libgfchangelog_HEADERS = changelog.h
+
+CLEANFILES =
+CONFIG_CLEAN_FILES = $(CONTRIB_BUILDDIR)/uuid/uuid_types.h
+
+$(top_builddir)/libglusterfs/src/libglusterfs.la:
+ $(MAKE) -C $(top_builddir)/libglusterfs/src/ all
diff --git a/xlators/features/changelog/lib/src/changelog.h b/xlators/features/changelog/lib/src/changelog.h
new file mode 100644
index 00000000000..5cddfb5839c
--- /dev/null
+++ b/xlators/features/changelog/lib/src/changelog.h
@@ -0,0 +1,31 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _GF_CHANGELOG_H
+#define _GF_CHANGELOG_H
+
+/* API set */
+
+int
+gf_changelog_register (char *brick_path, char *scratch_dir,
+ char *log_file, int log_levl, int max_reconnects);
+ssize_t
+gf_changelog_scan ();
+
+int
+gf_changelog_start_fresh ();
+
+ssize_t
+gf_changelog_next_change (char *bufptr, size_t maxlen);
+
+int
+gf_changelog_done (char *file);
+
+#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.c b/xlators/features/changelog/lib/src/gf-changelog-helpers.c
new file mode 100644
index 00000000000..1eef8bf0479
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.c
@@ -0,0 +1,180 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-mem-types.h"
+#include "gf-changelog-helpers.h"
+
+ssize_t gf_changelog_read_path (int fd, char *buffer, size_t bufsize)
+{
+ return read (fd, buffer, bufsize);
+}
+
+size_t
+gf_changelog_write (int fd, char *buffer, size_t len)
+{
+ ssize_t size = 0;
+ size_t writen = 0;
+
+ while (writen < len) {
+ size = write (fd,
+ buffer + writen, len - writen);
+ if (size <= 0)
+ break;
+
+ writen += size;
+ }
+
+ return writen;
+}
+
+void
+gf_rfc3986_encode (unsigned char *s, char *enc, char *estr)
+{
+ for (; *s; s++) {
+ if (estr[*s])
+ sprintf(enc, "%c", estr[*s]);
+ else
+ sprintf(enc, "%%%02X", *s);
+ while (*++enc);
+ }
+}
+
+/**
+ * thread safe version of readline with buffering
+ * (taken from Unix Network Programming Volume I, W.R. Stevens)
+ *
+ * This is favoured over fgets() as we'd need to ftruncate()
+ * (see gf_changelog_scan() API) to record new changelog files.
+ * stream open functions does have a truncate like api (although
+ * that can be done via @fflush(fp), @ftruncate(fd) and @fseek(fp),
+ * but this involves mixing POSIX file descriptors and stream FILE *).
+ *
+ * NOTE: This implmentation still does work with more than one fd's
+ * used to perform gf_readline(). For this very reason it's not
+ * made a part of libglusterfs.
+ */
+
+static pthread_key_t rl_key;
+static pthread_once_t rl_once = PTHREAD_ONCE_INIT;
+
+static void
+readline_destructor (void *ptr)
+{
+ GF_FREE (ptr);
+}
+
+static void
+readline_once (void)
+{
+ pthread_key_create (&rl_key, readline_destructor);
+}
+
+static ssize_t
+my_read (read_line_t *tsd, int fd, char *ptr)
+{
+ if (tsd->rl_cnt <= 0) {
+ if ( (tsd->rl_cnt = read (fd, tsd->rl_buf, MAXLINE)) < 0 )
+ return -1;
+ else if (tsd->rl_cnt == 0)
+ return 0;
+ tsd->rl_bufptr = tsd->rl_buf;
+ }
+
+ tsd->rl_cnt--;
+ *ptr = *tsd->rl_bufptr++;
+ return 1;
+}
+
+static int
+gf_readline_init_once (read_line_t **tsd)
+{
+ if (pthread_once (&rl_once, readline_once) != 0)
+ return -1;
+
+ *tsd = pthread_getspecific (rl_key);
+ if (*tsd)
+ goto out;
+
+ *tsd = GF_CALLOC (1, sizeof (**tsd),
+ gf_changelog_mt_libgfchangelog_rl_t);
+ if (!*tsd)
+ return -1;
+
+ if (pthread_setspecific (rl_key, *tsd) != 0)
+ return -1;
+
+ out:
+ return 0;
+}
+
+ssize_t
+gf_readline (int fd, void *vptr, size_t maxlen)
+{
+ size_t n = 0;
+ size_t rc = 0;
+ char c = ' ';
+ char *ptr = NULL;
+ read_line_t *tsd = NULL;
+
+ if (gf_readline_init_once (&tsd))
+ return -1;
+
+ ptr = vptr;
+ for (n = 1; n < maxlen; n++) {
+ if ( (rc = my_read (tsd, fd, &c)) == 1 ) {
+ *ptr++ = c;
+ if (c == '\n')
+ break;
+ } else if (rc == 0) {
+ *ptr = '\0';
+ return (n - 1);
+ } else
+ return -1;
+ }
+
+ *ptr = '\0';
+ return n;
+
+}
+
+off_t
+gf_lseek (int fd, off_t offset, int whence)
+{
+ off_t off = 0;
+ read_line_t *tsd = NULL;
+
+ if (gf_readline_init_once (&tsd))
+ return -1;
+
+ if ( (off = lseek (fd, offset, whence)) == -1)
+ return -1;
+
+ tsd->rl_cnt = 0;
+ tsd->rl_bufptr = tsd->rl_buf;
+
+ return off;
+}
+
+int
+gf_ftruncate (int fd, off_t length)
+{
+ read_line_t *tsd = NULL;
+
+ if (gf_readline_init_once (&tsd))
+ return -1;
+
+ if (ftruncate (fd, 0))
+ return -1;
+
+ tsd->rl_cnt = 0;
+ tsd->rl_bufptr = tsd->rl_buf;
+
+ return 0;
+}
diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
new file mode 100644
index 00000000000..3aa6ed7b8e2
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
@@ -0,0 +1,97 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _GF_CHANGELOG_HELPERS_H
+#define _GF_CHANGELOG_HELPERS_H
+
+#include <unistd.h>
+#include <dirent.h>
+#include <limits.h>
+#include <pthread.h>
+
+#include <xlator.h>
+
+#define GF_CHANGELOG_TRACKER "tracker"
+
+#define GF_CHANGELOG_CURRENT_DIR ".current"
+#define GF_CHANGELOG_PROCESSED_DIR ".processed"
+#define GF_CHANGELOG_PROCESSING_DIR ".processing"
+
+#ifndef MAXLINE
+#define MAXLINE 4096
+#endif
+
+#define GF_CHANGELOG_FILL_BUFFER(ptr, ascii, off, len) do { \
+ memcpy (ascii + off, ptr, len); \
+ off += len; \
+ } while (0)
+
+typedef struct read_line {
+ int rl_cnt;
+ char *rl_bufptr;
+ char rl_buf[MAXLINE];
+} read_line_t;
+
+typedef struct gf_changelog {
+ xlator_t *this;
+
+ /* 'processing' directory stream */
+ DIR *gfc_dir;
+
+ /* fd to the tracker file */
+ int gfc_fd;
+
+ /* connection retries */
+ int gfc_connretries;
+
+ char gfc_sockpath[PATH_MAX];
+
+ char gfc_brickpath[PATH_MAX];
+
+ /* socket for recieving notifications */
+ int gfc_sockfd;
+
+ char *gfc_working_dir;
+
+ /* RFC 3986 string encoding */
+ char rfc3986[256];
+
+ char gfc_current_dir[PATH_MAX];
+ char gfc_processed_dir[PATH_MAX];
+ char gfc_processing_dir[PATH_MAX];
+
+ pthread_t gfc_changelog_processor;
+} gf_changelog_t;
+
+int
+gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc);
+
+void *
+gf_changelog_process (void *data);
+
+ssize_t
+gf_changelog_read_path (int fd, char *buffer, size_t bufsize);
+
+void
+gf_rfc3986_encode (unsigned char *s, char *enc, char *estr);
+
+size_t
+gf_changelog_write (int fd, char *buffer, size_t len);
+
+ssize_t
+gf_readline (int fd, void *vptr, size_t maxlen);
+
+int
+gf_ftruncate (int fd, off_t length);
+
+off_t
+gf_lseek (int fd, off_t offset, int whence);
+
+#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog-process.c b/xlators/features/changelog/lib/src/gf-changelog-process.c
new file mode 100644
index 00000000000..df7204931a8
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-process.c
@@ -0,0 +1,571 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <unistd.h>
+#include <pthread.h>
+
+#include "uuid.h"
+#include "globals.h"
+#include "glusterfs.h"
+
+#include "gf-changelog-helpers.h"
+
+/* from the changelog translator */
+#include "changelog-misc.h"
+
+extern int byebye;
+
+/**
+ * number of gfid records after fop number
+ */
+int nr_gfids[] = {
+ [GF_FOP_MKNOD] = 1,
+ [GF_FOP_MKDIR] = 1,
+ [GF_FOP_UNLINK] = 1,
+ [GF_FOP_RMDIR] = 1,
+ [GF_FOP_SYMLINK] = 1,
+ [GF_FOP_RENAME] = 2,
+ [GF_FOP_LINK] = 1,
+ [GF_FOP_CREATE] = 1,
+};
+
+static char *
+binary_to_ascii (uuid_t uuid)
+{
+ return uuid_utoa (uuid);
+}
+
+static char *
+conv_noop (char *ptr) { return ptr; }
+
+#define VERIFY_SEPARATOR(ptr, plen, perr) \
+ { \
+ if (*(ptr + plen) != '\0') { \
+ perr = 1; \
+ break; \
+ } \
+ }
+
+#define MOVER_MOVE(mover, nleft, bytes) \
+ { \
+ mover += bytes; \
+ nleft -= bytes; \
+ } \
+
+#define PARSE_GFID(mov, ptr, le, fn, perr) \
+ { \
+ VERIFY_SEPARATOR (mov, le, perr); \
+ ptr = fn (mov); \
+ if (!ptr) { \
+ perr = 1; \
+ break; \
+ } \
+ }
+
+#define FILL_AND_MOVE(pt, buf, of, mo, nl, le) \
+ { \
+ GF_CHANGELOG_FILL_BUFFER (pt, buf, of, strlen (pt)); \
+ MOVER_MOVE (mo, nl, le); \
+ }
+
+
+#define PARSE_GFID_MOVE(ptr, uuid, mover, nleft, perr) \
+ { \
+ memcpy (uuid, mover, sizeof (uuid_t)); \
+ ptr = binary_to_ascii (uuid); \
+ if (!ptr) { \
+ perr = 1; \
+ break; \
+ } \
+ MOVER_MOVE (mover, nleft, sizeof (uuid_t)); \
+ } \
+
+#define LINE_BUFSIZE 3*PATH_MAX /* enough buffer for extra chars too */
+
+/**
+ * using mmap() makes parsing easy. fgets() cannot be used here as
+ * the binary gfid could contain a line-feed (0x0A), in that case fgets()
+ * would read an incomplete line and parsing would fail. using POSIX fds
+ * would result is additional code to maintain state in case of partial
+ * reads of data (where multiple entries do not fit extirely in the buffer).
+ *
+ * mmap() gives the flexibility of pointing to an offset in the file
+ * without us worrying about reading it in memory (VM does that for us for
+ * free).
+ */
+
+static int
+gf_changelog_parse_binary (xlator_t *this,
+ gf_changelog_t *gfc, int from_fd, int to_fd,
+ size_t start_offset, struct stat *stbuf)
+
+{
+ int ret = -1;
+ off_t off = 0;
+ off_t nleft = 0;
+ uuid_t uuid = {0,};
+ char *ptr = NULL;
+ char *bname_start = NULL;
+ char *bname_end = NULL;
+ char *mover = NULL;
+ char *start = NULL;
+ char current_mover = ' ';
+ size_t blen = 0;
+ int parse_err = 0;
+ char ascii[LINE_BUFSIZE] = {0,};
+
+ nleft = stbuf->st_size;
+
+ start = (char *) mmap (NULL, nleft,
+ PROT_READ, MAP_PRIVATE, from_fd, 0);
+ if (!start) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "mmap() error (reason: %s)", strerror (errno));
+ goto out;
+ }
+
+ mover = start;
+
+ MOVER_MOVE (mover, nleft, start_offset);
+
+ while (nleft > 0) {
+
+ off = blen = 0;
+ ptr = bname_start = bname_end = NULL;
+
+ current_mover = *mover;
+
+ switch (current_mover) {
+ case 'D':
+ case 'M':
+ MOVER_MOVE (mover, nleft, 1);
+ PARSE_GFID_MOVE (ptr, uuid, mover, nleft, parse_err);
+
+ break;
+
+ case 'E':
+ MOVER_MOVE (mover, nleft, 1);
+ PARSE_GFID_MOVE (ptr, uuid, mover, nleft, parse_err);
+
+ bname_start = mover;
+ if ( (bname_end = strchr (mover, '\n')) == NULL ) {
+ parse_err = 1;
+ break;
+ }
+
+ blen = bname_end - bname_start;
+ MOVER_MOVE (mover, nleft, blen);
+
+ break;
+
+ default:
+ parse_err = 1;
+ }
+
+ if (parse_err)
+ break;
+
+ GF_CHANGELOG_FILL_BUFFER (&current_mover, ascii, off, 1);
+ GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1);
+ GF_CHANGELOG_FILL_BUFFER (ptr, ascii, off, strlen (ptr));
+ if (blen)
+ GF_CHANGELOG_FILL_BUFFER (bname_start,
+ ascii, off, blen);
+ GF_CHANGELOG_FILL_BUFFER ("\n", ascii, off, 1);
+
+ if (gf_changelog_write (to_fd, ascii, off) != off) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "processing binary changelog failed due to "
+ " error in writing ascii change (reason: %s)",
+ strerror (errno));
+ break;
+ }
+
+ MOVER_MOVE (mover, nleft, 1);
+ }
+
+ if ( (nleft == 0) && (!parse_err))
+ ret = 0;
+
+ if (munmap (start, stbuf->st_size))
+ gf_log (this->name, GF_LOG_ERROR,
+ "munmap() error (reason: %s)", strerror (errno));
+ out:
+ return ret;
+}
+
+/**
+ * ascii decoder:
+ * - separate out one entry from another
+ * - use fop name rather than fop number
+ */
+static int
+gf_changelog_parse_ascii (xlator_t *this,
+ gf_changelog_t *gfc, int from_fd, int to_fd,
+ size_t start_offset, struct stat *stbuf)
+{
+ int ng = 0;
+ int ret = -1;
+ int fop = 0;
+ int len = 0;
+ off_t off = 0;
+ off_t nleft = 0;
+ char *ptr = NULL;
+ char *eptr = NULL;
+ char *start = NULL;
+ char *mover = NULL;
+ int parse_err = 0;
+ char current_mover = ' ';
+ char ascii[LINE_BUFSIZE] = {0,};
+ const char *fopname = NULL;
+
+ nleft = stbuf->st_size;
+
+ start = (char *) mmap (NULL, nleft,
+ PROT_READ, MAP_PRIVATE, from_fd, 0);
+ if (!start) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "mmap() error (reason: %s)", strerror (errno));
+ goto out;
+ }
+
+ mover = start;
+
+ MOVER_MOVE (mover, nleft, start_offset);
+
+ while (nleft > 0) {
+ off = 0;
+ current_mover = *mover;
+
+ GF_CHANGELOG_FILL_BUFFER (&current_mover, ascii, off, 1);
+ GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1);
+
+ switch (current_mover) {
+ case 'D':
+ case 'M':
+ MOVER_MOVE (mover, nleft, 1);
+
+ /* target gfid */
+ PARSE_GFID (mover, ptr, UUID_CANONICAL_FORM_LEN,
+ conv_noop, parse_err);
+ FILL_AND_MOVE(ptr, ascii, off,
+ mover, nleft, UUID_CANONICAL_FORM_LEN);
+ break;
+
+ case 'E':
+ MOVER_MOVE (mover, nleft, 1);
+
+ /* target gfid */
+ PARSE_GFID (mover, ptr, UUID_CANONICAL_FORM_LEN,
+ conv_noop, parse_err);
+ FILL_AND_MOVE (ptr, ascii, off,
+ mover, nleft, UUID_CANONICAL_FORM_LEN);
+ FILL_AND_MOVE (" ", ascii, off,
+ mover, nleft, 1);
+
+ /* fop */
+ len = strlen (mover);
+ VERIFY_SEPARATOR (mover, len, parse_err);
+
+ fop = atoi (mover);
+ if ( (fopname = gf_fop_list[fop]) == NULL) {
+ parse_err = 1;
+ break;
+ }
+
+ MOVER_MOVE (mover, nleft, len);
+
+ len = strlen (fopname);
+ GF_CHANGELOG_FILL_BUFFER (fopname, ascii, off, len);
+
+ /* pargfid + bname */
+ ng = nr_gfids[fop];
+ while (ng-- > 0) {
+ MOVER_MOVE (mover, nleft, 1);
+ len = strlen (mover);
+ GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1);
+
+ PARSE_GFID (mover, ptr, len,
+ conv_noop, parse_err);
+ eptr = calloc (3, strlen (ptr));
+ if (!eptr) {
+ parse_err = 1;
+ break;
+ }
+
+ gf_rfc3986_encode ((unsigned char *) ptr,
+ eptr, gfc->rfc3986);
+ FILL_AND_MOVE (eptr, ascii, off,
+ mover, nleft, len);
+ free (eptr);
+ }
+
+ break;
+ default:
+ parse_err = 1;
+ }
+
+ if (parse_err)
+ break;
+
+ GF_CHANGELOG_FILL_BUFFER ("\n", ascii, off, 1);
+
+ if (gf_changelog_write (to_fd, ascii, off) != off) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "processing ascii changelog failed due to "
+ " wrror in writing change (reason: %s)",
+ strerror (errno));
+ break;
+ }
+
+ MOVER_MOVE (mover, nleft, 1);
+
+ }
+
+ if ( (nleft == 0) && (!parse_err))
+ ret = 0;
+
+ if (munmap (start, stbuf->st_size))
+ gf_log (this->name, GF_LOG_ERROR,
+ "munmap() error (reason: %s)", strerror (errno));
+
+ out:
+ return ret;
+}
+
+#define COPY_BUFSIZE 8192
+static int
+gf_changelog_copy (xlator_t *this, int from_fd, int to_fd)
+{
+ ssize_t size = 0;
+ char buffer[COPY_BUFSIZE+1] = {0,};
+
+ while (1) {
+ size = read (from_fd, buffer, COPY_BUFSIZE);
+ if (size <= 0)
+ break;
+
+ if (gf_changelog_write (to_fd,
+ buffer, size) != size) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "error processing ascii changlog");
+ size = -1;
+ break;
+ }
+ }
+
+ return (size < 0 ? -1 : 0);
+}
+
+static int
+gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd,
+ int to_fd, struct stat *stbuf, int *zerob)
+{
+ int ret = -1;
+ int encoding = -1;
+ size_t elen = 0;
+ char buffer[1024] = {0,};
+
+ CHANGELOG_GET_ENCODING (from_fd, buffer, 1024, encoding, elen);
+ if (encoding == -1) /* unknown encoding */
+ goto out;
+
+ if (!CHANGELOG_VALID_ENCODING (encoding))
+ goto out;
+
+ if (elen == stbuf->st_size) {
+ *zerob = 1;
+ goto out;
+ }
+
+ /**
+ * start processing after the header
+ */
+ lseek (from_fd, elen, SEEK_SET);
+
+ switch (encoding) {
+ case CHANGELOG_ENCODE_BINARY:
+ /**
+ * this ideally should have been a part of changelog-encoders.c
+ * (ie. part of the changelog translator).
+ */
+ ret = gf_changelog_parse_binary (this, gfc, from_fd,
+ to_fd, elen, stbuf);
+ break;
+
+ case CHANGELOG_ENCODE_ASCII:
+ ret = gf_changelog_parse_ascii (this, gfc, from_fd,
+ to_fd, elen, stbuf);
+ break;
+ default:
+ ret = gf_changelog_copy (this, from_fd, to_fd);
+ }
+
+ out:
+ return ret;
+}
+
+static int
+gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path)
+{
+ int ret = -1;
+ int fd1 = 0;
+ int fd2 = 0;
+ int zerob = 0;
+ struct stat stbuf = {0,};
+ char dest[PATH_MAX] = {0,};
+ char to_path[PATH_MAX] = {0,};
+
+ ret = stat (from_path, &stbuf);
+ if (ret || !S_ISREG(stbuf.st_mode)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "stat failed on changelog file: %s", from_path);
+ goto out;
+ }
+
+ fd1 = open (from_path, O_RDONLY);
+ if (fd1 < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "cannot open changelog file: %s (reason: %s)",
+ from_path, strerror (errno));
+ goto out;
+ }
+
+ (void) snprintf (to_path, PATH_MAX, "%s%s",
+ gfc->gfc_current_dir, basename (from_path));
+ (void) snprintf (dest, PATH_MAX, "%s%s",
+ gfc->gfc_processing_dir, basename (from_path));
+
+ fd2 = open (to_path, O_CREAT | O_TRUNC | O_RDWR,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+ if (fd2 < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "cannot create ascii changelog file %s (reason %s)",
+ to_path, strerror (errno));
+ goto close_fd;
+ } else {
+ ret = gf_changelog_decode (this, gfc, fd1,
+ fd2, &stbuf, &zerob);
+
+ close (fd2);
+
+ if (!ret) {
+ /* move it to processing on a successfull
+ decode */
+ ret = rename (to_path, dest);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR,
+ "error moving %s to processing dir"
+ " (reason: %s)", to_path,
+ strerror (errno));
+ }
+
+ /* remove it from .current if it's an empty file */
+ if (zerob) {
+ ret = unlink (to_path);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not unlink %s (reason: %s",
+ to_path, strerror (errno));
+ }
+ }
+
+ close_fd:
+ close (fd1);
+
+ out:
+ return ret;
+}
+
+static char *
+gf_changelog_ext_change (xlator_t *this,
+ gf_changelog_t *gfc, char *path, size_t readlen)
+{
+ int alo = 0;
+ int ret = 0;
+ size_t len = 0;
+ char *buf = NULL;
+
+ buf = path;
+ while (len < readlen) {
+ if (*buf == '\0') {
+ alo = 1;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "processing changelog: %s", path);
+ ret = gf_changelog_consume (this, gfc, path);
+ }
+
+ if (ret)
+ break;
+
+ len++; buf++;
+ if (alo) {
+ alo = 0;
+ path = buf;
+ }
+ }
+
+ return (ret) ? NULL : path;
+}
+
+void *
+gf_changelog_process (void *data)
+{
+ ssize_t len = 0;
+ ssize_t offlen = 0;
+ xlator_t *this = NULL;
+ char *sbuf = NULL;
+ gf_changelog_t *gfc = NULL;
+ char from_path[PATH_MAX] = {0,};
+
+ gfc = (gf_changelog_t *) data;
+ this = gfc->this;
+
+ pthread_detach (pthread_self());
+
+ for (;;) {
+ len = gf_changelog_read_path (gfc->gfc_sockfd,
+ from_path + offlen,
+ PATH_MAX - offlen);
+ if (len < 0)
+ continue; /* ignore it for now */
+
+ if (len == 0) { /* close() from the changelog translator */
+ gf_log (this->name, GF_LOG_INFO, "close from changelog"
+ " notification translator.");
+
+ if (gfc->gfc_connretries != 1) {
+ if (!gf_changelog_notification_init(this, gfc))
+ continue;
+ }
+
+ byebye = 1;
+ break;
+ }
+
+ len += offlen;
+ sbuf = gf_changelog_ext_change (this, gfc, from_path, len);
+ if (!sbuf) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not extract changelog filename");
+ continue;
+ }
+
+ offlen = 0;
+ if (sbuf != (from_path + len)) {
+ offlen = from_path + len - sbuf;
+ memmove (from_path, sbuf, offlen);
+ }
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "byebye (%d) from processing thread...", byebye);
+ return NULL;
+}
diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c
new file mode 100644
index 00000000000..4e60bb276a7
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog.c
@@ -0,0 +1,514 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <errno.h>
+#include <dirent.h>
+#include <stddef.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include <string.h>
+
+#include "globals.h"
+#include "glusterfs.h"
+#include "logging.h"
+
+#include "gf-changelog-helpers.h"
+
+/* from the changelog translator */
+#include "changelog-misc.h"
+#include "changelog-mem-types.h"
+
+int byebye = 0;
+
+static void
+gf_changelog_cleanup (gf_changelog_t *gfc)
+{
+ /* socket */
+ if (gfc->gfc_sockfd != -1)
+ close (gfc->gfc_sockfd);
+ /* tracker fd */
+ if (gfc->gfc_fd != -1)
+ close (gfc->gfc_fd);
+ /* processing dir */
+ if (gfc->gfc_dir)
+ closedir (gfc->gfc_dir);
+
+ if (gfc->gfc_working_dir)
+ free (gfc->gfc_working_dir); /* allocated by realpath */
+}
+
+void
+__attribute__ ((constructor)) gf_changelog_ctor (void)
+{
+ glusterfs_ctx_t *ctx = NULL;
+
+ ctx = glusterfs_ctx_new ();
+ if (!ctx)
+ return;
+
+ if (glusterfs_globals_init (ctx)) {
+ free (ctx);
+ ctx = NULL;
+ return;
+ }
+
+ THIS->ctx = ctx;
+}
+
+void
+__attribute__ ((destructor)) gf_changelog_dtor (void)
+{
+ xlator_t *this = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+ gf_changelog_t *gfc = NULL;
+
+ this = THIS;
+ if (!this)
+ return;
+
+ ctx = this->ctx;
+ gfc = this->private;
+
+ if (gfc) {
+ gf_changelog_cleanup (gfc);
+ GF_FREE (gfc);
+ }
+
+ if (ctx) {
+ pthread_mutex_destroy (&ctx->lock);
+ free (ctx);
+ ctx = NULL;
+ }
+}
+
+
+static int
+gf_changelog_open_dirs (gf_changelog_t *gfc)
+{
+ int ret = -1;
+ DIR *dir = NULL;
+ int tracker_fd = 0;
+ char tracker_path[PATH_MAX] = {0,};
+
+ (void) snprintf (gfc->gfc_current_dir, PATH_MAX,
+ "%s/"GF_CHANGELOG_CURRENT_DIR"/",
+ gfc->gfc_working_dir);
+ ret = mkdir_p (gfc->gfc_current_dir, 0600, _gf_false);
+ if (ret)
+ goto out;
+
+ (void) snprintf (gfc->gfc_processed_dir, PATH_MAX,
+ "%s/"GF_CHANGELOG_PROCESSED_DIR"/",
+ gfc->gfc_working_dir);
+ ret = mkdir_p (gfc->gfc_processed_dir, 0600, _gf_false);
+ if (ret)
+ goto out;
+
+ (void) snprintf (gfc->gfc_processing_dir, PATH_MAX,
+ "%s/"GF_CHANGELOG_PROCESSING_DIR"/",
+ gfc->gfc_working_dir);
+ ret = mkdir_p (gfc->gfc_processing_dir, 0600, _gf_false);
+ if (ret)
+ goto out;
+
+ dir = opendir (gfc->gfc_processing_dir);
+ if (!dir) {
+ gf_log ("", GF_LOG_ERROR,
+ "opendir() error [reason: %s]", strerror (errno));
+ goto out;
+ }
+
+ gfc->gfc_dir = dir;
+
+ (void) snprintf (tracker_path, PATH_MAX,
+ "%s/"GF_CHANGELOG_TRACKER, gfc->gfc_working_dir);
+
+ tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+ if (tracker_fd < 0) {
+ closedir (gfc->gfc_dir);
+ ret = -1;
+ goto out;
+ }
+
+ gfc->gfc_fd = tracker_fd;
+ ret = 0;
+ out:
+ return ret;
+}
+
+int
+gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc)
+{
+ int ret = 0;
+ int len = 0;
+ int tries = 0;
+ int sockfd = 0;
+ struct sockaddr_un remote;
+
+ this = gfc->this;
+
+ if (gfc->gfc_sockfd != -1) {
+ gf_log (this->name, GF_LOG_INFO,
+ "Reconnecting...");
+ close (gfc->gfc_sockfd);
+ }
+
+ sockfd = socket (AF_UNIX, SOCK_STREAM, 0);
+ if (sockfd < 0) {
+ ret = -1;
+ goto out;
+ }
+
+ CHANGELOG_MAKE_SOCKET_PATH (gfc->gfc_brickpath,
+ gfc->gfc_sockpath, PATH_MAX);
+ gf_log (this->name, GF_LOG_INFO,
+ "connecting to changelog socket: %s (brick: %s)",
+ gfc->gfc_sockpath, gfc->gfc_brickpath);
+
+ remote.sun_family = AF_UNIX;
+ strcpy (remote.sun_path, gfc->gfc_sockpath);
+
+ len = strlen (remote.sun_path) + sizeof (remote.sun_family);
+
+ while (tries < gfc->gfc_connretries) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "connection attempt %d/%d...",
+ tries + 1, gfc->gfc_connretries);
+
+ /* initiate a connect */
+ if (connect (sockfd, (struct sockaddr *) &remote, len) == 0) {
+ gfc->gfc_sockfd = sockfd;
+ break;
+ }
+
+ tries++;
+ sleep (2);
+ }
+
+ if (tries == gfc->gfc_connretries) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not connect to changelog socket!"
+ " bailing out...");
+ ret = -1;
+ } else
+ gf_log (this->name, GF_LOG_INFO,
+ "connection successful");
+
+ out:
+ return ret;
+}
+
+int
+gf_changelog_done (char *file)
+{
+ int ret = -1;
+ char *buffer = NULL;
+ xlator_t *this = NULL;
+ gf_changelog_t *gfc = NULL;
+ char to_path[PATH_MAX] = {0,};
+
+ errno = EINVAL;
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ gfc = (gf_changelog_t *) this->private;
+ if (!gfc)
+ goto out;
+
+ if (!file || !strlen (file))
+ goto out;
+
+ /* make sure 'file' is inside ->gfc_working_dir */
+ buffer = realpath (file, NULL);
+ if (!buffer)
+ goto out;
+
+ if (strncmp (gfc->gfc_working_dir,
+ buffer, strlen (gfc->gfc_working_dir)))
+ goto out;
+
+ (void) snprintf (to_path, PATH_MAX, "%s%s",
+ gfc->gfc_processed_dir, basename (buffer));
+ gf_log (this->name, GF_LOG_DEBUG,
+ "moving %s to processed directory", file);
+ ret = rename (buffer, to_path);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "cannot move %s to %s (reason: %s)",
+ file, to_path, strerror (errno));
+ goto out;
+ }
+
+ ret = 0;
+
+ out:
+ if (buffer)
+ free (buffer); /* allocated by realpath() */
+ return ret;
+}
+
+/**
+ * @API
+ * for a set of changelogs, start from the begining
+ */
+int
+gf_changelog_start_fresh ()
+{
+ xlator_t *this = NULL;
+ gf_changelog_t *gfc = NULL;
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ errno = EINVAL;
+
+ gfc = (gf_changelog_t *) this->private;
+ if (!gfc)
+ goto out;
+
+ if (gf_ftruncate (gfc->gfc_fd, 0))
+ goto out;
+
+ return 0;
+
+ out:
+ return -1;
+}
+
+/**
+ * @API
+ * return the next changelog file entry. zero means all chanelogs
+ * consumed.
+ */
+ssize_t
+gf_changelog_next_change (char *bufptr, size_t maxlen)
+{
+ ssize_t size = 0;
+ int tracker_fd = 0;
+ xlator_t *this = NULL;
+ gf_changelog_t *gfc = NULL;
+ char buffer[PATH_MAX] = {0,};
+
+ errno = EINVAL;
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ gfc = (gf_changelog_t *) this->private;
+ if (!gfc)
+ goto out;
+
+ tracker_fd = gfc->gfc_fd;
+
+ size = gf_readline (tracker_fd, buffer, maxlen);
+ if (size < 0)
+ goto out;
+ if (size == 0)
+ return 0;
+
+ memcpy (bufptr, buffer, size - 1);
+ *(buffer + size) = '\0';
+
+ return size;
+
+ out:
+ return -1;
+}
+
+/**
+ * @API
+ * gf_changelog_scan() - scan and generate a list of change entries
+ *
+ * calling this api multiple times (without calling gf_changlog_done())
+ * would result new changelogs(s) being refreshed in the tracker file.
+ * This call also acts as a cancellation point for the consumer.
+ */
+ssize_t
+gf_changelog_scan ()
+{
+ int ret = 0;
+ int tracker_fd = 0;
+ size_t len = 0;
+ size_t off = 0;
+ xlator_t *this = NULL;
+ size_t nr_entries = 0;
+ gf_changelog_t *gfc = NULL;
+ struct dirent *entryp = NULL;
+ struct dirent *result = NULL;
+ char buffer[PATH_MAX] = {0,};
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ gfc = (gf_changelog_t *) this->private;
+ if (!gfc)
+ goto out;
+
+ /**
+ * do we need to protect 'byebye' with locks? worst, the
+ * consumer would get notified during next scan().
+ */
+ if (byebye) {
+ errno = ECONNREFUSED;
+ goto out;
+ }
+
+ errno = EINVAL;
+
+ tracker_fd = gfc->gfc_fd;
+
+ if (gf_ftruncate (tracker_fd, 0))
+ goto out;
+
+ len = offsetof(struct dirent, d_name)
+ + pathconf(gfc->gfc_processing_dir, _PC_NAME_MAX) + 1;
+ entryp = GF_CALLOC (1, len,
+ gf_changelog_mt_libgfchangelog_dirent_t);
+ if (!entryp)
+ goto out;
+
+ rewinddir (gfc->gfc_dir);
+ while (1) {
+ ret = readdir_r (gfc->gfc_dir, entryp, &result);
+ if (ret || !result)
+ break;
+
+ if ( !strcmp (basename (entryp->d_name), ".")
+ || !strcmp (basename (entryp->d_name), "..") )
+ continue;
+
+ nr_entries++;
+
+ GF_CHANGELOG_FILL_BUFFER (gfc->gfc_processing_dir,
+ buffer, off,
+ strlen (gfc->gfc_processing_dir));
+ GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer,
+ off, strlen (entryp->d_name));
+ GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1);
+
+ if (gf_changelog_write (tracker_fd, buffer, off) != off) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "error writing changelog filename"
+ " to tracker file");
+ break;
+ }
+ off = 0;
+ }
+
+ GF_FREE (entryp);
+
+ if (!result) {
+ if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1)
+ return nr_entries;
+ }
+ out:
+ return -1;
+}
+
+/**
+ * @API
+ * gf_changelog_register() - register a client for updates.
+ */
+int
+gf_changelog_register (char *brick_path, char *scratch_dir,
+ char *log_file, int log_level, int max_reconnects)
+{
+ int i = 0;
+ int ret = -1;
+ int errn = 0;
+ xlator_t *this = NULL;
+ gf_changelog_t *gfc = NULL;
+
+ this = THIS;
+ if (!this->ctx)
+ goto out;
+
+ errno = ENOMEM;
+
+ gfc = GF_CALLOC (1, sizeof (*gfc),
+ gf_changelog_mt_libgfchangelog_t);
+ if (!gfc)
+ goto out;
+
+ gfc->this = this;
+
+ gfc->gfc_dir = NULL;
+ gfc->gfc_fd = gfc->gfc_sockfd = -1;
+
+ gfc->gfc_working_dir = realpath (scratch_dir, NULL);
+ if (!gfc->gfc_working_dir) {
+ errn = errno;
+ goto cleanup;
+ }
+
+ ret = gf_changelog_open_dirs (gfc);
+ if (ret) {
+ errn = errno;
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not create entries in scratch dir");
+ goto cleanup;
+ }
+
+ if (gf_log_init (this->ctx, log_file))
+ goto cleanup;
+
+ gf_log_set_loglevel ((log_level == -1) ? GF_LOG_INFO :
+ log_level);
+
+ gfc->gfc_connretries = (max_reconnects <= 0) ? 1 : max_reconnects;
+ (void) strncpy (gfc->gfc_brickpath, brick_path, PATH_MAX);
+
+ ret = gf_changelog_notification_init (this, gfc);
+ if (ret) {
+ errn = errno;
+ goto cleanup;
+ }
+
+ ret = pthread_create (&gfc->gfc_changelog_processor,
+ NULL, gf_changelog_process, gfc);
+ if (ret) {
+ errn = errno;
+ gf_log (this->name, GF_LOG_ERROR,
+ "error creating changelog processor thread"
+ " new changes won't be recorded!!!");
+ goto cleanup;
+ }
+
+ for (; i < 256; i++) {
+ gfc->rfc3986[i] =
+ (isalnum(i) || i == '~' ||
+ i == '-' || i == '.' || i == '_') ? i : 0;
+ }
+
+ ret = 0;
+ this->private = gfc;
+
+ goto out;
+
+ cleanup:
+ gf_changelog_cleanup (gfc);
+ GF_FREE (gfc);
+ this->private = NULL;
+ errno = errn;
+
+ out:
+ return ret;
+}
diff --git a/xlators/features/changelog/src/Makefile.am b/xlators/features/changelog/src/Makefile.am
new file mode 100644
index 00000000000..e85031ad496
--- /dev/null
+++ b/xlators/features/changelog/src/Makefile.am
@@ -0,0 +1,19 @@
+xlator_LTLIBRARIES = changelog.la
+
+xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features
+
+noinst_HEADERS = changelog-helpers.h changelog-mem-types.h changelog-rt.h \
+ changelog-misc.h changelog-encoders.h changelog-notifier.h
+
+changelog_la_LDFLAGS = -module -avoidversion
+
+changelog_la_SOURCES = changelog.c changelog-rt.c changelog-helpers.c \
+ changelog-encoders.c changelog-notifier.c
+changelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+
+AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src -fPIC -D_FILE_OFFSET_BITS=64 \
+ -D_GNU_SOURCE -D$(GF_HOST_OS) -shared -nostartfiles -DDATADIR=\"$(localstatedir)\"
+
+AM_CFLAGS = -Wall $(GF_CFLAGS)
+
+CLEANFILES =
diff --git a/xlators/features/changelog/src/changelog-encoders.c b/xlators/features/changelog/src/changelog-encoders.c
new file mode 100644
index 00000000000..c71ea9270b0
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-encoders.c
@@ -0,0 +1,156 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "changelog-encoders.h"
+
+size_t
+entry_fn (void *data, char *buffer, gf_boolean_t encode)
+{
+ char *tmpbuf = NULL;
+ size_t bufsz = 0;
+ struct changelog_entry_fields *ce = NULL;
+
+ ce = (struct changelog_entry_fields *) data;
+
+ if (encode) {
+ tmpbuf = uuid_utoa (ce->cef_uuid);
+ CHANGELOG_FILL_BUFFER (buffer, bufsz, tmpbuf, strlen (tmpbuf));
+ } else {
+ CHANGELOG_FILL_BUFFER (buffer, bufsz,
+ ce->cef_uuid, sizeof (uuid_t));
+ }
+
+ CHANGELOG_FILL_BUFFER (buffer, bufsz, "/", 1);
+ CHANGELOG_FILL_BUFFER (buffer, bufsz,
+ ce->cef_bname, strlen (ce->cef_bname));
+ return bufsz;
+}
+
+size_t
+fop_fn (void *data, char *buffer, gf_boolean_t encode)
+{
+ char buf[10] = {0,};
+ size_t bufsz = 0;
+ glusterfs_fop_t fop = 0;
+
+ fop = *(glusterfs_fop_t *) data;
+
+ if (encode) {
+ (void) snprintf (buf, sizeof (buf), "%d", fop);
+ CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf));
+ } else
+ CHANGELOG_FILL_BUFFER (buffer, bufsz, &fop, sizeof (fop));
+
+ return bufsz;
+}
+
+void
+entry_free_fn (void *data)
+{
+ changelog_opt_t *co = data;
+
+ if (!co)
+ return;
+
+ GF_FREE (co->co_entry.cef_bname);
+}
+
+/**
+ * try to write all data in one shot
+ */
+
+static inline void
+changelog_encode_write_xtra (changelog_log_data_t *cld,
+ char *buffer, size_t *off, gf_boolean_t encode)
+{
+ int i = 0;
+ size_t offset = 0;
+ void *data = NULL;
+ changelog_opt_t *co = NULL;
+
+ offset = *off;
+
+ co = (changelog_opt_t *) cld->cld_ptr;
+
+ for (; i < cld->cld_xtra_records; i++, co++) {
+ CHANGELOG_FILL_BUFFER (buffer, offset, "\0", 1);
+
+ switch (co->co_type) {
+ case CHANGELOG_OPT_REC_FOP:
+ data = &co->co_fop;
+ break;
+ case CHANGELOG_OPT_REC_ENTRY:
+ data = &co->co_entry;
+ break;
+ }
+
+ if (co->co_convert)
+ offset += co->co_convert (data,
+ buffer + offset, encode);
+ else /* no coversion: write it out as it is */
+ CHANGELOG_FILL_BUFFER (buffer, offset,
+ data, co->co_len);
+ }
+
+ *off = offset;
+}
+
+int
+changelog_encode_ascii (xlator_t *this, changelog_log_data_t *cld)
+{
+ size_t off = 0;
+ size_t gfid_len = 0;
+ char *gfid_str = NULL;
+ char *buffer = NULL;
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+
+ gfid_str = uuid_utoa (cld->cld_gfid);
+ gfid_len = strlen (gfid_str);
+
+ /* extra bytes for decorations */
+ buffer = alloca (gfid_len + cld->cld_ptr_len + 10);
+ CHANGELOG_STORE_ASCII (priv, buffer,
+ off, gfid_str, gfid_len, cld);
+
+ if (cld->cld_xtra_records)
+ changelog_encode_write_xtra (cld, buffer, &off, _gf_true);
+
+ CHANGELOG_FILL_BUFFER (buffer, off, "\0", 1);
+
+ return changelog_write_change (priv, buffer, off);
+}
+
+int
+changelog_encode_binary (xlator_t *this, changelog_log_data_t *cld)
+{
+ size_t off = 0;
+ char *buffer = NULL;
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+
+ /* extra bytes for decorations */
+ buffer = alloca (sizeof (uuid_t) + cld->cld_ptr_len + 10);
+ CHANGELOG_STORE_BINARY (priv, buffer, off, cld->cld_gfid, cld);
+
+ if (cld->cld_xtra_records)
+ changelog_encode_write_xtra (cld, buffer, &off, _gf_false);
+
+ CHANGELOG_FILL_BUFFER (buffer, off, "\0", 1);
+
+ return changelog_write_change (priv, buffer, off);
+}
diff --git a/xlators/features/changelog/src/changelog-encoders.h b/xlators/features/changelog/src/changelog-encoders.h
new file mode 100644
index 00000000000..43deb1307d7
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-encoders.h
@@ -0,0 +1,44 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CHANGELOG_ENCODERS_H
+#define _CHANGELOG_ENCODERS_H
+
+#include "xlator.h"
+#include "defaults.h"
+
+#include "changelog-helpers.h"
+
+#define CHANGELOG_STORE_ASCII(priv, buf, off, gfid, gfid_len, cld) do { \
+ CHANGELOG_FILL_BUFFER (buffer, off, \
+ priv->maps[cld->cld_type], 1); \
+ CHANGELOG_FILL_BUFFER (buffer, \
+ off, gfid, gfid_len); \
+ } while (0)
+
+#define CHANGELOG_STORE_BINARY(priv, buf, off, gfid, cld) do { \
+ CHANGELOG_FILL_BUFFER (buffer, off, \
+ priv->maps[cld->cld_type], 1); \
+ CHANGELOG_FILL_BUFFER (buffer, \
+ off, gfid, sizeof (uuid_t)); \
+ } while (0)
+
+size_t
+entry_fn (void *data, char *buffer, gf_boolean_t encode);
+size_t
+fop_fn (void *data, char *buffer, gf_boolean_t encode);
+void
+entry_free_fn (void *data);
+int
+changelog_encode_binary (xlator_t *, changelog_log_data_t *);
+int
+changelog_encode_ascii (xlator_t *, changelog_log_data_t *);
+
+#endif /* _CHANGELOG_ENCODERS_H */
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c
new file mode 100644
index 00000000000..c1bb6e5fef9
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-helpers.c
@@ -0,0 +1,691 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "xlator.h"
+#include "defaults.h"
+#include "logging.h"
+#include "iobuf.h"
+
+#include "changelog-helpers.h"
+#include "changelog-mem-types.h"
+
+#include <pthread.h>
+
+void
+changelog_thread_cleanup (xlator_t *this, pthread_t thr_id)
+{
+ int ret = 0;
+ void *retval = NULL;
+
+ /* send a cancel request to the thread */
+ ret = pthread_cancel (thr_id);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not cancel thread (reason: %s)",
+ strerror (errno));
+ goto out;
+ }
+
+ ret = pthread_join (thr_id, &retval);
+ if (ret || (retval != PTHREAD_CANCELED)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "cancel request not adhered as expected"
+ " (reason: %s)", strerror (errno));
+ }
+
+ out:
+ return;
+}
+
+inline void *
+changelog_get_usable_buffer (changelog_local_t *local)
+{
+ changelog_log_data_t *cld = NULL;
+
+ cld = &local->cld;
+ if (!cld->cld_iobuf)
+ return NULL;
+
+ return cld->cld_iobuf->ptr;
+}
+
+inline void
+changelog_set_usable_record_and_length (changelog_local_t *local,
+ size_t len, int xr)
+{
+ changelog_log_data_t *cld = NULL;
+
+ cld = &local->cld;
+
+ cld->cld_ptr_len = len;
+ cld->cld_xtra_records = xr;
+}
+
+void
+changelog_local_cleanup (xlator_t *xl, changelog_local_t *local)
+{
+ int i = 0;
+ changelog_opt_t *co = NULL;
+ changelog_log_data_t *cld = NULL;
+
+ if (!local)
+ return;
+
+ cld = &local->cld;
+
+ /* cleanup dynamic allocation for extra records */
+ if (cld->cld_xtra_records) {
+ co = (changelog_opt_t *) cld->cld_ptr;
+ for (; i < cld->cld_xtra_records; i++, co++)
+ if (co->co_free)
+ co->co_free (co);
+ }
+
+ CHANGELOG_IOBUF_UNREF (cld->cld_iobuf);
+
+ if (local->inode)
+ inode_unref (local->inode);
+
+ mem_put (local);
+}
+
+inline int
+changelog_write (int fd, char *buffer, size_t len)
+{
+ ssize_t size = 0;
+ size_t writen = 0;
+
+ while (writen < len) {
+ size = write (fd,
+ buffer + writen, len - writen);
+ if (size <= 0)
+ break;
+
+ writen += size;
+ }
+
+ return (writen != len);
+}
+
+static int
+changelog_rollover_changelog (xlator_t *this,
+ changelog_priv_t *priv, unsigned long ts)
+{
+ int ret = -1;
+ int notify = 0;
+ char *bname = NULL;
+ char ofile[PATH_MAX] = {0,};
+ char nfile[PATH_MAX] = {0,};
+
+ if (priv->changelog_fd != -1) {
+ close (priv->changelog_fd);
+ priv->changelog_fd = -1;
+ }
+
+ (void) snprintf (ofile, PATH_MAX,
+ "%s/"CHANGELOG_FILE_NAME, priv->changelog_dir);
+ (void) snprintf (nfile, PATH_MAX,
+ "%s/"CHANGELOG_FILE_NAME".%lu",
+ priv->changelog_dir, ts);
+
+ ret = rename (ofile, nfile);
+ if (!ret)
+ notify = 1;
+
+ if (ret && (errno == ENOENT)) {
+ ret = 0;
+ }
+
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "error renaming %s -> %s (reason %s)",
+ ofile, nfile, strerror (errno));
+ }
+
+ if (notify) {
+ bname = basename (nfile);
+ gf_log (this->name, GF_LOG_DEBUG, "notifying: %s", bname);
+ ret = changelog_write (priv->wfd, bname, strlen (bname) + 1);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to send file name to notify thread"
+ " (reason: %s)", strerror (errno));
+ }
+ }
+
+ return ret;
+}
+
+int
+changelog_open (xlator_t *this,
+ changelog_priv_t *priv)
+{
+ int fd = 0;
+ int ret = -1;
+ int flags = 0;
+ char buffer[1024] = {0,};
+ char changelog_path[PATH_MAX] = {0,};
+
+ (void) snprintf (changelog_path, PATH_MAX,
+ "%s/"CHANGELOG_FILE_NAME,
+ priv->changelog_dir);
+
+ flags |= (O_CREAT | O_RDWR);
+ if (priv->fsync_interval == 0)
+ flags |= O_SYNC;
+
+ fd = open (changelog_path, flags,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+ if (fd < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "unable to open/create changelog file %s"
+ " (reason: %s). change-logging will be"
+ " inactive", changelog_path, strerror (errno));
+ goto out;
+ }
+
+ priv->changelog_fd = fd;
+
+ (void) snprintf (buffer, 1024, CHANGELOG_HEADER,
+ CHANGELOG_VERSION_MAJOR,
+ CHANGELOG_VERSION_MINOR,
+ priv->encode_mode);
+ ret = changelog_write_change (priv, buffer, strlen (buffer));
+ if (ret) {
+ close (priv->changelog_fd);
+ priv->changelog_fd = -1;
+ goto out;
+ }
+
+ ret = 0;
+
+ out:
+ return ret;
+}
+
+int
+changelog_start_next_change (xlator_t *this,
+ changelog_priv_t *priv,
+ unsigned long ts, gf_boolean_t finale)
+{
+ int ret = -1;
+
+ ret = changelog_rollover_changelog (this, priv, ts);
+
+ if (!ret && !finale)
+ ret = changelog_open (this, priv);
+
+ return ret;
+}
+
+/**
+ * return the length of entry
+ */
+inline size_t
+changelog_entry_length ()
+{
+ return sizeof (changelog_log_data_t);
+}
+
+int
+changelog_fill_rollover_data (changelog_log_data_t *cld, gf_boolean_t is_last)
+{
+ struct timeval tv = {0,};
+
+ cld->cld_type = CHANGELOG_TYPE_ROLLOVER;
+
+ if (gettimeofday (&tv, NULL))
+ return -1;
+
+ cld->cld_roll_time = (unsigned long) tv.tv_sec;
+ cld->cld_finale = is_last;
+ return 0;
+}
+
+int
+changelog_write_change (changelog_priv_t *priv, char *buffer, size_t len)
+{
+ return changelog_write (priv->changelog_fd, buffer, len);
+}
+
+inline int
+changelog_handle_change (xlator_t *this,
+ changelog_priv_t *priv, changelog_log_data_t *cld)
+{
+ int ret = 0;
+
+ if (CHANGELOG_TYPE_IS_ROLLOVER (cld->cld_type)) {
+ ret = changelog_start_next_change (this, priv,
+ cld->cld_roll_time,
+ cld->cld_finale);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR,
+ "Problem rolling over changelog(s)");
+ goto out;
+ }
+
+ /**
+ * case when there is reconfigure done (disabling changelog) and there
+ * are still fops that have updates in prgress.
+ */
+ if (priv->changelog_fd == -1)
+ return 0;
+
+ if (CHANGELOG_TYPE_IS_FSYNC (cld->cld_type)) {
+ ret = fsync (priv->changelog_fd);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "fsync failed (reason: %s)",
+ strerror (errno));
+ }
+ goto out;
+ }
+
+ ret = priv->ce->encode (this, cld);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "error writing changelog to disk");
+ }
+
+ out:
+ return ret;
+}
+
+changelog_local_t *
+changelog_local_init (xlator_t *this, inode_t *inode,
+ uuid_t gfid, int xtra_records,
+ gf_boolean_t update_flag)
+{
+ changelog_local_t *local = NULL;
+ struct iobuf *iobuf = NULL;
+
+ /**
+ * We relax the presence of inode if @update_flag is true.
+ * The caller (implmentation of the fop) needs to be careful to
+ * not blindly use local->inode.
+ */
+ if (!update_flag && !inode) {
+ gf_log_callingfn (this->name, GF_LOG_WARNING,
+ "inode needed for version checking !!!");
+ goto out;
+ }
+
+ if (xtra_records) {
+ iobuf = iobuf_get2 (this->ctx->iobuf_pool,
+ xtra_records * CHANGELOG_OPT_RECORD_LEN);
+ if (!iobuf)
+ goto out;
+ }
+
+ local = mem_get0 (this->local_pool);
+ if (!local) {
+ CHANGELOG_IOBUF_UNREF (iobuf);
+ goto out;
+ }
+
+ local->update_no_check = update_flag;
+
+ uuid_copy (local->cld.cld_gfid, gfid);
+
+ local->cld.cld_iobuf = iobuf;
+ local->cld.cld_xtra_records = 0; /* set by the caller */
+
+ if (inode)
+ local->inode = inode_ref (inode);
+
+ out:
+ return local;
+}
+
+int
+changelog_forget (xlator_t *this, inode_t *inode)
+{
+ uint64_t ctx_addr = 0;
+ changelog_inode_ctx_t *ctx = NULL;
+
+ inode_ctx_del (inode, this, &ctx_addr);
+ if (!ctx_addr)
+ return 0;
+
+ ctx = (changelog_inode_ctx_t *) (long) ctx_addr;
+ GF_FREE (ctx);
+
+ return 0;
+}
+
+int
+changelog_inject_single_event (xlator_t *this,
+ changelog_priv_t *priv,
+ changelog_log_data_t *cld)
+{
+ return priv->cd.dispatchfn (this, priv, priv->cd.cd_data, cld, NULL);
+}
+
+/**
+ * TODO: these threads have many thing in common (wake up after
+ * a certain time etc..). move them into separate routine.
+ */
+void *
+changelog_rollover (void *data)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ struct timeval tv = {0,};
+ changelog_log_data_t cld = {0,};
+ changelog_time_slice_t *slice = NULL;
+ changelog_priv_t *priv = data;
+
+ this = priv->cr.this;
+ slice = &priv->slice;
+
+ while (1) {
+ tv.tv_sec = priv->rollover_time;
+ tv.tv_usec = 0;
+
+ ret = select (0, NULL, NULL, NULL, &tv);
+ if (ret)
+ continue;
+
+ ret = changelog_fill_rollover_data (&cld, _gf_false);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to fill rollover data");
+ continue;
+ }
+
+ LOCK (&priv->lock);
+ {
+ ret = changelog_inject_single_event (this, priv, &cld);
+ if (!ret)
+ SLICE_VERSION_UPDATE (slice);
+ }
+ UNLOCK (&priv->lock);
+ }
+
+ return NULL;
+}
+
+void *
+changelog_fsync_thread (void *data)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ struct timeval tv = {0,};
+ changelog_log_data_t cld = {0,};
+ changelog_priv_t *priv = data;
+
+ this = priv->cf.this;
+ cld.cld_type = CHANGELOG_TYPE_FSYNC;
+
+ while (1) {
+ tv.tv_sec = priv->fsync_interval;
+ tv.tv_usec = 0;
+
+ ret = select (0, NULL, NULL, NULL, &tv);
+ if (ret)
+ continue;
+
+ ret = changelog_inject_single_event (this, priv, &cld);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to inject fsync event");
+ }
+
+ return NULL;
+}
+
+/* macros for inode/changelog version checks */
+
+#define INODE_VERSION_UPDATE(priv, inode, iver, slice, type) do { \
+ LOCK (&inode->lock); \
+ { \
+ LOCK (&priv->lock); \
+ { \
+ *iver = slice->changelog_version[type]; \
+ } \
+ UNLOCK (&priv->lock); \
+ } \
+ UNLOCK (&inode->lock); \
+ } while (0)
+
+#define INODE_VERSION_EQUALS_SLICE(priv, ver, slice, type, upd) do { \
+ LOCK (&priv->lock); \
+ { \
+ upd = (ver == slice->changelog_version[type]) \
+ ? _gf_false : _gf_true; \
+ } \
+ UNLOCK (&priv->lock); \
+ } while (0)
+
+static int
+__changelog_inode_ctx_set (xlator_t *this,
+ inode_t *inode, changelog_inode_ctx_t *ctx)
+{
+ uint64_t ctx_addr = (uint64_t) ctx;
+ return __inode_ctx_set (inode, this, &ctx_addr);
+}
+
+/**
+ * one shot routine to get the address and the value of a inode version
+ * for a particular type.
+ */
+static changelog_inode_ctx_t *
+__changelog_inode_ctx_get (xlator_t *this,
+ inode_t *inode, unsigned long **iver,
+ unsigned long *version, changelog_log_type type)
+{
+ int ret = 0;
+ uint64_t ctx_addr = 0;
+ changelog_inode_ctx_t *ctx = NULL;
+
+ ret = __inode_ctx_get (inode, this, &ctx_addr);
+ if (ret < 0)
+ ctx_addr = 0;
+ if (ctx_addr != 0) {
+ ctx = (changelog_inode_ctx_t *) (long)ctx_addr;
+ goto out;
+ }
+
+ ctx = GF_CALLOC (1, sizeof (*ctx), gf_changelog_mt_inode_ctx_t);
+ if (!ctx)
+ goto out;
+
+ ret = __changelog_inode_ctx_set (this, inode, ctx);
+ if (ret) {
+ GF_FREE (ctx);
+ ctx = NULL;
+ }
+
+ out:
+ if (ctx && iver && version) {
+ *iver = CHANGELOG_INODE_VERSION_TYPE (ctx, type);
+ *version = **iver;
+ }
+
+ return ctx;
+}
+
+static changelog_inode_ctx_t *
+changelog_inode_ctx_get (xlator_t *this,
+ inode_t *inode, unsigned long **iver,
+ unsigned long *version, changelog_log_type type)
+{
+ changelog_inode_ctx_t *ctx = NULL;
+
+ LOCK (&inode->lock);
+ {
+ ctx = __changelog_inode_ctx_get (this,
+ inode, iver, version, type);
+ }
+ UNLOCK (&inode->lock);
+
+ return ctx;
+}
+
+/**
+ * This is the main update routine. Locking has been made granular so as to
+ * maximize parallelism of fops - I'll try to explain it below using execution
+ * timelines.
+ *
+ * Basically, the contention is between multiple execution threads of this
+ * routine and the roll-over thread. So, instead of having a big lock, we hold
+ * granular locks: inode->lock and priv->lock. Now I'll explain what happens
+ * when there is an update and a roll-over at just about the same time.
+ * NOTE:
+ * - the dispatcher itself synchronizes updates via it's own lock
+ * - the slice version in incremented by the roll-over thread
+ *
+ * Case 1: When the rollover thread wins before the inode version can be
+ * compared with the slice version.
+ *
+ * [updater] | [rollover]
+ * |
+ * | <SLICE: 1, 1, 1>
+ * <changelog_update> |
+ * <changelog_inode_ctx_get> |
+ * <CTX: 1, 1, 1> |
+ * | <dispatch-rollover-event>
+ * | LOCK (&priv->lock)
+ * | <SLICE_VERSION_UPDATE>
+ * | <SLICE: 2, 2, 2>
+ * | UNLOCK (&priv->lock)
+ * |
+ * LOCK (&priv->lock) |
+ * <INODE_VERSION_EQUALS_SLICE> |
+ * I: 1 <-> S: 2 |
+ * update: true |
+ * UNLOCK (&priv->lock) |
+ * |
+ * <if update == true> |
+ * <dispath-update-event> |
+ * <INODE_VERSION_UPDATE> |
+ * LOCK (&inode->lock) |
+ * LOCK (&priv->lock) |
+ * <CTX: 2, 1, 1> |
+ * UNLOCK (&priv->lock) |
+ * UNLOCK (&inode->lock) |
+ *
+ * Therefore, the change gets recorded in the next change (no lost change). If
+ * the slice version was ahead of the inode version (say I:1, S: 2), then
+ * anyway the comparison would result in a update (I: 1, S: 3).
+ *
+ * If the rollover time is too less, then there is another contention when the
+ * updater tries to bring up inode version to the slice version (this is also
+ * the case when the roll-over thread wakes up during INODE_VERSION_UPDATE.
+ *
+ * <CTX: 1, 1, 1> | <SLICE: 2, 2, 2>
+ * |
+ * |
+ * <dispath-update-event> |
+ * <INODE_VERSION_UPDATE> |
+ * LOCK (&inode->lock) |
+ * LOCK (&priv->lock) |
+ * <CTX: 2, 1, 1> |
+ * UNLOCK (&priv->lock) |
+ * UNLOCK (&inode->lock) |
+ * | <dispatch-rollover-event>
+ * | LOCK (&priv->lock)
+ * | <SLICE_VERSION_UPDATE>
+ * | <SLICE: 3, 3, 3>
+ * | UNLOCK (&priv->lock)
+ *
+ *
+ * Case 2: When the fop thread wins
+ *
+ * [updater] | [rollover]
+ * |
+ * | <SLICE: 1, 1, 1>
+ * <changelog_update> |
+ * <changelog_inode_ctx_get> |
+ * <CTX: 0, 0, 0> |
+ * |
+ * LOCK (&priv->lock) |
+ * <INODE_VERSION_EQUALS_SLICE> |
+ * I: 0 <-> S: 1 |
+ * update: true |
+ * UNLOCK (&priv->lock) |
+ * | <dispatch-rollover-event>
+ * | LOCK (&priv->lock)
+ * | <SLICE_VERSION_UPDATE>
+ * | <SLICE: 2, 2, 2>
+ * | UNLOCK (&priv->lock)
+ * <if update == true> |
+ * <dispath-update-event> |
+ * <INODE_VERSION_UPDATE> |
+ * LOCK (&inode->lock) |
+ * LOCK (&priv->lock) |
+ * <CTX: 2, 0, 0> |
+ * UNLOCK (&priv->lock) |
+ * UNLOCK (&inode->lock) |
+ *
+ * Here again, if the inode version was equal to the slice version (I: 1, S: 1)
+ * then there is no need to record an update (as the equality of the two version
+ * signifies an update was recorded in the current time slice).
+ */
+inline void
+changelog_update (xlator_t *this, changelog_priv_t *priv,
+ changelog_local_t *local, changelog_log_type type)
+{
+ int ret = 0;
+ unsigned long *iver = NULL;
+ unsigned long version = 0;
+ inode_t *inode = NULL;
+ changelog_time_slice_t *slice = NULL;
+ changelog_inode_ctx_t *ctx = NULL;
+ changelog_log_data_t *cld_0 = NULL;
+ changelog_log_data_t *cld_1 = NULL;
+ changelog_local_t *next_local = NULL;
+ gf_boolean_t need_upd = _gf_true;
+
+ slice = &priv->slice;
+
+ /**
+ * for fops that do not require inode version checking
+ */
+ if (local->update_no_check)
+ goto update;
+
+ inode = local->inode;
+
+ ctx = changelog_inode_ctx_get (this,
+ inode, &iver, &version, type);
+ if (!ctx)
+ goto update;
+
+ INODE_VERSION_EQUALS_SLICE (priv, version, slice, type, need_upd);
+
+ update:
+ if (need_upd) {
+ cld_0 = &local->cld;
+ cld_0->cld_type = type;
+
+ if ( (next_local = local->prev_entry) != NULL ) {
+ cld_1 = &next_local->cld;
+ cld_1->cld_type = type;
+ }
+
+ ret = priv->cd.dispatchfn (this, priv,
+ priv->cd.cd_data, cld_0, cld_1);
+
+ /**
+ * update after the dispatcher has successfully done
+ * it's job.
+ */
+ if (!local->update_no_check && iver && !ret)
+ INODE_VERSION_UPDATE (priv, inode, iver, slice, type);
+ }
+
+ return;
+}
diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h
new file mode 100644
index 00000000000..bbea245b95f
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-helpers.h
@@ -0,0 +1,386 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CHANGELOG_HELPERS_H
+#define _CHANGELOG_HELPERS_H
+
+#include "locking.h"
+#include "timer.h"
+#include "pthread.h"
+#include "iobuf.h"
+
+#include "changelog-misc.h"
+
+/**
+ * the changelog entry
+ */
+typedef struct changelog_log_data {
+ /* rollover related */
+ unsigned long cld_roll_time;
+
+ /* reopen changelog? */
+ gf_boolean_t cld_finale;
+
+ changelog_log_type cld_type;
+
+ /**
+ * sincd gfid is _always_ a necessity, it's not a part
+ * of the iobuf. by doing this we do not add any overhead
+ * for data and metadata related fops.
+ */
+ uuid_t cld_gfid;
+
+ /**
+ * iobufs are used for optionals records: pargfid, path,
+ * write offsets etc.. It's the fop implementers job
+ * to allocate (iobuf_get() in the fop) and get unref'ed
+ * in the callback (CHANGELOG_STACK_UNWIND).
+ */
+ struct iobuf *cld_iobuf;
+
+#define cld_ptr cld_iobuf->ptr
+
+ /**
+ * after allocation you can point this to the length of
+ * usable data, but make sure it does not exceed the
+ * the size of the requested iobuf.
+ */
+ size_t cld_iobuf_len;
+
+#define cld_ptr_len cld_iobuf_len
+
+ /**
+ * number of optional records
+ */
+ int cld_xtra_records;
+} changelog_log_data_t;
+
+/**
+ * holder for dispatch function and private data
+ */
+
+typedef struct changelog_priv changelog_priv_t;
+
+typedef struct changelog_dispatcher {
+ void *cd_data;
+ int (*dispatchfn) (xlator_t *, changelog_priv_t *, void *,
+ changelog_log_data_t *, changelog_log_data_t *);
+} changelog_dispatcher_t;
+
+struct changelog_bootstrap {
+ changelog_mode_t mode;
+ int (*ctor) (xlator_t *, changelog_dispatcher_t *);
+ int (*dtor) (xlator_t *, changelog_dispatcher_t *);
+};
+
+struct changelog_encoder {
+ changelog_encoder_t encoder;
+ int (*encode) (xlator_t *, changelog_log_data_t *);
+};
+
+
+/* xlator private */
+
+typedef struct changelog_time_slice {
+ /**
+ * just in case we need nanosecond granularity some day.
+ * field is unused as of now (maybe we'd need it later).
+ */
+ struct timeval tv_start;
+
+ /**
+ * version of changelog file, incremented each time changes
+ * rollover.
+ */
+ unsigned long changelog_version[CHANGELOG_MAX_TYPE];
+} changelog_time_slice_t;
+
+typedef struct changelog_rollover {
+ /* rollover thread */
+ pthread_t rollover_th;
+
+ xlator_t *this;
+} changelog_rollover_t;
+
+typedef struct changelog_fsync {
+ /* fsync() thread */
+ pthread_t fsync_th;
+
+ xlator_t *this;
+} changelog_fsync_t;
+
+# define CHANGELOG_MAX_CLIENTS 5
+typedef struct changelog_notify {
+ /* reader end of the pipe */
+ int rfd;
+
+ /* notifier thread */
+ pthread_t notify_th;
+
+ /* unique socket path */
+ char sockpath[PATH_MAX];
+
+ int socket_fd;
+
+ /**
+ * simple array of accept()'ed fds. Not scalable at all
+ * for large number of clients, but it's okay as we have
+ * a ahrd limit in this version (@CHANGELOG_MAX_CLIENTS).
+ */
+ int client_fd[CHANGELOG_MAX_CLIENTS];
+
+ xlator_t *this;
+} changelog_notify_t;
+
+struct changelog_priv {
+ gf_boolean_t active;
+
+ /* to generate unique socket file per brick */
+ char *changelog_brick;
+
+ /* logging directory */
+ char *changelog_dir;
+
+ /* one file for all changelog types */
+ int changelog_fd;
+
+ gf_lock_t lock;
+
+ /* writen end of the pipe */
+ int wfd;
+
+ /* rollover time */
+ int32_t rollover_time;
+
+ /* fsync() interval */
+ int32_t fsync_interval;
+
+ /* changelog type maps */
+ const char *maps[CHANGELOG_MAX_TYPE];
+
+ /* time slicer */
+ changelog_time_slice_t slice;
+
+ /* context of the updater */
+ changelog_dispatcher_t cd;
+
+ /* context of the rollover thread */
+ changelog_rollover_t cr;
+
+ /* context of fsync thread */
+ changelog_fsync_t cf;
+
+ /* context of the notifier thread */
+ changelog_notify_t cn;
+
+ /* operation mode */
+ changelog_mode_t op_mode;
+
+ /* bootstrap routine for 'current' logger */
+ struct changelog_bootstrap *cb;
+
+ /* encoder mode */
+ changelog_encoder_t encode_mode;
+
+ /* encoder */
+ struct changelog_encoder *ce;
+};
+
+struct changelog_local {
+ inode_t *inode;
+ gf_boolean_t update_no_check;
+
+ changelog_log_data_t cld;
+
+ /**
+ * ->prev_entry is used in cases when there needs to be
+ * additional changelog entry for the parent (eg. rename)
+ * It's analogous to ->next in single linked list world,
+ * but we call it as ->prev_entry... ha ha ha
+ */
+ struct changelog_local *prev_entry;
+};
+
+typedef struct changelog_local changelog_local_t;
+
+/* inode version is stored in inode ctx */
+typedef struct changelog_inode_ctx {
+ unsigned long iversion[CHANGELOG_MAX_TYPE];
+} changelog_inode_ctx_t;
+
+#define CHANGELOG_INODE_VERSION_TYPE(ctx, type) &(ctx->iversion[type])
+
+/**
+ * Optional Records:
+ * fops that need to save additional information request a array of
+ * @changelog_opt_t struct. The array is allocated via @iobufs.
+ */
+typedef enum {
+ CHANGELOG_OPT_REC_FOP,
+ CHANGELOG_OPT_REC_ENTRY,
+} changelog_optional_rec_type_t;
+
+struct changelog_entry_fields {
+ uuid_t cef_uuid;
+ char *cef_bname;
+};
+
+typedef struct {
+ /**
+ * @co_covert can be used to do post-processing of the record before
+ * it's persisted to the CHANGELOG. If this is NULL, then the record
+ * is persisted as per it's in memory format.
+ */
+ size_t (*co_convert) (void *data, char *buffer, gf_boolean_t encode);
+
+ /* release routines */
+ void (*co_free) (void *data);
+
+ /* type of the field */
+ changelog_optional_rec_type_t co_type;
+
+ /**
+ * sizeof of the 'valid' field in the union. This field is not used if
+ * @co_convert is specified.
+ */
+ size_t co_len;
+
+ union {
+ glusterfs_fop_t co_fop;
+ struct changelog_entry_fields co_entry;
+ };
+} changelog_opt_t;
+
+#define CHANGELOG_OPT_RECORD_LEN sizeof (changelog_opt_t)
+
+/**
+ * helpers routines
+ */
+
+void
+changelog_thread_cleanup (xlator_t *this, pthread_t thr_id);
+inline void *
+changelog_get_usable_buffer (changelog_local_t *local);
+inline void
+changelog_set_usable_record_and_length (changelog_local_t *local,
+ size_t len, int xr);
+void
+changelog_local_cleanup (xlator_t *xl, changelog_local_t *local);
+changelog_local_t *
+changelog_local_init (xlator_t *this, inode_t *inode, uuid_t gfid,
+ int xtra_records, gf_boolean_t update_flag);
+int
+changelog_start_next_change (xlator_t *this,
+ changelog_priv_t *priv,
+ unsigned long ts, gf_boolean_t finale);
+int
+changelog_open (xlator_t *this, changelog_priv_t *priv);
+int
+changelog_fill_rollover_data (changelog_log_data_t *cld, gf_boolean_t is_last);
+int
+changelog_inject_single_event (xlator_t *this,
+ changelog_priv_t *priv,
+ changelog_log_data_t *cld);
+inline size_t
+changelog_entry_length ();
+inline int
+changelog_write (int fd, char *buffer, size_t len);
+int
+changelog_write_change (changelog_priv_t *priv, char *buffer, size_t len);
+inline int
+changelog_handle_change (xlator_t *this,
+ changelog_priv_t *priv, changelog_log_data_t *cld);
+inline void
+changelog_update (xlator_t *this, changelog_priv_t *priv,
+ changelog_local_t *local, changelog_log_type type);
+void *
+changelog_rollover (void *data);
+void *
+changelog_fsync_thread (void *data);
+int
+changelog_forget (xlator_t *this, inode_t *inode);
+
+/* macros */
+
+#define CHANGELOG_STACK_UNWIND(fop, frame, params ...) do { \
+ changelog_local_t *__local = NULL; \
+ xlator_t *__xl = NULL; \
+ if (frame) { \
+ __local = frame->local; \
+ __xl = frame->this; \
+ frame->local = NULL; \
+ } \
+ STACK_UNWIND_STRICT (fop, frame, params); \
+ changelog_local_cleanup (__xl, __local); \
+ if (__local && __local->prev_entry) \
+ changelog_local_cleanup (__xl, \
+ __local->prev_entry); \
+ } while (0)
+
+#define CHANGELOG_IOBUF_REF(iobuf) do { \
+ if (iobuf) \
+ iobuf_ref (iobuf); \
+ } while (0)
+
+#define CHANGELOG_IOBUF_UNREF(iobuf) do { \
+ if (iobuf) \
+ iobuf_unref (iobuf); \
+ } while (0)
+
+#define CHANGELOG_FILL_BUFFER(buffer, off, val, len) do { \
+ memcpy (buffer + off, val, len); \
+ off += len; \
+ } while (0)
+
+#define SLICE_VERSION_UPDATE(slice) do { \
+ int i = 0; \
+ for (; i < CHANGELOG_MAX_TYPE; i++) { \
+ slice->changelog_version[i]++; \
+ } \
+ } while (0)
+
+#define CHANGLOG_FILL_FOP_NUMBER(co, fop, converter, xlen) do { \
+ co->co_convert = converter; \
+ co->co_free = NULL; \
+ co->co_type = CHANGELOG_OPT_REC_FOP; \
+ co->co_fop = fop; \
+ xlen += sizeof (fop); \
+ } while (0)
+
+#define CHANGELOG_FILL_ENTRY(co, pargfid, bname, \
+ converter, freefn, xlen, label) \
+ do { \
+ co->co_convert = converter; \
+ co->co_free = freefn; \
+ co->co_type = CHANGELOG_OPT_REC_ENTRY; \
+ uuid_copy (co->co_entry.cef_uuid, pargfid); \
+ co->co_entry.cef_bname = gf_strdup(bname); \
+ if (!co->co_entry.cef_bname) \
+ goto label; \
+ xlen += (UUID_CANONICAL_FORM_LEN + strlen (bname)); \
+ } while (0)
+
+#define CHANGELOG_INIT(this, local, inode, gfid, xrec) \
+ local = changelog_local_init (this, inode, gfid, xrec, _gf_false)
+
+#define CHANGELOG_INIT_NOCHECK(this, local, inode, gfid, xrec) \
+ local = changelog_local_init (this, inode, gfid, xrec, _gf_true)
+
+#define CHANGELOG_NOT_ACTIVE_THEN_GOTO(priv, label) do { \
+ if (!priv->active) \
+ goto label; \
+ } while (0)
+
+#define CHANGELOG_COND_GOTO(priv, cond, label) do { \
+ if (!priv->active || cond) \
+ goto label; \
+ } while (0)
+
+#endif /* _CHANGELOG_HELPERS_H */
diff --git a/xlators/features/changelog/src/changelog-mem-types.h b/xlators/features/changelog/src/changelog-mem-types.h
new file mode 100644
index 00000000000..d72464eab70
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-mem-types.h
@@ -0,0 +1,29 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CHANGELOG_MEM_TYPES_H
+#define _CHANGELOG_MEM_TYPES_H
+
+#include "mem-types.h"
+
+enum gf_changelog_mem_types {
+ gf_changelog_mt_priv_t = gf_common_mt_end + 1,
+ gf_changelog_mt_str_t = gf_common_mt_end + 2,
+ gf_changelog_mt_batch_t = gf_common_mt_end + 3,
+ gf_changelog_mt_rt_t = gf_common_mt_end + 4,
+ gf_changelog_mt_inode_ctx_t = gf_common_mt_end + 5,
+ gf_changelog_mt_libgfchangelog_t = gf_common_mt_end + 6,
+ gf_changelog_mt_libgfchangelog_rl_t = gf_common_mt_end + 7,
+ gf_changelog_mt_libgfchangelog_dirent_t = gf_common_mt_end + 8,
+ gf_changelog_mt_changelog_buffer_t = gf_common_mt_end + 9,
+ gf_changelog_mt_end
+};
+
+#endif
diff --git a/xlators/features/changelog/src/changelog-misc.h b/xlators/features/changelog/src/changelog-misc.h
new file mode 100644
index 00000000000..0712a377183
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-misc.h
@@ -0,0 +1,101 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CHANGELOG_MISC_H
+#define _CHANGELOG_MISC_H
+
+#include "glusterfs.h"
+#include "common-utils.h"
+
+#define CHANGELOG_MAX_TYPE 3
+#define CHANGELOG_FILE_NAME "CHANGELOG"
+
+#define CHANGELOG_VERSION_MAJOR 1
+#define CHANGELOG_VERSION_MINOR 0
+
+#define CHANGELOG_UNIX_SOCK DEFAULT_VAR_RUN_DIRECTORY"/changelog-%s.sock"
+
+/**
+ * header starts with the version and the format of the changelog.
+ * 'version' not much of a use now.
+ */
+#define CHANGELOG_HEADER \
+ "GlusterFS Changelog | version: v%d.%d | encoding : %d\n"
+
+#define CHANGELOG_MAKE_SOCKET_PATH(brick_path, sockpath, len) do { \
+ char md5_sum[MD5_DIGEST_LENGTH*2+1] = {0,}; \
+ md5_wrapper((unsigned char *) brick_path, \
+ strlen(brick_path), \
+ md5_sum); \
+ (void) snprintf (sockpath, len, \
+ CHANGELOG_UNIX_SOCK, md5_sum); \
+ } while (0)
+
+/**
+ * ... used by libgfchangelog.
+ */
+#define CHANGELOG_GET_ENCODING(fd, buffer, len, enc, enc_len) do { \
+ FILE *fp; \
+ int fd_dup, maj, min; \
+ \
+ enc = -1; \
+ fd_dup = dup (fd); \
+ \
+ if (fd_dup != -1) { \
+ fp = fdopen (fd_dup, "r"); \
+ if (fp) { \
+ if (fgets (buffer, len, fp)) { \
+ elen = strlen (buffer); \
+ sscanf (buffer, \
+ CHANGELOG_HEADER, \
+ &maj, &min, &enc); \
+ } \
+ fclose (fp); \
+ } else { \
+ close (fd_dup); \
+ } \
+ } \
+ } while (0)
+
+/**
+ * everything after 'CHANGELOG_TYPE_ENTRY' are internal types
+ * (ie. none of the fops trigger this type of event), hence
+ * CHANGELOG_MAX_TYPE = 3
+ */
+typedef enum {
+ CHANGELOG_TYPE_DATA = 0,
+ CHANGELOG_TYPE_METADATA,
+ CHANGELOG_TYPE_ENTRY,
+ CHANGELOG_TYPE_ROLLOVER,
+ CHANGELOG_TYPE_FSYNC,
+} changelog_log_type;
+
+/* operation modes - RT for now */
+typedef enum {
+ CHANGELOG_MODE_RT = 0,
+} changelog_mode_t;
+
+/* encoder types */
+
+typedef enum {
+ CHANGELOG_ENCODE_MIN = 0,
+ CHANGELOG_ENCODE_BINARY,
+ CHANGELOG_ENCODE_ASCII,
+ CHANGELOG_ENCODE_MAX,
+} changelog_encoder_t;
+
+#define CHANGELOG_VALID_ENCODING(enc) \
+ (enc > CHANGELOG_ENCODE_MIN && enc < CHANGELOG_ENCODE_MAX)
+
+#define CHANGELOG_TYPE_IS_ENTRY(type) (type == CHANGELOG_TYPE_ENTRY)
+#define CHANGELOG_TYPE_IS_ROLLOVER(type) (type == CHANGELOG_TYPE_ROLLOVER)
+#define CHANGELOG_TYPE_IS_FSYNC(type) (type == CHANGELOG_TYPE_FSYNC)
+
+#endif /* _CHANGELOG_MISC_H */
diff --git a/xlators/features/changelog/src/changelog-notifier.c b/xlators/features/changelog/src/changelog-notifier.c
new file mode 100644
index 00000000000..1f8b312538e
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-notifier.c
@@ -0,0 +1,314 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-notifier.h"
+
+#include <pthread.h>
+
+inline static void
+changelog_notify_clear_fd (changelog_notify_t *cn, int i)
+{
+ cn->client_fd[i] = -1;
+}
+
+inline static void
+changelog_notify_save_fd (changelog_notify_t *cn, int i, int fd)
+{
+ cn->client_fd[i] = fd;
+}
+
+static int
+changelog_notify_insert_fd (xlator_t *this, changelog_notify_t *cn, int fd)
+{
+ int i = 0;
+ int ret = 0;
+
+ for (; i < CHANGELOG_MAX_CLIENTS; i++) {
+ if (cn->client_fd[i] == -1)
+ break;
+ }
+
+ if (i == CHANGELOG_MAX_CLIENTS) {
+ /**
+ * this case should not be hit as listen() would limit
+ * the number of completely established connections.
+ */
+ gf_log (this->name, GF_LOG_WARNING,
+ "hit max client limit (%d)", CHANGELOG_MAX_CLIENTS);
+ ret = -1;
+ }
+ else
+ changelog_notify_save_fd (cn, i, fd);
+
+ return ret;
+}
+
+static void
+changelog_notify_fill_rset (changelog_notify_t *cn, fd_set *rset, int *maxfd)
+{
+ int i = 0;
+
+ FD_ZERO (rset);
+
+ FD_SET (cn->socket_fd, rset);
+ *maxfd = cn->socket_fd;
+
+ FD_SET (cn->rfd, rset);
+ *maxfd = max (*maxfd, cn->rfd);
+
+ for (; i < CHANGELOG_MAX_CLIENTS; i++) {
+ if (cn->client_fd[i] != -1) {
+ FD_SET (cn->client_fd[i], rset);
+ *maxfd = max (*maxfd, cn->client_fd[i]);
+ }
+ }
+
+ *maxfd = *maxfd + 1;
+}
+
+static int
+changelog_notify_client (changelog_notify_t *cn, char *path, ssize_t len)
+{
+ int i = 0;
+ int ret = 0;
+
+ for (; i < CHANGELOG_MAX_CLIENTS; i++) {
+ if (cn->client_fd[i] == -1)
+ continue;
+
+ if (changelog_write (cn->client_fd[i],
+ path, len)) {
+ ret = -1;
+
+ close (cn->client_fd[i]);
+ changelog_notify_clear_fd (cn, i);
+ }
+ }
+
+ return ret;
+}
+
+static void
+changelog_notifier_init (changelog_notify_t *cn)
+{
+ int i = 0;
+
+ cn->socket_fd = -1;
+
+ for (; i < CHANGELOG_MAX_CLIENTS; i++) {
+ changelog_notify_clear_fd (cn, i);
+ }
+}
+
+static void
+changelog_close_client_conn (changelog_notify_t *cn)
+{
+ int i = 0;
+
+ for (; i < CHANGELOG_MAX_CLIENTS; i++) {
+ if (cn->client_fd[i] == -1)
+ continue;
+
+ close (cn->client_fd[i]);
+ changelog_notify_clear_fd (cn, i);
+ }
+}
+
+static void
+changelog_notifier_cleanup (void *arg)
+{
+ changelog_notify_t *cn = NULL;
+
+ cn = (changelog_notify_t *) arg;
+
+ changelog_close_client_conn (cn);
+
+ if (cn->socket_fd != -1)
+ close (cn->socket_fd);
+
+ if (cn->rfd)
+ close (cn->rfd);
+
+ if (unlink (cn->sockpath))
+ gf_log ("", GF_LOG_WARNING,
+ "could not unlink changelog socket file"
+ " %s (reason: %s", cn->sockpath, strerror (errno));
+}
+
+void *
+changelog_notifier (void *data)
+{
+ int i = 0;
+ int fd = 0;
+ int max_fd = 0;
+ int len = 0;
+ ssize_t readlen = 0;
+ xlator_t *this = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_notify_t *cn = NULL;
+ struct sockaddr_un local = {0,};
+ char path[PATH_MAX] = {0,};
+ char abspath[PATH_MAX] = {0,};
+
+ char buffer;
+ fd_set rset;
+
+ priv = (changelog_priv_t *) data;
+
+ cn = &priv->cn;
+ this = cn->this;
+
+ pthread_cleanup_push (changelog_notifier_cleanup, cn);
+
+ changelog_notifier_init (cn);
+
+ cn->socket_fd = socket (AF_UNIX, SOCK_STREAM, 0);
+ if (cn->socket_fd < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "changelog socket error (reason: %s)",
+ strerror (errno));
+ goto out;
+ }
+
+ CHANGELOG_MAKE_SOCKET_PATH (priv->changelog_brick,
+ cn->sockpath, PATH_MAX);
+ if (unlink (cn->sockpath) < 0) {
+ if (errno != ENOENT) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Could not unlink changelog socket file (%s)"
+ " (reason: %s)",
+ CHANGELOG_UNIX_SOCK, strerror (errno));
+ goto cleanup;
+ }
+ }
+
+ local.sun_family = AF_UNIX;
+ strcpy (local.sun_path, cn->sockpath);
+
+ len = strlen (local.sun_path) + sizeof (local.sun_family);
+
+ /* bind to the unix domain socket */
+ if (bind (cn->socket_fd, (struct sockaddr *) &local, len) < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Could not bind to changelog socket (reason: %s)",
+ strerror (errno));
+ goto cleanup;
+ }
+
+ /* listen for incoming connections */
+ if (listen (cn->socket_fd, CHANGELOG_MAX_CLIENTS) < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "listen() error on changelog socket (reason: %s)",
+ strerror (errno));
+ goto cleanup;
+ }
+
+ /**
+ * simple select() on all to-be-read file descriptors. This method
+ * though old school works pretty well when you have a handfull of
+ * fd's to be watched (clients).
+ *
+ * Future TODO: move this to epoll based notification facility if
+ * number of clients increase.
+ */
+ for (;;) {
+ changelog_notify_fill_rset (cn, &rset, &max_fd);
+
+ if (select (max_fd, &rset, NULL, NULL, NULL) < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "select() returned -1 (reason: %s)",
+ strerror (errno));
+ sleep (2);
+ continue;
+ }
+
+ if (FD_ISSET (cn->socket_fd, &rset)) {
+ fd = accept (cn->socket_fd, NULL, NULL);
+ if (fd < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "accept error on changelog socket"
+ " (reason: %s)", strerror (errno));
+ } else if (changelog_notify_insert_fd (this, cn, fd)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "hit max client limit");
+ }
+ }
+
+ if (FD_ISSET (cn->rfd, &rset)) {
+ /**
+ * read changelog filename and notify all connected
+ * clients.
+ */
+ readlen = 0;
+ while (readlen < PATH_MAX) {
+ len = read (cn->rfd, &path[readlen++], 1);
+ if (len == -1) {
+ break;
+ }
+
+ if (len == 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "rollover thread sent EOF"
+ " on pipe - possibly a crash.");
+ /* be blunt and close all connections */
+ pthread_exit(NULL);
+ }
+
+ if (path[readlen - 1] == '\0')
+ break;
+ }
+
+ /* should we close all client connections here too? */
+ if (len < 0 || readlen == PATH_MAX) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Could not get pathname from rollover"
+ " thread or pathname too long");
+ goto process_rest;
+ }
+
+ (void) snprintf (abspath, PATH_MAX,
+ "%s/%s", priv->changelog_dir, path);
+ if (changelog_notify_client (cn, abspath,
+ strlen (abspath) + 1))
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not notify some clients with new"
+ " changelogs");
+ }
+
+ process_rest:
+ for (i = 0; i < CHANGELOG_MAX_CLIENTS; i++) {
+ if ( (fd = cn->client_fd[i]) == -1 )
+ continue;
+
+ if (FD_ISSET (fd, &rset)) {
+ /**
+ * the only data we accept from the client is a
+ * disconnect. Anything else is treated as bogus
+ * and is silently discarded (also warned!!!).
+ */
+ if ( (readlen = read (fd, &buffer, 1)) <= 0 ) {
+ close (fd);
+ changelog_notify_clear_fd (cn, i);
+ } else {
+ /* silently discard data and log */
+ gf_log (this->name, GF_LOG_WARNING,
+ "misbehaving changelog client");
+ }
+ }
+ }
+
+ }
+
+ cleanup:;
+ pthread_cleanup_pop (1);
+
+ out:
+ return NULL;
+}
diff --git a/xlators/features/changelog/src/changelog-notifier.h b/xlators/features/changelog/src/changelog-notifier.h
new file mode 100644
index 00000000000..55e728356e6
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-notifier.h
@@ -0,0 +1,19 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CHANGELOG_NOTIFIER_H
+#define _CHANGELOG_NOTIFIER_H
+
+#include "changelog-helpers.h"
+
+void *
+changelog_notifier (void *data);
+
+#endif
diff --git a/xlators/features/changelog/src/changelog-rt.c b/xlators/features/changelog/src/changelog-rt.c
new file mode 100644
index 00000000000..c147f68ca85
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-rt.c
@@ -0,0 +1,72 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "xlator.h"
+#include "defaults.h"
+#include "logging.h"
+
+#include "changelog-rt.h"
+#include "changelog-mem-types.h"
+
+int
+changelog_rt_init (xlator_t *this, changelog_dispatcher_t *cd)
+{
+ changelog_rt_t *crt = NULL;
+
+ crt = GF_CALLOC (1, sizeof (*crt),
+ gf_changelog_mt_rt_t);
+ if (!crt)
+ return -1;
+
+ LOCK_INIT (&crt->lock);
+
+ cd->cd_data = crt;
+ cd->dispatchfn = &changelog_rt_enqueue;
+
+ return 0;
+}
+
+int
+changelog_rt_fini (xlator_t *this, changelog_dispatcher_t *cd)
+{
+ changelog_rt_t *crt = NULL;
+
+ crt = cd->cd_data;
+
+ LOCK_DESTROY (&crt->lock);
+ GF_FREE (crt);
+
+ return 0;
+}
+
+int
+changelog_rt_enqueue (xlator_t *this, changelog_priv_t *priv, void *cbatch,
+ changelog_log_data_t *cld_0, changelog_log_data_t *cld_1)
+{
+ int ret = 0;
+ changelog_rt_t *crt = NULL;
+
+ crt = (changelog_rt_t *) cbatch;
+
+ LOCK (&crt->lock);
+ {
+ ret = changelog_handle_change (this, priv, cld_0);
+ if (!ret && cld_1)
+ ret = changelog_handle_change (this, priv, cld_1);
+ }
+ UNLOCK (&crt->lock);
+
+ return ret;
+}
diff --git a/xlators/features/changelog/src/changelog-rt.h b/xlators/features/changelog/src/changelog-rt.h
new file mode 100644
index 00000000000..1fc2bbc5bb9
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-rt.h
@@ -0,0 +1,33 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CHANGELOG_RT_H
+#define _CHANGELOG_RT_H
+
+#include "locking.h"
+#include "timer.h"
+#include "pthread.h"
+
+#include "changelog-helpers.h"
+
+/* unused as of now - may be you would need it later */
+typedef struct changelog_rt {
+ gf_lock_t lock;
+} changelog_rt_t;
+
+int
+changelog_rt_init (xlator_t *this, changelog_dispatcher_t *cd);
+int
+changelog_rt_fini (xlator_t *this, changelog_dispatcher_t *cd);
+int
+changelog_rt_enqueue (xlator_t *this, changelog_priv_t *priv, void *cbatch,
+ changelog_log_data_t *cld_0, changelog_log_data_t *cld_1);
+
+#endif /* _CHANGELOG_RT_H */
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c
new file mode 100644
index 00000000000..35e3e784986
--- /dev/null
+++ b/xlators/features/changelog/src/changelog.c
@@ -0,0 +1,1487 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "xlator.h"
+#include "defaults.h"
+#include "logging.h"
+#include "iobuf.h"
+
+#include "changelog-rt.h"
+
+#include "changelog-encoders.h"
+#include "changelog-mem-types.h"
+
+#include <pthread.h>
+
+#include "changelog-notifier.h"
+
+static struct changelog_bootstrap
+cb_bootstrap[] = {
+ {
+ .mode = CHANGELOG_MODE_RT,
+ .ctor = changelog_rt_init,
+ .dtor = changelog_rt_fini,
+ },
+};
+
+static struct changelog_encoder
+cb_encoder[] = {
+ [CHANGELOG_ENCODE_BINARY] =
+ {
+ .encoder = CHANGELOG_ENCODE_BINARY,
+ .encode = changelog_encode_binary,
+ },
+ [CHANGELOG_ENCODE_ASCII] =
+ {
+ .encoder = CHANGELOG_ENCODE_ASCII,
+ .encode = changelog_encode_ascii,
+ },
+};
+
+/* Entry operations - TYPE III */
+
+/**
+ * entry operations do not undergo inode version checking.
+ */
+
+/* {{{ */
+
+/* rmdir */
+
+int32_t
+changelog_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+ local = frame->local;
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (rmdir, frame, op_ret, op_errno,
+ preparent, postparent, xdata);
+ return 0;
+}
+
+int32_t
+changelog_rmdir (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int xflags, dict_t *xdata)
+{
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind);
+
+ CHANGELOG_INIT_NOCHECK (this, frame->local,
+ NULL, loc->inode->gfid, 2);
+
+ co = changelog_get_usable_buffer (frame->local);
+ if (!co)
+ goto wind;
+
+ CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+
+ co++;
+ CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+ entry_fn, entry_free_fn, xtra_len, wind);
+
+ changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
+
+ wind:
+ STACK_WIND (frame, changelog_rmdir_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->rmdir,
+ loc, xflags, xdata);
+ return 0;
+}
+
+/* unlink */
+
+int32_t
+changelog_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+ local = frame->local;
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (unlink, frame, op_ret, op_errno,
+ preparent, postparent, xdata);
+ return 0;
+}
+
+int32_t
+changelog_unlink (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int xflags, dict_t *xdata)
+{
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind);
+
+ CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, loc->inode->gfid, 2);
+
+ co = changelog_get_usable_buffer (frame->local);
+ if (!co)
+ goto wind;
+
+ CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+
+ co++;
+ CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+ entry_fn, entry_free_fn, xtra_len, wind);
+
+ changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
+
+ wind:
+ STACK_WIND (frame, changelog_unlink_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->unlink,
+ loc, xflags, xdata);
+ return 0;
+}
+
+/* rename */
+
+int32_t
+changelog_rename_cbk (call_frame_t *frame,
+ void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt *buf, struct iatt *preoldparent,
+ struct iatt *postoldparent, struct iatt *prenewparent,
+ struct iatt *postnewparent, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+ local = frame->local;
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (rename, frame, op_ret, op_errno,
+ buf, preoldparent, postoldparent,
+ prenewparent, postnewparent, xdata);
+ return 0;
+}
+
+
+int32_t
+changelog_rename (call_frame_t *frame, xlator_t *this,
+ loc_t *oldloc, loc_t *newloc, dict_t *xdata)
+{
+ size_t xtra_len = 0;
+ uuid_t null_uuid = {0,};
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind);
+
+ /* 3 == fop + oldloc + newloc */
+ CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, null_uuid, 3);
+
+ co = changelog_get_usable_buffer (frame->local);
+ if (!co)
+ goto wind;
+
+ CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+
+ co++;
+ CHANGELOG_FILL_ENTRY (co, oldloc->pargfid, oldloc->name,
+ entry_fn, entry_free_fn, xtra_len, wind);
+
+ co++;
+ CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name,
+ entry_fn, entry_free_fn, xtra_len, wind);
+
+ changelog_set_usable_record_and_length (frame->local, xtra_len, 3);
+
+ wind:
+ STACK_WIND (frame, changelog_rename_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->rename,
+ oldloc, newloc, xdata);
+ return 0;
+}
+
+/* link */
+
+int32_t
+changelog_link_cbk (call_frame_t *frame,
+ void *cookie, xlator_t *this, int32_t op_ret,
+ int32_t op_errno, inode_t *inode,
+ struct iatt *buf, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+ local = frame->local;
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (link, frame, op_ret, op_errno,
+ inode, buf, preparent, postparent, xdata);
+ return 0;
+}
+
+int32_t
+changelog_link (call_frame_t *frame,
+ xlator_t *this, loc_t *oldloc,
+ loc_t *newloc, dict_t *xdata)
+{
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind);
+
+ CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, oldloc->gfid, 2);
+
+ co = changelog_get_usable_buffer (frame->local);
+ if (!co)
+ goto wind;
+
+ CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+
+ co++;
+ CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name,
+ entry_fn, entry_free_fn, xtra_len, wind);
+
+ changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
+
+ wind:
+ STACK_WIND (frame, changelog_link_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->link,
+ oldloc, newloc, xdata);
+ return 0;
+}
+
+/* mkdir */
+
+int32_t
+changelog_mkdir_cbk (call_frame_t *frame,
+ void *cookie, xlator_t *this, int32_t op_ret,
+ int32_t op_errno, inode_t *inode,
+ struct iatt *buf, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+ local = frame->local;
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (mkdir, frame, op_ret, op_errno,
+ inode, buf, preparent, postparent, xdata);
+ return 0;
+}
+
+int32_t
+changelog_mkdir (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, mode_t mode, mode_t umask, dict_t *xdata)
+{
+ int ret = -1;
+ uuid_t gfid = {0,};
+ void *uuid_req = NULL;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind);
+
+ ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+ if (ret) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "failed to get gfid from dict");
+ goto wind;
+ }
+ uuid_copy (gfid, uuid_req);
+
+ CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, gfid, 2);
+
+ co = changelog_get_usable_buffer (frame->local);
+ if (!co)
+ goto wind;
+
+ CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+
+ co++;
+ CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+ entry_fn, entry_free_fn, xtra_len, wind);
+
+ changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
+
+ wind:
+ STACK_WIND (frame, changelog_mkdir_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->mkdir,
+ loc, mode, umask, xdata);
+ return 0;
+}
+
+/* symlink */
+
+int32_t
+changelog_symlink_cbk (call_frame_t *frame,
+ void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ inode_t *inode, struct iatt *buf, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+ local = frame->local;
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (symlink, frame, op_ret, op_errno,
+ inode, buf, preparent, postparent, xdata);
+ return 0;
+}
+
+int32_t
+changelog_symlink (call_frame_t *frame, xlator_t *this,
+ const char *linkname, loc_t *loc,
+ mode_t umask, dict_t *xdata)
+{
+ int ret = -1;
+ size_t xtra_len = 0;
+ uuid_t gfid = {0,};
+ void *uuid_req = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind);
+
+ ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+ if (ret) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "failed to get gfid from dict");
+ goto wind;
+ }
+ uuid_copy (gfid, uuid_req);
+
+ CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, gfid, 2);
+
+ co = changelog_get_usable_buffer (frame->local);
+ if (!co)
+ goto wind;
+
+ CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+
+ co++;
+ CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+ entry_fn, entry_free_fn, xtra_len, wind);
+
+ changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
+
+ wind:
+ STACK_WIND (frame, changelog_symlink_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->symlink,
+ linkname, loc, umask, xdata);
+ return 0;
+}
+
+/* mknod */
+
+int32_t
+changelog_mknod_cbk (call_frame_t *frame,
+ void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *buf, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+ local = frame->local;
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (mknod, frame, op_ret, op_errno,
+ inode, buf, preparent, postparent, xdata);
+ return 0;
+}
+
+int32_t
+changelog_mknod (call_frame_t *frame,
+ xlator_t *this, loc_t *loc,
+ mode_t mode, dev_t dev, mode_t umask, dict_t *xdata)
+{
+ int ret = -1;
+ uuid_t gfid = {0,};
+ void *uuid_req = NULL;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind);
+
+ ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+ if (ret) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "failed to get gfid from dict");
+ goto wind;
+ }
+ uuid_copy (gfid, uuid_req);
+
+ CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, gfid, 2);
+
+ co = changelog_get_usable_buffer (frame->local);
+ if (!co)
+ goto wind;
+
+ CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+
+ co++;
+ CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+ entry_fn, entry_free_fn, xtra_len, wind);
+
+ changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
+
+ wind:
+ STACK_WIND (frame, changelog_mknod_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->mknod,
+ loc, mode, dev, umask, xdata);
+ return 0;
+}
+
+/* creat */
+
+int32_t
+changelog_create_cbk (call_frame_t *frame,
+ void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ fd_t *fd, inode_t *inode, struct iatt *buf,
+ struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+ local = frame->local;
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (create, frame,
+ op_ret, op_errno, fd, inode,
+ buf, preparent, postparent, xdata);
+ return 0;
+}
+
+int32_t
+changelog_create (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int32_t flags, mode_t mode,
+ mode_t umask, fd_t *fd, dict_t *xdata)
+{
+ int ret = -1;
+ uuid_t gfid = {0,};
+ void *uuid_req = NULL;
+ changelog_opt_t *co = NULL;
+ changelog_priv_t *priv = NULL;
+ size_t xtra_len = 0;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind);
+
+ ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
+ if (ret) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "failed to get gfid from dict");
+ goto wind;
+ }
+ uuid_copy (gfid, uuid_req);
+
+ /* init with two extra records */
+ CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, gfid, 2);
+ if (!frame->local)
+ goto wind;
+
+ co = changelog_get_usable_buffer (frame->local);
+ if (!co)
+ goto wind;
+
+ CHANGLOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
+
+ co++;
+ CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
+ entry_fn, entry_free_fn, xtra_len, wind);
+
+ changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
+
+ wind:
+ STACK_WIND (frame, changelog_create_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->create,
+ loc, flags, mode, umask, fd, xdata);
+ return 0;
+}
+
+/* }}} */
+
+
+/* Metadata modification fops - TYPE II */
+
+/* {{{ */
+
+/* {f}setattr */
+
+int32_t
+changelog_fsetattr_cbk (call_frame_t *frame,
+ void *cookie, xlator_t *this, int32_t op_ret,
+ int32_t op_errno, struct iatt *preop_stbuf,
+ struct iatt *postop_stbuf, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+ local = frame->local;
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (fsetattr, frame, op_ret, op_errno,
+ preop_stbuf, postop_stbuf, xdata);
+
+ return 0;
+
+
+}
+
+int32_t
+changelog_fsetattr (call_frame_t *frame,
+ xlator_t *this, fd_t *fd,
+ struct iatt *stbuf, int32_t valid, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind);
+
+ CHANGELOG_INIT (this, frame->local,
+ fd->inode, fd->inode->gfid, 0);
+
+ wind:
+ STACK_WIND (frame, changelog_fsetattr_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetattr,
+ fd, stbuf, valid, xdata);
+ return 0;
+
+
+}
+
+int32_t
+changelog_setattr_cbk (call_frame_t *frame,
+ void *cookie, xlator_t *this, int32_t op_ret,
+ int32_t op_errno, struct iatt *preop_stbuf,
+ struct iatt *postop_stbuf, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+ local = frame->local;
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (setattr, frame, op_ret, op_errno,
+ preop_stbuf, postop_stbuf, xdata);
+
+ return 0;
+}
+
+int32_t
+changelog_setattr (call_frame_t *frame,
+ xlator_t *this, loc_t *loc,
+ struct iatt *stbuf, int32_t valid, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind);
+
+ CHANGELOG_INIT (this, frame->local,
+ loc->inode, loc->inode->gfid, 0);
+
+ wind:
+ STACK_WIND (frame, changelog_setattr_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->setattr,
+ loc, stbuf, valid, xdata);
+ return 0;
+}
+
+/* {f}removexattr */
+
+int32_t
+changelog_fremovexattr_cbk (call_frame_t *frame,
+ void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+ local = frame->local;
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (fremovexattr, frame, op_ret, op_errno, xdata);
+
+ return 0;
+}
+
+int32_t
+changelog_fremovexattr (call_frame_t *frame, xlator_t *this,
+ fd_t *fd, const char *name, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind);
+
+ CHANGELOG_INIT (this, frame->local,
+ fd->inode, fd->inode->gfid, 0);
+
+ wind:
+ STACK_WIND (frame, changelog_fremovexattr_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->fremovexattr,
+ fd, name, xdata);
+ return 0;
+}
+
+int32_t
+changelog_removexattr_cbk (call_frame_t *frame,
+ void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+ local = frame->local;
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (removexattr, frame, op_ret, op_errno, xdata);
+
+ return 0;
+}
+
+int32_t
+changelog_removexattr (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, const char *name, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind);
+
+ CHANGELOG_INIT (this, frame->local,
+ loc->inode, loc->inode->gfid, 0);
+
+ wind:
+ STACK_WIND (frame, changelog_removexattr_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->removexattr,
+ loc, name, xdata);
+ return 0;
+}
+
+/* {f}setxattr */
+
+int32_t
+changelog_setxattr_cbk (call_frame_t *frame,
+ void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+ local = frame->local;
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (setxattr, frame, op_ret, op_errno, xdata);
+
+ return 0;
+}
+
+int32_t
+changelog_setxattr (call_frame_t *frame,
+ xlator_t *this, loc_t *loc,
+ dict_t *dict, int32_t flags, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind);
+
+ CHANGELOG_INIT (this, frame->local,
+ loc->inode, loc->inode->gfid, 0);
+
+ wind:
+ STACK_WIND (frame, changelog_setxattr_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->setxattr,
+ loc, dict, flags, xdata);
+ return 0;
+}
+
+int32_t
+changelog_fsetxattr_cbk (call_frame_t *frame,
+ void *cookie, xlator_t *this, int32_t op_ret,
+ int32_t op_errno, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+ local = frame->local;
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (fsetxattr, frame, op_ret, op_errno, xdata);
+
+ return 0;
+}
+
+int32_t
+changelog_fsetxattr (call_frame_t *frame,
+ xlator_t *this, fd_t *fd, dict_t *dict,
+ int32_t flags, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind);
+
+ CHANGELOG_INIT (this, frame->local,
+ fd->inode, fd->inode->gfid, 0);
+
+ wind:
+ STACK_WIND (frame, changelog_fsetxattr_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetxattr,
+ fd, dict, flags, xdata);
+ return 0;
+}
+
+/* }}} */
+
+
+/* Data modification fops - TYPE I */
+
+/* {{{ */
+
+/* {f}truncate() */
+
+int32_t
+changelog_truncate_cbk (call_frame_t *frame,
+ void *cookie, xlator_t *this, int32_t op_ret,
+ int32_t op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+ local = frame->local;
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_DATA);
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (truncate, frame,
+ op_ret, op_errno, prebuf, postbuf, xdata);
+ return 0;
+}
+
+int32_t
+changelog_truncate (call_frame_t *frame,
+ xlator_t *this, loc_t *loc, off_t offset, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind);
+
+ CHANGELOG_INIT (this, frame->local,
+ loc->inode, loc->inode->gfid, 0);
+
+ wind:
+ STACK_WIND (frame, changelog_truncate_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->truncate,
+ loc, offset, xdata);
+ return 0;
+}
+
+int32_t
+changelog_ftruncate_cbk (call_frame_t *frame,
+ void *cookie, xlator_t *this, int32_t op_ret,
+ int32_t op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+ local = frame->local;
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind);
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_DATA);
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (ftruncate, frame,
+ op_ret, op_errno, prebuf, postbuf, xdata);
+ return 0;
+}
+
+int32_t
+changelog_ftruncate (call_frame_t *frame,
+ xlator_t *this, fd_t *fd, off_t offset, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind);
+
+ CHANGELOG_INIT (this, frame->local,
+ fd->inode, fd->inode->gfid, 0);
+
+ wind:
+ STACK_WIND (frame, changelog_ftruncate_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->ftruncate,
+ fd, offset, xdata);
+ return 0;
+}
+
+/* writev() */
+
+int32_t
+changelog_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
+ struct iatt *postbuf,
+ dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+ changelog_local_t *local = NULL;
+
+ priv = this->private;
+ local = frame->local;
+
+ CHANGELOG_COND_GOTO (priv, ((op_ret <= 0) || !local), unwind);
+
+ changelog_update (this, priv, local, CHANGELOG_TYPE_DATA);
+
+ unwind:
+ CHANGELOG_STACK_UNWIND (writev, frame,
+ op_ret, op_errno, prebuf, postbuf, xdata);
+ return 0;
+}
+
+int32_t
+changelog_writev (call_frame_t *frame,
+ xlator_t *this, fd_t *fd, struct iovec *vector,
+ int32_t count, off_t offset, uint32_t flags,
+ struct iobref *iobref, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+ CHANGELOG_NOT_ACTIVE_THEN_GOTO (priv, wind);
+
+ CHANGELOG_INIT (this, frame->local,
+ fd->inode, fd->inode->gfid, 0);
+
+ wind:
+ STACK_WIND (frame, changelog_writev_cbk, FIRST_CHILD (this),
+ FIRST_CHILD (this)->fops->writev, fd, vector,
+ count, offset, flags, iobref, xdata);
+ return 0;
+}
+
+/* }}} */
+
+/**
+ * The
+ * - @init ()
+ * - @fini ()
+ * - @reconfigure ()
+ * ... and helper routines
+ */
+
+/**
+ * needed if there are more operation modes in the future.
+ */
+static void
+changelog_assign_opmode (changelog_priv_t *priv, char *mode)
+{
+ if ( strncmp (mode, "realtime", 8) == 0 ) {
+ priv->op_mode = CHANGELOG_MODE_RT;
+ }
+}
+
+static void
+changelog_assign_encoding (changelog_priv_t *priv, char *enc)
+{
+ if ( strncmp (enc, "binary", 6) == 0 ) {
+ priv->encode_mode = CHANGELOG_ENCODE_BINARY;
+ } else if ( strncmp (enc, "ascii", 5) == 0 ) {
+ priv->encode_mode = CHANGELOG_ENCODE_ASCII;
+ }
+}
+
+/* cleanup any helper threads that are running */
+static void
+changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv)
+{
+ if (priv->cr.rollover_th) {
+ changelog_thread_cleanup (this, priv->cr.rollover_th);
+ priv->cr.rollover_th = 0;
+ }
+
+ if (priv->cf.fsync_th) {
+ changelog_thread_cleanup (this, priv->cf.fsync_th);
+ priv->cf.fsync_th = 0;
+ }
+}
+
+/* spawn helper thread; cleaning up in case of errors */
+static int
+changelog_spawn_helper_threads (xlator_t *this, changelog_priv_t *priv)
+{
+ int ret = 0;
+
+ priv->cr.this = this;
+ ret = pthread_create (&priv->cr.rollover_th,
+ NULL, changelog_rollover, priv);
+ if (ret)
+ goto out;
+
+ if (priv->fsync_interval) {
+ priv->cf.this = this;
+ ret = pthread_create (&priv->cf.fsync_th,
+ NULL, changelog_fsync_thread, priv);
+ }
+
+ if (ret)
+ changelog_cleanup_helper_threads (this, priv);
+
+ out:
+ return ret;
+}
+
+/* cleanup the notifier thread */
+static int
+changelog_cleanup_notifier (xlator_t *this, changelog_priv_t *priv)
+{
+ int ret = 0;
+
+ if (priv->cn.notify_th) {
+ changelog_thread_cleanup (this, priv->cn.notify_th);
+ priv->cn.notify_th = 0;
+
+ ret = close (priv->wfd);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR,
+ "error closing writer end of notifier pipe"
+ " (reason: %s)", strerror (errno));
+ }
+
+ return ret;
+}
+
+/* spawn the notifier thread - nop if already running */
+static int
+changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv)
+{
+ int ret = 0;
+ int flags = 0;
+ int pipe_fd[2] = {0, 0};
+
+ if (priv->cn.notify_th)
+ goto out; /* notifier thread already running */
+
+ ret = pipe (pipe_fd);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Cannot create pipe (reason: %s)", strerror (errno));
+ goto out;
+ }
+
+ /* writer is non-blocking */
+ flags = fcntl (pipe_fd[1], F_GETFL);
+ flags |= O_NONBLOCK;
+
+ ret = fcntl (pipe_fd[1], F_SETFL, flags);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set O_NONBLOCK flag");
+ goto out;
+ }
+
+ priv->wfd = pipe_fd[1];
+
+ priv->cn.this = this;
+ priv->cn.rfd = pipe_fd[0];
+
+ ret = pthread_create (&priv->cn.notify_th,
+ NULL, changelog_notifier, priv);
+
+ out:
+ return ret;
+}
+
+int32_t
+mem_acct_init (xlator_t *this)
+{
+ int ret = -1;
+
+ if (!this)
+ return ret;
+
+ ret = xlator_mem_acct_init (this, gf_changelog_mt_end + 1);
+
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_WARNING, "Memory accounting"
+ " init failed");
+ return ret;
+ }
+
+ return ret;
+}
+
+static int
+changelog_init (xlator_t *this, changelog_priv_t *priv)
+{
+ int i = 0;
+ int ret = -1;
+ struct timeval tv = {0,};
+ changelog_log_data_t cld = {0,};
+
+ ret = gettimeofday (&tv, NULL);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "gettimeofday() failure");
+ goto out;
+ }
+
+ priv->slice.tv_start = tv;
+
+ priv->maps[CHANGELOG_TYPE_DATA] = "D ";
+ priv->maps[CHANGELOG_TYPE_METADATA] = "M ";
+ priv->maps[CHANGELOG_TYPE_ENTRY] = "E ";
+
+ /* spawn the notifier thread */
+ ret = changelog_spawn_notifier (this, priv);
+ if (ret)
+ goto out;
+
+ /**
+ * start with a fresh changelog file every time. this is done
+ * in case there was an encoding change. so... things are kept
+ * simple here.
+ */
+ ret = changelog_fill_rollover_data (&cld, _gf_false);
+ if (ret)
+ goto out;
+
+ LOCK (&priv->lock);
+ {
+ for (; i < CHANGELOG_MAX_TYPE; i++) {
+ /* start with version 1 */
+ priv->slice.changelog_version[i] = 1;
+ }
+
+ ret = changelog_inject_single_event (this, priv, &cld);
+ }
+ UNLOCK (&priv->lock);
+
+ /* ... and finally spawn the helpers threads */
+ ret = changelog_spawn_helper_threads (this, priv);
+
+ out:
+ return ret;
+}
+
+int
+reconfigure (xlator_t *this, dict_t *options)
+{
+ int ret = 0;
+ char *tmp = NULL;
+ changelog_priv_t *priv = NULL;
+ gf_boolean_t active_earlier = _gf_true;
+ gf_boolean_t active_now = _gf_true;
+ changelog_time_slice_t *slice = NULL;
+ changelog_log_data_t cld = {0,};
+
+ priv = this->private;
+ if (!priv)
+ goto out;
+
+ ret = -1;
+ active_earlier = priv->active;
+
+ /* first stop the rollover and the fsync thread */
+ changelog_cleanup_helper_threads (this, priv);
+
+ GF_OPTION_RECONF ("changelog-dir", tmp, options, str, out);
+ if (!tmp) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "\"changelog-dir\" option is not set");
+ goto out;
+ }
+
+ GF_FREE (priv->changelog_dir);
+ priv->changelog_dir = gf_strdup (tmp);
+ if (!priv->changelog_dir)
+ goto out;
+
+ ret = mkdir_p (priv->changelog_dir, 0600, _gf_true);
+ if (ret)
+ goto out;
+
+ GF_OPTION_RECONF ("changelog", active_now, options, bool, out);
+
+ /**
+ * changelog_handle_change() handles changes that could possibly
+ * have been submit changes before changelog deactivation.
+ */
+ if (!active_now)
+ priv->active = _gf_false;
+
+ GF_OPTION_RECONF ("op-mode", tmp, options, str, out);
+ changelog_assign_opmode (priv, tmp);
+
+ GF_OPTION_RECONF ("encoding", tmp, options, str, out);
+ changelog_assign_encoding (priv, tmp);
+
+ GF_OPTION_RECONF ("rollover-time",
+ priv->rollover_time, options, int32, out);
+ GF_OPTION_RECONF ("fsync-interval",
+ priv->fsync_interval, options, int32, out);
+
+ if (active_now || active_earlier) {
+ ret = changelog_fill_rollover_data (&cld, !active_now);
+ if (ret)
+ goto out;
+
+ slice = &priv->slice;
+
+ LOCK (&priv->lock);
+ {
+ ret = changelog_inject_single_event (this, priv, &cld);
+ if (!ret && active_now)
+ SLICE_VERSION_UPDATE (slice);
+ }
+ UNLOCK (&priv->lock);
+
+ if (ret)
+ goto out;
+
+ if (active_now) {
+ ret = changelog_spawn_notifier (this, priv);
+ if (!ret)
+ ret = changelog_spawn_helper_threads (this,
+ priv);
+ } else
+ ret = changelog_cleanup_notifier (this, priv);
+ }
+
+ out:
+ if (ret) {
+ ret = changelog_cleanup_notifier (this, priv);
+ } else {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "changelog reconfigured");
+ if (active_now)
+ priv->active = _gf_true;
+ }
+
+ return ret;
+}
+
+int32_t
+init (xlator_t *this)
+{
+ int ret = -1;
+ char *tmp = NULL;
+ changelog_priv_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO ("changelog", this, out);
+
+ if (!this->children || this->children->next) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "translator needs a single subvolume");
+ goto out;
+ }
+
+ if (!this->parents) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "dangling volume. please check volfile");
+ goto out;
+ }
+
+ priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t);
+ if (!priv)
+ goto out;
+
+ this->local_pool = mem_pool_new (changelog_local_t, 64);
+ if (!this->local_pool) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to create local memory pool");
+ goto out;
+ }
+
+ LOCK_INIT (&priv->lock);
+
+ GF_OPTION_INIT ("changelog-brick", tmp, str, out);
+ if (!tmp) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "\"changelog-brick\" option is not set");
+ goto out;
+ }
+
+ priv->changelog_brick = gf_strdup (tmp);
+ if (!priv->changelog_brick)
+ goto out;
+ tmp = NULL;
+
+ GF_OPTION_INIT ("changelog-dir", tmp, str, out);
+ if (!tmp) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "\"changelog-dir\" option is not set");
+ goto out;
+ }
+
+ priv->changelog_dir = gf_strdup (tmp);
+ if (!priv->changelog_dir)
+ goto out;
+ tmp = NULL;
+
+ /**
+ * create the directory even if change-logging would be inactive
+ * so that consumers can _look_ into it (finding nothing...)
+ */
+ ret = mkdir_p (priv->changelog_dir, 0600, _gf_true);
+ if (ret)
+ goto out;
+
+ GF_OPTION_INIT ("changelog", priv->active, bool, out);
+ if (!priv->active) {
+ ret = 0;
+ goto out;
+ }
+
+ GF_OPTION_INIT ("op-mode", tmp, str, out);
+ changelog_assign_opmode (priv, tmp);
+
+ tmp = NULL;
+
+ GF_OPTION_INIT ("encoding", tmp, str, out);
+ changelog_assign_encoding (priv, tmp);
+
+ GF_OPTION_INIT ("rollover-time", priv->rollover_time, int32, out);
+
+ GF_OPTION_INIT ("fsync-interval", priv->fsync_interval, int32, out);
+
+ GF_ASSERT (cb_encoder[priv->encode_mode].encoder == priv->encode_mode);
+ priv->ce = &cb_encoder[priv->encode_mode];
+
+ GF_ASSERT (cb_bootstrap[priv->op_mode].mode == priv->op_mode);
+ priv->cb = &cb_bootstrap[priv->op_mode];
+
+ /* ... now bootstrap the logger */
+ ret = priv->cb->ctor (this, &priv->cd);
+ if (ret)
+ goto out;
+
+ priv->changelog_fd = -1;
+ if (priv->active)
+ ret = changelog_init (this, priv);
+ if (ret)
+ goto out;
+
+ gf_log (this->name, GF_LOG_DEBUG, "changelog translator loaded");
+
+ out:
+ if (ret) {
+ if (this->local_pool)
+ mem_pool_destroy (this->local_pool);
+ ret = priv->cb->dtor (this, &priv->cd);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR,
+ "error in cleanup during init()");
+ GF_FREE (priv->changelog_brick);
+ GF_FREE (priv->changelog_dir);
+ GF_FREE (priv);
+ this->private = NULL;
+ } else
+ this->private = priv;
+
+ return ret;
+}
+
+void
+fini (xlator_t *this)
+{
+ int ret = -1;
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+
+ if (priv) {
+ ret = priv->cb->dtor (this, &priv->cd);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR,
+ "error in fini");
+ mem_pool_destroy (this->local_pool);
+ GF_FREE (priv->changelog_brick);
+ GF_FREE (priv->changelog_dir);
+ GF_FREE (priv);
+ }
+
+ this->private = NULL;
+
+ return;
+}
+
+struct xlator_fops fops = {
+ .mknod = changelog_mknod,
+ .mkdir = changelog_mkdir,
+ .create = changelog_create,
+ .symlink = changelog_symlink,
+ .writev = changelog_writev,
+ .truncate = changelog_truncate,
+ .ftruncate = changelog_ftruncate,
+ .link = changelog_link,
+ .rename = changelog_rename,
+ .unlink = changelog_unlink,
+ .rmdir = changelog_rmdir,
+ .setattr = changelog_setattr,
+ .fsetattr = changelog_fsetattr,
+ .setxattr = changelog_setxattr,
+ .fsetxattr = changelog_fsetxattr,
+ .removexattr = changelog_removexattr,
+ .fremovexattr = changelog_fremovexattr,
+};
+
+struct xlator_cbks cbks = {
+ .forget = changelog_forget,
+};
+
+struct volume_options options[] = {
+ {.key = {"changelog"},
+ .type = GF_OPTION_TYPE_BOOL,
+ .default_value = "off",
+ .description = "enable/disable change-logging"
+ },
+ {.key = {"changelog-brick"},
+ .type = GF_OPTION_TYPE_PATH,
+ .description = "brick path to generate unique socket file name."
+ " should be the export directory of the volume strictly."
+ },
+ {.key = {"changelog-dir"},
+ .type = GF_OPTION_TYPE_PATH,
+ .description = "directory for the changelog files"
+ },
+ {.key = {"op-mode"},
+ .type = GF_OPTION_TYPE_STR,
+ .default_value = "realtime",
+ .value = {"realtime"},
+ .description = "operation mode - futuristic operation modes"
+ },
+ {.key = {"encoding"},
+ .type = GF_OPTION_TYPE_STR,
+ .default_value = "ascii",
+ .value = {"binary", "ascii"},
+ .description = "encoding type for changelogs"
+ },
+ {.key = {"rollover-time"},
+ .default_value = "60",
+ .type = GF_OPTION_TYPE_TIME,
+ .description = "time to switch to a new changelog file (in seconds)"
+ },
+ {.key = {"fsync-interval"},
+ .type = GF_OPTION_TYPE_TIME,
+ .default_value = "0",
+ .description = "do not open CHANGELOG file with O_SYNC mode."
+ " instead perform fsync() at specified intervals"
+ },
+ {.key = {NULL}
+ },
+};