/* Copyright (c) 2013 Red Hat, Inc. This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser General Public License, version 3 or any later version (LGPLv3 or later), or the GNU General Public License, version 2 (GPLv2), in all cases as published by the Free Software Foundation. */ #ifndef _CONFIG_H #define _CONFIG_H #include "config.h" #endif #include "xlator.h" #include "defaults.h" #include "logging.h" #include "iobuf.h" #include #include "changelog-rt.h" #include "changelog-notifier.h" #include "changelog-encoders.h" #include "changelog-mem-types.h" #include "changelog-fops.h" #include "changelog-policy.h" static struct changelog_bootstrap cb_bootstrap[] = { { .mode = CHANGELOG_MODE_RT, .ctor = changelog_rt_init, .dtor = changelog_rt_fini, }, }; static struct changelog_encoder cb_encoder[] = { [CHANGELOG_ENCODE_BINARY] = { .encoder = CHANGELOG_ENCODE_BINARY, .encode = changelog_encode_binary, }, [CHANGELOG_ENCODE_ASCII] = { .encoder = CHANGELOG_ENCODE_ASCII, .encode = changelog_encode_ascii, }, }; static struct changelog_logpolicy cb_policy[] = { [CHANGELOG_LOG_POLICY_DEFAULT] = { .fops = NULL, .cops = NULL, .policy = CHANGELOG_LOG_POLICY_DEFAULT, .init_policy = changelog_default_policy_init, .fini_policy = changelog_default_policy_fini, }, [CHANGELOG_LOG_POLICY_REPLICATE] = { .fops = NULL, .cops = NULL, .policy = CHANGELOG_LOG_POLICY_REPLICATE, .init_policy = changelog_replication_policy_init, .fini_policy = changelog_replication_policy_fini, }, }; /* Entry operations - TYPE III */ /* {{{ */ /* rmdir */ int32_t changelog_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_ENTRY); unwind: CHANGELOG_STACK_UNWIND (rmdir, frame, op_ret, op_errno, preparent, postparent, xdata); return 0; } int32_t changelog_rmdir (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_INVOKE_FOP (priv, rmdir, frame, this, loc, xflags, xdata); wind: STACK_WIND (frame, changelog_rmdir_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->rmdir, loc, xflags, xdata); return 0; } /* unlink */ int32_t changelog_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_ENTRY); unwind: CHANGELOG_STACK_UNWIND (unlink, frame, op_ret, op_errno, preparent, postparent, xdata); return 0; } int32_t changelog_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_IF_INTERNAL_FOP_THEN_GOTO (xdata, wind); CHANGELOG_INVOKE_FOP (priv, unlink, frame, this, loc, xflags, xdata); wind: STACK_WIND (frame, changelog_unlink_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->unlink, loc, xflags, xdata); return 0; } /* rename */ int32_t changelog_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *buf, struct iatt *preoldparent, struct iatt *postoldparent, struct iatt *prenewparent, struct iatt *postnewparent, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_ENTRY); unwind: CHANGELOG_STACK_UNWIND (rename, frame, op_ret, op_errno, buf, preoldparent, postoldparent, prenewparent, postnewparent, xdata); return 0; } int32_t changelog_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_INVOKE_FOP (priv, rename, frame, this, oldloc, newloc, xdata); wind: STACK_WIND (frame, changelog_rename_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->rename, oldloc, newloc, xdata); return 0; } /* link */ int32_t changelog_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t *inode, struct iatt *buf, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_ENTRY); unwind: CHANGELOG_STACK_UNWIND (link, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; } int32_t changelog_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_IF_INTERNAL_FOP_THEN_GOTO (xdata, wind); CHANGELOG_INVOKE_FOP (priv, link, frame, this, oldloc, newloc, xdata); wind: STACK_WIND (frame, changelog_link_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->link, oldloc, newloc, xdata); return 0; } /* mkdir */ int32_t changelog_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t *inode, struct iatt *buf, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_ENTRY); unwind: CHANGELOG_STACK_UNWIND (mkdir, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; } int32_t changelog_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, mode_t umask, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_INVOKE_FOP (priv, mkdir, frame, this, loc, mode, umask, xdata); wind: STACK_WIND (frame, changelog_mkdir_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->mkdir, loc, mode, umask, xdata); return 0; } /* symlink */ int32_t changelog_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t *inode, struct iatt *buf, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_ENTRY); unwind: CHANGELOG_STACK_UNWIND (symlink, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; } int32_t changelog_symlink (call_frame_t *frame, xlator_t *this, const char *linkname, loc_t *loc, mode_t umask, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_INVOKE_FOP (priv, symlink, frame, this, linkname, loc, umask, xdata); wind: STACK_WIND (frame, changelog_symlink_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->symlink, linkname, loc, umask, xdata); return 0; } /* mknod */ int32_t changelog_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t *inode, struct iatt *buf, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_ENTRY); unwind: CHANGELOG_STACK_UNWIND (mknod, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; } int32_t changelog_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, dev_t dev, mode_t umask, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_INVOKE_FOP (priv, mknod, frame, this, loc, mode, dev, umask, xdata); wind: STACK_WIND (frame, changelog_mknod_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->mknod, loc, mode, dev, umask, xdata); return 0; } /* creat */ int32_t changelog_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, fd_t *fd, inode_t *inode, struct iatt *buf, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_ENTRY); unwind: CHANGELOG_STACK_UNWIND (create, frame, op_ret, op_errno, fd, inode, buf, preparent, postparent, xdata); return 0; } int32_t changelog_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_INVOKE_FOP (priv, create, frame, this, loc, flags, mode, umask, fd, xdata); wind: STACK_WIND (frame, changelog_create_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->create, loc, flags, mode, umask, fd, xdata); return 0; } /* }}} */ /* Metadata modification fops - TYPE II */ /* {{{ */ /* {f}setattr */ int32_t changelog_fsetattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *preop_stbuf, struct iatt *postop_stbuf, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_METADATA); unwind: CHANGELOG_STACK_UNWIND (fsetattr, frame, op_ret, op_errno, preop_stbuf, postop_stbuf, xdata); return 0; } int32_t changelog_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iatt *stbuf, int32_t valid, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_INVOKE_FOP (priv, fsetattr, frame, this, fd, stbuf, valid, xdata); wind: STACK_WIND (frame, changelog_fsetattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetattr, fd, stbuf, valid, xdata); return 0; } int32_t changelog_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *preop_stbuf, struct iatt *postop_stbuf, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_METADATA); unwind: CHANGELOG_STACK_UNWIND (setattr, frame, op_ret, op_errno, preop_stbuf, postop_stbuf, xdata); return 0; } int32_t changelog_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc, struct iatt *stbuf, int32_t valid, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_INVOKE_FOP (priv, setattr, frame, this, loc, stbuf, valid, xdata); wind: STACK_WIND (frame, changelog_setattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->setattr, loc, stbuf, valid, xdata); return 0; } /* {f}removexattr */ int32_t changelog_fremovexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_METADATA); unwind: CHANGELOG_STACK_UNWIND (fremovexattr, frame, op_ret, op_errno, xdata); return 0; } int32_t changelog_fremovexattr (call_frame_t *frame, xlator_t *this, fd_t *fd, const char *name, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_INVOKE_FOP (priv, fremovexattr, frame, this, fd, name, xdata); wind: STACK_WIND (frame, changelog_fremovexattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->fremovexattr, fd, name, xdata); return 0; } int32_t changelog_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_METADATA); unwind: CHANGELOG_STACK_UNWIND (removexattr, frame, op_ret, op_errno, xdata); return 0; } int32_t changelog_removexattr (call_frame_t *frame, xlator_t *this, loc_t *loc, const char *name, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_INVOKE_FOP (priv, removexattr, frame, this, loc, name, xdata); wind: STACK_WIND (frame, changelog_removexattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->removexattr, loc, name, xdata); return 0; } /* {f}setxattr */ int32_t changelog_setxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_METADATA); unwind: CHANGELOG_STACK_UNWIND (setxattr, frame, op_ret, op_errno, xdata); return 0; } int32_t changelog_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict, int32_t flags, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_INVOKE_FOP (priv, setxattr, frame, this, loc, dict, flags, xdata); wind: STACK_WIND (frame, changelog_setxattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->setxattr, loc, dict, flags, xdata); return 0; } int32_t changelog_fsetxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_METADATA); unwind: CHANGELOG_STACK_UNWIND (fsetxattr, frame, op_ret, op_errno, xdata); return 0; } int32_t changelog_fsetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict, int32_t flags, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_INVOKE_FOP (priv, fsetxattr, frame, this, fd, dict, flags, xdata); wind: STACK_WIND (frame, changelog_fsetxattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetxattr, fd, dict, flags, xdata); return 0; } /* }}} */ /* Data modification fops - TYPE I */ /* {{{ */ /* {f}truncate() */ int32_t changelog_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *prebuf, struct iatt *postbuf, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_DATA); unwind: CHANGELOG_STACK_UNWIND (truncate, frame, op_ret, op_errno, prebuf, postbuf, xdata); return 0; } int32_t changelog_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_INVOKE_FOP (priv, truncate, frame, this, loc, offset, xdata); wind: STACK_WIND (frame, changelog_truncate_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->truncate, loc, offset, xdata); return 0; } int32_t changelog_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *prebuf, struct iatt *postbuf, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret < 0) || !local), unwind); CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_DATA); unwind: CHANGELOG_STACK_UNWIND (ftruncate, frame, op_ret, op_errno, prebuf, postbuf, xdata); return 0; } int32_t changelog_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_INVOKE_FOP (priv, ftruncate, frame, this, fd, offset, xdata); wind: STACK_WIND (frame, changelog_ftruncate_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->ftruncate, fd, offset, xdata); return 0; } /* writev() */ int32_t changelog_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *prebuf, struct iatt *postbuf, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO (priv, ((op_ret <= 0) || !local), unwind); CHANGELOG_INVOKE_CFOP (this, priv, write, local, CHANGELOG_TYPE_DATA); unwind: CHANGELOG_STACK_UNWIND (writev, frame, op_ret, op_errno, prebuf, postbuf, xdata); return 0; } int32_t changelog_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector, int32_t count, off_t offset, uint32_t flags, struct iobref *iobref, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); CHANGELOG_INVOKE_FOP (priv, writev, frame, this, fd, vector, count, offset, flags, iobref, xdata); wind: STACK_WIND (frame, changelog_writev_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->writev, fd, vector, count, offset, flags, iobref, xdata); return 0; } /* }}} */ /** * The * - @init () * - @fini () * - @reconfigure () * ... and helper routines */ /** * needed if there are more operation modes in the future. */ static void changelog_assign_opmode (changelog_priv_t *priv, char *mode) { if ( strncmp (mode, "realtime", 8) == 0 ) { priv->op_mode = CHANGELOG_MODE_RT; } } static void changelog_assign_encoding (changelog_priv_t *priv, char *enc) { if ( strncmp (enc, "binary", 6) == 0 ) { priv->encode_mode = CHANGELOG_ENCODE_BINARY; } else if ( strncmp (enc, "ascii", 5) == 0 ) { priv->encode_mode = CHANGELOG_ENCODE_ASCII; } } static void changelog_assign_policy (changelog_priv_t *priv, char *pol) { if ( strncmp (pol, "default", 7) == 0 ) priv->policy = CHANGELOG_LOG_POLICY_DEFAULT; else if ( strncmp (pol, "replication", 11) == 0 ) priv->policy = CHANGELOG_LOG_POLICY_REPLICATE; } /* cleanup any helper threads that are running */ static void changelog_cleanup_helper_threads (xlator_t *this, changelog_priv_t *priv) { if (priv->cr.rollover_th) { changelog_thread_cleanup (this, priv->cr.rollover_th); priv->cr.rollover_th = 0; } if (priv->cf.fsync_th) { changelog_thread_cleanup (this, priv->cf.fsync_th); priv->cf.fsync_th = 0; } } /* spawn helper thread; cleaning up in case of errors */ static int changelog_spawn_helper_threads (xlator_t *this, changelog_priv_t *priv) { int ret = 0; priv->cr.this = this; if (priv->rollover_time) { ret = pthread_create (&priv->cr.rollover_th, NULL, changelog_rollover, priv); if (ret) goto out; } if (priv->fsync_interval) { priv->cf.this = this; ret = pthread_create (&priv->cf.fsync_th, NULL, changelog_fsync_thread, priv); } if (ret) changelog_cleanup_helper_threads (this, priv); out: return ret; } /* cleanup the notifier thread */ static int changelog_cleanup_notifier (xlator_t *this, changelog_priv_t *priv) { int ret = 0; if (priv->cn.notify_th) { changelog_thread_cleanup (this, priv->cn.notify_th); priv->cn.notify_th = 0; ret = close (priv->wfd); if (ret) gf_log (this->name, GF_LOG_ERROR, "error closing writer end of notifier pipe" " (reason: %s)", strerror (errno)); } return ret; } /* spawn the notifier thread - nop if already running */ static int changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv) { int ret = 0; int flags = 0; int pipe_fd[2] = {0, 0}; if (priv->cn.notify_th) goto out; /* notifier thread already running */ ret = pipe (pipe_fd); if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, "Cannot create pipe (reason: %s)", strerror (errno)); goto out; } /* writer is non-blocking */ flags = fcntl (pipe_fd[1], F_GETFL); flags |= O_NONBLOCK; ret = fcntl (pipe_fd[1], F_SETFL, flags); if (ret) { gf_log (this->name, GF_LOG_ERROR, "failed to set O_NONBLOCK flag"); goto out; } priv->wfd = pipe_fd[1]; priv->cn.this = this; priv->cn.rfd = pipe_fd[0]; ret = pthread_create (&priv->cn.notify_th, NULL, changelog_notifier, priv); out: return ret; } int32_t mem_acct_init (xlator_t *this) { int ret = -1; if (!this) return ret; ret = xlator_mem_acct_init (this, gf_changelog_mt_end + 1); if (ret != 0) { gf_log (this->name, GF_LOG_WARNING, "Memory accounting" " init failed"); return ret; } return ret; } static int changelog_init (xlator_t *this, changelog_priv_t *priv) { int i = 0; int ret = -1; char *cname = NULL; struct timeval tv = {0,}; ret = gettimeofday (&tv, NULL); if (ret) { gf_log (this->name, GF_LOG_ERROR, "gettimeofday() failure"); goto out; } priv->slice.tv_start = tv; priv->maps[CHANGELOG_TYPE_DATA] = "D "; priv->maps[CHANGELOG_TYPE_METADATA] = "M "; priv->maps[CHANGELOG_TYPE_ENTRY] = "E "; for (; i < CHANGELOG_MAX_TYPE; i++) { /* start with version 1 */ priv->slice.changelog_version[i] = 1; } if (!priv->active) return ret; /* spawn the notifier thread */ ret = changelog_spawn_notifier (this, priv); if (ret) goto out; cname = CHANGELOG_FNAME_FROM_POLICY (priv->cp); LOCK (&priv->lock); { ret = CHANGELOG_INVOKE_CFOP (this, priv, open, cname, _gf_false); } UNLOCK (&priv->lock); if (ret) goto out; /* ... and finally spawn the helpers threads */ ret = changelog_spawn_helper_threads (this, priv); out: return ret; } int reconfigure (xlator_t *this, dict_t *options) { int ret = 0; char *tmp = NULL; char *cname = NULL; changelog_priv_t *priv = NULL; gf_boolean_t active_earlier = _gf_true; gf_boolean_t active_now = _gf_true; changelog_time_slice_t *slice = NULL; priv = this->private; if (!priv) goto out; ret = -1; active_earlier = priv->active; /* first stop the rollover and the fsync thread */ changelog_cleanup_helper_threads (this, priv); GF_OPTION_RECONF ("changelog-dir", tmp, options, str, out); if (!tmp) { gf_log (this->name, GF_LOG_ERROR, "\"changelog-dir\" option is not set"); goto out; } GF_FREE (priv->changelog_dir); priv->changelog_dir = gf_strdup (tmp); if (!priv->changelog_dir) goto out; ret = mkdir_p (priv->changelog_dir, 0600, _gf_true); if (ret) goto out; GF_OPTION_RECONF ("changelog", active_now, options, bool, out); /** * changelog_handle_change() handles changes that could possibly * have been submit changes before changelog deactivation. */ if (!active_now) priv->active = _gf_false; GF_OPTION_RECONF ("op-mode", tmp, options, str, out); changelog_assign_opmode (priv, tmp); tmp = NULL; GF_OPTION_RECONF ("encoding", tmp, options, str, out); changelog_assign_encoding (priv, tmp); GF_OPTION_RECONF ("rollover-time", priv->rollover_time, options, int32, out); GF_OPTION_RECONF ("fsync-interval", priv->fsync_interval, options, int32, out); if (active_now || active_earlier) { slice = &priv->slice; cname = CHANGELOG_FNAME_FROM_POLICY (priv->cp); LOCK (&priv->lock); { ret = CHANGELOG_INVOKE_CFOP (this, priv, rollover, cname, !active_now); if (!ret && active_now) SLICE_VERSION_UPDATE (slice); } UNLOCK (&priv->lock); if (ret) goto out; if (active_now) { ret = changelog_spawn_notifier (this, priv); if (!ret) ret = changelog_spawn_helper_threads (this, priv); } else ret = changelog_cleanup_notifier (this, priv); } out: if (ret) { ret = changelog_cleanup_notifier (this, priv); } else { gf_log (this->name, GF_LOG_DEBUG, "changelog reconfigured"); if (active_now) priv->active = _gf_true; } return ret; } int32_t init (xlator_t *this) { int ret = -1; char *tmp = NULL; changelog_priv_t *priv = NULL; GF_VALIDATE_OR_GOTO ("changelog", this, out); if (!this->children || this->children->next) { gf_log (this->name, GF_LOG_ERROR, "translator needs a single subvolume"); goto out; } if (!this->parents) { gf_log (this->name, GF_LOG_ERROR, "dangling volume. please check volfile"); goto out; } priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t); if (!priv) goto out; this->local_pool = mem_pool_new (changelog_local_t, 64); if (!this->local_pool) { gf_log (this->name, GF_LOG_ERROR, "failed to create local memory pool"); goto out; } LOCK_INIT (&priv->lock); GF_OPTION_INIT ("changelog-brick", tmp, str, out); if (!tmp) { gf_log (this->name, GF_LOG_ERROR, "\"changelog-brick\" option is not set"); goto out; } priv->changelog_brick = gf_strdup (tmp); if (!priv->changelog_brick) goto out; tmp = NULL; GF_OPTION_INIT ("changelog-dir", tmp, str, out); if (!tmp) { gf_log (this->name, GF_LOG_ERROR, "\"changelog-dir\" option is not set"); goto out; } priv->changelog_dir = gf_strdup (tmp); if (!priv->changelog_dir) goto out; tmp = NULL; /** * create the directory even if change-logging would be inactive * so that consumers can _look_ into it (finding nothing...) */ ret = mkdir_p (priv->changelog_dir, 0600, _gf_true); if (ret) goto out; GF_OPTION_INIT ("changelog", priv->active, bool, out); GF_OPTION_INIT ("op-mode", tmp, str, out); changelog_assign_opmode (priv, tmp); tmp = NULL; GF_OPTION_INIT ("encoding", tmp, str, out); changelog_assign_encoding (priv, tmp); tmp = NULL; GF_OPTION_INIT ("policy", tmp, str, out); changelog_assign_policy (priv, tmp); GF_OPTION_INIT ("fsync-interval", priv->fsync_interval, int32, out); GF_ASSERT (cb_encoder[priv->encode_mode].encoder == priv->encode_mode); priv->ce = &cb_encoder[priv->encode_mode]; GF_ASSERT (cb_bootstrap[priv->op_mode].mode == priv->op_mode); priv->cb = &cb_bootstrap[priv->op_mode]; GF_ASSERT (cb_policy[priv->policy].policy == priv->policy); priv->cp = &cb_policy[priv->policy]; /* ... init logging policy */ ret = priv->cp->init_policy (this, priv, priv->cp); if (ret) goto out; /* ... now bootstrap the logger */ ret = priv->cb->ctor (this, &priv->cd, priv->lockless_update); if (ret) goto out; /* override the value if set */ if (dict_get (this->options, "rollover-time")) { ret = dict_get_int32 (this->options, "rollover-time", &priv->rollover_time); if (ret) { gf_log (this->name, GF_LOG_ERROR, "Cannot get value for \"rollover-time\""); goto out; } } priv->changelog_fd = -1; ret = changelog_init (this, priv); if (ret) goto out; gf_log (this->name, GF_LOG_DEBUG, "changelog translator loaded"); out: if (ret) { if (this->local_pool) mem_pool_destroy (this->local_pool); if (priv->cb) { ret = priv->cb->dtor (this, &priv->cd); if (ret) gf_log (this->name, GF_LOG_ERROR, "error in cleanup during init()"); } GF_FREE (priv->changelog_brick); GF_FREE (priv->changelog_dir); GF_FREE (priv); this->private = NULL; } else this->private = priv; return ret; } void fini (xlator_t *this) { int ret = -1; changelog_priv_t *priv = NULL; priv = this->private; if (priv) { ret = priv->cb->dtor (this, &priv->cd); if (ret) gf_log (this->name, GF_LOG_ERROR, "error in fini"); mem_pool_destroy (this->local_pool); GF_FREE (priv->changelog_brick); GF_FREE (priv->changelog_dir); GF_FREE (priv); } this->private = NULL; return; } struct xlator_fops fops = { .mknod = changelog_mknod, .mkdir = changelog_mkdir, .create = changelog_create, .symlink = changelog_symlink, .writev = changelog_writev, .truncate = changelog_truncate, .ftruncate = changelog_ftruncate, .link = changelog_link, .rename = changelog_rename, .unlink = changelog_unlink, .rmdir = changelog_rmdir, .setattr = changelog_setattr, .fsetattr = changelog_fsetattr, .setxattr = changelog_setxattr, .fsetxattr = changelog_fsetxattr, .removexattr = changelog_removexattr, .fremovexattr = changelog_fremovexattr, }; struct xlator_cbks cbks = { .forget = changelog_forget, }; struct volume_options options[] = { {.key = {"changelog"}, .type = GF_OPTION_TYPE_BOOL, .default_value = "on", .description = "enable/disable change-logging" }, {.key = {"changelog-brick"}, .type = GF_OPTION_TYPE_PATH, .description = "brick path to generate unique socket file name." " should be the export directory of the volume strictly." }, {.key = {"changelog-dir"}, .type = GF_OPTION_TYPE_PATH, .description = "directory for the changelog files" }, {.key = {"op-mode"}, .type = GF_OPTION_TYPE_STR, .default_value = "realtime", .value = {"realtime"}, .description = "operation mode - futuristic operation modes" }, {.key = {"encoding"}, .type = GF_OPTION_TYPE_STR, .default_value = "ascii", .value = {"binary", "ascii"}, .description = "encoding type for changelogs" }, {.key = {"rollover-time"}, .type = GF_OPTION_TYPE_INT, .description = "time to switch to a new changelog file (in seconds)" }, {.key = {"fsync-interval"}, .type = GF_OPTION_TYPE_TIME, .default_value = "0", .description = "do not open CHANGELOG file with O_SYNC mode." " instead perform fsync() at specified intervals" }, {.key = {"policy"}, .type = GF_OPTION_TYPE_STR, .default_value = "replication", .value = {"default", "replication"}, .description = "Logging policies" }, {.key = {NULL} }, };