summaryrefslogtreecommitdiffstats
path: root/xlators/features
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features')
-rw-r--r--xlators/features/changelog/lib/examples/c/get-history.c109
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-helpers.h45
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-process.c57
-rw-r--r--xlators/features/changelog/lib/src/gf-history-changelog.c676
-rw-r--r--xlators/features/changelog/src/changelog-helpers.c103
-rw-r--r--xlators/features/changelog/src/changelog-helpers.h17
-rw-r--r--xlators/features/changelog/src/changelog-mem-types.h1
-rw-r--r--xlators/features/changelog/src/changelog-misc.h7
-rw-r--r--xlators/features/changelog/src/changelog.c37
9 files changed, 1032 insertions, 20 deletions
diff --git a/xlators/features/changelog/lib/examples/c/get-history.c b/xlators/features/changelog/lib/examples/c/get-history.c
new file mode 100644
index 00000000000..33eb8c32d4d
--- /dev/null
+++ b/xlators/features/changelog/lib/examples/c/get-history.c
@@ -0,0 +1,109 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+/**
+ * get set of new changes every 10 seconds (just print the file names)
+ *
+ * Compile it using:
+ * gcc -o gethistory `pkg-config --cflags libgfchangelog` get-history.c \
+ * `pkg-config --libs libgfchangelog`
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/un.h>
+#include <limits.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <errno.h>
+
+#include "changelog.h"
+
+#define handle_error(fn) \
+ printf ("%s (reason: %s)\n", fn, strerror (errno))
+
+int
+main (int argc, char ** argv)
+{
+ int i = 0;
+ int ret = 0;
+ ssize_t nr_changes = 0;
+ ssize_t changes = 0;
+ char fbuf[PATH_MAX] = {0,};
+ unsigned long end_ts = 0;
+
+ ret = gf_changelog_register ("/export1/v1/b1",
+ "/tmp/scratch_v1", "/tmp/scratch_v1/changes.log",
+ 9, 5);
+ if (ret) {
+ handle_error ("register failed");
+ goto out;
+ }
+
+ int a, b;
+ printf ("give the two numbers start and end\t");
+ scanf ("%d%d", &a, &b);
+ ret = gf_history_changelog ("/export1/v1/b1/.glusterfs/changelogs",a, b, 3, &end_ts);
+ if (ret == -1) {
+ printf ("history failed");
+ goto out;
+ }
+
+ printf ("end time till when changelog available : %d , ret(%d) \t", end_ts, ret);
+ fflush(stdout);
+
+ while (1) {
+ nr_changes = gf_history_changelog_scan ();
+ printf ("scanned, nr_changes : %d\n",nr_changes);
+ if (nr_changes < 0) {
+ handle_error ("scan(): ");
+ break;
+ }
+
+ if (nr_changes == 0) {
+ printf ("done scanning \n");
+ goto out;
+ }
+
+ printf ("Got %ld changelog files\n", nr_changes);
+
+ while ( (changes =
+ gf_history_changelog_next_change (fbuf, PATH_MAX)) > 0) {
+ printf ("changelog file [%d]: %s\n", ++i, fbuf);
+
+ /* process changelog */
+ /* ... */
+ /* ... */
+ /* ... */
+ /* done processing */
+
+ ret = gf_history_changelog_done (fbuf);
+ if (ret)
+ handle_error ("gf_changelog_done");
+ }
+ /*
+ if (changes == -1)
+ handle_error ("gf_changelog_next_change");
+ if (nr_changes ==1){
+ printf("continue scanning\n");
+ }
+
+ if(nr_changes == 0){
+ printf("done scanning \n");
+ goto out;
+ }
+ */
+ }
+
+
+out:
+ return ret;
+}
diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
index 2d545da9e82..218896b86e7 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h
+++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
@@ -24,6 +24,7 @@
#define GF_CHANGELOG_PROCESSED_DIR ".processed"
#define GF_CHANGELOG_PROCESSING_DIR ".processing"
#define GF_CHANGELOG_HISTORY_DIR ".history"
+#define TIMESTAMP_LENGTH 10
#ifndef MAXLINE
#define MAXLINE 4096
@@ -72,8 +73,45 @@ typedef struct gf_changelog {
/* Holds gfc for History API */
struct gf_changelog *hist_gfc;
+
+ /* holds 0 done scanning, 1 keep scanning and -1 error */
+ int hist_done;
} gf_changelog_t;
+typedef struct gf_changelog_history_data {
+ int len;
+
+ int htime_fd;
+
+ /* parallelism count */
+ int n_parallel;
+
+ /* history from, to indexes */
+ unsigned long from;
+ unsigned long to;
+} gf_changelog_history_data_t;
+
+typedef struct gf_changelog_consume_data {
+ /** set of inputs */
+
+ /* fd to read from */
+ int fd;
+
+ /* from @offset */
+ off_t offset;
+
+ xlator_t *this;
+ gf_changelog_t *gfc;
+
+ /** set of outputs */
+
+ /* return value */
+ int retval;
+
+ /* journal processed */
+ char changelog[PATH_MAX];
+} gf_changelog_consume_data_t;
+
int
gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc);
@@ -98,4 +136,11 @@ gf_ftruncate (int fd, off_t length);
off_t
gf_lseek (int fd, off_t offset, int whence);
+int
+gf_changelog_consume (xlator_t *this,
+ gf_changelog_t *gfc,
+ char *from_path, gf_boolean_t no_publish);
+int
+gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path);
+
#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog-process.c b/xlators/features/changelog/lib/src/gf-changelog-process.c
index 3ea2700c62b..3b8d2683672 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-process.c
+++ b/xlators/features/changelog/lib/src/gf-changelog-process.c
@@ -459,8 +459,49 @@ gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd,
return ret;
}
-static int
-gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path)
+int
+gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path)
+{
+ int ret = 0;
+ char dest[PATH_MAX] = {0,};
+ char to_path[PATH_MAX] = {0,};
+ struct stat stbuf = {0,};
+
+ (void) snprintf (to_path, PATH_MAX, "%s%s",
+ gfc->gfc_current_dir, basename (from_path));
+
+ /* handle zerob file that wont exist in current */
+ ret = stat (from_path, &stbuf);
+ if (ret)
+ goto out;
+
+ if (stbuf.st_size == 0) {
+ ret = unlink (from_path);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not unlink %s (reason %s)",
+ from_path, strerror (errno));
+ goto out;
+ }
+
+ (void) snprintf (dest, PATH_MAX, "%s%s",
+ gfc->gfc_processing_dir, basename (from_path));
+
+ ret = rename (to_path, dest);
+ if (ret){
+ gf_log (this->name, GF_LOG_ERROR,
+ "error moving %s to processing dir"
+ " (reason: %s)", to_path, strerror (errno));
+ }
+
+out:
+ return ret;
+}
+
+int
+gf_changelog_consume (xlator_t *this,
+ gf_changelog_t *gfc,
+ char *from_path, gf_boolean_t no_publish)
{
int ret = -1;
int fd1 = 0;
@@ -472,6 +513,7 @@ gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path)
ret = stat (from_path, &stbuf);
if (ret || !S_ISREG(stbuf.st_mode)) {
+ ret = -1;
gf_log (this->name, GF_LOG_ERROR,
"stat failed on changelog file: %s", from_path);
goto out;
@@ -506,6 +548,8 @@ gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path)
if (!ret) {
/* move it to processing on a successfull
decode */
+ if (no_publish == _gf_true)
+ goto close_fd;
ret = rename (to_path, dest);
if (ret)
gf_log (this->name, GF_LOG_ERROR,
@@ -516,10 +560,15 @@ gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path)
/* remove it from .current if it's an empty file */
if (zerob) {
+ if (no_publish == _gf_true) {
+ ret = 0;
+ goto close_fd;
+ }
+
ret = unlink (to_path);
if (ret)
gf_log (this->name, GF_LOG_ERROR,
- "could not unlink %s (reason: %s",
+ "could not unlink %s (reason: %s)",
to_path, strerror (errno));
}
}
@@ -546,7 +595,7 @@ gf_changelog_ext_change (xlator_t *this,
alo = 1;
gf_log (this->name, GF_LOG_DEBUG,
"processing changelog: %s", path);
- ret = gf_changelog_consume (this, gfc, path);
+ ret = gf_changelog_consume (this, gfc, path, _gf_false);
}
if (ret)
diff --git a/xlators/features/changelog/lib/src/gf-history-changelog.c b/xlators/features/changelog/lib/src/gf-history-changelog.c
index bfc4cd37dc3..a895037eeca 100644
--- a/xlators/features/changelog/lib/src/gf-history-changelog.c
+++ b/xlators/features/changelog/lib/src/gf-history-changelog.c
@@ -11,6 +11,7 @@
#include "globals.h"
#include "glusterfs.h"
#include "logging.h"
+#include "syscall.h"
#include "gf-changelog-helpers.h"
@@ -18,7 +19,8 @@
#include "changelog-misc.h"
#include "changelog-mem-types.h"
-/*@API
+/**
+ * @API
* gf_history_changelog_done:
* Move processed history changelog file from .processing
* to .processed
@@ -86,6 +88,7 @@ gf_history_changelog_done (char *file)
free (buffer); /* allocated by realpath() */
return ret;
}
+
/**
* @API
* gf_history_changelog_start_fresh:
@@ -126,7 +129,7 @@ gf_history_changelog_start_fresh ()
return -1;
}
-/*
+/**
* @API
* gf_history_changelog_next_change:
* Return the next history changelog file entry. Zero means all
@@ -182,7 +185,7 @@ gf_history_changelog_next_change (char *bufptr, size_t maxlen)
return -1;
}
-/*
+/**
* @API
* gf_history_changelog_scan:
* Scan and generate a list of change entries.
@@ -191,8 +194,13 @@ gf_history_changelog_next_change (char *bufptr, size_t maxlen)
* This call also acts as a cancellation point for the consumer.
*
* RETURN VALUES:
- * nr_entries: On success.
- * -1 : On error.
+ * +ve integer : success and keep scanning.(count of changelogs)
+ * 0 : success and done scanning.
+ * -1 : error.
+ *
+ * NOTE: After first 0 return call_get_next change for once more time
+ * to empty the tracker
+ *
*/
ssize_t
gf_history_changelog_scan ()
@@ -204,10 +212,11 @@ gf_history_changelog_scan ()
xlator_t *this = NULL;
size_t nr_entries = 0;
gf_changelog_t *gfc = NULL;
- gf_changelog_t *hist_gfc = NULL;
+ gf_changelog_t *hist_gfc = NULL;
struct dirent *entryp = NULL;
struct dirent *result = NULL;
char buffer[PATH_MAX] = {0,};
+ static int is_last_scan = 0;
this = THIS;
if (!this)
@@ -221,15 +230,23 @@ gf_history_changelog_scan ()
if (!hist_gfc)
goto out;
+ retry:
+ if (is_last_scan == 1)
+ return 0;
+ if (hist_gfc->hist_done == 0)
+ is_last_scan = 1;
+
errno = EINVAL;
+ if (hist_gfc->hist_done == -1)
+ goto out;
tracker_fd = hist_gfc->gfc_fd;
if (gf_ftruncate (tracker_fd, 0))
goto out;
- len = offsetof(struct dirent, d_name)
- + pathconf(hist_gfc->gfc_processing_dir, _PC_NAME_MAX) + 1;
+ len = offsetof (struct dirent, d_name)
+ + pathconf (hist_gfc->gfc_processing_dir, _PC_NAME_MAX) + 1;
entryp = GF_CALLOC (1, len,
gf_changelog_mt_libgfchangelog_dirent_t);
if (!entryp)
@@ -265,10 +282,649 @@ gf_history_changelog_scan ()
GF_FREE (entryp);
+ gf_log (this->name, GF_LOG_DEBUG,
+ "hist_done %d, is_last_scan: %d", hist_gfc->hist_done, is_last_scan);
+
if (!result) {
- if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1)
- return nr_entries;
+ if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) {
+ if (nr_entries > 0)
+ return nr_entries;
+ else {
+ sleep(1);
+ goto retry;
+ }
+ }
}
out:
return -1;
}
+
+/*
+ * Gets timestamp value at the changelog path at index.
+ * Returns 0 on success(updates given time-stamp), -1 on failure.
+ */
+int
+gf_history_get_timestamp (int fd, int index, int len,
+ unsigned long *ts)
+{
+ xlator_t *this = NULL;
+ int n_read = -1;
+ char path_buf[PATH_MAX]= {0,};
+ char *iter = path_buf;
+ size_t offset = index * (len+1);
+ unsigned long value = 0;
+ int ret = 0;
+
+ this = THIS;
+ if (!this) {
+ return -1;
+ }
+
+ n_read = pread (fd, path_buf, len, offset);
+ if (n_read < 0 ) {
+ ret = -1;
+ gf_log ( this->name, GF_LOG_ERROR,
+ "could not read from htime file");
+ goto out;
+ }
+ iter+= len - TIMESTAMP_LENGTH;
+ sscanf (iter, "%lu",&value);
+out:
+ if(ret == 0)
+ *ts = value;
+ return ret;
+}
+
+/*
+ * Function to ensure correctness of search
+ * Checks whether @value is there next to @target_index or not
+ */
+int
+gf_history_check ( int fd, int target_index, unsigned long value, int len)
+{
+ int ret = 0;
+ unsigned long ts1 = 0;
+ unsigned long ts2 = 0;
+
+ if (target_index == 0) {
+ ret = gf_history_get_timestamp (fd, target_index, len, &ts1);
+ if (ret == -1)
+ goto out;
+ if (value <= ts1)
+ goto out;
+ else {
+ ret = -1;
+ goto out;
+ }
+ }
+
+ ret = gf_history_get_timestamp (fd, target_index, len, &ts1);
+ if (ret ==-1)
+ goto out;
+ ret = gf_history_get_timestamp (fd, target_index -1, len, &ts2);
+ if (ret ==-1)
+ goto out;
+
+ if ( (value <= ts1) && (value > ts2) ) {
+ goto out;
+ }
+ else
+ ret = -1;
+out:
+ return ret;
+}
+
+/*
+ * This is a "binary search" based search function which checks neighbours
+ * for in-range availability of the value to be searched and provides the
+ * index at which the changelog file nearest to the requested timestamp(value)
+ * can be read from.
+ *
+ * Actual offset can be calculated as (index* (len+1) ).
+ * "1" is because the changelog paths are null terminated.
+ *
+ * @path : Htime file to search in
+ * @value : time stamp to search
+ * @from : start index to search
+ * @to : end index to search
+ * @len : length of fixes length strings seperated by null
+ */
+
+int
+gf_history_b_search (int fd, unsigned long value,
+ unsigned long from, unsigned long to, int len)
+{
+ int m_index = -1;
+ unsigned long cur_value = 0;
+ unsigned long ts1 = 0;
+ int ret = 0;
+
+ m_index = (from + to)/2;
+
+ if ( (to - from) <=1 ) {
+ /* either one or 2 changelogs left */
+ if ( to != from ) {
+ /* check if value is less or greater than to
+ * return accordingly
+ */
+ ret = gf_history_get_timestamp (fd, from, len, &ts1);
+ if (ret ==-1)
+ goto out;
+ if ( ts1 >= value) {
+ /* actually compatision should be
+ * exactly == but considering
+ *
+ * case of only 2 changelogs in htime file
+ */
+ return from;
+ }
+ else
+ return to;
+ }
+ else
+ return to;
+ }
+
+ ret = gf_history_get_timestamp (fd, m_index, len, &cur_value);
+ if (ret == -1)
+ goto out;
+ if (cur_value == value) {
+ return m_index;
+ }
+ else if (value > cur_value) {
+ ret = gf_history_get_timestamp (fd, m_index+1, len, &cur_value);
+ if (ret == -1)
+ goto out;
+ if (value < cur_value)
+ return m_index + 1;
+ else
+ return gf_history_b_search (fd, value,
+ m_index+1, to, len);
+ }
+ else {
+ if (m_index ==0) {
+ /* we are sure that values exists
+ * in this htime file
+ */
+ return 0;
+ }
+ else {
+ ret = gf_history_get_timestamp (fd, m_index-1, len,
+ &cur_value);
+ if (ret == -1)
+ goto out;
+ if (value > cur_value) {
+ return m_index;
+ }
+ else
+ return gf_history_b_search (fd, value, from,
+ m_index-1, len);
+ }
+ }
+out:
+ return -1;
+}
+
+void *
+gf_changelog_consume_wrap (void* data)
+{
+ int ret = -1;
+ ssize_t nread = 0;
+ xlator_t *this = NULL;
+ gf_changelog_consume_data_t *ccd = NULL;
+
+ ccd = (gf_changelog_consume_data_t *) data;
+ this = ccd->this;
+
+ ccd->retval = -1;
+
+ nread = pread (ccd->fd, ccd->changelog, PATH_MAX, ccd->offset);
+ if (nread < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "cannot read from history metadata file (reason %s)",
+ strerror (errno));
+ goto out;
+ }
+
+ /* TODO: handle short reads and EOF. */
+
+ ret = gf_changelog_consume (ccd->this,
+ ccd->gfc, ccd->changelog, _gf_true);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not parse changelog: %s", ccd->changelog);
+ goto out;
+ }
+
+ ccd->retval = 0;
+
+ out:
+ return NULL;
+}
+
+/**
+ * "gf_history_consume" is a worker function for history.
+ * parses and moves changelogs files from index "from"
+ * to index "to" in open htime file whose fd is "fd".
+ */
+
+#define MAX_PARALLELS 10
+
+void *
+gf_history_consume (void * data)
+{
+ xlator_t *this = NULL;
+ gf_changelog_t *gfc = NULL;
+ gf_changelog_t *hist_gfc = NULL;
+ int ret = 0;
+ int iter = 0;
+ int fd = -1;
+ int from = -1;
+ int to = -1;
+ int len = -1;
+ int n_parallel = 0;
+ int n_envoked = 0;
+ gf_boolean_t publish = _gf_true;
+ pthread_t th_id[MAX_PARALLELS] = {0,};
+ gf_changelog_history_data_t *hist_data = NULL;
+ gf_changelog_consume_data_t ccd[MAX_PARALLELS] = {{0},};
+ gf_changelog_consume_data_t *curr = NULL;
+
+ hist_data = (gf_changelog_history_data_t *) data;
+ if (hist_data == NULL) {
+ ret = -1;
+ goto out;
+ }
+
+ fd = hist_data->htime_fd;
+ from = hist_data->from;
+ to = hist_data->to;
+ len = hist_data->len;
+ n_parallel = hist_data->n_parallel;
+
+ this = THIS;
+ if (!this) {
+ ret = -1;
+ goto out;
+ }
+
+ gfc = (gf_changelog_t *) this->private;
+ if (!gfc) {
+ ret = -1;
+ goto out;
+ }
+
+ hist_gfc = gfc->hist_gfc;
+ if (!hist_gfc) {
+ ret = -1;
+ goto out;
+ }
+
+ while (from <= to) {
+ n_envoked = 0;
+
+ for (iter = 0 ; (iter < n_parallel) && (from <= to); iter++) {
+ curr = &ccd[iter];
+
+ curr->this = this;
+ curr->gfc = hist_gfc;
+ curr->fd = fd;
+ curr->offset = from * (len + 1);
+
+ curr->retval = 0;
+ memset (curr->changelog, '\0', PATH_MAX);
+
+ ret = pthread_create (&th_id[iter], NULL,
+ gf_changelog_consume_wrap, curr);
+ if (ret) {
+ gf_log ( this->name, GF_LOG_ERROR,
+ "could not create consume-thread"
+ " reason (%s)", strerror (ret));
+ ret = -1;
+ goto sync;
+ } else
+ n_envoked++;
+
+ from++;
+ }
+
+ sync:
+ for (iter = 0; iter < n_envoked; iter++) {
+ ret = pthread_join (th_id[iter], NULL);
+ if (ret) {
+ publish = _gf_false;
+ gf_log (this->name, GF_LOG_ERROR,
+ "pthread_join() error %s",
+ strerror (ret));
+ /* try to join the rest */
+ continue;
+ }
+
+ if (publish == _gf_false)
+ continue;
+
+ curr = &ccd[iter];
+ if (ccd->retval) {
+ publish = _gf_false;
+ gf_log (this->name, GF_LOG_ERROR,
+ "parsing error, ceased publishing...");
+ continue;
+ }
+
+ ret = gf_changelog_publish (curr->this,
+ curr->gfc, curr->changelog);
+ if (ret) {
+ publish = _gf_false;
+ gf_log (this->name, GF_LOG_ERROR,
+ "publish error, ceased publishing...");
+ }
+ }
+ }
+
+ /* informing "parsing done". */
+ hist_gfc->hist_done = (publish == _gf_true) ? 0 : -1;
+
+out:
+ if (fd != -1)
+ close (fd);
+ GF_FREE (hist_data);
+ return NULL;
+}
+
+/**
+ * @API
+ * gf_history_changelog() : Get/parse historical changelogs and get them ready
+ * for consumption.
+ *
+ * Arguments:
+ * @changelog_dir : Directory location from where history changelogs are
+ * supposed to be consumed.
+ * @start: Unix timestamp FROM where changelogs should be consumed.
+ * @end: Unix timestamp TO where changelogsshould be consumed.
+ * @n_parallel : degree of parallelism while changelog parsing.
+ * @actual_end : the end time till where changelogs are available.
+ *
+ * Return:
+ * Returns <timestamp> on success, the last time till where changelogs are
+ * available.
+ * Returns -1 on failure(error).
+ */
+
+#define MAKE_HTIME_FILE_PATH(htime_file, htime_dir, htime_bname) do { \
+ strcpy (htime_file, htime_dir); \
+ strcat (htime_file, "/"); \
+ strcat (htime_file, htime_bname); \
+ } while (0)
+
+/**
+ * Extract timestamp range from a historical metadata file
+ * Returns:
+ * 0 : Success ({min,max}_ts with the appropriate values)
+ * -1 : Failure
+ * -2 : Ignore this metadata file and process next
+ */
+int
+gf_changelog_extract_min_max (const char *dname, const char *htime_dir,
+ int *fd, unsigned long *total,
+ unsigned long *min_ts, unsigned long *max_ts)
+{
+ int ret = -1;
+ xlator_t *this = NULL;
+ char htime_file[PATH_MAX] = {0,};
+ struct stat stbuf = {0,};
+ char *iter = NULL;
+ char x_value[30] = {0,};
+
+ this = THIS;
+
+ MAKE_HTIME_FILE_PATH (htime_file, htime_dir, dname);
+
+ iter = (htime_file + strlen (htime_file) - TIMESTAMP_LENGTH);
+ sscanf (iter ,"%lu",min_ts);
+
+ ret = stat (htime_file, &stbuf);
+ if (ret) {
+ ret = -1;
+ gf_log (this->name, GF_LOG_ERROR,
+ "stat() failed on htime file %s (reason %s)",
+ htime_file, strerror (errno));
+ goto out;
+ }
+
+ /* ignore everything except regular files */
+ if (!S_ISREG (stbuf.st_mode)) {
+ ret = -2;
+ goto out;
+ }
+
+ *fd = open (htime_file, O_RDONLY);
+ if (*fd < 0) {
+ ret = -1;
+ gf_log (this->name, GF_LOG_ERROR,
+ "open() failed for htime %s (reasong %s)",
+ htime_file, strerror (errno));
+ goto out;
+ }
+
+ /* Looks good, extract max timestamp */
+ ret = sys_fgetxattr (*fd, HTIME_KEY, x_value, sizeof (x_value));
+ if (ret < 0) {
+ ret = -1;
+ gf_log (this->name, GF_LOG_ERROR,
+ "error extracting max timstamp from htime file"
+ " %s (reason %s)", htime_file, strerror (errno));
+ goto out;
+ }
+
+ sscanf (x_value, "%lu:%lu", max_ts, total);
+ gf_log (this->name, GF_LOG_INFO,
+ "MIN: %lu, MAX: %lu, TOTAL CHANGELOGS: %lu",
+ *min_ts, *max_ts, *total);
+
+ ret = 0;
+
+ out:
+ return ret;
+}
+
+int
+gf_history_changelog (char* changelog_dir, unsigned long start,
+ unsigned long end, int n_parallel,
+ unsigned long *actual_end)
+{
+ int ret = 0;
+ int len = -1;
+ int fd = -1;
+ int n_read = -1;
+ unsigned long min_ts = 0;
+ unsigned long max_ts = 0;
+ unsigned long end2 = 0;
+ unsigned long ts1 = 0;
+ unsigned long ts2 = 0;
+ unsigned long to = 0;
+ unsigned long from = 0;
+ unsigned long total_changelog = 0;
+ xlator_t *this = NULL;
+ gf_changelog_t *gfc = NULL;
+ gf_changelog_t *hist_gfc = NULL;
+ gf_changelog_history_data_t *hist_data = NULL;
+ DIR *dirp = NULL;
+ struct dirent *dp = NULL;
+ pthread_t consume_th = 0;
+ char htime_dir[PATH_MAX] = {0,};
+ char buffer[PATH_MAX] = {0,};
+
+ pthread_attr_t attr;
+
+ this = THIS;
+ if (!this) {
+ ret = -1;
+ goto out;
+ }
+
+ gfc = (gf_changelog_t *) this->private;
+ if (!gfc) {
+ ret = -1;
+ goto out;
+ }
+
+ hist_gfc = (gf_changelog_t *) gfc->hist_gfc;
+ if (!hist_gfc) {
+ ret = -1;
+ goto out;
+ }
+
+ /* basic sanity check */
+ if (start > end || n_parallel <= 0) {
+ ret = -1;
+ goto out;
+ }
+
+ /* cap parallelism count */
+ if (n_parallel > MAX_PARALLELS)
+ n_parallel = MAX_PARALLELS;
+
+ CHANGELOG_FILL_HTIME_DIR (changelog_dir, htime_dir);
+
+ dirp = opendir (htime_dir);
+ if (dirp == NULL) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "open dir on htime failed : %s (reason: %s)",
+ htime_dir, strerror (errno));
+ ret = -1;
+ goto out;
+ }
+
+ while ((dp = readdir (dirp)) != NULL) {
+ ret = gf_changelog_extract_min_max (dp->d_name, htime_dir,
+ &fd, &total_changelog,
+ &min_ts, &max_ts);
+ if (ret) {
+ if (-2 == ret)
+ continue;
+ goto out;
+ }
+
+ if (start >= min_ts && start < max_ts) {
+ /**
+ * TODO: handle short reads later...
+ */
+ n_read = read (fd, buffer, PATH_MAX);
+ if (n_read < 0) {
+ ret = -1;
+ gf_log ( this->name, GF_LOG_ERROR,
+ "unable to read htime file");
+ goto out;
+ }
+
+ len = strlen (buffer);
+
+ /**
+ * search @start in the htime file returning it's index
+ * (@from)
+ */
+ from = gf_history_b_search (fd, start, 0,
+ total_changelog - 1, len);
+
+ /* ensuring correctness of gf_b_search */
+ if (gf_history_check (fd, from, start, len) != 0) {
+ ret = -1;
+ gf_log (this->name, GF_LOG_ERROR,
+ "wrong result for start: %lu idx: %lu",
+ start, from);
+ goto out;
+ }
+
+ end2 = (end <= max_ts) ? end : max_ts;
+
+ /**
+ * search @end2 in htime file returning it's index (@to)
+ */
+ to = gf_history_b_search (fd, end2,
+ 0, total_changelog - 1, len);
+
+ if (gf_history_check (fd, to, end2, len) != 0) {
+ ret = -1;
+ gf_log (this->name, GF_LOG_ERROR,
+ "wrong result for start: %lu idx: %lu",
+ end2, to);
+ goto out;
+ }
+
+ ret = gf_history_get_timestamp (fd, from, len, &ts1);
+ if (ret == -1)
+ goto out;
+
+ ret = gf_history_get_timestamp (fd, to, len, &ts2);
+ if (ret == -1)
+ goto out;
+
+ gf_log (this->name, GF_LOG_INFO,
+ "FINAL: from: %lu, to: %lu, changes: %lu",
+ ts1, ts2, (to - from + 1));
+
+ hist_data = GF_CALLOC (1,
+ sizeof (gf_changelog_history_data_t),
+ gf_changelog_mt_history_data_t);
+
+ hist_data->htime_fd = fd;
+ hist_data->from = from;
+ hist_data->to = to;
+ hist_data->len = len;
+ hist_data->n_parallel = n_parallel;
+
+ ret = pthread_attr_init (&attr);
+ if (ret != 0) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = pthread_attr_setdetachstate
+ (&attr, PTHREAD_CREATE_DETACHED);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "unable to sets the detach"
+ " state attribute, reason(%s)",
+ strerror (ret));
+ ret = -1;
+ goto out;
+ }
+
+ /* spawn a thread for background parsing & publishing */
+ ret = pthread_create (&consume_th, &attr,
+ gf_history_consume, hist_data);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "creation of consume parent-thread"
+ " failed. reason(%s)", strerror (ret));
+ ret = -1;
+ goto out;
+ }
+
+ goto out;
+
+ } /* end of range check */
+
+ } /* end of readdir() */
+
+ if (!from || !to)
+ ret = -1;
+
+out:
+ if (dirp != NULL)
+ closedir (dirp);
+
+ if (ret < 0) {
+ if (fd != -1)
+ close (fd);
+ GF_FREE (hist_data);
+ (void) pthread_attr_destroy (&attr);
+
+ return ret;
+ }
+
+ hist_gfc->hist_done = 1;
+ *actual_end = ts2;
+
+ return ret;
+}
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c
index 71f2f7a25ad..984106b75e6 100644
--- a/xlators/features/changelog/src/changelog-helpers.c
+++ b/xlators/features/changelog/src/changelog-helpers.c
@@ -17,6 +17,7 @@
#include "defaults.h"
#include "logging.h"
#include "iobuf.h"
+#include "syscall.h"
#include "changelog-helpers.h"
#include "changelog-mem-types.h"
@@ -132,6 +133,49 @@ changelog_write (int fd, char *buffer, size_t len)
return (writen != len);
}
+int
+htime_update (xlator_t *this,
+ changelog_priv_t *priv, unsigned long ts,
+ char * buffer)
+{
+ char changelog_path[PATH_MAX+1] = {0,};
+ int len = -1;
+ char x_value[25] = {0,};
+ /* time stamp(10) + : (1) + rolltime (12 ) + buffer (2) */
+ int ret = 0;
+
+ if (priv->htime_fd ==-1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Htime fd not available for updation");
+ ret = -1;
+ goto out;
+ }
+ strcpy (changelog_path, buffer);
+ len = strlen (changelog_path);
+ changelog_path[len] = '\0'; /* redundant */
+
+ if (changelog_write (priv->htime_fd, (void*) changelog_path, len+1 ) < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Htime file content write failed");
+ ret =-1;
+ goto out;
+ }
+
+ sprintf (x_value,"%lu:%d",ts, priv->rollover_count);
+
+ if (sys_fsetxattr (priv->htime_fd, HTIME_KEY, x_value,
+ strlen (x_value), XATTR_REPLACE)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Htime xattr updation failed");
+ goto out;
+ }
+
+ priv->rollover_count +=1;
+
+out:
+ return ret;
+}
+
static int
changelog_rollover_changelog (xlator_t *this,
changelog_priv_t *priv, unsigned long ts)
@@ -173,6 +217,15 @@ changelog_rollover_changelog (xlator_t *this,
ofile, nfile, strerror (errno));
}
+ if (!ret) {
+ ret = htime_update (this, priv, ts, nfile);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not update htime file");
+ goto out;
+ }
+ }
+
if (notify) {
bname = basename (nfile);
gf_log (this->name, GF_LOG_DEBUG, "notifying: %s", bname);
@@ -212,6 +265,54 @@ changelog_rollover_changelog (xlator_t *this,
return ret;
}
+/* Returns 0 on successful creation of htime file
+ * returns -1 on failure or error
+ */
+int
+htime_open (xlator_t *this,
+ changelog_priv_t * priv, unsigned long ts)
+{
+ int fd = -1;
+ int ret = 0;
+ char ht_dir_path[PATH_MAX] = {0,};
+ char ht_file_path[PATH_MAX] = {0,};
+ int flags = 0;
+
+ CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, ht_dir_path);
+
+ /* get the htime file name in ht_file_path */
+ (void) snprintf (ht_file_path,PATH_MAX,"%s/%s.%lu",ht_dir_path,
+ HTIME_FILE_NAME, ts);
+
+ flags |= (O_CREAT | O_RDWR | O_SYNC);
+ fd = open (ht_file_path, flags,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+ if (fd < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "unable to open/create htime file: %s"
+ "(reason: %s)", ht_file_path, strerror (errno));
+ ret = -1;
+ goto out;
+
+ }
+
+ if (sys_fsetxattr (fd, HTIME_KEY, HTIME_INITIAL_VALUE,
+ sizeof (HTIME_INITIAL_VALUE)-1, 0)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Htime xattr initialization failed");
+ ret = -1;
+ goto out;
+ }
+
+ /* save this htime_fd in priv->htime_fd */
+ priv->htime_fd = fd;
+ /* initialize rollover-number in priv to 1 */
+ priv->rollover_count = 1;
+
+out:
+ return ret;
+}
+
int
changelog_open (xlator_t *this,
changelog_priv_t *priv)
@@ -311,7 +412,7 @@ changelog_handle_change (xlator_t *this,
int ret = 0;
if (CHANGELOG_TYPE_IS_ROLLOVER (cld->cld_type)) {
- changelog_encode_change(priv);
+ changelog_encode_change (priv);
ret = changelog_start_next_change (this, priv,
cld->cld_roll_time,
cld->cld_finale);
diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h
index ec90f8a13d7..987af190b9c 100644
--- a/xlators/features/changelog/src/changelog-helpers.h
+++ b/xlators/features/changelog/src/changelog-helpers.h
@@ -203,9 +203,18 @@ struct changelog_priv {
/* logging directory */
char *changelog_dir;
+ /* htime directory */
+ char *htime_dir;
+
/* one file for all changelog types */
int changelog_fd;
+ /* htime fd for current changelog session */
+ int htime_fd;
+
+ /* rollover_count used by htime */
+ int rollover_count;
+
gf_lock_t lock;
/* writen end of the pipe */
@@ -393,6 +402,11 @@ void *
changelog_fsync_thread (void *data);
int
changelog_forget (xlator_t *this, inode_t *inode);
+int
+htime_update (xlator_t *this, changelog_priv_t *priv,
+ unsigned long ts, char * buffer);
+int
+htime_open (xlator_t *this, changelog_priv_t * priv, unsigned long ts);
/* Geo-Rep snapshot dependency changes */
void
@@ -549,5 +563,6 @@ call_stub_t *__chlog_barrier_dequeue (xlator_t *this, struct list_head *queue);
goto label; \
} \
} while (0)
+/* End: Geo-Rep snapshot dependency changes */
+
#endif /* _CHANGELOG_HELPERS_H */
-/* End: Geo-Rep snapshot dependency changes */
diff --git a/xlators/features/changelog/src/changelog-mem-types.h b/xlators/features/changelog/src/changelog-mem-types.h
index d72464eab70..e1fa319a715 100644
--- a/xlators/features/changelog/src/changelog-mem-types.h
+++ b/xlators/features/changelog/src/changelog-mem-types.h
@@ -23,6 +23,7 @@ enum gf_changelog_mem_types {
gf_changelog_mt_libgfchangelog_rl_t = gf_common_mt_end + 7,
gf_changelog_mt_libgfchangelog_dirent_t = gf_common_mt_end + 8,
gf_changelog_mt_changelog_buffer_t = gf_common_mt_end + 9,
+ gf_changelog_mt_history_data_t = gf_common_mt_end + 10,
gf_changelog_mt_end
};
diff --git a/xlators/features/changelog/src/changelog-misc.h b/xlators/features/changelog/src/changelog-misc.h
index 127b03e2e1b..257c1f34218 100644
--- a/xlators/features/changelog/src/changelog-misc.h
+++ b/xlators/features/changelog/src/changelog-misc.h
@@ -16,6 +16,9 @@
#define CHANGELOG_MAX_TYPE 3
#define CHANGELOG_FILE_NAME "CHANGELOG"
+#define HTIME_FILE_NAME "HTIME"
+#define HTIME_KEY "trusted.glusterfs.htime"
+#define HTIME_INITIAL_VALUE "0:0"
#define CHANGELOG_VERSION_MAJOR 1
#define CHANGELOG_VERSION_MINOR 1
@@ -64,6 +67,10 @@
} \
} while (0)
+#define CHANGELOG_FILL_HTIME_DIR(changelog_dir, path) do { \
+ strcpy (path, changelog_dir); \
+ strcat (path, "/htime"); \
+ } while(0)
/**
* everything after 'CHANGELOG_TYPE_ENTRY' are internal types
* (ie. none of the fops trigger this type of event), hence
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c
index 5eb2cd93fd3..0b982148f44 100644
--- a/xlators/features/changelog/src/changelog.c
+++ b/xlators/features/changelog/src/changelog.c
@@ -1661,6 +1661,11 @@ changelog_init (xlator_t *this, changelog_priv_t *priv)
* simple here.
*/
ret = changelog_fill_rollover_data (&cld, _gf_false);
+ if(ret)
+ goto out;
+
+ ret = htime_open (this, priv, cld.cld_roll_time);
+ /* call htime open with cld's rollover_time */
if (ret)
goto out;
@@ -1779,6 +1784,8 @@ reconfigure (xlator_t *this, dict_t *options)
gf_boolean_t active_now = _gf_true;
changelog_time_slice_t *slice = NULL;
changelog_log_data_t cld = {0,};
+ char htime_dir[PATH_MAX] = {0,};
+ struct timeval tv = {0,};
priv = this->private;
if (!priv)
@@ -1803,6 +1810,12 @@ reconfigure (xlator_t *this, dict_t *options)
goto out;
ret = mkdir_p (priv->changelog_dir, 0600, _gf_true);
+
+ if (ret)
+ goto out;
+ CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, htime_dir);
+ ret = mkdir_p (htime_dir, 0600, _gf_true);
+
if (ret)
goto out;
@@ -1847,6 +1860,15 @@ reconfigure (xlator_t *this, dict_t *options)
goto out;
if (active_now) {
+ if (!active_earlier) {
+ if (gettimeofday(&tv, NULL) ) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "unable to fetch htime");
+ ret = -1;
+ goto out;
+ }
+ htime_open(this, priv, tv.tv_sec);
+ }
ret = changelog_spawn_notifier (this, priv);
if (!ret)
ret = changelog_spawn_helper_threads (this,
@@ -1871,10 +1893,11 @@ reconfigure (xlator_t *this, dict_t *options)
int32_t
init (xlator_t *this)
{
- int ret = -1;
- char *tmp = NULL;
- changelog_priv_t *priv = NULL;
- gf_boolean_t cond_lock_init = _gf_false;
+ int ret = -1;
+ char *tmp = NULL;
+ changelog_priv_t *priv = NULL;
+ gf_boolean_t cond_lock_init = _gf_false;
+ char htime_dir[PATH_MAX] = {0,};
GF_VALIDATE_OR_GOTO ("changelog", this, out);
@@ -1932,6 +1955,12 @@ init (xlator_t *this)
* so that consumers can _look_ into it (finding nothing...)
*/
ret = mkdir_p (priv->changelog_dir, 0600, _gf_true);
+
+ if (ret)
+ goto out;
+
+ CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, htime_dir);
+ ret = mkdir_p (htime_dir, 0600, _gf_true);
if (ret)
goto out;