diff options
| author | Avra Sengupta <asengupt@redhat.com> | 2013-05-27 22:23:57 +0530 | 
|---|---|---|
| committer | Vijay Bellur <vbellur@redhat.com> | 2013-07-22 01:53:03 -0700 | 
| commit | 950371be29d029179ac5cd0ad2dfdbfcd4467b96 (patch) | |
| tree | a979d3c710c25074088bd05cef5b6a0ede9505a9 /geo-replication/syncdaemon/gsyncd.py | |
| parent | 11f6c56f83b977a08f9d74563249cef59e22a05d (diff) | |
move 'xlators/marker/utils/' to 'geo-replication/' directory
Change-Id: Ibd0faefecc15b6713eda28bc96794ae58aff45aa
BUG: 847839
Original Author: Amar Tumballi <amarts@redhat.com>
Signed-off-by: Avra Sengupta <asengupt@redhat.com>
Reviewed-on: http://review.gluster.org/5133
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'geo-replication/syncdaemon/gsyncd.py')
| -rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 419 | 
1 files changed, 419 insertions, 0 deletions
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py new file mode 100644 index 00000000000..387900e6ce8 --- /dev/null +++ b/geo-replication/syncdaemon/gsyncd.py @@ -0,0 +1,419 @@ +#!/usr/bin/env python + +import os +import os.path +import sys +import time +import logging +import signal +import optparse +import fcntl +import fnmatch +from optparse import OptionParser, SUPPRESS_HELP +from logging import Logger +from errno import EEXIST, ENOENT + +from ipaddr import IPAddress, IPNetwork + +from gconf import gconf +from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception +from syncdutils import GsyncdError, select, set_term_handler, privileged +from configinterface import GConffile +import resource +from monitor import monitor + +class GLogger(Logger): +    """Logger customizations for gsyncd. + +    It implements a log format similar to that of glusterfs. +    """ + +    def makeRecord(self, name, level, *a): +        rv = Logger.makeRecord(self, name, level, *a) +        rv.nsecs = (rv.created - int(rv.created)) * 1000000 +        fr = sys._getframe(4) +        callee = fr.f_locals.get('self') +        if callee: +            ctx = str(type(callee)).split("'")[1].split('.')[-1] +        else: +            ctx = '<top>' +        if not hasattr(rv, 'funcName'): +            rv.funcName = fr.f_code.co_name +        rv.lvlnam = logging.getLevelName(level)[0] +        rv.ctx = ctx +        return rv + +    @classmethod +    def setup(cls, **kw): +        lbl = kw.get('label', "") +        if lbl: +            lbl = '(' + lbl + ')' +        lprm = {'datefmt': "%Y-%m-%d %H:%M:%S", +                'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"} +        lprm.update(kw) +        lvl = kw.get('level', logging.INFO) +        lprm['level'] = lvl +        logging.root = cls("root", lvl) +        logging.setLoggerClass(cls) +        logging.getLogger().handlers = [] +        logging.basicConfig(**lprm) + +    @classmethod +    def _gsyncd_loginit(cls, **kw): +        lkw = {} +        if gconf.log_level: +            lkw['level'] = gconf.log_level +        if kw.get('log_file'): +            if kw['log_file'] in ('-', '/dev/stderr'): +                lkw['stream'] = sys.stderr +            elif kw['log_file'] == '/dev/stdout': +                lkw['stream'] = sys.stdout +            else: +                lkw['filename'] = kw['log_file'] + +        cls.setup(label=kw.get('label'), **lkw) + +        lkw.update({'saved_label': kw.get('label')}) +        gconf.log_metadata = lkw +        gconf.log_exit = True + +def startup(**kw): +    """set up logging, pidfile grabbing, daemonization""" +    if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn': +        if not grabpidfile(): +            sys.stderr.write("pidfile is taken, exiting.\n") +            sys.exit(2) +        gconf.pid_file_owned = True + +    if kw.get('go_daemon') == 'should': +        x, y = os.pipe() +        gconf.cpid = os.fork() +        if gconf.cpid: +            os.close(x) +            sys.exit() +        os.close(y) +        os.setsid() +        dn = os.open(os.devnull, os.O_RDWR) +        for f in (sys.stdin, sys.stdout, sys.stderr): +            os.dup2(dn, f.fileno()) +        if getattr(gconf, 'pid_file', None): +            if not grabpidfile(gconf.pid_file + '.tmp'): +                raise GsyncdError("cannot grab temporary pidfile") +            os.rename(gconf.pid_file + '.tmp', gconf.pid_file) +        # wait for parent to terminate +        # so we can start up with +        # no messing from the dirty +        # ol' bustard +        select((x,), (), ()) +        os.close(x) + +    GLogger._gsyncd_loginit(**kw) + +def main(): +    """main routine, signal/exception handling boilerplates""" +    gconf.starttime = time.time() +    set_term_handler() +    GLogger.setup() +    excont = FreeObject(exval = 0) +    try: +        try: +            main_i() +        except: +            log_raise_exception(excont) +    finally: +        finalize(exval = excont.exval) + +def main_i(): +    """internal main routine + +    parse command line, decide what action will be taken; +    we can either: +    - query/manipulate configuration +    - format gsyncd urls using gsyncd's url parsing engine +    - start service in following modes, in given stages: +      - monitor: startup(), monitor() +      - master: startup(), connect_remote(), connect(), service_loop() +      - slave: startup(), connect(), service_loop() +    """ +    rconf = {'go_daemon': 'should'} + +    def store_abs(opt, optstr, val, parser): +        if val and val != '-': +            val = os.path.abspath(val) +        setattr(parser.values, opt.dest, val) +    def store_local(opt, optstr, val, parser): +        rconf[opt.dest] = val +    def store_local_curry(val): +        return lambda o, oo, vx, p: store_local(o, oo, val, p) +    def store_local_obj(op, dmake): +        return lambda o, oo, vx, p: store_local(o, oo, FreeObject(op=op, **dmake(vx)), p) + +    op = OptionParser(usage="%prog [options...] <master> <slave>", version="%prog 0.0.1") +    op.add_option('--gluster-command-dir', metavar='DIR',   default='') +    op.add_option('--gluster-log-file',    metavar='LOGF',  default=os.devnull, type=str, action='callback', callback=store_abs) +    op.add_option('--gluster-log-level',   metavar='LVL') +    op.add_option('--gluster-params',      metavar='PRMS',  default='') +    op.add_option('--gluster-cli-options', metavar='OPTS',  default='--log-file=-') +    op.add_option('--mountbroker',         metavar='LABEL') +    op.add_option('-p', '--pid-file',      metavar='PIDF',  type=str, action='callback', callback=store_abs) +    op.add_option('-l', '--log-file',      metavar='LOGF',  type=str, action='callback', callback=store_abs) +    op.add_option('--log-file-mbr',        metavar='LOGF',  type=str, action='callback', callback=store_abs) +    op.add_option('--state-file',          metavar='STATF', type=str, action='callback', callback=store_abs) +    op.add_option('--ignore-deletes',      default=False, action='store_true') +    op.add_option('--use-rsync-xattrs',    default=False, action='store_true') +    op.add_option('-L', '--log-level',     metavar='LVL') +    op.add_option('-r', '--remote-gsyncd', metavar='CMD',   default=os.path.abspath(sys.argv[0])) +    op.add_option('--volume-id',           metavar='UUID') +    op.add_option('--session-owner',       metavar='ID') +    op.add_option('-s', '--ssh-command',   metavar='CMD',   default='ssh') +    op.add_option('--rsync-command',       metavar='CMD',   default='rsync') +    op.add_option('--rsync-options',       metavar='OPTS',  default='--sparse') +    op.add_option('--rsync-ssh-options',   metavar='OPTS',  default='--compress') +    op.add_option('--timeout',             metavar='SEC',   type=int, default=120) +    op.add_option('--connection-timeout',  metavar='SEC',   type=int, default=60, help=SUPPRESS_HELP) +    op.add_option('--sync-jobs',           metavar='N',     type=int, default=3) +    op.add_option('--turns',               metavar='N',     type=int, default=0, help=SUPPRESS_HELP) +    op.add_option('--allow-network',       metavar='IPS',   default='') +    op.add_option('--socketdir',           metavar='DIR') +    op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs) +    op.add_option('--checkpoint',          metavar='LABEL', default='') +    # tunables for failover/failback mechanism: +    # None   - gsyncd behaves as normal +    # blind  - gsyncd works with xtime pairs to identify +    #          candidates for synchronization +    # wrapup - same as normal mode but does not assign +    #          xtimes to orphaned files +    # see crawl() for usage of the above tunables +    op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP) + +    op.add_option('-c', '--config-file',   metavar='CONF',  type=str, action='callback', callback=store_local) +    # duh. need to specify dest or value will be mapped to None :S +    op.add_option('--monitor', dest='monitor', action='callback', callback=store_local_curry(True)) +    op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local) +    op.add_option('--listen', dest='listen', help=SUPPRESS_HELP,      action='callback', callback=store_local_curry(True)) +    op.add_option('-N', '--no-daemon', dest="go_daemon",    action='callback', callback=store_local_curry('dont')) +    op.add_option('--debug', dest="go_daemon",              action='callback', callback=lambda *a: (store_local_curry('dont')(*a), +                                                                                                    setattr(a[-1].values, 'log_file', '-'), +                                                                                                    setattr(a[-1].values, 'log_level', 'DEBUG'))), + +    for a in ('check', 'get'): +        op.add_option('--config-' + a,      metavar='OPT',  type=str, dest='config', action='callback', +                      callback=store_local_obj(a, lambda vx: {'opt': vx})) +    op.add_option('--config-get-all', dest='config', action='callback', callback=store_local_obj('get', lambda vx: {'opt': None})) +    for m in ('', '-rx', '-glob'): +        # call this code 'Pythonic' eh? +        # have to define a one-shot local function to be able to inject (a value depending on the) +        # iteration variable into the inner lambda +        def conf_mod_opt_regex_variant(rx): +            op.add_option('--config-set' + m,   metavar='OPT VAL', type=str, nargs=2, dest='config', action='callback', +                          callback=store_local_obj('set', lambda vx: {'opt': vx[0], 'val': vx[1], 'rx': rx})) +            op.add_option('--config-del' + m,   metavar='OPT',  type=str, dest='config', action='callback', +                          callback=store_local_obj('del', lambda vx: {'opt': vx, 'rx': rx})) +        conf_mod_opt_regex_variant(m and m[1:] or False) + +    op.add_option('--normalize-url',           dest='url_print', action='callback', callback=store_local_curry('normal')) +    op.add_option('--canonicalize-url',        dest='url_print', action='callback', callback=store_local_curry('canon')) +    op.add_option('--canonicalize-escape-url', dest='url_print', action='callback', callback=store_local_curry('canon_esc')) + +    tunables = [ norm(o.get_opt_string()[2:]) for o in op.option_list if o.callback in (store_abs, 'store_true', None) and o.get_opt_string() not in ('--version', '--help') ] +    remote_tunables = [ 'listen', 'go_daemon', 'timeout', 'session_owner', 'config_file', 'use_rsync_xattrs' ] +    rq_remote_tunables = { 'listen': True } + +    # precedence for sources of values: 1) commandline, 2) cfg file, 3) defaults +    # -- for this to work out we need to tell apart defaults from explicitly set +    # options... so churn out the defaults here and call the parser with virgin +    # values container. +    defaults = op.get_default_values() +    opts, args = op.parse_args(values=optparse.Values()) +    confdata = rconf.get('config') +    if not (len(args) == 2 or \ +            (len(args) == 1 and rconf.get('listen')) or \ +            (len(args) <= 2 and confdata) or \ +            rconf.get('url_print')): +        sys.stderr.write("error: incorrect number of arguments\n\n") +        sys.stderr.write(op.get_usage() + "\n") +        sys.exit(1) + +    restricted = os.getenv('_GSYNCD_RESTRICTED_') + +    if restricted: +        allopts = {} +        allopts.update(opts.__dict__) +        allopts.update(rconf) +        bannedtuns = set(allopts.keys()) - set(remote_tunables) +        if bannedtuns: +            raise GsyncdError('following tunables cannot be set with restricted SSH invocaton: ' + \ +                              ', '.join(bannedtuns)) +        for k, v in rq_remote_tunables.items(): +            if not k in allopts or allopts[k] != v: +                raise GsyncdError('tunable %s is not set to value %s required for restricted SSH invocaton' % \ +                                  (k, v)) + +    confrx = getattr(confdata, 'rx', None) +    if confrx: +        # peers are regexen, don't try to parse them +        if confrx == 'glob': +            args = [ '\A' + fnmatch.translate(a) for a in args ] +        canon_peers = args +        namedict = {} +    else: +        rscs = [resource.parse_url(u) for u in args] +        dc = rconf.get('url_print') +        if dc: +            for r in rscs: +                print(r.get_url(**{'normal': {}, +                                   'canon': {'canonical': True}, +                                   'canon_esc': {'canonical': True, 'escaped': True}}[dc])) +            return +        local = remote = None +        if rscs: +            local = rscs[0] +            if len(rscs) > 1: +                remote = rscs[1] +            if not local.can_connect_to(remote): +                raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path)) +        pa = ([], [], []) +        urlprms = ({}, {'canonical': True}, {'canonical': True, 'escaped': True}) +        for x in rscs: +            for i in range(len(pa)): +                pa[i].append(x.get_url(**urlprms[i])) +        peers, canon_peers, canon_esc_peers = pa +        # creating the namedict, a dict representing various ways of referring to / repreenting +        # peers to be fillable in config templates +        mods = (lambda x: x, lambda x: x[0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:]) +        if remote: +            rmap = { local: ('local', 'master'), remote: ('remote', 'slave') } +        else: +            rmap = { local: ('local', 'slave') } +        namedict = {} +        for i in range(len(rscs)): +            x = rscs[i] +            for name in rmap[x]: +                for j in range(3): +                    namedict[mods[j](name)] = pa[j][i] +                if x.scheme == 'gluster': +                    namedict[name + 'vol'] = x.volume +    if not 'config_file' in rconf: +        rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd.conf") +    gcnf = GConffile(rconf['config_file'], canon_peers, defaults.__dict__, opts.__dict__, namedict) + +    checkpoint_change = False +    if confdata: +        opt_ok = norm(confdata.opt) in tunables + [None] +        if confdata.op == 'check': +            if opt_ok: +                sys.exit(0) +            else: +                sys.exit(1) +        elif not opt_ok: +            raise GsyncdError("not a valid option: " + confdata.opt) +        if confdata.op == 'get': +            gcnf.get(confdata.opt) +        elif confdata.op == 'set': +            gcnf.set(confdata.opt, confdata.val, confdata.rx) +        elif confdata.op == 'del': +            gcnf.delete(confdata.opt, confdata.rx) +        # when modifying checkpoint, it's important to make a log +        # of that, so in that case we go on to set up logging even +        # if its just config invocation +        if confdata.opt == 'checkpoint' and confdata.op in ('set', 'del') and \ +           not confdata.rx: +            checkpoint_change = True +        if not checkpoint_change: +            return + +    gconf.__dict__.update(defaults.__dict__) +    gcnf.update_to(gconf.__dict__) +    gconf.__dict__.update(opts.__dict__) +    gconf.configinterface = gcnf + +    if restricted and gconf.allow_network: +        ssh_conn = os.getenv('SSH_CONNECTION') +        if not ssh_conn: +            #legacy env var +            ssh_conn = os.getenv('SSH_CLIENT') +        if ssh_conn: +            allowed_networks = [ IPNetwork(a) for a in gconf.allow_network.split(',') ] +            client_ip = IPAddress(ssh_conn.split()[0]) +            allowed = False +            for nw in allowed_networks: +                if client_ip in nw: +                    allowed = True +                    break +            if not allowed: +                raise GsyncdError("client IP address is not allowed") + +    ffd = rconf.get('feedback_fd') +    if ffd: +        fcntl.fcntl(ffd, fcntl.F_SETFD, fcntl.FD_CLOEXEC) + +    #normalize loglevel +    lvl0 = gconf.log_level +    if isinstance(lvl0, str): +        lvl1 = lvl0.upper() +        lvl2 = logging.getLevelName(lvl1) +        # I have _never_ _ever_ seen such an utterly braindead +        # error condition +        if lvl2 == "Level " + lvl1: +            raise GsyncdError('cannot recognize log level "%s"' % lvl0) +        gconf.log_level = lvl2 + +    if not privileged() and gconf.log_file_mbr: +        gconf.log_file = gconf.log_file_mbr + +    if checkpoint_change: +        try: +            GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf') +            if confdata.op == 'set': +                logging.info('checkpoint %s set' % confdata.val) +            elif confdata.op == 'del': +                logging.info('checkpoint info was reset') +        except IOError: +            if sys.exc_info()[1].errno == ENOENT: +                # directory of log path is not present, +                # which happens if we get here from +                # a peer-multiplexed "config-set checkpoint" +                # (as that directory is created only on the +                # original node) +                pass +            else: +                raise +        return + +    go_daemon = rconf['go_daemon'] +    be_monitor = rconf.get('monitor') + +    if not be_monitor and isinstance(remote, resource.SSH) and \ +       go_daemon == 'should': +        go_daemon = 'postconn' +        log_file = None +    else: +        log_file = gconf.log_file +    if be_monitor: +        label = 'monitor' +    elif remote: +        #master +        label = '' +    else: +        label = 'slave' +    startup(go_daemon=go_daemon, log_file=log_file, label=label) + +    if be_monitor: +        return monitor() + +    logging.info("syncing: %s" % " -> ".join(peers)) +    resource.Popen.init_errhandler() +    if remote: +        go_daemon = remote.connect_remote(go_daemon=go_daemon) +        if go_daemon: +            startup(go_daemon=go_daemon, log_file=gconf.log_file) +            # complete remote connection in child +            remote.connect_remote(go_daemon='done') +    local.connect() +    if ffd: +        os.close(ffd) +    local.service_loop(*[r for r in [remote] if r]) + + +if __name__ == "__main__": +    main()  | 
