summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/dht/src/dht-rename.c
diff options
context:
space:
mode:
authorRaghavendra G <rgowdapp@redhat.com>2018-02-08 17:12:41 +0530
committerRaghavendra G <rgowdapp@redhat.com>2018-05-07 06:04:10 +0000
commit5e356e3f470779433bfe6b0b368676062842b367 (patch)
tree79f5bcf2ed1b3b0b377cdb36176eb185103dc8f4 /xlators/cluster/dht/src/dht-rename.c
parent01a7d27bbaf12618fde56ca05cf9c953445493f3 (diff)
cluster/dht: fixes to parallel renames to same destination codepath
Test case: # while true; do uuid="`uuidgen`"; echo "some data" > "test$uuid"; mv "test$uuid" "test" -f || break; echo "done:$uuid"; done This script was run in parallel from multiple mountpoints Along the course of getting the above usecase working, many issues were found: Issue 1: ======= consider a case of rename (src, dst). We can encounter a situation where, * dst is a file present at the time of lookup * dst is removed by the time rename fop reaches glusterfs In this scenario, acquring inodelk on dst fails with ESTALE resulting in failure of rename. However, as per POSIX irrespective of whether dst is present or not, rename should be successful. Acquiring entrylk provides synchronization even in races like this. Algorithm: 1. Take inodelks on src and dst (if dst is present) on respective cached subvols. These inodelks are done to preserve backward compatibility with older clients, so that synchronization is preserved when a volume is mounted by clients of different versions. Once relevant older versions (3.10, 3.12, 3.13) reach EOL, this code can be removed. 2. Ignore ENOENT/ESTALE errors of inodelk on dst. 3. protect namespace of src and dst. To protect namespace of a file, take inodelk on parent on hashed subvol, then take entrylk on the same subvol on parent with basename of file. inodelk on parent is done to guard against changes to parent layout so that hashed subvol won't change during rename. 4. <rest of rename continues> 5. unlock all locks Issue 2: ======== linkfile creation in lookup codepath can race with a rename. Imagine the following scenario: * lookup finds a data-file with gfid - gfid-dst - without a corresponding linkto file on hashed-subvol. It decides to create linkto file with gfid - gfid-dst. - Note that some codepaths of dht-rename deletes linkto file of dst as first step. So, a lookup racing with an in-progress rename can easily run into this situation. * a rename (src-path:gfid-src, dst-path:gfid-dst) renames data-file and hence gfid of data-file changes to gfid-src with path dst-path. * lookup proceeds and creates linkto file - dst-path - with gfid - dst-gfid - on hashed-subvol. * rename tries to create a linkto file dst-path with src-gfid on hashed-subvol, but it fails with EEXIST. But EEXIST is ignored during linkto file creation. Now we've ended with dst-path having different gfids - dst-gfid on linkto file and src-gfid on data file. Future lookups on dst-path will always fail with ESTALE, due to differing gfids. The fix is to synchronize linkfile creation in lookup path with rename using the same mechanism of protecting namespace explained in solution of Issue 1. Once locks are acquired, before proceeding with linkfile creation, we check whether conditions for linkto file creation are still valid. If not, we skip linkto file creation. Issue 3: ======== gfid of dst-path can change by the time locks are acquired. This means, either another rename overwrote dst-path or dst-path was deleted and recreated by a different client. When this happens, cached-subvol for dst can change. If rename proceeds with old-gfid and old-cached subvol, we'll end up in inconsistent state(s) like dst-path with different gfids on different subvols, more than one data-file being present etc. Fix is to do the lookup with a new inode after protecting namespace of dst. Post lookup, we've to compare gfids and correct local state appropriately to be in sync with backend. Issue 4: ======== During revalidate lookup, if following a linkto file doesn't lead to a valid data-file, local->cached-subvol was not reset to NULL. This means we would be operating on a stale state which can lead to inconsistency. As a fix, reset it to NULL before proceeding with lookup everywhere. Issue 5: ======== Stale dentries left out in inode table on brick resulted in failures of link fop even though the file/dentry didn't exist on backend fs. A patch is submitted to fix this issue. Please check the dependency tree of current patch on gerrit for details In short, we fix the problem by not blindly trusting the inode-table. Instead we validate whether dentry is present by doing lookup on backend fs. Change-Id: I832e5c47d232f90c4edb1fafc512bf19bebde165 updates: bz#1543279 BUG: 1543279 Signed-off-by: Raghavendra G <rgowdapp@redhat.com>
Diffstat (limited to 'xlators/cluster/dht/src/dht-rename.c')
-rw-r--r--xlators/cluster/dht/src/dht-rename.c361
1 files changed, 307 insertions, 54 deletions
diff --git a/xlators/cluster/dht/src/dht-rename.c b/xlators/cluster/dht/src/dht-rename.c
index 3dc042e3ce5..d311ac633e3 100644
--- a/xlators/cluster/dht/src/dht-rename.c
+++ b/xlators/cluster/dht/src/dht-rename.c
@@ -18,6 +18,9 @@
#include "defaults.h"
int dht_rename_unlock (call_frame_t *frame, xlator_t *this);
+int32_t
+dht_rename_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata);
int
dht_rename_unlock_cbk (call_frame_t *frame, void *cookie,
@@ -44,7 +47,7 @@ dht_rename_unlock_cbk (call_frame_t *frame, void *cookie,
}
static void
-dht_rename_unlock_src (call_frame_t *frame, xlator_t *this)
+dht_rename_dir_unlock_src (call_frame_t *frame, xlator_t *this)
{
dht_local_t *local = NULL;
@@ -54,7 +57,7 @@ dht_rename_unlock_src (call_frame_t *frame, xlator_t *this)
}
static void
-dht_rename_unlock_dst (call_frame_t *frame, xlator_t *this)
+dht_rename_dir_unlock_dst (call_frame_t *frame, xlator_t *this)
{
dht_local_t *local = NULL;
int op_ret = -1;
@@ -107,8 +110,8 @@ static int
dht_rename_dir_unlock (call_frame_t *frame, xlator_t *this)
{
- dht_rename_unlock_src (frame, this);
- dht_rename_unlock_dst (frame, this);
+ dht_rename_dir_unlock_src (frame, this);
+ dht_rename_dir_unlock_dst (frame, this);
return 0;
}
int
@@ -721,12 +724,13 @@ dht_rename_unlock (call_frame_t *frame, xlator_t *this)
int op_ret = -1;
char src_gfid[GF_UUID_BUF_SIZE] = {0};
char dst_gfid[GF_UUID_BUF_SIZE] = {0};
+ dht_ilock_wrap_t inodelk_wrapper = {0, };
local = frame->local;
- op_ret = dht_unlock_inodelk (frame,
- local->lock[0].layout.parent_layout.locks,
- local->lock[0].layout.parent_layout.lk_count,
- dht_rename_unlock_cbk);
+ inodelk_wrapper.locks = local->rename_inodelk_backward_compatible;
+ inodelk_wrapper.lk_count = local->rename_inodelk_bc_count;
+
+ op_ret = dht_unlock_inodelk_wrapper (frame, &inodelk_wrapper);
if (op_ret < 0) {
uuid_utoa_r (local->loc.inode->gfid, src_gfid);
@@ -752,10 +756,13 @@ dht_rename_unlock (call_frame_t *frame, xlator_t *this)
"stale locks left on bricks",
local->loc.path, src_gfid,
local->loc2.path, dst_gfid);
-
- dht_rename_unlock_cbk (frame, NULL, this, 0, 0, NULL);
}
+ dht_unlock_namespace (frame, &local->lock[0]);
+ dht_unlock_namespace (frame, &local->lock[1]);
+
+ dht_rename_unlock_cbk (frame, NULL, this, local->op_ret,
+ local->op_errno, NULL);
return 0;
}
@@ -1470,6 +1477,8 @@ dht_rename_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
char gfid_local[GF_UUID_BUF_SIZE] = {0};
char gfid_server[GF_UUID_BUF_SIZE] = {0};
int child_index = -1;
+ gf_boolean_t is_src = _gf_false;
+ loc_t *loc = NULL;
child_index = (long)cookie;
@@ -1477,22 +1486,98 @@ dht_rename_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
local = frame->local;
conf = this->private;
+ is_src = (child_index == 0);
+ if (is_src)
+ loc = &local->loc;
+ else
+ loc = &local->loc2;
+
+ if (op_ret >= 0) {
+ if (is_src)
+ local->src_cached
+ = dht_subvol_get_cached (this,
+ local->loc.inode);
+ else {
+ if (loc->inode)
+ gf_uuid_unparse (loc->inode->gfid, gfid_local);
+
+ gf_msg_debug (this->name, 0,
+ "dst_cached before lookup: %s, "
+ "(path:%s)(gfid:%s),",
+ local->loc2.path,
+ local->dst_cached
+ ? local->dst_cached->name :
+ NULL,
+ local->dst_cached ? gfid_local : NULL);
+
+ local->dst_cached
+ = dht_subvol_get_cached (this,
+ local->loc2_copy.inode);
+
+ gf_uuid_unparse (stbuf->ia_gfid, gfid_local);
+
+ gf_msg_debug (this->name, GF_LOG_WARNING,
+ "dst_cached after lookup: %s, "
+ "(path:%s)(gfid:%s)",
+ local->loc2.path,
+ local->dst_cached
+ ? local->dst_cached->name :
+ NULL,
+ local->dst_cached ? gfid_local : NULL);
+
+
+ if ((local->loc2.inode == NULL)
+ || gf_uuid_compare (stbuf->ia_gfid,
+ local->loc2.inode->gfid)) {
+ if (local->loc2.inode != NULL) {
+ inode_unlink (local->loc2.inode,
+ local->loc2.parent,
+ local->loc2.name);
+ inode_unref (local->loc2.inode);
+ }
+
+ local->loc2.inode
+ = inode_link (local->loc2_copy.inode,
+ local->loc2_copy.parent,
+ local->loc2_copy.name,
+ stbuf);
+ gf_uuid_copy (local->loc2.gfid,
+ stbuf->ia_gfid);
+ }
+ }
+ }
+
if (op_ret < 0) {
- /* The meaning of is_linkfile is overloaded here. For locking
- * to work properly both rebalance and rename should acquire
- * lock on datafile. The reason for sending this lookup is to
- * find out whether we've acquired a lock on data file.
- * Between the lookup before rename and this rename, the
- * file could be migrated by a rebalance process and now this
- * file this might be a linkto file. We verify that by sending
- * this lookup. However, if this lookup fails we cannot really
- * say whether we've acquired lock on a datafile or linkto file.
- * So, we act conservatively and _assume_
- * that this is a linkfile and fail the rename operation.
- */
- local->is_linkfile = _gf_true;
- local->op_errno = op_errno;
- } else if (xattr && check_is_linkfile (inode, stbuf, xattr,
+ if (is_src) {
+ /* The meaning of is_linkfile is overloaded here. For locking
+ * to work properly both rebalance and rename should acquire
+ * lock on datafile. The reason for sending this lookup is to
+ * find out whether we've acquired a lock on data file.
+ * Between the lookup before rename and this rename, the
+ * file could be migrated by a rebalance process and now this
+ * file this might be a linkto file. We verify that by sending
+ * this lookup. However, if this lookup fails we cannot really
+ * say whether we've acquired lock on a datafile or linkto file.
+ * So, we act conservatively and _assume_
+ * that this is a linkfile and fail the rename operation.
+ */
+ local->is_linkfile = _gf_true;
+ local->op_errno = op_errno;
+ } else {
+ if (local->dst_cached)
+ gf_msg_debug (this->name, op_errno,
+ "file %s (gfid:%s) was present "
+ "(hashed-subvol=%s, "
+ "cached-subvol=%s) before rename,"
+ " but lookup failed",
+ local->loc2.path,
+ uuid_utoa (local->loc2.inode->gfid),
+ local->dst_hashed->name,
+ local->dst_cached->name);
+ if (dht_inode_missing (op_errno))
+ local->dst_cached = NULL;
+ }
+ } else if (is_src && xattr && check_is_linkfile (inode, stbuf, xattr,
conf->link_xattr_name)) {
local->is_linkfile = _gf_true;
/* Found linkto file instead of data file, passdown ENOENT
@@ -1500,11 +1585,9 @@ dht_rename_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
local->op_errno = ENOENT;
}
- if (!local->is_linkfile &&
- gf_uuid_compare (local->lock[0].layout.parent_layout.locks[child_index]->loc.gfid,
- stbuf->ia_gfid)) {
- gf_uuid_unparse (local->lock[0].layout.parent_layout.locks[child_index]->loc.gfid,
- gfid_local);
+ if (!local->is_linkfile && (op_ret >= 0) &&
+ gf_uuid_compare (loc->gfid, stbuf->ia_gfid)) {
+ gf_uuid_unparse (loc->gfid, gfid_local);
gf_uuid_unparse (stbuf->ia_gfid, gfid_server);
gf_msg (this->name, GF_LOG_WARNING, 0,
@@ -1537,6 +1620,123 @@ fail:
return 0;
}
+int
+dht_rename_file_lock1_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ dht_local_t *local = NULL;
+ char src_gfid[GF_UUID_BUF_SIZE] = {0};
+ char dst_gfid[GF_UUID_BUF_SIZE] = {0};
+ int ret = 0;
+ loc_t *loc = NULL;
+ xlator_t *subvol = NULL;
+
+ local = frame->local;
+
+ if (op_ret < 0) {
+ uuid_utoa_r (local->loc.inode->gfid, src_gfid);
+
+ if (local->loc2.inode)
+ uuid_utoa_r (local->loc2.inode->gfid, dst_gfid);
+
+ gf_msg (this->name, GF_LOG_WARNING, op_errno,
+ DHT_MSG_INODE_LK_ERROR,
+ "protecting namespace of %s failed"
+ "rename (%s:%s:%s %s:%s:%s)",
+ local->current == &local->lock[0] ? local->loc.path
+ : local->loc2.path,
+ local->loc.path, src_gfid, local->src_hashed->name,
+ local->loc2.path, dst_gfid,
+ local->dst_hashed ? local->dst_hashed->name : NULL);
+
+ local->op_ret = -1;
+ local->op_errno = op_errno;
+ goto err;
+ }
+
+ if (local->current == &local->lock[0]) {
+ loc = &local->loc2;
+ subvol = local->dst_hashed;
+ local->current = &local->lock[1];
+ } else {
+ loc = &local->loc;
+ subvol = local->src_hashed;
+ local->current = &local->lock[0];
+ }
+
+ ret = dht_protect_namespace (frame, loc, subvol, &local->current->ns,
+ dht_rename_lock_cbk);
+ if (ret < 0) {
+ op_errno = EINVAL;
+ goto err;
+ }
+
+ return 0;
+err:
+ /* No harm in calling an extra unlock */
+ dht_rename_unlock (frame, this);
+ return 0;
+}
+
+int32_t
+dht_rename_file_protect_namespace (call_frame_t *frame, void *cookie,
+ xlator_t *this, int32_t op_ret,
+ int32_t op_errno, dict_t *xdata)
+{
+ dht_local_t *local = NULL;
+ char src_gfid[GF_UUID_BUF_SIZE] = {0};
+ char dst_gfid[GF_UUID_BUF_SIZE] = {0};
+ int ret = 0;
+ loc_t *loc = NULL;
+ xlator_t *subvol = NULL;
+
+ local = frame->local;
+
+ if (op_ret < 0) {
+ uuid_utoa_r (local->loc.inode->gfid, src_gfid);
+
+ if (local->loc2.inode)
+ uuid_utoa_r (local->loc2.inode->gfid, dst_gfid);
+
+ gf_msg (this->name, GF_LOG_WARNING, op_errno,
+ DHT_MSG_INODE_LK_ERROR,
+ "acquiring inodelk failed "
+ "rename (%s:%s:%s %s:%s:%s)",
+ local->loc.path, src_gfid, local->src_cached->name,
+ local->loc2.path, dst_gfid,
+ local->dst_cached ? local->dst_cached->name : NULL);
+
+ local->op_ret = -1;
+ local->op_errno = op_errno;
+
+ goto err;
+ }
+
+ /* Locks on src and dst needs to ordered which otherwise might cause
+ * deadlocks when rename (src, dst) and rename (dst, src) is done from
+ * two different clients
+ */
+ dht_order_rename_lock (frame, &loc, &subvol);
+
+ ret = dht_protect_namespace (frame, loc, subvol,
+ &local->current->ns,
+ dht_rename_file_lock1_cbk);
+ if (ret < 0) {
+ op_errno = EINVAL;
+ goto err;
+ }
+
+ return 0;
+
+err:
+ /* Its fine to call unlock even when no locks are acquired, as we check
+ * for lock->locked before winding a unlock call.
+ */
+ dht_rename_unlock (frame, this);
+
+ return 0;
+}
+
int32_t
dht_rename_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *xdata)
@@ -1547,8 +1747,8 @@ dht_rename_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
dict_t *xattr_req = NULL;
dht_conf_t *conf = NULL;
int i = 0;
- int count = 0;
-
+ xlator_t *subvol = NULL;
+ dht_lock_t *lock = NULL;
local = frame->local;
conf = this->private;
@@ -1561,11 +1761,13 @@ dht_rename_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
gf_msg (this->name, GF_LOG_WARNING, op_errno,
DHT_MSG_INODE_LK_ERROR,
- "acquiring inodelk failed "
+ "protecting namespace of %s failed. "
"rename (%s:%s:%s %s:%s:%s)",
- local->loc.path, src_gfid, local->src_cached->name,
+ local->current == &local->lock[0] ? local->loc.path
+ : local->loc2.path,
+ local->loc.path, src_gfid, local->src_hashed->name,
local->loc2.path, dst_gfid,
- local->dst_cached ? local->dst_cached->name : NULL);
+ local->dst_hashed ? local->dst_hashed->name : NULL);
local->op_ret = -1;
local->op_errno = op_errno;
@@ -1588,7 +1790,19 @@ dht_rename_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
goto done;
}
- count = local->call_cnt = local->lock[0].layout.parent_layout.lk_count;
+ /* dst_cached might've changed. This normally happens for two reasons:
+ * 1. rebalance migrated dst
+ * 2. Another parallel rename was done overwriting dst
+ *
+ * Doing a lookup on local->loc2 when dst exists, but is associated
+ * with a different gfid will result in an ESTALE error. So, do a fresh
+ * lookup with a new inode on dst-path and handle change of dst-cached
+ * in the cbk. Also, to identify dst-cached changes we do a lookup on
+ * "this" rather than the subvol.
+ */
+ loc_copy (&local->loc2_copy, &local->loc2);
+ inode_unref (local->loc2_copy.inode);
+ local->loc2_copy.inode = inode_new (local->loc.inode->table);
/* Why not use local->lock.locks[?].loc for lookup post lock phase
* ---------------------------------------------------------------
@@ -1608,13 +1822,26 @@ dht_rename_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
* exists with the name that the client requested with.
* */
- for (i = 0; i < count; i++) {
- STACK_WIND_COOKIE (frame, dht_rename_lookup_cbk, (void *)(long)i,
- local->lock[0].layout.parent_layout.locks[i]->xl,
- local->lock[0].layout.parent_layout.locks[i]->xl->fops->lookup,
- ((gf_uuid_compare (local->loc.gfid, \
- local->lock[0].layout.parent_layout.locks[i]->loc.gfid) == 0) ?
- &local->loc : &local->loc2), xattr_req);
+ local->call_cnt = 2;
+ for (i = 0; i < 2; i++) {
+ if (i == 0) {
+ lock = local->rename_inodelk_backward_compatible[0];
+ if (gf_uuid_compare (local->loc.gfid,
+ lock->loc.gfid) == 0)
+ subvol = lock->xl;
+ else {
+ lock = local->rename_inodelk_backward_compatible[1];
+ subvol = lock->xl;
+ }
+ } else {
+ subvol = this;
+ }
+
+ STACK_WIND_COOKIE (frame, dht_rename_lookup_cbk,
+ (void *)(long)i, subvol,
+ subvol->fops->lookup,
+ (i == 0) ? &local->loc : &local->loc2_copy,
+ xattr_req);
}
dict_unref (xattr_req);
@@ -1644,7 +1871,8 @@ dht_rename_lock (call_frame_t *frame)
if (local->dst_cached)
count++;
- lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_pointer);
+ lk_array = GF_CALLOC (count, sizeof (*lk_array),
+ gf_common_mt_pointer);
if (lk_array == NULL)
goto err;
@@ -1655,22 +1883,40 @@ dht_rename_lock (call_frame_t *frame)
goto err;
if (local->dst_cached) {
+ /* dst might be removed by the time inodelk reaches bricks,
+ * which can result in ESTALE errors. POSIX imposes no
+ * restriction for dst to be present for renames to be
+ * successful. So, we'll ignore ESTALE errors. As far as
+ * synchronization on dst goes, we'll achieve the same by
+ * holding entrylk on parent directory of dst in the namespace
+ * of basename(dst). Also, there might not be quorum in cluster
+ * xlators like EC/disperse on errno, in which case they return
+ * EIO. For eg., in a disperse (4 + 2), 3 might return success
+ * and three might return ESTALE. Disperse, having no Quorum
+ * unwinds inodelk with EIO. So, ignore EIO too.
+ */
lk_array[1] = dht_lock_new (frame->this, local->dst_cached,
&local->loc2, F_WRLCK,
DHT_FILE_MIGRATE_DOMAIN, NULL,
- FAIL_ON_ANY_ERROR);
+ IGNORE_ENOENT_ESTALE_EIO);
if (lk_array[1] == NULL)
goto err;
}
- local->lock[0].layout.parent_layout.locks = lk_array;
- local->lock[0].layout.parent_layout.lk_count = count;
+ local->rename_inodelk_backward_compatible = lk_array;
+ local->rename_inodelk_bc_count = count;
+ /* retaining inodelks for the sake of backward compatibility. Please
+ * make sure to remove this inodelk once all of 3.10, 3.12 and 3.13
+ * reach EOL. Better way of getting synchronization would be to acquire
+ * entrylks on src and dst parent directories in the namespace of
+ * basenames of src and dst
+ */
ret = dht_blocking_inodelk (frame, lk_array, count,
- dht_rename_lock_cbk);
+ dht_rename_file_protect_namespace);
if (ret < 0) {
- local->lock[0].layout.parent_layout.locks = NULL;
- local->lock[0].layout.parent_layout.lk_count = 0;
+ local->rename_inodelk_backward_compatible = NULL;
+ local->rename_inodelk_bc_count = 0;
goto err;
}
@@ -1701,6 +1947,7 @@ dht_rename (call_frame_t *frame, xlator_t *this,
dht_local_t *local = NULL;
dht_conf_t *conf = NULL;
char gfid[GF_UUID_BUF_SIZE] = {0};
+ char newgfid[GF_UUID_BUF_SIZE] = {0};
VALIDATE_OR_GOTO (frame, err);
VALIDATE_OR_GOTO (this, err);
@@ -1772,11 +2019,15 @@ dht_rename (call_frame_t *frame, xlator_t *this,
if (xdata)
local->xattr_req = dict_ref (xdata);
+ if (newloc->inode)
+ gf_uuid_unparse(newloc->inode->gfid, newgfid);
+
gf_msg (this->name, GF_LOG_INFO, 0,
DHT_MSG_RENAME_INFO,
- "renaming %s (hash=%s/cache=%s) => %s (hash=%s/cache=%s)",
- oldloc->path, src_hashed->name, src_cached->name,
- newloc->path, dst_hashed->name,
+ "renaming %s (%s) (hash=%s/cache=%s) => %s (%s) "
+ "(hash=%s/cache=%s) ",
+ oldloc->path, gfid, src_hashed->name, src_cached->name,
+ newloc->path, newloc->inode ? newgfid : NULL, dst_hashed->name,
dst_cached ? dst_cached->name : "<nul>");
if (IA_ISDIR (oldloc->inode->ia_type)) {
@@ -1784,8 +2035,10 @@ dht_rename (call_frame_t *frame, xlator_t *this,
} else {
local->op_ret = 0;
ret = dht_rename_lock (frame);
- if (ret < 0)
+ if (ret < 0) {
+ op_errno = ENOMEM;
goto err;
+ }
}
return 0;