summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/lib/src
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/lib/src')
-rw-r--r--xlators/features/changelog/lib/src/Makefile.am41
-rw-r--r--xlators/features/changelog/lib/src/changelog-lib-messages.h74
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-api.c224
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-helpers.c224
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-helpers.h244
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-journal-handler.c1029
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-journal.h116
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-process.c618
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-reborp.c413
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-rpc.c98
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-rpc.h (renamed from xlators/features/changelog/lib/src/changelog.h)25
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog.c985
-rw-r--r--xlators/features/changelog/lib/src/gf-history-changelog.c1098
13 files changed, 3745 insertions, 1444 deletions
diff --git a/xlators/features/changelog/lib/src/Makefile.am b/xlators/features/changelog/lib/src/Makefile.am
index 28d5a70aab2..c933ec53ed2 100644
--- a/xlators/features/changelog/lib/src/Makefile.am
+++ b/xlators/features/changelog/lib/src/Makefile.am
@@ -1,38 +1,35 @@
libgfchangelog_la_CFLAGS = -Wall $(GF_CFLAGS) $(GF_DARWIN_LIBGLUSTERFS_CFLAGS) \
- -DDATADIR=\"$(localstatedir)\"
+ -DDATADIR=\"$(localstatedir)\"
-libgfchangelog_la_CPPFLAGS = $(GF_CPPFLAGS) -D__USE_FILE_OFFSET64 -fpic \
- -I../../../src/ -I$(top_srcdir)/libglusterfs/src \
- -I$(top_srcdir)/xlators/features/changelog/src \
- -DDATADIR=\"$(localstatedir)\"
+libgfchangelog_la_CPPFLAGS = $(GF_CPPFLAGS) -D__USE_FILE_OFFSET64 -D__USE_LARGEFILE64 -fpic \
+ -I../../../src/ -I$(top_srcdir)/libglusterfs/src \
+ -I$(top_srcdir)/xlators/features/changelog/src \
+ -I$(top_srcdir)/rpc/xdr/src -I$(top_builddir)/rpc/xdr/src \
+ -I$(top_srcdir)/rpc/rpc-lib/src \
+ -I$(top_srcdir)/rpc/rpc-transport/socket/src \
+ -DDATADIR=\"$(localstatedir)\"
libgfchangelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \
- $(GF_GLUSTERFS_LIBS)
+ $(top_builddir)/rpc/xdr/src/libgfxdr.la \
+ $(top_builddir)/rpc/rpc-lib/src/libgfrpc.la
-libgfchangelog_la_LDFLAGS = $(GF_LDFLAGS) -version-info $(LIBGFCHANGELOG_LT_VERSION)
+libgfchangelog_la_LDFLAGS = $(GF_LDFLAGS) \
+ -version-info $(LIBGFCHANGELOG_LT_VERSION) \
+ $(GF_NO_UNDEFINED)
-libgfchangelogdir = $(includedir)/glusterfs/gfchangelog
lib_LTLIBRARIES = libgfchangelog.la
CONTRIB_BUILDDIR = $(top_builddir)/contrib
-libgfchangelog_la_SOURCES = gf-changelog.c gf-changelog-process.c \
- gf-changelog-helpers.c gf-history-changelog.c \
- $(CONTRIBDIR)/uuid/clear.c \
- $(CONTRIBDIR)/uuid/copy.c $(CONTRIBDIR)/uuid/gen_uuid.c \
- $(CONTRIBDIR)/uuid/pack.c $(CONTRIBDIR)/uuid/parse.c \
- $(CONTRIBDIR)/uuid/unparse.c $(CONTRIBDIR)/uuid/uuid_time.c \
- $(CONTRIBDIR)/uuid/compare.c $(CONTRIBDIR)/uuid/isnull.c \
- $(CONTRIBDIR)/uuid/unpack.c
+libgfchangelog_la_SOURCES = gf-changelog.c gf-changelog-journal-handler.c \
+ gf-changelog-helpers.c gf-changelog-api.c gf-history-changelog.c \
+ gf-changelog-rpc.c gf-changelog-reborp.c \
+ $(top_srcdir)/xlators/features/changelog/src/changelog-rpc-common.c
-noinst_HEADERS = gf-changelog-helpers.h $(CONTRIBDIR)/uuid/uuidd.h \
- $(CONTRIBDIR)/uuid/uuid.h $(CONTRIBDIR)/uuid/uuidP.h \
- $(CONTRIB_BUILDDIR)/uuid/uuid_types.h
-
-libgfchangelog_HEADERS = changelog.h
+noinst_HEADERS = gf-changelog-helpers.h gf-changelog-rpc.h \
+ gf-changelog-journal.h changelog-lib-messages.h
CLEANFILES =
-CONFIG_CLEAN_FILES = $(CONTRIB_BUILDDIR)/uuid/uuid_types.h
$(top_builddir)/libglusterfs/src/libglusterfs.la:
$(MAKE) -C $(top_builddir)/libglusterfs/src/ all
diff --git a/xlators/features/changelog/lib/src/changelog-lib-messages.h b/xlators/features/changelog/lib/src/changelog-lib-messages.h
new file mode 100644
index 00000000000..d7fe7274353
--- /dev/null
+++ b/xlators/features/changelog/lib/src/changelog-lib-messages.h
@@ -0,0 +1,74 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+ */
+
+#ifndef _CHANGELOG_LIB_MESSAGES_H_
+#define _CHANGELOG_LIB_MESSAGES_H_
+
+#include <glusterfs/glfs-message-id.h>
+
+/* To add new message IDs, append new identifiers at the end of the list.
+ *
+ * Never remove a message ID. If it's not used anymore, you can rename it or
+ * leave it as it is, but not delete it. This is to prevent reutilization of
+ * IDs by other messages.
+ *
+ * The component name must match one of the entries defined in
+ * glfs-message-id.h.
+ */
+
+GLFS_MSGID(
+ CHANGELOG_LIB, CHANGELOG_LIB_MSG_OPEN_FAILED,
+ CHANGELOG_LIB_MSG_FAILED_TO_RMDIR,
+ CHANGELOG_LIB_MSG_SCRATCH_DIR_ENTRIES_CREATION_ERROR,
+ CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, CHANGELOG_LIB_MSG_OPENDIR_ERROR,
+ CHANGELOG_LIB_MSG_RENAME_FAILED, CHANGELOG_LIB_MSG_READ_ERROR,
+ CHANGELOG_LIB_MSG_HTIME_ERROR, CHANGELOG_LIB_MSG_GET_TIME_ERROR,
+ CHANGELOG_LIB_MSG_WRITE_FAILED, CHANGELOG_LIB_MSG_PTHREAD_ERROR,
+ CHANGELOG_LIB_MSG_MMAP_FAILED, CHANGELOG_LIB_MSG_MUNMAP_FAILED,
+ CHANGELOG_LIB_MSG_ASCII_ERROR, CHANGELOG_LIB_MSG_STAT_FAILED,
+ CHANGELOG_LIB_MSG_GET_XATTR_FAILED, CHANGELOG_LIB_MSG_PUBLISH_ERROR,
+ CHANGELOG_LIB_MSG_PARSE_ERROR, CHANGELOG_LIB_MSG_MIN_MAX_INFO,
+ CHANGELOG_LIB_MSG_CLEANUP_ERROR, CHANGELOG_LIB_MSG_UNLINK_FAILED,
+ CHANGELOG_LIB_MSG_NOTIFY_REGISTER_FAILED,
+ CHANGELOG_LIB_MSG_INVOKE_RPC_FAILED, CHANGELOG_LIB_MSG_DRAINING_EVENT_INFO,
+ CHANGELOG_LIB_MSG_CLEANING_BRICK_ENTRY_INFO,
+ CHANGELOG_LIB_MSG_FREEING_ENTRY_INFO, CHANGELOG_LIB_MSG_XDR_DECODING_FAILED,
+ CHANGELOG_LIB_MSG_NOTIFY_REGISTER_INFO,
+ CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING,
+ CHANGELOG_LIB_MSG_COPY_FROM_BUFFER_FAILED,
+ CHANGELOG_LIB_MSG_PTHREAD_JOIN_FAILED, CHANGELOG_LIB_MSG_HIST_FAILED,
+ CHANGELOG_LIB_MSG_DRAINED_EVENT_INFO, CHANGELOG_LIB_MSG_PARSE_ERROR_CEASED,
+ CHANGELOG_LIB_MSG_REQUESTING_INFO, CHANGELOG_LIB_MSG_FINAL_INFO);
+
+#define CHANGELOG_LIB_MSG_NOTIFY_REGISTER_INFO_STR "Registering brick"
+#define CHANGELOG_LIB_MSG_RENAME_FAILED_STR "error moving changelog file"
+#define CHANGELOG_LIB_MSG_OPEN_FAILED_STR "cannot open changelog file"
+#define CHANGELOG_LIB_MSG_UNLINK_FAILED_STR "failed to unlink"
+#define CHANGELOG_LIB_MSG_FAILED_TO_RMDIR_STR "failed to rmdir"
+#define CHANGELOG_LIB_MSG_STAT_FAILED_STR "stat failed on changelog file"
+#define CHANGELOG_LIB_MSG_PARSE_ERROR_STR "could not parse changelog"
+#define CHANGELOG_LIB_MSG_PARSE_ERROR_CEASED_STR \
+ "parsing error, ceased publishing..."
+#define CHANGELOG_LIB_MSG_HTIME_ERROR_STR "fop failed on htime file"
+#define CHANGELOG_LIB_MSG_GET_XATTR_FAILED_STR \
+ "error extracting max timstamp from htime file"
+#define CHANGELOG_LIB_MSG_MIN_MAX_INFO_STR "changelogs min max"
+#define CHANGELOG_LIB_MSG_REQUESTING_INFO_STR "Requesting historical changelogs"
+#define CHANGELOG_LIB_MSG_FINAL_INFO_STR "FINAL"
+#define CHANGELOG_LIB_MSG_HIST_FAILED_STR \
+ "Requested changelog range is not available"
+#define CHANGELOG_LIB_MSG_GET_TIME_ERROR_STR "wrong result"
+#define CHANGELOG_LIB_MSG_CLEANING_BRICK_ENTRY_INFO_STR \
+ "Cleaning brick entry for brick"
+#define CHANGELOG_LIB_MSG_DRAINING_EVENT_INFO_STR "Draining event"
+#define CHANGELOG_LIB_MSG_DRAINED_EVENT_INFO_STR "Drained event"
+#define CHANGELOG_LIB_MSG_FREEING_ENTRY_INFO_STR "freeing entry"
+
+#endif /* !_CHANGELOG_MESSAGES_H_ */
diff --git a/xlators/features/changelog/lib/src/gf-changelog-api.c b/xlators/features/changelog/lib/src/gf-changelog-api.c
new file mode 100644
index 00000000000..81a5cbfec10
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-api.c
@@ -0,0 +1,224 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <glusterfs/compat-uuid.h>
+#include <glusterfs/globals.h>
+#include <glusterfs/glusterfs.h>
+#include <glusterfs/syscall.h>
+
+#include "gf-changelog-helpers.h"
+#include "gf-changelog-journal.h"
+#include "changelog-mem-types.h"
+#include "changelog-lib-messages.h"
+
+int
+gf_changelog_done(char *file)
+{
+ int ret = -1;
+ char *buffer = NULL;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ char to_path[PATH_MAX] = {
+ 0,
+ };
+
+ errno = EINVAL;
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this);
+ if (!jnl)
+ goto out;
+
+ if (!file || !strlen(file))
+ goto out;
+
+ /* make sure 'file' is inside ->jnl_working_dir */
+ buffer = realpath(file, NULL);
+ if (!buffer)
+ goto out;
+
+ if (strncmp(jnl->jnl_working_dir, buffer, strlen(jnl->jnl_working_dir)))
+ goto out;
+
+ (void)snprintf(to_path, PATH_MAX, "%s%s", jnl->jnl_processed_dir,
+ basename(buffer));
+ gf_msg_debug(this->name, 0, "moving %s to processed directory", file);
+ ret = sys_rename(buffer, to_path);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, errno,
+ CHANGELOG_LIB_MSG_RENAME_FAILED, "from=%s", file, "to=%s",
+ to_path, NULL);
+ goto out;
+ }
+
+ ret = 0;
+
+out:
+ if (buffer)
+ free(buffer); /* allocated by realpath() */
+ return ret;
+}
+
+/**
+ * @API
+ * for a set of changelogs, start from the beginning
+ */
+int
+gf_changelog_start_fresh()
+{
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ errno = EINVAL;
+
+ jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this);
+ if (!jnl)
+ goto out;
+
+ if (gf_ftruncate(jnl->jnl_fd, 0))
+ goto out;
+
+ return 0;
+
+out:
+ return -1;
+}
+
+/**
+ * @API
+ * return the next changelog file entry. zero means all chanelogs
+ * consumed.
+ */
+ssize_t
+gf_changelog_next_change(char *bufptr, size_t maxlen)
+{
+ ssize_t size = -1;
+ int tracker_fd = 0;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ char buffer[PATH_MAX] = {
+ 0,
+ };
+
+ errno = EINVAL;
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this);
+ if (!jnl)
+ goto out;
+
+ tracker_fd = jnl->jnl_fd;
+
+ size = gf_readline(tracker_fd, buffer, maxlen);
+ if (size < 0) {
+ size = -1;
+ goto out;
+ }
+
+ if (size == 0)
+ goto out;
+
+ memcpy(bufptr, buffer, size - 1);
+ bufptr[size - 1] = '\0';
+
+out:
+ return size;
+}
+
+/**
+ * @API
+ * gf_changelog_scan() - scan and generate a list of change entries
+ *
+ * calling this api multiple times (without calling gf_changlog_done())
+ * would result new changelogs(s) being refreshed in the tracker file.
+ * This call also acts as a cancellation point for the consumer.
+ */
+ssize_t
+gf_changelog_scan()
+{
+ int tracker_fd = 0;
+ size_t off = 0;
+ xlator_t *this = NULL;
+ size_t nr_entries = 0;
+ gf_changelog_journal_t *jnl = NULL;
+ struct dirent *entry = NULL;
+ struct dirent scratch[2] = {
+ {
+ 0,
+ },
+ };
+ char buffer[PATH_MAX] = {
+ 0,
+ };
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this);
+ if (!jnl)
+ goto out;
+ if (JNL_IS_API_DISCONNECTED(jnl)) {
+ errno = ENOTCONN;
+ goto out;
+ }
+
+ errno = EINVAL;
+
+ tracker_fd = jnl->jnl_fd;
+ if (gf_ftruncate(tracker_fd, 0))
+ goto out;
+
+ rewinddir(jnl->jnl_dir);
+
+ for (;;) {
+ errno = 0;
+ entry = sys_readdir(jnl->jnl_dir, scratch);
+ if (!entry || errno != 0)
+ break;
+
+ if (!strcmp(basename(entry->d_name), ".") ||
+ !strcmp(basename(entry->d_name), ".."))
+ continue;
+
+ nr_entries++;
+
+ GF_CHANGELOG_FILL_BUFFER(jnl->jnl_processing_dir, buffer, off,
+ strlen(jnl->jnl_processing_dir));
+ GF_CHANGELOG_FILL_BUFFER(entry->d_name, buffer, off,
+ strlen(entry->d_name));
+ GF_CHANGELOG_FILL_BUFFER("\n", buffer, off, 1);
+
+ if (gf_changelog_write(tracker_fd, buffer, off) != off) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_WRITE_FAILED,
+ "error writing changelog filename"
+ " to tracker file");
+ break;
+ }
+ off = 0;
+ }
+
+ if (!entry) {
+ if (gf_lseek(tracker_fd, 0, SEEK_SET) != -1)
+ return nr_entries;
+ }
+out:
+ return -1;
+}
diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.c b/xlators/features/changelog/lib/src/gf-changelog-helpers.c
index 1eef8bf0479..75f8a6dfc08 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-helpers.c
+++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.c
@@ -10,40 +10,37 @@
#include "changelog-mem-types.h"
#include "gf-changelog-helpers.h"
-
-ssize_t gf_changelog_read_path (int fd, char *buffer, size_t bufsize)
-{
- return read (fd, buffer, bufsize);
-}
+#include "changelog-lib-messages.h"
+#include <glusterfs/syscall.h>
size_t
-gf_changelog_write (int fd, char *buffer, size_t len)
+gf_changelog_write(int fd, char *buffer, size_t len)
{
- ssize_t size = 0;
- size_t writen = 0;
+ ssize_t size = 0;
+ size_t written = 0;
- while (writen < len) {
- size = write (fd,
- buffer + writen, len - writen);
- if (size <= 0)
- break;
+ while (written < len) {
+ size = sys_write(fd, buffer + written, len - written);
+ if (size <= 0)
+ break;
- writen += size;
- }
+ written += size;
+ }
- return writen;
+ return written;
}
void
-gf_rfc3986_encode (unsigned char *s, char *enc, char *estr)
+gf_rfc3986_encode_space_newline(unsigned char *s, char *enc, char *estr)
{
- for (; *s; s++) {
- if (estr[*s])
- sprintf(enc, "%c", estr[*s]);
- else
- sprintf(enc, "%%%02X", *s);
- while (*++enc);
- }
+ for (; *s; s++) {
+ if (estr[*s])
+ sprintf(enc, "%c", estr[*s]);
+ else
+ sprintf(enc, "%%%02X", *s);
+ while (*++enc)
+ ;
+ }
}
/**
@@ -56,125 +53,118 @@ gf_rfc3986_encode (unsigned char *s, char *enc, char *estr)
* that can be done via @fflush(fp), @ftruncate(fd) and @fseek(fp),
* but this involves mixing POSIX file descriptors and stream FILE *).
*
- * NOTE: This implmentation still does work with more than one fd's
+ * NOTE: This implementation still does work with more than one fd's
* used to perform gf_readline(). For this very reason it's not
* made a part of libglusterfs.
*/
-static pthread_key_t rl_key;
-static pthread_once_t rl_once = PTHREAD_ONCE_INIT;
-
-static void
-readline_destructor (void *ptr)
-{
- GF_FREE (ptr);
-}
-
-static void
-readline_once (void)
-{
- pthread_key_create (&rl_key, readline_destructor);
-}
+static __thread read_line_t thread_tsd = {};
static ssize_t
-my_read (read_line_t *tsd, int fd, char *ptr)
-{
- if (tsd->rl_cnt <= 0) {
- if ( (tsd->rl_cnt = read (fd, tsd->rl_buf, MAXLINE)) < 0 )
- return -1;
- else if (tsd->rl_cnt == 0)
- return 0;
- tsd->rl_bufptr = tsd->rl_buf;
- }
-
- tsd->rl_cnt--;
- *ptr = *tsd->rl_bufptr++;
- return 1;
-}
-
-static int
-gf_readline_init_once (read_line_t **tsd)
+my_read(read_line_t *tsd, int fd, char *ptr)
{
- if (pthread_once (&rl_once, readline_once) != 0)
- return -1;
-
- *tsd = pthread_getspecific (rl_key);
- if (*tsd)
- goto out;
-
- *tsd = GF_CALLOC (1, sizeof (**tsd),
- gf_changelog_mt_libgfchangelog_rl_t);
- if (!*tsd)
- return -1;
+ if (tsd->rl_cnt <= 0) {
+ tsd->rl_cnt = sys_read(fd, tsd->rl_buf, MAXLINE);
- if (pthread_setspecific (rl_key, *tsd) != 0)
- return -1;
+ if (tsd->rl_cnt < 0)
+ return -1;
+ else if (tsd->rl_cnt == 0)
+ return 0;
+ tsd->rl_bufptr = tsd->rl_buf;
+ }
- out:
- return 0;
+ tsd->rl_cnt--;
+ *ptr = *tsd->rl_bufptr++;
+ return 1;
}
ssize_t
-gf_readline (int fd, void *vptr, size_t maxlen)
+gf_readline(int fd, void *vptr, size_t maxlen)
{
- size_t n = 0;
- size_t rc = 0;
- char c = ' ';
- char *ptr = NULL;
- read_line_t *tsd = NULL;
-
- if (gf_readline_init_once (&tsd))
- return -1;
-
- ptr = vptr;
- for (n = 1; n < maxlen; n++) {
- if ( (rc = my_read (tsd, fd, &c)) == 1 ) {
- *ptr++ = c;
- if (c == '\n')
- break;
- } else if (rc == 0) {
- *ptr = '\0';
- return (n - 1);
- } else
- return -1;
- }
-
- *ptr = '\0';
- return n;
-
+ size_t n = 0;
+ size_t rc = 0;
+ char c = ' ';
+ char *ptr = NULL;
+ read_line_t *tsd = &thread_tsd;
+
+ ptr = vptr;
+ for (n = 1; n < maxlen; n++) {
+ if ((rc = my_read(tsd, fd, &c)) == 1) {
+ *ptr++ = c;
+ if (c == '\n')
+ break;
+ } else if (rc == 0) {
+ *ptr = '\0';
+ return (n - 1);
+ } else
+ return -1;
+ }
+
+ *ptr = '\0';
+ return n;
}
off_t
-gf_lseek (int fd, off_t offset, int whence)
+gf_lseek(int fd, off_t offset, int whence)
{
- off_t off = 0;
- read_line_t *tsd = NULL;
+ off_t off = 0;
+ read_line_t *tsd = &thread_tsd;
- if (gf_readline_init_once (&tsd))
- return -1;
+ off = sys_lseek(fd, offset, whence);
+ if (off == -1)
+ return -1;
- if ( (off = lseek (fd, offset, whence)) == -1)
- return -1;
+ tsd->rl_cnt = 0;
+ tsd->rl_bufptr = tsd->rl_buf;
- tsd->rl_cnt = 0;
- tsd->rl_bufptr = tsd->rl_buf;
-
- return off;
+ return off;
}
int
-gf_ftruncate (int fd, off_t length)
+gf_ftruncate(int fd, off_t length)
{
- read_line_t *tsd = NULL;
+ read_line_t *tsd = &thread_tsd;
- if (gf_readline_init_once (&tsd))
- return -1;
+ if (sys_ftruncate(fd, 0))
+ return -1;
- if (ftruncate (fd, 0))
- return -1;
+ tsd->rl_cnt = 0;
+ tsd->rl_bufptr = tsd->rl_buf;
- tsd->rl_cnt = 0;
- tsd->rl_bufptr = tsd->rl_buf;
+ return 0;
+}
- return 0;
+int
+gf_thread_cleanup(xlator_t *this, pthread_t thread)
+{
+ int ret = 0;
+ void *res = NULL;
+
+ ret = pthread_cancel(thread);
+ if (ret != 0) {
+ gf_msg(this->name, GF_LOG_WARNING, 0,
+ CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING,
+ "Failed to send cancellation to thread");
+ goto error_return;
+ }
+
+ ret = pthread_join(thread, &res);
+ if (ret != 0) {
+ gf_msg(this->name, GF_LOG_WARNING, 0,
+ CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING,
+ "failed to join thread");
+ goto error_return;
+ }
+
+ if (res != PTHREAD_CANCELED) {
+ gf_msg(this->name, GF_LOG_WARNING, 0,
+ CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING,
+ "Thread could not be cleaned up");
+ goto error_return;
+ }
+
+ return 0;
+
+error_return:
+ return -1;
}
diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
index 2d545da9e82..9c609d33172 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h
+++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
@@ -14,88 +14,242 @@
#include <unistd.h>
#include <dirent.h>
#include <limits.h>
-#include <pthread.h>
+#include <glusterfs/locking.h>
-#include <xlator.h>
+#include <glusterfs/xlator.h>
-#define GF_CHANGELOG_TRACKER "tracker"
+#include "changelog.h"
-#define GF_CHANGELOG_CURRENT_DIR ".current"
-#define GF_CHANGELOG_PROCESSED_DIR ".processed"
+#include "changelog-rpc-common.h"
+#include "gf-changelog-journal.h"
+
+#define GF_CHANGELOG_TRACKER "tracker"
+
+#define GF_CHANGELOG_CURRENT_DIR ".current"
+#define GF_CHANGELOG_PROCESSED_DIR ".processed"
#define GF_CHANGELOG_PROCESSING_DIR ".processing"
-#define GF_CHANGELOG_HISTORY_DIR ".history"
+#define GF_CHANGELOG_HISTORY_DIR ".history"
+#define TIMESTAMP_LENGTH 10
#ifndef MAXLINE
#define MAXLINE 4096
#endif
-#define GF_CHANGELOG_FILL_BUFFER(ptr, ascii, off, len) do { \
- memcpy (ascii + off, ptr, len); \
- off += len; \
- } while (0)
+#define GF_CHANGELOG_FILL_BUFFER(ptr, ascii, off, len) \
+ do { \
+ memcpy(ascii + off, ptr, len); \
+ off += len; \
+ } while (0)
typedef struct read_line {
- int rl_cnt;
- char *rl_bufptr;
- char rl_buf[MAXLINE];
+ int rl_cnt;
+ char *rl_bufptr;
+ char rl_buf[MAXLINE];
} read_line_t;
+struct gf_changelog;
+struct gf_event;
+
+/**
+ * Event list for ordered event notification
+ *
+ * ->next_seq holds the next _expected_ sequence number.
+ */
+struct gf_event_list {
+ pthread_mutex_t lock; /* protects this structure */
+ pthread_cond_t cond;
+
+ pthread_t invoker;
+
+ unsigned long next_seq; /* next sequence number expected:
+ zero during bootstrap */
+
+ struct gf_changelog *entry; /* backpointer to it's brick
+ encapsulator (entry) */
+ struct list_head events; /* list of events */
+};
+
+/**
+ * include a refcount if it's of use by additional layers
+ */
+struct gf_event {
+ int count;
+
+ unsigned long seq;
+
+ struct list_head list;
+
+ struct iovec iov[0];
+};
+#define GF_EVENT_CALLOC_SIZE(cnt, len) \
+ (sizeof(struct gf_event) + (cnt * sizeof(struct iovec)) + len)
+
+/**
+ * assign the base address of the IO vector to the correct memory
+o * area and set it's addressable length.
+ */
+#define GF_EVENT_ASSIGN_IOVEC(vec, event, len, pos) \
+ do { \
+ vec->iov_base = ((char *)event) + sizeof(struct gf_event) + \
+ (event->count * sizeof(struct iovec)) + pos; \
+ vec->iov_len = len; \
+ pos += len; \
+ } while (0)
+
+typedef enum gf_changelog_conn_state {
+ GF_CHANGELOG_CONN_STATE_PENDING = 0,
+ GF_CHANGELOG_CONN_STATE_ACCEPTED,
+ GF_CHANGELOG_CONN_STATE_DISCONNECTED,
+} gf_changelog_conn_state_t;
+
+/**
+ * An instance of this structure is allocated for each brick for which
+ * notifications are streamed.
+ */
typedef struct gf_changelog {
- xlator_t *this;
-
- /* 'processing' directory stream */
- DIR *gfc_dir;
+ gf_lock_t statelock;
+ gf_changelog_conn_state_t connstate;
- /* fd to the tracker file */
- int gfc_fd;
+ xlator_t *this;
- /* connection retries */
- int gfc_connretries;
+ struct list_head list; /* list of instances */
- char gfc_sockpath[UNIX_PATH_MAX];
+ char brick[PATH_MAX]; /* brick path for this end-point */
- char gfc_brickpath[PATH_MAX];
+ changelog_rpc_t grpc; /* rpc{-clnt,svc} for this brick */
+#define RPC_PROBER(ent) ent->grpc.rpc
+#define RPC_REBORP(ent) ent->grpc.svc
+#define RPC_SOCK(ent) ent->grpc.sock
- /* socket for recieving notifications */
- int gfc_sockfd;
+ unsigned int notify; /* notification flag(s) */
- char *gfc_working_dir;
+ FINI *fini; /* destructor callback */
+ CALLBACK *callback; /* event callback dispatcher */
+ CONNECT *connected; /* connect callback */
+ DISCONNECT *disconnected; /* disconnection callback */
- /* RFC 3986 string encoding */
- char rfc3986[256];
+ void *ptr; /* owner specific private data */
+ xlator_t *invokerxl; /* consumers _this_, if valid,
+ assigned to THIS before cbk is
+ invoked */
- char gfc_current_dir[PATH_MAX];
- char gfc_processed_dir[PATH_MAX];
- char gfc_processing_dir[PATH_MAX];
+ gf_boolean_t ordered;
- pthread_t gfc_changelog_processor;
+ void (*queueevent)(struct gf_event_list *, struct gf_event *);
+ void (*pickevent)(struct gf_event_list *, struct gf_event **);
- /* Holds gfc for History API */
- struct gf_changelog *hist_gfc;
+ struct gf_event_list event;
} gf_changelog_t;
-int
-gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc);
+static inline int
+gf_changelog_filter_check(gf_changelog_t *entry, changelog_event_t *event)
+{
+ if (event->ev_type & entry->notify)
+ return 1;
+ return 0;
+}
+
+#define GF_NEED_ORDERED_EVENTS(ent) (ent->ordered == _gf_true)
+
+/** private structure */
+typedef struct gf_private {
+ pthread_mutex_t lock; /* protects ->connections, cleanups */
+ pthread_cond_t cond;
+
+ void *api; /* pointer for API access */
+
+ pthread_t poller; /* event poller thread */
+ pthread_t connectionjanitor; /* connection cleaner */
+
+ struct list_head connections; /* list of connections */
+ struct list_head cleanups; /* list of connection to be
+ cleaned up */
+} gf_private_t;
+
+#define GF_CHANGELOG_GET_API_PTR(this) (((gf_private_t *)this->private)->api)
+
+/**
+ * upcall: invoke callback with _correct_ THIS
+ */
+#define GF_CHANGELOG_INVOKE_CBK(this, cbk, brick, args...) \
+ do { \
+ xlator_t *old_this = NULL; \
+ xlator_t *invokerxl = NULL; \
+ \
+ invokerxl = entry->invokerxl; \
+ old_this = this; \
+ \
+ if (invokerxl) { \
+ THIS = invokerxl; \
+ } \
+ \
+ cbk(invokerxl, brick, args); \
+ THIS = old_this; \
+ \
+ } while (0)
+
+#define SAVE_THIS(xl) \
+ do { \
+ old_this = xl; \
+ THIS = master; \
+ } while (0)
+
+#define RESTORE_THIS() \
+ do { \
+ if (old_this) \
+ THIS = old_this; \
+ } while (0)
+
+/** APIs and the rest */
void *
-gf_changelog_process (void *data);
-
-ssize_t
-gf_changelog_read_path (int fd, char *buffer, size_t bufsize);
+gf_changelog_process(void *data);
void
-gf_rfc3986_encode (unsigned char *s, char *enc, char *estr);
+gf_rfc3986_encode_space_newline(unsigned char *s, char *enc, char *estr);
size_t
-gf_changelog_write (int fd, char *buffer, size_t len);
+gf_changelog_write(int fd, char *buffer, size_t len);
ssize_t
-gf_readline (int fd, void *vptr, size_t maxlen);
+gf_readline(int fd, void *vptr, size_t maxlen);
int
-gf_ftruncate (int fd, off_t length);
+gf_ftruncate(int fd, off_t length);
off_t
-gf_lseek (int fd, off_t offset, int whence);
+gf_lseek(int fd, off_t offset, int whence);
+
+int
+gf_changelog_consume(xlator_t *this, gf_changelog_journal_t *jnl,
+ char *from_path, gf_boolean_t no_publish);
+int
+gf_changelog_publish(xlator_t *this, gf_changelog_journal_t *jnl,
+ char *from_path);
+int
+gf_thread_cleanup(xlator_t *this, pthread_t thread);
+void *
+gf_changelog_callback_invoker(void *arg);
+
+int
+gf_cleanup_event(xlator_t *, struct gf_event_list *);
+
+/* (un)ordered event queueing */
+void
+queue_ordered_event(struct gf_event_list *, struct gf_event *);
+
+void
+queue_unordered_event(struct gf_event_list *, struct gf_event *);
+
+/* (un)ordered event picking */
+void
+pick_event_ordered(struct gf_event_list *, struct gf_event **);
+
+void
+pick_event_unordered(struct gf_event_list *, struct gf_event **);
+
+/* connection janitor thread */
+void *
+gf_changelog_connection_janitor(void *);
#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c b/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c
new file mode 100644
index 00000000000..7f6e2329e71
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c
@@ -0,0 +1,1029 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <glusterfs/compat-uuid.h>
+#include <glusterfs/globals.h>
+#include <glusterfs/glusterfs.h>
+#include <glusterfs/syscall.h>
+#include <glusterfs/compat-errno.h>
+
+#include "gf-changelog-helpers.h"
+
+/* from the changelog translator */
+#include "changelog-misc.h"
+#include "changelog-mem-types.h"
+
+#include "gf-changelog-journal.h"
+#include "changelog-lib-messages.h"
+
+extern int byebye;
+
+enum changelog_versions { VERSION_1_1 = 0, VERSION_1_2 = 1 };
+
+/**
+ * number of gfid records after fop number
+ */
+int nr_gfids[2][GF_FOP_MAXVALUE] = {{
+ [GF_FOP_MKNOD] = 1,
+ [GF_FOP_MKDIR] = 1,
+ [GF_FOP_UNLINK] = 1,
+ [GF_FOP_RMDIR] = 1,
+ [GF_FOP_SYMLINK] = 1,
+ [GF_FOP_RENAME] = 2,
+ [GF_FOP_LINK] = 1,
+ [GF_FOP_CREATE] = 1,
+ },
+ {
+ [GF_FOP_MKNOD] = 1,
+ [GF_FOP_MKDIR] = 1,
+ [GF_FOP_UNLINK] = 2,
+ [GF_FOP_RMDIR] = 2,
+ [GF_FOP_SYMLINK] = 1,
+ [GF_FOP_RENAME] = 2,
+ [GF_FOP_LINK] = 1,
+ [GF_FOP_CREATE] = 1,
+ }};
+
+int nr_extra_recs[2][GF_FOP_MAXVALUE] = {{
+ [GF_FOP_MKNOD] = 3,
+ [GF_FOP_MKDIR] = 3,
+ [GF_FOP_UNLINK] = 0,
+ [GF_FOP_RMDIR] = 0,
+ [GF_FOP_SYMLINK] = 0,
+ [GF_FOP_RENAME] = 0,
+ [GF_FOP_LINK] = 0,
+ [GF_FOP_CREATE] = 3,
+ },
+ {
+ [GF_FOP_MKNOD] = 3,
+ [GF_FOP_MKDIR] = 3,
+ [GF_FOP_UNLINK] = 0,
+ [GF_FOP_RMDIR] = 0,
+ [GF_FOP_SYMLINK] = 0,
+ [GF_FOP_RENAME] = 0,
+ [GF_FOP_LINK] = 0,
+ [GF_FOP_CREATE] = 3,
+ }};
+
+static char *
+binary_to_ascii(uuid_t uuid)
+{
+ return uuid_utoa(uuid);
+}
+
+static char *
+conv_noop(char *ptr)
+{
+ return ptr;
+}
+
+#define VERIFY_SEPARATOR(ptr, plen, perr) \
+ { \
+ if (*(ptr + plen) != '\0') { \
+ perr = 1; \
+ break; \
+ } \
+ }
+
+#define MOVER_MOVE(mover, nleft, bytes) \
+ { \
+ mover += bytes; \
+ nleft -= bytes; \
+ }
+
+#define PARSE_GFID(mov, ptr, le, fn, perr) \
+ { \
+ VERIFY_SEPARATOR(mov, le, perr); \
+ ptr = fn(mov); \
+ if (!ptr) { \
+ perr = 1; \
+ break; \
+ } \
+ }
+
+#define FILL_AND_MOVE(pt, buf, of, mo, nl, le) \
+ { \
+ GF_CHANGELOG_FILL_BUFFER(pt, buf, of, strlen(pt)); \
+ MOVER_MOVE(mo, nl, le); \
+ }
+
+#define PARSE_GFID_MOVE(ptr, uuid, mover, nleft, perr) \
+ { \
+ memcpy(uuid, mover, sizeof(uuid_t)); \
+ ptr = binary_to_ascii(uuid); \
+ if (!ptr) { \
+ perr = 1; \
+ break; \
+ } \
+ MOVER_MOVE(mover, nleft, sizeof(uuid_t)); \
+ }
+
+#define LINE_BUFSIZE (3 * PATH_MAX) /* enough buffer for extra chars too */
+
+/**
+ * using mmap() makes parsing easy. fgets() cannot be used here as
+ * the binary gfid could contain a line-feed (0x0A), in that case fgets()
+ * would read an incomplete line and parsing would fail. using POSIX fds
+ * would result is additional code to maintain state in case of partial
+ * reads of data (where multiple entries do not fit extirely in the buffer).
+ *
+ * mmap() gives the flexibility of pointing to an offset in the file
+ * without us worrying about reading it in memory (VM does that for us for
+ * free).
+ */
+
+static int
+gf_changelog_parse_binary(xlator_t *this, gf_changelog_journal_t *jnl,
+ int from_fd, int to_fd, size_t start_offset,
+ struct stat *stbuf, int version_idx)
+
+{
+ int ret = -1;
+ off_t off = 0;
+ off_t nleft = 0;
+ uuid_t uuid = {
+ 0,
+ };
+ char *ptr = NULL;
+ char *bname_start = NULL;
+ char *bname_end = NULL;
+ char *mover = NULL;
+ void *start = NULL;
+ char current_mover = ' ';
+ size_t blen = 0;
+ int parse_err = 0;
+ char *ascii = NULL;
+
+ ascii = GF_CALLOC(LINE_BUFSIZE, sizeof(char), gf_common_mt_char);
+
+ nleft = stbuf->st_size;
+
+ start = mmap(NULL, nleft, PROT_READ, MAP_PRIVATE, from_fd, 0);
+ if (start == MAP_FAILED) {
+ gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_MMAP_FAILED,
+ "mmap() error");
+ goto out;
+ }
+
+ mover = start;
+
+ MOVER_MOVE(mover, nleft, start_offset);
+
+ while (nleft > 0) {
+ off = blen = 0;
+ ptr = bname_start = bname_end = NULL;
+
+ current_mover = *mover;
+
+ switch (current_mover) {
+ case 'D':
+ case 'M':
+ MOVER_MOVE(mover, nleft, 1);
+ PARSE_GFID_MOVE(ptr, uuid, mover, nleft, parse_err);
+
+ break;
+
+ case 'E':
+ MOVER_MOVE(mover, nleft, 1);
+ PARSE_GFID_MOVE(ptr, uuid, mover, nleft, parse_err);
+
+ bname_start = mover;
+ bname_end = strchr(mover, '\n');
+ if (bname_end == NULL) {
+ parse_err = 1;
+ break;
+ }
+
+ blen = bname_end - bname_start;
+ MOVER_MOVE(mover, nleft, blen);
+
+ break;
+
+ default:
+ parse_err = 1;
+ }
+
+ if (parse_err)
+ break;
+
+ GF_CHANGELOG_FILL_BUFFER(&current_mover, ascii, off, 1);
+ GF_CHANGELOG_FILL_BUFFER(" ", ascii, off, 1);
+ GF_CHANGELOG_FILL_BUFFER(ptr, ascii, off, strlen(ptr));
+ if (blen)
+ GF_CHANGELOG_FILL_BUFFER(bname_start, ascii, off, blen);
+ GF_CHANGELOG_FILL_BUFFER("\n", ascii, off, 1);
+
+ if (gf_changelog_write(to_fd, ascii, off) != off) {
+ gf_msg(this->name, GF_LOG_ERROR, errno,
+ CHANGELOG_LIB_MSG_ASCII_ERROR,
+ "processing binary changelog failed due to "
+ " error in writing ascii change");
+ break;
+ }
+
+ MOVER_MOVE(mover, nleft, 1);
+ }
+
+ if ((nleft == 0) && (!parse_err))
+ ret = 0;
+
+ if (munmap(start, stbuf->st_size))
+ gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_MUNMAP_FAILED,
+ "munmap() error");
+out:
+ if (ascii)
+ GF_FREE(ascii);
+ return ret;
+}
+
+/**
+ * ascii decoder:
+ * - separate out one entry from another
+ * - use fop name rather than fop number
+ */
+static int
+gf_changelog_parse_ascii(xlator_t *this, gf_changelog_journal_t *jnl,
+ int from_fd, int to_fd, size_t start_offset,
+ struct stat *stbuf, int version_idx)
+{
+ int ng = 0;
+ int ret = -1;
+ int fop = 0;
+ int len = 0;
+ off_t off = 0;
+ off_t nleft = 0;
+ char *ptr = NULL;
+ char *eptr = NULL;
+ void *start = NULL;
+ char *mover = NULL;
+ int parse_err = 0;
+ char current_mover = ' ';
+ char *ascii = NULL;
+ const char *fopname = NULL;
+
+ ascii = GF_CALLOC(LINE_BUFSIZE, sizeof(char), gf_common_mt_char);
+
+ nleft = stbuf->st_size;
+
+ start = mmap(NULL, nleft, PROT_READ, MAP_PRIVATE, from_fd, 0);
+ if (start == MAP_FAILED) {
+ gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_MMAP_FAILED,
+ "mmap() error");
+ goto out;
+ }
+
+ mover = start;
+
+ MOVER_MOVE(mover, nleft, start_offset);
+
+ while (nleft > 0) {
+ off = 0;
+ current_mover = *mover;
+
+ GF_CHANGELOG_FILL_BUFFER(&current_mover, ascii, off, 1);
+ GF_CHANGELOG_FILL_BUFFER(" ", ascii, off, 1);
+
+ switch (current_mover) {
+ case 'D':
+ MOVER_MOVE(mover, nleft, 1);
+
+ /* target gfid */
+ PARSE_GFID(mover, ptr, UUID_CANONICAL_FORM_LEN, conv_noop,
+ parse_err);
+ FILL_AND_MOVE(ptr, ascii, off, mover, nleft,
+ UUID_CANONICAL_FORM_LEN);
+ break;
+ case 'M':
+ MOVER_MOVE(mover, nleft, 1);
+
+ /* target gfid */
+ PARSE_GFID(mover, ptr, UUID_CANONICAL_FORM_LEN, conv_noop,
+ parse_err);
+ FILL_AND_MOVE(ptr, ascii, off, mover, nleft,
+ UUID_CANONICAL_FORM_LEN);
+ FILL_AND_MOVE(" ", ascii, off, mover, nleft, 1);
+
+ /* fop */
+ len = strlen(mover);
+ VERIFY_SEPARATOR(mover, len, parse_err);
+
+ fop = atoi(mover);
+ fopname = gf_fop_list[fop];
+ if (fopname == NULL) {
+ parse_err = 1;
+ break;
+ }
+
+ MOVER_MOVE(mover, nleft, len);
+
+ len = strlen(fopname);
+ GF_CHANGELOG_FILL_BUFFER(fopname, ascii, off, len);
+
+ break;
+
+ case 'E':
+ MOVER_MOVE(mover, nleft, 1);
+
+ /* target gfid */
+ PARSE_GFID(mover, ptr, UUID_CANONICAL_FORM_LEN, conv_noop,
+ parse_err);
+ FILL_AND_MOVE(ptr, ascii, off, mover, nleft,
+ UUID_CANONICAL_FORM_LEN);
+ FILL_AND_MOVE(" ", ascii, off, mover, nleft, 1);
+
+ /* fop */
+ len = strlen(mover);
+ VERIFY_SEPARATOR(mover, len, parse_err);
+
+ fop = atoi(mover);
+ fopname = gf_fop_list[fop];
+ if (fopname == NULL) {
+ parse_err = 1;
+ break;
+ }
+
+ MOVER_MOVE(mover, nleft, len);
+
+ len = strlen(fopname);
+ GF_CHANGELOG_FILL_BUFFER(fopname, ascii, off, len);
+
+ ng = nr_extra_recs[version_idx][fop];
+ for (; ng > 0; ng--) {
+ MOVER_MOVE(mover, nleft, 1);
+ len = strlen(mover);
+ VERIFY_SEPARATOR(mover, len, parse_err);
+
+ GF_CHANGELOG_FILL_BUFFER(" ", ascii, off, 1);
+ FILL_AND_MOVE(mover, ascii, off, mover, nleft, len);
+ }
+
+ /* pargfid + bname */
+ ng = nr_gfids[version_idx][fop];
+ while (ng-- > 0) {
+ MOVER_MOVE(mover, nleft, 1);
+ len = strlen(mover);
+ if (!len) {
+ MOVER_MOVE(mover, nleft, 1);
+ continue;
+ }
+
+ GF_CHANGELOG_FILL_BUFFER(" ", ascii, off, 1);
+
+ PARSE_GFID(mover, ptr, len, conv_noop, parse_err);
+ eptr = calloc(3, strlen(ptr));
+ if (!eptr) {
+ parse_err = 1;
+ break;
+ }
+
+ gf_rfc3986_encode_space_newline((unsigned char *)ptr, eptr,
+ jnl->rfc3986_space_newline);
+ FILL_AND_MOVE(eptr, ascii, off, mover, nleft, len);
+ free(eptr);
+ }
+
+ break;
+ default:
+ parse_err = 1;
+ }
+
+ if (parse_err)
+ break;
+
+ GF_CHANGELOG_FILL_BUFFER("\n", ascii, off, 1);
+
+ if (gf_changelog_write(to_fd, ascii, off) != off) {
+ gf_msg(this->name, GF_LOG_ERROR, errno,
+ CHANGELOG_LIB_MSG_ASCII_ERROR,
+ "processing ascii changelog failed due to "
+ " error in writing change");
+ break;
+ }
+
+ MOVER_MOVE(mover, nleft, 1);
+ }
+
+ if ((nleft == 0) && (!parse_err))
+ ret = 0;
+
+ if (munmap(start, stbuf->st_size))
+ gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_MUNMAP_FAILED,
+ "munmap() error");
+
+out:
+ if (ascii)
+ GF_FREE(ascii);
+
+ return ret;
+}
+
+static int
+gf_changelog_decode(xlator_t *this, gf_changelog_journal_t *jnl, int from_fd,
+ int to_fd, struct stat *stbuf, int *zerob)
+{
+ int ret = -1;
+ int encoding = -1;
+ int major_version = -1;
+ int minor_version = -1;
+ int version_idx = -1;
+ size_t elen = 0;
+ char buffer[1024] = {
+ 0,
+ };
+
+ CHANGELOG_GET_HEADER_INFO(from_fd, buffer, sizeof(buffer), encoding,
+ major_version, minor_version, elen);
+ if (encoding == -1) /* unknown encoding */
+ goto out;
+
+ if (major_version == -1) /* unknown major version */
+ goto out;
+
+ if (minor_version == -1) /* unknown minor version */
+ goto out;
+
+ if (!CHANGELOG_VALID_ENCODING(encoding))
+ goto out;
+
+ if (elen == stbuf->st_size) {
+ *zerob = 1;
+ goto out;
+ }
+
+ if (major_version == 1 && minor_version == 1) {
+ version_idx = VERSION_1_1;
+ } else if (major_version == 1 && minor_version == 2) {
+ version_idx = VERSION_1_2;
+ }
+
+ if (version_idx == -1) /* unknown version number */
+ goto out;
+
+ /**
+ * start processing after the header
+ */
+ if (sys_lseek(from_fd, elen, SEEK_SET) < 0) {
+ goto out;
+ }
+ switch (encoding) {
+ case CHANGELOG_ENCODE_BINARY:
+ /**
+ * this ideally should have been a part of changelog-encoders.c
+ * (ie. part of the changelog translator).
+ */
+ ret = gf_changelog_parse_binary(this, jnl, from_fd, to_fd, elen,
+ stbuf, version_idx);
+ break;
+
+ case CHANGELOG_ENCODE_ASCII:
+ ret = gf_changelog_parse_ascii(this, jnl, from_fd, to_fd, elen,
+ stbuf, version_idx);
+ break;
+ }
+
+out:
+ return ret;
+}
+
+int
+gf_changelog_publish(xlator_t *this, gf_changelog_journal_t *jnl,
+ char *from_path)
+{
+ int ret = 0;
+ char dest[PATH_MAX] = {
+ 0,
+ };
+ char to_path[PATH_MAX] = {
+ 0,
+ };
+ struct stat stbuf = {
+ 0,
+ };
+
+ if (snprintf(to_path, PATH_MAX, "%s%s", jnl->jnl_current_dir,
+ basename(from_path)) >= PATH_MAX)
+ return -1;
+
+ /* handle zerob file that won't exist in current */
+ ret = sys_stat(to_path, &stbuf);
+ if (ret) {
+ if (errno == ENOENT)
+ ret = 0;
+ goto out;
+ }
+
+ if (snprintf(dest, PATH_MAX, "%s%s", jnl->jnl_processing_dir,
+ basename(from_path)) >= PATH_MAX)
+ return -1;
+
+ ret = sys_rename(to_path, dest);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, errno,
+ CHANGELOG_LIB_MSG_RENAME_FAILED, "from=%s", to_path, "to=%s",
+ dest, NULL);
+ }
+
+out:
+ return ret;
+}
+
+int
+gf_changelog_consume(xlator_t *this, gf_changelog_journal_t *jnl,
+ char *from_path, gf_boolean_t no_publish)
+{
+ int ret = -1;
+ int fd1 = 0;
+ int fd2 = 0;
+ int zerob = 0;
+ struct stat stbuf = {
+ 0,
+ };
+ char dest[PATH_MAX] = {
+ 0,
+ };
+ char to_path[PATH_MAX] = {
+ 0,
+ };
+
+ if (snprintf(to_path, PATH_MAX, "%s%s", jnl->jnl_current_dir,
+ basename(from_path)) >= PATH_MAX)
+ goto out;
+ if (snprintf(dest, PATH_MAX, "%s%s", jnl->jnl_processing_dir,
+ basename(from_path)) >= PATH_MAX)
+ goto out;
+
+ ret = sys_stat(from_path, &stbuf);
+ if (ret || !S_ISREG(stbuf.st_mode)) {
+ ret = -1;
+ gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_STAT_FAILED,
+ "path=%s", from_path, NULL);
+ goto out;
+ }
+
+ fd1 = open(from_path, O_RDONLY);
+ if (fd1 < 0) {
+ gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_OPEN_FAILED,
+ "path=%s", from_path, NULL);
+ goto out;
+ }
+
+ fd2 = open(to_path, O_CREAT | O_TRUNC | O_RDWR,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+ if (fd2 < 0) {
+ gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_OPEN_FAILED,
+ "path=%s", to_path, NULL);
+ goto close_fd;
+ } else {
+ ret = gf_changelog_decode(this, jnl, fd1, fd2, &stbuf, &zerob);
+
+ sys_close(fd2);
+
+ if (!ret) {
+ /* move it to processing on a successful
+ decode */
+ if (no_publish == _gf_true)
+ goto close_fd;
+ ret = sys_rename(to_path, dest);
+ if (ret)
+ gf_smsg(this->name, GF_LOG_ERROR, errno,
+ CHANGELOG_LIB_MSG_RENAME_FAILED, "from=%s", to_path,
+ "to=%s", dest, NULL);
+ }
+
+ /* remove it from .current if it's an empty file */
+ if (zerob) {
+ /* zerob changelogs must be unlinked */
+ ret = sys_unlink(to_path);
+ if (ret)
+ gf_smsg(this->name, GF_LOG_ERROR, errno,
+ CHANGELOG_LIB_MSG_UNLINK_FAILED, "name=empty changelog",
+ "path=%s", to_path, NULL);
+ }
+ }
+
+close_fd:
+ sys_close(fd1);
+
+out:
+ return ret;
+}
+
+void *
+gf_changelog_process(void *data)
+{
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_entry_t *entry = NULL;
+ gf_changelog_processor_t *jnl_proc = NULL;
+
+ jnl = data;
+ jnl_proc = jnl->jnl_proc;
+ THIS = jnl->this;
+ this = jnl->this;
+
+ while (1) {
+ pthread_mutex_lock(&jnl_proc->lock);
+ {
+ while (list_empty(&jnl_proc->entries)) {
+ jnl_proc->waiting = _gf_true;
+ pthread_cond_wait(&jnl_proc->cond, &jnl_proc->lock);
+ }
+
+ entry = list_first_entry(&jnl_proc->entries, gf_changelog_entry_t,
+ list);
+ if (entry)
+ list_del(&entry->list);
+
+ jnl_proc->waiting = _gf_false;
+ }
+ pthread_mutex_unlock(&jnl_proc->lock);
+
+ if (entry) {
+ (void)gf_changelog_consume(this, jnl, entry->path, _gf_false);
+ GF_FREE(entry);
+ }
+ }
+
+ return NULL;
+}
+
+void
+gf_changelog_queue_journal(gf_changelog_processor_t *jnl_proc,
+ changelog_event_t *event)
+{
+ size_t len = 0;
+ gf_changelog_entry_t *entry = NULL;
+
+ entry = GF_CALLOC(1, sizeof(gf_changelog_entry_t),
+ gf_changelog_mt_libgfchangelog_entry_t);
+ if (!entry)
+ return;
+ INIT_LIST_HEAD(&entry->list);
+
+ len = strlen(event->u.journal.path);
+ (void)memcpy(entry->path, event->u.journal.path, len + 1);
+ entry->path[len] = '\0';
+
+ pthread_mutex_lock(&jnl_proc->lock);
+ {
+ list_add_tail(&entry->list, &jnl_proc->entries);
+ if (jnl_proc->waiting)
+ pthread_cond_signal(&jnl_proc->cond);
+ }
+ pthread_mutex_unlock(&jnl_proc->lock);
+
+ return;
+}
+
+void
+gf_changelog_handle_journal(void *xl, char *brick, void *cbkdata,
+ changelog_event_t *event)
+{
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_processor_t *jnl_proc = NULL;
+
+ jnl = cbkdata;
+ jnl_proc = jnl->jnl_proc;
+
+ gf_changelog_queue_journal(jnl_proc, event);
+}
+
+void
+gf_changelog_journal_disconnect(void *xl, char *brick, void *data)
+{
+ gf_changelog_journal_t *jnl = NULL;
+
+ jnl = data;
+
+ pthread_spin_lock(&jnl->lock);
+ {
+ JNL_SET_API_STATE(jnl, JNL_API_DISCONNECTED);
+ };
+ pthread_spin_unlock(&jnl->lock);
+}
+
+void
+gf_changelog_journal_connect(void *xl, char *brick, void *data)
+{
+ gf_changelog_journal_t *jnl = NULL;
+
+ jnl = data;
+
+ pthread_spin_lock(&jnl->lock);
+ {
+ JNL_SET_API_STATE(jnl, JNL_API_CONNECTED);
+ };
+ pthread_spin_unlock(&jnl->lock);
+
+ return;
+}
+
+void
+gf_changelog_cleanup_processor(gf_changelog_journal_t *jnl)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_changelog_processor_t *jnl_proc = NULL;
+
+ this = THIS;
+ if (!this || !jnl || !jnl->jnl_proc)
+ goto error_return;
+
+ jnl_proc = jnl->jnl_proc;
+
+ ret = gf_thread_cleanup(this, jnl_proc->processor);
+ if (ret != 0) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_CLEANUP_ERROR,
+ "failed to cleanup processor thread");
+ goto error_return;
+ }
+
+ (void)pthread_mutex_destroy(&jnl_proc->lock);
+ (void)pthread_cond_destroy(&jnl_proc->cond);
+
+ GF_FREE(jnl_proc);
+
+error_return:
+ return;
+}
+
+int
+gf_changelog_init_processor(gf_changelog_journal_t *jnl)
+{
+ int ret = -1;
+ gf_changelog_processor_t *jnl_proc = NULL;
+
+ jnl_proc = GF_CALLOC(1, sizeof(gf_changelog_processor_t),
+ gf_changelog_mt_libgfchangelog_t);
+ if (!jnl_proc)
+ goto error_return;
+
+ ret = pthread_mutex_init(&jnl_proc->lock, NULL);
+ if (ret != 0)
+ goto free_jnl_proc;
+ ret = pthread_cond_init(&jnl_proc->cond, NULL);
+ if (ret != 0)
+ goto cleanup_mutex;
+
+ INIT_LIST_HEAD(&jnl_proc->entries);
+ jnl_proc->waiting = _gf_false;
+ jnl->jnl_proc = jnl_proc;
+
+ ret = gf_thread_create(&jnl_proc->processor, NULL, gf_changelog_process,
+ jnl, "clogproc");
+ if (ret != 0) {
+ jnl->jnl_proc = NULL;
+ goto cleanup_cond;
+ }
+
+ return 0;
+
+cleanup_cond:
+ (void)pthread_cond_destroy(&jnl_proc->cond);
+cleanup_mutex:
+ (void)pthread_mutex_destroy(&jnl_proc->lock);
+free_jnl_proc:
+ GF_FREE(jnl_proc);
+error_return:
+ return -1;
+}
+
+static void
+gf_changelog_cleanup_fds(gf_changelog_journal_t *jnl)
+{
+ /* tracker fd */
+ if (jnl->jnl_fd != -1)
+ sys_close(jnl->jnl_fd);
+ /* processing dir */
+ if (jnl->jnl_dir)
+ sys_closedir(jnl->jnl_dir);
+
+ if (jnl->jnl_working_dir)
+ free(jnl->jnl_working_dir); /* allocated by realpath */
+}
+
+static int
+gf_changelog_open_dirs(xlator_t *this, gf_changelog_journal_t *jnl)
+{
+ int ret = -1;
+ DIR *dir = NULL;
+ int tracker_fd = 0;
+ char tracker_path[PATH_MAX] = {
+ 0,
+ };
+
+ /* .current */
+ (void)snprintf(jnl->jnl_current_dir, PATH_MAX,
+ "%s/" GF_CHANGELOG_CURRENT_DIR "/", jnl->jnl_working_dir);
+ ret = recursive_rmdir(jnl->jnl_current_dir);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, errno,
+ CHANGELOG_LIB_MSG_FAILED_TO_RMDIR, "path=%s",
+ jnl->jnl_current_dir, NULL);
+ goto out;
+ }
+ ret = mkdir_p(jnl->jnl_current_dir, 0600, _gf_false);
+ if (ret)
+ goto out;
+
+ /* .processed */
+ (void)snprintf(jnl->jnl_processed_dir, PATH_MAX,
+ "%s/" GF_CHANGELOG_PROCESSED_DIR "/", jnl->jnl_working_dir);
+ ret = mkdir_p(jnl->jnl_processed_dir, 0600, _gf_false);
+ if (ret)
+ goto out;
+
+ /* .processing */
+ (void)snprintf(jnl->jnl_processing_dir, PATH_MAX,
+ "%s/" GF_CHANGELOG_PROCESSING_DIR "/", jnl->jnl_working_dir);
+ ret = recursive_rmdir(jnl->jnl_processing_dir);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, errno,
+ CHANGELOG_LIB_MSG_FAILED_TO_RMDIR, "path=%s",
+ jnl->jnl_processing_dir, NULL);
+ goto out;
+ }
+
+ ret = mkdir_p(jnl->jnl_processing_dir, 0600, _gf_false);
+ if (ret)
+ goto out;
+
+ dir = sys_opendir(jnl->jnl_processing_dir);
+ if (!dir) {
+ gf_msg("", GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_OPENDIR_ERROR,
+ "opendir() error");
+ goto out;
+ }
+
+ jnl->jnl_dir = dir;
+
+ (void)snprintf(tracker_path, PATH_MAX, "%s/" GF_CHANGELOG_TRACKER,
+ jnl->jnl_working_dir);
+
+ tracker_fd = open(tracker_path, O_CREAT | O_APPEND | O_RDWR,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+ if (tracker_fd < 0) {
+ sys_closedir(jnl->jnl_dir);
+ ret = -1;
+ goto out;
+ }
+
+ jnl->jnl_fd = tracker_fd;
+ ret = 0;
+out:
+ return ret;
+}
+
+int
+gf_changelog_init_history(xlator_t *this, gf_changelog_journal_t *jnl,
+ char *brick_path)
+{
+ int i = 0;
+ int ret = 0;
+ char hist_scratch_dir[PATH_MAX] = {
+ 0,
+ };
+
+ jnl->hist_jnl = GF_CALLOC(1, sizeof(*jnl),
+ gf_changelog_mt_libgfchangelog_t);
+ if (!jnl->hist_jnl)
+ goto error_return;
+
+ jnl->hist_jnl->jnl_dir = NULL;
+ jnl->hist_jnl->jnl_fd = -1;
+
+ (void)snprintf(hist_scratch_dir, PATH_MAX,
+ "%s/" GF_CHANGELOG_HISTORY_DIR "/", jnl->jnl_working_dir);
+
+ ret = mkdir_p(hist_scratch_dir, 0600, _gf_false);
+ if (ret)
+ goto dealloc_hist;
+
+ jnl->hist_jnl->jnl_working_dir = realpath(hist_scratch_dir, NULL);
+ if (!jnl->hist_jnl->jnl_working_dir)
+ goto dealloc_hist;
+
+ ret = gf_changelog_open_dirs(this, jnl->hist_jnl);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_OPENDIR_ERROR,
+ "could not create entries in history scratch dir");
+ goto dealloc_hist;
+ }
+
+ if (snprintf(jnl->hist_jnl->jnl_brickpath, PATH_MAX, "%s", brick_path) >=
+ PATH_MAX)
+ goto dealloc_hist;
+
+ for (i = 0; i < 256; i++) {
+ jnl->hist_jnl->rfc3986_space_newline[i] = (i == ' ' || i == '\n' ||
+ i == '%')
+ ? 0
+ : i;
+ }
+
+ return 0;
+
+dealloc_hist:
+ GF_FREE(jnl->hist_jnl);
+ jnl->hist_jnl = NULL;
+error_return:
+ return -1;
+}
+
+void
+gf_changelog_journal_fini(void *xl, char *brick, void *data)
+{
+ gf_changelog_journal_t *jnl = NULL;
+
+ jnl = data;
+
+ gf_changelog_cleanup_processor(jnl);
+
+ gf_changelog_cleanup_fds(jnl);
+ if (jnl->hist_jnl)
+ gf_changelog_cleanup_fds(jnl->hist_jnl);
+
+ GF_FREE(jnl);
+}
+
+void *
+gf_changelog_journal_init(void *xl, struct gf_brick_spec *brick)
+{
+ int i = 0;
+ int ret = 0;
+ xlator_t *this = NULL;
+ struct stat buf = {
+ 0,
+ };
+ char *scratch_dir = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+
+ this = xl;
+ scratch_dir = (char *)brick->ptr;
+
+ jnl = GF_CALLOC(1, sizeof(gf_changelog_journal_t),
+ gf_changelog_mt_libgfchangelog_t);
+ if (!jnl)
+ goto error_return;
+
+ if (snprintf(jnl->jnl_brickpath, PATH_MAX, "%s", brick->brick_path) >=
+ PATH_MAX)
+ goto dealloc_private;
+
+ if (sys_stat(scratch_dir, &buf) && errno == ENOENT) {
+ ret = mkdir_p(scratch_dir, 0600, _gf_true);
+ if (ret)
+ goto dealloc_private;
+ }
+
+ jnl->jnl_working_dir = realpath(scratch_dir, NULL);
+ if (!jnl->jnl_working_dir)
+ goto dealloc_private;
+
+ ret = gf_changelog_open_dirs(this, jnl);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_OPENDIR_ERROR,
+ "could not create entries in scratch dir");
+ goto dealloc_private;
+ }
+
+ /* RFC 3986 {de,en}coding */
+ for (i = 0; i < 256; i++) {
+ jnl->rfc3986_space_newline[i] = (i == ' ' || i == '\n' || i == '%') ? 0
+ : i;
+ }
+
+ ret = gf_changelog_init_history(this, jnl, brick->brick_path);
+ if (ret)
+ goto cleanup_fds;
+
+ /* initialize journal processor */
+ jnl->this = this;
+ ret = gf_changelog_init_processor(jnl);
+ if (ret)
+ goto cleanup_fds;
+
+ JNL_SET_API_STATE(jnl, JNL_API_CONN_INPROGESS);
+ ret = pthread_spin_init(&jnl->lock, 0);
+ if (ret != 0)
+ goto cleanup_processor;
+ return jnl;
+
+cleanup_processor:
+ gf_changelog_cleanup_processor(jnl);
+cleanup_fds:
+ gf_changelog_cleanup_fds(jnl);
+ if (jnl->hist_jnl)
+ gf_changelog_cleanup_fds(jnl->hist_jnl);
+dealloc_private:
+ GF_FREE(jnl);
+error_return:
+ return NULL;
+}
diff --git a/xlators/features/changelog/lib/src/gf-changelog-journal.h b/xlators/features/changelog/lib/src/gf-changelog-journal.h
new file mode 100644
index 00000000000..ba5b9bf827e
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-journal.h
@@ -0,0 +1,116 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __GF_CHANGELOG_JOURNAL_H
+#define __GF_CHANGELOG_JOURNAL_H
+
+#include <unistd.h>
+#include <pthread.h>
+
+#include "changelog.h"
+
+enum api_conn {
+ JNL_API_CONNECTED,
+ JNL_API_CONN_INPROGESS,
+ JNL_API_DISCONNECTED,
+};
+
+typedef struct gf_changelog_entry {
+ char path[PATH_MAX];
+
+ struct list_head list;
+} gf_changelog_entry_t;
+
+typedef struct gf_changelog_processor {
+ pthread_mutex_t lock; /* protects ->entries */
+ pthread_cond_t cond; /* waiter during empty list */
+ gf_boolean_t waiting;
+
+ pthread_t processor; /* thread-id of journal processing thread */
+
+ struct list_head entries;
+} gf_changelog_processor_t;
+
+typedef struct gf_changelog_journal {
+ DIR *jnl_dir; /* 'processing' directory stream */
+
+ int jnl_fd; /* fd to the tracker file */
+
+ char jnl_brickpath[PATH_MAX]; /* brick path for this end-point */
+
+ gf_changelog_processor_t *jnl_proc;
+
+ char *jnl_working_dir; /* scratch directory */
+
+ char jnl_current_dir[PATH_MAX];
+ char jnl_processed_dir[PATH_MAX];
+ char jnl_processing_dir[PATH_MAX];
+
+ char rfc3986_space_newline[256]; /* RFC 3986 string encoding */
+
+ struct gf_changelog_journal *hist_jnl;
+ int hist_done; /* holds 0 done scanning,
+ 1 keep scanning and -1 error */
+
+ pthread_spinlock_t lock;
+ int connected;
+ xlator_t *this;
+} gf_changelog_journal_t;
+
+#define JNL_SET_API_STATE(jnl, state) (jnl->connected = state)
+#define JNL_IS_API_DISCONNECTED(jnl) (jnl->connected == JNL_API_DISCONNECTED)
+
+/* History API */
+typedef struct gf_changelog_history_data {
+ int len;
+
+ int htime_fd;
+
+ /* parallelism count */
+ int n_parallel;
+
+ /* history from, to indexes */
+ unsigned long from;
+ unsigned long to;
+ xlator_t *this;
+} gf_changelog_history_data_t;
+
+typedef struct gf_changelog_consume_data {
+ /** set of inputs */
+
+ /* fd to read from */
+ int fd;
+
+ /* from @offset */
+ off_t offset;
+
+ xlator_t *this;
+
+ gf_changelog_journal_t *jnl;
+
+ /** set of outputs */
+
+ /* return value */
+ int retval;
+
+ /* journal processed */
+ char changelog[PATH_MAX];
+} gf_changelog_consume_data_t;
+
+/* event handler */
+CALLBACK gf_changelog_handle_journal;
+
+/* init, connect & disconnect handler */
+INIT gf_changelog_journal_init;
+FINI gf_changelog_journal_fini;
+CONNECT gf_changelog_journal_connect;
+DISCONNECT gf_changelog_journal_disconnect;
+
+#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog-process.c b/xlators/features/changelog/lib/src/gf-changelog-process.c
deleted file mode 100644
index 3ea2700c62b..00000000000
--- a/xlators/features/changelog/lib/src/gf-changelog-process.c
+++ /dev/null
@@ -1,618 +0,0 @@
-/*
- Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
- This file is part of GlusterFS.
-
- This file is licensed to you under your choice of the GNU Lesser
- General Public License, version 3 or any later version (LGPLv3 or
- later), or the GNU General Public License, version 2 (GPLv2), in all
- cases as published by the Free Software Foundation.
-*/
-
-#include <unistd.h>
-#include <pthread.h>
-
-#include "uuid.h"
-#include "globals.h"
-#include "glusterfs.h"
-
-#include "gf-changelog-helpers.h"
-
-/* from the changelog translator */
-#include "changelog-misc.h"
-
-extern int byebye;
-
-/**
- * number of gfid records after fop number
- */
-int nr_gfids[] = {
- [GF_FOP_MKNOD] = 1,
- [GF_FOP_MKDIR] = 1,
- [GF_FOP_UNLINK] = 1,
- [GF_FOP_RMDIR] = 1,
- [GF_FOP_SYMLINK] = 1,
- [GF_FOP_RENAME] = 2,
- [GF_FOP_LINK] = 1,
- [GF_FOP_CREATE] = 1,
-};
-
-int nr_extra_recs[] = {
- [GF_FOP_MKNOD] = 3,
- [GF_FOP_MKDIR] = 3,
- [GF_FOP_UNLINK] = 0,
- [GF_FOP_RMDIR] = 0,
- [GF_FOP_SYMLINK] = 0,
- [GF_FOP_RENAME] = 0,
- [GF_FOP_LINK] = 0,
- [GF_FOP_CREATE] = 3,
-};
-
-static char *
-binary_to_ascii (uuid_t uuid)
-{
- return uuid_utoa (uuid);
-}
-
-static char *
-conv_noop (char *ptr) { return ptr; }
-
-#define VERIFY_SEPARATOR(ptr, plen, perr) \
- { \
- if (*(ptr + plen) != '\0') { \
- perr = 1; \
- break; \
- } \
- }
-
-#define MOVER_MOVE(mover, nleft, bytes) \
- { \
- mover += bytes; \
- nleft -= bytes; \
- } \
-
-#define PARSE_GFID(mov, ptr, le, fn, perr) \
- { \
- VERIFY_SEPARATOR (mov, le, perr); \
- ptr = fn (mov); \
- if (!ptr) { \
- perr = 1; \
- break; \
- } \
- }
-
-#define FILL_AND_MOVE(pt, buf, of, mo, nl, le) \
- { \
- GF_CHANGELOG_FILL_BUFFER (pt, buf, of, strlen (pt)); \
- MOVER_MOVE (mo, nl, le); \
- }
-
-
-#define PARSE_GFID_MOVE(ptr, uuid, mover, nleft, perr) \
- { \
- memcpy (uuid, mover, sizeof (uuid_t)); \
- ptr = binary_to_ascii (uuid); \
- if (!ptr) { \
- perr = 1; \
- break; \
- } \
- MOVER_MOVE (mover, nleft, sizeof (uuid_t)); \
- } \
-
-#define LINE_BUFSIZE 3*PATH_MAX /* enough buffer for extra chars too */
-
-/**
- * using mmap() makes parsing easy. fgets() cannot be used here as
- * the binary gfid could contain a line-feed (0x0A), in that case fgets()
- * would read an incomplete line and parsing would fail. using POSIX fds
- * would result is additional code to maintain state in case of partial
- * reads of data (where multiple entries do not fit extirely in the buffer).
- *
- * mmap() gives the flexibility of pointing to an offset in the file
- * without us worrying about reading it in memory (VM does that for us for
- * free).
- */
-
-static int
-gf_changelog_parse_binary (xlator_t *this,
- gf_changelog_t *gfc, int from_fd, int to_fd,
- size_t start_offset, struct stat *stbuf)
-
-{
- int ret = -1;
- off_t off = 0;
- off_t nleft = 0;
- uuid_t uuid = {0,};
- char *ptr = NULL;
- char *bname_start = NULL;
- char *bname_end = NULL;
- char *mover = NULL;
- char *start = NULL;
- char current_mover = ' ';
- size_t blen = 0;
- int parse_err = 0;
- char ascii[LINE_BUFSIZE] = {0,};
-
- nleft = stbuf->st_size;
-
- start = (char *) mmap (NULL, nleft,
- PROT_READ, MAP_PRIVATE, from_fd, 0);
- if (!start) {
- gf_log (this->name, GF_LOG_ERROR,
- "mmap() error (reason: %s)", strerror (errno));
- goto out;
- }
-
- mover = start;
-
- MOVER_MOVE (mover, nleft, start_offset);
-
- while (nleft > 0) {
-
- off = blen = 0;
- ptr = bname_start = bname_end = NULL;
-
- current_mover = *mover;
-
- switch (current_mover) {
- case 'D':
- case 'M':
- MOVER_MOVE (mover, nleft, 1);
- PARSE_GFID_MOVE (ptr, uuid, mover, nleft, parse_err);
-
- break;
-
- case 'E':
- MOVER_MOVE (mover, nleft, 1);
- PARSE_GFID_MOVE (ptr, uuid, mover, nleft, parse_err);
-
- bname_start = mover;
- if ( (bname_end = strchr (mover, '\n')) == NULL ) {
- parse_err = 1;
- break;
- }
-
- blen = bname_end - bname_start;
- MOVER_MOVE (mover, nleft, blen);
-
- break;
-
- default:
- parse_err = 1;
- }
-
- if (parse_err)
- break;
-
- GF_CHANGELOG_FILL_BUFFER (&current_mover, ascii, off, 1);
- GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1);
- GF_CHANGELOG_FILL_BUFFER (ptr, ascii, off, strlen (ptr));
- if (blen)
- GF_CHANGELOG_FILL_BUFFER (bname_start,
- ascii, off, blen);
- GF_CHANGELOG_FILL_BUFFER ("\n", ascii, off, 1);
-
- if (gf_changelog_write (to_fd, ascii, off) != off) {
- gf_log (this->name, GF_LOG_ERROR,
- "processing binary changelog failed due to "
- " error in writing ascii change (reason: %s)",
- strerror (errno));
- break;
- }
-
- MOVER_MOVE (mover, nleft, 1);
- }
-
- if ( (nleft == 0) && (!parse_err))
- ret = 0;
-
- if (munmap (start, stbuf->st_size))
- gf_log (this->name, GF_LOG_ERROR,
- "munmap() error (reason: %s)", strerror (errno));
- out:
- return ret;
-}
-
-/**
- * ascii decoder:
- * - separate out one entry from another
- * - use fop name rather than fop number
- */
-static int
-gf_changelog_parse_ascii (xlator_t *this,
- gf_changelog_t *gfc, int from_fd, int to_fd,
- size_t start_offset, struct stat *stbuf)
-{
- int ng = 0;
- int ret = -1;
- int fop = 0;
- int len = 0;
- off_t off = 0;
- off_t nleft = 0;
- char *ptr = NULL;
- char *eptr = NULL;
- char *start = NULL;
- char *mover = NULL;
- int parse_err = 0;
- char current_mover = ' ';
- char ascii[LINE_BUFSIZE] = {0,};
- const char *fopname = NULL;
-
- nleft = stbuf->st_size;
-
- start = (char *) mmap (NULL, nleft,
- PROT_READ, MAP_PRIVATE, from_fd, 0);
- if (!start) {
- gf_log (this->name, GF_LOG_ERROR,
- "mmap() error (reason: %s)", strerror (errno));
- goto out;
- }
-
- mover = start;
-
- MOVER_MOVE (mover, nleft, start_offset);
-
- while (nleft > 0) {
- off = 0;
- current_mover = *mover;
-
- GF_CHANGELOG_FILL_BUFFER (&current_mover, ascii, off, 1);
- GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1);
-
- switch (current_mover) {
- case 'D':
- MOVER_MOVE (mover, nleft, 1);
-
- /* target gfid */
- PARSE_GFID (mover, ptr, UUID_CANONICAL_FORM_LEN,
- conv_noop, parse_err);
- FILL_AND_MOVE(ptr, ascii, off,
- mover, nleft, UUID_CANONICAL_FORM_LEN);
- break;
- case 'M':
- MOVER_MOVE (mover, nleft, 1);
-
- /* target gfid */
- PARSE_GFID (mover, ptr, UUID_CANONICAL_FORM_LEN,
- conv_noop, parse_err);
- FILL_AND_MOVE (ptr, ascii, off,
- mover, nleft, UUID_CANONICAL_FORM_LEN);
- FILL_AND_MOVE (" ", ascii, off, mover, nleft, 1);
-
- /* fop */
- len = strlen (mover);
- VERIFY_SEPARATOR (mover, len, parse_err);
-
- fop = atoi (mover);
- if ( (fopname = gf_fop_list[fop]) == NULL) {
- parse_err = 1;
- break;
- }
-
- MOVER_MOVE (mover, nleft, len);
-
- len = strlen (fopname);
- GF_CHANGELOG_FILL_BUFFER (fopname, ascii, off, len);
-
- break;
-
- case 'E':
- MOVER_MOVE (mover, nleft, 1);
-
- /* target gfid */
- PARSE_GFID (mover, ptr, UUID_CANONICAL_FORM_LEN,
- conv_noop, parse_err);
- FILL_AND_MOVE (ptr, ascii, off,
- mover, nleft, UUID_CANONICAL_FORM_LEN);
- FILL_AND_MOVE (" ", ascii, off,
- mover, nleft, 1);
-
- /* fop */
- len = strlen (mover);
- VERIFY_SEPARATOR (mover, len, parse_err);
-
- fop = atoi (mover);
- if ( (fopname = gf_fop_list[fop]) == NULL) {
- parse_err = 1;
- break;
- }
-
- MOVER_MOVE (mover, nleft, len);
-
- len = strlen (fopname);
- GF_CHANGELOG_FILL_BUFFER (fopname, ascii, off, len);
-
- ng = nr_extra_recs[fop];
- for (;ng > 0; ng--) {
- MOVER_MOVE (mover, nleft, 1);
- len = strlen (mover);
- VERIFY_SEPARATOR (mover, len, parse_err);
-
- GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1);
- FILL_AND_MOVE (mover, ascii,
- off, mover, nleft, len);
- }
-
- /* pargfid + bname */
- ng = nr_gfids[fop];
- while (ng-- > 0) {
- MOVER_MOVE (mover, nleft, 1);
- len = strlen (mover);
- GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1);
-
- PARSE_GFID (mover, ptr, len,
- conv_noop, parse_err);
- eptr = calloc (3, strlen (ptr));
- if (!eptr) {
- parse_err = 1;
- break;
- }
-
- gf_rfc3986_encode ((unsigned char *) ptr,
- eptr, gfc->rfc3986);
- FILL_AND_MOVE (eptr, ascii, off,
- mover, nleft, len);
- free (eptr);
- }
-
- break;
- default:
- parse_err = 1;
- }
-
- if (parse_err)
- break;
-
- GF_CHANGELOG_FILL_BUFFER ("\n", ascii, off, 1);
-
- if (gf_changelog_write (to_fd, ascii, off) != off) {
- gf_log (this->name, GF_LOG_ERROR,
- "processing ascii changelog failed due to "
- " error in writing change (reason: %s)",
- strerror (errno));
- break;
- }
-
- MOVER_MOVE (mover, nleft, 1);
-
- }
-
- if ( (nleft == 0) && (!parse_err))
- ret = 0;
-
- if (munmap (start, stbuf->st_size))
- gf_log (this->name, GF_LOG_ERROR,
- "munmap() error (reason: %s)", strerror (errno));
-
- out:
- return ret;
-}
-
-#define COPY_BUFSIZE 8192
-static int
-gf_changelog_copy (xlator_t *this, int from_fd, int to_fd)
-{
- ssize_t size = 0;
- char buffer[COPY_BUFSIZE+1] = {0,};
-
- while (1) {
- size = read (from_fd, buffer, COPY_BUFSIZE);
- if (size <= 0)
- break;
-
- if (gf_changelog_write (to_fd,
- buffer, size) != size) {
- gf_log (this->name, GF_LOG_ERROR,
- "error processing ascii changlog");
- size = -1;
- break;
- }
- }
-
- return (size < 0 ? -1 : 0);
-}
-
-static int
-gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd,
- int to_fd, struct stat *stbuf, int *zerob)
-{
- int ret = -1;
- int encoding = -1;
- size_t elen = 0;
- char buffer[1024] = {0,};
-
- CHANGELOG_GET_ENCODING (from_fd, buffer, 1024, encoding, elen);
- if (encoding == -1) /* unknown encoding */
- goto out;
-
- if (!CHANGELOG_VALID_ENCODING (encoding))
- goto out;
-
- if (elen == stbuf->st_size) {
- *zerob = 1;
- goto out;
- }
-
- /**
- * start processing after the header
- */
- lseek (from_fd, elen, SEEK_SET);
-
- switch (encoding) {
- case CHANGELOG_ENCODE_BINARY:
- /**
- * this ideally should have been a part of changelog-encoders.c
- * (ie. part of the changelog translator).
- */
- ret = gf_changelog_parse_binary (this, gfc, from_fd,
- to_fd, elen, stbuf);
- break;
-
- case CHANGELOG_ENCODE_ASCII:
- ret = gf_changelog_parse_ascii (this, gfc, from_fd,
- to_fd, elen, stbuf);
- break;
- default:
- ret = gf_changelog_copy (this, from_fd, to_fd);
- }
-
- out:
- return ret;
-}
-
-static int
-gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path)
-{
- int ret = -1;
- int fd1 = 0;
- int fd2 = 0;
- int zerob = 0;
- struct stat stbuf = {0,};
- char dest[PATH_MAX] = {0,};
- char to_path[PATH_MAX] = {0,};
-
- ret = stat (from_path, &stbuf);
- if (ret || !S_ISREG(stbuf.st_mode)) {
- gf_log (this->name, GF_LOG_ERROR,
- "stat failed on changelog file: %s", from_path);
- goto out;
- }
-
- fd1 = open (from_path, O_RDONLY);
- if (fd1 < 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot open changelog file: %s (reason: %s)",
- from_path, strerror (errno));
- goto out;
- }
-
- (void) snprintf (to_path, PATH_MAX, "%s%s",
- gfc->gfc_current_dir, basename (from_path));
- (void) snprintf (dest, PATH_MAX, "%s%s",
- gfc->gfc_processing_dir, basename (from_path));
-
- fd2 = open (to_path, O_CREAT | O_TRUNC | O_RDWR,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
- if (fd2 < 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot create ascii changelog file %s (reason %s)",
- to_path, strerror (errno));
- goto close_fd;
- } else {
- ret = gf_changelog_decode (this, gfc, fd1,
- fd2, &stbuf, &zerob);
-
- close (fd2);
-
- if (!ret) {
- /* move it to processing on a successfull
- decode */
- ret = rename (to_path, dest);
- if (ret)
- gf_log (this->name, GF_LOG_ERROR,
- "error moving %s to processing dir"
- " (reason: %s)", to_path,
- strerror (errno));
- }
-
- /* remove it from .current if it's an empty file */
- if (zerob) {
- ret = unlink (to_path);
- if (ret)
- gf_log (this->name, GF_LOG_ERROR,
- "could not unlink %s (reason: %s",
- to_path, strerror (errno));
- }
- }
-
- close_fd:
- close (fd1);
-
- out:
- return ret;
-}
-
-static char *
-gf_changelog_ext_change (xlator_t *this,
- gf_changelog_t *gfc, char *path, size_t readlen)
-{
- int alo = 0;
- int ret = 0;
- size_t len = 0;
- char *buf = NULL;
-
- buf = path;
- while (len < readlen) {
- if (*buf == '\0') {
- alo = 1;
- gf_log (this->name, GF_LOG_DEBUG,
- "processing changelog: %s", path);
- ret = gf_changelog_consume (this, gfc, path);
- }
-
- if (ret)
- break;
-
- len++; buf++;
- if (alo) {
- alo = 0;
- path = buf;
- }
- }
-
- return (ret) ? NULL : path;
-}
-
-void *
-gf_changelog_process (void *data)
-{
- ssize_t len = 0;
- ssize_t offlen = 0;
- xlator_t *this = NULL;
- char *sbuf = NULL;
- gf_changelog_t *gfc = NULL;
- char from_path[PATH_MAX] = {0,};
-
- gfc = (gf_changelog_t *) data;
- this = gfc->this;
-
- pthread_detach (pthread_self());
-
- for (;;) {
- len = gf_changelog_read_path (gfc->gfc_sockfd,
- from_path + offlen,
- PATH_MAX - offlen);
- if (len < 0)
- continue; /* ignore it for now */
-
- if (len == 0) { /* close() from the changelog translator */
- gf_log (this->name, GF_LOG_INFO, "close from changelog"
- " notification translator.");
-
- if (gfc->gfc_connretries != 1) {
- if (!gf_changelog_notification_init(this, gfc))
- continue;
- }
-
- byebye = 1;
- break;
- }
-
- len += offlen;
- sbuf = gf_changelog_ext_change (this, gfc, from_path, len);
- if (!sbuf) {
- gf_log (this->name, GF_LOG_ERROR,
- "could not extract changelog filename");
- continue;
- }
-
- offlen = 0;
- if (sbuf != (from_path + len)) {
- offlen = from_path + len - sbuf;
- memmove (from_path, sbuf, offlen);
- }
- }
-
- gf_log (this->name, GF_LOG_DEBUG,
- "byebye (%d) from processing thread...", byebye);
- return NULL;
-}
diff --git a/xlators/features/changelog/lib/src/gf-changelog-reborp.c b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
new file mode 100644
index 00000000000..56b11cbb705
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
@@ -0,0 +1,413 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-misc.h"
+#include "changelog-mem-types.h"
+
+#include "gf-changelog-helpers.h"
+#include "changelog-rpc-common.h"
+#include "changelog-lib-messages.h"
+
+#include <glusterfs/syscall.h>
+
+/**
+ * Reverse socket: actual data transfer handler. Connection
+ * initiator is PROBER, data transfer is REBORP.
+ */
+
+static struct rpcsvc_program *gf_changelog_reborp_programs[];
+
+void *
+gf_changelog_connection_janitor(void *arg)
+{
+ int32_t ret = 0;
+ xlator_t *this = NULL;
+ gf_private_t *priv = NULL;
+ gf_changelog_t *entry = NULL;
+ struct gf_event *event = NULL;
+ struct gf_event_list *ev = NULL;
+ unsigned long drained = 0;
+
+ this = arg;
+ THIS = this;
+
+ priv = this->private;
+
+ while (1) {
+ pthread_mutex_lock(&priv->lock);
+ {
+ while (list_empty(&priv->cleanups))
+ pthread_cond_wait(&priv->cond, &priv->lock);
+
+ entry = list_first_entry(&priv->cleanups, gf_changelog_t, list);
+ list_del_init(&entry->list);
+ }
+ pthread_mutex_unlock(&priv->lock);
+
+ drained = 0;
+ ev = &entry->event;
+
+ gf_smsg(this->name, GF_LOG_INFO, 0,
+ CHANGELOG_LIB_MSG_CLEANING_BRICK_ENTRY_INFO, "brick=%s",
+ entry->brick, NULL);
+
+ /* 0x0: disable rpc-clnt */
+ rpc_clnt_disable(RPC_PROBER(entry));
+
+ /* 0x1: cleanup callback invoker thread */
+ ret = gf_cleanup_event(this, ev);
+ if (ret)
+ continue;
+
+ /* 0x2: drain pending events */
+ while (!list_empty(&ev->events)) {
+ event = list_first_entry(&ev->events, struct gf_event, list);
+ gf_smsg(this->name, GF_LOG_INFO, 0,
+ CHANGELOG_LIB_MSG_DRAINING_EVENT_INFO, "seq=%lu",
+ event->seq, "payload=%d", event->count, NULL);
+
+ GF_FREE(event);
+ drained++;
+ }
+
+ gf_smsg(this->name, GF_LOG_INFO, 0,
+ CHANGELOG_LIB_MSG_DRAINED_EVENT_INFO, "num=%lu", drained, NULL);
+
+ /* 0x3: freeup brick entry */
+ gf_smsg(this->name, GF_LOG_INFO, 0,
+ CHANGELOG_LIB_MSG_FREEING_ENTRY_INFO, "entry=%p", entry, NULL);
+ LOCK_DESTROY(&entry->statelock);
+ GF_FREE(entry);
+ }
+
+ return NULL;
+}
+
+int
+gf_changelog_reborp_rpcsvc_notify(rpcsvc_t *rpc, void *mydata,
+ rpcsvc_event_t event, void *data)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ gf_changelog_t *entry = NULL;
+
+ if (!(event == RPCSVC_EVENT_ACCEPT || event == RPCSVC_EVENT_DISCONNECT))
+ return 0;
+
+ entry = mydata;
+ this = entry->this;
+
+ switch (event) {
+ case RPCSVC_EVENT_ACCEPT:
+ ret = sys_unlink(RPC_SOCK(entry));
+ if (ret != 0)
+ gf_smsg(this->name, GF_LOG_WARNING, errno,
+ CHANGELOG_LIB_MSG_UNLINK_FAILED, "name=reverse socket",
+ "path=%s", RPC_SOCK(entry), NULL);
+ if (entry->connected)
+ GF_CHANGELOG_INVOKE_CBK(this, entry->connected, entry->brick,
+ entry->ptr);
+ break;
+ case RPCSVC_EVENT_DISCONNECT:
+ if (entry->disconnected)
+ GF_CHANGELOG_INVOKE_CBK(this, entry->disconnected, entry->brick,
+ entry->ptr);
+ /* passthrough */
+ default:
+ break;
+ }
+
+ return 0;
+}
+
+rpcsvc_t *
+gf_changelog_reborp_init_rpc_listner(xlator_t *this, char *path, char *sock,
+ void *cbkdata)
+{
+ CHANGELOG_MAKE_TMP_SOCKET_PATH(path, sock, UNIX_PATH_MAX);
+ return changelog_rpc_server_init(this, sock, cbkdata,
+ gf_changelog_reborp_rpcsvc_notify,
+ gf_changelog_reborp_programs);
+}
+
+/**
+ * This is dirty and painful as of now until there is event filtering in the
+ * server. The entire event buffer is scanned and interested events are picked,
+ * whereas we _should_ be notified with the events we were interested in
+ * (selected at the time of probe). As of now this is complete BS and needs
+ * fixture ASAP. I just made it work, it needs to be better.
+ *
+ * @FIXME: cleanup this bugger once server filters events.
+ */
+void
+gf_changelog_invoke_callback(gf_changelog_t *entry, struct iovec **vec,
+ int payloadcnt)
+{
+ int i = 0;
+ int evsize = 0;
+ xlator_t *this = NULL;
+ changelog_event_t *event = NULL;
+
+ this = entry->this;
+
+ for (; i < payloadcnt; i++) {
+ event = (changelog_event_t *)vec[i]->iov_base;
+ evsize = vec[i]->iov_len / CHANGELOG_EV_SIZE;
+
+ for (; evsize > 0; evsize--, event++) {
+ if (gf_changelog_filter_check(entry, event)) {
+ GF_CHANGELOG_INVOKE_CBK(this, entry->callback, entry->brick,
+ entry->ptr, event);
+ }
+ }
+ }
+}
+
+/**
+ * Ordered event handler is self-adaptive.. if the event sequence number
+ * is what's expected (->next_seq) there is no ordering list that's
+ * maintained. On out-of-order event notifications, event buffers are
+ * dynamically allocated and ordered.
+ */
+
+int
+__is_expected_sequence(struct gf_event_list *ev, struct gf_event *event)
+{
+ return (ev->next_seq == event->seq);
+}
+
+int
+__can_process_event(struct gf_event_list *ev, struct gf_event **event)
+{
+ *event = list_first_entry(&ev->events, struct gf_event, list);
+
+ if (__is_expected_sequence(ev, *event)) {
+ list_del(&(*event)->list);
+ ev->next_seq++;
+ return 1;
+ }
+
+ return 0;
+}
+
+void
+pick_event_ordered(struct gf_event_list *ev, struct gf_event **event)
+{
+ pthread_mutex_lock(&ev->lock);
+ {
+ while (list_empty(&ev->events) || !__can_process_event(ev, event))
+ pthread_cond_wait(&ev->cond, &ev->lock);
+ }
+ pthread_mutex_unlock(&ev->lock);
+}
+
+void
+pick_event_unordered(struct gf_event_list *ev, struct gf_event **event)
+{
+ pthread_mutex_lock(&ev->lock);
+ {
+ while (list_empty(&ev->events))
+ pthread_cond_wait(&ev->cond, &ev->lock);
+ *event = list_first_entry(&ev->events, struct gf_event, list);
+ list_del(&(*event)->list);
+ }
+ pthread_mutex_unlock(&ev->lock);
+}
+
+void *
+gf_changelog_callback_invoker(void *arg)
+{
+ xlator_t *this = NULL;
+ gf_changelog_t *entry = NULL;
+ struct iovec *vec = NULL;
+ struct gf_event *event = NULL;
+ struct gf_event_list *ev = NULL;
+
+ ev = arg;
+ entry = ev->entry;
+ THIS = this = entry->this;
+
+ while (1) {
+ entry->pickevent(ev, &event);
+
+ vec = (struct iovec *)&event->iov;
+ gf_changelog_invoke_callback(entry, &vec, event->count);
+
+ GF_FREE(event);
+ }
+
+ return NULL;
+}
+
+static int
+orderfn(struct list_head *pos1, struct list_head *pos2)
+{
+ struct gf_event *event1 = NULL;
+ struct gf_event *event2 = NULL;
+
+ event1 = list_entry(pos1, struct gf_event, list);
+ event2 = list_entry(pos2, struct gf_event, list);
+
+ if (event1->seq > event2->seq)
+ return 1;
+ return -1;
+}
+
+void
+queue_ordered_event(struct gf_event_list *ev, struct gf_event *event)
+{
+ /* add event to the ordered event list and wake up listener(s) */
+ pthread_mutex_lock(&ev->lock);
+ {
+ list_add_order(&event->list, &ev->events, orderfn);
+ if (!ev->next_seq)
+ ev->next_seq = event->seq;
+ if (ev->next_seq == event->seq)
+ pthread_cond_signal(&ev->cond);
+ }
+ pthread_mutex_unlock(&ev->lock);
+}
+
+void
+queue_unordered_event(struct gf_event_list *ev, struct gf_event *event)
+{
+ /* add event to the tail of the queue and wake up listener(s) */
+ pthread_mutex_lock(&ev->lock);
+ {
+ list_add_tail(&event->list, &ev->events);
+ pthread_cond_signal(&ev->cond);
+ }
+ pthread_mutex_unlock(&ev->lock);
+}
+
+int
+gf_changelog_event_handler(rpcsvc_request_t *req, xlator_t *this,
+ gf_changelog_t *entry)
+{
+ int i = 0;
+ size_t payloadlen = 0;
+ ssize_t len = 0;
+ int payloadcnt = 0;
+ changelog_event_req rpc_req = {
+ 0,
+ };
+ changelog_event_rsp rpc_rsp = {
+ 0,
+ };
+ struct iovec *vec = NULL;
+ struct gf_event *event = NULL;
+ struct gf_event_list *ev = NULL;
+
+ ev = &entry->event;
+
+ len = xdr_to_generic(req->msg[0], &rpc_req,
+ (xdrproc_t)xdr_changelog_event_req);
+ if (len < 0) {
+ gf_msg(this->name, GF_LOG_ERROR, 0,
+ CHANGELOG_LIB_MSG_XDR_DECODING_FAILED, "xdr decoding failed");
+ req->rpc_err = GARBAGE_ARGS;
+ goto handle_xdr_error;
+ }
+
+ if (len < req->msg[0].iov_len) {
+ payloadcnt = 1;
+ payloadlen = (req->msg[0].iov_len - len);
+ }
+ for (i = 1; i < req->count; i++) {
+ payloadcnt++;
+ payloadlen += req->msg[i].iov_len;
+ }
+
+ event = GF_CALLOC(1, GF_EVENT_CALLOC_SIZE(payloadcnt, payloadlen),
+ gf_changelog_mt_libgfchangelog_event_t);
+ if (!event)
+ goto handle_xdr_error;
+ INIT_LIST_HEAD(&event->list);
+
+ payloadlen = 0;
+ event->seq = rpc_req.seq;
+ event->count = payloadcnt;
+
+ /* deep copy IO vectors */
+ vec = &event->iov[0];
+ GF_EVENT_ASSIGN_IOVEC(vec, event, (req->msg[0].iov_len - len), payloadlen);
+ (void)memcpy(vec->iov_base, req->msg[0].iov_base + len, vec->iov_len);
+
+ for (i = 1; i < req->count; i++) {
+ vec = &event->iov[i];
+ GF_EVENT_ASSIGN_IOVEC(vec, event, req->msg[i].iov_len, payloadlen);
+ (void)memcpy(event->iov[i].iov_base, req->msg[i].iov_base,
+ req->msg[i].iov_len);
+ }
+
+ gf_msg_debug(this->name, 0,
+ "seq: %" PRIu64 " [%s] (time: %" PRIu64 ".%" PRIu64
+ "), "
+ "(vec: %d, len: %zd)",
+ rpc_req.seq, entry->brick, rpc_req.tv_sec, rpc_req.tv_usec,
+ payloadcnt, payloadlen);
+
+ /* dispatch event */
+ entry->queueevent(ev, event);
+
+ /* ack sequence number */
+ rpc_rsp.op_ret = 0;
+ rpc_rsp.seq = rpc_req.seq;
+
+ goto submit_rpc;
+
+handle_xdr_error:
+ rpc_rsp.op_ret = -1;
+ rpc_rsp.seq = 0; /* invalid */
+submit_rpc:
+ return changelog_rpc_sumbit_reply(req, &rpc_rsp, NULL, 0, NULL,
+ (xdrproc_t)xdr_changelog_event_rsp);
+}
+
+int
+gf_changelog_reborp_handle_event(rpcsvc_request_t *req)
+{
+ xlator_t *this = NULL;
+ rpcsvc_t *svc = NULL;
+ gf_changelog_t *entry = NULL;
+
+ svc = rpcsvc_request_service(req);
+ entry = svc->mydata;
+
+ this = THIS = entry->this;
+
+ return gf_changelog_event_handler(req, this, entry);
+}
+
+static rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = {
+ [CHANGELOG_REV_PROC_EVENT] = {"CHANGELOG EVENT HANDLER",
+ gf_changelog_reborp_handle_event, NULL,
+ CHANGELOG_REV_PROC_EVENT, DRC_NA, 0},
+};
+
+/**
+ * Do not use synctask as the RPC layer dereferences ->mydata as THIS.
+ * In gf_changelog_setup_rpc(), @cbkdata is of type @gf_changelog_t,
+ * and that's required to invoke the callback with the appropriate
+ * brick path and it's private data.
+ */
+static struct rpcsvc_program gf_changelog_reborp_prog = {
+ .progname = "LIBGFCHANGELOG REBORP",
+ .prognum = CHANGELOG_REV_RPC_PROCNUM,
+ .progver = CHANGELOG_REV_RPC_PROCVER,
+ .numactors = CHANGELOG_REV_PROC_MAX,
+ .actors = gf_changelog_reborp_actors,
+ .synctask = _gf_false,
+};
+
+static struct rpcsvc_program *gf_changelog_reborp_programs[] = {
+ &gf_changelog_reborp_prog,
+ NULL,
+};
diff --git a/xlators/features/changelog/lib/src/gf-changelog-rpc.c b/xlators/features/changelog/lib/src/gf-changelog-rpc.c
new file mode 100644
index 00000000000..8ec6ffbcebc
--- /dev/null
+++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.c
@@ -0,0 +1,98 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "gf-changelog-rpc.h"
+#include "changelog-misc.h"
+#include "changelog-mem-types.h"
+
+struct rpc_clnt_program gf_changelog_clnt;
+
+/* TODO: piggyback reconnect to called (upcall) */
+int
+gf_changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata,
+ rpc_clnt_event_t event, void *data)
+{
+ switch (event) {
+ case RPC_CLNT_CONNECT:
+ break;
+ case RPC_CLNT_DISCONNECT:
+ case RPC_CLNT_MSG:
+ case RPC_CLNT_DESTROY:
+ case RPC_CLNT_PING:
+ break;
+ }
+
+ return 0;
+}
+
+struct rpc_clnt *
+gf_changelog_rpc_init(xlator_t *this, gf_changelog_t *entry)
+{
+ char sockfile[UNIX_PATH_MAX] = {
+ 0,
+ };
+
+ CHANGELOG_MAKE_SOCKET_PATH(entry->brick, sockfile, UNIX_PATH_MAX);
+ return changelog_rpc_client_init(this, entry, sockfile,
+ gf_changelog_rpc_notify);
+}
+
+/**
+ * remote procedure calls declarations.
+ */
+
+int
+gf_probe_changelog_cbk(struct rpc_req *req, struct iovec *iovec, int count,
+ void *myframe)
+{
+ return 0;
+}
+
+int
+gf_probe_changelog_filter(call_frame_t *frame, xlator_t *this, void *data)
+{
+ char *sock = NULL;
+ gf_changelog_t *entry = NULL;
+ changelog_probe_req req = {
+ 0,
+ };
+
+ entry = data;
+ sock = RPC_SOCK(entry);
+
+ (void)memcpy(&req.sock, sock, strlen(sock));
+ req.filter = entry->notify;
+
+ /* invoke RPC */
+ return changelog_rpc_sumbit_req(
+ RPC_PROBER(entry), (void *)&req, frame, &gf_changelog_clnt,
+ CHANGELOG_RPC_PROBE_FILTER, NULL, 0, NULL, this, gf_probe_changelog_cbk,
+ (xdrproc_t)xdr_changelog_probe_req);
+}
+
+int
+gf_changelog_invoke_rpc(xlator_t *this, gf_changelog_t *entry, int procidx)
+{
+ return changelog_invoke_rpc(this, RPC_PROBER(entry), &gf_changelog_clnt,
+ procidx, entry);
+}
+
+struct rpc_clnt_procedure gf_changelog_procs[CHANGELOG_RPC_PROC_MAX] = {
+ [CHANGELOG_RPC_PROC_NULL] = {"NULL", NULL},
+ [CHANGELOG_RPC_PROBE_FILTER] = {"PROBE FILTER", gf_probe_changelog_filter},
+};
+
+struct rpc_clnt_program gf_changelog_clnt = {
+ .progname = "LIBGFCHANGELOG",
+ .prognum = CHANGELOG_RPC_PROGNUM,
+ .progver = CHANGELOG_RPC_PROGVER,
+ .numproc = CHANGELOG_RPC_PROC_MAX,
+ .proctable = gf_changelog_procs,
+};
diff --git a/xlators/features/changelog/lib/src/changelog.h b/xlators/features/changelog/lib/src/gf-changelog-rpc.h
index 5cddfb5839c..5c82d6f1c08 100644
--- a/xlators/features/changelog/lib/src/changelog.h
+++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.h
@@ -8,24 +8,21 @@
cases as published by the Free Software Foundation.
*/
-#ifndef _GF_CHANGELOG_H
-#define _GF_CHANGELOG_H
+#ifndef __GF_CHANGELOG_RPC_H
+#define __GF_CHANGELOG_RPC_H
-/* API set */
+#include <glusterfs/xlator.h>
-int
-gf_changelog_register (char *brick_path, char *scratch_dir,
- char *log_file, int log_levl, int max_reconnects);
-ssize_t
-gf_changelog_scan ();
-
-int
-gf_changelog_start_fresh ();
+#include "gf-changelog-helpers.h"
+#include "changelog-rpc-common.h"
-ssize_t
-gf_changelog_next_change (char *bufptr, size_t maxlen);
+struct rpc_clnt *
+gf_changelog_rpc_init(xlator_t *, gf_changelog_t *);
int
-gf_changelog_done (char *file);
+gf_changelog_invoke_rpc(xlator_t *, gf_changelog_t *, int);
+
+rpcsvc_t *
+gf_changelog_reborp_init_rpc_listner(xlator_t *, char *, char *, void *);
#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c
index 0827f2cac6c..57c3d39ef76 100644
--- a/xlators/features/changelog/lib/src/gf-changelog.c
+++ b/xlators/features/changelog/lib/src/gf-changelog.c
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
@@ -14,558 +14,639 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
+#include <sys/time.h>
+#include <sys/resource.h>
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <string.h>
-#include "globals.h"
-#include "glusterfs.h"
-#include "logging.h"
+#include <glusterfs/globals.h>
+#include <glusterfs/glusterfs.h>
+#include <glusterfs/logging.h>
+#include <glusterfs/defaults.h>
+#include <glusterfs/syncop.h>
+#include "gf-changelog-rpc.h"
#include "gf-changelog-helpers.h"
/* from the changelog translator */
#include "changelog-misc.h"
#include "changelog-mem-types.h"
+#include "changelog-lib-messages.h"
-int byebye = 0;
+/**
+ * Global singleton xlator pointer for the library, initialized
+ * during library load. This should probably be hidden inside
+ * an initialized object which is an handle for the consumer.
+ *
+ * TODO: do away with the global..
+ */
+xlator_t *master = NULL;
-static void
-gf_changelog_cleanup (gf_changelog_t *gfc)
+static inline gf_private_t *
+gf_changelog_alloc_priv()
{
- /* socket */
- if (gfc->gfc_sockfd != -1)
- close (gfc->gfc_sockfd);
- /* tracker fd */
- if (gfc->gfc_fd != -1)
- close (gfc->gfc_fd);
- /* processing dir */
- if (gfc->gfc_dir)
- closedir (gfc->gfc_dir);
-
- if (gfc->gfc_working_dir)
- free (gfc->gfc_working_dir); /* allocated by realpath */
+ int ret = 0;
+ gf_private_t *priv = NULL;
+
+ priv = GF_CALLOC(1, sizeof(*priv), gf_changelog_mt_priv_t);
+ if (!priv)
+ goto error_return;
+ INIT_LIST_HEAD(&priv->connections);
+ INIT_LIST_HEAD(&priv->cleanups);
+
+ ret = pthread_mutex_init(&priv->lock, NULL);
+ if (ret != 0)
+ goto free_priv;
+ ret = pthread_cond_init(&priv->cond, NULL);
+ if (ret != 0)
+ goto cleanup_mutex;
+
+ priv->api = NULL;
+ return priv;
+
+cleanup_mutex:
+ (void)pthread_mutex_destroy(&priv->lock);
+free_priv:
+ GF_FREE(priv);
+error_return:
+ return NULL;
}
-void
-__attribute__ ((constructor)) gf_changelog_ctor (void)
+#define GF_CHANGELOG_EVENT_POOL_SIZE 16384
+#define GF_CHANGELOG_EVENT_THREAD_COUNT 4
+
+static int
+gf_changelog_ctx_defaults_init(glusterfs_ctx_t *ctx)
{
- glusterfs_ctx_t *ctx = NULL;
+ cmd_args_t *cmd_args = NULL;
+ struct rlimit lim = {
+ 0,
+ };
+ call_pool_t *pool = NULL;
+ int ret = -1;
+
+ ret = xlator_mem_acct_init(THIS, gf_changelog_mt_end);
+ if (ret != 0)
+ return -1;
- ctx = glusterfs_ctx_new ();
- if (!ctx)
- return;
+ ctx->process_uuid = generate_glusterfs_ctx_id();
+ if (!ctx->process_uuid)
+ return -1;
- if (glusterfs_globals_init (ctx)) {
- free (ctx);
- ctx = NULL;
- return;
- }
+ ctx->page_size = 128 * GF_UNIT_KB;
- THIS->ctx = ctx;
-}
+ ctx->iobuf_pool = iobuf_pool_new();
+ if (!ctx->iobuf_pool)
+ goto free_pool;
-void
-__attribute__ ((destructor)) gf_changelog_dtor (void)
-{
- xlator_t *this = NULL;
- glusterfs_ctx_t *ctx = NULL;
- gf_changelog_t *gfc = NULL;
+ ctx->event_pool = gf_event_pool_new(GF_CHANGELOG_EVENT_POOL_SIZE,
+ GF_CHANGELOG_EVENT_THREAD_COUNT);
+ if (!ctx->event_pool)
+ goto free_pool;
- this = THIS;
- if (!this)
- return;
-
- ctx = this->ctx;
- gfc = this->private;
-
- if (gfc) {
- if (gfc->hist_gfc) {
- gf_changelog_cleanup(gfc->hist_gfc);
- GF_FREE (gfc->hist_gfc);
- }
- gf_changelog_cleanup (gfc);
- GF_FREE (gfc);
- }
+ pool = GF_CALLOC(1, sizeof(call_pool_t),
+ gf_changelog_mt_libgfchangelog_call_pool_t);
+ if (!pool)
+ goto free_pool;
- if (ctx) {
- pthread_mutex_destroy (&ctx->lock);
- free (ctx);
- ctx = NULL;
- }
-}
+ /* frame_mem_pool size 112 * 64 */
+ pool->frame_mem_pool = mem_pool_new(call_frame_t, 32);
+ if (!pool->frame_mem_pool)
+ goto free_pool;
+ /* stack_mem_pool size 256 * 128 */
+ pool->stack_mem_pool = mem_pool_new(call_stack_t, 16);
-static int
-gf_changelog_open_dirs (gf_changelog_t *gfc)
-{
- int ret = -1;
- DIR *dir = NULL;
- int tracker_fd = 0;
- char tracker_path[PATH_MAX] = {0,};
-
- (void) snprintf (gfc->gfc_current_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_CURRENT_DIR"/",
- gfc->gfc_working_dir);
- ret = mkdir_p (gfc->gfc_current_dir, 0600, _gf_false);
- if (ret)
- goto out;
+ if (!pool->stack_mem_pool)
+ goto free_pool;
- (void) snprintf (gfc->gfc_processed_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_PROCESSED_DIR"/",
- gfc->gfc_working_dir);
- ret = mkdir_p (gfc->gfc_processed_dir, 0600, _gf_false);
- if (ret)
- goto out;
+ ctx->stub_mem_pool = mem_pool_new(call_stub_t, 16);
+ if (!ctx->stub_mem_pool)
+ goto free_pool;
- (void) snprintf (gfc->gfc_processing_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_PROCESSING_DIR"/",
- gfc->gfc_working_dir);
- ret = mkdir_p (gfc->gfc_processing_dir, 0600, _gf_false);
- if (ret)
- goto out;
+ ctx->dict_pool = mem_pool_new(dict_t, 32);
+ if (!ctx->dict_pool)
+ goto free_pool;
- dir = opendir (gfc->gfc_processing_dir);
- if (!dir) {
- gf_log ("", GF_LOG_ERROR,
- "opendir() error [reason: %s]", strerror (errno));
- goto out;
- }
+ ctx->dict_pair_pool = mem_pool_new(data_pair_t, 512);
+ if (!ctx->dict_pair_pool)
+ goto free_pool;
- gfc->gfc_dir = dir;
+ ctx->dict_data_pool = mem_pool_new(data_t, 512);
+ if (!ctx->dict_data_pool)
+ goto free_pool;
- (void) snprintf (tracker_path, PATH_MAX,
- "%s/"GF_CHANGELOG_TRACKER, gfc->gfc_working_dir);
+ ctx->logbuf_pool = mem_pool_new(log_buf_t, 256);
+ if (!ctx->logbuf_pool)
+ goto free_pool;
- tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
- if (tracker_fd < 0) {
- closedir (gfc->gfc_dir);
- ret = -1;
- goto out;
- }
+ INIT_LIST_HEAD(&pool->all_frames);
+ LOCK_INIT(&pool->lock);
+ ctx->pool = pool;
- gfc->gfc_fd = tracker_fd;
- ret = 0;
- out:
- return ret;
-}
+ LOCK_INIT(&ctx->lock);
-int
-gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc)
-{
- int ret = 0;
- int len = 0;
- int tries = 0;
- int sockfd = 0;
- struct sockaddr_un remote;
-
- this = gfc->this;
-
- if (gfc->gfc_sockfd != -1) {
- gf_log (this->name, GF_LOG_INFO,
- "Reconnecting...");
- close (gfc->gfc_sockfd);
- }
+ cmd_args = &ctx->cmd_args;
- sockfd = socket (AF_UNIX, SOCK_STREAM, 0);
- if (sockfd < 0) {
- ret = -1;
- goto out;
- }
+ INIT_LIST_HEAD(&cmd_args->xlator_options);
- CHANGELOG_MAKE_SOCKET_PATH (gfc->gfc_brickpath,
- gfc->gfc_sockpath, UNIX_PATH_MAX);
- gf_log (this->name, GF_LOG_INFO,
- "connecting to changelog socket: %s (brick: %s)",
- gfc->gfc_sockpath, gfc->gfc_brickpath);
+ lim.rlim_cur = RLIM_INFINITY;
+ lim.rlim_max = RLIM_INFINITY;
+ setrlimit(RLIMIT_CORE, &lim);
- remote.sun_family = AF_UNIX;
- strcpy (remote.sun_path, gfc->gfc_sockpath);
+ return 0;
- len = strlen (remote.sun_path) + sizeof (remote.sun_family);
+free_pool:
+ if (pool) {
+ GF_FREE(pool->frame_mem_pool);
- while (tries < gfc->gfc_connretries) {
- gf_log (this->name, GF_LOG_WARNING,
- "connection attempt %d/%d...",
- tries + 1, gfc->gfc_connretries);
+ GF_FREE(pool->stack_mem_pool);
- /* initiate a connect */
- if (connect (sockfd, (struct sockaddr *) &remote, len) == 0) {
- gfc->gfc_sockfd = sockfd;
- break;
- }
+ GF_FREE(pool);
+ }
- tries++;
- sleep (2);
- }
+ GF_FREE(ctx->stub_mem_pool);
- if (tries == gfc->gfc_connretries) {
- gf_log (this->name, GF_LOG_ERROR,
- "could not connect to changelog socket!"
- " bailing out...");
- close (sockfd);
- ret = -1;
- } else
- gf_log (this->name, GF_LOG_INFO,
- "connection successful");
-
- out:
- return ret;
-}
+ GF_FREE(ctx->dict_pool);
-int
-gf_changelog_done (char *file)
-{
- int ret = -1;
- char *buffer = NULL;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- char to_path[PATH_MAX] = {0,};
+ GF_FREE(ctx->dict_pair_pool);
- errno = EINVAL;
+ GF_FREE(ctx->dict_data_pool);
- this = THIS;
- if (!this)
- goto out;
-
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
-
- if (!file || !strlen (file))
- goto out;
-
- /* make sure 'file' is inside ->gfc_working_dir */
- buffer = realpath (file, NULL);
- if (!buffer)
- goto out;
-
- if (strncmp (gfc->gfc_working_dir,
- buffer, strlen (gfc->gfc_working_dir)))
- goto out;
-
- (void) snprintf (to_path, PATH_MAX, "%s%s",
- gfc->gfc_processed_dir, basename (buffer));
- gf_log (this->name, GF_LOG_DEBUG,
- "moving %s to processed directory", file);
- ret = rename (buffer, to_path);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot move %s to %s (reason: %s)",
- file, to_path, strerror (errno));
- goto out;
- }
+ GF_FREE(ctx->logbuf_pool);
- ret = 0;
+ GF_FREE(ctx->iobuf_pool);
- out:
- if (buffer)
- free (buffer); /* allocated by realpath() */
- return ret;
+ GF_FREE(ctx->event_pool);
+
+ return -1;
}
-/**
- * @API
- * for a set of changelogs, start from the begining
- */
-int
-gf_changelog_start_fresh ()
+/* TODO: cleanup ctx defaults */
+void
+gf_changelog_cleanup_this(xlator_t *this)
{
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
-
- this = THIS;
- if (!this)
- goto out;
+ glusterfs_ctx_t *ctx = NULL;
- errno = EINVAL;
+ if (!this)
+ return;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ ctx = this->ctx;
+ syncenv_destroy(ctx->env);
+ free(ctx);
- if (gf_ftruncate (gfc->gfc_fd, 0))
- goto out;
+ this->private = NULL;
+ this->ctx = NULL;
- return 0;
-
- out:
- return -1;
+ mem_pools_fini();
}
-/**
- * @API
- * return the next changelog file entry. zero means all chanelogs
- * consumed.
- */
-ssize_t
-gf_changelog_next_change (char *bufptr, size_t maxlen)
+static int
+gf_changelog_init_context()
{
- ssize_t size = 0;
- int tracker_fd = 0;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- char buffer[PATH_MAX] = {0,};
+ glusterfs_ctx_t *ctx = NULL;
- errno = EINVAL;
+ ctx = glusterfs_ctx_new();
+ if (!ctx)
+ goto error_return;
- this = THIS;
- if (!this)
- goto out;
+ if (glusterfs_globals_init(ctx))
+ goto free_ctx;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ THIS->ctx = ctx;
+ if (gf_changelog_ctx_defaults_init(ctx))
+ goto free_ctx;
- tracker_fd = gfc->gfc_fd;
+ ctx->env = syncenv_new(0, 0, 0);
+ if (!ctx->env)
+ goto free_ctx;
+ return 0;
- size = gf_readline (tracker_fd, buffer, maxlen);
- if (size < 0)
- goto out;
- if (size == 0)
- return 0;
+free_ctx:
+ free(ctx);
+ THIS->ctx = NULL;
+error_return:
+ return -1;
+}
- memcpy (bufptr, buffer, size - 1);
- *(buffer + size) = '\0';
+static int
+gf_changelog_init_master()
+{
+ int ret = 0;
- return size;
+ ret = gf_changelog_init_context();
+ mem_pools_init();
- out:
- return -1;
+ return ret;
}
-/**
- * @API
- * gf_changelog_scan() - scan and generate a list of change entries
- *
- * calling this api multiple times (without calling gf_changlog_done())
- * would result new changelogs(s) being refreshed in the tracker file.
- * This call also acts as a cancellation point for the consumer.
- */
-ssize_t
-gf_changelog_scan ()
+/* TODO: cleanup clnt/svc on failure */
+int
+gf_changelog_setup_rpc(xlator_t *this, gf_changelog_t *entry, int proc)
{
- int ret = 0;
- int tracker_fd = 0;
- size_t len = 0;
- size_t off = 0;
- xlator_t *this = NULL;
- size_t nr_entries = 0;
- gf_changelog_t *gfc = NULL;
- struct dirent *entryp = NULL;
- struct dirent *result = NULL;
- char buffer[PATH_MAX] = {0,};
+ int ret = 0;
+ rpcsvc_t *svc = NULL;
+ struct rpc_clnt *rpc = NULL;
+
+ /**
+ * Initialize a connect back socket. A probe() RPC call to the server
+ * triggers a reverse connect.
+ */
+ svc = gf_changelog_reborp_init_rpc_listner(this, entry->brick,
+ RPC_SOCK(entry), entry);
+ if (!svc)
+ goto error_return;
+ RPC_REBORP(entry) = svc;
+
+ /* Initialize an RPC client */
+ rpc = gf_changelog_rpc_init(this, entry);
+ if (!rpc)
+ goto error_return;
+ RPC_PROBER(entry) = rpc;
+
+ /**
+ * @FIXME
+ * till we have connection state machine, let's delay the RPC call
+ * for now..
+ */
+ sleep(2);
+
+ /**
+ * Probe changelog translator for reverse connection. After a successful
+ * call, there's less use of the client and can be disconnected, but
+ * let's leave the connection active for any future RPC calls.
+ */
+ ret = gf_changelog_invoke_rpc(this, entry, proc);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_INVOKE_RPC_FAILED,
+ "Could not initiate probe RPC, bailing out!!!");
+ goto error_return;
+ }
+
+ return 0;
+
+error_return:
+ return -1;
+}
- this = THIS;
- if (!this)
- goto out;
-
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
-
- /**
- * do we need to protect 'byebye' with locks? worst, the
- * consumer would get notified during next scan().
- */
- if (byebye) {
- errno = ECONNREFUSED;
- goto out;
- }
+int
+gf_cleanup_event(xlator_t *this, struct gf_event_list *ev)
+{
+ int ret = 0;
+
+ ret = gf_thread_cleanup(this, ev->invoker);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_WARNING, -ret,
+ CHANGELOG_LIB_MSG_CLEANUP_ERROR,
+ "cannot cleanup callback invoker thread."
+ " Not freeing resources");
+ return -1;
+ }
- errno = EINVAL;
-
- tracker_fd = gfc->gfc_fd;
-
- if (gf_ftruncate (tracker_fd, 0))
- goto out;
-
- len = offsetof(struct dirent, d_name)
- + pathconf(gfc->gfc_processing_dir, _PC_NAME_MAX) + 1;
- entryp = GF_CALLOC (1, len,
- gf_changelog_mt_libgfchangelog_dirent_t);
- if (!entryp)
- goto out;
-
- rewinddir (gfc->gfc_dir);
- while (1) {
- ret = readdir_r (gfc->gfc_dir, entryp, &result);
- if (ret || !result)
- break;
-
- if ( !strcmp (basename (entryp->d_name), ".")
- || !strcmp (basename (entryp->d_name), "..") )
- continue;
-
- nr_entries++;
-
- GF_CHANGELOG_FILL_BUFFER (gfc->gfc_processing_dir,
- buffer, off,
- strlen (gfc->gfc_processing_dir));
- GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer,
- off, strlen (entryp->d_name));
- GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1);
-
- if (gf_changelog_write (tracker_fd, buffer, off) != off) {
- gf_log (this->name, GF_LOG_ERROR,
- "error writing changelog filename"
- " to tracker file");
- break;
- }
- off = 0;
- }
+ ev->entry = NULL;
- GF_FREE (entryp);
+ return 0;
+}
- if (!result) {
- if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1)
- return nr_entries;
- }
- out:
- return -1;
+static int
+gf_init_event(gf_changelog_t *entry)
+{
+ int ret = 0;
+ struct gf_event_list *ev = NULL;
+
+ ev = &entry->event;
+ ev->entry = entry;
+
+ ret = pthread_mutex_init(&ev->lock, NULL);
+ if (ret != 0)
+ goto error_return;
+ ret = pthread_cond_init(&ev->cond, NULL);
+ if (ret != 0)
+ goto cleanup_mutex;
+ INIT_LIST_HEAD(&ev->events);
+
+ ev->next_seq = 0; /* bootstrap sequencing */
+
+ if (GF_NEED_ORDERED_EVENTS(entry)) {
+ entry->pickevent = pick_event_ordered;
+ entry->queueevent = queue_ordered_event;
+ } else {
+ entry->pickevent = pick_event_unordered;
+ entry->queueevent = queue_unordered_event;
+ }
+
+ ret = gf_thread_create(&ev->invoker, NULL, gf_changelog_callback_invoker,
+ ev, "clogcbki");
+ if (ret != 0) {
+ entry->pickevent = NULL;
+ entry->queueevent = NULL;
+ goto cleanup_cond;
+ }
+
+ return 0;
+
+cleanup_cond:
+ (void)pthread_cond_destroy(&ev->cond);
+cleanup_mutex:
+ (void)pthread_mutex_destroy(&ev->lock);
+error_return:
+ return -1;
}
/**
- * @API
- * gf_changelog_register() - register a client for updates.
+ * TODO:
+ * - cleanup invoker thread
+ * - cleanup event list
+ * - destroy rpc{-clnt, svc}
*/
int
-gf_changelog_register (char *brick_path, char *scratch_dir,
- char *log_file, int log_level, int max_reconnects)
+gf_cleanup_brick_connection(xlator_t *this, gf_changelog_t *entry)
{
- int i = 0;
- int ret = -1;
- int errn = 0;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- char hist_scratch_dir[PATH_MAX] = {0,};
+ return 0;
+}
- this = THIS;
- if (!this->ctx)
- goto out;
+int
+gf_cleanup_connections(xlator_t *this)
+{
+ return 0;
+}
- errno = ENOMEM;
+static int
+gf_setup_brick_connection(xlator_t *this, struct gf_brick_spec *brick,
+ gf_boolean_t ordered, void *xl)
+{
+ int ret = 0;
+ gf_private_t *priv = NULL;
+ gf_changelog_t *entry = NULL;
+
+ priv = this->private;
+
+ if (!brick->callback || !brick->init || !brick->fini)
+ goto error_return;
+
+ entry = GF_CALLOC(1, sizeof(*entry), gf_changelog_mt_libgfchangelog_t);
+ if (!entry)
+ goto error_return;
+ INIT_LIST_HEAD(&entry->list);
+
+ LOCK_INIT(&entry->statelock);
+ entry->connstate = GF_CHANGELOG_CONN_STATE_PENDING;
+
+ entry->notify = brick->filter;
+ if (snprintf(entry->brick, PATH_MAX, "%s", brick->brick_path) >= PATH_MAX)
+ goto free_entry;
+
+ entry->this = this;
+ entry->invokerxl = xl;
+
+ entry->ordered = ordered;
+ ret = gf_init_event(entry);
+ if (ret)
+ goto free_entry;
+
+ entry->fini = brick->fini;
+ entry->callback = brick->callback;
+ entry->connected = brick->connected;
+ entry->disconnected = brick->disconnected;
+
+ entry->ptr = brick->init(this, brick);
+ if (!entry->ptr)
+ goto cleanup_event;
+ priv->api = entry->ptr; /* pointer to API, if required */
+
+ pthread_mutex_lock(&priv->lock);
+ {
+ list_add_tail(&entry->list, &priv->connections);
+ }
+ pthread_mutex_unlock(&priv->lock);
+
+ ret = gf_changelog_setup_rpc(this, entry, CHANGELOG_RPC_PROBE_FILTER);
+ if (ret)
+ goto cleanup_event;
+ return 0;
+
+cleanup_event:
+ (void)gf_cleanup_event(this, &entry->event);
+free_entry:
+ gf_msg_debug(this->name, 0, "freeing entry %p", entry);
+ list_del(&entry->list); /* FIXME: kludge for now */
+ GF_FREE(entry);
+error_return:
+ return -1;
+}
- gfc = GF_CALLOC (1, sizeof (*gfc),
- gf_changelog_mt_libgfchangelog_t);
- if (!gfc)
- goto out;
+int
+gf_changelog_register_brick(xlator_t *this, struct gf_brick_spec *brick,
+ gf_boolean_t ordered, void *xl)
+{
+ return gf_setup_brick_connection(this, brick, ordered, xl);
+}
- gfc->this = this;
+static int
+gf_changelog_setup_logging(xlator_t *this, char *logfile, int loglevel)
+{
+ /* passing ident as NULL means to use default ident for syslog */
+ if (gf_log_init(this->ctx, logfile, NULL))
+ return -1;
- gfc->gfc_dir = NULL;
- gfc->gfc_fd = gfc->gfc_sockfd = -1;
+ gf_log_set_loglevel(this->ctx, (loglevel == -1) ? GF_LOG_INFO : loglevel);
+ return 0;
+}
- gfc->gfc_working_dir = realpath (scratch_dir, NULL);
- if (!gfc->gfc_working_dir) {
- errn = errno;
- goto cleanup;
+static int
+gf_changelog_set_master(xlator_t *master, void *xl)
+{
+ int32_t ret = 0;
+ xlator_t *this = NULL;
+ xlator_t *old_this = NULL;
+ gf_private_t *priv = NULL;
+
+ this = xl;
+ if (!this || !this->ctx) {
+ ret = gf_changelog_init_master();
+ if (ret)
+ return -1;
+ this = THIS;
+ }
+
+ master->ctx = this->ctx;
+
+ INIT_LIST_HEAD(&master->volume_options);
+ SAVE_THIS(THIS);
+
+ ret = xlator_mem_acct_init(THIS, gf_changelog_mt_end);
+ if (ret != 0)
+ goto restore_this;
+
+ priv = gf_changelog_alloc_priv();
+ if (!priv) {
+ ret = -1;
+ goto restore_this;
+ }
+
+ if (!xl) {
+ /* poller thread */
+ ret = gf_thread_create(&priv->poller, NULL, changelog_rpc_poller, THIS,
+ "clogpoll");
+ if (ret != 0) {
+ GF_FREE(priv);
+ gf_msg(master->name, GF_LOG_ERROR, 0,
+ CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED,
+ "failed to spawn poller thread");
+ goto restore_this;
}
+ }
- /* Begin: Changes for History API */
- gfc->hist_gfc = NULL;
-
- gfc->hist_gfc = GF_CALLOC (1, sizeof (*gfc),
- gf_changelog_mt_libgfchangelog_t);
- if (!gfc->hist_gfc)
- goto cleanup;
+ master->private = priv;
- gfc->hist_gfc->gfc_dir = NULL;
- gfc->hist_gfc->gfc_fd = gfc->hist_gfc->gfc_sockfd = -1;
- gfc->hist_gfc->this = NULL;
+restore_this:
+ RESTORE_THIS();
- (void) strncpy (hist_scratch_dir, scratch_dir, PATH_MAX);
- (void) snprintf (hist_scratch_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_HISTORY_DIR"/",
- gfc->gfc_working_dir);
-
- ret = mkdir_p (hist_scratch_dir, 0600, _gf_false);
- if (ret) {
- errn = errno;
- goto cleanup;
- }
+ return ret;
+}
- gfc->hist_gfc->gfc_working_dir = realpath (hist_scratch_dir, NULL);
- if (!gfc->hist_gfc->gfc_working_dir) {
- errn = errno;
- goto cleanup;
- }
+int
+gf_changelog_init(void *xl)
+{
+ int ret = 0;
+ gf_private_t *priv = NULL;
- ret = gf_changelog_open_dirs (gfc->hist_gfc);
- if (ret) {
- errn = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "could not create entries in history scratch dir");
- goto cleanup;
- }
+ if (master)
+ return 0;
- (void) strncpy (gfc->hist_gfc->gfc_brickpath, brick_path, PATH_MAX);
+ master = calloc(1, sizeof(*master));
+ if (!master)
+ goto error_return;
+
+ master->name = strdup("gfchangelog");
+ if (!master->name)
+ goto dealloc_master;
+
+ ret = gf_changelog_set_master(master, xl);
+ if (ret)
+ goto dealloc_name;
+
+ priv = master->private;
+ ret = gf_thread_create(&priv->connectionjanitor, NULL,
+ gf_changelog_connection_janitor, master, "clogjan");
+ if (ret != 0) {
+ /* TODO: cleanup priv, mutex (poller thread for !xl) */
+ goto dealloc_name;
+ }
+
+ return 0;
+
+dealloc_name:
+ free(master->name);
+dealloc_master:
+ free(master);
+ master = NULL;
+error_return:
+ return -1;
+}
- for (i=0; i < 256; i++) {
- gfc->hist_gfc->rfc3986[i] =
- (isalnum(i) || i == '~' ||
- i == '-' || i == '.' || i == '_') ? i : 0;
- }
- /* End: Changes for History API*/
-
- ret = gf_changelog_open_dirs (gfc);
- if (ret) {
- errn = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "could not create entries in scratch dir");
- goto cleanup;
+int
+gf_changelog_register_generic(struct gf_brick_spec *bricks, int count,
+ int ordered, char *logfile, int lvl, void *xl)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ xlator_t *old_this = NULL;
+ struct gf_brick_spec *brick = NULL;
+ gf_boolean_t need_order = _gf_false;
+
+ SAVE_THIS(xl);
+
+ this = THIS;
+ if (!this)
+ goto error_return;
+
+ ret = gf_changelog_setup_logging(this, logfile, lvl);
+ if (ret)
+ goto error_return;
+
+ need_order = (ordered) ? _gf_true : _gf_false;
+
+ brick = bricks;
+ while (count--) {
+ gf_smsg(this->name, GF_LOG_INFO, 0,
+ CHANGELOG_LIB_MSG_NOTIFY_REGISTER_INFO, "brick=%s",
+ brick->brick_path, "notify_filter=%d", brick->filter, NULL);
+
+ ret = gf_changelog_register_brick(this, brick, need_order, xl);
+ if (ret != 0) {
+ gf_msg(this->name, GF_LOG_ERROR, 0,
+ CHANGELOG_LIB_MSG_NOTIFY_REGISTER_FAILED,
+ "Error registering with changelog xlator");
+ break;
}
- /* passing ident as NULL means to use default ident for syslog */
- if (gf_log_init (this->ctx, log_file, NULL))
- goto cleanup;
+ brick++;
+ }
- gf_log_set_loglevel ((log_level == -1) ? GF_LOG_INFO :
- log_level);
+ if (ret != 0)
+ goto cleanup_inited_bricks;
- gfc->gfc_connretries = (max_reconnects <= 0) ? 1 : max_reconnects;
- (void) strncpy (gfc->gfc_brickpath, brick_path, PATH_MAX);
+ RESTORE_THIS();
+ return 0;
- ret = gf_changelog_notification_init (this, gfc);
- if (ret) {
- errn = errno;
- goto cleanup;
- }
+cleanup_inited_bricks:
+ gf_cleanup_connections(this);
+error_return:
+ RESTORE_THIS();
+ return -1;
+}
- ret = gf_thread_create (&gfc->gfc_changelog_processor,
- NULL, gf_changelog_process, gfc);
- if (ret) {
- errn = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "error creating changelog processor thread"
- " new changes won't be recorded!!!");
- goto cleanup;
- }
+/**
+ * @API
+ * gf_changelog_register()
+ *
+ * This is _NOT_ a generic register API. It's a special API to handle
+ * updates at a journal granulality. This is used by consumers wanting
+ * to process persistent journal such as geo-replication via a set of
+ * APIs. All of this is required to maintain backward compatibility.
+ * Owner specific private data is stored in ->api (in gf_private_t),
+ * which is used by APIs to access it's private data. This limits
+ * the library access to a single brick, but that's how it used to
+ * be anyway. Furthermore, this API solely _owns_ "this", therefore
+ * callers already having a notion of "this" are expected to use the
+ * newer API.
+ *
+ * Newer applications wanting to use this library need not face this
+ * limitation and reply of the much more feature rich generic register
+ * API, which is purely callback based.
+ *
+ * NOTE: @max_reconnects is not used but required for backward compat.
+ *
+ * For generic API, refer gf_changelog_register_generic().
+ */
+int
+gf_changelog_register(char *brick_path, char *scratch_dir, char *log_file,
+ int log_level, int max_reconnects)
+{
+ struct gf_brick_spec brick = {
+ 0,
+ };
- for (i=0; i < 256; i++) {
- gfc->rfc3986[i] =
- (isalnum(i) || i == '~' ||
- i == '-' || i == '.' || i == '_') ? i : 0;
- }
+ if (master)
+ THIS = master;
+ else
+ return -1;
- ret = 0;
- this->private = gfc;
+ brick.brick_path = brick_path;
+ brick.filter = CHANGELOG_OP_TYPE_JOURNAL;
- goto out;
+ brick.init = gf_changelog_journal_init;
+ brick.fini = gf_changelog_journal_fini;
+ brick.callback = gf_changelog_handle_journal;
+ brick.connected = gf_changelog_journal_connect;
+ brick.disconnected = gf_changelog_journal_disconnect;
- cleanup:
- if (gfc->hist_gfc) {
- gf_changelog_cleanup (gfc->hist_gfc);
- GF_FREE (gfc->hist_gfc);
- }
- gf_changelog_cleanup (gfc);
- GF_FREE (gfc);
- this->private = NULL;
- errno = errn;
+ brick.ptr = scratch_dir;
- out:
- return ret;
+ return gf_changelog_register_generic(&brick, 1, 1, log_file, log_level,
+ NULL);
}
diff --git a/xlators/features/changelog/lib/src/gf-history-changelog.c b/xlators/features/changelog/lib/src/gf-history-changelog.c
index bfc4cd37dc3..a16219f3664 100644
--- a/xlators/features/changelog/lib/src/gf-history-changelog.c
+++ b/xlators/features/changelog/lib/src/gf-history-changelog.c
@@ -8,17 +8,21 @@
#endif
#include <string.h>
-#include "globals.h"
-#include "glusterfs.h"
-#include "logging.h"
+#include <glusterfs/globals.h>
+#include <glusterfs/glusterfs.h>
+#include <glusterfs/logging.h>
+#include <glusterfs/syscall.h>
#include "gf-changelog-helpers.h"
+#include "gf-changelog-journal.h"
/* from the changelog translator */
#include "changelog-misc.h"
+#include "changelog-lib-messages.h"
#include "changelog-mem-types.h"
-/*@API
+/**
+ * @API
* gf_history_changelog_done:
* Move processed history changelog file from .processing
* to .processed
@@ -32,64 +36,66 @@
* -1: On error.
*/
int
-gf_history_changelog_done (char *file)
+gf_history_changelog_done(char *file)
{
- int ret = -1;
- char *buffer = NULL;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
- char to_path[PATH_MAX] = {0,};
-
- errno = EINVAL;
-
- this = THIS;
- if (!this)
- goto out;
-
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
-
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc)
- goto out;
-
- if (!file || !strlen (file))
- goto out;
-
- /* make sure 'file' is inside ->gfc_working_dir */
- buffer = realpath (file, NULL);
- if (!buffer)
- goto out;
-
- if (strncmp (hist_gfc->gfc_working_dir,
- buffer, strlen (hist_gfc->gfc_working_dir)))
- goto out;
-
- (void) snprintf (to_path, PATH_MAX, "%s%s",
- hist_gfc->gfc_processed_dir, basename (buffer));
- gf_log (this->name, GF_LOG_DEBUG,
- "moving %s to processed directory", file);
- ret = rename (buffer, to_path);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot move %s to %s (reason: %s)",
- file, to_path, strerror (errno));
- goto out;
- }
-
- ret = 0;
-
- out:
- if (buffer)
- free (buffer); /* allocated by realpath() */
- return ret;
+ int ret = -1;
+ char *buffer = NULL;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
+ char to_path[PATH_MAX] = {
+ 0,
+ };
+
+ errno = EINVAL;
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this);
+ if (!jnl)
+ goto out;
+
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl)
+ goto out;
+
+ if (!file || !strlen(file))
+ goto out;
+
+ /* make sure 'file' is inside ->jnl_working_dir */
+ buffer = realpath(file, NULL);
+ if (!buffer)
+ goto out;
+
+ if (strncmp(hist_jnl->jnl_working_dir, buffer,
+ strlen(hist_jnl->jnl_working_dir)))
+ goto out;
+
+ (void)snprintf(to_path, PATH_MAX, "%s%s", hist_jnl->jnl_processed_dir,
+ basename(buffer));
+ gf_msg_debug(this->name, 0, "moving %s to processed directory", file);
+ ret = sys_rename(buffer, to_path);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, errno,
+ CHANGELOG_LIB_MSG_RENAME_FAILED, "from=%s", file, "to=%s",
+ to_path, NULL);
+ goto out;
+ }
+
+ ret = 0;
+
+out:
+ if (buffer)
+ free(buffer); /* allocated by realpath() */
+ return ret;
}
+
/**
* @API
* gf_history_changelog_start_fresh:
- * For a set of changelogs, start from the begining.
+ * For a set of changelogs, start from the beginning.
* It will truncates the history tracker fd.
*
* RETURN VALUES:
@@ -97,36 +103,36 @@ gf_history_changelog_done (char *file)
* -1: On error.
*/
int
-gf_history_changelog_start_fresh ()
+gf_history_changelog_start_fresh()
{
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
- this = THIS;
- if (!this)
- goto out;
+ this = THIS;
+ if (!this)
+ goto out;
- errno = EINVAL;
+ errno = EINVAL;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this);
+ if (!jnl)
+ goto out;
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc)
- goto out;
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl)
+ goto out;
- if (gf_ftruncate (hist_gfc->gfc_fd, 0))
- goto out;
+ if (gf_ftruncate(hist_jnl->jnl_fd, 0))
+ goto out;
- return 0;
+ return 0;
- out:
- return -1;
+out:
+ return -1;
}
-/*
+/**
* @API
* gf_history_changelog_next_change:
* Return the next history changelog file entry. Zero means all
@@ -142,133 +148,873 @@ gf_history_changelog_start_fresh ()
* -1 : On error.
*/
ssize_t
-gf_history_changelog_next_change (char *bufptr, size_t maxlen)
+gf_history_changelog_next_change(char *bufptr, size_t maxlen)
{
- ssize_t size = 0;
- int tracker_fd = 0;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
- char buffer[PATH_MAX] = {0,};
+ ssize_t size = -1;
+ int tracker_fd = 0;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
+ char buffer[PATH_MAX] = {
+ 0,
+ };
+
+ if (maxlen > PATH_MAX) {
+ errno = ENAMETOOLONG;
+ goto out;
+ }
+
+ errno = EINVAL;
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this);
+ if (!jnl)
+ goto out;
+
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl)
+ goto out;
+
+ tracker_fd = hist_jnl->jnl_fd;
+
+ size = gf_readline(tracker_fd, buffer, maxlen);
+ if (size < 0) {
+ size = -1;
+ goto out;
+ }
+
+ if (size == 0)
+ goto out;
+
+ memcpy(bufptr, buffer, size - 1);
+ bufptr[size - 1] = '\0';
+
+out:
+ return size;
+}
- errno = EINVAL;
+/**
+ * @API
+ * gf_history_changelog_scan:
+ * Scan and generate a list of change entries.
+ * Calling this api multiple times (without calling gf_changlog_done())
+ * would result new changelogs(s) being refreshed in the tracker file.
+ * This call also acts as a cancellation point for the consumer.
+ *
+ * RETURN VALUES:
+ * +ve integer : success and keep scanning.(count of changelogs)
+ * 0 : success and done scanning.
+ * -1 : error.
+ *
+ * NOTE: After first 0 return call_get_next change for once more time
+ * to empty the tracker
+ *
+ */
+ssize_t
+gf_history_changelog_scan()
+{
+ int tracker_fd = 0;
+ size_t off = 0;
+ xlator_t *this = NULL;
+ size_t nr_entries = 0;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
+ struct dirent *entry = NULL;
+ struct dirent scratch[2] = {
+ {
+ 0,
+ },
+ };
+ char buffer[PATH_MAX] = {
+ 0,
+ };
+ static int is_last_scan;
+
+ this = THIS;
+ if (!this)
+ goto out;
+
+ jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this);
+ if (!jnl)
+ goto out;
+ if (JNL_IS_API_DISCONNECTED(jnl)) {
+ errno = ENOTCONN;
+ goto out;
+ }
+
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl)
+ goto out;
+
+retry:
+ if (is_last_scan == 1)
+ return 0;
+ if (hist_jnl->hist_done == 0)
+ is_last_scan = 1;
- this = THIS;
- if (!this)
- goto out;
+ errno = EINVAL;
+ if (hist_jnl->hist_done == -1)
+ goto out;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ tracker_fd = hist_jnl->jnl_fd;
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc)
- goto out;
+ if (gf_ftruncate(tracker_fd, 0))
+ goto out;
- tracker_fd = hist_gfc->gfc_fd;
+ rewinddir(hist_jnl->jnl_dir);
- size = gf_readline (tracker_fd, buffer, maxlen);
- if (size < 0)
- goto out;
- if (size == 0)
- return 0;
+ for (;;) {
+ errno = 0;
+ entry = sys_readdir(hist_jnl->jnl_dir, scratch);
+ if (!entry || errno != 0)
+ break;
+
+ if (strcmp(basename(entry->d_name), ".") == 0 ||
+ strcmp(basename(entry->d_name), "..") == 0)
+ continue;
- memcpy (bufptr, buffer, size - 1);
- *(buffer + size) = '\0';
+ nr_entries++;
- return size;
+ GF_CHANGELOG_FILL_BUFFER(hist_jnl->jnl_processing_dir, buffer, off,
+ strlen(hist_jnl->jnl_processing_dir));
+ GF_CHANGELOG_FILL_BUFFER(entry->d_name, buffer, off,
+ strlen(entry->d_name));
+ GF_CHANGELOG_FILL_BUFFER("\n", buffer, off, 1);
+
+ if (gf_changelog_write(tracker_fd, buffer, off) != off) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_WRITE_FAILED,
+ "error writing changelog filename"
+ " to tracker file");
+ break;
+ }
+ off = 0;
+ }
+
+ gf_msg_debug(this->name, 0, "hist_done %d, is_last_scan: %d",
+ hist_jnl->hist_done, is_last_scan);
+
+ if (!entry) {
+ if (gf_lseek(tracker_fd, 0, SEEK_SET) != -1) {
+ if (nr_entries > 0)
+ return nr_entries;
+ else {
+ sleep(1);
+ goto retry;
+ }
+ }
+ }
+out:
+ return -1;
+}
- out:
+/*
+ * Gets timestamp value at the changelog path at index.
+ * Returns 0 on success(updates given time-stamp), -1 on failure.
+ */
+int
+gf_history_get_timestamp(int fd, int index, int len, unsigned long *ts)
+{
+ xlator_t *this = NULL;
+ int n_read = -1;
+ char path_buf[PATH_MAX] = {
+ 0,
+ };
+ char *iter = path_buf;
+ size_t offset = index * (len + 1);
+ unsigned long value = 0;
+ int ret = 0;
+
+ this = THIS;
+ if (!this) {
return -1;
+ }
+
+ n_read = sys_pread(fd, path_buf, len, offset);
+ if (n_read < 0) {
+ ret = -1;
+ gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_READ_ERROR,
+ "could not read from htime file");
+ goto out;
+ }
+ iter += len - TIMESTAMP_LENGTH;
+ sscanf(iter, "%lu", &value);
+out:
+ if (ret == 0)
+ *ts = value;
+ return ret;
}
/*
- * @API
- * gf_history_changelog_scan:
- * Scan and generate a list of change entries.
- * Calling this api multiple times (without calling gf_changlog_done())
- * would result new changelogs(s) being refreshed in the tracker file.
- * This call also acts as a cancellation point for the consumer.
+ * Function to ensure correctness of search
+ * Checks whether @value is there next to @target_index or not
+ */
+int
+gf_history_check(int fd, int target_index, unsigned long value, int len)
+{
+ int ret = 0;
+ unsigned long ts1 = 0;
+ unsigned long ts2 = 0;
+
+ if (target_index == 0) {
+ ret = gf_history_get_timestamp(fd, target_index, len, &ts1);
+ if (ret == -1)
+ goto out;
+ if (value <= ts1)
+ goto out;
+ else {
+ ret = -1;
+ goto out;
+ }
+ }
+
+ ret = gf_history_get_timestamp(fd, target_index, len, &ts1);
+ if (ret == -1)
+ goto out;
+ ret = gf_history_get_timestamp(fd, target_index - 1, len, &ts2);
+ if (ret == -1)
+ goto out;
+
+ if ((value <= ts1) && (value > ts2)) {
+ goto out;
+ } else
+ ret = -1;
+out:
+ return ret;
+}
+
+/*
+ * This is a "binary search" based search function which checks neighbours
+ * for in-range availability of the value to be searched and provides the
+ * index at which the changelog file nearest to the requested timestamp(value)
+ * can be read from.
*
- * RETURN VALUES:
- * nr_entries: On success.
- * -1 : On error.
+ * Actual offset can be calculated as (index* (len+1) ).
+ * "1" is because the changelog paths are null terminated.
+ *
+ * @path : Htime file to search in
+ * @value : time stamp to search
+ * @from : start index to search
+ * @to : end index to search
+ * @len : length of fixes length strings separated by null
*/
-ssize_t
-gf_history_changelog_scan ()
+
+int
+gf_history_b_search(int fd, unsigned long value, unsigned long from,
+ unsigned long to, int len)
{
- int ret = 0;
- int tracker_fd = 0;
- size_t len = 0;
- size_t off = 0;
- xlator_t *this = NULL;
- size_t nr_entries = 0;
- gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
- struct dirent *entryp = NULL;
- struct dirent *result = NULL;
- char buffer[PATH_MAX] = {0,};
-
- this = THIS;
- if (!this)
+ int m_index = -1;
+ unsigned long cur_value = 0;
+ unsigned long ts1 = 0;
+ int ret = 0;
+
+ m_index = (from + to) / 2;
+
+ if ((to - from) <= 1) {
+ /* either one or 2 changelogs left */
+ if (to != from) {
+ /* check if value is less or greater than to
+ * return accordingly
+ */
+ ret = gf_history_get_timestamp(fd, from, len, &ts1);
+ if (ret == -1)
goto out;
-
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
+ if (ts1 >= value) {
+ /* actually compatision should be
+ * exactly == but considering
+ *
+ * case of only 2 changelogs in htime file
+ */
+ return from;
+ } else
+ return to;
+ } else
+ return to;
+ }
+
+ ret = gf_history_get_timestamp(fd, m_index, len, &cur_value);
+ if (ret == -1)
+ goto out;
+ if (cur_value == value) {
+ return m_index;
+ } else if (value > cur_value) {
+ ret = gf_history_get_timestamp(fd, m_index + 1, len, &cur_value);
+ if (ret == -1)
+ goto out;
+ if (value < cur_value)
+ return m_index + 1;
+ else
+ return gf_history_b_search(fd, value, m_index + 1, to, len);
+ } else {
+ if (m_index == 0) {
+ /* we are sure that values exists
+ * in this htime file
+ */
+ return 0;
+ } else {
+ ret = gf_history_get_timestamp(fd, m_index - 1, len, &cur_value);
+ if (ret == -1)
goto out;
+ if (value > cur_value) {
+ return m_index;
+ } else
+ return gf_history_b_search(fd, value, from, m_index - 1, len);
+ }
+ }
+out:
+ return -1;
+}
- hist_gfc = gfc->hist_gfc;
- if (!hist_gfc)
- goto out;
+/*
+ * Description: Checks if the changelog path is usable or not,
+ * which is differentiated by checking for "changelog"
+ * in the path and not "CHANGELOG".
+ *
+ * Returns:
+ * 1 : Yes, usable ( contains "CHANGELOG" )
+ * 0 : No, Not usable ( contains, "changelog")
+ */
+int
+gf_is_changelog_usable(char *cl_path)
+{
+ int ret = -1;
+ const char low_c[] = "changelog";
+ char *str_ret = NULL;
+ char *bname = NULL;
- errno = EINVAL;
+ bname = basename(cl_path);
- tracker_fd = hist_gfc->gfc_fd;
+ str_ret = strstr(bname, low_c);
- if (gf_ftruncate (tracker_fd, 0))
- goto out;
+ if (str_ret != NULL)
+ ret = 0;
+ else
+ ret = 1;
- len = offsetof(struct dirent, d_name)
- + pathconf(hist_gfc->gfc_processing_dir, _PC_NAME_MAX) + 1;
- entryp = GF_CALLOC (1, len,
- gf_changelog_mt_libgfchangelog_dirent_t);
- if (!entryp)
- goto out;
+ return ret;
+}
- rewinddir (hist_gfc->gfc_dir);
- while (1) {
- ret = readdir_r (hist_gfc->gfc_dir, entryp, &result);
- if (ret || !result)
- break;
-
- if ( !strcmp (basename (entryp->d_name), ".")
- || !strcmp (basename (entryp->d_name), "..") )
- continue;
-
- nr_entries++;
-
- GF_CHANGELOG_FILL_BUFFER (hist_gfc->gfc_processing_dir,
- buffer, off,
- strlen (hist_gfc->gfc_processing_dir));
- GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer,
- off, strlen (entryp->d_name));
- GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1);
-
- if (gf_changelog_write (tracker_fd, buffer, off) != off) {
- gf_log (this->name, GF_LOG_ERROR,
- "error writing changelog filename"
- " to tracker file");
- break;
- }
- off = 0;
+void *
+gf_changelog_consume_wrap(void *data)
+{
+ int ret = -1;
+ ssize_t nread = 0;
+ xlator_t *this = NULL;
+ gf_changelog_consume_data_t *ccd = NULL;
+
+ ccd = (gf_changelog_consume_data_t *)data;
+ this = ccd->this;
+
+ ccd->retval = -1;
+
+ nread = sys_pread(ccd->fd, ccd->changelog, PATH_MAX - 1, ccd->offset);
+ if (nread < 0) {
+ gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_READ_ERROR,
+ "cannot read from history metadata file");
+ goto out;
+ }
+
+ /* TODO: handle short reads and EOF. */
+ if (gf_is_changelog_usable(ccd->changelog) == 1) {
+ ret = gf_changelog_consume(ccd->this, ccd->jnl, ccd->changelog,
+ _gf_true);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_PARSE_ERROR,
+ "name=%s", ccd->changelog, NULL);
+ goto out;
}
+ }
+ ccd->retval = 0;
- GF_FREE (entryp);
+out:
+ return NULL;
+}
+
+/**
+ * "gf_history_consume" is a worker function for history.
+ * parses and moves changelogs files from index "from"
+ * to index "to" in open htime file whose fd is "fd".
+ */
+
+#define MAX_PARALLELS 10
+
+void *
+gf_history_consume(void *data)
+{
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
+ int ret = 0;
+ int iter = 0;
+ int fd = -1;
+ int from = -1;
+ int to = -1;
+ int len = -1;
+ int n_parallel = 0;
+ int n_envoked = 0;
+ gf_boolean_t publish = _gf_true;
+ pthread_t th_id[MAX_PARALLELS] = {
+ 0,
+ };
+ gf_changelog_history_data_t *hist_data = NULL;
+ gf_changelog_consume_data_t ccd[MAX_PARALLELS] = {
+ {0},
+ };
+ gf_changelog_consume_data_t *curr = NULL;
+
+ hist_data = (gf_changelog_history_data_t *)data;
+ if (hist_data == NULL) {
+ ret = -1;
+ goto out;
+ }
+
+ fd = hist_data->htime_fd;
+ from = hist_data->from;
+ to = hist_data->to;
+ len = hist_data->len;
+ n_parallel = hist_data->n_parallel;
+
+ THIS = hist_data->this;
+ this = hist_data->this;
+ if (!this) {
+ ret = -1;
+ goto out;
+ }
+
+ jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this);
+ if (!jnl) {
+ ret = -1;
+ goto out;
+ }
+
+ hist_jnl = jnl->hist_jnl;
+ if (!hist_jnl) {
+ ret = -1;
+ goto out;
+ }
+
+ while (from <= to) {
+ n_envoked = 0;
+
+ for (iter = 0; (iter < n_parallel) && (from <= to); iter++) {
+ curr = &ccd[iter];
+
+ curr->this = this;
+ curr->jnl = hist_jnl;
+ curr->fd = fd;
+ curr->offset = from * (len + 1);
+
+ curr->retval = 0;
+ memset(curr->changelog, '\0', PATH_MAX);
+
+ ret = gf_thread_create(&th_id[iter], NULL,
+ gf_changelog_consume_wrap, curr,
+ "clogc%03hx", (iter + 1) & 0x3ff);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, ret,
+ CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED,
+ "could not create consume-thread");
+ goto sync;
+ } else
+ n_envoked++;
+
+ from++;
+ }
- if (!result) {
- if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1)
- return nr_entries;
+ sync:
+ for (iter = 0; iter < n_envoked; iter++) {
+ ret = pthread_join(th_id[iter], NULL);
+ if (ret) {
+ publish = _gf_false;
+ gf_msg(this->name, GF_LOG_ERROR, ret,
+ CHANGELOG_LIB_MSG_PTHREAD_JOIN_FAILED,
+ "pthread_join() error");
+ /* try to join the rest */
+ continue;
+ }
+
+ if (publish == _gf_false)
+ continue;
+
+ curr = &ccd[iter];
+ if (ccd->retval) {
+ publish = _gf_false;
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ CHANGELOG_LIB_MSG_PARSE_ERROR_CEASED, NULL);
+ continue;
+ }
+
+ ret = gf_changelog_publish(curr->this, curr->jnl, curr->changelog);
+ if (ret) {
+ publish = _gf_false;
+ gf_msg(this->name, GF_LOG_ERROR, 0,
+ CHANGELOG_LIB_MSG_PUBLISH_ERROR,
+ "publish error, ceased publishing...");
+ }
}
- out:
+ }
+
+ /* informing "parsing done". */
+ hist_jnl->hist_done = (publish == _gf_true) ? 0 : -1;
+
+out:
+ if (fd != -1)
+ (void)sys_close(fd);
+ GF_FREE(hist_data);
+ return NULL;
+}
+
+/**
+ * @API
+ * gf_history_changelog() : Get/parse historical changelogs and get them ready
+ * for consumption.
+ *
+ * Arguments:
+ * @changelog_dir : Directory location from where history changelogs are
+ * supposed to be consumed.
+ * @start: Unix timestamp FROM where changelogs should be consumed.
+ * @end: Unix timestamp TO where changelogsshould be consumed.
+ * @n_parallel : degree of parallelism while changelog parsing.
+ * @actual_end : the end time till where changelogs are available.
+ *
+ * Return:
+ * Returns <timestamp> on success, the last time till where changelogs are
+ * available.
+ * Returns -1 on failure(error).
+ */
+
+/**
+ * Extract timestamp range from a historical metadata file
+ * Returns:
+ * 0 : Success ({min,max}_ts with the appropriate values)
+ * -1 : Failure
+ * -2 : Ignore this metadata file and process next
+ */
+int
+gf_changelog_extract_min_max(const char *dname, const char *htime_dir, int *fd,
+ unsigned long *total, unsigned long *min_ts,
+ unsigned long *max_ts)
+{
+ int ret = -1;
+ xlator_t *this = NULL;
+ char htime_file[PATH_MAX] = {
+ 0,
+ };
+ struct stat stbuf = {
+ 0,
+ };
+ char *iter = NULL;
+ char x_value[30] = {
+ 0,
+ };
+
+ this = THIS;
+
+ snprintf(htime_file, PATH_MAX, "%s/%s", htime_dir, dname);
+
+ iter = (htime_file + strlen(htime_file) - TIMESTAMP_LENGTH);
+ sscanf(iter, "%lu", min_ts);
+
+ ret = sys_stat(htime_file, &stbuf);
+ if (ret) {
+ ret = -1;
+ gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_HTIME_ERROR,
+ "op=stat", "path=%s", htime_file, NULL);
+ goto out;
+ }
+
+ /* ignore everything except regular files */
+ if (!S_ISREG(stbuf.st_mode)) {
+ ret = -2;
+ goto out;
+ }
+
+ *fd = open(htime_file, O_RDONLY);
+ if (*fd < 0) {
+ ret = -1;
+ gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_HTIME_ERROR,
+ "op=open", "path=%s", htime_file, NULL);
+ goto out;
+ }
+
+ /* Looks good, extract max timestamp */
+ ret = sys_fgetxattr(*fd, HTIME_KEY, x_value, sizeof(x_value));
+ if (ret < 0) {
+ ret = -1;
+ gf_smsg(this->name, GF_LOG_ERROR, errno,
+ CHANGELOG_LIB_MSG_GET_XATTR_FAILED, "path=%s", htime_file,
+ NULL);
+ goto out;
+ }
+
+ sscanf(x_value, "%lu:%lu", max_ts, total);
+ gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_LIB_MSG_MIN_MAX_INFO,
+ "min=%lu", *min_ts, "max=%lu", *max_ts, "total_changelogs=%lu",
+ *total, NULL);
+
+ ret = 0;
+
+out:
+ return ret;
+}
+
+/* gf_history_changelog returns actual_end and spawns threads to
+ * parse historical changelogs. The return values are as follows.
+ * 0 : On success
+ * 1 : Successful, but partial historical changelogs available,
+ * end time falls into different htime file or future time
+ * -2 : Error, requested historical changelog not available, not
+ * even partial
+ * -1 : On any error
+ */
+int
+gf_history_changelog(char *changelog_dir, unsigned long start,
+ unsigned long end, int n_parallel,
+ unsigned long *actual_end)
+{
+ int ret = 0;
+ int len = -1;
+ int fd = -1;
+ int n_read = -1;
+ unsigned long min_ts = 0;
+ unsigned long max_ts = 0;
+ unsigned long end2 = 0;
+ unsigned long ts1 = 0;
+ unsigned long ts2 = 0;
+ unsigned long to = 0;
+ unsigned long from = 0;
+ unsigned long total_changelog = 0;
+ xlator_t *this = NULL;
+ gf_changelog_journal_t *jnl = NULL;
+ gf_changelog_journal_t *hist_jnl = NULL;
+ gf_changelog_history_data_t *hist_data = NULL;
+ DIR *dirp = NULL;
+ struct dirent *entry = NULL;
+ struct dirent scratch[2] = {
+ {
+ 0,
+ },
+ };
+ pthread_t consume_th = 0;
+ char htime_dir[PATH_MAX] = {
+ 0,
+ };
+ char buffer[PATH_MAX] = {
+ 0,
+ };
+ gf_boolean_t partial_history = _gf_false;
+
+ pthread_attr_t attr;
+
+ this = THIS;
+ if (!this) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = pthread_attr_init(&attr);
+ if (ret != 0) {
+ gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_PTHREAD_ERROR,
+ "Pthread init failed");
return -1;
+ }
+
+ jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this);
+ if (!jnl) {
+ ret = -1;
+ goto out;
+ }
+
+ hist_jnl = (gf_changelog_journal_t *)jnl->hist_jnl;
+ if (!hist_jnl) {
+ ret = -1;
+ goto out;
+ }
+
+ gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_LIB_MSG_REQUESTING_INFO,
+ "start=%lu", start, "end=%lu", end, NULL);
+
+ /* basic sanity check */
+ if (start > end || n_parallel <= 0) {
+ gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_HIST_FAILED,
+ "start=%lu", start, "end=%lu", end, "thread_count=%d",
+ n_parallel, NULL);
+ ret = -1;
+ goto out;
+ }
+
+ /* cap parallelism count */
+ if (n_parallel > MAX_PARALLELS)
+ n_parallel = MAX_PARALLELS;
+
+ CHANGELOG_FILL_HTIME_DIR(changelog_dir, htime_dir);
+
+ dirp = sys_opendir(htime_dir);
+ if (dirp == NULL) {
+ gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_HTIME_ERROR,
+ "op=opendir", "path=%s", htime_dir, NULL);
+ ret = -1;
+ goto out;
+ }
+
+ for (;;) {
+ errno = 0;
+
+ entry = sys_readdir(dirp, scratch);
+
+ if (!entry || errno != 0) {
+ gf_smsg(this->name, GF_LOG_ERROR, errno,
+ CHANGELOG_LIB_MSG_HIST_FAILED, "start=%lu", start,
+ "end=%lu", end, NULL);
+ ret = -2;
+ break;
+ }
+
+ ret = gf_changelog_extract_min_max(entry->d_name, htime_dir, &fd,
+ &total_changelog, &min_ts, &max_ts);
+ if (ret) {
+ if (-2 == ret)
+ continue;
+ goto out;
+ }
+
+ if (start >= min_ts && start < max_ts) {
+ /**
+ * TODO: handle short reads later...
+ */
+ n_read = sys_read(fd, buffer, PATH_MAX);
+ if (n_read < 0) {
+ ret = -1;
+ gf_msg(this->name, GF_LOG_ERROR, errno,
+ CHANGELOG_LIB_MSG_READ_ERROR,
+ "unable to read htime file");
+ goto out;
+ }
+
+ len = strlen(buffer);
+
+ /**
+ * search @start in the htime file returning it's index
+ * (@from)
+ */
+ from = gf_history_b_search(fd, start, 0, total_changelog - 1, len);
+
+ /* ensuring correctness of gf_b_search */
+ if (gf_history_check(fd, from, start, len) != 0) {
+ ret = -1;
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ CHANGELOG_LIB_MSG_GET_TIME_ERROR, "for=start",
+ "start=%lu", start, "idx=%lu", from, NULL);
+ goto out;
+ }
+
+ end2 = (end <= max_ts) ? end : max_ts;
+
+ /* Check if end falls out of same HTIME file. The end
+ * falling to a different htime file or changelog
+ * disable-enable is detected only after 20 seconds.
+ * This is required because, applications generally
+ * asks historical changelogs till current time and
+ * it is possible changelog is not rolled over yet.
+ * So, buffer time of default rollover time plus 5
+ * seconds is subtracted. If the application requests
+ * the end time with in half a minute of changelog
+ * disable, it's not detected as changelog disable and
+ * it's application's responsibility to retry after
+ * 20 seconds before confirming it as partial history.
+ */
+ if ((end - 20) > max_ts) {
+ partial_history = _gf_true;
+ }
+
+ /**
+ * search @end2 in htime file returning it's index (@to)
+ */
+ to = gf_history_b_search(fd, end2, 0, total_changelog - 1, len);
+
+ if (gf_history_check(fd, to, end2, len) != 0) {
+ ret = -1;
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ CHANGELOG_LIB_MSG_GET_TIME_ERROR, "for=end",
+ "start=%lu", end2, "idx=%lu", to, NULL);
+ goto out;
+ }
+
+ ret = gf_history_get_timestamp(fd, from, len, &ts1);
+ if (ret == -1)
+ goto out;
+
+ ret = gf_history_get_timestamp(fd, to, len, &ts2);
+ if (ret == -1)
+ goto out;
+
+ gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_LIB_MSG_FINAL_INFO,
+ "from=%lu", ts1, "to=%lu", ts2, "changes=%lu",
+ (to - from + 1), NULL);
+
+ hist_data = GF_CALLOC(1, sizeof(gf_changelog_history_data_t),
+ gf_changelog_mt_history_data_t);
+
+ hist_data->htime_fd = fd;
+ hist_data->from = from;
+ hist_data->to = to;
+ hist_data->len = len;
+ hist_data->n_parallel = n_parallel;
+ hist_data->this = this;
+
+ ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ if (ret != 0) {
+ gf_msg(this->name, GF_LOG_ERROR, ret,
+ CHANGELOG_LIB_MSG_PTHREAD_ERROR,
+ "unable to sets the detach"
+ " state attribute");
+ ret = -1;
+ goto out;
+ }
+
+ /* spawn a thread for background parsing & publishing */
+ ret = gf_thread_create(&consume_th, &attr, gf_history_consume,
+ hist_data, "cloghcon");
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, ret,
+ CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED,
+ "creation of consume parent-thread"
+ " failed.");
+ ret = -1;
+ goto out;
+ }
+
+ goto out;
+
+ } else { /* end of range check */
+ gf_smsg(this->name, GF_LOG_ERROR, errno,
+ CHANGELOG_LIB_MSG_HIST_FAILED, "start=%lu", start,
+ "end=%lu", end, "chlog_min=%lu", min_ts, "chlog_max=%lu",
+ max_ts, NULL);
+ }
+ } /* end of readdir() */
+
+out:
+ if (dirp != NULL)
+ (void)sys_closedir(dirp);
+
+ if (ret < 0) {
+ if (fd != -1)
+ (void)sys_close(fd);
+ GF_FREE(hist_data);
+ (void)pthread_attr_destroy(&attr);
+
+ return ret;
+ }
+
+ hist_jnl->hist_done = 1;
+ *actual_end = ts2;
+
+ if (partial_history) {
+ ret = 1;
+ }
+
+ return ret;
}