diff options
Diffstat (limited to 'tools/glusterfind/src/main.py')
| -rw-r--r-- | tools/glusterfind/src/main.py | 278 | 
1 files changed, 126 insertions, 152 deletions
diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py index ceaf1173897..089a3aec3c5 100644 --- a/tools/glusterfind/src/main.py +++ b/tools/glusterfind/src/main.py @@ -19,7 +19,8 @@ import logging  import shutil  from utils import execute, is_host_local, mkdirp, fail -from utils import setup_logger, human_time +from utils import setup_logger, human_time, handle_rm_error +from utils import get_changelog_rollover_time, cache_output  import conf @@ -29,6 +30,7 @@ GlusterFS Incremental API  ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError  logger = logging.getLogger() +node_outfiles = []  class StoreAbsPath(Action): @@ -46,33 +48,13 @@ def get_pem_key_path(session, volume):                          "%s_%s_secret.pem" % (session, volume)) -def node_run(volume, host, path, start, outfile, args, fallback=False): +def node_cmd(host, host_uuid, task, cmd, args, opts):      """ -    If host is local node, execute the command locally. If not local -    execute the CHANGE_DETECTOR command via ssh and copy the output file from -    remote node using scp. +    Runs command via ssh if host is not local      """ -    localdir = is_host_local(host) -    pem_key_path = get_pem_key_path(args.session, args.volume) +    localdir = is_host_local(host_uuid) -    # If Full backup is requested or start time is zero, use brickfind -    change_detector = conf.get_change_detector(args.change_detector) -    if ((start == 0 or args.full) and args.change_detector == "changelog") or \ -       fallback: -        change_detector = conf.get_change_detector("brickfind") - -    # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug -    # --gfidpath <TYPE> -    cmd = [change_detector, -           args.session, -           volume, -           path, -           outfile, -           str(start), -           "--output-prefix", -           args.output_prefix] + \ -        (["--debug"] if args.debug else []) + \ -        (["--full"] if args.full else []) +    pem_key_path = get_pem_key_path(args.session, args.volume)      if not localdir:          # prefix with ssh command if not local node @@ -80,128 +62,112 @@ def node_run(volume, host, path, start, outfile, args, fallback=False):                 "-i", pem_key_path,                 "root@%s" % host] + cmd -    rc, out, err = execute(cmd, logger=logger) -    if rc == 2: -        # Partial History Fallback -        logger.info("%s %s Fallback to brickfind" % (host, err.strip())) -        # Exit only from process, handled in main. -        sys.exit(rc) -    elif rc != 0: -        fail("%s - Change detection failed" % host, logger=logger) +    execute(cmd, exit_msg="%s - %s failed" % (host, task), logger=logger) -    if not localdir: +    if opts.get("copy_outfile", False):          cmd_copy = ["scp",                      "-i", pem_key_path, -                    "root@%s:/%s" % (host, outfile), -                    os.path.dirname(outfile)] +                    "root@%s:/%s" % (host, opts.get("node_outfile")), +                    os.path.dirname(opts.get("node_outfile"))]          execute(cmd_copy, exit_msg="%s - Copy command failed" % host,                  logger=logger) -def node_cleanup(host, args): -    localdir = is_host_local(host) - -    pem_key_path = get_pem_key_path(args.session, args.volume) - -    # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug -    # --gfidpath <TYPE> -    cmd = [conf.get_opt("nodecleanup"), -           args.session, -           args.volume] + (["--debug"] if args.debug else []) - -    if not localdir: -        # prefix with ssh command if not local node -        cmd = ["ssh", -               "-i", pem_key_path, -               "root@%s" % host] + cmd - -    execute(cmd, exit_msg="%s - Cleanup failed" % host, logger=logger) - - -def cleanup(nodes, args): +def run_cmd_nodes(task, args, **kwargs): +    global node_outfiles +    nodes = get_nodes(args.volume)      pool = []      for num, node in enumerate(nodes):          host, brick = node[1].split(":") -        # temp output file +        host_uuid = node[0] +        cmd = [] +        opts = {}          node_outfile = os.path.join(conf.get_opt("working_dir"), -                                    args.session, -                                    args.volume, -                                    "tmp_output_%s.txt" % num) - -        try: -            os.remove(node_outfile) -        except (OSError, IOError): -            # TODO: Cleanup Failure, Handle -            pass - -        p = Process(target=node_cleanup, -                    args=(host, args)) -        p.start() -        pool.append(p) - -    exit_codes = 0 -    for p in pool: -        p.join() -        exit_codes += (0 if p.exitcode == 0 else 1) - -    if exit_codes != 0: -        sys.exit(1) - - -def failback_node_run(brick_details, idx, volume, start, outfile, args): -    host, brick = brick_details.split(":") -    p = Process(target=node_run, -                args=(volume, host, brick, start, outfile, args, True)) -    p.start() -    p.join() -    return p.exitcode - - -def run_in_nodes(volume, start, args): -    """ -    Get nodes of volume using gluster volume info, spawn a process -    each for a Node. Merge the output files once all the process -    complete their tasks. -    """ -    nodes = get_nodes(volume) -    pool = [] -    node_outfiles = [] -    for num, node in enumerate(nodes): -        host, brick = node[1].split(":") -        # temp output file -        node_outfile = os.path.join(conf.get_opt("working_dir"), -                                    args.session, -                                    volume, -                                    "tmp_output_%s.txt" % num) -        node_outfiles.append(node_outfile) -        p = Process(target=node_run, args=(volume, host, brick, start, -                                           node_outfile, args)) -        p.start() -        pool.append(p) - -    exit_codes = 0 -    for idx, p in enumerate(pool): +                                    args.session, args.volume, +                                    "tmp_output_%s" % num) + +        if task == "pre": +            # If Full backup is requested or start time is zero, use brickfind +            change_detector = conf.get_change_detector("changelog") +            if args.full: +                change_detector = conf.get_change_detector("brickfind") + +            node_outfiles.append(node_outfile) + +            cmd = [change_detector, +                   args.session, +                   args.volume, +                   brick, +                   node_outfile, +                   str(kwargs.get("start")), +                   "--output-prefix", +                   args.output_prefix] + \ +                (["--debug"] if args.debug else []) + \ +                (["--only-namespace-changes"] if args.only_namespace_changes +                 else []) + +            opts["node_outfile"] = node_outfile +            opts["copy_outfile"] = True +        elif task == "cleanup": +            # After pre run, cleanup the working directory and other temp files +            # Remove the copied node_outfile in main node +            try: +                os.remove(node_outfile) +            except (OSError, IOError): +                logger.warn("Failed to cleanup temporary file %s" % +                            node_outfile) +                pass + +            cmd = [conf.get_opt("nodeagent"), +                   "cleanup", +                   args.session, +                   args.volume] + (["--debug"] if args.debug else []) +        elif task == "create": +            # When glusterfind create, create session directory in +            # each brick nodes +            cmd = [conf.get_opt("nodeagent"), +                   "create", +                   args.session, +                   args.volume, +                   brick, +                   kwargs.get("time_to_update")] + \ +                (["--debug"] if args.debug else []) + \ +                (["--reset-session-time"] if args.reset_session_time +                 else []) +        elif task == "post": +            # Rename pre status file to actual status file in each node +            cmd = [conf.get_opt("nodeagent"), +                   "post", +                   args.session, +                   args.volume, +                   brick] + \ +                (["--debug"] if args.debug else []) +        elif task == "delete": +            # When glusterfind delete, cleanup all the session files/dirs +            # from each node. +            cmd = [conf.get_opt("nodeagent"), +                   "delete", +                   args.session, +                   args.volume] + \ +                (["--debug"] if args.debug else []) + +        if cmd: +            p = Process(target=node_cmd, +                        args=(host, host_uuid, task, cmd, args, opts)) +            p.start() +            pool.append(p) + +    for num, p in enumerate(pool):          p.join() -        # Handle the Changelog failure, fallback to Brickfind -        if p.exitcode == 2: -            rc = failback_node_run(nodes[idx][1], idx, volume, start, -                                   node_outfiles[idx], args) -            exit_codes += (0 if rc == 0 else 1) -        elif p.exitcode != 0: -            exit_codes += (0 if p.exitcode == 0 else 1) - -    if exit_codes != 0: -        sys.exit(1) - -    # Merge all output files -    cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile] -    execute(cmd, -            exit_msg="Failed to merge output files " -            "collected from nodes", logger=logger) - -    cleanup(nodes, args) +        if p.exitcode != 0: +            logger.warn("Command %s failed in %s" % (task, nodes[num][1])) +            if task in ["create", "delete"]: +                fail("Command %s failed in %s" % (task, nodes[num][1])) +            elif task == "pre" and args.disable_partial: +                sys.exit(1) +@cache_output  def get_nodes(volume):      """      Get the gluster volume info xml output and parse to get @@ -237,6 +203,9 @@ def _get_args():      parser_create.add_argument("--debug", help="Debug", action="store_true")      parser_create.add_argument("--force", help="Force option to recreate "                                 "the session", action="store_true") +    parser_create.add_argument("--reset-session-time", +                               help="Reset Session Time to Current Time", +                               action="store_true")      # delete <SESSION> <VOLUME> [--debug]      parser_delete = subparsers.add_parser('delete') @@ -250,7 +219,7 @@ def _get_args():      parser_list.add_argument("--volume", help="Volume Name", default="")      parser_list.add_argument("--debug", help="Debug", action="store_true") -    # pre <SESSION> <VOLUME> <OUTFILE> [--change-detector <CHANGE_DETECTOR>] +    # pre <SESSION> <VOLUME> <OUTFILE>      #     [--output-prefix <OUTPUT_PREFIX>] [--full]      parser_pre = subparsers.add_parser('pre')      parser_pre.add_argument("session", help="Session Name") @@ -258,10 +227,8 @@ def _get_args():      parser_pre.add_argument("outfile", help="Output File", action=StoreAbsPath)      parser_pre.add_argument("--debug", help="Debug", action="store_true")      parser_pre.add_argument("--full", help="Full find", action="store_true") -    parser_pre.add_argument("--change-detector", dest="change_detector", -                            help="Change detection", -                            choices=conf.list_change_detectors(), -                            type=str, default='changelog') +    parser_pre.add_argument("--disable-partial", help="Disable Partial find, " +                            "Fail when one node fails", action="store_true")      parser_pre.add_argument("--output-prefix", help="File prefix in output",                              default=".")      parser_pre.add_argument("--regenerate-outfile", @@ -363,12 +330,15 @@ def mode_create(session_dir, args):          logger.info("Volume option set %s, changelog.changelog on"                      % args.volume) -    if not os.path.exists(status_file): +    # Add Rollover time to current time to make sure changelogs +    # will be available if we use this time as start time +    time_to_update = int(time.time()) + get_changelog_rollover_time( +        args.volume) + +    run_cmd_nodes("create", args, time_to_update=str(time_to_update)) + +    if not os.path.exists(status_file) or args.reset_session_time:          with open(status_file, "w", buffering=0) as f: -            # Add Rollover time to current time to make sure changelogs -            # will be available if we use this time as start time -            time_to_update = int(time.time()) + int( -                conf.get_opt("changelog_rollover_time"))              f.write(str(time_to_update))      sys.exit(0) @@ -378,8 +348,8 @@ def mode_pre(session_dir, args):      """      Read from Session file and write to session.pre file      """ -    endtime_to_update = int(time.time()) - int( -        conf.get_opt("changelog_rollover_time")) +    endtime_to_update = int(time.time()) - get_changelog_rollover_time( +        args.volume)      status_file = os.path.join(session_dir, args.volume, "status")      status_file_pre = status_file + ".pre" @@ -404,7 +374,15 @@ def mode_pre(session_dir, args):                   "Start time: %s, End time: %s"                   % (args.session, args.volume, start, endtime_to_update)) -    run_in_nodes(args.volume, start, args) +    run_cmd_nodes("pre", args, start=start) + +    # Merger +    cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile] +    execute(cmd, +            exit_msg="Failed to merge output files " +            "collected from nodes", logger=logger) + +    run_cmd_nodes("cleanup", args)      with open(status_file_pre, "w", buffering=0) as f:          f.write(str(endtime_to_update)) @@ -423,6 +401,7 @@ def mode_post(session_dir, args):      status_file_pre = status_file + ".pre"      if os.path.exists(status_file_pre): +        run_cmd_nodes("post", args)          os.rename(status_file_pre, status_file)          sys.exit(0)      else: @@ -430,12 +409,7 @@ def mode_post(session_dir, args):  def mode_delete(session_dir, args): -    def handle_rm_error(func, path, exc_info): -        if exc_info[1].errno == ENOENT: -            return - -        raise exc_info[1] - +    run_cmd_nodes("delete", args)      shutil.rmtree(os.path.join(session_dir, args.volume),                    onerror=handle_rm_error)  | 
