summary refs log tree commit diff stats
path: root/tools/glusterfind/src/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/glusterfind/src/main.py')
-rw-r--r-- tools/glusterfind/src/main.py 278
1 files changed, 126 insertions, 152 deletions
diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py
index ceaf1173897..089a3aec3c5 100644
--- a/tools/glusterfind/src/main.py
+++ b/tools/glusterfind/src/main.py
@@ -19,7 +19,8 @@ import logging
import shutil
from utils import execute, is_host_local, mkdirp, fail
-from utils import setup_logger, human_time
+from utils import setup_logger, human_time, handle_rm_error
+from utils import get_changelog_rollover_time, cache_output
import conf
@@ -29,6 +30,7 @@ GlusterFS Incremental API
ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError
logger = logging.getLogger()
+node_outfiles = []
class StoreAbsPath(Action):
@@ -46,33 +48,13 @@ def get_pem_key_path(session, volume):
"%s_%s_secret.pem" % (session, volume))
-def node_run(volume, host, path, start, outfile, args, fallback=False):
+def node_cmd(host, host_uuid, task, cmd, args, opts):
"""
- If host is local node, execute the command locally. If not local
- execute the CHANGE_DETECTOR command via ssh and copy the output file from
- remote node using scp.
+ Runs the command locally, or via ssh if host is not local
"""
- localdir = is_host_local(host)
- pem_key_path = get_pem_key_path(args.session, args.volume)
+ localdir = is_host_local(host_uuid)
- # If Full backup is requested or start time is zero, use brickfind
- change_detector = conf.get_change_detector(args.change_detector)
- if ((start == 0 or args.full) and args.change_detector == "changelog") or \
- fallback:
- change_detector = conf.get_change_detector("brickfind")
-
- # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug
- # --gfidpath <TYPE>
- cmd = [change_detector,
- args.session,
- volume,
- path,
- outfile,
- str(start),
- "--output-prefix",
- args.output_prefix] + \
- (["--debug"] if args.debug else []) + \
- (["--full"] if args.full else [])
+ pem_key_path = get_pem_key_path(args.session, args.volume)
if not localdir:
# prefix with ssh command if not local node
@@ -80,128 +62,112 @@ def node_run(volume, host, path, start, outfile, args, fallback=False):
"-i", pem_key_path,
"root@%s" % host] + cmd
- rc, out, err = execute(cmd, logger=logger)
- if rc == 2:
- # Partial History Fallback
- logger.info("%s %s Fallback to brickfind" % (host, err.strip()))
- # Exit only from process, handled in main.
- sys.exit(rc)
- elif rc != 0:
- fail("%s - Change detection failed" % host, logger=logger)
+ execute(cmd, exit_msg="%s - %s failed" % (host, task), logger=logger)
- if not localdir:
+ if opts.get("copy_outfile", False):
cmd_copy = ["scp",
"-i", pem_key_path,
- "root@%s:/%s" % (host, outfile),
- os.path.dirname(outfile)]
+ "root@%s:/%s" % (host, opts.get("node_outfile")),
+ os.path.dirname(opts.get("node_outfile"))]
execute(cmd_copy, exit_msg="%s - Copy command failed" % host,
logger=logger)
-def node_cleanup(host, args):
- localdir = is_host_local(host)
-
- pem_key_path = get_pem_key_path(args.session, args.volume)
-
- # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug
- # --gfidpath <TYPE>
- cmd = [conf.get_opt("nodecleanup"),
- args.session,
- args.volume] + (["--debug"] if args.debug else [])
-
- if not localdir:
- # prefix with ssh command if not local node
- cmd = ["ssh",
- "-i", pem_key_path,
- "root@%s" % host] + cmd
-
- execute(cmd, exit_msg="%s - Cleanup failed" % host, logger=logger)
-
-
-def cleanup(nodes, args):
+def run_cmd_nodes(task, args, **kwargs):
+ global node_outfiles
+ nodes = get_nodes(args.volume)
pool = []
for num, node in enumerate(nodes):
host, brick = node[1].split(":")
- # temp output file
+ host_uuid = node[0]
+ cmd = []
+ opts = {}
node_outfile = os.path.join(conf.get_opt("working_dir"),
- args.session,
- args.volume,
- "tmp_output_%s.txt" % num)
-
- try:
- os.remove(node_outfile)
- except (OSError, IOError):
- # TODO: Cleanup Failure, Handle
- pass
-
- p = Process(target=node_cleanup,
- args=(host, args))
- p.start()
- pool.append(p)
-
- exit_codes = 0
- for p in pool:
- p.join()
- exit_codes += (0 if p.exitcode == 0 else 1)
-
- if exit_codes != 0:
- sys.exit(1)
-
-
-def failback_node_run(brick_details, idx, volume, start, outfile, args):
- host, brick = brick_details.split(":")
- p = Process(target=node_run,
- args=(volume, host, brick, start, outfile, args, True))
- p.start()
- p.join()
- return p.exitcode
-
-
-def run_in_nodes(volume, start, args):
- """
- Get nodes of volume using gluster volume info, spawn a process
- each for a Node. Merge the output files once all the process
- complete their tasks.
- """
- nodes = get_nodes(volume)
- pool = []
- node_outfiles = []
- for num, node in enumerate(nodes):
- host, brick = node[1].split(":")
- # temp output file
- node_outfile = os.path.join(conf.get_opt("working_dir"),
- args.session,
- volume,
- "tmp_output_%s.txt" % num)
- node_outfiles.append(node_outfile)
- p = Process(target=node_run, args=(volume, host, brick, start,
- node_outfile, args))
- p.start()
- pool.append(p)
-
- exit_codes = 0
- for idx, p in enumerate(pool):
+ args.session, args.volume,
+ "tmp_output_%s" % num)
+
+ if task == "pre":
+ # If Full backup is requested, use brickfind
+ change_detector = conf.get_change_detector("changelog")
+ if args.full:
+ change_detector = conf.get_change_detector("brickfind")
+
+ node_outfiles.append(node_outfile)
+
+ cmd = [change_detector,
+ args.session,
+ args.volume,
+ brick,
+ node_outfile,
+ str(kwargs.get("start")),
+ "--output-prefix",
+ args.output_prefix] + \
+ (["--debug"] if args.debug else []) + \
+ (["--only-namespace-changes"] if args.only_namespace_changes
+ else [])
+
+ opts["node_outfile"] = node_outfile
+ opts["copy_outfile"] = True
+ elif task == "cleanup":
+ # After pre run, cleanup the working directory and other temp files
+ # Remove the copied node_outfile in main node
+ try:
+ os.remove(node_outfile)
+ except (OSError, IOError):
+ logger.warn("Failed to cleanup temporary file %s" %
+ node_outfile)
+ pass
+
+ cmd = [conf.get_opt("nodeagent"),
+ "cleanup",
+ args.session,
+ args.volume] + (["--debug"] if args.debug else [])
+ elif task == "create":
+ # On glusterfind create, create the session directory on
+ # each brick node
+ cmd = [conf.get_opt("nodeagent"),
+ "create",
+ args.session,
+ args.volume,
+ brick,
+ kwargs.get("time_to_update")] + \
+ (["--debug"] if args.debug else []) + \
+ (["--reset-session-time"] if args.reset_session_time
+ else [])
+ elif task == "post":
+ # Rename pre status file to actual status file in each node
+ cmd = [conf.get_opt("nodeagent"),
+ "post",
+ args.session,
+ args.volume,
+ brick] + \
+ (["--debug"] if args.debug else [])
+ elif task == "delete":
+ # On glusterfind delete, clean up all the session files/dirs
+ # from each node.
+ cmd = [conf.get_opt("nodeagent"),
+ "delete",
+ args.session,
+ args.volume] + \
+ (["--debug"] if args.debug else [])
+
+ if cmd:
+ p = Process(target=node_cmd,
+ args=(host, host_uuid, task, cmd, args, opts))
+ p.start()
+ pool.append(p)
+
+ for num, p in enumerate(pool):
p.join()
- # Handle the Changelog failure, fallback to Brickfind
- if p.exitcode == 2:
- rc = failback_node_run(nodes[idx][1], idx, volume, start,
- node_outfiles[idx], args)
- exit_codes += (0 if rc == 0 else 1)
- elif p.exitcode != 0:
- exit_codes += (0 if p.exitcode == 0 else 1)
-
- if exit_codes != 0:
- sys.exit(1)
-
- # Merge all output files
- cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
- execute(cmd,
- exit_msg="Failed to merge output files "
- "collected from nodes", logger=logger)
-
- cleanup(nodes, args)
+ if p.exitcode != 0:
+ logger.warn("Command %s failed in %s" % (task, nodes[num][1]))
+ if task in ["create", "delete"]:
+ fail("Command %s failed in %s" % (task, nodes[num][1]))
+ elif task == "pre" and args.disable_partial:
+ sys.exit(1)
+@cache_output
def get_nodes(volume):
"""
Get the gluster volume info xml output and parse to get
@@ -237,6 +203,9 @@ def _get_args():
parser_create.add_argument("--debug", help="Debug", action="store_true")
parser_create.add_argument("--force", help="Force option to recreate "
"the session", action="store_true")
+ parser_create.add_argument("--reset-session-time",
+ help="Reset Session Time to Current Time",
+ action="store_true")
# delete <SESSION> <VOLUME> [--debug]
parser_delete = subparsers.add_parser('delete')
@@ -250,7 +219,7 @@ def _get_args():
parser_list.add_argument("--volume", help="Volume Name", default="")
parser_list.add_argument("--debug", help="Debug", action="store_true")
- # pre <SESSION> <VOLUME> <OUTFILE> [--change-detector <CHANGE_DETECTOR>]
+ # pre <SESSION> <VOLUME> <OUTFILE>
# [--output-prefix <OUTPUT_PREFIX>] [--full]
parser_pre = subparsers.add_parser('pre')
parser_pre.add_argument("session", help="Session Name")
@@ -258,10 +227,8 @@ def _get_args():
parser_pre.add_argument("outfile", help="Output File", action=StoreAbsPath)
parser_pre.add_argument("--debug", help="Debug", action="store_true")
parser_pre.add_argument("--full", help="Full find", action="store_true")
- parser_pre.add_argument("--change-detector", dest="change_detector",
- help="Change detection",
- choices=conf.list_change_detectors(),
- type=str, default='changelog')
+ parser_pre.add_argument("--disable-partial", help="Disable Partial find, "
+ "Fail when one node fails", action="store_true")
parser_pre.add_argument("--output-prefix", help="File prefix in output",
default=".")
parser_pre.add_argument("--regenerate-outfile",
@@ -363,12 +330,15 @@ def mode_create(session_dir, args):
logger.info("Volume option set %s, changelog.changelog on"
% args.volume)
- if not os.path.exists(status_file):
+ # Add Rollover time to current time to make sure changelogs
+ # will be available if we use this time as start time
+ time_to_update = int(time.time()) + get_changelog_rollover_time(
+ args.volume)
+
+ run_cmd_nodes("create", args, time_to_update=str(time_to_update))
+
+ if not os.path.exists(status_file) or args.reset_session_time:
with open(status_file, "w", buffering=0) as f:
- # Add Rollover time to current time to make sure changelogs
- # will be available if we use this time as start time
- time_to_update = int(time.time()) + int(
- conf.get_opt("changelog_rollover_time"))
f.write(str(time_to_update))
sys.exit(0)
@@ -378,8 +348,8 @@ def mode_pre(session_dir, args):
"""
Read from Session file and write to session.pre file
"""
- endtime_to_update = int(time.time()) - int(
- conf.get_opt("changelog_rollover_time"))
+ endtime_to_update = int(time.time()) - get_changelog_rollover_time(
+ args.volume)
status_file = os.path.join(session_dir, args.volume, "status")
status_file_pre = status_file + ".pre"
@@ -404,7 +374,15 @@ def mode_pre(session_dir, args):
"Start time: %s, End time: %s"
% (args.session, args.volume, start, endtime_to_update))
- run_in_nodes(args.volume, start, args)
+ run_cmd_nodes("pre", args, start=start)
+
+ # Merge the output files collected from all nodes
+ cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
+ execute(cmd,
+ exit_msg="Failed to merge output files "
+ "collected from nodes", logger=logger)
+
+ run_cmd_nodes("cleanup", args)
with open(status_file_pre, "w", buffering=0) as f:
f.write(str(endtime_to_update))
@@ -423,6 +401,7 @@ def mode_post(session_dir, args):
status_file_pre = status_file + ".pre"
if os.path.exists(status_file_pre):
+ run_cmd_nodes("post", args)
os.rename(status_file_pre, status_file)
sys.exit(0)
else:
@@ -430,12 +409,7 @@ def mode_post(session_dir, args):
def mode_delete(session_dir, args):
- def handle_rm_error(func, path, exc_info):
- if exc_info[1].errno == ENOENT:
- return
-
- raise exc_info[1]
-
+ run_cmd_nodes("delete", args)
shutil.rmtree(os.path.join(session_dir, args.volume),
onerror=handle_rm_error)