summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/master.py
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2014-04-29 12:14:24 +0530
committerVenky Shankar <vshankar@redhat.com>2014-05-09 00:27:40 -0700
commitc7b0396f680863528248e6f5a162de47184b6c88 (patch)
treeead2f295d041df1e258db4bf09fe10944b15f4d7 /geo-replication/syncdaemon/master.py
parent65757e0f57f93103d87fdf9534c5ca25b66d14b7 (diff)
geo-rep: Pause and Resume feature for geo-replication
Changelog consumption/processing now happens in seperate process group than monitor. When monitor process group gets SIGSTOP all worker process, ssh, rsync will be paused except the changelog processing. When it gets SIGCONT it resumes its operation. Changelog agent runs as RepceServer, geo-rep worker communicates with changelog agent using RepceClient. Change-Id: I35c333e4d8b13d03a7808aed601960eef23cfa04 BUG: 1093602 Signed-off-by: Venky Shankar <vshankar@redhat.com> Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/7322
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--geo-replication/syncdaemon/master.py34
1 files changed, 18 insertions, 16 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index b6a7c894814..1f1fa1122cb 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -1108,9 +1108,9 @@ class GMasterChangelogMixin(GMasterCommon):
if isinstance(purge_time, int):
purge_time = None
- self.master.server.changelog_scan()
+ self.changelog_agent.scan()
self.crawls += 1
- changes = self.master.server.changelog_getchanges()
+ changes = self.changelog_agent.getchanges()
if changes:
if purge_time:
logging.info("slave's time: %s" % repr(purge_time))
@@ -1120,22 +1120,24 @@ class GMasterChangelogMixin(GMasterCommon):
logging.info(
'skipping already processed change: %s...' %
os.path.basename(pr))
- self.master.server.changelog_done(pr)
+ self.changelog_done_func(pr)
changes.remove(pr)
if changes:
logging.debug('processing changes %s' % repr(changes))
self.process(changes)
- def register(self):
+ def register(self, changelog_agent):
+ self.changelog_agent = changelog_agent
self.sleep_interval = int(gconf.change_interval)
- self.changelog_done_func = self.master.server.changelog_done
+ self.changelog_done_func = self.changelog_agent.done
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
- def register(self):
+ def register(self, changelog_agent):
+ self.changelog_agent = changelog_agent
self.changelog_register_time = int(time.time())
- self.changelog_done_func = self.master.server.history_changelog_done
+ self.changelog_done_func = self.changelog_agent.history_done
def crawl(self):
self.update_worker_crawl_status("History Crawl")
@@ -1157,21 +1159,21 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# location then consuming history will not work(Known issue as of now)
changelog_path = os.path.join(gconf.local_path,
".glusterfs/changelogs")
- ts = self.master.server.history_changelog(changelog_path,
- purge_time[0],
- self.changelog_register_time,
- int(gconf.sync_jobs))
+ ts = self.changelog_agent.history(changelog_path,
+ purge_time[0],
+ self.changelog_register_time,
+ int(gconf.sync_jobs))
# scan followed by getchanges till scan returns zero.
- # history_changelog_scan() is blocking call, till it gets the number
+ # history_scan() is blocking call, till it gets the number
# of changelogs to process. Returns zero when no changelogs
# to be processed. returns positive value as number of changelogs
# to be processed, which will be fetched using
- # history_changelog_getchanges()
- while self.master.server.history_changelog_scan() > 0:
+ # history_getchanges()
+ while self.changelog_agent.history_scan() > 0:
self.crawls += 1
- changes = self.master.server.history_changelog_getchanges()
+ changes = self.changelog_agent.history_getchanges()
if changes:
if purge_time:
logging.info("slave's time: %s" % repr(purge_time))
@@ -1208,7 +1210,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
XSYNC_MAX_ENTRIES = 1 << 13
- def register(self):
+ def register(self, changelog_agent=None):
self.counter = 0
self.comlist = []
self.stimes = []