/* Copyright (c) 2015 Red Hat, Inc. This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser General Public License, version 3 or any later version (LGPLv3 or later), or the GNU General Public License, version 2 (GPLv2), in all cases as published by the Free Software Foundation. */

/* NOTE(review): the header names of the following bare #include directives
 * were lost (likely angle-bracket text stripped during extraction) — restore
 * them from the upstream bit-rot-stub.c before building. */
#include
#include
#include
#include
#include
#include "changelog.h"
#include
#include
#include "bit-rot-stub.h"
#include "bit-rot-stub-mem-types.h"
#include "bit-rot-stub-messages.h"
#include "bit-rot-common.h"

/* cookie used to tag fops wound down by this xlator */
#define BR_STUB_REQUEST_COOKIE 0x1

/* pthread cancellation cleanup handler: releases the mutex a cancelled
 * thread may still be holding (see the signer thread, br_stub_signth). */
void
br_stub_lock_cleaner(void *arg)
{
    pthread_mutex_t *clean_mutex = arg;

    pthread_mutex_unlock(clean_mutex);
    return;
}

void *
br_stub_signth(void *);

/* one queued signing request; kept ordered by version 'v' in priv->squeue */
struct br_stub_signentry {
    unsigned long v;
    call_stub_t *stub;
    struct list_head list;
};

/* Standard xlator memory-accounting initialisation.
 * Returns 0 on success, non-zero on failure. */
int32_t
mem_acct_init(xlator_t *this)
{
    int32_t ret = -1;

    if (!this)
        return ret;

    ret = xlator_mem_acct_init(this, gf_br_stub_mt_end + 1);

    if (ret != 0) {
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_MEM_ACNT_FAILED,
               "Memory accounting init failed");
        return ret;
    }

    return ret;
}

/* Set up the "bad object" container: condition variable, mutex, worker
 * thread attributes (continues past this chunk: queue init, quarantine
 * dir creation and worker-thread spawn). Returns 0 / -1. */
int
br_stub_bad_object_container_init(xlator_t *this, br_stub_private_t *priv)
{
    pthread_attr_t w_attr;
    int ret = -1;

    ret = pthread_cond_init(&priv->container.bad_cond, NULL);
    if (ret != 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_THREAD_FAIL,
               "pthread_cond_init failed (%d)", ret);
        goto out;
    }

    ret = pthread_mutex_init(&priv->container.bad_lock, NULL);
    if (ret != 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_THREAD_FAIL,
               "pthread_mutex_init failed (%d)", ret);
        goto cleanup_cond;
    }

    ret = pthread_attr_init(&w_attr);
    if (ret != 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_THREAD_FAIL,
               "pthread_attr_init failed (%d)", ret);
        goto cleanup_lock;
    }

    ret = pthread_attr_setstacksize(&w_attr, BAD_OBJECT_THREAD_STACK_SIZE);
    if (ret == EINVAL) {
        /* non-fatal: fall back to the default stack size */
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_BAD_OBJ_THREAD_FAIL,
               "Using default thread stack size");
    }
    INIT_LIST_HEAD(&priv->container.bad_queue);

    /* create the on-disk quarantine directory before spawning the worker */
    ret = br_stub_dir_create(this, priv);
    if (ret < 0)
        goto cleanup_lock;

    ret = gf_thread_create(&priv->container.thread, &w_attr, br_stub_worker,
                           this, "brswrker");
    if (ret)
        goto cleanup_attr;

    return 0;

cleanup_attr:
    pthread_attr_destroy(&w_attr);
cleanup_lock:
    pthread_mutex_destroy(&priv->container.bad_lock);
cleanup_cond:
    pthread_cond_destroy(&priv->container.bad_cond);
out:
    return -1;
}

/* xlator init: allocate private state, read the "bitrot"/"export" options,
 * record boot time, initialise the sign queue, and — only when bitrot is
 * enabled — spawn the signer thread and the bad-object container. */
int32_t
init(xlator_t *this)
{
    int ret = 0;
    char *tmp = NULL;
    struct timeval tv = {
        0,
    };
    br_stub_private_t *priv = NULL;

    if (!this->children) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_NO_CHILD,
               "FATAL: no children");
        goto error_return;
    }

    priv = GF_CALLOC(1, sizeof(*priv), gf_br_stub_mt_private_t);
    if (!priv)
        goto error_return;

    priv->local_pool = mem_pool_new(br_stub_local_t, 512);
    if (!priv->local_pool)
        goto free_priv;

    GF_OPTION_INIT("bitrot", priv->do_versioning, bool, free_mempool);

    GF_OPTION_INIT("export", tmp, str, free_mempool);

    /* reject export / quarantine paths that would be truncated */
    if (snprintf(priv->export, PATH_MAX, "%s", tmp) >= PATH_MAX)
        goto free_mempool;

    if (snprintf(priv->stub_basepath, sizeof(priv->stub_basepath), "%s/%s",
                 priv->export, BR_STUB_QUARANTINE_DIR) >=
        sizeof(priv->stub_basepath))
        goto free_mempool;

    (void)gettimeofday(&tv, NULL);

    /* boot time is in network endian format */
    priv->boot[0] = htonl(tv.tv_sec);
    priv->boot[1] = htonl(tv.tv_usec);

    pthread_mutex_init(&priv->lock, NULL);
    pthread_cond_init(&priv->cond, NULL);
    INIT_LIST_HEAD(&priv->squeue);

    /* Thread creations need 'this' to be passed so that THIS can be
     * assigned inside the thread. So setting this->private here.
     */
    this->private = priv;

    if (!priv->do_versioning)
        return 0;

    ret = gf_thread_create(&priv->signth, NULL, br_stub_signth, this,
                           "brssign");
    if (ret != 0) {
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_SPAWN_SIGN_THRD_FAILED,
               "failed to create the new thread for signer");
        goto cleanup_lock;
    }

    ret = br_stub_bad_object_container_init(this, priv);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_CONTAINER_FAIL,
               "failed to launch the thread for storing bad gfids");
        goto cleanup_lock;
    }

    gf_msg_debug(this->name, 0, "bit-rot stub loaded");

    return 0;

cleanup_lock:
    pthread_cond_destroy(&priv->cond);
    pthread_mutex_destroy(&priv->lock);
free_mempool:
    mem_pool_destroy(priv->local_pool);
free_priv:
    GF_FREE(priv);
    this->private = NULL;
error_return:
    return -1;
}

/* TODO:
 * As of now enabling bitrot option does 2 things.
 * 1) Start the Bitrot Daemon which signs the objects (currently files only)
 *    upon getting notified by the stub.
 * 2) Enable versioning of the objects. Object versions (again files only) are
 *    incremented upon modification.
 * So object versioning is tied to bitrot daemon's signing. In future, object
 * versioning might be necessary for other things as well apart from bit-rot
 * detection (well that's the objective of bringing in object-versioning :)).
 * In that case, better to make versioning a new option and letting it to be
 * enabled despite bit-rot detection is not needed.
 * Ex: ICAP.
 */
/* Toggle bitrot at runtime: spawn the signer thread and the bad-object
 * container when enabling; cancel both threads when disabling. */
int32_t
reconfigure(xlator_t *this, dict_t *options)
{
    int32_t ret = -1;
    br_stub_private_t *priv = NULL;

    priv = this->private;

    GF_OPTION_RECONF("bitrot", priv->do_versioning, options, bool, err);

    if (priv->do_versioning && !priv->signth) {
        ret = gf_thread_create(&priv->signth, NULL, br_stub_signth, this,
                               "brssign");
        if (ret != 0) {
            gf_msg(this->name, GF_LOG_WARNING, 0,
                   BRS_MSG_SPAWN_SIGN_THRD_FAILED,
                   "failed to create the new thread for signer");
            goto err;
        }

        ret = br_stub_bad_object_container_init(this, priv);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_CONTAINER_FAIL,
                   "failed to launch the thread for storing bad gfids");
            goto err;
        }
    } else {
        /* disabling (or already running): tear the threads down */
        if (priv->signth) {
            if (gf_thread_cleanup_xint(priv->signth)) {
                gf_msg(this->name, GF_LOG_ERROR, 0,
                       BRS_MSG_CANCEL_SIGN_THREAD_FAILED,
                       "Could not cancel sign serializer thread");
            } else {
                gf_msg(this->name, GF_LOG_INFO, 0, BRS_MSG_KILL_SIGN_THREAD,
                       "killed the signer thread");
                priv->signth = 0;
            }
        }
        if (priv->container.thread) {
            if (gf_thread_cleanup_xint(priv->container.thread)) {
                gf_msg(this->name, GF_LOG_ERROR, 0,
                       BRS_MSG_CANCEL_SIGN_THREAD_FAILED,
                       "Could not cancel sign serializer thread");
            }
            priv->container.thread = 0;
        }
    }

    ret = 0;
    return ret;

err:
    /* enable path failed part-way: roll back whichever thread came up */
    if (priv->signth) {
        if (gf_thread_cleanup_xint(priv->signth)) {
            gf_msg(this->name, GF_LOG_ERROR, 0,
                   BRS_MSG_CANCEL_SIGN_THREAD_FAILED,
                   "Could not cancel sign serializer thread");
        }
        priv->signth = 0;
    }
    if (priv->container.thread) {
        if (gf_thread_cleanup_xint(priv->container.thread)) {
            gf_msg(this->name, GF_LOG_ERROR, 0,
                   BRS_MSG_CANCEL_SIGN_THREAD_FAILED,
                   "Could not cancel sign serializer thread");
        }
        priv->container.thread = 0;
    }
    ret = -1;
    return ret;
}

/* Pass all notifications straight through to the default handler. */
int
notify(xlator_t *this, int event, void *data, ...)
{
    br_stub_private_t *priv = NULL;

    if (!this)
        return 0;

    priv = this->private;
    if (!priv)
        return 0;

    default_notify(this, event, data);
    return 0;
}

/* xlator fini: cancel the worker threads, drain the sign and bad-object
 * queues, destroy the synchronisation primitives and free private state.
 * On thread-cancel failure it bails out early (goto out), deliberately
 * leaking rather than destroying state a live thread may still use. */
void
fini(xlator_t *this)
{
    int32_t ret = 0;
    br_stub_private_t *priv = this->private;
    struct br_stub_signentry *sigstub = NULL;
    call_stub_t *stub = NULL;

    if (!priv)
        return;

    if (!priv->do_versioning)
        goto cleanup;

    ret = gf_thread_cleanup_xint(priv->signth);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_CANCEL_SIGN_THREAD_FAILED,
               "Could not cancel sign serializer thread");
        goto out;
    }
    priv->signth = 0;

    /* drain pending signing requests */
    while (!list_empty(&priv->squeue)) {
        sigstub = list_first_entry(&priv->squeue, struct br_stub_signentry,
                                   list);
        list_del_init(&sigstub->list);

        call_stub_destroy(sigstub->stub);
        GF_FREE(sigstub);
    }

    ret = gf_thread_cleanup_xint(priv->container.thread);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_CANCEL_SIGN_THREAD_FAILED,
               "Could not cancel sign serializer thread");
        goto out;
    }
    priv->container.thread = 0;

    /* drain queued bad-object stubs */
    while (!list_empty(&priv->container.bad_queue)) {
        stub = list_first_entry(&priv->container.bad_queue, call_stub_t, list);
        list_del_init(&stub->list);
        call_stub_destroy(stub);
    }

    pthread_mutex_destroy(&priv->container.bad_lock);
    pthread_cond_destroy(&priv->container.bad_cond);

cleanup:
    pthread_mutex_destroy(&priv->lock);
    pthread_cond_destroy(&priv->cond);

    if (priv->local_pool) {
        mem_pool_destroy(priv->local_pool);
        priv->local_pool = NULL;
    }

    this->private = NULL;
    GF_FREE(priv);

out:
    return;
}

/* Allocate the version and/or signature buffer in ONE contiguous
 * allocation (signature buffer follows the version buffer). The caller
 * frees the whole region via br_stub_dealloc_versions on the FIRST
 * returned pointer. Returns 0 on success, -1 on allocation failure. */
static int
br_stub_alloc_versions(br_version_t **obuf, br_signature_t **sbuf,
                       size_t signaturelen)
{
    void *mem = NULL;
    size_t size = 0;

    if (obuf)
        size += sizeof(br_version_t);
    if (sbuf)
        size += sizeof(br_signature_t) + signaturelen;

    mem = GF_CALLOC(1, size, gf_br_stub_mt_version_t);
    if (!mem)
        goto error_return;

    if (obuf) {
        *obuf = (br_version_t *)mem;
        mem = ((char *)mem + sizeof(br_version_t));
    }
    if (sbuf) {
        *sbuf = (br_signature_t *)mem;
    }

    return 0;

error_return:
    return -1;
}

/* Free a buffer obtained from br_stub_alloc_versions. */
static void
br_stub_dealloc_versions(void *mem)
{
    GF_FREE(mem);
}

/* Fetch a zeroed local from this xlator's mem-pool. */
static br_stub_local_t *
br_stub_alloc_local(xlator_t *this)
{
    br_stub_private_t *priv = this->private;

    return mem_get0(priv->local_pool);
}

/* Return a local to the mem-pool (no-op on NULL). */
static void
br_stub_dealloc_local(br_stub_local_t *ptr)
{
    if (!ptr)
        return;

    mem_put(ptr);
}

/* Fill 'obuf' with the ongoing version + boot time and stash it in
 * 'dict' under the current-version key for an fsetxattr request. */
static int
br_stub_prepare_version_request(xlator_t *this, dict_t *dict,
                                br_version_t *obuf, unsigned long oversion)
{
    br_stub_private_t *priv = NULL;

    priv = this->private;
    br_set_ongoingversion(obuf, oversion, priv->boot);

    return dict_set_static_bin(dict, BITROT_CURRENT_VERSION_KEY, (void *)obuf,
                               sizeof(br_version_t));
}

/* Copy the incoming signature into 'sbuf' and stash it in 'dict' under
 * the signing-version key. */
static int
br_stub_prepare_signing_request(dict_t *dict, br_signature_t *sbuf,
                                br_isignature_t *sign, size_t signaturelen)
{
    size_t size = 0;

    br_set_signature(sbuf, sign, signaturelen, &size);

    return dict_set_static_bin(dict, BITROT_SIGNING_VERSION_KEY, (void *)sbuf,
                               size);
}

/**
 * initialize an inode context starting with a given ongoing version.
 * a fresh lookup() or a first creat() call initializes the inode
 * context, hence the inode is marked dirty. this routine also
 * initializes the transient inode version.
 */
static int
br_stub_init_inode_versions(xlator_t *this, fd_t *fd, inode_t *inode,
                            unsigned long version, gf_boolean_t markdirty,
                            gf_boolean_t bad_object, uint64_t *ctx_addr)
{
    int32_t ret = 0;
    br_stub_inode_ctx_t *ctx = NULL;

    ctx = GF_CALLOC(1, sizeof(br_stub_inode_ctx_t), gf_br_stub_mt_inode_ctx_t);
    if (!ctx)
        goto error_return;

    INIT_LIST_HEAD(&ctx->fd_list);
    (markdirty) ? __br_stub_mark_inode_dirty(ctx)
                : __br_stub_mark_inode_synced(ctx);
    __br_stub_set_ongoing_version(ctx, version);

    if (bad_object)
        __br_stub_mark_object_bad(ctx);

    if (fd) {
        ret = br_stub_add_fd_to_inode(this, fd, ctx);
        if (ret)
            goto free_ctx;
    }

    ret = br_stub_set_inode_ctx(this, inode, ctx);
    if (ret)
        goto free_ctx;

    if (ctx_addr)
        *ctx_addr = (uint64_t)(uintptr_t)ctx;
    return 0;

free_ctx:
    GF_FREE(ctx);
error_return:
    return -1;
}

/**
 * modify the ongoing version of an inode.
 */
static int
br_stub_mod_inode_versions(xlator_t *this, fd_t *fd, inode_t *inode,
                           unsigned long version)
{
    int32_t ret = -1;
    br_stub_inode_ctx_t *ctx = 0;

    LOCK(&inode->lock);
    {
        ctx = __br_stub_get_ongoing_version_ctx(this, inode, NULL);
        if (ctx == NULL)
            goto unblock;
        /* only a dirty inode picks up the new on-disk version */
        if (__br_stub_is_inode_dirty(ctx)) {
            __br_stub_set_ongoing_version(ctx, version);
            __br_stub_mark_inode_synced(ctx);
        }

        ret = 0;
    }
unblock:
    UNLOCK(&inode->lock);

    return ret;
}

/* Populate a local with the resumable fop stub, refs on fd/inode, the
 * gfid and the in-memory version for an ongoing versioning fop. */
static void
br_stub_fill_local(br_stub_local_t *local, call_stub_t *stub, fd_t *fd,
                   inode_t *inode, uuid_t gfid, int versioningtype,
                   unsigned long memversion)
{
    local->fopstub = stub;
    local->versioningtype = versioningtype;
    local->u.context.version = memversion;
    if (fd)
        local->u.context.fd = fd_ref(fd);
    if (inode)
        local->u.context.inode = inode_ref(inode);
    gf_uuid_copy(local->u.context.gfid, gfid);
}

/* Drop the refs taken by br_stub_fill_local and reset the local. */
static void
br_stub_cleanup_local(br_stub_local_t *local)
{
    if (!local)
        return;

    local->fopstub = NULL;
    local->versioningtype = 0;
    local->u.context.version = 0;
    if (local->u.context.fd) {
        fd_unref(local->u.context.fd);
        local->u.context.fd = NULL;
    }
    if (local->u.context.inode) {
        inode_unref(local->u.context.inode);
        local->u.context.inode = NULL;
    }
    memset(local->u.context.gfid, '\0', sizeof(uuid_t));
}

/* Decide whether the object behind 'fd' needs (re)versioning and whether
 * it has been modified; initialises the inode ctx on demand. */
static int
br_stub_need_versioning(xlator_t *this, fd_t *fd, gf_boolean_t *versioning,
                        gf_boolean_t *modified, br_stub_inode_ctx_t **ctx)
{
    int32_t ret = -1;
    uint64_t ctx_addr = 0;
    br_stub_inode_ctx_t *c = NULL;
    unsigned long version = BITROT_DEFAULT_CURRENT_VERSION;

    *versioning = _gf_false;
    *modified = _gf_false;

    /* Bitrot stub inode context was initialized only in lookup, create
     * and mknod cbk path. Object versioning was enabled by default
     * irrespective of bitrot enabled or not. But it's made optional now.
     * As a consequence there could be cases where getting inode ctx would
     * fail because it's not set yet.
     * e.g., If versioning (with bitrot enable) is enabled while I/O is
     * happening, it could directly get other fops like writev without
     * lookup, where getting inode ctx would fail. Hence initialize the
     * inode ctx on failure to get ctx. This is done in all places where
     * applicable.
     */
    ret = br_stub_get_inode_ctx(this, fd->inode, &ctx_addr);
    if (ret < 0) {
        ret = br_stub_init_inode_versions(this, fd, fd->inode, version,
                                          _gf_true, _gf_false, &ctx_addr);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0,
                   BRS_MSG_GET_INODE_CONTEXT_FAILED,
                   "failed to "
                   " init the inode context for the inode %s",
                   uuid_utoa(fd->inode->gfid));
            goto error_return;
        }
    }

    c = (br_stub_inode_ctx_t *)(long)ctx_addr;

    LOCK(&fd->inode->lock);
    {
        if (__br_stub_is_inode_dirty(c))
            *versioning = _gf_true;
        if (__br_stub_is_inode_modified(c))
            *modified = _gf_true;
    }
    UNLOCK(&fd->inode->lock);

    if (ctx)
        *ctx = c;
    return 0;

error_return:
    return -1;
}

/* For an anonymous fd, make sure the fd is tracked in the inode ctx's
 * fd list (release notification depends on it). */
static int32_t
br_stub_anon_fd_ctx(xlator_t *this, fd_t *fd, br_stub_inode_ctx_t *ctx)
{
    int32_t ret = -1;
    br_stub_fd_t *br_stub_fd = NULL;

    br_stub_fd = br_stub_fd_ctx_get(this, fd);
    if (!br_stub_fd) {
        ret = br_stub_add_fd_to_inode(this, fd, ctx);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_ADD_FD_TO_INODE,
                   "failed to add fd to "
                   "the inode (gfid: %s)",
                   uuid_utoa(fd->inode->gfid));
            goto out;
        }
    }

    ret = 0;

out:
    return ret;
}

/* Allocate frame->local and (for anonymous fds) attach the fd to the
 * inode ctx before a versioning fop is wound down. */
static int
br_stub_versioning_prep(call_frame_t *frame, xlator_t *this, fd_t *fd,
                        br_stub_inode_ctx_t *ctx)
{
    int32_t ret = -1;
    br_stub_local_t *local = NULL;

    local = br_stub_alloc_local(this);
    if (!local) {
        gf_msg(this->name, GF_LOG_ERROR, ENOMEM, BRS_MSG_NO_MEMORY,
               "local allocation failed (gfid: %s)",
               uuid_utoa(fd->inode->gfid));
        goto error_return;
    }

    if (fd_is_anonymous(fd)) {
        ret = br_stub_anon_fd_ctx(this, fd, ctx);
        if (ret)
            goto free_local;
    }

    frame->local = local;
    return 0;

free_local:
    br_stub_dealloc_local(local);
error_return:
    return -1;
}

/* Flag the inode (ctx) as modified; initialises the ctx on demand. */
static int
br_stub_mark_inode_modified(xlator_t *this, br_stub_local_t *local)
{
    fd_t *fd = NULL;
    int32_t ret = 0;
    uint64_t ctx_addr = 0;
    br_stub_inode_ctx_t *ctx = NULL;
    unsigned long version = BITROT_DEFAULT_CURRENT_VERSION;

    fd = local->u.context.fd;

    ret = br_stub_get_inode_ctx(this, fd->inode, &ctx_addr);
    if (ret < 0) {
        ret = br_stub_init_inode_versions(this, fd, fd->inode, version,
                                          _gf_true, _gf_false, &ctx_addr);
        if (ret)
            goto error_return;
    }

    ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;

    LOCK(&fd->inode->lock);
    {
        __br_stub_set_inode_modified(ctx);
    }
    UNLOCK(&fd->inode->lock);

    return 0;

error_return:
    return -1;
}

/**
 * The possible return values from br_stub_is_bad_object () are:
 * 1) 0  => as per the inode context object is not bad
 * 2) -1 => Failed to get the inode context itself
 * 3) -2 => As per the inode context object is bad
 * Both -ve values means the fop which called this function is failed
 * and error is returned upwards.
 */
static int
br_stub_check_bad_object(xlator_t *this, inode_t *inode, int32_t *op_ret,
                         int32_t *op_errno)
{
    int ret = -1;
    unsigned long version = BITROT_DEFAULT_CURRENT_VERSION;

    ret = br_stub_is_bad_object(this, inode);
    if (ret == -2) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJECT_ACCESS,
               "%s is a bad object. Returning", uuid_utoa(inode->gfid));
        *op_ret = -1;
        *op_errno = EIO;
    }

    if (ret == -1) {
        /* no ctx yet: set one up so subsequent checks can succeed */
        ret = br_stub_init_inode_versions(this, NULL, inode, version, _gf_true,
                                          _gf_false, NULL);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0,
                   BRS_MSG_GET_INODE_CONTEXT_FAILED,
                   "failed to init inode context for %s",
                   uuid_utoa(inode->gfid));
            *op_ret = -1;
            *op_errno = EINVAL;
        }
    }
    return ret;
}

/**
 * callback for inode/fd versioning
 */
int
br_stub_fd_incversioning_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                             int op_ret, int op_errno, dict_t *xdata)
{
    fd_t *fd = NULL;
    inode_t *inode = NULL;
    unsigned long version = 0;
    br_stub_local_t *local = NULL;

    local = (br_stub_local_t *)frame->local;
    if (op_ret < 0)
        goto done;
    fd = local->u.context.fd;
    inode = local->u.context.inode;
    version = local->u.context.version;

    op_ret = br_stub_mod_inode_versions(this, fd, inode, version);
    if (op_ret < 0)
        op_errno = EINVAL;

done:
    if (op_ret < 0) {
        /* versioning failed: unwind the original fop with an error */
        frame->local = NULL;
        call_unwind_error(local->fopstub, -1, op_errno);
        br_stub_cleanup_local(local);
        br_stub_dealloc_local(local);
    } else {
        call_resume(local->fopstub);
    }
    return 0;
}

/**
 * Initial object versioning
 *
 * Version persists two (2) extended attributes as explained below:
 * 1. Current (ongoing) version: This is incremented on an writev ()
 *    or truncate () and is the running version for an object.
 * 2. Signing version: This is the version against which an object
 *    was signed (checksummed).
 *
 * During initial versioning, both ongoing and signing versions are
 * set of one and zero respectively. A write() call increments the
 * ongoing version as an indication of modification to the object.
 * Additionally this needs to be persisted on disk and needs to be
 * durable: fsync().. :-/
 * As an optimization only the first write() synchronizes the ongoing
 * version to disk, subsequent write()s before the *last* release()
 * are no-op's.
 *
 * create(), just like lookup() initializes the object versions to
 * the default.
 * As an optimization this is not a durable operation:
 * in case of a crash, hard reboot etc.. absence of versioning xattrs
 * is ignored in scrubber along with the one time crawler explicitly
 * triggering signing for such objects.
 *
 * c.f. br_stub_writev() / br_stub_truncate()
 */

/**
 * perform full or incremental versioning on an inode pointd by an
 * fd. incremental versioning is done when an inode is dirty and a
 * writeback is triggered.
 */

int
br_stub_fd_versioning(xlator_t *this, call_frame_t *frame, call_stub_t *stub,
                      dict_t *dict, fd_t *fd, br_stub_version_cbk *callback,
                      unsigned long memversion, int versioningtype, int durable)
{
    int32_t ret = -1;
    int flags = 0;
    dict_t *xdata = NULL;
    br_stub_local_t *local = NULL;

    xdata = dict_new();
    if (!xdata)
        goto done;

    /* tag the fsetxattr as an internal fop for the lower xlators */
    ret = dict_set_int32(xdata, GLUSTERFS_INTERNAL_FOP_KEY, 1);
    if (ret)
        goto dealloc_xdata;

    if (durable) {
        ret = dict_set_int32(xdata, GLUSTERFS_DURABLE_OP, 0);
        if (ret)
            goto dealloc_xdata;
    }

    local = frame->local;

    br_stub_fill_local(local, stub, fd, fd->inode, fd->inode->gfid,
                       versioningtype, memversion);

    STACK_WIND(frame, callback, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);

    ret = 0;

dealloc_xdata:
    dict_unref(xdata);
done:
    return ret;
}

/* Bump the ongoing version on disk via a non-durable fsetxattr; the
 * callback resumes or unwinds 'stub'. On setup failure the stub is
 * unwound here with op_errno. */
static int
br_stub_perform_incversioning(xlator_t *this, call_frame_t *frame,
                              call_stub_t *stub, fd_t *fd,
                              br_stub_inode_ctx_t *ctx)
{
    int32_t ret = -1;
    dict_t *dict = NULL;
    br_version_t *obuf = NULL;
    unsigned long writeback_version = 0;
    int op_errno = 0;
    br_stub_local_t *local = NULL;

    op_errno = EINVAL;
    local = frame->local;

    writeback_version = __br_stub_writeback_version(ctx);

    op_errno = ENOMEM;
    dict = dict_new();
    if (!dict)
        goto done;
    ret = br_stub_alloc_versions(&obuf, NULL, 0);
    if (ret)
        goto dealloc_dict;
    ret = br_stub_prepare_version_request(this, dict, obuf, writeback_version);
    if (ret)
        goto dealloc_versions;

    ret = br_stub_fd_versioning(
        this, frame, stub, dict, fd, br_stub_fd_incversioning_cbk,
        writeback_version, BR_STUB_INCREMENTAL_VERSIONING, !WRITEBACK_DURABLE);

dealloc_versions:
    br_stub_dealloc_versions(obuf);
dealloc_dict:
    dict_unref(dict);
done:
    if (ret) {
        if (local)
            frame->local = NULL;
        call_unwind_error(stub, -1, op_errno);
        if (local) {
            br_stub_cleanup_local(local);
            br_stub_dealloc_local(local);
        }
    }
    return ret;
}

/** {{{ */

/* fsetxattr() */

/* Wind the queued signing fsetxattr to the child and drop the xdata
 * ref taken when the request was queued. */
int32_t
br_stub_perform_objsign(call_frame_t *frame, xlator_t *this, fd_t *fd,
                        dict_t *dict, int flags, dict_t *xdata)
{
    STACK_WIND(frame, default_fsetxattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);

    dict_unref(xdata);
    return 0;
}

/* Signer thread: serialises queued sign requests in version order. */
void *
br_stub_signth(void *arg)
{
    xlator_t *this = arg;
    br_stub_private_t *priv = this->private;
    struct br_stub_signentry *sigstub = NULL;

    THIS = this;
    while (1) {
        /*
         * Disabling bit-rot feature leads to this particular thread
         * getting cleaned up by reconfigure via a call to the function
         * gf_thread_cleanup_xint (which in turn calls pthread_cancel
         * and pthread_join). But, if this thread had held the mutex
         * &priv->lock at the time of cancellation, then it leads to
         * deadlock in future when bit-rot feature is enabled (which
         * again spawns this thread which cant hold the lock as the
         * mutex is still held by the previous instance of the thread
         * which got killed). Also, the br_stub_handle_object_signature
         * function which is called whenever file has to be signed
         * also gets blocked as it too attempts to acquire &priv->lock.
         *
         * So, arrange for the lock to be unlocked as part of the
         * cleanup of this thread using pthread_cleanup_push and
         * pthread_cleanup_pop.
         */
        pthread_cleanup_push(br_stub_lock_cleaner, &priv->lock);
        pthread_mutex_lock(&priv->lock);
        {
            while (list_empty(&priv->squeue))
                pthread_cond_wait(&priv->cond, &priv->lock);

            sigstub = list_first_entry(&priv->squeue, struct br_stub_signentry,
                                       list);
            list_del_init(&sigstub->list);
        }
        pthread_mutex_unlock(&priv->lock);
        pthread_cleanup_pop(0);

        /* resume outside the lock; the stub winds the actual fsetxattr */
        call_resume(sigstub->stub);

        GF_FREE(sigstub);
    }

    return NULL;
}

/* Does 'dict' carry any of the bit-rot internal xattr keys? */
static gf_boolean_t
br_stub_internal_xattr(dict_t *dict)
{
    if (dict_get(dict, GLUSTERFS_SET_OBJECT_SIGNATURE) ||
        dict_get(dict, GLUSTERFS_GET_OBJECT_SIGNATURE) ||
        dict_get(dict, BR_REOPEN_SIGN_HINT_KEY) ||
        dict_get(dict, BITROT_OBJECT_BAD_KEY) ||
        dict_get(dict, BITROT_SIGNING_VERSION_KEY) ||
        dict_get(dict, BITROT_CURRENT_VERSION_KEY))
        return _gf_true;

    return _gf_false;
}

/* list_add_order comparator: orders sign entries ascending by version. */
int
orderq(struct list_head *elem1, struct list_head *elem2)
{
    struct br_stub_signentry *s1 = NULL;
    struct br_stub_signentry *s2 = NULL;

    s1 = list_entry(elem1, struct br_stub_signentry, list);
    s2 = list_entry(elem2, struct br_stub_signentry, list);

    return (s1->v > s2->v);
}

/* Validate the requested signing version against the inode's current
 * version: fail (-1) when it is ahead of the current version, fake
 * success when it lags behind (object was modified meanwhile). */
static int
br_stub_compare_sign_version(xlator_t *this, inode_t *inode,
                             br_signature_t *sbuf, dict_t *dict,
                             int *fakesuccess)
{
    int32_t ret = -1;
    uint64_t tmp_ctx = 0;
    gf_boolean_t invalid = _gf_false;
    br_stub_inode_ctx_t *ctx = NULL;

    GF_VALIDATE_OR_GOTO("bit-rot-stub", this, out);
    GF_VALIDATE_OR_GOTO(this->name, inode, out);
    GF_VALIDATE_OR_GOTO(this->name, sbuf, out);
    GF_VALIDATE_OR_GOTO(this->name, dict, out);

    ret = br_stub_get_inode_ctx(this, inode, &tmp_ctx);
    if (ret) {
        dict_del(dict, BITROT_SIGNING_VERSION_KEY);
        goto out;
    }

    ctx = (br_stub_inode_ctx_t *)(long)tmp_ctx;

    LOCK(&inode->lock);
    {
        if (ctx->currentversion < sbuf->signedversion) {
            invalid = _gf_true;
        } else if (ctx->currentversion > sbuf->signedversion) {
            /* NOTE(review): message text vs. argument order looks
             * swapped here (prints current then signed) — confirm
             * against upstream before relying on this log line. */
            gf_msg_debug(this->name, 0,
                         "\"Signing version\" "
                         "(%lu) lower than \"Current version \" "
                         "(%lu)",
                         ctx->currentversion, sbuf->signedversion);
            *fakesuccess = 1;
        }
    }
    UNLOCK(&inode->lock);

    if (invalid) {
        ret = -1;
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_SIGN_VERSION_ERROR,
               "Signing version exceeds "
               "current version [%lu > %lu]",
               sbuf->signedversion, ctx->currentversion);
    }

out:
    return ret;
}

/* Build the on-disk signing request from the incoming signature after
 * validating its type and version; may set *fakesuccess instead. */
static int
br_stub_prepare_signature(xlator_t *this, dict_t *dict, inode_t *inode,
                          br_isignature_t *sign, int *fakesuccess)
{
    int32_t ret = 0;
    size_t signaturelen = 0;
    br_signature_t *sbuf = NULL;

    if (!br_is_signature_type_valid(sign->signaturetype))
        goto error_return;

    signaturelen = sign->signaturelen;
    ret = br_stub_alloc_versions(NULL, &sbuf, signaturelen);
    if (ret)
        goto error_return;
    ret = br_stub_prepare_signing_request(dict, sbuf, sign, signaturelen);
    if (ret)
        goto dealloc_versions;

    ret = br_stub_compare_sign_version(this, inode, sbuf, dict, fakesuccess);
    if (ret)
        goto dealloc_versions;

    return 0;

dealloc_versions:
    br_stub_dealloc_versions(sbuf);
error_return:
    return -1;
}

/* Handle a sign request (accepted from BitD only): validate it, then
 * queue an ordered fsetxattr stub for the signer thread. Unwinds the
 * fop itself on any failure or fake-success. */
static void
br_stub_handle_object_signature(call_frame_t *frame, xlator_t *this, fd_t *fd,
                                dict_t *dict, br_isignature_t *sign,
                                dict_t *xdata)
{
    int32_t ret = -1;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;
    int fakesuccess = 0;
    br_stub_private_t *priv = NULL;
    struct br_stub_signentry *sigstub = NULL;

    priv = this->private;

    if (frame->root->pid != GF_CLIENT_PID_BITD) {
        gf_msg(this->name, GF_LOG_WARNING, op_errno, BRS_MSG_NON_BITD_PID,
               "PID %d from where signature request"
               "came, does not belong to bit-rot daemon."
               "Unwinding the fop",
               frame->root->pid);
        goto dofop;
    }

    ret = br_stub_prepare_signature(this, dict, fd->inode, sign, &fakesuccess);
    if (ret) {
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_SIGN_PREPARE_FAIL,
               "failed to prepare the signature for %s. Unwinding the fop",
               uuid_utoa(fd->inode->gfid));
        goto dofop;
    }
    if (fakesuccess) {
        op_ret = op_errno = 0;
        goto dofop;
    }

    dict_del(dict, GLUSTERFS_SET_OBJECT_SIGNATURE);

    ret = -1;
    if (!xdata) {
        xdata = dict_new();
        if (!xdata)
            goto dofop;
    } else {
        dict_ref(xdata);
    }

    ret = dict_set_int32(xdata, GLUSTERFS_DURABLE_OP, 0);
    if (ret)
        goto unref_dict;

    /* prepare dispatch stub to order object signing */
    sigstub = GF_CALLOC(1, sizeof(*sigstub), gf_br_stub_mt_sigstub_t);
    if (!sigstub)
        goto unref_dict;

    INIT_LIST_HEAD(&sigstub->list);
    sigstub->v = ntohl(sign->signedversion);
    sigstub->stub = fop_fsetxattr_stub(frame, br_stub_perform_objsign, fd,
                                       dict, 0, xdata);
    if (!sigstub->stub)
        goto cleanup_stub;

    pthread_mutex_lock(&priv->lock);
    {
        list_add_order(&sigstub->list, &priv->squeue, orderq);
        pthread_cond_signal(&priv->cond);
    }
    pthread_mutex_unlock(&priv->lock);

    return;

cleanup_stub:
    GF_FREE(sigstub);
unref_dict:
    dict_unref(xdata);
dofop:
    STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, NULL);
}

/* Resume callback for reopen-triggered versioning: mark the inode as
 * modified, then unwind the fsetxattr to the client. */
int32_t
br_stub_fsetxattr_resume(call_frame_t *frame, void *cookie, xlator_t *this,
                         int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    int32_t ret = -1;
    br_stub_local_t *local = NULL;

    local = frame->local;
    frame->local = NULL;

    ret = br_stub_mark_inode_modified(this, local);
    if (ret) {
        op_ret = -1;
        op_errno = EINVAL;
    }

    STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, xdata);

    br_stub_cleanup_local(local);
    br_stub_dealloc_local(local);

    return 0;
}

/**
 * Handles object reopens. Object reopens can be of 3 types. 2 are from
 * oneshot crawler and 1 from the regular signer.
 * ONESHOT CRAWLER:
 * For those objects which were created before bitrot was enabled. oneshow
 * crawler crawls the namespace and signs all the objects. It has to do
 * the versioning before making bit-rot-stub send a sign notification.
 * So it sends fsetxattr with BR_OBJECT_REOPEN as the value. And bit-rot-stub
 * upon getting BR_OBJECT_REOPEN value checks if the version has to be
 * increased or not. By default the version will be increased. But if the
 * object is modified before BR_OBJECT_REOPEN from oneshot crawler, then
 * versioning need not be done. In that case simply a success is returned.
 * SIGNER:
 * Signer wait for 2 minutes upon getting the notification from bit-rot-stub
 * and then it sends a dummy write (in reality a fsetxattr) call, to change
 * the state of the inode from REOPEN_WAIT to SIGN_QUICK. The funny part here
 * is though the inode's state is REOPEN_WAIT, the call sent by signer is
 * BR_OBJECT_RESIGN. Once the state is changed to SIGN_QUICK, then yet another
 * notification is sent upon release (RESIGN would have happened via fsetxattr,
 * so a fd is needed) and the object is signed truly this time.
 * There is a challenge in the above RESIGN method by signer. After sending
 * the 1st notification, the inode could be forgotten before RESIGN request
 * is received. In that case, the inode's context (the newly looked up inode)
 * would not indicate the inode as being modified (it would be in the default
 * state) and because of this, a SIGN_QUICK notification to truly sign the
 * object would not be sent. So, this is how its handled.
 * if (request == RESIGN) {
 *    if (inode->sign_info == NORMAL) {
 *        mark_inode_non_dirty;
 *        mark_inode_modified;
 *    }
 *    GOBACK (means unwind without doing versioning)
 * }
 */
static void
br_stub_handle_object_reopen(call_frame_t *frame, xlator_t *this, fd_t *fd,
                             uint32_t val)
{
    int32_t ret = -1;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;
    call_stub_t *stub = NULL;
    gf_boolean_t inc_version = _gf_false;
    gf_boolean_t modified = _gf_false;
    br_stub_inode_ctx_t *ctx = NULL;
    br_stub_local_t *local = NULL;
    gf_boolean_t goback = _gf_true;

    ret = br_stub_need_versioning(this, fd, &inc_version, &modified, &ctx);
    if (ret)
        goto unwind;

    LOCK(&fd->inode->lock);
    {
        if ((val == BR_OBJECT_REOPEN) && inc_version)
            goback = _gf_false;
        if (val == BR_OBJECT_RESIGN && ctx->info_sign == BR_SIGN_NORMAL) {
            __br_stub_mark_inode_synced(ctx);
            __br_stub_set_inode_modified(ctx);
        }
        (void)__br_stub_inode_sign_state(ctx, GF_FOP_FSETXATTR, fd);
    }
    UNLOCK(&fd->inode->lock);

    if (goback) {
        op_ret = op_errno = 0;
        goto unwind;
    }

    ret = br_stub_versioning_prep(frame, this, fd, ctx);
    if (ret)
        goto unwind;
    local = frame->local;

    stub = fop_fsetxattr_cbk_stub(frame, br_stub_fsetxattr_resume, 0, 0, NULL);
    if (!stub) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_STUB_ALLOC_FAILED,
               "failed to allocate stub for fsetxattr fop (gfid: %s),"
               " unwinding",
               uuid_utoa(fd->inode->gfid));
        goto cleanup_local;
    }

    (void)br_stub_perform_incversioning(this, frame, stub, fd, ctx);
    return;

cleanup_local:
    br_stub_cleanup_local(local);
    br_stub_dealloc_local(local);

unwind:
    frame->local = NULL;
    STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, NULL);
}

/**
 * This function only handles bad file identification. Instead of checking in
 * fops like open, readv, writev whether the object is bad or not by doing
 * getxattr calls, better to catch them when scrubber marks it as bad.
 * So this callback is called only when the fsetxattr is sent by the scrubber
 * to mark the object as bad.
 */
int
br_stub_fsetxattr_bad_object_cbk(call_frame_t *frame, void *cookie,
                                 xlator_t *this, int32_t op_ret,
                                 int32_t op_errno, dict_t *xdata)
{
    br_stub_local_t *local = NULL;
    int32_t ret = -1;

    local = frame->local;
    frame->local = NULL;

    if (op_ret < 0)
        goto unwind;

    /*
     * What to do if marking the object as bad fails? (i.e. in memory
     * marking within the inode context. If we are here means fsetxattr
     * fop has succeeded on disk and the bad object xattr has been set).
     * We can return failure to scruber, but there is nothing the scrubber
     * can do with it (it might assume that the on disk setxattr itself has
     * failed). The main purpose of this operation is to help identify the
     * bad object by checking the inode context itself (thus avoiding the
     * necessity of doing a getxattr fop on the disk).
     *
     * So as of now, success itself is being returned even though inode
     * context set operation fails.
     * In future if there is any change in the policy which can handle this,
     * then appropriate response should be sent (i.e. success or error).
     */
    ret = br_stub_mark_object_bad(this, local->u.context.inode);
    if (ret)
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_MARK_FAIL,
               "failed to mark object %s as bad",
               uuid_utoa(local->u.context.inode->gfid));

    /* remember the bad gfid in the quarantine store */
    ret = br_stub_add(this, local->u.context.inode->gfid);

unwind:
    STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, xdata);
    br_stub_cleanup_local(local);
    br_stub_dealloc_local(local);
    return 0;
}

/* Scrubber-only path: wind the bad-object fsetxattr down and let the
 * callback above record the verdict in the inode ctx. */
static int32_t
br_stub_handle_bad_object_key(call_frame_t *frame, xlator_t *this, fd_t *fd,
                              dict_t *dict, int flags, dict_t *xdata)
{
    br_stub_local_t *local = NULL;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;

    if (frame->root->pid != GF_CLIENT_PID_SCRUB) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_NON_SCRUB_BAD_OBJ_MARK,
               "bad object marking "
               "on %s is not from the scrubber",
               uuid_utoa(fd->inode->gfid));
        goto unwind;
    }

    local = br_stub_alloc_local(this);
    if (!local) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_NO_MEMORY,
               "failed to allocate memory for fsetxattr on %s",
               uuid_utoa(fd->inode->gfid));
        op_ret = -1;
        op_errno = ENOMEM;
        goto unwind;
    }

    br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid,
                       BR_STUB_NO_VERSIONING, 0);
    frame->local = local;

    STACK_WIND(frame, br_stub_fsetxattr_bad_object_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);
    return 0;
unwind:
    STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, NULL);
    return 0;
}

/**
 * As of now, versioning is done by the stub (though as a setxattr
 * operation) as part of inode modification operations such as writev,
 * truncate, ftruncate. And signing is done by BitD by a fsetxattr call.
 * So any kind of setxattr coming on the versioning and the signing xattr is
 * not allowed (i.e. BITROT_CURRENT_VERSION_KEY and BITROT_SIGNING_VERSION_KEY).
 * In future if BitD/scrubber are allowed to change the versioning
 * xattrs (though I cannot see a reason for it as of now), then the below
 * function can be modified to block setxattr on version for only applications.
 *
 * NOTE: BitD sends sign request on GLUSTERFS_SET_OBJECT_SIGNATURE key.
 * BITROT_SIGNING_VERSION_KEY is the xattr used to save the signature.
 *
 */
/* Deny an fsetxattr on one of the stub's internal xattr keys: log the key
 * and unwind with EINVAL. */
static int32_t
br_stub_handle_internal_xattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
                              char *key)
{
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;

    gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_SET_INTERNAL_XATTR,
           "setxattr called"
           " on the internal xattr %s for inode %s",
           key, uuid_utoa(fd->inode->gfid));

    STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, NULL);
    return 0;
}

/* Dump the offending xattr dict into the log (used when a non-BitD,
 * non-scrubber client touches internal keys). Sets *op_errno to ENOMEM only
 * if the dump buffer cannot be allocated; otherwise leaves it untouched. */
static void
br_stub_dump_xattr(xlator_t *this, dict_t *dict, int *op_errno)
{
    char *format = "(%s:%s)";
    char *dump = NULL;

    dump = GF_CALLOC(1, BR_STUB_DUMP_STR_SIZE, gf_br_stub_mt_misc);
    if (!dump) {
        *op_errno = ENOMEM;
        goto out;
    }

    dict_dump_to_str(dict, dump, BR_STUB_DUMP_STR_SIZE, format);
    gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_SET_INTERNAL_XATTR,
           "fsetxattr called on "
           "internal xattr %s",
           dump);
out:
    if (dump) {
        GF_FREE(dump);
    }
    return;
}

/* fsetxattr fop: intercepts the stub's own keys (signature set request,
 * signing/version xattrs, reopen hint, bad-object mark) and dispatches each
 * to its handler; everything else is wound through unchanged. */
int
br_stub_fsetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict,
                  int flags, dict_t *xdata)
{
    int32_t ret = 0;
    uint32_t val = 0;
    br_isignature_t *sign = NULL;
    br_stub_private_t *priv = NULL;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;

    priv = this->private;

    /* internal xattrs may only be set by BitD or the scrubber */
    if ((frame->root->pid != GF_CLIENT_PID_BITD &&
         frame->root->pid != GF_CLIENT_PID_SCRUB) &&
        br_stub_internal_xattr(dict)) {
        br_stub_dump_xattr(this, dict, &op_errno);
        goto unwind;
    }

    if (!priv->do_versioning)
        goto wind;

    if (!IA_ISREG(fd->inode->ia_type))
        goto wind;

    /* object signature request */
    ret = dict_get_bin(dict, GLUSTERFS_SET_OBJECT_SIGNATURE, (void **)&sign);
    if (!ret) {
        gf_msg_debug(this->name, 0, "got SIGNATURE request on %s",
                     uuid_utoa(fd->inode->gfid));
        br_stub_handle_object_signature(frame, this, fd, dict, sign, xdata);
        goto done;
    }

    /* signing xattr */
    if (dict_get(dict, BITROT_SIGNING_VERSION_KEY)) {
        br_stub_handle_internal_xattr(frame, this, fd,
                                      BITROT_SIGNING_VERSION_KEY);
        goto done;
    }

    /* version xattr */
    if (dict_get(dict, BITROT_CURRENT_VERSION_KEY)) {
        br_stub_handle_internal_xattr(frame, this, fd,
                                      BITROT_CURRENT_VERSION_KEY);
        goto done;
    }

    if (dict_get(dict, GLUSTERFS_GET_OBJECT_SIGNATURE)) {
        br_stub_handle_internal_xattr(frame, this, fd,
                                      GLUSTERFS_GET_OBJECT_SIGNATURE);
        goto done;
    }

    /* object reopen request */
    ret = dict_get_uint32(dict, BR_REOPEN_SIGN_HINT_KEY, &val);
    if (!ret) {
        br_stub_handle_object_reopen(frame, this, fd, val);
        goto done;
    }

    /* handle bad object */
    if (dict_get(dict, BITROT_OBJECT_BAD_KEY)) {
        br_stub_handle_bad_object_key(frame, this, fd, dict, flags, xdata);
        goto done;
    }

wind:
    STACK_WIND(frame, default_fsetxattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);
    return 0;

unwind:
    STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, NULL);
    /* note: unwind deliberately falls through to done */
done:
    return 0;
}

/**
 * Currently BitD and scrubber are doing fsetxattr to either sign the object
 * or to mark it as bad. Hence setxattr on any of those keys is denied directly
 * without checking from where the fop is coming.
 * Later, if BitD or Scrubber does setxattr of those keys, then appropriate
 * check has to be added below.
 */
/* setxattr fop: block any attempt to set the stub's internal xattrs;
 * otherwise pass through. */
int
br_stub_setxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict,
                 int flags, dict_t *xdata)
{
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;

    if (br_stub_internal_xattr(dict)) {
        br_stub_dump_xattr(this, dict, &op_errno);
        goto unwind;
    }

    STACK_WIND_TAIL(frame, FIRST_CHILD(this),
                    FIRST_CHILD(this)->fops->setxattr, loc, dict, flags,
                    xdata);
    return 0;
unwind:
    STACK_UNWIND_STRICT(setxattr, frame, op_ret, op_errno, NULL);
    return 0;
}

/** }}} */

/** {{{ */

/* {f}removexattr() */

/* removexattr fop: refuse removal of the bad-object, signing and version
 * xattrs; anything else passes through. */
int32_t
br_stub_removexattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
                    const char *name, dict_t *xdata)
{
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;

    if (!strcmp(BITROT_OBJECT_BAD_KEY, name) ||
        !strcmp(BITROT_SIGNING_VERSION_KEY, name) ||
        !strcmp(BITROT_CURRENT_VERSION_KEY, name)) {
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_REMOVE_INTERNAL_XATTR,
               "removexattr called"
               " on internal xattr %s for file %s",
               name, loc->path);
        goto unwind;
    }

    STACK_WIND_TAIL(frame, FIRST_CHILD(this),
                    FIRST_CHILD(this)->fops->removexattr, loc, name, xdata);
    return 0;
unwind:
    STACK_UNWIND_STRICT(removexattr, frame, op_ret, op_errno, NULL);
    return 0;
}

/* fremovexattr fop: fd-based twin of br_stub_removexattr above. */
int32_t
br_stub_fremovexattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
                     const char *name, dict_t *xdata)
{
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;

    if (!strcmp(BITROT_OBJECT_BAD_KEY, name) ||
        !strcmp(BITROT_SIGNING_VERSION_KEY, name) ||
        !strcmp(BITROT_CURRENT_VERSION_KEY, name)) {
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_REMOVE_INTERNAL_XATTR,
               "removexattr called"
               " on internal xattr %s for inode %s",
               name, uuid_utoa(fd->inode->gfid));
        goto unwind;
    }

    STACK_WIND_TAIL(frame, FIRST_CHILD(this),
                    FIRST_CHILD(this)->fops->fremovexattr, fd, name, xdata);
    return 0;
unwind:
    STACK_UNWIND_STRICT(fremovexattr, frame, op_ret, op_errno, NULL);
    return 0;
}

/** }}} */

/** {{{ */

/* {f}getxattr() */

/* listxattr callback: strip the stub's internal virtual xattrs from the
 * returned dict before unwinding. (Also used by fgetxattr listing; the
 * unwind uses the getxattr signature for both.) */
int
br_stub_listxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                      int op_ret, int op_errno, dict_t *xattr, dict_t *xdata)
{
    if (op_ret < 0)
        goto unwind;

    br_stub_remove_vxattrs(xattr, _gf_true);

unwind:
    STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, xattr, xdata);
    return 0;
}

/**
 * ONE SHOT CRAWLER from BitD signs the objects that it encounters while
 * crawling, if the object is identified as stale by the stub. Stub follows
 * the below logic to mark an object as stale or not.
 * If the ongoing version and the signed_version match, then the object is not
 * stale. Just return. Otherwise if they does not match, then it means one
 * of the below things.
 * 1) If the inode does not need write back of the version and the sign state is
 *    is NORMAL, then some active i/o is going on the object. So skip it.
 *    A notification will be sent to trigger the sign once the release is
 *    received on the object.
 * 2) If inode does not need writeback of the version and the sign state is
 *    either reopen wait or quick sign, then it means:
 *    A) BitD restarted and it is not sure whether the object it encountered
 *       while crawling is in its timer wheel or not. Since there is no way to
 *       scan the timer wheel as of now, ONE SHOT CRAWLER just goes ahead and
 *       signs the object. Since the inode does not need writeback, version will
 *       not be incremented and directly the object will be signed.
 * 3) If the inode needs writeback, then it means the inode was forgotten after
 *    the versioning and it has to be signed now.
 *
 * This is the algorithm followed:
 * if (ongoing_version == signed_version); then
 *     object_is_not_stale;
 *     return;
 * else; then
 *     if (!inode_needs_writeback && inode_sign_state != NORMAL); then
 *         object_is_stale;
 *     if (inode_needs_writeback); then
 *         object_is_stale;
 *
 * For SCRUBBER, no need to check for the sign state and inode writeback.
 * If the ondisk ongoingversion and the ondisk signed version does not match,
 * then treat the object as stale.
 */
char
br_stub_is_object_stale(xlator_t *this, call_frame_t *frame, inode_t *inode,
                        br_version_t *obuf, br_signature_t *sbuf)
{
    uint64_t ctx_addr = 0;
    br_stub_inode_ctx_t *ctx = NULL;
    int32_t ret = -1;
    char stale = 0;

    if (obuf->ongoingversion == sbuf->signedversion)
        goto out;

    if (frame->root->pid == GF_CLIENT_PID_SCRUB) {
        stale = 1;
        goto out;
    }

    ret = br_stub_get_inode_ctx(this, inode, &ctx_addr);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_GET_INODE_CONTEXT_FAILED,
               "failed to get the "
               "inode context for %s",
               uuid_utoa(inode->gfid));
        goto out;
    }

    ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;

    LOCK(&inode->lock);
    {
        if ((!__br_stub_is_inode_dirty(ctx) &&
             ctx->info_sign != BR_SIGN_NORMAL) ||
            __br_stub_is_inode_dirty(ctx))
            stale = 1;
    }
    UNLOCK(&inode->lock);

out:
    return stale;
}

/* getxattr callback for GLUSTERFS_GET_OBJECT_SIGNATURE requests (identified
 * by BR_STUB_REQUEST_COOKIE): assemble a br_isignature_out_t from the on-disk
 * version/signature xattrs, stamp its staleness, place it in the reply dict,
 * and strip the raw internal xattrs before unwinding. */
int
br_stub_getxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                     int op_ret, int op_errno, dict_t *xattr, dict_t *xdata)
{
    int32_t ret = 0;
    size_t totallen = 0;
    size_t signaturelen = 0;
    br_stub_private_t *priv = NULL;
    br_version_t *obuf = NULL;
    br_signature_t *sbuf = NULL;
    br_isignature_out_t *sign = NULL;
    br_vxattr_status_t status;
    br_stub_local_t *local = NULL;
    inode_t *inode = NULL;
    gf_boolean_t bad_object = _gf_false;
    gf_boolean_t ver_enabled = _gf_false;

    BR_STUB_VER_ENABLED_IN_CALLPATH(frame, ver_enabled);
    priv = this->private;

    if (op_ret < 0)
        goto unwind;
    BR_STUB_VER_COND_GOTO(priv, (!ver_enabled), delkeys);

    if (cookie != (void *)BR_STUB_REQUEST_COOKIE)
        goto unwind;

    local = frame->local;
    frame->local = NULL;
    if (!local) {
        op_ret = -1;
        op_errno = EINVAL;
        goto unwind;
    }
    inode = local->u.context.inode;

    op_ret = -1;
    status = br_version_xattr_state(xattr, &obuf, &sbuf, &bad_object);

    op_errno = EIO;
    if (bad_object)
        goto delkeys;

    op_errno = EINVAL;
    if (status == BR_VXATTR_STATUS_INVALID)
        goto delkeys;

    op_errno = ENODATA;
    if ((status == BR_VXATTR_STATUS_MISSING) ||
        (status == BR_VXATTR_STATUS_UNSIGNED))
        goto delkeys;

    /**
     * okay.. we have enough information to satisfy the request,
     * namely: version and signing extended attribute. what's
     * pending is the signature length -- that's figured out
     * indirectly via the size of the _whole_ xattr and the
     * on-disk signing xattr header size.
     */
    op_errno = EINVAL;
    ret = dict_get_uint32(xattr, BITROT_SIGNING_XATTR_SIZE_KEY,
                          (uint32_t *)&signaturelen);
    if (ret)
        goto delkeys;

    signaturelen -= sizeof(br_signature_t);
    totallen = sizeof(br_isignature_out_t) + signaturelen;

    op_errno = ENOMEM;
    sign = GF_CALLOC(1, totallen, gf_br_stub_mt_signature_t);
    if (!sign)
        goto delkeys;

    sign->time[0] = obuf->timebuf[0];
    sign->time[1] = obuf->timebuf[1];

    /* Object's dirty state & current signed version */
    sign->version = sbuf->signedversion;
    sign->stale = br_stub_is_object_stale(this, frame, inode, obuf, sbuf);

    /* Object's signature */
    sign->signaturelen = signaturelen;
    sign->signaturetype = sbuf->signaturetype;
    (void)memcpy(sign->signature, sbuf->signature, signaturelen);

    op_errno = EINVAL;
    ret = dict_set_bin(xattr, GLUSTERFS_GET_OBJECT_SIGNATURE, (void *)sign,
                       totallen);
    if (ret < 0) {
        GF_FREE(sign);
        goto delkeys;
    }
    op_errno = 0;
    op_ret = totallen;

delkeys:
    br_stub_remove_vxattrs(xattr, _gf_true);

unwind:
    STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, xattr, xdata);
    br_stub_cleanup_local(local);
    br_stub_dealloc_local(local);
    return 0;
}

/* Answer a GLUSTERFS_GET_BR_STUB_INIT_TIME request directly (no wind):
 * reply with the stub's boot timestamp and export path packed into a
 * br_stub_init_t. */
static void
br_stub_send_stub_init_time(call_frame_t *frame, xlator_t *this)
{
    int op_ret = 0;
    int op_errno = 0;
    dict_t *xattr = NULL;
    br_stub_init_t stub = {
        {
            0,
        },
    };
    br_stub_private_t *priv = NULL;

    priv = this->private;

    xattr = dict_new();
    if (!xattr) {
        op_ret = -1;
        op_errno = ENOMEM;
        goto unwind;
    }

    stub.timebuf[0] = priv->boot[0];
    stub.timebuf[1] = priv->boot[1];
    memcpy(stub.export, priv->export, strlen(priv->export) + 1);

    op_ret = dict_set_static_bin(xattr, GLUSTERFS_GET_BR_STUB_INIT_TIME,
                                 (void *)&stub, sizeof(br_stub_init_t));
    if (op_ret < 0) {
        op_errno = EINVAL;
        goto unwind;
    }

    op_ret = sizeof(br_stub_init_t);

unwind:
    STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, xattr, NULL);

    if (xattr)
        dict_unref(xattr);
}

/* getxattr fop: serves the stub-init-time xattr on root, the object
 * signature on regular files (via cookie + local), blocks internal xattrs,
 * and lists/forwards everything else. */
int
br_stub_getxattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
                 const char *name, dict_t *xdata)
{
    void *cookie = NULL;
    uuid_t rootgfid = {
        0,
    };
    fop_getxattr_cbk_t cbk = br_stub_getxattr_cbk;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;
    br_stub_local_t *local = NULL;
    br_stub_private_t *priv = NULL;

    GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
    GF_VALIDATE_OR_GOTO(this->name, loc, unwind);
    GF_VALIDATE_OR_GOTO(this->name, this->private, unwind);
    GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind);

    rootgfid[15] = 1;

    if (!name) {
        cbk = br_stub_listxattr_cbk;
        goto wind;
    }

    if (br_stub_is_internal_xattr(name))
        goto unwind;

    priv = this->private;
    BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);

    /**
     * If xattr is node-uuid and the inode is marked bad, return EIO.
     * Returning EIO would result in AFR to choose correct node-uuid
     * corresponding to the subvolume * where the good copy of the
     * file resides.
     */
    if (IA_ISREG(loc->inode->ia_type) && XATTR_IS_NODE_UUID(name) &&
        br_stub_check_bad_object(this, loc->inode, &op_ret, &op_errno)) {
        goto unwind;
    }

    /**
     * this special extended attribute is allowed only on root
     */
    if (name &&
        (strncmp(name, GLUSTERFS_GET_BR_STUB_INIT_TIME,
                 sizeof(GLUSTERFS_GET_BR_STUB_INIT_TIME) - 1) == 0) &&
        ((gf_uuid_compare(loc->gfid, rootgfid) == 0) ||
         (gf_uuid_compare(loc->inode->gfid, rootgfid) == 0))) {
        BR_STUB_RESET_LOCAL_NULL(frame);
        br_stub_send_stub_init_time(frame, this);
        return 0;
    }

    if (!IA_ISREG(loc->inode->ia_type))
        goto wind;

    if (name && (strncmp(name, GLUSTERFS_GET_OBJECT_SIGNATURE,
                         sizeof(GLUSTERFS_GET_OBJECT_SIGNATURE) - 1) == 0)) {
        cookie = (void *)BR_STUB_REQUEST_COOKIE;

        local = br_stub_alloc_local(this);
        if (!local) {
            op_ret = -1;
            op_errno = ENOMEM;
            goto unwind;
        }

        br_stub_fill_local(local, NULL, NULL, loc->inode, loc->inode->gfid,
                           BR_STUB_NO_VERSIONING, 0);
        frame->local = local;
    }

wind:
    STACK_WIND_COOKIE(frame, cbk, cookie, FIRST_CHILD(this),
                      FIRST_CHILD(this)->fops->getxattr, loc, name, xdata);
    return 0;
unwind:
    BR_STUB_RESET_LOCAL_NULL(frame);
    STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, NULL, NULL);
    return 0;
}

/* fgetxattr fop: fd-based twin of br_stub_getxattr above (init-time xattr is
 * checked against the inode gfid only, since there is no loc here). */
int
br_stub_fgetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
                  const char *name, dict_t *xdata)
{
    void *cookie = NULL;
    uuid_t rootgfid = {
        0,
    };
    fop_fgetxattr_cbk_t cbk = br_stub_getxattr_cbk;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;
    br_stub_local_t *local = NULL;
    br_stub_private_t *priv = NULL;

    rootgfid[15] = 1;

    priv = this->private;

    if (!name) {
        cbk = br_stub_listxattr_cbk;
        goto wind;
    }

    if (br_stub_is_internal_xattr(name))
        goto unwind;

    BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);

    /**
     * If xattr is node-uuid and the inode is marked bad, return EIO.
     * Returning EIO would result in AFR to choose correct node-uuid
     * corresponding to the subvolume * where the good copy of the
     * file resides.
     */
    if (IA_ISREG(fd->inode->ia_type) && XATTR_IS_NODE_UUID(name) &&
        br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno)) {
        goto unwind;
    }

    /**
     * this special extended attribute is allowed only on root
     */
    if (name &&
        (strncmp(name, GLUSTERFS_GET_BR_STUB_INIT_TIME,
                 sizeof(GLUSTERFS_GET_BR_STUB_INIT_TIME) - 1) == 0) &&
        (gf_uuid_compare(fd->inode->gfid, rootgfid) == 0)) {
        BR_STUB_RESET_LOCAL_NULL(frame);
        br_stub_send_stub_init_time(frame, this);
        return 0;
    }

    if (!IA_ISREG(fd->inode->ia_type))
        goto wind;

    if (name && (strncmp(name, GLUSTERFS_GET_OBJECT_SIGNATURE,
                         sizeof(GLUSTERFS_GET_OBJECT_SIGNATURE) - 1) == 0)) {
        cookie = (void *)BR_STUB_REQUEST_COOKIE;

        local = br_stub_alloc_local(this);
        if (!local) {
            op_ret = -1;
            op_errno = ENOMEM;
            goto unwind;
        }

        br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid,
                           BR_STUB_NO_VERSIONING, 0);
        frame->local = local;
    }

wind:
    STACK_WIND_COOKIE(frame, cbk, cookie, FIRST_CHILD(this),
                      FIRST_CHILD(this)->fops->fgetxattr, fd, name, xdata);
    return 0;
unwind:
    BR_STUB_RESET_LOCAL_NULL(frame);
    STACK_UNWIND_STRICT(fgetxattr, frame, op_ret, op_errno, NULL, NULL);
    return 0;
}

/* readv fop: fail reads on objects already marked bad (EIO via
 * br_stub_check_bad_object); otherwise pass through. */
int32_t
br_stub_readv(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
              off_t offset, uint32_t flags, dict_t *xdata)
{
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;
    int32_t ret = -1;
    br_stub_private_t *priv = NULL;

    GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
    GF_VALIDATE_OR_GOTO(this->name, frame, unwind);
    GF_VALIDATE_OR_GOTO(this->name, this->private, unwind);
    GF_VALIDATE_OR_GOTO(this->name, fd, unwind);
    GF_VALIDATE_OR_GOTO(this->name, fd->inode, unwind);

    priv = this->private;
    if (!priv->do_versioning)
        goto wind;

    ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno);
    if (ret)
        goto unwind;

wind:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv,
                    fd, size, offset, flags, xdata);
    return 0;

unwind:
    STACK_UNWIND_STRICT(readv, frame, op_ret, op_errno, NULL, 0, NULL, NULL,
                        NULL);
    return 0;
}

/**
 * The first write
 * response on the first fd in the list of fds will set
 * the flag to indicate that the inode is modified. The subsequent write
 * responses coming on either the first fd or some other fd will not change
 * the fd. The inode-modified flag is unset only upon release of all the
 * fds.
 */
/* writev callback: on success, mark the inode as modified in its context so
 * signing can be triggered on release; failure to mark is surfaced as
 * EINVAL. */
int32_t
br_stub_writev_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                   int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                   struct iatt *postbuf, dict_t *xdata)
{
    int32_t ret = 0;
    br_stub_local_t *local = NULL;

    local = frame->local;
    frame->local = NULL;

    if (op_ret < 0)
        goto unwind;

    ret = br_stub_mark_inode_modified(this, local);
    if (ret) {
        op_ret = -1;
        op_errno = EINVAL;
    }

unwind:
    STACK_UNWIND_STRICT(writev, frame, op_ret, op_errno, prebuf, postbuf,
                        xdata);
    br_stub_cleanup_local(local);
    br_stub_dealloc_local(local);
    return 0;
}

/* Resume function invoked after versioning completes: wind the held-back
 * writev with br_stub_writev_cbk as the callback. */
int32_t
br_stub_writev_resume(call_frame_t *frame, xlator_t *this, fd_t *fd,
                      struct iovec *vector, int32_t count, off_t offset,
                      uint32_t flags, struct iobref *iobref, dict_t *xdata)
{
    STACK_WIND(frame, br_stub_writev_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->writev, fd, vector, count, offset,
               flags, iobref, xdata);
    return 0;
}

/**
 * This is probably the most crucial part about the whole versioning thing.
 * There's absolutely no differentiation as such between an anonymous fd
 * and a regular fd except the fd context initialization. Object versioning
 * is performed when the inode is dirty. Parallel write operations are no
 * special with each write performing object versioning followed by marking
 * the inode as non-dirty (synced). This is followed by the actual operation
 * (writev() in this case) which on a success marks the inode as modified.
 * This prevents signing of objects that have not been modified.
 */
/* writev fop: version the object (if dirty) before letting the write
 * through; otherwise just track the modification via br_stub_writev_cbk. */
int32_t
br_stub_writev(call_frame_t *frame, xlator_t *this, fd_t *fd,
               struct iovec *vector, int32_t count, off_t offset,
               uint32_t flags, struct iobref *iobref, dict_t *xdata)
{
    call_stub_t *stub = NULL;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;
    gf_boolean_t inc_version = _gf_false;
    gf_boolean_t modified = _gf_false;
    br_stub_inode_ctx_t *ctx = NULL;
    int32_t ret = -1;
    fop_writev_cbk_t cbk = default_writev_cbk;
    br_stub_local_t *local = NULL;
    br_stub_private_t *priv = NULL;

    GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
    GF_VALIDATE_OR_GOTO(this->name, this->private, unwind);
    GF_VALIDATE_OR_GOTO(this->name, frame, unwind);
    GF_VALIDATE_OR_GOTO(this->name, fd, unwind);

    priv = this->private;
    if (!priv->do_versioning)
        goto wind;

    ret = br_stub_need_versioning(this, fd, &inc_version, &modified, &ctx);
    if (ret)
        goto unwind;

    ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno);
    if (ret)
        goto unwind;

    /**
     * The inode is not dirty and also witnessed at least one successful
     * modification operation. Therefore, subsequent operations need not
     * perform any special tracking.
     */
    if (!inc_version && modified)
        goto wind;

    /**
     * okay.. so, either the inode needs versioning or the modification
     * needs to be tracked. ->cbk is set to the appropriate callback
     * routine for this.
     * NOTE: ->local needs to be deallocated on failures from here on.
     */
    ret = br_stub_versioning_prep(frame, this, fd, ctx);
    if (ret)
        goto unwind;

    local = frame->local;
    if (!inc_version) {
        br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid,
                           BR_STUB_NO_VERSIONING, 0);
        cbk = br_stub_writev_cbk;
        goto wind;
    }

    stub = fop_writev_stub(frame, br_stub_writev_resume, fd, vector, count,
                           offset, flags, iobref, xdata);
    if (!stub) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_STUB_ALLOC_FAILED,
               "failed to allocate stub for write fop (gfid: %s), "
               "unwinding",
               uuid_utoa(fd->inode->gfid));
        goto cleanup_local;
    }

    /* Perform Versioning */
    return br_stub_perform_incversioning(this, frame, stub, fd, ctx);

wind:
    STACK_WIND(frame, cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev,
               fd, vector, count, offset, flags, iobref, xdata);
    return 0;

cleanup_local:
    br_stub_cleanup_local(local);
    br_stub_dealloc_local(local);

unwind:
    frame->local = NULL;
    STACK_UNWIND_STRICT(writev, frame, op_ret, op_errno, NULL, NULL, NULL);
    return 0;
}

/* ftruncate callback: mirrors br_stub_writev_cbk — marks the inode modified
 * on success. */
int32_t
br_stub_ftruncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                      int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                      struct iatt *postbuf, dict_t *xdata)
{
    int32_t ret = -1;
    br_stub_local_t *local = NULL;

    local = frame->local;
    frame->local = NULL;

    if (op_ret < 0)
        goto unwind;

    ret = br_stub_mark_inode_modified(this, local);
    if (ret) {
        op_ret = -1;
        op_errno = EINVAL;
    }

unwind:
    STACK_UNWIND_STRICT(ftruncate, frame, op_ret, op_errno, prebuf, postbuf,
                        xdata);
    br_stub_cleanup_local(local);
    br_stub_dealloc_local(local);
    return 0;
}

/* Resume function: wind the held-back ftruncate after versioning. */
int32_t
br_stub_ftruncate_resume(call_frame_t *frame, xlator_t *this, fd_t *fd,
                         off_t offset, dict_t *xdata)
{
    STACK_WIND(frame, br_stub_ftruncate_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata);
    return 0;
}

/* c.f. br_stub_writev() for explanation */
int32_t
br_stub_ftruncate(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
                  dict_t *xdata)
{
    br_stub_local_t *local = NULL;
    call_stub_t *stub = NULL;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;
    gf_boolean_t inc_version = _gf_false;
    gf_boolean_t modified = _gf_false;
    br_stub_inode_ctx_t *ctx = NULL;
    int32_t ret = -1;
    fop_ftruncate_cbk_t cbk = default_ftruncate_cbk;
    br_stub_private_t *priv = NULL;

    GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
    GF_VALIDATE_OR_GOTO(this->name, this->private, unwind);
    GF_VALIDATE_OR_GOTO(this->name, frame, unwind);
    GF_VALIDATE_OR_GOTO(this->name, fd, unwind);

    priv = this->private;
    if (!priv->do_versioning)
        goto wind;

    ret = br_stub_need_versioning(this, fd, &inc_version, &modified, &ctx);
    if (ret)
        goto unwind;

    ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno);
    if (ret)
        goto unwind;

    if (!inc_version && modified)
        goto wind;

    ret = br_stub_versioning_prep(frame, this, fd, ctx);
    if (ret)
        goto unwind;

    local = frame->local;
    if (!inc_version) {
        br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid,
                           BR_STUB_NO_VERSIONING, 0);
        cbk = br_stub_ftruncate_cbk;
        goto wind;
    }

    stub = fop_ftruncate_stub(frame, br_stub_ftruncate_resume, fd, offset,
                              xdata);
    if (!stub) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_STUB_ALLOC_FAILED,
               "failed to allocate stub for ftruncate fop (gfid: %s),"
               " unwinding",
               uuid_utoa(fd->inode->gfid));
        goto cleanup_local;
    }

    return br_stub_perform_incversioning(this, frame, stub, fd, ctx);

wind:
    STACK_WIND(frame, cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata);
    return 0;

cleanup_local:
    br_stub_cleanup_local(local);
    br_stub_dealloc_local(local);

unwind:
    frame->local = NULL;
    STACK_UNWIND_STRICT(ftruncate, frame, op_ret, op_errno, NULL, NULL, NULL);
    return 0;
}

/* truncate callback: same modification tracking as the write/ftruncate
 * callbacks above. */
int32_t
br_stub_truncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                     int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                     struct iatt *postbuf, dict_t *xdata)
{
    int32_t ret = 0;
    br_stub_local_t *local = NULL;

    local = frame->local;
    frame->local = NULL;

    if (op_ret < 0)
        goto unwind;

    ret = br_stub_mark_inode_modified(this, local);
    if (ret) {
        op_ret = -1;
        op_errno = EINVAL;
    }

unwind:
    STACK_UNWIND_STRICT(truncate, frame, op_ret, op_errno, prebuf, postbuf,
                        xdata);
    br_stub_cleanup_local(local);
    br_stub_dealloc_local(local);
    return 0;
}

/* Resume function for truncate: drops the extra ref on the anonymous fd
 * (taken for versioning) before winding the actual truncate. */
int32_t
br_stub_truncate_resume(call_frame_t *frame, xlator_t *this, loc_t *loc,
                        off_t offset, dict_t *xdata)
{
    br_stub_local_t *local = frame->local;

    fd_unref(local->u.context.fd);
    STACK_WIND(frame, br_stub_ftruncate_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->truncate, loc, offset, xdata);
    return 0;
}

/**
 * Bit-rot-stub depends heavily on the fd based operations to for doing
 * versioning and sending notification. It starts tracking the operation
 * upon getting first fd based modify operation by doing versioning and
 * sends notification when last fd using which the inode was modified is
 * released.
 * But for truncate there is no fd and hence it becomes difficult to do
 * the versioning and send notification. It is handled by doing versioning
 * on an anonymous fd. The fd will be valid till the completion of the
 * truncate call. It guarantees that release on this anonymous fd will happen
 * after the truncate call and notification is sent after the truncate call.
 *
 * c.f.
br_writev_cbk() for explanation */ int32_t br_stub_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset, dict_t *xdata) { br_stub_local_t *local = NULL; call_stub_t *stub = NULL; int32_t op_ret = -1; int32_t op_errno = EINVAL; gf_boolean_t inc_version = _gf_false; gf_boolean_t modified = _gf_false; br_stub_inode_ctx_t *ctx = NULL; int32_t ret = -1; fd_t *fd = NULL; fop_truncate_cbk_t cbk = default_truncate_cbk; br_stub_private_t *priv = NULL; GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind); GF_VALIDATE_OR_GOTO(this->name, this->private, unwind); GF_VALIDATE_OR_GOTO(this->name, frame, unwind); GF_VALIDATE_OR_GOTO(this->name, loc, unwind); GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind); priv = this->private; if (!priv->do_versioning) goto wind; fd = fd_anonymous(loc->inode); if (!fd) { gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_CREATE_ANONYMOUS_FD_FAILED, "failed to create " "anonymous fd for the inode %s", uuid_utoa(loc->inode->gfid)); goto unwind; } ret = br_stub_need_versioning(this, fd, &inc_version, &modified, &ctx); if (ret) goto cleanup_fd; ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno); if (ret) goto unwind; if (!inc_version && modified) goto wind; ret = br_stub_versioning_prep(frame, this, fd, ctx); if (ret) goto cleanup_fd; local = frame->local; if (!inc_version) { br_stub_fill_local(local, NULL, fd, fd->inode, fd->inode->gfid, BR_STUB_NO_VERSIONING, 0); cbk = br_stub_truncate_cbk; goto wind; } stub = fop_truncate_stub(frame, br_stub_truncate_resume, loc, offset, xdata); if (!stub) { gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_STUB_ALLOC_FAILED, "failed to allocate stub for truncate fop (gfid: %s), " "unwinding", uuid_utoa(fd->inode->gfid)); goto cleanup_local; } return br_stub_perform_incversioning(this, frame, stub, fd, ctx); wind: STACK_WIND(frame, cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate, loc, offset, xdata); if (fd) fd_unref(fd); return 0; cleanup_local: br_stub_cleanup_local(local); 
br_stub_dealloc_local(local); cleanup_fd: fd_unref(fd); unwind: frame->local = NULL; STACK_UNWIND_STRICT(truncate, frame, op_ret, op_errno, NULL, NULL, NULL); return 0; } /** }}} */ /** {{{ */ /* open() */ /** * It's probably worth mentioning a bit about why some of the housekeeping * work is done in open() call path, rather than the callback path. * Two (or more) open()'s in parallel can race and lead to a situation * where a release() gets triggered (possibly after a series of write() * calls) when *other* open()'s have still not reached callback path * thereby having an active fd on an inode that is in process of getting * signed with the current version. * * Maintaining fd list in the call path ensures that a release() would * not be triggered if an open() call races ahead (followed by a close()) * threby finding non-empty fd list. */ int br_stub_open(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, fd_t *fd, dict_t *xdata) { int32_t ret = -1; br_stub_inode_ctx_t *ctx = NULL; uint64_t ctx_addr = 0; int32_t op_ret = -1; int32_t op_errno = EINVAL; br_stub_private_t *priv = NULL; unsigned long version = BITROT_DEFAULT_CURRENT_VERSION; GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind); GF_VALIDATE_OR_GOTO(this->name, this->private, unwind); GF_VALIDATE_OR_GOTO(this->name, loc, unwind); GF_VALIDATE_OR_GOTO(this->name, fd, unwind); GF_VALIDATE_OR_GOTO(this->name, fd->inode, unwind); priv = this->private; if (!priv->do_versioning) goto wind; ret = br_stub_get_inode_ctx(this, fd->inode, &ctx_addr); if (ret) { ret = br_stub_init_inode_versions(this, fd, fd->inode, version, _gf_true, _gf_false, &ctx_addr); if (ret) { gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_GET_INODE_CONTEXT_FAILED, "failed to init the inode context for " "the file %s (gfid: %s)", loc->path, uuid_utoa(fd->inode->gfid)); goto unwind; } } ctx = (br_stub_inode_ctx_t *)(long)ctx_addr; ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno); if (ret) goto unwind; if 
(frame->root->pid == GF_CLIENT_PID_SCRUB) goto wind; if (flags == O_RDONLY) goto wind; ret = br_stub_add_fd_to_inode(this, fd, ctx); if (ret) { gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_ADD_FD_TO_LIST_FAILED, "failed add fd to the list (gfid: %s)", uuid_utoa(fd->inode->gfid)); goto unwind; } wind: STACK_WIND(frame, default_open_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->open, loc, flags, fd, xdata); return 0; unwind: STACK_UNWIND_STRICT(open, frame, op_ret, op_errno, NULL, NULL); return 0; } /** }}} */ /** {{{ */ /* creat() */ /** * This routine registers a release callback for the given fd and adds the * fd to the inode context fd tracking list. */ int32_t br_stub_add_fd_to_inode(xlator_t *this, fd_t *fd, br_stub_inode_ctx_t *ctx) { int32_t ret = -1; br_stub_fd_t *br_stub_fd = NULL; ret = br_stub_require_release_call(this, fd, &br_stub_fd); if (ret) { gf_msg(this->name, GF_LOG_ERROR, 0, BRS_MSG_SET_FD_CONTEXT_FAILED, "failed to set the fd " "context for the file (gfid: %s)", uuid_utoa(fd->inode->gfid)); goto out; } LOCK(&fd->inode->lock); { list_add_tail(&ctx->fd_list, &br_stub_fd->list); } UNLOCK(&fd->inode->lock); ret = 0; out: return ret; } int br_stub_create_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int op_ret, int op_errno, fd_t *fd, inode_t *inode, struct iatt *stbuf, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { int32_t ret = 0; uint64_t ctx_addr = 0; br_stub_inode_ctx_t *ctx = NULL; unsigned long version = BITROT_DEFAULT_CURRENT_VERSION; br_stub_private_t *priv = NULL; priv = this->private; if (op_ret < 0) goto unwind; if (!priv->do_versioning) goto unwind; ret = br_stub_get_inode_ctx(this, fd->inode, &ctx_addr); if (ret < 0) { ret = br_stub_init_inode_versions(this, fd, inode, version, _gf_true, _gf_false, &ctx_addr); if (ret) { op_ret = -1; op_errno = EINVAL; } } else { ctx = (br_stub_inode_ctx_t *)(long)ctx_addr; ret = br_stub_add_fd_to_inode(this, fd, ctx); } unwind: STACK_UNWIND_STRICT(create, frame, op_ret, 
op_errno, fd, inode, stbuf, preparent, postparent, xdata); return 0; } int br_stub_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata) { GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind); GF_VALIDATE_OR_GOTO(this->name, loc, unwind); GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind); GF_VALIDATE_OR_GOTO(this->name, fd, unwind); GF_VALIDATE_OR_GOTO(this->name, fd->inode, unwind); STACK_WIND(frame, br_stub_create_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, fd, xdata); return 0; unwind: STACK_UNWIND_STRICT(create, frame, -1, EINVAL, NULL, NULL, NULL, NULL, NULL, NULL); return 0; } int br_stub_mknod_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int op_ret, int op_errno, inode_t *inode, struct iatt *stbuf, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { int32_t ret = -1; unsigned long version = BITROT_DEFAULT_CURRENT_VERSION; br_stub_private_t *priv = NULL; priv = this->private; if (op_ret < 0) goto unwind; if (!priv->do_versioning) goto unwind; ret = br_stub_init_inode_versions(this, NULL, inode, version, _gf_true, _gf_false, NULL); /** * Like lookup, if init_inode_versions fail, return EINVAL */ if (ret) { op_ret = -1; op_errno = EINVAL; } unwind: STACK_UNWIND_STRICT(mknod, frame, op_ret, op_errno, inode, stbuf, preparent, postparent, xdata); return 0; } int br_stub_mknod(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, dev_t dev, mode_t umask, dict_t *xdata) { GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind); GF_VALIDATE_OR_GOTO(this->name, loc, unwind); GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind); STACK_WIND(frame, br_stub_mknod_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod, loc, mode, dev, umask, xdata); return 0; unwind: STACK_UNWIND_STRICT(mknod, frame, -1, EINVAL, NULL, NULL, NULL, NULL, NULL); return 0; } /** }}} */ /** * As of now, only lookup searches for bad object xattr and marks the * 
object as bad in its inode context if the xattr is present. But there
 * is a possibility that, at the time of the lookup the object was not
 * marked bad (i.e. bad object xattr was not set), and later its marked
 * as bad. In this case, object is not bad, so when a fop such as open or
 * readv or writev comes on the object, the fop will be sent downward instead
 * of sending as error upwards.
 * The solution for this is to do a getxattr for the below list of fops.
 * lookup, readdirp, open, readv, writev.
 * But doing getxattr for each of the above fops might be costly.
 * So another method followed is to catch the bad file marking by the scrubber
 * and set that info within the object's inode context. In this way getxattr
 * calls can be avoided and bad objects can be caught instantly. Fetching the
 * xattr is needed only in lookups when there is a brick restart or inode
 * forget.
 *
 * If the dict (@xattr) is NULL, then how should that be handled? Fail the
 * lookup operation? Or let it continue with version being initialized to
 * BITROT_DEFAULT_CURRENT_VERSION. But what if the version was different
 * on disk (and also a right signature was there), but posix failed to
 * successfully allocate the dict? Posix does not treat call back xdata
 * creation failure as the lookup failure.
 */
static int32_t
br_stub_lookup_version(xlator_t *this, uuid_t gfid, inode_t *inode,
                       dict_t *xattr)
{
    unsigned long version = 0;
    br_version_t *obuf = NULL;
    br_signature_t *sbuf = NULL;
    br_vxattr_status_t status;
    gf_boolean_t bad_object = _gf_false;

    /**
     * versioning xattrs were requested from POSIX. if available, figure
     * out the correct version to use in the inode context (start with
     * the default version if unavailable). As of now versions are not
     * persisted on-disk. The inode is marked dirty, so that the first
     * operation (such as write(), etc..) triggers synchronization to
     * disk.
     */
    status = br_version_xattr_state(xattr, &obuf, &sbuf, &bad_object);

    version = ((status == BR_VXATTR_STATUS_FULL) ||
               (status == BR_VXATTR_STATUS_UNSIGNED))
                  ? obuf->ongoingversion
                  : BITROT_DEFAULT_CURRENT_VERSION;

    /**
     * If signature is there, but version is not there then that status
     * is treated as INVALID. So in that case, we should not initialize the
     * inode context with wrong version names etc.
     */
    if (status == BR_VXATTR_STATUS_INVALID)
        return -1;

    return br_stub_init_inode_versions(this, NULL, inode, version, _gf_true,
                                       bad_object, NULL);
}

/** {{{ */

/**
 * opendir() fop: opens on the virtual bad-object directory are served from
 * the quarantine directory on the brick (priv->stub_basepath); everything
 * else is wound down unchanged.
 */
int32_t
br_stub_opendir(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd,
                dict_t *xdata)
{
    br_stub_private_t *priv = NULL;
    br_stub_fd_t *fd_ctx = NULL;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;

    priv = this->private;
    /* Non-zero compare => not the bad-object dir; take the normal path. */
    if (gf_uuid_compare(fd->inode->gfid, priv->bad_object_dir_gfid))
        goto normal;

    fd_ctx = br_stub_fd_new();
    if (!fd_ctx) {
        op_errno = ENOMEM;
        goto unwind;
    }

    fd_ctx->bad_object.dir_eof = -1;
    fd_ctx->bad_object.dir = sys_opendir(priv->stub_basepath);
    if (!fd_ctx->bad_object.dir) {
        op_errno = errno;
        goto err_freectx;
    }

    op_ret = br_stub_fd_ctx_set(this, fd, fd_ctx);
    if (!op_ret)
        goto unwind;

    /* ctx set failed: close the DIR handle and free the context. */
    sys_closedir(fd_ctx->bad_object.dir);

err_freectx:
    GF_FREE(fd_ctx);
unwind:
    STACK_UNWIND_STRICT(opendir, frame, op_ret, op_errno, fd, NULL);
    return 0;

normal:
    STACK_WIND(frame, default_opendir_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->opendir, loc, fd, xdata);
    return 0;
}

/**
 * readdir() fop: reads on the virtual bad-object directory are handed to the
 * worker thread via a stub; everything else is wound down unchanged.
 */
int32_t
br_stub_readdir(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
                off_t off, dict_t *xdata)
{
    call_stub_t *stub = NULL;
    br_stub_private_t *priv = NULL;

    priv = this->private;
    if (!priv->do_versioning)
        goto out;

    if (gf_uuid_compare(fd->inode->gfid, priv->bad_object_dir_gfid))
        goto out;

    stub = fop_readdir_stub(frame, br_stub_readdir_wrapper, fd, size, off,
                            xdata);
    if (!stub) {
        STACK_UNWIND_STRICT(readdir, frame, -1, ENOMEM, NULL, NULL);
        return 0;
    }
    br_stub_worker_enqueue(this, stub);
    return 0;
out:
    STACK_WIND(frame, default_readdir_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->readdir, fd, size, off, xdata);
    return 0;
}

/**
 * readdirp() callback: per regular-file entry, initialize the inode context
 * from the returned versioning xattrs (if not already present) and strip the
 * stub's virtual xattrs from the entry dict before unwinding.
 */
int
br_stub_readdirp_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                     int op_ret, int op_errno, gf_dirent_t *entries,
                     dict_t *dict)
{
    int32_t ret = 0;
    uint64_t ctxaddr = 0;
    gf_dirent_t *entry = NULL;
    br_stub_private_t *priv = NULL;
    gf_boolean_t ver_enabled = _gf_false;

    BR_STUB_VER_ENABLED_IN_CALLPATH(frame, ver_enabled);
    priv = this->private;

    BR_STUB_VER_COND_GOTO(priv, (!ver_enabled), unwind);

    if (op_ret < 0)
        goto unwind;

    list_for_each_entry(entry, &entries->list, list)
    {
        if ((strcmp(entry->d_name, ".") == 0) ||
            (strcmp(entry->d_name, "..") == 0))
            continue;

        if (!IA_ISREG(entry->d_stat.ia_type))
            continue;

        /*
         * Readdirp for most part is a bulk lookup for all the entries
         * present in the directory being read. Ideally, for each
         * entry, the handling should be similar to that of a lookup
         * callback. But for now, just keeping this as it has been
         * until now (which means, this comment has been added much
         * later as part of a change that wanted to send the flag
         * of true/false to br_stub_remove_vxattrs to indicate whether
         * the bad-object xattr should be removed from the entry->dict
         * or not). Until this change, the function br_stub_remove_vxattrs
         * was just removing all the xattrs associated with bit-rot-stub
         * (like version, bad-object, signature etc). But, there are
         * scenarios where we only want to send bad-object xattr and not
         * others. So this comment is part of that change which also
         * mentions about another possible change that might be needed
         * in future.
         * But for now, adding _gf_true means functionally its same as
         * what this function was doing before. Just remove all the stub
         * related xattrs.
*/
        ret = br_stub_get_inode_ctx(this, entry->inode, &ctxaddr);
        if (ret < 0)
            ctxaddr = 0;
        if (ctxaddr) { /* already has the context */
            br_stub_remove_vxattrs(entry->dict, _gf_true);
            continue;
        }

        ret = br_stub_lookup_version(this, entry->inode->gfid, entry->inode,
                                     entry->dict);
        br_stub_remove_vxattrs(entry->dict, _gf_true);
        if (ret) {
            /**
             * there's no per-file granularity support in case of
             * failure. let's fail the entire request for now..
             */
            break;
        }
    }

    if (ret) {
        op_ret = -1;
        op_errno = EINVAL;
    }

unwind:
    STACK_UNWIND_STRICT(readdirp, frame, op_ret, op_errno, entries, dict);

    return 0;
}

/**
 * readdirp() fop: when versioning is active, request the stub's versioning
 * xattrs from POSIX (taking a ref on the caller's dict, or allocating one)
 * so the callback above can populate inode contexts in bulk.
 */
int
br_stub_readdirp(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
                 off_t offset, dict_t *dict)
{
    int32_t ret = -1;
    int op_errno = 0;
    gf_boolean_t xref = _gf_false;
    br_stub_private_t *priv = NULL;

    priv = this->private;
    BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);

    op_errno = ENOMEM;
    if (!dict) {
        dict = dict_new();
        if (!dict)
            goto unwind;
    } else {
        dict = dict_ref(dict);
    }

    /* From here on we hold a dict reference that must be released. */
    xref = _gf_true;

    op_errno = EINVAL;
    ret = dict_set_uint32(dict, BITROT_CURRENT_VERSION_KEY, 0);
    if (ret)
        goto unwind;
    ret = dict_set_uint32(dict, BITROT_SIGNING_VERSION_KEY, 0);
    if (ret)
        goto unwind;
    ret = dict_set_uint32(dict, BITROT_OBJECT_BAD_KEY, 0);
    if (ret)
        goto unwind;

wind:
    STACK_WIND(frame, br_stub_readdirp_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->readdirp, fd, size, offset, dict);
    goto unref_dict;

unwind:
    if (frame->local == (void *)0x1)
        frame->local = NULL;
    STACK_UNWIND_STRICT(readdirp, frame, -1, op_errno, NULL, NULL);
    return 0;

unref_dict:
    if (xref)
        dict_unref(dict);
    return 0;
}

/** }}} */

/** {{{ */

/* lookup() */

/**
 * This function mainly handles the ENOENT error for the bad objects. Though
 * br_stub_forget () handles removal of the link for the bad object from the
 * quarantine directory, its better to handle it in lookup as well, where
 * a failed lookup on a bad object with ENOENT, will trigger deletion of the
 * link for the bad object from quarantine directory. So whoever comes first
 * either forget () or lookup () will take care of removing the link.
 */
void
br_stub_handle_lookup_error(xlator_t *this, inode_t *inode, int32_t op_errno)
{
    int32_t ret = -1;
    uint64_t ctx_addr = 0;
    br_stub_inode_ctx_t *ctx = NULL;

    if (op_errno != ENOENT)
        goto out;

    if (!inode_is_linked(inode))
        goto out;

    ret = br_stub_get_inode_ctx(this, inode, &ctx_addr);
    if (ret)
        goto out;

    ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;

    LOCK(&inode->lock);
    {
        if (__br_stub_is_bad_object(ctx))
            (void)br_stub_del(this, inode->gfid);
    }
    UNLOCK(&inode->lock);

    /* NOTE(review): this second bad-object check reads ctx outside the
     * inode lock — presumably benign here, but verify. */
    if (__br_stub_is_bad_object(ctx)) {
        /* File is not present, might be deleted for recovery,
         * del the bitrot inode context
         */
        ctx_addr = 0;
        inode_ctx_del(inode, this, &ctx_addr);
        if (ctx_addr) {
            ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
            GF_FREE(ctx);
        }
    }

out:
    return;
}

/**
 * lookup() callback: on failure, clean up quarantine state for ENOENT; on
 * success, set up the inode context from the versioning xattrs (fresh
 * lookups only, identified by the request cookie) and propagate the
 * "bad inode" marker before stripping the stub's virtual xattrs.
 */
int
br_stub_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                   int op_ret, int op_errno, inode_t *inode,
                   struct iatt *stbuf, dict_t *xattr, struct iatt *postparent)
{
    int32_t ret = 0;
    br_stub_private_t *priv = NULL;
    gf_boolean_t ver_enabled = _gf_false;
    gf_boolean_t remove_bad_file_marker = _gf_true;

    BR_STUB_VER_ENABLED_IN_CALLPATH(frame, ver_enabled);
    priv = this->private;

    if (op_ret < 0) {
        (void)br_stub_handle_lookup_error(this, inode, op_errno);
        /*
         * If the lookup error is not ENOENT, then it is better
         * to send the bad file marker to the higher layer (if
         * it has been set)
         */
        if (op_errno != ENOENT)
            remove_bad_file_marker = _gf_false;
        goto delkey;
    }

    BR_STUB_VER_COND_GOTO(priv, (!ver_enabled), delkey);

    if (!IA_ISREG(stbuf->ia_type))
        goto unwind;

    /**
     * If the object is bad, then "bad inode" marker has to be sent back
     * in response, for revalidated lookups as well. Some xlators such as
     * quick-read might cache the data in revalidated lookup as fresh
     * lookup would anyway have sent "bad inode" marker.
     * In general send bad inode marker for every lookup operation on the
     * bad object.
*/
    if (cookie != (void *)BR_STUB_REQUEST_COOKIE) {
        /* Revalidated lookup: no version xattrs were requested; only
         * propagate the bad-object marker. */
        ret = br_stub_mark_xdata_bad_object(this, inode, xattr);
        if (ret) {
            op_ret = -1;
            op_errno = EIO;
            /*
             * This flag ensures that in the label @delkey below,
             * bad file marker is not removed from the dictionary,
             * but other virtual xattrs (such as version, signature)
             * are removed.
             */
            remove_bad_file_marker = _gf_false;
        }
        goto delkey;
    }

    /* Fresh lookup: initialize the inode context from the xattrs. */
    ret = br_stub_lookup_version(this, stbuf->ia_gfid, inode, xattr);
    if (ret < 0) {
        op_ret = -1;
        op_errno = EINVAL;
        goto delkey;
    }

    /**
     * If the object is bad, send "bad inode" marker back in response
     * for xlator(s) to act accordingly (such as quick-read, etc..)
     */
    ret = br_stub_mark_xdata_bad_object(this, inode, xattr);
    if (ret) {
        /**
         * aaha! bad object, but sorry we would not
         * satisfy the request on allocation failures.
         */
        op_ret = -1;
        op_errno = EIO;
        goto delkey;
    }

delkey:
    br_stub_remove_vxattrs(xattr, remove_bad_file_marker);
unwind:
    STACK_UNWIND_STRICT(lookup, frame, op_ret, op_errno, inode, stbuf, xattr,
                        postparent);

    return 0;
}

/**
 * lookup() fop: lookups under the virtual bad-object directory go to the
 * worker thread; fresh lookups (no inode context yet) request the stub's
 * versioning xattrs from POSIX and tag the wind with a cookie so the
 * callback can distinguish them from revalidated lookups.
 */
int
br_stub_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
{
    int32_t ret = 0;
    int op_errno = 0;
    void *cookie = NULL;
    uint64_t ctx_addr = 0;
    gf_boolean_t xref = _gf_false;
    br_stub_private_t *priv = NULL;
    call_stub_t *stub = NULL;

    GF_VALIDATE_OR_GOTO("bit-rot-stub", this, unwind);
    GF_VALIDATE_OR_GOTO(this->name, loc, unwind);
    GF_VALIDATE_OR_GOTO(this->name, loc->inode, unwind);

    priv = this->private;
    BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);

    /* Lookup of the bad-object dir itself, or of an entry inside it, is
     * served by the worker thread from the quarantine directory. */
    if (!gf_uuid_compare(loc->gfid, priv->bad_object_dir_gfid) ||
        !gf_uuid_compare(loc->pargfid, priv->bad_object_dir_gfid)) {
        stub = fop_lookup_stub(frame, br_stub_lookup_wrapper, loc, xdata);
        if (!stub) {
            op_errno = ENOMEM;
            goto unwind;
        }
        br_stub_worker_enqueue(this, stub);
        return 0;
    }

    ret = br_stub_get_inode_ctx(this, loc->inode, &ctx_addr);
    if (ret < 0)
        ctx_addr = 0;
    if (ctx_addr != 0)
        goto wind; /* revalidate: context exists, no xattr request needed */

    /**
     * fresh lookup: request version keys from POSIX
     */
    op_errno = ENOMEM;
    if (!xdata) {
        xdata = dict_new();
        if (!xdata)
            goto unwind;
    } else {
        xdata = dict_ref(xdata);
    }

    xref = _gf_true;

    /**
     * Requesting both xattrs provides a way of sanity checking the
     * object. Anomaly checking is done in cbk by examining absence
     * of either or both xattrs.
     */
    op_errno = EINVAL;
    ret = dict_set_uint32(xdata, BITROT_CURRENT_VERSION_KEY, 0);
    if (ret)
        goto unwind;
    ret = dict_set_uint32(xdata, BITROT_SIGNING_VERSION_KEY, 0);
    if (ret)
        goto unwind;
    ret = dict_set_uint32(xdata, BITROT_OBJECT_BAD_KEY, 0);
    if (ret)
        goto unwind;
    cookie = (void *)BR_STUB_REQUEST_COOKIE;

wind:
    STACK_WIND_COOKIE(frame, br_stub_lookup_cbk, cookie, FIRST_CHILD(this),
                      FIRST_CHILD(this)->fops->lookup, loc, xdata);
    goto dealloc_dict;

unwind:
    if (frame->local == (void *)0x1)
        frame->local = NULL;
    STACK_UNWIND_STRICT(lookup, frame, -1, op_errno, NULL, NULL, NULL, NULL);
dealloc_dict:
    if (xref)
        dict_unref(xdata);
    return 0;
}

/** }}} */

/** {{{ */

/* stat */

/* stat() fop: fail with EIO (via br_stub_check_bad_object) for objects
 * already marked bad; otherwise wind down unchanged. */
int
br_stub_stat(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
{
    int32_t ret = 0;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;
    br_stub_private_t *priv = NULL;

    priv = this->private;
    if (!priv->do_versioning)
        goto wind;

    if (!IA_ISREG(loc->inode->ia_type))
        goto wind;

    ret = br_stub_check_bad_object(this, loc->inode, &op_ret, &op_errno);
    if (ret)
        goto unwind;

wind:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->stat,
                    loc, xdata);
    return 0;

unwind:
    STACK_UNWIND_STRICT(stat, frame, op_ret, op_errno, NULL, NULL);
    return 0;
}

/* fstat */

/* fstat() fop: same bad-object gating as br_stub_stat, keyed on fd->inode. */
int
br_stub_fstat(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)
{
    int32_t ret = 0;
    int32_t op_ret = -1;
    int32_t op_errno = EINVAL;
    br_stub_private_t *priv = NULL;

    priv = this->private;
    if (!priv->do_versioning)
        goto wind;

    if (!IA_ISREG(fd->inode->ia_type))
        goto wind;

    ret = br_stub_check_bad_object(this, fd->inode, &op_ret, &op_errno);
    if (ret)
        goto unwind;

wind:
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat,
                    fd, xdata);
    return 0;

unwind:
    STACK_UNWIND_STRICT(fstat, frame, op_ret, op_errno, NULL, NULL);
    return 0;
}

/** }}} */

/** {{{ */

/* unlink() */

/**
 * unlink() callback: if the removed object was marked bad, delete its link
 * from the quarantine directory as well.
 */
int
br_stub_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                   int32_t op_ret, int32_t op_errno, struct iatt *preparent,
                   struct iatt *postparent, dict_t *xdata)
{
    br_stub_local_t *local = NULL;
    inode_t *inode = NULL;
    uint64_t ctx_addr = 0;
    br_stub_inode_ctx_t *ctx = NULL;
    int32_t ret = -1;
    br_stub_private_t *priv = NULL;
    gf_boolean_t ver_enabled = _gf_false;

    BR_STUB_VER_ENABLED_IN_CALLPATH(frame, ver_enabled);
    priv = this->private;
    BR_STUB_VER_COND_GOTO(priv, (!ver_enabled), unwind);

    local = frame->local;
    frame->local = NULL;

    if (op_ret < 0)
        goto unwind;

    if (!local) {
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_NULL_LOCAL,
               "local is NULL");
        goto unwind;
    }
    inode = local->u.context.inode;
    if (!IA_ISREG(inode->ia_type))
        goto unwind;

    ret = br_stub_get_inode_ctx(this, inode, &ctx_addr);
    if (ret) {
        /**
         * If the inode is bad AND context is not there, then there
         * is a possibility of the gfid of the object being listed
         * in the quarantine directory and will be shown in the
         * bad objects list. So continuing with the fop with a
         * warning log. The entry from the quarantine directory
         * has to be removed manually. Its not a good idea to fail
         * the fop, as the object has already been deleted.
         */
        gf_msg(this->name, GF_LOG_WARNING, 0,
               BRS_MSG_GET_INODE_CONTEXT_FAILED,
               "failed to get the context for the inode %s",
               uuid_utoa(inode->gfid));
        goto unwind;
    }

    ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;

    LOCK(&inode->lock);
    {
        /**
         * Ignoring the return value of br_stub_del ().
         * There is not much that can be done if unlinking
         * of the entry in the quarantine directory fails.
         * The failure is logged.
*/
        if (__br_stub_is_bad_object(ctx))
            (void)br_stub_del(this, inode->gfid);
    }
    UNLOCK(&inode->lock);

unwind:
    STACK_UNWIND_STRICT(unlink, frame, op_ret, op_errno, preparent,
                        postparent, xdata);
    br_stub_cleanup_local(local);
    br_stub_dealloc_local(local);
    return 0;
}

/**
 * unlink() fop: stash the inode in frame->local so the callback can clean
 * up quarantine state for bad objects after the unlink succeeds.
 */
int
br_stub_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int flag,
               dict_t *xdata)
{
    br_stub_local_t *local = NULL;
    int32_t op_ret = -1;
    int32_t op_errno = 0;
    br_stub_private_t *priv = NULL;

    priv = this->private;
    BR_STUB_VER_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);

    local = br_stub_alloc_local(this);
    if (!local) {
        op_ret = -1;
        op_errno = ENOMEM;
        gf_msg(this->name, GF_LOG_ERROR, ENOMEM, BRS_MSG_NO_MEMORY,
               "failed to allocate memory for local (path: %s, gfid: %s)",
               loc->path, uuid_utoa(loc->inode->gfid));
        goto unwind;
    }

    br_stub_fill_local(local, NULL, NULL, loc->inode, loc->inode->gfid,
                       BR_STUB_NO_VERSIONING, 0);
    frame->local = local;

wind:
    STACK_WIND(frame, br_stub_unlink_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->unlink, loc, flag, xdata);
    return 0;

unwind:
    if (frame->local == (void *)0x1)
        frame->local = NULL;
    STACK_UNWIND_STRICT(unlink, frame, op_ret, op_errno, NULL, NULL, NULL);
    return 0;
}

/** }}} */

/** {{{ */

/* forget() */

/* forget() cbk: release the bit-rot inode context when the inode is
 * dropped from the inode table. */
int
br_stub_forget(xlator_t *this, inode_t *inode)
{
    uint64_t ctx_addr = 0;
    br_stub_inode_ctx_t *ctx = NULL;

    inode_ctx_del(inode, this, &ctx_addr);
    if (!ctx_addr)
        return 0;

    ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
    GF_FREE(ctx);

    return 0;
}

/** }}} */

/** {{{ */

/* ipc callback: nothing to unwind (the frame was created locally by
 * br_stub_send_ipc_fop); just destroy it. */
int32_t
br_stub_noop(call_frame_t *frame, void *cookie, xlator_t *this,
             int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    STACK_DESTROY(frame->root);
    return 0;
}

/**
 * Build a changelog RELEASE event for the given fd (release version plus
 * sign state) and send it down via an IPC fop targeted at changelog.
 */
static void
br_stub_send_ipc_fop(xlator_t *this, fd_t *fd, unsigned long releaseversion,
                     int sign_info)
{
    int32_t op = 0;
    int32_t ret = 0;
    dict_t *xdata = NULL;
    call_frame_t *frame = NULL;
    changelog_event_t ev = {
        0,
    };

    ev.ev_type = CHANGELOG_OP_TYPE_BR_RELEASE;
    ev.u.releasebr.version = releaseversion;
    ev.u.releasebr.sign_info = sign_info;
    gf_uuid_copy(ev.u.releasebr.gfid, fd->inode->gfid);

    xdata = dict_new();
    if (!xdata) {
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, BRS_MSG_NO_MEMORY,
               "dict allocation failed: cannot send IPC FOP "
               "to changelog");
        goto out;
    }

    /* NOTE(review): ev is stack-allocated and set as static bin — assumes
     * the dict is consumed before this function returns; confirm. */
    ret = dict_set_static_bin(xdata, "RELEASE-EVENT", &ev, CHANGELOG_EV_SIZE);
    if (ret) {
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_SET_EVENT_FAILED,
               "cannot set release event in dict");
        goto dealloc_dict;
    }

    frame = create_frame(this, this->ctx->pool);
    if (!frame) {
        gf_msg(this->name, GF_LOG_WARNING, 0, BRS_MSG_CREATE_FRAME_FAILED,
               "create_frame() failure");
        goto dealloc_dict;
    }

    op = GF_IPC_TARGET_CHANGELOG;
    STACK_WIND(frame, br_stub_noop, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->ipc, op, xdata);

dealloc_dict:
    dict_unref(xdata);
out:
    return;
}

/**
 * This is how the state machine of sign info works:
 * 3 states:
 * 1) BR_SIGN_NORMAL => The default State of the inode
 * 2) BR_SIGN_REOPEN_WAIT => A release has been sent and is waiting for reopen
 * 3) BR_SIGN_QUICK => reopen has happened and this release should trigger sign
 * 2 events:
 * 1) GF_FOP_RELEASE
 * 2) GF_FOP_WRITE (actually a dummy write for BitD)
 *
 * This is how states are changed based on events:
 * EVENT: GF_FOP_RELEASE:
 * if (state == BR_SIGN_NORMAL) ; then
 *     set state = BR_SIGN_REOPEN_WAIT;
 * if (state == BR_SIGN_QUICK); then
 *     set state = BR_SIGN_NORMAL;
 * EVENT: GF_FOP_WRITE:
 * if (state == BR_SIGN_REOPEN_WAIT); then
 *     set state = BR_SIGN_QUICK;
 */
br_sign_state_t
__br_stub_inode_sign_state(br_stub_inode_ctx_t *ctx, glusterfs_fop_t fop,
                           fd_t *fd)
{
    br_sign_state_t sign_info = BR_SIGN_INVALID;

    switch (fop) {
        case GF_FOP_FSETXATTR:
            sign_info = ctx->info_sign = BR_SIGN_QUICK;
            break;

        case GF_FOP_RELEASE:
            GF_ASSERT(ctx->info_sign != BR_SIGN_REOPEN_WAIT);

            if (ctx->info_sign == BR_SIGN_NORMAL) {
                sign_info = ctx->info_sign = BR_SIGN_REOPEN_WAIT;
            } else {
                sign_info = ctx->info_sign;
                ctx->info_sign = BR_SIGN_NORMAL;
            }

            break;
        default:
            break;
    }

    return sign_info;
}

/**
 * release() cbk: drop the fd from the inode's tracking list, and if this was
 * the release that should trigger signing, notify changelog (BitD) with the
 * release version and sign state; finally free the per-fd context.
 */
int32_t
br_stub_release(xlator_t *this, fd_t *fd)
{
    int32_t ret = 0;
    int32_t flags = 0;
    inode_t *inode = NULL;
    unsigned long releaseversion = 0;
    br_stub_inode_ctx_t *ctx = NULL;
    uint64_t tmp = 0;
    br_stub_fd_t *br_stub_fd = NULL;
    int32_t signinfo = 0;

    inode = fd->inode;

    LOCK(&inode->lock);
    {
        ctx = __br_stub_get_ongoing_version_ctx(this, inode, NULL);
        if (ctx == NULL)
            goto unblock;
        br_stub_fd = br_stub_fd_ctx_get(this, fd);
        if (br_stub_fd) {
            list_del_init(&br_stub_fd->list);
        }

        ret = __br_stub_can_trigger_release(inode, ctx, &releaseversion);
        if (!ret)
            goto unblock;

        signinfo = __br_stub_inode_sign_state(ctx, GF_FOP_RELEASE, fd);
        signinfo = htonl(signinfo);

        /* inode back to initial state: mark dirty */
        if (ctx->info_sign == BR_SIGN_NORMAL) {
            __br_stub_mark_inode_dirty(ctx);
            __br_stub_unset_inode_modified(ctx);
        }
    }
unblock:
    UNLOCK(&inode->lock);

    if (ret) {
        gf_msg_debug(this->name, 0,
                     "releaseversion: %lu | flags: %d "
                     "| signinfo: %d",
                     (unsigned long)ntohl(releaseversion), flags,
                     ntohl(signinfo));
        br_stub_send_ipc_fop(this, fd, releaseversion, signinfo);
    }

    ret = fd_ctx_del(fd, this, &tmp);
    br_stub_fd = (br_stub_fd_t *)(long)tmp;

    GF_FREE(br_stub_fd);

    return 0;
}

/* releasedir() cbk: close the quarantine DIR handle (if this fd was the
 * virtual bad-object directory) and free the per-fd context. */
int32_t
br_stub_releasedir(xlator_t *this, fd_t *fd)
{
    br_stub_fd_t *fctx = NULL;
    uint64_t ctx = 0;
    int ret = 0;

    ret = fd_ctx_del(fd, this, &ctx);
    if (ret < 0)
        goto out;

    fctx = (br_stub_fd_t *)(long)ctx;
    if (fctx->bad_object.dir) {
        ret = sys_closedir(fctx->bad_object.dir);
        if (ret)
            gf_msg(this->name, GF_LOG_ERROR, 0,
                   BRS_MSG_BAD_OBJ_DIR_CLOSE_FAIL, "closedir error: %s",
                   strerror(errno));
    }
    GF_FREE(fctx);
out:
    return 0;
}

/** }}} */

/** {{{ */

/* ictxmerge */

/**
 * ictxmerge cbk: when an anonymous inode is linked to an existing one, move
 * the (single) tracked fd from the unlinked inode's context onto the linked
 * inode's context, under the linked inode's lock.
 */
void
br_stub_ictxmerge(xlator_t *this, fd_t *fd, inode_t *inode,
                  inode_t *linked_inode)
{
    int32_t ret = 0;
    uint64_t ctxaddr = 0;
    uint64_t lctxaddr = 0;
    br_stub_inode_ctx_t *ctx = NULL;
    br_stub_inode_ctx_t *lctx = NULL;
    br_stub_fd_t *br_stub_fd = NULL;

    ret = br_stub_get_inode_ctx(this, inode, &ctxaddr);
    if (ret < 0)
        goto done;
    ctx = (br_stub_inode_ctx_t *)(uintptr_t)ctxaddr;

    LOCK(&linked_inode->lock);
    {
        ret = __br_stub_get_inode_ctx(this, linked_inode, &lctxaddr);
        if (ret < 0)
            goto unblock;
        lctx = (br_stub_inode_ctx_t *)(uintptr_t)lctxaddr;

        GF_ASSERT(list_is_singular(&ctx->fd_list));
        br_stub_fd = list_first_entry(&ctx->fd_list, br_stub_fd_t, list);
        if (br_stub_fd) {
            GF_ASSERT(br_stub_fd->fd == fd);
            list_move_tail(&br_stub_fd->list, &lctx->fd_list);
        }
    }
unblock:
    UNLOCK(&linked_inode->lock);

done:
    return;
}

/** }}} */

struct xlator_fops fops = {
    .lookup = br_stub_lookup,
    .stat = br_stub_stat,
    .fstat = br_stub_fstat,
    .open = br_stub_open,
    .create = br_stub_create,
    .readdirp = br_stub_readdirp,
    .getxattr = br_stub_getxattr,
    .fgetxattr = br_stub_fgetxattr,
    .fsetxattr = br_stub_fsetxattr,
    .writev = br_stub_writev,
    .truncate = br_stub_truncate,
    .ftruncate = br_stub_ftruncate,
    .mknod = br_stub_mknod,
    .readv = br_stub_readv,
    .removexattr = br_stub_removexattr,
    .fremovexattr = br_stub_fremovexattr,
    .setxattr = br_stub_setxattr,
    .opendir = br_stub_opendir,
    .readdir = br_stub_readdir,
    .unlink = br_stub_unlink,
};

struct xlator_cbks cbks = {
    .forget = br_stub_forget,
    .release = br_stub_release,
    .ictxmerge = br_stub_ictxmerge,
};

struct volume_options options[] = {
    {.key = {"bitrot"},
     .type = GF_OPTION_TYPE_BOOL,
     .default_value = "off",
     .op_version = {GD_OP_VERSION_3_7_0},
     .flags = OPT_FLAG_SETTABLE | OPT_FLAG_FORCE,
     .tags = {"bitrot"},
     .description = "enable/disable bitrot stub"},
    {.key = {"export"},
     .type = GF_OPTION_TYPE_PATH,
     .op_version = {GD_OP_VERSION_3_7_0},
     .tags = {"bitrot"},
     .description = "brick path for versioning",
     .default_value = "{{ brick.path }}"},
    {.key = {NULL}},
};

xlator_api_t xlator_api = {
    .init = init,
    .fini = fini,
    .notify = notify,
    .reconfigure = reconfigure,
    .mem_acct_init = mem_acct_init,
    .op_version = {1}, /* Present from the initial version */
    .fops = &fops,
    .cbks = &cbks,
    .options = options,
    .identifier = "bitrot-stub",
    .category = GF_MAINTAINED,
};