Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 1812 |
1 file changed, 789 insertions, 1023 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 7af3bda4bef..f12c7ceaa36 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -13,248 +13,43 @@ import os import sys import stat import time -import signal import fcntl -import errno import types import struct -import socket import logging import tempfile -import threading import subprocess +from errno import (EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES, + EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM) import errno -from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES -from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM -from select import error as SelectError -import shutil -from gconf import gconf +from rconf import rconf +import gsyncdconfig as gconf +import libgfchangelog + import repce from repce import RepceServer, RepceClient from master import gmaster_builder import syncdutils -from syncdutils import GsyncdError, select, privileged, boolify, funcode -from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat -from syncdutils import NoStimeAvailable, PartialHistoryAvailable -from syncdutils import ChangelogException, ChangelogHistoryNotAvailable -from syncdutils import get_changelog_log_level, get_rsync_version -from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION -from syncdutils import GX_GFID_CANONICAL_LEN +from syncdutils import (GsyncdError, select, privileged, funcode, + entry2pb, gauxpfx, errno_wrap, lstat, + NoStimeAvailable, PartialHistoryAvailable, + ChangelogException, ChangelogHistoryNotAvailable, + get_changelog_log_level, get_rsync_version, + GX_GFID_CANONICAL_LEN, + gf_mount_ready, lf, Popen, sup, + Xattr, matching_disk_gfid, get_gfid_from_mnt, + unshare_propagation_supported, get_slv_dir_path) from gsyncdstatus import GeorepStatus -from syncdutils import get_master_and_slave_data_from_args -from syncdutils import mntpt_list, lf -from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt +from py2py3 import (pipe, str_to_bytearray, entry_pack_reg, + entry_pack_reg_stat, entry_pack_mkdir, + entry_pack_symlink) -UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') -HostRX = re.compile('[a-zA-Z\d](?:[a-zA-Z\d.-]*[a-zA-Z\d])?', re.I) -UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+") ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP') -def sup(x, *a, **kw): - """a rubyesque "super" for python ;) - - invoke caller method in parent class with given args. - """ - return getattr(super(type(x), x), - sys._getframe(1).f_code.co_name)(*a, **kw) - - -def desugar(ustr): - """transform sugared url strings to standard <scheme>://<urlbody> form - - parsing logic enforces the constraint that sugared forms should contatin - a ':' or a '/', which ensures that sugared urls do not conflict with - gluster volume names. 
- """ - m = re.match('([^:]*):(.*)', ustr) - if m: - if not m.groups()[0]: - return "gluster://localhost" + ustr - elif '@' in m.groups()[0] or re.search('[:/]', m.groups()[1]): - return "ssh://" + ustr - else: - return "gluster://" + ustr - else: - if ustr[0] != '/': - raise GsyncdError("cannot resolve sugared url '%s'" % ustr) - ap = os.path.normpath(ustr) - if ap.startswith('//'): - ap = ap[1:] - return "file://" + ap - - -def gethostbyname(hnam): - """gethostbyname wrapper""" - try: - return socket.gethostbyname(hnam) - except socket.gaierror: - ex = sys.exc_info()[1] - raise GsyncdError("failed to resolve %s: %s" % - (hnam, ex.strerror)) - - -def parse_url(ustr): - """instantiate an url object by scheme-to-class dispatch - - The url classes taken into consideration are the ones in - this module whose names are full-caps. - """ - m = UrlRX.match(ustr) - if not m: - ustr = desugar(ustr) - m = UrlRX.match(ustr) - if not m: - raise GsyncdError("malformed url") - sch, path = m.groups() - this = sys.modules[__name__] - if not hasattr(this, sch.upper()): - raise GsyncdError("unknown url scheme " + sch) - return getattr(this, sch.upper())(path) - - -class Popen(subprocess.Popen): - - """customized subclass of subprocess.Popen with a ring - buffer for children error output""" - - @classmethod - def init_errhandler(cls): - """start the thread which handles children's error output""" - cls.errstore = {} - - def tailer(): - while True: - errstore = cls.errstore.copy() - try: - poe, _, _ = select( - [po.stderr for po in errstore], [], [], 1) - except (ValueError, SelectError): - # stderr is already closed wait for some time before - # checking next error - time.sleep(0.5) - continue - for po in errstore: - if po.stderr not in poe: - continue - po.lock.acquire() - try: - if po.on_death_row: - continue - la = errstore[po] - try: - fd = po.stderr.fileno() - except ValueError: # file is already closed - time.sleep(0.5) - continue - - try: - l = os.read(fd, 1024) - except OSError: - time.sleep(0.5) - continue - - if not l: - continue - tots = len(l) - for lx in la: - tots += len(lx) - while tots > 1 << 20 and la: - tots -= len(la.pop(0)) - la.append(l) - finally: - po.lock.release() - t = syncdutils.Thread(target=tailer) - t.start() - cls.errhandler = t - - @classmethod - def fork(cls): - """fork wrapper that restarts errhandler thread in child""" - pid = os.fork() - if not pid: - cls.init_errhandler() - return pid - - def __init__(self, args, *a, **kw): - """customizations for subprocess.Popen instantiation - - - 'close_fds' is taken to be the default - - if child's stderr is chosen to be managed, - register it with the error handler thread - """ - self.args = args - if 'close_fds' not in kw: - kw['close_fds'] = True - self.lock = threading.Lock() - self.on_death_row = False - self.elines = [] - try: - sup(self, args, *a, **kw) - except: - ex = sys.exc_info()[1] - if not isinstance(ex, OSError): - raise - raise GsyncdError("""execution of "%s" failed with %s (%s)""" % - (args[0], errno.errorcode[ex.errno], - os.strerror(ex.errno))) - if kw.get('stderr') == subprocess.PIPE: - assert(getattr(self, 'errhandler', None)) - self.errstore[self] = [] - - def errlog(self): - """make a log about child's failure event""" - logging.error(lf("command returned error", - cmd=" ".join(self.args), - error=self.returncode)) - lp = '' - - def logerr(l): - logging.error(self.args[0] + "> " + l) - for l in self.elines: - ls = l.split('\n') - ls[0] = lp + ls[0] - lp = ls.pop() - for ll in ls: - logerr(ll) - if lp: - 
logerr(lp) - - def errfail(self): - """fail nicely if child did not terminate with success""" - self.errlog() - syncdutils.finalize(exval=1) - - def terminate_geterr(self, fail_on_err=True): - """kill child, finalize stderr harvesting (unregister - from errhandler, set up .elines), fail on error if - asked for - """ - self.lock.acquire() - try: - self.on_death_row = True - finally: - self.lock.release() - elines = self.errstore.pop(self) - if self.poll() is None: - self.terminate() - if self.poll() is None: - time.sleep(0.1) - self.kill() - self.wait() - while True: - if not select([self.stderr], [], [], 0.1)[0]: - break - b = os.read(self.stderr.fileno(), 1024) - if b: - elines.append(b) - else: - break - self.stderr.close() - self.elines = elines - if fail_on_err and self.returncode != 0: - self.errfail() +slv_volume = None +slv_host = None class Server(object): @@ -300,14 +95,14 @@ class Server(object): fc = funcode(f) pi = list(fc.co_varnames).index('path') - def ff(*a): - path = a[pi] + def ff(*args): + path = args[pi] ps = path.split('/') if path[0] == '/' or '..' in ps: raise ValueError('unsafe path') - a = list(a) - a[pi] = os.path.join(a[0].local_path, path) - return f(*a) + args = list(args) + args[pi] = os.path.join(args[0].local_path, path) + return f(*args) return ff @classmethod @@ -353,6 +148,7 @@ class Server(object): if buf == ENOENT: return buf else: + buf = str_to_bytearray(buf) m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join( ['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)])) return '-'.join(m.groups()) @@ -443,6 +239,7 @@ class Server(object): val = Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8) + val = str_to_bytearray(val) return struct.unpack('!II', val) except OSError: ex = sys.exc_info()[1] @@ -465,6 +262,7 @@ class Server(object): val = Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8) + val = str_to_bytearray(val) return struct.unpack('!II', val) except OSError: ex = sys.exc_info()[1] @@ -487,6 +285,7 @@ class Server(object): val = Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8) + val = str_to_bytearray(val) return struct.unpack('!II', val) except OSError: ex = sys.exc_info()[1] @@ -510,6 +309,7 @@ class Server(object): '.'.join([cls.GX_NSPACE, uuid, 'entry_stime']), 8) + val = str_to_bytearray(val) return struct.unpack('!II', val) except OSError: ex = sys.exc_info()[1] @@ -578,50 +378,31 @@ class Server(object): def entry_ops(cls, entries): pfx = gauxpfx() logging.debug('entries: %s' % repr(entries)) - # regular file - - def entry_pack_reg(gf, bn, mo, uid, gid): - blen = len(bn) - return struct.pack(cls._fmt_mknod(blen), - uid, gid, gf, mo, bn, - stat.S_IMODE(mo), 0, umask()) - - def entry_pack_reg_stat(gf, bn, st): - blen = len(bn) - mo = st['mode'] - return struct.pack(cls._fmt_mknod(blen), - st['uid'], st['gid'], - gf, mo, bn, - stat.S_IMODE(mo), 0, umask()) - # mkdir - - def entry_pack_mkdir(gf, bn, mo, uid, gid): - blen = len(bn) - return struct.pack(cls._fmt_mkdir(blen), - uid, gid, gf, mo, bn, - stat.S_IMODE(mo), umask()) - # symlink - - def entry_pack_symlink(gf, bn, lnk, st): - blen = len(bn) - llen = len(lnk) - return struct.pack(cls._fmt_symlink(blen, llen), - st['uid'], st['gid'], - gf, st['mode'], bn, lnk) - - def entry_purge(op, entry, gfid, e): + dist_count = rconf.args.master_dist_count + + def entry_purge(op, entry, gfid, e, uid, gid): # This is an extremely racy code and needs to be fixed ASAP. 
# The GFID check here is to be sure that the pargfid/bname # to be purged is the GFID gotten from the changelog. # (a stat(changelog_gfid) would also be valid here) # The race here is between the GFID check and the purge. + + # If the entry or the gfid of the file to be deleted is not present + # on slave, we can ignore the unlink/rmdir + if isinstance(lstat(entry), int) or \ + isinstance(lstat(os.path.join(pfx, gfid)), int): + return + if not matching_disk_gfid(gfid, entry): - collect_failure(e, EEXIST) + collect_failure(e, EEXIST, uid, gid) return if op == 'UNLINK': er = errno_wrap(os.unlink, [entry], [ENOENT, ESTALE], [EBUSY]) - return er + # EISDIR is safe error, ignore. This can only happen when + # unlink is sent from master while fixing gfid conflicts. + if er != EISDIR: + return er elif op == 'RMDIR': er = errno_wrap(os.rmdir, [entry], [ENOENT, ESTALE, @@ -629,27 +410,40 @@ class Server(object): if er == ENOTEMPTY: return er - def collect_failure(e, cmd_ret, dst=False): + def collect_failure(e, cmd_ret, uid, gid, dst=False): slv_entry_info = {} slv_entry_info['gfid_mismatch'] = False + slv_entry_info['name_mismatch'] = False slv_entry_info['dst'] = dst + slv_entry_info['slave_isdir'] = False + slv_entry_info['slave_name'] = None + slv_entry_info['slave_gfid'] = None # We do this for failing fops on Slave # Master should be logging this if cmd_ret is None: return False - if cmd_ret == EEXIST: + if e.get("stat", {}): + # Copy actual UID/GID value back to entry stat + e['stat']['uid'] = uid + e['stat']['gid'] = gid + + if cmd_ret in [EEXIST, ESTALE]: if dst: en = e['entry1'] else: en = e['entry'] disk_gfid = get_gfid_from_mnt(en) - if isinstance(disk_gfid, basestring) and e['gfid'] != disk_gfid: + if isinstance(disk_gfid, str) and \ + e['gfid'] != disk_gfid: slv_entry_info['gfid_mismatch'] = True st = lstat(en) if not isinstance(st, int): if st and stat.S_ISDIR(st.st_mode): slv_entry_info['slave_isdir'] = True + dir_name = get_slv_dir_path(slv_host, slv_volume, + disk_gfid) + slv_entry_info['slave_name'] = dir_name else: slv_entry_info['slave_isdir'] = False slv_entry_info['slave_gfid'] = disk_gfid @@ -695,7 +489,7 @@ class Server(object): errno_wrap(os.rmdir, [path], [ENOENT, ESTALE], [EBUSY]) - def rename_with_disk_gfid_confirmation(gfid, entry, en): + def rename_with_disk_gfid_confirmation(gfid, entry, en, uid, gid): if not matching_disk_gfid(gfid, entry): logging.error(lf("RENAME ignored: source entry does not match " "with on-disk gfid", @@ -703,14 +497,13 @@ class Server(object): gfid=gfid, disk_gfid=get_gfid_from_mnt(entry), target=en)) - collect_failure(e, EEXIST) + collect_failure(e, EEXIST, uid, gid) return cmd_ret = errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST], [ESTALE, EBUSY]) - collect_failure(e, cmd_ret) - + collect_failure(e, cmd_ret, uid, gid) for e in entries: blob = None @@ -719,6 +512,12 @@ class Server(object): entry = e['entry'] uid = 0 gid = 0 + + # Skip entry processing if it's marked true during gfid + # conflict resolution + if e['skip_entry']: + continue + if e.get("stat", {}): # Copy UID/GID value and then reset to zero. Copied UID/GID # will be used to run chown once entry is created. @@ -731,7 +530,7 @@ class Server(object): if op in ['RMDIR', 'UNLINK']: # Try once, if rmdir failed with ENOTEMPTY # then delete recursively. 
- er = entry_purge(op, entry, gfid, e) + er = entry_purge(op, entry, gfid, e, uid, gid) if isinstance(er, int): if er == ENOTEMPTY and op == 'RMDIR': # Retry if ENOTEMPTY, ESTALE @@ -759,8 +558,8 @@ class Server(object): st = lstat(slink) # don't create multiple entries with same gfid if isinstance(st, int): - blob = entry_pack_reg( - gfid, bname, e['mode'], e['uid'], e['gid']) + blob = entry_pack_reg(cls, gfid, bname, + e['mode'], e['uid'], e['gid']) # Self healed hardlinks are recorded as MKNOD. # So if the gfid already exists, it should be # processed as hard link not mknod. @@ -768,67 +567,104 @@ class Server(object): cmd_ret = errno_wrap(os.link, [slink, entry], [ENOENT, EEXIST], [ESTALE]) - collect_failure(e, cmd_ret) + collect_failure(e, cmd_ret, uid, gid) elif op == 'MKDIR': + en = e['entry'] slink = os.path.join(pfx, gfid) st = lstat(slink) # don't create multiple entries with same gfid if isinstance(st, int): - blob = entry_pack_mkdir( - gfid, bname, e['mode'], e['uid'], e['gid']) + blob = entry_pack_mkdir(cls, gfid, bname, + e['mode'], e['uid'], e['gid']) + elif (isinstance(lstat(en), int) or + not matching_disk_gfid(gfid, en)): + # If gfid of a directory exists on slave but path based + # create is getting EEXIST. This means the directory is + # renamed in master but recorded as MKDIR during hybrid + # crawl. Get the directory path by reading the backend + # symlink and trying to rename to new name as said by + # master. + logging.info(lf("Special case: rename on mkdir", + gfid=gfid, entry=repr(entry))) + src_entry = get_slv_dir_path(slv_host, slv_volume, gfid) + if src_entry is None: + collect_failure(e, ENOENT, uid, gid) + if src_entry is not None and src_entry != entry: + slv_entry_info = {} + slv_entry_info['gfid_mismatch'] = False + slv_entry_info['name_mismatch'] = True + slv_entry_info['dst'] = False + slv_entry_info['slave_isdir'] = True + slv_entry_info['slave_gfid'] = gfid + slv_entry_info['slave_entry'] = src_entry + + failures.append((e, EEXIST, slv_entry_info)) elif op == 'LINK': slink = os.path.join(pfx, gfid) st = lstat(slink) if isinstance(st, int): (pg, bname) = entry2pb(entry) if stat.S_ISREG(e['stat']['mode']): - blob = entry_pack_reg_stat(gfid, bname, e['stat']) + blob = entry_pack_reg_stat(cls, gfid, bname, e['stat']) elif stat.S_ISLNK(e['stat']['mode']): - blob = entry_pack_symlink(gfid, bname, e['link'], + blob = entry_pack_symlink(cls, gfid, bname, e['link'], e['stat']) else: cmd_ret = errno_wrap(os.link, [slink, entry], [ENOENT, EEXIST], [ESTALE]) - collect_failure(e, cmd_ret) + collect_failure(e, cmd_ret, uid, gid) elif op == 'SYMLINK': en = e['entry'] st = lstat(entry) if isinstance(st, int): - blob = entry_pack_symlink(gfid, bname, e['link'], + blob = entry_pack_symlink(cls, gfid, bname, e['link'], e['stat']) elif not matching_disk_gfid(gfid, en): - collect_failure(e, EEXIST) + collect_failure(e, EEXIST, uid, gid) elif op == 'RENAME': en = e['entry1'] - st = lstat(entry) - if isinstance(st, int): + # The matching disk gfid check validates two things + # 1. Validates name is present, return false otherwise + # 2. Validates gfid is same, returns false otherwise + # So both validations are necessary to decide src doesn't + # exist. We can't rely on only gfid stat as hardlink could + # be present and we can't rely only on name as name could + # exist with different gfid. 
+ if not matching_disk_gfid(gfid, entry): if e['stat'] and not stat.S_ISDIR(e['stat']['mode']): - if stat.S_ISLNK(e['stat']['mode']) and \ - e['link'] is not None: - st1 = lstat(en) - if isinstance(st1, int): - (pg, bname) = entry2pb(en) - blob = entry_pack_symlink(gfid, bname, - e['link'], e['stat']) - elif not matching_disk_gfid(gfid, en): - collect_failure(e, EEXIST, True) + if stat.S_ISLNK(e['stat']['mode']): + # src is not present, so don't sync symlink as + # we don't know target. It's ok to ignore. If + # it's unliked, it's fine. If it's renamed to + # something else, it will be synced then. + if e['link'] is not None: + st1 = lstat(en) + if isinstance(st1, int): + (pg, bname) = entry2pb(en) + blob = entry_pack_symlink(cls, gfid, bname, + e['link'], + e['stat']) + elif not matching_disk_gfid(gfid, en): + collect_failure(e, EEXIST, uid, gid, True) else: slink = os.path.join(pfx, gfid) st = lstat(slink) # don't create multiple entries with same gfid if isinstance(st, int): (pg, bname) = entry2pb(en) - blob = entry_pack_reg_stat(gfid, bname, + blob = entry_pack_reg_stat(cls, gfid, bname, e['stat']) else: cmd_ret = errno_wrap(os.link, [slink, en], [ENOENT, EEXIST], [ESTALE]) - collect_failure(e, cmd_ret) + collect_failure(e, cmd_ret, uid, gid) else: + st = lstat(entry) st1 = lstat(en) if isinstance(st1, int): - rename_with_disk_gfid_confirmation(gfid, entry, en) + rename_with_disk_gfid_confirmation(gfid, entry, en, + uid, gid) else: if st.st_ino == st1.st_ino: # we have a hard link, we can now unlink source @@ -843,8 +679,8 @@ class Server(object): except OSError as e: if e.errno == ENOTEMPTY: logging.error( - lf("Unable to delete directory" - ", Both Old and New" + lf("Directory Rename failed. " + "Both Old and New" " directories exists", old=entry, new=en)) @@ -852,25 +688,32 @@ class Server(object): raise else: raise - elif not matching_disk_gfid(gfid, en): - collect_failure(e, EEXIST, True) + elif not matching_disk_gfid(gfid, en) and dist_count > 1: + collect_failure(e, EEXIST, uid, gid, True) else: - rename_with_disk_gfid_confirmation(gfid, entry, en) + # We are here which means matching_disk_gfid for + # both source and destination has returned false + # and distribution count for master vol is greater + # then one. Which basically says both the source and + # destination exist and not hardlinks. + # So we are safe to go ahead with rename here. + rename_with_disk_gfid_confirmation(gfid, entry, en, + uid, gid) if blob: cmd_ret = errno_wrap(Xattr.lsetxattr, [pg, 'glusterfs.gfid.newfile', blob], - [EEXIST, ENOENT], + [EEXIST, ENOENT, ESTALE], [ESTALE, EINVAL, EBUSY]) - failed = collect_failure(e, cmd_ret) + collect_failure(e, cmd_ret, uid, gid) # If UID/GID is different than zero that means we are trying # create Entry with different UID/GID. Create Entry with # UID:0 and GID:0, and then call chown to set UID/GID if uid != 0 or gid != 0: path = os.path.join(pfx, gfid) - cmd_ret = errno_wrap(os.chown, [path, uid, gid], [ENOENT], + cmd_ret = errno_wrap(os.lchown, [path, uid, gid], [ENOENT], [ESTALE, EINVAL]) - collect_failure(e, cmd_ret) + collect_failure(e, cmd_ret, uid, gid) return failures @@ -897,10 +740,8 @@ class Server(object): # 'lchown' 'lchmod' 'utime with no-deference' blindly. # But since 'lchmod' and 'utime with no de-reference' is # not supported in python3, we have to rely on 'chmod' - # and 'utime with de-reference'. But 'chmod' - # de-reference the symlink and gets ENOENT, EACCES, - # EPERM errors, hence ignoring those errors if it's on - # symlink file. 
+ # and 'utime with de-reference'. Hence avoiding 'chmod' + # and 'utime' if it's symlink file. is_symlink = False cmd_ret = errno_wrap(os.lchown, [go, uid, gid], [ENOENT], @@ -908,19 +749,17 @@ class Server(object): if isinstance(cmd_ret, int): continue - cmd_ret = errno_wrap(os.chmod, [go, mode], - [ENOENT, EACCES, EPERM], [ESTALE, EINVAL]) - if isinstance(cmd_ret, int): - is_symlink = os.path.islink(go) - if not is_symlink: + is_symlink = os.path.islink(go) + + if not is_symlink: + cmd_ret = errno_wrap(os.chmod, [go, mode], + [ENOENT, EACCES, EPERM], [ESTALE, EINVAL]) + if isinstance(cmd_ret, int): failures.append((e, cmd_ret, "chmod")) - cmd_ret = errno_wrap(os.utime, [go, (atime, mtime)], - [ENOENT, EACCES, EPERM], [ESTALE, EINVAL]) - if isinstance(cmd_ret, int): - if not is_symlink: - is_symlink = os.path.islink(go) - if not is_symlink: + cmd_ret = errno_wrap(os.utime, [go, (atime, mtime)], + [ENOENT, EACCES, EPERM], [ESTALE, EINVAL]) + if isinstance(cmd_ret, int): failures.append((e, cmd_ret, "utime")) return failures @@ -978,274 +817,273 @@ class Server(object): return 1.0 -class SlaveLocal(object): - - """mix-in class to implement some factes of a slave server - - ("mix-in" is sort of like "abstract class", ie. it's not - instantiated just included in the ancesty DAG. I use "mix-in" - to indicate that it's not used as an abstract base class, - rather just taken in to implement additional functionality - on the basis of the assumed availability of certain interfaces.) - """ - - def can_connect_to(self, remote): - """determine our position in the connectibility matrix""" - return not remote - - def service_loop(self): - """start a RePCe server serving self's server +class Mounter(object): - stop servicing if a timeout is configured and got no - keep-alime in that inteval - """ + """Abstract base class for mounter backends""" - if boolify(gconf.use_rsync_xattrs) and not privileged(): - raise GsyncdError( - "using rsync for extended attributes is not supported") - - repce = RepceServer( - self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs)) - t = syncdutils.Thread(target=lambda: (repce.service_loop(), - syncdutils.finalize())) - t.start() - logging.info("slave listening") - if gconf.timeout and int(gconf.timeout) > 0: - while True: - lp = self.server.last_keep_alive - time.sleep(int(gconf.timeout)) - if lp == self.server.last_keep_alive: - logging.info( - lf("connection inactive, stopping", - timeout=int(gconf.timeout))) - break - else: - select((), (), ()) + def __init__(self, params): + self.params = params + self.mntpt = None + self.umount_cmd = [] + @classmethod + def get_glusterprog(cls): + gluster_cmd_dir = gconf.get("gluster-command-dir") + if rconf.args.subcmd == "slave": + gluster_cmd_dir = gconf.get("slave-gluster-command-dir") + return os.path.join(gluster_cmd_dir, cls.glusterprog) + + def umount_l(self, d): + """perform lazy umount""" + po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE, + universal_newlines=True) + po.wait() + return po -class SlaveRemote(object): + @classmethod + def make_umount_argv(cls, d): + raise NotImplementedError - """mix-in class to implement an interface to a remote slave""" + def make_mount_argv(self, label=None): + raise NotImplementedError - def connect_remote(self, rargs=[], **opts): - """connects to a remote slave + def cleanup_mntpt(self, *a): + pass - Invoke an auxiliary utility (slave gsyncd, possibly wrapped) - which sets up the connection and set up a RePCe client to - communicate throuh its stdio. 
- """ - slave = opts.get('slave', self.url) - extra_opts = [] - so = getattr(gconf, 'session_owner', None) - if so: - extra_opts += ['--session-owner', so] - li = getattr(gconf, 'local_id', None) - if li: - extra_opts += ['--local-id', li] - ln = getattr(gconf, 'local_node', None) - if ln: - extra_opts += ['--local-node', ln] - if boolify(gconf.use_rsync_xattrs): - extra_opts.append('--use-rsync-xattrs') - if boolify(gconf.access_mount): - extra_opts.append('--access-mount') - po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts + - ['-N', '--listen', '--timeout', str(gconf.timeout), - slave], - stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - gconf.transport = po - return self.start_fd_client(po.stdout, po.stdin, **opts) + def handle_mounter(self, po): + po.wait() - def start_fd_client(self, i, o, **opts): - """set up RePCe client, handshake with server + def inhibit(self, label): + """inhibit a gluster filesystem - It's cut out as a separate method to let - subclasses hook into client startup + Mount glusterfs over a temporary mountpoint, + change into the mount, and lazy unmount the + filesystem. """ - self.server = RepceClient(i, o) - rv = self.server.__version__() - exrv = {'proto': repce.repce_version, 'object': Server.version()} - da0 = (rv, exrv) - da1 = ({}, {}) - for i in range(2): - for k, v in da0[i].iteritems(): - da1[i][k] = int(v) - if da1[0] != da1[1]: - raise GsyncdError( - "RePCe major version mismatch: local %s, remote %s" % - (exrv, rv)) - - def rsync(self, files, *args, **kw): - """invoke rsync""" - if not files: - raise GsyncdError("no files to sync") - logging.debug("files: " + ", ".join(files)) - - extra_rsync_flags = [] - # Performance flag, --ignore-missing-args, if rsync version is - # greater than 3.1.0 then include this flag. - if boolify(gconf.rsync_opt_ignore_missing_args) and \ - get_rsync_version(gconf.rsync_command) >= "3.1.0": - extra_rsync_flags = ["--ignore-missing-args"] - - argv = gconf.rsync_command.split() + \ - ['-aR0', '--inplace', '--files-from=-', '--super', - '--stats', '--numeric-ids', '--no-implied-dirs'] + \ - (boolify(gconf.rsync_opt_existing) and ['--existing'] or []) + \ - gconf.rsync_options.split() + \ - (boolify(gconf.sync_xattrs) and ['--xattrs'] or []) + \ - (boolify(gconf.sync_acls) and ['--acls'] or []) + \ - extra_rsync_flags + \ - ['.'] + list(args) - - log_rsync_performance = boolify(gconf.configinterface.get_realtime( - "log_rsync_performance", default_value=False)) + mpi, mpo = pipe() + mh = Popen.fork() + if mh: + # Parent + os.close(mpi) + fcntl.fcntl(mpo, fcntl.F_SETFD, fcntl.FD_CLOEXEC) + d = None + margv = self.make_mount_argv(label) + if self.mntpt: + # mntpt is determined pre-mount + d = self.mntpt + mnt_msg = d + '\0' + encoded_msg = mnt_msg.encode() + os.write(mpo, encoded_msg) + po = Popen(margv, **self.mountkw) + self.handle_mounter(po) + po.terminate_geterr() + logging.debug('auxiliary glusterfs mount in place') + if not d: + # mntpt is determined during mount + d = self.mntpt + mnt_msg = d + '\0' + encoded_msg = mnt_msg.encode() + os.write(mpo, encoded_msg) + encoded_msg = 'M'.encode() + os.write(mpo, encoded_msg) + t = syncdutils.Thread(target=lambda: os.chdir(d)) + t.start() + tlim = rconf.starttime + gconf.get("connection-timeout") + while True: + if not t.isAlive(): + break - if log_rsync_performance: - # use stdout=PIPE only when log_rsync_performance enabled - # Else rsync will write to stdout and nobody is their - # to consume. If PIPE is full rsync hangs. 
- po = Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + if time.time() >= tlim: + syncdutils.finalize(exval=1) + time.sleep(1) + os.close(mpo) + _, rv = syncdutils.waitpid(mh, 0) + if rv: + rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \ + (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0) + logging.warn(lf('stale mount possibly left behind', + path=d)) + raise GsyncdError("cleaning up temp mountpoint %s " + "failed with status %d" % + (d, rv)) else: - po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE) - - for f in files: - po.stdin.write(f) - po.stdin.write('\0') - - stdout, stderr = po.communicate() - - if kw.get("log_err", False): - for errline in stderr.strip().split("\n")[:-1]: - logging.error(lf("SYNC Error", - sync_engine="Rsync", - error=errline)) - - if log_rsync_performance: - rsync_msg = [] - for line in stdout.split("\n"): - if line.startswith("Number of files:") or \ - line.startswith("Number of regular files transferred:") or \ - line.startswith("Total file size:") or \ - line.startswith("Total transferred file size:") or \ - line.startswith("Literal data:") or \ - line.startswith("Matched data:") or \ - line.startswith("Total bytes sent:") or \ - line.startswith("Total bytes received:") or \ - line.startswith("sent "): - rsync_msg.append(line) - logging.info(lf("rsync performance", - data=", ".join(rsync_msg))) - - return po - - def tarssh(self, files, slaveurl, log_err=False): - """invoke tar+ssh - -z (compress) can be use if needed, but omitting it now - as it results in weird error (tar+ssh errors out (errcode: 2) - """ - if not files: - raise GsyncdError("no files to sync") - logging.debug("files: " + ", ".join(files)) - (host, rdir) = slaveurl.split(':') - tar_cmd = ["tar"] + \ - ["--sparse", "-cf", "-", "--files-from", "-"] - ssh_cmd = gconf.ssh_command_tar.split() + \ - ["-p", str(gconf.ssh_port)] + \ - [host, "tar"] + \ - ["--overwrite", "-xf", "-", "-C", rdir] - p0 = Popen(tar_cmd, stdout=subprocess.PIPE, - stdin=subprocess.PIPE, stderr=subprocess.PIPE) - p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE) - for f in files: - p0.stdin.write(f) - p0.stdin.write('\n') + rv = 0 + try: + os.setsid() + os.close(mpo) + mntdata = '' + while True: + c = os.read(mpi, 1) + c = c.decode() + if not c: + break + mntdata += c + if mntdata: + mounted = False + if mntdata[-1] == 'M': + mntdata = mntdata[:-1] + assert(mntdata) + mounted = True + assert(mntdata[-1] == '\0') + mntpt = mntdata[:-1] + assert(mntpt) + + umount_master = False + umount_slave = False + if rconf.args.subcmd == "worker" \ + and not unshare_propagation_supported() \ + and not gconf.get("access-mount"): + umount_master = True + if rconf.args.subcmd == "slave" \ + and not gconf.get("slave-access-mount"): + umount_slave = True + + if mounted and (umount_master or umount_slave): + po = self.umount_l(mntpt) + po.terminate_geterr(fail_on_err=False) + if po.returncode != 0: + po.errlog() + rv = po.returncode + logging.debug("Lazy umount done: %s" % mntpt) + if umount_master or umount_slave: + self.cleanup_mntpt(mntpt) + except: + logging.exception('mount cleanup failure:') + rv = 200 + os._exit(rv) + + #Polling the dht.subvol.status value. + RETRIES = 10 + while not gf_mount_ready(): + if RETRIES < 0: + logging.error('Subvols are not up') + break + RETRIES -= 1 + time.sleep(0.2) - p0.stdin.close() - p0.stdout.close() # Allow p0 to receive a SIGPIPE if p1 exits. 
- # wait for tar to terminate, collecting any errors, further - # waiting for transfer to complete - _, stderr1 = p1.communicate() + logging.debug('auxiliary glusterfs mount prepared') - # stdin and stdout of p0 is already closed, Reset to None and - # wait for child process to complete - p0.stdin = None - p0.stdout = None - p0.communicate() - if log_err: - for errline in stderr1.strip().split("\n")[:-1]: - logging.error(lf("SYNC Error", - sync_engine="Tarssh", - error=errline)) +class DirectMounter(Mounter): - return p1 + """mounter backend which calls mount(8), umount(8) directly""" + mountkw = {'stderr': subprocess.PIPE, 'universal_newlines': True} + glusterprog = 'glusterfs' -class AbstractUrl(object): + @staticmethod + def make_umount_argv(d): + return ['umount', '-l', d] - """abstract base class for url scheme classes""" + def make_mount_argv(self, label=None): + self.mntpt = tempfile.mkdtemp(prefix='gsyncd-aux-mount-') + rconf.mount_point = self.mntpt + return [self.get_glusterprog()] + \ + ['--' + p for p in self.params] + [self.mntpt] - def __init__(self, path, pattern): - m = re.search(pattern, path) - if not m: - raise GsyncdError("malformed path") - self.path = path - return m.groups() + def cleanup_mntpt(self, mntpt=None): + if not mntpt: + mntpt = self.mntpt + errno_wrap(os.rmdir, [mntpt], [ENOENT, EBUSY]) - @property - def scheme(self): - return type(self).__name__.lower() - def canonical_path(self): - return self.path +class MountbrokerMounter(Mounter): - def get_url(self, canonical=False, escaped=False): - """format self's url in various styles""" - if canonical: - pa = self.canonical_path() - else: - pa = self.path - u = "://".join((self.scheme, pa)) - if escaped: - u = syncdutils.escape(u) - return u + """mounter backend using the mountbroker gluster service""" - @property - def url(self): - return self.get_url() + mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE, + 'universal_newlines': True} + glusterprog = 'gluster' + @classmethod + def make_cli_argv(cls): + return [cls.get_glusterprog()] + ['--remote-host=localhost'] + \ + gconf.get("gluster-cli-options").split() + ['system::'] -class FILE(AbstractUrl, SlaveLocal, SlaveRemote): + @classmethod + def make_umount_argv(cls, d): + return cls.make_cli_argv() + ['umount', d, 'lazy'] - """scheme class for file:// urls + def make_mount_argv(self, label): + return self.make_cli_argv() + \ + ['mount', label, 'user-map-root=' + + syncdutils.getusername()] + self.params - can be used to represent a file slave server - on slave side, or interface to a remote file - file server on master side - """ + def handle_mounter(self, po): + self.mntpt = po.stdout.readline()[:-1] + rconf.mount_point = self.mntpt + rconf.mountbroker = True + self.umount_cmd = self.make_cli_argv() + ['umount'] + rconf.mbr_umount_cmd = self.umount_cmd + po.stdout.close() + sup(self, po) + if po.returncode != 0: + # if cli terminated with error due to being + # refused by glusterd, what it put + # out on stdout is a diagnostic message + logging.error(lf('glusterd answered', mnt=self.mntpt)) - class FILEServer(Server): - """included server flavor""" - pass +class GLUSTERServer(Server): - server = FILEServer + "server enhancements for a glusterfs backend""" - def __init__(self, path): - sup(self, path, '^/') + @classmethod + def _attr_unpack_dict(cls, xattr, extra_fields=''): + """generic volume mark fetching/parsing backed""" + fmt_string = cls.NTV_FMTSTR + extra_fields + buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) + buf = 
str_to_bytearray(buf) + vm = struct.unpack(fmt_string, buf) + m = re.match( + '(.{8})(.{4})(.{4})(.{4})(.{12})', + "".join(['%02x' % x for x in vm[2:18]])) + uuid = '-'.join(m.groups()) + volinfo = {'version': vm[0:2], + 'uuid': uuid, + 'retval': vm[18], + 'volume_mark': vm[19:21], + } + if extra_fields: + return volinfo, vm[-len(extra_fields):] + else: + return volinfo - def connect(self): - """inhibit the resource beyond""" - os.chdir(self.path) + @classmethod + def foreign_volume_infos(cls): + """return list of valid (not expired) foreign volume marks""" + dict_list = [] + xattr_list = Xattr.llistxattr_buf('.') + for ele in xattr_list: + if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0: + d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT) + now = int(time.time()) + if x[0] > now: + logging.debug("volinfo[%s] expires: %d " + "(%d sec later)" % + (d['uuid'], x[0], x[0] - now)) + d['timeout'] = x[0] + dict_list.append(d) + else: + try: + Xattr.lremovexattr('.', ele) + except OSError: + pass + return dict_list - def rsync(self, files, log_err=False): - return sup(self, files, self.path, log_err=log_err) + @classmethod + def native_volume_info(cls): + """get the native volume mark of the underlying gluster volume""" + try: + return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, + 'volume-mark'])) + except OSError: + ex = sys.exc_info()[1] + if ex.errno != ENODATA: + raise -class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): +class GLUSTER(object): """scheme class for gluster:// urls @@ -1255,241 +1093,17 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): (slave-ish features come from the mixins, master functionality is outsourced to GMaster from master) """ - - class GLUSTERServer(Server): - - "server enhancements for a glusterfs backend""" - - @classmethod - def _attr_unpack_dict(cls, xattr, extra_fields=''): - """generic volume mark fetching/parsing backed""" - fmt_string = cls.NTV_FMTSTR + extra_fields - buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) - vm = struct.unpack(fmt_string, buf) - m = re.match( - '(.{8})(.{4})(.{4})(.{4})(.{12})', - "".join(['%02x' % x for x in vm[2:18]])) - uuid = '-'.join(m.groups()) - volinfo = {'version': vm[0:2], - 'uuid': uuid, - 'retval': vm[18], - 'volume_mark': vm[19:21], - } - if extra_fields: - return volinfo, vm[-len(extra_fields):] - else: - return volinfo - - @classmethod - def foreign_volume_infos(cls): - """return list of valid (not expired) foreign volume marks""" - dict_list = [] - xattr_list = Xattr.llistxattr_buf('.') - for ele in xattr_list: - if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0: - d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT) - now = int(time.time()) - if x[0] > now: - logging.debug("volinfo[%s] expires: %d " - "(%d sec later)" % - (d['uuid'], x[0], x[0] - now)) - d['timeout'] = x[0] - dict_list.append(d) - else: - try: - Xattr.lremovexattr('.', ele) - except OSError: - pass - return dict_list - - @classmethod - def native_volume_info(cls): - """get the native volume mark of the underlying gluster volume""" - try: - return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, - 'volume-mark'])) - except OSError: - ex = sys.exc_info()[1] - if ex.errno != ENODATA: - raise - server = GLUSTERServer - def __init__(self, path): - self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern) - - def canonical_path(self): - return ':'.join([gethostbyname(self.host), self.volume]) - - def can_connect_to(self, remote): - """determine our position in the connectibility matrix""" 
- return not remote or \ - (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER)) + def __init__(self, host, volume): + self.path = "%s:%s" % (host, volume) + self.host = host + self.volume = volume - class Mounter(object): - - """Abstract base class for mounter backends""" - - def __init__(self, params): - self.params = params - self.mntpt = None - - @classmethod - def get_glusterprog(cls): - return os.path.join(gconf.gluster_command_dir, cls.glusterprog) - - def umount_l(self, d): - """perform lazy umount""" - po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE) - po.wait() - return po - - @classmethod - def make_umount_argv(cls, d): - raise NotImplementedError - - def make_mount_argv(self, *a): - raise NotImplementedError - - def cleanup_mntpt(self, *a): - pass - - def handle_mounter(self, po): - po.wait() - - def inhibit(self, *a): - """inhibit a gluster filesystem - - Mount glusterfs over a temporary mountpoint, - change into the mount, and lazy unmount the - filesystem. - """ - - mpi, mpo = os.pipe() - mh = Popen.fork() - if mh: - os.close(mpi) - fcntl.fcntl(mpo, fcntl.F_SETFD, fcntl.FD_CLOEXEC) - d = None - margv = self.make_mount_argv(*a) - if self.mntpt: - # mntpt is determined pre-mount - d = self.mntpt - os.write(mpo, d + '\0') - po = Popen(margv, **self.mountkw) - self.handle_mounter(po) - po.terminate_geterr() - logging.debug('auxiliary glusterfs mount in place') - if not d: - # mntpt is determined during mount - d = self.mntpt - os.write(mpo, d + '\0') - os.write(mpo, 'M') - t = syncdutils.Thread(target=lambda: os.chdir(d)) - t.start() - tlim = gconf.starttime + int(gconf.connection_timeout) - while True: - if not t.isAlive(): - break - if time.time() >= tlim: - syncdutils.finalize(exval=1) - time.sleep(1) - os.close(mpo) - _, rv = syncdutils.waitpid(mh, 0) - if rv: - rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \ - (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0) - logging.warn(lf('stale mount possibly left behind', - path=d)) - raise GsyncdError("cleaning up temp mountpoint %s " - "failed with status %d" % - (d, rv)) - else: - rv = 0 - try: - os.setsid() - os.close(mpo) - mntdata = '' - while True: - c = os.read(mpi, 1) - if not c: - break - mntdata += c - if mntdata: - mounted = False - if mntdata[-1] == 'M': - mntdata = mntdata[:-1] - assert(mntdata) - mounted = True - assert(mntdata[-1] == '\0') - mntpt = mntdata[:-1] - assert(mntpt) - if mounted and not boolify(gconf.access_mount): - po = self.umount_l(mntpt) - po.terminate_geterr(fail_on_err=False) - if po.returncode != 0: - po.errlog() - rv = po.returncode - if not boolify(gconf.access_mount): - self.cleanup_mntpt(mntpt) - except: - logging.exception('mount cleanup failure:') - rv = 200 - os._exit(rv) - logging.debug('auxiliary glusterfs mount prepared') - - class DirectMounter(Mounter): - - """mounter backend which calls mount(8), umount(8) directly""" - - mountkw = {'stderr': subprocess.PIPE} - glusterprog = 'glusterfs' - - @staticmethod - def make_umount_argv(d): - return ['umount', '-l', d] - - def make_mount_argv(self): - self.mntpt = tempfile.mkdtemp(prefix='gsyncd-aux-mount-') - mntpt_list.append(self.mntpt) - return [self.get_glusterprog()] + \ - ['--' + p for p in self.params] + [self.mntpt] - - def cleanup_mntpt(self, mntpt=None): - if not mntpt: - mntpt = self.mntpt - errno_wrap(os.rmdir, [mntpt], [ENOENT, EBUSY]) - - class MountbrokerMounter(Mounter): - - """mounter backend using the mountbroker gluster service""" - - mountkw = {'stderr': subprocess.PIPE, 'stdout': 
subprocess.PIPE} - glusterprog = 'gluster' - - @classmethod - def make_cli_argv(cls): - return [cls.get_glusterprog()] + ['--remote-host=localhost'] + \ - gconf.gluster_cli_options.split() + ['system::'] - - @classmethod - def make_umount_argv(cls, d): - return cls.make_cli_argv() + ['umount', d, 'lazy'] - - def make_mount_argv(self, label): - return self.make_cli_argv() + \ - ['mount', label, 'user-map-root=' + - syncdutils.getusername()] + self.params - - def handle_mounter(self, po): - self.mntpt = po.stdout.readline()[:-1] - po.stdout.close() - sup(self, po) - if po.returncode != 0: - # if cli terminated with error due to being - # refused by glusterd, what it put - # out on stdout is a diagnostic message - logging.error(lf('glusterd answered', mnt=self.mntpt)) + global slv_volume + global slv_host + slv_volume = self.volume + slv_host = self.host def connect(self): """inhibit the resource beyond @@ -1501,23 +1115,29 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): logging.info("Mounting gluster volume locally...") t0 = time.time() - label = getattr(gconf, 'mountbroker', None) + label = gconf.get('mountbroker', None) if not label and not privileged(): label = syncdutils.getusername() - mounter = label and self.MountbrokerMounter or self.DirectMounter - params = gconf.gluster_params.split() + \ - (gconf.gluster_log_level and ['log-level=' + - gconf.gluster_log_level] or []) + \ - ['log-file=' + gconf.gluster_log_file, 'volfile-server=' + - self.host, 'volfile-id=' + self.volume, 'client-pid=-1'] - mounter(params).inhibit(*[l for l in [label] if l]) + mounter = label and MountbrokerMounter or DirectMounter + + log_file = gconf.get("gluster-log-file") + if rconf.args.subcmd == "slave": + log_file = gconf.get("slave-gluster-log-file") + + log_level = gconf.get("gluster-log-level") + if rconf.args.subcmd == "slave": + log_level = gconf.get("slave-gluster-log-level") + + params = gconf.get("gluster-params").split() + \ + ['log-level=' + log_level] + \ + ['log-file=' + log_file, 'volfile-server=' + self.host] + \ + ['volfile-id=' + self.volume, 'client-pid=-1'] + + self.mounter = mounter(params) + self.mounter.inhibit(label) logging.info(lf("Mounted gluster volume", duration="%.4f" % (time.time() - t0))) - def connect_remote(self, *a, **kw): - sup(self, *a, **kw) - self.slavedir = "/proc/%d/cwd" % self.server.pid() - def gmaster_instantiate_tuple(self, slave): """return a tuple of the 'one shot' and the 'main crawl' class instance""" @@ -1525,7 +1145,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): gmaster_builder()(self, slave), gmaster_builder('changeloghistory')(self, slave)) - def service_loop(self, *args): + def service_loop(self, slave=None): """enter service loop - if slave given, instantiate GMaster and @@ -1533,171 +1153,173 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): master behavior - else do that's what's inherited """ - if args: - slave = args[0] - if gconf.local_path: - class brickserver(FILE.FILEServer): - local_path = gconf.local_path - aggregated = self.server - - @classmethod - def entries(cls, path): - e = super(brickserver, cls).entries(path) - # on the brick don't mess with /.glusterfs - if path == '.': - try: - e.remove('.glusterfs') - e.remove('.trashcan') - except ValueError: - pass - return e - - @classmethod - def lstat(cls, e): - """ path based backend stat """ - return super(brickserver, cls).lstat(e) - - @classmethod - def gfid(cls, e): - """ path based backend gfid fetch """ - return super(brickserver, cls).gfid(e) - - @classmethod - 
def linkto_check(cls, e): - return super(brickserver, cls).linkto_check(e) - if gconf.slave_id: - # define {,set_}xtime in slave, thus preempting - # the call to remote, so that it takes data from - # the local brick - slave.server.xtime = types.MethodType( - lambda _self, path, uuid: ( - brickserver.xtime(path, - uuid + '.' + gconf.slave_id) - ), - slave.server) - slave.server.stime = types.MethodType( - lambda _self, path, uuid: ( - brickserver.stime(path, - uuid + '.' + gconf.slave_id) - ), - slave.server) - slave.server.entry_stime = types.MethodType( - lambda _self, path, uuid: ( - brickserver.entry_stime( - path, - uuid + '.' + gconf.slave_id) - ), - slave.server) - slave.server.set_stime = types.MethodType( - lambda _self, path, uuid, mark: ( - brickserver.set_stime(path, - uuid + '.' + gconf.slave_id, - mark) - ), - slave.server) - slave.server.set_entry_stime = types.MethodType( - lambda _self, path, uuid, mark: ( - brickserver.set_entry_stime( - path, - uuid + '.' + gconf.slave_id, - mark) - ), - slave.server) - (g1, g2, g3) = self.gmaster_instantiate_tuple(slave) - g1.master.server = brickserver - g2.master.server = brickserver - g3.master.server = brickserver - else: - (g1, g2, g3) = self.gmaster_instantiate_tuple(slave) - g1.master.server.aggregated = gmaster.master.server - g2.master.server.aggregated = gmaster.master.server - g3.master.server.aggregated = gmaster.master.server - # bad bad bad: bad way to do things like this - # need to make this elegant - # register the crawlers and start crawling - # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default) - # g3 ==> changelog History - changelog_register_failed = False - (inf, ouf, ra, wa) = gconf.rpc_fd.split(',') - changelog_agent = RepceClient(int(inf), int(ouf)) - master_name, slave_data = get_master_and_slave_data_from_args( - sys.argv) - status = GeorepStatus(gconf.state_file, gconf.local_node, - gconf.local_path, - gconf.local_node_id, - master_name, slave_data) - status.reset_on_worker_start() - rv = changelog_agent.version() - if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION: + if rconf.args.subcmd == "slave": + if gconf.get("use-rsync-xattrs") and not privileged(): raise GsyncdError( - "RePCe major version mismatch(changelog agent): " - "local %s, remote %s" % - (CHANGELOG_AGENT_CLIENT_VERSION, rv)) + "using rsync for extended attributes is not supported") + + repce = RepceServer( + self.server, sys.stdin, sys.stdout, gconf.get("sync-jobs")) + t = syncdutils.Thread(target=lambda: (repce.service_loop(), + syncdutils.finalize())) + t.start() + logging.info("slave listening") + if gconf.get("slave-timeout") and gconf.get("slave-timeout") > 0: + while True: + lp = self.server.last_keep_alive + time.sleep(gconf.get("slave-timeout")) + if lp == self.server.last_keep_alive: + logging.info( + lf("connection inactive, stopping", + timeout=gconf.get("slave-timeout"))) + break + else: + select((), (), ()) - try: - workdir = g2.setup_working_dir() - # Register only when change_detector is not set to - # xsync, else agent will generate changelog files - # in .processing directory of working dir - if gconf.change_detector != 'xsync': - # register with the changelog library - # 9 == log level (DEBUG) - # 5 == connection retries - changelog_agent.init() - changelog_agent.register(gconf.local_path, - workdir, gconf.changelog_log_file, - get_changelog_log_level( - gconf.changelog_log_level), - g2.CHANGELOG_CONN_RETRIES) - - register_time = int(time.time()) - g2.register(register_time, changelog_agent, status) - 
g3.register(register_time, changelog_agent, status) - except ChangelogException as e: - logging.error(lf("Changelog register failed", error=e)) - sys.exit(1) - - g1.register(status=status) - logging.info(lf("Register time", - time=register_time)) - # oneshot: Try to use changelog history api, if not - # available switch to FS crawl - # Note: if config.change_detector is xsync then - # it will not use changelog history api - try: - g3.crawlwrap(oneshot=True) - except PartialHistoryAvailable as e: - logging.info(lf('Partial history available, using xsync crawl' - ' after consuming history', - till=e)) - g1.crawlwrap(oneshot=True, register_time=register_time) - except ChangelogHistoryNotAvailable: - logging.info('Changelog history not available, using xsync') - g1.crawlwrap(oneshot=True, register_time=register_time) - except NoStimeAvailable: - logging.info('No stime available, using xsync crawl') - g1.crawlwrap(oneshot=True, register_time=register_time) - except ChangelogException as e: - logging.error(lf("Changelog History Crawl failed", - error=e)) - sys.exit(1) + return - try: - g2.crawlwrap() - except ChangelogException as e: - logging.error(lf("Changelog crawl failed", error=e)) - sys.exit(1) - else: - sup(self, *args) + class brickserver(Server): + local_path = rconf.args.local_path + aggregated = self.server - def rsync(self, files, log_err=False): - return sup(self, files, self.slavedir, log_err=log_err) + @classmethod + def entries(cls, path): + e = super(brickserver, cls).entries(path) + # on the brick don't mess with /.glusterfs + if path == '.': + try: + e.remove('.glusterfs') + e.remove('.trashcan') + except ValueError: + pass + return e + + @classmethod + def lstat(cls, e): + """ path based backend stat """ + return super(brickserver, cls).lstat(e) + + @classmethod + def gfid(cls, e): + """ path based backend gfid fetch """ + return super(brickserver, cls).gfid(e) + + @classmethod + def linkto_check(cls, e): + return super(brickserver, cls).linkto_check(e) + + # define {,set_}xtime in slave, thus preempting + # the call to remote, so that it takes data from + # the local brick + slave.server.xtime = types.MethodType( + lambda _self, path, uuid: ( + brickserver.xtime(path, + uuid + '.' + rconf.args.slave_id) + ), + slave.server) + slave.server.stime = types.MethodType( + lambda _self, path, uuid: ( + brickserver.stime(path, + uuid + '.' + rconf.args.slave_id) + ), + slave.server) + slave.server.entry_stime = types.MethodType( + lambda _self, path, uuid: ( + brickserver.entry_stime( + path, + uuid + '.' + rconf.args.slave_id) + ), + slave.server) + slave.server.set_stime = types.MethodType( + lambda _self, path, uuid, mark: ( + brickserver.set_stime(path, + uuid + '.' + rconf.args.slave_id, + mark) + ), + slave.server) + slave.server.set_entry_stime = types.MethodType( + lambda _self, path, uuid, mark: ( + brickserver.set_entry_stime( + path, + uuid + '.' 
+ rconf.args.slave_id, + mark) + ), + slave.server) + + (g1, g2, g3) = self.gmaster_instantiate_tuple(slave) + g1.master.server = brickserver + g2.master.server = brickserver + g3.master.server = brickserver + + # bad bad bad: bad way to do things like this + # need to make this elegant + # register the crawlers and start crawling + # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default) + # g3 ==> changelog History + status = GeorepStatus(gconf.get("state-file"), + rconf.args.local_node, + rconf.args.local_path, + rconf.args.local_node_id, + rconf.args.master, + rconf.args.slave) + status.reset_on_worker_start() - def tarssh(self, files, log_err=False): - return sup(self, files, self.slavedir, log_err=log_err) + try: + workdir = g2.setup_working_dir() + # Register only when change_detector is not set to + # xsync, else agent will generate changelog files + # in .processing directory of working dir + if gconf.get("change-detector") != 'xsync': + # register with the changelog library + # 9 == log level (DEBUG) + # 5 == connection retries + libgfchangelog.register(rconf.args.local_path, + workdir, + gconf.get("changelog-log-file"), + get_changelog_log_level( + gconf.get("changelog-log-level")), + g2.CHANGELOG_CONN_RETRIES) + + register_time = int(time.time()) + g2.register(register_time, status) + g3.register(register_time, status) + except ChangelogException as e: + logging.error(lf("Changelog register failed", error=e)) + sys.exit(1) + + g1.register(status=status) + logging.info(lf("Register time", + time=register_time)) + # oneshot: Try to use changelog history api, if not + # available switch to FS crawl + # Note: if config.change_detector is xsync then + # it will not use changelog history api + try: + g3.crawlwrap(oneshot=True) + except PartialHistoryAvailable as e: + logging.info(lf('Partial history available, using xsync crawl' + ' after consuming history', + till=e)) + g1.crawlwrap(oneshot=True, register_time=register_time) + except ChangelogHistoryNotAvailable: + logging.info('Changelog history not available, using xsync') + g1.crawlwrap(oneshot=True, register_time=register_time) + except NoStimeAvailable: + logging.info('No stime available, using xsync crawl') + g1.crawlwrap(oneshot=True, register_time=register_time) + except ChangelogException as e: + logging.error(lf("Changelog History Crawl failed", + error=e)) + sys.exit(1) + + try: + g2.crawlwrap() + except ChangelogException as e: + logging.error(lf("Changelog crawl failed", error=e)) + sys.exit(1) -class SSH(AbstractUrl, SlaveRemote): +class SSH(object): """scheme class for ssh:// urls @@ -1705,13 +1327,9 @@ class SSH(AbstractUrl, SlaveRemote): implementing an ssh based proxy """ - def __init__(self, path): - self.remote_addr, inner_url = sup(self, path, - '^((?:%s@)?%s):(.+)' % - tuple([r.pattern - for r in (UserRX, HostRX)])) - self.inner_rsc = parse_url(inner_url) - self.volume = inner_url[1:] + def __init__(self, host, volume): + self.remote_addr = host + self.volume = volume @staticmethod def parse_ssh_address(self): @@ -1723,35 +1341,28 @@ class SSH(AbstractUrl, SlaveRemote): self.remotehost = h return {'user': u, 'host': h} - def canonical_path(self): - rap = self.parse_ssh_address(self) - remote_addr = '@'.join([rap['user'], gethostbyname(rap['host'])]) - return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)]) - - def can_connect_to(self, remote): - """determine our position in the connectibility matrix""" - return False - - def start_fd_client(self, *a, **opts): - """customizations for 
client startup + def start_fd_client(self, i, o): + """set up RePCe client, handshake with server - - be a no-op if we are to daemonize (client startup is deferred - to post-daemon stage) - - determine target url for rsync after consulting server + It's cut out as a separate method to let + subclasses hook into client startup """ - if opts.get('deferred'): - return a - sup(self, *a) - ityp = type(self.inner_rsc) - if ityp == FILE: - slavepath = self.inner_rsc.path - elif ityp == GLUSTER: - slavepath = "/proc/%d/cwd" % self.server.pid() - else: - raise NotImplementedError + self.server = RepceClient(i, o) + rv = self.server.__version__() + exrv = {'proto': repce.repce_version, 'object': Server.version()} + da0 = (rv, exrv) + da1 = ({}, {}) + for i in range(2): + for k, v in da0[i].items(): + da1[i][k] = int(v) + if da1[0] != da1[1]: + raise GsyncdError( + "RePCe major version mismatch: local %s, remote %s" % + (exrv, rv)) + slavepath = "/proc/%d/cwd" % self.server.pid() self.slaveurl = ':'.join([self.remote_addr, slavepath]) - def connect_remote(self, go_daemon=None): + def connect_remote(self): """connect to inner slave url through outer ssh url Wrap the connecting utility in ssh. @@ -1769,49 +1380,204 @@ class SSH(AbstractUrl, SlaveRemote): [NB. ATM gluster product does not makes use of interactive authentication.] """ - if go_daemon == 'done': - return self.start_fd_client(*self.fd_pair) - syncdutils.setup_ssh_ctl(tempfile.mkdtemp(prefix='gsyncd-aux-ssh-'), self.remote_addr, - self.inner_rsc.url) + self.volume) - deferred = go_daemon == 'postconn' logging.info("Initializing SSH connection between master and slave...") t0 = time.time() - ret = sup(self, gconf.ssh_command.split() + - ["-p", str(gconf.ssh_port)] + - gconf.ssh_ctl_args + [self.remote_addr], - slave=self.inner_rsc.url, deferred=deferred) + + extra_opts = [] + remote_gsyncd = gconf.get("remote-gsyncd") + if remote_gsyncd == "": + remote_gsyncd = "/nonexistent/gsyncd" + + if gconf.get("use-rsync-xattrs"): + extra_opts.append('--use-rsync-xattrs') + + args_to_slave = [gconf.get("ssh-command")] + \ + gconf.get("ssh-options").split() + \ + ["-p", str(gconf.get("ssh-port"))] + \ + rconf.ssh_ctl_args + [self.remote_addr] + \ + [remote_gsyncd, "slave"] + \ + extra_opts + \ + [rconf.args.master, rconf.args.slave] + \ + [ + '--master-node', rconf.args.local_node, + '--master-node-id', rconf.args.local_node_id, + '--master-brick', rconf.args.local_path, + '--local-node', rconf.args.resource_remote, + '--local-node-id', rconf.args.resource_remote_id] + \ + [ + # Add all config arguments here, slave gsyncd will not use + # config file in slave side, so all overriding options should + # be sent as arguments + '--slave-timeout', str(gconf.get("slave-timeout")), + '--slave-log-level', gconf.get("slave-log-level"), + '--slave-gluster-log-level', + gconf.get("slave-gluster-log-level"), + '--slave-gluster-command-dir', + gconf.get("slave-gluster-command-dir"), + '--master-dist-count', + str(gconf.get("master-distribution-count"))] + + if gconf.get("slave-access-mount"): + args_to_slave.append('--slave-access-mount') + + if rconf.args.debug: + args_to_slave.append('--debug') + + po = Popen(args_to_slave, + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + rconf.transport = po + self.start_fd_client(po.stdout, po.stdin) logging.info(lf("SSH connection between master and slave established.", duration="%.4f" % (time.time() - t0))) - if deferred: - # send a message to peer so that we can wait for - # the answer from which we 
know connection is - # established and we can proceed with daemonization - # (doing that too early robs the ssh passwd prompt...) - # However, we'd better not start the RepceClient - # before daemonization (that's not preserved properly - # in daemon), we just do a an ad-hoc linear put/get. - i, o = ret - inf = os.fdopen(i) - repce.send(o, None, '__repce_version__') - select((inf,), (), ()) - repce.recv(inf) - # hack hack hack: store a global reference to the file - # to save it from getting GC'd which implies closing it - gconf.permanent_handles.append(inf) - self.fd_pair = (i, o) - return 'should' - - def rsync(self, files, log_err=False): - return sup(self, files, '-e', - " ".join(gconf.ssh_command.split() + - ["-p", str(gconf.ssh_port)] + - gconf.ssh_ctl_args), - *(gconf.rsync_ssh_options.split() + [self.slaveurl]), - log_err=log_err) + def rsync(self, files, *args, **kw): + """invoke rsync""" + if not files: + raise GsyncdError("no files to sync") + logging.debug("files: " + ", ".join(files)) + + extra_rsync_flags = [] + # Performance flag, --ignore-missing-args, if rsync version is + # greater than 3.1.0 then include this flag. + if gconf.get("rsync-opt-ignore-missing-args") and \ + get_rsync_version(gconf.get("rsync-command")) >= "3.1.0": + extra_rsync_flags = ["--ignore-missing-args"] + + rsync_ssh_opts = [gconf.get("ssh-command")] + \ + gconf.get("ssh-options").split() + \ + ["-p", str(gconf.get("ssh-port"))] + \ + rconf.ssh_ctl_args + \ + gconf.get("rsync-ssh-options").split() + + argv = [ + gconf.get("rsync-command"), + '-aR0', + '--inplace', + '--files-from=-', + '--super', + '--stats', + '--numeric-ids', + '--no-implied-dirs' + ] + + if gconf.get("rsync-opt-existing"): + argv += ["--existing"] + + if gconf.get("sync-xattrs"): + argv += ['--xattrs'] + + if gconf.get("sync-acls"): + argv += ['--acls'] + + argv = argv + \ + gconf.get("rsync-options").split() + \ + extra_rsync_flags + ['.'] + \ + ["-e", " ".join(rsync_ssh_opts)] + \ + [self.slaveurl] + + log_rsync_performance = gconf.getr("log-rsync-performance", False) + + if log_rsync_performance: + # use stdout=PIPE only when log_rsync_performance enabled + # Else rsync will write to stdout and nobody is there + # to consume. If PIPE is full rsync hangs. 
+ po = Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, universal_newlines=True) + else: + po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True) + + for f in files: + po.stdin.write(f) + po.stdin.write('\0') + + stdout, stderr = po.communicate() + + if kw.get("log_err", False): + for errline in stderr.strip().split("\n")[:-1]: + logging.error(lf("SYNC Error", + sync_engine="Rsync", + error=errline)) + + if log_rsync_performance: + rsync_msg = [] + for line in stdout.split("\n"): + if line.startswith("Number of files:") or \ + line.startswith("Number of regular files transferred:") or \ + line.startswith("Total file size:") or \ + line.startswith("Total transferred file size:") or \ + line.startswith("Literal data:") or \ + line.startswith("Matched data:") or \ + line.startswith("Total bytes sent:") or \ + line.startswith("Total bytes received:") or \ + line.startswith("sent "): + rsync_msg.append(line) + logging.info(lf("rsync performance", + data=", ".join(rsync_msg))) + + return po def tarssh(self, files, log_err=False): - return sup(self, files, self.slaveurl, log_err=log_err) + """invoke tar+ssh + -z (compress) can be use if needed, but omitting it now + as it results in weird error (tar+ssh errors out (errcode: 2) + """ + if not files: + raise GsyncdError("no files to sync") + logging.debug("files: " + ", ".join(files)) + (host, rdir) = self.slaveurl.split(':') + + tar_cmd = ["tar"] + \ + ["--sparse", "-cf", "-", "--files-from", "-"] + ssh_cmd = gconf.get("ssh-command").split() + \ + gconf.get("ssh-options-tar").split() + \ + ["-p", str(gconf.get("ssh-port"))] + \ + [host, "tar"] + \ + ["--overwrite", "-xf", "-", "-C", rdir] + p0 = Popen(tar_cmd, stdout=subprocess.PIPE, + stdin=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True) + p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE, + universal_newlines=True) + for f in files: + p0.stdin.write(f) + p0.stdin.write('\n') + + p0.stdin.close() + p0.stdout.close() # Allow p0 to receive a SIGPIPE if p1 exits. + + # stdin and stdout of p0 is already closed, Reset to None and + # wait for child process to complete + p0.stdin = None + p0.stdout = None + + def wait_for_tar(p0): + _, stderr = p0.communicate() + if log_err: + for errline in stderr.strip().split("\n")[:-1]: + if "No such file or directory" not in errline: + logging.error(lf("SYNC Error", + sync_engine="Tarssh", + error=errline)) + + t = syncdutils.Thread(target=wait_for_tar, args=(p0, )) + # wait for tar to terminate, collecting any errors, further + # waiting for transfer to complete + t.start() + + # wait for ssh process + _, stderr1 = p1.communicate() + t.join() + + if log_err: + for errline in stderr1.strip().split("\n")[:-1]: + logging.error(lf("SYNC Error", + sync_engine="Tarssh", + error=errline)) + + return p1 |
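
The reworked tarssh() above replaces the old serial p1.communicate()/p0.communicate() sequence with a helper thread, so tar's stderr is drained while ssh is still consuming the data stream and neither process can block on a full pipe. A minimal self-contained sketch of that pattern follows; run_tar_ssh_pipeline and the command lists are illustrative stand-ins, not gsyncd names:

import subprocess
import threading

def run_tar_ssh_pipeline(files, tar_cmd, ssh_cmd):
    # tar writes the archive to stdout; ssh reads it on stdin
    p0 = subprocess.Popen(tar_cmd, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                          universal_newlines=True)
    p1 = subprocess.Popen(ssh_cmd, stdin=p0.stdout,
                          stderr=subprocess.PIPE, universal_newlines=True)
    for f in files:
        p0.stdin.write(f)
        p0.stdin.write('\n')
    p0.stdin.close()
    p0.stdout.close()   # allow p0 to receive a SIGPIPE if p1 exits
    # communicate() must not touch the already-closed pipes
    p0.stdin = None
    p0.stdout = None

    tar_stderr = []

    def wait_for_tar():
        _, err = p0.communicate()   # reaps tar, drains its stderr
        tar_stderr.append(err)

    t = threading.Thread(target=wait_for_tar)
    t.start()
    _, ssh_stderr = p1.communicate()   # drain ssh's stderr concurrently
    t.join()
    return p1.returncode, tar_stderr[0], ssh_stderr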
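Similarly, the val = str_to_bytearray(val) calls inserted before each struct.unpack() come from the new py2py3 compatibility module, whose implementation is outside this diff. A hedged sketch of what such a shim might look like — the real GlusterFS helper may differ in detail, e.g. in how it treats already-bytes input:

import sys

if sys.version_info >= (3,):
    def str_to_bytearray(data):
        # Python 3: a str-typed xattr value must become bytes-like
        # before struct.unpack() will accept it; bytes pass through.
        if isinstance(data, str):
            return bytearray(data, "utf-8")
        return data
else:
    def str_to_bytearray(data):
        # Python 2: str is already a byte string.
        return data

# Pattern repeated throughout Server.xtime()/stime()/entry_stime() above:
#   val = Xattr.lgetxattr(path, xattr_name, 8)
#   val = str_to_bytearray(val)
#   return struct.unpack('!II', val)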
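Finally, the Mounter.inhibit() hunk keeps the pre-existing parent/child pipe protocol, now with explicit encode()/decode() for Python 3: the parent writes the mountpoint NUL-terminated, then a literal 'M' once the mount succeeds, and the forked child reads until EOF before deciding whether to lazy-unmount. A small sketch of just that framing, with hypothetical helper names:

import os

def send_mount_info(wfd, mntpt, mounted):
    # Parent side: "<mountpoint>\0", then 'M' once mounted.
    os.write(wfd, (mntpt + '\0').encode())
    if mounted:
        os.write(wfd, b'M')
    os.close(wfd)

def recv_mount_info(rfd):
    # Child side: read byte by byte until EOF, as inhibit() does.
    data = b''
    while True:
        c = os.read(rfd, 1)
        if not c:
            break
        data += c
    os.close(rfd)
    mounted = data.endswith(b'M')
    if mounted:
        data = data[:-1]
    assert data.endswith(b'\0')
    return data[:-1].decode(), mounted

# r, w = os.pipe()   # in resource.py this pipe spans a Popen.fork()
# send_mount_info(w, '/tmp/gsyncd-aux-mount-XXXXXX', True)
# print(recv_mount_info(r))   # -> ('/tmp/gsyncd-aux-mount-XXXXXX', True)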