summaryrefslogtreecommitdiffstats
path: root/test/functional/test_container.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional/test_container.py')
-rwxr-xr-xtest/functional/test_container.py549
1 files changed, 450 insertions, 99 deletions
diff --git a/test/functional/test_container.py b/test/functional/test_container.py
index 91702e9..d7896a4 100755
--- a/test/functional/test_container.py
+++ b/test/functional/test_container.py
@@ -20,19 +20,19 @@ import unittest
from nose import SkipTest
from uuid import uuid4
-from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \
- MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH
-
-from swift_testing import check_response, retry, skip, skip2, skip3, \
- swift_test_perm, web_front_end, requires_acls, swift_test_user
+from test.functional import check_response, retry, requires_acls, \
+ load_constraint, requires_policies
+import test.functional as tf
class TestContainer(unittest.TestCase):
def setUp(self):
- if skip:
+ if tf.skip:
raise SkipTest
self.name = uuid4().hex
+ # this container isn't created by default, but will be cleaned up
+ self.container = uuid4().hex
def put(url, token, parsed, conn):
conn.request('PUT', parsed.path + '/' + self.name, '',
@@ -43,44 +43,58 @@ class TestContainer(unittest.TestCase):
resp.read()
self.assertEqual(resp.status, 201)
+ self.max_meta_count = load_constraint('max_meta_count')
+ self.max_meta_name_length = load_constraint('max_meta_name_length')
+ self.max_meta_overall_size = load_constraint('max_meta_overall_size')
+ self.max_meta_value_length = load_constraint('max_meta_value_length')
+
def tearDown(self):
- if skip:
+ if tf.skip:
raise SkipTest
- def get(url, token, parsed, conn):
- conn.request('GET', parsed.path + '/' + self.name + '?format=json',
- '', {'X-Auth-Token': token})
- return check_response(conn)
-
- def delete(url, token, parsed, conn, obj):
- conn.request('DELETE',
- '/'.join([parsed.path, self.name, obj['name']]), '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- while True:
- resp = retry(get)
- body = resp.read()
- self.assert_(resp.status // 100 == 2, resp.status)
- objs = json.loads(body)
- if not objs:
- break
- for obj in objs:
- resp = retry(delete, obj)
- resp.read()
- self.assertEqual(resp.status, 204)
-
- def delete(url, token, parsed, conn):
- conn.request('DELETE', parsed.path + '/' + self.name, '',
+ def get(url, token, parsed, conn, container):
+ conn.request(
+ 'GET', parsed.path + '/' + container + '?format=json', '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def delete(url, token, parsed, conn, container, obj):
+ conn.request(
+ 'DELETE', '/'.join([parsed.path, container, obj['name']]), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ for container in (self.name, self.container):
+ while True:
+ resp = retry(get, container)
+ body = resp.read()
+ if resp.status == 404:
+ break
+ self.assert_(resp.status // 100 == 2, resp.status)
+ objs = json.loads(body)
+ if not objs:
+ break
+ for obj in objs:
+ resp = retry(delete, container, obj)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ def delete(url, token, parsed, conn, container):
+ conn.request('DELETE', parsed.path + '/' + container, '',
{'X-Auth-Token': token})
return check_response(conn)
- resp = retry(delete)
+ resp = retry(delete, self.name)
resp.read()
self.assertEqual(resp.status, 204)
+ # container may have not been created
+ resp = retry(delete, self.container)
+ resp.read()
+ self.assert_(resp.status in (204, 404))
+
def test_multi_metadata(self):
- if skip:
+ if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, name, value):
@@ -110,7 +124,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.getheader('x-container-meta-two'), '2')
def test_unicode_metadata(self):
- if skip:
+ if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, name, value):
@@ -125,7 +139,7 @@ class TestContainer(unittest.TestCase):
uni_key = u'X-Container-Meta-uni\u0E12'
uni_value = u'uni\u0E12'
- if (web_front_end == 'integral'):
+ if (tf.web_front_end == 'integral'):
resp = retry(post, uni_key, '1')
resp.read()
self.assertEqual(resp.status, 204)
@@ -141,7 +155,7 @@ class TestContainer(unittest.TestCase):
self.assert_(resp.status in (200, 204), resp.status)
self.assertEqual(resp.getheader('X-Container-Meta-uni'),
uni_value.encode('utf-8'))
- if (web_front_end == 'integral'):
+ if (tf.web_front_end == 'integral'):
resp = retry(post, uni_key, uni_value)
resp.read()
self.assertEqual(resp.status, 204)
@@ -152,7 +166,7 @@ class TestContainer(unittest.TestCase):
uni_value.encode('utf-8'))
def test_PUT_metadata(self):
- if skip:
+ if tf.skip:
raise SkipTest
def put(url, token, parsed, conn, name, value):
@@ -209,7 +223,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.status, 204)
def test_POST_metadata(self):
- if skip:
+ if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, value):
@@ -249,7 +263,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.getheader('x-container-meta-test'), 'Value')
def test_PUT_bad_metadata(self):
- if skip:
+ if tf.skip:
raise SkipTest
def put(url, token, parsed, conn, name, extra_headers):
@@ -266,7 +280,7 @@ class TestContainer(unittest.TestCase):
name = uuid4().hex
resp = retry(
put, name,
- {'X-Container-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
+ {'X-Container-Meta-' + ('k' * self.max_meta_name_length): 'v'})
resp.read()
self.assertEqual(resp.status, 201)
resp = retry(delete, name)
@@ -275,7 +289,8 @@ class TestContainer(unittest.TestCase):
name = uuid4().hex
resp = retry(
put, name,
- {'X-Container-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
+ {'X-Container-Meta-' + (
+ 'k' * (self.max_meta_name_length + 1)): 'v'})
resp.read()
self.assertEqual(resp.status, 400)
resp = retry(delete, name)
@@ -285,7 +300,7 @@ class TestContainer(unittest.TestCase):
name = uuid4().hex
resp = retry(
put, name,
- {'X-Container-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
+ {'X-Container-Meta-Too-Long': 'k' * self.max_meta_value_length})
resp.read()
self.assertEqual(resp.status, 201)
resp = retry(delete, name)
@@ -294,7 +309,8 @@ class TestContainer(unittest.TestCase):
name = uuid4().hex
resp = retry(
put, name,
- {'X-Container-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
+ {'X-Container-Meta-Too-Long': 'k' * (
+ self.max_meta_value_length + 1)})
resp.read()
self.assertEqual(resp.status, 400)
resp = retry(delete, name)
@@ -303,7 +319,7 @@ class TestContainer(unittest.TestCase):
name = uuid4().hex
headers = {}
- for x in xrange(MAX_META_COUNT):
+ for x in xrange(self.max_meta_count):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(put, name, headers)
resp.read()
@@ -313,7 +329,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.status, 204)
name = uuid4().hex
headers = {}
- for x in xrange(MAX_META_COUNT + 1):
+ for x in xrange(self.max_meta_count + 1):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(put, name, headers)
resp.read()
@@ -324,16 +340,17 @@ class TestContainer(unittest.TestCase):
name = uuid4().hex
headers = {}
- header_value = 'k' * MAX_META_VALUE_LENGTH
+ header_value = 'k' * self.max_meta_value_length
size = 0
x = 0
- while size < MAX_META_OVERALL_SIZE - 4 - MAX_META_VALUE_LENGTH:
- size += 4 + MAX_META_VALUE_LENGTH
+ while size < (self.max_meta_overall_size - 4
+ - self.max_meta_value_length):
+ size += 4 + self.max_meta_value_length
headers['X-Container-Meta-%04d' % x] = header_value
x += 1
- if MAX_META_OVERALL_SIZE - size > 1:
+ if self.max_meta_overall_size - size > 1:
headers['X-Container-Meta-k'] = \
- 'v' * (MAX_META_OVERALL_SIZE - size - 1)
+ 'v' * (self.max_meta_overall_size - size - 1)
resp = retry(put, name, headers)
resp.read()
self.assertEqual(resp.status, 201)
@@ -342,7 +359,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.status, 204)
name = uuid4().hex
headers['X-Container-Meta-k'] = \
- 'v' * (MAX_META_OVERALL_SIZE - size)
+ 'v' * (self.max_meta_overall_size - size)
resp = retry(put, name, headers)
resp.read()
self.assertEqual(resp.status, 400)
@@ -351,7 +368,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.status, 404)
def test_POST_bad_metadata(self):
- if skip:
+ if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, extra_headers):
@@ -362,28 +379,30 @@ class TestContainer(unittest.TestCase):
resp = retry(
post,
- {'X-Container-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
+ {'X-Container-Meta-' + ('k' * self.max_meta_name_length): 'v'})
resp.read()
self.assertEqual(resp.status, 204)
resp = retry(
post,
- {'X-Container-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
+ {'X-Container-Meta-' + (
+ 'k' * (self.max_meta_name_length + 1)): 'v'})
resp.read()
self.assertEqual(resp.status, 400)
resp = retry(
post,
- {'X-Container-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
+ {'X-Container-Meta-Too-Long': 'k' * self.max_meta_value_length})
resp.read()
self.assertEqual(resp.status, 204)
resp = retry(
post,
- {'X-Container-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
+ {'X-Container-Meta-Too-Long': 'k' * (
+ self.max_meta_value_length + 1)})
resp.read()
self.assertEqual(resp.status, 400)
def test_POST_bad_metadata2(self):
- if skip:
+ if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, extra_headers):
@@ -393,20 +412,20 @@ class TestContainer(unittest.TestCase):
return check_response(conn)
headers = {}
- for x in xrange(MAX_META_COUNT):
+ for x in xrange(self.max_meta_count):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
self.assertEqual(resp.status, 204)
headers = {}
- for x in xrange(MAX_META_COUNT + 1):
+ for x in xrange(self.max_meta_count + 1):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
self.assertEqual(resp.status, 400)
def test_POST_bad_metadata3(self):
- if skip:
+ if tf.skip:
raise SkipTest
def post(url, token, parsed, conn, extra_headers):
@@ -416,27 +435,28 @@ class TestContainer(unittest.TestCase):
return check_response(conn)
headers = {}
- header_value = 'k' * MAX_META_VALUE_LENGTH
+ header_value = 'k' * self.max_meta_value_length
size = 0
x = 0
- while size < MAX_META_OVERALL_SIZE - 4 - MAX_META_VALUE_LENGTH:
- size += 4 + MAX_META_VALUE_LENGTH
+ while size < (self.max_meta_overall_size - 4
+ - self.max_meta_value_length):
+ size += 4 + self.max_meta_value_length
headers['X-Container-Meta-%04d' % x] = header_value
x += 1
- if MAX_META_OVERALL_SIZE - size > 1:
+ if self.max_meta_overall_size - size > 1:
headers['X-Container-Meta-k'] = \
- 'v' * (MAX_META_OVERALL_SIZE - size - 1)
+ 'v' * (self.max_meta_overall_size - size - 1)
resp = retry(post, headers)
resp.read()
self.assertEqual(resp.status, 204)
headers['X-Container-Meta-k'] = \
- 'v' * (MAX_META_OVERALL_SIZE - size)
+ 'v' * (self.max_meta_overall_size - size)
resp = retry(post, headers)
resp.read()
self.assertEqual(resp.status, 400)
def test_public_container(self):
- if skip:
+ if tf.skip:
raise SkipTest
def get(url, token, parsed, conn):
@@ -477,7 +497,7 @@ class TestContainer(unittest.TestCase):
self.assert_(str(err).startswith('No result after '), err)
def test_cross_account_container(self):
- if skip or skip2:
+ if tf.skip or tf.skip2:
raise SkipTest
# Obtain the first account's string
first_account = ['unknown']
@@ -505,8 +525,8 @@ class TestContainer(unittest.TestCase):
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
- 'X-Container-Read': swift_test_perm[1],
- 'X-Container-Write': swift_test_perm[1]})
+ 'X-Container-Read': tf.swift_test_perm[1],
+ 'X-Container-Write': tf.swift_test_perm[1]})
return check_response(conn)
resp = retry(post)
@@ -533,7 +553,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.status, 403)
def test_cross_account_public_container(self):
- if skip or skip2:
+ if tf.skip or tf.skip2:
raise SkipTest
# Obtain the first account's string
first_account = ['unknown']
@@ -586,7 +606,7 @@ class TestContainer(unittest.TestCase):
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
- 'X-Container-Write': swift_test_perm[1]})
+ 'X-Container-Write': tf.swift_test_perm[1]})
return check_response(conn)
resp = retry(post)
@@ -602,7 +622,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.status, 201)
def test_nonadmin_user(self):
- if skip or skip3:
+ if tf.skip or tf.skip3:
raise SkipTest
# Obtain the first account's string
first_account = ['unknown']
@@ -630,7 +650,7 @@ class TestContainer(unittest.TestCase):
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
- 'X-Container-Read': swift_test_perm[2]})
+ 'X-Container-Read': tf.swift_test_perm[2]})
return check_response(conn)
resp = retry(post)
@@ -655,7 +675,7 @@ class TestContainer(unittest.TestCase):
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
- 'X-Container-Write': swift_test_perm[2]})
+ 'X-Container-Write': tf.swift_test_perm[2]})
return check_response(conn)
resp = retry(post)
@@ -672,7 +692,7 @@ class TestContainer(unittest.TestCase):
@requires_acls
def test_read_only_acl_listings(self):
- if skip3:
+ if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn):
@@ -695,7 +715,7 @@ class TestContainer(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant read-only access
- acl_user = swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'read-only': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@@ -725,7 +745,7 @@ class TestContainer(unittest.TestCase):
@requires_acls
def test_read_only_acl_metadata(self):
- if skip3:
+ if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn, name):
@@ -760,7 +780,7 @@ class TestContainer(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant read-only access
- acl_user = swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'read-only': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@@ -782,7 +802,7 @@ class TestContainer(unittest.TestCase):
@requires_acls
def test_read_write_acl_listings(self):
- if skip3:
+ if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn):
@@ -810,7 +830,7 @@ class TestContainer(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant read-write access
- acl_user = swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'read-write': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post, headers=headers, use_account=1)
@@ -853,7 +873,7 @@ class TestContainer(unittest.TestCase):
@requires_acls
def test_read_write_acl_metadata(self):
- if skip3:
+ if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn, name):
@@ -888,7 +908,7 @@ class TestContainer(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant read-write access
- acl_user = swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'read-write': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@@ -924,7 +944,7 @@ class TestContainer(unittest.TestCase):
@requires_acls
def test_admin_acl_listing(self):
- if skip3:
+ if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn):
@@ -952,7 +972,7 @@ class TestContainer(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant admin access
- acl_user = swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'admin': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post, headers=headers, use_account=1)
@@ -995,7 +1015,7 @@ class TestContainer(unittest.TestCase):
@requires_acls
def test_admin_acl_metadata(self):
- if skip3:
+ if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn, name):
@@ -1030,7 +1050,7 @@ class TestContainer(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant access
- acl_user = swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'admin': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@@ -1066,7 +1086,7 @@ class TestContainer(unittest.TestCase):
@requires_acls
def test_protected_container_sync(self):
- if skip3:
+ if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn, name):
@@ -1100,7 +1120,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
# grant read-only access
- acl_user = swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'read-only': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@@ -1122,7 +1142,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.status, 403)
# grant read-write access
- acl_user = swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'read-write': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@@ -1160,7 +1180,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret')
# grant admin access
- acl_user = swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'admin': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@@ -1188,7 +1208,7 @@ class TestContainer(unittest.TestCase):
@requires_acls
def test_protected_container_acl(self):
- if skip3:
+ if tf.skip3:
raise SkipTest
def get(url, token, parsed, conn, name):
@@ -1224,7 +1244,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
# grant read-only access
- acl_user = swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'read-only': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@@ -1250,7 +1270,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.status, 403)
# grant read-write access
- acl_user = swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'read-write': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@@ -1292,7 +1312,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe')
# grant admin access
- acl_user = swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'admin': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@@ -1322,7 +1342,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.getheader('X-Container-Read'), '.r:*')
def test_long_name_content_type(self):
- if skip:
+ if tf.skip:
raise SkipTest
def put(url, token, parsed, conn):
@@ -1338,7 +1358,7 @@ class TestContainer(unittest.TestCase):
'text/html; charset=UTF-8')
def test_null_name(self):
- if skip:
+ if tf.skip:
raise SkipTest
def put(url, token, parsed, conn):
@@ -1347,12 +1367,343 @@ class TestContainer(unittest.TestCase):
return check_response(conn)
resp = retry(put)
- if (web_front_end == 'apache2'):
+ if (tf.web_front_end == 'apache2'):
self.assertEqual(resp.status, 404)
else:
self.assertEqual(resp.read(), 'Invalid UTF8 or contains NULL')
self.assertEqual(resp.status, 412)
+ def test_create_container_gets_default_policy_by_default(self):
+ try:
+ default_policy = \
+ tf.FunctionalStoragePolicyCollection.from_info().default
+ except AssertionError:
+ raise SkipTest()
+
+ def put(url, token, parsed, conn):
+ conn.request('PUT', parsed.path + '/' + self.container, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(put)
+ resp.read()
+ self.assertEqual(resp.status // 100, 2)
+
+ def head(url, token, parsed, conn):
+ conn.request('HEAD', parsed.path + '/' + self.container, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(head)
+ resp.read()
+ headers = dict((k.lower(), v) for k, v in resp.getheaders())
+ self.assertEquals(headers.get('x-storage-policy'),
+ default_policy['name'])
+
+ def test_error_invalid_storage_policy_name(self):
+ def put(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('PUT', parsed.path + '/' + self.container, '',
+ new_headers)
+ return check_response(conn)
+
+ # create
+ resp = retry(put, {'X-Storage-Policy': uuid4().hex})
+ resp.read()
+ self.assertEqual(resp.status, 400)
+
+ @requires_policies
+ def test_create_non_default_storage_policy_container(self):
+ policy = self.policies.exclude(default=True).select()
+
+ def put(url, token, parsed, conn, headers=None):
+ base_headers = {'X-Auth-Token': token}
+ if headers:
+ base_headers.update(headers)
+ conn.request('PUT', parsed.path + '/' + self.container, '',
+ base_headers)
+ return check_response(conn)
+ headers = {'X-Storage-Policy': policy['name']}
+ resp = retry(put, headers=headers)
+ resp.read()
+ self.assertEqual(resp.status, 201)
+
+ def head(url, token, parsed, conn):
+ conn.request('HEAD', parsed.path + '/' + self.container, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(head)
+ resp.read()
+ headers = dict((k.lower(), v) for k, v in resp.getheaders())
+ self.assertEquals(headers.get('x-storage-policy'),
+ policy['name'])
+
+ # and test recreate with-out specifying Storage Policy
+ resp = retry(put)
+ resp.read()
+ self.assertEqual(resp.status, 202)
+ # should still be original storage policy
+ resp = retry(head)
+ resp.read()
+ headers = dict((k.lower(), v) for k, v in resp.getheaders())
+ self.assertEquals(headers.get('x-storage-policy'),
+ policy['name'])
+
+ # delete it
+ def delete(url, token, parsed, conn):
+ conn.request('DELETE', parsed.path + '/' + self.container, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(delete)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # verify no policy header
+ resp = retry(head)
+ resp.read()
+ headers = dict((k.lower(), v) for k, v in resp.getheaders())
+ self.assertEquals(headers.get('x-storage-policy'), None)
+
+ @requires_policies
+ def test_conflict_change_storage_policy_with_put(self):
+ def put(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('PUT', parsed.path + '/' + self.container, '',
+ new_headers)
+ return check_response(conn)
+
+ # create
+ policy = self.policies.select()
+ resp = retry(put, {'X-Storage-Policy': policy['name']})
+ resp.read()
+ self.assertEqual(resp.status, 201)
+
+ # can't change it
+ other_policy = self.policies.exclude(name=policy['name']).select()
+ resp = retry(put, {'X-Storage-Policy': other_policy['name']})
+ resp.read()
+ self.assertEqual(resp.status, 409)
+
+ def head(url, token, parsed, conn):
+ conn.request('HEAD', parsed.path + '/' + self.container, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ # still original policy
+ resp = retry(head)
+ resp.read()
+ headers = dict((k.lower(), v) for k, v in resp.getheaders())
+ self.assertEquals(headers.get('x-storage-policy'),
+ policy['name'])
+
+ @requires_policies
+ def test_noop_change_storage_policy_with_post(self):
+ def put(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('PUT', parsed.path + '/' + self.container, '',
+ new_headers)
+ return check_response(conn)
+
+ # create
+ policy = self.policies.select()
+ resp = retry(put, {'X-Storage-Policy': policy['name']})
+ resp.read()
+ self.assertEqual(resp.status, 201)
+
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path + '/' + self.container, '',
+ new_headers)
+ return check_response(conn)
+ # attempt update
+ for header in ('X-Storage-Policy', 'X-Storage-Policy-Index'):
+ other_policy = self.policies.exclude(name=policy['name']).select()
+ resp = retry(post, {header: other_policy['name']})
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ def head(url, token, parsed, conn):
+ conn.request('HEAD', parsed.path + '/' + self.container, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ # still original policy
+ resp = retry(head)
+ resp.read()
+ headers = dict((k.lower(), v) for k, v in resp.getheaders())
+ self.assertEquals(headers.get('x-storage-policy'),
+ policy['name'])
+
+
+class BaseTestContainerACLs(unittest.TestCase):
+ # subclasses can change the account in which container
+ # is created/deleted by setUp/tearDown
+ account = 1
+
+ def _get_account(self, url, token, parsed, conn):
+ return parsed.path
+
+ def _get_tenant_id(self, url, token, parsed, conn):
+ account = parsed.path
+ return account.replace('/v1/AUTH_', '', 1)
+
+ def setUp(self):
+ if tf.skip or tf.skip2 or tf.skip_if_not_v3:
+ raise SkipTest('AUTH VERSION 3 SPECIFIC TEST')
+ self.name = uuid4().hex
+
+ def put(url, token, parsed, conn):
+ conn.request('PUT', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ resp = retry(put, use_account=self.account)
+ resp.read()
+ self.assertEqual(resp.status, 201)
+
+ def tearDown(self):
+ if tf.skip or tf.skip2 or tf.skip_if_not_v3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path + '/' + self.name + '?format=json',
+ '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def delete(url, token, parsed, conn, obj):
+ conn.request('DELETE',
+ '/'.join([parsed.path, self.name, obj['name']]), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ while True:
+ resp = retry(get, use_account=self.account)
+ body = resp.read()
+ self.assert_(resp.status // 100 == 2, resp.status)
+ objs = json.loads(body)
+ if not objs:
+ break
+ for obj in objs:
+ resp = retry(delete, obj, use_account=self.account)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ def delete(url, token, parsed, conn):
+ conn.request('DELETE', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ resp = retry(delete, use_account=self.account)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ def _assert_cross_account_acl_granted(self, granted, grantee_account, acl):
+ '''
+ Check whether a given container ACL is granted when a user specified
+ by account_b attempts to access a container.
+ '''
+ # Obtain the first account's string
+ first_account = retry(self._get_account, use_account=self.account)
+
+ # Ensure we can't access the container with the grantee account
+ def get2(url, token, parsed, conn):
+ conn.request('GET', first_account + '/' + self.name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ resp = retry(get2, use_account=grantee_account)
+ resp.read()
+ self.assertEqual(resp.status, 403)
+
+ def put2(url, token, parsed, conn):
+ conn.request('PUT', first_account + '/' + self.name + '/object',
+ 'test object', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ resp = retry(put2, use_account=grantee_account)
+ resp.read()
+ self.assertEqual(resp.status, 403)
+
+ # Post ACL to the container
+ def post(url, token, parsed, conn):
+ conn.request('POST', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token,
+ 'X-Container-Read': acl,
+ 'X-Container-Write': acl})
+ return check_response(conn)
+
+ resp = retry(post, use_account=self.account)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # Check access to container from grantee account with ACL in place
+ resp = retry(get2, use_account=grantee_account)
+ resp.read()
+ expected = 204 if granted else 403
+ self.assertEqual(resp.status, expected)
+
+ resp = retry(put2, use_account=grantee_account)
+ resp.read()
+ expected = 201 if granted else 403
+ self.assertEqual(resp.status, expected)
+
+ # Make the container private again
+ def post(url, token, parsed, conn):
+ conn.request('POST', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token, 'X-Container-Read': '',
+ 'X-Container-Write': ''})
+ return check_response(conn)
+
+ resp = retry(post, use_account=self.account)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # Ensure we can't access the container with the grantee account again
+ resp = retry(get2, use_account=grantee_account)
+ resp.read()
+ self.assertEqual(resp.status, 403)
+
+ resp = retry(put2, use_account=grantee_account)
+ resp.read()
+ self.assertEqual(resp.status, 403)
+
+
+class TestContainerACLsAccount1(BaseTestContainerACLs):
+ def test_cross_account_acl_names_with_user_in_non_default_domain(self):
+ # names in acls are disallowed when grantee is in a non-default domain
+ acl = '%s:%s' % (tf.swift_test_tenant[3], tf.swift_test_user[3])
+ self._assert_cross_account_acl_granted(False, 4, acl)
+
+ def test_cross_account_acl_ids_with_user_in_non_default_domain(self):
+ # ids are allowed in acls when grantee is in a non-default domain
+ tenant_id = retry(self._get_tenant_id, use_account=4)
+ acl = '%s:%s' % (tenant_id, '*')
+ self._assert_cross_account_acl_granted(True, 4, acl)
+
+ def test_cross_account_acl_names_in_default_domain(self):
+ # names are allowed in acls when grantee and project are in
+ # the default domain
+ acl = '%s:%s' % (tf.swift_test_tenant[1], tf.swift_test_user[1])
+ self._assert_cross_account_acl_granted(True, 2, acl)
+
+ def test_cross_account_acl_ids_in_default_domain(self):
+ # ids are allowed in acls when grantee and project are in
+ # the default domain
+ tenant_id = retry(self._get_tenant_id, use_account=2)
+ acl = '%s:%s' % (tenant_id, '*')
+ self._assert_cross_account_acl_granted(True, 2, acl)
+
+
+class TestContainerACLsAccount4(BaseTestContainerACLs):
+ account = 4
+
+ def test_cross_account_acl_names_with_project_in_non_default_domain(self):
+ # names in acls are disallowed when project is in a non-default domain
+ acl = '%s:%s' % (tf.swift_test_tenant[0], tf.swift_test_user[0])
+ self._assert_cross_account_acl_granted(False, 1, acl)
+
+ def test_cross_account_acl_ids_with_project_in_non_default_domain(self):
+ # ids are allowed in acls when project is in a non-default domain
+ tenant_id = retry(self._get_tenant_id, use_account=1)
+ acl = '%s:%s' % (tenant_id, '*')
+ self._assert_cross_account_acl_granted(True, 1, acl)
+
if __name__ == '__main__':
unittest.main()