diff options
| Mode | File | Lines changed |
| --- | --- | --- |
| -rw-r--r-- | geo-replication/syncdaemon/libgfchangelog.py | 62 |
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 93 |
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 66 |
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 8 |
4 files changed, 198 insertions, 31 deletions
diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py index ec563b36f29..0fa32a73499 100644 --- a/geo-replication/syncdaemon/libgfchangelog.py +++ b/geo-replication/syncdaemon/libgfchangelog.py @@ -13,6 +13,10 @@ from ctypes import CDLL, create_string_buffer, get_errno  from ctypes.util import find_library +class ChangelogException(OSError): +    pass + +  class Changes(object):      libgfc = CDLL(find_library("gfchangelog"), use_errno=True) @@ -21,9 +25,9 @@ class Changes(object):          return get_errno()      @classmethod -    def raise_oserr(cls): +    def raise_changelog_err(cls):          errn = cls.geterrno() -        raise OSError(errn, os.strerror(errn)) +        raise ChangelogException(errn, os.strerror(errn))      @classmethod      def _get_api(cls, call): @@ -35,19 +39,19 @@ class Changes(object):                                                      log_file,                                                      log_level, retries)          if ret == -1: -            cls.raise_oserr() +            cls.raise_changelog_err()      @classmethod      def cl_scan(cls):          ret = cls._get_api('gf_changelog_scan')()          if ret == -1: -            cls.raise_oserr() +            cls.raise_changelog_err()      @classmethod      def cl_startfresh(cls):          ret = cls._get_api('gf_changelog_start_fresh')()          if ret == -1: -            cls.raise_oserr() +            cls.raise_changelog_err()      @classmethod      def cl_getchanges(cls): @@ -64,7 +68,7 @@ class Changes(object):                  break              changes.append(buf.raw[:ret - 1])          if ret == -1: -            cls.raise_oserr() +            cls.raise_changelog_err()          # cleanup tracker          cls.cl_startfresh()          return sorted(changes, key=clsort) @@ -73,4 +77,48 @@ class Changes(object):      def cl_done(cls, clfile):          ret = cls._get_api('gf_changelog_done')(clfile)          if ret == -1: -        
    cls.raise_oserr() +            cls.raise_changelog_err() + +    @classmethod +    def cl_history_scan(cls): +        ret = cls._get_api('gf_history_changelog_scan')() +        if ret == -1: +            cls.raise_changelog_err() + +        return ret + +    @classmethod +    def cl_history_changelog(cls, changelog_path, start, end): +        ret = cls._get_api('gf_history_changelog')(changelog_path, start, end) +        if ret == -1: +            cls.raise_changelog_err() + +        return ret + +    @classmethod +    def cl_history_startfresh(cls): +        ret = cls._get_api('gf_history_changelog_start_fresh')() +        if ret == -1: +            cls.raise_changelog_err() + +    @classmethod +    def cl_history_getchanges(cls): +        changes = [] +        buf = create_string_buffer('\0', 4096) +        call = cls._get_api('gf_history_changelog_next_change') + +        while True: +            ret = call(buf, 4096) +            if ret in (0, -1): +                break +            changes.append(buf.raw[:ret - 1]) +        if ret == -1: +            cls.raise_changelog_err() + +        return changes + +    @classmethod +    def cl_history_done(cls, clfile): +        ret = cls._get_api('gf_history_changelog_done')(clfile) +        if ret == -1: +            cls.raise_changelog_err() diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 4301396f9f4..3047c99050e 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -25,6 +25,7 @@ from tempfile import NamedTemporaryFile  from syncdutils import Thread, GsyncdError, boolify, escape  from syncdutils import unescape, select, gauxpfx, md5hex, selfkill  from syncdutils import lstat, errno_wrap +from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable  URXTIME = (-1, 0) @@ -904,7 +905,7 @@ class GMasterChangelogMixin(GMasterCommon):                  if done:                      xtl = (int(change.split('.')[-1]) - 1, 0)   
                   self.upd_stime(xtl) -                    map(self.master.server.changelog_done, changes) +                    map(self.changelog_done_func, changes)                  self.update_worker_files_syncd()                  break @@ -923,7 +924,7 @@ class GMasterChangelogMixin(GMasterCommon):                  if done:                      xtl = (int(change.split('.')[-1]) - 1, 0)                      self.upd_stime(xtl) -                    map(self.master.server.changelog_done, changes) +                    map(self.changelog_done_func, changes)                  break              # it's either entry_ops() or Rsync that failed to do it's              # job. Mostly it's entry_ops() [which currently has a problem @@ -1106,12 +1107,9 @@ class GMasterChangelogMixin(GMasterCommon):          purge_time = self.xtime('.', self.slave)          if isinstance(purge_time, int):              purge_time = None -        try: -            self.master.server.changelog_scan() -            self.crawls += 1 -        except OSError: -            self.fallback_xsync() -            self.update_worker_crawl_status("Hybrid Crawl") + +        self.master.server.changelog_scan() +        self.crawls += 1          changes = self.master.server.changelog_getchanges()          if changes:              if purge_time: @@ -1124,23 +1122,82 @@ class GMasterChangelogMixin(GMasterCommon):                          os.path.basename(pr))                      self.master.server.changelog_done(pr)                      changes.remove(pr) -            logging.debug('processing changes %s' % repr(changes)) +              if changes: +                logging.debug('processing changes %s' % repr(changes))                  self.process(changes)      def register(self):          (workdir, logfile) = self.setup_working_dir()          self.sleep_interval = int(gconf.change_interval) +        self.changelog_done_func = self.master.server.changelog_done          # register with the changelog library -     
   try: -            # 9 == log level (DEBUG) -            # 5 == connection retries -            self.master.server.changelog_register(gconf.local_path, -                                                  workdir, logfile, 9, 5) -        except OSError: -            self.fallback_xsync() -            # control should not reach here -            raise +        # 9 == log level (DEBUG) +        # 5 == connection retries +        self.master.server.changelog_register(gconf.local_path, +                                              workdir, logfile, 9, 5) + + +class GMasterChangeloghistoryMixin(GMasterChangelogMixin): +    def register(self): +        super(GMasterChangeloghistoryMixin, self).register() +        self.changelog_register_time = int(time.time()) +        self.changelog_done_func = self.master.server.history_changelog_done + +    def crawl(self): +        self.update_worker_crawl_status("History Crawl") + +        # get stime (from the brick) and purge changelogs +        # that are _historical_ to that time. 
+        purge_time = self.xtime('.', self.slave) +        if isinstance(purge_time, int): +            purge_time = None + +        if not purge_time or purge_time == URXTIME: +            raise NoPurgeTimeAvailable() + +        logging.debug("Get changelog history between %s and %s" % +                      (purge_time[0], self.changelog_register_time)) + +        # Changelogs backend path is hardcoded as +        # <BRICK_PATH>/.glusterfs/changelogs, if user configured to different +        # location then consuming history will not work(Known issue as of now) +        changelog_path = os.path.join(gconf.local_path, +                                      ".glusterfs/changelogs") +        ts = self.master.server.history_changelog(changelog_path, +                                                  purge_time[0], +                                                  self.changelog_register_time) + +        # scan followed by getchanges till scan returns zero. +        # history_changelog_scan() is blocking call, till it gets the number +        # of changelogs to process. Returns zero when no changelogs +        # to be processed. returns positive value as number of changelogs +        # to be processed, which will be fetched using +        # history_changelog_getchanges() +        while self.master.server.history_changelog_scan() > 0: +            self.crawls += 1 + +            changes = self.master.server.history_changelog_getchanges() +            if changes: +                if purge_time: +                    logging.info("slave's time: %s" % repr(purge_time)) +                    processed = [x for x in changes +                                 if int(x.split('.')[-1]) < purge_time[0]] +                    for pr in processed: +                        logging.info('skipping already processed change: ' +                                     '%s...' 
% os.path.basename(pr)) +                        self.changelog_done_func(pr) +                        changes.remove(pr) + +            if changes: +                logging.debug('processing changes %s' % repr(changes)) +                self.process(changes) + +        # If TS returned from history_changelog is < register_time +        # then FS crawl may be required, since history is only available +        # till TS returned from history_changelog +        if ts < self.changelog_register_time: +            raise PartialHistoryAvailable(str(ts))  class GMasterXsyncMixin(GMasterChangelogMixin): diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index e3cf33ffdc5..aaf257e9c71 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -33,6 +33,8 @@ from master import gmaster_builder  import syncdutils  from syncdutils import GsyncdError, select, privileged, boolify, funcode  from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat +from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable +from libgfchangelog import ChangelogException  UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')  HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) @@ -683,6 +685,22 @@ class Server(object):          Changes.cl_done(clfile)      @classmethod +    def history_changelog(cls, changelog_path, start, end): +        return Changes.cl_history_changelog(changelog_path, start, end) + +    @classmethod +    def history_changelog_scan(cls): +        return Changes.cl_history_scan() + +    @classmethod +    def history_changelog_getchanges(cls): +        return Changes.cl_history_getchanges() + +    @classmethod +    def history_changelog_done(cls, clfile): +        Changes.cl_history_done(clfile) + +    @classmethod      @_pathguard      def setattr(cls, path, adct):          """set file attributes @@ -1213,7 +1231,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):          
"""return a tuple of the 'one shot' and the 'main crawl'          class instance"""          return (gmaster_builder('xsync')(self, slave), -                gmaster_builder()(self, slave)) +                gmaster_builder()(self, slave), +                gmaster_builder('changeloghistory')(self, slave))      def service_loop(self, *args):          """enter service loop @@ -1277,20 +1296,55 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                                                    mark)                          ),                          slave.server) -                (g1, g2) = self.gmaster_instantiate_tuple(slave) +                (g1, g2, g3) = self.gmaster_instantiate_tuple(slave)                  g1.master.server = brickserver                  g2.master.server = brickserver +                g3.master.server = brickserver              else: -                (g1, g2) = self.gmaster_instantiate_tuple(slave) +                (g1, g2, g3) = self.gmaster_instantiate_tuple(slave)                  g1.master.server.aggregated = gmaster.master.server                  g2.master.server.aggregated = gmaster.master.server +                g3.master.server.aggregated = gmaster.master.server              # bad bad bad: bad way to do things like this              # need to make this elegant              # register the crawlers and start crawling +            # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default) +            # g3 ==> changelog History              g1.register() -            g2.register() -            g1.crawlwrap(oneshot=True) -            g2.crawlwrap() +            try: +                g2.register() +                g3.register() +            except ChangelogException as e: +                logging.debug("Changelog register failed: %s - %s" % +                              (e.errno, e.strerror)) + +            # oneshot: Try to use changelog history api, if not +            # available switch to FS crawl +            # Note: if 
config.change_detector is xsync then +            # it will not use changelog history api +            try: +                g3.crawlwrap(oneshot=True) +            except (ChangelogException, NoPurgeTimeAvailable, +                    PartialHistoryAvailable) as e: +                if isinstance(e, ChangelogException): +                    logging.debug('Changelog history crawl failed, failback ' +                                  'to xsync: %s - %s' % (e.errno, e.strerror)) +                elif isinstance(e, NoPurgeTimeAvailable): +                    logging.debug('Using xsync crawl since no purge time ' +                                  'available') +                elif isinstance(e, PartialHistoryAvailable): +                    logging.debug('Using xsync crawl after consuming history ' +                                  'till %s' % str(e)) +                g1.crawlwrap(oneshot=True) + +            # crawl loop: Try changelog crawl, if failed +            # switch to FS crawl +            try: +                g2.crawlwrap() +            except ChangelogException as e: +                logging.debug('Changelog crawl failed, failback to xsync: ' +                              '%s - %s' % (e.errno, e.strerror)) +                g1.crawlwrap()          else:              sup(self, *args) diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 822d919ecb1..d4ded39f562 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -488,3 +488,11 @@ def lstat(e):              return ex.errno          else:              raise + + +class NoPurgeTimeAvailable(Exception): +    pass + + +class PartialHistoryAvailable(Exception): +    pass  | 
