| -rw-r--r-- | .gitignore                                                                                |   5 |
| -rw-r--r-- | configure.ac                                                                              |  18 |
| -rw-r--r-- | extras/Makefile.am                                                                        |   3 |
| -rwxr-xr-x | extras/hook-scripts/S56glusterd-geo-rep-create-post.sh                                    |   2 |
| -rw-r--r-- | extras/peer_add_secret_pub.in (renamed from geo-replication/src/peer_add_secret_pub.in)  |  16 |
| -rw-r--r-- | geo-replication/src/Makefile.am                                                           |   2 |
| -rwxr-xr-x | geo-replication/src/set_geo_rep_pem_keys.sh                                               |   2 |
| -rw-r--r-- | glusterfs.spec.in                                                                         |  14 |
| -rw-r--r-- | tools/Makefile.am                                                                         |   2 |
| -rw-r--r-- | tools/glusterfind/Makefile.am                                                             |   7 |
| -rw-r--r-- | tools/glusterfind/glusterfind.in                                                          |  17 |
| -rw-r--r-- | tools/glusterfind/src/Makefile.am                                                         |  14 |
| -rw-r--r-- | tools/glusterfind/src/__init__.py                                                         |   9 |
| -rw-r--r-- | tools/glusterfind/src/brickfind.py                                                        |  97 |
| -rw-r--r-- | tools/glusterfind/src/changelog.py                                                        | 309 |
| -rw-r--r-- | tools/glusterfind/src/conf.py                                                             |  28 |
| -rw-r--r-- | tools/glusterfind/src/libgfchangelog.py                                                   |  83 |
| -rw-r--r-- | tools/glusterfind/src/main.py                                                             | 468 |
| -rw-r--r-- | tools/glusterfind/src/nodecleanup.py                                                      |  51 |
| -rw-r--r-- | tools/glusterfind/src/tool.conf.in                                                        |  11 |
| -rw-r--r-- | tools/glusterfind/src/utils.py                                                            | 203 |
21 files changed, 1341 insertions, 20 deletions
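The patch adds a new top-level command, installed as glusterfind, whose sub-commands are defined in tools/glusterfind/src/main.py below. As a rough usage sketch (the session name "backup1" and volume name "gv0" are placeholders, not taken from the patch): "create" registers a session, distributes the ssh key via the generalized add_secret_pub mechanism and enables build-pgfid and changelog.changelog on the volume; "pre" writes the list of files changed since the recorded session time to the given output file; "post" commits the new session time once that list has been consumed.

    glusterfind create backup1 gv0
    glusterfind pre backup1 gv0 /root/gv0-changes.txt
    # ... back up the files listed in /root/gv0-changes.txt ...
    glusterfind post backup1 gv0
    glusterfind list --session backup1
    glusterfind delete backup1 gv0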
diff --git a/.gitignore b/.gitignore index acead3677ae..c8c07f9517d 100644 --- a/.gitignore +++ b/.gitignore @@ -56,8 +56,6 @@ extras/systemd/glusterd.service  extras/who-wrote-glusterfs/gitdm  geo-replication/.tox  geo-replication/src/gsyncd -geo-replication/src/gsyncd -geo-replication/src/peer_add_secret_pub  geo-replication/src/peer_gsec_create  geo-replication/src/peer_mountbroker  geo-replication/src/set_geo_rep_pem_keys.sh @@ -86,3 +84,6 @@ xlators/features/glupy/src/__init__.py  xlators/features/glupy/src/setup.py  xlators/mount/fuse/utils/mount.glusterfs  xlators/mount/fuse/utils/mount_glusterfs +extras/peer_add_secret_pub +tools/glusterfind/glusterfind +tools/glusterfind/src/tool.conf diff --git a/configure.ac b/configure.ac index 84eb0f40d45..c3f505bcc00 100644 --- a/configure.ac +++ b/configure.ac @@ -39,8 +39,8 @@ AC_CONFIG_FILES([Makefile                  libglusterfs/Makefile                  libglusterfs/src/Makefile                  geo-replication/src/peer_gsec_create -                geo-replication/src/peer_add_secret_pub                  geo-replication/src/peer_mountbroker +                extras/peer_add_secret_pub                  geo-replication/syncdaemon/configinterface.py                  glusterfsd/Makefile                  glusterfsd/src/Makefile @@ -226,7 +226,11 @@ AC_CONFIG_FILES([Makefile                  tools/gfind_missing_files/Makefile                  heal/Makefile                  heal/src/Makefile -                glusterfs.spec]) +                glusterfs.spec +                tools/glusterfind/src/tool.conf +                tools/glusterfind/glusterfind +                tools/glusterfind/Makefile +                tools/glusterfind/src/Makefile])  AC_CANONICAL_HOST @@ -887,6 +891,14 @@ if test "x$ac_cv_header_execinfo_h" = "xno"; then     fi  fi +old_prefix=$prefix +if test "x$prefix" = xNONE; then +	prefix=$ac_default_prefix +fi +GLUSTERFS_LIBEXECDIR="$(eval echo $prefix)/libexec/glusterfs" +GLUSTERFSD_MISCDIR="$(eval echo $prefix)/var/lib/misc/glusterfsd" +prefix=$old_prefix +  ### Dirty hacky stuff to make LOCALSTATEDIR work  if test "x$prefix" = xNONE; then     test $localstatedir = '${prefix}/var' && localstatedir=$ac_default_prefix/var @@ -1193,6 +1205,8 @@ AM_CONDITIONAL([GF_BSD_HOST_OS], test "${GF_HOST_OS}" = "GF_BSD_HOST_OS")  AC_SUBST(GLUSTERD_WORKDIR)  AM_CONDITIONAL([GF_INSTALL_GLUSTERD_WORKDIR], test ! 
-d ${GLUSTERD_WORKDIR} && test -d ${sysconfdir}/glusterd )  AC_SUBST(GLUSTERD_VOLFILE) +AC_SUBST(GLUSTERFS_LIBEXECDIR) +AC_SUBST(GLUSTERFSD_MISCDIR)  dnl pkg-config versioning  dnl diff --git a/extras/Makefile.am b/extras/Makefile.am index e2b29f2da45..89f69440423 100644 --- a/extras/Makefile.am +++ b/extras/Makefile.am @@ -1,3 +1,6 @@ +gsyncddir = $(libexecdir)/glusterfs +gsyncd_SCRIPTS = peer_add_secret_pub +  EditorModedir = $(docdir)  EditorMode_DATA = glusterfs-mode.el glusterfs.vim diff --git a/extras/hook-scripts/S56glusterd-geo-rep-create-post.sh b/extras/hook-scripts/S56glusterd-geo-rep-create-post.sh index 8d3734e8097..067dd7427da 100755 --- a/extras/hook-scripts/S56glusterd-geo-rep-create-post.sh +++ b/extras/hook-scripts/S56glusterd-geo-rep-create-post.sh @@ -75,6 +75,6 @@ if [ -f $pub_file ]; then          scp $pub_file $slave_ip:$pub_file_tmp          ssh $slave_ip "mv $pub_file_tmp ${pub_file_dname}/${mastervol}_${slavevol}_${pub_file_bname}"          ssh $slave_ip "gluster system:: copy file /geo-replication/${mastervol}_${slavevol}_common_secret.pem.pub > /dev/null" -        ssh $slave_ip "gluster system:: execute add_secret_pub root $mastervol $slavevol > /dev/null" +        ssh $slave_ip "gluster system:: execute add_secret_pub root geo-replication/${mastervol}_${slavevol}_common_secret.pem.pub > /dev/null"      fi  fi diff --git a/geo-replication/src/peer_add_secret_pub.in b/extras/peer_add_secret_pub.in index 1fc31642c8a..e3a9aa2a48b 100644 --- a/geo-replication/src/peer_add_secret_pub.in +++ b/extras/peer_add_secret_pub.in @@ -1,21 +1,15 @@  #!/bin/bash  user=$1 -mastervol=$2 -slavevol=$3 +pub_file=$2  if [ "$user" == "" ]; then      echo "Invalid User";      exit 1;  fi -if [ "$mastervol" == "" ]; then -    echo "Invalid master volume"; -    exit 1; -fi - -if [ "$slavevol" == "" ]; then -    echo "Invalid slave volume"; +if [ "$pub_file" == "" ]; then +    echo "Invalid pub file";      exit 1;  fi @@ -59,13 +53,11 @@ if [ ! -d $authorized_keys_file ]; then      chown $user: $authorized_keys_file;  fi -pub_file=${mastervol}_${slavevol}_common_secret.pem.pub -  # Add to authorized_keys file only if not exists already  while read line  do      grep -Fxq "$line" $authorized_keys_file;      [ $? 
-ne 0 ] && echo "$line" >> $authorized_keys_file; -done < "$GLUSTERD_WORKDIR"/geo-replication/$pub_file; +done < "$GLUSTERD_WORKDIR"/$pub_file;  exit 0; diff --git a/geo-replication/src/Makefile.am b/geo-replication/src/Makefile.am index 512128dfd2d..e9d6bc1a27e 100644 --- a/geo-replication/src/Makefile.am +++ b/geo-replication/src/Makefile.am @@ -1,6 +1,6 @@  gsyncddir = $(libexecdir)/glusterfs -gsyncd_SCRIPTS = gverify.sh peer_add_secret_pub peer_gsec_create \ +gsyncd_SCRIPTS = gverify.sh peer_gsec_create \  	set_geo_rep_pem_keys.sh peer_mountbroker  # peer_gsec_create and peer_add_secret_pub are not added to diff --git a/geo-replication/src/set_geo_rep_pem_keys.sh b/geo-replication/src/set_geo_rep_pem_keys.sh index c7cbdf36e4b..4f613da28df 100755 --- a/geo-replication/src/set_geo_rep_pem_keys.sh +++ b/geo-replication/src/set_geo_rep_pem_keys.sh @@ -45,7 +45,7 @@ function main()      if [ -f $home_dir/${COMMON_SECRET_PEM_PUB} ]; then          cp $home_dir/${COMMON_SECRET_PEM_PUB} ${GLUSTERD_WORKDIR}/geo-replication/          gluster system:: copy file /geo-replication/${COMMON_SECRET_PEM_PUB} -        gluster system:: execute add_secret_pub $user ${master_vol} ${slave_vol} +        gluster system:: execute add_secret_pub $user geo-replication/${master_vol}_${slave_vol}_common_secret.pem.pub      else          echo "$home_dir/common_secret.pem.pub not present. Please run geo-replication command on master with push-pem option to generate the file"          exit 1; diff --git a/glusterfs.spec.in b/glusterfs.spec.in index b385f377490..0df22cd3c4c 100644 --- a/glusterfs.spec.in +++ b/glusterfs.spec.in @@ -1078,6 +1078,17 @@ fi  %ghost %dir %attr(0755,-,-) %{_sharedstatedir}/glusterd/nfs/run  %ghost      %attr(0600,-,-) %{_sharedstatedir}/glusterd/nfs/run/nfs.pid +# Incrementalapi +%{_libexecdir}/glusterfs/glusterfind +%{_bindir}/glusterfind +%exclude %{_libexecdir}/glusterfs/glusterfind/brickfind.pyc +%exclude %{_libexecdir}/glusterfs/glusterfind/changelog.pyc +%exclude %{_libexecdir}/glusterfs/glusterfind/nodecleanup.pyc +%exclude %{_libexecdir}/glusterfs/glusterfind/brickfind.pyo +%exclude %{_libexecdir}/glusterfs/glusterfind/changelog.pyo +%exclude %{_libexecdir}/glusterfs/glusterfind/nodecleanup.pyo + +  %changelog  * Tue Mar 17 2015 Kaleb S. KEITHLEY <kkeithle@redhat.com>  - glusterfs-ganesha sub-package @@ -1085,6 +1096,9 @@ fi  * Thu Mar 12 2015 Kotresh H R <khiremat@redhat.com>  - gfind_missing_files tool is included (#1187140) +* Tue Mar 03 2015 Aravinda VK <avishwan@redhat.com> +- Included glusterfind files as part of server package. +  * Thu Feb 26 2015 Kaleb S. KEITHLEY <kkeithle@redhat.com>  - enable cmocka unittest support only when asked for (#1067059) diff --git a/tools/Makefile.am b/tools/Makefile.am index 74229ab41e7..d689f60fa52 100644 --- a/tools/Makefile.am +++ b/tools/Makefile.am @@ -1,3 +1,3 @@ -SUBDIRS = gfind_missing_files +SUBDIRS = gfind_missing_files glusterfind  CLEANFILES = diff --git a/tools/glusterfind/Makefile.am b/tools/glusterfind/Makefile.am new file mode 100644 index 00000000000..c99a3ddcb37 --- /dev/null +++ b/tools/glusterfind/Makefile.am @@ -0,0 +1,7 @@ +SUBDIRS = src + +EXTRA_DIST = + +bin_SCRIPTS = glusterfind + +CLEANFILES = $(bin_SCRIPTS) diff --git a/tools/glusterfind/glusterfind.in b/tools/glusterfind/glusterfind.in new file mode 100644 index 00000000000..cff8973980a --- /dev/null +++ b/tools/glusterfind/glusterfind.in @@ -0,0 +1,17 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. 
<http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import sys +sys.path.insert(1, '@GLUSTERFS_LIBEXECDIR@/') + +from glusterfind.main import main + +if __name__ == "__main__": +    main() diff --git a/tools/glusterfind/src/Makefile.am b/tools/glusterfind/src/Makefile.am new file mode 100644 index 00000000000..458b820fd19 --- /dev/null +++ b/tools/glusterfind/src/Makefile.am @@ -0,0 +1,14 @@ +glusterfinddir = $(libexecdir)/glusterfs/glusterfind + +glusterfind_PYTHON = conf.py utils.py __init__.py \ +	main.py libgfchangelog.py + +glusterfind_SCRIPTS = changelog.py nodecleanup.py \ +	brickfind.py + +glusterfind_DATA = tool.conf + +EXTRA_DIST = changelog.py nodecleanup.py brickfind.py \ +	tool.conf + +CLEANFILES = diff --git a/tools/glusterfind/src/__init__.py b/tools/glusterfind/src/__init__.py new file mode 100644 index 00000000000..eb941c6d67c --- /dev/null +++ b/tools/glusterfind/src/__init__.py @@ -0,0 +1,9 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. diff --git a/tools/glusterfind/src/brickfind.py b/tools/glusterfind/src/brickfind.py new file mode 100644 index 00000000000..4aee225d22e --- /dev/null +++ b/tools/glusterfind/src/brickfind.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+ +import os +import sys +import logging +from argparse import ArgumentParser, RawDescriptionHelpFormatter +from errno import ENOENT + +from utils import mkdirp, setup_logger, create_file, output_write, find +import conf + + +PROG_DESCRIPTION = """ +Changelog Crawler +""" + +logger = logging.getLogger() + + +def brickfind_crawl(brick, args): +    if brick.endswith("/"): +        brick = brick[0:len(brick)-1] + +    working_dir = os.path.dirname(args.outfile) +    mkdirp(working_dir, exit_on_err=True, logger=logger) +    create_file(args.outfile, exit_on_err=True, logger=logger) + +    with open(args.outfile, "a+") as fout: +        brick_path_len = len(brick) + +        def mtime_filter(path): +            try: +                st = os.lstat(path) +            except (OSError, IOError) as e: +                if e.errno == ENOENT: +                    st = None +                else: +                    raise + +            if st and (st.st_mtime > args.start or st.st_ctime > args.start): +                return True + +            return False + +        def output_callback(path): +            path = path.strip() +            path = path[brick_path_len+1:] +            output_write(fout, path, args.output_prefix) + +        if args.full: +            find(brick, callback_func=output_callback, +                 ignore_dirs=[".glusterfs"]) +        else: +            find(brick, callback_func=output_callback, +                 filter_func=mtime_filter, +                 ignore_dirs=[".glusterfs"]) + +        fout.flush() +        os.fsync(fout.fileno()) + + +def _get_args(): +    parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, +                            description=PROG_DESCRIPTION) + +    parser.add_argument("session", help="Session Name") +    parser.add_argument("volume", help="Volume Name") +    parser.add_argument("brick", help="Brick Name") +    parser.add_argument("outfile", help="Output File") +    parser.add_argument("start", help="Start Time", type=float) +    parser.add_argument("--debug", help="Debug", action="store_true") +    parser.add_argument("--full", help="Full Find", action="store_true") +    parser.add_argument("--output-prefix", help="File prefix in output", +                        default=".") + +    return parser.parse_args() + + +if __name__ == "__main__": +    args = _get_args() +    mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume), +           exit_on_err=True) +    log_file = os.path.join(conf.get_opt("log_dir"), +                            args.session, +                            args.volume, +                            "brickfind.log") +    setup_logger(logger, log_file, args.debug) +    brickfind_crawl(args.brick, args) +    sys.exit(0) diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py new file mode 100644 index 00000000000..b7697ea5030 --- /dev/null +++ b/tools/glusterfind/src/changelog.py @@ -0,0 +1,309 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+ +import os +import sys +import time +import xattr +from errno import ENOENT +import logging +from argparse import ArgumentParser, RawDescriptionHelpFormatter +import hashlib + +import libgfchangelog +from utils import create_file, mkdirp, execute, symlink_gfid_to_path +from utils import fail, setup_logger, output_write, find +import conf + + +CHANGELOG_LOG_LEVEL = 9 +CHANGELOG_CONN_RETRIES = 5 +CHANGELOGAPI_NUM_WORKERS = 3 +PROG_DESCRIPTION = """ +Changelog Crawler +""" +history_turns = 0 +history_turn_time = 0 + +logger = logging.getLogger() + + +def gfid_to_path_using_batchfind(brick, gfids_file, output_file): +    """ +    find -samefile gets the inode number and crawls entire namespace +    to get the list of files/dirs having same inode number. +    Do find without any option, except the ignore directory option, +    print the output in <INODE_NUM> <PATH> format, use this output +    to look into in-memory dictionary of inode numbers got from the +    list of GFIDs +    """ +    with open(output_file, "a+") as fout: +        inode_dict = {} +        with open(gfids_file) as f: +            for gfid in f: +                gfid = gfid.strip() +                backend_path = os.path.join(brick, ".glusterfs", +                                            gfid[0:2], gfid[2:4], gfid) + +                try: +                    inode_dict[str(os.stat(backend_path).st_ino)] = 1 +                except (IOError, OSError) as e: +                    if e.errno == ENOENT: +                        continue +                    else: +                        fail("%s Failed to convert to path from " +                             "GFID %s: %s" % (brick, gfid, e), logger=logger) + +        if not inode_dict: +            return + +        def inode_filter(path): +            try: +                st = os.lstat(path) +            except (OSError, IOError) as e: +                if e.errno == ENOENT: +                    st = None +                else: +                    raise + +            if st and inode_dict.get(str(st.st_ino), None): +                return True + +            return False + +        brick_path_len = len(brick) + +        def output_callback(path): +            path = path.strip() +            path = path[brick_path_len+1:] +            output_write(fout, path, args.output_prefix) + +        # Length of brick path, to remove from output path +        find(brick, callback_func=output_callback, +             filter_func=inode_filter, +             ignore_dirs=[".glusterfs"]) + +        fout.flush() +        os.fsync(fout.fileno()) + + +def gfid_to_path_using_pgfid(brick, gfids_file, output_file, outfile_failures): +    """ +    Parent GFID is saved as xattr, collect Parent GFIDs from all +    the files from gfids_file. Convert parent GFID to path and Crawl +    each directories to get the list of files/dirs having same inode number. 
+    Do find with maxdepth as 1 and print the output in <INODE_NUM> <PATH> +    format, use this output to look into in memory dictionary of inode +    numbers got from the list of GFIDs +    """ +    with open(output_file, "a+") as fout: +        pgfids = set() +        inode_dict = {} +        with open(gfids_file) as f: +            for gfid in f: +                gfid = gfid.strip() +                p = os.path.join(brick, +                                 ".glusterfs", +                                 gfid[0:2], +                                 gfid[2:4], +                                 gfid) +                if os.path.islink(p): +                    path = symlink_gfid_to_path(brick, gfid) +                    output_write(fout, path, args.output_prefix) +                else: +                    try: +                        inode_dict[str(os.stat(p).st_ino)] = 1 +                        file_xattrs = xattr.list(p) +                        num_parent_gfid = 0 +                        for x in file_xattrs: +                            if x.startswith("trusted.pgfid."): +                                num_parent_gfid += 1 +                                pgfids.add(x.split(".")[-1]) + +                        if num_parent_gfid == 0: +                            with open(outfile_failures, "a") as f: +                                f.write("%s\n" % gfid) +                                f.flush() +                                os.fsync(f.fileno()) + +                    except (IOError, OSError) as e: +                        if e.errno == ENOENT: +                            continue +                        else: +                            fail("%s Failed to convert to path from " +                                 "GFID %s: %s" % (brick, gfid, e), +                                 logger=logger) + +        if not inode_dict: +            return + +        def inode_filter(path): +            try: +                st = os.lstat(path) +            except (OSError, IOError) as e: +                if e.errno == ENOENT: +                    st = None +                else: +                    raise + +            if st and inode_dict.get(str(st.st_ino), None): +                return True + +            return False + +        # Length of brick path, to remove from output path +        brick_path_len = len(brick) + +        def output_callback(path): +            path = path.strip() +            path = path[brick_path_len+1:] +            output_write(fout, path, args.output_prefix) + +        for pgfid in pgfids: +            path = symlink_gfid_to_path(brick, pgfid) +            find(os.path.join(brick, path), +                 callback_func=output_callback, +                 filter_func=inode_filter, +                 ignore_dirs=[".glusterfs"], +                 subdirs_crawl=False) + +        fout.flush() +        os.fsync(fout.fileno()) + + +def sort_unique(filename): +    execute(["sort", "-u", "-o", filename, filename], +            exit_msg="Sort failed", logger=logger) + + +def get_changes(brick, hash_dir, log_file, end, args): +    """ +    Makes use of libgfchangelog's history API to get changelogs +    containing changes from start and end time. Further collects +    the modified gfids from the changelogs and writes the list +    of gfid to 'gfid_list' file. 
+    """ +    try: +        libgfchangelog.cl_register(brick, hash_dir, log_file, +                                   CHANGELOG_LOG_LEVEL, CHANGELOG_CONN_RETRIES) +    except libgfchangelog.ChangelogException as e: +        fail("%s Changelog register failed: %s" % (brick, e), logger=logger) + +    # Output files to record GFIDs and GFID to Path failure GFIDs +    gfid_list_path = args.outfile + ".gfids" +    gfid_list_failures_file = gfid_list_path + ".failures" +    create_file(gfid_list_path, exit_on_err=True, logger=logger) +    create_file(gfid_list_failures_file, exit_on_err=True, logger=logger) + +    # Changelogs path(Hard coded to BRICK/.glusterfs/changelogs +    cl_path = os.path.join(brick, ".glusterfs/changelogs") + +    # Fail if History fails for requested Start and End +    try: +        actual_end = libgfchangelog.cl_history_changelog( +            cl_path, args.start, end, CHANGELOGAPI_NUM_WORKERS) +    except libgfchangelog.ChangelogException as e: +        fail("%s Historical Changelogs not available: %s" % (brick, e), +             logger=logger) + +    try: +        # scan followed by getchanges till scan returns zero. +        # history_scan() is blocking call, till it gets the number +        # of changelogs to process. Returns zero when no changelogs +        # to be processed. returns positive value as number of changelogs +        # to be processed, which will be fetched using +        # history_getchanges() +        changes = [] +        while libgfchangelog.cl_history_scan() > 0: +            changes += libgfchangelog.cl_history_getchanges() + +            if changes: +                with open(gfid_list_path, 'a+') as fgfid: +                    for change in changes: +                        with open(change) as f: +                            for line in f: +                                # Space delimited list, collect GFID +                                details = line.split() +                                fgfid.write("%s\n" % details[1]) + +                        libgfchangelog.cl_history_done(change) +                    fgfid.flush() +                    os.fsync(fgfid.fileno()) +    except libgfchangelog.ChangelogException as e: +        fail("%s Error during Changelog Crawl: %s" % (brick, e), +             logger=logger) + +    # If TS returned from history_changelog is < end time +    # then FS crawl may be required, since history is only available +    # till TS returned from history_changelog +    if actual_end < end: +        fail("Partial History available with Changelog", 2, logger=logger) + +    sort_unique(gfid_list_path) +    gfid_to_path_using_pgfid(brick, gfid_list_path, +                             args.outfile, gfid_list_failures_file) +    gfid_to_path_using_batchfind(brick, gfid_list_failures_file, args.outfile) + + +def changelog_crawl(brick, end, args): +    """ +    Init function, prepares working dir and calls Changelog query +    """ +    if brick.endswith("/"): +        brick = brick[0:len(brick)-1] + +    # WORKING_DIR/BRICKHASH/OUTFILE +    working_dir = os.path.dirname(args.outfile) +    brickhash = hashlib.sha1(brick) +    brickhash = str(brickhash.hexdigest()) +    working_dir = os.path.join(working_dir, brickhash) + +    mkdirp(working_dir, exit_on_err=True, logger=logger) +    create_file(args.outfile, exit_on_err=True, logger=logger) +    create_file(args.outfile + ".gfids", exit_on_err=True, logger=logger) + +    log_file = os.path.join(conf.get_opt("log_dir"), +                            args.session, +            
                args.volume, +                            "changelog.%s.log" % brickhash) + +    logger.info("%s Started Changelog Crawl. Start: %s, End: %s" +                % (brick, args.start, end)) +    get_changes(brick, working_dir, log_file, end, args) + + +def _get_args(): +    parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, +                            description=PROG_DESCRIPTION) + +    parser.add_argument("session", help="Session Name") +    parser.add_argument("volume", help="Volume Name") +    parser.add_argument("brick", help="Brick Name") +    parser.add_argument("outfile", help="Output File") +    parser.add_argument("start", help="Start Time", type=int) +    parser.add_argument("--debug", help="Debug", action="store_true") +    parser.add_argument("--output-prefix", help="File prefix in output", +                        default=".") + +    return parser.parse_args() + + +if __name__ == "__main__": +    args = _get_args() +    mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume), +           exit_on_err=True) +    log_file = os.path.join(conf.get_opt("log_dir"), +                            args.session, +                            args.volume, +                            "changelog.log") +    setup_logger(logger, log_file, args.debug) +    end = int(time.time()) - int(conf.get_opt("changelog_rollover_time")) +    changelog_crawl(args.brick, end, args) +    sys.exit(0) diff --git a/tools/glusterfind/src/conf.py b/tools/glusterfind/src/conf.py new file mode 100644 index 00000000000..2c6eac2bb14 --- /dev/null +++ b/tools/glusterfind/src/conf.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import os +import ConfigParser + +config = ConfigParser.ConfigParser() +config.read(os.path.join(os.path.dirname(os.path.abspath(__file__)), +                         "tool.conf")) + + +def list_change_detectors(): +    return dict(config.items("change_detectors")).keys() + + +def get_opt(opt): +    return config.get("vars", opt) + + +def get_change_detector(opt): +    return config.get("change_detectors", opt) diff --git a/tools/glusterfind/src/libgfchangelog.py b/tools/glusterfind/src/libgfchangelog.py new file mode 100644 index 00000000000..e54a16a4742 --- /dev/null +++ b/tools/glusterfind/src/libgfchangelog.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+ +import os +from ctypes import CDLL, get_errno, create_string_buffer, c_ulong, byref +from ctypes import RTLD_GLOBAL +from ctypes.util import find_library + + +class ChangelogException(OSError): +    pass + + +libgfc = CDLL(find_library("gfchangelog"), use_errno=True, mode=RTLD_GLOBAL) + + +def raise_oserr(): +    errn = get_errno() +    raise ChangelogException(errn, os.strerror(errn)) + + +def cl_register(brick, path, log_file, log_level, retries=0): +    ret = libgfc.gf_changelog_register(brick, path, log_file, +                                       log_level, retries) +    if ret == -1: +        raise_oserr() + + +def cl_history_scan(): +    ret = libgfc.gf_history_changelog_scan() +    if ret == -1: +        raise_oserr() + +    return ret + + +def cl_history_changelog(changelog_path, start, end, num_parallel): +    actual_end = c_ulong() +    ret = libgfc.gf_history_changelog(changelog_path, start, end, +                                      num_parallel, +                                      byref(actual_end)) +    if ret == -1: +        raise_oserr() + +    return actual_end.value + + +def cl_history_startfresh(): +    ret = libgfc.gf_history_changelog_start_fresh() +    if ret == -1: +        raise_oserr() + + +def cl_history_getchanges(): +    """ remove hardcoding for path name length """ +    def clsort(f): +        return f.split('.')[-1] + +    changes = [] +    buf = create_string_buffer('\0', 4096) + +    while True: +        ret = libgfc.gf_history_changelog_next_change(buf, 4096) +        if ret in (0, -1): +            break +        changes.append(buf.raw[:ret - 1]) +    if ret == -1: +        raise_oserr() + +    return sorted(changes, key=clsort) + + +def cl_history_done(clfile): +    ret = libgfc.gf_history_changelog_done(clfile) +    if ret == -1: +        raise_oserr() diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py new file mode 100644 index 00000000000..d6b9a24dec9 --- /dev/null +++ b/tools/glusterfind/src/main.py @@ -0,0 +1,468 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import sys +from errno import ENOENT +import time +from multiprocessing import Process +import os +import xml.etree.cElementTree as etree +from argparse import ArgumentParser, RawDescriptionHelpFormatter +import logging +import shutil + +from utils import execute, is_host_local, mkdirp, fail +from utils import setup_logger, human_time +import conf + + +PROG_DESCRIPTION = """ +GlusterFS Incremental API +""" +ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError + +logger = logging.getLogger() + + +def node_run(volume, host, path, start, outfile, args, fallback=False): +    """ +    If host is local node, execute the command locally. If not local +    execute the CHANGE_DETECTOR command via ssh and copy the output file from +    remote node using scp. 
+    """ +    localdir = is_host_local(host) + +    # If Full backup is requested or start time is zero, use brickfind +    change_detector = conf.get_change_detector(args.change_detector) +    if ((start == 0 or args.full) and args.change_detector == "changelog") or \ +       fallback: +        change_detector = conf.get_change_detector("brickfind") + +    # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug +    # --gfidpath <TYPE> +    cmd = [change_detector, +           args.session, +           volume, +           path, +           outfile, +           str(start), +           "--output-prefix", +           args.output_prefix] + \ +        (["--debug"] if args.debug else []) + \ +        (["--full"] if args.full else []) + +    if not localdir: +        # prefix with ssh command if not local node +        cmd = ["ssh", +               "-i", conf.get_opt("secret_pem"), +               "root@%s" % host] + cmd + +    rc, out, err = execute(cmd, logger=logger) +    if rc == 2: +        # Partial History Fallback +        logger.info("%s %s Fallback to brickfind" % (host, err.strip())) +        # Exit only from process, handled in main. +        sys.exit(rc) +    elif rc != 0: +        fail("%s - Change detection failed" % host, logger=logger) + +    if not localdir: +        cmd_copy = ["scp", +                    "-i", conf.get_opt("secret_pem"), +                    "root@%s:/%s" % (host, outfile), +                    os.path.dirname(outfile)] +        execute(cmd_copy, exit_msg="%s - Copy command failed" % host, +                logger=logger) + + +def node_cleanup(host, args): +    localdir = is_host_local(host) + +    # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug +    # --gfidpath <TYPE> +    cmd = [conf.get_opt("nodecleanup"), +           args.session, +           args.volume] + (["--debug"] if args.debug else []) + +    if not localdir: +        # prefix with ssh command if not local node +        cmd = ["ssh", +               "-i", conf.get_opt("secret_pem"), +               "root@%s" % host] + cmd + +    execute(cmd, exit_msg="%s - Cleanup failed" % host, logger=logger) + + +def cleanup(nodes, args): +    pool = [] +    for num, node in enumerate(nodes): +        host, brick = node[1].split(":") +        # temp output file +        node_outfile = os.path.join(conf.get_opt("working_dir"), +                                    args.session, +                                    args.volume, +                                    "tmp_output_%s.txt" % num) + +        try: +            os.remove(node_outfile) +        except (OSError, IOError): +            # TODO: Cleanup Failure, Handle +            pass + +        p = Process(target=node_cleanup, +                    args=(host, args)) +        p.start() +        pool.append(p) + +    exit_codes = 0 +    for p in pool: +        p.join() +        exit_codes += (0 if p.exitcode == 0 else 1) + +    if exit_codes != 0: +        sys.exit(1) + + +def failback_node_run(brick_details, idx, volume, start, outfile, args): +    host, brick = brick_details.split(":") +    p = Process(target=node_run, +                args=(volume, host, brick, start, outfile, args, True)) +    p.start() +    p.join() +    return p.exitcode + + +def run_in_nodes(volume, start, args): +    """ +    Get nodes of volume using gluster volume info, spawn a process +    each for a Node. Merge the output files once all the process +    complete their tasks. 
+    """ +    nodes = get_nodes(volume) +    pool = [] +    node_outfiles = [] +    for num, node in enumerate(nodes): +        host, brick = node[1].split(":") +        # temp output file +        node_outfile = os.path.join(conf.get_opt("working_dir"), +                                    args.session, +                                    volume, +                                    "tmp_output_%s.txt" % num) +        node_outfiles.append(node_outfile) +        p = Process(target=node_run, args=(volume, host, brick, start, +                                           node_outfile, args)) +        p.start() +        pool.append(p) + +    exit_codes = 0 +    for idx, p in enumerate(pool): +        p.join() +        # Handle the Changelog failure, fallback to Brickfind +        if p.exitcode == 2: +            rc = failback_node_run(nodes[idx][1], idx, volume, start, +                                   node_outfiles[idx], args) +            exit_codes += (0 if rc == 0 else 1) +        elif p.exitcode != 0: +            exit_codes += (0 if p.exitcode == 0 else 1) + +    if exit_codes != 0: +        sys.exit(1) + +    # Merge all output files +    cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile] +    execute(cmd, +            exit_msg="Failed to merge output files " +            "collected from nodes", logger=logger) + +    cleanup(nodes, args) + + +def get_nodes(volume): +    """ +    Get the gluster volume info xml output and parse to get +    the brick details. +    """ +    cmd = ["gluster", 'volume', 'info', volume, "--xml"] +    _, data, _ = execute(cmd, +                         exit_msg="Failed to Run Gluster Volume Info", +                         logger=logger) +    tree = etree.fromstring(data) + +    nodes = [] +    volume_el = tree.find('volInfo/volumes/volume') +    try: +        for b in volume_el.findall('bricks/brick'): +            nodes.append((b.find('hostUuid').text, +                          b.find('name').text)) +    except (ParseError, AttributeError, ValueError) as e: +        fail("Failed to parse Volume Info: %s" % e, logger=logger) + +    return nodes + + +def _get_args(): +    parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, +                            description=PROG_DESCRIPTION) +    subparsers = parser.add_subparsers(dest="mode") + +    # create <SESSION> <VOLUME> [--debug] [--force] +    parser_create = subparsers.add_parser('create') +    parser_create.add_argument("session", help="Session Name") +    parser_create.add_argument("volume", help="Volume Name") +    parser_create.add_argument("--debug", help="Debug", action="store_true") +    parser_create.add_argument("--force", help="Force option to recreate " +                               "the session", action="store_true") + +    # delete <SESSION> <VOLUME> [--debug] +    parser_delete = subparsers.add_parser('delete') +    parser_delete.add_argument("session", help="Session Name") +    parser_delete.add_argument("volume", help="Volume Name") +    parser_delete.add_argument("--debug", help="Debug", action="store_true") + +    # list [--session <SESSION>] [--volume <VOLUME>] +    parser_list = subparsers.add_parser('list') +    parser_list.add_argument("--session", help="Session Name", default="") +    parser_list.add_argument("--volume", help="Volume Name", default="") +    parser_list.add_argument("--debug", help="Debug", action="store_true") + +    # pre <SESSION> <VOLUME> <OUTFILE> [--change-detector <CHANGE_DETECTOR>] +    #     [--output-prefix <OUTPUT_PREFIX>] [--full] 
+    parser_pre = subparsers.add_parser('pre') +    parser_pre.add_argument("session", help="Session Name") +    parser_pre.add_argument("volume", help="Volume Name") +    parser_pre.add_argument("outfile", help="Output File") +    parser_pre.add_argument("--debug", help="Debug", action="store_true") +    parser_pre.add_argument("--full", help="Full find", action="store_true") +    parser_pre.add_argument("--change-detector", dest="change_detector", +                            help="Change detection", +                            choices=conf.list_change_detectors(), +                            type=str, default='changelog') +    parser_pre.add_argument("--output-prefix", help="File prefix in output", +                            default=".") + +    # post <SESSION> <VOLUME> +    parser_post = subparsers.add_parser('post') +    parser_post.add_argument("session", help="Session Name") +    parser_post.add_argument("volume", help="Volume Name") +    parser_post.add_argument("--debug", help="Debug", action="store_true") + +    return parser.parse_args() + + +def ssh_setup(): +    if not os.path.exists(conf.get_opt("secret_pem")): +        # Generate ssh-key +        cmd = ["ssh-keygen", +               "-N", +               "", +               "-f", +               conf.get_opt("secret_pem")] +        execute(cmd, +                exit_msg="Unable to generate ssh key %s" +                % conf.get_opt("secret_pem"), +                logger=logger) + +        logger.info("Ssh key generated %s" % conf.get_opt("secret_pem")) + +    # Copy pub file to all nodes +    cmd = ["gluster", +           "system::", +           "copy", +           "file", +           "/" + os.path.basename(conf.get_opt("secret_pem")) + ".pub"] +    execute(cmd, exit_msg="Failed to distribute ssh keys", logger=logger) + +    logger.info("Distributed ssh key to all nodes of Volume") + +    # Add to authorized_keys file in each node +    cmd = ["gluster", +           "system::", +           "execute", +           "add_secret_pub", +           "root", +           os.path.basename(conf.get_opt("secret_pem")) + ".pub"] +    execute(cmd, +            exit_msg="Failed to add ssh keys to authorized_keys file", +            logger=logger) + +    logger.info("Ssh key added to authorized_keys of Volume nodes") + + +def mode_create(session_dir, args): +    logger.debug("Init is called - Session: %s, Volume: %s" +                 % (args.session, args.volume)) + +    mkdirp(session_dir, exit_on_err=True, logger=logger) +    mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, +           logger=logger) +    status_file = os.path.join(session_dir, args.volume, "status") + +    if os.path.exists(status_file) and not args.force: +        fail("Session %s already created" % args.session, logger=logger) + +    if not os.path.exists(status_file) or args.force: +        ssh_setup() + +        execute(["gluster", "volume", "set", +                 args.volume, "build-pgfid", "on"], +                exit_msg="Failed to set volume option build-pgfid on", +                logger=logger) +        logger.info("Volume option set %s, build-pgfid on" % args.volume) + +        execute(["gluster", "volume", "set", +                 args.volume, "changelog.changelog", "on"], +                exit_msg="Failed to set volume option " +                "changelog.changelog on", logger=logger) +        logger.info("Volume option set %s, changelog.changelog on" +                    % args.volume) + +    if not os.path.exists(status_file): +       
 with open(status_file, "w", buffering=0) as f: +            # Add Rollover time to current time to make sure changelogs +            # will be available if we use this time as start time +            time_to_update = int(time.time()) + int( +                conf.get_opt("changelog_rollover_time")) +            f.write(str(time_to_update)) + +    sys.exit(0) + + +def mode_pre(session_dir, args): +    """ +    Read from Session file and write to session.pre file +    """ +    endtime_to_update = int(time.time()) - int( +        conf.get_opt("changelog_rollover_time")) +    status_file = os.path.join(session_dir, args.volume, "status") +    status_file_pre = status_file + ".pre" + +    mkdirp(os.path.dirname(args.outfile), exit_on_err=True, logger=logger) + +    start = 0 +    try: +        with open(status_file) as f: +            start = int(f.read().strip()) +    except ValueError: +        pass +    except (OSError, IOError) as e: +        fail("Error Opening Session file %s: %s" +             % (status_file, e), logger=logger) + +    logger.debug("Pre is called - Session: %s, Volume: %s, " +                 "Start time: %s, End time: %s" +                 % (args.session, args.volume, start, endtime_to_update)) + +    run_in_nodes(args.volume, start, args) + +    with open(status_file_pre, "w", buffering=0) as f: +        f.write(str(endtime_to_update)) + +    sys.stdout.write("Generated output file %s\n" % args.outfile) + + +def mode_post(session_dir, args): +    """ +    If pre session file exists, overwrite session file +    If pre session file does not exists, return ERROR +    """ +    status_file = os.path.join(session_dir, args.volume, "status") +    logger.debug("Post is called - Session: %s, Volume: %s" +                 % (args.session, args.volume)) +    status_file_pre = status_file + ".pre" + +    if os.path.exists(status_file_pre): +        os.rename(status_file_pre, status_file) +        sys.exit(0) +    else: +        fail("Pre script is not run", logger=logger) + + +def mode_delete(session_dir, args): +    def handle_rm_error(func, path, exc_info): +        if exc_info[1].errno == ENOENT: +            return + +        raise exc_info[1] + +    shutil.rmtree(os.path.join(session_dir, args.volume), +                  onerror=handle_rm_error) + + +def mode_list(session_dir, args): +    """ +    List available sessions to stdout, if session name is set +    only list that session. 
+    """ +    if args.session: +        if not os.path.exists(os.path.join(session_dir, args.session)): +            fail("Invalid Session", logger=logger) +        sessions = [args.session] +    else: +        sessions = [] +        for d in os.listdir(session_dir): +            sessions.append(d) + +    output = [] +    for session in sessions: +        # Session Volume Last Processed +        volnames = os.listdir(os.path.join(session_dir, session)) + +        for volname in volnames: +            if args.volume and args.volume != volname: +                continue + +            status_file = os.path.join(session_dir, session, volname, "status") +            last_processed = None +            try: +                with open(status_file) as f: +                    last_processed = f.read().strip() +            except (OSError, IOError) as e: +                if e.errno == ENOENT: +                    pass +                else: +                    raise +            output.append((session, volname, last_processed)) + +    if output: +        sys.stdout.write("%s %s %s\n" % ("SESSION".ljust(25), +                                         "VOLUME".ljust(25), +                                         "SESSION TIME".ljust(25))) +        sys.stdout.write("-"*75) +        sys.stdout.write("\n") +    for session, volname, last_processed in output: +        sys.stdout.write("%s %s %s\n" % (session.ljust(25), +                                         volname.ljust(25), +                                         human_time(last_processed).ljust(25))) + + +def main(): +    args = _get_args() +    mkdirp(conf.get_opt("session_dir"), exit_on_err=True) + +    if args.mode == "list": +        session_dir = conf.get_opt("session_dir") +    else: +        session_dir = os.path.join(conf.get_opt("session_dir"), +                                   args.session) + +    if not os.path.exists(session_dir) and args.mode not in ["create", "list"]: +        fail("Invalid session %s" % args.session) + +    mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume), +           exit_on_err=True) +    log_file = os.path.join(conf.get_opt("log_dir"), +                            args.session, +                            args.volume, +                            "cli.log") +    setup_logger(logger, log_file, args.debug) + +    # globals() will have all the functions already defined. +    # mode_<args.mode> will be the function name to be called +    globals()["mode_" + args.mode](session_dir, args) diff --git a/tools/glusterfind/src/nodecleanup.py b/tools/glusterfind/src/nodecleanup.py new file mode 100644 index 00000000000..a31d4d83acd --- /dev/null +++ b/tools/glusterfind/src/nodecleanup.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+ +import shutil +import sys +import os +import logging +from errno import ENOENT + +from utils import setup_logger, mkdirp +import conf + +logger = logging.getLogger() + + +if __name__ == "__main__": +    # Args: <SESSION> <VOLUME> +    session = sys.argv[1] +    volume = sys.argv[2] + +    working_dir = os.path.join(conf.get_opt("working_dir"), +                               session, +                               volume) + +    mkdirp(os.path.join(conf.get_opt("log_dir"), session, volume), +           exit_on_err=True) +    log_file = os.path.join(conf.get_opt("log_dir"), +                            session, +                            volume, +                            "changelog.log") + +    setup_logger(logger, log_file) + +    try: +        def handle_rm_error(func, path, exc_info): +            if exc_info[1].errno == ENOENT: +                return + +            raise exc_info[1] + +        shutil.rmtree(working_dir, onerror=handle_rm_error) +    except (OSError, IOError) as e: +        logger.error("Failed to delete working directory: %s" % e) +        sys.exit(1) diff --git a/tools/glusterfind/src/tool.conf.in b/tools/glusterfind/src/tool.conf.in new file mode 100644 index 00000000000..bae46499aa0 --- /dev/null +++ b/tools/glusterfind/src/tool.conf.in @@ -0,0 +1,11 @@ +[vars] +session_dir=@GLUSTERD_WORKDIR@/glusterfind/ +secret_pem=@GLUSTERD_WORKDIR@/glusterfind.secret.pem +working_dir=@GLUSTERFSD_MISCDIR@/glusterfind/ +log_dir=/var/log/glusterfs/glusterfind/ +nodecleanup=@GLUSTERFS_LIBEXECDIR@/glusterfind/nodecleanup.py +changelog_rollover_time=15 + +[change_detectors] +changelog=@GLUSTERFS_LIBEXECDIR@/glusterfind/changelog.py +brickfind=@GLUSTERFS_LIBEXECDIR@/glusterfind/brickfind.py
\ No newline at end of file diff --git a/tools/glusterfind/src/utils.py b/tools/glusterfind/src/utils.py new file mode 100644 index 00000000000..c503a2b9f58 --- /dev/null +++ b/tools/glusterfind/src/utils.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import sys +import socket +from subprocess import PIPE, Popen +from errno import EPERM, EEXIST +import logging +import os +from datetime import datetime + +ROOT_GFID = "00000000-0000-0000-0000-000000000001" + + +def find(path, callback_func=lambda x: True, filter_func=lambda x: True, +         ignore_dirs=[], subdirs_crawl=True): +    if os.path.basename(path) in ignore_dirs: +        return + +    if filter_func(path): +        callback_func(path) + +    for p in os.listdir(path): +        full_path = os.path.join(path, p) + +        if os.path.isdir(full_path): +            if subdirs_crawl: +                find(full_path, callback_func, filter_func, ignore_dirs) +            else: +                if filter_func(full_path): +                    callback_func(full_path) +        else: +            if filter_func(full_path): +                callback_func(full_path) + + +def output_write(f, path, prefix="."): +    if path == "": +        return + +    if prefix != ".": +        path = os.path.join(prefix, path) +    f.write("%s\n" % path) + + +def human_time(ts): +    return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S") + + +def setup_logger(logger, path, debug=False): +    if debug: +        logger.setLevel(logging.DEBUG) +    else: +        logger.setLevel(logging.INFO) + +    # create the logging file handler +    fh = logging.FileHandler(path) + +    formatter = logging.Formatter("[%(asctime)s] %(levelname)s " +                                  "[%(module)s - %(lineno)s:%(funcName)s] " +                                  "- %(message)s") + +    fh.setFormatter(formatter) + +    # add handler to logger object +    logger.addHandler(fh) + + +def create_file(path, exit_on_err=False, logger=None): +    """ +    If file exists overwrite. Print error to stderr and exit +    if exit_on_err is set, else raise the exception. Consumer +    should handle the exception. +    """ +    try: +        open(path, 'w').close() +    except (OSError, IOError) as e: +        if exit_on_err: +            fail("Failed to create file %s: %s" % (path, e), logger=logger) +        else: +            raise + + +def mkdirp(path, exit_on_err=False, logger=None): +    """ +    Try creating required directory structure +    ignore EEXIST and raise exception for rest of the errors. +    Print error in stderr and exit if exit_on_err is set, else +    raise exception. 
+    """ +    try: +        os.makedirs(path) +    except (OSError, IOError) as e: +        if e.errno == EEXIST and os.path.isdir(path): +            pass +        else: +            if exit_on_err: +                fail("Fail to create dir %s: %s" % (path, e), logger=logger) +            else: +                raise + + +def fail(msg, code=1, logger=None): +    """ +    Write error to stderr and exit +    """ +    if logger: +        logger.error(msg) +    sys.stderr.write("%s\n" % msg) +    sys.exit(code) + + +def execute(cmd, exit_msg=None, logger=None): +    """ +    If failure_msg is not None then return returncode, out and error. +    If failure msg is set, write to stderr and exit. +    """ +    p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True) + +    (out, err) = p.communicate() +    if p.returncode != 0 and exit_msg is not None: +        fail("%s: %s" % (exit_msg, err), p.returncode, logger=logger) + +    return (p.returncode, out, err) + + +def symlink_gfid_to_path(brick, gfid): +    """ +    Each directories are symlinked to file named GFID +    in .glusterfs directory of brick backend. Using readlink +    we get PARGFID/basename of dir. readlink recursively till +    we get PARGFID as ROOT_GFID. +    """ +    if gfid == ROOT_GFID: +        return "" + +    out_path = "" +    while True: +        path = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid) +        path_readlink = os.readlink(path) +        pgfid = os.path.dirname(path_readlink) +        out_path = os.path.join(os.path.basename(path_readlink), out_path) +        if pgfid == "../../00/00/%s" % ROOT_GFID: +            break +        gfid = os.path.basename(pgfid) +    return out_path + + +def is_host_local(host): +    """ +    Find if a host is local or not. +    Code copied from $GLUSTERFS/geo-replication/syncdaemon/syncdutils.py +    """ +    locaddr = False +    for ai in socket.getaddrinfo(host, None): +        # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators +        # /mgmt/glusterd/src/glusterd-utils.c#L125 +        if ai[0] == socket.AF_INET: +            if ai[-1][0].split(".")[0] == "127": +                locaddr = True +                break +        elif ai[0] == socket.AF_INET6: +            if ai[-1][0] == "::1": +                locaddr = True +                break +        else: +            continue +        try: +            # use ICMP socket to avoid net.ipv4.ip_nonlocal_bind issue, +            # cf. https://bugzilla.redhat.com/show_bug.cgi?id=890587 +            s = socket.socket(ai[0], socket.SOCK_RAW, socket.IPPROTO_ICMP) +        except socket.error: +            ex = sys.exc_info()[1] +            if ex.errno != EPERM: +                raise +            f = None +            try: +                f = open("/proc/sys/net/ipv4/ip_nonlocal_bind") +                if int(f.read()) != 0: +                    logger.warning("non-local bind is set and not " +                                   "allowed to create " +                                   "raw sockets, cannot determine " +                                   "if %s is local" % host) +                    return False +                s = socket.socket(ai[0], socket.SOCK_DGRAM) +            finally: +                if f: +                    f.close() +        try: +            s.bind(ai[-1]) +            locaddr = True +            break +        except: +            pass +        s.close() +    return locaddr  | 
