summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--geo-replication/syncdaemon/Makefile.am2
-rw-r--r--geo-replication/syncdaemon/changelogagent.py78
-rw-r--r--geo-replication/syncdaemon/gsyncd.py13
-rw-r--r--geo-replication/syncdaemon/master.py34
-rw-r--r--geo-replication/syncdaemon/monitor.py34
-rw-r--r--geo-replication/syncdaemon/resource.py69
-rw-r--r--geo-replication/syncdaemon/syncdutils.py3
7 files changed, 162 insertions, 71 deletions
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am
index 83f969639cc..885963eae2b 100644
--- a/geo-replication/syncdaemon/Makefile.am
+++ b/geo-replication/syncdaemon/Makefile.am
@@ -2,6 +2,6 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon
syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \
resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \
- $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py
+ $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py
CLEANFILES =
diff --git a/geo-replication/syncdaemon/changelogagent.py b/geo-replication/syncdaemon/changelogagent.py
new file mode 100644
index 00000000000..54d82cefcd2
--- /dev/null
+++ b/geo-replication/syncdaemon/changelogagent.py
@@ -0,0 +1,78 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import os
+import logging
+import syncdutils
+from syncdutils import select, CHANGELOG_AGENT_SERVER_VERSION
+from repce import RepceServer
+
+
+class _MetaChangelog(object):
+    """Lazy stand-in for ``libgfchangelog.Changes``.
+
+    libgfchangelog is imported only when an attribute is looked up on an
+    instance, not when this module is imported.  On the first successful
+    lookup every public attribute of ``Changes`` (name not starting with
+    an underscore) is copied onto the instance, so subsequent accesses
+    are ordinary instance attribute reads and no longer go through
+    ``__getattr__``.
+    """
+
+    def __getattr__(self, meth):
+        """Resolve *meth* against the public attributes of ``Changes``.
+
+        Returns the attribute of ``libgfchangelog.Changes`` named *meth*.
+
+        Raises AttributeError when *meth* is not a public attribute of
+        ``Changes``.
+        """
+        # Deferred import: libgfchangelog is only loaded by a process
+        # that really touches the changelog API.
+        from libgfchangelog import Changes as LChanges
+        xmeth = [m for m in dir(LChanges) if m[0] != '_']
+        if meth not in xmeth:
+            # Follow the attribute protocol instead of handing back None:
+            # a None result makes hasattr() report True for any name and
+            # turns a misspelled call into "NoneType is not callable".
+            raise AttributeError(
+                "%r object has no attribute %r" %
+                (type(self).__name__, meth))
+        # Cache the whole public API on the instance in one go.
+        for m in xmeth:
+            setattr(self, m, getattr(LChanges, m))
+        return getattr(self, meth)
+
+# Module-level proxy instance used by the Changelog methods in this file.
+Changes = _MetaChangelog()
+
+
+class Changelog(object):
+ # Changelog API object handed to agent(); every method below is a
+ # one-line forwarder to the module-level ``Changes`` proxy, except
+ # version(), which reports the agent's protocol version.
+
+ # Return CHANGELOG_AGENT_SERVER_VERSION so the peer can compare it
+ # with its own client version before issuing further calls.
+ def version(self):
+ return CHANGELOG_AGENT_SERVER_VERSION
+
+ # Forward to Changes.cl_register() and return its result.
+ # NOTE(review): judging by the caller, cl_brick is the local brick
+ # path, cl_dir the working directory, cl_log the log file, cl_level
+ # the log level and retries the connection retry count -- confirm
+ # against libgfchangelog.
+ def register(self, cl_brick, cl_dir, cl_log, cl_level, retries=0):
+ return Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries)
+
+ # Forward to Changes.cl_scan() and return its result.
+ def scan(self):
+ return Changes.cl_scan()
+
+ # Forward to Changes.cl_getchanges() and return its result.
+ def getchanges(self):
+ return Changes.cl_getchanges()
+
+ # Forward to Changes.cl_done() for the changelog file ``clfile``.
+ def done(self, clfile):
+ return Changes.cl_done(clfile)
+
+ # Forward to Changes.cl_history_changelog() and return its result.
+ # NOTE(review): start/end look like the bounds of the history window
+ # and num_parallel like a parallelism hint -- confirm against
+ # libgfchangelog.
+ def history(self, changelog_path, start, end, num_parallel):
+ return Changes.cl_history_changelog(changelog_path, start, end,
+ num_parallel)
+
+ # Forward to Changes.cl_history_scan() and return its result.
+ def history_scan(self):
+ return Changes.cl_history_scan()
+
+ # Forward to Changes.cl_history_getchanges() and return its result.
+ def history_getchanges(self):
+ return Changes.cl_history_getchanges()
+
+ # Forward to Changes.cl_history_done() for the changelog file ``clfile``.
+ def history_done(self, clfile):
+ return Changes.cl_history_done(clfile)
+
+
+class ChangelogAgent(object):
+    """Serve an object over RePCe on a pair of inherited descriptors.
+
+    Constructing an instance starts the service thread and then blocks
+    the calling thread; the constructor is not expected to return.
+    """
+
+    def __init__(self, obj, fd_tup):
+        """Start serving *obj* and park the calling thread.
+
+        obj    -- object whose methods are exposed through RepceServer
+        fd_tup -- string of four comma-separated file descriptor
+                  numbers: "<in>,<out>,<rw>,<ww>"
+
+        Raises ValueError if *fd_tup* does not hold exactly four
+        comma-separated integers.
+        """
+        (inf, ouf, rw, ww) = fd_tup.split(',')
+        # The last two descriptors are not used by the agent (going by
+        # their names they are the worker's ends of the pipes); close
+        # the copies inherited by this process.
+        os.close(int(rw))
+        os.close(int(ww))
+        # NOTE(review): the trailing 1 is presumably the number of
+        # RepceServer worker threads -- confirm against repce.py.
+        repce = RepceServer(obj, int(inf), int(ouf), 1)
+
+        def serve():
+            # Run the RPC loop; once it returns, finalize the process.
+            repce.service_loop()
+            syncdutils.finalize()
+
+        t = syncdutils.Thread(target=serve)
+        t.start()
+        logging.info('Agent listening...')
+
+        # select() on empty descriptor sets without a timeout; presumably
+        # this blocks for good, leaving the service thread to do the work.
+        select((), (), ())
+
+
+def agent(obj, fd_tup):
+    """Build a ChangelogAgent for *obj* on the descriptors in *fd_tup*.
+
+    Both arguments are passed to ChangelogAgent unchanged and the
+    resulting instance is returned.
+    """
+    changelog_agent = ChangelogAgent(obj, fd_tup)
+    return changelog_agent
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index 426d964de95..7d463ad23f3 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -32,6 +32,7 @@ from syncdutils import GsyncdError, select, set_term_handler
from configinterface import GConffile, upgrade_config_file
import resource
from monitor import monitor
+from changelogagent import agent, Changelog
class GLogger(Logger):
@@ -175,6 +176,7 @@ def main_i():
- query/manipulate configuration
- format gsyncd urls using gsyncd's url parsing engine
- start service in following modes, in given stages:
+ - agent: startup(), ChangelogAgent()
- monitor: startup(), monitor()
- master: startup(), connect_remote(), connect(), service_loop()
- slave: startup(), connect(), service_loop()
@@ -275,12 +277,15 @@ def main_i():
# duh. need to specify dest or value will be mapped to None :S
op.add_option('--monitor', dest='monitor', action='callback',
callback=store_local_curry(True))
+ op.add_option('--agent', dest='agent', action='callback',
+ callback=store_local_curry(True))
op.add_option('--resource-local', dest='resource_local',
type=str, action='callback', callback=store_local)
op.add_option('--resource-remote', dest='resource_remote',
type=str, action='callback', callback=store_local)
op.add_option('--feedback-fd', dest='feedback_fd', type=int,
help=SUPPRESS_HELP, action='callback', callback=store_local)
+ op.add_option('--rpc-fd', dest='rpc_fd', type=str, help=SUPPRESS_HELP)
op.add_option('--listen', dest='listen', help=SUPPRESS_HELP,
action='callback', callback=store_local_curry(True))
op.add_option('-N', '--no-daemon', dest="go_daemon",
@@ -586,6 +591,7 @@ def main_i():
go_daemon = rconf['go_daemon']
be_monitor = rconf.get('monitor')
+ be_agent = rconf.get('agent')
rscs, local, remote = makersc(args)
if not be_monitor and isinstance(remote, resource.SSH) and \
@@ -596,6 +602,8 @@ def main_i():
log_file = gconf.log_file
if be_monitor:
label = 'monitor'
+ elif be_agent:
+ label = 'agent'
elif remote:
# master
label = gconf.local_path
@@ -604,6 +612,11 @@ def main_i():
startup(go_daemon=go_daemon, log_file=log_file, label=label)
resource.Popen.init_errhandler()
+ if be_agent:
+ os.setsid()
+ logging.debug('rpc_fd: %s' % repr(gconf.rpc_fd))
+ return agent(Changelog(), gconf.rpc_fd)
+
if be_monitor:
return monitor(*rscs)
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index b6a7c894814..1f1fa1122cb 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -1108,9 +1108,9 @@ class GMasterChangelogMixin(GMasterCommon):
if isinstance(purge_time, int):
purge_time = None
- self.master.server.changelog_scan()
+ self.changelog_agent.scan()
self.crawls += 1
- changes = self.master.server.changelog_getchanges()
+ changes = self.changelog_agent.getchanges()
if changes:
if purge_time:
logging.info("slave's time: %s" % repr(purge_time))
@@ -1120,22 +1120,24 @@ class GMasterChangelogMixin(GMasterCommon):
logging.info(
'skipping already processed change: %s...' %
os.path.basename(pr))
- self.master.server.changelog_done(pr)
+ self.changelog_done_func(pr)
changes.remove(pr)
if changes:
logging.debug('processing changes %s' % repr(changes))
self.process(changes)
- def register(self):
+ def register(self, changelog_agent):
+ self.changelog_agent = changelog_agent
self.sleep_interval = int(gconf.change_interval)
- self.changelog_done_func = self.master.server.changelog_done
+ self.changelog_done_func = self.changelog_agent.done
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
- def register(self):
+ def register(self, changelog_agent):
+ self.changelog_agent = changelog_agent
self.changelog_register_time = int(time.time())
- self.changelog_done_func = self.master.server.history_changelog_done
+ self.changelog_done_func = self.changelog_agent.history_done
def crawl(self):
self.update_worker_crawl_status("History Crawl")
@@ -1157,21 +1159,21 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# location then consuming history will not work(Known issue as of now)
changelog_path = os.path.join(gconf.local_path,
".glusterfs/changelogs")
- ts = self.master.server.history_changelog(changelog_path,
- purge_time[0],
- self.changelog_register_time,
- int(gconf.sync_jobs))
+ ts = self.changelog_agent.history(changelog_path,
+ purge_time[0],
+ self.changelog_register_time,
+ int(gconf.sync_jobs))
# scan followed by getchanges till scan returns zero.
- # history_changelog_scan() is blocking call, till it gets the number
+ # history_scan() is blocking call, till it gets the number
# of changelogs to process. Returns zero when no changelogs
# to be processed. returns positive value as number of changelogs
# to be processed, which will be fetched using
- # history_changelog_getchanges()
- while self.master.server.history_changelog_scan() > 0:
+ # history_getchanges()
+ while self.changelog_agent.history_scan() > 0:
self.crawls += 1
- changes = self.master.server.history_changelog_getchanges()
+ changes = self.changelog_agent.history_getchanges()
if changes:
if purge_time:
logging.info("slave's time: %s" % repr(purge_time))
@@ -1208,7 +1210,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
XSYNC_MAX_ENTRIES = 1 << 13
- def register(self):
+ def register(self, changelog_agent=None):
self.counter = 0
self.comlist = []
self.stimes = []
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index 8ed6f832618..e49a24ee5f5 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -108,7 +108,7 @@ class Monitor(object):
# give a chance to graceful exit
os.kill(-os.getpid(), signal.SIGTERM)
- def monitor(self, w, argv, cpids):
+ def monitor(self, w, argv, cpids, agents):
"""the monitor loop
Basic logic is a blatantly simple, blunt heuristic:
@@ -149,6 +149,23 @@ class Monitor(object):
while ret in (0, 1):
logging.info('-' * conn_timeout)
logging.info('starting gsyncd worker')
+
+ # Couple of pipe pairs for RPC communication b/w
+ # worker and changelog agent.
+
+ # read/write end for agent
+ (ra, ww) = os.pipe()
+ # read/write end for worker
+ (rw, wa) = os.pipe()
+
+ # spawn the agent process
+ apid = os.fork()
+ if apid == 0:
+ os.execv(sys.executable, argv + ['--local-path', w[0],
+ '--agent',
+ '--rpc-fd',
+ ','.join([str(ra), str(wa),
+ str(rw), str(ww)])])
pr, pw = os.pipe()
cpid = os.fork()
if cpid == 0:
@@ -157,14 +174,26 @@ class Monitor(object):
'--local-path', w[0],
'--local-id',
'.' + escape(w[0]),
+ '--rpc-fd',
+ ','.join([str(rw), str(ww),
+ str(ra), str(wa)]),
'--resource-remote', w[1]])
self.lock.acquire()
cpids.add(cpid)
+ agents.add(apid)
self.lock.release()
os.close(pw)
+
t0 = time.time()
so = select((pr,), (), (), conn_timeout)[0]
os.close(pr)
+
+ # close all RPC pipes in monitor
+ os.close(ra)
+ os.close(wa)
+ os.close(rw)
+ os.close(ww)
+
if so:
ret = nwait(cpid, os.WNOHANG)
if ret is not None:
@@ -206,10 +235,11 @@ class Monitor(object):
argv.insert(0, os.path.basename(sys.executable))
cpids = set()
+ agents = set()
ta = []
for wx in wspx:
def wmon(w):
- cpid, _ = self.monitor(w, argv, cpids)
+ cpid, _ = self.monitor(w, argv, cpids, agents)
time.sleep(1)
self.lock.acquire()
for cpid in cpids:
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 185722f5df0..79dc9e79e9d 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -35,6 +35,8 @@ from syncdutils import GsyncdError, select, privileged, boolify, funcode
from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
from syncdutils import ChangelogException
+from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
+
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
@@ -127,19 +129,7 @@ class _MetaXattr(object):
return getattr(self, meth)
-class _MetaChangelog(object):
-
- def __getattr__(self, meth):
- from libgfchangelog import Changes as LChanges
- xmeth = [m for m in dir(LChanges) if m[0] != '_']
- if not meth in xmeth:
- return
- for m in xmeth:
- setattr(self, m, getattr(LChanges, m))
- return getattr(self, meth)
-
Xattr = _MetaXattr()
-Changes = _MetaChangelog()
class Popen(subprocess.Popen):
@@ -669,39 +659,6 @@ class Server(object):
errno_wrap(os.chown, [go, uid, gid], [ENOENT], [ESTALE, EINVAL])
@classmethod
- def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries=0):
- Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries)
-
- @classmethod
- def changelog_scan(cls):
- Changes.cl_scan()
-
- @classmethod
- def changelog_getchanges(cls):
- return Changes.cl_getchanges()
-
- @classmethod
- def changelog_done(cls, clfile):
- Changes.cl_done(clfile)
-
- @classmethod
- def history_changelog(cls, changelog_path, start, end, num_parallel):
- return Changes.cl_history_changelog(changelog_path, start, end,
- num_parallel)
-
- @classmethod
- def history_changelog_scan(cls):
- return Changes.cl_history_scan()
-
- @classmethod
- def history_changelog_getchanges(cls):
- return Changes.cl_history_getchanges()
-
- @classmethod
- def history_changelog_done(cls, clfile):
- Changes.cl_history_done(clfile)
-
- @classmethod
@_pathguard
def setattr(cls, path, adct):
"""set file attributes
@@ -932,9 +889,6 @@ class AbstractUrl(object):
return self.get_url()
- ### Concrete resource classes ###
-
-
class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
"""scheme class for file:// urls
@@ -1311,16 +1265,27 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
# register the crawlers and start crawling
# g1 ==> Xsync, g2 ==> config.change_detector(changelog by default)
# g3 ==> changelog History
+ (inf, ouf, ra, wa) = gconf.rpc_fd.split(',')
+ os.close(int(ra))
+ os.close(int(wa))
+ changelog_agent = RepceClient(int(inf), int(ouf))
+ rv = changelog_agent.version()
+ if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION:
+ raise GsyncdError(
+ "RePCe major version mismatch(changelog agent): "
+ "local %s, remote %s" %
+ (CHANGELOG_AGENT_CLIENT_VERSION, rv))
+
g1.register()
try:
(workdir, logfile) = g2.setup_working_dir()
# register with the changelog library
# 9 == log level (DEBUG)
# 5 == connection retries
- brickserver.changelog_register(gconf.local_path,
- workdir, logfile, 9, 5)
- g2.register()
- g3.register()
+ changelog_agent.register(gconf.local_path,
+ workdir, logfile, 9, 5)
+ g2.register(changelog_agent)
+ g3.register(changelog_agent)
except ChangelogException as e:
logging.debug("Changelog register failed: %s - %s" %
(e.errno, e.strerror))
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 65daeb0fe7c..9eda6044472 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -47,6 +47,9 @@ except ImportError:
_CL_AUX_GFID_PFX = ".gfid/"
GF_OP_RETRIES = 20
+CHANGELOG_AGENT_SERVER_VERSION = 1.0
+CHANGELOG_AGENT_CLIENT_VERSION = 1.0
+
def escape(s):
"""the chosen flavor of string escaping, used all over