summaryrefslogtreecommitdiffstats
path: root/test/functional/test_account.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional/test_account.py')
-rwxr-xr-xtest/functional/test_account.py584
1 files changed, 536 insertions, 48 deletions
diff --git a/test/functional/test_account.py b/test/functional/test_account.py
index d456090..1cc61bc 100755
--- a/test/functional/test_account.py
+++ b/test/functional/test_account.py
@@ -17,19 +17,57 @@
import unittest
import json
+from uuid import uuid4
from nose import SkipTest
+from string import letters
from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \
MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH
from swift.common.middleware.acl import format_acl
-from test.functional.swift_test_client import Connection
-from test import get_config
-from swift_testing import check_response, retry, skip, web_front_end
+from swift_testing import (check_response, retry, skip, skip2, skip3,
+ web_front_end, requires_acls)
import swift_testing
+from test.functional.tests import load_constraint
class TestAccount(unittest.TestCase):
+ def setUp(self):
+ self.max_meta_count = load_constraint('max_meta_count')
+ self.max_meta_name_length = load_constraint('max_meta_name_length')
+ self.max_meta_overall_size = load_constraint('max_meta_overall_size')
+ self.max_meta_value_length = load_constraint('max_meta_value_length')
+
+ def head(url, token, parsed, conn):
+ conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(head)
+ self.existing_metadata = set([
+ k for k, v in resp.getheaders() if
+ k.lower().startswith('x-account-meta')])
+
+ def tearDown(self):
+ def head(url, token, parsed, conn):
+ conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(head)
+ resp.read()
+ new_metadata = set(
+ [k for k, v in resp.getheaders() if
+ k.lower().startswith('x-account-meta')])
+
+ def clear_meta(url, token, parsed, conn, remove_metadata_keys):
+ headers = {'X-Auth-Token': token}
+ headers.update((k, '') for k in remove_metadata_keys)
+ conn.request('POST', parsed.path, '', headers)
+ return check_response(conn)
+ extra_metadata = list(self.existing_metadata ^ new_metadata)
+ for i in range(0, len(extra_metadata), 90):
+ batch = extra_metadata[i:i + 90]
+ resp = retry(clear_meta, batch)
+ resp.read()
+ self.assertEqual(resp.status // 100, 2)
+
def test_metadata(self):
if skip:
raise SkipTest
@@ -49,49 +87,338 @@ class TestAccount(unittest.TestCase):
resp = retry(post, '')
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-account-meta-test'), None)
+ self.assertEqual(resp.getheader('x-account-meta-test'), None)
resp = retry(get)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-account-meta-test'), None)
+ self.assertEqual(resp.getheader('x-account-meta-test'), None)
resp = retry(post, 'Value')
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-account-meta-test'), 'Value')
+ self.assertEqual(resp.getheader('x-account-meta-test'), 'Value')
resp = retry(get)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-account-meta-test'), 'Value')
+ self.assertEqual(resp.getheader('x-account-meta-test'), 'Value')
- def test_tempauth_account_acls(self):
- if skip:
+ def test_invalid_acls(self):
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ # needs to be an acceptable header size
+ num_keys = 8
+ max_key_size = load_constraint('max_header_size') / num_keys
+ acl = {'admin': [c * max_key_size for c in letters[:num_keys]]}
+ headers = {'x-account-access-control': format_acl(
+ version=2, acl_dict=acl)}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 400)
+
+ # and again a touch smaller
+ acl = {'admin': [c * max_key_size for c in letters[:num_keys - 1]]}
+ headers = {'x-account-access-control': format_acl(
+ version=2, acl_dict=acl)}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ @requires_acls
+ def test_invalid_acl_keys(self):
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ # needs to be json
+ resp = retry(post, headers={'X-Account-Access-Control': 'invalid'},
+ use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 400)
+
+ acl_user = swift_testing.swift_test_user[1]
+ acl = {'admin': [acl_user], 'invalid_key': 'invalid_value'}
+ headers = {'x-account-access-control': format_acl(
+ version=2, acl_dict=acl)}
+
+ resp = retry(post, headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 400)
+ self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
+
+ @requires_acls
+ def test_invalid_acl_values(self):
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ acl = {'admin': 'invalid_value'}
+ headers = {'x-account-access-control': format_acl(
+ version=2, acl_dict=acl)}
+
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 400)
+ self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
+
+ @requires_acls
+ def test_read_only_acl(self):
+ if skip3:
raise SkipTest
- # Determine whether this cluster has account ACLs; if not, skip test
- conn = Connection(get_config('func_test'))
- conn.authenticate()
- status = conn.make_request(
- 'GET', '/info', cfg={'verbatim_path': True})
- if status // 100 != 2:
- # Can't tell if account ACLs are enabled; skip tests proactively.
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ # cannot read account
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant read access
+ acl_user = swift_testing.swift_test_user[2]
+ acl = {'read-only': [acl_user]}
+ headers = {'x-account-access-control': format_acl(
+ version=2, acl_dict=acl)}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # read-only can read account headers
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assert_(resp.status in (200, 204))
+ # but not acls
+ self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
+
+ # read-only can not write metadata
+ headers = {'x-account-meta-test': 'value'}
+ resp = retry(post, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 403)
+
+ # but they can read it
+ headers = {'x-account-meta-test': 'value'}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assert_(resp.status in (200, 204))
+ self.assertEqual(resp.getheader('X-Account-Meta-Test'), 'value')
+
+ @requires_acls
+ def test_read_write_acl(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ # cannot read account
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant read-write access
+ acl_user = swift_testing.swift_test_user[2]
+ acl = {'read-write': [acl_user]}
+ headers = {'x-account-access-control': format_acl(
+ version=2, acl_dict=acl)}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # read-write can read account headers
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assert_(resp.status in (200, 204))
+ # but not acls
+ self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
+
+ # read-write can not write account metadata
+ headers = {'x-account-meta-test': 'value'}
+ resp = retry(post, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 403)
+
+ @requires_acls
+ def test_admin_acl(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ # cannot read account
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant admin access
+ acl_user = swift_testing.swift_test_user[2]
+ acl = {'admin': [acl_user]}
+ acl_json_str = format_acl(version=2, acl_dict=acl)
+ headers = {'x-account-access-control': acl_json_str}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # admin can read account headers
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assert_(resp.status in (200, 204))
+ # including acls
+ self.assertEqual(resp.getheader('X-Account-Access-Control'),
+ acl_json_str)
+
+ # admin can write account metadata
+ value = str(uuid4())
+ headers = {'x-account-meta-test': value}
+ resp = retry(post, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assert_(resp.status in (200, 204))
+ self.assertEqual(resp.getheader('X-Account-Meta-Test'), value)
+
+ # admin can even revoke their own access
+ headers = {'x-account-access-control': '{}'}
+ resp = retry(post, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # and again, cannot read account
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ @requires_acls
+ def test_protected_tempurl(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ # add a account metadata, and temp-url-key to account
+ value = str(uuid4())
+ headers = {
+ 'x-account-meta-temp-url-key': 'secret',
+ 'x-account-meta-test': value,
+ }
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # grant read-only access to tester3
+ acl_user = swift_testing.swift_test_user[2]
+ acl = {'read-only': [acl_user]}
+ acl_json_str = format_acl(version=2, acl_dict=acl)
+ headers = {'x-account-access-control': acl_json_str}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # read-only tester3 can read account metadata
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assert_(resp.status in (200, 204),
+ 'Expected status in (200, 204), got %s' % resp.status)
+ self.assertEqual(resp.getheader('X-Account-Meta-Test'), value)
+ # but not temp-url-key
+ self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'), None)
+
+ # grant read-write access to tester3
+ acl_user = swift_testing.swift_test_user[2]
+ acl = {'read-write': [acl_user]}
+ acl_json_str = format_acl(version=2, acl_dict=acl)
+ headers = {'x-account-access-control': acl_json_str}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # read-write tester3 can read account metadata
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assert_(resp.status in (200, 204),
+ 'Expected status in (200, 204), got %s' % resp.status)
+ self.assertEqual(resp.getheader('X-Account-Meta-Test'), value)
+ # but not temp-url-key
+ self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'), None)
+
+ # grant admin access to tester3
+ acl_user = swift_testing.swift_test_user[2]
+ acl = {'admin': [acl_user]}
+ acl_json_str = format_acl(version=2, acl_dict=acl)
+ headers = {'x-account-access-control': acl_json_str}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # admin tester3 can read account metadata
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assert_(resp.status in (200, 204),
+ 'Expected status in (200, 204), got %s' % resp.status)
+ self.assertEqual(resp.getheader('X-Account-Meta-Test'), value)
+ # including temp-url-key
+ self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'),
+ 'secret')
+
+ # admin tester3 can even change temp-url-key
+ secret = str(uuid4())
+ headers = {
+ 'x-account-meta-temp-url-key': secret,
+ }
+ resp = retry(post, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assert_(resp.status in (200, 204),
+ 'Expected status in (200, 204), got %s' % resp.status)
+ self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'),
+ secret)
+
+ @requires_acls
+ def test_account_acls(self):
+ if skip2:
raise SkipTest
- else:
- cluster_info = json.loads(conn.response.read())
- if not cluster_info.get('tempauth', {}).get('account_acls'):
- raise SkipTest
- if 'keystoneauth' in cluster_info:
- # Unfortunate hack -- tempauth (with account ACLs) is expected
- # to play nice with Keystone (without account ACLs), but Zuul
- # functest framework doesn't give us an easy way to get a
- # tempauth user.
- raise SkipTest
def post(url, token, parsed, conn, headers):
new_headers = dict({'X-Auth-Token': token}, **headers)
@@ -212,6 +539,137 @@ class TestAccount(unittest.TestCase):
use_account=1)
resp.read()
+ @requires_acls
+ def test_swift_account_acls(self):
+ if skip:
+ raise SkipTest
+
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def head(url, token, parsed, conn):
+ conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ try:
+ # User1 can POST to their own account
+ resp = retry(post, headers={'X-Account-Access-Control': '{}'})
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
+
+ # User1 can GET their own empty account
+ resp = retry(get)
+ resp.read()
+ self.assertEqual(resp.status // 100, 2)
+ self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
+
+ # User1 can POST non-empty data
+ acl_json = '{"admin":["bob"]}'
+ resp = retry(post, headers={'X-Account-Access-Control': acl_json})
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # User1 can GET the non-empty data
+ resp = retry(get)
+ resp.read()
+ self.assertEqual(resp.status // 100, 2)
+ self.assertEqual(resp.getheader('X-Account-Access-Control'),
+ acl_json)
+
+ # POST non-JSON ACL should fail
+ resp = retry(post, headers={'X-Account-Access-Control': 'yuck'})
+ resp.read()
+ # resp.status will be 400 if tempauth or some other ACL-aware
+ # auth middleware rejects it, or 200 (but silently swallowed by
+ # core Swift) if ACL-unaware auth middleware approves it.
+
+ # A subsequent GET should show the old, valid data, not the garbage
+ resp = retry(get)
+ resp.read()
+ self.assertEqual(resp.status // 100, 2)
+ self.assertEqual(resp.getheader('X-Account-Access-Control'),
+ acl_json)
+
+ finally:
+ # Make sure to clean up even if tests fail -- User2 should not
+ # have access to User1's account in other functional tests!
+ resp = retry(post, headers={'X-Account-Access-Control': '{}'})
+ resp.read()
+
+ def test_swift_prohibits_garbage_account_acls(self):
+ if skip:
+ raise SkipTest
+
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ try:
+ # User1 can POST to their own account
+ resp = retry(post, headers={'X-Account-Access-Control': '{}'})
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
+
+ # User1 can GET their own empty account
+ resp = retry(get)
+ resp.read()
+ self.assertEqual(resp.status // 100, 2)
+ self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
+
+ # User1 can POST non-empty data
+ acl_json = '{"admin":["bob"]}'
+ resp = retry(post, headers={'X-Account-Access-Control': acl_json})
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ # If this request is handled by ACL-aware auth middleware, then the
+ # ACL will be persisted. If it is handled by ACL-unaware auth
+ # middleware, then the header will be thrown out. But the request
+ # should return successfully in any case.
+
+ # User1 can GET the non-empty data
+ resp = retry(get)
+ resp.read()
+ self.assertEqual(resp.status // 100, 2)
+ # ACL will be set if some ACL-aware auth middleware (e.g. tempauth)
+ # propagates it to sysmeta; if no ACL-aware auth middleware does,
+ # then X-Account-Access-Control will still be empty.
+
+ # POST non-JSON ACL should fail
+ resp = retry(post, headers={'X-Account-Access-Control': 'yuck'})
+ resp.read()
+ # resp.status will be 400 if tempauth or some other ACL-aware
+ # auth middleware rejects it, or 200 (but silently swallowed by
+ # core Swift) if ACL-unaware auth middleware approves it.
+
+ # A subsequent GET should either show the old, valid data (if
+ # ACL-aware auth middleware is propagating it) or show nothing
+ # (if no auth middleware in the pipeline is ACL-aware), but should
+ # never return the garbage ACL.
+ resp = retry(get)
+ resp.read()
+ self.assertEqual(resp.status // 100, 2)
+ self.assertNotEqual(resp.getheader('X-Account-Access-Control'),
+ 'yuck')
+
+ finally:
+ # Make sure to clean up even if tests fail -- User2 should not
+ # have access to User1's account in other functional tests!
+ resp = retry(post, headers={'X-Account-Access-Control': '{}'})
+ resp.read()
+
def test_unicode_metadata(self):
if skip:
raise SkipTest
@@ -233,24 +691,24 @@ class TestAccount(unittest.TestCase):
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader(uni_key.encode('utf-8')), '1')
+ self.assertEqual(resp.getheader(uni_key.encode('utf-8')), '1')
resp = retry(post, 'X-Account-Meta-uni', uni_value)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('X-Account-Meta-uni'),
- uni_value.encode('utf-8'))
+ self.assertEqual(resp.getheader('X-Account-Meta-uni'),
+ uni_value.encode('utf-8'))
if (web_front_end == 'integral'):
resp = retry(post, uni_key, uni_value)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader(uni_key.encode('utf-8')),
- uni_value.encode('utf-8'))
+ self.assertEqual(resp.getheader(uni_key.encode('utf-8')),
+ uni_value.encode('utf-8'))
def test_multi_metadata(self):
if skip:
@@ -267,19 +725,19 @@ class TestAccount(unittest.TestCase):
resp = retry(post, 'X-Account-Meta-One', '1')
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-account-meta-one'), '1')
+ self.assertEqual(resp.getheader('x-account-meta-one'), '1')
resp = retry(post, 'X-Account-Meta-Two', '2')
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-account-meta-one'), '1')
- self.assertEquals(resp.getheader('x-account-meta-two'), '2')
+ self.assertEqual(resp.getheader('x-account-meta-one'), '1')
+ self.assertEqual(resp.getheader('x-account-meta-two'), '2')
def test_bad_metadata(self):
if skip:
@@ -294,35 +752,65 @@ class TestAccount(unittest.TestCase):
resp = retry(post,
{'X-Account-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(
post,
{'X-Account-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
resp = retry(post,
{'X-Account-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(
post,
{'X-Account-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
+
+ def test_bad_metadata2(self):
+ if skip:
+ raise SkipTest
+
+ def post(url, token, parsed, conn, extra_headers):
+ headers = {'X-Auth-Token': token}
+ headers.update(extra_headers)
+ conn.request('POST', parsed.path, '', headers)
+ return check_response(conn)
+
+ # TODO: Find the test that adds these and remove them.
+ headers = {'x-remove-account-meta-temp-url-key': 'remove',
+ 'x-remove-account-meta-temp-url-key-2': 'remove'}
+ resp = retry(post, headers)
headers = {}
for x in xrange(MAX_META_COUNT):
headers['X-Account-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
headers = {}
for x in xrange(MAX_META_COUNT + 1):
headers['X-Account-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
+
+ def test_bad_metadata3(self):
+ if skip:
+ raise SkipTest
+
+ def post(url, token, parsed, conn, extra_headers):
+ headers = {'X-Auth-Token': token}
+ headers.update(extra_headers)
+ conn.request('POST', parsed.path, '', headers)
+ return check_response(conn)
+
+ # TODO: Find the test that adds these and remove them.
+ headers = {'x-remove-account-meta-temp-url-key': 'remove',
+ 'x-remove-account-meta-temp-url-key-2': 'remove'}
+ resp = retry(post, headers)
headers = {}
header_value = 'k' * MAX_META_VALUE_LENGTH
@@ -337,12 +825,12 @@ class TestAccount(unittest.TestCase):
'v' * (MAX_META_OVERALL_SIZE - size - 1)
resp = retry(post, headers)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
headers['X-Account-Meta-k'] = \
'v' * (MAX_META_OVERALL_SIZE - size)
resp = retry(post, headers)
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
if __name__ == '__main__':