diff options
| author | Csaba Henk <csaba@gluster.com> | 2011-01-27 05:23:35 +0000 | 
|---|---|---|
| committer | Anand V. Avati <avati@dev.gluster.com> | 2011-01-27 03:17:20 -0800 | 
| commit | 85300e25f2d47e33b169d14fa9eb0b7cfe39011b (patch) | |
| tree | 6a00e8790358f1321855122d90a78e669168c1d1 | |
| parent | 7d883898c5225df3f7c38e67274b74ff8ac396c0 (diff) | |
adding syncdaemon
Signed-off-by: Csaba Henk <csaba@gluster.com>
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
BUG: 2310 (georeplication)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=2310
| -rw-r--r-- | configure.ac | 34 | ||||
| -rw-r--r-- | xlators/features/marker/Makefile.am | 2 | ||||
| -rw-r--r-- | xlators/features/marker/utils/Makefile.am | 7 | ||||
| -rwxr-xr-x | xlators/features/marker/utils/gsyncd.in | 7 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/Makefile.am | 5 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/README.md | 81 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/__init__.py | 0 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/gconf.py | 13 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/gsyncd.py | 230 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/master.py | 240 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/repce.py | 150 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/resource.py | 372 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/simplecfg.py | 77 | 
13 files changed, 1217 insertions, 1 deletions
diff --git a/configure.ac b/configure.ac index e8d3c07a8..3ce78028f 100644 --- a/configure.ac +++ b/configure.ac @@ -93,6 +93,9 @@ AC_CONFIG_FILES([Makefile  		xlators/features/quota/src/Makefile                  xlators/features/marker/Makefile                  xlators/features/marker/src/Makefile +                xlators/features/marker/utils/Makefile +                xlators/features/marker/utils/gsyncd +                xlators/features/marker/utils/syncdaemon/Makefile  		xlators/features/read-only/Makefile  		xlators/features/read-only/src/Makefile  		xlators/features/mac-compat/Makefile @@ -281,6 +284,36 @@ AC_SUBST(RDMA_SUBDIR)  # end IBVERBS section +# SYNCDAEMON section +AC_ARG_ENABLE([georeplication], +	      AC_HELP_STRING([--disable-georeplication], +			     [Do not install georeplication components])) + +BUILD_SYNCDAEMON=no +if test "x$enable_georeplication" != "xno"; then +  SYNCDAEMON_SUBDIR=utils +  BUILD_SYNCDAEMON="yes" +  AM_PATH_PYTHON([2.4]) +  echo -n "checking if python is python 2.x... " +  if echo $PYTHON_VERSION | grep ^2; then +    : +  else +    echo no +    AC_MSG_ERROR([only python 2.x is supported]) +  fi +  echo -n "checking if python has ctypes support... 
" +  if "$PYTHON" -c 'import ctypes' 2>/dev/null; then +    echo yes +  else +    echo no +    AC_MSG_ERROR([python does not have ctypes support]) +  fi +fi + +AC_SUBST(SYNCDAEMON_SUBDIR) +# end SYNCDAEMON section + +  dnl FreeBSD > 5 has execinfo as a Ported library for giving a workaround  dnl solution to GCC backtrace functionality @@ -430,4 +463,5 @@ echo "epoll IO multiplex : $BUILD_EPOLL"  echo "argp-standalone    : $BUILD_ARGP_STANDALONE"  echo "fusermount         : $BUILD_FUSERMOUNT"  echo "readline           : $BUILD_READLINE" +echo "georeplication     : $BUILD_SYNCDAEMON"  echo diff --git a/xlators/features/marker/Makefile.am b/xlators/features/marker/Makefile.am index a985f42a8..a6ba2de16 100644 --- a/xlators/features/marker/Makefile.am +++ b/xlators/features/marker/Makefile.am @@ -1,3 +1,3 @@ -SUBDIRS = src +SUBDIRS = src @SYNCDAEMON_SUBDIR@  CLEANFILES = diff --git a/xlators/features/marker/utils/Makefile.am b/xlators/features/marker/utils/Makefile.am new file mode 100644 index 000000000..8aefea401 --- /dev/null +++ b/xlators/features/marker/utils/Makefile.am @@ -0,0 +1,7 @@ +SUBDIRS = syncdaemon + +gsyncddir = $(libexecdir) + +gsyncd_SCRIPTS = gsyncd + +CLEANFILES = diff --git a/xlators/features/marker/utils/gsyncd.in b/xlators/features/marker/utils/gsyncd.in new file mode 100755 index 000000000..9bbf8041f --- /dev/null +++ b/xlators/features/marker/utils/gsyncd.in @@ -0,0 +1,7 @@ +#!/bin/sh + +prefix="@prefix@" +exec_prefix="@exec_prefix@" +libexecdir=`eval echo "@libexecdir@"` + +PYTHONPATH="$libexecdir"/python exec @PYTHON@ -c "from syncdaemon import gsyncd; gsyncd.main()" -c @sysconfdir@/glusterfs/gsyncd.conf "$@" diff --git a/xlators/features/marker/utils/syncdaemon/Makefile.am b/xlators/features/marker/utils/syncdaemon/Makefile.am new file mode 100644 index 000000000..1d5014a9e --- /dev/null +++ b/xlators/features/marker/utils/syncdaemon/Makefile.am @@ -0,0 +1,5 @@ +syncdaemondir = $(libexecdir)/python/syncdaemon + +syncdaemon_PYTHON = gconf.py 
gsyncd.py __init__.py master.py README.md repce.py resource.py simplecfg.py + +CLEANFILES = diff --git a/xlators/features/marker/utils/syncdaemon/README.md b/xlators/features/marker/utils/syncdaemon/README.md new file mode 100644 index 000000000..d45006932 --- /dev/null +++ b/xlators/features/marker/utils/syncdaemon/README.md @@ -0,0 +1,81 @@ +gsyncd, the Gluster Syncdaemon +============================== + +REQUIREMENTS +------------ + +_gsyncd_ is a program which can operate either in _master_ or in _slave_ mode. +Requirements are categorized according to this. + +* supported OS is GNU/Linux +* Python >= 2.5, or 2.4 with Ctypes (see below) (both) +* OpenSSH >= 4.0 (master) / SSH2 compliant sshd (e.g. openssh) (slave) +* rsync (both) +* glusterfs with marker support (master); glusterfs (optional on slave) +* FUSE; for supported versions consult glusterfs + +INSTALLATION +------------ + +As of now, the supported way of operation is running from the source directory. + +If you use Python 2.4.x, you need to install the [Ctypes module](http://python.net/crew/theller/ctypes/). + +CONFIGURATION +------------- + +gsyncd tunables are a subset of the long command-line options; for listing them, +type + +    gsyncd.py --help + +and see the long options up to "--config-file". (The leading double dash should be omitted; +interim underscores and dashes are interchangeable.) The set of options bears some resemblance +to those of glusterfs and rsync. + +The config file format matches the following syntax: + +      <option1>: <value1> +      <option2>: <value2> +      # comment + +By default (unless specified by the option `-c`), gsyncd looks for config file at _conf/gsyncd.conf_ +in the source tree. + +USAGE +----- + +gsyncd is a utility for continuous mirroring, i.e. it mirrors master to slave incrementally. +Assume we have a gluster volume _pop_ at localhost. We try to set up the following mirrors +for it with gsyncd: + +1. _/data/mirror_ +2. local gluster volume _yow_ +3.
_/data/far_mirror_ at example.com +4. gluster volume _moz_ at example.com + +The respective gsyncd invocations are (demoing some syntax sugaring): + +1. + +      gsyncd.py gluster://localhost:pop file:///data/mirror + +  or short form + +      gsyncd.py :pop /data/mirror + +2. `gsyncd :pop :yow` +3. + +       gsyncd.py :pop ssh://example.com:/data/far_mirror + +  or short form + +       gsyncd.py :pop example.com:/data/far_mirror + +4. `gsyncd.py :pop example.com::moz` + +gsyncd has to be available on both sides; its location on the remote side has to be specified +via the "--remote-gsyncd" option (or "remote-gsyncd" config file parameter). (This option can also be +used for setting options on the remote side, although the suggested mode of operation is to +set parameters like log file / pid file in the configuration file.) diff --git a/xlators/features/marker/utils/syncdaemon/__init__.py b/xlators/features/marker/utils/syncdaemon/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/xlators/features/marker/utils/syncdaemon/__init__.py diff --git a/xlators/features/marker/utils/syncdaemon/gconf.py b/xlators/features/marker/utils/syncdaemon/gconf.py new file mode 100644 index 000000000..7bedce514 --- /dev/null +++ b/xlators/features/marker/utils/syncdaemon/gconf.py @@ -0,0 +1,13 @@ +import os + +class GConf(object): +    ssh_ctl_dir = None +    ssh_ctl_args = None +    cpid = None + +    @classmethod +    def setup_ssh_ctl(cls, ctld): +        cls.ssh_ctl_dir = ctld +        cls.ssh_ctl_args = ["-oControlMaster=auto", "-S", os.path.join(ctld, "gsycnd-ssh-%r@%h:%p")] + +gconf = GConf() diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py new file mode 100644 index 000000000..f84df5021 --- /dev/null +++ b/xlators/features/marker/utils/syncdaemon/gsyncd.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python + +import os +import os.path +import sys +import time +import logging +import signal
+import select +import shutil +import optparse +from optparse import OptionParser, SUPPRESS_HELP +from logging import Logger +from errno import EEXIST, ENOENT + +from gconf import gconf +import resource +from simplecfg import SimpleCfg + +class GLogger(Logger): + +    def makeRecord(self, name, level, *a): +        rv = Logger.makeRecord(self, name, level, *a) +        rv.nsecs = (rv.created - int(rv.created)) * 1000000 +        fr = sys._getframe(4) +        callee = fr.f_locals.get('self') +        if callee: +            ctx = str(type(callee)).split("'")[1].split('.')[-1] +        else: +            ctx = '<top>' +        if not hasattr(rv, 'funcName'): +            rv.funcName = fr.f_code.co_name +        rv.lvlnam = logging.getLevelName(level)[0] +        rv.ctx = ctx +        return rv + +    @classmethod +    def setup(cls, **kw): +        lprm = {'datefmt': "%Y-%m-%d %H:%M:%S", +                'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s:%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"} +        lprm.update(kw) +        lvl = kw.get('level', logging.INFO) +        if isinstance(lvl, str): +            lvl = logging.getLevelName(lvl) +            lprm['level'] = lvl +        logging.root = cls("root", lvl) +        logging.setLoggerClass(cls) +        logging.getLogger().handlers = [] +        logging.basicConfig(**lprm) + + +def startup(**kw): +    def write_pid(fn): +        fd = None +        try: +            fd = os.open(fn, os.O_CREAT|os.O_TRUNC|os.O_WRONLY|os.O_EXCL) +            os.write(fd, str(os.getpid()) + '\n') +        finally: +            if fd: +                os.close(fd) + +    if kw.get('go_daemon') == 'should': +        x, y = os.pipe() +        gconf.cpid = os.fork() +        if gconf.cpid: +            os.close(x) +            sys.exit() +        os.close(y) +        # wait for parent to terminate +        # so we can start up with +        # no messing from the dirty +        # ol' bustard +        select.select((x,), 
(), ()) +        os.close(x) +        if getattr(gconf, 'pid_file', None): +            write_pid(gconf.pid_file + '.tmp') +            os.rename(gconf.pid_file + '.tmp', gconf.pid_file) +        os.setsid() +        dn = os.open(os.devnull, os.O_RDWR) +        for f in (sys.stdin, sys.stdout, sys.stderr): +            os.dup2(dn, f.fileno()) +    elif getattr(gconf, 'pid_file', None): +        try: +            write_pid(gconf.pid_file) +        except OSError: +            gconf.pid_file = None +            ex = sys.exc_info()[1] +            if ex.errno == EEXIST: +                sys.stderr.write("pidfile is taken, exiting.\n") +                exit(2) +            raise + +    lkw = {'level': gconf.log_level} +    if kw.get('log_file'): +        lkw['filename'] = kw['log_file'] +    GLogger.setup(**lkw) + +def finalize(*a): +    if getattr(gconf, 'pid_file', None): +        if gconf.cpid: +            while True: +                f = open(gconf.pid_file) +                pid = f.read() +                f.close() +                pid = int(pid.strip()) +                if pid == gconf.cpid: +                    break +                if pid != os.getpid(): +                    raise RuntimeError("corrupt pidfile") +                if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid: +                    break; +                time.sleep(0.1) +        else: +            try: +                os.unlink(gconf.pid_file) +            except: +                ex = sys.exc_info()[1] +                if ex.errno == ENOENT: +                    pass +                else: +                    raise +    if gconf.ssh_ctl_dir and not gconf.cpid: +        shutil.rmtree(gconf.ssh_ctl_dir) + +def main(): +    # ??? "finally" clause does not take effect with SIGTERM... 
+    # but this handler neither does +    # signal.signal(signal.SIGTERM, finalize) +    GLogger.setup() +    try: +        try: +            main_i() +        except: +            exc = sys.exc_info()[0] +            if exc != SystemExit: +                logging.exception("FAIL: ") +                sys.stderr.write("failed with %s.\n" % exc.__name__) +                exit(1) +    finally: +        finalize() + +def main_i(): +    rconf = {'go_daemon': 'should'} + +    def store_abs(opt, optstr, val, parser): +        setattr(parser.values, opt.dest, os.path.abspath(val)) +    def store_local(opt, optstr, val, parser): +        rconf[opt.dest] = val +    def store_local_curry(val): +        return lambda o, oo, vx, p: store_local(o, oo, val, p) + +    op = OptionParser(usage="%prog [options...] <master> <slave>", version="%prog 0.0.1") +    op.add_option('--gluster-command',     metavar='CMD',   default='glusterfs') +    op.add_option('--gluster-log-file',    metavar='LOGF',  default=os.devnull, type=str, action='callback', callback=store_abs) +    op.add_option('--gluster-log-level',   metavar='LVL') +    op.add_option('-p', '--pid-file',      metavar='PIDF',  type=str, action='callback', callback=store_abs) +    op.add_option('-l', '--log-file',      metavar='LOGF',  type=str, action='callback', callback=store_abs) +    op.add_option('-L', '--log-level',     metavar='LVL') +    op.add_option('-r', '--remote-gsyncd', metavar='CMD',   default=os.path.abspath(sys.argv[0])) +    op.add_option('-s', '--ssh-command',   metavar='CMD',   default='ssh') +    op.add_option('--rsync-command',       metavar='CMD',   default='rsync') +    op.add_option('--rsync-extra',         metavar='ARGS',  default='-sS', help=SUPPRESS_HELP) +    op.add_option('--timeout',             metavar='SEC',   type=int, default=30) +    op.add_option('--sync-jobs',           metavar='N',     type=int, default=3) +    op.add_option('--turns',               metavar='N',     type=int, default=0, 
help=SUPPRESS_HELP) + +    op.add_option('-c', '--config-file',   metavar='CONF',  type=str, action='callback', callback=store_local) +    # duh. need to specify dest or value will be mapped to None :S +    op.add_option('--listen', dest='listen', help=SUPPRESS_HELP,      action='callback', callback=store_local_curry(True)) +    op.add_option('-N', '--no-daemon', dest="go_daemon",    action='callback', callback=store_local_curry('dont')) +    op.add_option('--debug', dest="go_daemon",              action='callback', callback=lambda *a: (store_local_curry('dont')(*a), +                                                                                                    a[-1].values.__dict__.get('log_level') or \ +                                                                                                     a[-1].values.__dict__.update(log_level='DEBUG'))) +    # precedence for sources of values: 1) commandline, 2) cfg file, 3) defaults +    # -- for this to work out we need to tell apart defaults from explicitly set +    # options... so churn out the defaults here and call the parser with virgin +    # values container. 
+    defaults = op.get_default_values() +    opts, args = op.parse_args(values=optparse.Values()) +    if not (len(args) == 2 or (len(args) == 1 and rconf.get('listen'))): +        sys.stderr.write("error: incorrect number of arguments\n\n") +        sys.stderr.write(op.get_usage() + "\n") +        sys.exit(1) + +    gconf.__dict__.update(defaults.__dict__) +    # XXX add global config support +    if not 'config_file' in rconf: +        rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd.conf") +    try: +        cfg = SimpleCfg() +        cfg.read(rconf['config_file']) +        gconf.__dict__.update(cfg) +    except IOError: +        ex = sys.exc_info()[1] +        if ex.errno != ENOENT: +            raise +    gconf.__dict__.update(opts.__dict__) + +    local = resource.parse_url(args[0]) +    remote = None +    if len(args) > 1: +        remote = resource.parse_url(args[1]) + +    if not local.can_connect_to(remote): +        raise RuntimeError("%s cannot work with %s" % (local.path, remote and remote.path)) + +    go_daemon = rconf['go_daemon'] + +    if isinstance(remote, resource.SSH) and go_daemon == 'should': +        go_daemon = 'postconn' +        log_file = None +    else: +        log_file = gconf.log_file +    startup(go_daemon=go_daemon, log_file=log_file) + +    logging.info("syncing: %s" % " -> ".join([x.url for x in [local, remote] if x])) +    if remote: +        go_daemon = remote.connect_remote(go_daemon=go_daemon) +        if go_daemon: +            startup(go_daemon=go_daemon, log_file=gconf.log_file) +            # complete remote connection in child +            remote.connect_remote(go_daemon='done') +    local.connect() +    local.service_loop(*[r for r in [remote] if r]) + +    logging.info("exiting.") + + +if __name__ == "__main__": +    main() diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py new file mode 100644 index 000000000..a2f9f718e --- 
/dev/null +++ b/xlators/features/marker/utils/syncdaemon/master.py @@ -0,0 +1,240 @@ +import os +import sys +import time +import stat +import signal +import logging +import errno +from errno import ENOENT, ENODATA +from threading import Thread, currentThread, Condition, Lock + +from gconf import gconf + +URXTIME = (-1, 0) + +class GMaster(object): + +    def get_volinfo(self): +        self.volume_info = self.master.server.volume_info() +        if self.volume_info['retval']: +            raise RuntimeError("master is corrupt") +        return self.volume_info + +    @property +    def uuid(self): +        if not getattr(self, '_uuid', None): +            self._uuid = self.volume_info['uuid'] +        return self._uuid + +    @property +    def volmark(self): +        return self.volume_info['volume_mark'] + +    def xtime(self, path, *a, **opts): +        if a: +            rsc = a[0] +        else: +            rsc = self.master +        if not 'create' in opts: +            opts['create'] = rsc == self.master +        xt = rsc.server.xtime(path, self.uuid) +        if (isinstance(xt, int) or xt < self.volmark) and opts['create']: +            t = time.time() +            sec = int(t) +            nsec = int((t - sec) * 1000000) +            xt = (sec, nsec) +            rsc.server.set_xtime(path, self.uuid, xt) +        if xt == ENODATA: +            xt = URXTIME +        return xt + +    def __init__(self, master, slave): +        self.master = master +        self.slave = slave +        self.get_volinfo() +        self.jobtab = {} +        self.syncer = Syncer(slave) +        self.total_turns = int(gconf.turns) +        self.turns = 0 +        self.start = None +        self.change_seen = None +        logging.info('master started on ' + self.uuid) +        while True: +            self.crawl() + +    def add_job(self, path, label, job, *a, **kw): +        if self.jobtab.get(path) == None: +            self.jobtab[path] = [] +        
self.jobtab[path].append((label, a, lambda : job(*a, **kw))) + +    def wait(self, path, mark): +        jobs = self.jobtab.pop(path, []) +        succeed = True +        for j in jobs: +            ret = j[-1]() +            if not ret: +                succeed = False +        if succeed: +            self.sendmark(path, mark) +        return succeed + +    def sendmark(self, path, mark): +        self.slave.server.set_xtime(path, self.uuid, mark) + +    def crawl(self, path='.', xtl=None): +        if path == '.': +            if self.start: +                logging.info("crawl took %.6f" % (time.time() - self.start)) +            time.sleep(1) +            self.start = time.time() +            logging.info("crawling...") +            self.get_volinfo() +            if self.volume_info['uuid'] != self.uuid: +                raise RuntimeError("master uuid mismatch") +        logging.debug("entering " + path) +        if not xtl: +            xtl = self.xtime(path) +        xtr0 = self.xtime(path, self.slave) +        if isinstance(xtr0, int): +            xtr = URXTIME +        else: +            xtr = xtr0 +        if xtr0 == ENOENT: +            self.slave.server.mkdir(path) +        else: +            if xtr > xtl: +                raise RuntimeError("timestamp corruption for " + path) +            if xtl == xtr: +                if path == '.' 
and self.total_turns and self.change_seen: +                    self.turns += 1 +                    self.change_seen = False +                    logging.info("finished turn #%s/%s" % (self.turns, self.total_turns)) +                    if self.turns == self.total_turns: +                        logging.info("reached turn limit, terminating.") +                        os.kill(os.getpid(), signal.SIGTERM) +                return +        if path == '.': +            self.change_seen = True +        dem, des = ( x.server.entries(path) for x in (self.master, self.slave) ) +        dd = set(des) - set(dem) +        if dd: +            self.slave.server.purge(path, dd) +        chld = [] +        for e in dem: +            e = os.path.join(path, e) +            xte = self.xtime(e) +            if isinstance(xte, int): +                logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte])) +            elif xte > xtr: +                chld.append((e, xte)) +        def indulgently(e, fnc, blame=None): +            if not blame: +                blame = path +            try: +                return fnc(e) +            except (IOError, OSError): +                ex = sys.exc_info()[1] +                if ex.errno == ENOENT: +                    logging.warn("salvaged ENOENT for" + e) +                    self.add_job(blame, 'salvage', lambda: False) +                    return False +                else: +                    raise +        for e, xte in chld: +            mo = indulgently(e, lambda e: os.lstat(e).st_mode) +            if mo == False: +                continue +            if stat.S_ISLNK(mo): +                self.slave.server.symlink(os.readlink(e), e) +                self.sendmark(e, xte) +            elif stat.S_ISREG(mo): +                logging.debug("syncing %s ..." 
% e) +                pb = self.syncer.add(e) +                def regjob(e, xte, pb): +                    if pb.wait(): +                        logging.debug("synced " + e) +                        self.sendmark(e, xte) +                        return True +                    else: +                        logging.error("failed to sync " + e) +                self.add_job(path, 'reg', regjob, e, xte, pb) +            elif stat.S_ISDIR(mo): +                if indulgently(e, lambda e: (self.add_job(path, 'cwait', self.wait, e, xte), +                                             self.crawl(e, xte), +                                             True)[-1], blame=e) == False: +                    continue +            else: +                # ignore fifos, sockets and special files +                pass +        if path == '.': +            self.wait(path, xtl) + +class BoxClosedErr(Exception): +    pass + +class PostBox(list): + +    def __init__(self, *a): +        list.__init__(self, *a) +        self.lever = Condition() +        self.open = True +        self.done = False + +    def wait(self): +        self.lever.acquire() +        if not self.done: +            self.lever.wait() +        self.lever.release() +        return self.result + +    def wakeup(self, data): +        self.result = data +        self.lever.acquire() +        self.done = True +        self.lever.notifyAll() +        self.lever.release() + +    def append(self, e): +        self.lever.acquire() +        if not self.open: +            raise BoxClosedErr +        list.append(self, e) +        self.lever.release() + +    def close(self): +        self.lever.acquire() +        self.open = False +        self.lever.release() + +class Syncer(object): + +    def __init__(self, slave): +        self.slave = slave +        self.lock = Lock() +        self.pb = PostBox() +        for i in range(int(gconf.sync_jobs)): +            t = Thread(target=self.syncjob) +            t.setDaemon = True +     
       t.start() + +    def syncjob(self): +        while True: +            pb = None +            while True: +                self.lock.acquire() +                if self.pb: +                    pb, self.pb = self.pb, PostBox() +                self.lock.release() +                if pb: +                    break +                time.sleep(0.5) +            pb.close() +            pb.wakeup(self.slave.rsync(pb)) + +    def add(self, e): +        while True: +            try: +                self.pb.append(e) +                return self.pb +            except BoxClosedErr: +                pass diff --git a/xlators/features/marker/utils/syncdaemon/repce.py b/xlators/features/marker/utils/syncdaemon/repce.py new file mode 100644 index 000000000..f878d481a --- /dev/null +++ b/xlators/features/marker/utils/syncdaemon/repce.py @@ -0,0 +1,150 @@ +import os +import sys +import select +import time +import logging +from threading import Thread, Condition +try: +    import thread +except ImportError: +    # py 3 +    import _thread as thread +try: +    from Queue import Queue +except ImportError: +    # py 3 +    from queue import Queue +try: +    import cPickle as pickle +except ImportError: +    # py 3 +    import pickle + +pickle_proto = -1 + +def ioparse(i, o): +    if isinstance(i, int): +        i = os.fdopen(i) +    # rely on duck typing for recognizing +    # streams as that works uniformly +    # in py2 and py3 +    if hasattr(o, 'fileno'): +        o = o.fileno() +    return (i, o) + +def send(out, *args): +    os.write(out, pickle.dumps(args, pickle_proto)) + +def recv(inf): +    return pickle.load(inf) + + +class RepceServer(object): + +    def __init__(self, obj, i, o, wnum=6): +        self.obj = obj +        self.inf, self.out = ioparse(i, o) +        self.wnum = wnum +        self.q = Queue() + +    def service_loop(self): +        for i in range(self.wnum): +            t = Thread(target=self.worker) +            t.setDaemon(True) +            
t.start() +        try: +            while True: +                self.q.put(recv(self.inf)) +        except EOFError: +                logging.info("terminating on reaching EOF.") + +    def worker(self): +        while True: +            in_data = self.q.get(True) +            rid = in_data[0] +            rmeth = in_data[1] +            exc = False +            try: +                res = getattr(self.obj, rmeth)(*in_data[2:]) +            except: +                res = sys.exc_info()[1] +                exc = True +                logging.exception("call failed: ") +            send(self.out, rid, exc, res) + + +class RepceJob(object): + +    def __init__(self, cbk): +        self.rid = (os.getpid(), thread.get_ident(), time.time()) +        self.cbk = cbk +        self.lever = Condition() +        self.done = False + +    def __repr__(self): +        return ':'.join([str(x) for x in self.rid]) + +    def wait(self): +        self.lever.acquire() +        if not self.done: +            self.lever.wait() +        self.lever.release() +        return self.result + +    def wakeup(self, data): +        self.result = data +        self.lever.acquire() +        self.done = True +        self.lever.notify() +        self.lever.release() + + +class RepceClient(object): + +    def __init__(self, i, o): +        self.inf, self.out = ioparse(i, o) +        self.jtab = {} +        t = Thread(target = self.listen) +        t.setDaemon(True) +        t.start() + +    def listen(self): +        while True: +            select.select((self.inf,), (), ()) +            rid, exc, res = recv(self.inf) +            rjob = self.jtab.pop(rid) +            if rjob.cbk: +                rjob.cbk(rjob, [exc, res]) + +    def push(self, meth, *args, **kw): +        cbk = kw.get('cbk') +        if not cbk: +            def cbk(rj, res): +                if res[0]: +                    raise res[1] +        rjob = RepceJob(cbk) +        self.jtab[rjob.rid] = rjob +        
logging.debug("call %s %s%s ..." % (repr(rjob), meth, repr(args))) +        send(self.out, rjob.rid, meth, *args) +        return rjob + +    def __call__(self, meth, *args): +        rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)}) +        exc, res = rjob.wait() +        if exc: +            logging.error('call %s (%s) failed on peer with %s' % (repr(rjob), meth, str(type(res).__name__))) +            raise res +        logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res))) +        return res + +    class mprx(object): + +        def __init__(self, ins, meth): +            self.ins = ins +            self.meth = meth + +        def __call__(self, *a): +            return self.ins(self.meth, *a) + +    def __getattr__(self, meth): +        return self.mprx(self, meth) diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py new file mode 100644 index 000000000..bbf7459bb --- /dev/null +++ b/xlators/features/marker/utils/syncdaemon/resource.py @@ -0,0 +1,372 @@ +import re +import os +import sys +import time +import errno +import struct +import select +import logging +import tempfile +import threading +from ctypes import * +from ctypes.util import find_library +from errno import EEXIST, ENOENT, ENODATA, ENOTDIR + +from gconf import gconf +import repce +from repce import RepceServer, RepceClient +from master import GMaster + +UrlRX  = re.compile('\A(\w+)://(.*)') +HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) +UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+") + +def sup(x, *a, **kw): +    return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw) + +def desugar(ustr): +    m = re.match('([^:]*):(.*)', ustr) +    if m: +        if not m.groups()[0]: +            return "gluster://localhost" + ustr +        elif '@' in m.groups()[0] or re.search('[:/]', m.groups()[1]): +            return "ssh://" + ustr +        else: +            return 
"gluster://#{str}" +    else: +        return "file://" + os.path.abspath(ustr) + +def parse_url(ustr): +    m = UrlRX.match(ustr) +    if not m: +        ustr = desugar(ustr) +    m = UrlRX.match(ustr) +    if not m: +        raise RuntimeError("malformed url") +    sch, path = m.groups() +    this = sys.modules[__name__] +    if not hasattr(this, sch.upper()): +        raise RuntimeError("unknown url scheme " + sch) +    return getattr(this, sch.upper())(path) + + +class Xattr(object): + +    libc = CDLL(find_library("libc")) + +    @classmethod +    def geterrno(cls): +        return c_int.in_dll(cls.libc, 'errno').value + +    @classmethod +    def raise_oserr(cls): +        errn = cls.geterrno() +        raise OSError(errn, os.strerror(errn)) + +    @classmethod +    def lgetxattr(cls, path, attr, siz=0): +        if siz: +            buf = create_string_buffer('\0' * siz) +        else: +            buf = None +        ret = cls.libc.lgetxattr(path, attr, buf, siz) +        if ret == -1: +            cls.raise_oserr() +        if siz: +            return buf.raw[:ret] +        else: +            return ret + +    @classmethod +    def lsetxattr(cls, path, attr, val): +        ret = cls.libc.lsetxattr(path, attr, val, len(val), 0) +        if ret == -1: +            cls.raise_oserr() + + +class Server(object): + +    GX_NSPACE = "trusted.glusterfs" + +    @staticmethod +    def entries(path): +        try: +            return os.listdir(path) +        except OSError: +            ex = sys.exc_info()[1] +            if ex.errno == ENOTDIR: +                return [] +            else: +                raise + +    @classmethod +    def purge(cls, path, entries=None): +        me_also = entries == None +        if not entries: +            try: +                entries = os.listdir(path) +            except OSError: +                ex = sys.exc_info()[1] +                if ex.errno in (ENOTDIR, ENOENT): +                    try: +                        
class SlaveLocal(object):
    """Mixin for resources that can act as the slave end in-process."""

    def can_connect_to(self, remote):
        # a local slave can only serve when there is no remote end
        return not remote

    def service_loop(self):
        """Serve RePCe requests on stdin/stdout until the peer goes quiet.

        With a positive gconf.timeout, poll the server's ping counter and
        exit once no ping arrived for a full timeout interval; otherwise
        block forever on an empty select().
        """
        repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs))
        t = threading.Thread(target=repce.service_loop)
        t.setDaemon(True)
        t.start()
        logging.info("slave listening")
        if gconf.timeout and int(gconf.timeout) > 0:
            while True:
                lp = self.server.lastping
                time.sleep(int(gconf.timeout))
                if lp == self.server.lastping:
                    logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout))
                    break
        else:
            select.select((), (), ())

class SlaveRemote(object):
    """Mixin implementing the master-side channel to a remote slave."""

    def connect_remote(self, rargs=None, **opts):
        """Fork the remote gsyncd (prefixed by the rargs command, e.g. ssh)
        wired to a pipe pair, then hand the fds to start_fd_client."""
        # BUGFIX: mutable default argument [] replaced by the None sentinel
        rargs = rargs or []
        slave = opts.get('slave', self.url)
        ix, ox = os.pipe()
        iy, oy = os.pipe()
        pid = os.fork()
        if not pid:
            # child: plug the pipe ends into stdio and exec the remote agent
            os.close(ox)
            os.dup2(ix, sys.stdin.fileno())
            os.close(iy)
            os.dup2(oy, sys.stdout.fileno())
            argv = rargs + gconf.remote_gsyncd.split() + ['-N', '--listen', '--timeout', str(gconf.timeout), slave]
            os.execvp(argv[0], argv)
        os.close(ix)
        os.close(oy)
        return self.start_fd_client(iy, ox, **opts)

    def start_fd_client(self, i, o, **opts):
        """Attach a RepceClient to the given fd pair; if a timeout is
        configured, keep the link alive with a daemonic pinger thread."""
        self.server = RepceClient(i, o)
        if gconf.timeout and int(gconf.timeout) > 0:
            def pinger():
                while True:
                    self.server.ping()
                    time.sleep(int(gconf.timeout) * 0.5)
            t = threading.Thread(target=pinger)
            t.setDaemon(True)
            t.start()

    def rsync(self, files, *args):
        """Rsync the given files (extra args appended verbatim); True on a
        zero rsync exit status. Raises RuntimeError if files is empty."""
        if not files:
            raise RuntimeError("no files to sync")
        logging.debug("files: " + ", ".join(files))
        argv = gconf.rsync_command.split() + gconf.rsync_extra.split() + ['-aR'] + files + list(args)
        return os.spawnvp(os.P_WAIT, argv[0], argv) == 0


class AbstractUrl(object):
    """Base class for url-addressed resources: validates path against the
    subclass-supplied pattern and hands the match groups back."""

    def __init__(self, path, pattern):
        m = re.search(pattern, path)
        if not m:
            raise RuntimeError("malformed path")
        self.path = path
        # deliberate non-None return: subclasses invoke this explicitly
        # (via sup()) and consume the parsed components
        return m.groups()

    def scheme(self):
        # the url scheme is simply the lowercased class name
        return type(self).__name__.lower()

    @property
    def url(self):
        return "://".join((self.scheme(), self.path))
class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
    """file:// resource: a plain local directory."""

    class FILEServer(Server):
        pass

    server = FILEServer

    def __init__(self, path):
        sup(self, path, '^/')

    def connect(self):
        os.chdir(self.path)

    def rsync(self, files):
        return sup(self, files, self.path)


class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
    """gluster://host:volume resource, accessed via an auxiliary mount."""

    class GLUSTERServer(Server):

        @classmethod
        def volume_info(cls):
            """Decode the trusted.glusterfs.volume-mark xattr of the mount
            root into version/uuid/retval/volume_mark fields."""
            vm = struct.unpack('!' + 'B'*19 + 'II',
                               Xattr.lgetxattr('.', '.'.join([cls.GX_NSPACE, 'volume-mark']), 27))
            m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]]))
            uuid = '-'.join(m.groups())
            return { 'version': vm[0:2],
                     'uuid'   : uuid,
                     'retval' : vm[18],
                     'volume_mark': vm[-2:] }

    server = GLUSTERServer

    def __init__(self, path):
        self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern)

    def can_connect_to(self, remote):
        return True

    def connect(self):
        """Mount the volume on a private tempdir, chdir into it, then lazy
        umount so the mount lives only as long as our cwd reference."""
        d = tempfile.mkdtemp()
        try:
            # BUGFIX: was gConf.gluster_log_level -- a NameError at runtime
            # whenever a gluster log level was configured
            argv = [gconf.gluster_command] + \
                    (gconf.gluster_log_level and ['-L', gconf.gluster_log_level] or []) + \
                    ['-l', gconf.gluster_log_file, '-s', self.host,
                     '--volfile-id', self.volume, '--client-pid=-1', d]
            if os.spawnvp(os.P_WAIT, argv[0], argv):
                raise RuntimeError("command failed: " + " ".join(argv))
            logging.debug('auxiliary glusterfs mount in place')
            os.chdir(d)
            argv = ['umount', '-l', d]
            if os.spawnvp(os.P_WAIT, argv[0], argv):
                raise RuntimeError("command failed: " + " ".join(argv))
        finally:
            try:
                os.rmdir(d)
            except:
                logging.warn('stale mount left behind on ' + d)
        logging.debug('auxiliary glusterfs mount prepared')

    def connect_remote(self, *a, **kw):
        sup(self, *a, **kw)
        # address the slave's mount through its cwd symlink in /proc
        self.slavedir = "/proc/%d/cwd" % self.server.pid()

    def service_loop(self, *args):
        """With a slave resource argument act as master (crawl), otherwise
        serve as a plain slave."""
        if args:
            GMaster(self, args[0]).crawl()
        else:
            sup(self, *args)

    def rsync(self, files):
        return sup(self, files, self.slavedir)


class SSH(AbstractUrl, SlaveRemote):
    """ssh://[user@]host:inner-url resource: tunnels gsyncd over ssh."""

    def __init__(self, path):
        self.remote_addr, inner_url = sup(self, path,
                                          '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ]))
        self.inner_rsc = parse_url(inner_url)

    def can_connect_to(self, remote):
        return False

    def start_fd_client(self, *a, **opts):
        """Finish connection setup; with deferred=True just hand back the
        fd pair for the post-daemonization phase."""
        if opts['deferred']:
            return a
        sup(self, *a)
        ityp = type(self.inner_rsc)
        if ityp == FILE:
            slavepath = self.inner_rsc.path
        elif ityp == GLUSTER:
            slavepath = "/proc/%d/cwd" % self.server.pid()
        else:
            raise NotImplementedError
        self.slaveurl = ':'.join([self.remote_addr, slavepath])

    def connect_remote(self, go_daemon=None):
        """Connect over ssh; go_daemon orchestrates the two-phase connect
        around daemonization ('postconn' defers, 'done' resumes)."""
        if go_daemon == 'done':
            return self.start_fd_client(*self.fd_pair)
        gconf.setup_ssh_ctl(tempfile.mkdtemp())
        deferred = go_daemon == 'postconn'
        ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + [self.remote_addr], slave=self.inner_rsc.url, deferred=deferred)
        if deferred:
            # send a ping to peer so that we can wait for
            # the answer from which we know connection is
            # established and we can proceed with daemonization
            # (doing that too early robs the ssh passwd prompt...)
            # However, we'd better not start the RepceClient
            # before daemonization (that's not preserved properly
            # in daemon), we just do a an ad-hoc linear put/get.
            i, o = ret
            inf = os.fdopen(i)
            repce.send(o, None, 'ping')
            select.select((inf,), (), ())
            repce.recv(inf)
            # hack hack hack: store a global reference to the file
            # to save it from getting GC'd which implies closing it
            gconf._in_fd_reference = inf
            self.fd_pair = (i, o)
            return 'should'

    def rsync(self, files):
        return sup(self, files, '-ze', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args), self.slaveurl)
CommentRe = re.compile('\s*(#|$)')
# accept "key: value"; trailing whitespace (the newline) is optional so the
# last line of a file needs no terminator (was \s+$, which rejected it)
ParseRe = re.compile('\s*(\S+):\s+(.*\S)\s*$')

class SimpleCfgError(Exception):
    """Raised by readstream() on malformed input."""
    pass

class SimpleCfg(dict):
    """
    Read/write support for a simple config file format.
    Entries can be of the form "key: value".
    "#" comments are supported. Whitespace-only lines are ignored.

    Key order is preserved in klist; dashes in keys set through
    __setitem__ are normalized to underscores (keys passed to the
    constructor are taken verbatim).
    """

    def __init__(self, *a, **kw):
        dict.__init__(self, *a, **kw)
        # BUGFIX: materialize as a real list -- on Python 3 dict.keys()
        # returns a view with no .append(), which broke __setitem__
        self.klist = list(dict.keys(self))

    def __setitem__(self, k, v):
        k = k.replace('-', '_')
        if not k in self:
            self.klist.append(k)
        dict.__setitem__(self, k, v)

    def __iter__(self):
        # iterate in insertion order
        return self.klist.__iter__()

    def keys(self):
        return self.klist

    def pop(self, key, *a):
        # BUGFIX: only drop key from klist if it was actually present;
        # popping a missing key with a default used to raise ValueError
        had = key in self
        e = dict.pop(self, key, *a)
        if had:
            self.klist.remove(key)
        return e

    def readstream(self, s):
        """Parse config lines from stream s; raise SimpleCfgError on a
        line that is neither an entry, a comment, nor blank."""
        while True:
            l = s.readline()
            if not l:
                break
            m = ParseRe.match(l)
            if m:
                k, v = m.groups()
                self[k] = v
            elif not CommentRe.match(l):
                raise SimpleCfgError('syntax error')

    def writestream(self, s):
        for k in self:
            s.write('%s: %s\n' % (k, self[k]))

    def read(self, file):
        """Load entries from the named file."""
        f = None
        try:
            f = open(file)
            self.readstream(f)
        finally:
            if f:
                f.close()

    def write(self, file):
        """Atomically replace the named file: write a tempfile in the same
        directory, fsync, then rename it over the target."""
        tfd = None
        tfil = None
        tname = None
        renamed = False
        try:
            tfd, tname = tempfile.mkstemp(dir=os.path.dirname(file))
            tfil, tfd = os.fdopen(tfd, 'w'), None
            self.writestream(tfil)
            os.fsync(tfil.fileno())
            os.rename(tname, file)
            renamed = True
        finally:
            if tfd != None:
                os.close(tfd)
            if tfil != None:
                tfil.close()
            # BUGFIX: don't leak the tempfile when writing failed mid-way
            if tname != None and not renamed:
                try:
                    os.remove(tname)
                except OSError:
                    pass
