summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/resource.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r--geo-replication/syncdaemon/resource.py85
1 files changed, 64 insertions, 21 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 6bf1ad03e70..2a04d632091 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -597,11 +597,9 @@ class Server(object):
# to be purged is the GFID gotten from the changelog.
# (a stat(changelog_gfid) would also be valid here)
# The race here is between the GFID check and the purge.
- disk_gfid = cls.gfid_mnt(entry)
- if isinstance(disk_gfid, int):
- return
- if not gfid == disk_gfid:
+ if not matching_disk_gfid(gfid, entry):
return
+
er = errno_wrap(os.unlink, [entry], [ENOENT, EISDIR])
if isinstance(er, int):
if er == EISDIR:
@@ -624,6 +622,48 @@ class Server(object):
failures.append((e, cmd_ret))
failures = []
+
+ def matching_disk_gfid(gfid, entry):
+ disk_gfid = cls.gfid_mnt(entry)
+ if isinstance(disk_gfid, int):
+ return False
+
+ if not gfid == disk_gfid:
+ return False
+
+ return True
+
+ def recursive_rmdir(gfid, entry, path):
+ """disk_gfid check added for original path for which
+ recursive_delete is called. This disk gfid check executed
+ before every Unlink/Rmdir. If disk gfid is not matching
+ with GFID from Changelog, that means other worker
+ deleted the directory. Even if the subdir/file present,
+ it belongs to different parent. Exit without performing
+ further deletes.
+ """
+ if not matching_disk_gfid(gfid, entry):
+ return
+
+ names = []
+ names = errno_wrap(os.listdir, [path], [ENOENT])
+ if isinstance(names, int):
+ return
+
+ for name in names:
+ fullname = os.path.join(path, name)
+ if not matching_disk_gfid(gfid, entry):
+ return
+ er = errno_wrap(os.remove, [fullname], [ENOENT, EISDIR])
+
+ if er == EISDIR:
+ recursive_rmdir(gfid, entry, fullname)
+
+ if not matching_disk_gfid(gfid, entry):
+ return
+
+ errno_wrap(os.rmdir, [path], [ENOENT])
+
for e in entries:
blob = None
op = e['op']
@@ -631,23 +671,26 @@ class Server(object):
entry = e['entry']
(pg, bname) = entry2pb(entry)
if op in ['RMDIR', 'UNLINK']:
- while True:
- er = entry_purge(entry, gfid)
- if isinstance(er, int):
- if er == ENOTEMPTY and op == 'RMDIR':
- er1 = errno_wrap(shutil.rmtree,
- [os.path.join(pg, bname)],
- [ENOENT])
- if not isinstance(er1, int):
- logging.info("Removed %s/%s recursively" %
- (pg, bname))
- break
-
+ # Try once, if rmdir failed with ENOTEMPTY
+ # then delete recursively.
+ er = entry_purge(entry, gfid)
+ if isinstance(er, int):
+ if er == ENOTEMPTY and op == 'RMDIR':
+ # Retry if ENOTEMPTY, ESTALE
+ er1 = errno_wrap(recursive_rmdir,
+ [gfid, entry,
+ os.path.join(pg, bname)],
+ [], [ENOTEMPTY, ESTALE, ENODATA])
+ if not isinstance(er1, int):
+ logging.debug("Removed %s => %s/%s recursively" %
+ (gfid, pg, bname))
+ else:
+ logging.warn("Recursive remove %s => %s/%s"
+ "failed: %s" % (gfid, pg, bname,
+ os.strerror(er1)))
+ else:
logging.warn("Failed to remove %s => %s/%s. %s" %
(gfid, pg, bname, os.strerror(er)))
- time.sleep(1)
- else:
- break
elif op in ['CREATE', 'MKNOD']:
blob = entry_pack_reg(
gfid, bname, e['mode'], e['uid'], e['gid'])
@@ -682,8 +725,8 @@ class Server(object):
if blob:
cmd_ret = errno_wrap(Xattr.lsetxattr,
[pg, 'glusterfs.gfid.newfile', blob],
- [EEXIST],
- [ENOENT, ESTALE, EINVAL])
+ [EEXIST, ENOENT],
+ [ESTALE, EINVAL])
collect_failure(e, cmd_ret)
return failures