summaryrefslogtreecommitdiffstats
path: root/test/functional/test_object.py
diff options
context:
space:
mode:
authorThiago da Silva <thiago@redhat.com>2014-04-22 14:15:02 -0400
committerPrashanth Pai <ppai@redhat.com>2016-01-06 07:53:12 -0800
commit2a8f9f0f530327039c32e444b6a27130b12666bd (patch)
treee24e38b5b3c0245a0acafc63fc50bacbf7de718a /test/functional/test_object.py
parent4c6ca1db931377b75583f61a7bca262cfc27b0fa (diff)
Update repo
This is a squashed commit imported from this repo: https://github.com/openstack/swiftonfile/tree/icehouse Contains the follwing commits from above mentioned repo: eb50236 Merge "Backport: Fix metadata overall limits bug" into icehouse 79ea52a Backport: Fix metadata overall limits bug bc43f0b Fix inconsistent data being returned on GET ad0bb79 Import HTTPBadRequest from swift's module 74d02e6 Exclude .trashcan dir from container listing b2dbc15 Catch ESTALE in addition to ENOENT 8d60b48 Properly handle read_metadata() exceptions 6762fc6 Fix object server leaking file descriptors 2842e82 Fix API incompatibility in update_metadata() 2beeef6 Merge "Remove swiftkerbauth code" into icehouse 93dbcb5 Update object-expirer.conf with explanations c9d2f09 Merge "Check if /etc/swift exists in ring builder" into icehouse d66c14c Remove swiftkerbauth code 3142ed2 Add object expiration functests 97153d1 Merge "Cleanup functest and undo old patch" into icehouse bc234d0 Remove old travis config file and fix typo 260c8ef Check if /etc/swift exists in ring builder 637dac9 Cleanup functest and undo old patch 051e068 Merge pull request #35 from prashanthpai/backport-1 be104a3 Merge pull request #36 from prashanthpai/backport-2 ff76f42 fix issue with GET on large object (icehouse-backport) 04d0a99 Fix unlink call after successful rename 4c6ca1d updating README file with project name change 10b2680 Merge pull request #18 from thiagol11/icehouse 5bcab8f Updating version on __init__ file 5c2cba2 Merge pull request #15 from thiagol11/update_spec 52b00a8 updating spec file to add dependency on swift icehouse ae7c93b Merge pull request #6 from prashanthpai/rebase 191e55b Revert: allow non-root user to run functests cb7e968 Modify unit tests and func tests d23fd1b Sync with OpenStack Swift v1.13.1 b6d1671 Merge pull request #12 from pushpesh/functionalnosetestremove 962622b Merge pull request #8 from thiagol11/update_readme 4560857 Merge pull request #9 from prashanthpai/spec-expirer be0ae7e Minor update 65000f1 Removing functionalnosetests 8ab1069 Fix object-expirer.conf-gluster RPM build error afee30f added new support filesystem section 527b01f updated README.md to Swift-On-File 9a240c7 Merge pull request #3 from thiagol11/add_jenkins_to_travis 34b5a8b removing blank lines 3568b64 fixing missing fi d8f5b0f adding support to run jenkins triggered by travis 6f4a88c Removing functionalnosetests 8041944 Update README.md c015148 Merge pull request #2 from thiagol11/master 3ddd952 fixing travis file to run correct unit test c582669 adding travis status badge to README 8093096 adding py26 unit testing to travis 37835fd trigger travis build cb6332a adding travis ci testing All tests have been run sucessfully against this. tox -e p2p8,py27,functest Change-Id: I096b611da852d3eb3913844034b443b8272c2ac4 Signed-off-by: Prashanth Pai <ppai@redhat.com> Reviewed-on: http://review.gluster.org/13188
Diffstat (limited to 'test/functional/test_object.py')
-rwxr-xr-xtest/functional/test_object.py524
1 files changed, 453 insertions, 71 deletions
diff --git a/test/functional/test_object.py b/test/functional/test_object.py
index dad8635..675de30 100755
--- a/test/functional/test_object.py
+++ b/test/functional/test_object.py
@@ -19,8 +19,10 @@ import unittest
from nose import SkipTest
from uuid import uuid4
+from swift.common.utils import json
+
from swift_testing import check_response, retry, skip, skip3, \
- swift_test_perm, web_front_end
+ swift_test_perm, web_front_end, requires_acls, swift_test_user
class TestObject(unittest.TestCase):
@@ -36,7 +38,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
self.obj = uuid4().hex
def put(url, token, parsed, conn):
@@ -46,7 +48,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
def tearDown(self):
if skip:
@@ -66,13 +68,13 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(list)
object_listing = resp.read()
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.status, 200)
# iterate over object listing and delete all objects
for obj in object_listing.splitlines():
resp = retry(delete, obj)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# delete the container
def delete(url, token, parsed, conn):
@@ -81,7 +83,33 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
+
+ def test_if_none_match(self):
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/%s' % (
+ parsed.path, self.container, 'if_none_match_test'), '',
+ {'X-Auth-Token': token,
+ 'Content-Length': '0',
+ 'If-None-Match': '*'})
+ return check_response(conn)
+ resp = retry(put)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+ resp = retry(put)
+ resp.read()
+ self.assertEquals(resp.status, 412)
+
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/%s' % (
+ parsed.path, self.container, 'if_none_match_test'), '',
+ {'X-Auth-Token': token,
+ 'Content-Length': '0',
+ 'If-None-Match': 'somethingelse'})
+ return check_response(conn)
+ resp = retry(put)
+ resp.read()
+ self.assertEquals(resp.status, 400)
def test_copy_object(self):
if skip:
@@ -98,8 +126,8 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(get_source)
source_contents = resp.read()
- self.assertEquals(resp.status, 200)
- self.assertEquals(source_contents, 'test')
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(source_contents, 'test')
# copy source to dest with X-Copy-From
def put(url, token, parsed, conn):
@@ -110,7 +138,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# contents of dest should be the same as source
def get_dest(url, token, parsed, conn):
@@ -120,8 +148,8 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(get_dest)
dest_contents = resp.read()
- self.assertEquals(resp.status, 200)
- self.assertEquals(dest_contents, source_contents)
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(dest_contents, source_contents)
# delete the copy
def delete(url, token, parsed, conn):
@@ -130,11 +158,11 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# verify dest does not exist
resp = retry(get_dest)
resp.read()
- self.assertEquals(resp.status, 404)
+ self.assertEqual(resp.status, 404)
# copy source to dest with COPY
def copy(url, token, parsed, conn):
@@ -144,18 +172,18 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(copy)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# contents of dest should be the same as source
resp = retry(get_dest)
dest_contents = resp.read()
- self.assertEquals(resp.status, 200)
- self.assertEquals(dest_contents, source_contents)
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(dest_contents, source_contents)
# delete the copy
resp = retry(delete)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
def test_public_object(self):
if skip:
@@ -178,10 +206,10 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(get)
resp.read()
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.status, 200)
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.container, '',
@@ -189,7 +217,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
try:
resp = retry(get)
raise Exception('Should not have been able to GET')
@@ -208,7 +236,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(get, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# create a shared container writable by account3
shared_container = uuid4().hex
@@ -222,7 +250,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# verify third account can not copy from private container
def copy(url, token, parsed, conn):
@@ -234,7 +262,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(copy, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# verify third account can write "obj1" to shared container
def put(url, token, parsed, conn):
@@ -244,7 +272,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put, use_account=3)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# verify third account can copy "obj1" to shared container
def copy2(url, token, parsed, conn):
@@ -255,7 +283,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(copy2, use_account=3)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# verify third account STILL can not copy from private container
def copy3(url, token, parsed, conn):
@@ -267,7 +295,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(copy3, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# clean up "obj1"
def delete(url, token, parsed, conn):
@@ -277,7 +305,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# clean up shared_container
def delete(url, token, parsed, conn):
@@ -287,8 +315,251 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
+ self.assertEqual(resp.status, 204)
+
+ @requires_acls
+ def test_read_only(self):
+ if skip3:
+ raise SkipTest
+
+ def get_listing(url, token, parsed, conn):
+ conn.request('GET', '%s/%s' % (parsed.path, self.container), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post_account(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def get(url, token, parsed, conn, name):
+ conn.request('GET', '%s/%s/%s' % (
+ parsed.path, self.container, name), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def put(url, token, parsed, conn, name):
+ conn.request('PUT', '%s/%s/%s' % (
+ parsed.path, self.container, name), 'test',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def delete(url, token, parsed, conn, name):
+ conn.request('PUT', '%s/%s/%s' % (
+ parsed.path, self.container, name), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ # cannot list objects
+ resp = retry(get_listing, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # cannot get object
+ resp = retry(get, self.obj, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant read-only access
+ acl_user = swift_test_user[2]
+ acl = {'read-only': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can list objects
+ resp = retry(get_listing, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(self.obj in listing)
+
+ # can get object
+ resp = retry(get, self.obj, use_account=3)
+ body = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assertEquals(body, 'test')
+
+ # can not put an object
+ obj_name = str(uuid4())
+ resp = retry(put, obj_name, use_account=3)
+ body = resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # can not delete an object
+ resp = retry(delete, self.obj, use_account=3)
+ body = resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # sanity with account1
+ resp = retry(get_listing, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(obj_name not in listing)
+ self.assert_(self.obj in listing)
+
+ @requires_acls
+ def test_read_write(self):
+ if skip3:
+ raise SkipTest
+
+ def get_listing(url, token, parsed, conn):
+ conn.request('GET', '%s/%s' % (parsed.path, self.container), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post_account(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def get(url, token, parsed, conn, name):
+ conn.request('GET', '%s/%s/%s' % (
+ parsed.path, self.container, name), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def put(url, token, parsed, conn, name):
+ conn.request('PUT', '%s/%s/%s' % (
+ parsed.path, self.container, name), 'test',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def delete(url, token, parsed, conn, name):
+ conn.request('DELETE', '%s/%s/%s' % (
+ parsed.path, self.container, name), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ # cannot list objects
+ resp = retry(get_listing, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # cannot get object
+ resp = retry(get, self.obj, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant read-write access
+ acl_user = swift_test_user[2]
+ acl = {'read-write': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can list objects
+ resp = retry(get_listing, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(self.obj in listing)
+
+ # can get object
+ resp = retry(get, self.obj, use_account=3)
+ body = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assertEquals(body, 'test')
+
+ # can put an object
+ obj_name = str(uuid4())
+ resp = retry(put, obj_name, use_account=3)
+ body = resp.read()
+ self.assertEquals(resp.status, 201)
+
+ # can delete an object
+ resp = retry(delete, self.obj, use_account=3)
+ body = resp.read()
self.assertEquals(resp.status, 204)
+ # sanity with account1
+ resp = retry(get_listing, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(obj_name in listing)
+ self.assert_(self.obj not in listing)
+
+ @requires_acls
+ def test_admin(self):
+ if skip3:
+ raise SkipTest
+
+ def get_listing(url, token, parsed, conn):
+ conn.request('GET', '%s/%s' % (parsed.path, self.container), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post_account(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def get(url, token, parsed, conn, name):
+ conn.request('GET', '%s/%s/%s' % (
+ parsed.path, self.container, name), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def put(url, token, parsed, conn, name):
+ conn.request('PUT', '%s/%s/%s' % (
+ parsed.path, self.container, name), 'test',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def delete(url, token, parsed, conn, name):
+ conn.request('DELETE', '%s/%s/%s' % (
+ parsed.path, self.container, name), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ # cannot list objects
+ resp = retry(get_listing, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # cannot get object
+ resp = retry(get, self.obj, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant admin access
+ acl_user = swift_test_user[2]
+ acl = {'admin': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can list objects
+ resp = retry(get_listing, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(self.obj in listing)
+
+ # can get object
+ resp = retry(get, self.obj, use_account=3)
+ body = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assertEquals(body, 'test')
+
+ # can put an object
+ obj_name = str(uuid4())
+ resp = retry(put, obj_name, use_account=3)
+ body = resp.read()
+ self.assertEquals(resp.status, 201)
+
+ # can delete an object
+ resp = retry(delete, self.obj, use_account=3)
+ body = resp.read()
+ self.assertEquals(resp.status, 204)
+
+ # sanity with account1
+ resp = retry(get_listing, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(obj_name in listing)
+ self.assert_(self.obj not in listing)
+
def test_manifest(self):
if skip:
raise SkipTest
@@ -306,7 +577,7 @@ class TestObject(unittest.TestCase):
for objnum in xrange(len(segments1)):
resp = retry(put, objnum)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# Upload the manifest
def put(url, token, parsed, conn):
@@ -318,7 +589,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# Get the manifest (should get all the segments as the body)
def get(url, token, parsed, conn):
@@ -326,9 +597,9 @@ class TestObject(unittest.TestCase):
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments1))
- self.assertEquals(resp.status, 200)
- self.assertEquals(resp.getheader('content-type'), 'text/jibberish')
+ self.assertEqual(resp.read(), ''.join(segments1))
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(resp.getheader('content-type'), 'text/jibberish')
# Get with a range at the start of the second segment
def get(url, token, parsed, conn):
@@ -337,8 +608,8 @@ class TestObject(unittest.TestCase):
'X-Auth-Token': token, 'Range': 'bytes=3-'})
return check_response(conn)
resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments1[1:]))
- self.assertEquals(resp.status, 206)
+ self.assertEqual(resp.read(), ''.join(segments1[1:]))
+ self.assertEqual(resp.status, 206)
# Get with a range in the middle of the second segment
def get(url, token, parsed, conn):
@@ -347,8 +618,8 @@ class TestObject(unittest.TestCase):
'X-Auth-Token': token, 'Range': 'bytes=5-'})
return check_response(conn)
resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments1)[5:])
- self.assertEquals(resp.status, 206)
+ self.assertEqual(resp.read(), ''.join(segments1)[5:])
+ self.assertEqual(resp.status, 206)
# Get with a full start and stop range
def get(url, token, parsed, conn):
@@ -357,8 +628,8 @@ class TestObject(unittest.TestCase):
'X-Auth-Token': token, 'Range': 'bytes=5-10'})
return check_response(conn)
resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments1)[5:11])
- self.assertEquals(resp.status, 206)
+ self.assertEqual(resp.read(), ''.join(segments1)[5:11])
+ self.assertEqual(resp.status, 206)
# Upload the second set of segments
def put(url, token, parsed, conn, objnum):
@@ -369,7 +640,7 @@ class TestObject(unittest.TestCase):
for objnum in xrange(len(segments2)):
resp = retry(put, objnum)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# Get the manifest (should still be the first segments of course)
def get(url, token, parsed, conn):
@@ -377,8 +648,8 @@ class TestObject(unittest.TestCase):
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments1))
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.read(), ''.join(segments1))
+ self.assertEqual(resp.status, 200)
# Update the manifest
def put(url, token, parsed, conn):
@@ -390,7 +661,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# Get the manifest (should be the second set of segments now)
def get(url, token, parsed, conn):
@@ -398,8 +669,8 @@ class TestObject(unittest.TestCase):
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments2))
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.read(), ''.join(segments2))
+ self.assertEqual(resp.status, 200)
if not skip3:
@@ -410,7 +681,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(get, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# Grant access to the third account
def post(url, token, parsed, conn):
@@ -420,7 +691,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# The third account should be able to get the manifest now
def get(url, token, parsed, conn):
@@ -428,8 +699,8 @@ class TestObject(unittest.TestCase):
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
- self.assertEquals(resp.read(), ''.join(segments2))
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.read(), ''.join(segments2))
+ self.assertEqual(resp.status, 200)
# Create another container for the third set of segments
acontainer = uuid4().hex
@@ -440,7 +711,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# Upload the third set of segments in the other container
def put(url, token, parsed, conn, objnum):
@@ -451,7 +722,7 @@ class TestObject(unittest.TestCase):
for objnum in xrange(len(segments3)):
resp = retry(put, objnum)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# Update the manifest
def put(url, token, parsed, conn):
@@ -463,7 +734,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# Get the manifest to ensure it's the third set of segments
def get(url, token, parsed, conn):
@@ -471,8 +742,8 @@ class TestObject(unittest.TestCase):
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments3))
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.read(), ''.join(segments3))
+ self.assertEqual(resp.status, 200)
if not skip3:
@@ -486,7 +757,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(get, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# Grant access to the third account
def post(url, token, parsed, conn):
@@ -496,7 +767,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# The third account should be able to get the manifest now
def get(url, token, parsed, conn):
@@ -504,8 +775,8 @@ class TestObject(unittest.TestCase):
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
- self.assertEquals(resp.read(), ''.join(segments3))
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.read(), ''.join(segments3))
+ self.assertEqual(resp.status, 200)
# Delete the manifest
def delete(url, token, parsed, conn, objnum):
@@ -515,7 +786,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete, objnum)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Delete the third set of segments
def delete(url, token, parsed, conn, objnum):
@@ -526,7 +797,7 @@ class TestObject(unittest.TestCase):
for objnum in xrange(len(segments3)):
resp = retry(delete, objnum)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Delete the second set of segments
def delete(url, token, parsed, conn, objnum):
@@ -537,7 +808,7 @@ class TestObject(unittest.TestCase):
for objnum in xrange(len(segments2)):
resp = retry(delete, objnum)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Delete the first set of segments
def delete(url, token, parsed, conn, objnum):
@@ -548,7 +819,7 @@ class TestObject(unittest.TestCase):
for objnum in xrange(len(segments1)):
resp = retry(delete, objnum)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Delete the extra container
def delete(url, token, parsed, conn):
@@ -557,7 +828,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
def test_delete_content_type(self):
if skip:
@@ -569,7 +840,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
def delete(url, token, parsed, conn):
conn.request('DELETE', '%s/%s/hi' % (parsed.path, self.container),
@@ -577,9 +848,9 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
- self.assertEquals(resp.status, 204)
- self.assertEquals(resp.getheader('Content-Type'),
- 'text/html; charset=UTF-8')
+ self.assertEqual(resp.status, 204)
+ self.assertEqual(resp.getheader('Content-Type'),
+ 'text/html; charset=UTF-8')
def test_delete_if_delete_at_bad(self):
if skip:
@@ -592,7 +863,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
def delete(url, token, parsed, conn):
conn.request('DELETE', '%s/%s/hi' % (parsed.path, self.container),
@@ -601,7 +872,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
def test_null_name(self):
if skip:
@@ -614,10 +885,121 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
if (web_front_end == 'apache2'):
- self.assertEquals(resp.status, 404)
+ self.assertEqual(resp.status, 404)
else:
- self.assertEquals(resp.read(), 'Invalid UTF8 or contains NULL')
- self.assertEquals(resp.status, 412)
+ self.assertEqual(resp.read(), 'Invalid UTF8 or contains NULL')
+ self.assertEqual(resp.status, 412)
+
+ def test_cors(self):
+ if skip:
+ raise SkipTest
+
+ def is_strict_mode(url, token, parsed, conn):
+ conn.request('GET', '/info')
+ resp = conn.getresponse()
+ if resp.status // 100 == 2:
+ info = json.loads(resp.read())
+ return info.get('swift', {}).get('strict_cors_mode', False)
+ return False
+
+ def put_cors_cont(url, token, parsed, conn, orig):
+ conn.request(
+ 'PUT', '%s/%s' % (parsed.path, self.container),
+ '', {'X-Auth-Token': token,
+ 'X-Container-Meta-Access-Control-Allow-Origin': orig})
+ return check_response(conn)
+
+ def put_obj(url, token, parsed, conn, obj):
+ conn.request(
+ 'PUT', '%s/%s/%s' % (parsed.path, self.container, obj),
+ 'test', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def check_cors(url, token, parsed, conn,
+ method, obj, headers):
+ if method != 'OPTIONS':
+ headers['X-Auth-Token'] = token
+ conn.request(
+ method, '%s/%s/%s' % (parsed.path, self.container, obj),
+ '', headers)
+ return conn.getresponse()
+
+ strict_cors = retry(is_strict_mode)
+
+ resp = retry(put_cors_cont, '*')
+ resp.read()
+ self.assertEquals(resp.status // 100, 2)
+
+ resp = retry(put_obj, 'cat')
+ resp.read()
+ self.assertEquals(resp.status // 100, 2)
+
+ resp = retry(check_cors,
+ 'OPTIONS', 'cat', {'Origin': 'http://m.com'})
+ self.assertEquals(resp.status, 401)
+
+ resp = retry(check_cors,
+ 'OPTIONS', 'cat',
+ {'Origin': 'http://m.com',
+ 'Access-Control-Request-Method': 'GET'})
+
+ self.assertEquals(resp.status, 200)
+ resp.read()
+ headers = dict((k.lower(), v) for k, v in resp.getheaders())
+ self.assertEquals(headers.get('access-control-allow-origin'),
+ '*')
+
+ resp = retry(check_cors,
+ 'GET', 'cat', {'Origin': 'http://m.com'})
+ self.assertEquals(resp.status, 200)
+ headers = dict((k.lower(), v) for k, v in resp.getheaders())
+ self.assertEquals(headers.get('access-control-allow-origin'),
+ '*')
+
+ resp = retry(check_cors,
+ 'GET', 'cat', {'Origin': 'http://m.com',
+ 'X-Web-Mode': 'True'})
+ self.assertEquals(resp.status, 200)
+ headers = dict((k.lower(), v) for k, v in resp.getheaders())
+ self.assertEquals(headers.get('access-control-allow-origin'),
+ '*')
+
+ ####################
+
+ resp = retry(put_cors_cont, 'http://secret.com')
+ resp.read()
+ self.assertEquals(resp.status // 100, 2)
+
+ resp = retry(check_cors,
+ 'OPTIONS', 'cat',
+ {'Origin': 'http://m.com',
+ 'Access-Control-Request-Method': 'GET'})
+ resp.read()
+ self.assertEquals(resp.status, 401)
+
+ if strict_cors:
+ resp = retry(check_cors,
+ 'GET', 'cat', {'Origin': 'http://m.com'})
+ resp.read()
+ self.assertEquals(resp.status, 200)
+ headers = dict((k.lower(), v) for k, v in resp.getheaders())
+ self.assertTrue('access-control-allow-origin' not in headers)
+
+ resp = retry(check_cors,
+ 'GET', 'cat', {'Origin': 'http://secret.com'})
+ resp.read()
+ self.assertEquals(resp.status, 200)
+ headers = dict((k.lower(), v) for k, v in resp.getheaders())
+ self.assertEquals(headers.get('access-control-allow-origin'),
+ 'http://secret.com')
+ else:
+ resp = retry(check_cors,
+ 'GET', 'cat', {'Origin': 'http://m.com'})
+ resp.read()
+ self.assertEquals(resp.status, 200)
+ headers = dict((k.lower(), v) for k, v in resp.getheaders())
+ self.assertEquals(headers.get('access-control-allow-origin'),
+ 'http://m.com')
if __name__ == '__main__':