/* Copyright (c) 2013 Red Hat, Inc. This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser General Public License, version 3 or any later version (LGPLv3 or later), or the GNU General Public License, version 2 (GPLv2), in all cases as published by the Free Software Foundation. */ #include "changelog-policy.h" #include "changelog-encoders.h" #include "changelog-fops.h" #define JOURNAL_NAME "TERM" #define JOURNAL_SECTOR_SIZE 128 #define PRE_OP_MARK 0x5F4552505FULL /* _PRE_ */ #define POST_OP_MARK 0x5F54534F505FULL /* _POST_ */ /* similar to fop_fn, but... */ size_t int32_fn (void *data, char *buffer, gf_boolean_t encode) { size_t bufsz = 0; int nr = 0; char buf[20] = {0,}; nr = *(int *) data; if (encode) { (void) snprintf (buf, sizeof (buf), "%d", nr); CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf)); } else CHANGELOG_FILL_BUFFER (buffer, bufsz, &nr, sizeof (int)); return bufsz; } size_t uint32_fn (void *data, char *buffer, gf_boolean_t encode) { size_t bufsz = 0; unsigned int nr = 0; char buf[20] = {0,}; nr = *(unsigned int *) data; if (encode) { (void) snprintf (buf, sizeof (buf), "%u", nr); CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf)); } else CHANGELOG_FILL_BUFFER (buffer, bufsz, &nr, sizeof (unsigned int)); return bufsz; } size_t number_fn (void *data, char *buffer, gf_boolean_t encode) { char buf[1024] = {0,}; size_t bufsz = 0; unsigned long long nr = 0; nr = *(unsigned long long *) data; if (encode) { (void) snprintf (buf, sizeof (buf), "%llu", nr); CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf)); } else CHANGELOG_FILL_BUFFER (buffer, bufsz, &nr, sizeof (unsigned long long)); return bufsz; } size_t uuid_fn (void *data, char *buffer, gf_boolean_t encode) { char buf[1024] = {0,}; uuid_t uuid = {0,}; size_t bufsz = 0; memcpy (uuid, (uuid_t *) data, sizeof (uuid_t)); if (encode) { char *tmpbuf = uuid_utoa (uuid); (void) snprintf (buf, sizeof (buf), "%s", tmpbuf); CHANGELOG_FILL_BUFFER (buffer, bufsz, buf, strlen (buf)); } else CHANGELOG_FILL_BUFFER (buffer, bufsz, uuid, sizeof (uuid_t)); return bufsz; } #define CHANGELOG_FILL_USIGNLL(co, number, converter, xlen) do { \ co->co_convert = converter; \ co->co_free = NULL; \ co->co_type = CHANGELOG_OPT_REC_ULL; \ co->co_number = (unsigned long long) number; \ xlen += sizeof (unsigned long long); \ if (!co->co_convert) \ co->co_len = sizeof (unsigned long long); \ } while (0) #define CHANGELOG_FILL_UUID(co, uuid, converter, xlen) do { \ co->co_convert = converter; \ co->co_free = NULL; \ co->co_type = CHANGELOG_OPT_REC_UUID; \ uuid_copy (co->co_uuid, uuid); \ xlen += sizeof (uuid_t); \ } while (0) /* TBD: move declarations here and nsr.c into a common place */ #define NSR_TERM_XATTR "trusted.nsr.term" #define NSR_INDEX_XATTR "trusted.nsr.index" #define RECON_TERM_XATTR "trusted.nsr.recon-term" #define RECON_INDEX_XATTR "trusted.nsr.recon-index" static gf_boolean_t changelog_fix_term(xlator_t *this, changelog_local_t *local, dict_t *xdata) { int32_t old_term, new_term; uint32_t index; changelog_priv_t *priv = this->private; int ret = 0; char nfile[PATH_MAX] = {0,}; int32_t recon_term, recon_index; changelog_rollover_data_t crd; // If coming via the regular IO path, we should get the dict "nsr-term" // If coming via reconciliation, we should get the dicts "nsr-recon-term" // that indicates the term and "nsr-recon-index" for the index if ((dict_get_int32(xdata,NSR_TERM_XATTR,&new_term) == 0) && (dict_get_uint32(xdata, NSR_INDEX_XATTR, &index) == 0)) { old_term = priv->term; if (old_term != new_term) { GF_ASSERT(new_term > old_term); LOCK (&priv->lock); priv->term = new_term; (void) snprintf (nfile, PATH_MAX, "%s.%d", JOURNAL_NAME, priv->term); ret = CHANGELOG_INVOKE_CFOP(this, priv, rollover, nfile, _gf_false); UNLOCK (&priv->lock); if (ret != 0) return _gf_false; } local->nr_bytes = 0; local->lu.val = index; } else if ((dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { old_term = priv->term; if (old_term != recon_term) { LOCK (&priv->lock); { priv->term = recon_term; (void) snprintf (crd.crd_changelog_name, PATH_MAX, "%s.%d", JOURNAL_NAME, priv->term); crd.crd_prealloc_size = 1<<29; ret = changelog_open(this, priv, local, &crd); } UNLOCK (&priv->lock); if (ret != 0) return _gf_false; } local->nr_bytes = 0; local->lu.val = recon_index; } else { return _gf_false; } return _gf_true; } /** * Replication policy records journal entries in the FOP path. This is * quite different that the default policy (used by geo-replication), * which journals records in the callback path on a successfull posix * operation. Additionally, each record starts with a PRE-OP marker and * the index number generated by the leader. * (c.f. nsr_$NAME$() ~/xlator/cluster/nsr/nsr-server/src/all-templates.c) * * POST-OPs are marked asynchronously and not during in the callback path * Marking it in the callback path is incorrect as the actual FOP may not * have been synchronized to the disk. Therefore, POST op marking is done * after a successful file system sync, which is trigerred periodically * by NSR server component. To keep journal updates strictly sequential, * POST-OPs are separate record in the journal. */ /** * Override File Operations * * NOTE: Since journal updates are done in the FOP path, there is no * actual use of @local in cbk. Therefore, @local could have been * declared statically for each FOP (which would remove the overhead * of allocating (here) and deallocating (in cbk)). Let's not do that * and keep it this way for now. Will worry about it later (for code * reasonability and performance). */ int32_t changelog_replication_rmdir (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags, dict_t *xdata) { int ret = -1; size_t xtra_len = 0; changelog_priv_t *priv = NULL; changelog_opt_t *co = NULL; changelog_local_t *local = NULL; priv = this->private; /*
 + IDX + FOP + GFID + UID + GID + Entry */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 7);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
                              entry_fn, entry_free_fn, xtra_len, out);

        changelog_set_usable_record_and_length (local, xtra_len, 7);

        frame->local = local;
        ret = 0;

        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_unlink (call_frame_t *frame, xlator_t *this,
                              loc_t *loc, int xflags, dict_t *xdata)
{
        return changelog_replication_rmdir (frame, this, loc, xflags, xdata);
}

int32_t
changelog_replication_rename (call_frame_t *frame, xlator_t *this,
                              loc_t *oldloc, loc_t *newloc, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_priv_t  *priv     = NULL;
        changelog_opt_t   *co       = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        /* 
 + IDX + FOP + GFID + UID + GID + OLDLOC + NEWLOC */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, oldloc->inode->gfid, 8);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, oldloc->inode->gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_ENTRY (co, oldloc->pargfid, oldloc->name,
                              entry_fn, entry_free_fn, xtra_len, out);
        co++;

        CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name,
                              entry_fn, entry_free_fn, xtra_len, out);

        changelog_set_usable_record_and_length (local, xtra_len, 8);

        frame->local = local;
        ret = 0;

        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_link (call_frame_t *frame,
                            xlator_t *this, loc_t *oldloc,
                            loc_t *newloc, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_priv_t  *priv     = NULL;
        changelog_opt_t   *co       = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        /* 
 + IDX + FOP + GFID + UID + GID + Entry */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, oldloc->gfid, 7);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, oldloc->gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_ENTRY (co, newloc->pargfid, newloc->name,
                              entry_fn, entry_free_fn, xtra_len, out);

        changelog_set_usable_record_and_length (local, xtra_len, 7);

        frame->local = local;
        ret = 0;

        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_mkdir (call_frame_t *frame,
                             xlator_t *this, loc_t *loc,
                             mode_t mode, mode_t umask, dict_t *xdata)
{
        int                ret      = -1;
        uuid_t             gfid     = {0,};
        void              *uuid_req = NULL;
        size_t             xtra_len = 0;
        changelog_priv_t  *priv     = NULL;
        changelog_opt_t   *co       = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
        if (ret) {
                gf_log (this->name, GF_LOG_DEBUG,
                        "failed to get gfid from dict");
                goto out;
        }
        uuid_copy (gfid, uuid_req);

        ret = -1;

        /* 
 + IDX + FOP + GFID + MODE + UID + GID + Entry */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 8);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, mode | S_IFDIR, uint32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
                              entry_fn, entry_free_fn, xtra_len, out);

        changelog_set_usable_record_and_length (local, xtra_len, 8);

        frame->local = local;
        ret = 0;

        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_symlink (call_frame_t *frame, xlator_t *this,
                               const char *linkname, loc_t *loc,
                               mode_t umask, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        uuid_t             gfid     = {0,};
        void              *uuid_req = NULL;
        changelog_priv_t  *priv     = NULL;
        changelog_opt_t   *co       = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
        if (ret) {
                gf_log (this->name, GF_LOG_DEBUG,
                        "failed to get gfid from dict");
                goto out;
        }
        uuid_copy (gfid, uuid_req);

        ret = -1;

        /* 
 + IDX + FOP + GFID + LINKNAME + UID + GID + Entry */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 8);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_NAME (co, linkname, entry_free_fn, xtra_len, out);
        co++;

        CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
                              entry_fn, entry_free_fn, xtra_len, out);

        changelog_set_usable_record_and_length (local, xtra_len, 8);

        frame->local = local;
        ret = 0;

        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_mknod (call_frame_t *frame,
                             xlator_t *this, loc_t *loc,
                             mode_t mode, dev_t dev,
                             mode_t umask, dict_t *xdata)
{
        int                ret      = -1;
        uuid_t             gfid     = {0,};
        void              *uuid_req = NULL;
        size_t             xtra_len = 0;
        changelog_priv_t  *priv     = NULL;
        changelog_opt_t   *co       = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
        if (ret) {
                gf_log (this->name, GF_LOG_DEBUG,
                        "failed to get gfid from dict");
                goto out;
        }
        uuid_copy (gfid, uuid_req);

        ret = -1;

        /* 
 + IDX + FOP + GFID + MODE + UID + GID + DEV + Entry */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 9);

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term (this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, mode, uint32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_USIGNLL (co, dev, number_fn, xtra_len);
        co++;

        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
                              entry_fn, entry_free_fn, xtra_len, out);

        changelog_set_usable_record_and_length (local, xtra_len, 9);

        frame->local = local;
        ret = 0;

        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_create (call_frame_t *frame, xlator_t *this,
                              loc_t *loc, int32_t flags, mode_t mode,
                              mode_t umask, fd_t *fd, dict_t *xdata)
{
        int                ret      = -1;
        uuid_t             gfid     = {0,};
        void              *uuid_req = NULL;
        size_t             xtra_len = 0;
        changelog_priv_t  *priv     = NULL;
        changelog_opt_t   *co       = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        ret = dict_get_ptr (xdata, "gfid-req", &uuid_req);
        if (ret) {
                gf_log (this->name, GF_LOG_DEBUG,
                        "failed to get gfid from dict");
                goto out;
        }
        uuid_copy (gfid, uuid_req);

        ret = -1;

        /* 
 + IDX + FOP + GFID + MODE + UID + GID + ENTRY */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, 8);

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term (this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, mode, uint32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_INT32 (co, frame->root->uid, int32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_INT32 (co, frame->root->gid, int32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_ENTRY (co, loc->pargfid, loc->name,
                              entry_fn, entry_free_fn, xtra_len, out);

        changelog_set_usable_record_and_length (local, xtra_len, 8);

        frame->local = local;
        ret = 0;

        changelog_update (this, priv, local, CHANGELOG_TYPE_ENTRY);

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

static int
_changelog_setattr_fill_common (call_frame_t *frame, xlator_t *this,
                                int32_t attr, struct iatt *stbuf,
                                uuid_t gfid, dict_t *xdata)
{
        int                ret        = -1;
        size_t             xtra_len   = 0;
        int                used_count = 0;
        changelog_priv_t  *priv       = NULL;
        changelog_opt_t   *co         = NULL;
        changelog_local_t *local      = NULL;

        priv = this->private;

        used_count = 7;
        CHANGELOG_INIT_NOCHECK (this, local, NULL, gfid, used_count);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        /**
         * - 
         * - IDX
         * - FOP
         * - GFID
         * - Valid flag
         *     GF_SET_ATTR_MODE [chmod]
         *       ->ia_prot
         *       ->ia_type
         *     GF_SET_ATTR_UID | GF_SET_ATTR_GID [chown]
         *       ->ia_uid
         *       ->ia_gid
         *     GF_SET_ATTR_ATIME | GF_SET_ATTR_MTIME [utimes]
         *       ->ia_atime
         *       ->ia_mtime
         */

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, attr, uint32_fn, xtra_len);
        co++;

        if (attr & GF_SET_ATTR_MODE) {
                mode_t mode = 0;

                /* ->ia_prot & ->ia_type stored as a consolidated value */
                used_count--;
                mode = st_mode_from_ia (stbuf->ia_prot, stbuf->ia_type);

                CHANGELOG_FILL_UINT32 (co, mode, uint32_fn, xtra_len);

        } else if (attr & (GF_SET_ATTR_UID | GF_SET_ATTR_GID)) {
                uid_t uid = -1;
                gid_t gid = -1;

                if (attr & GF_SET_ATTR_UID)
                        uid = stbuf->ia_uid;
                if (attr & GF_SET_ATTR_GID)
                        gid = stbuf->ia_gid;

                /* ->ia_uid & ->ia_gid */
                CHANGELOG_FILL_INT32 (co, uid, int32_fn, xtra_len);
                co++;

                CHANGELOG_FILL_INT32 (co, gid, int32_fn, xtra_len);

        } else if (attr & (GF_SET_ATTR_ATIME | GF_SET_ATTR_MTIME)) {

                /* ->ia_atime & ->ia_mtime, need usecs? */
                CHANGELOG_FILL_UINT32 (co,
                                       stbuf->ia_atime, uint32_fn, xtra_len);
                co++;

                CHANGELOG_FILL_UINT32 (co,
                                       stbuf->ia_mtime, uint32_fn, xtra_len);
        }

        changelog_set_usable_record_and_length (local, xtra_len, used_count);

        ret = 0;
        frame->local = local;

        changelog_update (this, priv, frame->local, CHANGELOG_TYPE_METADATA);

 out:
        if (ret)
                changelog_local_cleanup (this, local);

        return ret;
}

int32_t
changelog_replication_fsetattr (call_frame_t *frame,
                                xlator_t *this, fd_t *fd,
                                struct iatt *stbuf, int32_t valid,
                                dict_t *xdata)
{
        return _changelog_setattr_fill_common (frame, this, valid,
                                               stbuf, fd->inode->gfid, xdata);
}

int32_t
changelog_replication_setattr (call_frame_t *frame,
                               xlator_t *this, loc_t *loc,
                               struct iatt *stbuf, int32_t valid, dict_t *xdata)
{
        return _changelog_setattr_fill_common (frame, this, valid,
                                               stbuf, loc->inode->gfid, xdata);
}

int32_t
changelog_replication_fremovexattr (call_frame_t *frame, xlator_t *this,
                                    fd_t *fd, const char *name, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_opt_t   *co       = NULL;
        changelog_priv_t  *priv     = NULL;
        changelog_local_t *local    = NULL;
        int32_t            xattr_op;

        priv = this->private;

        /* 
 + IDX + FOP + GFID */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 4);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
        co++;

        if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0)
                CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
                                                fop_fn, xtra_len);
        else
                CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
                                           xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);

        changelog_set_usable_record_and_length (local, xtra_len, 4);

        frame->local = local;
        ret = 0;

        changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_removexattr (call_frame_t *frame, xlator_t *this,
                                   loc_t *loc, const char *name, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_priv_t  *priv     = NULL;
        changelog_opt_t   *co       = NULL;
        changelog_local_t *local    = NULL;
        int32_t            xattr_op;

        priv = this->private;

        /* 
 + IDX + FOP + GFID */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 4);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
        co++;

        if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0)
                CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
                                           fop_fn, xtra_len);
        else
                CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
                                           xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);

        changelog_set_usable_record_and_length (local, xtra_len, 4);

        frame->local = local;
        ret = 0;

        changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_setxattr (call_frame_t *frame,
                                xlator_t *this, loc_t *loc,
                                dict_t *dict, int32_t flags, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_priv_t  *priv     = NULL;
        changelog_opt_t   *co       = NULL;
        changelog_local_t *local    = NULL;
        int32_t            xattr_op;

        priv = this->private;

        /* 
 + IDX + FOP + GFID */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 4);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
        co++;

        if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0)
                CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
                                           fop_fn, xtra_len);
        else
                CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
                                           xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);

        changelog_set_usable_record_and_length (local, xtra_len, 4);

        frame->local = local;
        ret = 0;

        changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_fsetxattr (call_frame_t *frame,
                                 xlator_t *this, fd_t *fd, dict_t *dict,
                                 int32_t flags, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_priv_t  *priv     = NULL;
        changelog_opt_t   *co       = NULL;
        changelog_local_t *local    = NULL;
        int32_t            xattr_op;

        priv = this->private;

        /* 
 + IDX + FOP + GFID */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 4);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
        co++;

        if (dict_get_int32(xdata, "recon-xattr-opcode", &xattr_op) == 0)
                CHANGELOG_FILL_FOP_NUMBER (co, (glusterfs_fop_t)xattr_op,
                                           fop_fn, xtra_len);
        else
                CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn,
                                           xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);

        changelog_set_usable_record_and_length (local, xtra_len, 4);

        frame->local = local;
        ret = 0;

        changelog_update (this, priv, local, CHANGELOG_TYPE_METADATA);

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_truncate (call_frame_t *frame,
                                xlator_t *this, loc_t *loc,
                                off_t offset, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_priv_t *priv = NULL;
        changelog_opt_t   *co       = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        /* 
 + IDX + FOP + GFID + Offset */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, loc->inode->gfid, 5);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, loc->inode->gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len);

        changelog_set_usable_record_and_length (local, xtra_len, 5);

        frame->local = local;
        ret = 0;

        changelog_update (this, priv, local, CHANGELOG_TYPE_DATA);

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_ftruncate (call_frame_t *frame,
                                 xlator_t *this, fd_t *fd,
                                 off_t offset, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_priv_t *priv = NULL;
        changelog_opt_t   *co       = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        /* 
 + IDX + FOP + GFID + Offset */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 5);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len);

        changelog_set_usable_record_and_length (local, xtra_len, 5);

        frame->local = local;
        ret = 0;

        changelog_update (this, priv, local, CHANGELOG_TYPE_DATA);

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

int32_t
changelog_replication_writev (call_frame_t *frame,
                              xlator_t *this, fd_t *fd, struct iovec *vector,
                              int32_t count, off_t offset, uint32_t flags,
                              struct iobref *iobref, dict_t *xdata)
{
        int                ret      = -1;
        size_t             xtra_len = 0;
        changelog_priv_t *priv = NULL;
        changelog_opt_t   *co       = NULL;
        changelog_local_t *local    = NULL;

        priv = this->private;

        /* 
 + IDX + FOP + GFID + Offset + Length */
        CHANGELOG_INIT_NOCHECK (this, local, NULL, fd->inode->gfid, 6);
        if (!local)
                goto out;

        co = changelog_get_usable_buffer (local);
        if (!co)
                goto out;

        if (changelog_fix_term(this, local, xdata) == _gf_false)
                goto out;

        CHANGELOG_FILL_USIGNLL (co, PRE_OP_MARK, NULL, xtra_len);
        co++;

        CHANGELOG_FILL_UINT32 (co, local->lu.val, uint32_fn, xtra_len);
        co++;

        CHANGELOG_FILL_FOP_NUMBER (co, frame->root->op, fop_fn, xtra_len);
        co++;

        CHANGELOG_FILL_UUID (co, fd->inode->gfid, uuid_fn, xtra_len);
        co++;

        CHANGELOG_FILL_USIGNLL (co, offset, number_fn, xtra_len);
        co++;

        CHANGELOG_FILL_USIGNLL (co, iov_length (vector, count),
                               number_fn, xtra_len);

        changelog_set_usable_record_and_length (local, xtra_len, 6);

        frame->local = local;
        ret = 0;

        changelog_update (this, priv, local, CHANGELOG_TYPE_DATA);

 out:
        if (ret)
                changelog_local_cleanup (this, local);
        return ret;
}

/* overriden COPS */
int
changelog_replication_cops_open (xlator_t *this,
                                 changelog_priv_t *priv, void *cpriv,
                                 char *name, gf_boolean_t last)
{
        changelog_local_t local = {0,};
        changelog_log_data_t       cld = {0,};
        changelog_rollover_data_t *crd = NULL;

        crd = &cld.cld_roll;

        cld.cld_type = CHANGELOG_TYPE_ROLLOVER;

        crd->crd_finale = last;
        crd->crd_use_suffix = _gf_false;
        crd->crd_prealloc_size = 1<<29; /* preallocate 512 MB */


        (void) strcpy (crd->crd_changelog_name, name);

        local.lu.val = 0;
        local.nr_bytes = 0;

        return changelog_inject_single_event (this, priv, &local, &cld);
}

/**
 * NO-OP changelog write (from changelog.c). Records are journaled
 * in the FOP path.
 */
int
changelog_replication_cops_write (xlator_t *this,
                                  changelog_priv_t *priv, void *cpriv,
                                  changelog_local_t *local,
                                  changelog_log_type type)
{
        return 0;
}

/**
 * no implicit rollover
 */
int
changelog_replication_cops_rollover (xlator_t *this,
                                     changelog_priv_t *priv, void *cpriv,
                                     char *name, gf_boolean_t last)
{
        return changelog_replication_cops_open (this, priv, cpriv, name, last);
}

off_t
changelog_replication_cops_get_offset (xlator_t *this,
                                       changelog_priv_t *priv, void *cpriv,
                                       changelog_local_t *local)
{
        if (!local)
                return 0;

        return (local->lu.val * JOURNAL_SECTOR_SIZE) + local->nr_bytes;
}

void
changelog_replication_cops_set_offset (xlator_t *this,
                                       changelog_priv_t *priv, void *cpriv,
                                       changelog_local_t *local, off_t bytes)
{
        local->nr_bytes += bytes;
}

void
changelog_replication_cops_reset_offset (xlator_t *this, changelog_priv_t *priv,
                                         void *cpriv, changelog_local_t *local)
{
        return;
}

int
changelog_replication_policy_init (xlator_t *this,
                                   changelog_priv_t *priv,
                                   struct changelog_logpolicy *cp)
{
        struct xlator_fops   *r_fops = NULL;
        struct changelog_ops *r_cops = NULL;

        r_fops = GF_CALLOC (1, sizeof (struct xlator_fops),
                            gf_changelog_mt_fop_policy_t);
        if (!r_fops)
                return -1;

        r_cops = GF_CALLOC (1, sizeof (struct changelog_ops),
                            gf_changelog_mt_fop_policy_t);
        if (!r_cops) {
                GF_FREE (r_fops);
                return -1;
        }

        cp->cpriv = GF_CALLOC (1, sizeof (off_t),
                               gf_changelog_mt_fop_policy_t);
        if (!cp->cpriv) {
                GF_FREE (r_fops);
                GF_FREE (r_cops);
                return -1;
        }

        /* no roll-over, one big fat journal per term */
        priv->rollover_time = 0;

        /* fsync() is internally trigerred by NSR */
        priv->fsync_interval = 0;

        /* no record header: extra data (via iobufs) are always persisted */
        priv->no_gfid_hdr = _gf_true;

        priv->lockless_update = _gf_false;

        memcpy (r_fops, &changelog_default_fops, sizeof (struct xlator_fops));
        memcpy (r_cops, &changelog_default_cops, sizeof (struct changelog_ops));

        priv->term = 0;
        (void) memset (cp->changelog_name, '\0', PATH_MAX);
        memcpy(cp->changelog_name, JOURNAL_NAME, strlen(JOURNAL_NAME));
#if 0
        (void) snprintf (cp->changelog_name, PATH_MAX,
                         JOURNAL_NAME, priv->term);
#endif

        /* overload all fops */
        r_fops->writev       = changelog_replication_writev;
        r_fops->ftruncate    = changelog_replication_ftruncate;
        r_fops->truncate     = changelog_replication_truncate;
        r_fops->fsetxattr    = changelog_replication_fsetxattr;
        r_fops->setxattr     = changelog_replication_setxattr;
        r_fops->removexattr  = changelog_replication_removexattr;
        r_fops->fremovexattr = changelog_replication_fremovexattr;
        r_fops->setattr      = changelog_replication_setattr;
        r_fops->fsetattr     = changelog_replication_fsetattr;
        r_fops->create       = changelog_replication_create;
        r_fops->mknod        = changelog_replication_mknod;
        r_fops->symlink      = changelog_replication_symlink;
        r_fops->mkdir        = changelog_replication_mkdir;
        r_fops->link         = changelog_replication_link;
        r_fops->rename       = changelog_replication_rename;
        r_fops->unlink       = changelog_replication_unlink;
        r_fops->rmdir        = changelog_replication_rmdir;

        /* overload cops */
        r_cops->open         = changelog_replication_cops_open;
        r_cops->write        = changelog_replication_cops_write;
        r_cops->rollover     = changelog_replication_cops_rollover;
        r_cops->get_offset   = changelog_replication_cops_get_offset;
        r_cops->set_offset   = changelog_replication_cops_set_offset;
        r_cops->reset_offset = changelog_replication_cops_reset_offset;


        cp->fops = r_fops;
        cp->cops = r_cops;

        return 0;
}

int
changelog_replication_policy_fini (xlator_t *this,
                                   struct changelog_logpolicy *cp)
{
        GF_FREE (cp->fops);
        GF_FREE (cp->cops);
        GF_FREE (cp->cpriv);
        return 0;
}