summaryrefslogtreecommitdiffstats
path: root/test/functional/swift_test_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional/swift_test_client.py')
-rw-r--r--test/functional/swift_test_client.py736
1 files changed, 736 insertions, 0 deletions
diff --git a/test/functional/swift_test_client.py b/test/functional/swift_test_client.py
new file mode 100644
index 0000000..a6d8aec
--- /dev/null
+++ b/test/functional/swift_test_client.py
@@ -0,0 +1,736 @@
+# Copyright (c) 2010-2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import hashlib
+import httplib
+import os
+import random
+import socket
+import StringIO
+import time
+import urllib
+
+import simplejson as json
+
+from nose import SkipTest
+from xml.dom import minidom
+from swiftclient import get_auth
+
+
+class AuthenticationFailed(Exception):
+ pass
+
+
+class RequestError(Exception):
+ pass
+
+
+class ResponseError(Exception):
+ def __init__(self, response):
+ self.status = response.status
+ self.reason = response.reason
+ Exception.__init__(self)
+
+ def __str__(self):
+ return '%d: %s' % (self.status, self.reason)
+
+ def __repr__(self):
+ return '%d: %s' % (self.status, self.reason)
+
+
+def listing_empty(method):
+ for i in xrange(0, 6):
+ if len(method()) == 0:
+ return True
+
+ time.sleep(2 ** i)
+
+ return False
+
+
+def listing_items(method):
+ marker = None
+ once = True
+ items = []
+
+ while once or items:
+ for i in items:
+ yield i
+
+ if once or marker:
+ if marker:
+ items = method(parms={'marker': marker})
+ else:
+ items = method()
+
+ if len(items) == 10000:
+ marker = items[-1]
+ else:
+ marker = None
+
+ once = False
+ else:
+ items = []
+
+
+class Connection(object):
+ def __init__(self, config):
+ for key in 'auth_host auth_port auth_ssl username password'.split():
+ if key not in config:
+ raise SkipTest
+
+ self.auth_host = config['auth_host']
+ self.auth_port = int(config['auth_port'])
+ self.auth_ssl = config['auth_ssl'] in ('on', 'true', 'yes', '1')
+ self.auth_prefix = config.get('auth_prefix', '/')
+ self.auth_version = str(config.get('auth_version', '1'))
+
+ self.account = config.get('account')
+ self.username = config['username']
+ self.password = config['password']
+
+ self.storage_host = None
+ self.storage_port = None
+
+ self.conn_class = None
+
+ def get_account(self):
+ return Account(self, self.account)
+
+ def authenticate(self, clone_conn=None):
+ if clone_conn:
+ self.conn_class = clone_conn.conn_class
+ self.storage_host = clone_conn.storage_host
+ self.storage_url = clone_conn.storage_url
+ self.storage_port = clone_conn.storage_port
+ self.storage_token = clone_conn.storage_token
+ return
+
+ if self.auth_version == "1":
+ auth_path = '%sv1.0' % (self.auth_prefix)
+ if self.account:
+ auth_user = '%s:%s' % (self.account, self.username)
+ else:
+ auth_user = self.username
+ else:
+ auth_user = self.username
+ auth_path = self.auth_prefix
+ auth_scheme = 'https://' if self.auth_ssl else 'http://'
+ auth_netloc = "%s:%d" % (self.auth_host, self.auth_port)
+ auth_url = auth_scheme + auth_netloc + auth_path
+
+ (storage_url, storage_token) = get_auth(auth_url,
+ auth_user, self.password,
+ snet=False,
+ tenant_name=self.account,
+ auth_version=self.auth_version,
+ os_options={})
+
+ if not (storage_url and storage_token):
+ raise AuthenticationFailed()
+
+ x = storage_url.split('/')
+
+ if x[0] == 'http:':
+ self.conn_class = httplib.HTTPConnection
+ self.storage_port = 80
+ elif x[0] == 'https:':
+ self.conn_class = httplib.HTTPSConnection
+ self.storage_port = 443
+ else:
+ raise ValueError('unexpected protocol %s' % (x[0]))
+
+ self.storage_host = x[2].split(':')[0]
+ if ':' in x[2]:
+ self.storage_port = int(x[2].split(':')[1])
+ self.storage_url = '/%s/%s' % (x[3], x[4])
+
+ self.storage_token = storage_token
+
+ self.http_connect()
+ return self.storage_url, self.storage_token
+
+ def http_connect(self):
+ self.connection = self.conn_class(self.storage_host,
+ port=self.storage_port)
+ #self.connection.set_debuglevel(3)
+
+ def make_path(self, path=[], cfg={}):
+ if cfg.get('version_only_path'):
+ return '/' + self.storage_url.split('/')[1]
+
+ if path:
+ quote = urllib.quote
+ if cfg.get('no_quote') or cfg.get('no_path_quote'):
+ quote = lambda x: x
+ return '%s/%s' % (self.storage_url,
+ '/'.join([quote(i) for i in path]))
+ else:
+ return self.storage_url
+
+ def make_headers(self, hdrs, cfg={}):
+ headers = {}
+
+ if not cfg.get('no_auth_token'):
+ headers['X-Auth-Token'] = self.storage_token
+
+ if isinstance(hdrs, dict):
+ headers.update(hdrs)
+ return headers
+
+ def make_request(self, method, path=[], data='', hdrs={}, parms={},
+ cfg={}):
+ path = self.make_path(path, cfg=cfg)
+ headers = self.make_headers(hdrs, cfg=cfg)
+ if isinstance(parms, dict) and parms:
+ quote = urllib.quote
+ if cfg.get('no_quote') or cfg.get('no_parms_quote'):
+ quote = lambda x: x
+ query_args = ['%s=%s' % (quote(x), quote(str(y)))
+ for (x, y) in parms.items()]
+ path = '%s?%s' % (path, '&'.join(query_args))
+ if not cfg.get('no_content_length'):
+ if cfg.get('set_content_length'):
+ headers['Content-Length'] = cfg.get('set_content_length')
+ else:
+ headers['Content-Length'] = len(data)
+
+ def try_request():
+ self.http_connect()
+ self.connection.request(method, path, data, headers)
+ return self.connection.getresponse()
+
+ self.response = None
+ try_count = 0
+ while try_count < 5:
+ try_count += 1
+
+ try:
+ self.response = try_request()
+ except httplib.HTTPException:
+ continue
+
+ if self.response.status == 401:
+ self.authenticate()
+ continue
+ elif self.response.status == 503:
+ if try_count != 5:
+ time.sleep(5)
+ continue
+
+ break
+
+ if self.response:
+ return self.response.status
+
+ raise RequestError('Unable to complete http request')
+
+ def put_start(self, path, hdrs={}, parms={}, cfg={}, chunked=False):
+ self.http_connect()
+
+ path = self.make_path(path, cfg)
+ headers = self.make_headers(hdrs, cfg=cfg)
+
+ if chunked:
+ headers['Transfer-Encoding'] = 'chunked'
+ headers.pop('Content-Length', None)
+
+ if isinstance(parms, dict) and parms:
+ quote = urllib.quote
+ if cfg.get('no_quote') or cfg.get('no_parms_quote'):
+ quote = lambda x: x
+ query_args = ['%s=%s' % (quote(x), quote(str(y)))
+ for (x, y) in parms.items()]
+ path = '%s?%s' % (path, '&'.join(query_args))
+
+ query_args = ['%s=%s' % (urllib.quote(x),
+ urllib.quote(str(y))) for (x, y) in parms.items()]
+ path = '%s?%s' % (path, '&'.join(query_args))
+
+ self.connection = self.conn_class(self.storage_host,
+ port=self.storage_port)
+ #self.connection.set_debuglevel(3)
+ self.connection.putrequest('PUT', path)
+ for key, value in headers.iteritems():
+ self.connection.putheader(key, value)
+ self.connection.endheaders()
+
+ def put_data(self, data, chunked=False):
+ if chunked:
+ self.connection.send('%x\r\n%s\r\n' % (len(data), data))
+ else:
+ self.connection.send(data)
+
+ def put_end(self, chunked=False):
+ if chunked:
+ self.connection.send('0\r\n\r\n')
+
+ self.response = self.connection.getresponse()
+ self.connection.close()
+ return self.response.status
+
+
+class Base:
+ def __str__(self):
+ return self.name
+
+ def header_fields(self, fields):
+ headers = dict(self.conn.response.getheaders())
+ ret = {}
+ for field in fields:
+ if field[1] not in headers:
+ raise ValueError("%s was not found in response header" %
+ (field[1]))
+
+ try:
+ ret[field[0]] = int(headers[field[1]])
+ except ValueError:
+ ret[field[0]] = headers[field[1]]
+ return ret
+
+
+class Account(Base):
+ def __init__(self, conn, name):
+ self.conn = conn
+ self.name = str(name)
+
+ def container(self, container_name):
+ return Container(self.conn, self.name, container_name)
+
+ def containers(self, hdrs={}, parms={}, cfg={}):
+ format = parms.get('format', None)
+ if format not in [None, 'json', 'xml']:
+ raise RequestError('Invalid format: %s' % format)
+ if format is None and 'format' in parms:
+ del parms['format']
+
+ status = self.conn.make_request('GET', self.path, hdrs=hdrs,
+ parms=parms, cfg=cfg)
+ if status == 200:
+ if format == 'json':
+ conts = json.loads(self.conn.response.read())
+ for cont in conts:
+ cont['name'] = cont['name'].encode('utf-8')
+ return conts
+ elif format == 'xml':
+ conts = []
+ tree = minidom.parseString(self.conn.response.read())
+ for x in tree.getElementsByTagName('container'):
+ cont = {}
+ for key in ['name', 'count', 'bytes']:
+ cont[key] = x.getElementsByTagName(key)[0].\
+ childNodes[0].nodeValue
+ conts.append(cont)
+ for cont in conts:
+ cont['name'] = cont['name'].encode('utf-8')
+ return conts
+ else:
+ lines = self.conn.response.read().split('\n')
+ if lines and not lines[-1]:
+ lines = lines[:-1]
+ return lines
+ elif status == 204:
+ return []
+
+ raise ResponseError(self.conn.response)
+
+ def delete_containers(self):
+ for c in listing_items(self.containers):
+ cont = self.container(c)
+ if not cont.delete_recursive():
+ return False
+
+ return listing_empty(self.containers)
+
+ def info(self, hdrs={}, parms={}, cfg={}):
+ if self.conn.make_request('HEAD', self.path, hdrs=hdrs,
+ parms=parms, cfg=cfg) != 204:
+
+ raise ResponseError(self.conn.response)
+
+ fields = [['object_count', 'x-account-object-count'],
+ ['container_count', 'x-account-container-count'],
+ ['bytes_used', 'x-account-bytes-used']]
+
+ return self.header_fields(fields)
+
+ @property
+ def path(self):
+ return []
+
+
+class Container(Base):
+ def __init__(self, conn, account, name):
+ self.conn = conn
+ self.account = str(account)
+ self.name = str(name)
+
+ def create(self, hdrs={}, parms={}, cfg={}):
+ return self.conn.make_request('PUT', self.path, hdrs=hdrs,
+ parms=parms, cfg=cfg) in (201, 202)
+
+ def delete(self, hdrs={}, parms={}):
+ return self.conn.make_request('DELETE', self.path, hdrs=hdrs,
+ parms=parms) == 204
+
+ def delete_files(self):
+ for f in listing_items(self.files):
+ file = self.file(f)
+ if not file.delete():
+ return False
+
+ return listing_empty(self.files)
+
+ def delete_recursive(self):
+ return self.delete_files() and self.delete()
+
+ def file(self, file_name):
+ return File(self.conn, self.account, self.name, file_name)
+
+ def files(self, hdrs={}, parms={}, cfg={}):
+ format = parms.get('format', None)
+ if format not in [None, 'json', 'xml']:
+ raise RequestError('Invalid format: %s' % format)
+ if format is None and 'format' in parms:
+ del parms['format']
+
+ status = self.conn.make_request('GET', self.path, hdrs=hdrs,
+ parms=parms, cfg=cfg)
+ if status == 200:
+ if format == 'json':
+ files = json.loads(self.conn.response.read())
+
+ for file in files:
+ file['name'] = file['name'].encode('utf-8')
+ file['content_type'] = file['content_type'].encode('utf-8')
+ return files
+ elif format == 'xml':
+ files = []
+ tree = minidom.parseString(self.conn.response.read())
+ for x in tree.getElementsByTagName('object'):
+ file = {}
+ for key in ['name', 'hash', 'bytes', 'content_type',
+ 'last_modified']:
+
+ file[key] = x.getElementsByTagName(key)[0].\
+ childNodes[0].nodeValue
+ files.append(file)
+
+ for file in files:
+ file['name'] = file['name'].encode('utf-8')
+ file['content_type'] = file['content_type'].encode('utf-8')
+ return files
+ else:
+ content = self.conn.response.read()
+ if content:
+ lines = content.split('\n')
+ if lines and not lines[-1]:
+ lines = lines[:-1]
+ return lines
+ else:
+ return []
+ elif status == 204:
+ return []
+
+ raise ResponseError(self.conn.response)
+
+ def info(self, hdrs={}, parms={}, cfg={}):
+ status = self.conn.make_request('HEAD', self.path, hdrs=hdrs,
+ parms=parms, cfg=cfg)
+
+ if self.conn.response.status == 204:
+ fields = [['bytes_used', 'x-container-bytes-used'],
+ ['object_count', 'x-container-object-count']]
+
+ return self.header_fields(fields)
+
+ raise ResponseError(self.conn.response)
+
+ @property
+ def path(self):
+ return [self.name]
+
+
+class File(Base):
+ def __init__(self, conn, account, container, name):
+ self.conn = conn
+ self.account = str(account)
+ self.container = str(container)
+ self.name = str(name)
+
+ self.chunked_write_in_progress = False
+ self.content_type = None
+ self.size = None
+ self.metadata = {}
+
+ def make_headers(self, cfg={}):
+ headers = {}
+ if not cfg.get('no_content_length'):
+ if cfg.get('set_content_length'):
+ headers['Content-Length'] = cfg.get('set_content_length')
+ elif self.size:
+ headers['Content-Length'] = self.size
+ else:
+ headers['Content-Length'] = 0
+
+ if cfg.get('no_content_type'):
+ pass
+ elif self.content_type:
+ headers['Content-Type'] = self.content_type
+ else:
+ headers['Content-Type'] = 'application/octet-stream'
+
+ for key in self.metadata:
+ headers['X-Object-Meta-' + key] = self.metadata[key]
+
+ return headers
+
+ @classmethod
+ def compute_md5sum(cls, data):
+ block_size = 4096
+
+ if isinstance(data, str):
+ data = StringIO.StringIO(data)
+
+ checksum = hashlib.md5()
+ buff = data.read(block_size)
+ while buff:
+ checksum.update(buff)
+ buff = data.read(block_size)
+ data.seek(0)
+ return checksum.hexdigest()
+
+ def copy(self, dest_cont, dest_file, hdrs={}, parms={}, cfg={}):
+ if 'destination' in cfg:
+ headers = {'Destination': cfg['destination']}
+ elif cfg.get('no_destination'):
+ headers = {}
+ else:
+ headers = {'Destination': '%s/%s' % (dest_cont, dest_file)}
+ headers.update(hdrs)
+
+ if 'Destination' in headers:
+ headers['Destination'] = urllib.quote(headers['Destination'])
+
+ return self.conn.make_request('COPY', self.path, hdrs=headers,
+ parms=parms) == 201
+
+ def delete(self, hdrs={}, parms={}):
+ if self.conn.make_request('DELETE', self.path, hdrs=hdrs,
+ parms=parms) != 204:
+
+ raise ResponseError(self.conn.response)
+
+ return True
+
+ def info(self, hdrs={}, parms={}, cfg={}):
+ if self.conn.make_request('HEAD', self.path, hdrs=hdrs,
+ parms=parms, cfg=cfg) != 200:
+
+ raise ResponseError(self.conn.response)
+
+ fields = [['content_length', 'content-length'],
+ ['content_type', 'content-type'],
+ ['last_modified', 'last-modified'],
+ ['etag', 'etag']]
+
+ header_fields = self.header_fields(fields)
+ header_fields['etag'] = header_fields['etag'].strip('"')
+ return header_fields
+
+ def initialize(self, hdrs={}, parms={}):
+ if not self.name:
+ return False
+
+ status = self.conn.make_request('HEAD', self.path, hdrs=hdrs,
+ parms=parms)
+ if status == 404:
+ return False
+ elif (status < 200) or (status > 299):
+ raise ResponseError(self.conn.response)
+
+ for hdr in self.conn.response.getheaders():
+ if hdr[0].lower() == 'content-type':
+ self.content_type = hdr[1]
+ if hdr[0].lower().startswith('x-object-meta-'):
+ self.metadata[hdr[0][14:]] = hdr[1]
+ if hdr[0].lower() == 'etag':
+ self.etag = hdr[1].strip('"')
+ if hdr[0].lower() == 'content-length':
+ self.size = int(hdr[1])
+ if hdr[0].lower() == 'last-modified':
+ self.last_modified = hdr[1]
+
+ return True
+
+ def load_from_filename(self, filename, callback=None):
+ fobj = open(filename, 'rb')
+ self.write(fobj, callback=callback)
+ fobj.close()
+
+ @property
+ def path(self):
+ return [self.container, self.name]
+
+ @classmethod
+ def random_data(cls, size=None):
+ if size is None:
+ size = random.randint(1, 32768)
+ fd = open('/dev/urandom', 'r')
+ data = fd.read(size)
+ fd.close()
+ return data
+
+ def read(self, size=-1, offset=0, hdrs=None, buffer=None,
+ callback=None, cfg={}):
+
+ if size > 0:
+ range = 'bytes=%d-%d' % (offset, (offset + size) - 1)
+ if hdrs:
+ hdrs['Range'] = range
+ else:
+ hdrs = {'Range': range}
+
+ status = self.conn.make_request('GET', self.path, hdrs=hdrs,
+ cfg=cfg)
+
+ if(status < 200) or (status > 299):
+ raise ResponseError(self.conn.response)
+
+ for hdr in self.conn.response.getheaders():
+ if hdr[0].lower() == 'content-type':
+ self.content_type = hdr[1]
+
+ if hasattr(buffer, 'write'):
+ scratch = self.conn.response.read(8192)
+ transferred = 0
+
+ while len(scratch) > 0:
+ buffer.write(scratch)
+ transferred += len(scratch)
+ if callable(callback):
+ callback(transferred, self.size)
+ scratch = self.conn.response.read(8192)
+ return None
+ else:
+ return self.conn.response.read()
+
+ def read_md5(self):
+ status = self.conn.make_request('GET', self.path)
+
+ if(status < 200) or (status > 299):
+ raise ResponseError(self.conn.response)
+
+ checksum = hashlib.md5()
+
+ scratch = self.conn.response.read(8192)
+ while len(scratch) > 0:
+ checksum.update(scratch)
+ scratch = self.conn.response.read(8192)
+
+ return checksum.hexdigest()
+
+ def save_to_filename(self, filename, callback=None):
+ try:
+ fobj = open(filename, 'wb')
+ self.read(buffer=fobj, callback=callback)
+ finally:
+ fobj.close()
+
+ def sync_metadata(self, metadata={}, cfg={}):
+ self.metadata.update(metadata)
+
+ if self.metadata:
+ headers = self.make_headers(cfg=cfg)
+ if not cfg.get('no_content_length'):
+ if cfg.get('set_content_length'):
+ headers['Content-Length'] = \
+ cfg.get('set_content_length')
+ else:
+ headers['Content-Length'] = 0
+
+ self.conn.make_request('POST', self.path, hdrs=headers, cfg=cfg)
+
+ if self.conn.response.status not in (201, 202):
+ raise ResponseError(self.conn.response)
+
+ return True
+
+ def chunked_write(self, data=None, hdrs={}, parms={}, cfg={}):
+ if data is not None and self.chunked_write_in_progress:
+ self.conn.put_data(data, True)
+ elif data is not None:
+ self.chunked_write_in_progress = True
+
+ headers = self.make_headers(cfg=cfg)
+ headers.update(hdrs)
+
+ self.conn.put_start(self.path, hdrs=headers, parms=parms,
+ cfg=cfg, chunked=True)
+
+ self.conn.put_data(data, True)
+ elif self.chunked_write_in_progress:
+ self.chunked_write_in_progress = False
+ return self.conn.put_end(True) == 201
+ else:
+ raise RuntimeError
+
+ def write(self, data='', hdrs={}, parms={}, callback=None, cfg={}):
+ block_size = 2 ** 20
+
+ if isinstance(data, file):
+ try:
+ data.flush()
+ data.seek(0)
+ except IOError:
+ pass
+ self.size = int(os.fstat(data.fileno())[6])
+ else:
+ data = StringIO.StringIO(data)
+ self.size = data.len
+
+ headers = self.make_headers(cfg=cfg)
+ headers.update(hdrs)
+
+ self.conn.put_start(self.path, hdrs=headers, parms=parms, cfg=cfg)
+
+ transferred = 0
+ buff = data.read(block_size)
+ try:
+ while len(buff) > 0:
+ self.conn.put_data(buff)
+ buff = data.read(block_size)
+ transferred += len(buff)
+ if callable(callback):
+ callback(transferred, self.size)
+
+ self.conn.put_end()
+ except socket.timeout, err:
+ raise err
+
+ if (self.conn.response.status < 200) or \
+ (self.conn.response.status > 299):
+ raise ResponseError(self.conn.response)
+
+ self.md5 = self.compute_md5sum(data)
+
+ return True
+
+ def write_random(self, size=None, hdrs={}, parms={}, cfg={}):
+ data = self.random_data(size)
+ if not self.write(data, hdrs=hdrs, parms=parms, cfg=cfg):
+ raise ResponseError(self.conn.response)
+ self.md5 = self.compute_md5sum(StringIO.StringIO(data))
+ return data