summaryrefslogtreecommitdiffstats
path: root/geo-replication
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2014-08-08 15:06:11 +0530
committerVijay Bellur <vbellur@redhat.com>2014-08-19 23:27:26 -0700
commitd0a3b4307e40b70d73faf4baed9fa0c930682894 (patch)
tree2710140e3a1b38eb0fed69be7b1dc0cd69e0a012 /geo-replication
parent9b5231e5c98b8cfa116838287c7a14042702795f (diff)
geo-rep: Fixing issue with xsync upper limit
While identifying the file/dir to sync, xtime of the file was compared with xsync_upper_limit as `xtime < xsync_upper_limit` After the sync, xtime of parent directory is updated as stime. With the upper limit condition, stime is updated as MIN(xtime_parent, xsync_upper_limit) With this files will get missed if `xtime_of_file == xsync_upper_limit` With this patch xtime_of_file is compared as xtime_of_file <= xsync_upper_limit BUG: 1128093 Change-Id: I5022ce67ba503ed4621531a649a16fc06b2229d9 Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/8439 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Kotresh HR <khiremat@redhat.com> Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'geo-replication')
-rw-r--r--geo-replication/syncdaemon/master.py73
-rw-r--r--geo-replication/syncdaemon/resource.py36
2 files changed, 50 insertions, 59 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index e3904736ba2..79c90630dca 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -168,10 +168,10 @@ class NormalMixin(object):
raise GsyncdError("timestamp corruption for " + path)
def need_sync(self, e, xte, xtrd):
- if self.xsync_upper_limit is None:
- return xte > xtrd
+ if self.xsync_upper_limit:
+ return xte > xtrd and xte <= self.xsync_upper_limit
else:
- return xte > xtrd and xte < self.xsync_upper_limit
+ return xte > xtrd
def set_slave_xtime(self, path, mark):
self.slave.server.set_stime(path, self.uuid, mark)
@@ -442,13 +442,24 @@ class GMasterCommon(object):
def register(self):
self.register()
- def crawlwrap(self, oneshot=False, no_stime_update=False):
+ def crawlwrap(self, oneshot=False, no_stime_update=False,
+ register_time=None):
if oneshot:
# it's important to do this during the oneshot crawl as
# for a passive gsyncd (ie. in a replicate scenario)
# the keepalive thread would keep the connection alive.
self.init_keep_alive()
+ # If crawlwrap is called when partial history available,
+ # then it sets register_time which is the time when geo-rep
+ # worker registerd to changelog consumption. Since nsec is
+ # not considered in register time, their are chances of skipping
+ # changes detection in xsync crawl. Add 1 sec to upper_limit.
+ # This limit will be reset when crawlwrap is called again.
+ self.xsync_upper_limit = None
+ if register_time:
+ self.xsync_upper_limit = (register_time + 1, 0)
+
# no need to maintain volinfo state machine.
# in a cascading setup, each geo-replication session is
# independent (ie. 'volume-mark' and 'xtime' are not
@@ -757,6 +768,12 @@ class GMasterChangelogMixin(GMasterCommon):
logging.debug('changelog working dir %s' % workdir)
return workdir
+ def get_purge_time(self):
+ purge_time = self.xtime('.', self.slave)
+ if isinstance(purge_time, int):
+ purge_time = None
+ return purge_time
+
def process_change(self, change, done, retry):
pfx = gauxpfx()
clist = []
@@ -1135,9 +1152,7 @@ class GMasterChangelogMixin(GMasterCommon):
changes = []
# get stime (from the brick) and purge changelogs
# that are _historical_ to that time.
- purge_time = self.xtime('.', self.slave)
- if isinstance(purge_time, int):
- purge_time = None
+ purge_time = self.get_purge_time()
self.changelog_agent.scan()
self.crawls += 1
@@ -1175,14 +1190,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
def crawl(self, no_stime_update=False):
self.history_turns += 1
self.update_worker_crawl_status("History Crawl")
- logging.info('starting history crawl... turns: %s' %
- self.history_turns)
+ purge_time = self.get_purge_time()
- # get stime (from the brick) and purge changelogs
- # that are _historical_ to that time.
- purge_time = self.xtime('.', self.slave)
- if isinstance(purge_time, int):
- purge_time = None
+ logging.info('starting history crawl... turns: %s, stime: %s'
+ % (self.history_turns, repr(purge_time)))
if not purge_time or purge_time == URXTIME:
logging.info("stime not available, abandoning history crawl")
@@ -1226,8 +1237,8 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
history_turn_time = int(time.time()) - self.history_crawl_start_time
- logging.info('finished history crawl syncing between %s - %s.' %
- (purge_time[0], actual_end))
+ logging.info('finished history crawl syncing, endtime: %s, stime: %s'
+ % (actual_end, repr(self.get_purge_time())))
# If TS returned from history_changelog is < register_time
# then FS crawl may be required, since history is only available
@@ -1260,7 +1271,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
XSYNC_MAX_ENTRIES = 1 << 13
- def register(self, register_time, changelog_agent=None):
+ def register(self, register_time=None, changelog_agent=None):
self.counter = 0
self.comlist = []
self.stimes = []
@@ -1277,17 +1288,6 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
else:
raise
- # After changelogs history processing completes, it switches
- # to xsync/hibrid crawl if history actual end time is less than
- # live changelog register time. Xsync should only run for that
- # small gap, ie.. changelog_register_time - history_actual_end_time
- # If we don't have upper limit to limit the XSync change detection
- # It will keep on detecting the files even though changelogs are
- # available for the same. Set upper limit during register
- # and reset at the end of each crawl. Reseting at the end of
- # crawl is required if change_detector is set to xsync.
- self.xsync_upper_limit = (register_time, 0)
-
def crawl(self, no_stime_update=False):
"""
event dispatcher thread
@@ -1299,18 +1299,15 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.Xcrawl()
t = Thread(target=Xsyncer)
t.start()
- logging.info('starting hybrid crawl...')
+ logging.info('starting hybrid crawl..., stime: %s'
+ % repr(self.get_purge_time()))
self.update_worker_crawl_status("Hybrid Crawl")
while True:
try:
item = self.comlist.pop(0)
if item[0] == 'finale':
- if self.xsync_upper_limit is not None:
- logging.info('finished hybrid crawl syncing, endtime: '
- '%s' % self.xsync_upper_limit[0])
- else:
- logging.info('finished hybrid crawl syncing')
-
+ logging.info('finished hybrid crawl syncing, stime: %s'
+ % repr(self.get_purge_time()))
break
elif item[0] == 'xsync':
logging.info('processing xsync changelog %s' % (item[1]))
@@ -1333,8 +1330,6 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
except IndexError:
time.sleep(1)
- self.xsync_upper_limit = None
-
def write_entry_change(self, prefix, data=[]):
self.fh.write("%s %s\n" % (prefix, ' '.join(data)))
@@ -1460,7 +1455,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
str(st.st_gid), str(st.st_mode)])
self.Xcrawl(e, xtr_root)
stime_to_update = xte
- if self.xsync_upper_limit is not None:
+ if self.xsync_upper_limit:
stime_to_update = min(self.xsync_upper_limit, xte)
self.stimes.append((e, stime_to_update))
elif stat.S_ISLNK(mo):
@@ -1487,7 +1482,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.write_entry_change("D", [gfid])
if path == '.':
stime_to_update = xtl
- if self.xsync_upper_limit is not None:
+ if self.xsync_upper_limit:
stime_to_update = min(self.xsync_upper_limit, xtl)
self.stimes.append((path, stime_to_update))
self.sync_done(self.stimes, True)
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 09310c1f1aa..2a887daab15 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -1293,22 +1293,27 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
try:
workdir = g2.setup_working_dir()
- # register with the changelog library
- # 9 == log level (DEBUG)
- # 5 == connection retries
- changelog_agent.register(gconf.local_path,
- workdir, gconf.changelog_log_file,
- g2.CHANGELOG_LOG_LEVEL,
- g2.CHANGELOG_CONN_RETRIES)
+ # Register only when change_detector is not set to
+ # xsync, else agent will generate changelog files
+ # in .processing directory of working dir
+ if gconf.change_detector != 'xsync':
+ # register with the changelog library
+ # 9 == log level (DEBUG)
+ # 5 == connection retries
+ changelog_agent.register(gconf.local_path,
+ workdir, gconf.changelog_log_file,
+ g2.CHANGELOG_LOG_LEVEL,
+ g2.CHANGELOG_CONN_RETRIES)
+
register_time = int(time.time())
g2.register(register_time, changelog_agent)
g3.register(register_time, changelog_agent)
except ChangelogException:
changelog_register_failed = True
- register_time = int(time.time())
+ register_time = None
logging.info("Changelog register failed, fallback to xsync")
- g1.register(register_time)
+ g1.register()
logging.info("Register time: %s" % register_time)
# oneshot: Try to use changelog history api, if not
# available switch to FS crawl
@@ -1328,17 +1333,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
logging.info('Partial history available, using xsync crawl'
' after consuming history '
'till %s' % str(e))
- g1.crawlwrap(oneshot=True, no_stime_update=True)
-
- # Reset xsync upper limit. g2, g3 are changelog and history
- # instances, but if change_detector is set to xsync then
- # g1, g2, g3 will be xsync instances.
- g1.xsync_upper_limit = None
- if getattr(g2, "xsync_upper_limit", None) is not None:
- g2.xsync_upper_limit = None
-
- if getattr(g3, "xsync_upper_limit", None) is not None:
- g3.xsync_upper_limit = None
+ g1.crawlwrap(oneshot=True, no_stime_update=True,
+ register_time=register_time)
# crawl loop: Try changelog crawl, if failed
# switch to FS crawl