diff options
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 50 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 5 | 
2 files changed, 38 insertions, 17 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 79c90630dca..d8d26baafe1 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -239,6 +239,7 @@ class TarSSHEngine(object):      """      def a_syncdata(self, files): +        self.unlinked_gfids = []          logging.debug('files: %s' % (files))          for f in files:              pb = self.syncer.add(f) @@ -252,6 +253,7 @@ class TarSSHEngine(object):                      # stat check for file presence                      st = lstat(se)                      if isinstance(st, int): +                        self.unlinked_gfids.append(se)                          return True                      logging.warn('tar+ssh: %s [errcode: %d]' % (se, rv[1]))              self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) @@ -270,6 +272,7 @@ class RsyncEngine(object):      """Sync engine that uses rsync(1) for data transfers"""      def a_syncdata(self, files): +        self.unlinked_gfids = []          logging.debug('files: %s' % (files))          for f in files:              logging.debug('candidate for syncing %s' % f) @@ -286,6 +289,7 @@ class RsyncEngine(object):                          st = lstat(se)                          if isinstance(st, int):                              # file got unlinked in the interim +                            self.unlinked_gfids.append(se)                              return True                      logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1]))              self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) @@ -423,6 +427,7 @@ class GMasterCommon(object):          self.checkpoint_thread = None          self.current_files_skipped_count = 0          self.skipped_gfid_list = [] +        self.unlinked_gfids = []      def init_keep_alive(cls):          """start the keep-alive thread """ @@ -797,9 +802,10 @@ class GMasterChangelogMixin(GMasterCommon):                  if k == 'stat':                      st = ed[k]                      dst = dct['stat'] = {} -                    dst['uid'] = st.st_uid -                    dst['gid'] = st.st_gid -                    dst['mode'] = st.st_mode +                    if st: +                        dst['uid'] = st.st_uid +                        dst['gid'] = st.st_gid +                        dst['mode'] = st.st_mode                  else:                      dct[k] = ed[k]              return dct @@ -830,6 +836,12 @@ class GMasterChangelogMixin(GMasterCommon):                  gfid = ec[self.POS_GFID]                  if ty in ['UNLINK', 'RMDIR']: +                    # Remove from DATA list, so that rsync will +                    # not fail +                    pt = os.path.join(pfx, ec[0]) +                    if pt in datas: +                        datas.remove(pt) +                      purge_update()                      entries.append(edct(ty, gfid=gfid, entry=en))                  elif ty in ['CREATE', 'MKDIR', 'MKNOD']: @@ -838,16 +850,22 @@ class GMasterChangelogMixin(GMasterCommon):                      entries.append(edct(ty, gfid=gfid, entry=en,                                          mode=int(ec[2]),                                          uid=int(ec[3]), gid=int(ec[4]))) +                elif ty == "RENAME": +                    go = os.path.join(pfx, gfid) +                    st = lstat(go) +                    if isinstance(st, int): +                        st = {} + +                    entry_update() +                    e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) +                    entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, +                                        stat=st))                  else:                      # stat() to get mode and other information                      go = os.path.join(pfx, gfid)                      st = lstat(go)                      if isinstance(st, int): -                        if ty == 'RENAME':  # special hack for renames... -                            entries.append(edct('UNLINK', gfid=gfid, entry=en)) -                        else: -                            logging.debug( -                                'file %s got purged in the interim' % go) +                        logging.debug('file %s got purged in the interim' % go)                          continue                      if ty == 'LINK': @@ -860,16 +878,17 @@ class GMasterChangelogMixin(GMasterCommon):                          entry_update()                          entries.append(                              edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) -                    elif ty == 'RENAME': -                        entry_update() -                        e1 = unescape( -                            os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) -                        entries.append( -                            edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st))                      else:                          logging.warn('ignoring %s [op %s]' % (gfid, ty))              elif et == self.TYPE_GFID: -                datas.add(os.path.join(pfx, ec[0])) +                # If self.unlinked_gfids is available, then that means it is +                # retrying the changelog second time. Do not add the GFID's +                # to rsync job if failed previously but unlinked in master +                if self.unlinked_gfids and \ +                   os.path.join(pfx, ec[0]) in self.unlinked_gfids: +                    logging.debug("ignoring data, since file purged interim") +                else: +                    datas.add(os.path.join(pfx, ec[0]))              elif et == self.TYPE_META:                  if ec[1] == 'SETATTR':  # only setattr's for now...                      if len(ec) == 5: @@ -950,6 +969,7 @@ class GMasterChangelogMixin(GMasterCommon):              # @change is the last changelog (therefore max time for this batch)              if self.syncdata_wait(): +                self.unlinked_gfids = []                  if done:                      xtl = (int(change.split('.')[-1]) - 1, 0)                      self.upd_stime(xtl) diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 2a887daab15..1bee0a3338f 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -651,8 +651,9 @@ class Server(object):                  en = e['entry1']                  st = lstat(entry)                  if isinstance(st, int): -                    (pg, bname) = entry2pb(en) -                    blob = entry_pack_reg_stat(gfid, bname, e['stat']) +                    if e['stat']: +                        (pg, bname) = entry2pb(en) +                        blob = entry_pack_reg_stat(gfid, bname, e['stat'])                  else:                      errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST])              if blob:  | 
