summaryrefslogtreecommitdiffstats
path: root/test/functionalnosetests
diff options
context:
space:
mode:
Diffstat (limited to 'test/functionalnosetests')
-rw-r--r--test/functionalnosetests/__init__.py0
-rw-r--r--test/functionalnosetests/swift_testing.py175
-rwxr-xr-xtest/functionalnosetests/test_account.py152
-rwxr-xr-xtest/functionalnosetests/test_container.py573
-rwxr-xr-xtest/functionalnosetests/test_object.py600
5 files changed, 1500 insertions, 0 deletions
diff --git a/test/functionalnosetests/__init__.py b/test/functionalnosetests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/functionalnosetests/__init__.py
diff --git a/test/functionalnosetests/swift_testing.py b/test/functionalnosetests/swift_testing.py
new file mode 100644
index 0000000..023a753
--- /dev/null
+++ b/test/functionalnosetests/swift_testing.py
@@ -0,0 +1,175 @@
+# Copyright (c) 2010-2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import errno
+import os
+import socket
+import sys
+from time import sleep
+from nose import SkipTest
+from ConfigParser import MissingSectionHeaderError
+
+from test import get_config
+
+from swiftclient import get_auth, http_connection, HTTPException
+
+conf = get_config('func_test')
+web_front_end = conf.get('web_front_end', 'integral')
+normalized_urls = conf.get('normalized_urls', False)
+
+# If no conf was read, we will fall back to old school env vars
+swift_test_auth = os.environ.get('SWIFT_TEST_AUTH')
+swift_test_user = [os.environ.get('SWIFT_TEST_USER'), None, None]
+swift_test_key = [os.environ.get('SWIFT_TEST_KEY'), None, None]
+swift_test_tenant = ['', '', '']
+swift_test_perm = ['', '', '']
+
+if conf:
+ swift_test_auth_version = str(conf.get('auth_version', '1'))
+
+ swift_test_auth = 'http'
+ if conf.get('auth_ssl', 'no').lower() in ('yes', 'true', 'on', '1'):
+ swift_test_auth = 'https'
+ if 'auth_prefix' not in conf:
+ conf['auth_prefix'] = '/'
+ try:
+ swift_test_auth += \
+ '://%(auth_host)s:%(auth_port)s%(auth_prefix)s' % conf
+ except KeyError:
+ pass # skip
+
+ if swift_test_auth_version == "1":
+ swift_test_auth += 'v1.0'
+
+ if 'account' in conf:
+ swift_test_user[0] = '%(account)s:%(username)s' % conf
+ else:
+ swift_test_user[0] = '%(username)s' % conf
+ swift_test_key[0] = conf['password']
+ try:
+ swift_test_user[1] = '%s%s' % \
+ ('%s:' % conf['account2'] if 'account2' in conf else '',
+ conf['username2'])
+ swift_test_key[1] = conf['password2']
+ except KeyError, err:
+ pass # old conf, no second account tests can be run
+ try:
+ swift_test_user[2] = '%s%s' % ('%s:' % conf['account'] if 'account'
+ in conf else '', conf['username3'])
+ swift_test_key[2] = conf['password3']
+ except KeyError, err:
+ pass # old conf, no third account tests can be run
+
+ for _ in range(3):
+ swift_test_perm[_] = swift_test_user[_]
+
+ else:
+ swift_test_user[0] = conf['username']
+ swift_test_tenant[0] = conf['account']
+ swift_test_key[0] = conf['password']
+ swift_test_user[1] = conf['username2']
+ swift_test_tenant[1] = conf['account2']
+ swift_test_key[1] = conf['password2']
+ swift_test_user[2] = conf['username3']
+ swift_test_tenant[2] = conf['account']
+ swift_test_key[2] = conf['password3']
+
+ for _ in range(3):
+ swift_test_perm[_] = swift_test_tenant[_] + ':' + swift_test_user[_]
+
+skip = not all([swift_test_auth, swift_test_user[0], swift_test_key[0]])
+if skip:
+ print >>sys.stderr, 'SKIPPING FUNCTIONAL TESTS DUE TO NO CONFIG'
+
+skip2 = not all([not skip, swift_test_user[1], swift_test_key[1]])
+if not skip and skip2:
+ print >>sys.stderr, \
+ 'SKIPPING SECOND ACCOUNT FUNCTIONAL TESTS DUE TO NO CONFIG FOR THEM'
+
+skip3 = not all([not skip, swift_test_user[2], swift_test_key[2]])
+if not skip and skip3:
+ print >>sys.stderr, \
+ 'SKIPPING THIRD ACCOUNT FUNCTIONAL TESTS DUE TO NO CONFIG FOR THEM'
+
+
+class AuthError(Exception):
+ pass
+
+
+class InternalServerError(Exception):
+ pass
+
+
+url = [None, None, None]
+token = [None, None, None]
+parsed = [None, None, None]
+conn = [None, None, None]
+
+
+def retry(func, *args, **kwargs):
+ """
+ You can use the kwargs to override the 'retries' (default: 5) and
+ 'use_account' (default: 1).
+ """
+ global url, token, parsed, conn
+ retries = kwargs.get('retries', 5)
+ use_account = 1
+ if 'use_account' in kwargs:
+ use_account = kwargs['use_account']
+ del kwargs['use_account']
+ use_account -= 1
+ attempts = 0
+ backoff = 1
+ while attempts <= retries:
+ attempts += 1
+ try:
+ if not url[use_account] or not token[use_account]:
+ url[use_account], token[use_account] = \
+ get_auth(swift_test_auth, swift_test_user[use_account],
+ swift_test_key[use_account],
+ snet=False,
+ tenant_name=swift_test_tenant[use_account],
+ auth_version=swift_test_auth_version,
+ os_options={})
+ parsed[use_account] = conn[use_account] = None
+ if not parsed[use_account] or not conn[use_account]:
+ parsed[use_account], conn[use_account] = \
+ http_connection(url[use_account])
+ return func(url[use_account], token[use_account],
+ parsed[use_account], conn[use_account], *args, **kwargs)
+ except (socket.error, HTTPException):
+ if attempts > retries:
+ raise
+ parsed[use_account] = conn[use_account] = None
+ except AuthError, err:
+ url[use_account] = token[use_account] = None
+ continue
+ except InternalServerError, err:
+ pass
+ if attempts <= retries:
+ sleep(backoff)
+ backoff *= 2
+ raise Exception('No result after %s retries.' % retries)
+
+
+def check_response(conn):
+ resp = conn.getresponse()
+ if resp.status == 401:
+ resp.read()
+ raise AuthError()
+ elif resp.status // 100 == 5:
+ resp.read()
+ raise InternalServerError()
+ return resp
diff --git a/test/functionalnosetests/test_account.py b/test/functionalnosetests/test_account.py
new file mode 100755
index 0000000..ae6e3c4
--- /dev/null
+++ b/test/functionalnosetests/test_account.py
@@ -0,0 +1,152 @@
+#!/usr/bin/python
+
+# Copyright (c) 2010-2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from nose import SkipTest
+
+from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \
+ MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH
+
+from swift_testing import check_response, retry, skip
+
+
+class TestAccount(unittest.TestCase):
+
+ def test_metadata(self):
+ if skip:
+ raise SkipTest
+ def post(url, token, parsed, conn, value):
+ conn.request('POST', parsed.path, '',
+ {'X-Auth-Token': token, 'X-Account-Meta-Test': value})
+ return check_response(conn)
+ def head(url, token, parsed, conn):
+ conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(post, '')
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(head)
+ resp.read()
+ self.assert_(resp.status in (200, 204), resp.status)
+ self.assertEquals(resp.getheader('x-account-meta-test'), None)
+ resp = retry(get)
+ resp.read()
+ self.assert_(resp.status in (200, 204), resp.status)
+ self.assertEquals(resp.getheader('x-account-meta-test'), None)
+ resp = retry(post, 'Value')
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(head)
+ resp.read()
+ self.assert_(resp.status in (200, 204), resp.status)
+ self.assertEquals(resp.getheader('x-account-meta-test'), 'Value')
+ resp = retry(get)
+ resp.read()
+ self.assert_(resp.status in (200, 204), resp.status)
+ self.assertEquals(resp.getheader('x-account-meta-test'), 'Value')
+
+ def test_multi_metadata(self):
+ if skip:
+ raise SkipTest
+ def post(url, token, parsed, conn, name, value):
+ conn.request('POST', parsed.path, '',
+ {'X-Auth-Token': token, name: value})
+ return check_response(conn)
+ def head(url, token, parsed, conn):
+ conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(post, 'X-Account-Meta-One', '1')
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(head)
+ resp.read()
+ self.assert_(resp.status in (200, 204), resp.status)
+ self.assertEquals(resp.getheader('x-account-meta-one'), '1')
+ resp = retry(post, 'X-Account-Meta-Two', '2')
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(head)
+ resp.read()
+ self.assert_(resp.status in (200, 204), resp.status)
+ self.assertEquals(resp.getheader('x-account-meta-one'), '1')
+ self.assertEquals(resp.getheader('x-account-meta-two'), '2')
+
+ def test_bad_metadata(self):
+ if skip:
+ raise SkipTest
+ def post(url, token, parsed, conn, extra_headers):
+ headers = {'X-Auth-Token': token}
+ headers.update(extra_headers)
+ conn.request('POST', parsed.path, '', headers)
+ return check_response(conn)
+ resp = retry(post,
+ {'X-Account-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(post,
+ {'X-Account-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
+ resp.read()
+ self.assertEquals(resp.status, 400)
+
+ resp = retry(post,
+ {'X-Account-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(post,
+ {'X-Account-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
+ resp.read()
+ self.assertEquals(resp.status, 400)
+
+ headers = {}
+ for x in xrange(MAX_META_COUNT):
+ headers['X-Account-Meta-%d' % x] = 'v'
+ resp = retry(post, headers)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ headers = {}
+ for x in xrange(MAX_META_COUNT + 1):
+ headers['X-Account-Meta-%d' % x] = 'v'
+ resp = retry(post, headers)
+ resp.read()
+ self.assertEquals(resp.status, 400)
+
+ headers = {}
+ header_value = 'k' * MAX_META_VALUE_LENGTH
+ size = 0
+ x = 0
+ while size < MAX_META_OVERALL_SIZE - 4 - MAX_META_VALUE_LENGTH:
+ size += 4 + MAX_META_VALUE_LENGTH
+ headers['X-Account-Meta-%04d' % x] = header_value
+ x += 1
+ if MAX_META_OVERALL_SIZE - size > 1:
+ headers['X-Account-Meta-k'] = \
+ 'v' * (MAX_META_OVERALL_SIZE - size - 1)
+ resp = retry(post, headers)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ headers['X-Account-Meta-k'] = \
+ 'v' * (MAX_META_OVERALL_SIZE - size)
+ resp = retry(post, headers)
+ resp.read()
+ self.assertEquals(resp.status, 400)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/functionalnosetests/test_container.py b/test/functionalnosetests/test_container.py
new file mode 100755
index 0000000..e92a86c
--- /dev/null
+++ b/test/functionalnosetests/test_container.py
@@ -0,0 +1,573 @@
+#!/usr/bin/python
+
+# Copyright (c) 2010-2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import unittest
+from nose import SkipTest
+from uuid import uuid4
+
+from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \
+ MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH
+
+from swift_testing import check_response, retry, skip, skip2, skip3, \
+ swift_test_perm, web_front_end
+
+
+class TestContainer(unittest.TestCase):
+
+ def setUp(self):
+ if skip:
+ raise SkipTest
+ self.name = uuid4().hex
+ def put(url, token, parsed, conn):
+ conn.request('PUT', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(put)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+
+ def tearDown(self):
+ if skip:
+ raise SkipTest
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path + '/' + self.name + '?format=json',
+ '', {'X-Auth-Token': token})
+ return check_response(conn)
+ def delete(url, token, parsed, conn, obj):
+ conn.request('DELETE',
+ '/'.join([parsed.path, self.name, obj['name']]), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ while True:
+ resp = retry(get)
+ body = resp.read()
+ self.assert_(resp.status // 100 == 2, resp.status)
+ objs = json.loads(body)
+ if not objs:
+ break
+ for obj in objs:
+ resp = retry(delete, obj)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ def delete(url, token, parsed, conn):
+ conn.request('DELETE', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(delete)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ def test_multi_metadata(self):
+ if skip:
+ raise SkipTest
+ def post(url, token, parsed, conn, name, value):
+ conn.request('POST', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token, name: value})
+ return check_response(conn)
+ def head(url, token, parsed, conn):
+ conn.request('HEAD', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(post, 'X-Container-Meta-One', '1')
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(head)
+ resp.read()
+ self.assert_(resp.status in (200, 204), resp.status)
+ self.assertEquals(resp.getheader('x-container-meta-one'), '1')
+ resp = retry(post, 'X-Container-Meta-Two', '2')
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(head)
+ resp.read()
+ self.assert_(resp.status in (200, 204), resp.status)
+ self.assertEquals(resp.getheader('x-container-meta-one'), '1')
+ self.assertEquals(resp.getheader('x-container-meta-two'), '2')
+
+ def test_PUT_metadata(self):
+ if skip:
+ raise SkipTest
+ def put(url, token, parsed, conn, name, value):
+ conn.request('PUT', parsed.path + '/' + name, '',
+ {'X-Auth-Token': token, 'X-Container-Meta-Test': value})
+ return check_response(conn)
+ def head(url, token, parsed, conn, name):
+ conn.request('HEAD', parsed.path + '/' + name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ def get(url, token, parsed, conn, name):
+ conn.request('GET', parsed.path + '/' + name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ def delete(url, token, parsed, conn, name):
+ conn.request('DELETE', parsed.path + '/' + name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ name = uuid4().hex
+ resp = retry(put, name, 'Value')
+ resp.read()
+ self.assertEquals(resp.status, 201)
+ resp = retry(head, name)
+ resp.read()
+ self.assert_(resp.status in (200, 204), resp.status)
+ self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
+ resp = retry(get, name)
+ resp.read()
+ self.assert_(resp.status in (200, 204), resp.status)
+ self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
+ resp = retry(delete, name)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ name = uuid4().hex
+ resp = retry(put, name, '')
+ resp.read()
+ self.assertEquals(resp.status, 201)
+ resp = retry(head, name)
+ resp.read()
+ self.assert_(resp.status in (200, 204), resp.status)
+ self.assertEquals(resp.getheader('x-container-meta-test'), None)
+ resp = retry(get, name)
+ resp.read()
+ self.assert_(resp.status in (200, 204), resp.status)
+ self.assertEquals(resp.getheader('x-container-meta-test'), None)
+ resp = retry(delete, name)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ def test_POST_metadata(self):
+ if skip:
+ raise SkipTest
+ def post(url, token, parsed, conn, value):
+ conn.request('POST', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token, 'X-Container-Meta-Test': value})
+ return check_response(conn)
+ def head(url, token, parsed, conn):
+ conn.request('HEAD', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(head)
+ resp.read()
+ self.assert_(resp.status in (200, 204), resp.status)
+ self.assertEquals(resp.getheader('x-container-meta-test'), None)
+ resp = retry(get)
+ resp.read()
+ self.assert_(resp.status in (200, 204), resp.status)
+ self.assertEquals(resp.getheader('x-container-meta-test'), None)
+ resp = retry(post, 'Value')
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(head)
+ resp.read()
+ self.assert_(resp.status in (200, 204), resp.status)
+ self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
+ resp = retry(get)
+ resp.read()
+ self.assert_(resp.status in (200, 204), resp.status)
+ self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
+
+ def test_PUT_bad_metadata(self):
+ if skip:
+ raise SkipTest
+ def put(url, token, parsed, conn, name, extra_headers):
+ headers = {'X-Auth-Token': token}
+ headers.update(extra_headers)
+ conn.request('PUT', parsed.path + '/' + name, '', headers)
+ return check_response(conn)
+ def delete(url, token, parsed, conn, name):
+ conn.request('DELETE', parsed.path + '/' + name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ name = uuid4().hex
+ resp = retry(put, name,
+ {'X-Container-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
+ resp.read()
+ self.assertEquals(resp.status, 201)
+ resp = retry(delete, name)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ name = uuid4().hex
+ resp = retry(put, name,
+ {'X-Container-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
+ resp.read()
+ self.assertEquals(resp.status, 400)
+ resp = retry(delete, name)
+ resp.read()
+ self.assertEquals(resp.status, 404)
+
+ name = uuid4().hex
+ resp = retry(put, name,
+ {'X-Container-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
+ resp.read()
+ self.assertEquals(resp.status, 201)
+ resp = retry(delete, name)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ name = uuid4().hex
+ resp = retry(put, name,
+ {'X-Container-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
+ resp.read()
+ self.assertEquals(resp.status, 400)
+ resp = retry(delete, name)
+ resp.read()
+ self.assertEquals(resp.status, 404)
+
+ name = uuid4().hex
+ headers = {}
+ for x in xrange(MAX_META_COUNT):
+ headers['X-Container-Meta-%d' % x] = 'v'
+ resp = retry(put, name, headers)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+ resp = retry(delete, name)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ name = uuid4().hex
+ headers = {}
+ for x in xrange(MAX_META_COUNT + 1):
+ headers['X-Container-Meta-%d' % x] = 'v'
+ resp = retry(put, name, headers)
+ resp.read()
+ self.assertEquals(resp.status, 400)
+ resp = retry(delete, name)
+ resp.read()
+ self.assertEquals(resp.status, 404)
+
+ name = uuid4().hex
+ headers = {}
+ header_value = 'k' * MAX_META_VALUE_LENGTH
+ size = 0
+ x = 0
+ while size < MAX_META_OVERALL_SIZE - 4 - MAX_META_VALUE_LENGTH:
+ size += 4 + MAX_META_VALUE_LENGTH
+ headers['X-Container-Meta-%04d' % x] = header_value
+ x += 1
+ if MAX_META_OVERALL_SIZE - size > 1:
+ headers['X-Container-Meta-k'] = \
+ 'v' * (MAX_META_OVERALL_SIZE - size - 1)
+ resp = retry(put, name, headers)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+ resp = retry(delete, name)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ name = uuid4().hex
+ headers['X-Container-Meta-k'] = \
+ 'v' * (MAX_META_OVERALL_SIZE - size)
+ resp = retry(put, name, headers)
+ resp.read()
+ self.assertEquals(resp.status, 400)
+ resp = retry(delete, name)
+ resp.read()
+ self.assertEquals(resp.status, 404)
+
+ def test_POST_bad_metadata(self):
+ if skip:
+ raise SkipTest
+ def post(url, token, parsed, conn, extra_headers):
+ headers = {'X-Auth-Token': token}
+ headers.update(extra_headers)
+ conn.request('POST', parsed.path + '/' + self.name, '', headers)
+ return check_response(conn)
+ resp = retry(post,
+ {'X-Container-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(post,
+ {'X-Container-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
+ resp.read()
+ self.assertEquals(resp.status, 400)
+
+ resp = retry(post,
+ {'X-Container-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(post,
+ {'X-Container-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
+ resp.read()
+ self.assertEquals(resp.status, 400)
+
+ headers = {}
+ for x in xrange(MAX_META_COUNT):
+ headers['X-Container-Meta-%d' % x] = 'v'
+ resp = retry(post, headers)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ headers = {}
+ for x in xrange(MAX_META_COUNT + 1):
+ headers['X-Container-Meta-%d' % x] = 'v'
+ resp = retry(post, headers)
+ resp.read()
+ self.assertEquals(resp.status, 400)
+
+ headers = {}
+ header_value = 'k' * MAX_META_VALUE_LENGTH
+ size = 0
+ x = 0
+ while size < MAX_META_OVERALL_SIZE - 4 - MAX_META_VALUE_LENGTH:
+ size += 4 + MAX_META_VALUE_LENGTH
+ headers['X-Container-Meta-%04d' % x] = header_value
+ x += 1
+ if MAX_META_OVERALL_SIZE - size > 1:
+ headers['X-Container-Meta-k'] = \
+ 'v' * (MAX_META_OVERALL_SIZE - size - 1)
+ resp = retry(post, headers)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ headers['X-Container-Meta-k'] = \
+ 'v' * (MAX_META_OVERALL_SIZE - size)
+ resp = retry(post, headers)
+ resp.read()
+ self.assertEquals(resp.status, 400)
+
+ def test_public_container(self):
+ if skip:
+ raise SkipTest
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path + '/' + self.name)
+ return check_response(conn)
+ try:
+ resp = retry(get)
+ raise Exception('Should not have been able to GET')
+ except Exception, err:
+ self.assert_(str(err).startswith('No result after '), err)
+ def post(url, token, parsed, conn):
+ conn.request('POST', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token,
+ 'X-Container-Read': '.r:*,.rlistings'})
+ return check_response(conn)
+ resp = retry(post)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(get)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ def post(url, token, parsed, conn):
+ conn.request('POST', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token, 'X-Container-Read': ''})
+ return check_response(conn)
+ resp = retry(post)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ try:
+ resp = retry(get)
+ raise Exception('Should not have been able to GET')
+ except Exception, err:
+ self.assert_(str(err).startswith('No result after '), err)
+
+ def test_cross_account_container(self):
+ if skip or skip2:
+ raise SkipTest
+ # Obtain the first account's string
+ first_account = ['unknown']
+ def get1(url, token, parsed, conn):
+ first_account[0] = parsed.path
+ conn.request('HEAD', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get1)
+ resp.read()
+ # Ensure we can't access the container with the second account
+ def get2(url, token, parsed, conn):
+ conn.request('GET', first_account[0] + '/' + self.name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get2, use_account=2)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+ # Make the container accessible by the second account
+ def post(url, token, parsed, conn):
+ conn.request('POST', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token, 'X-Container-Read': swift_test_perm[1],
+ 'X-Container-Write': swift_test_perm[1]})
+ return check_response(conn)
+ resp = retry(post)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ # Ensure we can now use the container with the second account
+ resp = retry(get2, use_account=2)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ # Make the container private again
+ def post(url, token, parsed, conn):
+ conn.request('POST', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token, 'X-Container-Read': '',
+ 'X-Container-Write': ''})
+ return check_response(conn)
+ resp = retry(post)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ # Ensure we can't access the container with the second account again
+ resp = retry(get2, use_account=2)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ def test_cross_account_public_container(self):
+ if skip or skip2:
+ raise SkipTest
+ # Obtain the first account's string
+ first_account = ['unknown']
+ def get1(url, token, parsed, conn):
+ first_account[0] = parsed.path
+ conn.request('HEAD', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get1)
+ resp.read()
+ # Ensure we can't access the container with the second account
+ def get2(url, token, parsed, conn):
+ conn.request('GET', first_account[0] + '/' + self.name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get2, use_account=2)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+ # Make the container completely public
+ def post(url, token, parsed, conn):
+ conn.request('POST', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token,
+ 'X-Container-Read': '.r:*,.rlistings'})
+ return check_response(conn)
+ resp = retry(post)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ # Ensure we can now read the container with the second account
+ resp = retry(get2, use_account=2)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ # But we shouldn't be able to write with the second account
+ def put2(url, token, parsed, conn):
+ conn.request('PUT', first_account[0] + '/' + self.name + '/object',
+ 'test object', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(put2, use_account=2)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+ # Now make the container also writeable by the second account
+ def post(url, token, parsed, conn):
+ conn.request('POST', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token,
+ 'X-Container-Write': swift_test_perm[1]})
+ return check_response(conn)
+ resp = retry(post)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ # Ensure we can still read the container with the second account
+ resp = retry(get2, use_account=2)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ # And that we can now write with the second account
+ resp = retry(put2, use_account=2)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+
+ def test_nonadmin_user(self):
+ if skip or skip3:
+ raise SkipTest
+ # Obtain the first account's string
+ first_account = ['unknown']
+ def get1(url, token, parsed, conn):
+ first_account[0] = parsed.path
+ conn.request('HEAD', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get1)
+ resp.read()
+ # Ensure we can't access the container with the third account
+ def get3(url, token, parsed, conn):
+ conn.request('GET', first_account[0] + '/' + self.name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get3, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+ # Make the container accessible by the third account
+ def post(url, token, parsed, conn):
+ conn.request('POST', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token, 'X-Container-Read': swift_test_perm[2]})
+ return check_response(conn)
+ resp = retry(post)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ # Ensure we can now read the container with the third account
+ resp = retry(get3, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ # But we shouldn't be able to write with the third account
+ def put3(url, token, parsed, conn):
+ conn.request('PUT', first_account[0] + '/' + self.name + '/object',
+ 'test object', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(put3, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+ # Now make the container also writeable by the third account
+ def post(url, token, parsed, conn):
+ conn.request('POST', parsed.path + '/' + self.name, '',
+ {'X-Auth-Token': token,
+ 'X-Container-Write': swift_test_perm[2]})
+ return check_response(conn)
+ resp = retry(post)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ # Ensure we can still read the container with the third account
+ resp = retry(get3, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ # And that we can now write with the third account
+ resp = retry(put3, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+
+ def test_long_name_content_type(self):
+ if skip:
+ raise SkipTest
+
+ def put(url, token, parsed, conn):
+ container_name = 'X' * 2048
+ conn.request('PUT', '%s/%s' % (parsed.path,
+ container_name), 'there', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(put)
+ resp.read()
+ self.assertEquals(resp.status, 400)
+ self.assertEquals(resp.getheader('Content-Type'),
+ 'text/html; charset=UTF-8')
+
+ def test_null_name(self):
+ if skip:
+ raise SkipTest
+
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/abc%%00def' % parsed.path, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(put)
+ if (web_front_end == 'apache2'):
+ self.assertEquals(resp.status, 404)
+ else:
+ self.assertEquals(resp.read(), 'Invalid UTF8 or contains NULL')
+ self.assertEquals(resp.status, 412)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/functionalnosetests/test_object.py b/test/functionalnosetests/test_object.py
new file mode 100755
index 0000000..168375d
--- /dev/null
+++ b/test/functionalnosetests/test_object.py
@@ -0,0 +1,600 @@
+#!/usr/bin/python
+
+# Copyright (c) 2010-2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from nose import SkipTest
+from uuid import uuid4
+
+from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \
+ MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH
+
+from swift_testing import check_response, retry, skip, skip3, \
+ swift_test_perm, web_front_end
+from test import get_config
+
+
+class TestObject(unittest.TestCase):
+
+ def setUp(self):
+ if skip:
+ raise SkipTest
+ self.container = uuid4().hex
+
+ def put(url, token, parsed, conn):
+ conn.request('PUT', parsed.path + '/' + self.container, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(put)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+ self.obj = uuid4().hex
+
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container,
+ self.obj), 'test', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(put)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+
+ def tearDown(self):
+ if skip:
+ raise SkipTest
+
+ def delete(url, token, parsed, conn, obj):
+ conn.request('DELETE',
+ '%s/%s/%s' % (parsed.path, self.container, obj),
+ '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ # get list of objects in container
+ def list(url, token, parsed, conn):
+ conn.request('GET',
+ '%s/%s' % (parsed.path, self.container),
+ '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(list)
+ object_listing = resp.read()
+ self.assertEquals(resp.status, 200)
+
+ # iterate over object listing and delete all objects
+ for obj in object_listing.splitlines():
+ resp = retry(delete, obj)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ # delete the container
+ def delete(url, token, parsed, conn):
+ conn.request('DELETE', parsed.path + '/' + self.container, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(delete)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ def test_copy_object(self):
+ if skip:
+ raise SkipTest
+
+ source = '%s/%s' % (self.container, self.obj)
+ dest = '%s/%s' % (self.container, 'test_copy')
+
+ # get contents of source
+ def get_source(url, token, parsed, conn):
+ conn.request('GET',
+ '%s/%s' % (parsed.path, source),
+ '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get_source)
+ source_contents = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assertEquals(source_contents, 'test')
+
+ # copy source to dest with X-Copy-From
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s' % (parsed.path, dest), '',
+ {'X-Auth-Token': token,
+ 'Content-Length': '0',
+ 'X-Copy-From': source})
+ return check_response(conn)
+ resp = retry(put)
+ contents = resp.read()
+ self.assertEquals(resp.status, 201)
+
+ # contents of dest should be the same as source
+ def get_dest(url, token, parsed, conn):
+ conn.request('GET',
+ '%s/%s' % (parsed.path, dest),
+ '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get_dest)
+ dest_contents = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assertEquals(dest_contents, source_contents)
+
+ # delete the copy
+ def delete(url, token, parsed, conn):
+ conn.request('DELETE', '%s/%s' % (parsed.path, dest), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(delete)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ # verify dest does not exist
+ resp = retry(get_dest)
+ resp.read()
+ self.assertEquals(resp.status, 404)
+
+ # copy source to dest with COPY
+ def copy(url, token, parsed, conn):
+ conn.request('COPY', '%s/%s' % (parsed.path, source), '',
+ {'X-Auth-Token': token,
+ 'Destination': dest})
+ return check_response(conn)
+ resp = retry(copy)
+ contents = resp.read()
+ self.assertEquals(resp.status, 201)
+
+ # contents of dest should be the same as source
+ resp = retry(get_dest)
+ dest_contents = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assertEquals(dest_contents, source_contents)
+
+ # delete the copy
+ resp = retry(delete)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ def test_public_object(self):
+ if skip:
+ raise SkipTest
+
+ def get(url, token, parsed, conn):
+ conn.request('GET',
+ '%s/%s/%s' % (parsed.path, self.container, self.obj))
+ return check_response(conn)
+ try:
+ resp = retry(get)
+ raise Exception('Should not have been able to GET')
+ except Exception, err:
+ self.assert_(str(err).startswith('No result after '))
+
+ def post(url, token, parsed, conn):
+ conn.request('POST', parsed.path + '/' + self.container, '',
+ {'X-Auth-Token': token,
+ 'X-Container-Read': '.r:*'})
+ return check_response(conn)
+ resp = retry(post)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(get)
+ resp.read()
+ self.assertEquals(resp.status, 200)
+
+ def post(url, token, parsed, conn):
+ conn.request('POST', parsed.path + '/' + self.container, '',
+ {'X-Auth-Token': token, 'X-Container-Read': ''})
+ return check_response(conn)
+ resp = retry(post)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ try:
+ resp = retry(get)
+ raise Exception('Should not have been able to GET')
+ except Exception, err:
+ self.assert_(str(err).startswith('No result after '))
+
+ def test_private_object(self):
+ if skip or skip3:
+ raise SkipTest
+
+ # Ensure we can't access the object with the third account
+ def get(url, token, parsed, conn):
+ conn.request('GET', '%s/%s/%s' % (parsed.path, self.container,
+ self.obj), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # create a shared container writable by account3
+ shared_container = uuid4().hex
+
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s' % (parsed.path,
+ shared_container), '',
+ {'X-Auth-Token': token,
+ 'X-Container-Read': swift_test_perm[2],
+ 'X-Container-Write': swift_test_perm[2]})
+ return check_response(conn)
+ resp = retry(put)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+
+ # verify third account can not copy from private container
+ def copy(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/%s' % (parsed.path,
+ shared_container,
+ 'private_object'),
+ '', {'X-Auth-Token': token,
+ 'Content-Length': '0',
+ 'X-Copy-From': '%s/%s' % (self.container,
+ self.obj)})
+ return check_response(conn)
+ resp = retry(copy, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # verify third account can write "obj1" to shared container
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/%s' % (parsed.path, shared_container,
+ 'obj1'), 'test', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(put, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+
+ # verify third account can copy "obj1" to shared container
+ def copy2(url, token, parsed, conn):
+ conn.request('COPY', '%s/%s/%s' % (parsed.path,
+ shared_container,
+ 'obj1'),
+ '', {'X-Auth-Token': token,
+ 'Destination': '%s/%s' % (shared_container,
+ 'obj1')})
+ return check_response(conn)
+ resp = retry(copy2, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+
+ # verify third account STILL can not copy from private container
+ def copy3(url, token, parsed, conn):
+ conn.request('COPY', '%s/%s/%s' % (parsed.path,
+ self.container,
+ self.obj),
+ '', {'X-Auth-Token': token,
+ 'Destination': '%s/%s' % (shared_container,
+ 'private_object')})
+ return check_response(conn)
+ resp = retry(copy3, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # clean up "obj1"
+ def delete(url, token, parsed, conn):
+ conn.request('DELETE', '%s/%s/%s' % (parsed.path, shared_container,
+ 'obj1'), '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(delete)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ # clean up shared_container
+ def delete(url, token, parsed, conn):
+ conn.request('DELETE',
+ parsed.path + '/' + shared_container, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(delete)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ def test_manifest(self):
+ if skip:
+ raise SkipTest
+ # Data for the object segments
+ segments1 = ['one', 'two', 'three', 'four', 'five']
+ segments2 = ['six', 'seven', 'eight']
+ segments3 = ['nine', 'ten', 'eleven']
+
+ # Upload the first set of segments
+ def put(url, token, parsed, conn, objnum):
+ conn.request('PUT', '%s/%s/segments1/%s' % (parsed.path,
+ self.container, str(objnum)), segments1[objnum],
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ for objnum in xrange(len(segments1)):
+ resp = retry(put, objnum)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+
+ # Upload the manifest
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/manifest' % (parsed.path,
+ self.container), '', {'X-Auth-Token': token,
+ 'X-Object-Manifest': '%s/segments1/' % self.container,
+ 'Content-Type': 'text/jibberish', 'Content-Length': '0'})
+ return check_response(conn)
+ resp = retry(put)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+
+ # Get the manifest (should get all the segments as the body)
+ def get(url, token, parsed, conn):
+ conn.request('GET', '%s/%s/manifest' % (parsed.path,
+ self.container), '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get)
+ self.assertEquals(resp.read(), ''.join(segments1))
+ self.assertEquals(resp.status, 200)
+ self.assertEquals(resp.getheader('content-type'), 'text/jibberish')
+
+ # Get with a range at the start of the second segment
+ def get(url, token, parsed, conn):
+ conn.request('GET', '%s/%s/manifest' % (parsed.path,
+ self.container), '', {'X-Auth-Token': token, 'Range':
+ 'bytes=3-'})
+ return check_response(conn)
+ resp = retry(get)
+ self.assertEquals(resp.read(), ''.join(segments1[1:]))
+ self.assertEquals(resp.status, 206)
+
+ # Get with a range in the middle of the second segment
+ def get(url, token, parsed, conn):
+ conn.request('GET', '%s/%s/manifest' % (parsed.path,
+ self.container), '', {'X-Auth-Token': token, 'Range':
+ 'bytes=5-'})
+ return check_response(conn)
+ resp = retry(get)
+ self.assertEquals(resp.read(), ''.join(segments1)[5:])
+ self.assertEquals(resp.status, 206)
+
+ # Get with a full start and stop range
+ def get(url, token, parsed, conn):
+ conn.request('GET', '%s/%s/manifest' % (parsed.path,
+ self.container), '', {'X-Auth-Token': token, 'Range':
+ 'bytes=5-10'})
+ return check_response(conn)
+ resp = retry(get)
+ self.assertEquals(resp.read(), ''.join(segments1)[5:11])
+ self.assertEquals(resp.status, 206)
+
+ # Upload the second set of segments
+ def put(url, token, parsed, conn, objnum):
+ conn.request('PUT', '%s/%s/segments2/%s' % (parsed.path,
+ self.container, str(objnum)), segments2[objnum],
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ for objnum in xrange(len(segments2)):
+ resp = retry(put, objnum)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+
+ # Get the manifest (should still be the first segments of course)
+ def get(url, token, parsed, conn):
+ conn.request('GET', '%s/%s/manifest' % (parsed.path,
+ self.container), '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get)
+ self.assertEquals(resp.read(), ''.join(segments1))
+ self.assertEquals(resp.status, 200)
+
+ # Update the manifest
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/manifest' % (parsed.path,
+ self.container), '', {'X-Auth-Token': token,
+ 'X-Object-Manifest': '%s/segments2/' % self.container,
+ 'Content-Length': '0'})
+ return check_response(conn)
+ resp = retry(put)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+
+ # Get the manifest (should be the second set of segments now)
+ def get(url, token, parsed, conn):
+ conn.request('GET', '%s/%s/manifest' % (parsed.path,
+ self.container), '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get)
+ self.assertEquals(resp.read(), ''.join(segments2))
+ self.assertEquals(resp.status, 200)
+
+ if not skip3:
+
+ # Ensure we can't access the manifest with the third account
+ def get(url, token, parsed, conn):
+ conn.request('GET', '%s/%s/manifest' % (parsed.path,
+ self.container), '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # Grant access to the third account
+ def post(url, token, parsed, conn):
+ conn.request('POST', '%s/%s' % (parsed.path, self.container),
+ '', {'X-Auth-Token': token,
+ 'X-Container-Read': swift_test_perm[2]})
+ return check_response(conn)
+ resp = retry(post)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ # The third account should be able to get the manifest now
+ def get(url, token, parsed, conn):
+ conn.request('GET', '%s/%s/manifest' % (parsed.path,
+ self.container), '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get, use_account=3)
+ self.assertEquals(resp.read(), ''.join(segments2))
+ self.assertEquals(resp.status, 200)
+
+ # Create another container for the third set of segments
+ acontainer = uuid4().hex
+
+ def put(url, token, parsed, conn):
+ conn.request('PUT', parsed.path + '/' + acontainer, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(put)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+
+ # Upload the third set of segments in the other container
+ def put(url, token, parsed, conn, objnum):
+ conn.request('PUT', '%s/%s/segments3/%s' % (parsed.path,
+ acontainer, str(objnum)), segments3[objnum],
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ for objnum in xrange(len(segments3)):
+ resp = retry(put, objnum)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+
+ # Update the manifest
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/manifest' % (parsed.path,
+ self.container), '', {'X-Auth-Token': token,
+ 'X-Object-Manifest': '%s/segments3/' % acontainer,
+ 'Content-Length': '0'})
+ return check_response(conn)
+ resp = retry(put)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+
+ # Get the manifest to ensure it's the third set of segments
+ def get(url, token, parsed, conn):
+ conn.request('GET', '%s/%s/manifest' % (parsed.path,
+ self.container), '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get)
+ self.assertEquals(resp.read(), ''.join(segments3))
+ self.assertEquals(resp.status, 200)
+
+ if not skip3:
+
+ # Ensure we can't access the manifest with the third account
+ # (because the segments are in a protected container even if the
+ # manifest itself is not).
+
+ def get(url, token, parsed, conn):
+ conn.request('GET', '%s/%s/manifest' % (parsed.path,
+ self.container), '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # Grant access to the third account
+ def post(url, token, parsed, conn):
+ conn.request('POST', '%s/%s' % (parsed.path, acontainer),
+ '', {'X-Auth-Token': token,
+ 'X-Container-Read': swift_test_perm[2]})
+ return check_response(conn)
+ resp = retry(post)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ # The third account should be able to get the manifest now
+ def get(url, token, parsed, conn):
+ conn.request('GET', '%s/%s/manifest' % (parsed.path,
+ self.container), '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get, use_account=3)
+ self.assertEquals(resp.read(), ''.join(segments3))
+ self.assertEquals(resp.status, 200)
+
+ # Delete the manifest
+ def delete(url, token, parsed, conn, objnum):
+ conn.request('DELETE', '%s/%s/manifest' % (parsed.path,
+ self.container), '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(delete, objnum)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ # Delete the third set of segments
+ def delete(url, token, parsed, conn, objnum):
+ conn.request('DELETE', '%s/%s/segments3/%s' % (parsed.path,
+ acontainer, str(objnum)), '', {'X-Auth-Token': token})
+ return check_response(conn)
+ for objnum in xrange(len(segments3)):
+ resp = retry(delete, objnum)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ # Delete the second set of segments
+ def delete(url, token, parsed, conn, objnum):
+ conn.request('DELETE', '%s/%s/segments2/%s' % (parsed.path,
+ self.container, str(objnum)), '', {'X-Auth-Token': token})
+ return check_response(conn)
+ for objnum in xrange(len(segments2)):
+ resp = retry(delete, objnum)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ # Delete the first set of segments
+ def delete(url, token, parsed, conn, objnum):
+ conn.request('DELETE', '%s/%s/segments1/%s' % (parsed.path,
+ self.container, str(objnum)), '', {'X-Auth-Token': token})
+ return check_response(conn)
+ for objnum in xrange(len(segments1)):
+ resp = retry(delete, objnum)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ # Delete the extra container
+ def delete(url, token, parsed, conn):
+ conn.request('DELETE', '%s/%s' % (parsed.path, acontainer), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(delete)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ def test_delete_content_type(self):
+ if skip:
+ raise SkipTest
+
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/hi' % (parsed.path,
+ self.container), 'there', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(put)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+
+ def delete(url, token, parsed, conn):
+ conn.request('DELETE', '%s/%s/hi' % (parsed.path, self.container),
+ '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(delete)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEquals(resp.getheader('Content-Type'),
+ 'text/html; charset=UTF-8')
+
+ def test_null_name(self):
+ if skip:
+ raise SkipTest
+
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/abc%%00def' % (parsed.path,
+ self.container), 'test', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(put)
+ if (web_front_end == 'apache2'):
+ self.assertEquals(resp.status, 404)
+ else:
+ self.assertEquals(resp.read(), 'Invalid UTF8 or contains NULL')
+ self.assertEquals(resp.status, 412)
+
+
+if __name__ == '__main__':
+ unittest.main()