From 08107796c89f5f201b24d689ab6757237c743c0d Mon Sep 17 00:00:00 2001 From: Aravinda VK Date: Sun, 12 Apr 2015 17:46:45 +0530 Subject: geo-rep: Minimize rm -rf race in Geo-rep While doing RMDIR worker gets ENOTEMPTY because same directory will have files from other bricks which are not deleted since that worker is slow processing. So geo-rep does recursive_delete. Recursive delete was done using shutil.rmtree. once started, it will not check disk_gfid in between. So it ends up deleting the new files created by other workers. Also if other worker creates files after one worker gets list of files to be deleted, then first worker will again get ENOTEMPTY again. To fix these races, retry is added when it gets ENOTEMPTY/ESTALE/ENODATA. And disk_gfid check added for original path for which recursive_delete is called. This disk gfid check executed before every Unlink/Rmdir. If disk gfid is not matching with GFID from Changelog, that means other worker deleted the directory. Even if the subdir/file present, it belongs to different parent. Exit without performing further deletes. Retry on ENOENT during create is ignored, since if CREATE/MKNOD/MKDIR failed with ENOENT will not succeed unless parent directory is created again. Rsync errors handling was handling unlinked_gfids_list only for one Changelog, but when processed in batch it fails to detect unlinked_gfids and retries again. Finally skips the entire Changelogs in that batch. Fixed this issue by moving self.unlinked_gfids reset logic before batch start and after batch end. Most of the Geo-rep races with rm -rf is eliminated with this patch, but in some cases stale directories left in some bricks and in mount point we get ENOTEMPTY.(DHT issue, Error will be logged in Slave log) BUG: 1211037 Change-Id: I8716b88e4c741545f526095bf789f7c1e28008cb Signed-off-by: Aravinda VK Reviewed-on: http://review.gluster.org/10204 Reviewed-by: Kotresh HR Tested-by: Gluster Build System Tested-by: NetBSD Build System Reviewed-by: Vijay Bellur --- geo-replication/syncdaemon/master.py | 3 +- geo-replication/syncdaemon/resource.py | 85 ++++++++++++++++++++++++-------- geo-replication/syncdaemon/syncdutils.py | 6 +-- 3 files changed, 68 insertions(+), 26 deletions(-) (limited to 'geo-replication') diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 8e4c43046b0..ef79f02a52c 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -244,7 +244,6 @@ class TarSSHEngine(object): """ def a_syncdata(self, files): - self.unlinked_gfids = [] logging.debug('files: %s' % (files)) self.current_files_skipped_count = 0 del self.skipped_gfid_list[:] @@ -283,7 +282,6 @@ class RsyncEngine(object): """Sync engine that uses rsync(1) for data transfers""" def a_syncdata(self, files): - self.unlinked_gfids = [] logging.debug('files: %s' % (files)) self.current_files_skipped_count = 0 del self.skipped_gfid_list[:] @@ -930,6 +928,7 @@ class GMasterChangelogMixin(GMasterCommon): def process(self, changes, done=1): tries = 0 retry = False + self.unlinked_gfids = [] while True: self.skipped_gfid_list = [] diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 6bf1ad03e70..2a04d632091 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -597,11 +597,9 @@ class Server(object): # to be purged is the GFID gotten from the changelog. # (a stat(changelog_gfid) would also be valid here) # The race here is between the GFID check and the purge. - disk_gfid = cls.gfid_mnt(entry) - if isinstance(disk_gfid, int): - return - if not gfid == disk_gfid: + if not matching_disk_gfid(gfid, entry): return + er = errno_wrap(os.unlink, [entry], [ENOENT, EISDIR]) if isinstance(er, int): if er == EISDIR: @@ -624,6 +622,48 @@ class Server(object): failures.append((e, cmd_ret)) failures = [] + + def matching_disk_gfid(gfid, entry): + disk_gfid = cls.gfid_mnt(entry) + if isinstance(disk_gfid, int): + return False + + if not gfid == disk_gfid: + return False + + return True + + def recursive_rmdir(gfid, entry, path): + """disk_gfid check added for original path for which + recursive_delete is called. This disk gfid check executed + before every Unlink/Rmdir. If disk gfid is not matching + with GFID from Changelog, that means other worker + deleted the directory. Even if the subdir/file present, + it belongs to different parent. Exit without performing + further deletes. + """ + if not matching_disk_gfid(gfid, entry): + return + + names = [] + names = errno_wrap(os.listdir, [path], [ENOENT]) + if isinstance(names, int): + return + + for name in names: + fullname = os.path.join(path, name) + if not matching_disk_gfid(gfid, entry): + return + er = errno_wrap(os.remove, [fullname], [ENOENT, EISDIR]) + + if er == EISDIR: + recursive_rmdir(gfid, entry, fullname) + + if not matching_disk_gfid(gfid, entry): + return + + errno_wrap(os.rmdir, [path], [ENOENT]) + for e in entries: blob = None op = e['op'] @@ -631,23 +671,26 @@ class Server(object): entry = e['entry'] (pg, bname) = entry2pb(entry) if op in ['RMDIR', 'UNLINK']: - while True: - er = entry_purge(entry, gfid) - if isinstance(er, int): - if er == ENOTEMPTY and op == 'RMDIR': - er1 = errno_wrap(shutil.rmtree, - [os.path.join(pg, bname)], - [ENOENT]) - if not isinstance(er1, int): - logging.info("Removed %s/%s recursively" % - (pg, bname)) - break - + # Try once, if rmdir failed with ENOTEMPTY + # then delete recursively. + er = entry_purge(entry, gfid) + if isinstance(er, int): + if er == ENOTEMPTY and op == 'RMDIR': + # Retry if ENOTEMPTY, ESTALE + er1 = errno_wrap(recursive_rmdir, + [gfid, entry, + os.path.join(pg, bname)], + [], [ENOTEMPTY, ESTALE, ENODATA]) + if not isinstance(er1, int): + logging.debug("Removed %s => %s/%s recursively" % + (gfid, pg, bname)) + else: + logging.warn("Recursive remove %s => %s/%s" + "failed: %s" % (gfid, pg, bname, + os.strerror(er1))) + else: logging.warn("Failed to remove %s => %s/%s. %s" % (gfid, pg, bname, os.strerror(er))) - time.sleep(1) - else: - break elif op in ['CREATE', 'MKNOD']: blob = entry_pack_reg( gfid, bname, e['mode'], e['uid'], e['gid']) @@ -682,8 +725,8 @@ class Server(object): if blob: cmd_ret = errno_wrap(Xattr.lsetxattr, [pg, 'glusterfs.gfid.newfile', blob], - [EEXIST], - [ENOENT, ESTALE, EINVAL]) + [EEXIST, ENOENT], + [ESTALE, EINVAL]) collect_failure(e, cmd_ret) return failures diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 2614c828104..b565ec66cb5 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -466,9 +466,8 @@ def selfkill(sig=SIGTERM): os.kill(os.getpid(), sig) -def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]): +def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]): """ wrapper around calls resilient to errnos. - retry in case of ESTALE by default. """ nr_tries = 0 while True: @@ -483,7 +482,8 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]): nr_tries += 1 if nr_tries == GF_OP_RETRIES: # probably a screwed state, cannot do much... - logging.warn('reached maximum retries (%s)...' % repr(arg)) + logging.warn('reached maximum retries (%s)...%s' % + (repr(arg), ex)) return ex.errno time.sleep(0.250) # retry the call -- cgit