Diffstat (limited to 'tools/glusterfind/src')
-rw-r--r--  tools/glusterfind/src/Makefile.am        |    6
-rw-r--r--  tools/glusterfind/src/__init__.py        |    2
-rw-r--r--  tools/glusterfind/src/brickfind.py       |   42
-rw-r--r--  tools/glusterfind/src/changelog.py       |  198
-rw-r--r--  tools/glusterfind/src/changelogdata.py   |  102
-rw-r--r--  tools/glusterfind/src/conf.py            |    9
-rw-r--r--  tools/glusterfind/src/gfind_py2py3.py    |   88
-rw-r--r--  tools/glusterfind/src/libgfchangelog.py  |   43
-rw-r--r--  tools/glusterfind/src/main.py            |  592
-rw-r--r--  tools/glusterfind/src/nodeagent.py       |   28
-rw-r--r--  tools/glusterfind/src/utils.py           |   63
11 files changed, 927 insertions(+), 246 deletions(-)
diff --git a/tools/glusterfind/src/Makefile.am b/tools/glusterfind/src/Makefile.am
index 541ff946c04..43b6141b01c 100644
--- a/tools/glusterfind/src/Makefile.am
+++ b/tools/glusterfind/src/Makefile.am
@@ -1,12 +1,14 @@
-glusterfinddir = $(libexecdir)/glusterfs/glusterfind
+glusterfinddir = $(GLUSTERFS_LIBEXECDIR)/glusterfind
 
+if WITH_SERVER
 glusterfind_PYTHON = conf.py utils.py __init__.py \
-	main.py libgfchangelog.py changelogdata.py
+	main.py libgfchangelog.py changelogdata.py gfind_py2py3.py
 
 glusterfind_SCRIPTS = changelog.py nodeagent.py \
 	brickfind.py
 
 glusterfind_DATA = tool.conf
+endif
 
 EXTRA_DIST = changelog.py nodeagent.py brickfind.py \
 	tool.conf changelogdata.py
diff --git a/tools/glusterfind/src/__init__.py b/tools/glusterfind/src/__init__.py
index eb941c6d67c..1753698b5fa 100644
--- a/tools/glusterfind/src/__init__.py
+++ b/tools/glusterfind/src/__init__.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+# -*- coding: utf-8 -*-
 # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
 # This file is part of GlusterFS.
diff --git a/tools/glusterfind/src/brickfind.py b/tools/glusterfind/src/brickfind.py
index f300638d602..73b6350188d 100644
--- a/tools/glusterfind/src/brickfind.py
+++ b/tools/glusterfind/src/brickfind.py
@@ -1,4 +1,5 @@
-#!/usr/bin/env python
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
 # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
 # This file is part of GlusterFS.
@@ -12,7 +13,10 @@
 import os
 import sys
 import logging
 from argparse import ArgumentParser, RawDescriptionHelpFormatter
-import urllib
+try:
+    import urllib.parse as urllib
+except ImportError:
+    import urllib
 import time
 from utils import mkdirp, setup_logger, create_file, output_write, find
@@ -37,10 +41,20 @@
 def brickfind_crawl(brick, args):
     with open(args.outfile, "a+") as fout:
         brick_path_len = len(brick)
 
-        def output_callback(path, filter_result):
+        def output_callback(path, filter_result, is_dir):
             path = path.strip()
             path = path[brick_path_len+1:]
-            output_write(fout, path, args.output_prefix, encode=True)
+
+            if args.type == "both":
+                output_write(fout, path, args.output_prefix,
+                             encode=(not args.no_encode), tag=args.tag,
+                             field_separator=args.field_separator)
+            else:
+                if (is_dir and args.type == "d") or (
+                        (not is_dir) and args.type == "f"):
+                    output_write(fout, path, args.output_prefix,
+                                 encode=(not args.no_encode), tag=args.tag,
+                                 field_separator=args.field_separator)
 
         ignore_dirs = [os.path.join(brick, dirname) for dirname in
@@ -59,12 +73,23 @@ def _get_args():
     parser.add_argument("session", help="Session Name")
     parser.add_argument("volume", help="Volume Name")
+    parser.add_argument("node", help="Node Name")
     parser.add_argument("brick", help="Brick Name")
     parser.add_argument("outfile", help="Output File")
-    parser.add_argument("start", help="Start Time", type=float)
+    parser.add_argument("tag", help="Tag to prefix file name with")
+    parser.add_argument("--only-query", help="Only query, No session update",
+                        action="store_true")
     parser.add_argument("--debug", help="Debug", action="store_true")
+    parser.add_argument("--no-encode",
+                        help="Do not encode path in outfile",
+                        action="store_true")
     parser.add_argument("--output-prefix", help="File prefix in output",
                         default=".")
+    parser.add_argument('--type', help="type: f, f-files only"
+                        " d, d-directories only, by default = both",
+                        default='both')
+    parser.add_argument("--field-separator", help="Field separator",
+                        default=" ")
 
     return parser.parse_args()
 
@@ -73,7 +98,7 @@
 if __name__ == "__main__":
     args = _get_args()
     session_dir = os.path.join(conf.get_opt("session_dir"), args.session)
     status_file = os.path.join(session_dir, args.volume,
-                               "%s.status" % urllib.quote_plus(args.brick))
+                               "%s.status" % urllib.quote_plus(args.brick))
     status_file_pre = status_file + ".pre"
     mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
            logger=logger)
@@ -87,6 +112,7 @@
     time_to_update = int(time.time())
     brickfind_crawl(args.brick, args)
-    with open(status_file_pre, "w", buffering=0) as f:
-        f.write(str(time_to_update))
+    if not args.only_query:
+        with open(status_file_pre, "w") as f:
+            f.write(str(time_to_update))
     sys.exit(0)
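The try/except import shim introduced above is the recurring Py2/Py3 pattern in this change: on Python 3 the `urllib.parse` submodule is bound to the name `urllib`, so existing `urllib.quote_plus()` call sites keep working unchanged. A minimal standalone sketch (the brick path is an example value):

```python
# Py2/Py3 import shim as used in brickfind.py: on Python 3 bind
# urllib.parse to the name "urllib"; on Python 2 the import of
# urllib.parse fails and the plain urllib module is used instead.
try:
    import urllib.parse as urllib   # Python 3
except ImportError:
    import urllib                   # Python 2

# quote_plus() is reachable under the same dotted name on both versions.
print(urllib.quote_plus("/bricks/b1 with space"))
# -> %2Fbricks%2Fb1+with+space
```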
diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py
index b5f71c7c0ee..a5e9ea4288f 100644
--- a/tools/glusterfind/src/changelog.py
+++ b/tools/glusterfind/src/changelog.py
@@ -1,4 +1,5 @@
-#!/usr/bin/env python
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
 # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
 # This file is part of GlusterFS.
@@ -13,14 +14,20 @@
 import sys
 import time
 import xattr
 import logging
+from gfind_py2py3 import bytearray_to_str
 from argparse import ArgumentParser, RawDescriptionHelpFormatter
 import hashlib
-import urllib
+try:
+    import urllib.parse as urllib
+except ImportError:
+    import urllib
+import codecs
 
 import libgfchangelog
 from utils import mkdirp, symlink_gfid_to_path
 from utils import fail, setup_logger, find
 from utils import get_changelog_rollover_time
+from utils import output_path_prepare
 from changelogdata import ChangelogData
 import conf
@@ -37,19 +44,6 @@
 history_turn_time = 0
 
 logger = logging.getLogger()
 
 
-def output_path_prepare(path, output_prefix):
-    """
-    If Prefix is set, joins to Path, removes ending slash
-    and encodes it.
-    """
-    if output_prefix != ".":
-        path = os.path.join(output_prefix, path)
-        if path.endswith("/"):
-            path = path[0:len(path)-1]
-
-    return urllib.quote_plus(path)
-
-
 def pgfid_to_path(brick, changelog_data):
     """
     For all the pgfids in table, converts into path using recursive
@@ -57,14 +51,17 @@
     # pgfid1 to path1 in case of CREATE/MKNOD/MKDIR/LINK/SYMLINK
     for row in changelog_data.gfidpath_get_distinct("pgfid1", {"path1": ""}):
-        # In case of Data/Metadata only, pgfid1 will not be their
+        # In case of Data/Metadata only, pgfid1 will not be there
         if row[0] == "":
             continue
 
-        path = symlink_gfid_to_path(brick, row[0])
-        path = output_path_prepare(path, args.output_prefix)
-
-        changelog_data.gfidpath_set_path1(path, row[0])
+        try:
+            path = symlink_gfid_to_path(brick, row[0])
+            path = output_path_prepare(path, args)
+            changelog_data.gfidpath_set_path1(path, row[0])
+        except (IOError, OSError) as e:
+            logger.warn("Error converting to path: %s" % e)
+            continue
 
     # pgfid2 to path2 in case of RENAME
     for row in changelog_data.gfidpath_get_distinct("pgfid2",
@@ -74,13 +71,14 @@
         if row[0] == "":
             continue
 
-        path = symlink_gfid_to_path(brick, row[0])
-        if path == "":
+        try:
+            path = symlink_gfid_to_path(brick, row[0])
+            path = output_path_prepare(path, args)
+            changelog_data.gfidpath_set_path2(path, row[0])
+        except (IOError, OSError) as e:
+            logger.warn("Error converting to path: %s" % e)
             continue
 
-        path = output_path_prepare(path, args.output_prefix)
-        changelog_data.gfidpath_set_path2(path, row[0])
-
 
 def populate_pgfid_and_inodegfid(brick, changelog_data):
     """
@@ -94,29 +92,69 @@
         p = os.path.join(brick, ".glusterfs", gfid[0:2],
                          gfid[2:4], gfid)
         if os.path.islink(p):
             # It is a Directory if GFID backend path is symlink
-            path = symlink_gfid_to_path(brick, gfid)
-            if path == "":
+            try:
+                path = symlink_gfid_to_path(brick, gfid)
+                path = output_path_prepare(path, args)
+                changelog_data.gfidpath_update({"path1": path},
+                                               {"gfid": gfid})
+            except (IOError, OSError) as e:
+                logger.warn("Error converting to path: %s" % e)
                 continue
-
-            path = output_path_prepare(path, args.output_prefix)
-
-            changelog_data.gfidpath_update({"path1": path},
-                                           {"gfid": row[0]})
         else:
             try:
                 # INODE and GFID to inodegfid table
                 changelog_data.inodegfid_add(os.stat(p).st_ino, gfid)
                 file_xattrs = xattr.list(p)
                 for x in file_xattrs:
-                    if x.startswith("trusted.pgfid."):
+                    x_str = bytearray_to_str(x)
+                    if x_str.startswith("trusted.pgfid."):
                         # PGFID in pgfid table
-                        changelog_data.pgfid_add(x.split(".")[-1])
+                        changelog_data.pgfid_add(x_str.split(".")[-1])
             except (IOError, OSError):
                 # All OS Errors ignored, since failures will be logged
                 # in End. All GFIDs present in gfidpath table
                 continue
 
 
+def enum_hard_links_using_gfid2path(brick, gfid, args):
+    hardlinks = []
+    p = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid)
+    if not os.path.isdir(p):
+        # we have a symlink or a normal file
+        try:
+            file_xattrs = xattr.list(p)
+            for x in file_xattrs:
+                x_str = bytearray_to_str(x)
+                if x_str.startswith("trusted.gfid2path."):
+                    # get the value for the xattr i.e. <PGFID>/<BN>
+                    v = xattr.getxattr(p, x_str)
+                    v_str = bytearray_to_str(v)
+                    pgfid, bn = v_str.split(os.sep)
+                    try:
+                        path = symlink_gfid_to_path(brick, pgfid)
+                        fullpath = os.path.join(path, bn)
+                        fullpath = output_path_prepare(fullpath, args)
+                        hardlinks.append(fullpath)
+                    except (IOError, OSError) as e:
+                        logger.warn("Error converting to path: %s" % e)
+                        continue
+        except (IOError, OSError):
+            pass
+    return hardlinks
+
+
+def gfid_to_all_paths_using_gfid2path(brick, changelog_data, args):
+    path = ""
+    for row in changelog_data.gfidpath_get({"path1": "", "type": "MODIFY"}):
+        gfid = row[3].strip()
+        logger.debug("Processing gfid %s" % gfid)
+        hardlinks = enum_hard_links_using_gfid2path(brick, gfid, args)
+
+        path = ",".join(hardlinks)
+
+        changelog_data.gfidpath_update({"path1": path}, {"gfid": gfid})
+
+
 def gfid_to_path_using_pgfid(brick, changelog_data, args):
     """
     For all the pgfids collected, Converts to Path and
@@ -151,7 +189,7 @@
         path = path.strip()
         path = path[brick_path_len+1:]
-        path = output_path_prepare(path, args.output_prefix)
+        path = output_path_prepare(path, args)
 
         changelog_data.append_path1(path, inode)
         changelog_data.inodegfid_update({"converted": 1}, {"inode": inode})
@@ -161,12 +199,16 @@
                    conf.get_opt("brick_ignore_dirs").split(",")]
 
     for row in changelog_data.pgfid_get():
-        path = symlink_gfid_to_path(brick, row[0])
-        find(os.path.join(brick, path),
-             callback_func=output_callback,
-             filter_func=inode_filter,
-             ignore_dirs=ignore_dirs,
-             subdirs_crawl=False)
+        try:
+            path = symlink_gfid_to_path(brick, row[0])
+            find(os.path.join(brick, path),
+                 callback_func=output_callback,
+                 filter_func=inode_filter,
+                 ignore_dirs=ignore_dirs,
+                 subdirs_crawl=False)
+        except (IOError, OSError) as e:
+            logger.warn("Error converting to path: %s" % e)
+            continue
 
 
 def gfid_to_path_using_batchfind(brick, changelog_data):
@@ -195,7 +237,7 @@
             # Also updates converted flag in inodegfid table as 1
             path = path.strip()
             path = path[brick_path_len+1:]
-            path = output_path_prepare(path, args.output_prefix)
+            path = output_path_prepare(path, args)
 
             changelog_data.append_path1(path, inode)
@@ -209,11 +251,11 @@
          ignore_dirs=ignore_dirs)
 
 
-def parse_changelog_to_db(changelog_data, filename):
+def parse_changelog_to_db(changelog_data, filename, args):
     """
     Parses a Changelog file and populates data in gfidpath table
     """
-    with open(filename) as f:
+    with codecs.open(filename, encoding="utf-8") as f:
         changelogfile = os.path.basename(filename)
         for line in f:
             data = line.strip().split(" ")
@@ -245,7 +287,7 @@ def get_changes(brick, hash_dir, log_file, start, end, args):
     session_dir = os.path.join(conf.get_opt("session_dir"),
                                args.session)
     status_file = os.path.join(session_dir, args.volume,
-                               "%s.status" % urllib.quote_plus(args.brick))
+                               "%s.status" % urllib.quote_plus(args.brick))
 
     # Get previous session
     try:
@@ -262,7 +304,7 @@
         fail("%s Changelog register failed: %s" % (brick, e), logger=logger)
 
     # Output files to record GFIDs and GFID to Path failure GFIDs
-    changelog_data = ChangelogData(args.outfile)
+    changelog_data = ChangelogData(args.outfile, args)
 
     # Changelogs path(Hard coded to BRICK/.glusterfs/changelogs
     cl_path = os.path.join(brick, ".glusterfs/changelogs")
@@ -272,9 +314,10 @@
     try:
         actual_end = libgfchangelog.cl_history_changelog(
             cl_path, start, end, CHANGELOGAPI_NUM_WORKERS)
     except libgfchangelog.ChangelogException as e:
-        fail("%s Historical Changelogs not available: %s" % (brick, e),
-             logger=logger)
+        fail("%s: %s Historical Changelogs not available: %s" %
+             (args.node, brick, e), logger=logger)
 
+    logger.info("[1/4] Starting changelog parsing ...")
     try:
         # scan followed by getchanges till scan returns zero.
         # history_scan() is blocking call, till it gets the number
@@ -284,33 +327,46 @@
             # history_getchanges()
             changes = []
             while libgfchangelog.cl_history_scan() > 0:
-                changes += libgfchangelog.cl_history_getchanges()
+                changes = libgfchangelog.cl_history_getchanges()
 
                 for change in changes:
                     # Ignore if last processed changelog comes
                     # again in list
                     if change.endswith(".%s" % start):
                         continue
-                    parse_changelog_to_db(changelog_data, change)
-                    libgfchangelog.cl_history_done(change)
+                    try:
+                        parse_changelog_to_db(changelog_data, change, args)
+                        libgfchangelog.cl_history_done(change)
+                    except IOError as e:
+                        logger.warn("Error parsing changelog file %s: %s" %
+                                    (change, e))
 
                 changelog_data.commit()
     except libgfchangelog.ChangelogException as e:
         fail("%s Error during Changelog Crawl: %s" % (brick, e),
              logger=logger)
 
+    logger.info("[1/4] Finished changelog parsing.")
+
     # Convert all pgfid available from Changelogs
+    logger.info("[2/4] Starting 'pgfid to path' conversions ...")
     pgfid_to_path(brick, changelog_data)
     changelog_data.commit()
+    logger.info("[2/4] Finished 'pgfid to path' conversions.")
 
-    # Convert all GFIDs for which no other additional details available
-    gfid_to_path_using_pgfid(brick, changelog_data, args)
+    # Convert all gfids recorded for data and metadata to all hardlink paths
+    logger.info("[3/4] Starting 'gfid2path' conversions ...")
+    gfid_to_all_paths_using_gfid2path(brick, changelog_data, args)
     changelog_data.commit()
+    logger.info("[3/4] Finished 'gfid2path' conversions.")
 
     # If some GFIDs fail to get converted from previous step,
     # convert using find
+    logger.info("[4/4] Starting 'gfid to path using batchfind' "
+                "conversions ...")
    gfid_to_path_using_batchfind(brick, changelog_data)
    changelog_data.commit()
+    logger.info("[4/4] Finished 'gfid to path using batchfind' conversions.")
 
     return actual_end
 
@@ -324,7 +380,7 @@ def changelog_crawl(brick, start, end, args):
     # WORKING_DIR/BRICKHASH/OUTFILE
     working_dir = os.path.dirname(args.outfile)
-    brickhash = hashlib.sha1(brick)
+    brickhash = hashlib.sha1(brick.encode())
     brickhash = str(brickhash.hexdigest())
     working_dir = os.path.join(working_dir, brickhash)
 
@@ -346,12 +402,20 @@ def _get_args():
     parser.add_argument("session", help="Session Name")
     parser.add_argument("volume", help="Volume Name")
+    parser.add_argument("node", help="Node Name")
     parser.add_argument("brick", help="Brick Name")
     parser.add_argument("outfile", help="Output File")
     parser.add_argument("start", help="Start Time", type=int)
+    parser.add_argument("end", help="End Time", type=int)
+    parser.add_argument("--only-query", help="Query mode only (no session)",
+                        action="store_true")
     parser.add_argument("--debug", help="Debug", action="store_true")
+    parser.add_argument("--no-encode",
+                        help="Do not encode path in outfile",
+                        action="store_true")
     parser.add_argument("--output-prefix", help="File prefix in output",
                         default=".")
+    parser.add_argument("--type", default="both")
     parser.add_argument("-N", "--only-namespace-changes",
                         help="List only namespace changes",
                         action="store_true")
@@ -371,24 +435,34 @@
 if __name__ == "__main__":
     session_dir = os.path.join(conf.get_opt("session_dir"),
                                args.session)
     status_file = os.path.join(session_dir, args.volume,
-                               "%s.status" % urllib.quote_plus(args.brick))
+                               "%s.status" % urllib.quote_plus(args.brick))
     status_file_pre = status_file + ".pre"
     mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
            logger=logger)
 
-    try:
-        with open(status_file) as f:
-            start = int(f.read().strip())
-    except (ValueError, OSError, IOError):
+    end = -1
+    if args.only_query:
         start = args.start
+        end = args.end
+    else:
+        try:
+            with open(status_file) as f:
+                start = int(f.read().strip())
+        except (ValueError, OSError, IOError):
+            start = args.start
+
+    # end time is optional; so a -1 may be sent to use the default method of
+    # identifying the end time
+    if end == -1:
+        end = int(time.time()) - get_changelog_rollover_time(args.volume)
 
-    end = int(time.time()) - get_changelog_rollover_time(args.volume)
     logger.info("%s Started Changelog Crawl - Start: %s End: %s" %
                 (args.brick, start, end))
     actual_end = changelog_crawl(args.brick, start, end, args)
-    with open(status_file_pre, "w", buffering=0) as f:
-        f.write(str(actual_end))
+    if not args.only_query:
+        with open(status_file_pre, "w") as f:
+            f.write(str(actual_end))
 
     logger.info("%s Finished Changelog Crawl - End: %s" %
                 (args.brick, actual_end))
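The new `enum_hard_links_using_gfid2path()` above resolves every hardlink of a file by reading its `trusted.gfid2path.*` extended attributes, whose values have the shape `<parent-GFID>/<basename>`. A hedged sketch of the same idea using pyxattr (the same `xattr` module the patch imports; brick path and GFID here are placeholder inputs, and real bricks need the gfid2path feature enabled):

```python
import os
import xattr  # pyxattr


def gfid2path_entries(brick, gfid):
    """Yield (pgfid, basename) pairs recorded for a GFID by the
    gfid2path feature; each hardlink of the file contributes one xattr."""
    backend = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid)
    for key in xattr.list(backend):
        key = key.decode() if isinstance(key, bytes) else key
        if key.startswith("trusted.gfid2path."):
            value = xattr.getxattr(backend, key)
            value = value.decode() if isinstance(value, bytes) else value
            pgfid, bn = value.split(os.sep, 1)
            yield pgfid, bn
```

Joining each parent GFID's resolved path with the basename, as the patch does, gives one full path per hardlink.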
diff --git a/tools/glusterfind/src/changelogdata.py b/tools/glusterfind/src/changelogdata.py
index c42aa2a2315..641593cf4b1 100644
--- a/tools/glusterfind/src/changelogdata.py
+++ b/tools/glusterfind/src/changelogdata.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+# -*- coding: utf-8 -*-
 # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
 # This file is part of GlusterFS.
@@ -9,10 +9,10 @@
 # cases as published by the Free Software Foundation.
 
 import sqlite3
-import urllib
 import os
 
-from utils import RecordType
+from utils import RecordType, unquote_plus_space_newline
+from utils import output_path_prepare
 
 
 class OutputMerger(object):
@@ -82,13 +82,15 @@
 
 
 class ChangelogData(object):
-    def __init__(self, dbpath):
+    def __init__(self, dbpath, args):
         self.conn = sqlite3.connect(dbpath)
         self.cursor = self.conn.cursor()
         self.cursor_reader = self.conn.cursor()
         self._create_table_gfidpath()
         self._create_table_pgfid()
         self._create_table_inodegfid()
+        self.args = args
+        self.path_sep = "/"
 
     def _create_table_gfidpath(self):
         drop_table = "DROP TABLE IF EXISTS gfidpath"
@@ -100,16 +102,21 @@
             ts VARCHAR,
             type VARCHAR,
             gfid VARCHAR(40),
-            pgfid1 VARCHAR(40),
-            bn1 VARCHAR(500),
-            pgfid2 VARCHAR(40),
-            bn2 VARCHAR(500),
+            pgfid1 VARCHAR(40) DEFAULT '',
+            bn1 VARCHAR(500) DEFAULT '',
+            pgfid2 VARCHAR(40) DEFAULT '',
+            bn2 VARCHAR(500) DEFAULT '',
             path1 VARCHAR DEFAULT '',
             path2 VARCHAR DEFAULT ''
         )
         """
         self.cursor.execute(create_table)
 
+        create_index = """
+        CREATE INDEX gfid_index ON gfidpath(gfid);
+        """
+        self.cursor.execute(create_index)
+
     def _create_table_inodegfid(self):
         drop_table = "DROP TABLE IF EXISTS inodegfid"
         self.cursor.execute(drop_table)
@@ -283,7 +290,7 @@
 
     def append_path1(self, path, inode):
         # || is for concatenate in SQL
-        query = """UPDATE gfidpath SET path1 = ',' || ?
+        query = """UPDATE gfidpath SET path1 = path1 || ',' || ?
         WHERE gfid IN (SELECT gfid FROM inodegfid WHERE inode = ?)"""
         self.cursor.execute(query, (path, inode))
 
@@ -293,8 +300,8 @@
             update_str1 = "? || bn1"
             update_str2 = "? || bn2"
         else:
-            update_str1 = "? || '%2F' || bn1"
-            update_str2 = "? || '%2F' || bn2"
+            update_str1 = "? || '{0}' || bn1".format(self.path_sep)
+            update_str2 = "? || '{0}' || bn2".format(self.path_sep)
 
         query = """UPDATE gfidpath SET path1 = %s
         WHERE pgfid1 = ?""" % update_str1
@@ -310,7 +317,7 @@
         if path2 == "":
             update_str = "? || bn2"
         else:
-            update_str = "? || '%2F' || bn2"
+            update_str = "? || '{0}' || bn2".format(self.path_sep)
 
         query = """UPDATE gfidpath SET path2 = %s
         WHERE pgfid2 = ?""" % update_str
@@ -319,21 +326,21 @@
     def when_create_mknod_mkdir(self, changelogfile, data):
         # E <GFID> <MKNOD|CREATE|MKDIR> <MODE> <USER> <GRP> <PGFID>/<BNAME>
         # Add the Entry to DB
-        pgfid1, bn1 = urllib.unquote_plus(data[6]).split("/", 1)
+        pgfid1, bn1 = data[6].split("/", 1)
 
-        # Quote again the basename
-        bn1 = urllib.quote_plus(bn1.strip())
+        if self.args.no_encode:
+            bn1 = unquote_plus_space_newline(bn1).strip()
 
         self.gfidpath_add(changelogfile, RecordType.NEW, data[1],
                           pgfid1, bn1)
 
     def when_rename(self, changelogfile, data):
         # E <GFID> RENAME <OLD_PGFID>/<BNAME> <PGFID>/<BNAME>
-        pgfid1, bn1 = urllib.unquote_plus(data[3]).split("/", 1)
-        pgfid2, bn2 = urllib.unquote_plus(data[4]).split("/", 1)
+        pgfid1, bn1 = data[3].split("/", 1)
+        pgfid2, bn2 = data[4].split("/", 1)
 
-        # Quote again the basename
-        bn1 = urllib.quote_plus(bn1.strip())
-        bn2 = urllib.quote_plus(bn2.strip())
+        if self.args.no_encode:
+            bn1 = unquote_plus_space_newline(bn1).strip()
+            bn2 = unquote_plus_space_newline(bn2).strip()
 
         if self.gfidpath_exists({"gfid": data[1], "type": "NEW",
                                  "pgfid1": pgfid1, "bn1": bn1}):
@@ -344,37 +351,58 @@
                                  "pgfid1": pgfid1, "bn1": bn1})
         elif self.gfidpath_exists({"gfid": data[1], "type": "RENAME",
                                    "pgfid2": pgfid1, "bn2": bn1}):
-            # If <OLD_PGFID>/<BNAME> is same as <PGFID2>/<BN2>(may be previous
-            # RENAME) then UPDATE <NEW_PGFID>/<BNAME> as <PGFID2>/<BN2>
-            self.gfidpath_update({"pgfid2": pgfid2, "bn2": bn2},
-                                 {"gfid": data[1], "type": "RENAME",
-                                  "pgfid2": pgfid1, "bn2": bn1})
+            # If we are renaming file back to original name then just
+            # delete the entry since it will effectively be a no-op
+            if self.gfidpath_exists({"gfid": data[1], "type": "RENAME",
+                                     "pgfid2": pgfid1, "bn2": bn1,
+                                     "pgfid1": pgfid2, "bn1": bn2}):
+                self.gfidpath_delete({"gfid": data[1], "type": "RENAME",
+                                      "pgfid2": pgfid1, "bn2": bn1})
+            else:
+                # If <OLD_PGFID>/<BNAME> is same as <PGFID2>/<BN2>
+                # (may be previous RENAME)
+                # then UPDATE <NEW_PGFID>/<BNAME> as <PGFID2>/<BN2>
+                self.gfidpath_update({"pgfid2": pgfid2, "bn2": bn2},
+                                     {"gfid": data[1], "type": "RENAME",
+                                      "pgfid2": pgfid1, "bn2": bn1})
         else:
             # Else insert as RENAME
             self.gfidpath_add(changelogfile, RecordType.RENAME, data[1],
                               pgfid1, bn1, pgfid2, bn2)
 
+        if self.gfidpath_exists({"gfid": data[1], "type": "MODIFY"}):
+            # If MODIFY exists already for that GFID, remove it and insert
+            # again so that MODIFY entry comes after RENAME entry
+            # Output will have MODIFY <NEWNAME>
+            self.gfidpath_delete({"gfid": data[1], "type": "MODIFY"})
+            self.gfidpath_add(changelogfile, RecordType.MODIFY, data[1])
+
     def when_link_symlink(self, changelogfile, data):
         # E <GFID> <LINK|SYMLINK> <PGFID>/<BASENAME>
         # Add as New record in Db as Type NEW
-        pgfid1, bn1 = urllib.unquote_plus(data[3]).split("/", 1)
-
-        # Quote again the basename
-        bn1 = urllib.quote_plus(bn1.strip())
+        pgfid1, bn1 = data[3].split("/", 1)
+        if self.args.no_encode:
+            bn1 = unquote_plus_space_newline(bn1).strip()
 
         self.gfidpath_add(changelogfile, RecordType.NEW, data[1],
                           pgfid1, bn1)
 
     def when_data_meta(self, changelogfile, data):
         # If GFID row exists, Ignore else Add to Db
-        if not self.gfidpath_exists({"gfid": data[1]}):
+        if not self.gfidpath_exists({"gfid": data[1], "type": "NEW"}) and \
+           not self.gfidpath_exists({"gfid": data[1], "type": "MODIFY"}):
             self.gfidpath_add(changelogfile, RecordType.MODIFY, data[1])
 
     def when_unlink_rmdir(self, changelogfile, data):
         # E <GFID> <UNLINK|RMDIR> <PGFID>/<BASENAME>
-        pgfid1, bn1 = urllib.unquote_plus(data[3]).split("/", 1)
-        # Quote again the basename
-        bn1 = urllib.quote_plus(bn1.strip())
+        pgfid1, bn1 = data[3].split("/", 1)
+
+        if self.args.no_encode:
+            bn1 = unquote_plus_space_newline(bn1).strip()
 
+        deleted_path = data[4] if len(data) == 5 else ""
+        if deleted_path != "":
+            deleted_path = unquote_plus_space_newline(deleted_path)
+            deleted_path = output_path_prepare(deleted_path, self.args)
 
         if self.gfidpath_exists({"gfid": data[1], "type": "NEW",
                                  "pgfid1": pgfid1, "bn1": bn1}):
@@ -400,12 +428,12 @@
                                   "bn2": bn1})
 
         # If deleted directory is parent for somebody
-        query1 = """UPDATE gfidpath SET path1 = ? || '%2F' || bn1
-        WHERE pgfid1 = ? AND path1 != ''"""
+        query1 = """UPDATE gfidpath SET path1 = ? || '{0}' || bn1
+        WHERE pgfid1 = ? AND path1 != ''""".format(self.path_sep)
         self.cursor.execute(query1, (deleted_path, data[1]))
 
-        query1 = """UPDATE gfidpath SET path2 = ? || '%2F' || bn1
-        WHERE pgfid2 = ? AND path2 != ''"""
+        query1 = """UPDATE gfidpath SET path2 = ? || '{0}' || bn1
+        WHERE pgfid2 = ? AND path2 != ''""".format(self.path_sep)
         self.cursor.execute(query1, (deleted_path, data[1]))
 
     def commit(self):
diff --git a/tools/glusterfind/src/conf.py b/tools/glusterfind/src/conf.py
index 2c6eac2bb14..3849ba5dd1f 100644
--- a/tools/glusterfind/src/conf.py
+++ b/tools/glusterfind/src/conf.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+# -*- coding: utf-8 -*-
 # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
 # This file is part of GlusterFS.
@@ -9,9 +9,12 @@
 # cases as published by the Free Software Foundation.
 
 import os
-import ConfigParser
+try:
+    from ConfigParser import ConfigParser
+except ImportError:
+    from configparser import ConfigParser
 
-config = ConfigParser.ConfigParser()
+config = ConfigParser()
 config.read(os.path.join(os.path.dirname(os.path.abspath(__file__)),
                          "tool.conf"))
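The `append_path1()` change above is a genuine bug fix, not just a refactor: the old statement `SET path1 = ',' || ?` overwrote the column with `,<new>`, while `path1 || ',' || ?` actually appends. A minimal sqlite3 illustration (table and column trimmed down for the demo):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (path1 VARCHAR DEFAULT '')")
conn.execute("INSERT INTO t (path1) VALUES ('a')")

# Old query: the existing value 'a' is lost, result is ",b"
conn.execute("UPDATE t SET path1 = ',' || ?", ("b",))
print(conn.execute("SELECT path1 FROM t").fetchone()[0])  # -> ,b

# Fixed query: the previous value is kept, result is ",b,c"
conn.execute("UPDATE t SET path1 = path1 || ',' || ?", ("c",))
print(conn.execute("SELECT path1 FROM t").fetchone()[0])  # -> ,b,c
```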
diff --git a/tools/glusterfind/src/gfind_py2py3.py b/tools/glusterfind/src/gfind_py2py3.py
new file mode 100644
index 00000000000..87324fbf350
--- /dev/null
+++ b/tools/glusterfind/src/gfind_py2py3.py
@@ -0,0 +1,88 @@
+#
+# Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+# All python2/python3 compatibility routines
+
+import os
+import sys
+from ctypes import create_string_buffer
+
+if sys.version_info >= (3,):
+
+    # Raw conversion of bytearray to string. Used in the cases where
+    # buffer is created by create_string_buffer which is a 8-bit char
+    # array and passed to syscalls to fetch results. Using encode/decode
+    # doesn't work as it converts to string altering the size.
+    def bytearray_to_str(byte_arr):
+        return ''.join([chr(b) for b in byte_arr])
+
+    def gf_create_string_buffer(size):
+        return create_string_buffer(b'\0', size)
+
+    def gfind_history_changelog(libgfc, changelog_path, start, end,
+                                num_parallel, actual_end):
+        return libgfc.gf_history_changelog(changelog_path.encode(), start,
+                                           end, num_parallel, actual_end)
+
+    def gfind_changelog_register(libgfc, brick, path, log_file, log_level,
+                                 retries):
+        return libgfc.gf_changelog_register(brick.encode(), path.encode(),
+                                            log_file.encode(), log_level,
+                                            retries)
+
+    def gfind_history_changelog_done(libgfc, clfile):
+        return libgfc.gf_history_changelog_done(clfile.encode())
+
+    def gfind_write_row(f, row, field_separator, p_rep, row_2_rep):
+        f.write(u"{0}{1}{2}{3}{4}\n".format(row,
+                                            field_separator,
+                                            p_rep,
+                                            field_separator,
+                                            row_2_rep))
+
+    def gfind_write(f, row, field_separator, p_rep):
+        f.write(u"{0}{1}{2}\n".format(row,
+                                      field_separator,
+                                      p_rep))
+
+
+else:
+
+    # Raw conversion of bytearray to string
+    def bytearray_to_str(byte_arr):
+        return byte_arr
+
+    def gf_create_string_buffer(size):
+        return create_string_buffer('\0', size)
+
+    def gfind_history_changelog(libgfc, changelog_path, start, end,
+                                num_parallel, actual_end):
+        return libgfc.gf_history_changelog(changelog_path, start, end,
+                                           num_parallel, actual_end)
+
+    def gfind_changelog_register(libgfc, brick, path, log_file, log_level,
+                                 retries):
+        return libgfc.gf_changelog_register(brick, path, log_file,
+                                            log_level, retries)
+
+    def gfind_history_changelog_done(libgfc, clfile):
+        return libgfc.gf_history_changelog_done(clfile)
+
+    def gfind_write_row(f, row, field_separator, p_rep, row_2_rep):
+        f.write(u"{0}{1}{2}{3}{4}\n".format(row,
+                                            field_separator,
+                                            p_rep,
+                                            field_separator,
+                                            row_2_rep).encode())
+
+    def gfind_write(f, row, field_separator, p_rep):
+        f.write(u"{0}{1}{2}\n".format(row,
+                                      field_separator,
+                                      p_rep).encode())
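`bytearray_to_str()` exists because ctypes buffers come back as `bytes` on Python 3 but `str` on Python 2, and a per-byte `chr()` preserves the length exactly, unlike `decode()` with a multi-byte codec. A quick self-contained check of the Python 3 branch (the changelog file name is a made-up value):

```python
from ctypes import create_string_buffer


def bytearray_to_str(byte_arr):
    # Python 3 branch from gfind_py2py3.py: one byte -> one character
    return ''.join([chr(b) for b in byte_arr])


buf = create_string_buffer(b'\0', 16)   # like gf_create_string_buffer(16)
buf.raw = b"CHANGELOG.12345\0"          # as a C call would fill it
assert bytearray_to_str(buf.raw[:15]) == "CHANGELOG.12345"
```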
diff --git a/tools/glusterfind/src/libgfchangelog.py b/tools/glusterfind/src/libgfchangelog.py
index 44e8fd5a61a..513bb101e93 100644
--- a/tools/glusterfind/src/libgfchangelog.py
+++ b/tools/glusterfind/src/libgfchangelog.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+# -*- coding: utf-8 -*-
 # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
 # This file is part of GlusterFS.
@@ -9,51 +9,52 @@
 # cases as published by the Free Software Foundation.
 
 import os
-from ctypes import CDLL, get_errno, create_string_buffer, c_ulong, byref
-from ctypes import RTLD_GLOBAL
+from ctypes import CDLL, RTLD_GLOBAL, get_errno, create_string_buffer, c_ulong, byref
 from ctypes.util import find_library
+from gfind_py2py3 import bytearray_to_str, gf_create_string_buffer
+from gfind_py2py3 import gfind_history_changelog, gfind_changelog_register
+from gfind_py2py3 import gfind_history_changelog_done
 
 
 class ChangelogException(OSError):
     pass
 
+libgfc = CDLL(find_library("gfchangelog"), mode=RTLD_GLOBAL, use_errno=True)
 
-libgfc = CDLL(find_library("gfchangelog"), use_errno=True, mode=RTLD_GLOBAL)
-
 
-def raise_oserr():
+def raise_oserr(prefix=None):
     errn = get_errno()
-    raise ChangelogException(errn, os.strerror(errn))
+    prefix_or_empty = prefix + ": " if prefix else ""
+    raise ChangelogException(errn, prefix_or_empty + os.strerror(errn))
 
 
 def cl_init():
     ret = libgfc.gf_changelog_init(None)
     if ret == -1:
-        raise_oserr()
+        raise_oserr(prefix="gf_changelog_init")
 
 
 def cl_register(brick, path, log_file, log_level, retries=0):
-    ret = libgfc.gf_changelog_register(brick, path, log_file,
-                                       log_level, retries)
+    ret = gfind_changelog_register(libgfc, brick, path, log_file,
+                                   log_level, retries)
     if ret == -1:
-        raise_oserr()
+        raise_oserr(prefix="gf_changelog_register")
 
 
 def cl_history_scan():
     ret = libgfc.gf_history_changelog_scan()
     if ret == -1:
-        raise_oserr()
+        raise_oserr(prefix="gf_history_changelog_scan")
 
     return ret
 
 
 def cl_history_changelog(changelog_path, start, end, num_parallel):
     actual_end = c_ulong()
-    ret = libgfc.gf_history_changelog(changelog_path, start, end,
+    ret = gfind_history_changelog(libgfc, changelog_path, start, end,
                                       num_parallel, byref(actual_end))
     if ret == -1:
-        raise_oserr()
+        raise_oserr(prefix="gf_history_changelog")
 
     return actual_end.value
 
@@ -61,7 +62,7 @@
 def cl_history_startfresh():
     ret = libgfc.gf_history_changelog_start_fresh()
     if ret == -1:
-        raise_oserr()
+        raise_oserr(prefix="gf_history_changelog_start_fresh")
 
 
 def cl_history_getchanges():
@@ -70,20 +71,22 @@
         return f.split('.')[-1]
 
     changes = []
-    buf = create_string_buffer('\0', 4096)
+    buf = gf_create_string_buffer(4096)
 
     while True:
        ret = libgfc.gf_history_changelog_next_change(buf, 4096)
        if ret in (0, -1):
            break
-       changes.append(buf.raw[:ret - 1])
+       # py2 and py3 compatibility
+       result = bytearray_to_str(buf.raw[:ret - 1])
+       changes.append(result)
 
     if ret == -1:
-        raise_oserr()
+        raise_oserr(prefix="gf_history_changelog_next_change")
 
     return sorted(changes, key=clsort)
 
 
 def cl_history_done(clfile):
-    ret = libgfc.gf_history_changelog_done(clfile)
+    ret = gfind_history_changelog_done(libgfc, clfile)
     if ret == -1:
-        raise_oserr()
+        raise_oserr(prefix="gf_history_changelog_done")
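The `raise_oserr(prefix=...)` change tags each `ChangelogException` with the name of the failing libgfchangelog call, which makes log lines attributable. The pattern is independent of the gluster library; in this sketch the errno value is set manually to stand in for a failed C call:

```python
import os
from ctypes import set_errno, get_errno


class ChangelogException(OSError):
    pass


def raise_oserr(prefix=None):
    # Same shape as the patched helper in libgfchangelog.py
    errn = get_errno()
    prefix_or_empty = prefix + ": " if prefix else ""
    raise ChangelogException(errn, prefix_or_empty + os.strerror(errn))


set_errno(2)  # simulate a C call that left ENOENT in the ctypes errno slot
try:
    raise_oserr(prefix="gf_history_changelog")
except ChangelogException as e:
    print(e)  # [Errno 2] gf_history_changelog: No such file or directory
```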
diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py
index d9936eebde1..4b5466d0114 100644
--- a/tools/glusterfind/src/main.py
+++ b/tools/glusterfind/src/main.py
@@ -1,4 +1,5 @@
-#!/usr/bin/env python
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
 # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
 # This file is part of GlusterFS.
@@ -9,14 +10,20 @@
 # cases as published by the Free Software Foundation.
 
 import sys
-from errno import ENOENT
+from errno import ENOENT, ENOTEMPTY
 import time
 from multiprocessing import Process
 import os
 import xml.etree.cElementTree as etree
 from argparse import ArgumentParser, RawDescriptionHelpFormatter, Action
+from gfind_py2py3 import gfind_write_row, gfind_write
 import logging
 import shutil
+import tempfile
+import signal
+from datetime import datetime
+import codecs
+import re
 
 from utils import execute, is_host_local, mkdirp, fail
 from utils import setup_logger, human_time, handle_rm_error
@@ -30,7 +37,9 @@ GlusterFS Incremental API
 ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError
 
 logger = logging.getLogger()
-node_outfiles = []
+vol_statusStr = ""
+gtmpfilename = None
+g_pid_nodefile_map = {}
 
 
 class StoreAbsPath(Action):
@@ -52,29 +61,59 @@ def node_cmd(host, host_uuid, task, cmd, args, opts):
     """
     Runs command via ssh if host is not local
     """
-    localdir = is_host_local(host_uuid)
-
-    pem_key_path = get_pem_key_path(args.session, args.volume)
-
-    if not localdir:
-        # prefix with ssh command if not local node
-        cmd = ["ssh",
-               "-i", pem_key_path,
-               "root@%s" % host] + cmd
-
-    execute(cmd, exit_msg="%s - %s failed" % (host, task), logger=logger)
-
-    if opts.get("copy_outfile", False):
-        cmd_copy = ["scp",
-                    "-i", pem_key_path,
-                    "root@%s:/%s" % (host, opts.get("node_outfile")),
-                    os.path.dirname(opts.get("node_outfile"))]
-        execute(cmd_copy, exit_msg="%s - Copy command failed" % host,
-                logger=logger)
+    try:
+        localdir = is_host_local(host_uuid)
+
+        # this is so to avoid deleting the ssh keys on local node which
+        # otherwise cause ssh password prompts on the console (race conditions)
+        # mode_delete() should be cleaning up the session tree
+        if localdir and task == "delete":
+            return
+
+        pem_key_path = get_pem_key_path(args.session, args.volume)
+
+        if not localdir:
+            # prefix with ssh command if not local node
+            cmd = ["ssh",
+                   "-oNumberOfPasswordPrompts=0",
+                   "-oStrictHostKeyChecking=no",
+                   # We force TTY allocation (-t -t) so that Ctrl+C is handed
+                   # through; see:
+                   # https://bugzilla.redhat.com/show_bug.cgi?id=1382236
+                   # Note that this turns stderr of the remote `cmd`
+                   # into stdout locally.
+                   "-t",
+                   "-t",
+                   "-i", pem_key_path,
+                   "root@%s" % host] + cmd
+
+        (returncode, err, out) = execute(cmd, logger=logger)
+        if returncode != 0:
+            # Because the `-t -t` above turns the remote stderr into
+            # local stdout, we need to log both stderr and stdout
+            # here to print all error messages.
+            fail("%s - %s failed; stdout (including remote stderr):\n"
+                 "%s\n"
+                 "stderr:\n"
+                 "%s" % (host, task, out, err),
+                 returncode,
+                 logger=logger)
+
+        if opts.get("copy_outfile", False) and not localdir:
+            cmd_copy = ["scp",
+                        "-oNumberOfPasswordPrompts=0",
+                        "-oStrictHostKeyChecking=no",
+                        "-i", pem_key_path,
+                        "root@%s:/%s" % (host, opts.get("node_outfile")),
+                        os.path.dirname(opts.get("node_outfile"))]
+            execute(cmd_copy, exit_msg="%s - Copy command failed" % host,
+                    logger=logger)
+    except KeyboardInterrupt:
+        sys.exit(2)
 
 
 def run_cmd_nodes(task, args, **kwargs):
-    global node_outfiles
+    global g_pid_nodefile_map
+
     nodes = get_nodes(args.volume)
     pool = []
     for num, node in enumerate(nodes):
@@ -82,35 +121,98 @@ def run_cmd_nodes(task, args, **kwargs):
         host_uuid = node[0]
         cmd = []
         opts = {}
+
+        # tmpfilename is valid only for tasks: pre, query and cleanup
+        tmpfilename = kwargs.get("tmpfilename", "BADNAME")
+
         node_outfile = os.path.join(conf.get_opt("working_dir"),
                                     args.session, args.volume,
+                                    tmpfilename,
                                     "tmp_output_%s" % num)
+
         if task == "pre":
+            if vol_statusStr != "Started":
+                fail("Volume %s is not online" % args.volume,
+                     logger=logger)
+
             # If Full backup is requested or start time is zero, use brickfind
             change_detector = conf.get_change_detector("changelog")
+            tag = None
             if args.full:
                 change_detector = conf.get_change_detector("brickfind")
+                tag = args.tag_for_full_find.strip()
+                if tag == "":
+                    tag = '""' if not is_host_local(host_uuid) else ""
+
+            # remote file will be copied into this directory
+            mkdirp(os.path.dirname(node_outfile),
+                   exit_on_err=True, logger=logger)
 
-            node_outfiles.append(node_outfile)
+            FS = args.field_separator
+            if not is_host_local(host_uuid):
+                FS = "'" + FS + "'"
 
             cmd = [change_detector,
                    args.session,
                    args.volume,
+                   host,
                    brick,
-                   node_outfile,
-                   str(kwargs.get("start")),
-                   "--output-prefix",
-                   args.output_prefix] + \
+                   node_outfile] + \
+                ([str(kwargs.get("start")), str(kwargs.get("end"))]
+                 if not args.full else []) + \
+                ([tag] if tag is not None else []) + \
+                ["--output-prefix", args.output_prefix] + \
                 (["--debug"] if args.debug else []) + \
+                (["--no-encode"] if args.no_encode else []) + \
                 (["--only-namespace-changes"] if args.only_namespace_changes
-                 else [])
+                 else []) + \
+                (["--type", args.type]) + \
+                (["--field-separator", FS] if args.full else [])
+
+            opts["node_outfile"] = node_outfile
+            opts["copy_outfile"] = True
+        elif task == "query":
+            # If Full backup is requested or start time is zero, use brickfind
+            tag = None
+            change_detector = conf.get_change_detector("changelog")
+            if args.full:
+                change_detector = conf.get_change_detector("brickfind")
+                tag = args.tag_for_full_find.strip()
+                if tag == "":
+                    tag = '""' if not is_host_local(host_uuid) else ""
+
+            # remote file will be copied into this directory
+            mkdirp(os.path.dirname(node_outfile),
+                   exit_on_err=True, logger=logger)
+
+            FS = args.field_separator
+            if not is_host_local(host_uuid):
+                FS = "'" + FS + "'"
+
+            cmd = [change_detector,
+                   args.session,
+                   args.volume,
+                   host,
+                   brick,
+                   node_outfile] + \
+                ([str(kwargs.get("start")), str(kwargs.get("end"))]
+                 if not args.full else []) + \
+                ([tag] if tag is not None else []) + \
+                ["--only-query"] + \
+                ["--output-prefix", args.output_prefix] + \
+                (["--debug"] if args.debug else []) + \
+                (["--no-encode"] if args.no_encode else []) + \
+                (["--only-namespace-changes"]
+                 if args.only_namespace_changes else []) + \
+                (["--type", args.type]) + \
+                (["--field-separator", FS] if args.full else [])
 
             opts["node_outfile"] = node_outfile
             opts["copy_outfile"] = True
         elif task == "cleanup":
-            # After pre run, cleanup the working directory and other temp files
-            # Remove the copied node_outfile in main node
+            # After pre/query run, cleanup the working directory and other
+            # temp files. Remove the directory to which node_outfile has
+            # been copied in main node
             try:
                 os.remove(node_outfile)
             except (OSError, IOError):
@@ -121,8 +223,14 @@ def run_cmd_nodes(task, args, **kwargs):
             cmd = [conf.get_opt("nodeagent"),
                    "cleanup",
                    args.session,
-                   args.volume] + (["--debug"] if args.debug else [])
+                   args.volume,
+                   os.path.dirname(node_outfile)] + \
+                (["--debug"] if args.debug else [])
         elif task == "create":
+            if vol_statusStr != "Started":
+                fail("Volume %s is not online" % args.volume,
+                     logger=logger)
+
             # When glusterfind create, create session directory in
             # each brick nodes
             cmd = [conf.get_opt("nodeagent"),
@@ -156,6 +264,7 @@ def run_cmd_nodes(task, args, **kwargs):
                         args=(host, host_uuid, task, cmd, args, opts))
             p.start()
             pool.append(p)
+            g_pid_nodefile_map[p.pid] = node_outfile
 
     for num, p in enumerate(pool):
         p.join()
@@ -163,8 +272,11 @@ def run_cmd_nodes(task, args, **kwargs):
             logger.warn("Command %s failed in %s" % (task, nodes[num][1]))
             if task in ["create", "delete"]:
                 fail("Command %s failed in %s" % (task, nodes[num][1]))
-            elif task == "pre" and args.disable_partial:
-                sys.exit(1)
+            elif task == "pre" or task == "query":
+                if args.disable_partial:
+                    sys.exit(1)
+                else:
+                    del g_pid_nodefile_map[p.pid]
 
 
 @cache_output
@@ -173,18 +285,37 @@ def get_nodes(volume):
     Get the gluster volume info xml output and parse to get
     the brick details.
    """
+    global vol_statusStr
+
    cmd = ["gluster", 'volume', 'info', volume, "--xml"]
    _, data, _ = execute(cmd,
                         exit_msg="Failed to Run Gluster Volume Info",
                         logger=logger)
    tree = etree.fromstring(data)
 
+    # Test to check if volume has been deleted after session creation
+    count_el = tree.find('volInfo/volumes/count')
+    if int(count_el.text) == 0:
+        fail("Unable to get volume details", logger=logger)
+
+    # this status is used in caller: run_cmd_nodes
+    vol_statusStr = tree.find('volInfo/volumes/volume/statusStr').text
+    vol_typeStr = tree.find('volInfo/volumes/volume/typeStr').text
+
    nodes = []
    volume_el = tree.find('volInfo/volumes/volume')
    try:
-        for b in volume_el.findall('bricks/brick'):
-            nodes.append((b.find('hostUuid').text,
-                          b.find('name').text))
+        brick_elems = []
+        if vol_typeStr == "Tier":
+            brick_elems.append('bricks/hotBricks/brick')
+            brick_elems.append('bricks/coldBricks/brick')
+        else:
+            brick_elems.append('bricks/brick')
+
+        for elem in brick_elems:
+            for b in volume_el.findall(elem):
+                nodes.append((b.find('hostUuid').text,
+                              b.find('name').text))
    except (ParseError, AttributeError, ValueError) as e:
        fail("Failed to parse Volume Info: %s" % e, logger=logger)
 
@@ -195,6 +326,7 @@ def _get_args():
    parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
                            description=PROG_DESCRIPTION)
    subparsers = parser.add_subparsers(dest="mode")
+    subparsers.required = True
 
    # create <SESSION> <VOLUME> [--debug] [--force]
    parser_create = subparsers.add_parser('create')
@@ -226,6 +358,9 @@ def _get_args():
    parser_pre.add_argument("volume", help="Volume Name")
    parser_pre.add_argument("outfile", help="Output File", action=StoreAbsPath)
    parser_pre.add_argument("--debug", help="Debug", action="store_true")
+    parser_pre.add_argument("--no-encode",
+                            help="Do not encode path in output file",
+                            action="store_true")
    parser_pre.add_argument("--full", help="Full find", action="store_true")
    parser_pre.add_argument("--disable-partial", help="Disable Partial find, "
                            "Fail when one node fails", action="store_true")
@@ -238,6 +373,48 @@ def _get_args():
    parser_pre.add_argument("-N", "--only-namespace-changes",
                            help="List only namespace changes",
                            action="store_true")
+    parser_pre.add_argument("--tag-for-full-find",
+                            help="Tag prefix for file names emitted during"
+                            " a full find operation; default: \"NEW\"",
+                            default="NEW")
+    parser_pre.add_argument('--type', help="type: f, f-files only"
+                            " d, d-directories only, by default = both",
+                            default='both', choices=["f", "d", "both"])
+    parser_pre.add_argument("--field-separator", help="Field separator",
+                            default=" ")
+
+    # query <VOLUME> <OUTFILE> --since-time <SINCE_TIME>
+    #       [--output-prefix <OUTPUT_PREFIX>] [--full]
+    parser_query = subparsers.add_parser('query')
+    parser_query.add_argument("volume", help="Volume Name")
+    parser_query.add_argument("outfile", help="Output File",
+                              action=StoreAbsPath)
+    parser_query.add_argument("--since-time", help="UNIX epoch time since "
+                              "which listing is required", type=int)
+    parser_query.add_argument("--end-time", help="UNIX epoch time up to "
+                              "which listing is required", type=int)
+    parser_query.add_argument("--no-encode",
+                              help="Do not encode path in output file",
+                              action="store_true")
+    parser_query.add_argument("--full", help="Full find", action="store_true")
+    parser_query.add_argument("--debug", help="Debug", action="store_true")
+    parser_query.add_argument("--disable-partial", help="Disable Partial find,"
+                              " Fail when one node fails", action="store_true")
+    parser_query.add_argument("--output-prefix", help="File prefix in output",
+                              default=".")
+    parser_query.add_argument("-N", "--only-namespace-changes",
+                              help="List only namespace changes",
+                              action="store_true")
+    parser_query.add_argument("--tag-for-full-find",
+                              help="Tag prefix for file names emitted during"
+                              " a full find operation; default: \"NEW\"",
+                              default="NEW")
+    parser_query.add_argument('--type', help="type: f, f-files only"
+                              " d, d-directories only, by default = both",
+                              default='both', choices=["f", "d", "both"])
+    parser_query.add_argument("--field-separator",
+                              help="Field separator string",
+                              default=" ")
 
    # post <SESSION> <VOLUME>
    parser_post = subparsers.add_parser('post')
@@ -301,14 +478,94 @@ def ssh_setup(args):
 
     logger.info("Ssh key added to authorized_keys of Volume nodes")
 
 
+def enable_volume_options(args):
+    execute(["gluster", "volume", "set",
+             args.volume, "build-pgfid", "on"],
+            exit_msg="Failed to set volume option build-pgfid on",
+            logger=logger)
+    logger.info("Volume option set %s, build-pgfid on" % args.volume)
+
+    execute(["gluster", "volume", "set",
+             args.volume, "changelog.changelog", "on"],
+            exit_msg="Failed to set volume option "
+            "changelog.changelog on", logger=logger)
+    logger.info("Volume option set %s, changelog.changelog on"
+                % args.volume)
+
+    execute(["gluster", "volume", "set",
+             args.volume, "changelog.capture-del-path", "on"],
+            exit_msg="Failed to set volume option "
+            "changelog.capture-del-path on", logger=logger)
+    logger.info("Volume option set %s, changelog.capture-del-path on"
+                % args.volume)
+
+
+def write_output(outfile, outfilemerger, field_separator):
+    with codecs.open(outfile, "a", encoding="utf-8") as f:
+        for row in outfilemerger.get():
+            # Multiple paths in case of Hardlinks
+            paths = row[1].split(",")
+            row_2_rep = None
+            for p in paths:
+                if p == "":
+                    continue
+                p_rep = p.replace("//", "/")
+                if not row_2_rep:
+                    row_2_rep = row[2].replace("//", "/")
+                if p_rep == row_2_rep:
+                    continue
+
+                if row_2_rep and row_2_rep != "":
+                    gfind_write_row(f, row[0], field_separator, p_rep,
+                                    row_2_rep)
+
+                else:
+                    gfind_write(f, row[0], field_separator, p_rep)
+
+
+def validate_volume(volume):
+    cmd = ["gluster", 'volume', 'info', volume, "--xml"]
+    _, data, _ = execute(cmd,
+                         exit_msg="Failed to Run Gluster Volume Info",
+                         logger=logger)
+    try:
+        tree = etree.fromstring(data)
+        statusStr = tree.find('volInfo/volumes/volume/statusStr').text
+    except (ParseError, AttributeError) as e:
+        fail("Invalid Volume: Check the Volume name! %s" % e)
+    if statusStr != "Started":
+        fail("Volume %s is not online" % volume)
+
+
+# The rules for a valid session name.
+SESSION_NAME_RULES = {
+    'min_length': 2,
+    'max_length': 256,  # same as maximum volume length
+    # Specifies all alphanumeric characters, underscore, hyphen.
+    'valid_chars': r'0-9a-zA-Z_-',
+}
+
+
+# checks valid session name, fail otherwise
+def validate_session_name(session):
+    # Check for minimum length
+    if len(session) < SESSION_NAME_RULES['min_length']:
+        fail('session_name must be at least ' +
+             str(SESSION_NAME_RULES['min_length']) + ' characters long.')
+    # Check for maximum length
+    if len(session) > SESSION_NAME_RULES['max_length']:
+        fail('session_name must not exceed ' +
+             str(SESSION_NAME_RULES['max_length']) + ' characters length.')
+
+    # Matches strings composed entirely of characters specified within
+    if not re.match(r'^[' + SESSION_NAME_RULES['valid_chars'] +
+                    ']+$', session):
+        fail('Session name can only contain these characters: ' +
+             SESSION_NAME_RULES['valid_chars'])
+
+
 def mode_create(session_dir, args):
+    validate_session_name(args.session)
+
     logger.debug("Init is called - Session: %s, Volume: %s"
                  % (args.session, args.volume))
-
-    execute(["gluster", "volume", "info", args.volume],
-            exit_msg="Unable to get volume details",
-            logger=logger)
-
     mkdirp(session_dir, exit_on_err=True, logger=logger)
     mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
            logger=logger)
@@ -319,19 +576,7 @@ def mode_create(session_dir, args):
 
     if not os.path.exists(status_file) or args.force:
         ssh_setup(args)
-
-        execute(["gluster", "volume", "set",
-                 args.volume, "build-pgfid", "on"],
-                exit_msg="Failed to set volume option build-pgfid on",
-                logger=logger)
-        logger.info("Volume option set %s, build-pgfid on" % args.volume)
-
-        execute(["gluster", "volume", "set",
-                 args.volume, "changelog.changelog", "on"],
-                exit_msg="Failed to set volume option "
-                "changelog.changelog on", logger=logger)
-        logger.info("Volume option set %s, changelog.changelog on"
-                    % args.volume)
+        enable_volume_options(args)
 
     # Add Rollover time to current time to make sure changelogs
     # will be available if we use this time as start time
@@ -341,13 +586,111 @@ def mode_create(session_dir, args):
     run_cmd_nodes("create", args, time_to_update=str(time_to_update))
 
     if not os.path.exists(status_file) or args.reset_session_time:
-        with open(status_file, "w", buffering=0) as f:
+        with open(status_file, "w") as f:
             f.write(str(time_to_update))
 
+    sys.stdout.write("Session %s created with volume %s\n" %
+                     (args.session, args.volume))
+
     sys.exit(0)
 
 
+def mode_query(session_dir, args):
+    global gtmpfilename
+    global g_pid_nodefile_map
+
+    # Verify volume status
+    cmd = ["gluster", 'volume', 'info', args.volume, "--xml"]
+    _, data, _ = execute(cmd,
+                         exit_msg="Failed to Run Gluster Volume Info",
+                         logger=logger)
+    try:
+        tree = etree.fromstring(data)
+        statusStr = tree.find('volInfo/volumes/volume/statusStr').text
+    except (ParseError, AttributeError) as e:
+        fail("Invalid Volume: %s" % e, logger=logger)
+
+    if statusStr != "Started":
+        fail("Volume %s is not online" % args.volume, logger=logger)
+
+    mkdirp(session_dir, exit_on_err=True, logger=logger)
+    mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+           logger=logger)
+    mkdirp(os.path.dirname(args.outfile), exit_on_err=True, logger=logger)
+
+    # Configure cluster for password-less SSH
+    ssh_setup(args)
+
+    # Enable volume options for changelog capture
+    enable_volume_options(args)
+
+    # Test options
+    if not args.full and args.type in ["f", "d"]:
+        fail("--type can only be used with --full")
+
+    if not args.since_time and not args.end_time and not args.full:
+        fail("Please specify either {--since-time and optionally --end-time}"
+             " or --full", logger=logger)
+
+    if args.since_time and args.end_time and args.full:
+        fail("Please specify either {--since-time and optionally --end-time}"
+             " or --full, but not both", logger=logger)
+
+    if args.end_time and not args.since_time:
+        fail("Please specify --since-time as well", logger=logger)
+
+    # Start query command processing
+    start = -1
+    end = -1
+    if args.since_time:
+        start = args.since_time
+        if args.end_time:
+            end = args.end_time
+    else:
+        start = 0  # --full option is handled separately
+
+    logger.debug("Query is called - Session: %s, Volume: %s, "
+                 "Start time: %s, End time: %s"
+                 % ("default", args.volume, start, end))
+
+    prefix = datetime.now().strftime("%Y%m%d-%H%M%S-%f-")
+    gtmpfilename = prefix + next(tempfile._get_candidate_names())
+
+    run_cmd_nodes("query", args, start=start, end=end,
+                  tmpfilename=gtmpfilename)
+
+    # Merger
+    if args.full:
+        if len(g_pid_nodefile_map) > 0:
+            cmd = ["sort", "-u"] + list(g_pid_nodefile_map.values()) + \
+                  ["-o", args.outfile]
+            execute(cmd,
+                    exit_msg="Failed to merge output files "
+                    "collected from nodes", logger=logger)
+        else:
+            fail("Failed to collect any output files from peers. "
+                 "Looks like all bricks are offline.", logger=logger)
+    else:
+        # Read each Changelogs db and generate finaldb
+        create_file(args.outfile, exit_on_err=True, logger=logger)
+        outfilemerger = OutputMerger(args.outfile + ".db",
+                                     list(g_pid_nodefile_map.values()))
+        write_output(args.outfile, outfilemerger, args.field_separator)
+
+    try:
+        os.remove(args.outfile + ".db")
+    except (IOError, OSError):
+        pass
+
+    run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename)
+
+    sys.stdout.write("Generated output file %s\n" % args.outfile)
+
+
 def mode_pre(session_dir, args):
+    global gtmpfilename
+    global g_pid_nodefile_map
+
     """
     Read from Session file and write to session.pre file
     """
@@ -358,6 +701,9 @@ def mode_pre(session_dir, args):
     mkdirp(os.path.dirname(args.outfile), exit_on_err=True, logger=logger)
 
+    if not args.full and args.type in ["f", "d"]:
+        fail("--type can only be used with --full")
+
     # If Pre status file exists and running pre command again
     if os.path.exists(status_file_pre) and not args.regenerate_outfile:
         fail("Post command is not run after last pre, "
@@ -377,36 +723,37 @@ def mode_pre(session_dir, args):
                  "Start time: %s, End time: %s"
                  % (args.session, args.volume, start, endtime_to_update))
 
-    run_cmd_nodes("pre", args, start=start)
+    prefix = datetime.now().strftime("%Y%m%d-%H%M%S-%f-")
+    gtmpfilename = prefix + next(tempfile._get_candidate_names())
+
+    run_cmd_nodes("pre", args, start=start, end=-1, tmpfilename=gtmpfilename)
 
     # Merger
     if args.full:
-        cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
-        execute(cmd,
-                exit_msg="Failed to merge output files "
-                "collected from nodes", logger=logger)
+        if len(g_pid_nodefile_map) > 0:
+            cmd = ["sort", "-u"] + list(g_pid_nodefile_map.values()) + \
+                  ["-o", args.outfile]
+            execute(cmd,
+                    exit_msg="Failed to merge output files "
+                    "collected from nodes", logger=logger)
+        else:
+            fail("Failed to collect any output files from peers. "
+                 "Looks like all bricks are offline.", logger=logger)
     else:
         # Read each Changelogs db and generate finaldb
         create_file(args.outfile, exit_on_err=True, logger=logger)
-        outfilemerger = OutputMerger(args.outfile + ".db", node_outfiles)
-
-        with open(args.outfile, "a") as f:
-            for row in outfilemerger.get():
-                # Multiple paths in case of Hardlinks
-                paths = row[1].split(",")
-                for p in paths:
-                    if p == "":
-                        continue
-                    f.write("%s %s %s\n" % (row[0], p, row[2]))
+        outfilemerger = OutputMerger(args.outfile + ".db",
+                                     list(g_pid_nodefile_map.values()))
+        write_output(args.outfile, outfilemerger, args.field_separator)
 
     try:
         os.remove(args.outfile + ".db")
     except (IOError, OSError):
         pass
 
-    run_cmd_nodes("cleanup", args)
+    run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename)
 
-    with open(status_file_pre, "w", buffering=0) as f:
+    with open(status_file_pre, "w") as f:
         f.write(str(endtime_to_update))
 
     sys.stdout.write("Generated output file %s\n" % args.outfile)
@@ -425,6 +772,8 @@ def mode_post(session_dir, args):
     if os.path.exists(status_file_pre):
         run_cmd_nodes("post", args)
         os.rename(status_file_pre, status_file)
+        sys.stdout.write("Session %s with volume %s updated\n" %
+                         (args.session, args.volume))
         sys.exit(0)
     else:
         fail("Pre script is not run", logger=logger)
@@ -434,6 +783,17 @@ def mode_delete(session_dir, args):
     run_cmd_nodes("delete", args)
     shutil.rmtree(os.path.join(session_dir, args.volume),
                   onerror=handle_rm_error)
+    sys.stdout.write("Session %s with volume %s deleted\n" %
+                     (args.session, args.volume))
+
+    # If the session contains only this volume, then cleanup the
+    # session directory. If a session contains multiple volumes
+    # then os.rmdir will fail with ENOTEMPTY
+    try:
+        os.rmdir(session_dir)
+    except OSError as e:
+        if not e.errno == ENOTEMPTY:
+            logger.warn("Failed to delete session directory: %s" % e)
 
 
 def mode_list(session_dir, args):
@@ -467,7 +827,7 @@ def mode_list(session_dir, args):
                 last_processed = f.read().strip()
         except (OSError, IOError) as e:
             if e.errno == ENOENT:
-                pass
+                continue
             else:
                 raise
         output.append((session, volname, last_processed))
@@ -489,31 +849,73 @@ def mode_list(session_dir, args):
                              volname.ljust(25),
                              sess_time.ljust(25)))
 
-    if not output and (args.session or args.volume):
-        fail("Invalid Session", logger=logger)
+    if not output:
+        if args.session or args.volume:
+            fail("Invalid Session", logger=logger)
+        else:
+            sys.stdout.write("No sessions found.\n")
 
 
 def main():
-    args = _get_args()
-    mkdirp(conf.get_opt("session_dir"), exit_on_err=True)
+    global gtmpfilename
 
-    if args.mode == "list":
-        session_dir = conf.get_opt("session_dir")
-    else:
-        session_dir = os.path.join(conf.get_opt("session_dir"),
-                                   args.session)
+    args = None
 
-    if not os.path.exists(session_dir) and args.mode not in ["create", "list"]:
-        fail("Invalid session %s" % args.session)
+    try:
+        args = _get_args()
+        mkdirp(conf.get_opt("session_dir"), exit_on_err=True)
 
-    mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume),
-           exit_on_err=True)
-    log_file = os.path.join(conf.get_opt("log_dir"),
-                            args.session,
-                            args.volume,
-                            "cli.log")
-    setup_logger(logger, log_file, args.debug)
-
-    # globals() will have all the functions already defined.
-    # mode_<args.mode> will be the function name to be called
-    globals()["mode_" + args.mode](session_dir, args)
+        # force the default session name if mode is "query"
+        if args.mode == "query":
+            args.session = "default"
+
+        if args.mode == "list":
+            session_dir = conf.get_opt("session_dir")
+        else:
+            session_dir = os.path.join(conf.get_opt("session_dir"),
+                                       args.session)
+
+        if not os.path.exists(session_dir) and \
+           args.mode not in ["create", "list", "query"]:
+            fail("Invalid session %s" % args.session)
+
+        # volume involved, validate the volume first
+        if args.mode not in ["list"]:
+            validate_volume(args.volume)
+
+        # "default" is a system defined session name
+        if args.mode in ["create", "post", "pre", "delete"] and \
+           args.session == "default":
+            fail("Invalid session %s" % args.session)
+
+        vol_dir = os.path.join(session_dir, args.volume)
+        if not os.path.exists(vol_dir) and args.mode not in \
+                ["create", "list", "query"]:
+            fail("Session %s not created with volume %s" %
+                 (args.session, args.volume))
+
+        mkdirp(os.path.join(conf.get_opt("log_dir"),
+                            args.session,
+                            args.volume),
+               exit_on_err=True)
+        log_file = os.path.join(conf.get_opt("log_dir"),
+                                args.session,
+                                args.volume,
+                                "cli.log")
+        setup_logger(logger, log_file, args.debug)
+
+        # globals() will have all the functions already defined.
+        # mode_<args.mode> will be the function name to be called
+        globals()["mode_" + args.mode](session_dir, args)
+    except KeyboardInterrupt:
+        if args is not None:
+            if args.mode == "pre" or args.mode == "query":
+                # cleanup session
+                if gtmpfilename is not None:
+                    # no more interrupts until we clean up
+                    signal.signal(signal.SIGINT, signal.SIG_IGN)
+                    run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename)
+
+        # Interrupted, exit with non zero error code
+        sys.exit(2)
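`validate_session_name()` accepts 2–256 characters drawn from `0-9a-zA-Z_-`. A small usage sketch against the rules defined in the patch (a standalone re-implementation for illustration; the real helper calls `fail()` instead of returning a boolean):

```python
import re

SESSION_NAME_RULES = {'min_length': 2, 'max_length': 256,
                      'valid_chars': r'0-9a-zA-Z_-'}


def is_valid_session_name(session):
    return (SESSION_NAME_RULES['min_length'] <= len(session) <=
            SESSION_NAME_RULES['max_length'] and
            re.match(r'^[' + SESSION_NAME_RULES['valid_chars'] + ']+$',
                     session) is not None)


assert is_valid_session_name("backup-hourly_1")
assert not is_valid_session_name("x")           # too short
assert not is_valid_session_name("bad name")    # space not allowed
```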
@@ -13,7 +14,11 @@ import sys import os import logging from argparse import ArgumentParser, RawDescriptionHelpFormatter -import urllib +try: + import urllib.parse as urllib +except ImportError: + import urllib +from errno import ENOTEMPTY from utils import setup_logger, mkdirp, handle_rm_error import conf @@ -24,7 +29,8 @@ logger = logging.getLogger() def mode_cleanup(args): working_dir = os.path.join(conf.get_opt("working_dir"), args.session, - args.volume) + args.volume, + args.tmpfilename) mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume), exit_on_err=True) @@ -46,13 +52,13 @@ def mode_create(args): session_dir = os.path.join(conf.get_opt("session_dir"), args.session) status_file = os.path.join(session_dir, args.volume, - "%s.status" % urllib.quote_plus(args.brick)) + "%s.status" % urllib.quote_plus(args.brick)) mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, logger=logger) if not os.path.exists(status_file) or args.reset_session_time: - with open(status_file, "w", buffering=0) as f: + with open(status_file, "w") as f: f.write(args.time_to_update) sys.exit(0) @@ -61,7 +67,7 @@ def mode_create(args): def mode_post(args): session_dir = os.path.join(conf.get_opt("session_dir"), args.session) status_file = os.path.join(session_dir, args.volume, - "%s.status" % urllib.quote_plus(args.brick)) + "%s.status" % urllib.quote_plus(args.brick)) mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, logger=logger) @@ -78,6 +84,15 @@ def mode_delete(args): shutil.rmtree(os.path.join(session_dir, args.volume), onerror=handle_rm_error) + # If the session contains only this volume, then cleanup the + # session directory. If a session contains multiple volumes + # then os.rmdir will fail with ENOTEMPTY + try: + os.rmdir(session_dir) + except OSError as e: + if not e.errno == ENOTEMPTY: + logger.warn("Failed to delete session directory: %s" % e) + def _get_args(): parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, @@ -87,6 +102,7 @@ def _get_args(): parser_cleanup = subparsers.add_parser('cleanup') parser_cleanup.add_argument("session", help="Session Name") parser_cleanup.add_argument("volume", help="Volume Name") + parser_cleanup.add_argument("tmpfilename", help="Temporary File Name") parser_cleanup.add_argument("--debug", help="Debug", action="store_true") parser_session_create = subparsers.add_parser('create') diff --git a/tools/glusterfind/src/utils.py b/tools/glusterfind/src/utils.py index cda5ea6378e..906ebd8f252 100644 --- a/tools/glusterfind/src/utils.py +++ b/tools/glusterfind/src/utils.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +# -*- coding: utf-8 -*- # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> # This file is part of GlusterFS. 
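The mode_delete() changes above, in both main.py and nodeagent.py, share one idiom worth a note before the utils.py hunks: after the per-volume tree is removed, the session directory is deleted only when this was its last volume, by attempting os.rmdir() and swallowing ENOTEMPTY. A minimal sketch of the pattern, where delete_volume_session() is a hypothetical reduction of the two mode_delete functions:

import logging
import os
import shutil
from errno import ENOTEMPTY

logger = logging.getLogger()


def delete_volume_session(session_dir, volume):
    # Remove this volume's data from the session unconditionally
    shutil.rmtree(os.path.join(session_dir, volume), ignore_errors=True)

    # os.rmdir() only removes empty directories, so a session that
    # still holds other volumes fails with ENOTEMPTY and is left intact
    try:
        os.rmdir(session_dir)
    except OSError as e:
        if e.errno != ENOTEMPTY:
            logger.warning("Failed to delete session directory: %s" % e)

The sketch uses logger.warning(), the non-deprecated spelling of the patch's logger.warn(); the behavior is identical.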
@@ -15,10 +15,12 @@ import xml.etree.cElementTree as etree
 import logging
 import os
 from datetime import datetime
-import urllib
 
 ROOT_GFID = "00000000-0000-0000-0000-000000000001"
 DEFAULT_CHANGELOG_INTERVAL = 15
+SPACE_ESCAPE_CHAR = "%20"
+NEWLINE_ESCAPE_CHAR = "%0A"
+PERCENTAGE_ESCAPE_CHAR = "%25"
 
 ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError
 
 cache_data = {}
@@ -34,10 +36,10 @@ class RecordType(object):
 def cache_output(func):
     def wrapper(*args, **kwargs):
         global cache_data
-        if cache_data.get(func.func_name, None) is None:
-            cache_data[func.func_name] = func(*args, **kwargs)
+        if cache_data.get(func.__name__, None) is None:
+            cache_data[func.__name__] = func(*args, **kwargs)
 
-        return cache_data[func.func_name]
+        return cache_data[func.__name__]
     return wrapper
 
 
@@ -56,12 +58,13 @@ def find(path, callback_func=lambda x: True, filter_func=lambda x: True,
         # Capture filter_func output and pass it to callback function
         filter_result = filter_func(path)
         if filter_result is not None:
-            callback_func(path, filter_result)
+            callback_func(path, filter_result, os.path.isdir(path))
 
     for p in os.listdir(path):
         full_path = os.path.join(path, p)
 
-        if os.path.isdir(full_path):
+        is_dir = os.path.isdir(full_path)
+        if is_dir:
             if subdirs_crawl:
                 find(full_path, callback_func, filter_func, ignore_dirs)
             else:
@@ -71,10 +74,11 @@ def find(path, callback_func=lambda x: True, filter_func=lambda x: True,
         else:
             filter_result = filter_func(full_path)
             if filter_result is not None:
-                callback_func(full_path, filter_result)
+                callback_func(full_path, filter_result, is_dir)
 
 
-def output_write(f, path, prefix=".", encode=False):
+def output_write(f, path, prefix=".", encode=False, tag="",
+                 field_separator=" "):
     if path == "":
         return
 
@@ -82,9 +86,12 @@ def output_write(f, path, prefix=".", encode=False):
         path = os.path.join(prefix, path)
 
     if encode:
-        path = urllib.quote_plus(path)
+        path = quote_plus_space_newline(path)
 
-    f.write("%s\n" % path)
+    # set the field separator
+    FS = "" if tag == "" else field_separator
+
+    f.write("%s%s%s\n" % (tag.strip(), FS, path))
 
 
 def human_time(ts):
@@ -223,6 +230,38 @@ def get_changelog_rollover_time(volumename):
 
     try:
         tree = etree.fromstring(out)
-        return int(tree.find('volGetopts/Value').text)
+        val = tree.find('volGetopts/Opt/Value').text
+        if val is not None:
+            # Filter the value by split, as it may be 'X (DEFAULT)'
+            # and we only need 'X'
+            return int(val.split(' ', 1)[0])
     except ParseError:
         return DEFAULT_CHANGELOG_INTERVAL
+
+
+def output_path_prepare(path, args):
+    """
+    If Prefix is set, joins to Path, removes ending slash
+    and encodes it.
+    """
+    if args.output_prefix != ".":
+        path = os.path.join(args.output_prefix, path)
+        if path.endswith("/"):
+            path = path[0:len(path)-1]
+
+    if args.no_encode:
+        return path
+    else:
+        return quote_plus_space_newline(path)
+
+
+def unquote_plus_space_newline(s):
+    return s.replace(SPACE_ESCAPE_CHAR, " ")\
+        .replace(NEWLINE_ESCAPE_CHAR, "\n")\
+        .replace(PERCENTAGE_ESCAPE_CHAR, "%")
+
+
+def quote_plus_space_newline(s):
+    return s.replace("%", PERCENTAGE_ESCAPE_CHAR)\
+        .replace(" ", SPACE_ESCAPE_CHAR)\
+        .replace("\n", NEWLINE_ESCAPE_CHAR)
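The quote_plus_space_newline()/unquote_plus_space_newline() pair that closes the utils.py diff replaces urllib.quote_plus() with a deliberately narrow scheme: only "%", space, and newline are escaped, so the output files stay one record per line and field-separator-safe without mangling the rest of the path (quote_plus would also escape "/" and turn spaces into "+"). Ordering is what makes the round trip safe: "%" must be escaped first when encoding and unescaped last when decoding, or a literal "%20" in a file name would come back as a space. A small self-check, assuming it runs next to tools/glusterfind/src/utils.py so the two functions are importable:

from utils import quote_plus_space_newline, unquote_plus_space_newline

# A hostile path: contains a space, a literal "%20" and a newline
path = "dir name/file%20with\nnewline.txt"

encoded = quote_plus_space_newline(path)
# "dir%20name/file%2520with%0Anewline.txt" - one line, no raw spaces
assert " " not in encoded and "\n" not in encoded

# Decoding restores the original, including the literal "%20"
assert unquote_plus_space_newline(encoded) == path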

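One more note on utils.py: the cache_output change is a straight Python 2/3 compatibility fix, since function objects lost the func_name attribute in Python 3 while __name__ works on both. Trimmed to its essentials, the decorator memoizes a function's first result under its name; the demo function below is illustrative only:

cache_data = {}


def cache_output(func):
    def wrapper(*args, **kwargs):
        # Compute once, then serve the cached value on later calls
        if cache_data.get(func.__name__, None) is None:
            cache_data[func.__name__] = func(*args, **kwargs)
        return cache_data[func.__name__]
    return wrapper


@cache_output
def expensive_lookup():
    print("computed once")
    return 42


expensive_lookup()  # prints "computed once" and returns 42
expensive_lookup()  # returns the cached 42, no recomputation

Note that the cache key is only the function name: arguments are ignored, so every call after the first returns the first result regardless of inputs, which suits the argument-free lookups it decorates here.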