diff options
Diffstat (limited to 'geo-replication')
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 3 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 85 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 6 | 
3 files changed, 68 insertions, 26 deletions
| diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 8e4c43046b0..ef79f02a52c 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -244,7 +244,6 @@ class TarSSHEngine(object):      """      def a_syncdata(self, files): -        self.unlinked_gfids = []          logging.debug('files: %s' % (files))          self.current_files_skipped_count = 0          del self.skipped_gfid_list[:] @@ -283,7 +282,6 @@ class RsyncEngine(object):      """Sync engine that uses rsync(1) for data transfers"""      def a_syncdata(self, files): -        self.unlinked_gfids = []          logging.debug('files: %s' % (files))          self.current_files_skipped_count = 0          del self.skipped_gfid_list[:] @@ -930,6 +928,7 @@ class GMasterChangelogMixin(GMasterCommon):      def process(self, changes, done=1):          tries = 0          retry = False +        self.unlinked_gfids = []          while True:              self.skipped_gfid_list = [] diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 6bf1ad03e70..2a04d632091 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -597,11 +597,9 @@ class Server(object):              # to be purged is the GFID gotten from the changelog.              # (a stat(changelog_gfid) would also be valid here)              # The race here is between the GFID check and the purge. -            disk_gfid = cls.gfid_mnt(entry) -            if isinstance(disk_gfid, int): -                return -            if not gfid == disk_gfid: +            if not matching_disk_gfid(gfid, entry):                  return +              er = errno_wrap(os.unlink, [entry], [ENOENT, EISDIR])              if isinstance(er, int):                  if er == EISDIR: @@ -624,6 +622,48 @@ class Server(object):                  failures.append((e, cmd_ret))          failures = [] + +        def matching_disk_gfid(gfid, entry): +            disk_gfid = cls.gfid_mnt(entry) +            if isinstance(disk_gfid, int): +                return False + +            if not gfid == disk_gfid: +                return False + +            return True + +        def recursive_rmdir(gfid, entry, path): +            """disk_gfid check added for original path for which +            recursive_delete is called. This disk gfid check executed +            before every Unlink/Rmdir. If disk gfid is not matching +            with GFID from Changelog, that means other worker +            deleted the directory. Even if the subdir/file present, +            it belongs to different parent. Exit without performing +            further deletes. +            """ +            if not matching_disk_gfid(gfid, entry): +                return + +            names = [] +            names = errno_wrap(os.listdir, [path], [ENOENT]) +            if isinstance(names, int): +                return + +            for name in names: +                fullname = os.path.join(path, name) +                if not matching_disk_gfid(gfid, entry): +                    return +                er = errno_wrap(os.remove, [fullname], [ENOENT, EISDIR]) + +                if er == EISDIR: +                    recursive_rmdir(gfid, entry, fullname) + +            if not matching_disk_gfid(gfid, entry): +                return + +            errno_wrap(os.rmdir, [path], [ENOENT]) +          for e in entries:              blob = None              op = e['op'] @@ -631,23 +671,26 @@ class Server(object):              entry = e['entry']              (pg, bname) = entry2pb(entry)              if op in ['RMDIR', 'UNLINK']: -                while True: -                    er = entry_purge(entry, gfid) -                    if isinstance(er, int): -                        if er == ENOTEMPTY and op == 'RMDIR': -                            er1 = errno_wrap(shutil.rmtree, -                                             [os.path.join(pg, bname)], -                                             [ENOENT]) -                            if not isinstance(er1, int): -                                logging.info("Removed %s/%s recursively" % -                                             (pg, bname)) -                                break - +                # Try once, if rmdir failed with ENOTEMPTY +                # then delete recursively. +                er = entry_purge(entry, gfid) +                if isinstance(er, int): +                    if er == ENOTEMPTY and op == 'RMDIR': +                        # Retry if ENOTEMPTY, ESTALE +                        er1 = errno_wrap(recursive_rmdir, +                                         [gfid, entry, +                                          os.path.join(pg, bname)], +                                         [], [ENOTEMPTY, ESTALE, ENODATA]) +                        if not isinstance(er1, int): +                            logging.debug("Removed %s => %s/%s recursively" % +                                          (gfid, pg, bname)) +                        else: +                            logging.warn("Recursive remove %s => %s/%s" +                                         "failed: %s" % (gfid, pg, bname, +                                                         os.strerror(er1))) +                    else:                          logging.warn("Failed to remove %s => %s/%s. %s" %                                       (gfid, pg, bname, os.strerror(er))) -                        time.sleep(1) -                    else: -                        break              elif op in ['CREATE', 'MKNOD']:                  blob = entry_pack_reg(                      gfid, bname, e['mode'], e['uid'], e['gid']) @@ -682,8 +725,8 @@ class Server(object):              if blob:                  cmd_ret = errno_wrap(Xattr.lsetxattr,                                       [pg, 'glusterfs.gfid.newfile', blob], -                                     [EEXIST], -                                     [ENOENT, ESTALE, EINVAL]) +                                     [EEXIST, ENOENT], +                                     [ESTALE, EINVAL])                  collect_failure(e, cmd_ret)          return failures diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 2614c828104..b565ec66cb5 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -466,9 +466,8 @@ def selfkill(sig=SIGTERM):      os.kill(os.getpid(), sig) -def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]): +def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]):      """ wrapper around calls resilient to errnos. -    retry in case of ESTALE by default.      """      nr_tries = 0      while True: @@ -483,7 +482,8 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):              nr_tries += 1              if nr_tries == GF_OP_RETRIES:                  # probably a screwed state, cannot do much... -                logging.warn('reached maximum retries (%s)...' % repr(arg)) +                logging.warn('reached maximum retries (%s)...%s' % +                             (repr(arg), ex))                  return ex.errno              time.sleep(0.250)  # retry the call | 
