summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSakshi Bansal <sabansal@redhat.com>2018-01-22 14:38:17 +0530
committerRaghavendra G <rgowdapp@redhat.com>2018-01-24 05:39:44 +0000
commit78868033bc208bdb681266d0129e66f5627295a2 (patch)
tree89695fbabe3cc077669b23934aac24c3f457ef3c
parent5cbe21b1ddf6eae4782afa33ce841fc4aa195974 (diff)
dentry fop serializer: added new server side xlator for dentry fop serialization
Problems addressed by this xlator : [1]. To prevent race between parallel mkdir,mkdir and lookup etc. Fops like mkdir/create, lookup, rename, unlink, link that happen on a particular dentry must be serialized to ensure atomicity. Another possible case can be a fresh lookup to find existance of a path whose gfid is not set yet. Further, storage/posix employs a ctime based heuristic 'is_fresh_file' (interval time is less than 1 second of current time) to check fresh-ness of file. With serialization of these two fops (lookup & mkdir), we eliminate the race altogether. [2]. Staleness of dentries This causes exponential increase in traversal time for any inode in the subtree of the directory pointed by stale dentry. Cause : Stale dentry is created because of following two operations: a. dentry creation due to inode_link, done during operations like lookup, mkdir, create, mknod, symlink, create and b. dentry unlinking due to various operations like rmdir, rename, unlink. The reason is __inode_link uses __is_dentry_cyclic, which explores all possible path to avoid cyclic link formation during inode linkage. __is_dentry_cyclic explores stale-dentry(ies) and its all ancestors which is increases traversing time exponentially. Implementation : To acheive this all fops on dentry must take entry locks before they proceed, once they have acquired locks, they perform the fop and then release the lock. Some documentation from email conversation: [1] http://www.gluster.org/pipermail/gluster-devel/2015-December/047314.html [2] http://www.gluster.org/pipermail/gluster-devel/2015-August/046428.html With this patch, the feature is optional, enable it by running: `gluster volume set $volname features.sdfs enable` Also the feature is tested for a month without issues in the experiemental branch for all the regression. Change-Id: I6e80ba3cabfa6facd5dda63bd482b9bf18b6b79b Fixes: #397 BUG: 1304962 Signed-off-by: Sakshi Bansal <sabansal@redhat.com> Signed-off-by: Amar Tumballi <amarts@redhat.com> Signed-off-by: Sunny Kumar <sunkumar@redhat.com>
-rw-r--r--configure.ac2
-rw-r--r--glusterfs.spec.in1
-rw-r--r--libglusterfs/src/glfs-message-id.h1
-rw-r--r--xlators/features/Makefile.am7
-rw-r--r--xlators/features/sdfs/Makefile.am3
-rw-r--r--xlators/features/sdfs/src/Makefile.am17
-rw-r--r--xlators/features/sdfs/src/sdfs-messages.h69
-rw-r--r--xlators/features/sdfs/src/sdfs.c1351
-rw-r--r--xlators/features/sdfs/src/sdfs.h49
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-volgen.c26
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-volume-set.c10
11 files changed, 1532 insertions, 4 deletions
diff --git a/configure.ac b/configure.ac
index 81c56d4e68f..ed84c661797 100644
--- a/configure.ac
+++ b/configure.ac
@@ -152,6 +152,8 @@ AC_CONFIG_FILES([Makefile
xlators/features/marker/src/Makefile
xlators/features/selinux/Makefile
xlators/features/selinux/src/Makefile
+ xlators/features/sdfs/Makefile
+ xlators/features/sdfs/src/Makefile
xlators/features/read-only/Makefile
xlators/features/read-only/src/Makefile
xlators/features/compress/Makefile
diff --git a/glusterfs.spec.in b/glusterfs.spec.in
index 2dbd013c1d9..f7f8cbae6b3 100644
--- a/glusterfs.spec.in
+++ b/glusterfs.spec.in
@@ -1256,6 +1256,7 @@ exit 0
%{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/arbiter.so
%{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/bit-rot.so
%{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/bitrot-stub.so
+ %{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/sdfs.so
%if ( 0%{!?_without_tiering:1} )
%{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/changetimerecorder.so
%{_libdir}/libgfdb.so.*
diff --git a/libglusterfs/src/glfs-message-id.h b/libglusterfs/src/glfs-message-id.h
index 241913d5b45..6a7d07bef84 100644
--- a/libglusterfs/src/glfs-message-id.h
+++ b/libglusterfs/src/glfs-message-id.h
@@ -85,6 +85,7 @@ enum _msgid_comp {
GLFS_MSGID_COMP(NLC, 1),
GLFS_MSGID_COMP(SL, 1),
GLFS_MSGID_COMP(HAM, 1),
+ GLFS_MSGID_COMP(SDFS, 1),
/* --- new segments for messages goes above this line --- */
diff --git a/xlators/features/Makefile.am b/xlators/features/Makefile.am
index f7791b0cc32..1e24679903d 100644
--- a/xlators/features/Makefile.am
+++ b/xlators/features/Makefile.am
@@ -1,6 +1,5 @@
SUBDIRS = locks quota read-only quiesce marker index barrier \
- arbiter compress changelog changetimerecorder \
- gfid-access $(GLUPY_SUBDIR) upcall snapview-client snapview-server \
- trash shard bit-rot leases selinux
-
+ arbiter compress changelog changetimerecorder \
+ gfid-access $(GLUPY_SUBDIR) upcall snapview-client snapview-server \
+ trash shard bit-rot leases selinux sdfs
CLEANFILES =
diff --git a/xlators/features/sdfs/Makefile.am b/xlators/features/sdfs/Makefile.am
new file mode 100644
index 00000000000..a985f42a877
--- /dev/null
+++ b/xlators/features/sdfs/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = src
+
+CLEANFILES =
diff --git a/xlators/features/sdfs/src/Makefile.am b/xlators/features/sdfs/src/Makefile.am
new file mode 100644
index 00000000000..ec9ed804b32
--- /dev/null
+++ b/xlators/features/sdfs/src/Makefile.am
@@ -0,0 +1,17 @@
+xlator_LTLIBRARIES = sdfs.la
+xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features
+
+sdfs_la_LDFLAGS = -module $(GF_XLATOR_DEFAULT_LDFLAGS)
+
+sdfs_la_SOURCES = sdfs.c
+sdfs_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+
+noinst_HEADERS = sdfs.h sdfs-messages.h $(top_builddir)/xlators/lib/src/libxlator.h
+
+AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \
+ -I$(top_srcdir)/xlators/lib/src \
+ -I$(top_srcdir)/rpc/xdr/src/ -I$(top_builddir)/rpc/xdr/src/
+
+AM_CFLAGS = -Wall -fno-strict-aliasing $(GF_CFLAGS)
+
+CLEANFILES =
diff --git a/xlators/features/sdfs/src/sdfs-messages.h b/xlators/features/sdfs/src/sdfs-messages.h
new file mode 100644
index 00000000000..6c7a9d90667
--- /dev/null
+++ b/xlators/features/sdfs/src/sdfs-messages.h
@@ -0,0 +1,69 @@
+/*
+ Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+ */
+
+#ifndef _DFS_MESSAGES_H_
+#define _DFS_MESSAGES_H_
+
+#include "glfs-message-id.h"
+
+/* file bit-rot-bitd-messages.h
+ * brief SDFS log-message IDs and their descriptions
+ */
+
+/* NOTE: Rules for message additions
+ * 1) Each instance of a message is _better_ left with a unique message ID, even
+ * if the message format is the same. Reasoning is that, if the message
+ * format needs to change in one instance, the other instances are not
+ * impacted or the new change does not change the ID of the instance being
+ * modified.
+ * 2) Addition of a message,
+ * - Should increment the GLFS_NUM_MESSAGES
+ * - Append to the list of messages defined, towards the end
+ * - Retain macro naming as glfs_msg_X (for redability across developers)
+ * NOTE: Rules for message format modifications
+ * 3) Check acorss the code if the message ID macro in question is reused
+ * anywhere. If reused then then the modifications should ensure correctness
+ * everywhere, or needs a new message ID as (1) above was not adhered to. If
+ * not used anywhere, proceed with the required modification.
+ * NOTE: Rules for message deletion
+ * 4) Check (3) and if used anywhere else, then cannot be deleted. If not used
+ * anywhere, then can be deleted, but will leave a hole by design, as
+ * addition rules specify modification to the end of the list and not filling
+ * holes.
+ */
+
+#define GLFS_SDFS_BASE GLFS_MSGID_COMP_SDFS
+#define GLFS_SDFS_NUM_MESSAGES 2
+#define GLFS_MSGID_END (GLFS_SDFS_BASE + \
+ GLFS_SDFS_NUM_MESSAGES + 1)
+/* Messaged with message IDs */
+#define glfs_msg_start_x GLFS_DFS_BASE, "Invalid: Start of messages"
+/*------------*/
+
+
+#define SDFS_MSG_ENTRYLK_ERROR (GLFS_SDFS_BASE + 1)
+/*!
+ * @messageid
+ * @diagnosis
+ * @recommendedaction
+ *
+ */
+
+#define SDFS_MSG_MKDIR_ERROR (GLFS_SDFS_BASE + 2)
+/*!
+ * @messageid
+ * @diagnosis
+ * @recommendedaction
+ *
+ */
+/*------------*/
+
+#define glfs_msg_end_x GLFS_MSGID_END, "Invalid: End of messages"
+#endif /* !_SDFS_MESSAGES_H_ */
diff --git a/xlators/features/sdfs/src/sdfs.c b/xlators/features/sdfs/src/sdfs.c
new file mode 100644
index 00000000000..3b70dce5d27
--- /dev/null
+++ b/xlators/features/sdfs/src/sdfs.c
@@ -0,0 +1,1351 @@
+/*
+ Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+#include <libgen.h>
+#include "sdfs.h"
+
+static int
+sdfs_frame_return (call_frame_t *frame)
+{
+ sdfs_local_t *local = NULL;
+
+ if (!frame)
+ return -1;
+
+ local = frame->local;
+
+ return GF_ATOMIC_DEC (local->call_cnt);
+}
+
+static void
+sdfs_lock_free (sdfs_entry_lock_t *entrylk)
+{
+ if (entrylk == NULL)
+ goto out;
+
+ loc_wipe (&entrylk->parent_loc);
+ GF_FREE (entrylk->basename);
+
+out:
+ return;
+}
+
+static void
+sdfs_lock_array_free (sdfs_lock_t *lock)
+{
+ sdfs_entry_lock_t *entrylk = NULL;
+ int i = 0;
+
+ if (lock == NULL)
+ goto out;
+
+ for (i = 0; i < lock->lock_count; i++) {
+ entrylk = &lock->entrylk[i];
+ sdfs_lock_free (entrylk);
+ }
+
+out:
+ return;
+}
+
+static void
+sdfs_local_cleanup (sdfs_local_t *local)
+{
+ if (!local)
+ return;
+
+ loc_wipe (&local->loc);
+ loc_wipe (&local->parent_loc);
+
+ if (local->stub) {
+ call_stub_destroy (local->stub);
+ local->stub = NULL;
+ }
+
+ sdfs_lock_array_free (local->lock);
+ GF_FREE (local->lock);
+
+ mem_put (local);
+}
+
+static int
+sdfs_build_parent_loc (loc_t *parent, loc_t *child)
+{
+ int ret = -1;
+ char *path = NULL;
+
+ if (!child->parent) {
+ goto out;
+ }
+ parent->inode = inode_ref (child->parent);
+ path = gf_strdup (child->path);
+ if (!path) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ parent->path = dirname(path);
+ if (!parent->path) {
+ goto out;
+ }
+
+ gf_uuid_copy (parent->gfid, child->pargfid);
+ return 0;
+
+out:
+ GF_FREE (path);
+ return ret;
+}
+
+static sdfs_local_t *
+sdfs_local_init (call_frame_t *frame, xlator_t *this)
+{
+ sdfs_local_t *local = NULL;
+
+ local = mem_get0 (this->local_pool);
+ if (!local)
+ goto out;
+
+ frame->local = local;
+out:
+ return local;
+}
+
+static int
+sdfs_get_new_frame (call_frame_t *frame, loc_t *loc, call_frame_t **new_frame)
+{
+ int ret = -1;
+ sdfs_local_t *local = NULL;
+ client_t *client = NULL;
+
+ *new_frame = copy_frame (frame);
+ if (!*new_frame) {
+ goto err;
+ }
+
+ client = frame->root->client;
+ gf_client_ref (client);
+ (*new_frame)->root->client = client;
+ local = sdfs_local_init (*new_frame, THIS);
+ if (!local) {
+ goto err;
+ }
+
+ local->main_frame = frame;
+
+ ret = sdfs_build_parent_loc (&local->parent_loc, loc);
+ if (ret) {
+ goto err;
+ }
+
+ ret = loc_copy (&local->loc, loc);
+ if (ret == -1) {
+ goto err;
+ }
+
+ ret = 0;
+err:
+ if (ret == -1) {
+ SDFS_STACK_DESTROY (frame);
+ }
+ return ret;
+}
+
+int
+sdfs_entrylk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_stub_t *stub = NULL;
+
+ local = frame->local;
+
+ local->op_ret = op_ret;
+ local->op_errno = op_errno;
+
+ if (local->stub) {
+ stub = local->stub;
+ local->stub = NULL;
+ call_resume (stub);
+ } else {
+ if (op_ret < 0)
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Unlocking entry lock failed for %s",
+ local->loc.name);
+
+ SDFS_STACK_DESTROY (frame);
+ }
+
+ return 0;
+}
+
+int
+sdfs_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *stbuf, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND_STRICT (mkdir, local->main_frame, op_ret, op_errno, inode,
+ stbuf, preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_mkdir_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ mode_t mode, mode_t umask, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+ int op_errno = -1;
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s", local->loc.name, gfid);
+ op_errno = local->op_errno;
+ goto err;
+ }
+
+ STACK_WIND (frame, sdfs_mkdir_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->mkdir, loc,
+ mode, umask, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (mkdir, local->main_frame, -1, op_errno,
+ NULL, NULL, NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ SDFS_STACK_DESTROY (frame);
+ return 0;
+}
+
+int
+sdfs_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
+ mode_t umask, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_mkdir_stub (new_frame, sdfs_mkdir_helper, loc, mode,
+ umask, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ ((sdfs_local_t *)new_frame->local)->stub = stub;
+
+ STACK_WIND (new_frame, sdfs_entrylk_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (mkdir, frame, -1, op_errno, NULL, NULL,
+ NULL, NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY (new_frame);
+
+ return 0;
+}
+
+int
+sdfs_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND_STRICT (rmdir, local->main_frame, op_ret, op_errno,
+ preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_rmdir_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ int flags, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s", local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND (frame, sdfs_rmdir_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->rmdir, loc,
+ flags, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (rmdir, local->main_frame, -1, local->op_errno,
+ NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ SDFS_STACK_DESTROY (frame);
+ return 0;
+}
+
+int
+sdfs_rmdir (call_frame_t *frame, xlator_t *this, loc_t *loc, int flags,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_rmdir_stub (new_frame, sdfs_rmdir_helper, loc, flags, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ ((sdfs_local_t *)new_frame->local)->stub = stub;
+
+ STACK_WIND (new_frame, sdfs_entrylk_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (rmdir, frame, -1, op_errno, NULL, NULL,
+ NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY (new_frame);
+
+ return 0;
+}
+
+int
+sdfs_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ fd_t *fd, inode_t *inode, struct iatt *stbuf,
+ struct iatt *preparent, struct iatt *postparent,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND_STRICT (create, local->main_frame, op_ret, op_errno, fd,
+ inode, stbuf, preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_create_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ int32_t flags, mode_t mode, mode_t umask, fd_t *fd,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s", local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND (frame, sdfs_create_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->create, loc, flags,
+ mode, umask, fd, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (create, local->main_frame, -1, local->op_errno,
+ NULL, NULL, NULL, NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ SDFS_STACK_DESTROY (frame);
+ return 0;
+}
+
+int
+sdfs_create (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ int32_t flags, mode_t mode, mode_t umask,
+ fd_t *fd, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_create_stub (new_frame, sdfs_create_helper, loc,
+ flags, mode, umask, fd, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ ((sdfs_local_t *)new_frame->local)->stub = stub;
+
+ STACK_WIND (new_frame, sdfs_entrylk_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (create, frame, -1, op_errno, NULL, NULL,
+ NULL, NULL, NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY (new_frame);
+
+ return 0;
+}
+
+int
+sdfs_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND_STRICT (unlink, local->main_frame, op_ret, op_errno,
+ preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_unlink_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ int flags, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s", local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND (frame, sdfs_unlink_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->unlink, loc, flags, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (unlink, local->main_frame, -1, local->op_errno,
+ NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ SDFS_STACK_DESTROY (frame);
+ return 0;
+}
+
+int
+sdfs_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ int flags, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_unlink_stub (new_frame, sdfs_unlink_helper, loc,
+ flags, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ ((sdfs_local_t *)new_frame->local)->stub = stub;
+
+ STACK_WIND (new_frame, sdfs_entrylk_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (unlink, frame, -1, op_errno, NULL, NULL,
+ NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY (new_frame);
+
+ return 0;
+}
+
+int
+sdfs_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *stbuf, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND_STRICT (link, local->main_frame, op_ret, op_errno, inode,
+ stbuf, preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_symlink_helper (call_frame_t *frame, xlator_t *this,
+ const char *linkname, loc_t *loc, mode_t umask,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s", local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND (frame, sdfs_symlink_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->symlink, linkname, loc,
+ umask, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (link, local->main_frame, -1, local->op_errno,
+ NULL, NULL, NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ SDFS_STACK_DESTROY (frame);
+ return 0;
+}
+
+int
+sdfs_symlink (call_frame_t *frame, xlator_t *this, const char *linkname,
+ loc_t *loc, mode_t umask, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_symlink_stub (new_frame, sdfs_symlink_helper, linkname, loc,
+ umask, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ ((sdfs_local_t *)new_frame->local)->stub = stub;
+
+ STACK_WIND (new_frame, sdfs_entrylk_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (link, frame, -1, op_errno, NULL, NULL,
+ NULL, NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY (new_frame);
+
+ return 0;
+}
+
+int
+sdfs_common_entrylk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ int this_call_cnt = 0;
+ int lk_index = 0;
+ sdfs_lock_t *locks = NULL;
+ call_stub_t *stub = NULL;
+
+ local = frame->local;
+ locks = local->lock;
+ lk_index = (long) cookie;
+
+ if (op_ret < 0) {
+ local->op_ret = op_ret;
+ local->op_errno = op_errno;
+ } else {
+ locks->entrylk->locked[lk_index] = _gf_true;
+ }
+
+ this_call_cnt = sdfs_frame_return (frame);
+ if (this_call_cnt > 0) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "As there are more callcnt (%d) returning without WIND",
+ this_call_cnt);
+ return 0;
+ }
+
+ if (local->stub) {
+ stub = local->stub;
+ local->stub = NULL;
+ call_resume (stub);
+ } else {
+ if (local->op_ret < 0)
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "unlocking entry lock failed ");
+ SDFS_STACK_DESTROY (frame);
+ }
+
+ return 0;
+}
+
+int
+sdfs_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ inode_t *inode, struct iatt *stbuf, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ sdfs_lock_t *lock = NULL;
+ int i = 0;
+ int lock_count = 0;
+
+ local = frame->local;
+ lock = local->lock;
+
+ STACK_UNWIND_STRICT (link, local->main_frame, op_ret, op_errno, inode,
+ stbuf, preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ lock_count = lock->lock_count;
+ for (i = 0; i < lock_count; i++) {
+ STACK_WIND_COOKIE (frame, sdfs_common_entrylk_cbk,
+ (void *)(long) i,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &lock->entrylk[i].parent_loc,
+ lock->entrylk[i].basename,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ }
+
+ return 0;
+}
+
+int
+sdfs_link_helper (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
+ loc_t *newloc, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ sdfs_lock_t *locks = NULL;
+ gf_boolean_t stack_destroy = _gf_true;
+ int lock_count = 0;
+ int i = 0;
+
+ local = frame->local;
+ locks = local->lock;
+
+ if (local->op_ret < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed");
+ goto err;
+ }
+
+ STACK_WIND (frame, sdfs_link_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->link, oldloc, newloc,
+ xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (link, local->main_frame, -1, local->op_errno,
+ NULL, NULL, NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ for (i = 0; i < locks->lock_count && locks->entrylk->locked[i]; i++) {
+ lock_count++;
+ }
+ GF_ATOMIC_INIT (local->call_cnt, lock_count);
+
+ for (i = 0; i < lock_count; i++) {
+ if (!locks->entrylk->locked[i]) {
+ lock_count++;
+ continue;
+ }
+
+ stack_destroy = _gf_false;
+ STACK_WIND (frame, sdfs_common_entrylk_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &locks->entrylk[i].parent_loc,
+ locks->entrylk[i].basename,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ }
+
+ if (stack_destroy)
+ SDFS_STACK_DESTROY (frame);
+
+ return 0;
+}
+
+static int
+sdfs_init_entry_lock (sdfs_entry_lock_t *lock, loc_t *loc)
+{
+ int ret = 0;
+
+ ret = sdfs_build_parent_loc (&lock->parent_loc, loc);
+ if (ret)
+ return -1;
+
+ lock->basename = gf_strdup (loc->name);
+ if (!lock->basename)
+ return -1;
+
+ return 0;
+}
+
+int
+sdfs_entry_lock_cmp (const void *l1, const void *l2)
+{
+ const sdfs_entry_lock_t *r1 = l1;
+ const sdfs_entry_lock_t *r2 = l2;
+ int ret = 0;
+ uuid_t gfid1 = {0};
+ uuid_t gfid2 = {0};
+
+ loc_gfid ((loc_t *)&r1->parent_loc, gfid1);
+ loc_gfid ((loc_t *)&r2->parent_loc, gfid2);
+ ret = gf_uuid_compare (gfid1, gfid2);
+ /*Entrylks with NULL basename are the 'smallest'*/
+ if (ret == 0) {
+ if (!r1->basename)
+ return -1;
+ if (!r2->basename)
+ return 1;
+ ret = strcmp (r1->basename, r2->basename);
+ }
+
+ if (ret <= 0)
+ return -1;
+ else
+ return 1;
+}
+
+int
+sdfs_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
+ loc_t *newloc, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ sdfs_lock_t *lock = NULL;
+ client_t *client = NULL;
+ int ret = 0;
+ int op_errno = 0;
+
+ new_frame = copy_frame (frame);
+ if (!new_frame) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ gf_client_ref (client);
+ new_frame->root->client = client;
+ local = sdfs_local_init (new_frame, this);
+ if (!local) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local->main_frame = frame;
+
+
+ lock = GF_CALLOC (1, sizeof (*lock), gf_common_mt_char);
+ if (!lock)
+ goto err;
+
+ local->lock = lock;
+
+ ret = sdfs_init_entry_lock (&lock->entrylk[0], newloc);
+ if (ret)
+ goto err;
+
+ ++lock->lock_count;
+
+ local->lock = lock;
+ GF_ATOMIC_INIT (local->call_cnt, lock->lock_count);
+
+ loc_copy (&local->loc, newloc);
+ if (ret == -1) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_link_stub (new_frame, sdfs_link_helper, oldloc,
+ newloc, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local->stub = stub;
+
+ STACK_WIND_COOKIE (new_frame, sdfs_common_entrylk_cbk,
+ 0, FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &lock->entrylk[0].parent_loc,
+ lock->entrylk[0].basename, ENTRYLK_LOCK,
+ ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+
+ STACK_UNWIND_STRICT (link, frame, -1, op_errno, NULL, NULL,
+ NULL, NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY (new_frame);
+
+ return 0;
+}
+
+int
+sdfs_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *stbuf, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND_STRICT (mknod, local->main_frame, op_ret, op_errno, inode,
+ stbuf, preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_mknod_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ mode_t mode, dev_t rdev, mode_t umask, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s", local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND (frame, sdfs_mknod_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->mknod, loc, mode, rdev,
+ umask, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (mknod, local->main_frame, -1, local->op_errno,
+ NULL, NULL, NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ SDFS_STACK_DESTROY (frame);
+ return 0;
+}
+
+int
+sdfs_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
+ dev_t rdev, mode_t umask, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_mknod_stub (new_frame, sdfs_mknod_helper, loc, mode,
+ rdev, umask, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ ((sdfs_local_t *)new_frame->local)->stub = stub;
+
+ STACK_WIND (new_frame, sdfs_entrylk_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (mknod, frame, -1, op_errno, NULL, NULL,
+ NULL, NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY (new_frame);
+
+ return 0;
+}
+
+int
+sdfs_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *stbuf,
+ struct iatt *preoldparent, struct iatt *postoldparent,
+ struct iatt *prenewparent, struct iatt *postnewparent,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ sdfs_lock_t *lock = NULL;
+ int i = 0;
+ int call_cnt = 0;
+
+ local = frame->local;
+ lock = local->lock;
+ GF_ATOMIC_INIT (local->call_cnt, lock->lock_count);
+
+ STACK_UNWIND_STRICT (rename, local->main_frame, op_ret, op_errno, stbuf,
+ preoldparent, postoldparent, prenewparent,
+ postnewparent, xdata);
+
+ local->main_frame = NULL;
+ call_cnt = GF_ATOMIC_GET (local->call_cnt);
+
+ for (i = 0; i < call_cnt; i++) {
+ STACK_WIND_COOKIE (frame, sdfs_common_entrylk_cbk,
+ (void *)(long) i,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &lock->entrylk[i].parent_loc,
+ lock->entrylk[i].basename,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ }
+
+ return 0;
+}
+
+int
+sdfs_rename_helper (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
+ loc_t *newloc, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ sdfs_lock_t *lock = NULL;
+ gf_boolean_t stack_destroy = _gf_true;
+ int lock_count = 0;
+ int i = 0;
+
+ local = frame->local;
+ lock = local->lock;
+
+ if (local->op_ret < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed ");
+ goto err;
+ }
+
+ STACK_WIND (frame, sdfs_rename_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->rename, oldloc, newloc,
+ xdata);
+
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT (rename, local->main_frame, -1, local->op_errno,
+ NULL, NULL, NULL, NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ for (i = 0; i < lock->lock_count && lock->entrylk->locked[i]; i++) {
+ lock_count++;
+ }
+ GF_ATOMIC_INIT (local->call_cnt, lock_count);
+
+ for (i = 0; i < lock_count; i++) {
+ if (!lock->entrylk->locked[i]) {
+ lock_count++;
+ continue;
+ }
+ stack_destroy = _gf_false;
+ STACK_WIND (frame, sdfs_common_entrylk_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &lock->entrylk[i].parent_loc,
+ lock->entrylk[i].basename,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ }
+
+ if (stack_destroy)
+ SDFS_STACK_DESTROY (frame);
+
+ return 0;
+}
+
+int
+sdfs_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
+ loc_t *newloc, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ sdfs_lock_t *lock = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ client_t *client = NULL;
+ int ret = 0;
+ int op_errno = -1;
+ int i = 0;
+ int call_cnt = 0;
+
+ new_frame = copy_frame (frame);
+ if (!new_frame) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ gf_client_ref (client);
+ new_frame->root->client = client;
+ local = sdfs_local_init (new_frame, this);
+ if (!local) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local->main_frame = frame;
+
+ lock = GF_CALLOC (1, sizeof (*lock), gf_common_mt_char);
+ if (!lock)
+ goto err;
+
+ local->lock = lock;
+
+ ret = sdfs_init_entry_lock (&lock->entrylk[0], oldloc);
+ if (ret)
+ goto err;
+ lock->entrylk->locked[0] = _gf_false;
+
+ ++lock->lock_count;
+
+ ret = sdfs_init_entry_lock (&lock->entrylk[1], newloc);
+ if (ret)
+ goto err;
+ lock->entrylk->locked[1] = _gf_false;
+
+ ++lock->lock_count;
+
+ qsort (lock->entrylk, lock->lock_count, sizeof (*lock->entrylk),
+ sdfs_entry_lock_cmp);
+
+ local->lock = lock;
+ GF_ATOMIC_INIT (local->call_cnt, lock->lock_count);
+
+ stub = fop_rename_stub (new_frame, sdfs_rename_helper, oldloc,
+ newloc, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local->stub = stub;
+ call_cnt = GF_ATOMIC_GET (local->call_cnt);
+ for (i = 0; i < call_cnt; i++) {
+ STACK_WIND_COOKIE (new_frame, sdfs_common_entrylk_cbk,
+ (void *)(long) i,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &lock->entrylk[i].parent_loc,
+ lock->entrylk[i].basename,
+ ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+ }
+
+ return 0;
+err:
+
+ STACK_UNWIND_STRICT (rename, frame, -1, op_errno, NULL, NULL,
+ NULL, NULL, NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY (new_frame);
+
+ return 0;
+}
+
+int
+sdfs_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *stbuf, dict_t *xdata,
+ struct iatt *postparent)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ if (!local->loc.parent) {
+ sdfs_local_cleanup (local);
+ frame->local = NULL;
+ STACK_UNWIND_STRICT (lookup, frame, op_ret, op_errno,
+ inode, stbuf, xdata, postparent);
+ return 0;
+ }
+
+ STACK_UNWIND_STRICT (lookup, local->main_frame, op_ret, op_errno, inode,
+ stbuf, xdata, postparent);
+
+ local->main_frame = NULL;
+ STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_UNLOCK, ENTRYLK_RDLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_lookup_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s", local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND (frame, sdfs_lookup_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->lookup, loc, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (lookup, local->main_frame, -1, local->op_errno,
+ NULL, NULL, NULL, NULL);
+ local->main_frame = NULL;
+
+ SDFS_STACK_DESTROY (frame);
+ return 0;
+}
+
+int
+sdfs_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (!loc->parent) {
+ local = sdfs_local_init (frame, this);
+ if (!local) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ STACK_WIND_TAIL(frame, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->lookup,
+ loc, xdata);
+ return 0;
+ }
+
+ if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_lookup_stub (new_frame, sdfs_lookup_helper, loc,
+ xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ ((sdfs_local_t *)new_frame->local)->stub = stub;
+
+ STACK_WIND (new_frame, sdfs_entrylk_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_LOCK, ENTRYLK_RDLCK, xdata);
+
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT (lookup, frame, -1, op_errno, NULL, NULL,
+ NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY (new_frame);
+
+ return 0;
+}
+
+int
+init (xlator_t *this)
+{
+ int ret = -1;
+
+ if (!this->children || this->children->next) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'dentry-fop-serializer' not configured with exactly one child");
+ goto out;
+ }
+
+ if (!this->parents) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "dangling volume. check volfile ");
+ }
+
+ this->local_pool = mem_pool_new (sdfs_local_t, 512);
+ if (!this->local_pool) {
+ goto out;
+ }
+
+ ret = 0;
+
+out:
+ return ret;
+}
+
+int
+fini (xlator_t *this)
+{
+ mem_pool_destroy (this->local_pool);
+
+ return 0;
+}
+
+
+struct xlator_fops fops = {
+ .mkdir = sdfs_mkdir,
+ .rmdir = sdfs_rmdir,
+ .create = sdfs_create,
+ .unlink = sdfs_unlink,
+ .symlink = sdfs_symlink,
+ .link = sdfs_link,
+ .mknod = sdfs_mknod,
+ .rename = sdfs_rename,
+ .lookup = sdfs_lookup,
+};
+
+struct xlator_cbks cbks;
+
diff --git a/xlators/features/sdfs/src/sdfs.h b/xlators/features/sdfs/src/sdfs.h
new file mode 100644
index 00000000000..d28257eda5e
--- /dev/null
+++ b/xlators/features/sdfs/src/sdfs.h
@@ -0,0 +1,49 @@
+/*
+ Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "xlator.h"
+#include "call-stub.h"
+#include "sdfs-messages.h"
+#include "atomic.h"
+
+#define SDFS_LOCK_COUNT_MAX 2
+
+typedef struct{
+ loc_t parent_loc;
+ char *basename;
+ int locked[SDFS_LOCK_COUNT_MAX];
+} sdfs_entry_lock_t;
+
+typedef struct {
+ sdfs_entry_lock_t entrylk[SDFS_LOCK_COUNT_MAX];
+ int lock_count;
+} sdfs_lock_t;
+
+struct sdfs_local {
+ call_frame_t *main_frame;
+ loc_t loc;
+ loc_t parent_loc;
+ call_stub_t *stub;
+ sdfs_lock_t *lock;
+ int op_ret;
+ int op_errno;
+ gf_atomic_t call_cnt;
+};
+typedef struct sdfs_local sdfs_local_t;
+
+#define SDFS_STACK_DESTROY(frame) do { \
+ sdfs_local_t *__local = NULL; \
+ __local = frame->local; \
+ frame->local = NULL; \
+ gf_client_unref (frame->root->client); \
+ STACK_DESTROY (frame->root); \
+ sdfs_local_cleanup (__local); \
+ } while (0)
+
diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.c b/xlators/mgmt/glusterd/src/glusterd-volgen.c
index 6a02da17918..235a4bd4022 100644
--- a/xlators/mgmt/glusterd/src/glusterd-volgen.c
+++ b/xlators/mgmt/glusterd/src/glusterd-volgen.c
@@ -2024,6 +2024,31 @@ out:
return ret;
}
+static int
+brick_graph_add_sdfs (volgen_graph_t *graph, glusterd_volinfo_t *volinfo,
+ dict_t *set_dict, glusterd_brickinfo_t *brickinfo)
+{
+ xlator_t *xl = NULL;
+ int ret = -1;
+
+ if (!graph || !volinfo)
+ goto out;
+
+ if (!dict_get_str_boolean (set_dict, "features.sdfs", 0)) {
+ /* update only if option is enabled */
+ ret = 0;
+ goto out;
+ }
+
+ xl = volgen_graph_add (graph, "features/sdfs", volinfo->volname);
+ if (!xl)
+ goto out;
+
+ ret = 0;
+out:
+ return ret;
+}
+
xlator_t *
add_one_peer (volgen_graph_t *graph, glusterd_brickinfo_t *peer,
char *volname, uint16_t index)
@@ -2616,6 +2641,7 @@ static volgen_brick_xlator_t server_graph_table[] = {
{brick_graph_add_server, NULL},
{brick_graph_add_decompounder, "decompounder"},
{brick_graph_add_io_stats, "NULL"},
+ {brick_graph_add_sdfs, "sdfs"},
{brick_graph_add_cdc, NULL},
{brick_graph_add_quota, "quota"},
{brick_graph_add_index, "index"},
diff --git a/xlators/mgmt/glusterd/src/glusterd-volume-set.c b/xlators/mgmt/glusterd/src/glusterd-volume-set.c
index acb493540da..1d4797afbeb 100644
--- a/xlators/mgmt/glusterd/src/glusterd-volume-set.c
+++ b/xlators/mgmt/glusterd/src/glusterd-volume-set.c
@@ -3718,6 +3718,16 @@ struct volopt_map_entry glusterd_volopt_map[] = {
.op_version = GD_OP_VERSION_3_13_0,
.flags = VOLOPT_FLAG_CLIENT_OPT
},
+ { .key = "features.sdfs",
+ .voltype = "features/sdfs",
+ .value = "off",
+ .option = "!features",
+ .op_version = GD_OP_VERSION_4_0_0,
+ .description = "enable/disable dentry serialization xlator in volume",
+ .flags = VOLOPT_FLAG_CLIENT_OPT | VOLOPT_FLAG_XLATOR_OPT,
+ .type = NO_DOC,
+ },
+
{ .key = NULL
}
};