summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/resource.py
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2014-04-29 12:14:24 +0530
committerVenky Shankar <vshankar@redhat.com>2014-05-09 00:27:40 -0700
commitc7b0396f680863528248e6f5a162de47184b6c88 (patch)
treeead2f295d041df1e258db4bf09fe10944b15f4d7 /geo-replication/syncdaemon/resource.py
parent65757e0f57f93103d87fdf9534c5ca25b66d14b7 (diff)
geo-rep: Pause and Resume feature for geo-replication
Changelog consumption/processing now happens in seperate process group than monitor. When monitor process group gets SIGSTOP all worker process, ssh, rsync will be paused except the changelog processing. When it gets SIGCONT it resumes its operation. Changelog agent runs as RepceServer, geo-rep worker communicates with changelog agent using RepceClient. Change-Id: I35c333e4d8b13d03a7808aed601960eef23cfa04 BUG: 1093602 Signed-off-by: Venky Shankar <vshankar@redhat.com> Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/7322
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r--geo-replication/syncdaemon/resource.py69
1 files changed, 17 insertions, 52 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 185722f5df0..79dc9e79e9d 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -35,6 +35,8 @@ from syncdutils import GsyncdError, select, privileged, boolify, funcode
from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
from syncdutils import ChangelogException
+from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
+
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
@@ -127,19 +129,7 @@ class _MetaXattr(object):
return getattr(self, meth)
-class _MetaChangelog(object):
-
- def __getattr__(self, meth):
- from libgfchangelog import Changes as LChanges
- xmeth = [m for m in dir(LChanges) if m[0] != '_']
- if not meth in xmeth:
- return
- for m in xmeth:
- setattr(self, m, getattr(LChanges, m))
- return getattr(self, meth)
-
Xattr = _MetaXattr()
-Changes = _MetaChangelog()
class Popen(subprocess.Popen):
@@ -669,39 +659,6 @@ class Server(object):
errno_wrap(os.chown, [go, uid, gid], [ENOENT], [ESTALE, EINVAL])
@classmethod
- def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries=0):
- Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries)
-
- @classmethod
- def changelog_scan(cls):
- Changes.cl_scan()
-
- @classmethod
- def changelog_getchanges(cls):
- return Changes.cl_getchanges()
-
- @classmethod
- def changelog_done(cls, clfile):
- Changes.cl_done(clfile)
-
- @classmethod
- def history_changelog(cls, changelog_path, start, end, num_parallel):
- return Changes.cl_history_changelog(changelog_path, start, end,
- num_parallel)
-
- @classmethod
- def history_changelog_scan(cls):
- return Changes.cl_history_scan()
-
- @classmethod
- def history_changelog_getchanges(cls):
- return Changes.cl_history_getchanges()
-
- @classmethod
- def history_changelog_done(cls, clfile):
- Changes.cl_history_done(clfile)
-
- @classmethod
@_pathguard
def setattr(cls, path, adct):
"""set file attributes
@@ -932,9 +889,6 @@ class AbstractUrl(object):
return self.get_url()
- ### Concrete resource classes ###
-
-
class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
"""scheme class for file:// urls
@@ -1311,16 +1265,27 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
# register the crawlers and start crawling
# g1 ==> Xsync, g2 ==> config.change_detector(changelog by default)
# g3 ==> changelog History
+ (inf, ouf, ra, wa) = gconf.rpc_fd.split(',')
+ os.close(int(ra))
+ os.close(int(wa))
+ changelog_agent = RepceClient(int(inf), int(ouf))
+ rv = changelog_agent.version()
+ if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION:
+ raise GsyncdError(
+ "RePCe major version mismatch(changelog agent): "
+ "local %s, remote %s" %
+ (CHANGELOG_AGENT_CLIENT_VERSION, rv))
+
g1.register()
try:
(workdir, logfile) = g2.setup_working_dir()
# register with the changelog library
# 9 == log level (DEBUG)
# 5 == connection retries
- brickserver.changelog_register(gconf.local_path,
- workdir, logfile, 9, 5)
- g2.register()
- g3.register()
+ changelog_agent.register(gconf.local_path,
+ workdir, logfile, 9, 5)
+ g2.register(changelog_agent)
+ g3.register(changelog_agent)
except ChangelogException as e:
logging.debug("Changelog register failed: %s - %s" %
(e.errno, e.strerror))