/* Copyright (c) 2013 Red Hat, Inc. This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser General Public License, version 3 or any later version (LGPLv3 or later), or the GNU General Public License, version 2 (GPLv2), in all cases as published by the Free Software Foundation. */ #include #include #include #include #include #include "changelog-rt.h" #include "changelog-encoders.h" #include "changelog-mem-types.h" #include "changelog-messages.h" #include #include #include "changelog-rpc.h" #include "errno.h" static struct changelog_bootstrap cb_bootstrap[] = { { .mode = CHANGELOG_MODE_RT, .ctor = changelog_rt_init, .dtor = changelog_rt_fini, }, }; static int changelog_init_rpc(xlator_t *this, changelog_priv_t *priv); static int changelog_init(xlator_t *this, changelog_priv_t *priv); /* Entry operations - TYPE III */ /** * entry operations do not undergo inode version checking. */ /* {{{ */ /* rmdir */ int32_t changelog_rmdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind); changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: changelog_dec_fop_cnt(this, priv, local); CHANGELOG_STACK_UNWIND(rmdir, frame, op_ret, op_errno, preparent, postparent, xdata); return 0; } int32_t changelog_rmdir_resume(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; gf_msg_debug(this->name, 0, "Dequeue rmdir"); changelog_color_fop_and_inc_cnt(this, priv, frame->local); STACK_WIND(frame, changelog_rmdir_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rmdir, loc, xflags, xdata); return 0; } int32_t changelog_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags, dict_t 
*xdata) { size_t xtra_len = 0; changelog_priv_t *priv = NULL; changelog_opt_t *co = NULL; call_stub_t *stub = NULL; struct list_head queue = { 0, }; gf_boolean_t barrier_enabled = _gf_false; INIT_LIST_HEAD(&queue); priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind); CHANGELOG_INIT_NOCHECK(this, frame->local, NULL, loc->inode->gfid, 2); co = changelog_get_usable_buffer(frame->local); if (!co) goto wind; CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len); co++; if (priv->capture_del_path) { CHANGELOG_FILL_ENTRY_DIR_PATH(co, loc->pargfid, loc->name, del_entry_fn, del_entry_free_fn, xtra_len, wind, _gf_true); } else { CHANGELOG_FILL_ENTRY_DIR_PATH(co, loc->pargfid, loc->name, del_entry_fn, del_entry_free_fn, xtra_len, wind, _gf_false); } changelog_set_usable_record_and_length(frame->local, xtra_len, 2); /* changelog barrier */ /* Color assignment and increment of fop_cnt for rmdir/unlink/rename * should be made with in priv lock if changelog barrier is not enabled. * Because if counter is not incremented yet, draining wakes up and * publishes the changelog but later these fops might hit the disk and * present in snapped volume but where as the intention is these fops * should not be present in snapped volume. 
*/ LOCK(&priv->lock); { if ((barrier_enabled = priv->barrier_enabled)) { stub = fop_rmdir_stub(frame, changelog_rmdir_resume, loc, xflags, xdata); if (!stub) __chlog_barrier_disable(this, &queue); else __chlog_barrier_enqueue(this, stub); } else { ((changelog_local_t *)frame->local)->color = priv->current_color; changelog_inc_fop_cnt(this, priv, frame->local); } } UNLOCK(&priv->lock); if (barrier_enabled && stub) { gf_msg_debug(this->name, 0, "Enqueue rmdir"); goto out; } if (barrier_enabled && !stub) { gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=rmdir", NULL); chlog_barrier_dequeue_all(this, &queue); } /* changelog barrier */ wind: STACK_WIND(frame, changelog_rmdir_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rmdir, loc, xflags, xdata); out: return 0; } /* unlink */ int32_t changelog_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind); changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: changelog_dec_fop_cnt(this, priv, local); CHANGELOG_STACK_UNWIND(unlink, frame, op_ret, op_errno, preparent, postparent, xdata); return 0; } int32_t changelog_unlink_resume(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; gf_msg_debug(this->name, 0, "Dequeue unlink"); changelog_color_fop_and_inc_cnt(this, priv, frame->local); STACK_WIND(frame, changelog_unlink_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink, loc, xflags, xdata); return 0; } int32_t changelog_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags, dict_t *xdata) { size_t xtra_len = 0; changelog_priv_t *priv = NULL; changelog_opt_t *co = NULL; call_stub_t *stub = NULL; struct list_head 
queue = { 0, }; gf_boolean_t barrier_enabled = _gf_false; dht_changelog_rename_info_t *info = NULL; int ret = 0; char *old_name = NULL; char *new_name = NULL; char *nname = NULL; INIT_LIST_HEAD(&queue); priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind); ret = dict_get_bin(xdata, DHT_CHANGELOG_RENAME_OP_KEY, (void **)&info); if (!ret) { /* special case: unlink considered as rename */ /* 3 == fop + oldloc + newloc */ old_name = alloca(info->oldname_len); new_name = alloca(info->newname_len); CHANGELOG_INIT_NOCHECK(this, frame->local, NULL, loc->inode->gfid, 3); co = changelog_get_usable_buffer(frame->local); if (!co) goto wind; CHANGLOG_FILL_FOP_NUMBER(co, GF_FOP_RENAME, fop_fn, xtra_len); co++; strncpy(old_name, info->buffer, info->oldname_len); CHANGELOG_FILL_ENTRY(co, info->old_pargfid, old_name, entry_fn, entry_free_fn, xtra_len, wind); co++; /* new name resides just after old name */ nname = info->buffer + info->oldname_len; strncpy(new_name, nname, info->newname_len); CHANGELOG_FILL_ENTRY(co, info->new_pargfid, new_name, entry_fn, entry_free_fn, xtra_len, wind); changelog_set_usable_record_and_length(frame->local, xtra_len, 3); } else { /* default unlink */ CHANGELOG_IF_INTERNAL_FOP_THEN_GOTO(frame, xdata, wind); CHANGELOG_INIT_NOCHECK(this, frame->local, NULL, loc->inode->gfid, 2); co = changelog_get_usable_buffer(frame->local); if (!co) goto wind; CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len); co++; if (priv->capture_del_path) { CHANGELOG_FILL_ENTRY_DIR_PATH(co, loc->pargfid, loc->name, del_entry_fn, del_entry_free_fn, xtra_len, wind, _gf_true); } else { CHANGELOG_FILL_ENTRY_DIR_PATH(co, loc->pargfid, loc->name, del_entry_fn, del_entry_free_fn, xtra_len, wind, _gf_false); } changelog_set_usable_record_and_length(frame->local, xtra_len, 2); } /* changelog barrier */ LOCK(&priv->lock); { if ((barrier_enabled = priv->barrier_enabled)) { stub = fop_unlink_stub(frame, changelog_unlink_resume, loc, xflags, xdata); if (!stub) 
__chlog_barrier_disable(this, &queue); else __chlog_barrier_enqueue(this, stub); } else { ((changelog_local_t *)frame->local)->color = priv->current_color; changelog_inc_fop_cnt(this, priv, frame->local); } } UNLOCK(&priv->lock); if (barrier_enabled && stub) { gf_msg_debug(this->name, 0, "Enqueue unlink"); goto out; } if (barrier_enabled && !stub) { gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=unlink", NULL); chlog_barrier_dequeue_all(this, &queue); } /* changelog barrier */ wind: STACK_WIND(frame, changelog_unlink_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink, loc, xflags, xdata); out: return 0; } /* rename */ int32_t changelog_rename_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *buf, struct iatt *preoldparent, struct iatt *postoldparent, struct iatt *prenewparent, struct iatt *postnewparent, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind); changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: changelog_dec_fop_cnt(this, priv, local); CHANGELOG_STACK_UNWIND(rename, frame, op_ret, op_errno, buf, preoldparent, postoldparent, prenewparent, postnewparent, xdata); return 0; } int32_t changelog_rename_resume(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, dict_t *xdata) { changelog_priv_t *priv = NULL; priv = this->private; gf_msg_debug(this->name, 0, "Dequeue rename"); changelog_color_fop_and_inc_cnt(this, priv, frame->local); STACK_WIND(frame, changelog_rename_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata); return 0; } int32_t changelog_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, dict_t *xdata) { size_t xtra_len = 0; changelog_priv_t *priv = NULL; changelog_opt_t *co = NULL; call_stub_t *stub = NULL; struct list_head queue = { 0, }; 
gf_boolean_t barrier_enabled = _gf_false; dht_changelog_rename_info_t *info = NULL; int ret = 0; INIT_LIST_HEAD(&queue); priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind); ret = dict_get_bin(xdata, DHT_CHANGELOG_RENAME_OP_KEY, (void **)&info); if (ret && oldloc->inode->ia_type != IA_IFDIR) { /* xdata "NOT" set for a non-directory, * Special rename => avoid logging */ goto wind; } /* 3 == fop + oldloc + newloc */ CHANGELOG_INIT_NOCHECK(this, frame->local, NULL, oldloc->inode->gfid, 3); co = changelog_get_usable_buffer(frame->local); if (!co) goto wind; CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len); co++; CHANGELOG_FILL_ENTRY(co, oldloc->pargfid, oldloc->name, entry_fn, entry_free_fn, xtra_len, wind); co++; CHANGELOG_FILL_ENTRY(co, newloc->pargfid, newloc->name, entry_fn, entry_free_fn, xtra_len, wind); changelog_set_usable_record_and_length(frame->local, xtra_len, 3); /* changelog barrier */ LOCK(&priv->lock); { if ((barrier_enabled = priv->barrier_enabled)) { stub = fop_rename_stub(frame, changelog_rename_resume, oldloc, newloc, xdata); if (!stub) __chlog_barrier_disable(this, &queue); else __chlog_barrier_enqueue(this, stub); } else { ((changelog_local_t *)frame->local)->color = priv->current_color; changelog_inc_fop_cnt(this, priv, frame->local); } } UNLOCK(&priv->lock); if (barrier_enabled && stub) { gf_msg_debug(this->name, 0, "Enqueue rename"); goto out; } if (barrier_enabled && !stub) { gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=rename", NULL); chlog_barrier_dequeue_all(this, &queue); } /* changelog barrier */ wind: STACK_WIND(frame, changelog_rename_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata); out: return 0; } /* link */ int32_t changelog_link_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t *inode, struct iatt *buf, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { changelog_priv_t 
*priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind); changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: changelog_dec_fop_cnt(this, priv, local); CHANGELOG_STACK_UNWIND(link, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; } int32_t changelog_link_resume(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, dict_t *xdata) { changelog_priv_t *priv = NULL; GF_VALIDATE_OR_GOTO("changelog", this, out); GF_VALIDATE_OR_GOTO("changelog", this->fops, out); GF_VALIDATE_OR_GOTO("changelog", frame, out); priv = this->private; gf_msg_debug(this->name, 0, "Dequeuing link"); changelog_color_fop_and_inc_cnt(this, priv, frame->local); STACK_WIND(frame, changelog_link_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata); return 0; out: return -1; } int32_t changelog_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, dict_t *xdata) { size_t xtra_len = 0; changelog_priv_t *priv = NULL; changelog_opt_t *co = NULL; call_stub_t *stub = NULL; struct list_head queue = { 0, }; gf_boolean_t barrier_enabled = _gf_false; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind); CHANGELOG_IF_INTERNAL_FOP_THEN_GOTO(frame, xdata, wind); CHANGELOG_INIT_NOCHECK(this, frame->local, NULL, oldloc->gfid, 2); co = changelog_get_usable_buffer(frame->local); if (!co) goto wind; CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len); co++; CHANGELOG_FILL_ENTRY(co, newloc->pargfid, newloc->name, entry_fn, entry_free_fn, xtra_len, wind); changelog_set_usable_record_and_length(frame->local, xtra_len, 2); LOCK(&priv->lock); { if ((barrier_enabled = priv->barrier_enabled)) { stub = fop_link_stub(frame, changelog_link_resume, oldloc, newloc, xdata); if (!stub) __chlog_barrier_disable(this, &queue); else __chlog_barrier_enqueue(this, stub); } else { ((changelog_local_t 
*)frame->local)->color = priv->current_color; changelog_inc_fop_cnt(this, priv, frame->local); } } UNLOCK(&priv->lock); if (barrier_enabled && stub) { gf_msg_debug(this->name, 0, "Enqueued link"); goto out; } if (barrier_enabled && !stub) { gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=link", NULL); chlog_barrier_dequeue_all(this, &queue); } wind: STACK_WIND(frame, changelog_link_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata); out: return 0; } /* mkdir */ int32_t changelog_mkdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t *inode, struct iatt *buf, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind); changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: changelog_dec_fop_cnt(this, priv, local); CHANGELOG_STACK_UNWIND(mkdir, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; } int32_t changelog_mkdir_resume(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, mode_t umask, dict_t *xdata) { changelog_priv_t *priv = NULL; GF_VALIDATE_OR_GOTO("changelog", this, out); GF_VALIDATE_OR_GOTO("changelog", this->fops, out); GF_VALIDATE_OR_GOTO("changelog", frame, out); priv = this->private; gf_msg_debug(this->name, 0, "Dequeuing mkdir"); changelog_color_fop_and_inc_cnt(this, priv, frame->local); STACK_WIND(frame, changelog_mkdir_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir, loc, mode, umask, xdata); return 0; out: return -1; } int32_t changelog_mkdir(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, mode_t umask, dict_t *xdata) { int ret = -1; uuid_t gfid = { 0, }; size_t xtra_len = 0; changelog_priv_t *priv = NULL; changelog_opt_t *co = NULL; call_stub_t *stub = NULL; struct list_head queue = { 0, }; gf_boolean_t 
barrier_enabled = _gf_false; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind); ret = dict_get_gfuuid(xdata, "gfid-req", &gfid); if (ret) { gf_msg_debug(this->name, 0, "failed to get gfid from dict"); goto wind; } CHANGELOG_INIT_NOCHECK(this, frame->local, NULL, gfid, 5); co = changelog_get_usable_buffer(frame->local); if (!co) goto wind; CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len); co++; CHANGELOG_FILL_UINT32(co, S_IFDIR | mode, number_fn, xtra_len); co++; CHANGELOG_FILL_UINT32(co, frame->root->uid, number_fn, xtra_len); co++; CHANGELOG_FILL_UINT32(co, frame->root->gid, number_fn, xtra_len); co++; CHANGELOG_FILL_ENTRY(co, loc->pargfid, loc->name, entry_fn, entry_free_fn, xtra_len, wind); changelog_set_usable_record_and_length(frame->local, xtra_len, 5); LOCK(&priv->lock); { if ((barrier_enabled = priv->barrier_enabled)) { stub = fop_mkdir_stub(frame, changelog_mkdir_resume, loc, mode, umask, xdata); if (!stub) __chlog_barrier_disable(this, &queue); else __chlog_barrier_enqueue(this, stub); } else { ((changelog_local_t *)frame->local)->color = priv->current_color; changelog_inc_fop_cnt(this, priv, frame->local); } } UNLOCK(&priv->lock); if (barrier_enabled && stub) { gf_msg_debug(this->name, 0, "Enqueued mkdir"); goto out; } if (barrier_enabled && !stub) { gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=mkdir", NULL); chlog_barrier_dequeue_all(this, &queue); } wind: STACK_WIND(frame, changelog_mkdir_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir, loc, mode, umask, xdata); out: return 0; } /* symlink */ int32_t changelog_symlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t *inode, struct iatt *buf, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind); 
changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: changelog_dec_fop_cnt(this, priv, local); CHANGELOG_STACK_UNWIND(symlink, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; } int32_t changelog_symlink_resume(call_frame_t *frame, xlator_t *this, const char *linkname, loc_t *loc, mode_t umask, dict_t *xdata) { changelog_priv_t *priv = NULL; GF_VALIDATE_OR_GOTO("changelog", this, out); GF_VALIDATE_OR_GOTO("changelog", this->fops, out); GF_VALIDATE_OR_GOTO("changelog", frame, out); priv = this->private; gf_msg_debug(this->name, 0, "Dequeuing symlink"); changelog_color_fop_and_inc_cnt(this, priv, frame->local); STACK_WIND(frame, changelog_symlink_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->symlink, linkname, loc, umask, xdata); return 0; out: return -1; } int32_t changelog_symlink(call_frame_t *frame, xlator_t *this, const char *linkname, loc_t *loc, mode_t umask, dict_t *xdata) { int ret = -1; size_t xtra_len = 0; uuid_t gfid = { 0, }; changelog_priv_t *priv = NULL; changelog_opt_t *co = NULL; call_stub_t *stub = NULL; struct list_head queue = { 0, }; gf_boolean_t barrier_enabled = _gf_false; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind); ret = dict_get_gfuuid(xdata, "gfid-req", &gfid); if (ret) { gf_msg_debug(this->name, 0, "failed to get gfid from dict"); goto wind; } CHANGELOG_INIT_NOCHECK(this, frame->local, NULL, gfid, 2); co = changelog_get_usable_buffer(frame->local); if (!co) goto wind; CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len); co++; CHANGELOG_FILL_ENTRY(co, loc->pargfid, loc->name, entry_fn, entry_free_fn, xtra_len, wind); changelog_set_usable_record_and_length(frame->local, xtra_len, 2); LOCK(&priv->lock); { if ((barrier_enabled = priv->barrier_enabled)) { stub = fop_symlink_stub(frame, changelog_symlink_resume, linkname, loc, umask, xdata); if (!stub) __chlog_barrier_disable(this, &queue); else __chlog_barrier_enqueue(this, stub); } else { ((changelog_local_t 
*)frame->local)->color = priv->current_color; changelog_inc_fop_cnt(this, priv, frame->local); } } UNLOCK(&priv->lock); if (barrier_enabled && stub) { gf_msg_debug(this->name, 0, "Enqueued symlink"); goto out; } if (barrier_enabled && !stub) { gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=symlink", NULL); chlog_barrier_dequeue_all(this, &queue); } wind: STACK_WIND(frame, changelog_symlink_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->symlink, linkname, loc, umask, xdata); out: return 0; } /* mknod */ int32_t changelog_mknod_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, inode_t *inode, struct iatt *buf, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind); changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: changelog_dec_fop_cnt(this, priv, local); CHANGELOG_STACK_UNWIND(mknod, frame, op_ret, op_errno, inode, buf, preparent, postparent, xdata); return 0; } int32_t changelog_mknod_resume(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, dev_t rdev, mode_t umask, dict_t *xdata) { changelog_priv_t *priv = NULL; GF_VALIDATE_OR_GOTO("changelog", this, out); GF_VALIDATE_OR_GOTO("changelog", this->fops, out); GF_VALIDATE_OR_GOTO("changelog", frame, out); priv = this->private; gf_msg_debug(this->name, 0, "Dequeuing mknod"); changelog_color_fop_and_inc_cnt(this, priv, frame->local); STACK_WIND(frame, changelog_mknod_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod, loc, mode, rdev, umask, xdata); return 0; out: return -1; } int32_t changelog_mknod(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, dev_t dev, mode_t umask, dict_t *xdata) { int ret = -1; uuid_t gfid = { 0, }; size_t xtra_len = 0; changelog_priv_t *priv = NULL; changelog_opt_t *co = NULL; call_stub_t *stub = 
NULL; struct list_head queue = { 0, }; gf_boolean_t barrier_enabled = _gf_false; priv = this->private; /* Check whether changelog active */ if (!(priv->active)) goto wind; /* Check whether rebalance activity */ if (frame->root->pid == GF_CLIENT_PID_DEFRAG) goto wind; /* If tier-dht linkto is SET, ignore about verifiying : * 1. Whether internal fop AND * 2. Whether tier rebalance process activity (this will help in * recording mknod if tier rebalance process calls this mknod) */ if (!(dict_get(xdata, "trusted.tier.tier-dht.linkto"))) { CHANGELOG_IF_INTERNAL_FOP_THEN_GOTO(frame, xdata, wind); if (frame->root->pid == GF_CLIENT_PID_TIER_DEFRAG) goto wind; } ret = dict_get_gfuuid(xdata, "gfid-req", &gfid); if (ret) { gf_msg_debug(this->name, 0, "failed to get gfid from dict"); goto wind; } CHANGELOG_INIT_NOCHECK(this, frame->local, NULL, gfid, 5); co = changelog_get_usable_buffer(frame->local); if (!co) goto wind; CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len); co++; CHANGELOG_FILL_UINT32(co, mode, number_fn, xtra_len); co++; CHANGELOG_FILL_UINT32(co, frame->root->uid, number_fn, xtra_len); co++; CHANGELOG_FILL_UINT32(co, frame->root->gid, number_fn, xtra_len); co++; CHANGELOG_FILL_ENTRY(co, loc->pargfid, loc->name, entry_fn, entry_free_fn, xtra_len, wind); changelog_set_usable_record_and_length(frame->local, xtra_len, 5); LOCK(&priv->lock); { if ((barrier_enabled = priv->barrier_enabled)) { stub = fop_mknod_stub(frame, changelog_mknod_resume, loc, mode, dev, umask, xdata); if (!stub) __chlog_barrier_disable(this, &queue); else __chlog_barrier_enqueue(this, stub); } else { ((changelog_local_t *)frame->local)->color = priv->current_color; changelog_inc_fop_cnt(this, priv, frame->local); } } UNLOCK(&priv->lock); if (barrier_enabled && stub) { gf_msg_debug(this->name, 0, "Enqueued mknod"); goto out; } if (barrier_enabled && !stub) { gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=mknod", NULL); 
chlog_barrier_dequeue_all(this, &queue); } wind: STACK_WIND(frame, changelog_mknod_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod, loc, mode, dev, umask, xdata); out: return 0; } /* create */ int32_t changelog_create_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, fd_t *fd, inode_t *inode, struct iatt *buf, struct iatt *preparent, struct iatt *postparent, dict_t *xdata) { int32_t ret = 0; changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; changelog_event_t ev = { 0, }; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind); /* fill the event structure.. similar to open() */ ev.ev_type = CHANGELOG_OP_TYPE_CREATE; gf_uuid_copy(ev.u.create.gfid, buf->ia_gfid); ev.u.create.flags = fd->flags; changelog_dispatch_event(this, priv, &ev); if (changelog_ev_selected(this, &priv->ev_selection, CHANGELOG_OP_TYPE_RELEASE)) { ret = fd_ctx_set(fd, this, (uint64_t)(long)0x1); if (ret) gf_smsg(this->name, GF_LOG_WARNING, 0, CHANGELOG_MSG_SET_FD_CONTEXT, NULL); } changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY); unwind: changelog_dec_fop_cnt(this, priv, local); CHANGELOG_STACK_UNWIND(create, frame, op_ret, op_errno, fd, inode, buf, preparent, postparent, xdata); return 0; } int32_t changelog_create_resume(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata) { changelog_priv_t *priv = NULL; GF_VALIDATE_OR_GOTO("changelog", this, out); GF_VALIDATE_OR_GOTO("changelog", this->fops, out); GF_VALIDATE_OR_GOTO("changelog", frame, out); priv = this->private; gf_msg_debug(this->name, 0, "Dequeuing create"); changelog_color_fop_and_inc_cnt(this, priv, frame->local); STACK_WIND(frame, changelog_create_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, fd, xdata); return 0; out: return -1; } int32_t changelog_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, mode_t 
mode, mode_t umask, fd_t *fd, dict_t *xdata) { int ret = -1; uuid_t gfid = { 0, }; changelog_opt_t *co = NULL; changelog_priv_t *priv = NULL; size_t xtra_len = 0; call_stub_t *stub = NULL; struct list_head queue = { 0, }; gf_boolean_t barrier_enabled = _gf_false; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind); ret = dict_get_gfuuid(xdata, "gfid-req", &gfid); if (ret) { gf_msg_debug(this->name, 0, "failed to get gfid from dict"); goto wind; } /* init with two extra records */ CHANGELOG_INIT_NOCHECK(this, frame->local, NULL, gfid, 5); if (!frame->local) goto wind; co = changelog_get_usable_buffer(frame->local); if (!co) goto wind; CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len); co++; CHANGELOG_FILL_UINT32(co, mode, number_fn, xtra_len); co++; CHANGELOG_FILL_UINT32(co, frame->root->uid, number_fn, xtra_len); co++; CHANGELOG_FILL_UINT32(co, frame->root->gid, number_fn, xtra_len); co++; CHANGELOG_FILL_ENTRY(co, loc->pargfid, loc->name, entry_fn, entry_free_fn, xtra_len, wind); changelog_set_usable_record_and_length(frame->local, xtra_len, 5); LOCK(&priv->lock); { if ((barrier_enabled = priv->barrier_enabled)) { stub = fop_create_stub(frame, changelog_create_resume, loc, flags, mode, umask, fd, xdata); if (!stub) __chlog_barrier_disable(this, &queue); else __chlog_barrier_enqueue(this, stub); } else { ((changelog_local_t *)frame->local)->color = priv->current_color; changelog_inc_fop_cnt(this, priv, frame->local); } } UNLOCK(&priv->lock); if (barrier_enabled && stub) { gf_msg_debug(this->name, 0, "Enqueued create"); goto out; } if (barrier_enabled && !stub) { gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=create", NULL); chlog_barrier_dequeue_all(this, &queue); } wind: STACK_WIND(frame, changelog_create_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, fd, xdata); out: return 0; } /* }}} */ /* Metadata modification fops - TYPE II */ /* {{{ */ /* {f}setattr */ 
int32_t changelog_fsetattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *preop_stbuf, struct iatt *postop_stbuf, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind); changelog_update(this, priv, local, CHANGELOG_TYPE_METADATA); unwind: changelog_dec_fop_cnt(this, priv, local); CHANGELOG_STACK_UNWIND(fsetattr, frame, op_ret, op_errno, preop_stbuf, postop_stbuf, xdata); return 0; } int32_t changelog_fsetattr(call_frame_t *frame, xlator_t *this, fd_t *fd, struct iatt *stbuf, int32_t valid, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_opt_t *co = NULL; size_t xtra_len = 0; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind); CHANGELOG_OP_BOUNDARY_CHECK(frame, wind); CHANGELOG_INIT(this, frame->local, fd->inode, fd->inode->gfid, 1); if (!frame->local) goto wind; co = changelog_get_usable_buffer(frame->local); if (!co) goto wind; CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len); changelog_set_usable_record_and_length(frame->local, xtra_len, 1); wind: changelog_color_fop_and_inc_cnt(this, priv, frame->local); STACK_WIND(frame, changelog_fsetattr_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetattr, fd, stbuf, valid, xdata); return 0; } int32_t changelog_setattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iatt *preop_stbuf, struct iatt *postop_stbuf, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind); changelog_update(this, priv, local, CHANGELOG_TYPE_METADATA); unwind: changelog_dec_fop_cnt(this, priv, local); CHANGELOG_STACK_UNWIND(setattr, frame, op_ret, op_errno, preop_stbuf, postop_stbuf, xdata); return 0; } int32_t changelog_setattr(call_frame_t 
*frame, xlator_t *this, loc_t *loc, struct iatt *stbuf, int32_t valid, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_opt_t *co = NULL; size_t xtra_len = 0; uuid_t shard_root_gfid = { 0, }; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind); CHANGELOG_IF_INTERNAL_FOP_THEN_GOTO(frame, xdata, wind); /* Do not record META on .shard */ gf_uuid_parse(SHARD_ROOT_GFID, shard_root_gfid); if (gf_uuid_compare(loc->gfid, shard_root_gfid) == 0) { goto wind; } CHANGELOG_OP_BOUNDARY_CHECK(frame, wind); CHANGELOG_INIT(this, frame->local, loc->inode, loc->inode->gfid, 1); if (!frame->local) goto wind; co = changelog_get_usable_buffer(frame->local); if (!co) goto wind; CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len); changelog_set_usable_record_and_length(frame->local, xtra_len, 1); wind: changelog_color_fop_and_inc_cnt(this, priv, frame->local); STACK_WIND(frame, changelog_setattr_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setattr, loc, stbuf, valid, xdata); return 0; } /* {f}removexattr */ int32_t changelog_fremovexattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind); changelog_update(this, priv, local, CHANGELOG_TYPE_METADATA_XATTR); unwind: changelog_dec_fop_cnt(this, priv, local); CHANGELOG_STACK_UNWIND(fremovexattr, frame, op_ret, op_errno, xdata); return 0; } int32_t changelog_fremovexattr(call_frame_t *frame, xlator_t *this, fd_t *fd, const char *name, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_opt_t *co = NULL; size_t xtra_len = 0; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind); CHANGELOG_OP_BOUNDARY_CHECK(frame, wind); CHANGELOG_INIT(this, frame->local, fd->inode, fd->inode->gfid, 1); co = changelog_get_usable_buffer(frame->local); if (!co) goto wind; 
CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len); changelog_set_usable_record_and_length(frame->local, xtra_len, 1); wind: changelog_color_fop_and_inc_cnt(this, priv, frame->local); STACK_WIND(frame, changelog_fremovexattr_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fremovexattr, fd, name, xdata); return 0; } int32_t changelog_removexattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind); changelog_update(this, priv, local, CHANGELOG_TYPE_METADATA_XATTR); unwind: changelog_dec_fop_cnt(this, priv, local); CHANGELOG_STACK_UNWIND(removexattr, frame, op_ret, op_errno, xdata); return 0; } int32_t changelog_removexattr(call_frame_t *frame, xlator_t *this, loc_t *loc, const char *name, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_opt_t *co = NULL; size_t xtra_len = 0; priv = this->private; CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind); CHANGELOG_OP_BOUNDARY_CHECK(frame, wind); CHANGELOG_INIT(this, frame->local, loc->inode, loc->inode->gfid, 1); co = changelog_get_usable_buffer(frame->local); if (!co) goto wind; CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len); changelog_set_usable_record_and_length(frame->local, xtra_len, 1); wind: changelog_color_fop_and_inc_cnt(this, priv, frame->local); STACK_WIND(frame, changelog_removexattr_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->removexattr, loc, name, xdata); return 0; } /* {f}setxattr */ int32_t changelog_setxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { changelog_priv_t *priv = NULL; changelog_local_t *local = NULL; priv = this->private; local = frame->local; CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind); changelog_update(this, priv, local, CHANGELOG_TYPE_METADATA_XATTR); 
unwind:
    changelog_dec_fop_cnt(this, priv, local);
    CHANGELOG_STACK_UNWIND(setxattr, frame, op_ret, op_errno, xdata);
    return 0;
}

/* changelog_handle_virtual_xattr:
 *      Handles the virtual setxattr GF_XATTR_TRIGGER_SYNC. The fop is
 *      answered here; it is never wound to the child.
 *
 *      The request is accepted only when the xattr value reads as the
 *      integer 1 and the inode is a directory or a regular file:
 *        - an ENTRY record is attempted via changelog_fill_entry_buf();
 *          if that fails, the failure is logged and the ENTRY record is
 *          skipped;
 *        - a DATA record is captured unless the inode is a directory;
 *        - the frame is unwound with success (0, 0).
 *      Every other value or inode type is unwound with -1/ENOTSUP.
 *
 *      NOTE(review): an earlier version of this comment described a value
 *      of 2 selecting ENTRY+DATA and 1 selecting DATA only. The code below
 *      only ever accepts 1 -- confirm the intended contract.
 */
static void
changelog_handle_virtual_xattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
                               dict_t *dict)
{
    changelog_priv_t *priv = NULL;
    changelog_local_t *local = NULL;
    int32_t value = 0;
    int ret = 0;
    int dict_ret = 0;
    gf_boolean_t valid = _gf_false;

    priv = this->private;
    GF_ASSERT(priv);

    dict_ret = dict_get_int32(dict, GF_XATTR_TRIGGER_SYNC, &value);

    if ((dict_ret == 0 && value == 1) && ((loc->inode->ia_type == IA_IFDIR) ||
                                          (loc->inode->ia_type == IA_IFREG)))
        valid = _gf_true;

    if (valid) {
        ret = changelog_fill_entry_buf(frame, this, loc, &local);
        if (ret) {
            gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_ENTRY_BUF_INFO,
                    "gfid=%s", uuid_utoa(loc->inode->gfid), NULL);
            goto unwind;
        }

        changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY);

    unwind:
        /* Capture DATA only if it's a file. */
        if (loc->inode->ia_type != IA_IFDIR)
            changelog_update(this, priv, frame->local, CHANGELOG_TYPE_DATA);
        /* Assign local to prev_entry, so unwind will take
         * care of cleanup.
         */
        ((changelog_local_t *)(frame->local))->prev_entry = local;
        CHANGELOG_STACK_UNWIND(setxattr, frame, 0, 0, NULL);
        return;
    } else {
        CHANGELOG_STACK_UNWIND(setxattr, frame, -1, ENOTSUP, NULL);
        return;
    }
}

/*
 * setxattr entry point.
 *
 * A request carrying GF_XATTR_TRIGGER_SYNC is handed to
 * changelog_handle_virtual_xattr(), which unwinds the frame itself. Any
 * other request gets a fop-number record prepared and is wound to the first
 * child with changelog_setxattr_cbk. Always returns 0.
 */
int32_t
changelog_setxattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
                   dict_t *dict, int32_t flags, dict_t *xdata)
{
    changelog_priv_t *priv = NULL;
    changelog_opt_t *co = NULL;
    size_t xtra_len = 0;

    priv = this->private;
    CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
    CHANGELOG_OP_BOUNDARY_CHECK(frame, wind);

    CHANGELOG_INIT(this, frame->local, loc->inode, loc->inode->gfid, 1);

    /* On setting this virtual xattr on a file, an explicit data
     * sync is triggered from geo-rep as CREATE|DATA entry is
     * recorded in changelog based on xattr value.
     */
    if (dict_get(dict, GF_XATTR_TRIGGER_SYNC)) {
        changelog_handle_virtual_xattr(frame, this, loc, dict);
        return 0;
    }

    co = changelog_get_usable_buffer(frame->local);
    if (!co)
        goto wind;

    CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);

    changelog_set_usable_record_and_length(frame->local, xtra_len, 1);

wind:
    changelog_color_fop_and_inc_cnt(this, priv, frame->local);
    STACK_WIND(frame, changelog_setxattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->setxattr, loc, dict, flags, xdata);
    return 0;
}

/*
 * fsetxattr callback: records a CHANGELOG_TYPE_METADATA_XATTR change when the
 * fop succeeded and a local is present, then decrements the fop counter and
 * unwinds the result.
 */
int32_t
changelog_fsetxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                        int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
    changelog_priv_t *priv = NULL;
    changelog_local_t *local = NULL;

    priv = this->private;
    local = frame->local;

    CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);

    changelog_update(this, priv, local, CHANGELOG_TYPE_METADATA_XATTR);

unwind:
    changelog_dec_fop_cnt(this, priv, local);
    CHANGELOG_STACK_UNWIND(fsetxattr, frame, op_ret, op_errno, xdata);
    return 0;
}

/*
 * fsetxattr entry point (fd based variant of changelog_setxattr; unlike it,
 * there is no virtual xattr handling here). Always returns 0.
 */
int32_t
changelog_fsetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict,
                    int32_t flags, dict_t *xdata)
{
    changelog_priv_t *priv = NULL;
    changelog_opt_t *co = NULL;
    size_t xtra_len = 0;

    priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
    /* unlike setxattr, fsetxattr also runs the internal-fop guard on xdata */
    CHANGELOG_IF_INTERNAL_FOP_THEN_GOTO(frame, xdata, wind);
    CHANGELOG_OP_BOUNDARY_CHECK(frame, wind);

    CHANGELOG_INIT(this, frame->local, fd->inode, fd->inode->gfid, 1);

    co = changelog_get_usable_buffer(frame->local);
    if (!co)
        goto wind;

    CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);

    changelog_set_usable_record_and_length(frame->local, xtra_len, 1);

wind:
    changelog_color_fop_and_inc_cnt(this, priv, frame->local);
    STACK_WIND(frame, changelog_fsetxattr_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);
    return 0;
}

/*
 * xattrop callback: records a change when the fop succeeded and a local is
 * present, then decrements the fop counter and unwinds the result.
 *
 * NOTE(review): this records CHANGELOG_TYPE_METADATA while
 * changelog_fxattrop_cbk records CHANGELOG_TYPE_METADATA_XATTR -- confirm
 * whether the difference is intentional.
 */
int32_t
changelog_xattrop_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                      int32_t op_ret, int32_t op_errno, dict_t *xattr,
                      dict_t *xdata)
{
    changelog_priv_t *priv = NULL;
    changelog_local_t *local = NULL;

    priv = this->private;
    local = frame->local;

    CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);

    changelog_update(this, priv, local, CHANGELOG_TYPE_METADATA);

unwind:
    changelog_dec_fop_cnt(this, priv, local);
    CHANGELOG_STACK_UNWIND(xattrop, frame, op_ret, op_errno, xattr, xdata);
    return 0;
}

/*
 * xattrop entry point.
 *
 * A record is prepared only when @xattr carries GF_XATTR_SHARD_FILE_SIZE
 * (dict_get_ptr() returning 0); otherwise the fop is wound without one.
 * Always returns 0.
 */
int32_t
changelog_xattrop(call_frame_t *frame, xlator_t *this, loc_t *loc,
                  gf_xattrop_flags_t optype, dict_t *xattr, dict_t *xdata)
{
    changelog_priv_t *priv = NULL;
    changelog_opt_t *co = NULL;
    size_t xtra_len = 0;
    int ret = 0;
    void *size_attr = NULL;

    priv = this->private;
    CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
    /* size_attr is only used as a presence probe; its value is not read */
    ret = dict_get_ptr(xattr, GF_XATTR_SHARD_FILE_SIZE, &size_attr);
    if (ret)
        goto wind;

    CHANGELOG_OP_BOUNDARY_CHECK(frame, wind);

    CHANGELOG_INIT(this, frame->local, loc->inode, loc->inode->gfid, 1);

    co = changelog_get_usable_buffer(frame->local);
    if (!co)
        goto wind;

    CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);

    changelog_set_usable_record_and_length(frame->local, xtra_len, 1);

wind:
    changelog_color_fop_and_inc_cnt(this, priv, frame->local);
    STACK_WIND(frame, changelog_xattrop_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->xattrop, loc, optype, xattr, xdata);
    return 0;
}

/*
 * fxattrop callback: records a CHANGELOG_TYPE_METADATA_XATTR change when the
 * fop succeeded and a local is present, then decrements the fop counter and
 * unwinds the result.
 */
int32_t
changelog_fxattrop_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                       int32_t op_ret, int32_t op_errno, dict_t *xattr,
                       dict_t *xdata)
{
    changelog_priv_t *priv = NULL;
    changelog_local_t *local = NULL;

    priv = this->private;
    local = frame->local;

    CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);

    changelog_update(this, priv, local, CHANGELOG_TYPE_METADATA_XATTR);

unwind:
    changelog_dec_fop_cnt(this, priv, local);
    CHANGELOG_STACK_UNWIND(fxattrop, frame, op_ret, op_errno, xattr, xdata);
    return 0;
}

/*
 * fxattrop entry point (fd based variant of changelog_xattrop). A record is
 * prepared only when @xattr carries GF_XATTR_SHARD_FILE_SIZE.
 * Always returns 0.
 */
int32_t
changelog_fxattrop(call_frame_t *frame, xlator_t *this, fd_t *fd,
                   gf_xattrop_flags_t optype, dict_t *xattr, dict_t *xdata)
{
    changelog_priv_t *priv = NULL;
    changelog_opt_t *co = NULL;
    size_t xtra_len = 0;
    void *size_attr = NULL;
    int ret = 0;

    priv = this->private;
    CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);
    ret = dict_get_ptr(xattr, GF_XATTR_SHARD_FILE_SIZE, &size_attr);
    if (ret)
        goto wind;

    CHANGELOG_OP_BOUNDARY_CHECK(frame, wind);

    CHANGELOG_INIT(this, frame->local, fd->inode, fd->inode->gfid, 1);

    co = changelog_get_usable_buffer(frame->local);
    if (!co)
        goto wind;

    CHANGLOG_FILL_FOP_NUMBER(co, frame->root->op, fop_fn, xtra_len);

    changelog_set_usable_record_and_length(frame->local, xtra_len, 1);

wind:
    changelog_color_fop_and_inc_cnt(this, priv, frame->local);
    STACK_WIND(frame, changelog_fxattrop_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->fxattrop, fd, optype, xattr, xdata);
    return 0;
}

/* }}} */

/* Data modification fops - TYPE I */

/* {{{ */

/* {f}truncate() */

/*
 * truncate callback: records a CHANGELOG_TYPE_DATA change when the fop
 * succeeded and a local is present, then decrements the fop counter and
 * unwinds the result.
 */
int32_t
changelog_truncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                       int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                       struct iatt *postbuf, dict_t *xdata)
{
    changelog_priv_t *priv = NULL;
    changelog_local_t *local = NULL;

    priv = this->private;
    local = frame->local;

    CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);

    changelog_update(this, priv, local, CHANGELOG_TYPE_DATA);

unwind:
changelog_dec_fop_cnt(this, priv, local);
    CHANGELOG_STACK_UNWIND(truncate, frame, op_ret, op_errno, prebuf, postbuf,
                           xdata);
    return 0;
}

/*
 * truncate entry point.
 *
 * Sets up the changelog local for loc->inode. While holding c_snap_lock, if
 * a snap changelog fd is open (c_snap_fd != -1) and the barrier is enabled,
 * the change is additionally passed to changelog_snap_handle_ascii_change().
 * The fop is then wound to the first child. Always returns 0.
 */
int32_t
changelog_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc,
                   off_t offset, dict_t *xdata)
{
    changelog_priv_t *priv = NULL;

    priv = this->private;
    CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);

    CHANGELOG_INIT(this, frame->local, loc->inode, loc->inode->gfid, 0);
    LOCK(&priv->c_snap_lock);
    {
        if (priv->c_snap_fd != -1 && priv->barrier_enabled == _gf_true) {
            changelog_snap_handle_ascii_change(
                this, &(((changelog_local_t *)(frame->local))->cld));
        }
    }
    UNLOCK(&priv->c_snap_lock);

wind:
    changelog_color_fop_and_inc_cnt(this, priv, frame->local);
    STACK_WIND(frame, changelog_truncate_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->truncate, loc, offset, xdata);
    return 0;
}

/*
 * ftruncate callback: records a CHANGELOG_TYPE_DATA change when the fop
 * succeeded and a local is present, then decrements the fop counter and
 * unwinds the result.
 */
int32_t
changelog_ftruncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                        int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                        struct iatt *postbuf, dict_t *xdata)
{
    changelog_priv_t *priv = NULL;
    changelog_local_t *local = NULL;

    priv = this->private;
    local = frame->local;

    CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !local), unwind);

    changelog_update(this, priv, local, CHANGELOG_TYPE_DATA);

unwind:
    changelog_dec_fop_cnt(this, priv, local);
    CHANGELOG_STACK_UNWIND(ftruncate, frame, op_ret, op_errno, prebuf, postbuf,
                           xdata);
    return 0;
}

/*
 * ftruncate entry point (fd based variant of changelog_truncate, including
 * the same snap handling under c_snap_lock). Always returns 0.
 */
int32_t
changelog_ftruncate(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
                    dict_t *xdata)
{
    changelog_priv_t *priv = NULL;

    priv = this->private;
    CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);

    CHANGELOG_INIT(this, frame->local, fd->inode, fd->inode->gfid, 0);
    LOCK(&priv->c_snap_lock);
    {
        if (priv->c_snap_fd != -1 && priv->barrier_enabled == _gf_true) {
            changelog_snap_handle_ascii_change(
                this, &(((changelog_local_t *)(frame->local))->cld));
        }
    }
    UNLOCK(&priv->c_snap_lock);

wind:
    changelog_color_fop_and_inc_cnt(this, priv, frame->local);
    STACK_WIND(frame, changelog_ftruncate_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata);
    return 0;
}

/* writev() */

/*
 * writev callback: records a CHANGELOG_TYPE_DATA change only when op_ret is
 * strictly positive and a local is present (note `<= 0`, unlike the other
 * callbacks which test `< 0`). The fop counter is decremented and the result
 * unwound in every case.
 */
int32_t
changelog_writev_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                     int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                     struct iatt *postbuf, dict_t *xdata)
{
    changelog_priv_t *priv = NULL;
    changelog_local_t *local = NULL;

    priv = this->private;
    local = frame->local;

    CHANGELOG_COND_GOTO(priv, ((op_ret <= 0) || !local), unwind);

    changelog_update(this, priv, local, CHANGELOG_TYPE_DATA);

unwind:
    changelog_dec_fop_cnt(this, priv, local);
    CHANGELOG_STACK_UNWIND(writev, frame, op_ret, op_errno, prebuf, postbuf,
                           xdata);
    return 0;
}

/*
 * writev entry point: same set-up and snap handling as changelog_ftruncate,
 * then winds the write to the first child. Always returns 0.
 */
int32_t
changelog_writev(call_frame_t *frame, xlator_t *this, fd_t *fd,
                 struct iovec *vector, int32_t count, off_t offset,
                 uint32_t flags, struct iobref *iobref, dict_t *xdata)
{
    changelog_priv_t *priv = NULL;

    priv = this->private;
    CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);

    CHANGELOG_INIT(this, frame->local, fd->inode, fd->inode->gfid, 0);
    LOCK(&priv->c_snap_lock);
    {
        if (priv->c_snap_fd != -1 && priv->barrier_enabled == _gf_true) {
            changelog_snap_handle_ascii_change(
                this, &(((changelog_local_t *)(frame->local))->cld));
        }
    }
    UNLOCK(&priv->c_snap_lock);

wind:
    changelog_color_fop_and_inc_cnt(this, priv, frame->local);
    STACK_WIND(frame, changelog_writev_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->writev, fd, vector, count, offset,
               flags, iobref, xdata);
    return 0;
}

/* }}} */

/* open, release and other beasts */

/* {{{ */

/*
 * open callback.
 *
 * frame->local is used purely as a flag here: changelog_open() stores the
 * sentinel 0x1 in it, so it is cleared without being dereferenced. When the
 * open succeeded and the flag was set, a CHANGELOG_OP_TYPE_OPEN event is
 * dispatched, and an fd context is set if RELEASE events are selected.
 */
int
changelog_open_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                   int op_ret, int op_errno, fd_t *fd, dict_t *xdata)
{
    int ret = 0;
    changelog_priv_t *priv = NULL;
    changelog_event_t ev = {
        0,
    };
    gf_boolean_t logopen = _gf_false;

    priv = this->private;
    if (frame->local) {
        frame->local = NULL;
        logopen = _gf_true;
    }

    CHANGELOG_COND_GOTO(priv, ((op_ret < 0) || !logopen), unwind);

    /* fill the event structure */
    ev.ev_type = CHANGELOG_OP_TYPE_OPEN;
    gf_uuid_copy(ev.u.open.gfid, fd->inode->gfid);
    ev.u.open.flags = fd->flags;
changelog_dispatch_event(this, priv, &ev);

    if (changelog_ev_selected(this, &priv->ev_selection,
                              CHANGELOG_OP_TYPE_RELEASE)) {
        /* a failure to set the fd context is only logged */
        ret = fd_ctx_set(fd, this, (uint64_t)(long)0x1);
        if (ret)
            gf_smsg(this->name, GF_LOG_WARNING, 0, CHANGELOG_MSG_SET_FD_CONTEXT,
                    NULL);
    }

unwind:
    CHANGELOG_STACK_UNWIND(open, frame, op_ret, op_errno, fd, xdata);
    return 0;
}

/*
 * open entry point: marks the frame with a non-NULL sentinel so that
 * changelog_open_cbk() knows the open should be logged, then winds the call
 * to the first child. Always returns 0.
 */
int
changelog_open(call_frame_t *frame, xlator_t *this, loc_t *loc, int flags,
               fd_t *fd, dict_t *xdata)
{
    changelog_priv_t *priv = NULL;

    priv = this->private;
    CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, wind);

    frame->local = (void *)0x1; /* do not dereference in ->cbk */

wind:
    STACK_WIND(frame, changelog_open_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->open, loc, flags, fd, xdata);
    return 0;
}

/* }}} */

/* {{{ */

/* }}} */

/*
 * dict_foreach() visitor used by changelog_ipc(): treats value->data as a
 * changelog_event_t and dispatches it. @data is the xlator. @dict and @key
 * are unused. Always returns 0.
 */
int32_t
_changelog_generic_dispatcher(dict_t *dict, char *key, data_t *value,
                              void *data)
{
    xlator_t *this = NULL;
    changelog_priv_t *priv = NULL;

    this = data;
    priv = this->private;

    changelog_dispatch_event(this, priv, (changelog_event_t *)value->data);
    return 0;
}

/**
 * changelog ipc dispatches events, pointers of which are passed in
 * @xdata. Dispatching is orderless (whatever order dict_foreach()
 * traverses the dictionary).
 *
 * Requests whose @op is not GF_IPC_TARGET_CHANGELOG are wound to the first
 * child; requests addressed to changelog are unwound here with success.
 */
int32_t
changelog_ipc(call_frame_t *frame, xlator_t *this, int32_t op, dict_t *xdata)
{
    if (op != GF_IPC_TARGET_CHANGELOG)
        goto wind;

    /* it's for us, do the job */
    if (xdata)
        (void)dict_foreach(xdata, _changelog_generic_dispatcher, this);

    STACK_UNWIND_STRICT(ipc, frame, 0, 0, NULL);
    return 0;

wind:
    STACK_WIND(frame, default_ipc_cbk, FIRST_CHILD(this),
               FIRST_CHILD(this)->fops->ipc, op, xdata);
    return 0;
}

/* {{{ */

/*
 * release callback: dispatches a CHANGELOG_OP_TYPE_RELEASE event carrying the
 * inode gfid and deletes this xlator's fd context. Always returns 0.
 */
int32_t
changelog_release(xlator_t *this, fd_t *fd)
{
    changelog_event_t ev = {
        0,
    };
    changelog_priv_t *priv = NULL;

    priv = this->private;

    ev.ev_type = CHANGELOG_OP_TYPE_RELEASE;
    gf_uuid_copy(ev.u.release.gfid, fd->inode->gfid);
    changelog_dispatch_event(this, priv, &ev);

    (void)fd_ctx_del(fd, this, NULL);

    return 0;
}

/* }}} */

/**
 * The
 *   - @init ()
 *   - @fini ()
 *   - @reconfigure ()
 *   ... and helper routines
 */

/**
 * needed if there are more operation modes in the future.
 *
 * Sets priv->op_mode when @mode starts with "realtime"; any other value
 * leaves op_mode untouched.
 */
static void
changelog_assign_opmode(changelog_priv_t *priv, char *mode)
{
    if (strncmp(mode, "realtime", 8) == 0) {
        priv->op_mode = CHANGELOG_MODE_RT;
    }
}

/*
 * Sets priv->encode_mode from @enc ("binary" or "ascii", matched by prefix);
 * any other value leaves encode_mode untouched.
 */
static void
changelog_assign_encoding(changelog_priv_t *priv, char *enc)
{
    if (strncmp(enc, "binary", 6) == 0) {
        priv->encode_mode = CHANGELOG_ENCODE_BINARY;
    } else if (strncmp(enc, "ascii", 5) == 0) {
        priv->encode_mode = CHANGELOG_ENCODE_ASCII;
    }
}

/* Stores @timeout in priv->timeout.tv_sec under priv->lock. */
static void
changelog_assign_barrier_timeout(changelog_priv_t *priv, uint32_t timeout)
{
    LOCK(&priv->lock);
    {
        priv->timeout.tv_sec = timeout;
    }
    UNLOCK(&priv->lock);
}

/* cleanup any helper threads that are running */
static void
changelog_cleanup_helper_threads(xlator_t *this, changelog_priv_t *priv)
{
    /* thread handles are zeroed afterwards so a second call is a no-op */
    if (priv->cr.rollover_th) {
        (void)changelog_thread_cleanup(this, priv->cr.rollover_th);
        priv->cr.rollover_th = 0;
    }

    if (priv->cf.fsync_th) {
        (void)changelog_thread_cleanup(this, priv->cf.fsync_th);
        priv->cf.fsync_th = 0;
    }
}

/* spawn helper thread; cleaning up in case of errors */
static int
changelog_spawn_helper_threads(xlator_t *this, changelog_priv_t *priv)
{
    int ret = 0;

    /* Geo-Rep
snapshot dependency: * * To implement explicit rollover of changlog journal on barrier * notification, a pipe is created to communicate between * 'changelog_rollover' thread and changelog main thread. The select * call used to wait till roll-over time in changelog_rollover thread * is modified to wait on read end of the pipe. When barrier * notification comes (i.e, in 'reconfigure'), select in * changelog_rollover thread is woken up explicitly by writing into * the write end of the pipe in 'reconfigure'. */ priv->cr.notify = _gf_false; priv->cr.this = this; ret = gf_thread_create(&priv->cr.rollover_th, NULL, changelog_rollover, priv, "clogro"); if (ret) goto out; if (priv->fsync_interval) { priv->cf.this = this; ret = gf_thread_create(&priv->cf.fsync_th, NULL, changelog_fsync_thread, priv, "clogfsyn"); } if (ret) changelog_cleanup_helper_threads(this, priv); out: return ret; } int notify(xlator_t *this, int event, void *data, ...) { changelog_priv_t *priv = NULL; dict_t *dict = NULL; char buf[1] = {1}; int barrier = DICT_DEFAULT; gf_boolean_t bclean_req = _gf_false; int ret = 0; int ret1 = 0; struct list_head queue = { 0, }; uint64_t xprtcnt = 0; uint64_t clntcnt = 0; changelog_clnt_t *conn = NULL; gf_boolean_t cleanup_notify = _gf_false; char sockfile[UNIX_PATH_MAX] = { 0, }; rpcsvc_listener_t *listener = NULL; rpcsvc_listener_t *next = NULL; INIT_LIST_HEAD(&queue); priv = this->private; if (!priv) goto out; if (event == GF_EVENT_PARENT_DOWN) { priv->victim = data; gf_log(this->name, GF_LOG_INFO, "cleanup changelog rpc connection of brick %s", priv->victim->name); if (priv->rpc_active) { this->cleanup_starting = 1; changelog_destroy_rpc_listner(this, priv); conn = &priv->connections; if (conn) changelog_ev_cleanup_connections(this, conn); xprtcnt = GF_ATOMIC_GET(priv->xprtcnt); clntcnt = GF_ATOMIC_GET(priv->clntcnt); if (!xprtcnt && !clntcnt) { LOCK(&priv->lock); { cleanup_notify = priv->notify_down; priv->notify_down = _gf_true; } UNLOCK(&priv->lock); 
list_for_each_entry_safe(listener, next, &priv->rpc->listeners, list) { if (listener->trans) { rpc_transport_unref(listener->trans); } } CHANGELOG_MAKE_SOCKET_PATH(priv->changelog_brick, sockfile, UNIX_PATH_MAX); sys_unlink(sockfile); if (priv->rpc) { rpcsvc_destroy(priv->rpc); priv->rpc = NULL; } if (!cleanup_notify) default_notify(this, GF_EVENT_PARENT_DOWN, data); } } else { default_notify(this, GF_EVENT_PARENT_DOWN, data); } goto out; } if (event == GF_EVENT_TRANSLATOR_OP) { dict = data; barrier = dict_get_str_boolean(dict, "barrier", DICT_DEFAULT); switch (barrier) { case DICT_ERROR: gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_DICT_GET_FAILED, "dict_get_str_boolean", NULL); ret = -1; goto out; case BARRIER_OFF: gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BARRIER_STATE_NOTIFY, "off", NULL); CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, out); LOCK(&priv->c_snap_lock); { changelog_snap_logging_stop(this, priv); } UNLOCK(&priv->c_snap_lock); LOCK(&priv->bflags.lock); { if (priv->bflags.barrier_ext == _gf_false) ret = -1; } UNLOCK(&priv->bflags.lock); if (ret == -1) { gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_BARRIER_ERROR, NULL); goto out; } /* Stop changelog barrier and dequeue all fops */ LOCK(&priv->lock); { if (priv->barrier_enabled == _gf_true) __chlog_barrier_disable(this, &queue); else ret = -1; } UNLOCK(&priv->lock); /* If ret = -1, then changelog barrier is already * disabled because of error or timeout. 
*/ if (ret == 0) { chlog_barrier_dequeue_all(this, &queue); gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BARRIER_DISABLED, NULL); } else { gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_BARRIER_ALREADY_DISABLED, NULL); } LOCK(&priv->bflags.lock); { priv->bflags.barrier_ext = _gf_false; } UNLOCK(&priv->bflags.lock); goto out; case BARRIER_ON: gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BARRIER_STATE_NOTIFY, "on", NULL); CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, out); LOCK(&priv->c_snap_lock); { changelog_snap_logging_start(this, priv); } UNLOCK(&priv->c_snap_lock); LOCK(&priv->bflags.lock); { if (priv->bflags.barrier_ext == _gf_true) ret = -1; else priv->bflags.barrier_ext = _gf_true; } UNLOCK(&priv->bflags.lock); if (ret == -1) { gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_BARRIER_ON_ERROR, NULL); goto out; } ret = pthread_mutex_lock(&priv->bn.bnotify_mutex); CHANGELOG_PTHREAD_ERROR_HANDLE_1(ret, out, bclean_req); { priv->bn.bnotify = _gf_true; } ret = pthread_mutex_unlock(&priv->bn.bnotify_mutex); CHANGELOG_PTHREAD_ERROR_HANDLE_1(ret, out, bclean_req); /* Start changelog barrier */ LOCK(&priv->lock); { ret = __chlog_barrier_enable(this, priv); } UNLOCK(&priv->lock); if (ret == -1) { changelog_barrier_cleanup(this, priv, &queue); goto out; } gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BARRIER_ENABLE, NULL); ret = changelog_barrier_notify(priv, buf); if (ret) { gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_WRITE_FAILED, "Explicit roll over", NULL); changelog_barrier_cleanup(this, priv, &queue); ret = -1; goto out; } ret = pthread_mutex_lock(&priv->bn.bnotify_mutex); CHANGELOG_PTHREAD_ERROR_HANDLE_1(ret, out, bclean_req); { /* The while condition check is required here to * handle spurious wakeup of cond wait that can * happen with pthreads. 
See man page */ while (priv->bn.bnotify == _gf_true) { ret = pthread_cond_wait(&priv->bn.bnotify_cond, &priv->bn.bnotify_mutex); CHANGELOG_PTHREAD_ERROR_HANDLE_1(ret, out, bclean_req); } if (priv->bn.bnotify_error == _gf_true) { ret = -1; priv->bn.bnotify_error = _gf_false; } } ret1 = pthread_mutex_unlock(&priv->bn.bnotify_mutex); CHANGELOG_PTHREAD_ERROR_HANDLE_1(ret1, out, bclean_req); gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BNOTIFY_COND_INFO, NULL); goto out; case DICT_DEFAULT: gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_BARRIER_KEY_NOT_FOUND, NULL); ret = -1; goto out; default: gf_smsg(this->name, GF_LOG_ERROR, EINVAL, CHANGELOG_MSG_ERROR_IN_DICT_GET, NULL); ret = -1; goto out; } } else { ret = default_notify(this, event, data); } out: if (bclean_req) changelog_barrier_cleanup(this, priv, &queue); return ret; } int32_t mem_acct_init(xlator_t *this) { int ret = -1; if (!this) return ret; ret = xlator_mem_acct_init(this, gf_changelog_mt_end + 1); if (ret != 0) { gf_smsg(this->name, GF_LOG_WARNING, ENOMEM, CHANGELOG_MSG_MEMORY_INIT_FAILED, NULL); return ret; } return ret; } static int changelog_init(xlator_t *this, changelog_priv_t *priv) { int i = 0; int ret = -1; struct timeval tv = { 0, }; changelog_log_data_t cld = { 0, }; ret = gettimeofday(&tv, NULL); if (ret) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_GET_TIME_FAILURE, NULL); goto out; } priv->slice.tv_start = tv; priv->maps[CHANGELOG_TYPE_DATA] = "D "; priv->maps[CHANGELOG_TYPE_METADATA] = "M "; priv->maps[CHANGELOG_TYPE_METADATA_XATTR] = "M "; priv->maps[CHANGELOG_TYPE_ENTRY] = "E "; for (; i < CHANGELOG_MAX_TYPE; i++) { /* start with version 1 */ priv->slice.changelog_version[i] = 1; } if (!priv->active) return ret; /** * start with a fresh changelog file every time. this is done * in case there was an encoding change. so... things are kept * simple here. 
*/ ret = changelog_fill_rollover_data(&cld, _gf_false); if (ret) goto out; ret = htime_open(this, priv, cld.cld_roll_time); /* call htime open with cld's rollover_time */ if (ret) goto out; LOCK(&priv->lock); { ret = changelog_inject_single_event(this, priv, &cld); } UNLOCK(&priv->lock); /* ... and finally spawn the helpers threads */ ret = changelog_spawn_helper_threads(this, priv); out: return ret; } /** * Init barrier related condition variables and locks */ static int changelog_barrier_pthread_init(xlator_t *this, changelog_priv_t *priv) { gf_boolean_t bn_mutex_init = _gf_false; gf_boolean_t bn_cond_init = _gf_false; gf_boolean_t dm_mutex_black_init = _gf_false; gf_boolean_t dm_cond_black_init = _gf_false; gf_boolean_t dm_mutex_white_init = _gf_false; gf_boolean_t dm_cond_white_init = _gf_false; gf_boolean_t cr_mutex_init = _gf_false; gf_boolean_t cr_cond_init = _gf_false; int ret = 0; if ((ret = pthread_mutex_init(&priv->bn.bnotify_mutex, NULL)) != 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED, "name=bnotify", "ret=%d", ret, NULL); ret = -1; goto out; } bn_mutex_init = _gf_true; if ((ret = pthread_cond_init(&priv->bn.bnotify_cond, NULL)) != 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED, "name=bnotify", "ret=%d", ret, NULL); ret = -1; goto out; } bn_cond_init = _gf_true; if ((ret = pthread_mutex_init(&priv->dm.drain_black_mutex, NULL)) != 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED, "name=drain_black", "ret=%d", ret, NULL); ret = -1; goto out; } dm_mutex_black_init = _gf_true; if ((ret = pthread_cond_init(&priv->dm.drain_black_cond, NULL)) != 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED, "name=drain_black", "ret=%d", ret, NULL); ret = -1; goto out; } dm_cond_black_init = _gf_true; if ((ret = pthread_mutex_init(&priv->dm.drain_white_mutex, NULL)) != 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, 
CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED, "name=drain_white", "ret=%d", ret, NULL); ret = -1; goto out; } dm_mutex_white_init = _gf_true; if ((ret = pthread_cond_init(&priv->dm.drain_white_cond, NULL)) != 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED, "name=drain_white", "ret=%d", ret, NULL); ret = -1; goto out; } dm_cond_white_init = _gf_true; if ((pthread_mutex_init(&priv->cr.lock, NULL)) != 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED, "name=changelog_rollover", "ret=%d", ret, NULL); ret = -1; goto out; } cr_mutex_init = _gf_true; if ((pthread_cond_init(&priv->cr.cond, NULL)) != 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED, "changelog_rollover cond init failed", "ret=%d", ret, NULL); ret = -1; goto out; } cr_cond_init = _gf_true; out: if (ret) { if (bn_mutex_init) pthread_mutex_destroy(&priv->bn.bnotify_mutex); if (bn_cond_init) pthread_cond_destroy(&priv->bn.bnotify_cond); if (dm_mutex_black_init) pthread_mutex_destroy(&priv->dm.drain_black_mutex); if (dm_cond_black_init) pthread_cond_destroy(&priv->dm.drain_black_cond); if (dm_mutex_white_init) pthread_mutex_destroy(&priv->dm.drain_white_mutex); if (dm_cond_white_init) pthread_cond_destroy(&priv->dm.drain_white_cond); if (cr_mutex_init) pthread_mutex_destroy(&priv->cr.lock); if (cr_cond_init) pthread_cond_destroy(&priv->cr.cond); } return ret; } /* Destroy barrier related condition variables and locks */ static void changelog_barrier_pthread_destroy(changelog_priv_t *priv) { pthread_mutex_destroy(&priv->bn.bnotify_mutex); pthread_cond_destroy(&priv->bn.bnotify_cond); pthread_mutex_destroy(&priv->dm.drain_black_mutex); pthread_cond_destroy(&priv->dm.drain_black_cond); pthread_mutex_destroy(&priv->dm.drain_white_mutex); pthread_cond_destroy(&priv->dm.drain_white_cond); pthread_mutex_destroy(&priv->cr.lock); pthread_cond_destroy(&priv->cr.cond); LOCK_DESTROY(&priv->bflags.lock); } static void 
changelog_cleanup_rpc(xlator_t *this, changelog_priv_t *priv)
{
    /*
     * Tears down the RPC side: listener (unless notify() already started
     * the cleanup), rpc threads, rotational buffers and the poller thread.
     */

    /* terminate rpc server */
    if (!this->cleanup_starting)
        changelog_destroy_rpc_listner(this, priv);

    (void)changelog_cleanup_rpc_threads(this, priv);

    /* cleanup rot buffs */
    rbuf_dtor(priv->rbuf);

    /* cleanup poller thread */
    if (priv->poller)
        (void)changelog_thread_cleanup(this, priv->poller);
}

/*
 * Applies changed volume options at runtime.
 *
 * Sequence: stop the helper threads, re-read "changelog-dir" and (re)create
 * the changelog/htime/csnap directories, start RPC if it newly becomes
 * required, re-read the remaining options, and -- if changelog is or was
 * active -- inject a rollover event and restart the helper threads.
 *
 * Returns 0 on success (also when this->private is NULL), non-zero on the
 * first failure. On failure nothing is rolled back (see the TODO at `out`),
 * so the helper threads stopped at the top stay stopped.
 */
int
reconfigure(xlator_t *this, dict_t *options)
{
    int ret = 0;
    char *tmp = NULL;
    changelog_priv_t *priv = NULL;
    gf_boolean_t active_earlier = _gf_true;
    gf_boolean_t active_now = _gf_true;
    gf_boolean_t rpc_active_earlier = _gf_true;
    gf_boolean_t rpc_active_now = _gf_true;
    gf_boolean_t iniate_rpc = _gf_false;
    changelog_time_slice_t *slice = NULL;
    changelog_log_data_t cld = {
        0,
    };
    char htime_dir[PATH_MAX] = {
        0,
    };
    char csnap_dir[PATH_MAX] = {
        0,
    };
    struct timeval tv = {
        0,
    };
    uint32_t timeout = 0;

    priv = this->private;
    if (!priv)
        goto out;

    ret = -1;
    active_earlier = priv->active;
    rpc_active_earlier = priv->rpc_active;

    /* first stop the rollover and the fsync thread */
    changelog_cleanup_helper_threads(this, priv);

    GF_OPTION_RECONF("changelog-dir", tmp, options, str, out);
    if (!tmp) {
        gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_DIR_OPTIONS_NOT_SET,
                NULL);
        goto out;
    }

    GF_FREE(priv->changelog_dir);
    priv->changelog_dir = gf_strdup(tmp);
    if (!priv->changelog_dir)
        goto out;

    ret = mkdir_p(priv->changelog_dir, 0600, _gf_true);

    if (ret)
        goto out;
    CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, htime_dir);
    ret = mkdir_p(htime_dir, 0600, _gf_true);

    if (ret)
        goto out;

    CHANGELOG_FILL_CSNAP_DIR(priv->changelog_dir, csnap_dir);
    ret = mkdir_p(csnap_dir, 0600, _gf_true);

    if (ret)
        goto out;

    GF_OPTION_RECONF("changelog", active_now, options, bool, out);
    GF_OPTION_RECONF("changelog-notification", rpc_active_now, options, bool,
                     out);

    /* If journalling is enabled, enable rpc notifications */
    if (active_now && !active_earlier) {
        if (!rpc_active_earlier)
            iniate_rpc = _gf_true;
    }

    if (rpc_active_now && !rpc_active_earlier) {
        iniate_rpc = _gf_true;
    }

    /* TODO: Disable of changelog-notifications is not supported for now
     * as there is no clean way of cleaning up of rpc resources
     */

    if (iniate_rpc) {
        ret = changelog_init_rpc(this, priv);
        if (ret)
            goto out;
        priv->rpc_active = _gf_true;
    }

    /**
     * changelog_handle_change() handles changes that could possibly
     * have been submit changes before changelog deactivation.
     */
    if (!active_now)
        priv->active = _gf_false;

    GF_OPTION_RECONF("op-mode", tmp, options, str, out);
    changelog_assign_opmode(priv, tmp);

    tmp = NULL;

    GF_OPTION_RECONF("encoding", tmp, options, str, out);
    changelog_assign_encoding(priv, tmp);

    GF_OPTION_RECONF("rollover-time", priv->rollover_time, options, int32, out);
    GF_OPTION_RECONF("fsync-interval", priv->fsync_interval, options, int32,
                     out);
    GF_OPTION_RECONF("changelog-barrier-timeout", timeout, options, time, out);
    changelog_assign_barrier_timeout(priv, timeout);

    GF_OPTION_RECONF("capture-del-path", priv->capture_del_path, options, bool,
                     out);

    if (active_now || active_earlier) {
        /* second argument is true when changelog is being switched off */
        ret = changelog_fill_rollover_data(&cld, !active_now);
        if (ret)
            goto out;

        slice = &priv->slice;

        LOCK(&priv->lock);
        {
            ret = changelog_inject_single_event(this, priv, &cld);
            if (!ret && active_now)
                SLICE_VERSION_UPDATE(slice);
        }
        UNLOCK(&priv->lock);

        if (ret)
            goto out;

        if (active_now) {
            if (!active_earlier) {
                gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_RECONFIGURE,
                        NULL);
                if (gettimeofday(&tv, NULL)) {
                    gf_smsg(this->name, GF_LOG_ERROR, 0,
                            CHANGELOG_MSG_HTIME_FETCH_FAILED, NULL);
                    ret = -1;
                    goto out;
                }
                /* NOTE(review): return value of htime_create() is
                 * not checked here */
                htime_create(this, priv, tv.tv_sec);
            }
            ret = changelog_spawn_helper_threads(this, priv);
        }
    }

out:
    if (ret) {
        /* TODO */
    } else {
        gf_msg_debug(this->name, 0, "changelog reconfigured");
        /* priv->active is switched on only after everything succeeded */
        if (active_now && priv)
            priv->active = _gf_true;
    }

    return ret;
}

/*
 * Releases what changelog_init_options() set up: runs the bootstrap
 * destructor (a failure is only logged) and frees the option strings.
 */
static void
changelog_freeup_options(xlator_t *this, changelog_priv_t *priv)
{
    int ret = 0;

    ret = priv->cb->dtor(this, &priv->cd);
    if (ret)
        gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_FREEUP_FAILED, NULL);
GF_FREE(priv->changelog_brick);
    GF_FREE(priv->changelog_dir);
}

/*
 * Reads the translator options into @priv, creates the changelog, htime and
 * csnap directories and runs the bootstrap constructor for the selected
 * operation mode.
 *
 * Returns 0 on success. On failure the strings duplicated so far are freed
 * (labels dealloc_2 / dealloc_1) and -1 is returned.
 */
static int
changelog_init_options(xlator_t *this, changelog_priv_t *priv)
{
    int ret = 0;
    char *tmp = NULL;
    uint32_t timeout = 0;
    char htime_dir[PATH_MAX] = {
        0,
    };
    char csnap_dir[PATH_MAX] = {
        0,
    };

    GF_OPTION_INIT("changelog-brick", tmp, str, error_return);
    priv->changelog_brick = gf_strdup(tmp);
    if (!priv->changelog_brick)
        goto error_return;

    tmp = NULL;

    GF_OPTION_INIT("changelog-dir", tmp, str, dealloc_1);
    priv->changelog_dir = gf_strdup(tmp);
    if (!priv->changelog_dir)
        goto dealloc_1;

    tmp = NULL;

    /**
     * create the directory even if change-logging would be inactive
     * so that consumers can _look_ into it (finding nothing...)
     */
    ret = mkdir_p(priv->changelog_dir, 0600, _gf_true);

    if (ret)
        goto dealloc_2;

    CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, htime_dir);
    ret = mkdir_p(htime_dir, 0600, _gf_true);
    if (ret)
        goto dealloc_2;

    CHANGELOG_FILL_CSNAP_DIR(priv->changelog_dir, csnap_dir);
    ret = mkdir_p(csnap_dir, 0600, _gf_true);
    if (ret)
        goto dealloc_2;

    GF_OPTION_INIT("changelog", priv->active, bool, dealloc_2);
    GF_OPTION_INIT("changelog-notification", priv->rpc_active, bool, dealloc_2);
    GF_OPTION_INIT("capture-del-path", priv->capture_del_path, bool, dealloc_2);

    GF_OPTION_INIT("op-mode", tmp, str, dealloc_2);
    changelog_assign_opmode(priv, tmp);

    tmp = NULL;

    GF_OPTION_INIT("encoding", tmp, str, dealloc_2);
    changelog_assign_encoding(priv, tmp);
    changelog_encode_change(priv);

    GF_OPTION_INIT("rollover-time", priv->rollover_time, int32, dealloc_2);

    GF_OPTION_INIT("fsync-interval", priv->fsync_interval, int32, dealloc_2);

    GF_OPTION_INIT("changelog-barrier-timeout", timeout, time, dealloc_2);
    changelog_assign_barrier_timeout(priv, timeout);

    /* cb_bootstrap[] is indexed by operation mode */
    GF_ASSERT(cb_bootstrap[priv->op_mode].mode == priv->op_mode);
    priv->cb = &cb_bootstrap[priv->op_mode];

    /* ... now bootstrap the logger */
    ret = priv->cb->ctor(this, &priv->cd);
    if (ret)
        goto dealloc_2;

    priv->changelog_fd = -1;

    return 0;

dealloc_2:
    GF_FREE(priv->changelog_dir);
dealloc_1:
    GF_FREE(priv->changelog_brick);
error_return:
    return -1;
}

/*
 * Brings up the changelog RPC side: initialises event selection, allocates
 * the rotational buffers and creates the RPC listener, storing it in
 * priv->rpc. Returns 0 on success and -1 on failure.
 */
static int
changelog_init_rpc(xlator_t *this, changelog_priv_t *priv)
{
    rpcsvc_t *rpc = NULL;
    changelog_ev_selector_t *selection = NULL;

    selection = &priv->ev_selection;

    /* initialize event selection */
    changelog_init_event_selection(this, selection);

    priv->rbuf = rbuf_init(NR_ROTT_BUFFS);
    if (!priv->rbuf)
        goto cleanup_thread;

    rpc = changelog_init_rpc_listener(this, priv, priv->rbuf, NR_DISPATCHERS);
    if (!rpc)
        goto cleanup_rbuf;
    priv->rpc = rpc;

    return 0;

cleanup_rbuf:
    /* NOTE(review): priv->rbuf is left pointing at the destroyed buffer */
    rbuf_dtor(priv->rbuf);
cleanup_thread:
    if (priv->poller)
        (void)changelog_thread_cleanup(this, priv->poller);

    return -1;
}

/*
 * Translator init.
 *
 * Validates the graph position (exactly one child, at least one parent),
 * allocates the private structure and the local mempool, reads options, and
 * initialises barrier/drain state. Returns 0 on success, -1 on failure.
 */
int32_t
init(xlator_t *this)
{
    int ret = -1;
    changelog_priv_t *priv = NULL;

    GF_VALIDATE_OR_GOTO("changelog", this, error_return);

    if (!this->children || this->children->next) {
        gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_CHILD_MISCONFIGURED,
                NULL);
        goto error_return;
    }

    if (!this->parents) {
        gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_VOL_MISCONFIGURED,
                NULL);
        goto error_return;
    }

    priv = GF_CALLOC(1, sizeof(*priv), gf_changelog_mt_priv_t);
    if (!priv)
        goto error_return;

    this->local_pool = mem_pool_new(changelog_local_t, 64);
    if (!this->local_pool) {
        gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
                NULL);
        goto cleanup_priv;
    }

    LOCK_INIT(&priv->lock);
    LOCK_INIT(&priv->c_snap_lock);
    GF_ATOMIC_INIT(priv->listnercnt, 0);
    GF_ATOMIC_INIT(priv->clntcnt, 0);
    GF_ATOMIC_INIT(priv->xprtcnt, 0);
    INIT_LIST_HEAD(&priv->xprt_list);
    /* -1 marks "no htime file open"; fini() closes it only if != -1 */
    priv->htime_fd = -1;

    ret = changelog_init_options(this, priv);
    if (ret)
        goto cleanup_mempool;

    /* snap dependency changes */
    priv->dm.black_fop_cnt = 0;
    priv->dm.white_fop_cnt = 0;
    priv->dm.drain_wait_black = _gf_false;
    priv->dm.drain_wait_white = _gf_false;
    priv->current_color = FOP_COLOR_BLACK;
priv->explicit_rollover = _gf_false; priv->cr.notify = _gf_false; /* Mutex is not needed as threads are not spawned yet */ priv->bn.bnotify = _gf_false; priv->bn.bnotify_error = _gf_false; ret = changelog_barrier_pthread_init(this, priv); if (ret) goto cleanup_options; LOCK_INIT(&priv->bflags.lock); priv->bflags.barrier_ext = _gf_false; /* Changelog barrier init */ INIT_LIST_HEAD(&priv->queue); priv->barrier_enabled = _gf_false; if (priv->rpc_active || priv->active) { /* RPC ball rolling.. */ ret = changelog_init_rpc(this, priv); if (ret) goto cleanup_barrier; priv->rpc_active = _gf_true; } ret = changelog_init(this, priv); if (ret) goto cleanup_rpc; gf_msg_debug(this->name, 0, "changelog translator loaded"); this->private = priv; return 0; cleanup_rpc: if (priv->rpc_active) { changelog_cleanup_rpc(this, priv); } cleanup_barrier: changelog_barrier_pthread_destroy(priv); cleanup_options: changelog_freeup_options(this, priv); cleanup_mempool: mem_pool_destroy(this->local_pool); this->local_pool = NULL; cleanup_priv: GF_FREE(priv); error_return: this->private = NULL; return -1; } void fini(xlator_t *this) { changelog_priv_t *priv = NULL; struct list_head queue = { 0, }; priv = this->private; if (priv) { if (priv->active || priv->rpc_active) { /* terminate RPC server/threads */ changelog_cleanup_rpc(this, priv); } /* call barrier_disable to cancel timer */ if (priv->barrier_enabled) __chlog_barrier_disable(this, &queue); /* cleanup barrier related objects */ changelog_barrier_pthread_destroy(priv); /* cleanup helper threads */ changelog_cleanup_helper_threads(this, priv); /* cleanup allocated options */ changelog_freeup_options(this, priv); /* deallocate mempool */ mem_pool_destroy(this->local_pool); if (priv->htime_fd != -1) { sys_close(priv->htime_fd); } /* finally, dealloac private variable */ GF_FREE(priv); } this->private = NULL; this->local_pool = NULL; return; } struct xlator_fops fops = { .open = changelog_open, .mknod = changelog_mknod, .mkdir = 
changelog_mkdir, .create = changelog_create, .symlink = changelog_symlink, .writev = changelog_writev, .truncate = changelog_truncate, .ftruncate = changelog_ftruncate, .link = changelog_link, .rename = changelog_rename, .unlink = changelog_unlink, .rmdir = changelog_rmdir, .setattr = changelog_setattr, .fsetattr = changelog_fsetattr, .setxattr = changelog_setxattr, .fsetxattr = changelog_fsetxattr, .removexattr = changelog_removexattr, .fremovexattr = changelog_fremovexattr, .ipc = changelog_ipc, .xattrop = changelog_xattrop, .fxattrop = changelog_fxattrop, }; struct xlator_cbks cbks = { .forget = changelog_forget, .release = changelog_release, }; struct volume_options options[] = { {.key = {"changelog"}, .type = GF_OPTION_TYPE_BOOL, .default_value = "off", .description = "enable/disable change-logging", .op_version = {3}, .flags = OPT_FLAG_SETTABLE, .level = OPT_STATUS_BASIC, .tags = {"journal", "georep", "glusterfind"}}, {.key = {"changelog-notification"}, .type = GF_OPTION_TYPE_BOOL, .default_value = "off", .description = "enable/disable changelog live notification", .op_version = {3}, .level = OPT_STATUS_BASIC, .tags = {"bitrot", "georep"}}, {.key = {"changelog-brick"}, .type = GF_OPTION_TYPE_PATH, .description = "brick path to generate unique socket file name." 
" should be the export directory of the volume strictly.", .default_value = "{{ brick.path }}", .op_version = {3}, .tags = {"journal"}}, {.key = {"changelog-dir"}, .type = GF_OPTION_TYPE_PATH, .description = "directory for the changelog files", .default_value = "{{ brick.path }}/.glusterfs/changelogs", .op_version = {3}, .flags = OPT_FLAG_SETTABLE, .level = OPT_STATUS_ADVANCED, .tags = {"journal", "georep", "glusterfind"}}, {.key = {"op-mode"}, .type = GF_OPTION_TYPE_STR, .default_value = "realtime", .value = {"realtime"}, .description = "operation mode - futuristic operation modes", .op_version = {3}, .tags = {"journal"}}, {.key = {"encoding"}, .type = GF_OPTION_TYPE_STR, .default_value = "ascii", .value = {"binary", "ascii"}, .description = "encoding type for changelogs", .op_version = {3}, .flags = OPT_FLAG_SETTABLE, .level = OPT_STATUS_ADVANCED, .tags = {"journal"}}, {.key = {"rollover-time"}, .default_value = "15", .type = GF_OPTION_TYPE_TIME, .description = "time to switch to a new changelog file (in seconds)", .op_version = {3}, .flags = OPT_FLAG_SETTABLE, .level = OPT_STATUS_ADVANCED, .tags = {"journal", "georep", "glusterfind"}}, {.key = {"fsync-interval"}, .type = GF_OPTION_TYPE_TIME, .default_value = "5", .description = "do not open CHANGELOG file with O_SYNC mode." 
" instead perform fsync() at specified intervals", .op_version = {3}, .flags = OPT_FLAG_SETTABLE, .level = OPT_STATUS_ADVANCED, .tags = {"journal"}}, {.key = {"changelog-barrier-timeout"}, .type = GF_OPTION_TYPE_TIME, .default_value = BARRIER_TIMEOUT, .description = "After 'timeout' seconds since the time 'barrier' " "option was set to \"on\", unlink/rmdir/rename " "operations are no longer blocked and previously " "blocked fops are allowed to go through", .op_version = {3}, .flags = OPT_FLAG_SETTABLE, .level = OPT_STATUS_ADVANCED, .tags = {"journal"}}, {.key = {"capture-del-path"}, .type = GF_OPTION_TYPE_BOOL, .default_value = "off", .description = "enable/disable capturing paths of deleted entries", .op_version = {3}, .flags = OPT_FLAG_SETTABLE, .level = OPT_STATUS_BASIC, .tags = {"journal", "glusterfind"}}, {.key = {NULL}}, }; xlator_api_t xlator_api = { .init = init, .fini = fini, .notify = notify, .reconfigure = reconfigure, .mem_acct_init = mem_acct_init, .op_version = {1}, /* Present from the initial version */ .fops = &fops, .cbks = &cbks, .options = options, .identifier = "changelog", .category = GF_MAINTAINED, };