summaryrefslogtreecommitdiffstats
path: root/xlators/features/marker/utils
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/marker/utils')
-rw-r--r--xlators/features/marker/utils/Makefile.am7
-rwxr-xr-xxlators/features/marker/utils/gsyncd.in7
-rw-r--r--xlators/features/marker/utils/syncdaemon/Makefile.am5
-rw-r--r--xlators/features/marker/utils/syncdaemon/README.md81
-rw-r--r--xlators/features/marker/utils/syncdaemon/__init__.py0
-rw-r--r--xlators/features/marker/utils/syncdaemon/gconf.py13
-rw-r--r--xlators/features/marker/utils/syncdaemon/gsyncd.py255
-rw-r--r--xlators/features/marker/utils/syncdaemon/master.py246
-rw-r--r--xlators/features/marker/utils/syncdaemon/repce.py159
-rw-r--r--xlators/features/marker/utils/syncdaemon/resource.py421
10 files changed, 0 insertions, 1194 deletions
diff --git a/xlators/features/marker/utils/Makefile.am b/xlators/features/marker/utils/Makefile.am
deleted file mode 100644
index 8aefea401..000000000
--- a/xlators/features/marker/utils/Makefile.am
+++ /dev/null
@@ -1,7 +0,0 @@
-SUBDIRS = syncdaemon
-
-gsyncddir = $(libexecdir)
-
-gsyncd_SCRIPTS = gsyncd
-
-CLEANFILES =
diff --git a/xlators/features/marker/utils/gsyncd.in b/xlators/features/marker/utils/gsyncd.in
deleted file mode 100755
index 9bbf8041f..000000000
--- a/xlators/features/marker/utils/gsyncd.in
+++ /dev/null
@@ -1,7 +0,0 @@
-#!/bin/sh
-
-prefix="@prefix@"
-exec_prefix="@exec_prefix@"
-libexecdir=`eval echo "@libexecdir@"`
-
-PYTHONPATH="$libexecdir"/python exec @PYTHON@ -c "from syncdaemon import gsyncd; gsyncd.main()" -c @sysconfdir@/glusterfs/gsyncd.conf "$@"
diff --git a/xlators/features/marker/utils/syncdaemon/Makefile.am b/xlators/features/marker/utils/syncdaemon/Makefile.am
deleted file mode 100644
index 047a8cbaa..000000000
--- a/xlators/features/marker/utils/syncdaemon/Makefile.am
+++ /dev/null
@@ -1,5 +0,0 @@
-syncdaemondir = $(libexecdir)/python/syncdaemon
-
-syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py resource.py
-
-CLEANFILES =
diff --git a/xlators/features/marker/utils/syncdaemon/README.md b/xlators/features/marker/utils/syncdaemon/README.md
deleted file mode 100644
index d45006932..000000000
--- a/xlators/features/marker/utils/syncdaemon/README.md
+++ /dev/null
@@ -1,81 +0,0 @@
-gsycnd, the Gluster Syncdaemon
-==============================
-
-REQUIREMENTS
-------------
-
-_gsyncd_ is a program which can operate either in _master_ or in _slave_ mode.
-Requirements are categorized according to this.
-
-* supported OS is GNU/Linux
-* Python >= 2.5, or 2.4 with Ctypes (see below) (both)
-* OpenSSH >= 4.0 (master) / SSH2 compliant sshd (eg. openssh) (slave)
-* rsync (both)
-* glusterfs with marker support (master); glusterfs (optional on slave)
-* FUSE; for supported versions consult glusterfs
-
-INSTALLATION
-------------
-
-As of now, the supported way of operation is running from the source directory.
-
-If you use Python 2.4.x, you need to install the [Ctypes module](http://python.net/crew/theller/ctypes/).
-
-CONFIGURATION
--------------
-
-gsyncd tunables are a subset of the long command-line options; for listing them,
-type
-
- gsyncd.py --help
-
-and see the long options up to "--config-file". (The leading double dash should be omitted;
-interim underscores and dashes are interchangeable.) The set of options bear some resemblance
-to those of glusterfs and rsync.
-
-The config file format matches the following syntax:
-
- <option1>: <value1>
- <option2>: <value2>
- # comment
-
-By default (unless specified by the option `-c`), gsyncd looks for config file at _conf/gsyncd.conf_
-in the source tree.
-
-USAGE
------
-
-gsyncd is a utilitly for continous mirroring, ie. it mirrors master to slave incrementally.
-Assume we have a gluster volume _pop_ at localhost. We try to set up the following mirrors
-for it with gysncd:
-
-1. _/data/mirror_
-2. local gluster volume _yow_
-3. _/data/far_mirror_ at example.com
-4. gluster volume _moz_ at example.com
-
-The respective gsyncd invocations are (demoing some syntax sugaring):
-
-1.
-
- gsyncd.py gluster://localhost:pop file:///data/mirror
-
- or short form
-
- gsyncd.py :pop /data/mirror
-
-2. `gsyncd :pop :yow`
-3.
-
- gsyncd.py :pop ssh://example.com:/data/far_mirror
-
- or short form
-
- gsyncd.py :pop example.com:/data/far_mirror
-
-4. `gsyncd.py :pop example.com::moz`
-
-gsyncd has to be available on both sides; it's location on the remote side has to be specified
-via the "--remote-gsyncd" option (or "remote-gsyncd" config file parameter). (This option can also be
-used for setting options on the remote side, although the suggested mode of operation is to
-set parameters like log file / pid file in the configuration file.)
diff --git a/xlators/features/marker/utils/syncdaemon/__init__.py b/xlators/features/marker/utils/syncdaemon/__init__.py
deleted file mode 100644
index e69de29bb..000000000
--- a/xlators/features/marker/utils/syncdaemon/__init__.py
+++ /dev/null
diff --git a/xlators/features/marker/utils/syncdaemon/gconf.py b/xlators/features/marker/utils/syncdaemon/gconf.py
deleted file mode 100644
index 7bedce514..000000000
--- a/xlators/features/marker/utils/syncdaemon/gconf.py
+++ /dev/null
@@ -1,13 +0,0 @@
-import os
-
-class GConf(object):
- ssh_ctl_dir = None
- ssh_ctl_args = None
- cpid = None
-
- @classmethod
- def setup_ssh_ctl(cls, ctld):
- cls.ssh_ctl_dir = ctld
- cls.ssh_ctl_args = ["-oControlMaster=auto", "-S", os.path.join(ctld, "gsycnd-ssh-%r@%h:%p")]
-
-gconf = GConf()
diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py
deleted file mode 100644
index f8200dd58..000000000
--- a/xlators/features/marker/utils/syncdaemon/gsyncd.py
+++ /dev/null
@@ -1,255 +0,0 @@
-#!/usr/bin/env python
-
-import os
-import os.path
-import sys
-import time
-import logging
-import signal
-import select
-import shutil
-import optparse
-from optparse import OptionParser, SUPPRESS_HELP
-from logging import Logger
-from errno import EEXIST, ENOENT
-
-from gconf import gconf
-from configinterface import GConffile
-import resource
-
-class GLogger(Logger):
-
- def makeRecord(self, name, level, *a):
- rv = Logger.makeRecord(self, name, level, *a)
- rv.nsecs = (rv.created - int(rv.created)) * 1000000
- fr = sys._getframe(4)
- callee = fr.f_locals.get('self')
- if callee:
- ctx = str(type(callee)).split("'")[1].split('.')[-1]
- else:
- ctx = '<top>'
- if not hasattr(rv, 'funcName'):
- rv.funcName = fr.f_code.co_name
- rv.lvlnam = logging.getLevelName(level)[0]
- rv.ctx = ctx
- return rv
-
- @classmethod
- def setup(cls, **kw):
- if kw.get('slave'):
- sls = "(slave)"
- else:
- sls = ""
- lprm = {'datefmt': "%Y-%m-%d %H:%M:%S",
- 'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + sls + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"}
- lprm.update(kw)
- lvl = kw.get('level', logging.INFO)
- if isinstance(lvl, str):
- lvl = logging.getLevelName(lvl)
- lprm['level'] = lvl
- logging.root = cls("root", lvl)
- logging.setLoggerClass(cls)
- logging.getLogger().handlers = []
- logging.basicConfig(**lprm)
-
-
-def startup(**kw):
- def write_pid(fn):
- fd = None
- try:
- fd = os.open(fn, os.O_CREAT|os.O_TRUNC|os.O_WRONLY|os.O_EXCL)
- os.write(fd, str(os.getpid()) + '\n')
- finally:
- if fd:
- os.close(fd)
-
- if kw.get('go_daemon') == 'should':
- x, y = os.pipe()
- gconf.cpid = os.fork()
- if gconf.cpid:
- os.close(x)
- sys.exit()
- os.close(y)
- # wait for parent to terminate
- # so we can start up with
- # no messing from the dirty
- # ol' bustard
- select.select((x,), (), ())
- os.close(x)
- if getattr(gconf, 'pid_file', None):
- write_pid(gconf.pid_file + '.tmp')
- os.rename(gconf.pid_file + '.tmp', gconf.pid_file)
- os.setsid()
- dn = os.open(os.devnull, os.O_RDWR)
- for f in (sys.stdin, sys.stdout, sys.stderr):
- os.dup2(dn, f.fileno())
- elif getattr(gconf, 'pid_file', None):
- try:
- write_pid(gconf.pid_file)
- except OSError:
- gconf.pid_file = None
- ex = sys.exc_info()[1]
- if ex.errno == EEXIST:
- sys.stderr.write("pidfile is taken, exiting.\n")
- exit(2)
- raise
-
- lkw = {'level': gconf.log_level}
- if kw.get('log_file'):
- lkw['filename'] = kw['log_file']
- GLogger.setup(slave=kw.get('slave'), **lkw)
-
-def finalize(*a):
- if getattr(gconf, 'pid_file', None):
- if gconf.cpid:
- while True:
- f = open(gconf.pid_file)
- pid = f.read()
- f.close()
- pid = int(pid.strip())
- if pid == gconf.cpid:
- break
- if pid != os.getpid():
- raise RuntimeError("corrupt pidfile")
- if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid:
- break;
- time.sleep(0.1)
- else:
- try:
- os.unlink(gconf.pid_file)
- except:
- ex = sys.exc_info()[1]
- if ex.errno == ENOENT:
- pass
- else:
- raise
- if gconf.ssh_ctl_dir and not gconf.cpid:
- shutil.rmtree(gconf.ssh_ctl_dir)
-
-def main():
- # ??? "finally" clause does not take effect with SIGTERM...
- # but this handler neither does
- # signal.signal(signal.SIGTERM, finalize)
- GLogger.setup()
- try:
- try:
- main_i()
- except:
- exc = sys.exc_info()[0]
- if exc != SystemExit:
- logging.exception("FAIL: ")
- sys.stderr.write("failed with %s.\n" % exc.__name__)
- sys.exit(1)
- finally:
- finalize()
- # force exit in non-main thread too
- os._exit(1)
-
-def main_i():
- rconf = {'go_daemon': 'should'}
-
- def store_abs(opt, optstr, val, parser):
- setattr(parser.values, opt.dest, os.path.abspath(val))
- def store_local(opt, optstr, val, parser):
- rconf[opt.dest] = val
- def store_local_curry(val):
- return lambda o, oo, vx, p: store_local(o, oo, val, p)
-
- op = OptionParser(usage="%prog [options...] <master> <slave>", version="%prog 0.0.1")
- op.add_option('--gluster-command', metavar='CMD', default='glusterfs')
- op.add_option('--gluster-log-file', metavar='LOGF', default=os.devnull, type=str, action='callback', callback=store_abs)
- op.add_option('--gluster-log-level', metavar='LVL')
- op.add_option('-p', '--pid-file', metavar='PIDF', type=str, action='callback', callback=store_abs)
- op.add_option('-l', '--log-file', metavar='LOGF', type=str, action='callback', callback=store_abs)
- op.add_option('-L', '--log-level', metavar='LVL')
- op.add_option('-r', '--remote-gsyncd', metavar='CMD', default=os.path.abspath(sys.argv[0]))
- op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh')
- op.add_option('--rsync-command', metavar='CMD', default='rsync')
- op.add_option('--rsync-extra', metavar='ARGS', default='-sS', help=SUPPRESS_HELP)
- op.add_option('--timeout', metavar='SEC', type=int, default=30)
- op.add_option('--sync-jobs', metavar='N', type=int, default=3)
- op.add_option('--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP)
-
- op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local)
- # duh. need to specify dest or value will be mapped to None :S
- op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True))
- op.add_option('-N', '--no-daemon', dest="go_daemon", action='callback', callback=store_local_curry('dont'))
- op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a),
- a[-1].values.__dict__.get('log_level') or \
- a[-1].values.__dict__.update(log_level='DEBUG')))
- op.add_option('--config-get', metavar='OPT', type=str, dest='config', action='callback', callback=store_local)
- op.add_option('--config-get-all', dest='config', action='callback', callback=store_local_curry(True))
- op.add_option('--config-set', metavar='OPT VAL', type=str, nargs=2, dest='config', action='callback', callback=store_local)
- op.add_option('--config-del', metavar='OPT', type=str, dest='config', action='callback', callback=lambda o, oo, vx, p:
- store_local(o, oo, (vx, False), p))
-
- # precedence for sources of values: 1) commandline, 2) cfg file, 3) defaults
- # -- for this to work out we need to tell apart defaults from explicitly set
- # options... so churn out the defaults here and call the parser with virgin
- # values container.
- defaults = op.get_default_values()
- opts, args = op.parse_args(values=optparse.Values())
- if not (len(args) == 2 or (len(args) == 1 and rconf.get('listen')) or (len(args) <= 2 and rconf.get('config'))):
- sys.stderr.write("error: incorrect number of arguments\n\n")
- sys.stderr.write(op.get_usage() + "\n")
- sys.exit(1)
-
- local = remote = None
- if args:
- local = resource.parse_url(args[0])
- if len(args) > 1:
- remote = resource.parse_url(args[1])
- if not local.can_connect_to(remote):
- raise RuntimeError("%s cannot work with %s" % (local.path, remote and remote.path))
- pa = ([], [])
- canon = [False, True]
- for x in (local, remote):
- if x:
- for i in range(2):
- pa[i].append(x.get_url(canonical=canon[i]))
- peers, canon_peers = pa
- if not 'config_file' in rconf:
- rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd.conf")
- gcnf = GConffile(rconf['config_file'], canon_peers)
-
- confdata = rconf.get('config')
- if confdata:
- if isinstance(confdata, tuple):
- if confdata[1]:
- gcnf.set(*confdata)
- else:
- gcnf.delete(confdata[0])
- else:
- if confdata == True:
- confdata = None
- gcnf.get(confdata)
- return
-
- gconf.__dict__.update(defaults.__dict__)
- gcnf.update_to(gconf.__dict__)
- gconf.__dict__.update(opts.__dict__)
-
- go_daemon = rconf['go_daemon']
-
- if isinstance(remote, resource.SSH) and go_daemon == 'should':
- go_daemon = 'postconn'
- log_file = None
- else:
- log_file = gconf.log_file
- startup(go_daemon=go_daemon, log_file=log_file, slave=(not remote))
-
- logging.info("syncing: %s" % " -> ".join(peers))
- if remote:
- go_daemon = remote.connect_remote(go_daemon=go_daemon)
- if go_daemon:
- startup(go_daemon=go_daemon, log_file=gconf.log_file)
- # complete remote connection in child
- remote.connect_remote(go_daemon='done')
- local.connect()
- local.service_loop(*[r for r in [remote] if r])
-
- logging.info("exiting.")
-
-
-if __name__ == "__main__":
- main()
diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py
deleted file mode 100644
index 1abd03252..000000000
--- a/xlators/features/marker/utils/syncdaemon/master.py
+++ /dev/null
@@ -1,246 +0,0 @@
-import os
-import sys
-import time
-import stat
-import signal
-import logging
-import errno
-from errno import ENOENT, ENODATA
-from threading import Thread, currentThread, Condition, Lock
-
-from gconf import gconf
-
-URXTIME = (-1, 0)
-
-class GMaster(object):
-
- def get_volinfo(self):
- self.volume_info = self.master.server.volume_info()
- if self.volume_info['retval']:
- raise RuntimeError("master is corrupt")
- return self.volume_info
-
- @property
- def uuid(self):
- if not getattr(self, '_uuid', None):
- self._uuid = self.volume_info['uuid']
- return self._uuid
-
- @property
- def volmark(self):
- return self.volume_info['volume_mark']
-
- def xtime(self, path, *a, **opts):
- if a:
- rsc = a[0]
- else:
- rsc = self.master
- if not 'create' in opts:
- opts['create'] = rsc == self.master
- xt = rsc.server.xtime(path, self.uuid)
- if (isinstance(xt, int) or xt < self.volmark) and opts['create']:
- t = time.time()
- sec = int(t)
- nsec = int((t - sec) * 1000000)
- xt = (sec, nsec)
- rsc.server.set_xtime(path, self.uuid, xt)
- if xt == ENODATA:
- xt = URXTIME
- return xt
-
- def __init__(self, master, slave):
- self.master = master
- self.slave = slave
- self.get_volinfo()
- self.jobtab = {}
- self.syncer = Syncer(slave)
- self.total_turns = int(gconf.turns)
- self.turns = 0
- self.start = None
- self.change_seen = None
- logging.info('master started on ' + self.uuid)
- while True:
- self.crawl()
-
- def add_job(self, path, label, job, *a, **kw):
- if self.jobtab.get(path) == None:
- self.jobtab[path] = []
- self.jobtab[path].append((label, a, lambda : job(*a, **kw)))
-
- def wait(self, path, *args):
- jobs = self.jobtab.pop(path, [])
- succeed = True
- for j in jobs:
- ret = j[-1]()
- if not ret:
- succeed = False
- if succeed:
- self.sendmark(path, *args)
- return succeed
-
- def sendmark(self, path, mark, adct=None):
- if adct:
- self.slave.server.setattr(path, adct)
- self.slave.server.set_xtime(path, self.uuid, mark)
-
- def crawl(self, path='.', xtl=None):
- if path == '.':
- if self.start:
- logging.info("crawl took %.6f" % (time.time() - self.start))
- time.sleep(1)
- self.start = time.time()
- logging.info("crawling...")
- self.get_volinfo()
- if self.volume_info['uuid'] != self.uuid:
- raise RuntimeError("master uuid mismatch")
- logging.debug("entering " + path)
- if not xtl:
- xtl = self.xtime(path)
- xtr0 = self.xtime(path, self.slave)
- if isinstance(xtr0, int):
- xtr = URXTIME
- else:
- xtr = xtr0
- if xtr0 == ENOENT:
- self.slave.server.mkdir(path)
- else:
- if xtr > xtl:
- raise RuntimeError("timestamp corruption for " + path)
- if xtl == xtr:
- if path == '.' and self.total_turns and self.change_seen:
- self.turns += 1
- self.change_seen = False
- logging.info("finished turn #%s/%s" % (self.turns, self.total_turns))
- if self.turns == self.total_turns:
- logging.info("reached turn limit, terminating.")
- os.kill(os.getpid(), signal.SIGTERM)
- return
- if path == '.':
- self.change_seen = True
- dem, des = ( x.server.entries(path) for x in (self.master, self.slave) )
- dd = set(des) - set(dem)
- if dd:
- self.slave.server.purge(path, dd)
- chld = []
- for e in dem:
- e = os.path.join(path, e)
- xte = self.xtime(e)
- if isinstance(xte, int):
- logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte]))
- elif xte > xtr:
- chld.append((e, xte))
- def indulgently(e, fnc, blame=None):
- if not blame:
- blame = path
- try:
- return fnc(e)
- except (IOError, OSError):
- ex = sys.exc_info()[1]
- if ex.errno == ENOENT:
- logging.warn("salvaged ENOENT for" + e)
- self.add_job(blame, 'salvage', lambda: False)
- return False
- else:
- raise
- for e, xte in chld:
- st = indulgently(e, lambda e: os.lstat(e))
- if st == False:
- continue
- mo = st.st_mode
- adct = {'own': (st.st_uid, st.st_gid)}
- if stat.S_ISLNK(mo):
- if indulgently(e, lambda e: self.slave.server.symlink(os.readlink(e), e)) == False:
- continue
- self.sendmark(e, xte, adct)
- elif stat.S_ISREG(mo):
- logging.debug("syncing %s ..." % e)
- pb = self.syncer.add(e)
- def regjob(e, xte, pb):
- if pb.wait():
- logging.debug("synced " + e)
- self.sendmark(e, xte)
- return True
- else:
- logging.error("failed to sync " + e)
- self.add_job(path, 'reg', regjob, e, xte, pb)
- elif stat.S_ISDIR(mo):
- adct['mode'] = mo
- if indulgently(e, lambda e: (self.add_job(path, 'cwait', self.wait, e, xte, adct),
- self.crawl(e, xte),
- True)[-1], blame=e) == False:
- continue
- else:
- # ignore fifos, sockets and special files
- pass
- if path == '.':
- self.wait(path, xtl)
-
-class BoxClosedErr(Exception):
- pass
-
-class PostBox(list):
-
- def __init__(self, *a):
- list.__init__(self, *a)
- self.lever = Condition()
- self.open = True
- self.done = False
-
- def wait(self):
- self.lever.acquire()
- if not self.done:
- self.lever.wait()
- self.lever.release()
- return self.result
-
- def wakeup(self, data):
- self.result = data
- self.lever.acquire()
- self.done = True
- self.lever.notifyAll()
- self.lever.release()
-
- def append(self, e):
- self.lever.acquire()
- if not self.open:
- raise BoxClosedErr
- list.append(self, e)
- self.lever.release()
-
- def close(self):
- self.lever.acquire()
- self.open = False
- self.lever.release()
-
-class Syncer(object):
-
- def __init__(self, slave):
- self.slave = slave
- self.lock = Lock()
- self.pb = PostBox()
- for i in range(int(gconf.sync_jobs)):
- t = Thread(target=self.syncjob)
- t.setDaemon = True
- t.start()
-
- def syncjob(self):
- while True:
- pb = None
- while True:
- self.lock.acquire()
- if self.pb:
- pb, self.pb = self.pb, PostBox()
- self.lock.release()
- if pb:
- break
- time.sleep(0.5)
- pb.close()
- pb.wakeup(self.slave.rsync(pb))
-
- def add(self, e):
- while True:
- try:
- self.pb.append(e)
- return self.pb
- except BoxClosedErr:
- pass
diff --git a/xlators/features/marker/utils/syncdaemon/repce.py b/xlators/features/marker/utils/syncdaemon/repce.py
deleted file mode 100644
index aebc8b7d8..000000000
--- a/xlators/features/marker/utils/syncdaemon/repce.py
+++ /dev/null
@@ -1,159 +0,0 @@
-import os
-import sys
-import select
-import time
-import logging
-from threading import Thread, Condition
-try:
- import thread
-except ImportError:
- # py 3
- import _thread as thread
-try:
- from Queue import Queue
-except ImportError:
- # py 3
- from queue import Queue
-try:
- import cPickle as pickle
-except ImportError:
- # py 3
- import pickle
-
-pickle_proto = -1
-repce_version = 1.0
-
-def ioparse(i, o):
- if isinstance(i, int):
- i = os.fdopen(i)
- # rely on duck typing for recognizing
- # streams as that works uniformly
- # in py2 and py3
- if hasattr(o, 'fileno'):
- o = o.fileno()
- return (i, o)
-
-def send(out, *args):
- os.write(out, pickle.dumps(args, pickle_proto))
-
-def recv(inf):
- return pickle.load(inf)
-
-
-class RepceServer(object):
-
- def __init__(self, obj, i, o, wnum=6):
- self.obj = obj
- self.inf, self.out = ioparse(i, o)
- self.wnum = wnum
- self.q = Queue()
-
- def service_loop(self):
- for i in range(self.wnum):
- t = Thread(target=self.worker)
- t.setDaemon(True)
- t.start()
- try:
- while True:
- self.q.put(recv(self.inf))
- except EOFError:
- logging.info("terminating on reaching EOF.")
-
- def worker(self):
- while True:
- in_data = self.q.get(True)
- rid = in_data[0]
- rmeth = in_data[1]
- exc = False
- try:
- res = getattr(self.obj, rmeth)(*in_data[2:])
- except:
- res = sys.exc_info()[1]
- exc = True
- logging.exception("call failed: ")
- send(self.out, rid, exc, res)
-
-
-class RepceJob(object):
-
- def __init__(self, cbk):
- self.rid = (os.getpid(), thread.get_ident(), time.time())
- self.cbk = cbk
- self.lever = Condition()
- self.done = False
-
- def __repr__(self):
- return ':'.join([str(x) for x in self.rid])
-
- def wait(self):
- self.lever.acquire()
- if not self.done:
- self.lever.wait()
- self.lever.release()
- return self.result
-
- def wakeup(self, data):
- self.result = data
- self.lever.acquire()
- self.done = True
- self.lever.notify()
- self.lever.release()
-
-
-class RepceClient(object):
-
- def __init__(self, i, o):
- self.inf, self.out = ioparse(i, o)
- self.jtab = {}
- t = Thread(target = self.listen)
- t.setDaemon(True)
- t.start()
-
- def listen(self):
- while True:
- select.select((self.inf,), (), ())
- rid, exc, res = recv(self.inf)
- rjob = self.jtab.pop(rid)
- if rjob.cbk:
- rjob.cbk(rjob, [exc, res])
-
- def push(self, meth, *args, **kw):
- cbk = kw.get('cbk')
- if not cbk:
- def cbk(rj, res):
- if res[0]:
- raise res[1]
- rjob = RepceJob(cbk)
- self.jtab[rjob.rid] = rjob
- logging.debug("call %s %s%s ..." % (repr(rjob), meth, repr(args)))
- send(self.out, rjob.rid, meth, *args)
- return rjob
-
- def __call__(self, meth, *args):
- rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
- exc, res = rjob.wait()
- if exc:
- logging.error('call %s (%s) failed on peer with %s' % (repr(rjob), meth, str(type(res).__name__)))
- raise res
- logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res)))
- return res
-
- class mprx(object):
-
- def __init__(self, ins, meth):
- self.ins = ins
- self.meth = meth
-
- def __call__(self, *a):
- return self.ins(self.meth, *a)
-
- def __getattr__(self, meth):
- return self.mprx(self, meth)
-
- def __version__(self):
- d = {'proto': repce_version}
- try:
- d['object'] = self('version')
- except AttributeError:
- pass
- return d
diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py
deleted file mode 100644
index 3aaca02c0..000000000
--- a/xlators/features/marker/utils/syncdaemon/resource.py
+++ /dev/null
@@ -1,421 +0,0 @@
-import re
-import os
-import sys
-import stat
-import time
-import errno
-import struct
-import select
-import socket
-import logging
-import tempfile
-import threading
-from ctypes import *
-from ctypes.util import find_library
-from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP
-
-from gconf import gconf
-import repce
-from repce import RepceServer, RepceClient
-from master import GMaster
-
-UrlRX = re.compile('\A(\w+)://(.*)')
-HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
-UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")
-
-def sup(x, *a, **kw):
- return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw)
-
-def desugar(ustr):
- m = re.match('([^:]*):(.*)', ustr)
- if m:
- if not m.groups()[0]:
- return "gluster://localhost" + ustr
- elif '@' in m.groups()[0] or re.search('[:/]', m.groups()[1]):
- return "ssh://" + ustr
- else:
- return "gluster://#{str}"
- else:
- return "file://" + os.path.abspath(ustr)
-
-def parse_url(ustr):
- m = UrlRX.match(ustr)
- if not m:
- ustr = desugar(ustr)
- m = UrlRX.match(ustr)
- if not m:
- raise RuntimeError("malformed url")
- sch, path = m.groups()
- this = sys.modules[__name__]
- if not hasattr(this, sch.upper()):
- raise RuntimeError("unknown url scheme " + sch)
- return getattr(this, sch.upper())(path)
-
-
-class Xattr(object):
-
- libc = CDLL(find_library("libc"))
-
- @classmethod
- def geterrno(cls):
- return c_int.in_dll(cls.libc, 'errno').value
-
- @classmethod
- def raise_oserr(cls):
- errn = cls.geterrno()
- raise OSError(errn, os.strerror(errn))
-
- @classmethod
- def lgetxattr(cls, path, attr, siz=0):
- if siz:
- buf = create_string_buffer('\0' * siz)
- else:
- buf = None
- ret = cls.libc.lgetxattr(path, attr, buf, siz)
- if ret == -1:
- cls.raise_oserr()
- if siz:
- return buf.raw[:ret]
- else:
- return ret
-
- @classmethod
- def lsetxattr(cls, path, attr, val):
- ret = cls.libc.lsetxattr(path, attr, val, len(val), 0)
- if ret == -1:
- cls.raise_oserr()
-
-
-class Server(object):
-
- GX_NSPACE = "trusted.glusterfs"
-
- @staticmethod
- def entries(path):
- try:
- return os.listdir(path)
- except OSError:
- ex = sys.exc_info()[1]
- if ex.errno == ENOTDIR:
- return []
- else:
- raise
-
- @classmethod
- def purge(cls, path, entries=None):
- me_also = entries == None
- if not entries:
- try:
- entries = os.listdir(path)
- except OSError:
- ex = sys.exc_info()[1]
- if ex.errno in (ENOTDIR, ENOENT, ELOOP):
- try:
- os.unlink(path)
- return
- except OSError:
- ex = sys.exc_info()[1]
- if ex.errno != ENOENT:
- raise
- else:
- raise
- for e in entries:
- cls.purge(os.path.join(path, e))
- if me_also:
- os.rmdir(path)
-
- @classmethod
- def _create(cls, path, ctor):
- try:
- ctor(path)
- except OSError:
- ex = sys.exc_info()[1]
- if ex.errno == EEXIST:
- cls.purge(path)
- return ctor(path)
- raise
-
- @classmethod
- def mkdir(cls, path):
- cls._create(path, os.mkdir)
-
- @classmethod
- def symlink(cls, lnk, path):
- cls._create(path, lambda p: os.symlink(lnk, p))
-
- @classmethod
- def xtime(cls, path, uuid):
- try:
- return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8))
- except OSError:
- ex = sys.exc_info()[1]
- if ex.errno in (ENOENT, ENODATA):
- return ex.errno
- else:
- raise
-
- @classmethod
- def set_xtime(cls, path, uuid, mark):
- Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark))
-
- @staticmethod
- def setattr(path, adct):
- own = adct.get('own')
- if own:
- os.lchown(path, *own)
- mode = adct.get('mode')
- if mode:
- os.chmod(path, stat.S_IMODE(mode))
- times = adct.get('times')
- if times:
- os.utime(path, times)
-
- @staticmethod
- def pid():
- return os.getpid()
-
- lastping = 0
- @classmethod
- def ping(cls):
- cls.lastping += 1
- return cls.lastping
-
- @staticmethod
- def version():
- return 1.0
-
-
-class SlaveLocal(object):
-
- def can_connect_to(self, remote):
- return not remote
-
- def service_loop(self):
- repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs))
- t = threading.Thread(target=repce.service_loop)
- t.setDaemon(True)
- t.start()
- logging.info("slave listening")
- if gconf.timeout and int(gconf.timeout) > 0:
- while True:
- lp = self.server.lastping
- time.sleep(int(gconf.timeout))
- if lp == self.server.lastping:
- logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout))
- break
- else:
- select.select((), (), ())
-
-class SlaveRemote(object):
-
- def connect_remote(self, rargs=[], **opts):
- slave = opts.get('slave', self.url)
- ix, ox = os.pipe()
- iy, oy = os.pipe()
- pid = os.fork()
- if not pid:
- os.close(ox)
- os.dup2(ix, sys.stdin.fileno())
- os.close(iy)
- os.dup2(oy, sys.stdout.fileno())
- argv = rargs + gconf.remote_gsyncd.split() + ['-N', '--listen', '--timeout', str(gconf.timeout), slave]
- os.execvp(argv[0], argv)
- os.close(ix)
- os.close(oy)
- return self.start_fd_client(iy, ox, **opts)
-
- def start_fd_client(self, i, o, **opts):
- self.server = RepceClient(i, o)
- rv = self.server.__version__()
- exrv = {'proto': repce.repce_version, 'object': Server.version()}
- da0 = (rv, exrv)
- da1 = ({}, {})
- for i in range(2):
- for k, v in da0[i].iteritems():
- da1[i][k] = int(v)
- if da1[0] != da1[1]:
- raise RuntimeError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv))
- if gconf.timeout and int(gconf.timeout) > 0:
- def pinger():
- while True:
- self.server.ping()
- time.sleep(int(gconf.timeout) * 0.5)
- t = threading.Thread(target=pinger)
- t.setDaemon(True)
- t.start()
-
- def rsync(self, files, *args):
- if not files:
- raise RuntimeError("no files to sync")
- logging.debug("files: " + ", ".join(files))
- argv = gconf.rsync_command.split() + gconf.rsync_extra.split() + ['-aR'] + files + list(args)
- return os.spawnvp(os.P_WAIT, argv[0], argv) == 0
-
-
-class AbstractUrl(object):
-
- def __init__(self, path, pattern):
- m = re.search(pattern, path)
- if not m:
- raise RuntimeError("malformed path")
- self.path = path
- return m.groups()
-
- def scheme(self):
- return type(self).__name__.lower()
-
- def canonical_path(self):
- return self.path
-
- def get_url(self, canonical=False):
- if canonical:
- pa = self.canonical_path()
- else:
- pa = self.path
- return "://".join((self.scheme(), pa))
-
- @property
- def url(self):
- return self.get_url()
-
-
- ### Concrete resource classes ###
-
-
-class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
-
- class FILEServer(Server):
- pass
-
- server = FILEServer
-
- def __init__(self, path):
- sup(self, path, '^/')
-
- def connect(self):
- os.chdir(self.path)
-
- def rsync(self, files):
- return sup(self, files, self.path)
-
-
-class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
-
- class GLUSTERServer(Server):
-
- @classmethod
- def volume_info(cls):
- vm = struct.unpack('!' + 'B'*19 + 'II',
- Xattr.lgetxattr('.', '.'.join([cls.GX_NSPACE, 'volume-mark']), 27))
- m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]]))
- uuid = '-'.join(m.groups())
- return { 'version': vm[0:2],
- 'uuid' : uuid,
- 'retval' : vm[18],
- 'volume_mark': vm[-2:] }
-
- server = GLUSTERServer
-
- def __init__(self, path):
- self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern)
-
- def canonical_path(self):
- return ':'.join([socket.gethostbyname(self.host), self.volume])
-
- def can_connect_to(self, remote):
- return True
-
- def connect(self):
- d = tempfile.mkdtemp()
- try:
- argv = [gconf.gluster_command] + \
- (gconf.gluster_log_level and ['-L', gConf.gluster_log_level] or []) + \
- ['-l', gconf.gluster_log_file, '-s', self.host,
- '--volfile-id', self.volume, '--client-pid=-1', d]
- if os.spawnvp(os.P_WAIT, argv[0], argv):
- raise RuntimeError("command failed: " + " ".join(argv))
- logging.debug('auxiliary glusterfs mount in place')
- os.chdir(d)
- argv = ['umount', '-l', d]
- if os.spawnvp(os.P_WAIT, argv[0], argv):
- raise RuntimeError("command failed: " + " ".join(argv))
- finally:
- try:
- os.rmdir(d)
- except:
- logging.warn('stale mount possibly left behind on ' + d)
- logging.debug('auxiliary glusterfs mount prepared')
-
- def connect_remote(self, *a, **kw):
- sup(self, *a, **kw)
- self.slavedir = "/proc/%d/cwd" % self.server.pid()
-
- def service_loop(self, *args):
- if args:
- GMaster(self, args[0]).crawl()
- else:
- sup(self, *args)
-
- def rsync(self, files):
- return sup(self, files, self.slavedir)
-
-
-class SSH(AbstractUrl, SlaveRemote):
-
- def __init__(self, path):
- self.remote_addr, inner_url = sup(self, path,
- '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ]))
- self.inner_rsc = parse_url(inner_url)
-
- def canonical_path(self):
- m = re.match('([^@]+)@(.+)', self.remote_addr)
- if m:
- u, h = m.groups()
- else:
- u, h = os.getlogin(), self.remote_addr
- remote_addr = '@'.join([u, socket.gethostbyname(h)])
- return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)])
-
- def can_connect_to(self, remote):
- return False
-
- def start_fd_client(self, *a, **opts):
- if opts['deferred']:
- return a
- sup(self, *a)
- ityp = type(self.inner_rsc)
- if ityp == FILE:
- slavepath = self.inner_rsc.path
- elif ityp == GLUSTER:
- slavepath = "/proc/%d/cwd" % self.server.pid()
- else:
- raise NotImplementedError
- self.slaveurl = ':'.join([self.remote_addr, slavepath])
-
- def connect_remote(self, go_daemon=None):
- if go_daemon == 'done':
- return self.start_fd_client(*self.fd_pair)
- gconf.setup_ssh_ctl(tempfile.mkdtemp())
- deferred = go_daemon == 'postconn'
- ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + [self.remote_addr], slave=self.inner_rsc.url, deferred=deferred)
- if deferred:
- # send a message to peer so that we can wait for
- # the answer from which we know connection is
- # established and we can proceed with daemonization
- # (doing that too early robs the ssh passwd prompt...)
- # However, we'd better not start the RepceClient
- # before daemonization (that's not preserved properly
- # in daemon), we just do a an ad-hoc linear put/get.
- i, o = ret
- inf = os.fdopen(i)
- repce.send(o, None, '__version__')
- select.select((inf,), (), ())
- repce.recv(inf)
- # hack hack hack: store a global reference to the file
- # to save it from getting GC'd which implies closing it
- gconf._in_fd_reference = inf
- self.fd_pair = (i, o)
- return 'should'
-
- def rsync(self, files):
- return sup(self, files, '-ze', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args), self.slaveurl)