summaryrefslogtreecommitdiffstats
path: root/geo-replication
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication')
-rw-r--r--geo-replication/gsyncd.conf.in22
-rw-r--r--geo-replication/setup.py6
-rw-r--r--geo-replication/src/gsyncd.c14
-rwxr-xr-xgeo-replication/src/gverify.sh33
-rw-r--r--geo-replication/src/peer_georep-sshkey.py.in4
-rwxr-xr-xgeo-replication/src/peer_gsec_create.in4
-rw-r--r--geo-replication/src/peer_mountbroker.py.in33
-rw-r--r--geo-replication/src/procdiggy.c24
-rwxr-xr-xgeo-replication/src/set_geo_rep_pem_keys.sh1
-rw-r--r--geo-replication/syncdaemon/Makefile.am2
-rw-r--r--geo-replication/syncdaemon/README.md1
-rw-r--r--geo-replication/syncdaemon/argsupgrade.py13
-rw-r--r--geo-replication/syncdaemon/changelogagent.py78
-rw-r--r--geo-replication/syncdaemon/gsyncd.py37
-rw-r--r--geo-replication/syncdaemon/gsyncdconfig.py100
-rw-r--r--geo-replication/syncdaemon/gsyncdstatus.py4
-rw-r--r--geo-replication/syncdaemon/libcxattr.py16
-rw-r--r--geo-replication/syncdaemon/libgfchangelog.py253
-rw-r--r--geo-replication/syncdaemon/master.py173
-rw-r--r--geo-replication/syncdaemon/monitor.py99
-rw-r--r--geo-replication/syncdaemon/py2py3.py141
-rw-r--r--geo-replication/syncdaemon/repce.py5
-rw-r--r--geo-replication/syncdaemon/resource.py217
-rw-r--r--geo-replication/syncdaemon/subcmds.py26
-rw-r--r--geo-replication/syncdaemon/syncdutils.py162
-rwxr-xr-xgeo-replication/tests/unit/test_gsyncdstatus.py10
26 files changed, 870 insertions, 608 deletions
diff --git a/geo-replication/gsyncd.conf.in b/geo-replication/gsyncd.conf.in
index 6160c7c7091..9688c79fab7 100644
--- a/geo-replication/gsyncd.conf.in
+++ b/geo-replication/gsyncd.conf.in
@@ -23,6 +23,11 @@ configurable=false
type=int
value=1
+[master-distribution-count]
+configurable=false
+type=int
+value=1
+
[glusterd-workdir]
value = @GLUSTERD_WORKDIR@
@@ -109,7 +114,7 @@ type=int
help=Minimum time interval in seconds for passive worker to become Active
[changelog-archive-format]
-value=%%Y%%m
+value=%Y%m
help=Processed changelogs will be archived in working directory. Pattern for archive file
[use-meta-volume]
@@ -118,7 +123,7 @@ type=bool
help=Use this to set Active Passive mode to meta-volume.
[meta-volume-mnt]
-value=/var/run/gluster/shared_storage
+value=/run/gluster/shared_storage
help=Meta Volume or Shared Volume mount path
[allow-network]
@@ -128,10 +133,11 @@ value=
value=5
type=int
-[use-tarssh]
-value=false
-type=bool
-help=Use sync-mode as tarssh
+[sync-method]
+value=rsync
+help=Sync method for data sync. Available methods are tar over ssh and rsync. Default is rsync.
+validation=choice
+allowed_values=tarssh,rsync
[remote-gsyncd]
value =
@@ -260,7 +266,9 @@ allowed_values=ERROR,INFO,WARNING,DEBUG
[ssh-port]
value=22
-validation=int
+validation=minmax
+min=1
+max=65535
help=Set SSH port
type=int
diff --git a/geo-replication/setup.py b/geo-replication/setup.py
index 6d678baa2f7..0eae469d2d6 100644
--- a/geo-replication/setup.py
+++ b/geo-replication/setup.py
@@ -1,7 +1,7 @@
#
# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.
-
+#
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
@@ -20,11 +20,11 @@ setup(
name=name,
version="",
description='GlusterFS Geo Replication',
- license='',
+ license='GPLV2 and LGPLV3+',
author='Red Hat, Inc.',
author_email='gluster-devel@gluster.org',
url='http://www.gluster.org',
- packages=['syncdaemon', ],
+ packages=[name, ],
test_suite='nose.collector',
install_requires=[],
scripts=[],
diff --git a/geo-replication/src/gsyncd.c b/geo-replication/src/gsyncd.c
index cf0e76f69c4..b5aeec5bf33 100644
--- a/geo-replication/src/gsyncd.c
+++ b/geo-replication/src/gsyncd.c
@@ -7,8 +7,8 @@
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
-#include "compat.h"
-#include "syscall.h"
+#include <glusterfs/compat.h>
+#include <glusterfs/syscall.h>
#include <stdlib.h>
#include <stdio.h>
@@ -24,13 +24,13 @@
* We unconditionally pass then while building gsyncd binary.
*/
#ifdef USE_LIBGLUSTERFS
-#include "glusterfs.h"
-#include "globals.h"
-#include "defaults.h"
+#include <glusterfs/glusterfs.h>
+#include <glusterfs/globals.h>
+#include <glusterfs/defaults.h>
#endif
-#include "common-utils.h"
-#include "run.h"
+#include <glusterfs/common-utils.h>
+#include <glusterfs/run.h>
#include "procdiggy.h"
#define _GLUSTERD_CALLED_ "_GLUSTERD_CALLED_"
diff --git a/geo-replication/src/gverify.sh b/geo-replication/src/gverify.sh
index d048de0992b..f5f70d245e0 100755
--- a/geo-replication/src/gverify.sh
+++ b/geo-replication/src/gverify.sh
@@ -94,6 +94,7 @@ echo $cmd_line;
function master_stats()
{
MASTERVOL=$1;
+ local inet6=$2;
local d;
local i;
local disk_size;
@@ -102,7 +103,12 @@ function master_stats()
local m_status;
d=$(mktemp -d -t ${0##*/}.XXXXXX 2>/dev/null);
- glusterfs -s localhost --xlator-option="*dht.lookup-unhashed=off" --volfile-id $MASTERVOL -l $master_log_file $d;
+ if [ "$inet6" = "inet6" ]; then
+ glusterfs -s localhost --xlator-option="*dht.lookup-unhashed=off" --xlator-option="transport.address-family=inet6" --volfile-id $MASTERVOL -l $master_log_file $d;
+ else
+ glusterfs -s localhost --xlator-option="*dht.lookup-unhashed=off" --volfile-id $MASTERVOL -l $master_log_file $d;
+ fi
+
i=$(get_inode_num $d);
if [[ "$i" -ne "1" ]]; then
echo 0:0;
@@ -124,12 +130,18 @@ function slave_stats()
SLAVEUSER=$1;
SLAVEHOST=$2;
SLAVEVOL=$3;
+ local inet6=$4;
local cmd_line;
local ver;
local status;
d=$(mktemp -d -t ${0##*/}.XXXXXX 2>/dev/null);
- glusterfs --xlator-option="*dht.lookup-unhashed=off" --volfile-server $SLAVEHOST --volfile-id $SLAVEVOL -l $slave_log_file $d;
+ if [ "$inet6" = "inet6" ]; then
+ glusterfs --xlator-option="*dht.lookup-unhashed=off" --xlator-option="transport.address-family=inet6" --volfile-server $SLAVEHOST --volfile-id $SLAVEVOL -l $slave_log_file $d;
+ else
+ glusterfs --xlator-option="*dht.lookup-unhashed=off" --volfile-server $SLAVEHOST --volfile-id $SLAVEVOL -l $slave_log_file $d;
+ fi
+
i=$(get_inode_num $d);
if [[ "$i" -ne "1" ]]; then
echo 0:0;
@@ -167,6 +179,10 @@ function main()
log_file=$6
> $log_file
+ inet6=$7
+ local cmd_line
+ local ver
+
# Use FORCE_BLOCKER flag in the error message to differentiate
# between the errors which the force command should bypass
@@ -192,20 +208,21 @@ function main()
exit 1;
fi;
+ cmd_line=$(cmd_slave);
if [[ -z "${GR_SSH_IDENTITY_KEY}" ]]; then
- err=$((ssh -p ${SSH_PORT} -oNumberOfPasswordPrompts=0 -oStrictHostKeyChecking=no $2@$3 "gluster --version") 2>&1)
+ ver=$(ssh -p ${SSH_PORT} -oNumberOfPasswordPrompts=0 -oStrictHostKeyChecking=no $2@$3 bash -c "'$cmd_line'")
else
- err=$((ssh -p ${SSH_PORT} -i ${GR_SSH_IDENTITY_KEY} -oNumberOfPasswordPrompts=0 -oStrictHostKeyChecking=no $2@$3 "gluster --version") 2>&1)
+ ver=$(ssh -p ${SSH_PORT} -i ${GR_SSH_IDENTITY_KEY} -oNumberOfPasswordPrompts=0 -oStrictHostKeyChecking=no $2@$3 bash -c "'$cmd_line'")
fi
- if [ $? -ne 0 ]; then
- echo "FORCE_BLOCKER|gluster command on $2@$3 failed. Error: $err" > $log_file
+ if [ -z "$ver" ]; then
+ echo "FORCE_BLOCKER|gluster command not found on $3 for user $2." > $log_file
exit 1;
fi;
ERRORS=0;
- master_data=$(master_stats $1);
- slave_data=$(slave_stats $2 $3 $4);
+ master_data=$(master_stats $1 ${inet6});
+ slave_data=$(slave_stats $2 $3 $4 ${inet6});
master_disk_size=$(echo $master_data | cut -f1 -d':');
slave_disk_size=$(echo $slave_data | cut -f1 -d':');
master_used_size=$(echo $master_data | cut -f2 -d':');
diff --git a/geo-replication/src/peer_georep-sshkey.py.in b/geo-replication/src/peer_georep-sshkey.py.in
index 2196fd7491a..58696e9a616 100644
--- a/geo-replication/src/peer_georep-sshkey.py.in
+++ b/geo-replication/src/peer_georep-sshkey.py.in
@@ -30,8 +30,8 @@ from prettytable import PrettyTable
SECRET_PEM = "@GLUSTERD_WORKDIR@/geo-replication/secret.pem"
TAR_SSH_PEM = "@GLUSTERD_WORKDIR@/geo-replication/tar_ssh.pem"
-GSYNCD_CMD = 'command="@GLUSTERFS_LIBEXECDIR@/gsyncd" '
-TAR_CMD = 'command="tar ${SSH_ORIGINAL_COMMAND#* }" '
+GSYNCD_CMD = 'command="@GLUSTERFS_LIBEXECDIR@/gsyncd" '
+TAR_CMD = 'command="tar ${SSH_ORIGINAL_COMMAND#* }" '
COMMON_SECRET_FILE = "@GLUSTERD_WORKDIR@/geo-replication/common_secret.pem.pub"
diff --git a/geo-replication/src/peer_gsec_create.in b/geo-replication/src/peer_gsec_create.in
index 05c1638bdcd..6d4a4847013 100755
--- a/geo-replication/src/peer_gsec_create.in
+++ b/geo-replication/src/peer_gsec_create.in
@@ -18,7 +18,7 @@ if [ "Xcontainer" = "X$1" ]; then
output1=`cat "$GLUSTERD_WORKDIR"/geo-replication/secret.pem.pub`
output2=`cat "$GLUSTERD_WORKDIR"/geo-replication/tar_ssh.pem.pub`
else
- output1=`echo command=\"${libexecdir}/glusterfs/gsyncd\" " "``cat "$GLUSTERD_WORKDIR"/geo-replication/secret.pem.pub`
- output2=`echo command=\"tar \$\{SSH_ORIGINAL_COMMAND#* \}\" " "``cat "$GLUSTERD_WORKDIR"/geo-replication/tar_ssh.pem.pub`
+ output1=`echo command=\"${libexecdir}/glusterfs/gsyncd\" ""``cat "$GLUSTERD_WORKDIR"/geo-replication/secret.pem.pub`
+ output2=`echo command=\"tar \$\{SSH_ORIGINAL_COMMAND#* \}\" ""``cat "$GLUSTERD_WORKDIR"/geo-replication/tar_ssh.pem.pub`
fi
echo -e "$output1\n$output2"
diff --git a/geo-replication/src/peer_mountbroker.py.in b/geo-replication/src/peer_mountbroker.py.in
index 83d385cb5eb..40b90ffc560 100644
--- a/geo-replication/src/peer_mountbroker.py.in
+++ b/geo-replication/src/peer_mountbroker.py.in
@@ -11,6 +11,7 @@ from gluster.cliutils import (execute, Cmd, node_output_ok,
from prettytable import PrettyTable
LOG_DIR = "@localstatedir@/log/glusterfs/geo-replication-slaves"
+CLI_LOG = "@localstatedir@/log/glusterfs/cli.log"
GEOREP_DIR = "@GLUSTERD_WORKDIR@/geo-replication"
GLUSTERD_VOLFILE = "@GLUSTERD_VOLFILE@"
@@ -46,7 +47,7 @@ class MountbrokerUserMgmt(object):
for line in f:
line = line.strip()
if line.startswith("option "):
- key, value = line.split(" ")[1:]
+ key, value = line.split()[1:]
self._options[key] = value
if line.startswith("#"):
self.commented_lines.append(line)
@@ -145,7 +146,7 @@ class NodeSetup(Cmd):
# chgrp -R <grp> /var/log/glusterfs/geo-replication-slaves
# chgrp -R <grp> /var/lib/glusterd/geo-replication
# chmod -R 770 /var/log/glusterfs/geo-replication-slaves
- # chmod -R 770 /var/lib/glusterd/geo-replication
+ # chmod 770 /var/lib/glusterd/geo-replication
# mkdir -p <mnt_root>
# chmod 0711 <mnt_root>
# If selinux,
@@ -195,8 +196,13 @@ class NodeSetup(Cmd):
execute(["chgrp", "-R", args.group, GEOREP_DIR])
execute(["chgrp", "-R", args.group, LOG_DIR])
- execute(["chmod", "-R", "770", GEOREP_DIR])
- execute(["chmod", "-R", "770", args.group, LOG_DIR])
+ execute(["chgrp", args.group, CLI_LOG])
+ execute(["chmod", "770", GEOREP_DIR])
+ execute(["find", LOG_DIR, "-type", "d", "-exec", "chmod", "770", "{}",
+ "+"])
+ execute(["find", LOG_DIR, "-type", "f", "-exec", "chmod", "660", "{}",
+ "+"])
+ execute(["chmod", "660", CLI_LOG])
m.set_mount_root_and_group(args.mount_root, args.group)
m.save()
@@ -216,8 +222,10 @@ class CliSetup(Cmd):
name = "setup"
def args(self, parser):
- parser.add_argument("mount_root")
- parser.add_argument("group")
+ parser.add_argument("mount_root",
+ help="Path to the mountbroker-root directory.")
+ parser.add_argument("group",
+ help="Group to be used for setup.")
def run(self, args):
out = execute_in_peers("node-setup", [args.mount_root,
@@ -273,7 +281,7 @@ class CliStatus(Cmd):
for p in out:
node_data = p.output
- if node_data == "":
+ if node_data == "" or node_data == "N/A":
node_data = {}
users_row_data = ""
@@ -327,8 +335,10 @@ class CliAdd(Cmd):
name = "add"
def args(self, parser):
- parser.add_argument("volume")
- parser.add_argument("user")
+ parser.add_argument("volume",
+ help="Volume to be added.")
+ parser.add_argument("user",
+ help="User for which volume is to be added.")
def run(self, args):
out = execute_in_peers("node-add", [args.volume,
@@ -368,8 +378,9 @@ class CliRemove(Cmd):
name = "remove"
def args(self, parser):
- parser.add_argument("--volume", default=".")
- parser.add_argument("--user", default=".")
+ parser.add_argument("--volume", default=".", help="Volume to be removed.")
+ parser.add_argument("--user", default=".",
+ help="User for which volume has to be removed.")
def run(self, args):
out = execute_in_peers("node-remove", [args.volume,
diff --git a/geo-replication/src/procdiggy.c b/geo-replication/src/procdiggy.c
index 05c1e1edc68..8068ef79a42 100644
--- a/geo-replication/src/procdiggy.c
+++ b/geo-replication/src/procdiggy.c
@@ -15,8 +15,8 @@
#include <ctype.h>
#include <sys/param.h> /* for PATH_MAX */
-#include "common-utils.h"
-#include "syscall.h"
+#include <glusterfs/common-utils.h>
+#include <glusterfs/syscall.h>
#include "procdiggy.h"
pid_t
@@ -31,6 +31,10 @@ pidinfo(pid_t pid, char **name)
};
char *p = NULL;
int ret = 0;
+ pid_t lpid = -1;
+
+ if (name)
+ *name = NULL;
snprintf(path, sizeof path, PROC "/%d/status", pid);
@@ -38,14 +42,12 @@ pidinfo(pid_t pid, char **name)
if (!f)
return -1;
- if (name)
- *name = NULL;
for (;;) {
size_t len;
memset(buf, 0, sizeof(buf));
if (fgets(buf, sizeof(buf), f) == NULL || (len = strlen(buf)) == 0 ||
buf[len - 1] != '\n') {
- pid = -1;
+ lpid = -1;
goto out;
}
buf[len - 1] = '\0';
@@ -57,7 +59,7 @@ pidinfo(pid_t pid, char **name)
;
*name = gf_strdup(p);
if (!*name) {
- pid = -2;
+ lpid = -2;
goto out;
}
continue;
@@ -71,17 +73,17 @@ pidinfo(pid_t pid, char **name)
while (isspace(*++p))
;
- ret = gf_string2int(p, &pid);
+ ret = gf_string2int(p, &lpid);
if (ret == -1)
- pid = -1;
+ lpid = -1;
out:
fclose(f);
- if (pid == -1 && name && *name)
+ if (lpid == -1 && name && *name)
GF_FREE(*name);
- if (pid == -2)
+ if (lpid == -2)
fprintf(stderr, "out of memory\n");
- return pid;
+ return lpid;
}
int
diff --git a/geo-replication/src/set_geo_rep_pem_keys.sh b/geo-replication/src/set_geo_rep_pem_keys.sh
index ae23f4ff0c6..8a43fa39d1f 100755
--- a/geo-replication/src/set_geo_rep_pem_keys.sh
+++ b/geo-replication/src/set_geo_rep_pem_keys.sh
@@ -47,6 +47,7 @@ function main()
cp $home_dir/${COMMON_SECRET_PEM_PUB} ${GLUSTERD_WORKDIR}/geo-replication/
gluster system:: copy file /geo-replication/${COMMON_SECRET_PEM_PUB}
gluster system:: execute add_secret_pub $user geo-replication/${master_vol}_${slave_vol}_common_secret.pem.pub
+ gluster vol set ${slave_vol} features.read-only on
else
echo "$home_dir/common_secret.pem.pub not present. Please run geo-replication command on master with push-pem option to generate the file"
exit 1;
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am
index 62c5ce7fe30..d70e3368faf 100644
--- a/geo-replication/syncdaemon/Makefile.am
+++ b/geo-replication/syncdaemon/Makefile.am
@@ -2,7 +2,7 @@ syncdaemondir = $(GLUSTERFS_LIBEXECDIR)/python/syncdaemon
syncdaemon_PYTHON = rconf.py gsyncd.py __init__.py master.py README.md repce.py \
resource.py syncdutils.py monitor.py libcxattr.py gsyncdconfig.py \
- libgfchangelog.py changelogagent.py gsyncdstatus.py conf.py logutils.py \
+ libgfchangelog.py gsyncdstatus.py conf.py logutils.py \
subcmds.py argsupgrade.py py2py3.py
CLEANFILES =
diff --git a/geo-replication/syncdaemon/README.md b/geo-replication/syncdaemon/README.md
index 2a202e3f99e..5ab785ae669 100644
--- a/geo-replication/syncdaemon/README.md
+++ b/geo-replication/syncdaemon/README.md
@@ -19,7 +19,6 @@ INSTALLATION
As of now, the supported way of operation is running from the source directory or using the RPMs given.
-If you use Python 2.4.x, you need to install the [Ctypes module](http://python.net/crew/theller/ctypes/).
CONFIGURATION
-------------
diff --git a/geo-replication/syncdaemon/argsupgrade.py b/geo-replication/syncdaemon/argsupgrade.py
index 4018143b8b4..7af40633ef8 100644
--- a/geo-replication/syncdaemon/argsupgrade.py
+++ b/geo-replication/syncdaemon/argsupgrade.py
@@ -84,6 +84,10 @@ def upgrade():
# fail when it does stat to check the existence.
init_gsyncd_template_conf()
+ inet6 = False
+ if "--inet6" in sys.argv:
+ inet6 = True
+
if "--monitor" in sys.argv:
# python gsyncd.py --path=/bricks/b1
# --monitor -c gsyncd.conf
@@ -147,8 +151,11 @@ def upgrade():
user, hname = remote_addr.split("@")
+ if not inet6:
+ hname = gethostbyname(hname)
+
print(("ssh://%s@%s:gluster://127.0.0.1:%s" % (
- user, gethostbyname(hname), vol)))
+ user, hname, vol)))
sys.exit(0)
elif "--normalize-url" in sys.argv:
@@ -346,3 +353,7 @@ def upgrade():
if pargs.reset_sync_time:
sys.argv.append("--reset-sync-time")
+
+ if inet6:
+ # Add `--inet6` as first argument
+ sys.argv = [sys.argv[0], "--inet6"] + sys.argv[1:]
diff --git a/geo-replication/syncdaemon/changelogagent.py b/geo-replication/syncdaemon/changelogagent.py
deleted file mode 100644
index c5fdbc3a74f..00000000000
--- a/geo-replication/syncdaemon/changelogagent.py
+++ /dev/null
@@ -1,78 +0,0 @@
-#!/usr/bin/python3
-#
-# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
-# This file is part of GlusterFS.
-
-# This file is licensed to you under your choice of the GNU Lesser
-# General Public License, version 3 or any later version (LGPLv3 or
-# later), or the GNU General Public License, version 2 (GPLv2), in all
-# cases as published by the Free Software Foundation.
-#
-
-import logging
-import syncdutils
-from syncdutils import select, CHANGELOG_AGENT_SERVER_VERSION
-from repce import RepceServer
-
-
-class _MetaChangelog(object):
-
- def __getattr__(self, meth):
- from libgfchangelog import Changes as LChanges
- xmeth = [m for m in dir(LChanges) if m[0] != '_']
- if meth not in xmeth:
- return
- for m in xmeth:
- setattr(self, m, getattr(LChanges, m))
- return getattr(self, meth)
-
-Changes = _MetaChangelog()
-
-
-class Changelog(object):
- def version(self):
- return CHANGELOG_AGENT_SERVER_VERSION
-
- def init(self):
- return Changes.cl_init()
-
- def register(self, cl_brick, cl_dir, cl_log, cl_level, retries=0):
- return Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries)
-
- def scan(self):
- return Changes.cl_scan()
-
- def getchanges(self):
- return Changes.cl_getchanges()
-
- def done(self, clfile):
- return Changes.cl_done(clfile)
-
- def history(self, changelog_path, start, end, num_parallel):
- return Changes.cl_history_changelog(changelog_path, start, end,
- num_parallel)
-
- def history_scan(self):
- return Changes.cl_history_scan()
-
- def history_getchanges(self):
- return Changes.cl_history_getchanges()
-
- def history_done(self, clfile):
- return Changes.cl_history_done(clfile)
-
-
-class ChangelogAgent(object):
- def __init__(self, obj, fd_tup):
- (inf, ouf, rw, ww) = fd_tup.split(',')
- repce = RepceServer(obj, int(inf), int(ouf), 1)
- t = syncdutils.Thread(target=lambda: (repce.service_loop(),
- syncdutils.finalize()))
- t.start()
- logging.info('Agent listining...')
-
- select((), (), ())
-
-
-def agent(obj, fd_tup):
- return ChangelogAgent(obj, fd_tup)
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index 037f351151b..257ed72c6ae 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -22,8 +22,8 @@ import gsyncdconfig as gconf
from rconf import rconf
import subcmds
from conf import GLUSTERD_WORKDIR, GLUSTERFS_CONFDIR, GCONF_VERSION
-from syncdutils import set_term_handler, finalize, lf
-from syncdutils import log_raise_exception, FreeObject, escape
+from syncdutils import (set_term_handler, finalize, lf,
+ log_raise_exception, FreeObject, escape)
import argsupgrade
@@ -47,6 +47,7 @@ def main():
sys.exit(0)
parser = ArgumentParser()
+ parser.add_argument("--inet6", action="store_true")
sp = parser.add_subparsers(dest="subcmd")
# Monitor Status File update
@@ -78,8 +79,6 @@ def main():
help="feedback fd between monitor and worker")
p.add_argument("--local-node", help="Local master node")
p.add_argument("--local-node-id", help="Local Node ID")
- p.add_argument("--rpc-fd",
- help="Read and Write fds for worker-agent communication")
p.add_argument("--subvol-num", type=int, help="Subvolume number")
p.add_argument("--is-hottier", action="store_true",
help="Is this brick part of hot tier")
@@ -91,19 +90,6 @@ def main():
p.add_argument("-c", "--config-file", help="Config File")
p.add_argument("--debug", action="store_true")
- # Agent
- p = sp.add_parser("agent")
- p.add_argument("master", help="Master Volume Name")
- p.add_argument("slave", help="Slave details user@host::vol format")
- p.add_argument("--local-path", help="Local brick path")
- p.add_argument("--local-node", help="Local master node")
- p.add_argument("--local-node-id", help="Local Node ID")
- p.add_argument("--slave-id", help="Slave Volume ID")
- p.add_argument("--rpc-fd",
- help="Read and Write fds for worker-agent communication")
- p.add_argument("-c", "--config-file", help="Config File")
- p.add_argument("--debug", action="store_true")
-
# Slave
p = sp.add_parser("slave")
p.add_argument("master", help="Master Volume Name")
@@ -133,6 +119,8 @@ def main():
help="Directory where Gluster binaries exist on slave")
p.add_argument("--slave-access-mount", action="store_true",
help="Do not lazy umount the slave volume")
+ p.add_argument("--master-dist-count", type=int,
+ help="Master Distribution count")
# Status
p = sp.add_parser("status")
@@ -228,7 +216,8 @@ def main():
# Set default path for config file in that case
# If an subcmd accepts config file then it also accepts
# master and Slave arguments.
- if config_file is None and hasattr(args, "config_file"):
+ if config_file is None and hasattr(args, "config_file") \
+ and args.subcmd != "slave":
config_file = "%s/geo-replication/%s_%s_%s/gsyncd.conf" % (
GLUSTERD_WORKDIR,
args.master,
@@ -252,6 +241,12 @@ def main():
if args.subcmd == "slave":
override_from_args = True
+ if config_file is not None and \
+ args.subcmd in ["monitor", "config-get", "config-set", "config-reset"]:
+ ret = gconf.is_config_file_old(config_file, args.master, extra_tmpl_args["slavevol"])
+ if ret is not None:
+ gconf.config_upgrade(config_file, ret)
+
# Load Config file
gconf.load(GLUSTERFS_CONFDIR + "/gsyncd.conf",
config_file,
@@ -261,8 +256,8 @@ def main():
# Default label to print in log file
label = args.subcmd
- if args.subcmd in ("worker", "agent"):
- # If Worker or agent, then add brick path also to label
+ if args.subcmd in ("worker"):
+ # If Worker, then add brick path also to label
label = "%s %s" % (args.subcmd, args.local_path)
elif args.subcmd == "slave":
# If Slave add Master node and Brick details
@@ -305,7 +300,7 @@ def main():
# Log message for loaded config file
if config_file is not None:
- logging.info(lf("Using session config file", path=config_file))
+ logging.debug(lf("Using session config file", path=config_file))
set_term_handler()
excont = FreeObject(exval=0)
diff --git a/geo-replication/syncdaemon/gsyncdconfig.py b/geo-replication/syncdaemon/gsyncdconfig.py
index 5d439a4c5ee..8848071997a 100644
--- a/geo-replication/syncdaemon/gsyncdconfig.py
+++ b/geo-replication/syncdaemon/gsyncdconfig.py
@@ -10,12 +10,14 @@
#
try:
- from configparser import ConfigParser, NoSectionError
+ from ConfigParser import RawConfigParser, NoSectionError
except ImportError:
- from ConfigParser import ConfigParser, NoSectionError
+ from configparser import RawConfigParser, NoSectionError
import os
+import shutil
from string import Template
from datetime import datetime
+from threading import Lock
# Global object which can be used in other modules
@@ -34,6 +36,7 @@ class GconfInvalidValue(Exception):
class Gconf(object):
def __init__(self, default_conf_file, custom_conf_file=None,
args={}, extra_tmpl_args={}, override_from_args=False):
+ self.lock = Lock()
self.default_conf_file = default_conf_file
self.custom_conf_file = custom_conf_file
self.tmp_conf_file = None
@@ -91,7 +94,7 @@ class Gconf(object):
if name != "all" and not self._is_configurable(name):
raise GconfNotConfigurable()
- cnf = ConfigParser()
+ cnf = RawConfigParser()
with open(self.custom_conf_file) as f:
cnf.readfp(f)
@@ -135,7 +138,7 @@ class Gconf(object):
if curr_val == value:
return True
- cnf = ConfigParser()
+ cnf = RawConfigParser()
with open(self.custom_conf_file) as f:
cnf.readfp(f)
@@ -162,6 +165,11 @@ class Gconf(object):
if value is not None and not self._is_valid_value(name, value):
raise GconfInvalidValue()
+
+ def _load_with_lock(self):
+ with self.lock:
+ self._load()
+
def _load(self):
self.gconf = {}
self.template_conf = []
@@ -170,7 +178,7 @@ class Gconf(object):
self.session_conf_items = []
self.default_values = {}
- conf = ConfigParser()
+ conf = RawConfigParser()
# Default Template config file
with open(self.default_conf_file) as f:
conf.readfp(f)
@@ -229,12 +237,19 @@ class Gconf(object):
self._tmpl_substitute()
self._do_typecast()
- def reload(self):
+ def reload(self, with_lock=True):
if self._is_config_changed():
- self._load()
+ if with_lock:
+ self._load_with_lock()
+ else:
+ self._load()
- def get(self, name, default_value=None):
- return self.gconf.get(name, default_value)
+ def get(self, name, default_value=None, with_lock=True):
+ if with_lock:
+ with self.lock:
+ return self.gconf.get(name, default_value)
+ else:
+ return self.gconf.get(name, default_value)
def getall(self, show_defaults=False, show_non_configurable=False):
cnf = {}
@@ -275,8 +290,9 @@ class Gconf(object):
return cnf
def getr(self, name, default_value=None):
- self.reload()
- return self.get(name, default_value)
+ with self.lock:
+ self.reload(with_lock=False)
+ return self.get(name, default_value, with_lock=False)
def get_help(self, name=None):
pass
@@ -313,6 +329,9 @@ class Gconf(object):
if item["validation"] == "unixtime":
return validate_unixtime(value)
+ if item["validation"] == "int":
+ return validate_int(value)
+
return False
def _is_config_changed(self):
@@ -325,6 +344,53 @@ class Gconf(object):
return False
+def is_config_file_old(config_file, mastervol, slavevol):
+ cnf = RawConfigParser()
+ cnf.read(config_file)
+ session_section = "peers %s %s" % (mastervol, slavevol)
+ try:
+ return dict(cnf.items(session_section))
+ except NoSectionError:
+ return None
+
+def config_upgrade(config_file, ret):
+ config_file_backup = os.path.join(os.path.dirname(config_file), "gsyncd.conf.bkp")
+
+ #copy old config file in a backup file
+ shutil.copyfile(config_file, config_file_backup)
+
+ #write a new config file
+ config = RawConfigParser()
+ config.add_section('vars')
+
+ for key, value in ret.items():
+ #handle option name changes
+ if key == "use_tarssh":
+ new_key = "sync-method"
+ if value == "true":
+ new_value = "tarssh"
+ else:
+ new_value = "rsync"
+ config.set('vars', new_key, new_value)
+ elif key == "timeout":
+ new_key = "slave-timeout"
+ config.set('vars', new_key, value)
+ #for changes like: ignore_deletes to ignore-deletes
+ else:
+ new_key = key.replace("_", "-")
+ config.set('vars', new_key, value)
+
+ with open(config_file, 'w') as configfile:
+ config.write(configfile)
+
+
+def validate_int(value):
+ try:
+ _ = int(value)
+ return True
+ except ValueError:
+ return False
+
def validate_unixtime(value):
try:
@@ -338,11 +404,13 @@ def validate_unixtime(value):
def validate_minmax(value, minval, maxval):
- value = int(value)
- minval = int(minval)
- maxval = int(maxval)
-
- return value >= minval and value <= maxval
+ try:
+ value = int(value)
+ minval = int(minval)
+ maxval = int(maxval)
+ return value >= minval and value <= maxval
+ except ValueError:
+ return False
def validate_choice(value, allowed_values):
diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py
index 72bcb092f01..1a655ff8887 100644
--- a/geo-replication/syncdaemon/gsyncdstatus.py
+++ b/geo-replication/syncdaemon/gsyncdstatus.py
@@ -23,8 +23,8 @@ from datetime import datetime
from errno import EACCES, EAGAIN, ENOENT
import logging
-from syncdutils import EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event
-from syncdutils import EVENT_GEOREP_CHECKPOINT_COMPLETED, lf
+from syncdutils import (EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event,
+ EVENT_GEOREP_CHECKPOINT_COMPLETED, lf)
DEFAULT_STATUS = "N/A"
MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped")
diff --git a/geo-replication/syncdaemon/libcxattr.py b/geo-replication/syncdaemon/libcxattr.py
index 7f3f6ce453a..e6406c36bd7 100644
--- a/geo-replication/syncdaemon/libcxattr.py
+++ b/geo-replication/syncdaemon/libcxattr.py
@@ -9,9 +9,9 @@
#
import os
-from ctypes import CDLL, create_string_buffer, get_errno
-import py2py3
-from py2py3 import bytearray_to_str
+from ctypes import CDLL, get_errno
+from py2py3 import (bytearray_to_str, gr_create_string_buffer,
+ gr_query_xattr, gr_lsetxattr, gr_lremovexattr)
class Xattr(object):
@@ -40,7 +40,7 @@ class Xattr(object):
@classmethod
def _query_xattr(cls, path, siz, syscall, *a):
if siz:
- buf = create_string_buffer(b'\0' * siz)
+ buf = gr_create_string_buffer(siz)
else:
buf = None
ret = getattr(cls.libc, syscall)(*((path,) + a + (buf, siz)))
@@ -56,7 +56,7 @@ class Xattr(object):
@classmethod
def lgetxattr(cls, path, attr, siz=0):
- return cls._query_xattr(path.encode(), siz, 'lgetxattr', attr.encode())
+ return gr_query_xattr(cls, path, siz, 'lgetxattr', attr)
@classmethod
def lgetxattr_buf(cls, path, attr):
@@ -70,7 +70,7 @@ class Xattr(object):
@classmethod
def llistxattr(cls, path, siz=0):
- ret = cls._query_xattr(path.encode(), siz, 'llistxattr')
+ ret = gr_query_xattr(cls, path, siz, 'llistxattr')
if isinstance(ret, str):
ret = ret.strip('\0')
ret = ret.split('\0') if ret else []
@@ -78,13 +78,13 @@ class Xattr(object):
@classmethod
def lsetxattr(cls, path, attr, val):
- ret = cls.libc.lsetxattr(path.encode(), attr.encode(), val, len(val), 0)
+ ret = gr_lsetxattr(cls, path, attr, val)
if ret == -1:
cls.raise_oserr()
@classmethod
def lremovexattr(cls, path, attr):
- ret = cls.libc.lremovexattr(path.encode(), attr.encode())
+ ret = gr_lremovexattr(cls, path, attr)
if ret == -1:
cls.raise_oserr()
diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py
index cc40fd5475d..a3bda7282c0 100644
--- a/geo-replication/syncdaemon/libgfchangelog.py
+++ b/geo-replication/syncdaemon/libgfchangelog.py
@@ -9,130 +9,135 @@
#
import os
-from ctypes import CDLL, RTLD_GLOBAL, create_string_buffer, \
- get_errno, byref, c_ulong
+from ctypes import CDLL, RTLD_GLOBAL, get_errno, byref, c_ulong
+from ctypes.util import find_library
from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
+from py2py3 import (gr_cl_history_changelog, gr_cl_done,
+ gr_create_string_buffer, gr_cl_register,
+ gr_cl_history_done, bytearray_to_str)
-class Changes(object):
- libgfc = CDLL("libgfchangelog.so", mode=RTLD_GLOBAL,
- use_errno=True)
-
- @classmethod
- def geterrno(cls):
- return get_errno()
-
- @classmethod
- def raise_changelog_err(cls):
- errn = cls.geterrno()
- raise ChangelogException(errn, os.strerror(errn))
-
- @classmethod
- def _get_api(cls, call):
- return getattr(cls.libgfc, call)
-
- @classmethod
- def cl_init(cls):
- ret = cls._get_api('gf_changelog_init')(None)
- if ret == -1:
- cls.raise_changelog_err()
-
- @classmethod
- def cl_register(cls, brick, path, log_file, log_level, retries=0):
- ret = cls._get_api('gf_changelog_register')(brick.encode(), path.encode(),
- log_file.encode(),
- log_level, retries)
- if ret == -1:
- cls.raise_changelog_err()
-
- @classmethod
- def cl_scan(cls):
- ret = cls._get_api('gf_changelog_scan')()
- if ret == -1:
- cls.raise_changelog_err()
-
- @classmethod
- def cl_startfresh(cls):
- ret = cls._get_api('gf_changelog_start_fresh')()
- if ret == -1:
- cls.raise_changelog_err()
-
- @classmethod
- def cl_getchanges(cls):
- """ remove hardcoding for path name length """
- def clsort(f):
- return f.split('.')[-1]
- changes = []
- buf = create_string_buffer(b'\0' * 4096)
- call = cls._get_api('gf_changelog_next_change')
-
- while True:
- ret = call(buf, 4096)
- if ret in (0, -1):
- break
- changes.append(buf.raw[:ret - 1].decode())
- if ret == -1:
- cls.raise_changelog_err()
- # cleanup tracker
- cls.cl_startfresh()
- return sorted(changes, key=clsort)
-
- @classmethod
- def cl_done(cls, clfile):
- ret = cls._get_api('gf_changelog_done')(clfile.encode())
- if ret == -1:
- cls.raise_changelog_err()
-
- @classmethod
- def cl_history_scan(cls):
- ret = cls._get_api('gf_history_changelog_scan')()
- if ret == -1:
- cls.raise_changelog_err()
-
- return ret
-
- @classmethod
- def cl_history_changelog(cls, changelog_path, start, end, num_parallel):
- actual_end = c_ulong()
- ret = cls._get_api('gf_history_changelog')(changelog_path.encode(), start, end,
- num_parallel,
- byref(actual_end))
- if ret == -1:
- cls.raise_changelog_err()
-
- if ret == -2:
- raise ChangelogHistoryNotAvailable()
-
- return (ret, actual_end.value)
-
- @classmethod
- def cl_history_startfresh(cls):
- ret = cls._get_api('gf_history_changelog_start_fresh')()
- if ret == -1:
- cls.raise_changelog_err()
-
- @classmethod
- def cl_history_getchanges(cls):
- """ remove hardcoding for path name length """
- def clsort(f):
- return f.split('.')[-1]
-
- changes = []
- buf = create_string_buffer(b'\0' * 4096)
- call = cls._get_api('gf_history_changelog_next_change')
-
- while True:
- ret = call(buf, 4096)
- if ret in (0, -1):
- break
- changes.append(buf.raw[:ret - 1].decode())
- if ret == -1:
- cls.raise_changelog_err()
-
- return sorted(changes, key=clsort)
-
- @classmethod
- def cl_history_done(cls, clfile):
- ret = cls._get_api('gf_history_changelog_done')(clfile.encode())
- if ret == -1:
- cls.raise_changelog_err()
+libgfc = CDLL(
+ find_library("gfchangelog"),
+ mode=RTLD_GLOBAL,
+ use_errno=True
+)
+
+
+def _raise_changelog_err():
+ errn = get_errno()
+ raise ChangelogException(errn, os.strerror(errn))
+
+
+def _init():
+ if libgfc.gf_changelog_init(None) == -1:
+ _raise_changelog_err()
+
+
+def register(brick, path, log_file, log_level, retries=0):
+ _init()
+
+ ret = gr_cl_register(libgfc, brick, path, log_file, log_level, retries)
+
+ if ret == -1:
+ _raise_changelog_err()
+
+
+def scan():
+ ret = libgfc.gf_changelog_scan()
+ if ret == -1:
+ _raise_changelog_err()
+
+
+def startfresh():
+ ret = libgfc.gf_changelog_start_fresh()
+ if ret == -1:
+ _raise_changelog_err()
+
+
+def getchanges():
+ def clsort(cfile):
+ return cfile.split('.')[-1]
+
+ changes = []
+ buf = gr_create_string_buffer(4096)
+ call = libgfc.gf_changelog_next_change
+
+ while True:
+ ret = call(buf, 4096)
+ if ret in (0, -1):
+ break
+
+ # py2 and py3 compatibility
+ result = bytearray_to_str(buf.raw[:ret - 1])
+ changes.append(result)
+
+ if ret == -1:
+ _raise_changelog_err()
+
+ # cleanup tracker
+ startfresh()
+
+ return sorted(changes, key=clsort)
+
+
+def done(clfile):
+ ret = gr_cl_done(libgfc, clfile)
+ if ret == -1:
+ _raise_changelog_err()
+
+
+def history_scan():
+ ret = libgfc.gf_history_changelog_scan()
+ if ret == -1:
+ _raise_changelog_err()
+
+ return ret
+
+
+def history_changelog(changelog_path, start, end, num_parallel):
+ actual_end = c_ulong()
+ ret = gr_cl_history_changelog(libgfc, changelog_path, start, end,
+ num_parallel, byref(actual_end))
+ if ret == -1:
+ _raise_changelog_err()
+
+ if ret == -2:
+ raise ChangelogHistoryNotAvailable()
+
+ return (ret, actual_end.value)
+
+
+def history_startfresh():
+ ret = libgfc.gf_history_changelog_start_fresh()
+ if ret == -1:
+ _raise_changelog_err()
+
+
+def history_getchanges():
+ def clsort(cfile):
+ return cfile.split('.')[-1]
+
+ changes = []
+ buf = gr_create_string_buffer(4096)
+ call = libgfc.gf_history_changelog_next_change
+
+ while True:
+ ret = call(buf, 4096)
+ if ret in (0, -1):
+ break
+
+ # py2 and py3 compatibility
+ result = bytearray_to_str(buf.raw[:ret - 1])
+ changes.append(result)
+
+ if ret == -1:
+ _raise_changelog_err()
+
+ return sorted(changes, key=clsort)
+
+
+def history_done(clfile):
+ ret = gr_cl_history_done(libgfc, clfile)
+ if ret == -1:
+ _raise_changelog_err()
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 65eaf673099..9501aeae6b5 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -22,11 +22,13 @@ from threading import Condition, Lock
from datetime import datetime
import gsyncdconfig as gconf
+import libgfchangelog
from rconf import rconf
-from syncdutils import Thread, GsyncdError, escape_space_newline
-from syncdutils import unescape_space_newline, gauxpfx, escape
-from syncdutils import lstat, errno_wrap, FreeObject, lf, matching_disk_gfid
-from syncdutils import NoStimeAvailable, PartialHistoryAvailable
+from syncdutils import (Thread, GsyncdError, escape_space_newline,
+ unescape_space_newline, gauxpfx, escape,
+ lstat, errno_wrap, FreeObject, lf, matching_disk_gfid,
+ NoStimeAvailable, PartialHistoryAvailable,
+ host_brick_split)
URXTIME = (-1, 0)
@@ -65,6 +67,9 @@ def _volinfo_hook_relax_foreign(self):
def edct(op, **ed):
dct = {}
dct['op'] = op
+ # This is used in automatic gfid conflict resolution.
+ # When marked True, it's skipped during re-processing.
+ dct['skip_entry'] = False
for k in ed:
if k == 'stat':
st = ed[k]
@@ -514,7 +519,7 @@ class GMasterCommon(object):
# If crawlwrap is called when partial history available,
# then it sets register_time which is the time when geo-rep
# worker registered to changelog consumption. Since nsec is
- # not considered in register time, their are chances of skipping
+ # not considered in register time, there are chances of skipping
# changes detection in xsync crawl. This limit will be reset when
# crawlwrap is called again.
self.live_changelog_start_time = None
@@ -696,7 +701,7 @@ class GMasterChangelogMixin(GMasterCommon):
TYPE_ENTRY = "E "
MAX_EF_RETRIES = 10
- MAX_OE_RETRIES = 5
+ MAX_OE_RETRIES = 10
# flat directory hierarchy for gfid based access
FLAT_DIR_HIERARCHY = '.'
@@ -792,6 +797,7 @@ class GMasterChangelogMixin(GMasterCommon):
pfx = gauxpfx()
fix_entry_ops = []
failures1 = []
+ remove_gfids = set()
for failure in failures:
if failure[2]['name_mismatch']:
pbname = failure[2]['slave_entry']
@@ -807,7 +813,7 @@ class GMasterChangelogMixin(GMasterCommon):
st = lstat(os.path.join(pfx, slave_gfid))
# Takes care of scenarios with no hardlinks
if isinstance(st, int) and st == ENOENT:
- logging.info(lf('Entry not present on master. Fixing gfid '
+ logging.debug(lf('Entry not present on master. Fixing gfid '
'mismatch in slave. Deleting the entry',
retry_count=retry_count,
entry=repr(failure)))
@@ -822,46 +828,67 @@ class GMasterChangelogMixin(GMasterCommon):
edct('UNLINK',
gfid=failure[2]['slave_gfid'],
entry=pbname))
+ remove_gfids.add(slave_gfid)
+ if op in ['RENAME']:
+ # If renamed gfid doesn't exists on master, remove
+ # rename entry and unlink src on slave
+ st = lstat(os.path.join(pfx, failure[0]['gfid']))
+ if isinstance(st, int) and st == ENOENT:
+ logging.debug("Unlink source %s" % repr(failure))
+ remove_gfids.add(failure[0]['gfid'])
+ fix_entry_ops.append(
+ edct('UNLINK',
+ gfid=failure[0]['gfid'],
+ entry=failure[0]['entry']))
# Takes care of scenarios of hardlinks/renames on master
elif not isinstance(st, int):
if matching_disk_gfid(slave_gfid, pbname):
# Safe to ignore the failure as master contains same
# file with same gfid. Remove entry from entries list
- logging.info(lf('Fixing gfid mismatch in slave. '
+ logging.debug(lf('Fixing gfid mismatch in slave. '
' Safe to ignore, take out entry',
retry_count=retry_count,
entry=repr(failure)))
- entries.remove(failure[0])
+ remove_gfids.add(failure[0]['gfid'])
+ if op == 'RENAME':
+ fix_entry_ops.append(
+ edct('UNLINK',
+ gfid=failure[0]['gfid'],
+ entry=failure[0]['entry']))
# The file exists on master but with different name.
# Probably renamed and got missed during xsync crawl.
elif failure[2]['slave_isdir']:
- realpath = os.readlink(os.path.join(gconf.local_path,
- ".glusterfs",
- slave_gfid[0:2],
- slave_gfid[2:4],
- slave_gfid))
+ realpath = os.readlink(os.path.join(
+ rconf.args.local_path,
+ ".glusterfs",
+ slave_gfid[0:2],
+ slave_gfid[2:4],
+ slave_gfid))
dst_entry = os.path.join(pfx, realpath.split('/')[-2],
realpath.split('/')[-1])
src_entry = pbname
- logging.info(lf('Fixing dir name/gfid mismatch in '
+ logging.debug(lf('Fixing dir name/gfid mismatch in '
'slave', retry_count=retry_count,
entry=repr(failure)))
if src_entry == dst_entry:
# Safe to ignore the failure as master contains
# same directory as in slave with same gfid.
# Remove the failure entry from entries list
- logging.info(lf('Fixing dir name/gfid mismatch'
+ logging.debug(lf('Fixing dir name/gfid mismatch'
' in slave. Safe to ignore, '
'take out entry',
retry_count=retry_count,
entry=repr(failure)))
- entries.remove(failure[0])
+ try:
+ entries.remove(failure[0])
+ except ValueError:
+ pass
else:
rename_dict = edct('RENAME', gfid=slave_gfid,
entry=src_entry,
entry1=dst_entry, stat=st,
link=None)
- logging.info(lf('Fixing dir name/gfid mismatch'
+ logging.debug(lf('Fixing dir name/gfid mismatch'
' in slave. Renaming',
retry_count=retry_count,
entry=repr(rename_dict)))
@@ -871,7 +898,7 @@ class GMasterChangelogMixin(GMasterCommon):
# renamed file exists and we are sure from
# matching_disk_gfid check that the entry doesn't
# exist with same gfid so we can safely delete on slave
- logging.info(lf('Fixing file gfid mismatch in slave. '
+ logging.debug(lf('Fixing file gfid mismatch in slave. '
'Hardlink/Rename Case. Deleting entry',
retry_count=retry_count,
entry=repr(failure)))
@@ -880,31 +907,52 @@ class GMasterChangelogMixin(GMasterCommon):
gfid=failure[2]['slave_gfid'],
entry=pbname))
elif failure[1] == ENOENT:
- # Ignore ENOENT error for fix_entry_ops aka retry_count > 1
- if retry_count > 1:
- logging.info(lf('ENOENT error while fixing entry ops. '
- 'Safe to ignore, take out entry',
+ if op in ['RENAME']:
+ pbname = failure[0]['entry1']
+ else:
+ pbname = failure[0]['entry']
+
+ pargfid = pbname.split('/')[1]
+ st = lstat(os.path.join(pfx, pargfid))
+ # Safe to ignore the failure as master doesn't contain
+ # parent directory.
+ if isinstance(st, int):
+ logging.debug(lf('Fixing ENOENT error in slave. Parent '
+ 'does not exist on master. Safe to '
+ 'ignore, take out entry',
retry_count=retry_count,
entry=repr(failure)))
- entries.remove(failure[0])
- elif op in ('MKNOD', 'CREATE', 'MKDIR'):
- pargfid = pbname.split('/')[1]
- st = lstat(os.path.join(pfx, pargfid))
- # Safe to ignore the failure as master doesn't contain
- # parent directory.
- if isinstance(st, int):
- logging.info(lf('Fixing ENOENT error in slave. Parent '
- 'does not exist on master. Safe to '
- 'ignore, take out entry',
- retry_count=retry_count,
- entry=repr(failure)))
+ try:
entries.remove(failure[0])
+ except ValueError:
+ pass
+ else:
+ logging.debug(lf('Fixing ENOENT error in slave. Create '
+ 'parent directory on slave.',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ realpath = os.readlink(os.path.join(rconf.args.local_path,
+ ".glusterfs",
+ pargfid[0:2],
+ pargfid[2:4],
+ pargfid))
+ dir_entry = os.path.join(pfx, realpath.split('/')[-2],
+ realpath.split('/')[-1])
+ fix_entry_ops.append(
+ edct('MKDIR', gfid=pargfid, entry=dir_entry,
+ mode=st.st_mode, uid=st.st_uid, gid=st.st_gid))
+
+ logging.debug("remove_gfids: %s" % repr(remove_gfids))
+ if remove_gfids:
+ for e in entries:
+ if e['op'] in ['MKDIR', 'MKNOD', 'CREATE', 'RENAME'] \
+ and e['gfid'] in remove_gfids:
+ logging.debug("Removed entry op from retrial list: entry: %s" % repr(e))
+ e['skip_entry'] = True
if fix_entry_ops:
# Process deletions of entries whose gfids are mismatched
failures1 = self.slave.server.entry_ops(fix_entry_ops)
- if not failures1:
- logging.info("Successfully fixed entry ops with gfid mismatch")
return (failures1, fix_entry_ops)
@@ -1078,6 +1126,11 @@ class GMasterChangelogMixin(GMasterCommon):
os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en,
stat=st, link=rl))
+ # If src doesn't exist while doing rename, destination
+ # is created. If data is not followed by rename, this
+ # remains zero byte file on slave. Hence add data entry
+ # for renames
+ datas.add(os.path.join(pfx, gfid))
else:
# stat() to get mode and other information
if not matching_disk_gfid(gfid, en):
@@ -1101,6 +1154,12 @@ class GMasterChangelogMixin(GMasterCommon):
rl = None
entries.append(edct(ty, stat=st, entry=en, gfid=gfid,
link=rl))
+ # If src doesn't exist while doing link, destination
+ # is created based on file type. If data is not
+ # followed by link, this remains zero byte file on
+ # slave. Hence add data entry for links
+ if rl is None:
+ datas.add(os.path.join(pfx, gfid))
elif ty == 'SYMLINK':
rl = errno_wrap(os.readlink, [en], [ENOENT],
[ESTALE, EINTR])
@@ -1148,7 +1207,6 @@ class GMasterChangelogMixin(GMasterCommon):
logging.debug('entries: %s' % repr(entries))
# Increment counters for Status
- self.status.inc_value("entry", len(entries))
self.files_in_batch += len(datas)
self.status.inc_value("data", len(datas))
@@ -1166,10 +1224,13 @@ class GMasterChangelogMixin(GMasterCommon):
if gconf.get("gfid-conflict-resolution"):
count = 0
+ if failures:
+ logging.info(lf('Entry ops failed with gfid mismatch',
+ count=len(failures)))
while failures and count < self.MAX_OE_RETRIES:
count += 1
self.handle_entry_failures(failures, entries)
- logging.info("Retry original entries. count = %s" % count)
+ logging.info(lf('Retry original entries', count=count))
failures = self.slave.server.entry_ops(entries)
if not failures:
logging.info("Successfully fixed all entry ops with "
@@ -1211,10 +1272,10 @@ class GMasterChangelogMixin(GMasterCommon):
continue
meta_entries.append(edct('META', go=go[0], stat=st))
if meta_entries:
- self.status.inc_value("meta", len(entries))
+ self.status.inc_value("meta", len(meta_entries))
failures = self.slave.server.meta_ops(meta_entries)
self.log_failures(failures, 'go', '', 'META')
- self.status.dec_value("meta", len(entries))
+ self.status.dec_value("meta", len(meta_entries))
self.batch_stats["META_SYNC_TIME"] += time.time() - meta_start_time
@@ -1406,7 +1467,7 @@ class GMasterChangelogMixin(GMasterCommon):
node = rconf.args.resource_remote
node_data = node.split("@")
node = node_data[-1]
- remote_node_ip = node.split(":")[0]
+ remote_node_ip, _ = host_brick_split(node)
self.status.set_slave_node(remote_node_ip)
def changelogs_batch_process(self, changes):
@@ -1439,9 +1500,9 @@ class GMasterChangelogMixin(GMasterCommon):
# that are _historical_ to that time.
data_stime = self.get_data_stime()
- self.changelog_agent.scan()
+ libgfchangelog.scan()
self.crawls += 1
- changes = self.changelog_agent.getchanges()
+ changes = libgfchangelog.getchanges()
if changes:
if data_stime:
logging.info(lf("slave's time",
@@ -1458,10 +1519,9 @@ class GMasterChangelogMixin(GMasterCommon):
self.changelogs_batch_process(changes)
- def register(self, register_time, changelog_agent, status):
- self.changelog_agent = changelog_agent
+ def register(self, register_time, status):
self.sleep_interval = gconf.get("change-interval")
- self.changelog_done_func = self.changelog_agent.done
+ self.changelog_done_func = libgfchangelog.done
self.tempdir = self.setup_working_dir()
self.processed_changelogs_dir = os.path.join(self.tempdir,
".processed")
@@ -1470,11 +1530,10 @@ class GMasterChangelogMixin(GMasterCommon):
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
- def register(self, register_time, changelog_agent, status):
- self.changelog_agent = changelog_agent
+ def register(self, register_time, status):
self.changelog_register_time = register_time
self.history_crawl_start_time = register_time
- self.changelog_done_func = self.changelog_agent.history_done
+ self.changelog_done_func = libgfchangelog.history_done
self.history_turns = 0
self.tempdir = self.setup_working_dir()
self.processed_changelogs_dir = os.path.join(self.tempdir,
@@ -1488,6 +1547,12 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
data_stime = self.get_data_stime()
end_time = int(time.time())
+
+ #as start of historical crawl marks Geo-rep worker restart
+ if gconf.get("ignore-deletes"):
+ logging.info(lf('ignore-deletes config option is set',
+ stime=data_stime))
+
logging.info(lf('starting history crawl',
turns=self.history_turns,
stime=data_stime,
@@ -1502,7 +1567,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# location then consuming history will not work(Known issue as of now)
changelog_path = os.path.join(rconf.args.local_path,
".glusterfs/changelogs")
- ret, actual_end = self.changelog_agent.history(
+ ret, actual_end = libgfchangelog.history_changelog(
changelog_path,
data_stime[0],
end_time,
@@ -1514,10 +1579,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# to be processed. returns positive value as number of changelogs
# to be processed, which will be fetched using
# history_getchanges()
- while self.changelog_agent.history_scan() > 0:
+ while libgfchangelog.history_scan() > 0:
self.crawls += 1
- changes = self.changelog_agent.history_getchanges()
+ changes = libgfchangelog.history_getchanges()
if changes:
if data_stime:
logging.info(lf("slave's time",
@@ -1570,7 +1635,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
XSYNC_MAX_ENTRIES = 1 << 13
- def register(self, register_time=None, changelog_agent=None, status=None):
+ def register(self, register_time=None, status=None):
self.status = status
self.counter = 0
self.comlist = []
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index c45ef24e59f..6aa7b9dfc99 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -20,13 +20,14 @@ import random
from resource import SSH
import gsyncdconfig as gconf
+import libgfchangelog
from rconf import rconf
-from syncdutils import select, waitpid, errno_wrap, lf, grabpidfile
-from syncdutils import set_term_handler, GsyncdError
-from syncdutils import Thread, finalize, Volinfo, VolinfoFromGconf
-from syncdutils import gf_event, EVENT_GEOREP_FAULTY, get_up_nodes
+from syncdutils import (select, waitpid, errno_wrap, lf, grabpidfile,
+ set_term_handler, GsyncdError,
+ Thread, finalize, Volinfo, VolinfoFromGconf,
+ gf_event, EVENT_GEOREP_FAULTY, get_up_nodes,
+ unshare_propagation_supported)
from gsyncdstatus import GeorepStatus, set_monitor_status
-from syncdutils import unshare_propagation_supported
import py2py3
from py2py3 import pipe
@@ -37,6 +38,8 @@ def get_subvol_num(brick_idx, vol, hot):
tier = vol.is_tier()
disperse_count = vol.disperse_count(tier, hot)
replica_count = vol.replica_count(tier, hot)
+ distribute_count = vol.distribution_count(tier, hot)
+ gconf.setconfig("master-distribution-count", distribute_count)
if (tier and not hot):
brick_idx = brick_idx - vol.get_hot_bricks_count(tier)
@@ -79,7 +82,7 @@ class Monitor(object):
# give a chance to graceful exit
errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH])
- def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master,
+ def monitor(self, w, argv, cpids, slave_vol, slave_host, master,
suuid, slavenodes):
"""the monitor loop
@@ -148,7 +151,7 @@ class Monitor(object):
remote_host = "%s@%s" % (remote_user, remote_new[0])
remote_id = remote_new[1]
- # Spawn the worker and agent in lock to avoid fd leak
+ # Spawn the worker in lock to avoid fd leak
self.lock.acquire()
self.status[w[0]['dir']].set_worker_status(self.ST_INIT)
@@ -156,44 +159,10 @@ class Monitor(object):
brick=w[0]['dir'],
slave_node=remote_host))
- # Couple of pipe pairs for RPC communication b/w
- # worker and changelog agent.
-
- # read/write end for agent
- (ra, ww) = pipe()
- # read/write end for worker
- (rw, wa) = pipe()
-
- # spawn the agent process
- apid = os.fork()
- if apid == 0:
- os.close(rw)
- os.close(ww)
- args_to_agent = argv + [
- 'agent',
- rconf.args.master,
- rconf.args.slave,
- '--local-path', w[0]['dir'],
- '--local-node', w[0]['host'],
- '--local-node-id', w[0]['uuid'],
- '--slave-id', suuid,
- '--rpc-fd', ','.join([str(ra), str(wa), str(rw), str(ww)])
- ]
-
- if rconf.args.config_file is not None:
- args_to_agent += ['-c', rconf.args.config_file]
-
- if rconf.args.debug:
- args_to_agent.append("--debug")
-
- os.execv(sys.executable, args_to_agent)
-
pr, pw = pipe()
cpid = os.fork()
if cpid == 0:
os.close(pr)
- os.close(ra)
- os.close(wa)
args_to_worker = argv + [
'worker',
@@ -204,8 +173,6 @@ class Monitor(object):
'--local-node', w[0]['host'],
'--local-node-id', w[0]['uuid'],
'--slave-id', suuid,
- '--rpc-fd',
- ','.join([str(rw), str(ww), str(ra), str(wa)]),
'--subvol-num', str(w[2]),
'--resource-remote', remote_host,
'--resource-remote-id', remote_id
@@ -236,14 +203,8 @@ class Monitor(object):
os.execv(sys.executable, args_to_worker)
cpids.add(cpid)
- agents.add(apid)
os.close(pw)
- # close all RPC pipes in monitor
- os.close(ra)
- os.close(wa)
- os.close(rw)
- os.close(ww)
self.lock.release()
t0 = time.time()
@@ -252,42 +213,19 @@ class Monitor(object):
if so:
ret = nwait(cpid, os.WNOHANG)
- ret_agent = nwait(apid, os.WNOHANG)
-
- if ret_agent is not None:
- # Agent is died Kill Worker
- logging.info(lf("Changelog Agent died, Aborting Worker",
- brick=w[0]['dir']))
- errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
- nwait(cpid)
- nwait(apid)
if ret is not None:
logging.info(lf("worker died before establishing "
"connection",
brick=w[0]['dir']))
- nwait(apid) # wait for agent
else:
logging.debug("worker(%s) connected" % w[0]['dir'])
while time.time() < t0 + conn_timeout:
ret = nwait(cpid, os.WNOHANG)
- ret_agent = nwait(apid, os.WNOHANG)
if ret is not None:
logging.info(lf("worker died in startup phase",
brick=w[0]['dir']))
- nwait(apid) # wait for agent
- break
-
- if ret_agent is not None:
- # Agent is died Kill Worker
- logging.info(lf("Changelog Agent died, Aborting "
- "Worker",
- brick=w[0]['dir']))
- errno_wrap(os.kill, [cpid, signal.SIGKILL],
- [ESRCH])
- nwait(cpid)
- nwait(apid)
break
time.sleep(1)
@@ -302,12 +240,8 @@ class Monitor(object):
brick=w[0]['dir'],
timeout=conn_timeout))
errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
- nwait(apid) # wait for agent
ret = nwait(cpid)
if ret is None:
- # If worker dies, agent terminates on EOF.
- # So lets wait for agent first.
- nwait(apid)
ret = nwait(cpid)
if exit_signalled(ret):
ret = 0
@@ -331,18 +265,15 @@ class Monitor(object):
argv = [os.path.basename(sys.executable), sys.argv[0]]
cpids = set()
- agents = set()
ta = []
for wx in wspx:
def wmon(w):
- cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol,
+ cpid, _ = self.monitor(w, argv, cpids, slave_vol,
slave_host, master, suuid, slavenodes)
time.sleep(1)
self.lock.acquire()
for cpid in cpids:
errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
- for apid in agents:
- errno_wrap(os.kill, [apid, signal.SIGKILL], [ESRCH])
self.lock.release()
finalize(exval=1)
t = Thread(target=wmon, args=[wx])
@@ -352,8 +283,8 @@ class Monitor(object):
# monitor status was being updated in each monitor thread. It
# should not be done as it can cause deadlock for a worker start.
# set_monitor_status uses flock to synchronize multple instances
- # updating the file. Since each monitor thread forks worker and
- # agent, these processes can hold the reference to fd of status
+ # updating the file. Since each monitor thread forks worker,
+ # these processes can hold the reference to fd of status
# file causing deadlock to workers which starts later as flock
# will not be release until all references to same fd is closed.
# It will also cause fd leaks.
@@ -369,7 +300,7 @@ def distribute(master, slave):
if rconf.args.use_gconf_volinfo:
mvol = VolinfoFromGconf(master.volume, master=True)
else:
- mvol = Volinfo(master.volume, master.host)
+ mvol = Volinfo(master.volume, master.host, master=True)
logging.debug('master bricks: ' + repr(mvol.bricks))
prelude = []
slave_host = None
@@ -385,7 +316,7 @@ def distribute(master, slave):
if rconf.args.use_gconf_volinfo:
svol = VolinfoFromGconf(slave.volume, master=False)
else:
- svol = Volinfo(slave.volume, "localhost", prelude)
+ svol = Volinfo(slave.volume, "localhost", prelude, master=False)
sbricks = svol.bricks
suuid = svol.uuid
diff --git a/geo-replication/syncdaemon/py2py3.py b/geo-replication/syncdaemon/py2py3.py
index 4c0e1152aa8..f9c76e1b50a 100644
--- a/geo-replication/syncdaemon/py2py3.py
+++ b/geo-replication/syncdaemon/py2py3.py
@@ -12,7 +12,10 @@
import sys
import os
-
+import stat
+import struct
+from syncdutils import umask
+from ctypes import create_string_buffer
if sys.version_info >= (3,):
def pipe():
@@ -35,6 +38,77 @@ if sys.version_info >= (3,):
def str_to_bytearray(string):
return bytes([ord(c) for c in string])
+ def gr_create_string_buffer(size):
+ return create_string_buffer(b'\0', size)
+
+ def gr_query_xattr(cls, path, size, syscall, attr=None):
+ if attr:
+ return cls._query_xattr(path.encode(), size, syscall,
+ attr.encode())
+ else:
+ return cls._query_xattr(path.encode(), size, syscall)
+
+ def gr_lsetxattr(cls, path, attr, val):
+ return cls.libc.lsetxattr(path.encode(), attr.encode(), val,
+ len(val), 0)
+
+ def gr_lremovexattr(cls, path, attr):
+ return cls.libc.lremovexattr(path.encode(), attr.encode())
+
+ def gr_cl_register(libgfapi, brick, path, log_file, log_level, retries):
+ return libgfapi.gf_changelog_register(brick.encode(),
+ path.encode(),
+ log_file.encode(),
+ log_level, retries)
+
+ def gr_cl_done(libgfapi, clfile):
+ return libgfapi.gf_changelog_done(clfile.encode())
+
+ def gr_cl_history_changelog(libgfapi, changelog_path, start, end, num_parallel,
+ actual_end):
+ return libgfapi.gf_history_changelog(changelog_path.encode(),
+ start, end, num_parallel,
+ actual_end)
+
+ def gr_cl_history_done(libgfapi, clfile):
+ return libgfapi.gf_history_changelog_done(clfile.encode())
+
+ # regular file
+
+ def entry_pack_reg(cls, gf, bn, mo, uid, gid):
+ bn_encoded = bn.encode()
+ blen = len(bn_encoded)
+ return struct.pack(cls._fmt_mknod(blen),
+ uid, gid, gf.encode(), mo, bn_encoded,
+ stat.S_IMODE(mo), 0, umask())
+
+ def entry_pack_reg_stat(cls, gf, bn, st):
+ bn_encoded = bn.encode()
+ blen = len(bn_encoded)
+ mo = st['mode']
+ return struct.pack(cls._fmt_mknod(blen),
+ st['uid'], st['gid'],
+ gf.encode(), mo, bn_encoded,
+ stat.S_IMODE(mo), 0, umask())
+ # mkdir
+
+ def entry_pack_mkdir(cls, gf, bn, mo, uid, gid):
+ bn_encoded = bn.encode()
+ blen = len(bn_encoded)
+ return struct.pack(cls._fmt_mkdir(blen),
+ uid, gid, gf.encode(), mo, bn_encoded,
+ stat.S_IMODE(mo), umask())
+ # symlink
+
+ def entry_pack_symlink(cls, gf, bn, lnk, st):
+ bn_encoded = bn.encode()
+ blen = len(bn_encoded)
+ lnk_encoded = lnk.encode()
+ llen = len(lnk_encoded)
+ return struct.pack(cls._fmt_symlink(blen, llen),
+ st['uid'], st['gid'],
+ gf.encode(), st['mode'], bn_encoded,
+ lnk_encoded)
else:
def pipe():
(r, w) = os.pipe()
@@ -42,8 +116,69 @@ else:
# Raw conversion of bytearray to string
def bytearray_to_str(byte_arr):
- return ''.join([b for b in byte_arr])
+ return byte_arr
# Raw conversion of string to bytearray
def str_to_bytearray(string):
- return b"".join([c for c in string])
+ return string
+
+ def gr_create_string_buffer(size):
+ return create_string_buffer('\0', size)
+
+ def gr_query_xattr(cls, path, size, syscall, attr=None):
+ if attr:
+ return cls._query_xattr(path, size, syscall, attr)
+ else:
+ return cls._query_xattr(path, size, syscall)
+
+ def gr_lsetxattr(cls, path, attr, val):
+ return cls.libc.lsetxattr(path, attr, val, len(val), 0)
+
+ def gr_lremovexattr(cls, path, attr):
+ return cls.libc.lremovexattr(path, attr)
+
+ def gr_cl_register(libgfapi, brick, path, log_file, log_level, retries):
+ return libgfapi.gf_changelog_register(brick, path, log_file,
+ log_level, retries)
+
+ def gr_cl_done(libgfapi, clfile):
+ return libgfapi.gf_changelog_done(clfile)
+
+ def gr_cl_history_changelog(libgfapi, changelog_path, start, end, num_parallel,
+ actual_end):
+ return libgfapi.gf_history_changelog(changelog_path, start, end,
+ num_parallel, actual_end)
+
+ def gr_cl_history_done(libgfapi, clfile):
+ return libgfapi.gf_history_changelog_done(clfile)
+
+ # regular file
+
+ def entry_pack_reg(cls, gf, bn, mo, uid, gid):
+ blen = len(bn)
+ return struct.pack(cls._fmt_mknod(blen),
+ uid, gid, gf, mo, bn,
+ stat.S_IMODE(mo), 0, umask())
+
+ def entry_pack_reg_stat(cls, gf, bn, st):
+ blen = len(bn)
+ mo = st['mode']
+ return struct.pack(cls._fmt_mknod(blen),
+ st['uid'], st['gid'],
+ gf, mo, bn,
+ stat.S_IMODE(mo), 0, umask())
+ # mkdir
+
+ def entry_pack_mkdir(cls, gf, bn, mo, uid, gid):
+ blen = len(bn)
+ return struct.pack(cls._fmt_mkdir(blen),
+ uid, gid, gf, mo, bn,
+ stat.S_IMODE(mo), umask())
+ # symlink
+
+ def entry_pack_symlink(cls, gf, bn, lnk, st):
+ blen = len(bn)
+ llen = len(lnk)
+ return struct.pack(cls._fmt_symlink(blen, llen),
+ st['uid'], st['gid'],
+ gf, st['mode'], bn, lnk)
diff --git a/geo-replication/syncdaemon/repce.py b/geo-replication/syncdaemon/repce.py
index 6065b828c99..c622afa6373 100644
--- a/geo-replication/syncdaemon/repce.py
+++ b/geo-replication/syncdaemon/repce.py
@@ -8,7 +8,6 @@
# cases as published by the Free Software Foundation.
#
-import _io
import os
import sys
import time
@@ -58,9 +57,9 @@ def recv(inf):
"""load an object from input stream
python2 and python3 compatibility, inf is sys.stdin
and is opened as text stream by default. Hence using the
- buffer attribute
+ buffer attribute in python3
"""
- if isinstance(inf, _io.TextIOWrapper):
+ if hasattr(inf, "buffer"):
return pickle.load(inf.buffer)
else:
return pickle.load(inf)
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 80e1fbcb133..f12c7ceaa36 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -19,30 +19,31 @@ import struct
import logging
import tempfile
import subprocess
-from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES
-from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM
+from errno import (EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES,
+ EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM)
import errno
from rconf import rconf
import gsyncdconfig as gconf
+import libgfchangelog
import repce
from repce import RepceServer, RepceClient
from master import gmaster_builder
import syncdutils
-from syncdutils import GsyncdError, select, privileged, funcode
-from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
-from syncdutils import NoStimeAvailable, PartialHistoryAvailable
-from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
-from syncdutils import get_changelog_log_level, get_rsync_version
-from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
-from syncdutils import GX_GFID_CANONICAL_LEN
+from syncdutils import (GsyncdError, select, privileged, funcode,
+ entry2pb, gauxpfx, errno_wrap, lstat,
+ NoStimeAvailable, PartialHistoryAvailable,
+ ChangelogException, ChangelogHistoryNotAvailable,
+ get_changelog_log_level, get_rsync_version,
+ GX_GFID_CANONICAL_LEN,
+ gf_mount_ready, lf, Popen, sup,
+ Xattr, matching_disk_gfid, get_gfid_from_mnt,
+ unshare_propagation_supported, get_slv_dir_path)
from gsyncdstatus import GeorepStatus
-from syncdutils import lf, Popen, sup
-from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
-from syncdutils import unshare_propagation_supported, get_slv_dir_path
-import py2py3
-from py2py3 import pipe, str_to_bytearray
+from py2py3 import (pipe, str_to_bytearray, entry_pack_reg,
+ entry_pack_reg_stat, entry_pack_mkdir,
+ entry_pack_symlink)
ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
@@ -377,37 +378,7 @@ class Server(object):
def entry_ops(cls, entries):
pfx = gauxpfx()
logging.debug('entries: %s' % repr(entries))
- # regular file
-
- def entry_pack_reg(gf, bn, mo, uid, gid):
- blen = len(bn)
- return struct.pack(cls._fmt_mknod(blen),
- uid, gid, gf.encode(), mo, bn.encode(),
- stat.S_IMODE(mo), 0, umask())
-
- def entry_pack_reg_stat(gf, bn, st):
- blen = len(bn)
- mo = st['mode']
- return struct.pack(cls._fmt_mknod(blen),
- st['uid'], st['gid'],
- gf.encode(), mo, bn.encode(),
- stat.S_IMODE(mo), 0, umask())
- # mkdir
-
- def entry_pack_mkdir(gf, bn, mo, uid, gid):
- blen = len(bn)
- return struct.pack(cls._fmt_mkdir(blen),
- uid, gid, gf.encode(), mo, bn.encode(),
- stat.S_IMODE(mo), umask())
- # symlink
-
- def entry_pack_symlink(gf, bn, lnk, st):
- blen = len(bn)
- llen = len(lnk)
- return struct.pack(cls._fmt_symlink(blen, llen),
- st['uid'], st['gid'],
- gf.encode(), st['mode'], bn.encode(),
- lnk.encode())
+ dist_count = rconf.args.master_dist_count
def entry_purge(op, entry, gfid, e, uid, gid):
# This is an extremely racy code and needs to be fixed ASAP.
@@ -457,7 +428,7 @@ class Server(object):
e['stat']['uid'] = uid
e['stat']['gid'] = gid
- if cmd_ret == EEXIST:
+ if cmd_ret in [EEXIST, ESTALE]:
if dst:
en = e['entry1']
else:
@@ -541,6 +512,12 @@ class Server(object):
entry = e['entry']
uid = 0
gid = 0
+
+ # Skip entry processing if it's marked true during gfid
+ # conflict resolution
+ if e['skip_entry']:
+ continue
+
if e.get("stat", {}):
# Copy UID/GID value and then reset to zero. Copied UID/GID
# will be used to run chown once entry is created.
@@ -581,8 +558,8 @@ class Server(object):
st = lstat(slink)
# don't create multiple entries with same gfid
if isinstance(st, int):
- blob = entry_pack_reg(
- gfid, bname, e['mode'], e['uid'], e['gid'])
+ blob = entry_pack_reg(cls, gfid, bname,
+ e['mode'], e['uid'], e['gid'])
# Self healed hardlinks are recorded as MKNOD.
# So if the gfid already exists, it should be
# processed as hard link not mknod.
@@ -597,8 +574,8 @@ class Server(object):
st = lstat(slink)
# don't create multiple entries with same gfid
if isinstance(st, int):
- blob = entry_pack_mkdir(
- gfid, bname, e['mode'], e['uid'], e['gid'])
+ blob = entry_pack_mkdir(cls, gfid, bname,
+ e['mode'], e['uid'], e['gid'])
elif (isinstance(lstat(en), int) or
not matching_disk_gfid(gfid, en)):
# If gfid of a directory exists on slave but path based
@@ -610,6 +587,8 @@ class Server(object):
logging.info(lf("Special case: rename on mkdir",
gfid=gfid, entry=repr(entry)))
src_entry = get_slv_dir_path(slv_host, slv_volume, gfid)
+ if src_entry is None:
+ collect_failure(e, ENOENT, uid, gid)
if src_entry is not None and src_entry != entry:
slv_entry_info = {}
slv_entry_info['gfid_mismatch'] = False
@@ -626,9 +605,9 @@ class Server(object):
if isinstance(st, int):
(pg, bname) = entry2pb(entry)
if stat.S_ISREG(e['stat']['mode']):
- blob = entry_pack_reg_stat(gfid, bname, e['stat'])
+ blob = entry_pack_reg_stat(cls, gfid, bname, e['stat'])
elif stat.S_ISLNK(e['stat']['mode']):
- blob = entry_pack_symlink(gfid, bname, e['link'],
+ blob = entry_pack_symlink(cls, gfid, bname, e['link'],
e['stat'])
else:
cmd_ret = errno_wrap(os.link,
@@ -639,7 +618,7 @@ class Server(object):
en = e['entry']
st = lstat(entry)
if isinstance(st, int):
- blob = entry_pack_symlink(gfid, bname, e['link'],
+ blob = entry_pack_symlink(cls, gfid, bname, e['link'],
e['stat'])
elif not matching_disk_gfid(gfid, en):
collect_failure(e, EEXIST, uid, gid)
@@ -654,22 +633,27 @@ class Server(object):
# exist with different gfid.
if not matching_disk_gfid(gfid, entry):
if e['stat'] and not stat.S_ISDIR(e['stat']['mode']):
- if stat.S_ISLNK(e['stat']['mode']) and \
- e['link'] is not None:
- st1 = lstat(en)
- if isinstance(st1, int):
- (pg, bname) = entry2pb(en)
- blob = entry_pack_symlink(gfid, bname,
- e['link'], e['stat'])
- elif not matching_disk_gfid(gfid, en):
- collect_failure(e, EEXIST, uid, gid, True)
+ if stat.S_ISLNK(e['stat']['mode']):
+ # src is not present, so don't sync symlink as
+ # we don't know target. It's ok to ignore. If
+ # it's unliked, it's fine. If it's renamed to
+ # something else, it will be synced then.
+ if e['link'] is not None:
+ st1 = lstat(en)
+ if isinstance(st1, int):
+ (pg, bname) = entry2pb(en)
+ blob = entry_pack_symlink(cls, gfid, bname,
+ e['link'],
+ e['stat'])
+ elif not matching_disk_gfid(gfid, en):
+ collect_failure(e, EEXIST, uid, gid, True)
else:
slink = os.path.join(pfx, gfid)
st = lstat(slink)
# don't create multiple entries with same gfid
if isinstance(st, int):
(pg, bname) = entry2pb(en)
- blob = entry_pack_reg_stat(gfid, bname,
+ blob = entry_pack_reg_stat(cls, gfid, bname,
e['stat'])
else:
cmd_ret = errno_wrap(os.link, [slink, en],
@@ -704,15 +688,21 @@ class Server(object):
raise
else:
raise
- elif not matching_disk_gfid(gfid, en):
+ elif not matching_disk_gfid(gfid, en) and dist_count > 1:
collect_failure(e, EEXIST, uid, gid, True)
else:
+ # We are here which means matching_disk_gfid for
+ # both source and destination has returned false
+ # and distribution count for master vol is greater
+ # then one. Which basically says both the source and
+ # destination exist and not hardlinks.
+ # So we are safe to go ahead with rename here.
rename_with_disk_gfid_confirmation(gfid, entry, en,
uid, gid)
if blob:
cmd_ret = errno_wrap(Xattr.lsetxattr,
[pg, 'glusterfs.gfid.newfile', blob],
- [EEXIST, ENOENT],
+ [EEXIST, ENOENT, ESTALE],
[ESTALE, EINVAL, EBUSY])
collect_failure(e, cmd_ret, uid, gid)
@@ -750,10 +740,8 @@ class Server(object):
# 'lchown' 'lchmod' 'utime with no-deference' blindly.
# But since 'lchmod' and 'utime with no de-reference' is
# not supported in python3, we have to rely on 'chmod'
- # and 'utime with de-reference'. But 'chmod'
- # de-reference the symlink and gets ENOENT, EACCES,
- # EPERM errors, hence ignoring those errors if it's on
- # symlink file.
+ # and 'utime with de-reference'. Hence avoiding 'chmod'
+ # and 'utime' if it's symlink file.
is_symlink = False
cmd_ret = errno_wrap(os.lchown, [go, uid, gid], [ENOENT],
@@ -761,19 +749,17 @@ class Server(object):
if isinstance(cmd_ret, int):
continue
- cmd_ret = errno_wrap(os.chmod, [go, mode],
- [ENOENT, EACCES, EPERM], [ESTALE, EINVAL])
- if isinstance(cmd_ret, int):
- is_symlink = os.path.islink(go)
- if not is_symlink:
+ is_symlink = os.path.islink(go)
+
+ if not is_symlink:
+ cmd_ret = errno_wrap(os.chmod, [go, mode],
+ [ENOENT, EACCES, EPERM], [ESTALE, EINVAL])
+ if isinstance(cmd_ret, int):
failures.append((e, cmd_ret, "chmod"))
- cmd_ret = errno_wrap(os.utime, [go, (atime, mtime)],
- [ENOENT, EACCES, EPERM], [ESTALE, EINVAL])
- if isinstance(cmd_ret, int):
- if not is_symlink:
- is_symlink = os.path.islink(go)
- if not is_symlink:
+ cmd_ret = errno_wrap(os.utime, [go, (atime, mtime)],
+ [ENOENT, EACCES, EPERM], [ESTALE, EINVAL])
+ if isinstance(cmd_ret, int):
failures.append((e, cmd_ret, "utime"))
return failures
@@ -965,6 +951,16 @@ class Mounter(object):
logging.exception('mount cleanup failure:')
rv = 200
os._exit(rv)
+
+ #Polling the dht.subvol.status value.
+ RETRIES = 10
+ while not gf_mount_ready():
+ if RETRIES < 0:
+ logging.error('Subvols are not up')
+ break
+ RETRIES -= 1
+ time.sleep(0.2)
+
logging.debug('auxiliary glusterfs mount prepared')
@@ -1260,9 +1256,6 @@ class GLUSTER(object):
# register the crawlers and start crawling
# g1 ==> Xsync, g2 ==> config.change_detector(changelog by default)
# g3 ==> changelog History
- (inf, ouf, ra, wa) = rconf.args.rpc_fd.split(',')
- changelog_agent = RepceClient(int(inf), int(ouf))
-
status = GeorepStatus(gconf.get("state-file"),
rconf.args.local_node,
rconf.args.local_path,
@@ -1270,12 +1263,6 @@ class GLUSTER(object):
rconf.args.master,
rconf.args.slave)
status.reset_on_worker_start()
- rv = changelog_agent.version()
- if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION:
- raise GsyncdError(
- "RePCe major version mismatch(changelog agent): "
- "local %s, remote %s" %
- (CHANGELOG_AGENT_CLIENT_VERSION, rv))
try:
workdir = g2.setup_working_dir()
@@ -1286,17 +1273,16 @@ class GLUSTER(object):
# register with the changelog library
# 9 == log level (DEBUG)
# 5 == connection retries
- changelog_agent.init()
- changelog_agent.register(rconf.args.local_path,
- workdir,
- gconf.get("changelog-log-file"),
- get_changelog_log_level(
- gconf.get("changelog-log-level")),
- g2.CHANGELOG_CONN_RETRIES)
+ libgfchangelog.register(rconf.args.local_path,
+ workdir,
+ gconf.get("changelog-log-file"),
+ get_changelog_log_level(
+ gconf.get("changelog-log-level")),
+ g2.CHANGELOG_CONN_RETRIES)
register_time = int(time.time())
- g2.register(register_time, changelog_agent, status)
- g3.register(register_time, changelog_agent, status)
+ g2.register(register_time, status)
+ g3.register(register_time, status)
except ChangelogException as e:
logging.error(lf("Changelog register failed", error=e))
sys.exit(1)
@@ -1431,7 +1417,9 @@ class SSH(object):
'--slave-gluster-log-level',
gconf.get("slave-gluster-log-level"),
'--slave-gluster-command-dir',
- gconf.get("slave-gluster-command-dir")]
+ gconf.get("slave-gluster-command-dir"),
+ '--master-dist-count',
+ str(gconf.get("master-distribution-count"))]
if gconf.get("slave-access-mount"):
args_to_slave.append('--slave-access-mount')
@@ -1496,7 +1484,7 @@ class SSH(object):
if log_rsync_performance:
# use stdout=PIPE only when log_rsync_performance enabled
- # Else rsync will write to stdout and nobody is their
+ # Else rsync will write to stdout and nobody is there
# to consume. If PIPE is full rsync hangs.
po = Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, universal_newlines=True)
@@ -1534,7 +1522,7 @@ class SSH(object):
return po
- def tarssh(self, files, slaveurl, log_err=False):
+ def tarssh(self, files, log_err=False):
"""invoke tar+ssh
-z (compress) can be use if needed, but omitting it now
as it results in weird error (tar+ssh errors out (errcode: 2)
@@ -1542,10 +1530,11 @@ class SSH(object):
if not files:
raise GsyncdError("no files to sync")
logging.debug("files: " + ", ".join(files))
- (host, rdir) = slaveurl.split(':')
+ (host, rdir) = self.slaveurl.split(':')
+
tar_cmd = ["tar"] + \
["--sparse", "-cf", "-", "--files-from", "-"]
- ssh_cmd = gconf.get("ssh-command-tar").split() + \
+ ssh_cmd = gconf.get("ssh-command").split() + \
gconf.get("ssh-options-tar").split() + \
["-p", str(gconf.get("ssh-port"))] + \
[host, "tar"] + \
@@ -1561,15 +1550,29 @@ class SSH(object):
p0.stdin.close()
p0.stdout.close() # Allow p0 to receive a SIGPIPE if p1 exits.
- # wait for tar to terminate, collecting any errors, further
- # waiting for transfer to complete
- _, stderr1 = p1.communicate()
# stdin and stdout of p0 is already closed, Reset to None and
# wait for child process to complete
p0.stdin = None
p0.stdout = None
- p0.communicate()
+
+ def wait_for_tar(p0):
+ _, stderr = p0.communicate()
+ if log_err:
+ for errline in stderr.strip().split("\n")[:-1]:
+ if "No such file or directory" not in errline:
+ logging.error(lf("SYNC Error",
+ sync_engine="Tarssh",
+ error=errline))
+
+ t = syncdutils.Thread(target=wait_for_tar, args=(p0, ))
+ # wait for tar to terminate, collecting any errors, further
+ # waiting for transfer to complete
+ t.start()
+
+ # wait for ssh process
+ _, stderr1 = p1.communicate()
+ t.join()
if log_err:
for errline in stderr1.strip().split("\n")[:-1]:
diff --git a/geo-replication/syncdaemon/subcmds.py b/geo-replication/syncdaemon/subcmds.py
index 30050ec4743..b8508532e30 100644
--- a/geo-replication/syncdaemon/subcmds.py
+++ b/geo-replication/syncdaemon/subcmds.py
@@ -73,7 +73,11 @@ def subcmd_worker(args):
Popen.init_errhandler()
fcntl.fcntl(args.feedback_fd, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
local = GLUSTER("localhost", args.master)
- slavehost, slavevol = args.slave.split("::")
+ slave_url, slavevol = args.slave.split("::")
+ if "@" not in slave_url:
+ slavehost = args.resource_remote
+ else:
+ slavehost = "%s@%s" % (slave_url.split("@")[0], args.resource_remote)
remote = SSH(slavehost, slavevol)
remote.connect_remote()
local.connect()
@@ -93,25 +97,19 @@ def subcmd_slave(args):
local.service_loop()
-def subcmd_agent(args):
- import os
- from changelogagent import agent, Changelog
- from syncdutils import lf
-
- os.setsid()
- logging.debug(lf("RPC FD",
- rpc_fd=repr(args.rpc_fd)))
- return agent(Changelog(), args.rpc_fd)
-
-
def subcmd_voluuidget(args):
from subprocess import Popen, PIPE
import xml.etree.ElementTree as XET
ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
- po = Popen(['gluster', '--xml', '--remote-host=' + args.host,
- 'volume', 'info', args.volname], bufsize=0,
+ cmd = ['gluster', '--xml', '--remote-host=' + args.host,
+ 'volume', 'info', args.volname]
+
+ if args.inet6:
+ cmd.append("--inet6")
+
+ po = Popen(cmd, bufsize=0,
stdin=None, stdout=PIPE, stderr=PIPE,
universal_newlines=True)
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index fad1a3e4f76..a3df103e76c 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -21,8 +21,8 @@ import subprocess
import socket
from subprocess import PIPE
from threading import Lock, Thread as baseThread
-from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED
-from errno import EINTR, ENOENT, ESTALE, EBUSY, errorcode
+from errno import (EACCES, EAGAIN, EPIPE, ENOTCONN, ENOMEM, ECONNABORTED,
+ EINTR, ENOENT, ESTALE, EBUSY, ENODATA, errorcode, EIO)
from signal import signal, SIGTERM
import select as oselect
from os import waitpid as owaitpid
@@ -37,10 +37,10 @@ from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE
sys.path.insert(1, GLUSTERFS_LIBEXECDIR)
EVENTS_ENABLED = True
try:
- from events.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY
- from events.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE
- from events.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE
- from events.eventtypes import GEOREP_CHECKPOINT_COMPLETED \
+ from gfevents.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY
+ from gfevents.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE
+ from gfevents.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE
+ from gfevents.eventtypes import GEOREP_CHECKPOINT_COMPLETED \
as EVENT_GEOREP_CHECKPOINT_COMPLETED
except ImportError:
# Events APIs not installed, dummy eventtypes with None
@@ -55,14 +55,15 @@ from rconf import rconf
from hashlib import sha256 as sha256
+ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
+
# auxiliary gfid based access prefix
_CL_AUX_GFID_PFX = ".gfid/"
+ROOT_GFID = "00000000-0000-0000-0000-000000000001"
GF_OP_RETRIES = 10
GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0'
-CHANGELOG_AGENT_SERVER_VERSION = 1.0
-CHANGELOG_AGENT_CLIENT_VERSION = 1.0
NodeID = None
rsync_version = None
unshare_mnt_propagation = None
@@ -99,6 +100,19 @@ def unescape_space_newline(s):
.replace(NEWLINE_ESCAPE_CHAR, "\n")\
.replace(PERCENTAGE_ESCAPE_CHAR, "%")
+# gf_mount_ready() returns 1 if all subvols are up, else 0
+def gf_mount_ready():
+ ret = errno_wrap(Xattr.lgetxattr,
+ ['.', 'dht.subvol.status', 16],
+ [ENOENT, ENOTSUP, ENODATA], [ENOMEM])
+
+ if isinstance(ret, int):
+ logging.error("failed to get the xattr value")
+ return 1
+ ret = ret.rstrip('\x00')
+ if ret == "1":
+ return 1
+ return 0
def norm(s):
if s:
@@ -330,13 +344,24 @@ def log_raise_exception(excont):
ECONNABORTED):
logging.error(lf('Gluster Mount process exited',
error=errorcode[exc.errno]))
+ elif isinstance(exc, OSError) and exc.errno == EIO:
+ logging.error("Getting \"Input/Output error\" "
+ "is most likely due to "
+ "a. Brick is down or "
+ "b. Split brain issue.")
+ logging.error("This is expected as per design to "
+ "keep the consistency of the file system. "
+ "Once the above issue is resolved "
+ "geo-replication would automatically "
+ "proceed further.")
+ logtag = "FAIL"
else:
logtag = "FAIL"
if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG):
logtag = "FULL EXCEPTION TRACE"
if logtag:
logging.exception(logtag + ": ")
- sys.stderr.write("failed with %s.\n" % type(exc).__name__)
+ sys.stderr.write("failed with %s: %s.\n" % (type(exc).__name__, exc))
excont.exval = 1
sys.exit(excont.exval)
@@ -563,7 +588,6 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]):
def lstat(e):
return errno_wrap(os.lstat, [e], [ENOENT], [ESTALE, EBUSY])
-
def get_gfid_from_mnt(gfidpath):
return errno_wrap(Xattr.lgetxattr,
[gfidpath, 'glusterfs.gfid.string',
@@ -599,7 +623,7 @@ class ChangelogException(OSError):
def gf_event(event_type, **kwargs):
if EVENTS_ENABLED:
- from events.gf_event import gf_event as gfevent
+ from gfevents.gf_event import gf_event as gfevent
gfevent(event_type, **kwargs)
@@ -670,9 +694,10 @@ def get_slv_dir_path(slv_host, slv_volume, gfid):
global slv_bricks
dir_path = ENOENT
+ pfx = gauxpfx()
if not slv_bricks:
- slv_info = Volinfo(slv_volume, slv_host)
+ slv_info = Volinfo(slv_volume, slv_host, master=False)
slv_bricks = slv_info.bricks
# Result of readlink would be of format as below.
# readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
@@ -683,19 +708,32 @@ def get_slv_dir_path(slv_host, slv_volume, gfid):
gfid[2:4],
gfid], [ENOENT], [ESTALE])
if dir_path != ENOENT:
- break
-
- if not isinstance(dir_path, int):
- realpath = errno_wrap(os.readlink, [dir_path],
- [ENOENT], [ESTALE])
-
- if not isinstance(realpath, int):
- realpath_parts = realpath.split('/')
- pargfid = realpath_parts[-2]
- basename = realpath_parts[-1]
- pfx = gauxpfx()
- dir_entry = os.path.join(pfx, pargfid, basename)
- return dir_entry
+ try:
+ realpath = errno_wrap(os.readlink, [dir_path],
+ [ENOENT], [ESTALE])
+ if not isinstance(realpath, int):
+ realpath_parts = realpath.split('/')
+ pargfid = realpath_parts[-2]
+ basename = realpath_parts[-1]
+ dir_entry = os.path.join(pfx, pargfid, basename)
+ return dir_entry
+ except OSError:
+ # .gfid/GFID
+ gfidpath = unescape_space_newline(os.path.join(pfx, gfid))
+ realpath = errno_wrap(Xattr.lgetxattr_buf,
+ [gfidpath, 'glusterfs.gfid2path'], [ENOENT], [ESTALE])
+ if not isinstance(realpath, int):
+ basename = os.path.basename(realpath).rstrip('\x00')
+ dirpath = os.path.dirname(realpath)
+ if dirpath == "/":
+ pargfid = ROOT_GFID
+ else:
+ dirpath = dirpath.strip("/")
+ pargfid = get_gfid_from_mnt(dirpath)
+ if isinstance(pargfid, int):
+ return None
+ dir_entry = os.path.join(pfx, pargfid, basename)
+ return dir_entry
return None
@@ -705,12 +743,12 @@ def lf(event, **kwargs):
Log Format helper function, log messages can be
easily modified to structured log format.
lf("Config Change", sync_jobs=4, brick=/bricks/b1) will be
- converted as "Config Change<TAB>brick=/bricks/b1<TAB>sync_jobs=4"
+ converted as "Config Change [{brick=/bricks/b1}, {sync_jobs=4}]"
"""
- msg = event
+ msgparts = []
for k, v in kwargs.items():
- msg += "\t{0}={1}".format(k, v)
- return msg
+ msgparts.append("{%s=%s}" % (k, v))
+ return "%s [%s]" % (event, ", ".join(msgparts))
class Popen(subprocess.Popen):
@@ -847,7 +885,7 @@ class Popen(subprocess.Popen):
break
b = os.read(self.stderr.fileno(), 1024)
if b:
- elines.append(b)
+ elines.append(b.decode())
else:
break
self.stderr.close()
@@ -856,10 +894,29 @@ class Popen(subprocess.Popen):
self.errfail()
+def host_brick_split(value):
+ """
+ IPv6 compatible way to split and get the host
+ and brick information. Example inputs:
+ node1.example.com:/exports/bricks/brick1/brick
+ fe80::af0f:df82:844f:ef66%utun0:/exports/bricks/brick1/brick
+ """
+ parts = value.split(":")
+ brick = parts[-1]
+ hostparts = parts[0:-1]
+ return (":".join(hostparts), brick)
+
+
class Volinfo(object):
- def __init__(self, vol, host='localhost', prelude=[]):
- po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
+ def __init__(self, vol, host='localhost', prelude=[], master=True):
+ if master:
+ gluster_cmd_dir = gconf.get("gluster-command-dir")
+ else:
+ gluster_cmd_dir = gconf.get("slave-gluster-command-dir")
+
+ gluster_cmd = os.path.join(gluster_cmd_dir, 'gluster')
+ po = Popen(prelude + [gluster_cmd, '--xml', '--remote-host=' + host,
'volume', 'info', vol],
stdout=PIPE, stderr=PIPE, universal_newlines=True)
vix = po.stdout.read()
@@ -892,7 +949,7 @@ class Volinfo(object):
@memoize
def bricks(self):
def bparse(b):
- host, dirp = b.find("name").text.split(':', 2)
+ host, dirp = host_brick_split(b.find("name").text)
return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text}
return [bparse(b) for b in self.get('brick')]
@@ -924,6 +981,14 @@ class Volinfo(object):
else:
return int(self.get('disperseCount')[0].text)
+ def distribution_count(self, tier, hot):
+ if (tier and hot):
+ return int(self.get('hotBricks/hotdistCount')[0].text)
+ elif (tier and not hot):
+ return int(self.get('coldBricks/colddistCount')[0].text)
+ else:
+ return int(self.get('distCount')[0].text)
+
@property
@memoize
def hot_bricks(self):
@@ -960,6 +1025,16 @@ class VolinfoFromGconf(object):
def is_hot(self, brickpath):
return False
+ def is_uuid(self, value):
+ try:
+ uuid.UUID(value)
+ return True
+ except ValueError:
+ return False
+
+ def possible_path(self, value):
+ return "/" in value
+
@property
@memoize
def bricks(self):
@@ -973,8 +1048,22 @@ class VolinfoFromGconf(object):
out = []
for b in bricks_data:
parts = b.split(":")
- bpath = parts[2] if len(parts) == 3 else ""
- out.append({"host": parts[1], "dir": bpath, "uuid": parts[0]})
+ b_uuid = None
+ if self.is_uuid(parts[0]):
+ b_uuid = parts[0]
+ # Set all parts except first
+ parts = parts[1:]
+
+ if self.possible_path(parts[-1]):
+ bpath = parts[-1]
+ # Set all parts except last
+ parts = parts[0:-1]
+
+ out.append({
+ "host": ":".join(parts), # if remaining parts are IPv6 name
+ "dir": bpath,
+ "uuid": b_uuid
+ })
return out
@@ -992,6 +1081,9 @@ class VolinfoFromGconf(object):
def disperse_count(self, tier, hot):
return gconf.get("master-disperse-count")
+ def distribution_count(self, tier, hot):
+ return gconf.get("master-distribution-count")
+
@property
@memoize
def hot_bricks(self):
diff --git a/geo-replication/tests/unit/test_gsyncdstatus.py b/geo-replication/tests/unit/test_gsyncdstatus.py
index 483023dbfe9..9c1aa2ad4ad 100755
--- a/geo-replication/tests/unit/test_gsyncdstatus.py
+++ b/geo-replication/tests/unit/test_gsyncdstatus.py
@@ -13,11 +13,11 @@ import unittest
import os
import urllib
-from syncdaemon.gstatus import GeorepStatus, set_monitor_status
-from syncdaemon.gstatus import get_default_values
-from syncdaemon.gstatus import MONITOR_STATUS, DEFAULT_STATUS
-from syncdaemon.gstatus import STATUS_VALUES, CRAWL_STATUS_VALUES
-from syncdaemon.gstatus import human_time, human_time_utc
+from syncdaemon.gstatus import (GeorepStatus, set_monitor_status,
+ get_default_values,
+ MONITOR_STATUS, DEFAULT_STATUS,
+ STATUS_VALUES, CRAWL_STATUS_VALUES,
+ human_time, human_time_utc)
class GeorepStatusTestCase(unittest.TestCase):