-rw-r--r--  geo-replication/syncdaemon/master.py   | 101
-rw-r--r--  geo-replication/syncdaemon/resource.py |  55
2 files changed, 118 insertions(+), 38 deletions(-)
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 2decc5de930..e3904736ba2 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -29,6 +29,14 @@ from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
URXTIME = (-1, 0)
+# Default rollover time set in the changelog translator.
+# The changelog rollover time is hardcoded here to avoid
+# falling back to xsync when the crawl switches from history
+# to live changelog. If the rollover time is increased in the
+# translator, geo-rep can enter an xsync crawl after the
+# history crawl, before the live changelog crawl starts.
+CHANGELOG_ROLLOVER_TIME = 15
+
# Utility functions to help us to get to closer proximity
# of the DRY principle (no, don't look for elevated or
# perspectivistic things here)
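The retry logic later in this patch waits out exactly one rollover window before re-running the history crawl. A minimal standalone sketch of that wait, assuming only the constant above (rollover_wait and the sample timing are illustrative, not part of the patch):

    import time

    CHANGELOG_ROLLOVER_TIME = 15  # seconds, as hardcoded above

    def rollover_wait(crawl_start_time):
        # Sleep long enough for the changelog translator to roll over
        # once more, so a second history crawl can pick up entries
        # published after the first crawl began; never less than 1s.
        elapsed = int(time.time()) - crawl_start_time
        return max(1, CHANGELOG_ROLLOVER_TIME - elapsed)

    # e.g. a history crawl that took 4s waits 11s before retrying
    time.sleep(rollover_wait(int(time.time()) - 4))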
@@ -160,7 +168,10 @@ class NormalMixin(object):
raise GsyncdError("timestamp corruption for " + path)
def need_sync(self, e, xte, xtrd):
- return xte > xtrd
+ if self.xsync_upper_limit is None:
+ return xte > xtrd
+ else:
+ return xte > xtrd and xte < self.xsync_upper_limit
def set_slave_xtime(self, path, mark):
self.slave.server.set_stime(path, self.uuid, mark)
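A standalone sketch of the bounded comparison above, assuming xtimes are (seconds, nanoseconds) tuples compared lexicographically, as with URXTIME earlier in this file (the sample values are invented):

    def need_sync(xte, xtrd, xsync_upper_limit=None):
        # Sync an entry only if its xtime is newer than the slave's
        # reflected xtime and, when a bound is set, older than the
        # point where live changelogs take over.
        if xsync_upper_limit is None:
            return xte > xtrd
        return xtrd < xte < xsync_upper_limit

    upper = (1400000000, 0)  # (register_time, 0)
    print(need_sync((1399999999, 5), (1399999000, 0), upper))  # True
    print(need_sync((1400000001, 0), (1399999000, 0), upper))  # False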
@@ -431,7 +442,7 @@ class GMasterCommon(object):
def register(self):
self.register()
- def crawlwrap(self, oneshot=False):
+ def crawlwrap(self, oneshot=False, no_stime_update=False):
if oneshot:
# it's important to do this during the oneshot crawl as
# for a passive gsyncd (ie. in a replicate scenario)
@@ -499,7 +510,7 @@ class GMasterCommon(object):
time.sleep(5)
continue
self.update_worker_health("Active")
- self.crawl()
+ self.crawl(no_stime_update=no_stime_update)
if oneshot:
return
time.sleep(self.sleep_interval)
@@ -1119,7 +1130,7 @@ class GMasterChangelogMixin(GMasterCommon):
except:
raise
- def crawl(self):
+ def crawl(self, no_stime_update=False):
self.update_worker_crawl_status("Changelog Crawl")
changes = []
# get stime (from the brick) and purge changelogs
@@ -1147,20 +1158,25 @@ class GMasterChangelogMixin(GMasterCommon):
logging.debug('processing changes %s' % repr(changes))
self.process(changes)
- def register(self, changelog_agent):
+ def register(self, register_time, changelog_agent):
self.changelog_agent = changelog_agent
self.sleep_interval = int(gconf.change_interval)
self.changelog_done_func = self.changelog_agent.done
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
- def register(self, changelog_agent):
+ def register(self, register_time, changelog_agent):
self.changelog_agent = changelog_agent
- self.changelog_register_time = int(time.time())
+ self.changelog_register_time = register_time
+ self.history_crawl_start_time = register_time
self.changelog_done_func = self.changelog_agent.history_done
+ self.history_turns = 0
- def crawl(self):
+ def crawl(self, no_stime_update=False):
+ self.history_turns += 1
self.update_worker_crawl_status("History Crawl")
+ logging.info('starting history crawl... turns: %s' %
+ self.history_turns)
# get stime (from the brick) and purge changelogs
# that are _historical_ to that time.
@@ -1169,11 +1185,9 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
purge_time = None
if not purge_time or purge_time == URXTIME:
+ logging.info("stime not available, abandoning history crawl")
raise NoPurgeTimeAvailable()
- logging.debug("Get changelog history between %s and %s" %
- (purge_time[0], self.changelog_register_time))
-
# Changelogs backend path is hardcoded as
# <BRICK_PATH>/.glusterfs/changelogs, if user configured to different
# location then consuming history will not work(Known issue as of now)
@@ -1210,11 +1224,26 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
logging.debug('processing changes %s' % repr(changes))
self.process(changes)
+ history_turn_time = int(time.time()) - self.history_crawl_start_time
+
+ logging.info('finished history crawl syncing between %s - %s.' %
+ (purge_time[0], actual_end))
+
# If TS returned from history_changelog is < register_time
# then FS crawl may be required, since history is only available
# till TS returned from history_changelog
if actual_end < self.changelog_register_time:
- raise PartialHistoryAvailable(str(actual_end))
+ if self.history_turns < 2:
+ sleep_time = 1
+ if history_turn_time < CHANGELOG_ROLLOVER_TIME:
+ sleep_time = CHANGELOG_ROLLOVER_TIME - history_turn_time
+ time.sleep(sleep_time)
+ self.history_crawl_start_time = int(time.time())
+ self.crawl()
+ else:
+ # This exception will be caught in resource.py, which
+ # falls back to xsync for the small remaining gap.
+ raise PartialHistoryAvailable(str(actual_end))
class GMasterXsyncMixin(GMasterChangelogMixin):
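Condensed, the turn logic added above behaves like this sketch (a hypothetical standalone form; the two-turn limit, the rollover-aware sleep, and PartialHistoryAvailable come from the patch, everything else is illustrative):

    import time

    CHANGELOG_ROLLOVER_TIME = 15

    class PartialHistoryAvailable(Exception):
        # stand-in for syncdutils.PartialHistoryAvailable
        pass

    def history_turn(actual_end, register_time, turns, turn_time):
        # Retry once: wait out the rollover window so freshly rolled
        # changelogs can close the gap; after two turns, hand the
        # residue to xsync via PartialHistoryAvailable.
        if actual_end >= register_time:
            return 'done'
        if turns < 2:
            time.sleep(max(1, CHANGELOG_ROLLOVER_TIME - turn_time))
            return 'retry'
        raise PartialHistoryAvailable(str(actual_end))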
@@ -1231,7 +1260,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
XSYNC_MAX_ENTRIES = 1 << 13
- def register(self, changelog_agent=None):
+ def register(self, register_time, changelog_agent=None):
self.counter = 0
self.comlist = []
self.stimes = []
@@ -1248,7 +1277,18 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
else:
raise
- def crawl(self):
+ # After changelog history processing completes, geo-rep switches
+ # to the xsync/hybrid crawl if the history's actual end time is
+ # less than the live changelog register time. Xsync should only
+ # run for that small gap, i.e.
+ # changelog_register_time - history_actual_end_time.
+ # Without an upper limit on xsync change detection, xsync would
+ # keep detecting files even though changelogs cover the same
+ # period. Set the upper limit during register and reset it at
+ # the end of each crawl. Resetting at the end of the crawl is
+ # required when change_detector is set to xsync.
+ self.xsync_upper_limit = (register_time, 0)
+
+ def crawl(self, no_stime_update=False):
"""
event dispatcher thread
@@ -1265,19 +1305,36 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
try:
item = self.comlist.pop(0)
if item[0] == 'finale':
- logging.info('finished hybrid crawl syncing')
+ if self.xsync_upper_limit is not None:
+ logging.info('finished hybrid crawl syncing, endtime: '
+ '%s' % self.xsync_upper_limit[0])
+ else:
+ logging.info('finished hybrid crawl syncing')
+
break
elif item[0] == 'xsync':
logging.info('processing xsync changelog %s' % (item[1]))
self.process([item[1]], 0)
elif item[0] == 'stime':
- logging.debug('setting slave time: %s' % repr(item[1]))
- self.upd_stime(item[1][1], item[1][0])
+ if not no_stime_update:
+ # xsync is started after running history, but if the
+ # history's actual end time is less than the register time
+ # and we update the stime, live changelog processing will
+ # skip the changelogs whose TS is less than the stime, and
+ # deletes and renames in that window are not propagated.
+ # By not setting the stime, the live changelog crawl starts
+ # processing from the register time. Since we have the
+ # xsync_upper_limit, there will not be much overlap/redo
+ # of changelogs.
+ logging.debug('setting slave time: %s' % repr(item[1]))
+ self.upd_stime(item[1][1], item[1][0])
else:
logging.warn('unknown tuple in comlist (%s)' % repr(item))
except IndexError:
time.sleep(1)
+ self.xsync_upper_limit = None
+
def write_entry_change(self, prefix, data=[]):
self.fh.write("%s %s\n" % (prefix, ' '.join(data)))
@@ -1402,7 +1459,10 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.write_entry_change("M", [gfid, "SETATTR", str(st.st_uid),
str(st.st_gid), str(st.st_mode)])
self.Xcrawl(e, xtr_root)
- self.stimes.append((e, xte))
+ stime_to_update = xte
+ if self.xsync_upper_limit is not None:
+ stime_to_update = min(self.xsync_upper_limit, xte)
+ self.stimes.append((e, stime_to_update))
elif stat.S_ISLNK(mo):
self.write_entry_change(
"E", [gfid, 'SYMLINK', escape(os.path.join(pargfid,
@@ -1426,7 +1486,10 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
bname))])
self.write_entry_change("D", [gfid])
if path == '.':
- self.stimes.append((path, xtl))
+ stime_to_update = xtl
+ if self.xsync_upper_limit is not None:
+ stime_to_update = min(self.xsync_upper_limit, xtl)
+ self.stimes.append((path, stime_to_update))
self.sync_done(self.stimes, True)
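Both hunks above clamp the recorded stime the same way; a minimal sketch of that clamping with tuple xtimes (the values are invented):

    def clamp_stime(xtime, xsync_upper_limit=None):
        # Never record an stime beyond the xsync upper limit, so the
        # live changelog crawl does not skip changelogs covering the
        # same window.
        if xsync_upper_limit is None:
            return xtime
        return min(xsync_upper_limit, xtime)

    upper = (1400000000, 0)
    print(clamp_stime((1400000042, 7), upper))  # -> (1400000000, 0)
    print(clamp_stime((1399999999, 0), upper))  # unchanged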
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index b537ff65003..c84265739c5 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -1267,6 +1267,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
# register the crawlers and start crawling
# g1 ==> Xsync, g2 ==> config.change_detector(changelog by default)
# g3 ==> changelog History
+ changelog_register_failed = False
(inf, ouf, ra, wa) = gconf.rpc_fd.split(',')
os.close(int(ra))
os.close(int(wa))
@@ -1278,7 +1279,6 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
"local %s, remote %s" %
(CHANGELOG_AGENT_CLIENT_VERSION, rv))
- g1.register()
try:
workdir = g2.setup_working_dir()
# register with the changelog library
@@ -1288,38 +1288,55 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
workdir, gconf.changelog_log_file,
g2.CHANGELOG_LOG_LEVEL,
g2.CHANGELOG_CONN_RETRIES)
- g2.register(changelog_agent)
- g3.register(changelog_agent)
- except ChangelogException as e:
- logging.debug("Changelog register failed: %s - %s" %
- (e.errno, e.strerror))
-
+ register_time = int(time.time())
+ g2.register(register_time, changelog_agent)
+ g3.register(register_time, changelog_agent)
+ except ChangelogException:
+ changelog_register_failed = True
+ register_time = int(time.time())
+ logging.info("Changelog register failed, fallback to xsync")
+
+ g1.register(register_time)
+ logging.info("Register time: %s" % register_time)
# oneshot: Try to use changelog history api, if not
# available switch to FS crawl
# Note: if config.change_detector is xsync then
# it will not use changelog history api
try:
- g3.crawlwrap(oneshot=True)
+ if not changelog_register_failed:
+ g3.crawlwrap(oneshot=True)
+ else:
+ g1.crawlwrap(oneshot=True)
except (ChangelogException, NoPurgeTimeAvailable,
PartialHistoryAvailable) as e:
if isinstance(e, ChangelogException):
- logging.debug('Changelog history crawl failed, failback '
- 'to xsync: %s - %s' % (e.errno, e.strerror))
- elif isinstance(e, NoPurgeTimeAvailable):
- logging.debug('Using xsync crawl since no purge time '
- 'available')
+ logging.info('Changelog history crawl failed, falling '
+ 'back to xsync: %s - %s' % (e.errno, e.strerror))
elif isinstance(e, PartialHistoryAvailable):
- logging.debug('Using xsync crawl after consuming history '
- 'till %s' % str(e))
- g1.crawlwrap(oneshot=True)
+ logging.info('Partial history available, using xsync crawl'
+ ' after consuming history '
+ 'till %s' % str(e))
+ g1.crawlwrap(oneshot=True, no_stime_update=True)
+
+ # Reset the xsync upper limit. g2 and g3 are the changelog and
+ # history instances, but if change_detector is set to xsync
+ # then g1, g2 and g3 are all xsync instances.
+ g1.xsync_upper_limit = None
+ if getattr(g2, "xsync_upper_limit", None) is not None:
+ g2.xsync_upper_limit = None
+
+ if getattr(g3, "xsync_upper_limit", None) is not None:
+ g3.xsync_upper_limit = None
# crawl loop: Try changelog crawl, if failed
# switch to FS crawl
try:
- g2.crawlwrap()
+ if not changelog_register_failed:
+ g2.crawlwrap()
+ else:
+ g1.crawlwrap()
except ChangelogException as e:
- logging.debug('Changelog crawl failed, failback to xsync: '
- '%s - %s' % (e.errno, e.strerror))
+ logging.info('Changelog crawl failed, falling back to xsync')
g1.crawlwrap()
else:
sup(self, *args)
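Taken together, the resource.py changes reduce the crawl dispatch to roughly this sketch (g1/g2/g3 and the exception classes stand in for the real objects; the bare stand-in classes are illustrative only):

    class ChangelogException(Exception): pass
    class NoPurgeTimeAvailable(Exception): pass
    class PartialHistoryAvailable(Exception): pass

    def crawl_dispatch(g1, g2, g3, changelog_register_failed):
        # oneshot: prefer the history crawl; any gap left by history
        # is covered by xsync without moving the stime forward
        try:
            if not changelog_register_failed:
                g3.crawlwrap(oneshot=True)
            else:
                g1.crawlwrap(oneshot=True)
        except (ChangelogException, NoPurgeTimeAvailable,
                PartialHistoryAvailable):
            g1.crawlwrap(oneshot=True, no_stime_update=True)
        # xsync bounded its window; unbound it before the live crawl
        for g in (g1, g2, g3):
            if getattr(g, 'xsync_upper_limit', None) is not None:
                g.xsync_upper_limit = None
        # steady state: live changelog crawl, xsync as last resort
        try:
            if not changelog_register_failed:
                g2.crawlwrap()
            else:
                g1.crawlwrap()
        except ChangelogException:
            g1.crawlwrap()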