diff options
Diffstat (limited to 'geo-replication')
| -rw-r--r-- | geo-replication/syncdaemon/Makefile.am | 2 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/changelogagent.py | 78 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 13 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 34 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/monitor.py | 34 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 69 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 3 | 
7 files changed, 162 insertions, 71 deletions
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am index 83f969639cc..885963eae2b 100644 --- a/geo-replication/syncdaemon/Makefile.am +++ b/geo-replication/syncdaemon/Makefile.am @@ -2,6 +2,6 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon  syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \  	resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \ -	$(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py +	$(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py  CLEANFILES = diff --git a/geo-replication/syncdaemon/changelogagent.py b/geo-replication/syncdaemon/changelogagent.py new file mode 100644 index 00000000000..54d82cefcd2 --- /dev/null +++ b/geo-replication/syncdaemon/changelogagent.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import os +import logging +import syncdutils +from syncdutils import select, CHANGELOG_AGENT_SERVER_VERSION +from repce import RepceServer + + +class _MetaChangelog(object): + +    def __getattr__(self, meth): +        from libgfchangelog import Changes as LChanges +        xmeth = [m for m in dir(LChanges) if m[0] != '_'] +        if meth not in xmeth: +            return +        for m in xmeth: +            setattr(self, m, getattr(LChanges, m)) +        return getattr(self, meth) + +Changes = _MetaChangelog() + + +class Changelog(object): +    def version(self): +        return CHANGELOG_AGENT_SERVER_VERSION + +    def register(self, cl_brick, cl_dir, cl_log, cl_level, retries=0): +        return Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries) + +    def scan(self): +        return Changes.cl_scan() + +    def getchanges(self): +        return Changes.cl_getchanges() + +    def done(self, clfile): +        return Changes.cl_done(clfile) + +    def history(self, changelog_path, start, end, num_parallel): +        return Changes.cl_history_changelog(changelog_path, start, end, +                                            num_parallel) + +    def history_scan(self): +        return Changes.cl_history_scan() + +    def history_getchanges(self): +        return Changes.cl_history_getchanges() + +    def history_done(self, clfile): +        return Changes.cl_history_done(clfile) + + +class ChangelogAgent(object): +    def __init__(self, obj, fd_tup): +        (inf, ouf, rw, ww) = fd_tup.split(',') +        os.close(int(rw)) +        os.close(int(ww)) +        repce = RepceServer(obj, int(inf), int(ouf), 1) +        t = syncdutils.Thread(target=lambda: (repce.service_loop(), +                                              syncdutils.finalize())) +        t.start() +        logging.info('Agent listining...') + +        select((), (), ()) + + +def agent(obj, fd_tup): +    return ChangelogAgent(obj, fd_tup) diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 426d964de95..7d463ad23f3 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -32,6 +32,7 @@ from syncdutils import GsyncdError, select, set_term_handler  from configinterface import GConffile, upgrade_config_file  import resource  from monitor import monitor +from changelogagent import agent, Changelog  class GLogger(Logger): @@ -175,6 +176,7 @@ def main_i():      - query/manipulate configuration      - format gsyncd urls using gsyncd's url parsing engine      - start service in following modes, in given stages: +      - agent: startup(), ChangelogAgent()        - monitor: startup(), monitor()        - master: startup(), connect_remote(), connect(), service_loop()        - slave: startup(), connect(), service_loop() @@ -275,12 +277,15 @@ def main_i():      # duh. need to specify dest or value will be mapped to None :S      op.add_option('--monitor', dest='monitor', action='callback',                    callback=store_local_curry(True)) +    op.add_option('--agent', dest='agent', action='callback', +                  callback=store_local_curry(True))      op.add_option('--resource-local', dest='resource_local',                    type=str, action='callback', callback=store_local)      op.add_option('--resource-remote', dest='resource_remote',                    type=str, action='callback', callback=store_local)      op.add_option('--feedback-fd', dest='feedback_fd', type=int,                    help=SUPPRESS_HELP, action='callback', callback=store_local) +    op.add_option('--rpc-fd', dest='rpc_fd', type=str, help=SUPPRESS_HELP)      op.add_option('--listen', dest='listen', help=SUPPRESS_HELP,                    action='callback', callback=store_local_curry(True))      op.add_option('-N', '--no-daemon', dest="go_daemon", @@ -586,6 +591,7 @@ def main_i():      go_daemon = rconf['go_daemon']      be_monitor = rconf.get('monitor') +    be_agent = rconf.get('agent')      rscs, local, remote = makersc(args)      if not be_monitor and isinstance(remote, resource.SSH) and \ @@ -596,6 +602,8 @@ def main_i():          log_file = gconf.log_file      if be_monitor:          label = 'monitor' +    elif be_agent: +        label = 'agent'      elif remote:          # master          label = gconf.local_path @@ -604,6 +612,11 @@ def main_i():      startup(go_daemon=go_daemon, log_file=log_file, label=label)      resource.Popen.init_errhandler() +    if be_agent: +        os.setsid() +        logging.debug('rpc_fd: %s' % repr(gconf.rpc_fd)) +        return agent(Changelog(), gconf.rpc_fd) +      if be_monitor:          return monitor(*rscs) diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index b6a7c894814..1f1fa1122cb 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -1108,9 +1108,9 @@ class GMasterChangelogMixin(GMasterCommon):          if isinstance(purge_time, int):              purge_time = None -        self.master.server.changelog_scan() +        self.changelog_agent.scan()          self.crawls += 1 -        changes = self.master.server.changelog_getchanges() +        changes = self.changelog_agent.getchanges()          if changes:              if purge_time:                  logging.info("slave's time: %s" % repr(purge_time)) @@ -1120,22 +1120,24 @@ class GMasterChangelogMixin(GMasterCommon):                      logging.info(                          'skipping already processed change: %s...' %                          os.path.basename(pr)) -                    self.master.server.changelog_done(pr) +                    self.changelog_done_func(pr)                      changes.remove(pr)              if changes:                  logging.debug('processing changes %s' % repr(changes))                  self.process(changes) -    def register(self): +    def register(self, changelog_agent): +        self.changelog_agent = changelog_agent          self.sleep_interval = int(gconf.change_interval) -        self.changelog_done_func = self.master.server.changelog_done +        self.changelog_done_func = self.changelog_agent.done  class GMasterChangeloghistoryMixin(GMasterChangelogMixin): -    def register(self): +    def register(self, changelog_agent): +        self.changelog_agent = changelog_agent          self.changelog_register_time = int(time.time()) -        self.changelog_done_func = self.master.server.history_changelog_done +        self.changelog_done_func = self.changelog_agent.history_done      def crawl(self):          self.update_worker_crawl_status("History Crawl") @@ -1157,21 +1159,21 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):          # location then consuming history will not work(Known issue as of now)          changelog_path = os.path.join(gconf.local_path,                                        ".glusterfs/changelogs") -        ts = self.master.server.history_changelog(changelog_path, -                                                  purge_time[0], -                                                  self.changelog_register_time, -                                                  int(gconf.sync_jobs)) +        ts = self.changelog_agent.history(changelog_path, +                                          purge_time[0], +                                          self.changelog_register_time, +                                          int(gconf.sync_jobs))          # scan followed by getchanges till scan returns zero. -        # history_changelog_scan() is blocking call, till it gets the number +        # history_scan() is blocking call, till it gets the number          # of changelogs to process. Returns zero when no changelogs          # to be processed. returns positive value as number of changelogs          # to be processed, which will be fetched using -        # history_changelog_getchanges() -        while self.master.server.history_changelog_scan() > 0: +        # history_getchanges() +        while self.changelog_agent.history_scan() > 0:              self.crawls += 1 -            changes = self.master.server.history_changelog_getchanges() +            changes = self.changelog_agent.history_getchanges()              if changes:                  if purge_time:                      logging.info("slave's time: %s" % repr(purge_time)) @@ -1208,7 +1210,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):      XSYNC_MAX_ENTRIES = 1 << 13 -    def register(self): +    def register(self, changelog_agent=None):          self.counter = 0          self.comlist = []          self.stimes = [] diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 8ed6f832618..e49a24ee5f5 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -108,7 +108,7 @@ class Monitor(object):          # give a chance to graceful exit          os.kill(-os.getpid(), signal.SIGTERM) -    def monitor(self, w, argv, cpids): +    def monitor(self, w, argv, cpids, agents):          """the monitor loop          Basic logic is a blantantly simple blunt heuristics: @@ -149,6 +149,23 @@ class Monitor(object):          while ret in (0, 1):              logging.info('-' * conn_timeout)              logging.info('starting gsyncd worker') + +            # Couple of pipe pairs for RPC communication b/w +            # worker and changelog agent. + +            # read/write end for agent +            (ra, ww) = os.pipe() +            # read/write end for worker +            (rw, wa) = os.pipe() + +            # spawn the agent process +            apid = os.fork() +            if apid == 0: +                os.execv(sys.executable, argv + ['--local-path', w[0], +                                                 '--agent', +                                                 '--rpc-fd', +                                                 ','.join([str(ra), str(wa), +                                                           str(rw), str(ww)])])              pr, pw = os.pipe()              cpid = os.fork()              if cpid == 0: @@ -157,14 +174,26 @@ class Monitor(object):                                                   '--local-path', w[0],                                                   '--local-id',                                                   '.' + escape(w[0]), +                                                 '--rpc-fd', +                                                 ','.join([str(rw), str(ww), +                                                           str(ra), str(wa)]),                                                   '--resource-remote', w[1]])              self.lock.acquire()              cpids.add(cpid) +            agents.add(apid)              self.lock.release()              os.close(pw) +              t0 = time.time()              so = select((pr,), (), (), conn_timeout)[0]              os.close(pr) + +            # close all RPC pipes in monitor +            os.close(ra) +            os.close(wa) +            os.close(rw) +            os.close(ww) +              if so:                  ret = nwait(cpid, os.WNOHANG)                  if ret is not None: @@ -206,10 +235,11 @@ class Monitor(object):          argv.insert(0, os.path.basename(sys.executable))          cpids = set() +        agents = set()          ta = []          for wx in wspx:              def wmon(w): -                cpid, _ = self.monitor(w, argv, cpids) +                cpid, _ = self.monitor(w, argv, cpids, agents)                  time.sleep(1)                  self.lock.acquire()                  for cpid in cpids: diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 185722f5df0..79dc9e79e9d 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -35,6 +35,8 @@ from syncdutils import GsyncdError, select, privileged, boolify, funcode  from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat  from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable  from syncdutils import ChangelogException +from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION +  UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')  HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) @@ -127,19 +129,7 @@ class _MetaXattr(object):          return getattr(self, meth) -class _MetaChangelog(object): - -    def __getattr__(self, meth): -        from libgfchangelog import Changes as LChanges -        xmeth = [m for m in dir(LChanges) if m[0] != '_'] -        if not meth in xmeth: -            return -        for m in xmeth: -            setattr(self, m, getattr(LChanges, m)) -        return getattr(self, meth) -  Xattr = _MetaXattr() -Changes = _MetaChangelog()  class Popen(subprocess.Popen): @@ -669,39 +659,6 @@ class Server(object):              errno_wrap(os.chown, [go, uid, gid], [ENOENT], [ESTALE, EINVAL])      @classmethod -    def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries=0): -        Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries) - -    @classmethod -    def changelog_scan(cls): -        Changes.cl_scan() - -    @classmethod -    def changelog_getchanges(cls): -        return Changes.cl_getchanges() - -    @classmethod -    def changelog_done(cls, clfile): -        Changes.cl_done(clfile) - -    @classmethod -    def history_changelog(cls, changelog_path, start, end, num_parallel): -        return Changes.cl_history_changelog(changelog_path, start, end, -                                            num_parallel) - -    @classmethod -    def history_changelog_scan(cls): -        return Changes.cl_history_scan() - -    @classmethod -    def history_changelog_getchanges(cls): -        return Changes.cl_history_getchanges() - -    @classmethod -    def history_changelog_done(cls, clfile): -        Changes.cl_history_done(clfile) - -    @classmethod      @_pathguard      def setattr(cls, path, adct):          """set file attributes @@ -932,9 +889,6 @@ class AbstractUrl(object):          return self.get_url() -  ### Concrete resource classes ### - -  class FILE(AbstractUrl, SlaveLocal, SlaveRemote):      """scheme class for file:// urls @@ -1311,16 +1265,27 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):              # register the crawlers and start crawling              # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default)              # g3 ==> changelog History +            (inf, ouf, ra, wa) = gconf.rpc_fd.split(',') +            os.close(int(ra)) +            os.close(int(wa)) +            changelog_agent = RepceClient(int(inf), int(ouf)) +            rv = changelog_agent.version() +            if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION: +                raise GsyncdError( +                    "RePCe major version mismatch(changelog agent): " +                    "local %s, remote %s" % +                    (CHANGELOG_AGENT_CLIENT_VERSION, rv)) +              g1.register()              try:                  (workdir, logfile) = g2.setup_working_dir()                  # register with the changelog library                  # 9 == log level (DEBUG)                  # 5 == connection retries -                brickserver.changelog_register(gconf.local_path, -                                               workdir, logfile, 9, 5) -                g2.register() -                g3.register() +                changelog_agent.register(gconf.local_path, +                                         workdir, logfile, 9, 5) +                g2.register(changelog_agent) +                g3.register(changelog_agent)              except ChangelogException as e:                  logging.debug("Changelog register failed: %s - %s" %                                (e.errno, e.strerror)) diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 65daeb0fe7c..9eda6044472 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -47,6 +47,9 @@ except ImportError:  _CL_AUX_GFID_PFX = ".gfid/"  GF_OP_RETRIES = 20 +CHANGELOG_AGENT_SERVER_VERSION = 1.0 +CHANGELOG_AGENT_CLIENT_VERSION = 1.0 +  def escape(s):      """the chosen flavor of string escaping, used all over  | 
