summaryrefslogtreecommitdiffstats
path: root/xlators/features/marker/utils/syncdaemon/gsyncd.py
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/marker/utils/syncdaemon/gsyncd.py')
-rw-r--r--xlators/features/marker/utils/syncdaemon/gsyncd.py230
1 files changed, 230 insertions, 0 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py
new file mode 100644
index 00000000000..f84df502185
--- /dev/null
+++ b/xlators/features/marker/utils/syncdaemon/gsyncd.py
@@ -0,0 +1,230 @@
+#!/usr/bin/env python
+
+import os
+import os.path
+import sys
+import time
+import logging
+import signal
+import select
+import shutil
+import optparse
+from optparse import OptionParser, SUPPRESS_HELP
+from logging import Logger
+from errno import EEXIST, ENOENT
+
+from gconf import gconf
+import resource
+from simplecfg import SimpleCfg
+
+class GLogger(Logger):
+
+ def makeRecord(self, name, level, *a):
+ rv = Logger.makeRecord(self, name, level, *a)
+ rv.nsecs = (rv.created - int(rv.created)) * 1000000
+ fr = sys._getframe(4)
+ callee = fr.f_locals.get('self')
+ if callee:
+ ctx = str(type(callee)).split("'")[1].split('.')[-1]
+ else:
+ ctx = '<top>'
+ if not hasattr(rv, 'funcName'):
+ rv.funcName = fr.f_code.co_name
+ rv.lvlnam = logging.getLevelName(level)[0]
+ rv.ctx = ctx
+ return rv
+
+ @classmethod
+ def setup(cls, **kw):
+ lprm = {'datefmt': "%Y-%m-%d %H:%M:%S",
+ 'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s:%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"}
+ lprm.update(kw)
+ lvl = kw.get('level', logging.INFO)
+ if isinstance(lvl, str):
+ lvl = logging.getLevelName(lvl)
+ lprm['level'] = lvl
+ logging.root = cls("root", lvl)
+ logging.setLoggerClass(cls)
+ logging.getLogger().handlers = []
+ logging.basicConfig(**lprm)
+
+
+def startup(**kw):
+ def write_pid(fn):
+ fd = None
+ try:
+ fd = os.open(fn, os.O_CREAT|os.O_TRUNC|os.O_WRONLY|os.O_EXCL)
+ os.write(fd, str(os.getpid()) + '\n')
+ finally:
+ if fd:
+ os.close(fd)
+
+ if kw.get('go_daemon') == 'should':
+ x, y = os.pipe()
+ gconf.cpid = os.fork()
+ if gconf.cpid:
+ os.close(x)
+ sys.exit()
+ os.close(y)
+ # wait for parent to terminate
+ # so we can start up with
+ # no messing from the dirty
+ # ol' bustard
+ select.select((x,), (), ())
+ os.close(x)
+ if getattr(gconf, 'pid_file', None):
+ write_pid(gconf.pid_file + '.tmp')
+ os.rename(gconf.pid_file + '.tmp', gconf.pid_file)
+ os.setsid()
+ dn = os.open(os.devnull, os.O_RDWR)
+ for f in (sys.stdin, sys.stdout, sys.stderr):
+ os.dup2(dn, f.fileno())
+ elif getattr(gconf, 'pid_file', None):
+ try:
+ write_pid(gconf.pid_file)
+ except OSError:
+ gconf.pid_file = None
+ ex = sys.exc_info()[1]
+ if ex.errno == EEXIST:
+ sys.stderr.write("pidfile is taken, exiting.\n")
+ exit(2)
+ raise
+
+ lkw = {'level': gconf.log_level}
+ if kw.get('log_file'):
+ lkw['filename'] = kw['log_file']
+ GLogger.setup(**lkw)
+
+def finalize(*a):
+ if getattr(gconf, 'pid_file', None):
+ if gconf.cpid:
+ while True:
+ f = open(gconf.pid_file)
+ pid = f.read()
+ f.close()
+ pid = int(pid.strip())
+ if pid == gconf.cpid:
+ break
+ if pid != os.getpid():
+ raise RuntimeError("corrupt pidfile")
+ if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid:
+ break;
+ time.sleep(0.1)
+ else:
+ try:
+ os.unlink(gconf.pid_file)
+ except:
+ ex = sys.exc_info()[1]
+ if ex.errno == ENOENT:
+ pass
+ else:
+ raise
+ if gconf.ssh_ctl_dir and not gconf.cpid:
+ shutil.rmtree(gconf.ssh_ctl_dir)
+
+def main():
+ # ??? "finally" clause does not take effect with SIGTERM...
+ # but this handler neither does
+ # signal.signal(signal.SIGTERM, finalize)
+ GLogger.setup()
+ try:
+ try:
+ main_i()
+ except:
+ exc = sys.exc_info()[0]
+ if exc != SystemExit:
+ logging.exception("FAIL: ")
+ sys.stderr.write("failed with %s.\n" % exc.__name__)
+ exit(1)
+ finally:
+ finalize()
+
+def main_i():
+ rconf = {'go_daemon': 'should'}
+
+ def store_abs(opt, optstr, val, parser):
+ setattr(parser.values, opt.dest, os.path.abspath(val))
+ def store_local(opt, optstr, val, parser):
+ rconf[opt.dest] = val
+ def store_local_curry(val):
+ return lambda o, oo, vx, p: store_local(o, oo, val, p)
+
+ op = OptionParser(usage="%prog [options...] <master> <slave>", version="%prog 0.0.1")
+ op.add_option('--gluster-command', metavar='CMD', default='glusterfs')
+ op.add_option('--gluster-log-file', metavar='LOGF', default=os.devnull, type=str, action='callback', callback=store_abs)
+ op.add_option('--gluster-log-level', metavar='LVL')
+ op.add_option('-p', '--pid-file', metavar='PIDF', type=str, action='callback', callback=store_abs)
+ op.add_option('-l', '--log-file', metavar='LOGF', type=str, action='callback', callback=store_abs)
+ op.add_option('-L', '--log-level', metavar='LVL')
+ op.add_option('-r', '--remote-gsyncd', metavar='CMD', default=os.path.abspath(sys.argv[0]))
+ op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh')
+ op.add_option('--rsync-command', metavar='CMD', default='rsync')
+ op.add_option('--rsync-extra', metavar='ARGS', default='-sS', help=SUPPRESS_HELP)
+ op.add_option('--timeout', metavar='SEC', type=int, default=30)
+ op.add_option('--sync-jobs', metavar='N', type=int, default=3)
+ op.add_option('--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP)
+
+ op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local)
+ # duh. need to specify dest or value will be mapped to None :S
+ op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True))
+ op.add_option('-N', '--no-daemon', dest="go_daemon", action='callback', callback=store_local_curry('dont'))
+ op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a),
+ a[-1].values.__dict__.get('log_level') or \
+ a[-1].values.__dict__.update(log_level='DEBUG')))
+ # precedence for sources of values: 1) commandline, 2) cfg file, 3) defaults
+ # -- for this to work out we need to tell apart defaults from explicitly set
+ # options... so churn out the defaults here and call the parser with virgin
+ # values container.
+ defaults = op.get_default_values()
+ opts, args = op.parse_args(values=optparse.Values())
+ if not (len(args) == 2 or (len(args) == 1 and rconf.get('listen'))):
+ sys.stderr.write("error: incorrect number of arguments\n\n")
+ sys.stderr.write(op.get_usage() + "\n")
+ sys.exit(1)
+
+ gconf.__dict__.update(defaults.__dict__)
+ # XXX add global config support
+ if not 'config_file' in rconf:
+ rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd.conf")
+ try:
+ cfg = SimpleCfg()
+ cfg.read(rconf['config_file'])
+ gconf.__dict__.update(cfg)
+ except IOError:
+ ex = sys.exc_info()[1]
+ if ex.errno != ENOENT:
+ raise
+ gconf.__dict__.update(opts.__dict__)
+
+ local = resource.parse_url(args[0])
+ remote = None
+ if len(args) > 1:
+ remote = resource.parse_url(args[1])
+
+ if not local.can_connect_to(remote):
+ raise RuntimeError("%s cannot work with %s" % (local.path, remote and remote.path))
+
+ go_daemon = rconf['go_daemon']
+
+ if isinstance(remote, resource.SSH) and go_daemon == 'should':
+ go_daemon = 'postconn'
+ log_file = None
+ else:
+ log_file = gconf.log_file
+ startup(go_daemon=go_daemon, log_file=log_file)
+
+ logging.info("syncing: %s" % " -> ".join([x.url for x in [local, remote] if x]))
+ if remote:
+ go_daemon = remote.connect_remote(go_daemon=go_daemon)
+ if go_daemon:
+ startup(go_daemon=go_daemon, log_file=gconf.log_file)
+ # complete remote connection in child
+ remote.connect_remote(go_daemon='done')
+ local.connect()
+ local.service_loop(*[r for r in [remote] if r])
+
+ logging.info("exiting.")
+
+
+if __name__ == "__main__":
+ main()