summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/master.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--geo-replication/syncdaemon/master.py93
1 files changed, 75 insertions, 18 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 4301396f9f4..3047c99050e 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -25,6 +25,7 @@ from tempfile import NamedTemporaryFile
from syncdutils import Thread, GsyncdError, boolify, escape
from syncdutils import unescape, select, gauxpfx, md5hex, selfkill
from syncdutils import lstat, errno_wrap
+from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
URXTIME = (-1, 0)
@@ -904,7 +905,7 @@ class GMasterChangelogMixin(GMasterCommon):
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
- map(self.master.server.changelog_done, changes)
+ map(self.changelog_done_func, changes)
self.update_worker_files_syncd()
break
@@ -923,7 +924,7 @@ class GMasterChangelogMixin(GMasterCommon):
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
- map(self.master.server.changelog_done, changes)
+ map(self.changelog_done_func, changes)
break
# it's either entry_ops() or Rsync that failed to do it's
# job. Mostly it's entry_ops() [which currently has a problem
@@ -1106,12 +1107,9 @@ class GMasterChangelogMixin(GMasterCommon):
purge_time = self.xtime('.', self.slave)
if isinstance(purge_time, int):
purge_time = None
- try:
- self.master.server.changelog_scan()
- self.crawls += 1
- except OSError:
- self.fallback_xsync()
- self.update_worker_crawl_status("Hybrid Crawl")
+
+ self.master.server.changelog_scan()
+ self.crawls += 1
changes = self.master.server.changelog_getchanges()
if changes:
if purge_time:
@@ -1124,23 +1122,82 @@ class GMasterChangelogMixin(GMasterCommon):
os.path.basename(pr))
self.master.server.changelog_done(pr)
changes.remove(pr)
- logging.debug('processing changes %s' % repr(changes))
+
if changes:
+ logging.debug('processing changes %s' % repr(changes))
self.process(changes)
def register(self):
(workdir, logfile) = self.setup_working_dir()
self.sleep_interval = int(gconf.change_interval)
+ self.changelog_done_func = self.master.server.changelog_done
# register with the changelog library
- try:
- # 9 == log level (DEBUG)
- # 5 == connection retries
- self.master.server.changelog_register(gconf.local_path,
- workdir, logfile, 9, 5)
- except OSError:
- self.fallback_xsync()
- # control should not reach here
- raise
+ # 9 == log level (DEBUG)
+ # 5 == connection retries
+ self.master.server.changelog_register(gconf.local_path,
+ workdir, logfile, 9, 5)
+
+
+class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
+ def register(self):
+ super(GMasterChangeloghistoryMixin, self).register()
+ self.changelog_register_time = int(time.time())
+ self.changelog_done_func = self.master.server.history_changelog_done
+
+ def crawl(self):
+ self.update_worker_crawl_status("History Crawl")
+
+ # get stime (from the brick) and purge changelogs
+ # that are _historical_ to that time.
+ purge_time = self.xtime('.', self.slave)
+ if isinstance(purge_time, int):
+ purge_time = None
+
+ if not purge_time or purge_time == URXTIME:
+ raise NoPurgeTimeAvailable()
+
+ logging.debug("Get changelog history between %s and %s" %
+ (purge_time[0], self.changelog_register_time))
+
+ # Changelogs backend path is hardcoded as
+ # <BRICK_PATH>/.glusterfs/changelogs, if user configured to different
+ # location then consuming history will not work(Known issue as of now)
+ changelog_path = os.path.join(gconf.local_path,
+ ".glusterfs/changelogs")
+ ts = self.master.server.history_changelog(changelog_path,
+ purge_time[0],
+ self.changelog_register_time)
+
+ # scan followed by getchanges till scan returns zero.
+ # history_changelog_scan() is blocking call, till it gets the number
+ # of changelogs to process. Returns zero when no changelogs
+ # to be processed. returns positive value as number of changelogs
+ # to be processed, which will be fetched using
+ # history_changelog_getchanges()
+ while self.master.server.history_changelog_scan() > 0:
+ self.crawls += 1
+
+ changes = self.master.server.history_changelog_getchanges()
+ if changes:
+ if purge_time:
+ logging.info("slave's time: %s" % repr(purge_time))
+ processed = [x for x in changes
+ if int(x.split('.')[-1]) < purge_time[0]]
+ for pr in processed:
+ logging.info('skipping already processed change: '
+ '%s...' % os.path.basename(pr))
+ self.changelog_done_func(pr)
+ changes.remove(pr)
+
+ if changes:
+ logging.debug('processing changes %s' % repr(changes))
+ self.process(changes)
+
+ # If TS returned from history_changelog is < register_time
+ # then FS crawl may be required, since history is only available
+ # till TS returned from history_changelog
+ if ts < self.changelog_register_time:
+ raise PartialHistoryAvailable(str(ts))
class GMasterXsyncMixin(GMasterChangelogMixin):