diff options
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog.c')
| -rw-r--r-- | xlators/features/changelog/lib/src/gf-changelog.c | 514 | 
1 files changed, 514 insertions, 0 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c new file mode 100644 index 00000000000..4e60bb276a7 --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog.c @@ -0,0 +1,514 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#include <errno.h> +#include <dirent.h> +#include <stddef.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/un.h> + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include <string.h> + +#include "globals.h" +#include "glusterfs.h" +#include "logging.h" + +#include "gf-changelog-helpers.h" + +/* from the changelog translator */ +#include "changelog-misc.h" +#include "changelog-mem-types.h" + +int byebye = 0; + +static void +gf_changelog_cleanup (gf_changelog_t *gfc) +{ +        /* socket */ +        if (gfc->gfc_sockfd != -1) +                close (gfc->gfc_sockfd); +        /* tracker fd */ +        if (gfc->gfc_fd != -1) +                close (gfc->gfc_fd); +        /* processing dir */ +        if (gfc->gfc_dir) +                closedir (gfc->gfc_dir); + +        if (gfc->gfc_working_dir) +                free (gfc->gfc_working_dir); /* allocated by realpath */ +} + +void +__attribute__ ((constructor)) gf_changelog_ctor (void) +{ +        glusterfs_ctx_t *ctx = NULL; + +        ctx = glusterfs_ctx_new (); +        if (!ctx) +                return; + +        if (glusterfs_globals_init (ctx)) { +                free (ctx); +                ctx = NULL; +                return; +        } + +        THIS->ctx = ctx; +} + +void +__attribute__ ((destructor)) gf_changelog_dtor (void) +{ +        xlator_t        *this = NULL; +        glusterfs_ctx_t *ctx  = NULL; +        gf_changelog_t  *gfc  = NULL; + +        this = THIS; +        if (!this) +                return; + +        ctx = this->ctx; +        gfc = this->private; + +        if (gfc) { +                gf_changelog_cleanup (gfc); +                GF_FREE (gfc); +        } + +        if (ctx) { +                pthread_mutex_destroy (&ctx->lock); +                free (ctx); +                ctx = NULL; +        } +} + + +static int +gf_changelog_open_dirs (gf_changelog_t *gfc) +{ +        int  ret                    = -1; +        DIR *dir                    = NULL; +        int  tracker_fd             = 0; +        char tracker_path[PATH_MAX] = {0,}; + +        (void) snprintf (gfc->gfc_current_dir, PATH_MAX, +                         "%s/"GF_CHANGELOG_CURRENT_DIR"/", +                         gfc->gfc_working_dir); +        ret = mkdir_p (gfc->gfc_current_dir, 0600, _gf_false); +        if (ret) +                goto out; + +        (void) snprintf (gfc->gfc_processed_dir, PATH_MAX, +                         "%s/"GF_CHANGELOG_PROCESSED_DIR"/", +                         gfc->gfc_working_dir); +        ret = mkdir_p (gfc->gfc_processed_dir, 0600, _gf_false); +        if (ret) +                goto out; + +        (void) snprintf (gfc->gfc_processing_dir, PATH_MAX, +                         "%s/"GF_CHANGELOG_PROCESSING_DIR"/", +                         gfc->gfc_working_dir); +        ret = mkdir_p (gfc->gfc_processing_dir, 0600, _gf_false); +        if (ret) +                goto out; + +        dir = opendir (gfc->gfc_processing_dir); +        if (!dir) { +                gf_log ("", GF_LOG_ERROR, +                        "opendir() error [reason: %s]", strerror (errno)); +                goto out; +        } + +        gfc->gfc_dir = dir; + +        (void) snprintf (tracker_path, PATH_MAX, +                         "%s/"GF_CHANGELOG_TRACKER, gfc->gfc_working_dir); + +        tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR, +                           S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); +        if (tracker_fd < 0) { +                closedir (gfc->gfc_dir); +                ret = -1; +                goto out; +        } + +        gfc->gfc_fd = tracker_fd; +        ret = 0; + out: +        return ret; +} + +int +gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc) +{ +        int                ret    = 0; +        int                len    = 0; +        int                tries  = 0; +        int                sockfd = 0; +        struct sockaddr_un remote; + +        this = gfc->this; + +        if (gfc->gfc_sockfd != -1) { +                gf_log (this->name, GF_LOG_INFO, +                        "Reconnecting..."); +                close (gfc->gfc_sockfd); +        } + +        sockfd = socket (AF_UNIX, SOCK_STREAM, 0); +        if (sockfd < 0) { +                ret = -1; +                goto out; +        } + +        CHANGELOG_MAKE_SOCKET_PATH (gfc->gfc_brickpath, +                                    gfc->gfc_sockpath, PATH_MAX); +        gf_log (this->name, GF_LOG_INFO, +                "connecting to changelog socket: %s (brick: %s)", +                gfc->gfc_sockpath, gfc->gfc_brickpath); + +        remote.sun_family = AF_UNIX; +        strcpy (remote.sun_path, gfc->gfc_sockpath); + +        len = strlen (remote.sun_path) + sizeof (remote.sun_family); + +        while (tries < gfc->gfc_connretries) { +                gf_log (this->name, GF_LOG_WARNING, +                        "connection attempt %d/%d...", +                        tries + 1, gfc->gfc_connretries); + +                /* initiate a connect */ +                if (connect (sockfd, (struct sockaddr *) &remote, len) == 0) { +                        gfc->gfc_sockfd = sockfd; +                        break; +                } + +                tries++; +                sleep (2); +        } + +        if (tries == gfc->gfc_connretries) { +                gf_log (this->name, GF_LOG_ERROR, +                        "could not connect to changelog socket!" +                        " bailing out..."); +                ret = -1; +        } else +                gf_log (this->name, GF_LOG_INFO, +                        "connection successful"); + + out: +        return ret; +} + +int +gf_changelog_done (char *file) +{ +        int             ret    = -1; +        char           *buffer = NULL; +        xlator_t       *this   = NULL; +        gf_changelog_t *gfc    = NULL; +        char to_path[PATH_MAX] = {0,}; + +        errno = EINVAL; + +        this = THIS; +        if (!this) +                goto out; + +        gfc = (gf_changelog_t *) this->private; +        if (!gfc) +                goto out; + +        if (!file || !strlen (file)) +                goto out; + +        /* make sure 'file' is inside ->gfc_working_dir */ +        buffer = realpath (file, NULL); +        if (!buffer) +                goto out; + +        if (strncmp (gfc->gfc_working_dir, +                     buffer, strlen (gfc->gfc_working_dir))) +                goto out; + +        (void) snprintf (to_path, PATH_MAX, "%s%s", +                         gfc->gfc_processed_dir, basename (buffer)); +        gf_log (this->name, GF_LOG_DEBUG, +                "moving %s to processed directory", file); +        ret = rename (buffer, to_path); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "cannot move %s to %s (reason: %s)", +                        file, to_path, strerror (errno)); +                goto out; +        } + +        ret = 0; + + out: +        if (buffer) +                free (buffer); /* allocated by realpath() */ +        return ret; +} + +/** + * @API + *  for a set of changelogs, start from the begining + */ +int +gf_changelog_start_fresh () +{ +        xlator_t *this = NULL; +        gf_changelog_t *gfc = NULL; + +        this = THIS; +        if (!this) +                goto out; + +        errno = EINVAL; + +        gfc = (gf_changelog_t *) this->private; +        if (!gfc) +                goto out; + +        if (gf_ftruncate (gfc->gfc_fd, 0)) +                goto out; + +        return 0; + + out: +        return -1; +} + +/** + * @API + * return the next changelog file entry. zero means all chanelogs + * consumed. + */ +ssize_t +gf_changelog_next_change (char *bufptr, size_t maxlen) +{ +        ssize_t         size       = 0; +        int             tracker_fd = 0; +        xlator_t       *this       = NULL; +        gf_changelog_t *gfc        = NULL; +        char buffer[PATH_MAX]      = {0,}; + +        errno = EINVAL; + +        this = THIS; +        if (!this) +                goto out; + +        gfc = (gf_changelog_t *) this->private; +        if (!gfc) +                goto out; + +        tracker_fd = gfc->gfc_fd; + +        size = gf_readline (tracker_fd, buffer, maxlen); +        if (size < 0) +                goto out; +        if (size == 0) +                return 0; + +        memcpy (bufptr, buffer, size - 1); +        *(buffer + size) = '\0'; + +        return size; + + out: +        return -1; +} + +/** + * @API + *  gf_changelog_scan() - scan and generate a list of change entries + * + * calling this api multiple times (without calling gf_changlog_done()) + * would result new changelogs(s) being refreshed in the tracker file. + * This call also acts as a cancellation point for the consumer. + */ +ssize_t +gf_changelog_scan () +{ +        int             ret        = 0; +        int             tracker_fd = 0; +        size_t          len        = 0; +        size_t          off        = 0; +        xlator_t       *this       = NULL; +        size_t          nr_entries = 0; +        gf_changelog_t *gfc        = NULL; +        struct dirent  *entryp     = NULL; +        struct dirent  *result     = NULL; +        char buffer[PATH_MAX]      = {0,}; + +        this = THIS; +        if (!this) +                goto out; + +        gfc = (gf_changelog_t *) this->private; +        if (!gfc) +                goto out; + +        /** +         * do we need to protect 'byebye' with locks? worst, the +         * consumer would get notified during next scan(). +         */ +        if (byebye) { +                errno = ECONNREFUSED; +                goto out; +        } + +        errno = EINVAL; + +        tracker_fd = gfc->gfc_fd; + +        if (gf_ftruncate (tracker_fd, 0)) +                goto out; + +        len = offsetof(struct dirent, d_name) +                + pathconf(gfc->gfc_processing_dir, _PC_NAME_MAX) + 1; +        entryp = GF_CALLOC (1, len, +                            gf_changelog_mt_libgfchangelog_dirent_t); +        if (!entryp) +                goto out; + +        rewinddir (gfc->gfc_dir); +        while (1) { +                ret = readdir_r (gfc->gfc_dir, entryp, &result); +                if (ret || !result) +                        break; + +                if ( !strcmp (basename (entryp->d_name), ".") +                     || !strcmp (basename (entryp->d_name), "..") ) +                        continue; + +                nr_entries++; + +                GF_CHANGELOG_FILL_BUFFER (gfc->gfc_processing_dir, +                                          buffer, off, +                                          strlen (gfc->gfc_processing_dir)); +                GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer, +                                          off, strlen (entryp->d_name)); +                GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1); + +                if (gf_changelog_write (tracker_fd, buffer, off) != off) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "error writing changelog filename" +                                " to tracker file"); +                        break; +                } +                off = 0; +        } + +        GF_FREE (entryp); + +        if (!result) { +                if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) +                        return nr_entries; +        } + out: +        return -1; +} + +/** + * @API + *  gf_changelog_register() - register a client for updates. + */ +int +gf_changelog_register (char *brick_path, char *scratch_dir, +                       char *log_file, int log_level, int max_reconnects) +{ +        int             i    = 0; +        int             ret  = -1; +        int             errn = 0; +        xlator_t       *this = NULL; +        gf_changelog_t *gfc  = NULL; + +        this = THIS; +        if (!this->ctx) +                goto out; + +        errno = ENOMEM; + +        gfc = GF_CALLOC (1, sizeof (*gfc), +                         gf_changelog_mt_libgfchangelog_t); +        if (!gfc) +                goto out; + +        gfc->this = this; + +        gfc->gfc_dir = NULL; +        gfc->gfc_fd = gfc->gfc_sockfd = -1; + +        gfc->gfc_working_dir = realpath (scratch_dir, NULL); +        if (!gfc->gfc_working_dir) { +                errn = errno; +                goto cleanup; +        } + +        ret = gf_changelog_open_dirs (gfc); +        if (ret) { +                errn = errno; +                gf_log (this->name, GF_LOG_ERROR, +                        "could not create entries in scratch dir"); +                goto cleanup; +        } + +        if (gf_log_init (this->ctx, log_file)) +                goto cleanup; + +        gf_log_set_loglevel ((log_level == -1) ? GF_LOG_INFO : +                             log_level); + +        gfc->gfc_connretries = (max_reconnects <= 0) ? 1 : max_reconnects; +        (void) strncpy (gfc->gfc_brickpath, brick_path, PATH_MAX); + +        ret = gf_changelog_notification_init (this, gfc); +        if (ret) { +                errn = errno; +                goto cleanup; +        } + +        ret = pthread_create (&gfc->gfc_changelog_processor, +                              NULL, gf_changelog_process, gfc); +        if (ret) { +                errn = errno; +                gf_log (this->name, GF_LOG_ERROR, +                        "error creating changelog processor thread" +                        " new changes won't be recorded!!!"); +                goto cleanup; +        } + +        for (; i < 256; i++) { +                gfc->rfc3986[i] = +                        (isalnum(i) || i == '~' || +                        i == '-' || i == '.' || i == '_') ? i : 0; +        } + +        ret = 0; +        this->private = gfc; + +        goto out; + + cleanup: +        gf_changelog_cleanup (gfc); +        GF_FREE (gfc); +        this->private = NULL; +        errno = errn; + + out: +        return ret; +}  | 
