diff options
Diffstat (limited to 'test/functional/swift_testing.py')
-rw-r--r-- | test/functional/swift_testing.py | 45 |
1 files changed, 45 insertions, 0 deletions
diff --git a/test/functional/swift_testing.py b/test/functional/swift_testing.py index f05cb48..2a1e1fa 100644 --- a/test/functional/swift_testing.py +++ b/test/functional/swift_testing.py @@ -19,10 +19,13 @@ import socket import sys from time import sleep from urlparse import urlparse +import functools +from nose import SkipTest from test import get_config from swiftclient import get_auth, http_connection +from test.functional.swift_test_client import Connection conf = get_config('func_test') web_front_end = conf.get('web_front_end', 'integral') @@ -184,3 +187,45 @@ def check_response(conn): resp.read() raise InternalServerError() return resp + +cluster_info = {} + + +def get_cluster_info(): + conn = Connection(conf) + conn.authenticate() + global cluster_info + cluster_info = conn.cluster_info() + + +def reset_acl(): + def post(url, token, parsed, conn): + conn.request('POST', parsed.path, '', { + 'X-Auth-Token': token, + 'X-Account-Access-Control': '{}' + }) + return check_response(conn) + resp = retry(post, use_account=1) + resp.read() + + +def requires_acls(f): + @functools.wraps(f) + def wrapper(*args, **kwargs): + if skip: + raise SkipTest + if not cluster_info: + get_cluster_info() + # Determine whether this cluster has account ACLs; if not, skip test + if not cluster_info.get('tempauth', {}).get('account_acls'): + raise SkipTest + if 'keystoneauth' in cluster_info: + # remove when keystoneauth supports account acls + raise SkipTest + reset_acl() + try: + rv = f(*args, **kwargs) + finally: + reset_acl() + return rv + return wrapper |