summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/master.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--geo-replication/syncdaemon/master.py101
1 files changed, 82 insertions, 19 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 2decc5de930..e3904736ba2 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -29,6 +29,14 @@ from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
URXTIME = (-1, 0)
+# Default rollover time set in changelog translator
+# changelog rollover time is hardcoded here to avoid the
+# xsync usage when crawling switch happens from history
+# to changelog. If rollover time increased in translator
+# then geo-rep can enter into xsync crawl after history
+# crawl before starting live changelog crawl.
+CHANGELOG_ROLLOVER_TIME = 15
+
# Utility functions to help us to get to closer proximity
# of the DRY principle (no, don't look for elevated or
# perspectivistic things here)
@@ -160,7 +168,10 @@ class NormalMixin(object):
raise GsyncdError("timestamp corruption for " + path)
def need_sync(self, e, xte, xtrd):
- return xte > xtrd
+ if self.xsync_upper_limit is None:
+ return xte > xtrd
+ else:
+ return xte > xtrd and xte < self.xsync_upper_limit
def set_slave_xtime(self, path, mark):
self.slave.server.set_stime(path, self.uuid, mark)
@@ -431,7 +442,7 @@ class GMasterCommon(object):
def register(self):
self.register()
- def crawlwrap(self, oneshot=False):
+ def crawlwrap(self, oneshot=False, no_stime_update=False):
if oneshot:
# it's important to do this during the oneshot crawl as
# for a passive gsyncd (ie. in a replicate scenario)
@@ -499,7 +510,7 @@ class GMasterCommon(object):
time.sleep(5)
continue
self.update_worker_health("Active")
- self.crawl()
+ self.crawl(no_stime_update=no_stime_update)
if oneshot:
return
time.sleep(self.sleep_interval)
@@ -1119,7 +1130,7 @@ class GMasterChangelogMixin(GMasterCommon):
except:
raise
- def crawl(self):
+ def crawl(self, no_stime_update=False):
self.update_worker_crawl_status("Changelog Crawl")
changes = []
# get stime (from the brick) and purge changelogs
@@ -1147,20 +1158,25 @@ class GMasterChangelogMixin(GMasterCommon):
logging.debug('processing changes %s' % repr(changes))
self.process(changes)
- def register(self, changelog_agent):
+ def register(self, register_time, changelog_agent):
self.changelog_agent = changelog_agent
self.sleep_interval = int(gconf.change_interval)
self.changelog_done_func = self.changelog_agent.done
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
- def register(self, changelog_agent):
+ def register(self, register_time, changelog_agent):
self.changelog_agent = changelog_agent
- self.changelog_register_time = int(time.time())
+ self.changelog_register_time = register_time
+ self.history_crawl_start_time = register_time
self.changelog_done_func = self.changelog_agent.history_done
+ self.history_turns = 0
- def crawl(self):
+ def crawl(self, no_stime_update=False):
+ self.history_turns += 1
self.update_worker_crawl_status("History Crawl")
+ logging.info('starting history crawl... turns: %s' %
+ self.history_turns)
# get stime (from the brick) and purge changelogs
# that are _historical_ to that time.
@@ -1169,11 +1185,9 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
purge_time = None
if not purge_time or purge_time == URXTIME:
+ logging.info("stime not available, abandoning history crawl")
raise NoPurgeTimeAvailable()
- logging.debug("Get changelog history between %s and %s" %
- (purge_time[0], self.changelog_register_time))
-
# Changelogs backend path is hardcoded as
# <BRICK_PATH>/.glusterfs/changelogs, if user configured to different
# location then consuming history will not work(Known issue as of now)
@@ -1210,11 +1224,26 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
logging.debug('processing changes %s' % repr(changes))
self.process(changes)
+ history_turn_time = int(time.time()) - self.history_crawl_start_time
+
+ logging.info('finished history crawl syncing between %s - %s.' %
+ (purge_time[0], actual_end))
+
# If TS returned from history_changelog is < register_time
# then FS crawl may be required, since history is only available
# till TS returned from history_changelog
if actual_end < self.changelog_register_time:
- raise PartialHistoryAvailable(str(actual_end))
+ if self.history_turns < 2:
+ sleep_time = 1
+ if history_turn_time < CHANGELOG_ROLLOVER_TIME:
+ sleep_time = CHANGELOG_ROLLOVER_TIME - history_turn_time
+ time.sleep(sleep_time)
+ self.history_crawl_start_time = int(time.time())
+ self.crawl()
+ else:
+                # This exception will be caught in resource.py and
+                # we fall back to xsync for the small gap.
+ raise PartialHistoryAvailable(str(actual_end))
class GMasterXsyncMixin(GMasterChangelogMixin):
@@ -1231,7 +1260,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
XSYNC_MAX_ENTRIES = 1 << 13
- def register(self, changelog_agent=None):
+ def register(self, register_time, changelog_agent=None):
self.counter = 0
self.comlist = []
self.stimes = []
@@ -1248,7 +1277,18 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
else:
raise
- def crawl(self):
+ # After changelogs history processing completes, it switches
+    # to xsync/hybrid crawl if history actual end time is less than
+ # live changelog register time. Xsync should only run for that
+    # small gap, i.e. changelog_register_time - history_actual_end_time
+ # If we don't have upper limit to limit the XSync change detection
+ # It will keep on detecting the files even though changelogs are
+ # available for the same. Set upper limit during register
+    # and reset at the end of each crawl. Resetting at the end of
+ # crawl is required if change_detector is set to xsync.
+ self.xsync_upper_limit = (register_time, 0)
+
+ def crawl(self, no_stime_update=False):
"""
event dispatcher thread
@@ -1265,19 +1305,36 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
try:
item = self.comlist.pop(0)
if item[0] == 'finale':
- logging.info('finished hybrid crawl syncing')
+ if self.xsync_upper_limit is not None:
+ logging.info('finished hybrid crawl syncing, endtime: '
+ '%s' % self.xsync_upper_limit[0])
+ else:
+ logging.info('finished hybrid crawl syncing')
+
break
elif item[0] == 'xsync':
logging.info('processing xsync changelog %s' % (item[1]))
self.process([item[1]], 0)
elif item[0] == 'stime':
- logging.debug('setting slave time: %s' % repr(item[1]))
- self.upd_stime(item[1][1], item[1][0])
+ if not no_stime_update:
+ # xsync is started after running history but if
+ # history actual end time is less than register time
+ # then if we update stime, live changelog processing
+ # will skip the changelogs for which TS is less than
+ # stime. During this deletes and renames are not
+                        # propagated. By not setting stime live changelog will
+ # start processing from the register time. Since we
+                        # have xsync_upper_limit there will not be much
+ # overlap/redo of changelogs.
+ logging.debug('setting slave time: %s' % repr(item[1]))
+ self.upd_stime(item[1][1], item[1][0])
else:
logging.warn('unknown tuple in comlist (%s)' % repr(item))
except IndexError:
time.sleep(1)
+ self.xsync_upper_limit = None
+
def write_entry_change(self, prefix, data=[]):
self.fh.write("%s %s\n" % (prefix, ' '.join(data)))
@@ -1402,7 +1459,10 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.write_entry_change("M", [gfid, "SETATTR", str(st.st_uid),
str(st.st_gid), str(st.st_mode)])
self.Xcrawl(e, xtr_root)
- self.stimes.append((e, xte))
+ stime_to_update = xte
+ if self.xsync_upper_limit is not None:
+ stime_to_update = min(self.xsync_upper_limit, xte)
+ self.stimes.append((e, stime_to_update))
elif stat.S_ISLNK(mo):
self.write_entry_change(
"E", [gfid, 'SYMLINK', escape(os.path.join(pargfid,
@@ -1426,7 +1486,10 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
bname))])
self.write_entry_change("D", [gfid])
if path == '.':
- self.stimes.append((path, xtl))
+ stime_to_update = xtl
+ if self.xsync_upper_limit is not None:
+ stime_to_update = min(self.xsync_upper_limit, xtl)
+ self.stimes.append((path, stime_to_update))
self.sync_done(self.stimes, True)