diff options
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 93 | 
1 files changed, 75 insertions, 18 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 4301396f9f4..3047c99050e 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -25,6 +25,7 @@ from tempfile import NamedTemporaryFile  from syncdutils import Thread, GsyncdError, boolify, escape  from syncdutils import unescape, select, gauxpfx, md5hex, selfkill  from syncdutils import lstat, errno_wrap +from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable  URXTIME = (-1, 0) @@ -904,7 +905,7 @@ class GMasterChangelogMixin(GMasterCommon):                  if done:                      xtl = (int(change.split('.')[-1]) - 1, 0)                      self.upd_stime(xtl) -                    map(self.master.server.changelog_done, changes) +                    map(self.changelog_done_func, changes)                  self.update_worker_files_syncd()                  break @@ -923,7 +924,7 @@ class GMasterChangelogMixin(GMasterCommon):                  if done:                      xtl = (int(change.split('.')[-1]) - 1, 0)                      self.upd_stime(xtl) -                    map(self.master.server.changelog_done, changes) +                    map(self.changelog_done_func, changes)                  break              # it's either entry_ops() or Rsync that failed to do it's              # job. Mostly it's entry_ops() [which currently has a problem @@ -1106,12 +1107,9 @@ class GMasterChangelogMixin(GMasterCommon):          purge_time = self.xtime('.', self.slave)          if isinstance(purge_time, int):              purge_time = None -        try: -            self.master.server.changelog_scan() -            self.crawls += 1 -        except OSError: -            self.fallback_xsync() -            self.update_worker_crawl_status("Hybrid Crawl") + +        self.master.server.changelog_scan() +        self.crawls += 1          changes = self.master.server.changelog_getchanges()          if changes:              if purge_time: @@ -1124,23 +1122,82 @@ class GMasterChangelogMixin(GMasterCommon):                          os.path.basename(pr))                      self.master.server.changelog_done(pr)                      changes.remove(pr) -            logging.debug('processing changes %s' % repr(changes)) +              if changes: +                logging.debug('processing changes %s' % repr(changes))                  self.process(changes)      def register(self):          (workdir, logfile) = self.setup_working_dir()          self.sleep_interval = int(gconf.change_interval) +        self.changelog_done_func = self.master.server.changelog_done          # register with the changelog library -        try: -            # 9 == log level (DEBUG) -            # 5 == connection retries -            self.master.server.changelog_register(gconf.local_path, -                                                  workdir, logfile, 9, 5) -        except OSError: -            self.fallback_xsync() -            # control should not reach here -            raise +        # 9 == log level (DEBUG) +        # 5 == connection retries +        self.master.server.changelog_register(gconf.local_path, +                                              workdir, logfile, 9, 5) + + +class GMasterChangeloghistoryMixin(GMasterChangelogMixin): +    def register(self): +        super(GMasterChangeloghistoryMixin, self).register() +        self.changelog_register_time = int(time.time()) +        self.changelog_done_func = self.master.server.history_changelog_done + +    def crawl(self): +        self.update_worker_crawl_status("History Crawl") + +        # get stime (from the brick) and purge changelogs +        # that are _historical_ to that time. +        purge_time = self.xtime('.', self.slave) +        if isinstance(purge_time, int): +            purge_time = None + +        if not purge_time or purge_time == URXTIME: +            raise NoPurgeTimeAvailable() + +        logging.debug("Get changelog history between %s and %s" % +                      (purge_time[0], self.changelog_register_time)) + +        # Changelogs backend path is hardcoded as +        # <BRICK_PATH>/.glusterfs/changelogs, if user configured to different +        # location then consuming history will not work(Known issue as of now) +        changelog_path = os.path.join(gconf.local_path, +                                      ".glusterfs/changelogs") +        ts = self.master.server.history_changelog(changelog_path, +                                                  purge_time[0], +                                                  self.changelog_register_time) + +        # scan followed by getchanges till scan returns zero. +        # history_changelog_scan() is blocking call, till it gets the number +        # of changelogs to process. Returns zero when no changelogs +        # to be processed. returns positive value as number of changelogs +        # to be processed, which will be fetched using +        # history_changelog_getchanges() +        while self.master.server.history_changelog_scan() > 0: +            self.crawls += 1 + +            changes = self.master.server.history_changelog_getchanges() +            if changes: +                if purge_time: +                    logging.info("slave's time: %s" % repr(purge_time)) +                    processed = [x for x in changes +                                 if int(x.split('.')[-1]) < purge_time[0]] +                    for pr in processed: +                        logging.info('skipping already processed change: ' +                                     '%s...' % os.path.basename(pr)) +                        self.changelog_done_func(pr) +                        changes.remove(pr) + +            if changes: +                logging.debug('processing changes %s' % repr(changes)) +                self.process(changes) + +        # If TS returned from history_changelog is < register_time +        # then FS crawl may be required, since history is only available +        # till TS returned from history_changelog +        if ts < self.changelog_register_time: +            raise PartialHistoryAvailable(str(ts))  class GMasterXsyncMixin(GMasterChangelogMixin):  | 
