summaryrefslogtreecommitdiffstats
path: root/test/functional/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional/tests.py')
-rw-r--r--test/functional/tests.py470
1 files changed, 411 insertions, 59 deletions
diff --git a/test/functional/tests.py b/test/functional/tests.py
index 0d9a9ef..ad87d7e 100644
--- a/test/functional/tests.py
+++ b/test/functional/tests.py
@@ -19,14 +19,16 @@
from datetime import datetime
import os
import hashlib
+import hmac
import json
import locale
import random
import StringIO
import time
import threading
-import uuid
import unittest
+import urllib
+import uuid
from nose import SkipTest
from ConfigParser import ConfigParser
@@ -36,7 +38,7 @@ from test.functional.swift_test_client import Account, Connection, File, \
from swift.common.constraints import MAX_FILE_SIZE, MAX_META_NAME_LENGTH, \
MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE, \
MAX_OBJECT_NAME_LENGTH, CONTAINER_LISTING_LIMIT, ACCOUNT_LISTING_LIMIT, \
- MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH
+ MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, MAX_HEADER_SIZE
from gluster.swift.common.constraints import \
set_object_name_component_length, get_object_name_component_length
@@ -50,7 +52,8 @@ default_constraints = dict((
('container_listing_limit', CONTAINER_LISTING_LIMIT),
('account_listing_limit', ACCOUNT_LISTING_LIMIT),
('max_account_name_length', MAX_ACCOUNT_NAME_LENGTH),
- ('max_container_name_length', MAX_CONTAINER_NAME_LENGTH)))
+ ('max_container_name_length', MAX_CONTAINER_NAME_LENGTH),
+ ('max_header_size', MAX_HEADER_SIZE)))
constraints_conf = ConfigParser()
conf_exists = constraints_conf.read('/etc/swift/swift.conf')
# Constraints are set first from the test config, then from
@@ -285,7 +288,7 @@ class TestAccount(Base):
if try_count < 5:
time.sleep(1)
- self.assertEquals(info['container_count'], len(self.env.containers))
+ self.assertEqual(info['container_count'], len(self.env.containers))
self.assert_status(204)
def testContainerSerializedInfo(self):
@@ -309,11 +312,11 @@ class TestAccount(Base):
headers = dict(self.env.conn.response.getheaders())
if format_type == 'json':
- self.assertEquals(headers['content-type'],
- 'application/json; charset=utf-8')
+ self.assertEqual(headers['content-type'],
+ 'application/json; charset=utf-8')
elif format_type == 'xml':
- self.assertEquals(headers['content-type'],
- 'application/xml; charset=utf-8')
+ self.assertEqual(headers['content-type'],
+ 'application/xml; charset=utf-8')
def testListingLimit(self):
limit = load_constraint('account_listing_limit')
@@ -337,7 +340,7 @@ class TestAccount(Base):
if isinstance(b[0], dict):
b = [x['name'] for x in b]
- self.assertEquals(a, b)
+ self.assertEqual(a, b)
def testInvalidAuthToken(self):
hdrs = {'X-Auth-Token': 'bogus_auth_token'}
@@ -347,12 +350,12 @@ class TestAccount(Base):
def testLastContainerMarker(self):
for format_type in [None, 'json', 'xml']:
containers = self.env.account.containers({'format': format_type})
- self.assertEquals(len(containers), len(self.env.containers))
+ self.assertEqual(len(containers), len(self.env.containers))
self.assert_status(200)
containers = self.env.account.containers(
parms={'format': format_type, 'marker': containers[-1]})
- self.assertEquals(len(containers), 0)
+ self.assertEqual(len(containers), 0)
if format_type is None:
self.assert_status(204)
else:
@@ -380,8 +383,8 @@ class TestAccount(Base):
parms={'format': format_type})
if isinstance(containers[0], dict):
containers = [x['name'] for x in containers]
- self.assertEquals(sorted(containers, cmp=locale.strcoll),
- containers)
+ self.assertEqual(sorted(containers, cmp=locale.strcoll),
+ containers)
class TestAccountUTF8(Base2, TestAccount):
@@ -518,13 +521,13 @@ class TestContainer(Base):
for format_type in [None, 'json', 'xml']:
for prefix in prefixs:
files = cont.files(parms={'prefix': prefix})
- self.assertEquals(files, sorted(prefix_files[prefix]))
+ self.assertEqual(files, sorted(prefix_files[prefix]))
for format_type in [None, 'json', 'xml']:
for prefix in prefixs:
files = cont.files(parms={'limit': limit_count,
'prefix': prefix})
- self.assertEquals(len(files), limit_count)
+ self.assertEqual(len(files), limit_count)
for file_item in files:
self.assert_(file_item.startswith(prefix))
@@ -548,7 +551,7 @@ class TestContainer(Base):
container = self.env.account.container(valid_utf8)
self.assert_(container.create(cfg={'no_path_quote': True}))
self.assert_(container.name in self.env.account.containers())
- self.assertEquals(container.files(), [])
+ self.assertEqual(container.files(), [])
self.assert_(container.delete())
container = self.env.account.container(invalid_utf8)
@@ -614,12 +617,12 @@ class TestContainer(Base):
def testLastFileMarker(self):
for format_type in [None, 'json', 'xml']:
files = self.env.container.files({'format': format_type})
- self.assertEquals(len(files), len(self.env.files))
+ self.assertEqual(len(files), len(self.env.files))
self.assert_status(200)
files = self.env.container.files(
parms={'format': format_type, 'marker': files[-1]})
- self.assertEquals(len(files), 0)
+ self.assertEqual(len(files), 0)
if format_type is None:
self.assert_status(204)
@@ -665,14 +668,14 @@ class TestContainer(Base):
files = self.env.container.files(parms={'format': format_type})
if isinstance(files[0], dict):
files = [x['name'] for x in files]
- self.assertEquals(sorted(files, cmp=locale.strcoll), files)
+ self.assertEqual(sorted(files, cmp=locale.strcoll), files)
def testContainerInfo(self):
info = self.env.container.info()
self.assert_status(204)
- self.assertEquals(info['object_count'], self.env.file_count)
- self.assertEquals(info['bytes_used'],
- self.env.file_count * self.env.file_size)
+ self.assertEqual(info['object_count'], self.env.file_count)
+ self.assertEqual(info['bytes_used'],
+ self.env.file_count * self.env.file_size)
def testContainerInfoOnContainerThatDoesNotExist(self):
container = self.env.account.container(Utils.create_name())
@@ -683,7 +686,7 @@ class TestContainer(Base):
for format_type in [None, 'json', 'xml']:
files = self.env.container.files(parms={'format': format_type,
'limit': 2})
- self.assertEquals(len(files), 2)
+ self.assertEqual(len(files), 2)
def testTooLongName(self):
cont = self.env.account.container('x' * 257)
@@ -838,7 +841,7 @@ class TestContainerPaths(Base):
if isinstance(files[0], dict):
files = [str(x['name']) for x in files]
- self.assertEquals(files, self.env.stored_files)
+ self.assertEqual(files, self.env.stored_files)
for format_type in ('json', 'xml'):
for file_item in self.env.container.files(parms={'format':
@@ -846,13 +849,13 @@ class TestContainerPaths(Base):
self.assert_(int(file_item['bytes']) >= 0)
self.assert_('last_modified' in file_item)
if file_item['name'].endswith('/'):
- self.assertEquals(file_item['content_type'],
- 'application/directory')
+ self.assertEqual(file_item['content_type'],
+ 'application/directory')
def testStructure(self):
def assert_listing(path, file_list):
files = self.env.container.files(parms={'path': path})
- self.assertEquals(sorted(file_list, cmp=locale.strcoll), files)
+ self.assertEqual(sorted(file_list, cmp=locale.strcoll), files)
if not normalized_urls:
assert_listing('/', ['/dir1/', '/dir2/', '/file1', '/file A'])
assert_listing('/dir1',
@@ -1176,7 +1179,7 @@ class TestFile(Base):
for i in container.files(parms={'format': 'json'}):
file_types_read[i['name'].split('.')[1]] = i['content_type']
- self.assertEquals(file_types, file_types_read)
+ self.assertEqual(file_types, file_types_read)
def testRangedGets(self):
file_length = 10000
@@ -1201,7 +1204,7 @@ class TestFile(Base):
self.assertRaises(ResponseError, file_item.read, hdrs=hdrs)
self.assert_status(416)
else:
- self.assertEquals(file_item.read(hdrs=hdrs), data[-i:])
+ self.assertEqual(file_item.read(hdrs=hdrs), data[-i:])
range_string = 'bytes=%d-' % (i)
hdrs = {'Range': range_string}
@@ -1350,9 +1353,9 @@ class TestFile(Base):
info = file_item.info()
self.assert_status(200)
- self.assertEquals(info['content_length'], self.env.file_size)
- self.assertEquals(info['etag'], md5)
- self.assertEquals(info['content_type'], content_type)
+ self.assertEqual(info['content_length'], self.env.file_size)
+ self.assertEqual(info['etag'], md5)
+ self.assertEqual(info['content_type'], content_type)
self.assert_('last_modified' in info)
def testDeleteOfFileThatDoesNotExist(self):
@@ -1395,7 +1398,7 @@ class TestFile(Base):
file_item = self.env.container.file(file_item.name)
self.assert_(file_item.initialize())
self.assert_status(200)
- self.assertEquals(file_item.metadata, metadata)
+ self.assertEqual(file_item.metadata, metadata)
def testGetContentType(self):
file_name = Utils.create_name()
@@ -1408,7 +1411,7 @@ class TestFile(Base):
file_item = self.env.container.file(file_name)
file_item.read()
- self.assertEquals(content_type, file_item.content_type)
+ self.assertEqual(content_type, file_item.content_type)
def testGetOnFileThatDoesNotExist(self):
# in container that exists
@@ -1449,7 +1452,7 @@ class TestFile(Base):
file_item = self.env.container.file(file_item.name)
self.assert_(file_item.initialize())
self.assert_status(200)
- self.assertEquals(file_item.metadata, metadata)
+ self.assertEqual(file_item.metadata, metadata)
def testSerialization(self):
container = self.env.account.container(Utils.create_name())
@@ -1478,9 +1481,9 @@ class TestFile(Base):
if f['name'] != file_item['name']:
continue
- self.assertEquals(file_item['content_type'],
- f['content_type'])
- self.assertEquals(int(file_item['bytes']), f['bytes'])
+ self.assertEqual(file_item['content_type'],
+ f['content_type'])
+ self.assertEqual(int(file_item['bytes']), f['bytes'])
d = datetime.strptime(
file_item['last_modified'].split('.')[0],
@@ -1488,7 +1491,7 @@ class TestFile(Base):
lm = time.mktime(d.timetuple())
if 'last_modified' in f:
- self.assertEquals(f['last_modified'], lm)
+ self.assertEqual(f['last_modified'], lm)
else:
f['last_modified'] = lm
@@ -1500,11 +1503,11 @@ class TestFile(Base):
headers = dict(self.env.conn.response.getheaders())
if format_type == 'json':
- self.assertEquals(headers['content-type'],
- 'application/json; charset=utf-8')
+ self.assertEqual(headers['content-type'],
+ 'application/json; charset=utf-8')
elif format_type == 'xml':
- self.assertEquals(headers['content-type'],
- 'application/xml; charset=utf-8')
+ self.assertEqual(headers['content-type'],
+ 'application/xml; charset=utf-8')
lm_diff = max([f['last_modified'] for f in files]) -\
min([f['last_modified'] for f in files])
@@ -1547,7 +1550,7 @@ class TestFile(Base):
self.assert_('etag' in headers.keys())
header_etag = headers['etag'].strip('"')
- self.assertEquals(etag, header_etag)
+ self.assertEqual(etag, header_etag)
def testChunkedPut(self):
if (web_front_end == 'apache2'):
@@ -1565,7 +1568,7 @@ class TestFile(Base):
self.assert_(data == file_item.read())
info = file_item.info()
- self.assertEquals(etag, info['etag'])
+ self.assertEqual(etag, info['etag'])
class TestFileUTF8(Base2, TestFile):
@@ -1677,12 +1680,30 @@ class TestDlo(Base):
file_contents,
"aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeeeffffffffff")
+ def test_copy_manifest(self):
+ # Copying the manifest should result in another manifest
+ try:
+ man1_item = self.env.container.file('man1')
+ man1_item.copy(self.env.container.name, "copied-man1",
+ parms={'multipart-manifest': 'get'})
+
+ copied = self.env.container.file("copied-man1")
+ copied_contents = copied.read(parms={'multipart-manifest': 'get'})
+ self.assertEqual(copied_contents, "man1-contents")
+
+ copied_contents = copied.read()
+ self.assertEqual(
+ copied_contents,
+ "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeee")
+ finally:
+ # try not to leave this around for other tests to stumble over
+ self.env.container.file("copied-man1").delete()
class TestDloUTF8(Base2, TestDlo):
set_up = False
-class TestFileComparisonEnv:
+class TestFileComparisonEnv(object):
@classmethod
def setUp(cls):
cls.conn = Connection(config)
@@ -1806,19 +1827,8 @@ class TestSloEnv(object):
cls.conn.authenticate()
if cls.slo_enabled is None:
- status = cls.conn.make_request('GET', '/info',
- cfg={'verbatim_path': True})
- if not (200 <= status <= 299):
- # Can't tell if SLO is enabled or not since we're running
- # against an old cluster, so let's skip the tests instead of
- # possibly having spurious failures.
- cls.slo_enabled = False
- else:
- # Don't bother looking for ValueError here. If something is
- # responding to a GET /info request with invalid JSON, then
- # the cluster is broken and a test failure will let us know.
- cluster_info = json.loads(cls.conn.response.read())
- cls.slo_enabled = 'slo' in cluster_info
+ cluster_info = cls.conn.cluster_info()
+ cls.slo_enabled = 'slo' in cluster_info
if not cls.slo_enabled:
return
@@ -2034,5 +2044,347 @@ class TestSloUTF8(Base2, TestSlo):
set_up = False
+class TestObjectVersioningEnv(object):
+ versioning_enabled = None # tri-state: None initially, then True/False
+
+ @classmethod
+ def setUp(cls):
+ cls.conn = Connection(config)
+ cls.conn.authenticate()
+
+ cls.account = Account(cls.conn, config.get('account',
+ config['username']))
+
+ # avoid getting a prefix that stops halfway through an encoded
+ # character
+ prefix = Utils.create_name().decode("utf-8")[:10].encode("utf-8")
+
+ cls.versions_container = cls.account.container(prefix + "-versions")
+ if not cls.versions_container.create():
+ raise ResponseError(cls.conn.response)
+
+ cls.container = cls.account.container(prefix + "-objs")
+ if not cls.container.create(
+ hdrs={'X-Versions-Location': cls.versions_container.name}):
+ raise ResponseError(cls.conn.response)
+
+ container_info = cls.container.info()
+ # if versioning is off, then X-Versions-Location won't persist
+ cls.versioning_enabled = 'versions' in container_info
+
+
+class TestObjectVersioning(Base):
+ env = TestObjectVersioningEnv
+ set_up = False
+
+ def setUp(self):
+ super(TestObjectVersioning, self).setUp()
+ if self.env.versioning_enabled is False:
+ raise SkipTest("Object versioning not enabled")
+ elif self.env.versioning_enabled is not True:
+ # just some sanity checking
+ raise Exception(
+ "Expected versioning_enabled to be True/False, got %r" %
+ (self.env.versioning_enabled,))
+
+ def test_overwriting(self):
+ container = self.env.container
+ versions_container = self.env.versions_container
+ obj_name = Utils.create_name()
+
+ versioned_obj = container.file(obj_name)
+ versioned_obj.write("aaaaa")
+
+ self.assertEqual(0, versions_container.info()['object_count'])
+
+ versioned_obj.write("bbbbb")
+
+ # the old version got saved off
+ self.assertEqual(1, versions_container.info()['object_count'])
+ versioned_obj_name = versions_container.files()[0]
+ self.assertEqual(
+ "aaaaa", versions_container.file(versioned_obj_name).read())
+
+ # if we overwrite it again, there are two versions
+ versioned_obj.write("ccccc")
+ self.assertEqual(2, versions_container.info()['object_count'])
+
+ # as we delete things, the old contents return
+ self.assertEqual("ccccc", versioned_obj.read())
+ versioned_obj.delete()
+ self.assertEqual("bbbbb", versioned_obj.read())
+ versioned_obj.delete()
+ self.assertEqual("aaaaa", versioned_obj.read())
+ versioned_obj.delete()
+ self.assertRaises(ResponseError, versioned_obj.read)
+
+
+class TestObjectVersioningUTF8(Base2, TestObjectVersioning):
+ set_up = False
+
+
+class TestTempurlEnv(object):
+ tempurl_enabled = None # tri-state: None initially, then True/False
+
+ @classmethod
+ def setUp(cls):
+ cls.conn = Connection(config)
+ cls.conn.authenticate()
+
+ if cls.tempurl_enabled is None:
+ cluster_info = cls.conn.cluster_info()
+ cls.tempurl_enabled = 'tempurl' in cluster_info
+ if not cls.tempurl_enabled:
+ return
+ cls.tempurl_methods = cluster_info['tempurl']['methods']
+
+ cls.tempurl_key = Utils.create_name()
+ cls.tempurl_key2 = Utils.create_name()
+
+ cls.account = Account(
+ cls.conn, config.get('account', config['username']))
+ cls.account.delete_containers()
+ cls.account.update_metadata({
+ 'temp-url-key': cls.tempurl_key,
+ 'temp-url-key-2': cls.tempurl_key2
+ })
+
+ cls.container = cls.account.container(Utils.create_name())
+ if not cls.container.create():
+ raise ResponseError(cls.conn.response)
+
+ cls.obj = cls.container.file(Utils.create_name())
+ cls.obj.write("obj contents")
+ cls.other_obj = cls.container.file(Utils.create_name())
+ cls.other_obj.write("other obj contents")
+
+
+class TestTempurl(Base):
+ env = TestTempurlEnv
+ set_up = False
+
+ def setUp(self):
+ super(TestTempurl, self).setUp()
+ if self.env.tempurl_enabled is False:
+ raise SkipTest("TempURL not enabled")
+ elif self.env.tempurl_enabled is not True:
+ # just some sanity checking
+ raise Exception(
+ "Expected tempurl_enabled to be True/False, got %r" %
+ (self.env.tempurl_enabled,))
+
+ expires = int(time.time()) + 86400
+ sig = self.tempurl_sig(
+ 'GET', expires, self.env.conn.make_path(self.env.obj.path),
+ self.env.tempurl_key)
+ self.obj_tempurl_parms = {'temp_url_sig': sig,
+ 'temp_url_expires': str(expires)}
+
+ def tempurl_sig(self, method, expires, path, key):
+ return hmac.new(
+ key,
+ '%s\n%s\n%s' % (method, expires, urllib.unquote(path)),
+ hashlib.sha1).hexdigest()
+
+ def test_GET(self):
+ contents = self.env.obj.read(
+ parms=self.obj_tempurl_parms,
+ cfg={'no_auth_token': True})
+ self.assertEqual(contents, "obj contents")
+
+ # GET tempurls also allow HEAD requests
+ self.assert_(self.env.obj.info(parms=self.obj_tempurl_parms,
+ cfg={'no_auth_token': True}))
+
+ def test_GET_with_key_2(self):
+ expires = int(time.time()) + 86400
+ sig = self.tempurl_sig(
+ 'GET', expires, self.env.conn.make_path(self.env.obj.path),
+ self.env.tempurl_key2)
+ parms = {'temp_url_sig': sig,
+ 'temp_url_expires': str(expires)}
+
+ contents = self.env.obj.read(parms=parms, cfg={'no_auth_token': True})
+ self.assertEqual(contents, "obj contents")
+
+ def test_PUT(self):
+ new_obj = self.env.container.file(Utils.create_name())
+
+ expires = int(time.time()) + 86400
+ sig = self.tempurl_sig(
+ 'PUT', expires, self.env.conn.make_path(new_obj.path),
+ self.env.tempurl_key)
+ put_parms = {'temp_url_sig': sig,
+ 'temp_url_expires': str(expires)}
+
+ new_obj.write('new obj contents',
+ parms=put_parms, cfg={'no_auth_token': True})
+ self.assertEqual(new_obj.read(), "new obj contents")
+
+ # PUT tempurls also allow HEAD requests
+ self.assert_(new_obj.info(parms=put_parms,
+ cfg={'no_auth_token': True}))
+
+ def test_HEAD(self):
+ expires = int(time.time()) + 86400
+ sig = self.tempurl_sig(
+ 'HEAD', expires, self.env.conn.make_path(self.env.obj.path),
+ self.env.tempurl_key)
+ head_parms = {'temp_url_sig': sig,
+ 'temp_url_expires': str(expires)}
+
+ self.assert_(self.env.obj.info(parms=head_parms,
+ cfg={'no_auth_token': True}))
+ # HEAD tempurls don't allow PUT or GET requests, despite the fact that
+ # PUT and GET tempurls both allow HEAD requests
+ self.assertRaises(ResponseError, self.env.other_obj.read,
+ cfg={'no_auth_token': True},
+ parms=self.obj_tempurl_parms)
+ self.assert_status([401])
+
+ self.assertRaises(ResponseError, self.env.other_obj.write,
+ 'new contents',
+ cfg={'no_auth_token': True},
+ parms=self.obj_tempurl_parms)
+ self.assert_status([401])
+
+ def test_different_object(self):
+ contents = self.env.obj.read(
+ parms=self.obj_tempurl_parms,
+ cfg={'no_auth_token': True})
+ self.assertEqual(contents, "obj contents")
+
+ self.assertRaises(ResponseError, self.env.other_obj.read,
+ cfg={'no_auth_token': True},
+ parms=self.obj_tempurl_parms)
+ self.assert_status([401])
+
+ def test_changing_sig(self):
+ contents = self.env.obj.read(
+ parms=self.obj_tempurl_parms,
+ cfg={'no_auth_token': True})
+ self.assertEqual(contents, "obj contents")
+
+ parms = self.obj_tempurl_parms.copy()
+ if parms['temp_url_sig'][0] == 'a':
+ parms['temp_url_sig'] = 'b' + parms['temp_url_sig'][1:]
+ else:
+ parms['temp_url_sig'] = 'a' + parms['temp_url_sig'][1:]
+
+ self.assertRaises(ResponseError, self.env.obj.read,
+ cfg={'no_auth_token': True},
+ parms=parms)
+ self.assert_status([401])
+
+ def test_changing_expires(self):
+ contents = self.env.obj.read(
+ parms=self.obj_tempurl_parms,
+ cfg={'no_auth_token': True})
+ self.assertEqual(contents, "obj contents")
+
+ parms = self.obj_tempurl_parms.copy()
+ if parms['temp_url_expires'][-1] == '0':
+ parms['temp_url_expires'] = parms['temp_url_expires'][:-1] + '1'
+ else:
+ parms['temp_url_expires'] = parms['temp_url_expires'][:-1] + '0'
+
+ self.assertRaises(ResponseError, self.env.obj.read,
+ cfg={'no_auth_token': True},
+ parms=parms)
+ self.assert_status([401])
+
+
+class TestTempurlUTF8(Base2, TestTempurl):
+ set_up = False
+
+
+class TestSloTempurlEnv(object):
+ enabled = None # tri-state: None initially, then True/False
+
+ @classmethod
+ def setUp(cls):
+ cls.conn = Connection(config)
+ cls.conn.authenticate()
+
+ if cls.enabled is None:
+ cluster_info = cls.conn.cluster_info()
+ cls.enabled = 'tempurl' in cluster_info and 'slo' in cluster_info
+
+ cls.tempurl_key = Utils.create_name()
+
+ cls.account = Account(
+ cls.conn, config.get('account', config['username']))
+ cls.account.delete_containers()
+ cls.account.update_metadata({'temp-url-key': cls.tempurl_key})
+
+ cls.manifest_container = cls.account.container(Utils.create_name())
+ cls.segments_container = cls.account.container(Utils.create_name())
+ if not cls.manifest_container.create():
+ raise ResponseError(cls.conn.response)
+ if not cls.segments_container.create():
+ raise ResponseError(cls.conn.response)
+
+ seg1 = cls.segments_container.file(Utils.create_name())
+ seg1.write('1' * 1024 * 1024)
+
+ seg2 = cls.segments_container.file(Utils.create_name())
+ seg2.write('2' * 1024 * 1024)
+
+ cls.manifest_data = [{'size_bytes': 1024 * 1024,
+ 'etag': seg1.md5,
+ 'path': '/%s/%s' % (cls.segments_container.name,
+ seg1.name)},
+ {'size_bytes': 1024 * 1024,
+ 'etag': seg2.md5,
+ 'path': '/%s/%s' % (cls.segments_container.name,
+ seg2.name)}]
+
+ cls.manifest = cls.manifest_container.file(Utils.create_name())
+ cls.manifest.write(
+ json.dumps(cls.manifest_data),
+ parms={'multipart-manifest': 'put'})
+
+
+class TestSloTempurl(Base):
+ env = TestSloTempurlEnv
+ set_up = False
+
+ def setUp(self):
+ super(TestSloTempurl, self).setUp()
+ if self.env.enabled is False:
+ raise SkipTest("TempURL and SLO not both enabled")
+ elif self.env.enabled is not True:
+ # just some sanity checking
+ raise Exception(
+ "Expected enabled to be True/False, got %r" %
+ (self.env.enabled,))
+
+ def tempurl_sig(self, method, expires, path, key):
+ return hmac.new(
+ key,
+ '%s\n%s\n%s' % (method, expires, urllib.unquote(path)),
+ hashlib.sha1).hexdigest()
+
+ def test_GET(self):
+ expires = int(time.time()) + 86400
+ sig = self.tempurl_sig(
+ 'GET', expires, self.env.conn.make_path(self.env.manifest.path),
+ self.env.tempurl_key)
+ parms = {'temp_url_sig': sig, 'temp_url_expires': str(expires)}
+
+ contents = self.env.manifest.read(
+ parms=parms,
+ cfg={'no_auth_token': True})
+ self.assertEqual(len(contents), 2 * 1024 * 1024)
+
+ # GET tempurls also allow HEAD requests
+ self.assert_(self.env.manifest.info(
+ parms=parms, cfg={'no_auth_token': True}))
+
+
+class TestSloTempurlUTF8(Base2, TestSloTempurl):
+ set_up = False
+
+
if __name__ == '__main__':
unittest.main()