From 950371be29d029179ac5cd0ad2dfdbfcd4467b96 Mon Sep 17 00:00:00 2001 From: Avra Sengupta Date: Mon, 27 May 2013 22:23:57 +0530 Subject: move 'xlators/marker/utils/' to 'geo-replication/' directory Change-Id: Ibd0faefecc15b6713eda28bc96794ae58aff45aa BUG: 847839 Original Author: Amar Tumballi Signed-off-by: Avra Sengupta Reviewed-on: http://review.gluster.org/5133 Tested-by: Gluster Build System Reviewed-by: Vijay Bellur --- Makefile.am | 6 +- configure.ac | 8 +- geo-replication/Makefile.am | 3 + geo-replication/src/Makefile.am | 26 + geo-replication/src/gsyncd.c | 367 ++++++++ geo-replication/src/procdiggy.c | 121 +++ geo-replication/src/procdiggy.h | 20 + geo-replication/syncdaemon/Makefile.am | 7 + geo-replication/syncdaemon/README.md | 81 ++ geo-replication/syncdaemon/__codecheck.py | 46 + geo-replication/syncdaemon/__init__.py | 0 geo-replication/syncdaemon/configinterface.py | 224 +++++ geo-replication/syncdaemon/gconf.py | 20 + geo-replication/syncdaemon/gsyncd.py | 419 +++++++++ geo-replication/syncdaemon/libcxattr.py | 72 ++ geo-replication/syncdaemon/master.py | 961 ++++++++++++++++++++ geo-replication/syncdaemon/monitor.py | 129 +++ geo-replication/syncdaemon/repce.py | 225 +++++ geo-replication/syncdaemon/resource.py | 972 +++++++++++++++++++++ geo-replication/syncdaemon/syncdutils.py | 288 ++++++ tests/bugs/bug-861945.t | 51 -- tests/bugs/bug-864499.t | 20 - tests/bugs/bug-864506.t | 33 - tests/bugs/bug-874272.t | 48 - xlators/features/marker/Makefile.am | 2 +- xlators/features/marker/utils/Makefile.am | 3 - xlators/features/marker/utils/src/Makefile.am | 26 - xlators/features/marker/utils/src/gsyncd.c | 367 -------- xlators/features/marker/utils/src/procdiggy.c | 121 --- xlators/features/marker/utils/src/procdiggy.h | 20 - .../features/marker/utils/syncdaemon/Makefile.am | 7 - xlators/features/marker/utils/syncdaemon/README.md | 81 -- .../marker/utils/syncdaemon/__codecheck.py | 46 - .../features/marker/utils/syncdaemon/__init__.py | 0 .../marker/utils/syncdaemon/configinterface.py | 224 ----- xlators/features/marker/utils/syncdaemon/gconf.py | 20 - xlators/features/marker/utils/syncdaemon/gsyncd.py | 419 --------- .../features/marker/utils/syncdaemon/libcxattr.py | 72 -- xlators/features/marker/utils/syncdaemon/master.py | 961 -------------------- .../features/marker/utils/syncdaemon/monitor.py | 129 --- xlators/features/marker/utils/syncdaemon/repce.py | 225 ----- .../features/marker/utils/syncdaemon/resource.py | 972 --------------------- .../features/marker/utils/syncdaemon/syncdutils.py | 288 ------ 43 files changed, 3990 insertions(+), 4140 deletions(-) create mode 100644 geo-replication/Makefile.am create mode 100644 geo-replication/src/Makefile.am create mode 100644 geo-replication/src/gsyncd.c create mode 100644 geo-replication/src/procdiggy.c create mode 100644 geo-replication/src/procdiggy.h create mode 100644 geo-replication/syncdaemon/Makefile.am create mode 100644 geo-replication/syncdaemon/README.md create mode 100644 geo-replication/syncdaemon/__codecheck.py create mode 100644 geo-replication/syncdaemon/__init__.py create mode 100644 geo-replication/syncdaemon/configinterface.py create mode 100644 geo-replication/syncdaemon/gconf.py create mode 100644 geo-replication/syncdaemon/gsyncd.py create mode 100644 geo-replication/syncdaemon/libcxattr.py create mode 100644 geo-replication/syncdaemon/master.py create mode 100644 geo-replication/syncdaemon/monitor.py create mode 100644 geo-replication/syncdaemon/repce.py create mode 100644 
geo-replication/syncdaemon/resource.py create mode 100644 geo-replication/syncdaemon/syncdutils.py delete mode 100755 tests/bugs/bug-861945.t delete mode 100644 tests/bugs/bug-864499.t delete mode 100755 tests/bugs/bug-864506.t delete mode 100755 tests/bugs/bug-874272.t delete mode 100644 xlators/features/marker/utils/Makefile.am delete mode 100644 xlators/features/marker/utils/src/Makefile.am delete mode 100644 xlators/features/marker/utils/src/gsyncd.c delete mode 100644 xlators/features/marker/utils/src/procdiggy.c delete mode 100644 xlators/features/marker/utils/src/procdiggy.h delete mode 100644 xlators/features/marker/utils/syncdaemon/Makefile.am delete mode 100644 xlators/features/marker/utils/syncdaemon/README.md delete mode 100644 xlators/features/marker/utils/syncdaemon/__codecheck.py delete mode 100644 xlators/features/marker/utils/syncdaemon/__init__.py delete mode 100644 xlators/features/marker/utils/syncdaemon/configinterface.py delete mode 100644 xlators/features/marker/utils/syncdaemon/gconf.py delete mode 100644 xlators/features/marker/utils/syncdaemon/gsyncd.py delete mode 100644 xlators/features/marker/utils/syncdaemon/libcxattr.py delete mode 100644 xlators/features/marker/utils/syncdaemon/master.py delete mode 100644 xlators/features/marker/utils/syncdaemon/monitor.py delete mode 100644 xlators/features/marker/utils/syncdaemon/repce.py delete mode 100644 xlators/features/marker/utils/syncdaemon/resource.py delete mode 100644 xlators/features/marker/utils/syncdaemon/syncdutils.py diff --git a/Makefile.am b/Makefile.am index 85054dcab38..eba8ed12da6 100644 --- a/Makefile.am +++ b/Makefile.am @@ -6,7 +6,7 @@ EXTRA_DIST = autogen.sh \ gen-headers.py SUBDIRS = argp-standalone libglusterfs rpc api xlators glusterfsd \ - $(FUSERMOUNT_SUBDIR) doc extras cli + $(FUSERMOUNT_SUBDIR) doc extras cli @SYNCDAEMON_SUBDIR@ pkgconfigdir = @pkgconfigdir@ pkgconfig_DATA = glusterfs-api.pc libgfchangelog.pc @@ -20,7 +20,9 @@ gitclean: distclean rm -fr autom4te.cache rm -f missing aclocal.m4 config.h.in config.guess config.sub ltmain.sh install-sh configure depcomp rm -fr argp-standalone/autom4te.cache - rm -f argp-standalone/aclocal.m4 argp-standalone/config.h.in argp-standalone/configure argp-standalone/depcomp argp-standalone/install-sh argp-standalone/missing + rm -f argp-standalone/aclocal.m4 argp-standalone/config.h.in + rm -f argp-standalone/configure argp-standalone/depcomp + rm -f argp-standalone/install-sh argp-standalone/missing dist-hook: (cd $(srcdir) && git diff && echo ===== git log ==== && git log) > $(distdir)/ChangeLog diff --git a/configure.ac b/configure.ac index c9a816d768c..cfccf3f9e64 100644 --- a/configure.ac +++ b/configure.ac @@ -107,9 +107,6 @@ AC_CONFIG_FILES([Makefile xlators/features/quota/src/Makefile xlators/features/marker/Makefile xlators/features/marker/src/Makefile - xlators/features/marker/utils/Makefile - xlators/features/marker/utils/src/Makefile - xlators/features/marker/utils/syncdaemon/Makefile xlators/features/read-only/Makefile xlators/features/read-only/src/Makefile xlators/features/mac-compat/Makefile @@ -158,6 +155,9 @@ AC_CONFIG_FILES([Makefile libgfchangelog.pc api/Makefile api/src/Makefile + geo-replication/Makefile + geo-replication/src/Makefile + geo-replication/syncdaemon/Makefile glusterfs.spec]) AC_CANONICAL_HOST @@ -422,7 +422,7 @@ case $host_os in esac SYNCDAEMON_COMPILE=0 if test "x$enable_georeplication" != "xno"; then - SYNCDAEMON_SUBDIR=utils + SYNCDAEMON_SUBDIR=geo-replication SYNCDAEMON_COMPILE=1 BUILD_SYNCDAEMON="yes" diff 
--git a/geo-replication/Makefile.am b/geo-replication/Makefile.am new file mode 100644 index 00000000000..556951d9fb7 --- /dev/null +++ b/geo-replication/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = syncdaemon src + +CLEANFILES = diff --git a/geo-replication/src/Makefile.am b/geo-replication/src/Makefile.am new file mode 100644 index 00000000000..9e410cda633 --- /dev/null +++ b/geo-replication/src/Makefile.am @@ -0,0 +1,26 @@ +gsyncddir = $(libexecdir)/glusterfs + +gsyncd_PROGRAMS = gsyncd + +gsyncd_SOURCES = gsyncd.c procdiggy.c + +gsyncd_LDADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \ + $(GF_GLUSTERFS_LIBS) + +gsyncd_LDFLAGS = $(GF_LDFLAGS) + +noinst_HEADERS = procdiggy.h + +AM_CPPFLAGS = $(GF_CPPFLAGS) \ + -I$(top_srcdir)/libglusterfs/src\ + -DGSYNCD_PREFIX=\"$(libexecdir)/glusterfs\"\ + -DUSE_LIBGLUSTERFS\ + -DSBIN_DIR=\"$(sbindir)\" -DPYTHON=\"$(PYTHON)\" + +AM_CFLAGS = -Wall $(GF_CFLAGS) + + +CLEANFILES = + +$(top_builddir)/libglusterfs/src/libglusterfs.la: + $(MAKE) -C $(top_builddir)/libglusterfs/src/ all diff --git a/geo-replication/src/gsyncd.c b/geo-replication/src/gsyncd.c new file mode 100644 index 00000000000..9c4a5bdffb3 --- /dev/null +++ b/geo-replication/src/gsyncd.c @@ -0,0 +1,367 @@ +/* + Copyright (c) 2011-2012 Red Hat, Inc. + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include +#include +#include +#include +#include /* for PATH_MAX */ + +/* NOTE (USE_LIBGLUSTERFS): + * ------------------------ + * When USE_LIBGLUSTERFS debugging sumbol is passed; perform + * glusterfs translator like initialization so that glusterfs + * globals, contexts are valid when glustefs api's are invoked. + * We unconditionally pass then while building gsyncd binary. 
+ */ +#ifdef USE_LIBGLUSTERFS +#include "glusterfs.h" +#include "globals.h" +#endif + +#include "common-utils.h" +#include "run.h" +#include "procdiggy.h" + +#define _GLUSTERD_CALLED_ "_GLUSTERD_CALLED_" +#define _GSYNCD_DISPATCHED_ "_GSYNCD_DISPATCHED_" +#define GSYNCD_CONF "geo-replication/gsyncd.conf" +#define GSYNCD_PY "gsyncd.py" +#define RSYNC "rsync" + +int restricted = 0; + +static int +duplexpand (void **buf, size_t tsiz, size_t *len) +{ + size_t osiz = tsiz * *len; + char *p = realloc (*buf, osiz << 1); + if (!p) { + free(*buf); + return -1; + } + + memset (p + osiz, 0, osiz); + *buf = p; + *len <<= 1; + + return 0; +} + +static int +str2argv (char *str, char ***argv) +{ + char *p = NULL; + char *savetok = NULL; + int argc = 0; + size_t argv_len = 32; + int ret = 0; + + assert (str); + str = strdup (str); + if (!str) + return -1; + + *argv = calloc (argv_len, sizeof (**argv)); + if (!*argv) + goto error; + + while ((p = strtok_r (str, " ", &savetok))) { + str = NULL; + + argc++; + if (argc == argv_len) { + ret = duplexpand ((void *)argv, + sizeof (**argv), + &argv_len); + if (ret == -1) + goto error; + } + (*argv)[argc - 1] = p; + } + + return argc; + + error: + fprintf (stderr, "out of memory\n"); + return -1; +} + +static int +invoke_gsyncd (int argc, char **argv) +{ + char config_file[PATH_MAX] = {0,}; + size_t gluster_workdir_len = 0; + runner_t runner = {0,}; + int i = 0; + int j = 0; + char *nargv[argc + 4]; + char *python = NULL; + + if (restricted) { + size_t len; + /* in restricted mode we forcibly use the system-wide config */ + runinit (&runner); + runner_add_args (&runner, SBIN_DIR"/gluster", + "--log-file=-", "system::", "getwd", + NULL); + runner_redir (&runner, STDOUT_FILENO, RUN_PIPE); + if (runner_start (&runner) == 0 && + fgets (config_file, PATH_MAX, + runner_chio (&runner, STDOUT_FILENO)) != NULL && + (len = strlen (config_file)) && + config_file[len - 1] == '\n' && + runner_end (&runner) == 0) + gluster_workdir_len = len - 1; + + if (gluster_workdir_len) { + if (gluster_workdir_len + 1 + strlen (GSYNCD_CONF) + 1 > + PATH_MAX) + goto error; + config_file[gluster_workdir_len] = '/'; + strcat (config_file, GSYNCD_CONF); + } else + goto error; + + if (setenv ("_GSYNCD_RESTRICTED_", "1", 1) == -1) + goto error; + } + + if (chdir ("/") == -1) + goto error; + + j = 0; + python = getenv("PYTHON"); + if(!python) + python = PYTHON; + nargv[j++] = python; + nargv[j++] = GSYNCD_PREFIX"/python/syncdaemon/"GSYNCD_PY; + for (i = 1; i < argc; i++) + nargv[j++] = argv[i]; + if (config_file[0]) { + nargv[j++] = "-c"; + nargv[j++] = config_file; + } + nargv[j++] = NULL; + + execvp (python, nargv); + + fprintf (stderr, "exec of '%s' failed\n", python); + return 127; + + error: + fprintf (stderr, "gsyncd initializaion failed\n"); + return 1; +} + + +static int +find_gsyncd (pid_t pid, pid_t ppid, char *name, void *data) +{ + char buf[NAME_MAX * 2] = {0,}; + char path[PATH_MAX] = {0,}; + char *p = NULL; + int zeros = 0; + int ret = 0; + int fd = -1; + pid_t *pida = (pid_t *)data; + + if (ppid != pida[0]) + return 0; + + sprintf (path, PROC"/%d/cmdline", pid); + fd = open (path, O_RDONLY); + if (fd == -1) + return 0; + ret = read (fd, buf, sizeof (buf)); + close (fd); + if (ret == -1) + return 0; + for (zeros = 0, p = buf; zeros < 2 && p < buf + ret; p++) + zeros += !*p; + + ret = 0; + switch (zeros) { + case 2: + if ((strcmp (basename (buf), basename (PYTHON)) || + strcmp (basename (buf + strlen (buf) + 1), GSYNCD_PY)) == 0) { + ret = 1; + break; + } + /* fallthrough */ + case 1: 
+ if (strcmp (basename (buf), GSYNCD_PY) == 0) + ret = 1; + } + + if (ret == 1) { + if (pida[1] != -1) { + fprintf (stderr, GSYNCD_PY" sibling is not unique"); + return -1; + } + pida[1] = pid; + } + + return 0; +} + +static int +invoke_rsync (int argc, char **argv) +{ + int i = 0; + char path[PATH_MAX] = {0,}; + pid_t pid = -1; + pid_t ppid = -1; + pid_t pida[] = {-1, -1}; + char *name = NULL; + char buf[PATH_MAX + 1] = {0,}; + int ret = 0; + + assert (argv[argc] == NULL); + + if (argc < 2 || strcmp (argv[1], "--server") != 0) + goto error; + + for (i = 2; i < argc && argv[i][0] == '-'; i++); + + if (!(i == argc - 2 && strcmp (argv[i], ".") == 0 && argv[i + 1][0] == '/')) { + fprintf (stderr, "need an rsync invocation without protected args\n"); + goto error; + } + + /* look up sshd we are spawned from */ + for (pid = getpid () ;; pid = ppid) { + ppid = pidinfo (pid, &name); + if (ppid < 0) { + fprintf (stderr, "sshd ancestor not found\n"); + goto error; + } + if (strcmp (name, "sshd") == 0) { + GF_FREE (name); + break; + } + GF_FREE (name); + } + /* look up "ssh-sibling" gsyncd */ + pida[0] = pid; + ret = prociter (find_gsyncd, pida); + if (ret == -1 || pida[1] == -1) { + fprintf (stderr, "gsyncd sibling not found\n"); + goto error; + } + /* check if rsync target matches gsyncd target */ + sprintf (path, PROC"/%d/cwd", pida[1]); + ret = readlink (path, buf, sizeof (buf)); + if (ret == -1 || ret == sizeof (buf)) + goto error; + if (strcmp (argv[argc - 1], "/") == 0 /* root dir cannot be a target */ || + (strcmp (argv[argc - 1], path) /* match against gluster target */ && + strcmp (argv[argc - 1], buf) /* match against file target */) != 0) { + fprintf (stderr, "rsync target does not match "GEOREP" session\n"); + goto error; + } + + argv[0] = RSYNC; + + execvp (RSYNC, argv); + + fprintf (stderr, "exec of "RSYNC" failed\n"); + return 127; + + error: + fprintf (stderr, "disallowed "RSYNC" invocation\n"); + return 1; +} + + +struct invocable { + char *name; + int (*invoker) (int argc, char **argv); +}; + +struct invocable invocables[] = { + { "rsync", invoke_rsync }, + { "gsyncd", invoke_gsyncd }, + { NULL, NULL} +}; + +int +main (int argc, char **argv) +{ + char *evas = NULL; + struct invocable *i = NULL; + char *b = NULL; + char *sargv = NULL; + +#ifdef USE_LIBGLUSTERFS + glusterfs_ctx_t *ctx = NULL; + + ctx = glusterfs_ctx_new (); + if (!ctx) + return ENOMEM; + + if (glusterfs_globals_init (ctx)) + return 1; + + THIS->ctx = ctx; +#endif + + evas = getenv (_GLUSTERD_CALLED_); + if (evas && strcmp (evas, "1") == 0) + /* OK, we know glusterd called us, no need to look for further config + * ... 
altough this conclusion should not inherit to our children + */ + unsetenv (_GLUSTERD_CALLED_); + else { + /* we regard all gsyncd invocations unsafe + * that do not come from glusterd and + * therefore restrict it + */ + restricted = 1; + + if (!getenv (_GSYNCD_DISPATCHED_)) { + evas = getenv ("SSH_ORIGINAL_COMMAND"); + if (evas) + sargv = evas; + else { + evas = getenv ("SHELL"); + if (evas && strcmp (basename (evas), "gsyncd") == 0 && + argc == 3 && strcmp (argv[1], "-c") == 0) + sargv = argv[2]; + } + } + + } + + if (!(sargv && restricted)) + return invoke_gsyncd (argc, argv); + + argc = str2argv (sargv, &argv); + if (argc == -1 || setenv (_GSYNCD_DISPATCHED_, "1", 1) == -1) { + fprintf (stderr, "internal error\n"); + return 1; + } + + b = basename (argv[0]); + for (i = invocables; i->name; i++) { + if (strcmp (b, i->name) == 0) + return i->invoker (argc, argv); + } + + fprintf (stderr, "invoking %s in restricted SSH session is not allowed\n", + b); + + return 1; +} diff --git a/geo-replication/src/procdiggy.c b/geo-replication/src/procdiggy.c new file mode 100644 index 00000000000..1eba414c116 --- /dev/null +++ b/geo-replication/src/procdiggy.c @@ -0,0 +1,121 @@ +/* + Copyright (c) 2011-2012 Red Hat, Inc. + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include +#include +#include +#include +#include +#include /* for PATH_MAX */ + +#include "common-utils.h" +#include "procdiggy.h" + +pid_t +pidinfo (pid_t pid, char **name) +{ + char buf[NAME_MAX * 2] = {0,}; + FILE *f = NULL; + char path[PATH_MAX] = {0,}; + char *p = NULL; + int ret = 0; + + sprintf (path, PROC"/%d/status", pid); + + f = fopen (path, "r"); + if (!f) + return -1; + + if (name) + *name = NULL; + for (;;) { + size_t len; + memset (buf, 0, sizeof (buf)); + if (fgets (buf, sizeof (buf), f) == NULL || + (len = strlen (buf)) == 0 || + buf[len - 1] != '\n') { + pid = -1; + goto out; + } + buf[len - 1] = '\0'; + + if (name && !*name) { + p = strtail (buf, "Name:"); + if (p) { + while (isspace (*++p)); + *name = gf_strdup (p); + if (!*name) { + pid = -2; + goto out; + } + continue; + } + } + + p = strtail (buf, "PPid:"); + if (p) + break; + } + + while (isspace (*++p)); + ret = gf_string2int (p, &pid); + if (ret == -1) + pid = -1; + + out: + fclose (f); + if (pid == -1 && name && *name) + GF_FREE (name); + if (pid == -2) + fprintf (stderr, "out of memory\n"); + return pid; +} + +int +prociter (int (*proch) (pid_t pid, pid_t ppid, char *tmpname, void *data), + void *data) +{ + char *name = NULL; + DIR *d = NULL; + struct dirent *de = NULL; + pid_t pid = -1; + pid_t ppid = -1; + int ret = 0; + + d = opendir (PROC); + if (!d) + return -1; + while (errno = 0, de = readdir (d)) { + if (gf_string2int (de->d_name, &pid) != -1 && pid >= 0) { + ppid = pidinfo (pid, &name); + switch (ppid) { + case -1: continue; + case -2: ret = -1; break; + } + ret = proch (pid, ppid, name, data); + GF_FREE (name); + if (ret) + break; + } + } + closedir (d); + if (!de && errno) { + fprintf (stderr, "failed to traverse "PROC" (%s)\n", + strerror (errno)); + ret = -1; + } + + return ret; +} diff --git a/geo-replication/src/procdiggy.h b/geo-replication/src/procdiggy.h new file mode 100644 index 00000000000..56dfc4eb213 --- /dev/null +++ 
b/geo-replication/src/procdiggy.h @@ -0,0 +1,20 @@ +/* + Copyright (c) 2011-2012 Red Hat, Inc. + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ +#ifdef __NetBSD__ +#include +#endif /* __NetBSD__ */ + +#define PROC "/proc" + +pid_t pidinfo (pid_t pid, char **name); + +int prociter (int (*proch) (pid_t pid, pid_t ppid, char *name, void *data), + void *data); + diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am new file mode 100644 index 00000000000..c19f6b45919 --- /dev/null +++ b/geo-replication/syncdaemon/Makefile.am @@ -0,0 +1,7 @@ +syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon + +syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \ + resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \ + $(top_builddir)/contrib/ipaddr-py/ipaddr.py + +CLEANFILES = diff --git a/geo-replication/syncdaemon/README.md b/geo-replication/syncdaemon/README.md new file mode 100644 index 00000000000..d45006932d1 --- /dev/null +++ b/geo-replication/syncdaemon/README.md @@ -0,0 +1,81 @@ +gsycnd, the Gluster Syncdaemon +============================== + +REQUIREMENTS +------------ + +_gsyncd_ is a program which can operate either in _master_ or in _slave_ mode. +Requirements are categorized according to this. + +* supported OS is GNU/Linux +* Python >= 2.5, or 2.4 with Ctypes (see below) (both) +* OpenSSH >= 4.0 (master) / SSH2 compliant sshd (eg. openssh) (slave) +* rsync (both) +* glusterfs with marker support (master); glusterfs (optional on slave) +* FUSE; for supported versions consult glusterfs + +INSTALLATION +------------ + +As of now, the supported way of operation is running from the source directory. + +If you use Python 2.4.x, you need to install the [Ctypes module](http://python.net/crew/theller/ctypes/). + +CONFIGURATION +------------- + +gsyncd tunables are a subset of the long command-line options; for listing them, +type + + gsyncd.py --help + +and see the long options up to "--config-file". (The leading double dash should be omitted; +interim underscores and dashes are interchangeable.) The set of options bear some resemblance +to those of glusterfs and rsync. + +The config file format matches the following syntax: + + : + : + # comment + +By default (unless specified by the option `-c`), gsyncd looks for config file at _conf/gsyncd.conf_ +in the source tree. + +USAGE +----- + +gsyncd is a utilitly for continous mirroring, ie. it mirrors master to slave incrementally. +Assume we have a gluster volume _pop_ at localhost. We try to set up the following mirrors +for it with gysncd: + +1. _/data/mirror_ +2. local gluster volume _yow_ +3. _/data/far_mirror_ at example.com +4. gluster volume _moz_ at example.com + +The respective gsyncd invocations are (demoing some syntax sugaring): + +1. + + gsyncd.py gluster://localhost:pop file:///data/mirror + + or short form + + gsyncd.py :pop /data/mirror + +2. `gsyncd :pop :yow` +3. + + gsyncd.py :pop ssh://example.com:/data/far_mirror + + or short form + + gsyncd.py :pop example.com:/data/far_mirror + +4. `gsyncd.py :pop example.com::moz` + +gsyncd has to be available on both sides; it's location on the remote side has to be specified +via the "--remote-gsyncd" option (or "remote-gsyncd" config file parameter). 
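For illustration, a config file in this syntax could look like the following (the `peersrx . .` section is the catch-all form configinterface.py falls back to when no concrete master/slave pair is given; the paths are only placeholders):

    [peersrx . .]
    remote-gsyncd: /usr/local/libexec/glusterfs/gsyncd
    log-file: /var/log/glusterfs/geo-replication/gsyncd.log
    pid-file: /var/run/gsyncd.pid
    sync-jobs: 3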
(This option can also be +used for setting options on the remote side, although the suggested mode of operation is to +set parameters like log file / pid file in the configuration file.) diff --git a/geo-replication/syncdaemon/__codecheck.py b/geo-replication/syncdaemon/__codecheck.py new file mode 100644 index 00000000000..e3386afba8b --- /dev/null +++ b/geo-replication/syncdaemon/__codecheck.py @@ -0,0 +1,46 @@ +import os +import os.path +import sys +import tempfile +import shutil + +ipd = tempfile.mkdtemp(prefix = 'codecheck-aux') + +try: + # add a fake ipaddr module, we don't want to + # deal with the real one (just test our code) + f = open(os.path.join(ipd, 'ipaddr.py'), 'w') + f.write(""" +class IPAddress(object): + pass +class IPNetwork(list): + pass +""") + f.close() + sys.path.append(ipd) + + fl = os.listdir(os.path.dirname(sys.argv[0]) or '.') + fl.sort() + for f in fl: + if f[-3:] != '.py' or f[0] == '_': + continue + m = f[:-3] + sys.stdout.write('importing %s ...' % m) + __import__(m) + print(' OK.') + + def sys_argv_set(a): + sys.argv = sys.argv[:1] + a + + gsyncd = sys.modules['gsyncd'] + for a in [['--help'], ['--version'], ['--canonicalize-escape-url', '/foo']]: + print('>>> invoking program with args: %s' % ' '.join(a)) + pid = os.fork() + if not pid: + sys_argv_set(a) + gsyncd.main() + _, r = os.waitpid(pid, 0) + if r: + raise RuntimeError('invocation failed') +finally: + shutil.rmtree(ipd) diff --git a/geo-replication/syncdaemon/__init__.py b/geo-replication/syncdaemon/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/geo-replication/syncdaemon/configinterface.py b/geo-replication/syncdaemon/configinterface.py new file mode 100644 index 00000000000..e55bec519e9 --- /dev/null +++ b/geo-replication/syncdaemon/configinterface.py @@ -0,0 +1,224 @@ +try: + import ConfigParser +except ImportError: + # py 3 + import configparser as ConfigParser +import re +from string import Template + +from syncdutils import escape, unescape, norm, update_file, GsyncdError + +SECT_ORD = '__section_order__' +SECT_META = '__meta__' +config_version = 2.0 + +re_type = type(re.compile('')) + + +class MultiDict(object): + """a virtual dict-like class which functions as the union of underlying dicts""" + + def __init__(self, *dd): + self.dicts = dd + + def __getitem__(self, key): + val = None + for d in self.dicts: + if d.get(key): + val = d[key] + if not val: + raise KeyError(key) + return val + + +class GConffile(object): + """A high-level interface to ConfigParser which flattens the two-tiered + config layout by implenting automatic section dispatch based on initial + parameters. + + Also ensure section ordering in terms of their time of addition -- a compat + hack for Python < 2.7. 
+ """ + + def _normconfig(self): + """normalize config keys by s/-/_/g""" + for n, s in self.config._sections.items(): + if n.find('__') == 0: + continue + s2 = type(s)() + for k, v in s.items(): + if k.find('__') != 0: + k = norm(k) + s2[k] = v + self.config._sections[n] = s2 + + def __init__(self, path, peers, *dd): + """ + - .path: location of config file + - .config: underlying ConfigParser instance + - .peers: on behalf of whom we flatten .config + (master, or master-slave url pair) + - .auxdicts: template subtituents + """ + self.peers = peers + self.path = path + self.auxdicts = dd + self.config = ConfigParser.RawConfigParser() + self.config.read(path) + self._normconfig() + + def section(self, rx=False): + """get the section name of the section representing .peers in .config""" + peers = self.peers + if not peers: + peers = ['.', '.'] + rx = True + if rx: + st = 'peersrx' + else: + st = 'peers' + return ' '.join([st] + [escape(u) for u in peers]) + + @staticmethod + def parse_section(section): + """retrieve peers sequence encoded by section name + (as urls or regexen, depending on section type) + """ + sl = section.split() + st = sl.pop(0) + sl = [unescape(u) for u in sl] + if st == 'peersrx': + sl = [re.compile(u) for u in sl] + return sl + + def ord_sections(self): + """Return an ordered list of sections. + + Ordering happens based on the auxiliary + SECT_ORD section storing indices for each + section added through the config API. + + To not to go corrupt in case of manually + written config files, we take care to append + also those sections which are not registered + in SECT_ORD. + + Needed for python 2.{4,5,6} where ConfigParser + cannot yet order sections/options internally. + """ + so = {} + if self.config.has_section(SECT_ORD): + so = self.config._sections[SECT_ORD] + so2 = {} + for k, v in so.items(): + if k != '__name__': + so2[k] = int(v) + tv = 0 + if so2: + tv = max(so2.values()) + 1 + ss = [s for s in self.config.sections() if s.find('__') != 0] + for s in ss: + if s in so.keys(): + continue + so2[s] = tv + tv += 1 + def scmp(x, y): + return cmp(*(so2[s] for s in (x, y))) + ss.sort(scmp) + return ss + + def update_to(self, dct, allow_unresolved=False): + """update @dct from key/values of ours. + + key/values are collected from .config by filtering the regexp sections + according to match, and from .section. The values are treated as templates, + which are substituted from .auxdicts and (in case of regexp sections) + match groups. 
+ """ + if not self.peers: + raise GsyncdError('no peers given, cannot select matching options') + def update_from_sect(sect, mud): + for k, v in self.config._sections[sect].items(): + if k == '__name__': + continue + if allow_unresolved: + dct[k] = Template(v).safe_substitute(mud) + else: + dct[k] = Template(v).substitute(mud) + for sect in self.ord_sections(): + sp = self.parse_section(sect) + if isinstance(sp[0], re_type) and len(sp) == len(self.peers): + match = True + mad = {} + for i in range(len(sp)): + m = sp[i].search(self.peers[i]) + if not m: + match = False + break + for j in range(len(m.groups())): + mad['match%d_%d' % (i+1, j+1)] = m.groups()[j] + if match: + update_from_sect(sect, MultiDict(dct, mad, *self.auxdicts)) + if self.config.has_section(self.section()): + update_from_sect(self.section(), MultiDict(dct, *self.auxdicts)) + + def get(self, opt=None): + """print the matching key/value pairs from .config, + or if @opt given, the value for @opt (according to the + logic described in .update_to) + """ + d = {} + self.update_to(d, allow_unresolved = True) + if opt: + opt = norm(opt) + v = d.get(opt) + if v: + print(v) + else: + for k, v in d.iteritems(): + if k == '__name__': + continue + print("%s: %s" % (k, v)) + + def write(self, trfn, opt, *a, **kw): + """update on-disk config transactionally + + @trfn is the transaction function + """ + def mergeconf(f): + self.config = ConfigParser.RawConfigParser() + self.config.readfp(f) + self._normconfig() + if not self.config.has_section(SECT_META): + self.config.add_section(SECT_META) + self.config.set(SECT_META, 'version', config_version) + return trfn(norm(opt), *a, **kw) + def updateconf(f): + self.config.write(f) + update_file(self.path, updateconf, mergeconf) + + def _set(self, opt, val, rx=False): + """set @opt to @val in .section""" + sect = self.section(rx) + if not self.config.has_section(sect): + self.config.add_section(sect) + # regarding SECT_ORD, cf. 
ord_sections + if not self.config.has_section(SECT_ORD): + self.config.add_section(SECT_ORD) + self.config.set(SECT_ORD, sect, len(self.config._sections[SECT_ORD])) + self.config.set(sect, opt, val) + return True + + def set(self, opt, *a, **kw): + """perform ._set transactionally""" + self.write(self._set, opt, *a, **kw) + + def _delete(self, opt, rx=False): + """delete @opt from .section""" + sect = self.section(rx) + if self.config.has_section(sect): + return self.config.remove_option(sect, opt) + + def delete(self, opt, *a, **kw): + """perform ._delete transactionally""" + self.write(self._delete, opt, *a, **kw) diff --git a/geo-replication/syncdaemon/gconf.py b/geo-replication/syncdaemon/gconf.py new file mode 100644 index 00000000000..146c72a1825 --- /dev/null +++ b/geo-replication/syncdaemon/gconf.py @@ -0,0 +1,20 @@ +import os + +class GConf(object): + """singleton class to store globals + shared between gsyncd modules""" + + ssh_ctl_dir = None + ssh_ctl_args = None + cpid = None + pid_file_owned = False + log_exit = False + permanent_handles = [] + log_metadata = {} + + @classmethod + def setup_ssh_ctl(cls, ctld): + cls.ssh_ctl_dir = ctld + cls.ssh_ctl_args = ["-oControlMaster=auto", "-S", os.path.join(ctld, "gsycnd-ssh-%r@%h:%p")] + +gconf = GConf() diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py new file mode 100644 index 00000000000..387900e6ce8 --- /dev/null +++ b/geo-replication/syncdaemon/gsyncd.py @@ -0,0 +1,419 @@ +#!/usr/bin/env python + +import os +import os.path +import sys +import time +import logging +import signal +import optparse +import fcntl +import fnmatch +from optparse import OptionParser, SUPPRESS_HELP +from logging import Logger +from errno import EEXIST, ENOENT + +from ipaddr import IPAddress, IPNetwork + +from gconf import gconf +from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception +from syncdutils import GsyncdError, select, set_term_handler, privileged +from configinterface import GConffile +import resource +from monitor import monitor + +class GLogger(Logger): + """Logger customizations for gsyncd. + + It implements a log format similar to that of glusterfs. 
+ """ + + def makeRecord(self, name, level, *a): + rv = Logger.makeRecord(self, name, level, *a) + rv.nsecs = (rv.created - int(rv.created)) * 1000000 + fr = sys._getframe(4) + callee = fr.f_locals.get('self') + if callee: + ctx = str(type(callee)).split("'")[1].split('.')[-1] + else: + ctx = '' + if not hasattr(rv, 'funcName'): + rv.funcName = fr.f_code.co_name + rv.lvlnam = logging.getLevelName(level)[0] + rv.ctx = ctx + return rv + + @classmethod + def setup(cls, **kw): + lbl = kw.get('label', "") + if lbl: + lbl = '(' + lbl + ')' + lprm = {'datefmt': "%Y-%m-%d %H:%M:%S", + 'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"} + lprm.update(kw) + lvl = kw.get('level', logging.INFO) + lprm['level'] = lvl + logging.root = cls("root", lvl) + logging.setLoggerClass(cls) + logging.getLogger().handlers = [] + logging.basicConfig(**lprm) + + @classmethod + def _gsyncd_loginit(cls, **kw): + lkw = {} + if gconf.log_level: + lkw['level'] = gconf.log_level + if kw.get('log_file'): + if kw['log_file'] in ('-', '/dev/stderr'): + lkw['stream'] = sys.stderr + elif kw['log_file'] == '/dev/stdout': + lkw['stream'] = sys.stdout + else: + lkw['filename'] = kw['log_file'] + + cls.setup(label=kw.get('label'), **lkw) + + lkw.update({'saved_label': kw.get('label')}) + gconf.log_metadata = lkw + gconf.log_exit = True + +def startup(**kw): + """set up logging, pidfile grabbing, daemonization""" + if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn': + if not grabpidfile(): + sys.stderr.write("pidfile is taken, exiting.\n") + sys.exit(2) + gconf.pid_file_owned = True + + if kw.get('go_daemon') == 'should': + x, y = os.pipe() + gconf.cpid = os.fork() + if gconf.cpid: + os.close(x) + sys.exit() + os.close(y) + os.setsid() + dn = os.open(os.devnull, os.O_RDWR) + for f in (sys.stdin, sys.stdout, sys.stderr): + os.dup2(dn, f.fileno()) + if getattr(gconf, 'pid_file', None): + if not grabpidfile(gconf.pid_file + '.tmp'): + raise GsyncdError("cannot grab temporary pidfile") + os.rename(gconf.pid_file + '.tmp', gconf.pid_file) + # wait for parent to terminate + # so we can start up with + # no messing from the dirty + # ol' bustard + select((x,), (), ()) + os.close(x) + + GLogger._gsyncd_loginit(**kw) + +def main(): + """main routine, signal/exception handling boilerplates""" + gconf.starttime = time.time() + set_term_handler() + GLogger.setup() + excont = FreeObject(exval = 0) + try: + try: + main_i() + except: + log_raise_exception(excont) + finally: + finalize(exval = excont.exval) + +def main_i(): + """internal main routine + + parse command line, decide what action will be taken; + we can either: + - query/manipulate configuration + - format gsyncd urls using gsyncd's url parsing engine + - start service in following modes, in given stages: + - monitor: startup(), monitor() + - master: startup(), connect_remote(), connect(), service_loop() + - slave: startup(), connect(), service_loop() + """ + rconf = {'go_daemon': 'should'} + + def store_abs(opt, optstr, val, parser): + if val and val != '-': + val = os.path.abspath(val) + setattr(parser.values, opt.dest, val) + def store_local(opt, optstr, val, parser): + rconf[opt.dest] = val + def store_local_curry(val): + return lambda o, oo, vx, p: store_local(o, oo, val, p) + def store_local_obj(op, dmake): + return lambda o, oo, vx, p: store_local(o, oo, FreeObject(op=op, **dmake(vx)), p) + + op = OptionParser(usage="%prog [options...] 
", version="%prog 0.0.1") + op.add_option('--gluster-command-dir', metavar='DIR', default='') + op.add_option('--gluster-log-file', metavar='LOGF', default=os.devnull, type=str, action='callback', callback=store_abs) + op.add_option('--gluster-log-level', metavar='LVL') + op.add_option('--gluster-params', metavar='PRMS', default='') + op.add_option('--gluster-cli-options', metavar='OPTS', default='--log-file=-') + op.add_option('--mountbroker', metavar='LABEL') + op.add_option('-p', '--pid-file', metavar='PIDF', type=str, action='callback', callback=store_abs) + op.add_option('-l', '--log-file', metavar='LOGF', type=str, action='callback', callback=store_abs) + op.add_option('--log-file-mbr', metavar='LOGF', type=str, action='callback', callback=store_abs) + op.add_option('--state-file', metavar='STATF', type=str, action='callback', callback=store_abs) + op.add_option('--ignore-deletes', default=False, action='store_true') + op.add_option('--use-rsync-xattrs', default=False, action='store_true') + op.add_option('-L', '--log-level', metavar='LVL') + op.add_option('-r', '--remote-gsyncd', metavar='CMD', default=os.path.abspath(sys.argv[0])) + op.add_option('--volume-id', metavar='UUID') + op.add_option('--session-owner', metavar='ID') + op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh') + op.add_option('--rsync-command', metavar='CMD', default='rsync') + op.add_option('--rsync-options', metavar='OPTS', default='--sparse') + op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress') + op.add_option('--timeout', metavar='SEC', type=int, default=120) + op.add_option('--connection-timeout', metavar='SEC', type=int, default=60, help=SUPPRESS_HELP) + op.add_option('--sync-jobs', metavar='N', type=int, default=3) + op.add_option('--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP) + op.add_option('--allow-network', metavar='IPS', default='') + op.add_option('--socketdir', metavar='DIR') + op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs) + op.add_option('--checkpoint', metavar='LABEL', default='') + # tunables for failover/failback mechanism: + # None - gsyncd behaves as normal + # blind - gsyncd works with xtime pairs to identify + # candidates for synchronization + # wrapup - same as normal mode but does not assign + # xtimes to orphaned files + # see crawl() for usage of the above tunables + op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP) + + op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local) + # duh. 
need to specify dest or value will be mapped to None :S + op.add_option('--monitor', dest='monitor', action='callback', callback=store_local_curry(True)) + op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local) + op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True)) + op.add_option('-N', '--no-daemon', dest="go_daemon", action='callback', callback=store_local_curry('dont')) + op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a), + setattr(a[-1].values, 'log_file', '-'), + setattr(a[-1].values, 'log_level', 'DEBUG'))), + + for a in ('check', 'get'): + op.add_option('--config-' + a, metavar='OPT', type=str, dest='config', action='callback', + callback=store_local_obj(a, lambda vx: {'opt': vx})) + op.add_option('--config-get-all', dest='config', action='callback', callback=store_local_obj('get', lambda vx: {'opt': None})) + for m in ('', '-rx', '-glob'): + # call this code 'Pythonic' eh? + # have to define a one-shot local function to be able to inject (a value depending on the) + # iteration variable into the inner lambda + def conf_mod_opt_regex_variant(rx): + op.add_option('--config-set' + m, metavar='OPT VAL', type=str, nargs=2, dest='config', action='callback', + callback=store_local_obj('set', lambda vx: {'opt': vx[0], 'val': vx[1], 'rx': rx})) + op.add_option('--config-del' + m, metavar='OPT', type=str, dest='config', action='callback', + callback=store_local_obj('del', lambda vx: {'opt': vx, 'rx': rx})) + conf_mod_opt_regex_variant(m and m[1:] or False) + + op.add_option('--normalize-url', dest='url_print', action='callback', callback=store_local_curry('normal')) + op.add_option('--canonicalize-url', dest='url_print', action='callback', callback=store_local_curry('canon')) + op.add_option('--canonicalize-escape-url', dest='url_print', action='callback', callback=store_local_curry('canon_esc')) + + tunables = [ norm(o.get_opt_string()[2:]) for o in op.option_list if o.callback in (store_abs, 'store_true', None) and o.get_opt_string() not in ('--version', '--help') ] + remote_tunables = [ 'listen', 'go_daemon', 'timeout', 'session_owner', 'config_file', 'use_rsync_xattrs' ] + rq_remote_tunables = { 'listen': True } + + # precedence for sources of values: 1) commandline, 2) cfg file, 3) defaults + # -- for this to work out we need to tell apart defaults from explicitly set + # options... so churn out the defaults here and call the parser with virgin + # values container. 
+ defaults = op.get_default_values() + opts, args = op.parse_args(values=optparse.Values()) + confdata = rconf.get('config') + if not (len(args) == 2 or \ + (len(args) == 1 and rconf.get('listen')) or \ + (len(args) <= 2 and confdata) or \ + rconf.get('url_print')): + sys.stderr.write("error: incorrect number of arguments\n\n") + sys.stderr.write(op.get_usage() + "\n") + sys.exit(1) + + restricted = os.getenv('_GSYNCD_RESTRICTED_') + + if restricted: + allopts = {} + allopts.update(opts.__dict__) + allopts.update(rconf) + bannedtuns = set(allopts.keys()) - set(remote_tunables) + if bannedtuns: + raise GsyncdError('following tunables cannot be set with restricted SSH invocaton: ' + \ + ', '.join(bannedtuns)) + for k, v in rq_remote_tunables.items(): + if not k in allopts or allopts[k] != v: + raise GsyncdError('tunable %s is not set to value %s required for restricted SSH invocaton' % \ + (k, v)) + + confrx = getattr(confdata, 'rx', None) + if confrx: + # peers are regexen, don't try to parse them + if confrx == 'glob': + args = [ '\A' + fnmatch.translate(a) for a in args ] + canon_peers = args + namedict = {} + else: + rscs = [resource.parse_url(u) for u in args] + dc = rconf.get('url_print') + if dc: + for r in rscs: + print(r.get_url(**{'normal': {}, + 'canon': {'canonical': True}, + 'canon_esc': {'canonical': True, 'escaped': True}}[dc])) + return + local = remote = None + if rscs: + local = rscs[0] + if len(rscs) > 1: + remote = rscs[1] + if not local.can_connect_to(remote): + raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path)) + pa = ([], [], []) + urlprms = ({}, {'canonical': True}, {'canonical': True, 'escaped': True}) + for x in rscs: + for i in range(len(pa)): + pa[i].append(x.get_url(**urlprms[i])) + peers, canon_peers, canon_esc_peers = pa + # creating the namedict, a dict representing various ways of referring to / repreenting + # peers to be fillable in config templates + mods = (lambda x: x, lambda x: x[0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:]) + if remote: + rmap = { local: ('local', 'master'), remote: ('remote', 'slave') } + else: + rmap = { local: ('local', 'slave') } + namedict = {} + for i in range(len(rscs)): + x = rscs[i] + for name in rmap[x]: + for j in range(3): + namedict[mods[j](name)] = pa[j][i] + if x.scheme == 'gluster': + namedict[name + 'vol'] = x.volume + if not 'config_file' in rconf: + rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd.conf") + gcnf = GConffile(rconf['config_file'], canon_peers, defaults.__dict__, opts.__dict__, namedict) + + checkpoint_change = False + if confdata: + opt_ok = norm(confdata.opt) in tunables + [None] + if confdata.op == 'check': + if opt_ok: + sys.exit(0) + else: + sys.exit(1) + elif not opt_ok: + raise GsyncdError("not a valid option: " + confdata.opt) + if confdata.op == 'get': + gcnf.get(confdata.opt) + elif confdata.op == 'set': + gcnf.set(confdata.opt, confdata.val, confdata.rx) + elif confdata.op == 'del': + gcnf.delete(confdata.opt, confdata.rx) + # when modifying checkpoint, it's important to make a log + # of that, so in that case we go on to set up logging even + # if its just config invocation + if confdata.opt == 'checkpoint' and confdata.op in ('set', 'del') and \ + not confdata.rx: + checkpoint_change = True + if not checkpoint_change: + return + + gconf.__dict__.update(defaults.__dict__) + gcnf.update_to(gconf.__dict__) + gconf.__dict__.update(opts.__dict__) + gconf.configinterface = gcnf + + if restricted and gconf.allow_network: + 
ssh_conn = os.getenv('SSH_CONNECTION') + if not ssh_conn: + #legacy env var + ssh_conn = os.getenv('SSH_CLIENT') + if ssh_conn: + allowed_networks = [ IPNetwork(a) for a in gconf.allow_network.split(',') ] + client_ip = IPAddress(ssh_conn.split()[0]) + allowed = False + for nw in allowed_networks: + if client_ip in nw: + allowed = True + break + if not allowed: + raise GsyncdError("client IP address is not allowed") + + ffd = rconf.get('feedback_fd') + if ffd: + fcntl.fcntl(ffd, fcntl.F_SETFD, fcntl.FD_CLOEXEC) + + #normalize loglevel + lvl0 = gconf.log_level + if isinstance(lvl0, str): + lvl1 = lvl0.upper() + lvl2 = logging.getLevelName(lvl1) + # I have _never_ _ever_ seen such an utterly braindead + # error condition + if lvl2 == "Level " + lvl1: + raise GsyncdError('cannot recognize log level "%s"' % lvl0) + gconf.log_level = lvl2 + + if not privileged() and gconf.log_file_mbr: + gconf.log_file = gconf.log_file_mbr + + if checkpoint_change: + try: + GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf') + if confdata.op == 'set': + logging.info('checkpoint %s set' % confdata.val) + elif confdata.op == 'del': + logging.info('checkpoint info was reset') + except IOError: + if sys.exc_info()[1].errno == ENOENT: + # directory of log path is not present, + # which happens if we get here from + # a peer-multiplexed "config-set checkpoint" + # (as that directory is created only on the + # original node) + pass + else: + raise + return + + go_daemon = rconf['go_daemon'] + be_monitor = rconf.get('monitor') + + if not be_monitor and isinstance(remote, resource.SSH) and \ + go_daemon == 'should': + go_daemon = 'postconn' + log_file = None + else: + log_file = gconf.log_file + if be_monitor: + label = 'monitor' + elif remote: + #master + label = '' + else: + label = 'slave' + startup(go_daemon=go_daemon, log_file=log_file, label=label) + + if be_monitor: + return monitor() + + logging.info("syncing: %s" % " -> ".join(peers)) + resource.Popen.init_errhandler() + if remote: + go_daemon = remote.connect_remote(go_daemon=go_daemon) + if go_daemon: + startup(go_daemon=go_daemon, log_file=gconf.log_file) + # complete remote connection in child + remote.connect_remote(go_daemon='done') + local.connect() + if ffd: + os.close(ffd) + local.service_loop(*[r for r in [remote] if r]) + + +if __name__ == "__main__": + main() diff --git a/geo-replication/syncdaemon/libcxattr.py b/geo-replication/syncdaemon/libcxattr.py new file mode 100644 index 00000000000..f0a9d22920a --- /dev/null +++ b/geo-replication/syncdaemon/libcxattr.py @@ -0,0 +1,72 @@ +import os +from ctypes import * +from ctypes.util import find_library + +class Xattr(object): + """singleton that wraps the extended attribues system + interface for python using ctypes + + Just implement it to the degree we need it, in particular + - we need just the l*xattr variants, ie. 
we never want symlinks to be + followed + - don't need size discovery for getxattr, as we always know the exact + sizes we expect + """ + + libc = CDLL(find_library("libc")) + + @classmethod + def geterrno(cls): + return c_int.in_dll(cls.libc, 'errno').value + + @classmethod + def raise_oserr(cls): + errn = cls.geterrno() + raise OSError(errn, os.strerror(errn)) + + @classmethod + def _query_xattr(cls, path, siz, syscall, *a): + if siz: + buf = create_string_buffer('\0' * siz) + else: + buf = None + ret = getattr(cls.libc, syscall)(*((path,) + a + (buf, siz))) + if ret == -1: + cls.raise_oserr() + if siz: + return buf.raw[:ret] + else: + return ret + + @classmethod + def lgetxattr(cls, path, attr, siz=0): + return cls._query_xattr( path, siz, 'lgetxattr', attr) + + @classmethod + def llistxattr(cls, path, siz=0): + ret = cls._query_xattr(path, siz, 'llistxattr') + if isinstance(ret, str): + ret = ret.split('\0') + return ret + + @classmethod + def lsetxattr(cls, path, attr, val): + ret = cls.libc.lsetxattr(path, attr, val, len(val), 0) + if ret == -1: + cls.raise_oserr() + + @classmethod + def lremovexattr(cls, path, attr): + ret = cls.libc.lremovexattr(path, attr) + if ret == -1: + cls.raise_oserr() + + @classmethod + def llistxattr_buf(cls, path): + """listxattr variant with size discovery""" + size = cls.llistxattr(path) + if size == -1: + cls.raise_oserr() + if size == 0: + return [] + return cls.llistxattr(path, size) diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py new file mode 100644 index 00000000000..f903f30595d --- /dev/null +++ b/geo-replication/syncdaemon/master.py @@ -0,0 +1,961 @@ +import os +import sys +import time +import stat +import random +import signal +import logging +import socket +import errno +import re +from errno import ENOENT, ENODATA, EPIPE +from threading import currentThread, Condition, Lock +from datetime import datetime +try: + from hashlib import md5 as md5 +except ImportError: + # py 2.4 + from md5 import new as md5 + +from gconf import gconf +from syncdutils import FreeObject, Thread, GsyncdError, boolify, \ + escape, unescape, select + +URXTIME = (-1, 0) + +# Utility functions to help us to get to closer proximity +# of the DRY principle (no, don't look for elevated or +# perspectivistic things here) + +def _xtime_now(): + t = time.time() + sec = int(t) + nsec = int((t - sec) * 1000000) + return (sec, nsec) + +def _volinfo_hook_relax_foreign(self): + volinfo_sys = self.get_sys_volinfo() + fgn_vi = volinfo_sys[self.KFGN] + if fgn_vi: + expiry = fgn_vi['timeout'] - int(time.time()) + 1 + logging.info('foreign volume info found, waiting %d sec for expiry' % \ + expiry) + time.sleep(expiry) + volinfo_sys = self.get_sys_volinfo() + self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state, + volinfo_sys) + if self.inter_master: + raise GsyncdError("cannot be intermediate master in special mode") + return (volinfo_sys, state_change) + + +# The API! 
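# As a concrete illustration: with the default tunables (special_sync_mode
# unset, use_rsync_xattrs and ignore_deletes both off), the builder below
# effectively produces
#
#     class _GMaster(GMasterBase, NormalMixin, SendmarkNormalMixin,
#                    PurgeNormalMixin):
#         pass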
+ +def gmaster_builder(): + """produce the GMaster class variant corresponding + to sync mode""" + this = sys.modules[__name__] + modemixin = gconf.special_sync_mode + if not modemixin: + modemixin = 'normal' + logging.info('setting up master for %s sync mode' % modemixin) + modemixin = getattr(this, modemixin.capitalize() + 'Mixin') + sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin + purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin + class _GMaster(GMasterBase, modemixin, sendmarkmixin, purgemixin): + pass + return _GMaster + + +# Mixin classes that implement the data format +# and logic particularities of the certain +# sync modes + +class NormalMixin(object): + """normal geo-rep behavior""" + + minus_infinity = URXTIME + + # following staticmethods ideally would be + # methods of an xtime object (in particular, + # implementing the hooks needed for comparison + # operators), but at this point we don't yet + # have a dedicated xtime class + + @staticmethod + def serialize_xtime(xt): + return "%d.%d" % tuple(xt) + + @staticmethod + def deserialize_xtime(xt): + return tuple(int(x) for x in xt.split(".")) + + @staticmethod + def native_xtime(xt): + return xt + + @staticmethod + def xtime_geq(xt0, xt1): + return xt0 >= xt1 + + def make_xtime_opts(self, is_master, opts): + if not 'create' in opts: + opts['create'] = is_master and not self.inter_master + if not 'default_xtime' in opts: + if is_master and self.inter_master: + opts['default_xtime'] = ENODATA + else: + opts['default_xtime'] = URXTIME + + def xtime_low(self, server, path, **opts): + xt = server.xtime(path, self.uuid) + if isinstance(xt, int) and xt != ENODATA: + return xt + if xt == ENODATA or xt < self.volmark: + if opts['create']: + xt = _xtime_now() + server.set_xtime(path, self.uuid, xt) + else: + xt = opts['default_xtime'] + return xt + + def keepalive_payload_hook(self, timo, gap): + # first grab a reference as self.volinfo + # can be changed in main thread + vi = self.volinfo + if vi: + # then have a private copy which we can mod + vi = vi.copy() + vi['timeout'] = int(time.time()) + timo + else: + # send keep-alives more frequently to + # avoid a delay in announcing our volume info + # to slave if it becomes established in the + # meantime + gap = min(10, gap) + return (vi, gap) + + def volinfo_hook(self): + volinfo_sys = self.get_sys_volinfo() + self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state, + volinfo_sys) + return (volinfo_sys, state_change) + + def xtime_reversion_hook(self, path, xtl, xtr): + if xtr > xtl: + raise GsyncdError("timestamp corruption for " + path) + + def need_sync(self, e, xte, xtrd): + return xte > xtrd + + def set_slave_xtime(self, path, mark): + self.slave.server.set_xtime(path, self.uuid, mark) + +class WrapupMixin(NormalMixin): + """a variant that differs from normal in terms + of ignoring non-indexed files""" + + @staticmethod + def make_xtime_opts(is_master, opts): + if not 'create' in opts: + opts['create'] = False + if not 'default_xtime' in opts: + opts['default_xtime'] = URXTIME + + @staticmethod + def keepalive_payload_hook(timo, gap): + return (None, gap) + + def volinfo_hook(self): + return _volinfo_hook_relax_foreign(self) + +class BlindMixin(object): + """Geo-rep flavor using vectored xtime. 
+ + Coordinates are the master, slave uuid pair; + in master coordinate behavior is normal, + in slave coordinate we force synchronization + on any value difference (these are in disjunctive + relation, ie. if either orders the entry to be + synced, it shall be synced. + """ + + minus_infinity = (URXTIME, None) + + @staticmethod + def serialize_xtime(xt): + a = [] + for x in xt: + if not x: + x = ('None', '') + a.extend(x) + return '.'.join(str(n) for n in a) + + @staticmethod + def deserialize_xtime(xt): + a = xt.split(".") + a = (tuple(a[0:2]), tuple(a[3:4])) + b = [] + for p in a: + if p[0] == 'None': + p = None + else: + p = tuple(int(x) for x in p) + b.append(p) + return tuple(b) + + @staticmethod + def native_xtime(xt): + return xt[0] + + @staticmethod + def xtime_geq(xt0, xt1): + return (not xt1[0] or xt0[0] >= xt1[0]) and \ + (not xt1[1] or xt0[1] >= xt1[1]) + + @property + def ruuid(self): + if self.volinfo_r: + return self.volinfo_r['uuid'] + + @staticmethod + def make_xtime_opts(is_master, opts): + if not 'create' in opts: + opts['create'] = is_master + if not 'default_xtime' in opts: + opts['default_xtime'] = URXTIME + + def xtime_low(self, server, path, **opts): + xtd = server.xtime_vec(path, self.uuid, self.ruuid) + if isinstance(xtd, int): + return xtd + xt = (xtd[self.uuid], xtd[self.ruuid]) + if not xt[1] and (not xt[0] or xt[0] < self.volmark): + if opts['create']: + # not expected, but can happen if file originates + # from interrupted gsyncd transfer + logging.warn('have to fix up missing xtime on ' + path) + xt0 = _xtime_now() + server.set_xtime(path, self.uuid, xt0) + else: + xt0 = opts['default_xtime'] + xt = (xt0, xt[1]) + return xt + + @staticmethod + def keepalive_payload_hook(timo, gap): + return (None, gap) + + def volinfo_hook(self): + res = _volinfo_hook_relax_foreign(self) + volinfo_r_new = self.slave.server.native_volume_info() + if volinfo_r_new['retval']: + raise GsyncdError("slave is corrupt") + if getattr(self, 'volinfo_r', None): + if self.volinfo_r['uuid'] != volinfo_r_new['uuid']: + raise GsyncdError("uuid mismatch on slave") + self.volinfo_r = volinfo_r_new + return res + + def xtime_reversion_hook(self, path, xtl, xtr): + if not isinstance(xtr[0], int) and \ + (isinstance(xtl[0], int) or xtr[0] > xtl[0]): + raise GsyncdError("timestamp corruption for " + path) + + def need_sync(self, e, xte, xtrd): + if xte[0]: + if not xtrd[0] or xte[0] > xtrd[0]: + # there is outstanding diff at 0th pos, + # we can short-cut to true + return True + # we arrived to this point by either of these + # two possiblilites: + # - no outstanding difference at 0th pos, + # wanna see 1st pos if he raises veto + # against "no need to sync" proposal + # - no data at 0th pos, 1st pos will have + # to decide (due to xtime assignment, + # in this case 1st pos does carry data + # -- iow, if 1st pos did not have data, + # and 0th neither, 0th would have been + # force-feeded) + if not xte[1]: + # no data, no veto + return False + # the hard work: for 1st pos, + # the conduct is fetch corresponding + # slave data and do a "blind" comparison + # (ie. 
do not care who is newer, we trigger + # sync on non-identical xitmes) + xtr = self.xtime(e, self.slave) + return isinstance(xtr, int) or xte[1] != xtr[1] + + def set_slave_xtime(self, path, mark): + xtd = {} + for (u, t) in zip((self.uuid, self.ruuid), mark): + if t: + xtd[u] = t + self.slave.server.set_xtime_vec(path, xtd) + + +# Further mixins for certain tunable behaviors + +class SendmarkNormalMixin(object): + + def sendmark_regular(self, *a, **kw): + return self.sendmark(*a, **kw) + +class SendmarkRsyncMixin(object): + + def sendmark_regular(self, *a, **kw): + pass + + +class PurgeNormalMixin(object): + + def purge_missing(self, path, names): + self.slave.server.purge(path, names) + +class PurgeNoopMixin(object): + + def purge_missing(self, path, names): + pass + + + +class GMasterBase(object): + """abstract class impementling master role""" + + KFGN = 0 + KNAT = 1 + + def get_sys_volinfo(self): + """query volume marks on fs root + + err out on multiple foreign masters + """ + fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \ + self.master.server.native_volume_info() + fgn_vi = None + if fgn_vis: + if len(fgn_vis) > 1: + raise GsyncdError("cannot work with multiple foreign masters") + fgn_vi = fgn_vis[0] + return fgn_vi, nat_vi + + @property + def uuid(self): + if self.volinfo: + return self.volinfo['uuid'] + + @property + def volmark(self): + if self.volinfo: + return self.volinfo['volume_mark'] + + @property + def inter_master(self): + """decide if we are an intermediate master + in a cascading setup + """ + return self.volinfo_state[self.KFGN] and True or False + + def xtime(self, path, *a, **opts): + """get amended xtime + + as of amending, we can create missing xtime, or + determine a valid value if what we get is expired + (as of the volume mark expiry); way of amendig + depends on @opts and on subject of query (master + or slave). + """ + if a: + rsc = a[0] + else: + rsc = self.master + self.make_xtime_opts(rsc == self.master, opts) + return self.xtime_low(rsc.server, path, **opts) + + def __init__(self, master, slave): + self.master = master + self.slave = slave + self.jobtab = {} + self.syncer = Syncer(slave) + # crawls vs. turns: + # - self.crawls is simply the number of crawl() invocations on root + # - one turn is a maximal consecutive sequence of crawls so that each + # crawl in it detects a change to be synced + # - self.turns is the number of turns since start + # - self.total_turns is a limit so that if self.turns reaches it, then + # we exit (for diagnostic purposes) + # so, eg., if the master fs changes unceasingly, self.turns will remain 0. 
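# To make the numbers above concrete (on the reading that a turn is counted
# once it completes): if five consecutive crawls see change, change, nothing,
# change, nothing, then self.crawls ends up as 5 and self.turns as 2, while a
# master that changes on every crawl never completes its first turn, so
# self.turns stays 0 as noted above.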
+ self.crawls = 0 + self.turns = 0 + self.total_turns = int(gconf.turns) + self.lastreport = {'crawls': 0, 'turns': 0} + self.start = None + self.change_seen = None + self.syncTime=0 + self.lastSyncTime=0 + self.crawlStartTime=0 + self.crawlTime=0 + self.filesSynced=0 + self.bytesSynced=0 + # the authoritative (foreign, native) volinfo pair + # which lets us deduce what to do when we refetch + # the volinfos from system + uuid_preset = getattr(gconf, 'volume_id', None) + self.volinfo_state = (uuid_preset and {'uuid': uuid_preset}, None) + # the actual volinfo we make use of + self.volinfo = None + self.terminate = False + self.checkpoint_thread = None + + @classmethod + def _checkpt_param(cls, chkpt, prm, xtimish=True): + """use config backend to lookup a parameter belonging to + checkpoint @chkpt""" + cprm = getattr(gconf, 'checkpoint_' + prm, None) + if not cprm: + return + chkpt_mapped, val = cprm.split(':', 1) + if unescape(chkpt_mapped) != chkpt: + return + if xtimish: + val = cls.deserialize_xtime(val) + return val + + @classmethod + def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True): + """use config backend to store a parameter associated + with checkpoint @chkpt""" + if xtimish: + val = cls.serialize_xtime(val) + gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val)) + + @staticmethod + def humantime(*tpair): + """format xtime-like (sec, nsec) pair to human readable format""" + ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\ + strftime("%Y-%m-%d %H:%M:%S") + if len(tpair) > 1: + ts += '.' + str(tpair[1]) + return ts + + def get_extra_info(self): + str_info="\nFile synced : %d" %(self.filesSynced) + str_info+="\nBytes Synced : %d KB" %(self.syncer.bytesSynced) + str_info+="\nSync Time : %f seconds" %(self.syncTime) + self.crawlTime=datetime.now()-self.crawlStartTime + years , days =divmod(self.crawlTime.days,365.25) + years=int(years) + days=int(days) + + date="" + m, s = divmod(self.crawlTime.seconds, 60) + h, m = divmod(m, 60) + + if years!=0 : + date+=str(years)+" year " + if days!=0 : + date+=str(days)+" day " + if h!=0 : + date+=str(h)+" H : " + if m!=0 or h!=0 : + date+=str(m)+" M : " + + date+=str(s)+" S" + self.crawlTime=date + str_info+="\nCrawl Time : %s" %(str(self.crawlTime)) + str_info+="\n\0" + return str_info + + def checkpt_service(self, chan, chkpt, tgt): + """checkpoint service loop + + monitor and verify checkpoint status for @chkpt, and listen + for incoming requests for whom we serve a pretty-formatted + status report""" + if not chkpt: + # dummy loop for the case when there is no checkpt set + while True: + select([chan], [], []) + conn, _ = chan.accept() + conn.send(self.get_extra_info()) + conn.close() + completed = self._checkpt_param(chkpt, 'completed', xtimish=False) + if completed: + completed = tuple(int(x) for x in completed.split('.')) + while True: + s,_,_ = select([chan], [], [], (not completed) and 5 or None) + # either request made and we re-check to not + # give back stale data, or we still hunting for completion + if self.native_xtime(tgt) and self.native_xtime(tgt) < self.volmark: + # indexing has been reset since setting the checkpoint + status = "is invalid" + else: + xtr = self.xtime('.', self.slave) + if isinstance(xtr, int): + raise GsyncdError("slave root directory is unaccessible (%s)", + os.strerror(xtr)) + ncompleted = self.xtime_geq(xtr, tgt) + if completed and not ncompleted: # stale data + logging.warn("completion time %s for checkpoint %s became stale" % \ + 
(self.humantime(*completed), chkpt)) + completed = None + gconf.confdata.delete('checkpoint-completed') + if ncompleted and not completed: # just reaching completion + completed = "%.6f" % time.time() + self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False) + completed = tuple(int(x) for x in completed.split('.')) + logging.info("checkpoint %s completed" % chkpt) + status = completed and \ + "completed at " + self.humantime(completed[0]) or \ + "not reached yet" + if s: + conn = None + try: + conn, _ = chan.accept() + try: + conn.send(" | checkpoint %s %s %s" % (chkpt, status,self.get_extra_info())) + except: + exc = sys.exc_info()[1] + if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \ + exc.errno == EPIPE: + logging.debug('checkpoint client disconnected') + else: + raise + finally: + if conn: + conn.close() + + def start_checkpoint_thread(self): + """prepare and start checkpoint service""" + if self.checkpoint_thread or not ( + getattr(gconf, 'state_socket_unencoded', None) and getattr(gconf, 'socketdir', None) + ): + return + chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + state_socket = os.path.join(gconf.socketdir, md5(gconf.state_socket_unencoded).hexdigest() + ".socket") + try: + os.unlink(state_socket) + except: + if sys.exc_info()[0] == OSError: + pass + chan.bind(state_socket) + chan.listen(1) + checkpt_tgt = None + if gconf.checkpoint: + checkpt_tgt = self._checkpt_param(gconf.checkpoint, 'target') + if not checkpt_tgt: + checkpt_tgt = self.xtime('.') + if isinstance(checkpt_tgt, int): + raise GsyncdError("master root directory is unaccessible (%s)", + os.strerror(checkpt_tgt)) + self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt) + logging.debug("checkpoint target %s has been determined for checkpoint %s" % \ + (repr(checkpt_tgt), gconf.checkpoint)) + t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt)) + t.start() + self.checkpoint_thread = t + + def crawl_loop(self): + """start the keep-alive thread and iterate .crawl""" + timo = int(gconf.timeout or 0) + if timo > 0: + def keep_alive(): + while True: + vi, gap = self.keepalive_payload_hook(timo, timo * 0.5) + self.slave.server.keep_alive(vi) + time.sleep(gap) + t = Thread(target=keep_alive) + t.start() + self.lastreport['time'] = time.time() + self.crawlStartTime=datetime.now() + while not self.terminate: + self.crawl() + + def add_job(self, path, label, job, *a, **kw): + """insert @job function to job table at @path with @label""" + if self.jobtab.get(path) == None: + self.jobtab[path] = [] + self.jobtab[path].append((label, a, lambda : job(*a, **kw))) + + def add_failjob(self, path, label): + """invoke .add_job with a job that does nothing just fails""" + logging.debug('salvaged: ' + label) + self.add_job(path, label, lambda: False) + + def wait(self, path, *args): + """perform jobs registered for @path + + Reset jobtab entry for @path, + determine success as the conjuction of + success of all the jobs. In case of + success, call .sendmark on @path + """ + jobs = self.jobtab.pop(path, []) + succeed = True + for j in jobs: + ret = j[-1]() + if not ret: + succeed = False + if succeed: + self.sendmark(path, *args) + return succeed + + def sendmark(self, path, mark, adct=None): + """update slave side xtime for @path to master side xtime + + also can send a setattr payload (see Server.setattr). 
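The add_job/add_failjob/wait trio just above implements a small per-path job table: jobs are stored as thunks, and a single failing job suppresses the sendmark so the path is retried on the next crawl. A reduced, self-contained sketch of that contract (simplified, hypothetical names, not the methods themselves):

    jobtab = {}

    def add_job(path, label, job, *a):
        jobtab.setdefault(path, []).append((label, a, lambda: job(*a)))

    def wait(path):
        jobs = jobtab.pop(path, [])
        succeed = True
        for label, args, thunk in jobs:
            if not thunk():
                succeed = False          # run every job, remember any failure
        if succeed:
            print('sendmark', path)      # only mark the path when all jobs succeeded
        return succeed

    add_job('.', 'reg', lambda: True)
    add_job('.', 'fail', lambda: False)  # what add_failjob would register
    print(wait('.'))                     # False -> '.' gets re-synced on the next crawl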
+ """ + if adct: + self.slave.server.setattr(path, adct) + self.set_slave_xtime(path, mark) + + @staticmethod + def volinfo_state_machine(volinfo_state, volinfo_sys): + """compute new volinfo_state from old one and incoming + as of current system state, also indicating if there was a + change regarding which volume mark is the authoritative one + + @volinfo_state, @volinfo_sys are pairs of volume mark dicts + (foreign, native). + + Note this method is marked as static, ie. the computation is + pure, without reliance on any excess implicit state. State + transitions which are deemed as ambiguous or banned will raise + an exception. + + """ + # store the value below "boxed" to emulate proper closures + # (variables of the enclosing scope are available inner functions + # provided they are no reassigned; mutation is OK). + param = FreeObject(relax_mismatch = False, state_change = None, index=-1) + def select_vi(vi0, vi): + param.index += 1 + if vi and (not vi0 or vi0['uuid'] == vi['uuid']): + if not vi0 and not param.relax_mismatch: + param.state_change = param.index + # valid new value found; for the rest, we are graceful about + # uuid mismatch + param.relax_mismatch = True + return vi + if vi0 and vi and vi0['uuid'] != vi['uuid'] and not param.relax_mismatch: + # uuid mismatch for master candidate, bail out + raise GsyncdError("aborting on uuid change from %s to %s" % \ + (vi0['uuid'], vi['uuid'])) + # fall back to old + return vi0 + newstate = tuple(select_vi(*vip) for vip in zip(volinfo_state, volinfo_sys)) + srep = lambda vi: vi and vi['uuid'][0:8] + logging.debug('(%s, %s) << (%s, %s) -> (%s, %s)' % \ + tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate)) + return newstate, param.state_change + + def crawl(self, path='.', xtl=None): + """crawling... + + Standing around + All the right people + Crawling + Tennis on Tuesday + The ladder is long + It is your nature + You've gotta suntan + Football on Sunday + Society boy + + Recursively walk the master side tree and check if updates are + needed due to xtime differences. One invocation of crawl checks + children of @path and do a recursive enter only on + those directory children where there is an update needed. + + Way of updates depend on file type: + - for symlinks, sync them directy and synchronously + - for regular children, register jobs for @path (cf. .add_job) to start + and wait on their rsync + - for directory children, register a job for @path which waits (.wait) + on jobs for the given child + (other kind of filesystem nodes are not considered) + + Those slave side children which do not exist on master are simply + purged (see Server.purge). + + Behavior is fault tolerant, synchronization is adaptive: if some action fails, + just go on relentlessly, adding a fail job (see .add_failjob) which will prevent + the .sendmark on @path, so when the next crawl will arrive to @path it will not + see it as up-to-date and will try to sync it again. While this semantics can be + supported by funky design principles (http://c2.com/cgi/wiki?LazinessImpatienceHubris), + the ultimate reason which excludes other possibilities is simply transience: we cannot + assert that the file systems (master / slave) underneath do not change and actions + taken upon some condition will not lose their context by the time they are performed. + """ + if path == '.': + if self.start: + self.crawls += 1 + logging.debug("... 
crawl #%d done, took %.6f seconds" % \ + (self.crawls, time.time() - self.start)) + time.sleep(1) + self.start = time.time() + should_display_info = self.start - self.lastreport['time'] >= 60 + if should_display_info: + logging.info("completed %d crawls, %d turns", + self.crawls - self.lastreport['crawls'], + self.turns - self.lastreport['turns']) + self.lastreport.update(crawls = self.crawls, + turns = self.turns, + time = self.start) + volinfo_sys, state_change = self.volinfo_hook() + if self.inter_master: + self.volinfo = volinfo_sys[self.KFGN] + else: + self.volinfo = volinfo_sys[self.KNAT] + if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master): + logging.info('new master is %s', self.uuid) + if self.volinfo: + logging.info("%s master with volume id %s ..." % \ + (self.inter_master and "intermediate" or "primary", + self.uuid)) + if state_change == self.KFGN: + gconf.configinterface.set('volume_id', self.uuid) + if self.volinfo: + if self.volinfo['retval']: + raise GsyncdError ("master is corrupt") + self.start_checkpoint_thread() + else: + if should_display_info or self.crawls == 0: + if self.inter_master: + logging.info("waiting for being synced from %s ..." % \ + self.volinfo_state[self.KFGN]['uuid']) + else: + logging.info("waiting for volume info ...") + return + logging.debug("entering " + path) + if not xtl: + xtl = self.xtime(path) + if isinstance(xtl, int): + self.add_failjob(path, 'no-local-node') + return + xtr = self.xtime(path, self.slave) + if isinstance(xtr, int): + if xtr != ENOENT: + self.slave.server.purge(path) + try: + self.slave.server.mkdir(path) + except OSError: + self.add_failjob(path, 'no-remote-node') + return + xtr = self.minus_infinity + else: + self.xtime_reversion_hook(path, xtl, xtr) + if xtl == xtr: + if path == '.' and self.change_seen: + self.turns += 1 + self.change_seen = False + if self.total_turns: + logging.info("finished turn #%s/%s" % \ + (self.turns, self.total_turns)) + if self.turns == self.total_turns: + logging.info("reached turn limit") + self.terminate = True + return + if path == '.': + self.change_seen = True + try: + dem = self.master.server.entries(path) + except OSError: + self.add_failjob(path, 'local-entries-fail') + return + random.shuffle(dem) + try: + des = self.slave.server.entries(path) + except OSError: + self.slave.server.purge(path) + try: + self.slave.server.mkdir(path) + des = self.slave.server.entries(path) + except OSError: + self.add_failjob(path, 'remote-entries-fail') + return + dd = set(des) - set(dem) + if dd: + self.purge_missing(path, dd) + chld = [] + for e in dem: + e = os.path.join(path, e) + xte = self.xtime(e) + if isinstance(xte, int): + logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte])) + elif self.need_sync(e, xte, xtr): + chld.append((e, xte)) + def indulgently(e, fnc, blame=None): + if not blame: + blame = path + try: + return fnc(e) + except (IOError, OSError): + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + logging.warn("salvaged ENOENT for " + e) + self.add_failjob(blame, 'by-indulgently') + return False + else: + raise + for e, xte in chld: + st = indulgently(e, lambda e: os.lstat(e)) + if st == False: + continue + mo = st.st_mode + adct = {'own': (st.st_uid, st.st_gid)} + if stat.S_ISLNK(mo): + if indulgently(e, lambda e: self.slave.server.symlink(os.readlink(e), e)) == False: + continue + self.sendmark(e, xte, adct) + elif stat.S_ISREG(mo): + logging.debug("syncing %s ..." 
% e) + pb = self.syncer.add(e) + timeA=datetime.now() + def regjob(e, xte, pb): + if pb.wait(): + logging.debug("synced " + e) + self.sendmark_regular(e, xte) + + timeB=datetime.now() + self.lastSyncTime=timeB-timeA + self.syncTime=(self.syncTime+self.lastSyncTime.microseconds)/(10.0**6) + self.filesSynced=self.filesSynced+1 + return True + else: + logging.warn("failed to sync " + e) + self.add_job(path, 'reg', regjob, e, xte, pb) + elif stat.S_ISDIR(mo): + adct['mode'] = mo + if indulgently(e, lambda e: (self.add_job(path, 'cwait', self.wait, e, xte, adct), + self.crawl(e, xte), + True)[-1], blame=e) == False: + continue + else: + # ignore fifos, sockets and special files + pass + if path == '.': + self.wait(path, xtl) + +class BoxClosedErr(Exception): + pass + +class PostBox(list): + """synchronized collection for storing things thought of as "requests" """ + + def __init__(self, *a): + list.__init__(self, *a) + # too bad Python stdlib does not have read/write locks... + # it would suffivce to grab the lock in .append as reader, in .close as writer + self.lever = Condition() + self.open = True + self.done = False + + def wait(self): + """wait on requests to be processed""" + self.lever.acquire() + if not self.done: + self.lever.wait() + self.lever.release() + return self.result + + def wakeup(self, data): + """wake up requestors with the result""" + self.result = data + self.lever.acquire() + self.done = True + self.lever.notifyAll() + self.lever.release() + + def append(self, e): + """post a request""" + self.lever.acquire() + if not self.open: + raise BoxClosedErr + list.append(self, e) + self.lever.release() + + def close(self): + """prohibit the posting of further requests""" + self.lever.acquire() + self.open = False + self.lever.release() + +class Syncer(object): + """a staged queue to relay rsync requests to rsync workers + + By "staged queue" its meant that when a consumer comes to the + queue, it takes _all_ entries, leaving the queue empty. + (I don't know if there is an official term for this pattern.) + + The queue uses a PostBox to accumulate incoming items. + When a consumer (rsync worker) comes, a new PostBox is + set up and the old one is passed on to the consumer. + + Instead of the simplistic scheme of having one big lock + which synchronizes both the addition of new items and + PostBox exchanges, use a separate lock to arbitrate consumers, + and rely on PostBox's synchronization mechanisms take + care about additions. + + There is a corner case racy situation, producers vs. consumers, + which is not handled by this scheme: namely, when the PostBox + exchange occurs in between being passed to the producer for posting + and the post placement. But that's what Postbox.close is for: + such a posting will find the PostBox closed, in which case + the producer can re-try posting against the actual PostBox of + the queue. + + To aid accumlation of items in the PostBoxen before grabbed + by an rsync worker, the worker goes to sleep a bit after + each completed syncjob. 
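A compact model of the "staged queue" idea described in this docstring: producers append to the current box, and a consumer atomically swaps in an empty one and drains the old box whole. This is only an illustration of the pattern; the real PostBox additionally carries the wait/wakeup result plumbing and the close-and-retry corner case.

    from threading import Lock

    lock = Lock()
    box = []

    def post(item):
        box.append(item)                 # producers just append

    def take_all():
        global box
        with lock:                       # the lock only arbitrates consumers
            taken, box = box, []
        return taken

    post('file-a'); post('file-b')
    print(take_all())                    # ['file-a', 'file-b'] -- drained in one go
    print(take_all())                    # []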
+ """ + + def __init__(self, slave): + """spawn worker threads""" + self.slave = slave + self.lock = Lock() + self.pb = PostBox() + self.bytesSynced=0 + for i in range(int(gconf.sync_jobs)): + t = Thread(target=self.syncjob) + t.start() + + def syncjob(self): + """the life of a worker""" + while True: + pb = None + while True: + self.lock.acquire() + if self.pb: + pb, self.pb = self.pb, PostBox() + self.lock.release() + if pb: + break + time.sleep(0.5) + pb.close() + po = self.slave.rsync(pb) + if po.returncode == 0: + regEx=re.search('\ *total\ *transferred\ *file\ *size:\ *(\d+)\ *bytes\ *',po.stdout.read(),re.IGNORECASE) + if regEx: + self.bytesSynced+=(int(regEx.group(1)))/1024 + ret = True + elif po.returncode in (23, 24): + # partial transfer (cf. rsync(1)), that's normal + ret = False + else: + po.errfail() + pb.wakeup(ret) + + def add(self, e): + while True: + pb = self.pb + try: + pb.append(e) + return pb + except BoxClosedErr: + pass diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py new file mode 100644 index 00000000000..b8956dcc2b9 --- /dev/null +++ b/geo-replication/syncdaemon/monitor.py @@ -0,0 +1,129 @@ +import os +import sys +import time +import signal +import logging +from gconf import gconf +from syncdutils import update_file, select, waitpid, set_term_handler + +class Monitor(object): + """class which spawns and manages gsyncd workers""" + + def __init__(self): + self.state = None + + def set_state(self, state): + """set the state that can be used by external agents + like glusterd for status reporting""" + if state == self.state: + return + self.state = state + logging.info('new state: %s' % state) + if getattr(gconf, 'state_file', None): + update_file(gconf.state_file, lambda f: f.write(state + '\n')) + + def monitor(self): + """the monitor loop + + Basic logic is a blantantly simple blunt heuristics: + if spawned client survives 60 secs, it's considered OK. + This servers us pretty well as it's not vulneralbe to + any kind of irregular behavior of the child... + + ... well, except for one: if children is hung up on + waiting for some event, it can survive aeons, still + will be defunct. So we tweak the above logic to + expect the worker to send us a signal within 60 secs + (in the form of closing its end of a pipe). 
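The safeguard being described here, and elaborated just below, boils down to a pipe handshake: the child inherits the write end and closes it once setup is done, which makes the parent's select on the read end return before the timeout. A minimal POSIX-only sketch of that handshake (the timeout value and the sleep are made up; the real worker receives the descriptor via --feedback-fd):

    import os, time
    from select import select

    conn_timeout = 5
    pr, pw = os.pipe()
    cpid = os.fork()
    if cpid == 0:
        os.close(pr)
        time.sleep(1)        # pretend to do the setup stage
        os.close(pw)         # "ready" -- signalled by closing our end of the pipe
        os._exit(0)
    os.close(pw)
    ready, _, _ = select([pr], [], [], conn_timeout)
    os.close(pr)
    print('worker confirmed' if ready else 'worker not confirmed, would be aborted')
    os.waitpid(cpid, 0)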
The worker + does this when it's done with the setup stage + ready to enter the service loop (note it's the setup + stage which is vulnerable to hangs -- the full + blown worker blows up on EPIPE if the net goes down, + due to the keep-alive thread) + """ + def sigcont_handler(*a): + """ + Re-init logging and send group kill signal + """ + md = gconf.log_metadata + logging.shutdown() + lcls = logging.getLoggerClass() + lcls.setup(label=md.get('saved_label'), **md) + pid = os.getpid() + os.kill(-pid, signal.SIGUSR1) + signal.signal(signal.SIGUSR1, lambda *a: ()) + signal.signal(signal.SIGCONT, sigcont_handler) + + argv = sys.argv[:] + for o in ('-N', '--no-daemon', '--monitor'): + while o in argv: + argv.remove(o) + argv.extend(('-N', '-p', '')) + argv.insert(0, os.path.basename(sys.executable)) + + self.set_state('starting...') + ret = 0 + def nwait(p, o=0): + p2, r = waitpid(p, o) + if not p2: + return + return r + def exit_signalled(s): + """ child teminated due to receipt of SIGUSR1 """ + return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1)) + def exit_status(s): + if os.WIFEXITED(s): + return os.WEXITSTATUS(s) + return 1 + conn_timeout = int(gconf.connection_timeout) + while ret in (0, 1): + logging.info('-' * conn_timeout) + logging.info('starting gsyncd worker') + pr, pw = os.pipe() + cpid = os.fork() + if cpid == 0: + os.close(pr) + os.execv(sys.executable, argv + ['--feedback-fd', str(pw)]) + os.close(pw) + t0 = time.time() + so = select((pr,), (), (), conn_timeout)[0] + os.close(pr) + if so: + ret = nwait(cpid, os.WNOHANG) + if ret != None: + logging.debug("worker died before establishing connection") + else: + logging.debug("worker seems to be connected (?? racy check)") + while time.time() < t0 + conn_timeout: + ret = nwait(cpid, os.WNOHANG) + if ret != None: + logging.debug("worker died in startup phase") + break + time.sleep(1) + else: + logging.debug("worker not confirmed in %d sec, aborting it" % \ + conn_timeout) + # relax one SIGTERM by setting a handler that sets back + # standard handler + set_term_handler(lambda *a: set_term_handler()) + # give a chance to graceful exit + os.kill(-os.getpid(), signal.SIGTERM) + time.sleep(1) + os.kill(cpid, signal.SIGKILL) + ret = nwait(cpid) + if ret == None: + self.set_state('OK') + ret = nwait(cpid) + if exit_signalled(ret): + ret = 0 + else: + ret = exit_status(ret) + if ret in (0,1): + self.set_state('faulty') + time.sleep(10) + self.set_state('inconsistent') + return ret + +def monitor(): + """oh yeah, actually Monitor is used as singleton, too""" + return Monitor().monitor() diff --git a/geo-replication/syncdaemon/repce.py b/geo-replication/syncdaemon/repce.py new file mode 100644 index 00000000000..755fb61df48 --- /dev/null +++ b/geo-replication/syncdaemon/repce.py @@ -0,0 +1,225 @@ +import os +import sys +import time +import logging +from threading import Condition +try: + import thread +except ImportError: + # py 3 + import _thread as thread +try: + from Queue import Queue +except ImportError: + # py 3 + from queue import Queue +try: + import cPickle as pickle +except ImportError: + # py 3 + import pickle + +from syncdutils import Thread, select + +pickle_proto = -1 +repce_version = 1.0 + +def ioparse(i, o): + if isinstance(i, int): + i = os.fdopen(i) + # rely on duck typing for recognizing + # streams as that works uniformly + # in py2 and py3 + if hasattr(o, 'fileno'): + o = o.fileno() + return (i, o) + +def send(out, *args): + """pickle args and write out wholly in one syscall + + ie. 
not use the ability of pickle to dump directly to + a stream, as that would potentially mess up messages + by interleaving them + """ + os.write(out, pickle.dumps(args, pickle_proto)) + +def recv(inf): + """load an object from input stream""" + return pickle.load(inf) + + +class RepceServer(object): + """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce + + ... also our homebrewed RPC backend where the transport layer is + reduced to a pair of filehandles. + + This is the server component. + """ + + def __init__(self, obj, i, o, wnum=6): + """register a backend object .obj to which incoming messages + are dispatched, also incoming/outcoming streams + """ + self.obj = obj + self.inf, self.out = ioparse(i, o) + self.wnum = wnum + self.q = Queue() + + def service_loop(self): + """fire up worker threads, get messages and dispatch among them""" + for i in range(self.wnum): + t = Thread(target=self.worker) + t.start() + try: + while True: + self.q.put(recv(self.inf)) + except EOFError: + logging.info("terminating on reaching EOF.") + + def worker(self): + """life of a worker + + Get message, extract its id, method name and arguments + (kwargs not supported), call method on .obj. + Send back message id + return value. + If method call throws an exception, rescue it, and send + back the exception as result (with flag marking it as + exception). + """ + while True: + in_data = self.q.get(True) + rid = in_data[0] + rmeth = in_data[1] + exc = False + if rmeth == '__repce_version__': + res = repce_version + else: + try: + res = getattr(self.obj, rmeth)(*in_data[2:]) + except: + res = sys.exc_info()[1] + exc = True + logging.exception("call failed: ") + send(self.out, rid, exc, res) + + +class RepceJob(object): + """class representing message status we can use + for waiting on reply""" + + def __init__(self, cbk): + """ + - .rid: (process-wise) unique id + - .cbk: what we do upon receiving reply + """ + self.rid = (os.getpid(), thread.get_ident(), time.time()) + self.cbk = cbk + self.lever = Condition() + self.done = False + + def __repr__(self): + return ':'.join([str(x) for x in self.rid]) + + def wait(self): + self.lever.acquire() + if not self.done: + self.lever.wait() + self.lever.release() + return self.result + + def wakeup(self, data): + self.result = data + self.lever.acquire() + self.done = True + self.lever.notify() + self.lever.release() + + +class RepceClient(object): + """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce + + ... also our homebrewed RPC backend where the transport layer is + reduced to a pair of filehandles. + + This is the client component. + """ + + def __init__(self, i, o): + self.inf, self.out = ioparse(i, o) + self.jtab = {} + t = Thread(target = self.listen) + t.start() + + def listen(self): + while True: + select((self.inf,), (), ()) + rid, exc, res = recv(self.inf) + rjob = self.jtab.pop(rid) + if rjob.cbk: + rjob.cbk(rjob, [exc, res]) + + def push(self, meth, *args, **kw): + """wrap arguments in a RepceJob, send them to server + and return the RepceJob + + @cbk to pass on RepceJob can be given as kwarg. + """ + cbk = kw.get('cbk') + if not cbk: + def cbk(rj, res): + if res[0]: + raise res[1] + rjob = RepceJob(cbk) + self.jtab[rjob.rid] = rjob + logging.debug("call %s %s%s ..." 
% (repr(rjob), meth, repr(args))) + send(self.out, rjob.rid, meth, *args) + return rjob + + def __call__(self, meth, *args): + """RePCe client is callabe, calling it implements a synchronous remote call + + We do a .push with a cbk which does a wakeup upon receiving anwser, then wait + on the RepceJob. + """ + rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)}) + exc, res = rjob.wait() + if exc: + logging.error('call %s (%s) failed on peer with %s' % (repr(rjob), meth, str(type(res).__name__))) + raise res + logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res))) + return res + + class mprx(object): + """method proxy, standard trick to implement rubyesque method_missing + in Python + + A class is a closure factory, you know what I mean, or go read some SICP. + """ + def __init__(self, ins, meth): + self.ins = ins + self.meth = meth + + def __call__(self, *a): + return self.ins(self.meth, *a) + + def __getattr__(self, meth): + """this implements transparent method dispatch to remote object, + so that you don't need to call the RepceClient instance like + + rclient('how_old_are_you_if_born_in', 1979) + + but you can make it into an ordinary method call like + + rclient.how_old_are_you_if_born_in(1979) + """ + return self.mprx(self, meth) + + def __version__(self): + """used in handshake to verify compatibility""" + d = {'proto': self('__repce_version__')} + try: + d['object'] = self('version') + except AttributeError: + pass + return d diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py new file mode 100644 index 00000000000..73102fbcb44 --- /dev/null +++ b/geo-replication/syncdaemon/resource.py @@ -0,0 +1,972 @@ +import re +import os +import sys +import stat +import time +import fcntl +import errno +import struct +import socket +import logging +import tempfile +import threading +import subprocess +from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR +from select import error as SelectError + +from gconf import gconf +import repce +from repce import RepceServer, RepceClient +from master import gmaster_builder +import syncdutils +from syncdutils import GsyncdError, select, privileged, boolify + +UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') +HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) +UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+") + +def sup(x, *a, **kw): + """a rubyesque "super" for python ;) + + invoke caller method in parent class with given args. + """ + return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw) + +def desugar(ustr): + """transform sugared url strings to standard :// form + + parsing logic enforces the constraint that sugared forms should contatin + a ':' or a '/', which ensures that sugared urls do not conflict with + gluster volume names. 
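A few illustrative inputs and the URLs these desugaring rules imply. The hostnames, volume names and paths below are made up, and the mapping is shown as plain data rather than by calling the function:

    examples = {
        ':myvol':                  'gluster://localhost:myvol',    # empty host part -> local volume
        'otherhost:myvol':         'gluster://otherhost:myvol',
        'root@remote:/data/slave': 'ssh://root@remote:/data/slave',
        '/data/slave':             'file:///data/slave',           # no ':' -> plain directory
    }
    for sugared, full in examples.items():
        print('%-25s -> %s' % (sugared, full))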
+ """ + m = re.match('([^:]*):(.*)', ustr) + if m: + if not m.groups()[0]: + return "gluster://localhost" + ustr + elif '@' in m.groups()[0] or re.search('[:/]', m.groups()[1]): + return "ssh://" + ustr + else: + return "gluster://" + ustr + else: + if ustr[0] != '/': + raise GsyncdError("cannot resolve sugared url '%s'" % ustr) + ap = os.path.normpath(ustr) + if ap.startswith('//'): + ap = ap[1:] + return "file://" + ap + +def gethostbyname(hnam): + """gethostbyname wrapper""" + try: + return socket.gethostbyname(hnam) + except socket.gaierror: + ex = sys.exc_info()[1] + raise GsyncdError("failed to resolve %s: %s" % \ + (hnam, ex.strerror)) + +def parse_url(ustr): + """instantiate an url object by scheme-to-class dispatch + + The url classes taken into consideration are the ones in + this module whose names are full-caps. + """ + m = UrlRX.match(ustr) + if not m: + ustr = desugar(ustr) + m = UrlRX.match(ustr) + if not m: + raise GsyncdError("malformed url") + sch, path = m.groups() + this = sys.modules[__name__] + if not hasattr(this, sch.upper()): + raise GsyncdError("unknown url scheme " + sch) + return getattr(this, sch.upper())(path) + + +class _MetaXattr(object): + """singleton class, a lazy wrapper around the + libcxattr module + + libcxattr (a heavy import due to ctypes) is + loaded only when when the single + instance is tried to be used. + + This reduces runtime for those invocations + which do not need filesystem manipulation + (eg. for config, url parsing) + """ + + def __getattr__(self, meth): + from libcxattr import Xattr as LXattr + xmeth = [ m for m in dir(LXattr) if m[0] != '_' ] + if not meth in xmeth: + return + for m in xmeth: + setattr(self, m, getattr(LXattr, m)) + return getattr(self, meth) + +Xattr = _MetaXattr() + + +class Popen(subprocess.Popen): + """customized subclass of subprocess.Popen with a ring + buffer for children error output""" + + @classmethod + def init_errhandler(cls): + """start the thread which handles children's error output""" + cls.errstore = {} + def tailer(): + while True: + errstore = cls.errstore.copy() + try: + poe, _ ,_ = select([po.stderr for po in errstore], [], [], 1) + except (ValueError, SelectError): + continue + for po in errstore: + if po.stderr not in poe: + continue + po.lock.acquire() + try: + if po.on_death_row: + continue + la = errstore[po] + try: + fd = po.stderr.fileno() + except ValueError: # file is already closed + continue + l = os.read(fd, 1024) + if not l: + continue + tots = len(l) + for lx in la: + tots += len(lx) + while tots > 1<<20 and la: + tots -= len(la.pop(0)) + la.append(l) + finally: + po.lock.release() + t = syncdutils.Thread(target = tailer) + t.start() + cls.errhandler = t + + @classmethod + def fork(cls): + """fork wrapper that restarts errhandler thread in child""" + pid = os.fork() + if not pid: + cls.init_errhandler() + return pid + + def __init__(self, args, *a, **kw): + """customizations for subprocess.Popen instantiation + + - 'close_fds' is taken to be the default + - if child's stderr is chosen to be managed, + register it with the error handler thread + """ + self.args = args + if 'close_fds' not in kw: + kw['close_fds'] = True + self.lock = threading.Lock() + self.on_death_row = False + try: + sup(self, args, *a, **kw) + except: + ex = sys.exc_info()[1] + if not isinstance(ex, OSError): + raise + raise GsyncdError("""execution of "%s" failed with %s (%s)""" % \ + (args[0], errno.errorcode[ex.errno], os.strerror(ex.errno))) + if kw.get('stderr') == subprocess.PIPE: + assert(getattr(self, 
'errhandler', None)) + self.errstore[self] = [] + + def errlog(self): + """make a log about child's failure event""" + filling = "" + if self.elines: + filling = ", saying:" + logging.error("""command "%s" returned with %s%s""" % \ + (" ".join(self.args), repr(self.returncode), filling)) + lp = '' + def logerr(l): + logging.error(self.args[0] + "> " + l) + for l in self.elines: + ls = l.split('\n') + ls[0] = lp + ls[0] + lp = ls.pop() + for ll in ls: + logerr(ll) + if lp: + logerr(lp) + + def errfail(self): + """fail nicely if child did not terminate with success""" + self.errlog() + syncdutils.finalize(exval = 1) + + def terminate_geterr(self, fail_on_err = True): + """kill child, finalize stderr harvesting (unregister + from errhandler, set up .elines), fail on error if + asked for + """ + self.lock.acquire() + try: + self.on_death_row = True + finally: + self.lock.release() + elines = self.errstore.pop(self) + if self.poll() == None: + self.terminate() + if self.poll() == None: + time.sleep(0.1) + self.kill() + self.wait() + while True: + if not select([self.stderr],[],[],0.1)[0]: + break + b = os.read(self.stderr.fileno(), 1024) + if b: + elines.append(b) + else: + break + self.stderr.close() + self.elines = elines + if fail_on_err and self.returncode != 0: + self.errfail() + + +class Server(object): + """singleton implemening those filesystem access primitives + which are needed for geo-replication functionality + + (Singleton in the sense it's a class which has only static + and classmethods and is used directly, without instantiation.) + """ + + GX_NSPACE = (privileged() and "trusted" or "system") + ".glusterfs" + NTV_FMTSTR = "!" + "B"*19 + "II" + FRGN_XTRA_FMT = "I" + FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT + + def _pathguard(f): + """decorator method that checks + the path argument of the decorated + functions to make sure it does not + point out of the managed tree + """ + + fc = getattr(f, 'func_code', None) + if not fc: + # python 3 + fc = f.__code__ + pi = list(fc.co_varnames).index('path') + def ff(*a): + path = a[pi] + ps = path.split('/') + if path[0] == '/' or '..' in ps: + raise ValueError('unsafe path') + return f(*a) + return ff + + @staticmethod + @_pathguard + def entries(path): + """directory entries in an array""" + # prevent symlinks being followed + if not stat.S_ISDIR(os.lstat(path).st_mode): + raise OSError(ENOTDIR, os.strerror(ENOTDIR)) + return os.listdir(path) + + @classmethod + @_pathguard + def purge(cls, path, entries=None): + """force-delete subtrees + + If @entries is not specified, delete + the whole subtree under @path (including + @path). + + Otherwise, @entries should be a + a sequence of children of @path, and + the effect is identical with a joint + @entries-less purge on them, ie. 
+ + for e in entries: + cls.purge(os.path.join(path, e)) + """ + me_also = entries == None + if not entries: + try: + # if it's a symlink, prevent + # following it + try: + os.unlink(path) + return + except OSError: + ex = sys.exc_info()[1] + if ex.errno == EISDIR: + entries = os.listdir(path) + else: + raise + except OSError: + ex = sys.exc_info()[1] + if ex.errno in (ENOTDIR, ENOENT, ELOOP): + try: + os.unlink(path) + return + except OSError: + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + return + raise + else: + raise + for e in entries: + cls.purge(os.path.join(path, e)) + if me_also: + os.rmdir(path) + + @classmethod + @_pathguard + def _create(cls, path, ctor): + """path creation backend routine""" + try: + ctor(path) + except OSError: + ex = sys.exc_info()[1] + if ex.errno == EEXIST: + cls.purge(path) + return ctor(path) + raise + + @classmethod + @_pathguard + def mkdir(cls, path): + cls._create(path, os.mkdir) + + @classmethod + @_pathguard + def symlink(cls, lnk, path): + cls._create(path, lambda p: os.symlink(lnk, p)) + + @classmethod + @_pathguard + def xtime(cls, path, uuid): + """query xtime extended attribute + + Return xtime of @path for @uuid as a pair of integers. + "Normal" errors due to non-existent @path or extended attribute + are tolerated and errno is returned in such a case. + """ + + try: + return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8)) + except OSError: + ex = sys.exc_info()[1] + if ex.errno in (ENOENT, ENODATA, ENOTDIR): + return ex.errno + else: + raise + + @classmethod + def xtime_vec(cls, path, *uuids): + """vectored version of @xtime + + accepts a list of uuids and returns a dictionary + with uuid as key(s) and xtime as value(s) + """ + xt = {} + for uuid in uuids: + xtu = cls.xtime(path, uuid) + if xtu == ENODATA: + xtu = None + if isinstance(xtu, int): + return xtu + xt[uuid] = xtu + return xt + + @classmethod + @_pathguard + def set_xtime(cls, path, uuid, mark): + """set @mark as xtime for @uuid on @path""" + Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark)) + + @classmethod + def set_xtime_vec(cls, path, mark_dct): + """vectored (or dictered) version of set_xtime + + ignore values that match @ignore + """ + for u,t in mark_dct.items(): + cls.set_xtime(path, u, t) + + @staticmethod + @_pathguard + def setattr(path, adct): + """set file attributes + + @adct is a dict, where 'own', 'mode' and 'times' + keys are looked for and values used to perform + chown, chmod or utimes on @path. + """ + own = adct.get('own') + if own: + os.lchown(path, *own) + mode = adct.get('mode') + if mode: + os.chmod(path, stat.S_IMODE(mode)) + times = adct.get('times') + if times: + os.utime(path, times) + + @staticmethod + def pid(): + return os.getpid() + + last_keep_alive = 0 + @classmethod + def keep_alive(cls, dct): + """process keepalive messages. + + Return keep-alive counter (number of received keep-alive + messages). + + Now the "keep-alive" message can also have a payload which is + used to set a foreign volume-mark on the underlying file system. 
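The xtime value handled by the xtime/set_xtime methods above is just two network-order 32-bit integers (seconds, nanoseconds) stored in an 8-byte extended attribute; the struct round trip on its own, with example numbers and no xattr calls:

    import struct

    mark = (1369671837, 987654321)        # example (sec, nsec) pair
    raw = struct.pack('!II', *mark)       # the 8 bytes lsetxattr would store
    print(len(raw))                       # 8
    print(struct.unpack('!II', raw))      # (1369671837, 987654321)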
+ """ + if dct: + key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']]) + val = struct.pack(cls.FRGN_FMTSTR, + *(dct['version'] + + tuple(int(x,16) for x in re.findall('(?:[\da-f]){2}', dct['uuid'])) + + (dct['retval'],) + dct['volume_mark'][0:2] + (dct['timeout'],))) + Xattr.lsetxattr('.', key, val) + cls.last_keep_alive += 1 + return cls.last_keep_alive + + @staticmethod + def version(): + """version used in handshake""" + return 1.0 + + +class SlaveLocal(object): + """mix-in class to implement some factes of a slave server + + ("mix-in" is sort of like "abstract class", ie. it's not + instantiated just included in the ancesty DAG. I use "mix-in" + to indicate that it's not used as an abstract base class, + rather just taken in to implement additional functionality + on the basis of the assumed availability of certain interfaces.) + """ + + def can_connect_to(self, remote): + """determine our position in the connectibility matrix""" + return not remote + + def service_loop(self): + """start a RePCe server serving self's server + + stop servicing if a timeout is configured and got no + keep-alime in that inteval + """ + + if boolify(gconf.use_rsync_xattrs) and not privileged(): + raise GsyncdError("using rsync for extended attributes is not supported") + + repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs)) + t = syncdutils.Thread(target=lambda: (repce.service_loop(), + syncdutils.finalize())) + t.start() + logging.info("slave listening") + if gconf.timeout and int(gconf.timeout) > 0: + while True: + lp = self.server.last_keep_alive + time.sleep(int(gconf.timeout)) + if lp == self.server.last_keep_alive: + logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout)) + break + else: + select((), (), ()) + +class SlaveRemote(object): + """mix-in class to implement an interface to a remote slave""" + + def connect_remote(self, rargs=[], **opts): + """connects to a remote slave + + Invoke an auxiliary utility (slave gsyncd, possibly wrapped) + which sets up the connection and set up a RePCe client to + communicate throuh its stdio. 
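Back in keep_alive above, the peer's uuid string is flattened into its 16 raw byte values before the volume mark is packed; that findall-based conversion in isolation (the uuid is an example value):

    import re

    uuid = 'b3a2f1c4-8d7e-4a5b-9c0d-112233445566'      # example value
    byte_vals = tuple(int(x, 16) for x in re.findall(r'(?:[\da-f]){2}', uuid))
    print(len(byte_vals))      # 16
    print(byte_vals[:4])       # (179, 162, 241, 196)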
+ """ + slave = opts.get('slave', self.url) + extra_opts = [] + so = getattr(gconf, 'session_owner', None) + if so: + extra_opts += ['--session-owner', so] + if boolify(gconf.use_rsync_xattrs): + extra_opts.append('--use-rsync-xattrs') + po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts + \ + ['-N', '--listen', '--timeout', str(gconf.timeout), slave], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + gconf.transport = po + return self.start_fd_client(po.stdout, po.stdin, **opts) + + def start_fd_client(self, i, o, **opts): + """set up RePCe client, handshake with server + + It's cut out as a separate method to let + subclasses hook into client startup + """ + self.server = RepceClient(i, o) + rv = self.server.__version__() + exrv = {'proto': repce.repce_version, 'object': Server.version()} + da0 = (rv, exrv) + da1 = ({}, {}) + for i in range(2): + for k, v in da0[i].iteritems(): + da1[i][k] = int(v) + if da1[0] != da1[1]: + raise GsyncdError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv)) + + def rsync(self, files, *args): + """invoke rsync""" + if not files: + raise GsyncdError("no files to sync") + logging.debug("files: " + ", ".join(files)) + argv = gconf.rsync_command.split() + \ + ['-aR0', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \ + gconf.rsync_options.split() + (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \ + ['.'] + list(args) + po = Popen(argv, stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE) + for f in files: + po.stdin.write(f) + po.stdin.write('\0') + + po.stdin.close() + po.wait() + po.terminate_geterr(fail_on_err = False) + + return po + + +class AbstractUrl(object): + """abstract base class for url scheme classes""" + + def __init__(self, path, pattern): + m = re.search(pattern, path) + if not m: + raise GsyncdError("malformed path") + self.path = path + return m.groups() + + @property + def scheme(self): + return type(self).__name__.lower() + + def canonical_path(self): + return self.path + + def get_url(self, canonical=False, escaped=False): + """format self's url in various styles""" + if canonical: + pa = self.canonical_path() + else: + pa = self.path + u = "://".join((self.scheme, pa)) + if escaped: + u = syncdutils.escape(u) + return u + + @property + def url(self): + return self.get_url() + + + ### Concrete resource classes ### + + +class FILE(AbstractUrl, SlaveLocal, SlaveRemote): + """scheme class for file:// urls + + can be used to represent a file slave server + on slave side, or interface to a remote file + file server on master side + """ + + class FILEServer(Server): + """included server flavor""" + pass + + server = FILEServer + + def __init__(self, path): + sup(self, path, '^/') + + def connect(self): + """inhibit the resource beyond""" + os.chdir(self.path) + + def rsync(self, files): + return sup(self, files, self.path) + + +class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): + """scheme class for gluster:// urls + + can be used to represent a gluster slave server + on slave side, or interface to a remote gluster + slave on master side, or to represent master + (slave-ish features come from the mixins, master + functionality is outsourced to GMaster from master) + """ + + class GLUSTERServer(Server): + "server enhancements for a glusterfs backend""" + + @classmethod + def _attr_unpack_dict(cls, xattr, extra_fields = ''): + """generic volume mark fetching/parsing backed""" + fmt_string = cls.NTV_FMTSTR + extra_fields + buf = 
Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) + vm = struct.unpack(fmt_string, buf) + m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]])) + uuid = '-'.join(m.groups()) + volinfo = { 'version': vm[0:2], + 'uuid' : uuid, + 'retval' : vm[18], + 'volume_mark': vm[19:21], + } + if extra_fields: + return volinfo, vm[-len(extra_fields):] + else: + return volinfo + + @classmethod + def foreign_volume_infos(cls): + """return list of valid (not expired) foreign volume marks""" + dict_list = [] + xattr_list = Xattr.llistxattr_buf('.') + for ele in xattr_list: + if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0: + d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT) + now = int(time.time()) + if x[0] > now: + logging.debug("volinfo[%s] expires: %d (%d sec later)" % \ + (d['uuid'], x[0], x[0] - now)) + d['timeout'] = x[0] + dict_list.append(d) + else: + try: + Xattr.lremovexattr('.', ele) + except OSError: + pass + return dict_list + + @classmethod + def native_volume_info(cls): + """get the native volume mark of the underlying gluster volume""" + try: + return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark'])) + except OSError: + ex = sys.exc_info()[1] + if ex.errno != ENODATA: + raise + + server = GLUSTERServer + + def __init__(self, path): + self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern) + + def canonical_path(self): + return ':'.join([gethostbyname(self.host), self.volume]) + + def can_connect_to(self, remote): + """determine our position in the connectibility matrix""" + return True + + class Mounter(object): + """Abstract base class for mounter backends""" + + def __init__(self, params): + self.params = params + self.mntpt = None + + @classmethod + def get_glusterprog(cls): + return os.path.join(gconf.gluster_command_dir, cls.glusterprog) + + def umount_l(self, d): + """perform lazy umount""" + po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE) + po.wait() + return po + + @classmethod + def make_umount_argv(cls, d): + raise NotImplementedError + + def make_mount_argv(self, *a): + raise NotImplementedError + + def cleanup_mntpt(self, *a): + pass + + def handle_mounter(self, po): + po.wait() + + def inhibit(self, *a): + """inhibit a gluster filesystem + + Mount glusterfs over a temporary mountpoint, + change into the mount, and lazy unmount the + filesystem. 
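_attr_unpack_dict above performs the reverse step: sixteen bytes out of the volume mark are hex-encoded and regrouped 8-4-4-4-12 into the canonical uuid string. That step alone, with example byte values:

    import re

    raw = (179, 162, 241, 196, 141, 126, 74, 91,
           156, 13, 17, 34, 51, 68, 85, 102)            # 16 example bytes
    hexstr = "".join('%02x' % x for x in raw)
    m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', hexstr)
    print('-'.join(m.groups()))    # b3a2f1c4-8d7e-4a5b-9c0d-112233445566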
+ """ + + mpi, mpo = os.pipe() + mh = Popen.fork() + if mh: + os.close(mpi) + fcntl.fcntl(mpo, fcntl.F_SETFD, fcntl.FD_CLOEXEC) + d = None + margv = self.make_mount_argv(*a) + if self.mntpt: + # mntpt is determined pre-mount + d = self.mntpt + os.write(mpo, d + '\0') + po = Popen(margv, **self.mountkw) + self.handle_mounter(po) + po.terminate_geterr() + logging.debug('auxiliary glusterfs mount in place') + if not d: + # mntpt is determined during mount + d = self.mntpt + os.write(mpo, d + '\0') + os.write(mpo, 'M') + t = syncdutils.Thread(target=lambda: os.chdir(d)) + t.start() + tlim = gconf.starttime + int(gconf.connection_timeout) + while True: + if not t.isAlive(): + break + if time.time() >= tlim: + syncdutils.finalize(exval = 1) + time.sleep(1) + os.close(mpo) + _, rv = syncdutils.waitpid(mh, 0) + if rv: + rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \ + (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0) + logging.warn('stale mount possibly left behind on ' + d) + raise GsyncdError("cleaning up temp mountpoint %s failed with status %d" % \ + (d, rv)) + else: + rv = 0 + try: + os.setsid() + os.close(mpo) + mntdata = '' + while True: + c = os.read(mpi, 1) + if not c: + break + mntdata += c + if mntdata: + mounted = False + if mntdata[-1] == 'M': + mntdata = mntdata[:-1] + assert(mntdata) + mounted = True + assert(mntdata[-1] == '\0') + mntpt = mntdata[:-1] + assert(mntpt) + if mounted: + po = self.umount_l(mntpt) + po.terminate_geterr(fail_on_err = False) + if po.returncode != 0: + po.errlog() + rv = po.returncode + self.cleanup_mntpt(mntpt) + except: + logging.exception('mount cleanup failure:') + rv = 200 + os._exit(rv) + logging.debug('auxiliary glusterfs mount prepared') + + class DirectMounter(Mounter): + """mounter backend which calls mount(8), umount(8) directly""" + + mountkw = {'stderr': subprocess.PIPE} + glusterprog = 'glusterfs' + + @staticmethod + def make_umount_argv(d): + return ['umount', '-l', d] + + def make_mount_argv(self): + self.mntpt = tempfile.mkdtemp(prefix = 'gsyncd-aux-mount-') + return [self.get_glusterprog()] + ['--' + p for p in self.params] + [self.mntpt] + + def cleanup_mntpt(self, mntpt = None): + if not mntpt: + mntpt = self.mntpt + os.rmdir(mntpt) + + class MountbrokerMounter(Mounter): + """mounter backend using the mountbroker gluster service""" + + mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE} + glusterprog = 'gluster' + + @classmethod + def make_cli_argv(cls): + return [cls.get_glusterprog()] + gconf.gluster_cli_options.split() + ['system::'] + + @classmethod + def make_umount_argv(cls, d): + return cls.make_cli_argv() + ['umount', d, 'lazy'] + + def make_mount_argv(self, label): + return self.make_cli_argv() + \ + ['mount', label, 'user-map-root=' + syncdutils.getusername()] + self.params + + def handle_mounter(self, po): + self.mntpt = po.stdout.readline()[:-1] + po.stdout.close() + sup(self, po) + if po.returncode != 0: + # if cli terminated with error due to being + # refused by glusterd, what it put + # out on stdout is a diagnostic message + logging.error('glusterd answered: %s' % self.mntpt) + + def connect(self): + """inhibit the resource beyond + + Choose mounting backend (direct or mountbroker), + set up glusterfs parameters and perform the mount + with given backend + """ + + label = getattr(gconf, 'mountbroker', None) + if not label and not privileged(): + label = syncdutils.getusername() + mounter = label and self.MountbrokerMounter or self.DirectMounter + params = gconf.gluster_params.split() + \ + 
(gconf.gluster_log_level and ['log-level=' + gconf.gluster_log_level] or []) + \ + ['log-file=' + gconf.gluster_log_file, 'volfile-server=' + self.host, + 'volfile-id=' + self.volume, 'client-pid=-1'] + mounter(params).inhibit(*[l for l in [label] if l]) + + def connect_remote(self, *a, **kw): + sup(self, *a, **kw) + self.slavedir = "/proc/%d/cwd" % self.server.pid() + + def service_loop(self, *args): + """enter service loop + + - if slave given, instantiate GMaster and + pass control to that instance, which implements + master behavior + - else do that's what's inherited + """ + if args: + gmaster_builder()(self, args[0]).crawl_loop() + else: + sup(self, *args) + + def rsync(self, files): + return sup(self, files, self.slavedir) + + +class SSH(AbstractUrl, SlaveRemote): + """scheme class for ssh:// urls + + interface to remote slave on master side + implementing an ssh based proxy + """ + + def __init__(self, path): + self.remote_addr, inner_url = sup(self, path, + '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ])) + self.inner_rsc = parse_url(inner_url) + + def canonical_path(self): + m = re.match('([^@]+)@(.+)', self.remote_addr) + if m: + u, h = m.groups() + else: + u, h = syncdutils.getusername(), self.remote_addr + remote_addr = '@'.join([u, gethostbyname(h)]) + return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)]) + + def can_connect_to(self, remote): + """determine our position in the connectibility matrix""" + return False + + def start_fd_client(self, *a, **opts): + """customizations for client startup + + - be a no-op if we are to daemonize (client startup is deferred + to post-daemon stage) + - determine target url for rsync after consulting server + """ + if opts.get('deferred'): + return a + sup(self, *a) + ityp = type(self.inner_rsc) + if ityp == FILE: + slavepath = self.inner_rsc.path + elif ityp == GLUSTER: + slavepath = "/proc/%d/cwd" % self.server.pid() + else: + raise NotImplementedError + self.slaveurl = ':'.join([self.remote_addr, slavepath]) + + def connect_remote(self, go_daemon=None): + """connect to inner slave url through outer ssh url + + Wrap the connecting utility in ssh. + + Much care is put into daemonizing: in that case + ssh is started before daemonization, but + RePCe client is to be created after that (as ssh + interactive password auth would be defeated by + a daemonized ssh, while client should be present + only in the final process). In that case the action + is taken apart to two parts, this method is ivoked + once pre-daemon, once post-daemon. Use @go_daemon + to deiced what part to perform. + + [NB. ATM gluster product does not makes use of interactive + authentication.] + """ + if go_daemon == 'done': + return self.start_fd_client(*self.fd_pair) + gconf.setup_ssh_ctl(tempfile.mkdtemp(prefix='gsyncd-aux-ssh-')) + deferred = go_daemon == 'postconn' + ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + [self.remote_addr], slave=self.inner_rsc.url, deferred=deferred) + if deferred: + # send a message to peer so that we can wait for + # the answer from which we know connection is + # established and we can proceed with daemonization + # (doing that too early robs the ssh passwd prompt...) + # However, we'd better not start the RepceClient + # before daemonization (that's not preserved properly + # in daemon), we just do a an ad-hoc linear put/get. 
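canonical_path above normalizes the ssh address by filling in the current user when none was given and resolving the host name; a rough standalone equivalent of just that part (the address is an example, and handling of the inner gluster/file url is omitted):

    import os, pwd, re, socket

    def canonical_ssh_addr(remote_addr):
        m = re.match('([^@]+)@(.+)', remote_addr)
        if m:
            u, h = m.groups()
        else:
            u, h = pwd.getpwuid(os.geteuid()).pw_name, remote_addr
        return '@'.join([u, socket.gethostbyname(h)])

    print(canonical_ssh_addr('root@localhost'))    # root@127.0.0.1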
+ i, o = ret + inf = os.fdopen(i) + repce.send(o, None, '__repce_version__') + select((inf,), (), ()) + repce.recv(inf) + # hack hack hack: store a global reference to the file + # to save it from getting GC'd which implies closing it + gconf.permanent_handles.append(inf) + self.fd_pair = (i, o) + return 'should' + + def rsync(self, files): + return sup(self, files, '-e', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args), + *(gconf.rsync_ssh_options.split() + [self.slaveurl])) diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py new file mode 100644 index 00000000000..0764c07904d --- /dev/null +++ b/geo-replication/syncdaemon/syncdutils.py @@ -0,0 +1,288 @@ +import os +import sys +import pwd +import time +import fcntl +import shutil +import logging +from threading import Lock, Thread as baseThread +from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, errorcode +from signal import signal, SIGTERM, SIGKILL +from time import sleep +import select as oselect +from os import waitpid as owaitpid +try: + from cPickle import PickleError +except ImportError: + # py 3 + from pickle import PickleError + +from gconf import gconf + +try: + # py 3 + from urllib import parse as urllib +except ImportError: + import urllib + +def escape(s): + """the chosen flavor of string escaping, used all over + to turn whatever data to creatable representation""" + return urllib.quote_plus(s) + +def unescape(s): + """inverse of .escape""" + return urllib.unquote_plus(s) + +def norm(s): + if s: + return s.replace('-', '_') + +def update_file(path, updater, merger = lambda f: True): + """update a file in a transaction-like manner""" + + fr = fw = None + try: + fd = os.open(path, os.O_CREAT|os.O_RDWR) + try: + fr = os.fdopen(fd, 'r+b') + except: + os.close(fd) + raise + fcntl.lockf(fr, fcntl.LOCK_EX) + if not merger(fr): + return + + tmpp = path + '.tmp.' + str(os.getpid()) + fd = os.open(tmpp, os.O_CREAT|os.O_EXCL|os.O_WRONLY) + try: + fw = os.fdopen(fd, 'wb', 0) + except: + os.close(fd) + raise + updater(fw) + os.fsync(fd) + os.rename(tmpp, path) + finally: + for fx in (fr, fw): + if fx: + fx.close() + +def grabfile(fname, content=None): + """open @fname + contest for its fcntl lock + + @content: if given, set the file content to it + """ + # damn those messy open() mode codes + fd = os.open(fname, os.O_CREAT|os.O_RDWR) + f = os.fdopen(fd, 'r+b', 0) + try: + fcntl.lockf(f, fcntl.LOCK_EX|fcntl.LOCK_NB) + except: + ex = sys.exc_info()[1] + f.close() + if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN): + # cannot grab, it's taken + return + raise + if content: + try: + f.truncate() + f.write(content) + except: + f.close() + raise + gconf.permanent_handles.append(f) + return f + +def grabpidfile(fname=None, setpid=True): + """.grabfile customization for pid files""" + if not fname: + fname = gconf.pid_file + content = None + if setpid: + content = str(os.getpid()) + '\n' + return grabfile(fname, content=content) + +final_lock = Lock() + +def finalize(*a, **kw): + """all those messy final steps we go trough upon termination + + Do away with pidfile, ssh control dir and logging. 
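update_file above is the usual write-to-temp, fsync, rename-over pattern, so a reader never observes a half-written state file; the core of that recipe on its own (simplified: no locking and no merger hook, and the file name is just an example):

    import os, tempfile

    def update_file_simple(path, data):
        d = os.path.dirname(os.path.abspath(path))
        fd, tmpp = tempfile.mkstemp(dir=d, prefix=os.path.basename(path) + '.tmp.')
        with os.fdopen(fd, 'w') as fw:
            fw.write(data)
            fw.flush()
            os.fsync(fw.fileno())
        os.rename(tmpp, path)      # atomic on POSIX: old content or new, never partial

    update_file_simple('demo.state', 'OK\n')
    print(open('demo.state').read().strip())    # OK
    os.unlink('demo.state')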
+ """ + final_lock.acquire() + if getattr(gconf, 'pid_file', None): + rm_pidf = gconf.pid_file_owned + if gconf.cpid: + # exit path from parent branch of daemonization + rm_pidf = False + while True: + f = grabpidfile(setpid=False) + if not f: + # child has already taken over pidfile + break + if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid: + # child has terminated + rm_pidf = True + break; + time.sleep(0.1) + if rm_pidf: + try: + os.unlink(gconf.pid_file) + except: + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + pass + else: + raise + if gconf.ssh_ctl_dir and not gconf.cpid: + shutil.rmtree(gconf.ssh_ctl_dir) + if getattr(gconf, 'state_socket', None): + try: + os.unlink(gconf.state_socket) + except: + if sys.exc_info()[0] == OSError: + pass + if gconf.log_exit: + logging.info("exiting.") + sys.stdout.flush() + sys.stderr.flush() + os._exit(kw.get('exval', 0)) + +def log_raise_exception(excont): + """top-level exception handler + + Try to some fancy things to cover up we face with an error. + Translate some weird sounding but well understood exceptions + into human-friendly lingo + """ + is_filelog = False + for h in logging.getLogger().handlers: + fno = getattr(getattr(h, 'stream', None), 'fileno', None) + if fno and not os.isatty(fno()): + is_filelog = True + + exc = sys.exc_info()[1] + if isinstance(exc, SystemExit): + excont.exval = exc.code or 0 + raise + else: + logtag = None + if isinstance(exc, GsyncdError): + if is_filelog: + logging.error(exc.args[0]) + sys.stderr.write('failure: ' + exc.args[0] + '\n') + elif isinstance(exc, PickleError) or isinstance(exc, EOFError) or \ + ((isinstance(exc, OSError) or isinstance(exc, IOError)) and \ + exc.errno == EPIPE): + logging.error('connection to peer is broken') + if hasattr(gconf, 'transport'): + gconf.transport.wait() + if gconf.transport.returncode == 127: + logging.warn("!!!!!!!!!!!!!") + logging.warn('!!! 
getting "No such file or directory" errors ' + "is most likely due to MISCONFIGURATION, please consult " + "http://access.redhat.com/knowledge/docs/en-US/Red_Hat_Storage/2.0/html/Administration_Guide/chap-User_Guide-Geo_Rep-Preparation-Settingup_Environment.html") + logging.warn("!!!!!!!!!!!!!") + gconf.transport.terminate_geterr() + elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, ECONNABORTED): + logging.error('glusterfs session went down [%s]', errorcode[exc.errno]) + else: + logtag = "FAIL" + if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG): + logtag = "FULL EXCEPTION TRACE" + if logtag: + logging.exception(logtag + ": ") + sys.stderr.write("failed with %s.\n" % type(exc).__name__) + excont.exval = 1 + sys.exit(excont.exval) + + +class FreeObject(object): + """wildcard class for which any attribute can be set""" + + def __init__(self, **kw): + for k,v in kw.items(): + setattr(self, k, v) + +class Thread(baseThread): + """thread class flavor for gsyncd + + - always a daemon thread + - force exit for whole program if thread + function coughs up an exception + """ + def __init__(self, *a, **kw): + tf = kw.get('target') + if tf: + def twrap(*aa): + excont = FreeObject(exval = 0) + try: + tf(*aa) + except: + try: + log_raise_exception(excont) + finally: + finalize(exval = excont.exval) + kw['target'] = twrap + baseThread.__init__(self, *a, **kw) + self.setDaemon(True) + +class GsyncdError(Exception): + pass + +def getusername(uid = None): + if uid == None: + uid = os.geteuid() + return pwd.getpwuid(uid).pw_name + +def privileged(): + return os.geteuid() == 0 + +def boolify(s): + """ + Generic string to boolean converter + + return + - Quick return if string 's' is of type bool + - True if it's in true_list + - False if it's in false_list + - Warn if it's not present in either and return False + """ + true_list = ['true', 'yes', '1', 'on'] + false_list = ['false', 'no', '0', 'off'] + + if isinstance(s, bool): + return s + + rv = False + lstr = s.lower() + if lstr in true_list: + rv = True + elif not lstr in false_list: + logging.warn("Unknown string (%s) in string to boolean conversion defaulting to False\n" % (s)) + + return rv + +def eintr_wrap(func, exc, *a): + """ + wrapper around syscalls resilient to interrupt caused + by signals + """ + while True: + try: + return func(*a) + except exc: + ex = sys.exc_info()[1] + if not ex.args[0] == EINTR: + raise + +def select(*a): + return eintr_wrap(oselect.select, oselect.error, *a) + +def waitpid (*a): + return eintr_wrap(owaitpid, OSError, *a) + +def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})): + signal(SIGTERM, hook) diff --git a/tests/bugs/bug-861945.t b/tests/bugs/bug-861945.t deleted file mode 100755 index 29ccb33c8b1..00000000000 --- a/tests/bugs/bug-861945.t +++ /dev/null @@ -1,51 +0,0 @@ -#!/bin/bash - -. 
$(dirname $0)/../include.rc - -cleanup; - -function georep_start_and_check() -{ - local master=$1 - local slave=$2 - - $CLI volume geo-replication $master $slave start -} - -function georep_stop() -{ - local master=$1 - local slave=$2 - - $CLI volume geo-replication $master $slave stop -} - -TEST glusterd -TEST pidof glusterd -TEST $CLI volume create $V0 $H0:$B0/brick0 $H0:$B0/brick1 -TEST $CLI volume start $V0 - -sleep 5 - -slave=`mktemp -d` -mkdir -p $slave - -# check normal functionality of geo-replication -EXPECT_KEYWORD "successful" georep_start_and_check $V0 $slave -TEST georep_stop $V0 $slave - -# now invoke replace brick -TEST $CLI volume replace-brick $V0 $H0:$B0/brick1 $H0:$B0/brick2 start - -# check if CLI refuses to start geo replication -EXPECT_KEYWORD "failed" georep_start_and_check $V0 $slave - -# commit replace brick operation -TEST $CLI volume replace-brick $V0 $H0:$B0/brick1 $H0:$B0/brick2 commit - -# geo replication should work as usual -EXPECT_KEYWORD "successful" georep_start_and_check $V0 $slave -TEST georep_stop $V0 $slave - -rm -rf $slave -cleanup diff --git a/tests/bugs/bug-864499.t b/tests/bugs/bug-864499.t deleted file mode 100644 index 03c7bbe136f..00000000000 --- a/tests/bugs/bug-864499.t +++ /dev/null @@ -1,20 +0,0 @@ -#!/bin/bash - -. $(dirname $0)/../include.rc - -cleanup; - - -TEST glusterd -TEST pidof glusterd -TEST $CLI volume create $V0 $H0:$B0/brick0 -TEST $CLI volume start $V0 - -TEST "$CLI volume geo-replication $V0 $B0/slave start --xml | xmllint --format -" - -TEST "$CLI volume geo-replication $V0 $B0/slave stop --xml | xmllint --format -" - -TEST $CLI volume stop $V0 - -cleanup; - diff --git a/tests/bugs/bug-864506.t b/tests/bugs/bug-864506.t deleted file mode 100755 index c281f381124..00000000000 --- a/tests/bugs/bug-864506.t +++ /dev/null @@ -1,33 +0,0 @@ -#!/bin/bash - -. $(dirname $0)/../include.rc - -cleanup; - - -TEST glusterd -TEST pidof glusterd -TEST $CLI volume create $V0 $H0:$B0/brick0 -TEST $CLI volume start $V0 - -TEST "mkdir $B0/slave" - -TEST "$CLI volume geo-replication $V0 $B0/slave start --xml | xmllint --format -" - -TEST "$CLI volume geo-replication $V0 $B0/slave2 start --xml | xmllint --format -" - -TEST "$CLI volume geo-replication status --xml | xmllint --format -" - -TEST "$CLI volume geo-replication $V0 $B0/slave status --xml | xmllint --format -" - -TEST "$CLI volume geo-replication $V0 $B0/slave2 status --xml | xmllint --format -" - -TEST "$CLI volume geo-replication $V0 $B0/slave stop --xml | xmllint --format -" - -TEST "$CLI volume geo-replication $V0 $B0/slave2 stop --xml | xmllint --format -" - -TEST $CLI volume stop $V0 - -TEST "rmdir $B0/slave" - -cleanup; diff --git a/tests/bugs/bug-874272.t b/tests/bugs/bug-874272.t deleted file mode 100755 index 01793a36873..00000000000 --- a/tests/bugs/bug-874272.t +++ /dev/null @@ -1,48 +0,0 @@ -#!/bin/bash - -. 
$(dirname $0)/../include.rc - -cleanup; - -TEST glusterd -TEST pidof glusterd -TEST $CLI volume info; - -function volinfo_field() -{ - local vol=$1; - local field=$2; - - $CLI volume info $vol | grep "^$field: " | sed 's/.*: //'; -} - -TEST $CLI volume create $V0 $H0:$B0/brick1; -EXPECT 'Created' volinfo_field $V0 'Status'; - -TEST $CLI volume start $V0; -EXPECT 'Started' volinfo_field $V0 'Status'; - -## Wait for volume to register with rpc.mountd -sleep 5; - -#mount on a random dir -TEST MOUNTDIR="/tmp/$RANDOM" -TEST mkdir $MOUNTDIR -TEST mount -t nfs -o vers=3,nolock,soft,intr $H0:/$V0 $MOUNTDIR; -flag=0 - -TEST touch $MOUNTDIR/testfile - -TEST GEOREPDIR="/tmp/$RANDOM" -TEST mkdir $GEOREPDIR - -TEST $CLI volume geo-replication $V0 file:///$GEOREPDIR start - -for i in {1..500}; do cat /etc/passwd >> $MOUNTDIR/testfile; if [ $? -ne 0 ]; then flag=1; break; fi; done -TEST [ $flag -eq 0 ] -TEST rm -rf $GEOREPDIR - -TEST umount $MOUNTDIR -TEST rm -rf $MOUNTDIR - -cleanup; diff --git a/xlators/features/marker/Makefile.am b/xlators/features/marker/Makefile.am index a6ba2de16ae..a985f42a877 100644 --- a/xlators/features/marker/Makefile.am +++ b/xlators/features/marker/Makefile.am @@ -1,3 +1,3 @@ -SUBDIRS = src @SYNCDAEMON_SUBDIR@ +SUBDIRS = src CLEANFILES = diff --git a/xlators/features/marker/utils/Makefile.am b/xlators/features/marker/utils/Makefile.am deleted file mode 100644 index 556951d9fb7..00000000000 --- a/xlators/features/marker/utils/Makefile.am +++ /dev/null @@ -1,3 +0,0 @@ -SUBDIRS = syncdaemon src - -CLEANFILES = diff --git a/xlators/features/marker/utils/src/Makefile.am b/xlators/features/marker/utils/src/Makefile.am deleted file mode 100644 index 9e410cda633..00000000000 --- a/xlators/features/marker/utils/src/Makefile.am +++ /dev/null @@ -1,26 +0,0 @@ -gsyncddir = $(libexecdir)/glusterfs - -gsyncd_PROGRAMS = gsyncd - -gsyncd_SOURCES = gsyncd.c procdiggy.c - -gsyncd_LDADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \ - $(GF_GLUSTERFS_LIBS) - -gsyncd_LDFLAGS = $(GF_LDFLAGS) - -noinst_HEADERS = procdiggy.h - -AM_CPPFLAGS = $(GF_CPPFLAGS) \ - -I$(top_srcdir)/libglusterfs/src\ - -DGSYNCD_PREFIX=\"$(libexecdir)/glusterfs\"\ - -DUSE_LIBGLUSTERFS\ - -DSBIN_DIR=\"$(sbindir)\" -DPYTHON=\"$(PYTHON)\" - -AM_CFLAGS = -Wall $(GF_CFLAGS) - - -CLEANFILES = - -$(top_builddir)/libglusterfs/src/libglusterfs.la: - $(MAKE) -C $(top_builddir)/libglusterfs/src/ all diff --git a/xlators/features/marker/utils/src/gsyncd.c b/xlators/features/marker/utils/src/gsyncd.c deleted file mode 100644 index 9c4a5bdffb3..00000000000 --- a/xlators/features/marker/utils/src/gsyncd.c +++ /dev/null @@ -1,367 +0,0 @@ -/* - Copyright (c) 2011-2012 Red Hat, Inc. - This file is part of GlusterFS. - - This file is licensed to you under your choice of the GNU Lesser - General Public License, version 3 or any later version (LGPLv3 or - later), or the GNU General Public License, version 2 (GPLv2), in all - cases as published by the Free Software Foundation. -*/ - -#ifndef _CONFIG_H -#define _CONFIG_H -#include "config.h" -#endif - -#include -#include -#include -#include -#include /* for PATH_MAX */ - -/* NOTE (USE_LIBGLUSTERFS): - * ------------------------ - * When USE_LIBGLUSTERFS debugging sumbol is passed; perform - * glusterfs translator like initialization so that glusterfs - * globals, contexts are valid when glustefs api's are invoked. - * We unconditionally pass then while building gsyncd binary. 
- */ -#ifdef USE_LIBGLUSTERFS -#include "glusterfs.h" -#include "globals.h" -#endif - -#include "common-utils.h" -#include "run.h" -#include "procdiggy.h" - -#define _GLUSTERD_CALLED_ "_GLUSTERD_CALLED_" -#define _GSYNCD_DISPATCHED_ "_GSYNCD_DISPATCHED_" -#define GSYNCD_CONF "geo-replication/gsyncd.conf" -#define GSYNCD_PY "gsyncd.py" -#define RSYNC "rsync" - -int restricted = 0; - -static int -duplexpand (void **buf, size_t tsiz, size_t *len) -{ - size_t osiz = tsiz * *len; - char *p = realloc (*buf, osiz << 1); - if (!p) { - free(*buf); - return -1; - } - - memset (p + osiz, 0, osiz); - *buf = p; - *len <<= 1; - - return 0; -} - -static int -str2argv (char *str, char ***argv) -{ - char *p = NULL; - char *savetok = NULL; - int argc = 0; - size_t argv_len = 32; - int ret = 0; - - assert (str); - str = strdup (str); - if (!str) - return -1; - - *argv = calloc (argv_len, sizeof (**argv)); - if (!*argv) - goto error; - - while ((p = strtok_r (str, " ", &savetok))) { - str = NULL; - - argc++; - if (argc == argv_len) { - ret = duplexpand ((void *)argv, - sizeof (**argv), - &argv_len); - if (ret == -1) - goto error; - } - (*argv)[argc - 1] = p; - } - - return argc; - - error: - fprintf (stderr, "out of memory\n"); - return -1; -} - -static int -invoke_gsyncd (int argc, char **argv) -{ - char config_file[PATH_MAX] = {0,}; - size_t gluster_workdir_len = 0; - runner_t runner = {0,}; - int i = 0; - int j = 0; - char *nargv[argc + 4]; - char *python = NULL; - - if (restricted) { - size_t len; - /* in restricted mode we forcibly use the system-wide config */ - runinit (&runner); - runner_add_args (&runner, SBIN_DIR"/gluster", - "--log-file=-", "system::", "getwd", - NULL); - runner_redir (&runner, STDOUT_FILENO, RUN_PIPE); - if (runner_start (&runner) == 0 && - fgets (config_file, PATH_MAX, - runner_chio (&runner, STDOUT_FILENO)) != NULL && - (len = strlen (config_file)) && - config_file[len - 1] == '\n' && - runner_end (&runner) == 0) - gluster_workdir_len = len - 1; - - if (gluster_workdir_len) { - if (gluster_workdir_len + 1 + strlen (GSYNCD_CONF) + 1 > - PATH_MAX) - goto error; - config_file[gluster_workdir_len] = '/'; - strcat (config_file, GSYNCD_CONF); - } else - goto error; - - if (setenv ("_GSYNCD_RESTRICTED_", "1", 1) == -1) - goto error; - } - - if (chdir ("/") == -1) - goto error; - - j = 0; - python = getenv("PYTHON"); - if(!python) - python = PYTHON; - nargv[j++] = python; - nargv[j++] = GSYNCD_PREFIX"/python/syncdaemon/"GSYNCD_PY; - for (i = 1; i < argc; i++) - nargv[j++] = argv[i]; - if (config_file[0]) { - nargv[j++] = "-c"; - nargv[j++] = config_file; - } - nargv[j++] = NULL; - - execvp (python, nargv); - - fprintf (stderr, "exec of '%s' failed\n", python); - return 127; - - error: - fprintf (stderr, "gsyncd initializaion failed\n"); - return 1; -} - - -static int -find_gsyncd (pid_t pid, pid_t ppid, char *name, void *data) -{ - char buf[NAME_MAX * 2] = {0,}; - char path[PATH_MAX] = {0,}; - char *p = NULL; - int zeros = 0; - int ret = 0; - int fd = -1; - pid_t *pida = (pid_t *)data; - - if (ppid != pida[0]) - return 0; - - sprintf (path, PROC"/%d/cmdline", pid); - fd = open (path, O_RDONLY); - if (fd == -1) - return 0; - ret = read (fd, buf, sizeof (buf)); - close (fd); - if (ret == -1) - return 0; - for (zeros = 0, p = buf; zeros < 2 && p < buf + ret; p++) - zeros += !*p; - - ret = 0; - switch (zeros) { - case 2: - if ((strcmp (basename (buf), basename (PYTHON)) || - strcmp (basename (buf + strlen (buf) + 1), GSYNCD_PY)) == 0) { - ret = 1; - break; - } - /* fallthrough */ - case 1: 
- if (strcmp (basename (buf), GSYNCD_PY) == 0) - ret = 1; - } - - if (ret == 1) { - if (pida[1] != -1) { - fprintf (stderr, GSYNCD_PY" sibling is not unique"); - return -1; - } - pida[1] = pid; - } - - return 0; -} - -static int -invoke_rsync (int argc, char **argv) -{ - int i = 0; - char path[PATH_MAX] = {0,}; - pid_t pid = -1; - pid_t ppid = -1; - pid_t pida[] = {-1, -1}; - char *name = NULL; - char buf[PATH_MAX + 1] = {0,}; - int ret = 0; - - assert (argv[argc] == NULL); - - if (argc < 2 || strcmp (argv[1], "--server") != 0) - goto error; - - for (i = 2; i < argc && argv[i][0] == '-'; i++); - - if (!(i == argc - 2 && strcmp (argv[i], ".") == 0 && argv[i + 1][0] == '/')) { - fprintf (stderr, "need an rsync invocation without protected args\n"); - goto error; - } - - /* look up sshd we are spawned from */ - for (pid = getpid () ;; pid = ppid) { - ppid = pidinfo (pid, &name); - if (ppid < 0) { - fprintf (stderr, "sshd ancestor not found\n"); - goto error; - } - if (strcmp (name, "sshd") == 0) { - GF_FREE (name); - break; - } - GF_FREE (name); - } - /* look up "ssh-sibling" gsyncd */ - pida[0] = pid; - ret = prociter (find_gsyncd, pida); - if (ret == -1 || pida[1] == -1) { - fprintf (stderr, "gsyncd sibling not found\n"); - goto error; - } - /* check if rsync target matches gsyncd target */ - sprintf (path, PROC"/%d/cwd", pida[1]); - ret = readlink (path, buf, sizeof (buf)); - if (ret == -1 || ret == sizeof (buf)) - goto error; - if (strcmp (argv[argc - 1], "/") == 0 /* root dir cannot be a target */ || - (strcmp (argv[argc - 1], path) /* match against gluster target */ && - strcmp (argv[argc - 1], buf) /* match against file target */) != 0) { - fprintf (stderr, "rsync target does not match "GEOREP" session\n"); - goto error; - } - - argv[0] = RSYNC; - - execvp (RSYNC, argv); - - fprintf (stderr, "exec of "RSYNC" failed\n"); - return 127; - - error: - fprintf (stderr, "disallowed "RSYNC" invocation\n"); - return 1; -} - - -struct invocable { - char *name; - int (*invoker) (int argc, char **argv); -}; - -struct invocable invocables[] = { - { "rsync", invoke_rsync }, - { "gsyncd", invoke_gsyncd }, - { NULL, NULL} -}; - -int -main (int argc, char **argv) -{ - char *evas = NULL; - struct invocable *i = NULL; - char *b = NULL; - char *sargv = NULL; - -#ifdef USE_LIBGLUSTERFS - glusterfs_ctx_t *ctx = NULL; - - ctx = glusterfs_ctx_new (); - if (!ctx) - return ENOMEM; - - if (glusterfs_globals_init (ctx)) - return 1; - - THIS->ctx = ctx; -#endif - - evas = getenv (_GLUSTERD_CALLED_); - if (evas && strcmp (evas, "1") == 0) - /* OK, we know glusterd called us, no need to look for further config - * ... 
altough this conclusion should not inherit to our children - */ - unsetenv (_GLUSTERD_CALLED_); - else { - /* we regard all gsyncd invocations unsafe - * that do not come from glusterd and - * therefore restrict it - */ - restricted = 1; - - if (!getenv (_GSYNCD_DISPATCHED_)) { - evas = getenv ("SSH_ORIGINAL_COMMAND"); - if (evas) - sargv = evas; - else { - evas = getenv ("SHELL"); - if (evas && strcmp (basename (evas), "gsyncd") == 0 && - argc == 3 && strcmp (argv[1], "-c") == 0) - sargv = argv[2]; - } - } - - } - - if (!(sargv && restricted)) - return invoke_gsyncd (argc, argv); - - argc = str2argv (sargv, &argv); - if (argc == -1 || setenv (_GSYNCD_DISPATCHED_, "1", 1) == -1) { - fprintf (stderr, "internal error\n"); - return 1; - } - - b = basename (argv[0]); - for (i = invocables; i->name; i++) { - if (strcmp (b, i->name) == 0) - return i->invoker (argc, argv); - } - - fprintf (stderr, "invoking %s in restricted SSH session is not allowed\n", - b); - - return 1; -} diff --git a/xlators/features/marker/utils/src/procdiggy.c b/xlators/features/marker/utils/src/procdiggy.c deleted file mode 100644 index 1eba414c116..00000000000 --- a/xlators/features/marker/utils/src/procdiggy.c +++ /dev/null @@ -1,121 +0,0 @@ -/* - Copyright (c) 2011-2012 Red Hat, Inc. - This file is part of GlusterFS. - - This file is licensed to you under your choice of the GNU Lesser - General Public License, version 3 or any later version (LGPLv3 or - later), or the GNU General Public License, version 2 (GPLv2), in all - cases as published by the Free Software Foundation. -*/ - -#ifndef _CONFIG_H -#define _CONFIG_H -#include "config.h" -#endif - -#include -#include -#include -#include -#include -#include /* for PATH_MAX */ - -#include "common-utils.h" -#include "procdiggy.h" - -pid_t -pidinfo (pid_t pid, char **name) -{ - char buf[NAME_MAX * 2] = {0,}; - FILE *f = NULL; - char path[PATH_MAX] = {0,}; - char *p = NULL; - int ret = 0; - - sprintf (path, PROC"/%d/status", pid); - - f = fopen (path, "r"); - if (!f) - return -1; - - if (name) - *name = NULL; - for (;;) { - size_t len; - memset (buf, 0, sizeof (buf)); - if (fgets (buf, sizeof (buf), f) == NULL || - (len = strlen (buf)) == 0 || - buf[len - 1] != '\n') { - pid = -1; - goto out; - } - buf[len - 1] = '\0'; - - if (name && !*name) { - p = strtail (buf, "Name:"); - if (p) { - while (isspace (*++p)); - *name = gf_strdup (p); - if (!*name) { - pid = -2; - goto out; - } - continue; - } - } - - p = strtail (buf, "PPid:"); - if (p) - break; - } - - while (isspace (*++p)); - ret = gf_string2int (p, &pid); - if (ret == -1) - pid = -1; - - out: - fclose (f); - if (pid == -1 && name && *name) - GF_FREE (name); - if (pid == -2) - fprintf (stderr, "out of memory\n"); - return pid; -} - -int -prociter (int (*proch) (pid_t pid, pid_t ppid, char *tmpname, void *data), - void *data) -{ - char *name = NULL; - DIR *d = NULL; - struct dirent *de = NULL; - pid_t pid = -1; - pid_t ppid = -1; - int ret = 0; - - d = opendir (PROC); - if (!d) - return -1; - while (errno = 0, de = readdir (d)) { - if (gf_string2int (de->d_name, &pid) != -1 && pid >= 0) { - ppid = pidinfo (pid, &name); - switch (ppid) { - case -1: continue; - case -2: ret = -1; break; - } - ret = proch (pid, ppid, name, data); - GF_FREE (name); - if (ret) - break; - } - } - closedir (d); - if (!de && errno) { - fprintf (stderr, "failed to traverse "PROC" (%s)\n", - strerror (errno)); - ret = -1; - } - - return ret; -} diff --git a/xlators/features/marker/utils/src/procdiggy.h 
b/xlators/features/marker/utils/src/procdiggy.h deleted file mode 100644 index 56dfc4eb213..00000000000 --- a/xlators/features/marker/utils/src/procdiggy.h +++ /dev/null @@ -1,20 +0,0 @@ -/* - Copyright (c) 2011-2012 Red Hat, Inc. - This file is part of GlusterFS. - - This file is licensed to you under your choice of the GNU Lesser - General Public License, version 3 or any later version (LGPLv3 or - later), or the GNU General Public License, version 2 (GPLv2), in all - cases as published by the Free Software Foundation. -*/ -#ifdef __NetBSD__ -#include -#endif /* __NetBSD__ */ - -#define PROC "/proc" - -pid_t pidinfo (pid_t pid, char **name); - -int prociter (int (*proch) (pid_t pid, pid_t ppid, char *name, void *data), - void *data); - diff --git a/xlators/features/marker/utils/syncdaemon/Makefile.am b/xlators/features/marker/utils/syncdaemon/Makefile.am deleted file mode 100644 index c19f6b45919..00000000000 --- a/xlators/features/marker/utils/syncdaemon/Makefile.am +++ /dev/null @@ -1,7 +0,0 @@ -syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon - -syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \ - resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \ - $(top_builddir)/contrib/ipaddr-py/ipaddr.py - -CLEANFILES = diff --git a/xlators/features/marker/utils/syncdaemon/README.md b/xlators/features/marker/utils/syncdaemon/README.md deleted file mode 100644 index d45006932d1..00000000000 --- a/xlators/features/marker/utils/syncdaemon/README.md +++ /dev/null @@ -1,81 +0,0 @@ -gsycnd, the Gluster Syncdaemon -============================== - -REQUIREMENTS ------------- - -_gsyncd_ is a program which can operate either in _master_ or in _slave_ mode. -Requirements are categorized according to this. - -* supported OS is GNU/Linux -* Python >= 2.5, or 2.4 with Ctypes (see below) (both) -* OpenSSH >= 4.0 (master) / SSH2 compliant sshd (eg. openssh) (slave) -* rsync (both) -* glusterfs with marker support (master); glusterfs (optional on slave) -* FUSE; for supported versions consult glusterfs - -INSTALLATION ------------- - -As of now, the supported way of operation is running from the source directory. - -If you use Python 2.4.x, you need to install the [Ctypes module](http://python.net/crew/theller/ctypes/). - -CONFIGURATION -------------- - -gsyncd tunables are a subset of the long command-line options; for listing them, -type - - gsyncd.py --help - -and see the long options up to "--config-file". (The leading double dash should be omitted; -interim underscores and dashes are interchangeable.) The set of options bear some resemblance -to those of glusterfs and rsync. - -The config file format matches the following syntax: - - : - : - # comment - -By default (unless specified by the option `-c`), gsyncd looks for config file at _conf/gsyncd.conf_ -in the source tree. - -USAGE ------ - -gsyncd is a utilitly for continous mirroring, ie. it mirrors master to slave incrementally. -Assume we have a gluster volume _pop_ at localhost. We try to set up the following mirrors -for it with gysncd: - -1. _/data/mirror_ -2. local gluster volume _yow_ -3. _/data/far_mirror_ at example.com -4. gluster volume _moz_ at example.com - -The respective gsyncd invocations are (demoing some syntax sugaring): - -1. - - gsyncd.py gluster://localhost:pop file:///data/mirror - - or short form - - gsyncd.py :pop /data/mirror - -2. `gsyncd :pop :yow` -3. 
- - gsyncd.py :pop ssh://example.com:/data/far_mirror - - or short form - - gsyncd.py :pop example.com:/data/far_mirror - -4. `gsyncd.py :pop example.com::moz` - -gsyncd has to be available on both sides; it's location on the remote side has to be specified -via the "--remote-gsyncd" option (or "remote-gsyncd" config file parameter). (This option can also be -used for setting options on the remote side, although the suggested mode of operation is to -set parameters like log file / pid file in the configuration file.) diff --git a/xlators/features/marker/utils/syncdaemon/__codecheck.py b/xlators/features/marker/utils/syncdaemon/__codecheck.py deleted file mode 100644 index e3386afba8b..00000000000 --- a/xlators/features/marker/utils/syncdaemon/__codecheck.py +++ /dev/null @@ -1,46 +0,0 @@ -import os -import os.path -import sys -import tempfile -import shutil - -ipd = tempfile.mkdtemp(prefix = 'codecheck-aux') - -try: - # add a fake ipaddr module, we don't want to - # deal with the real one (just test our code) - f = open(os.path.join(ipd, 'ipaddr.py'), 'w') - f.write(""" -class IPAddress(object): - pass -class IPNetwork(list): - pass -""") - f.close() - sys.path.append(ipd) - - fl = os.listdir(os.path.dirname(sys.argv[0]) or '.') - fl.sort() - for f in fl: - if f[-3:] != '.py' or f[0] == '_': - continue - m = f[:-3] - sys.stdout.write('importing %s ...' % m) - __import__(m) - print(' OK.') - - def sys_argv_set(a): - sys.argv = sys.argv[:1] + a - - gsyncd = sys.modules['gsyncd'] - for a in [['--help'], ['--version'], ['--canonicalize-escape-url', '/foo']]: - print('>>> invoking program with args: %s' % ' '.join(a)) - pid = os.fork() - if not pid: - sys_argv_set(a) - gsyncd.main() - _, r = os.waitpid(pid, 0) - if r: - raise RuntimeError('invocation failed') -finally: - shutil.rmtree(ipd) diff --git a/xlators/features/marker/utils/syncdaemon/__init__.py b/xlators/features/marker/utils/syncdaemon/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/xlators/features/marker/utils/syncdaemon/configinterface.py b/xlators/features/marker/utils/syncdaemon/configinterface.py deleted file mode 100644 index e55bec519e9..00000000000 --- a/xlators/features/marker/utils/syncdaemon/configinterface.py +++ /dev/null @@ -1,224 +0,0 @@ -try: - import ConfigParser -except ImportError: - # py 3 - import configparser as ConfigParser -import re -from string import Template - -from syncdutils import escape, unescape, norm, update_file, GsyncdError - -SECT_ORD = '__section_order__' -SECT_META = '__meta__' -config_version = 2.0 - -re_type = type(re.compile('')) - - -class MultiDict(object): - """a virtual dict-like class which functions as the union of underlying dicts""" - - def __init__(self, *dd): - self.dicts = dd - - def __getitem__(self, key): - val = None - for d in self.dicts: - if d.get(key): - val = d[key] - if not val: - raise KeyError(key) - return val - - -class GConffile(object): - """A high-level interface to ConfigParser which flattens the two-tiered - config layout by implenting automatic section dispatch based on initial - parameters. - - Also ensure section ordering in terms of their time of addition -- a compat - hack for Python < 2.7. 
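The section-ordering compat hack mentioned above is backed by the auxiliary SECT_ORD section: every section added through the config API gets an integer index recorded there, and reads sort on that index. A small self-contained sketch of the idea, with made-up section names rather than the GConffile code:

    try:
        import ConfigParser                      # Python 2
    except ImportError:
        import configparser as ConfigParser      # Python 3

    SECT_ORD = '__section_order__'

    def ordered_sections(cp):
        # order sections by the index recorded in __section_order__;
        # sections without an index are appended at the end
        order = {}
        if cp.has_section(SECT_ORD):
            order = dict((k, int(v)) for k, v in cp.items(SECT_ORD))
        sects = [s for s in cp.sections() if not s.startswith('__')]
        return sorted(sects, key=lambda s: order.get(s, len(order)))

    cp = ConfigParser.RawConfigParser()
    cp.add_section(SECT_ORD)
    for i, name in enumerate(['peers b', 'peers a']):
        cp.add_section(name)
        cp.set(SECT_ORD, name, str(i))

    print(ordered_sections(cp))   # ['peers b', 'peers a'] -- addition order kept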
- """ - - def _normconfig(self): - """normalize config keys by s/-/_/g""" - for n, s in self.config._sections.items(): - if n.find('__') == 0: - continue - s2 = type(s)() - for k, v in s.items(): - if k.find('__') != 0: - k = norm(k) - s2[k] = v - self.config._sections[n] = s2 - - def __init__(self, path, peers, *dd): - """ - - .path: location of config file - - .config: underlying ConfigParser instance - - .peers: on behalf of whom we flatten .config - (master, or master-slave url pair) - - .auxdicts: template subtituents - """ - self.peers = peers - self.path = path - self.auxdicts = dd - self.config = ConfigParser.RawConfigParser() - self.config.read(path) - self._normconfig() - - def section(self, rx=False): - """get the section name of the section representing .peers in .config""" - peers = self.peers - if not peers: - peers = ['.', '.'] - rx = True - if rx: - st = 'peersrx' - else: - st = 'peers' - return ' '.join([st] + [escape(u) for u in peers]) - - @staticmethod - def parse_section(section): - """retrieve peers sequence encoded by section name - (as urls or regexen, depending on section type) - """ - sl = section.split() - st = sl.pop(0) - sl = [unescape(u) for u in sl] - if st == 'peersrx': - sl = [re.compile(u) for u in sl] - return sl - - def ord_sections(self): - """Return an ordered list of sections. - - Ordering happens based on the auxiliary - SECT_ORD section storing indices for each - section added through the config API. - - To not to go corrupt in case of manually - written config files, we take care to append - also those sections which are not registered - in SECT_ORD. - - Needed for python 2.{4,5,6} where ConfigParser - cannot yet order sections/options internally. - """ - so = {} - if self.config.has_section(SECT_ORD): - so = self.config._sections[SECT_ORD] - so2 = {} - for k, v in so.items(): - if k != '__name__': - so2[k] = int(v) - tv = 0 - if so2: - tv = max(so2.values()) + 1 - ss = [s for s in self.config.sections() if s.find('__') != 0] - for s in ss: - if s in so.keys(): - continue - so2[s] = tv - tv += 1 - def scmp(x, y): - return cmp(*(so2[s] for s in (x, y))) - ss.sort(scmp) - return ss - - def update_to(self, dct, allow_unresolved=False): - """update @dct from key/values of ours. - - key/values are collected from .config by filtering the regexp sections - according to match, and from .section. The values are treated as templates, - which are substituted from .auxdicts and (in case of regexp sections) - match groups. 
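As the docstring above describes, option values coming from regexp ('peersrx') sections are string templates that can be filled in from the match groups of the patterns matched against the current peer URLs. A rough standalone illustration of that substitution step, with made-up patterns and peers rather than the real GConffile code path:

    import re
    from string import Template

    # one hypothetical 'peersrx' section: a pattern per peer, plus a
    # templated option value referencing the match groups
    patterns = [re.compile(r':(.+)'), re.compile(r'(.+)::(.+)')]
    options = {'log_file': '/var/log/georep/${match1_1}-${match2_2}.log'}

    peers = [':pop', 'example.com::moz']     # the session being configured

    mad = {}
    matched = True
    for i, (rx, peer) in enumerate(zip(patterns, peers)):
        m = rx.search(peer)
        if not m:
            matched = False
            break
        for j, grp in enumerate(m.groups()):
            # expose groups as match<pattern-index>_<group-index>, 1-based
            mad['match%d_%d' % (i + 1, j + 1)] = grp

    if matched:
        resolved = dict((k, Template(v).substitute(mad))
                        for k, v in options.items())
        print(resolved['log_file'])          # /var/log/georep/pop-moz.log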
- """ - if not self.peers: - raise GsyncdError('no peers given, cannot select matching options') - def update_from_sect(sect, mud): - for k, v in self.config._sections[sect].items(): - if k == '__name__': - continue - if allow_unresolved: - dct[k] = Template(v).safe_substitute(mud) - else: - dct[k] = Template(v).substitute(mud) - for sect in self.ord_sections(): - sp = self.parse_section(sect) - if isinstance(sp[0], re_type) and len(sp) == len(self.peers): - match = True - mad = {} - for i in range(len(sp)): - m = sp[i].search(self.peers[i]) - if not m: - match = False - break - for j in range(len(m.groups())): - mad['match%d_%d' % (i+1, j+1)] = m.groups()[j] - if match: - update_from_sect(sect, MultiDict(dct, mad, *self.auxdicts)) - if self.config.has_section(self.section()): - update_from_sect(self.section(), MultiDict(dct, *self.auxdicts)) - - def get(self, opt=None): - """print the matching key/value pairs from .config, - or if @opt given, the value for @opt (according to the - logic described in .update_to) - """ - d = {} - self.update_to(d, allow_unresolved = True) - if opt: - opt = norm(opt) - v = d.get(opt) - if v: - print(v) - else: - for k, v in d.iteritems(): - if k == '__name__': - continue - print("%s: %s" % (k, v)) - - def write(self, trfn, opt, *a, **kw): - """update on-disk config transactionally - - @trfn is the transaction function - """ - def mergeconf(f): - self.config = ConfigParser.RawConfigParser() - self.config.readfp(f) - self._normconfig() - if not self.config.has_section(SECT_META): - self.config.add_section(SECT_META) - self.config.set(SECT_META, 'version', config_version) - return trfn(norm(opt), *a, **kw) - def updateconf(f): - self.config.write(f) - update_file(self.path, updateconf, mergeconf) - - def _set(self, opt, val, rx=False): - """set @opt to @val in .section""" - sect = self.section(rx) - if not self.config.has_section(sect): - self.config.add_section(sect) - # regarding SECT_ORD, cf. 
ord_sections - if not self.config.has_section(SECT_ORD): - self.config.add_section(SECT_ORD) - self.config.set(SECT_ORD, sect, len(self.config._sections[SECT_ORD])) - self.config.set(sect, opt, val) - return True - - def set(self, opt, *a, **kw): - """perform ._set transactionally""" - self.write(self._set, opt, *a, **kw) - - def _delete(self, opt, rx=False): - """delete @opt from .section""" - sect = self.section(rx) - if self.config.has_section(sect): - return self.config.remove_option(sect, opt) - - def delete(self, opt, *a, **kw): - """perform ._delete transactionally""" - self.write(self._delete, opt, *a, **kw) diff --git a/xlators/features/marker/utils/syncdaemon/gconf.py b/xlators/features/marker/utils/syncdaemon/gconf.py deleted file mode 100644 index 146c72a1825..00000000000 --- a/xlators/features/marker/utils/syncdaemon/gconf.py +++ /dev/null @@ -1,20 +0,0 @@ -import os - -class GConf(object): - """singleton class to store globals - shared between gsyncd modules""" - - ssh_ctl_dir = None - ssh_ctl_args = None - cpid = None - pid_file_owned = False - log_exit = False - permanent_handles = [] - log_metadata = {} - - @classmethod - def setup_ssh_ctl(cls, ctld): - cls.ssh_ctl_dir = ctld - cls.ssh_ctl_args = ["-oControlMaster=auto", "-S", os.path.join(ctld, "gsycnd-ssh-%r@%h:%p")] - -gconf = GConf() diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py deleted file mode 100644 index 387900e6ce8..00000000000 --- a/xlators/features/marker/utils/syncdaemon/gsyncd.py +++ /dev/null @@ -1,419 +0,0 @@ -#!/usr/bin/env python - -import os -import os.path -import sys -import time -import logging -import signal -import optparse -import fcntl -import fnmatch -from optparse import OptionParser, SUPPRESS_HELP -from logging import Logger -from errno import EEXIST, ENOENT - -from ipaddr import IPAddress, IPNetwork - -from gconf import gconf -from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception -from syncdutils import GsyncdError, select, set_term_handler, privileged -from configinterface import GConffile -import resource -from monitor import monitor - -class GLogger(Logger): - """Logger customizations for gsyncd. - - It implements a log format similar to that of glusterfs. 
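GLogger, as its docstring notes, gets its glusterfs-like output by injecting extra attributes into every log record from an overridden makeRecord() and then referencing those attributes in the format string. A much smaller sketch of the same Logger-subclass technique (the field names here are illustrative, not the full gsyncd format):

    import logging
    import sys

    class TinyLogger(logging.Logger):
        # add custom attributes to each record so that the format
        # string below can reference them
        def makeRecord(self, name, level, *a, **kw):
            rv = logging.Logger.makeRecord(self, name, level, *a, **kw)
            rv.nsecs = int((rv.created - int(rv.created)) * 1000000)
            rv.lvlnam = logging.getLevelName(level)[0]
            return rv

    logging.setLoggerClass(TinyLogger)
    log = logging.getLogger('demo')
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(logging.Formatter(
        "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s:%(lineno)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"))
    log.addHandler(handler)
    log.setLevel(logging.INFO)
    log.info("syncing started")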
- """ - - def makeRecord(self, name, level, *a): - rv = Logger.makeRecord(self, name, level, *a) - rv.nsecs = (rv.created - int(rv.created)) * 1000000 - fr = sys._getframe(4) - callee = fr.f_locals.get('self') - if callee: - ctx = str(type(callee)).split("'")[1].split('.')[-1] - else: - ctx = '' - if not hasattr(rv, 'funcName'): - rv.funcName = fr.f_code.co_name - rv.lvlnam = logging.getLevelName(level)[0] - rv.ctx = ctx - return rv - - @classmethod - def setup(cls, **kw): - lbl = kw.get('label', "") - if lbl: - lbl = '(' + lbl + ')' - lprm = {'datefmt': "%Y-%m-%d %H:%M:%S", - 'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"} - lprm.update(kw) - lvl = kw.get('level', logging.INFO) - lprm['level'] = lvl - logging.root = cls("root", lvl) - logging.setLoggerClass(cls) - logging.getLogger().handlers = [] - logging.basicConfig(**lprm) - - @classmethod - def _gsyncd_loginit(cls, **kw): - lkw = {} - if gconf.log_level: - lkw['level'] = gconf.log_level - if kw.get('log_file'): - if kw['log_file'] in ('-', '/dev/stderr'): - lkw['stream'] = sys.stderr - elif kw['log_file'] == '/dev/stdout': - lkw['stream'] = sys.stdout - else: - lkw['filename'] = kw['log_file'] - - cls.setup(label=kw.get('label'), **lkw) - - lkw.update({'saved_label': kw.get('label')}) - gconf.log_metadata = lkw - gconf.log_exit = True - -def startup(**kw): - """set up logging, pidfile grabbing, daemonization""" - if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn': - if not grabpidfile(): - sys.stderr.write("pidfile is taken, exiting.\n") - sys.exit(2) - gconf.pid_file_owned = True - - if kw.get('go_daemon') == 'should': - x, y = os.pipe() - gconf.cpid = os.fork() - if gconf.cpid: - os.close(x) - sys.exit() - os.close(y) - os.setsid() - dn = os.open(os.devnull, os.O_RDWR) - for f in (sys.stdin, sys.stdout, sys.stderr): - os.dup2(dn, f.fileno()) - if getattr(gconf, 'pid_file', None): - if not grabpidfile(gconf.pid_file + '.tmp'): - raise GsyncdError("cannot grab temporary pidfile") - os.rename(gconf.pid_file + '.tmp', gconf.pid_file) - # wait for parent to terminate - # so we can start up with - # no messing from the dirty - # ol' bustard - select((x,), (), ()) - os.close(x) - - GLogger._gsyncd_loginit(**kw) - -def main(): - """main routine, signal/exception handling boilerplates""" - gconf.starttime = time.time() - set_term_handler() - GLogger.setup() - excont = FreeObject(exval = 0) - try: - try: - main_i() - except: - log_raise_exception(excont) - finally: - finalize(exval = excont.exval) - -def main_i(): - """internal main routine - - parse command line, decide what action will be taken; - we can either: - - query/manipulate configuration - - format gsyncd urls using gsyncd's url parsing engine - - start service in following modes, in given stages: - - monitor: startup(), monitor() - - master: startup(), connect_remote(), connect(), service_loop() - - slave: startup(), connect(), service_loop() - """ - rconf = {'go_daemon': 'should'} - - def store_abs(opt, optstr, val, parser): - if val and val != '-': - val = os.path.abspath(val) - setattr(parser.values, opt.dest, val) - def store_local(opt, optstr, val, parser): - rconf[opt.dest] = val - def store_local_curry(val): - return lambda o, oo, vx, p: store_local(o, oo, val, p) - def store_local_obj(op, dmake): - return lambda o, oo, vx, p: store_local(o, oo, FreeObject(op=op, **dmake(vx)), p) - - op = OptionParser(usage="%prog [options...] 
", version="%prog 0.0.1") - op.add_option('--gluster-command-dir', metavar='DIR', default='') - op.add_option('--gluster-log-file', metavar='LOGF', default=os.devnull, type=str, action='callback', callback=store_abs) - op.add_option('--gluster-log-level', metavar='LVL') - op.add_option('--gluster-params', metavar='PRMS', default='') - op.add_option('--gluster-cli-options', metavar='OPTS', default='--log-file=-') - op.add_option('--mountbroker', metavar='LABEL') - op.add_option('-p', '--pid-file', metavar='PIDF', type=str, action='callback', callback=store_abs) - op.add_option('-l', '--log-file', metavar='LOGF', type=str, action='callback', callback=store_abs) - op.add_option('--log-file-mbr', metavar='LOGF', type=str, action='callback', callback=store_abs) - op.add_option('--state-file', metavar='STATF', type=str, action='callback', callback=store_abs) - op.add_option('--ignore-deletes', default=False, action='store_true') - op.add_option('--use-rsync-xattrs', default=False, action='store_true') - op.add_option('-L', '--log-level', metavar='LVL') - op.add_option('-r', '--remote-gsyncd', metavar='CMD', default=os.path.abspath(sys.argv[0])) - op.add_option('--volume-id', metavar='UUID') - op.add_option('--session-owner', metavar='ID') - op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh') - op.add_option('--rsync-command', metavar='CMD', default='rsync') - op.add_option('--rsync-options', metavar='OPTS', default='--sparse') - op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress') - op.add_option('--timeout', metavar='SEC', type=int, default=120) - op.add_option('--connection-timeout', metavar='SEC', type=int, default=60, help=SUPPRESS_HELP) - op.add_option('--sync-jobs', metavar='N', type=int, default=3) - op.add_option('--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP) - op.add_option('--allow-network', metavar='IPS', default='') - op.add_option('--socketdir', metavar='DIR') - op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs) - op.add_option('--checkpoint', metavar='LABEL', default='') - # tunables for failover/failback mechanism: - # None - gsyncd behaves as normal - # blind - gsyncd works with xtime pairs to identify - # candidates for synchronization - # wrapup - same as normal mode but does not assign - # xtimes to orphaned files - # see crawl() for usage of the above tunables - op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP) - - op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local) - # duh. 
need to specify dest or value will be mapped to None :S - op.add_option('--monitor', dest='monitor', action='callback', callback=store_local_curry(True)) - op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local) - op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True)) - op.add_option('-N', '--no-daemon', dest="go_daemon", action='callback', callback=store_local_curry('dont')) - op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a), - setattr(a[-1].values, 'log_file', '-'), - setattr(a[-1].values, 'log_level', 'DEBUG'))), - - for a in ('check', 'get'): - op.add_option('--config-' + a, metavar='OPT', type=str, dest='config', action='callback', - callback=store_local_obj(a, lambda vx: {'opt': vx})) - op.add_option('--config-get-all', dest='config', action='callback', callback=store_local_obj('get', lambda vx: {'opt': None})) - for m in ('', '-rx', '-glob'): - # call this code 'Pythonic' eh? - # have to define a one-shot local function to be able to inject (a value depending on the) - # iteration variable into the inner lambda - def conf_mod_opt_regex_variant(rx): - op.add_option('--config-set' + m, metavar='OPT VAL', type=str, nargs=2, dest='config', action='callback', - callback=store_local_obj('set', lambda vx: {'opt': vx[0], 'val': vx[1], 'rx': rx})) - op.add_option('--config-del' + m, metavar='OPT', type=str, dest='config', action='callback', - callback=store_local_obj('del', lambda vx: {'opt': vx, 'rx': rx})) - conf_mod_opt_regex_variant(m and m[1:] or False) - - op.add_option('--normalize-url', dest='url_print', action='callback', callback=store_local_curry('normal')) - op.add_option('--canonicalize-url', dest='url_print', action='callback', callback=store_local_curry('canon')) - op.add_option('--canonicalize-escape-url', dest='url_print', action='callback', callback=store_local_curry('canon_esc')) - - tunables = [ norm(o.get_opt_string()[2:]) for o in op.option_list if o.callback in (store_abs, 'store_true', None) and o.get_opt_string() not in ('--version', '--help') ] - remote_tunables = [ 'listen', 'go_daemon', 'timeout', 'session_owner', 'config_file', 'use_rsync_xattrs' ] - rq_remote_tunables = { 'listen': True } - - # precedence for sources of values: 1) commandline, 2) cfg file, 3) defaults - # -- for this to work out we need to tell apart defaults from explicitly set - # options... so churn out the defaults here and call the parser with virgin - # values container. 
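The comment above describes the precedence trick used here: defaults are captured once with get_default_values(), and parse_args() is then called with an empty optparse.Values() container, so the resulting object holds only the options the user explicitly passed. A stripped-down, hypothetical illustration of that pattern:

    import optparse

    op = optparse.OptionParser()
    op.add_option('--log-level', default='INFO')
    op.add_option('--timeout', type=int, default=120)

    # capture defaults separately, then parse into a virgin Values()
    # container so explicit options can be told apart from defaults
    defaults = op.get_default_values()
    opts, args = op.parse_args(['--timeout', '30'], values=optparse.Values())

    print(vars(defaults))   # {'log_level': 'INFO', 'timeout': 120}
    print(vars(opts))       # {'timeout': 30} -- only what was passed

    # precedence: defaults first, then (conceptually) the config file,
    # with explicit command-line options winning
    effective = dict(vars(defaults))
    effective.update(vars(opts))
    print(effective['timeout'])   # 30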
- defaults = op.get_default_values() - opts, args = op.parse_args(values=optparse.Values()) - confdata = rconf.get('config') - if not (len(args) == 2 or \ - (len(args) == 1 and rconf.get('listen')) or \ - (len(args) <= 2 and confdata) or \ - rconf.get('url_print')): - sys.stderr.write("error: incorrect number of arguments\n\n") - sys.stderr.write(op.get_usage() + "\n") - sys.exit(1) - - restricted = os.getenv('_GSYNCD_RESTRICTED_') - - if restricted: - allopts = {} - allopts.update(opts.__dict__) - allopts.update(rconf) - bannedtuns = set(allopts.keys()) - set(remote_tunables) - if bannedtuns: - raise GsyncdError('following tunables cannot be set with restricted SSH invocaton: ' + \ - ', '.join(bannedtuns)) - for k, v in rq_remote_tunables.items(): - if not k in allopts or allopts[k] != v: - raise GsyncdError('tunable %s is not set to value %s required for restricted SSH invocaton' % \ - (k, v)) - - confrx = getattr(confdata, 'rx', None) - if confrx: - # peers are regexen, don't try to parse them - if confrx == 'glob': - args = [ '\A' + fnmatch.translate(a) for a in args ] - canon_peers = args - namedict = {} - else: - rscs = [resource.parse_url(u) for u in args] - dc = rconf.get('url_print') - if dc: - for r in rscs: - print(r.get_url(**{'normal': {}, - 'canon': {'canonical': True}, - 'canon_esc': {'canonical': True, 'escaped': True}}[dc])) - return - local = remote = None - if rscs: - local = rscs[0] - if len(rscs) > 1: - remote = rscs[1] - if not local.can_connect_to(remote): - raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path)) - pa = ([], [], []) - urlprms = ({}, {'canonical': True}, {'canonical': True, 'escaped': True}) - for x in rscs: - for i in range(len(pa)): - pa[i].append(x.get_url(**urlprms[i])) - peers, canon_peers, canon_esc_peers = pa - # creating the namedict, a dict representing various ways of referring to / repreenting - # peers to be fillable in config templates - mods = (lambda x: x, lambda x: x[0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:]) - if remote: - rmap = { local: ('local', 'master'), remote: ('remote', 'slave') } - else: - rmap = { local: ('local', 'slave') } - namedict = {} - for i in range(len(rscs)): - x = rscs[i] - for name in rmap[x]: - for j in range(3): - namedict[mods[j](name)] = pa[j][i] - if x.scheme == 'gluster': - namedict[name + 'vol'] = x.volume - if not 'config_file' in rconf: - rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd.conf") - gcnf = GConffile(rconf['config_file'], canon_peers, defaults.__dict__, opts.__dict__, namedict) - - checkpoint_change = False - if confdata: - opt_ok = norm(confdata.opt) in tunables + [None] - if confdata.op == 'check': - if opt_ok: - sys.exit(0) - else: - sys.exit(1) - elif not opt_ok: - raise GsyncdError("not a valid option: " + confdata.opt) - if confdata.op == 'get': - gcnf.get(confdata.opt) - elif confdata.op == 'set': - gcnf.set(confdata.opt, confdata.val, confdata.rx) - elif confdata.op == 'del': - gcnf.delete(confdata.opt, confdata.rx) - # when modifying checkpoint, it's important to make a log - # of that, so in that case we go on to set up logging even - # if its just config invocation - if confdata.opt == 'checkpoint' and confdata.op in ('set', 'del') and \ - not confdata.rx: - checkpoint_change = True - if not checkpoint_change: - return - - gconf.__dict__.update(defaults.__dict__) - gcnf.update_to(gconf.__dict__) - gconf.__dict__.update(opts.__dict__) - gconf.configinterface = gcnf - - if restricted and gconf.allow_network: - 
ssh_conn = os.getenv('SSH_CONNECTION') - if not ssh_conn: - #legacy env var - ssh_conn = os.getenv('SSH_CLIENT') - if ssh_conn: - allowed_networks = [ IPNetwork(a) for a in gconf.allow_network.split(',') ] - client_ip = IPAddress(ssh_conn.split()[0]) - allowed = False - for nw in allowed_networks: - if client_ip in nw: - allowed = True - break - if not allowed: - raise GsyncdError("client IP address is not allowed") - - ffd = rconf.get('feedback_fd') - if ffd: - fcntl.fcntl(ffd, fcntl.F_SETFD, fcntl.FD_CLOEXEC) - - #normalize loglevel - lvl0 = gconf.log_level - if isinstance(lvl0, str): - lvl1 = lvl0.upper() - lvl2 = logging.getLevelName(lvl1) - # I have _never_ _ever_ seen such an utterly braindead - # error condition - if lvl2 == "Level " + lvl1: - raise GsyncdError('cannot recognize log level "%s"' % lvl0) - gconf.log_level = lvl2 - - if not privileged() and gconf.log_file_mbr: - gconf.log_file = gconf.log_file_mbr - - if checkpoint_change: - try: - GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf') - if confdata.op == 'set': - logging.info('checkpoint %s set' % confdata.val) - elif confdata.op == 'del': - logging.info('checkpoint info was reset') - except IOError: - if sys.exc_info()[1].errno == ENOENT: - # directory of log path is not present, - # which happens if we get here from - # a peer-multiplexed "config-set checkpoint" - # (as that directory is created only on the - # original node) - pass - else: - raise - return - - go_daemon = rconf['go_daemon'] - be_monitor = rconf.get('monitor') - - if not be_monitor and isinstance(remote, resource.SSH) and \ - go_daemon == 'should': - go_daemon = 'postconn' - log_file = None - else: - log_file = gconf.log_file - if be_monitor: - label = 'monitor' - elif remote: - #master - label = '' - else: - label = 'slave' - startup(go_daemon=go_daemon, log_file=log_file, label=label) - - if be_monitor: - return monitor() - - logging.info("syncing: %s" % " -> ".join(peers)) - resource.Popen.init_errhandler() - if remote: - go_daemon = remote.connect_remote(go_daemon=go_daemon) - if go_daemon: - startup(go_daemon=go_daemon, log_file=gconf.log_file) - # complete remote connection in child - remote.connect_remote(go_daemon='done') - local.connect() - if ffd: - os.close(ffd) - local.service_loop(*[r for r in [remote] if r]) - - -if __name__ == "__main__": - main() diff --git a/xlators/features/marker/utils/syncdaemon/libcxattr.py b/xlators/features/marker/utils/syncdaemon/libcxattr.py deleted file mode 100644 index f0a9d22920a..00000000000 --- a/xlators/features/marker/utils/syncdaemon/libcxattr.py +++ /dev/null @@ -1,72 +0,0 @@ -import os -from ctypes import * -from ctypes.util import find_library - -class Xattr(object): - """singleton that wraps the extended attribues system - interface for python using ctypes - - Just implement it to the degree we need it, in particular - - we need just the l*xattr variants, ie. 
we never want symlinks to be - followed - - don't need size discovery for getxattr, as we always know the exact - sizes we expect - """ - - libc = CDLL(find_library("libc")) - - @classmethod - def geterrno(cls): - return c_int.in_dll(cls.libc, 'errno').value - - @classmethod - def raise_oserr(cls): - errn = cls.geterrno() - raise OSError(errn, os.strerror(errn)) - - @classmethod - def _query_xattr(cls, path, siz, syscall, *a): - if siz: - buf = create_string_buffer('\0' * siz) - else: - buf = None - ret = getattr(cls.libc, syscall)(*((path,) + a + (buf, siz))) - if ret == -1: - cls.raise_oserr() - if siz: - return buf.raw[:ret] - else: - return ret - - @classmethod - def lgetxattr(cls, path, attr, siz=0): - return cls._query_xattr( path, siz, 'lgetxattr', attr) - - @classmethod - def llistxattr(cls, path, siz=0): - ret = cls._query_xattr(path, siz, 'llistxattr') - if isinstance(ret, str): - ret = ret.split('\0') - return ret - - @classmethod - def lsetxattr(cls, path, attr, val): - ret = cls.libc.lsetxattr(path, attr, val, len(val), 0) - if ret == -1: - cls.raise_oserr() - - @classmethod - def lremovexattr(cls, path, attr): - ret = cls.libc.lremovexattr(path, attr) - if ret == -1: - cls.raise_oserr() - - @classmethod - def llistxattr_buf(cls, path): - """listxattr variant with size discovery""" - size = cls.llistxattr(path) - if size == -1: - cls.raise_oserr() - if size == 0: - return [] - return cls.llistxattr(path, size) diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py deleted file mode 100644 index f903f30595d..00000000000 --- a/xlators/features/marker/utils/syncdaemon/master.py +++ /dev/null @@ -1,961 +0,0 @@ -import os -import sys -import time -import stat -import random -import signal -import logging -import socket -import errno -import re -from errno import ENOENT, ENODATA, EPIPE -from threading import currentThread, Condition, Lock -from datetime import datetime -try: - from hashlib import md5 as md5 -except ImportError: - # py 2.4 - from md5 import new as md5 - -from gconf import gconf -from syncdutils import FreeObject, Thread, GsyncdError, boolify, \ - escape, unescape, select - -URXTIME = (-1, 0) - -# Utility functions to help us to get to closer proximity -# of the DRY principle (no, don't look for elevated or -# perspectivistic things here) - -def _xtime_now(): - t = time.time() - sec = int(t) - nsec = int((t - sec) * 1000000) - return (sec, nsec) - -def _volinfo_hook_relax_foreign(self): - volinfo_sys = self.get_sys_volinfo() - fgn_vi = volinfo_sys[self.KFGN] - if fgn_vi: - expiry = fgn_vi['timeout'] - int(time.time()) + 1 - logging.info('foreign volume info found, waiting %d sec for expiry' % \ - expiry) - time.sleep(expiry) - volinfo_sys = self.get_sys_volinfo() - self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state, - volinfo_sys) - if self.inter_master: - raise GsyncdError("cannot be intermediate master in special mode") - return (volinfo_sys, state_change) - - -# The API! 
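gmaster_builder() below assembles the concrete master class at run time from a base class plus mixins selected by configuration (sync mode, rsync xattrs, ignore-deletes). The same composition technique in isolation, with invented class names standing in for the real mixins:

    class Base(object):
        def crawl(self):
            return self.mark() + ' / ' + self.purge()

    class SendmarkNormal(object):
        def mark(self):
            return 'sendmark'

    class SendmarkNoop(object):
        def mark(self):
            return 'skip sendmark'

    class PurgeNormal(object):
        def purge(self):
            return 'purge'

    class PurgeNoop(object):
        def purge(self):
            return 'ignore deletes'

    def builder(noop_mark, ignore_deletes):
        # pick the mixins according to configuration, then derive a
        # one-off class: crawl() comes from Base, mark()/purge() from
        # whichever mixins were chosen
        markmixin = SendmarkNoop if noop_mark else SendmarkNormal
        purgemixin = PurgeNoop if ignore_deletes else PurgeNormal
        class _Master(Base, markmixin, purgemixin):
            pass
        return _Master

    print(builder(False, True)().crawl())   # sendmark / ignore deletes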
- -def gmaster_builder(): - """produce the GMaster class variant corresponding - to sync mode""" - this = sys.modules[__name__] - modemixin = gconf.special_sync_mode - if not modemixin: - modemixin = 'normal' - logging.info('setting up master for %s sync mode' % modemixin) - modemixin = getattr(this, modemixin.capitalize() + 'Mixin') - sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin - purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin - class _GMaster(GMasterBase, modemixin, sendmarkmixin, purgemixin): - pass - return _GMaster - - -# Mixin classes that implement the data format -# and logic particularities of the certain -# sync modes - -class NormalMixin(object): - """normal geo-rep behavior""" - - minus_infinity = URXTIME - - # following staticmethods ideally would be - # methods of an xtime object (in particular, - # implementing the hooks needed for comparison - # operators), but at this point we don't yet - # have a dedicated xtime class - - @staticmethod - def serialize_xtime(xt): - return "%d.%d" % tuple(xt) - - @staticmethod - def deserialize_xtime(xt): - return tuple(int(x) for x in xt.split(".")) - - @staticmethod - def native_xtime(xt): - return xt - - @staticmethod - def xtime_geq(xt0, xt1): - return xt0 >= xt1 - - def make_xtime_opts(self, is_master, opts): - if not 'create' in opts: - opts['create'] = is_master and not self.inter_master - if not 'default_xtime' in opts: - if is_master and self.inter_master: - opts['default_xtime'] = ENODATA - else: - opts['default_xtime'] = URXTIME - - def xtime_low(self, server, path, **opts): - xt = server.xtime(path, self.uuid) - if isinstance(xt, int) and xt != ENODATA: - return xt - if xt == ENODATA or xt < self.volmark: - if opts['create']: - xt = _xtime_now() - server.set_xtime(path, self.uuid, xt) - else: - xt = opts['default_xtime'] - return xt - - def keepalive_payload_hook(self, timo, gap): - # first grab a reference as self.volinfo - # can be changed in main thread - vi = self.volinfo - if vi: - # then have a private copy which we can mod - vi = vi.copy() - vi['timeout'] = int(time.time()) + timo - else: - # send keep-alives more frequently to - # avoid a delay in announcing our volume info - # to slave if it becomes established in the - # meantime - gap = min(10, gap) - return (vi, gap) - - def volinfo_hook(self): - volinfo_sys = self.get_sys_volinfo() - self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state, - volinfo_sys) - return (volinfo_sys, state_change) - - def xtime_reversion_hook(self, path, xtl, xtr): - if xtr > xtl: - raise GsyncdError("timestamp corruption for " + path) - - def need_sync(self, e, xte, xtrd): - return xte > xtrd - - def set_slave_xtime(self, path, mark): - self.slave.server.set_xtime(path, self.uuid, mark) - -class WrapupMixin(NormalMixin): - """a variant that differs from normal in terms - of ignoring non-indexed files""" - - @staticmethod - def make_xtime_opts(is_master, opts): - if not 'create' in opts: - opts['create'] = False - if not 'default_xtime' in opts: - opts['default_xtime'] = URXTIME - - @staticmethod - def keepalive_payload_hook(timo, gap): - return (None, gap) - - def volinfo_hook(self): - return _volinfo_hook_relax_foreign(self) - -class BlindMixin(object): - """Geo-rep flavor using vectored xtime. 
- - Coordinates are the master, slave uuid pair; - in master coordinate behavior is normal, - in slave coordinate we force synchronization - on any value difference (these are in disjunctive - relation, ie. if either orders the entry to be - synced, it shall be synced. - """ - - minus_infinity = (URXTIME, None) - - @staticmethod - def serialize_xtime(xt): - a = [] - for x in xt: - if not x: - x = ('None', '') - a.extend(x) - return '.'.join(str(n) for n in a) - - @staticmethod - def deserialize_xtime(xt): - a = xt.split(".") - a = (tuple(a[0:2]), tuple(a[3:4])) - b = [] - for p in a: - if p[0] == 'None': - p = None - else: - p = tuple(int(x) for x in p) - b.append(p) - return tuple(b) - - @staticmethod - def native_xtime(xt): - return xt[0] - - @staticmethod - def xtime_geq(xt0, xt1): - return (not xt1[0] or xt0[0] >= xt1[0]) and \ - (not xt1[1] or xt0[1] >= xt1[1]) - - @property - def ruuid(self): - if self.volinfo_r: - return self.volinfo_r['uuid'] - - @staticmethod - def make_xtime_opts(is_master, opts): - if not 'create' in opts: - opts['create'] = is_master - if not 'default_xtime' in opts: - opts['default_xtime'] = URXTIME - - def xtime_low(self, server, path, **opts): - xtd = server.xtime_vec(path, self.uuid, self.ruuid) - if isinstance(xtd, int): - return xtd - xt = (xtd[self.uuid], xtd[self.ruuid]) - if not xt[1] and (not xt[0] or xt[0] < self.volmark): - if opts['create']: - # not expected, but can happen if file originates - # from interrupted gsyncd transfer - logging.warn('have to fix up missing xtime on ' + path) - xt0 = _xtime_now() - server.set_xtime(path, self.uuid, xt0) - else: - xt0 = opts['default_xtime'] - xt = (xt0, xt[1]) - return xt - - @staticmethod - def keepalive_payload_hook(timo, gap): - return (None, gap) - - def volinfo_hook(self): - res = _volinfo_hook_relax_foreign(self) - volinfo_r_new = self.slave.server.native_volume_info() - if volinfo_r_new['retval']: - raise GsyncdError("slave is corrupt") - if getattr(self, 'volinfo_r', None): - if self.volinfo_r['uuid'] != volinfo_r_new['uuid']: - raise GsyncdError("uuid mismatch on slave") - self.volinfo_r = volinfo_r_new - return res - - def xtime_reversion_hook(self, path, xtl, xtr): - if not isinstance(xtr[0], int) and \ - (isinstance(xtl[0], int) or xtr[0] > xtl[0]): - raise GsyncdError("timestamp corruption for " + path) - - def need_sync(self, e, xte, xtrd): - if xte[0]: - if not xtrd[0] or xte[0] > xtrd[0]: - # there is outstanding diff at 0th pos, - # we can short-cut to true - return True - # we arrived to this point by either of these - # two possiblilites: - # - no outstanding difference at 0th pos, - # wanna see 1st pos if he raises veto - # against "no need to sync" proposal - # - no data at 0th pos, 1st pos will have - # to decide (due to xtime assignment, - # in this case 1st pos does carry data - # -- iow, if 1st pos did not have data, - # and 0th neither, 0th would have been - # force-feeded) - if not xte[1]: - # no data, no veto - return False - # the hard work: for 1st pos, - # the conduct is fetch corresponding - # slave data and do a "blind" comparison - # (ie. 
do not care who is newer, we trigger - # sync on non-identical xitmes) - xtr = self.xtime(e, self.slave) - return isinstance(xtr, int) or xte[1] != xtr[1] - - def set_slave_xtime(self, path, mark): - xtd = {} - for (u, t) in zip((self.uuid, self.ruuid), mark): - if t: - xtd[u] = t - self.slave.server.set_xtime_vec(path, xtd) - - -# Further mixins for certain tunable behaviors - -class SendmarkNormalMixin(object): - - def sendmark_regular(self, *a, **kw): - return self.sendmark(*a, **kw) - -class SendmarkRsyncMixin(object): - - def sendmark_regular(self, *a, **kw): - pass - - -class PurgeNormalMixin(object): - - def purge_missing(self, path, names): - self.slave.server.purge(path, names) - -class PurgeNoopMixin(object): - - def purge_missing(self, path, names): - pass - - - -class GMasterBase(object): - """abstract class impementling master role""" - - KFGN = 0 - KNAT = 1 - - def get_sys_volinfo(self): - """query volume marks on fs root - - err out on multiple foreign masters - """ - fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \ - self.master.server.native_volume_info() - fgn_vi = None - if fgn_vis: - if len(fgn_vis) > 1: - raise GsyncdError("cannot work with multiple foreign masters") - fgn_vi = fgn_vis[0] - return fgn_vi, nat_vi - - @property - def uuid(self): - if self.volinfo: - return self.volinfo['uuid'] - - @property - def volmark(self): - if self.volinfo: - return self.volinfo['volume_mark'] - - @property - def inter_master(self): - """decide if we are an intermediate master - in a cascading setup - """ - return self.volinfo_state[self.KFGN] and True or False - - def xtime(self, path, *a, **opts): - """get amended xtime - - as of amending, we can create missing xtime, or - determine a valid value if what we get is expired - (as of the volume mark expiry); way of amendig - depends on @opts and on subject of query (master - or slave). - """ - if a: - rsc = a[0] - else: - rsc = self.master - self.make_xtime_opts(rsc == self.master, opts) - return self.xtime_low(rsc.server, path, **opts) - - def __init__(self, master, slave): - self.master = master - self.slave = slave - self.jobtab = {} - self.syncer = Syncer(slave) - # crawls vs. turns: - # - self.crawls is simply the number of crawl() invocations on root - # - one turn is a maximal consecutive sequence of crawls so that each - # crawl in it detects a change to be synced - # - self.turns is the number of turns since start - # - self.total_turns is a limit so that if self.turns reaches it, then - # we exit (for diagnostic purposes) - # so, eg., if the master fs changes unceasingly, self.turns will remain 0. 
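For reference, the NormalMixin helpers earlier in this hunk treat an xtime as a (seconds, nanoseconds) integer pair, stored as the string "sec.nsec" and compared tuple-wise. A minimal standalone sketch of that round trip (illustrative only, not part of the moved sources):

    # Sketch: xtime as a (sec, nsec) pair, serialized the way NormalMixin's
    # serialize_xtime()/deserialize_xtime() do; helper names are illustrative.
    import time

    def serialize_xtime(xt):
        return "%d.%d" % tuple(xt)

    def deserialize_xtime(s):
        return tuple(int(x) for x in s.split("."))

    def xtime_now():
        t = time.time()
        sec = int(t)
        return (sec, int((t - sec) * 1e9))

    xt = xtime_now()
    assert deserialize_xtime(serialize_xtime(xt)) == xt
    # plain tuple ordering is the "newer than" test used by need_sync()
    assert xt >= (0, 0)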
- self.crawls = 0 - self.turns = 0 - self.total_turns = int(gconf.turns) - self.lastreport = {'crawls': 0, 'turns': 0} - self.start = None - self.change_seen = None - self.syncTime=0 - self.lastSyncTime=0 - self.crawlStartTime=0 - self.crawlTime=0 - self.filesSynced=0 - self.bytesSynced=0 - # the authoritative (foreign, native) volinfo pair - # which lets us deduce what to do when we refetch - # the volinfos from system - uuid_preset = getattr(gconf, 'volume_id', None) - self.volinfo_state = (uuid_preset and {'uuid': uuid_preset}, None) - # the actual volinfo we make use of - self.volinfo = None - self.terminate = False - self.checkpoint_thread = None - - @classmethod - def _checkpt_param(cls, chkpt, prm, xtimish=True): - """use config backend to lookup a parameter belonging to - checkpoint @chkpt""" - cprm = getattr(gconf, 'checkpoint_' + prm, None) - if not cprm: - return - chkpt_mapped, val = cprm.split(':', 1) - if unescape(chkpt_mapped) != chkpt: - return - if xtimish: - val = cls.deserialize_xtime(val) - return val - - @classmethod - def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True): - """use config backend to store a parameter associated - with checkpoint @chkpt""" - if xtimish: - val = cls.serialize_xtime(val) - gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val)) - - @staticmethod - def humantime(*tpair): - """format xtime-like (sec, nsec) pair to human readable format""" - ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\ - strftime("%Y-%m-%d %H:%M:%S") - if len(tpair) > 1: - ts += '.' + str(tpair[1]) - return ts - - def get_extra_info(self): - str_info="\nFile synced : %d" %(self.filesSynced) - str_info+="\nBytes Synced : %d KB" %(self.syncer.bytesSynced) - str_info+="\nSync Time : %f seconds" %(self.syncTime) - self.crawlTime=datetime.now()-self.crawlStartTime - years , days =divmod(self.crawlTime.days,365.25) - years=int(years) - days=int(days) - - date="" - m, s = divmod(self.crawlTime.seconds, 60) - h, m = divmod(m, 60) - - if years!=0 : - date+=str(years)+" year " - if days!=0 : - date+=str(days)+" day " - if h!=0 : - date+=str(h)+" H : " - if m!=0 or h!=0 : - date+=str(m)+" M : " - - date+=str(s)+" S" - self.crawlTime=date - str_info+="\nCrawl Time : %s" %(str(self.crawlTime)) - str_info+="\n\0" - return str_info - - def checkpt_service(self, chan, chkpt, tgt): - """checkpoint service loop - - monitor and verify checkpoint status for @chkpt, and listen - for incoming requests for whom we serve a pretty-formatted - status report""" - if not chkpt: - # dummy loop for the case when there is no checkpt set - while True: - select([chan], [], []) - conn, _ = chan.accept() - conn.send(self.get_extra_info()) - conn.close() - completed = self._checkpt_param(chkpt, 'completed', xtimish=False) - if completed: - completed = tuple(int(x) for x in completed.split('.')) - while True: - s,_,_ = select([chan], [], [], (not completed) and 5 or None) - # either request made and we re-check to not - # give back stale data, or we still hunting for completion - if self.native_xtime(tgt) and self.native_xtime(tgt) < self.volmark: - # indexing has been reset since setting the checkpoint - status = "is invalid" - else: - xtr = self.xtime('.', self.slave) - if isinstance(xtr, int): - raise GsyncdError("slave root directory is unaccessible (%s)", - os.strerror(xtr)) - ncompleted = self.xtime_geq(xtr, tgt) - if completed and not ncompleted: # stale data - logging.warn("completion time %s for checkpoint %s became stale" % \ - 
(self.humantime(*completed), chkpt)) - completed = None - gconf.confdata.delete('checkpoint-completed') - if ncompleted and not completed: # just reaching completion - completed = "%.6f" % time.time() - self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False) - completed = tuple(int(x) for x in completed.split('.')) - logging.info("checkpoint %s completed" % chkpt) - status = completed and \ - "completed at " + self.humantime(completed[0]) or \ - "not reached yet" - if s: - conn = None - try: - conn, _ = chan.accept() - try: - conn.send(" | checkpoint %s %s %s" % (chkpt, status,self.get_extra_info())) - except: - exc = sys.exc_info()[1] - if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \ - exc.errno == EPIPE: - logging.debug('checkpoint client disconnected') - else: - raise - finally: - if conn: - conn.close() - - def start_checkpoint_thread(self): - """prepare and start checkpoint service""" - if self.checkpoint_thread or not ( - getattr(gconf, 'state_socket_unencoded', None) and getattr(gconf, 'socketdir', None) - ): - return - chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - state_socket = os.path.join(gconf.socketdir, md5(gconf.state_socket_unencoded).hexdigest() + ".socket") - try: - os.unlink(state_socket) - except: - if sys.exc_info()[0] == OSError: - pass - chan.bind(state_socket) - chan.listen(1) - checkpt_tgt = None - if gconf.checkpoint: - checkpt_tgt = self._checkpt_param(gconf.checkpoint, 'target') - if not checkpt_tgt: - checkpt_tgt = self.xtime('.') - if isinstance(checkpt_tgt, int): - raise GsyncdError("master root directory is unaccessible (%s)", - os.strerror(checkpt_tgt)) - self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt) - logging.debug("checkpoint target %s has been determined for checkpoint %s" % \ - (repr(checkpt_tgt), gconf.checkpoint)) - t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt)) - t.start() - self.checkpoint_thread = t - - def crawl_loop(self): - """start the keep-alive thread and iterate .crawl""" - timo = int(gconf.timeout or 0) - if timo > 0: - def keep_alive(): - while True: - vi, gap = self.keepalive_payload_hook(timo, timo * 0.5) - self.slave.server.keep_alive(vi) - time.sleep(gap) - t = Thread(target=keep_alive) - t.start() - self.lastreport['time'] = time.time() - self.crawlStartTime=datetime.now() - while not self.terminate: - self.crawl() - - def add_job(self, path, label, job, *a, **kw): - """insert @job function to job table at @path with @label""" - if self.jobtab.get(path) == None: - self.jobtab[path] = [] - self.jobtab[path].append((label, a, lambda : job(*a, **kw))) - - def add_failjob(self, path, label): - """invoke .add_job with a job that does nothing just fails""" - logging.debug('salvaged: ' + label) - self.add_job(path, label, lambda: False) - - def wait(self, path, *args): - """perform jobs registered for @path - - Reset jobtab entry for @path, - determine success as the conjuction of - success of all the jobs. In case of - success, call .sendmark on @path - """ - jobs = self.jobtab.pop(path, []) - succeed = True - for j in jobs: - ret = j[-1]() - if not ret: - succeed = False - if succeed: - self.sendmark(path, *args) - return succeed - - def sendmark(self, path, mark, adct=None): - """update slave side xtime for @path to master side xtime - - also can send a setattr payload (see Server.setattr). 
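As an aside, the checkpoint bookkeeping above (_checkpt_param/_set_checkpt_param) keeps each checkpoint attribute in the config backend as a single "escaped-checkpoint-name:value" string. A rough sketch of that encoding, using urllib quote_plus in place of syncdutils.escape/unescape:

    # Sketch of the "checkpoint_<prm> = <escaped chkpt>:<value>" encoding.
    try:
        from urllib.parse import quote_plus as escape, unquote_plus as unescape  # py3
    except ImportError:
        from urllib import quote_plus as escape, unquote_plus as unescape        # py2

    def pack_checkpt_param(chkpt, val):
        return "%s:%s" % (escape(chkpt), val)

    def unpack_checkpt_param(chkpt, stored):
        chkpt_mapped, val = stored.split(':', 1)
        if unescape(chkpt_mapped) != chkpt:
            return None              # stored value belongs to another checkpoint
        return val

    stored = pack_checkpt_param("my checkpoint", "1369659000.0")
    assert unpack_checkpt_param("my checkpoint", stored) == "1369659000.0"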
- """ - if adct: - self.slave.server.setattr(path, adct) - self.set_slave_xtime(path, mark) - - @staticmethod - def volinfo_state_machine(volinfo_state, volinfo_sys): - """compute new volinfo_state from old one and incoming - as of current system state, also indicating if there was a - change regarding which volume mark is the authoritative one - - @volinfo_state, @volinfo_sys are pairs of volume mark dicts - (foreign, native). - - Note this method is marked as static, ie. the computation is - pure, without reliance on any excess implicit state. State - transitions which are deemed as ambiguous or banned will raise - an exception. - - """ - # store the value below "boxed" to emulate proper closures - # (variables of the enclosing scope are available inner functions - # provided they are no reassigned; mutation is OK). - param = FreeObject(relax_mismatch = False, state_change = None, index=-1) - def select_vi(vi0, vi): - param.index += 1 - if vi and (not vi0 or vi0['uuid'] == vi['uuid']): - if not vi0 and not param.relax_mismatch: - param.state_change = param.index - # valid new value found; for the rest, we are graceful about - # uuid mismatch - param.relax_mismatch = True - return vi - if vi0 and vi and vi0['uuid'] != vi['uuid'] and not param.relax_mismatch: - # uuid mismatch for master candidate, bail out - raise GsyncdError("aborting on uuid change from %s to %s" % \ - (vi0['uuid'], vi['uuid'])) - # fall back to old - return vi0 - newstate = tuple(select_vi(*vip) for vip in zip(volinfo_state, volinfo_sys)) - srep = lambda vi: vi and vi['uuid'][0:8] - logging.debug('(%s, %s) << (%s, %s) -> (%s, %s)' % \ - tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate)) - return newstate, param.state_change - - def crawl(self, path='.', xtl=None): - """crawling... - - Standing around - All the right people - Crawling - Tennis on Tuesday - The ladder is long - It is your nature - You've gotta suntan - Football on Sunday - Society boy - - Recursively walk the master side tree and check if updates are - needed due to xtime differences. One invocation of crawl checks - children of @path and do a recursive enter only on - those directory children where there is an update needed. - - Way of updates depend on file type: - - for symlinks, sync them directy and synchronously - - for regular children, register jobs for @path (cf. .add_job) to start - and wait on their rsync - - for directory children, register a job for @path which waits (.wait) - on jobs for the given child - (other kind of filesystem nodes are not considered) - - Those slave side children which do not exist on master are simply - purged (see Server.purge). - - Behavior is fault tolerant, synchronization is adaptive: if some action fails, - just go on relentlessly, adding a fail job (see .add_failjob) which will prevent - the .sendmark on @path, so when the next crawl will arrive to @path it will not - see it as up-to-date and will try to sync it again. While this semantics can be - supported by funky design principles (http://c2.com/cgi/wiki?LazinessImpatienceHubris), - the ultimate reason which excludes other possibilities is simply transience: we cannot - assert that the file systems (master / slave) underneath do not change and actions - taken upon some condition will not lose their context by the time they are performed. - """ - if path == '.': - if self.start: - self.crawls += 1 - logging.debug("... 
crawl #%d done, took %.6f seconds" % \ - (self.crawls, time.time() - self.start)) - time.sleep(1) - self.start = time.time() - should_display_info = self.start - self.lastreport['time'] >= 60 - if should_display_info: - logging.info("completed %d crawls, %d turns", - self.crawls - self.lastreport['crawls'], - self.turns - self.lastreport['turns']) - self.lastreport.update(crawls = self.crawls, - turns = self.turns, - time = self.start) - volinfo_sys, state_change = self.volinfo_hook() - if self.inter_master: - self.volinfo = volinfo_sys[self.KFGN] - else: - self.volinfo = volinfo_sys[self.KNAT] - if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master): - logging.info('new master is %s', self.uuid) - if self.volinfo: - logging.info("%s master with volume id %s ..." % \ - (self.inter_master and "intermediate" or "primary", - self.uuid)) - if state_change == self.KFGN: - gconf.configinterface.set('volume_id', self.uuid) - if self.volinfo: - if self.volinfo['retval']: - raise GsyncdError ("master is corrupt") - self.start_checkpoint_thread() - else: - if should_display_info or self.crawls == 0: - if self.inter_master: - logging.info("waiting for being synced from %s ..." % \ - self.volinfo_state[self.KFGN]['uuid']) - else: - logging.info("waiting for volume info ...") - return - logging.debug("entering " + path) - if not xtl: - xtl = self.xtime(path) - if isinstance(xtl, int): - self.add_failjob(path, 'no-local-node') - return - xtr = self.xtime(path, self.slave) - if isinstance(xtr, int): - if xtr != ENOENT: - self.slave.server.purge(path) - try: - self.slave.server.mkdir(path) - except OSError: - self.add_failjob(path, 'no-remote-node') - return - xtr = self.minus_infinity - else: - self.xtime_reversion_hook(path, xtl, xtr) - if xtl == xtr: - if path == '.' and self.change_seen: - self.turns += 1 - self.change_seen = False - if self.total_turns: - logging.info("finished turn #%s/%s" % \ - (self.turns, self.total_turns)) - if self.turns == self.total_turns: - logging.info("reached turn limit") - self.terminate = True - return - if path == '.': - self.change_seen = True - try: - dem = self.master.server.entries(path) - except OSError: - self.add_failjob(path, 'local-entries-fail') - return - random.shuffle(dem) - try: - des = self.slave.server.entries(path) - except OSError: - self.slave.server.purge(path) - try: - self.slave.server.mkdir(path) - des = self.slave.server.entries(path) - except OSError: - self.add_failjob(path, 'remote-entries-fail') - return - dd = set(des) - set(dem) - if dd: - self.purge_missing(path, dd) - chld = [] - for e in dem: - e = os.path.join(path, e) - xte = self.xtime(e) - if isinstance(xte, int): - logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte])) - elif self.need_sync(e, xte, xtr): - chld.append((e, xte)) - def indulgently(e, fnc, blame=None): - if not blame: - blame = path - try: - return fnc(e) - except (IOError, OSError): - ex = sys.exc_info()[1] - if ex.errno == ENOENT: - logging.warn("salvaged ENOENT for " + e) - self.add_failjob(blame, 'by-indulgently') - return False - else: - raise - for e, xte in chld: - st = indulgently(e, lambda e: os.lstat(e)) - if st == False: - continue - mo = st.st_mode - adct = {'own': (st.st_uid, st.st_gid)} - if stat.S_ISLNK(mo): - if indulgently(e, lambda e: self.slave.server.symlink(os.readlink(e), e)) == False: - continue - self.sendmark(e, xte, adct) - elif stat.S_ISREG(mo): - logging.debug("syncing %s ..." 
% e) - pb = self.syncer.add(e) - timeA=datetime.now() - def regjob(e, xte, pb): - if pb.wait(): - logging.debug("synced " + e) - self.sendmark_regular(e, xte) - - timeB=datetime.now() - self.lastSyncTime=timeB-timeA - self.syncTime=(self.syncTime+self.lastSyncTime.microseconds)/(10.0**6) - self.filesSynced=self.filesSynced+1 - return True - else: - logging.warn("failed to sync " + e) - self.add_job(path, 'reg', regjob, e, xte, pb) - elif stat.S_ISDIR(mo): - adct['mode'] = mo - if indulgently(e, lambda e: (self.add_job(path, 'cwait', self.wait, e, xte, adct), - self.crawl(e, xte), - True)[-1], blame=e) == False: - continue - else: - # ignore fifos, sockets and special files - pass - if path == '.': - self.wait(path, xtl) - -class BoxClosedErr(Exception): - pass - -class PostBox(list): - """synchronized collection for storing things thought of as "requests" """ - - def __init__(self, *a): - list.__init__(self, *a) - # too bad Python stdlib does not have read/write locks... - # it would suffivce to grab the lock in .append as reader, in .close as writer - self.lever = Condition() - self.open = True - self.done = False - - def wait(self): - """wait on requests to be processed""" - self.lever.acquire() - if not self.done: - self.lever.wait() - self.lever.release() - return self.result - - def wakeup(self, data): - """wake up requestors with the result""" - self.result = data - self.lever.acquire() - self.done = True - self.lever.notifyAll() - self.lever.release() - - def append(self, e): - """post a request""" - self.lever.acquire() - if not self.open: - raise BoxClosedErr - list.append(self, e) - self.lever.release() - - def close(self): - """prohibit the posting of further requests""" - self.lever.acquire() - self.open = False - self.lever.release() - -class Syncer(object): - """a staged queue to relay rsync requests to rsync workers - - By "staged queue" its meant that when a consumer comes to the - queue, it takes _all_ entries, leaving the queue empty. - (I don't know if there is an official term for this pattern.) - - The queue uses a PostBox to accumulate incoming items. - When a consumer (rsync worker) comes, a new PostBox is - set up and the old one is passed on to the consumer. - - Instead of the simplistic scheme of having one big lock - which synchronizes both the addition of new items and - PostBox exchanges, use a separate lock to arbitrate consumers, - and rely on PostBox's synchronization mechanisms take - care about additions. - - There is a corner case racy situation, producers vs. consumers, - which is not handled by this scheme: namely, when the PostBox - exchange occurs in between being passed to the producer for posting - and the post placement. But that's what Postbox.close is for: - such a posting will find the PostBox closed, in which case - the producer can re-try posting against the actual PostBox of - the queue. - - To aid accumlation of items in the PostBoxen before grabbed - by an rsync worker, the worker goes to sleep a bit after - each completed syncjob. 
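A toy sketch of the staged-queue swap described above (illustrative only; it leaves out the PostBox.close() handshake that guards the producer/consumer race mentioned in the docstring):

    # Sketch of the staged-queue swap done by Syncer.syncjob(): the consumer
    # takes the whole current batch and leaves an empty one behind.
    from threading import Lock

    class StagedQueue(object):
        def __init__(self):
            self.lock = Lock()
            self.box = []

        def post(self, item):            # producer side
            self.box.append(item)

        def take_all(self):              # consumer side: swap, then drain
            with self.lock:
                batch, self.box = self.box, []
            return batch

    q = StagedQueue()
    q.post('file-a'); q.post('file-b')
    assert q.take_all() == ['file-a', 'file-b']
    assert q.take_all() == []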
- """ - - def __init__(self, slave): - """spawn worker threads""" - self.slave = slave - self.lock = Lock() - self.pb = PostBox() - self.bytesSynced=0 - for i in range(int(gconf.sync_jobs)): - t = Thread(target=self.syncjob) - t.start() - - def syncjob(self): - """the life of a worker""" - while True: - pb = None - while True: - self.lock.acquire() - if self.pb: - pb, self.pb = self.pb, PostBox() - self.lock.release() - if pb: - break - time.sleep(0.5) - pb.close() - po = self.slave.rsync(pb) - if po.returncode == 0: - regEx=re.search('\ *total\ *transferred\ *file\ *size:\ *(\d+)\ *bytes\ *',po.stdout.read(),re.IGNORECASE) - if regEx: - self.bytesSynced+=(int(regEx.group(1)))/1024 - ret = True - elif po.returncode in (23, 24): - # partial transfer (cf. rsync(1)), that's normal - ret = False - else: - po.errfail() - pb.wakeup(ret) - - def add(self, e): - while True: - pb = self.pb - try: - pb.append(e) - return pb - except BoxClosedErr: - pass diff --git a/xlators/features/marker/utils/syncdaemon/monitor.py b/xlators/features/marker/utils/syncdaemon/monitor.py deleted file mode 100644 index b8956dcc2b9..00000000000 --- a/xlators/features/marker/utils/syncdaemon/monitor.py +++ /dev/null @@ -1,129 +0,0 @@ -import os -import sys -import time -import signal -import logging -from gconf import gconf -from syncdutils import update_file, select, waitpid, set_term_handler - -class Monitor(object): - """class which spawns and manages gsyncd workers""" - - def __init__(self): - self.state = None - - def set_state(self, state): - """set the state that can be used by external agents - like glusterd for status reporting""" - if state == self.state: - return - self.state = state - logging.info('new state: %s' % state) - if getattr(gconf, 'state_file', None): - update_file(gconf.state_file, lambda f: f.write(state + '\n')) - - def monitor(self): - """the monitor loop - - Basic logic is a blantantly simple blunt heuristics: - if spawned client survives 60 secs, it's considered OK. - This servers us pretty well as it's not vulneralbe to - any kind of irregular behavior of the child... - - ... well, except for one: if children is hung up on - waiting for some event, it can survive aeons, still - will be defunct. So we tweak the above logic to - expect the worker to send us a signal within 60 secs - (in the form of closing its end of a pipe). 
The worker - does this when it's done with the setup stage - ready to enter the service loop (note it's the setup - stage which is vulnerable to hangs -- the full - blown worker blows up on EPIPE if the net goes down, - due to the keep-alive thread) - """ - def sigcont_handler(*a): - """ - Re-init logging and send group kill signal - """ - md = gconf.log_metadata - logging.shutdown() - lcls = logging.getLoggerClass() - lcls.setup(label=md.get('saved_label'), **md) - pid = os.getpid() - os.kill(-pid, signal.SIGUSR1) - signal.signal(signal.SIGUSR1, lambda *a: ()) - signal.signal(signal.SIGCONT, sigcont_handler) - - argv = sys.argv[:] - for o in ('-N', '--no-daemon', '--monitor'): - while o in argv: - argv.remove(o) - argv.extend(('-N', '-p', '')) - argv.insert(0, os.path.basename(sys.executable)) - - self.set_state('starting...') - ret = 0 - def nwait(p, o=0): - p2, r = waitpid(p, o) - if not p2: - return - return r - def exit_signalled(s): - """ child teminated due to receipt of SIGUSR1 """ - return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1)) - def exit_status(s): - if os.WIFEXITED(s): - return os.WEXITSTATUS(s) - return 1 - conn_timeout = int(gconf.connection_timeout) - while ret in (0, 1): - logging.info('-' * conn_timeout) - logging.info('starting gsyncd worker') - pr, pw = os.pipe() - cpid = os.fork() - if cpid == 0: - os.close(pr) - os.execv(sys.executable, argv + ['--feedback-fd', str(pw)]) - os.close(pw) - t0 = time.time() - so = select((pr,), (), (), conn_timeout)[0] - os.close(pr) - if so: - ret = nwait(cpid, os.WNOHANG) - if ret != None: - logging.debug("worker died before establishing connection") - else: - logging.debug("worker seems to be connected (?? racy check)") - while time.time() < t0 + conn_timeout: - ret = nwait(cpid, os.WNOHANG) - if ret != None: - logging.debug("worker died in startup phase") - break - time.sleep(1) - else: - logging.debug("worker not confirmed in %d sec, aborting it" % \ - conn_timeout) - # relax one SIGTERM by setting a handler that sets back - # standard handler - set_term_handler(lambda *a: set_term_handler()) - # give a chance to graceful exit - os.kill(-os.getpid(), signal.SIGTERM) - time.sleep(1) - os.kill(cpid, signal.SIGKILL) - ret = nwait(cpid) - if ret == None: - self.set_state('OK') - ret = nwait(cpid) - if exit_signalled(ret): - ret = 0 - else: - ret = exit_status(ret) - if ret in (0,1): - self.set_state('faulty') - time.sleep(10) - self.set_state('inconsistent') - return ret - -def monitor(): - """oh yeah, actually Monitor is used as singleton, too""" - return Monitor().monitor() diff --git a/xlators/features/marker/utils/syncdaemon/repce.py b/xlators/features/marker/utils/syncdaemon/repce.py deleted file mode 100644 index 755fb61df48..00000000000 --- a/xlators/features/marker/utils/syncdaemon/repce.py +++ /dev/null @@ -1,225 +0,0 @@ -import os -import sys -import time -import logging -from threading import Condition -try: - import thread -except ImportError: - # py 3 - import _thread as thread -try: - from Queue import Queue -except ImportError: - # py 3 - from queue import Queue -try: - import cPickle as pickle -except ImportError: - # py 3 - import pickle - -from syncdutils import Thread, select - -pickle_proto = -1 -repce_version = 1.0 - -def ioparse(i, o): - if isinstance(i, int): - i = os.fdopen(i) - # rely on duck typing for recognizing - # streams as that works uniformly - # in py2 and py3 - if hasattr(o, 'fileno'): - o = o.fileno() - return (i, o) - -def send(out, *args): - """pickle args and write out wholly in 
one syscall - - ie. not use the ability of pickle to dump directly to - a stream, as that would potentially mess up messages - by interleaving them - """ - os.write(out, pickle.dumps(args, pickle_proto)) - -def recv(inf): - """load an object from input stream""" - return pickle.load(inf) - - -class RepceServer(object): - """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce - - ... also our homebrewed RPC backend where the transport layer is - reduced to a pair of filehandles. - - This is the server component. - """ - - def __init__(self, obj, i, o, wnum=6): - """register a backend object .obj to which incoming messages - are dispatched, also incoming/outcoming streams - """ - self.obj = obj - self.inf, self.out = ioparse(i, o) - self.wnum = wnum - self.q = Queue() - - def service_loop(self): - """fire up worker threads, get messages and dispatch among them""" - for i in range(self.wnum): - t = Thread(target=self.worker) - t.start() - try: - while True: - self.q.put(recv(self.inf)) - except EOFError: - logging.info("terminating on reaching EOF.") - - def worker(self): - """life of a worker - - Get message, extract its id, method name and arguments - (kwargs not supported), call method on .obj. - Send back message id + return value. - If method call throws an exception, rescue it, and send - back the exception as result (with flag marking it as - exception). - """ - while True: - in_data = self.q.get(True) - rid = in_data[0] - rmeth = in_data[1] - exc = False - if rmeth == '__repce_version__': - res = repce_version - else: - try: - res = getattr(self.obj, rmeth)(*in_data[2:]) - except: - res = sys.exc_info()[1] - exc = True - logging.exception("call failed: ") - send(self.out, rid, exc, res) - - -class RepceJob(object): - """class representing message status we can use - for waiting on reply""" - - def __init__(self, cbk): - """ - - .rid: (process-wise) unique id - - .cbk: what we do upon receiving reply - """ - self.rid = (os.getpid(), thread.get_ident(), time.time()) - self.cbk = cbk - self.lever = Condition() - self.done = False - - def __repr__(self): - return ':'.join([str(x) for x in self.rid]) - - def wait(self): - self.lever.acquire() - if not self.done: - self.lever.wait() - self.lever.release() - return self.result - - def wakeup(self, data): - self.result = data - self.lever.acquire() - self.done = True - self.lever.notify() - self.lever.release() - - -class RepceClient(object): - """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce - - ... also our homebrewed RPC backend where the transport layer is - reduced to a pair of filehandles. - - This is the client component. - """ - - def __init__(self, i, o): - self.inf, self.out = ioparse(i, o) - self.jtab = {} - t = Thread(target = self.listen) - t.start() - - def listen(self): - while True: - select((self.inf,), (), ()) - rid, exc, res = recv(self.inf) - rjob = self.jtab.pop(rid) - if rjob.cbk: - rjob.cbk(rjob, [exc, res]) - - def push(self, meth, *args, **kw): - """wrap arguments in a RepceJob, send them to server - and return the RepceJob - - @cbk to pass on RepceJob can be given as kwarg. - """ - cbk = kw.get('cbk') - if not cbk: - def cbk(rj, res): - if res[0]: - raise res[1] - rjob = RepceJob(cbk) - self.jtab[rjob.rid] = rjob - logging.debug("call %s %s%s ..." 
% (repr(rjob), meth, repr(args))) - send(self.out, rjob.rid, meth, *args) - return rjob - - def __call__(self, meth, *args): - """RePCe client is callabe, calling it implements a synchronous remote call - - We do a .push with a cbk which does a wakeup upon receiving anwser, then wait - on the RepceJob. - """ - rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)}) - exc, res = rjob.wait() - if exc: - logging.error('call %s (%s) failed on peer with %s' % (repr(rjob), meth, str(type(res).__name__))) - raise res - logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res))) - return res - - class mprx(object): - """method proxy, standard trick to implement rubyesque method_missing - in Python - - A class is a closure factory, you know what I mean, or go read some SICP. - """ - def __init__(self, ins, meth): - self.ins = ins - self.meth = meth - - def __call__(self, *a): - return self.ins(self.meth, *a) - - def __getattr__(self, meth): - """this implements transparent method dispatch to remote object, - so that you don't need to call the RepceClient instance like - - rclient('how_old_are_you_if_born_in', 1979) - - but you can make it into an ordinary method call like - - rclient.how_old_are_you_if_born_in(1979) - """ - return self.mprx(self, meth) - - def __version__(self): - """used in handshake to verify compatibility""" - d = {'proto': self('__repce_version__')} - try: - d['object'] = self('version') - except AttributeError: - pass - return d diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py deleted file mode 100644 index 73102fbcb44..00000000000 --- a/xlators/features/marker/utils/syncdaemon/resource.py +++ /dev/null @@ -1,972 +0,0 @@ -import re -import os -import sys -import stat -import time -import fcntl -import errno -import struct -import socket -import logging -import tempfile -import threading -import subprocess -from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR -from select import error as SelectError - -from gconf import gconf -import repce -from repce import RepceServer, RepceClient -from master import gmaster_builder -import syncdutils -from syncdutils import GsyncdError, select, privileged, boolify - -UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') -HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) -UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+") - -def sup(x, *a, **kw): - """a rubyesque "super" for python ;) - - invoke caller method in parent class with given args. - """ - return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw) - -def desugar(ustr): - """transform sugared url strings to standard :// form - - parsing logic enforces the constraint that sugared forms should contatin - a ':' or a '/', which ensures that sugared urls do not conflict with - gluster volume names. 
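The sugared-to-canonical mapping implemented by desugar() above, shown as a rough paraphrase with a few illustrative inputs (hostnames and paths below are made up; path normalization and error handling are omitted):

    # Rough paraphrase of desugar(): turn sugared CLI forms into scheme:// urls.
    import re

    def desugar_sketch(ustr):
        m = re.match('([^:]*):(.*)', ustr)
        if m:
            head, rest = m.groups()
            if not head:
                return "gluster://localhost" + ustr
            if '@' in head or re.search('[:/]', rest):
                return "ssh://" + ustr
            return "gluster://" + ustr
        return "file://" + ustr

    assert desugar_sketch(":slavevol") == "gluster://localhost:slavevol"
    assert desugar_sketch("example.com:slavevol") == "gluster://example.com:slavevol"
    assert desugar_sketch("geo@example.com:/data/slave") == "ssh://geo@example.com:/data/slave"
    assert desugar_sketch("/data/exported-dir") == "file:///data/exported-dir"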
- """ - m = re.match('([^:]*):(.*)', ustr) - if m: - if not m.groups()[0]: - return "gluster://localhost" + ustr - elif '@' in m.groups()[0] or re.search('[:/]', m.groups()[1]): - return "ssh://" + ustr - else: - return "gluster://" + ustr - else: - if ustr[0] != '/': - raise GsyncdError("cannot resolve sugared url '%s'" % ustr) - ap = os.path.normpath(ustr) - if ap.startswith('//'): - ap = ap[1:] - return "file://" + ap - -def gethostbyname(hnam): - """gethostbyname wrapper""" - try: - return socket.gethostbyname(hnam) - except socket.gaierror: - ex = sys.exc_info()[1] - raise GsyncdError("failed to resolve %s: %s" % \ - (hnam, ex.strerror)) - -def parse_url(ustr): - """instantiate an url object by scheme-to-class dispatch - - The url classes taken into consideration are the ones in - this module whose names are full-caps. - """ - m = UrlRX.match(ustr) - if not m: - ustr = desugar(ustr) - m = UrlRX.match(ustr) - if not m: - raise GsyncdError("malformed url") - sch, path = m.groups() - this = sys.modules[__name__] - if not hasattr(this, sch.upper()): - raise GsyncdError("unknown url scheme " + sch) - return getattr(this, sch.upper())(path) - - -class _MetaXattr(object): - """singleton class, a lazy wrapper around the - libcxattr module - - libcxattr (a heavy import due to ctypes) is - loaded only when when the single - instance is tried to be used. - - This reduces runtime for those invocations - which do not need filesystem manipulation - (eg. for config, url parsing) - """ - - def __getattr__(self, meth): - from libcxattr import Xattr as LXattr - xmeth = [ m for m in dir(LXattr) if m[0] != '_' ] - if not meth in xmeth: - return - for m in xmeth: - setattr(self, m, getattr(LXattr, m)) - return getattr(self, meth) - -Xattr = _MetaXattr() - - -class Popen(subprocess.Popen): - """customized subclass of subprocess.Popen with a ring - buffer for children error output""" - - @classmethod - def init_errhandler(cls): - """start the thread which handles children's error output""" - cls.errstore = {} - def tailer(): - while True: - errstore = cls.errstore.copy() - try: - poe, _ ,_ = select([po.stderr for po in errstore], [], [], 1) - except (ValueError, SelectError): - continue - for po in errstore: - if po.stderr not in poe: - continue - po.lock.acquire() - try: - if po.on_death_row: - continue - la = errstore[po] - try: - fd = po.stderr.fileno() - except ValueError: # file is already closed - continue - l = os.read(fd, 1024) - if not l: - continue - tots = len(l) - for lx in la: - tots += len(lx) - while tots > 1<<20 and la: - tots -= len(la.pop(0)) - la.append(l) - finally: - po.lock.release() - t = syncdutils.Thread(target = tailer) - t.start() - cls.errhandler = t - - @classmethod - def fork(cls): - """fork wrapper that restarts errhandler thread in child""" - pid = os.fork() - if not pid: - cls.init_errhandler() - return pid - - def __init__(self, args, *a, **kw): - """customizations for subprocess.Popen instantiation - - - 'close_fds' is taken to be the default - - if child's stderr is chosen to be managed, - register it with the error handler thread - """ - self.args = args - if 'close_fds' not in kw: - kw['close_fds'] = True - self.lock = threading.Lock() - self.on_death_row = False - try: - sup(self, args, *a, **kw) - except: - ex = sys.exc_info()[1] - if not isinstance(ex, OSError): - raise - raise GsyncdError("""execution of "%s" failed with %s (%s)""" % \ - (args[0], errno.errorcode[ex.errno], os.strerror(ex.errno))) - if kw.get('stderr') == subprocess.PIPE: - assert(getattr(self, 
'errhandler', None)) - self.errstore[self] = [] - - def errlog(self): - """make a log about child's failure event""" - filling = "" - if self.elines: - filling = ", saying:" - logging.error("""command "%s" returned with %s%s""" % \ - (" ".join(self.args), repr(self.returncode), filling)) - lp = '' - def logerr(l): - logging.error(self.args[0] + "> " + l) - for l in self.elines: - ls = l.split('\n') - ls[0] = lp + ls[0] - lp = ls.pop() - for ll in ls: - logerr(ll) - if lp: - logerr(lp) - - def errfail(self): - """fail nicely if child did not terminate with success""" - self.errlog() - syncdutils.finalize(exval = 1) - - def terminate_geterr(self, fail_on_err = True): - """kill child, finalize stderr harvesting (unregister - from errhandler, set up .elines), fail on error if - asked for - """ - self.lock.acquire() - try: - self.on_death_row = True - finally: - self.lock.release() - elines = self.errstore.pop(self) - if self.poll() == None: - self.terminate() - if self.poll() == None: - time.sleep(0.1) - self.kill() - self.wait() - while True: - if not select([self.stderr],[],[],0.1)[0]: - break - b = os.read(self.stderr.fileno(), 1024) - if b: - elines.append(b) - else: - break - self.stderr.close() - self.elines = elines - if fail_on_err and self.returncode != 0: - self.errfail() - - -class Server(object): - """singleton implemening those filesystem access primitives - which are needed for geo-replication functionality - - (Singleton in the sense it's a class which has only static - and classmethods and is used directly, without instantiation.) - """ - - GX_NSPACE = (privileged() and "trusted" or "system") + ".glusterfs" - NTV_FMTSTR = "!" + "B"*19 + "II" - FRGN_XTRA_FMT = "I" - FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT - - def _pathguard(f): - """decorator method that checks - the path argument of the decorated - functions to make sure it does not - point out of the managed tree - """ - - fc = getattr(f, 'func_code', None) - if not fc: - # python 3 - fc = f.__code__ - pi = list(fc.co_varnames).index('path') - def ff(*a): - path = a[pi] - ps = path.split('/') - if path[0] == '/' or '..' in ps: - raise ValueError('unsafe path') - return f(*a) - return ff - - @staticmethod - @_pathguard - def entries(path): - """directory entries in an array""" - # prevent symlinks being followed - if not stat.S_ISDIR(os.lstat(path).st_mode): - raise OSError(ENOTDIR, os.strerror(ENOTDIR)) - return os.listdir(path) - - @classmethod - @_pathguard - def purge(cls, path, entries=None): - """force-delete subtrees - - If @entries is not specified, delete - the whole subtree under @path (including - @path). - - Otherwise, @entries should be a - a sequence of children of @path, and - the effect is identical with a joint - @entries-less purge on them, ie. 
- - for e in entries: - cls.purge(os.path.join(path, e)) - """ - me_also = entries == None - if not entries: - try: - # if it's a symlink, prevent - # following it - try: - os.unlink(path) - return - except OSError: - ex = sys.exc_info()[1] - if ex.errno == EISDIR: - entries = os.listdir(path) - else: - raise - except OSError: - ex = sys.exc_info()[1] - if ex.errno in (ENOTDIR, ENOENT, ELOOP): - try: - os.unlink(path) - return - except OSError: - ex = sys.exc_info()[1] - if ex.errno == ENOENT: - return - raise - else: - raise - for e in entries: - cls.purge(os.path.join(path, e)) - if me_also: - os.rmdir(path) - - @classmethod - @_pathguard - def _create(cls, path, ctor): - """path creation backend routine""" - try: - ctor(path) - except OSError: - ex = sys.exc_info()[1] - if ex.errno == EEXIST: - cls.purge(path) - return ctor(path) - raise - - @classmethod - @_pathguard - def mkdir(cls, path): - cls._create(path, os.mkdir) - - @classmethod - @_pathguard - def symlink(cls, lnk, path): - cls._create(path, lambda p: os.symlink(lnk, p)) - - @classmethod - @_pathguard - def xtime(cls, path, uuid): - """query xtime extended attribute - - Return xtime of @path for @uuid as a pair of integers. - "Normal" errors due to non-existent @path or extended attribute - are tolerated and errno is returned in such a case. - """ - - try: - return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8)) - except OSError: - ex = sys.exc_info()[1] - if ex.errno in (ENOENT, ENODATA, ENOTDIR): - return ex.errno - else: - raise - - @classmethod - def xtime_vec(cls, path, *uuids): - """vectored version of @xtime - - accepts a list of uuids and returns a dictionary - with uuid as key(s) and xtime as value(s) - """ - xt = {} - for uuid in uuids: - xtu = cls.xtime(path, uuid) - if xtu == ENODATA: - xtu = None - if isinstance(xtu, int): - return xtu - xt[uuid] = xtu - return xt - - @classmethod - @_pathguard - def set_xtime(cls, path, uuid, mark): - """set @mark as xtime for @uuid on @path""" - Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark)) - - @classmethod - def set_xtime_vec(cls, path, mark_dct): - """vectored (or dictered) version of set_xtime - - ignore values that match @ignore - """ - for u,t in mark_dct.items(): - cls.set_xtime(path, u, t) - - @staticmethod - @_pathguard - def setattr(path, adct): - """set file attributes - - @adct is a dict, where 'own', 'mode' and 'times' - keys are looked for and values used to perform - chown, chmod or utimes on @path. - """ - own = adct.get('own') - if own: - os.lchown(path, *own) - mode = adct.get('mode') - if mode: - os.chmod(path, stat.S_IMODE(mode)) - times = adct.get('times') - if times: - os.utime(path, times) - - @staticmethod - def pid(): - return os.getpid() - - last_keep_alive = 0 - @classmethod - def keep_alive(cls, dct): - """process keepalive messages. - - Return keep-alive counter (number of received keep-alive - messages). - - Now the "keep-alive" message can also have a payload which is - used to set a foreign volume-mark on the underlying file system. 
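Server.xtime()/set_xtime() earlier in this hunk keep the per-uuid xtime as two network-order 32-bit integers in a <namespace>.glusterfs.<uuid>.xtime extended attribute (trusted. when running privileged, system. otherwise). A hedged sketch of the same packing using the Python 3 standard-library xattr calls instead of the ctypes libcxattr wrapper (path and uuid below are placeholders):

    # Sketch: pack/unpack an xtime pair into an extended attribute, mirroring
    # Server.xtime()/set_xtime(); Linux and an xattr-capable filesystem assumed.
    import os, struct

    def set_xtime(path, uuid, mark, nspace="trusted"):
        key = "%s.glusterfs.%s.xtime" % (nspace, uuid)
        os.setxattr(path, key, struct.pack('!II', *mark), follow_symlinks=False)

    def get_xtime(path, uuid, nspace="trusted"):
        key = "%s.glusterfs.%s.xtime" % (nspace, uuid)
        return struct.unpack('!II', os.getxattr(path, key, follow_symlinks=False))

    # e.g. set_xtime('.', '0a1b2c3d-0000-1111-2222-333344445555', (1369659000, 0))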
- """ - if dct: - key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']]) - val = struct.pack(cls.FRGN_FMTSTR, - *(dct['version'] + - tuple(int(x,16) for x in re.findall('(?:[\da-f]){2}', dct['uuid'])) + - (dct['retval'],) + dct['volume_mark'][0:2] + (dct['timeout'],))) - Xattr.lsetxattr('.', key, val) - cls.last_keep_alive += 1 - return cls.last_keep_alive - - @staticmethod - def version(): - """version used in handshake""" - return 1.0 - - -class SlaveLocal(object): - """mix-in class to implement some factes of a slave server - - ("mix-in" is sort of like "abstract class", ie. it's not - instantiated just included in the ancesty DAG. I use "mix-in" - to indicate that it's not used as an abstract base class, - rather just taken in to implement additional functionality - on the basis of the assumed availability of certain interfaces.) - """ - - def can_connect_to(self, remote): - """determine our position in the connectibility matrix""" - return not remote - - def service_loop(self): - """start a RePCe server serving self's server - - stop servicing if a timeout is configured and got no - keep-alime in that inteval - """ - - if boolify(gconf.use_rsync_xattrs) and not privileged(): - raise GsyncdError("using rsync for extended attributes is not supported") - - repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs)) - t = syncdutils.Thread(target=lambda: (repce.service_loop(), - syncdutils.finalize())) - t.start() - logging.info("slave listening") - if gconf.timeout and int(gconf.timeout) > 0: - while True: - lp = self.server.last_keep_alive - time.sleep(int(gconf.timeout)) - if lp == self.server.last_keep_alive: - logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout)) - break - else: - select((), (), ()) - -class SlaveRemote(object): - """mix-in class to implement an interface to a remote slave""" - - def connect_remote(self, rargs=[], **opts): - """connects to a remote slave - - Invoke an auxiliary utility (slave gsyncd, possibly wrapped) - which sets up the connection and set up a RePCe client to - communicate throuh its stdio. 
- """ - slave = opts.get('slave', self.url) - extra_opts = [] - so = getattr(gconf, 'session_owner', None) - if so: - extra_opts += ['--session-owner', so] - if boolify(gconf.use_rsync_xattrs): - extra_opts.append('--use-rsync-xattrs') - po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts + \ - ['-N', '--listen', '--timeout', str(gconf.timeout), slave], - stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - gconf.transport = po - return self.start_fd_client(po.stdout, po.stdin, **opts) - - def start_fd_client(self, i, o, **opts): - """set up RePCe client, handshake with server - - It's cut out as a separate method to let - subclasses hook into client startup - """ - self.server = RepceClient(i, o) - rv = self.server.__version__() - exrv = {'proto': repce.repce_version, 'object': Server.version()} - da0 = (rv, exrv) - da1 = ({}, {}) - for i in range(2): - for k, v in da0[i].iteritems(): - da1[i][k] = int(v) - if da1[0] != da1[1]: - raise GsyncdError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv)) - - def rsync(self, files, *args): - """invoke rsync""" - if not files: - raise GsyncdError("no files to sync") - logging.debug("files: " + ", ".join(files)) - argv = gconf.rsync_command.split() + \ - ['-aR0', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \ - gconf.rsync_options.split() + (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \ - ['.'] + list(args) - po = Popen(argv, stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE) - for f in files: - po.stdin.write(f) - po.stdin.write('\0') - - po.stdin.close() - po.wait() - po.terminate_geterr(fail_on_err = False) - - return po - - -class AbstractUrl(object): - """abstract base class for url scheme classes""" - - def __init__(self, path, pattern): - m = re.search(pattern, path) - if not m: - raise GsyncdError("malformed path") - self.path = path - return m.groups() - - @property - def scheme(self): - return type(self).__name__.lower() - - def canonical_path(self): - return self.path - - def get_url(self, canonical=False, escaped=False): - """format self's url in various styles""" - if canonical: - pa = self.canonical_path() - else: - pa = self.path - u = "://".join((self.scheme, pa)) - if escaped: - u = syncdutils.escape(u) - return u - - @property - def url(self): - return self.get_url() - - - ### Concrete resource classes ### - - -class FILE(AbstractUrl, SlaveLocal, SlaveRemote): - """scheme class for file:// urls - - can be used to represent a file slave server - on slave side, or interface to a remote file - file server on master side - """ - - class FILEServer(Server): - """included server flavor""" - pass - - server = FILEServer - - def __init__(self, path): - sup(self, path, '^/') - - def connect(self): - """inhibit the resource beyond""" - os.chdir(self.path) - - def rsync(self, files): - return sup(self, files, self.path) - - -class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): - """scheme class for gluster:// urls - - can be used to represent a gluster slave server - on slave side, or interface to a remote gluster - slave on master side, or to represent master - (slave-ish features come from the mixins, master - functionality is outsourced to GMaster from master) - """ - - class GLUSTERServer(Server): - "server enhancements for a glusterfs backend""" - - @classmethod - def _attr_unpack_dict(cls, xattr, extra_fields = ''): - """generic volume mark fetching/parsing backed""" - fmt_string = cls.NTV_FMTSTR + extra_fields - buf = 
Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) - vm = struct.unpack(fmt_string, buf) - m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]])) - uuid = '-'.join(m.groups()) - volinfo = { 'version': vm[0:2], - 'uuid' : uuid, - 'retval' : vm[18], - 'volume_mark': vm[19:21], - } - if extra_fields: - return volinfo, vm[-len(extra_fields):] - else: - return volinfo - - @classmethod - def foreign_volume_infos(cls): - """return list of valid (not expired) foreign volume marks""" - dict_list = [] - xattr_list = Xattr.llistxattr_buf('.') - for ele in xattr_list: - if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0: - d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT) - now = int(time.time()) - if x[0] > now: - logging.debug("volinfo[%s] expires: %d (%d sec later)" % \ - (d['uuid'], x[0], x[0] - now)) - d['timeout'] = x[0] - dict_list.append(d) - else: - try: - Xattr.lremovexattr('.', ele) - except OSError: - pass - return dict_list - - @classmethod - def native_volume_info(cls): - """get the native volume mark of the underlying gluster volume""" - try: - return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark'])) - except OSError: - ex = sys.exc_info()[1] - if ex.errno != ENODATA: - raise - - server = GLUSTERServer - - def __init__(self, path): - self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern) - - def canonical_path(self): - return ':'.join([gethostbyname(self.host), self.volume]) - - def can_connect_to(self, remote): - """determine our position in the connectibility matrix""" - return True - - class Mounter(object): - """Abstract base class for mounter backends""" - - def __init__(self, params): - self.params = params - self.mntpt = None - - @classmethod - def get_glusterprog(cls): - return os.path.join(gconf.gluster_command_dir, cls.glusterprog) - - def umount_l(self, d): - """perform lazy umount""" - po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE) - po.wait() - return po - - @classmethod - def make_umount_argv(cls, d): - raise NotImplementedError - - def make_mount_argv(self, *a): - raise NotImplementedError - - def cleanup_mntpt(self, *a): - pass - - def handle_mounter(self, po): - po.wait() - - def inhibit(self, *a): - """inhibit a gluster filesystem - - Mount glusterfs over a temporary mountpoint, - change into the mount, and lazy unmount the - filesystem. 
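A bare-bones sketch of the auxiliary-mount dance that inhibit() above performs with the DirectMounter backend: mount the volume on a throwaway directory, chdir into it, then lazily unmount so nothing lingers once the process exits. Root privileges are assumed and the server/volume names are placeholders; the real code also passes log options, runs the mount from a forked child and polls the chdir with a timeout:

    # Sketch of mount / chdir / lazy-umount as done by Mounter.inhibit() and
    # DirectMounter above (error handling stripped, root required).
    import os, subprocess, tempfile

    def inhibit_sketch(volfile_server, volume):
        mntpt = tempfile.mkdtemp(prefix='gsyncd-aux-mount-')
        subprocess.check_call(['glusterfs',
                               '--volfile-server=' + volfile_server,
                               '--volfile-id=' + volume, mntpt])
        os.chdir(mntpt)                                  # keep the tree via cwd
        subprocess.check_call(['umount', '-l', mntpt])   # lazy umount
        os.rmdir(mntpt)                                  # cf. cleanup_mntpt()

    # inhibit_sketch('master-node.example.com', 'mastervol')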
- """ - - mpi, mpo = os.pipe() - mh = Popen.fork() - if mh: - os.close(mpi) - fcntl.fcntl(mpo, fcntl.F_SETFD, fcntl.FD_CLOEXEC) - d = None - margv = self.make_mount_argv(*a) - if self.mntpt: - # mntpt is determined pre-mount - d = self.mntpt - os.write(mpo, d + '\0') - po = Popen(margv, **self.mountkw) - self.handle_mounter(po) - po.terminate_geterr() - logging.debug('auxiliary glusterfs mount in place') - if not d: - # mntpt is determined during mount - d = self.mntpt - os.write(mpo, d + '\0') - os.write(mpo, 'M') - t = syncdutils.Thread(target=lambda: os.chdir(d)) - t.start() - tlim = gconf.starttime + int(gconf.connection_timeout) - while True: - if not t.isAlive(): - break - if time.time() >= tlim: - syncdutils.finalize(exval = 1) - time.sleep(1) - os.close(mpo) - _, rv = syncdutils.waitpid(mh, 0) - if rv: - rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \ - (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0) - logging.warn('stale mount possibly left behind on ' + d) - raise GsyncdError("cleaning up temp mountpoint %s failed with status %d" % \ - (d, rv)) - else: - rv = 0 - try: - os.setsid() - os.close(mpo) - mntdata = '' - while True: - c = os.read(mpi, 1) - if not c: - break - mntdata += c - if mntdata: - mounted = False - if mntdata[-1] == 'M': - mntdata = mntdata[:-1] - assert(mntdata) - mounted = True - assert(mntdata[-1] == '\0') - mntpt = mntdata[:-1] - assert(mntpt) - if mounted: - po = self.umount_l(mntpt) - po.terminate_geterr(fail_on_err = False) - if po.returncode != 0: - po.errlog() - rv = po.returncode - self.cleanup_mntpt(mntpt) - except: - logging.exception('mount cleanup failure:') - rv = 200 - os._exit(rv) - logging.debug('auxiliary glusterfs mount prepared') - - class DirectMounter(Mounter): - """mounter backend which calls mount(8), umount(8) directly""" - - mountkw = {'stderr': subprocess.PIPE} - glusterprog = 'glusterfs' - - @staticmethod - def make_umount_argv(d): - return ['umount', '-l', d] - - def make_mount_argv(self): - self.mntpt = tempfile.mkdtemp(prefix = 'gsyncd-aux-mount-') - return [self.get_glusterprog()] + ['--' + p for p in self.params] + [self.mntpt] - - def cleanup_mntpt(self, mntpt = None): - if not mntpt: - mntpt = self.mntpt - os.rmdir(mntpt) - - class MountbrokerMounter(Mounter): - """mounter backend using the mountbroker gluster service""" - - mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE} - glusterprog = 'gluster' - - @classmethod - def make_cli_argv(cls): - return [cls.get_glusterprog()] + gconf.gluster_cli_options.split() + ['system::'] - - @classmethod - def make_umount_argv(cls, d): - return cls.make_cli_argv() + ['umount', d, 'lazy'] - - def make_mount_argv(self, label): - return self.make_cli_argv() + \ - ['mount', label, 'user-map-root=' + syncdutils.getusername()] + self.params - - def handle_mounter(self, po): - self.mntpt = po.stdout.readline()[:-1] - po.stdout.close() - sup(self, po) - if po.returncode != 0: - # if cli terminated with error due to being - # refused by glusterd, what it put - # out on stdout is a diagnostic message - logging.error('glusterd answered: %s' % self.mntpt) - - def connect(self): - """inhibit the resource beyond - - Choose mounting backend (direct or mountbroker), - set up glusterfs parameters and perform the mount - with given backend - """ - - label = getattr(gconf, 'mountbroker', None) - if not label and not privileged(): - label = syncdutils.getusername() - mounter = label and self.MountbrokerMounter or self.DirectMounter - params = gconf.gluster_params.split() + \ - 
(gconf.gluster_log_level and ['log-level=' + gconf.gluster_log_level] or []) + \ - ['log-file=' + gconf.gluster_log_file, 'volfile-server=' + self.host, - 'volfile-id=' + self.volume, 'client-pid=-1'] - mounter(params).inhibit(*[l for l in [label] if l]) - - def connect_remote(self, *a, **kw): - sup(self, *a, **kw) - self.slavedir = "/proc/%d/cwd" % self.server.pid() - - def service_loop(self, *args): - """enter service loop - - - if slave given, instantiate GMaster and - pass control to that instance, which implements - master behavior - - else do that's what's inherited - """ - if args: - gmaster_builder()(self, args[0]).crawl_loop() - else: - sup(self, *args) - - def rsync(self, files): - return sup(self, files, self.slavedir) - - -class SSH(AbstractUrl, SlaveRemote): - """scheme class for ssh:// urls - - interface to remote slave on master side - implementing an ssh based proxy - """ - - def __init__(self, path): - self.remote_addr, inner_url = sup(self, path, - '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ])) - self.inner_rsc = parse_url(inner_url) - - def canonical_path(self): - m = re.match('([^@]+)@(.+)', self.remote_addr) - if m: - u, h = m.groups() - else: - u, h = syncdutils.getusername(), self.remote_addr - remote_addr = '@'.join([u, gethostbyname(h)]) - return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)]) - - def can_connect_to(self, remote): - """determine our position in the connectibility matrix""" - return False - - def start_fd_client(self, *a, **opts): - """customizations for client startup - - - be a no-op if we are to daemonize (client startup is deferred - to post-daemon stage) - - determine target url for rsync after consulting server - """ - if opts.get('deferred'): - return a - sup(self, *a) - ityp = type(self.inner_rsc) - if ityp == FILE: - slavepath = self.inner_rsc.path - elif ityp == GLUSTER: - slavepath = "/proc/%d/cwd" % self.server.pid() - else: - raise NotImplementedError - self.slaveurl = ':'.join([self.remote_addr, slavepath]) - - def connect_remote(self, go_daemon=None): - """connect to inner slave url through outer ssh url - - Wrap the connecting utility in ssh. - - Much care is put into daemonizing: in that case - ssh is started before daemonization, but - RePCe client is to be created after that (as ssh - interactive password auth would be defeated by - a daemonized ssh, while client should be present - only in the final process). In that case the action - is taken apart to two parts, this method is ivoked - once pre-daemon, once post-daemon. Use @go_daemon - to deiced what part to perform. - - [NB. ATM gluster product does not makes use of interactive - authentication.] - """ - if go_daemon == 'done': - return self.start_fd_client(*self.fd_pair) - gconf.setup_ssh_ctl(tempfile.mkdtemp(prefix='gsyncd-aux-ssh-')) - deferred = go_daemon == 'postconn' - ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + [self.remote_addr], slave=self.inner_rsc.url, deferred=deferred) - if deferred: - # send a message to peer so that we can wait for - # the answer from which we know connection is - # established and we can proceed with daemonization - # (doing that too early robs the ssh passwd prompt...) - # However, we'd better not start the RepceClient - # before daemonization (that's not preserved properly - # in daemon), we just do a an ad-hoc linear put/get. 
- i, o = ret - inf = os.fdopen(i) - repce.send(o, None, '__repce_version__') - select((inf,), (), ()) - repce.recv(inf) - # hack hack hack: store a global reference to the file - # to save it from getting GC'd which implies closing it - gconf.permanent_handles.append(inf) - self.fd_pair = (i, o) - return 'should' - - def rsync(self, files): - return sup(self, files, '-e', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args), - *(gconf.rsync_ssh_options.split() + [self.slaveurl])) diff --git a/xlators/features/marker/utils/syncdaemon/syncdutils.py b/xlators/features/marker/utils/syncdaemon/syncdutils.py deleted file mode 100644 index 0764c07904d..00000000000 --- a/xlators/features/marker/utils/syncdaemon/syncdutils.py +++ /dev/null @@ -1,288 +0,0 @@ -import os -import sys -import pwd -import time -import fcntl -import shutil -import logging -from threading import Lock, Thread as baseThread -from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, errorcode -from signal import signal, SIGTERM, SIGKILL -from time import sleep -import select as oselect -from os import waitpid as owaitpid -try: - from cPickle import PickleError -except ImportError: - # py 3 - from pickle import PickleError - -from gconf import gconf - -try: - # py 3 - from urllib import parse as urllib -except ImportError: - import urllib - -def escape(s): - """the chosen flavor of string escaping, used all over - to turn whatever data to creatable representation""" - return urllib.quote_plus(s) - -def unescape(s): - """inverse of .escape""" - return urllib.unquote_plus(s) - -def norm(s): - if s: - return s.replace('-', '_') - -def update_file(path, updater, merger = lambda f: True): - """update a file in a transaction-like manner""" - - fr = fw = None - try: - fd = os.open(path, os.O_CREAT|os.O_RDWR) - try: - fr = os.fdopen(fd, 'r+b') - except: - os.close(fd) - raise - fcntl.lockf(fr, fcntl.LOCK_EX) - if not merger(fr): - return - - tmpp = path + '.tmp.' + str(os.getpid()) - fd = os.open(tmpp, os.O_CREAT|os.O_EXCL|os.O_WRONLY) - try: - fw = os.fdopen(fd, 'wb', 0) - except: - os.close(fd) - raise - updater(fw) - os.fsync(fd) - os.rename(tmpp, path) - finally: - for fx in (fr, fw): - if fx: - fx.close() - -def grabfile(fname, content=None): - """open @fname + contest for its fcntl lock - - @content: if given, set the file content to it - """ - # damn those messy open() mode codes - fd = os.open(fname, os.O_CREAT|os.O_RDWR) - f = os.fdopen(fd, 'r+b', 0) - try: - fcntl.lockf(f, fcntl.LOCK_EX|fcntl.LOCK_NB) - except: - ex = sys.exc_info()[1] - f.close() - if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN): - # cannot grab, it's taken - return - raise - if content: - try: - f.truncate() - f.write(content) - except: - f.close() - raise - gconf.permanent_handles.append(f) - return f - -def grabpidfile(fname=None, setpid=True): - """.grabfile customization for pid files""" - if not fname: - fname = gconf.pid_file - content = None - if setpid: - content = str(os.getpid()) + '\n' - return grabfile(fname, content=content) - -final_lock = Lock() - -def finalize(*a, **kw): - """all those messy final steps we go trough upon termination - - Do away with pidfile, ssh control dir and logging. 
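update_file() earlier in this file is the usual transaction-like recipe: take an fcntl lock on the original, write the new content to a temp file, fsync, then rename over the original. A minimal sketch of the write-and-rename half, with illustrative names:

    # Sketch of the tmp-file + fsync + rename update done by update_file()
    # above; the fcntl.lockf serialization against other writers is omitted.
    import os

    def atomic_update(path, data):
        tmpp = path + '.tmp.' + str(os.getpid())
        fd = os.open(tmpp, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        try:
            os.write(fd, data)
            os.fsync(fd)             # contents hit disk before rename publishes them
        finally:
            os.close(fd)
        os.rename(tmpp, path)        # atomic replace on POSIX filesystems

    atomic_update('/tmp/gsyncd-demo.status', b'OK\n')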
- """ - final_lock.acquire() - if getattr(gconf, 'pid_file', None): - rm_pidf = gconf.pid_file_owned - if gconf.cpid: - # exit path from parent branch of daemonization - rm_pidf = False - while True: - f = grabpidfile(setpid=False) - if not f: - # child has already taken over pidfile - break - if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid: - # child has terminated - rm_pidf = True - break; - time.sleep(0.1) - if rm_pidf: - try: - os.unlink(gconf.pid_file) - except: - ex = sys.exc_info()[1] - if ex.errno == ENOENT: - pass - else: - raise - if gconf.ssh_ctl_dir and not gconf.cpid: - shutil.rmtree(gconf.ssh_ctl_dir) - if getattr(gconf, 'state_socket', None): - try: - os.unlink(gconf.state_socket) - except: - if sys.exc_info()[0] == OSError: - pass - if gconf.log_exit: - logging.info("exiting.") - sys.stdout.flush() - sys.stderr.flush() - os._exit(kw.get('exval', 0)) - -def log_raise_exception(excont): - """top-level exception handler - - Try to some fancy things to cover up we face with an error. - Translate some weird sounding but well understood exceptions - into human-friendly lingo - """ - is_filelog = False - for h in logging.getLogger().handlers: - fno = getattr(getattr(h, 'stream', None), 'fileno', None) - if fno and not os.isatty(fno()): - is_filelog = True - - exc = sys.exc_info()[1] - if isinstance(exc, SystemExit): - excont.exval = exc.code or 0 - raise - else: - logtag = None - if isinstance(exc, GsyncdError): - if is_filelog: - logging.error(exc.args[0]) - sys.stderr.write('failure: ' + exc.args[0] + '\n') - elif isinstance(exc, PickleError) or isinstance(exc, EOFError) or \ - ((isinstance(exc, OSError) or isinstance(exc, IOError)) and \ - exc.errno == EPIPE): - logging.error('connection to peer is broken') - if hasattr(gconf, 'transport'): - gconf.transport.wait() - if gconf.transport.returncode == 127: - logging.warn("!!!!!!!!!!!!!") - logging.warn('!!! 
getting "No such file or directory" errors ' - "is most likely due to MISCONFIGURATION, please consult " - "http://access.redhat.com/knowledge/docs/en-US/Red_Hat_Storage/2.0/html/Administration_Guide/chap-User_Guide-Geo_Rep-Preparation-Settingup_Environment.html") - logging.warn("!!!!!!!!!!!!!") - gconf.transport.terminate_geterr() - elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, ECONNABORTED): - logging.error('glusterfs session went down [%s]', errorcode[exc.errno]) - else: - logtag = "FAIL" - if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG): - logtag = "FULL EXCEPTION TRACE" - if logtag: - logging.exception(logtag + ": ") - sys.stderr.write("failed with %s.\n" % type(exc).__name__) - excont.exval = 1 - sys.exit(excont.exval) - - -class FreeObject(object): - """wildcard class for which any attribute can be set""" - - def __init__(self, **kw): - for k,v in kw.items(): - setattr(self, k, v) - -class Thread(baseThread): - """thread class flavor for gsyncd - - - always a daemon thread - - force exit for whole program if thread - function coughs up an exception - """ - def __init__(self, *a, **kw): - tf = kw.get('target') - if tf: - def twrap(*aa): - excont = FreeObject(exval = 0) - try: - tf(*aa) - except: - try: - log_raise_exception(excont) - finally: - finalize(exval = excont.exval) - kw['target'] = twrap - baseThread.__init__(self, *a, **kw) - self.setDaemon(True) - -class GsyncdError(Exception): - pass - -def getusername(uid = None): - if uid == None: - uid = os.geteuid() - return pwd.getpwuid(uid).pw_name - -def privileged(): - return os.geteuid() == 0 - -def boolify(s): - """ - Generic string to boolean converter - - return - - Quick return if string 's' is of type bool - - True if it's in true_list - - False if it's in false_list - - Warn if it's not present in either and return False - """ - true_list = ['true', 'yes', '1', 'on'] - false_list = ['false', 'no', '0', 'off'] - - if isinstance(s, bool): - return s - - rv = False - lstr = s.lower() - if lstr in true_list: - rv = True - elif not lstr in false_list: - logging.warn("Unknown string (%s) in string to boolean conversion defaulting to False\n" % (s)) - - return rv - -def eintr_wrap(func, exc, *a): - """ - wrapper around syscalls resilient to interrupt caused - by signals - """ - while True: - try: - return func(*a) - except exc: - ex = sys.exc_info()[1] - if not ex.args[0] == EINTR: - raise - -def select(*a): - return eintr_wrap(oselect.select, oselect.error, *a) - -def waitpid (*a): - return eintr_wrap(owaitpid, OSError, *a) - -def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})): - signal(SIGTERM, hook) -- cgit