summaryrefslogtreecommitdiffstats
path: root/xlators/features/bit-rot/src/stub
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/bit-rot/src/stub')
-rw-r--r--xlators/features/bit-rot/src/stub/Makefile.am20
-rw-r--r--xlators/features/bit-rot/src/stub/bit-rot-common.h178
-rw-r--r--xlators/features/bit-rot/src/stub/bit-rot-object-version.h30
-rw-r--r--xlators/features/bit-rot/src/stub/bit-rot-stub-helpers.c796
-rw-r--r--xlators/features/bit-rot/src/stub/bit-rot-stub-mem-types.h36
-rw-r--r--xlators/features/bit-rot/src/stub/bit-rot-stub-messages.h117
-rw-r--r--xlators/features/bit-rot/src/stub/bit-rot-stub.c3590
-rw-r--r--xlators/features/bit-rot/src/stub/bit-rot-stub.h515
8 files changed, 5282 insertions, 0 deletions
diff --git a/xlators/features/bit-rot/src/stub/Makefile.am b/xlators/features/bit-rot/src/stub/Makefile.am
new file mode 100644
index 00000000000..f13de7145fc
--- /dev/null
+++ b/xlators/features/bit-rot/src/stub/Makefile.am
@@ -0,0 +1,20 @@
+if WITH_SERVER
+xlator_LTLIBRARIES = bitrot-stub.la
+endif
+xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features
+
+bitrot_stub_la_LDFLAGS = -module $(GF_XLATOR_DEFAULT_LDFLAGS)
+
+bitrot_stub_la_SOURCES = bit-rot-stub-helpers.c bit-rot-stub.c
+bitrot_stub_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+
+noinst_HEADERS = bit-rot-stub.h bit-rot-common.h bit-rot-stub-mem-types.h \
+ bit-rot-object-version.h bit-rot-stub-messages.h
+
+AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \
+ -I$(top_srcdir)/rpc/xdr/src -I$(top_builddir)/rpc/xdr/src \
+ -I$(top_srcdir)/rpc/rpc-lib/src
+
+AM_CFLAGS = -Wall $(GF_CFLAGS)
+
+CLEANFILES =
diff --git a/xlators/features/bit-rot/src/stub/bit-rot-common.h b/xlators/features/bit-rot/src/stub/bit-rot-common.h
new file mode 100644
index 00000000000..20561aa7764
--- /dev/null
+++ b/xlators/features/bit-rot/src/stub/bit-rot-common.h
@@ -0,0 +1,178 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __BIT_ROT_COMMON_H__
+#define __BIT_ROT_COMMON_H__
+
+#include <glusterfs/glusterfs.h>
+#include "bit-rot-object-version.h"
+
+#define BR_VXATTR_VERSION (1 << 0)
+#define BR_VXATTR_SIGNATURE (1 << 1)
+
+#define BR_VXATTR_SIGN_MISSING (BR_VXATTR_SIGNATURE)
+#define BR_VXATTR_ALL_MISSING (BR_VXATTR_VERSION | BR_VXATTR_SIGNATURE)
+
+#define BR_BAD_OBJ_CONTAINER \
+ (uuid_t) { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8 }
+
+typedef enum br_vxattr_state {
+ BR_VXATTR_STATUS_FULL = 0,
+ BR_VXATTR_STATUS_MISSING = 1,
+ BR_VXATTR_STATUS_UNSIGNED = 2,
+ BR_VXATTR_STATUS_INVALID = 3,
+} br_vxattr_status_t;
+
+typedef enum br_sign_state {
+ BR_SIGN_INVALID = -1,
+ BR_SIGN_NORMAL = 0,
+ BR_SIGN_REOPEN_WAIT = 1,
+ BR_SIGN_QUICK = 2,
+} br_sign_state_t;
+
+static inline br_vxattr_status_t
+br_version_xattr_state(dict_t *xattr, br_version_t **obuf,
+ br_signature_t **sbuf, gf_boolean_t *objbad)
+{
+ int32_t ret = 0;
+ int32_t vxattr = 0;
+ br_vxattr_status_t status;
+ void *data = NULL;
+
+ /**
+ * The key being present in the dict indicates the xattr was set on
+ * disk. The presence of xattr itself as of now is suffecient to say
+ * the the object is bad.
+ */
+ *objbad = _gf_false;
+ ret = dict_get_bin(xattr, BITROT_OBJECT_BAD_KEY, (void **)&data);
+ if (!ret)
+ *objbad = _gf_true;
+
+ ret = dict_get_bin(xattr, BITROT_CURRENT_VERSION_KEY, (void **)obuf);
+ if (ret)
+ vxattr |= BR_VXATTR_VERSION;
+
+ ret = dict_get_bin(xattr, BITROT_SIGNING_VERSION_KEY, (void **)sbuf);
+ if (ret)
+ vxattr |= BR_VXATTR_SIGNATURE;
+
+ switch (vxattr) {
+ case 0:
+ status = BR_VXATTR_STATUS_FULL;
+ break;
+ case BR_VXATTR_SIGN_MISSING:
+ status = BR_VXATTR_STATUS_UNSIGNED;
+ break;
+ case BR_VXATTR_ALL_MISSING:
+ status = BR_VXATTR_STATUS_MISSING;
+ break;
+ default:
+ status = BR_VXATTR_STATUS_INVALID;
+ }
+
+ return status;
+}
+
+/**
+ * in-memory representation of signature used by signer for object
+ * signing.
+ */
+typedef struct br_isignature_in {
+ int8_t signaturetype; /* signature type */
+
+ unsigned long signedversion; /* version against which the
+ object was signed */
+
+ size_t signaturelen; /* signature length */
+ char signature[0]; /* object signature */
+} br_isignature_t;
+
+/**
+ * in-memory representation of signature used by scrubber for object
+ * verification.
+ */
+typedef struct br_isignature_out {
+ char stale; /* stale signature? */
+
+ unsigned long version; /* current signed version */
+
+ uint32_t time[2]; /* time when the object
+ got dirtied */
+
+ int8_t signaturetype; /* hash type */
+ size_t signaturelen; /* signature length */
+ char signature[0]; /* signature (hash) */
+} br_isignature_out_t;
+
+typedef struct br_stub_init {
+ uint32_t timebuf[2];
+ char export[PATH_MAX];
+} br_stub_init_t;
+
+typedef enum {
+ BR_SIGNATURE_TYPE_VOID = -1, /* object is not signed */
+ BR_SIGNATURE_TYPE_ZERO = 0, /* min boundary */
+ BR_SIGNATURE_TYPE_SHA256 = 1, /* signed with SHA256 */
+ BR_SIGNATURE_TYPE_MAX = 2, /* max boundary */
+} br_signature_type;
+
+/* BitRot stub start time (virtual xattr) */
+#define GLUSTERFS_GET_BR_STUB_INIT_TIME "trusted.glusterfs.bit-rot.stub-init"
+
+/* signing/reopen hint */
+#define BR_OBJECT_RESIGN 0
+#define BR_OBJECT_REOPEN 1
+#define BR_REOPEN_SIGN_HINT_KEY "trusted.glusterfs.bit-rot.reopen-hint"
+
+static inline int
+br_is_signature_type_valid(int8_t signaturetype)
+{
+ return ((signaturetype > BR_SIGNATURE_TYPE_ZERO) &&
+ (signaturetype < BR_SIGNATURE_TYPE_MAX));
+}
+
+static inline void
+br_set_default_ongoingversion(br_version_t *buf, uint32_t *tv)
+{
+ buf->ongoingversion = BITROT_DEFAULT_CURRENT_VERSION;
+ buf->timebuf[0] = tv[0];
+ buf->timebuf[1] = tv[1];
+}
+
+static inline void
+br_set_default_signature(br_signature_t *buf, size_t *size)
+{
+ buf->signaturetype = (int8_t)BR_SIGNATURE_TYPE_VOID;
+ buf->signedversion = BITROT_DEFAULT_SIGNING_VERSION;
+
+ *size = sizeof(br_signature_t); /* no signature */
+}
+
+static inline void
+br_set_ongoingversion(br_version_t *buf, unsigned long version, uint32_t *tv)
+{
+ buf->ongoingversion = version;
+ buf->timebuf[0] = tv[0];
+ buf->timebuf[1] = tv[1];
+}
+
+static inline void
+br_set_signature(br_signature_t *buf, br_isignature_t *sign,
+ size_t signaturelen, size_t *size)
+{
+ buf->signaturetype = sign->signaturetype;
+ buf->signedversion = ntohl(sign->signedversion);
+
+ memcpy(buf->signature, sign->signature, signaturelen);
+ *size = sizeof(br_signature_t) + signaturelen;
+}
+
+#endif /* __BIT_ROT_COMMON_H__ */
diff --git a/xlators/features/bit-rot/src/stub/bit-rot-object-version.h b/xlators/features/bit-rot/src/stub/bit-rot-object-version.h
new file mode 100644
index 00000000000..7ae6a5200df
--- /dev/null
+++ b/xlators/features/bit-rot/src/stub/bit-rot-object-version.h
@@ -0,0 +1,30 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef __BIT_ROT_OBJECT_VERSION_H
+#define __BIT_ROT_OBJECT_VERSION_H
+
+/**
+ * on-disk formats for ongoing version and object signature.
+ */
+typedef struct br_version {
+ unsigned long ongoingversion;
+ uint32_t timebuf[2];
+} br_version_t;
+
+typedef struct __attribute__((__packed__)) br_signature {
+ int8_t signaturetype;
+
+ unsigned long signedversion;
+
+ char signature[0];
+} br_signature_t;
+
+#endif
diff --git a/xlators/features/bit-rot/src/stub/bit-rot-stub-helpers.c b/xlators/features/bit-rot/src/stub/bit-rot-stub-helpers.c
new file mode 100644
index 00000000000..8ac13a09941
--- /dev/null
+++ b/xlators/features/bit-rot/src/stub/bit-rot-stub-helpers.c
@@ -0,0 +1,796 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "bit-rot-stub.h"
+
+br_stub_fd_t *
+br_stub_fd_new(void)
+{
+ br_stub_fd_t *br_stub_fd = NULL;
+
+ br_stub_fd = GF_CALLOC(1, sizeof(*br_stub_fd), gf_br_stub_mt_br_stub_fd_t);
+
+ return br_stub_fd;
+}
+
+int
+__br_stub_fd_ctx_set(xlator_t *this, fd_t *fd, br_stub_fd_t *br_stub_fd)
+{
+ uint64_t value = 0;
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO("bit-rot-stub", this, out);
+ GF_VALIDATE_OR_GOTO(this->name, fd, out);
+ GF_VALIDATE_OR_GOTO(this->name, br_stub_fd, out);
+
+ value = (uint64_t)(long)br_stub_fd;
+
+ ret = __fd_ctx_set(fd, this, value);
+
+out:
+ return ret;
+}
+
+br_stub_fd_t *
+__br_stub_fd_ctx_get(xlator_t *this, fd_t *fd)
+{
+ br_stub_fd_t *br_stub_fd = NULL;
+ uint64_t value = 0;
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO("bit-rot-stub", this, out);
+ GF_VALIDATE_OR_GOTO(this->name, fd, out);
+
+ ret = __fd_ctx_get(fd, this, &value);
+ if (ret)
+ return NULL;
+
+ br_stub_fd = (br_stub_fd_t *)((long)value);
+
+out:
+ return br_stub_fd;
+}
+
+br_stub_fd_t *
+br_stub_fd_ctx_get(xlator_t *this, fd_t *fd)
+{
+ br_stub_fd_t *br_stub_fd = NULL;
+
+ GF_VALIDATE_OR_GOTO("bit-rot-stub", this, out);
+ GF_VALIDATE_OR_GOTO(this->name, fd, out);
+
+ LOCK(&fd->lock);
+ {
+ br_stub_fd = __br_stub_fd_ctx_get(this, fd);
+ }
+ UNLOCK(&fd->lock);
+
+out:
+ return br_stub_fd;
+}
+
+int32_t
+br_stub_fd_ctx_set(xlator_t *this, fd_t *fd, br_stub_fd_t *br_stub_fd)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO("bit-rot-stub", this, out);
+ GF_VALIDATE_OR_GOTO(this->name, fd, out);
+ GF_VALIDATE_OR_GOTO(this->name, br_stub_fd, out);
+
+ LOCK(&fd->lock);
+ {
+ ret = __br_stub_fd_ctx_set(this, fd, br_stub_fd);
+ }
+ UNLOCK(&fd->lock);
+
+out:
+ return ret;
+}
+
+/**
+ * Adds an entry to the bad objects directory.
+ * @gfid: gfid of the bad object being added to the bad objects directory
+ */
+int
+br_stub_add(xlator_t *this, uuid_t gfid)
+{
+ char gfid_path[BR_PATH_MAX_PLUS] = {0};
+ char bad_gfid_path[BR_PATH_MAX_PLUS] = {0};
+ int ret = 0;
+ br_stub_private_t *priv = NULL;
+ struct stat st = {0};
+
+ priv = this->private;
+ GF_ASSERT_AND_GOTO_WITH_ERROR(this->name, !gf_uuid_is_null(gfid), out,
+ errno, EINVAL);
+
+ snprintf(gfid_path, sizeof(gfid_path), "%s/%s", priv->stub_basepath,
+ uuid_utoa(gfid));
+
+ ret = sys_stat(gfid_path, &st);
+ if (!ret)
+ goto out;
+ snprintf(bad_gfid_path, sizeof(bad_gfid_path), "%s/stub-%s",
+ priv->stub_basepath, uuid_utoa(priv->bad_object_dir_gfid));
+
+ ret = sys_link(bad_gfid_path, gfid_path);
+ if (ret) {
+ if ((errno != ENOENT) && (errno != EMLINK) && (errno != EEXIST))
+ goto out;
+
+ /*
+ * Continue with success. At least we'll have half of the
+ * functionality, in the sense, object is marked bad and
+ * would be inaccessible. It's only scrub status that would
+ * show up less number of objects. That's fine as we'll have
+ * the log files that will have the missing information.
+ */
+ gf_smsg(this->name, GF_LOG_WARNING, errno, BRS_MSG_LINK_FAIL, "gfid=%s",
+ uuid_utoa(gfid), NULL);
+ }
+
+ return 0;
+out:
+ return -1;
+}
+
+int
+br_stub_del(xlator_t *this, uuid_t gfid)
+{
+ int32_t op_errno __attribute__((unused)) = 0;
+ br_stub_private_t *priv = NULL;
+ int ret = 0;
+ char gfid_path[BR_PATH_MAX_PLUS] = {0};
+
+ priv = this->private;
+ GF_ASSERT_AND_GOTO_WITH_ERROR(this->name, !gf_uuid_is_null(gfid), out,
+ op_errno, EINVAL);
+ snprintf(gfid_path, sizeof(gfid_path), "%s/%s", priv->stub_basepath,
+ uuid_utoa(gfid));
+ ret = sys_unlink(gfid_path);
+ if (ret && (errno != ENOENT)) {
+ gf_smsg(this->name, GF_LOG_ERROR, errno, BRS_MSG_BAD_OBJ_UNLINK_FAIL,
+ "path=%s", gfid_path, NULL);
+ ret = -errno;
+ goto out;
+ }
+
+ ret = 0;
+
+out:
+ return ret;
+}
+
+static int
+br_stub_check_stub_directory(xlator_t *this, char *fullpath)
+{
+ int ret = 0;
+ struct stat st = {
+ 0,
+ };
+ char oldpath[BR_PATH_MAX_PLUS] = {0};
+ br_stub_private_t *priv = NULL;
+
+ priv = this->private;
+
+ snprintf(oldpath, sizeof(oldpath), "%s/%s", priv->export,
+ OLD_BR_STUB_QUARANTINE_DIR);
+
+ ret = sys_stat(fullpath, &st);
+ if (!ret && !S_ISDIR(st.st_mode))
+ goto error_return;
+ if (ret) {
+ if (errno != ENOENT)
+ goto error_return;
+ ret = sys_stat(oldpath, &st);
+ if (ret)
+ ret = mkdir_p(fullpath, 0600, _gf_true);
+ else
+ ret = sys_rename(oldpath, fullpath);
+ }
+
+ if (ret)
+ gf_smsg(this->name, GF_LOG_ERROR, errno, BRS_MSG_BAD_OBJECT_DIR_FAIL,
+ "create-path=%s", fullpath, NULL);
+ return ret;
+
+error_return:
+ gf_smsg(this->name, GF_LOG_ERROR, errno, BRS_MSG_BAD_OBJECT_DIR_FAIL,
+ "verify-path=%s", fullpath, NULL);
+ return -1;
+}
+
+/**
+ * Function to create the container for the bad objects within the bad objects
+ * directory.
+ */
+static int
+br_stub_check_stub_file(xlator_t *this, char *path)
+{
+ int ret = 0;
+ int fd = -1;
+ struct stat st = {
+ 0,
+ };
+
+ ret = sys_stat(path, &st);
+ if (!ret && !S_ISREG(st.st_mode))
+ goto error_return;
+ if (ret) {
+ if (errno != ENOENT)
+ goto error_return;
+ fd = sys_creat(path, 0);
+ if (fd < 0)
+ gf_smsg(this->name, GF_LOG_ERROR, errno,
+ BRS_MSG_BAD_OBJECT_DIR_FAIL, "create-path=%s", path, NULL);
+ }
+
+ if (fd >= 0) {
+ sys_close(fd);
+ ret = 0;
+ }
+
+ return ret;
+
+error_return:
+ gf_smsg(this->name, GF_LOG_ERROR, errno, BRS_MSG_BAD_OBJECT_DIR_FAIL,
+ "verify-path=%s", path, NULL);
+ return -1;
+}
+
+int
+br_stub_dir_create(xlator_t *this, br_stub_private_t *priv)
+{
+ int ret = -1;
+ char fullpath[BR_PATH_MAX_PLUS] = {
+ 0,
+ };
+ char stub_gfid_path[BR_PATH_MAX_PLUS] = {
+ 0,
+ };
+
+ gf_uuid_copy(priv->bad_object_dir_gfid, BR_BAD_OBJ_CONTAINER);
+
+ if (snprintf(fullpath, sizeof(fullpath), "%s", priv->stub_basepath) >=
+ sizeof(fullpath))
+ goto out;
+
+ if (snprintf(stub_gfid_path, sizeof(stub_gfid_path), "%s/stub-%s",
+ priv->stub_basepath, uuid_utoa(priv->bad_object_dir_gfid)) >=
+ sizeof(stub_gfid_path))
+ goto out;
+
+ ret = br_stub_check_stub_directory(this, fullpath);
+ if (ret)
+ goto out;
+ ret = br_stub_check_stub_file(this, stub_gfid_path);
+ if (ret)
+ goto out;
+
+ return 0;
+
+out:
+ return -1;
+}
+
+call_stub_t *
+__br_stub_dequeue(struct list_head *callstubs)
+{
+ call_stub_t *stub = NULL;
+
+ if (!list_empty(callstubs)) {
+ stub = list_entry(callstubs->next, call_stub_t, list);
+ list_del_init(&stub->list);
+ }
+
+ return stub;
+}
+
+void
+__br_stub_enqueue(struct list_head *callstubs, call_stub_t *stub)
+{
+ list_add_tail(&stub->list, callstubs);
+}
+
+void
+br_stub_worker_enqueue(xlator_t *this, call_stub_t *stub)
+{
+ br_stub_private_t *priv = NULL;
+
+ priv = this->private;
+ pthread_mutex_lock(&priv->container.bad_lock);
+ {
+ __br_stub_enqueue(&priv->container.bad_queue, stub);
+ pthread_cond_signal(&priv->container.bad_cond);
+ }
+ pthread_mutex_unlock(&priv->container.bad_lock);
+}
+
+void *
+br_stub_worker(void *data)
+{
+ br_stub_private_t *priv = NULL;
+ xlator_t *this = NULL;
+ call_stub_t *stub = NULL;
+
+ THIS = data;
+ this = data;
+ priv = this->private;
+
+ for (;;) {
+ pthread_mutex_lock(&priv->container.bad_lock);
+ {
+ while (list_empty(&priv->container.bad_queue)) {
+ (void)pthread_cond_wait(&priv->container.bad_cond,
+ &priv->container.bad_lock);
+ }
+
+ stub = __br_stub_dequeue(&priv->container.bad_queue);
+ }
+ pthread_mutex_unlock(&priv->container.bad_lock);
+
+ if (stub) /* guard against spurious wakeups */
+ call_resume(stub);
+ }
+
+ return NULL;
+}
+
+int32_t
+br_stub_lookup_wrapper(call_frame_t *frame, xlator_t *this, loc_t *loc,
+ dict_t *xattr_req)
+{
+ br_stub_private_t *priv = NULL;
+ struct stat lstatbuf = {0};
+ int ret = 0;
+ int32_t op_errno = EINVAL;
+ int32_t op_ret = -1;
+ struct iatt stbuf = {
+ 0,
+ };
+ struct iatt postparent = {
+ 0,
+ };
+ dict_t *xattr = NULL;
+ gf_boolean_t ver_enabled = _gf_false;
+
+ BR_STUB_VER_ENABLED_IN_CALLPATH(frame, ver_enabled);
+ priv = this->private;
+ BR_STUB_VER_COND_GOTO(priv, (!ver_enabled), done);
+
+ VALIDATE_OR_GOTO(loc, done);
+ if (gf_uuid_compare(loc->gfid, priv->bad_object_dir_gfid))
+ goto done;
+
+ ret = sys_lstat(priv->stub_basepath, &lstatbuf);
+ if (ret) {
+ gf_msg_debug(this->name, errno,
+ "Stat failed on stub bad "
+ "object dir");
+ op_errno = errno;
+ goto done;
+ } else if (!S_ISDIR(lstatbuf.st_mode)) {
+ gf_msg_debug(this->name, errno,
+ "bad object container is not "
+ "a directory");
+ op_errno = ENOTDIR;
+ goto done;
+ }
+
+ iatt_from_stat(&stbuf, &lstatbuf);
+ gf_uuid_copy(stbuf.ia_gfid, priv->bad_object_dir_gfid);
+
+ op_ret = op_errno = 0;
+ xattr = dict_new();
+ if (!xattr) {
+ op_ret = -1;
+ op_errno = ENOMEM;
+ }
+
+done:
+ STACK_UNWIND_STRICT(lookup, frame, op_ret, op_errno, loc->inode, &stbuf,
+ xattr, &postparent);
+ if (xattr)
+ dict_unref(xattr);
+ return 0;
+}
+
+static int
+is_bad_gfid_file_current(char *filename, uuid_t gfid)
+{
+ char current_stub_gfid[GF_UUID_BUF_SIZE + 16] = {
+ 0,
+ };
+
+ snprintf(current_stub_gfid, sizeof current_stub_gfid, "stub-%s",
+ uuid_utoa(gfid));
+ return (!strcmp(filename, current_stub_gfid));
+}
+
+static void
+check_delete_stale_bad_file(xlator_t *this, char *filename)
+{
+ int ret = 0;
+ struct stat st = {0};
+ char filepath[BR_PATH_MAX_PLUS] = {0};
+ br_stub_private_t *priv = NULL;
+
+ priv = this->private;
+
+ if (is_bad_gfid_file_current(filename, priv->bad_object_dir_gfid))
+ return;
+
+ snprintf(filepath, sizeof(filepath), "%s/%s", priv->stub_basepath,
+ filename);
+
+ ret = sys_stat(filepath, &st);
+ if (!ret && st.st_nlink == 1)
+ sys_unlink(filepath);
+}
+
+static int
+br_stub_fill_readdir(fd_t *fd, br_stub_fd_t *fctx, DIR *dir, off_t off,
+ size_t size, gf_dirent_t *entries)
+{
+ off_t in_case = -1;
+ off_t last_off = 0;
+ size_t filled = 0;
+ int count = 0;
+ int32_t this_size = -1;
+ gf_dirent_t *this_entry = NULL;
+ xlator_t *this = NULL;
+ struct dirent *entry = NULL;
+ struct dirent scratch[2] = {
+ {
+ 0,
+ },
+ };
+
+ this = THIS;
+ if (!off) {
+ rewinddir(dir);
+ } else {
+ seekdir(dir, off);
+#ifndef GF_LINUX_HOST_OS
+ if ((u_long)telldir(dir) != off && off != fctx->bad_object.dir_eof) {
+ gf_smsg(THIS->name, GF_LOG_ERROR, 0,
+ BRS_MSG_BAD_OBJECT_DIR_SEEK_FAIL, "off=(0x%llx)", off,
+ "dir=%p", dir, NULL);
+ errno = EINVAL;
+ count = -1;
+ goto out;
+ }
+#endif /* GF_LINUX_HOST_OS */
+ }
+
+ while (filled <= size) {
+ in_case = (u_long)telldir(dir);
+
+ if (in_case == -1) {
+ gf_smsg(THIS->name, GF_LOG_ERROR, 0,
+ BRS_MSG_BAD_OBJECT_DIR_TELL_FAIL, "dir=%p", dir, "err=%s",
+ strerror(errno), NULL);
+ goto out;
+ }
+
+ errno = 0;
+ entry = sys_readdir(dir, scratch);
+ if (!entry || errno != 0) {
+ if (errno == EBADF) {
+ gf_smsg(THIS->name, GF_LOG_WARNING, 0,
+ BRS_MSG_BAD_OBJECT_DIR_READ_FAIL, "dir=%p", dir,
+ "err=%s", strerror(errno), NULL);
+ goto out;
+ }
+ break;
+ }
+
+ if (!strcmp(entry->d_name, ".") || !strcmp(entry->d_name, ".."))
+ continue;
+
+ if (!strncmp(entry->d_name, "stub-", strlen("stub-"))) {
+ check_delete_stale_bad_file(this, entry->d_name);
+ continue;
+ }
+
+ this_size = max(sizeof(gf_dirent_t), sizeof(gfs3_dirplist)) +
+ strlen(entry->d_name) + 1;
+
+ if (this_size + filled > size) {
+ seekdir(dir, in_case);
+#ifndef GF_LINUX_HOST_OS
+ if ((u_long)telldir(dir) != in_case &&
+ in_case != fctx->bad_object.dir_eof) {
+ gf_smsg(THIS->name, GF_LOG_ERROR, 0,
+ BRS_MSG_BAD_OBJECT_DIR_SEEK_FAIL, "in_case=(0x%llx)",
+ in_case, "dir=%p", dir, NULL);
+ errno = EINVAL;
+ count = -1;
+ goto out;
+ }
+#endif /* GF_LINUX_HOST_OS */
+ break;
+ }
+
+ this_entry = gf_dirent_for_name(entry->d_name);
+
+ if (!this_entry) {
+ gf_smsg(THIS->name, GF_LOG_ERROR, 0,
+ BRS_MSG_CREATE_GF_DIRENT_FAILED, "entry-name=%s",
+ entry->d_name, "err=%s", strerror(errno), NULL);
+ goto out;
+ }
+ /*
+ * we store the offset of next entry here, which is
+ * probably not intended, but code using syncop_readdir()
+ * (glfs-heal.c, afr-self-heald.c, pump.c) rely on it
+ * for directory read resumption.
+ */
+ last_off = (u_long)telldir(dir);
+ this_entry->d_off = last_off;
+ this_entry->d_ino = entry->d_ino;
+
+ list_add_tail(&this_entry->list, &entries->list);
+
+ filled += this_size;
+ count++;
+ }
+
+ if ((!sys_readdir(dir, scratch) && (errno == 0))) {
+ /* Indicate EOF */
+ errno = ENOENT;
+ /* Remember EOF offset for later detection */
+ fctx->bad_object.dir_eof = last_off;
+ }
+out:
+ return count;
+}
+
+int32_t
+br_stub_readdir_wrapper(call_frame_t *frame, xlator_t *this, fd_t *fd,
+ size_t size, off_t off, dict_t *xdata)
+{
+ br_stub_fd_t *fctx = NULL;
+ DIR *dir = NULL;
+ int ret = -1;
+ int32_t op_ret = -1;
+ int32_t op_errno = 0;
+ int count = 0;
+ gf_dirent_t entries;
+ gf_boolean_t xdata_unref = _gf_false;
+ dict_t *dict = NULL;
+
+ INIT_LIST_HEAD(&entries.list);
+
+ fctx = br_stub_fd_ctx_get(this, fd);
+ if (!fctx) {
+ gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_GET_FD_CONTEXT_FAILED,
+ "fd=%p", fd, NULL);
+ op_errno = -ret;
+ goto done;
+ }
+
+ dir = fctx->bad_object.dir;
+
+ if (!dir) {
+ gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_BAD_HANDLE_DIR_NULL,
+ "fd=%p", fd, NULL);
+ op_errno = EINVAL;
+ goto done;
+ }
+
+ count = br_stub_fill_readdir(fd, fctx, dir, off, size, &entries);
+
+ /* pick ENOENT to indicate EOF */
+ op_errno = errno;
+ op_ret = count;
+
+ dict = xdata;
+ (void)br_stub_bad_objects_path(this, fd, &entries, &dict);
+ if (!xdata && dict) {
+ xdata = dict;
+ xdata_unref = _gf_true;
+ }
+
+done:
+ STACK_UNWIND_STRICT(readdir, frame, op_ret, op_errno, &entries, xdata);
+ gf_dirent_free(&entries);
+ if (xdata_unref)
+ dict_unref(xdata);
+ return 0;
+}
+
+/**
+ * This function is called to mainly obtain the paths of the corrupt
+ * objects (files as of now). Currently scrub status prints only the
+ * gfid of the corrupted files. Reason is, bitrot-stub maintains the
+ * list of the corrupted objects as entries inside the quarantine
+ * directory (<brick export>/.glusterfs/quarantine)
+ *
+ * And the name of each entry in the qurantine directory is the gfid
+ * of the corrupted object. So scrub status will just show that info.
+ * But it helps the users a lot if the actual path to the object is
+ * also reported. Hence the below function to get that information.
+ * The function allocates a new dict to be returned (if it does not
+ * get one from the caller of readdir i.e. scrubber as of now), and
+ * stores the paths of each corrupted gfid there. The gfid is used as
+ * the key and path is used as the value.
+ *
+ * NOTE: The path will be there in following situations
+ * 1) gfid2path option has been enabled (posix xlator option)
+ * and the corrupted file contains the path as an extended
+ * attribute.
+ * 2) If the gfid2path option is not enabled, OR if the xattr
+ * is absent, then the inode table should have it.
+ * The path will be there if a name based lookup has happened
+ * on the file which has been corrupted. With lookup a inode and
+ * dentry would be created in the inode table. And the path is
+ * constructed using the in memory inode and dentry. If a lookup
+ * has not happened OR the inode corresponding to the corrupted
+ * file does not exist in the inode table (because it got purged
+ * as lru limit of the inodes exceeded) OR a nameless lookup had
+ * happened to populate the inode in the inode table, then the
+ * path will not be printed in scrub and only the gfid will be there.
+ **/
+int
+br_stub_bad_objects_path(xlator_t *this, fd_t *fd, gf_dirent_t *entries,
+ dict_t **dict)
+{
+ gf_dirent_t *entry = NULL;
+ inode_t *inode = NULL;
+ char *hpath = NULL;
+ uuid_t gfid = {0};
+ int ret = -1;
+ dict_t *tmp_dict = NULL;
+ char str_gfid[64] = {0};
+
+ if (list_empty(&entries->list))
+ return 0;
+
+ tmp_dict = *dict;
+
+ if (!tmp_dict) {
+ tmp_dict = dict_new();
+ /*
+ * If the allocation of dict fails then no need treat it
+ * it as a error. This path (or function) is executed when
+ * "gluster volume bitrot <volume name> scrub status" is
+ * executed, to get the list of the corrupted objects.
+ * And the motive of this function is to get the paths of
+ * the corrupted objects. If the dict allocation fails, then
+ * the scrub status will only show the gfids of those corrupted
+ * objects (which is the behavior as of the time of this patch
+ * being worked upon). So just return and only the gfids will
+ * be shown.
+ */
+ if (!tmp_dict) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_ALLOC_FAILED, NULL);
+ goto out;
+ }
+ }
+
+ list_for_each_entry(entry, &entries->list, list)
+ {
+ gf_uuid_clear(gfid);
+ gf_uuid_parse(entry->d_name, gfid);
+
+ inode = inode_find(fd->inode->table, gfid);
+
+ /* No need to check the return value here.
+ * Because @hpath is examined.
+ */
+ (void)br_stub_get_path_of_gfid(this, fd->inode, inode, gfid, &hpath);
+
+ if (hpath) {
+ gf_msg_debug(this->name, 0,
+ "path of the corrupted "
+ "object (gfid: %s) is %s",
+ uuid_utoa(gfid), hpath);
+ br_stub_entry_xattr_fill(this, hpath, entry, tmp_dict);
+ } else
+ gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_PATH_GET_FAILED,
+ "gfid=%s", uuid_utoa_r(gfid, str_gfid), NULL);
+
+ inode = NULL;
+ hpath = NULL;
+ }
+
+ ret = 0;
+ *dict = tmp_dict;
+
+out:
+ return ret;
+}
+
+int
+br_stub_get_path_of_gfid(xlator_t *this, inode_t *parent, inode_t *inode,
+ uuid_t gfid, char **path)
+{
+ int32_t ret = -1;
+ char gfid_str[64] = {0};
+
+ GF_VALIDATE_OR_GOTO("bitrot-stub", this, out);
+ GF_VALIDATE_OR_GOTO(this->name, parent, out);
+ GF_VALIDATE_OR_GOTO(this->name, path, out);
+
+ /* Above, No need to validate the @inode for hard resolution. Because
+ * inode can be NULL and if it is NULL, then syncop_gfid_to_path_hard
+ * will allocate a new inode and proceed. So no need to bother about
+ * @inode. Because we need it only to send a syncop_getxattr call
+ * from inside syncop_gfid_to_path_hard. And getxattr fetches the
+ * path from the backend.
+ */
+
+ ret = syncop_gfid_to_path_hard(parent->table, FIRST_CHILD(this), gfid,
+ inode, path, _gf_true);
+ if (ret < 0)
+ gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_PATH_GET_FAILED,
+ "gfid=%s", uuid_utoa_r(gfid, gfid_str), NULL);
+
+ /*
+ * Try with soft resolution of path if hard resolve fails. Because
+ * checking the xattr on disk to get the path of a inode (or gfid)
+ * is dependent on whether that option is enabled in the posix
+ * xlator or not. If it is not enabled, then hard resolution by
+ * checking the on disk xattr fails.
+ *
+ * Thus in such situations fall back to the soft resolution which
+ * mainly depends on the inode_path() function. And for using
+ * inode_path, @inode has to be linked i.e. a successful lookup should
+ * have happened on the gfid (or the path) to link the inode to the
+ * inode table. And if @inode is NULL, means, the inode has not been
+ * found in the inode table and better not to do inode_path() on the
+ * inode which has not been linked.
+ */
+ if (ret < 0 && inode) {
+ ret = syncop_gfid_to_path_hard(parent->table, FIRST_CHILD(this), gfid,
+ inode, path, _gf_false);
+ if (ret < 0)
+ gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_PATH_GET_FAILED,
+ "from-memory gfid=%s", uuid_utoa_r(gfid, gfid_str), NULL);
+ }
+
+out:
+ return ret;
+}
+
+/**
+ * NOTE: If the file has multiple hardlinks (in gluster volume
+ * namespace), the path would be one of the hardlinks. Its up to
+ * the user to find the remaining hardlinks (using find -samefile)
+ * and remove them.
+ **/
+void
+br_stub_entry_xattr_fill(xlator_t *this, char *hpath, gf_dirent_t *entry,
+ dict_t *dict)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO("bit-rot-stub", this, out);
+ GF_VALIDATE_OR_GOTO(this->name, hpath, out);
+
+ /*
+ * Use the entry->d_name (which is nothing but the gfid of the
+ * corrupted object) as the key. And the value will be the actual
+ * path of that object (or file).
+ *
+ * ALso ignore the dict_set errors. scrubber will get the gfid of
+ * the corrupted object for sure. So, for now lets just log the
+ * dict_set_dynstr failure and move on.
+ */
+
+ ret = dict_set_dynstr(dict, entry->d_name, hpath);
+ if (ret)
+ gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_DICT_SET_FAILED,
+ "path=%s", hpath, "object-name=%s", entry->d_name, NULL);
+out:
+ return;
+}
diff --git a/xlators/features/bit-rot/src/stub/bit-rot-stub-mem-types.h b/xlators/features/bit-rot/src/stub/bit-rot-stub-mem-types.h
new file mode 100644
index 00000000000..9d93caf069f
--- /dev/null
+++ b/xlators/features/bit-rot/src/stub/bit-rot-stub-mem-types.h
@@ -0,0 +1,36 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _BR_MEM_TYPES_H
+#define _BR_MEM_TYPES_H
+
+#include <glusterfs/mem-types.h>
+
+enum br_mem_types {
+ gf_br_stub_mt_private_t = gf_common_mt_end + 1,
+ gf_br_stub_mt_version_t,
+ gf_br_stub_mt_inode_ctx_t,
+ gf_br_stub_mt_signature_t,
+ gf_br_mt_br_private_t,
+ gf_br_mt_br_child_t,
+ gf_br_mt_br_object_t,
+ gf_br_mt_br_ob_n_wk_t,
+ gf_br_mt_br_scrubber_t,
+ gf_br_mt_br_fsscan_entry_t,
+ gf_br_stub_mt_br_stub_fd_t,
+ gf_br_stub_mt_br_scanner_freq_t,
+ gf_br_stub_mt_sigstub_t,
+ gf_br_mt_br_child_event_t,
+ gf_br_stub_mt_misc,
+ gf_br_mt_br_worker_t,
+ gf_br_stub_mt_end,
+};
+
+#endif
diff --git a/xlators/features/bit-rot/src/stub/bit-rot-stub-messages.h b/xlators/features/bit-rot/src/stub/bit-rot-stub-messages.h
new file mode 100644
index 00000000000..6c15a166f18
--- /dev/null
+++ b/xlators/features/bit-rot/src/stub/bit-rot-stub-messages.h
@@ -0,0 +1,117 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+ */
+
+#ifndef _BITROT_STUB_MESSAGES_H_
+#define _BITROT_STUB_MESSAGES_H_
+
+#include <glusterfs/glfs-message-id.h>
+
+/* To add new message IDs, append new identifiers at the end of the list.
+ *
+ * Never remove a message ID. If it's not used anymore, you can rename it or
+ * leave it as it is, but not delete it. This is to prevent reutilization of
+ * IDs by other messages.
+ *
+ * The component name must match one of the entries defined in
+ * glfs-message-id.h.
+ */
+
+GLFS_MSGID(BITROT_STUB, BRS_MSG_NO_MEMORY, BRS_MSG_SET_EVENT_FAILED,
+ BRS_MSG_MEM_ACNT_FAILED, BRS_MSG_CREATE_FRAME_FAILED,
+ BRS_MSG_SET_CONTEXT_FAILED, BRS_MSG_CHANGE_VERSION_FAILED,
+ BRS_MSG_ADD_FD_TO_LIST_FAILED, BRS_MSG_SET_FD_CONTEXT_FAILED,
+ BRS_MSG_CREATE_ANONYMOUS_FD_FAILED, BRS_MSG_NO_CHILD,
+ BRS_MSG_STUB_ALLOC_FAILED, BRS_MSG_GET_INODE_CONTEXT_FAILED,
+ BRS_MSG_CANCEL_SIGN_THREAD_FAILED, BRS_MSG_ADD_FD_TO_INODE,
+ BRS_MSG_SIGN_VERSION_ERROR, BRS_MSG_BAD_OBJ_MARK_FAIL,
+ BRS_MSG_NON_SCRUB_BAD_OBJ_MARK, BRS_MSG_REMOVE_INTERNAL_XATTR,
+ BRS_MSG_SET_INTERNAL_XATTR, BRS_MSG_BAD_OBJECT_ACCESS,
+ BRS_MSG_BAD_CONTAINER_FAIL, BRS_MSG_BAD_OBJECT_DIR_FAIL,
+ BRS_MSG_BAD_OBJECT_DIR_SEEK_FAIL, BRS_MSG_BAD_OBJECT_DIR_TELL_FAIL,
+ BRS_MSG_BAD_OBJECT_DIR_READ_FAIL, BRS_MSG_GET_FD_CONTEXT_FAILED,
+ BRS_MSG_BAD_HANDLE_DIR_NULL, BRS_MSG_BAD_OBJ_THREAD_FAIL,
+ BRS_MSG_BAD_OBJ_DIR_CLOSE_FAIL, BRS_MSG_LINK_FAIL,
+ BRS_MSG_BAD_OBJ_UNLINK_FAIL, BRS_MSG_DICT_SET_FAILED,
+ BRS_MSG_PATH_GET_FAILED, BRS_MSG_NULL_LOCAL,
+ BRS_MSG_SPAWN_SIGN_THRD_FAILED, BRS_MSG_KILL_SIGN_THREAD,
+ BRS_MSG_NON_BITD_PID, BRS_MSG_SIGN_PREPARE_FAIL,
+ BRS_MSG_USING_DEFAULT_THREAD_SIZE, BRS_MSG_ALLOC_MEM_FAILED,
+ BRS_MSG_DICT_ALLOC_FAILED, BRS_MSG_CREATE_GF_DIRENT_FAILED,
+ BRS_MSG_ALLOC_FAILED, BRS_MSG_PATH_XATTR_GET_FAILED,
+ BRS_MSG_VERSION_PREPARE_FAIL);
+
+#define BRS_MSG_MEM_ACNT_FAILED_STR "Memory accounting init failed"
+#define BRS_MSG_BAD_OBJ_THREAD_FAIL_STR "pthread_init failed"
+#define BRS_MSG_USING_DEFAULT_THREAD_SIZE_STR "Using default thread stack size"
+#define BRS_MSG_NO_CHILD_STR "FATAL: no children"
+#define BRS_MSG_SPAWN_SIGN_THRD_FAILED_STR \
+ "failed to create the new thread for signer"
+#define BRS_MSG_BAD_CONTAINER_FAIL_STR \
+ "failed to launch the thread for storing bad gfids"
+#define BRS_MSG_CANCEL_SIGN_THREAD_FAILED_STR \
+ "Could not cancel sign serializer thread"
+#define BRS_MSG_KILL_SIGN_THREAD_STR "killed the signer thread"
+#define BRS_MSG_GET_INODE_CONTEXT_FAILED_STR \
+ "failed to init the inode context for the inode"
+#define BRS_MSG_ADD_FD_TO_INODE_STR "failed to add fd to the inode"
+#define BRS_MSG_NO_MEMORY_STR "local allocation failed"
+#define BRS_MSG_BAD_OBJECT_ACCESS_STR "bad object accessed. Returning"
+#define BRS_MSG_SIGN_VERSION_ERROR_STR "Signing version exceeds current version"
+#define BRS_MSG_NON_BITD_PID_STR \
+ "PID from where signature request came, does not belong to bit-rot " \
+ "daemon. Unwinding the fop"
+#define BRS_MSG_SIGN_PREPARE_FAIL_STR \
+ "failed to prepare the signature. Unwinding the fop"
+#define BRS_MSG_VERSION_PREPARE_FAIL_STR \
+ "failed to prepare the version. Unwinding the fop"
+#define BRS_MSG_STUB_ALLOC_FAILED_STR "failed to allocate stub fop, Unwinding"
+#define BRS_MSG_BAD_OBJ_MARK_FAIL_STR "failed to mark object as bad"
+#define BRS_MSG_NON_SCRUB_BAD_OBJ_MARK_STR \
+ "bad object marking is not from the scrubber"
+#define BRS_MSG_ALLOC_MEM_FAILED_STR "failed to allocate memory"
+#define BRS_MSG_SET_INTERNAL_XATTR_STR "called on the internal xattr"
+#define BRS_MSG_REMOVE_INTERNAL_XATTR_STR "removexattr called on internal xattr"
+#define BRS_MSG_CREATE_ANONYMOUS_FD_FAILED_STR \
+ "failed to create anonymous fd for the inode"
+#define BRS_MSG_ADD_FD_TO_LIST_FAILED_STR "failed add fd to the list"
+#define BRS_MSG_SET_FD_CONTEXT_FAILED_STR \
+ "failed to set the fd context for the file"
+#define BRS_MSG_NULL_LOCAL_STR "local is NULL"
+#define BRS_MSG_DICT_ALLOC_FAILED_STR \
+ "dict allocation failed: cannot send IPC FOP to changelog"
+#define BRS_MSG_SET_EVENT_FAILED_STR "cannot set release event in dict"
+#define BRS_MSG_CREATE_FRAME_FAILED_STR "create_frame() failure"
+#define BRS_MSG_BAD_OBJ_DIR_CLOSE_FAIL_STR "closedir error"
+#define BRS_MSG_LINK_FAIL_STR "failed to record gfid"
+#define BRS_MSG_BAD_OBJ_UNLINK_FAIL_STR \
+ "failed to delete bad object link from quaratine directory"
+#define BRS_MSG_BAD_OBJECT_DIR_FAIL_STR "failed stub directory"
+#define BRS_MSG_BAD_OBJECT_DIR_SEEK_FAIL_STR \
+ "seekdir failed. Invalid argument (offset reused from another DIR * " \
+ "structure)"
+#define BRS_MSG_BAD_OBJECT_DIR_TELL_FAIL_STR "telldir failed on dir"
+#define BRS_MSG_BAD_OBJECT_DIR_READ_FAIL_STR "readdir failed on dir"
+#define BRS_MSG_CREATE_GF_DIRENT_FAILED_STR "could not create gf_dirent"
+#define BRS_MSG_GET_FD_CONTEXT_FAILED_STR "pfd is NULL"
+#define BRS_MSG_BAD_HANDLE_DIR_NULL_STR "dir if NULL"
+#define BRS_MSG_ALLOC_FAILED_STR \
+ "failed to allocate new dict for saving the paths of the corrupted " \
+ "objects. Scrub status will only display the gfid"
+#define BRS_MSG_PATH_GET_FAILED_STR "failed to get the path"
+#define BRS_MSG_PATH_XATTR_GET_FAILED_STR \
+ "failed to get the path xattr from disk for the gfid. Trying to get path " \
+ "from the memory"
+#define BRS_MSG_DICT_SET_FAILED_STR \
+ "failed to set the actual path as the value in the dict for the " \
+ "corrupted object"
+#define BRS_MSG_SET_CONTEXT_FAILED_STR \
+ "could not set fd context for release callback"
+#define BRS_MSG_CHANGE_VERSION_FAILED_STR "change version failed"
+#endif /* !_BITROT_STUB_MESSAGES_H_ */
diff --git a/xlators/features/bit-rot/src/stub/bit-rot-stub.c b/xlators/features/bit-rot/src/stub/bit-rot-stub.c
new file mode 100644
index 00000000000..447dd47ff41
--- /dev/null
+++ b/xlators/features/bit-rot/src/stub/bit-rot-stub.c
@@ -0,0 +1,3590 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <ctype.h>
+#include <sys/uio.h>
+#include <signal.h>
+
+#include <glusterfs/glusterfs.h>
+#include <glusterfs/logging.h>
+#include "changelog.h"
+#include <glusterfs/compat-errno.h>
+#include <glusterfs/call-stub.h>
+
+#include "bit-rot-stub.h"
+#include "bit-rot-stub-mem-types.h"
+#include "bit-rot-stub-messages.h"
+#include "bit-rot-common.h"
+
+#define BR_STUB_REQUEST_COOKIE 0x1
+
+void
+br_stub_lock_cleaner(void *arg)
+{
+ pthread_mutex_t *clean_mutex = arg;
+
+ pthread_mutex_unlock(clean_mutex);
+ return;
+}
+
+void *
+br_stub_signth(void *);
+
+struct br_stub_signentry {
+ unsigned long v;
+
+ call_stub_t *stub;
+
+ struct list_head list;
+};
+
+int32_t
+mem_acct_init(xlator_t *this)
+{
+ int32_t ret = -1;
+
+ if (!this)
+ return ret;
+
+ ret = xlator_mem_acct_init(this, gf_br_stub_mt_end + 1);
+
+ if (ret != 0) {
+ gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_MEM_ACNT_FAILED, NULL);
+ return ret;
+ }
+
+ return ret;
+}
+
+int
+br_stub_bad_object_container_init(xlator_t *this, br_stub_private_t *priv)
+{
+ pthread_attr_t w_attr;
+ int ret = -1;
+
+ ret = pthread_cond_init(&priv->container.bad_cond, NULL);
+ if (ret != 0) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_THREAD_FAIL,
+ "cond_init ret=%d", ret, NULL);
+ goto out;
+ }
+
+ ret = pthread_mutex_init(&priv->container.bad_lock, NULL);
+ if (ret != 0) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_THREAD_FAIL,
+ "mutex_init ret=%d", ret, NULL);
+ goto cleanup_cond;
+ }
+
+ ret = pthread_attr_init(&w_attr);
+ if (ret != 0) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_THREAD_FAIL,
+ "attr_init ret=%d", ret, NULL);
+ goto cleanup_lock;
+ }
+
+ ret = pthread_attr_setstacksize(&w_attr, BAD_OBJECT_THREAD_STACK_SIZE);
+ if (ret == EINVAL) {
+ gf_smsg(this->name, GF_LOG_WARNING, 0,
+ BRS_MSG_USING_DEFAULT_THREAD_SIZE, NULL);
+ }
+
+ INIT_LIST_HEAD(&priv->container.bad_queue);
+ ret = br_stub_dir_create(this, priv);
+ if (ret < 0)
+ goto cleanup_lock;
+
+ ret = gf_thread_create(&priv->container.thread, &w_attr, br_stub_worker,
+ this, "brswrker");
+ if (ret)
+ goto cleanup_attr;
+
+ return 0;
+
+cleanup_attr:
+ pthread_attr_destroy(&w_attr);
+cleanup_lock:
+ pthread_mutex_destroy(&priv->container.bad_lock);
+cleanup_cond:
+ pthread_cond_destroy(&priv->container.bad_cond);
+out:
+ return -1;
+}
+
+int32_t
+init(xlator_t *this)
+{
+ int ret = 0;
+ char *tmp = NULL;
+ struct timeval tv = {
+ 0,
+ };
+ br_stub_private_t *priv = NULL;
+
+ if (!this->children) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_NO_CHILD, NULL);
+ goto error_return;
+ }
+
+ priv = GF_CALLOC(1, sizeof(*priv), gf_br_stub_mt_private_t);
+ if (!priv)
+ goto error_return;
+
+ priv->local_pool = mem_pool_new(br_stub_local_t, 512);
+ if (!priv->local_pool)
+ goto free_priv;
+
+ GF_OPTION_INIT("bitrot", priv->do_versioning, bool, free_mempool);
+
+ GF_OPTION_INIT("export", tmp, str, free_mempool);
+
+ if (snprintf(priv->export, PATH_MAX, "%s", tmp) >= PATH_MAX)
+ goto free_mempool;
+
+ if (snprintf(priv->stub_basepath, sizeof(priv->stub_basepath), "%s/%s",
+ priv->export,
+ BR_STUB_QUARANTINE_DIR) >= sizeof(priv->stub_basepath))
+ goto free_mempool;
+
+ (void)gettimeofday(&tv, NULL);
+
+ /* boot time is in network endian format */
+ priv->boot[0] = htonl(tv.tv_sec);
+ priv->boot[1] = htonl(tv.tv_usec);
+
+ pthread_mutex_init(&priv->lock, NULL);
+ pthread_cond_init(&priv->cond, NULL);
+ INIT_LIST_HEAD(&priv->squeue);
+
+ /* Thread creations need 'this' to be passed so that THIS can be
+ * assigned inside the thread. So setting this->private here.
+ */
+ this->private = priv;
+ if (!priv->do_versioning)
+ return 0;
+
+ ret = gf_thread_create(&priv->signth, NULL, br_stub_signth, this,
+ "brssign");
+ if (ret != 0) {
+ gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_SPAWN_SIGN_THRD_FAILED,
+ NULL);
+ goto cleanup_lock;
+ }
+
+ ret = br_stub_bad_object_container_init(this, priv);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_CONTAINER_FAIL, NULL);
+ goto cleanup_lock;
+ }
+
+ gf_msg_debug(this->name, 0, "bit-rot stub loaded");
+
+ return 0;
+
+cleanup_lock:
+ pthread_cond_destroy(&priv->cond);
+ pthread_mutex_destroy(&priv->lock);
+free_mempool:
+ mem_pool_destroy(priv->local_pool);
+ priv->local_pool = NULL;
+free_priv:
+ GF_FREE(priv);
+ this->private = NULL;
+error_return:
+ return -1;
+}
+
+/* TODO:
+ * As of now enabling bitrot option does 2 things.
+ * 1) Start the Bitrot Daemon which signs the objects (currently files only)
+ * upon getting notified by the stub.
+ * 2) Enable versioning of the objects. Object versions (again files only) are
+ * incremented upon modification.
+ * So object versioning is tied to bitrot daemon's signing. In future, object
+ * versioning might be necessary for other things as well apart from bit-rot
+ * detection (well that's the objective of bringing in object-versioning :)).
+ * In that case, better to make versioning a new option and letting it to be
+ * enabled despite bit-rot detection is not needed.
+ * Ex: ICAP.
+ */
+int32_t
+reconfigure(xlator_t *this, dict_t *options)
+{
+ int32_t ret = -1;
+ br_stub_private_t *priv = NULL;
+
+ priv = this->private;
+
+ GF_OPTION_RECONF("bitrot", priv->do_versioning, options, bool, err);
+ if (priv->do_versioning && !priv->signth) {
+ ret = gf_thread_create(&priv->signth, NULL, br_stub_signth, this,
+ "brssign");
+ if (ret != 0) {
+ gf_smsg(this->name, GF_LOG_WARNING, 0,
+ BRS_MSG_SPAWN_SIGN_THRD_FAILED, NULL);
+ goto err;
+ }
+
+ ret = br_stub_bad_object_container_init(this, priv);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_CONTAINER_FAIL,
+ NULL);
+ goto err;
+ }
+ } else {
+ if (priv->signth) {
+ if (gf_thread_cleanup_xint(priv->signth)) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ BRS_MSG_CANCEL_SIGN_THREAD_FAILED, NULL);
+ } else {
+ gf_smsg(this->name, GF_LOG_INFO, 0, BRS_MSG_KILL_SIGN_THREAD,
+ NULL);
+ priv->signth = 0;
+ }
+ }
+
+ if (priv->container.thread) {
+ if (gf_thread_cleanup_xint(priv->container.thread)) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ BRS_MSG_CANCEL_SIGN_THREAD_FAILED, NULL);
+ }
+ priv->container.thread = 0;
+ }
+ }
+
+ ret = 0;
+ return ret;
+err:
+ if (priv->signth) {
+ if (gf_thread_cleanup_xint(priv->signth)) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ BRS_MSG_CANCEL_SIGN_THREAD_FAILED, NULL);
+ }
+ priv->signth = 0;
+ }
+
+ if (priv->container.thread) {
+ if (gf_thread_cleanup_xint(priv->container.thread)) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ BRS_MSG_CANCEL_SIGN_THREAD_FAILED, NULL);
+ }
+ priv->container.thread = 0;
+ }
+ ret = -1;
+ return ret;
+}
+
+int
+notify(xlator_t *this, int event, void *data, ...)
+{
+ br_stub_private_t *priv = NULL;
+
+ if (!this)
+ return 0;
+
+ priv = this->private;
+ if (!priv)
+ return 0;
+
+ default_notify(this, event, data);
+ return 0;
+}
+
+void
+fini(xlator_t *this)
+{
+ int32_t ret = 0;
+ br_stub_private_t *priv = this->private;
+ struct br_stub_signentry *sigstub = NULL;
+ call_stub_t *stub = NULL;
+
+ if (!priv)
+ return;
+
+ if (!priv->do_versioning)
+ goto cleanup;
+
+ ret = gf_thread_cleanup_xint(priv->signth);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_CANCEL_SIGN_THREAD_FAILED,
+ NULL);
+ goto out;
+ }
+ priv->signth = 0;
+
+ while (!list_empty(&priv->squeue)) {
+ sigstub = list_first_entry(&priv->squeue, struct br_stub_signentry,
+ list);
+ list_del_init(&sigstub->list);
+
+ call_stub_destroy(sigstub->stub);
+ GF_FREE(sigstub);
+ }
+
+ ret = gf_thread_cleanup_xint(priv->container.thread);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_CANCEL_SIGN_THREAD_FAILED,
+ NULL);
+ goto out;
+ }
+
+ priv->container.thread = 0;
+
+ while (!list_empty(&priv->container.bad_queue)) {
+ stub = list_first_entry(&priv->container.bad_queue, call_stub_t, list);
+ list_del_init(&stub->list);
+ call_stub_destroy(stub);
+ }
+
+ pthread_mutex_destroy(&priv->container.bad_lock);
+ pthread_cond_destroy(&priv->container.bad_cond);
+
+cleanup:
+ pthread_mutex_destroy(&priv->lock);
+ pthread_cond_destroy(&priv->cond);
+
+ if (priv->local_pool) {
+ mem_pool_destroy(priv->local_pool);
+ priv->local_pool = NULL;
+ }
+
+ this->private = NULL;
+ GF_FREE(priv);
+
+out:
+ return;
+}
+
+static int
+br_stub_alloc_versions(br_version_t **obuf, br_signature_t **sbuf,
+ size_t signaturelen)
+{
+ void *mem = NULL;
+ size_t size = 0;
+
+ if (obuf)
+ size += sizeof(br_version_t);
+ if (sbuf)
+ size += sizeof(br_signature_t) + signaturelen;
+
+ mem = GF_CALLOC(1, size, gf_br_stub_mt_version_t);
+ if (!mem)
+ goto error_return;
+
+ if (obuf) {
+ *obuf = (br_version_t *)mem;
+ mem = ((char *)mem + sizeof(br_version_t));
+ }
+ if (sbuf) {
+ *sbuf = (br_signature_t *)mem;
+ }
+
+ return 0;
+
+error_return:
+ return -1;
+}
+
+static void
+br_stub_dealloc_versions(void *mem)
+{
+ GF_FREE(mem);
+}
+
+static br_stub_local_t *
+br_stub_alloc_local(xlator_t *this)
+{
+ br_stub_private_t *priv = this->private;
+
+ return mem_get0(priv->local_pool);
+}
+
+static void
+br_stub_dealloc_local(br_stub_local_t *ptr)
+{
+ if (!ptr)
+ return;
+
+ mem_put(ptr);
+}
+
+static int
+br_stub_prepare_version_request(xlator_t *this, dict_t *dict,
+ br_version_t *obuf, unsigned long oversion)
+{
+ br_stub_private_t *priv = NULL;
+
+ priv = this->private;
+ br_set_ongoingversion(obuf, oversion, priv->boot);
+
+ return dict_set_bin(dict, BITROT_CURRENT_VERSION_KEY, (void *)obuf,
+ sizeof(br_version_t));
+}
+
+static int
+br_stub_prepare_signing_request(dict_t *dict, br_signature_t *sbuf,
+ br_isignature_t *sign, size_t signaturelen)
+{
+ size_t size = 0;
+
+ br_set_signature(sbuf, sign, signaturelen, &size);
+
+ return dict_set_bin(dict, BITROT_SIGNING_VERSION_KEY, (void *)sbuf, size);
+}
+
+/**
+ * initialize an inode context starting with a given ongoing version.
+ * a fresh lookup() or a first creat() call initializes the inode
+ * context, hence the inode is marked dirty. this routine also
+ * initializes the transient inode version.
+ */
+static int
+br_stub_init_inode_versions(xlator_t *this, fd_t *fd, inode_t *inode,
+ unsigned long version, gf_boolean_t markdirty,
+ gf_boolean_t bad_object, uint64_t *ctx_addr)
+{
+ int32_t ret = 0;
+ br_stub_inode_ctx_t *ctx = NULL;
+
+ ctx = GF_CALLOC(1, sizeof(br_stub_inode_ctx_t), gf_br_stub_mt_inode_ctx_t);
+ if (!ctx)
+ goto error_return;
+
+ INIT_LIST_HEAD(&ctx->fd_list);
+ (markdirty) ? __br_stub_mark_inode_dirty(ctx)
+ : __br_stub_mark_inode_synced(ctx);
+ __br_stub_set_ongoing_version(ctx, version);
+
+ if (bad_object)
+ __br_stub_mark_object_bad(ctx);
+
+ if (fd) {
+ ret = br_stub_add_fd_to_inode(this, fd, ctx);
+ if (ret)
+ goto free_ctx;
+ }
+
+ ret = br_stub_set_inode_ctx(this, inode, ctx);
+ if (ret)
+ goto free_ctx;
+
+ if (ctx_addr)
+ *ctx_addr = (uint64_t)(uintptr_t)ctx;
+ return 0;
+
+free_ctx:
+ GF_FREE(ctx);
+error_return:
+ return -1;
+}
+
+/**
+ * modify the ongoing version of an inode.
+ */
+static int
+br_stub_mod_inode_versions(xlator_t *this, fd_t *fd, inode_t *inode,
+ unsigned long version)
+{
+ int32_t ret = -1;
+ br_stub_inode_ctx_t *ctx = 0;
+
+ LOCK(&inode->lock);
+ {
+ ctx = __br_stub_get_ongoing_version_ctx(this, inode, NULL);
+ if (ctx == NULL)
+ goto unblock;
+ if (__br_stub_is_inode_dirty(ctx)) {
+ __br_stub_set_ongoing_version(ctx, version);
+ __br_stub_mark_inode_synced(ctx);
+ }
+
+ ret = 0;
+ }
+unblock:
+ UNLOCK(&inode->lock);
+
+ return ret;
+}
+
+static void
+br_stub_fill_local(br_stub_local_t *local, call_stub_t *stub, fd_t *fd,
+ inode_t *inode, uuid_t gfid, int versioningtype,
+ unsigned long memversion)
+{
+ local->fopstub = stub;
+ local->versioningtype = versioningtype;
+ local->u.context.version = memversion;
+ if (fd)
+ local->u.context.fd = fd_ref(fd);
+ if (inode)
+ local->u.context.inode = inode_ref(inode);
+ gf_uuid_copy(local->u.context.gfid, gfid);
+}
+
+static void
+br_stub_cleanup_local(br_stub_local_t *local)
+{
+ if (!local)
+ return;
+
+ local->fopstub = NULL;
+ local->versioningtype = 0;
+ local->u.context.version = 0;
+ if (local->u.context.fd) {
+ fd_unref(local->u.context.fd);
+ local->u.context.fd = NULL;
+ }
+ if (local->u.context.inode) {
+ inode_unref(local->u.context.inode);
+ local->u.context.inode = NULL;
+ }
+ memset(local->u.context.gfid, '\0', sizeof(uuid_t));
+}
+
+static int
+br_stub_need_versioning(xlator_t *this, fd_t *fd, gf_boolean_t *versioning,
+ gf_boolean_t *modified, br_stub_inode_ctx_t **ctx)
+{
+ int32_t ret = -1;
+ uint64_t ctx_addr = 0;
+ br_stub_inode_ctx_t *c = NULL;
+ unsigned long version = BITROT_DEFAULT_CURRENT_VERSION;
+
+ *versioning = _gf_false;
+ *modified = _gf_false;
+
+ /* Bitrot stub inode context was initialized only in lookup, create
+ * and mknod cbk path. Object versioning was enabled by default
+ * irrespective of bitrot enabled or not. But it's made optional now.
+ * As a consequence there could be cases where getting inode ctx would
+ * fail because it's not set yet.
+ * e.g., If versioning (with bitrot enable) is enabled while I/O is
+ * happening, it could directly get other fops like writev without
+ * lookup, where getting inode ctx would fail. Hence initialize the
+ * inode ctx on failure to get ctx. This is done in all places where
+ * applicable.
+ */
+ ret = br_stub_get_inode_ctx(this, fd->inode, &ctx_addr);
+ if (ret < 0) {
+ ret = br_stub_init_inode_versions(this, fd, fd->inode, version,
+ _gf_true, _gf_false, &ctx_addr);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ BRS_MSG_GET_INODE_CONTEXT_FAILED, "gfid=%s",
+ uuid_utoa(fd->inode->gfid), NULL);
+ goto error_return;
+ }
+ }
+
+ c = (br_stub_inode_ctx_t *)(long)ctx_addr;
+
+ LOCK(&fd->inode->lock);
+ {
+ if (__br_stub_is_inode_dirty(c))
+ *versioning = _gf_true;
+ if (__br_stub_is_inode_modified(c))
+ *modified = _gf_true;
+ }
+ UNLOCK(&fd->inode->lock);
+
+ if (ctx)
+ *ctx = c;
+ return 0;
+
+error_return:
+ return -1;
+}
+
+static int32_t
+br_stub_anon_fd_ctx(xlator_t *this, fd_t *fd, br_stub_inode_ctx_t *ctx)
+{
+ int32_t ret = -1;
+ br_stub_fd_t *br_stub_fd = NULL;
+
+ br_stub_fd = br_stub_fd_ctx_get(this, fd);
+ if (!br_stub_fd) {
+ ret = br_stub_add_fd_to_inode(this, fd, ctx);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_ADD_FD_TO_INODE,
+ "gfid=%s", uuid_utoa(fd->inode->gfid), NULL);
+ goto out;
+ }
+ }
+
+ ret = 0;
+
+out:
+ return ret;
+}
+
+static int
+br_stub_versioning_prep(call_frame_t *frame, xlator_t *this, fd_t *fd,
+ br_stub_inode_ctx_t *ctx)
+{
+ int32_t ret = -1;
+ br_stub_local_t *local = NULL;
+
+ local = br_stub_alloc_local(this);
+ if (!local) {
+ gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, BRS_MSG_NO_MEMORY, "gfid=%s",
+ uuid_utoa(fd->inode->gfid), NULL);
+ goto error_return;
+ }
+
+ if (fd_is_anonymous(fd)) {
+ ret = br_stub_anon_fd_ctx(this, fd, ctx);
+ if (ret)
+ goto free_local;
+ }
+
+ frame->local = local;
+
+ return 0;
+
+free_local:
+ br_stub_dealloc_local(local);
+error_return:
+ return -1;
+}
+
+static int
+br_stub_mark_inode_modified(xlator_t *this, br_stub_local_t *local)
+{
+ fd_t *fd = NULL;
+ int32_t ret = 0;
+ uint64_t ctx_addr = 0;
+ br_stub_inode_ctx_t *ctx = NULL;
+ unsigned long version = BITROT_DEFAULT_CURRENT_VERSION;
+
+ fd = local->u.context.fd;
+
+ ret = br_stub_get_inode_ctx(this, fd->inode, &ctx_addr);
+ if (ret < 0) {
+ ret = br_stub_init_inode_versions(this, fd, fd->inode, version,
+ _gf_true, _gf_false, &ctx_addr);
+ if (ret)
+ goto error_return;
+ }
+
+ ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
+
+ LOCK(&fd->inode->lock);
+ {
+ __br_stub_set_inode_modified(ctx);
+ }
+ UNLOCK(&fd->inode->lock);
+
+ return 0;
+
+error_return:
+ return -1;
+}
+
+/**
+ * The possible return values from br_stub_is_bad_object () are:
+ * 1) 0 => as per the inode context object is not bad
+ * 2) -1 => Failed to get the inode context itself
+ * 3) -2 => As per the inode context object is bad
+ * Both -ve values means the fop which called this function is failed
+ * and error is returned upwards.
+ */
+static int
+br_stub_check_bad_object(xlator_t *this, inode_t *inode, int32_t *op_ret,
+ int32_t *op_errno)
+{
+ int ret = -1;
+ unsigned long version = BITROT_DEFAULT_CURRENT_VERSION;
+
+ ret = br_stub_is_bad_object(this, inode);
+ if (ret == -2) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJECT_ACCESS,
+ "gfid=%s", uuid_utoa(inode->gfid), NULL);
+ *op_ret = -1;
+ *op_errno = EIO;
+ }
+
+ if (ret == -1) {
+ ret = br_stub_init_inode_versions(this, NULL, inode, version, _gf_true,
+ _gf_false, NULL);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ BRS_MSG_GET_INODE_CONTEXT_FAILED, "gfid=%s",
+ uuid_utoa(inode->gfid), NULL);
+ *op_ret = -1;
+ *op_errno = EINVAL;
+ }
+ }
+
+ return ret;
+}
+
+/**
+ * callback for inode/fd versioning
+ */
+int
+br_stub_fd_incversioning_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, dict_t *xdata)
+{
+ fd_t *fd = NULL;
+ inode_t *inode = NULL;
+ unsigned long version = 0;
+ br_stub_local_t *local = NULL;
+
+ local = (br_stub_local_t *)frame->local;
+ if (op_ret < 0)
+ goto done;
+ fd = local->u.context.fd;
+ inode = local->u.context.inode;
+ version = local->u.context.version;
+
+ op_ret = br_stub_mod_inode_versions(this, fd, inode, version);
+ if (op_ret < 0)
+ op_errno = EINVAL;
+
+done:
+ if (op_ret < 0) {
+ frame->local = NULL;
+ call_unwind_error(local->fopstub, -1, op_errno);
+ br_stub_cleanup_local(local);
+ br_stub_dealloc_local(local);
+ } else {
+ call_resume(local->fopstub);
+ }
+ return 0;
+}
+
+/**
+ * Initial object versioning
+ *
+ * Version persists two (2) extended attributes as explained below:
+ * 1. Current (ongoing) version: This is incremented on an writev ()
+ * or truncate () and is the running version for an object.
+ * 2. Signing version: This is the version against which an object
+ * was signed (checksummed).
+ *
+ * During initial versioning, both ongoing and signing versions are
+ * set of one and zero respectively. A write() call increments the
+ * ongoing version as an indication of modification to the object.
+ * Additionally this needs to be persisted on disk and needs to be
+ * durable: fsync().. :-/
+ * As an optimization only the first write() synchronizes the ongoing
+ * version to disk, subsequent write()s before the *last* release()
+ * are no-op's.
+ *
+ * create(), just like lookup() initializes the object versions to
+ * the default. As an optimization this is not a durable operation:
+ * in case of a crash, hard reboot etc.. absence of versioning xattrs
+ * is ignored in scrubber along with the one time crawler explicitly
+ * triggering signing for such objects.
+ *
+ * c.f. br_stub_writev() / br_stub_truncate()
+ */
+
+/**
+ * perform full or incremental versioning on an inode pointd by an
+ * fd. incremental versioning is done when an inode is dirty and a
+ * writeback is triggered.
+ */
+
+int
+br_stub_fd_versioning(xlator_t *this, call_frame_t *frame, call_stub_t *stub,
+ dict_t *dict, fd_t *fd, br_stub_version_cbk *callback,
+ unsigned long memversion, int versioningtype, int durable)
+{
+ int32_t ret = -1;
+ int flags = 0;
+ dict_t *xdata = NULL;
+ br_stub_local_t *local = NULL;
+
+ xdata = dict_new();
+ if (!xdata)
+ goto done;
+
+ ret = dict_set_int32(xdata, GLUSTERFS_INTERNAL_FOP_KEY, 1);
+ if (ret)
+ goto dealloc_xdata;
+
+ if (durable) {
+ ret = dict_set_int32(xdata, GLUSTERFS_DURABLE_OP, 0);
+ if (ret)
+ goto dealloc_xdata;
+ }
+
+ local = frame->local;
+
+ br_stub_fill_local(local, stub, fd, fd->inode, fd->inode->gfid,
+ versioningtype, memversion);
+
+ STACK_WIND(frame, callback, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);
+
+ ret = 0;
+
+dealloc_xdata:
+ dict_unref(xdata);
+done:
+ return ret;
+}
+
+static int
+br_stub_perform_incversioning(xlator_t *this, call_frame_t *frame,
+ call_stub_t *stub, fd_t *fd,
+ br_stub_inode_ctx_t *ctx)
+{
+ int32_t ret = -1;
+ dict_t *dict = NULL;
+ br_version_t *obuf = NULL;
+ unsigned long writeback_version = 0;
+ int op_errno = 0;
+ br_stub_local_t *local = NULL;
+
+ op_errno = EINVAL;
+ local = frame->local;
+
+ writeback_version = __br_stub_writeback_version(ctx);
+
+ op_errno = ENOMEM;
+ dict = dict_new();
+ if (!dict)
+ goto out;
+ ret = br_stub_alloc_versions(&obuf, NULL, 0);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_ALLOC_MEM_FAILED,
+ "gfid=%s", uuid_utoa(fd->inode->gfid), NULL);
+ goto out;
+ }
+ ret = br_stub_prepare_version_request(this, dict, obuf, writeback_version);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_VERSION_PREPARE_FAIL,
+ "gfid=%s", uuid_utoa(fd->inode->gfid), NULL);
+ br_stub_dealloc_versions(obuf);
+ goto out;
+ }
+
+ ret = br_stub_fd_versioning(
+ this, frame, stub, dict, fd, br_stub_fd_incversioning_cbk,
+ writeback_version, BR_STUB_INCREMENTAL_VERSIONING, !WRITEBACK_DURABLE);
+out:
+ if (dict)
+ dict_unref(dict);
+ if (ret) {
+ if (local)
+ frame->local = NULL;
+ call_unwind_error(stub, -1, op_errno);
+ if (local) {
+ br_stub_cleanup_local(local);
+ br_stub_dealloc_local(local);
+ }
+ }
+
+ return ret;
+}
+
+/** {{{ */
+
+/* fsetxattr() */
+
+int32_t
+br_stub_perform_objsign(call_frame_t *frame, xlator_t *this, fd_t *fd,
+ dict_t *dict, int flags, dict_t *xdata)
+{
+ STACK_WIND(frame, default_fsetxattr_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);
+
+ dict_unref(xdata);
+ return 0;
+}
+
+void *
+br_stub_signth(void *arg)
+{
+ xlator_t *this = arg;
+ br_stub_private_t *priv = this->private;
+ struct br_stub_signentry *sigstub = NULL;
+
+ THIS = this;
+ while (1) {
+ /*
+ * Disabling bit-rot feature leads to this particular thread
+ * getting cleaned up by reconfigure via a call to the function
+ * gf_thread_cleanup_xint (which in turn calls pthread_cancel
+ * and pthread_join). But, if this thread had held the mutex
+ * &priv->lock at the time of cancellation, then it leads to
+ * deadlock in future when bit-rot feature is enabled (which
+ * again spawns this thread which cant hold the lock as the
+ * mutex is still held by the previous instance of the thread
+ * which got killed). Also, the br_stub_handle_object_signature
+ * function which is called whenever file has to be signed
+ * also gets blocked as it too attempts to acquire &priv->lock.
+ *
+ * So, arrange for the lock to be unlocked as part of the
+ * cleanup of this thread using pthread_cleanup_push and
+ * pthread_cleanup_pop.
+ */
+ pthread_cleanup_push(br_stub_lock_cleaner, &priv->lock);
+ pthread_mutex_lock(&priv->lock);
+ {
+ while (list_empty(&priv->squeue))
+ pthread_cond_wait(&priv->cond, &priv->lock);
+
+ sigstub = list_first_entry(&priv->squeue, struct br_stub_signentry,
+ list);
+ list_del_init(&sigstub->list);
+ }
+ pthread_mutex_unlock(&priv->lock);
+ pthread_cleanup_pop(0);
+
+ call_resume(sigstub->stub);
+
+ GF_FREE(sigstub);
+ }
+
+ return NULL;
+}
+
+static gf_boolean_t
+br_stub_internal_xattr(dict_t *dict)
+{
+ if (dict_get(dict, GLUSTERFS_SET_OBJECT_SIGNATURE) ||
+ dict_get(dict, GLUSTERFS_GET_OBJECT_SIGNATURE) ||
+ dict_get(dict, BR_REOPEN_SIGN_HINT_KEY) ||
+ dict_get(dict, BITROT_OBJECT_BAD_KEY) ||
+ dict_get(dict, BITROT_SIGNING_VERSION_KEY) ||
+ dict_get(dict, BITROT_CURRENT_VERSION_KEY))
+ return _gf_true;
+
+ return _gf_false;
+}
+
+int
+orderq(struct list_head *elem1, struct list_head *elem2)
+{
+ struct br_stub_signentry *s1 = NULL;
+ struct br_stub_signentry *s2 = NULL;
+
+ s1 = list_entry(elem1, struct br_stub_signentry, list);
+ s2 = list_entry(elem2, struct br_stub_signentry, list);
+
+ return (s1->v > s2->v);
+}
+
+static int
+br_stub_compare_sign_version(xlator_t *this, inode_t *inode,
+ br_signature_t *sbuf, dict_t *dict,
+ int *fakesuccess)
+{
+ int32_t ret = -1;
+ uint64_t tmp_ctx = 0;
+ gf_boolean_t invalid = _gf_false;
+ br_stub_inode_ctx_t *ctx = NULL;
+
+ GF_VALIDATE_OR_GOTO("bit-rot-stub", this, out);
+ GF_VALIDATE_OR_GOTO(this->name, inode, out);
+ GF_VALIDATE_OR_GOTO(this->name, sbuf, out);
+ GF_VALIDATE_OR_GOTO(this->name, dict, out);
+
+ ret = br_stub_get_inode_ctx(this, inode, &tmp_ctx);
+ if (ret) {
+ dict_del(dict, BITROT_SIGNING_VERSION_KEY);
+ goto out;
+ }
+
+ ctx = (br_stub_inode_ctx_t *)(long)tmp_ctx;
+
+ LOCK(&inode->lock);
+ {
+ if (ctx->currentversion < sbuf->signedversion) {
+ invalid = _gf_true;
+ } else if (ctx->currentversion > sbuf->signedversion) {
+ gf_msg_debug(this->name, 0,
+ "\"Signing version\" "
+ "(%lu) lower than \"Current version \" "
+ "(%lu)",
+ ctx->currentversion, sbuf->signedversion);
+ *fakesuccess = 1;
+ }
+ }
+ UNLOCK(&inode->lock);
+
+ if (invalid) {
+ ret = -1;
+ gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_SIGN_VERSION_ERROR,
+ "Signing-ver=%lu", sbuf->signedversion, "current-ver=%lu",
+ ctx->currentversion, NULL);
+ }
+
+out:
+ return ret;
+}
+
+static int
+br_stub_prepare_signature(xlator_t *this, dict_t *dict, inode_t *inode,
+ br_isignature_t *sign, int *fakesuccess)
+{
+ int32_t ret = -1;
+ size_t signaturelen = 0;
+ br_signature_t *sbuf = NULL;
+
+ if (!br_is_signature_type_valid(sign->signaturetype))
+ goto out;
+
+ signaturelen = sign->signaturelen;
+ ret = br_stub_alloc_versions(NULL, &sbuf, signaturelen);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_ALLOC_MEM_FAILED,
+ "gfid=%s", uuid_utoa(inode->gfid), NULL);
+ ret = -1;
+ goto out;
+ }
+ ret = br_stub_prepare_signing_request(dict, sbuf, sign, signaturelen);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_SIGN_PREPARE_FAIL,
+ "gfid=%s", uuid_utoa(inode->gfid), NULL);
+ ret = -1;
+ br_stub_dealloc_versions(sbuf);
+ goto out;
+ }
+
+ /* At this point sbuf has been added to dict, so the memory will be freed
+ * when the data from the dict is destroyed
+ */
+ ret = br_stub_compare_sign_version(this, inode, sbuf, dict, fakesuccess);
+out:
+ return ret;
+}
+
+static void
+br_stub_handle_object_signature(call_frame_t *frame, xlator_t *this, fd_t *fd,
+ dict_t *dict, br_isignature_t *sign,
+ dict_t *xdata)
+{
+ int32_t ret = -1;
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+ int fakesuccess = 0;
+ br_stub_private_t *priv = NULL;
+ struct br_stub_signentry *sigstub = NULL;
+
+ priv = this->private;
+
+ if (frame->root->pid != GF_CLIENT_PID_BITD) {
+ gf_smsg(this->name, GF_LOG_WARNING, op_errno, BRS_MSG_NON_BITD_PID,
+ "PID=%d", frame->root->pid, NULL);
+ goto dofop;
+ }
+
+ ret = br_stub_prepare_signature(this, dict, fd->inode, sign, &fakesuccess);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_SIGN_PREPARE_FAIL,
+ "gfid=%s", uuid_utoa(fd->inode->gfid), NULL);
+ goto dofop;
+ }
+ if (fakesuccess) {
+ op_ret = op_errno = 0;
+ goto dofop;
+ }
+
+ dict_del(dict, GLUSTERFS_SET_OBJECT_SIGNATURE);
+
+ ret = -1;
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata)
+ goto dofop;
+ } else {
+ dict_ref(xdata);
+ }
+
+ ret = dict_set_int32(xdata, GLUSTERFS_DURABLE_OP, 0);
+ if (ret)
+ goto unref_dict;
+
+ /* prepare dispatch stub to order object signing */
+ sigstub = GF_CALLOC(1, sizeof(*sigstub), gf_br_stub_mt_sigstub_t);
+ if (!sigstub)
+ goto unref_dict;
+
+ INIT_LIST_HEAD(&sigstub->list);
+ sigstub->v = ntohl(sign->signedversion);
+ sigstub->stub = fop_fsetxattr_stub(frame, br_stub_perform_objsign, fd, dict,
+ 0, xdata);
+ if (!sigstub->stub)
+ goto cleanup_stub;
+
+ pthread_mutex_lock(&priv->lock);
+ {
+ list_add_order(&sigstub->list, &priv->squeue, orderq);
+ pthread_cond_signal(&priv->cond);
+ }
+ pthread_mutex_unlock(&priv->lock);
+
+ return;
+
+cleanup_stub:
+ GF_FREE(sigstub);
+unref_dict:
+ dict_unref(xdata);
+dofop:
+ STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, NULL);
+}
+
+int32_t
+br_stub_fsetxattr_resume(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ int32_t ret = -1;
+ br_stub_local_t *local = NULL;
+
+ local = frame->local;
+ frame->local = NULL;
+
+ ret = br_stub_mark_inode_modified(this, local);
+ if (ret) {
+ op_ret = -1;
+ op_errno = EINVAL;
+ }
+
+ STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, xdata);
+
+ br_stub_cleanup_local(local);
+ br_stub_dealloc_local(local);
+
+ return 0;
+}
+
+/**
+ * Handles object reopens. Object reopens can be of 3 types. 2 are from
+ * oneshot crawler and 1 from the regular signer.
+ * ONESHOT CRAWLER:
+ * For those objects which were created before bitrot was enabled. oneshow
+ * crawler crawls the namespace and signs all the objects. It has to do
+ * the versioning before making bit-rot-stub send a sign notification.
+ * So it sends fsetxattr with BR_OBJECT_REOPEN as the value. And bit-rot-stub
+ * upon getting BR_OBJECT_REOPEN value checks if the version has to be
+ * increased or not. By default the version will be increased. But if the
+ * object is modified before BR_OBJECT_REOPEN from oneshot crawler, then
+ * versioning need not be done. In that case simply a success is returned.
+ * SIGNER:
+ * Signer wait for 2 minutes upon getting the notification from bit-rot-stub
+ * and then it sends a dummy write (in reality a fsetxattr) call, to change
+ * the state of the inode from REOPEN_WAIT to SIGN_QUICK. The funny part here
+ * is though the inode's state is REOPEN_WAIT, the call sent by signer is
+ * BR_OBJECT_RESIGN. Once the state is changed to SIGN_QUICK, then yet another
+ * notification is sent upon release (RESIGN would have happened via fsetxattr,
+ * so a fd is needed) and the object is signed truly this time.
+ * There is a challenge in the above RESIGN method by signer. After sending
+ * the 1st notification, the inode could be forgotten before RESIGN request
+ * is received. In that case, the inode's context (the newly looked up inode)
+ * would not indicate the inode as being modified (it would be in the default
+ * state) and because of this, a SIGN_QUICK notification to truly sign the
+ * object would not be sent. So, this is how its handled.
+ * if (request == RESIGN) {
+ * if (inode->sign_info == NORMAL) {
+ * mark_inode_non_dirty;
+ * mark_inode_modified;
+ * }
+ * GOBACK (means unwind without doing versioning)
+ * }
+ */
+static void
+br_stub_handle_object_reopen(call_frame_t *frame, xlator_t *this, fd_t *fd,
+ uint32_t val)
+{
+ int32_t ret = -1;
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+ call_stub_t *stub = NULL;
+ gf_boolean_t inc_version = _gf_false;
+ gf_boolean_t modified = _gf_false;
+ br_stub_inode_ctx_t *ctx = NULL;
+ br_stub_local_t *local = NULL;
+ gf_boolean_t goback = _gf_true;
+
+ ret = br_stub_need_versioning(this, fd, &inc_version, &modified, &ctx);
+ if (ret)
+ goto unwind;
+
+ LOCK(&fd->inode->lock);
+ {
+ if ((val == BR_OBJECT_REOPEN) && inc_version)
+ goback = _gf_false;
+ if (val == BR_OBJECT_RESIGN && ctx->info_sign == BR_SIGN_NORMAL) {
+ __br_stub_mark_inode_synced(ctx);
+ __br_stub_set_inode_modified(ctx);
+ }
+ (void)__br_stub_inode_sign_state(ctx, GF_FOP_FSETXATTR, fd);
+ }
+ UNLOCK(&fd->inode->lock);
+
+ if (goback) {
+ op_ret = op_errno = 0;
+ goto unwind;
+ }
+
+ ret = br_stub_versioning_prep(frame, this, fd, ctx);
+ if (ret)
+ goto unwind;
+ local = frame->local;
+
+ stub = fop_fsetxattr_cbk_stub(frame, br_stub_fsetxattr_resume, 0, 0, NULL);
+ if (!stub) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_STUB_ALLOC_FAILED,
+ "fsetxattr gfid=%s", uuid_utoa(fd->inode->gfid), NULL);
+ goto cleanup_local;
+ }
+
+ (void)br_stub_perform_incversioning(this, frame, stub, fd, ctx);
+ return;
+
+cleanup_local:
+ br_stub_cleanup_local(local);
+ br_stub_dealloc_local(local);
+
+unwind:
+ frame->local = NULL;
+ STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, NULL);
+}
+
+/**
+ * This function only handles bad file identification. Instead of checking in
+ * fops like open, readv, writev whether the object is bad or not by doing
+ * getxattr calls, better to catch them when scrubber marks it as bad.
+ * So this callback is called only when the fsetxattr is sent by the scrubber
+ * to mark the object as bad.
+ */
+int
+br_stub_fsetxattr_bad_object_cbk(call_frame_t *frame, void *cookie,
+ xlator_t *this, int32_t op_ret,
+ int32_t op_errno, dict_t *xdata)
+{
+ br_stub_local_t *local = NULL;
+ int32_t ret = -1;
+
+ local = frame->local;
+ frame->local = NULL;
+
+ if (op_ret < 0)
+ goto unwind;
+
+ /*
+ * What to do if marking the object as bad fails? (i.e. in memory
+ * marking within the inode context. If we are here means fsetxattr
+ * fop has succeeded on disk and the bad object xattr has been set).
+ * We can return failure to scruber, but there is nothing the scrubber
+ * can do with it (it might assume that the on disk setxattr itself has
+ * failed). The main purpose of this operation is to help identify the
+ * bad object by checking the inode context itself (thus avoiding the
+ * necessity of doing a getxattr fop on the disk).
+ *
+ * So as of now, success itself is being returned even though inode
+ * context set operation fails.
+ * In future if there is any change in the policy which can handle this,
+ * then appropriate response should be sent (i.e. success or error).
+ */
+ ret = br_stub_mark_object_bad(this, local->u.context.inode);
+ if (ret)
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_MARK_FAIL,
+ "gfid=%s", uuid_utoa(local->u.context.inode->gfid), NULL);
+
+ ret = br_stub_add(this, local->u.context.inode->gfid);
+
+unwind:
+ STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, xdata);
+ br_stub_cleanup_local(local);
+ br_stub_dealloc_local(local);
+ return 0;
+}
+
+static int32_t
+br_stub_handle_bad_object_key(call_frame_t *frame, xlator_t *this, fd_t *fd,
+ dict_t *dict, int flags, dict_t *xdata)
+{
+ br_stub_local_t *local = NULL;
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+
+ if (frame->root->pid != GF_CLIENT_PID_SCRUB) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_NON_SCRUB_BAD_OBJ_MARK,
+ "gfid=%s", uuid_utoa(fd->inode->gfid), NULL);
+ goto unwind;
+ }
+
+ local = br_stub_alloc_local(this);
+ if (!local) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_ALLOC_MEM_FAILED,
+ "fsetxattr gfid=%s", uuid_utoa(fd->inode->gfid), NULL);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto unwind;
+ }
+
+ br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid,
+ BR_STUB_NO_VERSIONING, 0);
+ frame->local = local;
+
+ STACK_WIND(frame, br_stub_fsetxattr_bad_object_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);
+ return 0;
+unwind:
+ STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, NULL);
+ return 0;
+}
+
+/**
+ * As of now, versioning is done by the stub (though as a setxattr
+ * operation) as part of inode modification operations such as writev,
+ * truncate, ftruncate. And signing is done by BitD by a fsetxattr call.
+ * So any kind of setxattr coming on the versioning and the signing xattr is
+ * not allowed (i.e. BITROT_CURRENT_VERSION_KEY and BITROT_SIGNING_VERSION_KEY).
+ * In future if BitD/scrubber are allowed to change the versioning
+ * xattrs (though I cannot see a reason for it as of now), then the below
+ * function can be modified to block setxattr on version for only applications.
+ *
+ * NOTE: BitD sends sign request on GLUSTERFS_SET_OBJECT_SIGNATURE key.
+ * BITROT_SIGNING_VERSION_KEY is the xattr used to save the signature.
+ *
+ */
+static int32_t
+br_stub_handle_internal_xattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
+ char *key)
+{
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_SET_INTERNAL_XATTR,
+ "setxattr key=%s", key, "inode-gfid=%s", uuid_utoa(fd->inode->gfid),
+ NULL);
+
+ STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, NULL);
+ return 0;
+}
+
+static void
+br_stub_dump_xattr(xlator_t *this, dict_t *dict, int *op_errno)
+{
+ char *format = "(%s:%s)";
+ char *dump = NULL;
+
+ dump = GF_CALLOC(1, BR_STUB_DUMP_STR_SIZE, gf_br_stub_mt_misc);
+ if (!dump) {
+ *op_errno = ENOMEM;
+ goto out;
+ }
+ dict_dump_to_str(dict, dump, BR_STUB_DUMP_STR_SIZE, format);
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_SET_INTERNAL_XATTR,
+ "fsetxattr dump=%s", dump, NULL);
+out:
+ if (dump) {
+ GF_FREE(dump);
+ }
+ return;
+}
+
+int
+br_stub_fsetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict,
+ int flags, dict_t *xdata)
+{
+ int32_t ret = 0;
+ uint32_t val = 0;
+ br_isignature_t *sign = NULL;
+ br_stub_private_t *priv = NULL;
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+
+ priv = this->private;
+
+ if ((frame->root->pid != GF_CLIENT_PID_BITD &&
+ frame->root->pid != GF_CLIENT_PID_SCRUB) &&
+ br_stub_internal_xattr(dict)) {
+ br_stub_dump_xattr(this, dict, &op_errno);
+ goto unwind;
+ }
+
+ if (!priv->do_versioning)
+ goto wind;
+
+ if (!IA_ISREG(fd->inode->ia_type))
+ goto wind;
+
+ /* object signature request */
+ ret = dict_get_bin(dict, GLUSTERFS_SET_OBJECT_SIGNATURE, (void **)&sign);
+ if (!ret) {
+ gf_msg_debug(this->name, 0, "got SIGNATURE request on %s",
+ uuid_utoa(fd->inode->gfid));
+ br_stub_handle_object_signature(frame, this, fd, dict, sign, xdata);
+ goto done;
+ }
+
+ /* signing xattr */
+ if (dict_get(dict, BITROT_SIGNING_VERSION_KEY)) {
+ br_stub_handle_internal_xattr(frame, this, fd,
+ BITROT_SIGNING_VERSION_KEY);
+ goto done;
+ }
+
+ /* version xattr */
+ if (dict_get(dict, BITROT_CURRENT_VERSION_KEY)) {
+ br_stub_handle_internal_xattr(frame, this, fd,
+ BITROT_CURRENT_VERSION_KEY);
+ goto done;
+ }
+
+ if (dict_get(dict, GLUSTERFS_GET_OBJECT_SIGNATURE)) {
+ br_stub_handle_internal_xattr(frame, this, fd,
+ GLUSTERFS_GET_OBJECT_SIGNATURE);
+ goto done;
+ }
+
+ /* object reopen request */
+ ret = dict_get_uint32(dict, BR_REOPEN_SIGN_HINT_KEY, &val);
+ if (!ret) {
+ br_stub_handle_object_reopen(frame, this, fd, val);
+ goto done;
+ }
+
+ /* handle bad object */
+ if (dict_get(dict, BITROT_OBJECT_BAD_KEY)) {
+ br_stub_handle_bad_object_key(frame, this, fd, dict, flags, xdata);
+ goto done;
+ }
+
+wind:
+ STACK_WIND(frame, default_fsetxattr_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);
+ return 0;
+
+unwind:
+ STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, NULL);
+
+done:
+ return 0;
+}
+
+/**
+ * Currently BitD and scrubber are doing fsetxattr to either sign the object
+ * or to mark it as bad. Hence setxattr on any of those keys is denied directly
+ * without checking from where the fop is coming.
+ * Later, if BitD or Scrubber does setxattr of those keys, then appropriate
+ * check has to be added below.
+ */
+int
+br_stub_setxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict,
+ int flags, dict_t *xdata)
+{
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+
+ if (br_stub_internal_xattr(dict)) {
+ br_stub_dump_xattr(this, dict, &op_errno);
+ goto unwind;
+ }
+
+ STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr,
+ loc, dict, flags, xdata);
+ return 0;
+unwind:
+ STACK_UNWIND_STRICT(setxattr, frame, op_ret, op_errno, NULL);
+ return 0;
+}
+
+/** }}} */
+
+/** {{{ */
+
+/* {f}removexattr() */
+
+int32_t
+br_stub_removexattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
+ const char *name, dict_t *xdata)
+{
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+
+ if (!strcmp(BITROT_OBJECT_BAD_KEY, name) ||
+ !strcmp(BITROT_SIGNING_VERSION_KEY, name) ||
+ !strcmp(BITROT_CURRENT_VERSION_KEY, name)) {
+ gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_REMOVE_INTERNAL_XATTR,
+ "name=%s", name, "file-path=%s", loc->path, NULL);
+ goto unwind;
+ }
+
+ STACK_WIND_TAIL(frame, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->removexattr, loc, name, xdata);
+ return 0;
+unwind:
+ STACK_UNWIND_STRICT(removexattr, frame, op_ret, op_errno, NULL);
+ return 0;
+}
+
+int32_t
+br_stub_fremovexattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
+ const char *name, dict_t *xdata)
+{
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+
+ if (!strcmp(BITROT_OBJECT_BAD_KEY, name) ||
+ !strcmp(BITROT_SIGNING_VERSION_KEY, name) ||
+ !strcmp(BITROT_CURRENT_VERSION_KEY, name)) {
+ gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_REMOVE_INTERNAL_XATTR,
+ "name=%s", name, "inode-gfid=%s", uuid_utoa(fd->inode->gfid),
+ NULL);
+ goto unwind;
+ }
+
+ STACK_WIND_TAIL(frame, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->fremovexattr, fd, name, xdata);
+ return 0;
+unwind:
+ STACK_UNWIND_STRICT(fremovexattr, frame, op_ret, op_errno, NULL);
+ return 0;
+}
+
+/** }}} */
+
+/** {{{ */
+
+/* {f}getxattr() */
+
+int
+br_stub_listxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, dict_t *xattr, dict_t *xdata)
+{
+ if (op_ret < 0)
+ goto unwind;
+
+ br_stub_remove_vxattrs(xattr, _gf_true);
+
+unwind:
+ STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, xattr, xdata);
+ return 0;
+}
+
+/**
+ * ONE SHOT CRAWLER from BitD signs the objects that it encounters while
+ * crawling, if the object is identified as stale by the stub. Stub follows
+ * the below logic to mark an object as stale or not.
+ * If the ongoing version and the signed_version match, then the object is not
+ * stale. Just return. Otherwise if they does not match, then it means one
+ * of the below things.
+ * 1) If the inode does not need write back of the version and the sign state is
+ * is NORMAL, then some active i/o is going on the object. So skip it.
+ * A notification will be sent to trigger the sign once the release is
+ * received on the object.
+ * 2) If inode does not need writeback of the version and the sign state is
+ * either reopen wait or quick sign, then it means:
+ * A) BitD restarted and it is not sure whether the object it encountered
+ * while crawling is in its timer wheel or not. Since there is no way to
+ * scan the timer wheel as of now, ONE SHOT CRAWLER just goes ahead and
+ * signs the object. Since the inode does not need writeback, version will
+ * not be incremented and directly the object will be signed.
+ * 3) If the inode needs writeback, then it means the inode was forgotten after
+ * the versioning and it has to be signed now.
+ *
+ * This is the algorithm followed:
+ * if (ongoing_version == signed_version); then
+ * object_is_not_stale;
+ * return;
+ * else; then
+ * if (!inode_needs_writeback && inode_sign_state != NORMAL); then
+ * object_is_stale;
+ * if (inode_needs_writeback); then
+ * object_is_stale;
+ *
+ * For SCRUBBER, no need to check for the sign state and inode writeback.
+ * If the ondisk ongoingversion and the ondisk signed version does not match,
+ * then treat the object as stale.
+ */
+char
+br_stub_is_object_stale(xlator_t *this, call_frame_t *frame, inode_t *inode,
+ br_version_t *obuf, br_signature_t *sbuf)
+{
+ uint64_t ctx_addr = 0;
+ br_stub_inode_ctx_t *ctx = NULL;
+ int32_t ret = -1;
+ char stale = 0;
+
+ if (obuf->ongoingversion == sbuf->signedversion)
+ goto out;
+
+ if (frame->root->pid == GF_CLIENT_PID_SCRUB) {
+ stale = 1;
+ goto out;
+ }
+
+ ret = br_stub_get_inode_ctx(this, inode, &ctx_addr);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_GET_INODE_CONTEXT_FAILED,
+ "gfid=%s", uuid_utoa(inode->gfid), NULL);
+ goto out;
+ }
+
+ ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
+
+ LOCK(&inode->lock);
+ {
+ if ((!__br_stub_is_inode_dirty(ctx) &&
+ ctx->info_sign != BR_SIGN_NORMAL) ||
+ __br_stub_is_inode_dirty(ctx))
+ stale = 1;
+ }
+ UNLOCK(&inode->lock);
+
+out:
+ return stale;
+}
+
+int
+br_stub_getxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, dict_t *xattr, dict_t *xdata)
+{
+ int32_t ret = 0;
+ size_t totallen = 0;
+ size_t signaturelen = 0;
+ br_stub_private_t *priv = NULL;
+ br_version_t *obuf = NULL;
+ br_signature_t *sbuf = NULL;
+ br_isignature_out_t *sign = NULL;
+ br_vxattr_status_t status;
+ br_stub_local_t *local = NULL;
+ inode_t *inode = NULL;
+ gf_boolean_t bad_object = _gf_false;
+ gf_boolean_t ver_enabled = _gf_false;
+
+ BR_STUB_VER_ENABLED_IN_CALLPATH(frame, ver_enabled);
+ priv = this->private;
+
+ if (op_ret < 0)
+ goto unwind;
+ BR_STUB_VER_COND_GOTO(priv, (!ver_enabled), delkeys);
+
+ if (cookie != (void *)BR_STUB_REQUEST_COOKIE)
+ goto unwind;
+
+ local = frame->local;
+ frame->local = NULL;
+ if (!local) {
+ op_ret = -1;
+ op_errno = EINVAL;
+ goto unwind;
+ }
+ inode = local->u.context.inode;
+
+ op_ret = -1;
+ status = br_version_xattr_state(xattr, &obuf, &sbuf, &bad_object);
+
+ op_errno = EIO;
+ if (bad_object)
+ goto delkeys;
+
+ op_errno = EINVAL;
+ if (status == BR_VXATTR_STATUS_INVALID)
+ goto delkeys;
+
+ op_errno = ENODATA;
+ if ((status == BR_VXATTR_STATUS_MISSING) ||
+ (status == BR_VXATTR_STATUS_UNSIGNED))
+ goto delkeys;
+
+ /**
+ * okay.. we have enough information to satisfy the request,
+ * namely: version and signing extended attribute. what's
+ * pending is the signature length -- that's figured out
+ * indirectly via the size of the _whole_ xattr and the
+ * on-disk signing xattr header size.
+ */
+ op_errno = EINVAL;
+ ret = dict_get_uint32(xattr, BITROT_SIGNING_XATTR_SIZE_KEY,
+ (uint32_t *)&signaturelen);
+ if (ret)
+ goto delkeys;
+
+ signaturelen -= sizeof(br_signature_t);
+ totallen = sizeof(br_isignature_out_t) + signaturelen;
+
+ op_errno = ENOMEM;
+ sign = GF_CALLOC(1, totallen, gf_br_stub_mt_signature_t);
+ if (!sign)
+ goto delkeys;
+
+ sign->time[0] = obuf->timebuf[0];
+ sign->time[1] = obuf->timebuf[1];
+
+ /* Object's dirty state & current signed version */
+ sign->version = sbuf->signedversion;
+ sign->stale = br_stub_is_object_stale(this, frame, inode, obuf, sbuf);
+
+ /* Object's signature */
+ sign->signaturelen = signaturelen;
+ sign->signaturetype = sbuf->signaturetype;
+ (void)memcpy(sign->signature, sbuf->signature, signaturelen);
+
+ op_errno = EINVAL;
+ ret = dict_set_bin(xattr, GLUSTERFS_GET_OBJECT_SIGNATURE, (void *)sign,
+ totallen);
+ if (ret < 0) {
+ GF_FREE(sign);
+ goto delkeys;
+ }
+ op_errno = 0;
+ op_ret = totallen;
+
+delkeys:
+ br_stub_remove_vxattrs(xattr, _gf_true);
+
+unwind:
+ STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, xattr, xdata);
+ br_stub_cleanup_local(local);
+ br_stub_dealloc_local(local);
+ return 0;
+}
+
+static void
+br_stub_send_stub_init_time(call_frame_t *frame, xlator_t *this)
+{
+ int op_ret = 0;
+ int op_errno = 0;
+ dict_t *xattr = NULL;
+ br_stub_init_t stub = {
+ {
+ 0,
+ },
+ };
+ br_stub_private_t *priv = NULL;
+
+ priv = this->private;
+
+ xattr = dict_new();
+ if (!xattr) {
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto unwind;
+ }
+
+ stub.timebuf[0] = priv->boot[0];
+ stub.timebuf[1] = priv->boot[1];
+ memcpy(stub.export, priv->export, strlen(priv->export) + 1);
+
+ op_ret = dict_set_static_bin(xattr, GLUSTERFS_GET_BR_STUB_INIT_TIME,
+ (void *)&stub, sizeof(br_stub_init_t));
+ if (op_ret < 0) {
+ op_errno = EINVAL;
+ goto unwind;
+ }
+
+ op_ret = sizeof(br_stub_init_t);
+
+unwind:
+ STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, xattr, NULL);
+
+ if (xattr)
+ dict_unref(xattr);
+}
+
+int
+br_stub_getxattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
+ const char *name, dict_t *xdata)
+{
+ void *cookie = NULL;
+ static uuid_t rootgfid = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1};
+ fop_getxattr_cbk_t cbk = br_stub_getxattr_cbk;
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+ br_stub_local_t *local = NULL;
+ br_stub_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, loc, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, this->private, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind);
+
+ if (!name) {
+ cbk = br_stub_listxattr_cbk;
+ goto wind;
+ }
+
+ if (br_stub_is_internal_xattr(name))
+ goto unwind;
+
+ priv = this->private;
+ BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
+
+ /**
+ * If xattr is node-uuid and the inode is marked bad, return EIO.
+ * Returning EIO would result in AFR to choose correct node-uuid
+ * corresponding to the subvolume * where the good copy of the
+ * file resides.
+ */
+ if (IA_ISREG(loc->inode->ia_type) && XATTR_IS_NODE_UUID(name) &&
+ br_stub_check_bad_object(this, loc->inode, &op_ret, &op_errno)) {
+ goto unwind;
+ }
+
+ /**
+ * this special extended attribute is allowed only on root
+ */
+ if (name &&
+ (strncmp(name, GLUSTERFS_GET_BR_STUB_INIT_TIME,
+ sizeof(GLUSTERFS_GET_BR_STUB_INIT_TIME) - 1) == 0) &&
+ ((gf_uuid_compare(loc->gfid, rootgfid) == 0) ||
+ (gf_uuid_compare(loc->inode->gfid, rootgfid) == 0))) {
+ BR_STUB_RESET_LOCAL_NULL(frame);
+ br_stub_send_stub_init_time(frame, this);
+ return 0;
+ }
+
+ if (!IA_ISREG(loc->inode->ia_type))
+ goto wind;
+
+ if (name && (strncmp(name, GLUSTERFS_GET_OBJECT_SIGNATURE,
+ sizeof(GLUSTERFS_GET_OBJECT_SIGNATURE) - 1) == 0)) {
+ cookie = (void *)BR_STUB_REQUEST_COOKIE;
+
+ local = br_stub_alloc_local(this);
+ if (!local) {
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto unwind;
+ }
+
+ br_stub_fill_local(local, NULL, NULL, loc->inode, loc->inode->gfid,
+ BR_STUB_NO_VERSIONING, 0);
+ frame->local = local;
+ }
+
+wind:
+ STACK_WIND_COOKIE(frame, cbk, cookie, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->getxattr, loc, name, xdata);
+ return 0;
+unwind:
+ BR_STUB_RESET_LOCAL_NULL(frame);
+ STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, NULL, NULL);
+ return 0;
+}
+
+int
+br_stub_fgetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
+ const char *name, dict_t *xdata)
+{
+ void *cookie = NULL;
+ static uuid_t rootgfid = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1};
+ fop_fgetxattr_cbk_t cbk = br_stub_getxattr_cbk;
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+ br_stub_local_t *local = NULL;
+ br_stub_private_t *priv = NULL;
+
+ priv = this->private;
+
+ if (!name) {
+ cbk = br_stub_listxattr_cbk;
+ goto wind;
+ }
+
+ if (br_stub_is_internal_xattr(name))
+ goto unwind;
+
+ BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
+
+ /**
+ * If xattr is node-uuid and the inode is marked bad, return EIO.
+ * Returning EIO would result in AFR to choose correct node-uuid
+ * corresponding to the subvolume * where the good copy of the
+ * file resides.
+ */
+ if (IA_ISREG(fd->inode->ia_type) && XATTR_IS_NODE_UUID(name) &&
+ br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno)) {
+ goto unwind;
+ }
+
+ /**
+ * this special extended attribute is allowed only on root
+ */
+ if (name &&
+ (strncmp(name, GLUSTERFS_GET_BR_STUB_INIT_TIME,
+ sizeof(GLUSTERFS_GET_BR_STUB_INIT_TIME) - 1) == 0) &&
+ (gf_uuid_compare(fd->inode->gfid, rootgfid) == 0)) {
+ BR_STUB_RESET_LOCAL_NULL(frame);
+ br_stub_send_stub_init_time(frame, this);
+ return 0;
+ }
+
+ if (!IA_ISREG(fd->inode->ia_type))
+ goto wind;
+
+ if (name && (strncmp(name, GLUSTERFS_GET_OBJECT_SIGNATURE,
+ sizeof(GLUSTERFS_GET_OBJECT_SIGNATURE) - 1) == 0)) {
+ cookie = (void *)BR_STUB_REQUEST_COOKIE;
+
+ local = br_stub_alloc_local(this);
+ if (!local) {
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto unwind;
+ }
+
+ br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid,
+ BR_STUB_NO_VERSIONING, 0);
+ frame->local = local;
+ }
+
+wind:
+ STACK_WIND_COOKIE(frame, cbk, cookie, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->fgetxattr, fd, name, xdata);
+ return 0;
+unwind:
+ BR_STUB_RESET_LOCAL_NULL(frame);
+ STACK_UNWIND_STRICT(fgetxattr, frame, op_ret, op_errno, NULL, NULL);
+ return 0;
+}
+
+int32_t
+br_stub_readv(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
+ off_t offset, uint32_t flags, dict_t *xdata)
+{
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+ int32_t ret = -1;
+ br_stub_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, frame, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, this->private, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, fd, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, fd->inode, unwind);
+
+ priv = this->private;
+ if (!priv->do_versioning)
+ goto wind;
+
+ ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno);
+ if (ret)
+ goto unwind;
+
+wind:
+ STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv,
+ fd, size, offset, flags, xdata);
+ return 0;
+
+unwind:
+ STACK_UNWIND_STRICT(readv, frame, op_ret, op_errno, NULL, 0, NULL, NULL,
+ NULL);
+ return 0;
+}
+
+/**
+ * The first write response on the first fd in the list of fds will set
+ * the flag to indicate that the inode is modified. The subsequent write
+ * respnses coming on either the first fd or some other fd will not change
+ * the fd. The inode-modified flag is unset only upon release of all the
+ * fds.
+ */
+int32_t
+br_stub_writev_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ int32_t ret = 0;
+ br_stub_local_t *local = NULL;
+
+ local = frame->local;
+ frame->local = NULL;
+
+ if (op_ret < 0)
+ goto unwind;
+
+ ret = br_stub_mark_inode_modified(this, local);
+ if (ret) {
+ op_ret = -1;
+ op_errno = EINVAL;
+ }
+
+unwind:
+ STACK_UNWIND_STRICT(writev, frame, op_ret, op_errno, prebuf, postbuf,
+ xdata);
+
+ br_stub_cleanup_local(local);
+ br_stub_dealloc_local(local);
+
+ return 0;
+}
+
+int32_t
+br_stub_writev_resume(call_frame_t *frame, xlator_t *this, fd_t *fd,
+ struct iovec *vector, int32_t count, off_t offset,
+ uint32_t flags, struct iobref *iobref, dict_t *xdata)
+{
+ STACK_WIND(frame, br_stub_writev_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->writev, fd, vector, count, offset,
+ flags, iobref, xdata);
+ return 0;
+}
+
+/**
+ * This is probably the most crucial part about the whole versioning thing.
+ * There's absolutely no differentiation as such between an anonymous fd
+ * and a regular fd except the fd context initialization. Object versioning
+ * is performed when the inode is dirty. Parallel write operations are no
+ * special with each write performing object versioning followed by marking
+ * the inode as non-dirty (synced). This is followed by the actual operation
+ * (writev() in this case) which on a success marks the inode as modified.
+ * This prevents signing of objects that have not been modified.
+ */
+int32_t
+br_stub_writev(call_frame_t *frame, xlator_t *this, fd_t *fd,
+ struct iovec *vector, int32_t count, off_t offset,
+ uint32_t flags, struct iobref *iobref, dict_t *xdata)
+{
+ call_stub_t *stub = NULL;
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+ gf_boolean_t inc_version = _gf_false;
+ gf_boolean_t modified = _gf_false;
+ br_stub_inode_ctx_t *ctx = NULL;
+ int32_t ret = -1;
+ fop_writev_cbk_t cbk = default_writev_cbk;
+ br_stub_local_t *local = NULL;
+ br_stub_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, this->private, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, frame, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, fd, unwind);
+
+ priv = this->private;
+ if (!priv->do_versioning)
+ goto wind;
+
+ ret = br_stub_need_versioning(this, fd, &inc_version, &modified, &ctx);
+ if (ret)
+ goto unwind;
+
+ ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno);
+ if (ret)
+ goto unwind;
+
+ /**
+ * The inode is not dirty and also witnessed at least one successful
+ * modification operation. Therefore, subsequent operations need not
+ * perform any special tracking.
+ */
+ if (!inc_version && modified)
+ goto wind;
+
+ /**
+ * okay.. so, either the inode needs versioning or the modification
+ * needs to be tracked. ->cbk is set to the appropriate callback
+ * routine for this.
+ * NOTE: ->local needs to be deallocated on failures from here on.
+ */
+ ret = br_stub_versioning_prep(frame, this, fd, ctx);
+ if (ret)
+ goto unwind;
+
+ local = frame->local;
+ if (!inc_version) {
+ br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid,
+ BR_STUB_NO_VERSIONING, 0);
+ cbk = br_stub_writev_cbk;
+ goto wind;
+ }
+
+ stub = fop_writev_stub(frame, br_stub_writev_resume, fd, vector, count,
+ offset, flags, iobref, xdata);
+
+ if (!stub) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_STUB_ALLOC_FAILED,
+ "write gfid=%s", uuid_utoa(fd->inode->gfid), NULL);
+ goto cleanup_local;
+ }
+
+ /* Perform Versioning */
+ return br_stub_perform_incversioning(this, frame, stub, fd, ctx);
+
+wind:
+ STACK_WIND(frame, cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev,
+ fd, vector, count, offset, flags, iobref, xdata);
+ return 0;
+
+cleanup_local:
+ br_stub_cleanup_local(local);
+ br_stub_dealloc_local(local);
+
+unwind:
+ frame->local = NULL;
+ STACK_UNWIND_STRICT(writev, frame, op_ret, op_errno, NULL, NULL, NULL);
+
+ return 0;
+}
+
+int32_t
+br_stub_ftruncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ int32_t ret = -1;
+ br_stub_local_t *local = NULL;
+
+ local = frame->local;
+ frame->local = NULL;
+
+ if (op_ret < 0)
+ goto unwind;
+
+ ret = br_stub_mark_inode_modified(this, local);
+ if (ret) {
+ op_ret = -1;
+ op_errno = EINVAL;
+ }
+
+unwind:
+ STACK_UNWIND_STRICT(ftruncate, frame, op_ret, op_errno, prebuf, postbuf,
+ xdata);
+
+ br_stub_cleanup_local(local);
+ br_stub_dealloc_local(local);
+
+ return 0;
+}
+
+int32_t
+br_stub_ftruncate_resume(call_frame_t *frame, xlator_t *this, fd_t *fd,
+ off_t offset, dict_t *xdata)
+{
+ STACK_WIND(frame, br_stub_ftruncate_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata);
+ return 0;
+}
+
+/* c.f. br_stub_writev() for explanation */
+int32_t
+br_stub_ftruncate(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
+ dict_t *xdata)
+{
+ br_stub_local_t *local = NULL;
+ call_stub_t *stub = NULL;
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+ gf_boolean_t inc_version = _gf_false;
+ gf_boolean_t modified = _gf_false;
+ br_stub_inode_ctx_t *ctx = NULL;
+ int32_t ret = -1;
+ fop_ftruncate_cbk_t cbk = default_ftruncate_cbk;
+ br_stub_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, this->private, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, frame, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, fd, unwind);
+
+ priv = this->private;
+ if (!priv->do_versioning)
+ goto wind;
+
+ ret = br_stub_need_versioning(this, fd, &inc_version, &modified, &ctx);
+ if (ret)
+ goto unwind;
+
+ ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno);
+ if (ret)
+ goto unwind;
+
+ if (!inc_version && modified)
+ goto wind;
+
+ ret = br_stub_versioning_prep(frame, this, fd, ctx);
+ if (ret)
+ goto unwind;
+
+ local = frame->local;
+ if (!inc_version) {
+ br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid,
+ BR_STUB_NO_VERSIONING, 0);
+ cbk = br_stub_ftruncate_cbk;
+ goto wind;
+ }
+
+ stub = fop_ftruncate_stub(frame, br_stub_ftruncate_resume, fd, offset,
+ xdata);
+ if (!stub) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_STUB_ALLOC_FAILED,
+ "ftruncate gfid=%s", uuid_utoa(fd->inode->gfid), NULL);
+ goto cleanup_local;
+ }
+
+ return br_stub_perform_incversioning(this, frame, stub, fd, ctx);
+
+wind:
+ STACK_WIND(frame, cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata);
+ return 0;
+
+cleanup_local:
+ br_stub_cleanup_local(local);
+ br_stub_dealloc_local(local);
+
+unwind:
+ frame->local = NULL;
+ STACK_UNWIND_STRICT(ftruncate, frame, op_ret, op_errno, NULL, NULL, NULL);
+
+ return 0;
+}
+
+int32_t
+br_stub_truncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ int32_t ret = 0;
+ br_stub_local_t *local = NULL;
+
+ local = frame->local;
+ frame->local = NULL;
+
+ if (op_ret < 0)
+ goto unwind;
+
+ ret = br_stub_mark_inode_modified(this, local);
+ if (ret) {
+ op_ret = -1;
+ op_errno = EINVAL;
+ }
+
+unwind:
+ STACK_UNWIND_STRICT(truncate, frame, op_ret, op_errno, prebuf, postbuf,
+ xdata);
+ br_stub_cleanup_local(local);
+ br_stub_dealloc_local(local);
+ return 0;
+}
+
+int32_t
+br_stub_truncate_resume(call_frame_t *frame, xlator_t *this, loc_t *loc,
+ off_t offset, dict_t *xdata)
+{
+ br_stub_local_t *local = frame->local;
+
+ fd_unref(local->u.context.fd);
+ STACK_WIND(frame, br_stub_ftruncate_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->truncate, loc, offset, xdata);
+ return 0;
+}
+
+/**
+ * Bit-rot-stub depends heavily on the fd based operations to for doing
+ * versioning and sending notification. It starts tracking the operation
+ * upon getting first fd based modify operation by doing versioning and
+ * sends notification when last fd using which the inode was modified is
+ * released.
+ * But for truncate there is no fd and hence it becomes difficult to do
+ * the versioning and send notification. It is handled by doing versioning
+ * on an anonymous fd. The fd will be valid till the completion of the
+ * truncate call. It guarantees that release on this anonymous fd will happen
+ * after the truncate call and notification is sent after the truncate call.
+ *
+ * c.f. br_writev_cbk() for explanation
+ */
+int32_t
+br_stub_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset,
+ dict_t *xdata)
+{
+ br_stub_local_t *local = NULL;
+ call_stub_t *stub = NULL;
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+ gf_boolean_t inc_version = _gf_false;
+ gf_boolean_t modified = _gf_false;
+ br_stub_inode_ctx_t *ctx = NULL;
+ int32_t ret = -1;
+ fd_t *fd = NULL;
+ fop_truncate_cbk_t cbk = default_truncate_cbk;
+ br_stub_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, this->private, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, frame, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, loc, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind);
+
+ priv = this->private;
+ if (!priv->do_versioning)
+ goto wind;
+
+ fd = fd_anonymous(loc->inode);
+ if (!fd) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_CREATE_ANONYMOUS_FD_FAILED,
+ "inode-gfid=%s", uuid_utoa(loc->inode->gfid), NULL);
+ goto unwind;
+ }
+
+ ret = br_stub_need_versioning(this, fd, &inc_version, &modified, &ctx);
+ if (ret)
+ goto cleanup_fd;
+
+ ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno);
+ if (ret)
+ goto unwind;
+
+ if (!inc_version && modified)
+ goto wind;
+
+ ret = br_stub_versioning_prep(frame, this, fd, ctx);
+ if (ret)
+ goto cleanup_fd;
+
+ local = frame->local;
+ if (!inc_version) {
+ br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid,
+ BR_STUB_NO_VERSIONING, 0);
+ cbk = br_stub_truncate_cbk;
+ goto wind;
+ }
+
+ stub = fop_truncate_stub(frame, br_stub_truncate_resume, loc, offset,
+ xdata);
+ if (!stub) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_STUB_ALLOC_FAILED,
+ "truncate gfid=%s", uuid_utoa(fd->inode->gfid), NULL);
+ goto cleanup_local;
+ }
+
+ return br_stub_perform_incversioning(this, frame, stub, fd, ctx);
+
+wind:
+ STACK_WIND(frame, cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate,
+ loc, offset, xdata);
+ if (fd)
+ fd_unref(fd);
+ return 0;
+
+cleanup_local:
+ br_stub_cleanup_local(local);
+ br_stub_dealloc_local(local);
+cleanup_fd:
+ fd_unref(fd);
+unwind:
+ frame->local = NULL;
+ STACK_UNWIND_STRICT(truncate, frame, op_ret, op_errno, NULL, NULL, NULL);
+
+ return 0;
+}
+
+/** }}} */
+
+/** {{{ */
+
+/* open() */
+
+/**
+ * It's probably worth mentioning a bit about why some of the housekeeping
+ * work is done in open() call path, rather than the callback path.
+ * Two (or more) open()'s in parallel can race and lead to a situation
+ * where a release() gets triggered (possibly after a series of write()
+ * calls) when *other* open()'s have still not reached callback path
+ * thereby having an active fd on an inode that is in process of getting
+ * signed with the current version.
+ *
+ * Maintaining fd list in the call path ensures that a release() would
+ * not be triggered if an open() call races ahead (followed by a close())
+ * threby finding non-empty fd list.
+ */
+
+int
+br_stub_open(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
+ fd_t *fd, dict_t *xdata)
+{
+ int32_t ret = -1;
+ br_stub_inode_ctx_t *ctx = NULL;
+ uint64_t ctx_addr = 0;
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+ br_stub_private_t *priv = NULL;
+ unsigned long version = BITROT_DEFAULT_CURRENT_VERSION;
+
+ GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, this->private, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, loc, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, fd, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, fd->inode, unwind);
+
+ priv = this->private;
+
+ if (!priv->do_versioning)
+ goto wind;
+
+ ret = br_stub_get_inode_ctx(this, fd->inode, &ctx_addr);
+ if (ret) {
+ ret = br_stub_init_inode_versions(this, fd, fd->inode, version,
+ _gf_true, _gf_false, &ctx_addr);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ BRS_MSG_GET_INODE_CONTEXT_FAILED, "path=%s", loc->path,
+ "gfid=%s", uuid_utoa(fd->inode->gfid), NULL);
+ goto unwind;
+ }
+ }
+
+ ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
+
+ ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno);
+ if (ret)
+ goto unwind;
+
+ if (frame->root->pid == GF_CLIENT_PID_SCRUB)
+ goto wind;
+
+ if (flags == O_RDONLY)
+ goto wind;
+
+ ret = br_stub_add_fd_to_inode(this, fd, ctx);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_ADD_FD_TO_LIST_FAILED,
+ "gfid=%s", uuid_utoa(fd->inode->gfid), NULL);
+ goto unwind;
+ }
+
+wind:
+ STACK_WIND(frame, default_open_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->open, loc, flags, fd, xdata);
+ return 0;
+unwind:
+ STACK_UNWIND_STRICT(open, frame, op_ret, op_errno, NULL, NULL);
+ return 0;
+}
+
+/** }}} */
+
+/** {{{ */
+
+/* creat() */
+
+/**
+ * This routine registers a release callback for the given fd and adds the
+ * fd to the inode context fd tracking list.
+ */
+int32_t
+br_stub_add_fd_to_inode(xlator_t *this, fd_t *fd, br_stub_inode_ctx_t *ctx)
+{
+ int32_t ret = -1;
+ br_stub_fd_t *br_stub_fd = NULL;
+
+ ret = br_stub_require_release_call(this, fd, &br_stub_fd);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_SET_FD_CONTEXT_FAILED,
+ "gfid=%s", uuid_utoa(fd->inode->gfid), NULL);
+ goto out;
+ }
+
+ LOCK(&fd->inode->lock);
+ {
+ list_add_tail(&ctx->fd_list, &br_stub_fd->list);
+ }
+ UNLOCK(&fd->inode->lock);
+
+ ret = 0;
+
+out:
+ return ret;
+}
+
+int
+br_stub_create_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, fd_t *fd, inode_t *inode,
+ struct iatt *stbuf, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ int32_t ret = 0;
+ uint64_t ctx_addr = 0;
+ br_stub_inode_ctx_t *ctx = NULL;
+ unsigned long version = BITROT_DEFAULT_CURRENT_VERSION;
+ br_stub_private_t *priv = NULL;
+
+ priv = this->private;
+
+ if (op_ret < 0)
+ goto unwind;
+
+ if (!priv->do_versioning)
+ goto unwind;
+
+ ret = br_stub_get_inode_ctx(this, fd->inode, &ctx_addr);
+ if (ret < 0) {
+ ret = br_stub_init_inode_versions(this, fd, inode, version, _gf_true,
+ _gf_false, &ctx_addr);
+ if (ret) {
+ op_ret = -1;
+ op_errno = EINVAL;
+ }
+ } else {
+ ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
+ ret = br_stub_add_fd_to_inode(this, fd, ctx);
+ }
+
+unwind:
+ STACK_UNWIND_STRICT(create, frame, op_ret, op_errno, fd, inode, stbuf,
+ preparent, postparent, xdata);
+ return 0;
+}
+
+int
+br_stub_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
+ mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata)
+{
+ GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, loc, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, fd, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, fd->inode, unwind);
+
+ STACK_WIND(frame, br_stub_create_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, fd,
+ xdata);
+ return 0;
+unwind:
+ STACK_UNWIND_STRICT(create, frame, -1, EINVAL, NULL, NULL, NULL, NULL, NULL,
+ NULL);
+ return 0;
+}
+
+int
+br_stub_mknod_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
+ int op_errno, inode_t *inode, struct iatt *stbuf,
+ struct iatt *preparent, struct iatt *postparent,
+ dict_t *xdata)
+{
+ int32_t ret = -1;
+ unsigned long version = BITROT_DEFAULT_CURRENT_VERSION;
+ br_stub_private_t *priv = NULL;
+
+ priv = this->private;
+
+ if (op_ret < 0)
+ goto unwind;
+
+ if (!priv->do_versioning)
+ goto unwind;
+
+ ret = br_stub_init_inode_versions(this, NULL, inode, version, _gf_true,
+ _gf_false, NULL);
+ /**
+ * Like lookup, if init_inode_versions fail, return EINVAL
+ */
+ if (ret) {
+ op_ret = -1;
+ op_errno = EINVAL;
+ }
+
+unwind:
+ STACK_UNWIND_STRICT(mknod, frame, op_ret, op_errno, inode, stbuf, preparent,
+ postparent, xdata);
+ return 0;
+}
+
+int
+br_stub_mknod(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
+ dev_t dev, mode_t umask, dict_t *xdata)
+{
+ GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, loc, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind);
+
+ STACK_WIND(frame, br_stub_mknod_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->mknod, loc, mode, dev, umask, xdata);
+ return 0;
+unwind:
+ STACK_UNWIND_STRICT(mknod, frame, -1, EINVAL, NULL, NULL, NULL, NULL, NULL);
+ return 0;
+}
+
+/** }}} */
+
+/**
+ * As of now, only lookup searches for bad object xattr and marks the
+ * object as bad in its inode context if the xattr is present. But there
+ * is a possibility that, at the time of the lookup the object was not
+ * marked bad (i.e. bad object xattr was not set), and later its marked
+ * as bad. In this case, object is not bad, so when a fop such as open or
+ * readv or writev comes on the object, the fop will be sent downward instead
+ * of sending as error upwards.
+ * The solution for this is to do a getxattr for the below list of fops.
+ * lookup, readdirp, open, readv, writev.
+ * But doing getxattr for each of the above fops might be costly.
+ * So another method followed is to catch the bad file marking by the scrubber
+ * and set that info within the object's inode context. In this way getxattr
+ * calls can be avoided and bad objects can be caught instantly. Fetching the
+ * xattr is needed only in lookups when there is a brick restart or inode
+ * forget.
+ *
+ * If the dict (@xattr) is NULL, then how should that be handled? Fail the
+ * lookup operation? Or let it continue with version being initialized to
+ * BITROT_DEFAULT_CURRENT_VERSION. But what if the version was different
+ * on disk (and also a right signature was there), but posix failed to
+ * successfully allocate the dict? Posix does not treat call back xdata
+ * creattion failure as the lookup failure.
+ */
+static int32_t
+br_stub_lookup_version(xlator_t *this, uuid_t gfid, inode_t *inode,
+ dict_t *xattr)
+{
+ unsigned long version = 0;
+ br_version_t *obuf = NULL;
+ br_signature_t *sbuf = NULL;
+ br_vxattr_status_t status;
+ gf_boolean_t bad_object = _gf_false;
+
+ /**
+ * versioning xattrs were requested from POSIX. if available, figure
+ * out the correct version to use in the inode context (start with
+ * the default version if unavailable). As of now versions are not
+ * persisted on-disk. The inode is marked dirty, so that the first
+ * operation (such as write(), etc..) triggers synchronization to
+ * disk.
+ */
+ status = br_version_xattr_state(xattr, &obuf, &sbuf, &bad_object);
+ version = ((status == BR_VXATTR_STATUS_FULL) ||
+ (status == BR_VXATTR_STATUS_UNSIGNED))
+ ? obuf->ongoingversion
+ : BITROT_DEFAULT_CURRENT_VERSION;
+
+ /**
+ * If signature is there, but version is not there then that status is
+ * is treated as INVALID. So in that case, we should not initialize the
+ * inode context with wrong version names etc.
+ */
+ if (status == BR_VXATTR_STATUS_INVALID)
+ return -1;
+
+ return br_stub_init_inode_versions(this, NULL, inode, version, _gf_true,
+ bad_object, NULL);
+}
+
+/** {{{ */
+
+int32_t
+br_stub_opendir(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd,
+ dict_t *xdata)
+{
+ br_stub_private_t *priv = NULL;
+ br_stub_fd_t *fd_ctx = NULL;
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+
+ priv = this->private;
+ if (gf_uuid_compare(fd->inode->gfid, priv->bad_object_dir_gfid))
+ goto normal;
+
+ fd_ctx = br_stub_fd_new();
+ if (!fd_ctx) {
+ op_errno = ENOMEM;
+ goto unwind;
+ }
+
+ fd_ctx->bad_object.dir_eof = -1;
+ fd_ctx->bad_object.dir = sys_opendir(priv->stub_basepath);
+ if (!fd_ctx->bad_object.dir) {
+ op_errno = errno;
+ goto err_freectx;
+ }
+
+ op_ret = br_stub_fd_ctx_set(this, fd, fd_ctx);
+ if (!op_ret)
+ goto unwind;
+
+ sys_closedir(fd_ctx->bad_object.dir);
+
+err_freectx:
+ GF_FREE(fd_ctx);
+unwind:
+ STACK_UNWIND_STRICT(opendir, frame, op_ret, op_errno, fd, NULL);
+ return 0;
+
+normal:
+ STACK_WIND(frame, default_opendir_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->opendir, loc, fd, xdata);
+ return 0;
+}
+
+int32_t
+br_stub_readdir(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
+ off_t off, dict_t *xdata)
+{
+ call_stub_t *stub = NULL;
+ br_stub_private_t *priv = NULL;
+
+ priv = this->private;
+ if (!priv->do_versioning)
+ goto out;
+
+ if (gf_uuid_compare(fd->inode->gfid, priv->bad_object_dir_gfid))
+ goto out;
+ stub = fop_readdir_stub(frame, br_stub_readdir_wrapper, fd, size, off,
+ xdata);
+ if (!stub) {
+ STACK_UNWIND_STRICT(readdir, frame, -1, ENOMEM, NULL, NULL);
+ return 0;
+ }
+ br_stub_worker_enqueue(this, stub);
+ return 0;
+out:
+ STACK_WIND(frame, default_readdir_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->readdir, fd, size, off, xdata);
+ return 0;
+}
+
+int
+br_stub_readdirp_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, gf_dirent_t *entries,
+ dict_t *dict)
+{
+ int32_t ret = 0;
+ uint64_t ctxaddr = 0;
+ gf_dirent_t *entry = NULL;
+ br_stub_private_t *priv = NULL;
+ gf_boolean_t ver_enabled = _gf_false;
+
+ BR_STUB_VER_ENABLED_IN_CALLPATH(frame, ver_enabled);
+ priv = this->private;
+ BR_STUB_VER_COND_GOTO(priv, (!ver_enabled), unwind);
+
+ if (op_ret < 0)
+ goto unwind;
+
+ list_for_each_entry(entry, &entries->list, list)
+ {
+ if ((strcmp(entry->d_name, ".") == 0) ||
+ (strcmp(entry->d_name, "..") == 0))
+ continue;
+
+ if (!IA_ISREG(entry->d_stat.ia_type))
+ continue;
+
+ /*
+ * Readdirp for most part is a bulk lookup for all the entries
+ * present in the directory being read. Ideally, for each
+ * entry, the handling should be similar to that of a lookup
+ * callback. But for now, just keeping this as it has been
+ * until now (which means, this comment has been added much
+ * later as part of a change that wanted to send the flag
+ * of true/false to br_stub_remove_vxattrs to indicate whether
+ * the bad-object xattr should be removed from the entry->dict
+ * or not). Until this change, the function br_stub_remove_vxattrs
+ * was just removing all the xattrs associated with bit-rot-stub
+ * (like version, bad-object, signature etc). But, there are
+ * scenarios where we only want to send bad-object xattr and not
+ * others. So this comment is part of that change which also
+ * mentions about another possible change that might be needed
+ * in future.
+ * But for now, adding _gf_true means functionally its same as
+ * what this function was doing before. Just remove all the stub
+ * related xattrs.
+ */
+ ret = br_stub_get_inode_ctx(this, entry->inode, &ctxaddr);
+ if (ret < 0)
+ ctxaddr = 0;
+ if (ctxaddr) { /* already has the context */
+ br_stub_remove_vxattrs(entry->dict, _gf_true);
+ continue;
+ }
+
+ ret = br_stub_lookup_version(this, entry->inode->gfid, entry->inode,
+ entry->dict);
+ br_stub_remove_vxattrs(entry->dict, _gf_true);
+ if (ret) {
+ /**
+ * there's no per-file granularity support in case of
+ * failure. let's fail the entire request for now..
+ */
+ break;
+ }
+ }
+
+ if (ret) {
+ op_ret = -1;
+ op_errno = EINVAL;
+ }
+
+unwind:
+ STACK_UNWIND_STRICT(readdirp, frame, op_ret, op_errno, entries, dict);
+
+ return 0;
+}
+
+int
+br_stub_readdirp(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
+ off_t offset, dict_t *dict)
+{
+ int32_t ret = -1;
+ int op_errno = 0;
+ gf_boolean_t xref = _gf_false;
+ br_stub_private_t *priv = NULL;
+
+ priv = this->private;
+ BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
+
+ op_errno = ENOMEM;
+ if (!dict) {
+ dict = dict_new();
+ if (!dict)
+ goto unwind;
+ } else {
+ dict = dict_ref(dict);
+ }
+
+ xref = _gf_true;
+
+ op_errno = EINVAL;
+ ret = dict_set_uint32(dict, BITROT_CURRENT_VERSION_KEY, 0);
+ if (ret)
+ goto unwind;
+ ret = dict_set_uint32(dict, BITROT_SIGNING_VERSION_KEY, 0);
+ if (ret)
+ goto unwind;
+ ret = dict_set_uint32(dict, BITROT_OBJECT_BAD_KEY, 0);
+ if (ret)
+ goto unwind;
+
+wind:
+ STACK_WIND(frame, br_stub_readdirp_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->readdirp, fd, size, offset, dict);
+ goto unref_dict;
+
+unwind:
+ if (frame->local == (void *)0x1)
+ frame->local = NULL;
+ STACK_UNWIND_STRICT(readdirp, frame, -1, op_errno, NULL, NULL);
+ return 0;
+
+unref_dict:
+ if (xref)
+ dict_unref(dict);
+ return 0;
+}
+
+/** }}} */
+
+/** {{{ */
+
+/* lookup() */
+
+/**
+ * This function mainly handles the ENOENT error for the bad objects. Though
+ * br_stub_forget () handles removal of the link for the bad object from the
+ * quarantine directory, its better to handle it in lookup as well, where
+ * a failed lookup on a bad object with ENOENT, will trigger deletion of the
+ * link for the bad object from quarantine directory. So whoever comes first
+ * either forget () or lookup () will take care of removing the link.
+ */
+void
+br_stub_handle_lookup_error(xlator_t *this, inode_t *inode, int32_t op_errno)
+{
+ int32_t ret = -1;
+ uint64_t ctx_addr = 0;
+ br_stub_inode_ctx_t *ctx = NULL;
+
+ if (op_errno != ENOENT)
+ goto out;
+
+ if (!inode_is_linked(inode))
+ goto out;
+
+ ret = br_stub_get_inode_ctx(this, inode, &ctx_addr);
+ if (ret)
+ goto out;
+
+ ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
+
+ LOCK(&inode->lock);
+ {
+ if (__br_stub_is_bad_object(ctx))
+ (void)br_stub_del(this, inode->gfid);
+ }
+ UNLOCK(&inode->lock);
+
+ if (__br_stub_is_bad_object(ctx)) {
+ /* File is not present, might be deleted for recovery,
+ * del the bitrot inode context
+ */
+ ctx_addr = 0;
+ inode_ctx_del(inode, this, &ctx_addr);
+ if (ctx_addr) {
+ ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
+ GF_FREE(ctx);
+ }
+ }
+
+out:
+ return;
+}
+
+int
+br_stub_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, inode_t *inode, struct iatt *stbuf,
+ dict_t *xattr, struct iatt *postparent)
+{
+ int32_t ret = 0;
+ br_stub_private_t *priv = NULL;
+ gf_boolean_t ver_enabled = _gf_false;
+ gf_boolean_t remove_bad_file_marker = _gf_true;
+
+ BR_STUB_VER_ENABLED_IN_CALLPATH(frame, ver_enabled);
+ priv = this->private;
+
+ if (op_ret < 0) {
+ (void)br_stub_handle_lookup_error(this, inode, op_errno);
+
+ /*
+ * If the lookup error is not ENOENT, then it is better
+ * to send the bad file marker to the higher layer (if
+ * it has been set)
+ */
+ if (op_errno != ENOENT)
+ remove_bad_file_marker = _gf_false;
+ goto delkey;
+ }
+
+ BR_STUB_VER_COND_GOTO(priv, (!ver_enabled), delkey);
+
+ if (!IA_ISREG(stbuf->ia_type))
+ goto unwind;
+
+ /**
+ * If the object is bad, then "bad inode" marker has to be sent back
+ * in resoinse, for revalidated lookups as well. Some xlators such as
+ * quick-read might cache the data in revalidated lookup as fresh
+ * lookup would anyway have sent "bad inode" marker.
+ * In general send bad inode marker for every lookup operation on the
+ * bad object.
+ */
+ if (cookie != (void *)BR_STUB_REQUEST_COOKIE) {
+ ret = br_stub_mark_xdata_bad_object(this, inode, xattr);
+ if (ret) {
+ op_ret = -1;
+ op_errno = EIO;
+ /*
+ * This flag ensures that in the label @delkey below,
+ * bad file marker is not removed from the dictinary,
+ * but other virtual xattrs (such as version, signature)
+ * are removed.
+ */
+ remove_bad_file_marker = _gf_false;
+ }
+ goto delkey;
+ }
+
+ ret = br_stub_lookup_version(this, stbuf->ia_gfid, inode, xattr);
+ if (ret < 0) {
+ op_ret = -1;
+ op_errno = EINVAL;
+ goto delkey;
+ }
+
+ /**
+ * If the object is bad, send "bad inode" marker back in response
+ * for xlator(s) to act accordingly (such as quick-read, etc..)
+ */
+ ret = br_stub_mark_xdata_bad_object(this, inode, xattr);
+ if (ret) {
+ /**
+ * aaha! bad object, but sorry we would not
+ * satisfy the request on allocation failures.
+ */
+ op_ret = -1;
+ op_errno = EIO;
+ goto delkey;
+ }
+
+delkey:
+ br_stub_remove_vxattrs(xattr, remove_bad_file_marker);
+unwind:
+ STACK_UNWIND_STRICT(lookup, frame, op_ret, op_errno, inode, stbuf, xattr,
+ postparent);
+
+ return 0;
+}
+
+int
+br_stub_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
+{
+ int32_t ret = 0;
+ int op_errno = 0;
+ void *cookie = NULL;
+ uint64_t ctx_addr = 0;
+ gf_boolean_t xref = _gf_false;
+ br_stub_private_t *priv = NULL;
+ call_stub_t *stub = NULL;
+
+ GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, loc, unwind);
+ GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind);
+
+ priv = this->private;
+
+ BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
+
+ if (!gf_uuid_compare(loc->gfid, priv->bad_object_dir_gfid) ||
+ !gf_uuid_compare(loc->pargfid, priv->bad_object_dir_gfid)) {
+ stub = fop_lookup_stub(frame, br_stub_lookup_wrapper, loc, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto unwind;
+ }
+ br_stub_worker_enqueue(this, stub);
+ return 0;
+ }
+
+ ret = br_stub_get_inode_ctx(this, loc->inode, &ctx_addr);
+ if (ret < 0)
+ ctx_addr = 0;
+ if (ctx_addr != 0)
+ goto wind;
+
+ /**
+ * fresh lookup: request version keys from POSIX
+ */
+ op_errno = ENOMEM;
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata)
+ goto unwind;
+ } else {
+ xdata = dict_ref(xdata);
+ }
+
+ xref = _gf_true;
+
+ /**
+ * Requesting both xattrs provides a way of sanity checking the
+ * object. Anomaly checking is done in cbk by examining absence
+ * of either or both xattrs.
+ */
+ op_errno = EINVAL;
+ ret = dict_set_uint32(xdata, BITROT_CURRENT_VERSION_KEY, 0);
+ if (ret)
+ goto unwind;
+ ret = dict_set_uint32(xdata, BITROT_SIGNING_VERSION_KEY, 0);
+ if (ret)
+ goto unwind;
+ ret = dict_set_uint32(xdata, BITROT_OBJECT_BAD_KEY, 0);
+ if (ret)
+ goto unwind;
+ cookie = (void *)BR_STUB_REQUEST_COOKIE;
+
+wind:
+ STACK_WIND_COOKIE(frame, br_stub_lookup_cbk, cookie, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->lookup, loc, xdata);
+ goto dealloc_dict;
+
+unwind:
+ if (frame->local == (void *)0x1)
+ frame->local = NULL;
+ STACK_UNWIND_STRICT(lookup, frame, -1, op_errno, NULL, NULL, NULL, NULL);
+dealloc_dict:
+ if (xref)
+ dict_unref(xdata);
+ return 0;
+}
+
+/** }}} */
+
+/** {{{ */
+
+/* stat */
+int
+br_stub_stat(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
+{
+ int32_t ret = 0;
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+ br_stub_private_t *priv = NULL;
+
+ priv = this->private;
+
+ if (!priv->do_versioning)
+ goto wind;
+
+ if (!IA_ISREG(loc->inode->ia_type))
+ goto wind;
+
+ ret = br_stub_check_bad_object(this, loc->inode, &op_ret, &op_errno);
+ if (ret)
+ goto unwind;
+
+wind:
+ STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->stat,
+ loc, xdata);
+ return 0;
+
+unwind:
+ STACK_UNWIND_STRICT(stat, frame, op_ret, op_errno, NULL, NULL);
+ return 0;
+}
+
+/* fstat */
+int
+br_stub_fstat(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)
+{
+ int32_t ret = 0;
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+ br_stub_private_t *priv = NULL;
+
+ priv = this->private;
+
+ if (!priv->do_versioning)
+ goto wind;
+
+ if (!IA_ISREG(fd->inode->ia_type))
+ goto wind;
+
+ ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno);
+ if (ret)
+ goto unwind;
+
+wind:
+ STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat,
+ fd, xdata);
+ return 0;
+
+unwind:
+ STACK_UNWIND_STRICT(fstat, frame, op_ret, op_errno, NULL, NULL);
+ return 0;
+}
+
+/** }}} */
+
+/** {{{ */
+
+/* unlink() */
+
+int
+br_stub_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ br_stub_local_t *local = NULL;
+ inode_t *inode = NULL;
+ uint64_t ctx_addr = 0;
+ br_stub_inode_ctx_t *ctx = NULL;
+ int32_t ret = -1;
+ br_stub_private_t *priv = NULL;
+ gf_boolean_t ver_enabled = _gf_false;
+
+ BR_STUB_VER_ENABLED_IN_CALLPATH(frame, ver_enabled);
+ priv = this->private;
+ BR_STUB_VER_COND_GOTO(priv, (!ver_enabled), unwind);
+
+ local = frame->local;
+ frame->local = NULL;
+
+ if (op_ret < 0)
+ goto unwind;
+
+ if (!local) {
+ gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_NULL_LOCAL, NULL);
+ goto unwind;
+ }
+ inode = local->u.context.inode;
+ if (!IA_ISREG(inode->ia_type))
+ goto unwind;
+
+ ret = br_stub_get_inode_ctx(this, inode, &ctx_addr);
+ if (ret) {
+ /**
+ * If the inode is bad AND context is not there, then there
+ * is a possibility of the gfid of the object being listed
+ * in the quarantine directory and will be shown in the
+ * bad objects list. So continuing with the fop with a
+ * warning log. The entry from the quarantine directory
+ * has to be removed manually. Its not a good idea to fail
+ * the fop, as the object has already been deleted.
+ */
+ gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_GET_INODE_CONTEXT_FAILED,
+ "inode-gfid=%s", uuid_utoa(inode->gfid), NULL);
+ goto unwind;
+ }
+
+ ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
+
+ LOCK(&inode->lock);
+ {
+ /**
+ * Ignoring the return value of br_stub_del ().
+ * There is not much that can be done if unlinking
+ * of the entry in the quarantine directory fails.
+ * The failure is logged.
+ */
+ if (__br_stub_is_bad_object(ctx))
+ (void)br_stub_del(this, inode->gfid);
+ }
+ UNLOCK(&inode->lock);
+
+unwind:
+ STACK_UNWIND_STRICT(unlink, frame, op_ret, op_errno, preparent, postparent,
+ xdata);
+ br_stub_cleanup_local(local);
+ br_stub_dealloc_local(local);
+ return 0;
+}
+
+int
+br_stub_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int flag,
+ dict_t *xdata)
+{
+ br_stub_local_t *local = NULL;
+ int32_t op_ret = -1;
+ int32_t op_errno = 0;
+ br_stub_private_t *priv = NULL;
+
+ priv = this->private;
+ BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
+
+ local = br_stub_alloc_local(this);
+ if (!local) {
+ op_ret = -1;
+ op_errno = ENOMEM;
+ gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, BRS_MSG_ALLOC_MEM_FAILED,
+ "local path=%s", loc->path, "gfid=%s",
+ uuid_utoa(loc->inode->gfid), NULL);
+ goto unwind;
+ }
+
+ br_stub_fill_local(local, NULL, NULL, loc->inode, loc->inode->gfid,
+ BR_STUB_NO_VERSIONING, 0);
+
+ frame->local = local;
+
+wind:
+ STACK_WIND(frame, br_stub_unlink_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->unlink, loc, flag, xdata);
+ return 0;
+
+unwind:
+ if (frame->local == (void *)0x1)
+ frame->local = NULL;
+ STACK_UNWIND_STRICT(unlink, frame, op_ret, op_errno, NULL, NULL, NULL);
+ return 0;
+}
+
+/** }}} */
+
+/** {{{ */
+
+/* forget() */
+
+int
+br_stub_forget(xlator_t *this, inode_t *inode)
+{
+ uint64_t ctx_addr = 0;
+ br_stub_inode_ctx_t *ctx = NULL;
+
+ inode_ctx_del(inode, this, &ctx_addr);
+ if (!ctx_addr)
+ return 0;
+
+ ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
+
+ GF_FREE(ctx);
+
+ return 0;
+}
+
+/** }}} */
+
+/** {{{ */
+
+int32_t
+br_stub_noop(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
+ int32_t op_errno, dict_t *xdata)
+{
+ STACK_DESTROY(frame->root);
+ return 0;
+}
+
+static void
+br_stub_send_ipc_fop(xlator_t *this, fd_t *fd, unsigned long releaseversion,
+ int sign_info)
+{
+ int32_t op = 0;
+ int32_t ret = 0;
+ dict_t *xdata = NULL;
+ call_frame_t *frame = NULL;
+ changelog_event_t ev = {
+ 0,
+ };
+
+ ev.ev_type = CHANGELOG_OP_TYPE_BR_RELEASE;
+ ev.u.releasebr.version = releaseversion;
+ ev.u.releasebr.sign_info = sign_info;
+ gf_uuid_copy(ev.u.releasebr.gfid, fd->inode->gfid);
+
+ xdata = dict_new();
+ if (!xdata) {
+ gf_smsg(this->name, GF_LOG_WARNING, ENOMEM, BRS_MSG_DICT_ALLOC_FAILED,
+ NULL);
+ goto out;
+ }
+
+ ret = dict_set_static_bin(xdata, "RELEASE-EVENT", &ev, CHANGELOG_EV_SIZE);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_SET_EVENT_FAILED, NULL);
+ goto dealloc_dict;
+ }
+
+ frame = create_frame(this, this->ctx->pool);
+ if (!frame) {
+ gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_CREATE_FRAME_FAILED,
+ NULL);
+ goto dealloc_dict;
+ }
+
+ op = GF_IPC_TARGET_CHANGELOG;
+ STACK_WIND(frame, br_stub_noop, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->ipc, op, xdata);
+
+dealloc_dict:
+ dict_unref(xdata);
+out:
+ return;
+}
+
+/**
+ * This is how the state machine of sign info works:
+ * 3 states:
+ * 1) BR_SIGN_NORMAL => The default State of the inode
+ * 2) BR_SIGN_REOPEN_WAIT => A release has been sent and is waiting for reopen
+ * 3) BR_SIGN_QUICK => reopen has happened and this release should trigger sign
+ * 2 events:
+ * 1) GF_FOP_RELEASE
+ * 2) GF_FOP_WRITE (actually a dummy write for BitD)
+ *
+ * This is how states are changed based on events:
+ * EVENT: GF_FOP_RELEASE:
+ * if (state == BR_SIGN_NORMAL) ; then
+ * set state = BR_SIGN_REOPEN_WAIT;
+ * if (state == BR_SIGN_QUICK); then
+ * set state = BR_SIGN_NORMAL;
+ * EVENT: GF_FOP_WRITE:
+ * if (state == BR_SIGN_REOPEN_WAIT); then
+ * set state = BR_SIGN_QUICK;
+ */
+br_sign_state_t
+__br_stub_inode_sign_state(br_stub_inode_ctx_t *ctx, glusterfs_fop_t fop,
+ fd_t *fd)
+{
+ br_sign_state_t sign_info = BR_SIGN_INVALID;
+
+ switch (fop) {
+ case GF_FOP_FSETXATTR:
+ sign_info = ctx->info_sign = BR_SIGN_QUICK;
+ break;
+
+ case GF_FOP_RELEASE:
+ GF_ASSERT(ctx->info_sign != BR_SIGN_REOPEN_WAIT);
+
+ if (ctx->info_sign == BR_SIGN_NORMAL) {
+ sign_info = ctx->info_sign = BR_SIGN_REOPEN_WAIT;
+ } else {
+ sign_info = ctx->info_sign;
+ ctx->info_sign = BR_SIGN_NORMAL;
+ }
+
+ break;
+ default:
+ break;
+ }
+
+ return sign_info;
+}
+
+int32_t
+br_stub_release(xlator_t *this, fd_t *fd)
+{
+ int32_t ret = 0;
+ int32_t flags = 0;
+ inode_t *inode = NULL;
+ unsigned long releaseversion = 0;
+ br_stub_inode_ctx_t *ctx = NULL;
+ uint64_t tmp = 0;
+ br_stub_fd_t *br_stub_fd = NULL;
+ int32_t signinfo = 0;
+
+ inode = fd->inode;
+
+ LOCK(&inode->lock);
+ {
+ ctx = __br_stub_get_ongoing_version_ctx(this, inode, NULL);
+ if (ctx == NULL)
+ goto unblock;
+ br_stub_fd = br_stub_fd_ctx_get(this, fd);
+ if (br_stub_fd) {
+ list_del_init(&br_stub_fd->list);
+ }
+
+ ret = __br_stub_can_trigger_release(inode, ctx, &releaseversion);
+ if (!ret)
+ goto unblock;
+
+ signinfo = __br_stub_inode_sign_state(ctx, GF_FOP_RELEASE, fd);
+ signinfo = htonl(signinfo);
+
+ /* inode back to initital state: mark dirty */
+ if (ctx->info_sign == BR_SIGN_NORMAL) {
+ __br_stub_mark_inode_dirty(ctx);
+ __br_stub_unset_inode_modified(ctx);
+ }
+ }
+unblock:
+ UNLOCK(&inode->lock);
+
+ if (ret) {
+ gf_msg_debug(this->name, 0,
+ "releaseversion: %lu | flags: %d "
+ "| signinfo: %d",
+ (unsigned long)ntohl(releaseversion), flags,
+ ntohl(signinfo));
+ br_stub_send_ipc_fop(this, fd, releaseversion, signinfo);
+ }
+
+ ret = fd_ctx_del(fd, this, &tmp);
+ br_stub_fd = (br_stub_fd_t *)(long)tmp;
+
+ GF_FREE(br_stub_fd);
+
+ return 0;
+}
+
+int32_t
+br_stub_releasedir(xlator_t *this, fd_t *fd)
+{
+ br_stub_fd_t *fctx = NULL;
+ uint64_t ctx = 0;
+ int ret = 0;
+
+ ret = fd_ctx_del(fd, this, &ctx);
+ if (ret < 0)
+ goto out;
+
+ fctx = (br_stub_fd_t *)(long)ctx;
+ if (fctx->bad_object.dir) {
+ ret = sys_closedir(fctx->bad_object.dir);
+ if (ret)
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_DIR_CLOSE_FAIL,
+ "error=%s", strerror(errno), NULL);
+ }
+
+ GF_FREE(fctx);
+out:
+ return 0;
+}
+
+/** }}} */
+
+/** {{{ */
+
+/* ictxmerge */
+
+void
+br_stub_ictxmerge(xlator_t *this, fd_t *fd, inode_t *inode,
+ inode_t *linked_inode)
+{
+ int32_t ret = 0;
+ uint64_t ctxaddr = 0;
+ uint64_t lctxaddr = 0;
+ br_stub_inode_ctx_t *ctx = NULL;
+ br_stub_inode_ctx_t *lctx = NULL;
+ br_stub_fd_t *br_stub_fd = NULL;
+
+ ret = br_stub_get_inode_ctx(this, inode, &ctxaddr);
+ if (ret < 0)
+ goto done;
+ ctx = (br_stub_inode_ctx_t *)(uintptr_t)ctxaddr;
+
+ LOCK(&linked_inode->lock);
+ {
+ ret = __br_stub_get_inode_ctx(this, linked_inode, &lctxaddr);
+ if (ret < 0)
+ goto unblock;
+ lctx = (br_stub_inode_ctx_t *)(uintptr_t)lctxaddr;
+
+ GF_ASSERT(list_is_singular(&ctx->fd_list));
+ br_stub_fd = list_first_entry(&ctx->fd_list, br_stub_fd_t, list);
+ if (br_stub_fd) {
+ GF_ASSERT(br_stub_fd->fd == fd);
+ list_move_tail(&br_stub_fd->list, &lctx->fd_list);
+ }
+ }
+unblock:
+ UNLOCK(&linked_inode->lock);
+
+done:
+ return;
+}
+
+/** }}} */
+
+struct xlator_fops fops = {
+ .lookup = br_stub_lookup,
+ .stat = br_stub_stat,
+ .fstat = br_stub_fstat,
+ .open = br_stub_open,
+ .create = br_stub_create,
+ .readdirp = br_stub_readdirp,
+ .getxattr = br_stub_getxattr,
+ .fgetxattr = br_stub_fgetxattr,
+ .fsetxattr = br_stub_fsetxattr,
+ .writev = br_stub_writev,
+ .truncate = br_stub_truncate,
+ .ftruncate = br_stub_ftruncate,
+ .mknod = br_stub_mknod,
+ .readv = br_stub_readv,
+ .removexattr = br_stub_removexattr,
+ .fremovexattr = br_stub_fremovexattr,
+ .setxattr = br_stub_setxattr,
+ .opendir = br_stub_opendir,
+ .readdir = br_stub_readdir,
+ .unlink = br_stub_unlink,
+};
+
+struct xlator_cbks cbks = {
+ .forget = br_stub_forget,
+ .release = br_stub_release,
+ .ictxmerge = br_stub_ictxmerge,
+};
+
+struct volume_options options[] = {
+ {.key = {"bitrot"},
+ .type = GF_OPTION_TYPE_BOOL,
+ .default_value = "off",
+ .op_version = {GD_OP_VERSION_3_7_0},
+ .flags = OPT_FLAG_SETTABLE | OPT_FLAG_FORCE,
+ .tags = {"bitrot"},
+ .description = "enable/disable bitrot stub"},
+ {.key = {"export"},
+ .type = GF_OPTION_TYPE_PATH,
+ .op_version = {GD_OP_VERSION_3_7_0},
+ .tags = {"bitrot"},
+ .description = "brick path for versioning",
+ .default_value = "{{ brick.path }}"},
+ {.key = {NULL}},
+};
+
+xlator_api_t xlator_api = {
+ .init = init,
+ .fini = fini,
+ .notify = notify,
+ .reconfigure = reconfigure,
+ .mem_acct_init = mem_acct_init,
+ .op_version = {1}, /* Present from the initial version */
+ .fops = &fops,
+ .cbks = &cbks,
+ .options = options,
+ .identifier = "bitrot-stub",
+ .category = GF_MAINTAINED,
+};
diff --git a/xlators/features/bit-rot/src/stub/bit-rot-stub.h b/xlators/features/bit-rot/src/stub/bit-rot-stub.h
new file mode 100644
index 00000000000..edd79a77e4f
--- /dev/null
+++ b/xlators/features/bit-rot/src/stub/bit-rot-stub.h
@@ -0,0 +1,515 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+#ifndef __BIT_ROT_STUB_H__
+#define __BIT_ROT_STUB_H__
+
+#include <glusterfs/glusterfs.h>
+#include <glusterfs/logging.h>
+#include <glusterfs/dict.h>
+#include <glusterfs/xlator.h>
+#include <glusterfs/defaults.h>
+#include <glusterfs/call-stub.h>
+#include "bit-rot-stub-mem-types.h"
+#include <glusterfs/syscall.h>
+#include <glusterfs/common-utils.h>
+#include "bit-rot-common.h"
+#include "bit-rot-stub-messages.h"
+#include "glusterfs3-xdr.h"
+#include <glusterfs/syncop.h>
+#include <glusterfs/syncop-utils.h>
+
+#define BAD_OBJECT_THREAD_STACK_SIZE ((size_t)(1024 * 1024))
+#define BR_STUB_DUMP_STR_SIZE 65536
+
+#define BR_PATH_MAX_EXTRA (PATH_MAX + 1024)
+#define BR_PATH_MAX_PLUS (PATH_MAX + 2048)
+
+/*
+ * Oops. Spelling mistake. Correcting it
+ */
+#define OLD_BR_STUB_QUARANTINE_DIR GF_HIDDEN_PATH "/quanrantine"
+#define BR_STUB_QUARANTINE_DIR GF_HIDDEN_PATH "/quarantine"
+
+/* do not reference frame->local in cbk unless initialized.
+ * Assigned 0x1 marks verisoning flag between call path and
+ * cbk path.
+ */
+#define BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, label) \
+ do { \
+ if (priv->do_versioning) \
+ frame->local = (void *)0x1; \
+ else \
+ goto label; \
+ } while (0)
+
+#define BR_STUB_VER_COND_GOTO(priv, cond, label) \
+ do { \
+ if (!priv->do_versioning || cond) \
+ goto label; \
+ } while (0)
+
+#define BR_STUB_VER_ENABLED_IN_CALLPATH(frame, flag) \
+ do { \
+ if (frame->local) \
+ flag = _gf_true; \
+ if (frame->local == (void *)0x1) \
+ frame->local = NULL; \
+ } while (0)
+
+#define BR_STUB_RESET_LOCAL_NULL(frame) \
+ do { \
+ if (frame->local == (void *)0x1) \
+ frame->local = NULL; \
+ } while (0)
+
+typedef int(br_stub_version_cbk)(call_frame_t *, void *, xlator_t *, int32_t,
+ int32_t, dict_t *);
+
+typedef struct br_stub_inode_ctx {
+ int need_writeback; /* does the inode need
+ a writeback to disk? */
+ unsigned long currentversion; /* ongoing version */
+
+ int info_sign;
+ struct list_head fd_list; /* list of open fds or fds participating in
+ write operations */
+ gf_boolean_t bad_object;
+} br_stub_inode_ctx_t;
+
+typedef struct br_stub_fd {
+ fd_t *fd;
+ struct list_head list;
+ struct bad_object_dir {
+ DIR *dir;
+ off_t dir_eof;
+ } bad_object;
+} br_stub_fd_t;
+
+#define I_DIRTY (1 << 0) /* inode needs writeback */
+#define I_MODIFIED (1 << 1)
+#define WRITEBACK_DURABLE 1 /* writeback is durable */
+
+/**
+ * This could just have been a plain struct without unions and all,
+ * but we may need additional things in the future.
+ */
+typedef struct br_stub_local {
+ call_stub_t *fopstub; /* stub for original fop */
+
+ int versioningtype; /* not much used atm */
+
+ union {
+ struct br_stub_ctx {
+ fd_t *fd;
+ uuid_t gfid;
+ inode_t *inode;
+ unsigned long version;
+ } context;
+ } u;
+} br_stub_local_t;
+
+#define BR_STUB_NO_VERSIONING (1 << 0)
+#define BR_STUB_INCREMENTAL_VERSIONING (1 << 1)
+
+typedef struct br_stub_private {
+ gf_boolean_t do_versioning;
+
+ uint32_t boot[2];
+ char export[PATH_MAX];
+
+ pthread_mutex_t lock;
+ pthread_cond_t cond;
+
+ struct list_head squeue; /* ordered signing queue */
+ pthread_t signth;
+ struct bad_objects_container {
+ pthread_t thread;
+ pthread_mutex_t bad_lock;
+ pthread_cond_t bad_cond;
+ struct list_head bad_queue;
+ } container;
+ struct mem_pool *local_pool;
+
+ char stub_basepath[BR_PATH_MAX_EXTRA];
+
+ uuid_t bad_object_dir_gfid;
+} br_stub_private_t;
+
+br_stub_fd_t *
+br_stub_fd_new(void);
+
+int
+__br_stub_fd_ctx_set(xlator_t *this, fd_t *fd, br_stub_fd_t *br_stub_fd);
+
+br_stub_fd_t *
+__br_stub_fd_ctx_get(xlator_t *this, fd_t *fd);
+
+br_stub_fd_t *
+br_stub_fd_ctx_get(xlator_t *this, fd_t *fd);
+
+int32_t
+br_stub_fd_ctx_set(xlator_t *this, fd_t *fd, br_stub_fd_t *br_stub_fd);
+
+static inline gf_boolean_t
+__br_stub_is_bad_object(br_stub_inode_ctx_t *ctx)
+{
+ return ctx->bad_object;
+}
+
+static inline void
+__br_stub_mark_object_bad(br_stub_inode_ctx_t *ctx)
+{
+ ctx->bad_object = _gf_true;
+}
+
+/* inode writeback helpers */
+static inline void
+__br_stub_mark_inode_dirty(br_stub_inode_ctx_t *ctx)
+{
+ ctx->need_writeback |= I_DIRTY;
+}
+
+static inline void
+__br_stub_mark_inode_synced(br_stub_inode_ctx_t *ctx)
+{
+ ctx->need_writeback &= ~I_DIRTY;
+}
+
+static inline int
+__br_stub_is_inode_dirty(br_stub_inode_ctx_t *ctx)
+{
+ return (ctx->need_writeback & I_DIRTY);
+}
+
+/* inode mofification markers */
+static inline void
+__br_stub_set_inode_modified(br_stub_inode_ctx_t *ctx)
+{
+ ctx->need_writeback |= I_MODIFIED;
+}
+
+static inline void
+__br_stub_unset_inode_modified(br_stub_inode_ctx_t *ctx)
+{
+ ctx->need_writeback &= ~I_MODIFIED;
+}
+
+static inline int
+__br_stub_is_inode_modified(br_stub_inode_ctx_t *ctx)
+{
+ return (ctx->need_writeback & I_MODIFIED);
+}
+
+static inline int
+br_stub_require_release_call(xlator_t *this, fd_t *fd, br_stub_fd_t **fd_ctx)
+{
+ int32_t ret = 0;
+ br_stub_fd_t *br_stub_fd = NULL;
+
+ br_stub_fd = br_stub_fd_new();
+ if (!br_stub_fd)
+ return -1;
+
+ br_stub_fd->fd = fd;
+ INIT_LIST_HEAD(&br_stub_fd->list);
+
+ ret = br_stub_fd_ctx_set(this, fd, br_stub_fd);
+ if (ret)
+ gf_smsg(this->name, GF_LOG_WARNING, 0, BRS_MSG_SET_CONTEXT_FAILED,
+ NULL);
+ else
+ *fd_ctx = br_stub_fd;
+
+ return ret;
+}
+
+/* get/set inode context helpers */
+
+static inline int
+__br_stub_get_inode_ctx(xlator_t *this, inode_t *inode, uint64_t *ctx)
+{
+ return __inode_ctx_get(inode, this, ctx);
+}
+
+static inline int
+br_stub_get_inode_ctx(xlator_t *this, inode_t *inode, uint64_t *ctx)
+{
+ int ret = -1;
+
+ LOCK(&inode->lock);
+ {
+ ret = __br_stub_get_inode_ctx(this, inode, ctx);
+ }
+ UNLOCK(&inode->lock);
+
+ return ret;
+}
+
+static inline int
+br_stub_set_inode_ctx(xlator_t *this, inode_t *inode, br_stub_inode_ctx_t *ctx)
+{
+ uint64_t ctx_addr = (uint64_t)(uintptr_t)ctx;
+ return inode_ctx_set(inode, this, &ctx_addr);
+}
+
+/* version get/set helpers */
+
+static inline unsigned long
+__br_stub_writeback_version(br_stub_inode_ctx_t *ctx)
+{
+ return (ctx->currentversion + 1);
+}
+
+static inline void
+__br_stub_set_ongoing_version(br_stub_inode_ctx_t *ctx, unsigned long version)
+{
+ if (ctx->currentversion < version)
+ ctx->currentversion = version;
+ else
+ gf_smsg("bit-rot-stub", GF_LOG_WARNING, 0,
+ BRS_MSG_CHANGE_VERSION_FAILED, "current version=%lu",
+ ctx->currentversion, "new version=%lu", version, NULL);
+}
+
+static inline int
+__br_stub_can_trigger_release(inode_t *inode, br_stub_inode_ctx_t *ctx,
+ unsigned long *version)
+{
+ /**
+ * If the inode is modified, then it has to be dirty. An inode is
+ * marked dirty once version is increased. Its marked as modified
+ * when the modification call (write/truncate) which triggered
+ * the versioning is successful.
+ */
+ if (__br_stub_is_inode_modified(ctx) && list_empty(&ctx->fd_list) &&
+ (ctx->info_sign != BR_SIGN_REOPEN_WAIT)) {
+ GF_ASSERT(__br_stub_is_inode_dirty(ctx) == 0);
+
+ if (version)
+ *version = htonl(ctx->currentversion);
+ return 1;
+ }
+
+ return 0;
+}
+
+static inline int32_t
+br_stub_get_ongoing_version(xlator_t *this, inode_t *inode,
+ unsigned long *version)
+{
+ int32_t ret = 0;
+ uint64_t ctx_addr = 0;
+ br_stub_inode_ctx_t *ctx = NULL;
+
+ LOCK(&inode->lock);
+ {
+ ret = __inode_ctx_get(inode, this, &ctx_addr);
+ if (ret < 0)
+ goto unblock;
+ ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
+ *version = ctx->currentversion;
+ }
+unblock:
+ UNLOCK(&inode->lock);
+
+ return ret;
+}
+
+/**
+ * fetch the current version from inode and return the context.
+ * inode->lock should be held before invoking this as context
+ * *needs* to be valid in the caller.
+ */
+static inline br_stub_inode_ctx_t *
+__br_stub_get_ongoing_version_ctx(xlator_t *this, inode_t *inode,
+ unsigned long *version)
+{
+ int32_t ret = 0;
+ uint64_t ctx_addr = 0;
+ br_stub_inode_ctx_t *ctx = NULL;
+
+ ret = __inode_ctx_get(inode, this, &ctx_addr);
+ if (ret < 0)
+ return NULL;
+ ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
+ if (version)
+ *version = ctx->currentversion;
+
+ return ctx;
+}
+
+/* filter for xattr fetch */
+static inline int
+br_stub_is_internal_xattr(const char *name)
+{
+ if (name && ((strncmp(name, BITROT_CURRENT_VERSION_KEY,
+ SLEN(BITROT_CURRENT_VERSION_KEY)) == 0) ||
+ (strncmp(name, BITROT_SIGNING_VERSION_KEY,
+ SLEN(BITROT_SIGNING_VERSION_KEY)) == 0)))
+ return 1;
+ return 0;
+}
+
+static inline void
+br_stub_remove_vxattrs(dict_t *xattr, gf_boolean_t remove_bad_marker)
+{
+ if (xattr) {
+ /*
+ * When a file is corrupted, bad-object should be
+ * set in the dict. But, other info such as version,
+ * signature etc should not be set. Hence the flag
+ * remove_bad_marker. The consumer should know whether
+ * to send the bad-object info in the dict or not.
+ */
+ if (remove_bad_marker)
+ dict_del(xattr, BITROT_OBJECT_BAD_KEY);
+ dict_del(xattr, BITROT_CURRENT_VERSION_KEY);
+ dict_del(xattr, BITROT_SIGNING_VERSION_KEY);
+ dict_del(xattr, BITROT_SIGNING_XATTR_SIZE_KEY);
+ }
+}
+
+/**
+ * This function returns the below values for different situations
+ * 0 => as per the inode context object is not bad
+ * -1 => Failed to get the inode context itself
+ * -2 => As per the inode context object is bad
+ * Both -ve values means the fop which called this function is failed
+ * and error is returned upwards.
+ * In future if needed or more errors have to be handled, then those
+ * errors can be made into enums.
+ */
+static inline int
+br_stub_is_bad_object(xlator_t *this, inode_t *inode)
+{
+ int bad_object = 0;
+ gf_boolean_t tmp = _gf_false;
+ uint64_t ctx_addr = 0;
+ br_stub_inode_ctx_t *ctx = NULL;
+ int32_t ret = -1;
+
+ ret = br_stub_get_inode_ctx(this, inode, &ctx_addr);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_GET_INODE_CONTEXT_FAILED,
+ "inode-gfid=%s", uuid_utoa(inode->gfid), NULL);
+ bad_object = -1;
+ goto out;
+ }
+
+ ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
+
+ LOCK(&inode->lock);
+ {
+ tmp = __br_stub_is_bad_object(ctx);
+ if (tmp)
+ bad_object = -2;
+ }
+ UNLOCK(&inode->lock);
+
+out:
+ return bad_object;
+}
+
+static inline int32_t
+br_stub_mark_object_bad(xlator_t *this, inode_t *inode)
+{
+ int32_t ret = -1;
+ uint64_t ctx_addr = 0;
+ br_stub_inode_ctx_t *ctx = NULL;
+
+ ret = br_stub_get_inode_ctx(this, inode, &ctx_addr);
+ if (ret) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, BRS_MSG_GET_INODE_CONTEXT_FAILED,
+ "inode-gfid=%s", uuid_utoa(inode->gfid), NULL);
+ goto out;
+ }
+
+ ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
+
+ LOCK(&inode->lock);
+ {
+ __br_stub_mark_object_bad(ctx);
+ }
+ UNLOCK(&inode->lock);
+
+out:
+ return ret;
+}
+
+/**
+ * There is a possibility that dict_set might fail. The o/p of dict_set is
+ * given to the caller and the caller has to decide what to do.
+ */
+static inline int32_t
+br_stub_mark_xdata_bad_object(xlator_t *this, inode_t *inode, dict_t *xdata)
+{
+ int32_t ret = 0;
+
+ if (br_stub_is_bad_object(this, inode) == -2)
+ ret = dict_set_int32(xdata, GLUSTERFS_BAD_INODE, 1);
+
+ return ret;
+}
+
+int32_t
+br_stub_add_fd_to_inode(xlator_t *this, fd_t *fd, br_stub_inode_ctx_t *ctx);
+
+br_sign_state_t
+__br_stub_inode_sign_state(br_stub_inode_ctx_t *ctx, glusterfs_fop_t fop,
+ fd_t *fd);
+
+int
+br_stub_dir_create(xlator_t *this, br_stub_private_t *priv);
+
+int
+br_stub_add(xlator_t *this, uuid_t gfid);
+
+int32_t
+br_stub_create_stub_gfid(xlator_t *this, char *stub_gfid_path, uuid_t gfid);
+
+int
+br_stub_dir_create(xlator_t *this, br_stub_private_t *priv);
+
+call_stub_t *
+__br_stub_dequeue(struct list_head *callstubs);
+
+void
+__br_stub_enqueue(struct list_head *callstubs, call_stub_t *stub);
+
+void
+br_stub_worker_enqueue(xlator_t *this, call_stub_t *stub);
+
+void *
+br_stub_worker(void *data);
+
+int32_t
+br_stub_lookup_wrapper(call_frame_t *frame, xlator_t *this, loc_t *loc,
+ dict_t *xattr_req);
+
+int32_t
+br_stub_readdir_wrapper(call_frame_t *frame, xlator_t *this, fd_t *fd,
+ size_t size, off_t off, dict_t *xdata);
+
+int
+br_stub_del(xlator_t *this, uuid_t gfid);
+
+int
+br_stub_bad_objects_path(xlator_t *this, fd_t *fd, gf_dirent_t *entries,
+ dict_t **dict);
+
+void
+br_stub_entry_xattr_fill(xlator_t *this, char *hpath, gf_dirent_t *entry,
+ dict_t *dict);
+
+int
+br_stub_get_path_of_gfid(xlator_t *this, inode_t *parent, inode_t *inode,
+ uuid_t gfid, char **path);
+
+#endif /* __BIT_ROT_STUB_H__ */