summaryrefslogtreecommitdiffstats
path: root/xlators/features/sdfs
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/sdfs')
-rw-r--r--xlators/features/sdfs/Makefile.am3
-rw-r--r--xlators/features/sdfs/src/Makefile.am19
-rw-r--r--xlators/features/sdfs/src/sdfs-messages.h67
-rw-r--r--xlators/features/sdfs/src/sdfs.c1479
-rw-r--r--xlators/features/sdfs/src/sdfs.h49
5 files changed, 1617 insertions, 0 deletions
diff --git a/xlators/features/sdfs/Makefile.am b/xlators/features/sdfs/Makefile.am
new file mode 100644
index 00000000000..a985f42a877
--- /dev/null
+++ b/xlators/features/sdfs/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = src
+
+CLEANFILES =
diff --git a/xlators/features/sdfs/src/Makefile.am b/xlators/features/sdfs/src/Makefile.am
new file mode 100644
index 00000000000..6118d46ad22
--- /dev/null
+++ b/xlators/features/sdfs/src/Makefile.am
@@ -0,0 +1,19 @@
+if WITH_SERVER
+xlator_LTLIBRARIES = sdfs.la
+endif
+xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features
+
+sdfs_la_LDFLAGS = -module $(GF_XLATOR_DEFAULT_LDFLAGS)
+
+sdfs_la_SOURCES = sdfs.c
+sdfs_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+
+noinst_HEADERS = sdfs.h sdfs-messages.h $(top_builddir)/xlators/lib/src/libxlator.h
+
+AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \
+ -I$(top_srcdir)/xlators/lib/src \
+ -I$(top_srcdir)/rpc/xdr/src/ -I$(top_builddir)/rpc/xdr/src/
+
+AM_CFLAGS = -Wall -fno-strict-aliasing $(GF_CFLAGS)
+
+CLEANFILES =
diff --git a/xlators/features/sdfs/src/sdfs-messages.h b/xlators/features/sdfs/src/sdfs-messages.h
new file mode 100644
index 00000000000..3053efa8935
--- /dev/null
+++ b/xlators/features/sdfs/src/sdfs-messages.h
@@ -0,0 +1,67 @@
+/*
+ Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+ */
+
+#ifndef _DFS_MESSAGES_H_
+#define _DFS_MESSAGES_H_
+
+#include <glusterfs/glfs-message-id.h>
+
+/* file bit-rot-bitd-messages.h
+ * brief SDFS log-message IDs and their descriptions
+ */
+
+/* NOTE: Rules for message additions
+ * 1) Each instance of a message is _better_ left with a unique message ID, even
+ * if the message format is the same. Reasoning is that, if the message
+ * format needs to change in one instance, the other instances are not
+ * impacted or the new change does not change the ID of the instance being
+ * modified.
+ * 2) Addition of a message,
+ * - Should increment the GLFS_NUM_MESSAGES
+ * - Append to the list of messages defined, towards the end
+ * - Retain macro naming as glfs_msg_X (for redability across developers)
+ * NOTE: Rules for message format modifications
+ * 3) Check acorss the code if the message ID macro in question is reused
+ * anywhere. If reused then then the modifications should ensure correctness
+ * everywhere, or needs a new message ID as (1) above was not adhered to. If
+ * not used anywhere, proceed with the required modification.
+ * NOTE: Rules for message deletion
+ * 4) Check (3) and if used anywhere else, then cannot be deleted. If not used
+ * anywhere, then can be deleted, but will leave a hole by design, as
+ * addition rules specify modification to the end of the list and not filling
+ * holes.
+ */
+
+#define GLFS_SDFS_BASE GLFS_MSGID_COMP_SDFS
+#define GLFS_SDFS_NUM_MESSAGES 2
+#define GLFS_MSGID_END (GLFS_SDFS_BASE + GLFS_SDFS_NUM_MESSAGES + 1)
+/* Messaged with message IDs */
+#define glfs_msg_start_x GLFS_DFS_BASE, "Invalid: Start of messages"
+/*------------*/
+
+#define SDFS_MSG_ENTRYLK_ERROR (GLFS_SDFS_BASE + 1)
+/*!
+ * @messageid
+ * @diagnosis
+ * @recommendedaction
+ *
+ */
+
+#define SDFS_MSG_MKDIR_ERROR (GLFS_SDFS_BASE + 2)
+/*!
+ * @messageid
+ * @diagnosis
+ * @recommendedaction
+ *
+ */
+/*------------*/
+
+#define glfs_msg_end_x GLFS_MSGID_END, "Invalid: End of messages"
+#endif /* !_SDFS_MESSAGES_H_ */
diff --git a/xlators/features/sdfs/src/sdfs.c b/xlators/features/sdfs/src/sdfs.c
new file mode 100644
index 00000000000..aaf13f0852e
--- /dev/null
+++ b/xlators/features/sdfs/src/sdfs.c
@@ -0,0 +1,1479 @@
+/*
+ Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+#include <libgen.h>
+#include "sdfs.h"
+
+static int
+sdfs_frame_return(call_frame_t *frame)
+{
+ sdfs_local_t *local = NULL;
+
+ if (!frame)
+ return -1;
+
+ local = frame->local;
+
+ return GF_ATOMIC_DEC(local->call_cnt);
+}
+
+static void
+sdfs_lock_free(sdfs_entry_lock_t *entrylk)
+{
+ if (entrylk == NULL)
+ goto out;
+
+ loc_wipe(&entrylk->parent_loc);
+ GF_FREE(entrylk->basename);
+
+out:
+ return;
+}
+
+static void
+sdfs_lock_array_free(sdfs_lock_t *lock)
+{
+ sdfs_entry_lock_t *entrylk = NULL;
+ int i = 0;
+
+ if (lock == NULL)
+ goto out;
+
+ for (i = 0; i < lock->lock_count; i++) {
+ entrylk = &lock->entrylk[i];
+ sdfs_lock_free(entrylk);
+ }
+
+out:
+ return;
+}
+
+static void
+sdfs_local_cleanup(sdfs_local_t *local)
+{
+ if (!local)
+ return;
+
+ loc_wipe(&local->loc);
+ loc_wipe(&local->parent_loc);
+
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ local->stub = NULL;
+ }
+
+ sdfs_lock_array_free(local->lock);
+ GF_FREE(local->lock);
+
+ mem_put(local);
+}
+
+static int
+sdfs_build_parent_loc(loc_t *parent, loc_t *child)
+{
+ int ret = -1;
+ char *path = NULL;
+
+ if (!child->parent) {
+ goto out;
+ }
+ parent->inode = inode_ref(child->parent);
+ path = gf_strdup(child->path);
+ if (!path) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ parent->path = dirname(path);
+ if (!parent->path) {
+ goto out;
+ }
+
+ gf_uuid_copy(parent->gfid, child->pargfid);
+ return 0;
+
+out:
+ GF_FREE(path);
+ return ret;
+}
+
+static sdfs_local_t *
+sdfs_local_init(call_frame_t *frame, xlator_t *this)
+{
+ sdfs_local_t *local = NULL;
+
+ local = mem_get0(this->local_pool);
+ if (!local)
+ goto out;
+
+ frame->local = local;
+out:
+ return local;
+}
+
+static int
+sdfs_get_new_frame_common(call_frame_t *frame, call_frame_t **new_frame)
+{
+ int ret = -1;
+ sdfs_local_t *local = NULL;
+ client_t *client = NULL;
+
+ *new_frame = copy_frame(frame);
+ if (!*new_frame) {
+ goto err;
+ }
+
+ client = frame->root->client;
+ gf_client_ref(client);
+ (*new_frame)->root->client = client;
+
+ local = sdfs_local_init(*new_frame, THIS);
+ if (!local) {
+ goto err;
+ }
+
+ local->main_frame = frame;
+ /*Set unique lk-owner for the fop*/
+ set_lk_owner_from_ptr(&(*new_frame)->root->lk_owner, (*new_frame)->root);
+
+ ret = 0;
+err:
+ if ((ret == -1) && (*new_frame)) {
+ SDFS_STACK_DESTROY((*new_frame));
+ *new_frame = NULL;
+ }
+
+ return ret;
+}
+
+static int
+sdfs_get_new_frame(call_frame_t *frame, loc_t *loc, call_frame_t **new_frame)
+{
+ int ret = -1;
+ sdfs_local_t *local = NULL;
+
+ ret = sdfs_get_new_frame_common(frame, new_frame);
+ if (ret < 0) {
+ goto err;
+ }
+
+ local = (*new_frame)->local;
+
+ ret = sdfs_build_parent_loc(&local->parent_loc, loc);
+ if (ret) {
+ goto err;
+ }
+
+ ret = loc_copy(&local->loc, loc);
+ if (ret == -1) {
+ goto err;
+ }
+
+ ret = 0;
+err:
+ if (ret && (*new_frame)) {
+ SDFS_STACK_DESTROY((*new_frame));
+ *new_frame = NULL;
+ ret = -1;
+ }
+
+ return ret;
+}
+
+static int
+sdfs_get_new_frame_readdirp(call_frame_t *frame, fd_t *fd,
+ call_frame_t **new_frame)
+{
+ int ret = -1;
+ sdfs_local_t *local = NULL;
+
+ ret = sdfs_get_new_frame_common(frame, new_frame);
+ if (ret < 0) {
+ goto err;
+ }
+
+ local = (*new_frame)->local;
+ local->parent_loc.inode = inode_ref(fd->inode);
+ gf_uuid_copy(local->parent_loc.gfid, fd->inode->gfid);
+
+ ret = 0;
+err:
+ return ret;
+}
+
+int
+sdfs_entrylk_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_stub_t *stub = NULL;
+
+ local = frame->local;
+
+ local->op_ret = op_ret;
+ local->op_errno = op_errno;
+
+ if (local->stub) {
+ stub = local->stub;
+ local->stub = NULL;
+ call_resume(stub);
+ } else {
+ if (op_ret < 0)
+ gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
+ "Unlocking entry lock failed for %s", local->loc.name);
+
+ SDFS_STACK_DESTROY(frame);
+ }
+
+ return 0;
+}
+
+int
+sdfs_mkdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *stbuf, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND_STRICT(mkdir, local->main_frame, op_ret, op_errno, inode,
+ stbuf, preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND(frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
+ local->loc.name, ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_mkdir_helper(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
+ mode_t umask, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+ int op_errno = -1;
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s",
+ local->loc.name, gfid);
+ op_errno = local->op_errno;
+ goto err;
+ }
+
+ STACK_WIND(frame, sdfs_mkdir_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->mkdir, loc, mode, umask, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT(mkdir, local->main_frame, -1, op_errno, NULL, NULL,
+ NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ SDFS_STACK_DESTROY(frame);
+ return 0;
+}
+
+int
+sdfs_mkdir(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
+ mode_t umask, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame(frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_mkdir_stub(new_frame, sdfs_mkdir_helper, loc, mode, umask,
+ xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local = new_frame->local;
+ local->stub = stub;
+
+ STACK_WIND(new_frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
+ local->loc.name, ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT(mkdir, frame, -1, op_errno, NULL, NULL, NULL, NULL,
+ NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY(new_frame);
+
+ return 0;
+}
+
+int
+sdfs_rmdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND_STRICT(rmdir, local->main_frame, op_ret, op_errno, preparent,
+ postparent, xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND(frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
+ local->loc.name, ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_rmdir_helper(call_frame_t *frame, xlator_t *this, loc_t *loc, int flags,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s",
+ local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND(frame, sdfs_rmdir_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->rmdir, loc, flags, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT(rmdir, local->main_frame, -1, local->op_errno, NULL,
+ NULL, NULL);
+
+ local->main_frame = NULL;
+ SDFS_STACK_DESTROY(frame);
+ return 0;
+}
+
+int
+sdfs_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int flags,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame(frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_rmdir_stub(new_frame, sdfs_rmdir_helper, loc, flags, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local = new_frame->local;
+ local->stub = stub;
+
+ STACK_WIND(new_frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
+ local->loc.name, ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT(rmdir, frame, -1, op_errno, NULL, NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY(new_frame);
+
+ return 0;
+}
+
+int
+sdfs_create_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, fd_t *fd, inode_t *inode,
+ struct iatt *stbuf, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND_STRICT(create, local->main_frame, op_ret, op_errno, fd, inode,
+ stbuf, preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND(frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
+ local->loc.name, ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_create_helper(call_frame_t *frame, xlator_t *this, loc_t *loc,
+ int32_t flags, mode_t mode, mode_t umask, fd_t *fd,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s",
+ local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND(frame, sdfs_create_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, fd,
+ xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT(create, local->main_frame, -1, local->op_errno, NULL,
+ NULL, NULL, NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ SDFS_STACK_DESTROY(frame);
+ return 0;
+}
+
+int
+sdfs_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
+ mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame(frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_create_stub(new_frame, sdfs_create_helper, loc, flags, mode,
+ umask, fd, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local = new_frame->local;
+ local->stub = stub;
+
+ STACK_WIND(new_frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
+ local->loc.name, ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT(create, frame, -1, op_errno, NULL, NULL, NULL, NULL,
+ NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY(new_frame);
+
+ return 0;
+}
+
+int
+sdfs_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND_STRICT(unlink, local->main_frame, op_ret, op_errno, preparent,
+ postparent, xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND(frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
+ local->loc.name, ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_unlink_helper(call_frame_t *frame, xlator_t *this, loc_t *loc, int flags,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s",
+ local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND(frame, sdfs_unlink_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->unlink, loc, flags, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT(unlink, local->main_frame, -1, local->op_errno, NULL,
+ NULL, NULL);
+
+ local->main_frame = NULL;
+ SDFS_STACK_DESTROY(frame);
+ return 0;
+}
+
+int
+sdfs_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int flags,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame(frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_unlink_stub(new_frame, sdfs_unlink_helper, loc, flags, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local = new_frame->local;
+ local->stub = stub;
+
+ STACK_WIND(new_frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
+ local->loc.name, ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT(unlink, frame, -1, op_errno, NULL, NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY(new_frame);
+
+ return 0;
+}
+
+int
+sdfs_symlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *stbuf, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND_STRICT(link, local->main_frame, op_ret, op_errno, inode, stbuf,
+ preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND(frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
+ local->loc.name, ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_symlink_helper(call_frame_t *frame, xlator_t *this, const char *linkname,
+ loc_t *loc, mode_t umask, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s",
+ local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND(frame, sdfs_symlink_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->symlink, linkname, loc, umask, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT(link, local->main_frame, -1, local->op_errno, NULL,
+ NULL, NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ SDFS_STACK_DESTROY(frame);
+ return 0;
+}
+
+int
+sdfs_symlink(call_frame_t *frame, xlator_t *this, const char *linkname,
+ loc_t *loc, mode_t umask, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame(frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_symlink_stub(new_frame, sdfs_symlink_helper, linkname, loc,
+ umask, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local = new_frame->local;
+ local->stub = stub;
+
+ STACK_WIND(new_frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
+ local->loc.name, ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT(link, frame, -1, op_errno, NULL, NULL, NULL, NULL,
+ NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY(new_frame);
+
+ return 0;
+}
+
+int
+sdfs_common_entrylk_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ int this_call_cnt = 0;
+ int lk_index = 0;
+ sdfs_lock_t *locks = NULL;
+ call_stub_t *stub = NULL;
+
+ local = frame->local;
+ locks = local->lock;
+ lk_index = (long)cookie;
+
+ if (op_ret < 0) {
+ local->op_ret = op_ret;
+ local->op_errno = op_errno;
+ } else {
+ locks->entrylk->locked[lk_index] = _gf_true;
+ }
+
+ this_call_cnt = sdfs_frame_return(frame);
+ if (this_call_cnt > 0) {
+ gf_log(this->name, GF_LOG_DEBUG,
+ "As there are more callcnt (%d) returning without WIND",
+ this_call_cnt);
+ return 0;
+ }
+
+ if (local->stub) {
+ stub = local->stub;
+ local->stub = NULL;
+ call_resume(stub);
+ } else {
+ if (local->op_ret < 0)
+ gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
+ "unlocking entry lock failed ");
+ SDFS_STACK_DESTROY(frame);
+ }
+
+ return 0;
+}
+
+int
+sdfs_link_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
+ int32_t op_errno, inode_t *inode, struct iatt *stbuf,
+ struct iatt *preparent, struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ sdfs_lock_t *lock = NULL;
+ int i = 0;
+ int lock_count = 0;
+
+ local = frame->local;
+ lock = local->lock;
+
+ STACK_UNWIND_STRICT(link, local->main_frame, op_ret, op_errno, inode, stbuf,
+ preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ lock_count = lock->lock_count;
+ for (i = 0; i < lock_count; i++) {
+ STACK_WIND_COOKIE(frame, sdfs_common_entrylk_cbk, (void *)(long)i,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->entrylk,
+ this->name, &lock->entrylk[i].parent_loc,
+ lock->entrylk[i].basename, ENTRYLK_UNLOCK,
+ ENTRYLK_WRLCK, xdata);
+ }
+
+ return 0;
+}
+
+int
+sdfs_link_helper(call_frame_t *frame, xlator_t *this, loc_t *oldloc,
+ loc_t *newloc, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ sdfs_lock_t *locks = NULL;
+ gf_boolean_t stack_destroy = _gf_true;
+ int lock_count = 0;
+ int i = 0;
+
+ local = frame->local;
+ locks = local->lock;
+
+ if (local->op_ret < 0) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed");
+ goto err;
+ }
+
+ STACK_WIND(frame, sdfs_link_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT(link, local->main_frame, -1, local->op_errno, NULL,
+ NULL, NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ for (i = 0; i < locks->lock_count && locks->entrylk->locked[i]; i++) {
+ lock_count++;
+ }
+ GF_ATOMIC_INIT(local->call_cnt, lock_count);
+
+ for (i = 0; i < lock_count; i++) {
+ if (!locks->entrylk->locked[i]) {
+ lock_count++;
+ continue;
+ }
+
+ stack_destroy = _gf_false;
+ STACK_WIND(frame, sdfs_common_entrylk_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name,
+ &locks->entrylk[i].parent_loc, locks->entrylk[i].basename,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ }
+
+ if (stack_destroy)
+ SDFS_STACK_DESTROY(frame);
+
+ return 0;
+}
+
+static int
+sdfs_init_entry_lock(sdfs_entry_lock_t *lock, loc_t *loc)
+{
+ int ret = 0;
+
+ ret = sdfs_build_parent_loc(&lock->parent_loc, loc);
+ if (ret)
+ return -1;
+
+ lock->basename = gf_strdup(loc->name);
+ if (!lock->basename)
+ return -1;
+
+ return 0;
+}
+
+int
+sdfs_entry_lock_cmp(const void *l1, const void *l2)
+{
+ const sdfs_entry_lock_t *r1 = l1;
+ const sdfs_entry_lock_t *r2 = l2;
+ int ret = 0;
+ uuid_t gfid1 = {0};
+ uuid_t gfid2 = {0};
+
+ loc_gfid((loc_t *)&r1->parent_loc, gfid1);
+ loc_gfid((loc_t *)&r2->parent_loc, gfid2);
+ ret = gf_uuid_compare(gfid1, gfid2);
+ /*Entrylks with NULL basename are the 'smallest'*/
+ if (ret == 0) {
+ if (!r1->basename)
+ return -1;
+ if (!r2->basename)
+ return 1;
+ ret = strcmp(r1->basename, r2->basename);
+ }
+
+ if (ret <= 0)
+ return -1;
+ else
+ return 1;
+}
+
+int
+sdfs_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ sdfs_lock_t *lock = NULL;
+ client_t *client = NULL;
+ int ret = 0;
+ int op_errno = ENOMEM;
+
+ new_frame = copy_frame(frame);
+ if (!new_frame) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+ /*Set unique lk-owner for the fop*/
+ set_lk_owner_from_ptr(&new_frame->root->lk_owner, new_frame->root);
+
+ gf_client_ref(client);
+ new_frame->root->client = client;
+ local = sdfs_local_init(new_frame, this);
+ if (!local) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local->main_frame = frame;
+
+ lock = GF_CALLOC(1, sizeof(*lock), gf_common_mt_char);
+ if (!lock)
+ goto err;
+
+ local->lock = lock;
+
+ ret = sdfs_init_entry_lock(&lock->entrylk[0], newloc);
+ if (ret)
+ goto err;
+
+ ++lock->lock_count;
+
+ local->lock = lock;
+ GF_ATOMIC_INIT(local->call_cnt, lock->lock_count);
+
+ ret = loc_copy(&local->loc, newloc);
+ if (ret == -1) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_link_stub(new_frame, sdfs_link_helper, oldloc, newloc, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local->stub = stub;
+
+ STACK_WIND_COOKIE(new_frame, sdfs_common_entrylk_cbk, 0, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name,
+ &lock->entrylk[0].parent_loc, lock->entrylk[0].basename,
+ ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+
+ STACK_UNWIND_STRICT(link, frame, -1, op_errno, NULL, NULL, NULL, NULL,
+ NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY(new_frame);
+
+ return 0;
+}
+
+int
+sdfs_mknod_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *stbuf, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND_STRICT(mknod, local->main_frame, op_ret, op_errno, inode,
+ stbuf, preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND(frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
+ local->loc.name, ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_mknod_helper(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
+ dev_t rdev, mode_t umask, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s",
+ local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND(frame, sdfs_mknod_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->mknod, loc, mode, rdev, umask, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT(mknod, local->main_frame, -1, local->op_errno, NULL,
+ NULL, NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ SDFS_STACK_DESTROY(frame);
+ return 0;
+}
+
+int
+sdfs_mknod(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
+ dev_t rdev, mode_t umask, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame(frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_mknod_stub(new_frame, sdfs_mknod_helper, loc, mode, rdev, umask,
+ xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local = new_frame->local;
+ local->stub = stub;
+
+ STACK_WIND(new_frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
+ local->loc.name, ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT(mknod, frame, -1, op_errno, NULL, NULL, NULL, NULL,
+ NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY(new_frame);
+
+ return 0;
+}
+
+int
+sdfs_rename_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *stbuf,
+ struct iatt *preoldparent, struct iatt *postoldparent,
+ struct iatt *prenewparent, struct iatt *postnewparent,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ sdfs_lock_t *lock = NULL;
+ int i = 0;
+ int call_cnt = 0;
+
+ local = frame->local;
+ lock = local->lock;
+ GF_ATOMIC_INIT(local->call_cnt, lock->lock_count);
+
+ STACK_UNWIND_STRICT(rename, local->main_frame, op_ret, op_errno, stbuf,
+ preoldparent, postoldparent, prenewparent,
+ postnewparent, xdata);
+
+ local->main_frame = NULL;
+ call_cnt = GF_ATOMIC_GET(local->call_cnt);
+
+ for (i = 0; i < call_cnt; i++) {
+ STACK_WIND_COOKIE(frame, sdfs_common_entrylk_cbk, (void *)(long)i,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->entrylk,
+ this->name, &lock->entrylk[i].parent_loc,
+ lock->entrylk[i].basename, ENTRYLK_UNLOCK,
+ ENTRYLK_WRLCK, xdata);
+ }
+
+ return 0;
+}
+
+int
+sdfs_rename_helper(call_frame_t *frame, xlator_t *this, loc_t *oldloc,
+ loc_t *newloc, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ sdfs_lock_t *lock = NULL;
+ gf_boolean_t stack_destroy = _gf_true;
+ int lock_count = 0;
+ int i = 0;
+
+ local = frame->local;
+ lock = local->lock;
+
+ if (local->op_ret < 0) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed ");
+ goto err;
+ }
+
+ STACK_WIND(frame, sdfs_rename_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata);
+
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT(rename, local->main_frame, -1, local->op_errno, NULL,
+ NULL, NULL, NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ for (i = 0; i < lock->lock_count && lock->entrylk->locked[i]; i++) {
+ lock_count++;
+ }
+ GF_ATOMIC_INIT(local->call_cnt, lock_count);
+
+ for (i = 0; i < lock_count; i++) {
+ if (!lock->entrylk->locked[i]) {
+ lock_count++;
+ continue;
+ }
+ stack_destroy = _gf_false;
+ STACK_WIND(frame, sdfs_common_entrylk_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name,
+ &lock->entrylk[i].parent_loc, lock->entrylk[i].basename,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ }
+
+ if (stack_destroy)
+ SDFS_STACK_DESTROY(frame);
+
+ return 0;
+}
+
+int
+sdfs_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ sdfs_lock_t *lock = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ client_t *client = NULL;
+ int ret = 0;
+ int op_errno = ENOMEM;
+ int i = 0;
+ int call_cnt = 0;
+
+ new_frame = copy_frame(frame);
+ if (!new_frame) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+ /*Set unique lk-owner for the fop*/
+ set_lk_owner_from_ptr(&new_frame->root->lk_owner, new_frame->root);
+
+ gf_client_ref(client);
+ new_frame->root->client = client;
+ local = sdfs_local_init(new_frame, this);
+ if (!local) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local->main_frame = frame;
+
+ lock = GF_CALLOC(1, sizeof(*lock), gf_common_mt_char);
+ if (!lock)
+ goto err;
+
+ local->lock = lock;
+
+ ret = sdfs_init_entry_lock(&lock->entrylk[0], oldloc);
+ if (ret)
+ goto err;
+ lock->entrylk->locked[0] = _gf_false;
+
+ ++lock->lock_count;
+
+ ret = sdfs_init_entry_lock(&lock->entrylk[1], newloc);
+ if (ret)
+ goto err;
+ lock->entrylk->locked[1] = _gf_false;
+
+ ++lock->lock_count;
+
+ qsort(lock->entrylk, lock->lock_count, sizeof(*lock->entrylk),
+ sdfs_entry_lock_cmp);
+
+ local->lock = lock;
+ GF_ATOMIC_INIT(local->call_cnt, lock->lock_count);
+
+ stub = fop_rename_stub(new_frame, sdfs_rename_helper, oldloc, newloc,
+ xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local->stub = stub;
+ call_cnt = GF_ATOMIC_GET(local->call_cnt);
+ for (i = 0; i < call_cnt; i++) {
+ STACK_WIND_COOKIE(new_frame, sdfs_common_entrylk_cbk, (void *)(long)i,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->entrylk,
+ this->name, &lock->entrylk[i].parent_loc,
+ lock->entrylk[i].basename, ENTRYLK_LOCK,
+ ENTRYLK_WRLCK, xdata);
+ }
+
+ return 0;
+err:
+
+ STACK_UNWIND_STRICT(rename, frame, -1, op_errno, NULL, NULL, NULL, NULL,
+ NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY(new_frame);
+
+ return 0;
+}
+
+int
+sdfs_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *stbuf, dict_t *xdata, struct iatt *postparent)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ if (!local->loc.parent) {
+ sdfs_local_cleanup(local);
+ frame->local = NULL;
+ STACK_UNWIND_STRICT(lookup, frame, op_ret, op_errno, inode, stbuf,
+ xdata, postparent);
+ return 0;
+ }
+
+ STACK_UNWIND_STRICT(lookup, local->main_frame, op_ret, op_errno, inode,
+ stbuf, xdata, postparent);
+
+ local->main_frame = NULL;
+ STACK_WIND(frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
+ local->loc.name, ENTRYLK_UNLOCK, ENTRYLK_RDLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_lookup_helper(call_frame_t *frame, xlator_t *this, loc_t *loc,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s",
+ local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND(frame, sdfs_lookup_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->lookup, loc, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT(lookup, local->main_frame, -1, local->op_errno, NULL,
+ NULL, NULL, NULL);
+ local->main_frame = NULL;
+
+ SDFS_STACK_DESTROY(frame);
+ return 0;
+}
+
+int
+sdfs_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (!loc->parent) {
+ local = sdfs_local_init(frame, this);
+ if (!local) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ STACK_WIND_TAIL(frame, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->lookup, loc, xdata);
+ return 0;
+ }
+
+ if (-1 == sdfs_get_new_frame(frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_lookup_stub(new_frame, sdfs_lookup_helper, loc, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local = new_frame->local;
+ local->stub = stub;
+
+ STACK_WIND(new_frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
+ local->loc.name, ENTRYLK_LOCK, ENTRYLK_RDLCK, xdata);
+
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT(lookup, frame, -1, op_errno, NULL, NULL, NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY(new_frame);
+
+ return 0;
+}
+
+int32_t
+sdfs_readdirp_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, gf_dirent_t *entries,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+ STACK_UNWIND_STRICT(readdirp, local->main_frame, op_ret, op_errno, entries,
+ xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND(frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
+ NULL, ENTRYLK_UNLOCK, ENTRYLK_RDLCK, xdata);
+ return 0;
+}
+
+int32_t
+sdfs_readdirp_helper(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
+ off_t off, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(fd->inode->gfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s",
+ local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND(frame, sdfs_readdirp_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->readdirp, fd, size, off, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT(readdirp, local->main_frame, -1, local->op_errno, NULL,
+ NULL);
+
+ local->main_frame = NULL;
+
+ SDFS_STACK_DESTROY(frame);
+ return 0;
+}
+
+int32_t
+sdfs_readdirp(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
+ off_t off, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame_readdirp(frame, fd, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_readdirp_stub(new_frame, sdfs_readdirp_helper, fd, size, off,
+ xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local = new_frame->local;
+ local->stub = stub;
+
+ STACK_WIND(new_frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
+ NULL, ENTRYLK_LOCK, ENTRYLK_RDLCK, xdata);
+
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT(readdirp, frame, -1, op_errno, NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY(new_frame);
+
+ return 0;
+}
+
+int
+init(xlator_t *this)
+{
+ int ret = -1;
+
+ if (!this->children || this->children->next) {
+ gf_log(this->name, GF_LOG_ERROR,
+ "'dentry-fop-serializer' not configured with exactly one child");
+ goto out;
+ }
+
+ if (!this->parents) {
+ gf_log(this->name, GF_LOG_WARNING, "dangling volume. check volfile ");
+ }
+
+ this->local_pool = mem_pool_new(sdfs_local_t, 512);
+ if (!this->local_pool) {
+ goto out;
+ }
+
+ GF_OPTION_INIT("pass-through", this->pass_through, bool, out);
+
+ ret = 0;
+
+out:
+ return ret;
+}
+
+int
+reconfigure(xlator_t *this, dict_t *options)
+{
+ int ret = -1;
+
+ GF_OPTION_RECONF("pass-through", this->pass_through, options, bool, out);
+
+ ret = 0;
+out:
+ return ret;
+}
+
+void
+fini(xlator_t *this)
+{
+ mem_pool_destroy(this->local_pool);
+ this->local_pool = NULL;
+ return;
+}
+
+struct xlator_fops fops = {
+ .mkdir = sdfs_mkdir,
+ .rmdir = sdfs_rmdir,
+ .create = sdfs_create,
+ .unlink = sdfs_unlink,
+ .symlink = sdfs_symlink,
+ .link = sdfs_link,
+ .mknod = sdfs_mknod,
+ .rename = sdfs_rename,
+ .lookup = sdfs_lookup,
+ .readdirp = sdfs_readdirp,
+};
+
+struct xlator_cbks cbks;
+
+struct volume_options options[] = {
+ {.key = {"pass-through"},
+ .type = GF_OPTION_TYPE_BOOL,
+ .default_value = "true",
+ .op_version = {GD_OP_VERSION_4_1_0},
+ .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT,
+ .tags = {"sdfs"},
+ .description = "Enable/Disable dentry serialize functionality"},
+ {.key = {NULL}},
+};
+
+xlator_api_t xlator_api = {
+ .init = init,
+ .fini = fini,
+ .reconfigure = reconfigure,
+ .op_version = {GD_OP_VERSION_4_0_0},
+ .fops = &fops,
+ .cbks = &cbks,
+ .options = options,
+ .identifier = "sdfs",
+ .category = GF_TECH_PREVIEW,
+};
diff --git a/xlators/features/sdfs/src/sdfs.h b/xlators/features/sdfs/src/sdfs.h
new file mode 100644
index 00000000000..dded5a2d7fc
--- /dev/null
+++ b/xlators/features/sdfs/src/sdfs.h
@@ -0,0 +1,49 @@
+/*
+ Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <glusterfs/xlator.h>
+#include <glusterfs/call-stub.h>
+#include "sdfs-messages.h"
+#include <glusterfs/atomic.h>
+
+#define SDFS_LOCK_COUNT_MAX 2
+
+typedef struct {
+ loc_t parent_loc;
+ char *basename;
+ int locked[SDFS_LOCK_COUNT_MAX];
+} sdfs_entry_lock_t;
+
+typedef struct {
+ sdfs_entry_lock_t entrylk[SDFS_LOCK_COUNT_MAX];
+ int lock_count;
+} sdfs_lock_t;
+
+struct sdfs_local {
+ call_frame_t *main_frame;
+ loc_t loc;
+ loc_t parent_loc;
+ call_stub_t *stub;
+ sdfs_lock_t *lock;
+ int op_ret;
+ int op_errno;
+ gf_atomic_t call_cnt;
+};
+typedef struct sdfs_local sdfs_local_t;
+
+#define SDFS_STACK_DESTROY(frame) \
+ do { \
+ sdfs_local_t *__local = NULL; \
+ __local = frame->local; \
+ frame->local = NULL; \
+ gf_client_unref(frame->root->client); \
+ STACK_DESTROY(frame->root); \
+ sdfs_local_cleanup(__local); \
+ } while (0)