diff options
| author | Kotresh HR <khiremat@redhat.com> | 2017-06-19 13:40:35 -0400 | 
|---|---|---|
| committer | Aravinda VK <avishwan@redhat.com> | 2017-07-21 10:38:12 +0000 | 
| commit | 88af8d7ac515fcde1b8dc096afe6dbe7ab40d6ea (patch) | |
| tree | 9ba560f2f2c56e4cf5fbaad7846079640c2bf0d9 | |
| parent | 890ae2a1b2ce72d22657e7463405e59bee1e298a (diff) | |
geo-rep: Handle possible entry failures gracefully
Updates: #246
Change-Id: If0ce83fe8dd3068bfb671f398b2e82ac831288d0
Signed-off-by: Kotresh HR <khiremat@redhat.com>
Reviewed-on: https://review.gluster.org/17577
Smoke: Gluster Build System <jenkins@build.gluster.org>
CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
Reviewed-by: Aravinda VK <avishwan@redhat.com>
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 93 |
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 27 |
2 files changed, 107 insertions, 13 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 372717e36f2..9a53189348e 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -691,6 +691,8 @@ class GMasterChangelogMixin(GMasterCommon):      TYPE_GFID = "D "      TYPE_ENTRY = "E " +    MAX_EF_RETRIES = 15 +      # flat directory hierarchy for gfid based access      FLAT_DIR_HIERARCHY = '.' @@ -785,6 +787,95 @@ class GMasterChangelogMixin(GMasterCommon):          self.status.inc_value("failures", num_failures) +    def fix_possible_entry_failures(self, failures, retry_count): +        pfx = gauxpfx() +        fix_entry_ops = [] +        failures1 = [] +        for failure in failures: +            if failure[2]['gfid_mismatch']: +                slave_gfid = failure[2]['slave_gfid'] +                st = lstat(os.path.join(pfx, slave_gfid)) +                if isinstance(st, int) and st == ENOENT: +                    logging.info ("Fixing gfid mismatch [%s]: Deleting %s" +                                  % (retry_count, repr(failure))) +                    #Add deletion to fix_entry_ops list +                    pbname = failure[0]['entry'] +                    if failure[2]['slave_isdir']: +                        fix_entry_ops.append(edct('RMDIR', +                                                  gfid=failure[2]['slave_gfid'], +                                                  entry=pbname)) +                    else: +                        fix_entry_ops.append(edct('UNLINK', +                                                  gfid=failure[2]['slave_gfid'], +                                                  entry=pbname)) +                elif not isinstance(st, int): +                    #The file exists on master but with different name. +                    #Probabaly renamed and got missed during xsync crawl. 
+                    if failure[2]['slave_isdir']: +                        logging.info ("Fixing gfid mismatch [%s]: %s" +                                      % (retry_count, repr(failure))) +                        realpath = os.readlink(os.path.join(gconf.local_path, +                                                            ".glusterfs", +                                                            slave_gfid[0:2], +                                                            slave_gfid[2:4], +                                                            slave_gfid)) +                        dst_entry = os.path.join(pfx, realpath.split('/')[-2], +                                                 realpath.split('/')[-1]) +                        rename_dict = edct('RENAME', gfid=slave_gfid, +                                           entry=failure[0]['entry'], +                                           entry1=dst_entry, stat=st, +                                           link=None) +                        logging.info ("Fixing gfid mismatch [%s]: Renaming %s" +                                      % (retry_count, repr(rename_dict))) +                        fix_entry_ops.append(rename_dict) +                    else: +                        logging.info ("Fixing gfid mismatch [%s]: Deleting %s" +                                      % (retry_count, repr(failure))) +                        pbname = failure[0]['entry'] +                        fix_entry_ops.append(edct('UNLINK', +                                                  gfid=failure[2]['slave_gfid'], +                                                  entry=pbname)) +                        logging.error ("GFID MISMATCH: ENTRY CANNOT BE FIXED: " +                                       "gfid: %s" % slave_gfid) + +        if fix_entry_ops: +            #Process deletions of entries whose gfids are mismatched +            failures1 = self.slave.server.entry_ops(fix_entry_ops) +            if not failures1: 
+                logging.info ("Sucessfully fixed entry ops with gfid mismatch") + +        return failures1 + +    def handle_entry_failures(self, failures, entries): +        retries = 0 +        pending_failures = False +        failures1 = [] +        failures2 = [] + +        if failures: +            pending_failures = True +            failures1 = failures + +            while pending_failures and retries < self.MAX_EF_RETRIES: +                retries += 1 +                failures2 = self.fix_possible_entry_failures(failures1, +                                                             retries) +                if not failures2: +                    pending_failures = False +                else: +                    pending_failures = True +                    failures1 = failures2 + +            if pending_failures: +                for failure in failures1: +                    logging.error("Failed to fix entry ops %s", repr(failure)) +            else: +                #Retry original entry list 5 times +                failures = self.slave.server.entry_ops(entries) + +            self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') + +      def process_change(self, change, done, retry):          pfx = gauxpfx()          clist = [] @@ -997,7 +1088,7 @@ class GMasterChangelogMixin(GMasterCommon):              self.status.inc_value("entry", len(entries))              failures = self.slave.server.entry_ops(entries) -            self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') +            self.handle_entry_failures(failures, entries)              self.status.dec_value("entry", len(entries))              # Update Entry stime in Brick Root only in case of Changelog mode diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 37f6e1cabc1..275e9fd29ab 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -642,13 +642,14 @@ class Server(object):               
                  st['uid'], st['gid'],                                 gf, st['mode'], bn, lnk) -        def entry_purge(op, entry, gfid): +        def entry_purge(op, entry, gfid, e):              # This is an extremely racy code and needs to be fixed ASAP.              # The GFID check here is to be sure that the pargfid/bname              # to be purged is the GFID gotten from the changelog.              # (a stat(changelog_gfid) would also be valid here)              # The race here is between the GFID check and the purge.              if not matching_disk_gfid(gfid, entry): +                collect_failure(e, EEXIST)                  return              if op == 'UNLINK': @@ -661,6 +662,8 @@ class Server(object):                      return er          def collect_failure(e, cmd_ret): +            slv_entry_info = {} +            slv_entry_info['gfid_mismatch'] = False              # We do this for failing fops on Slave              # Master should be logging this              if cmd_ret is None: @@ -669,11 +672,19 @@ class Server(object):              if cmd_ret == EEXIST:                  disk_gfid = cls.gfid_mnt(e['entry'])                  if isinstance(disk_gfid, basestring) and e['gfid'] != disk_gfid: -                    failures.append((e, cmd_ret, disk_gfid)) +                    slv_entry_info['gfid_mismatch'] = True +                    st = lstat(e['entry']) +                    if not isinstance(st, int): +                        if st and stat.S_ISDIR(st.st_mode): +                            slv_entry_info['slave_isdir'] = True +                        else: +                            slv_entry_info['slave_isdir'] = False +                    slv_entry_info['slave_gfid'] = disk_gfid +                    failures.append((e, cmd_ret, slv_entry_info))                  else:                      return False              else: -                failures.append((e, cmd_ret)) +                failures.append((e, cmd_ret, slv_entry_info))             
 return True @@ -756,7 +767,7 @@ class Server(object):              if op in ['RMDIR', 'UNLINK']:                  # Try once, if rmdir failed with ENOTEMPTY                  # then delete recursively. -                er = entry_purge(op, entry, gfid) +                er = entry_purge(op, entry, gfid, e)                  if isinstance(er, int):                      if er == ENOTEMPTY and op == 'RMDIR':                          # Retry if ENOTEMPTY, ESTALE @@ -855,14 +866,6 @@ class Server(object):                                       [ESTALE, EINVAL, EBUSY])                  failed = collect_failure(e, cmd_ret) -                # If directory creation is failed, return immediately before -                # further processing. Allowing it to further process will -                # cause the entire directory tree to fail syncing to slave. -                # Hence master will log and raise exception if it's -                # directory failure. -                if failed and op == 'MKDIR': -                    return failures -                  # If UID/GID is different than zero that means we are trying                  # create Entry with different UID/GID. Create Entry with                  # UID:0 and GID:0, and then call chown to set UID/GID  | 
