summaryrefslogtreecommitdiffstats
path: root/geo-replication
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2014-06-23 13:43:20 +0530
committerVijay Bellur <vbellur@redhat.com>2014-09-08 10:41:59 -0700
commitfca81b1300e2afdf3eb7cb75428657a31e92bc00 (patch)
treedbc364a6d5ca2501879eae2afff03a89dce70bfb /geo-replication
parent0db65a084608a9deb0d0917f084e0d55e23e54a7 (diff)
geo-rep: minimize xsync crawl usage and set upper limit to xsync crawl
For effective handling of deletes and renames use history crawl as much as possible. History crawl will run in loop till it syncs all data before live changelog time. When it uses xsync crawl(fallback when changelog not available, or very first crawl) it sets upper limit to crawl. After completing History crawl, it checks actual end time returned by history api to compare with register time, if actual end is less than register time then run history crawl one more time. If first turn history processing time is less than the CHANGELOG ROLLOVER TIME then sleep for the difference, After sleep if it is guaranteed that rollover will happen and switches to live changelog consumption without switching to xsync. This sleep is only when history processing completed < CHANGELOG_ROLLOVER_TIME and sleep only after the first turn, So will not affect the performance. BUG: 1138952 Change-Id: Ida024211d312f60f0e8190805e7469b2165f00e1 Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/8151 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Venky Shankar <vshankar@redhat.com> Tested-by: Venky Shankar <vshankar@redhat.com> Reviewed-on: http://review.gluster.org/8639 Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'geo-replication')
-rw-r--r--geo-replication/syncdaemon/master.py101
-rw-r--r--geo-replication/syncdaemon/resource.py55
2 files changed, 118 insertions, 38 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 2decc5de930..e3904736ba2 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -29,6 +29,14 @@ from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
URXTIME = (-1, 0)
+# Default rollover time set in changelog translator
+# changelog rollover time is hardcoded here to avoid the
+# xsync usage when crawling switch happens from history
+# to changelog. If rollover time increased in translator
+# then geo-rep can enter into xsync crawl after history
+# crawl before starting live changelog crawl.
+CHANGELOG_ROLLOVER_TIME = 15
+
# Utility functions to help us to get to closer proximity
# of the DRY principle (no, don't look for elevated or
# perspectivistic things here)
@@ -160,7 +168,10 @@ class NormalMixin(object):
raise GsyncdError("timestamp corruption for " + path)
def need_sync(self, e, xte, xtrd):
- return xte > xtrd
+ if self.xsync_upper_limit is None:
+ return xte > xtrd
+ else:
+ return xte > xtrd and xte < self.xsync_upper_limit
def set_slave_xtime(self, path, mark):
self.slave.server.set_stime(path, self.uuid, mark)
@@ -431,7 +442,7 @@ class GMasterCommon(object):
def register(self):
self.register()
- def crawlwrap(self, oneshot=False):
+ def crawlwrap(self, oneshot=False, no_stime_update=False):
if oneshot:
# it's important to do this during the oneshot crawl as
# for a passive gsyncd (ie. in a replicate scenario)
@@ -499,7 +510,7 @@ class GMasterCommon(object):
time.sleep(5)
continue
self.update_worker_health("Active")
- self.crawl()
+ self.crawl(no_stime_update=no_stime_update)
if oneshot:
return
time.sleep(self.sleep_interval)
@@ -1119,7 +1130,7 @@ class GMasterChangelogMixin(GMasterCommon):
except:
raise
- def crawl(self):
+ def crawl(self, no_stime_update=False):
self.update_worker_crawl_status("Changelog Crawl")
changes = []
# get stime (from the brick) and purge changelogs
@@ -1147,20 +1158,25 @@ class GMasterChangelogMixin(GMasterCommon):
logging.debug('processing changes %s' % repr(changes))
self.process(changes)
- def register(self, changelog_agent):
+ def register(self, register_time, changelog_agent):
self.changelog_agent = changelog_agent
self.sleep_interval = int(gconf.change_interval)
self.changelog_done_func = self.changelog_agent.done
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
- def register(self, changelog_agent):
+ def register(self, register_time, changelog_agent):
self.changelog_agent = changelog_agent
- self.changelog_register_time = int(time.time())
+ self.changelog_register_time = register_time
+ self.history_crawl_start_time = register_time
self.changelog_done_func = self.changelog_agent.history_done
+ self.history_turns = 0
- def crawl(self):
+ def crawl(self, no_stime_update=False):
+ self.history_turns += 1
self.update_worker_crawl_status("History Crawl")
+ logging.info('starting history crawl... turns: %s' %
+ self.history_turns)
# get stime (from the brick) and purge changelogs
# that are _historical_ to that time.
@@ -1169,11 +1185,9 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
purge_time = None
if not purge_time or purge_time == URXTIME:
+ logging.info("stime not available, abandoning history crawl")
raise NoPurgeTimeAvailable()
- logging.debug("Get changelog history between %s and %s" %
- (purge_time[0], self.changelog_register_time))
-
# Changelogs backend path is hardcoded as
# <BRICK_PATH>/.glusterfs/changelogs, if user configured to different
# location then consuming history will not work(Known issue as of now)
@@ -1210,11 +1224,26 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
logging.debug('processing changes %s' % repr(changes))
self.process(changes)
+ history_turn_time = int(time.time()) - self.history_crawl_start_time
+
+ logging.info('finished history crawl syncing between %s - %s.' %
+ (purge_time[0], actual_end))
+
# If TS returned from history_changelog is < register_time
# then FS crawl may be required, since history is only available
# till TS returned from history_changelog
if actual_end < self.changelog_register_time:
- raise PartialHistoryAvailable(str(actual_end))
+ if self.history_turns < 2:
+ sleep_time = 1
+ if history_turn_time < CHANGELOG_ROLLOVER_TIME:
+ sleep_time = CHANGELOG_ROLLOVER_TIME - history_turn_time
+ time.sleep(sleep_time)
+ self.history_crawl_start_time = int(time.time())
+ self.crawl()
+ else:
+ # This exeption will be catched in resource.py and
+ # fallback to xsync for the small gap.
+ raise PartialHistoryAvailable(str(actual_end))
class GMasterXsyncMixin(GMasterChangelogMixin):
@@ -1231,7 +1260,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
XSYNC_MAX_ENTRIES = 1 << 13
- def register(self, changelog_agent=None):
+ def register(self, register_time, changelog_agent=None):
self.counter = 0
self.comlist = []
self.stimes = []
@@ -1248,7 +1277,18 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
else:
raise
- def crawl(self):
+ # After changelogs history processing completes, it switches
+ # to xsync/hibrid crawl if history actual end time is less than
+ # live changelog register time. Xsync should only run for that
+ # small gap, ie.. changelog_register_time - history_actual_end_time
+ # If we don't have upper limit to limit the XSync change detection
+ # It will keep on detecting the files even though changelogs are
+ # available for the same. Set upper limit during register
+ # and reset at the end of each crawl. Reseting at the end of
+ # crawl is required if change_detector is set to xsync.
+ self.xsync_upper_limit = (register_time, 0)
+
+ def crawl(self, no_stime_update=False):
"""
event dispatcher thread
@@ -1265,19 +1305,36 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
try:
item = self.comlist.pop(0)
if item[0] == 'finale':
- logging.info('finished hybrid crawl syncing')
+ if self.xsync_upper_limit is not None:
+ logging.info('finished hybrid crawl syncing, endtime: '
+ '%s' % self.xsync_upper_limit[0])
+ else:
+ logging.info('finished hybrid crawl syncing')
+
break
elif item[0] == 'xsync':
logging.info('processing xsync changelog %s' % (item[1]))
self.process([item[1]], 0)
elif item[0] == 'stime':
- logging.debug('setting slave time: %s' % repr(item[1]))
- self.upd_stime(item[1][1], item[1][0])
+ if not no_stime_update:
+ # xsync is started after running history but if
+ # history actual end time is less than register time
+ # then if we update stime, live changelog processing
+ # will skip the changelogs for which TS is less than
+ # stime. During this deletes and renames are not
+ # propogated. By not setting stime live changelog will
+ # start processing from the register time. Since we
+ # have xsync_upper_limit their will not be much
+ # overlap/redo of changelogs.
+ logging.debug('setting slave time: %s' % repr(item[1]))
+ self.upd_stime(item[1][1], item[1][0])
else:
logging.warn('unknown tuple in comlist (%s)' % repr(item))
except IndexError:
time.sleep(1)
+ self.xsync_upper_limit = None
+
def write_entry_change(self, prefix, data=[]):
self.fh.write("%s %s\n" % (prefix, ' '.join(data)))
@@ -1402,7 +1459,10 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.write_entry_change("M", [gfid, "SETATTR", str(st.st_uid),
str(st.st_gid), str(st.st_mode)])
self.Xcrawl(e, xtr_root)
- self.stimes.append((e, xte))
+ stime_to_update = xte
+ if self.xsync_upper_limit is not None:
+ stime_to_update = min(self.xsync_upper_limit, xte)
+ self.stimes.append((e, stime_to_update))
elif stat.S_ISLNK(mo):
self.write_entry_change(
"E", [gfid, 'SYMLINK', escape(os.path.join(pargfid,
@@ -1426,7 +1486,10 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
bname))])
self.write_entry_change("D", [gfid])
if path == '.':
- self.stimes.append((path, xtl))
+ stime_to_update = xtl
+ if self.xsync_upper_limit is not None:
+ stime_to_update = min(self.xsync_upper_limit, xtl)
+ self.stimes.append((path, stime_to_update))
self.sync_done(self.stimes, True)
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index b537ff65003..c84265739c5 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -1267,6 +1267,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
# register the crawlers and start crawling
# g1 ==> Xsync, g2 ==> config.change_detector(changelog by default)
# g3 ==> changelog History
+ changelog_register_failed = False
(inf, ouf, ra, wa) = gconf.rpc_fd.split(',')
os.close(int(ra))
os.close(int(wa))
@@ -1278,7 +1279,6 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
"local %s, remote %s" %
(CHANGELOG_AGENT_CLIENT_VERSION, rv))
- g1.register()
try:
workdir = g2.setup_working_dir()
# register with the changelog library
@@ -1288,38 +1288,55 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
workdir, gconf.changelog_log_file,
g2.CHANGELOG_LOG_LEVEL,
g2.CHANGELOG_CONN_RETRIES)
- g2.register(changelog_agent)
- g3.register(changelog_agent)
- except ChangelogException as e:
- logging.debug("Changelog register failed: %s - %s" %
- (e.errno, e.strerror))
-
+ register_time = int(time.time())
+ g2.register(register_time, changelog_agent)
+ g3.register(register_time, changelog_agent)
+ except ChangelogException:
+ changelog_register_failed = True
+ register_time = int(time.time())
+ logging.info("Changelog register failed, fallback to xsync")
+
+ g1.register(register_time)
+ logging.info("Register time: %s" % register_time)
# oneshot: Try to use changelog history api, if not
# available switch to FS crawl
# Note: if config.change_detector is xsync then
# it will not use changelog history api
try:
- g3.crawlwrap(oneshot=True)
+ if not changelog_register_failed:
+ g3.crawlwrap(oneshot=True)
+ else:
+ g1.crawlwrap(oneshot=True)
except (ChangelogException, NoPurgeTimeAvailable,
PartialHistoryAvailable) as e:
if isinstance(e, ChangelogException):
- logging.debug('Changelog history crawl failed, failback '
- 'to xsync: %s - %s' % (e.errno, e.strerror))
- elif isinstance(e, NoPurgeTimeAvailable):
- logging.debug('Using xsync crawl since no purge time '
- 'available')
+ logging.info('Changelog history crawl failed, fallback '
+ 'to xsync: %s - %s' % (e.errno, e.strerror))
elif isinstance(e, PartialHistoryAvailable):
- logging.debug('Using xsync crawl after consuming history '
- 'till %s' % str(e))
- g1.crawlwrap(oneshot=True)
+ logging.info('Partial history available, using xsync crawl'
+ ' after consuming history '
+ 'till %s' % str(e))
+ g1.crawlwrap(oneshot=True, no_stime_update=True)
+
+ # Reset xsync upper limit. g2, g3 are changelog and history
+ # instances, but if change_detector is set to xsync then
+ # g1, g2, g3 will be xsync instances.
+ g1.xsync_upper_limit = None
+ if getattr(g2, "xsync_upper_limit", None) is not None:
+ g2.xsync_upper_limit = None
+
+ if getattr(g3, "xsync_upper_limit", None) is not None:
+ g3.xsync_upper_limit = None
# crawl loop: Try changelog crawl, if failed
# switch to FS crawl
try:
- g2.crawlwrap()
+ if not changelog_register_failed:
+ g2.crawlwrap()
+ else:
+ g1.crawlwrap()
except ChangelogException as e:
- logging.debug('Changelog crawl failed, failback to xsync: '
- '%s - %s' % (e.errno, e.strerror))
+ logging.info('Changelog crawl failed, fallback to xsync')
g1.crawlwrap()
else:
sup(self, *args)