| author | Aravinda VK <avishwan@redhat.com> | 2015-12-02 19:37:55 +0530 | 
|---|---|---|
| committer | Aravinda VK <avishwan@redhat.com> | 2016-03-08 01:59:17 -0800 | 
| commit | 8883c12216cc0c0770a4207e1e2a62fa16dc1528 (patch) | |
| tree | 7dca2d3793ecd2c2b36100abf2843e2737946ad8 /geo-replication/syncdaemon/master.py | |
| parent | f3b8a931b00cfd0ecee46599ed1ef1aaf236e148 (diff) | |
geo-rep: Handling Rsync/Tar errors efficiently
Geo-rep processes Changelogs in batches. If one file in a batch fails
with an rsync error, that whole Changelog file is reprocessed multiple
times, and after MAX_RETRIES all the GFIDs from that batch are logged
as Skipped.
This patch addresses the following issues (the retry flow is sketched
after the list):
1. When Rsync/Tar fails, do not parse the Changelog again for the retry.
2. When Rsync/Tar fails, do not replay Entry operations; only retry
   rsync/tar for those GFIDs.
3. Log Rsync/Tar errors only on the last retry.
4. Do not log Skipped GFIDs, since Rsync/Tar errors are now logged for
   the failed files themselves.
5. Log Entry failures as Error instead of Warning.
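A minimal sketch of the resulting retry flow, not the actual geo-rep
code: `a_syncdata`, `syncdata_wait`, `enable_errorlog`/`disable_errorlog`
and `MAX_RETRIES` mirror names in the diff below, while `parse_changelog`,
the `unlinked_gfids` parameter (a set) and the function signature are
hypothetical simplifications.

```python
def process_batch(changes, syncer, parse_changelog, syncdata_wait,
                  unlinked_gfids, max_retries=10):
    """Fire data transfers for a batch of changelogs; on failure,
    retry only the data sync, logging errors on the last try."""
    datas_in_batch = set()
    tries = 0
    retry = False
    syncer.disable_errorlog()

    while True:
        if retry:
            if tries == max_retries - 1:
                # Point 3: log rsync/tar errors only on the last retry
                syncer.enable_errorlog()
            # Files unlinked in the interim need no further retries
            datas_in_batch -= unlinked_gfids
            # Points 1+2: no changelog re-parse, no entry-op replay;
            # only the failed data transfers are fired again
            if datas_in_batch:
                syncer.a_syncdata(datas_in_batch)
        else:
            # First pass: parse each changelog once and fire transfers
            for change in changes:
                datas = parse_changelog(change)
                datas_in_batch.update(datas)
                syncer.a_syncdata(datas)

        if syncdata_wait():
            # All transfers completed; batch is done
            return True

        retry = True
        tries += 1
        if tries == max_retries:
            # Point 4: give up without a separate "SKIPPED GFID"
            # message; per-file errors were logged on the last retry
            return False
```

The key design choice is that entry/metadata replay happens only on the
first pass; retries touch nothing but the failed data transfers.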
BUG: 1313309
Change-Id: Ie134ce2572693056ab9b9008cd8aa5b5d87f7975
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/12856
Reviewed-by: Kotresh HR <khiremat@redhat.com>
Reviewed-by: Saravanakumar Arumugam <sarumuga@redhat.com>
CentOS-regression: Gluster Build System <jenkins@build.gluster.com>
Smoke: Gluster Build System <jenkins@build.gluster.com>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
Reviewed-by: Venky Shankar <vshankar@redhat.com>
(cherry picked from commit d136a789258e8f600e536717da156a242d8ed9a5)
Reviewed-on: http://review.gluster.org/13558
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 108 | 
1 file changed, 44 insertions(+), 64 deletions(-)
```diff
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index e743fdf2e50..8d2158fb406 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -245,8 +245,7 @@ class TarSSHEngine(object):
 
     def a_syncdata(self, files):
         logging.debug('files: %s' % (files))
-        self.current_files_skipped_count = 0
-        del self.skipped_gfid_list[:]
+
         for f in files:
             pb = self.syncer.add(f)
 
@@ -260,12 +259,9 @@ class TarSSHEngine(object):
                     st = lstat(se)
                     if isinstance(st, int):
                         # file got unlinked in the interim
-                        self.unlinked_gfids.append(se)
+                        self.unlinked_gfids.add(se)
                         return True
 
-                    se_list = se.split('/')
-                    self.current_files_skipped_count += 1
-                    self.skipped_gfid_list.append(se_list[1])
             self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
 
     def syncdata_wait(self):
@@ -283,8 +279,7 @@ class RsyncEngine(object):
 
     def a_syncdata(self, files):
         logging.debug('files: %s' % (files))
-        self.current_files_skipped_count = 0
-        del self.skipped_gfid_list[:]
+
         for f in files:
             logging.debug('candidate for syncing %s' % f)
             pb = self.syncer.add(f)
@@ -299,12 +294,9 @@ class RsyncEngine(object):
                     st = lstat(se)
                     if isinstance(st, int):
                         # file got unlinked in the interim
-                        self.unlinked_gfids.append(se)
+                        self.unlinked_gfids.add(se)
                         return True
 
-                    se_list = se.split('/')
-                    self.current_files_skipped_count += 1
-                    self.skipped_gfid_list.append(se_list[1])
             self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
 
     def syncdata_wait(self):
@@ -364,35 +356,6 @@ class GMasterCommon(object):
         self.make_xtime_opts(rsc == self.master, opts)
         return self.xtime_low(rsc, path, **opts)
 
-    def get_initial_crawl_data(self):
-        # while persisting only 'files_syncd' is non-zero, rest of
-        # the stats are nulls. lets keep it that way in case they
-        # are needed to be used some day...
-        default_data = {'files_syncd': 0,
-                        'files_remaining': 0,
-                        'bytes_remaining': 0,
-                        'purges_remaining': 0,
-                        'total_files_skipped': 0}
-        if getattr(gconf, 'state_detail_file', None):
-            try:
-                with open(gconf.state_detail_file, 'r+') as f:
-                    loaded_data = json.load(f)
-                    diff_data = set(default_data) - set(loaded_data)
-                    if len(diff_data):
-                        for i in diff_data:
-                            loaded_data[i] = default_data[i]
-                    return loaded_data
-            except IOError:
-                logging.warn('Creating new gconf.state_detail_file.')
-                # Create file with initial data
-                try:
-                    with open(gconf.state_detail_file, 'wb') as f:
-                        json.dump(default_data, f)
-                    return default_data
-                except:
-                    raise
-        return default_data
-
     def __init__(self, master, slave):
         self.master = master
         self.slave = slave
@@ -424,9 +387,7 @@ class GMasterCommon(object):
         self.volinfo = None
         self.terminate = False
         self.sleep_interval = 1
-        self.current_files_skipped_count = 0
-        self.skipped_gfid_list = []
-        self.unlinked_gfids = []
+        self.unlinked_gfids = set()
 
     def init_keep_alive(cls):
         """start the keep-alive thread """
@@ -819,7 +780,8 @@ class GMasterChangelogMixin(GMasterCommon):
                 st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
                 if not isinstance(st, int):
                     num_failures += 1
-                    logging.warn('%s FAILED: %s' % (log_prefix, repr(failure)))
+                    logging.error('%s FAILED: %s' % (log_prefix,
+                                                     repr(failure)))
 
             self.status.inc_value("failures", num_failures)
 
@@ -994,17 +956,17 @@ class GMasterChangelogMixin(GMasterCommon):
         # sync data
         if datas:
             self.a_syncdata(datas)
+            self.datas_in_batch.update(datas)
 
     def process(self, changes, done=1):
         tries = 0
         retry = False
-        self.unlinked_gfids = []
+        self.unlinked_gfids = set()
         self.files_in_batch = 0
+        self.datas_in_batch = set()
+        self.syncer.disable_errorlog()
 
         while True:
-            self.skipped_gfid_list = []
-            self.current_files_skipped_count = 0
-
             # first, fire all changelog transfers in parallel. entry and
             # metadata are performed synchronously, therefore in serial.
             # However at the end of each changelog, data is synchronized
@@ -1012,12 +974,25 @@ class GMasterChangelogMixin(GMasterCommon):
             # entries/metadata of that changelog but happens in parallel
             # with data of other changelogs.
 
-            for change in changes:
-                logging.debug('processing change %s' % change)
-                self.process_change(change, done, retry)
-                if not retry:
-                    # number of changelogs processed in the batch
-                    self.turns += 1
+            if retry:
+                if tries == (self.MAX_RETRIES - 1):
+                    # Enable Error logging if it is last retry
+                    self.syncer.enable_errorlog()
+
+                # Remove Unlinked GFIDs from Queue
+                for unlinked_gfid in self.unlinked_gfids:
+                    self.datas_in_batch.remove(unlinked_gfid)
+
+                # Retry only Sync. Do not retry entry ops
+                if self.datas_in_batch:
+                    self.a_syncdata(self.datas_in_batch)
+            else:
+                for change in changes:
+                    logging.debug('processing change %s' % change)
+                    self.process_change(change, done, retry)
+                    if not retry:
+                        # number of changelogs processed in the batch
+                        self.turns += 1
 
             # Now we wait for all the data transfers fired off in the above
             # step to complete. Note that this is not ideal either. Ideally
@@ -1040,7 +1015,7 @@ class GMasterChangelogMixin(GMasterCommon):
 
             # @change is the last changelog (therefore max time for this batch)
             if self.syncdata_wait():
-                self.unlinked_gfids = []
+                self.unlinked_gfids = set()
                 if done:
                     xtl = (int(change.split('.')[-1]) - 1, 0)
                     self.upd_stime(xtl)
@@ -1050,23 +1025,21 @@ class GMasterChangelogMixin(GMasterCommon):
                 # Reset Data counter after sync
                 self.status.dec_value("data", self.files_in_batch)
                 self.files_in_batch = 0
+                self.datas_in_batch = set()
                 break
 
             # We do not know which changelog transfer failed, retry everything.
             retry = True
             tries += 1
             if tries == self.MAX_RETRIES:
-                logging.warn('changelogs %s could not be processed - '
-                             'moving on...' %
-                             ' '.join(map(os.path.basename, changes)))
-                self.status.inc_value("failures",
-                                      self.current_files_skipped_count)
-                logging.warn('SKIPPED GFID = %s' %
-                             ','.join(self.skipped_gfid_list))
+                logging.error('changelogs %s could not be processed '
+                              'completely - moving on...' %
+                              ' '.join(map(os.path.basename, changes)))
 
                 # Reset data counter on failure
                 self.status.dec_value("data", self.files_in_batch)
                 self.files_in_batch = 0
+                self.datas_in_batch = set()
 
                 if done:
                     xtl = (int(change.split('.')[-1]) - 1, 0)
@@ -1570,6 +1543,7 @@ class Syncer(object):
 
     def __init__(self, slave, sync_engine, resilient_errnos=[]):
         """spawn worker threads"""
+        self.log_err = False
         self.slave = slave
         self.lock = Lock()
         self.pb = PostBox()
@@ -1592,7 +1566,7 @@ class Syncer(object):
                     break
                 time.sleep(0.5)
             pb.close()
-            po = self.sync_engine(pb)
+            po = self.sync_engine(pb, self.log_err)
             if po.returncode == 0:
                 ret = (True, 0)
             elif po.returncode in self.errnos_ok:
@@ -1609,3 +1583,9 @@ class Syncer(object):
                 return pb
             except BoxClosedErr:
                 pass
+
+    def enable_errorlog(self):
+        self.log_err = True
+
+    def disable_errorlog(self):
+        self.log_err = False
```
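The `Syncer` change simply threads the new `log_err` flag into each
sync-engine invocation, which is what lets `process()` keep rsync/tar
quiet until the final retry. A self-contained sketch of that toggle
pattern, assuming a callable engine that accepts `(batch, log_err)`;
`noisy_engine` and its failure rule are hypothetical stand-ins for the
real rsync/tar-over-ssh wrappers:

```python
class Syncer(object):
    def __init__(self, sync_engine):
        self.log_err = False          # errors suppressed by default
        self.sync_engine = sync_engine

    def enable_errorlog(self):
        self.log_err = True

    def disable_errorlog(self):
        self.log_err = False

    def sync(self, batch):
        # The flag is passed per invocation, so flipping it between
        # retries changes logging without touching worker state.
        return self.sync_engine(batch, self.log_err)


def noisy_engine(batch, log_err):
    # Hypothetical engine: files ending in '.bad' fail to sync.
    failed = [f for f in batch if f.endswith('.bad')]
    if failed and log_err:
        print('sync FAILED for: %s' % ', '.join(failed))
    return len(failed) == 0


syncer = Syncer(noisy_engine)
syncer.disable_errorlog()
syncer.sync(['a.ok', 'b.bad'])   # fails silently; would be retried
syncer.enable_errorlog()
syncer.sync(['b.bad'])           # last retry: the error is logged
```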
