From e7ff30ac7612753e289ae043f88a1f3a8c8f19ce Mon Sep 17 00:00:00 2001 From: Aravinda VK Date: Mon, 20 Apr 2015 14:03:58 +0530 Subject: tools/glusterfind: Partial Find This is optional and enabled by default, if one node fails Glusterfind will not fail to return list of files from other nodes. This behavior can be changed using --disable-partial Now session is maintained in each nodes as well as in initiator node. Every pre command will pick the status file from respective node and start collecting list of changes happened after the status time. --reset-session-time, new option to force reset the session time. Next incremental run will start from this time. Change-detector argument is removed since Changelog mode is required to detect deletes and Renames. BUG: 1219467 Change-Id: I1d0a0629facc3d26780200ccdf39b221ab4037c4 Original-Author: Aravinda VK Reviewed-On: http://review.gluster.org/#/c/10320 Signed-off-by: Kotresh HR Reviewed-on: http://review.gluster.org/10639 Tested-by: Gluster Build System Reviewed-by: Vijay Bellur --- tools/glusterfind/src/Makefile.am | 4 +- tools/glusterfind/src/brickfind.py | 39 ++--- tools/glusterfind/src/changelog.py | 40 ++++- tools/glusterfind/src/main.py | 278 ++++++++++++++++------------------- tools/glusterfind/src/nodeagent.py | 123 ++++++++++++++++ tools/glusterfind/src/nodecleanup.py | 51 ------- tools/glusterfind/src/tool.conf.in | 3 +- tools/glusterfind/src/utils.py | 118 ++++++++------- 8 files changed, 367 insertions(+), 289 deletions(-) create mode 100644 tools/glusterfind/src/nodeagent.py delete mode 100644 tools/glusterfind/src/nodecleanup.py (limited to 'tools/glusterfind') diff --git a/tools/glusterfind/src/Makefile.am b/tools/glusterfind/src/Makefile.am index 458b820fd19..7b819828d97 100644 --- a/tools/glusterfind/src/Makefile.am +++ b/tools/glusterfind/src/Makefile.am @@ -3,12 +3,12 @@ glusterfinddir = $(libexecdir)/glusterfs/glusterfind glusterfind_PYTHON = conf.py utils.py __init__.py \ main.py libgfchangelog.py -glusterfind_SCRIPTS = changelog.py nodecleanup.py \ +glusterfind_SCRIPTS = changelog.py nodeagent.py \ brickfind.py glusterfind_DATA = tool.conf -EXTRA_DIST = changelog.py nodecleanup.py brickfind.py \ +EXTRA_DIST = changelog.py nodeagent.py brickfind.py \ tool.conf CLEANFILES = diff --git a/tools/glusterfind/src/brickfind.py b/tools/glusterfind/src/brickfind.py index 1090f408e28..9758bef56ff 100644 --- a/tools/glusterfind/src/brickfind.py +++ b/tools/glusterfind/src/brickfind.py @@ -12,7 +12,8 @@ import os import sys import logging from argparse import ArgumentParser, RawDescriptionHelpFormatter -from errno import ENOENT +import urllib +import time from utils import mkdirp, setup_logger, create_file, output_write, find import conf @@ -36,36 +37,17 @@ def brickfind_crawl(brick, args): with open(args.outfile, "a+") as fout: brick_path_len = len(brick) - def mtime_filter(path): - try: - st = os.lstat(path) - except (OSError, IOError) as e: - if e.errno == ENOENT: - st = None - else: - raise - - if st and (st.st_mtime > args.start or st.st_ctime > args.start): - return True - - return False - def output_callback(path): path = path.strip() path = path[brick_path_len+1:] - output_write(fout, path, args.output_prefix) + output_write(fout, path, args.output_prefix, encode=True) ignore_dirs = [os.path.join(brick, dirname) for dirname in conf.get_opt("brick_ignore_dirs").split(",")] - if args.full: - find(brick, callback_func=output_callback, - ignore_dirs=ignore_dirs) - else: - find(brick, callback_func=output_callback, - filter_func=mtime_filter, - ignore_dirs=ignore_dirs) + find(brick, callback_func=output_callback, + ignore_dirs=ignore_dirs) fout.flush() os.fsync(fout.fileno()) @@ -81,7 +63,6 @@ def _get_args(): parser.add_argument("outfile", help="Output File") parser.add_argument("start", help="Start Time", type=float) parser.add_argument("--debug", help="Debug", action="store_true") - parser.add_argument("--full", help="Full Find", action="store_true") parser.add_argument("--output-prefix", help="File prefix in output", default=".") @@ -90,6 +71,12 @@ def _get_args(): if __name__ == "__main__": args = _get_args() + session_dir = os.path.join(conf.get_opt("session_dir"), args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + status_file_pre = status_file + ".pre" + mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, + logger=logger) mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume), exit_on_err=True) log_file = os.path.join(conf.get_opt("log_dir"), @@ -97,5 +84,9 @@ if __name__ == "__main__": args.volume, "brickfind.log") setup_logger(logger, log_file, args.debug) + + time_to_update = int(time.time()) brickfind_crawl(args.brick, args) + with open(status_file_pre, "w", buffering=0) as f: + f.write(str(time_to_update)) sys.exit(0) diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py index eb73635fb32..2c4ee9106e1 100644 --- a/tools/glusterfind/src/changelog.py +++ b/tools/glusterfind/src/changelog.py @@ -16,10 +16,12 @@ from errno import ENOENT import logging from argparse import ArgumentParser, RawDescriptionHelpFormatter import hashlib +import urllib import libgfchangelog from utils import create_file, mkdirp, execute, symlink_gfid_to_path from utils import fail, setup_logger, output_write, find +from utils import get_changelog_rollover_time import conf @@ -190,7 +192,7 @@ def sort_unique(filename): exit_msg="Sort failed", logger=logger) -def get_changes(brick, hash_dir, log_file, end, args): +def get_changes(brick, hash_dir, log_file, start, end, args): """ Makes use of libgfchangelog's history API to get changelogs containing changes from start and end time. Further collects @@ -216,7 +218,7 @@ def get_changes(brick, hash_dir, log_file, end, args): # Fail if History fails for requested Start and End try: actual_end = libgfchangelog.cl_history_changelog( - cl_path, args.start, end, CHANGELOGAPI_NUM_WORKERS) + cl_path, start, end, CHANGELOGAPI_NUM_WORKERS) except libgfchangelog.ChangelogException as e: fail("%s Historical Changelogs not available: %s" % (brick, e), logger=logger) @@ -235,6 +237,11 @@ def get_changes(brick, hash_dir, log_file, end, args): if changes: with open(gfid_list_path, 'a+') as fgfid: for change in changes: + # Ignore if last processed changelog comes + # again in list + if change.endswith(".%s" % start): + continue + with open(change) as f: for line in f: # Space delimited list, collect GFID @@ -259,8 +266,10 @@ def get_changes(brick, hash_dir, log_file, end, args): args.outfile, gfid_list_failures_file) gfid_to_path_using_batchfind(brick, gfid_list_failures_file, args.outfile) + return actual_end -def changelog_crawl(brick, end, args): + +def changelog_crawl(brick, start, end, args): """ Init function, prepares working dir and calls Changelog query """ @@ -283,8 +292,8 @@ def changelog_crawl(brick, end, args): "changelog.%s.log" % brickhash) logger.info("%s Started Changelog Crawl. Start: %s, End: %s" - % (brick, args.start, end)) - get_changes(brick, working_dir, log_file, end, args) + % (brick, start, end)) + return get_changes(brick, working_dir, log_file, start, end, args) def _get_args(): @@ -312,6 +321,23 @@ if __name__ == "__main__": args.volume, "changelog.log") setup_logger(logger, log_file, args.debug) - end = int(time.time()) - int(conf.get_opt("changelog_rollover_time")) - changelog_crawl(args.brick, end, args) + + session_dir = os.path.join(conf.get_opt("session_dir"), args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + status_file_pre = status_file + ".pre" + mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, + logger=logger) + + try: + with open(status_file) as f: + start = int(f.read().strip()) + except (ValueError, OSError, IOError): + start = args.start + + end = int(time.time()) - get_changelog_rollover_time(args.volume) + actual_end = changelog_crawl(args.brick, start, end, args) + with open(status_file_pre, "w", buffering=0) as f: + f.write(str(actual_end)) + sys.exit(0) diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py index ceaf1173897..089a3aec3c5 100644 --- a/tools/glusterfind/src/main.py +++ b/tools/glusterfind/src/main.py @@ -19,7 +19,8 @@ import logging import shutil from utils import execute, is_host_local, mkdirp, fail -from utils import setup_logger, human_time +from utils import setup_logger, human_time, handle_rm_error +from utils import get_changelog_rollover_time, cache_output import conf @@ -29,6 +30,7 @@ GlusterFS Incremental API ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError logger = logging.getLogger() +node_outfiles = [] class StoreAbsPath(Action): @@ -46,33 +48,13 @@ def get_pem_key_path(session, volume): "%s_%s_secret.pem" % (session, volume)) -def node_run(volume, host, path, start, outfile, args, fallback=False): +def node_cmd(host, host_uuid, task, cmd, args, opts): """ - If host is local node, execute the command locally. If not local - execute the CHANGE_DETECTOR command via ssh and copy the output file from - remote node using scp. + Runs command via ssh if host is not local """ - localdir = is_host_local(host) - pem_key_path = get_pem_key_path(args.session, args.volume) + localdir = is_host_local(host_uuid) - # If Full backup is requested or start time is zero, use brickfind - change_detector = conf.get_change_detector(args.change_detector) - if ((start == 0 or args.full) and args.change_detector == "changelog") or \ - fallback: - change_detector = conf.get_change_detector("brickfind") - - # CHANGE_DETECTOR --debug - # --gfidpath - cmd = [change_detector, - args.session, - volume, - path, - outfile, - str(start), - "--output-prefix", - args.output_prefix] + \ - (["--debug"] if args.debug else []) + \ - (["--full"] if args.full else []) + pem_key_path = get_pem_key_path(args.session, args.volume) if not localdir: # prefix with ssh command if not local node @@ -80,128 +62,112 @@ def node_run(volume, host, path, start, outfile, args, fallback=False): "-i", pem_key_path, "root@%s" % host] + cmd - rc, out, err = execute(cmd, logger=logger) - if rc == 2: - # Partial History Fallback - logger.info("%s %s Fallback to brickfind" % (host, err.strip())) - # Exit only from process, handled in main. - sys.exit(rc) - elif rc != 0: - fail("%s - Change detection failed" % host, logger=logger) + execute(cmd, exit_msg="%s - %s failed" % (host, task), logger=logger) - if not localdir: + if opts.get("copy_outfile", False): cmd_copy = ["scp", "-i", pem_key_path, - "root@%s:/%s" % (host, outfile), - os.path.dirname(outfile)] + "root@%s:/%s" % (host, opts.get("node_outfile")), + os.path.dirname(opts.get("node_outfile"))] execute(cmd_copy, exit_msg="%s - Copy command failed" % host, logger=logger) -def node_cleanup(host, args): - localdir = is_host_local(host) - - pem_key_path = get_pem_key_path(args.session, args.volume) - - # CHANGE_DETECTOR --debug - # --gfidpath - cmd = [conf.get_opt("nodecleanup"), - args.session, - args.volume] + (["--debug"] if args.debug else []) - - if not localdir: - # prefix with ssh command if not local node - cmd = ["ssh", - "-i", pem_key_path, - "root@%s" % host] + cmd - - execute(cmd, exit_msg="%s - Cleanup failed" % host, logger=logger) - - -def cleanup(nodes, args): +def run_cmd_nodes(task, args, **kwargs): + global node_outfiles + nodes = get_nodes(args.volume) pool = [] for num, node in enumerate(nodes): host, brick = node[1].split(":") - # temp output file + host_uuid = node[0] + cmd = [] + opts = {} node_outfile = os.path.join(conf.get_opt("working_dir"), - args.session, - args.volume, - "tmp_output_%s.txt" % num) - - try: - os.remove(node_outfile) - except (OSError, IOError): - # TODO: Cleanup Failure, Handle - pass - - p = Process(target=node_cleanup, - args=(host, args)) - p.start() - pool.append(p) - - exit_codes = 0 - for p in pool: - p.join() - exit_codes += (0 if p.exitcode == 0 else 1) - - if exit_codes != 0: - sys.exit(1) - - -def failback_node_run(brick_details, idx, volume, start, outfile, args): - host, brick = brick_details.split(":") - p = Process(target=node_run, - args=(volume, host, brick, start, outfile, args, True)) - p.start() - p.join() - return p.exitcode - - -def run_in_nodes(volume, start, args): - """ - Get nodes of volume using gluster volume info, spawn a process - each for a Node. Merge the output files once all the process - complete their tasks. - """ - nodes = get_nodes(volume) - pool = [] - node_outfiles = [] - for num, node in enumerate(nodes): - host, brick = node[1].split(":") - # temp output file - node_outfile = os.path.join(conf.get_opt("working_dir"), - args.session, - volume, - "tmp_output_%s.txt" % num) - node_outfiles.append(node_outfile) - p = Process(target=node_run, args=(volume, host, brick, start, - node_outfile, args)) - p.start() - pool.append(p) - - exit_codes = 0 - for idx, p in enumerate(pool): + args.session, args.volume, + "tmp_output_%s" % num) + + if task == "pre": + # If Full backup is requested or start time is zero, use brickfind + change_detector = conf.get_change_detector("changelog") + if args.full: + change_detector = conf.get_change_detector("brickfind") + + node_outfiles.append(node_outfile) + + cmd = [change_detector, + args.session, + args.volume, + brick, + node_outfile, + str(kwargs.get("start")), + "--output-prefix", + args.output_prefix] + \ + (["--debug"] if args.debug else []) + \ + (["--only-namespace-changes"] if args.only_namespace_changes + else []) + + opts["node_outfile"] = node_outfile + opts["copy_outfile"] = True + elif task == "cleanup": + # After pre run, cleanup the working directory and other temp files + # Remove the copied node_outfile in main node + try: + os.remove(node_outfile) + except (OSError, IOError): + logger.warn("Failed to cleanup temporary file %s" % + node_outfile) + pass + + cmd = [conf.get_opt("nodeagent"), + "cleanup", + args.session, + args.volume] + (["--debug"] if args.debug else []) + elif task == "create": + # When glusterfind create, create session directory in + # each brick nodes + cmd = [conf.get_opt("nodeagent"), + "create", + args.session, + args.volume, + brick, + kwargs.get("time_to_update")] + \ + (["--debug"] if args.debug else []) + \ + (["--reset-session-time"] if args.reset_session_time + else []) + elif task == "post": + # Rename pre status file to actual status file in each node + cmd = [conf.get_opt("nodeagent"), + "post", + args.session, + args.volume, + brick] + \ + (["--debug"] if args.debug else []) + elif task == "delete": + # When glusterfind delete, cleanup all the session files/dirs + # from each node. + cmd = [conf.get_opt("nodeagent"), + "delete", + args.session, + args.volume] + \ + (["--debug"] if args.debug else []) + + if cmd: + p = Process(target=node_cmd, + args=(host, host_uuid, task, cmd, args, opts)) + p.start() + pool.append(p) + + for num, p in enumerate(pool): p.join() - # Handle the Changelog failure, fallback to Brickfind - if p.exitcode == 2: - rc = failback_node_run(nodes[idx][1], idx, volume, start, - node_outfiles[idx], args) - exit_codes += (0 if rc == 0 else 1) - elif p.exitcode != 0: - exit_codes += (0 if p.exitcode == 0 else 1) - - if exit_codes != 0: - sys.exit(1) - - # Merge all output files - cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile] - execute(cmd, - exit_msg="Failed to merge output files " - "collected from nodes", logger=logger) - - cleanup(nodes, args) + if p.exitcode != 0: + logger.warn("Command %s failed in %s" % (task, nodes[num][1])) + if task in ["create", "delete"]: + fail("Command %s failed in %s" % (task, nodes[num][1])) + elif task == "pre" and args.disable_partial: + sys.exit(1) +@cache_output def get_nodes(volume): """ Get the gluster volume info xml output and parse to get @@ -237,6 +203,9 @@ def _get_args(): parser_create.add_argument("--debug", help="Debug", action="store_true") parser_create.add_argument("--force", help="Force option to recreate " "the session", action="store_true") + parser_create.add_argument("--reset-session-time", + help="Reset Session Time to Current Time", + action="store_true") # delete [--debug] parser_delete = subparsers.add_parser('delete') @@ -250,7 +219,7 @@ def _get_args(): parser_list.add_argument("--volume", help="Volume Name", default="") parser_list.add_argument("--debug", help="Debug", action="store_true") - # pre [--change-detector ] + # pre # [--output-prefix ] [--full] parser_pre = subparsers.add_parser('pre') parser_pre.add_argument("session", help="Session Name") @@ -258,10 +227,8 @@ def _get_args(): parser_pre.add_argument("outfile", help="Output File", action=StoreAbsPath) parser_pre.add_argument("--debug", help="Debug", action="store_true") parser_pre.add_argument("--full", help="Full find", action="store_true") - parser_pre.add_argument("--change-detector", dest="change_detector", - help="Change detection", - choices=conf.list_change_detectors(), - type=str, default='changelog') + parser_pre.add_argument("--disable-partial", help="Disable Partial find, " + "Fail when one node fails", action="store_true") parser_pre.add_argument("--output-prefix", help="File prefix in output", default=".") parser_pre.add_argument("--regenerate-outfile", @@ -363,12 +330,15 @@ def mode_create(session_dir, args): logger.info("Volume option set %s, changelog.changelog on" % args.volume) - if not os.path.exists(status_file): + # Add Rollover time to current time to make sure changelogs + # will be available if we use this time as start time + time_to_update = int(time.time()) + get_changelog_rollover_time( + args.volume) + + run_cmd_nodes("create", args, time_to_update=str(time_to_update)) + + if not os.path.exists(status_file) or args.reset_session_time: with open(status_file, "w", buffering=0) as f: - # Add Rollover time to current time to make sure changelogs - # will be available if we use this time as start time - time_to_update = int(time.time()) + int( - conf.get_opt("changelog_rollover_time")) f.write(str(time_to_update)) sys.exit(0) @@ -378,8 +348,8 @@ def mode_pre(session_dir, args): """ Read from Session file and write to session.pre file """ - endtime_to_update = int(time.time()) - int( - conf.get_opt("changelog_rollover_time")) + endtime_to_update = int(time.time()) - get_changelog_rollover_time( + args.volume) status_file = os.path.join(session_dir, args.volume, "status") status_file_pre = status_file + ".pre" @@ -404,7 +374,15 @@ def mode_pre(session_dir, args): "Start time: %s, End time: %s" % (args.session, args.volume, start, endtime_to_update)) - run_in_nodes(args.volume, start, args) + run_cmd_nodes("pre", args, start=start) + + # Merger + cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile] + execute(cmd, + exit_msg="Failed to merge output files " + "collected from nodes", logger=logger) + + run_cmd_nodes("cleanup", args) with open(status_file_pre, "w", buffering=0) as f: f.write(str(endtime_to_update)) @@ -423,6 +401,7 @@ def mode_post(session_dir, args): status_file_pre = status_file + ".pre" if os.path.exists(status_file_pre): + run_cmd_nodes("post", args) os.rename(status_file_pre, status_file) sys.exit(0) else: @@ -430,12 +409,7 @@ def mode_post(session_dir, args): def mode_delete(session_dir, args): - def handle_rm_error(func, path, exc_info): - if exc_info[1].errno == ENOENT: - return - - raise exc_info[1] - + run_cmd_nodes("delete", args) shutil.rmtree(os.path.join(session_dir, args.volume), onerror=handle_rm_error) diff --git a/tools/glusterfind/src/nodeagent.py b/tools/glusterfind/src/nodeagent.py new file mode 100644 index 00000000000..2e8c2fc9759 --- /dev/null +++ b/tools/glusterfind/src/nodeagent.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import shutil +import sys +import os +import logging +from argparse import ArgumentParser, RawDescriptionHelpFormatter +import urllib + +from utils import setup_logger, mkdirp, handle_rm_error +import conf + +logger = logging.getLogger() + + +def mode_cleanup(args): + working_dir = os.path.join(conf.get_opt("working_dir"), + args.session, + args.volume) + + mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume), + exit_on_err=True) + log_file = os.path.join(conf.get_opt("log_dir"), + args.session, + args.volume, + "changelog.log") + + setup_logger(logger, log_file) + + try: + shutil.rmtree(working_dir, onerror=handle_rm_error) + except (OSError, IOError) as e: + logger.error("Failed to delete working directory: %s" % e) + sys.exit(1) + + +def mode_create(args): + session_dir = os.path.join(conf.get_opt("session_dir"), + args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + + mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, + logger=logger) + + if not os.path.exists(status_file) or args.reset_session_time: + with open(status_file, "w", buffering=0) as f: + f.write(args.time_to_update) + + sys.exit(0) + + +def mode_post(args): + session_dir = os.path.join(conf.get_opt("session_dir"), args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + + mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, + logger=logger) + status_file_pre = status_file + ".pre" + + if os.path.exists(status_file_pre): + os.rename(status_file_pre, status_file) + sys.exit(0) + + +def mode_delete(args): + session_dir = os.path.join(conf.get_opt("session_dir"), + args.session) + shutil.rmtree(os.path.join(session_dir, args.volume), + onerror=handle_rm_error) + + +def _get_args(): + parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, + description="Node Agent") + subparsers = parser.add_subparsers(dest="mode") + + parser_cleanup = subparsers.add_parser('cleanup') + parser_cleanup.add_argument("session", help="Session Name") + parser_cleanup.add_argument("volume", help="Volume Name") + parser_cleanup.add_argument("--debug", help="Debug", action="store_true") + + parser_session_create = subparsers.add_parser('create') + parser_session_create.add_argument("session", help="Session Name") + parser_session_create.add_argument("volume", help="Volume Name") + parser_session_create.add_argument("brick", help="Brick Path") + parser_session_create.add_argument("time_to_update", help="Time to Update") + parser_session_create.add_argument("--reset-session-time", + help="Reset Session Time", + action="store_true") + parser_session_create.add_argument("--debug", help="Debug", + action="store_true") + + parser_post = subparsers.add_parser('post') + parser_post.add_argument("session", help="Session Name") + parser_post.add_argument("volume", help="Volume Name") + parser_post.add_argument("brick", help="Brick Path") + parser_post.add_argument("--debug", help="Debug", + action="store_true") + + parser_delete = subparsers.add_parser('delete') + parser_delete.add_argument("session", help="Session Name") + parser_delete.add_argument("volume", help="Volume Name") + parser_delete.add_argument("--debug", help="Debug", + action="store_true") + return parser.parse_args() + + +if __name__ == "__main__": + args = _get_args() + + # globals() will have all the functions already defined. + # mode_ will be the function name to be called + globals()["mode_" + args.mode](args) diff --git a/tools/glusterfind/src/nodecleanup.py b/tools/glusterfind/src/nodecleanup.py deleted file mode 100644 index a31d4d83acd..00000000000 --- a/tools/glusterfind/src/nodecleanup.py +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2015 Red Hat, Inc. -# This file is part of GlusterFS. -# -# This file is licensed to you under your choice of the GNU Lesser -# General Public License, version 3 or any later version (LGPLv3 or -# later), or the GNU General Public License, version 2 (GPLv2), in all -# cases as published by the Free Software Foundation. - -import shutil -import sys -import os -import logging -from errno import ENOENT - -from utils import setup_logger, mkdirp -import conf - -logger = logging.getLogger() - - -if __name__ == "__main__": - # Args: - session = sys.argv[1] - volume = sys.argv[2] - - working_dir = os.path.join(conf.get_opt("working_dir"), - session, - volume) - - mkdirp(os.path.join(conf.get_opt("log_dir"), session, volume), - exit_on_err=True) - log_file = os.path.join(conf.get_opt("log_dir"), - session, - volume, - "changelog.log") - - setup_logger(logger, log_file) - - try: - def handle_rm_error(func, path, exc_info): - if exc_info[1].errno == ENOENT: - return - - raise exc_info[1] - - shutil.rmtree(working_dir, onerror=handle_rm_error) - except (OSError, IOError) as e: - logger.error("Failed to delete working directory: %s" % e) - sys.exit(1) diff --git a/tools/glusterfind/src/tool.conf.in b/tools/glusterfind/src/tool.conf.in index 54230cb4dca..a80f4a784c0 100644 --- a/tools/glusterfind/src/tool.conf.in +++ b/tools/glusterfind/src/tool.conf.in @@ -2,8 +2,7 @@ session_dir=@GLUSTERD_WORKDIR@/glusterfind/ working_dir=@GLUSTERFSD_MISCDIR@/glusterfind/ log_dir=/var/log/glusterfs/glusterfind/ -nodecleanup=@GLUSTERFS_LIBEXECDIR@/glusterfind/nodecleanup.py -changelog_rollover_time=15 +nodeagent=@GLUSTERFS_LIBEXECDIR@/glusterfind/nodeagent.py brick_ignore_dirs=.glusterfs,.trashcan [change_detectors] diff --git a/tools/glusterfind/src/utils.py b/tools/glusterfind/src/utils.py index de9c027e299..aea9a9dc82d 100644 --- a/tools/glusterfind/src/utils.py +++ b/tools/glusterfind/src/utils.py @@ -9,14 +9,36 @@ # cases as published by the Free Software Foundation. import sys -import socket from subprocess import PIPE, Popen -from errno import EPERM, EEXIST +from errno import EEXIST, ENOENT +import xml.etree.cElementTree as etree import logging import os from datetime import datetime +import urllib ROOT_GFID = "00000000-0000-0000-0000-000000000001" +DEFAULT_CHANGELOG_INTERVAL = 15 + +ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError +cache_data = {} + + +def cache_output(func): + def wrapper(*args, **kwargs): + global cache_data + if cache_data.get(func.func_name, None) is None: + cache_data[func.func_name] = func(*args, **kwargs) + + return cache_data[func.func_name] + return wrapper + + +def handle_rm_error(func, path, exc_info): + if exc_info[1].errno == ENOENT: + return + + raise exc_info[1] def find(path, callback_func=lambda x: True, filter_func=lambda x: True, @@ -41,12 +63,16 @@ def find(path, callback_func=lambda x: True, filter_func=lambda x: True, callback_func(full_path) -def output_write(f, path, prefix="."): +def output_write(f, path, prefix=".", encode=False): if path == "": return if prefix != ".": path = os.path.join(prefix, path) + + if encode: + path = urllib.quote_plus(path) + f.write("%s\n" % path) @@ -153,51 +179,41 @@ def symlink_gfid_to_path(brick, gfid): return out_path -def is_host_local(host): - """ - Find if a host is local or not. - Code copied from $GLUSTERFS/geo-replication/syncdaemon/syncdutils.py - """ - locaddr = False - for ai in socket.getaddrinfo(host, None): - # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators - # /mgmt/glusterd/src/glusterd-utils.c#L125 - if ai[0] == socket.AF_INET: - if ai[-1][0].split(".")[0] == "127": - locaddr = True - break - elif ai[0] == socket.AF_INET6: - if ai[-1][0] == "::1": - locaddr = True - break - else: - continue - try: - # use ICMP socket to avoid net.ipv4.ip_nonlocal_bind issue, - # cf. https://bugzilla.redhat.com/show_bug.cgi?id=890587 - s = socket.socket(ai[0], socket.SOCK_RAW, socket.IPPROTO_ICMP) - except socket.error: - ex = sys.exc_info()[1] - if ex.errno != EPERM: - raise - f = None - try: - f = open("/proc/sys/net/ipv4/ip_nonlocal_bind") - if int(f.read()) != 0: - logger.warning("non-local bind is set and not " - "allowed to create " - "raw sockets, cannot determine " - "if %s is local" % host) - return False - s = socket.socket(ai[0], socket.SOCK_DGRAM) - finally: - if f: - f.close() - try: - s.bind(ai[-1]) - locaddr = True - break - except: - pass - s.close() - return locaddr +@cache_output +def get_my_uuid(): + cmd = ["gluster", "system::", "uuid", "get", "--xml"] + rc, out, err = execute(cmd) + + if rc != 0: + return None + + tree = etree.fromstring(out) + uuid_el = tree.find("uuidGenerate/uuid") + return uuid_el.text + + +def is_host_local(host_uuid): + # Get UUID only if it is not done previously + # else Cache the UUID value + my_uuid = get_my_uuid() + if my_uuid == host_uuid: + return True + + return False + + +def get_changelog_rollover_time(volumename): + cmd = ["gluster", "volume", "get", volumename, + "changelog.rollover-time", "--xml"] + rc, out, err = execute(cmd) + + if rc != 0: + return DEFAULT_CHANGELOG_INTERVAL + + try: + tree = etree.fromstring(out) + return int(tree.find('volGetopts/Value').text) + except ParseError: + return DEFAULT_CHANGELOG_INTERVAL + + -- cgit