-rw-r--r--  geo-replication/syncdaemon/master.py    | 108
-rw-r--r--  geo-replication/syncdaemon/resource.py  |  49
2 files changed, 70 insertions, 87 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index e743fdf2e50..8d2158fb406 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -245,8 +245,7 @@ class TarSSHEngine(object):
 
     def a_syncdata(self, files):
         logging.debug('files: %s' % (files))
-        self.current_files_skipped_count = 0
-        del self.skipped_gfid_list[:]
+
         for f in files:
             pb = self.syncer.add(f)
 
@@ -260,12 +259,9 @@ class TarSSHEngine(object):
                     st = lstat(se)
                     if isinstance(st, int):
                         # file got unlinked in the interim
-                        self.unlinked_gfids.append(se)
+                        self.unlinked_gfids.add(se)
                         return True
 
-                    se_list = se.split('/')
-                    self.current_files_skipped_count += 1
-                    self.skipped_gfid_list.append(se_list[1])
             self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
 
     def syncdata_wait(self):
@@ -283,8 +279,7 @@ class RsyncEngine(object):
 
     def a_syncdata(self, files):
         logging.debug('files: %s' % (files))
-        self.current_files_skipped_count = 0
-        del self.skipped_gfid_list[:]
+
         for f in files:
             logging.debug('candidate for syncing %s' % f)
             pb = self.syncer.add(f)
@@ -299,12 +294,9 @@ class RsyncEngine(object):
                     st = lstat(se)
                     if isinstance(st, int):
                         # file got unlinked in the interim
-                        self.unlinked_gfids.append(se)
+                        self.unlinked_gfids.add(se)
                         return True
 
-                    se_list = se.split('/')
-                    self.current_files_skipped_count += 1
-                    self.skipped_gfid_list.append(se_list[1])
             self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
 
     def syncdata_wait(self):
@@ -364,35 +356,6 @@ class GMasterCommon(object):
         self.make_xtime_opts(rsc == self.master, opts)
         return self.xtime_low(rsc, path, **opts)
 
-    def get_initial_crawl_data(self):
-        # while persisting only 'files_syncd' is non-zero, rest of
-        # the stats are nulls. lets keep it that way in case they
-        # are needed to be used some day...
-        default_data = {'files_syncd': 0,
-                        'files_remaining': 0,
-                        'bytes_remaining': 0,
-                        'purges_remaining': 0,
-                        'total_files_skipped': 0}
-        if getattr(gconf, 'state_detail_file', None):
-            try:
-                with open(gconf.state_detail_file, 'r+') as f:
-                    loaded_data = json.load(f)
-                    diff_data = set(default_data) - set(loaded_data)
-                    if len(diff_data):
-                        for i in diff_data:
-                            loaded_data[i] = default_data[i]
-                    return loaded_data
-            except IOError:
-                logging.warn('Creating new gconf.state_detail_file.')
-                # Create file with initial data
-                try:
-                    with open(gconf.state_detail_file, 'wb') as f:
-                        json.dump(default_data, f)
-                    return default_data
-                except:
-                    raise
-        return default_data
-
     def __init__(self, master, slave):
         self.master = master
         self.slave = slave
@@ -424,9 +387,7 @@ class GMasterCommon(object):
         self.volinfo = None
         self.terminate = False
         self.sleep_interval = 1
-        self.current_files_skipped_count = 0
-        self.skipped_gfid_list = []
-        self.unlinked_gfids = []
+        self.unlinked_gfids = set()
 
     def init_keep_alive(cls):
         """start the keep-alive thread """
@@ -819,7 +780,8 @@ class GMasterChangelogMixin(GMasterCommon):
                 st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
                 if not isinstance(st, int):
                     num_failures += 1
-                    logging.warn('%s FAILED: %s' % (log_prefix, repr(failure)))
+                    logging.error('%s FAILED: %s' % (log_prefix,
+                                                     repr(failure)))
 
             self.status.inc_value("failures", num_failures)
 
@@ -994,17 +956,17 @@ class GMasterChangelogMixin(GMasterCommon):
         # sync data
         if datas:
             self.a_syncdata(datas)
+            self.datas_in_batch.update(datas)
 
     def process(self, changes, done=1):
         tries = 0
         retry = False
-        self.unlinked_gfids = []
+        self.unlinked_gfids = set()
         self.files_in_batch = 0
+        self.datas_in_batch = set()
+        self.syncer.disable_errorlog()
 
         while True:
-            self.skipped_gfid_list = []
-            self.current_files_skipped_count = 0
-
             # first, fire all changelog transfers in parallel. entry and
             # metadata are performed synchronously, therefore in serial.
             # However at the end of each changelog, data is synchronized
@@ -1012,12 +974,25 @@ class GMasterChangelogMixin(GMasterCommon):
             # entries/metadata of that changelog but happens in parallel
             # with data of other changelogs.
 
-            for change in changes:
-                logging.debug('processing change %s' % change)
-                self.process_change(change, done, retry)
-                if not retry:
-                    # number of changelogs processed in the batch
-                    self.turns += 1
+            if retry:
+                if tries == (self.MAX_RETRIES - 1):
+                    # Enable Error logging if it is last retry
+                    self.syncer.enable_errorlog()
+
+                # Remove Unlinked GFIDs from Queue
+                for unlinked_gfid in self.unlinked_gfids:
+                    self.datas_in_batch.remove(unlinked_gfid)
+
+                # Retry only Sync. Do not retry entry ops
+                if self.datas_in_batch:
+                    self.a_syncdata(self.datas_in_batch)
+            else:
+                for change in changes:
+                    logging.debug('processing change %s' % change)
+                    self.process_change(change, done, retry)
+                    if not retry:
+                        # number of changelogs processed in the batch
+                        self.turns += 1
 
             # Now we wait for all the data transfers fired off in the above
             # step to complete. Note that this is not ideal either. Ideally
@@ -1040,7 +1015,7 @@ class GMasterChangelogMixin(GMasterCommon):
             # @change is the last changelog (therefore max time for this batch)
 
             if self.syncdata_wait():
-                self.unlinked_gfids = []
+                self.unlinked_gfids = set()
                 if done:
                     xtl = (int(change.split('.')[-1]) - 1, 0)
                     self.upd_stime(xtl)
@@ -1050,23 +1025,21 @@ class GMasterChangelogMixin(GMasterCommon):
                 # Reset Data counter after sync
                 self.status.dec_value("data", self.files_in_batch)
                 self.files_in_batch = 0
+                self.datas_in_batch = set()
                 break
 
             # We do not know which changelog transfer failed, retry everything.
             retry = True
             tries += 1
             if tries == self.MAX_RETRIES:
-                logging.warn('changelogs %s could not be processed - '
-                             'moving on...' %
-                             ' '.join(map(os.path.basename, changes)))
-                self.status.inc_value("failures",
-                                      self.current_files_skipped_count)
-                logging.warn('SKIPPED GFID = %s' %
-                             ','.join(self.skipped_gfid_list))
+                logging.error('changelogs %s could not be processed '
+                              'completely - moving on...' %
+                              ' '.join(map(os.path.basename, changes)))
 
                 # Reset data counter on failure
                 self.status.dec_value("data", self.files_in_batch)
                 self.files_in_batch = 0
+                self.datas_in_batch = set()
 
                 if done:
                     xtl = (int(change.split('.')[-1]) - 1, 0)
@@ -1570,6 +1543,7 @@ class Syncer(object):
 
     def __init__(self, slave, sync_engine, resilient_errnos=[]):
         """spawn worker threads"""
+        self.log_err = False
         self.slave = slave
         self.lock = Lock()
         self.pb = PostBox()
@@ -1592,7 +1566,7 @@ class Syncer(object):
                     break
                 time.sleep(0.5)
             pb.close()
-            po = self.sync_engine(pb)
+            po = self.sync_engine(pb, self.log_err)
             if po.returncode == 0:
                 ret = (True, 0)
             elif po.returncode in self.errnos_ok:
@@ -1609,3 +1583,9 @@ class Syncer(object):
                 return pb
             except BoxClosedErr:
                 pass
+
+    def enable_errorlog(self):
+        self.log_err = True
+
+    def disable_errorlog(self):
+        self.log_err = False
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 19363401e65..ac697eb39ed 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -906,7 +906,7 @@ class SlaveRemote(object):
                 "RePCe major version mismatch: local %s, remote %s" %
                 (exrv, rv))
 
-    def rsync(self, files, *args):
+    def rsync(self, files, *args, **kw):
         """invoke rsync"""
         if not files:
             raise GsyncdError("no files to sync")
@@ -932,12 +932,15 @@ class SlaveRemote(object):
             po.stdin.write(f)
             po.stdin.write('\0')
 
-        po.stdin.close()
+        stdout, stderr = po.communicate()
+
+        if kw.get("log_err", False):
+            for errline in stderr.strip().split("\n")[:-1]:
+                logging.error("SYNC Error(Rsync): %s" % errline)
 
         if gconf.log_rsync_performance:
-            out = po.stdout.read()
             rsync_msg = []
-            for line in out.split("\n"):
+            for line in stdout.split("\n"):
                 if line.startswith("Number of files:") or \
                    line.startswith("Number of regular files transferred:") or \
                    line.startswith("Total file size:") or \
@@ -949,12 +952,10 @@ class SlaveRemote(object):
                    line.startswith("sent "):
                     rsync_msg.append(line)
             logging.info("rsync performance: %s" % ", ".join(rsync_msg))
-        po.wait()
-        po.terminate_geterr(fail_on_err=False)
 
         return po
 
-    def tarssh(self, files, slaveurl):
+    def tarssh(self, files, slaveurl, log_err=False):
         """invoke tar+ssh
         -z (compress) can be use if needed, but omitting it now
         as it results in weird error (tar+ssh errors out (errcode: 2)
@@ -975,15 +976,16 @@ class SlaveRemote(object):
         for f in files:
             p0.stdin.write(f)
             p0.stdin.write('\n')
 
-        p0.stdin.close()
-        # wait() for tar to terminate, collecting any errors, further
+        p0.stdin.close()
+        p0.stdout.close()  # Allow p0 to receive a SIGPIPE if p1 exits.
+        # wait for tar to terminate, collecting any errors, further
         # waiting for transfer to complete
-        p0.wait()
-        p0.terminate_geterr(fail_on_err=False)
+        _, stderr1 = p1.communicate()
 
-        p1.wait()
-        p1.terminate_geterr(fail_on_err=False)
+        if log_err:
+            for errline in stderr1.strip().split("\n")[:-1]:
+                logging.error("SYNC Error(Untar): %s" % errline)
 
         return p1
 
@@ -1045,8 +1047,8 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
         """inhibit the resource beyond"""
         os.chdir(self.path)
 
-    def rsync(self, files):
-        return sup(self, files, self.path)
+    def rsync(self, files, log_err=False):
+        return sup(self, files, self.path, log_err=log_err)
 
 
 class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
@@ -1460,11 +1462,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
         else:
             sup(self, *args)
 
-    def rsync(self, files):
-        return sup(self, files, self.slavedir)
+    def rsync(self, files, log_err=False):
+        return sup(self, files, self.slavedir, log_err=log_err)
 
-    def tarssh(self, files):
-        return sup(self, files, self.slavedir)
+    def tarssh(self, files, log_err=False):
+        return sup(self, files, self.slavedir, log_err=log_err)
 
 
 class SSH(AbstractUrl, SlaveRemote):
@@ -1571,12 +1573,13 @@ class SSH(AbstractUrl, SlaveRemote):
             self.fd_pair = (i, o)
             return 'should'
 
-    def rsync(self, files):
+    def rsync(self, files, log_err=False):
         return sup(self, files, '-e',
                    " ".join(gconf.ssh_command.split() +
                             ["-p", str(gconf.ssh_port)] +
                             gconf.ssh_ctl_args),
-                   *(gconf.rsync_ssh_options.split() + [self.slaveurl]))
+                   *(gconf.rsync_ssh_options.split() + [self.slaveurl]),
+                   log_err=log_err)
 
-    def tarssh(self, files):
-        return sup(self, files, self.slaveurl)
+    def tarssh(self, files, log_err=False):
+        return sup(self, files, self.slaveurl, log_err=log_err)
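The resource.py hunks above replace the write/close/wait sequence with Popen.communicate(), so the child's stderr is captured in memory and replayed through logging.error() when the caller passes log_err=True; closing p0.stdout additionally lets tar receive SIGPIPE if the ssh side exits early. A minimal standalone sketch of that pattern follows, assuming a plain rsync invocation; run_sync and its arguments are illustrative placeholders, not gsyncd code.

import logging
import subprocess


def run_sync(cmd, files, log_err=False):
    # Feed the file list on stdin and collect both output streams;
    # communicate() also waits for the child, replacing wait()/read().
    po = subprocess.Popen(cmd,
                          stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE,
                          universal_newlines=True)
    stdout, stderr = po.communicate("\0".join(files) + "\0")

    # Emit per-line errors only when the caller asked for them
    if log_err and po.returncode != 0:
        for errline in stderr.strip().split("\n"):
            if errline:
                logging.error("SYNC Error: %s" % errline)
    return po.returncode


if __name__ == "__main__":
    # Illustrative usage: rsync reads a NUL-separated file list from stdin
    rc = run_sync(["rsync", "-0", "--files-from=-", ".", "/tmp/dest"],
                  ["a.txt", "b.txt"], log_err=True)
    print("rsync exited with %d" % rc)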

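On the master.py side, process() now records every file queued for data sync in datas_in_batch and, on failure, retries only that set: entry and metadata operations are never replayed, GFIDs unlinked in the interim are pruned, and per-file error logging is switched on only for the final attempt. The sketch below restates that retry policy in isolation; sync_data, collect_datas and unlinked_gfids are hypothetical callables, and the retry budget is illustrative rather than gsyncd's actual MAX_RETRIES.

import logging

MAX_RETRIES = 10  # retry budget (illustrative)


def sync_batch(changes, collect_datas, sync_data, unlinked_gfids):
    # collect_datas(change) -> files needing data sync for one changelog
    # sync_data(files, log_err) -> True once every file synced
    # unlinked_gfids() -> files deleted on the master since being queued
    datas_in_batch = set()
    for change in changes:
        # entry and metadata operations are performed only once
        datas_in_batch.update(collect_datas(change))

    for tries in range(MAX_RETRIES):
        # deleted files need not (and cannot) be synced any more
        datas_in_batch -= unlinked_gfids()
        # log individual sync errors only on the final attempt
        log_err = (tries == MAX_RETRIES - 1)
        if sync_data(datas_in_batch, log_err=log_err):
            return True
    logging.error("batch could not be processed completely - moving on")
    return False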