summaryrefslogtreecommitdiffstats
path: root/tools/glusterfind/src/changelog.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/glusterfind/src/changelog.py')
-rw-r--r--tools/glusterfind/src/changelog.py394
1 files changed, 223 insertions, 171 deletions
diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py
index 2c4ee9106e1..b5f71c7c0ee 100644
--- a/tools/glusterfind/src/changelog.py
+++ b/tools/glusterfind/src/changelog.py
@@ -12,16 +12,16 @@ import os
import sys
import time
import xattr
-from errno import ENOENT
import logging
from argparse import ArgumentParser, RawDescriptionHelpFormatter
import hashlib
import urllib
import libgfchangelog
-from utils import create_file, mkdirp, execute, symlink_gfid_to_path
-from utils import fail, setup_logger, output_write, find
+from utils import mkdirp, symlink_gfid_to_path
+from utils import fail, setup_logger, find
from utils import get_changelog_rollover_time
+from changelogdata import ChangelogData
import conf
@@ -37,159 +37,202 @@ history_turn_time = 0
logger = logging.getLogger()
-def gfid_to_path_using_batchfind(brick, gfids_file, output_file):
+def output_path_prepare(path, output_prefix):
"""
- find -samefile gets the inode number and crawls entire namespace
- to get the list of files/dirs having same inode number.
- Do find without any option, except the ignore directory option,
- print the output in <INODE_NUM> <PATH> format, use this output
- to look into in-memory dictionary of inode numbers got from the
- list of GFIDs
+ If Prefix is set, joins to Path, removes ending slash
+ and encodes it.
"""
- with open(output_file, "a+") as fout:
- inode_dict = {}
- with open(gfids_file) as f:
- for gfid in f:
- gfid = gfid.strip()
- backend_path = os.path.join(brick, ".glusterfs",
- gfid[0:2], gfid[2:4], gfid)
-
- try:
- inode_dict[str(os.stat(backend_path).st_ino)] = 1
- except (IOError, OSError) as e:
- if e.errno == ENOENT:
- continue
- else:
- fail("%s Failed to convert to path from "
- "GFID %s: %s" % (brick, gfid, e), logger=logger)
-
- if not inode_dict:
- return
-
- def inode_filter(path):
- try:
- st = os.lstat(path)
- except (OSError, IOError) as e:
- if e.errno == ENOENT:
- st = None
- else:
- raise
-
- if st and inode_dict.get(str(st.st_ino), None):
- return True
-
- return False
-
- brick_path_len = len(brick)
-
- def output_callback(path):
- path = path.strip()
- path = path[brick_path_len+1:]
- output_write(fout, path, args.output_prefix)
-
- ignore_dirs = [os.path.join(brick, dirname)
- for dirname in
- conf.get_opt("brick_ignore_dirs").split(",")]
- # Length of brick path, to remove from output path
- find(brick, callback_func=output_callback,
- filter_func=inode_filter,
- ignore_dirs=ignore_dirs)
+ if output_prefix != ".":
+ path = os.path.join(output_prefix, path)
+ if path.endswith("/"):
+ path = path[0:len(path)-1]
+
+ return urllib.quote_plus(path)
+
+
+def pgfid_to_path(brick, changelog_data):
+ """
+ For all the pgfids in table, converts into path using recursive
+ readlink.
+ """
+ # pgfid1 to path1 in case of CREATE/MKNOD/MKDIR/LINK/SYMLINK
+ for row in changelog_data.gfidpath_get_distinct("pgfid1", {"path1": ""}):
+ # In case of Data/Metadata only, pgfid1 will not be their
+ if row[0] == "":
+ continue
+
+ path = symlink_gfid_to_path(brick, row[0])
+ path = output_path_prepare(path, args.output_prefix)
+
+ changelog_data.gfidpath_set_path1(path, row[0])
+
+ # pgfid2 to path2 in case of RENAME
+ for row in changelog_data.gfidpath_get_distinct("pgfid2",
+ {"type": "RENAME",
+ "path2": ""}):
+ # Only in case of Rename pgfid2 exists
+ if row[0] == "":
+ continue
- fout.flush()
- os.fsync(fout.fileno())
+ path = symlink_gfid_to_path(brick, row[0])
+ if path == "":
+ continue
+ path = output_path_prepare(path, args.output_prefix)
+ changelog_data.gfidpath_set_path2(path, row[0])
-def gfid_to_path_using_pgfid(brick, gfids_file, output_file, outfile_failures):
+
+def populate_pgfid_and_inodegfid(brick, changelog_data):
"""
- Parent GFID is saved as xattr, collect Parent GFIDs from all
- the files from gfids_file. Convert parent GFID to path and Crawl
- each directories to get the list of files/dirs having same inode number.
- Do find with maxdepth as 1 and print the output in <INODE_NUM> <PATH>
- format, use this output to look into in memory dictionary of inode
- numbers got from the list of GFIDs
+ For all the DATA/METADATA modifications GFID,
+ If symlink, directly convert to Path using Readlink.
+ If not symlink, try to get PGFIDs via xattr query and populate it
+ to pgfid table, collect inodes in inodegfid table
"""
- with open(output_file, "a+") as fout:
- pgfids = set()
- inode_dict = {}
- with open(gfids_file) as f:
- for gfid in f:
- gfid = gfid.strip()
- p = os.path.join(brick,
- ".glusterfs",
- gfid[0:2],
- gfid[2:4],
- gfid)
- if os.path.islink(p):
- path = symlink_gfid_to_path(brick, gfid)
- output_write(fout, path, args.output_prefix)
- else:
- try:
- inode_dict[str(os.stat(p).st_ino)] = 1
- file_xattrs = xattr.list(p)
- num_parent_gfid = 0
- for x in file_xattrs:
- if x.startswith("trusted.pgfid."):
- num_parent_gfid += 1
- pgfids.add(x.split(".")[-1])
-
- if num_parent_gfid == 0:
- with open(outfile_failures, "a") as f:
- f.write("%s\n" % gfid)
- f.flush()
- os.fsync(f.fileno())
-
- except (IOError, OSError) as e:
- if e.errno == ENOENT:
- continue
- else:
- fail("%s Failed to convert to path from "
- "GFID %s: %s" % (brick, gfid, e),
- logger=logger)
-
- if not inode_dict:
- return
-
- def inode_filter(path):
+ for row in changelog_data.gfidpath_get({"path1": "", "type": "MODIFY"}):
+ gfid = row[3].strip()
+ p = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid)
+ if os.path.islink(p):
+ # It is a Directory if GFID backend path is symlink
+ path = symlink_gfid_to_path(brick, gfid)
+ if path == "":
+ continue
+
+ path = output_path_prepare(path, args.output_prefix)
+
+ changelog_data.gfidpath_update({"path1": path},
+ {"gfid": row[0]})
+ else:
try:
- st = os.lstat(path)
- except (OSError, IOError) as e:
- if e.errno == ENOENT:
- st = None
- else:
- raise
+ # INODE and GFID to inodegfid table
+ changelog_data.inodegfid_add(os.stat(p).st_ino, gfid)
+ file_xattrs = xattr.list(p)
+ for x in file_xattrs:
+ if x.startswith("trusted.pgfid."):
+ # PGFID in pgfid table
+ changelog_data.pgfid_add(x.split(".")[-1])
+ except (IOError, OSError):
+ # All OS Errors ignored, since failures will be logged
+ # in End. All GFIDs present in gfidpath table
+ continue
+
+
+def gfid_to_path_using_pgfid(brick, changelog_data, args):
+ """
+ For all the pgfids collected, Converts to Path and
+ does readdir on those directories and looks up inodegfid
+ table for matching inode number.
+ """
+ populate_pgfid_and_inodegfid(brick, changelog_data)
+
+ # If no GFIDs needs conversion to Path
+ if not changelog_data.inodegfid_exists({"converted": 0}):
+ return
+
+ def inode_filter(path):
+ # Looks in inodegfid table, if exists returns
+ # inode number else None
+ try:
+ st = os.lstat(path)
+ except (OSError, IOError):
+ st = None
+
+ if st and changelog_data.inodegfid_exists({"inode": st.st_ino}):
+ return st.st_ino
+
+ return None
+
+ # Length of brick path, to remove from output path
+ brick_path_len = len(brick)
+
+ def output_callback(path, inode):
+ # For each path found, encodes it and updates path1
+ # Also updates converted flag in inodegfid table as 1
+ path = path.strip()
+ path = path[brick_path_len+1:]
+
+ path = output_path_prepare(path, args.output_prefix)
- if st and inode_dict.get(str(st.st_ino), None):
- return True
+ changelog_data.append_path1(path, inode)
+ changelog_data.inodegfid_update({"converted": 1}, {"inode": inode})
- return False
+ ignore_dirs = [os.path.join(brick, dirname)
+ for dirname in
+ conf.get_opt("brick_ignore_dirs").split(",")]
- # Length of brick path, to remove from output path
- brick_path_len = len(brick)
+ for row in changelog_data.pgfid_get():
+ path = symlink_gfid_to_path(brick, row[0])
+ find(os.path.join(brick, path),
+ callback_func=output_callback,
+ filter_func=inode_filter,
+ ignore_dirs=ignore_dirs,
+ subdirs_crawl=False)
+
+
+def gfid_to_path_using_batchfind(brick, changelog_data):
+ # If all the GFIDs converted using gfid_to_path_using_pgfid
+ if not changelog_data.inodegfid_exists({"converted": 0}):
+ return
+
+ def inode_filter(path):
+ # Looks in inodegfid table, if exists returns
+ # inode number else None
+ try:
+ st = os.lstat(path)
+ except (OSError, IOError):
+ st = None
+
+ if st and changelog_data.inodegfid_exists({"inode": st.st_ino}):
+ return st.st_ino
- def output_callback(path):
- path = path.strip()
- path = path[brick_path_len+1:]
- output_write(fout, path, args.output_prefix)
+ return None
- ignore_dirs = [os.path.join(brick, dirname)
- for dirname in
- conf.get_opt("brick_ignore_dirs").split(",")]
+ # Length of brick path, to remove from output path
+ brick_path_len = len(brick)
- for pgfid in pgfids:
- path = symlink_gfid_to_path(brick, pgfid)
- find(os.path.join(brick, path),
- callback_func=output_callback,
- filter_func=inode_filter,
- ignore_dirs=ignore_dirs,
- subdirs_crawl=False)
+ def output_callback(path, inode):
+ # For each path found, encodes it and updates path1
+ # Also updates converted flag in inodegfid table as 1
+ path = path.strip()
+ path = path[brick_path_len+1:]
+ path = output_path_prepare(path, args.output_prefix)
- fout.flush()
- os.fsync(fout.fileno())
+ changelog_data.append_path1(path, inode)
+ ignore_dirs = [os.path.join(brick, dirname)
+ for dirname in
+ conf.get_opt("brick_ignore_dirs").split(",")]
-def sort_unique(filename):
- execute(["sort", "-u", "-o", filename, filename],
- exit_msg="Sort failed", logger=logger)
+ # Full Namespace Crawl
+ find(brick, callback_func=output_callback,
+ filter_func=inode_filter,
+ ignore_dirs=ignore_dirs)
+
+
+def parse_changelog_to_db(changelog_data, filename):
+ """
+ Parses a Changelog file and populates data in gfidpath table
+ """
+ with open(filename) as f:
+ changelogfile = os.path.basename(filename)
+ for line in f:
+ data = line.strip().split(" ")
+ if data[0] == "E" and data[2] in ["CREATE", "MKNOD", "MKDIR"]:
+ # CREATE/MKDIR/MKNOD
+ changelog_data.when_create_mknod_mkdir(changelogfile, data)
+ elif data[0] in ["D", "M"]:
+ # DATA/META
+ if not args.only_namespace_changes:
+ changelog_data.when_data_meta(changelogfile, data)
+ elif data[0] == "E" and data[2] in ["LINK", "SYMLINK"]:
+ # LINK/SYMLINK
+ changelog_data.when_link_symlink(changelogfile, data)
+ elif data[0] == "E" and data[2] == "RENAME":
+ # RENAME
+ changelog_data.when_rename(changelogfile, data)
+ elif data[0] == "E" and data[2] in ["UNLINK", "RMDIR"]:
+ # UNLINK/RMDIR
+ changelog_data.when_unlink_rmdir(changelogfile, data)
def get_changes(brick, hash_dir, log_file, start, end, args):
@@ -199,6 +242,18 @@ def get_changes(brick, hash_dir, log_file, start, end, args):
the modified gfids from the changelogs and writes the list
of gfid to 'gfid_list' file.
"""
+ session_dir = os.path.join(conf.get_opt("session_dir"),
+ args.session)
+ status_file = os.path.join(session_dir, args.volume,
+ "%s.status" % urllib.quote_plus(args.brick))
+
+ # Get previous session
+ try:
+ with open(status_file) as f:
+ start = int(f.read().strip())
+ except (ValueError, OSError, IOError):
+ start = args.start
+
try:
libgfchangelog.cl_init()
libgfchangelog.cl_register(brick, hash_dir, log_file,
@@ -207,10 +262,7 @@ def get_changes(brick, hash_dir, log_file, start, end, args):
fail("%s Changelog register failed: %s" % (brick, e), logger=logger)
# Output files to record GFIDs and GFID to Path failure GFIDs
- gfid_list_path = args.outfile + ".gfids"
- gfid_list_failures_file = gfid_list_path + ".failures"
- create_file(gfid_list_path, exit_on_err=True, logger=logger)
- create_file(gfid_list_failures_file, exit_on_err=True, logger=logger)
+ changelog_data = ChangelogData(args.outfile)
# Changelogs path(Hard coded to BRICK/.glusterfs/changelogs
cl_path = os.path.join(brick, ".glusterfs/changelogs")
@@ -234,37 +286,31 @@ def get_changes(brick, hash_dir, log_file, start, end, args):
while libgfchangelog.cl_history_scan() > 0:
changes += libgfchangelog.cl_history_getchanges()
- if changes:
- with open(gfid_list_path, 'a+') as fgfid:
- for change in changes:
- # Ignore if last processed changelog comes
- # again in list
- if change.endswith(".%s" % start):
- continue
-
- with open(change) as f:
- for line in f:
- # Space delimited list, collect GFID
- details = line.split()
- fgfid.write("%s\n" % details[1])
-
- libgfchangelog.cl_history_done(change)
- fgfid.flush()
- os.fsync(fgfid.fileno())
+ for change in changes:
+ # Ignore if last processed changelog comes
+ # again in list
+ if change.endswith(".%s" % start):
+ continue
+ parse_changelog_to_db(changelog_data, change)
+ libgfchangelog.cl_history_done(change)
+
+ changelog_data.commit()
except libgfchangelog.ChangelogException as e:
fail("%s Error during Changelog Crawl: %s" % (brick, e),
logger=logger)
- # If TS returned from history_changelog is < end time
- # then FS crawl may be required, since history is only available
- # till TS returned from history_changelog
- if actual_end < end:
- fail("Partial History available with Changelog", 2, logger=logger)
+ # Convert all pgfid available from Changelogs
+ pgfid_to_path(brick, changelog_data)
+ changelog_data.commit()
+
+ # Convert all GFIDs for which no other additional details available
+ gfid_to_path_using_pgfid(brick, changelog_data, args)
+ changelog_data.commit()
- sort_unique(gfid_list_path)
- gfid_to_path_using_pgfid(brick, gfid_list_path,
- args.outfile, gfid_list_failures_file)
- gfid_to_path_using_batchfind(brick, gfid_list_failures_file, args.outfile)
+ # If some GFIDs fail to get converted from previous step,
+ # convert using find
+ gfid_to_path_using_batchfind(brick, changelog_data)
+ changelog_data.commit()
return actual_end
@@ -283,8 +329,6 @@ def changelog_crawl(brick, start, end, args):
working_dir = os.path.join(working_dir, brickhash)
mkdirp(working_dir, exit_on_err=True, logger=logger)
- create_file(args.outfile, exit_on_err=True, logger=logger)
- create_file(args.outfile + ".gfids", exit_on_err=True, logger=logger)
log_file = os.path.join(conf.get_opt("log_dir"),
args.session,
@@ -308,6 +352,9 @@ def _get_args():
parser.add_argument("--debug", help="Debug", action="store_true")
parser.add_argument("--output-prefix", help="File prefix in output",
default=".")
+ parser.add_argument("-N", "--only-namespace-changes",
+ help="List only namespace changes",
+ action="store_true")
return parser.parse_args()
@@ -336,8 +383,13 @@ if __name__ == "__main__":
start = args.start
end = int(time.time()) - get_changelog_rollover_time(args.volume)
+ logger.info("%s Started Changelog Crawl - Start: %s End: %s" % (args.brick,
+ start,
+ end))
actual_end = changelog_crawl(args.brick, start, end, args)
with open(status_file_pre, "w", buffering=0) as f:
f.write(str(actual_end))
+ logger.info("%s Finished Changelog Crawl - End: %s" % (args.brick,
+ actual_end))
sys.exit(0)