diff options
Diffstat (limited to 'geo-replication')
26 files changed, 870 insertions, 608 deletions
diff --git a/geo-replication/gsyncd.conf.in b/geo-replication/gsyncd.conf.in index 6160c7c7091..9688c79fab7 100644 --- a/geo-replication/gsyncd.conf.in +++ b/geo-replication/gsyncd.conf.in @@ -23,6 +23,11 @@ configurable=false type=int value=1 +[master-distribution-count] +configurable=false +type=int +value=1 + [glusterd-workdir] value = @GLUSTERD_WORKDIR@ @@ -109,7 +114,7 @@ type=int help=Minimum time interval in seconds for passive worker to become Active [changelog-archive-format] -value=%%Y%%m +value=%Y%m help=Processed changelogs will be archived in working directory. Pattern for archive file [use-meta-volume] @@ -118,7 +123,7 @@ type=bool help=Use this to set Active Passive mode to meta-volume. [meta-volume-mnt] -value=/var/run/gluster/shared_storage +value=/run/gluster/shared_storage help=Meta Volume or Shared Volume mount path [allow-network] @@ -128,10 +133,11 @@ value= value=5 type=int -[use-tarssh] -value=false -type=bool -help=Use sync-mode as tarssh +[sync-method] +value=rsync +help=Sync method for data sync. Available methods are tar over ssh and rsync. Default is rsync. +validation=choice +allowed_values=tarssh,rsync [remote-gsyncd] value = @@ -260,7 +266,9 @@ allowed_values=ERROR,INFO,WARNING,DEBUG [ssh-port] value=22 -validation=int +validation=minmax +min=1 +max=65535 help=Set SSH port type=int diff --git a/geo-replication/setup.py b/geo-replication/setup.py index 6d678baa2f7..0eae469d2d6 100644 --- a/geo-replication/setup.py +++ b/geo-replication/setup.py @@ -1,7 +1,7 @@ # # Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> # This file is part of GlusterFS. - +# # This file is licensed to you under your choice of the GNU Lesser # General Public License, version 3 or any later version (LGPLv3 or # later), or the GNU General Public License, version 2 (GPLv2), in all @@ -20,11 +20,11 @@ setup( name=name, version="", description='GlusterFS Geo Replication', - license='', + license='GPLV2 and LGPLV3+', author='Red Hat, Inc.', author_email='gluster-devel@gluster.org', url='http://www.gluster.org', - packages=['syncdaemon', ], + packages=[name, ], test_suite='nose.collector', install_requires=[], scripts=[], diff --git a/geo-replication/src/gsyncd.c b/geo-replication/src/gsyncd.c index cf0e76f69c4..b5aeec5bf33 100644 --- a/geo-replication/src/gsyncd.c +++ b/geo-replication/src/gsyncd.c @@ -7,8 +7,8 @@ later), or the GNU General Public License, version 2 (GPLv2), in all cases as published by the Free Software Foundation. */ -#include "compat.h" -#include "syscall.h" +#include <glusterfs/compat.h> +#include <glusterfs/syscall.h> #include <stdlib.h> #include <stdio.h> @@ -24,13 +24,13 @@ * We unconditionally pass then while building gsyncd binary. */ #ifdef USE_LIBGLUSTERFS -#include "glusterfs.h" -#include "globals.h" -#include "defaults.h" +#include <glusterfs/glusterfs.h> +#include <glusterfs/globals.h> +#include <glusterfs/defaults.h> #endif -#include "common-utils.h" -#include "run.h" +#include <glusterfs/common-utils.h> +#include <glusterfs/run.h> #include "procdiggy.h" #define _GLUSTERD_CALLED_ "_GLUSTERD_CALLED_" diff --git a/geo-replication/src/gverify.sh b/geo-replication/src/gverify.sh index d048de0992b..f5f70d245e0 100755 --- a/geo-replication/src/gverify.sh +++ b/geo-replication/src/gverify.sh @@ -94,6 +94,7 @@ echo $cmd_line; function master_stats() { MASTERVOL=$1; + local inet6=$2; local d; local i; local disk_size; @@ -102,7 +103,12 @@ function master_stats() local m_status; d=$(mktemp -d -t ${0##*/}.XXXXXX 2>/dev/null); - glusterfs -s localhost --xlator-option="*dht.lookup-unhashed=off" --volfile-id $MASTERVOL -l $master_log_file $d; + if [ "$inet6" = "inet6" ]; then + glusterfs -s localhost --xlator-option="*dht.lookup-unhashed=off" --xlator-option="transport.address-family=inet6" --volfile-id $MASTERVOL -l $master_log_file $d; + else + glusterfs -s localhost --xlator-option="*dht.lookup-unhashed=off" --volfile-id $MASTERVOL -l $master_log_file $d; + fi + i=$(get_inode_num $d); if [[ "$i" -ne "1" ]]; then echo 0:0; @@ -124,12 +130,18 @@ function slave_stats() SLAVEUSER=$1; SLAVEHOST=$2; SLAVEVOL=$3; + local inet6=$4; local cmd_line; local ver; local status; d=$(mktemp -d -t ${0##*/}.XXXXXX 2>/dev/null); - glusterfs --xlator-option="*dht.lookup-unhashed=off" --volfile-server $SLAVEHOST --volfile-id $SLAVEVOL -l $slave_log_file $d; + if [ "$inet6" = "inet6" ]; then + glusterfs --xlator-option="*dht.lookup-unhashed=off" --xlator-option="transport.address-family=inet6" --volfile-server $SLAVEHOST --volfile-id $SLAVEVOL -l $slave_log_file $d; + else + glusterfs --xlator-option="*dht.lookup-unhashed=off" --volfile-server $SLAVEHOST --volfile-id $SLAVEVOL -l $slave_log_file $d; + fi + i=$(get_inode_num $d); if [[ "$i" -ne "1" ]]; then echo 0:0; @@ -167,6 +179,10 @@ function main() log_file=$6 > $log_file + inet6=$7 + local cmd_line + local ver + # Use FORCE_BLOCKER flag in the error message to differentiate # between the errors which the force command should bypass @@ -192,20 +208,21 @@ function main() exit 1; fi; + cmd_line=$(cmd_slave); if [[ -z "${GR_SSH_IDENTITY_KEY}" ]]; then - err=$((ssh -p ${SSH_PORT} -oNumberOfPasswordPrompts=0 -oStrictHostKeyChecking=no $2@$3 "gluster --version") 2>&1) + ver=$(ssh -p ${SSH_PORT} -oNumberOfPasswordPrompts=0 -oStrictHostKeyChecking=no $2@$3 bash -c "'$cmd_line'") else - err=$((ssh -p ${SSH_PORT} -i ${GR_SSH_IDENTITY_KEY} -oNumberOfPasswordPrompts=0 -oStrictHostKeyChecking=no $2@$3 "gluster --version") 2>&1) + ver=$(ssh -p ${SSH_PORT} -i ${GR_SSH_IDENTITY_KEY} -oNumberOfPasswordPrompts=0 -oStrictHostKeyChecking=no $2@$3 bash -c "'$cmd_line'") fi - if [ $? -ne 0 ]; then - echo "FORCE_BLOCKER|gluster command on $2@$3 failed. Error: $err" > $log_file + if [ -z "$ver" ]; then + echo "FORCE_BLOCKER|gluster command not found on $3 for user $2." > $log_file exit 1; fi; ERRORS=0; - master_data=$(master_stats $1); - slave_data=$(slave_stats $2 $3 $4); + master_data=$(master_stats $1 ${inet6}); + slave_data=$(slave_stats $2 $3 $4 ${inet6}); master_disk_size=$(echo $master_data | cut -f1 -d':'); slave_disk_size=$(echo $slave_data | cut -f1 -d':'); master_used_size=$(echo $master_data | cut -f2 -d':'); diff --git a/geo-replication/src/peer_georep-sshkey.py.in b/geo-replication/src/peer_georep-sshkey.py.in index 2196fd7491a..58696e9a616 100644 --- a/geo-replication/src/peer_georep-sshkey.py.in +++ b/geo-replication/src/peer_georep-sshkey.py.in @@ -30,8 +30,8 @@ from prettytable import PrettyTable SECRET_PEM = "@GLUSTERD_WORKDIR@/geo-replication/secret.pem" TAR_SSH_PEM = "@GLUSTERD_WORKDIR@/geo-replication/tar_ssh.pem" -GSYNCD_CMD = 'command="@GLUSTERFS_LIBEXECDIR@/gsyncd" ' -TAR_CMD = 'command="tar ${SSH_ORIGINAL_COMMAND#* }" ' +GSYNCD_CMD = 'command="@GLUSTERFS_LIBEXECDIR@/gsyncd" ' +TAR_CMD = 'command="tar ${SSH_ORIGINAL_COMMAND#* }" ' COMMON_SECRET_FILE = "@GLUSTERD_WORKDIR@/geo-replication/common_secret.pem.pub" diff --git a/geo-replication/src/peer_gsec_create.in b/geo-replication/src/peer_gsec_create.in index 05c1638bdcd..6d4a4847013 100755 --- a/geo-replication/src/peer_gsec_create.in +++ b/geo-replication/src/peer_gsec_create.in @@ -18,7 +18,7 @@ if [ "Xcontainer" = "X$1" ]; then output1=`cat "$GLUSTERD_WORKDIR"/geo-replication/secret.pem.pub` output2=`cat "$GLUSTERD_WORKDIR"/geo-replication/tar_ssh.pem.pub` else - output1=`echo command=\"${libexecdir}/glusterfs/gsyncd\" " "``cat "$GLUSTERD_WORKDIR"/geo-replication/secret.pem.pub` - output2=`echo command=\"tar \$\{SSH_ORIGINAL_COMMAND#* \}\" " "``cat "$GLUSTERD_WORKDIR"/geo-replication/tar_ssh.pem.pub` + output1=`echo command=\"${libexecdir}/glusterfs/gsyncd\" ""``cat "$GLUSTERD_WORKDIR"/geo-replication/secret.pem.pub` + output2=`echo command=\"tar \$\{SSH_ORIGINAL_COMMAND#* \}\" ""``cat "$GLUSTERD_WORKDIR"/geo-replication/tar_ssh.pem.pub` fi echo -e "$output1\n$output2" diff --git a/geo-replication/src/peer_mountbroker.py.in b/geo-replication/src/peer_mountbroker.py.in index 83d385cb5eb..40b90ffc560 100644 --- a/geo-replication/src/peer_mountbroker.py.in +++ b/geo-replication/src/peer_mountbroker.py.in @@ -11,6 +11,7 @@ from gluster.cliutils import (execute, Cmd, node_output_ok, from prettytable import PrettyTable LOG_DIR = "@localstatedir@/log/glusterfs/geo-replication-slaves" +CLI_LOG = "@localstatedir@/log/glusterfs/cli.log" GEOREP_DIR = "@GLUSTERD_WORKDIR@/geo-replication" GLUSTERD_VOLFILE = "@GLUSTERD_VOLFILE@" @@ -46,7 +47,7 @@ class MountbrokerUserMgmt(object): for line in f: line = line.strip() if line.startswith("option "): - key, value = line.split(" ")[1:] + key, value = line.split()[1:] self._options[key] = value if line.startswith("#"): self.commented_lines.append(line) @@ -145,7 +146,7 @@ class NodeSetup(Cmd): # chgrp -R <grp> /var/log/glusterfs/geo-replication-slaves # chgrp -R <grp> /var/lib/glusterd/geo-replication # chmod -R 770 /var/log/glusterfs/geo-replication-slaves - # chmod -R 770 /var/lib/glusterd/geo-replication + # chmod 770 /var/lib/glusterd/geo-replication # mkdir -p <mnt_root> # chmod 0711 <mnt_root> # If selinux, @@ -195,8 +196,13 @@ class NodeSetup(Cmd): execute(["chgrp", "-R", args.group, GEOREP_DIR]) execute(["chgrp", "-R", args.group, LOG_DIR]) - execute(["chmod", "-R", "770", GEOREP_DIR]) - execute(["chmod", "-R", "770", args.group, LOG_DIR]) + execute(["chgrp", args.group, CLI_LOG]) + execute(["chmod", "770", GEOREP_DIR]) + execute(["find", LOG_DIR, "-type", "d", "-exec", "chmod", "770", "{}", + "+"]) + execute(["find", LOG_DIR, "-type", "f", "-exec", "chmod", "660", "{}", + "+"]) + execute(["chmod", "660", CLI_LOG]) m.set_mount_root_and_group(args.mount_root, args.group) m.save() @@ -216,8 +222,10 @@ class CliSetup(Cmd): name = "setup" def args(self, parser): - parser.add_argument("mount_root") - parser.add_argument("group") + parser.add_argument("mount_root", + help="Path to the mountbroker-root directory.") + parser.add_argument("group", + help="Group to be used for setup.") def run(self, args): out = execute_in_peers("node-setup", [args.mount_root, @@ -273,7 +281,7 @@ class CliStatus(Cmd): for p in out: node_data = p.output - if node_data == "": + if node_data == "" or node_data == "N/A": node_data = {} users_row_data = "" @@ -327,8 +335,10 @@ class CliAdd(Cmd): name = "add" def args(self, parser): - parser.add_argument("volume") - parser.add_argument("user") + parser.add_argument("volume", + help="Volume to be added.") + parser.add_argument("user", + help="User for which volume is to be added.") def run(self, args): out = execute_in_peers("node-add", [args.volume, @@ -368,8 +378,9 @@ class CliRemove(Cmd): name = "remove" def args(self, parser): - parser.add_argument("--volume", default=".") - parser.add_argument("--user", default=".") + parser.add_argument("--volume", default=".", help="Volume to be removed.") + parser.add_argument("--user", default=".", + help="User for which volume has to be removed.") def run(self, args): out = execute_in_peers("node-remove", [args.volume, diff --git a/geo-replication/src/procdiggy.c b/geo-replication/src/procdiggy.c index 05c1e1edc68..8068ef79a42 100644 --- a/geo-replication/src/procdiggy.c +++ b/geo-replication/src/procdiggy.c @@ -15,8 +15,8 @@ #include <ctype.h> #include <sys/param.h> /* for PATH_MAX */ -#include "common-utils.h" -#include "syscall.h" +#include <glusterfs/common-utils.h> +#include <glusterfs/syscall.h> #include "procdiggy.h" pid_t @@ -31,6 +31,10 @@ pidinfo(pid_t pid, char **name) }; char *p = NULL; int ret = 0; + pid_t lpid = -1; + + if (name) + *name = NULL; snprintf(path, sizeof path, PROC "/%d/status", pid); @@ -38,14 +42,12 @@ pidinfo(pid_t pid, char **name) if (!f) return -1; - if (name) - *name = NULL; for (;;) { size_t len; memset(buf, 0, sizeof(buf)); if (fgets(buf, sizeof(buf), f) == NULL || (len = strlen(buf)) == 0 || buf[len - 1] != '\n') { - pid = -1; + lpid = -1; goto out; } buf[len - 1] = '\0'; @@ -57,7 +59,7 @@ pidinfo(pid_t pid, char **name) ; *name = gf_strdup(p); if (!*name) { - pid = -2; + lpid = -2; goto out; } continue; @@ -71,17 +73,17 @@ pidinfo(pid_t pid, char **name) while (isspace(*++p)) ; - ret = gf_string2int(p, &pid); + ret = gf_string2int(p, &lpid); if (ret == -1) - pid = -1; + lpid = -1; out: fclose(f); - if (pid == -1 && name && *name) + if (lpid == -1 && name && *name) GF_FREE(*name); - if (pid == -2) + if (lpid == -2) fprintf(stderr, "out of memory\n"); - return pid; + return lpid; } int diff --git a/geo-replication/src/set_geo_rep_pem_keys.sh b/geo-replication/src/set_geo_rep_pem_keys.sh index ae23f4ff0c6..8a43fa39d1f 100755 --- a/geo-replication/src/set_geo_rep_pem_keys.sh +++ b/geo-replication/src/set_geo_rep_pem_keys.sh @@ -47,6 +47,7 @@ function main() cp $home_dir/${COMMON_SECRET_PEM_PUB} ${GLUSTERD_WORKDIR}/geo-replication/ gluster system:: copy file /geo-replication/${COMMON_SECRET_PEM_PUB} gluster system:: execute add_secret_pub $user geo-replication/${master_vol}_${slave_vol}_common_secret.pem.pub + gluster vol set ${slave_vol} features.read-only on else echo "$home_dir/common_secret.pem.pub not present. Please run geo-replication command on master with push-pem option to generate the file" exit 1; diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am index 62c5ce7fe30..d70e3368faf 100644 --- a/geo-replication/syncdaemon/Makefile.am +++ b/geo-replication/syncdaemon/Makefile.am @@ -2,7 +2,7 @@ syncdaemondir = $(GLUSTERFS_LIBEXECDIR)/python/syncdaemon syncdaemon_PYTHON = rconf.py gsyncd.py __init__.py master.py README.md repce.py \ resource.py syncdutils.py monitor.py libcxattr.py gsyncdconfig.py \ - libgfchangelog.py changelogagent.py gsyncdstatus.py conf.py logutils.py \ + libgfchangelog.py gsyncdstatus.py conf.py logutils.py \ subcmds.py argsupgrade.py py2py3.py CLEANFILES = diff --git a/geo-replication/syncdaemon/README.md b/geo-replication/syncdaemon/README.md index 2a202e3f99e..5ab785ae669 100644 --- a/geo-replication/syncdaemon/README.md +++ b/geo-replication/syncdaemon/README.md @@ -19,7 +19,6 @@ INSTALLATION As of now, the supported way of operation is running from the source directory or using the RPMs given. -If you use Python 2.4.x, you need to install the [Ctypes module](http://python.net/crew/theller/ctypes/). CONFIGURATION ------------- diff --git a/geo-replication/syncdaemon/argsupgrade.py b/geo-replication/syncdaemon/argsupgrade.py index 4018143b8b4..7af40633ef8 100644 --- a/geo-replication/syncdaemon/argsupgrade.py +++ b/geo-replication/syncdaemon/argsupgrade.py @@ -84,6 +84,10 @@ def upgrade(): # fail when it does stat to check the existence. init_gsyncd_template_conf() + inet6 = False + if "--inet6" in sys.argv: + inet6 = True + if "--monitor" in sys.argv: # python gsyncd.py --path=/bricks/b1 # --monitor -c gsyncd.conf @@ -147,8 +151,11 @@ def upgrade(): user, hname = remote_addr.split("@") + if not inet6: + hname = gethostbyname(hname) + print(("ssh://%s@%s:gluster://127.0.0.1:%s" % ( - user, gethostbyname(hname), vol))) + user, hname, vol))) sys.exit(0) elif "--normalize-url" in sys.argv: @@ -346,3 +353,7 @@ def upgrade(): if pargs.reset_sync_time: sys.argv.append("--reset-sync-time") + + if inet6: + # Add `--inet6` as first argument + sys.argv = [sys.argv[0], "--inet6"] + sys.argv[1:] diff --git a/geo-replication/syncdaemon/changelogagent.py b/geo-replication/syncdaemon/changelogagent.py deleted file mode 100644 index c5fdbc3a74f..00000000000 --- a/geo-replication/syncdaemon/changelogagent.py +++ /dev/null @@ -1,78 +0,0 @@ -#!/usr/bin/python3 -# -# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> -# This file is part of GlusterFS. - -# This file is licensed to you under your choice of the GNU Lesser -# General Public License, version 3 or any later version (LGPLv3 or -# later), or the GNU General Public License, version 2 (GPLv2), in all -# cases as published by the Free Software Foundation. -# - -import logging -import syncdutils -from syncdutils import select, CHANGELOG_AGENT_SERVER_VERSION -from repce import RepceServer - - -class _MetaChangelog(object): - - def __getattr__(self, meth): - from libgfchangelog import Changes as LChanges - xmeth = [m for m in dir(LChanges) if m[0] != '_'] - if meth not in xmeth: - return - for m in xmeth: - setattr(self, m, getattr(LChanges, m)) - return getattr(self, meth) - -Changes = _MetaChangelog() - - -class Changelog(object): - def version(self): - return CHANGELOG_AGENT_SERVER_VERSION - - def init(self): - return Changes.cl_init() - - def register(self, cl_brick, cl_dir, cl_log, cl_level, retries=0): - return Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries) - - def scan(self): - return Changes.cl_scan() - - def getchanges(self): - return Changes.cl_getchanges() - - def done(self, clfile): - return Changes.cl_done(clfile) - - def history(self, changelog_path, start, end, num_parallel): - return Changes.cl_history_changelog(changelog_path, start, end, - num_parallel) - - def history_scan(self): - return Changes.cl_history_scan() - - def history_getchanges(self): - return Changes.cl_history_getchanges() - - def history_done(self, clfile): - return Changes.cl_history_done(clfile) - - -class ChangelogAgent(object): - def __init__(self, obj, fd_tup): - (inf, ouf, rw, ww) = fd_tup.split(',') - repce = RepceServer(obj, int(inf), int(ouf), 1) - t = syncdutils.Thread(target=lambda: (repce.service_loop(), - syncdutils.finalize())) - t.start() - logging.info('Agent listining...') - - select((), (), ()) - - -def agent(obj, fd_tup): - return ChangelogAgent(obj, fd_tup) diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 037f351151b..257ed72c6ae 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -22,8 +22,8 @@ import gsyncdconfig as gconf from rconf import rconf import subcmds from conf import GLUSTERD_WORKDIR, GLUSTERFS_CONFDIR, GCONF_VERSION -from syncdutils import set_term_handler, finalize, lf -from syncdutils import log_raise_exception, FreeObject, escape +from syncdutils import (set_term_handler, finalize, lf, + log_raise_exception, FreeObject, escape) import argsupgrade @@ -47,6 +47,7 @@ def main(): sys.exit(0) parser = ArgumentParser() + parser.add_argument("--inet6", action="store_true") sp = parser.add_subparsers(dest="subcmd") # Monitor Status File update @@ -78,8 +79,6 @@ def main(): help="feedback fd between monitor and worker") p.add_argument("--local-node", help="Local master node") p.add_argument("--local-node-id", help="Local Node ID") - p.add_argument("--rpc-fd", - help="Read and Write fds for worker-agent communication") p.add_argument("--subvol-num", type=int, help="Subvolume number") p.add_argument("--is-hottier", action="store_true", help="Is this brick part of hot tier") @@ -91,19 +90,6 @@ def main(): p.add_argument("-c", "--config-file", help="Config File") p.add_argument("--debug", action="store_true") - # Agent - p = sp.add_parser("agent") - p.add_argument("master", help="Master Volume Name") - p.add_argument("slave", help="Slave details user@host::vol format") - p.add_argument("--local-path", help="Local brick path") - p.add_argument("--local-node", help="Local master node") - p.add_argument("--local-node-id", help="Local Node ID") - p.add_argument("--slave-id", help="Slave Volume ID") - p.add_argument("--rpc-fd", - help="Read and Write fds for worker-agent communication") - p.add_argument("-c", "--config-file", help="Config File") - p.add_argument("--debug", action="store_true") - # Slave p = sp.add_parser("slave") p.add_argument("master", help="Master Volume Name") @@ -133,6 +119,8 @@ def main(): help="Directory where Gluster binaries exist on slave") p.add_argument("--slave-access-mount", action="store_true", help="Do not lazy umount the slave volume") + p.add_argument("--master-dist-count", type=int, + help="Master Distribution count") # Status p = sp.add_parser("status") @@ -228,7 +216,8 @@ def main(): # Set default path for config file in that case # If an subcmd accepts config file then it also accepts # master and Slave arguments. - if config_file is None and hasattr(args, "config_file"): + if config_file is None and hasattr(args, "config_file") \ + and args.subcmd != "slave": config_file = "%s/geo-replication/%s_%s_%s/gsyncd.conf" % ( GLUSTERD_WORKDIR, args.master, @@ -252,6 +241,12 @@ def main(): if args.subcmd == "slave": override_from_args = True + if config_file is not None and \ + args.subcmd in ["monitor", "config-get", "config-set", "config-reset"]: + ret = gconf.is_config_file_old(config_file, args.master, extra_tmpl_args["slavevol"]) + if ret is not None: + gconf.config_upgrade(config_file, ret) + # Load Config file gconf.load(GLUSTERFS_CONFDIR + "/gsyncd.conf", config_file, @@ -261,8 +256,8 @@ def main(): # Default label to print in log file label = args.subcmd - if args.subcmd in ("worker", "agent"): - # If Worker or agent, then add brick path also to label + if args.subcmd in ("worker"): + # If Worker, then add brick path also to label label = "%s %s" % (args.subcmd, args.local_path) elif args.subcmd == "slave": # If Slave add Master node and Brick details @@ -305,7 +300,7 @@ def main(): # Log message for loaded config file if config_file is not None: - logging.info(lf("Using session config file", path=config_file)) + logging.debug(lf("Using session config file", path=config_file)) set_term_handler() excont = FreeObject(exval=0) diff --git a/geo-replication/syncdaemon/gsyncdconfig.py b/geo-replication/syncdaemon/gsyncdconfig.py index 5d439a4c5ee..8848071997a 100644 --- a/geo-replication/syncdaemon/gsyncdconfig.py +++ b/geo-replication/syncdaemon/gsyncdconfig.py @@ -10,12 +10,14 @@ # try: - from configparser import ConfigParser, NoSectionError + from ConfigParser import RawConfigParser, NoSectionError except ImportError: - from ConfigParser import ConfigParser, NoSectionError + from configparser import RawConfigParser, NoSectionError import os +import shutil from string import Template from datetime import datetime +from threading import Lock # Global object which can be used in other modules @@ -34,6 +36,7 @@ class GconfInvalidValue(Exception): class Gconf(object): def __init__(self, default_conf_file, custom_conf_file=None, args={}, extra_tmpl_args={}, override_from_args=False): + self.lock = Lock() self.default_conf_file = default_conf_file self.custom_conf_file = custom_conf_file self.tmp_conf_file = None @@ -91,7 +94,7 @@ class Gconf(object): if name != "all" and not self._is_configurable(name): raise GconfNotConfigurable() - cnf = ConfigParser() + cnf = RawConfigParser() with open(self.custom_conf_file) as f: cnf.readfp(f) @@ -135,7 +138,7 @@ class Gconf(object): if curr_val == value: return True - cnf = ConfigParser() + cnf = RawConfigParser() with open(self.custom_conf_file) as f: cnf.readfp(f) @@ -162,6 +165,11 @@ class Gconf(object): if value is not None and not self._is_valid_value(name, value): raise GconfInvalidValue() + + def _load_with_lock(self): + with self.lock: + self._load() + def _load(self): self.gconf = {} self.template_conf = [] @@ -170,7 +178,7 @@ class Gconf(object): self.session_conf_items = [] self.default_values = {} - conf = ConfigParser() + conf = RawConfigParser() # Default Template config file with open(self.default_conf_file) as f: conf.readfp(f) @@ -229,12 +237,19 @@ class Gconf(object): self._tmpl_substitute() self._do_typecast() - def reload(self): + def reload(self, with_lock=True): if self._is_config_changed(): - self._load() + if with_lock: + self._load_with_lock() + else: + self._load() - def get(self, name, default_value=None): - return self.gconf.get(name, default_value) + def get(self, name, default_value=None, with_lock=True): + if with_lock: + with self.lock: + return self.gconf.get(name, default_value) + else: + return self.gconf.get(name, default_value) def getall(self, show_defaults=False, show_non_configurable=False): cnf = {} @@ -275,8 +290,9 @@ class Gconf(object): return cnf def getr(self, name, default_value=None): - self.reload() - return self.get(name, default_value) + with self.lock: + self.reload(with_lock=False) + return self.get(name, default_value, with_lock=False) def get_help(self, name=None): pass @@ -313,6 +329,9 @@ class Gconf(object): if item["validation"] == "unixtime": return validate_unixtime(value) + if item["validation"] == "int": + return validate_int(value) + return False def _is_config_changed(self): @@ -325,6 +344,53 @@ class Gconf(object): return False +def is_config_file_old(config_file, mastervol, slavevol): + cnf = RawConfigParser() + cnf.read(config_file) + session_section = "peers %s %s" % (mastervol, slavevol) + try: + return dict(cnf.items(session_section)) + except NoSectionError: + return None + +def config_upgrade(config_file, ret): + config_file_backup = os.path.join(os.path.dirname(config_file), "gsyncd.conf.bkp") + + #copy old config file in a backup file + shutil.copyfile(config_file, config_file_backup) + + #write a new config file + config = RawConfigParser() + config.add_section('vars') + + for key, value in ret.items(): + #handle option name changes + if key == "use_tarssh": + new_key = "sync-method" + if value == "true": + new_value = "tarssh" + else: + new_value = "rsync" + config.set('vars', new_key, new_value) + elif key == "timeout": + new_key = "slave-timeout" + config.set('vars', new_key, value) + #for changes like: ignore_deletes to ignore-deletes + else: + new_key = key.replace("_", "-") + config.set('vars', new_key, value) + + with open(config_file, 'w') as configfile: + config.write(configfile) + + +def validate_int(value): + try: + _ = int(value) + return True + except ValueError: + return False + def validate_unixtime(value): try: @@ -338,11 +404,13 @@ def validate_unixtime(value): def validate_minmax(value, minval, maxval): - value = int(value) - minval = int(minval) - maxval = int(maxval) - - return value >= minval and value <= maxval + try: + value = int(value) + minval = int(minval) + maxval = int(maxval) + return value >= minval and value <= maxval + except ValueError: + return False def validate_choice(value, allowed_values): diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py index 72bcb092f01..1a655ff8887 100644 --- a/geo-replication/syncdaemon/gsyncdstatus.py +++ b/geo-replication/syncdaemon/gsyncdstatus.py @@ -23,8 +23,8 @@ from datetime import datetime from errno import EACCES, EAGAIN, ENOENT import logging -from syncdutils import EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event -from syncdutils import EVENT_GEOREP_CHECKPOINT_COMPLETED, lf +from syncdutils import (EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event, + EVENT_GEOREP_CHECKPOINT_COMPLETED, lf) DEFAULT_STATUS = "N/A" MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped") diff --git a/geo-replication/syncdaemon/libcxattr.py b/geo-replication/syncdaemon/libcxattr.py index 7f3f6ce453a..e6406c36bd7 100644 --- a/geo-replication/syncdaemon/libcxattr.py +++ b/geo-replication/syncdaemon/libcxattr.py @@ -9,9 +9,9 @@ # import os -from ctypes import CDLL, create_string_buffer, get_errno -import py2py3 -from py2py3 import bytearray_to_str +from ctypes import CDLL, get_errno +from py2py3 import (bytearray_to_str, gr_create_string_buffer, + gr_query_xattr, gr_lsetxattr, gr_lremovexattr) class Xattr(object): @@ -40,7 +40,7 @@ class Xattr(object): @classmethod def _query_xattr(cls, path, siz, syscall, *a): if siz: - buf = create_string_buffer(b'\0' * siz) + buf = gr_create_string_buffer(siz) else: buf = None ret = getattr(cls.libc, syscall)(*((path,) + a + (buf, siz))) @@ -56,7 +56,7 @@ class Xattr(object): @classmethod def lgetxattr(cls, path, attr, siz=0): - return cls._query_xattr(path.encode(), siz, 'lgetxattr', attr.encode()) + return gr_query_xattr(cls, path, siz, 'lgetxattr', attr) @classmethod def lgetxattr_buf(cls, path, attr): @@ -70,7 +70,7 @@ class Xattr(object): @classmethod def llistxattr(cls, path, siz=0): - ret = cls._query_xattr(path.encode(), siz, 'llistxattr') + ret = gr_query_xattr(cls, path, siz, 'llistxattr') if isinstance(ret, str): ret = ret.strip('\0') ret = ret.split('\0') if ret else [] @@ -78,13 +78,13 @@ class Xattr(object): @classmethod def lsetxattr(cls, path, attr, val): - ret = cls.libc.lsetxattr(path.encode(), attr.encode(), val, len(val), 0) + ret = gr_lsetxattr(cls, path, attr, val) if ret == -1: cls.raise_oserr() @classmethod def lremovexattr(cls, path, attr): - ret = cls.libc.lremovexattr(path.encode(), attr.encode()) + ret = gr_lremovexattr(cls, path, attr) if ret == -1: cls.raise_oserr() diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py index cc40fd5475d..a3bda7282c0 100644 --- a/geo-replication/syncdaemon/libgfchangelog.py +++ b/geo-replication/syncdaemon/libgfchangelog.py @@ -9,130 +9,135 @@ # import os -from ctypes import CDLL, RTLD_GLOBAL, create_string_buffer, \ - get_errno, byref, c_ulong +from ctypes import CDLL, RTLD_GLOBAL, get_errno, byref, c_ulong +from ctypes.util import find_library from syncdutils import ChangelogException, ChangelogHistoryNotAvailable +from py2py3 import (gr_cl_history_changelog, gr_cl_done, + gr_create_string_buffer, gr_cl_register, + gr_cl_history_done, bytearray_to_str) -class Changes(object): - libgfc = CDLL("libgfchangelog.so", mode=RTLD_GLOBAL, - use_errno=True) - - @classmethod - def geterrno(cls): - return get_errno() - - @classmethod - def raise_changelog_err(cls): - errn = cls.geterrno() - raise ChangelogException(errn, os.strerror(errn)) - - @classmethod - def _get_api(cls, call): - return getattr(cls.libgfc, call) - - @classmethod - def cl_init(cls): - ret = cls._get_api('gf_changelog_init')(None) - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_register(cls, brick, path, log_file, log_level, retries=0): - ret = cls._get_api('gf_changelog_register')(brick.encode(), path.encode(), - log_file.encode(), - log_level, retries) - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_scan(cls): - ret = cls._get_api('gf_changelog_scan')() - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_startfresh(cls): - ret = cls._get_api('gf_changelog_start_fresh')() - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_getchanges(cls): - """ remove hardcoding for path name length """ - def clsort(f): - return f.split('.')[-1] - changes = [] - buf = create_string_buffer(b'\0' * 4096) - call = cls._get_api('gf_changelog_next_change') - - while True: - ret = call(buf, 4096) - if ret in (0, -1): - break - changes.append(buf.raw[:ret - 1].decode()) - if ret == -1: - cls.raise_changelog_err() - # cleanup tracker - cls.cl_startfresh() - return sorted(changes, key=clsort) - - @classmethod - def cl_done(cls, clfile): - ret = cls._get_api('gf_changelog_done')(clfile.encode()) - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_history_scan(cls): - ret = cls._get_api('gf_history_changelog_scan')() - if ret == -1: - cls.raise_changelog_err() - - return ret - - @classmethod - def cl_history_changelog(cls, changelog_path, start, end, num_parallel): - actual_end = c_ulong() - ret = cls._get_api('gf_history_changelog')(changelog_path.encode(), start, end, - num_parallel, - byref(actual_end)) - if ret == -1: - cls.raise_changelog_err() - - if ret == -2: - raise ChangelogHistoryNotAvailable() - - return (ret, actual_end.value) - - @classmethod - def cl_history_startfresh(cls): - ret = cls._get_api('gf_history_changelog_start_fresh')() - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_history_getchanges(cls): - """ remove hardcoding for path name length """ - def clsort(f): - return f.split('.')[-1] - - changes = [] - buf = create_string_buffer(b'\0' * 4096) - call = cls._get_api('gf_history_changelog_next_change') - - while True: - ret = call(buf, 4096) - if ret in (0, -1): - break - changes.append(buf.raw[:ret - 1].decode()) - if ret == -1: - cls.raise_changelog_err() - - return sorted(changes, key=clsort) - - @classmethod - def cl_history_done(cls, clfile): - ret = cls._get_api('gf_history_changelog_done')(clfile.encode()) - if ret == -1: - cls.raise_changelog_err() +libgfc = CDLL( + find_library("gfchangelog"), + mode=RTLD_GLOBAL, + use_errno=True +) + + +def _raise_changelog_err(): + errn = get_errno() + raise ChangelogException(errn, os.strerror(errn)) + + +def _init(): + if libgfc.gf_changelog_init(None) == -1: + _raise_changelog_err() + + +def register(brick, path, log_file, log_level, retries=0): + _init() + + ret = gr_cl_register(libgfc, brick, path, log_file, log_level, retries) + + if ret == -1: + _raise_changelog_err() + + +def scan(): + ret = libgfc.gf_changelog_scan() + if ret == -1: + _raise_changelog_err() + + +def startfresh(): + ret = libgfc.gf_changelog_start_fresh() + if ret == -1: + _raise_changelog_err() + + +def getchanges(): + def clsort(cfile): + return cfile.split('.')[-1] + + changes = [] + buf = gr_create_string_buffer(4096) + call = libgfc.gf_changelog_next_change + + while True: + ret = call(buf, 4096) + if ret in (0, -1): + break + + # py2 and py3 compatibility + result = bytearray_to_str(buf.raw[:ret - 1]) + changes.append(result) + + if ret == -1: + _raise_changelog_err() + + # cleanup tracker + startfresh() + + return sorted(changes, key=clsort) + + +def done(clfile): + ret = gr_cl_done(libgfc, clfile) + if ret == -1: + _raise_changelog_err() + + +def history_scan(): + ret = libgfc.gf_history_changelog_scan() + if ret == -1: + _raise_changelog_err() + + return ret + + +def history_changelog(changelog_path, start, end, num_parallel): + actual_end = c_ulong() + ret = gr_cl_history_changelog(libgfc, changelog_path, start, end, + num_parallel, byref(actual_end)) + if ret == -1: + _raise_changelog_err() + + if ret == -2: + raise ChangelogHistoryNotAvailable() + + return (ret, actual_end.value) + + +def history_startfresh(): + ret = libgfc.gf_history_changelog_start_fresh() + if ret == -1: + _raise_changelog_err() + + +def history_getchanges(): + def clsort(cfile): + return cfile.split('.')[-1] + + changes = [] + buf = gr_create_string_buffer(4096) + call = libgfc.gf_history_changelog_next_change + + while True: + ret = call(buf, 4096) + if ret in (0, -1): + break + + # py2 and py3 compatibility + result = bytearray_to_str(buf.raw[:ret - 1]) + changes.append(result) + + if ret == -1: + _raise_changelog_err() + + return sorted(changes, key=clsort) + + +def history_done(clfile): + ret = gr_cl_history_done(libgfc, clfile) + if ret == -1: + _raise_changelog_err() diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 65eaf673099..9501aeae6b5 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -22,11 +22,13 @@ from threading import Condition, Lock from datetime import datetime import gsyncdconfig as gconf +import libgfchangelog from rconf import rconf -from syncdutils import Thread, GsyncdError, escape_space_newline -from syncdutils import unescape_space_newline, gauxpfx, escape -from syncdutils import lstat, errno_wrap, FreeObject, lf, matching_disk_gfid -from syncdutils import NoStimeAvailable, PartialHistoryAvailable +from syncdutils import (Thread, GsyncdError, escape_space_newline, + unescape_space_newline, gauxpfx, escape, + lstat, errno_wrap, FreeObject, lf, matching_disk_gfid, + NoStimeAvailable, PartialHistoryAvailable, + host_brick_split) URXTIME = (-1, 0) @@ -65,6 +67,9 @@ def _volinfo_hook_relax_foreign(self): def edct(op, **ed): dct = {} dct['op'] = op + # This is used in automatic gfid conflict resolution. + # When marked True, it's skipped during re-processing. + dct['skip_entry'] = False for k in ed: if k == 'stat': st = ed[k] @@ -514,7 +519,7 @@ class GMasterCommon(object): # If crawlwrap is called when partial history available, # then it sets register_time which is the time when geo-rep # worker registered to changelog consumption. Since nsec is - # not considered in register time, their are chances of skipping + # not considered in register time, there are chances of skipping # changes detection in xsync crawl. This limit will be reset when # crawlwrap is called again. self.live_changelog_start_time = None @@ -696,7 +701,7 @@ class GMasterChangelogMixin(GMasterCommon): TYPE_ENTRY = "E " MAX_EF_RETRIES = 10 - MAX_OE_RETRIES = 5 + MAX_OE_RETRIES = 10 # flat directory hierarchy for gfid based access FLAT_DIR_HIERARCHY = '.' @@ -792,6 +797,7 @@ class GMasterChangelogMixin(GMasterCommon): pfx = gauxpfx() fix_entry_ops = [] failures1 = [] + remove_gfids = set() for failure in failures: if failure[2]['name_mismatch']: pbname = failure[2]['slave_entry'] @@ -807,7 +813,7 @@ class GMasterChangelogMixin(GMasterCommon): st = lstat(os.path.join(pfx, slave_gfid)) # Takes care of scenarios with no hardlinks if isinstance(st, int) and st == ENOENT: - logging.info(lf('Entry not present on master. Fixing gfid ' + logging.debug(lf('Entry not present on master. Fixing gfid ' 'mismatch in slave. Deleting the entry', retry_count=retry_count, entry=repr(failure))) @@ -822,46 +828,67 @@ class GMasterChangelogMixin(GMasterCommon): edct('UNLINK', gfid=failure[2]['slave_gfid'], entry=pbname)) + remove_gfids.add(slave_gfid) + if op in ['RENAME']: + # If renamed gfid doesn't exists on master, remove + # rename entry and unlink src on slave + st = lstat(os.path.join(pfx, failure[0]['gfid'])) + if isinstance(st, int) and st == ENOENT: + logging.debug("Unlink source %s" % repr(failure)) + remove_gfids.add(failure[0]['gfid']) + fix_entry_ops.append( + edct('UNLINK', + gfid=failure[0]['gfid'], + entry=failure[0]['entry'])) # Takes care of scenarios of hardlinks/renames on master elif not isinstance(st, int): if matching_disk_gfid(slave_gfid, pbname): # Safe to ignore the failure as master contains same # file with same gfid. Remove entry from entries list - logging.info(lf('Fixing gfid mismatch in slave. ' + logging.debug(lf('Fixing gfid mismatch in slave. ' ' Safe to ignore, take out entry', retry_count=retry_count, entry=repr(failure))) - entries.remove(failure[0]) + remove_gfids.add(failure[0]['gfid']) + if op == 'RENAME': + fix_entry_ops.append( + edct('UNLINK', + gfid=failure[0]['gfid'], + entry=failure[0]['entry'])) # The file exists on master but with different name. # Probably renamed and got missed during xsync crawl. elif failure[2]['slave_isdir']: - realpath = os.readlink(os.path.join(gconf.local_path, - ".glusterfs", - slave_gfid[0:2], - slave_gfid[2:4], - slave_gfid)) + realpath = os.readlink(os.path.join( + rconf.args.local_path, + ".glusterfs", + slave_gfid[0:2], + slave_gfid[2:4], + slave_gfid)) dst_entry = os.path.join(pfx, realpath.split('/')[-2], realpath.split('/')[-1]) src_entry = pbname - logging.info(lf('Fixing dir name/gfid mismatch in ' + logging.debug(lf('Fixing dir name/gfid mismatch in ' 'slave', retry_count=retry_count, entry=repr(failure))) if src_entry == dst_entry: # Safe to ignore the failure as master contains # same directory as in slave with same gfid. # Remove the failure entry from entries list - logging.info(lf('Fixing dir name/gfid mismatch' + logging.debug(lf('Fixing dir name/gfid mismatch' ' in slave. Safe to ignore, ' 'take out entry', retry_count=retry_count, entry=repr(failure))) - entries.remove(failure[0]) + try: + entries.remove(failure[0]) + except ValueError: + pass else: rename_dict = edct('RENAME', gfid=slave_gfid, entry=src_entry, entry1=dst_entry, stat=st, link=None) - logging.info(lf('Fixing dir name/gfid mismatch' + logging.debug(lf('Fixing dir name/gfid mismatch' ' in slave. Renaming', retry_count=retry_count, entry=repr(rename_dict))) @@ -871,7 +898,7 @@ class GMasterChangelogMixin(GMasterCommon): # renamed file exists and we are sure from # matching_disk_gfid check that the entry doesn't # exist with same gfid so we can safely delete on slave - logging.info(lf('Fixing file gfid mismatch in slave. ' + logging.debug(lf('Fixing file gfid mismatch in slave. ' 'Hardlink/Rename Case. Deleting entry', retry_count=retry_count, entry=repr(failure))) @@ -880,31 +907,52 @@ class GMasterChangelogMixin(GMasterCommon): gfid=failure[2]['slave_gfid'], entry=pbname)) elif failure[1] == ENOENT: - # Ignore ENOENT error for fix_entry_ops aka retry_count > 1 - if retry_count > 1: - logging.info(lf('ENOENT error while fixing entry ops. ' - 'Safe to ignore, take out entry', + if op in ['RENAME']: + pbname = failure[0]['entry1'] + else: + pbname = failure[0]['entry'] + + pargfid = pbname.split('/')[1] + st = lstat(os.path.join(pfx, pargfid)) + # Safe to ignore the failure as master doesn't contain + # parent directory. + if isinstance(st, int): + logging.debug(lf('Fixing ENOENT error in slave. Parent ' + 'does not exist on master. Safe to ' + 'ignore, take out entry', retry_count=retry_count, entry=repr(failure))) - entries.remove(failure[0]) - elif op in ('MKNOD', 'CREATE', 'MKDIR'): - pargfid = pbname.split('/')[1] - st = lstat(os.path.join(pfx, pargfid)) - # Safe to ignore the failure as master doesn't contain - # parent directory. - if isinstance(st, int): - logging.info(lf('Fixing ENOENT error in slave. Parent ' - 'does not exist on master. Safe to ' - 'ignore, take out entry', - retry_count=retry_count, - entry=repr(failure))) + try: entries.remove(failure[0]) + except ValueError: + pass + else: + logging.debug(lf('Fixing ENOENT error in slave. Create ' + 'parent directory on slave.', + retry_count=retry_count, + entry=repr(failure))) + realpath = os.readlink(os.path.join(rconf.args.local_path, + ".glusterfs", + pargfid[0:2], + pargfid[2:4], + pargfid)) + dir_entry = os.path.join(pfx, realpath.split('/')[-2], + realpath.split('/')[-1]) + fix_entry_ops.append( + edct('MKDIR', gfid=pargfid, entry=dir_entry, + mode=st.st_mode, uid=st.st_uid, gid=st.st_gid)) + + logging.debug("remove_gfids: %s" % repr(remove_gfids)) + if remove_gfids: + for e in entries: + if e['op'] in ['MKDIR', 'MKNOD', 'CREATE', 'RENAME'] \ + and e['gfid'] in remove_gfids: + logging.debug("Removed entry op from retrial list: entry: %s" % repr(e)) + e['skip_entry'] = True if fix_entry_ops: # Process deletions of entries whose gfids are mismatched failures1 = self.slave.server.entry_ops(fix_entry_ops) - if not failures1: - logging.info("Successfully fixed entry ops with gfid mismatch") return (failures1, fix_entry_ops) @@ -1078,6 +1126,11 @@ class GMasterChangelogMixin(GMasterCommon): os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st, link=rl)) + # If src doesn't exist while doing rename, destination + # is created. If data is not followed by rename, this + # remains zero byte file on slave. Hence add data entry + # for renames + datas.add(os.path.join(pfx, gfid)) else: # stat() to get mode and other information if not matching_disk_gfid(gfid, en): @@ -1101,6 +1154,12 @@ class GMasterChangelogMixin(GMasterCommon): rl = None entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) + # If src doesn't exist while doing link, destination + # is created based on file type. If data is not + # followed by link, this remains zero byte file on + # slave. Hence add data entry for links + if rl is None: + datas.add(os.path.join(pfx, gfid)) elif ty == 'SYMLINK': rl = errno_wrap(os.readlink, [en], [ENOENT], [ESTALE, EINTR]) @@ -1148,7 +1207,6 @@ class GMasterChangelogMixin(GMasterCommon): logging.debug('entries: %s' % repr(entries)) # Increment counters for Status - self.status.inc_value("entry", len(entries)) self.files_in_batch += len(datas) self.status.inc_value("data", len(datas)) @@ -1166,10 +1224,13 @@ class GMasterChangelogMixin(GMasterCommon): if gconf.get("gfid-conflict-resolution"): count = 0 + if failures: + logging.info(lf('Entry ops failed with gfid mismatch', + count=len(failures))) while failures and count < self.MAX_OE_RETRIES: count += 1 self.handle_entry_failures(failures, entries) - logging.info("Retry original entries. count = %s" % count) + logging.info(lf('Retry original entries', count=count)) failures = self.slave.server.entry_ops(entries) if not failures: logging.info("Successfully fixed all entry ops with " @@ -1211,10 +1272,10 @@ class GMasterChangelogMixin(GMasterCommon): continue meta_entries.append(edct('META', go=go[0], stat=st)) if meta_entries: - self.status.inc_value("meta", len(entries)) + self.status.inc_value("meta", len(meta_entries)) failures = self.slave.server.meta_ops(meta_entries) self.log_failures(failures, 'go', '', 'META') - self.status.dec_value("meta", len(entries)) + self.status.dec_value("meta", len(meta_entries)) self.batch_stats["META_SYNC_TIME"] += time.time() - meta_start_time @@ -1406,7 +1467,7 @@ class GMasterChangelogMixin(GMasterCommon): node = rconf.args.resource_remote node_data = node.split("@") node = node_data[-1] - remote_node_ip = node.split(":")[0] + remote_node_ip, _ = host_brick_split(node) self.status.set_slave_node(remote_node_ip) def changelogs_batch_process(self, changes): @@ -1439,9 +1500,9 @@ class GMasterChangelogMixin(GMasterCommon): # that are _historical_ to that time. data_stime = self.get_data_stime() - self.changelog_agent.scan() + libgfchangelog.scan() self.crawls += 1 - changes = self.changelog_agent.getchanges() + changes = libgfchangelog.getchanges() if changes: if data_stime: logging.info(lf("slave's time", @@ -1458,10 +1519,9 @@ class GMasterChangelogMixin(GMasterCommon): self.changelogs_batch_process(changes) - def register(self, register_time, changelog_agent, status): - self.changelog_agent = changelog_agent + def register(self, register_time, status): self.sleep_interval = gconf.get("change-interval") - self.changelog_done_func = self.changelog_agent.done + self.changelog_done_func = libgfchangelog.done self.tempdir = self.setup_working_dir() self.processed_changelogs_dir = os.path.join(self.tempdir, ".processed") @@ -1470,11 +1530,10 @@ class GMasterChangelogMixin(GMasterCommon): class GMasterChangeloghistoryMixin(GMasterChangelogMixin): - def register(self, register_time, changelog_agent, status): - self.changelog_agent = changelog_agent + def register(self, register_time, status): self.changelog_register_time = register_time self.history_crawl_start_time = register_time - self.changelog_done_func = self.changelog_agent.history_done + self.changelog_done_func = libgfchangelog.history_done self.history_turns = 0 self.tempdir = self.setup_working_dir() self.processed_changelogs_dir = os.path.join(self.tempdir, @@ -1488,6 +1547,12 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): data_stime = self.get_data_stime() end_time = int(time.time()) + + #as start of historical crawl marks Geo-rep worker restart + if gconf.get("ignore-deletes"): + logging.info(lf('ignore-deletes config option is set', + stime=data_stime)) + logging.info(lf('starting history crawl', turns=self.history_turns, stime=data_stime, @@ -1502,7 +1567,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): # location then consuming history will not work(Known issue as of now) changelog_path = os.path.join(rconf.args.local_path, ".glusterfs/changelogs") - ret, actual_end = self.changelog_agent.history( + ret, actual_end = libgfchangelog.history_changelog( changelog_path, data_stime[0], end_time, @@ -1514,10 +1579,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): # to be processed. returns positive value as number of changelogs # to be processed, which will be fetched using # history_getchanges() - while self.changelog_agent.history_scan() > 0: + while libgfchangelog.history_scan() > 0: self.crawls += 1 - changes = self.changelog_agent.history_getchanges() + changes = libgfchangelog.history_getchanges() if changes: if data_stime: logging.info(lf("slave's time", @@ -1570,7 +1635,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): XSYNC_MAX_ENTRIES = 1 << 13 - def register(self, register_time=None, changelog_agent=None, status=None): + def register(self, register_time=None, status=None): self.status = status self.counter = 0 self.comlist = [] diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index c45ef24e59f..6aa7b9dfc99 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -20,13 +20,14 @@ import random from resource import SSH import gsyncdconfig as gconf +import libgfchangelog from rconf import rconf -from syncdutils import select, waitpid, errno_wrap, lf, grabpidfile -from syncdutils import set_term_handler, GsyncdError -from syncdutils import Thread, finalize, Volinfo, VolinfoFromGconf -from syncdutils import gf_event, EVENT_GEOREP_FAULTY, get_up_nodes +from syncdutils import (select, waitpid, errno_wrap, lf, grabpidfile, + set_term_handler, GsyncdError, + Thread, finalize, Volinfo, VolinfoFromGconf, + gf_event, EVENT_GEOREP_FAULTY, get_up_nodes, + unshare_propagation_supported) from gsyncdstatus import GeorepStatus, set_monitor_status -from syncdutils import unshare_propagation_supported import py2py3 from py2py3 import pipe @@ -37,6 +38,8 @@ def get_subvol_num(brick_idx, vol, hot): tier = vol.is_tier() disperse_count = vol.disperse_count(tier, hot) replica_count = vol.replica_count(tier, hot) + distribute_count = vol.distribution_count(tier, hot) + gconf.setconfig("master-distribution-count", distribute_count) if (tier and not hot): brick_idx = brick_idx - vol.get_hot_bricks_count(tier) @@ -79,7 +82,7 @@ class Monitor(object): # give a chance to graceful exit errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH]) - def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master, + def monitor(self, w, argv, cpids, slave_vol, slave_host, master, suuid, slavenodes): """the monitor loop @@ -148,7 +151,7 @@ class Monitor(object): remote_host = "%s@%s" % (remote_user, remote_new[0]) remote_id = remote_new[1] - # Spawn the worker and agent in lock to avoid fd leak + # Spawn the worker in lock to avoid fd leak self.lock.acquire() self.status[w[0]['dir']].set_worker_status(self.ST_INIT) @@ -156,44 +159,10 @@ class Monitor(object): brick=w[0]['dir'], slave_node=remote_host)) - # Couple of pipe pairs for RPC communication b/w - # worker and changelog agent. - - # read/write end for agent - (ra, ww) = pipe() - # read/write end for worker - (rw, wa) = pipe() - - # spawn the agent process - apid = os.fork() - if apid == 0: - os.close(rw) - os.close(ww) - args_to_agent = argv + [ - 'agent', - rconf.args.master, - rconf.args.slave, - '--local-path', w[0]['dir'], - '--local-node', w[0]['host'], - '--local-node-id', w[0]['uuid'], - '--slave-id', suuid, - '--rpc-fd', ','.join([str(ra), str(wa), str(rw), str(ww)]) - ] - - if rconf.args.config_file is not None: - args_to_agent += ['-c', rconf.args.config_file] - - if rconf.args.debug: - args_to_agent.append("--debug") - - os.execv(sys.executable, args_to_agent) - pr, pw = pipe() cpid = os.fork() if cpid == 0: os.close(pr) - os.close(ra) - os.close(wa) args_to_worker = argv + [ 'worker', @@ -204,8 +173,6 @@ class Monitor(object): '--local-node', w[0]['host'], '--local-node-id', w[0]['uuid'], '--slave-id', suuid, - '--rpc-fd', - ','.join([str(rw), str(ww), str(ra), str(wa)]), '--subvol-num', str(w[2]), '--resource-remote', remote_host, '--resource-remote-id', remote_id @@ -236,14 +203,8 @@ class Monitor(object): os.execv(sys.executable, args_to_worker) cpids.add(cpid) - agents.add(apid) os.close(pw) - # close all RPC pipes in monitor - os.close(ra) - os.close(wa) - os.close(rw) - os.close(ww) self.lock.release() t0 = time.time() @@ -252,42 +213,19 @@ class Monitor(object): if so: ret = nwait(cpid, os.WNOHANG) - ret_agent = nwait(apid, os.WNOHANG) - - if ret_agent is not None: - # Agent is died Kill Worker - logging.info(lf("Changelog Agent died, Aborting Worker", - brick=w[0]['dir'])) - errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) - nwait(cpid) - nwait(apid) if ret is not None: logging.info(lf("worker died before establishing " "connection", brick=w[0]['dir'])) - nwait(apid) # wait for agent else: logging.debug("worker(%s) connected" % w[0]['dir']) while time.time() < t0 + conn_timeout: ret = nwait(cpid, os.WNOHANG) - ret_agent = nwait(apid, os.WNOHANG) if ret is not None: logging.info(lf("worker died in startup phase", brick=w[0]['dir'])) - nwait(apid) # wait for agent - break - - if ret_agent is not None: - # Agent is died Kill Worker - logging.info(lf("Changelog Agent died, Aborting " - "Worker", - brick=w[0]['dir'])) - errno_wrap(os.kill, [cpid, signal.SIGKILL], - [ESRCH]) - nwait(cpid) - nwait(apid) break time.sleep(1) @@ -302,12 +240,8 @@ class Monitor(object): brick=w[0]['dir'], timeout=conn_timeout)) errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) - nwait(apid) # wait for agent ret = nwait(cpid) if ret is None: - # If worker dies, agent terminates on EOF. - # So lets wait for agent first. - nwait(apid) ret = nwait(cpid) if exit_signalled(ret): ret = 0 @@ -331,18 +265,15 @@ class Monitor(object): argv = [os.path.basename(sys.executable), sys.argv[0]] cpids = set() - agents = set() ta = [] for wx in wspx: def wmon(w): - cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol, + cpid, _ = self.monitor(w, argv, cpids, slave_vol, slave_host, master, suuid, slavenodes) time.sleep(1) self.lock.acquire() for cpid in cpids: errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) - for apid in agents: - errno_wrap(os.kill, [apid, signal.SIGKILL], [ESRCH]) self.lock.release() finalize(exval=1) t = Thread(target=wmon, args=[wx]) @@ -352,8 +283,8 @@ class Monitor(object): # monitor status was being updated in each monitor thread. It # should not be done as it can cause deadlock for a worker start. # set_monitor_status uses flock to synchronize multple instances - # updating the file. Since each monitor thread forks worker and - # agent, these processes can hold the reference to fd of status + # updating the file. Since each monitor thread forks worker, + # these processes can hold the reference to fd of status # file causing deadlock to workers which starts later as flock # will not be release until all references to same fd is closed. # It will also cause fd leaks. @@ -369,7 +300,7 @@ def distribute(master, slave): if rconf.args.use_gconf_volinfo: mvol = VolinfoFromGconf(master.volume, master=True) else: - mvol = Volinfo(master.volume, master.host) + mvol = Volinfo(master.volume, master.host, master=True) logging.debug('master bricks: ' + repr(mvol.bricks)) prelude = [] slave_host = None @@ -385,7 +316,7 @@ def distribute(master, slave): if rconf.args.use_gconf_volinfo: svol = VolinfoFromGconf(slave.volume, master=False) else: - svol = Volinfo(slave.volume, "localhost", prelude) + svol = Volinfo(slave.volume, "localhost", prelude, master=False) sbricks = svol.bricks suuid = svol.uuid diff --git a/geo-replication/syncdaemon/py2py3.py b/geo-replication/syncdaemon/py2py3.py index 4c0e1152aa8..f9c76e1b50a 100644 --- a/geo-replication/syncdaemon/py2py3.py +++ b/geo-replication/syncdaemon/py2py3.py @@ -12,7 +12,10 @@ import sys import os - +import stat +import struct +from syncdutils import umask +from ctypes import create_string_buffer if sys.version_info >= (3,): def pipe(): @@ -35,6 +38,77 @@ if sys.version_info >= (3,): def str_to_bytearray(string): return bytes([ord(c) for c in string]) + def gr_create_string_buffer(size): + return create_string_buffer(b'\0', size) + + def gr_query_xattr(cls, path, size, syscall, attr=None): + if attr: + return cls._query_xattr(path.encode(), size, syscall, + attr.encode()) + else: + return cls._query_xattr(path.encode(), size, syscall) + + def gr_lsetxattr(cls, path, attr, val): + return cls.libc.lsetxattr(path.encode(), attr.encode(), val, + len(val), 0) + + def gr_lremovexattr(cls, path, attr): + return cls.libc.lremovexattr(path.encode(), attr.encode()) + + def gr_cl_register(libgfapi, brick, path, log_file, log_level, retries): + return libgfapi.gf_changelog_register(brick.encode(), + path.encode(), + log_file.encode(), + log_level, retries) + + def gr_cl_done(libgfapi, clfile): + return libgfapi.gf_changelog_done(clfile.encode()) + + def gr_cl_history_changelog(libgfapi, changelog_path, start, end, num_parallel, + actual_end): + return libgfapi.gf_history_changelog(changelog_path.encode(), + start, end, num_parallel, + actual_end) + + def gr_cl_history_done(libgfapi, clfile): + return libgfapi.gf_history_changelog_done(clfile.encode()) + + # regular file + + def entry_pack_reg(cls, gf, bn, mo, uid, gid): + bn_encoded = bn.encode() + blen = len(bn_encoded) + return struct.pack(cls._fmt_mknod(blen), + uid, gid, gf.encode(), mo, bn_encoded, + stat.S_IMODE(mo), 0, umask()) + + def entry_pack_reg_stat(cls, gf, bn, st): + bn_encoded = bn.encode() + blen = len(bn_encoded) + mo = st['mode'] + return struct.pack(cls._fmt_mknod(blen), + st['uid'], st['gid'], + gf.encode(), mo, bn_encoded, + stat.S_IMODE(mo), 0, umask()) + # mkdir + + def entry_pack_mkdir(cls, gf, bn, mo, uid, gid): + bn_encoded = bn.encode() + blen = len(bn_encoded) + return struct.pack(cls._fmt_mkdir(blen), + uid, gid, gf.encode(), mo, bn_encoded, + stat.S_IMODE(mo), umask()) + # symlink + + def entry_pack_symlink(cls, gf, bn, lnk, st): + bn_encoded = bn.encode() + blen = len(bn_encoded) + lnk_encoded = lnk.encode() + llen = len(lnk_encoded) + return struct.pack(cls._fmt_symlink(blen, llen), + st['uid'], st['gid'], + gf.encode(), st['mode'], bn_encoded, + lnk_encoded) else: def pipe(): (r, w) = os.pipe() @@ -42,8 +116,69 @@ else: # Raw conversion of bytearray to string def bytearray_to_str(byte_arr): - return ''.join([b for b in byte_arr]) + return byte_arr # Raw conversion of string to bytearray def str_to_bytearray(string): - return b"".join([c for c in string]) + return string + + def gr_create_string_buffer(size): + return create_string_buffer('\0', size) + + def gr_query_xattr(cls, path, size, syscall, attr=None): + if attr: + return cls._query_xattr(path, size, syscall, attr) + else: + return cls._query_xattr(path, size, syscall) + + def gr_lsetxattr(cls, path, attr, val): + return cls.libc.lsetxattr(path, attr, val, len(val), 0) + + def gr_lremovexattr(cls, path, attr): + return cls.libc.lremovexattr(path, attr) + + def gr_cl_register(libgfapi, brick, path, log_file, log_level, retries): + return libgfapi.gf_changelog_register(brick, path, log_file, + log_level, retries) + + def gr_cl_done(libgfapi, clfile): + return libgfapi.gf_changelog_done(clfile) + + def gr_cl_history_changelog(libgfapi, changelog_path, start, end, num_parallel, + actual_end): + return libgfapi.gf_history_changelog(changelog_path, start, end, + num_parallel, actual_end) + + def gr_cl_history_done(libgfapi, clfile): + return libgfapi.gf_history_changelog_done(clfile) + + # regular file + + def entry_pack_reg(cls, gf, bn, mo, uid, gid): + blen = len(bn) + return struct.pack(cls._fmt_mknod(blen), + uid, gid, gf, mo, bn, + stat.S_IMODE(mo), 0, umask()) + + def entry_pack_reg_stat(cls, gf, bn, st): + blen = len(bn) + mo = st['mode'] + return struct.pack(cls._fmt_mknod(blen), + st['uid'], st['gid'], + gf, mo, bn, + stat.S_IMODE(mo), 0, umask()) + # mkdir + + def entry_pack_mkdir(cls, gf, bn, mo, uid, gid): + blen = len(bn) + return struct.pack(cls._fmt_mkdir(blen), + uid, gid, gf, mo, bn, + stat.S_IMODE(mo), umask()) + # symlink + + def entry_pack_symlink(cls, gf, bn, lnk, st): + blen = len(bn) + llen = len(lnk) + return struct.pack(cls._fmt_symlink(blen, llen), + st['uid'], st['gid'], + gf, st['mode'], bn, lnk) diff --git a/geo-replication/syncdaemon/repce.py b/geo-replication/syncdaemon/repce.py index 6065b828c99..c622afa6373 100644 --- a/geo-replication/syncdaemon/repce.py +++ b/geo-replication/syncdaemon/repce.py @@ -8,7 +8,6 @@ # cases as published by the Free Software Foundation. # -import _io import os import sys import time @@ -58,9 +57,9 @@ def recv(inf): """load an object from input stream python2 and python3 compatibility, inf is sys.stdin and is opened as text stream by default. Hence using the - buffer attribute + buffer attribute in python3 """ - if isinstance(inf, _io.TextIOWrapper): + if hasattr(inf, "buffer"): return pickle.load(inf.buffer) else: return pickle.load(inf) diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 80e1fbcb133..f12c7ceaa36 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -19,30 +19,31 @@ import struct import logging import tempfile import subprocess -from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES -from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM +from errno import (EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES, + EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM) import errno from rconf import rconf import gsyncdconfig as gconf +import libgfchangelog import repce from repce import RepceServer, RepceClient from master import gmaster_builder import syncdutils -from syncdutils import GsyncdError, select, privileged, funcode -from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat -from syncdutils import NoStimeAvailable, PartialHistoryAvailable -from syncdutils import ChangelogException, ChangelogHistoryNotAvailable -from syncdutils import get_changelog_log_level, get_rsync_version -from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION -from syncdutils import GX_GFID_CANONICAL_LEN +from syncdutils import (GsyncdError, select, privileged, funcode, + entry2pb, gauxpfx, errno_wrap, lstat, + NoStimeAvailable, PartialHistoryAvailable, + ChangelogException, ChangelogHistoryNotAvailable, + get_changelog_log_level, get_rsync_version, + GX_GFID_CANONICAL_LEN, + gf_mount_ready, lf, Popen, sup, + Xattr, matching_disk_gfid, get_gfid_from_mnt, + unshare_propagation_supported, get_slv_dir_path) from gsyncdstatus import GeorepStatus -from syncdutils import lf, Popen, sup -from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt -from syncdutils import unshare_propagation_supported, get_slv_dir_path -import py2py3 -from py2py3 import pipe, str_to_bytearray +from py2py3 import (pipe, str_to_bytearray, entry_pack_reg, + entry_pack_reg_stat, entry_pack_mkdir, + entry_pack_symlink) ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP') @@ -377,37 +378,7 @@ class Server(object): def entry_ops(cls, entries): pfx = gauxpfx() logging.debug('entries: %s' % repr(entries)) - # regular file - - def entry_pack_reg(gf, bn, mo, uid, gid): - blen = len(bn) - return struct.pack(cls._fmt_mknod(blen), - uid, gid, gf.encode(), mo, bn.encode(), - stat.S_IMODE(mo), 0, umask()) - - def entry_pack_reg_stat(gf, bn, st): - blen = len(bn) - mo = st['mode'] - return struct.pack(cls._fmt_mknod(blen), - st['uid'], st['gid'], - gf.encode(), mo, bn.encode(), - stat.S_IMODE(mo), 0, umask()) - # mkdir - - def entry_pack_mkdir(gf, bn, mo, uid, gid): - blen = len(bn) - return struct.pack(cls._fmt_mkdir(blen), - uid, gid, gf.encode(), mo, bn.encode(), - stat.S_IMODE(mo), umask()) - # symlink - - def entry_pack_symlink(gf, bn, lnk, st): - blen = len(bn) - llen = len(lnk) - return struct.pack(cls._fmt_symlink(blen, llen), - st['uid'], st['gid'], - gf.encode(), st['mode'], bn.encode(), - lnk.encode()) + dist_count = rconf.args.master_dist_count def entry_purge(op, entry, gfid, e, uid, gid): # This is an extremely racy code and needs to be fixed ASAP. @@ -457,7 +428,7 @@ class Server(object): e['stat']['uid'] = uid e['stat']['gid'] = gid - if cmd_ret == EEXIST: + if cmd_ret in [EEXIST, ESTALE]: if dst: en = e['entry1'] else: @@ -541,6 +512,12 @@ class Server(object): entry = e['entry'] uid = 0 gid = 0 + + # Skip entry processing if it's marked true during gfid + # conflict resolution + if e['skip_entry']: + continue + if e.get("stat", {}): # Copy UID/GID value and then reset to zero. Copied UID/GID # will be used to run chown once entry is created. @@ -581,8 +558,8 @@ class Server(object): st = lstat(slink) # don't create multiple entries with same gfid if isinstance(st, int): - blob = entry_pack_reg( - gfid, bname, e['mode'], e['uid'], e['gid']) + blob = entry_pack_reg(cls, gfid, bname, + e['mode'], e['uid'], e['gid']) # Self healed hardlinks are recorded as MKNOD. # So if the gfid already exists, it should be # processed as hard link not mknod. @@ -597,8 +574,8 @@ class Server(object): st = lstat(slink) # don't create multiple entries with same gfid if isinstance(st, int): - blob = entry_pack_mkdir( - gfid, bname, e['mode'], e['uid'], e['gid']) + blob = entry_pack_mkdir(cls, gfid, bname, + e['mode'], e['uid'], e['gid']) elif (isinstance(lstat(en), int) or not matching_disk_gfid(gfid, en)): # If gfid of a directory exists on slave but path based @@ -610,6 +587,8 @@ class Server(object): logging.info(lf("Special case: rename on mkdir", gfid=gfid, entry=repr(entry))) src_entry = get_slv_dir_path(slv_host, slv_volume, gfid) + if src_entry is None: + collect_failure(e, ENOENT, uid, gid) if src_entry is not None and src_entry != entry: slv_entry_info = {} slv_entry_info['gfid_mismatch'] = False @@ -626,9 +605,9 @@ class Server(object): if isinstance(st, int): (pg, bname) = entry2pb(entry) if stat.S_ISREG(e['stat']['mode']): - blob = entry_pack_reg_stat(gfid, bname, e['stat']) + blob = entry_pack_reg_stat(cls, gfid, bname, e['stat']) elif stat.S_ISLNK(e['stat']['mode']): - blob = entry_pack_symlink(gfid, bname, e['link'], + blob = entry_pack_symlink(cls, gfid, bname, e['link'], e['stat']) else: cmd_ret = errno_wrap(os.link, @@ -639,7 +618,7 @@ class Server(object): en = e['entry'] st = lstat(entry) if isinstance(st, int): - blob = entry_pack_symlink(gfid, bname, e['link'], + blob = entry_pack_symlink(cls, gfid, bname, e['link'], e['stat']) elif not matching_disk_gfid(gfid, en): collect_failure(e, EEXIST, uid, gid) @@ -654,22 +633,27 @@ class Server(object): # exist with different gfid. if not matching_disk_gfid(gfid, entry): if e['stat'] and not stat.S_ISDIR(e['stat']['mode']): - if stat.S_ISLNK(e['stat']['mode']) and \ - e['link'] is not None: - st1 = lstat(en) - if isinstance(st1, int): - (pg, bname) = entry2pb(en) - blob = entry_pack_symlink(gfid, bname, - e['link'], e['stat']) - elif not matching_disk_gfid(gfid, en): - collect_failure(e, EEXIST, uid, gid, True) + if stat.S_ISLNK(e['stat']['mode']): + # src is not present, so don't sync symlink as + # we don't know target. It's ok to ignore. If + # it's unliked, it's fine. If it's renamed to + # something else, it will be synced then. + if e['link'] is not None: + st1 = lstat(en) + if isinstance(st1, int): + (pg, bname) = entry2pb(en) + blob = entry_pack_symlink(cls, gfid, bname, + e['link'], + e['stat']) + elif not matching_disk_gfid(gfid, en): + collect_failure(e, EEXIST, uid, gid, True) else: slink = os.path.join(pfx, gfid) st = lstat(slink) # don't create multiple entries with same gfid if isinstance(st, int): (pg, bname) = entry2pb(en) - blob = entry_pack_reg_stat(gfid, bname, + blob = entry_pack_reg_stat(cls, gfid, bname, e['stat']) else: cmd_ret = errno_wrap(os.link, [slink, en], @@ -704,15 +688,21 @@ class Server(object): raise else: raise - elif not matching_disk_gfid(gfid, en): + elif not matching_disk_gfid(gfid, en) and dist_count > 1: collect_failure(e, EEXIST, uid, gid, True) else: + # We are here which means matching_disk_gfid for + # both source and destination has returned false + # and distribution count for master vol is greater + # then one. Which basically says both the source and + # destination exist and not hardlinks. + # So we are safe to go ahead with rename here. rename_with_disk_gfid_confirmation(gfid, entry, en, uid, gid) if blob: cmd_ret = errno_wrap(Xattr.lsetxattr, [pg, 'glusterfs.gfid.newfile', blob], - [EEXIST, ENOENT], + [EEXIST, ENOENT, ESTALE], [ESTALE, EINVAL, EBUSY]) collect_failure(e, cmd_ret, uid, gid) @@ -750,10 +740,8 @@ class Server(object): # 'lchown' 'lchmod' 'utime with no-deference' blindly. # But since 'lchmod' and 'utime with no de-reference' is # not supported in python3, we have to rely on 'chmod' - # and 'utime with de-reference'. But 'chmod' - # de-reference the symlink and gets ENOENT, EACCES, - # EPERM errors, hence ignoring those errors if it's on - # symlink file. + # and 'utime with de-reference'. Hence avoiding 'chmod' + # and 'utime' if it's symlink file. is_symlink = False cmd_ret = errno_wrap(os.lchown, [go, uid, gid], [ENOENT], @@ -761,19 +749,17 @@ class Server(object): if isinstance(cmd_ret, int): continue - cmd_ret = errno_wrap(os.chmod, [go, mode], - [ENOENT, EACCES, EPERM], [ESTALE, EINVAL]) - if isinstance(cmd_ret, int): - is_symlink = os.path.islink(go) - if not is_symlink: + is_symlink = os.path.islink(go) + + if not is_symlink: + cmd_ret = errno_wrap(os.chmod, [go, mode], + [ENOENT, EACCES, EPERM], [ESTALE, EINVAL]) + if isinstance(cmd_ret, int): failures.append((e, cmd_ret, "chmod")) - cmd_ret = errno_wrap(os.utime, [go, (atime, mtime)], - [ENOENT, EACCES, EPERM], [ESTALE, EINVAL]) - if isinstance(cmd_ret, int): - if not is_symlink: - is_symlink = os.path.islink(go) - if not is_symlink: + cmd_ret = errno_wrap(os.utime, [go, (atime, mtime)], + [ENOENT, EACCES, EPERM], [ESTALE, EINVAL]) + if isinstance(cmd_ret, int): failures.append((e, cmd_ret, "utime")) return failures @@ -965,6 +951,16 @@ class Mounter(object): logging.exception('mount cleanup failure:') rv = 200 os._exit(rv) + + #Polling the dht.subvol.status value. + RETRIES = 10 + while not gf_mount_ready(): + if RETRIES < 0: + logging.error('Subvols are not up') + break + RETRIES -= 1 + time.sleep(0.2) + logging.debug('auxiliary glusterfs mount prepared') @@ -1260,9 +1256,6 @@ class GLUSTER(object): # register the crawlers and start crawling # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default) # g3 ==> changelog History - (inf, ouf, ra, wa) = rconf.args.rpc_fd.split(',') - changelog_agent = RepceClient(int(inf), int(ouf)) - status = GeorepStatus(gconf.get("state-file"), rconf.args.local_node, rconf.args.local_path, @@ -1270,12 +1263,6 @@ class GLUSTER(object): rconf.args.master, rconf.args.slave) status.reset_on_worker_start() - rv = changelog_agent.version() - if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION: - raise GsyncdError( - "RePCe major version mismatch(changelog agent): " - "local %s, remote %s" % - (CHANGELOG_AGENT_CLIENT_VERSION, rv)) try: workdir = g2.setup_working_dir() @@ -1286,17 +1273,16 @@ class GLUSTER(object): # register with the changelog library # 9 == log level (DEBUG) # 5 == connection retries - changelog_agent.init() - changelog_agent.register(rconf.args.local_path, - workdir, - gconf.get("changelog-log-file"), - get_changelog_log_level( - gconf.get("changelog-log-level")), - g2.CHANGELOG_CONN_RETRIES) + libgfchangelog.register(rconf.args.local_path, + workdir, + gconf.get("changelog-log-file"), + get_changelog_log_level( + gconf.get("changelog-log-level")), + g2.CHANGELOG_CONN_RETRIES) register_time = int(time.time()) - g2.register(register_time, changelog_agent, status) - g3.register(register_time, changelog_agent, status) + g2.register(register_time, status) + g3.register(register_time, status) except ChangelogException as e: logging.error(lf("Changelog register failed", error=e)) sys.exit(1) @@ -1431,7 +1417,9 @@ class SSH(object): '--slave-gluster-log-level', gconf.get("slave-gluster-log-level"), '--slave-gluster-command-dir', - gconf.get("slave-gluster-command-dir")] + gconf.get("slave-gluster-command-dir"), + '--master-dist-count', + str(gconf.get("master-distribution-count"))] if gconf.get("slave-access-mount"): args_to_slave.append('--slave-access-mount') @@ -1496,7 +1484,7 @@ class SSH(object): if log_rsync_performance: # use stdout=PIPE only when log_rsync_performance enabled - # Else rsync will write to stdout and nobody is their + # Else rsync will write to stdout and nobody is there # to consume. If PIPE is full rsync hangs. po = Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) @@ -1534,7 +1522,7 @@ class SSH(object): return po - def tarssh(self, files, slaveurl, log_err=False): + def tarssh(self, files, log_err=False): """invoke tar+ssh -z (compress) can be use if needed, but omitting it now as it results in weird error (tar+ssh errors out (errcode: 2) @@ -1542,10 +1530,11 @@ class SSH(object): if not files: raise GsyncdError("no files to sync") logging.debug("files: " + ", ".join(files)) - (host, rdir) = slaveurl.split(':') + (host, rdir) = self.slaveurl.split(':') + tar_cmd = ["tar"] + \ ["--sparse", "-cf", "-", "--files-from", "-"] - ssh_cmd = gconf.get("ssh-command-tar").split() + \ + ssh_cmd = gconf.get("ssh-command").split() + \ gconf.get("ssh-options-tar").split() + \ ["-p", str(gconf.get("ssh-port"))] + \ [host, "tar"] + \ @@ -1561,15 +1550,29 @@ class SSH(object): p0.stdin.close() p0.stdout.close() # Allow p0 to receive a SIGPIPE if p1 exits. - # wait for tar to terminate, collecting any errors, further - # waiting for transfer to complete - _, stderr1 = p1.communicate() # stdin and stdout of p0 is already closed, Reset to None and # wait for child process to complete p0.stdin = None p0.stdout = None - p0.communicate() + + def wait_for_tar(p0): + _, stderr = p0.communicate() + if log_err: + for errline in stderr.strip().split("\n")[:-1]: + if "No such file or directory" not in errline: + logging.error(lf("SYNC Error", + sync_engine="Tarssh", + error=errline)) + + t = syncdutils.Thread(target=wait_for_tar, args=(p0, )) + # wait for tar to terminate, collecting any errors, further + # waiting for transfer to complete + t.start() + + # wait for ssh process + _, stderr1 = p1.communicate() + t.join() if log_err: for errline in stderr1.strip().split("\n")[:-1]: diff --git a/geo-replication/syncdaemon/subcmds.py b/geo-replication/syncdaemon/subcmds.py index 30050ec4743..b8508532e30 100644 --- a/geo-replication/syncdaemon/subcmds.py +++ b/geo-replication/syncdaemon/subcmds.py @@ -73,7 +73,11 @@ def subcmd_worker(args): Popen.init_errhandler() fcntl.fcntl(args.feedback_fd, fcntl.F_SETFD, fcntl.FD_CLOEXEC) local = GLUSTER("localhost", args.master) - slavehost, slavevol = args.slave.split("::") + slave_url, slavevol = args.slave.split("::") + if "@" not in slave_url: + slavehost = args.resource_remote + else: + slavehost = "%s@%s" % (slave_url.split("@")[0], args.resource_remote) remote = SSH(slavehost, slavevol) remote.connect_remote() local.connect() @@ -93,25 +97,19 @@ def subcmd_slave(args): local.service_loop() -def subcmd_agent(args): - import os - from changelogagent import agent, Changelog - from syncdutils import lf - - os.setsid() - logging.debug(lf("RPC FD", - rpc_fd=repr(args.rpc_fd))) - return agent(Changelog(), args.rpc_fd) - - def subcmd_voluuidget(args): from subprocess import Popen, PIPE import xml.etree.ElementTree as XET ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError - po = Popen(['gluster', '--xml', '--remote-host=' + args.host, - 'volume', 'info', args.volname], bufsize=0, + cmd = ['gluster', '--xml', '--remote-host=' + args.host, + 'volume', 'info', args.volname] + + if args.inet6: + cmd.append("--inet6") + + po = Popen(cmd, bufsize=0, stdin=None, stdout=PIPE, stderr=PIPE, universal_newlines=True) diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index fad1a3e4f76..a3df103e76c 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -21,8 +21,8 @@ import subprocess import socket from subprocess import PIPE from threading import Lock, Thread as baseThread -from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED -from errno import EINTR, ENOENT, ESTALE, EBUSY, errorcode +from errno import (EACCES, EAGAIN, EPIPE, ENOTCONN, ENOMEM, ECONNABORTED, + EINTR, ENOENT, ESTALE, EBUSY, ENODATA, errorcode, EIO) from signal import signal, SIGTERM import select as oselect from os import waitpid as owaitpid @@ -37,10 +37,10 @@ from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE sys.path.insert(1, GLUSTERFS_LIBEXECDIR) EVENTS_ENABLED = True try: - from events.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY - from events.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE - from events.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE - from events.eventtypes import GEOREP_CHECKPOINT_COMPLETED \ + from gfevents.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY + from gfevents.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE + from gfevents.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE + from gfevents.eventtypes import GEOREP_CHECKPOINT_COMPLETED \ as EVENT_GEOREP_CHECKPOINT_COMPLETED except ImportError: # Events APIs not installed, dummy eventtypes with None @@ -55,14 +55,15 @@ from rconf import rconf from hashlib import sha256 as sha256 +ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP') + # auxiliary gfid based access prefix _CL_AUX_GFID_PFX = ".gfid/" +ROOT_GFID = "00000000-0000-0000-0000-000000000001" GF_OP_RETRIES = 10 GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' -CHANGELOG_AGENT_SERVER_VERSION = 1.0 -CHANGELOG_AGENT_CLIENT_VERSION = 1.0 NodeID = None rsync_version = None unshare_mnt_propagation = None @@ -99,6 +100,19 @@ def unescape_space_newline(s): .replace(NEWLINE_ESCAPE_CHAR, "\n")\ .replace(PERCENTAGE_ESCAPE_CHAR, "%") +# gf_mount_ready() returns 1 if all subvols are up, else 0 +def gf_mount_ready(): + ret = errno_wrap(Xattr.lgetxattr, + ['.', 'dht.subvol.status', 16], + [ENOENT, ENOTSUP, ENODATA], [ENOMEM]) + + if isinstance(ret, int): + logging.error("failed to get the xattr value") + return 1 + ret = ret.rstrip('\x00') + if ret == "1": + return 1 + return 0 def norm(s): if s: @@ -330,13 +344,24 @@ def log_raise_exception(excont): ECONNABORTED): logging.error(lf('Gluster Mount process exited', error=errorcode[exc.errno])) + elif isinstance(exc, OSError) and exc.errno == EIO: + logging.error("Getting \"Input/Output error\" " + "is most likely due to " + "a. Brick is down or " + "b. Split brain issue.") + logging.error("This is expected as per design to " + "keep the consistency of the file system. " + "Once the above issue is resolved " + "geo-replication would automatically " + "proceed further.") + logtag = "FAIL" else: logtag = "FAIL" if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG): logtag = "FULL EXCEPTION TRACE" if logtag: logging.exception(logtag + ": ") - sys.stderr.write("failed with %s.\n" % type(exc).__name__) + sys.stderr.write("failed with %s: %s.\n" % (type(exc).__name__, exc)) excont.exval = 1 sys.exit(excont.exval) @@ -563,7 +588,6 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]): def lstat(e): return errno_wrap(os.lstat, [e], [ENOENT], [ESTALE, EBUSY]) - def get_gfid_from_mnt(gfidpath): return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', @@ -599,7 +623,7 @@ class ChangelogException(OSError): def gf_event(event_type, **kwargs): if EVENTS_ENABLED: - from events.gf_event import gf_event as gfevent + from gfevents.gf_event import gf_event as gfevent gfevent(event_type, **kwargs) @@ -670,9 +694,10 @@ def get_slv_dir_path(slv_host, slv_volume, gfid): global slv_bricks dir_path = ENOENT + pfx = gauxpfx() if not slv_bricks: - slv_info = Volinfo(slv_volume, slv_host) + slv_info = Volinfo(slv_volume, slv_host, master=False) slv_bricks = slv_info.bricks # Result of readlink would be of format as below. # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename" @@ -683,19 +708,32 @@ def get_slv_dir_path(slv_host, slv_volume, gfid): gfid[2:4], gfid], [ENOENT], [ESTALE]) if dir_path != ENOENT: - break - - if not isinstance(dir_path, int): - realpath = errno_wrap(os.readlink, [dir_path], - [ENOENT], [ESTALE]) - - if not isinstance(realpath, int): - realpath_parts = realpath.split('/') - pargfid = realpath_parts[-2] - basename = realpath_parts[-1] - pfx = gauxpfx() - dir_entry = os.path.join(pfx, pargfid, basename) - return dir_entry + try: + realpath = errno_wrap(os.readlink, [dir_path], + [ENOENT], [ESTALE]) + if not isinstance(realpath, int): + realpath_parts = realpath.split('/') + pargfid = realpath_parts[-2] + basename = realpath_parts[-1] + dir_entry = os.path.join(pfx, pargfid, basename) + return dir_entry + except OSError: + # .gfid/GFID + gfidpath = unescape_space_newline(os.path.join(pfx, gfid)) + realpath = errno_wrap(Xattr.lgetxattr_buf, + [gfidpath, 'glusterfs.gfid2path'], [ENOENT], [ESTALE]) + if not isinstance(realpath, int): + basename = os.path.basename(realpath).rstrip('\x00') + dirpath = os.path.dirname(realpath) + if dirpath == "/": + pargfid = ROOT_GFID + else: + dirpath = dirpath.strip("/") + pargfid = get_gfid_from_mnt(dirpath) + if isinstance(pargfid, int): + return None + dir_entry = os.path.join(pfx, pargfid, basename) + return dir_entry return None @@ -705,12 +743,12 @@ def lf(event, **kwargs): Log Format helper function, log messages can be easily modified to structured log format. lf("Config Change", sync_jobs=4, brick=/bricks/b1) will be - converted as "Config Change<TAB>brick=/bricks/b1<TAB>sync_jobs=4" + converted as "Config Change [{brick=/bricks/b1}, {sync_jobs=4}]" """ - msg = event + msgparts = [] for k, v in kwargs.items(): - msg += "\t{0}={1}".format(k, v) - return msg + msgparts.append("{%s=%s}" % (k, v)) + return "%s [%s]" % (event, ", ".join(msgparts)) class Popen(subprocess.Popen): @@ -847,7 +885,7 @@ class Popen(subprocess.Popen): break b = os.read(self.stderr.fileno(), 1024) if b: - elines.append(b) + elines.append(b.decode()) else: break self.stderr.close() @@ -856,10 +894,29 @@ class Popen(subprocess.Popen): self.errfail() +def host_brick_split(value): + """ + IPv6 compatible way to split and get the host + and brick information. Example inputs: + node1.example.com:/exports/bricks/brick1/brick + fe80::af0f:df82:844f:ef66%utun0:/exports/bricks/brick1/brick + """ + parts = value.split(":") + brick = parts[-1] + hostparts = parts[0:-1] + return (":".join(hostparts), brick) + + class Volinfo(object): - def __init__(self, vol, host='localhost', prelude=[]): - po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, + def __init__(self, vol, host='localhost', prelude=[], master=True): + if master: + gluster_cmd_dir = gconf.get("gluster-command-dir") + else: + gluster_cmd_dir = gconf.get("slave-gluster-command-dir") + + gluster_cmd = os.path.join(gluster_cmd_dir, 'gluster') + po = Popen(prelude + [gluster_cmd, '--xml', '--remote-host=' + host, 'volume', 'info', vol], stdout=PIPE, stderr=PIPE, universal_newlines=True) vix = po.stdout.read() @@ -892,7 +949,7 @@ class Volinfo(object): @memoize def bricks(self): def bparse(b): - host, dirp = b.find("name").text.split(':', 2) + host, dirp = host_brick_split(b.find("name").text) return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text} return [bparse(b) for b in self.get('brick')] @@ -924,6 +981,14 @@ class Volinfo(object): else: return int(self.get('disperseCount')[0].text) + def distribution_count(self, tier, hot): + if (tier and hot): + return int(self.get('hotBricks/hotdistCount')[0].text) + elif (tier and not hot): + return int(self.get('coldBricks/colddistCount')[0].text) + else: + return int(self.get('distCount')[0].text) + @property @memoize def hot_bricks(self): @@ -960,6 +1025,16 @@ class VolinfoFromGconf(object): def is_hot(self, brickpath): return False + def is_uuid(self, value): + try: + uuid.UUID(value) + return True + except ValueError: + return False + + def possible_path(self, value): + return "/" in value + @property @memoize def bricks(self): @@ -973,8 +1048,22 @@ class VolinfoFromGconf(object): out = [] for b in bricks_data: parts = b.split(":") - bpath = parts[2] if len(parts) == 3 else "" - out.append({"host": parts[1], "dir": bpath, "uuid": parts[0]}) + b_uuid = None + if self.is_uuid(parts[0]): + b_uuid = parts[0] + # Set all parts except first + parts = parts[1:] + + if self.possible_path(parts[-1]): + bpath = parts[-1] + # Set all parts except last + parts = parts[0:-1] + + out.append({ + "host": ":".join(parts), # if remaining parts are IPv6 name + "dir": bpath, + "uuid": b_uuid + }) return out @@ -992,6 +1081,9 @@ class VolinfoFromGconf(object): def disperse_count(self, tier, hot): return gconf.get("master-disperse-count") + def distribution_count(self, tier, hot): + return gconf.get("master-distribution-count") + @property @memoize def hot_bricks(self): diff --git a/geo-replication/tests/unit/test_gsyncdstatus.py b/geo-replication/tests/unit/test_gsyncdstatus.py index 483023dbfe9..9c1aa2ad4ad 100755 --- a/geo-replication/tests/unit/test_gsyncdstatus.py +++ b/geo-replication/tests/unit/test_gsyncdstatus.py @@ -13,11 +13,11 @@ import unittest import os import urllib -from syncdaemon.gstatus import GeorepStatus, set_monitor_status -from syncdaemon.gstatus import get_default_values -from syncdaemon.gstatus import MONITOR_STATUS, DEFAULT_STATUS -from syncdaemon.gstatus import STATUS_VALUES, CRAWL_STATUS_VALUES -from syncdaemon.gstatus import human_time, human_time_utc +from syncdaemon.gstatus import (GeorepStatus, set_monitor_status, + get_default_values, + MONITOR_STATUS, DEFAULT_STATUS, + STATUS_VALUES, CRAWL_STATUS_VALUES, + human_time, human_time_utc) class GeorepStatusTestCase(unittest.TestCase): |
