diff options
| author | Avra Sengupta <asengupt@redhat.com> | 2013-05-27 22:23:57 +0530 | 
|---|---|---|
| committer | Vijay Bellur <vbellur@redhat.com> | 2013-07-22 01:53:03 -0700 | 
| commit | 950371be29d029179ac5cd0ad2dfdbfcd4467b96 (patch) | |
| tree | a979d3c710c25074088bd05cef5b6a0ede9505a9 /geo-replication/syncdaemon/syncdutils.py | |
| parent | 11f6c56f83b977a08f9d74563249cef59e22a05d (diff) | |
move 'xlators/marker/utils/' to 'geo-replication/' directory
Change-Id: Ibd0faefecc15b6713eda28bc96794ae58aff45aa
BUG: 847839
Original Author: Amar Tumballi <amarts@redhat.com>
Signed-off-by: Avra Sengupta <asengupt@redhat.com>
Reviewed-on: http://review.gluster.org/5133
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'geo-replication/syncdaemon/syncdutils.py')
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 288 | 
1 files changed, 288 insertions, 0 deletions
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py new file mode 100644 index 00000000000..0764c07904d --- /dev/null +++ b/geo-replication/syncdaemon/syncdutils.py @@ -0,0 +1,288 @@ +import os +import sys +import pwd +import time +import fcntl +import shutil +import logging +from threading import Lock, Thread as baseThread +from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, errorcode +from signal import signal, SIGTERM, SIGKILL +from time import sleep +import select as oselect +from os import waitpid as owaitpid +try: +    from cPickle import PickleError +except ImportError: +    # py 3 +    from pickle import PickleError + +from gconf import gconf + +try: +    # py 3 +    from urllib import parse as urllib +except ImportError: +    import urllib + +def escape(s): +    """the chosen flavor of string escaping, used all over +       to turn whatever data to creatable representation""" +    return urllib.quote_plus(s) + +def unescape(s): +    """inverse of .escape""" +    return urllib.unquote_plus(s) + +def norm(s): +    if s: +        return s.replace('-', '_') + +def update_file(path, updater, merger = lambda f: True): +    """update a file in a transaction-like manner""" + +    fr = fw = None +    try: +        fd = os.open(path, os.O_CREAT|os.O_RDWR) +        try: +            fr = os.fdopen(fd, 'r+b') +        except: +            os.close(fd) +            raise +        fcntl.lockf(fr, fcntl.LOCK_EX) +        if not merger(fr): +            return + +        tmpp = path + '.tmp.' + str(os.getpid()) +        fd = os.open(tmpp, os.O_CREAT|os.O_EXCL|os.O_WRONLY) +        try: +            fw = os.fdopen(fd, 'wb', 0) +        except: +            os.close(fd) +            raise +        updater(fw) +        os.fsync(fd) +        os.rename(tmpp, path) +    finally: +        for fx in (fr, fw): +            if fx: +                fx.close() + +def grabfile(fname, content=None): +    """open @fname + contest for its fcntl lock + +    @content: if given, set the file content to it +    """ +    # damn those messy open() mode codes +    fd = os.open(fname, os.O_CREAT|os.O_RDWR) +    f = os.fdopen(fd, 'r+b', 0) +    try: +        fcntl.lockf(f, fcntl.LOCK_EX|fcntl.LOCK_NB) +    except: +        ex = sys.exc_info()[1] +        f.close() +        if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN): +            # cannot grab, it's taken +            return +        raise +    if content: +        try: +            f.truncate() +            f.write(content) +        except: +            f.close() +            raise +    gconf.permanent_handles.append(f) +    return f + +def grabpidfile(fname=None, setpid=True): +    """.grabfile customization for pid files""" +    if not fname: +        fname = gconf.pid_file +    content = None +    if setpid: +        content = str(os.getpid()) + '\n' +    return grabfile(fname, content=content) + +final_lock = Lock() + +def finalize(*a, **kw): +    """all those messy final steps we go trough upon termination + +    Do away with pidfile, ssh control dir and logging. +    """ +    final_lock.acquire() +    if getattr(gconf, 'pid_file', None): +        rm_pidf = gconf.pid_file_owned +        if gconf.cpid: +            # exit path from parent branch of daemonization +            rm_pidf = False +            while True: +                f = grabpidfile(setpid=False) +                if not f: +                    # child has already taken over pidfile +                    break +                if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid: +                    # child has terminated +                    rm_pidf = True +                    break; +                time.sleep(0.1) +        if rm_pidf: +            try: +                os.unlink(gconf.pid_file) +            except: +                ex = sys.exc_info()[1] +                if ex.errno == ENOENT: +                    pass +                else: +                    raise +    if gconf.ssh_ctl_dir and not gconf.cpid: +        shutil.rmtree(gconf.ssh_ctl_dir) +    if getattr(gconf, 'state_socket', None): +        try: +            os.unlink(gconf.state_socket) +        except: +            if sys.exc_info()[0] == OSError: +                pass +    if gconf.log_exit: +        logging.info("exiting.") +    sys.stdout.flush() +    sys.stderr.flush() +    os._exit(kw.get('exval', 0)) + +def log_raise_exception(excont): +    """top-level exception handler + +    Try to some fancy things to cover up we face with an error. +    Translate some weird sounding but well understood exceptions +    into human-friendly lingo +    """ +    is_filelog = False +    for h in logging.getLogger().handlers: +        fno = getattr(getattr(h, 'stream', None), 'fileno', None) +        if fno and not os.isatty(fno()): +            is_filelog = True + +    exc = sys.exc_info()[1] +    if isinstance(exc, SystemExit): +        excont.exval = exc.code or 0 +        raise +    else: +        logtag = None +        if isinstance(exc, GsyncdError): +            if is_filelog: +                logging.error(exc.args[0]) +            sys.stderr.write('failure: ' + exc.args[0] + '\n') +        elif isinstance(exc, PickleError) or isinstance(exc, EOFError) or \ +             ((isinstance(exc, OSError) or isinstance(exc, IOError)) and \ +              exc.errno == EPIPE): +            logging.error('connection to peer is broken') +            if hasattr(gconf, 'transport'): +                gconf.transport.wait() +                if gconf.transport.returncode == 127: +                    logging.warn("!!!!!!!!!!!!!") +                    logging.warn('!!! getting "No such file or directory" errors ' +                                 "is most likely due to MISCONFIGURATION, please consult " +                                 "http://access.redhat.com/knowledge/docs/en-US/Red_Hat_Storage/2.0/html/Administration_Guide/chap-User_Guide-Geo_Rep-Preparation-Settingup_Environment.html") +                    logging.warn("!!!!!!!!!!!!!") +                gconf.transport.terminate_geterr() +        elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, ECONNABORTED): +            logging.error('glusterfs session went down [%s]', errorcode[exc.errno]) +        else: +            logtag = "FAIL" +        if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG): +            logtag = "FULL EXCEPTION TRACE" +        if logtag: +            logging.exception(logtag + ": ") +            sys.stderr.write("failed with %s.\n" % type(exc).__name__) +        excont.exval = 1 +        sys.exit(excont.exval) + + +class FreeObject(object): +    """wildcard class for which any attribute can be set""" + +    def __init__(self, **kw): +        for k,v in kw.items(): +            setattr(self, k, v) + +class Thread(baseThread): +    """thread class flavor for gsyncd + +    - always a daemon thread +    - force exit for whole program if thread +      function coughs up an exception +    """ +    def __init__(self, *a, **kw): +        tf = kw.get('target') +        if tf: +            def twrap(*aa): +                excont = FreeObject(exval = 0) +                try: +                    tf(*aa) +                except: +                    try: +                        log_raise_exception(excont) +                    finally: +                        finalize(exval = excont.exval) +            kw['target'] = twrap +        baseThread.__init__(self, *a, **kw) +        self.setDaemon(True) + +class GsyncdError(Exception): +    pass + +def getusername(uid = None): +    if uid == None: +        uid = os.geteuid() +    return pwd.getpwuid(uid).pw_name + +def privileged(): +    return os.geteuid() == 0 + +def boolify(s): +    """ +    Generic string to boolean converter + +    return +    - Quick return if string 's' is of type bool +    - True if it's in true_list +    - False if it's in false_list +    - Warn if it's not present in either and return False +    """ +    true_list  = ['true', 'yes', '1', 'on'] +    false_list = ['false', 'no', '0', 'off'] + +    if isinstance(s, bool): +        return s + +    rv = False +    lstr = s.lower() +    if lstr in true_list: +        rv = True +    elif not lstr in false_list: +        logging.warn("Unknown string (%s) in string to boolean conversion defaulting to False\n" % (s)) + +    return rv + +def eintr_wrap(func, exc, *a): +    """ +    wrapper around syscalls resilient to interrupt caused +    by signals +    """ +    while True: +        try: +            return func(*a) +        except exc: +            ex = sys.exc_info()[1] +            if not ex.args[0] == EINTR: +                raise + +def select(*a): +    return eintr_wrap(oselect.select, oselect.error, *a) + +def waitpid (*a): +    return eintr_wrap(owaitpid, OSError, *a) + +def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})): +    signal(SIGTERM, hook)  | 
