diff options
| -rw-r--r-- | tools/glusterfind/src/changelog.py | 18 | ||||
| -rw-r--r-- | tools/glusterfind/src/main.py | 175 | 
2 files changed, 155 insertions, 38 deletions
diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py index 4d0a190286e..d6f3dc188ac 100644 --- a/tools/glusterfind/src/changelog.py +++ b/tools/glusterfind/src/changelog.py @@ -351,6 +351,8 @@ def _get_args():      parser.add_argument("brick", help="Brick Name")      parser.add_argument("outfile", help="Output File")      parser.add_argument("start", help="Start Time", type=int) +    parser.add_argument("--only-query", help="Query mode only (no session)", +                        action="store_true")      parser.add_argument("--debug", help="Debug", action="store_true")      parser.add_argument("--output-prefix", help="File prefix in output",                          default=".") @@ -378,19 +380,23 @@ if __name__ == "__main__":      mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,             logger=logger) -    try: -        with open(status_file) as f: -            start = int(f.read().strip()) -    except (ValueError, OSError, IOError): +    if args.only_query:          start = args.start +    else: +        try: +            with open(status_file) as f: +                start = int(f.read().strip()) +        except (ValueError, OSError, IOError): +            start = args.start      end = int(time.time()) - get_changelog_rollover_time(args.volume)      logger.info("%s Started Changelog Crawl - Start: %s End: %s" % (args.brick,                                                                      start,                                                                      end))      actual_end = changelog_crawl(args.brick, start, end, args) -    with open(status_file_pre, "w", buffering=0) as f: -        f.write(str(actual_end)) +    if not args.only_query: +        with open(status_file_pre, "w", buffering=0) as f: +            f.write(str(actual_end))      logger.info("%s Finished Changelog Crawl - End: %s" % (args.brick,                                                             actual_end)) diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py index 9bc4872ad9b..6d03cbed5f2 100644 --- a/tools/glusterfind/src/main.py +++ b/tools/glusterfind/src/main.py @@ -126,6 +126,25 @@ def run_cmd_nodes(task, args, **kwargs):              opts["node_outfile"] = node_outfile              opts["copy_outfile"] = True +        elif task == "query": +            # If Full backup is requested or start time is zero, use brickfind +            change_detector = conf.get_change_detector("changelog") +            node_outfiles.append(node_outfile) + +            cmd = [change_detector, +                   args.session, +                   args.volume, +                   brick, +                   node_outfile, +                   str(kwargs.get("start"))] + \ +                ["--only-query"] + \ +                ["--output-prefix", args.output_prefix] + \ +                (["--debug"] if args.debug else []) + \ +                (["--only-namespace-changes"] +                    if args.only_namespace_changes else []) + +            opts["node_outfile"] = node_outfile +            opts["copy_outfile"] = True          elif task == "cleanup":              # After pre run, cleanup the working directory and other temp files              # Remove the copied node_outfile in main node @@ -271,6 +290,23 @@ def _get_args():                              help="List only namespace changes",                              action="store_true") +    # query <VOLUME> <OUTFILE> --since-time <SINCE_TIME> +    #       [--output-prefix <OUTPUT_PREFIX>] [--full] +    parser_pre = subparsers.add_parser('query') +    parser_pre.add_argument("volume", help="Volume Name") +    parser_pre.add_argument("outfile", help="Output File", +                            action=StoreAbsPath) +    parser_pre.add_argument("--since-time", help="UNIX epoch time since which " +                            "listing is required", type=int) +    parser_pre.add_argument("--debug", help="Debug", action="store_true") +    parser_pre.add_argument("--disable-partial", help="Disable Partial find, " +                            "Fail when one node fails", action="store_true") +    parser_pre.add_argument("--output-prefix", help="File prefix in output", +                            default=".") +    parser_pre.add_argument("-N", "--only-namespace-changes", +                            help="List only namespace changes", +                            action="store_true") +      # post <SESSION> <VOLUME>      parser_post = subparsers.add_parser('post')      parser_post.add_argument("session", help="Session Name") @@ -333,6 +369,45 @@ def ssh_setup(args):      logger.info("Ssh key added to authorized_keys of Volume nodes") +def enable_volume_options(args): +    execute(["gluster", "volume", "set", +             args.volume, "build-pgfid", "on"], +            exit_msg="Failed to set volume option build-pgfid on", +            logger=logger) +    logger.info("Volume option set %s, build-pgfid on" % args.volume) + +    execute(["gluster", "volume", "set", +             args.volume, "changelog.changelog", "on"], +            exit_msg="Failed to set volume option " +            "changelog.changelog on", logger=logger) +    logger.info("Volume option set %s, changelog.changelog on" +                % args.volume) + +    execute(["gluster", "volume", "set", +             args.volume, "changelog.capture-del-path", "on"], +            exit_msg="Failed to set volume option " +            "changelog.capture-del-path on", logger=logger) +    logger.info("Volume option set %s, changelog.capture-del-path on" +                % args.volume) + + +def write_output(args, outfilemerger): +    with open(args.outfile, "a") as f: +        for row in outfilemerger.get(): +            # Multiple paths in case of Hardlinks +            paths = row[1].split(",") +            row_2_rep = None +            for p in paths: +                if p == "": +                    continue +                p_rep = p.replace("%2F%2F", "%2F") +                if not row_2_rep: +                    row_2_rep = row[2].replace("%2F%2F", "%2F") +                if p_rep == row_2_rep: +                    continue +                f.write("%s %s %s\n" % (row[0], p_rep, row_2_rep)) + +  def mode_create(session_dir, args):      logger.debug("Init is called - Session: %s, Volume: %s"                   % (args.session, args.volume)) @@ -360,26 +435,7 @@ def mode_create(session_dir, args):      if not os.path.exists(status_file) or args.force:          ssh_setup(args) - -        execute(["gluster", "volume", "set", -                 args.volume, "build-pgfid", "on"], -                exit_msg="Failed to set volume option build-pgfid on", -                logger=logger) -        logger.info("Volume option set %s, build-pgfid on" % args.volume) - -        execute(["gluster", "volume", "set", -                 args.volume, "changelog.changelog", "on"], -                exit_msg="Failed to set volume option " -                "changelog.changelog on", logger=logger) -        logger.info("Volume option set %s, changelog.changelog on" -                    % args.volume) - -        execute(["gluster", "volume", "set", -                 args.volume, "changelog.capture-del-path", "on"], -                exit_msg="Failed to set volume option " -                "changelog.capture-del-path on", logger=logger) -        logger.info("Volume option set %s, changelog.capture-del-path on" -                    % args.volume) +        enable_volume_options(args)      # Add Rollover time to current time to make sure changelogs      # will be available if we use this time as start time @@ -398,6 +454,59 @@ def mode_create(session_dir, args):      sys.exit(0) +def mode_query(session_dir, args): +    # Verify volume status +    cmd = ["gluster", 'volume', 'info', args.volume, "--xml"] +    _, data, _ = execute(cmd, +                         exit_msg="Failed to Run Gluster Volume Info", +                         logger=logger) +    try: +        tree = etree.fromstring(data) +        statusStr = tree.find('volInfo/volumes/volume/statusStr').text +    except (ParseError, AttributeError) as e: +        fail("Invalid Volume: %s" % e, logger=logger) + +    if statusStr != "Started": +        fail("Volume %s is not online" % args.volume, logger=logger) + +    mkdirp(session_dir, exit_on_err=True, logger=logger) +    mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, +           logger=logger) +    mkdirp(os.path.dirname(args.outfile), exit_on_err=True, logger=logger) + +    # Configure cluster for pasword-less SSH +    ssh_setup(args) + +    # Enable volume options for changelog capture +    enable_volume_options(args) + +    # Start query command processing +    if args.since_time: +        start = args.since_time +        logger.debug("Query is called - Session: %s, Volume: %s, " +                     "Start time: %s" +                     % ("default", args.volume, start)) + +        run_cmd_nodes("query", args, start=start) + +        # Merger +        # Read each Changelogs db and generate finaldb +        create_file(args.outfile, exit_on_err=True, logger=logger) +        outfilemerger = OutputMerger(args.outfile + ".db", node_outfiles) +        write_output(args, outfilemerger) + +        try: +            os.remove(args.outfile + ".db") +        except (IOError, OSError): +            pass + +        run_cmd_nodes("cleanup", args) + +        sys.stdout.write("Generated output file %s\n" % args.outfile) +    else: +        fail("Please specify --since-time option") + +  def mode_pre(session_dir, args):      """      Read from Session file and write to session.pre file @@ -441,15 +550,7 @@ def mode_pre(session_dir, args):          create_file(args.outfile, exit_on_err=True, logger=logger)          outfilemerger = OutputMerger(args.outfile + ".db", node_outfiles) -        with open(args.outfile, "a") as f: -            for row in outfilemerger.get(): -                # Multiple paths in case of Hardlinks -                paths = row[1].split(",") -                for p in paths: -                    if p == "" or p.replace("%2F%2F","%2F") == \ -                       row[2].replace("%2F%2F","%2F"): -                        continue -                    f.write("%s %s %s\n" % (row[0], p, row[2])) +        write_output(args, outfilemerger)      try:          os.remove(args.outfile + ".db") @@ -566,18 +667,28 @@ def main():          args = _get_args()          mkdirp(conf.get_opt("session_dir"), exit_on_err=True) +        # force the default session name if mode is "query" +        if args.mode == "query": +            args.session = "default" +          if args.mode == "list":              session_dir = conf.get_opt("session_dir")          else:              session_dir = os.path.join(conf.get_opt("session_dir"),                                         args.session) -        if not os.path.exists(session_dir) and args.mode not in ["create", -                                                                 "list"]: +        if not os.path.exists(session_dir) and \ +                args.mode not in ["create", "list", "query"]: +            fail("Invalid session %s" % args.session) + +        # "default" is a system defined session name +        if args.mode in ["create", "post", "pre", "delete"] and \ +                args.session == "default":              fail("Invalid session %s" % args.session)          vol_dir = os.path.join(session_dir, args.volume) -        if not os.path.exists(vol_dir) and args.mode not in ["create", "list"]: +        if not os.path.exists(vol_dir) and args.mode not in \ +                ["create", "list", "query"]:              fail("Session %s not created with volume %s" %                   (args.session, args.volume))  | 
