diff options
| author | Gluster Ant <bugzilla-bot@gluster.org> | 2018-09-12 17:52:45 +0530 | 
|---|---|---|
| committer | Nigel Babu <nigelb@redhat.com> | 2018-09-12 17:52:45 +0530 | 
| commit | e16868dede6455cab644805af6fe1ac312775e13 (patch) | |
| tree | 15aebdb4fff2d87cf8a72f836816b3aa634da58d /xlators/features/changelog/lib | |
| parent | 45a71c0548b6fd2c757aa2e7b7671a1411948894 (diff) | |
Land part 2 of clang-format changes
Change-Id: Ia84cc24c8924e6d22d02ac15f611c10e26db99b4
Signed-off-by: Nigel Babu <nigelb@redhat.com>
Diffstat (limited to 'xlators/features/changelog/lib')
10 files changed, 2766 insertions, 2841 deletions
diff --git a/xlators/features/changelog/lib/examples/c/get-changes-multi.c b/xlators/features/changelog/lib/examples/c/get-changes-multi.c index 3741bdf6edc..5ea5bbb6630 100644 --- a/xlators/features/changelog/lib/examples/c/get-changes-multi.c +++ b/xlators/features/changelog/lib/examples/c/get-changes-multi.c @@ -25,64 +25,66 @@  #include "changelog.h" -void *brick_init (void *xl, struct gf_brick_spec *brick) +void * +brick_init(void *xl, struct gf_brick_spec *brick)  { -        return brick; +    return brick;  } -void brick_fini (void *xl, char *brick, void *data) +void +brick_fini(void *xl, char *brick, void *data)  { -        return; +    return;  } -void brick_callback (void *xl, char *brick, -                    void *data, changelog_event_t *ev) +void +brick_callback(void *xl, char *brick, void *data, changelog_event_t *ev)  { -        printf ("->callback: (brick,type) [%s:%d]\n", brick, ev->ev_type); +    printf("->callback: (brick,type) [%s:%d]\n", brick, ev->ev_type);  } -void fill_brick_spec (struct gf_brick_spec *brick, char *path) +void +fill_brick_spec(struct gf_brick_spec *brick, char *path)  { -        brick->brick_path = strdup (path); -        brick->filter = CHANGELOG_OP_TYPE_BR_RELEASE; - -        brick->init         = brick_init; -        brick->fini         = brick_fini; -        brick->callback     = brick_callback; -        brick->connected    = NULL; -        brick->disconnected = NULL; +    brick->brick_path = strdup(path); +    brick->filter = CHANGELOG_OP_TYPE_BR_RELEASE; + +    brick->init = brick_init; +    brick->fini = brick_fini; +    brick->callback = brick_callback; +    brick->connected = NULL; +    brick->disconnected = NULL;  }  int -main (int argc, char **argv) +main(int argc, char **argv)  { -        int ret = 0; -        void *bricks = NULL; -        struct gf_brick_spec *brick = NULL; +    int ret = 0; +    void *bricks = NULL; +    struct gf_brick_spec *brick = NULL; -        bricks = calloc (2, sizeof (struct gf_brick_spec)); -        if (!bricks) -                goto error_return; +    bricks = calloc(2, sizeof(struct gf_brick_spec)); +    if (!bricks) +        goto error_return; -        brick = (struct gf_brick_spec *)bricks; -        fill_brick_spec (brick, "/export/z1/zwoop"); +    brick = (struct gf_brick_spec *)bricks; +    fill_brick_spec(brick, "/export/z1/zwoop"); -        brick++; -        fill_brick_spec (brick, "/export/z2/zwoop"); +    brick++; +    fill_brick_spec(brick, "/export/z2/zwoop"); -        ret = gf_changelog_init (NULL); -        if (ret) -                goto error_return; +    ret = gf_changelog_init(NULL); +    if (ret) +        goto error_return; -        ret = gf_changelog_register_generic ((struct gf_brick_spec *)bricks, 2, -                                             0, "/tmp/multi-changes.log", 9, -                                             NULL); -        if (ret) -                goto error_return; +    ret = gf_changelog_register_generic((struct gf_brick_spec *)bricks, 2, 0, +                                        "/tmp/multi-changes.log", 9, NULL); +    if (ret) +        goto error_return; -        /* let callbacks do the job */ -        select (0, NULL, NULL, NULL, NULL); +    /* let callbacks do the job */ +    select(0, NULL, NULL, NULL, NULL); - error_return: -        return -1; +error_return: +    return -1;  } diff --git a/xlators/features/changelog/lib/examples/c/get-changes.c b/xlators/features/changelog/lib/examples/c/get-changes.c index ef766c566b6..8bc651c24a4 100644 --- a/xlators/features/changelog/lib/examples/c/get-changes.c +++ b/xlators/features/changelog/lib/examples/c/get-changes.c @@ -27,67 +27,67 @@  #include "changelog.h" -#define handle_error(fn)                                \ -        printf ("%s (reason: %s)\n", fn, strerror (errno)) +#define handle_error(fn) printf("%s (reason: %s)\n", fn, strerror(errno))  int -main (int argc, char ** argv) +main(int argc, char **argv)  { -        int     i           = 0; -        int     ret         = 0; -        ssize_t nr_changes  = 0; -        ssize_t changes     = 0; -        char fbuf[PATH_MAX] = {0,}; - -        ret = gf_changelog_init (NULL); -        if (ret) { -                handle_error ("Init failed"); -                goto out; +    int i = 0; +    int ret = 0; +    ssize_t nr_changes = 0; +    ssize_t changes = 0; +    char fbuf[PATH_MAX] = { +        0, +    }; + +    ret = gf_changelog_init(NULL); +    if (ret) { +        handle_error("Init failed"); +        goto out; +    } + +    /* get changes for brick "/home/vshankar/export/yow/yow-1" */ +    ret = gf_changelog_register("/export/z1/zwoop", "/tmp/scratch", +                                "/tmp/change.log", 9, 5); +    if (ret) { +        handle_error("register failed"); +        goto out; +    } + +    while (1) { +        i = 0; +        nr_changes = gf_changelog_scan(); +        if (nr_changes < 0) { +            handle_error("scan(): "); +            break;          } -        /* get changes for brick "/home/vshankar/export/yow/yow-1" */ -        ret = gf_changelog_register ("/export/z1/zwoop", -                                     "/tmp/scratch", "/tmp/change.log", 9, 5); -        if (ret) { -                handle_error ("register failed"); -                goto out; -        } - -        while (1) { -                i = 0; -                nr_changes = gf_changelog_scan (); -                if (nr_changes < 0) { -                        handle_error ("scan(): "); -                        break; -                } - -                if (nr_changes == 0) -                        goto next; +        if (nr_changes == 0) +            goto next; -                printf ("Got %ld changelog files\n", nr_changes); +        printf("Got %ld changelog files\n", nr_changes); -                while ( (changes = -                         gf_changelog_next_change (fbuf, PATH_MAX)) > 0) { -                        printf ("changelog file [%d]: %s\n", ++i, fbuf); +        while ((changes = gf_changelog_next_change(fbuf, PATH_MAX)) > 0) { +            printf("changelog file [%d]: %s\n", ++i, fbuf); -                        /* process changelog */ -                        /* ... */ -                        /* ... */ -                        /* ... */ -                        /* done processing */ +            /* process changelog */ +            /* ... */ +            /* ... */ +            /* ... */ +            /* done processing */ -                        ret = gf_changelog_done (fbuf); -                        if (ret) -                                handle_error ("gf_changelog_done"); -                } +            ret = gf_changelog_done(fbuf); +            if (ret) +                handle_error("gf_changelog_done"); +        } -                if (changes == -1) -                        handle_error ("gf_changelog_next_change"); +        if (changes == -1) +            handle_error("gf_changelog_next_change"); -        next: -                sleep (10); -        } +    next: +        sleep(10); +    } - out: -        return ret; +out: +    return ret;  } diff --git a/xlators/features/changelog/lib/examples/c/get-history.c b/xlators/features/changelog/lib/examples/c/get-history.c index ee3ec0ad100..3e888d75ca6 100644 --- a/xlators/features/changelog/lib/examples/c/get-history.c +++ b/xlators/features/changelog/lib/examples/c/get-history.c @@ -27,90 +27,90 @@  #include "changelog.h" -#define handle_error(fn)                                \ -        printf ("%s (reason: %s)\n", fn, strerror (errno)) +#define handle_error(fn) printf("%s (reason: %s)\n", fn, strerror(errno))  int -main (int argc, char ** argv) +main(int argc, char **argv)  { -        int     i            = 0; -        int     ret          = 0; -        ssize_t nr_changes   = 0; -        ssize_t changes      = 0; -        char fbuf[PATH_MAX]  = {0,}; -        unsigned long end_ts = 0; - -        ret = gf_changelog_init (NULL); -        if (ret) { -                handle_error ("init failed"); -                goto out; +    int i = 0; +    int ret = 0; +    ssize_t nr_changes = 0; +    ssize_t changes = 0; +    char fbuf[PATH_MAX] = { +        0, +    }; +    unsigned long end_ts = 0; + +    ret = gf_changelog_init(NULL); +    if (ret) { +        handle_error("init failed"); +        goto out; +    } + +    ret = gf_changelog_register("/export/z1/zwoop", "/tmp/scratch_v1", +                                "/tmp/changes.log", 9, 5); +    if (ret) { +        handle_error("register failed"); +        goto out; +    } + +    int a, b; +    printf("give the two numbers start and end\t"); +    scanf("%d%d", &a, &b); +    ret = gf_history_changelog("/export/z1/zwoop/.glusterfs/changelogs", a, b, +                               3, &end_ts); +    if (ret == -1) { +        printf("history failed"); +        goto out; +    } + +    printf("end time till when changelog available : %d , ret(%d) \t", end_ts, +           ret); +    fflush(stdout); + +    while (1) { +        nr_changes = gf_history_changelog_scan(); +        printf("scanned, nr_changes : %d\n", nr_changes); +        if (nr_changes < 0) { +            handle_error("scan(): "); +            break;          } -        ret = gf_changelog_register ("/export/z1/zwoop", -                                     "/tmp/scratch_v1", "/tmp/changes.log", -                                     9, 5); -        if (ret) { -                handle_error ("register failed"); -                goto out; +        if (nr_changes == 0) { +            printf("done scanning \n"); +            goto out;          } -        int a, b; -        printf ("give the two numbers start and end\t"); -        scanf ("%d%d", &a, &b); -        ret = gf_history_changelog ("/export/z1/zwoop/.glusterfs/changelogs", -                                    a, b, 3, &end_ts); -        if (ret == -1) { -                printf ("history failed"); -                goto out; -        } +        printf("Got %ld changelog files\n", nr_changes); + +        while ((changes = gf_history_changelog_next_change(fbuf, PATH_MAX)) > +               0) { +            printf("changelog file [%d]: %s\n", ++i, fbuf); -        printf ("end time till when changelog available : %d , ret(%d) \t", end_ts, ret); -        fflush(stdout); - -        while (1) { -                nr_changes = gf_history_changelog_scan (); -                printf ("scanned, nr_changes : %d\n",nr_changes); -                if (nr_changes < 0) { -                        handle_error ("scan(): "); -                        break; -                } - -                if (nr_changes == 0) { -                        printf ("done scanning \n"); -                        goto out; -                } - -                printf ("Got %ld changelog files\n", nr_changes); - -                while ( (changes = -                         gf_history_changelog_next_change (fbuf, PATH_MAX)) > 0) { -                        printf ("changelog file [%d]: %s\n", ++i, fbuf); - -                        /* process changelog */ -                        /* ... */ -                        /* ... */ -                        /* ... */ -                        /* done processing */ - -                        ret = gf_history_changelog_done (fbuf); -                        if (ret) -                                handle_error ("gf_changelog_done"); -                } -                /* -                if (changes == -1) -                        handle_error ("gf_changelog_next_change"); -                if (nr_changes ==1){ -                        printf("continue scanning\n"); -                } - -                if(nr_changes == 0){ -                        printf("done scanning \n"); -                        goto out; -                } -                */ +            /* process changelog */ +            /* ... */ +            /* ... */ +            /* ... */ +            /* done processing */ + +            ret = gf_history_changelog_done(fbuf); +            if (ret) +                handle_error("gf_changelog_done"); +        } +        /* +        if (changes == -1) +                handle_error ("gf_changelog_next_change"); +        if (nr_changes ==1){ +                printf("continue scanning\n");          } +        if(nr_changes == 0){ +                printf("done scanning \n"); +                goto out; +        } +        */ +    }  out: -        return ret; +    return ret;  } diff --git a/xlators/features/changelog/lib/src/gf-changelog-api.c b/xlators/features/changelog/lib/src/gf-changelog-api.c index 372550c7acf..1b6e932596d 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-api.c +++ b/xlators/features/changelog/lib/src/gf-changelog-api.c @@ -19,57 +19,54 @@  #include "changelog-lib-messages.h"  int -gf_changelog_done (char *file) +gf_changelog_done(char *file)  { -        int                     ret    = -1; -        char                   *buffer = NULL; -        xlator_t               *this   = NULL; -        gf_changelog_journal_t *jnl    = NULL; -        char to_path[PATH_MAX]         = {0,}; - -        errno = EINVAL; - -        this = THIS; -        if (!this) -                goto out; - -        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); -        if (!jnl) -                goto out; - -        if (!file || !strlen (file)) -                goto out; - -        /* make sure 'file' is inside ->jnl_working_dir */ -        buffer = realpath (file, NULL); -        if (!buffer) -                goto out; - -        if (strncmp (jnl->jnl_working_dir, -                     buffer, strlen (jnl->jnl_working_dir))) -                goto out; - -        (void) snprintf (to_path, PATH_MAX, "%s%s", -                         jnl->jnl_processed_dir, basename (buffer)); -        gf_msg_debug (this->name, 0, -                      "moving %s to processed directory", file); -        ret = sys_rename (buffer, to_path); -        if (ret) { -                gf_smsg (this->name, GF_LOG_ERROR, errno, -                         CHANGELOG_LIB_MSG_RENAME_FAILED, -                         "cannot move changelog file", -                         "from=%s", file, -                         "to=%s", to_path, -                         NULL); -                goto out; -        } - -        ret = 0; +    int ret = -1; +    char *buffer = NULL; +    xlator_t *this = NULL; +    gf_changelog_journal_t *jnl = NULL; +    char to_path[PATH_MAX] = { +        0, +    }; + +    errno = EINVAL; + +    this = THIS; +    if (!this) +        goto out; + +    jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); +    if (!jnl) +        goto out; + +    if (!file || !strlen(file)) +        goto out; + +    /* make sure 'file' is inside ->jnl_working_dir */ +    buffer = realpath(file, NULL); +    if (!buffer) +        goto out; + +    if (strncmp(jnl->jnl_working_dir, buffer, strlen(jnl->jnl_working_dir))) +        goto out; + +    (void)snprintf(to_path, PATH_MAX, "%s%s", jnl->jnl_processed_dir, +                   basename(buffer)); +    gf_msg_debug(this->name, 0, "moving %s to processed directory", file); +    ret = sys_rename(buffer, to_path); +    if (ret) { +        gf_smsg(this->name, GF_LOG_ERROR, errno, +                CHANGELOG_LIB_MSG_RENAME_FAILED, "cannot move changelog file", +                "from=%s", file, "to=%s", to_path, NULL); +        goto out; +    } + +    ret = 0; - out: -        if (buffer) -                free (buffer); /* allocated by realpath() */ -        return ret; +out: +    if (buffer) +        free(buffer); /* allocated by realpath() */ +    return ret;  }  /** @@ -77,28 +74,28 @@ gf_changelog_done (char *file)   *  for a set of changelogs, start from the beginning   */  int -gf_changelog_start_fresh () +gf_changelog_start_fresh()  { -        xlator_t *this = NULL; -        gf_changelog_journal_t *jnl = NULL; +    xlator_t *this = NULL; +    gf_changelog_journal_t *jnl = NULL; -        this = THIS; -        if (!this) -                goto out; +    this = THIS; +    if (!this) +        goto out; -        errno = EINVAL; +    errno = EINVAL; -        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); -        if (!jnl) -                goto out; +    jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); +    if (!jnl) +        goto out; -        if (gf_ftruncate (jnl->jnl_fd, 0)) -                goto out; +    if (gf_ftruncate(jnl->jnl_fd, 0)) +        goto out; -        return 0; +    return 0; - out: -        return -1; +out: +    return -1;  }  /** @@ -107,40 +104,42 @@ gf_changelog_start_fresh ()   * consumed.   */  ssize_t -gf_changelog_next_change (char *bufptr, size_t maxlen) +gf_changelog_next_change(char *bufptr, size_t maxlen)  { -        ssize_t         size       = -1; -        int             tracker_fd = 0; -        xlator_t       *this       = NULL; -        gf_changelog_journal_t *jnl = NULL; -        char buffer[PATH_MAX]      = {0,}; +    ssize_t size = -1; +    int tracker_fd = 0; +    xlator_t *this = NULL; +    gf_changelog_journal_t *jnl = NULL; +    char buffer[PATH_MAX] = { +        0, +    }; -        errno = EINVAL; +    errno = EINVAL; -        this = THIS; -        if (!this) -                goto out; +    this = THIS; +    if (!this) +        goto out; -        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); -        if (!jnl) -                goto out; +    jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); +    if (!jnl) +        goto out; -        tracker_fd = jnl->jnl_fd; +    tracker_fd = jnl->jnl_fd; -        size = gf_readline (tracker_fd, buffer, maxlen); -        if (size < 0) { -                size = -1; -                goto out; -        } +    size = gf_readline(tracker_fd, buffer, maxlen); +    if (size < 0) { +        size = -1; +        goto out; +    } -        if (size == 0) -                goto out; +    if (size == 0) +        goto out; -        memcpy (bufptr, buffer, size - 1); -        bufptr[size - 1] = '\0'; +    memcpy(bufptr, buffer, size - 1); +    bufptr[size - 1] = '\0';  out: -        return size; +    return size;  }  /** @@ -152,70 +151,74 @@ out:   * This call also acts as a cancellation point for the consumer.   */  ssize_t -gf_changelog_scan () +gf_changelog_scan()  { -        int             tracker_fd  = 0; -        size_t          off         = 0; -        xlator_t       *this        = NULL; -        size_t          nr_entries  = 0; -        gf_changelog_journal_t *jnl = NULL; -        struct dirent  *entry       = NULL; -        struct dirent   scratch[2]  = {{0,},}; -        char            buffer[PATH_MAX] = {0,}; - -        this = THIS; -        if (!this) -                goto out; - -        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); -        if (!jnl) -                goto out; -        if (JNL_IS_API_DISCONNECTED (jnl)) { -                errno = ENOTCONN; -                goto out; +    int tracker_fd = 0; +    size_t off = 0; +    xlator_t *this = NULL; +    size_t nr_entries = 0; +    gf_changelog_journal_t *jnl = NULL; +    struct dirent *entry = NULL; +    struct dirent scratch[2] = { +        { +            0, +        }, +    }; +    char buffer[PATH_MAX] = { +        0, +    }; + +    this = THIS; +    if (!this) +        goto out; + +    jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); +    if (!jnl) +        goto out; +    if (JNL_IS_API_DISCONNECTED(jnl)) { +        errno = ENOTCONN; +        goto out; +    } + +    errno = EINVAL; + +    tracker_fd = jnl->jnl_fd; +    if (gf_ftruncate(tracker_fd, 0)) +        goto out; + +    rewinddir(jnl->jnl_dir); + +    for (;;) { +        errno = 0; +        entry = sys_readdir(jnl->jnl_dir, scratch); +        if (!entry || errno != 0) +            break; + +        if (!strcmp(basename(entry->d_name), ".") || +            !strcmp(basename(entry->d_name), "..")) +            continue; + +        nr_entries++; + +        GF_CHANGELOG_FILL_BUFFER(jnl->jnl_processing_dir, buffer, off, +                                 strlen(jnl->jnl_processing_dir)); +        GF_CHANGELOG_FILL_BUFFER(entry->d_name, buffer, off, +                                 strlen(entry->d_name)); +        GF_CHANGELOG_FILL_BUFFER("\n", buffer, off, 1); + +        if (gf_changelog_write(tracker_fd, buffer, off) != off) { +            gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_WRITE_FAILED, +                   "error writing changelog filename" +                   " to tracker file"); +            break;          } +        off = 0; +    } -        errno = EINVAL; - -        tracker_fd = jnl->jnl_fd; -        if (gf_ftruncate (tracker_fd, 0)) -                goto out; - -        rewinddir (jnl->jnl_dir); - -        for (;;) { -                errno = 0; -                entry = sys_readdir (jnl->jnl_dir, scratch); -                if (!entry || errno != 0) -                        break; - -                if (!strcmp (basename (entry->d_name), ".") -                     || !strcmp (basename (entry->d_name), "..")) -                        continue; - -                nr_entries++; - -                GF_CHANGELOG_FILL_BUFFER (jnl->jnl_processing_dir, -                                          buffer, off, -                                          strlen (jnl->jnl_processing_dir)); -                GF_CHANGELOG_FILL_BUFFER (entry->d_name, buffer, -                                          off, strlen (entry->d_name)); -                GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1); - -                if (gf_changelog_write (tracker_fd, buffer, off) != off) { -                        gf_msg (this->name, GF_LOG_ERROR, 0, -                                CHANGELOG_LIB_MSG_WRITE_FAILED, -                                "error writing changelog filename" -                                " to tracker file"); -                        break; -                } -                off = 0; -        } - -        if (!entry) { -                if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) -                        return nr_entries; -        } - out: -        return -1; +    if (!entry) { +        if (gf_lseek(tracker_fd, 0, SEEK_SET) != -1) +            return nr_entries; +    } +out: +    return -1;  } diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.c b/xlators/features/changelog/lib/src/gf-changelog-helpers.c index 9ff1d135933..fd15ec68ab8 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-helpers.c +++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.c @@ -13,38 +13,40 @@  #include "changelog-lib-messages.h"  #include "syscall.h" -ssize_t gf_changelog_read_path (int fd, char *buffer, size_t bufsize) +ssize_t +gf_changelog_read_path(int fd, char *buffer, size_t bufsize)  { -        return sys_read (fd, buffer, bufsize); +    return sys_read(fd, buffer, bufsize);  }  size_t -gf_changelog_write (int fd, char *buffer, size_t len) +gf_changelog_write(int fd, char *buffer, size_t len)  { -        ssize_t size = 0; -        size_t written = 0; +    ssize_t size = 0; +    size_t written = 0; -        while (written < len) { -                size = sys_write (fd, buffer + written, len - written); -                if (size <= 0) -                        break; +    while (written < len) { +        size = sys_write(fd, buffer + written, len - written); +        if (size <= 0) +            break; -                written += size; -        } +        written += size; +    } -        return written; +    return written;  }  void -gf_rfc3986_encode_space_newline (unsigned char *s, char *enc, char *estr) +gf_rfc3986_encode_space_newline(unsigned char *s, char *enc, char *estr)  { -        for (; *s; s++) { -                if (estr[*s]) -                        sprintf(enc, "%c", estr[*s]); -                else -                        sprintf(enc, "%%%02X", *s); -                while (*++enc); -        } +    for (; *s; s++) { +        if (estr[*s]) +            sprintf(enc, "%c", estr[*s]); +        else +            sprintf(enc, "%%%02X", *s); +        while (*++enc) +            ; +    }  }  /** @@ -66,154 +68,152 @@ static pthread_key_t rl_key;  static pthread_once_t rl_once = PTHREAD_ONCE_INIT;  static void -readline_destructor (void *ptr) +readline_destructor(void *ptr)  { -        GF_FREE (ptr); +    GF_FREE(ptr);  }  static void -readline_once (void) +readline_once(void)  { -        pthread_key_create (&rl_key, readline_destructor); +    pthread_key_create(&rl_key, readline_destructor);  }  static ssize_t -my_read (read_line_t *tsd, int fd, char *ptr) +my_read(read_line_t *tsd, int fd, char *ptr)  { -        if (tsd->rl_cnt <= 0) { -                tsd->rl_cnt = sys_read (fd, tsd->rl_buf, MAXLINE); - -                if (tsd->rl_cnt < 0) -                        return -1; -                else if (tsd->rl_cnt == 0) -                        return 0; -                tsd->rl_bufptr = tsd->rl_buf; -        } - -        tsd->rl_cnt--; -        *ptr = *tsd->rl_bufptr++; -        return 1; +    if (tsd->rl_cnt <= 0) { +        tsd->rl_cnt = sys_read(fd, tsd->rl_buf, MAXLINE); + +        if (tsd->rl_cnt < 0) +            return -1; +        else if (tsd->rl_cnt == 0) +            return 0; +        tsd->rl_bufptr = tsd->rl_buf; +    } + +    tsd->rl_cnt--; +    *ptr = *tsd->rl_bufptr++; +    return 1;  }  static int -gf_readline_init_once (read_line_t **tsd) +gf_readline_init_once(read_line_t **tsd)  { -        if (pthread_once (&rl_once, readline_once) != 0) -                return -1; +    if (pthread_once(&rl_once, readline_once) != 0) +        return -1; -        *tsd = pthread_getspecific (rl_key); -        if (*tsd) -                goto out; +    *tsd = pthread_getspecific(rl_key); +    if (*tsd) +        goto out; -        *tsd = GF_CALLOC (1, sizeof (**tsd), -                          gf_changelog_mt_libgfchangelog_rl_t); -        if (!*tsd) -                return -1; +    *tsd = GF_CALLOC(1, sizeof(**tsd), gf_changelog_mt_libgfchangelog_rl_t); +    if (!*tsd) +        return -1; -        if (pthread_setspecific (rl_key, *tsd) != 0) -                return -1; +    if (pthread_setspecific(rl_key, *tsd) != 0) +        return -1; - out: -        return 0; +out: +    return 0;  }  ssize_t -gf_readline (int fd, void *vptr, size_t maxlen) +gf_readline(int fd, void *vptr, size_t maxlen)  { -        size_t       n   = 0; -        size_t       rc  = 0; -        char         c   = ' '; -        char        *ptr = NULL; -        read_line_t *tsd = NULL; - -        if (gf_readline_init_once (&tsd)) -                return -1; - -        ptr = vptr; -        for (n = 1; n < maxlen; n++) { -                if ( (rc = my_read (tsd, fd, &c)) == 1 ) { -                        *ptr++ = c; -                        if (c == '\n') -                                break; -                } else if (rc == 0) { -                        *ptr = '\0'; -                        return (n - 1); -                } else -                        return -1; -        } - -        *ptr = '\0'; -        return n; +    size_t n = 0; +    size_t rc = 0; +    char c = ' '; +    char *ptr = NULL; +    read_line_t *tsd = NULL; + +    if (gf_readline_init_once(&tsd)) +        return -1; +    ptr = vptr; +    for (n = 1; n < maxlen; n++) { +        if ((rc = my_read(tsd, fd, &c)) == 1) { +            *ptr++ = c; +            if (c == '\n') +                break; +        } else if (rc == 0) { +            *ptr = '\0'; +            return (n - 1); +        } else +            return -1; +    } + +    *ptr = '\0'; +    return n;  }  off_t -gf_lseek (int fd, off_t offset, int whence) +gf_lseek(int fd, off_t offset, int whence)  { -        off_t        off = 0; -        read_line_t *tsd = NULL; +    off_t off = 0; +    read_line_t *tsd = NULL; -        if (gf_readline_init_once (&tsd)) -                return -1; +    if (gf_readline_init_once(&tsd)) +        return -1; -        off = sys_lseek (fd, offset, whence); -        if (off == -1) -                return -1; +    off = sys_lseek(fd, offset, whence); +    if (off == -1) +        return -1; -        tsd->rl_cnt = 0; -        tsd->rl_bufptr = tsd->rl_buf; +    tsd->rl_cnt = 0; +    tsd->rl_bufptr = tsd->rl_buf; -        return off; +    return off;  }  int -gf_ftruncate (int fd, off_t length) +gf_ftruncate(int fd, off_t length)  { -        read_line_t *tsd = NULL; +    read_line_t *tsd = NULL; -        if (gf_readline_init_once (&tsd)) -                return -1; +    if (gf_readline_init_once(&tsd)) +        return -1; -        if (sys_ftruncate (fd, 0)) -                return -1; +    if (sys_ftruncate(fd, 0)) +        return -1; -        tsd->rl_cnt = 0; -        tsd->rl_bufptr = tsd->rl_buf; +    tsd->rl_cnt = 0; +    tsd->rl_bufptr = tsd->rl_buf; -        return 0; +    return 0;  }  int -gf_thread_cleanup (xlator_t *this, pthread_t thread) +gf_thread_cleanup(xlator_t *this, pthread_t thread)  { -        int ret = 0; -        void *res = NULL; - -        ret = pthread_cancel (thread); -        if (ret != 0) { -                gf_msg (this->name, GF_LOG_WARNING, 0, -                        CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING, -                        "Failed to send cancellation to thread"); -                goto error_return; -        } - -        ret = pthread_join (thread, &res); -        if (ret != 0) { -                gf_msg (this->name, GF_LOG_WARNING, 0, -                        CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING, -                        "failed to join thread"); -                goto error_return; -        } - -        if (res != PTHREAD_CANCELED) { -                gf_msg (this->name, GF_LOG_WARNING, 0, -                        CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING, -                        "Thread could not be cleaned up"); -                goto error_return; -        } - -        return 0; - - error_return: -        return -1; +    int ret = 0; +    void *res = NULL; + +    ret = pthread_cancel(thread); +    if (ret != 0) { +        gf_msg(this->name, GF_LOG_WARNING, 0, +               CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING, +               "Failed to send cancellation to thread"); +        goto error_return; +    } + +    ret = pthread_join(thread, &res); +    if (ret != 0) { +        gf_msg(this->name, GF_LOG_WARNING, 0, +               CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING, +               "failed to join thread"); +        goto error_return; +    } + +    if (res != PTHREAD_CANCELED) { +        gf_msg(this->name, GF_LOG_WARNING, 0, +               CHANGELOG_LIB_MSG_THREAD_CLEANUP_WARNING, +               "Thread could not be cleaned up"); +        goto error_return; +    } + +    return 0; + +error_return: +    return -1;  } diff --git a/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c b/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c index bdb410030f6..ef46bf50c97 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c +++ b/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c @@ -25,112 +25,107 @@  extern int byebye; -enum changelog_versions { -    VERSION_1_1 = 0, -    VERSION_1_2 = 1 -}; +enum changelog_versions { VERSION_1_1 = 0, VERSION_1_2 = 1 };  /**   * number of gfid records after fop number   */ -int nr_gfids[2][GF_FOP_MAXVALUE] = { -    { -        [GF_FOP_MKNOD]   = 1, -        [GF_FOP_MKDIR]   = 1, -        [GF_FOP_UNLINK]  = 1, -        [GF_FOP_RMDIR]   = 1, -        [GF_FOP_SYMLINK] = 1, -        [GF_FOP_RENAME]  = 2, -        [GF_FOP_LINK]    = 1, -        [GF_FOP_CREATE]  = 1, -    }, -    { -        [GF_FOP_MKNOD]   = 1, -        [GF_FOP_MKDIR]   = 1, -        [GF_FOP_UNLINK]  = 2, -        [GF_FOP_RMDIR]   = 2, -        [GF_FOP_SYMLINK] = 1, -        [GF_FOP_RENAME]  = 2, -        [GF_FOP_LINK]    = 1, -        [GF_FOP_CREATE]  = 1, -    } -}; - -int nr_extra_recs[2][GF_FOP_MAXVALUE] = { -    { -        [GF_FOP_MKNOD]   = 3, -        [GF_FOP_MKDIR]   = 3, -        [GF_FOP_UNLINK]  = 0, -        [GF_FOP_RMDIR]   = 0, -        [GF_FOP_SYMLINK] = 0, -        [GF_FOP_RENAME]  = 0, -        [GF_FOP_LINK]    = 0, -        [GF_FOP_CREATE]  = 3, -    }, -    { -        [GF_FOP_MKNOD]   = 3, -        [GF_FOP_MKDIR]   = 3, -        [GF_FOP_UNLINK]  = 0, -        [GF_FOP_RMDIR]   = 0, -        [GF_FOP_SYMLINK] = 0, -        [GF_FOP_RENAME]  = 0, -        [GF_FOP_LINK]    = 0, -        [GF_FOP_CREATE]  = 3, -    } -}; +int nr_gfids[2][GF_FOP_MAXVALUE] = {{ +                                        [GF_FOP_MKNOD] = 1, +                                        [GF_FOP_MKDIR] = 1, +                                        [GF_FOP_UNLINK] = 1, +                                        [GF_FOP_RMDIR] = 1, +                                        [GF_FOP_SYMLINK] = 1, +                                        [GF_FOP_RENAME] = 2, +                                        [GF_FOP_LINK] = 1, +                                        [GF_FOP_CREATE] = 1, +                                    }, +                                    { +                                        [GF_FOP_MKNOD] = 1, +                                        [GF_FOP_MKDIR] = 1, +                                        [GF_FOP_UNLINK] = 2, +                                        [GF_FOP_RMDIR] = 2, +                                        [GF_FOP_SYMLINK] = 1, +                                        [GF_FOP_RENAME] = 2, +                                        [GF_FOP_LINK] = 1, +                                        [GF_FOP_CREATE] = 1, +                                    }}; + +int nr_extra_recs[2][GF_FOP_MAXVALUE] = {{ +                                             [GF_FOP_MKNOD] = 3, +                                             [GF_FOP_MKDIR] = 3, +                                             [GF_FOP_UNLINK] = 0, +                                             [GF_FOP_RMDIR] = 0, +                                             [GF_FOP_SYMLINK] = 0, +                                             [GF_FOP_RENAME] = 0, +                                             [GF_FOP_LINK] = 0, +                                             [GF_FOP_CREATE] = 3, +                                         }, +                                         { +                                             [GF_FOP_MKNOD] = 3, +                                             [GF_FOP_MKDIR] = 3, +                                             [GF_FOP_UNLINK] = 0, +                                             [GF_FOP_RMDIR] = 0, +                                             [GF_FOP_SYMLINK] = 0, +                                             [GF_FOP_RENAME] = 0, +                                             [GF_FOP_LINK] = 0, +                                             [GF_FOP_CREATE] = 3, +                                         }};  static char * -binary_to_ascii (uuid_t uuid) +binary_to_ascii(uuid_t uuid)  { -        return uuid_utoa (uuid); +    return uuid_utoa(uuid);  }  static char * -conv_noop (char *ptr) { return ptr; } - -#define VERIFY_SEPARATOR(ptr, plen, perr)       \ -        {                                       \ -                if (*(ptr + plen) != '\0') {    \ -                        perr = 1;               \ -                        break;                  \ -                }                               \ -        } +conv_noop(char *ptr) +{ +    return ptr; +} -#define MOVER_MOVE(mover, nleft, bytes)         \ -        {                                       \ -                mover += bytes;                 \ -                nleft -= bytes;                 \ -        }                                       \ - -#define PARSE_GFID(mov, ptr, le, fn, perr)                      \ -        {                                                       \ -                VERIFY_SEPARATOR (mov, le, perr);               \ -                ptr = fn (mov);                                 \ -                if (!ptr) {                                     \ -                        perr = 1;                               \ -                        break;                                  \ -                }                                               \ -        } +#define VERIFY_SEPARATOR(ptr, plen, perr)                                      \ +    {                                                                          \ +        if (*(ptr + plen) != '\0') {                                           \ +            perr = 1;                                                          \ +            break;                                                             \ +        }                                                                      \ +    } -#define FILL_AND_MOVE(pt, buf, of, mo, nl, le)                          \ -        {                                                               \ -                GF_CHANGELOG_FILL_BUFFER (pt, buf, of, strlen (pt));    \ -                MOVER_MOVE (mo, nl, le);                                \ -        } +#define MOVER_MOVE(mover, nleft, bytes)                                        \ +    {                                                                          \ +        mover += bytes;                                                        \ +        nleft -= bytes;                                                        \ +    } + +#define PARSE_GFID(mov, ptr, le, fn, perr)                                     \ +    {                                                                          \ +        VERIFY_SEPARATOR(mov, le, perr);                                       \ +        ptr = fn(mov);                                                         \ +        if (!ptr) {                                                            \ +            perr = 1;                                                          \ +            break;                                                             \ +        }                                                                      \ +    } +#define FILL_AND_MOVE(pt, buf, of, mo, nl, le)                                 \ +    {                                                                          \ +        GF_CHANGELOG_FILL_BUFFER(pt, buf, of, strlen(pt));                     \ +        MOVER_MOVE(mo, nl, le);                                                \ +    } -#define PARSE_GFID_MOVE(ptr, uuid, mover, nleft, perr)          \ -        {                                                       \ -                memcpy (uuid, mover, sizeof (uuid_t));          \ -                ptr = binary_to_ascii (uuid);                   \ -                if (!ptr) {                                     \ -                        perr = 1;                               \ -                        break;                                  \ -                }                                               \ -                MOVER_MOVE (mover, nleft, sizeof (uuid_t));     \ -        }                                                       \ +#define PARSE_GFID_MOVE(ptr, uuid, mover, nleft, perr)                         \ +    {                                                                          \ +        memcpy(uuid, mover, sizeof(uuid_t));                                   \ +        ptr = binary_to_ascii(uuid);                                           \ +        if (!ptr) {                                                            \ +            perr = 1;                                                          \ +            break;                                                             \ +        }                                                                      \ +        MOVER_MOVE(mover, nleft, sizeof(uuid_t));                              \ +    } -#define LINE_BUFSIZE  (3*PATH_MAX) /* enough buffer for extra chars too */ +#define LINE_BUFSIZE (3 * PATH_MAX) /* enough buffer for extra chars too */  /**   * using mmap() makes parsing easy. fgets() cannot be used here as @@ -145,111 +140,107 @@ conv_noop (char *ptr) { return ptr; }   */  static int -gf_changelog_parse_binary (xlator_t *this, -                           gf_changelog_journal_t *jnl, -                           int from_fd, int to_fd, -                           size_t start_offset, struct stat *stbuf, -                           int version_idx) +gf_changelog_parse_binary(xlator_t *this, gf_changelog_journal_t *jnl, +                          int from_fd, int to_fd, size_t start_offset, +                          struct stat *stbuf, int version_idx)  { -        int     ret              = -1; -        off_t   off              = 0; -        off_t   nleft            = 0; -        uuid_t  uuid             = {0,}; -        char   *ptr              = NULL; -        char   *bname_start      = NULL; -        char   *bname_end        = NULL; -        char   *mover            = NULL; -        void   *start            = NULL; -        char    current_mover    = ' '; -        size_t  blen             = 0; -        int     parse_err        = 0; -        char   *ascii            = NULL; - -        ascii = GF_CALLOC (LINE_BUFSIZE, sizeof(char), gf_common_mt_char); - -        nleft = stbuf->st_size; - -        start = mmap (NULL, nleft, PROT_READ, MAP_PRIVATE, from_fd, 0); -        if (start == MAP_FAILED) { -                gf_msg (this->name, GF_LOG_ERROR, errno, -                        CHANGELOG_LIB_MSG_MMAP_FAILED, -                        "mmap() error"); -                goto out; -        } - -        mover = start; +    int ret = -1; +    off_t off = 0; +    off_t nleft = 0; +    uuid_t uuid = { +        0, +    }; +    char *ptr = NULL; +    char *bname_start = NULL; +    char *bname_end = NULL; +    char *mover = NULL; +    void *start = NULL; +    char current_mover = ' '; +    size_t blen = 0; +    int parse_err = 0; +    char *ascii = NULL; + +    ascii = GF_CALLOC(LINE_BUFSIZE, sizeof(char), gf_common_mt_char); + +    nleft = stbuf->st_size; + +    start = mmap(NULL, nleft, PROT_READ, MAP_PRIVATE, from_fd, 0); +    if (start == MAP_FAILED) { +        gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_MMAP_FAILED, +               "mmap() error"); +        goto out; +    } -        MOVER_MOVE (mover, nleft, start_offset); +    mover = start; -        while (nleft > 0) { +    MOVER_MOVE(mover, nleft, start_offset); -                off = blen = 0; -                ptr = bname_start = bname_end = NULL; +    while (nleft > 0) { +        off = blen = 0; +        ptr = bname_start = bname_end = NULL; -                current_mover = *mover; +        current_mover = *mover; -                switch (current_mover) { -                case 'D': -                case 'M': -                        MOVER_MOVE (mover, nleft, 1); -                        PARSE_GFID_MOVE (ptr, uuid, mover, nleft, parse_err); +        switch (current_mover) { +            case 'D': +            case 'M': +                MOVER_MOVE(mover, nleft, 1); +                PARSE_GFID_MOVE(ptr, uuid, mover, nleft, parse_err); -                        break; +                break; -                case 'E': -                        MOVER_MOVE (mover, nleft, 1); -                        PARSE_GFID_MOVE (ptr, uuid, mover, nleft, parse_err); +            case 'E': +                MOVER_MOVE(mover, nleft, 1); +                PARSE_GFID_MOVE(ptr, uuid, mover, nleft, parse_err); -                        bname_start = mover; -                        bname_end = strchr (mover, '\n'); -                        if (bname_end == NULL) { -                                parse_err = 1; -                                break; -                        } +                bname_start = mover; +                bname_end = strchr(mover, '\n'); +                if (bname_end == NULL) { +                    parse_err = 1; +                    break; +                } -                        blen = bname_end - bname_start; -                        MOVER_MOVE (mover, nleft, blen); +                blen = bname_end - bname_start; +                MOVER_MOVE(mover, nleft, blen); -                        break; +                break; -                default: -                        parse_err = 1; -                } +            default: +                parse_err = 1; +        } -                if (parse_err) -                        break; +        if (parse_err) +            break; + +        GF_CHANGELOG_FILL_BUFFER(¤t_mover, ascii, off, 1); +        GF_CHANGELOG_FILL_BUFFER(" ", ascii, off, 1); +        GF_CHANGELOG_FILL_BUFFER(ptr, ascii, off, strlen(ptr)); +        if (blen) +            GF_CHANGELOG_FILL_BUFFER(bname_start, ascii, off, blen); +        GF_CHANGELOG_FILL_BUFFER("\n", ascii, off, 1); + +        if (gf_changelog_write(to_fd, ascii, off) != off) { +            gf_msg(this->name, GF_LOG_ERROR, errno, +                   CHANGELOG_LIB_MSG_ASCII_ERROR, +                   "processing binary changelog failed due to " +                   " error in writing ascii change"); +            break; +        } -                GF_CHANGELOG_FILL_BUFFER (¤t_mover, ascii, off, 1); -                GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1); -                GF_CHANGELOG_FILL_BUFFER (ptr, ascii, off, strlen (ptr)); -                if (blen) -                        GF_CHANGELOG_FILL_BUFFER (bname_start, -                                                  ascii, off, blen); -                GF_CHANGELOG_FILL_BUFFER ("\n", ascii, off, 1); - -                if (gf_changelog_write (to_fd, ascii, off) != off) { -                        gf_msg (this->name, GF_LOG_ERROR, errno, -                                CHANGELOG_LIB_MSG_ASCII_ERROR, -                                "processing binary changelog failed due to " -                                " error in writing ascii change"); -                        break; -                } +        MOVER_MOVE(mover, nleft, 1); +    } -                MOVER_MOVE (mover, nleft, 1); -        } +    if ((nleft == 0) && (!parse_err)) +        ret = 0; -        if ((nleft == 0) && (!parse_err)) -                ret = 0; - -        if (munmap (start, stbuf->st_size)) -                gf_msg (this->name, GF_LOG_ERROR, errno, -                        CHANGELOG_LIB_MSG_MUNMAP_FAILED, -                        "munmap() error"); - out: -        if (ascii) -                GF_FREE (ascii); -        return ret; +    if (munmap(start, stbuf->st_size)) +        gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_MUNMAP_FAILED, +               "munmap() error"); +out: +    if (ascii) +        GF_FREE(ascii); +    return ret;  }  /** @@ -258,804 +249,784 @@ gf_changelog_parse_binary (xlator_t *this,   *  - use fop name rather than fop number   */  static int -gf_changelog_parse_ascii (xlator_t *this, -                          gf_changelog_journal_t *jnl, -                          int from_fd, int to_fd, -                          size_t start_offset, struct stat *stbuf, -                          int version_idx) +gf_changelog_parse_ascii(xlator_t *this, gf_changelog_journal_t *jnl, +                         int from_fd, int to_fd, size_t start_offset, +                         struct stat *stbuf, int version_idx)  { -        int           ng            = 0; -        int           ret           = -1; -        int           fop           = 0; -        int           len           = 0; -        off_t         off           = 0; -        off_t         nleft         = 0; -        char         *ptr           = NULL; -        char         *eptr          = NULL; -        void         *start         = NULL; -        char         *mover         = NULL; -        int           parse_err     = 0; -        char          current_mover = ' '; -        char         *ascii         = NULL; -        const char   *fopname       = NULL; - -        ascii = GF_CALLOC (LINE_BUFSIZE, sizeof(char), gf_common_mt_char); - -        nleft = stbuf->st_size; - -        start = mmap (NULL, nleft, PROT_READ, MAP_PRIVATE, from_fd, 0); -        if (start == MAP_FAILED) { -                gf_msg (this->name, GF_LOG_ERROR, errno, -                        CHANGELOG_LIB_MSG_MMAP_FAILED, -                        "mmap() error"); -                goto out; -        } +    int ng = 0; +    int ret = -1; +    int fop = 0; +    int len = 0; +    off_t off = 0; +    off_t nleft = 0; +    char *ptr = NULL; +    char *eptr = NULL; +    void *start = NULL; +    char *mover = NULL; +    int parse_err = 0; +    char current_mover = ' '; +    char *ascii = NULL; +    const char *fopname = NULL; + +    ascii = GF_CALLOC(LINE_BUFSIZE, sizeof(char), gf_common_mt_char); + +    nleft = stbuf->st_size; + +    start = mmap(NULL, nleft, PROT_READ, MAP_PRIVATE, from_fd, 0); +    if (start == MAP_FAILED) { +        gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_MMAP_FAILED, +               "mmap() error"); +        goto out; +    } -        mover = start; +    mover = start; -        MOVER_MOVE (mover, nleft, start_offset); +    MOVER_MOVE(mover, nleft, start_offset); -        while (nleft > 0) { -                off = 0; -                current_mover = *mover; +    while (nleft > 0) { +        off = 0; +        current_mover = *mover; -                GF_CHANGELOG_FILL_BUFFER (¤t_mover, ascii, off, 1); -                GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1); +        GF_CHANGELOG_FILL_BUFFER(¤t_mover, ascii, off, 1); +        GF_CHANGELOG_FILL_BUFFER(" ", ascii, off, 1); -                switch (current_mover) { -                case 'D': -                        MOVER_MOVE (mover, nleft, 1); +        switch (current_mover) { +            case 'D': +                MOVER_MOVE(mover, nleft, 1); -                        /* target gfid */ -                        PARSE_GFID (mover, ptr, UUID_CANONICAL_FORM_LEN, -                                    conv_noop, parse_err); -                        FILL_AND_MOVE(ptr, ascii, off, -                                      mover, nleft, UUID_CANONICAL_FORM_LEN); -                        break; -                case 'M': -                        MOVER_MOVE (mover, nleft, 1); +                /* target gfid */ +                PARSE_GFID(mover, ptr, UUID_CANONICAL_FORM_LEN, conv_noop, +                           parse_err); +                FILL_AND_MOVE(ptr, ascii, off, mover, nleft, +                              UUID_CANONICAL_FORM_LEN); +                break; +            case 'M': +                MOVER_MOVE(mover, nleft, 1); + +                /* target gfid */ +                PARSE_GFID(mover, ptr, UUID_CANONICAL_FORM_LEN, conv_noop, +                           parse_err); +                FILL_AND_MOVE(ptr, ascii, off, mover, nleft, +                              UUID_CANONICAL_FORM_LEN); +                FILL_AND_MOVE(" ", ascii, off, mover, nleft, 1); + +                /* fop */ +                len = strlen(mover); +                VERIFY_SEPARATOR(mover, len, parse_err); + +                fop = atoi(mover); +                fopname = gf_fop_list[fop]; +                if (fopname == NULL) { +                    parse_err = 1; +                    break; +                } -                        /* target gfid */ -                        PARSE_GFID (mover, ptr, UUID_CANONICAL_FORM_LEN, -                                    conv_noop, parse_err); -                        FILL_AND_MOVE (ptr, ascii, off, -                                       mover, nleft, UUID_CANONICAL_FORM_LEN); -                        FILL_AND_MOVE (" ", ascii, off, mover, nleft, 1); +                MOVER_MOVE(mover, nleft, len); -                        /* fop */ -                        len = strlen (mover); -                        VERIFY_SEPARATOR (mover, len, parse_err); +                len = strlen(fopname); +                GF_CHANGELOG_FILL_BUFFER(fopname, ascii, off, len); -                        fop = atoi (mover); -                        fopname = gf_fop_list[fop]; -                        if (fopname == NULL) { -                                parse_err = 1; -                                break; -                        } +                break; -                        MOVER_MOVE (mover, nleft, len); +            case 'E': +                MOVER_MOVE(mover, nleft, 1); + +                /* target gfid */ +                PARSE_GFID(mover, ptr, UUID_CANONICAL_FORM_LEN, conv_noop, +                           parse_err); +                FILL_AND_MOVE(ptr, ascii, off, mover, nleft, +                              UUID_CANONICAL_FORM_LEN); +                FILL_AND_MOVE(" ", ascii, off, mover, nleft, 1); + +                /* fop */ +                len = strlen(mover); +                VERIFY_SEPARATOR(mover, len, parse_err); + +                fop = atoi(mover); +                fopname = gf_fop_list[fop]; +                if (fopname == NULL) { +                    parse_err = 1; +                    break; +                } -                        len = strlen (fopname); -                        GF_CHANGELOG_FILL_BUFFER (fopname, ascii, off, len); +                MOVER_MOVE(mover, nleft, len); -                        break; +                len = strlen(fopname); +                GF_CHANGELOG_FILL_BUFFER(fopname, ascii, off, len); -                case 'E': -                        MOVER_MOVE (mover, nleft, 1); - -                        /* target gfid */ -                        PARSE_GFID (mover, ptr, UUID_CANONICAL_FORM_LEN, -                                    conv_noop, parse_err); -                        FILL_AND_MOVE (ptr, ascii, off, -                                       mover, nleft, UUID_CANONICAL_FORM_LEN); -                        FILL_AND_MOVE (" ", ascii, off, -                                       mover, nleft, 1); - -                        /* fop */ -                        len = strlen (mover); -                        VERIFY_SEPARATOR (mover, len, parse_err); - -                        fop = atoi (mover); -                        fopname = gf_fop_list[fop]; -                        if (fopname == NULL) { -                                parse_err = 1; -                                break; -                        } - -                        MOVER_MOVE (mover, nleft, len); - -                        len = strlen (fopname); -                        GF_CHANGELOG_FILL_BUFFER (fopname, ascii, off, len); - -                        ng = nr_extra_recs[version_idx][fop]; -                        for (; ng > 0; ng--) { -                                MOVER_MOVE (mover, nleft, 1); -                                len = strlen (mover); -                                VERIFY_SEPARATOR (mover, len, parse_err); - -                                GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1); -                                FILL_AND_MOVE (mover, ascii, -                                               off, mover, nleft, len); -                        } - -                        /* pargfid + bname */ -                        ng = nr_gfids[version_idx][fop]; -                        while (ng-- > 0) { -                                MOVER_MOVE (mover, nleft, 1); -                                len = strlen (mover); -                                if (!len) { -                                        MOVER_MOVE (mover, nleft, 1); -                                        continue; -                                } - -                                GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1); - -                                PARSE_GFID (mover, ptr, len, -                                            conv_noop, parse_err); -                                eptr = calloc (3, strlen (ptr)); -                                if (!eptr) { -                                        parse_err = 1; -                                        break; -                                } - -                                gf_rfc3986_encode_space_newline ( -                                                   (unsigned char *) ptr, -                                                   eptr, -                                                   jnl->rfc3986_space_newline); -                                FILL_AND_MOVE (eptr, ascii, off, -                                               mover, nleft, len); -                                free (eptr); -                        } +                ng = nr_extra_recs[version_idx][fop]; +                for (; ng > 0; ng--) { +                    MOVER_MOVE(mover, nleft, 1); +                    len = strlen(mover); +                    VERIFY_SEPARATOR(mover, len, parse_err); -                        break; -                default: -                        parse_err = 1; +                    GF_CHANGELOG_FILL_BUFFER(" ", ascii, off, 1); +                    FILL_AND_MOVE(mover, ascii, off, mover, nleft, len);                  } -                if (parse_err) +                /* pargfid + bname */ +                ng = nr_gfids[version_idx][fop]; +                while (ng-- > 0) { +                    MOVER_MOVE(mover, nleft, 1); +                    len = strlen(mover); +                    if (!len) { +                        MOVER_MOVE(mover, nleft, 1); +                        continue; +                    } + +                    GF_CHANGELOG_FILL_BUFFER(" ", ascii, off, 1); + +                    PARSE_GFID(mover, ptr, len, conv_noop, parse_err); +                    eptr = calloc(3, strlen(ptr)); +                    if (!eptr) { +                        parse_err = 1;                          break; +                    } -                GF_CHANGELOG_FILL_BUFFER ("\n", ascii, off, 1); - -                if (gf_changelog_write (to_fd, ascii, off) != off) { -                        gf_msg (this->name, GF_LOG_ERROR, errno, -                                CHANGELOG_LIB_MSG_ASCII_ERROR, -                                "processing ascii changelog failed due to " -                                " error in writing change"); -                        break; +                    gf_rfc3986_encode_space_newline((unsigned char *)ptr, eptr, +                                                    jnl->rfc3986_space_newline); +                    FILL_AND_MOVE(eptr, ascii, off, mover, nleft, len); +                    free(eptr);                  } -                MOVER_MOVE (mover, nleft, 1); +                break; +            default: +                parse_err = 1; +        } + +        if (parse_err) +            break; + +        GF_CHANGELOG_FILL_BUFFER("\n", ascii, off, 1); +        if (gf_changelog_write(to_fd, ascii, off) != off) { +            gf_msg(this->name, GF_LOG_ERROR, errno, +                   CHANGELOG_LIB_MSG_ASCII_ERROR, +                   "processing ascii changelog failed due to " +                   " error in writing change"); +            break;          } -        if ((nleft == 0) && (!parse_err)) -                ret = 0; +        MOVER_MOVE(mover, nleft, 1); +    } + +    if ((nleft == 0) && (!parse_err)) +        ret = 0; -        if (munmap (start, stbuf->st_size)) -                gf_msg (this->name, GF_LOG_ERROR, errno, -                        CHANGELOG_LIB_MSG_MUNMAP_FAILED, -                        "munmap() error"); +    if (munmap(start, stbuf->st_size)) +        gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_MUNMAP_FAILED, +               "munmap() error"); - out: -        if (ascii) -                GF_FREE (ascii); +out: +    if (ascii) +        GF_FREE(ascii); -        return ret; +    return ret;  }  static int -gf_changelog_decode (xlator_t *this, gf_changelog_journal_t *jnl, -                     int from_fd, int to_fd, struct stat *stbuf, int *zerob) +gf_changelog_decode(xlator_t *this, gf_changelog_journal_t *jnl, int from_fd, +                    int to_fd, struct stat *stbuf, int *zerob)  { -        int    ret        = -1; -        int    encoding   = -1; -        int major_version = -1; -        int minor_version = -1; -        int version_idx   = -1; -        size_t elen       = 0; -        char buffer[1024] = {0,}; - -        CHANGELOG_GET_HEADER_INFO (from_fd, buffer, sizeof (buffer), encoding, -                                   major_version, minor_version, elen); -        if (encoding == -1) /* unknown encoding */ -                goto out; - -        if (major_version == -1) /* unknown major version */ -                goto out; - -        if (minor_version == -1) /* unknown minor version */ -                goto out; - -        if (!CHANGELOG_VALID_ENCODING (encoding)) -                goto out; - -        if (elen == stbuf->st_size) { -                *zerob = 1; -                goto out; -        } +    int ret = -1; +    int encoding = -1; +    int major_version = -1; +    int minor_version = -1; +    int version_idx = -1; +    size_t elen = 0; +    char buffer[1024] = { +        0, +    }; + +    CHANGELOG_GET_HEADER_INFO(from_fd, buffer, sizeof(buffer), encoding, +                              major_version, minor_version, elen); +    if (encoding == -1) /* unknown encoding */ +        goto out; + +    if (major_version == -1) /* unknown major version */ +        goto out; + +    if (minor_version == -1) /* unknown minor version */ +        goto out; + +    if (!CHANGELOG_VALID_ENCODING(encoding)) +        goto out; + +    if (elen == stbuf->st_size) { +        *zerob = 1; +        goto out; +    } -        if (major_version == 1 && minor_version == 1) { -                version_idx = VERSION_1_1; -        } else if (major_version == 1 && minor_version == 2) { -                version_idx = VERSION_1_2; -        } +    if (major_version == 1 && minor_version == 1) { +        version_idx = VERSION_1_1; +    } else if (major_version == 1 && minor_version == 2) { +        version_idx = VERSION_1_2; +    } -        if (version_idx == -1) /* unknown version number */ -                goto out; +    if (version_idx == -1) /* unknown version number */ +        goto out; -        /** -         * start processing after the header -         */ -        if (sys_lseek (from_fd, elen, SEEK_SET) < 0) { -                goto out; -        } -        switch (encoding) { +    /** +     * start processing after the header +     */ +    if (sys_lseek(from_fd, elen, SEEK_SET) < 0) { +        goto out; +    } +    switch (encoding) {          case CHANGELOG_ENCODE_BINARY: -                /** -                 * this ideally should have been a part of changelog-encoders.c -                 * (ie. part of the changelog translator). -                 */ -                ret = gf_changelog_parse_binary (this, jnl, from_fd, -                                                 to_fd, elen, stbuf, -                                                 version_idx); -                break; +            /** +             * this ideally should have been a part of changelog-encoders.c +             * (ie. part of the changelog translator). +             */ +            ret = gf_changelog_parse_binary(this, jnl, from_fd, to_fd, elen, +                                            stbuf, version_idx); +            break;          case CHANGELOG_ENCODE_ASCII: -                ret = gf_changelog_parse_ascii (this, jnl, from_fd, -                                                to_fd, elen, stbuf, -                                                version_idx); -                break; -        } +            ret = gf_changelog_parse_ascii(this, jnl, from_fd, to_fd, elen, +                                           stbuf, version_idx); +            break; +    }  out: -        return ret; +    return ret;  }  int -gf_changelog_publish (xlator_t *this, -                      gf_changelog_journal_t *jnl, char *from_path) +gf_changelog_publish(xlator_t *this, gf_changelog_journal_t *jnl, +                     char *from_path)  { -        int         ret        = 0; -        char dest[PATH_MAX]    = {0,}; -        char to_path[PATH_MAX] = {0,}; -        struct stat stbuf      = {0,}; - -        if (snprintf (to_path, PATH_MAX, "%s%s", jnl->jnl_current_dir, -                      basename (from_path)) >= PATH_MAX) -	        return -1; - -        /* handle zerob file that won't exist in current */ -        ret = sys_stat (to_path, &stbuf); -        if (ret) { -                if (errno == ENOENT) -                        ret = 0; -                goto out; -        } +    int ret = 0; +    char dest[PATH_MAX] = { +        0, +    }; +    char to_path[PATH_MAX] = { +        0, +    }; +    struct stat stbuf = { +        0, +    }; + +    if (snprintf(to_path, PATH_MAX, "%s%s", jnl->jnl_current_dir, +                 basename(from_path)) >= PATH_MAX) +        return -1; -        if (snprintf (dest, PATH_MAX, "%s%s", jnl->jnl_processing_dir, -                      basename (from_path)) >= PATH_MAX) -		return -1; - -        ret = sys_rename (to_path, dest); -        if (ret) { -                gf_smsg (this->name, GF_LOG_ERROR, errno, -                         CHANGELOG_LIB_MSG_RENAME_FAILED, -                         "error moving changelog to processing dir", -                         "path=%s", to_path, -                         NULL); -        } +    /* handle zerob file that won't exist in current */ +    ret = sys_stat(to_path, &stbuf); +    if (ret) { +        if (errno == ENOENT) +            ret = 0; +        goto out; +    } + +    if (snprintf(dest, PATH_MAX, "%s%s", jnl->jnl_processing_dir, +                 basename(from_path)) >= PATH_MAX) +        return -1; + +    ret = sys_rename(to_path, dest); +    if (ret) { +        gf_smsg(this->name, GF_LOG_ERROR, errno, +                CHANGELOG_LIB_MSG_RENAME_FAILED, +                "error moving changelog to processing dir", "path=%s", to_path, +                NULL); +    }  out: -        return ret; +    return ret;  }  int -gf_changelog_consume (xlator_t *this, -                      gf_changelog_journal_t *jnl, -                      char *from_path, gf_boolean_t no_publish) +gf_changelog_consume(xlator_t *this, gf_changelog_journal_t *jnl, +                     char *from_path, gf_boolean_t no_publish)  { -        int         ret        = -1; -        int         fd1        = 0; -        int         fd2        = 0; -        int         zerob      = 0; -        struct stat stbuf      = {0,}; -        char dest[PATH_MAX]    = {0,}; -        char to_path[PATH_MAX] = {0,}; - -        if (snprintf (to_path, PATH_MAX, "%s%s", jnl->jnl_current_dir, -                     basename (from_path)) >= PATH_MAX) -                goto out; -        if (snprintf (dest, PATH_MAX, "%s%s", jnl->jnl_processing_dir, -                      basename (from_path)) >= PATH_MAX) -                goto out; - -        ret = sys_stat (from_path, &stbuf); -        if (ret || !S_ISREG(stbuf.st_mode)) { -                ret = -1; -                gf_smsg (this->name, GF_LOG_ERROR, errno, -                         CHANGELOG_LIB_MSG_STAT_FAILED, -                         "stat failed on changelog file", -                         "path=%s", from_path, -                         NULL); -                goto out; -        } +    int ret = -1; +    int fd1 = 0; +    int fd2 = 0; +    int zerob = 0; +    struct stat stbuf = { +        0, +    }; +    char dest[PATH_MAX] = { +        0, +    }; +    char to_path[PATH_MAX] = { +        0, +    }; + +    if (snprintf(to_path, PATH_MAX, "%s%s", jnl->jnl_current_dir, +                 basename(from_path)) >= PATH_MAX) +        goto out; +    if (snprintf(dest, PATH_MAX, "%s%s", jnl->jnl_processing_dir, +                 basename(from_path)) >= PATH_MAX) +        goto out; + +    ret = sys_stat(from_path, &stbuf); +    if (ret || !S_ISREG(stbuf.st_mode)) { +        ret = -1; +        gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_STAT_FAILED, +                "stat failed on changelog file", "path=%s", from_path, NULL); +        goto out; +    } -        fd1 = open (from_path, O_RDONLY); -        if (fd1 < 0) { -                gf_smsg (this->name, GF_LOG_ERROR, errno, -                         CHANGELOG_LIB_MSG_OPEN_FAILED, -                         "cannot open changelog file", -                         "path=%s", from_path, -                         NULL); -                goto out; -        } +    fd1 = open(from_path, O_RDONLY); +    if (fd1 < 0) { +        gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_OPEN_FAILED, +                "cannot open changelog file", "path=%s", from_path, NULL); +        goto out; +    } -        fd2 = open (to_path, O_CREAT | O_TRUNC | O_RDWR, -                    S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); -        if (fd2 < 0) { -                gf_smsg (this->name, GF_LOG_ERROR, errno, -                         CHANGELOG_LIB_MSG_OPEN_FAILED, -                         "cannot create ascii changelog file", -                         "path=%s", to_path, -                         NULL); +    fd2 = open(to_path, O_CREAT | O_TRUNC | O_RDWR, +               S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); +    if (fd2 < 0) { +        gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_OPEN_FAILED, +                "cannot create ascii changelog file", "path=%s", to_path, NULL); +        goto close_fd; +    } else { +        ret = gf_changelog_decode(this, jnl, fd1, fd2, &stbuf, &zerob); + +        sys_close(fd2); + +        if (!ret) { +            /* move it to processing on a successful +               decode */ +            if (no_publish == _gf_true)                  goto close_fd; -        } else { -                ret = gf_changelog_decode (this, jnl, fd1, -                                           fd2, &stbuf, &zerob); - -                sys_close (fd2); - -                if (!ret) { -                        /* move it to processing on a successful -                           decode */ -                        if (no_publish == _gf_true) -                                goto close_fd; -                        ret = sys_rename (to_path, dest); -                        if (ret) -                                gf_smsg (this->name, GF_LOG_ERROR, errno, -                                         CHANGELOG_LIB_MSG_RENAME_FAILED, -                                         "error moving changelog to processing dir", -                                         "path=%s", to_path, -                                         NULL); -                } +            ret = sys_rename(to_path, dest); +            if (ret) +                gf_smsg(this->name, GF_LOG_ERROR, errno, +                        CHANGELOG_LIB_MSG_RENAME_FAILED, +                        "error moving changelog to processing dir", "path=%s", +                        to_path, NULL); +        } -                /* remove it from .current if it's an empty file */ -                if (zerob) { -                        /* zerob changelogs must be unlinked */ -                        ret = sys_unlink (to_path); -                        if (ret) -                                gf_smsg (this->name, GF_LOG_ERROR, errno, -                                         CHANGELOG_LIB_MSG_UNLINK_FAILED, -                                         "could not unlink empty changelog", -                                         "path=%s", to_path, -                                         NULL); -                } +        /* remove it from .current if it's an empty file */ +        if (zerob) { +            /* zerob changelogs must be unlinked */ +            ret = sys_unlink(to_path); +            if (ret) +                gf_smsg(this->name, GF_LOG_ERROR, errno, +                        CHANGELOG_LIB_MSG_UNLINK_FAILED, +                        "could not unlink empty changelog", "path=%s", to_path, +                        NULL);          } +    } - close_fd: -        sys_close (fd1); +close_fd: +    sys_close(fd1); - out: -        return ret; +out: +    return ret;  }  void * -gf_changelog_process (void *data) +gf_changelog_process(void *data)  { -        xlator_t *this = NULL; -        gf_changelog_journal_t *jnl = NULL; -        gf_changelog_entry_t *entry = NULL; -        gf_changelog_processor_t *jnl_proc = NULL; - -        jnl = data; -        jnl_proc = jnl->jnl_proc; -        THIS = jnl->this; -        this = jnl->this; - -        while (1) { -                pthread_mutex_lock (&jnl_proc->lock); -                { -                        while (list_empty (&jnl_proc->entries)) { -                                jnl_proc->waiting = _gf_true; -                                pthread_cond_wait -                                        (&jnl_proc->cond, &jnl_proc->lock); -                        } - -                        entry = list_first_entry (&jnl_proc->entries, -                                                  gf_changelog_entry_t, list); -                        if (entry) -                                list_del (&entry->list); - -                        jnl_proc->waiting = _gf_false; -                } -                pthread_mutex_unlock (&jnl_proc->lock); +    xlator_t *this = NULL; +    gf_changelog_journal_t *jnl = NULL; +    gf_changelog_entry_t *entry = NULL; +    gf_changelog_processor_t *jnl_proc = NULL; + +    jnl = data; +    jnl_proc = jnl->jnl_proc; +    THIS = jnl->this; +    this = jnl->this; + +    while (1) { +        pthread_mutex_lock(&jnl_proc->lock); +        { +            while (list_empty(&jnl_proc->entries)) { +                jnl_proc->waiting = _gf_true; +                pthread_cond_wait(&jnl_proc->cond, &jnl_proc->lock); +            } -                if (entry) { -                        (void) gf_changelog_consume (this, jnl, -                                                     entry->path, _gf_false); -                        GF_FREE (entry); -                } +            entry = list_first_entry(&jnl_proc->entries, gf_changelog_entry_t, +                                     list); +            if (entry) +                list_del(&entry->list); + +            jnl_proc->waiting = _gf_false;          } +        pthread_mutex_unlock(&jnl_proc->lock); + +        if (entry) { +            (void)gf_changelog_consume(this, jnl, entry->path, _gf_false); +            GF_FREE(entry); +        } +    } -        return NULL; +    return NULL;  }  void -gf_changelog_queue_journal (gf_changelog_processor_t *jnl_proc, -                            changelog_event_t *event) +gf_changelog_queue_journal(gf_changelog_processor_t *jnl_proc, +                           changelog_event_t *event)  { -        size_t len = 0; -        gf_changelog_entry_t *entry = NULL; +    size_t len = 0; +    gf_changelog_entry_t *entry = NULL; -        entry = GF_CALLOC (1, sizeof (gf_changelog_entry_t), -                           gf_changelog_mt_libgfchangelog_entry_t); -        if (!entry) -                return; -        INIT_LIST_HEAD (&entry->list); +    entry = GF_CALLOC(1, sizeof(gf_changelog_entry_t), +                      gf_changelog_mt_libgfchangelog_entry_t); +    if (!entry) +        return; +    INIT_LIST_HEAD(&entry->list); -        len = strlen (event->u.journal.path); -        (void)memcpy (entry->path, event->u.journal.path, len+1); -        entry->path[len] = '\0'; +    len = strlen(event->u.journal.path); +    (void)memcpy(entry->path, event->u.journal.path, len + 1); +    entry->path[len] = '\0'; -        pthread_mutex_lock (&jnl_proc->lock); -        { -                list_add_tail (&entry->list, &jnl_proc->entries); -                if (jnl_proc->waiting) -                        pthread_cond_signal (&jnl_proc->cond); -        } -        pthread_mutex_unlock (&jnl_proc->lock); +    pthread_mutex_lock(&jnl_proc->lock); +    { +        list_add_tail(&entry->list, &jnl_proc->entries); +        if (jnl_proc->waiting) +            pthread_cond_signal(&jnl_proc->cond); +    } +    pthread_mutex_unlock(&jnl_proc->lock); -        return; +    return;  }  void -gf_changelog_handle_journal (void *xl, char *brick, -                             void *cbkdata, changelog_event_t *event) +gf_changelog_handle_journal(void *xl, char *brick, void *cbkdata, +                            changelog_event_t *event)  { -        gf_changelog_journal_t   *jnl      = NULL; -        gf_changelog_processor_t *jnl_proc = NULL; +    gf_changelog_journal_t *jnl = NULL; +    gf_changelog_processor_t *jnl_proc = NULL; -        jnl      = cbkdata; -        jnl_proc = jnl->jnl_proc; +    jnl = cbkdata; +    jnl_proc = jnl->jnl_proc; -        gf_changelog_queue_journal (jnl_proc, event); +    gf_changelog_queue_journal(jnl_proc, event);  }  void -gf_changelog_journal_disconnect (void *xl, char *brick, void *data) +gf_changelog_journal_disconnect(void *xl, char *brick, void *data)  { -        gf_changelog_journal_t *jnl = NULL; +    gf_changelog_journal_t *jnl = NULL; -        jnl = data; +    jnl = data; -        pthread_spin_lock (&jnl->lock); -        { -                JNL_SET_API_STATE (jnl, JNL_API_DISCONNECTED); -        }; -        pthread_spin_unlock (&jnl->lock); +    pthread_spin_lock(&jnl->lock); +    { +        JNL_SET_API_STATE(jnl, JNL_API_DISCONNECTED); +    }; +    pthread_spin_unlock(&jnl->lock);  }  void -gf_changelog_journal_connect (void *xl, char *brick, void *data) +gf_changelog_journal_connect(void *xl, char *brick, void *data)  { -        gf_changelog_journal_t *jnl = NULL; +    gf_changelog_journal_t *jnl = NULL; -        jnl = data; +    jnl = data; -        pthread_spin_lock (&jnl->lock); -        { -                JNL_SET_API_STATE (jnl, JNL_API_CONNECTED); -        }; -        pthread_spin_unlock (&jnl->lock); +    pthread_spin_lock(&jnl->lock); +    { +        JNL_SET_API_STATE(jnl, JNL_API_CONNECTED); +    }; +    pthread_spin_unlock(&jnl->lock); -        return; +    return;  }  void -gf_changelog_cleanup_processor (gf_changelog_journal_t *jnl) +gf_changelog_cleanup_processor(gf_changelog_journal_t *jnl)  { -        int ret = 0; -        xlator_t *this = NULL; -        gf_changelog_processor_t *jnl_proc = NULL; - -        this = THIS; -        if (!this || !jnl || !jnl->jnl_proc) -                goto error_return; - -        jnl_proc = jnl->jnl_proc; - -        ret = gf_thread_cleanup (this, jnl_proc->processor); -        if (ret != 0) { -                gf_msg (this->name, GF_LOG_ERROR, 0, -                        CHANGELOG_LIB_MSG_CLEANUP_ERROR, -                        "failed to cleanup processor thread"); -                goto error_return; -        } +    int ret = 0; +    xlator_t *this = NULL; +    gf_changelog_processor_t *jnl_proc = NULL; -        (void)pthread_mutex_destroy (&jnl_proc->lock); -        (void)pthread_cond_destroy (&jnl_proc->cond); +    this = THIS; +    if (!this || !jnl || !jnl->jnl_proc) +        goto error_return; -        GF_FREE (jnl_proc); +    jnl_proc = jnl->jnl_proc; - error_return: -        return; +    ret = gf_thread_cleanup(this, jnl_proc->processor); +    if (ret != 0) { +        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_CLEANUP_ERROR, +               "failed to cleanup processor thread"); +        goto error_return; +    } + +    (void)pthread_mutex_destroy(&jnl_proc->lock); +    (void)pthread_cond_destroy(&jnl_proc->cond); + +    GF_FREE(jnl_proc); + +error_return: +    return;  }  int -gf_changelog_init_processor (gf_changelog_journal_t *jnl) +gf_changelog_init_processor(gf_changelog_journal_t *jnl)  { -        int ret = -1; -        gf_changelog_processor_t *jnl_proc = NULL; +    int ret = -1; +    gf_changelog_processor_t *jnl_proc = NULL; -        jnl_proc = GF_CALLOC (1, sizeof (gf_changelog_processor_t), -                              gf_changelog_mt_libgfchangelog_t); -        if (!jnl_proc) -                goto error_return; - -        ret = pthread_mutex_init (&jnl_proc->lock, NULL); -        if (ret != 0) -                goto free_jnl_proc; -        ret = pthread_cond_init (&jnl_proc->cond, NULL); -        if (ret != 0) -                goto cleanup_mutex; - -        INIT_LIST_HEAD (&jnl_proc->entries); -        jnl_proc->waiting = _gf_false; -        jnl->jnl_proc = jnl_proc; - -        ret = gf_thread_create (&jnl_proc->processor, -                                NULL, gf_changelog_process, jnl, "clogproc"); -        if (ret != 0) { -                jnl->jnl_proc = NULL; -                goto cleanup_cond; -        } +    jnl_proc = GF_CALLOC(1, sizeof(gf_changelog_processor_t), +                         gf_changelog_mt_libgfchangelog_t); +    if (!jnl_proc) +        goto error_return; + +    ret = pthread_mutex_init(&jnl_proc->lock, NULL); +    if (ret != 0) +        goto free_jnl_proc; +    ret = pthread_cond_init(&jnl_proc->cond, NULL); +    if (ret != 0) +        goto cleanup_mutex; + +    INIT_LIST_HEAD(&jnl_proc->entries); +    jnl_proc->waiting = _gf_false; +    jnl->jnl_proc = jnl_proc; + +    ret = gf_thread_create(&jnl_proc->processor, NULL, gf_changelog_process, +                           jnl, "clogproc"); +    if (ret != 0) { +        jnl->jnl_proc = NULL; +        goto cleanup_cond; +    } -        return 0; +    return 0; - cleanup_cond: -        (void) pthread_cond_destroy (&jnl_proc->cond); - cleanup_mutex: -        (void) pthread_mutex_destroy (&jnl_proc->lock); - free_jnl_proc: -        GF_FREE (jnl_proc); - error_return: -        return -1; +cleanup_cond: +    (void)pthread_cond_destroy(&jnl_proc->cond); +cleanup_mutex: +    (void)pthread_mutex_destroy(&jnl_proc->lock); +free_jnl_proc: +    GF_FREE(jnl_proc); +error_return: +    return -1;  }  static void -gf_changelog_cleanup_fds (gf_changelog_journal_t *jnl) +gf_changelog_cleanup_fds(gf_changelog_journal_t *jnl)  { -        /* tracker fd */ -        if (jnl->jnl_fd != -1) -                sys_close (jnl->jnl_fd); -        /* processing dir */ -        if (jnl->jnl_dir) -                sys_closedir (jnl->jnl_dir); - -        if (jnl->jnl_working_dir) -                free (jnl->jnl_working_dir); /* allocated by realpath */ +    /* tracker fd */ +    if (jnl->jnl_fd != -1) +        sys_close(jnl->jnl_fd); +    /* processing dir */ +    if (jnl->jnl_dir) +        sys_closedir(jnl->jnl_dir); + +    if (jnl->jnl_working_dir) +        free(jnl->jnl_working_dir); /* allocated by realpath */  }  static int -gf_changelog_open_dirs (xlator_t *this, gf_changelog_journal_t *jnl) +gf_changelog_open_dirs(xlator_t *this, gf_changelog_journal_t *jnl)  { -        int  ret                    = -1; -        DIR *dir                    = NULL; -        int  tracker_fd             = 0; -        char tracker_path[PATH_MAX] = {0,}; - -        /* .current */ -        (void) snprintf (jnl->jnl_current_dir, PATH_MAX, -                         "%s/"GF_CHANGELOG_CURRENT_DIR"/", -                         jnl->jnl_working_dir); -        ret = recursive_rmdir (jnl->jnl_current_dir); -        if (ret) { -                gf_smsg (this->name, GF_LOG_ERROR, errno, -                         CHANGELOG_LIB_MSG_FAILED_TO_RMDIR, -                         "Failed to rmdir", -                         "path=%s", jnl->jnl_current_dir, -                         NULL); -                goto out; -        } -        ret = mkdir_p (jnl->jnl_current_dir, 0600, _gf_false); -        if (ret) -                goto out; +    int ret = -1; +    DIR *dir = NULL; +    int tracker_fd = 0; +    char tracker_path[PATH_MAX] = { +        0, +    }; + +    /* .current */ +    (void)snprintf(jnl->jnl_current_dir, PATH_MAX, +                   "%s/" GF_CHANGELOG_CURRENT_DIR "/", jnl->jnl_working_dir); +    ret = recursive_rmdir(jnl->jnl_current_dir); +    if (ret) { +        gf_smsg(this->name, GF_LOG_ERROR, errno, +                CHANGELOG_LIB_MSG_FAILED_TO_RMDIR, "Failed to rmdir", "path=%s", +                jnl->jnl_current_dir, NULL); +        goto out; +    } +    ret = mkdir_p(jnl->jnl_current_dir, 0600, _gf_false); +    if (ret) +        goto out; + +    /* .processed */ +    (void)snprintf(jnl->jnl_processed_dir, PATH_MAX, +                   "%s/" GF_CHANGELOG_PROCESSED_DIR "/", jnl->jnl_working_dir); +    ret = mkdir_p(jnl->jnl_processed_dir, 0600, _gf_false); +    if (ret) +        goto out; + +    /* .processing */ +    (void)snprintf(jnl->jnl_processing_dir, PATH_MAX, +                   "%s/" GF_CHANGELOG_PROCESSING_DIR "/", jnl->jnl_working_dir); +    ret = recursive_rmdir(jnl->jnl_processing_dir); +    if (ret) { +        gf_smsg(this->name, GF_LOG_ERROR, errno, +                CHANGELOG_LIB_MSG_FAILED_TO_RMDIR, "Failed to rmdir", "path=%s", +                jnl->jnl_processing_dir, NULL); +        goto out; +    } -        /* .processed */ -        (void) snprintf (jnl->jnl_processed_dir, PATH_MAX, -                         "%s/"GF_CHANGELOG_PROCESSED_DIR"/", -                         jnl->jnl_working_dir); -        ret = mkdir_p (jnl->jnl_processed_dir, 0600, _gf_false); -        if (ret) -                goto out; - -        /* .processing */ -        (void) snprintf (jnl->jnl_processing_dir, PATH_MAX, -                         "%s/"GF_CHANGELOG_PROCESSING_DIR"/", -                         jnl->jnl_working_dir); -        ret = recursive_rmdir (jnl->jnl_processing_dir); -        if (ret) { -                gf_smsg (this->name, GF_LOG_ERROR, errno, -                         CHANGELOG_LIB_MSG_FAILED_TO_RMDIR, -                         "Failed to rmdir", -                         "path=%s", jnl->jnl_processing_dir, -                         NULL); -                goto out; -        } +    ret = mkdir_p(jnl->jnl_processing_dir, 0600, _gf_false); +    if (ret) +        goto out; -        ret = mkdir_p (jnl->jnl_processing_dir, 0600, _gf_false); -        if (ret) -                goto out; - -        dir = sys_opendir (jnl->jnl_processing_dir); -        if (!dir) { -                gf_msg ("", GF_LOG_ERROR, errno, -                        CHANGELOG_LIB_MSG_OPENDIR_ERROR, -                        "opendir() error"); -                goto out; -        } +    dir = sys_opendir(jnl->jnl_processing_dir); +    if (!dir) { +        gf_msg("", GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_OPENDIR_ERROR, +               "opendir() error"); +        goto out; +    } -        jnl->jnl_dir = dir; +    jnl->jnl_dir = dir; -        (void) snprintf (tracker_path, PATH_MAX, -                         "%s/"GF_CHANGELOG_TRACKER, jnl->jnl_working_dir); +    (void)snprintf(tracker_path, PATH_MAX, "%s/" GF_CHANGELOG_TRACKER, +                   jnl->jnl_working_dir); -        tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR, -                           S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); -        if (tracker_fd < 0) { -                sys_closedir (jnl->jnl_dir); -                ret = -1; -                goto out; -        } +    tracker_fd = open(tracker_path, O_CREAT | O_APPEND | O_RDWR, +                      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); +    if (tracker_fd < 0) { +        sys_closedir(jnl->jnl_dir); +        ret = -1; +        goto out; +    } -        jnl->jnl_fd = tracker_fd; -        ret = 0; - out: -        return ret; +    jnl->jnl_fd = tracker_fd; +    ret = 0; +out: +    return ret;  }  int -gf_changelog_init_history (xlator_t *this, -                           gf_changelog_journal_t *jnl, -                           char *brick_path) +gf_changelog_init_history(xlator_t *this, gf_changelog_journal_t *jnl, +                          char *brick_path)  { -        int i   = 0; -        int ret = 0; -        char hist_scratch_dir[PATH_MAX] = {0,}; +    int i = 0; +    int ret = 0; +    char hist_scratch_dir[PATH_MAX] = { +        0, +    }; -        jnl->hist_jnl = GF_CALLOC (1, sizeof (*jnl), -                         gf_changelog_mt_libgfchangelog_t); -        if (!jnl->hist_jnl) -                goto error_return; +    jnl->hist_jnl = GF_CALLOC(1, sizeof(*jnl), +                              gf_changelog_mt_libgfchangelog_t); +    if (!jnl->hist_jnl) +        goto error_return; -        jnl->hist_jnl->jnl_dir = NULL; -        jnl->hist_jnl->jnl_fd =  -1; +    jnl->hist_jnl->jnl_dir = NULL; +    jnl->hist_jnl->jnl_fd = -1; -        (void) snprintf (hist_scratch_dir, PATH_MAX, -                         "%s/"GF_CHANGELOG_HISTORY_DIR"/", -                         jnl->jnl_working_dir); +    (void)snprintf(hist_scratch_dir, PATH_MAX, +                   "%s/" GF_CHANGELOG_HISTORY_DIR "/", jnl->jnl_working_dir); -        ret = mkdir_p (hist_scratch_dir, 0600, _gf_false); -        if (ret) -                goto dealloc_hist; - -        jnl->hist_jnl->jnl_working_dir = realpath (hist_scratch_dir, NULL); -        if (!jnl->hist_jnl->jnl_working_dir) -                goto dealloc_hist; - -        ret = gf_changelog_open_dirs (this, jnl->hist_jnl); -        if (ret) { -                gf_msg (this->name, GF_LOG_ERROR, 0, -                        CHANGELOG_LIB_MSG_OPENDIR_ERROR, -                        "could not create entries in history scratch dir"); -                goto dealloc_hist; -        } +    ret = mkdir_p(hist_scratch_dir, 0600, _gf_false); +    if (ret) +        goto dealloc_hist; -        if (snprintf (jnl->hist_jnl->jnl_brickpath, PATH_MAX, "%s", -                      brick_path) >= PATH_MAX) -                goto dealloc_hist; +    jnl->hist_jnl->jnl_working_dir = realpath(hist_scratch_dir, NULL); +    if (!jnl->hist_jnl->jnl_working_dir) +        goto dealloc_hist; -        for (i = 0; i < 256; i++) { -                jnl->hist_jnl->rfc3986_space_newline[i] = -                        (i == ' ' || i == '\n' || i == '%') ? 0 : i; -        } +    ret = gf_changelog_open_dirs(this, jnl->hist_jnl); +    if (ret) { +        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_OPENDIR_ERROR, +               "could not create entries in history scratch dir"); +        goto dealloc_hist; +    } -        return 0; +    if (snprintf(jnl->hist_jnl->jnl_brickpath, PATH_MAX, "%s", brick_path) >= +        PATH_MAX) +        goto dealloc_hist; - dealloc_hist: -        GF_FREE (jnl->hist_jnl); -        jnl->hist_jnl = NULL; - error_return: -        return -1; +    for (i = 0; i < 256; i++) { +        jnl->hist_jnl->rfc3986_space_newline[i] = (i == ' ' || i == '\n' || +                                                   i == '%') +                                                      ? 0 +                                                      : i; +    } + +    return 0; + +dealloc_hist: +    GF_FREE(jnl->hist_jnl); +    jnl->hist_jnl = NULL; +error_return: +    return -1;  }  void -gf_changelog_journal_fini (void *xl, char *brick, void *data) +gf_changelog_journal_fini(void *xl, char *brick, void *data)  { -        gf_changelog_journal_t *jnl = NULL; +    gf_changelog_journal_t *jnl = NULL; -        jnl = data; +    jnl = data; -        gf_changelog_cleanup_processor (jnl); +    gf_changelog_cleanup_processor(jnl); -        gf_changelog_cleanup_fds (jnl); -        if (jnl->hist_jnl) -                gf_changelog_cleanup_fds (jnl->hist_jnl); +    gf_changelog_cleanup_fds(jnl); +    if (jnl->hist_jnl) +        gf_changelog_cleanup_fds(jnl->hist_jnl); -        GF_FREE (jnl); +    GF_FREE(jnl);  }  void * -gf_changelog_journal_init (void *xl, struct gf_brick_spec *brick) +gf_changelog_journal_init(void *xl, struct gf_brick_spec *brick)  { -        int                     i           = 0; -        int                     ret         = 0; -        xlator_t               *this        = NULL; -        struct stat             buf         = {0,}; -        char                   *scratch_dir = NULL; -        gf_changelog_journal_t *jnl         = NULL; - -        this = xl; -        scratch_dir = (char *) brick->ptr; - -        jnl = GF_CALLOC (1, sizeof (gf_changelog_journal_t), -                         gf_changelog_mt_libgfchangelog_t); -        if (!jnl) -                goto error_return; - -        if (snprintf (jnl->jnl_brickpath, PATH_MAX, "%s", -		      brick->brick_path) >= PATH_MAX) -                goto dealloc_private; - -        if (sys_stat (scratch_dir, &buf) && errno == ENOENT) { -                ret = mkdir_p (scratch_dir, 0600, _gf_true); -                if (ret) -                        goto dealloc_private; -        } +    int i = 0; +    int ret = 0; +    xlator_t *this = NULL; +    struct stat buf = { +        0, +    }; +    char *scratch_dir = NULL; +    gf_changelog_journal_t *jnl = NULL; + +    this = xl; +    scratch_dir = (char *)brick->ptr; + +    jnl = GF_CALLOC(1, sizeof(gf_changelog_journal_t), +                    gf_changelog_mt_libgfchangelog_t); +    if (!jnl) +        goto error_return; + +    if (snprintf(jnl->jnl_brickpath, PATH_MAX, "%s", brick->brick_path) >= +        PATH_MAX) +        goto dealloc_private; + +    if (sys_stat(scratch_dir, &buf) && errno == ENOENT) { +        ret = mkdir_p(scratch_dir, 0600, _gf_true); +        if (ret) +            goto dealloc_private; +    } -        jnl->jnl_working_dir = realpath (scratch_dir, NULL); -        if (!jnl->jnl_working_dir) -                goto dealloc_private; +    jnl->jnl_working_dir = realpath(scratch_dir, NULL); +    if (!jnl->jnl_working_dir) +        goto dealloc_private; -        ret = gf_changelog_open_dirs (this, jnl); -        if (ret) { -                gf_msg (this->name, GF_LOG_ERROR, 0, -                        CHANGELOG_LIB_MSG_OPENDIR_ERROR, -                        "could not create entries in scratch dir"); -                goto dealloc_private; -        } - -        /* RFC 3986 {de,en}coding */ -        for (i = 0; i < 256; i++) { -                jnl->rfc3986_space_newline[i] = -                        (i == ' ' || i == '\n' || i == '%') ? 0 : i; -        } +    ret = gf_changelog_open_dirs(this, jnl); +    if (ret) { +        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_OPENDIR_ERROR, +               "could not create entries in scratch dir"); +        goto dealloc_private; +    } -        ret = gf_changelog_init_history (this, jnl, brick->brick_path); -        if (ret) -                goto cleanup_fds; +    /* RFC 3986 {de,en}coding */ +    for (i = 0; i < 256; i++) { +        jnl->rfc3986_space_newline[i] = (i == ' ' || i == '\n' || i == '%') ? 0 +                                                                            : i; +    } -        /* initialize journal processor */ -        jnl->this = this; -        ret = gf_changelog_init_processor (jnl); -        if (ret) -                goto cleanup_fds; - -        JNL_SET_API_STATE (jnl, JNL_API_CONN_INPROGESS); -        ret = pthread_spin_init (&jnl->lock, 0); -        if (ret != 0) -                goto cleanup_processor; -        return jnl; - - cleanup_processor: -        gf_changelog_cleanup_processor (jnl); - cleanup_fds: -        gf_changelog_cleanup_fds (jnl); -        if (jnl->hist_jnl) -                gf_changelog_cleanup_fds (jnl->hist_jnl); - dealloc_private: -        GF_FREE (jnl); - error_return: -        return NULL; +    ret = gf_changelog_init_history(this, jnl, brick->brick_path); +    if (ret) +        goto cleanup_fds; + +    /* initialize journal processor */ +    jnl->this = this; +    ret = gf_changelog_init_processor(jnl); +    if (ret) +        goto cleanup_fds; + +    JNL_SET_API_STATE(jnl, JNL_API_CONN_INPROGESS); +    ret = pthread_spin_init(&jnl->lock, 0); +    if (ret != 0) +        goto cleanup_processor; +    return jnl; + +cleanup_processor: +    gf_changelog_cleanup_processor(jnl); +cleanup_fds: +    gf_changelog_cleanup_fds(jnl); +    if (jnl->hist_jnl) +        gf_changelog_cleanup_fds(jnl->hist_jnl); +dealloc_private: +    GF_FREE(jnl); +error_return: +    return NULL;  } diff --git a/xlators/features/changelog/lib/src/gf-changelog-reborp.c b/xlators/features/changelog/lib/src/gf-changelog-reborp.c index f9fb8fcf01a..8dfda4c79c5 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-reborp.c +++ b/xlators/features/changelog/lib/src/gf-changelog-reborp.c @@ -25,133 +25,121 @@  struct rpcsvc_program *gf_changelog_reborp_programs[];  void * -gf_changelog_connection_janitor (void *arg) +gf_changelog_connection_janitor(void *arg)  { -        int32_t ret = 0; -        xlator_t *this = NULL; -        gf_private_t *priv = NULL; -        gf_changelog_t *entry = NULL; -        struct gf_event *event = NULL; -        struct gf_event_list *ev = NULL; -        unsigned long drained = 0; - -        this = arg; -        THIS = this; - -        priv = this->private; - -        while (1) { -                pthread_mutex_lock (&priv->lock); -                { -                        while (list_empty (&priv->cleanups)) -                                pthread_cond_wait (&priv->cond, &priv->lock); - -                        entry = list_first_entry (&priv->cleanups, -                                                  gf_changelog_t, list); -                        list_del_init (&entry->list); -                } -                pthread_mutex_unlock (&priv->lock); - -                drained = 0; -                ev = &entry->event; - -                gf_smsg (this->name, GF_LOG_INFO, 0, -                         CHANGELOG_LIB_MSG_CLEANING_BRICK_ENTRY_INFO, -                         "Cleaning brick entry for brick", -                         "brick=%s", entry->brick, -                         NULL); - -                /* 0x0: disable rpc-clnt */ -                rpc_clnt_disable (RPC_PROBER (entry)); - -                /* 0x1: cleanup callback invoker thread */ -                ret = gf_cleanup_event (this, ev); -                if (ret) -                        continue; - -                /* 0x2: drain pending events */ -                while (!list_empty (&ev->events)) { -                        event = list_first_entry (&ev->events, -                                                  struct gf_event, list); -                        gf_smsg (this->name, GF_LOG_INFO, 0, -                                 CHANGELOG_LIB_MSG_DRAINING_EVENT_INFO, -                                 "Draining event", -                                 "seq=%lu", event->seq, -                                 "payload=%d", event->count, -                                 NULL); - -                        GF_FREE (event); -                        drained++; -                } - -                gf_smsg (this->name, GF_LOG_INFO, 0, -                         CHANGELOG_LIB_MSG_DRAINING_EVENT_INFO, -                         "Drained events", -                         "num=%lu", drained, -                         NULL); - -                /* 0x3: freeup brick entry */ -                gf_smsg (this->name, GF_LOG_INFO, 0, -                         CHANGELOG_LIB_MSG_FREEING_ENTRY_INFO, -                         "freeing entry", -                         "entry=%p", entry, -                         NULL); -                LOCK_DESTROY (&entry->statelock); -                GF_FREE (entry); +    int32_t ret = 0; +    xlator_t *this = NULL; +    gf_private_t *priv = NULL; +    gf_changelog_t *entry = NULL; +    struct gf_event *event = NULL; +    struct gf_event_list *ev = NULL; +    unsigned long drained = 0; + +    this = arg; +    THIS = this; + +    priv = this->private; + +    while (1) { +        pthread_mutex_lock(&priv->lock); +        { +            while (list_empty(&priv->cleanups)) +                pthread_cond_wait(&priv->cond, &priv->lock); + +            entry = list_first_entry(&priv->cleanups, gf_changelog_t, list); +            list_del_init(&entry->list); +        } +        pthread_mutex_unlock(&priv->lock); + +        drained = 0; +        ev = &entry->event; + +        gf_smsg(this->name, GF_LOG_INFO, 0, +                CHANGELOG_LIB_MSG_CLEANING_BRICK_ENTRY_INFO, +                "Cleaning brick entry for brick", "brick=%s", entry->brick, +                NULL); + +        /* 0x0: disable rpc-clnt */ +        rpc_clnt_disable(RPC_PROBER(entry)); + +        /* 0x1: cleanup callback invoker thread */ +        ret = gf_cleanup_event(this, ev); +        if (ret) +            continue; + +        /* 0x2: drain pending events */ +        while (!list_empty(&ev->events)) { +            event = list_first_entry(&ev->events, struct gf_event, list); +            gf_smsg(this->name, GF_LOG_INFO, 0, +                    CHANGELOG_LIB_MSG_DRAINING_EVENT_INFO, "Draining event", +                    "seq=%lu", event->seq, "payload=%d", event->count, NULL); + +            GF_FREE(event); +            drained++;          } -        return NULL; +        gf_smsg(this->name, GF_LOG_INFO, 0, +                CHANGELOG_LIB_MSG_DRAINING_EVENT_INFO, "Drained events", +                "num=%lu", drained, NULL); + +        /* 0x3: freeup brick entry */ +        gf_smsg(this->name, GF_LOG_INFO, 0, +                CHANGELOG_LIB_MSG_FREEING_ENTRY_INFO, "freeing entry", +                "entry=%p", entry, NULL); +        LOCK_DESTROY(&entry->statelock); +        GF_FREE(entry); +    } + +    return NULL;  }  int -gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata, -                                   rpcsvc_event_t event, void *data) +gf_changelog_reborp_rpcsvc_notify(rpcsvc_t *rpc, void *mydata, +                                  rpcsvc_event_t event, void *data)  { -        int             ret      = 0; -        xlator_t       *this     = NULL; -        gf_changelog_t *entry    = NULL; +    int ret = 0; +    xlator_t *this = NULL; +    gf_changelog_t *entry = NULL; -        if (!(event == RPCSVC_EVENT_ACCEPT || -              event == RPCSVC_EVENT_DISCONNECT)) -                return 0; +    if (!(event == RPCSVC_EVENT_ACCEPT || event == RPCSVC_EVENT_DISCONNECT)) +        return 0; -        entry = mydata; -        this = entry->this; +    entry = mydata; +    this = entry->this; -        switch (event) { +    switch (event) {          case RPCSVC_EVENT_ACCEPT: -                ret = sys_unlink (RPC_SOCK(entry)); -                if (ret != 0) -                        gf_smsg (this->name, GF_LOG_WARNING, errno, -                                 CHANGELOG_LIB_MSG_UNLINK_FAILED, -                                 "failed to unlink " -                                 "reverse socket", -                                 "path=%s", RPC_SOCK (entry), -                                 NULL); -                if (entry->connected) -                        GF_CHANGELOG_INVOKE_CBK (this, entry->connected, -                                                 entry->brick, entry->ptr); -                break; +            ret = sys_unlink(RPC_SOCK(entry)); +            if (ret != 0) +                gf_smsg(this->name, GF_LOG_WARNING, errno, +                        CHANGELOG_LIB_MSG_UNLINK_FAILED, +                        "failed to unlink " +                        "reverse socket", +                        "path=%s", RPC_SOCK(entry), NULL); +            if (entry->connected) +                GF_CHANGELOG_INVOKE_CBK(this, entry->connected, entry->brick, +                                        entry->ptr); +            break;          case RPCSVC_EVENT_DISCONNECT: -                if (entry->disconnected) -                        GF_CHANGELOG_INVOKE_CBK (this, entry->disconnected, -                                                 entry->brick, entry->ptr); -                /* passthrough */ +            if (entry->disconnected) +                GF_CHANGELOG_INVOKE_CBK(this, entry->disconnected, entry->brick, +                                        entry->ptr); +            /* passthrough */          default: -                break; -        } +            break; +    } -        return 0; +    return 0;  }  rpcsvc_t * -gf_changelog_reborp_init_rpc_listner (xlator_t *this, -                                      char *path, char *sock, void *cbkdata) +gf_changelog_reborp_init_rpc_listner(xlator_t *this, char *path, char *sock, +                                     void *cbkdata)  { -        CHANGELOG_MAKE_TMP_SOCKET_PATH (path, sock, UNIX_PATH_MAX); -        return changelog_rpc_server_init (this, sock, cbkdata, -                                          gf_changelog_reborp_rpcsvc_notify, -                                          gf_changelog_reborp_programs); +    CHANGELOG_MAKE_TMP_SOCKET_PATH(path, sock, UNIX_PATH_MAX); +    return changelog_rpc_server_init(this, sock, cbkdata, +                                     gf_changelog_reborp_rpcsvc_notify, +                                     gf_changelog_reborp_programs);  }  /** @@ -164,29 +152,27 @@ gf_changelog_reborp_init_rpc_listner (xlator_t *this,   * @FIXME: cleanup this bugger once server filters events.   */  void -gf_changelog_invoke_callback (gf_changelog_t *entry, -                              struct iovec **vec, int payloadcnt) +gf_changelog_invoke_callback(gf_changelog_t *entry, struct iovec **vec, +                             int payloadcnt)  { -        int i = 0; -        int evsize = 0; -        xlator_t *this = NULL; -        changelog_event_t *event = NULL; - -        this = entry->this; - -        for (; i < payloadcnt; i++) { -                event = (changelog_event_t *)vec[i]->iov_base; -                evsize = vec[i]->iov_len / CHANGELOG_EV_SIZE; - -                for (; evsize > 0; evsize--, event++) { -                        if (gf_changelog_filter_check (entry, event)) { -                                GF_CHANGELOG_INVOKE_CBK (this, -                                                         entry->callback, -                                                         entry->brick, -                                                         entry->ptr, event); -                        } -                } +    int i = 0; +    int evsize = 0; +    xlator_t *this = NULL; +    changelog_event_t *event = NULL; + +    this = entry->this; + +    for (; i < payloadcnt; i++) { +        event = (changelog_event_t *)vec[i]->iov_base; +        evsize = vec[i]->iov_len / CHANGELOG_EV_SIZE; + +        for (; evsize > 0; evsize--, event++) { +            if (gf_changelog_filter_check(entry, event)) { +                GF_CHANGELOG_INVOKE_CBK(this, entry->callback, entry->brick, +                                        entry->ptr, event); +            }          } +    }  }  /** @@ -197,218 +183,217 @@ gf_changelog_invoke_callback (gf_changelog_t *entry,   */  int -__is_expected_sequence (struct gf_event_list *ev, struct gf_event *event) +__is_expected_sequence(struct gf_event_list *ev, struct gf_event *event)  { -        return (ev->next_seq == event->seq); +    return (ev->next_seq == event->seq);  }  int -__can_process_event (struct gf_event_list *ev, struct gf_event **event) +__can_process_event(struct gf_event_list *ev, struct gf_event **event)  { -        *event = list_first_entry (&ev->events, struct gf_event, list); +    *event = list_first_entry(&ev->events, struct gf_event, list); -        if (__is_expected_sequence (ev, *event)) { -                list_del (&(*event)->list); -                ev->next_seq++; -                return 1; -        } +    if (__is_expected_sequence(ev, *event)) { +        list_del(&(*event)->list); +        ev->next_seq++; +        return 1; +    } -        return 0; +    return 0;  }  void -pick_event_ordered (struct gf_event_list *ev, struct gf_event **event) +pick_event_ordered(struct gf_event_list *ev, struct gf_event **event)  { -        pthread_mutex_lock (&ev->lock); -        { -                while (list_empty (&ev->events) -                       || !__can_process_event (ev, event)) -                        pthread_cond_wait (&ev->cond, &ev->lock); -        } -        pthread_mutex_unlock (&ev->lock); +    pthread_mutex_lock(&ev->lock); +    { +        while (list_empty(&ev->events) || !__can_process_event(ev, event)) +            pthread_cond_wait(&ev->cond, &ev->lock); +    } +    pthread_mutex_unlock(&ev->lock);  }  void -pick_event_unordered (struct gf_event_list *ev, struct gf_event **event) +pick_event_unordered(struct gf_event_list *ev, struct gf_event **event)  { -        pthread_mutex_lock (&ev->lock); -        { -                while (list_empty (&ev->events)) -                        pthread_cond_wait (&ev->cond, &ev->lock); -                *event = list_first_entry (&ev->events, struct gf_event, list); -                list_del (&(*event)->list); -        } -        pthread_mutex_unlock (&ev->lock); +    pthread_mutex_lock(&ev->lock); +    { +        while (list_empty(&ev->events)) +            pthread_cond_wait(&ev->cond, &ev->lock); +        *event = list_first_entry(&ev->events, struct gf_event, list); +        list_del(&(*event)->list); +    } +    pthread_mutex_unlock(&ev->lock);  }  void * -gf_changelog_callback_invoker (void *arg) +gf_changelog_callback_invoker(void *arg)  { -        xlator_t             *this   = NULL; -        gf_changelog_t       *entry  = NULL; -        struct iovec         *vec    = NULL; -        struct gf_event      *event  = NULL; -        struct gf_event_list *ev     = NULL; +    xlator_t *this = NULL; +    gf_changelog_t *entry = NULL; +    struct iovec *vec = NULL; +    struct gf_event *event = NULL; +    struct gf_event_list *ev = NULL; -        ev    = arg; -        entry = ev->entry; -        THIS = this = entry->this; +    ev = arg; +    entry = ev->entry; +    THIS = this = entry->this; -        while (1) { -                entry->pickevent (ev, &event); +    while (1) { +        entry->pickevent(ev, &event); -                vec = (struct iovec *) &event->iov; -                gf_changelog_invoke_callback (entry, &vec, event->count); +        vec = (struct iovec *)&event->iov; +        gf_changelog_invoke_callback(entry, &vec, event->count); -                GF_FREE (event); -        } +        GF_FREE(event); +    } -        return NULL; +    return NULL;  }  static int -orderfn (struct list_head *pos1, struct list_head *pos2) +orderfn(struct list_head *pos1, struct list_head *pos2)  { -        struct gf_event *event1 = NULL; -        struct gf_event *event2 = NULL; +    struct gf_event *event1 = NULL; +    struct gf_event *event2 = NULL; -        event1 = list_entry (pos1, struct gf_event, list); -        event2 = list_entry (pos2, struct gf_event, list); +    event1 = list_entry(pos1, struct gf_event, list); +    event2 = list_entry(pos2, struct gf_event, list); -        if  (event1->seq > event2->seq) -                return 1; -        return -1; +    if (event1->seq > event2->seq) +        return 1; +    return -1;  }  void -queue_ordered_event (struct gf_event_list *ev, struct gf_event *event) +queue_ordered_event(struct gf_event_list *ev, struct gf_event *event)  { -        /* add event to the ordered event list and wake up listener(s) */ -        pthread_mutex_lock (&ev->lock); -        { -                list_add_order (&event->list, &ev->events, orderfn); -                if (!ev->next_seq) -                        ev->next_seq = event->seq; -                if (ev->next_seq == event->seq) -                        pthread_cond_signal (&ev->cond); -        } -        pthread_mutex_unlock (&ev->lock); +    /* add event to the ordered event list and wake up listener(s) */ +    pthread_mutex_lock(&ev->lock); +    { +        list_add_order(&event->list, &ev->events, orderfn); +        if (!ev->next_seq) +            ev->next_seq = event->seq; +        if (ev->next_seq == event->seq) +            pthread_cond_signal(&ev->cond); +    } +    pthread_mutex_unlock(&ev->lock);  }  void -queue_unordered_event (struct gf_event_list *ev, struct gf_event *event) +queue_unordered_event(struct gf_event_list *ev, struct gf_event *event)  { -        /* add event to the tail of the queue and wake up listener(s) */ -        pthread_mutex_lock (&ev->lock); -        { -                list_add_tail (&event->list, &ev->events); -                pthread_cond_signal (&ev->cond); -        } -        pthread_mutex_unlock (&ev->lock); +    /* add event to the tail of the queue and wake up listener(s) */ +    pthread_mutex_lock(&ev->lock); +    { +        list_add_tail(&event->list, &ev->events); +        pthread_cond_signal(&ev->cond); +    } +    pthread_mutex_unlock(&ev->lock);  }  int -gf_changelog_event_handler (rpcsvc_request_t *req, -                            xlator_t *this, gf_changelog_t *entry) +gf_changelog_event_handler(rpcsvc_request_t *req, xlator_t *this, +                           gf_changelog_t *entry)  { -        int                   i          = 0; -        size_t                payloadlen = 0; -        ssize_t               len        = 0; -        int                   payloadcnt = 0; -        changelog_event_req   rpc_req    = {0,}; -        changelog_event_rsp   rpc_rsp    = {0,}; -        struct iovec         *vec        = NULL; -        struct gf_event      *event      = NULL; -        struct gf_event_list *ev         = NULL; - -        ev = &entry->event; - -        len = xdr_to_generic (req->msg[0], -                              &rpc_req, (xdrproc_t)xdr_changelog_event_req); -        if (len < 0) { -                gf_msg (this->name, GF_LOG_ERROR, 0, -                        CHANGELOG_LIB_MSG_XDR_DECODING_FAILED, -                        "xdr decoding failed"); -                req->rpc_err = GARBAGE_ARGS; -                goto handle_xdr_error; -        } - -        if (len < req->msg[0].iov_len) { -                payloadcnt = 1; -                payloadlen = (req->msg[0].iov_len - len); -        } -        for (i = 1; i < req->count; i++) { -                payloadcnt++; -                payloadlen += req->msg[i].iov_len; -        } - -        event = GF_CALLOC (1, GF_EVENT_CALLOC_SIZE (payloadcnt, payloadlen), -                           gf_changelog_mt_libgfchangelog_event_t); -        if (!event) -                goto handle_xdr_error; -        INIT_LIST_HEAD (&event->list); - -        payloadlen   = 0; -        event->seq   = rpc_req.seq; -        event->count = payloadcnt; - -        /* deep copy IO vectors */ -        vec = &event->iov[0]; -        GF_EVENT_ASSIGN_IOVEC (vec, event, -                               (req->msg[0].iov_len - len), payloadlen); -        (void) memcpy (vec->iov_base, -                       req->msg[0].iov_base + len, vec->iov_len); - -        for (i = 1; i < req->count; i++) { -                vec = &event->iov[i]; -                GF_EVENT_ASSIGN_IOVEC (vec, event, -                                       req->msg[i].iov_len, payloadlen); -                (void) memcpy (event->iov[i].iov_base, -                               req->msg[i].iov_base, req->msg[i].iov_len); -        } - -        gf_msg_debug (this->name, 0, -                      "seq: %lu [%s] (time: %lu.%lu), (vec: %d, len: %zd)", -                      rpc_req.seq, entry->brick, rpc_req.tv_sec, -                      rpc_req.tv_usec, payloadcnt, payloadlen); - -        /* dispatch event */ -        entry->queueevent (ev, event); - -        /* ack sequence number */ -        rpc_rsp.op_ret = 0; -        rpc_rsp.seq    = rpc_req.seq; - -        goto submit_rpc; - - handle_xdr_error: -        rpc_rsp.op_ret = -1; -        rpc_rsp.seq    = 0;     /* invalid */ - submit_rpc: -        return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL, -                                           (xdrproc_t)xdr_changelog_event_rsp); +    int i = 0; +    size_t payloadlen = 0; +    ssize_t len = 0; +    int payloadcnt = 0; +    changelog_event_req rpc_req = { +        0, +    }; +    changelog_event_rsp rpc_rsp = { +        0, +    }; +    struct iovec *vec = NULL; +    struct gf_event *event = NULL; +    struct gf_event_list *ev = NULL; + +    ev = &entry->event; + +    len = xdr_to_generic(req->msg[0], &rpc_req, +                         (xdrproc_t)xdr_changelog_event_req); +    if (len < 0) { +        gf_msg(this->name, GF_LOG_ERROR, 0, +               CHANGELOG_LIB_MSG_XDR_DECODING_FAILED, "xdr decoding failed"); +        req->rpc_err = GARBAGE_ARGS; +        goto handle_xdr_error; +    } + +    if (len < req->msg[0].iov_len) { +        payloadcnt = 1; +        payloadlen = (req->msg[0].iov_len - len); +    } +    for (i = 1; i < req->count; i++) { +        payloadcnt++; +        payloadlen += req->msg[i].iov_len; +    } + +    event = GF_CALLOC(1, GF_EVENT_CALLOC_SIZE(payloadcnt, payloadlen), +                      gf_changelog_mt_libgfchangelog_event_t); +    if (!event) +        goto handle_xdr_error; +    INIT_LIST_HEAD(&event->list); + +    payloadlen = 0; +    event->seq = rpc_req.seq; +    event->count = payloadcnt; + +    /* deep copy IO vectors */ +    vec = &event->iov[0]; +    GF_EVENT_ASSIGN_IOVEC(vec, event, (req->msg[0].iov_len - len), payloadlen); +    (void)memcpy(vec->iov_base, req->msg[0].iov_base + len, vec->iov_len); + +    for (i = 1; i < req->count; i++) { +        vec = &event->iov[i]; +        GF_EVENT_ASSIGN_IOVEC(vec, event, req->msg[i].iov_len, payloadlen); +        (void)memcpy(event->iov[i].iov_base, req->msg[i].iov_base, +                     req->msg[i].iov_len); +    } + +    gf_msg_debug(this->name, 0, +                 "seq: %lu [%s] (time: %lu.%lu), (vec: %d, len: %zd)", +                 rpc_req.seq, entry->brick, rpc_req.tv_sec, rpc_req.tv_usec, +                 payloadcnt, payloadlen); + +    /* dispatch event */ +    entry->queueevent(ev, event); + +    /* ack sequence number */ +    rpc_rsp.op_ret = 0; +    rpc_rsp.seq = rpc_req.seq; + +    goto submit_rpc; + +handle_xdr_error: +    rpc_rsp.op_ret = -1; +    rpc_rsp.seq = 0; /* invalid */ +submit_rpc: +    return changelog_rpc_sumbit_reply(req, &rpc_rsp, NULL, 0, NULL, +                                      (xdrproc_t)xdr_changelog_event_rsp);  }  int -gf_changelog_reborp_handle_event (rpcsvc_request_t *req) +gf_changelog_reborp_handle_event(rpcsvc_request_t *req)  { -        xlator_t       *this  = NULL; -        rpcsvc_t       *svc   = NULL; -        gf_changelog_t *entry = NULL; +    xlator_t *this = NULL; +    rpcsvc_t *svc = NULL; +    gf_changelog_t *entry = NULL; -        svc = rpcsvc_request_service (req); -        entry = svc->mydata; +    svc = rpcsvc_request_service(req); +    entry = svc->mydata; -        this = THIS = entry->this; +    this = THIS = entry->this; -        return gf_changelog_event_handler (req, this, entry); +    return gf_changelog_event_handler(req, this, entry);  }  rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = { -        [CHANGELOG_REV_PROC_EVENT] = { -                "CHANGELOG EVENT HANDLER", CHANGELOG_REV_PROC_EVENT, -                gf_changelog_reborp_handle_event, NULL, 0, DRC_NA -        }, +    [CHANGELOG_REV_PROC_EVENT] = {"CHANGELOG EVENT HANDLER", +                                  CHANGELOG_REV_PROC_EVENT, +                                  gf_changelog_reborp_handle_event, NULL, 0, +                                  DRC_NA},  };  /** @@ -418,15 +403,15 @@ rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = {   * brick path and it's private data.   */  struct rpcsvc_program gf_changelog_reborp_prog = { -        .progname  = "LIBGFCHANGELOG REBORP", -        .prognum   = CHANGELOG_REV_RPC_PROCNUM, -        .progver   = CHANGELOG_REV_RPC_PROCVER, -        .numactors = CHANGELOG_REV_PROC_MAX, -        .actors    = gf_changelog_reborp_actors, -        .synctask  = _gf_false, +    .progname = "LIBGFCHANGELOG REBORP", +    .prognum = CHANGELOG_REV_RPC_PROCNUM, +    .progver = CHANGELOG_REV_RPC_PROCVER, +    .numactors = CHANGELOG_REV_PROC_MAX, +    .actors = gf_changelog_reborp_actors, +    .synctask = _gf_false,  };  struct rpcsvc_program *gf_changelog_reborp_programs[] = { -        &gf_changelog_reborp_prog, -        NULL, +    &gf_changelog_reborp_prog, +    NULL,  }; diff --git a/xlators/features/changelog/lib/src/gf-changelog-rpc.c b/xlators/features/changelog/lib/src/gf-changelog-rpc.c index 7eb5416ae98..8ec6ffbcebc 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-rpc.c +++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.c @@ -16,31 +16,32 @@ struct rpc_clnt_program gf_changelog_clnt;  /* TODO: piggyback reconnect to called (upcall) */  int -gf_changelog_rpc_notify (struct rpc_clnt *rpc, -                         void *mydata, rpc_clnt_event_t event, void *data) +gf_changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata, +                        rpc_clnt_event_t event, void *data)  { -        switch (event) { +    switch (event) {          case RPC_CLNT_CONNECT: -                break; +            break;          case RPC_CLNT_DISCONNECT:          case RPC_CLNT_MSG:          case RPC_CLNT_DESTROY:          case RPC_CLNT_PING: -                break; -        } +            break; +    } -        return 0; +    return 0;  }  struct rpc_clnt * -gf_changelog_rpc_init (xlator_t *this, gf_changelog_t *entry) +gf_changelog_rpc_init(xlator_t *this, gf_changelog_t *entry)  { -        char sockfile[UNIX_PATH_MAX] = {0,}; +    char sockfile[UNIX_PATH_MAX] = { +        0, +    }; -        CHANGELOG_MAKE_SOCKET_PATH (entry->brick, -                                    sockfile, UNIX_PATH_MAX); -        return changelog_rpc_client_init (this, entry, -                                          sockfile, gf_changelog_rpc_notify); +    CHANGELOG_MAKE_SOCKET_PATH(entry->brick, sockfile, UNIX_PATH_MAX); +    return changelog_rpc_client_init(this, entry, sockfile, +                                     gf_changelog_rpc_notify);  }  /** @@ -48,51 +49,50 @@ gf_changelog_rpc_init (xlator_t *this, gf_changelog_t *entry)   */  int -gf_probe_changelog_cbk (struct rpc_req *req, -                        struct iovec *iovec, int count, void *myframe) +gf_probe_changelog_cbk(struct rpc_req *req, struct iovec *iovec, int count, +                       void *myframe)  { -        return 0; +    return 0;  }  int -gf_probe_changelog_filter (call_frame_t *frame, xlator_t *this, void *data) +gf_probe_changelog_filter(call_frame_t *frame, xlator_t *this, void *data)  { -        char *sock = NULL; -        gf_changelog_t *entry = NULL; -        changelog_probe_req req = {0,}; - -        entry = data; -        sock = RPC_SOCK (entry); - -        (void) memcpy (&req.sock, sock, strlen (sock)); -        req.filter = entry->notify; - -        /* invoke RPC */ -        return changelog_rpc_sumbit_req (RPC_PROBER (entry), (void *) &req, -                                         frame, &gf_changelog_clnt, -                                         CHANGELOG_RPC_PROBE_FILTER, NULL, 0, -                                         NULL, this, gf_probe_changelog_cbk, -                                         (xdrproc_t) xdr_changelog_probe_req); +    char *sock = NULL; +    gf_changelog_t *entry = NULL; +    changelog_probe_req req = { +        0, +    }; + +    entry = data; +    sock = RPC_SOCK(entry); + +    (void)memcpy(&req.sock, sock, strlen(sock)); +    req.filter = entry->notify; + +    /* invoke RPC */ +    return changelog_rpc_sumbit_req( +        RPC_PROBER(entry), (void *)&req, frame, &gf_changelog_clnt, +        CHANGELOG_RPC_PROBE_FILTER, NULL, 0, NULL, this, gf_probe_changelog_cbk, +        (xdrproc_t)xdr_changelog_probe_req);  }  int -gf_changelog_invoke_rpc (xlator_t *this, gf_changelog_t *entry, int procidx) +gf_changelog_invoke_rpc(xlator_t *this, gf_changelog_t *entry, int procidx)  { -        return changelog_invoke_rpc (this, RPC_PROBER (entry), -                                     &gf_changelog_clnt, procidx, entry); +    return changelog_invoke_rpc(this, RPC_PROBER(entry), &gf_changelog_clnt, +                                procidx, entry);  }  struct rpc_clnt_procedure gf_changelog_procs[CHANGELOG_RPC_PROC_MAX] = { -        [CHANGELOG_RPC_PROC_NULL] = {"NULL", NULL}, -        [CHANGELOG_RPC_PROBE_FILTER] = { -                "PROBE FILTER", gf_probe_changelog_filter -        }, +    [CHANGELOG_RPC_PROC_NULL] = {"NULL", NULL}, +    [CHANGELOG_RPC_PROBE_FILTER] = {"PROBE FILTER", gf_probe_changelog_filter},  };  struct rpc_clnt_program gf_changelog_clnt = { -        .progname  = "LIBGFCHANGELOG", -        .prognum   = CHANGELOG_RPC_PROGNUM, -        .progver   = CHANGELOG_RPC_PROGVER, -        .numproc   = CHANGELOG_RPC_PROC_MAX, -        .proctable = gf_changelog_procs, +    .progname = "LIBGFCHANGELOG", +    .prognum = CHANGELOG_RPC_PROGNUM, +    .progver = CHANGELOG_RPC_PROGVER, +    .numproc = CHANGELOG_RPC_PROC_MAX, +    .proctable = gf_changelog_procs,  }; diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c index 8198560e736..c7791c62950 100644 --- a/xlators/features/changelog/lib/src/gf-changelog.c +++ b/xlators/features/changelog/lib/src/gf-changelog.c @@ -45,291 +45,291 @@   */  xlator_t *master = NULL; -static inline -gf_private_t *gf_changelog_alloc_priv () +static inline gf_private_t * +gf_changelog_alloc_priv()  { -        int ret = 0; -        gf_private_t *priv = NULL; - -        priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t); -        if (!priv) -                goto error_return; -        INIT_LIST_HEAD (&priv->connections); -        INIT_LIST_HEAD (&priv->cleanups); - -        ret = pthread_mutex_init (&priv->lock, NULL); -        if (ret != 0) -                goto free_priv; -        ret = pthread_cond_init (&priv->cond, NULL); -        if (ret != 0) -                goto cleanup_mutex; - -        priv->api = NULL; -        return priv; - - cleanup_mutex: -        (void) pthread_mutex_destroy (&priv->lock); - free_priv: -        GF_FREE (priv); - error_return: -        return NULL; +    int ret = 0; +    gf_private_t *priv = NULL; + +    priv = GF_CALLOC(1, sizeof(*priv), gf_changelog_mt_priv_t); +    if (!priv) +        goto error_return; +    INIT_LIST_HEAD(&priv->connections); +    INIT_LIST_HEAD(&priv->cleanups); + +    ret = pthread_mutex_init(&priv->lock, NULL); +    if (ret != 0) +        goto free_priv; +    ret = pthread_cond_init(&priv->cond, NULL); +    if (ret != 0) +        goto cleanup_mutex; + +    priv->api = NULL; +    return priv; + +cleanup_mutex: +    (void)pthread_mutex_destroy(&priv->lock); +free_priv: +    GF_FREE(priv); +error_return: +    return NULL;  } -#define GF_CHANGELOG_EVENT_POOL_SIZE   16384 +#define GF_CHANGELOG_EVENT_POOL_SIZE 16384  #define GF_CHANGELOG_EVENT_THREAD_COUNT 4  static int -gf_changelog_ctx_defaults_init (glusterfs_ctx_t *ctx) +gf_changelog_ctx_defaults_init(glusterfs_ctx_t *ctx)  { -        cmd_args_t    *cmd_args = NULL; -        struct rlimit  lim = {0, }; -        call_pool_t   *pool = NULL; -        int            ret         = -1; - -        ret = xlator_mem_acct_init (THIS, gf_changelog_mt_end); -        if (ret != 0) -                return -1; +    cmd_args_t *cmd_args = NULL; +    struct rlimit lim = { +        0, +    }; +    call_pool_t *pool = NULL; +    int ret = -1; + +    ret = xlator_mem_acct_init(THIS, gf_changelog_mt_end); +    if (ret != 0) +        return -1; -        ctx->process_uuid = generate_glusterfs_ctx_id (); -        if (!ctx->process_uuid) -                return -1; +    ctx->process_uuid = generate_glusterfs_ctx_id(); +    if (!ctx->process_uuid) +        return -1; -        ctx->page_size  = 128 * GF_UNIT_KB; +    ctx->page_size = 128 * GF_UNIT_KB; -        ctx->iobuf_pool = iobuf_pool_new (); -        if (!ctx->iobuf_pool) -                return -1; +    ctx->iobuf_pool = iobuf_pool_new(); +    if (!ctx->iobuf_pool) +        return -1; -        ctx->event_pool = event_pool_new (GF_CHANGELOG_EVENT_POOL_SIZE, -                                          GF_CHANGELOG_EVENT_THREAD_COUNT); -        if (!ctx->event_pool) -                return -1; +    ctx->event_pool = event_pool_new(GF_CHANGELOG_EVENT_POOL_SIZE, +                                     GF_CHANGELOG_EVENT_THREAD_COUNT); +    if (!ctx->event_pool) +        return -1; -        pool = GF_CALLOC (1, sizeof (call_pool_t), -                          gf_changelog_mt_libgfchangelog_call_pool_t); -        if (!pool) -                return -1; +    pool = GF_CALLOC(1, sizeof(call_pool_t), +                     gf_changelog_mt_libgfchangelog_call_pool_t); +    if (!pool) +        return -1; -        /* frame_mem_pool size 112 * 64 */ -        pool->frame_mem_pool = mem_pool_new (call_frame_t, 32); -        if (!pool->frame_mem_pool) -                return -1; +    /* frame_mem_pool size 112 * 64 */ +    pool->frame_mem_pool = mem_pool_new(call_frame_t, 32); +    if (!pool->frame_mem_pool) +        return -1; -        /* stack_mem_pool size 256 * 128 */ -        pool->stack_mem_pool = mem_pool_new (call_stack_t, 16); +    /* stack_mem_pool size 256 * 128 */ +    pool->stack_mem_pool = mem_pool_new(call_stack_t, 16); -        if (!pool->stack_mem_pool) -                return -1; +    if (!pool->stack_mem_pool) +        return -1; -        ctx->stub_mem_pool = mem_pool_new (call_stub_t, 16); -        if (!ctx->stub_mem_pool) -                return -1; +    ctx->stub_mem_pool = mem_pool_new(call_stub_t, 16); +    if (!ctx->stub_mem_pool) +        return -1; -        ctx->dict_pool = mem_pool_new (dict_t, 32); -        if (!ctx->dict_pool) -                return -1; +    ctx->dict_pool = mem_pool_new(dict_t, 32); +    if (!ctx->dict_pool) +        return -1; -        ctx->dict_pair_pool = mem_pool_new (data_pair_t, 512); -        if (!ctx->dict_pair_pool) -                return -1; +    ctx->dict_pair_pool = mem_pool_new(data_pair_t, 512); +    if (!ctx->dict_pair_pool) +        return -1; -        ctx->dict_data_pool = mem_pool_new (data_t, 512); -        if (!ctx->dict_data_pool) -                return -1; +    ctx->dict_data_pool = mem_pool_new(data_t, 512); +    if (!ctx->dict_data_pool) +        return -1; -        ctx->logbuf_pool = mem_pool_new (log_buf_t, 256); -        if (!ctx->logbuf_pool) -                return -1; +    ctx->logbuf_pool = mem_pool_new(log_buf_t, 256); +    if (!ctx->logbuf_pool) +        return -1; -        INIT_LIST_HEAD (&pool->all_frames); -        LOCK_INIT (&pool->lock); -        ctx->pool = pool; +    INIT_LIST_HEAD(&pool->all_frames); +    LOCK_INIT(&pool->lock); +    ctx->pool = pool; -        LOCK_INIT (&ctx->lock); +    LOCK_INIT(&ctx->lock); -        cmd_args = &ctx->cmd_args; +    cmd_args = &ctx->cmd_args; -        INIT_LIST_HEAD (&cmd_args->xlator_options); +    INIT_LIST_HEAD(&cmd_args->xlator_options); -        lim.rlim_cur = RLIM_INFINITY; -        lim.rlim_max = RLIM_INFINITY; -        setrlimit (RLIMIT_CORE, &lim); +    lim.rlim_cur = RLIM_INFINITY; +    lim.rlim_max = RLIM_INFINITY; +    setrlimit(RLIMIT_CORE, &lim); -        return 0; +    return 0;  }  /* TODO: cleanup ctx defaults */  void -gf_changelog_cleanup_this (xlator_t *this) +gf_changelog_cleanup_this(xlator_t *this)  { -        glusterfs_ctx_t *ctx = NULL; +    glusterfs_ctx_t *ctx = NULL; -        if (!this) -                return; +    if (!this) +        return; -        ctx = this->ctx; -        syncenv_destroy (ctx->env); -        free (ctx); +    ctx = this->ctx; +    syncenv_destroy(ctx->env); +    free(ctx); -        this->private = NULL; -        this->ctx = NULL; +    this->private = NULL; +    this->ctx = NULL; -        mem_pools_fini (); +    mem_pools_fini();  }  static int -gf_changelog_init_context () +gf_changelog_init_context()  { -        glusterfs_ctx_t *ctx = NULL; +    glusterfs_ctx_t *ctx = NULL; -        ctx = glusterfs_ctx_new (); -        if (!ctx) -                goto error_return; +    ctx = glusterfs_ctx_new(); +    if (!ctx) +        goto error_return; -        if (glusterfs_globals_init (ctx)) -                goto free_ctx; +    if (glusterfs_globals_init(ctx)) +        goto free_ctx; -        THIS->ctx = ctx; -        if (gf_changelog_ctx_defaults_init (ctx)) -                goto free_ctx; +    THIS->ctx = ctx; +    if (gf_changelog_ctx_defaults_init(ctx)) +        goto free_ctx; -        ctx->env = syncenv_new (0, 0, 0); -        if (!ctx->env) -                goto free_ctx; -        return 0; +    ctx->env = syncenv_new(0, 0, 0); +    if (!ctx->env) +        goto free_ctx; +    return 0; - free_ctx: -        free (ctx); -        THIS->ctx = NULL; - error_return: -        return -1; +free_ctx: +    free(ctx); +    THIS->ctx = NULL; +error_return: +    return -1;  }  static int -gf_changelog_init_master () +gf_changelog_init_master()  { -        int              ret = 0; +    int ret = 0; -        mem_pools_init_early (); -        ret = gf_changelog_init_context (); -        mem_pools_init_late (); +    mem_pools_init_early(); +    ret = gf_changelog_init_context(); +    mem_pools_init_late(); -        return ret; +    return ret;  }  /* TODO: cleanup clnt/svc on failure */  int -gf_changelog_setup_rpc (xlator_t *this, -                        gf_changelog_t *entry, int proc) +gf_changelog_setup_rpc(xlator_t *this, gf_changelog_t *entry, int proc)  { -        int              ret = 0; -        rpcsvc_t        *svc = NULL; -        struct rpc_clnt *rpc = NULL; - -        /** -         * Initialize a connect back socket. A probe() RPC call to the server -         * triggers a reverse connect. -         */ -        svc = gf_changelog_reborp_init_rpc_listner (this, entry->brick, -                                                    RPC_SOCK (entry), entry); -        if (!svc) -                goto error_return; -        RPC_REBORP (entry) = svc; - -        /* Initialize an RPC client */ -        rpc = gf_changelog_rpc_init (this, entry); -        if (!rpc) -                goto error_return; -        RPC_PROBER (entry) = rpc; - -        /** -         * @FIXME -         * till we have connection state machine, let's delay the RPC call -         * for now.. -         */ -        sleep (2); - -        /** -         * Probe changelog translator for reverse connection. After a successful -         * call, there's less use of the client and can be disconnected, but -         * let's leave the connection active for any future RPC calls. -         */ -        ret = gf_changelog_invoke_rpc (this, entry, proc); -        if (ret) { -                gf_msg (this->name, GF_LOG_ERROR, 0, -                        CHANGELOG_LIB_MSG_INVOKE_RPC_FAILED, -                        "Could not initiate probe RPC, bailing out!!!"); -                goto error_return; -        } - -        return 0; - - error_return: -        return -1; +    int ret = 0; +    rpcsvc_t *svc = NULL; +    struct rpc_clnt *rpc = NULL; + +    /** +     * Initialize a connect back socket. A probe() RPC call to the server +     * triggers a reverse connect. +     */ +    svc = gf_changelog_reborp_init_rpc_listner(this, entry->brick, +                                               RPC_SOCK(entry), entry); +    if (!svc) +        goto error_return; +    RPC_REBORP(entry) = svc; + +    /* Initialize an RPC client */ +    rpc = gf_changelog_rpc_init(this, entry); +    if (!rpc) +        goto error_return; +    RPC_PROBER(entry) = rpc; + +    /** +     * @FIXME +     * till we have connection state machine, let's delay the RPC call +     * for now.. +     */ +    sleep(2); + +    /** +     * Probe changelog translator for reverse connection. After a successful +     * call, there's less use of the client and can be disconnected, but +     * let's leave the connection active for any future RPC calls. +     */ +    ret = gf_changelog_invoke_rpc(this, entry, proc); +    if (ret) { +        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_INVOKE_RPC_FAILED, +               "Could not initiate probe RPC, bailing out!!!"); +        goto error_return; +    } + +    return 0; + +error_return: +    return -1;  }  int -gf_cleanup_event (xlator_t *this, struct gf_event_list *ev) +gf_cleanup_event(xlator_t *this, struct gf_event_list *ev)  { -        int ret = 0; - -        ret = gf_thread_cleanup (this, ev->invoker); -        if (ret) { -                gf_msg (this->name, GF_LOG_WARNING, -ret, -                        CHANGELOG_LIB_MSG_CLEANUP_ERROR, -                        "cannot cleanup callback invoker thread." -                        " Not freeing resources"); -                return -1; -        } +    int ret = 0; + +    ret = gf_thread_cleanup(this, ev->invoker); +    if (ret) { +        gf_msg(this->name, GF_LOG_WARNING, -ret, +               CHANGELOG_LIB_MSG_CLEANUP_ERROR, +               "cannot cleanup callback invoker thread." +               " Not freeing resources"); +        return -1; +    } -        ev->entry = NULL; +    ev->entry = NULL; -        return 0; +    return 0;  }  static int -gf_init_event (gf_changelog_t *entry) +gf_init_event(gf_changelog_t *entry)  { -        int ret = 0; -        struct gf_event_list *ev = NULL; - -        ev = &entry->event; -        ev->entry = entry; - -        ret = pthread_mutex_init (&ev->lock, NULL); -        if (ret != 0) -                goto error_return; -        ret = pthread_cond_init (&ev->cond, NULL); -        if (ret != 0) -                goto cleanup_mutex; -        INIT_LIST_HEAD (&ev->events); - -        ev->next_seq = 0;  /* bootstrap sequencing */ - -        if (GF_NEED_ORDERED_EVENTS (entry)) { -                entry->pickevent  = pick_event_ordered; -                entry->queueevent = queue_ordered_event; -        } else { -                entry->pickevent  = pick_event_unordered; -                entry->queueevent = queue_unordered_event; -        } - -        ret = gf_thread_create (&ev->invoker, NULL, -                                gf_changelog_callback_invoker, ev, "clogcbki"); -        if (ret != 0) { -                entry->pickevent = NULL; -                entry->queueevent = NULL; -                goto cleanup_cond; -        } - -        return 0; - - cleanup_cond: -        (void) pthread_cond_destroy (&ev->cond); - cleanup_mutex: -        (void) pthread_mutex_destroy (&ev->lock); - error_return: -        return -1; +    int ret = 0; +    struct gf_event_list *ev = NULL; + +    ev = &entry->event; +    ev->entry = entry; + +    ret = pthread_mutex_init(&ev->lock, NULL); +    if (ret != 0) +        goto error_return; +    ret = pthread_cond_init(&ev->cond, NULL); +    if (ret != 0) +        goto cleanup_mutex; +    INIT_LIST_HEAD(&ev->events); + +    ev->next_seq = 0; /* bootstrap sequencing */ + +    if (GF_NEED_ORDERED_EVENTS(entry)) { +        entry->pickevent = pick_event_ordered; +        entry->queueevent = queue_ordered_event; +    } else { +        entry->pickevent = pick_event_unordered; +        entry->queueevent = queue_unordered_event; +    } + +    ret = gf_thread_create(&ev->invoker, NULL, gf_changelog_callback_invoker, +                           ev, "clogcbki"); +    if (ret != 0) { +        entry->pickevent = NULL; +        entry->queueevent = NULL; +        goto cleanup_cond; +    } + +    return 0; + +cleanup_cond: +    (void)pthread_cond_destroy(&ev->cond); +cleanup_mutex: +    (void)pthread_mutex_destroy(&ev->lock); +error_return: +    return -1;  }  /** @@ -339,251 +339,242 @@ gf_init_event (gf_changelog_t *entry)   *  - destroy rpc{-clnt, svc}   */  int -gf_cleanup_brick_connection (xlator_t *this, gf_changelog_t *entry) +gf_cleanup_brick_connection(xlator_t *this, gf_changelog_t *entry)  { -        return 0; +    return 0;  }  int -gf_cleanup_connections (xlator_t *this) +gf_cleanup_connections(xlator_t *this)  { -        return 0; +    return 0;  }  static int -gf_setup_brick_connection (xlator_t *this, -                           struct gf_brick_spec *brick, -                           gf_boolean_t ordered, void *xl) +gf_setup_brick_connection(xlator_t *this, struct gf_brick_spec *brick, +                          gf_boolean_t ordered, void *xl)  { -        int ret = 0; -        gf_private_t *priv = NULL; -        gf_changelog_t *entry = NULL; - -        priv = this->private; - -        if (!brick->callback || !brick->init || !brick->fini) -                goto error_return; - -        entry = GF_CALLOC (1, sizeof (*entry), -                           gf_changelog_mt_libgfchangelog_t); -        if (!entry) -                goto error_return; -        INIT_LIST_HEAD (&entry->list); - -        LOCK_INIT (&entry->statelock); -        entry->connstate = GF_CHANGELOG_CONN_STATE_PENDING; - -        entry->notify = brick->filter; -        if (snprintf (entry->brick, PATH_MAX, "%s", brick->brick_path) -            >= PATH_MAX) -                goto free_entry; - -        entry->this = this; -        entry->invokerxl = xl; - -        entry->ordered = ordered; -        ret = gf_init_event (entry); -        if (ret) -                goto free_entry; - -        entry->fini         = brick->fini; -        entry->callback     = brick->callback; -        entry->connected    = brick->connected; -        entry->disconnected = brick->disconnected; - -        entry->ptr = brick->init (this, brick); -        if (!entry->ptr) -                goto cleanup_event; -        priv->api = entry->ptr;  /* pointer to API, if required */ - -        pthread_mutex_lock (&priv->lock); -        { -                list_add_tail (&entry->list, &priv->connections); -        } -        pthread_mutex_unlock (&priv->lock); - -        ret = gf_changelog_setup_rpc (this, entry, CHANGELOG_RPC_PROBE_FILTER); -        if (ret) -                goto cleanup_event; -        return 0; - - cleanup_event: -        (void) gf_cleanup_event (this, &entry->event); - free_entry: -        gf_msg_debug (this->name, 0, "freeing entry %p", entry); -        list_del (&entry->list); /* FIXME: kludge for now */ -        GF_FREE (entry); - error_return: -        return -1; +    int ret = 0; +    gf_private_t *priv = NULL; +    gf_changelog_t *entry = NULL; + +    priv = this->private; + +    if (!brick->callback || !brick->init || !brick->fini) +        goto error_return; + +    entry = GF_CALLOC(1, sizeof(*entry), gf_changelog_mt_libgfchangelog_t); +    if (!entry) +        goto error_return; +    INIT_LIST_HEAD(&entry->list); + +    LOCK_INIT(&entry->statelock); +    entry->connstate = GF_CHANGELOG_CONN_STATE_PENDING; + +    entry->notify = brick->filter; +    if (snprintf(entry->brick, PATH_MAX, "%s", brick->brick_path) >= PATH_MAX) +        goto free_entry; + +    entry->this = this; +    entry->invokerxl = xl; + +    entry->ordered = ordered; +    ret = gf_init_event(entry); +    if (ret) +        goto free_entry; + +    entry->fini = brick->fini; +    entry->callback = brick->callback; +    entry->connected = brick->connected; +    entry->disconnected = brick->disconnected; + +    entry->ptr = brick->init(this, brick); +    if (!entry->ptr) +        goto cleanup_event; +    priv->api = entry->ptr; /* pointer to API, if required */ + +    pthread_mutex_lock(&priv->lock); +    { +        list_add_tail(&entry->list, &priv->connections); +    } +    pthread_mutex_unlock(&priv->lock); + +    ret = gf_changelog_setup_rpc(this, entry, CHANGELOG_RPC_PROBE_FILTER); +    if (ret) +        goto cleanup_event; +    return 0; + +cleanup_event: +    (void)gf_cleanup_event(this, &entry->event); +free_entry: +    gf_msg_debug(this->name, 0, "freeing entry %p", entry); +    list_del(&entry->list); /* FIXME: kludge for now */ +    GF_FREE(entry); +error_return: +    return -1;  }  int -gf_changelog_register_brick (xlator_t *this, -                             struct gf_brick_spec *brick, -                             gf_boolean_t ordered, void *xl) +gf_changelog_register_brick(xlator_t *this, struct gf_brick_spec *brick, +                            gf_boolean_t ordered, void *xl)  { -        return gf_setup_brick_connection (this, brick, ordered, xl); +    return gf_setup_brick_connection(this, brick, ordered, xl);  }  static int -gf_changelog_setup_logging (xlator_t *this, char *logfile, int loglevel) +gf_changelog_setup_logging(xlator_t *this, char *logfile, int loglevel)  { -        /* passing ident as NULL means to use default ident for syslog */ -        if (gf_log_init (this->ctx, logfile, NULL)) -                return -1; +    /* passing ident as NULL means to use default ident for syslog */ +    if (gf_log_init(this->ctx, logfile, NULL)) +        return -1; -        gf_log_set_loglevel (this->ctx, (loglevel == -1) ? GF_LOG_INFO : -                             loglevel); -        return 0; +    gf_log_set_loglevel(this->ctx, (loglevel == -1) ? GF_LOG_INFO : loglevel); +    return 0;  }  static int -gf_changelog_set_master (xlator_t *master, void *xl) +gf_changelog_set_master(xlator_t *master, void *xl)  { -        int32_t ret = 0; -        xlator_t *this = NULL; -        xlator_t *old_this = NULL; -        gf_private_t *priv = NULL; - -        this = xl; -        if (!this || !this->ctx) { -                ret = gf_changelog_init_master (); -                if (ret) -                        return -1; -                this = THIS; -        } +    int32_t ret = 0; +    xlator_t *this = NULL; +    xlator_t *old_this = NULL; +    gf_private_t *priv = NULL; + +    this = xl; +    if (!this || !this->ctx) { +        ret = gf_changelog_init_master(); +        if (ret) +            return -1; +        this = THIS; +    } -        master->ctx = this->ctx; +    master->ctx = this->ctx; -        INIT_LIST_HEAD (&master->volume_options); -        SAVE_THIS (THIS); +    INIT_LIST_HEAD(&master->volume_options); +    SAVE_THIS(THIS); -        ret = xlator_mem_acct_init (THIS, gf_changelog_mt_end); -        if (ret != 0) -                goto restore_this; +    ret = xlator_mem_acct_init(THIS, gf_changelog_mt_end); +    if (ret != 0) +        goto restore_this; -        priv = gf_changelog_alloc_priv (); -        if (!priv) { -                ret = -1; -                goto restore_this; -        } +    priv = gf_changelog_alloc_priv(); +    if (!priv) { +        ret = -1; +        goto restore_this; +    } -        if (!xl) { -                /* poller thread */ -                ret = gf_thread_create (&priv->poller, -                                        NULL, changelog_rpc_poller, THIS, -                                        "clogpoll"); -                if (ret != 0) { -                        GF_FREE (priv); -                        gf_msg (master->name, GF_LOG_ERROR, 0, -                                CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, -                                "failed to spawn poller thread"); -                        goto restore_this; -                } +    if (!xl) { +        /* poller thread */ +        ret = gf_thread_create(&priv->poller, NULL, changelog_rpc_poller, THIS, +                               "clogpoll"); +        if (ret != 0) { +            GF_FREE(priv); +            gf_msg(master->name, GF_LOG_ERROR, 0, +                   CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, +                   "failed to spawn poller thread"); +            goto restore_this;          } +    } -        master->private = priv; +    master->private = priv; - restore_this: -        RESTORE_THIS (); +restore_this: +    RESTORE_THIS(); -        return ret; +    return ret;  }  int -gf_changelog_init (void *xl) +gf_changelog_init(void *xl)  { -        int ret = 0; -        gf_private_t *priv = NULL; - -        if (master) -                return 0; - -        master = calloc (1, sizeof (*master)); -        if (!master) -                goto error_return; - -        master->name = strdup ("gfchangelog"); -        if (!master->name) -                goto dealloc_master; - -        ret = gf_changelog_set_master (master, xl); -        if (ret) -                goto dealloc_name; - -        priv = master->private; -        ret = gf_thread_create (&priv->connectionjanitor, NULL, -                                gf_changelog_connection_janitor, master, -                                "clogjan"); -        if (ret != 0) { -                /* TODO: cleanup priv, mutex (poller thread for !xl) */ -                goto dealloc_name; -        } +    int ret = 0; +    gf_private_t *priv = NULL; +    if (master)          return 0; - dealloc_name: -        free (master->name); - dealloc_master: -        free (master); -        master = NULL; - error_return: -        return -1; +    master = calloc(1, sizeof(*master)); +    if (!master) +        goto error_return; + +    master->name = strdup("gfchangelog"); +    if (!master->name) +        goto dealloc_master; + +    ret = gf_changelog_set_master(master, xl); +    if (ret) +        goto dealloc_name; + +    priv = master->private; +    ret = gf_thread_create(&priv->connectionjanitor, NULL, +                           gf_changelog_connection_janitor, master, "clogjan"); +    if (ret != 0) { +        /* TODO: cleanup priv, mutex (poller thread for !xl) */ +        goto dealloc_name; +    } + +    return 0; + +dealloc_name: +    free(master->name); +dealloc_master: +    free(master); +    master = NULL; +error_return: +    return -1;  }  int -gf_changelog_register_generic (struct gf_brick_spec *bricks, int count, -                               int ordered, char *logfile, int lvl, void *xl) +gf_changelog_register_generic(struct gf_brick_spec *bricks, int count, +                              int ordered, char *logfile, int lvl, void *xl)  { -        int                   ret        = 0; -        xlator_t             *this       = NULL; -        xlator_t             *old_this   = NULL; -        struct gf_brick_spec *brick      = NULL; -        gf_boolean_t          need_order = _gf_false; +    int ret = 0; +    xlator_t *this = NULL; +    xlator_t *old_this = NULL; +    struct gf_brick_spec *brick = NULL; +    gf_boolean_t need_order = _gf_false; -        SAVE_THIS (xl); +    SAVE_THIS(xl); -        this = THIS; -        if (!this) -                goto error_return; +    this = THIS; +    if (!this) +        goto error_return; -        ret = gf_changelog_setup_logging (this, logfile, lvl); -        if (ret) -                goto error_return; - -        need_order = (ordered) ? _gf_true : _gf_false; - -        brick = bricks; -        while (count--) { -                gf_smsg (this->name, GF_LOG_INFO, 0, -                         CHANGELOG_LIB_MSG_NOTIFY_REGISTER_INFO, -                         "Registering brick", -                         "brick=%s", brick->brick_path, -                         "notify_filter=%d", brick->filter, -                         NULL); - -                ret = gf_changelog_register_brick (this, brick, need_order, xl); -                if (ret != 0) { -                        gf_msg (this->name, GF_LOG_ERROR, 0, -                                CHANGELOG_LIB_MSG_NOTIFY_REGISTER_FAILED, -                                "Error registering with changelog xlator"); -                        break; -                } - -                brick++; +    ret = gf_changelog_setup_logging(this, logfile, lvl); +    if (ret) +        goto error_return; + +    need_order = (ordered) ? _gf_true : _gf_false; + +    brick = bricks; +    while (count--) { +        gf_smsg(this->name, GF_LOG_INFO, 0, +                CHANGELOG_LIB_MSG_NOTIFY_REGISTER_INFO, "Registering brick", +                "brick=%s", brick->brick_path, "notify_filter=%d", +                brick->filter, NULL); + +        ret = gf_changelog_register_brick(this, brick, need_order, xl); +        if (ret != 0) { +            gf_msg(this->name, GF_LOG_ERROR, 0, +                   CHANGELOG_LIB_MSG_NOTIFY_REGISTER_FAILED, +                   "Error registering with changelog xlator"); +            break;          } -        if (ret != 0) -                goto cleanup_inited_bricks; +        brick++; +    } -        RESTORE_THIS(); -        return 0; +    if (ret != 0) +        goto cleanup_inited_bricks; - cleanup_inited_bricks: -        gf_cleanup_connections (this); - error_return: -        RESTORE_THIS(); -        return -1; +    RESTORE_THIS(); +    return 0; + +cleanup_inited_bricks: +    gf_cleanup_connections(this); +error_return: +    RESTORE_THIS(); +    return -1;  }  /** @@ -610,27 +601,29 @@ gf_changelog_register_generic (struct gf_brick_spec *bricks, int count,   * For generic API, refer gf_changelog_register_generic().   */  int -gf_changelog_register (char *brick_path, char *scratch_dir, -                       char *log_file, int log_level, int max_reconnects) +gf_changelog_register(char *brick_path, char *scratch_dir, char *log_file, +                      int log_level, int max_reconnects)  { -        struct gf_brick_spec brick = {0,}; +    struct gf_brick_spec brick = { +        0, +    }; -        if (master) -                THIS = master; -        else -                return -1; +    if (master) +        THIS = master; +    else +        return -1; -        brick.brick_path = brick_path; -        brick.filter     = CHANGELOG_OP_TYPE_JOURNAL; +    brick.brick_path = brick_path; +    brick.filter = CHANGELOG_OP_TYPE_JOURNAL; -        brick.init         = gf_changelog_journal_init; -        brick.fini         = gf_changelog_journal_fini; -        brick.callback     = gf_changelog_handle_journal; -        brick.connected    = gf_changelog_journal_connect; -        brick.disconnected = gf_changelog_journal_disconnect; +    brick.init = gf_changelog_journal_init; +    brick.fini = gf_changelog_journal_fini; +    brick.callback = gf_changelog_handle_journal; +    brick.connected = gf_changelog_journal_connect; +    brick.disconnected = gf_changelog_journal_disconnect; -        brick.ptr = scratch_dir; +    brick.ptr = scratch_dir; -        return gf_changelog_register_generic (&brick, 1, 1, -                                              log_file, log_level, NULL); +    return gf_changelog_register_generic(&brick, 1, 1, log_file, log_level, +                                         NULL);  } diff --git a/xlators/features/changelog/lib/src/gf-history-changelog.c b/xlators/features/changelog/lib/src/gf-history-changelog.c index 5259ae3893b..c8a31ebbd73 100644 --- a/xlators/features/changelog/lib/src/gf-history-changelog.c +++ b/xlators/features/changelog/lib/src/gf-history-changelog.c @@ -36,62 +36,60 @@   *   -1: On error.   */  int -gf_history_changelog_done (char *file) +gf_history_changelog_done(char *file)  { -        int                     ret               = -1; -        char                   *buffer            = NULL; -        xlator_t               *this              = NULL; -        gf_changelog_journal_t *jnl               = NULL; -        gf_changelog_journal_t *hist_jnl          = NULL; -        char                    to_path[PATH_MAX] = {0,}; +    int ret = -1; +    char *buffer = NULL; +    xlator_t *this = NULL; +    gf_changelog_journal_t *jnl = NULL; +    gf_changelog_journal_t *hist_jnl = NULL; +    char to_path[PATH_MAX] = { +        0, +    }; + +    errno = EINVAL; + +    this = THIS; +    if (!this) +        goto out; + +    jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); +    if (!jnl) +        goto out; + +    hist_jnl = jnl->hist_jnl; +    if (!hist_jnl) +        goto out; + +    if (!file || !strlen(file)) +        goto out; + +    /* make sure 'file' is inside ->jnl_working_dir */ +    buffer = realpath(file, NULL); +    if (!buffer) +        goto out; + +    if (strncmp(hist_jnl->jnl_working_dir, buffer, +                strlen(hist_jnl->jnl_working_dir))) +        goto out; + +    (void)snprintf(to_path, PATH_MAX, "%s%s", hist_jnl->jnl_processed_dir, +                   basename(buffer)); +    gf_msg_debug(this->name, 0, "moving %s to processed directory", file); +    ret = sys_rename(buffer, to_path); +    if (ret) { +        gf_smsg(this->name, GF_LOG_ERROR, errno, +                CHANGELOG_LIB_MSG_RENAME_FAILED, "cannot move changelog file", +                "from=%s", file, "to=%s", to_path, NULL); +        goto out; +    } + +    ret = 0; -        errno = EINVAL; - -        this = THIS; -        if (!this) -                goto out; - -        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); -        if (!jnl) -                goto out; - -        hist_jnl = jnl->hist_jnl; -        if (!hist_jnl) -                goto out; - -        if (!file || !strlen (file)) -                goto out; - -        /* make sure 'file' is inside ->jnl_working_dir */ -        buffer = realpath (file, NULL); -        if (!buffer) -                goto out; - -        if (strncmp (hist_jnl->jnl_working_dir, -                     buffer, strlen (hist_jnl->jnl_working_dir))) -                goto out; - -        (void) snprintf (to_path, PATH_MAX, "%s%s", -                         hist_jnl->jnl_processed_dir, basename (buffer)); -        gf_msg_debug (this->name, 0, -                      "moving %s to processed directory", file); -        ret = sys_rename (buffer, to_path); -        if (ret) { -                gf_smsg (this->name, GF_LOG_ERROR, errno, -                         CHANGELOG_LIB_MSG_RENAME_FAILED, -                         "cannot move changelog file", -                         "from=%s", file, -                         "to=%s", to_path, -                         NULL); -                goto out; -        } - -        ret = 0; - - out: -        if (buffer) -                free (buffer); /* allocated by realpath() */ -        return ret; +out: +    if (buffer) +        free(buffer); /* allocated by realpath() */ +    return ret;  }  /** @@ -105,33 +103,33 @@ gf_history_changelog_done (char *file)   *    -1: On error.   */  int -gf_history_changelog_start_fresh () +gf_history_changelog_start_fresh()  { -        xlator_t               *this     = NULL; -        gf_changelog_journal_t *jnl      = NULL; -        gf_changelog_journal_t *hist_jnl = NULL; +    xlator_t *this = NULL; +    gf_changelog_journal_t *jnl = NULL; +    gf_changelog_journal_t *hist_jnl = NULL; -        this = THIS; -        if (!this) -                goto out; +    this = THIS; +    if (!this) +        goto out; -        errno = EINVAL; +    errno = EINVAL; -        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); -        if (!jnl) -                goto out; +    jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); +    if (!jnl) +        goto out; -        hist_jnl = jnl->hist_jnl; -        if (!hist_jnl) -                goto out; +    hist_jnl = jnl->hist_jnl; +    if (!hist_jnl) +        goto out; -        if (gf_ftruncate (hist_jnl->jnl_fd, 0)) -                goto out; +    if (gf_ftruncate(hist_jnl->jnl_fd, 0)) +        goto out; -        return 0; +    return 0; - out: -        return -1; +out: +    return -1;  }  /** @@ -150,50 +148,52 @@ gf_history_changelog_start_fresh ()   *     -1  : On error.   */  ssize_t -gf_history_changelog_next_change (char *bufptr, size_t maxlen) +gf_history_changelog_next_change(char *bufptr, size_t maxlen)  { -        ssize_t                 size             = -1; -        int                     tracker_fd       = 0; -        xlator_t               *this             = NULL; -        gf_changelog_journal_t *jnl              = NULL; -        gf_changelog_journal_t *hist_jnl         = NULL; -        char                    buffer[PATH_MAX] = {0,}; - -        if (maxlen > PATH_MAX) { -                errno = ENAMETOOLONG; -                goto out; -        } +    ssize_t size = -1; +    int tracker_fd = 0; +    xlator_t *this = NULL; +    gf_changelog_journal_t *jnl = NULL; +    gf_changelog_journal_t *hist_jnl = NULL; +    char buffer[PATH_MAX] = { +        0, +    }; -        errno = EINVAL; +    if (maxlen > PATH_MAX) { +        errno = ENAMETOOLONG; +        goto out; +    } -        this = THIS; -        if (!this) -                goto out; +    errno = EINVAL; -        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); -        if (!jnl) -                goto out; +    this = THIS; +    if (!this) +        goto out; -        hist_jnl = jnl->hist_jnl; -        if (!hist_jnl) -                goto out; +    jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); +    if (!jnl) +        goto out; -        tracker_fd = hist_jnl->jnl_fd; +    hist_jnl = jnl->hist_jnl; +    if (!hist_jnl) +        goto out; -        size = gf_readline (tracker_fd, buffer, maxlen); -        if (size < 0) { -                size = -1; -                goto out; -        } +    tracker_fd = hist_jnl->jnl_fd; -        if (size == 0) -                goto out; +    size = gf_readline(tracker_fd, buffer, maxlen); +    if (size < 0) { +        size = -1; +        goto out; +    } -        memcpy (bufptr, buffer, size - 1); -        bufptr[size - 1] = '\0'; +    if (size == 0) +        goto out; + +    memcpy(bufptr, buffer, size - 1); +    bufptr[size - 1] = '\0';  out: -        return size; +    return size;  }  /** @@ -214,97 +214,100 @@ out:   *   */  ssize_t -gf_history_changelog_scan () +gf_history_changelog_scan()  { -        int                     tracker_fd   = 0; -        size_t                  off          = 0; -        xlator_t               *this         = NULL; -        size_t                  nr_entries   = 0; -        gf_changelog_journal_t *jnl          = NULL; -        gf_changelog_journal_t *hist_jnl     = NULL; -        struct dirent          *entry        = NULL; -        struct dirent           scratch[2]   = {{0,},}; -        char buffer[PATH_MAX]                = {0,}; -        static int              is_last_scan; - -        this = THIS; -        if (!this) -                goto out; +    int tracker_fd = 0; +    size_t off = 0; +    xlator_t *this = NULL; +    size_t nr_entries = 0; +    gf_changelog_journal_t *jnl = NULL; +    gf_changelog_journal_t *hist_jnl = NULL; +    struct dirent *entry = NULL; +    struct dirent scratch[2] = { +        { +            0, +        }, +    }; +    char buffer[PATH_MAX] = { +        0, +    }; +    static int is_last_scan; + +    this = THIS; +    if (!this) +        goto out; + +    jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); +    if (!jnl) +        goto out; +    if (JNL_IS_API_DISCONNECTED(jnl)) { +        errno = ENOTCONN; +        goto out; +    } + +    hist_jnl = jnl->hist_jnl; +    if (!hist_jnl) +        goto out; + +retry: +    if (is_last_scan == 1) +        return 0; +    if (hist_jnl->hist_done == 0) +        is_last_scan = 1; -        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); -        if (!jnl) -                goto out; -        if (JNL_IS_API_DISCONNECTED (jnl)) { -                errno = ENOTCONN; -                goto out; -        } +    errno = EINVAL; +    if (hist_jnl->hist_done == -1) +        goto out; -        hist_jnl = jnl->hist_jnl; -        if (!hist_jnl) -                goto out; +    tracker_fd = hist_jnl->jnl_fd; - retry: -        if (is_last_scan == 1) -                return 0; -        if (hist_jnl->hist_done == 0) -                is_last_scan = 1; +    if (gf_ftruncate(tracker_fd, 0)) +        goto out; -        errno = EINVAL; -        if (hist_jnl->hist_done == -1) -                goto out; +    rewinddir(hist_jnl->jnl_dir); -        tracker_fd = hist_jnl->jnl_fd; +    for (;;) { +        errno = 0; +        entry = sys_readdir(hist_jnl->jnl_dir, scratch); +        if (!entry || errno != 0) +            break; -        if (gf_ftruncate (tracker_fd, 0)) -                goto out; +        if (strcmp(basename(entry->d_name), ".") == 0 || +            strcmp(basename(entry->d_name), "..") == 0) +            continue; -        rewinddir (hist_jnl->jnl_dir); - -        for (;;) { -                errno = 0; -                entry = sys_readdir (hist_jnl->jnl_dir, scratch); -                if (!entry || errno != 0) -                        break; - -                if (strcmp (basename (entry->d_name), ".") == 0 || -                    strcmp (basename (entry->d_name), "..") == 0) -                        continue; - -                nr_entries++; - -                GF_CHANGELOG_FILL_BUFFER (hist_jnl->jnl_processing_dir, -                                          buffer, off, -                                          strlen (hist_jnl->jnl_processing_dir)); -                GF_CHANGELOG_FILL_BUFFER (entry->d_name, buffer, -                                          off, strlen (entry->d_name)); -                GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1); - -                if (gf_changelog_write (tracker_fd, buffer, off) != off) { -                        gf_msg (this->name, GF_LOG_ERROR, 0, -                                CHANGELOG_LIB_MSG_WRITE_FAILED, -                                "error writing changelog filename" -                                " to tracker file"); -                        break; -                } -                off = 0; -        } +        nr_entries++; -        gf_msg_debug (this->name, 0, -                      "hist_done %d, is_last_scan: %d", -                      hist_jnl->hist_done, is_last_scan); - -        if (!entry) { -                if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) { -                        if (nr_entries > 0) -                                return nr_entries; -                        else { -                                sleep(1); -                                goto retry; -                        } -                } +        GF_CHANGELOG_FILL_BUFFER(hist_jnl->jnl_processing_dir, buffer, off, +                                 strlen(hist_jnl->jnl_processing_dir)); +        GF_CHANGELOG_FILL_BUFFER(entry->d_name, buffer, off, +                                 strlen(entry->d_name)); +        GF_CHANGELOG_FILL_BUFFER("\n", buffer, off, 1); + +        if (gf_changelog_write(tracker_fd, buffer, off) != off) { +            gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_WRITE_FAILED, +                   "error writing changelog filename" +                   " to tracker file"); +            break;          } - out: -        return -1; +        off = 0; +    } + +    gf_msg_debug(this->name, 0, "hist_done %d, is_last_scan: %d", +                 hist_jnl->hist_done, is_last_scan); + +    if (!entry) { +        if (gf_lseek(tracker_fd, 0, SEEK_SET) != -1) { +            if (nr_entries > 0) +                return nr_entries; +            else { +                sleep(1); +                goto retry; +            } +        } +    } +out: +    return -1;  }  /* @@ -312,36 +315,36 @@ gf_history_changelog_scan ()   * Returns 0 on success(updates given time-stamp), -1 on failure.   */  int -gf_history_get_timestamp (int fd, int index, int len, -                          unsigned long *ts) +gf_history_get_timestamp(int fd, int index, int len, unsigned long *ts)  { -        xlator_t        *this             = NULL; -        int             n_read            = -1; -        char            path_buf[PATH_MAX]= {0,}; -        char            *iter             = path_buf; -        size_t          offset            = index * (len+1); -        unsigned long   value             = 0; -        int             ret               = 0; - -        this = THIS; -        if (!this) { -                return -1; -        } - -        n_read = sys_pread (fd, path_buf, len, offset); -        if (n_read < 0 ) { -                ret = -1; -                gf_msg (this->name, GF_LOG_ERROR, errno, -                         CHANGELOG_LIB_MSG_READ_ERROR, -                         "could not read from htime file"); -                goto out; -        } -        iter+= len - TIMESTAMP_LENGTH; -        sscanf (iter, "%lu",&value); +    xlator_t *this = NULL; +    int n_read = -1; +    char path_buf[PATH_MAX] = { +        0, +    }; +    char *iter = path_buf; +    size_t offset = index * (len + 1); +    unsigned long value = 0; +    int ret = 0; + +    this = THIS; +    if (!this) { +        return -1; +    } + +    n_read = sys_pread(fd, path_buf, len, offset); +    if (n_read < 0) { +        ret = -1; +        gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_READ_ERROR, +               "could not read from htime file"); +        goto out; +    } +    iter += len - TIMESTAMP_LENGTH; +    sscanf(iter, "%lu", &value);  out: -        if(ret == 0) -                *ts = value; -        return ret; +    if (ret == 0) +        *ts = value; +    return ret;  }  /* @@ -349,38 +352,37 @@ out:   * Checks whether @value is there next to @target_index or not   */  int -gf_history_check ( int fd, int target_index, unsigned long value, int len) +gf_history_check(int fd, int target_index, unsigned long value, int len)  { -        int             ret = 0; -        unsigned long   ts1 = 0; -        unsigned long   ts2 = 0; - -        if (target_index == 0) { -                ret = gf_history_get_timestamp (fd, target_index, len, &ts1); -                if (ret == -1) -                        goto out; -                if (value <= ts1) -                        goto out; -                else { -                        ret = -1; -                        goto out; -                } -        } +    int ret = 0; +    unsigned long ts1 = 0; +    unsigned long ts2 = 0; -        ret = gf_history_get_timestamp (fd, target_index, len, &ts1); -        if (ret ==-1) -                goto out; -        ret = gf_history_get_timestamp (fd, target_index -1, len, &ts2); -        if (ret ==-1) -                goto out; - -        if ( (value <= ts1) && (value > ts2) ) { -                goto out; +    if (target_index == 0) { +        ret = gf_history_get_timestamp(fd, target_index, len, &ts1); +        if (ret == -1) +            goto out; +        if (value <= ts1) +            goto out; +        else { +            ret = -1; +            goto out;          } -        else -                ret = -1; +    } + +    ret = gf_history_get_timestamp(fd, target_index, len, &ts1); +    if (ret == -1) +        goto out; +    ret = gf_history_get_timestamp(fd, target_index - 1, len, &ts2); +    if (ret == -1) +        goto out; + +    if ((value <= ts1) && (value > ts2)) { +        goto out; +    } else +        ret = -1;  out: -        return ret; +    return ret;  }  /* @@ -400,78 +402,69 @@ out:   */  int -gf_history_b_search (int fd, unsigned long value, -                     unsigned long from, unsigned long to, int len) +gf_history_b_search(int fd, unsigned long value, unsigned long from, +                    unsigned long to, int len)  { -        int             m_index   = -1; -        unsigned long   cur_value = 0; -        unsigned long   ts1       = 0; -        int             ret       = 0; - -        m_index = (from + to)/2; - -        if ( (to - from) <=1 ) { -                /* either one or 2 changelogs left */ -                if ( to != from ) { -                        /* check if value is less or greater than to -                         * return accordingly -                         */ -                        ret = gf_history_get_timestamp (fd, from, len, &ts1); -                        if (ret ==-1) -                                goto out; -                        if ( ts1 >= value) { -                                /* actually compatision should be -                                 * exactly == but considering -                                 * -                                 * case of only 2 changelogs in htime file -                                 */ -                                return from; -                        } -                        else -                                return to; -                } -                else -                        return to; -        } - -        ret = gf_history_get_timestamp (fd, m_index, len, &cur_value); +    int m_index = -1; +    unsigned long cur_value = 0; +    unsigned long ts1 = 0; +    int ret = 0; + +    m_index = (from + to) / 2; + +    if ((to - from) <= 1) { +        /* either one or 2 changelogs left */ +        if (to != from) { +            /* check if value is less or greater than to +             * return accordingly +             */ +            ret = gf_history_get_timestamp(fd, from, len, &ts1); +            if (ret == -1) +                goto out; +            if (ts1 >= value) { +                /* actually compatision should be +                 * exactly == but considering +                 * +                 * case of only 2 changelogs in htime file +                 */ +                return from; +            } else +                return to; +        } else +            return to; +    } + +    ret = gf_history_get_timestamp(fd, m_index, len, &cur_value); +    if (ret == -1) +        goto out; +    if (cur_value == value) { +        return m_index; +    } else if (value > cur_value) { +        ret = gf_history_get_timestamp(fd, m_index + 1, len, &cur_value);          if (ret == -1) +            goto out; +        if (value < cur_value) +            return m_index + 1; +        else +            return gf_history_b_search(fd, value, m_index + 1, to, len); +    } else { +        if (m_index == 0) { +            /*  we are sure that values exists +             *  in this htime file +             */ +            return 0; +        } else { +            ret = gf_history_get_timestamp(fd, m_index - 1, len, &cur_value); +            if (ret == -1)                  goto out; -        if (cur_value == value) { +            if (value > cur_value) {                  return m_index; +            } else +                return gf_history_b_search(fd, value, from, m_index - 1, len);          } -        else if (value > cur_value) { -                ret = gf_history_get_timestamp (fd, m_index+1, len, &cur_value); -                if (ret == -1) -                        goto out; -                if (value < cur_value) -                        return m_index + 1; -                else -                        return gf_history_b_search (fd, value, -                                                    m_index+1, to, len); -        } -        else { -                if (m_index ==0) { -                       /*  we are sure that values exists -                        *  in this htime file -                        */ -                        return 0; -                } -                else { -                        ret = gf_history_get_timestamp (fd, m_index-1, len, -                                                        &cur_value); -                        if (ret == -1) -                                goto out; -                        if (value > cur_value) { -                                return m_index; -                        } -                        else -                                return gf_history_b_search (fd, value, from, -                                                            m_index-1, len); -                } -        } +    }  out: -        return -1; +    return -1;  }  /* @@ -484,65 +477,60 @@ out:   * 0 : No, Not usable ( contains, "changelog")   */  int -gf_is_changelog_usable (char *cl_path) +gf_is_changelog_usable(char *cl_path)  { -        int             ret             = -1; -        const char      low_c[]         = "changelog"; -        char            *str_ret        = NULL; -        char            *bname          = NULL; +    int ret = -1; +    const char low_c[] = "changelog"; +    char *str_ret = NULL; +    char *bname = NULL; -        bname = basename (cl_path); +    bname = basename(cl_path); -        str_ret = strstr (bname, low_c); +    str_ret = strstr(bname, low_c); -        if (str_ret != NULL) -                ret = 0; -        else -                ret = 1; - -        return ret; +    if (str_ret != NULL) +        ret = 0; +    else +        ret = 1; +    return ret;  }  void * -gf_changelog_consume_wrap (void* data) +gf_changelog_consume_wrap(void *data)  { -        int                          ret   = -1; -        ssize_t                      nread = 0; -        xlator_t                    *this  = NULL; -        gf_changelog_consume_data_t *ccd   = NULL; - -        ccd = (gf_changelog_consume_data_t *) data; -        this = ccd->this; - -        ccd->retval = -1; - -        nread = sys_pread (ccd->fd, ccd->changelog, PATH_MAX-1, ccd->offset); -        if (nread < 0) { -                gf_msg (this->name, GF_LOG_ERROR, errno, -                        CHANGELOG_LIB_MSG_READ_ERROR, -                        "cannot read from history metadata file"); -                goto out; -        } - -        /* TODO: handle short reads and EOF. */ -        if (gf_is_changelog_usable (ccd->changelog) == 1) { - -                ret = gf_changelog_consume (ccd->this, -                                            ccd->jnl, ccd->changelog, _gf_true); -                if (ret) { -                        gf_smsg (this->name, GF_LOG_ERROR, -                                 0, CHANGELOG_LIB_MSG_PARSE_ERROR, -                                 "could not parse changelog", -                                 "name=%s", ccd->changelog, -                                 NULL); -                        goto out; -                } +    int ret = -1; +    ssize_t nread = 0; +    xlator_t *this = NULL; +    gf_changelog_consume_data_t *ccd = NULL; + +    ccd = (gf_changelog_consume_data_t *)data; +    this = ccd->this; + +    ccd->retval = -1; + +    nread = sys_pread(ccd->fd, ccd->changelog, PATH_MAX - 1, ccd->offset); +    if (nread < 0) { +        gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_READ_ERROR, +               "cannot read from history metadata file"); +        goto out; +    } + +    /* TODO: handle short reads and EOF. */ +    if (gf_is_changelog_usable(ccd->changelog) == 1) { +        ret = gf_changelog_consume(ccd->this, ccd->jnl, ccd->changelog, +                                   _gf_true); +        if (ret) { +            gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_PARSE_ERROR, +                    "could not parse changelog", "name=%s", ccd->changelog, +                    NULL); +            goto out;          } -        ccd->retval = 0; +    } +    ccd->retval = 0; - out: -        return NULL; +out: +    return NULL;  }  /** @@ -551,133 +539,138 @@ gf_changelog_consume_wrap (void* data)   * to index "to" in open htime file whose fd is "fd".   */ -#define MAX_PARALLELS  10 +#define MAX_PARALLELS 10  void * -gf_history_consume (void * data) +gf_history_consume(void *data)  { -        xlator_t                    *this              = NULL; -        gf_changelog_journal_t              *jnl               = NULL; -        gf_changelog_journal_t              *hist_jnl          = NULL; -        int                          ret               = 0; -        int                          iter              = 0; -        int                          fd                = -1; -        int                          from              = -1; -        int                          to                = -1; -        int                          len               = -1; -        int                          n_parallel        = 0; -        int                          n_envoked         = 0; -        gf_boolean_t                 publish           = _gf_true; -        pthread_t th_id[MAX_PARALLELS]                 = {0,}; -        gf_changelog_history_data_t *hist_data         = NULL; -        gf_changelog_consume_data_t ccd[MAX_PARALLELS] = {{0},}; -        gf_changelog_consume_data_t *curr              = NULL; -        char thread_name[GF_THREAD_NAMEMAX]            = {0,}; - -        hist_data = (gf_changelog_history_data_t *) data; -        if (hist_data == NULL) { -                ret = -1; -                goto out; +    xlator_t *this = NULL; +    gf_changelog_journal_t *jnl = NULL; +    gf_changelog_journal_t *hist_jnl = NULL; +    int ret = 0; +    int iter = 0; +    int fd = -1; +    int from = -1; +    int to = -1; +    int len = -1; +    int n_parallel = 0; +    int n_envoked = 0; +    gf_boolean_t publish = _gf_true; +    pthread_t th_id[MAX_PARALLELS] = { +        0, +    }; +    gf_changelog_history_data_t *hist_data = NULL; +    gf_changelog_consume_data_t ccd[MAX_PARALLELS] = { +        {0}, +    }; +    gf_changelog_consume_data_t *curr = NULL; +    char thread_name[GF_THREAD_NAMEMAX] = { +        0, +    }; + +    hist_data = (gf_changelog_history_data_t *)data; +    if (hist_data == NULL) { +        ret = -1; +        goto out; +    } + +    fd = hist_data->htime_fd; +    from = hist_data->from; +    to = hist_data->to; +    len = hist_data->len; +    n_parallel = hist_data->n_parallel; + +    THIS = hist_data->this; +    this = hist_data->this; +    if (!this) { +        ret = -1; +        goto out; +    } + +    jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); +    if (!jnl) { +        ret = -1; +        goto out; +    } + +    hist_jnl = jnl->hist_jnl; +    if (!hist_jnl) { +        ret = -1; +        goto out; +    } + +    while (from <= to) { +        n_envoked = 0; + +        for (iter = 0; (iter < n_parallel) && (from <= to); iter++) { +            curr = &ccd[iter]; + +            curr->this = this; +            curr->jnl = hist_jnl; +            curr->fd = fd; +            curr->offset = from * (len + 1); + +            curr->retval = 0; +            memset(curr->changelog, '\0', PATH_MAX); +            snprintf(thread_name, sizeof(thread_name), "clogc%03hx", +                     ((iter + 1) & 0x3ff)); + +            ret = gf_thread_create(&th_id[iter], NULL, +                                   gf_changelog_consume_wrap, curr, +                                   thread_name); +            if (ret) { +                gf_msg(this->name, GF_LOG_ERROR, ret, +                       CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, +                       "could not create consume-thread"); +                goto sync; +            } else +                n_envoked++; + +            from++;          } -        fd         = hist_data->htime_fd; -        from       = hist_data->from; -        to         = hist_data->to; -        len        = hist_data->len; -        n_parallel = hist_data->n_parallel; - -        THIS = hist_data->this; -        this = hist_data->this; -        if (!this) { -                ret = -1; -                goto out; +    sync: +        for (iter = 0; iter < n_envoked; iter++) { +            ret = pthread_join(th_id[iter], NULL); +            if (ret) { +                publish = _gf_false; +                gf_msg(this->name, GF_LOG_ERROR, ret, +                       CHANGELOG_LIB_MSG_PTHREAD_JOIN_FAILED, +                       "pthread_join() error"); +                /* try to join the rest */ +                continue; +            } + +            if (publish == _gf_false) +                continue; + +            curr = &ccd[iter]; +            if (ccd->retval) { +                publish = _gf_false; +                gf_msg(this->name, GF_LOG_ERROR, 0, +                       CHANGELOG_LIB_MSG_PARSE_ERROR, +                       "parsing error, ceased publishing..."); +                continue; +            } + +            ret = gf_changelog_publish(curr->this, curr->jnl, curr->changelog); +            if (ret) { +                publish = _gf_false; +                gf_msg(this->name, GF_LOG_ERROR, 0, +                       CHANGELOG_LIB_MSG_PUBLISH_ERROR, +                       "publish error, ceased publishing..."); +            }          } +    } -        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); -        if (!jnl) { -                ret = -1; -                goto out; -        } - -        hist_jnl = jnl->hist_jnl; -        if (!hist_jnl) { -                ret = -1; -                goto out; -        } - -        while (from <= to) { -                n_envoked = 0; - -                for (iter = 0 ; (iter < n_parallel) && (from <= to); iter++) { -                        curr = &ccd[iter]; - -                        curr->this   = this; -                        curr->jnl    = hist_jnl; -                        curr->fd     = fd; -                        curr->offset = from * (len + 1); - -                        curr->retval = 0; -                        memset (curr->changelog, '\0', PATH_MAX); -                        snprintf (thread_name, sizeof(thread_name), -                                  "clogc%03hx", ((iter + 1) & 0x3ff)); - -                        ret = gf_thread_create (&th_id[iter], NULL, -                                                gf_changelog_consume_wrap, curr, -                                                thread_name); -                        if (ret) { -                                gf_msg (this->name, GF_LOG_ERROR, ret, -                                        CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED -                                        , "could not create consume-thread"); -                                goto sync; -                        } else -                                n_envoked++; - -                        from++; -                } - -        sync: -                for (iter = 0; iter < n_envoked; iter++) { -                        ret = pthread_join (th_id[iter], NULL); -                        if (ret) { -                                publish = _gf_false; -                                gf_msg (this->name, GF_LOG_ERROR, ret, -                                        CHANGELOG_LIB_MSG_PTHREAD_JOIN_FAILED, -                                        "pthread_join() error"); -                                /* try to join the rest */ -                                continue; -                        } - -                        if (publish == _gf_false) -                                continue; - -                        curr = &ccd[iter]; -                        if (ccd->retval) { -                                publish = _gf_false; -                                gf_msg (this->name, GF_LOG_ERROR, -                                        0, CHANGELOG_LIB_MSG_PARSE_ERROR, -                                        "parsing error, ceased publishing..."); -                                continue; -                        } - -                        ret = gf_changelog_publish (curr->this, -                                                    curr->jnl, curr->changelog); -                        if (ret) { -                                publish = _gf_false; -                                gf_msg (this->name, GF_LOG_ERROR, 0, -                                        CHANGELOG_LIB_MSG_PUBLISH_ERROR, -                                        "publish error, ceased publishing..."); -                        } -                } -        } - -       /* informing "parsing done". */ -        hist_jnl->hist_done = (publish == _gf_true) ? 0 : -1; +    /* informing "parsing done". */ +    hist_jnl->hist_done = (publish == _gf_true) ? 0 : -1;  out: -        if (fd != -1) -                (void) sys_close (fd); -        GF_FREE (hist_data); -        return NULL; +    if (fd != -1) +        (void)sys_close(fd); +    GF_FREE(hist_data); +    return NULL;  }  /** @@ -707,77 +700,73 @@ out:   *   -2 : Ignore this metadata file and process next   */  int -gf_changelog_extract_min_max (const char *dname, const char *htime_dir, -                              int *fd, unsigned long *total, -                              unsigned long *min_ts, unsigned long *max_ts) +gf_changelog_extract_min_max(const char *dname, const char *htime_dir, int *fd, +                             unsigned long *total, unsigned long *min_ts, +                             unsigned long *max_ts)  { -        int          ret          = -1; -        xlator_t    *this         = NULL; -        char htime_file[PATH_MAX] = {0,}; -        struct stat  stbuf        = {0,}; -        char        *iter         = NULL; -        char x_value[30]          = {0,}; - -        this = THIS; - -        snprintf (htime_file, PATH_MAX, "%s/%s", htime_dir, dname); - -        iter = (htime_file + strlen (htime_file) - TIMESTAMP_LENGTH); -        sscanf (iter ,"%lu",min_ts); - -        ret = sys_stat (htime_file, &stbuf); -        if (ret) { -                ret = -1; -                gf_smsg (this->name, GF_LOG_ERROR, errno, -                         CHANGELOG_LIB_MSG_HTIME_ERROR, -                         "stat() failed on htime file", -                         "path=%s", htime_file, -                         NULL); -                goto out; -        } - -        /* ignore everything except regular files */ -        if (!S_ISREG (stbuf.st_mode)) { -                ret = -2; -                goto out; -        } - -        *fd = open (htime_file, O_RDONLY); -        if (*fd < 0) { -                ret = -1; -                gf_smsg (this->name, GF_LOG_ERROR, errno, -                         CHANGELOG_LIB_MSG_HTIME_ERROR, -                         "open() failed for htime file", -                         "path=%s", htime_file, -                         NULL); -                goto out; -        } - -        /* Looks good, extract max timestamp */ -        ret = sys_fgetxattr (*fd, HTIME_KEY, x_value, sizeof (x_value)); -        if (ret < 0) { -                ret = -1; -                gf_smsg (this->name, GF_LOG_ERROR, errno, -                         CHANGELOG_LIB_MSG_GET_XATTR_FAILED, -                         "error extracting max timstamp from htime file" -                         "path=%s", htime_file, -                         NULL); -                goto out; -        } +    int ret = -1; +    xlator_t *this = NULL; +    char htime_file[PATH_MAX] = { +        0, +    }; +    struct stat stbuf = { +        0, +    }; +    char *iter = NULL; +    char x_value[30] = { +        0, +    }; + +    this = THIS; + +    snprintf(htime_file, PATH_MAX, "%s/%s", htime_dir, dname); + +    iter = (htime_file + strlen(htime_file) - TIMESTAMP_LENGTH); +    sscanf(iter, "%lu", min_ts); + +    ret = sys_stat(htime_file, &stbuf); +    if (ret) { +        ret = -1; +        gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_HTIME_ERROR, +                "stat() failed on htime file", "path=%s", htime_file, NULL); +        goto out; +    } + +    /* ignore everything except regular files */ +    if (!S_ISREG(stbuf.st_mode)) { +        ret = -2; +        goto out; +    } + +    *fd = open(htime_file, O_RDONLY); +    if (*fd < 0) { +        ret = -1; +        gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_HTIME_ERROR, +                "open() failed for htime file", "path=%s", htime_file, NULL); +        goto out; +    } + +    /* Looks good, extract max timestamp */ +    ret = sys_fgetxattr(*fd, HTIME_KEY, x_value, sizeof(x_value)); +    if (ret < 0) { +        ret = -1; +        gf_smsg(this->name, GF_LOG_ERROR, errno, +                CHANGELOG_LIB_MSG_GET_XATTR_FAILED, +                "error extracting max timstamp from htime file" +                "path=%s", +                htime_file, NULL); +        goto out; +    } + +    sscanf(x_value, "%lu:%lu", max_ts, total); +    gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_LIB_MSG_TOTAL_LOG_INFO, +            "changelogs min max", "min=%lu", *min_ts, "max=%lu", *max_ts, +            "total_changelogs=%lu", *total, NULL); + +    ret = 0; -        sscanf (x_value, "%lu:%lu", max_ts, total); -        gf_smsg (this->name, GF_LOG_INFO, 0, -                 CHANGELOG_LIB_MSG_TOTAL_LOG_INFO, -                 "changelogs min max", -                 "min=%lu", *min_ts, -                 "max=%lu", *max_ts, -                 "total_changelogs=%lu", *total, -                 NULL); - -        ret = 0; - - out: -        return ret; +out: +    return ret;  }  /* gf_history_changelog returns actual_end and spawns threads to @@ -790,275 +779,257 @@ gf_changelog_extract_min_max (const char *dname, const char *htime_dir,   *    -1 : On any error   */  int -gf_history_changelog (char* changelog_dir, unsigned long start, -                      unsigned long end, int n_parallel, -                      unsigned long *actual_end) +gf_history_changelog(char *changelog_dir, unsigned long start, +                     unsigned long end, int n_parallel, +                     unsigned long *actual_end)  { -        int                             ret                     = 0; -        int                             len                     = -1; -        int                             fd                      = -1; -        int                             n_read                  = -1; -        unsigned long                   min_ts                  = 0; -        unsigned long                   max_ts                  = 0; -        unsigned long                   end2                    = 0; -        unsigned long                   ts1                     = 0; -        unsigned long                   ts2                     = 0; -        unsigned long                   to                      = 0; -        unsigned long                   from                    = 0; -        unsigned long                   total_changelog         = 0; -        xlator_t                       *this                    = NULL; -        gf_changelog_journal_t         *jnl                     = NULL; -        gf_changelog_journal_t         *hist_jnl                = NULL; -        gf_changelog_history_data_t    *hist_data               = NULL; -        DIR                            *dirp                    = NULL; -        struct dirent                  *entry                   = NULL; -        struct dirent                   scratch[2]              = {{0,},}; -        pthread_t                       consume_th              = 0; -        char                            htime_dir[PATH_MAX]     = {0,}; -        char                            buffer[PATH_MAX]        = {0,}; -        gf_boolean_t                    partial_history         = _gf_false; - -        pthread_attr_t attr; - -        this = THIS; -        if (!this) { -                ret = -1; -                goto out; +    int ret = 0; +    int len = -1; +    int fd = -1; +    int n_read = -1; +    unsigned long min_ts = 0; +    unsigned long max_ts = 0; +    unsigned long end2 = 0; +    unsigned long ts1 = 0; +    unsigned long ts2 = 0; +    unsigned long to = 0; +    unsigned long from = 0; +    unsigned long total_changelog = 0; +    xlator_t *this = NULL; +    gf_changelog_journal_t *jnl = NULL; +    gf_changelog_journal_t *hist_jnl = NULL; +    gf_changelog_history_data_t *hist_data = NULL; +    DIR *dirp = NULL; +    struct dirent *entry = NULL; +    struct dirent scratch[2] = { +        { +            0, +        }, +    }; +    pthread_t consume_th = 0; +    char htime_dir[PATH_MAX] = { +        0, +    }; +    char buffer[PATH_MAX] = { +        0, +    }; +    gf_boolean_t partial_history = _gf_false; + +    pthread_attr_t attr; + +    this = THIS; +    if (!this) { +        ret = -1; +        goto out; +    } + +    ret = pthread_attr_init(&attr); +    if (ret != 0) { +        gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_PTHREAD_ERROR, +               "Pthread init failed"); +        return -1; +    } + +    jnl = (gf_changelog_journal_t *)GF_CHANGELOG_GET_API_PTR(this); +    if (!jnl) { +        ret = -1; +        goto out; +    } + +    hist_jnl = (gf_changelog_journal_t *)jnl->hist_jnl; +    if (!hist_jnl) { +        ret = -1; +        goto out; +    } + +    gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_LIB_MSG_TOTAL_LOG_INFO, +            "Requesting historical changelogs", "start=%lu", start, "end=%lu", +            end, NULL); + +    /* basic sanity check */ +    if (start > end || n_parallel <= 0) { +        gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_HIST_FAILED, +                "Sanity check failed", "start=%lu", start, "end=%lu", end, +                "thread_count=%d", n_parallel, NULL); +        ret = -1; +        goto out; +    } + +    /* cap parallelism count */ +    if (n_parallel > MAX_PARALLELS) +        n_parallel = MAX_PARALLELS; + +    CHANGELOG_FILL_HTIME_DIR(changelog_dir, htime_dir); + +    dirp = sys_opendir(htime_dir); +    if (dirp == NULL) { +        gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_LIB_MSG_HTIME_ERROR, +                "open dir on htime failed", "path=%s", htime_dir, NULL); +        ret = -1; +        goto out; +    } + +    for (;;) { +        errno = 0; + +        entry = sys_readdir(dirp, scratch); + +        if (!entry || errno != 0) { +            gf_smsg(this->name, GF_LOG_ERROR, errno, +                    CHANGELOG_LIB_MSG_HIST_FAILED, +                    "Requested changelog range is not availbale", "start=%lu", +                    start, "end=%lu", end, NULL); +            ret = -2; +            break;          } -        ret = pthread_attr_init (&attr); -        if (ret != 0) { -                gf_msg (this->name, GF_LOG_ERROR, errno, -                        CHANGELOG_LIB_MSG_PTHREAD_ERROR, -                        "Pthread init failed"); -                return -1; +        ret = gf_changelog_extract_min_max(entry->d_name, htime_dir, &fd, +                                           &total_changelog, &min_ts, &max_ts); +        if (ret) { +            if (-2 == ret) +                continue; +            goto out;          } -        jnl = (gf_changelog_journal_t *) GF_CHANGELOG_GET_API_PTR (this); -        if (!jnl) { +        if (start >= min_ts && start < max_ts) { +            /** +             * TODO: handle short reads later... +             */ +            n_read = sys_read(fd, buffer, PATH_MAX); +            if (n_read < 0) {                  ret = -1; +                gf_msg(this->name, GF_LOG_ERROR, errno, +                       CHANGELOG_LIB_MSG_READ_ERROR, +                       "unable to read htime file");                  goto out; -        } +            } + +            len = strlen(buffer); + +            /** +             * search @start in the htime file returning it's index +             * (@from) +             */ +            from = gf_history_b_search(fd, start, 0, total_changelog - 1, len); -        hist_jnl = (gf_changelog_journal_t *) jnl->hist_jnl; -        if (!hist_jnl) { +            /* ensuring correctness of gf_b_search */ +            if (gf_history_check(fd, from, start, len) != 0) {                  ret = -1; +                gf_smsg(this->name, GF_LOG_ERROR, 0, +                        CHANGELOG_LIB_MSG_GET_TIME_ERROR, +                        "wrong result for start", "start=%lu", start, "idx=%lu", +                        from, NULL);                  goto out; -        } - -        gf_smsg (this->name, GF_LOG_INFO, 0, -                 CHANGELOG_LIB_MSG_TOTAL_LOG_INFO, -                 "Requesting historical changelogs", -                 "start=%lu", start, "end=%lu", end, NULL); - -        /* basic sanity check */ -        if (start > end || n_parallel <= 0) { -                gf_smsg (this->name, GF_LOG_ERROR, errno, -                         CHANGELOG_LIB_MSG_HIST_FAILED, "Sanity check failed", -                         "start=%lu", start, -                         "end=%lu", end, -                         "thread_count=%d", n_parallel, -                         NULL); +            } + +            end2 = (end <= max_ts) ? end : max_ts; + +            /* Check if end falls out of same HTIME file. The end +             * falling to a different htime file or changelog +             * disable-enable is detected only after 20 seconds. +             * This is required because, applications generally +             * asks historical changelogs till current time and +             * it is possible changelog is not rolled over yet. +             * So, buffer time of default rollover time plus 5 +             * seconds is subtracted.  If the application requests +             * the end time with in half a minute of changelog +             * disable, it's not detected as changelog disable and +             * it's application's responsibility to retry after +             * 20 seconds before confirming it as partial history. +             */ +            if ((end - 20) > max_ts) { +                partial_history = _gf_true; +            } + +            /** +             * search @end2 in htime file returning it's index (@to) +             */ +            to = gf_history_b_search(fd, end2, 0, total_changelog - 1, len); + +            if (gf_history_check(fd, to, end2, len) != 0) {                  ret = -1; +                gf_smsg(this->name, GF_LOG_ERROR, 0, +                        CHANGELOG_LIB_MSG_GET_TIME_ERROR, +                        "wrong result for end", "start=%lu", end2, "idx=%lu", +                        to, NULL);                  goto out; -        } +            } -        /* cap parallelism count */ -        if (n_parallel > MAX_PARALLELS) -                n_parallel = MAX_PARALLELS; +            ret = gf_history_get_timestamp(fd, from, len, &ts1); +            if (ret == -1) +                goto out; -        CHANGELOG_FILL_HTIME_DIR (changelog_dir, htime_dir); +            ret = gf_history_get_timestamp(fd, to, len, &ts2); +            if (ret == -1) +                goto out; -        dirp = sys_opendir (htime_dir); -        if (dirp == NULL) { -                gf_smsg (this->name, GF_LOG_ERROR, errno, -                         CHANGELOG_LIB_MSG_HTIME_ERROR, -                         "open dir on htime failed", -                         "path=%s", htime_dir, -                         NULL); +            gf_smsg(this->name, GF_LOG_INFO, 0, +                    CHANGELOG_LIB_MSG_TOTAL_LOG_INFO, "FINAL", "from=%lu", ts1, +                    "to=%lu", ts2, "changes=%lu", (to - from + 1), NULL); + +            hist_data = GF_CALLOC(1, sizeof(gf_changelog_history_data_t), +                                  gf_changelog_mt_history_data_t); + +            hist_data->htime_fd = fd; +            hist_data->from = from; +            hist_data->to = to; +            hist_data->len = len; +            hist_data->n_parallel = n_parallel; +            hist_data->this = this; + +            ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); +            if (ret != 0) { +                gf_msg(this->name, GF_LOG_ERROR, ret, +                       CHANGELOG_LIB_MSG_PTHREAD_ERROR, +                       "unable to sets the detach" +                       " state attribute");                  ret = -1;                  goto out; -        } +            } + +            /* spawn a thread for background parsing & publishing */ +            ret = gf_thread_create(&consume_th, &attr, gf_history_consume, +                                   hist_data, "cloghcon"); +            if (ret) { +                gf_msg(this->name, GF_LOG_ERROR, ret, +                       CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, +                       "creation of consume parent-thread" +                       " failed."); +                ret = -1; +                goto out; +            } -        for (;;) { - -                errno = 0; - -                entry = sys_readdir (dirp, scratch); - -                if (!entry || errno != 0) { -                        gf_smsg (this->name, GF_LOG_ERROR, errno, -                                 CHANGELOG_LIB_MSG_HIST_FAILED, -                                 "Requested changelog range is not availbale", -                                 "start=%lu", start, "end=%lu", end, NULL); -                        ret = -2; -                        break; -                } - -                ret = gf_changelog_extract_min_max (entry->d_name, htime_dir, -                                                    &fd, &total_changelog, -                                                    &min_ts, &max_ts); -                if (ret) { -                        if (-2 == ret) -                                continue; -                        goto out; -                } - -                if (start >= min_ts && start < max_ts) { -                        /** -                         * TODO: handle short reads later... -                         */ -                        n_read = sys_read (fd, buffer, PATH_MAX); -                        if (n_read < 0) { -                                ret = -1; -                                gf_msg (this->name, GF_LOG_ERROR, errno, -                                        CHANGELOG_LIB_MSG_READ_ERROR, -                                        "unable to read htime file"); -                                goto out; -                        } - -                        len = strlen (buffer); - -                        /** -                         * search @start in the htime file returning it's index -                         * (@from) -                         */ -                        from = gf_history_b_search (fd, start, 0, -                                                   total_changelog - 1, len); - -                        /* ensuring correctness of gf_b_search */ -                        if (gf_history_check (fd, from, start, len) != 0) { -                                ret = -1; -                                gf_smsg (this->name, GF_LOG_ERROR, 0, -                                         CHANGELOG_LIB_MSG_GET_TIME_ERROR, -                                         "wrong result for start", -                                         "start=%lu", start, -                                         "idx=%lu", from, -                                         NULL); -                                goto out; -                        } - -                        end2 = (end <= max_ts) ? end : max_ts; - -                        /* Check if end falls out of same HTIME file. The end -                         * falling to a different htime file or changelog -                         * disable-enable is detected only after 20 seconds. -                         * This is required because, applications generally -                         * asks historical changelogs till current time and -                         * it is possible changelog is not rolled over yet. -                         * So, buffer time of default rollover time plus 5 -                         * seconds is subtracted.  If the application requests -                         * the end time with in half a minute of changelog -                         * disable, it's not detected as changelog disable and -                         * it's application's responsibility to retry after -                         * 20 seconds before confirming it as partial history. -                         */ -                        if ((end - 20) > max_ts) { -                                partial_history = _gf_true; -                        } - -                        /** -                         * search @end2 in htime file returning it's index (@to) -                         */ -                        to = gf_history_b_search (fd, end2, -                                                  0, total_changelog - 1, len); - -                        if (gf_history_check (fd, to, end2, len) != 0) { -                                ret = -1; -                                gf_smsg (this->name, GF_LOG_ERROR, 0, -                                         CHANGELOG_LIB_MSG_GET_TIME_ERROR, -                                         "wrong result for end", -                                         "start=%lu", end2, -                                         "idx=%lu", to, -                                         NULL); -                                goto out; -                        } - -                        ret = gf_history_get_timestamp (fd, from, len, &ts1); -                        if (ret == -1) -                                goto out; - -                        ret = gf_history_get_timestamp (fd, to, len, &ts2); -                        if (ret == -1) -                                goto out; - -                        gf_smsg (this->name, GF_LOG_INFO, 0, -                                 CHANGELOG_LIB_MSG_TOTAL_LOG_INFO, -                                 "FINAL", -                                 "from=%lu", ts1, -                                 "to=%lu", ts2, -                                 "changes=%lu", (to - from + 1), -                                 NULL); - -                        hist_data =  GF_CALLOC (1, -                                     sizeof (gf_changelog_history_data_t), -                                     gf_changelog_mt_history_data_t); - -                        hist_data->htime_fd   = fd; -                        hist_data->from       = from; -                        hist_data->to         = to; -                        hist_data->len        = len; -                        hist_data->n_parallel = n_parallel; -                        hist_data->this       = this; - -                        ret = pthread_attr_setdetachstate -                                (&attr, PTHREAD_CREATE_DETACHED); -                        if (ret != 0) { -                                gf_msg (this->name, GF_LOG_ERROR, ret, -                                        CHANGELOG_LIB_MSG_PTHREAD_ERROR, -                                        "unable to sets the detach" -                                        " state attribute"); -                                ret = -1; -                                goto out; -                        } - -                        /* spawn a thread for background parsing & publishing */ -                        ret = gf_thread_create (&consume_th, &attr, -                                                gf_history_consume, hist_data, -                                                "cloghcon"); -                        if (ret) { -                                gf_msg (this->name, GF_LOG_ERROR, ret, -                                        CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED -                                        , "creation of consume parent-thread" -                                        " failed."); -                                ret = -1; -                                goto out; -                        } - -                        goto out; - -                } else {/* end of range check */ -                        gf_smsg (this->name, GF_LOG_ERROR, errno, -                                 CHANGELOG_LIB_MSG_HIST_FAILED, -                                 "Requested changelog range is not " -                                 "available. Retrying next HTIME", -                                 "start=%lu", start, -                                 "end=%lu", end, -                                 "chlog_min=%lu", min_ts, -                                 "chlog_max=%lu", max_ts, -                                 NULL); -                } -        } /* end of readdir() */ +            goto out; + +        } else { /* end of range check */ +            gf_smsg(this->name, GF_LOG_ERROR, errno, +                    CHANGELOG_LIB_MSG_HIST_FAILED, +                    "Requested changelog range is not " +                    "available. Retrying next HTIME", +                    "start=%lu", start, "end=%lu", end, "chlog_min=%lu", min_ts, +                    "chlog_max=%lu", max_ts, NULL); +        } +    } /* end of readdir() */  out: -        if (dirp != NULL) -                (void) sys_closedir (dirp); +    if (dirp != NULL) +        (void)sys_closedir(dirp); -        if (ret < 0) { -                if (fd != -1) -                        (void) sys_close (fd); -                GF_FREE (hist_data); -                (void) pthread_attr_destroy (&attr); +    if (ret < 0) { +        if (fd != -1) +            (void)sys_close(fd); +        GF_FREE(hist_data); +        (void)pthread_attr_destroy(&attr); -                return ret; -        } +        return ret; +    } -        hist_jnl->hist_done = 1; -        *actual_end = ts2; +    hist_jnl->hist_done = 1; +    *actual_end = ts2; -        if (partial_history) { -                ret = 1; -        } +    if (partial_history) { +        ret = 1; +    } -        return ret; +    return ret;  }  | 
