diff options
Diffstat (limited to 'tools/glusterfind/src/main.py')
| -rw-r--r-- | tools/glusterfind/src/main.py | 157 |
1 files changed, 115 insertions, 42 deletions
diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py index 3d0f02a65d4..4b5466d0114 100644 --- a/tools/glusterfind/src/main.py +++ b/tools/glusterfind/src/main.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 # -*- coding: utf-8 -*- # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> @@ -16,18 +16,20 @@ from multiprocessing import Process import os import xml.etree.cElementTree as etree from argparse import ArgumentParser, RawDescriptionHelpFormatter, Action +from gfind_py2py3 import gfind_write_row, gfind_write import logging import shutil import tempfile import signal from datetime import datetime +import codecs +import re from utils import execute, is_host_local, mkdirp, fail from utils import setup_logger, human_time, handle_rm_error from utils import get_changelog_rollover_time, cache_output, create_file import conf from changelogdata import OutputMerger -import codecs PROG_DESCRIPTION = """ GlusterFS Incremental API @@ -35,9 +37,9 @@ GlusterFS Incremental API ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError logger = logging.getLogger() -node_outfiles = [] vol_statusStr = "" gtmpfilename = None +g_pid_nodefile_map = {} class StoreAbsPath(Action): @@ -75,12 +77,27 @@ def node_cmd(host, host_uuid, task, cmd, args, opts): cmd = ["ssh", "-oNumberOfPasswordPrompts=0", "-oStrictHostKeyChecking=no", + # We force TTY allocation (-t -t) so that Ctrl+C is handed + # through; see: + # https://bugzilla.redhat.com/show_bug.cgi?id=1382236 + # Note that this turns stderr of the remote `cmd` + # into stdout locally. "-t", "-t", "-i", pem_key_path, "root@%s" % host] + cmd - execute(cmd, exit_msg="%s - %s failed" % (host, task), logger=logger) + (returncode, err, out) = execute(cmd, logger=logger) + if returncode != 0: + # Because the `-t -t` above turns the remote stderr into + # local stdout, we need to log both stderr and stdout + # here to print all error messages. + fail("%s - %s failed; stdout (including remote stderr):\n" + "%s\n" + "stderr:\n" + "%s" % (host, task, out, err), + returncode, + logger=logger) if opts.get("copy_outfile", False) and not localdir: cmd_copy = ["scp", @@ -96,7 +113,7 @@ def node_cmd(host, host_uuid, task, cmd, args, opts): def run_cmd_nodes(task, args, **kwargs): - global node_outfiles + global g_pid_nodefile_map nodes = get_nodes(args.volume) pool = [] for num, node in enumerate(nodes): @@ -127,7 +144,6 @@ def run_cmd_nodes(task, args, **kwargs): if tag == "": tag = '""' if not is_host_local(host_uuid) else "" - node_outfiles.append(node_outfile) # remote file will be copied into this directory mkdirp(os.path.dirname(node_outfile), exit_on_err=True, logger=logger) @@ -150,6 +166,7 @@ def run_cmd_nodes(task, args, **kwargs): (["--no-encode"] if args.no_encode else []) + \ (["--only-namespace-changes"] if args.only_namespace_changes else []) + \ + (["--type", args.type]) + \ (["--field-separator", FS] if args.full else []) opts["node_outfile"] = node_outfile @@ -164,7 +181,6 @@ def run_cmd_nodes(task, args, **kwargs): if tag == "": tag = '""' if not is_host_local(host_uuid) else "" - node_outfiles.append(node_outfile) # remote file will be copied into this directory mkdirp(os.path.dirname(node_outfile), exit_on_err=True, logger=logger) @@ -188,6 +204,7 @@ def run_cmd_nodes(task, args, **kwargs): (["--no-encode"] if args.no_encode else []) + \ (["--only-namespace-changes"] if args.only_namespace_changes else []) + \ + (["--type", args.type]) + \ (["--field-separator", FS] if args.full else []) opts["node_outfile"] = node_outfile @@ -247,6 +264,7 @@ def run_cmd_nodes(task, args, **kwargs): args=(host, host_uuid, task, cmd, args, opts)) p.start() pool.append(p) + g_pid_nodefile_map[p.pid] = node_outfile for num, p in enumerate(pool): p.join() @@ -254,8 +272,11 @@ def run_cmd_nodes(task, args, **kwargs): logger.warn("Command %s failed in %s" % (task, nodes[num][1])) if task in ["create", "delete"]: fail("Command %s failed in %s" % (task, nodes[num][1])) - elif task == "pre" and args.disable_partial: - sys.exit(1) + elif task == "pre" or task == "query": + if args.disable_partial: + sys.exit(1) + else: + del g_pid_nodefile_map[p.pid] @cache_output @@ -305,6 +326,7 @@ def _get_args(): parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, description=PROG_DESCRIPTION) subparsers = parser.add_subparsers(dest="mode") + subparsers.required = True # create <SESSION> <VOLUME> [--debug] [--force] parser_create = subparsers.add_parser('create') @@ -355,6 +377,9 @@ def _get_args(): help="Tag prefix for file names emitted during" " a full find operation; default: \"NEW\"", default="NEW") + parser_pre.add_argument('--type', help="type: f, f-files only" + " d, d-directories only, by default = both", + default='both', choices=["f", "d", "both"]) parser_pre.add_argument("--field-separator", help="Field separator string", default=" ") @@ -366,7 +391,7 @@ def _get_args(): action=StoreAbsPath) parser_query.add_argument("--since-time", help="UNIX epoch time since " "which listing is required", type=int) - parser_query.add_argument("--end-time", help="UNIX epoch time upto " + parser_query.add_argument("--end-time", help="UNIX epoch time up to " "which listing is required", type=int) parser_query.add_argument("--no-encode", help="Do not encode path in output file", @@ -384,6 +409,9 @@ def _get_args(): help="Tag prefix for file names emitted during" " a full find operation; default: \"NEW\"", default="NEW") + parser_query.add_argument('--type', help="type: f, f-files only" + " d, d-directories only, by default = both", + default='both', choices=["f", "d", "both"]) parser_query.add_argument("--field-separator", help="Field separator string", default=" ") @@ -481,30 +509,20 @@ def write_output(outfile, outfilemerger, field_separator): for p in paths: if p == "": continue - p_rep = p.replace("%2F%2F", "%2F").replace("//", "/") + p_rep = p.replace("//", "/") if not row_2_rep: - row_2_rep = row[2].replace("%2F%2F", "%2F").replace("//", - "/") + row_2_rep = row[2].replace("//", "/") if p_rep == row_2_rep: continue if row_2_rep and row_2_rep != "": - f.write(u"{0}{1}{2}{3}{4}\n".format(row[0], - field_separator, - p_rep, - field_separator, - row_2_rep)) - else: - f.write(u"{0}{1}{2}\n".format(row[0], - field_separator, - p_rep)) + gfind_write_row(f, row[0], field_separator, p_rep, row_2_rep) + else: + gfind_write(f, row[0], field_separator, p_rep) -def mode_create(session_dir, args): - logger.debug("Init is called - Session: %s, Volume: %s" - % (args.session, args.volume)) - - cmd = ["gluster", 'volume', 'info', args.volume, "--xml"] +def validate_volume(volume): + cmd = ["gluster", 'volume', 'info', volume, "--xml"] _, data, _ = execute(cmd, exit_msg="Failed to Run Gluster Volume Info", logger=logger) @@ -512,11 +530,42 @@ def mode_create(session_dir, args): tree = etree.fromstring(data) statusStr = tree.find('volInfo/volumes/volume/statusStr').text except (ParseError, AttributeError) as e: - fail("Invalid Volume: %s" % e, logger=logger) - + fail("Invalid Volume: Check the Volume name! %s" % e) if statusStr != "Started": - fail("Volume %s is not online" % args.volume, logger=logger) + fail("Volume %s is not online" % volume) + +# The rules for a valid session name. +SESSION_NAME_RULES = { + 'min_length': 2, + 'max_length': 256, # same as maximum volume length + # Specifies all alphanumeric characters, underscore, hyphen. + 'valid_chars': r'0-9a-zA-Z_-', +} + + +# checks valid session name, fail otherwise +def validate_session_name(session): + # Check for minimum length + if len(session) < SESSION_NAME_RULES['min_length']: + fail('session_name must be at least ' + + str(SESSION_NAME_RULES['min_length']) + ' characters long.') + # Check for maximum length + if len(session) > SESSION_NAME_RULES['max_length']: + fail('session_name must not exceed ' + + str(SESSION_NAME_RULES['max_length']) + ' characters length.') + + # Matches strings composed entirely of characters specified within + if not re.match(r'^[' + SESSION_NAME_RULES['valid_chars'] + + ']+$', session): + fail('Session name can only contain these characters: ' + + SESSION_NAME_RULES['valid_chars']) + + +def mode_create(session_dir, args): + validate_session_name(args.session) + logger.debug("Init is called - Session: %s, Volume: %s" + % (args.session, args.volume)) mkdirp(session_dir, exit_on_err=True, logger=logger) mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, logger=logger) @@ -537,7 +586,7 @@ def mode_create(session_dir, args): run_cmd_nodes("create", args, time_to_update=str(time_to_update)) if not os.path.exists(status_file) or args.reset_session_time: - with open(status_file, "w", buffering=0) as f: + with open(status_file, "w") as f: f.write(str(time_to_update)) sys.stdout.write("Session %s created with volume %s\n" % @@ -548,6 +597,7 @@ def mode_create(session_dir, args): def mode_query(session_dir, args): global gtmpfilename + global g_pid_nodefile_map # Verify volume status cmd = ["gluster", 'volume', 'info', args.volume, "--xml"] @@ -575,6 +625,8 @@ def mode_query(session_dir, args): enable_volume_options(args) # Test options + if not args.full and args.type in ["f", "d"]: + fail("--type can only be used with --full") if not args.since_time and not args.end_time and not args.full: fail("Please specify either {--since-time and optionally --end-time} " "or --full", logger=logger) @@ -609,14 +661,20 @@ def mode_query(session_dir, args): # Merger if args.full: - cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile] - execute(cmd, - exit_msg="Failed to merge output files " - "collected from nodes", logger=logger) + if len(g_pid_nodefile_map) > 0: + cmd = ["sort", "-u"] + list(g_pid_nodefile_map.values()) + \ + ["-o", args.outfile] + execute(cmd, + exit_msg="Failed to merge output files " + "collected from nodes", logger=logger) + else: + fail("Failed to collect any output files from peers. " + "Looks like all bricks are offline.", logger=logger) else: # Read each Changelogs db and generate finaldb create_file(args.outfile, exit_on_err=True, logger=logger) - outfilemerger = OutputMerger(args.outfile + ".db", node_outfiles) + outfilemerger = OutputMerger(args.outfile + ".db", + list(g_pid_nodefile_map.values())) write_output(args.outfile, outfilemerger, args.field_separator) try: @@ -631,6 +689,7 @@ def mode_query(session_dir, args): def mode_pre(session_dir, args): global gtmpfilename + global g_pid_nodefile_map """ Read from Session file and write to session.pre file @@ -642,6 +701,9 @@ def mode_pre(session_dir, args): mkdirp(os.path.dirname(args.outfile), exit_on_err=True, logger=logger) + if not args.full and args.type in ["f", "d"]: + fail("--type can only be used with --full") + # If Pre status file exists and running pre command again if os.path.exists(status_file_pre) and not args.regenerate_outfile: fail("Post command is not run after last pre, " @@ -668,14 +730,20 @@ def mode_pre(session_dir, args): # Merger if args.full: - cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile] - execute(cmd, - exit_msg="Failed to merge output files " - "collected from nodes", logger=logger) + if len(g_pid_nodefile_map) > 0: + cmd = ["sort", "-u"] + list(g_pid_nodefile_map.values()) + \ + ["-o", args.outfile] + execute(cmd, + exit_msg="Failed to merge output files " + "collected from nodes", logger=logger) + else: + fail("Failed to collect any output files from peers. " + "Looks like all bricks are offline.", logger=logger) else: # Read each Changelogs db and generate finaldb create_file(args.outfile, exit_on_err=True, logger=logger) - outfilemerger = OutputMerger(args.outfile + ".db", node_outfiles) + outfilemerger = OutputMerger(args.outfile + ".db", + list(g_pid_nodefile_map.values())) write_output(args.outfile, outfilemerger, args.field_separator) try: @@ -685,7 +753,7 @@ def mode_pre(session_dir, args): run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename) - with open(status_file_pre, "w", buffering=0) as f: + with open(status_file_pre, "w") as f: f.write(str(endtime_to_update)) sys.stdout.write("Generated output file %s\n" % args.outfile) @@ -811,6 +879,11 @@ def main(): args.mode not in ["create", "list", "query"]: fail("Invalid session %s" % args.session) + # volume involved, validate the volume first + if args.mode not in ["list"]: + validate_volume(args.volume) + + # "default" is a system defined session name if args.mode in ["create", "post", "pre", "delete"] and \ args.session == "default": |
