From 85300e25f2d47e33b169d14fa9eb0b7cfe39011b Mon Sep 17 00:00:00 2001 From: Csaba Henk Date: Thu, 27 Jan 2011 05:23:35 +0000 Subject: adding syncdaemon Signed-off-by: Csaba Henk Signed-off-by: Anand V. Avati BUG: 2310 (georeplication) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=2310 --- xlators/features/marker/Makefile.am | 2 +- xlators/features/marker/utils/Makefile.am | 7 + xlators/features/marker/utils/gsyncd.in | 7 + .../features/marker/utils/syncdaemon/Makefile.am | 5 + xlators/features/marker/utils/syncdaemon/README.md | 81 +++++ .../features/marker/utils/syncdaemon/__init__.py | 0 xlators/features/marker/utils/syncdaemon/gconf.py | 13 + xlators/features/marker/utils/syncdaemon/gsyncd.py | 230 +++++++++++++ xlators/features/marker/utils/syncdaemon/master.py | 240 +++++++++++++ xlators/features/marker/utils/syncdaemon/repce.py | 150 +++++++++ .../features/marker/utils/syncdaemon/resource.py | 372 +++++++++++++++++++++ .../features/marker/utils/syncdaemon/simplecfg.py | 77 +++++ 12 files changed, 1183 insertions(+), 1 deletion(-) create mode 100644 xlators/features/marker/utils/Makefile.am create mode 100755 xlators/features/marker/utils/gsyncd.in create mode 100644 xlators/features/marker/utils/syncdaemon/Makefile.am create mode 100644 xlators/features/marker/utils/syncdaemon/README.md create mode 100644 xlators/features/marker/utils/syncdaemon/__init__.py create mode 100644 xlators/features/marker/utils/syncdaemon/gconf.py create mode 100644 xlators/features/marker/utils/syncdaemon/gsyncd.py create mode 100644 xlators/features/marker/utils/syncdaemon/master.py create mode 100644 xlators/features/marker/utils/syncdaemon/repce.py create mode 100644 xlators/features/marker/utils/syncdaemon/resource.py create mode 100644 xlators/features/marker/utils/syncdaemon/simplecfg.py (limited to 'xlators') diff --git a/xlators/features/marker/Makefile.am b/xlators/features/marker/Makefile.am index a985f42a877..a6ba2de16ae 100644 --- 
a/xlators/features/marker/Makefile.am +++ b/xlators/features/marker/Makefile.am @@ -1,3 +1,3 @@ -SUBDIRS = src +SUBDIRS = src @SYNCDAEMON_SUBDIR@ CLEANFILES = diff --git a/xlators/features/marker/utils/Makefile.am b/xlators/features/marker/utils/Makefile.am new file mode 100644 index 00000000000..8aefea4011b --- /dev/null +++ b/xlators/features/marker/utils/Makefile.am @@ -0,0 +1,7 @@ +SUBDIRS = syncdaemon + +gsyncddir = $(libexecdir) + +gsyncd_SCRIPTS = gsyncd + +CLEANFILES = diff --git a/xlators/features/marker/utils/gsyncd.in b/xlators/features/marker/utils/gsyncd.in new file mode 100755 index 00000000000..9bbf8041f36 --- /dev/null +++ b/xlators/features/marker/utils/gsyncd.in @@ -0,0 +1,7 @@ +#!/bin/sh + +prefix="@prefix@" +exec_prefix="@exec_prefix@" +libexecdir=`eval echo "@libexecdir@"` + +PYTHONPATH="$libexecdir"/python exec @PYTHON@ -c "from syncdaemon import gsyncd; gsyncd.main()" -c @sysconfdir@/glusterfs/gsyncd.conf "$@" diff --git a/xlators/features/marker/utils/syncdaemon/Makefile.am b/xlators/features/marker/utils/syncdaemon/Makefile.am new file mode 100644 index 00000000000..1d5014a9eb3 --- /dev/null +++ b/xlators/features/marker/utils/syncdaemon/Makefile.am @@ -0,0 +1,5 @@ +syncdaemondir = $(libexecdir)/python/syncdaemon + +syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py resource.py simplecfg.py + +CLEANFILES = diff --git a/xlators/features/marker/utils/syncdaemon/README.md b/xlators/features/marker/utils/syncdaemon/README.md new file mode 100644 index 00000000000..d45006932d1 --- /dev/null +++ b/xlators/features/marker/utils/syncdaemon/README.md @@ -0,0 +1,81 @@ +gsycnd, the Gluster Syncdaemon +============================== + +REQUIREMENTS +------------ + +_gsyncd_ is a program which can operate either in _master_ or in _slave_ mode. +Requirements are categorized according to this. 
+ +* supported OS is GNU/Linux +* Python >= 2.5, or 2.4 with Ctypes (see below) (both) +* OpenSSH >= 4.0 (master) / SSH2 compliant sshd (eg. openssh) (slave) +* rsync (both) +* glusterfs with marker support (master); glusterfs (optional on slave) +* FUSE; for supported versions consult glusterfs + +INSTALLATION +------------ + +As of now, the supported way of operation is running from the source directory. + +If you use Python 2.4.x, you need to install the [Ctypes module](http://python.net/crew/theller/ctypes/). + +CONFIGURATION +------------- + +gsyncd tunables are a subset of the long command-line options; for listing them, +type + + gsyncd.py --help + +and see the long options up to "--config-file". (The leading double dash should be omitted; +interim underscores and dashes are interchangeable.) The set of options bear some resemblance +to those of glusterfs and rsync. + +The config file format matches the following syntax: + + : + : + # comment + +By default (unless specified by the option `-c`), gsyncd looks for config file at _conf/gsyncd.conf_ +in the source tree. + +USAGE +----- + +gsyncd is a utilitly for continous mirroring, ie. it mirrors master to slave incrementally. +Assume we have a gluster volume _pop_ at localhost. We try to set up the following mirrors +for it with gysncd: + +1. _/data/mirror_ +2. local gluster volume _yow_ +3. _/data/far_mirror_ at example.com +4. gluster volume _moz_ at example.com + +The respective gsyncd invocations are (demoing some syntax sugaring): + +1. + + gsyncd.py gluster://localhost:pop file:///data/mirror + + or short form + + gsyncd.py :pop /data/mirror + +2. `gsyncd :pop :yow` +3. + + gsyncd.py :pop ssh://example.com:/data/far_mirror + + or short form + + gsyncd.py :pop example.com:/data/far_mirror + +4. `gsyncd.py :pop example.com::moz` + +gsyncd has to be available on both sides; it's location on the remote side has to be specified +via the "--remote-gsyncd" option (or "remote-gsyncd" config file parameter). 
import os

class GConf(object):
    """Process-wide configuration holder for the sync daemon.

    Option values parsed by gsyncd are injected into this class's
    namespace at runtime; the few attributes below are the ones the
    code manages explicitly.
    """

    # temp directory holding the ssh control socket (None until set up)
    ssh_ctl_dir = None
    # extra ssh arguments enabling connection multiplexing (None until set up)
    ssh_ctl_args = None
    # pid of the daemonized child process, if we forked one
    cpid = None

    @classmethod
    def setup_ssh_ctl(cls, ctld):
        """Record *ctld* as the ssh control dir and derive the ssh arguments."""
        cls.ssh_ctl_dir = ctld
        # NOTE(review): "gsycnd" is a historical misspelling, kept verbatim
        # since the name only has to be self-consistent within one run
        sockpath = os.path.join(ctld, "gsycnd-ssh-%r@%h:%p")
        cls.ssh_ctl_args = ["-oControlMaster=auto", "-S", sockpath]

gconf = GConf()
def startup(**kw):
    """Daemonize (if requested), write the pid file and set up logging.

    kw['go_daemon'] == 'should' triggers daemonization: fork, let the
    parent exit, then detach via setsid and redirect std streams to
    /dev/null.  Without daemonization, an exclusive pid-file write
    doubles as a single-instance lock.
    """
    def write_pid(fn):
        # O_EXCL makes this both "record pid" and "lock": a second
        # instance fails with EEXIST
        fd = None
        try:
            fd = os.open(fn, os.O_CREAT|os.O_TRUNC|os.O_WRONLY|os.O_EXCL)
            os.write(fd, str(os.getpid()) + '\n')
        finally:
            if fd:
                os.close(fd)

    if kw.get('go_daemon') == 'should':
        # pipe lets the child detect the parent's exit: select on the
        # read end returns once the parent's copy of the write end is
        # closed at process exit
        x, y = os.pipe()
        gconf.cpid = os.fork()
        if gconf.cpid:
            os.close(x)
            sys.exit()
        os.close(y)
        # wait for parent to terminate
        # so we can start up with
        # no messing from the dirty
        # ol' bustard
        select.select((x,), (), ())
        os.close(x)
        # write the pid via a temp file + rename so readers never see
        # a partially written pid file
        if getattr(gconf, 'pid_file', None):
            write_pid(gconf.pid_file + '.tmp')
            os.rename(gconf.pid_file + '.tmp', gconf.pid_file)
        os.setsid()
        dn = os.open(os.devnull, os.O_RDWR)
        for f in (sys.stdin, sys.stdout, sys.stderr):
            os.dup2(dn, f.fileno())
    elif getattr(gconf, 'pid_file', None):
        try:
            write_pid(gconf.pid_file)
        except OSError:
            # pid file already exists: another instance is running
            gconf.pid_file = None
            ex = sys.exc_info()[1]
            if ex.errno == EEXIST:
                sys.stderr.write("pidfile is taken, exiting.\n")
                exit(2)
            raise

    lkw = {'level': gconf.log_level}
    if kw.get('log_file'):
        lkw['filename'] = kw['log_file']
    GLogger.setup(**lkw)
def main():
    """Top-level entry point: set up logging, run main_i(), report
    failures, and always run cleanup via finalize()."""
    # ??? "finally" clause does not take effect with SIGTERM...
    # but this handler neither does
    # signal.signal(signal.SIGTERM, finalize)
    GLogger.setup()
    try:
        try:
            main_i()
        except:
            exc_type = sys.exc_info()[0]
            if exc_type == SystemExit:
                # normal termination request: let it pass silently
                pass
            else:
                logging.exception("FAIL: ")
                sys.stderr.write("failed with %s.\n" % exc_type.__name__)
                exit(1)
    finally:
        finalize()
need to specify dest or value will be mapped to None :S + op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True)) + op.add_option('-N', '--no-daemon', dest="go_daemon", action='callback', callback=store_local_curry('dont')) + op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a), + a[-1].values.__dict__.get('log_level') or \ + a[-1].values.__dict__.update(log_level='DEBUG'))) + # precedence for sources of values: 1) commandline, 2) cfg file, 3) defaults + # -- for this to work out we need to tell apart defaults from explicitly set + # options... so churn out the defaults here and call the parser with virgin + # values container. + defaults = op.get_default_values() + opts, args = op.parse_args(values=optparse.Values()) + if not (len(args) == 2 or (len(args) == 1 and rconf.get('listen'))): + sys.stderr.write("error: incorrect number of arguments\n\n") + sys.stderr.write(op.get_usage() + "\n") + sys.exit(1) + + gconf.__dict__.update(defaults.__dict__) + # XXX add global config support + if not 'config_file' in rconf: + rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd.conf") + try: + cfg = SimpleCfg() + cfg.read(rconf['config_file']) + gconf.__dict__.update(cfg) + except IOError: + ex = sys.exc_info()[1] + if ex.errno != ENOENT: + raise + gconf.__dict__.update(opts.__dict__) + + local = resource.parse_url(args[0]) + remote = None + if len(args) > 1: + remote = resource.parse_url(args[1]) + + if not local.can_connect_to(remote): + raise RuntimeError("%s cannot work with %s" % (local.path, remote and remote.path)) + + go_daemon = rconf['go_daemon'] + + if isinstance(remote, resource.SSH) and go_daemon == 'should': + go_daemon = 'postconn' + log_file = None + else: + log_file = gconf.log_file + startup(go_daemon=go_daemon, log_file=log_file) + + logging.info("syncing: %s" % " -> ".join([x.url for x in [local, remote] if x])) + 
    def xtime(self, path, *a, **opts):
        """Return the xtime mark (sec, usec) of *path* on a resource.

        The optional positional argument selects the resource to query;
        it defaults to the master.  The 'create' keyword controls whether
        a missing or stale mark gets initialized from the current wall
        clock; by default creation happens only when querying the master.
        """
        if a:
            rsc = a[0]
        else:
            rsc = self.master
        if not 'create' in opts:
            # auto-create marks only on the master by default
            opts['create'] = rsc == self.master
        xt = rsc.server.xtime(path, self.uuid)
        # an int return is an errno (the server maps ENOENT/ENODATA to
        # their errno values); a tuple older than the volume mark is
        # considered stale -- either way, stamp a fresh mark if allowed
        if (isinstance(xt, int) or xt < self.volmark) and opts['create']:
            t = time.time()
            sec = int(t)
            nsec = int((t - sec) * 1000000)
            xt = (sec, nsec)
            rsc.server.set_xtime(path, self.uuid, xt)
        if xt == ENODATA:
            # no mark and no creation requested: treat as "never synced"
            xt = URXTIME
        return xt
' + self.uuid) + while True: + self.crawl() + + def add_job(self, path, label, job, *a, **kw): + if self.jobtab.get(path) == None: + self.jobtab[path] = [] + self.jobtab[path].append((label, a, lambda : job(*a, **kw))) + + def wait(self, path, mark): + jobs = self.jobtab.pop(path, []) + succeed = True + for j in jobs: + ret = j[-1]() + if not ret: + succeed = False + if succeed: + self.sendmark(path, mark) + return succeed + + def sendmark(self, path, mark): + self.slave.server.set_xtime(path, self.uuid, mark) + + def crawl(self, path='.', xtl=None): + if path == '.': + if self.start: + logging.info("crawl took %.6f" % (time.time() - self.start)) + time.sleep(1) + self.start = time.time() + logging.info("crawling...") + self.get_volinfo() + if self.volume_info['uuid'] != self.uuid: + raise RuntimeError("master uuid mismatch") + logging.debug("entering " + path) + if not xtl: + xtl = self.xtime(path) + xtr0 = self.xtime(path, self.slave) + if isinstance(xtr0, int): + xtr = URXTIME + else: + xtr = xtr0 + if xtr0 == ENOENT: + self.slave.server.mkdir(path) + else: + if xtr > xtl: + raise RuntimeError("timestamp corruption for " + path) + if xtl == xtr: + if path == '.' 
and self.total_turns and self.change_seen: + self.turns += 1 + self.change_seen = False + logging.info("finished turn #%s/%s" % (self.turns, self.total_turns)) + if self.turns == self.total_turns: + logging.info("reached turn limit, terminating.") + os.kill(os.getpid(), signal.SIGTERM) + return + if path == '.': + self.change_seen = True + dem, des = ( x.server.entries(path) for x in (self.master, self.slave) ) + dd = set(des) - set(dem) + if dd: + self.slave.server.purge(path, dd) + chld = [] + for e in dem: + e = os.path.join(path, e) + xte = self.xtime(e) + if isinstance(xte, int): + logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte])) + elif xte > xtr: + chld.append((e, xte)) + def indulgently(e, fnc, blame=None): + if not blame: + blame = path + try: + return fnc(e) + except (IOError, OSError): + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + logging.warn("salvaged ENOENT for" + e) + self.add_job(blame, 'salvage', lambda: False) + return False + else: + raise + for e, xte in chld: + mo = indulgently(e, lambda e: os.lstat(e).st_mode) + if mo == False: + continue + if stat.S_ISLNK(mo): + self.slave.server.symlink(os.readlink(e), e) + self.sendmark(e, xte) + elif stat.S_ISREG(mo): + logging.debug("syncing %s ..." 
class Syncer(object):
    """Background pool that rsyncs batches of files to the slave.

    Worker threads repeatedly grab the current PostBox of queued paths,
    close it against further additions, run slave.rsync on the batch and
    wake up everyone waiting on that box with the result.
    """

    def __init__(self, slave):
        self.slave = slave
        self.lock = Lock()
        self.pb = PostBox()
        for i in range(int(gconf.sync_jobs)):
            t = Thread(target=self.syncjob)
            # BUG FIX: the original did `t.setDaemon = True`, which merely
            # overwrites the method with an attribute instead of calling it,
            # so the workers were never daemonized and would block
            # interpreter exit.
            t.setDaemon(True)
            t.start()

    def syncjob(self):
        """Worker loop: swap out the filled PostBox, sync it, wake waiters."""
        while True:
            pb = None
            while True:
                self.lock.acquire()
                if self.pb:
                    # swap in a fresh box so producers can keep queueing
                    # while this batch is being synced
                    pb, self.pb = self.pb, PostBox()
                self.lock.release()
                if pb:
                    break
                time.sleep(0.5)
            pb.close()
            pb.wakeup(self.slave.rsync(pb))

    def add(self, e):
        """Queue file *e* for syncing; returns the PostBox to wait() on.

        Retries if the current box got closed by a worker between the
        lookup and the append.
        """
        while True:
            try:
                self.pb.append(e)
                return self.pb
            except BoxClosedErr:
                pass
class RepceJob(object):
    """Handle for one outstanding remote procedure call.

    A job is identified by a (pid, thread id, timestamp) triple; a
    Condition lets the issuing thread block in wait() until the listener
    delivers the response through wakeup().
    """

    def __init__(self, cbk):
        # unique-enough request id for matching responses to calls
        self.rid = (os.getpid(), thread.get_ident(), time.time())
        self.cbk = cbk
        self.done = False
        self.lever = Condition()

    def __repr__(self):
        return ':'.join(str(part) for part in self.rid)

    def wait(self):
        """Block until wakeup() has run, then return the delivered result."""
        self.lever.acquire()
        if not self.done:
            self.lever.wait()
        self.lever.release()
        return self.result

    def wakeup(self, data):
        """Deliver *data* as the call result and release the waiter."""
        self.result = data
        self.lever.acquire()
        self.done = True
        self.lever.notify()
        self.lever.release()
    def push(self, meth, *args, **kw):
        """Issue remote call *meth*(*args) asynchronously.

        Returns the RepceJob representing the pending call.  The 'cbk'
        keyword may supply a callback, invoked by the listener thread as
        cbk(job, [exc_flag, result]) when the answer arrives; the default
        callback just re-raises remote exceptions in the listener.
        """
        cbk = kw.get('cbk')
        if not cbk:
            # default: surface remote failures by raising them
            def cbk(rj, res):
                if res[0]:
                    raise res[1]
        rjob = RepceJob(cbk)
        # register before sending so the listener can always find the job
        self.jtab[rjob.rid] = rjob
        logging.debug("call %s %s%s ..." % (repr(rjob), meth, repr(args)))
        send(self.out, rjob.rid, meth, *args)
        return rjob
def desugar(ustr):
    """Expand shorthand resource notation into a canonical URL.

    Rules:
      ":vol"          -> "gluster://localhost:vol"  (local gluster volume)
      "host:vol"      -> "gluster://host:vol"       (remote gluster volume)
      "user@host:..." or any spec whose remainder contains ':' or '/'
                      -> "ssh://..."                (reached over ssh)
      anything else   -> "file://<absolute path>"
    """
    m = re.match('([^:]*):(.*)', ustr)
    if m:
        if not m.groups()[0]:
            return "gluster://localhost" + ustr
        elif '@' in m.groups()[0] or re.search('[:/]', m.groups()[1]):
            return "ssh://" + ustr
        else:
            # BUG FIX: the original returned the literal string
            # "gluster://#{str}" -- a Ruby-style interpolation left over
            # from a port -- instead of building the actual gluster URL.
            return "gluster://" + ustr
    else:
        return "file://" + os.path.abspath(ustr)
class SlaveLocal(object):
    """Mixin for resources that can serve as the local end of a slave.

    Provides the slave-side service loop: a RepceServer fed from stdin
    and answering on stdout, plus an inactivity watchdog driven by the
    master's periodic pings.
    """

    def can_connect_to(self, remote):
        # a local slave only works standalone, without a remote peer
        return not remote

    def service_loop(self):
        """Serve RPC requests over stdin/stdout until the master goes quiet."""
        rserver = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs))
        worker = threading.Thread(target=rserver.service_loop)
        worker.setDaemon(True)
        worker.start()
        logging.info("slave listening")
        timo = int(gconf.timeout or 0)
        if timo > 0:
            # watchdog: if no ping bumped the counter over a whole
            # timeout interval, consider the master gone and stop
            while True:
                lp = self.server.lastping
                time.sleep(timo)
                if lp == self.server.lastping:
                    logging.info("connection inactive for %d seconds, stopping" % timo)
                    break
        else:
            # no timeout configured: block forever
            select.select((), (), ())
class AbstractUrl(object):
    """Base class for url-addressed resources.

    Note: __init__ returns the regex groups -- subclasses invoke it
    directly (via sup) and unpack the components -- so this class is not
    meant to be instantiated through the plain ClassName(...) protocol.
    """

    def __init__(self, path, pattern):
        match = re.search(pattern, path)
        if match is None:
            raise RuntimeError("malformed path")
        self.path = path
        return match.groups()

    def scheme(self):
        """Scheme name, derived from the concrete class's name."""
        return self.__class__.__name__.lower()

    @property
    def url(self):
        return self.scheme() + "://" + self.path
+ 'B'*19 + 'II', + Xattr.lgetxattr('.', '.'.join([cls.GX_NSPACE, 'volume-mark']), 27)) + m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]])) + uuid = '-'.join(m.groups()) + return { 'version': vm[0:2], + 'uuid' : uuid, + 'retval' : vm[18], + 'volume_mark': vm[-2:] } + + server = GLUSTERServer + + def __init__(self, path): + self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern) + + def can_connect_to(self, remote): + return True + + def connect(self): + d = tempfile.mkdtemp() + try: + argv = [gconf.gluster_command] + \ + (gconf.gluster_log_level and ['-L', gConf.gluster_log_level] or []) + \ + ['-l', gconf.gluster_log_file, '-s', self.host, + '--volfile-id', self.volume, '--client-pid=-1', d] + if os.spawnvp(os.P_WAIT, argv[0], argv): + raise RuntimeError("command failed: " + " ".join(argv)) + logging.debug('auxiliary glusterfs mount in place') + os.chdir(d) + argv = ['umount', '-l', d] + if os.spawnvp(os.P_WAIT, argv[0], argv): + raise RuntimeError("command failed: " + " ".join(argv)) + finally: + try: + os.rmdir(d) + except: + logging.warn('stale mount left behind on ' + d) + logging.debug('auxiliary glusterfs mount prepared') + + def connect_remote(self, *a, **kw): + sup(self, *a, **kw) + self.slavedir = "/proc/%d/cwd" % self.server.pid() + + def service_loop(self, *args): + if args: + GMaster(self, args[0]).crawl() + else: + sup(self, *args) + + def rsync(self, files): + return sup(self, files, self.slavedir) + + +class SSH(AbstractUrl, SlaveRemote): + + def __init__(self, path): + self.remote_addr, inner_url = sup(self, path, + '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ])) + self.inner_rsc = parse_url(inner_url) + + def can_connect_to(self, remote): + return False + + def start_fd_client(self, *a, **opts): + if opts['deferred']: + return a + sup(self, *a) + ityp = type(self.inner_rsc) + if ityp == FILE: + slavepath = self.inner_rsc.path + elif ityp == GLUSTER: + slavepath = 
"/proc/%d/cwd" % self.server.pid() + else: + raise NotImplementedError + self.slaveurl = ':'.join([self.remote_addr, slavepath]) + + def connect_remote(self, go_daemon=None): + if go_daemon == 'done': + return self.start_fd_client(*self.fd_pair) + gconf.setup_ssh_ctl(tempfile.mkdtemp()) + deferred = go_daemon == 'postconn' + ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + [self.remote_addr], slave=self.inner_rsc.url, deferred=deferred) + if deferred: + # send a ping to peer so that we can wait for + # the answer from which we know connection is + # established and we can proceed with daemonization + # (doing that too early robs the ssh passwd prompt...) + # However, we'd better not start the RepceClient + # before daemonization (that's not preserved properly + # in daemon), we just do a an ad-hoc linear put/get. + i, o = ret + inf = os.fdopen(i) + repce.send(o, None, 'ping') + select.select((inf,), (), ()) + repce.recv(inf) + # hack hack hack: store a global reference to the file + # to save it from getting GC'd which implies closing it + gconf._in_fd_reference = inf + self.fd_pair = (i, o) + return 'should' + + def rsync(self, files): + return sup(self, files, '-ze', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args), self.slaveurl) diff --git a/xlators/features/marker/utils/syncdaemon/simplecfg.py b/xlators/features/marker/utils/syncdaemon/simplecfg.py new file mode 100644 index 00000000000..fc3863ef4e7 --- /dev/null +++ b/xlators/features/marker/utils/syncdaemon/simplecfg.py @@ -0,0 +1,77 @@ +import re +import tempfile +import os + +CommentRe = re.compile('\s*(#|$)') +ParseRe = re.compile('\s*(\S+):\s+(.*\S)\s+$') + +class SimpleCfgError(Exception): + pass + +class SimpleCfg(dict): + """ + Read/write support for a simple config file format. + Entries can be of the form "key: value". + "#" comments are supported. Whitespace-only lines are ignored. 
+ """ + + def __init__(self, *a, **kw): + dict.__init__(self, *a, **kw) + self.klist = dict.keys(self) + + def __setitem__(self, k, v): + k = k.replace('-', '_') + if not k in self: + self.klist.append(k) + dict.__setitem__(self, k, v) + + def __iter__(self): + return self.klist.__iter__() + + def keys(self): + return self.klist + + def pop(self, key, *a): + e = dict.pop(self, key, *a) + self.klist.remove(key) + return e + + def readstream(self, s): + while True: + l = s.readline() + if not l: + break + m = ParseRe.match(l) + if m: + k, v = m.groups() + self[k] = v + elif not CommentRe.match(l): + raise SimpleCfgError('syntax error') + + def writestream(self, s): + for k in self: + s.write('%s: %s\n' % (k, self[k])) + + def read(self, file): + f = None + try: + f = open(file) + self.readstream(f) + finally: + if f: + f.close() + + def write(self, file): + tfd = None + tfil = None + try: + tfd, tname = tempfile.mkstemp(dir=os.path.dirname(file)) + tfil, tfd = os.fdopen(tfd, 'w'), None + self.writestream(tfil) + os.fsync(tfil.fileno()) + os.rename(tname, file) + finally: + if tfd != None: + os.close(tfd) + if tfil != None: + tfil.close() -- cgit