Diffstat (limited to 'geo-replication')
-rw-r--r--  geo-replication/syncdaemon/master.py   | 101
-rw-r--r--  geo-replication/syncdaemon/resource.py |  55
2 files changed, 118 insertions, 38 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 2decc5de930..e3904736ba2 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -29,6 +29,14 @@ from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
 
 URXTIME = (-1, 0)
 
+# Default rollover time set in the changelog translator.
+# The changelog rollover time is hardcoded here to avoid
+# xsync usage when the crawl switches from history to
+# changelog. If the rollover time is increased in the
+# translator, geo-rep can enter the xsync crawl after the
+# history crawl, before starting the live changelog crawl.
+CHANGELOG_ROLLOVER_TIME = 15
+
 # Utility functions to help us to get to closer proximity
 # of the DRY principle (no, don't look for elevated or
 # perspectivistic things here)
@@ -160,7 +168,10 @@ class NormalMixin(object):
             raise GsyncdError("timestamp corruption for " + path)
 
     def need_sync(self, e, xte, xtrd):
-        return xte > xtrd
+        if self.xsync_upper_limit is None:
+            return xte > xtrd
+        else:
+            return xte > xtrd and xte < self.xsync_upper_limit
 
     def set_slave_xtime(self, path, mark):
         self.slave.server.set_stime(path, self.uuid, mark)
@@ -431,7 +442,7 @@ class GMasterCommon(object):
     def register(self):
         self.register()
 
-    def crawlwrap(self, oneshot=False):
+    def crawlwrap(self, oneshot=False, no_stime_update=False):
         if oneshot:
             # it's important to do this during the oneshot crawl as
             # for a passive gsyncd (ie. in a replicate scenario)
@@ -499,7 +510,7 @@
                 time.sleep(5)
                 continue
             self.update_worker_health("Active")
-            self.crawl()
+            self.crawl(no_stime_update=no_stime_update)
             if oneshot:
                 return
             time.sleep(self.sleep_interval)
@@ -1119,7 +1130,7 @@
             except:
                 raise
 
-    def crawl(self):
+    def crawl(self, no_stime_update=False):
         self.update_worker_crawl_status("Changelog Crawl")
         changes = []
         # get stime (from the brick) and purge changelogs
@@ -1147,20 +1158,25 @@
                 logging.debug('processing changes %s' % repr(changes))
                 self.process(changes)
 
-    def register(self, changelog_agent):
+    def register(self, register_time, changelog_agent):
         self.changelog_agent = changelog_agent
         self.sleep_interval = int(gconf.change_interval)
         self.changelog_done_func = self.changelog_agent.done
 
 
 class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
-    def register(self, changelog_agent):
+    def register(self, register_time, changelog_agent):
         self.changelog_agent = changelog_agent
-        self.changelog_register_time = int(time.time())
+        self.changelog_register_time = register_time
+        self.history_crawl_start_time = register_time
         self.changelog_done_func = self.changelog_agent.history_done
+        self.history_turns = 0
 
-    def crawl(self):
+    def crawl(self, no_stime_update=False):
+        self.history_turns += 1
         self.update_worker_crawl_status("History Crawl")
+        logging.info('starting history crawl... turns: %s' %
+                     self.history_turns)
 
         # get stime (from the brick) and purge changelogs
         # that are _historical_ to that time.
@@ -1169,11 +1185,9 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
             purge_time = None
 
         if not purge_time or purge_time == URXTIME:
+            logging.info("stime not available, abandoning history crawl")
             raise NoPurgeTimeAvailable()
 
-        logging.debug("Get changelog history between %s and %s" %
-                      (purge_time[0], self.changelog_register_time))
-
         # Changelogs backend path is hardcoded as
         # <BRICK_PATH>/.glusterfs/changelogs, if user configured to different
         # location then consuming history will not work(Known issue as of now)
@@ -1210,11 +1224,26 @@
                 logging.debug('processing changes %s' % repr(changes))
                 self.process(changes)
 
+        history_turn_time = int(time.time()) - self.history_crawl_start_time
+
+        logging.info('finished history crawl syncing between %s - %s.' %
+                     (purge_time[0], actual_end))
+
         # If TS returned from history_changelog is < register_time
         # then FS crawl may be required, since history is only available
         # till TS returned from history_changelog
         if actual_end < self.changelog_register_time:
-            raise PartialHistoryAvailable(str(actual_end))
+            if self.history_turns < 2:
+                sleep_time = 1
+                if history_turn_time < CHANGELOG_ROLLOVER_TIME:
+                    sleep_time = CHANGELOG_ROLLOVER_TIME - history_turn_time
+                time.sleep(sleep_time)
+                self.history_crawl_start_time = int(time.time())
+                self.crawl()
+            else:
+                # This exception will be caught in resource.py, which
+                # falls back to xsync for the small gap.
+                raise PartialHistoryAvailable(str(actual_end))
 
 
 class GMasterXsyncMixin(GMasterChangelogMixin):
@@ -1231,7 +1260,7 @@
 
     XSYNC_MAX_ENTRIES = 1 << 13
 
-    def register(self, changelog_agent=None):
+    def register(self, register_time, changelog_agent=None):
         self.counter = 0
         self.comlist = []
         self.stimes = []
@@ -1248,7 +1277,18 @@
             else:
                 raise
 
-    def crawl(self):
+        # After changelog history processing completes, geo-rep switches
+        # to the xsync/hybrid crawl if the history actual end time is less
+        # than the live changelog register time. Xsync should only run for
+        # that small gap, i.e. changelog_register_time - history_actual_end_time.
+        # Without an upper limit, xsync change detection would keep
+        # detecting files even though changelogs are available for the
+        # same period. Set the upper limit during register and reset it
+        # at the end of each crawl. Resetting at the end of the crawl is
+        # required if change_detector is set to xsync.
+        self.xsync_upper_limit = (register_time, 0)
+
+    def crawl(self, no_stime_update=False):
         """
         event dispatcher thread
@@ -1265,19 +1305,36 @@
             try:
                 item = self.comlist.pop(0)
                 if item[0] == 'finale':
-                    logging.info('finished hybrid crawl syncing')
+                    if self.xsync_upper_limit is not None:
+                        logging.info('finished hybrid crawl syncing, endtime: '
+                                     '%s' % self.xsync_upper_limit[0])
+                    else:
+                        logging.info('finished hybrid crawl syncing')
+
                     break
                 elif item[0] == 'xsync':
                     logging.info('processing xsync changelog %s' % (item[1]))
                     self.process([item[1]], 0)
                 elif item[0] == 'stime':
-                    logging.debug('setting slave time: %s' % repr(item[1]))
-                    self.upd_stime(item[1][1], item[1][0])
+                    if not no_stime_update:
+                        # xsync is started after running history, but if the
+                        # history actual end time is less than the register
+                        # time and we update stime, live changelog processing
+                        # will skip the changelogs whose TS is less than
+                        # stime, and deletes and renames in that window are
+                        # not propagated. By not setting stime, the live
+                        # changelog crawl starts processing from the register
+                        # time. Since we have xsync_upper_limit, there will
+                        # not be much overlap/redo of changelogs.
+                        logging.debug('setting slave time: %s' % repr(item[1]))
+                        self.upd_stime(item[1][1], item[1][0])
                 else:
                     logging.warn('unknown tuple in comlist (%s)' % repr(item))
             except IndexError:
                 time.sleep(1)
 
+        self.xsync_upper_limit = None
+
     def write_entry_change(self, prefix, data=[]):
         self.fh.write("%s %s\n" % (prefix, ' '.join(data)))
@@ -1402,7 +1459,10 @@
                 self.write_entry_change("M", [gfid, "SETATTR", str(st.st_uid),
                                               str(st.st_gid), str(st.st_mode)])
                 self.Xcrawl(e, xtr_root)
-                self.stimes.append((e, xte))
+                stime_to_update = xte
+                if self.xsync_upper_limit is not None:
+                    stime_to_update = min(self.xsync_upper_limit, xte)
+                self.stimes.append((e, stime_to_update))
             elif stat.S_ISLNK(mo):
                 self.write_entry_change(
                     "E", [gfid, 'SYMLINK', escape(os.path.join(pargfid,
@@ -1426,7 +1486,10 @@
                                                                 bname))])
                 self.write_entry_change("D", [gfid])
         if path == '.':
-            self.stimes.append((path, xtl))
+            stime_to_update = xtl
+            if self.xsync_upper_limit is not None:
+                stime_to_update = min(self.xsync_upper_limit, xtl)
+            self.stimes.append((path, stime_to_update))
             self.sync_done(self.stimes, True)
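The master.py hunks above all serve one idea: the hybrid (xsync) crawl must stop detecting changes at the live changelog register time, and must never record an stime past it, so that the live changelog crawl replays the remaining gap. A minimal standalone sketch of that gating logic follows; the class and the clamp_stime helper are hypothetical, but the tuple handling mirrors the patch's xsync_upper_limit and gsyncd's (seconds, nanoseconds) xtime pairs.

import time

class XsyncLimitSketch(object):
    def __init__(self, register_time=None):
        # xtime-style (sec, nsec) tuple; None disables the upper limit.
        if register_time is not None:
            self.xsync_upper_limit = (register_time, 0)
        else:
            self.xsync_upper_limit = None

    def need_sync(self, xte, xtrd):
        # Mirrors NormalMixin.need_sync above: sync entries newer than
        # the slave's time (xtrd) but, when the limit is set, only up
        # to the live changelog register time.
        if self.xsync_upper_limit is None:
            return xte > xtrd
        return xtrd < xte < self.xsync_upper_limit

    def clamp_stime(self, xt):
        # Mirrors the stimes.append() changes above: never record an
        # stime past the upper limit.
        if self.xsync_upper_limit is None:
            return xt
        return min(self.xsync_upper_limit, xt)

now = int(time.time())
x = XsyncLimitSketch(register_time=now)
print(x.need_sync((now - 5, 0), (now - 10, 0)))  # True: inside the window
print(x.need_sync((now + 5, 0), (now - 10, 0)))  # False: past register time
print(x.clamp_stime((now + 5, 0)) == (now, 0))   # True: stime gets clamped

Python compares tuples lexicographically, which is what makes the chained comparison and min() behave correctly on these (sec, nsec) pairs.
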
logging.info("Changelog register failed, fallback to xsync") + +            g1.register(register_time) +            logging.info("Register time: %s" % register_time)              # oneshot: Try to use changelog history api, if not              # available switch to FS crawl              # Note: if config.change_detector is xsync then              # it will not use changelog history api              try: -                g3.crawlwrap(oneshot=True) +                if not changelog_register_failed: +                    g3.crawlwrap(oneshot=True) +                else: +                    g1.crawlwrap(oneshot=True)              except (ChangelogException, NoPurgeTimeAvailable,                      PartialHistoryAvailable) as e:                  if isinstance(e, ChangelogException): -                    logging.debug('Changelog history crawl failed, failback ' -                                  'to xsync: %s - %s' % (e.errno, e.strerror)) -                elif isinstance(e, NoPurgeTimeAvailable): -                    logging.debug('Using xsync crawl since no purge time ' -                                  'available') +                    logging.info('Changelog history crawl failed, fallback ' +                                 'to xsync: %s - %s' % (e.errno, e.strerror))                  elif isinstance(e, PartialHistoryAvailable): -                    logging.debug('Using xsync crawl after consuming history ' -                                  'till %s' % str(e)) -                g1.crawlwrap(oneshot=True) +                    logging.info('Partial history available, using xsync crawl' +                                 ' after consuming history ' +                                 'till %s' % str(e)) +                g1.crawlwrap(oneshot=True, no_stime_update=True) + +            # Reset xsync upper limit. g2, g3 are changelog and history +            # instances, but if change_detector is set to xsync then +            # g1, g2, g3 will be xsync instances. +            g1.xsync_upper_limit = None +            if getattr(g2, "xsync_upper_limit", None) is not None: +                g2.xsync_upper_limit = None + +            if getattr(g3, "xsync_upper_limit", None) is not None: +                g3.xsync_upper_limit = None              # crawl loop: Try changelog crawl, if failed              # switch to FS crawl              try: -                g2.crawlwrap() +                if not changelog_register_failed: +                    g2.crawlwrap() +                else: +                    g1.crawlwrap()              except ChangelogException as e: -                logging.debug('Changelog crawl failed, failback to xsync: ' -                              '%s - %s' % (e.errno, e.strerror)) +                logging.info('Changelog crawl failed, fallback to xsync')                  g1.crawlwrap()          else:              sup(self, *args)  | 
