summaryrefslogtreecommitdiffstats
path: root/test/functional/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional/tests.py')
-rw-r--r--test/functional/tests.py668
1 files changed, 334 insertions, 334 deletions
diff --git a/test/functional/tests.py b/test/functional/tests.py
index dcd8b51..e5ccda2 100644
--- a/test/functional/tests.py
+++ b/test/functional/tests.py
@@ -1,5 +1,5 @@
#!/usr/bin/python -u
-# Copyright (c) 2010-2013 OpenStack, LLC.
+# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,20 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-# Copyright (c) 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Modifications by Red Hat, Inc.
from datetime import datetime
import os
@@ -89,6 +76,7 @@ web_front_end = config.get('web_front_end', 'integral')
normalized_urls = config.get('normalized_urls', False)
set_object_name_component_length()
+
def load_constraint(name):
c = config[name]
if not isinstance(c, int):
@@ -143,7 +131,7 @@ def timeout(seconds, method, *args, **kwargs):
def run(self):
try:
self.method(*self.args, **self.kwargs)
- except Exception, e:
+ except Exception as e:
self.exception = e
t = TimeoutThread(method, *args, **kwargs)
@@ -304,23 +292,24 @@ class TestAccount(Base):
info = {'bytes': 0}
info['count'] = random.randint(10, 30)
for i in range(info['count']):
- file = container.file(Utils.create_name())
+ file_item = container.file(Utils.create_name())
bytes = random.randint(1, 32768)
- file.write_random(bytes)
+ file_item.write_random(bytes)
info['bytes'] += bytes
container_info[container.name] = info
- for format in ['json', 'xml']:
- for a in self.env.account.containers(parms={'format': format}):
+ for format_type in ['json', 'xml']:
+ for a in self.env.account.containers(
+ parms={'format': format_type}):
self.assert_(a['count'] >= 0)
self.assert_(a['bytes'] >= 0)
headers = dict(self.env.conn.response.getheaders())
- if format == 'json':
+ if format_type == 'json':
self.assertEquals(headers['content-type'],
'application/json; charset=utf-8')
- elif format == 'xml':
+ elif format_type == 'xml':
self.assertEquals(headers['content-type'],
'application/xml; charset=utf-8')
@@ -340,8 +329,8 @@ class TestAccount(Base):
def testContainerListing(self):
a = sorted([c.name for c in self.env.containers])
- for format in [None, 'json', 'xml']:
- b = self.env.account.containers(parms={'format': format})
+ for format_type in [None, 'json', 'xml']:
+ b = self.env.account.containers(parms={'format': format_type})
if isinstance(b[0], dict):
b = [x['name'] for x in b]
@@ -354,27 +343,29 @@ class TestAccount(Base):
self.assert_status(401)
def testLastContainerMarker(self):
- for format in [None, 'json', 'xml']:
- containers = self.env.account.containers({'format': format})
+ for format_type in [None, 'json', 'xml']:
+ containers = self.env.account.containers({'format': format_type})
self.assertEquals(len(containers), len(self.env.containers))
self.assert_status(200)
containers = self.env.account.containers(
- parms={'format': format, 'marker': containers[-1]})
+ parms={'format': format_type, 'marker': containers[-1]})
self.assertEquals(len(containers), 0)
- if format is None:
+ if format_type is None:
self.assert_status(204)
else:
self.assert_status(200)
def testMarkerLimitContainerList(self):
- for format in [None, 'json', 'xml']:
+ for format_type in [None, 'json', 'xml']:
for marker in ['0', 'A', 'I', 'R', 'Z', 'a', 'i', 'r', 'z',
'abc123', 'mnop', 'xyz']:
limit = random.randint(2, 9)
containers = self.env.account.containers(
- parms={'format': format, 'marker': marker, 'limit': limit})
+ parms={'format': format_type,
+ 'marker': marker,
+ 'limit': limit})
self.assert_(len(containers) <= limit)
if containers:
if isinstance(containers[0], dict):
@@ -382,9 +373,9 @@ class TestAccount(Base):
self.assert_(locale.strcoll(containers[0], marker) > 0)
def testContainersOrderedByName(self):
- for format in [None, 'json', 'xml']:
+ for format_type in [None, 'json', 'xml']:
containers = self.env.account.containers(
- parms={'format': format})
+ parms={'format': format_type})
if isinstance(containers[0], dict):
containers = [x['name'] for x in containers]
self.assertEquals(sorted(containers, cmp=locale.strcoll),
@@ -410,11 +401,11 @@ class TestAccountNoContainers(Base):
set_up = False
def testGetRequest(self):
- for format in [None, 'json', 'xml']:
+ for format_type in [None, 'json', 'xml']:
self.assert_(not self.env.account.containers(
- parms={'format': format}))
+ parms={'format': format_type}))
- if format is None:
+ if format_type is None:
self.assert_status(204)
else:
self.assert_status(200)
@@ -441,9 +432,9 @@ class TestContainerEnv:
cls.file_size = 128
cls.files = list()
for x in range(cls.file_count):
- file = cls.container.file(Utils.create_name())
- file.write_random(cls.file_size)
- cls.files.append(file.name)
+ file_item = cls.container.file(Utils.create_name())
+ file_item.write_random(cls.file_size)
+ cls.files.append(file_item.name)
class TestContainerDev(Base):
@@ -475,12 +466,12 @@ class TestContainer(Base):
def testFileThenContainerDelete(self):
cont = self.env.account.container(Utils.create_name())
self.assert_(cont.create())
- file = cont.file(Utils.create_name())
- self.assert_(file.write_random())
+ file_item = cont.file(Utils.create_name())
+ self.assert_(file_item.write_random())
- self.assert_(file.delete())
+ self.assert_(file_item.delete())
self.assert_status(204)
- self.assert_(file.name not in cont.files())
+ self.assert_(file_item.name not in cont.files())
self.assert_(cont.delete())
self.assert_status(204)
@@ -492,8 +483,8 @@ class TestContainer(Base):
files = sorted([Utils.create_name() for x in xrange(10)])
for f in files:
- file = cont.file(f)
- self.assert_(file.write_random())
+ file_item = cont.file(f)
+ self.assert_(file_item.write_random())
for i in xrange(len(files)):
f = files[i]
@@ -518,23 +509,23 @@ class TestContainer(Base):
prefix_files[prefix] = []
for i in range(prefix_file_count):
- file = cont.file(prefix + Utils.create_name())
- file.write()
- prefix_files[prefix].append(file.name)
+ file_item = cont.file(prefix + Utils.create_name())
+ file_item.write()
+ prefix_files[prefix].append(file_item.name)
- for format in [None, 'json', 'xml']:
+ for format_type in [None, 'json', 'xml']:
for prefix in prefixs:
files = cont.files(parms={'prefix': prefix})
self.assertEquals(files, sorted(prefix_files[prefix]))
- for format in [None, 'json', 'xml']:
+ for format_type in [None, 'json', 'xml']:
for prefix in prefixs:
files = cont.files(parms={'limit': limit_count,
'prefix': prefix})
self.assertEquals(len(files), limit_count)
- for file in files:
- self.assert_(file.startswith(prefix))
+ for file_item in files:
+ self.assert_(file_item.startswith(prefix))
def testCreate(self):
cont = self.env.account.container(Utils.create_name())
@@ -543,10 +534,10 @@ class TestContainer(Base):
self.assert_(cont.name in self.env.account.containers())
def testContainerFileListOnContainerThatDoesNotExist(self):
- for format in [None, 'json', 'xml']:
+ for format_type in [None, 'json', 'xml']:
container = self.env.account.container(Utils.create_name())
self.assertRaises(ResponseError, container.files,
- parms={'format': format})
+ parms={'format': format_type})
self.assert_status(404)
def testUtf8Container(self):
@@ -606,52 +597,52 @@ class TestContainer(Base):
def testDeleteOnContainerWithFiles(self):
cont = self.env.account.container(Utils.create_name())
self.assert_(cont.create())
- file = cont.file(Utils.create_name())
- file.write_random(self.env.file_size)
- self.assert_(file.name in cont.files())
+ file_item = cont.file(Utils.create_name())
+ file_item.write_random(self.env.file_size)
+ self.assert_(file_item.name in cont.files())
self.assert_(not cont.delete())
self.assert_status(409)
def testFileCreateInContainerThatDoesNotExist(self):
- file = File(self.env.conn, self.env.account, Utils.create_name(),
- Utils.create_name())
- self.assertRaises(ResponseError, file.write)
+ file_item = File(self.env.conn, self.env.account, Utils.create_name(),
+ Utils.create_name())
+ self.assertRaises(ResponseError, file_item.write)
self.assert_status(404)
def testLastFileMarker(self):
- for format in [None, 'json', 'xml']:
- files = self.env.container.files({'format': format})
+ for format_type in [None, 'json', 'xml']:
+ files = self.env.container.files({'format': format_type})
self.assertEquals(len(files), len(self.env.files))
self.assert_status(200)
files = self.env.container.files(
- parms={'format': format, 'marker': files[-1]})
+ parms={'format': format_type, 'marker': files[-1]})
self.assertEquals(len(files), 0)
- if format is None:
+ if format_type is None:
self.assert_status(204)
else:
self.assert_status(200)
def testContainerFileList(self):
- for format in [None, 'json', 'xml']:
- files = self.env.container.files(parms={'format': format})
+ for format_type in [None, 'json', 'xml']:
+ files = self.env.container.files(parms={'format': format_type})
self.assert_status(200)
if isinstance(files[0], dict):
files = [x['name'] for x in files]
- for file in self.env.files:
- self.assert_(file in files)
+ for file_item in self.env.files:
+ self.assert_(file_item in files)
- for file in files:
- self.assert_(file in self.env.files)
+ for file_item in files:
+ self.assert_(file_item in self.env.files)
def testMarkerLimitFileList(self):
- for format in [None, 'json', 'xml']:
+ for format_type in [None, 'json', 'xml']:
for marker in ['0', 'A', 'I', 'R', 'Z', 'a', 'i', 'r', 'z',
'abc123', 'mnop', 'xyz']:
limit = random.randint(2, self.env.file_count - 1)
- files = self.env.container.files(parms={'format': format,
+ files = self.env.container.files(parms={'format': format_type,
'marker': marker,
'limit': limit})
@@ -668,8 +659,8 @@ class TestContainer(Base):
self.assert_(locale.strcoll(files[0], marker) > 0)
def testFileOrder(self):
- for format in [None, 'json', 'xml']:
- files = self.env.container.files(parms={'format': format})
+ for format_type in [None, 'json', 'xml']:
+ files = self.env.container.files(parms={'format': format_type})
if isinstance(files[0], dict):
files = [x['name'] for x in files]
self.assertEquals(sorted(files, cmp=locale.strcoll), files)
@@ -687,8 +678,8 @@ class TestContainer(Base):
self.assert_status(404)
def testContainerFileListWithLimit(self):
- for format in [None, 'json', 'xml']:
- files = self.env.container.files(parms={'format': format,
+ for format_type in [None, 'json', 'xml']:
+ files = self.env.container.files(parms={'format': format_type,
'limit': 2})
self.assertEquals(len(files), 2)
@@ -707,8 +698,8 @@ class TestContainer(Base):
cont = self.env.account.container(Utils.create_name())
self.assertRaises(ResponseError, cont.files)
self.assert_(cont.create())
- file = cont.file(Utils.create_name())
- file.write_random()
+ file_item = cont.file(Utils.create_name())
+ file_item.write_random()
class TestContainerUTF8(Base2, TestContainer):
@@ -775,12 +766,13 @@ class TestContainerPathsEnv:
stored_files = set()
for f in cls.files:
- file = cls.container.file(f)
+ file_item = cls.container.file(f)
if f.endswith('/'):
- file.write(hdrs={'Content-Type': 'application/directory'})
+ file_item.write(hdrs={'Content-Type': 'application/directory'})
else:
- file.write_random(cls.file_size, hdrs={'Content-Type':
- 'application/directory'})
+ file_item.write_random(cls.file_size,
+ hdrs={'Content-Type':
+ 'application/directory'})
if (normalized_urls):
nfile = '/'.join(filter(None, f.split('/')))
if (f[-1] == '/'):
@@ -791,8 +783,6 @@ class TestContainerPathsEnv:
cls.stored_files = sorted(stored_files)
-
-
class TestContainerPaths(Base):
env = TestContainerPathsEnv
set_up = False
@@ -805,61 +795,62 @@ class TestContainerPaths(Base):
if count > 10:
raise ValueError('too deep recursion')
- for file in self.env.container.files(parms={'path': path}):
- self.assert_(file.startswith(path))
- if file.endswith('/'):
- recurse_path(file, count + 1)
- found_dirs.append(file)
+ for file_item in self.env.container.files(parms={'path': path}):
+ self.assert_(file_item.startswith(path))
+ if file_item.endswith('/'):
+ recurse_path(file_item, count + 1)
+ found_dirs.append(file_item)
else:
- found_files.append(file)
+ found_files.append(file_item)
recurse_path('')
- for file in self.env.stored_files:
- if file.startswith('/'):
- self.assert_(file not in found_dirs)
- self.assert_(file not in found_files)
- elif file.endswith('/'):
- self.assert_(file in found_dirs)
- self.assert_(file not in found_files)
+ for file_item in self.env.stored_files:
+ if file_item.startswith('/'):
+ self.assert_(file_item not in found_dirs)
+ self.assert_(file_item not in found_files)
+ elif file_item.endswith('/'):
+ self.assert_(file_item in found_dirs)
+ self.assert_(file_item not in found_files)
else:
- self.assert_(file in found_files)
- self.assert_(file not in found_dirs)
+ self.assert_(file_item in found_files)
+ self.assert_(file_item not in found_dirs)
found_files = []
found_dirs = []
recurse_path('/')
- for file in self.env.stored_files:
- if not file.startswith('/'):
- self.assert_(file not in found_dirs)
- self.assert_(file not in found_files)
- elif file.endswith('/'):
- self.assert_(file in found_dirs)
- self.assert_(file not in found_files)
+ for file_item in self.env.stored_files:
+ if not file_item.startswith('/'):
+ self.assert_(file_item not in found_dirs)
+ self.assert_(file_item not in found_files)
+ elif file_item.endswith('/'):
+ self.assert_(file_item in found_dirs)
+ self.assert_(file_item not in found_files)
else:
- self.assert_(file in found_files)
- self.assert_(file not in found_dirs)
+ self.assert_(file_item in found_files)
+ self.assert_(file_item not in found_dirs)
def testContainerListing(self):
- for format in (None, 'json', 'xml'):
- files = self.env.container.files(parms={'format': format})
+ for format_type in (None, 'json', 'xml'):
+ files = self.env.container.files(parms={'format': format_type})
if isinstance(files[0], dict):
files = [str(x['name']) for x in files]
self.assertEquals(files, self.env.stored_files)
- for format in ('json', 'xml'):
- for file in self.env.container.files(parms={'format': format}):
- self.assert_(int(file['bytes']) >= 0)
- self.assert_('last_modified' in file)
- if file['name'].endswith('/'):
- self.assertEquals(file['content_type'],
+ for format_type in ('json', 'xml'):
+ for file_item in self.env.container.files(parms={'format':
+ format_type}):
+ self.assert_(int(file_item['bytes']) >= 0)
+ self.assert_('last_modified' in file_item)
+ if file_item['name'].endswith('/'):
+ self.assertEquals(file_item['content_type'],
'application/directory')
def testStructure(self):
- def assert_listing(path, list):
+ def assert_listing(path, file_list):
files = self.env.container.files(parms={'path': path})
- self.assertEquals(sorted(list, cmp=locale.strcoll), files)
+ self.assertEquals(sorted(file_list, cmp=locale.strcoll), files)
if not normalized_urls:
assert_listing('/', ['/dir1/', '/dir2/', '/file1', '/file A'])
assert_listing('/dir1',
@@ -927,14 +918,14 @@ class TestFile(Base):
def testCopy(self):
# makes sure to test encoded characters"
source_filename = 'dealde%2Fl04 011e%204c8df/flash.png'
- file = self.env.container.file(source_filename)
+ file_item = self.env.container.file(source_filename)
metadata = {}
for i in range(1):
metadata[Utils.create_ascii_name()] = Utils.create_name()
- data = file.write_random()
- file.sync_metadata(metadata)
+ data = file_item.write_random()
+ file_item.sync_metadata(metadata)
dest_cont = self.env.account.container(Utils.create_name())
self.assert_(dest_cont.create())
@@ -945,21 +936,21 @@ class TestFile(Base):
for prefix in ('', '/'):
dest_filename = Utils.create_name()
- file = self.env.container.file(source_filename)
- file.copy('%s%s' % (prefix, cont), dest_filename)
+ file_item = self.env.container.file(source_filename)
+ file_item.copy('%s%s' % (prefix, cont), dest_filename)
self.assert_(dest_filename in cont.files())
- file = cont.file(dest_filename)
+ file_item = cont.file(dest_filename)
- self.assert_(data == file.read())
- self.assert_(file.initialize())
- self.assert_(metadata == file.metadata)
+ self.assert_(data == file_item.read())
+ self.assert_(file_item.initialize())
+ self.assert_(metadata == file_item.metadata)
def testCopy404s(self):
source_filename = Utils.create_name()
- file = self.env.container.file(source_filename)
- file.write_random()
+ file_item = self.env.container.file(source_filename)
+ file_item.write_random()
dest_cont = self.env.account.container(Utils.create_name())
self.assert_(dest_cont.create())
@@ -967,60 +958,65 @@ class TestFile(Base):
for prefix in ('', '/'):
# invalid source container
source_cont = self.env.account.container(Utils.create_name())
- file = source_cont.file(source_filename)
- self.assert_(not file.copy('%s%s' % (prefix, self.env.container),
- Utils.create_name()))
+ file_item = source_cont.file(source_filename)
+ self.assert_(not file_item.copy(
+ '%s%s' % (prefix, self.env.container),
+ Utils.create_name()))
self.assert_status(404)
- self.assert_(not file.copy('%s%s' % (prefix, dest_cont),
- Utils.create_name()))
+ self.assert_(not file_item.copy('%s%s' % (prefix, dest_cont),
+ Utils.create_name()))
self.assert_status(404)
# invalid source object
- file = self.env.container.file(Utils.create_name())
- self.assert_(not file.copy('%s%s' % (prefix, self.env.container),
- Utils.create_name()))
+ file_item = self.env.container.file(Utils.create_name())
+ self.assert_(not file_item.copy(
+ '%s%s' % (prefix, self.env.container),
+ Utils.create_name()))
self.assert_status(404)
- self.assert_(not file.copy('%s%s' % (prefix, dest_cont),
- Utils.create_name()))
+ self.assert_(not file_item.copy('%s%s' % (prefix, dest_cont),
+ Utils.create_name()))
self.assert_status(404)
# invalid destination container
- file = self.env.container.file(source_filename)
- self.assert_(not file.copy('%s%s' % (prefix, Utils.create_name()),
- Utils.create_name()))
+ file_item = self.env.container.file(source_filename)
+ self.assert_(not file_item.copy(
+ '%s%s' % (prefix, Utils.create_name()),
+ Utils.create_name()))
def testCopyNoDestinationHeader(self):
source_filename = Utils.create_name()
- file = self.env.container.file(source_filename)
- file.write_random()
+ file_item = self.env.container.file(source_filename)
+ file_item.write_random()
- file = self.env.container.file(source_filename)
- self.assert_(not file.copy(Utils.create_name(), Utils.create_name(),
+ file_item = self.env.container.file(source_filename)
+ self.assert_(not file_item.copy(Utils.create_name(),
+ Utils.create_name(),
cfg={'no_destination': True}))
self.assert_status(412)
def testCopyDestinationSlashProblems(self):
source_filename = Utils.create_name()
- file = self.env.container.file(source_filename)
- file.write_random()
+ file_item = self.env.container.file(source_filename)
+ file_item.write_random()
# no slash
- self.assert_(not file.copy(Utils.create_name(), Utils.create_name(),
+ self.assert_(not file_item.copy(Utils.create_name(),
+ Utils.create_name(),
cfg={'destination': Utils.create_name()}))
self.assert_status(412)
def testCopyFromHeader(self):
source_filename = Utils.create_name()
- file = self.env.container.file(source_filename)
+ file_item = self.env.container.file(source_filename)
metadata = {}
for i in range(1):
metadata[Utils.create_ascii_name()] = Utils.create_name()
- file.metadata = metadata
+ file_item.metadata = metadata
- data = file.write_random()
+ data = file_item.write_random()
dest_cont = self.env.account.container(Utils.create_name())
self.assert_(dest_cont.create())
@@ -1031,35 +1027,35 @@ class TestFile(Base):
for prefix in ('', '/'):
dest_filename = Utils.create_name()
- file = cont.file(dest_filename)
- file.write(hdrs={'X-Copy-From': '%s%s/%s' % (prefix,
- self.env.container.name, source_filename)})
+ file_item = cont.file(dest_filename)
+ file_item.write(hdrs={'X-Copy-From': '%s%s/%s' % (
+ prefix, self.env.container.name, source_filename)})
self.assert_(dest_filename in cont.files())
- file = cont.file(dest_filename)
+ file_item = cont.file(dest_filename)
- self.assert_(data == file.read())
- self.assert_(file.initialize())
- self.assert_(metadata == file.metadata)
+ self.assert_(data == file_item.read())
+ self.assert_(file_item.initialize())
+ self.assert_(metadata == file_item.metadata)
def testCopyFromHeader404s(self):
source_filename = Utils.create_name()
- file = self.env.container.file(source_filename)
- file.write_random()
+ file_item = self.env.container.file(source_filename)
+ file_item.write_random()
for prefix in ('', '/'):
# invalid source container
- file = self.env.container.file(Utils.create_name())
- self.assertRaises(ResponseError, file.write,
+ file_item = self.env.container.file(Utils.create_name())
+ self.assertRaises(ResponseError, file_item.write,
hdrs={'X-Copy-From': '%s%s/%s' %
(prefix,
Utils.create_name(), source_filename)})
self.assert_status(404)
# invalid source object
- file = self.env.container.file(Utils.create_name())
- self.assertRaises(ResponseError, file.write,
+ file_item = self.env.container.file(Utils.create_name())
+ self.assertRaises(ResponseError, file_item.write,
hdrs={'X-Copy-From': '%s%s/%s' %
(prefix,
self.env.container.name, Utils.create_name())})
@@ -1067,8 +1063,8 @@ class TestFile(Base):
# invalid destination container
dest_cont = self.env.account.container(Utils.create_name())
- file = dest_cont.file(Utils.create_name())
- self.assertRaises(ResponseError, file.write,
+ file_item = dest_cont.file(Utils.create_name())
+ self.assertRaises(ResponseError, file_item.write,
hdrs={'X-Copy-From': '%s%s/%s' %
(prefix,
self.env.container.name, source_filename)})
@@ -1078,13 +1074,13 @@ class TestFile(Base):
limit = load_constraint('max_object_name_length')
for l in (1, 10, limit / 2, limit - 1, limit, limit + 1, limit * 2):
- file = self.env.container.file(create_limit_filename(l))
+ file_item = self.env.container.file(create_limit_filename(l))
if l <= limit:
- self.assert_(file.write())
+ self.assert_(file_item.write())
self.assert_status(201)
else:
- self.assertRaises(ResponseError, file.write)
+ self.assertRaises(ResponseError, file_item.write)
self.assert_status(400)
def testQuestionMarkInName(self):
@@ -1095,30 +1091,32 @@ class TestFile(Base):
else:
file_name = Utils.create_name(6) + '?' + Utils.create_name(6)
- file = self.env.container.file(file_name)
- self.assert_(file.write(cfg={'no_path_quote': True}))
+ file_item = self.env.container.file(file_name)
+ self.assert_(file_item.write(cfg={'no_path_quote': True}))
self.assert_(file_name not in self.env.container.files())
self.assert_(file_name.split('?')[0] in self.env.container.files())
def testDeleteThen404s(self):
- file = self.env.container.file(Utils.create_name())
- self.assert_(file.write_random())
+ file_item = self.env.container.file(Utils.create_name())
+ self.assert_(file_item.write_random())
self.assert_status(201)
- self.assert_(file.delete())
+ self.assert_(file_item.delete())
self.assert_status(204)
- file.metadata = {Utils.create_ascii_name(): Utils.create_name()}
+ file_item.metadata = {Utils.create_ascii_name(): Utils.create_name()}
- for method in (file.info, file.read, file.sync_metadata,
- file.delete):
+ for method in (file_item.info,
+ file_item.read,
+ file_item.sync_metadata,
+ file_item.delete):
self.assertRaises(ResponseError, method)
self.assert_status(404)
def testBlankMetadataName(self):
- file = self.env.container.file(Utils.create_name())
- file.metadata = {'': Utils.create_name()}
- self.assertRaises(ResponseError, file.write_random)
+ file_item = self.env.container.file(Utils.create_name())
+ file_item.metadata = {'': Utils.create_name()}
+ self.assertRaises(ResponseError, file_item.write_random)
self.assert_status(400)
def testMetadataNumberLimit(self):
@@ -1143,22 +1141,22 @@ class TestFile(Base):
size += len(key) + len(val)
metadata[key] = val
- file = self.env.container.file(Utils.create_name())
- file.metadata = metadata
+ file_item = self.env.container.file(Utils.create_name())
+ file_item.metadata = metadata
if i <= number_limit:
- self.assert_(file.write())
+ self.assert_(file_item.write())
self.assert_status(201)
- self.assert_(file.sync_metadata())
+ self.assert_(file_item.sync_metadata())
self.assert_status((201, 202))
else:
- self.assertRaises(ResponseError, file.write)
+ self.assertRaises(ResponseError, file_item.write)
self.assert_status(400)
- file.metadata = {}
- self.assert_(file.write())
+ file_item.metadata = {}
+ self.assert_(file_item.write())
self.assert_status(201)
- file.metadata = metadata
- self.assertRaises(ResponseError, file.sync_metadata)
+ file_item.metadata = metadata
+ self.assertRaises(ResponseError, file_item.sync_metadata)
self.assert_status(400)
def testContentTypeGuessing(self):
@@ -1169,8 +1167,8 @@ class TestFile(Base):
self.assert_(container.create())
for i in file_types.keys():
- file = container.file(Utils.create_name() + '.' + i)
- file.write('', cfg={'no_content_type': True})
+ file_item = container.file(Utils.create_name() + '.' + i)
+ file_item.write('', cfg={'no_content_type': True})
file_types_read = {}
for i in container.files(parms={'format': 'json'}):
@@ -1181,13 +1179,13 @@ class TestFile(Base):
def testRangedGets(self):
file_length = 10000
range_size = file_length / 10
- file = self.env.container.file(Utils.create_name())
- data = file.write_random(file_length)
+ file_item = self.env.container.file(Utils.create_name())
+ data = file_item.write_random(file_length)
for i in range(0, file_length, range_size):
range_string = 'bytes=%d-%d' % (i, i + range_size - 1)
hdrs = {'Range': range_string}
- self.assert_(data[i: i + range_size] == file.read(hdrs=hdrs),
+ self.assert_(data[i: i + range_size] == file_item.read(hdrs=hdrs),
range_string)
range_string = 'bytes=-%d' % (i)
@@ -1198,45 +1196,45 @@ class TestFile(Base):
# least one suffix-byte-range-spec with a NON-ZERO
# suffix-length, then the byte-range-set is satisfiable.
# Otherwise, the byte-range-set is unsatisfiable.
- self.assertRaises(ResponseError, file.read, hdrs=hdrs)
+ self.assertRaises(ResponseError, file_item.read, hdrs=hdrs)
self.assert_status(416)
else:
- self.assertEquals(file.read(hdrs=hdrs), data[-i:])
+ self.assertEquals(file_item.read(hdrs=hdrs), data[-i:])
range_string = 'bytes=%d-' % (i)
hdrs = {'Range': range_string}
- self.assert_(file.read(hdrs=hdrs) == data[i - file_length:],
+ self.assert_(file_item.read(hdrs=hdrs) == data[i - file_length:],
range_string)
range_string = 'bytes=%d-%d' % (file_length + 1000, file_length + 2000)
hdrs = {'Range': range_string}
- self.assertRaises(ResponseError, file.read, hdrs=hdrs)
+ self.assertRaises(ResponseError, file_item.read, hdrs=hdrs)
self.assert_status(416)
range_string = 'bytes=%d-%d' % (file_length - 1000, file_length + 2000)
hdrs = {'Range': range_string}
- self.assert_(file.read(hdrs=hdrs) == data[-1000:], range_string)
+ self.assert_(file_item.read(hdrs=hdrs) == data[-1000:], range_string)
hdrs = {'Range': '0-4'}
- self.assert_(file.read(hdrs=hdrs) == data, range_string)
+ self.assert_(file_item.read(hdrs=hdrs) == data, range_string)
# RFC 2616 14.35.1
# "If the entity is shorter than the specified suffix-length, the
# entire entity-body is used."
range_string = 'bytes=-%d' % (file_length + 10)
hdrs = {'Range': range_string}
- self.assert_(file.read(hdrs=hdrs) == data, range_string)
+ self.assert_(file_item.read(hdrs=hdrs) == data, range_string)
def testRangedGetsWithLWSinHeader(self):
#Skip this test until webob 1.2 can tolerate LWS in Range header.
file_length = 10000
- file = self.env.container.file(Utils.create_name())
- data = file.write_random(file_length)
+ file_item = self.env.container.file(Utils.create_name())
+ data = file_item.write_random(file_length)
for r in ('BYTES=0-999', 'bytes = 0-999', 'BYTES = 0 - 999',
'bytes = 0 - 999', 'bytes=0 - 999', 'bytes=0-999 '):
- self.assert_(file.read(hdrs={'Range': r}) == data[0:1000])
+ self.assert_(file_item.read(hdrs={'Range': r}) == data[0:1000])
def testFileSizeLimit(self):
limit = load_constraint('max_file_size')
@@ -1245,39 +1243,40 @@ class TestFile(Base):
for i in (limit - 100, limit - 10, limit - 1, limit, limit + 1,
limit + 10, limit + 100):
- file = self.env.container.file(Utils.create_name())
+ file_item = self.env.container.file(Utils.create_name())
if i <= limit:
- self.assert_(timeout(tsecs, file.write,
+ self.assert_(timeout(tsecs, file_item.write,
cfg={'set_content_length': i}))
else:
self.assertRaises(ResponseError, timeout, tsecs,
- file.write, cfg={'set_content_length': i})
+ file_item.write,
+ cfg={'set_content_length': i})
def testNoContentLengthForPut(self):
- file = self.env.container.file(Utils.create_name())
- self.assertRaises(ResponseError, file.write, 'testing',
+ file_item = self.env.container.file(Utils.create_name())
+ self.assertRaises(ResponseError, file_item.write, 'testing',
cfg={'no_content_length': True})
self.assert_status(411)
def testDelete(self):
- file = self.env.container.file(Utils.create_name())
- file.write_random(self.env.file_size)
+ file_item = self.env.container.file(Utils.create_name())
+ file_item.write_random(self.env.file_size)
- self.assert_(file.name in self.env.container.files())
- self.assert_(file.delete())
- self.assert_(file.name not in self.env.container.files())
+ self.assert_(file_item.name in self.env.container.files())
+ self.assert_(file_item.delete())
+ self.assert_(file_item.name not in self.env.container.files())
def testBadHeaders(self):
file_length = 100
# no content type on puts should be ok
- file = self.env.container.file(Utils.create_name())
- file.write_random(file_length, cfg={'no_content_type': True})
+ file_item = self.env.container.file(Utils.create_name())
+ file_item.write_random(file_length, cfg={'no_content_type': True})
self.assert_status(201)
# content length x
- self.assertRaises(ResponseError, file.write_random, file_length,
+ self.assertRaises(ResponseError, file_item.write_random, file_length,
hdrs={'Content-Length': 'X'},
cfg={'no_content_length': True})
self.assert_status(400)
@@ -1290,7 +1289,7 @@ class TestFile(Base):
self.assert_status(405)
# bad range headers
- self.assert_(len(file.read(hdrs={'Range': 'parsecs=8-12'})) ==
+ self.assert_(len(file_item.read(hdrs={'Range': 'parsecs=8-12'})) ==
file_length)
self.assert_status(200)
@@ -1304,49 +1303,49 @@ class TestFile(Base):
for l in lengths:
metadata = {'a' * l[0]: 'b' * l[1]}
- file = self.env.container.file(Utils.create_name())
- file.metadata = metadata
+ file_item = self.env.container.file(Utils.create_name())
+ file_item.metadata = metadata
if l[0] <= key_limit and l[1] <= value_limit:
- self.assert_(file.write())
+ self.assert_(file_item.write())
self.assert_status(201)
- self.assert_(file.sync_metadata())
+ self.assert_(file_item.sync_metadata())
else:
- self.assertRaises(ResponseError, file.write)
+ self.assertRaises(ResponseError, file_item.write)
self.assert_status(400)
- file.metadata = {}
- self.assert_(file.write())
+ file_item.metadata = {}
+ self.assert_(file_item.write())
self.assert_status(201)
- file.metadata = metadata
- self.assertRaises(ResponseError, file.sync_metadata)
+ file_item.metadata = metadata
+ self.assertRaises(ResponseError, file_item.sync_metadata)
self.assert_status(400)
def testEtagWayoff(self):
- file = self.env.container.file(Utils.create_name())
+ file_item = self.env.container.file(Utils.create_name())
hdrs = {'etag': 'reallylonganddefinitelynotavalidetagvalue'}
- self.assertRaises(ResponseError, file.write_random, hdrs=hdrs)
+ self.assertRaises(ResponseError, file_item.write_random, hdrs=hdrs)
self.assert_status(422)
def testFileCreate(self):
for i in range(10):
- file = self.env.container.file(Utils.create_name())
- data = file.write_random()
+ file_item = self.env.container.file(Utils.create_name())
+ data = file_item.write_random()
self.assert_status(201)
- self.assert_(data == file.read())
+ self.assert_(data == file_item.read())
self.assert_status(200)
def testHead(self):
file_name = Utils.create_name()
content_type = Utils.create_name()
- file = self.env.container.file(file_name)
- file.content_type = content_type
- file.write_random(self.env.file_size)
+ file_item = self.env.container.file(file_name)
+ file_item.content_type = content_type
+ file_item.write_random(self.env.file_size)
- md5 = file.md5
+ md5 = file_item.md5
- file = self.env.container.file(file_name)
- info = file.info()
+ file_item = self.env.container.file(file_name)
+ info = file_item.info()
self.assert_status(200)
self.assertEquals(info['content_length'], self.env.file_size)
@@ -1356,83 +1355,83 @@ class TestFile(Base):
def testDeleteOfFileThatDoesNotExist(self):
# in container that exists
- file = self.env.container.file(Utils.create_name())
- self.assertRaises(ResponseError, file.delete)
+ file_item = self.env.container.file(Utils.create_name())
+ self.assertRaises(ResponseError, file_item.delete)
self.assert_status(404)
# in container that does not exist
container = self.env.account.container(Utils.create_name())
- file = container.file(Utils.create_name())
- self.assertRaises(ResponseError, file.delete)
+ file_item = container.file(Utils.create_name())
+ self.assertRaises(ResponseError, file_item.delete)
self.assert_status(404)
def testHeadOnFileThatDoesNotExist(self):
# in container that exists
- file = self.env.container.file(Utils.create_name())
- self.assertRaises(ResponseError, file.info)
+ file_item = self.env.container.file(Utils.create_name())
+ self.assertRaises(ResponseError, file_item.info)
self.assert_status(404)
# in container that does not exist
container = self.env.account.container(Utils.create_name())
- file = container.file(Utils.create_name())
- self.assertRaises(ResponseError, file.info)
+ file_item = container.file(Utils.create_name())
+ self.assertRaises(ResponseError, file_item.info)
self.assert_status(404)
def testMetadataOnPost(self):
- file = self.env.container.file(Utils.create_name())
- file.write_random(self.env.file_size)
+ file_item = self.env.container.file(Utils.create_name())
+ file_item.write_random(self.env.file_size)
for i in range(10):
metadata = {}
- for i in range(10):
+ for j in range(10):
metadata[Utils.create_ascii_name()] = Utils.create_name()
- file.metadata = metadata
- self.assert_(file.sync_metadata())
+ file_item.metadata = metadata
+ self.assert_(file_item.sync_metadata())
self.assert_status((201, 202))
- file = self.env.container.file(file.name)
- self.assert_(file.initialize())
+ file_item = self.env.container.file(file_item.name)
+ self.assert_(file_item.initialize())
self.assert_status(200)
- self.assertEquals(file.metadata, metadata)
+ self.assertEquals(file_item.metadata, metadata)
def testGetContentType(self):
file_name = Utils.create_name()
content_type = Utils.create_name()
- file = self.env.container.file(file_name)
- file.content_type = content_type
- file.write_random()
+ file_item = self.env.container.file(file_name)
+ file_item.content_type = content_type
+ file_item.write_random()
- file = self.env.container.file(file_name)
- file.read()
+ file_item = self.env.container.file(file_name)
+ file_item.read()
- self.assertEquals(content_type, file.content_type)
+ self.assertEquals(content_type, file_item.content_type)
def testGetOnFileThatDoesNotExist(self):
# in container that exists
- file = self.env.container.file(Utils.create_name())
- self.assertRaises(ResponseError, file.read)
+ file_item = self.env.container.file(Utils.create_name())
+ self.assertRaises(ResponseError, file_item.read)
self.assert_status(404)
# in container that does not exist
container = self.env.account.container(Utils.create_name())
- file = container.file(Utils.create_name())
- self.assertRaises(ResponseError, file.read)
+ file_item = container.file(Utils.create_name())
+ self.assertRaises(ResponseError, file_item.read)
self.assert_status(404)
def testPostOnFileThatDoesNotExist(self):
# in container that exists
- file = self.env.container.file(Utils.create_name())
- file.metadata['Field'] = 'Value'
- self.assertRaises(ResponseError, file.sync_metadata)
+ file_item = self.env.container.file(Utils.create_name())
+ file_item.metadata['Field'] = 'Value'
+ self.assertRaises(ResponseError, file_item.sync_metadata)
self.assert_status(404)
# in container that does not exist
container = self.env.account.container(Utils.create_name())
- file = container.file(Utils.create_name())
- file.metadata['Field'] = 'Value'
- self.assertRaises(ResponseError, file.sync_metadata)
+ file_item = container.file(Utils.create_name())
+ file_item.metadata['Field'] = 'Value'
+ self.assertRaises(ResponseError, file_item.sync_metadata)
self.assert_status(404)
def testMetadataOnPut(self):
@@ -1441,14 +1440,14 @@ class TestFile(Base):
for j in range(10):
metadata[Utils.create_ascii_name()] = Utils.create_name()
- file = self.env.container.file(Utils.create_name())
- file.metadata = metadata
- file.write_random(self.env.file_size)
+ file_item = self.env.container.file(Utils.create_name())
+ file_item.metadata = metadata
+ file_item.write_random(self.env.file_size)
- file = self.env.container.file(file.name)
- self.assert_(file.initialize())
+ file_item = self.env.container.file(file_item.name)
+ self.assert_(file_item.initialize())
self.assert_status(200)
- self.assertEquals(file.metadata, metadata)
+ self.assertEquals(file_item.metadata, metadata)
def testSerialization(self):
container = self.env.account.container(Utils.create_name())
@@ -1461,28 +1460,29 @@ class TestFile(Base):
write_time = time.time()
for f in files:
- file = container.file(f['name'])
- file.content_type = f['content_type']
- file.write_random(f['bytes'])
+ file_item = container.file(f['name'])
+ file_item.content_type = f['content_type']
+ file_item.write_random(f['bytes'])
- f['hash'] = file.md5
+ f['hash'] = file_item.md5
f['json'] = False
f['xml'] = False
write_time = time.time() - write_time
- for format in ['json', 'xml']:
- for file in container.files(parms={'format': format}):
+ for format_type in ['json', 'xml']:
+ for file_item in container.files(parms={'format': format_type}):
found = False
for f in files:
- if f['name'] != file['name']:
+ if f['name'] != file_item['name']:
continue
- self.assertEquals(file['content_type'],
+ self.assertEquals(file_item['content_type'],
f['content_type'])
- self.assertEquals(int(file['bytes']), f['bytes'])
+ self.assertEquals(int(file_item['bytes']), f['bytes'])
- d = datetime.strptime(file['last_modified'].split('.')[0],
- "%Y-%m-%dT%H:%M:%S")
+ d = datetime.strptime(
+ file_item['last_modified'].split('.')[0],
+ "%Y-%m-%dT%H:%M:%S")
lm = time.mktime(d.timetuple())
if 'last_modified' in f:
@@ -1490,17 +1490,17 @@ class TestFile(Base):
else:
f['last_modified'] = lm
- f[format] = True
+ f[format_type] = True
found = True
self.assert_(found, 'Unexpected file %s found in '
- '%s listing' % (file['name'], format))
+ '%s listing' % (file_item['name'], format_type))
headers = dict(self.env.conn.response.getheaders())
- if format == 'json':
+ if format_type == 'json':
self.assertEquals(headers['content-type'],
'application/json; charset=utf-8')
- elif format == 'xml':
+ elif format_type == 'xml':
self.assertEquals(headers['content-type'],
'application/xml; charset=utf-8')
@@ -1510,35 +1510,35 @@ class TestFile(Base):
'modified times should be less than time to write files')
for f in files:
- for format in ['json', 'xml']:
- self.assert_(f[format], 'File %s not found in %s listing'
- % (f['name'], format))
+ for format_type in ['json', 'xml']:
+ self.assert_(f[format_type], 'File %s not found in %s listing'
+ % (f['name'], format_type))
def testStackedOverwrite(self):
- file = self.env.container.file(Utils.create_name())
+ file_item = self.env.container.file(Utils.create_name())
for i in range(1, 11):
- data = file.write_random(512)
- file.write(data)
+ data = file_item.write_random(512)
+ file_item.write(data)
- self.assert_(file.read() == data)
+ self.assert_(file_item.read() == data)
def testTooLongName(self):
- file = self.env.container.file('x' * 1025)
- self.assertRaises(ResponseError, file.write)
+ file_item = self.env.container.file('x' * 1025)
+ self.assertRaises(ResponseError, file_item.write)
self.assert_status(400)
def testZeroByteFile(self):
- file = self.env.container.file(Utils.create_name())
+ file_item = self.env.container.file(Utils.create_name())
- self.assert_(file.write(''))
- self.assert_(file.name in self.env.container.files())
- self.assert_(file.read() == '')
+ self.assert_(file_item.write(''))
+ self.assert_(file_item.name in self.env.container.files())
+ self.assert_(file_item.read() == '')
def testEtagResponse(self):
- file = self.env.container.file(Utils.create_name())
+ file_item = self.env.container.file(Utils.create_name())
- data = StringIO.StringIO(file.write_random(512))
+ data = StringIO.StringIO(file_item.write_random(512))
etag = File.compute_md5sum(data)
headers = dict(self.env.conn.response.getheaders())
@@ -1554,15 +1554,15 @@ class TestFile(Base):
etag = File.compute_md5sum(data)
for i in (1, 10, 100, 1000):
- file = self.env.container.file(Utils.create_name())
+ file_item = self.env.container.file(Utils.create_name())
for j in chunks(data, i):
- file.chunked_write(j)
+ file_item.chunked_write(j)
- self.assert_(file.chunked_write())
- self.assert_(data == file.read())
+ self.assert_(file_item.chunked_write())
+ self.assert_(data == file_item.read())
- info = file.info()
+ info = file_item.info()
self.assertEquals(etag, info['etag'])
@@ -1588,9 +1588,9 @@ class TestFileComparisonEnv:
cls.file_size = 128
cls.files = list()
for x in range(cls.file_count):
- file = cls.container.file(Utils.create_name())
- file.write_random(cls.file_size)
- cls.files.append(file)
+ file_item = cls.container.file(Utils.create_name())
+ file_item.write_random(cls.file_size)
+ cls.files.append(file_item)
cls.time_old = time.asctime(time.localtime(time.time() - 86400))
cls.time_new = time.asctime(time.localtime(time.time() + 86400))
@@ -1601,55 +1601,55 @@ class TestFileComparison(Base):
set_up = False
def testIfMatch(self):
- for file in self.env.files:
- hdrs = {'If-Match': file.md5}
- self.assert_(file.read(hdrs=hdrs))
+ for file_item in self.env.files:
+ hdrs = {'If-Match': file_item.md5}
+ self.assert_(file_item.read(hdrs=hdrs))
hdrs = {'If-Match': 'bogus'}
- self.assertRaises(ResponseError, file.read, hdrs=hdrs)
+ self.assertRaises(ResponseError, file_item.read, hdrs=hdrs)
self.assert_status(412)
def testIfNoneMatch(self):
- for file in self.env.files:
+ for file_item in self.env.files:
hdrs = {'If-None-Match': 'bogus'}
- self.assert_(file.read(hdrs=hdrs))
+ self.assert_(file_item.read(hdrs=hdrs))
- hdrs = {'If-None-Match': file.md5}
- self.assertRaises(ResponseError, file.read, hdrs=hdrs)
+ hdrs = {'If-None-Match': file_item.md5}
+ self.assertRaises(ResponseError, file_item.read, hdrs=hdrs)
self.assert_status(304)
def testIfModifiedSince(self):
- for file in self.env.files:
+ for file_item in self.env.files:
hdrs = {'If-Modified-Since': self.env.time_old}
- self.assert_(file.read(hdrs=hdrs))
+ self.assert_(file_item.read(hdrs=hdrs))
hdrs = {'If-Modified-Since': self.env.time_new}
- self.assertRaises(ResponseError, file.read, hdrs=hdrs)
+ self.assertRaises(ResponseError, file_item.read, hdrs=hdrs)
self.assert_status(304)
def testIfUnmodifiedSince(self):
- for file in self.env.files:
+ for file_item in self.env.files:
hdrs = {'If-Unmodified-Since': self.env.time_new}
- self.assert_(file.read(hdrs=hdrs))
+ self.assert_(file_item.read(hdrs=hdrs))
hdrs = {'If-Unmodified-Since': self.env.time_old}
- self.assertRaises(ResponseError, file.read, hdrs=hdrs)
+ self.assertRaises(ResponseError, file_item.read, hdrs=hdrs)
self.assert_status(412)
def testIfMatchAndUnmodified(self):
- for file in self.env.files:
- hdrs = {'If-Match': file.md5,
+ for file_item in self.env.files:
+ hdrs = {'If-Match': file_item.md5,
'If-Unmodified-Since': self.env.time_new}
- self.assert_(file.read(hdrs=hdrs))
+ self.assert_(file_item.read(hdrs=hdrs))
hdrs = {'If-Match': 'bogus',
'If-Unmodified-Since': self.env.time_new}
- self.assertRaises(ResponseError, file.read, hdrs=hdrs)
+ self.assertRaises(ResponseError, file_item.read, hdrs=hdrs)
self.assert_status(412)
- hdrs = {'If-Match': file.md5,
+ hdrs = {'If-Match': file_item.md5,
'If-Unmodified-Since': self.env.time_old}
- self.assertRaises(ResponseError, file.read, hdrs=hdrs)
+ self.assertRaises(ResponseError, file_item.read, hdrs=hdrs)
self.assert_status(412)