summaryrefslogtreecommitdiffstats
path: root/tools/glusterfind/src/changelog.py
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2015-04-30 12:28:17 +0530
committerVijay Bellur <vbellur@redhat.com>2015-05-08 21:59:10 -0700
commite88837ed0ff68093912c2b8e996c5851c53674ca (patch)
tree854c30520331099685b29e28e8b8dd15fa357d3a /tools/glusterfind/src/changelog.py
parent2676c402bc47ee89b763393e496a013e82d76e54 (diff)
tools/glusterfind: GFID to Path conversion using Changelog
Records fop information collected from Changelogs in sqlite database. This is only working database, not required after processing. After post processing, output file is generated by reading these database files. This is applicable only in incremental run, When a changelog is parsed, all the details are saved in Db. GFID to Path is converted to those files for which information is available in Changelogs. For all the failed cases, it tries to convert to Path using Pgfid, if not found GFID to Path is done using find. BUG: 1201284 Change-Id: I53f168860dae15a0149004835e67f97aebd822be Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/10463 Reviewed-by: Kotresh HR <khiremat@redhat.com> Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'tools/glusterfind/src/changelog.py')
-rw-r--r--tools/glusterfind/src/changelog.py394
1 files changed, 223 insertions, 171 deletions
diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py
index 2c4ee9106e1..b5f71c7c0ee 100644
--- a/tools/glusterfind/src/changelog.py
+++ b/tools/glusterfind/src/changelog.py
@@ -12,16 +12,16 @@ import os
import sys
import time
import xattr
-from errno import ENOENT
import logging
from argparse import ArgumentParser, RawDescriptionHelpFormatter
import hashlib
import urllib
import libgfchangelog
-from utils import create_file, mkdirp, execute, symlink_gfid_to_path
-from utils import fail, setup_logger, output_write, find
+from utils import mkdirp, symlink_gfid_to_path
+from utils import fail, setup_logger, find
from utils import get_changelog_rollover_time
+from changelogdata import ChangelogData
import conf
@@ -37,159 +37,202 @@ history_turn_time = 0
logger = logging.getLogger()
-def gfid_to_path_using_batchfind(brick, gfids_file, output_file):
+def output_path_prepare(path, output_prefix):
"""
- find -samefile gets the inode number and crawls entire namespace
- to get the list of files/dirs having same inode number.
- Do find without any option, except the ignore directory option,
- print the output in <INODE_NUM> <PATH> format, use this output
- to look into in-memory dictionary of inode numbers got from the
- list of GFIDs
+ If Prefix is set, joins to Path, removes ending slash
+ and encodes it.
"""
- with open(output_file, "a+") as fout:
- inode_dict = {}
- with open(gfids_file) as f:
- for gfid in f:
- gfid = gfid.strip()
- backend_path = os.path.join(brick, ".glusterfs",
- gfid[0:2], gfid[2:4], gfid)
-
- try:
- inode_dict[str(os.stat(backend_path).st_ino)] = 1
- except (IOError, OSError) as e:
- if e.errno == ENOENT:
- continue
- else:
- fail("%s Failed to convert to path from "
- "GFID %s: %s" % (brick, gfid, e), logger=logger)
-
- if not inode_dict:
- return
-
- def inode_filter(path):
- try:
- st = os.lstat(path)
- except (OSError, IOError) as e:
- if e.errno == ENOENT:
- st = None
- else:
- raise
-
- if st and inode_dict.get(str(st.st_ino), None):
- return True
-
- return False
-
- brick_path_len = len(brick)
-
- def output_callback(path):
- path = path.strip()
- path = path[brick_path_len+1:]
- output_write(fout, path, args.output_prefix)
-
- ignore_dirs = [os.path.join(brick, dirname)
- for dirname in
- conf.get_opt("brick_ignore_dirs").split(",")]
- # Length of brick path, to remove from output path
- find(brick, callback_func=output_callback,
- filter_func=inode_filter,
- ignore_dirs=ignore_dirs)
+ if output_prefix != ".":
+ path = os.path.join(output_prefix, path)
+ if path.endswith("/"):
+ path = path[0:len(path)-1]
+
+ return urllib.quote_plus(path)
+
+
+def pgfid_to_path(brick, changelog_data):
+ """
+ For all the pgfids in table, converts into path using recursive
+ readlink.
+ """
+ # pgfid1 to path1 in case of CREATE/MKNOD/MKDIR/LINK/SYMLINK
+ for row in changelog_data.gfidpath_get_distinct("pgfid1", {"path1": ""}):
+ # In case of Data/Metadata only, pgfid1 will not be their
+ if row[0] == "":
+ continue
+
+ path = symlink_gfid_to_path(brick, row[0])
+ path = output_path_prepare(path, args.output_prefix)
+
+ changelog_data.gfidpath_set_path1(path, row[0])
+
+ # pgfid2 to path2 in case of RENAME
+ for row in changelog_data.gfidpath_get_distinct("pgfid2",
+ {"type": "RENAME",
+ "path2": ""}):
+ # Only in case of Rename pgfid2 exists
+ if row[0] == "":
+ continue
- fout.flush()
- os.fsync(fout.fileno())
+ path = symlink_gfid_to_path(brick, row[0])
+ if path == "":
+ continue
+ path = output_path_prepare(path, args.output_prefix)
+ changelog_data.gfidpath_set_path2(path, row[0])
-def gfid_to_path_using_pgfid(brick, gfids_file, output_file, outfile_failures):
+
+def populate_pgfid_and_inodegfid(brick, changelog_data):
"""
- Parent GFID is saved as xattr, collect Parent GFIDs from all
- the files from gfids_file. Convert parent GFID to path and Crawl
- each directories to get the list of files/dirs having same inode number.
- Do find with maxdepth as 1 and print the output in <INODE_NUM> <PATH>
- format, use this output to look into in memory dictionary of inode
- numbers got from the list of GFIDs
+ For all the DATA/METADATA modifications GFID,
+ If symlink, directly convert to Path using Readlink.
+ If not symlink, try to get PGFIDs via xattr query and populate it
+ to pgfid table, collect inodes in inodegfid table
"""
- with open(output_file, "a+") as fout:
- pgfids = set()
- inode_dict = {}
- with open(gfids_file) as f:
- for gfid in f:
- gfid = gfid.strip()
- p = os.path.join(brick,
- ".glusterfs",
- gfid[0:2],
- gfid[2:4],
- gfid)
- if os.path.islink(p):
- path = symlink_gfid_to_path(brick, gfid)
- output_write(fout, path, args.output_prefix)
- else:
- try:
- inode_dict[str(os.stat(p).st_ino)] = 1
- file_xattrs = xattr.list(p)
- num_parent_gfid = 0
- for x in file_xattrs:
- if x.startswith("trusted.pgfid."):
- num_parent_gfid += 1
- pgfids.add(x.split(".")[-1])
-
- if num_parent_gfid == 0:
- with open(outfile_failures, "a") as f:
- f.write("%s\n" % gfid)
- f.flush()
- os.fsync(f.fileno())
-
- except (IOError, OSError) as e:
- if e.errno == ENOENT:
- continue
- else:
- fail("%s Failed to convert to path from "
- "GFID %s: %s" % (brick, gfid, e),
- logger=logger)
-
- if not inode_dict:
- return
-
- def inode_filter(path):
+ for row in changelog_data.gfidpath_get({"path1": "", "type": "MODIFY"}):
+ gfid = row[3].strip()
+ p = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid)
+ if os.path.islink(p):
+ # It is a Directory if GFID backend path is symlink
+ path = symlink_gfid_to_path(brick, gfid)
+ if path == "":
+ continue
+
+ path = output_path_prepare(path, args.output_prefix)
+
+ changelog_data.gfidpath_update({"path1": path},
+ {"gfid": row[0]})
+ else:
try:
- st = os.lstat(path)
- except (OSError, IOError) as e:
- if e.errno == ENOENT:
- st = None
- else:
- raise
+ # INODE and GFID to inodegfid table
+ changelog_data.inodegfid_add(os.stat(p).st_ino, gfid)
+ file_xattrs = xattr.list(p)
+ for x in file_xattrs:
+ if x.startswith("trusted.pgfid."):
+ # PGFID in pgfid table
+ changelog_data.pgfid_add(x.split(".")[-1])
+ except (IOError, OSError):
+ # All OS Errors ignored, since failures will be logged
+ # in End. All GFIDs present in gfidpath table
+ continue
+
+
+def gfid_to_path_using_pgfid(brick, changelog_data, args):
+ """
+ For all the pgfids collected, Converts to Path and
+ does readdir on those directories and looks up inodegfid
+ table for matching inode number.
+ """
+ populate_pgfid_and_inodegfid(brick, changelog_data)
+
+ # If no GFIDs needs conversion to Path
+ if not changelog_data.inodegfid_exists({"converted": 0}):
+ return
+
+ def inode_filter(path):
+ # Looks in inodegfid table, if exists returns
+ # inode number else None
+ try:
+ st = os.lstat(path)
+ except (OSError, IOError):
+ st = None
+
+ if st and changelog_data.inodegfid_exists({"inode": st.st_ino}):
+ return st.st_ino
+
+ return None
+
+ # Length of brick path, to remove from output path
+ brick_path_len = len(brick)
+
+ def output_callback(path, inode):
+ # For each path found, encodes it and updates path1
+ # Also updates converted flag in inodegfid table as 1
+ path = path.strip()
+ path = path[brick_path_len+1:]
+
+ path = output_path_prepare(path, args.output_prefix)
- if st and inode_dict.get(str(st.st_ino), None):
- return True
+ changelog_data.append_path1(path, inode)
+ changelog_data.inodegfid_update({"converted": 1}, {"inode": inode})
- return False
+ ignore_dirs = [os.path.join(brick, dirname)
+ for dirname in
+ conf.get_opt("brick_ignore_dirs").split(",")]
- # Length of brick path, to remove from output path
- brick_path_len = len(brick)
+ for row in changelog_data.pgfid_get():
+ path = symlink_gfid_to_path(brick, row[0])
+ find(os.path.join(brick, path),
+ callback_func=output_callback,
+ filter_func=inode_filter,
+ ignore_dirs=ignore_dirs,
+ subdirs_crawl=False)
+
+
+def gfid_to_path_using_batchfind(brick, changelog_data):
+ # If all the GFIDs converted using gfid_to_path_using_pgfid
+ if not changelog_data.inodegfid_exists({"converted": 0}):
+ return
+
+ def inode_filter(path):
+ # Looks in inodegfid table, if exists returns
+ # inode number else None
+ try:
+ st = os.lstat(path)
+ except (OSError, IOError):
+ st = None
+
+ if st and changelog_data.inodegfid_exists({"inode": st.st_ino}):
+ return st.st_ino
- def output_callback(path):
- path = path.strip()
- path = path[brick_path_len+1:]
- output_write(fout, path, args.output_prefix)
+ return None
- ignore_dirs = [os.path.join(brick, dirname)
- for dirname in
- conf.get_opt("brick_ignore_dirs").split(",")]
+ # Length of brick path, to remove from output path
+ brick_path_len = len(brick)
- for pgfid in pgfids:
- path = symlink_gfid_to_path(brick, pgfid)
- find(os.path.join(brick, path),
- callback_func=output_callback,
- filter_func=inode_filter,
- ignore_dirs=ignore_dirs,
- subdirs_crawl=False)
+ def output_callback(path, inode):
+ # For each path found, encodes it and updates path1
+ # Also updates converted flag in inodegfid table as 1
+ path = path.strip()
+ path = path[brick_path_len+1:]
+ path = output_path_prepare(path, args.output_prefix)
- fout.flush()
- os.fsync(fout.fileno())
+ changelog_data.append_path1(path, inode)
+ ignore_dirs = [os.path.join(brick, dirname)
+ for dirname in
+ conf.get_opt("brick_ignore_dirs").split(",")]
-def sort_unique(filename):
- execute(["sort", "-u", "-o", filename, filename],
- exit_msg="Sort failed", logger=logger)
+ # Full Namespace Crawl
+ find(brick, callback_func=output_callback,
+ filter_func=inode_filter,
+ ignore_dirs=ignore_dirs)
+
+
+def parse_changelog_to_db(changelog_data, filename):
+ """
+ Parses a Changelog file and populates data in gfidpath table
+ """
+ with open(filename) as f:
+ changelogfile = os.path.basename(filename)
+ for line in f:
+ data = line.strip().split(" ")
+ if data[0] == "E" and data[2] in ["CREATE", "MKNOD", "MKDIR"]:
+ # CREATE/MKDIR/MKNOD
+ changelog_data.when_create_mknod_mkdir(changelogfile, data)
+ elif data[0] in ["D", "M"]:
+ # DATA/META
+ if not args.only_namespace_changes:
+ changelog_data.when_data_meta(changelogfile, data)
+ elif data[0] == "E" and data[2] in ["LINK", "SYMLINK"]:
+ # LINK/SYMLINK
+ changelog_data.when_link_symlink(changelogfile, data)
+ elif data[0] == "E" and data[2] == "RENAME":
+ # RENAME
+ changelog_data.when_rename(changelogfile, data)
+ elif data[0] == "E" and data[2] in ["UNLINK", "RMDIR"]:
+ # UNLINK/RMDIR
+ changelog_data.when_unlink_rmdir(changelogfile, data)
def get_changes(brick, hash_dir, log_file, start, end, args):
@@ -199,6 +242,18 @@ def get_changes(brick, hash_dir, log_file, start, end, args):
the modified gfids from the changelogs and writes the list
of gfid to 'gfid_list' file.
"""
+ session_dir = os.path.join(conf.get_opt("session_dir"),
+ args.session)
+ status_file = os.path.join(session_dir, args.volume,
+ "%s.status" % urllib.quote_plus(args.brick))
+
+ # Get previous session
+ try:
+ with open(status_file) as f:
+ start = int(f.read().strip())
+ except (ValueError, OSError, IOError):
+ start = args.start
+
try:
libgfchangelog.cl_init()
libgfchangelog.cl_register(brick, hash_dir, log_file,
@@ -207,10 +262,7 @@ def get_changes(brick, hash_dir, log_file, start, end, args):
fail("%s Changelog register failed: %s" % (brick, e), logger=logger)
# Output files to record GFIDs and GFID to Path failure GFIDs
- gfid_list_path = args.outfile + ".gfids"
- gfid_list_failures_file = gfid_list_path + ".failures"
- create_file(gfid_list_path, exit_on_err=True, logger=logger)
- create_file(gfid_list_failures_file, exit_on_err=True, logger=logger)
+ changelog_data = ChangelogData(args.outfile)
# Changelogs path(Hard coded to BRICK/.glusterfs/changelogs
cl_path = os.path.join(brick, ".glusterfs/changelogs")
@@ -234,37 +286,31 @@ def get_changes(brick, hash_dir, log_file, start, end, args):
while libgfchangelog.cl_history_scan() > 0:
changes += libgfchangelog.cl_history_getchanges()
- if changes:
- with open(gfid_list_path, 'a+') as fgfid:
- for change in changes:
- # Ignore if last processed changelog comes
- # again in list
- if change.endswith(".%s" % start):
- continue
-
- with open(change) as f:
- for line in f:
- # Space delimited list, collect GFID
- details = line.split()
- fgfid.write("%s\n" % details[1])
-
- libgfchangelog.cl_history_done(change)
- fgfid.flush()
- os.fsync(fgfid.fileno())
+ for change in changes:
+ # Ignore if last processed changelog comes
+ # again in list
+ if change.endswith(".%s" % start):
+ continue
+ parse_changelog_to_db(changelog_data, change)
+ libgfchangelog.cl_history_done(change)
+
+ changelog_data.commit()
except libgfchangelog.ChangelogException as e:
fail("%s Error during Changelog Crawl: %s" % (brick, e),
logger=logger)
- # If TS returned from history_changelog is < end time
- # then FS crawl may be required, since history is only available
- # till TS returned from history_changelog
- if actual_end < end:
- fail("Partial History available with Changelog", 2, logger=logger)
+ # Convert all pgfid available from Changelogs
+ pgfid_to_path(brick, changelog_data)
+ changelog_data.commit()
+
+ # Convert all GFIDs for which no other additional details available
+ gfid_to_path_using_pgfid(brick, changelog_data, args)
+ changelog_data.commit()
- sort_unique(gfid_list_path)
- gfid_to_path_using_pgfid(brick, gfid_list_path,
- args.outfile, gfid_list_failures_file)
- gfid_to_path_using_batchfind(brick, gfid_list_failures_file, args.outfile)
+ # If some GFIDs fail to get converted from previous step,
+ # convert using find
+ gfid_to_path_using_batchfind(brick, changelog_data)
+ changelog_data.commit()
return actual_end
@@ -283,8 +329,6 @@ def changelog_crawl(brick, start, end, args):
working_dir = os.path.join(working_dir, brickhash)
mkdirp(working_dir, exit_on_err=True, logger=logger)
- create_file(args.outfile, exit_on_err=True, logger=logger)
- create_file(args.outfile + ".gfids", exit_on_err=True, logger=logger)
log_file = os.path.join(conf.get_opt("log_dir"),
args.session,
@@ -308,6 +352,9 @@ def _get_args():
parser.add_argument("--debug", help="Debug", action="store_true")
parser.add_argument("--output-prefix", help="File prefix in output",
default=".")
+ parser.add_argument("-N", "--only-namespace-changes",
+ help="List only namespace changes",
+ action="store_true")
return parser.parse_args()
@@ -336,8 +383,13 @@ if __name__ == "__main__":
start = args.start
end = int(time.time()) - get_changelog_rollover_time(args.volume)
+ logger.info("%s Started Changelog Crawl - Start: %s End: %s" % (args.brick,
+ start,
+ end))
actual_end = changelog_crawl(args.brick, start, end, args)
with open(status_file_pre, "w", buffering=0) as f:
f.write(str(actual_end))
+ logger.info("%s Finished Changelog Crawl - End: %s" % (args.brick,
+ actual_end))
sys.exit(0)