Diffstat (limited to 'tools')
-rw-r--r--  tools/Makefile.am                                     3
-rw-r--r--  tools/gfind_missing_files/Makefile.am                32
-rw-r--r--  tools/gfind_missing_files/gcrawler.c                581
-rw-r--r--  tools/gfind_missing_files/gfid_to_path.py           162
-rw-r--r--  tools/gfind_missing_files/gfid_to_path.sh            43
-rw-r--r--  tools/gfind_missing_files/gfind_missing_files.sh    119
-rw-r--r--  tools/glusterfind/Makefile.am                        24
-rwxr-xr-x  tools/glusterfind/S57glusterfind-delete-post.py      69
-rw-r--r--  tools/glusterfind/glusterfind.in                     18
-rw-r--r--  tools/glusterfind/src/Makefile.am                    16
-rw-r--r--  tools/glusterfind/src/__init__.py                     9
-rw-r--r--  tools/glusterfind/src/brickfind.py                  118
-rw-r--r--  tools/glusterfind/src/changelog.py                  469
-rw-r--r--  tools/glusterfind/src/changelogdata.py              440
-rw-r--r--  tools/glusterfind/src/conf.py                        31
-rw-r--r--  tools/glusterfind/src/gfind_py2py3.py                88
-rw-r--r--  tools/glusterfind/src/libgfchangelog.py              92
-rw-r--r--  tools/glusterfind/src/main.py                       921
-rw-r--r--  tools/glusterfind/src/nodeagent.py                  139
-rw-r--r--  tools/glusterfind/src/tool.conf.in                   10
-rw-r--r--  tools/glusterfind/src/utils.py                      267
-rw-r--r--  tools/setgfid2path/Makefile.am                        5
-rw-r--r--  tools/setgfid2path/gluster-setgfid2path.8            54
-rw-r--r--  tools/setgfid2path/src/Makefile.am                   16
-rw-r--r--  tools/setgfid2path/src/main.c                       130
25 files changed, 3856 insertions, 0 deletions
diff --git a/tools/Makefile.am b/tools/Makefile.am
new file mode 100644
index 00000000000..5808a3728cd
--- /dev/null
+++ b/tools/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = gfind_missing_files glusterfind setgfid2path
+
+CLEANFILES =
diff --git a/tools/gfind_missing_files/Makefile.am b/tools/gfind_missing_files/Makefile.am
new file mode 100644
index 00000000000..181fe7091f3
--- /dev/null
+++ b/tools/gfind_missing_files/Makefile.am
@@ -0,0 +1,32 @@
+gfindmissingfilesdir = $(GLUSTERFS_LIBEXECDIR)/gfind_missing_files
+
+if WITH_SERVER
+gfindmissingfiles_SCRIPTS = gfind_missing_files.sh gfid_to_path.sh \
+ gfid_to_path.py
+endif
+
+EXTRA_DIST = gfind_missing_files.sh gfid_to_path.sh \
+ gfid_to_path.py
+
+if WITH_SERVER
+gfindmissingfiles_PROGRAMS = gcrawler
+endif
+
+gcrawler_SOURCES = gcrawler.c
+gcrawler_LDADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+gcrawler_LDFLAGS = $(GF_LDFLAGS)
+
+AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src
+
+AM_CFLAGS = -Wall $(GF_CFLAGS)
+
+if WITH_SERVER
+uninstall-local:
+ rm -f $(DESTDIR)$(sbindir)/gfind_missing_files
+
+install-data-local:
+ rm -f $(DESTDIR)$(sbindir)/gfind_missing_files
+ ln -s $(GLUSTERFS_LIBEXECDIR)/gfind_missing_files/gfind_missing_files.sh $(DESTDIR)$(sbindir)/gfind_missing_files
+endif
+
+CLEANFILES =
diff --git a/tools/gfind_missing_files/gcrawler.c b/tools/gfind_missing_files/gcrawler.c
new file mode 100644
index 00000000000..4acbe92bc8f
--- /dev/null
+++ b/tools/gfind_missing_files/gcrawler.c
@@ -0,0 +1,581 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <stdio.h>
+#include <errno.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <dirent.h>
+#include <assert.h>
+#include <glusterfs/locking.h>
+
+#include <glusterfs/compat.h>
+#include <glusterfs/list.h>
+#include <glusterfs/syscall.h>
+
+#define THREAD_MAX 32
+#define BUMP(name) INC(name, 1)
+#define DEFAULT_WORKERS 4
+
+#define NEW(x) \
+ { \
+ x = calloc(1, sizeof(typeof(*x))); \
+ }
+
+#define err(x...) fprintf(stderr, x)
+#define out(x...) fprintf(stdout, x)
+#define dbg(x...) \
+ do { \
+ if (debug) \
+ fprintf(stdout, x); \
+ } while (0)
+#define tout(x...) \
+ do { \
+ out("[%ld] ", pthread_self()); \
+ out(x); \
+ } while (0)
+#define terr(x...) \
+ do { \
+ err("[%ld] ", pthread_self()); \
+ err(x); \
+ } while (0)
+#define tdbg(x...) \
+ do { \
+ dbg("[%ld] ", pthread_self()); \
+ dbg(x); \
+ } while (0)
+
+int debug = 0;
+const char *slavemnt = NULL;
+int workers = 0;
+
+struct stats {
+ unsigned long long int cnt_skipped_gfids;
+};
+
+pthread_spinlock_t stats_lock;
+
+struct stats stats_total;
+int stats = 0;
+
+#define INC(name, val) \
+ do { \
+ if (!stats) \
+ break; \
+ pthread_spin_lock(&stats_lock); \
+ { \
+ stats_total.cnt_##name += val; \
+ } \
+ pthread_spin_unlock(&stats_lock); \
+ } while (0)
+
+void
+stats_dump()
+{
+ if (!stats)
+ return;
+
+ out("-------------------------------------------\n");
+ out("Skipped_Files : %10lld\n", stats_total.cnt_skipped_gfids);
+ out("-------------------------------------------\n");
+}
+
+struct dirjob {
+ struct list_head list;
+
+ char *dirname;
+
+ struct dirjob *parent;
+ int ret; /* final status of this subtree */
+ int refcnt; /* how many dirjobs have this as parent */
+
+ pthread_spinlock_t lock;
+};
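+
+/* Reference-count protocol: dirjob_new() returns a job with refcnt 1 and,
+ * when a parent is given, takes an extra ref on the parent. dirjob_ret()
+ * drops one ref; at zero it folds the job's status into its parent and
+ * frees the job, so failures propagate up the subtree to the root job.
+ */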
+
+struct xwork {
+ pthread_t cthreads[THREAD_MAX]; /* crawler threads */
+ int count;
+ int idle;
+ int stop;
+
+ struct dirjob crawl;
+
+ struct dirjob *rootjob; /* to verify completion in xwork_fini() */
+
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+};
+
+struct dirjob *
+dirjob_ref(struct dirjob *job)
+{
+ pthread_spin_lock(&job->lock);
+ {
+ job->refcnt++;
+ }
+ pthread_spin_unlock(&job->lock);
+
+ return job;
+}
+
+void
+dirjob_free(struct dirjob *job)
+{
+ assert(list_empty(&job->list));
+
+ pthread_spin_destroy(&job->lock);
+ free(job->dirname);
+ free(job);
+}
+
+void
+dirjob_ret(struct dirjob *job, int err)
+{
+ int ret = 0;
+ int refcnt = 0;
+ struct dirjob *parent = NULL;
+
+ pthread_spin_lock(&job->lock);
+ {
+ refcnt = --job->refcnt;
+ job->ret = (job->ret || err);
+ }
+ pthread_spin_unlock(&job->lock);
+
+ if (refcnt == 0) {
+ ret = job->ret;
+
+ if (ret)
+ terr("Failed: %s (%d)\n", job->dirname, ret);
+ else
+ tdbg("Finished: %s\n", job->dirname);
+
+ parent = job->parent;
+ if (parent)
+ dirjob_ret(parent, ret);
+
+ dirjob_free(job);
+ job = NULL;
+ }
+}
+
+struct dirjob *
+dirjob_new(const char *dir, struct dirjob *parent)
+{
+ struct dirjob *job = NULL;
+
+ NEW(job);
+ if (!job)
+ return NULL;
+
+ job->dirname = strdup(dir);
+ if (!job->dirname) {
+ free(job);
+ return NULL;
+ }
+
+ INIT_LIST_HEAD(&job->list);
+ pthread_spin_init(&job->lock, PTHREAD_PROCESS_PRIVATE);
+ job->ret = 0;
+
+ if (parent)
+ job->parent = dirjob_ref(parent);
+
+ job->refcnt = 1;
+
+ return job;
+}
+
+void
+xwork_addcrawl(struct xwork *xwork, struct dirjob *job)
+{
+ pthread_mutex_lock(&xwork->mutex);
+ {
+ list_add_tail(&job->list, &xwork->crawl.list);
+ pthread_cond_broadcast(&xwork->cond);
+ }
+ pthread_mutex_unlock(&xwork->mutex);
+}
+
+int
+xwork_add(struct xwork *xwork, const char *dir, struct dirjob *parent)
+{
+ struct dirjob *job = NULL;
+
+ job = dirjob_new(dir, parent);
+ if (!job)
+ return -1;
+
+ xwork_addcrawl(xwork, job);
+
+ return 0;
+}
+
+struct dirjob *
+xwork_pick(struct xwork *xwork, int block)
+{
+ struct dirjob *job = NULL;
+ struct list_head *head = NULL;
+
+ head = &xwork->crawl.list;
+
+ pthread_mutex_lock(&xwork->mutex);
+ {
+ for (;;) {
+ if (xwork->stop)
+ break;
+
+ if (!list_empty(head)) {
+ job = list_entry(head->next, typeof(*job), list);
+ list_del_init(&job->list);
+ break;
+ }
+
+ if (((xwork->count * 2) == xwork->idle) &&
+ list_empty(&xwork->crawl.list)) {
+ /* no outstanding jobs, and no
+ active workers
+ */
+ tdbg("Jobless. Terminating\n");
+ xwork->stop = 1;
+ pthread_cond_broadcast(&xwork->cond);
+ break;
+ }
+
+ if (!block)
+ break;
+
+ xwork->idle++;
+ pthread_cond_wait(&xwork->cond, &xwork->mutex);
+ xwork->idle--;
+ }
+ }
+ pthread_mutex_unlock(&xwork->mutex);
+
+ return job;
+}
+
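+/* Skip "." and "..", plus brick-internal housekeeping directories kept
+ * under .glusterfs (changelogs, health_check, indices, landfill); they
+ * never contain user files.
+ */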
+int
+skip_name(const char *dirname, const char *name)
+{
+ if (strcmp(name, ".") == 0)
+ return 1;
+
+ if (strcmp(name, "..") == 0)
+ return 1;
+
+ if (strcmp(name, "changelogs") == 0)
+ return 1;
+
+ if (strcmp(name, "health_check") == 0)
+ return 1;
+
+ if (strcmp(name, "indices") == 0)
+ return 1;
+
+ if (strcmp(name, "landfill") == 0)
+ return 1;
+
+ return 0;
+}
+
+int
+skip_stat(struct dirjob *job, const char *name)
+{
+ if (job == NULL)
+ return 0;
+
+ if (strcmp(job->dirname, ".glusterfs") == 0) {
+ tdbg(
+ "Directly adding directories under .glusterfs "
+ "to global list: %s\n",
+ name);
+ return 1;
+ }
+
+ if (job->parent != NULL) {
+ if (strcmp(job->parent->dirname, ".glusterfs") == 0) {
+ tdbg(
+ "Directly adding directories under .glusterfs/XX "
+ "to global list: %s\n",
+ name);
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+int
+xworker_do_crawl(struct xwork *xwork, struct dirjob *job)
+{
+ DIR *dirp = NULL;
+ int ret = -1;
+ int boff;
+ int plen;
+ char *path = NULL;
+ struct dirjob *cjob = NULL;
+ struct stat statbuf = {
+ 0,
+ };
+ struct dirent *entry;
+ struct dirent scratch[2] = {
+ {
+ 0,
+ },
+ };
+ char gfid_path[PATH_MAX] = {
+ 0,
+ };
+
+ plen = strlen(job->dirname) + 256 + 2;
+ path = alloca(plen);
+
+ tdbg("Entering: %s\n", job->dirname);
+
+ dirp = sys_opendir(job->dirname);
+ if (!dirp) {
+ terr("opendir failed on %s (%s)\n", job->dirname, strerror(errno));
+ goto out;
+ }
+
+ boff = sprintf(path, "%s/", job->dirname);
+
+ for (;;) {
+ errno = 0;
+ entry = sys_readdir(dirp, scratch);
+ if (!entry || errno != 0) {
+ if (errno != 0) {
+ err("readdir(%s): %s\n", job->dirname, strerror(errno));
+ ret = errno;
+ goto out;
+ }
+ break;
+ }
+
+ if (entry->d_ino == 0)
+ continue;
+
+ if (skip_name(job->dirname, entry->d_name))
+ continue;
+
+        /* Children and grandchildren of .glusterfs are guaranteed to be
+         * directories; just add them to the global queue.
+         */
+ if (skip_stat(job, entry->d_name)) {
+ strncpy(path + boff, entry->d_name, (plen - boff));
+ cjob = dirjob_new(path, job);
+ if (!cjob) {
+ err("dirjob_new(%s): %s\n", path, strerror(errno));
+ ret = -1;
+ goto out;
+ }
+ xwork_addcrawl(xwork, cjob);
+ continue;
+ }
+
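+        /* Existence check on the slave through its aux-gfid mount:
+         * <slave-mnt>/.gfid/<GFID> resolves a file by GFID, so ENOENT
+         * means the file is missing on the slave and its GFID is
+         * reported.
+         */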
+ (void)snprintf(gfid_path, sizeof(gfid_path), "%s/.gfid/%s", slavemnt,
+ entry->d_name);
+ ret = sys_lstat(gfid_path, &statbuf);
+
+ if (ret && errno == ENOENT) {
+ out("%s\n", entry->d_name);
+ BUMP(skipped_gfids);
+ }
+
+ if (ret && errno != ENOENT) {
+ err("stat on slave failed(%s): %s\n", gfid_path, strerror(errno));
+ goto out;
+ }
+ }
+
+ ret = 0;
+out:
+ if (dirp)
+ (void)sys_closedir(dirp);
+
+ return ret;
+}
+
+void *
+xworker_crawl(void *data)
+{
+ struct xwork *xwork = data;
+ struct dirjob *job = NULL;
+ int ret = -1;
+
+ while ((job = xwork_pick(xwork, 0))) {
+ ret = xworker_do_crawl(xwork, job);
+ dirjob_ret(job, ret);
+ }
+
+ return NULL;
+}
+
+int
+xwork_fini(struct xwork *xwork, int stop)
+{
+ int i = 0;
+ int ret = 0;
+ void *tret = 0;
+
+ pthread_mutex_lock(&xwork->mutex);
+ {
+ xwork->stop = (xwork->stop || stop);
+ pthread_cond_broadcast(&xwork->cond);
+ }
+ pthread_mutex_unlock(&xwork->mutex);
+
+ for (i = 0; i < xwork->count; i++) {
+ pthread_join(xwork->cthreads[i], &tret);
+ tdbg("CThread id %ld returned %p\n", xwork->cthreads[i], tret);
+ }
+
+ if (debug) {
+ assert(xwork->rootjob->refcnt == 1);
+ dirjob_ret(xwork->rootjob, 0);
+ }
+
+ if (stats)
+ pthread_spin_destroy(&stats_lock);
+
+ return ret;
+}
+
+int
+xwork_init(struct xwork *xwork, int count)
+{
+ int i = 0;
+ int ret = 0;
+ struct dirjob *rootjob = NULL;
+
+ if (stats)
+ pthread_spin_init(&stats_lock, PTHREAD_PROCESS_PRIVATE);
+
+ pthread_mutex_init(&xwork->mutex, NULL);
+ pthread_cond_init(&xwork->cond, NULL);
+
+ INIT_LIST_HEAD(&xwork->crawl.list);
+
+ rootjob = dirjob_new(".glusterfs", NULL);
+ if (debug)
+ xwork->rootjob = dirjob_ref(rootjob);
+
+ xwork_addcrawl(xwork, rootjob);
+
+ xwork->count = count;
+ for (i = 0; i < count; i++) {
+ ret = pthread_create(&xwork->cthreads[i], NULL, xworker_crawl, xwork);
+ if (ret)
+ break;
+ tdbg("Spawned crawler %d thread %ld\n", i, xwork->cthreads[i]);
+ }
+
+ return ret;
+}
+
+int
+xfind(const char *basedir)
+{
+ struct xwork xwork;
+ int ret = 0;
+ char *cwd = NULL;
+
+ ret = chdir(basedir);
+ if (ret) {
+ err("%s: %s\n", basedir, strerror(errno));
+ return ret;
+ }
+
+ cwd = getcwd(0, 0);
+ if (!cwd) {
+ err("getcwd(): %s\n", strerror(errno));
+ return -1;
+ }
+
+ tdbg("Working directory: %s\n", cwd);
+ free(cwd);
+
+ memset(&xwork, 0, sizeof(xwork));
+
+ ret = xwork_init(&xwork, workers);
+ if (ret == 0)
+ xworker_crawl(&xwork);
+
+ ret = xwork_fini(&xwork, ret);
+ stats_dump();
+
+ return ret;
+}
+
+static char *
+parse_and_validate_args(int argc, char *argv[])
+{
+ char *basedir = NULL;
+ struct stat d = {
+ 0,
+ };
+ int ret = -1;
+#ifndef __FreeBSD__
+ unsigned char volume_id[16];
+#endif /* __FreeBSD__ */
+ char *slv_mnt = NULL;
+
+ if (argc != 4) {
+ err("Usage: %s <DIR> <SLAVE-VOL-MOUNT> <CRAWL-THREAD-COUNT>\n",
+ argv[0]);
+ return NULL;
+ }
+
+ basedir = argv[1];
+ ret = sys_lstat(basedir, &d);
+ if (ret) {
+ err("%s: %s\n", basedir, strerror(errno));
+ return NULL;
+ }
+
+#ifndef __FreeBSD__
+ ret = sys_lgetxattr(basedir, "trusted.glusterfs.volume-id", volume_id, 16);
+ if (ret != 16) {
+ err("%s:Not a valid brick path.\n", basedir);
+ return NULL;
+ }
+#endif /* __FreeBSD__ */
+
+ slv_mnt = argv[2];
+ ret = sys_lstat(slv_mnt, &d);
+ if (ret) {
+ err("%s: %s\n", slv_mnt, strerror(errno));
+ return NULL;
+ }
+ slavemnt = argv[2];
+
+ workers = atoi(argv[3]);
+ if (workers <= 0)
+ workers = DEFAULT_WORKERS;
+
+ return basedir;
+}
+
+int
+main(int argc, char *argv[])
+{
+ char *basedir = NULL;
+
+ basedir = parse_and_validate_args(argc, argv);
+ if (!basedir)
+ return 1;
+
+ xfind(basedir);
+
+ return 0;
+}
diff --git a/tools/gfind_missing_files/gfid_to_path.py b/tools/gfind_missing_files/gfid_to_path.py
new file mode 100644
index 00000000000..01e08a9494a
--- /dev/null
+++ b/tools/gfind_missing_files/gfid_to_path.py
@@ -0,0 +1,162 @@
+#!/usr/bin/python3
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import sys
+import os
+import xattr
+import uuid
+import re
+import errno
+
+CHANGELOG_SEARCH_MAX_TRY = 31
+DEC_CTIME_START = 5
+ROOT_GFID = "00000000-0000-0000-0000-000000000001"
+MAX_NUM_CHANGELOGS_TRY = 2
+
+
+def output_not_found(gfid):
+ # Write GFID to stderr
+ sys.stderr.write("%s\n" % gfid)
+
+
+def output_success(path):
+ # Write converted Path to Stdout
+ sys.stdout.write("%s\n" % path)
+
+
+def full_dir_path(gfid):
+ out_path = ""
+ while True:
+ path = os.path.join(".glusterfs", gfid[0:2], gfid[2:4], gfid)
+ path_readlink = os.readlink(path)
+ pgfid = os.path.dirname(path_readlink)
+ out_path = os.path.join(os.path.basename(path_readlink), out_path)
+ if pgfid == "../../00/00/%s" % ROOT_GFID:
+ out_path = os.path.join("./", out_path)
+ break
+ gfid = os.path.basename(pgfid)
+ return out_path
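+
+# Illustration (hypothetical GFID): for a directory, the backend handle
+# .glusterfs/ab/cd/<gfid> is a symlink such as
+# "../../00/00/00000000-0000-0000-0000-000000000001/dir1", so repeated
+# readlink() walks parent links upward until ROOT_GFID is reached.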
+
+
+def find_path_from_changelog(fd, gfid):
+ """
+ In given Changelog File, finds using following pattern
+ <T><GFID>\x00<TYPE>\x00<MODE>\x00<UID>\x00<GID>\x00<PARGFID>/<BASENAME>
+ Pattern search finds PARGFID and BASENAME, Convert PARGFID to Path
+ Using readlink and add basename to form Full path.
+ """
+ content = fd.read()
+
+ pattern = "E%s" % gfid
+ pattern += "\x00(3|23)\x00\d+\x00\d+\x00\d+\x00([^\x00]+)/([^\x00]+)"
+ pat = re.compile(pattern)
+ match = pat.search(content)
+
+ if match:
+ pgfid = match.group(2)
+ basename = match.group(3)
+ if pgfid == ROOT_GFID:
+ return os.path.join("./", basename)
+ else:
+ full_path_parent = full_dir_path(pgfid)
+ if full_path_parent:
+ return os.path.join(full_path_parent, basename)
+
+ return None
+
+
+def gfid_to_path(gfid):
+ """
+ Try readlink, if it is directory it succeeds.
+ Get ctime of the GFID file, Decrement by 5 sec
+ Search for Changelog filename, Since Changelog file generated
+ every 15 sec, Search and get immediate next Changelog after the file
+ Creation. Get the Path by searching in Changelog file.
+ Get the resultant file's GFID and Compare with the input, If these
+ GFIDs are different then Some thing is changed(May be Rename)
+ """
+ gfid = gfid.strip()
+ gpath = os.path.join(".glusterfs", gfid[0:2], gfid[2:4], gfid)
+ try:
+ output_success(full_dir_path(gfid))
+ return
+ except OSError:
+        # Not a symlink
+ pass
+
+ try:
+ ctime = int(os.stat(gpath).st_ctime)
+ ctime -= DEC_CTIME_START
+ except (OSError, IOError):
+ output_not_found(gfid)
+ return
+
+ path = None
+ found_changelog = False
+ changelog_parse_try = 0
+ for i in range(CHANGELOG_SEARCH_MAX_TRY):
+ cl = os.path.join(".glusterfs/changelogs", "CHANGELOG.%s" % ctime)
+
+ try:
+ with open(cl, "rb") as f:
+ changelog_parse_try += 1
+ found_changelog = True
+ path = find_path_from_changelog(f, gfid)
+ if not path and changelog_parse_try < MAX_NUM_CHANGELOGS_TRY:
+ ctime += 1
+ continue
+ break
+ except (IOError, OSError) as e:
+ if e.errno == errno.ENOENT:
+ ctime += 1
+ else:
+ break
+
+ if not found_changelog:
+ output_not_found(gfid)
+ return
+
+ if not path:
+ output_not_found(gfid)
+ return
+ gfid1 = str(uuid.UUID(bytes=xattr.get(path, "trusted.gfid")))
+ if gfid != gfid1:
+ output_not_found(gfid)
+ return
+
+ output_success(path)
+
+
+def main():
+ num_arguments = 3
+ if not sys.stdin.isatty():
+ num_arguments = 2
+
+ if len(sys.argv) != num_arguments:
+ sys.stderr.write("Invalid arguments\nUsage: "
+ "%s <BRICK_PATH> <GFID_FILE>\n" % sys.argv[0])
+ sys.exit(1)
+
+ path = sys.argv[1]
+
+ if sys.stdin.isatty():
+ gfid_list = os.path.abspath(sys.argv[2])
+ os.chdir(path)
+ with open(gfid_list) as f:
+ for gfid in f:
+ gfid_to_path(gfid)
+ else:
+ os.chdir(path)
+ for gfid in sys.stdin:
+ gfid_to_path(gfid)
+
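+# Invocation, as implied by the isatty() check above:
+#   gfid_to_path.py <BRICK_PATH> <GFID_FILE>
+#   ... | gfid_to_path.py <BRICK_PATH>      (GFIDs piped on stdin)
+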
+
+if __name__ == "__main__":
+ main()
diff --git a/tools/gfind_missing_files/gfid_to_path.sh b/tools/gfind_missing_files/gfid_to_path.sh
new file mode 100644
index 00000000000..ebe817ac2f3
--- /dev/null
+++ b/tools/gfind_missing_files/gfid_to_path.sh
@@ -0,0 +1,43 @@
+#!/bin/sh
+
+## Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+## This file is part of GlusterFS.
+##
+## This file is licensed to you under your choice of the GNU Lesser
+## General Public License, version 3 or any later version (LGPLv3 or
+## later), or the GNU General Public License, version 2 (GPLv2), in all
+## cases as published by the Free Software Foundation.
+
+E_BADARGS=65
+
+
+gfid_to_path ()
+{
+ brick_dir=$1;
+ gfid_file=$(readlink -e $2);
+
+ current_dir=$(pwd);
+ cd $brick_dir;
+
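+    # For regular files, .glusterfs/<xx>/<yy>/<GFID> is a hardlink to the
+    # real file, so `find -samefile` lists every path sharing its inode;
+    # the grep drops the .glusterfs handle itself from the output.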
+ while read gfid
+ do
+ to_search=`echo .glusterfs/${gfid:0:2}"/"${gfid:2:2}"/"$gfid`;
+ find . -samefile $to_search | grep -v $to_search;
+ done < $gfid_file;
+
+ cd $current_dir;
+}
+
+
+main ()
+{
+ if [ $# -ne 2 ]
+ then
+ echo "Usage: `basename $0` BRICK_DIR GFID_FILE";
+ exit $E_BADARGS;
+ fi
+
+ gfid_to_path $1 $2;
+}
+
+main "$@";
diff --git a/tools/gfind_missing_files/gfind_missing_files.sh b/tools/gfind_missing_files/gfind_missing_files.sh
new file mode 100644
index 00000000000..e7aaa0b5dd4
--- /dev/null
+++ b/tools/gfind_missing_files/gfind_missing_files.sh
@@ -0,0 +1,119 @@
+#!/bin/sh
+
+## Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+## This file is part of GlusterFS.
+##
+## This file is licensed to you under your choice of the GNU Lesser
+## General Public License, version 3 or any later version (LGPLv3 or
+## later), or the GNU General Public License, version 2 (GPLv2), in all
+## cases as published by the Free Software Foundation.
+
+BRICKPATH= #Brick path of gluster volume
+SLAVEHOST= #Slave hostname
+SLAVEVOL= #Slave volume
+SLAVEMNT= #Slave gluster volume mount point
+WORKERS=4 #Default number of worker threads
+
+out()
+{
+ echo "$@";
+}
+
+fatal()
+{
+ out FATAL "$@";
+ exit 1
+}
+
+ping_host ()
+{
+ ### Use bash internal socket support
+ {
+ exec 400<>/dev/tcp/$1/$2
+ if [ $? -ne '0' ]; then
+ return 1;
+ else
+ exec 400>&-
+ return 0;
+ fi
+ } 1>&2 2>/dev/null
+}
+
+mount_slave()
+{
+ local i; # inode number
+ SSH_PORT=22
+
+ SLAVEMNT=`mktemp -d`
+ [ "x$SLAVEMNT" = "x" ] && fatal "Could not mktemp directory";
+ [ -d "$SLAVEMNT" ] || fatal "$SLAVEMNT not a directory";
+
+ ping_host ${SLAVEHOST} $SSH_PORT
+ if [ $? -ne 0 ]; then
+ echo "$SLAVEHOST not reachable.";
+ exit 1;
+ fi;
+
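+    # --aux-gfid-mount exposes the virtual .gfid/<GFID> namespace on the
+    # mount point, which gcrawler uses to look up files by GFID.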
+ glusterfs --volfile-id=$SLAVEVOL --aux-gfid-mount --volfile-server=$SLAVEHOST $SLAVEMNT;
+ i=$(stat -c '%i' $SLAVEMNT);
+ [ "x$i" = "x1" ] || fatal "Could not mount volume $2 on $SLAVEMNT Please check host and volume exists";
+}
+
+parse_cli()
+{
+ if [ "$#" -ne 4 ]; then
+ echo "Usage: gfind_missing_files <brick-path> <slave-host> <slave-vol> <OUTFILE>"
+ exit 1
+ else
+ BRICKPATH=$1;
+ SLAVEHOST=$2;
+ SLAVEVOL=$3;
+ OUTFILE=$4;
+
+ mount_slave;
+ echo "Slave volume is mounted at ${SLAVEMNT}"
+ echo
+ fi
+}
+
+main()
+{
+ parse_cli "$@";
+
+ echo "Calling crawler...";
+ path=$(readlink -e $0)
+ $(dirname $path)/gcrawler ${BRICKPATH} ${SLAVEMNT} ${WORKERS} > ${OUTFILE}
+
+ #Clean up the mount
+ umount $SLAVEMNT;
+ rmdir $SLAVEMNT;
+
+ echo "Crawl Complete."
+ num_files_missing=$(wc -l ${OUTFILE} | awk '{print $1}')
+ if [ $num_files_missing -eq 0 ]
+ then
+ echo "Total Missing File Count : 0"
+ exit 0;
+ fi
+
+ echo "gfids of skipped files are available in the file ${OUTFILE}"
+ echo
+ echo "Starting gfid to path conversion"
+
+ #Call python script to convert gfids to full pathname
+ INFILE=$(readlink -e ${OUTFILE})
+ python $(dirname $path)/gfid_to_path.py ${BRICKPATH} ${INFILE} 1> ${OUTFILE}_pathnames 2> ${OUTFILE}_gfids
+ echo "Path names of skipped files are available in the file ${OUTFILE}_pathnames"
+
+ gfid_to_path_failures=$(wc -l ${OUTFILE}_gfids | awk '{print $1}')
+ if [ $gfid_to_path_failures -gt 0 ]
+ then
+ echo "WARNING: Unable to convert some GFIDs to Paths, GFIDs logged to ${OUTFILE}_gfids"
+ echo "Use $(dirname $path)/gfid_to_path.sh <brick-path> ${OUTFILE}_gfids to convert those GFIDs to Path"
+ fi
+
+ #Output
+ echo "Total Missing File Count : $(wc -l ${OUTFILE} | awk '{print $1}')"
+}
+
+main "$@";
diff --git a/tools/glusterfind/Makefile.am b/tools/glusterfind/Makefile.am
new file mode 100644
index 00000000000..f17dbdb228e
--- /dev/null
+++ b/tools/glusterfind/Makefile.am
@@ -0,0 +1,24 @@
+SUBDIRS = src
+
+EXTRA_DIST = S57glusterfind-delete-post.py glusterfind
+
+if WITH_SERVER
+bin_SCRIPTS = glusterfind
+endif
+
+CLEANFILES = $(bin_SCRIPTS)
+
+if WITH_SERVER
+deletehookscriptsdir = $(GLUSTERFS_LIBEXECDIR)/glusterfind/
+deletehookscripts_SCRIPTS = S57glusterfind-delete-post.py
+
+uninstall-local:
+ rm -f $(DESTDIR)$(GLUSTERD_WORKDIR)/hooks/1/delete/post/S57glusterfind-delete-post
+
+install-data-local:
+ $(mkdir_p) $(DESTDIR)$(GLUSTERD_WORKDIR)/glusterfind/.keys
+ $(mkdir_p) $(DESTDIR)$(GLUSTERD_WORKDIR)/hooks/1/delete/post/
+ rm -f $(DESTDIR)$(GLUSTERD_WORKDIR)/hooks/1/delete/post/S57glusterfind-delete-post
+ ln -s $(GLUSTERFS_LIBEXECDIR)/glusterfind/S57glusterfind-delete-post.py \
+ $(DESTDIR)$(GLUSTERD_WORKDIR)/hooks/1/delete/post/S57glusterfind-delete-post
+endif
diff --git a/tools/glusterfind/S57glusterfind-delete-post.py b/tools/glusterfind/S57glusterfind-delete-post.py
new file mode 100755
index 00000000000..5beece220f0
--- /dev/null
+++ b/tools/glusterfind/S57glusterfind-delete-post.py
@@ -0,0 +1,69 @@
+#!/usr/bin/python3
+import os
+import shutil
+from errno import ENOENT
+from subprocess import Popen, PIPE
+from argparse import ArgumentParser
+
+
+DEFAULT_GLUSTERD_WORKDIR = "/var/lib/glusterd"
+
+
+def handle_rm_error(func, path, exc_info):
+ if exc_info[1].errno == ENOENT:
+ return
+
+ raise exc_info[1]
+
+
+def get_glusterd_workdir():
+ p = Popen(["gluster", "system::", "getwd"],
+ stdout=PIPE, stderr=PIPE, universal_newlines=True)
+
+ out, _ = p.communicate()
+
+ if p.returncode == 0:
+ return out.strip()
+ else:
+ return DEFAULT_GLUSTERD_WORKDIR
+
+
+def get_args():
+ parser = ArgumentParser(description="Volume delete post hook script")
+ parser.add_argument("--volname")
+ return parser.parse_args()
+
+
+def main():
+ args = get_args()
+ glusterfind_dir = os.path.join(get_glusterd_workdir(), "glusterfind")
+
+    # Check all session directories; if any directory is found for the
+    # deleted volume, clean up its session directories
+ try:
+ ls_glusterfind_dir = os.listdir(glusterfind_dir)
+ except OSError:
+ ls_glusterfind_dir = []
+
+ for session in ls_glusterfind_dir:
+ # don't blow away the keys directory
+ if session == ".keys":
+ continue
+
+ # Possible session directory
+ volume_session_path = os.path.join(glusterfind_dir,
+ session,
+ args.volname)
+ if os.path.exists(volume_session_path):
+ shutil.rmtree(volume_session_path, onerror=handle_rm_error)
+
+        # Try to remove the session directory; if another directory for a
+        # different volume still exists, rmdir fails with ENOTEMPTY,
+        # which is fine
+ try:
+ os.rmdir(os.path.join(glusterfind_dir, session))
+ except (OSError, IOError):
+ pass
+
+
+if __name__ == "__main__":
+ main()
diff --git a/tools/glusterfind/glusterfind.in b/tools/glusterfind/glusterfind.in
new file mode 100644
index 00000000000..ca154b625dd
--- /dev/null
+++ b/tools/glusterfind/glusterfind.in
@@ -0,0 +1,18 @@
+#!/usr/bin/python3
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import sys
+sys.path.insert(1, '@GLUSTERFS_LIBEXECDIR@/')
+sys.path.insert(1, '@GLUSTERFS_LIBEXECDIR@/glusterfind')
+
+from glusterfind.main import main
+
+if __name__ == "__main__":
+ main()
diff --git a/tools/glusterfind/src/Makefile.am b/tools/glusterfind/src/Makefile.am
new file mode 100644
index 00000000000..43b6141b01c
--- /dev/null
+++ b/tools/glusterfind/src/Makefile.am
@@ -0,0 +1,16 @@
+glusterfinddir = $(GLUSTERFS_LIBEXECDIR)/glusterfind
+
+if WITH_SERVER
+glusterfind_PYTHON = conf.py utils.py __init__.py \
+ main.py libgfchangelog.py changelogdata.py gfind_py2py3.py
+
+glusterfind_SCRIPTS = changelog.py nodeagent.py \
+ brickfind.py
+
+glusterfind_DATA = tool.conf
+endif
+
+EXTRA_DIST = changelog.py nodeagent.py brickfind.py \
+ tool.conf changelogdata.py
+
+CLEANFILES =
diff --git a/tools/glusterfind/src/__init__.py b/tools/glusterfind/src/__init__.py
new file mode 100644
index 00000000000..1753698b5fa
--- /dev/null
+++ b/tools/glusterfind/src/__init__.py
@@ -0,0 +1,9 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
diff --git a/tools/glusterfind/src/brickfind.py b/tools/glusterfind/src/brickfind.py
new file mode 100644
index 00000000000..73b6350188d
--- /dev/null
+++ b/tools/glusterfind/src/brickfind.py
@@ -0,0 +1,118 @@
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import os
+import sys
+import logging
+from argparse import ArgumentParser, RawDescriptionHelpFormatter
+try:
+ import urllib.parse as urllib
+except ImportError:
+ import urllib
+import time
+
+from utils import mkdirp, setup_logger, create_file, output_write, find
+import conf
+
+
+PROG_DESCRIPTION = """
+Changelog Crawler
+"""
+
+logger = logging.getLogger()
+
+
+def brickfind_crawl(brick, args):
+ if brick.endswith("/"):
+ brick = brick[0:len(brick)-1]
+
+ working_dir = os.path.dirname(args.outfile)
+ mkdirp(working_dir, exit_on_err=True, logger=logger)
+ create_file(args.outfile, exit_on_err=True, logger=logger)
+
+ with open(args.outfile, "a+") as fout:
+ brick_path_len = len(brick)
+
+ def output_callback(path, filter_result, is_dir):
+ path = path.strip()
+ path = path[brick_path_len+1:]
+
+ if args.type == "both":
+ output_write(fout, path, args.output_prefix,
+ encode=(not args.no_encode), tag=args.tag,
+ field_separator=args.field_separator)
+ else:
+ if (is_dir and args.type == "d") or (
+ (not is_dir) and args.type == "f"):
+ output_write(fout, path, args.output_prefix,
+ encode=(not args.no_encode), tag=args.tag,
+ field_separator=args.field_separator)
+
+ ignore_dirs = [os.path.join(brick, dirname)
+ for dirname in
+ conf.get_opt("brick_ignore_dirs").split(",")]
+
+ find(brick, callback_func=output_callback,
+ ignore_dirs=ignore_dirs)
+
+ fout.flush()
+ os.fsync(fout.fileno())
+
+
+def _get_args():
+ parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
+ description=PROG_DESCRIPTION)
+
+ parser.add_argument("session", help="Session Name")
+ parser.add_argument("volume", help="Volume Name")
+ parser.add_argument("node", help="Node Name")
+ parser.add_argument("brick", help="Brick Name")
+ parser.add_argument("outfile", help="Output File")
+ parser.add_argument("tag", help="Tag to prefix file name with")
+ parser.add_argument("--only-query", help="Only query, No session update",
+ action="store_true")
+ parser.add_argument("--debug", help="Debug", action="store_true")
+ parser.add_argument("--no-encode",
+ help="Do not encode path in outfile",
+ action="store_true")
+ parser.add_argument("--output-prefix", help="File prefix in output",
+ default=".")
+ parser.add_argument('--type', help="type: f, f-files only"
+ " d, d-directories only, by default = both",
+ default='both')
+ parser.add_argument("--field-separator", help="Field separator",
+ default=" ")
+
+ return parser.parse_args()
+
+
+if __name__ == "__main__":
+ args = _get_args()
+ session_dir = os.path.join(conf.get_opt("session_dir"), args.session)
+ status_file = os.path.join(session_dir, args.volume,
+ "%s.status" % urllib.quote_plus(args.brick))
+ status_file_pre = status_file + ".pre"
+ mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+ logger=logger)
+ mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume),
+ exit_on_err=True)
+ log_file = os.path.join(conf.get_opt("log_dir"),
+ args.session,
+ args.volume,
+ "brickfind.log")
+ setup_logger(logger, log_file, args.debug)
+
+ time_to_update = int(time.time())
+ brickfind_crawl(args.brick, args)
+ if not args.only_query:
+ with open(status_file_pre, "w") as f:
+ f.write(str(time_to_update))
+ sys.exit(0)
diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py
new file mode 100644
index 00000000000..a5e9ea4288f
--- /dev/null
+++ b/tools/glusterfind/src/changelog.py
@@ -0,0 +1,469 @@
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import os
+import sys
+import time
+import xattr
+import logging
+from gfind_py2py3 import bytearray_to_str
+from argparse import ArgumentParser, RawDescriptionHelpFormatter
+import hashlib
+try:
+ import urllib.parse as urllib
+except ImportError:
+ import urllib
+import codecs
+
+import libgfchangelog
+from utils import mkdirp, symlink_gfid_to_path
+from utils import fail, setup_logger, find
+from utils import get_changelog_rollover_time
+from utils import output_path_prepare
+from changelogdata import ChangelogData
+import conf
+
+
+CHANGELOG_LOG_LEVEL = 9
+CHANGELOG_CONN_RETRIES = 5
+CHANGELOGAPI_NUM_WORKERS = 3
+PROG_DESCRIPTION = """
+Changelog Crawler
+"""
+history_turns = 0
+history_turn_time = 0
+
+logger = logging.getLogger()
+
+
+def pgfid_to_path(brick, changelog_data):
+ """
+ For all the pgfids in table, converts into path using recursive
+ readlink.
+ """
+ # pgfid1 to path1 in case of CREATE/MKNOD/MKDIR/LINK/SYMLINK
+ for row in changelog_data.gfidpath_get_distinct("pgfid1", {"path1": ""}):
+ # In case of Data/Metadata only, pgfid1 will not be there
+ if row[0] == "":
+ continue
+
+ try:
+ path = symlink_gfid_to_path(brick, row[0])
+ path = output_path_prepare(path, args)
+ changelog_data.gfidpath_set_path1(path, row[0])
+ except (IOError, OSError) as e:
+ logger.warn("Error converting to path: %s" % e)
+ continue
+
+ # pgfid2 to path2 in case of RENAME
+ for row in changelog_data.gfidpath_get_distinct("pgfid2",
+ {"type": "RENAME",
+ "path2": ""}):
+ # Only in case of Rename pgfid2 exists
+ if row[0] == "":
+ continue
+
+ try:
+ path = symlink_gfid_to_path(brick, row[0])
+ path = output_path_prepare(path, args)
+ changelog_data.gfidpath_set_path2(path, row[0])
+ except (IOError, OSError) as e:
+ logger.warn("Error converting to path: %s" % e)
+ continue
+
+
+def populate_pgfid_and_inodegfid(brick, changelog_data):
+ """
+ For all the DATA/METADATA modifications GFID,
+ If symlink, directly convert to Path using Readlink.
+ If not symlink, try to get PGFIDs via xattr query and populate it
+ to pgfid table, collect inodes in inodegfid table
+ """
+ for row in changelog_data.gfidpath_get({"path1": "", "type": "MODIFY"}):
+ gfid = row[3].strip()
+ p = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid)
+ if os.path.islink(p):
+ # It is a Directory if GFID backend path is symlink
+ try:
+ path = symlink_gfid_to_path(brick, gfid)
+ path = output_path_prepare(path, args)
+ changelog_data.gfidpath_update({"path1": path},
+ {"gfid": gfid})
+ except (IOError, OSError) as e:
+ logger.warn("Error converting to path: %s" % e)
+ continue
+ else:
+ try:
+ # INODE and GFID to inodegfid table
+ changelog_data.inodegfid_add(os.stat(p).st_ino, gfid)
+ file_xattrs = xattr.list(p)
+ for x in file_xattrs:
+ x_str = bytearray_to_str(x)
+ if x_str.startswith("trusted.pgfid."):
+ # PGFID in pgfid table
+ changelog_data.pgfid_add(x_str.split(".")[-1])
+ except (IOError, OSError):
+                # Ignore all OS errors here; failures are reported at
+                # the end, since all GFIDs remain in the gfidpath table
+ continue
+
+
+def enum_hard_links_using_gfid2path(brick, gfid, args):
+ hardlinks = []
+ p = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid)
+ if not os.path.isdir(p):
+ # we have a symlink or a normal file
+ try:
+ file_xattrs = xattr.list(p)
+ for x in file_xattrs:
+ x_str = bytearray_to_str(x)
+ if x_str.startswith("trusted.gfid2path."):
+ # get the value for the xattr i.e. <PGFID>/<BN>
+ v = xattr.getxattr(p, x_str)
+ v_str = bytearray_to_str(v)
+ pgfid, bn = v_str.split(os.sep)
+ try:
+ path = symlink_gfid_to_path(brick, pgfid)
+ fullpath = os.path.join(path, bn)
+ fullpath = output_path_prepare(fullpath, args)
+ hardlinks.append(fullpath)
+ except (IOError, OSError) as e:
+ logger.warn("Error converting to path: %s" % e)
+ continue
+ except (IOError, OSError):
+ pass
+ return hardlinks
+
+
+def gfid_to_all_paths_using_gfid2path(brick, changelog_data, args):
+ path = ""
+ for row in changelog_data.gfidpath_get({"path1": "", "type": "MODIFY"}):
+ gfid = row[3].strip()
+ logger.debug("Processing gfid %s" % gfid)
+ hardlinks = enum_hard_links_using_gfid2path(brick, gfid, args)
+
+ path = ",".join(hardlinks)
+
+ changelog_data.gfidpath_update({"path1": path}, {"gfid": gfid})
+
+
+def gfid_to_path_using_pgfid(brick, changelog_data, args):
+ """
+ For all the pgfids collected, Converts to Path and
+ does readdir on those directories and looks up inodegfid
+ table for matching inode number.
+ """
+ populate_pgfid_and_inodegfid(brick, changelog_data)
+
+    # Nothing to do if no GFIDs need conversion to path
+ if not changelog_data.inodegfid_exists({"converted": 0}):
+ return
+
+ def inode_filter(path):
+ # Looks in inodegfid table, if exists returns
+ # inode number else None
+ try:
+ st = os.lstat(path)
+ except (OSError, IOError):
+ st = None
+
+ if st and changelog_data.inodegfid_exists({"inode": st.st_ino}):
+ return st.st_ino
+
+ return None
+
+ # Length of brick path, to remove from output path
+ brick_path_len = len(brick)
+
+ def output_callback(path, inode):
+ # For each path found, encodes it and updates path1
+ # Also updates converted flag in inodegfid table as 1
+ path = path.strip()
+ path = path[brick_path_len+1:]
+
+ path = output_path_prepare(path, args)
+
+ changelog_data.append_path1(path, inode)
+ changelog_data.inodegfid_update({"converted": 1}, {"inode": inode})
+
+ ignore_dirs = [os.path.join(brick, dirname)
+ for dirname in
+ conf.get_opt("brick_ignore_dirs").split(",")]
+
+ for row in changelog_data.pgfid_get():
+ try:
+ path = symlink_gfid_to_path(brick, row[0])
+ find(os.path.join(brick, path),
+ callback_func=output_callback,
+ filter_func=inode_filter,
+ ignore_dirs=ignore_dirs,
+ subdirs_crawl=False)
+ except (IOError, OSError) as e:
+ logger.warn("Error converting to path: %s" % e)
+ continue
+
+
+def gfid_to_path_using_batchfind(brick, changelog_data):
+    # Nothing to do if all GFIDs were already converted by
+    # gfid_to_path_using_pgfid
+ if not changelog_data.inodegfid_exists({"converted": 0}):
+ return
+
+ def inode_filter(path):
+ # Looks in inodegfid table, if exists returns
+ # inode number else None
+ try:
+ st = os.lstat(path)
+ except (OSError, IOError):
+ st = None
+
+ if st and changelog_data.inodegfid_exists({"inode": st.st_ino}):
+ return st.st_ino
+
+ return None
+
+ # Length of brick path, to remove from output path
+ brick_path_len = len(brick)
+
+ def output_callback(path, inode):
+ # For each path found, encodes it and updates path1
+ # Also updates converted flag in inodegfid table as 1
+ path = path.strip()
+ path = path[brick_path_len+1:]
+ path = output_path_prepare(path, args)
+
+ changelog_data.append_path1(path, inode)
+
+ ignore_dirs = [os.path.join(brick, dirname)
+ for dirname in
+ conf.get_opt("brick_ignore_dirs").split(",")]
+
+ # Full Namespace Crawl
+ find(brick, callback_func=output_callback,
+ filter_func=inode_filter,
+ ignore_dirs=ignore_dirs)
+
+
+def parse_changelog_to_db(changelog_data, filename, args):
+ """
+ Parses a Changelog file and populates data in gfidpath table
+ """
+ with codecs.open(filename, encoding="utf-8") as f:
+ changelogfile = os.path.basename(filename)
+ for line in f:
+ data = line.strip().split(" ")
+ if data[0] == "E" and data[2] in ["CREATE", "MKNOD", "MKDIR"]:
+ # CREATE/MKDIR/MKNOD
+ changelog_data.when_create_mknod_mkdir(changelogfile, data)
+ elif data[0] in ["D", "M"]:
+ # DATA/META
+ if not args.only_namespace_changes:
+ changelog_data.when_data_meta(changelogfile, data)
+ elif data[0] == "E" and data[2] in ["LINK", "SYMLINK"]:
+ # LINK/SYMLINK
+ changelog_data.when_link_symlink(changelogfile, data)
+ elif data[0] == "E" and data[2] == "RENAME":
+ # RENAME
+ changelog_data.when_rename(changelogfile, data)
+ elif data[0] == "E" and data[2] in ["UNLINK", "RMDIR"]:
+ # UNLINK/RMDIR
+ changelog_data.when_unlink_rmdir(changelogfile, data)
+
+
+def get_changes(brick, hash_dir, log_file, start, end, args):
+ """
+ Makes use of libgfchangelog's history API to get changelogs
+ containing changes from start and end time. Further collects
+ the modified gfids from the changelogs and writes the list
+ of gfid to 'gfid_list' file.
+ """
+ session_dir = os.path.join(conf.get_opt("session_dir"),
+ args.session)
+ status_file = os.path.join(session_dir, args.volume,
+ "%s.status" % urllib.quote_plus(args.brick))
+
+ # Get previous session
+ try:
+ with open(status_file) as f:
+ start = int(f.read().strip())
+ except (ValueError, OSError, IOError):
+ start = args.start
+
+ try:
+ libgfchangelog.cl_init()
+ libgfchangelog.cl_register(brick, hash_dir, log_file,
+ CHANGELOG_LOG_LEVEL, CHANGELOG_CONN_RETRIES)
+ except libgfchangelog.ChangelogException as e:
+ fail("%s Changelog register failed: %s" % (brick, e), logger=logger)
+
+ # Output files to record GFIDs and GFID to Path failure GFIDs
+ changelog_data = ChangelogData(args.outfile, args)
+
+    # Changelogs path (hard-coded to BRICK/.glusterfs/changelogs)
+ cl_path = os.path.join(brick, ".glusterfs/changelogs")
+
+ # Fail if History fails for requested Start and End
+ try:
+ actual_end = libgfchangelog.cl_history_changelog(
+ cl_path, start, end, CHANGELOGAPI_NUM_WORKERS)
+ except libgfchangelog.ChangelogException as e:
+ fail("%s: %s Historical Changelogs not available: %s" %
+ (args.node, brick, e), logger=logger)
+
+ logger.info("[1/4] Starting changelog parsing ...")
+ try:
+        # Call scan followed by getchanges until scan returns zero.
+        # history_scan() blocks until it knows the number of changelogs
+        # to process; it returns zero when none are left, or a positive
+        # count of changelogs to be fetched with history_getchanges().
+ changes = []
+ while libgfchangelog.cl_history_scan() > 0:
+ changes = libgfchangelog.cl_history_getchanges()
+
+ for change in changes:
+ # Ignore if last processed changelog comes
+ # again in list
+ if change.endswith(".%s" % start):
+ continue
+ try:
+ parse_changelog_to_db(changelog_data, change, args)
+ libgfchangelog.cl_history_done(change)
+ except IOError as e:
+ logger.warn("Error parsing changelog file %s: %s" %
+ (change, e))
+
+ changelog_data.commit()
+ except libgfchangelog.ChangelogException as e:
+ fail("%s Error during Changelog Crawl: %s" % (brick, e),
+ logger=logger)
+
+ logger.info("[1/4] Finished changelog parsing.")
+
+ # Convert all pgfid available from Changelogs
+ logger.info("[2/4] Starting 'pgfid to path' conversions ...")
+ pgfid_to_path(brick, changelog_data)
+ changelog_data.commit()
+ logger.info("[2/4] Finished 'pgfid to path' conversions.")
+
+ # Convert all gfids recorded for data and metadata to all hardlink paths
+ logger.info("[3/4] Starting 'gfid2path' conversions ...")
+ gfid_to_all_paths_using_gfid2path(brick, changelog_data, args)
+ changelog_data.commit()
+ logger.info("[3/4] Finished 'gfid2path' conversions.")
+
+ # If some GFIDs fail to get converted from previous step,
+ # convert using find
+ logger.info("[4/4] Starting 'gfid to path using batchfind' "
+ "conversions ...")
+ gfid_to_path_using_batchfind(brick, changelog_data)
+ changelog_data.commit()
+ logger.info("[4/4] Finished 'gfid to path using batchfind' conversions.")
+
+ return actual_end
+
+
+def changelog_crawl(brick, start, end, args):
+ """
+ Init function, prepares working dir and calls Changelog query
+ """
+ if brick.endswith("/"):
+ brick = brick[0:len(brick)-1]
+
+ # WORKING_DIR/BRICKHASH/OUTFILE
+ working_dir = os.path.dirname(args.outfile)
+ brickhash = hashlib.sha1(brick.encode())
+ brickhash = str(brickhash.hexdigest())
+ working_dir = os.path.join(working_dir, brickhash)
+
+ mkdirp(working_dir, exit_on_err=True, logger=logger)
+
+ log_file = os.path.join(conf.get_opt("log_dir"),
+ args.session,
+ args.volume,
+ "changelog.%s.log" % brickhash)
+
+ logger.info("%s Started Changelog Crawl. Start: %s, End: %s"
+ % (brick, start, end))
+ return get_changes(brick, working_dir, log_file, start, end, args)
+
+
+def _get_args():
+ parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
+ description=PROG_DESCRIPTION)
+
+ parser.add_argument("session", help="Session Name")
+ parser.add_argument("volume", help="Volume Name")
+ parser.add_argument("node", help="Node Name")
+ parser.add_argument("brick", help="Brick Name")
+ parser.add_argument("outfile", help="Output File")
+ parser.add_argument("start", help="Start Time", type=int)
+ parser.add_argument("end", help="End Time", type=int)
+ parser.add_argument("--only-query", help="Query mode only (no session)",
+ action="store_true")
+ parser.add_argument("--debug", help="Debug", action="store_true")
+ parser.add_argument("--no-encode",
+ help="Do not encode path in outfile",
+ action="store_true")
+ parser.add_argument("--output-prefix", help="File prefix in output",
+ default=".")
+ parser.add_argument("--type",default="both")
+ parser.add_argument("-N", "--only-namespace-changes",
+ help="List only namespace changes",
+ action="store_true")
+
+ return parser.parse_args()
+
+
+if __name__ == "__main__":
+ args = _get_args()
+ mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume),
+ exit_on_err=True)
+ log_file = os.path.join(conf.get_opt("log_dir"),
+ args.session,
+ args.volume,
+ "changelog.log")
+ setup_logger(logger, log_file, args.debug)
+
+ session_dir = os.path.join(conf.get_opt("session_dir"), args.session)
+ status_file = os.path.join(session_dir, args.volume,
+ "%s.status" % urllib.quote_plus(args.brick))
+ status_file_pre = status_file + ".pre"
+ mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+ logger=logger)
+
+ end = -1
+ if args.only_query:
+ start = args.start
+ end = args.end
+ else:
+ try:
+ with open(status_file) as f:
+ start = int(f.read().strip())
+ except (ValueError, OSError, IOError):
+ start = args.start
+
+ # end time is optional; so a -1 may be sent to use the default method of
+ # identifying the end time
+ if end == -1:
+ end = int(time.time()) - get_changelog_rollover_time(args.volume)
+
+ logger.info("%s Started Changelog Crawl - Start: %s End: %s" % (args.brick,
+ start,
+ end))
+ actual_end = changelog_crawl(args.brick, start, end, args)
+ if not args.only_query:
+ with open(status_file_pre, "w") as f:
+ f.write(str(actual_end))
+
+ logger.info("%s Finished Changelog Crawl - End: %s" % (args.brick,
+ actual_end))
+ sys.exit(0)
diff --git a/tools/glusterfind/src/changelogdata.py b/tools/glusterfind/src/changelogdata.py
new file mode 100644
index 00000000000..641593cf4b1
--- /dev/null
+++ b/tools/glusterfind/src/changelogdata.py
@@ -0,0 +1,440 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import sqlite3
+import os
+
+from utils import RecordType, unquote_plus_space_newline
+from utils import output_path_prepare
+
+
+class OutputMerger(object):
+ """
+ Class to merge the output files collected from
+ different nodes
+ """
+ def __init__(self, db_path, all_dbs):
+ self.conn = sqlite3.connect(db_path)
+ self.cursor = self.conn.cursor()
+ self.cursor_reader = self.conn.cursor()
+ query = "DROP TABLE IF EXISTS finallist"
+ self.cursor.execute(query)
+
+ query = """
+ CREATE TABLE finallist(
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ ts VARCHAR,
+ type VARCHAR,
+ gfid VARCHAR,
+ path1 VARCHAR,
+ path2 VARCHAR,
+ UNIQUE (type, path1, path2) ON CONFLICT IGNORE
+ )
+ """
+ self.cursor.execute(query)
+
+ # If node database exists, read each db and insert into
+ # final table. Ignore if combination of TYPE PATH1 PATH2
+ # already exists
+ for node_db in all_dbs:
+ if os.path.exists(node_db):
+ conn = sqlite3.connect(node_db)
+ cursor = conn.cursor()
+ query = """
+ SELECT ts, type, gfid, path1, path2
+ FROM gfidpath
+ WHERE path1 != ''
+ ORDER BY id ASC
+ """
+ for row in cursor.execute(query):
+ self.add_if_not_exists(row[0], row[1], row[2],
+ row[3], row[4])
+
+ self.conn.commit()
+
+ def add_if_not_exists(self, ts, ty, gfid, path1, path2=""):
+ # Adds record to finallist only if not exists
+ query = """
+ INSERT INTO finallist(ts, type, gfid, path1, path2)
+ VALUES(?, ?, ?, ?, ?)
+ """
+ self.cursor.execute(query, (ts, ty, gfid, path1, path2))
+
+ def get(self):
+ query = """SELECT type, path1, path2 FROM finallist
+ ORDER BY ts ASC, id ASC"""
+ return self.cursor_reader.execute(query)
+
+ def get_failures(self):
+ query = """
+ SELECT gfid
+ FROM finallist
+ WHERE path1 = '' OR (path2 = '' AND type = 'RENAME')
+ """
+ return self.cursor_reader.execute(query)
+
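+# Assumed usage sketch (caller names hypothetical):
+#   merger = OutputMerger(out_db_path, per_node_db_paths)
+#   for ty, path1, path2 in merger.get():
+#       write_output_record(ty, path1, path2)
+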
+
+class ChangelogData(object):
+ def __init__(self, dbpath, args):
+ self.conn = sqlite3.connect(dbpath)
+ self.cursor = self.conn.cursor()
+ self.cursor_reader = self.conn.cursor()
+ self._create_table_gfidpath()
+ self._create_table_pgfid()
+ self._create_table_inodegfid()
+ self.args = args
+ self.path_sep = "/"
+
+ def _create_table_gfidpath(self):
+ drop_table = "DROP TABLE IF EXISTS gfidpath"
+ self.cursor.execute(drop_table)
+
+ create_table = """
+ CREATE TABLE gfidpath(
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ ts VARCHAR,
+ type VARCHAR,
+ gfid VARCHAR(40),
+ pgfid1 VARCHAR(40) DEFAULT '',
+ bn1 VARCHAR(500) DEFAULT '',
+ pgfid2 VARCHAR(40) DEFAULT '',
+ bn2 VARCHAR(500) DEFAULT '',
+ path1 VARCHAR DEFAULT '',
+ path2 VARCHAR DEFAULT ''
+ )
+ """
+ self.cursor.execute(create_table)
+
+ create_index = """
+ CREATE INDEX gfid_index ON gfidpath(gfid);
+ """
+ self.cursor.execute(create_index)
+
+ def _create_table_inodegfid(self):
+ drop_table = "DROP TABLE IF EXISTS inodegfid"
+ self.cursor.execute(drop_table)
+
+ create_table = """
+ CREATE TABLE inodegfid(
+ inode INTEGER PRIMARY KEY,
+ gfid VARCHAR(40),
+ converted INTEGER DEFAULT 0,
+ UNIQUE (inode, gfid) ON CONFLICT IGNORE
+ )
+ """
+ self.cursor.execute(create_table)
+
+ def _create_table_pgfid(self):
+ drop_table = "DROP TABLE IF EXISTS pgfid"
+ self.cursor.execute(drop_table)
+
+ create_table = """
+ CREATE TABLE pgfid(
+ pgfid VARCHAR(40) PRIMARY KEY,
+ UNIQUE (pgfid) ON CONFLICT IGNORE
+ )
+ """
+ self.cursor.execute(create_table)
+
+ def _get(self, tablename, filters):
+ # SELECT * FROM <TABLENAME> WHERE <CONDITION>
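+        # e.g. _get("gfidpath", {"type": "NEW"}) executes
+        # "SELECT * FROM gfidpath WHERE 1=1 AND type = ?" with ["NEW"]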
+ params = []
+ query = "SELECT * FROM %s WHERE 1=1" % tablename
+
+ for key, value in filters.items():
+ query += " AND %s = ?" % key
+ params.append(value)
+
+ return self.cursor_reader.execute(query, params)
+
+ def _get_distinct(self, tablename, distinct_field, filters):
+ # SELECT DISTINCT <COL> FROM <TABLENAME> WHERE <CONDITION>
+ params = []
+ query = "SELECT DISTINCT %s FROM %s WHERE 1=1" % (distinct_field,
+ tablename)
+
+ for key, value in filters.items():
+ query += " AND %s = ?" % key
+ params.append(value)
+
+ return self.cursor_reader.execute(query, params)
+
+ def _delete(self, tablename, filters):
+ # DELETE FROM <TABLENAME> WHERE <CONDITIONS>
+ query = "DELETE FROM %s WHERE 1=1" % tablename
+ params = []
+
+ for key, value in filters.items():
+ query += " AND %s = ?" % key
+ params.append(value)
+
+ self.cursor.execute(query, params)
+
+ def _add(self, tablename, data):
+ # INSERT INTO <TABLENAME>(<col1>, <col2>..) VALUES(?,?..)
+ query = "INSERT INTO %s(" % tablename
+ fields = []
+ params = []
+ for key, value in data.items():
+ fields.append(key)
+ params.append(value)
+
+ values_substitute = len(fields)*["?"]
+ query += "%s) VALUES(%s)" % (",".join(fields),
+ ",".join(values_substitute))
+ self.cursor.execute(query, params)
+
+ def _update(self, tablename, data, filters):
+ # UPDATE <TABLENAME> SET col1 = ?,.. WHERE col1=? AND ..
+ params = []
+ update_fields = []
+ for key, value in data.items():
+ update_fields.append("%s = ?" % key)
+ params.append(value)
+
+ query = "UPDATE %s SET %s WHERE 1 = 1" % (tablename,
+ ", ".join(update_fields))
+
+ for key, value in filters.items():
+ query += " AND %s = ?" % key
+ params.append(value)
+
+ self.cursor.execute(query, params)
+
+ def _exists(self, tablename, filters):
+ if not filters:
+ return False
+
+ query = "SELECT COUNT(1) FROM %s WHERE 1=1" % tablename
+ params = []
+
+ for key, value in filters.items():
+ query += " AND %s = ?" % key
+ params.append(value)
+
+ self.cursor.execute(query, params)
+ row = self.cursor.fetchone()
+ return True if row[0] > 0 else False
+
+ def gfidpath_add(self, changelogfile, ty, gfid, pgfid1="", bn1="",
+ pgfid2="", bn2="", path1="", path2=""):
+ self._add("gfidpath", {
+ "ts": changelogfile.split(".")[-1],
+ "type": ty,
+ "gfid": gfid,
+ "pgfid1": pgfid1,
+ "bn1": bn1,
+ "pgfid2": pgfid2,
+ "bn2": bn2,
+ "path1": path1,
+ "path2": path2
+ })
+
+ def gfidpath_update(self, data, filters):
+ self._update("gfidpath", data, filters)
+
+ def gfidpath_delete(self, filters):
+ self._delete("gfidpath", filters)
+
+ def gfidpath_exists(self, filters):
+ return self._exists("gfidpath", filters)
+
+ def gfidpath_get(self, filters={}):
+ return self._get("gfidpath", filters)
+
+ def gfidpath_get_distinct(self, distinct_field, filters={}):
+ return self._get_distinct("gfidpath", distinct_field, filters)
+
+ def pgfid_add(self, pgfid):
+ self._add("pgfid", {
+ "pgfid": pgfid
+ })
+
+ def pgfid_update(self, data, filters):
+ self._update("pgfid", data, filters)
+
+ def pgfid_get(self, filters={}):
+ return self._get("pgfid", filters)
+
+ def pgfid_get_distinct(self, distinct_field, filters={}):
+ return self._get_distinct("pgfid", distinct_field, filters)
+
+ def pgfid_exists(self, filters):
+ return self._exists("pgfid", filters)
+
+ def inodegfid_add(self, inode, gfid, converted=0):
+ self._add("inodegfid", {
+ "inode": inode,
+ "gfid": gfid,
+ "converted": converted
+ })
+
+ def inodegfid_update(self, data, filters):
+ self._update("inodegfid", data, filters)
+
+ def inodegfid_get(self, filters={}):
+ return self._get("inodegfid", filters)
+
+ def inodegfid_get_distinct(self, distinct_field, filters={}):
+ return self._get_distinct("inodegfid", distinct_field, filters)
+
+ def inodegfid_exists(self, filters):
+ return self._exists("inodegfid", filters)
+
+ def append_path1(self, path, inode):
+        # || is the SQL string-concatenation operator
+ query = """UPDATE gfidpath SET path1 = path1 || ',' || ?
+ WHERE gfid IN (SELECT gfid FROM inodegfid WHERE inode = ?)"""
+ self.cursor.execute(query, (path, inode))
+
+ def gfidpath_set_path1(self, path1, pgfid1):
+        # || is the SQL string-concatenation operator
+ if path1 == "":
+ update_str1 = "? || bn1"
+ update_str2 = "? || bn2"
+ else:
+ update_str1 = "? || '{0}' || bn1".format(self.path_sep)
+ update_str2 = "? || '{0}' || bn2".format(self.path_sep)
+
+ query = """UPDATE gfidpath SET path1 = %s
+ WHERE pgfid1 = ?""" % update_str1
+ self.cursor.execute(query, (path1, pgfid1))
+
+ # Set Path2 if pgfid1 and pgfid2 are same
+ query = """UPDATE gfidpath SET path2 = %s
+ WHERE pgfid2 = ?""" % update_str2
+ self.cursor.execute(query, (path1, pgfid1))
+
+ def gfidpath_set_path2(self, path2, pgfid2):
+        # || is the SQL string-concatenation operator
+ if path2 == "":
+ update_str = "? || bn2"
+ else:
+ update_str = "? || '{0}' || bn2".format(self.path_sep)
+
+ query = """UPDATE gfidpath SET path2 = %s
+ WHERE pgfid2 = ?""" % update_str
+ self.cursor.execute(query, (path2, pgfid2))
+
+ def when_create_mknod_mkdir(self, changelogfile, data):
+ # E <GFID> <MKNOD|CREATE|MKDIR> <MODE> <USER> <GRP> <PGFID>/<BNAME>
+ # Add the Entry to DB
+ pgfid1, bn1 = data[6].split("/", 1)
+
+ if self.args.no_encode:
+ bn1 = unquote_plus_space_newline(bn1).strip()
+
+ self.gfidpath_add(changelogfile, RecordType.NEW, data[1], pgfid1, bn1)
+
+ def when_rename(self, changelogfile, data):
+ # E <GFID> RENAME <OLD_PGFID>/<BNAME> <PGFID>/<BNAME>
+ pgfid1, bn1 = data[3].split("/", 1)
+ pgfid2, bn2 = data[4].split("/", 1)
+
+ if self.args.no_encode:
+ bn1 = unquote_plus_space_newline(bn1).strip()
+ bn2 = unquote_plus_space_newline(bn2).strip()
+
+ if self.gfidpath_exists({"gfid": data[1], "type": "NEW",
+ "pgfid1": pgfid1, "bn1": bn1}):
+ # If <OLD_PGFID>/<BNAME> is same as CREATE, Update
+ # <NEW_PGFID>/<BNAME> in NEW.
+ self.gfidpath_update({"pgfid1": pgfid2, "bn1": bn2},
+ {"gfid": data[1], "type": "NEW",
+ "pgfid1": pgfid1, "bn1": bn1})
+ elif self.gfidpath_exists({"gfid": data[1], "type": "RENAME",
+ "pgfid2": pgfid1, "bn2": bn1}):
+ # If we are renaming file back to original name then just
+ # delete the entry since it will effectively be a no-op
+ if self.gfidpath_exists({"gfid": data[1], "type": "RENAME",
+ "pgfid2": pgfid1, "bn2": bn1,
+ "pgfid1": pgfid2, "bn1": bn2}):
+ self.gfidpath_delete({"gfid": data[1], "type": "RENAME",
+ "pgfid2": pgfid1, "bn2": bn1})
+ else:
+ # If <OLD_PGFID>/<BNAME> is same as <PGFID2>/<BN2>
+ # (may be previous RENAME)
+ # then UPDATE <NEW_PGFID>/<BNAME> as <PGFID2>/<BN2>
+ self.gfidpath_update({"pgfid2": pgfid2, "bn2": bn2},
+ {"gfid": data[1], "type": "RENAME",
+ "pgfid2": pgfid1, "bn2": bn1})
+ else:
+ # Else insert as RENAME
+ self.gfidpath_add(changelogfile, RecordType.RENAME, data[1],
+ pgfid1, bn1, pgfid2, bn2)
+
+ if self.gfidpath_exists({"gfid": data[1], "type": "MODIFY"}):
+ # If MODIFY exists already for that GFID, remove it and insert
+ # again so that MODIFY entry comes after RENAME entry
+ # Output will have MODIFY <NEWNAME>
+ self.gfidpath_delete({"gfid": data[1], "type": "MODIFY"})
+ self.gfidpath_add(changelogfile, RecordType.MODIFY, data[1])
+
+ def when_link_symlink(self, changelogfile, data):
+ # E <GFID> <LINK|SYMLINK> <PGFID>/<BASENAME>
+ # Add as New record in Db as Type NEW
+ pgfid1, bn1 = data[3].split("/", 1)
+ if self.args.no_encode:
+ bn1 = unquote_plus_space_newline(bn1).strip()
+
+ self.gfidpath_add(changelogfile, RecordType.NEW, data[1], pgfid1, bn1)
+
+ def when_data_meta(self, changelogfile, data):
+        # If a NEW or MODIFY row already exists for this GFID, ignore;
+        # else add a MODIFY record to the DB
+ if not self.gfidpath_exists({"gfid": data[1], "type": "NEW"}) and \
+ not self.gfidpath_exists({"gfid": data[1], "type": "MODIFY"}):
+ self.gfidpath_add(changelogfile, RecordType.MODIFY, data[1])
+
+ def when_unlink_rmdir(self, changelogfile, data):
+ # E <GFID> <UNLINK|RMDIR> <PGFID>/<BASENAME>
+ pgfid1, bn1 = data[3].split("/", 1)
+
+ if self.args.no_encode:
+ bn1 = unquote_plus_space_newline(bn1).strip()
+
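+        # data[4] carries the full path of the deleted entry; glusterfind
+        # enables changelog.capture-del-path at session create time so
+        # that this field gets recorded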
+ deleted_path = data[4] if len(data) == 5 else ""
+ if deleted_path != "":
+ deleted_path = unquote_plus_space_newline(deleted_path)
+ deleted_path = output_path_prepare(deleted_path, self.args)
+
+ if self.gfidpath_exists({"gfid": data[1], "type": "NEW",
+ "pgfid1": pgfid1, "bn1": bn1}):
+ # If path exists in table as NEW with same GFID
+ # Delete that row
+ self.gfidpath_delete({"gfid": data[1], "type": "NEW",
+ "pgfid1": pgfid1, "bn1": bn1})
+ else:
+ # Else Record as DELETE
+ self.gfidpath_add(changelogfile, RecordType.DELETE, data[1],
+ pgfid1, bn1, path1=deleted_path)
+
+        # Update path1 to deleted_path where pgfid1 and bn1 match the
+        # deleted entry
+ self.gfidpath_update({"path1": deleted_path}, {"gfid": data[1],
+ "pgfid1": pgfid1,
+ "bn1": bn1})
+
+        # Update path2 to deleted_path where pgfid2 and bn2 match the
+        # deleted entry
+ self.gfidpath_update({"path2": deleted_path}, {
+ "type": RecordType.RENAME,
+ "gfid": data[1],
+ "pgfid2": pgfid1,
+ "bn2": bn1})
+
+        # If the deleted directory was the parent of other entries,
+        # rewrite their recorded paths under the deleted path
+ query1 = """UPDATE gfidpath SET path1 = ? || '{0}' || bn1
+ WHERE pgfid1 = ? AND path1 != ''""".format(self.path_sep)
+ self.cursor.execute(query1, (deleted_path, data[1]))
+
+ query1 = """UPDATE gfidpath SET path2 = ? || '{0}' || bn1
+ WHERE pgfid2 = ? AND path2 != ''""".format(self.path_sep)
+ self.cursor.execute(query1, (deleted_path, data[1]))
+
+ def commit(self):
+ self.conn.commit()
diff --git a/tools/glusterfind/src/conf.py b/tools/glusterfind/src/conf.py
new file mode 100644
index 00000000000..3849ba5dd1f
--- /dev/null
+++ b/tools/glusterfind/src/conf.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import os
+try:
+ from ConfigParser import ConfigParser
+except ImportError:
+ from configparser import ConfigParser
+
+config = ConfigParser()
+config.read(os.path.join(os.path.dirname(os.path.abspath(__file__)),
+ "tool.conf"))
+
+
+def list_change_detectors():
+ return dict(config.items("change_detectors")).keys()
+
+
+def get_opt(opt):
+ return config.get("vars", opt)
+
+
+def get_change_detector(opt):
+ return config.get("change_detectors", opt)
diff --git a/tools/glusterfind/src/gfind_py2py3.py b/tools/glusterfind/src/gfind_py2py3.py
new file mode 100644
index 00000000000..87324fbf350
--- /dev/null
+++ b/tools/glusterfind/src/gfind_py2py3.py
@@ -0,0 +1,88 @@
+#
+# Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+# All python2/python3 compatibility routines
+
+import os
+import sys
+from ctypes import create_string_buffer
+
+if sys.version_info >= (3,):
+
+ # Raw conversion of bytearray to string. Used in the cases where
+    # buffer is created by create_string_buffer, which is an 8-bit char
+    # array passed to syscalls to fetch results. Using encode/decode
+    # doesn't work as it alters the size while converting to a string.
+ def bytearray_to_str(byte_arr):
+ return ''.join([chr(b) for b in byte_arr])
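+
+    # e.g. bytearray_to_str(b"ab\xff") == 'ab\xff': each byte maps to one
+    # character, so the buffer length is preserved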
+
+ def gf_create_string_buffer(size):
+ return create_string_buffer(b'\0', size)
+
+ def gfind_history_changelog(libgfc, changelog_path, start, end, num_parallel,
+ actual_end):
+        return libgfc.gf_history_changelog(changelog_path.encode(), start,
+                                           end, num_parallel, actual_end)
+
+ def gfind_changelog_register(libgfc, brick, path, log_file, log_level,
+ retries):
+ return libgfc.gf_changelog_register(brick.encode(), path.encode(), log_file.encode(),
+ log_level, retries)
+
+ def gfind_history_changelog_done(libgfc, clfile):
+ return libgfc.gf_history_changelog_done(clfile.encode())
+
+ def gfind_write_row(f, row, field_separator, p_rep, row_2_rep):
+ f.write(u"{0}{1}{2}{3}{4}\n".format(row,
+ field_separator,
+ p_rep,
+ field_separator,
+ row_2_rep))
+
+ def gfind_write(f, row, field_separator, p_rep):
+ f.write(u"{0}{1}{2}\n".format(row,
+ field_separator,
+ p_rep))
+
+
+else:
+
+ # Raw conversion of bytearray to string
+ def bytearray_to_str(byte_arr):
+ return byte_arr
+
+ def gf_create_string_buffer(size):
+ return create_string_buffer('\0', size)
+
+ def gfind_history_changelog(libgfc, changelog_path, start, end, num_parallel,
+ actual_end):
+ return libgfc.gf_history_changelog(changelog_path, start, end,
+ num_parallel, actual_end)
+
+ def gfind_changelog_register(libgfc, brick, path, log_file, log_level,
+ retries):
+ return libgfc.gf_changelog_register(brick, path, log_file,
+ log_level, retries)
+
+ def gfind_history_changelog_done(libgfc, clfile):
+ return libgfc.gf_history_changelog_done(clfile)
+
+ def gfind_write_row(f, row, field_separator, p_rep, row_2_rep):
+ f.write(u"{0}{1}{2}{3}{4}\n".format(row,
+ field_separator,
+ p_rep,
+ field_separator,
+ row_2_rep).encode())
+
+ def gfind_write(f, row, field_separator, p_rep):
+ f.write(u"{0}{1}{2}\n".format(row,
+ field_separator,
+ p_rep).encode())
diff --git a/tools/glusterfind/src/libgfchangelog.py b/tools/glusterfind/src/libgfchangelog.py
new file mode 100644
index 00000000000..513bb101e93
--- /dev/null
+++ b/tools/glusterfind/src/libgfchangelog.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import os
+from ctypes import CDLL, RTLD_GLOBAL, get_errno, c_ulong, byref
+from ctypes.util import find_library
+from gfind_py2py3 import bytearray_to_str, gf_create_string_buffer
+from gfind_py2py3 import gfind_history_changelog, gfind_changelog_register
+from gfind_py2py3 import gfind_history_changelog_done
+
+
+class ChangelogException(OSError):
+ pass
+
+libgfc = CDLL(find_library("gfchangelog"), mode=RTLD_GLOBAL, use_errno=True)
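+# Each libgfchangelog entry point returns -1 on failure and sets errno,
+# which raise_oserr() below converts into a ChangelogException.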
+
+
+def raise_oserr(prefix=None):
+ errn = get_errno()
+ prefix_or_empty = prefix + ": " if prefix else ""
+ raise ChangelogException(errn, prefix_or_empty + os.strerror(errn))
+
+
+def cl_init():
+ ret = libgfc.gf_changelog_init(None)
+ if ret == -1:
+ raise_oserr(prefix="gf_changelog_init")
+
+
+def cl_register(brick, path, log_file, log_level, retries=0):
+    ret = gfind_changelog_register(libgfc, brick, path, log_file,
+                                   log_level, retries)
+ if ret == -1:
+ raise_oserr(prefix="gf_changelog_register")
+
+
+def cl_history_scan():
+ ret = libgfc.gf_history_changelog_scan()
+ if ret == -1:
+ raise_oserr(prefix="gf_history_changelog_scan")
+
+ return ret
+
+
+def cl_history_changelog(changelog_path, start, end, num_parallel):
+ actual_end = c_ulong()
+    ret = gfind_history_changelog(libgfc, changelog_path, start, end,
+ num_parallel,
+ byref(actual_end))
+ if ret == -1:
+ raise_oserr(prefix="gf_history_changelog")
+
+ return actual_end.value
+
+
+def cl_history_startfresh():
+ ret = libgfc.gf_history_changelog_start_fresh()
+ if ret == -1:
+ raise_oserr(prefix="gf_history_changelog_start_fresh")
+
+
+def cl_history_getchanges():
+ """ remove hardcoding for path name length """
+ def clsort(f):
+ return f.split('.')[-1]
+
+ changes = []
+ buf = gf_create_string_buffer(4096)
+
+ while True:
+ ret = libgfc.gf_history_changelog_next_change(buf, 4096)
+ if ret in (0, -1):
+ break
+ # py2 and py3 compatibility
+ result = bytearray_to_str(buf.raw[:ret - 1])
+ changes.append(result)
+ if ret == -1:
+ raise_oserr(prefix="gf_history_changelog_next_change")
+
+ return sorted(changes, key=clsort)
+
+
+def cl_history_done(clfile):
+ ret = gfind_history_changelog_done(libgfc, clfile)
+ if ret == -1:
+ raise_oserr(prefix="gf_history_changelog_done")
diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py
new file mode 100644
index 00000000000..4b5466d0114
--- /dev/null
+++ b/tools/glusterfind/src/main.py
@@ -0,0 +1,921 @@
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import sys
+from errno import ENOENT, ENOTEMPTY
+import time
+from multiprocessing import Process
+import os
+import xml.etree.cElementTree as etree
+from argparse import ArgumentParser, RawDescriptionHelpFormatter, Action
+from gfind_py2py3 import gfind_write_row, gfind_write
+import logging
+import shutil
+import tempfile
+import signal
+from datetime import datetime
+import codecs
+import re
+
+from utils import execute, is_host_local, mkdirp, fail
+from utils import setup_logger, human_time, handle_rm_error
+from utils import get_changelog_rollover_time, cache_output, create_file
+import conf
+from changelogdata import OutputMerger
+
+PROG_DESCRIPTION = """
+GlusterFS Incremental API
+"""
+ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError
+
+logger = logging.getLogger()
+vol_statusStr = ""
+gtmpfilename = None
+g_pid_nodefile_map = {}
+
+
+class StoreAbsPath(Action):
+ def __init__(self, option_strings, dest, nargs=None, **kwargs):
+ super(StoreAbsPath, self).__init__(option_strings, dest, **kwargs)
+
+ def __call__(self, parser, namespace, values, option_string=None):
+ setattr(namespace, self.dest, os.path.abspath(values))
+
+
+def get_pem_key_path(session, volume):
+ return os.path.join(conf.get_opt("session_dir"),
+ session,
+ volume,
+ "%s_%s_secret.pem" % (session, volume))
+
+
+def node_cmd(host, host_uuid, task, cmd, args, opts):
+ """
+ Runs command via ssh if host is not local
+ """
+ try:
+ localdir = is_host_local(host_uuid)
+
+        # Skip delete on the local node: removing the ssh keys here would
+        # otherwise cause ssh password prompts on the console (race
+        # conditions). mode_delete() cleans up the session tree instead.
+ if localdir and task == "delete":
+ return
+
+ pem_key_path = get_pem_key_path(args.session, args.volume)
+
+ if not localdir:
+ # prefix with ssh command if not local node
+ cmd = ["ssh",
+ "-oNumberOfPasswordPrompts=0",
+ "-oStrictHostKeyChecking=no",
+ # We force TTY allocation (-t -t) so that Ctrl+C is handed
+ # through; see:
+ # https://bugzilla.redhat.com/show_bug.cgi?id=1382236
+ # Note that this turns stderr of the remote `cmd`
+ # into stdout locally.
+ "-t",
+ "-t",
+ "-i", pem_key_path,
+ "root@%s" % host] + cmd
+
+ (returncode, err, out) = execute(cmd, logger=logger)
+ if returncode != 0:
+ # Because the `-t -t` above turns the remote stderr into
+ # local stdout, we need to log both stderr and stdout
+ # here to print all error messages.
+ fail("%s - %s failed; stdout (including remote stderr):\n"
+ "%s\n"
+ "stderr:\n"
+ "%s" % (host, task, out, err),
+ returncode,
+ logger=logger)
+
+ if opts.get("copy_outfile", False) and not localdir:
+ cmd_copy = ["scp",
+ "-oNumberOfPasswordPrompts=0",
+ "-oStrictHostKeyChecking=no",
+ "-i", pem_key_path,
+ "root@%s:/%s" % (host, opts.get("node_outfile")),
+ os.path.dirname(opts.get("node_outfile"))]
+ execute(cmd_copy, exit_msg="%s - Copy command failed" % host,
+ logger=logger)
+ except KeyboardInterrupt:
+ sys.exit(2)
+
+
+def run_cmd_nodes(task, args, **kwargs):
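+    """
+    Runs the task-specific command on every brick node of the volume,
+    one Process per node, and records each child's output file in
+    g_pid_nodefile_map.
+    """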
+ global g_pid_nodefile_map
+ nodes = get_nodes(args.volume)
+ pool = []
+ for num, node in enumerate(nodes):
+ host, brick = node[1].split(":")
+ host_uuid = node[0]
+ cmd = []
+ opts = {}
+
+ # tmpfilename is valid only for tasks: pre, query and cleanup
+ tmpfilename = kwargs.get("tmpfilename", "BADNAME")
+
+ node_outfile = os.path.join(conf.get_opt("working_dir"),
+ args.session, args.volume,
+ tmpfilename,
+ "tmp_output_%s" % num)
+
+ if task == "pre":
+ if vol_statusStr != "Started":
+ fail("Volume %s is not online" % args.volume,
+ logger=logger)
+
+ # If Full backup is requested or start time is zero, use brickfind
+ change_detector = conf.get_change_detector("changelog")
+ tag = None
+ if args.full:
+ change_detector = conf.get_change_detector("brickfind")
+ tag = args.tag_for_full_find.strip()
+ if tag == "":
+ tag = '""' if not is_host_local(host_uuid) else ""
+
+ # remote file will be copied into this directory
+ mkdirp(os.path.dirname(node_outfile),
+ exit_on_err=True, logger=logger)
+
+ FS = args.field_separator
+ if not is_host_local(host_uuid):
+ FS = "'" + FS + "'"
+
+ cmd = [change_detector,
+ args.session,
+ args.volume,
+ host,
+ brick,
+ node_outfile] + \
+ ([str(kwargs.get("start")), str(kwargs.get("end"))]
+ if not args.full else []) + \
+ ([tag] if tag is not None else []) + \
+ ["--output-prefix", args.output_prefix] + \
+ (["--debug"] if args.debug else []) + \
+ (["--no-encode"] if args.no_encode else []) + \
+ (["--only-namespace-changes"] if args.only_namespace_changes
+ else []) + \
+ (["--type", args.type]) + \
+ (["--field-separator", FS] if args.full else [])
+
+ opts["node_outfile"] = node_outfile
+ opts["copy_outfile"] = True
+ elif task == "query":
+ # If Full backup is requested or start time is zero, use brickfind
+ tag = None
+ change_detector = conf.get_change_detector("changelog")
+ if args.full:
+ change_detector = conf.get_change_detector("brickfind")
+ tag = args.tag_for_full_find.strip()
+ if tag == "":
+ tag = '""' if not is_host_local(host_uuid) else ""
+
+ # remote file will be copied into this directory
+ mkdirp(os.path.dirname(node_outfile),
+ exit_on_err=True, logger=logger)
+
+ FS = args.field_separator
+ if not is_host_local(host_uuid):
+ FS = "'" + FS + "'"
+
+ cmd = [change_detector,
+ args.session,
+ args.volume,
+ host,
+ brick,
+ node_outfile] + \
+ ([str(kwargs.get("start")), str(kwargs.get("end"))]
+ if not args.full else []) + \
+ ([tag] if tag is not None else []) + \
+ ["--only-query"] + \
+ ["--output-prefix", args.output_prefix] + \
+ (["--debug"] if args.debug else []) + \
+ (["--no-encode"] if args.no_encode else []) + \
+ (["--only-namespace-changes"]
+ if args.only_namespace_changes else []) + \
+ (["--type", args.type]) + \
+ (["--field-separator", FS] if args.full else [])
+
+ opts["node_outfile"] = node_outfile
+ opts["copy_outfile"] = True
+ elif task == "cleanup":
+ # After pre/query run, cleanup the working directory and other
+ # temp files. Remove the directory to which node_outfile has
+ # been copied in main node
+ try:
+ os.remove(node_outfile)
+ except (OSError, IOError):
+ logger.warn("Failed to cleanup temporary file %s" %
+ node_outfile)
+ pass
+
+ cmd = [conf.get_opt("nodeagent"),
+ "cleanup",
+ args.session,
+ args.volume,
+ os.path.dirname(node_outfile)] + \
+ (["--debug"] if args.debug else [])
+ elif task == "create":
+ if vol_statusStr != "Started":
+ fail("Volume %s is not online" % args.volume,
+ logger=logger)
+
+ # When glusterfind create, create session directory in
+ # each brick nodes
+ cmd = [conf.get_opt("nodeagent"),
+ "create",
+ args.session,
+ args.volume,
+ brick,
+ kwargs.get("time_to_update")] + \
+ (["--debug"] if args.debug else []) + \
+ (["--reset-session-time"] if args.reset_session_time
+ else [])
+ elif task == "post":
+ # Rename pre status file to actual status file in each node
+ cmd = [conf.get_opt("nodeagent"),
+ "post",
+ args.session,
+ args.volume,
+ brick] + \
+ (["--debug"] if args.debug else [])
+ elif task == "delete":
+ # When glusterfind delete, cleanup all the session files/dirs
+ # from each node.
+ cmd = [conf.get_opt("nodeagent"),
+ "delete",
+ args.session,
+ args.volume] + \
+ (["--debug"] if args.debug else [])
+
+ if cmd:
+ p = Process(target=node_cmd,
+ args=(host, host_uuid, task, cmd, args, opts))
+ p.start()
+ pool.append(p)
+ g_pid_nodefile_map[p.pid] = node_outfile
+
+ for num, p in enumerate(pool):
+ p.join()
+ if p.exitcode != 0:
+ logger.warn("Command %s failed in %s" % (task, nodes[num][1]))
+ if task in ["create", "delete"]:
+ fail("Command %s failed in %s" % (task, nodes[num][1]))
+ elif task == "pre" or task == "query":
+ if args.disable_partial:
+ sys.exit(1)
+ else:
+ del g_pid_nodefile_map[p.pid]
+
+
+@cache_output
+def get_nodes(volume):
+ """
+ Get the gluster volume info xml output and parse to get
+ the brick details.
+ """
+ global vol_statusStr
+
+ cmd = ["gluster", 'volume', 'info', volume, "--xml"]
+ _, data, _ = execute(cmd,
+ exit_msg="Failed to Run Gluster Volume Info",
+ logger=logger)
+ tree = etree.fromstring(data)
+
+ # Test to check if volume has been deleted after session creation
+ count_el = tree.find('volInfo/volumes/count')
+ if int(count_el.text) == 0:
+ fail("Unable to get volume details", logger=logger)
+
+ # this status is used in caller: run_cmd_nodes
+ vol_statusStr = tree.find('volInfo/volumes/volume/statusStr').text
+ vol_typeStr = tree.find('volInfo/volumes/volume/typeStr').text
+
+ nodes = []
+ volume_el = tree.find('volInfo/volumes/volume')
+ try:
+ brick_elems = []
+ if vol_typeStr == "Tier":
+ brick_elems.append('bricks/hotBricks/brick')
+ brick_elems.append('bricks/coldBricks/brick')
+ else:
+ brick_elems.append('bricks/brick')
+
+ for elem in brick_elems:
+ for b in volume_el.findall(elem):
+ nodes.append((b.find('hostUuid').text,
+ b.find('name').text))
+ except (ParseError, AttributeError, ValueError) as e:
+ fail("Failed to parse Volume Info: %s" % e, logger=logger)
+
+ return nodes
+
+
+def _get_args():
+ parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
+ description=PROG_DESCRIPTION)
+ subparsers = parser.add_subparsers(dest="mode")
+ subparsers.required = True
+
+ # create <SESSION> <VOLUME> [--debug] [--force]
+ parser_create = subparsers.add_parser('create')
+ parser_create.add_argument("session", help="Session Name")
+ parser_create.add_argument("volume", help="Volume Name")
+ parser_create.add_argument("--debug", help="Debug", action="store_true")
+ parser_create.add_argument("--force", help="Force option to recreate "
+ "the session", action="store_true")
+ parser_create.add_argument("--reset-session-time",
+ help="Reset Session Time to Current Time",
+ action="store_true")
+
+ # delete <SESSION> <VOLUME> [--debug]
+ parser_delete = subparsers.add_parser('delete')
+ parser_delete.add_argument("session", help="Session Name")
+ parser_delete.add_argument("volume", help="Volume Name")
+ parser_delete.add_argument("--debug", help="Debug", action="store_true")
+
+ # list [--session <SESSION>] [--volume <VOLUME>]
+ parser_list = subparsers.add_parser('list')
+ parser_list.add_argument("--session", help="Session Name", default="")
+ parser_list.add_argument("--volume", help="Volume Name", default="")
+ parser_list.add_argument("--debug", help="Debug", action="store_true")
+
+ # pre <SESSION> <VOLUME> <OUTFILE>
+ # [--output-prefix <OUTPUT_PREFIX>] [--full]
+ parser_pre = subparsers.add_parser('pre')
+ parser_pre.add_argument("session", help="Session Name")
+ parser_pre.add_argument("volume", help="Volume Name")
+ parser_pre.add_argument("outfile", help="Output File", action=StoreAbsPath)
+ parser_pre.add_argument("--debug", help="Debug", action="store_true")
+ parser_pre.add_argument("--no-encode",
+ help="Do not encode path in output file",
+ action="store_true")
+ parser_pre.add_argument("--full", help="Full find", action="store_true")
+ parser_pre.add_argument("--disable-partial", help="Disable Partial find, "
+ "Fail when one node fails", action="store_true")
+ parser_pre.add_argument("--output-prefix", help="File prefix in output",
+ default=".")
+ parser_pre.add_argument("--regenerate-outfile",
+ help="Regenerate outfile, discard the outfile "
+ "generated from last pre command",
+ action="store_true")
+ parser_pre.add_argument("-N", "--only-namespace-changes",
+ help="List only namespace changes",
+ action="store_true")
+ parser_pre.add_argument("--tag-for-full-find",
+ help="Tag prefix for file names emitted during"
+ " a full find operation; default: \"NEW\"",
+ default="NEW")
+    parser_pre.add_argument('--type', help="f: files only,"
+                            " d: directories only; default: both",
+                            default='both', choices=["f", "d", "both"])
+ parser_pre.add_argument("--field-separator", help="Field separator string",
+ default=" ")
+
+ # query <VOLUME> <OUTFILE> --since-time <SINCE_TIME>
+ # [--output-prefix <OUTPUT_PREFIX>] [--full]
+ parser_query = subparsers.add_parser('query')
+ parser_query.add_argument("volume", help="Volume Name")
+ parser_query.add_argument("outfile", help="Output File",
+ action=StoreAbsPath)
+ parser_query.add_argument("--since-time", help="UNIX epoch time since "
+ "which listing is required", type=int)
+ parser_query.add_argument("--end-time", help="UNIX epoch time up to "
+ "which listing is required", type=int)
+ parser_query.add_argument("--no-encode",
+ help="Do not encode path in output file",
+ action="store_true")
+ parser_query.add_argument("--full", help="Full find", action="store_true")
+ parser_query.add_argument("--debug", help="Debug", action="store_true")
+ parser_query.add_argument("--disable-partial", help="Disable Partial find,"
+ " Fail when one node fails", action="store_true")
+ parser_query.add_argument("--output-prefix", help="File prefix in output",
+ default=".")
+ parser_query.add_argument("-N", "--only-namespace-changes",
+ help="List only namespace changes",
+ action="store_true")
+ parser_query.add_argument("--tag-for-full-find",
+ help="Tag prefix for file names emitted during"
+ " a full find operation; default: \"NEW\"",
+ default="NEW")
+    parser_query.add_argument('--type', help="f: files only,"
+                            " d: directories only; default: both",
+                            default='both', choices=["f", "d", "both"])
+ parser_query.add_argument("--field-separator",
+ help="Field separator string",
+ default=" ")
+
+ # post <SESSION> <VOLUME>
+ parser_post = subparsers.add_parser('post')
+ parser_post.add_argument("session", help="Session Name")
+ parser_post.add_argument("volume", help="Volume Name")
+ parser_post.add_argument("--debug", help="Debug", action="store_true")
+
+ return parser.parse_args()
+
+
+def ssh_setup(args):
+ pem_key_path = get_pem_key_path(args.session, args.volume)
+
+ if not os.path.exists(pem_key_path):
+ # Generate ssh-key
+ cmd = ["ssh-keygen",
+ "-N",
+ "",
+ "-f",
+ pem_key_path]
+ execute(cmd,
+ exit_msg="Unable to generate ssh key %s"
+ % pem_key_path,
+ logger=logger)
+
+ logger.info("Ssh key generated %s" % pem_key_path)
+
+ try:
+ shutil.copyfile(pem_key_path + ".pub",
+ os.path.join(conf.get_opt("session_dir"),
+ ".keys",
+ "%s_%s_secret.pem.pub" % (args.session,
+ args.volume)))
+ except (IOError, OSError) as e:
+ fail("Failed to copy public key to %s: %s"
+ % (os.path.join(conf.get_opt("session_dir"), ".keys"), e),
+ logger=logger)
+
+ # Copy pub file to all nodes
+ cmd = ["gluster",
+ "system::",
+ "copy",
+ "file",
+ "/glusterfind/.keys/%s.pub" % os.path.basename(pem_key_path)]
+
+ execute(cmd, exit_msg="Failed to distribute ssh keys", logger=logger)
+
+ logger.info("Distributed ssh key to all nodes of Volume")
+
+ # Add to authorized_keys file in each node
+ cmd = ["gluster",
+ "system::",
+ "execute",
+ "add_secret_pub",
+ "root",
+ "/glusterfind/.keys/%s.pub" % os.path.basename(pem_key_path)]
+ execute(cmd,
+ exit_msg="Failed to add ssh keys to authorized_keys file",
+ logger=logger)
+
+ logger.info("Ssh key added to authorized_keys of Volume nodes")
+
+
+def enable_volume_options(args):
+ execute(["gluster", "volume", "set",
+ args.volume, "build-pgfid", "on"],
+ exit_msg="Failed to set volume option build-pgfid on",
+ logger=logger)
+ logger.info("Volume option set %s, build-pgfid on" % args.volume)
+
+ execute(["gluster", "volume", "set",
+ args.volume, "changelog.changelog", "on"],
+ exit_msg="Failed to set volume option "
+ "changelog.changelog on", logger=logger)
+ logger.info("Volume option set %s, changelog.changelog on"
+ % args.volume)
+
+ execute(["gluster", "volume", "set",
+ args.volume, "changelog.capture-del-path", "on"],
+ exit_msg="Failed to set volume option "
+ "changelog.capture-del-path on", logger=logger)
+ logger.info("Volume option set %s, changelog.capture-del-path on"
+ % args.volume)
+
+
+def write_output(outfile, outfilemerger, field_separator):
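+    """
+    Writes the merged rows to the output file. row[1] may hold multiple
+    comma-separated paths (hardlinks); row[2], when non-empty, is the
+    second path of a RENAME record.
+    """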
+ with codecs.open(outfile, "a", encoding="utf-8") as f:
+ for row in outfilemerger.get():
+ # Multiple paths in case of Hardlinks
+ paths = row[1].split(",")
+ row_2_rep = None
+ for p in paths:
+ if p == "":
+ continue
+ p_rep = p.replace("//", "/")
+ if not row_2_rep:
+ row_2_rep = row[2].replace("//", "/")
+ if p_rep == row_2_rep:
+ continue
+
+ if row_2_rep and row_2_rep != "":
+                    gfind_write_row(f, row[0], field_separator, p_rep, row_2_rep)
+                else:
+                    gfind_write(f, row[0], field_separator, p_rep)
+
+
+def validate_volume(volume):
+ cmd = ["gluster", 'volume', 'info', volume, "--xml"]
+ _, data, _ = execute(cmd,
+ exit_msg="Failed to Run Gluster Volume Info",
+ logger=logger)
+ try:
+ tree = etree.fromstring(data)
+ statusStr = tree.find('volInfo/volumes/volume/statusStr').text
+ except (ParseError, AttributeError) as e:
+ fail("Invalid Volume: Check the Volume name! %s" % e)
+ if statusStr != "Started":
+ fail("Volume %s is not online" % volume)
+
+# The rules for a valid session name.
+SESSION_NAME_RULES = {
+ 'min_length': 2,
+ 'max_length': 256, # same as maximum volume length
+ # Specifies all alphanumeric characters, underscore, hyphen.
+ 'valid_chars': r'0-9a-zA-Z_-',
+}
+
+
+# checks valid session name, fail otherwise
+def validate_session_name(session):
+ # Check for minimum length
+ if len(session) < SESSION_NAME_RULES['min_length']:
+ fail('session_name must be at least ' +
+ str(SESSION_NAME_RULES['min_length']) + ' characters long.')
+ # Check for maximum length
+    if len(session) > SESSION_NAME_RULES['max_length']:
+        fail('session_name must not exceed ' +
+             str(SESSION_NAME_RULES['max_length']) + ' characters in length.')
+
+ # Matches strings composed entirely of characters specified within
+ if not re.match(r'^[' + SESSION_NAME_RULES['valid_chars'] +
+ ']+$', session):
+ fail('Session name can only contain these characters: ' +
+ SESSION_NAME_RULES['valid_chars'])
+
+
+def mode_create(session_dir, args):
+ validate_session_name(args.session)
+
+ logger.debug("Init is called - Session: %s, Volume: %s"
+ % (args.session, args.volume))
+ mkdirp(session_dir, exit_on_err=True, logger=logger)
+ mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+ logger=logger)
+ status_file = os.path.join(session_dir, args.volume, "status")
+
+ if os.path.exists(status_file) and not args.force:
+ fail("Session %s already created" % args.session, logger=logger)
+
+ if not os.path.exists(status_file) or args.force:
+ ssh_setup(args)
+ enable_volume_options(args)
+
+ # Add Rollover time to current time to make sure changelogs
+ # will be available if we use this time as start time
+ time_to_update = int(time.time()) + get_changelog_rollover_time(
+ args.volume)
+
+ run_cmd_nodes("create", args, time_to_update=str(time_to_update))
+
+ if not os.path.exists(status_file) or args.reset_session_time:
+ with open(status_file, "w") as f:
+ f.write(str(time_to_update))
+
+ sys.stdout.write("Session %s created with volume %s\n" %
+ (args.session, args.volume))
+
+ sys.exit(0)
+
+
+def mode_query(session_dir, args):
+ global gtmpfilename
+ global g_pid_nodefile_map
+
+ # Verify volume status
+ cmd = ["gluster", 'volume', 'info', args.volume, "--xml"]
+ _, data, _ = execute(cmd,
+ exit_msg="Failed to Run Gluster Volume Info",
+ logger=logger)
+ try:
+ tree = etree.fromstring(data)
+ statusStr = tree.find('volInfo/volumes/volume/statusStr').text
+ except (ParseError, AttributeError) as e:
+ fail("Invalid Volume: %s" % e, logger=logger)
+
+ if statusStr != "Started":
+ fail("Volume %s is not online" % args.volume, logger=logger)
+
+ mkdirp(session_dir, exit_on_err=True, logger=logger)
+ mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+ logger=logger)
+ mkdirp(os.path.dirname(args.outfile), exit_on_err=True, logger=logger)
+
+    # Configure cluster for password-less SSH
+ ssh_setup(args)
+
+ # Enable volume options for changelog capture
+ enable_volume_options(args)
+
+ # Test options
+ if not args.full and args.type in ["f", "d"]:
+ fail("--type can only be used with --full")
+ if not args.since_time and not args.end_time and not args.full:
+ fail("Please specify either {--since-time and optionally --end-time} "
+ "or --full", logger=logger)
+
+ if args.since_time and args.end_time and args.full:
+ fail("Please specify either {--since-time and optionally --end-time} "
+ "or --full, but not both",
+ logger=logger)
+
+ if args.end_time and not args.since_time:
+ fail("Please specify --since-time as well", logger=logger)
+
+ # Start query command processing
+ start = -1
+ end = -1
+ if args.since_time:
+ start = args.since_time
+ if args.end_time:
+ end = args.end_time
+ else:
+ start = 0 # --full option is handled separately
+
+ logger.debug("Query is called - Session: %s, Volume: %s, "
+ "Start time: %s, End time: %s"
+ % ("default", args.volume, start, end))
+
+ prefix = datetime.now().strftime("%Y%m%d-%H%M%S-%f-")
+ gtmpfilename = prefix + next(tempfile._get_candidate_names())
+
+ run_cmd_nodes("query", args, start=start, end=end,
+ tmpfilename=gtmpfilename)
+
+ # Merger
+ if args.full:
+ if len(g_pid_nodefile_map) > 0:
+ cmd = ["sort", "-u"] + list(g_pid_nodefile_map.values()) + \
+ ["-o", args.outfile]
+ execute(cmd,
+ exit_msg="Failed to merge output files "
+ "collected from nodes", logger=logger)
+ else:
+ fail("Failed to collect any output files from peers. "
+ "Looks like all bricks are offline.", logger=logger)
+ else:
+ # Read each Changelogs db and generate finaldb
+ create_file(args.outfile, exit_on_err=True, logger=logger)
+ outfilemerger = OutputMerger(args.outfile + ".db",
+ list(g_pid_nodefile_map.values()))
+ write_output(args.outfile, outfilemerger, args.field_separator)
+
+ try:
+ os.remove(args.outfile + ".db")
+ except (IOError, OSError):
+ pass
+
+ run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename)
+
+ sys.stdout.write("Generated output file %s\n" % args.outfile)
+
+
+def mode_pre(session_dir, args):
+    """
+    Read the start time from the session status file and record the
+    new end time in the status.pre file.
+    """
+    global gtmpfilename
+    global g_pid_nodefile_map
+
+ endtime_to_update = int(time.time()) - get_changelog_rollover_time(
+ args.volume)
+ status_file = os.path.join(session_dir, args.volume, "status")
+ status_file_pre = status_file + ".pre"
+
+ mkdirp(os.path.dirname(args.outfile), exit_on_err=True, logger=logger)
+
+ if not args.full and args.type in ["f", "d"]:
+ fail("--type can only be used with --full")
+
+ # If Pre status file exists and running pre command again
+ if os.path.exists(status_file_pre) and not args.regenerate_outfile:
+ fail("Post command is not run after last pre, "
+ "use --regenerate-outfile")
+
+ start = 0
+ try:
+ with open(status_file) as f:
+ start = int(f.read().strip())
+ except ValueError:
+ pass
+ except (OSError, IOError) as e:
+ fail("Error Opening Session file %s: %s"
+ % (status_file, e), logger=logger)
+
+ logger.debug("Pre is called - Session: %s, Volume: %s, "
+ "Start time: %s, End time: %s"
+ % (args.session, args.volume, start, endtime_to_update))
+
+ prefix = datetime.now().strftime("%Y%m%d-%H%M%S-%f-")
+ gtmpfilename = prefix + next(tempfile._get_candidate_names())
+
+ run_cmd_nodes("pre", args, start=start, end=-1, tmpfilename=gtmpfilename)
+
+ # Merger
+ if args.full:
+ if len(g_pid_nodefile_map) > 0:
+ cmd = ["sort", "-u"] + list(g_pid_nodefile_map.values()) + \
+ ["-o", args.outfile]
+ execute(cmd,
+ exit_msg="Failed to merge output files "
+ "collected from nodes", logger=logger)
+ else:
+ fail("Failed to collect any output files from peers. "
+ "Looks like all bricks are offline.", logger=logger)
+ else:
+ # Read each Changelogs db and generate finaldb
+ create_file(args.outfile, exit_on_err=True, logger=logger)
+ outfilemerger = OutputMerger(args.outfile + ".db",
+ list(g_pid_nodefile_map.values()))
+ write_output(args.outfile, outfilemerger, args.field_separator)
+
+ try:
+ os.remove(args.outfile + ".db")
+ except (IOError, OSError):
+ pass
+
+ run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename)
+
+ with open(status_file_pre, "w") as f:
+ f.write(str(endtime_to_update))
+
+ sys.stdout.write("Generated output file %s\n" % args.outfile)
+
+
+def mode_post(session_dir, args):
+    """
+    If the pre status file exists, rename it over the session status file.
+    If the pre status file does not exist, fail with an error.
+    """
+ status_file = os.path.join(session_dir, args.volume, "status")
+ logger.debug("Post is called - Session: %s, Volume: %s"
+ % (args.session, args.volume))
+ status_file_pre = status_file + ".pre"
+
+ if os.path.exists(status_file_pre):
+ run_cmd_nodes("post", args)
+ os.rename(status_file_pre, status_file)
+ sys.stdout.write("Session %s with volume %s updated\n" %
+ (args.session, args.volume))
+ sys.exit(0)
+ else:
+ fail("Pre script is not run", logger=logger)
+
+
+def mode_delete(session_dir, args):
+ run_cmd_nodes("delete", args)
+ shutil.rmtree(os.path.join(session_dir, args.volume),
+ onerror=handle_rm_error)
+ sys.stdout.write("Session %s with volume %s deleted\n" %
+ (args.session, args.volume))
+
+ # If the session contains only this volume, then cleanup the
+ # session directory. If a session contains multiple volumes
+ # then os.rmdir will fail with ENOTEMPTY
+ try:
+ os.rmdir(session_dir)
+ except OSError as e:
+ if not e.errno == ENOTEMPTY:
+ logger.warn("Failed to delete session directory: %s" % e)
+
+
+def mode_list(session_dir, args):
+ """
+ List available sessions to stdout, if session name is set
+ only list that session.
+ """
+ if args.session:
+ if not os.path.exists(os.path.join(session_dir, args.session)):
+ fail("Invalid Session", logger=logger)
+ sessions = [args.session]
+ else:
+ sessions = []
+ for d in os.listdir(session_dir):
+ if d != ".keys":
+ sessions.append(d)
+
+ output = []
+ for session in sessions:
+ # Session Volume Last Processed
+ volnames = os.listdir(os.path.join(session_dir, session))
+
+ for volname in volnames:
+ if args.volume and args.volume != volname:
+ continue
+
+ status_file = os.path.join(session_dir, session, volname, "status")
+ last_processed = None
+ try:
+ with open(status_file) as f:
+ last_processed = f.read().strip()
+ except (OSError, IOError) as e:
+ if e.errno == ENOENT:
+ continue
+ else:
+ raise
+ output.append((session, volname, last_processed))
+
+ if output:
+ sys.stdout.write("%s %s %s\n" % ("SESSION".ljust(25),
+ "VOLUME".ljust(25),
+ "SESSION TIME".ljust(25)))
+ sys.stdout.write("-"*75)
+ sys.stdout.write("\n")
+ for session, volname, last_processed in output:
+ sess_time = 'Session Corrupted'
+ if last_processed:
+ try:
+ sess_time = human_time(last_processed)
+ except TypeError:
+ sess_time = 'Session Corrupted'
+ sys.stdout.write("%s %s %s\n" % (session.ljust(25),
+ volname.ljust(25),
+ sess_time.ljust(25)))
+
+ if not output:
+ if args.session or args.volume:
+ fail("Invalid Session", logger=logger)
+ else:
+ sys.stdout.write("No sessions found.\n")
+
+
+def main():
+ global gtmpfilename
+
+ args = None
+
+ try:
+ args = _get_args()
+ mkdirp(conf.get_opt("session_dir"), exit_on_err=True)
+
+ # force the default session name if mode is "query"
+ if args.mode == "query":
+ args.session = "default"
+
+ if args.mode == "list":
+ session_dir = conf.get_opt("session_dir")
+ else:
+ session_dir = os.path.join(conf.get_opt("session_dir"),
+ args.session)
+
+ if not os.path.exists(session_dir) and \
+ args.mode not in ["create", "list", "query"]:
+ fail("Invalid session %s" % args.session)
+
+ # volume involved, validate the volume first
+ if args.mode not in ["list"]:
+ validate_volume(args.volume)
+
+ # "default" is a system defined session name
+ if args.mode in ["create", "post", "pre", "delete"] and \
+ args.session == "default":
+ fail("Invalid session %s" % args.session)
+
+ vol_dir = os.path.join(session_dir, args.volume)
+ if not os.path.exists(vol_dir) and args.mode not in \
+ ["create", "list", "query"]:
+ fail("Session %s not created with volume %s" %
+ (args.session, args.volume))
+
+ mkdirp(os.path.join(conf.get_opt("log_dir"),
+ args.session,
+ args.volume),
+ exit_on_err=True)
+ log_file = os.path.join(conf.get_opt("log_dir"),
+ args.session,
+ args.volume,
+ "cli.log")
+ setup_logger(logger, log_file, args.debug)
+
+ # globals() will have all the functions already defined.
+ # mode_<args.mode> will be the function name to be called
+ globals()["mode_" + args.mode](session_dir, args)
+ except KeyboardInterrupt:
+ if args is not None:
+ if args.mode == "pre" or args.mode == "query":
+ # cleanup session
+ if gtmpfilename is not None:
+ # no more interrupts until we clean up
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename)
+
+ # Interrupted, exit with non zero error code
+ sys.exit(2)
diff --git a/tools/glusterfind/src/nodeagent.py b/tools/glusterfind/src/nodeagent.py
new file mode 100644
index 00000000000..679daa6fa76
--- /dev/null
+++ b/tools/glusterfind/src/nodeagent.py
@@ -0,0 +1,139 @@
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import shutil
+import sys
+import os
+import logging
+from argparse import ArgumentParser, RawDescriptionHelpFormatter
+try:
+ import urllib.parse as urllib
+except ImportError:
+ import urllib
+from errno import ENOTEMPTY
+
+from utils import setup_logger, mkdirp, handle_rm_error
+import conf
+
+logger = logging.getLogger()
+
+
+def mode_cleanup(args):
+ working_dir = os.path.join(conf.get_opt("working_dir"),
+ args.session,
+ args.volume,
+ args.tmpfilename)
+
+ mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume),
+ exit_on_err=True)
+ log_file = os.path.join(conf.get_opt("log_dir"),
+ args.session,
+ args.volume,
+ "changelog.log")
+
+ setup_logger(logger, log_file)
+
+ try:
+ shutil.rmtree(working_dir, onerror=handle_rm_error)
+ except (OSError, IOError) as e:
+ logger.error("Failed to delete working directory: %s" % e)
+ sys.exit(1)
+
+
+def mode_create(args):
+ session_dir = os.path.join(conf.get_opt("session_dir"),
+ args.session)
+ status_file = os.path.join(session_dir, args.volume,
+ "%s.status" % urllib.quote_plus(args.brick))
+
+ mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+ logger=logger)
+
+ if not os.path.exists(status_file) or args.reset_session_time:
+ with open(status_file, "w") as f:
+ f.write(args.time_to_update)
+
+ sys.exit(0)
+
+
+def mode_post(args):
+ session_dir = os.path.join(conf.get_opt("session_dir"), args.session)
+ status_file = os.path.join(session_dir, args.volume,
+ "%s.status" % urllib.quote_plus(args.brick))
+
+ mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+ logger=logger)
+ status_file_pre = status_file + ".pre"
+
+ if os.path.exists(status_file_pre):
+ os.rename(status_file_pre, status_file)
+ sys.exit(0)
+
+
+def mode_delete(args):
+ session_dir = os.path.join(conf.get_opt("session_dir"),
+ args.session)
+ shutil.rmtree(os.path.join(session_dir, args.volume),
+ onerror=handle_rm_error)
+
+ # If the session contains only this volume, then cleanup the
+ # session directory. If a session contains multiple volumes
+ # then os.rmdir will fail with ENOTEMPTY
+ try:
+ os.rmdir(session_dir)
+ except OSError as e:
+ if not e.errno == ENOTEMPTY:
+ logger.warn("Failed to delete session directory: %s" % e)
+
+
+def _get_args():
+ parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
+ description="Node Agent")
+    subparsers = parser.add_subparsers(dest="mode")
+    subparsers.required = True
+
+ parser_cleanup = subparsers.add_parser('cleanup')
+ parser_cleanup.add_argument("session", help="Session Name")
+ parser_cleanup.add_argument("volume", help="Volume Name")
+ parser_cleanup.add_argument("tmpfilename", help="Temporary File Name")
+ parser_cleanup.add_argument("--debug", help="Debug", action="store_true")
+
+ parser_session_create = subparsers.add_parser('create')
+ parser_session_create.add_argument("session", help="Session Name")
+ parser_session_create.add_argument("volume", help="Volume Name")
+ parser_session_create.add_argument("brick", help="Brick Path")
+ parser_session_create.add_argument("time_to_update", help="Time to Update")
+ parser_session_create.add_argument("--reset-session-time",
+ help="Reset Session Time",
+ action="store_true")
+ parser_session_create.add_argument("--debug", help="Debug",
+ action="store_true")
+
+ parser_post = subparsers.add_parser('post')
+ parser_post.add_argument("session", help="Session Name")
+ parser_post.add_argument("volume", help="Volume Name")
+ parser_post.add_argument("brick", help="Brick Path")
+ parser_post.add_argument("--debug", help="Debug",
+ action="store_true")
+
+ parser_delete = subparsers.add_parser('delete')
+ parser_delete.add_argument("session", help="Session Name")
+ parser_delete.add_argument("volume", help="Volume Name")
+ parser_delete.add_argument("--debug", help="Debug",
+ action="store_true")
+ return parser.parse_args()
+
+
+if __name__ == "__main__":
+ args = _get_args()
+
+ # globals() will have all the functions already defined.
+ # mode_<args.mode> will be the function name to be called
+ globals()["mode_" + args.mode](args)
diff --git a/tools/glusterfind/src/tool.conf.in b/tools/glusterfind/src/tool.conf.in
new file mode 100644
index 00000000000..a80f4a784c0
--- /dev/null
+++ b/tools/glusterfind/src/tool.conf.in
@@ -0,0 +1,10 @@
+[vars]
+session_dir=@GLUSTERD_WORKDIR@/glusterfind/
+working_dir=@GLUSTERFSD_MISCDIR@/glusterfind/
+log_dir=/var/log/glusterfs/glusterfind/
+nodeagent=@GLUSTERFS_LIBEXECDIR@/glusterfind/nodeagent.py
+brick_ignore_dirs=.glusterfs,.trashcan
+
+[change_detectors]
+changelog=@GLUSTERFS_LIBEXECDIR@/glusterfind/changelog.py
+brickfind=@GLUSTERFS_LIBEXECDIR@/glusterfind/brickfind.py \ No newline at end of file
diff --git a/tools/glusterfind/src/utils.py b/tools/glusterfind/src/utils.py
new file mode 100644
index 00000000000..906ebd8f252
--- /dev/null
+++ b/tools/glusterfind/src/utils.py
@@ -0,0 +1,267 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import sys
+from subprocess import PIPE, Popen
+from errno import EEXIST, ENOENT
+import xml.etree.cElementTree as etree
+import logging
+import os
+from datetime import datetime
+
+ROOT_GFID = "00000000-0000-0000-0000-000000000001"
+DEFAULT_CHANGELOG_INTERVAL = 15
+SPACE_ESCAPE_CHAR = "%20"
+NEWLINE_ESCAPE_CHAR = "%0A"
+PERCENTAGE_ESCAPE_CHAR = "%25"
+
+ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError
+cache_data = {}
+
+
+class RecordType(object):
+ NEW = "NEW"
+ MODIFY = "MODIFY"
+ RENAME = "RENAME"
+ DELETE = "DELETE"
+
+
+def cache_output(func):
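+    # Note: results are cached by function name alone, so the wrapped
+    # function must be called with the same arguments every time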
+ def wrapper(*args, **kwargs):
+ global cache_data
+ if cache_data.get(func.__name__, None) is None:
+ cache_data[func.__name__] = func(*args, **kwargs)
+
+ return cache_data[func.__name__]
+ return wrapper
+
+
+def handle_rm_error(func, path, exc_info):
+ if exc_info[1].errno == ENOENT:
+ return
+
+ raise exc_info[1]
+
+
+def find(path, callback_func=lambda x: True, filter_func=lambda x: True,
+ ignore_dirs=[], subdirs_crawl=True):
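+    """
+    Recursively crawls path, applying filter_func to each entry and
+    passing every non-None result to callback_func. Directories whose
+    paths appear in ignore_dirs are skipped; when subdirs_crawl is
+    False, subdirectories are reported but not descended into.
+    """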
+ if path in ignore_dirs:
+ return
+
+ # Capture filter_func output and pass it to callback function
+ filter_result = filter_func(path)
+ if filter_result is not None:
+ callback_func(path, filter_result, os.path.isdir(path))
+
+ for p in os.listdir(path):
+ full_path = os.path.join(path, p)
+
+ is_dir = os.path.isdir(full_path)
+ if is_dir:
+ if subdirs_crawl:
+ find(full_path, callback_func, filter_func, ignore_dirs)
+ else:
+ filter_result = filter_func(full_path)
+ if filter_result is not None:
+ callback_func(full_path, filter_result)
+ else:
+ filter_result = filter_func(full_path)
+ if filter_result is not None:
+ callback_func(full_path, filter_result, is_dir)
+
+
+def output_write(f, path, prefix=".", encode=False, tag="",
+ field_separator=" "):
+ if path == "":
+ return
+
+ if prefix != ".":
+ path = os.path.join(prefix, path)
+
+ if encode:
+ path = quote_plus_space_newline(path)
+
+ # set the field separator
+ FS = "" if tag == "" else field_separator
+
+ f.write("%s%s%s\n" % (tag.strip(), FS, path))
+
+
+def human_time(ts):
+ return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S")
+
+
+def setup_logger(logger, path, debug=False):
+ if debug:
+ logger.setLevel(logging.DEBUG)
+ else:
+ logger.setLevel(logging.INFO)
+
+ # create the logging file handler
+ fh = logging.FileHandler(path)
+
+ formatter = logging.Formatter("[%(asctime)s] %(levelname)s "
+ "[%(module)s - %(lineno)s:%(funcName)s] "
+ "- %(message)s")
+
+ fh.setFormatter(formatter)
+
+ # add handler to logger object
+ logger.addHandler(fh)
+
+
+def create_file(path, exit_on_err=False, logger=None):
+ """
+    Create an empty file, overwriting if the file exists. Print the
+    error to stderr and exit if exit_on_err is set, else raise the
+    exception for the consumer to handle.
+ """
+ try:
+ open(path, 'w').close()
+ except (OSError, IOError) as e:
+ if exit_on_err:
+ fail("Failed to create file %s: %s" % (path, e), logger=logger)
+ else:
+ raise
+
+
+def mkdirp(path, exit_on_err=False, logger=None):
+ """
+    Create the required directory structure, ignoring EEXIST and
+    raising the exception for all other errors. Print the error to
+    stderr and exit if exit_on_err is set, else raise the exception.
+ """
+ try:
+ os.makedirs(path)
+ except (OSError, IOError) as e:
+ if e.errno == EEXIST and os.path.isdir(path):
+ pass
+ else:
+ if exit_on_err:
+ fail("Fail to create dir %s: %s" % (path, e), logger=logger)
+ else:
+ raise
+
+
+def fail(msg, code=1, logger=None):
+ """
+ Write error to stderr and exit
+ """
+ if logger:
+ logger.error(msg)
+ sys.stderr.write("%s\n" % msg)
+ sys.exit(code)
+
+
+def execute(cmd, exit_msg=None, logger=None):
+ """
+    Run the command. If exit_msg is None, return (returncode, out, err).
+    If exit_msg is set and the command fails, write the message to
+    stderr and exit.
+ """
+ p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True)
+
+ (out, err) = p.communicate()
+ if p.returncode != 0 and exit_msg is not None:
+ fail("%s: %s" % (exit_msg, err), p.returncode, logger=logger)
+
+ return (p.returncode, out, err)
+
+
+def symlink_gfid_to_path(brick, gfid):
+ """
+    Each directory is symlinked to a file named by its GFID in the
+    .glusterfs directory of the brick backend. Using readlink we get
+    PARGFID/basename of the directory, and readlink recursively until
+    PARGFID is ROOT_GFID.
+ """
+ if gfid == ROOT_GFID:
+ return ""
+
+ out_path = ""
+ while True:
+ path = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid)
+ path_readlink = os.readlink(path)
+ pgfid = os.path.dirname(path_readlink)
+ out_path = os.path.join(os.path.basename(path_readlink), out_path)
+ if pgfid == "../../00/00/%s" % ROOT_GFID:
+ break
+ gfid = os.path.basename(pgfid)
+ return out_path
+
+
+@cache_output
+def get_my_uuid():
+ cmd = ["gluster", "system::", "uuid", "get", "--xml"]
+ rc, out, err = execute(cmd)
+
+ if rc != 0:
+ return None
+
+ tree = etree.fromstring(out)
+ uuid_el = tree.find("uuidGenerate/uuid")
+ return uuid_el.text
+
+
+def is_host_local(host_uuid):
+    # get_my_uuid() fetches the UUID once and caches it for later calls
+    return get_my_uuid() == host_uuid
+
+
+def get_changelog_rollover_time(volumename):
+ cmd = ["gluster", "volume", "get", volumename,
+ "changelog.rollover-time", "--xml"]
+ rc, out, err = execute(cmd)
+
+ if rc != 0:
+ return DEFAULT_CHANGELOG_INTERVAL
+
+    try:
+        tree = etree.fromstring(out)
+        val = tree.find('volGetopts/Opt/Value').text
+        if val is not None:
+            # Filter the value by split, as it may be 'X (DEFAULT)'
+            # and we only need 'X'
+            return int(val.split(' ', 1)[0])
+    except (ParseError, AttributeError, ValueError):
+        pass
+
+    # Fall back to the default when the option is unset or unparsable
+    return DEFAULT_CHANGELOG_INTERVAL
+
+
+def output_path_prepare(path, args):
+ """
+ If Prefix is set, joins to Path, removes ending slash
+ and encodes it.
+ """
+ if args.output_prefix != ".":
+ path = os.path.join(args.output_prefix, path)
+ if path.endswith("/"):
+ path = path[0:len(path)-1]
+
+ if args.no_encode:
+ return path
+ else:
+ return quote_plus_space_newline(path)
+
+
+def unquote_plus_space_newline(s):
+ return s.replace(SPACE_ESCAPE_CHAR, " ")\
+ .replace(NEWLINE_ESCAPE_CHAR, "\n")\
+ .replace(PERCENTAGE_ESCAPE_CHAR, "%")
+
+
+def quote_plus_space_newline(s):
+ return s.replace("%", PERCENTAGE_ESCAPE_CHAR)\
+ .replace(" ", SPACE_ESCAPE_CHAR)\
+ .replace("\n", NEWLINE_ESCAPE_CHAR)
diff --git a/tools/setgfid2path/Makefile.am b/tools/setgfid2path/Makefile.am
new file mode 100644
index 00000000000..c14787a80ce
--- /dev/null
+++ b/tools/setgfid2path/Makefile.am
@@ -0,0 +1,5 @@
+SUBDIRS = src
+
+EXTRA_DIST = gluster-setgfid2path.8
+
+man8_MANS = gluster-setgfid2path.8
diff --git a/tools/setgfid2path/gluster-setgfid2path.8 b/tools/setgfid2path/gluster-setgfid2path.8
new file mode 100644
index 00000000000..2e228ca8514
--- /dev/null
+++ b/tools/setgfid2path/gluster-setgfid2path.8
@@ -0,0 +1,54 @@
+
+.\" Copyright (c) 2017 Red Hat, Inc. <http://www.redhat.com>
+.\" This file is part of GlusterFS.
+.\"
+.\" This file is licensed to you under your choice of the GNU Lesser
+.\" General Public License, version 3 or any later version (LGPLv3 or
+.\" later), or the GNU General Public License, version 2 (GPLv2), in all
+.\" cases as published by the Free Software Foundation.
+.\"
+.\"
+.TH gluster-setgfid2path 8 "Command line utility to set GFID to Path Xattrs"
+.SH NAME
+gluster-setgfid2path - Gluster tool to set GFID to Path xattrs
+.SH SYNOPSIS
+.B gluster-setgfid2path
+.IR file
+.SH DESCRIPTION
+The \fBgfid2path\fR feature, introduced in Gluster release 3.12, finds the
+full path of a file from its GFID. It can be enabled with the volume set
+command \fBgluster volume set <VOLUME> storage.gfid2path enable\fR
+.PP
+Once the \fBgfid2path\fR feature is enabled, the necessary xattrs are
+recorded for newly created files, but not for files that already exist.
+This tool updates the gfid2path xattrs for a given file path.
+
+.SH EXAMPLES
+To add xattrs of a single file,
+.PP
+.nf
+.RS
+gluster-setgfid2path /bricks/b1/hello.txt
+.RE
+.fi
+.PP
+To set the xattr for all existing files, run the script below on each brick.
+.PP
+.nf
+.RS
+BRICK=/bricks/b1
+find $BRICK -type d \\( -path "${BRICK}/.trashcan" -o -path \\
+ "${BRICK}/.glusterfs" \\) -prune -o -type f \\
+ -exec gluster-setgfid2path {} \\;
+.RE
+.fi
+.PP
+.SH SEE ALSO
+.nf
+\fBgluster\fR(8)
+\fR
+.fi
+.SH COPYRIGHT
+.nf
+Copyright(c) 2017 Red Hat, Inc. <http://www.redhat.com>
diff --git a/tools/setgfid2path/src/Makefile.am b/tools/setgfid2path/src/Makefile.am
new file mode 100644
index 00000000000..7316d117070
--- /dev/null
+++ b/tools/setgfid2path/src/Makefile.am
@@ -0,0 +1,16 @@
+gluster_setgfid2pathdir = $(sbindir)
+
+if WITH_SERVER
+gluster_setgfid2path_PROGRAMS = gluster-setgfid2path
+endif
+
+gluster_setgfid2path_SOURCES = main.c
+
+gluster_setgfid2path_LDADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+
+gluster_setgfid2path_LDFLAGS = $(GF_LDFLAGS)
+
+AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \
+ -I$(top_builddir)/rpc/xdr/src
+
+AM_CFLAGS = -Wall $(GF_CFLAGS)
diff --git a/tools/setgfid2path/src/main.c b/tools/setgfid2path/src/main.c
new file mode 100644
index 00000000000..4320a7b2481
--- /dev/null
+++ b/tools/setgfid2path/src/main.c
@@ -0,0 +1,130 @@
+/*
+ Copyright (c) 2017 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+ */
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <libgen.h>
+
+#include <glusterfs/common-utils.h>
+#include <glusterfs/syscall.h>
+
+#define MAX_GFID2PATH_LINK_SUP 500
+#define GFID_SIZE 16
+#define GFID_XATTR_KEY "trusted.gfid"
+
+int
+main(int argc, char **argv)
+{
+ int ret = 0;
+ struct stat st;
+ char *dname = NULL;
+ char *bname = NULL;
+ ssize_t ret_size = 0;
+ uuid_t pgfid_raw = {
+ 0,
+ };
+ char pgfid[36 + 1] = "";
+ char xxh64[GF_XXH64_DIGEST_LENGTH * 2 + 1] = {
+ 0,
+ };
+ char pgfid_bname[1024] = {
+ 0,
+ };
+ char *key = NULL;
+ char *val = NULL;
+ size_t key_size = 0;
+ size_t val_size = 0;
+ const char *file_path = NULL;
+ char *file_path1 = NULL;
+ char *file_path2 = NULL;
+
+ if (argc != 2) {
+ fprintf(stderr, "Usage: setgfid2path <file-path>\n");
+ return -1;
+ }
+
+ ret = sys_lstat(argv[1], &st);
+ if (ret != 0) {
+ fprintf(stderr, "Invalid File Path\n");
+ return -1;
+ }
+
+ if (st.st_nlink >= MAX_GFID2PATH_LINK_SUP) {
+ fprintf(stderr,
+ "Number of Hardlink support exceeded. "
+ "max=%d\n",
+ MAX_GFID2PATH_LINK_SUP);
+ return -1;
+ }
+
+ file_path = argv[1];
+ file_path1 = strdup(file_path);
+ file_path2 = strdup(file_path);
+
+ dname = dirname(file_path1);
+ bname = basename(file_path2);
+
+ /* Get GFID of Parent directory */
+ ret_size = sys_lgetxattr(dname, GFID_XATTR_KEY, pgfid_raw, GFID_SIZE);
+ if (ret_size != GFID_SIZE) {
+ fprintf(stderr, "Failed to get GFID of parent directory. dir=%s\n",
+ dname);
+ ret = -1;
+ goto out;
+ }
+
+ /* Convert to UUID format */
+ if (uuid_utoa_r(pgfid_raw, pgfid) == NULL) {
+        /* pgfid_raw is a raw 16-byte GFID, not a printable string */
+        fprintf(stderr,
+                "Failed to format GFID of parent directory. dir=%s\n",
+                dname);
+ ret = -1;
+ goto out;
+ }
+
+ /* Find xxhash for PGFID/BaseName */
+ snprintf(pgfid_bname, sizeof(pgfid_bname), "%s/%s", pgfid, bname);
+ gf_xxh64_wrapper((unsigned char *)pgfid_bname, strlen(pgfid_bname),
+ GF_XXHSUM64_DEFAULT_SEED, xxh64);
+
+ key_size = SLEN(GFID2PATH_XATTR_KEY_PREFIX) + GF_XXH64_DIGEST_LENGTH * 2 +
+ 1;
+ key = alloca(key_size);
+ snprintf(key, key_size, GFID2PATH_XATTR_KEY_PREFIX "%s", xxh64);
+
+ val_size = UUID_CANONICAL_FORM_LEN + NAME_MAX + 2;
+ val = alloca(val_size);
+ snprintf(val, val_size, "%s/%s", pgfid, bname);
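+    /* The key is GFID2PATH_XATTR_KEY_PREFIX followed by the xxh64 hash;
+     * the value is "<parent-gfid>/<basename>" as built above. */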
+
+ /* Set the Xattr, ignore if same key xattr already exists */
+ ret = sys_lsetxattr(file_path, key, val, strlen(val), XATTR_CREATE);
+ if (ret == -1) {
+ if (errno == EEXIST) {
+ printf("Xattr already exists, ignoring..\n");
+ ret = 0;
+ goto out;
+ }
+
+ fprintf(stderr, "Failed to set gfid2path xattr. errno=%d\n error=%s",
+ errno, strerror(errno));
+ ret = -1;
+ goto out;
+ }
+
+ printf("Success. file=%s key=%s value=%s\n", file_path, key, val);
+
+out:
+ if (file_path1 != NULL)
+ free(file_path1);
+
+ if (file_path2 != NULL)
+ free(file_path2);
+
+ return ret;
+}