summaryrefslogtreecommitdiffstats
path: root/test/functional
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional')
-rw-r--r--test/functional/gluster_swift_tests.py91
-rw-r--r--test/functional/swift_test_client.py47
-rw-r--r--test/functional/swift_testing.py45
-rwxr-xr-xtest/functional/test_account.py584
-rwxr-xr-xtest/functional/test_container.py833
-rwxr-xr-xtest/functional/test_object.py524
-rw-r--r--test/functional/tests.py470
7 files changed, 2289 insertions, 305 deletions
diff --git a/test/functional/gluster_swift_tests.py b/test/functional/gluster_swift_tests.py
index 2768f9d..b4514c9 100644
--- a/test/functional/gluster_swift_tests.py
+++ b/test/functional/gluster_swift_tests.py
@@ -58,44 +58,13 @@ class TestFile(Base):
data_read = file.read()
self.assertEquals(data,data_read)
- def testInvalidHeadersPUT(self):
- #TODO: Although we now support x-delete-at and x-delete-after,
- #retained this test case as we may add some other header to
- #unsupported list in future
- raise SkipTest()
- file = self.env.container.file(Utils.create_name())
- self.assertRaises(ResponseError,
- file.write_random,
- self.env.file_size,
- hdrs={'X-Delete-At': '9876545321'})
- self.assert_status(400)
- self.assertRaises(ResponseError,
- file.write_random,
- self.env.file_size,
- hdrs={'X-Delete-After': '60'})
- self.assert_status(400)
-
- def testInvalidHeadersPOST(self):
- #TODO: Although we now support x-delete-at and x-delete-after,
- #retained this test case as we may add some other header to
- #unsupported list in future
- raise SkipTest()
- file = self.env.container.file(Utils.create_name())
- file.write_random(self.env.file_size)
- headers = file.make_headers(cfg={})
- headers.update({ 'X-Delete-At' : '987654321'})
- # Need to call conn.make_request instead of file.sync_metadata
- # because sync_metadata calls make_headers. make_headers()
- # overwrites any headers in file.metadata as 'user' metadata
- # by appending 'X-Object-Meta-' to any of the headers
- # in file.metadata.
- file.conn.make_request('POST', file.path, hdrs=headers, cfg={})
- self.assertEqual(400, file.conn.response.status)
-
- headers = file.make_headers(cfg={})
- headers.update({ 'X-Delete-After' : '60'})
- file.conn.make_request('POST', file.path, hdrs=headers, cfg={})
- self.assertEqual(400, file.conn.response.status)
+ def test_PUT_large_object(self):
+ file_item = self.env.container.file(Utils.create_name())
+ data = File.random_data(1024 * 1024 * 2)
+ self.assertTrue(file_item.write(data))
+ self.assert_status(201)
+ self.assertTrue(data == file_item.read())
+ self.assert_status(200)
class TestFileUTF8(Base2, TestFile):
@@ -375,3 +344,49 @@ class TestMultiProtocolAccess(Base):
md5_returned = hashlib.md5(data_read_from_mountP).hexdigest()
self.assertEquals(md5_returned,file_info['etag'])
fhOnMountPoint.close()
+
+ def testObjectMetadataWhenFileModified(self):
+ data = "I'm whatever Gotham needs me to be "
+ data_hash = hashlib.md5(data).hexdigest()
+ # Create an object through object interface
+ object_name = Utils.create_name()
+ object_item = self.env.container.file(object_name)
+ object_item.write(data)
+ # Make sure GET works
+ self.assertEqual(data, object_item.read())
+ self.assert_status(200)
+ # Check Etag is right
+ self.assertEqual(data_hash, object_item.info()['etag'])
+ self.assert_status(200)
+
+ # Extend/append more data to file from filesystem interface
+ file_path = os.path.join(self.env.root_dir,
+ self.env.container.name,
+ object_name)
+ more_data = "- Batman"
+ with open(file_path, 'a') as f:
+ f.write(more_data)
+ total_data = data + more_data
+ total_data_hash = hashlib.md5(total_data).hexdigest()
+ # Make sure GET works
+ self.assertEqual(total_data, object_item.read())
+ self.assert_status(200)
+ # Check Etag and content-length is right
+ metadata = object_item.info()
+ self.assert_status(200)
+ self.assertEqual(total_data_hash, metadata['etag'])
+ self.assertEqual(len(total_data), int(metadata['content_length']))
+
+ # Re-write the file to be shorter
+ new_data = "I am Batman"
+ new_data_hash = hashlib.md5(new_data).hexdigest()
+ with open(file_path, 'w') as f:
+ f.write(new_data)
+ # Make sure GET works
+ self.assertEqual(new_data, object_item.read())
+ self.assert_status(200)
+ # Check Etag and content-length is right
+ metadata = object_item.info()
+ self.assert_status(200)
+ self.assertEqual(new_data_hash, metadata['etag'])
+ self.assertEqual(len(new_data), int(metadata['content_length']))
diff --git a/test/functional/swift_test_client.py b/test/functional/swift_test_client.py
index a7c7c96..27e025b 100644
--- a/test/functional/swift_test_client.py
+++ b/test/functional/swift_test_client.py
@@ -144,6 +144,7 @@ class Connection(object):
auth_scheme = 'https://' if self.auth_ssl else 'http://'
auth_netloc = "%s:%d" % (self.auth_host, self.auth_port)
auth_url = auth_scheme + auth_netloc + auth_path
+
(storage_url, storage_token) = get_auth(
auth_url, auth_user, self.password, snet=False,
tenant_name=self.account, auth_version=self.auth_version,
@@ -166,17 +167,29 @@ class Connection(object):
self.storage_host = x[2].split(':')[0]
if ':' in x[2]:
self.storage_port = int(x[2].split(':')[1])
- # Make sure storage_url and the storage_token are
- # strings and not unicode, since
+ # Make sure storage_url is a string and not unicode, since
# keystoneclient (called by swiftclient) returns them in
# unicode and this would cause troubles when doing
# no_safe_quote query.
self.storage_url = str('/%s/%s' % (x[3], x[4]))
- self.storage_token = str(storage_token)
+
+ self.storage_token = storage_token
self.http_connect()
return self.storage_url, self.storage_token
+ def cluster_info(self):
+ """
+ Retrieve the data in /info, or {} on 404
+ """
+ status = self.make_request('GET', '/info',
+ cfg={'absolute_path': True})
+ if status == 404:
+ return {}
+ if not 200 <= status <= 299:
+ raise ResponseError(self.response, 'GET', '/info')
+ return json.loads(self.response.read())
+
def http_connect(self):
self.connection = self.conn_class(self.storage_host,
port=self.storage_port)
@@ -207,8 +220,8 @@ class Connection(object):
def make_request(self, method, path=[], data='', hdrs={}, parms={},
cfg={}):
- if not cfg.get('verbatim_path'):
- # Set verbatim_path=True to make a request to exactly the given
+ if not cfg.get('absolute_path'):
+ # Set absolute_path=True to make a request to exactly the given
# path, not storage path + given path. Useful for
# non-account/container/object requests.
path = self.make_path(path, cfg=cfg)
@@ -305,7 +318,7 @@ class Connection(object):
return self.response.status
-class Base:
+class Base(object):
def __str__(self):
return self.name
@@ -339,6 +352,16 @@ class Account(Base):
self.conn = conn
self.name = str(name)
+ def update_metadata(self, metadata={}, cfg={}):
+ headers = dict(("X-Account-Meta-%s" % k, v)
+ for k, v in metadata.items())
+
+ self.conn.make_request('POST', self.path, hdrs=headers, cfg=cfg)
+ if not 200 <= self.conn.response.status <= 299:
+ raise ResponseError(self.conn.response, 'POST',
+ self.conn.make_path(self.path))
+ return True
+
def container(self, container_name):
return Container(self.conn, self.name, container_name)
@@ -532,6 +555,11 @@ class File(Base):
else:
headers['Content-Type'] = 'application/octet-stream'
+ if cfg.get('x_delete_at'):
+ headers['X-Delete-At'] = cfg.get('x_delete_at')
+ if cfg.get('x_delete_after'):
+ headers['X-Delete-After'] = cfg.get('x_delete_after')
+
for key in self.metadata:
headers['X-Object-Meta-' + key] = self.metadata[key]
@@ -588,7 +616,11 @@ class File(Base):
['last_modified', 'last-modified'],
['etag', 'etag']]
- header_fields = self.header_fields(fields)
+ optional_fields = [['x_delete_at', 'x-delete-at'],
+ ['x_delete_after', 'x-delete-after']]
+
+ header_fields = self.header_fields(fields,
+ optional_fields=optional_fields)
header_fields['etag'] = header_fields['etag'].strip('"')
return header_fields
@@ -705,7 +737,6 @@ class File(Base):
cfg.get('set_content_length')
else:
headers['Content-Length'] = 0
-
self.conn.make_request('POST', self.path, hdrs=headers, cfg=cfg)
if self.conn.response.status not in (201, 202):
diff --git a/test/functional/swift_testing.py b/test/functional/swift_testing.py
index f05cb48..2a1e1fa 100644
--- a/test/functional/swift_testing.py
+++ b/test/functional/swift_testing.py
@@ -19,10 +19,13 @@ import socket
import sys
from time import sleep
from urlparse import urlparse
+import functools
+from nose import SkipTest
from test import get_config
from swiftclient import get_auth, http_connection
+from test.functional.swift_test_client import Connection
conf = get_config('func_test')
web_front_end = conf.get('web_front_end', 'integral')
@@ -184,3 +187,45 @@ def check_response(conn):
resp.read()
raise InternalServerError()
return resp
+
+cluster_info = {}
+
+
+def get_cluster_info():
+ conn = Connection(conf)
+ conn.authenticate()
+ global cluster_info
+ cluster_info = conn.cluster_info()
+
+
+def reset_acl():
+ def post(url, token, parsed, conn):
+ conn.request('POST', parsed.path, '', {
+ 'X-Auth-Token': token,
+ 'X-Account-Access-Control': '{}'
+ })
+ return check_response(conn)
+ resp = retry(post, use_account=1)
+ resp.read()
+
+
+def requires_acls(f):
+ @functools.wraps(f)
+ def wrapper(*args, **kwargs):
+ if skip:
+ raise SkipTest
+ if not cluster_info:
+ get_cluster_info()
+ # Determine whether this cluster has account ACLs; if not, skip test
+ if not cluster_info.get('tempauth', {}).get('account_acls'):
+ raise SkipTest
+ if 'keystoneauth' in cluster_info:
+ # remove when keystoneauth supports account acls
+ raise SkipTest
+ reset_acl()
+ try:
+ rv = f(*args, **kwargs)
+ finally:
+ reset_acl()
+ return rv
+ return wrapper
diff --git a/test/functional/test_account.py b/test/functional/test_account.py
index d456090..1cc61bc 100755
--- a/test/functional/test_account.py
+++ b/test/functional/test_account.py
@@ -17,19 +17,57 @@
import unittest
import json
+from uuid import uuid4
from nose import SkipTest
+from string import letters
from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \
MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH
from swift.common.middleware.acl import format_acl
-from test.functional.swift_test_client import Connection
-from test import get_config
-from swift_testing import check_response, retry, skip, web_front_end
+from swift_testing import (check_response, retry, skip, skip2, skip3,
+ web_front_end, requires_acls)
import swift_testing
+from test.functional.tests import load_constraint
class TestAccount(unittest.TestCase):
+ def setUp(self):
+ self.max_meta_count = load_constraint('max_meta_count')
+ self.max_meta_name_length = load_constraint('max_meta_name_length')
+ self.max_meta_overall_size = load_constraint('max_meta_overall_size')
+ self.max_meta_value_length = load_constraint('max_meta_value_length')
+
+ def head(url, token, parsed, conn):
+ conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(head)
+ self.existing_metadata = set([
+ k for k, v in resp.getheaders() if
+ k.lower().startswith('x-account-meta')])
+
+ def tearDown(self):
+ def head(url, token, parsed, conn):
+ conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(head)
+ resp.read()
+ new_metadata = set(
+ [k for k, v in resp.getheaders() if
+ k.lower().startswith('x-account-meta')])
+
+ def clear_meta(url, token, parsed, conn, remove_metadata_keys):
+ headers = {'X-Auth-Token': token}
+ headers.update((k, '') for k in remove_metadata_keys)
+ conn.request('POST', parsed.path, '', headers)
+ return check_response(conn)
+ extra_metadata = list(self.existing_metadata ^ new_metadata)
+ for i in range(0, len(extra_metadata), 90):
+ batch = extra_metadata[i:i + 90]
+ resp = retry(clear_meta, batch)
+ resp.read()
+ self.assertEqual(resp.status // 100, 2)
+
def test_metadata(self):
if skip:
raise SkipTest
@@ -49,49 +87,338 @@ class TestAccount(unittest.TestCase):
resp = retry(post, '')
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-account-meta-test'), None)
+ self.assertEqual(resp.getheader('x-account-meta-test'), None)
resp = retry(get)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-account-meta-test'), None)
+ self.assertEqual(resp.getheader('x-account-meta-test'), None)
resp = retry(post, 'Value')
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-account-meta-test'), 'Value')
+ self.assertEqual(resp.getheader('x-account-meta-test'), 'Value')
resp = retry(get)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-account-meta-test'), 'Value')
+ self.assertEqual(resp.getheader('x-account-meta-test'), 'Value')
- def test_tempauth_account_acls(self):
- if skip:
+ def test_invalid_acls(self):
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ # needs to be an acceptable header size
+ num_keys = 8
+ max_key_size = load_constraint('max_header_size') / num_keys
+ acl = {'admin': [c * max_key_size for c in letters[:num_keys]]}
+ headers = {'x-account-access-control': format_acl(
+ version=2, acl_dict=acl)}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 400)
+
+ # and again a touch smaller
+ acl = {'admin': [c * max_key_size for c in letters[:num_keys - 1]]}
+ headers = {'x-account-access-control': format_acl(
+ version=2, acl_dict=acl)}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ @requires_acls
+ def test_invalid_acl_keys(self):
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ # needs to be json
+ resp = retry(post, headers={'X-Account-Access-Control': 'invalid'},
+ use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 400)
+
+ acl_user = swift_testing.swift_test_user[1]
+ acl = {'admin': [acl_user], 'invalid_key': 'invalid_value'}
+ headers = {'x-account-access-control': format_acl(
+ version=2, acl_dict=acl)}
+
+ resp = retry(post, headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 400)
+ self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
+
+ @requires_acls
+ def test_invalid_acl_values(self):
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ acl = {'admin': 'invalid_value'}
+ headers = {'x-account-access-control': format_acl(
+ version=2, acl_dict=acl)}
+
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 400)
+ self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
+
+ @requires_acls
+ def test_read_only_acl(self):
+ if skip3:
raise SkipTest
- # Determine whether this cluster has account ACLs; if not, skip test
- conn = Connection(get_config('func_test'))
- conn.authenticate()
- status = conn.make_request(
- 'GET', '/info', cfg={'verbatim_path': True})
- if status // 100 != 2:
- # Can't tell if account ACLs are enabled; skip tests proactively.
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ # cannot read account
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant read access
+ acl_user = swift_testing.swift_test_user[2]
+ acl = {'read-only': [acl_user]}
+ headers = {'x-account-access-control': format_acl(
+ version=2, acl_dict=acl)}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # read-only can read account headers
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assert_(resp.status in (200, 204))
+ # but not acls
+ self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
+
+ # read-only can not write metadata
+ headers = {'x-account-meta-test': 'value'}
+ resp = retry(post, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 403)
+
+ # but they can read it
+ headers = {'x-account-meta-test': 'value'}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assert_(resp.status in (200, 204))
+ self.assertEqual(resp.getheader('X-Account-Meta-Test'), 'value')
+
+ @requires_acls
+ def test_read_write_acl(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ # cannot read account
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant read-write access
+ acl_user = swift_testing.swift_test_user[2]
+ acl = {'read-write': [acl_user]}
+ headers = {'x-account-access-control': format_acl(
+ version=2, acl_dict=acl)}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # read-write can read account headers
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assert_(resp.status in (200, 204))
+ # but not acls
+ self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
+
+ # read-write can not write account metadata
+ headers = {'x-account-meta-test': 'value'}
+ resp = retry(post, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 403)
+
+ @requires_acls
+ def test_admin_acl(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ # cannot read account
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant admin access
+ acl_user = swift_testing.swift_test_user[2]
+ acl = {'admin': [acl_user]}
+ acl_json_str = format_acl(version=2, acl_dict=acl)
+ headers = {'x-account-access-control': acl_json_str}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # admin can read account headers
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assert_(resp.status in (200, 204))
+ # including acls
+ self.assertEqual(resp.getheader('X-Account-Access-Control'),
+ acl_json_str)
+
+ # admin can write account metadata
+ value = str(uuid4())
+ headers = {'x-account-meta-test': value}
+ resp = retry(post, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assert_(resp.status in (200, 204))
+ self.assertEqual(resp.getheader('X-Account-Meta-Test'), value)
+
+ # admin can even revoke their own access
+ headers = {'x-account-access-control': '{}'}
+ resp = retry(post, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # and again, cannot read account
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ @requires_acls
+ def test_protected_tempurl(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ # add a account metadata, and temp-url-key to account
+ value = str(uuid4())
+ headers = {
+ 'x-account-meta-temp-url-key': 'secret',
+ 'x-account-meta-test': value,
+ }
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # grant read-only access to tester3
+ acl_user = swift_testing.swift_test_user[2]
+ acl = {'read-only': [acl_user]}
+ acl_json_str = format_acl(version=2, acl_dict=acl)
+ headers = {'x-account-access-control': acl_json_str}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # read-only tester3 can read account metadata
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assert_(resp.status in (200, 204),
+ 'Expected status in (200, 204), got %s' % resp.status)
+ self.assertEqual(resp.getheader('X-Account-Meta-Test'), value)
+ # but not temp-url-key
+ self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'), None)
+
+ # grant read-write access to tester3
+ acl_user = swift_testing.swift_test_user[2]
+ acl = {'read-write': [acl_user]}
+ acl_json_str = format_acl(version=2, acl_dict=acl)
+ headers = {'x-account-access-control': acl_json_str}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # read-write tester3 can read account metadata
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assert_(resp.status in (200, 204),
+ 'Expected status in (200, 204), got %s' % resp.status)
+ self.assertEqual(resp.getheader('X-Account-Meta-Test'), value)
+ # but not temp-url-key
+ self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'), None)
+
+ # grant admin access to tester3
+ acl_user = swift_testing.swift_test_user[2]
+ acl = {'admin': [acl_user]}
+ acl_json_str = format_acl(version=2, acl_dict=acl)
+ headers = {'x-account-access-control': acl_json_str}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # admin tester3 can read account metadata
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assert_(resp.status in (200, 204),
+ 'Expected status in (200, 204), got %s' % resp.status)
+ self.assertEqual(resp.getheader('X-Account-Meta-Test'), value)
+ # including temp-url-key
+ self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'),
+ 'secret')
+
+ # admin tester3 can even change temp-url-key
+ secret = str(uuid4())
+ headers = {
+ 'x-account-meta-temp-url-key': secret,
+ }
+ resp = retry(post, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assert_(resp.status in (200, 204),
+ 'Expected status in (200, 204), got %s' % resp.status)
+ self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'),
+ secret)
+
+ @requires_acls
+ def test_account_acls(self):
+ if skip2:
raise SkipTest
- else:
- cluster_info = json.loads(conn.response.read())
- if not cluster_info.get('tempauth', {}).get('account_acls'):
- raise SkipTest
- if 'keystoneauth' in cluster_info:
- # Unfortunate hack -- tempauth (with account ACLs) is expected
- # to play nice with Keystone (without account ACLs), but Zuul
- # functest framework doesn't give us an easy way to get a
- # tempauth user.
- raise SkipTest
def post(url, token, parsed, conn, headers):
new_headers = dict({'X-Auth-Token': token}, **headers)
@@ -212,6 +539,137 @@ class TestAccount(unittest.TestCase):
use_account=1)
resp.read()
+ @requires_acls
+ def test_swift_account_acls(self):
+ if skip:
+ raise SkipTest
+
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def head(url, token, parsed, conn):
+ conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ try:
+ # User1 can POST to their own account
+ resp = retry(post, headers={'X-Account-Access-Control': '{}'})
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
+
+ # User1 can GET their own empty account
+ resp = retry(get)
+ resp.read()
+ self.assertEqual(resp.status // 100, 2)
+ self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
+
+ # User1 can POST non-empty data
+ acl_json = '{"admin":["bob"]}'
+ resp = retry(post, headers={'X-Account-Access-Control': acl_json})
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # User1 can GET the non-empty data
+ resp = retry(get)
+ resp.read()
+ self.assertEqual(resp.status // 100, 2)
+ self.assertEqual(resp.getheader('X-Account-Access-Control'),
+ acl_json)
+
+ # POST non-JSON ACL should fail
+ resp = retry(post, headers={'X-Account-Access-Control': 'yuck'})
+ resp.read()
+ # resp.status will be 400 if tempauth or some other ACL-aware
+ # auth middleware rejects it, or 200 (but silently swallowed by
+ # core Swift) if ACL-unaware auth middleware approves it.
+
+ # A subsequent GET should show the old, valid data, not the garbage
+ resp = retry(get)
+ resp.read()
+ self.assertEqual(resp.status // 100, 2)
+ self.assertEqual(resp.getheader('X-Account-Access-Control'),
+ acl_json)
+
+ finally:
+ # Make sure to clean up even if tests fail -- User2 should not
+ # have access to User1's account in other functional tests!
+ resp = retry(post, headers={'X-Account-Access-Control': '{}'})
+ resp.read()
+
+ def test_swift_prohibits_garbage_account_acls(self):
+ if skip:
+ raise SkipTest
+
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ try:
+ # User1 can POST to their own account
+ resp = retry(post, headers={'X-Account-Access-Control': '{}'})
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
+
+ # User1 can GET their own empty account
+ resp = retry(get)
+ resp.read()
+ self.assertEqual(resp.status // 100, 2)
+ self.assertEqual(resp.getheader('X-Account-Access-Control'), None)
+
+ # User1 can POST non-empty data
+ acl_json = '{"admin":["bob"]}'
+ resp = retry(post, headers={'X-Account-Access-Control': acl_json})
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ # If this request is handled by ACL-aware auth middleware, then the
+ # ACL will be persisted. If it is handled by ACL-unaware auth
+ # middleware, then the header will be thrown out. But the request
+ # should return successfully in any case.
+
+ # User1 can GET the non-empty data
+ resp = retry(get)
+ resp.read()
+ self.assertEqual(resp.status // 100, 2)
+ # ACL will be set if some ACL-aware auth middleware (e.g. tempauth)
+ # propagates it to sysmeta; if no ACL-aware auth middleware does,
+ # then X-Account-Access-Control will still be empty.
+
+ # POST non-JSON ACL should fail
+ resp = retry(post, headers={'X-Account-Access-Control': 'yuck'})
+ resp.read()
+ # resp.status will be 400 if tempauth or some other ACL-aware
+ # auth middleware rejects it, or 200 (but silently swallowed by
+ # core Swift) if ACL-unaware auth middleware approves it.
+
+ # A subsequent GET should either show the old, valid data (if
+ # ACL-aware auth middleware is propagating it) or show nothing
+ # (if no auth middleware in the pipeline is ACL-aware), but should
+ # never return the garbage ACL.
+ resp = retry(get)
+ resp.read()
+ self.assertEqual(resp.status // 100, 2)
+ self.assertNotEqual(resp.getheader('X-Account-Access-Control'),
+ 'yuck')
+
+ finally:
+ # Make sure to clean up even if tests fail -- User2 should not
+ # have access to User1's account in other functional tests!
+ resp = retry(post, headers={'X-Account-Access-Control': '{}'})
+ resp.read()
+
def test_unicode_metadata(self):
if skip:
raise SkipTest
@@ -233,24 +691,24 @@ class TestAccount(unittest.TestCase):
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader(uni_key.encode('utf-8')), '1')
+ self.assertEqual(resp.getheader(uni_key.encode('utf-8')), '1')
resp = retry(post, 'X-Account-Meta-uni', uni_value)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('X-Account-Meta-uni'),
- uni_value.encode('utf-8'))
+ self.assertEqual(resp.getheader('X-Account-Meta-uni'),
+ uni_value.encode('utf-8'))
if (web_front_end == 'integral'):
resp = retry(post, uni_key, uni_value)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader(uni_key.encode('utf-8')),
- uni_value.encode('utf-8'))
+ self.assertEqual(resp.getheader(uni_key.encode('utf-8')),
+ uni_value.encode('utf-8'))
def test_multi_metadata(self):
if skip:
@@ -267,19 +725,19 @@ class TestAccount(unittest.TestCase):
resp = retry(post, 'X-Account-Meta-One', '1')
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-account-meta-one'), '1')
+ self.assertEqual(resp.getheader('x-account-meta-one'), '1')
resp = retry(post, 'X-Account-Meta-Two', '2')
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-account-meta-one'), '1')
- self.assertEquals(resp.getheader('x-account-meta-two'), '2')
+ self.assertEqual(resp.getheader('x-account-meta-one'), '1')
+ self.assertEqual(resp.getheader('x-account-meta-two'), '2')
def test_bad_metadata(self):
if skip:
@@ -294,35 +752,65 @@ class TestAccount(unittest.TestCase):
resp = retry(post,
{'X-Account-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(
post,
{'X-Account-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
resp = retry(post,
{'X-Account-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(
post,
{'X-Account-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
+
+ def test_bad_metadata2(self):
+ if skip:
+ raise SkipTest
+
+ def post(url, token, parsed, conn, extra_headers):
+ headers = {'X-Auth-Token': token}
+ headers.update(extra_headers)
+ conn.request('POST', parsed.path, '', headers)
+ return check_response(conn)
+
+ # TODO: Find the test that adds these and remove them.
+ headers = {'x-remove-account-meta-temp-url-key': 'remove',
+ 'x-remove-account-meta-temp-url-key-2': 'remove'}
+ resp = retry(post, headers)
headers = {}
for x in xrange(MAX_META_COUNT):
headers['X-Account-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
headers = {}
for x in xrange(MAX_META_COUNT + 1):
headers['X-Account-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
+
+ def test_bad_metadata3(self):
+ if skip:
+ raise SkipTest
+
+ def post(url, token, parsed, conn, extra_headers):
+ headers = {'X-Auth-Token': token}
+ headers.update(extra_headers)
+ conn.request('POST', parsed.path, '', headers)
+ return check_response(conn)
+
+ # TODO: Find the test that adds these and remove them.
+ headers = {'x-remove-account-meta-temp-url-key': 'remove',
+ 'x-remove-account-meta-temp-url-key-2': 'remove'}
+ resp = retry(post, headers)
headers = {}
header_value = 'k' * MAX_META_VALUE_LENGTH
@@ -337,12 +825,12 @@ class TestAccount(unittest.TestCase):
'v' * (MAX_META_OVERALL_SIZE - size - 1)
resp = retry(post, headers)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
headers['X-Account-Meta-k'] = \
'v' * (MAX_META_OVERALL_SIZE - size)
resp = retry(post, headers)
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
if __name__ == '__main__':
diff --git a/test/functional/test_container.py b/test/functional/test_container.py
index 15f7fc1..91702e9 100755
--- a/test/functional/test_container.py
+++ b/test/functional/test_container.py
@@ -24,7 +24,7 @@ from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \
MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH
from swift_testing import check_response, retry, skip, skip2, skip3, \
- swift_test_perm, web_front_end
+ swift_test_perm, web_front_end, requires_acls, swift_test_user
class TestContainer(unittest.TestCase):
@@ -41,7 +41,7 @@ class TestContainer(unittest.TestCase):
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
def tearDown(self):
if skip:
@@ -68,7 +68,7 @@ class TestContainer(unittest.TestCase):
for obj in objs:
resp = retry(delete, obj)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
def delete(url, token, parsed, conn):
conn.request('DELETE', parsed.path + '/' + self.name, '',
@@ -77,7 +77,7 @@ class TestContainer(unittest.TestCase):
resp = retry(delete)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
def test_multi_metadata(self):
if skip:
@@ -95,19 +95,19 @@ class TestContainer(unittest.TestCase):
resp = retry(post, 'X-Container-Meta-One', '1')
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-one'), '1')
+ self.assertEqual(resp.getheader('x-container-meta-one'), '1')
resp = retry(post, 'X-Container-Meta-Two', '2')
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-one'), '1')
- self.assertEquals(resp.getheader('x-container-meta-two'), '2')
+ self.assertEqual(resp.getheader('x-container-meta-one'), '1')
+ self.assertEqual(resp.getheader('x-container-meta-two'), '2')
def test_unicode_metadata(self):
if skip:
@@ -128,28 +128,28 @@ class TestContainer(unittest.TestCase):
if (web_front_end == 'integral'):
resp = retry(post, uni_key, '1')
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader(uni_key.encode('utf-8')), '1')
+ self.assertEqual(resp.getheader(uni_key.encode('utf-8')), '1')
resp = retry(post, 'X-Container-Meta-uni', uni_value)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('X-Container-Meta-uni'),
- uni_value.encode('utf-8'))
+ self.assertEqual(resp.getheader('X-Container-Meta-uni'),
+ uni_value.encode('utf-8'))
if (web_front_end == 'integral'):
resp = retry(post, uni_key, uni_value)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader(uni_key.encode('utf-8')),
- uni_value.encode('utf-8'))
+ self.assertEqual(resp.getheader(uni_key.encode('utf-8')),
+ uni_value.encode('utf-8'))
def test_PUT_metadata(self):
if skip:
@@ -179,34 +179,34 @@ class TestContainer(unittest.TestCase):
name = uuid4().hex
resp = retry(put, name, 'Value')
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
resp = retry(head, name)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
+ self.assertEqual(resp.getheader('x-container-meta-test'), 'Value')
resp = retry(get, name)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
+ self.assertEqual(resp.getheader('x-container-meta-test'), 'Value')
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
name = uuid4().hex
resp = retry(put, name, '')
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
resp = retry(head, name)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), None)
+ self.assertEqual(resp.getheader('x-container-meta-test'), None)
resp = retry(get, name)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), None)
+ self.assertEqual(resp.getheader('x-container-meta-test'), None)
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
def test_POST_metadata(self):
if skip:
@@ -231,22 +231,22 @@ class TestContainer(unittest.TestCase):
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), None)
+ self.assertEqual(resp.getheader('x-container-meta-test'), None)
resp = retry(get)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), None)
+ self.assertEqual(resp.getheader('x-container-meta-test'), None)
resp = retry(post, 'Value')
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
+ self.assertEqual(resp.getheader('x-container-meta-test'), 'Value')
resp = retry(get)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
+ self.assertEqual(resp.getheader('x-container-meta-test'), 'Value')
def test_PUT_bad_metadata(self):
if skip:
@@ -268,38 +268,38 @@ class TestContainer(unittest.TestCase):
put, name,
{'X-Container-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
name = uuid4().hex
resp = retry(
put, name,
{'X-Container-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 404)
+ self.assertEqual(resp.status, 404)
name = uuid4().hex
resp = retry(
put, name,
{'X-Container-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
name = uuid4().hex
resp = retry(
put, name,
{'X-Container-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 404)
+ self.assertEqual(resp.status, 404)
name = uuid4().hex
headers = {}
@@ -307,20 +307,20 @@ class TestContainer(unittest.TestCase):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(put, name, headers)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
name = uuid4().hex
headers = {}
for x in xrange(MAX_META_COUNT + 1):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(put, name, headers)
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 404)
+ self.assertEqual(resp.status, 404)
name = uuid4().hex
headers = {}
@@ -336,19 +336,19 @@ class TestContainer(unittest.TestCase):
'v' * (MAX_META_OVERALL_SIZE - size - 1)
resp = retry(put, name, headers)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
name = uuid4().hex
headers['X-Container-Meta-k'] = \
'v' * (MAX_META_OVERALL_SIZE - size)
resp = retry(put, name, headers)
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 404)
+ self.assertEqual(resp.status, 404)
def test_POST_bad_metadata(self):
if skip:
@@ -364,36 +364,56 @@ class TestContainer(unittest.TestCase):
post,
{'X-Container-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(
post,
{'X-Container-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
resp = retry(
post,
{'X-Container-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(
post,
{'X-Container-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
+
+ def test_POST_bad_metadata2(self):
+ if skip:
+ raise SkipTest
+
+ def post(url, token, parsed, conn, extra_headers):
+ headers = {'X-Auth-Token': token}
+ headers.update(extra_headers)
+ conn.request('POST', parsed.path + '/' + self.name, '', headers)
+ return check_response(conn)
headers = {}
for x in xrange(MAX_META_COUNT):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
headers = {}
for x in xrange(MAX_META_COUNT + 1):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
+
+ def test_POST_bad_metadata3(self):
+ if skip:
+ raise SkipTest
+
+ def post(url, token, parsed, conn, extra_headers):
+ headers = {'X-Auth-Token': token}
+ headers.update(extra_headers)
+ conn.request('POST', parsed.path + '/' + self.name, '', headers)
+ return check_response(conn)
headers = {}
header_value = 'k' * MAX_META_VALUE_LENGTH
@@ -408,12 +428,12 @@ class TestContainer(unittest.TestCase):
'v' * (MAX_META_OVERALL_SIZE - size - 1)
resp = retry(post, headers)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
headers['X-Container-Meta-k'] = \
'v' * (MAX_META_OVERALL_SIZE - size)
resp = retry(post, headers)
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
def test_public_container(self):
if skip:
@@ -437,10 +457,10 @@ class TestContainer(unittest.TestCase):
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(get)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
@@ -449,7 +469,7 @@ class TestContainer(unittest.TestCase):
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
try:
resp = retry(get)
raise Exception('Should not have been able to GET')
@@ -479,7 +499,7 @@ class TestContainer(unittest.TestCase):
resp = retry(get2, use_account=2)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# Make the container accessible by the second account
def post(url, token, parsed, conn):
@@ -491,11 +511,11 @@ class TestContainer(unittest.TestCase):
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Ensure we can now use the container with the second account
resp = retry(get2, use_account=2)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Make the container private again
def post(url, token, parsed, conn):
@@ -506,11 +526,11 @@ class TestContainer(unittest.TestCase):
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Ensure we can't access the container with the second account again
resp = retry(get2, use_account=2)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
def test_cross_account_public_container(self):
if skip or skip2:
@@ -535,7 +555,7 @@ class TestContainer(unittest.TestCase):
resp = retry(get2, use_account=2)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# Make the container completely public
def post(url, token, parsed, conn):
@@ -546,11 +566,11 @@ class TestContainer(unittest.TestCase):
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Ensure we can now read the container with the second account
resp = retry(get2, use_account=2)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# But we shouldn't be able to write with the second account
def put2(url, token, parsed, conn):
@@ -560,7 +580,7 @@ class TestContainer(unittest.TestCase):
resp = retry(put2, use_account=2)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# Now make the container also writeable by the second account
def post(url, token, parsed, conn):
@@ -571,15 +591,15 @@ class TestContainer(unittest.TestCase):
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Ensure we can still read the container with the second account
resp = retry(get2, use_account=2)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# And that we can now write with the second account
resp = retry(put2, use_account=2)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
def test_nonadmin_user(self):
if skip or skip3:
@@ -604,7 +624,7 @@ class TestContainer(unittest.TestCase):
resp = retry(get3, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# Make the container accessible by the third account
def post(url, token, parsed, conn):
@@ -615,11 +635,11 @@ class TestContainer(unittest.TestCase):
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Ensure we can now read the container with the third account
resp = retry(get3, use_account=3)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# But we shouldn't be able to write with the third account
def put3(url, token, parsed, conn):
@@ -629,7 +649,7 @@ class TestContainer(unittest.TestCase):
resp = retry(put3, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# Now make the container also writeable by the third account
def post(url, token, parsed, conn):
@@ -640,15 +660,666 @@ class TestContainer(unittest.TestCase):
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Ensure we can still read the container with the third account
resp = retry(get3, use_account=3)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# And that we can now write with the third account
resp = retry(put3, use_account=3)
resp.read()
+ self.assertEqual(resp.status, 201)
+
+ @requires_acls
+ def test_read_only_acl_listings(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post_account(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def put(url, token, parsed, conn, name):
+ conn.request('PUT', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ # cannot list containers
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant read-only access
+ acl_user = swift_test_user[2]
+ acl = {'read-only': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # read-only can list containers
+ resp = retry(get, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(self.name in listing)
+
+ # read-only can not create containers
+ new_container_name = str(uuid4())
+ resp = retry(put, new_container_name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # but it can see newly created ones
+ resp = retry(put, new_container_name, use_account=1)
+ resp.read()
self.assertEquals(resp.status, 201)
+ resp = retry(get, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(new_container_name in listing)
+
+ @requires_acls
+ def test_read_only_acl_metadata(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn, name):
+ conn.request('GET', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post_account(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, name, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path + '/%s' % name, '', new_headers)
+ return check_response(conn)
+
+ # add some metadata
+ value = str(uuid4())
+ headers = {'x-container-meta-test': value}
+ resp = retry(post, self.name, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, self.name, use_account=1)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+
+ # cannot see metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant read-only access
+ acl_user = swift_test_user[2]
+ acl = {'read-only': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # read-only can NOT write container metadata
+ new_value = str(uuid4())
+ headers = {'x-container-meta-test': new_value}
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 403)
+
+ # read-only can read container metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+
+ @requires_acls
+ def test_read_write_acl_listings(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def put(url, token, parsed, conn, name):
+ conn.request('PUT', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def delete(url, token, parsed, conn, name):
+ conn.request('DELETE', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ # cannot list containers
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant read-write access
+ acl_user = swift_test_user[2]
+ acl = {'read-write': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can list containers
+ resp = retry(get, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(self.name in listing)
+
+ # can create new containers
+ new_container_name = str(uuid4())
+ resp = retry(put, new_container_name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+ resp = retry(get, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(new_container_name in listing)
+
+ # can also delete them
+ resp = retry(delete, new_container_name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(get, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(new_container_name not in listing)
+
+ # even if they didn't create them
+ empty_container_name = str(uuid4())
+ resp = retry(put, empty_container_name, use_account=1)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+ resp = retry(delete, empty_container_name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ @requires_acls
+ def test_read_write_acl_metadata(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn, name):
+ conn.request('GET', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post_account(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, name, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path + '/%s' % name, '', new_headers)
+ return check_response(conn)
+
+ # add some metadata
+ value = str(uuid4())
+ headers = {'x-container-meta-test': value}
+ resp = retry(post, self.name, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, self.name, use_account=1)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+
+ # cannot see metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant read-write access
+ acl_user = swift_test_user[2]
+ acl = {'read-write': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # read-write can read container metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+
+ # read-write can also write container metadata
+ new_value = str(uuid4())
+ headers = {'x-container-meta-test': new_value}
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value)
+
+ # and remove it
+ headers = {'x-remove-container-meta-test': 'true'}
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), None)
+
+ @requires_acls
+ def test_admin_acl_listing(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def put(url, token, parsed, conn, name):
+ conn.request('PUT', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def delete(url, token, parsed, conn, name):
+ conn.request('DELETE', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ # cannot list containers
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant admin access
+ acl_user = swift_test_user[2]
+ acl = {'admin': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can list containers
+ resp = retry(get, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(self.name in listing)
+
+ # can create new containers
+ new_container_name = str(uuid4())
+ resp = retry(put, new_container_name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+ resp = retry(get, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(new_container_name in listing)
+
+ # can also delete them
+ resp = retry(delete, new_container_name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(get, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(new_container_name not in listing)
+
+ # even if they didn't create them
+ empty_container_name = str(uuid4())
+ resp = retry(put, empty_container_name, use_account=1)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+ resp = retry(delete, empty_container_name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ @requires_acls
+ def test_admin_acl_metadata(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn, name):
+ conn.request('GET', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post_account(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, name, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path + '/%s' % name, '', new_headers)
+ return check_response(conn)
+
+ # add some metadata
+ value = str(uuid4())
+ headers = {'x-container-meta-test': value}
+ resp = retry(post, self.name, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, self.name, use_account=1)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+
+ # cannot see metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant access
+ acl_user = swift_test_user[2]
+ acl = {'admin': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can read container metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+
+ # can also write container metadata
+ new_value = str(uuid4())
+ headers = {'x-container-meta-test': new_value}
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value)
+
+ # and remove it
+ headers = {'x-remove-container-meta-test': 'true'}
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), None)
+
+ @requires_acls
+ def test_protected_container_sync(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn, name):
+ conn.request('GET', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post_account(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, name, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path + '/%s' % name, '', new_headers)
+ return check_response(conn)
+
+ # add some metadata
+ value = str(uuid4())
+ headers = {
+ 'x-container-sync-key': 'secret',
+ 'x-container-meta-test': value,
+ }
+ resp = retry(post, self.name, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, self.name, use_account=1)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret')
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+
+ # grant read-only access
+ acl_user = swift_test_user[2]
+ acl = {'read-only': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can read container metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+ # but not sync-key
+ self.assertEqual(resp.getheader('X-Container-Sync-Key'), None)
+
+ # and can not write
+ headers = {'x-container-sync-key': str(uuid4())}
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 403)
+
+ # grant read-write access
+ acl_user = swift_test_user[2]
+ acl = {'read-write': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can read container metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+ # but not sync-key
+ self.assertEqual(resp.getheader('X-Container-Sync-Key'), None)
+
+ # sanity check sync-key w/ account1
+ resp = retry(get, self.name, use_account=1)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret')
+
+ # and can write
+ new_value = str(uuid4())
+ headers = {
+ 'x-container-sync-key': str(uuid4()),
+ 'x-container-meta-test': new_value,
+ }
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, self.name, use_account=1) # validate w/ account1
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value)
+ # but can not write sync-key
+ self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret')
+
+ # grant admin access
+ acl_user = swift_test_user[2]
+ acl = {'admin': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # admin can read container metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value)
+ # and ALSO sync-key
+ self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret')
+
+ # admin tester3 can even change sync-key
+ new_secret = str(uuid4())
+ headers = {'x-container-sync-key': new_secret}
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Sync-Key'), new_secret)
+
+ @requires_acls
+ def test_protected_container_acl(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn, name):
+ conn.request('GET', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post_account(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, name, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path + '/%s' % name, '', new_headers)
+ return check_response(conn)
+
+ # add some container acls
+ value = str(uuid4())
+ headers = {
+ 'x-container-read': 'jdoe',
+ 'x-container-write': 'jdoe',
+ 'x-container-meta-test': value,
+ }
+ resp = retry(post, self.name, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, self.name, use_account=1)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe')
+ self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe')
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+
+ # grant read-only access
+ acl_user = swift_test_user[2]
+ acl = {'read-only': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can read container metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+ # but not container acl
+ self.assertEqual(resp.getheader('X-Container-Read'), None)
+ self.assertEqual(resp.getheader('X-Container-Write'), None)
+
+ # and can not write
+ headers = {
+ 'x-container-read': 'frank',
+ 'x-container-write': 'frank',
+ }
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 403)
+
+ # grant read-write access
+ acl_user = swift_test_user[2]
+ acl = {'read-write': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can read container metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+ # but not container acl
+ self.assertEqual(resp.getheader('X-Container-Read'), None)
+ self.assertEqual(resp.getheader('X-Container-Write'), None)
+
+ # sanity check container acls with account1
+ resp = retry(get, self.name, use_account=1)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe')
+ self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe')
+
+ # and can write
+ new_value = str(uuid4())
+ headers = {
+ 'x-container-read': 'frank',
+ 'x-container-write': 'frank',
+ 'x-container-meta-test': new_value,
+ }
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, self.name, use_account=1) # validate w/ account1
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value)
+ # but can not write container acls
+ self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe')
+ self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe')
+
+ # grant admin access
+ acl_user = swift_test_user[2]
+ acl = {'admin': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # admin can read container metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value)
+ # and ALSO container acls
+ self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe')
+ self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe')
+
+ # admin tester3 can even change container acls
+ new_value = str(uuid4())
+ headers = {
+ 'x-container-read': '.r:*',
+ }
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Read'), '.r:*')
def test_long_name_content_type(self):
if skip:
@@ -662,9 +1333,9 @@ class TestContainer(unittest.TestCase):
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 400)
- self.assertEquals(resp.getheader('Content-Type'),
- 'text/html; charset=UTF-8')
+ self.assertEqual(resp.status, 400)
+ self.assertEqual(resp.getheader('Content-Type'),
+ 'text/html; charset=UTF-8')
def test_null_name(self):
if skip:
@@ -677,10 +1348,10 @@ class TestContainer(unittest.TestCase):
resp = retry(put)
if (web_front_end == 'apache2'):
- self.assertEquals(resp.status, 404)
+ self.assertEqual(resp.status, 404)
else:
- self.assertEquals(resp.read(), 'Invalid UTF8 or contains NULL')
- self.assertEquals(resp.status, 412)
+ self.assertEqual(resp.read(), 'Invalid UTF8 or contains NULL')
+ self.assertEqual(resp.status, 412)
if __name__ == '__main__':
diff --git a/test/functional/test_object.py b/test/functional/test_object.py
index dad8635..675de30 100755
--- a/test/functional/test_object.py
+++ b/test/functional/test_object.py
@@ -19,8 +19,10 @@ import unittest
from nose import SkipTest
from uuid import uuid4
+from swift.common.utils import json
+
from swift_testing import check_response, retry, skip, skip3, \
- swift_test_perm, web_front_end
+ swift_test_perm, web_front_end, requires_acls, swift_test_user
class TestObject(unittest.TestCase):
@@ -36,7 +38,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
self.obj = uuid4().hex
def put(url, token, parsed, conn):
@@ -46,7 +48,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
def tearDown(self):
if skip:
@@ -66,13 +68,13 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(list)
object_listing = resp.read()
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.status, 200)
# iterate over object listing and delete all objects
for obj in object_listing.splitlines():
resp = retry(delete, obj)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# delete the container
def delete(url, token, parsed, conn):
@@ -81,7 +83,33 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
+
+ def test_if_none_match(self):
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/%s' % (
+ parsed.path, self.container, 'if_none_match_test'), '',
+ {'X-Auth-Token': token,
+ 'Content-Length': '0',
+ 'If-None-Match': '*'})
+ return check_response(conn)
+ resp = retry(put)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+ resp = retry(put)
+ resp.read()
+ self.assertEquals(resp.status, 412)
+
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/%s' % (
+ parsed.path, self.container, 'if_none_match_test'), '',
+ {'X-Auth-Token': token,
+ 'Content-Length': '0',
+ 'If-None-Match': 'somethingelse'})
+ return check_response(conn)
+ resp = retry(put)
+ resp.read()
+ self.assertEquals(resp.status, 400)
def test_copy_object(self):
if skip:
@@ -98,8 +126,8 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(get_source)
source_contents = resp.read()
- self.assertEquals(resp.status, 200)
- self.assertEquals(source_contents, 'test')
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(source_contents, 'test')
# copy source to dest with X-Copy-From
def put(url, token, parsed, conn):
@@ -110,7 +138,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# contents of dest should be the same as source
def get_dest(url, token, parsed, conn):
@@ -120,8 +148,8 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(get_dest)
dest_contents = resp.read()
- self.assertEquals(resp.status, 200)
- self.assertEquals(dest_contents, source_contents)
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(dest_contents, source_contents)
# delete the copy
def delete(url, token, parsed, conn):
@@ -130,11 +158,11 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# verify dest does not exist
resp = retry(get_dest)
resp.read()
- self.assertEquals(resp.status, 404)
+ self.assertEqual(resp.status, 404)
# copy source to dest with COPY
def copy(url, token, parsed, conn):
@@ -144,18 +172,18 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(copy)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# contents of dest should be the same as source
resp = retry(get_dest)
dest_contents = resp.read()
- self.assertEquals(resp.status, 200)
- self.assertEquals(dest_contents, source_contents)
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(dest_contents, source_contents)
# delete the copy
resp = retry(delete)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
def test_public_object(self):
if skip:
@@ -178,10 +206,10 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(get)
resp.read()
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.status, 200)
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.container, '',
@@ -189,7 +217,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
try:
resp = retry(get)
raise Exception('Should not have been able to GET')
@@ -208,7 +236,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(get, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# create a shared container writable by account3
shared_container = uuid4().hex
@@ -222,7 +250,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# verify third account can not copy from private container
def copy(url, token, parsed, conn):
@@ -234,7 +262,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(copy, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# verify third account can write "obj1" to shared container
def put(url, token, parsed, conn):
@@ -244,7 +272,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put, use_account=3)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# verify third account can copy "obj1" to shared container
def copy2(url, token, parsed, conn):
@@ -255,7 +283,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(copy2, use_account=3)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# verify third account STILL can not copy from private container
def copy3(url, token, parsed, conn):
@@ -267,7 +295,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(copy3, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# clean up "obj1"
def delete(url, token, parsed, conn):
@@ -277,7 +305,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# clean up shared_container
def delete(url, token, parsed, conn):
@@ -287,8 +315,251 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
+ self.assertEqual(resp.status, 204)
+
+ @requires_acls
+ def test_read_only(self):
+ if skip3:
+ raise SkipTest
+
+ def get_listing(url, token, parsed, conn):
+ conn.request('GET', '%s/%s' % (parsed.path, self.container), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post_account(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def get(url, token, parsed, conn, name):
+ conn.request('GET', '%s/%s/%s' % (
+ parsed.path, self.container, name), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def put(url, token, parsed, conn, name):
+ conn.request('PUT', '%s/%s/%s' % (
+ parsed.path, self.container, name), 'test',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def delete(url, token, parsed, conn, name):
+ conn.request('PUT', '%s/%s/%s' % (
+ parsed.path, self.container, name), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ # cannot list objects
+ resp = retry(get_listing, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # cannot get object
+ resp = retry(get, self.obj, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant read-only access
+ acl_user = swift_test_user[2]
+ acl = {'read-only': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can list objects
+ resp = retry(get_listing, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(self.obj in listing)
+
+ # can get object
+ resp = retry(get, self.obj, use_account=3)
+ body = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assertEquals(body, 'test')
+
+ # can not put an object
+ obj_name = str(uuid4())
+ resp = retry(put, obj_name, use_account=3)
+ body = resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # can not delete an object
+ resp = retry(delete, self.obj, use_account=3)
+ body = resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # sanity with account1
+ resp = retry(get_listing, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(obj_name not in listing)
+ self.assert_(self.obj in listing)
+
+ @requires_acls
+ def test_read_write(self):
+ if skip3:
+ raise SkipTest
+
+ def get_listing(url, token, parsed, conn):
+ conn.request('GET', '%s/%s' % (parsed.path, self.container), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post_account(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def get(url, token, parsed, conn, name):
+ conn.request('GET', '%s/%s/%s' % (
+ parsed.path, self.container, name), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def put(url, token, parsed, conn, name):
+ conn.request('PUT', '%s/%s/%s' % (
+ parsed.path, self.container, name), 'test',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def delete(url, token, parsed, conn, name):
+ conn.request('DELETE', '%s/%s/%s' % (
+ parsed.path, self.container, name), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ # cannot list objects
+ resp = retry(get_listing, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # cannot get object
+ resp = retry(get, self.obj, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant read-write access
+ acl_user = swift_test_user[2]
+ acl = {'read-write': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can list objects
+ resp = retry(get_listing, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(self.obj in listing)
+
+ # can get object
+ resp = retry(get, self.obj, use_account=3)
+ body = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assertEquals(body, 'test')
+
+ # can put an object
+ obj_name = str(uuid4())
+ resp = retry(put, obj_name, use_account=3)
+ body = resp.read()
+ self.assertEquals(resp.status, 201)
+
+ # can delete an object
+ resp = retry(delete, self.obj, use_account=3)
+ body = resp.read()
self.assertEquals(resp.status, 204)
+ # sanity with account1
+ resp = retry(get_listing, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(obj_name in listing)
+ self.assert_(self.obj not in listing)
+
+ @requires_acls
+ def test_admin(self):
+ if skip3:
+ raise SkipTest
+
+ def get_listing(url, token, parsed, conn):
+ conn.request('GET', '%s/%s' % (parsed.path, self.container), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post_account(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def get(url, token, parsed, conn, name):
+ conn.request('GET', '%s/%s/%s' % (
+ parsed.path, self.container, name), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def put(url, token, parsed, conn, name):
+ conn.request('PUT', '%s/%s/%s' % (
+ parsed.path, self.container, name), 'test',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def delete(url, token, parsed, conn, name):
+ conn.request('DELETE', '%s/%s/%s' % (
+ parsed.path, self.container, name), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ # cannot list objects
+ resp = retry(get_listing, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # cannot get object
+ resp = retry(get, self.obj, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant admin access
+ acl_user = swift_test_user[2]
+ acl = {'admin': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can list objects
+ resp = retry(get_listing, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(self.obj in listing)
+
+ # can get object
+ resp = retry(get, self.obj, use_account=3)
+ body = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assertEquals(body, 'test')
+
+ # can put an object
+ obj_name = str(uuid4())
+ resp = retry(put, obj_name, use_account=3)
+ body = resp.read()
+ self.assertEquals(resp.status, 201)
+
+ # can delete an object
+ resp = retry(delete, self.obj, use_account=3)
+ body = resp.read()
+ self.assertEquals(resp.status, 204)
+
+ # sanity with account1
+ resp = retry(get_listing, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(obj_name in listing)
+ self.assert_(self.obj not in listing)
+
def test_manifest(self):
if skip:
raise SkipTest
@@ -306,7 +577,7 @@ class TestObject(unittest.TestCase):
for objnum in xrange(len(segments1)):
resp = retry(put, objnum)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# Upload the manifest
def put(url, token, parsed, conn):
@@ -318,7 +589,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# Get the manifest (should get all the segments as the body)
def get(url, token, parsed, conn):
@@ -326,9 +597,9 @@ class TestObject(unittest.TestCase):
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments1))
- self.assertEquals(resp.status, 200)
- self.assertEquals(resp.getheader('content-type'), 'text/jibberish')
+ self.assertEqual(resp.read(), ''.join(segments1))
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(resp.getheader('content-type'), 'text/jibberish')
# Get with a range at the start of the second segment
def get(url, token, parsed, conn):
@@ -337,8 +608,8 @@ class TestObject(unittest.TestCase):
'X-Auth-Token': token, 'Range': 'bytes=3-'})
return check_response(conn)
resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments1[1:]))
- self.assertEquals(resp.status, 206)
+ self.assertEqual(resp.read(), ''.join(segments1[1:]))
+ self.assertEqual(resp.status, 206)
# Get with a range in the middle of the second segment
def get(url, token, parsed, conn):
@@ -347,8 +618,8 @@ class TestObject(unittest.TestCase):
'X-Auth-Token': token, 'Range': 'bytes=5-'})
return check_response(conn)
resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments1)[5:])
- self.assertEquals(resp.status, 206)
+ self.assertEqual(resp.read(), ''.join(segments1)[5:])
+ self.assertEqual(resp.status, 206)
# Get with a full start and stop range
def get(url, token, parsed, conn):
@@ -357,8 +628,8 @@ class TestObject(unittest.TestCase):
'X-Auth-Token': token, 'Range': 'bytes=5-10'})
return check_response(conn)
resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments1)[5:11])
- self.assertEquals(resp.status, 206)
+ self.assertEqual(resp.read(), ''.join(segments1)[5:11])
+ self.assertEqual(resp.status, 206)
# Upload the second set of segments
def put(url, token, parsed, conn, objnum):
@@ -369,7 +640,7 @@ class TestObject(unittest.TestCase):
for objnum in xrange(len(segments2)):
resp = retry(put, objnum)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# Get the manifest (should still be the first segments of course)
def get(url, token, parsed, conn):
@@ -377,8 +648,8 @@ class TestObject(unittest.TestCase):
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments1))
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.read(), ''.join(segments1))
+ self.assertEqual(resp.status, 200)
# Update the manifest
def put(url, token, parsed, conn):
@@ -390,7 +661,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# Get the manifest (should be the second set of segments now)
def get(url, token, parsed, conn):
@@ -398,8 +669,8 @@ class TestObject(unittest.TestCase):
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments2))
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.read(), ''.join(segments2))
+ self.assertEqual(resp.status, 200)
if not skip3:
@@ -410,7 +681,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(get, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# Grant access to the third account
def post(url, token, parsed, conn):
@@ -420,7 +691,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# The third account should be able to get the manifest now
def get(url, token, parsed, conn):
@@ -428,8 +699,8 @@ class TestObject(unittest.TestCase):
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
- self.assertEquals(resp.read(), ''.join(segments2))
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.read(), ''.join(segments2))
+ self.assertEqual(resp.status, 200)
# Create another container for the third set of segments
acontainer = uuid4().hex
@@ -440,7 +711,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# Upload the third set of segments in the other container
def put(url, token, parsed, conn, objnum):
@@ -451,7 +722,7 @@ class TestObject(unittest.TestCase):
for objnum in xrange(len(segments3)):
resp = retry(put, objnum)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# Update the manifest
def put(url, token, parsed, conn):
@@ -463,7 +734,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# Get the manifest to ensure it's the third set of segments
def get(url, token, parsed, conn):
@@ -471,8 +742,8 @@ class TestObject(unittest.TestCase):
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments3))
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.read(), ''.join(segments3))
+ self.assertEqual(resp.status, 200)
if not skip3:
@@ -486,7 +757,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(get, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# Grant access to the third account
def post(url, token, parsed, conn):
@@ -496,7 +767,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# The third account should be able to get the manifest now
def get(url, token, parsed, conn):
@@ -504,8 +775,8 @@ class TestObject(unittest.TestCase):
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
- self.assertEquals(resp.read(), ''.join(segments3))
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.read(), ''.join(segments3))
+ self.assertEqual(resp.status, 200)
# Delete the manifest
def delete(url, token, parsed, conn, objnum):
@@ -515,7 +786,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete, objnum)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Delete the third set of segments
def delete(url, token, parsed, conn, objnum):
@@ -526,7 +797,7 @@ class TestObject(unittest.TestCase):
for objnum in xrange(len(segments3)):
resp = retry(delete, objnum)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Delete the second set of segments
def delete(url, token, parsed, conn, objnum):
@@ -537,7 +808,7 @@ class TestObject(unittest.TestCase):
for objnum in xrange(len(segments2)):
resp = retry(delete, objnum)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Delete the first set of segments
def delete(url, token, parsed, conn, objnum):
@@ -548,7 +819,7 @@ class TestObject(unittest.TestCase):
for objnum in xrange(len(segments1)):
resp = retry(delete, objnum)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Delete the extra container
def delete(url, token, parsed, conn):
@@ -557,7 +828,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
def test_delete_content_type(self):
if skip:
@@ -569,7 +840,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
def delete(url, token, parsed, conn):
conn.request('DELETE', '%s/%s/hi' % (parsed.path, self.container),
@@ -577,9 +848,9 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
- self.assertEquals(resp.status, 204)
- self.assertEquals(resp.getheader('Content-Type'),
- 'text/html; charset=UTF-8')
+ self.assertEqual(resp.status, 204)
+ self.assertEqual(resp.getheader('Content-Type'),
+ 'text/html; charset=UTF-8')
def test_delete_if_delete_at_bad(self):
if skip:
@@ -592,7 +863,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
def delete(url, token, parsed, conn):
conn.request('DELETE', '%s/%s/hi' % (parsed.path, self.container),
@@ -601,7 +872,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
def test_null_name(self):
if skip:
@@ -614,10 +885,121 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
if (web_front_end == 'apache2'):
- self.assertEquals(resp.status, 404)
+ self.assertEqual(resp.status, 404)
else:
- self.assertEquals(resp.read(), 'Invalid UTF8 or contains NULL')
- self.assertEquals(resp.status, 412)
+ self.assertEqual(resp.read(), 'Invalid UTF8 or contains NULL')
+ self.assertEqual(resp.status, 412)
+
+ def test_cors(self):
+ if skip:
+ raise SkipTest
+
+ def is_strict_mode(url, token, parsed, conn):
+ conn.request('GET', '/info')
+ resp = conn.getresponse()
+ if resp.status // 100 == 2:
+ info = json.loads(resp.read())
+ return info.get('swift', {}).get('strict_cors_mode', False)
+ return False
+
+ def put_cors_cont(url, token, parsed, conn, orig):
+ conn.request(
+ 'PUT', '%s/%s' % (parsed.path, self.container),
+ '', {'X-Auth-Token': token,
+ 'X-Container-Meta-Access-Control-Allow-Origin': orig})
+ return check_response(conn)
+
+ def put_obj(url, token, parsed, conn, obj):
+ conn.request(
+ 'PUT', '%s/%s/%s' % (parsed.path, self.container, obj),
+ 'test', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def check_cors(url, token, parsed, conn,
+ method, obj, headers):
+ if method != 'OPTIONS':
+ headers['X-Auth-Token'] = token
+ conn.request(
+ method, '%s/%s/%s' % (parsed.path, self.container, obj),
+ '', headers)
+ return conn.getresponse()
+
+ strict_cors = retry(is_strict_mode)
+
+ resp = retry(put_cors_cont, '*')
+ resp.read()
+ self.assertEquals(resp.status // 100, 2)
+
+ resp = retry(put_obj, 'cat')
+ resp.read()
+ self.assertEquals(resp.status // 100, 2)
+
+ resp = retry(check_cors,
+ 'OPTIONS', 'cat', {'Origin': 'http://m.com'})
+ self.assertEquals(resp.status, 401)
+
+ resp = retry(check_cors,
+ 'OPTIONS', 'cat',
+ {'Origin': 'http://m.com',
+ 'Access-Control-Request-Method': 'GET'})
+
+ self.assertEquals(resp.status, 200)
+ resp.read()
+ headers = dict((k.lower(), v) for k, v in resp.getheaders())
+ self.assertEquals(headers.get('access-control-allow-origin'),
+ '*')
+
+ resp = retry(check_cors,
+ 'GET', 'cat', {'Origin': 'http://m.com'})
+ self.assertEquals(resp.status, 200)
+ headers = dict((k.lower(), v) for k, v in resp.getheaders())
+ self.assertEquals(headers.get('access-control-allow-origin'),
+ '*')
+
+ resp = retry(check_cors,
+ 'GET', 'cat', {'Origin': 'http://m.com',
+ 'X-Web-Mode': 'True'})
+ self.assertEquals(resp.status, 200)
+ headers = dict((k.lower(), v) for k, v in resp.getheaders())
+ self.assertEquals(headers.get('access-control-allow-origin'),
+ '*')
+
+ ####################
+
+ resp = retry(put_cors_cont, 'http://secret.com')
+ resp.read()
+ self.assertEquals(resp.status // 100, 2)
+
+ resp = retry(check_cors,
+ 'OPTIONS', 'cat',
+ {'Origin': 'http://m.com',
+ 'Access-Control-Request-Method': 'GET'})
+ resp.read()
+ self.assertEquals(resp.status, 401)
+
+ if strict_cors:
+ resp = retry(check_cors,
+ 'GET', 'cat', {'Origin': 'http://m.com'})
+ resp.read()
+ self.assertEquals(resp.status, 200)
+ headers = dict((k.lower(), v) for k, v in resp.getheaders())
+ self.assertTrue('access-control-allow-origin' not in headers)
+
+ resp = retry(check_cors,
+ 'GET', 'cat', {'Origin': 'http://secret.com'})
+ resp.read()
+ self.assertEquals(resp.status, 200)
+ headers = dict((k.lower(), v) for k, v in resp.getheaders())
+ self.assertEquals(headers.get('access-control-allow-origin'),
+ 'http://secret.com')
+ else:
+ resp = retry(check_cors,
+ 'GET', 'cat', {'Origin': 'http://m.com'})
+ resp.read()
+ self.assertEquals(resp.status, 200)
+ headers = dict((k.lower(), v) for k, v in resp.getheaders())
+ self.assertEquals(headers.get('access-control-allow-origin'),
+ 'http://m.com')
if __name__ == '__main__':
diff --git a/test/functional/tests.py b/test/functional/tests.py
index 0d9a9ef..ad87d7e 100644
--- a/test/functional/tests.py
+++ b/test/functional/tests.py
@@ -19,14 +19,16 @@
from datetime import datetime
import os
import hashlib
+import hmac
import json
import locale
import random
import StringIO
import time
import threading
-import uuid
import unittest
+import urllib
+import uuid
from nose import SkipTest
from ConfigParser import ConfigParser
@@ -36,7 +38,7 @@ from test.functional.swift_test_client import Account, Connection, File, \
from swift.common.constraints import MAX_FILE_SIZE, MAX_META_NAME_LENGTH, \
MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE, \
MAX_OBJECT_NAME_LENGTH, CONTAINER_LISTING_LIMIT, ACCOUNT_LISTING_LIMIT, \
- MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH
+ MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, MAX_HEADER_SIZE
from gluster.swift.common.constraints import \
set_object_name_component_length, get_object_name_component_length
@@ -50,7 +52,8 @@ default_constraints = dict((
('container_listing_limit', CONTAINER_LISTING_LIMIT),
('account_listing_limit', ACCOUNT_LISTING_LIMIT),
('max_account_name_length', MAX_ACCOUNT_NAME_LENGTH),
- ('max_container_name_length', MAX_CONTAINER_NAME_LENGTH)))
+ ('max_container_name_length', MAX_CONTAINER_NAME_LENGTH),
+ ('max_header_size', MAX_HEADER_SIZE)))
constraints_conf = ConfigParser()
conf_exists = constraints_conf.read('/etc/swift/swift.conf')
# Constraints are set first from the test config, then from
@@ -285,7 +288,7 @@ class TestAccount(Base):
if try_count < 5:
time.sleep(1)
- self.assertEquals(info['container_count'], len(self.env.containers))
+ self.assertEqual(info['container_count'], len(self.env.containers))
self.assert_status(204)
def testContainerSerializedInfo(self):
@@ -309,11 +312,11 @@ class TestAccount(Base):
headers = dict(self.env.conn.response.getheaders())
if format_type == 'json':
- self.assertEquals(headers['content-type'],
- 'application/json; charset=utf-8')
+ self.assertEqual(headers['content-type'],
+ 'application/json; charset=utf-8')
elif format_type == 'xml':
- self.assertEquals(headers['content-type'],
- 'application/xml; charset=utf-8')
+ self.assertEqual(headers['content-type'],
+ 'application/xml; charset=utf-8')
def testListingLimit(self):
limit = load_constraint('account_listing_limit')
@@ -337,7 +340,7 @@ class TestAccount(Base):
if isinstance(b[0], dict):
b = [x['name'] for x in b]
- self.assertEquals(a, b)
+ self.assertEqual(a, b)
def testInvalidAuthToken(self):
hdrs = {'X-Auth-Token': 'bogus_auth_token'}
@@ -347,12 +350,12 @@ class TestAccount(Base):
def testLastContainerMarker(self):
for format_type in [None, 'json', 'xml']:
containers = self.env.account.containers({'format': format_type})
- self.assertEquals(len(containers), len(self.env.containers))
+ self.assertEqual(len(containers), len(self.env.containers))
self.assert_status(200)
containers = self.env.account.containers(
parms={'format': format_type, 'marker': containers[-1]})
- self.assertEquals(len(containers), 0)
+ self.assertEqual(len(containers), 0)
if format_type is None:
self.assert_status(204)
else:
@@ -380,8 +383,8 @@ class TestAccount(Base):
parms={'format': format_type})
if isinstance(containers[0], dict):
containers = [x['name'] for x in containers]
- self.assertEquals(sorted(containers, cmp=locale.strcoll),
- containers)
+ self.assertEqual(sorted(containers, cmp=locale.strcoll),
+ containers)
class TestAccountUTF8(Base2, TestAccount):
@@ -518,13 +521,13 @@ class TestContainer(Base):
for format_type in [None, 'json', 'xml']:
for prefix in prefixs:
files = cont.files(parms={'prefix': prefix})
- self.assertEquals(files, sorted(prefix_files[prefix]))
+ self.assertEqual(files, sorted(prefix_files[prefix]))
for format_type in [None, 'json', 'xml']:
for prefix in prefixs:
files = cont.files(parms={'limit': limit_count,
'prefix': prefix})
- self.assertEquals(len(files), limit_count)
+ self.assertEqual(len(files), limit_count)
for file_item in files:
self.assert_(file_item.startswith(prefix))
@@ -548,7 +551,7 @@ class TestContainer(Base):
container = self.env.account.container(valid_utf8)
self.assert_(container.create(cfg={'no_path_quote': True}))
self.assert_(container.name in self.env.account.containers())
- self.assertEquals(container.files(), [])
+ self.assertEqual(container.files(), [])
self.assert_(container.delete())
container = self.env.account.container(invalid_utf8)
@@ -614,12 +617,12 @@ class TestContainer(Base):
def testLastFileMarker(self):
for format_type in [None, 'json', 'xml']:
files = self.env.container.files({'format': format_type})
- self.assertEquals(len(files), len(self.env.files))
+ self.assertEqual(len(files), len(self.env.files))
self.assert_status(200)
files = self.env.container.files(
parms={'format': format_type, 'marker': files[-1]})
- self.assertEquals(len(files), 0)
+ self.assertEqual(len(files), 0)
if format_type is None:
self.assert_status(204)
@@ -665,14 +668,14 @@ class TestContainer(Base):
files = self.env.container.files(parms={'format': format_type})
if isinstance(files[0], dict):
files = [x['name'] for x in files]
- self.assertEquals(sorted(files, cmp=locale.strcoll), files)
+ self.assertEqual(sorted(files, cmp=locale.strcoll), files)
def testContainerInfo(self):
info = self.env.container.info()
self.assert_status(204)
- self.assertEquals(info['object_count'], self.env.file_count)
- self.assertEquals(info['bytes_used'],
- self.env.file_count * self.env.file_size)
+ self.assertEqual(info['object_count'], self.env.file_count)
+ self.assertEqual(info['bytes_used'],
+ self.env.file_count * self.env.file_size)
def testContainerInfoOnContainerThatDoesNotExist(self):
container = self.env.account.container(Utils.create_name())
@@ -683,7 +686,7 @@ class TestContainer(Base):
for format_type in [None, 'json', 'xml']:
files = self.env.container.files(parms={'format': format_type,
'limit': 2})
- self.assertEquals(len(files), 2)
+ self.assertEqual(len(files), 2)
def testTooLongName(self):
cont = self.env.account.container('x' * 257)
@@ -838,7 +841,7 @@ class TestContainerPaths(Base):
if isinstance(files[0], dict):
files = [str(x['name']) for x in files]
- self.assertEquals(files, self.env.stored_files)
+ self.assertEqual(files, self.env.stored_files)
for format_type in ('json', 'xml'):
for file_item in self.env.container.files(parms={'format':
@@ -846,13 +849,13 @@ class TestContainerPaths(Base):
self.assert_(int(file_item['bytes']) >= 0)
self.assert_('last_modified' in file_item)
if file_item['name'].endswith('/'):
- self.assertEquals(file_item['content_type'],
- 'application/directory')
+ self.assertEqual(file_item['content_type'],
+ 'application/directory')
def testStructure(self):
def assert_listing(path, file_list):
files = self.env.container.files(parms={'path': path})
- self.assertEquals(sorted(file_list, cmp=locale.strcoll), files)
+ self.assertEqual(sorted(file_list, cmp=locale.strcoll), files)
if not normalized_urls:
assert_listing('/', ['/dir1/', '/dir2/', '/file1', '/file A'])
assert_listing('/dir1',
@@ -1176,7 +1179,7 @@ class TestFile(Base):
for i in container.files(parms={'format': 'json'}):
file_types_read[i['name'].split('.')[1]] = i['content_type']
- self.assertEquals(file_types, file_types_read)
+ self.assertEqual(file_types, file_types_read)
def testRangedGets(self):
file_length = 10000
@@ -1201,7 +1204,7 @@ class TestFile(Base):
self.assertRaises(ResponseError, file_item.read, hdrs=hdrs)
self.assert_status(416)
else:
- self.assertEquals(file_item.read(hdrs=hdrs), data[-i:])
+ self.assertEqual(file_item.read(hdrs=hdrs), data[-i:])
range_string = 'bytes=%d-' % (i)
hdrs = {'Range': range_string}
@@ -1350,9 +1353,9 @@ class TestFile(Base):
info = file_item.info()
self.assert_status(200)
- self.assertEquals(info['content_length'], self.env.file_size)
- self.assertEquals(info['etag'], md5)
- self.assertEquals(info['content_type'], content_type)
+ self.assertEqual(info['content_length'], self.env.file_size)
+ self.assertEqual(info['etag'], md5)
+ self.assertEqual(info['content_type'], content_type)
self.assert_('last_modified' in info)
def testDeleteOfFileThatDoesNotExist(self):
@@ -1395,7 +1398,7 @@ class TestFile(Base):
file_item = self.env.container.file(file_item.name)
self.assert_(file_item.initialize())
self.assert_status(200)
- self.assertEquals(file_item.metadata, metadata)
+ self.assertEqual(file_item.metadata, metadata)
def testGetContentType(self):
file_name = Utils.create_name()
@@ -1408,7 +1411,7 @@ class TestFile(Base):
file_item = self.env.container.file(file_name)
file_item.read()
- self.assertEquals(content_type, file_item.content_type)
+ self.assertEqual(content_type, file_item.content_type)
def testGetOnFileThatDoesNotExist(self):
# in container that exists
@@ -1449,7 +1452,7 @@ class TestFile(Base):
file_item = self.env.container.file(file_item.name)
self.assert_(file_item.initialize())
self.assert_status(200)
- self.assertEquals(file_item.metadata, metadata)
+ self.assertEqual(file_item.metadata, metadata)
def testSerialization(self):
container = self.env.account.container(Utils.create_name())
@@ -1478,9 +1481,9 @@ class TestFile(Base):
if f['name'] != file_item['name']:
continue
- self.assertEquals(file_item['content_type'],
- f['content_type'])
- self.assertEquals(int(file_item['bytes']), f['bytes'])
+ self.assertEqual(file_item['content_type'],
+ f['content_type'])
+ self.assertEqual(int(file_item['bytes']), f['bytes'])
d = datetime.strptime(
file_item['last_modified'].split('.')[0],
@@ -1488,7 +1491,7 @@ class TestFile(Base):
lm = time.mktime(d.timetuple())
if 'last_modified' in f:
- self.assertEquals(f['last_modified'], lm)
+ self.assertEqual(f['last_modified'], lm)
else:
f['last_modified'] = lm
@@ -1500,11 +1503,11 @@ class TestFile(Base):
headers = dict(self.env.conn.response.getheaders())
if format_type == 'json':
- self.assertEquals(headers['content-type'],
- 'application/json; charset=utf-8')
+ self.assertEqual(headers['content-type'],
+ 'application/json; charset=utf-8')
elif format_type == 'xml':
- self.assertEquals(headers['content-type'],
- 'application/xml; charset=utf-8')
+ self.assertEqual(headers['content-type'],
+ 'application/xml; charset=utf-8')
lm_diff = max([f['last_modified'] for f in files]) -\
min([f['last_modified'] for f in files])
@@ -1547,7 +1550,7 @@ class TestFile(Base):
self.assert_('etag' in headers.keys())
header_etag = headers['etag'].strip('"')
- self.assertEquals(etag, header_etag)
+ self.assertEqual(etag, header_etag)
def testChunkedPut(self):
if (web_front_end == 'apache2'):
@@ -1565,7 +1568,7 @@ class TestFile(Base):
self.assert_(data == file_item.read())
info = file_item.info()
- self.assertEquals(etag, info['etag'])
+ self.assertEqual(etag, info['etag'])
class TestFileUTF8(Base2, TestFile):
@@ -1677,12 +1680,30 @@ class TestDlo(Base):
file_contents,
"aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeeeffffffffff")
+ def test_copy_manifest(self):
+ # Copying the manifest should result in another manifest
+ try:
+ man1_item = self.env.container.file('man1')
+ man1_item.copy(self.env.container.name, "copied-man1",
+ parms={'multipart-manifest': 'get'})
+
+ copied = self.env.container.file("copied-man1")
+ copied_contents = copied.read(parms={'multipart-manifest': 'get'})
+ self.assertEqual(copied_contents, "man1-contents")
+
+ copied_contents = copied.read()
+ self.assertEqual(
+ copied_contents,
+ "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeee")
+ finally:
+ # try not to leave this around for other tests to stumble over
+ self.env.container.file("copied-man1").delete()
class TestDloUTF8(Base2, TestDlo):
set_up = False
-class TestFileComparisonEnv:
+class TestFileComparisonEnv(object):
@classmethod
def setUp(cls):
cls.conn = Connection(config)
@@ -1806,19 +1827,8 @@ class TestSloEnv(object):
cls.conn.authenticate()
if cls.slo_enabled is None:
- status = cls.conn.make_request('GET', '/info',
- cfg={'verbatim_path': True})
- if not (200 <= status <= 299):
- # Can't tell if SLO is enabled or not since we're running
- # against an old cluster, so let's skip the tests instead of
- # possibly having spurious failures.
- cls.slo_enabled = False
- else:
- # Don't bother looking for ValueError here. If something is
- # responding to a GET /info request with invalid JSON, then
- # the cluster is broken and a test failure will let us know.
- cluster_info = json.loads(cls.conn.response.read())
- cls.slo_enabled = 'slo' in cluster_info
+ cluster_info = cls.conn.cluster_info()
+ cls.slo_enabled = 'slo' in cluster_info
if not cls.slo_enabled:
return
@@ -2034,5 +2044,347 @@ class TestSloUTF8(Base2, TestSlo):
set_up = False
+class TestObjectVersioningEnv(object):
+ versioning_enabled = None # tri-state: None initially, then True/False
+
+ @classmethod
+ def setUp(cls):
+ cls.conn = Connection(config)
+ cls.conn.authenticate()
+
+ cls.account = Account(cls.conn, config.get('account',
+ config['username']))
+
+ # avoid getting a prefix that stops halfway through an encoded
+ # character
+ prefix = Utils.create_name().decode("utf-8")[:10].encode("utf-8")
+
+ cls.versions_container = cls.account.container(prefix + "-versions")
+ if not cls.versions_container.create():
+ raise ResponseError(cls.conn.response)
+
+ cls.container = cls.account.container(prefix + "-objs")
+ if not cls.container.create(
+ hdrs={'X-Versions-Location': cls.versions_container.name}):
+ raise ResponseError(cls.conn.response)
+
+ container_info = cls.container.info()
+ # if versioning is off, then X-Versions-Location won't persist
+ cls.versioning_enabled = 'versions' in container_info
+
+
+class TestObjectVersioning(Base):
+ env = TestObjectVersioningEnv
+ set_up = False
+
+ def setUp(self):
+ super(TestObjectVersioning, self).setUp()
+ if self.env.versioning_enabled is False:
+ raise SkipTest("Object versioning not enabled")
+ elif self.env.versioning_enabled is not True:
+ # just some sanity checking
+ raise Exception(
+ "Expected versioning_enabled to be True/False, got %r" %
+ (self.env.versioning_enabled,))
+
+ def test_overwriting(self):
+ container = self.env.container
+ versions_container = self.env.versions_container
+ obj_name = Utils.create_name()
+
+ versioned_obj = container.file(obj_name)
+ versioned_obj.write("aaaaa")
+
+ self.assertEqual(0, versions_container.info()['object_count'])
+
+ versioned_obj.write("bbbbb")
+
+ # the old version got saved off
+ self.assertEqual(1, versions_container.info()['object_count'])
+ versioned_obj_name = versions_container.files()[0]
+ self.assertEqual(
+ "aaaaa", versions_container.file(versioned_obj_name).read())
+
+ # if we overwrite it again, there are two versions
+ versioned_obj.write("ccccc")
+ self.assertEqual(2, versions_container.info()['object_count'])
+
+ # as we delete things, the old contents return
+ self.assertEqual("ccccc", versioned_obj.read())
+ versioned_obj.delete()
+ self.assertEqual("bbbbb", versioned_obj.read())
+ versioned_obj.delete()
+ self.assertEqual("aaaaa", versioned_obj.read())
+ versioned_obj.delete()
+ self.assertRaises(ResponseError, versioned_obj.read)
+
+
+class TestObjectVersioningUTF8(Base2, TestObjectVersioning):
+ set_up = False
+
+
+class TestTempurlEnv(object):
+ tempurl_enabled = None # tri-state: None initially, then True/False
+
+ @classmethod
+ def setUp(cls):
+ cls.conn = Connection(config)
+ cls.conn.authenticate()
+
+ if cls.tempurl_enabled is None:
+ cluster_info = cls.conn.cluster_info()
+ cls.tempurl_enabled = 'tempurl' in cluster_info
+ if not cls.tempurl_enabled:
+ return
+ cls.tempurl_methods = cluster_info['tempurl']['methods']
+
+ cls.tempurl_key = Utils.create_name()
+ cls.tempurl_key2 = Utils.create_name()
+
+ cls.account = Account(
+ cls.conn, config.get('account', config['username']))
+ cls.account.delete_containers()
+ cls.account.update_metadata({
+ 'temp-url-key': cls.tempurl_key,
+ 'temp-url-key-2': cls.tempurl_key2
+ })
+
+ cls.container = cls.account.container(Utils.create_name())
+ if not cls.container.create():
+ raise ResponseError(cls.conn.response)
+
+ cls.obj = cls.container.file(Utils.create_name())
+ cls.obj.write("obj contents")
+ cls.other_obj = cls.container.file(Utils.create_name())
+ cls.other_obj.write("other obj contents")
+
+
+class TestTempurl(Base):
+ env = TestTempurlEnv
+ set_up = False
+
+ def setUp(self):
+ super(TestTempurl, self).setUp()
+ if self.env.tempurl_enabled is False:
+ raise SkipTest("TempURL not enabled")
+ elif self.env.tempurl_enabled is not True:
+ # just some sanity checking
+ raise Exception(
+ "Expected tempurl_enabled to be True/False, got %r" %
+ (self.env.tempurl_enabled,))
+
+ expires = int(time.time()) + 86400
+ sig = self.tempurl_sig(
+ 'GET', expires, self.env.conn.make_path(self.env.obj.path),
+ self.env.tempurl_key)
+ self.obj_tempurl_parms = {'temp_url_sig': sig,
+ 'temp_url_expires': str(expires)}
+
+ def tempurl_sig(self, method, expires, path, key):
+ return hmac.new(
+ key,
+ '%s\n%s\n%s' % (method, expires, urllib.unquote(path)),
+ hashlib.sha1).hexdigest()
+
+ def test_GET(self):
+ contents = self.env.obj.read(
+ parms=self.obj_tempurl_parms,
+ cfg={'no_auth_token': True})
+ self.assertEqual(contents, "obj contents")
+
+ # GET tempurls also allow HEAD requests
+ self.assert_(self.env.obj.info(parms=self.obj_tempurl_parms,
+ cfg={'no_auth_token': True}))
+
+ def test_GET_with_key_2(self):
+ expires = int(time.time()) + 86400
+ sig = self.tempurl_sig(
+ 'GET', expires, self.env.conn.make_path(self.env.obj.path),
+ self.env.tempurl_key2)
+ parms = {'temp_url_sig': sig,
+ 'temp_url_expires': str(expires)}
+
+ contents = self.env.obj.read(parms=parms, cfg={'no_auth_token': True})
+ self.assertEqual(contents, "obj contents")
+
+ def test_PUT(self):
+ new_obj = self.env.container.file(Utils.create_name())
+
+ expires = int(time.time()) + 86400
+ sig = self.tempurl_sig(
+ 'PUT', expires, self.env.conn.make_path(new_obj.path),
+ self.env.tempurl_key)
+ put_parms = {'temp_url_sig': sig,
+ 'temp_url_expires': str(expires)}
+
+ new_obj.write('new obj contents',
+ parms=put_parms, cfg={'no_auth_token': True})
+ self.assertEqual(new_obj.read(), "new obj contents")
+
+ # PUT tempurls also allow HEAD requests
+ self.assert_(new_obj.info(parms=put_parms,
+ cfg={'no_auth_token': True}))
+
+ def test_HEAD(self):
+ expires = int(time.time()) + 86400
+ sig = self.tempurl_sig(
+ 'HEAD', expires, self.env.conn.make_path(self.env.obj.path),
+ self.env.tempurl_key)
+ head_parms = {'temp_url_sig': sig,
+ 'temp_url_expires': str(expires)}
+
+ self.assert_(self.env.obj.info(parms=head_parms,
+ cfg={'no_auth_token': True}))
+ # HEAD tempurls don't allow PUT or GET requests, despite the fact that
+ # PUT and GET tempurls both allow HEAD requests
+ self.assertRaises(ResponseError, self.env.other_obj.read,
+ cfg={'no_auth_token': True},
+ parms=self.obj_tempurl_parms)
+ self.assert_status([401])
+
+ self.assertRaises(ResponseError, self.env.other_obj.write,
+ 'new contents',
+ cfg={'no_auth_token': True},
+ parms=self.obj_tempurl_parms)
+ self.assert_status([401])
+
+ def test_different_object(self):
+ contents = self.env.obj.read(
+ parms=self.obj_tempurl_parms,
+ cfg={'no_auth_token': True})
+ self.assertEqual(contents, "obj contents")
+
+ self.assertRaises(ResponseError, self.env.other_obj.read,
+ cfg={'no_auth_token': True},
+ parms=self.obj_tempurl_parms)
+ self.assert_status([401])
+
+ def test_changing_sig(self):
+ contents = self.env.obj.read(
+ parms=self.obj_tempurl_parms,
+ cfg={'no_auth_token': True})
+ self.assertEqual(contents, "obj contents")
+
+ parms = self.obj_tempurl_parms.copy()
+ if parms['temp_url_sig'][0] == 'a':
+ parms['temp_url_sig'] = 'b' + parms['temp_url_sig'][1:]
+ else:
+ parms['temp_url_sig'] = 'a' + parms['temp_url_sig'][1:]
+
+ self.assertRaises(ResponseError, self.env.obj.read,
+ cfg={'no_auth_token': True},
+ parms=parms)
+ self.assert_status([401])
+
+ def test_changing_expires(self):
+ contents = self.env.obj.read(
+ parms=self.obj_tempurl_parms,
+ cfg={'no_auth_token': True})
+ self.assertEqual(contents, "obj contents")
+
+ parms = self.obj_tempurl_parms.copy()
+ if parms['temp_url_expires'][-1] == '0':
+ parms['temp_url_expires'] = parms['temp_url_expires'][:-1] + '1'
+ else:
+ parms['temp_url_expires'] = parms['temp_url_expires'][:-1] + '0'
+
+ self.assertRaises(ResponseError, self.env.obj.read,
+ cfg={'no_auth_token': True},
+ parms=parms)
+ self.assert_status([401])
+
+
+class TestTempurlUTF8(Base2, TestTempurl):
+ set_up = False
+
+
+class TestSloTempurlEnv(object):
+ enabled = None # tri-state: None initially, then True/False
+
+ @classmethod
+ def setUp(cls):
+ cls.conn = Connection(config)
+ cls.conn.authenticate()
+
+ if cls.enabled is None:
+ cluster_info = cls.conn.cluster_info()
+ cls.enabled = 'tempurl' in cluster_info and 'slo' in cluster_info
+
+ cls.tempurl_key = Utils.create_name()
+
+ cls.account = Account(
+ cls.conn, config.get('account', config['username']))
+ cls.account.delete_containers()
+ cls.account.update_metadata({'temp-url-key': cls.tempurl_key})
+
+ cls.manifest_container = cls.account.container(Utils.create_name())
+ cls.segments_container = cls.account.container(Utils.create_name())
+ if not cls.manifest_container.create():
+ raise ResponseError(cls.conn.response)
+ if not cls.segments_container.create():
+ raise ResponseError(cls.conn.response)
+
+ seg1 = cls.segments_container.file(Utils.create_name())
+ seg1.write('1' * 1024 * 1024)
+
+ seg2 = cls.segments_container.file(Utils.create_name())
+ seg2.write('2' * 1024 * 1024)
+
+ cls.manifest_data = [{'size_bytes': 1024 * 1024,
+ 'etag': seg1.md5,
+ 'path': '/%s/%s' % (cls.segments_container.name,
+ seg1.name)},
+ {'size_bytes': 1024 * 1024,
+ 'etag': seg2.md5,
+ 'path': '/%s/%s' % (cls.segments_container.name,
+ seg2.name)}]
+
+ cls.manifest = cls.manifest_container.file(Utils.create_name())
+ cls.manifest.write(
+ json.dumps(cls.manifest_data),
+ parms={'multipart-manifest': 'put'})
+
+
+class TestSloTempurl(Base):
+ env = TestSloTempurlEnv
+ set_up = False
+
+ def setUp(self):
+ super(TestSloTempurl, self).setUp()
+ if self.env.enabled is False:
+ raise SkipTest("TempURL and SLO not both enabled")
+ elif self.env.enabled is not True:
+ # just some sanity checking
+ raise Exception(
+ "Expected enabled to be True/False, got %r" %
+ (self.env.enabled,))
+
+ def tempurl_sig(self, method, expires, path, key):
+ return hmac.new(
+ key,
+ '%s\n%s\n%s' % (method, expires, urllib.unquote(path)),
+ hashlib.sha1).hexdigest()
+
+ def test_GET(self):
+ expires = int(time.time()) + 86400
+ sig = self.tempurl_sig(
+ 'GET', expires, self.env.conn.make_path(self.env.manifest.path),
+ self.env.tempurl_key)
+ parms = {'temp_url_sig': sig, 'temp_url_expires': str(expires)}
+
+ contents = self.env.manifest.read(
+ parms=parms,
+ cfg={'no_auth_token': True})
+ self.assertEqual(len(contents), 2 * 1024 * 1024)
+
+ # GET tempurls also allow HEAD requests
+ self.assert_(self.env.manifest.info(
+ parms=parms, cfg={'no_auth_token': True}))
+
+
+class TestSloTempurlUTF8(Base2, TestSloTempurl):
+ set_up = False
+
+
if __name__ == '__main__':
unittest.main()