summaryrefslogtreecommitdiffstats
path: root/test/functional/test_account.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional/test_account.py')
-rwxr-xr-xtest/functional/test_account.py123
1 files changed, 76 insertions, 47 deletions
diff --git a/test/functional/test_account.py b/test/functional/test_account.py
index 1cc61bc..30a8e74 100755
--- a/test/functional/test_account.py
+++ b/test/functional/test_account.py
@@ -21,13 +21,11 @@ from uuid import uuid4
from nose import SkipTest
from string import letters
-from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \
- MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH
from swift.common.middleware.acl import format_acl
-from swift_testing import (check_response, retry, skip, skip2, skip3,
- web_front_end, requires_acls)
-import swift_testing
-from test.functional.tests import load_constraint
+
+from test.functional import check_response, retry, requires_acls, \
+ load_constraint
+import test.functional as tf
class TestAccount(unittest.TestCase):
@@ -69,7 +67,7 @@ class TestAccount(unittest.TestCase):
self.assertEqual(resp.status // 100, 2)
def test_metadata(self):
- if skip:
+ if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, value):
@@ -109,6 +107,9 @@ class TestAccount(unittest.TestCase):
self.assertEqual(resp.getheader('x-account-meta-test'), 'Value')
def test_invalid_acls(self):
+ if tf.skip:
+ raise SkipTest
+
def post(url, token, parsed, conn, headers):
new_headers = dict({'X-Auth-Token': token}, **headers)
conn.request('POST', parsed.path, '', new_headers)
@@ -145,7 +146,7 @@ class TestAccount(unittest.TestCase):
resp.read()
self.assertEqual(resp.status, 400)
- acl_user = swift_testing.swift_test_user[1]
+ acl_user = tf.swift_test_user[1]
acl = {'admin': [acl_user], 'invalid_key': 'invalid_value'}
headers = {'x-account-access-control': format_acl(
version=2, acl_dict=acl)}
@@ -173,7 +174,7 @@ class TestAccount(unittest.TestCase):
@requires_acls
def test_read_only_acl(self):
- if skip3:
+ if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn):
@@ -191,7 +192,7 @@ class TestAccount(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant read access
- acl_user = swift_testing.swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'read-only': [acl_user]}
headers = {'x-account-access-control': format_acl(
version=2, acl_dict=acl)}
@@ -224,7 +225,7 @@ class TestAccount(unittest.TestCase):
@requires_acls
def test_read_write_acl(self):
- if skip3:
+ if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn):
@@ -242,7 +243,7 @@ class TestAccount(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant read-write access
- acl_user = swift_testing.swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'read-write': [acl_user]}
headers = {'x-account-access-control': format_acl(
version=2, acl_dict=acl)}
@@ -265,7 +266,7 @@ class TestAccount(unittest.TestCase):
@requires_acls
def test_admin_acl(self):
- if skip3:
+ if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn):
@@ -283,7 +284,7 @@ class TestAccount(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant admin access
- acl_user = swift_testing.swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'admin': [acl_user]}
acl_json_str = format_acl(version=2, acl_dict=acl)
headers = {'x-account-access-control': acl_json_str}
@@ -323,7 +324,7 @@ class TestAccount(unittest.TestCase):
@requires_acls
def test_protected_tempurl(self):
- if skip3:
+ if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn):
@@ -335,7 +336,7 @@ class TestAccount(unittest.TestCase):
conn.request('POST', parsed.path, '', new_headers)
return check_response(conn)
- # add a account metadata, and temp-url-key to account
+ # add an account metadata, and temp-url-key to account
value = str(uuid4())
headers = {
'x-account-meta-temp-url-key': 'secret',
@@ -346,7 +347,7 @@ class TestAccount(unittest.TestCase):
self.assertEqual(resp.status, 204)
# grant read-only access to tester3
- acl_user = swift_testing.swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'read-only': [acl_user]}
acl_json_str = format_acl(version=2, acl_dict=acl)
headers = {'x-account-access-control': acl_json_str}
@@ -364,7 +365,7 @@ class TestAccount(unittest.TestCase):
self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'), None)
# grant read-write access to tester3
- acl_user = swift_testing.swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'read-write': [acl_user]}
acl_json_str = format_acl(version=2, acl_dict=acl)
headers = {'x-account-access-control': acl_json_str}
@@ -382,7 +383,7 @@ class TestAccount(unittest.TestCase):
self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'), None)
# grant admin access to tester3
- acl_user = swift_testing.swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'admin': [acl_user]}
acl_json_str = format_acl(version=2, acl_dict=acl)
headers = {'x-account-access-control': acl_json_str}
@@ -417,7 +418,7 @@ class TestAccount(unittest.TestCase):
@requires_acls
def test_account_acls(self):
- if skip2:
+ if tf.skip2:
raise SkipTest
def post(url, token, parsed, conn, headers):
@@ -464,7 +465,7 @@ class TestAccount(unittest.TestCase):
# User1 is swift_owner of their own account, so they can POST an
# ACL -- let's do this and make User2 (test_user[1]) an admin
- acl_user = swift_testing.swift_test_user[1]
+ acl_user = tf.swift_test_user[1]
acl = {'admin': [acl_user]}
headers = {'x-account-access-control': format_acl(
version=2, acl_dict=acl)}
@@ -541,7 +542,7 @@ class TestAccount(unittest.TestCase):
@requires_acls
def test_swift_account_acls(self):
- if skip:
+ if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, headers):
@@ -604,7 +605,7 @@ class TestAccount(unittest.TestCase):
resp.read()
def test_swift_prohibits_garbage_account_acls(self):
- if skip:
+ if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, headers):
@@ -671,7 +672,7 @@ class TestAccount(unittest.TestCase):
resp.read()
def test_unicode_metadata(self):
- if skip:
+ if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, name, value):
@@ -684,7 +685,7 @@ class TestAccount(unittest.TestCase):
return check_response(conn)
uni_key = u'X-Account-Meta-uni\u0E12'
uni_value = u'uni\u0E12'
- if (web_front_end == 'integral'):
+ if (tf.web_front_end == 'integral'):
resp = retry(post, uni_key, '1')
resp.read()
self.assertTrue(resp.status in (201, 204))
@@ -700,7 +701,7 @@ class TestAccount(unittest.TestCase):
self.assert_(resp.status in (200, 204), resp.status)
self.assertEqual(resp.getheader('X-Account-Meta-uni'),
uni_value.encode('utf-8'))
- if (web_front_end == 'integral'):
+ if (tf.web_front_end == 'integral'):
resp = retry(post, uni_key, uni_value)
resp.read()
self.assertEqual(resp.status, 204)
@@ -711,7 +712,7 @@ class TestAccount(unittest.TestCase):
uni_value.encode('utf-8'))
def test_multi_metadata(self):
- if skip:
+ if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, name, value):
@@ -740,7 +741,7 @@ class TestAccount(unittest.TestCase):
self.assertEqual(resp.getheader('x-account-meta-two'), '2')
def test_bad_metadata(self):
- if skip:
+ if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, extra_headers):
@@ -750,27 +751,31 @@ class TestAccount(unittest.TestCase):
return check_response(conn)
resp = retry(post,
- {'X-Account-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
+ {'X-Account-Meta-' + (
+ 'k' * self.max_meta_name_length): 'v'})
resp.read()
self.assertEqual(resp.status, 204)
resp = retry(
post,
- {'X-Account-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
+ {'X-Account-Meta-' + ('k' * (
+ self.max_meta_name_length + 1)): 'v'})
resp.read()
self.assertEqual(resp.status, 400)
resp = retry(post,
- {'X-Account-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
+ {'X-Account-Meta-Too-Long': (
+ 'k' * self.max_meta_value_length)})
resp.read()
self.assertEqual(resp.status, 204)
resp = retry(
post,
- {'X-Account-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
+ {'X-Account-Meta-Too-Long': 'k' * (
+ self.max_meta_value_length + 1)})
resp.read()
self.assertEqual(resp.status, 400)
def test_bad_metadata2(self):
- if skip:
+ if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, extra_headers):
@@ -785,20 +790,20 @@ class TestAccount(unittest.TestCase):
resp = retry(post, headers)
headers = {}
- for x in xrange(MAX_META_COUNT):
+ for x in xrange(self.max_meta_count):
headers['X-Account-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
self.assertEqual(resp.status, 204)
headers = {}
- for x in xrange(MAX_META_COUNT + 1):
+ for x in xrange(self.max_meta_count + 1):
headers['X-Account-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
self.assertEqual(resp.status, 400)
def test_bad_metadata3(self):
- if skip:
+ if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, extra_headers):
@@ -807,31 +812,55 @@ class TestAccount(unittest.TestCase):
conn.request('POST', parsed.path, '', headers)
return check_response(conn)
- # TODO: Find the test that adds these and remove them.
- headers = {'x-remove-account-meta-temp-url-key': 'remove',
- 'x-remove-account-meta-temp-url-key-2': 'remove'}
- resp = retry(post, headers)
-
headers = {}
- header_value = 'k' * MAX_META_VALUE_LENGTH
+ header_value = 'k' * self.max_meta_value_length
size = 0
x = 0
- while size < MAX_META_OVERALL_SIZE - 4 - MAX_META_VALUE_LENGTH:
- size += 4 + MAX_META_VALUE_LENGTH
+ while size < (self.max_meta_overall_size - 4
+ - self.max_meta_value_length):
+ size += 4 + self.max_meta_value_length
headers['X-Account-Meta-%04d' % x] = header_value
x += 1
- if MAX_META_OVERALL_SIZE - size > 1:
+ if self.max_meta_overall_size - size > 1:
headers['X-Account-Meta-k'] = \
- 'v' * (MAX_META_OVERALL_SIZE - size - 1)
+ 'v' * (self.max_meta_overall_size - size - 1)
resp = retry(post, headers)
resp.read()
self.assertEqual(resp.status, 204)
headers['X-Account-Meta-k'] = \
- 'v' * (MAX_META_OVERALL_SIZE - size)
+ 'v' * (self.max_meta_overall_size - size)
resp = retry(post, headers)
resp.read()
self.assertEqual(resp.status, 400)
+class TestAccountInNonDefaultDomain(unittest.TestCase):
+ def setUp(self):
+ if tf.skip or tf.skip2 or tf.skip_if_not_v3:
+ raise SkipTest('AUTH VERSION 3 SPECIFIC TEST')
+
+ def test_project_domain_id_header(self):
+ # make sure account exists (assumes account auto create)
+ def post(url, token, parsed, conn):
+ conn.request('POST', parsed.path, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ resp = retry(post, use_account=4)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # account in non-default domain should have a project domain id
+ def head(url, token, parsed, conn):
+ conn.request('HEAD', parsed.path, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ resp = retry(head, use_account=4)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ self.assertTrue('X-Account-Project-Domain-Id' in resp.headers)
+
+
if __name__ == '__main__':
unittest.main()