diff options
| author | Venky Shankar <vshankar@redhat.com> | 2014-02-19 20:47:46 +0530 | 
|---|---|---|
| committer | Venky Shankar <vshankar@redhat.com> | 2014-05-14 05:10:15 -0700 | 
| commit | d2db585ce7e26851178104433fa9422482d8719e (patch) | |
| tree | 2e52f15cf261906debd8ec54106ffe9f84af881e /xlators/features/changelog/lib | |
| parent | bfde478cedda8267134ee3807c8db5e042115eae (diff) | |
features/changelog : historical journal consumption.
Gives GlusterFS the ability to detect file operations that
happened in the past by scanning the back-end (brick-level) GlusterFS
journal (changelog).
Design:
  * The list of changelogs produced in one perfectly running session is
    stored in an htime file, which also holds the necessary information
    about the session start and end time.
  * Involves fixed-size seeks to identify the N'th changelog in the list.
  * Requires O(log n) time (where n is the number of changelogs in the
    list) to identify the end changelog for the given start-end time
    interval.
Currently, the background processing of changelogs is sub-optimal. BZ
1097041 tracks the development effort.
For the complete design, refer to the link below:
http://lists.nongnu.org/archive/html/gluster-devel/2014-02/msg00206.html
Change-Id: I27e49f75e492e843084d0ecaf9130224d08462a0
BUG: 1091961
Signed-off-by: Ajeet Jha <ajha@redhat.com>
Signed-off-by: Venky Shankar <vshankar@redhat.com>
Signed-off-by: Ajeet Jha <ajha@redhat.com>
Reviewed-on: http://review.gluster.org/6930
Reviewed-by: Kotresh HR <khiremat@redhat.com>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Diffstat (limited to 'xlators/features/changelog/lib')
4 files changed, 873 insertions, 14 deletions
diff --git a/xlators/features/changelog/lib/examples/c/get-history.c b/xlators/features/changelog/lib/examples/c/get-history.c new file mode 100644 index 00000000000..33eb8c32d4d --- /dev/null +++ b/xlators/features/changelog/lib/examples/c/get-history.c @@ -0,0 +1,109 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +/** + * get set of new changes every 10 seconds (just print the file names) + * + * Compile it using: + *  gcc -o gethistory `pkg-config --cflags libgfchangelog` get-history.c \ + *  `pkg-config --libs libgfchangelog` + */ + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/un.h> +#include <limits.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <errno.h> + +#include "changelog.h" + +#define handle_error(fn)                                \ +        printf ("%s (reason: %s)\n", fn, strerror (errno)) + +int +main (int argc, char ** argv) +{ +        int     i            = 0; +        int     ret          = 0; +        ssize_t nr_changes   = 0; +        ssize_t changes      = 0; +        char fbuf[PATH_MAX]  = {0,}; +        unsigned long end_ts = 0; + +        ret = gf_changelog_register ("/export1/v1/b1", +                                     "/tmp/scratch_v1", "/tmp/scratch_v1/changes.log", +                                     9, 5); +        if (ret) { +                handle_error ("register failed"); +                goto out; +        } + +        int a, b; +        printf ("give the two numbers start and end\t"); +        scanf ("%d%d", &a, &b); +        ret = gf_history_changelog ("/export1/v1/b1/.glusterfs/changelogs",a, b, 3, &end_ts); +        if (ret == -1) { +                
printf ("history failed"); +                goto out; +        } + +        printf ("end time till when changelog available : %d , ret(%d) \t", end_ts, ret); +        fflush(stdout); + +        while (1) { +                nr_changes = gf_history_changelog_scan (); +                printf ("scanned, nr_changes : %d\n",nr_changes); +                if (nr_changes < 0) { +                        handle_error ("scan(): "); +                        break; +                } + +                if (nr_changes == 0) { +                        printf ("done scanning \n"); +                        goto out; +                } + +                printf ("Got %ld changelog files\n", nr_changes); + +                while ( (changes = +                         gf_history_changelog_next_change (fbuf, PATH_MAX)) > 0) { +                        printf ("changelog file [%d]: %s\n", ++i, fbuf); + +                        /* process changelog */ +                        /* ... */ +                        /* ... */ +                        /* ... 
*/ +                        /* done processing */ + +                        ret = gf_history_changelog_done (fbuf); +                        if (ret) +                                handle_error ("gf_changelog_done"); +                } +                /* +                if (changes == -1) +                        handle_error ("gf_changelog_next_change"); +                if (nr_changes ==1){ +                        printf("continue scanning\n"); +                } + +                if(nr_changes == 0){ +                        printf("done scanning \n"); +                        goto out; +                } +                */ +        } + + +out: +        return ret; +} diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h index 2d545da9e82..218896b86e7 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h +++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h @@ -24,6 +24,7 @@  #define GF_CHANGELOG_PROCESSED_DIR  ".processed"  #define GF_CHANGELOG_PROCESSING_DIR ".processing"  #define GF_CHANGELOG_HISTORY_DIR    ".history" +#define TIMESTAMP_LENGTH 10  #ifndef MAXLINE  #define MAXLINE 4096 @@ -72,8 +73,45 @@ typedef struct gf_changelog {          /* Holds gfc for History API */          struct gf_changelog *hist_gfc; + +        /* holds 0 done scanning, 1 keep scanning and -1 error */ +        int hist_done;  } gf_changelog_t; +typedef struct gf_changelog_history_data { +        int           len; + +        int           htime_fd; + +        /* parallelism count */ +        int           n_parallel; + +        /* history from, to indexes */ +        unsigned long from; +        unsigned long to; +} gf_changelog_history_data_t; + +typedef struct gf_changelog_consume_data { +        /** set of inputs */ + +        /* fd to read from */ +        int             fd; + +        /* from @offset */ +        off_t           offset; + +        xlator_t       
*this; +        gf_changelog_t *gfc; + +        /** set of outputs */ + +        /* return value */ +        int retval; + +        /* journal processed */ +        char changelog[PATH_MAX]; +} gf_changelog_consume_data_t; +  int  gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc); @@ -98,4 +136,11 @@ gf_ftruncate (int fd, off_t length);  off_t  gf_lseek (int fd, off_t offset, int whence); +int +gf_changelog_consume (xlator_t *this, +                      gf_changelog_t *gfc, +                      char *from_path, gf_boolean_t no_publish); +int +gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path); +  #endif diff --git a/xlators/features/changelog/lib/src/gf-changelog-process.c b/xlators/features/changelog/lib/src/gf-changelog-process.c index 3ea2700c62b..3b8d2683672 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-process.c +++ b/xlators/features/changelog/lib/src/gf-changelog-process.c @@ -459,8 +459,49 @@ gf_changelog_decode (xlator_t *this, gf_changelog_t *gfc, int from_fd,          return ret;  } -static int -gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path) +int +gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path) +{ +        int         ret        = 0; +        char dest[PATH_MAX]    = {0,}; +        char to_path[PATH_MAX] = {0,}; +        struct stat stbuf      = {0,}; + +        (void) snprintf (to_path, PATH_MAX, "%s%s", +                         gfc->gfc_current_dir, basename (from_path)); + +        /* handle zerob file that wont exist in current */ +        ret = stat (from_path, &stbuf); +        if (ret) +                goto out; + +        if (stbuf.st_size == 0) { +                ret = unlink (from_path); +                if (ret) +                        gf_log (this->name, GF_LOG_ERROR, +                                "could not unlink %s (reason %s)", +                                from_path, strerror (errno)); +                goto out; 
+        } + +        (void) snprintf (dest, PATH_MAX, "%s%s", +                         gfc->gfc_processing_dir, basename (from_path)); + +        ret = rename (to_path, dest); +        if (ret){ +                gf_log (this->name, GF_LOG_ERROR, +                        "error moving %s to processing dir" +                        " (reason: %s)", to_path, strerror (errno)); +        } + +out: +        return ret; +} + +int +gf_changelog_consume (xlator_t *this, +                      gf_changelog_t *gfc, +                      char *from_path, gf_boolean_t no_publish)  {          int         ret        = -1;          int         fd1        = 0; @@ -472,6 +513,7 @@ gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path)          ret = stat (from_path, &stbuf);          if (ret || !S_ISREG(stbuf.st_mode)) { +                ret = -1;                  gf_log (this->name, GF_LOG_ERROR,                          "stat failed on changelog file: %s", from_path);                  goto out; @@ -506,6 +548,8 @@ gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path)                  if (!ret) {                          /* move it to processing on a successfull                             decode */ +                        if (no_publish == _gf_true) +                                goto close_fd;                          ret = rename (to_path, dest);                          if (ret)                                  gf_log (this->name, GF_LOG_ERROR, @@ -516,10 +560,15 @@ gf_changelog_consume (xlator_t *this, gf_changelog_t *gfc, char *from_path)                  /* remove it from .current if it's an empty file */                  if (zerob) { +                        if (no_publish == _gf_true) { +                                ret = 0; +                                goto close_fd; +                        } +                          ret = unlink (to_path);                          if (ret)                                  gf_log 
(this->name, GF_LOG_ERROR, -                                        "could not unlink %s (reason: %s", +                                        "could not unlink %s (reason: %s)",                                          to_path, strerror (errno));                  }          } @@ -546,7 +595,7 @@ gf_changelog_ext_change (xlator_t *this,                          alo = 1;                          gf_log (this->name, GF_LOG_DEBUG,                                  "processing changelog: %s", path); -                        ret = gf_changelog_consume (this, gfc, path); +                        ret = gf_changelog_consume (this, gfc, path, _gf_false);                  }                  if (ret) diff --git a/xlators/features/changelog/lib/src/gf-history-changelog.c b/xlators/features/changelog/lib/src/gf-history-changelog.c index bfc4cd37dc3..a895037eeca 100644 --- a/xlators/features/changelog/lib/src/gf-history-changelog.c +++ b/xlators/features/changelog/lib/src/gf-history-changelog.c @@ -11,6 +11,7 @@  #include "globals.h"  #include "glusterfs.h"  #include "logging.h" +#include "syscall.h"  #include "gf-changelog-helpers.h" @@ -18,7 +19,8 @@  #include "changelog-misc.h"  #include "changelog-mem-types.h" -/*@API +/** + * @API   * gf_history_changelog_done:   *    Move processed history changelog file from .processing   *    to .processed @@ -86,6 +88,7 @@ gf_history_changelog_done (char *file)                  free (buffer); /* allocated by realpath() */          return ret;  } +  /**   * @API   *  gf_history_changelog_start_fresh: @@ -126,7 +129,7 @@ gf_history_changelog_start_fresh ()          return -1;  } -/* +/**   * @API   *  gf_history_changelog_next_change:   *     Return the next history changelog file entry. Zero means all @@ -182,7 +185,7 @@ gf_history_changelog_next_change (char *bufptr, size_t maxlen)          return -1;  } -/* +/**   * @API   *  gf_history_changelog_scan:   *     Scan and generate a list of change entries. 
@@ -191,8 +194,13 @@ gf_history_changelog_next_change (char *bufptr, size_t maxlen)   *     This call also acts as a cancellation point for the consumer.   *   *  RETURN VALUES: - *     nr_entries: On success. - *     -1        : On error. + *      +ve integer : success and keep scanning.(count of changelogs) + *      0           : success and done scanning. + *     -1           : error. + * + *  NOTE: After first 0 return call_get_next change for once more time + *        to empty the tracker + *   */  ssize_t  gf_history_changelog_scan () @@ -204,10 +212,11 @@ gf_history_changelog_scan ()          xlator_t       *this       = NULL;          size_t          nr_entries = 0;          gf_changelog_t *gfc        = NULL; -        gf_changelog_t *hist_gfc        = NULL; +        gf_changelog_t *hist_gfc   = NULL;          struct dirent  *entryp     = NULL;          struct dirent  *result     = NULL;          char buffer[PATH_MAX]      = {0,}; +        static int    is_last_scan = 0;          this = THIS;          if (!this) @@ -221,15 +230,23 @@ gf_history_changelog_scan ()          if (!hist_gfc)                  goto out; + retry: +        if (is_last_scan == 1) +                return 0; +        if (hist_gfc->hist_done == 0) +                is_last_scan = 1; +          errno = EINVAL; +        if (hist_gfc->hist_done == -1) +                goto out;          tracker_fd = hist_gfc->gfc_fd;          if (gf_ftruncate (tracker_fd, 0))                  goto out; -        len = offsetof(struct dirent, d_name) -                + pathconf(hist_gfc->gfc_processing_dir, _PC_NAME_MAX) + 1; +        len = offsetof (struct dirent, d_name) +                + pathconf (hist_gfc->gfc_processing_dir, _PC_NAME_MAX) + 1;          entryp = GF_CALLOC (1, len,                              gf_changelog_mt_libgfchangelog_dirent_t);          if (!entryp) @@ -265,10 +282,649 @@ gf_history_changelog_scan ()          GF_FREE (entryp); +        gf_log (this->name, GF_LOG_DEBUG, +              
  "hist_done %d, is_last_scan: %d", hist_gfc->hist_done, is_last_scan); +          if (!result) { -                if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) -                        return nr_entries; +                if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) { +                        if (nr_entries > 0) +                                return nr_entries; +                        else { +                                sleep(1); +                                goto retry; +                        } +                }          }   out:          return -1;  } + +/* + * Gets timestamp value at the changelog path at index. + * Returns 0 on success(updates given time-stamp), -1 on failure. + */ +int +gf_history_get_timestamp (int fd, int index, int len, +                          unsigned long *ts) +{ +        xlator_t        *this             = NULL; +        int             n_read            = -1; +        char            path_buf[PATH_MAX]= {0,}; +        char            *iter             = path_buf; +        size_t          offset            = index * (len+1); +        unsigned long   value             = 0; +        int             ret               = 0; + +        this = THIS; +        if (!this) { +                return -1; +        } + +        n_read = pread (fd, path_buf, len, offset); +        if (n_read < 0 ) { +                ret = -1; +                gf_log ( this->name, GF_LOG_ERROR, +                         "could not read from htime file"); +                goto out; +        } +        iter+= len - TIMESTAMP_LENGTH; +        sscanf (iter, "%lu",&value); +out: +        if(ret == 0) +                *ts = value; +        return ret; +} + +/* + * Function to ensure correctness of search + * Checks whether @value is there next to @target_index or not + */ +int +gf_history_check ( int fd, int target_index, unsigned long value, int len) +{ +        int             ret = 0; +        unsigned long   ts1 = 0; +        unsigned long   ts2 = 0; + +   
     if (target_index == 0) { +                ret = gf_history_get_timestamp (fd, target_index, len, &ts1); +                if (ret == -1) +                        goto out; +                if (value <= ts1) +                        goto out; +                else { +                        ret = -1; +                        goto out; +                } +        } + +        ret = gf_history_get_timestamp (fd, target_index, len, &ts1); +        if (ret ==-1) +                goto out; +        ret = gf_history_get_timestamp (fd, target_index -1, len, &ts2); +        if (ret ==-1) +                goto out; + +        if ( (value <= ts1) && (value > ts2) ) { +                goto out; +        } +        else +                ret = -1; +out: +        return ret; +} + +/* + * This is a "binary search" based search function which checks neighbours + * for in-range availability of the value to be searched and provides the + * index at which the changelog file nearest to the requested timestamp(value) + * can be read from. + * + * Actual offset can be calculated as (index* (len+1) ). + * "1" is because the changelog paths are null terminated. 
+ * + * @path        : Htime file to search in + * @value       : time stamp to search + * @from        : start index to search + * @to          : end index to search + * @len         : length of fixes length strings seperated by null + */ + +int +gf_history_b_search (int fd, unsigned long value, +                     unsigned long from, unsigned long to, int len) +{ +        int             m_index   = -1; +        unsigned long   cur_value = 0; +        unsigned long   ts1       = 0; +        int             ret       = 0; + +        m_index = (from + to)/2; + +        if ( (to - from) <=1 ) { +                /* either one or 2 changelogs left */ +                if ( to != from ) { +                        /* check if value is less or greater than to +                         * return accordingly +                         */ +                        ret = gf_history_get_timestamp (fd, from, len, &ts1); +                        if (ret ==-1) +                                goto out; +                        if ( ts1 >= value) { +                                /* actually compatision should be +                                 * exactly == but considering +                                 * +                                 * case of only 2 changelogs in htime file +                                 */ +                                return from; +                        } +                        else +                                return to; +                } +                else +                        return to; +        } + +        ret = gf_history_get_timestamp (fd, m_index, len, &cur_value); +        if (ret == -1) +                goto out; +        if (cur_value == value) { +                return m_index; +        } +        else if (value > cur_value) { +                ret = gf_history_get_timestamp (fd, m_index+1, len, &cur_value); +                if (ret == -1) +                        goto out; +                if (value < cur_value) +     
                   return m_index + 1; +                else +                        return gf_history_b_search (fd, value, +                                                    m_index+1, to, len); +        } +        else { +                if (m_index ==0) { +                       /*  we are sure that values exists +                        *  in this htime file +                        */ +                        return 0; +                } +                else { +                        ret = gf_history_get_timestamp (fd, m_index-1, len, +                                                        &cur_value); +                        if (ret == -1) +                                goto out; +                        if (value > cur_value) { +                                return m_index; +                        } +                        else +                                return gf_history_b_search (fd, value, from, +                                                            m_index-1, len); +                } +        } +out: +        return -1; +} + +void * +gf_changelog_consume_wrap (void* data) +{ +        int                          ret   = -1; +        ssize_t                      nread = 0; +        xlator_t                    *this  = NULL; +        gf_changelog_consume_data_t *ccd   = NULL; + +        ccd = (gf_changelog_consume_data_t *) data; +        this = ccd->this; + +        ccd->retval = -1; + +        nread = pread (ccd->fd, ccd->changelog, PATH_MAX, ccd->offset); +        if (nread < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "cannot read from history metadata file (reason %s)", +                        strerror (errno)); +                goto out; +        } + +        /* TODO: handle short reads and EOF. 
*/ + +        ret = gf_changelog_consume (ccd->this, +                                    ccd->gfc, ccd->changelog, _gf_true); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "could not parse changelog: %s", ccd->changelog); +                goto out; +        } + +        ccd->retval = 0; + + out: +        return NULL; +} + +/** + * "gf_history_consume" is a worker function for history. + * parses and moves changelogs files from index "from" + * to index "to" in open htime file whose fd is "fd". + */ + +#define MAX_PARALLELS  10 + +void * +gf_history_consume (void * data) +{ +        xlator_t                    *this              = NULL; +        gf_changelog_t              *gfc               = NULL; +        gf_changelog_t              *hist_gfc          = NULL; +        int                          ret               = 0; +        int                          iter              = 0; +        int                          fd                = -1; +        int                          from              = -1; +        int                          to                = -1; +        int                          len               = -1; +        int                          n_parallel        = 0; +        int                          n_envoked         = 0; +        gf_boolean_t                 publish           = _gf_true; +        pthread_t th_id[MAX_PARALLELS]                 = {0,}; +        gf_changelog_history_data_t *hist_data         = NULL; +        gf_changelog_consume_data_t ccd[MAX_PARALLELS] = {{0},}; +        gf_changelog_consume_data_t *curr              = NULL; + +        hist_data = (gf_changelog_history_data_t *) data; +        if (hist_data == NULL) { +                ret = -1; +                goto out; +        } + +        fd         = hist_data->htime_fd; +        from       = hist_data->from; +        to         = hist_data->to; +        len        = hist_data->len; +        n_parallel = 
hist_data->n_parallel; + +        this = THIS; +        if (!this) { +                ret = -1; +                goto out; +        } + +        gfc = (gf_changelog_t *) this->private; +        if (!gfc) { +                ret = -1; +                goto out; +        } + +        hist_gfc = gfc->hist_gfc; +        if (!hist_gfc) { +                ret = -1; +                goto out; +        } + +        while (from <= to) { +                n_envoked = 0; + +                for (iter = 0 ; (iter < n_parallel) && (from <= to); iter++) { +                        curr = &ccd[iter]; + +                        curr->this   = this; +                        curr->gfc    = hist_gfc; +                        curr->fd     = fd; +                        curr->offset = from * (len + 1); + +                        curr->retval = 0; +                        memset (curr->changelog, '\0', PATH_MAX); + +                        ret = pthread_create (&th_id[iter], NULL, +                                              gf_changelog_consume_wrap, curr); +                        if (ret) { +                                gf_log ( this->name, GF_LOG_ERROR, +                                        "could not create consume-thread" +                                        " reason (%s)", strerror (ret)); +                                ret = -1; +                                goto sync; +                        } else +                                n_envoked++; + +                        from++; +                } + +        sync: +                for (iter = 0; iter < n_envoked; iter++) { +                        ret = pthread_join (th_id[iter], NULL); +                        if (ret) { +                                publish = _gf_false; +                                gf_log (this->name, GF_LOG_ERROR, +                                        "pthread_join() error %s", +                                        strerror (ret)); +                                /* try to join the 
rest */ +                                continue; +                        } + +                        if (publish == _gf_false) +                                continue; + +                        curr = &ccd[iter]; +                        if (ccd->retval) { +                                publish = _gf_false; +                                gf_log (this->name, GF_LOG_ERROR, +                                        "parsing error, ceased publishing..."); +                                continue; +                        } + +                        ret = gf_changelog_publish (curr->this, +                                                    curr->gfc, curr->changelog); +                        if (ret) { +                                publish = _gf_false; +                                gf_log (this->name, GF_LOG_ERROR, +                                        "publish error, ceased publishing..."); +                        } +                } +        } + +       /* informing "parsing done". */ +        hist_gfc->hist_done = (publish == _gf_true) ? 0 : -1; + +out: +        if (fd != -1) +                close (fd); +        GF_FREE (hist_data); +        return NULL; +} + +/** + * @API + * gf_history_changelog() : Get/parse historical changelogs and get them ready + * for consumption. + * + * Arguments: + * @changelog_dir : Directory location from where history changelogs are + * supposed to be consumed. + * @start: Unix timestamp FROM where changelogs should be consumed. + * @end: Unix timestamp TO where changelogsshould be consumed. + * @n_parallel : degree of parallelism while changelog parsing. + * @actual_end : the end time till where changelogs are available. + * + * Return: + * Returns <timestamp> on success, the last time till where changelogs are + *      available. + * Returns -1 on failure(error). 
+ */ + +#define MAKE_HTIME_FILE_PATH(htime_file, htime_dir, htime_bname) do {   \ +                strcpy (htime_file, htime_dir);                         \ +                strcat (htime_file, "/");                               \ +                strcat (htime_file, htime_bname);                       \ +        } while (0) + +/** + * Extract timestamp range from a historical metadata file + * Returns: + *    0 : Success ({min,max}_ts with the appropriate values) + *   -1 : Failure + *   -2 : Ignore this metadata file and process next + */ +int +gf_changelog_extract_min_max (const char *dname, const char *htime_dir, +                              int *fd, unsigned long *total, +                              unsigned long *min_ts, unsigned long *max_ts) +{ +        int          ret          = -1; +        xlator_t    *this         = NULL; +        char htime_file[PATH_MAX] = {0,}; +        struct stat  stbuf        = {0,}; +        char        *iter         = NULL; +        char x_value[30]          = {0,}; + +        this = THIS; + +        MAKE_HTIME_FILE_PATH (htime_file, htime_dir, dname); + +        iter = (htime_file + strlen (htime_file) - TIMESTAMP_LENGTH); +        sscanf (iter ,"%lu",min_ts); + +        ret = stat (htime_file, &stbuf); +        if (ret) { +                ret = -1; +                gf_log (this->name, GF_LOG_ERROR, +                        "stat() failed on htime file %s (reason %s)", +                        htime_file, strerror (errno)); +                goto out; +        } + +        /* ignore everything except regular files */ +        if (!S_ISREG (stbuf.st_mode)) { +                ret = -2; +                goto out; +        } + +        *fd = open (htime_file, O_RDONLY); +        if (*fd < 0) { +                ret = -1; +                gf_log (this->name, GF_LOG_ERROR, +                        "open() failed for htime %s (reasong %s)", +                        htime_file, strerror (errno)); +                goto out; +        
} + +        /* Looks good, extract max timestamp */ +        ret = sys_fgetxattr (*fd, HTIME_KEY, x_value, sizeof (x_value)); +        if (ret < 0) { +                ret = -1; +                gf_log (this->name, GF_LOG_ERROR, +                        "error extracting max timstamp from htime file" +                        " %s (reason %s)", htime_file, strerror (errno)); +                goto out; +        } + +        sscanf (x_value, "%lu:%lu", max_ts, total); +        gf_log (this->name, GF_LOG_INFO, +                "MIN: %lu, MAX: %lu, TOTAL CHANGELOGS: %lu", +                *min_ts, *max_ts, *total); + +        ret = 0; + + out: +        return ret; +} + +int +gf_history_changelog (char* changelog_dir, unsigned long start, +                      unsigned long end, int n_parallel, +                      unsigned long *actual_end) +{ +        int                          ret             = 0; +        int                          len             = -1; +        int                          fd              = -1; +        int                          n_read          = -1; +        unsigned long                min_ts          = 0; +        unsigned long                max_ts          = 0; +        unsigned long                end2            = 0; +        unsigned long                ts1             = 0; +        unsigned long                ts2             = 0; +        unsigned long                to              = 0; +        unsigned long                from            = 0; +        unsigned long                total_changelog = 0; +        xlator_t                    *this            = NULL; +        gf_changelog_t              *gfc             = NULL; +        gf_changelog_t              *hist_gfc        = NULL; +        gf_changelog_history_data_t *hist_data       = NULL; +        DIR                         *dirp            = NULL; +        struct dirent               *dp              = NULL; +        pthread_t                    consume_th      = 0; +   
     char            htime_dir[PATH_MAX]          = {0,}; +        char buffer[PATH_MAX]                        = {0,}; + +        pthread_attr_t attr; + +        this = THIS; +        if (!this) { +                ret = -1; +                goto out; +        } + +        gfc = (gf_changelog_t *) this->private; +        if (!gfc) { +                ret = -1; +                goto out; +        } + +        hist_gfc = (gf_changelog_t *) gfc->hist_gfc; +        if (!hist_gfc) { +                ret = -1; +                goto out; +        } + +        /* basic sanity check */ +        if (start > end || n_parallel <= 0) { +                ret = -1; +                goto out; +        } + +        /* cap parallelism count */ +        if (n_parallel > MAX_PARALLELS) +                n_parallel = MAX_PARALLELS; + +        CHANGELOG_FILL_HTIME_DIR (changelog_dir, htime_dir); + +        dirp = opendir (htime_dir); +        if (dirp == NULL) { +                gf_log (this->name, GF_LOG_ERROR, +                        "open dir on htime failed : %s (reason: %s)", +                        htime_dir, strerror (errno)); +                ret = -1; +                goto out; +        } + +        while ((dp = readdir (dirp)) != NULL) { +                ret = gf_changelog_extract_min_max (dp->d_name, htime_dir, +                                                    &fd, &total_changelog, +                                                    &min_ts, &max_ts); +                if (ret) { +                        if (-2 == ret) +                                continue; +                        goto out; +                } + +                if (start >= min_ts && start < max_ts) { +                        /** +                         * TODO: handle short reads later... 
+                         */ +                        n_read = read (fd, buffer, PATH_MAX); +                        if (n_read < 0) { +                                ret = -1; +                                gf_log ( this->name, GF_LOG_ERROR, +                                        "unable to read htime file"); +                                goto out; +                        } + +                        len = strlen (buffer); + +                        /** +                         * search @start in the htime file returning it's index +                         * (@from) +                         */ +                        from = gf_history_b_search (fd, start, 0, +                                                   total_changelog - 1, len); + +                        /* ensuring correctness of gf_b_search */ +                        if (gf_history_check (fd, from, start, len) != 0) { +                                ret = -1; +                                gf_log (this->name, GF_LOG_ERROR, +                                        "wrong result for start: %lu idx: %lu", +                                        start, from); +                                goto out; +                        } + +                        end2 = (end <= max_ts) ? 
end : max_ts; + +                        /** +                         * search @end2 in htime file returning it's index (@to) +                         */ +                        to = gf_history_b_search (fd, end2, +                                                  0, total_changelog - 1, len); + +                        if (gf_history_check (fd, to, end2, len) != 0) { +                                ret = -1; +                                gf_log (this->name, GF_LOG_ERROR, +                                        "wrong result for start: %lu idx: %lu", +                                        end2, to); +                                goto out; +                        } + +                        ret = gf_history_get_timestamp (fd, from, len, &ts1); +                        if (ret == -1) +                                goto out; + +                        ret = gf_history_get_timestamp (fd, to, len, &ts2); +                        if (ret == -1) +                                goto out; + +                        gf_log (this->name, GF_LOG_INFO, +                                "FINAL: from: %lu, to: %lu, changes: %lu", +                                ts1, ts2, (to - from + 1)); + +                        hist_data =  GF_CALLOC (1, +                                     sizeof (gf_changelog_history_data_t), +                                     gf_changelog_mt_history_data_t); + +                        hist_data->htime_fd   = fd; +                        hist_data->from       = from; +                        hist_data->to         = to; +                        hist_data->len        = len; +                        hist_data->n_parallel = n_parallel; + +                        ret = pthread_attr_init (&attr); +                        if (ret != 0) { +                                ret = -1; +                                goto out; +                        } + +                        ret = pthread_attr_setdetachstate +                                
(&attr, PTHREAD_CREATE_DETACHED); +                        if (ret != 0) { +                                gf_log (this->name, GF_LOG_ERROR, +                                        "unable to sets the detach" +                                        " state attribute, reason(%s)", +                                        strerror (ret)); +                                ret = -1; +                                goto out; +                        } + +                        /* spawn a thread for background parsing & publishing */ +                        ret = pthread_create (&consume_th, &attr, +                                              gf_history_consume, hist_data); +                        if (ret) { +                                gf_log (this->name, GF_LOG_ERROR, +                                        "creation of consume parent-thread" +                                        " failed. reason(%s)", strerror (ret)); +                                ret = -1; +                                goto out; +                        } + +                        goto out; + +                } /* end of range check */ + +        } /* end of readdir() */ + +        if (!from || !to) +                ret = -1; + +out: +        if (dirp != NULL) +                closedir (dirp); + +        if (ret < 0) { +                if (fd != -1) +                        close (fd); +                GF_FREE (hist_data); +                (void) pthread_attr_destroy (&attr); + +                return ret; +        } + +        hist_gfc->hist_done = 1; +        *actual_end = ts2; + +        return ret; +}  | 
