From 950371be29d029179ac5cd0ad2dfdbfcd4467b96 Mon Sep 17 00:00:00 2001 From: Avra Sengupta Date: Mon, 27 May 2013 22:23:57 +0530 Subject: move 'xlators/marker/utils/' to 'geo-replication/' directory Change-Id: Ibd0faefecc15b6713eda28bc96794ae58aff45aa BUG: 847839 Original Author: Amar Tumballi Signed-off-by: Avra Sengupta Reviewed-on: http://review.gluster.org/5133 Tested-by: Gluster Build System Reviewed-by: Vijay Bellur --- geo-replication/syncdaemon/resource.py | 972 +++++++++++++++++++++++++++++++++ 1 file changed, 972 insertions(+) create mode 100644 geo-replication/syncdaemon/resource.py (limited to 'geo-replication/syncdaemon/resource.py') diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py new file mode 100644 index 000000000..73102fbcb --- /dev/null +++ b/geo-replication/syncdaemon/resource.py @@ -0,0 +1,972 @@ +import re +import os +import sys +import stat +import time +import fcntl +import errno +import struct +import socket +import logging +import tempfile +import threading +import subprocess +from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR +from select import error as SelectError + +from gconf import gconf +import repce +from repce import RepceServer, RepceClient +from master import gmaster_builder +import syncdutils +from syncdutils import GsyncdError, select, privileged, boolify + +UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') +HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) +UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+") + +def sup(x, *a, **kw): + """a rubyesque "super" for python ;) + + invoke caller method in parent class with given args. + """ + return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw) + +def desugar(ustr): + """transform sugared url strings to standard :// form + + parsing logic enforces the constraint that sugared forms should contatin + a ':' or a '/', which ensures that sugared urls do not conflict with + gluster volume names. + """ + m = re.match('([^:]*):(.*)', ustr) + if m: + if not m.groups()[0]: + return "gluster://localhost" + ustr + elif '@' in m.groups()[0] or re.search('[:/]', m.groups()[1]): + return "ssh://" + ustr + else: + return "gluster://" + ustr + else: + if ustr[0] != '/': + raise GsyncdError("cannot resolve sugared url '%s'" % ustr) + ap = os.path.normpath(ustr) + if ap.startswith('//'): + ap = ap[1:] + return "file://" + ap + +def gethostbyname(hnam): + """gethostbyname wrapper""" + try: + return socket.gethostbyname(hnam) + except socket.gaierror: + ex = sys.exc_info()[1] + raise GsyncdError("failed to resolve %s: %s" % \ + (hnam, ex.strerror)) + +def parse_url(ustr): + """instantiate an url object by scheme-to-class dispatch + + The url classes taken into consideration are the ones in + this module whose names are full-caps. + """ + m = UrlRX.match(ustr) + if not m: + ustr = desugar(ustr) + m = UrlRX.match(ustr) + if not m: + raise GsyncdError("malformed url") + sch, path = m.groups() + this = sys.modules[__name__] + if not hasattr(this, sch.upper()): + raise GsyncdError("unknown url scheme " + sch) + return getattr(this, sch.upper())(path) + + +class _MetaXattr(object): + """singleton class, a lazy wrapper around the + libcxattr module + + libcxattr (a heavy import due to ctypes) is + loaded only when when the single + instance is tried to be used. + + This reduces runtime for those invocations + which do not need filesystem manipulation + (eg. for config, url parsing) + """ + + def __getattr__(self, meth): + from libcxattr import Xattr as LXattr + xmeth = [ m for m in dir(LXattr) if m[0] != '_' ] + if not meth in xmeth: + return + for m in xmeth: + setattr(self, m, getattr(LXattr, m)) + return getattr(self, meth) + +Xattr = _MetaXattr() + + +class Popen(subprocess.Popen): + """customized subclass of subprocess.Popen with a ring + buffer for children error output""" + + @classmethod + def init_errhandler(cls): + """start the thread which handles children's error output""" + cls.errstore = {} + def tailer(): + while True: + errstore = cls.errstore.copy() + try: + poe, _ ,_ = select([po.stderr for po in errstore], [], [], 1) + except (ValueError, SelectError): + continue + for po in errstore: + if po.stderr not in poe: + continue + po.lock.acquire() + try: + if po.on_death_row: + continue + la = errstore[po] + try: + fd = po.stderr.fileno() + except ValueError: # file is already closed + continue + l = os.read(fd, 1024) + if not l: + continue + tots = len(l) + for lx in la: + tots += len(lx) + while tots > 1<<20 and la: + tots -= len(la.pop(0)) + la.append(l) + finally: + po.lock.release() + t = syncdutils.Thread(target = tailer) + t.start() + cls.errhandler = t + + @classmethod + def fork(cls): + """fork wrapper that restarts errhandler thread in child""" + pid = os.fork() + if not pid: + cls.init_errhandler() + return pid + + def __init__(self, args, *a, **kw): + """customizations for subprocess.Popen instantiation + + - 'close_fds' is taken to be the default + - if child's stderr is chosen to be managed, + register it with the error handler thread + """ + self.args = args + if 'close_fds' not in kw: + kw['close_fds'] = True + self.lock = threading.Lock() + self.on_death_row = False + try: + sup(self, args, *a, **kw) + except: + ex = sys.exc_info()[1] + if not isinstance(ex, OSError): + raise + raise GsyncdError("""execution of "%s" failed with %s (%s)""" % \ + (args[0], errno.errorcode[ex.errno], os.strerror(ex.errno))) + if kw.get('stderr') == subprocess.PIPE: + assert(getattr(self, 'errhandler', None)) + self.errstore[self] = [] + + def errlog(self): + """make a log about child's failure event""" + filling = "" + if self.elines: + filling = ", saying:" + logging.error("""command "%s" returned with %s%s""" % \ + (" ".join(self.args), repr(self.returncode), filling)) + lp = '' + def logerr(l): + logging.error(self.args[0] + "> " + l) + for l in self.elines: + ls = l.split('\n') + ls[0] = lp + ls[0] + lp = ls.pop() + for ll in ls: + logerr(ll) + if lp: + logerr(lp) + + def errfail(self): + """fail nicely if child did not terminate with success""" + self.errlog() + syncdutils.finalize(exval = 1) + + def terminate_geterr(self, fail_on_err = True): + """kill child, finalize stderr harvesting (unregister + from errhandler, set up .elines), fail on error if + asked for + """ + self.lock.acquire() + try: + self.on_death_row = True + finally: + self.lock.release() + elines = self.errstore.pop(self) + if self.poll() == None: + self.terminate() + if self.poll() == None: + time.sleep(0.1) + self.kill() + self.wait() + while True: + if not select([self.stderr],[],[],0.1)[0]: + break + b = os.read(self.stderr.fileno(), 1024) + if b: + elines.append(b) + else: + break + self.stderr.close() + self.elines = elines + if fail_on_err and self.returncode != 0: + self.errfail() + + +class Server(object): + """singleton implemening those filesystem access primitives + which are needed for geo-replication functionality + + (Singleton in the sense it's a class which has only static + and classmethods and is used directly, without instantiation.) + """ + + GX_NSPACE = (privileged() and "trusted" or "system") + ".glusterfs" + NTV_FMTSTR = "!" + "B"*19 + "II" + FRGN_XTRA_FMT = "I" + FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT + + def _pathguard(f): + """decorator method that checks + the path argument of the decorated + functions to make sure it does not + point out of the managed tree + """ + + fc = getattr(f, 'func_code', None) + if not fc: + # python 3 + fc = f.__code__ + pi = list(fc.co_varnames).index('path') + def ff(*a): + path = a[pi] + ps = path.split('/') + if path[0] == '/' or '..' in ps: + raise ValueError('unsafe path') + return f(*a) + return ff + + @staticmethod + @_pathguard + def entries(path): + """directory entries in an array""" + # prevent symlinks being followed + if not stat.S_ISDIR(os.lstat(path).st_mode): + raise OSError(ENOTDIR, os.strerror(ENOTDIR)) + return os.listdir(path) + + @classmethod + @_pathguard + def purge(cls, path, entries=None): + """force-delete subtrees + + If @entries is not specified, delete + the whole subtree under @path (including + @path). + + Otherwise, @entries should be a + a sequence of children of @path, and + the effect is identical with a joint + @entries-less purge on them, ie. + + for e in entries: + cls.purge(os.path.join(path, e)) + """ + me_also = entries == None + if not entries: + try: + # if it's a symlink, prevent + # following it + try: + os.unlink(path) + return + except OSError: + ex = sys.exc_info()[1] + if ex.errno == EISDIR: + entries = os.listdir(path) + else: + raise + except OSError: + ex = sys.exc_info()[1] + if ex.errno in (ENOTDIR, ENOENT, ELOOP): + try: + os.unlink(path) + return + except OSError: + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + return + raise + else: + raise + for e in entries: + cls.purge(os.path.join(path, e)) + if me_also: + os.rmdir(path) + + @classmethod + @_pathguard + def _create(cls, path, ctor): + """path creation backend routine""" + try: + ctor(path) + except OSError: + ex = sys.exc_info()[1] + if ex.errno == EEXIST: + cls.purge(path) + return ctor(path) + raise + + @classmethod + @_pathguard + def mkdir(cls, path): + cls._create(path, os.mkdir) + + @classmethod + @_pathguard + def symlink(cls, lnk, path): + cls._create(path, lambda p: os.symlink(lnk, p)) + + @classmethod + @_pathguard + def xtime(cls, path, uuid): + """query xtime extended attribute + + Return xtime of @path for @uuid as a pair of integers. + "Normal" errors due to non-existent @path or extended attribute + are tolerated and errno is returned in such a case. + """ + + try: + return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8)) + except OSError: + ex = sys.exc_info()[1] + if ex.errno in (ENOENT, ENODATA, ENOTDIR): + return ex.errno + else: + raise + + @classmethod + def xtime_vec(cls, path, *uuids): + """vectored version of @xtime + + accepts a list of uuids and returns a dictionary + with uuid as key(s) and xtime as value(s) + """ + xt = {} + for uuid in uuids: + xtu = cls.xtime(path, uuid) + if xtu == ENODATA: + xtu = None + if isinstance(xtu, int): + return xtu + xt[uuid] = xtu + return xt + + @classmethod + @_pathguard + def set_xtime(cls, path, uuid, mark): + """set @mark as xtime for @uuid on @path""" + Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark)) + + @classmethod + def set_xtime_vec(cls, path, mark_dct): + """vectored (or dictered) version of set_xtime + + ignore values that match @ignore + """ + for u,t in mark_dct.items(): + cls.set_xtime(path, u, t) + + @staticmethod + @_pathguard + def setattr(path, adct): + """set file attributes + + @adct is a dict, where 'own', 'mode' and 'times' + keys are looked for and values used to perform + chown, chmod or utimes on @path. + """ + own = adct.get('own') + if own: + os.lchown(path, *own) + mode = adct.get('mode') + if mode: + os.chmod(path, stat.S_IMODE(mode)) + times = adct.get('times') + if times: + os.utime(path, times) + + @staticmethod + def pid(): + return os.getpid() + + last_keep_alive = 0 + @classmethod + def keep_alive(cls, dct): + """process keepalive messages. + + Return keep-alive counter (number of received keep-alive + messages). + + Now the "keep-alive" message can also have a payload which is + used to set a foreign volume-mark on the underlying file system. + """ + if dct: + key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']]) + val = struct.pack(cls.FRGN_FMTSTR, + *(dct['version'] + + tuple(int(x,16) for x in re.findall('(?:[\da-f]){2}', dct['uuid'])) + + (dct['retval'],) + dct['volume_mark'][0:2] + (dct['timeout'],))) + Xattr.lsetxattr('.', key, val) + cls.last_keep_alive += 1 + return cls.last_keep_alive + + @staticmethod + def version(): + """version used in handshake""" + return 1.0 + + +class SlaveLocal(object): + """mix-in class to implement some factes of a slave server + + ("mix-in" is sort of like "abstract class", ie. it's not + instantiated just included in the ancesty DAG. I use "mix-in" + to indicate that it's not used as an abstract base class, + rather just taken in to implement additional functionality + on the basis of the assumed availability of certain interfaces.) + """ + + def can_connect_to(self, remote): + """determine our position in the connectibility matrix""" + return not remote + + def service_loop(self): + """start a RePCe server serving self's server + + stop servicing if a timeout is configured and got no + keep-alime in that inteval + """ + + if boolify(gconf.use_rsync_xattrs) and not privileged(): + raise GsyncdError("using rsync for extended attributes is not supported") + + repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs)) + t = syncdutils.Thread(target=lambda: (repce.service_loop(), + syncdutils.finalize())) + t.start() + logging.info("slave listening") + if gconf.timeout and int(gconf.timeout) > 0: + while True: + lp = self.server.last_keep_alive + time.sleep(int(gconf.timeout)) + if lp == self.server.last_keep_alive: + logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout)) + break + else: + select((), (), ()) + +class SlaveRemote(object): + """mix-in class to implement an interface to a remote slave""" + + def connect_remote(self, rargs=[], **opts): + """connects to a remote slave + + Invoke an auxiliary utility (slave gsyncd, possibly wrapped) + which sets up the connection and set up a RePCe client to + communicate throuh its stdio. + """ + slave = opts.get('slave', self.url) + extra_opts = [] + so = getattr(gconf, 'session_owner', None) + if so: + extra_opts += ['--session-owner', so] + if boolify(gconf.use_rsync_xattrs): + extra_opts.append('--use-rsync-xattrs') + po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts + \ + ['-N', '--listen', '--timeout', str(gconf.timeout), slave], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + gconf.transport = po + return self.start_fd_client(po.stdout, po.stdin, **opts) + + def start_fd_client(self, i, o, **opts): + """set up RePCe client, handshake with server + + It's cut out as a separate method to let + subclasses hook into client startup + """ + self.server = RepceClient(i, o) + rv = self.server.__version__() + exrv = {'proto': repce.repce_version, 'object': Server.version()} + da0 = (rv, exrv) + da1 = ({}, {}) + for i in range(2): + for k, v in da0[i].iteritems(): + da1[i][k] = int(v) + if da1[0] != da1[1]: + raise GsyncdError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv)) + + def rsync(self, files, *args): + """invoke rsync""" + if not files: + raise GsyncdError("no files to sync") + logging.debug("files: " + ", ".join(files)) + argv = gconf.rsync_command.split() + \ + ['-aR0', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \ + gconf.rsync_options.split() + (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \ + ['.'] + list(args) + po = Popen(argv, stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE) + for f in files: + po.stdin.write(f) + po.stdin.write('\0') + + po.stdin.close() + po.wait() + po.terminate_geterr(fail_on_err = False) + + return po + + +class AbstractUrl(object): + """abstract base class for url scheme classes""" + + def __init__(self, path, pattern): + m = re.search(pattern, path) + if not m: + raise GsyncdError("malformed path") + self.path = path + return m.groups() + + @property + def scheme(self): + return type(self).__name__.lower() + + def canonical_path(self): + return self.path + + def get_url(self, canonical=False, escaped=False): + """format self's url in various styles""" + if canonical: + pa = self.canonical_path() + else: + pa = self.path + u = "://".join((self.scheme, pa)) + if escaped: + u = syncdutils.escape(u) + return u + + @property + def url(self): + return self.get_url() + + + ### Concrete resource classes ### + + +class FILE(AbstractUrl, SlaveLocal, SlaveRemote): + """scheme class for file:// urls + + can be used to represent a file slave server + on slave side, or interface to a remote file + file server on master side + """ + + class FILEServer(Server): + """included server flavor""" + pass + + server = FILEServer + + def __init__(self, path): + sup(self, path, '^/') + + def connect(self): + """inhibit the resource beyond""" + os.chdir(self.path) + + def rsync(self, files): + return sup(self, files, self.path) + + +class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): + """scheme class for gluster:// urls + + can be used to represent a gluster slave server + on slave side, or interface to a remote gluster + slave on master side, or to represent master + (slave-ish features come from the mixins, master + functionality is outsourced to GMaster from master) + """ + + class GLUSTERServer(Server): + "server enhancements for a glusterfs backend""" + + @classmethod + def _attr_unpack_dict(cls, xattr, extra_fields = ''): + """generic volume mark fetching/parsing backed""" + fmt_string = cls.NTV_FMTSTR + extra_fields + buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) + vm = struct.unpack(fmt_string, buf) + m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]])) + uuid = '-'.join(m.groups()) + volinfo = { 'version': vm[0:2], + 'uuid' : uuid, + 'retval' : vm[18], + 'volume_mark': vm[19:21], + } + if extra_fields: + return volinfo, vm[-len(extra_fields):] + else: + return volinfo + + @classmethod + def foreign_volume_infos(cls): + """return list of valid (not expired) foreign volume marks""" + dict_list = [] + xattr_list = Xattr.llistxattr_buf('.') + for ele in xattr_list: + if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0: + d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT) + now = int(time.time()) + if x[0] > now: + logging.debug("volinfo[%s] expires: %d (%d sec later)" % \ + (d['uuid'], x[0], x[0] - now)) + d['timeout'] = x[0] + dict_list.append(d) + else: + try: + Xattr.lremovexattr('.', ele) + except OSError: + pass + return dict_list + + @classmethod + def native_volume_info(cls): + """get the native volume mark of the underlying gluster volume""" + try: + return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark'])) + except OSError: + ex = sys.exc_info()[1] + if ex.errno != ENODATA: + raise + + server = GLUSTERServer + + def __init__(self, path): + self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern) + + def canonical_path(self): + return ':'.join([gethostbyname(self.host), self.volume]) + + def can_connect_to(self, remote): + """determine our position in the connectibility matrix""" + return True + + class Mounter(object): + """Abstract base class for mounter backends""" + + def __init__(self, params): + self.params = params + self.mntpt = None + + @classmethod + def get_glusterprog(cls): + return os.path.join(gconf.gluster_command_dir, cls.glusterprog) + + def umount_l(self, d): + """perform lazy umount""" + po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE) + po.wait() + return po + + @classmethod + def make_umount_argv(cls, d): + raise NotImplementedError + + def make_mount_argv(self, *a): + raise NotImplementedError + + def cleanup_mntpt(self, *a): + pass + + def handle_mounter(self, po): + po.wait() + + def inhibit(self, *a): + """inhibit a gluster filesystem + + Mount glusterfs over a temporary mountpoint, + change into the mount, and lazy unmount the + filesystem. + """ + + mpi, mpo = os.pipe() + mh = Popen.fork() + if mh: + os.close(mpi) + fcntl.fcntl(mpo, fcntl.F_SETFD, fcntl.FD_CLOEXEC) + d = None + margv = self.make_mount_argv(*a) + if self.mntpt: + # mntpt is determined pre-mount + d = self.mntpt + os.write(mpo, d + '\0') + po = Popen(margv, **self.mountkw) + self.handle_mounter(po) + po.terminate_geterr() + logging.debug('auxiliary glusterfs mount in place') + if not d: + # mntpt is determined during mount + d = self.mntpt + os.write(mpo, d + '\0') + os.write(mpo, 'M') + t = syncdutils.Thread(target=lambda: os.chdir(d)) + t.start() + tlim = gconf.starttime + int(gconf.connection_timeout) + while True: + if not t.isAlive(): + break + if time.time() >= tlim: + syncdutils.finalize(exval = 1) + time.sleep(1) + os.close(mpo) + _, rv = syncdutils.waitpid(mh, 0) + if rv: + rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \ + (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0) + logging.warn('stale mount possibly left behind on ' + d) + raise GsyncdError("cleaning up temp mountpoint %s failed with status %d" % \ + (d, rv)) + else: + rv = 0 + try: + os.setsid() + os.close(mpo) + mntdata = '' + while True: + c = os.read(mpi, 1) + if not c: + break + mntdata += c + if mntdata: + mounted = False + if mntdata[-1] == 'M': + mntdata = mntdata[:-1] + assert(mntdata) + mounted = True + assert(mntdata[-1] == '\0') + mntpt = mntdata[:-1] + assert(mntpt) + if mounted: + po = self.umount_l(mntpt) + po.terminate_geterr(fail_on_err = False) + if po.returncode != 0: + po.errlog() + rv = po.returncode + self.cleanup_mntpt(mntpt) + except: + logging.exception('mount cleanup failure:') + rv = 200 + os._exit(rv) + logging.debug('auxiliary glusterfs mount prepared') + + class DirectMounter(Mounter): + """mounter backend which calls mount(8), umount(8) directly""" + + mountkw = {'stderr': subprocess.PIPE} + glusterprog = 'glusterfs' + + @staticmethod + def make_umount_argv(d): + return ['umount', '-l', d] + + def make_mount_argv(self): + self.mntpt = tempfile.mkdtemp(prefix = 'gsyncd-aux-mount-') + return [self.get_glusterprog()] + ['--' + p for p in self.params] + [self.mntpt] + + def cleanup_mntpt(self, mntpt = None): + if not mntpt: + mntpt = self.mntpt + os.rmdir(mntpt) + + class MountbrokerMounter(Mounter): + """mounter backend using the mountbroker gluster service""" + + mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE} + glusterprog = 'gluster' + + @classmethod + def make_cli_argv(cls): + return [cls.get_glusterprog()] + gconf.gluster_cli_options.split() + ['system::'] + + @classmethod + def make_umount_argv(cls, d): + return cls.make_cli_argv() + ['umount', d, 'lazy'] + + def make_mount_argv(self, label): + return self.make_cli_argv() + \ + ['mount', label, 'user-map-root=' + syncdutils.getusername()] + self.params + + def handle_mounter(self, po): + self.mntpt = po.stdout.readline()[:-1] + po.stdout.close() + sup(self, po) + if po.returncode != 0: + # if cli terminated with error due to being + # refused by glusterd, what it put + # out on stdout is a diagnostic message + logging.error('glusterd answered: %s' % self.mntpt) + + def connect(self): + """inhibit the resource beyond + + Choose mounting backend (direct or mountbroker), + set up glusterfs parameters and perform the mount + with given backend + """ + + label = getattr(gconf, 'mountbroker', None) + if not label and not privileged(): + label = syncdutils.getusername() + mounter = label and self.MountbrokerMounter or self.DirectMounter + params = gconf.gluster_params.split() + \ + (gconf.gluster_log_level and ['log-level=' + gconf.gluster_log_level] or []) + \ + ['log-file=' + gconf.gluster_log_file, 'volfile-server=' + self.host, + 'volfile-id=' + self.volume, 'client-pid=-1'] + mounter(params).inhibit(*[l for l in [label] if l]) + + def connect_remote(self, *a, **kw): + sup(self, *a, **kw) + self.slavedir = "/proc/%d/cwd" % self.server.pid() + + def service_loop(self, *args): + """enter service loop + + - if slave given, instantiate GMaster and + pass control to that instance, which implements + master behavior + - else do that's what's inherited + """ + if args: + gmaster_builder()(self, args[0]).crawl_loop() + else: + sup(self, *args) + + def rsync(self, files): + return sup(self, files, self.slavedir) + + +class SSH(AbstractUrl, SlaveRemote): + """scheme class for ssh:// urls + + interface to remote slave on master side + implementing an ssh based proxy + """ + + def __init__(self, path): + self.remote_addr, inner_url = sup(self, path, + '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ])) + self.inner_rsc = parse_url(inner_url) + + def canonical_path(self): + m = re.match('([^@]+)@(.+)', self.remote_addr) + if m: + u, h = m.groups() + else: + u, h = syncdutils.getusername(), self.remote_addr + remote_addr = '@'.join([u, gethostbyname(h)]) + return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)]) + + def can_connect_to(self, remote): + """determine our position in the connectibility matrix""" + return False + + def start_fd_client(self, *a, **opts): + """customizations for client startup + + - be a no-op if we are to daemonize (client startup is deferred + to post-daemon stage) + - determine target url for rsync after consulting server + """ + if opts.get('deferred'): + return a + sup(self, *a) + ityp = type(self.inner_rsc) + if ityp == FILE: + slavepath = self.inner_rsc.path + elif ityp == GLUSTER: + slavepath = "/proc/%d/cwd" % self.server.pid() + else: + raise NotImplementedError + self.slaveurl = ':'.join([self.remote_addr, slavepath]) + + def connect_remote(self, go_daemon=None): + """connect to inner slave url through outer ssh url + + Wrap the connecting utility in ssh. + + Much care is put into daemonizing: in that case + ssh is started before daemonization, but + RePCe client is to be created after that (as ssh + interactive password auth would be defeated by + a daemonized ssh, while client should be present + only in the final process). In that case the action + is taken apart to two parts, this method is ivoked + once pre-daemon, once post-daemon. Use @go_daemon + to deiced what part to perform. + + [NB. ATM gluster product does not makes use of interactive + authentication.] + """ + if go_daemon == 'done': + return self.start_fd_client(*self.fd_pair) + gconf.setup_ssh_ctl(tempfile.mkdtemp(prefix='gsyncd-aux-ssh-')) + deferred = go_daemon == 'postconn' + ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + [self.remote_addr], slave=self.inner_rsc.url, deferred=deferred) + if deferred: + # send a message to peer so that we can wait for + # the answer from which we know connection is + # established and we can proceed with daemonization + # (doing that too early robs the ssh passwd prompt...) + # However, we'd better not start the RepceClient + # before daemonization (that's not preserved properly + # in daemon), we just do a an ad-hoc linear put/get. + i, o = ret + inf = os.fdopen(i) + repce.send(o, None, '__repce_version__') + select((inf,), (), ()) + repce.recv(inf) + # hack hack hack: store a global reference to the file + # to save it from getting GC'd which implies closing it + gconf.permanent_handles.append(inf) + self.fd_pair = (i, o) + return 'should' + + def rsync(self, files): + return sup(self, files, '-e', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args), + *(gconf.rsync_ssh_options.split() + [self.slaveurl])) -- cgit