From badf662b1521ed6158f42a6ac11024ae80d99ecc Mon Sep 17 00:00:00 2001 From: krad Date: Tue, 5 Dec 2017 13:53:27 -0800 Subject: distributed-tests: Framework for running tests on a worker cluster Summary: This framework helps distribute running of unit tests on a cluster of machines. Test Plan: Run tests on fb cluster Reviewers: sshreyas, jdarcy Change-Id: If309f504d9aa959cc8b01c85bff3b5503a890ff1 updates #374 Signed-off-by: krad --- extras/distributed-testing/README | 28 + .../distributed-testing/distributed-test-build-env | 21 + .../distributed-testing/distributed-test-build.sh | 27 + extras/distributed-testing/distributed-test-env | 144 ++++ .../distributed-testing/distributed-test-runner.py | 842 +++++++++++++++++++++ extras/distributed-testing/distributed-test.sh | 91 +++ 6 files changed, 1153 insertions(+) create mode 100644 extras/distributed-testing/README create mode 100644 extras/distributed-testing/distributed-test-build-env create mode 100755 extras/distributed-testing/distributed-test-build.sh create mode 100644 extras/distributed-testing/distributed-test-env create mode 100755 extras/distributed-testing/distributed-test-runner.py create mode 100755 extras/distributed-testing/distributed-test.sh (limited to 'extras') diff --git a/extras/distributed-testing/README b/extras/distributed-testing/README new file mode 100644 index 00000000000..928d943f211 --- /dev/null +++ b/extras/distributed-testing/README @@ -0,0 +1,28 @@ +PROBLEM + +The testing methodology of Gluster is extremely slow. It takes a very long time (6+ hrs) to run the basic tests on a single machine. It takes about 20+ hours to run code analysis version of tests like valgrind, asan, tsan etc. + +SOLUTION + +The fundamental problem is that the tests cannot be parallelized on a single machine. The natural solution is to run these tests on a cluster of machines. In a nutshell, apply map-reduce to run unit tests. + +WORK @ Facebook + +At Facebook we have applied the map-reduce approach to testing and have observed 10X improvements. + +The solution supports the following + +Distribute tests across machines, collect results/logs +Share worker pool across different testers +Try failure 3 times on 3 different machines before calling it a failure +Support running asan, valgrind, asan-noleaks +Self management of worker pools. The clients will manage the worker pool including version update, no manual maintenance required +WORK + +Port the code from gluster-fb-3.8 to gluster master + +HOW TO RUN + +./extras/distributed-testing/distributed-test.sh --hosts '

' + +All hosts should have no password for ssh via root. This can be achieved with keys setup on the client and the server machines. diff --git a/extras/distributed-testing/distributed-test-build-env b/extras/distributed-testing/distributed-test-build-env new file mode 100644 index 00000000000..4046eee8b40 --- /dev/null +++ b/extras/distributed-testing/distributed-test-build-env @@ -0,0 +1,21 @@ +#!/bin/bash + +GF_CONF_OPTS="--localstatedir=/var --sysconfdir /var/lib --prefix /usr --libdir /usr/lib64 \ + --enable-fusermount --enable-mempool --enable-api --with-jemalloc\ + --disable-tiering --with-ipv6-default --enable-gnfs" + +if [ -x /usr/lib/rpm/redhat/dist.sh ]; then + REDHAT_MAJOR=$(/usr/lib/rpm/redhat/dist.sh --distnum) +else + REDHAT_MAJOR=0 +fi + +ASAN_ENABLED=${ASAN_ENABLED:=0} +if [ "$ASAN_ENABLED" -eq "1" ]; then + GF_CONF_OPTS="$GF_CONF_OPTS --with-asan" +fi + +GF_CONF_OPTS="$GF_CONF_OPTS --with-systemd" +export GF_CONF_OPTS + +export CFLAGS="-O0 -ggdb -fPIC -Wall" diff --git a/extras/distributed-testing/distributed-test-build.sh b/extras/distributed-testing/distributed-test-build.sh new file mode 100755 index 00000000000..e8910d8425c --- /dev/null +++ b/extras/distributed-testing/distributed-test-build.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +set -e + +EXTRA_CONFIGURE_ARGS="$@" +ASAN_REQUESTED=false +for arg in $EXTRA_CONFIGURE_ARGS; do + if [ $arg == "--with-asan" ]; then + echo "Requested ASAN, cleaning build first." + make -j distclean || true + touch .with_asan + ASAN_REQUESTED=true + fi +done + +if [ $ASAN_REQUESTED == false ]; then + if [ -f .with_asan ]; then + echo "Previous build was with ASAN, cleaning build first." + make -j distclean || true + rm -v .with_asan + fi +fi + +source extras/distributed-testing/distributed-test-build-env +./autogen.sh +./configure $GF_CONF_OPTS $EXTRA_CONFIGURE_ARGS +make -j diff --git a/extras/distributed-testing/distributed-test-env b/extras/distributed-testing/distributed-test-env new file mode 100644 index 00000000000..5699adf97bf --- /dev/null +++ b/extras/distributed-testing/distributed-test-env @@ -0,0 +1,144 @@ +#!/bin/bash + +SMOKE_TESTS="\ + tests/basic/*.t\ + tests/basic/afr/*.t\ + tests/basic/distribute/*.t\ + tests/bugs/fb*.t\ + tests/features/brick-min-free-space.t\ +" + +KNOWN_FLAKY_TESTS="\ +" + +BROKEN_TESTS="\ + tests/features/recon.t\ + tests/features/ipc.t\ + tests/bugs/io-cache/bug-read-hang.t\ + tests/bugs/rpc/bug-847624.t\ + tests/encryption/crypt.t\ + tests/bugs/shard/bug-shard-zerofill.t\ + tests/bugs/shard/shard-append-test.t\ + tests/bugs/shard/zero-flag.t\ + tests/bugs/shard/bug-shard-discard.t\ + tests/bugs/rpc/bug-921072.t\ + tests/bugs/protocol/bug-1433815-auth-allow.t\ + tests/bugs/glusterfs-server/bug-904300.t\ + tests/bugs/glusterd/bug-1507466-reset-brick-commit-force.t\ + tests/bugs/glusterd/bug-948729/bug-948729-mode-script.t\ + tests/bugs/glusterd/bug-948729/bug-948729.t\ + tests/bugs/glusterd/bug-948729/bug-948729-force.t\ + tests/bugs/glusterd/bug-1482906-peer-file-blank-line.t\ + tests/bugs/glusterd/bug-889630.t\ + tests/bugs/glusterd/bug-1047955.t\ + tests/bugs/glusterd/bug-1344407-volume-delete-on-node-down.t\ + tests/bugs/glusterd/bug-888752.t\ + tests/bugs/glusterd/bug-1245142-rebalance_test.t\ + tests/bugs/glusterd/bug-1109741-auth-mgmt-handshake.t\ + tests/bugs/glusterd/bug-1223213-peerid-fix.t\ + tests/bugs/glusterd/bug-1454418-seg-fault.t\ + tests/bugs/glusterd/bug-1266818-shared-storage-disable.t\ + tests/bugs/gfapi/glfs_vol_set_IO_ERR.t\ + tests/bugs/gfapi/bug-1447266/1460514.t\ + tests/bugs/gfapi/bug-1093594.t\ + tests/bugs/gfapi/bug-1319374-THIS-crash.t\ + tests/bugs/cli/bug-1169302.t\ + tests/bugs/ec/bug-1161886.t\ + tests/bugs/glusterd/bug-1238706-daemons-stop-on-peer-cleanup.t\ + tests/bugs/gfapi/bug-1447266/bug-1447266.t\ + tests/bugs/tier/bug-1205545-CTR-and-trash-integration.t\ + tests/bugs/glusterd/bug-1091935-brick-order-check-from-cli-to-glusterd.t\ + tests/bugs/glusterd/bug-964059.t\ + tests/bugs/glusterd/bug-1230121-replica_subvol_count_correct_cal.t\ + tests/bugs/glusterd/bug-1022055.t\ + tests/bugs/glusterd/bug-1322145-disallow-detatch-peer.t\ + tests/bugs/distribute/bug-1066798.t\ + tests/bugs/posix/disallow-gfid-volumeid-fremovexattr.t\ + tests/bugs/tier/bug-1279376-rename-demoted-file.t\ + tests/bugs/glusterd/bug-1303028-Rebalance-glusterd-rpc-connection-issue.t\ + tests/bugs/posix/bug-990028.t\ + tests/bugs/snapshot/bug-1140162-file-snapshot-features-encrypt-opts-validation.t\ + tests/bugs/glusterd/bug-948686.t\ + tests/bugs/glusterd/bug-1213295-snapd-svc-uninitialized.t\ + tests/bugs/glusterd/bug-1352277-spawn-daemons-on-two-node-setup.t\ + tests/bugs/glusterd/bug-1323287-real_path-handshake-test.t\ + tests/bugs/distribute/bug-1389697.t\ + tests/bugs/glusterd/bug-1173414-mgmt-v3-remote-lock-failure.t\ + tests/bugs/md-cache/afr-stale-read.t\ + tests/bugs/snapshot/bug-1202436-calculate-quota-cksum-during-snap-restore.t\ + tests/bugs/snapshot/bug-1112613.t\ + tests/bugs/snapshot/bug-1049834.t\ + tests/bugs/glusterd/bug-1420637-volume-sync-fix.t\ + tests/bugs/glusterd/bug-1104642.t\ + tests/bugs/glusterd/bug-1177132-quorum-validation.t\ + tests/bugs/snapshot/bug-1087203.t\ + tests/bugs/glusterd/bug-1293414-import-brickinfo-uuid.t\ + tests/bugs/snapshot/bug-1512451-snapshot-creation-failed-after-brick-reset.t\ + tests/bugs/glusterd/bug-1383893-daemons-to-follow-quorum.t\ + tests/bugs/nfs/bug-1116503.t\ + tests/bugs/nfs/bug-1157223-symlink-mounting.t\ + tests/bugs/glusterd/bug-1483058-replace-brick-quorum-validation.t\ + tests/bugs/bitrot/1207029-bitrot-daemon-should-start-on-valid-node.t\ + tests/bugs/snapshot/bug-1205592.t\ + tests/bugs/replicate/bug-1473026.t\ + tests/bugs/glusterd/bug-913555.t\ + tests/basic/bd.t\ + tests/bugs/quota/bug-1287996.t\ + tests/bugs/bitrot/1209752-volume-status-should-show-bitrot-scrub-info.t\ + tests/bugs/quota/bug-1288474.t\ + tests/bugs/snapshot/bug-1482023-snpashot-issue-with-other-processes-accessing-mounted-path.t\ + tests/basic/gfapi/bug-1241104.t\ + tests/basic/gfapi/glfs_sysrq.t\ + tests/basic/gfapi/glfd-lkowner.t\ + tests/basic/gfapi/upcall-register-api.t\ + tests/basic/gfapi/bug1291259.t\ + tests/basic/gfapi/gfapi-dup.t\ + tests/basic/gfapi/anonymous_fd.t\ + tests/basic/gfapi/gfapi-trunc.t\ + tests/basic/gfapi/glfs_xreaddirplus_r.t\ + tests/basic/gfapi/upcall-cache-invalidate.t\ + tests/basic/gfapi/gfapi-async-calls-test.t\ + tests/basic/gfapi/gfapi-ssl-test.t\ + tests/bugs/glusterd/bug-1245045-remove-brick-validation.t\ + tests/basic/ec/ec-seek.t\ + tests/bugs/glusterd/bug-1345727-bricks-stop-on-no-quorum-validation.t\ + tests/bugs/glusterd/bug-1367478-volume-start-validation-after-glusterd-restart.t\ + tests/basic/meta.t\ + tests/geo-rep/georep-basic-dr-tarssh.t\ + tests/basic/tier/file_with_spaces.t\ + tests/basic/tier/bug-1214222-directories_missing_after_attach_tier.t\ + tests/basic/tier/readdir-during-migration.t\ + tests/basic/tier/ctr-rename-overwrite.t\ + tests/basic/tier/tier_lookup_heal.t\ + tests/basic/tier/record-metadata-heat.t\ + tests/basic/tier/bug-1260185-donot-allow-detach-commit-unnecessarily.t\ + tests/basic/tier/locked_file_migration.t\ + tests/bugs/glusterd/bug-1231437-rebalance-test-in-cluster.t\ + tests/basic/tier/tier-heald.t\ + tests/basic/tier/frequency-counters.t\ + tests/basic/glusterd/arbiter-volume-probe.t\ + tests/basic/tier/tier-snapshot.t\ + tests/basic/glusterd/volfile_server_switch.t\ + tests/geo-rep/georep-basic-dr-rsync.t\ + tests/basic/distribute/rebal-all-nodes-migrate.t\ + tests/basic/jbr/jbr.t\ + tests/basic/volume-snapshot.t\ + tests/basic/afr/granular-esh/cli.t\ + tests/bitrot/bug-1294786.t\ + tests/basic/mgmt_v3-locks.t\ + tests/basic/tier/tierd_check.t\ + tests/basic/volume-snapshot-clone.t\ + tests/bugs/replicate/bug-1290965-detect-bitrotten-objects.t\ + tests/basic/tier/fops-during-migration.t\ + tests/basic/tier/fops-during-migration-pause.t\ + tests/basic/tier/unlink-during-migration.t\ + tests/basic/tier/new-tier-cmds.t\ + tests/basic/tier/tier.t\ + tests/bugs/readdir-ahead/bug-1436090.t\ + tests/basic/tier/legacy-many.t\ + tests/basic/gfapi/gfapi-load-volfile.t +" + +SMOKE_TESTS=$(echo $SMOKE_TESTS | tr -s ' ' ' ') +KNOWN_FLAKY_TESTS=$(echo $KNOWN_FLAKY_TESTS | tr -s ' ' ' ') +BROKEN_TESTS=$(echo $BROKEN_TESTS | tr -s ' ' ' ') diff --git a/extras/distributed-testing/distributed-test-runner.py b/extras/distributed-testing/distributed-test-runner.py new file mode 100755 index 00000000000..9a74b7ab5c5 --- /dev/null +++ b/extras/distributed-testing/distributed-test-runner.py @@ -0,0 +1,842 @@ +#!/usr/bin/env python + +from __future__ import absolute_import +from __future__ import division +from __future__ import unicode_literals +from __future__ import print_function +import re +import sys +import fcntl +import base64 +import threading +import socket +import os +import shlex +import argparse +import subprocess +import time +import SimpleXMLRPCServer +import xmlrpclib +import md5 +import httplib +import uuid + +DEFAULT_PORT = 9999 +TEST_TIMEOUT_S = 15 * 60 +CLIENT_CONNECT_TIMEOUT_S = 10 +CLIENT_TIMEOUT_S = 60 +PATCH_FILE_UID = str(uuid.uuid4()) +SSH_TIMEOUT_S = 10 +MAX_ATTEMPTS = 3 + + +def patch_file(): + return "/tmp/%s-patch.tar.gz" % PATCH_FILE_UID + +# .............................................................................. +# SimpleXMLRPCServer IPv6 Wrapper +# .............................................................................. + + +class IPv6SimpleXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer): + def __init__(self, addr): + SimpleXMLRPCServer.SimpleXMLRPCServer.__init__(self, addr) + + def server_bind(self): + if self.socket: + self.socket.close() + self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + SimpleXMLRPCServer.SimpleXMLRPCServer.server_bind(self) + + +class IPv6HTTPConnection(httplib.HTTPConnection): + def __init__(self, host): + self.host = host + httplib.HTTPConnection.__init__(self, host) + + def connect(self): + old_timeout = socket.getdefaulttimeout() + self.sock = socket.create_connection((self.host, self.port), + timeout=CLIENT_CONNECT_TIMEOUT_S) + self.sock.settimeout(old_timeout) + + +class IPv6Transport(xmlrpclib.Transport): + def __init__(self, *args, **kwargs): + xmlrpclib.Transport.__init__(self, *args, **kwargs) + + def make_connection(self, host): + return IPv6HTTPConnection(host) + + +# .............................................................................. +# Common +# .............................................................................. + + +class Timer: + def __init__(self): + self.start = time.time() + + def elapsed_s(self): + return int(time.time() - self.start) + + def reset(self): + ret = self.elapsed_s() + self.start = time.time() + return ret + + +def encode(buf): + return base64.b16encode(buf) + + +def decode(buf): + return base64.b16decode(buf) + + +def get_file_content(path): + with open(path, "r") as f: + return f.read() + + +def write_to_file(path, data): + with open(path, "w") as f: + f.write(data) + + +def failsafe(fn, args=()): + try: + return (True, fn(*args)) + except (xmlrpclib.Fault, xmlrpclib.ProtocolError, xmlrpclib.ResponseError, + Exception) as err: + Log.debug(str(err)) + return (False, None) + + +class LogLevel: + DEBUG = 2 + ERROR = 1 + CLI = 0 + + +class Log: + LOGLEVEL = LogLevel.ERROR + + @staticmethod + def _normalize(msg): + return msg[:100] + + @staticmethod + def debug(msg): + if Log.LOGLEVEL >= LogLevel.DEBUG: + sys.stdout.write(" %s\n" % Log._normalize(msg)) + sys.stdout.flush() + + @staticmethod + def error(msg): + sys.stderr.write(" %s\n" % Log._normalize(msg)) + + @staticmethod + def header(msg): + sys.stderr.write("* %s *\n" % Log._normalize(msg)) + + @staticmethod + def cli(msg): + sys.stderr.write("%s\n" % msg) + + +class Shell: + def __init__(self, cwd=None, logpath=None): + self.cwd = cwd + self.shell = True + self.redirect = open(os.devnull if not logpath else logpath, "wr+") + + def __del__(self): + self.redirect.close() + + def cd(self, cwd): + Log.debug("cd %s" % cwd) + self.cwd = cwd + + def truncate(self): + self.redirect.truncate(0) + + def read_logs(self): + self.redirect.seek(0) + return self.redirect.read() + + def check_call(self, cmd): + status = self.call(cmd) + if status: + raise Exception("Error running command %s. status=%s" + % (cmd, status)) + + def call(self, cmd): + if isinstance(cmd, list): + return self._calls(cmd) + + return self._call(cmd) + + def ssh(self, hostname, cmd, id_rsa=None): + flags = "" if not id_rsa else "-i " + id_rsa + return self.call("timeout %s ssh %s root@%s \"%s\"" % + (SSH_TIMEOUT_S, flags, hostname, cmd)) + + def scp(self, hostname, src, dest, id_rsa=None): + flags = "" if not id_rsa else "-i " + id_rsa + return self.call("timeout %s scp %s %s root@%s:%s" % + (SSH_TIMEOUT_S, flags, src, hostname, dest)) + + def output(self, cmd, cwd=None): + Log.debug("%s> %s" % (cwd, cmd)) + return subprocess.check_output(shlex.split(cmd), cwd=self.cwd) + + def _calls(self, cmds): + Log.debug("Running commands. %s" % cmds) + for c in cmds: + status = self.call(c) + if status: + Log.error("Commands failed with %s" % status) + return status + return 0 + + def _call(self, cmd): + if not self.shell: + cmd = shlex.split(cmd) + + Log.debug("%s> %s" % (self.cwd, cmd)) + + status = subprocess.call(cmd, cwd=self.cwd, shell=self.shell, + stdout=self.redirect, stderr=self.redirect) + + Log.debug("return %s" % status) + return status + + +# .............................................................................. +# Server role +# .............................................................................. + +class TestServer: + def __init__(self, port, scratchdir): + self.port = port + self.scratchdir = scratchdir + self.shell = Shell() + self.rpc = None + self.pidf = None + + self.shell.check_call("mkdir -p %s" % self.scratchdir) + self._process_lock() + + def __del__(self): + if self.pidf: + self.pidf.close() + + def init(self): + Log.debug("Starting xmlrpc server on port %s" % self.port) + self.rpc = IPv6SimpleXMLRPCServer(("", self.port)) + self.rpc.register_instance(Handlers(self.scratchdir)) + + def serve(self): + (status, _) = failsafe(self.rpc.serve_forever) + Log.cli("== End ==") + + def _process_lock(self): + pid_filename = os.path.basename(__file__).replace("/", "-") + pid_filepath = "%s/%s.pid" % (self.scratchdir, pid_filename) + self.pidf = open(pid_filepath, "w") + try: + fcntl.lockf(self.pidf, fcntl.LOCK_EX | fcntl.LOCK_NB) + # We have the lock, kick anybody listening on this port + self.shell.call("kill $(lsof -t -i:%s)" % self.port) + except IOError: + Log.error("Another process instance is running") + sys.exit(0) + +# +# Server Handler +# + + +handler_lock = threading.Lock() +handler_serving_since = Timer() + + +def synchronized(func): + def decorator(*args, **kws): + handler_lock.acquire() + h = args[0] + try: + h.shell.truncate() + ret = func(*args, **kws) + return ret + except Exception() as err: + Log.error(str(err)) + Log.error(decode(h._log_content())) + raise + finally: + handler_lock.release() + handler_serving_since.reset() + + return decorator + + +class Handlers: + def __init__(self, scratchdir): + self.client_id = None + self.scratchdir = scratchdir + self.gluster_root = "%s/glusterfs" % self.scratchdir + self.shell = Shell(logpath="%s/test-handlers.log" % self.scratchdir) + + def hello(self, id): + if not handler_lock.acquire(False): + return False + try: + return self._hello_locked(id) + finally: + handler_lock.release() + + def _hello_locked(self, id): + if handler_serving_since.elapsed_s() > CLIENT_TIMEOUT_S: + Log.debug("Disconnected client %s" % self.client_id) + self.client_id = None + + if not self.client_id: + self.client_id = id + handler_serving_since.reset() + return True + + return (id == self.client_id) + + @synchronized + def ping(self, id=None): + if id: + return id == self.client_id + return True + + @synchronized + def bye(self, id): + assert id == self.client_id + self.client_id = None + handler_serving_since.reset() + return True + + @synchronized + def cleanup(self, id): + assert id == self.client_id + self.shell.cd(self.gluster_root) + self.shell.check_call("PATH=.:$PATH; sudo ./clean_gfs_devserver.sh") + return True + + @synchronized + def copy(self, id, name, content): + with open("%s/%s" % (self.scratchdir, name), "w+") as f: + f.write(decode(content)) + return True + + @synchronized + def copygzip(self, id, content): + assert id == self.client_id + gzipfile = "%s/tmp.tar.gz" % self.scratchdir + tarfile = "%s/tmp.tar" % self.scratchdir + self.shell.check_call("rm -f %s" % gzipfile) + self.shell.check_call("rm -f %s" % tarfile) + write_to_file(gzipfile, decode(content)) + + self.shell.cd(self.scratchdir) + self.shell.check_call("rm -r -f %s" % self.gluster_root) + self.shell.check_call("mkdir -p %s" % self.gluster_root) + + self.shell.cd(self.gluster_root) + cmds = [ + "gunzip -f -q %s" % gzipfile, + "tar -xvf %s" % tarfile + ] + return self.shell.call(cmds) == 0 + + @synchronized + def build(self, id, asan=False): + assert id == self.client_id + self.shell.cd(self.gluster_root) + self.shell.call("make clean") + env = "ASAN_ENABLED=1" if asan else "" + return self.shell.call( + "%s ./extras/distributed-testing/distributed-test-build.sh" % env) == 0 + + @synchronized + def install(self, id): + assert id == self.client_id + self.shell.cd(self.gluster_root) + return self.shell.call("make install") == 0 + + @synchronized + def prove(self, id, test, timeout, valgrind=False, asan_noleaks=True): + assert id == self.client_id + self.shell.cd(self.gluster_root) + env = "DEBUG=1 " + if valgrind: + cmd = "valgrind" + cmd += " --tool=memcheck --leak-check=full --track-origins=yes" + cmd += " --show-leak-kinds=all -v prove -v" + elif asan_noleaks: + cmd = "prove -v" + env += "ASAN_OPTIONS=detect_leaks=0 " + else: + cmd = "prove -v" + + status = self.shell.call( + "%s timeout %s %s %s" % (env, timeout, cmd, test)) + + if status != 0: + return (False, self._log_content()) + return (True, "") + + def _log_content(self): + return encode(self.shell.read_logs()) + +# .............................................................................. +# Cli role +# .............................................................................. + + +class RPCConnection((threading.Thread)): + def __init__(self, host, port, path, cb): + threading.Thread.__init__(self) + self.host = host + self.port = port + self.path = path + self.shell = Shell() + self.cb = cb + self.stop = False + self.proxy = None + self.logid = "%s:%s" % (self.host, self.port) + + def connect(self): + (status, ret) = failsafe(self._connect) + return (status and ret) + + def _connect(self): + url = "http://%s:%s" % (self.host, self.port) + self.proxy = xmlrpclib.ServerProxy(url, transport=IPv6Transport()) + return self.proxy.hello(self.cb.id) + + def disconnect(self): + self.stop = True + + def ping(self): + return self.proxy.ping() + + def init(self): + return self._copy() and self._compile_and_install() + + def run(self): + (status, ret) = failsafe(self.init) + if not status: + self.cb.note_lost_connection(self) + return + elif not ret: + self.cb.note_setup_failed(self) + return + + while not self.stop: + (status, ret) = failsafe(self._run) + if not status or not ret: + self.cb.note_lost_connection(self) + break + time.sleep(0) + + failsafe(self.proxy.bye, (self.cb.id,)) + Log.debug("%s connection thread stopped" % self.host) + + def _run(self): + test = self.cb.next_test() + (status, _) = failsafe(self._execute_next, (test,)) + if not status: + self.cb.note_retry(test) + return False + return True + + def _execute_next(self, test): + if not test: + time.sleep(1) + return + + (status, error) = self.proxy.prove(self.cb.id, test, + self.cb.test_timeout, + self.cb.valgrind, + self.cb.asan_noleaks) + if status: + self.cb.note_done(test) + else: + self.cb.note_error(test, error) + + def _compile_and_install(self): + Log.debug("<%s> Build " % self.logid) + asan = self.cb.asan or self.cb.asan_noleaks + return (self.proxy.build(self.cb.id, asan) and + self.proxy.install(self.cb.id)) + + def _copy(self): + return self._copy_gzip() + + def _copy_gzip(self): + Log.cli("<%s> copying and compiling %s to remote" % + (self.logid, self.path)) + data = encode(get_file_content(patch_file())) + Log.debug("GZIP size = %s B" % len(data)) + return self.proxy.copygzip(self.cb.id, data) + + +class RPCConnectionPool: + def __init__(self, gluster_path, hosts, n, id_rsa): + self.gluster_path = gluster_path + self.hosts = hosts + self.conns = [] + self.faulty = [] + self.n = int(len(hosts) / 2) + 1 if not n else n + self.id_rsa = id_rsa + self.stop = False + self.scanner = threading.Thread(target=self._scan_hosts_loop) + self.kicker = threading.Thread(target=self._kick_hosts_loop) + self.shell = Shell() + self.since_start = Timer() + + self.shell.check_call("rm -f %s" % patch_file()) + self.shell.check_call("tar -zcvf %s ." % patch_file()) + self.id = md5.new(get_file_content(patch_file())).hexdigest() + Log.cli("client UID %s" % self.id) + Log.cli("patch UID %s" % PATCH_FILE_UID) + + def __del__(self): + self.shell.check_call("rm -f %s" % patch_file()) + + def pool_status(self): + elapsed_m = int(self.since_start.elapsed_s() / 60) + return "%s/%s connected, %smin elapsed" % (len(self.conns), self.n, + elapsed_m) + + def connect(self): + Log.debug("Starting scanner") + self.scanner.start() + self.kicker.start() + + def disconnect(self): + self.stop = True + for conn in self.conns: + conn.disconnect() + + def note_lost_connection(self, conn): + Log.cli("lost connection to %s" % conn.host) + self.conns.remove(conn) + self.hosts.append((conn.host, conn.port)) + + def note_setup_failed(self, conn): + Log.error("Setup failed on %s:%s" % (conn.host, conn.port)) + self.conns.remove(conn) + self.faulty.append((conn.host, conn.port)) + + def _scan_hosts_loop(self): + Log.debug("Scanner thread started") + while not self.stop: + failsafe(self._scan_hosts) + time.sleep(5) + + def _scan_hosts(self): + if len(self.hosts) == 0 and len(self.conns) == 0: + Log.error("no more hosts available to loadbalance") + sys.exit(1) + + for (host, port) in self.hosts: + if (len(self.conns) >= self.n) or self.stop: + break + self._scan_host(host, port) + + def _scan_host(self, host, port): + Log.debug("scanning %s:%s" % (host, port)) + c = RPCConnection(host, port, self.gluster_path, self) + (status, result) = failsafe(c.connect) + if status and result: + self.hosts.remove((host, port)) + Log.debug("Connected to %s:%s" % (host, port)) + self.conns.append(c) + c.start() + Log.debug("%s / %s connected" % (len(self.conns), self.n)) + else: + Log.debug("Failed to connect to %s:%s" % (host, port)) + + def _kick_hosts_loop(self): + Log.debug("Kick thread started") + while not self.stop: + time.sleep(10) + failsafe(self._kick_hosts) + + Log.debug("Kick thread stopped") + + def _is_pingable(self, host, port): + c = RPCConnection(host, port, self.gluster_path, self) + failsafe(c.connect) + (status, result) = failsafe(c.ping) + return status and result + + def _kick_hosts(self): + # Do not kick hosts if we have the optimal number of connections + if (len(self.conns) >= self.n) or self.stop: + Log.debug("Skip kicking hosts") + return + + # Check and if dead kick all hosts + for (host, port) in self.hosts: + if self.stop: + Log.debug("Break kicking hosts") + break + + if self._is_pingable(host, port): + Log.debug("Host=%s is alive. Won't kick" % host) + continue + + Log.debug("Kicking %s" % host) + mypath = sys.argv[0] + myname = os.path.basename(mypath) + destpath = "/tmp/%s" % myname + sh = Shell() + sh.scp(host, mypath, destpath, self.id_rsa) + sh.ssh(host, "nohup %s --server &>> %s.log &" % + (destpath, destpath), self.id_rsa) + + def join(self): + self.scanner.join() + self.kicker.join() + for c in self.conns: + c.join() + + +# .............................................................................. +# test role +# .............................................................................. + +class TestRunner(RPCConnectionPool): + def __init__(self, gluster_path, hosts, n, tests, flaky_tests, valgrind, + asan, asan_noleaks, id_rsa, test_timeout): + RPCConnectionPool.__init__(self, gluster_path, self._parse_hosts(hosts), + n, id_rsa) + self.flaky_tests = flaky_tests.split(" ") + self.pending = [] + self.done = [] + self.error = [] + self.retry = {} + self.error_logs = [] + self.stats_timer = Timer() + self.valgrind = valgrind + self.asan = asan + self.asan_noleaks = asan_noleaks + self.test_timeout = test_timeout + + self.tests = self._get_tests(tests) + + Log.debug("tests: %s" % self.tests) + + def _get_tests(self, tests): + if not tests or tests == "all": + return self._not_flaky(self._all()) + elif tests == "flaky": + return self.flaky_tests + else: + return self._not_flaky(tests.strip().split(" ")) + + def run(self): + self.connect() + self.join() + return len(self.error) + + def _pretty_print(self, data): + if isinstance(data, list): + str = "" + for i in data: + str = "%s %s" % (str, i) + return str + return "%s" % data + + def print_result(self): + Log.cli("== RESULTS ==") + Log.cli("SUCCESS : %s" % len(self.done)) + Log.cli("ERRORS : %s" % len(self.error)) + Log.cli("== ERRORS ==") + Log.cli(self._pretty_print(self.error)) + Log.cli("== LOGS ==") + Log.cli(self._pretty_print(self.error_logs)) + Log.cli("== END ==") + + def next_test(self): + if len(self.tests): + test = self.tests.pop() + self.pending.append(test) + return test + + if not len(self.pending): + self.disconnect() + + return None + + def _pct_completed(self): + total = len(self.tests) + len(self.pending) + len(self.done) + total += len(self.error) + completed = len(self.done) + len(self.error) + return 0 if not total else int(completed / total * 100) + + def note_done(self, test): + Log.cli("%s PASS (%s%% done) (%s)" % (test, self._pct_completed(), + self.pool_status())) + self.pending.remove(test) + self.done.append(test) + if test in self.retry: + del self.retry[test] + + def note_error(self, test, errstr): + Log.cli("%s FAIL" % test) + self.pending.remove(test) + if test not in self.retry: + self.retry[test] = 1 + + if errstr: + path = "%s/%s-%s.log" % ("/tmp", test.replace("/", "-"), + self.retry[test]) + failsafe(write_to_file, (path, decode(errstr),)) + self.error_logs.append(path) + + if self.retry[test] < MAX_ATTEMPTS: + self.retry[test] += 1 + Log.debug("retry test %s attempt %s" % (test, self.retry[test])) + self.tests.append(test) + else: + Log.debug("giveup attempt test %s" % test) + del self.retry[test] + self.error.append(test) + + def note_retry(self, test): + Log.cli("retry %s on another host" % test) + self.pending.remove(test) + self.tests.append(test) + + # + # test classifications + # + def _all(self): + return self._list_tests(["tests"], recursive=True) + + def _not_flaky(self, tests): + for t in self.flaky_tests: + if t in tests: + tests.remove(t) + return tests + + def _list_tests(self, prefixes, recursive=False, ignore_ifnotexist=False): + tests = [] + for prefix in prefixes: + real_path = "%s/%s" % (self.gluster_path, prefix) + if not os.path.exists(real_path) and ignore_ifnotexist: + continue + for f in os.listdir(real_path): + if os.path.isdir(real_path + "/" + f): + if recursive: + tests += self._list_tests([prefix + "/" + f], recursive) + else: + if re.match(r".*\.t$", f): + tests += [prefix + "/" + f] + return tests + + def _parse_hosts(self, hosts): + ret = [] + for h in args.hosts.split(" "): + ret.append((h, DEFAULT_PORT)) + Log.debug(ret) + return ret + +# .............................................................................. +# Roles entry point +# .............................................................................. + + +def run_as_server(args): + if not args.server_path: + Log.error("please provide server path") + return 1 + + server = TestServer(args.port, args.server_path) + server.init() + server.serve() + return 0 + + +def run_as_tester(args): + Log.header("GLUSTER TEST CLI") + + Log.debug("args = %s" % args) + + tests = TestRunner(args.gluster_path, args.hosts, args.n, args.tests, + args.flaky_tests, valgrind=args.valgrind, + asan=args.asan, asan_noleaks=args.asan_noleaks, + id_rsa=args.id_rsa, test_timeout=args.test_timeout) + result = tests.run() + tests.print_result() + return result + +# .............................................................................. +# main +# .............................................................................. + + +def main(args): + if args.v: + Log.LOGLEVEL = LogLevel.DEBUG + + if args.server and args.tester: + Log.error("Invalid arguments. More than one role specified") + sys.exit(1) + + if args.server: + sys.exit(run_as_server(args)) + elif args.tester: + sys.exit(run_as_tester(args)) + else: + Log.error("please specify a mode for CI") + parser.print_help() + sys.exit(1) + + +parser = argparse.ArgumentParser(description="Gluster CI") + +# server role +parser.add_argument("--server", help="start server", action="store_true") +parser.add_argument("--server_path", help="server scratch space", + default="/tmp/gluster-test") +parser.add_argument("--host", help="server address to listen", default="") +parser.add_argument("--port", help="server port to listen", + type=int, default=DEFAULT_PORT) +# test role +parser.add_argument("--tester", help="start tester", action="store_true") +parser.add_argument("--valgrind", help="run tests under valgrind", + action="store_true") +parser.add_argument("--asan", help="test with asan enabled", + action="store_true") +parser.add_argument("--asan-noleaks", help="test with asan but no mem leaks", + action="store_true") +parser.add_argument("--tests", help="all/flaky/list of tests", default=None) +parser.add_argument("--flaky_tests", help="list of flaky tests", default=None) +parser.add_argument("--n", help="max number of machines to use", type=int, + default=0) +parser.add_argument("--hosts", help="list of worker machines") +parser.add_argument("--gluster_path", help="gluster path to test", + default=os.getcwd()) +parser.add_argument("--id-rsa", help="private key to use for ssh", + default=None) +parser.add_argument("--test-timeout", + help="test timeout in sec (default 15min)", + default=TEST_TIMEOUT_S) +# general +parser.add_argument("-v", help="verbose", action="store_true") + +args = parser.parse_args() + +main(args) diff --git a/extras/distributed-testing/distributed-test.sh b/extras/distributed-testing/distributed-test.sh new file mode 100755 index 00000000000..1ceff033cba --- /dev/null +++ b/extras/distributed-testing/distributed-test.sh @@ -0,0 +1,91 @@ +#!/bin/bash + +source ./extras/distributed-testing/distributed-test-env + +N=0 +TESTS='all' +FLAKY=$KNOWN_FLAKY_TESTS +BROKEN=$BROKEN_TESTS +TEST_TIMEOUT_S=900 + +FLAGS="" + +function print_env { + echo "Settings:" + echo "N=$N" + echo -e "-------\nHOSTS\n$HOSTS\n-------" + echo -e "TESTS\n$TESTS\n-------" + echo -e "SKIP\n$FLAKY $BROKEN\n-------" + echo -e "TEST_TIMEOUT_S=$TEST_TIMEOUT_S s\n" +} + +function cleanup { + rm -f /tmp/test-*.log +} + +function usage { + echo "Usage: $0 [-h or --help] [-v or --verbose] + [--all] [--flaky] [--smoke] [--broken] + [--valgrind] [--asan] [--asan-noleaks] + [--hosts ] [-n ] + [--tests ] + [--id-rsa ] + " +} + +function parse_args () { + args=`getopt \ + -o hvn: \ + --long help,verbose,valgrind,asan,asan-noleaks,all,\ +smoke,flaky,broken,hosts:,tests:,id-rsa:,test-timeout: \ + -n 'fb-remote-test.sh' -- "$@"` + + if [ $? != 0 ]; then + echo "Error parsing getopt" + exit 1 + fi + + eval set -- "$args" + + while true; do + case "$1" in + -h | --help) usage ; exit 1 ;; + -v | --verbose) FLAGS="$FLAGS -v" ; shift ;; + --valgrind) FLAGS="$FLAGS --valgrind" ; shift ;; + --asan-noleaks) FLAGS="$FLAGS --asan-noleaks"; shift ;; + --asan) FLAGS="$FLAGS --asan" ; shift ;; + --hosts) HOSTS=$2; shift 2 ;; + --tests) TESTS=$2; FLAKY= ; BROKEN= ; shift 2 ;; + --test-timeout) TEST_TIMEOUT_S=$2; shift 2 ;; + --all) TESTS='all' ; shift 1 ;; + --flaky) TESTS=$FLAKY; FLAKY= ; shift 1 ;; + --smoke) TESTS=$SMOKE_TESTS; shift 1 ;; + --broken) TESTS=$BROKEN_TESTS; FLAKY= ; BROKEN= ; shift 1 ;; + --id-rsa) FLAGS="$FLAGS --id-rsa $2" ; shift 2 ;; + -n) N=$2; shift 2 ;; + *) break ;; + esac + done + run_tests_args="$@" +} + +function main { + parse_args "$@" + + if [ -z "$HOSTS" ]; then + echo "Please provide hosts to run the tests in" + exit -1 + fi + + print_env + + cleanup + + "extras/distributed-testing/distributed-test-runner.py" $FLAGS --tester \ + --n "$N" --hosts "$HOSTS" --tests "$TESTS" \ + --flaky_tests "$FLAKY $BROKEN" --test-timeout "$TEST_TIMEOUT_S" + + exit $? +} + +main "$@" -- cgit