diff options
Diffstat (limited to 'extras/geo-rep/schedule_georep.py')
| -rw-r--r-- | extras/geo-rep/schedule_georep.py | 475 | 
1 files changed, 475 insertions, 0 deletions
diff --git a/extras/geo-rep/schedule_georep.py b/extras/geo-rep/schedule_georep.py new file mode 100644 index 00000000000..9b9b131f474 --- /dev/null +++ b/extras/geo-rep/schedule_georep.py @@ -0,0 +1,475 @@ +#!/usr/bin/env python +""" +Schedule Geo-replication +------------------------ +A tool to run Geo-replication when required. This can be used to +schedule the Geo-replication to run once in a day using + +   # Run daily at 08:30pm +   30 20 * * * root python /usr/share/glusterfs/scripts/schedule_georep.py \\ +      --no-color gv1 fvm1 gv2 >> /var/log/glusterfs/schedule_georep.log 2>&1 + +This tool does the following, + +1. Stop Geo-replication if Started +2. Start Geo-replication +3. Set Checkpoint +4. Check the Status and see Checkpoint is Complete.(LOOP) +5. If checkpoint complete, Stop Geo-replication + +Usage: + +    python /usr/share/glusterfs/scripts/schedule_georep.py <MASTERVOL> \\ +         <SLAVEHOST> <SLAVEVOL> + +For example, + +    python /usr/share/glusterfs/scripts/schedule_georep.py gv1 fvm1 gv2 + +""" +import subprocess +import time +import xml.etree.cElementTree as etree +import sys +from contextlib import contextmanager +import tempfile +import os +from argparse import ArgumentParser, RawDescriptionHelpFormatter + +ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError +cache_data = {} + +SESSION_MOUNT_LOG_FILE = ("/var/log/glusterfs/geo-replication" +                          "/schedule_georep.mount.log") + +USE_CLI_COLOR = True + + +class GlusterBadXmlFormat(Exception): +    """ +    Exception class for XML Parse Errors +    """ +    pass + + +def output_notok(msg, err="", exitcode=1): +    if USE_CLI_COLOR: +        out = "\033[31m[NOT OK]\033[0m {0}\n{1}\n" +    else: +        out = "[NOT OK] {0}\n{1}\n" +    sys.stderr.write(out.format(msg, err)) +    sys.exit(exitcode) + + +def output_warning(msg): +    if USE_CLI_COLOR: +        out = "\033[33m[  WARN]\033[0m {0}\n" +    else: +        out = "[  WARN] {0}\n" +    sys.stderr.write(out.format(msg)) + + +def output_ok(msg): +    if USE_CLI_COLOR: +        out = "\033[32m[    OK]\033[0m {0}\n" +    else: +        out = "[    OK] {0}\n" +    sys.stderr.write(out.format(msg)) + + +def execute(cmd, success_msg="", failure_msg="", exitcode=-1): +    """ +    Generic wrapper to execute the CLI commands. Returns Output if success. +    On success it can print message in stdout if specified. +    On failure, exits after writing to stderr. +    """ +    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) +    out, err = p.communicate() +    if p.returncode == 0: +        if success_msg: +            output_ok(success_msg) +        return out +    else: +        err_msg = err if err else out +        output_notok(failure_msg, err=err_msg, exitcode=exitcode) + + +def cache_output_with_args(func): +    """ +    Decorator function to remember the output of any function +    """ +    def wrapper(*args, **kwargs): +        global cache_data +        key = "_".join([func.func_name] + list(args)) +        if cache_data.get(key, None) is None: +            cache_data[key] = func(*args, **kwargs) + +        return cache_data[key] +    return wrapper + + +def cleanup(hostname, volname, mnt): +    """ +    Unmount the Volume and Remove the temporary directory +    """ +    execute(["umount", mnt], +            failure_msg="Unable to Unmount Gluster Volume " +            "{0}:{1}(Mounted at {2})".format(hostname, volname, mnt)) +    execute(["rmdir", mnt], +            failure_msg="Unable to Remove temp directory " +            "{0}".format(mnt)) + + +@contextmanager +def glustermount(hostname, volname): +    """ +    Context manager for Mounting Gluster Volume +    Use as +        with glustermount(HOSTNAME, VOLNAME) as MNT: +            # Do your stuff +    Automatically unmounts it in case of Exceptions/out of context +    """ +    mnt = tempfile.mkdtemp(prefix="georepsetup_") +    execute(["glusterfs", +             "--xlator-option=\"*dht.lookup-unhashed=off\"", +             "--volfile-server", hostname, +             "--volfile-id", volname, +             "-l", SESSION_MOUNT_LOG_FILE, +             "--client-pid=-1", +             mnt], +            failure_msg="Unable to Mount Gluster Volume " +            "{0}:{1}".format(hostname, volname)) +    if os.path.ismount(mnt): +        yield mnt +    else: +        output_notok("Unable to Mount Gluster Volume " +                     "{0}:{1}".format(hostname, volname)) +    cleanup(hostname, volname, mnt) + + +@cache_output_with_args +def get_bricks(volname): +    """ +    Returns Bricks list, caches the Bricks list for a volume once +    parsed. +    """ +    value = [] +    cmd = ["gluster", "volume", "info", volname, "--xml"] +    info = execute(cmd) +    try: +        tree = etree.fromstring(info) +        volume_el = tree.find('volInfo/volumes/volume') +        for b in volume_el.findall('bricks/brick'): +            value.append({"name": b.find("name").text, +                          "hostUuid": b.find("hostUuid").text}) +    except ParseError: +        raise GlusterBadXmlFormat("Bad XML Format: %s" % " ".join(cmd)) + +    return value + + +def get_georep_status(mastervol, slave): +    session_keys = set() +    out = {} +    cmd = ["gluster", "volume", "geo-replication"] +    if mastervol is not None: +        cmd += [mastervol] +        if slave: +            cmd += [slave] + +    cmd += ["status", "--xml"] +    info = execute(cmd) + +    try: +        tree = etree.fromstring(info) +        # Get All Sessions +        for volume_el in tree.findall("geoRep/volume"): +            sessions_el = volume_el.find("sessions") +            # Master Volume name if multiple Volumes +            mvol = volume_el.find("name").text + +            # For each session, collect the details +            for session in sessions_el.findall("session"): +                session_slave = "{0}:{1}".format(mvol, session.find( +                    "session_slave").text) +                session_keys.add(session_slave) +                out[session_slave] = {} + +                for pair in session.findall('pair'): +                    master_brick = "{0}:{1}".format( +                        pair.find("master_node").text, +                        pair.find("master_brick").text +                    ) + +                    out[session_slave][master_brick] = { +                        "mastervol": mvol, +                        "slavevol": pair.find("slave").text.split("::")[-1], +                        "master_node": pair.find("master_node").text, +                        "master_brick": pair.find("master_brick").text, +                        "slave_user": pair.find("slave_user").text, +                        "slave": pair.find("slave").text, +                        "slave_node": pair.find("slave_node").text, +                        "status": pair.find("status").text, +                        "crawl_status": pair.find("crawl_status").text, +                        "entry": pair.find("entry").text, +                        "data": pair.find("data").text, +                        "meta": pair.find("meta").text, +                        "failures": pair.find("failures").text, +                        "checkpoint_completed": pair.find( +                            "checkpoint_completed").text, +                        "master_node_uuid": pair.find("master_node_uuid").text, +                        "last_synced": pair.find("last_synced").text, +                        "checkpoint_time": pair.find("checkpoint_time").text, +                        "checkpoint_completion_time": +                        pair.find("checkpoint_completion_time").text +                    } +    except ParseError: +        raise GlusterBadXmlFormat("Bad XML Format: %s" % " ".join(cmd)) + +    return session_keys, out + + +def get_offline_status(volname, brick, node_uuid, slave): +    node, brick = brick.split(":") +    if "@" not in slave: +        slave_user = "root" +    else: +        slave_user, _ = slave.split("@") + +    return { +        "mastervol": volname, +        "slavevol": slave.split("::")[-1], +        "master_node": node, +        "master_brick": brick, +        "slave_user": slave_user, +        "slave": slave, +        "slave_node": "N/A", +        "status": "Offline", +        "crawl_status": "N/A", +        "entry": "N/A", +        "data": "N/A", +        "meta": "N/A", +        "failures": "N/A", +        "checkpoint_completed": "N/A", +        "master_node_uuid": node_uuid, +        "last_synced": "N/A", +        "checkpoint_time": "N/A", +        "checkpoint_completion_time": "N/A" +    } + + +def get(mastervol=None, slave=None): +    """ +    This function gets list of Bricks of Master Volume and collects +    respective Geo-rep status. Output will be always ordered as the +    bricks list in Master Volume. If Geo-rep status is not available +    for any brick then it updates OFFLINE status. +    """ +    out = [] +    session_keys, gstatus = get_georep_status(mastervol, slave) + +    for session in session_keys: +        mvol, _, slave = session.split(":", 2) +        slave = slave.replace("ssh://", "") +        master_bricks = get_bricks(mvol) +        out.append([]) +        for brick in master_bricks: +            bname = brick["name"] +            if gstatus.get(session) and gstatus[session].get(bname, None): +                out[-1].append(gstatus[session][bname]) +            else: +                out[-1].append( +                    get_offline_status(mvol, bname, brick["hostUuid"], slave)) + +    return out + + +def get_summary(mastervol, slave_url): +    """ +    Wrapper function around Geo-rep Status and Gluster Volume Info +    This combines the output from Bricks list and Geo-rep Status. +    If a Master Brick node is down or Status is faulty then increments +    the faulty counter. It also collects the checkpoint status from all +    workers and compares with Number of Bricks. +    """ +    down_rows = [] +    faulty_rows = [] +    out = [] + +    status_data = get(mastervol, slave_url) + +    for session in status_data: +        summary = { +            "active": 0, +            "passive": 0, +            "faulty": 0, +            "initializing": 0, +            "stopped": 0, +            "created": 0, +            "offline": 0, +            "paused": 0, +            "workers": 0, +            "completed_checkpoints": 0, +            "checkpoint": False, +            "checkpoints_ok": False, +            "ok": False +        } + +        for row in session: +            summary[row["status"].replace("...", "").lower()] += 1 +            summary["workers"] += 1 +            if row["checkpoint_completed"] == "Yes": +                summary["completed_checkpoints"] += 1 + +            session_name = "{0}=>{1}".format( +                row["mastervol"], +                row["slave"].replace("ssh://", "") +            ) + +            if row["status"] == "Faulty": +                faulty_rows.append("{0}:{1}".format(row["master_node"], +                                                    row["master_brick"])) + +            if row["status"] == "Offline": +                down_rows.append("{0}:{1}".format(row["master_node"], +                                                  row["master_brick"])) + +        if summary["active"] == summary["completed_checkpoints"] and \ +           summary["faulty"] == 0 and summary["offline"] == 0: +            summary["checkpoints_ok"] = True + +        if summary["faulty"] == 0 and summary["offline"] == 0: +            summary["ok"] = True + +        out.append([session_name, summary, faulty_rows, down_rows]) + +    return out + + +def touch_mount_root(mastervol): +    # Create a Mount and Touch the Mount point root, +    # Hack to make sure some event available after +    # setting Checkpoint. Without this their is a chance of +    # Checkpoint never completes. +    with glustermount("localhost", mastervol) as mnt: +        execute(["touch", mnt]) + + +def main(args): +    turns = 1 + +    # Stop Force +    cmd = ["gluster", "volume", "geo-replication", args.mastervol, +           "%s::%s" % (args.slave, args.slavevol), "stop", "force"] +    execute(cmd) +    output_ok("Stopped Geo-replication") + +    # Set Checkpoint to NOW +    cmd = ["gluster", "volume", "geo-replication", args.mastervol, +           "%s::%s" % (args.slave, args.slavevol), "config", "checkpoint", +           "now"] +    execute(cmd) +    output_ok("Set Checkpoint") + +    # Start the Geo-replication +    cmd = ["gluster", "volume", "geo-replication", args.mastervol, +           "%s::%s" % (args.slave, args.slavevol), "start"] +    execute(cmd) +    output_ok("Started Geo-replication and watching Status for " +              "Checkpoint completion") + +    touch_mount_root(args.mastervol) + +    start_time = int(time.time()) +    duration = 0 + +    # Sleep till Geo-rep initializes +    time.sleep(60) + +    slave_url = "{0}::{1}".format(args.slave, args.slavevol) + +    # Loop to Check the Geo-replication Status and Checkpoint +    # If All Status OK and all Checkpoints complete, +    # Stop the Geo-replication and Log the Completeness +    while True: +        session_summary = get_summary(args.mastervol, +                                      slave_url) +        if len(session_summary) == 0: +            # If Status command fails with another transaction error +            # or any other error. Gluster cmd still produces XML output +            # with different message +            output_warning("Unable to get Geo-replication Status") +            time.sleep(1) +            continue + +        session_name, summary, faulty_rows, down_rows = session_summary[0] +        chkpt_status = "COMPLETE" if summary["checkpoints_ok"] else \ +                       "NOT COMPLETE" +        ok_status = "OK" if summary["ok"] else "NOT OK" + +        if summary["ok"]: +            output_ok("All Checkpoints {1}, " +                      "All status {2} (Turns {0:>3})".format( +                          turns, chkpt_status, ok_status)) +        else: +            if not summary["checkpoints_ok"]: +                # If Checkpoint is not complete after a iteration means brick +                # was down and came online now. SETATTR on mount is not +                # recorded, So again issue touch on mount root So that +                # Stime will increase and Checkpoint will complete. +                touch_mount_root(args.mastervol) + +            output_warning("All Checkpoints {1}, " +                           "All status {2} (Turns {0:>3})".format( +                               turns, chkpt_status, ok_status)) + +        if summary["checkpoints_ok"]: +            output_ok("Stopping Geo-replication session now") +            cmd = ["gluster", "volume", "geo-replication", args.mastervol, +                   "%s::%s" % (args.slave, args.slavevol), "stop"] +            execute(cmd) +            break + +        if not summary["ok"]: +            output_warning("Geo-rep workers Faulty/Offline, " +                           "Faulty: {0} Offline: {1}".format( +                               repr(faulty_rows), +                               repr(down_rows))) + +        # Increment the turns and Sleep for 10 sec +        turns += 1 +        duration = int(time.time()) - start_time +        if args.timeout > 0 and duration > (args.timeout * 60): +            cmd = ["gluster", "volume", "geo-replication", args.mastervol, +                   "%s::%s" % (args.slave, args.slavevol), "stop", "force"] +            execute(cmd) +            output_notok("Timed out, Stopping Geo-replication(" +                         "Duration: {0}sec)".format(duration)) + +        time.sleep(args.interval) + +if __name__ == "__main__": +    parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, +                            description=__doc__) +    parser.add_argument("mastervol", help="Master Volume Name") +    parser.add_argument("slave", +                        help="SLAVEHOST or root@SLAVEHOST " +                        "or user@SLAVEHOST", +                        metavar="SLAVE") +    parser.add_argument("slavevol", help="Slave Volume Name") +    parser.add_argument("--interval", help="Interval in Seconds. " +                        "Wait time before each status check", +                        type=int, default=10) +    parser.add_argument("--timeout", help="Timeout in minutes. Script will " +                        "stop Geo-replication if Checkpoint is not complete " +                        "in the specified timeout time", type=int, +                        default=0) +    parser.add_argument("--no-color", help="Use Color in CLI output", +                        action="store_true") +    args = parser.parse_args() +    if args.no_color: +        USE_CLI_COLOR = False +    try: +        main(args) +    except KeyboardInterrupt: +        output_notok("Exiting...")  | 
