summaryrefslogtreecommitdiffstats
path: root/test/functional/test_object.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional/test_object.py')
-rwxr-xr-xtest/functional/test_object.py353
1 files changed, 289 insertions, 64 deletions
diff --git a/test/functional/test_object.py b/test/functional/test_object.py
index 675de30..e74a7f6 100755
--- a/test/functional/test_object.py
+++ b/test/functional/test_object.py
@@ -21,24 +21,22 @@ from uuid import uuid4
from swift.common.utils import json
-from swift_testing import check_response, retry, skip, skip3, \
- swift_test_perm, web_front_end, requires_acls, swift_test_user
+from test.functional import check_response, retry, requires_acls, \
+ requires_policies
+import test.functional as tf
class TestObject(unittest.TestCase):
def setUp(self):
- if skip:
+ if tf.skip:
raise SkipTest
self.container = uuid4().hex
- def put(url, token, parsed, conn):
- conn.request('PUT', parsed.path + '/' + self.container, '',
- {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(put)
- resp.read()
- self.assertEqual(resp.status, 201)
+ self.containers = []
+ self._create_container(self.container)
+ self._create_container(self.container, use_account=2)
+
self.obj = uuid4().hex
def put(url, token, parsed, conn):
@@ -50,40 +48,65 @@ class TestObject(unittest.TestCase):
resp.read()
self.assertEqual(resp.status, 201)
+ def _create_container(self, name=None, headers=None, use_account=1):
+ if not name:
+ name = uuid4().hex
+ self.containers.append(name)
+ headers = headers or {}
+
+ def put(url, token, parsed, conn, name):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('PUT', parsed.path + '/' + name, '',
+ new_headers)
+ return check_response(conn)
+ resp = retry(put, name, use_account=use_account)
+ resp.read()
+ self.assertEqual(resp.status, 201)
+ return name
+
def tearDown(self):
- if skip:
+ if tf.skip:
raise SkipTest
- def delete(url, token, parsed, conn, obj):
- conn.request('DELETE',
- '%s/%s/%s' % (parsed.path, self.container, obj),
- '', {'X-Auth-Token': token})
+ # get list of objects in container
+ def get(url, token, parsed, conn, container):
+ conn.request(
+ 'GET', parsed.path + '/' + container + '?format=json', '',
+ {'X-Auth-Token': token})
return check_response(conn)
- # get list of objects in container
- def list(url, token, parsed, conn):
- conn.request('GET',
- '%s/%s' % (parsed.path, self.container),
- '', {'X-Auth-Token': token})
+ # delete an object
+ def delete(url, token, parsed, conn, container, obj):
+ conn.request(
+ 'DELETE', '/'.join([parsed.path, container, obj['name']]), '',
+ {'X-Auth-Token': token})
return check_response(conn)
- resp = retry(list)
- object_listing = resp.read()
- self.assertEqual(resp.status, 200)
- # iterate over object listing and delete all objects
- for obj in object_listing.splitlines():
- resp = retry(delete, obj)
- resp.read()
- self.assertEqual(resp.status, 204)
+ for container in self.containers:
+ while True:
+ resp = retry(get, container)
+ body = resp.read()
+ if resp.status == 404:
+ break
+ self.assert_(resp.status // 100 == 2, resp.status)
+ objs = json.loads(body)
+ if not objs:
+ break
+ for obj in objs:
+ resp = retry(delete, container, obj)
+ resp.read()
+ self.assertEqual(resp.status, 204)
# delete the container
- def delete(url, token, parsed, conn):
- conn.request('DELETE', parsed.path + '/' + self.container, '',
+ def delete(url, token, parsed, conn, name):
+ conn.request('DELETE', parsed.path + '/' + name, '',
{'X-Auth-Token': token})
return check_response(conn)
- resp = retry(delete)
- resp.read()
- self.assertEqual(resp.status, 204)
+
+ for container in self.containers:
+ resp = retry(delete, container)
+ resp.read()
+ self.assert_(resp.status in (204, 404))
def test_if_none_match(self):
def put(url, token, parsed, conn):
@@ -111,8 +134,47 @@ class TestObject(unittest.TestCase):
resp.read()
self.assertEquals(resp.status, 400)
+ def test_non_integer_x_delete_after(self):
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container,
+ 'non_integer_x_delete_after'),
+ '', {'X-Auth-Token': token,
+ 'Content-Length': '0',
+ 'X-Delete-After': '*'})
+ return check_response(conn)
+ resp = retry(put)
+ body = resp.read()
+ self.assertEquals(resp.status, 400)
+ self.assertEqual(body, 'Non-integer X-Delete-After')
+
+ def test_non_integer_x_delete_at(self):
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container,
+ 'non_integer_x_delete_at'),
+ '', {'X-Auth-Token': token,
+ 'Content-Length': '0',
+ 'X-Delete-At': '*'})
+ return check_response(conn)
+ resp = retry(put)
+ body = resp.read()
+ self.assertEquals(resp.status, 400)
+ self.assertEqual(body, 'Non-integer X-Delete-At')
+
+ def test_x_delete_at_in_the_past(self):
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container,
+ 'x_delete_at_in_the_past'),
+ '', {'X-Auth-Token': token,
+ 'Content-Length': '0',
+ 'X-Delete-At': '0'})
+ return check_response(conn)
+ resp = retry(put)
+ body = resp.read()
+ self.assertEquals(resp.status, 400)
+ self.assertEqual(body, 'X-Delete-At in past')
+
def test_copy_object(self):
- if skip:
+ if tf.skip:
raise SkipTest
source = '%s/%s' % (self.container, self.obj)
@@ -185,8 +247,118 @@ class TestObject(unittest.TestCase):
resp.read()
self.assertEqual(resp.status, 204)
+ def test_copy_between_accounts(self):
+ if tf.skip:
+ raise SkipTest
+
+ source = '%s/%s' % (self.container, self.obj)
+ dest = '%s/%s' % (self.container, 'test_copy')
+
+ # get contents of source
+ def get_source(url, token, parsed, conn):
+ conn.request('GET',
+ '%s/%s' % (parsed.path, source),
+ '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get_source)
+ source_contents = resp.read()
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(source_contents, 'test')
+
+ acct = tf.parsed[0].path.split('/', 2)[2]
+
+ # copy source to dest with X-Copy-From-Account
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s' % (parsed.path, dest), '',
+ {'X-Auth-Token': token,
+ 'Content-Length': '0',
+ 'X-Copy-From-Account': acct,
+ 'X-Copy-From': source})
+ return check_response(conn)
+ # try to put, will not succeed
+ # user does not have permissions to read from source
+ resp = retry(put, use_account=2)
+ self.assertEqual(resp.status, 403)
+
+ # add acl to allow reading from source
+ def post(url, token, parsed, conn):
+ conn.request('POST', '%s/%s' % (parsed.path, self.container), '',
+ {'X-Auth-Token': token,
+ 'X-Container-Read': tf.swift_test_perm[1]})
+ return check_response(conn)
+ resp = retry(post)
+ self.assertEqual(resp.status, 204)
+
+ # retry previous put, now should succeed
+ resp = retry(put, use_account=2)
+ self.assertEqual(resp.status, 201)
+
+ # contents of dest should be the same as source
+ def get_dest(url, token, parsed, conn):
+ conn.request('GET',
+ '%s/%s' % (parsed.path, dest),
+ '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get_dest, use_account=2)
+ dest_contents = resp.read()
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(dest_contents, source_contents)
+
+ # delete the copy
+ def delete(url, token, parsed, conn):
+ conn.request('DELETE', '%s/%s' % (parsed.path, dest), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(delete, use_account=2)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ # verify dest does not exist
+ resp = retry(get_dest, use_account=2)
+ resp.read()
+ self.assertEqual(resp.status, 404)
+
+ acct_dest = tf.parsed[1].path.split('/', 2)[2]
+
+ # copy source to dest with COPY
+ def copy(url, token, parsed, conn):
+ conn.request('COPY', '%s/%s' % (parsed.path, source), '',
+ {'X-Auth-Token': token,
+ 'Destination-Account': acct_dest,
+ 'Destination': dest})
+ return check_response(conn)
+ # try to copy, will not succeed
+ # user does not have permissions to write to destination
+ resp = retry(copy)
+ resp.read()
+ self.assertEqual(resp.status, 403)
+
+ # add acl to allow write to destination
+ def post(url, token, parsed, conn):
+ conn.request('POST', '%s/%s' % (parsed.path, self.container), '',
+ {'X-Auth-Token': token,
+ 'X-Container-Write': tf.swift_test_perm[0]})
+ return check_response(conn)
+ resp = retry(post, use_account=2)
+ self.assertEqual(resp.status, 204)
+
+ # now copy will succeed
+ resp = retry(copy)
+ resp.read()
+ self.assertEqual(resp.status, 201)
+
+ # contents of dest should be the same as source
+ resp = retry(get_dest, use_account=2)
+ dest_contents = resp.read()
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(dest_contents, source_contents)
+
+ # delete the copy
+ resp = retry(delete, use_account=2)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
def test_public_object(self):
- if skip:
+ if tf.skip:
raise SkipTest
def get(url, token, parsed, conn):
@@ -225,7 +397,7 @@ class TestObject(unittest.TestCase):
self.assert_(str(err).startswith('No result after '))
def test_private_object(self):
- if skip or skip3:
+ if tf.skip or tf.skip3:
raise SkipTest
# Ensure we can't access the object with the third account
@@ -245,8 +417,8 @@ class TestObject(unittest.TestCase):
conn.request('PUT', '%s/%s' % (
parsed.path, shared_container), '',
{'X-Auth-Token': token,
- 'X-Container-Read': swift_test_perm[2],
- 'X-Container-Write': swift_test_perm[2]})
+ 'X-Container-Read': tf.swift_test_perm[2],
+ 'X-Container-Write': tf.swift_test_perm[2]})
return check_response(conn)
resp = retry(put)
resp.read()
@@ -319,8 +491,8 @@ class TestObject(unittest.TestCase):
@requires_acls
def test_read_only(self):
- if skip3:
- raise SkipTest
+ if tf.skip3:
+ raise tf.SkipTest
def get_listing(url, token, parsed, conn):
conn.request('GET', '%s/%s' % (parsed.path, self.container), '',
@@ -361,7 +533,7 @@ class TestObject(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant read-only access
- acl_user = swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'read-only': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@@ -400,7 +572,7 @@ class TestObject(unittest.TestCase):
@requires_acls
def test_read_write(self):
- if skip3:
+ if tf.skip3:
raise SkipTest
def get_listing(url, token, parsed, conn):
@@ -442,7 +614,7 @@ class TestObject(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant read-write access
- acl_user = swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'read-write': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@@ -481,7 +653,7 @@ class TestObject(unittest.TestCase):
@requires_acls
def test_admin(self):
- if skip3:
+ if tf.skip3:
raise SkipTest
def get_listing(url, token, parsed, conn):
@@ -523,7 +695,7 @@ class TestObject(unittest.TestCase):
self.assertEquals(resp.status, 403)
# grant admin access
- acl_user = swift_test_user[2]
+ acl_user = tf.swift_test_user[2]
acl = {'admin': [acl_user]}
headers = {'x-account-access-control': json.dumps(acl)}
resp = retry(post_account, headers=headers, use_account=1)
@@ -561,7 +733,7 @@ class TestObject(unittest.TestCase):
self.assert_(self.obj not in listing)
def test_manifest(self):
- if skip:
+ if tf.skip:
raise SkipTest
# Data for the object segments
segments1 = ['one', 'two', 'three', 'four', 'five']
@@ -672,7 +844,7 @@ class TestObject(unittest.TestCase):
self.assertEqual(resp.read(), ''.join(segments2))
self.assertEqual(resp.status, 200)
- if not skip3:
+ if not tf.skip3:
# Ensure we can't access the manifest with the third account
def get(url, token, parsed, conn):
@@ -687,7 +859,7 @@ class TestObject(unittest.TestCase):
def post(url, token, parsed, conn):
conn.request('POST', '%s/%s' % (parsed.path, self.container),
'', {'X-Auth-Token': token,
- 'X-Container-Read': swift_test_perm[2]})
+ 'X-Container-Read': tf.swift_test_perm[2]})
return check_response(conn)
resp = retry(post)
resp.read()
@@ -745,7 +917,7 @@ class TestObject(unittest.TestCase):
self.assertEqual(resp.read(), ''.join(segments3))
self.assertEqual(resp.status, 200)
- if not skip3:
+ if not tf.skip3:
# Ensure we can't access the manifest with the third account
# (because the segments are in a protected container even if the
@@ -763,7 +935,7 @@ class TestObject(unittest.TestCase):
def post(url, token, parsed, conn):
conn.request('POST', '%s/%s' % (parsed.path, acontainer),
'', {'X-Auth-Token': token,
- 'X-Container-Read': swift_test_perm[2]})
+ 'X-Container-Read': tf.swift_test_perm[2]})
return check_response(conn)
resp = retry(post)
resp.read()
@@ -831,7 +1003,7 @@ class TestObject(unittest.TestCase):
self.assertEqual(resp.status, 204)
def test_delete_content_type(self):
- if skip:
+ if tf.skip:
raise SkipTest
def put(url, token, parsed, conn):
@@ -853,7 +1025,7 @@ class TestObject(unittest.TestCase):
'text/html; charset=UTF-8')
def test_delete_if_delete_at_bad(self):
- if skip:
+ if tf.skip:
raise SkipTest
def put(url, token, parsed, conn):
@@ -875,7 +1047,7 @@ class TestObject(unittest.TestCase):
self.assertEqual(resp.status, 400)
def test_null_name(self):
- if skip:
+ if tf.skip:
raise SkipTest
def put(url, token, parsed, conn):
@@ -884,23 +1056,20 @@ class TestObject(unittest.TestCase):
self.container), 'test', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(put)
- if (web_front_end == 'apache2'):
+ if (tf.web_front_end == 'apache2'):
self.assertEqual(resp.status, 404)
else:
self.assertEqual(resp.read(), 'Invalid UTF8 or contains NULL')
self.assertEqual(resp.status, 412)
def test_cors(self):
- if skip:
+ if tf.skip:
raise SkipTest
- def is_strict_mode(url, token, parsed, conn):
- conn.request('GET', '/info')
- resp = conn.getresponse()
- if resp.status // 100 == 2:
- info = json.loads(resp.read())
- return info.get('swift', {}).get('strict_cors_mode', False)
- return False
+ try:
+ strict_cors = tf.cluster_info['swift']['strict_cors_mode']
+ except KeyError:
+ raise SkipTest("cors mode is unknown")
def put_cors_cont(url, token, parsed, conn, orig):
conn.request(
@@ -924,8 +1093,6 @@ class TestObject(unittest.TestCase):
'', headers)
return conn.getresponse()
- strict_cors = retry(is_strict_mode)
-
resp = retry(put_cors_cont, '*')
resp.read()
self.assertEquals(resp.status // 100, 2)
@@ -1001,6 +1168,64 @@ class TestObject(unittest.TestCase):
self.assertEquals(headers.get('access-control-allow-origin'),
'http://m.com')
+ @requires_policies
+ def test_cross_policy_copy(self):
+ # create container in first policy
+ policy = self.policies.select()
+ container = self._create_container(
+ headers={'X-Storage-Policy': policy['name']})
+ obj = uuid4().hex
+
+ # create a container in second policy
+ other_policy = self.policies.exclude(name=policy['name']).select()
+ other_container = self._create_container(
+ headers={'X-Storage-Policy': other_policy['name']})
+ other_obj = uuid4().hex
+
+ def put_obj(url, token, parsed, conn, container, obj):
+ # to keep track of things, use the original path as the body
+ content = '%s/%s' % (container, obj)
+ path = '%s/%s' % (parsed.path, content)
+ conn.request('PUT', path, content, {'X-Auth-Token': token})
+ return check_response(conn)
+
+ # create objects
+ for c, o in zip((container, other_container), (obj, other_obj)):
+ resp = retry(put_obj, c, o)
+ resp.read()
+ self.assertEqual(resp.status, 201)
+
+ def put_copy_from(url, token, parsed, conn, container, obj, source):
+ dest_path = '%s/%s/%s' % (parsed.path, container, obj)
+ conn.request('PUT', dest_path, '',
+ {'X-Auth-Token': token,
+ 'Content-Length': '0',
+ 'X-Copy-From': source})
+ return check_response(conn)
+
+ copy_requests = (
+ (container, other_obj, '%s/%s' % (other_container, other_obj)),
+ (other_container, obj, '%s/%s' % (container, obj)),
+ )
+
+ # copy objects
+ for c, o, source in copy_requests:
+ resp = retry(put_copy_from, c, o, source)
+ resp.read()
+ self.assertEqual(resp.status, 201)
+
+ def get_obj(url, token, parsed, conn, container, obj):
+ path = '%s/%s/%s' % (parsed.path, container, obj)
+ conn.request('GET', path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ # validate contents, contents should be source
+ validate_requests = copy_requests
+ for c, o, body in validate_requests:
+ resp = retry(get_obj, c, o)
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(body, resp.read())
+
if __name__ == '__main__':
unittest.main()