summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2015-04-12 17:46:45 +0530
committerVijay Bellur <vbellur@redhat.com>2015-05-05 06:45:01 -0700
commit08107796c89f5f201b24d689ab6757237c743c0d (patch)
tree4fe4af7979e6734e9b2246fd4b2582570e8592f0
parentcfa6c85334fd62175aa114d779873b6790d6db8a (diff)
geo-rep: Minimize rm -rf race in Geo-rep
While doing RMDIR worker gets ENOTEMPTY because same directory will have files from other bricks which are not deleted since that worker is slow processing. So geo-rep does recursive_delete. Recursive delete was done using shutil.rmtree. once started, it will not check disk_gfid in between. So it ends up deleting the new files created by other workers. Also if other worker creates files after one worker gets list of files to be deleted, then first worker will again get ENOTEMPTY again. To fix these races, retry is added when it gets ENOTEMPTY/ESTALE/ENODATA. And disk_gfid check added for original path for which recursive_delete is called. This disk gfid check executed before every Unlink/Rmdir. If disk gfid is not matching with GFID from Changelog, that means other worker deleted the directory. Even if the subdir/file present, it belongs to different parent. Exit without performing further deletes. Retry on ENOENT during create is ignored, since if CREATE/MKNOD/MKDIR failed with ENOENT will not succeed unless parent directory is created again. Rsync errors handling was handling unlinked_gfids_list only for one Changelog, but when processed in batch it fails to detect unlinked_gfids and retries again. Finally skips the entire Changelogs in that batch. Fixed this issue by moving self.unlinked_gfids reset logic before batch start and after batch end. Most of the Geo-rep races with rm -rf is eliminated with this patch, but in some cases stale directories left in some bricks and in mount point we get ENOTEMPTY.(DHT issue, Error will be logged in Slave log) BUG: 1211037 Change-Id: I8716b88e4c741545f526095bf789f7c1e28008cb Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/10204 Reviewed-by: Kotresh HR <khiremat@redhat.com> Tested-by: Gluster Build System <jenkins@build.gluster.com> Tested-by: NetBSD Build System Reviewed-by: Vijay Bellur <vbellur@redhat.com>
-rw-r--r--geo-replication/syncdaemon/master.py3
-rw-r--r--geo-replication/syncdaemon/resource.py85
-rw-r--r--geo-replication/syncdaemon/syncdutils.py6
3 files changed, 68 insertions, 26 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 8e4c43046b0..ef79f02a52c 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -244,7 +244,6 @@ class TarSSHEngine(object):
"""
def a_syncdata(self, files):
- self.unlinked_gfids = []
logging.debug('files: %s' % (files))
self.current_files_skipped_count = 0
del self.skipped_gfid_list[:]
@@ -283,7 +282,6 @@ class RsyncEngine(object):
"""Sync engine that uses rsync(1) for data transfers"""
def a_syncdata(self, files):
- self.unlinked_gfids = []
logging.debug('files: %s' % (files))
self.current_files_skipped_count = 0
del self.skipped_gfid_list[:]
@@ -930,6 +928,7 @@ class GMasterChangelogMixin(GMasterCommon):
def process(self, changes, done=1):
tries = 0
retry = False
+ self.unlinked_gfids = []
while True:
self.skipped_gfid_list = []
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 6bf1ad03e70..2a04d632091 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -597,11 +597,9 @@ class Server(object):
# to be purged is the GFID gotten from the changelog.
# (a stat(changelog_gfid) would also be valid here)
# The race here is between the GFID check and the purge.
- disk_gfid = cls.gfid_mnt(entry)
- if isinstance(disk_gfid, int):
- return
- if not gfid == disk_gfid:
+ if not matching_disk_gfid(gfid, entry):
return
+
er = errno_wrap(os.unlink, [entry], [ENOENT, EISDIR])
if isinstance(er, int):
if er == EISDIR:
@@ -624,6 +622,48 @@ class Server(object):
failures.append((e, cmd_ret))
failures = []
+
+ def matching_disk_gfid(gfid, entry):
+ disk_gfid = cls.gfid_mnt(entry)
+ if isinstance(disk_gfid, int):
+ return False
+
+ if not gfid == disk_gfid:
+ return False
+
+ return True
+
+ def recursive_rmdir(gfid, entry, path):
+ """disk_gfid check added for original path for which
+ recursive_delete is called. This disk gfid check executed
+ before every Unlink/Rmdir. If disk gfid is not matching
+ with GFID from Changelog, that means other worker
+ deleted the directory. Even if the subdir/file present,
+ it belongs to different parent. Exit without performing
+ further deletes.
+ """
+ if not matching_disk_gfid(gfid, entry):
+ return
+
+ names = []
+ names = errno_wrap(os.listdir, [path], [ENOENT])
+ if isinstance(names, int):
+ return
+
+ for name in names:
+ fullname = os.path.join(path, name)
+ if not matching_disk_gfid(gfid, entry):
+ return
+ er = errno_wrap(os.remove, [fullname], [ENOENT, EISDIR])
+
+ if er == EISDIR:
+ recursive_rmdir(gfid, entry, fullname)
+
+ if not matching_disk_gfid(gfid, entry):
+ return
+
+ errno_wrap(os.rmdir, [path], [ENOENT])
+
for e in entries:
blob = None
op = e['op']
@@ -631,23 +671,26 @@ class Server(object):
entry = e['entry']
(pg, bname) = entry2pb(entry)
if op in ['RMDIR', 'UNLINK']:
- while True:
- er = entry_purge(entry, gfid)
- if isinstance(er, int):
- if er == ENOTEMPTY and op == 'RMDIR':
- er1 = errno_wrap(shutil.rmtree,
- [os.path.join(pg, bname)],
- [ENOENT])
- if not isinstance(er1, int):
- logging.info("Removed %s/%s recursively" %
- (pg, bname))
- break
-
+ # Try once, if rmdir failed with ENOTEMPTY
+ # then delete recursively.
+ er = entry_purge(entry, gfid)
+ if isinstance(er, int):
+ if er == ENOTEMPTY and op == 'RMDIR':
+ # Retry if ENOTEMPTY, ESTALE
+ er1 = errno_wrap(recursive_rmdir,
+ [gfid, entry,
+ os.path.join(pg, bname)],
+ [], [ENOTEMPTY, ESTALE, ENODATA])
+ if not isinstance(er1, int):
+ logging.debug("Removed %s => %s/%s recursively" %
+ (gfid, pg, bname))
+ else:
+ logging.warn("Recursive remove %s => %s/%s"
+ "failed: %s" % (gfid, pg, bname,
+ os.strerror(er1)))
+ else:
logging.warn("Failed to remove %s => %s/%s. %s" %
(gfid, pg, bname, os.strerror(er)))
- time.sleep(1)
- else:
- break
elif op in ['CREATE', 'MKNOD']:
blob = entry_pack_reg(
gfid, bname, e['mode'], e['uid'], e['gid'])
@@ -682,8 +725,8 @@ class Server(object):
if blob:
cmd_ret = errno_wrap(Xattr.lsetxattr,
[pg, 'glusterfs.gfid.newfile', blob],
- [EEXIST],
- [ENOENT, ESTALE, EINVAL])
+ [EEXIST, ENOENT],
+ [ESTALE, EINVAL])
collect_failure(e, cmd_ret)
return failures
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 2614c828104..b565ec66cb5 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -466,9 +466,8 @@ def selfkill(sig=SIGTERM):
os.kill(os.getpid(), sig)
-def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):
+def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]):
""" wrapper around calls resilient to errnos.
- retry in case of ESTALE by default.
"""
nr_tries = 0
while True:
@@ -483,7 +482,8 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):
nr_tries += 1
if nr_tries == GF_OP_RETRIES:
# probably a screwed state, cannot do much...
- logging.warn('reached maximum retries (%s)...' % repr(arg))
+ logging.warn('reached maximum retries (%s)...%s' %
+ (repr(arg), ex))
return ex.errno
time.sleep(0.250) # retry the call