Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r--  geo-replication/syncdaemon/resource.py | 1389
1 file changed, 605 insertions(+), 784 deletions(-)
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index c617c7b312e..f5a19629e7a 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -13,25 +13,24 @@ import os
import sys
import stat
import time
-import signal
import fcntl
import types
import struct
-import socket
import logging
import tempfile
import subprocess
-import errno
from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES
from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM
-import shutil
+import errno
+
+from rconf import rconf
+import gsyncdconfig as gconf
-from gconf import gconf
import repce
from repce import RepceServer, RepceClient
from master import gmaster_builder
import syncdutils
-from syncdutils import GsyncdError, select, privileged, boolify, funcode
+from syncdutils import GsyncdError, select, privileged, funcode
from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
from syncdutils import NoStimeAvailable, PartialHistoryAvailable
from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
@@ -39,13 +38,9 @@ from syncdutils import get_changelog_log_level, get_rsync_version
from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
from syncdutils import GX_GFID_CANONICAL_LEN
from gsyncdstatus import GeorepStatus
-from syncdutils import get_master_and_slave_data_from_args
from syncdutils import mntpt_list, lf, Popen, sup, Volinfo
from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
-UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
-HostRX = re.compile('[a-zA-Z\d](?:[a-zA-Z\d.-]*[a-zA-Z\d])?', re.I)
-UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")
ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
@@ -53,58 +48,6 @@ slv_volume = None
slv_host = None
slv_bricks = None
-def desugar(ustr):
- """transform sugared url strings to standard <scheme>://<urlbody> form
-
- parsing logic enforces the constraint that sugared forms should contain
- a ':' or a '/', which ensures that sugared urls do not conflict with
- gluster volume names.
- """
- m = re.match('([^:]*):(.*)', ustr)
- if m:
- if not m.groups()[0]:
- return "gluster://localhost" + ustr
- elif '@' in m.groups()[0] or re.search('[:/]', m.groups()[1]):
- return "ssh://" + ustr
- else:
- return "gluster://" + ustr
- else:
- if ustr[0] != '/':
- raise GsyncdError("cannot resolve sugared url '%s'" % ustr)
- ap = os.path.normpath(ustr)
- if ap.startswith('//'):
- ap = ap[1:]
- return "file://" + ap
-
-
-def gethostbyname(hnam):
- """gethostbyname wrapper"""
- try:
- return socket.gethostbyname(hnam)
- except socket.gaierror:
- ex = sys.exc_info()[1]
- raise GsyncdError("failed to resolve %s: %s" %
- (hnam, ex.strerror))
-
-
-def parse_url(ustr):
- """instantiate an url object by scheme-to-class dispatch
-
- The url classes taken into consideration are the ones in
- this module whose names are full-caps.
- """
- m = UrlRX.match(ustr)
- if not m:
- ustr = desugar(ustr)
- m = UrlRX.match(ustr)
- if not m:
- raise GsyncdError("malformed url")
- sch, path = m.groups()
- this = sys.modules[__name__]
- if not hasattr(this, sch.upper()):
- raise GsyncdError("unknown url scheme " + sch)
- return getattr(this, sch.upper())(path)
-
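For reference, the removed desugar()/parse_url() pair expanded shorthand addresses into full URLs before scheme-to-class dispatch. A minimal standalone sketch of the same desugaring rules (the function name and the sample addresses below are illustrative, not part of the module):

import os
import re

def desugar_sketch(ustr):
    # shorthand containing ':' maps to gluster:// or ssh://;
    # a bare absolute path maps to file://
    m = re.match('([^:]*):(.*)', ustr)
    if m:
        head, tail = m.groups()
        if not head:
            return "gluster://localhost" + ustr
        elif '@' in head or re.search('[:/]', tail):
            return "ssh://" + ustr
        else:
            return "gluster://" + ustr
    if ustr[0] != '/':
        raise ValueError("cannot resolve sugared url '%s'" % ustr)
    ap = os.path.normpath(ustr)
    if ap.startswith('//'):
        ap = ap[1:]
    return "file://" + ap

assert desugar_sketch(":gv0") == "gluster://localhost:gv0"
assert desugar_sketch("node1:gv0") == "gluster://node1:gv0"
assert desugar_sketch("geo@node1:/data") == "ssh://geo@node1:/data"
assert desugar_sketch("/mnt/dir") == "file:///mnt/dir"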
class Server(object):
@@ -149,14 +92,14 @@ class Server(object):
fc = funcode(f)
pi = list(fc.co_varnames).index('path')
- def ff(*a):
- path = a[pi]
+ def ff(*args):
+ path = args[pi]
ps = path.split('/')
if path[0] == '/' or '..' in ps:
raise ValueError('unsafe path')
- a = list(a)
- a[pi] = os.path.join(a[0].local_path, path)
- return f(*a)
+ args = list(args)
+ args[pi] = os.path.join(args[0].local_path, path)
+ return f(*args)
return ff
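The ff wrapper above is a path guard: it rejects absolute paths and any path containing '..', then rebases the path argument onto the server's local_path. A self-contained sketch of the same idea (the class and helper names are invented for illustration):

import os

class FakeServer(object):
    local_path = '/bricks/b0'

    def entries(self, path):
        return path          # stand-in for a real filesystem call

def pathguard(f, pi=1):
    # pi: position of the 'path' argument (found via funcode in gsyncd)
    def ff(*args):
        path = args[pi]
        if path[0] == '/' or '..' in path.split('/'):
            raise ValueError('unsafe path')
        args = list(args)
        args[pi] = os.path.join(args[0].local_path, path)
        return f(*args)
    return ff

guarded = pathguard(FakeServer.entries)
print(guarded(FakeServer(), 'dir/file'))    # /bricks/b0/dir/file
try:
    guarded(FakeServer(), '../escape')
except ValueError as e:
    print(e)                                # unsafe path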
@classmethod
@@ -493,7 +436,8 @@ class Server(object):
else:
en = e['entry']
disk_gfid = get_gfid_from_mnt(en)
- if isinstance(disk_gfid, basestring) and e['gfid'] != disk_gfid:
+ if isinstance(disk_gfid, basestring) and \
+ e['gfid'] != disk_gfid:
slv_entry_info['gfid_mismatch'] = True
st = lstat(en)
if not isinstance(st, int):
@@ -560,7 +504,6 @@ class Server(object):
[ENOENT, EEXIST], [ESTALE, EBUSY])
collect_failure(e, cmd_ret)
-
for e in entries:
blob = None
op = e['op']
@@ -636,19 +579,21 @@ class Server(object):
global slv_volume
global slv_host
if not slv_bricks:
- slv_info = Volinfo (slv_volume, slv_host)
+ slv_info = Volinfo(slv_volume, slv_host)
slv_bricks = slv_info.bricks
# Result of readlink would be of the format below.
# readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
realpath = os.readlink(os.path.join(slv_bricks[0]['dir'],
- ".glusterfs", gfid[0:2],
- gfid[2:4], gfid))
+ ".glusterfs",
+ gfid[0:2],
+ gfid[2:4],
+ gfid))
realpath_parts = realpath.split('/')
src_pargfid = realpath_parts[-2]
src_basename = realpath_parts[-1]
src_entry = os.path.join(pfx, src_pargfid, src_basename)
logging.info(lf("Special case: rename on mkdir",
- gfid=gfid, entry=repr(entry)))
+ gfid=gfid, entry=repr(entry)))
rename_with_disk_gfid_confirmation(gfid, src_entry, entry)
elif op == 'LINK':
slink = os.path.join(pfx, gfid)
@@ -735,7 +680,7 @@ class Server(object):
[pg, 'glusterfs.gfid.newfile', blob],
[EEXIST, ENOENT],
[ESTALE, EINVAL, EBUSY])
- failed = collect_failure(e, cmd_ret)
+ collect_failure(e, cmd_ret)
# If UID/GID is different than zero that means we are trying to
# create Entry with different UID/GID. Create Entry with
@@ -852,274 +797,239 @@ class Server(object):
return 1.0
-class SlaveLocal(object):
+class Mounter(object):
- """mix-in class to implement some factes of a slave server
-
- ("mix-in" is sort of like "abstract class", ie. it's not
- instantiated just included in the ancesty DAG. I use "mix-in"
- to indicate that it's not used as an abstract base class,
- rather just taken in to implement additional functionality
- on the basis of the assumed availability of certain interfaces.)
- """
+ """Abstract base class for mounter backends"""
- def can_connect_to(self, remote):
- """determine our position in the connectibility matrix"""
- return not remote
-
- def service_loop(self):
- """start a RePCe server serving self's server
-
- stop servicing if a timeout is configured and no
- keep-alive arrived in that interval
- """
-
- if boolify(gconf.use_rsync_xattrs) and not privileged():
- raise GsyncdError(
- "using rsync for extended attributes is not supported")
-
- repce = RepceServer(
- self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs))
- t = syncdutils.Thread(target=lambda: (repce.service_loop(),
- syncdutils.finalize()))
- t.start()
- logging.info("slave listening")
- if gconf.timeout and int(gconf.timeout) > 0:
- while True:
- lp = self.server.last_keep_alive
- time.sleep(int(gconf.timeout))
- if lp == self.server.last_keep_alive:
- logging.info(
- lf("connection inactive, stopping",
- timeout=int(gconf.timeout)))
- break
- else:
- select((), (), ())
+ def __init__(self, params):
+ self.params = params
+ self.mntpt = None
+ @classmethod
+ def get_glusterprog(cls):
+ return os.path.join(gconf.get("gluster-command-dir"),
+ cls.glusterprog)
+
+ def umount_l(self, d):
+ """perform lazy umount"""
+ po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE)
+ po.wait()
+ return po
-class SlaveRemote(object):
+ @classmethod
+ def make_umount_argv(cls, d):
+ raise NotImplementedError
- """mix-in class to implement an interface to a remote slave"""
+ def make_mount_argv(self, label=None):
+ raise NotImplementedError
- def connect_remote(self, rargs=[], **opts):
- """connects to a remote slave
+ def cleanup_mntpt(self, *a):
+ pass
- Invoke an auxiliary utility (slave gsyncd, possibly wrapped)
- which sets up the connection and sets up a RePCe client to
- communicate through its stdio.
- """
- slave = opts.get('slave', self.url)
- extra_opts = []
- so = getattr(gconf, 'session_owner', None)
- if so:
- extra_opts += ['--session-owner', so]
- li = getattr(gconf, 'local_id', None)
- if li:
- extra_opts += ['--local-id', li]
- ln = getattr(gconf, 'local_node', None)
- if ln:
- extra_opts += ['--local-node', ln]
- if boolify(gconf.use_rsync_xattrs):
- extra_opts.append('--use-rsync-xattrs')
- if boolify(gconf.access_mount):
- extra_opts.append('--access-mount')
- po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts +
- ['-N', '--listen', '--timeout', str(gconf.timeout),
- slave],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- gconf.transport = po
- return self.start_fd_client(po.stdout, po.stdin, **opts)
+ def handle_mounter(self, po):
+ po.wait()
- def start_fd_client(self, i, o, **opts):
- """set up RePCe client, handshake with server
+ def inhibit(self, label):
+ """inhibit a gluster filesystem
- It's cut out as a separate method to let
- subclasses hook into client startup
+ Mount glusterfs over a temporary mountpoint,
+ change into the mount, and lazy unmount the
+ filesystem.
"""
- self.server = RepceClient(i, o)
- rv = self.server.__version__()
- exrv = {'proto': repce.repce_version, 'object': Server.version()}
- da0 = (rv, exrv)
- da1 = ({}, {})
- for i in range(2):
- for k, v in da0[i].iteritems():
- da1[i][k] = int(v)
- if da1[0] != da1[1]:
- raise GsyncdError(
- "RePCe major version mismatch: local %s, remote %s" %
- (exrv, rv))
-
- def rsync(self, files, *args, **kw):
- """invoke rsync"""
- if not files:
- raise GsyncdError("no files to sync")
- logging.debug("files: " + ", ".join(files))
-
- extra_rsync_flags = []
- # Performance flag: include --ignore-missing-args if the rsync
- # version is 3.1.0 or newer.
- if boolify(gconf.rsync_opt_ignore_missing_args) and \
- get_rsync_version(gconf.rsync_command) >= "3.1.0":
- extra_rsync_flags = ["--ignore-missing-args"]
-
- argv = gconf.rsync_command.split() + \
- ['-aR0', '--inplace', '--files-from=-', '--super',
- '--stats', '--numeric-ids', '--no-implied-dirs'] + \
- (boolify(gconf.rsync_opt_existing) and ['--existing'] or []) + \
- gconf.rsync_options.split() + \
- (boolify(gconf.sync_xattrs) and ['--xattrs'] or []) + \
- (boolify(gconf.sync_acls) and ['--acls'] or []) + \
- extra_rsync_flags + \
- ['.'] + list(args)
-
- log_rsync_performance = boolify(gconf.configinterface.get_realtime(
- "log_rsync_performance", default_value=False))
+ access_mount = gconf.get("access-mount")
+ if rconf.args.subcmd == "slave":
+ access_mount = gconf.get("slave-access-mount")
+
+ mpi, mpo = os.pipe()
+ mh = Popen.fork()
+ if mh:
+ # Parent
+ os.close(mpi)
+ fcntl.fcntl(mpo, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
+ d = None
+ margv = self.make_mount_argv(label)
+ if self.mntpt:
+ # mntpt is determined pre-mount
+ d = self.mntpt
+ os.write(mpo, d + '\0')
+ po = Popen(margv, **self.mountkw)
+ self.handle_mounter(po)
+ po.terminate_geterr()
+ logging.debug('auxiliary glusterfs mount in place')
+ if not d:
+ # mntpt is determined during mount
+ d = self.mntpt
+ os.write(mpo, d + '\0')
+ os.write(mpo, 'M')
+ t = syncdutils.Thread(target=lambda: os.chdir(d))
+ t.start()
+ tlim = rconf.starttime + gconf.get("connection-timeout")
+ while True:
+ if not t.isAlive():
+ break
- if log_rsync_performance:
- # use stdout=PIPE only when log_rsync_performance is enabled.
- # Otherwise rsync will write to stdout with nobody there
- # to consume it; if the PIPE fills up, rsync hangs.
- po = Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ if time.time() >= tlim:
+ syncdutils.finalize(exval=1)
+ time.sleep(1)
+ os.close(mpo)
+ _, rv = syncdutils.waitpid(mh, 0)
+ if rv:
+ rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \
+ (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0)
+ logging.warn(lf('stale mount possibly left behind',
+ path=d))
+ raise GsyncdError("cleaning up temp mountpoint %s "
+ "failed with status %d" %
+ (d, rv))
else:
- po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
+ rv = 0
+ try:
+ os.setsid()
+ os.close(mpo)
+ mntdata = ''
+ while True:
+ c = os.read(mpi, 1)
+ if not c:
+ break
+ mntdata += c
+ if mntdata:
+ mounted = False
+ if mntdata[-1] == 'M':
+ mntdata = mntdata[:-1]
+ assert(mntdata)
+ mounted = True
+ assert(mntdata[-1] == '\0')
+ mntpt = mntdata[:-1]
+ assert(mntpt)
+ if mounted and not access_mount:
+ po = self.umount_l(mntpt)
+ po.terminate_geterr(fail_on_err=False)
+ if po.returncode != 0:
+ po.errlog()
+ rv = po.returncode
+ if not access_mount:
+ self.cleanup_mntpt(mntpt)
+ except:
+ logging.exception('mount cleanup failure:')
+ rv = 200
+ os._exit(rv)
+ logging.debug('auxiliary glusterfs mount prepared')
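The parent/child handshake in inhibit() is compact but easy to misread: the parent performs the mount and streams the NUL-terminated mountpoint plus a trailing 'M' ("mounted") marker down a pipe, while the forked child reads until EOF and then does the lazy-umount cleanup. A sketch of just the pipe protocol, with the actual mount left out (the mountpoint value is invented):

import os

def mount_pipe_protocol(mntpt='/tmp/gsyncd-aux-mount-XXXX'):
    mpi, mpo = os.pipe()
    pid = os.fork()
    if pid:
        # parent: announce the mountpoint (NUL-terminated), then the
        # 'M' marker signalling that the mount succeeded
        os.close(mpi)
        os.write(mpo, mntpt.encode() + b'\0')
        os.write(mpo, b'M')
        os.close(mpo)            # EOF lets the child proceed
        os.waitpid(pid, 0)
    else:
        # child: consume the stream; in gsyncd it would then
        # lazy-umount and remove the mountpoint
        os.close(mpo)
        data = b''
        while True:
            c = os.read(mpi, 1)
            if not c:
                break
            data += c
        mounted = False
        if data[-1:] == b'M':
            data = data[:-1]
            mounted = True
        assert data[-1:] == b'\0'
        print('mountpoint=%s mounted=%s' % (data[:-1].decode(), mounted))
        os._exit(0)

mount_pipe_protocol()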
+
+
+class DirectMounter(Mounter):
+
+ """mounter backend which calls mount(8), umount(8) directly"""
+
+ mountkw = {'stderr': subprocess.PIPE}
+ glusterprog = 'glusterfs'
- for f in files:
- po.stdin.write(f)
- po.stdin.write('\0')
+ @staticmethod
+ def make_umount_argv(d):
+ return ['umount', '-l', d]
- stdout, stderr = po.communicate()
+ def make_mount_argv(self, label=None):
+ self.mntpt = tempfile.mkdtemp(prefix='gsyncd-aux-mount-')
+ mntpt_list.append(self.mntpt)
+ return [self.get_glusterprog()] + \
+ ['--' + p for p in self.params] + [self.mntpt]
- if kw.get("log_err", False):
- for errline in stderr.strip().split("\n")[:-1]:
- logging.error(lf("SYNC Error",
- sync_engine="Rsync",
- error=errline))
+ def cleanup_mntpt(self, mntpt=None):
+ if not mntpt:
+ mntpt = self.mntpt
+ errno_wrap(os.rmdir, [mntpt], [ENOENT, EBUSY])
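Concretely, with gluster-command-dir set to '/usr/sbin' and typical params, the direct-mount argv comes out roughly like this (all values below are invented for illustration):

params = ['log-level=INFO', 'log-file=/var/log/glusterfs/geo-rep.log',
          'volfile-server=master1', 'volfile-id=gv0', 'client-pid=-1']
argv = ['/usr/sbin/glusterfs'] + ['--' + p for p in params] + \
       ['/tmp/gsyncd-aux-mount-ab12cd']
# i.e. glusterfs --log-level=INFO --log-file=... --volfile-server=master1
#      --volfile-id=gv0 --client-pid=-1 /tmp/gsyncd-aux-mount-ab12cd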
- if log_rsync_performance:
- rsync_msg = []
- for line in stdout.split("\n"):
- if line.startswith("Number of files:") or \
- line.startswith("Number of regular files transferred:") or \
- line.startswith("Total file size:") or \
- line.startswith("Total transferred file size:") or \
- line.startswith("Literal data:") or \
- line.startswith("Matched data:") or \
- line.startswith("Total bytes sent:") or \
- line.startswith("Total bytes received:") or \
- line.startswith("sent "):
- rsync_msg.append(line)
- logging.info(lf("rsync performance",
- data=", ".join(rsync_msg)))
- return po
+class MountbrokerMounter(Mounter):
- def tarssh(self, files, slaveurl, log_err=False):
- """invoke tar+ssh
- -z (compress) can be used if needed, but omitting it for now
- as it results in a weird error (tar+ssh errors out (errcode: 2))
- """
- if not files:
- raise GsyncdError("no files to sync")
- logging.debug("files: " + ", ".join(files))
- (host, rdir) = slaveurl.split(':')
- tar_cmd = ["tar"] + \
- ["--sparse", "-cf", "-", "--files-from", "-"]
- ssh_cmd = gconf.ssh_command_tar.split() + \
- ["-p", str(gconf.ssh_port)] + \
- [host, "tar"] + \
- ["--overwrite", "-xf", "-", "-C", rdir]
- p0 = Popen(tar_cmd, stdout=subprocess.PIPE,
- stdin=subprocess.PIPE, stderr=subprocess.PIPE)
- p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE)
- for f in files:
- p0.stdin.write(f)
- p0.stdin.write('\n')
+ """mounter backend using the mountbroker gluster service"""
- p0.stdin.close()
- p0.stdout.close() # Allow p0 to receive a SIGPIPE if p1 exits.
- # wait for tar to terminate, collecting any errors, then
- # wait for the transfer to complete
- _, stderr1 = p1.communicate()
+ mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE}
+ glusterprog = 'gluster'
- # stdin and stdout of p0 are already closed; reset to None and
- # wait for child process to complete
- p0.stdin = None
- p0.stdout = None
- p0.communicate()
-
- if log_err:
- for errline in stderr1.strip().split("\n")[:-1]:
- logging.error(lf("SYNC Error",
- sync_engine="Tarssh",
- error=errline))
-
- return p1
+ @classmethod
+ def make_cli_argv(cls):
+ return [cls.get_glusterprog()] + ['--remote-host=localhost'] + \
+ gconf.get("gluster-cli-options").split() + ['system::']
+ @classmethod
+ def make_umount_argv(cls, d):
+ return cls.make_cli_argv() + ['umount', d, 'lazy']
-class AbstractUrl(object):
+ def make_mount_argv(self, label):
+ return self.make_cli_argv() + \
+ ['mount', label, 'user-map-root=' +
+ syncdutils.getusername()] + self.params
- """abstract base class for url scheme classes"""
+ def handle_mounter(self, po):
+ self.mntpt = po.stdout.readline()[:-1]
+ po.stdout.close()
+ sup(self, po)
+ if po.returncode != 0:
+ # if the cli terminated with an error because it was
+ # refused by glusterd, whatever it printed
+ # on stdout is a diagnostic message
+ logging.error(lf('glusterd answered', mnt=self.mntpt))
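The mountbroker path never calls mount(8) itself: it asks glusterd, via the CLI's system:: namespace, to perform the mount under the given label, and reads the broker-chosen mountpoint back from the CLI's stdout. The assembled command looks roughly like the following (label and params are invented for illustration):

argv = ['/usr/sbin/gluster', '--remote-host=localhost', 'system::',
        'mount', 'geoaccount', 'user-map-root=geoaccount',
        'log-level=INFO', 'volfile-server=slave1', 'volfile-id=gv1',
        'client-pid=-1']
# glusterd performs the mount and prints the mountpoint on stdout;
# handle_mounter() reads that line into self.mntpt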
- def __init__(self, path, pattern):
- m = re.search(pattern, path)
- if not m:
- raise GsyncdError("malformed path")
- self.path = path
- return m.groups()
- @property
- def scheme(self):
- return type(self).__name__.lower()
+class GLUSTERServer(Server):
- def canonical_path(self):
- return self.path
+ "server enhancements for a glusterfs backend"""
- def get_url(self, canonical=False, escaped=False):
- """format self's url in various styles"""
- if canonical:
- pa = self.canonical_path()
+ @classmethod
+ def _attr_unpack_dict(cls, xattr, extra_fields=''):
+ """generic volume mark fetching/parsing backed"""
+ fmt_string = cls.NTV_FMTSTR + extra_fields
+ buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string))
+ vm = struct.unpack(fmt_string, buf)
+ m = re.match(
+ '(.{8})(.{4})(.{4})(.{4})(.{12})',
+ "".join(['%02x' % x for x in vm[2:18]]))
+ uuid = '-'.join(m.groups())
+ volinfo = {'version': vm[0:2],
+ 'uuid': uuid,
+ 'retval': vm[18],
+ 'volume_mark': vm[19:21],
+ }
+ if extra_fields:
+ return volinfo, vm[-len(extra_fields):]
else:
- pa = self.path
- u = "://".join((self.scheme, pa))
- if escaped:
- u = syncdutils.escape(u)
- return u
-
- @property
- def url(self):
- return self.get_url()
-
-
-class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
-
- """scheme class for file:// urls
-
- can be used to represent a file slave server
- on the slave side, or an interface to a remote
- file server on the master side
- """
-
- class FILEServer(Server):
-
- """included server flavor"""
- pass
-
- server = FILEServer
-
- def __init__(self, path):
- sup(self, path, '^/')
+ return volinfo
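The only subtle step in _attr_unpack_dict is the UUID reconstruction: bytes 2..17 of the unpacked volume mark are hex-encoded and regrouped 8-4-4-4-12. A standalone sketch of just that step (input values invented):

import re

def format_uuid_sketch(raw16):
    # raw16: 16 integer byte values, e.g. vm[2:18] from struct.unpack
    hexstr = "".join('%02x' % b for b in raw16)
    m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', hexstr)
    return '-'.join(m.groups())

print(format_uuid_sketch(range(16)))
# 00010203-0405-0607-0809-0a0b0c0d0e0f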
- def connect(self):
- """inhibit the resource beyond"""
- os.chdir(self.path)
+ @classmethod
+ def foreign_volume_infos(cls):
+ """return list of valid (not expired) foreign volume marks"""
+ dict_list = []
+ xattr_list = Xattr.llistxattr_buf('.')
+ for ele in xattr_list:
+ if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0:
+ d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT)
+ now = int(time.time())
+ if x[0] > now:
+ logging.debug("volinfo[%s] expires: %d "
+ "(%d sec later)" %
+ (d['uuid'], x[0], x[0] - now))
+ d['timeout'] = x[0]
+ dict_list.append(d)
+ else:
+ try:
+ Xattr.lremovexattr('.', ele)
+ except OSError:
+ pass
+ return dict_list
- def rsync(self, files, log_err=False):
- return sup(self, files, self.path, log_err=log_err)
+ @classmethod
+ def native_volume_info(cls):
+ """get the native volume mark of the underlying gluster volume"""
+ try:
+ return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE,
+ 'volume-mark']))
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno != ENODATA:
+ raise
-class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
+class GLUSTER(object):
"""scheme class for gluster:// urls
@@ -1129,247 +1039,18 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
(slave-ish features come from the mixins, master
functionality is outsourced to GMaster from master)
"""
-
- class GLUSTERServer(Server):
-
- "server enhancements for a glusterfs backend"""
-
- @classmethod
- def _attr_unpack_dict(cls, xattr, extra_fields=''):
- """generic volume mark fetching/parsing backed"""
- fmt_string = cls.NTV_FMTSTR + extra_fields
- buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string))
- vm = struct.unpack(fmt_string, buf)
- m = re.match(
- '(.{8})(.{4})(.{4})(.{4})(.{12})',
- "".join(['%02x' % x for x in vm[2:18]]))
- uuid = '-'.join(m.groups())
- volinfo = {'version': vm[0:2],
- 'uuid': uuid,
- 'retval': vm[18],
- 'volume_mark': vm[19:21],
- }
- if extra_fields:
- return volinfo, vm[-len(extra_fields):]
- else:
- return volinfo
-
- @classmethod
- def foreign_volume_infos(cls):
- """return list of valid (not expired) foreign volume marks"""
- dict_list = []
- xattr_list = Xattr.llistxattr_buf('.')
- for ele in xattr_list:
- if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0:
- d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT)
- now = int(time.time())
- if x[0] > now:
- logging.debug("volinfo[%s] expires: %d "
- "(%d sec later)" %
- (d['uuid'], x[0], x[0] - now))
- d['timeout'] = x[0]
- dict_list.append(d)
- else:
- try:
- Xattr.lremovexattr('.', ele)
- except OSError:
- pass
- return dict_list
-
- @classmethod
- def native_volume_info(cls):
- """get the native volume mark of the underlying gluster volume"""
- try:
- return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE,
- 'volume-mark']))
- except OSError:
- ex = sys.exc_info()[1]
- if ex.errno != ENODATA:
- raise
-
server = GLUSTERServer
- def __init__(self, path):
- self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern)
+ def __init__(self, host, volume):
+ self.path = "%s:%s" % (host, volume)
+ self.host = host
+ self.volume = volume
global slv_volume
global slv_host
slv_volume = self.volume
slv_host = self.host
- def canonical_path(self):
- return ':'.join([gethostbyname(self.host), self.volume])
-
- def can_connect_to(self, remote):
- """determine our position in the connectibility matrix"""
- return not remote or \
- (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER))
-
- class Mounter(object):
-
- """Abstract base class for mounter backends"""
-
- def __init__(self, params):
- self.params = params
- self.mntpt = None
-
- @classmethod
- def get_glusterprog(cls):
- return os.path.join(gconf.gluster_command_dir, cls.glusterprog)
-
- def umount_l(self, d):
- """perform lazy umount"""
- po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE)
- po.wait()
- return po
-
- @classmethod
- def make_umount_argv(cls, d):
- raise NotImplementedError
-
- def make_mount_argv(self, *a):
- raise NotImplementedError
-
- def cleanup_mntpt(self, *a):
- pass
-
- def handle_mounter(self, po):
- po.wait()
-
- def inhibit(self, *a):
- """inhibit a gluster filesystem
-
- Mount glusterfs over a temporary mountpoint,
- change into the mount, and lazy unmount the
- filesystem.
- """
-
- mpi, mpo = os.pipe()
- mh = Popen.fork()
- if mh:
- os.close(mpi)
- fcntl.fcntl(mpo, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
- d = None
- margv = self.make_mount_argv(*a)
- if self.mntpt:
- # mntpt is determined pre-mount
- d = self.mntpt
- os.write(mpo, d + '\0')
- po = Popen(margv, **self.mountkw)
- self.handle_mounter(po)
- po.terminate_geterr()
- logging.debug('auxiliary glusterfs mount in place')
- if not d:
- # mntpt is determined during mount
- d = self.mntpt
- os.write(mpo, d + '\0')
- os.write(mpo, 'M')
- t = syncdutils.Thread(target=lambda: os.chdir(d))
- t.start()
- tlim = gconf.starttime + int(gconf.connection_timeout)
- while True:
- if not t.isAlive():
- break
- if time.time() >= tlim:
- syncdutils.finalize(exval=1)
- time.sleep(1)
- os.close(mpo)
- _, rv = syncdutils.waitpid(mh, 0)
- if rv:
- rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \
- (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0)
- logging.warn(lf('stale mount possibly left behind',
- path=d))
- raise GsyncdError("cleaning up temp mountpoint %s "
- "failed with status %d" %
- (d, rv))
- else:
- rv = 0
- try:
- os.setsid()
- os.close(mpo)
- mntdata = ''
- while True:
- c = os.read(mpi, 1)
- if not c:
- break
- mntdata += c
- if mntdata:
- mounted = False
- if mntdata[-1] == 'M':
- mntdata = mntdata[:-1]
- assert(mntdata)
- mounted = True
- assert(mntdata[-1] == '\0')
- mntpt = mntdata[:-1]
- assert(mntpt)
- if mounted and not boolify(gconf.access_mount):
- po = self.umount_l(mntpt)
- po.terminate_geterr(fail_on_err=False)
- if po.returncode != 0:
- po.errlog()
- rv = po.returncode
- if not boolify(gconf.access_mount):
- self.cleanup_mntpt(mntpt)
- except:
- logging.exception('mount cleanup failure:')
- rv = 200
- os._exit(rv)
- logging.debug('auxiliary glusterfs mount prepared')
-
- class DirectMounter(Mounter):
-
- """mounter backend which calls mount(8), umount(8) directly"""
-
- mountkw = {'stderr': subprocess.PIPE}
- glusterprog = 'glusterfs'
-
- @staticmethod
- def make_umount_argv(d):
- return ['umount', '-l', d]
-
- def make_mount_argv(self):
- self.mntpt = tempfile.mkdtemp(prefix='gsyncd-aux-mount-')
- mntpt_list.append(self.mntpt)
- return [self.get_glusterprog()] + \
- ['--' + p for p in self.params] + [self.mntpt]
-
- def cleanup_mntpt(self, mntpt=None):
- if not mntpt:
- mntpt = self.mntpt
- errno_wrap(os.rmdir, [mntpt], [ENOENT, EBUSY])
-
- class MountbrokerMounter(Mounter):
-
- """mounter backend using the mountbroker gluster service"""
-
- mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE}
- glusterprog = 'gluster'
-
- @classmethod
- def make_cli_argv(cls):
- return [cls.get_glusterprog()] + ['--remote-host=localhost'] + \
- gconf.gluster_cli_options.split() + ['system::']
-
- @classmethod
- def make_umount_argv(cls, d):
- return cls.make_cli_argv() + ['umount', d, 'lazy']
-
- def make_mount_argv(self, label):
- return self.make_cli_argv() + \
- ['mount', label, 'user-map-root=' +
- syncdutils.getusername()] + self.params
-
- def handle_mounter(self, po):
- self.mntpt = po.stdout.readline()[:-1]
- po.stdout.close()
- sup(self, po)
- if po.returncode != 0:
- # if the cli terminated with an error because it was
- # refused by glusterd, whatever it printed
- # on stdout is a diagnostic message
- logging.error(lf('glusterd answered', mnt=self.mntpt))
-
def connect(self):
"""inhibit the resource beyond
@@ -1380,23 +1061,29 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
logging.info("Mounting gluster volume locally...")
t0 = time.time()
- label = getattr(gconf, 'mountbroker', None)
+ label = gconf.get('mountbroker', None)
if not label and not privileged():
label = syncdutils.getusername()
- mounter = label and self.MountbrokerMounter or self.DirectMounter
- params = gconf.gluster_params.split() + \
- (gconf.gluster_log_level and ['log-level=' +
- gconf.gluster_log_level] or []) + \
- ['log-file=' + gconf.gluster_log_file, 'volfile-server=' +
- self.host, 'volfile-id=' + self.volume, 'client-pid=-1']
- mounter(params).inhibit(*[l for l in [label] if l])
+ mounter = label and MountbrokerMounter or DirectMounter
+
+ log_file = gconf.get("gluster-log-file")
+ if rconf.args.subcmd == "slave":
+ log_file = gconf.get("slave-gluster-log-file")
+
+ log_level = gconf.get("gluster-log-level")
+ if rconf.args.subcmd == "slave":
+ log_level = gconf.get("slave-gluster-log-level")
+
+ params = gconf.get("gluster-params").split() + \
+ ['log-level=' + log_level] + \
+ ['log-file=' + log_file, 'volfile-server=' + self.host] + \
+ ['volfile-id=' + self.volume, 'client-pid=-1']
+
+ self.mounter = mounter(params)
+ self.mounter.inhibit(label)
logging.info(lf("Mounted gluster volume",
duration="%.4f" % (time.time() - t0)))
- def connect_remote(self, *a, **kw):
- sup(self, *a, **kw)
- self.slavedir = "/proc/%d/cwd" % self.server.pid()
-
def gmaster_instantiate_tuple(self, slave):
"""return a tuple of the 'one shot' and the 'main crawl'
class instance"""
@@ -1404,7 +1091,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
gmaster_builder()(self, slave),
gmaster_builder('changeloghistory')(self, slave))
- def service_loop(self, *args):
+ def service_loop(self, slave=None):
"""enter service loop
- if slave given, instantiate GMaster and
@@ -1412,171 +1099,183 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
master behavior
- else do what's inherited
"""
- if args:
- slave = args[0]
- if gconf.local_path:
- class brickserver(FILE.FILEServer):
- local_path = gconf.local_path
- aggregated = self.server
-
- @classmethod
- def entries(cls, path):
- e = super(brickserver, cls).entries(path)
- # on the brick don't mess with /.glusterfs
- if path == '.':
- try:
- e.remove('.glusterfs')
- e.remove('.trashcan')
- except ValueError:
- pass
- return e
-
- @classmethod
- def lstat(cls, e):
- """ path based backend stat """
- return super(brickserver, cls).lstat(e)
-
- @classmethod
- def gfid(cls, e):
- """ path based backend gfid fetch """
- return super(brickserver, cls).gfid(e)
-
- @classmethod
- def linkto_check(cls, e):
- return super(brickserver, cls).linkto_check(e)
- if gconf.slave_id:
- # define {,set_}xtime in slave, thus preempting
- # the call to remote, so that it takes data from
- # the local brick
- slave.server.xtime = types.MethodType(
- lambda _self, path, uuid: (
- brickserver.xtime(path,
- uuid + '.' + gconf.slave_id)
- ),
- slave.server)
- slave.server.stime = types.MethodType(
- lambda _self, path, uuid: (
- brickserver.stime(path,
- uuid + '.' + gconf.slave_id)
- ),
- slave.server)
- slave.server.entry_stime = types.MethodType(
- lambda _self, path, uuid: (
- brickserver.entry_stime(
- path,
- uuid + '.' + gconf.slave_id)
- ),
- slave.server)
- slave.server.set_stime = types.MethodType(
- lambda _self, path, uuid, mark: (
- brickserver.set_stime(path,
- uuid + '.' + gconf.slave_id,
- mark)
- ),
- slave.server)
- slave.server.set_entry_stime = types.MethodType(
- lambda _self, path, uuid, mark: (
- brickserver.set_entry_stime(
- path,
- uuid + '.' + gconf.slave_id,
- mark)
- ),
- slave.server)
- (g1, g2, g3) = self.gmaster_instantiate_tuple(slave)
- g1.master.server = brickserver
- g2.master.server = brickserver
- g3.master.server = brickserver
- else:
- (g1, g2, g3) = self.gmaster_instantiate_tuple(slave)
- g1.master.server.aggregated = gmaster.master.server
- g2.master.server.aggregated = gmaster.master.server
- g3.master.server.aggregated = gmaster.master.server
- # bad bad bad: bad way to do things like this
- # need to make this elegant
- # register the crawlers and start crawling
- # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default)
- # g3 ==> changelog History
- changelog_register_failed = False
- (inf, ouf, ra, wa) = gconf.rpc_fd.split(',')
- changelog_agent = RepceClient(int(inf), int(ouf))
- master_name, slave_data = get_master_and_slave_data_from_args(
- sys.argv)
- status = GeorepStatus(gconf.state_file, gconf.local_node,
- gconf.local_path,
- gconf.local_node_id,
- master_name, slave_data)
- status.reset_on_worker_start()
- rv = changelog_agent.version()
- if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION:
+ if rconf.args.subcmd == "slave":
+ if gconf.get("use-rsync-xattrs") and not privileged():
raise GsyncdError(
- "RePCe major version mismatch(changelog agent): "
- "local %s, remote %s" %
- (CHANGELOG_AGENT_CLIENT_VERSION, rv))
+ "using rsync for extended attributes is not supported")
+
+ repce = RepceServer(
+ self.server, sys.stdin, sys.stdout, gconf.get("sync-jobs"))
+ t = syncdutils.Thread(target=lambda: (repce.service_loop(),
+ syncdutils.finalize()))
+ t.start()
+ logging.info("slave listening")
+ if gconf.get("slave-timeout") and gconf.get("slave-timeout") > 0:
+ while True:
+ lp = self.server.last_keep_alive
+ time.sleep(gconf.get("slave-timeout"))
+ if lp == self.server.last_keep_alive:
+ logging.info(
+ lf("connection inactive, stopping",
+ timeout=gconf.get("slave-timeout")))
+ break
+ else:
+ select((), (), ())
- try:
- workdir = g2.setup_working_dir()
- # Register only when change_detector is not set to
- # xsync, else agent will generate changelog files
- # in .processing directory of working dir
- if gconf.change_detector != 'xsync':
- # register with the changelog library
- # 9 == log level (DEBUG)
- # 5 == connection retries
- changelog_agent.init()
- changelog_agent.register(gconf.local_path,
- workdir, gconf.changelog_log_file,
- get_changelog_log_level(
- gconf.changelog_log_level),
- g2.CHANGELOG_CONN_RETRIES)
-
- register_time = int(time.time())
- g2.register(register_time, changelog_agent, status)
- g3.register(register_time, changelog_agent, status)
- except ChangelogException as e:
- logging.error(lf("Changelog register failed", error=e))
- sys.exit(1)
-
- g1.register(status=status)
- logging.info(lf("Register time",
- time=register_time))
- # oneshot: Try to use changelog history api, if not
- # available switch to FS crawl
- # Note: if config.change_detector is xsync then
- # it will not use changelog history api
- try:
- g3.crawlwrap(oneshot=True)
- except PartialHistoryAvailable as e:
- logging.info(lf('Partial history available, using xsync crawl'
- ' after consuming history',
- till=e))
- g1.crawlwrap(oneshot=True, register_time=register_time)
- except ChangelogHistoryNotAvailable:
- logging.info('Changelog history not available, using xsync')
- g1.crawlwrap(oneshot=True, register_time=register_time)
- except NoStimeAvailable:
- logging.info('No stime available, using xsync crawl')
- g1.crawlwrap(oneshot=True, register_time=register_time)
- except ChangelogException as e:
- logging.error(lf("Changelog History Crawl failed",
- error=e))
- sys.exit(1)
+ return
- try:
- g2.crawlwrap()
- except ChangelogException as e:
- logging.error(lf("Changelog crawl failed", error=e))
- sys.exit(1)
- else:
- sup(self, *args)
+ class brickserver(Server):
+ local_path = rconf.args.local_path
+ aggregated = self.server
+
+ @classmethod
+ def entries(cls, path):
+ e = super(brickserver, cls).entries(path)
+ # on the brick don't mess with /.glusterfs
+ if path == '.':
+ try:
+ e.remove('.glusterfs')
+ e.remove('.trashcan')
+ except ValueError:
+ pass
+ return e
+
+ @classmethod
+ def lstat(cls, e):
+ """ path based backend stat """
+ return super(brickserver, cls).lstat(e)
+
+ @classmethod
+ def gfid(cls, e):
+ """ path based backend gfid fetch """
+ return super(brickserver, cls).gfid(e)
+
+ @classmethod
+ def linkto_check(cls, e):
+ return super(brickserver, cls).linkto_check(e)
+
+ # define {,set_}xtime in slave, thus preempting
+ # the call to remote, so that it takes data from
+ # the local brick
+ slave.server.xtime = types.MethodType(
+ lambda _self, path, uuid: (
+ brickserver.xtime(path,
+ uuid + '.' + rconf.args.slave_id)
+ ),
+ slave.server)
+ slave.server.stime = types.MethodType(
+ lambda _self, path, uuid: (
+ brickserver.stime(path,
+ uuid + '.' + rconf.args.slave_id)
+ ),
+ slave.server)
+ slave.server.entry_stime = types.MethodType(
+ lambda _self, path, uuid: (
+ brickserver.entry_stime(
+ path,
+ uuid + '.' + rconf.args.slave_id)
+ ),
+ slave.server)
+ slave.server.set_stime = types.MethodType(
+ lambda _self, path, uuid, mark: (
+ brickserver.set_stime(path,
+ uuid + '.' + rconf.args.slave_id,
+ mark)
+ ),
+ slave.server)
+ slave.server.set_entry_stime = types.MethodType(
+ lambda _self, path, uuid, mark: (
+ brickserver.set_entry_stime(
+ path,
+ uuid + '.' + rconf.args.slave_id,
+ mark)
+ ),
+ slave.server)
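All five assignments above share one pattern: types.MethodType binds a lambda onto the live slave.server proxy so the stime/xtime family is answered from the local brick (with the slave ID appended to the uuid) instead of making a RePCe round-trip. A reduced sketch of the pattern (names invented for illustration):

import types

class Brick(object):
    @staticmethod
    def stime(path, uuid):
        return 'local stime for %s (%s)' % (path, uuid)

class RemoteProxy(object):
    def stime(self, path, uuid):
        return 'remote call'        # normally a RePCe round-trip

slave_id = 'deadbeef'               # illustrative slave ID
proxy = RemoteProxy()
# preempt the remote method: bind a lambda onto the live instance
proxy.stime = types.MethodType(
    lambda _self, path, uuid: Brick.stime(path, uuid + '.' + slave_id),
    proxy)
print(proxy.stime('dir', 'uuid0'))  # local stime for dir (uuid0.deadbeef)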
+
+ (g1, g2, g3) = self.gmaster_instantiate_tuple(slave)
+ g1.master.server = brickserver
+ g2.master.server = brickserver
+ g3.master.server = brickserver
+
+ # bad bad bad: bad way to do things like this
+ # need to make this elegant
+ # register the crawlers and start crawling
+ # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default)
+ # g3 ==> changelog History
+ (inf, ouf, ra, wa) = rconf.args.rpc_fd.split(',')
+ changelog_agent = RepceClient(int(inf), int(ouf))
+
+ status = GeorepStatus(gconf.get("state-file"),
+ rconf.args.local_node,
+ rconf.args.local_path,
+ rconf.args.local_node_id,
+ rconf.args.master,
+ rconf.args.slave)
+ status.reset_on_worker_start()
+ rv = changelog_agent.version()
+ if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION:
+ raise GsyncdError(
+ "RePCe major version mismatch(changelog agent): "
+ "local %s, remote %s" %
+ (CHANGELOG_AGENT_CLIENT_VERSION, rv))
- def rsync(self, files, log_err=False):
- return sup(self, files, self.slavedir, log_err=log_err)
+ try:
+ workdir = g2.setup_working_dir()
+ # Register only when change_detector is not set to
+ # xsync, else agent will generate changelog files
+ # in .processing directory of working dir
+ if gconf.get("change-detector") != 'xsync':
+ # register with the changelog library
+ # 9 == log level (DEBUG)
+ # 5 == connection retries
+ changelog_agent.init()
+ changelog_agent.register(rconf.args.local_path,
+ workdir,
+ gconf.get("changelog-log-file"),
+ get_changelog_log_level(
+ gconf.get("changelog-log-level")),
+ g2.CHANGELOG_CONN_RETRIES)
+
+ register_time = int(time.time())
+ g2.register(register_time, changelog_agent, status)
+ g3.register(register_time, changelog_agent, status)
+ except ChangelogException as e:
+ logging.error(lf("Changelog register failed", error=e))
+ sys.exit(1)
+
+ g1.register(status=status)
+ logging.info(lf("Register time",
+ time=register_time))
+ # oneshot: Try to use changelog history api, if not
+ # available switch to FS crawl
+ # Note: if config.change_detector is xsync then
+ # it will not use changelog history api
+ try:
+ g3.crawlwrap(oneshot=True)
+ except PartialHistoryAvailable as e:
+ logging.info(lf('Partial history available, using xsync crawl'
+ ' after consuming history',
+ till=e))
+ g1.crawlwrap(oneshot=True, register_time=register_time)
+ except ChangelogHistoryNotAvailable:
+ logging.info('Changelog history not available, using xsync')
+ g1.crawlwrap(oneshot=True, register_time=register_time)
+ except NoStimeAvailable:
+ logging.info('No stime available, using xsync crawl')
+ g1.crawlwrap(oneshot=True, register_time=register_time)
+ except ChangelogException as e:
+ logging.error(lf("Changelog History Crawl failed",
+ error=e))
+ sys.exit(1)
- def tarssh(self, files, log_err=False):
- return sup(self, files, self.slavedir, log_err=log_err)
+ try:
+ g2.crawlwrap()
+ except ChangelogException as e:
+ logging.error(lf("Changelog crawl failed", error=e))
+ sys.exit(1)
-class SSH(AbstractUrl, SlaveRemote):
+class SSH(object):
"""scheme class for ssh:// urls
@@ -1584,13 +1283,9 @@ class SSH(AbstractUrl, SlaveRemote):
implementing an ssh based proxy
"""
- def __init__(self, path):
- self.remote_addr, inner_url = sup(self, path,
- '^((?:%s@)?%s):(.+)' %
- tuple([r.pattern
- for r in (UserRX, HostRX)]))
- self.inner_rsc = parse_url(inner_url)
- self.volume = inner_url[1:]
+ def __init__(self, host, volume):
+ self.remote_addr = host
+ self.volume = volume
@staticmethod
def parse_ssh_address(self):
@@ -1602,35 +1297,28 @@ class SSH(AbstractUrl, SlaveRemote):
self.remotehost = h
return {'user': u, 'host': h}
- def canonical_path(self):
- rap = self.parse_ssh_address(self)
- remote_addr = '@'.join([rap['user'], gethostbyname(rap['host'])])
- return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)])
-
- def can_connect_to(self, remote):
- """determine our position in the connectibility matrix"""
- return False
-
- def start_fd_client(self, *a, **opts):
- """customizations for client startup
+ def start_fd_client(self, i, o):
+ """set up RePCe client, handshake with server
- - be a no-op if we are to daemonize (client startup is deferred
- to post-daemon stage)
- - determine target url for rsync after consulting server
+ It's cut out as a separate method to let
+ subclasses hook into client startup
"""
- if opts.get('deferred'):
- return a
- sup(self, *a)
- ityp = type(self.inner_rsc)
- if ityp == FILE:
- slavepath = self.inner_rsc.path
- elif ityp == GLUSTER:
- slavepath = "/proc/%d/cwd" % self.server.pid()
- else:
- raise NotImplementedError
+ self.server = RepceClient(i, o)
+ rv = self.server.__version__()
+ exrv = {'proto': repce.repce_version, 'object': Server.version()}
+ da0 = (rv, exrv)
+ da1 = ({}, {})
+ for i in range(2):
+ for k, v in da0[i].iteritems():
+ da1[i][k] = int(v)
+ if da1[0] != da1[1]:
+ raise GsyncdError(
+ "RePCe major version mismatch: local %s, remote %s" %
+ (exrv, rv))
+ slavepath = "/proc/%d/cwd" % self.server.pid()
self.slaveurl = ':'.join([self.remote_addr, slavepath])
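The version handshake above only compares the int-truncated entries of the two version dicts, so minor-version skew between master and slave is tolerated. A sketch of just the comparison (the version numbers are invented):

def repce_versions_match(local, remote):
    # keep only the integer (major) part of each version entry
    trunc = lambda d: dict((k, int(v)) for k, v in d.items())
    return trunc(local) == trunc(remote)

local = {'proto': 1.0, 'object': 1.1}
remote = {'proto': 1.2, 'object': 1.0}
print(repce_versions_match(local, remote))   # True: both majors are 1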
- def connect_remote(self, go_daemon=None):
+ def connect_remote(self):
"""connect to inner slave url through outer ssh url
Wrap the connecting utility in ssh.
@@ -1648,49 +1336,182 @@ class SSH(AbstractUrl, SlaveRemote):
[NB. ATM gluster product does not make use of interactive
authentication.]
"""
- if go_daemon == 'done':
- return self.start_fd_client(*self.fd_pair)
-
syncdutils.setup_ssh_ctl(tempfile.mkdtemp(prefix='gsyncd-aux-ssh-'),
self.remote_addr,
- self.inner_rsc.url)
+ self.volume)
- deferred = go_daemon == 'postconn'
logging.info("Initializing SSH connection between master and slave...")
t0 = time.time()
- ret = sup(self, gconf.ssh_command.split() +
- ["-p", str(gconf.ssh_port)] +
- gconf.ssh_ctl_args + [self.remote_addr],
- slave=self.inner_rsc.url, deferred=deferred)
+
+ extra_opts = []
+ remote_gsyncd = gconf.get("remote-gsyncd")
+ if remote_gsyncd == "":
+ remote_gsyncd = "/nonexistent/gsyncd"
+
+ if gconf.get("use-rsync-xattrs"):
+ extra_opts.append('--use-rsync-xattrs')
+
+ args_to_slave = [gconf.get("ssh-command")] + \
+ gconf.get("ssh-options").split() + \
+ ["-p", str(gconf.get("ssh-port"))] + \
+ rconf.ssh_ctl_args + [self.remote_addr] + \
+ [remote_gsyncd, "slave"] + \
+ extra_opts + \
+ [rconf.args.master, rconf.args.slave] + \
+ [
+ '--master-node', rconf.args.local_node,
+ '--master-node-id', rconf.args.local_node_id,
+ '--master-brick', rconf.args.local_path,
+ '--local-node', rconf.args.resource_remote,
+ '--local-node-id', rconf.args.resource_remote_id] + \
+ [
+ # Add all config arguments here; slave gsyncd will not use a
+ # config file on the slave side, so all overriding options
+ # must be sent as arguments
+ '--slave-timeout', str(gconf.get("slave-timeout")),
+ '--slave-log-level', gconf.get("slave-log-level"),
+ '--slave-gluster-log-level',
+ gconf.get("slave-gluster-log-level")]
+
+ if gconf.get("slave-access-mount"):
+ args_to_slave.append('--slave-access-mount')
+
+ if rconf.args.debug:
+ args_to_slave.append('--debug')
+
+ po = Popen(args_to_slave,
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ rconf.transport = po
+ self.start_fd_client(po.stdout, po.stdin)
logging.info(lf("SSH connection between master and slave established.",
duration="%.4f" % (time.time() - t0)))
- if deferred:
- # send a message to peer so that we can wait for
- # the answer from which we know connection is
- # established and we can proceed with daemonization
- # (doing that too early robs the ssh passwd prompt...)
- # However, we'd better not start the RepceClient
- # before daemonization (that's not preserved properly
- # in daemon), we just do a an ad-hoc linear put/get.
- i, o = ret
- inf = os.fdopen(i)
- repce.send(o, None, '__repce_version__')
- select((inf,), (), ())
- repce.recv(inf)
- # hack hack hack: store a global reference to the file
- # to save it from getting GC'd which implies closing it
- gconf.permanent_handles.append(inf)
- self.fd_pair = (i, o)
- return 'should'
-
- def rsync(self, files, log_err=False):
- return sup(self, files, '-e',
- " ".join(gconf.ssh_command.split() +
- ["-p", str(gconf.ssh_port)] +
- gconf.ssh_ctl_args),
- *(gconf.rsync_ssh_options.split() + [self.slaveurl]),
- log_err=log_err)
-
- def tarssh(self, files, log_err=False):
- return sup(self, files, self.slaveurl, log_err=log_err)
+ def rsync(self, files, *args, **kw):
+ """invoke rsync"""
+ if not files:
+ raise GsyncdError("no files to sync")
+ logging.debug("files: " + ", ".join(files))
+
+ extra_rsync_flags = []
+ # Performance flag: include --ignore-missing-args if the rsync
+ # version is 3.1.0 or newer.
+ if gconf.get("rsync-opt-ignore-missing-args") and \
+ get_rsync_version(gconf.get("rsync-command")) >= "3.1.0":
+ extra_rsync_flags = ["--ignore-missing-args"]
+
+ rsync_ssh_opts = [gconf.get("ssh-command")] + \
+ gconf.get("ssh-options").split() + \
+ ["-p", str(gconf.get("ssh-port"))] + \
+ rconf.ssh_ctl_args + \
+ gconf.get("rsync-ssh-options").split()
+
+ argv = [
+ gconf.get("rsync-command"),
+ '-aR0',
+ '--inplace',
+ '--files-from=-',
+ '--super',
+ '--stats',
+ '--numeric-ids',
+ '--no-implied-dirs'
+ ]
+
+ if gconf.get("rsync-opt-existing"):
+ argv += ["--existing"]
+
+ if gconf.get("sync-xattrs"):
+ argv += ['--xattrs']
+
+ if gconf.get("sync-acls"):
+ argv += ['--acls']
+
+ argv = argv + \
+ gconf.get("rsync-options").split() + \
+ extra_rsync_flags + ['.'] + \
+ ["-e", " ".join(rsync_ssh_opts)] + \
+ [self.slaveurl]
+
+ log_rsync_performance = gconf.getr("log-rsync-performance", False)
+
+ if log_rsync_performance:
+ # use stdout=PIPE only when log_rsync_performance is enabled.
+ # Otherwise rsync will write to stdout with nobody there
+ # to consume it; if the PIPE fills up, rsync hangs.
+ po = Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ else:
+ po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
+
+ for f in files:
+ po.stdin.write(f)
+ po.stdin.write('\0')
+
+ stdout, stderr = po.communicate()
+
+ if kw.get("log_err", False):
+ for errline in stderr.strip().split("\n")[:-1]:
+ logging.error(lf("SYNC Error",
+ sync_engine="Rsync",
+ error=errline))
+
+ if log_rsync_performance:
+ rsync_msg = []
+ for line in stdout.split("\n"):
+ if line.startswith("Number of files:") or \
+ line.startswith("Number of regular files transferred:") or \
+ line.startswith("Total file size:") or \
+ line.startswith("Total transferred file size:") or \
+ line.startswith("Literal data:") or \
+ line.startswith("Matched data:") or \
+ line.startswith("Total bytes sent:") or \
+ line.startswith("Total bytes received:") or \
+ line.startswith("sent "):
+ rsync_msg.append(line)
+ logging.info(lf("rsync performance",
+ data=", ".join(rsync_msg)))
+
+ return po
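The load-bearing flags here are '-0' (inside '-aR0') together with '--files-from=-': the file list arrives NUL-separated on rsync's stdin, so any legal filename survives. A minimal sketch of that contract, assuming rsync is installed (paths and destination are invented):

import subprocess

files = ['dir/a', 'dir/with space/b']          # illustrative paths
argv = ['rsync', '-aR0', '--files-from=-', '.', '/tmp/dest']
po = subprocess.Popen(argv, stdin=subprocess.PIPE)
for f in files:
    po.stdin.write(f.encode() + b'\0')         # NUL-separated, per -0
po.stdin.close()
po.wait()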
+
+ def tarssh(self, files, slaveurl, log_err=False):
+ """invoke tar+ssh
+ -z (compress) can be used if needed, but omitting it for now
+ as it results in a weird error (tar+ssh errors out (errcode: 2))
+ """
+ if not files:
+ raise GsyncdError("no files to sync")
+ logging.debug("files: " + ", ".join(files))
+ (host, rdir) = slaveurl.split(':')
+ tar_cmd = ["tar"] + \
+ ["--sparse", "-cf", "-", "--files-from", "-"]
+ ssh_cmd = gconf.get("ssh-command-tar").split() + \
+ gconf.get("ssh-options-tar").split() + \
+ ["-p", str(gconf.get("ssh-port"))] + \
+ [host, "tar"] + \
+ ["--overwrite", "-xf", "-", "-C", rdir]
+ p0 = Popen(tar_cmd, stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE, stderr=subprocess.PIPE)
+ p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE)
+ for f in files:
+ p0.stdin.write(f)
+ p0.stdin.write('\n')
+
+ p0.stdin.close()
+ p0.stdout.close() # Allow p0 to receive a SIGPIPE if p1 exits.
+ # wait for tar to terminate, collecting any errors, then
+ # wait for the transfer to complete
+ _, stderr1 = p1.communicate()
+
+ # stdin and stdout of p0 are already closed; reset to None and
+ # wait for child process to complete
+ p0.stdin = None
+ p0.stdout = None
+ p0.communicate()
+
+ if log_err:
+ for errline in stderr1.strip().split("\n")[:-1]:
+ logging.error(lf("SYNC Error",
+ sync_engine="Tarssh",
+ error=errline))
+
+ return p1
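Note the shape of the tarssh pipeline: a local 'tar -c' writing into an 'ssh ... tar -x' reader, with the file list fed newline-separated on tar's stdin. A minimal sketch of the same two-process pipeline, assuming ssh and tar are available (host, directory and files are invented):

import subprocess

host, rdir = 'slavehost', '/proc/1234/cwd'     # illustrative slaveurl parts
tar_cmd = ['tar', '--sparse', '-cf', '-', '--files-from', '-']
ssh_cmd = ['ssh', host, 'tar', '--overwrite', '-xf', '-', '-C', rdir]

p0 = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE,
                      stdin=subprocess.PIPE)
p1 = subprocess.Popen(ssh_cmd, stdin=p0.stdout)
for f in ['dir/a', 'dir/b']:
    p0.stdin.write((f + '\n').encode())        # newline-separated here
p0.stdin.close()
p0.stdout.close()    # let p0 get SIGPIPE if p1 exits early
p1.wait()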