-rw-r--r--  geo-replication/syncdaemon/master.py      3
-rw-r--r--  geo-replication/syncdaemon/resource.py   85
-rw-r--r--  geo-replication/syncdaemon/syncdutils.py  6
3 files changed, 68 insertions, 26 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 8e4c43046b0..ef79f02a52c 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -244,7 +244,6 @@ class TarSSHEngine(object):
"""
def a_syncdata(self, files):
- self.unlinked_gfids = []
logging.debug('files: %s' % (files))
self.current_files_skipped_count = 0
del self.skipped_gfid_list[:]
@@ -283,7 +282,6 @@ class RsyncEngine(object):
"""Sync engine that uses rsync(1) for data transfers"""
def a_syncdata(self, files):
- self.unlinked_gfids = []
logging.debug('files: %s' % (files))
self.current_files_skipped_count = 0
del self.skipped_gfid_list[:]
@@ -930,6 +928,7 @@ class GMasterChangelogMixin(GMasterCommon):
def process(self, changes, done=1):
tries = 0
retry = False
+ self.unlinked_gfids = []
while True:
self.skipped_gfid_list = []
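
Note: the master.py hunks above move the reset of unlinked_gfids out of
a_syncdata() (which runs on every sync invocation) and into process()
(which runs once per changelog batch), so GFIDs recorded as unlinked while
a batch is processed survive into the data-sync step instead of being
wiped on each sync call. A minimal, hypothetical sketch of that pattern
(BatchSyncer and its methods are illustrative stand-ins, not the geo-rep
engine classes):

# Hypothetical illustration; not the geo-rep TarSSHEngine/RsyncEngine code.
class BatchSyncer(object):
    def __init__(self):
        # GFIDs unlinked while the batch was queued; failures to sync
        # these files can safely be ignored.
        self.unlinked_gfids = []

    def a_syncdata(self, files):
        # Deliberately no reset here: clearing the list on every call
        # would drop GFIDs collected earlier in the same batch.
        return [f for f in files if f not in self.unlinked_gfids]

    def process(self, changes):
        # Reset once per changelog batch, as the hunk above does.
        self.unlinked_gfids = []
        synced = []
        for unlinked, files in changes:
            self.unlinked_gfids.extend(unlinked)
            synced.extend(self.a_syncdata(files))
        return synced

# A file unlinked by an earlier change in the batch is skipped later:
# BatchSyncer().process([(['g1'], ['g0']), ([], ['g1', 'g2'])]) == ['g0', 'g2']
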
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 6bf1ad03e70..2a04d632091 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -597,11 +597,9 @@ class Server(object):
# to be purged is the GFID gotten from the changelog.
# (a stat(changelog_gfid) would also be valid here)
# The race here is between the GFID check and the purge.
- disk_gfid = cls.gfid_mnt(entry)
- if isinstance(disk_gfid, int):
- return
- if not gfid == disk_gfid:
+ if not matching_disk_gfid(gfid, entry):
return
+
er = errno_wrap(os.unlink, [entry], [ENOENT, EISDIR])
if isinstance(er, int):
if er == EISDIR:
@@ -624,6 +622,48 @@ class Server(object):
failures.append((e, cmd_ret))
failures = []
+
+ def matching_disk_gfid(gfid, entry):
+ disk_gfid = cls.gfid_mnt(entry)
+ if isinstance(disk_gfid, int):
+ return False
+
+ if not gfid == disk_gfid:
+ return False
+
+ return True
+
+ def recursive_rmdir(gfid, entry, path):
+        """The disk gfid of the original path passed to
+        recursive_rmdir is checked before every unlink/rmdir.
+        If the on-disk gfid no longer matches the GFID from the
+        changelog, another worker has already deleted the
+        directory; any subdirectory or file still present
+        belongs to a different parent. Exit without performing
+        further deletes.
+        """
+ if not matching_disk_gfid(gfid, entry):
+ return
+
+        # list children; tolerate ENOENT if the directory is already gone
+ names = errno_wrap(os.listdir, [path], [ENOENT])
+ if isinstance(names, int):
+ return
+
+ for name in names:
+ fullname = os.path.join(path, name)
+ if not matching_disk_gfid(gfid, entry):
+ return
+ er = errno_wrap(os.remove, [fullname], [ENOENT, EISDIR])
+
+ if er == EISDIR:
+ recursive_rmdir(gfid, entry, fullname)
+
+ if not matching_disk_gfid(gfid, entry):
+ return
+
+ errno_wrap(os.rmdir, [path], [ENOENT])
+
for e in entries:
blob = None
op = e['op']
@@ -631,23 +671,26 @@ class Server(object):
entry = e['entry']
(pg, bname) = entry2pb(entry)
if op in ['RMDIR', 'UNLINK']:
- while True:
- er = entry_purge(entry, gfid)
- if isinstance(er, int):
- if er == ENOTEMPTY and op == 'RMDIR':
- er1 = errno_wrap(shutil.rmtree,
- [os.path.join(pg, bname)],
- [ENOENT])
- if not isinstance(er1, int):
- logging.info("Removed %s/%s recursively" %
- (pg, bname))
- break
-
+                    # Try a plain rmdir once; if it fails with
+                    # ENOTEMPTY, delete the tree recursively.
+ er = entry_purge(entry, gfid)
+ if isinstance(er, int):
+ if er == ENOTEMPTY and op == 'RMDIR':
+                            # Retry on ENOTEMPTY, ESTALE, ENODATA
+ er1 = errno_wrap(recursive_rmdir,
+ [gfid, entry,
+ os.path.join(pg, bname)],
+ [], [ENOTEMPTY, ESTALE, ENODATA])
+ if not isinstance(er1, int):
+ logging.debug("Removed %s => %s/%s recursively" %
+ (gfid, pg, bname))
+ else:
+                            logging.warn("Recursive remove %s => %s/%s "
+                                         "failed: %s" % (gfid, pg, bname,
+                                                         os.strerror(er1)))
+ else:
logging.warn("Failed to remove %s => %s/%s. %s" %
(gfid, pg, bname, os.strerror(er)))
- time.sleep(1)
- else:
- break
elif op in ['CREATE', 'MKNOD']:
blob = entry_pack_reg(
gfid, bname, e['mode'], e['uid'], e['gid'])
@@ -682,8 +725,8 @@ class Server(object):
if blob:
cmd_ret = errno_wrap(Xattr.lsetxattr,
[pg, 'glusterfs.gfid.newfile', blob],
- [EEXIST],
- [ENOENT, ESTALE, EINVAL])
+ [EEXIST, ENOENT],
+ [ESTALE, EINVAL])
collect_failure(e, cmd_ret)
return failures
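
Note: recursive_rmdir above re-validates the on-disk GFID of the target
before every unlink/rmdir so it never deletes a tree that another worker
has already removed (and that may since have been recreated under a
different parent). A simplified standalone sketch of the same pattern,
with a caller-supplied still_ours() predicate standing in for the
matching_disk_gfid() check and plain OSError handling in place of
errno_wrap (Linux semantics assumed, where unlinking a directory fails
with EISDIR):

import errno
import os


def recursive_rmdir(path, still_ours):
    # Abort as soon as the tree no longer belongs to the entry we were
    # asked to delete.
    if not still_ours():
        return
    try:
        names = os.listdir(path)
    except OSError as e:
        if e.errno == errno.ENOENT:
            return          # already gone
        raise
    for name in names:
        fullname = os.path.join(path, name)
        # Re-check before every delete: a concurrent worker may have
        # removed the tree in the meantime.
        if not still_ours():
            return
        try:
            os.remove(fullname)
        except OSError as e:
            if e.errno == errno.EISDIR:
                recursive_rmdir(fullname, still_ours)
            elif e.errno != errno.ENOENT:
                raise
    if not still_ours():
        return
    try:
        os.rmdir(path)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise

# Usage with a hypothetical gfid lookup:
#   recursive_rmdir(path, lambda: disk_gfid_of(entry) == changelog_gfid)
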
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 2614c828104..b565ec66cb5 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -466,9 +466,8 @@ def selfkill(sig=SIGTERM):
os.kill(os.getpid(), sig)
-def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):
+def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]):
""" wrapper around calls resilient to errnos.
- retry in case of ESTALE by default.
"""
nr_tries = 0
while True:
@@ -483,7 +482,8 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):
nr_tries += 1
if nr_tries == GF_OP_RETRIES:
# probably a screwed state, cannot do much...
- logging.warn('reached maximum retries (%s)...' % repr(arg))
+ logging.warn('reached maximum retries (%s)...%s' %
+ (repr(arg), ex))
return ex.errno
time.sleep(0.250) # retry the call
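
Note: the syncdutils.py hunk removes the implicit ESTALE retry from
errno_wrap(); callers that need retries now pass the errnos explicitly,
as resource.py does for the recursive delete. A simplified, hypothetical
reimplementation of that contract (names and the retry bound are
illustrative, not the real syncdutils values):

import time

MAX_RETRIES = 10  # illustrative bound


def errno_wrap(call, args=(), errnos=(), retry_errnos=()):
    tries = 0
    while True:
        try:
            return call(*args)
        except OSError as ex:
            if ex.errno in errnos:
                return ex.errno      # tolerated: caller inspects the errno
            if ex.errno not in retry_errnos:
                raise                # nothing is retried implicitly any more
            tries += 1
            if tries == MAX_RETRIES:
                return ex.errno      # give up and report the errno
            time.sleep(0.250)

# Explicit opt-in to retries, mirroring the resource.py call above:
#   errno_wrap(recursive_rmdir, [gfid, entry, path], [],
#              [ENOTEMPTY, ESTALE, ENODATA])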