diff options
author | Aravinda VK <avishwan@redhat.com> | 2014-04-29 12:14:24 +0530 |
---|---|---|
committer | Venky Shankar <vshankar@redhat.com> | 2014-05-09 00:27:40 -0700 |
commit | c7b0396f680863528248e6f5a162de47184b6c88 (patch) | |
tree | ead2f295d041df1e258db4bf09fe10944b15f4d7 /geo-replication/syncdaemon/master.py | |
parent | 65757e0f57f93103d87fdf9534c5ca25b66d14b7 (diff) |
geo-rep: Pause and Resume feature for geo-replication
Changelog consumption/processing now happens in seperate process
group than monitor. When monitor process group gets SIGSTOP all
worker process, ssh, rsync will be paused except the changelog
processing. When it gets SIGCONT it resumes its operation.
Changelog agent runs as RepceServer, geo-rep worker communicates
with changelog agent using RepceClient.
Change-Id: I35c333e4d8b13d03a7808aed601960eef23cfa04
BUG: 1093602
Signed-off-by: Venky Shankar <vshankar@redhat.com>
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/7322
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r-- | geo-replication/syncdaemon/master.py | 34 |
1 files changed, 18 insertions, 16 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index b6a7c894814..1f1fa1122cb 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -1108,9 +1108,9 @@ class GMasterChangelogMixin(GMasterCommon): if isinstance(purge_time, int): purge_time = None - self.master.server.changelog_scan() + self.changelog_agent.scan() self.crawls += 1 - changes = self.master.server.changelog_getchanges() + changes = self.changelog_agent.getchanges() if changes: if purge_time: logging.info("slave's time: %s" % repr(purge_time)) @@ -1120,22 +1120,24 @@ class GMasterChangelogMixin(GMasterCommon): logging.info( 'skipping already processed change: %s...' % os.path.basename(pr)) - self.master.server.changelog_done(pr) + self.changelog_done_func(pr) changes.remove(pr) if changes: logging.debug('processing changes %s' % repr(changes)) self.process(changes) - def register(self): + def register(self, changelog_agent): + self.changelog_agent = changelog_agent self.sleep_interval = int(gconf.change_interval) - self.changelog_done_func = self.master.server.changelog_done + self.changelog_done_func = self.changelog_agent.done class GMasterChangeloghistoryMixin(GMasterChangelogMixin): - def register(self): + def register(self, changelog_agent): + self.changelog_agent = changelog_agent self.changelog_register_time = int(time.time()) - self.changelog_done_func = self.master.server.history_changelog_done + self.changelog_done_func = self.changelog_agent.history_done def crawl(self): self.update_worker_crawl_status("History Crawl") @@ -1157,21 +1159,21 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): # location then consuming history will not work(Known issue as of now) changelog_path = os.path.join(gconf.local_path, ".glusterfs/changelogs") - ts = self.master.server.history_changelog(changelog_path, - purge_time[0], - self.changelog_register_time, - int(gconf.sync_jobs)) + ts = self.changelog_agent.history(changelog_path, + purge_time[0], + self.changelog_register_time, + int(gconf.sync_jobs)) # scan followed by getchanges till scan returns zero. - # history_changelog_scan() is blocking call, till it gets the number + # history_scan() is blocking call, till it gets the number # of changelogs to process. Returns zero when no changelogs # to be processed. returns positive value as number of changelogs # to be processed, which will be fetched using - # history_changelog_getchanges() - while self.master.server.history_changelog_scan() > 0: + # history_getchanges() + while self.changelog_agent.history_scan() > 0: self.crawls += 1 - changes = self.master.server.history_changelog_getchanges() + changes = self.changelog_agent.history_getchanges() if changes: if purge_time: logging.info("slave's time: %s" % repr(purge_time)) @@ -1208,7 +1210,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): XSYNC_MAX_ENTRIES = 1 << 13 - def register(self): + def register(self, changelog_agent=None): self.counter = 0 self.comlist = [] self.stimes = [] |