diff options
Diffstat (limited to 'xlators/features/changelog/src/changelog-helpers.c')
| -rw-r--r-- | xlators/features/changelog/src/changelog-helpers.c | 691 | 
1 files changed, 691 insertions, 0 deletions
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c new file mode 100644 index 00000000000..c1bb6e5fef9 --- /dev/null +++ b/xlators/features/changelog/src/changelog-helpers.c @@ -0,0 +1,691 @@ +/* +   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "xlator.h" +#include "defaults.h" +#include "logging.h" +#include "iobuf.h" + +#include "changelog-helpers.h" +#include "changelog-mem-types.h" + +#include <pthread.h> + +void +changelog_thread_cleanup (xlator_t *this, pthread_t thr_id) +{ +        int   ret    = 0; +        void *retval = NULL; + +        /* send a cancel request to the thread */ +        ret = pthread_cancel (thr_id); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "could not cancel thread (reason: %s)", +                        strerror (errno)); +                goto out; +        } + +        ret = pthread_join (thr_id, &retval); +        if (ret || (retval != PTHREAD_CANCELED)) { +                gf_log (this->name, GF_LOG_ERROR, +                        "cancel request not adhered as expected" +                        " (reason: %s)", strerror (errno)); +        } + + out: +        return; +} + +inline void * +changelog_get_usable_buffer (changelog_local_t *local) +{ +        changelog_log_data_t *cld = NULL; + +        cld = &local->cld; +        if (!cld->cld_iobuf) +                return NULL; + +        return cld->cld_iobuf->ptr; +} + +inline void +changelog_set_usable_record_and_length (changelog_local_t *local, +                                        size_t len, int xr) +{ +        changelog_log_data_t *cld = NULL; + +        cld = &local->cld; + +        cld->cld_ptr_len = len; +        cld->cld_xtra_records = xr; +} + +void +changelog_local_cleanup (xlator_t *xl, changelog_local_t *local) +{ +        int                   i   = 0; +        changelog_opt_t      *co  = NULL; +        changelog_log_data_t *cld = NULL; + +        if (!local) +                return; + +        cld = &local->cld; + +        /* cleanup dynamic allocation for extra records */ +        if (cld->cld_xtra_records) { +                co = (changelog_opt_t *) cld->cld_ptr; +                for (; i < cld->cld_xtra_records; i++, co++) +                        if (co->co_free) +                                co->co_free (co); +        } + +        CHANGELOG_IOBUF_UNREF (cld->cld_iobuf); + +        if (local->inode) +                inode_unref (local->inode); + +        mem_put (local); +} + +inline int +changelog_write (int fd, char *buffer, size_t len) +{ +        ssize_t size = 0; +        size_t writen = 0; + +        while (writen < len) { +                size = write (fd, +                              buffer + writen, len - writen); +                if (size <= 0) +                        break; + +                writen += size; +        } + +        return (writen != len); +} + +static int +changelog_rollover_changelog (xlator_t *this, +                              changelog_priv_t *priv, unsigned long ts) +{ +        int   ret            = -1; +        int   notify         = 0; +        char *bname          = NULL; +        char ofile[PATH_MAX] = {0,}; +        char nfile[PATH_MAX] = {0,}; + +        if (priv->changelog_fd != -1) { +                close (priv->changelog_fd); +                priv->changelog_fd = -1; +        } + +        (void) snprintf (ofile, PATH_MAX, +                         "%s/"CHANGELOG_FILE_NAME, priv->changelog_dir); +        (void) snprintf (nfile, PATH_MAX, +                         "%s/"CHANGELOG_FILE_NAME".%lu", +                         priv->changelog_dir, ts); + +        ret = rename (ofile, nfile); +        if (!ret) +                notify = 1; + +        if (ret && (errno == ENOENT)) { +                ret = 0; +        } + +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "error renaming %s -> %s (reason %s)", +                        ofile, nfile, strerror (errno)); +        } + +        if (notify) { +                bname = basename (nfile); +                gf_log (this->name, GF_LOG_DEBUG, "notifying: %s", bname); +                ret = changelog_write (priv->wfd, bname, strlen (bname) + 1); +                if (ret) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "Failed to send file name to notify thread" +                                " (reason: %s)", strerror (errno)); +                } +        } + +        return ret; +} + +int +changelog_open (xlator_t *this, +                changelog_priv_t *priv) +{ +        int fd                        = 0; +        int ret                       = -1; +        int flags                     = 0; +        char buffer[1024]             = {0,}; +        char changelog_path[PATH_MAX] = {0,}; + +        (void) snprintf (changelog_path, PATH_MAX, +                         "%s/"CHANGELOG_FILE_NAME, +                         priv->changelog_dir); + +        flags |= (O_CREAT | O_RDWR); +        if (priv->fsync_interval == 0) +                flags |= O_SYNC; + +        fd = open (changelog_path, flags, +                   S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); +        if (fd < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "unable to open/create changelog file %s" +                        " (reason: %s). change-logging will be" +                        " inactive", changelog_path, strerror (errno)); +                goto out; +        } + +        priv->changelog_fd = fd; + +        (void) snprintf (buffer, 1024, CHANGELOG_HEADER, +                         CHANGELOG_VERSION_MAJOR, +                         CHANGELOG_VERSION_MINOR, +                         priv->encode_mode); +        ret = changelog_write_change (priv, buffer, strlen (buffer)); +        if (ret) { +                close (priv->changelog_fd); +                priv->changelog_fd = -1; +                goto out; +        } + +        ret = 0; + + out: +        return ret; +} + +int +changelog_start_next_change (xlator_t *this, +                             changelog_priv_t *priv, +                             unsigned long ts, gf_boolean_t finale) +{ +        int ret = -1; + +        ret = changelog_rollover_changelog (this, priv, ts); + +        if (!ret && !finale) +                ret = changelog_open (this, priv); + +        return ret; +} + +/** + * return the length of entry + */ +inline size_t +changelog_entry_length () +{ +        return sizeof (changelog_log_data_t); +} + +int +changelog_fill_rollover_data (changelog_log_data_t *cld, gf_boolean_t is_last) +{ +        struct timeval tv = {0,}; + +        cld->cld_type = CHANGELOG_TYPE_ROLLOVER; + +        if (gettimeofday (&tv, NULL)) +                return -1; + +        cld->cld_roll_time = (unsigned long) tv.tv_sec; +        cld->cld_finale = is_last; +        return 0; +} + +int +changelog_write_change (changelog_priv_t *priv, char *buffer, size_t len) +{ +        return changelog_write (priv->changelog_fd, buffer, len); +} + +inline int +changelog_handle_change (xlator_t *this, +                         changelog_priv_t *priv, changelog_log_data_t *cld) +{ +        int ret = 0; + +        if (CHANGELOG_TYPE_IS_ROLLOVER (cld->cld_type)) { +                ret = changelog_start_next_change (this, priv, +                                                   cld->cld_roll_time, +                                                   cld->cld_finale); +                if (ret) +                        gf_log (this->name, GF_LOG_ERROR, +                                "Problem rolling over changelog(s)"); +                goto out; +        } + +        /** +         * case when there is reconfigure done (disabling changelog) and there +         * are still fops that have updates in prgress. +         */ +        if (priv->changelog_fd == -1) +                return 0; + +        if (CHANGELOG_TYPE_IS_FSYNC (cld->cld_type)) { +                ret = fsync (priv->changelog_fd); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "fsync failed (reason: %s)", +                                strerror (errno)); +                } +                goto out; +        } + +        ret = priv->ce->encode (this, cld); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, +                        "error writing changelog to disk"); +        } + + out: +        return ret; +} + +changelog_local_t * +changelog_local_init (xlator_t *this, inode_t *inode, +                      uuid_t gfid, int xtra_records, +                      gf_boolean_t update_flag) +{ +        changelog_local_t *local = NULL; +        struct iobuf      *iobuf = NULL; + +        /** +         * We relax the presence of inode if @update_flag is true. +         * The caller (implmentation of the fop) needs to be careful to +         * not blindly use local->inode. +         */ +        if (!update_flag && !inode) { +                gf_log_callingfn (this->name, GF_LOG_WARNING, +                                  "inode needed for version checking !!!"); +                goto out; +        } + +        if (xtra_records) { +                iobuf = iobuf_get2 (this->ctx->iobuf_pool, +                                    xtra_records * CHANGELOG_OPT_RECORD_LEN); +                if (!iobuf) +                        goto out; +        } + +        local = mem_get0 (this->local_pool); +        if (!local) { +                CHANGELOG_IOBUF_UNREF (iobuf); +                goto out; +        } + +        local->update_no_check = update_flag; + +        uuid_copy (local->cld.cld_gfid, gfid); + +        local->cld.cld_iobuf = iobuf; +        local->cld.cld_xtra_records = 0; /* set by the caller */ + +        if (inode) +                local->inode = inode_ref (inode); + + out: +        return local; +} + +int +changelog_forget (xlator_t *this, inode_t *inode) +{ +        uint64_t ctx_addr = 0; +        changelog_inode_ctx_t *ctx = NULL; + +        inode_ctx_del (inode, this, &ctx_addr); +        if (!ctx_addr) +                return 0; + +        ctx = (changelog_inode_ctx_t *) (long) ctx_addr; +        GF_FREE (ctx); + +        return 0; +} + +int +changelog_inject_single_event (xlator_t *this, +                               changelog_priv_t *priv, +                               changelog_log_data_t *cld) +{ +        return priv->cd.dispatchfn (this, priv, priv->cd.cd_data, cld, NULL); +} + +/** + * TODO: these threads have many thing in common (wake up after + * a certain time etc..). move them into separate routine. + */ +void * +changelog_rollover (void *data) +{ +        int                     ret   = 0; +        xlator_t               *this  = NULL; +        struct timeval          tv    = {0,}; +        changelog_log_data_t    cld   = {0,}; +        changelog_time_slice_t *slice = NULL; +        changelog_priv_t       *priv  = data; + +        this = priv->cr.this; +        slice = &priv->slice; + +        while (1) { +                tv.tv_sec  = priv->rollover_time; +                tv.tv_usec = 0; + +                ret = select (0, NULL, NULL, NULL, &tv); +                if (ret) +                        continue; + +                ret = changelog_fill_rollover_data (&cld, _gf_false); +                if (ret) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "failed to fill rollover data"); +                        continue; +                } + +                LOCK (&priv->lock); +                { +                        ret = changelog_inject_single_event (this, priv, &cld); +                        if (!ret) +                                SLICE_VERSION_UPDATE (slice); +                } +                UNLOCK (&priv->lock); +        } + +        return NULL; +} + +void * +changelog_fsync_thread (void *data) +{ +        int                   ret  = 0; +        xlator_t             *this = NULL; +        struct timeval        tv   = {0,}; +        changelog_log_data_t  cld  = {0,}; +        changelog_priv_t     *priv = data; + +        this = priv->cf.this; +        cld.cld_type = CHANGELOG_TYPE_FSYNC; + +        while (1) { +                tv.tv_sec  = priv->fsync_interval; +                tv.tv_usec = 0; + +                ret = select (0, NULL, NULL, NULL, &tv); +                if (ret) +                        continue; + +                ret = changelog_inject_single_event (this, priv, &cld); +                if (ret) +                        gf_log (this->name, GF_LOG_ERROR, +                                "failed to inject fsync event"); +        } + +        return NULL; +} + +/* macros for inode/changelog version checks */ + +#define INODE_VERSION_UPDATE(priv, inode, iver, slice, type) do {       \ +                LOCK (&inode->lock);                                    \ +                {                                                       \ +                        LOCK (&priv->lock);                             \ +                        {                                               \ +                                *iver = slice->changelog_version[type]; \ +                        }                                               \ +                        UNLOCK (&priv->lock);                           \ +                }                                                       \ +                UNLOCK (&inode->lock);                                  \ +        } while (0) + +#define INODE_VERSION_EQUALS_SLICE(priv, ver, slice, type, upd) do {    \ +                LOCK (&priv->lock);                                     \ +                {                                                       \ +                        upd = (ver == slice->changelog_version[type])   \ +                                ? _gf_false : _gf_true;                 \ +                }                                                       \ +                UNLOCK (&priv->lock);                                   \ +        } while (0) + +static int +__changelog_inode_ctx_set (xlator_t *this, +                           inode_t *inode, changelog_inode_ctx_t *ctx) +{ +        uint64_t ctx_addr = (uint64_t) ctx; +        return __inode_ctx_set (inode, this, &ctx_addr); +} + +/** + * one shot routine to get the address and the value of a inode version + * for a particular type. + */ +static changelog_inode_ctx_t * +__changelog_inode_ctx_get (xlator_t *this, +                           inode_t *inode, unsigned long **iver, +                           unsigned long *version, changelog_log_type type) +{ +        int                    ret      = 0; +        uint64_t               ctx_addr = 0; +        changelog_inode_ctx_t *ctx      = NULL; + +        ret = __inode_ctx_get (inode, this, &ctx_addr); +        if (ret < 0) +                ctx_addr = 0; +        if (ctx_addr != 0) { +                ctx = (changelog_inode_ctx_t *) (long)ctx_addr; +                goto out; +        } + +        ctx = GF_CALLOC (1, sizeof (*ctx), gf_changelog_mt_inode_ctx_t); +        if (!ctx) +                goto out; + +        ret = __changelog_inode_ctx_set (this, inode, ctx); +        if (ret) { +                GF_FREE (ctx); +                ctx = NULL; +        } + + out: +        if (ctx && iver && version) { +                *iver = CHANGELOG_INODE_VERSION_TYPE (ctx, type); +                *version = **iver; +        } + +        return ctx; +} + +static changelog_inode_ctx_t * +changelog_inode_ctx_get (xlator_t *this, +                         inode_t *inode, unsigned long **iver, +                         unsigned long *version, changelog_log_type type) +{ +        changelog_inode_ctx_t *ctx = NULL; + +        LOCK (&inode->lock); +        { +                ctx = __changelog_inode_ctx_get (this, +                                                 inode, iver, version, type); +        } +        UNLOCK (&inode->lock); + +        return ctx; +} + +/** + * This is the main update routine. Locking has been made granular so as to + * maximize parallelism of fops - I'll try to explain it below using execution + * timelines. + * + * Basically, the contention is between multiple execution threads of this + * routine and the roll-over thread. So, instead of having a big lock, we hold + * granular locks: inode->lock and priv->lock. Now I'll explain what happens + * when there is an update and a roll-over at just about the same time. + * NOTE: + *  - the dispatcher itself synchronizes updates via it's own lock + *  - the slice version in incremented by the roll-over thread + * + * Case 1: When the rollover thread wins before the inode version can be + * compared with the slice version. + * + *          [updater]                 |             [rollover] + *                                    | + *                                    |           <SLICE: 1, 1, 1> + * <changelog_update>                 | + *   <changelog_inode_ctx_get>        | + *      <CTX: 1, 1, 1>                | + *                                    |         <dispatch-rollover-event> + *                                    |         LOCK (&priv->lock) + *                                    |            <SLICE_VERSION_UPDATE> + *                                    |              <SLICE: 2, 2, 2> + *                                    |         UNLOCK (&priv->lock) + *                                    | + * LOCK (&priv->lock)                 | + *   <INODE_VERSION_EQUALS_SLICE>     | + *    I: 1 <-> S: 2                   | + *    update: true                    | + * UNLOCK (&priv->lock)               | + *                                    | + * <if update == true>                | + *  <dispath-update-event>            | + *  <INODE_VERSION_UPDATE>            | + *   LOCK (&inode->lock)              | + *    LOCK (&priv->lock)              | + *     <CTX: 2, 1, 1>                 | + *    UNLOCK (&priv->lock)            | + *   UNLOCK (&inode->lock)            | + * + * Therefore, the change gets recorded in the next change (no lost change). If + * the slice version was ahead of the inode version (say I:1, S: 2), then + * anyway the comparison would result in a update (I: 1, S: 3). + * + * If the rollover time is too less, then there is another contention when the + * updater tries to bring up inode version to the slice version (this is also + * the case when the roll-over thread wakes up during INODE_VERSION_UPDATE. + * + *   <CTX: 1, 1, 1>                   |       <SLICE: 2, 2, 2> + *                                    | + *                                    | + * <dispath-update-event>             | + * <INODE_VERSION_UPDATE>             | + *  LOCK (&inode->lock)               | + *   LOCK (&priv->lock)               | + *    <CTX: 2, 1, 1>                  | + *   UNLOCK (&priv->lock)             | + *  UNLOCK (&inode->lock)             | + *                                    |         <dispatch-rollover-event> + *                                    |         LOCK (&priv->lock) + *                                    |            <SLICE_VERSION_UPDATE> + *                                    |              <SLICE: 3, 3, 3> + *                                    |         UNLOCK (&priv->lock) + * + * + * Case 2: When the fop thread wins + * + *          [updater]                 |             [rollover] + *                                    | + *                                    |           <SLICE: 1, 1, 1> + * <changelog_update>                 | + *   <changelog_inode_ctx_get>        | + *      <CTX: 0, 0, 0>                | + *                                    | + * LOCK (&priv->lock)                 | + *   <INODE_VERSION_EQUALS_SLICE>     | + *    I: 0 <-> S: 1                   | + *    update: true                    | + * UNLOCK (&priv->lock)               | + *                                    |         <dispatch-rollover-event> + *                                    |         LOCK (&priv->lock) + *                                    |            <SLICE_VERSION_UPDATE> + *                                    |              <SLICE: 2, 2, 2> + *                                    |         UNLOCK (&priv->lock) + * <if update == true>                | + *  <dispath-update-event>            | + *  <INODE_VERSION_UPDATE>            | + *   LOCK (&inode->lock)              | + *    LOCK (&priv->lock)              | + *     <CTX: 2, 0, 0>                 | + *    UNLOCK (&priv->lock)            | + *   UNLOCK (&inode->lock)            | + * + * Here again, if the inode version was equal to the slice version (I: 1, S: 1) + * then there is no need to record an update (as the equality of the two version + * signifies an update was recorded in the current time slice). + */ +inline void +changelog_update (xlator_t *this, changelog_priv_t *priv, +                  changelog_local_t *local, changelog_log_type type) +{ +        int                     ret        = 0; +        unsigned long          *iver       = NULL; +        unsigned long           version    = 0; +        inode_t                *inode      = NULL; +        changelog_time_slice_t *slice      = NULL; +        changelog_inode_ctx_t  *ctx        = NULL; +        changelog_log_data_t   *cld_0      = NULL; +        changelog_log_data_t   *cld_1      = NULL; +        changelog_local_t      *next_local = NULL; +        gf_boolean_t            need_upd   = _gf_true; + +        slice = &priv->slice; + +        /** +         * for fops that do not require inode version checking +         */ +        if (local->update_no_check) +                goto update; + +        inode = local->inode; + +        ctx = changelog_inode_ctx_get (this, +                                       inode, &iver, &version, type); +        if (!ctx) +                goto update; + +        INODE_VERSION_EQUALS_SLICE (priv, version, slice, type, need_upd); + + update: +        if (need_upd) { +                cld_0 = &local->cld; +                cld_0->cld_type = type; + +                if ( (next_local = local->prev_entry) != NULL ) { +                        cld_1 = &next_local->cld; +                        cld_1->cld_type = type; +                } + +                ret = priv->cd.dispatchfn (this, priv, +                                           priv->cd.cd_data, cld_0, cld_1); + +                /** +                 * update after the dispatcher has successfully done +                 * it's job. +                 */ +                if (!local->update_no_check && iver && !ret) +                        INODE_VERSION_UPDATE (priv, inode, iver, slice, type); +        } + +        return; +}  | 
