| field | value | date |
|---|---|---|
| author | Aravinda VK <avishwan@redhat.com> | 2014-03-21 12:33:10 +0530 |
| committer | Vijay Bellur <vbellur@redhat.com> | 2014-04-07 21:56:55 -0700 |
| commit | 238d101e55e067e5afcd43c728884e9ab8d36549 | |
| tree | 60498b107335c0ae526bfa034bd56303406710ab | |
| parent | 0c20b17c09b2eca82f3c79013fd3fe1c72a957fd | |
geo-rep: code pep8/flake8 fixes
pep8 is a style guide for Python:
http://legacy.python.org/dev/peps/pep-0008/
The pep8 checker can be installed with `pip install pep8`.
Usage: `pep8 <python file>`. For example, `pep8 master.py`
lists all coding-standard violations in that file.
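As an illustration (not part of the original commit message), this is the kind of fix pep8 drives. The `demo.py` file name and the exact column in the sample diagnostic are hypothetical, but the E251 violation and its correction are taken from this commit's change to `__codecheck.py`:

```python
import tempfile

# Before the cleanup, pep8 would report something like:
#   demo.py:8:29: E251 unexpected spaces around keyword / parameter equals
# for this line:
#   ipd = tempfile.mkdtemp(prefix = 'codecheck-aux')

# After the fix (as applied in this commit): no spaces around '=' in
# keyword arguments.
ipd = tempfile.mkdtemp(prefix='codecheck-aux')
print(ipd)
```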
flake8 identifies unused imports and other issues in the code.
To run it against this tree:
pip install flake8
cd $GLUSTER_REPO/geo-replication/
flake8 syncdaemon
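flake8 layers pyflakes checks (F-codes) on top of the pep8 style checks; F401 flags imports that are never used, which is exactly the class of issue this commit removes (`import signal`, `from shutil import copyfileobj`, and others). A minimal hypothetical example; the file name and diagnostic position are illustrative:

```python
# flake8 on a file like this reports:
#   demo.py:2:1: F401 'signal' imported but unused
import os
import signal  # unused -- the kind of import this commit deletes


def main():
    # only os is actually used, so 'signal' is dead weight
    return os.getpid()
```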
Also updated the license headers in each source file.
Change-Id: I01c7d0a6091d21bfa48720e9fb5624b77fa3db4a
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/7311
Reviewed-by: Kotresh HR <khiremat@redhat.com>
Reviewed-by: Prashanth Pai <ppai@redhat.com>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | geo-replication/syncdaemon/__codecheck.py | 17 |
| -rw-r--r-- | geo-replication/syncdaemon/__init__.py | 9 |
| -rw-r--r-- | geo-replication/syncdaemon/configinterface.py | 57 |
| -rw-r--r-- | geo-replication/syncdaemon/gconf.py | 12 |
| -rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 292 |
| -rw-r--r-- | geo-replication/syncdaemon/libcxattr.py | 16 |
| -rw-r--r-- | geo-replication/syncdaemon/libgfchangelog.py | 22 |
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 384 |
| -rw-r--r-- | geo-replication/syncdaemon/monitor.py | 78 |
| -rw-r--r-- | geo-replication/syncdaemon/repce.py | 52 |
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 306 |
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 114 |
12 files changed, 912 insertions, 447 deletions
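Most of the diff below follows a handful of mechanical, behavior-preserving patterns. A condensed Python sketch of the recurring fixes (the `lookup` helper and its names are invented for illustration; the before/after fragments in the comments come from the diff itself):

```python
# Recurring fix patterns in this commit:
#
# 1. Identity tests instead of equality with None (pep8 E711):
#      if d.get(key) != None:   ->   if d.get(key) is not None:
# 2. No spaces around '=' in keyword arguments (E251):
#      finalize(exval = excont.exval)   ->   finalize(exval=excont.exval)
# 3. Long lines (E501) wrapped with implicit continuation inside
#    parentheses instead of trailing backslashes.
# 4. Two blank lines between top-level definitions (E302).


def lookup(d, key, default=None):
    """Return d[key], treating only a missing/None value as absent."""
    val = d.get(key)
    if val is None:  # identity test, never '== None'
        return default
    return val
```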
diff --git a/geo-replication/syncdaemon/__codecheck.py b/geo-replication/syncdaemon/__codecheck.py index e3386afba8b..45dbd26bb64 100644 --- a/geo-replication/syncdaemon/__codecheck.py +++ b/geo-replication/syncdaemon/__codecheck.py @@ -1,10 +1,20 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# +  import os  import os.path  import sys  import tempfile  import shutil -ipd = tempfile.mkdtemp(prefix = 'codecheck-aux') +ipd = tempfile.mkdtemp(prefix='codecheck-aux')  try:      # add a fake ipaddr module, we don't want to @@ -25,7 +35,7 @@ class IPNetwork(list):          if f[-3:] != '.py' or f[0] == '_':              continue          m = f[:-3] -        sys.stdout.write('importing %s ...' %  m) +        sys.stdout.write('importing %s ...' % m)          __import__(m)          print(' OK.') @@ -33,7 +43,8 @@ class IPNetwork(list):          sys.argv = sys.argv[:1] + a      gsyncd = sys.modules['gsyncd'] -    for a in [['--help'], ['--version'], ['--canonicalize-escape-url', '/foo']]: +    for a in [['--help'], ['--version'], +              ['--canonicalize-escape-url', '/foo']]:          print('>>> invoking program with args: %s' % ' '.join(a))          pid = os.fork()          if not pid: diff --git a/geo-replication/syncdaemon/__init__.py b/geo-replication/syncdaemon/__init__.py index e69de29bb2d..b4648b69645 100644 --- a/geo-replication/syncdaemon/__init__.py +++ b/geo-replication/syncdaemon/__init__.py @@ -0,0 +1,9 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# diff --git a/geo-replication/syncdaemon/configinterface.py b/geo-replication/syncdaemon/configinterface.py index 35f754c98c9..c4d47b5dbda 100644 --- a/geo-replication/syncdaemon/configinterface.py +++ b/geo-replication/syncdaemon/configinterface.py @@ -1,3 +1,13 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# +  try:      import ConfigParser  except ImportError: @@ -21,14 +31,25 @@ config_version = 2.0  re_type = type(re.compile('')) -  # (SECTION, OPTION, OLD VALUE, NEW VALUE)  CONFIGS = ( -    ("peersrx . .", "georep_session_working_dir", "", "/var/lib/glusterd/geo-replication/${mastervol}_${remotehost}_${slavevol}/"), -    ("peersrx .", "gluster_params", "aux-gfid-mount xlator-option=\*-dht.assert-no-child-down=true", "aux-gfid-mount"), -    ("peersrx . .", "ssh_command_tar", "", "ssh -oPasswordAuthentication=no -oStrictHostKeyChecking=no -i /var/lib/glusterd/geo-replication/tar_ssh.pem"), +    ("peersrx . 
.", +     "georep_session_working_dir", +     "", +     "/var/lib/glusterd/geo-replication/${mastervol}_${remotehost}_" +     "${slavevol}/"), +    ("peersrx .", +     "gluster_params", +     "aux-gfid-mount xlator-option=\*-dht.assert-no-child-down=true", +     "aux-gfid-mount"), +    ("peersrx . .", +     "ssh_command_tar", +     "", +     "ssh -oPasswordAuthentication=no -oStrictHostKeyChecking=no " +     "-i /var/lib/glusterd/geo-replication/tar_ssh.pem"),  ) +  def upgrade_config_file(path):      config_change = False      config = ConfigParser.RawConfigParser() @@ -72,7 +93,9 @@ def upgrade_config_file(path):  class MultiDict(object): -    """a virtual dict-like class which functions as the union of underlying dicts""" + +    """a virtual dict-like class which functions as the union +    of underlying dicts"""      def __init__(self, *dd):          self.dicts = dd @@ -80,14 +103,15 @@ class MultiDict(object):      def __getitem__(self, key):          val = None          for d in self.dicts: -            if d.get(key) != None: +            if d.get(key) is not None:                  val = d[key] -        if val == None: +        if val is None:              raise KeyError(key)          return val  class GConffile(object): +      """A high-level interface to ConfigParser which flattens the two-tiered         config layout by implenting automatic section dispatch based on initial         parameters. @@ -155,7 +179,8 @@ class GConffile(object):          return self.get(opt, printValue=False)      def section(self, rx=False): -        """get the section name of the section representing .peers in .config""" +        """get the section name of the section representing .peers +        in .config"""          peers = self.peers          if not peers:              peers = ['.', '.'] @@ -209,6 +234,7 @@ class GConffile(object):                  continue              so2[s] = tv              tv += 1 +          def scmp(x, y):              return cmp(*(so2[s] for s in (x, y)))          ss.sort(scmp) @@ -218,12 +244,13 @@ class GConffile(object):          """update @dct from key/values of ours.          key/values are collected from .config by filtering the regexp sections -        according to match, and from .section. The values are treated as templates, -        which are substituted from .auxdicts and (in case of regexp sections) -        match groups. +        according to match, and from .section. The values are treated as +        templates, which are substituted from .auxdicts and (in case of regexp +        sections) match groups.          
"""          if not self.peers:              raise GsyncdError('no peers given, cannot select matching options') +          def update_from_sect(sect, mud):              for k, v in self.config._sections[sect].items():                  if k == '__name__': @@ -243,7 +270,7 @@ class GConffile(object):                          match = False                          break                      for j in range(len(m.groups())): -                        mad['match%d_%d' % (i+1, j+1)] = m.groups()[j] +                        mad['match%d_%d' % (i + 1, j + 1)] = m.groups()[j]                  if match:                      update_from_sect(sect, MultiDict(dct, mad, *self.auxdicts))          if self.config.has_section(self.section()): @@ -255,7 +282,7 @@ class GConffile(object):             logic described in .update_to)          """          d = {} -        self.update_to(d, allow_unresolved = True) +        self.update_to(d, allow_unresolved=True)          if opt:              opt = norm(opt)              v = d.get(opt) @@ -283,6 +310,7 @@ class GConffile(object):                  self.config.add_section(SECT_META)              self.config.set(SECT_META, 'version', config_version)              return trfn(norm(opt), *a, **kw) +          def updateconf(f):              self.config.write(f)          update_file(self.path, updateconf, mergeconf) @@ -295,7 +323,8 @@ class GConffile(object):              # regarding SECT_ORD, cf. ord_sections              if not self.config.has_section(SECT_ORD):                  self.config.add_section(SECT_ORD) -            self.config.set(SECT_ORD, sect, len(self.config._sections[SECT_ORD])) +            self.config.set( +                SECT_ORD, sect, len(self.config._sections[SECT_ORD]))          self.config.set(sect, opt, val)          return True diff --git a/geo-replication/syncdaemon/gconf.py b/geo-replication/syncdaemon/gconf.py index fe5795f16e2..1fc7c381bc4 100644 --- a/geo-replication/syncdaemon/gconf.py +++ b/geo-replication/syncdaemon/gconf.py @@ -1,6 +1,16 @@ -import os +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# +  class GConf(object): +      """singleton class to store globals         shared between gsyncd modules""" diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 6eb62c6b076..426d964de95 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -1,4 +1,13 @@  #!/usr/bin/env python +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+#  import os  import os.path @@ -6,25 +15,27 @@ import glob  import sys  import time  import logging -import signal  import shutil  import optparse  import fcntl  import fnmatch  from optparse import OptionParser, SUPPRESS_HELP  from logging import Logger, handlers -from errno import EEXIST, ENOENT +from errno import ENOENT  from ipaddr import IPAddress, IPNetwork  from gconf import gconf -from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception -from syncdutils import GsyncdError, select, set_term_handler, privileged, update_file +from syncdutils import FreeObject, norm, grabpidfile, finalize +from syncdutils import log_raise_exception, privileged, update_file +from syncdutils import GsyncdError, select, set_term_handler  from configinterface import GConffile, upgrade_config_file  import resource  from monitor import monitor +  class GLogger(Logger): +      """Logger customizations for gsyncd.      It implements a log format similar to that of glusterfs. @@ -51,7 +62,8 @@ class GLogger(Logger):          if lbl:              lbl = '(' + lbl + ')'          lprm = {'datefmt': "%Y-%m-%d %H:%M:%S", -                'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"} +                'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + +                lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"}          lprm.update(kw)          lvl = kw.get('level', logging.INFO)          lprm['level'] = lvl @@ -64,7 +76,7 @@ class GLogger(Logger):              try:                  logging_handler = handlers.WatchedFileHandler(lprm['filename'])                  formatter = logging.Formatter(fmt=lprm['format'], -                datefmt=lprm['datefmt']) +                                              datefmt=lprm['datefmt'])                  logging_handler.setFormatter(formatter)                  logging.getLogger().addHandler(logging_handler)              except AttributeError: @@ -96,6 +108,7 @@ class GLogger(Logger):          gconf.log_metadata = lkw          gconf.log_exit = True +  def startup(**kw):      """set up logging, pidfile grabbing, daemonization"""      if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn': @@ -144,14 +157,15 @@ def main():      gconf.starttime = time.time()      set_term_handler()      GLogger.setup() -    excont = FreeObject(exval = 0) +    excont = FreeObject(exval=0)      try:          try:              main_i()          except:              log_raise_exception(excont)      finally: -        finalize(exval = excont.exval) +        finalize(exval=excont.exval) +  def main_i():      """internal main routine @@ -171,50 +185,71 @@ def main_i():          if val and val != '-':              val = os.path.abspath(val)          setattr(parser.values, opt.dest, val) +      def store_local(opt, optstr, val, parser):          rconf[opt.dest] = val +      def store_local_curry(val):          return lambda o, oo, vx, p: store_local(o, oo, val, p) +      def store_local_obj(op, dmake): -        return lambda o, oo, vx, p: store_local(o, oo, FreeObject(op=op, **dmake(vx)), p) - -    op = OptionParser(usage="%prog [options...] 
<master> <slave>", version="%prog 0.0.1") -    op.add_option('--gluster-command-dir', metavar='DIR',   default='') -    op.add_option('--gluster-log-file',    metavar='LOGF',  default=os.devnull, type=str, action='callback', callback=store_abs) -    op.add_option('--gluster-log-level',   metavar='LVL') -    op.add_option('--gluster-params',      metavar='PRMS',  default='') -    op.add_option('--glusterd-uuid',       metavar='UUID',  type=str, default='', help=SUPPRESS_HELP) -    op.add_option('--gluster-cli-options', metavar='OPTS',  default='--log-file=-') -    op.add_option('--mountbroker',         metavar='LABEL') -    op.add_option('-p', '--pid-file',      metavar='PIDF',  type=str, action='callback', callback=store_abs) -    op.add_option('-l', '--log-file',      metavar='LOGF',  type=str, action='callback', callback=store_abs) -    op.add_option('--log-file-mbr',        metavar='LOGF',  type=str, action='callback', callback=store_abs) -    op.add_option('--state-file',          metavar='STATF', type=str, action='callback', callback=store_abs) -    op.add_option('--state-detail-file',   metavar='STATF', type=str, action='callback', callback=store_abs) -    op.add_option('--georep-session-working-dir', metavar='STATF', type=str, action='callback', callback=store_abs) -    op.add_option('--ignore-deletes',      default=False, action='store_true') -    op.add_option('--isolated-slave',      default=False, action='store_true') -    op.add_option('--use-rsync-xattrs',    default=False, action='store_true') -    op.add_option('-L', '--log-level',     metavar='LVL') -    op.add_option('-r', '--remote-gsyncd', metavar='CMD',   default=os.path.abspath(sys.argv[0])) -    op.add_option('--volume-id',           metavar='UUID') -    op.add_option('--slave-id',            metavar='ID') -    op.add_option('--session-owner',       metavar='ID') -    op.add_option('--local-id',            metavar='ID',    help=SUPPRESS_HELP, default='') -    op.add_option('--local-path',          metavar='PATH',  help=SUPPRESS_HELP, default='') -    op.add_option('-s', '--ssh-command',   metavar='CMD',   default='ssh') -    op.add_option('--ssh-command-tar',     metavar='CMD',   default='ssh') -    op.add_option('--rsync-command',       metavar='CMD',   default='rsync') -    op.add_option('--rsync-options',       metavar='OPTS',  default='') -    op.add_option('--rsync-ssh-options',   metavar='OPTS',  default='--compress') -    op.add_option('--timeout',             metavar='SEC',   type=int, default=120) -    op.add_option('--connection-timeout',  metavar='SEC',   type=int, default=60, help=SUPPRESS_HELP) -    op.add_option('--sync-jobs',           metavar='N',     type=int, default=3) -    op.add_option('--turns',               metavar='N',     type=int, default=0, help=SUPPRESS_HELP) -    op.add_option('--allow-network',       metavar='IPS',   default='') -    op.add_option('--socketdir',           metavar='DIR') -    op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs) -    op.add_option('--checkpoint',          metavar='LABEL', default='') +        return lambda o, oo, vx, p: store_local( +            o, oo, FreeObject(op=op, **dmake(vx)), p) + +    op = OptionParser( +        usage="%prog [options...] 
<master> <slave>", version="%prog 0.0.1") +    op.add_option('--gluster-command-dir', metavar='DIR', default='') +    op.add_option('--gluster-log-file', metavar='LOGF', +                  default=os.devnull, type=str, action='callback', +                  callback=store_abs) +    op.add_option('--gluster-log-level', metavar='LVL') +    op.add_option('--gluster-params', metavar='PRMS', default='') +    op.add_option( +        '--glusterd-uuid', metavar='UUID', type=str, default='', +        help=SUPPRESS_HELP) +    op.add_option( +        '--gluster-cli-options', metavar='OPTS', default='--log-file=-') +    op.add_option('--mountbroker', metavar='LABEL') +    op.add_option('-p', '--pid-file', metavar='PIDF', type=str, +                  action='callback', callback=store_abs) +    op.add_option('-l', '--log-file', metavar='LOGF', type=str, +                  action='callback', callback=store_abs) +    op.add_option('--log-file-mbr', metavar='LOGF', type=str, +                  action='callback', callback=store_abs) +    op.add_option('--state-file', metavar='STATF', type=str, +                  action='callback', callback=store_abs) +    op.add_option('--state-detail-file', metavar='STATF', +                  type=str, action='callback', callback=store_abs) +    op.add_option('--georep-session-working-dir', metavar='STATF', +                  type=str, action='callback', callback=store_abs) +    op.add_option('--ignore-deletes', default=False, action='store_true') +    op.add_option('--isolated-slave', default=False, action='store_true') +    op.add_option('--use-rsync-xattrs', default=False, action='store_true') +    op.add_option('-L', '--log-level', metavar='LVL') +    op.add_option('-r', '--remote-gsyncd', metavar='CMD', +                  default=os.path.abspath(sys.argv[0])) +    op.add_option('--volume-id', metavar='UUID') +    op.add_option('--slave-id', metavar='ID') +    op.add_option('--session-owner', metavar='ID') +    op.add_option('--local-id', metavar='ID', help=SUPPRESS_HELP, default='') +    op.add_option( +        '--local-path', metavar='PATH', help=SUPPRESS_HELP, default='') +    op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh') +    op.add_option('--ssh-command-tar', metavar='CMD', default='ssh') +    op.add_option('--rsync-command', metavar='CMD', default='rsync') +    op.add_option('--rsync-options', metavar='OPTS', default='') +    op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress') +    op.add_option('--timeout', metavar='SEC', type=int, default=120) +    op.add_option('--connection-timeout', metavar='SEC', +                  type=int, default=60, help=SUPPRESS_HELP) +    op.add_option('--sync-jobs', metavar='N', type=int, default=3) +    op.add_option( +        '--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP) +    op.add_option('--allow-network', metavar='IPS', default='') +    op.add_option('--socketdir', metavar='DIR') +    op.add_option('--state-socket-unencoded', metavar='SOCKF', +                  type=str, action='callback', callback=store_abs) +    op.add_option('--checkpoint', metavar='LABEL', default='')      # tunables for failover/failback mechanism:      # None   - gsyncd behaves as normal      # blind  - gsyncd works with xtime pairs to identify @@ -225,56 +260,86 @@ def main_i():      op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP)      # changelog or xtime? 
(TODO: Change the default) -    op.add_option('--change-detector', metavar='MODE', type=str, default='xtime') -    # sleep interval for change detection (xtime crawl uses a hardcoded 1 second sleep time) +    op.add_option( +        '--change-detector', metavar='MODE', type=str, default='xtime') +    # sleep interval for change detection (xtime crawl uses a hardcoded 1 +    # second sleep time)      op.add_option('--change-interval', metavar='SEC', type=int, default=3)      # working directory for changelog based mechanism -    op.add_option('--working-dir', metavar='DIR', type=str, action='callback', callback=store_abs) +    op.add_option('--working-dir', metavar='DIR', type=str, +                  action='callback', callback=store_abs)      op.add_option('--use-tarssh', default=False, action='store_true') -    op.add_option('-c', '--config-file',   metavar='CONF',  type=str, action='callback', callback=store_local) +    op.add_option('-c', '--config-file', metavar='CONF', +                  type=str, action='callback', callback=store_local)      # duh. need to specify dest or value will be mapped to None :S -    op.add_option('--monitor', dest='monitor', action='callback', callback=store_local_curry(True)) -    op.add_option('--resource-local', dest='resource_local', type=str, action='callback', callback=store_local) -    op.add_option('--resource-remote', dest='resource_remote', type=str, action='callback', callback=store_local) -    op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local) -    op.add_option('--listen', dest='listen', help=SUPPRESS_HELP,      action='callback', callback=store_local_curry(True)) -    op.add_option('-N', '--no-daemon', dest="go_daemon",    action='callback', callback=store_local_curry('dont')) -    op.add_option('--verify', type=str, dest="verify", action='callback', callback=store_local) -    op.add_option('--create', type=str, dest="create", action='callback', callback=store_local) -    op.add_option('--delete', dest='delete', action='callback', callback=store_local_curry(True)) -    op.add_option('--debug', dest="go_daemon",              action='callback', callback=lambda *a: (store_local_curry('dont')(*a), -                                                                                                    setattr(a[-1].values, 'log_file', '-'), -                                                                                                    setattr(a[-1].values, 'log_level', 'DEBUG'))), +    op.add_option('--monitor', dest='monitor', action='callback', +                  callback=store_local_curry(True)) +    op.add_option('--resource-local', dest='resource_local', +                  type=str, action='callback', callback=store_local) +    op.add_option('--resource-remote', dest='resource_remote', +                  type=str, action='callback', callback=store_local) +    op.add_option('--feedback-fd', dest='feedback_fd', type=int, +                  help=SUPPRESS_HELP, action='callback', callback=store_local) +    op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, +                  action='callback', callback=store_local_curry(True)) +    op.add_option('-N', '--no-daemon', dest="go_daemon", +                  action='callback', callback=store_local_curry('dont')) +    op.add_option('--verify', type=str, dest="verify", +                  action='callback', callback=store_local) +    op.add_option('--create', type=str, dest="create", +                  action='callback', 
callback=store_local) +    op.add_option('--delete', dest='delete', action='callback', +                  callback=store_local_curry(True)) +    op.add_option('--debug', dest="go_daemon", action='callback', +                  callback=lambda *a: (store_local_curry('dont')(*a), +                                       setattr( +                                           a[-1].values, 'log_file', '-'), +                                       setattr(a[-1].values, 'log_level', +                                               'DEBUG'))),      op.add_option('--path', type=str, action='append')      for a in ('check', 'get'): -        op.add_option('--config-' + a,      metavar='OPT',  type=str, dest='config', action='callback', +        op.add_option('--config-' + a, metavar='OPT', type=str, dest='config', +                      action='callback',                        callback=store_local_obj(a, lambda vx: {'opt': vx})) -    op.add_option('--config-get-all', dest='config', action='callback', callback=store_local_obj('get', lambda vx: {'opt': None})) +    op.add_option('--config-get-all', dest='config', action='callback', +                  callback=store_local_obj('get', lambda vx: {'opt': None}))      for m in ('', '-rx', '-glob'):          # call this code 'Pythonic' eh? -        # have to define a one-shot local function to be able to inject (a value depending on the) +        # have to define a one-shot local function to be able +        # to inject (a value depending on the)          # iteration variable into the inner lambda          def conf_mod_opt_regex_variant(rx): -            op.add_option('--config-set' + m,   metavar='OPT VAL', type=str, nargs=2, dest='config', action='callback', -                          callback=store_local_obj('set', lambda vx: {'opt': vx[0], 'val': vx[1], 'rx': rx})) -            op.add_option('--config-del' + m,   metavar='OPT',  type=str, dest='config', action='callback', -                          callback=store_local_obj('del', lambda vx: {'opt': vx, 'rx': rx})) +            op.add_option('--config-set' + m, metavar='OPT VAL', type=str, +                          nargs=2, dest='config', action='callback', +                          callback=store_local_obj('set', lambda vx: { +                              'opt': vx[0], 'val': vx[1], 'rx': rx})) +            op.add_option('--config-del' + m, metavar='OPT', type=str, +                          dest='config', action='callback', +                          callback=store_local_obj('del', lambda vx: { +                              'opt': vx, 'rx': rx}))          conf_mod_opt_regex_variant(m and m[1:] or False) -    op.add_option('--normalize-url',           dest='url_print', action='callback', callback=store_local_curry('normal')) -    op.add_option('--canonicalize-url',        dest='url_print', action='callback', callback=store_local_curry('canon')) -    op.add_option('--canonicalize-escape-url', dest='url_print', action='callback', callback=store_local_curry('canon_esc')) - -    tunables = [ norm(o.get_opt_string()[2:]) for o in op.option_list if o.callback in (store_abs, 'store_true', None) and o.get_opt_string() not in ('--version', '--help') ] -    remote_tunables = [ 'listen', 'go_daemon', 'timeout', 'session_owner', 'config_file', 'use_rsync_xattrs' ] -    rq_remote_tunables = { 'listen': True } - -    # precedence for sources of values: 1) commandline, 2) cfg file, 3) defaults -    # -- for this to work out we need to tell apart defaults from explicitly set -    # options... 
so churn out the defaults here and call the parser with virgin -    # values container. +    op.add_option('--normalize-url', dest='url_print', +                  action='callback', callback=store_local_curry('normal')) +    op.add_option('--canonicalize-url', dest='url_print', +                  action='callback', callback=store_local_curry('canon')) +    op.add_option('--canonicalize-escape-url', dest='url_print', +                  action='callback', callback=store_local_curry('canon_esc')) + +    tunables = [norm(o.get_opt_string()[2:]) +                for o in op.option_list +                if (o.callback in (store_abs, 'store_true', None) and +                    o.get_opt_string() not in ('--version', '--help'))] +    remote_tunables = ['listen', 'go_daemon', 'timeout', +                       'session_owner', 'config_file', 'use_rsync_xattrs'] +    rq_remote_tunables = {'listen': True} + +    # precedence for sources of values: 1) commandline, 2) cfg file, 3) +    # defaults for this to work out we need to tell apart defaults from +    # explicitly set options... so churn out the defaults here and call +    # the parser with virgin values container.      defaults = op.get_default_values()      opts, args = op.parse_args(values=optparse.Values())      args_orig = args[:] @@ -291,9 +356,9 @@ def main_i():              args.append(None)          args[1] = r      confdata = rconf.get('config') -    if not (len(args) == 2 or \ -            (len(args) == 1 and rconf.get('listen')) or \ -            (len(args) <= 2 and confdata) or \ +    if not (len(args) == 2 or +            (len(args) == 1 and rconf.get('listen')) or +            (len(args) <= 2 and confdata) or              rconf.get('url_print')):          sys.stderr.write("error: incorrect number of arguments\n\n")          sys.stderr.write(op.get_usage() + "\n") @@ -301,8 +366,8 @@ def main_i():      verify = rconf.get('verify')      if verify: -        logging.info (verify) -        logging.info ("Able to spawn gsyncd.py") +        logging.info(verify) +        logging.info("Able to spawn gsyncd.py")          return      restricted = os.getenv('_GSYNCD_RESTRICTED_') @@ -313,14 +378,17 @@ def main_i():          allopts.update(rconf)          bannedtuns = set(allopts.keys()) - set(remote_tunables)          if bannedtuns: -            raise GsyncdError('following tunables cannot be set with restricted SSH invocaton: ' + \ +            raise GsyncdError('following tunables cannot be set with ' +                              'restricted SSH invocaton: ' +                                ', '.join(bannedtuns))          for k, v in rq_remote_tunables.items():              if not k in allopts or allopts[k] != v: -                raise GsyncdError('tunable %s is not set to value %s required for restricted SSH invocaton' % \ +                raise GsyncdError('tunable %s is not set to value %s required ' +                                  'for restricted SSH invocaton' %                                    (k, v))      confrx = getattr(confdata, 'rx', None) +      def makersc(aa, check=True):          if not aa:              return ([], None, None) @@ -330,12 +398,13 @@ def main_i():          if len(ra) > 1:              remote = ra[1]          if check and not local.can_connect_to(remote): -            raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path)) +            raise GsyncdError("%s cannot work with %s" % +                              (local.path, remote and remote.path))          return (ra, local, 
remote)      if confrx:          # peers are regexen, don't try to parse them          if confrx == 'glob': -            args = [ '\A' + fnmatch.translate(a) for a in args ] +            args = ['\A' + fnmatch.translate(a) for a in args]          canon_peers = args          namedict = {}      else: @@ -345,21 +414,24 @@ def main_i():              for r in rscs:                  print(r.get_url(**{'normal': {},                                     'canon': {'canonical': True}, -                                   'canon_esc': {'canonical': True, 'escaped': True}}[dc])) +                                   'canon_esc': {'canonical': True, +                                                 'escaped': True}}[dc]))              return          pa = ([], [], []) -        urlprms = ({}, {'canonical': True}, {'canonical': True, 'escaped': True}) +        urlprms = ( +            {}, {'canonical': True}, {'canonical': True, 'escaped': True})          for x in rscs:              for i in range(len(pa)):                  pa[i].append(x.get_url(**urlprms[i]))          _, canon_peers, canon_esc_peers = pa -        # creating the namedict, a dict representing various ways of referring to / repreenting -        # peers to be fillable in config templates -        mods = (lambda x: x, lambda x: x[0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:]) +        # creating the namedict, a dict representing various ways of referring +        # to / repreenting peers to be fillable in config templates +        mods = (lambda x: x, lambda x: x[ +                0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:])          if remote: -            rmap = { local: ('local', 'master'), remote: ('remote', 'slave') } +            rmap = {local: ('local', 'master'), remote: ('remote', 'slave')}          else: -            rmap = { local: ('local', 'slave') } +            rmap = {local: ('local', 'slave')}          namedict = {}          for i in range(len(rscs)):              x = rscs[i] @@ -370,10 +442,13 @@ def main_i():                  if name == 'remote':                      namedict['remotehost'] = x.remotehost      if not 'config_file' in rconf: -        rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd_template.conf") +        rconf['config_file'] = os.path.join( +            os.path.dirname(sys.argv[0]), "conf/gsyncd_template.conf")      upgrade_config_file(rconf['config_file']) -    gcnf = GConffile(rconf['config_file'], canon_peers, defaults.__dict__, opts.__dict__, namedict) +    gcnf = GConffile( +        rconf['config_file'], canon_peers, +        defaults.__dict__, opts.__dict__, namedict)      checkpoint_change = False      if confdata: @@ -407,7 +482,7 @@ def main_i():      delete = rconf.get('delete')      if delete: -        logging.info ('geo-replication delete') +        logging.info('geo-replication delete')          # Delete pid file, status file, socket file          cleanup_paths = []          if getattr(gconf, 'pid_file', None): @@ -422,7 +497,7 @@ def main_i():          if getattr(gconf, 'state_socket_unencoded', None):              cleanup_paths.append(gconf.state_socket_unencoded) -        cleanup_paths.append(rconf['config_file'][:-11] + "*"); +        cleanup_paths.append(rconf['config_file'][:-11] + "*")          # Cleanup changelog working dirs          if getattr(gconf, 'working_dir', None): @@ -432,7 +507,9 @@ def main_i():                  if sys.exc_info()[1].errno == ENOENT:                      pass                  else: -                    
raise GsyncdError('Error while removing working dir: %s' % gconf.working_dir) +                    raise GsyncdError( +                        'Error while removing working dir: %s' % +                        gconf.working_dir)          for path in cleanup_paths:              # To delete temp files @@ -443,10 +520,11 @@ def main_i():      if restricted and gconf.allow_network:          ssh_conn = os.getenv('SSH_CONNECTION')          if not ssh_conn: -            #legacy env var +            # legacy env var              ssh_conn = os.getenv('SSH_CLIENT')          if ssh_conn: -            allowed_networks = [ IPNetwork(a) for a in gconf.allow_network.split(',') ] +            allowed_networks = [IPNetwork(a) +                                for a in gconf.allow_network.split(',')]              client_ip = IPAddress(ssh_conn.split()[0])              allowed = False              for nw in allowed_networks: @@ -460,7 +538,7 @@ def main_i():      if ffd:          fcntl.fcntl(ffd, fcntl.F_SETFD, fcntl.FD_CLOEXEC) -    #normalize loglevel +    # normalize loglevel      lvl0 = gconf.log_level      if isinstance(lvl0, str):          lvl1 = lvl0.upper() @@ -519,7 +597,7 @@ def main_i():      if be_monitor:          label = 'monitor'      elif remote: -        #master +        # master          label = gconf.local_path      else:          label = 'slave' diff --git a/geo-replication/syncdaemon/libcxattr.py b/geo-replication/syncdaemon/libcxattr.py index b5b6956aea6..e6035e26b43 100644 --- a/geo-replication/syncdaemon/libcxattr.py +++ b/geo-replication/syncdaemon/libcxattr.py @@ -1,8 +1,20 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# +  import os -from ctypes import * +from ctypes import CDLL, c_int, create_string_buffer  from ctypes.util import find_library +  class Xattr(object): +      """singleton that wraps the extended attribues system         interface for python using ctypes @@ -40,7 +52,7 @@ class Xattr(object):      @classmethod      def lgetxattr(cls, path, attr, siz=0): -        return cls._query_xattr( path, siz, 'lgetxattr', attr) +        return cls._query_xattr(path, siz, 'lgetxattr', attr)      @classmethod      def lgetxattr_buf(cls, path, attr): diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py index 68ec3baf144..ec563b36f29 100644 --- a/geo-replication/syncdaemon/libgfchangelog.py +++ b/geo-replication/syncdaemon/libgfchangelog.py @@ -1,7 +1,18 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+# +  import os -from ctypes import * +from ctypes import CDLL, create_string_buffer, get_errno  from ctypes.util import find_library +  class Changes(object):      libgfc = CDLL(find_library("gfchangelog"), use_errno=True) @@ -19,9 +30,10 @@ class Changes(object):          return getattr(cls.libgfc, call)      @classmethod -    def cl_register(cls, brick, path, log_file, log_level, retries = 0): +    def cl_register(cls, brick, path, log_file, log_level, retries=0):          ret = cls._get_api('gf_changelog_register')(brick, path, -                                                    log_file, log_level, retries) +                                                    log_file, +                                                    log_level, retries)          if ret == -1:              cls.raise_oserr() @@ -49,8 +61,8 @@ class Changes(object):          while True:              ret = call(buf, 4096)              if ret in (0, -1): -                break; -            changes.append(buf.raw[:ret-1]) +                break +            changes.append(buf.raw[:ret - 1])          if ret == -1:              cls.raise_oserr()          # cleanup tracker diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 98a61bc1d75..4301396f9f4 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -1,25 +1,30 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# +  import os  import sys  import time  import stat -import random -import signal  import json  import logging  import socket  import string  import errno -from shutil import copyfileobj -from errno import ENOENT, ENODATA, EPIPE, EEXIST, errorcode -from threading import currentThread, Condition, Lock +from errno import ENOENT, ENODATA, EPIPE, EEXIST +from threading import Condition, Lock  from datetime import datetime -from libcxattr import Xattr -  from gconf import gconf -from tempfile import mkdtemp, NamedTemporaryFile -from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \ -                       unescape, select, gauxpfx, md5hex, selfkill, entry2pb, \ -                       lstat, errno_wrap, update_file +from tempfile import NamedTemporaryFile +from syncdutils import Thread, GsyncdError, boolify, escape +from syncdutils import unescape, select, gauxpfx, md5hex, selfkill +from syncdutils import lstat, errno_wrap  URXTIME = (-1, 0) @@ -27,18 +32,20 @@ URXTIME = (-1, 0)  # of the DRY principle (no, don't look for elevated or  # perspectivistic things here) +  def _xtime_now():      t = time.time()      sec = int(t)      nsec = int((t - sec) * 1000000)      return (sec, nsec) +  def _volinfo_hook_relax_foreign(self):      volinfo_sys = self.get_sys_volinfo()      fgn_vi = volinfo_sys[self.KFGN]      if fgn_vi:          expiry = fgn_vi['timeout'] - int(time.time()) + 1 -        logging.info('foreign volume info found, waiting %d sec for expiry' % \ +        logging.info('foreign volume info found, waiting %d sec for expiry' %                       expiry)          time.sleep(expiry)          volinfo_sys = self.get_sys_volinfo() @@ -58,10 +65,14 @@ def gmaster_builder(excrawl=None):      logging.info('setting up %s change detection mode' % 
changemixin)      modemixin = getattr(this, modemixin.capitalize() + 'Mixin')      crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin') -    sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin -    purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin +    sendmarkmixin = boolify( +        gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin +    purgemixin = boolify( +        gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin      syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine -    class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin, syncengine): + +    class _GMaster(crawlmixin, modemixin, sendmarkmixin, +                   purgemixin, syncengine):          pass      return _GMaster @@ -71,6 +82,7 @@ def gmaster_builder(excrawl=None):  # sync modes  class NormalMixin(object): +      """normal geo-rep behavior"""      minus_infinity = URXTIME @@ -152,14 +164,18 @@ class NormalMixin(object):          self.slave.server.set_stime(path, self.uuid, mark)          # self.slave.server.set_xtime_remote(path, self.uuid, mark) +  class PartialMixin(NormalMixin): +      """a variant tuned towards operation with a master         that has partial info of the slave (brick typically)"""      def xtime_reversion_hook(self, path, xtl, xtr):          pass +  class RecoverMixin(NormalMixin): +      """a variant that differs from normal in terms         of ignoring non-indexed files""" @@ -178,11 +194,13 @@ class RecoverMixin(NormalMixin):  # Further mixins for certain tunable behaviors +  class SendmarkNormalMixin(object):      def sendmark_regular(self, *a, **kw):          return self.sendmark(*a, **kw) +  class SendmarkRsyncMixin(object):      def sendmark_regular(self, *a, **kw): @@ -194,19 +212,24 @@ class PurgeNormalMixin(object):      def purge_missing(self, path, names):          self.slave.server.purge(path, names) +  class PurgeNoopMixin(object):      def purge_missing(self, path, names):          pass +  class TarSSHEngine(object): +      """Sync engine that uses tar(1) piped over ssh(1)         for data transfers. Good for lots of small files.      
""" +      def a_syncdata(self, files):          logging.debug('files: %s' % (files))          for f in files:              pb = self.syncer.add(f) +              def regjob(se, xte, pb):                  rv = pb.wait()                  if rv[0]: @@ -228,13 +251,17 @@ class TarSSHEngine(object):          self.a_syncdata(files)          self.syncdata_wait() +  class RsyncEngine(object): +      """Sync engine that uses rsync(1) for data transfers""" +      def a_syncdata(self, files):          logging.debug('files: %s' % (files))          for f in files:              logging.debug('candidate for syncing %s' % f)              pb = self.syncer.add(f) +              def regjob(se, xte, pb):                  rv = pb.wait()                  if rv[0]: @@ -258,7 +285,9 @@ class RsyncEngine(object):          self.a_syncdata(files)          self.syncdata_wait() +  class GMasterCommon(object): +      """abstract class impementling master role"""      KFGN = 0 @@ -269,8 +298,9 @@ class GMasterCommon(object):          err out on multiple foreign masters          """ -        fgn_vis, nat_vi = self.master.server.aggregated.foreign_volume_infos(), \ -                          self.master.server.aggregated.native_volume_info() +        fgn_vis, nat_vi = ( +            self.master.server.aggregated.foreign_volume_infos(), +            self.master.server.aggregated.native_volume_info())          fgn_vi = None          if fgn_vis:              if len(fgn_vis) > 1: @@ -316,15 +346,14 @@ class GMasterCommon(object):          if getattr(gconf, 'state_detail_file', None):              try:                  with open(gconf.state_detail_file, 'r+') as f: -                    loaded_data= json.load(f) -                    diff_data = set(default_data) - set (loaded_data) +                    loaded_data = json.load(f) +                    diff_data = set(default_data) - set(loaded_data)                      if len(diff_data):                          for i in diff_data:                              loaded_data[i] = default_data[i]                      return loaded_data -            except (IOError): -                ex = sys.exc_info()[1] -                logging.warn ('Creating new gconf.state_detail_file.') +            except IOError: +                logging.warn('Creating new gconf.state_detail_file.')                  # Create file with initial data                  try:                      with open(gconf.state_detail_file, 'wb') as f: @@ -364,7 +393,8 @@ class GMasterCommon(object):          # - self.turns is the number of turns since start          # - self.total_turns is a limit so that if self.turns reaches it, then          #   we exit (for diagnostic purposes) -        # so, eg., if the master fs changes unceasingly, self.turns will remain 0. +        # so, eg., if the master fs changes unceasingly, self.turns will remain +        # 0.          self.crawls = 0          self.turns = 0          self.total_turns = int(gconf.turns) @@ -394,7 +424,7 @@ class GMasterCommon(object):              t.start()      def should_crawl(cls): -        return (gconf.glusterd_uuid in cls.master.server.node_uuid()) +        return gconf.glusterd_uuid in cls.master.server.node_uuid()      def register(self):          self.register() @@ -416,18 +446,18 @@ class GMasterCommon(object):          volinfo_sys = self.volinfo_hook()          self.volinfo = volinfo_sys[self.KNAT]          inter_master = volinfo_sys[self.KFGN] -        logging.info("%s master with volume id %s ..." 
% \ -                         (inter_master and "intermediate" or "primary", -                          self.uuid)) +        logging.info("%s master with volume id %s ..." % +                     (inter_master and "intermediate" or "primary", +                      self.uuid))          gconf.configinterface.set('volume_id', self.uuid)          if self.volinfo:              if self.volinfo['retval']: -                logging.warn("master cluster's info may not be valid %d" % \ +                logging.warn("master cluster's info may not be valid %d" %                               self.volinfo['retval'])              self.start_checkpoint_thread()          else:              raise GsyncdError("master volinfo unavailable") -	self.total_crawl_stats = self.get_initial_crawl_data() +        self.total_crawl_stats = self.get_initial_crawl_data()          self.lastreport['time'] = time.time()          logging.info('crawl interval: %d seconds' % self.sleep_interval) @@ -435,7 +465,7 @@ class GMasterCommon(object):          crawl = self.should_crawl()          while not self.terminate:              if self.start: -                logging.debug("... crawl #%d done, took %.6f seconds" % \ +                logging.debug("... crawl #%d done, took %.6f seconds" %                                (self.crawls, time.time() - self.start))              self.start = time.time()              should_display_info = self.start - self.lastreport['time'] >= 60 @@ -443,11 +473,11 @@ class GMasterCommon(object):                  logging.info("%d crawls, %d turns",                               self.crawls - self.lastreport['crawls'],                               self.turns - self.lastreport['turns']) -                self.lastreport.update(crawls = self.crawls, -                                       turns = self.turns, -                                       time = self.start) +                self.lastreport.update(crawls=self.crawls, +                                       turns=self.turns, +                                       time=self.start)              t1 = time.time() -            if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds +            if int(t1 - t0) >= 60:  # lets hardcode this check to 60 seconds                  crawl = self.should_crawl()                  t0 = t1              self.update_worker_remote_node() @@ -456,11 +486,14 @@ class GMasterCommon(object):                  # bring up _this_ brick to the cluster stime                  # which is min of cluster (but max of the replicas)                  brick_stime = self.xtime('.', self.slave) -                cluster_stime = self.master.server.aggregated.stime_mnt('.', '.'.join([str(self.uuid), str(gconf.slave_id)])) -                logging.debug("Cluster stime: %s | Brick stime: %s" % (repr(cluster_stime), repr(brick_stime))) +                cluster_stime = self.master.server.aggregated.stime_mnt( +                    '.', '.'.join([str(self.uuid), str(gconf.slave_id)])) +                logging.debug("Cluster stime: %s | Brick stime: %s" % +                              (repr(cluster_stime), repr(brick_stime)))                  if not isinstance(cluster_stime, int):                      if brick_stime < cluster_stime: -                        self.slave.server.set_stime(self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime) +                        self.slave.server.set_stime( +                            self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)                  time.sleep(5)                  continue              
self.update_worker_health("Active") @@ -489,13 +522,14 @@ class GMasterCommon(object):             with checkpoint @chkpt"""          if xtimish:              val = cls.serialize_xtime(val) -        gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val)) +        gconf.configinterface.set( +            'checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))      @staticmethod      def humantime(*tpair):          """format xtime-like (sec, nsec) pair to human readable format"""          ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\ -               strftime("%Y-%m-%d %H:%M:%S") +            strftime("%Y-%m-%d %H:%M:%S")          if len(tpair) > 1:              ts += '.' + str(tpair[1])          return ts @@ -506,7 +540,7 @@ class GMasterCommon(object):          years = int(years)          days = int(days) -        date="" +        date = ""          m, s = divmod(crawl_time.seconds, 60)          h, m = divmod(m, 60) @@ -515,7 +549,8 @@ class GMasterCommon(object):          if days != 0:              date += "%s %s " % (days, "day" if days == 1 else "days") -        date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2)) +        date += "%s:%s:%s" % (string.zfill(h, 2), +                              string.zfill(m, 2), string.zfill(s, 2))          return date      def checkpt_service(self, chan, chkpt): @@ -540,16 +575,18 @@ class GMasterCommon(object):              if not checkpt_tgt:                  checkpt_tgt = self.xtime('.')                  if isinstance(checkpt_tgt, int): -                    raise GsyncdError("master root directory is unaccessible (%s)", +                    raise GsyncdError("master root directory is " +                                      "unaccessible (%s)",                                        os.strerror(checkpt_tgt))                  self._set_checkpt_param(chkpt, 'target', checkpt_tgt) -            logging.debug("checkpoint target %s has been determined for checkpoint %s" % \ +            logging.debug("checkpoint target %s has been determined " +                          "for checkpoint %s" %                            (repr(checkpt_tgt), chkpt))              # check if the label is 'now'              chkpt_lbl = chkpt              try: -                x1,x2 = chkpt.split(':') +                x1, x2 = chkpt.split(':')                  if x1 == 'now':                      chkpt_lbl = "as of " + self.humantime(x2)              except: @@ -557,41 +594,46 @@ class GMasterCommon(object):              completed = self._checkpt_param(chkpt, 'completed', xtimish=False)              if completed:                  completed = tuple(int(x) for x in completed.split('.')) -            s,_,_ = select([chan], [], [], (not completed) and 5 or None) +            s, _, _ = select([chan], [], [], (not completed) and 5 or None)              # either request made and we re-check to not              # give back stale data, or we still hunting for completion -            if self.native_xtime(checkpt_tgt) and self.native_xtime(checkpt_tgt) < self.volmark: +            if (self.native_xtime(checkpt_tgt) and ( +                    self.native_xtime(checkpt_tgt) < self.volmark)):                  # indexing has been reset since setting the checkpoint                  status = "is invalid"              else:                  xtr = self.xtime('.', self.slave)                  if isinstance(xtr, int): -                    raise GsyncdError("slave root directory is unaccessible (%s)", +                    raise 
GsyncdError("slave root directory is " +                                      "unaccessible (%s)",                                        os.strerror(xtr))                  ncompleted = self.xtime_geq(xtr, checkpt_tgt) -                if completed and not ncompleted: # stale data -                    logging.warn("completion time %s for checkpoint %s became stale" % \ +                if completed and not ncompleted:  # stale data +                    logging.warn("completion time %s for checkpoint %s " +                                 "became stale" %                                   (self.humantime(*completed), chkpt))                      completed = None                      gconf.configinterface.delete('checkpoint_completed') -                if ncompleted and not completed: # just reaching completion +                if ncompleted and not completed:  # just reaching completion                      completed = "%.6f" % time.time() -                    self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False) +                    self._set_checkpt_param( +                        chkpt, 'completed', completed, xtimish=False)                      completed = tuple(int(x) for x in completed.split('.'))                      logging.info("checkpoint %s completed" % chkpt)                  status = completed and \ -                  "completed at " + self.humantime(completed[0]) or \ -                  "not reached yet" +                    "completed at " + self.humantime(completed[0]) or \ +                    "not reached yet"              if s:                  conn = None                  try:                      conn, _ = chan.accept()                      try: -                        conn.send("checkpoint %s is %s\0" % (chkpt_lbl, status)) +                        conn.send("checkpoint %s is %s\0" % +                                  (chkpt_lbl, status))                      except:                          exc = sys.exc_info()[1] -                        if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \ -                           exc.errno == EPIPE: +                        if ((isinstance(exc, OSError) or isinstance( +                                exc, IOError)) and exc.errno == EPIPE):                              logging.debug('checkpoint client disconnected')                          else:                              raise @@ -602,11 +644,13 @@ class GMasterCommon(object):      def start_checkpoint_thread(self):          """prepare and start checkpoint service"""          if self.checkpoint_thread or not ( -          getattr(gconf, 'state_socket_unencoded', None) and getattr(gconf, 'socketdir', None) +            getattr(gconf, 'state_socket_unencoded', None) and getattr( +                gconf, 'socketdir', None)          ):              return          chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) -        state_socket = os.path.join(gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket") +        state_socket = os.path.join( +            gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket")          try:              os.unlink(state_socket)          except: @@ -621,9 +665,9 @@ class GMasterCommon(object):      def add_job(self, path, label, job, *a, **kw):          """insert @job function to job table at @path with @label""" -        if self.jobtab.get(path) == None: +        if self.jobtab.get(path) is None:              self.jobtab[path] = [] -        self.jobtab[path].append((label, a, lambda : job(*a, 
**kw))) +        self.jobtab[path].append((label, a, lambda: job(*a, **kw)))      def add_failjob(self, path, label):          """invoke .add_job with a job that does nothing but fail""" @@ -644,7 +688,7 @@ class GMasterCommon(object):              ret = j[-1]()              if not ret:                  succeed = False -        if succeed and not args[0] == None: +        if succeed and not args[0] is None:              self.sendmark(path, *args)          return succeed @@ -657,19 +701,21 @@ class GMasterCommon(object):              self.slave.server.setattr(path, adct)          self.set_slave_xtime(path, mark) +  class GMasterChangelogMixin(GMasterCommon): +      """ changelog based change detection and syncing """      # index for change type and entry      IDX_START = 0 -    IDX_END   = 2 +    IDX_END = 2 -    POS_GFID   = 0 -    POS_TYPE   = 1 +    POS_GFID = 0 +    POS_TYPE = 1      POS_ENTRY1 = -1 -    TYPE_META  = "M " -    TYPE_GFID  = "D " +    TYPE_META = "M " +    TYPE_GFID = "D "      TYPE_ENTRY = "E "      # flat directory hierarchy for gfid based access @@ -686,18 +732,19 @@ class GMasterChangelogMixin(GMasterCommon):      def setup_working_dir(self):          workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path))          logfile = os.path.join(workdir, 'changes.log') -        logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile)) +        logging.debug('changelog working dir %s (log: %s)' % +                      (workdir, logfile))          return (workdir, logfile)      def process_change(self, change, done, retry):          pfx = gauxpfx() -        clist   = [] +        clist = []          entries = []          meta_gfid = set()          datas = set()          # basic crawl stats: files and bytes -        files_pending  = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []} +        files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}          try:              f = open(change, "r")              clist = f.readlines() @@ -750,17 +797,19 @@ class GMasterChangelogMixin(GMasterCommon):                  elif ty in ['CREATE', 'MKDIR', 'MKNOD']:                      entry_update()                      # stat information present in the changelog itself -                    entries.append(edct(ty, gfid=gfid, entry=en, mode=int(ec[2]),\ +                    entries.append(edct(ty, gfid=gfid, entry=en, +                                        mode=int(ec[2]),                                          uid=int(ec[3]), gid=int(ec[4])))                  else:                      # stat() to get mode and other information                      go = os.path.join(pfx, gfid)                      st = lstat(go)                      if isinstance(st, int): -                        if ty == 'RENAME': # special hack for renames... +                        if ty == 'RENAME':  # special hack for renames...                              
entries.append(edct('UNLINK', gfid=gfid, entry=en))                          else: -                            logging.debug('file %s got purged in the interim' % go) +                            logging.debug( +                                'file %s got purged in the interim' % go)                          continue                      if ty == 'LINK': @@ -771,17 +820,20 @@ class GMasterChangelogMixin(GMasterCommon):                          if isinstance(rl, int):                              continue                          entry_update() -                        entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) +                        entries.append( +                            edct(ty, stat=st, entry=en, gfid=gfid, link=rl))                      elif ty == 'RENAME':                          entry_update() -                        e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) -                        entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st)) +                        e1 = unescape( +                            os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) +                        entries.append( +                            edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st))                      else:                          logging.warn('ignoring %s [op %s]' % (gfid, ty))              elif et == self.TYPE_GFID:                  datas.add(os.path.join(pfx, ec[0]))              elif et == self.TYPE_META: -                if ec[1] == 'SETATTR': # only setattr's for now... +                if ec[1] == 'SETATTR':  # only setattr's for now...                      meta_gfid.add(os.path.join(pfx, ec[0]))              else:                  logging.warn('got invalid changelog type: %s' % (et)) @@ -789,10 +841,10 @@ class GMasterChangelogMixin(GMasterCommon):          if not retry:              self.update_worker_cumilitive_status(files_pending)          # sync namespace -        if (entries): +        if entries:              self.slave.server.entry_ops(entries)          # sync metadata -        if (meta_gfid): +        if meta_gfid:              meta_entries = []              for go in meta_gfid:                  st = lstat(go) @@ -814,22 +866,25 @@ class GMasterChangelogMixin(GMasterCommon):              self.skipped_gfid_list = []              self.current_files_skipped_count = 0 -            # first, fire all changelog transfers in parallel. entry and metadata -            # are performed synchronously, therefore in serial. However at the end -            # of each changelog, data is synchronized with syncdata_async() - which -            # means it is serial w.r.t entries/metadata of that changelog but -            # happens in parallel with data of other changelogs. +            # first, fire all changelog transfers in parallel. entry and +            # metadata are performed synchronously, therefore in serial. +            # However at the end of each changelog, data is synchronized +            # with syncdata_async() - which means it is serial w.r.t +            # entries/metadata of that changelog but happens in parallel +            # with data of other changelogs.              
for change in changes:                  logging.debug('processing change %s' % change)                  self.process_change(change, done, retry)                  if not retry: -                    self.turns += 1 # number of changelogs processed in the batch +                    # number of changelogs processed in the batch +                    self.turns += 1 -            # Now we wait for all the data transfers fired off in the above step -            # to complete. Note that this is not ideal either. Ideally we want to -            # trigger the entry/meta-data transfer of the next batch while waiting -            # for the data transfer of the current batch to finish. +            # Now we wait for all the data transfers fired off in the above +            # step to complete. Note that this is not ideal either. Ideally +            # we want to trigger the entry/meta-data transfer of the next +            # batch while waiting for the data transfer of the current batch +            # to finish.              # Note that the reason to wait for the data transfer (vs doing it              # completely in the background and call the changelog_done() @@ -837,10 +892,11 @@ class GMasterChangelogMixin(GMasterCommon):              # and prevents a spiraling increase of wait stubs from consuming              # unbounded memory and resources. -            # update the slave's time with the timestamp of the _last_ changelog -            # file time suffix. Since the changelog prefix time is the time when -            # the changelog was rolled over, introduce a tolerance of 1 second to -            # counter the small delta b/w the marker update and gettimeofday(). +            # update the slave's time with the timestamp of the _last_ +            # changelog file time suffix. Since the changelog prefix time +            # is the time when the changelog was rolled over, introduce a +            # tolerance of 1 second to counter the small delta b/w the +            # marker update and gettimeofday().              # NOTE: this is only for changelog mode, not xsync.              # @change is the last changelog (therefore max time for this batch) @@ -856,10 +912,13 @@ class GMasterChangelogMixin(GMasterCommon):              retry = True              tries += 1              if tries == self.MAX_RETRIES: -                logging.warn('changelogs %s could not be processed - moving on...' % \ +                logging.warn('changelogs %s could not be processed - ' +                             'moving on...' %                               ' '.join(map(os.path.basename, changes))) -                self.update_worker_total_files_skipped(self.current_files_skipped_count) -                logging.warn('SKIPPED GFID = %s' % ','.join(self.skipped_gfid_list)) +                self.update_worker_total_files_skipped( +                    self.current_files_skipped_count) +                logging.warn('SKIPPED GFID = %s' % +                             ','.join(self.skipped_gfid_list))                  self.update_worker_files_syncd()                  if done:                      xtl = (int(change.split('.')[-1]) - 1, 0) @@ -873,7 +932,7 @@ class GMasterChangelogMixin(GMasterCommon):              # entry_ops() that failed... so we retry the _whole_ changelog              # again.              # TODO: remove entry retries when it gets fixed. 
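For reference, the retry policy used here reads more clearly in isolation. The helper below is a hypothetical sketch, not the mixin's real interface; only the MAX_RETRIES-style budget and the 0.5 s pause mirror the code above:

    import logging
    import time

    def process_batch(changes, process_one, max_retries=20):
        # retry the _whole_ batch until every changelog succeeds;
        # after max_retries rounds, log the stragglers and move on
        tries = 0
        while True:
            failed = [c for c in changes if not process_one(c)]
            if not failed:
                return True
            tries += 1
            if tries == max_retries:
                logging.warn('changelogs %s could not be processed - '
                             'moving on...' % ' '.join(failed))
                return False
            time.sleep(0.5)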
-            logging.warn('incomplete sync, retrying changelogs: %s' % \ +            logging.warn('incomplete sync, retrying changelogs: %s' %                           ' '.join(map(os.path.basename, changes)))              time.sleep(0.5) @@ -884,15 +943,15 @@ class GMasterChangelogMixin(GMasterCommon):              self.sendmark(path, stime)      def get_worker_status_file(self): -        file_name = gconf.local_path+'.status' +        file_name = gconf.local_path + '.status'          file_name = file_name.replace("/", "_") -        worker_status_file = gconf.georep_session_working_dir+file_name +        worker_status_file = gconf.georep_session_working_dir + file_name          return worker_status_file      def update_worker_status(self, key, value): -        default_data = {"remote_node":"N/A", -                        "worker status":"Not Started", -                        "crawl status":"N/A", +        default_data = {"remote_node": "N/A", +                        "worker status": "Not Started", +                        "crawl status": "N/A",                          "files_syncd": 0,                          "files_remaining": 0,                          "bytes_remaining": 0, @@ -909,7 +968,7 @@ class GMasterChangelogMixin(GMasterCommon):                  f.flush()                  os.fsync(f.fileno())          except (IOError, OSError, ValueError): -            logging.info ('Creating new %s' % worker_status_file) +            logging.info('Creating new %s' % worker_status_file)              try:                  with open(worker_status_file, 'wb') as f:                      default_data[key] = value @@ -920,9 +979,9 @@ class GMasterChangelogMixin(GMasterCommon):                  raise      def update_worker_cumilitive_status(self, files_pending): -        default_data = {"remote_node":"N/A", -                        "worker status":"Not Started", -                        "crawl status":"N/A", +        default_data = {"remote_node": "N/A", +                        "worker status": "Not Started", +                        "crawl status": "N/A",                          "files_syncd": 0,                          "files_remaining": 0,                          "bytes_remaining": 0, @@ -932,8 +991,8 @@ class GMasterChangelogMixin(GMasterCommon):          try:              with open(worker_status_file, 'r+') as f:                  loaded_data = json.load(f) -                loaded_data['files_remaining']  = files_pending['count'] -                loaded_data['bytes_remaining']  = files_pending['bytes'] +                loaded_data['files_remaining'] = files_pending['count'] +                loaded_data['bytes_remaining'] = files_pending['bytes']                  loaded_data['purges_remaining'] = files_pending['purge']                  os.ftruncate(f.fileno(), 0)                  os.lseek(f.fileno(), 0, os.SEEK_SET) @@ -941,11 +1000,11 @@ class GMasterChangelogMixin(GMasterCommon):                  f.flush()                  os.fsync(f.fileno())          except (IOError, OSError, ValueError): -            logging.info ('Creating new %s' % worker_status_file) +            logging.info('Creating new %s' % worker_status_file)              try:                  with open(worker_status_file, 'wb') as f: -                    default_data['files_remaining']  = files_pending['count'] -                    default_data['bytes_remaining']  = files_pending['bytes'] +                    default_data['files_remaining'] = files_pending['count'] +                    default_data['bytes_remaining'] = 
files_pending['bytes']                      default_data['purges_remaining'] = files_pending['purge']                      json.dump(default_data, f)                      f.flush() @@ -953,24 +1012,24 @@ class GMasterChangelogMixin(GMasterCommon):              except:                  raise -    def update_worker_remote_node (self): +    def update_worker_remote_node(self):          node = sys.argv[-1]          node = node.split("@")[-1]          remote_node_ip = node.split(":")[0]          remote_node_vol = node.split(":")[3]          remote_node = remote_node_ip + '::' + remote_node_vol -        self.update_worker_status ('remote_node', remote_node) +        self.update_worker_status('remote_node', remote_node) -    def update_worker_health (self, state): -        self.update_worker_status ('worker status', state) +    def update_worker_health(self, state): +        self.update_worker_status('worker status', state) -    def update_worker_crawl_status (self, state): -        self.update_worker_status ('crawl status', state) +    def update_worker_crawl_status(self, state): +        self.update_worker_status('crawl status', state) -    def update_worker_files_syncd (self): -        default_data = {"remote_node":"N/A", -                        "worker status":"Not Started", -                        "crawl status":"N/A", +    def update_worker_files_syncd(self): +        default_data = {"remote_node": "N/A", +                        "worker status": "Not Started", +                        "crawl status": "N/A",                          "files_syncd": 0,                          "files_remaining": 0,                          "bytes_remaining": 0, @@ -981,8 +1040,8 @@ class GMasterChangelogMixin(GMasterCommon):              with open(worker_status_file, 'r+') as f:                  loaded_data = json.load(f)                  loaded_data['files_syncd'] += loaded_data['files_remaining'] -                loaded_data['files_remaining']  = 0 -                loaded_data['bytes_remaining']  = 0 +                loaded_data['files_remaining'] = 0 +                loaded_data['bytes_remaining'] = 0                  loaded_data['purges_remaining'] = 0                  os.ftruncate(f.fileno(), 0)                  os.lseek(f.fileno(), 0, os.SEEK_SET) @@ -990,7 +1049,7 @@ class GMasterChangelogMixin(GMasterCommon):                  f.flush()                  os.fsync(f.fileno())          except (IOError, OSError, ValueError): -            logging.info ('Creating new %s' % worker_status_file) +            logging.info('Creating new %s' % worker_status_file)              try:                  with open(worker_status_file, 'wb') as f:                      json.dump(default_data, f) @@ -999,19 +1058,19 @@ class GMasterChangelogMixin(GMasterCommon):              except:                  raise -    def update_worker_files_remaining (self, state): -        self.update_worker_status ('files_remaining', state) +    def update_worker_files_remaining(self, state): +        self.update_worker_status('files_remaining', state) -    def update_worker_bytes_remaining (self, state): -        self.update_worker_status ('bytes_remaining', state) +    def update_worker_bytes_remaining(self, state): +        self.update_worker_status('bytes_remaining', state) -    def update_worker_purges_remaining (self, state): -        self.update_worker_status ('purges_remaining', state) +    def update_worker_purges_remaining(self, state): +        self.update_worker_status('purges_remaining', state) -    def 
update_worker_total_files_skipped (self, value): -        default_data = {"remote_node":"N/A", -                        "worker status":"Not Started", -                        "crawl status":"N/A", +    def update_worker_total_files_skipped(self, value): +        default_data = {"remote_node": "N/A", +                        "worker status": "Not Started", +                        "crawl status": "N/A",                          "files_syncd": 0,                          "files_remaining": 0,                          "bytes_remaining": 0, @@ -1029,7 +1088,7 @@ class GMasterChangelogMixin(GMasterCommon):                  f.flush()                  os.fsync(f.fileno())          except (IOError, OSError, ValueError): -            logging.info ('Creating new %s' % worker_status_file) +            logging.info('Creating new %s' % worker_status_file)              try:                  with open(worker_status_file, 'wb') as f:                      default_data['total_files_skipped'] = value @@ -1057,9 +1116,12 @@ class GMasterChangelogMixin(GMasterCommon):          if changes:              if purge_time:                  logging.info("slave's time: %s" % repr(purge_time)) -                processed = [x for x in changes if int(x.split('.')[-1]) < purge_time[0]] +                processed = [x for x in changes +                             if int(x.split('.')[-1]) < purge_time[0]]                  for pr in processed: -                    logging.info('skipping already processed change: %s...' % os.path.basename(pr)) +                    logging.info( +                        'skipping already processed change: %s...' % +                        os.path.basename(pr))                      self.master.server.changelog_done(pr)                      changes.remove(pr)              logging.debug('processing changes %s' % repr(changes)) @@ -1080,7 +1142,9 @@ class GMasterChangelogMixin(GMasterCommon):              # control should not reach here              raise +  class GMasterXsyncMixin(GMasterChangelogMixin): +      """      This crawl needs to be xtime based (as of now      it's not. this is because we generate CHANGELOG      files, hardlinks and symlinks.      """ -    XSYNC_MAX_ENTRIES = 1<<13 +    XSYNC_MAX_ENTRIES = 1 << 13      def register(self):          self.counter = 0 @@ -1145,7 +1209,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):      def open(self):          try: -            self.xsync_change = os.path.join(self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time()))) +            self.xsync_change = os.path.join( +                self.tempdir, 'XSYNC-CHANGELOG.' 
+ str(int(time.time())))              self.fh = open(self.xsync_change, 'w')          except IOError:              raise @@ -1165,7 +1230,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):          self.put('xsync', self.fname())          self.counter = 0          if not last: -            time.sleep(1) # make sure changelogs are 1 second apart +            time.sleep(1)  # make sure changelogs are 1 second apart              self.open()      def sync_stime(self, stime=None, last=False): @@ -1207,7 +1272,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):              xtr_root = self.xtime('.', self.slave)              if isinstance(xtr_root, int):                  if xtr_root != ENOENT: -                    logging.warn("slave cluster not returning the " \ +                    logging.warn("slave cluster not returning the "                                   "correct xtime for root (%d)" % xtr_root)                  xtr_root = self.minus_infinity          xtl = self.xtime(path) @@ -1216,7 +1281,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):          xtr = self.xtime(path, self.slave)          if isinstance(xtr, int):              if xtr != ENOENT: -                logging.warn("slave cluster not returning the " \ +                logging.warn("slave cluster not returning the "                               "correct xtime for %s (%d)" % (path, xtr))              xtr = self.minus_infinity          xtr = max(xtr, xtr_root) @@ -1235,7 +1300,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):              e = os.path.join(path, e)              xte = self.xtime(e)              if isinstance(xte, int): -                logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte])) +                logging.warn("irregular xtime for %s: %s" % +                             (e, errno.errorcode[xte]))                  continue              if not self.need_sync(e, xte, xtr):                  continue @@ -1256,35 +1322,51 @@ class GMasterXsyncMixin(GMasterChangelogMixin):                  self.sync_done(self.stimes, False)                  self.stimes = []              if stat.S_ISDIR(mo): -                self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))]) +                self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str( +                    st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, +                                                                    bname))])                  self.Xcrawl(e, xtr_root)                  self.stimes.append((e, xte))              elif stat.S_ISLNK(mo): -                self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))]) +                self.write_entry_change( +                    "E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, +                                                               bname))])              elif stat.S_ISREG(mo):                  nlink = st.st_nlink -                nlink -= 1 # fixup backend stat link count -                # if a file has a hardlink, create a Changelog entry as 'LINK' so the slave -                # side will decide whether to create the new entry or to create a link. +                nlink -= 1  # fixup backend stat link count +                # if a file has a hardlink, create a Changelog entry as +                # 'LINK' so the slave side will decide whether to create +                # the new entry or to create a link.                  
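The branch that follows chooses the record type from the backend link count. Taken out of context, the decision amounts to the sketch below (a hypothetical helper, standard library only):

    import os
    import stat

    def xsync_entry_op(path):
        # MKDIR for directories, SYMLINK for symlinks; for regular
        # files, MKNOD when the backend-adjusted link count is 1,
        # LINK otherwise (so the slave can hardlink instead of copy)
        st = os.lstat(path)
        mo = st.st_mode
        if stat.S_ISDIR(mo):
            return 'MKDIR'
        if stat.S_ISLNK(mo):
            return 'SYMLINK'
        nlink = st.st_nlink - 1  # fixup backend stat link count
        return 'MKNOD' if nlink == 1 else 'LINK'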
if nlink == 1: -                    self.write_entry_change("E", [gfid, 'MKNOD', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))]) +                    self.write_entry_change("E", +                                            [gfid, 'MKNOD', str(mo), +                                             str(st.st_uid), +                                             str(st.st_gid), +                                             escape(os.path.join( +                                                 pargfid, bname))])                  else: -                    self.write_entry_change("E", [gfid, 'LINK', escape(os.path.join(pargfid, bname))]) +                    self.write_entry_change( +                        "E", [gfid, 'LINK', escape(os.path.join(pargfid, +                                                                bname))])                  self.write_entry_change("D", [gfid])          if path == '.':              self.stimes.append((path, xtl))              self.sync_done(self.stimes, True) +  class BoxClosedErr(Exception):      pass +  class PostBox(list): +      """synchronized collection for storing things thought of as "requests" """      def __init__(self, *a):          list.__init__(self, *a)          # too bad Python stdlib does not have read/write locks... -        # it would suffice to grab the lock in .append as reader, in .close as writer +        # it would suffice to grab the lock in .append as reader, in .close as +        # writer          self.lever = Condition()          self.open = True          self.done = False @@ -1319,7 +1401,9 @@ class PostBox(list):          self.open = False          self.lever.release() +  class Syncer(object): +      """a staged queue to relay rsync requests to rsync workers      By "staged queue" it's meant that when a consumer comes to the diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index b0262ee30a8..8ed6f832618 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -1,3 +1,13 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+# +  import os  import sys  import time @@ -9,12 +19,16 @@ from subprocess import PIPE  from resource import Popen, FILE, GLUSTER, SSH  from threading import Lock  from gconf import gconf -from syncdutils import update_file, select, waitpid, set_term_handler, is_host_local, GsyncdError +from syncdutils import update_file, select, waitpid +from syncdutils import set_term_handler, is_host_local, GsyncdError  from syncdutils import escape, Thread, finalize, memoize +  class Volinfo(object): +      def __init__(self, vol, host='localhost', prelude=[]): -        po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, 'volume', 'info', vol], +        po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, +                              'volume', 'info', vol],                     stdout=PIPE, stderr=PIPE)          vix = po.stdout.read()          po.wait() @@ -25,7 +39,8 @@ class Volinfo(object):                  via = '(via %s) ' % prelude.join(' ')              else:                  via = ' ' -            raise GsyncdError('getting volume info of %s%s failed with errorcode %s', +            raise GsyncdError('getting volume info of %s%s ' +                              'failed with errorcode %s',                                (vol, via, vi.find('opErrno').text))          self.tree = vi          self.volume = vol @@ -40,25 +55,27 @@ class Volinfo(object):          def bparse(b):              host, dirp = b.text.split(':', 2)              return {'host': host, 'dir': dirp} -        return [ bparse(b) for b in self.get('brick') ] +        return [bparse(b) for b in self.get('brick')]      @property      @memoize      def uuid(self):          ids = self.get('id')          if len(ids) != 1: -            raise GsyncdError("volume info of %s obtained from %s: ambiguous uuid", +            raise GsyncdError("volume info of %s obtained from %s: " +                              "ambiguous uuid",                                self.volume, self.host)          return ids[0].text  class Monitor(object): +      """class which spawns and manages gsyncd workers""" -    ST_INIT     = 'Initializing...' -    ST_STABLE   = 'Stable' -    ST_FAULTY   = 'faulty' -    ST_INCON    = 'inconsistent' +    ST_INIT = 'Initializing...' 
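Volinfo above shells out to 'gluster --xml --remote-host=... volume info' and walks the resulting XML. Assuming that element layout, the brick extraction it performs via its get() helper can be sketched on its own as:

    from xml.etree import ElementTree

    def bricks_from_volinfo(xml_text):
        # pull {'host': ..., 'dir': ...} pairs out of the <brick>
        # elements of `gluster --xml volume info` output
        tree = ElementTree.fromstring(xml_text)
        bricks = []
        for b in tree.findall('.//brick'):
            host, dirp = b.text.split(':', 2)
            bricks.append({'host': host, 'dir': dirp})
        return bricks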
+    ST_STABLE = 'Stable' +    ST_FAULTY = 'faulty' +    ST_INCON = 'inconsistent'      _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON]      def __init__(self): @@ -68,7 +85,8 @@ class Monitor(object):      def set_state(self, state, w=None):          """set the state that can be used by external agents             like glusterd for status reporting""" -        computestate = lambda: self.state and self._ST_ORD[max(self._ST_ORD.index(s) for s in self.state.values())] +        computestate = lambda: self.state and self._ST_ORD[ +            max(self._ST_ORD.index(s) for s in self.state.values())]          if w:              self.lock.acquire()              old_state = computestate() @@ -112,14 +130,17 @@ class Monitor(object):          self.set_state(self.ST_INIT, w)          ret = 0 +          def nwait(p, o=0):              p2, r = waitpid(p, o)              if not p2:                  return              return r +          def exit_signalled(s):              """ child terminated due to receipt of SIGUSR1 """              return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1)) +          def exit_status(s):              if os.WIFEXITED(s):                  return os.WEXITSTATUS(s) @@ -134,7 +155,8 @@ class Monitor(object):                  os.close(pr)                  os.execv(sys.executable, argv + ['--feedback-fd', str(pw),                                                   '--local-path', w[0], -                                                 '--local-id', '.' + escape(w[0]), +                                                 '--local-id', +                                                 '.' + escape(w[0]),                                                   '--resource-remote', w[1]])              self.lock.acquire()              cpids.add(cpid) @@ -145,31 +167,31 @@ class Monitor(object):              os.close(pr)              if so:                  ret = nwait(cpid, os.WNOHANG) -                if ret != None: -                    logging.info("worker(%s) died before establishing " \ +                if ret is not None: +                    logging.info("worker(%s) died before establishing "                                   "connection" % w[0])                  else:                      logging.debug("worker(%s) connected" % w[0])                      while time.time() < t0 + conn_timeout:                          ret = nwait(cpid, os.WNOHANG) -                        if ret != None: -                            logging.info("worker(%s) died in startup " \ +                        if ret is not None: +                            logging.info("worker(%s) died in startup "                                           "phase" % w[0])                              break                          time.sleep(1)              else: -                logging.info("worker(%s) not confirmed in %d sec, " \ +                logging.info("worker(%s) not confirmed in %d sec, "                               "aborting it" % (w[0], conn_timeout))                  os.kill(cpid, signal.SIGKILL)                  ret = nwait(cpid) -            if ret == None: +            if ret is None:                  self.set_state(self.ST_STABLE, w)                  ret = nwait(cpid)              if exit_signalled(ret):                  ret = 0              else:                  ret = exit_status(ret) -                if ret in (0,1): +                if ret in (0, 1):                      self.set_state(self.ST_FAULTY, w)              time.sleep(10)          self.set_state(self.ST_INCON, w) @@ -194,17 
+216,18 @@ class Monitor(object):                      os.kill(cpid, signal.SIGKILL)                  self.lock.release()                  finalize(exval=1) -            t = Thread(target = wmon, args=[wx]) +            t = Thread(target=wmon, args=[wx])              t.start()              ta.append(t)          for t in ta:              t.join() +  def distribute(*resources):      master, slave = resources      mvol = Volinfo(master.volume, master.host)      logging.debug('master bricks: ' + repr(mvol.bricks)) -    prelude  = [] +    prelude = []      si = slave      if isinstance(slave, SSH):          prelude = gconf.ssh_command.split() + [slave.remote_addr] @@ -221,23 +244,28 @@ def distribute(*resources):          raise GsyncdError("unknown slave type " + slave.url)      logging.info('slave bricks: ' + repr(sbricks))      if isinstance(si, FILE): -        slaves = [ slave.url ] +        slaves = [slave.url]      else:          slavenodes = set(b['host'] for b in sbricks)          if isinstance(slave, SSH) and not gconf.isolated_slave:              rap = SSH.parse_ssh_address(slave) -            slaves = [ 'ssh://' + rap['user'] + '@' + h + ':' + si.url for h in slavenodes ] +            slaves = ['ssh://' + rap['user'] + '@' + h + ':' + si.url +                      for h in slavenodes]          else: -            slavevols = [ h + ':' + si.volume for h in slavenodes ] +            slavevols = [h + ':' + si.volume for h in slavenodes]              if isinstance(slave, SSH): -                slaves = [ 'ssh://' + rap.remote_addr + ':' + v for v in slavevols ] +                slaves = ['ssh://' + rap.remote_addr + ':' + v +                          for v in slavevols]              else:                  slaves = slavevols -    workerspex = [ (brick['dir'], slaves[idx % len(slaves)]) for idx, brick in enumerate(mvol.bricks) if is_host_local(brick['host']) ] +    workerspex = [(brick['dir'], slaves[idx % len(slaves)]) +                  for idx, brick in enumerate(mvol.bricks) +                  if is_host_local(brick['host'])]      logging.info('worker specs: ' + repr(workerspex))      return workerspex, suuid +  def monitor(*resources):      """oh yeah, actually Monitor is used as singleton, too"""      return Monitor().multiplex(*distribute(*resources)) diff --git a/geo-replication/syncdaemon/repce.py b/geo-replication/syncdaemon/repce.py index 755fb61df48..d7b17dda796 100644 --- a/geo-replication/syncdaemon/repce.py +++ b/geo-replication/syncdaemon/repce.py @@ -1,3 +1,13 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+# +  import os  import sys  import time @@ -24,6 +34,7 @@ from syncdutils import Thread, select  pickle_proto = -1  repce_version = 1.0 +  def ioparse(i, o):      if isinstance(i, int):          i = os.fdopen(i) @@ -34,6 +45,7 @@          o = o.fileno()      return (i, o) +  def send(out, *args):      """pickle args and write out wholly in one syscall @@ -43,12 +55,14 @@      """      os.write(out, pickle.dumps(args, pickle_proto)) +  def recv(inf):      """load an object from input stream"""      return pickle.load(inf)  class RepceServer(object): +      """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce      ... also our homebrewed RPC backend where the transport layer is @@ -95,16 +109,17 @@ class RepceServer(object):              if rmeth == '__repce_version__':                  res = repce_version              else: -              try: -                  res = getattr(self.obj, rmeth)(*in_data[2:]) -              except: -                  res = sys.exc_info()[1] -                  exc = True -                  logging.exception("call failed: ") +                try: +                    res = getattr(self.obj, rmeth)(*in_data[2:]) +                except: +                    res = sys.exc_info()[1] +                    exc = True +                    logging.exception("call failed: ")              send(self.out, rid, exc, res)  class RepceJob(object): +      """class representing message status we can use      for waiting on reply""" @@ -137,6 +152,7 @@ class RepceJob(object):  class RepceClient(object): +      """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce      ... also our homebrewed RPC backend where the transport layer is @@ -148,7 +164,7 @@ class RepceClient(object):      def __init__(self, i, o):          self.inf, self.out = ioparse(i, o)          self.jtab = {} -        t = Thread(target = self.listen) +        t = Thread(target=self.listen)          t.start()      def listen(self): @@ -177,25 +193,31 @@ class RepceClient(object):          return rjob      def __call__(self, meth, *args): -        """RePCe client is callable, calling it implements a synchronous remote call +        """RePCe client is callable, calling it implements a synchronous +        remote call. -        We do a .push with a cbk which does a wakeup upon receiving answer, then wait -        on the RepceJob. +        We do a .push with a cbk which does a wakeup upon receiving answer, +        then wait on the RepceJob.          """ -        rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)}) +        rjob = self.push( +            meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})          exc, res = rjob.wait()          if exc: -            logging.error('call %s (%s) failed on peer with %s' % (repr(rjob), meth, str(type(res).__name__))) +            logging.error('call %s (%s) failed on peer with %s' % +                          (repr(rjob), meth, str(type(res).__name__)))              raise res          logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res)))          return res      class mprx(object): -        """method proxy, standard trick to implement rubyesque method_missing -           in Python -        A class is a closure factory, you know what I mean, or go read some SICP. +        """method proxy, standard trick to implement rubyesque +        method_missing in Python + +        A class is a closure factory, you know what I mean, or go read +        some SICP.          
""" +          def __init__(self, ins, meth):              self.ins = ins              self.meth = meth diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 41add6fb287..2fb6b3078d8 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -1,3 +1,13 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# +  import re  import os  import sys @@ -12,27 +22,31 @@ import logging  import tempfile  import threading  import subprocess -from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR, ENOTEMPTY, ESTALE, EINVAL +from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP +from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL  from select import error as SelectError  from gconf import gconf  import repce  from repce import RepceServer, RepceClient -from  master import gmaster_builder +from master import gmaster_builder  import syncdutils  from syncdutils import GsyncdError, select, privileged, boolify, funcode  from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat -UrlRX  = re.compile('\A(\w+)://([^ *?[]*)\Z') +UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')  HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)  UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+") +  def sup(x, *a, **kw):      """a rubyesque "super" for python ;)      invoke caller method in parent class with given args.      """ -    return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw) +    return getattr(super(type(x), x), +                   sys._getframe(1).f_code.co_name)(*a, **kw) +  def desugar(ustr):      """transform sugared url strings to standard <scheme>://<urlbody> form @@ -57,15 +71,17 @@ def desugar(ustr):              ap = ap[1:]          return "file://" + ap +  def gethostbyname(hnam):      """gethostbyname wrapper"""      try:          return socket.gethostbyname(hnam)      except socket.gaierror:          ex = sys.exc_info()[1] -        raise GsyncdError("failed to resolve %s: %s" % \ +        raise GsyncdError("failed to resolve %s: %s" %                            (hnam, ex.strerror)) +  def parse_url(ustr):      """instantiate an url object by scheme-to-class dispatch @@ -86,6 +102,7 @@ def parse_url(ustr):  class _MetaXattr(object): +      """singleton class, a lazy wrapper around the      libcxattr module @@ -100,17 +117,19 @@ class _MetaXattr(object):      def __getattr__(self, meth):          from libcxattr import Xattr as LXattr -        xmeth = [ m for m in dir(LXattr) if m[0] != '_' ] +        xmeth = [m for m in dir(LXattr) if m[0] != '_']          if not meth in xmeth:              return          for m in xmeth:              setattr(self, m, getattr(LXattr, m))          return getattr(self, meth) +  class _MetaChangelog(object): +      def __getattr__(self, meth):          from libgfchangelog import Changes as LChanges -        xmeth = [ m for m in dir(LChanges) if m[0] != '_' ] +        xmeth = [m for m in dir(LChanges) if m[0] != '_']          if not meth in xmeth:              return          for m in xmeth: @@ -122,6 +141,7 @@ Changes = _MetaChangelog()  class Popen(subprocess.Popen): +      """customized subclass of subprocess.Popen with a ring   
   buffer for children error output""" @@ -129,11 +149,13 @@ class Popen(subprocess.Popen):      def init_errhandler(cls):          """start the thread which handles children's error output"""          cls.errstore = {} +          def tailer():              while True:                  errstore = cls.errstore.copy()                  try: -                    poe, _ ,_ = select([po.stderr for po in errstore], [], [], 1) +                    poe, _, _ = select( +                        [po.stderr for po in errstore], [], [], 1)                  except (ValueError, SelectError):                      continue                  for po in errstore: @@ -154,12 +176,12 @@ class Popen(subprocess.Popen):                          tots = len(l)                          for lx in la:                              tots += len(lx) -                        while tots > 1<<20 and la: +                        while tots > 1 << 20 and la:                              tots -= len(la.pop(0))                          la.append(l)                      finally:                          po.lock.release() -        t = syncdutils.Thread(target = tailer) +        t = syncdutils.Thread(target=tailer)          t.start()          cls.errhandler = t @@ -189,8 +211,9 @@ class Popen(subprocess.Popen):              ex = sys.exc_info()[1]              if not isinstance(ex, OSError):                  raise -            raise GsyncdError("""execution of "%s" failed with %s (%s)""" % \ -                              (args[0], errno.errorcode[ex.errno], os.strerror(ex.errno))) +            raise GsyncdError("""execution of "%s" failed with %s (%s)""" % +                              (args[0], errno.errorcode[ex.errno], +                               os.strerror(ex.errno)))          if kw.get('stderr') == subprocess.PIPE:              assert(getattr(self, 'errhandler', None))              self.errstore[self] = [] @@ -200,9 +223,10 @@ class Popen(subprocess.Popen):          filling = ""          if self.elines:              filling = ", saying:" -        logging.error("""command "%s" returned with %s%s""" % \ +        logging.error("""command "%s" returned with %s%s""" %                        (" ".join(self.args), repr(self.returncode), filling))          lp = '' +          def logerr(l):              logging.error(self.args[0] + "> " + l)          for l in self.elines: @@ -217,9 +241,9 @@ class Popen(subprocess.Popen):      def errfail(self):          """fail nicely if child did not terminate with success"""          self.errlog() -        syncdutils.finalize(exval = 1) +        syncdutils.finalize(exval=1) -    def terminate_geterr(self, fail_on_err = True): +    def terminate_geterr(self, fail_on_err=True):          """kill child, finalize stderr harvesting (unregister          from errhandler, set up .elines), fail on error if          asked for @@ -230,14 +254,14 @@ class Popen(subprocess.Popen):          finally:              self.lock.release()          elines = self.errstore.pop(self) -        if self.poll() == None: +        if self.poll() is None:              self.terminate() -            if self.poll() == None: +            if self.poll() is None:                  time.sleep(0.1)                  self.kill()                  self.wait()          while True: -            if not select([self.stderr],[],[],0.1)[0]: +            if not select([self.stderr], [], [], 0.1)[0]:                  break              b = os.read(self.stderr.fileno(), 1024)              if b: @@ -251,6 +275,7 @@ class Popen(subprocess.Popen):  
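The stderr tailer above keeps at most about 1 MB of buffered child error output per process. That bounded-buffer policy, pulled out as a standalone sketch (hypothetical function, same 1 << 20 cap):

    def trim_errstore(lines, new_line, cap=1 << 20):
        # append new_line, then drop the oldest buffered lines until
        # the total byte count is back under cap
        lines.append(new_line)
        total = sum(len(l) for l in lines)
        while total > cap and len(lines) > 1:
            total -= len(lines.pop(0))
        return lines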
class Server(object): +      """singleton implementing those filesystem access primitives         which are needed for geo-replication functionality @@ -260,25 +285,28 @@ class Server(object):      GX_NSPACE_PFX = (privileged() and "trusted" or "system")      GX_NSPACE = GX_NSPACE_PFX + ".glusterfs" -    NTV_FMTSTR = "!" + "B"*19 + "II" +    NTV_FMTSTR = "!" + "B" * 19 + "II"      FRGN_XTRA_FMT = "I"      FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT -    GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' +    GX_GFID_CANONICAL_LEN = 37  # canonical gfid len + '\0' -    GFID_XATTR = 'trusted.gfid'  # for backend gfid fetch, do not use GX_NSPACE_PFX -    GFID_FMTSTR = "!" + "B"*16 +    # for backend gfid fetch, do not use GX_NSPACE_PFX +    GFID_XATTR = 'trusted.gfid' +    GFID_FMTSTR = "!" + "B" * 16      local_path = ''      @classmethod      def _fmt_mknod(cls, l): -        return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l+1) +        return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l + 1) +      @classmethod      def _fmt_mkdir(cls, l): -        return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l+1) +        return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l + 1) +      @classmethod      def _fmt_symlink(cls, l1, l2): -        return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1+1, l2+1) +        return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1 + 1, l2 + 1)      def _pathguard(f):          """decorator method that checks @@ -289,6 +317,7 @@ class Server(object):          fc = funcode(f)          pi = list(fc.co_varnames).index('path') +          def ff(*a):              path = a[pi]              ps = path.split('/') @@ -308,7 +337,6 @@ class Server(object):              raise OSError(ENOTDIR, os.strerror(ENOTDIR))          return os.listdir(path) -      @classmethod      @_pathguard      def lstat(cls, path): @@ -325,7 +353,9 @@ class Server(object):      @_pathguard      def linkto_check(cls, path):          try: -            return not (Xattr.lgetxattr_buf(path, 'trusted.glusterfs.dht.linkto') == '') +            return not ( +                Xattr.lgetxattr_buf(path, +                                    'trusted.glusterfs.dht.linkto') == '')          except (IOError, OSError):              ex = sys.exc_info()[1]              if ex.errno in (ENOENT, ENODATA): @@ -333,13 +363,13 @@ class Server(object):              else:                  raise -      @classmethod      @_pathguard      def gfid(cls, path):          try:              buf = Xattr.lgetxattr(path, cls.GFID_XATTR, 16) -            m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)])) +            m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join( +                ['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)]))              return '-'.join(m.groups())          except (IOError, OSError):              ex = sys.exc_info()[1] @@ -350,7 +380,9 @@ class Server(object):      @classmethod      def gfid_mnt(cls, gfidpath): -        return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', cls.GX_GFID_CANONICAL_LEN], [ENOENT]) +        return errno_wrap(Xattr.lgetxattr, +                          [gfidpath, 'glusterfs.gfid.string', +                           cls.GX_GFID_CANONICAL_LEN], [ENOENT])      @classmethod      @_pathguard @@ -369,7 +401,7 @@ class Server(object):          for e in entries:              cls.purge(os.path.join(path, e))          """ -        me_also = entries == None +        me_also = entries is None          if not entries:              try:                  # if it's a symlink, prevent @@ -435,7 +467,9 @@ class Server(object):          """          try: -            return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8)) +            val = Xattr.lgetxattr(path, +                                  '.'.join([cls.GX_NSPACE, uuid, 'xtime']), +                                  8) +            return struct.unpack('!II', val)          except OSError:              ex = sys.exc_info()[1]              if ex.errno in (ENOENT, ENODATA, ENOTDIR): @@ -454,7 +488,9 @@ class Server(object):          """          try: -            return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8)) +            val = Xattr.lgetxattr(path, +                                  '.'.join([cls.GX_NSPACE, uuid, 'stime']), +                                  8) +            return struct.unpack('!II', val)          except OSError:              ex = sys.exc_info()[1]              if ex.errno in (ENOENT, ENODATA, ENOTDIR): @@ -473,7 +509,9 @@ class Server(object):          """          try: -            return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8)) +            val = Xattr.lgetxattr(path, +                                  '.'.join([cls.GX_NSPACE, uuid, 'stime']), +                                  8) +            return struct.unpack('!II', val)          except OSError:              ex = sys.exc_info()[1]              if ex.errno in (ENOENT, ENODATA, ENOTDIR): @@ -484,7 +522,8 @@ class Server(object):      @classmethod      def node_uuid(cls, path='.'):          try: -            uuid_l = Xattr.lgetxattr_buf(path, '.'.join([cls.GX_NSPACE, 'node-uuid'])) +            uuid_l = Xattr.lgetxattr_buf( +                path, '.'.join([cls.GX_NSPACE, 'node-uuid']))              return uuid_l[:-1].split(' ')          except OSError:              raise @@ -493,13 +532,17 @@ class Server(object):      @_pathguard      def set_stime(cls, path, uuid, mark):          """set @mark as stime for @uuid on @path""" -        Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), struct.pack('!II', *mark)) +        Xattr.lsetxattr( +            path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), +            struct.pack('!II', *mark))      @classmethod      @_pathguard      def set_xtime(cls, path, uuid, mark):          """set @mark as xtime for @uuid on @path""" -        Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark)) +        Xattr.lsetxattr( +            path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), +            struct.pack('!II', *mark))      @classmethod      @_pathguard @@ -511,18 +554,22 @@ class Server(object):          on the brick (this method sets xtime on the          remote slave)          """ -        Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark)) +        Xattr.lsetxattr( +            path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), +            struct.pack('!II', *mark))      @classmethod      def entry_ops(cls, entries):          pfx = gauxpfx()          logging.debug('entries: %s' % repr(entries))          # regular file +          def entry_pack_reg(gf, bn, mo, uid, gid):              blen = len(bn)              return struct.pack(cls._fmt_mknod(blen),                                 uid, gid, gf, mo, bn,                                 stat.S_IMODE(mo), 0, umask()) +          def entry_pack_reg_stat(gf, bn, st):              blen = len(bn)              mo = st['mode'] @@ -531,18 +578,21 @@ class 
Server(object):                                 gf, mo, bn,                                 stat.S_IMODE(mo), 0, umask())          # mkdir +          def entry_pack_mkdir(gf, bn, mo, uid, gid):              blen = len(bn)              return struct.pack(cls._fmt_mkdir(blen),                                 uid, gid, gf, mo, bn,                                 stat.S_IMODE(mo), umask()) -        #symlink +        # symlink +          def entry_pack_symlink(gf, bn, lnk, st):              blen = len(bn)              llen = len(lnk)              return struct.pack(cls._fmt_symlink(blen, llen),                                 st['uid'], st['gid'],                                 gf, st['mode'], bn, lnk) +          def entry_purge(entry, gfid):              # This is an extremely racy code and needs to be fixed ASAP.              # The GFID check here is to be sure that the pargfid/bname @@ -574,9 +624,11 @@ class Server(object):                      else:                          break              elif op in ['CREATE', 'MKNOD']: -                blob = entry_pack_reg(gfid, bname, e['mode'], e['uid'], e['uid']) +                blob = entry_pack_reg( +                    gfid, bname, e['mode'], e['uid'], e['uid'])              elif op == 'MKDIR': -                blob = entry_pack_mkdir(gfid, bname, e['mode'], e['uid'], e['uid']) +                blob = entry_pack_mkdir( +                    gfid, bname, e['mode'], e['uid'], e['uid'])              elif op == 'LINK':                  slink = os.path.join(pfx, gfid)                  st = lstat(slink) @@ -596,21 +648,23 @@ class Server(object):                  else:                      errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST])              if blob: -                errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [EEXIST], [ENOENT, ESTALE, EINVAL]) +                errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', +                                               blob], +                           [EEXIST], [ENOENT, ESTALE, EINVAL])      @classmethod      def meta_ops(cls, meta_entries):          logging.debug('Meta-entries: %s' % repr(meta_entries))          for e in meta_entries:              mode = e['stat']['mode'] -            uid  = e['stat']['uid'] -            gid  = e['stat']['gid'] -            go   = e['go'] +            uid = e['stat']['uid'] +            gid = e['stat']['gid'] +            go = e['go']              errno_wrap(os.chmod, [go, mode], [ENOENT], [ESTALE, EINVAL])              errno_wrap(os.chown, [go, uid, gid], [ENOENT], [ESTALE, EINVAL])      @classmethod -    def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries = 0): +    def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries=0):          Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries)      @classmethod @@ -649,6 +703,7 @@ class Server(object):          return os.getpid()      last_keep_alive = 0 +      @classmethod      def keep_alive(cls, dct):          """process keepalive messages. 
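The entry_pack_* helpers above serialize create requests into fixed-layout blobs that are later handed to the gfid-access translator through the 'glusterfs.gfid.newfile' xattr. Under the same layout as _fmt_mknod, a regular-file blob can be sketched as follows (the umask value is hardcoded here purely for illustration):

    import stat
    import struct

    GX_GFID_CANONICAL_LEN = 37  # canonical gfid length + trailing '\0'

    def pack_reg(gfid, bname, mode, uid, gid):
        # field order: uid, gid, gfid, mode, basename, imode, rdev, umask
        fmt = "!II%dsI%dsIII" % (GX_GFID_CANONICAL_LEN, len(bname) + 1)
        return struct.pack(fmt, uid, gid, gfid, mode, bname,
                           stat.S_IMODE(mode), 0, 0o22)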
@@ -662,9 +717,12 @@ class Server(object):          if dct:              key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']])              val = struct.pack(cls.FRGN_FMTSTR, -                              *(dct['version']  + -                                tuple(int(x,16) for x in re.findall('(?:[\da-f]){2}', dct['uuid'])) + -                                (dct['retval'],) + dct['volume_mark'][0:2] + (dct['timeout'],))) +                              *(dct['version'] + +                                tuple(int(x, 16) +                                      for x in re.findall('(?:[\da-f]){2}', +                                                          dct['uuid'])) + +                                (dct['retval'],) + dct['volume_mark'][0:2] + ( +                                    dct['timeout'],)))              Xattr.lsetxattr('.', key, val)          cls.last_keep_alive += 1          return cls.last_keep_alive @@ -676,6 +734,7 @@ class Server(object):  class SlaveLocal(object): +      """mix-in class to implement some facets of a slave server      ("mix-in" is sort of like "abstract class", i.e. it's not @@ -697,9 +756,11 @@ class SlaveLocal(object):          """          if boolify(gconf.use_rsync_xattrs) and not privileged(): -            raise GsyncdError("using rsync for extended attributes is not supported") +            raise GsyncdError( +                "using rsync for extended attributes is not supported") -        repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs)) +        repce = RepceServer( +            self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs))          t = syncdutils.Thread(target=lambda: (repce.service_loop(),                                                syncdutils.finalize()))          t.start() @@ -709,12 +770,16 @@ class SlaveLocal(object):                  lp = self.server.last_keep_alive                  time.sleep(int(gconf.timeout))                  if lp == self.server.last_keep_alive: -                    logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout)) +                    logging.info( +                        "connection inactive for %d seconds, stopping" % +                        int(gconf.timeout))                      break          else:              select((), (), ()) +  class SlaveRemote(object): +      """mix-in class to implement an interface to a remote slave"""      def connect_remote(self, rargs=[], **opts): @@ -731,9 +796,11 @@ class SlaveRemote(object):              extra_opts += ['--session-owner', so]          if boolify(gconf.use_rsync_xattrs):              extra_opts.append('--use-rsync-xattrs') -        po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts + \ -                   ['-N', '--listen', '--timeout', str(gconf.timeout), slave], -                   stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) +        po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts + +                   ['-N', '--listen', '--timeout', str(gconf.timeout), +                    slave], +                   stdin=subprocess.PIPE, stdout=subprocess.PIPE, +                   stderr=subprocess.PIPE)          gconf.transport = po          return self.start_fd_client(po.stdout, po.stdin, **opts) @@ -752,7 +819,9 @@ class SlaveRemote(object):              for k, v in da0[i].iteritems():                  da1[i][k] = int(v)          if da1[0] != da1[1]: -            raise GsyncdError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv)) + 
           raise GsyncdError( +                "RePCe major version mismatch: local %s, remote %s" % +                (exrv, rv))      def rsync(self, files, *args):          """invoke rsync""" @@ -760,17 +829,19 @@ class SlaveRemote(object):              raise GsyncdError("no files to sync")          logging.debug("files: " + ", ".join(files))          argv = gconf.rsync_command.split() + \ -               ['-avR0', '--inplace', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \ -               gconf.rsync_options.split() + (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \ -               ['.'] + list(args) -        po = Popen(argv, stdin=subprocess.PIPE,stderr=subprocess.PIPE) +            ['-avR0', '--inplace', '--files-from=-', '--super', +             '--stats', '--numeric-ids', '--no-implied-dirs'] + \ +            gconf.rsync_options.split() + \ +            (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \ +            ['.'] + list(args) +        po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE)          for f in files:              po.stdin.write(f)              po.stdin.write('\0')          po.stdin.close()          po.wait() -        po.terminate_geterr(fail_on_err = False) +        po.terminate_geterr(fail_on_err=False)          return po @@ -784,8 +855,10 @@ class SlaveRemote(object):          logging.debug("files: " + ", ".join(files))          (host, rdir) = slaveurl.split(':')          tar_cmd = ["tar", "-cf", "-", "--files-from", "-"] -        ssh_cmd = gconf.ssh_command_tar.split() +  [host, "tar", "--overwrite", "-xf", "-", "-C", rdir] -        p0 = Popen(tar_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) +        ssh_cmd = gconf.ssh_command_tar.split() + \ +            [host, "tar", "--overwrite", "-xf", "-", "-C", rdir] +        p0 = Popen(tar_cmd, stdout=subprocess.PIPE, +                   stdin=subprocess.PIPE, stderr=subprocess.PIPE)          p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE)          for f in files:              p0.stdin.write(f) @@ -795,14 +868,16 @@ class SlaveRemote(object):          # wait() for tar to terminate, collecting any errors, further          # waiting for transfer to complete          p0.wait() -        p0.terminate_geterr(fail_on_err = False) +        p0.terminate_geterr(fail_on_err=False)          p1.wait() -        p1.terminate_geterr(fail_on_err = False) +        p1.terminate_geterr(fail_on_err=False)          return p1 +  class AbstractUrl(object): +      """abstract base class for url scheme classes"""      def __init__(self, path, pattern): @@ -839,6 +914,7 @@ class AbstractUrl(object):  class FILE(AbstractUrl, SlaveLocal, SlaveRemote): +      """scheme class for file:// urls      can be used to represent a file slave server @@ -847,6 +923,7 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote):      """      class FILEServer(Server): +          """included server flavor"""          pass @@ -864,6 +941,7 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote):  class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): +      """scheme class for gluster:// urls      can be used to represent a gluster slave server @@ -874,21 +952,24 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):      """      class GLUSTERServer(Server): +          "server enhancements for a glusterfs backend"""          @classmethod -        def _attr_unpack_dict(cls, xattr, extra_fields = ''): +        def _attr_unpack_dict(cls, xattr, extra_fields=''):      
        """generic volume mark fetching/parsing backed"""              fmt_string = cls.NTV_FMTSTR + extra_fields              buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string))              vm = struct.unpack(fmt_string, buf) -            m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]])) +            m = re.match( +                '(.{8})(.{4})(.{4})(.{4})(.{12})', +                "".join(['%02x' % x for x in vm[2:18]]))              uuid = '-'.join(m.groups()) -            volinfo = {  'version': vm[0:2], -                         'uuid'   : uuid, -                         'retval' : vm[18], -                         'volume_mark': vm[19:21], -                      } +            volinfo = {'version': vm[0:2], +                       'uuid': uuid, +                       'retval': vm[18], +                       'volume_mark': vm[19:21], +                       }              if extra_fields:                  return volinfo, vm[-len(extra_fields):]              else: @@ -904,7 +985,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                      d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT)                      now = int(time.time())                      if x[0] > now: -                        logging.debug("volinfo[%s] expires: %d (%d sec later)" % \ +                        logging.debug("volinfo[%s] expires: %d " +                                      "(%d sec later)" %                                        (d['uuid'], x[0], x[0] - now))                          d['timeout'] = x[0]                          dict_list.append(d) @@ -919,7 +1001,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):          def native_volume_info(cls):              """get the native volume mark of the underlying gluster volume"""              try: -                return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark'])) +                return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, +                                                       'volume-mark']))              except OSError:                  ex = sys.exc_info()[1]                  if ex.errno != ENODATA: @@ -936,9 +1019,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):      def can_connect_to(self, remote):          """determine our position in the connectibility matrix"""          return not remote or \ -               (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER)) +            (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER))      class Mounter(object): +          """Abstract base class for mounter backends"""          def __init__(self, params): @@ -1003,7 +1087,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                      if not t.isAlive():                          break                      if time.time() >= tlim: -                        syncdutils.finalize(exval = 1) +                        syncdutils.finalize(exval=1)                      time.sleep(1)                  os.close(mpo)                  _, rv = syncdutils.waitpid(mh, 0) @@ -1011,7 +1095,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                      rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \                           (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0)                      logging.warn('stale mount possibly left behind on ' + d) -                    raise GsyncdError("cleaning up temp mountpoint %s failed with status %d" % \ +                    raise GsyncdError("cleaning up 
temp mountpoint %s " +                                      "failed with status %d" %                                        (d, rv))              else:                  rv = 0 @@ -1035,7 +1120,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                          assert(mntpt)                          if mounted:                              po = self.umount_l(mntpt) -                            po.terminate_geterr(fail_on_err = False) +                            po.terminate_geterr(fail_on_err=False)                              if po.returncode != 0:                                  po.errlog()                                  rv = po.returncode @@ -1047,6 +1132,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):              logging.debug('auxiliary glusterfs mount prepared')      class DirectMounter(Mounter): +          """mounter backend which calls mount(8), umount(8) directly"""          mountkw = {'stderr': subprocess.PIPE} @@ -1057,15 +1143,17 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):              return ['umount', '-l', d]          def make_mount_argv(self): -            self.mntpt = tempfile.mkdtemp(prefix = 'gsyncd-aux-mount-') -            return [self.get_glusterprog()] + ['--' + p for p in self.params] + [self.mntpt] +            self.mntpt = tempfile.mkdtemp(prefix='gsyncd-aux-mount-') +            return [self.get_glusterprog()] + \ +                ['--' + p for p in self.params] + [self.mntpt] -        def cleanup_mntpt(self, mntpt = None): +        def cleanup_mntpt(self, mntpt=None):              if not mntpt:                  mntpt = self.mntpt              os.rmdir(mntpt)      class MountbrokerMounter(Mounter): +          """mounter backend using the mountbroker gluster service"""          mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE} @@ -1073,7 +1161,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):          @classmethod          def make_cli_argv(cls): -            return [cls.get_glusterprog()] + gconf.gluster_cli_options.split() + ['system::'] +            return [cls.get_glusterprog()] + \ +                gconf.gluster_cli_options.split() + ['system::']          @classmethod          def make_umount_argv(cls, d): @@ -1081,7 +1170,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):          def make_mount_argv(self, label):              return self.make_cli_argv() + \ -                   ['mount', label, 'user-map-root=' + syncdutils.getusername()] + self.params +                ['mount', label, 'user-map-root=' + +                    syncdutils.getusername()] + self.params          def handle_mounter(self, po):              self.mntpt = po.stdout.readline()[:-1] @@ -1106,9 +1196,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):              label = syncdutils.getusername()          mounter = label and self.MountbrokerMounter or self.DirectMounter          params = gconf.gluster_params.split() + \ -                   (gconf.gluster_log_level and ['log-level=' + gconf.gluster_log_level] or []) + \ -                   ['log-file=' + gconf.gluster_log_file, 'volfile-server=' + self.host, -                    'volfile-id=' + self.volume, 'client-pid=-1'] +            (gconf.gluster_log_level and ['log-level=' + +                                          gconf.gluster_log_level] or []) + \ +            ['log-file=' + gconf.gluster_log_file, 'volfile-server=' + +             self.host, 'volfile-id=' + self.volume, 'client-pid=-1']          mounter(params).inhibit(*[l for l in [label] if l])    
  def connect_remote(self, *a, **kw): @@ -1116,8 +1207,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):          self.slavedir = "/proc/%d/cwd" % self.server.pid()      def gmaster_instantiate_tuple(self, slave): -        """return a tuple of the 'one shot' and the 'main crawl' class instance""" -        return (gmaster_builder('xsync')(self, slave), gmaster_builder()(self, slave)) +        """return a tuple of the 'one shot' and the 'main crawl' +        class instance""" +        return (gmaster_builder('xsync')(self, slave), +                gmaster_builder()(self, slave))      def service_loop(self, *args):          """enter service loop @@ -1133,6 +1226,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                  class brickserver(FILE.FILEServer):                      local_path = gconf.local_path                      aggregated = self.server +                      @classmethod                      def entries(cls, path):                          e = super(brickserver, cls).entries(path) @@ -1143,14 +1237,17 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                              except ValueError:                                  pass                          return e +                      @classmethod                      def lstat(cls, e):                          """ path based backend stat """                          return super(brickserver, cls).lstat(e) +                      @classmethod                      def gfid(cls, e):                          """ path based backend gfid fetch """                          return super(brickserver, cls).gfid(e) +                      @classmethod                      def linkto_check(cls, e):                          return super(brickserver, cls).linkto_check(e) @@ -1158,9 +1255,25 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                      # define {,set_}xtime in slave, thus preempting                      # the call to remote, so that it takes data from                      # the local brick -                    slave.server.xtime = types.MethodType(lambda _self, path, uuid: brickserver.xtime(path, uuid + '.' + gconf.slave_id), slave.server) -                    slave.server.stime = types.MethodType(lambda _self, path, uuid: brickserver.stime(path, uuid + '.' + gconf.slave_id), slave.server) -                    slave.server.set_stime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_stime(path, uuid + '.' + gconf.slave_id, mark), slave.server) +                    slave.server.xtime = types.MethodType( +                        lambda _self, path, uuid: ( +                            brickserver.xtime(path, +                                              uuid + '.' + gconf.slave_id) +                        ), +                        slave.server) +                    slave.server.stime = types.MethodType( +                        lambda _self, path, uuid: ( +                            brickserver.stime(path, +                                              uuid + '.' + gconf.slave_id) +                        ), +                        slave.server) +                    slave.server.set_stime = types.MethodType( +                        lambda _self, path, uuid, mark: ( +                            brickserver.set_stime(path, +                                                  uuid + '.' 
+ gconf.slave_id, +                                                  mark) +                        ), +                        slave.server)                  (g1, g2) = self.gmaster_instantiate_tuple(slave)                  g1.master.server = brickserver                  g2.master.server = brickserver @@ -1186,6 +1299,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):  class SSH(AbstractUrl, SlaveRemote): +      """scheme class for ssh:// urls      interface to remote slave on master side @@ -1194,7 +1308,9 @@ class SSH(AbstractUrl, SlaveRemote):      def __init__(self, path):          self.remote_addr, inner_url = sup(self, path, -                                          '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ])) +                                          '^((?:%s@)?%s):(.+)' % +                                          tuple([r.pattern +                                                 for r in (UserRX, HostRX)]))          self.inner_rsc = parse_url(inner_url)          self.volume = inner_url[1:] @@ -1262,7 +1378,8 @@ class SSH(AbstractUrl, SlaveRemote):                                   self.inner_rsc.url)          deferred = go_daemon == 'postconn' -        ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + [self.remote_addr], +        ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + +                  [self.remote_addr],                    slave=self.inner_rsc.url, deferred=deferred)          if deferred: @@ -1285,7 +1402,8 @@ class SSH(AbstractUrl, SlaveRemote):              return 'should'      def rsync(self, files): -        return sup(self, files, '-e', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args), +        return sup(self, files, '-e', +                   " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args),                     *(gconf.rsync_ssh_options.split() + [self.slaveurl]))      def tarssh(self, files): diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 1b5684c6d0c..822d919ecb1 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -1,3 +1,13 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+# +  import os  import sys  import pwd @@ -7,9 +17,9 @@ import shutil  import logging  import socket  from threading import Lock, Thread as baseThread -from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, ENOENT, EPERM, ESTALE, errorcode -from signal import signal, SIGTERM, SIGKILL -from time import sleep +from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED +from errno import EINTR, ENOENT, EPERM, ESTALE, errorcode +from signal import signal, SIGTERM  import select as oselect  from os import waitpid as owaitpid @@ -37,25 +47,29 @@ except ImportError:  _CL_AUX_GFID_PFX = ".gfid/"  GF_OP_RETRIES = 20 +  def escape(s):      """the chosen flavor of string escaping, used all over         to turn whatever data to creatable representation"""      return urllib.quote_plus(s) +  def unescape(s):      """inverse of .escape"""      return urllib.unquote_plus(s) +  def norm(s):      if s:          return s.replace('-', '_') -def update_file(path, updater, merger = lambda f: True): + +def update_file(path, updater, merger=lambda f: True):      """update a file in a transaction-like manner"""      fr = fw = None      try: -        fd = os.open(path, os.O_CREAT|os.O_RDWR) +        fd = os.open(path, os.O_CREAT | os.O_RDWR)          try:              fr = os.fdopen(fd, 'r+b')          except: @@ -66,7 +80,7 @@ def update_file(path, updater, merger = lambda f: True):              return          tmpp = path + '.tmp.' + str(os.getpid()) -        fd = os.open(tmpp, os.O_CREAT|os.O_EXCL|os.O_WRONLY) +        fd = os.open(tmpp, os.O_CREAT | os.O_EXCL | os.O_WRONLY)          try:              fw = os.fdopen(fd, 'wb', 0)          except: @@ -80,29 +94,31 @@ def update_file(path, updater, merger = lambda f: True):              if fx:                  fx.close() +  def create_manifest(fname, content):      """      Create manifest file for SSH Control Path      """      fd = None      try: -        fd = os.open(fname, os.O_CREAT|os.O_RDWR) +        fd = os.open(fname, os.O_CREAT | os.O_RDWR)          try:              os.write(fd, content)          except:              os.close(fd)              raise      finally: -        if fd != None: +        if fd is not None:              os.close(fd) +  def setup_ssh_ctl(ctld, remote_addr, resource_url):      """      Setup GConf ssh control path parameters      """      gconf.ssh_ctl_dir = ctld      content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr, -                                                            resource_url) +                                                        resource_url)      content_md5 = md5hex(content)      fname = os.path.join(gconf.ssh_ctl_dir,                           "%s.mft" % content_md5) @@ -112,16 +128,17 @@ def setup_ssh_ctl(ctld, remote_addr, resource_url):                                  "%s.sock" % content_md5)      gconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path] +  def grabfile(fname, content=None):      """open @fname + contest for its fcntl lock      @content: if given, set the file content to it      """      # damn those messy open() mode codes -    fd = os.open(fname, os.O_CREAT|os.O_RDWR) +    fd = os.open(fname, os.O_CREAT | os.O_RDWR)      f = os.fdopen(fd, 'r+b', 0)      try: -        fcntl.lockf(f, fcntl.LOCK_EX|fcntl.LOCK_NB) +        fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)      except:          ex = sys.exc_info()[1]          f.close() @@ -139,6 +156,7 @@ def grabfile(fname, content=None):      gconf.permanent_handles.append(f)      return f +  def 
grabpidfile(fname=None, setpid=True):      """.grabfile customization for pid files"""      if not fname: @@ -150,6 +168,7 @@ def grabpidfile(fname=None, setpid=True):  final_lock = Lock() +  def finalize(*a, **kw):      """all those messy final steps we go trough upon termination @@ -169,7 +188,7 @@ def finalize(*a, **kw):                  if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid:                      # child has terminated                      rm_pidf = True -                    break; +                    break                  time.sleep(0.1)          if rm_pidf:              try: @@ -194,6 +213,7 @@ def finalize(*a, **kw):      sys.stderr.flush()      os._exit(kw.get('exval', 0)) +  def log_raise_exception(excont):      """top-level exception handler @@ -218,20 +238,27 @@ def log_raise_exception(excont):                  logging.error(exc.args[0])              sys.stderr.write('failure: ' + exc.args[0] + '\n')          elif isinstance(exc, PickleError) or isinstance(exc, EOFError) or \ -             ((isinstance(exc, OSError) or isinstance(exc, IOError)) and \ -              exc.errno == EPIPE): +            ((isinstance(exc, OSError) or isinstance(exc, IOError)) and +             exc.errno == EPIPE):              logging.error('connection to peer is broken')              if hasattr(gconf, 'transport'):                  gconf.transport.wait()                  if gconf.transport.returncode == 127:                      logging.warn("!!!!!!!!!!!!!") -                    logging.warn('!!! getting "No such file or directory" errors ' -                                 "is most likely due to MISCONFIGURATION, please consult " -                                 "https://access.redhat.com/site/documentation/en-US/Red_Hat_Storage/2.1/html/Administration_Guide/chap-User_Guide-Geo_Rep-Preparation-Settingup_Environment.html") +                    logging.warn('!!! 
getting "No such file or directory" ' +                                 "errors is most likely due to " +                                 "MISCONFIGURATION" +                                 ", please consult https://access.redhat.com" +                                 "/site/documentation/en-US/Red_Hat_Storage" +                                 "/2.1/html/Administration_Guide" +                                 "/chap-User_Guide-Geo_Rep-Preparation-" +                                 "Settingup_Environment.html")                      logging.warn("!!!!!!!!!!!!!")                  gconf.transport.terminate_geterr() -        elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, ECONNABORTED): -            logging.error('glusterfs session went down [%s]', errorcode[exc.errno]) +        elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, +                                                        ECONNABORTED): +            logging.error('glusterfs session went down [%s]', +                          errorcode[exc.errno])          else:              logtag = "FAIL"          if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG): @@ -244,46 +271,54 @@ def log_raise_exception(excont):  class FreeObject(object): +      """wildcard class for which any attribute can be set"""      def __init__(self, **kw): -        for k,v in kw.items(): +        for k, v in kw.items():              setattr(self, k, v) +  class Thread(baseThread): +      """thread class flavor for gsyncd      - always a daemon thread      - force exit for whole program if thread        function coughs up an exception      """ +      def __init__(self, *a, **kw):          tf = kw.get('target')          if tf:              def twrap(*aa): -                excont = FreeObject(exval = 0) +                excont = FreeObject(exval=0)                  try:                      tf(*aa)                  except:                      try:                          log_raise_exception(excont)                      finally: -                        finalize(exval = excont.exval) +                        finalize(exval=excont.exval)              kw['target'] = twrap          baseThread.__init__(self, *a, **kw)          self.setDaemon(True) +  class GsyncdError(Exception):      pass -def getusername(uid = None): -    if uid == None: + +def getusername(uid=None): +    if uid is None:          uid = os.geteuid()      return pwd.getpwuid(uid).pw_name +  def privileged():      return os.geteuid() == 0 +  def boolify(s):      """      Generic string to boolean converter @@ -294,7 +329,7 @@ def boolify(s):      - False if it's in false_list      - Warn if it's not present in either and return False      """ -    true_list  = ['true', 'yes', '1', 'on'] +    true_list = ['true', 'yes', '1', 'on']      false_list = ['false', 'no', '0', 'off']      if isinstance(s, bool): @@ -305,10 +340,12 @@ def boolify(s):      if lstr in true_list:          rv = True      elif not lstr in false_list: -        logging.warn("Unknown string (%s) in string to boolean conversion defaulting to False\n" % (s)) +        logging.warn("Unknown string (%s) in string to boolean conversion " +                     "defaulting to False\n" % (s))      return rv +  def eintr_wrap(func, exc, *a):      """      wrapper around syscalls resilient to interrupt caused @@ -322,19 +359,24 @@ def eintr_wrap(func, exc, *a):              if not ex.args[0] == EINTR:                  raise +  def select(*a):      return eintr_wrap(oselect.select, oselect.error, *a) -def waitpid 
(*a):
+
+def waitpid(*a):
     return eintr_wrap(owaitpid, OSError, *a)
+
 
 def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})):
     signal(SIGTERM, hook)
+
 
 def is_host_local(host):
     locaddr = False
     for ai in socket.getaddrinfo(host, None):
-        # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators/mgmt/glusterd/src/glusterd-utils.c#L125
+        # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators
+        # /mgmt/glusterd/src/glusterd-utils.c#L125
         if ai[0] == socket.AF_INET:
             if ai[-1][0].split(".")[0] == "127":
                 locaddr = True
@@ -358,8 +400,8 @@ def is_host_local(host):
                 f = open("/proc/sys/net/ipv4/ip_nonlocal_bind")
                 if int(f.read()) != 0:
                     raise GsyncdError(
-                            "non-local bind is set and not allowed to create raw sockets, "
-                            "cannot determine if %s is local" % host)
+                        "non-local bind is set and not allowed to create "
+                        "raw sockets, cannot determine if %s is local" % host)
                 s = socket.socket(ai[0], socket.SOCK_DGRAM)
             finally:
                 if f:
@@ -373,6 +415,7 @@ def is_host_local(host):
         s.close()
     return locaddr
+
 
 def funcode(f):
     fc = getattr(f, 'func_code', None)
     if not fc:
@@ -380,32 +423,40 @@ def funcode(f):
         fc = f.__code__
     return fc
+
 
 def memoize(f):
     fc = funcode(f)
     fn = fc.co_name
+
     def ff(self, *a, **kw):
         rv = getattr(self, '_' + fn, None)
-        if rv == None:
+        if rv is None:
             rv = f(self, *a, **kw)
             setattr(self, '_' + fn, rv)
         return rv
     return ff
+
 
 def umask():
     return os.umask(0)
+
 
 def entry2pb(e):
     return e.rsplit('/', 1)
+
 
 def gauxpfx():
     return _CL_AUX_GFID_PFX
+
 
 def md5hex(s):
     return md5(s).hexdigest()
+
 
 def selfkill(sig=SIGTERM):
     os.kill(os.getpid(), sig)
+
 
 def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):
     """ wrapper around calls resilient to errnos.
     retry in case of ESTALE by default.
@@ -427,6 +478,7 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):
                 return
             time.sleep(0.250)  # retry the call
+
 
 def lstat(e):
     try:
         return os.lstat(e)
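
Taken together, the hunks above apply a handful of mechanical rewrites again and again. Condensed into one before/after sketch (drawn from the getusername() and grabfile() hunks; the sketch itself is not part of the commit):

    import os
    import pwd


    # old style, as pep8 flags it:
    #
    #   def getusername(uid = None):                 # E251: spaces around '='
    #       if uid == None:                          # E711: use 'is None'
    #           ...
    #   fd = os.open(fname, os.O_CREAT|os.O_RDWR)    # E227: space the '|'

    # new style, as the patch rewrites it (plus E302: two blank lines
    # between top-level definitions):
    def getusername(uid=None):
        if uid is None:
            uid = os.geteuid()
        return pwd.getpwuid(uid).pw_name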
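
grabfile() (reformatted above) relies on a non-blocking exclusive fcntl lock to keep a single gsyncd instance per pid file. A simplified, self-contained sketch of the idiom, with the original's gconf bookkeeping and errno filtering omitted:

    import fcntl
    import os


    def grab_lock(fname):
        """take a non-blocking exclusive lock on fname, pidfile-style"""
        fd = os.open(fname, os.O_CREAT | os.O_RDWR)
        f = os.fdopen(fd, 'r+b', 0)
        try:
            # LOCK_NB makes lockf() fail immediately instead of blocking
            fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except (IOError, OSError):
            f.close()
            return None  # another instance already holds the lock
        return f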
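
select() and waitpid() are thin wrappers over eintr_wrap(), which retries a syscall for as long as it is interrupted by a signal. Only the signature and the EINTR test are visible in the hunk above, so the loop below is a plausible reconstruction, not the verbatim function:

    import select as oselect
    from errno import EINTR
    from os import waitpid as owaitpid


    def eintr_wrap(func, exc, *a):
        # retry func(*a) for as long as it dies with EINTR
        while True:
            try:
                return func(*a)
            except exc as ex:
                if ex.args[0] != EINTR:
                    raise


    def select(*a):
        return eintr_wrap(oselect.select, oselect.error, *a)


    def waitpid(*a):
        return eintr_wrap(owaitpid, OSError, *a)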
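
memoize() (visible in full above) caches a method's first result on the instance under '_<name>'. A usage sketch, with a hypothetical Volume class and the name lookup simplified to f.__name__ (the original goes through funcode() for Python 2/3 compatibility):

    def memoize(f):
        fn = f.__name__  # simplified; the original derives this via funcode()

        def ff(self, *a, **kw):
            rv = getattr(self, '_' + fn, None)
            if rv is None:
                rv = f(self, *a, **kw)
                setattr(self, '_' + fn, rv)
            return rv
        return ff


    class Volume(object):

        @memoize
        def uuid(self):
            print('expensive lookup, runs only once')
            return '3f2504e0-4f89-11d3-9a0c-0305e82c3301'


    v = Volume()
    v.uuid()  # computes and stores the result as v._uuid
    v.uuid()  # answered from the cached attribute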
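
errno_wrap() appears only partially above (its signature, the GF_OP_RETRIES constant, and the trailing sleep/return). A hedged reconstruction of the retry loop, with the elided middle assumed:

    import time
    from errno import ESTALE

    GF_OP_RETRIES = 20


    def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):
        """swallow the listed errnos; retry (bounded) on the retryable ones"""
        nr_tries = 0
        while True:
            try:
                return call(*arg)
            except (IOError, OSError) as ex:
                if ex.errno in errnos:
                    return  # benign for this caller, swallow it
                if ex.errno not in retry_errnos:
                    raise
                nr_tries += 1
                if nr_tries == GF_OP_RETRIES:
                    return  # give up silently, as the visible tail suggests
                time.sleep(0.250)  # retry the call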
