From c86bf48f72686a51c2ed8963a678c8fce1c5cbf3 Mon Sep 17 00:00:00 2001 From: Thiago da Silva Date: Mon, 14 Oct 2013 10:35:50 -0400 Subject: fixing pep8 tests for new gswauth middleware Formatted original swauth code to pass pep8 tests. Change-Id: I7c63a102ece44e8331137d1d5576a58588fe53e2 Signed-off-by: Thiago da Silva Reviewed-on: http://review.gluster.org/6087 Reviewed-by: Luis Pabon Tested-by: Luis Pabon --- gluster/swift/common/middleware/gswauth/setup.py | 10 +- .../common/middleware/gswauth/swauth/middleware.py | 143 +- .../middleware/gswauth/test_swauth/__init__.py | 4 - .../gswauth/test_swauth/unit/test_authtypes.py | 1 - .../gswauth/test_swauth/unit/test_middleware.py | 3279 +++++++++++++------- tox.ini | 2 +- 6 files changed, 2169 insertions(+), 1270 deletions(-) diff --git a/gluster/swift/common/middleware/gswauth/setup.py b/gluster/swift/common/middleware/gswauth/setup.py index ceef1ca..f85718c 100644 --- a/gluster/swift/common/middleware/gswauth/setup.py +++ b/gluster/swift/common/middleware/gswauth/setup.py @@ -73,17 +73,17 @@ setup( 'Operating System :: POSIX :: Linux', 'Programming Language :: Python :: 2.6', 'Environment :: No Input/Output (Daemon)', - ], + ], install_requires=[], # removed for better compat scripts=[ 'bin/swauth-add-account', 'bin/swauth-add-user', 'bin/swauth-cleanup-tokens', 'bin/swauth-delete-account', 'bin/swauth-delete-user', 'bin/swauth-list', 'bin/swauth-prep', 'bin/swauth-set-account-service', - ], + ], entry_points={ 'paste.filter_factory': [ 'swauth=swauth.middleware:filter_factory', - ], - }, - ) + ], + }, +) diff --git a/gluster/swift/common/middleware/gswauth/swauth/middleware.py b/gluster/swift/common/middleware/gswauth/swauth/middleware.py index 495bea8..1dffec5 100644 --- a/gluster/swift/common/middleware/gswauth/swauth/middleware.py +++ b/gluster/swift/common/middleware/gswauth/swauth/middleware.py @@ -22,7 +22,7 @@ from time import gmtime, strftime, time from traceback import format_exc from urllib import quote, unquote from uuid import uuid4 -from hashlib import md5, sha1 +from hashlib import sha1 import hmac import base64 @@ -30,7 +30,7 @@ from eventlet.timeout import Timeout from eventlet import TimeoutError from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPConflict, \ HTTPCreated, HTTPForbidden, HTTPMethodNotAllowed, HTTPMovedPermanently, \ - HTTPNoContent, HTTPNotFound, HTTPServiceUnavailable, HTTPUnauthorized, \ + HTTPNoContent, HTTPNotFound, HTTPUnauthorized, \ Request, Response from swift.common.bufferedhttp import http_connect_raw as http_connect @@ -82,8 +82,9 @@ class Swauth(object): raise ValueError(msg) self.swauth_remote_parsed = urlparse(self.swauth_remote) if self.swauth_remote_parsed.scheme not in ('http', 'https'): - msg = _('Cannot handle protocol scheme %s for url %s!') % \ - (self.swauth_remote_parsed.scheme, repr(self.swauth_remote)) + msg = _('Cannot handle protocol scheme %s for url %s!') % ( + self.swauth_remote_parsed.scheme, + repr(self.swauth_remote)) try: self.logger.critical(msg) except Exception: @@ -91,7 +92,8 @@ class Swauth(object): raise ValueError(msg) self.swauth_remote_timeout = int(conf.get('swauth_remote_timeout', 10)) self.auth_account = '%s.auth' % self.reseller_prefix - self.default_swift_cluster = conf.get('default_swift_cluster', + self.default_swift_cluster = conf.get( + 'default_swift_cluster', 'local#http://127.0.0.1:8080/v1') # This setting is a little messy because of the options it has to # provide. The basic format is cluster_name#url, such as the default @@ -130,7 +132,8 @@ class Swauth(object): self.timeout = int(conf.get('node_timeout', 10)) self.itoken = None self.itoken_expires = None - self.allowed_sync_hosts = [h.strip() + self.allowed_sync_hosts = [ + h.strip() for h in conf.get('allowed_sync_hosts', '127.0.0.1').split(',') if h.strip()] # Get an instance of our auth_type encoder for saving and checking the @@ -138,8 +141,9 @@ class Swauth(object): self.auth_type = conf.get('auth_type', 'Plaintext').title() self.auth_encoder = getattr(swauth.authtypes, self.auth_type, None) if self.auth_encoder is None: - raise Exception('Invalid auth_type in config file: %s' - % self.auth_type) + raise Exception( + 'Invalid auth_type in config file: %s' + % self.auth_type) self.auth_encoder.salt = conf.get('auth_type_salt', 'swauthsalt') self.allow_overrides = \ conf.get('allow_overrides', 't').lower() in TRUE_VALUES @@ -200,8 +204,8 @@ class Swauth(object): s3 = env.get('HTTP_AUTHORIZATION') token = env.get('HTTP_X_AUTH_TOKEN', env.get('HTTP_X_STORAGE_TOKEN')) if token and len(token) > swauth.authtypes.MAX_TOKEN_LENGTH: - return HTTPBadRequest(body='Token exceeds maximum length.')(env, - start_response) + return HTTPBadRequest(body='Token exceeds maximum length.')( + env, start_response) if s3 or (token and token.startswith(self.reseller_prefix)): # Note: Empty reseller_prefix will match all tokens. groups = self.get_groups(env, token) @@ -285,7 +289,7 @@ class Swauth(object): try: account = env['HTTP_AUTHORIZATION'].split(' ')[1] account, user, sign = account.split(':') - except Exception, err: + except Exception: self.logger.debug( 'Swauth cannot parse Authorization header value %r' % env['HTTP_AUTHORIZATION']) @@ -327,7 +331,8 @@ class Swauth(object): if not groups: if self.swauth_remote: with Timeout(self.swauth_remote_timeout): - conn = http_connect(self.swauth_remote_parsed.hostname, + conn = http_connect( + self.swauth_remote_parsed.hostname, self.swauth_remote_parsed.port, 'GET', '%s/v2/.token/%s' % (self.swauth_remote_parsed.path, quote(token)), @@ -476,8 +481,8 @@ class Swauth(object): req.start_time = time() handler = None try: - version, account, user, _junk = split_path(req.path_info, - minsegs=0, maxsegs=4, rest_with_last=True) + version, account, user, _junk = split_path( + req.path_info, minsegs=0, maxsegs=4, rest_with_last=True) except ValueError: return HTTPNotFound(request=req) if version in ('v1', 'v1.0', 'auth'): @@ -638,8 +643,8 @@ class Swauth(object): listing = [] marker = '' while True: - path = '/v1/%s?format=json&marker=%s' % (quote('%s/%s' % - (self.auth_account, account)), quote(marker)) + path = '/v1/%s?format=json&marker=%s' % (quote('%s/%s' % ( + self.auth_account, account)), quote(marker)) resp = self.make_pre_authed_request( req.environ, 'GET', path).get_response(self.app) if resp.status_int == 404: @@ -655,8 +660,9 @@ class Swauth(object): if obj['name'][0] != '.': listing.append({'name': obj['name']}) marker = sublisting[-1]['name'].encode('utf-8') - return Response(body=json.dumps({'account_id': account_id, - 'services': services, 'users': listing})) + return Response(body=json.dumps( + {'account_id': account_id, + 'services': services, 'users': listing})) def handle_set_services(self, req): """ @@ -755,14 +761,16 @@ class Swauth(object): resp = self.make_pre_authed_request( req.environ, 'PUT', path).get_response(self.app) if resp.status_int // 100 != 2: - raise Exception('Could not create account within main auth ' + raise Exception( + 'Could not create account within main auth ' 'account: %s %s' % (path, resp.status)) elif resp.status_int // 100 == 2: if 'x-container-meta-account-id' in resp.headers: # Account was already created return HTTPAccepted(request=req) else: - raise Exception('Could not verify account within main auth ' + raise Exception( + 'Could not verify account within main auth ' 'account: %s %s' % (path, resp.status)) account_suffix = req.headers.get('x-account-suffix') if not account_suffix: @@ -772,17 +780,20 @@ class Swauth(object): self.reseller_prefix, account_suffix)) try: conn = self.get_conn() - conn.request('PUT', path, - headers={'X-Auth-Token': self.get_itoken(req.environ), - 'Content-Length': '0'}) + conn.request( + 'PUT', path, + headers={'X-Auth-Token': self.get_itoken(req.environ), + 'Content-Length': '0'}) resp = conn.getresponse() resp.read() if resp.status // 100 != 2: - raise Exception('Could not create account on the Swift ' + raise Exception( + 'Could not create account on the Swift ' 'cluster: %s %s %s' % (path, resp.status, resp.reason)) except (Exception, TimeoutError): - self.logger.error(_('ERROR: Exception while trying to communicate ' - 'with %(scheme)s://%(host)s:%(port)s/%(path)s'), + self.logger.error( + _('ERROR: Exception while trying to communicate ' + 'with %(scheme)s://%(host)s:%(port)s/%(path)s'), {'scheme': self.dsc_parsed2.scheme, 'host': self.dsc_parsed2.hostname, 'port': self.dsc_parsed2.port, 'path': path}) @@ -798,7 +809,8 @@ class Swauth(object): # Record the cluster url(s) for the account path = quote('/v1/%s/%s/.services' % (self.auth_account, account)) services = {'storage': {}} - services['storage'][self.dsc_name] = '%s/%s%s' % (self.dsc_url, + services['storage'][self.dsc_name] = '%s/%s%s' % ( + self.dsc_url, self.reseller_prefix, account_suffix) services['storage']['default'] = self.dsc_name resp = self.make_pre_authed_request( @@ -834,8 +846,9 @@ class Swauth(object): # Make sure the account has no users and get the account_id marker = '' while True: - path = '/v1/%s?format=json&marker=%s' % (quote('%s/%s' % - (self.auth_account, account)), quote(marker)) + path = '/v1/%s?format=json&marker=%s' % ( + quote('%s/%s' % ( + self.auth_account, account)), quote(marker)) resp = self.make_pre_authed_request( req.environ, 'GET', path).get_response(self.app) if resp.status_int == 404: @@ -866,19 +879,22 @@ class Swauth(object): if name != 'default': parsed = urlparse(url) conn = self.get_conn(parsed) - conn.request('DELETE', parsed.path, + conn.request( + 'DELETE', parsed.path, headers={'X-Auth-Token': self.get_itoken(req.environ)}) resp = conn.getresponse() resp.read() if resp.status == 409: if deleted_any: - raise Exception('Managed to delete one or more ' + raise Exception( + 'Managed to delete one or more ' 'service end points, but failed with: ' '%s %s %s' % (url, resp.status, resp.reason)) else: return HTTPConflict(request=req) if resp.status // 100 != 2 and resp.status != 404: - raise Exception('Could not delete account on the ' + raise Exception( + 'Could not delete account on the ' 'Swift cluster: %s %s %s' % (url, resp.status, resp.reason)) deleted_any = True @@ -962,8 +978,10 @@ class Swauth(object): groups = set() marker = '' while True: - path = '/v1/%s?format=json&marker=%s' % (quote('%s/%s' % - (self.auth_account, account)), quote(marker)) + path = '/v1/%s?format=json&marker=%s' % ( + quote( + '%s/%s' % + (self.auth_account, account)), quote(marker)) resp = self.make_pre_authed_request( req.environ, 'GET', path).get_response(self.app) if resp.status_int == 404: @@ -983,11 +1001,11 @@ class Swauth(object): if resp.status_int // 100 != 2: raise Exception('Could not retrieve user object: ' '%s %s' % (path, resp.status)) - groups.update(g['name'] - for g in json.loads(resp.body)['groups']) + groups.update( + g['name'] for g in json.loads(resp.body)['groups']) marker = sublisting[-1]['name'].encode('utf-8') - body = json.dumps({'groups': - [{'name': g} for g in sorted(groups)]}) + body = json.dumps( + {'groups': [{'name': g} for g in sorted(groups)]}) else: path = quote('/v1/%s/%s/%s' % (self.auth_account, account, user)) resp = self.make_pre_authed_request( @@ -1002,8 +1020,8 @@ class Swauth(object): if ('.admin' in display_groups and not self.is_reseller_admin(req)) or \ ('.reseller_admin' in display_groups and - not self.is_super_admin(req)): - return HTTPForbidden(request=req) + not self.is_super_admin(req)): + return HTTPForbidden(request=req) return Response(body=body) def handle_put_user(self, req): @@ -1047,7 +1065,7 @@ class Swauth(object): raise Exception('Could not retrieve account id value: %s %s' % (path, resp.status)) headers = {'X-Object-Meta-Account-Id': - resp.headers['x-container-meta-account-id']} + resp.headers['x-container-meta-account-id']} # Create the object in the main auth account (this object represents # the user) path = quote('/v1/%s/%s/%s' % (self.auth_account, account, user)) @@ -1098,7 +1116,8 @@ class Swauth(object): (path, resp.status)) candidate_token = resp.headers.get('x-object-meta-auth-token') if candidate_token: - path = quote('/v1/%s/.token_%s/%s' % + path = quote( + '/v1/%s/.token_%s/%s' % (self.auth_account, candidate_token[-1], candidate_token)) resp = self.make_pre_authed_request( req.environ, 'DELETE', path).get_response(self.app) @@ -1204,10 +1223,13 @@ class Swauth(object): key == self.super_admin_key: token = self.get_itoken(req.environ) url = '%s/%s.auth' % (self.dsc_url, self.reseller_prefix) - return Response(request=req, - body=json.dumps({'storage': {'default': 'local', 'local': url}}), - headers={'x-auth-token': token, 'x-storage-token': token, - 'x-storage-url': url}) + return Response( + request=req, + body=json.dumps( + {'storage': {'default': 'local', + 'local': url}}), + headers={'x-auth-token': token, 'x-storage-token': token, + 'x-storage-url': url}) # Authenticate user path = quote('/v1/%s/%s/%s' % (self.auth_account, account, user)) resp = self.make_pre_authed_request( @@ -1225,7 +1247,8 @@ class Swauth(object): expires = None candidate_token = resp.headers.get('x-object-meta-auth-token') if candidate_token: - path = quote('/v1/%s/.token_%s/%s' % + path = quote( + '/v1/%s/.token_%s/%s' % (self.auth_account, candidate_token[-1], candidate_token)) delete_token = False try: @@ -1276,10 +1299,11 @@ class Swauth(object): expires = int(time() + token_life) resp = self.make_pre_authed_request( req.environ, 'PUT', path, - json.dumps({'account': account, 'user': user, - 'account_id': account_id, - 'groups': user_detail['groups'], - 'expires': expires})).get_response(self.app) + json.dumps( + {'account': account, 'user': user, + 'account_id': account_id, + 'groups': user_detail['groups'], + 'expires': expires})).get_response(self.app) if resp.status_int // 100 != 2: raise Exception('Could not create new token: %s %s' % (path, resp.status)) @@ -1301,7 +1325,8 @@ class Swauth(object): (path, resp.status)) detail = json.loads(resp.body) url = detail['storage'][detail['storage']['default']] - return Response(request=req, body=resp.body, + return Response( + request=req, body=resp.body, headers={'x-auth-token': token, 'x-storage-token': token, 'x-auth-token-expires': str(int(expires - time())), 'x-storage-url': url}) @@ -1444,7 +1469,7 @@ class Swauth(object): :returns: True if the key is valid for the user, False if not. """ return user_detail and self.auth_encoder().match( - key, user_detail.get('auth')) + key, user_detail.get('auth')) def is_super_admin(self, req): """ @@ -1455,8 +1480,8 @@ class Swauth(object): :param returns: True if .super_admin. """ return req.headers.get('x-auth-admin-user') == '.super_admin' and \ - self.super_admin_key and \ - req.headers.get('x-auth-admin-key') == self.super_admin_key + self.super_admin_key and \ + req.headers.get('x-auth-admin-key') == self.super_admin_key def is_reseller_admin(self, req, admin_detail=None): """ @@ -1497,7 +1522,7 @@ class Swauth(object): req.headers.get('x-auth-admin-key')): return False return admin_detail and admin_detail['account'] == account and \ - '.admin' in (g['name'] for g in admin_detail['groups']) + '.admin' in (g['name'] for g in admin_detail['groups']) return False def posthooklogger(self, env, req): @@ -1523,11 +1548,13 @@ class Swauth(object): if getattr(req, 'client_disconnect', False) or \ getattr(response, 'client_disconnect', False): status_int = 499 - self.logger.info(' '.join(quote(str(x)) for x in (client or '-', + self.logger.info( + ' '.join(quote(str(x)) for x in (client or '-', req.remote_addr or '-', strftime('%d/%b/%Y/%H/%M/%S', gmtime()), req.method, the_request, req.environ['SERVER_PROTOCOL'], status_int, req.referer or '-', req.user_agent or '-', - req.headers.get('x-auth-token', + req.headers.get( + 'x-auth-token', req.headers.get('x-auth-admin-user', '-')), getattr(req, 'bytes_transferred', 0) or '-', getattr(response, 'bytes_transferred', 0) or '-', diff --git a/gluster/swift/common/middleware/gswauth/test_swauth/__init__.py b/gluster/swift/common/middleware/gswauth/test_swauth/__init__.py index 0ee3666..f53bc5a 100644 --- a/gluster/swift/common/middleware/gswauth/test_swauth/__init__.py +++ b/gluster/swift/common/middleware/gswauth/test_swauth/__init__.py @@ -2,9 +2,5 @@ # The code below enables nosetests to work with i18n _() blocks import __builtin__ -import sys -import os -from ConfigParser import MissingSectionHeaderError -from StringIO import StringIO setattr(__builtin__, '_', lambda x: x) diff --git a/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_authtypes.py b/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_authtypes.py index eda1de4..d9b7b55 100644 --- a/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_authtypes.py +++ b/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_authtypes.py @@ -14,7 +14,6 @@ # Pablo Llopis 2011 import unittest -from contextlib import contextmanager from swauth import authtypes diff --git a/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_middleware.py b/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_middleware.py index 9029f89..62259ff 100644 --- a/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_middleware.py +++ b/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_middleware.py @@ -61,11 +61,13 @@ class FakeMemcache(object): class FakeApp(object): - def __init__(self, status_headers_body_iter=None, acl=None, sync_key=None): + def __init__( + self, status_headers_body_iter=None, acl=None, sync_key=None): self.calls = 0 self.status_headers_body_iter = status_headers_body_iter if not self.status_headers_body_iter: - self.status_headers_body_iter = iter([('404 Not Found', {}, '')]) + self.status_headers_body_iter = iter( + [('404 Not Found', {}, '')]) self.acl = acl self.sync_key = sync_key @@ -75,12 +77,14 @@ class FakeApp(object): if self.acl: self.request.acl = self.acl if self.sync_key: - self.request.environ['swift_sync_key'] = self.sync_key + self.request.environ[ + 'swift_sync_key'] = self.sync_key if 'swift.authorize' in env: resp = env['swift.authorize'](self.request) if resp: return resp(env, start_response) - status, headers, body = self.status_headers_body_iter.next() + status, headers, body = self.status_headers_body_iter.next( + ) return Response(status=status, headers=headers, body=body)(env, start_response) @@ -91,7 +95,8 @@ class FakeConn(object): self.calls = 0 self.status_headers_body_iter = status_headers_body_iter if not self.status_headers_body_iter: - self.status_headers_body_iter = iter([('404 Not Found', {}, '')]) + self.status_headers_body_iter = iter( + [('404 Not Found', {}, '')]) def request(self, method, path, headers): self.calls += 1 @@ -124,33 +129,42 @@ class TestAuth(unittest.TestCase): def test_reseller_prefix_init(self): app = FakeApp() - ath = auth.filter_factory({'super_admin_key': 'supertest'})(app) + ath = auth.filter_factory( + {'super_admin_key': 'supertest'})(app) self.assertEquals(ath.reseller_prefix, 'AUTH_') - ath = auth.filter_factory({'super_admin_key': 'supertest', - 'reseller_prefix': 'TEST'})(app) + ath = auth.filter_factory( + {'super_admin_key': 'supertest', + 'reseller_prefix': 'TEST'})(app) self.assertEquals(ath.reseller_prefix, 'TEST_') - ath = auth.filter_factory({'super_admin_key': 'supertest', - 'reseller_prefix': 'TEST_'})(app) + ath = auth.filter_factory( + {'super_admin_key': 'supertest', + 'reseller_prefix': 'TEST_'})(app) self.assertEquals(ath.reseller_prefix, 'TEST_') def test_auth_prefix_init(self): app = FakeApp() - ath = auth.filter_factory({'super_admin_key': 'supertest'})(app) + ath = auth.filter_factory( + {'super_admin_key': 'supertest'})(app) self.assertEquals(ath.auth_prefix, '/auth/') - ath = auth.filter_factory({'super_admin_key': 'supertest', - 'auth_prefix': ''})(app) + ath = auth.filter_factory( + {'super_admin_key': 'supertest', + 'auth_prefix': ''})(app) self.assertEquals(ath.auth_prefix, '/auth/') - ath = auth.filter_factory({'super_admin_key': 'supertest', - 'auth_prefix': '/test/'})(app) + ath = auth.filter_factory( + {'super_admin_key': 'supertest', + 'auth_prefix': '/test/'})(app) self.assertEquals(ath.auth_prefix, '/test/') - ath = auth.filter_factory({'super_admin_key': 'supertest', - 'auth_prefix': '/test'})(app) + ath = auth.filter_factory( + {'super_admin_key': 'supertest', + 'auth_prefix': '/test'})(app) self.assertEquals(ath.auth_prefix, '/test/') - ath = auth.filter_factory({'super_admin_key': 'supertest', - 'auth_prefix': 'test/'})(app) + ath = auth.filter_factory( + {'super_admin_key': 'supertest', + 'auth_prefix': 'test/'})(app) self.assertEquals(ath.auth_prefix, '/test/') - ath = auth.filter_factory({'super_admin_key': 'supertest', - 'auth_prefix': 'test'})(app) + ath = auth.filter_factory( + {'super_admin_key': 'supertest', + 'auth_prefix': 'test'})(app) self.assertEquals(ath.auth_prefix, '/test/') def test_no_auth_type_init(self): @@ -160,16 +174,19 @@ class TestAuth(unittest.TestCase): def test_valid_auth_type_init(self): app = FakeApp() - ath = auth.filter_factory({'auth_type': 'sha1'})(app) + ath = auth.filter_factory( + {'auth_type': 'sha1'})(app) self.assertEquals(ath.auth_type, 'Sha1') - ath = auth.filter_factory({'auth_type': 'plaintext'})(app) + ath = auth.filter_factory( + {'auth_type': 'plaintext'})(app) self.assertEquals(ath.auth_type, 'Plaintext') def test_invalid_auth_type_init(self): app = FakeApp() exc = None try: - auth.filter_factory({'auth_type': 'NONEXISTANT'})(app) + auth.filter_factory( + {'auth_type': 'NONEXISTANT'})(app) except Exception as err: exc = err self.assertEquals(str(exc), @@ -181,46 +198,58 @@ class TestAuth(unittest.TestCase): self.assertRaises(Exception, auth.filter_factory({ 'super_admin_key': 'supertest', 'default_swift_cluster': 'local#badscheme://host/path'}), app) - ath = auth.filter_factory({'super_admin_key': 'supertest'})(app) + ath = auth.filter_factory( + {'super_admin_key': 'supertest'})(app) self.assertEquals(ath.default_swift_cluster, 'local#http://127.0.0.1:8080/v1') - ath = auth.filter_factory({'super_admin_key': 'supertest', + ath = auth.filter_factory({ + 'super_admin_key': 'supertest', 'default_swift_cluster': 'local#http://host/path'})(app) self.assertEquals(ath.default_swift_cluster, 'local#http://host/path') - ath = auth.filter_factory({'super_admin_key': 'supertest', + ath = auth.filter_factory({ + 'super_admin_key': 'supertest', 'default_swift_cluster': 'local#https://host/path/'})(app) self.assertEquals(ath.dsc_url, 'https://host/path') self.assertEquals(ath.dsc_url2, 'https://host/path') - ath = auth.filter_factory({'super_admin_key': 'supertest', + ath = auth.filter_factory({ + 'super_admin_key': 'supertest', 'default_swift_cluster': - 'local#https://host/path/#http://host2/path2/'})(app) + 'local#https://host/path/#http://host2/path2/'})(app) self.assertEquals(ath.dsc_url, 'https://host/path') - self.assertEquals(ath.dsc_url2, 'http://host2/path2') + self.assertEquals( + ath.dsc_url2, + 'http://host2/path2') def test_top_level_denied(self): - resp = Request.blank('/').get_response(self.test_auth) + resp = Request.blank( + '/').get_response(self.test_auth) self.assertEquals(resp.status_int, 401) def test_anon(self): - resp = Request.blank('/v1/AUTH_account').get_response(self.test_auth) + resp = Request.blank( + '/v1/AUTH_account').get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(resp.environ['swift.authorize'], self.test_auth.authorize) def test_auth_deny_non_reseller_prefix(self): - resp = Request.blank('/v1/BLAH_account', + resp = Request.blank( + '/v1/BLAH_account', headers={'X-Auth-Token': 'BLAH_t'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(resp.environ['swift.authorize'], self.test_auth.denied_response) - def test_auth_deny_non_reseller_prefix_no_override(self): - fake_authorize = lambda x: Response(status='500 Fake') - resp = Request.blank('/v1/BLAH_account', + def test_auth_deny_non_reseller_prefix_no_override( + self): + fake_authorize = lambda x: Response( + status='500 Fake') + resp = Request.blank( + '/v1/BLAH_account', headers={'X-Auth-Token': 'BLAH_t'}, - environ={'swift.authorize': fake_authorize} - ).get_response(self.test_auth) + environ={'swift.authorize': fake_authorize}).get_response( + self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(resp.environ['swift.authorize'], fake_authorize) @@ -229,12 +258,15 @@ class TestAuth(unittest.TestCase): # outright but set up a denial swift.authorize and pass the request on # down the chain. local_app = FakeApp() - local_auth = auth.filter_factory({'super_admin_key': 'supertest', - 'reseller_prefix': ''})(local_app) - resp = Request.blank('/v1/account', + local_auth = auth.filter_factory( + {'super_admin_key': 'supertest', + 'reseller_prefix': ''})(local_app) + resp = Request.blank( + '/v1/account', headers={'X-Auth-Token': 't'}).get_response(local_auth) self.assertEquals(resp.status_int, 401) - # one for checking auth, two for request passed along + # one for checking auth, two for request passed + # along self.assertEquals(local_app.calls, 2) self.assertEquals(resp.environ['swift.authorize'], local_auth.denied_response) @@ -244,15 +276,18 @@ class TestAuth(unittest.TestCase): # access if our auth server accepts requests local_app = FakeApp(iter([ ('200 Ok', {}, - json.dumps({'account': 'act', 'user': 'act:usr', - 'account_id': 'AUTH_cfa', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}, - {'name': '.admin'}], - 'expires': time() + 60})), + json.dumps( + {'account': 'act', 'user': 'act:usr', + 'account_id': 'AUTH_cfa', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}, + {'name': '.admin'}], + 'expires': time() + 60})), ('204 No Content', {}, '')])) - local_auth = auth.filter_factory({'super_admin_key': 'supertest', - 'reseller_prefix': ''})(local_app) - resp = Request.blank('/v1/act', + local_auth = auth.filter_factory( + {'super_admin_key': 'supertest', + 'reseller_prefix': ''})(local_app) + resp = Request.blank( + '/v1/act', headers={'X-Auth-Token': 't'}).get_response(local_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(local_app.calls, 2) @@ -260,40 +295,51 @@ class TestAuth(unittest.TestCase): local_auth.authorize) def test_auth_no_reseller_prefix_no_token(self): - # Check that normally we set up a call back to our authorize. + # Check that normally we set up a call back to our + # authorize. local_auth = \ - auth.filter_factory({'super_admin_key': 'supertest', - 'reseller_prefix': ''})(FakeApp(iter([]))) - resp = Request.blank('/v1/account').get_response(local_auth) + auth.filter_factory( + {'super_admin_key': 'supertest', + 'reseller_prefix': ''})(FakeApp(iter([]))) + resp = Request.blank( + '/v1/account').get_response( + local_auth) self.assertEquals(resp.status_int, 401) - self.assertEquals(resp.environ['swift.authorize'], - local_auth.authorize) + self.assertEquals( + resp.environ['swift.authorize'], local_auth.authorize) # Now make sure we don't override an existing swift.authorize when we # have no reseller prefix. local_auth = \ - auth.filter_factory({'super_admin_key': 'supertest', - 'reseller_prefix': ''})(FakeApp()) + auth.filter_factory( + {'super_admin_key': 'supertest', + 'reseller_prefix': ''})(FakeApp()) local_authorize = lambda req: Response('test') - resp = Request.blank('/v1/account', environ={'swift.authorize': - local_authorize}).get_response(local_auth) + resp = Request.blank( + '/v1/account', environ={'swift.authorize': + local_authorize}).get_response(local_auth) self.assertEquals(resp.status_int, 200) - self.assertEquals(resp.environ['swift.authorize'], local_authorize) + self.assertEquals( + resp.environ['swift.authorize'], + local_authorize) def test_auth_fail(self): - resp = Request.blank('/v1/AUTH_cfa', + resp = Request.blank( + '/v1/AUTH_cfa', headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) def test_auth_success(self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, - json.dumps({'account': 'act', 'user': 'act:usr', - 'account_id': 'AUTH_cfa', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}, - {'name': '.admin'}], - 'expires': time() + 60})), + json.dumps( + {'account': 'act', 'user': 'act:usr', + 'account_id': 'AUTH_cfa', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}, + {'name': '.admin'}], + 'expires': time() + 60})), ('204 No Content', {}, '')])) - resp = Request.blank('/v1/AUTH_cfa', + resp = Request.blank( + '/v1/AUTH_cfa', headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 2) @@ -303,23 +349,27 @@ class TestAuth(unittest.TestCase): # token contents twice. self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, - json.dumps({'account': 'act', 'user': 'act:usr', - 'account_id': 'AUTH_cfa', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}, - {'name': '.admin'}], - 'expires': time() + 60})), + json.dumps( + {'account': 'act', 'user': 'act:usr', + 'account_id': 'AUTH_cfa', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}, + {'name': '.admin'}], + 'expires': time() + 60})), ('204 No Content', {}, ''), ('200 Ok', {}, - json.dumps({'account': 'act', 'user': 'act:usr', - 'account_id': 'AUTH_cfa', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}, - {'name': '.admin'}], - 'expires': time() + 60})), + json.dumps( + {'account': 'act', 'user': 'act:usr', + 'account_id': 'AUTH_cfa', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}, + {'name': '.admin'}], + 'expires': time() + 60})), ('204 No Content', {}, '')])) - resp = Request.blank('/v1/AUTH_cfa', + resp = Request.blank( + '/v1/AUTH_cfa', headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) - resp = Request.blank('/v1/AUTH_cfa', + resp = Request.blank( + '/v1/AUTH_cfa', headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 4) @@ -327,24 +377,28 @@ class TestAuth(unittest.TestCase): # the token contents twice. self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, - json.dumps({'account': 'act', 'user': 'act:usr', - 'account_id': 'AUTH_cfa', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}, - {'name': '.admin'}], - 'expires': time() + 60})), + json.dumps( + {'account': 'act', 'user': 'act:usr', + 'account_id': 'AUTH_cfa', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}, + {'name': '.admin'}], + 'expires': time() + 60})), ('204 No Content', {}, ''), - # Don't need a second token object returned if memcache is used + # Don't need a second token object returned if memcache is + # used ('204 No Content', {}, '')])) fake_memcache = FakeMemcache() - resp = Request.blank('/v1/AUTH_cfa', + resp = Request.blank( + '/v1/AUTH_cfa', headers={'X-Auth-Token': 'AUTH_t'}, - environ={'swift.cache': fake_memcache} - ).get_response(self.test_auth) + environ={'swift.cache': fake_memcache}).get_response( + self.test_auth) self.assertEquals(resp.status_int, 204) - resp = Request.blank('/v1/AUTH_cfa', + resp = Request.blank( + '/v1/AUTH_cfa', headers={'X-Auth-Token': 'AUTH_t'}, - environ={'swift.cache': fake_memcache} - ).get_response(self.test_auth) + environ={'swift.cache': fake_memcache}).get_response( + self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 3) @@ -352,14 +406,16 @@ class TestAuth(unittest.TestCase): self.test_auth.app = FakeApp(iter([ # Request for token (which will have expired) ('200 Ok', {}, - json.dumps({'account': 'act', 'user': 'act:usr', - 'account_id': 'AUTH_cfa', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}, - {'name': '.admin'}], - 'expires': time() - 1})), + json.dumps( + {'account': 'act', 'user': 'act:usr', + 'account_id': 'AUTH_cfa', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}, + {'name': '.admin'}], + 'expires': time() - 1})), # Request to delete token ('204 No Content', {}, '')])) - resp = Request.blank('/v1/AUTH_cfa', + resp = Request.blank( + '/v1/AUTH_cfa', headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 2) @@ -367,13 +423,15 @@ class TestAuth(unittest.TestCase): def test_middleware_storage_token(self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, - json.dumps({'account': 'act', 'user': 'act:usr', - 'account_id': 'AUTH_cfa', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}, - {'name': '.admin'}], - 'expires': time() + 60})), + json.dumps( + {'account': 'act', 'user': 'act:usr', + 'account_id': 'AUTH_cfa', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}, + {'name': '.admin'}], + 'expires': time() + 60})), ('204 No Content', {}, '')])) - resp = Request.blank('/v1/AUTH_cfa', + resp = Request.blank( + '/v1/AUTH_cfa', headers={'X-Storage-Token': 'AUTH_t'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 2) @@ -390,7 +448,9 @@ class TestAuth(unittest.TestCase): def test_authorize_account_access(self): req = Request.blank('/v1/AUTH_cfa') req.remote_user = 'act:usr,act,AUTH_cfa' - self.assertEquals(self.test_auth.authorize(req), None) + self.assertEquals( + self.test_auth.authorize(req), + None) req = Request.blank('/v1/AUTH_cfa') req.remote_user = 'act:usr,act' resp = self.test_auth.authorize(req) @@ -404,11 +464,15 @@ class TestAuth(unittest.TestCase): req = Request.blank('/v1/AUTH_cfa') req.remote_user = 'act:usr,act' req.acl = 'act' - self.assertEquals(self.test_auth.authorize(req), None) + self.assertEquals( + self.test_auth.authorize(req), + None) req = Request.blank('/v1/AUTH_cfa') req.remote_user = 'act:usr,act' req.acl = 'act:usr' - self.assertEquals(self.test_auth.authorize(req), None) + self.assertEquals( + self.test_auth.authorize(req), + None) req = Request.blank('/v1/AUTH_cfa') req.remote_user = 'act:usr,act' req.acl = 'act2' @@ -421,7 +485,8 @@ class TestAuth(unittest.TestCase): self.assertEquals(resp.status_int, 403) def test_deny_cross_reseller(self): - # Tests that cross-reseller is denied, even if ACLs/group names match + # Tests that cross-reseller is denied, even if ACLs/group + # names match req = Request.blank('/v1/OTHER_cfa') req.remote_user = 'act:usr,act,AUTH_cfa' req.acl = 'act' @@ -436,7 +501,9 @@ class TestAuth(unittest.TestCase): req = Request.blank('/v1/AUTH_cfa/c') req.remote_user = 'act:usr,act' req.acl = '.r:*,.rlistings' - self.assertEquals(self.test_auth.authorize(req), None) + self.assertEquals( + self.test_auth.authorize(req), + None) req = Request.blank('/v1/AUTH_cfa/c') req.remote_user = 'act:usr,act' req.acl = '.r:*' # No listings allowed @@ -451,13 +518,17 @@ class TestAuth(unittest.TestCase): req.remote_user = 'act:usr,act' req.referer = 'http://www.example.com/index.html' req.acl = '.r:.example.com,.rlistings' - self.assertEquals(self.test_auth.authorize(req), None) + self.assertEquals( + self.test_auth.authorize(req), + None) req = Request.blank('/v1/AUTH_cfa/c') resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 401) req = Request.blank('/v1/AUTH_cfa/c') req.acl = '.r:*,.rlistings' - self.assertEquals(self.test_auth.authorize(req), None) + self.assertEquals( + self.test_auth.authorize(req), + None) req = Request.blank('/v1/AUTH_cfa/c') req.acl = '.r:*' # No listings allowed resp = self.test_auth.authorize(req) @@ -469,42 +540,55 @@ class TestAuth(unittest.TestCase): req = Request.blank('/v1/AUTH_cfa/c') req.referer = 'http://www.example.com/index.html' req.acl = '.r:.example.com,.rlistings' - self.assertEquals(self.test_auth.authorize(req), None) + self.assertEquals( + self.test_auth.authorize(req), + None) def test_detect_reseller_request(self): - req = self._make_request('/v1/AUTH_admin', + req = self._make_request('/v1/AUTH_admin', headers={'X-Auth-Token': 'AUTH_t'}) cache_key = 'AUTH_/auth/AUTH_t' - cache_entry = (time()+3600, '.reseller_admin') - req.environ['swift.cache'].set(cache_key, cache_entry) - resp = req.get_response(self.test_auth) + cache_entry = (time() + 3600, '.reseller_admin') + req.environ['swift.cache'].set( + cache_key, cache_entry) self.assertTrue(req.environ.get('reseller_request')) def test_account_put_permissions(self): - req = Request.blank('/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'}) + req = Request.blank( + '/v1/AUTH_new', + environ={'REQUEST_METHOD': 'PUT'}) req.remote_user = 'act:usr,act' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) - req = Request.blank('/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'}) + req = Request.blank( + '/v1/AUTH_new', + environ={'REQUEST_METHOD': 'PUT'}) req.remote_user = 'act:usr,act,AUTH_other' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) - # Even PUTs to your own account as account admin should fail - req = Request.blank('/v1/AUTH_old', environ={'REQUEST_METHOD': 'PUT'}) + # Even PUTs to your own account as account admin + # should fail + req = Request.blank( + '/v1/AUTH_old', + environ={'REQUEST_METHOD': 'PUT'}) req.remote_user = 'act:usr,act,AUTH_old' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) - req = Request.blank('/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'}) + req = Request.blank( + '/v1/AUTH_new', + environ={'REQUEST_METHOD': 'PUT'}) req.remote_user = 'act:usr,act,.reseller_admin' resp = self.test_auth.authorize(req) self.assertEquals(resp, None) # .super_admin is not something the middleware should ever see or care # about - req = Request.blank('/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'}) + req = Request.blank( + '/v1/AUTH_new', + environ={'REQUEST_METHOD': 'PUT'}) req.remote_user = 'act:usr,act,.super_admin' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) @@ -522,7 +606,8 @@ class TestAuth(unittest.TestCase): resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) - # Even DELETEs to your own account as account admin should fail + # Even DELETEs to your own account as account admin should + # fail req = Request.blank('/v1/AUTH_old', environ={'REQUEST_METHOD': 'DELETE'}) req.remote_user = 'act:usr,act,AUTH_old' @@ -545,9 +630,12 @@ class TestAuth(unittest.TestCase): self.assertEquals(resp.status_int, 403) def test_get_token_fail(self): - resp = Request.blank('/auth/v1.0').get_response(self.test_auth) + resp = Request.blank( + '/auth/v1.0').get_response( + self.test_auth) self.assertEquals(resp.status_int, 401) - resp = Request.blank('/auth/v1.0', + resp = Request.blank( + '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) @@ -559,39 +647,47 @@ class TestAuth(unittest.TestCase): json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]}))])) - resp = Request.blank('/auth/v1.0', + resp = Request.blank( + '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'invalid'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 1) - def test_get_token_fail_invalid_x_auth_user_format(self): - resp = Request.blank('/auth/v1/act/auth', + def test_get_token_fail_invalid_x_auth_user_format( + self): + resp = Request.blank( + '/auth/v1/act/auth', headers={'X-Auth-User': 'usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) - def test_get_token_fail_non_matching_account_in_request(self): - resp = Request.blank('/auth/v1/act/auth', + def test_get_token_fail_non_matching_account_in_request( + self): + resp = Request.blank( + '/auth/v1/act/auth', headers={'X-Auth-User': 'act2:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) def test_get_token_fail_bad_path(self): - resp = Request.blank('/auth/v1/act/auth/invalid', + resp = Request.blank( + '/auth/v1/act/auth/invalid', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_get_token_fail_missing_key(self): - resp = Request.blank('/auth/v1/act/auth', + resp = Request.blank( + '/auth/v1/act/auth', headers={'X-Auth-User': 'act:usr'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) def test_get_token_fail_get_user_details(self): self.test_auth.app = FakeApp(iter([ ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v1.0', + resp = Request.blank( + '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) @@ -606,7 +702,8 @@ class TestAuth(unittest.TestCase): {'name': ".admin"}]})), # GET of account ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v1.0', + resp = Request.blank( + '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) @@ -620,10 +717,12 @@ class TestAuth(unittest.TestCase): "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account - ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v1.0', + resp = Request.blank( + '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) @@ -637,12 +736,14 @@ class TestAuth(unittest.TestCase): "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account - ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v1.0', + resp = Request.blank( + '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) @@ -656,14 +757,16 @@ class TestAuth(unittest.TestCase): "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account - ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object ('204 No Content', {}, ''), # GET of services object ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v1.0', + resp = Request.blank( + '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) @@ -678,7 +781,8 @@ class TestAuth(unittest.TestCase): {'name': ".admin"}]})), # GET of token ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v1.0', + resp = Request.blank( + '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) @@ -692,7 +796,8 @@ class TestAuth(unittest.TestCase): "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account - ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object @@ -700,22 +805,26 @@ class TestAuth(unittest.TestCase): # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank('/auth/v1.0', + resp = Request.blank( + '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) - self.assert_(resp.headers.get('x-auth-token', + self.assert_(resp.headers.get( + 'x-auth-token', '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals(json.loads(resp.body), + self.assertEquals( + json.loads(resp.body), {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 5) - def test_get_token_success_v1_0_with_user_token_life(self): + def test_get_token_success_v1_0_with_user_token_life( + self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, @@ -723,7 +832,8 @@ class TestAuth(unittest.TestCase): "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account - ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object @@ -731,7 +841,8 @@ class TestAuth(unittest.TestCase): # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank('/auth/v1.0', + resp = Request.blank( + '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key', 'X-Auth-Token-Lifetime': 10}).get_response(self.test_auth) @@ -739,18 +850,21 @@ class TestAuth(unittest.TestCase): left = int(resp.headers['x-auth-token-expires']) self.assertTrue(left > 0, '%d > 0' % left) self.assertTrue(left <= 10, '%d <= 10' % left) - self.assert_(resp.headers.get('x-auth-token', + self.assert_(resp.headers.get( + 'x-auth-token', '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals(json.loads(resp.body), + self.assertEquals( + json.loads(resp.body), {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 5) - def test_get_token_success_v1_0_with_user_token_life_past_max(self): + def test_get_token_success_v1_0_with_user_token_life_past_max( + self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, @@ -758,7 +872,8 @@ class TestAuth(unittest.TestCase): "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account - ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object @@ -778,13 +893,15 @@ class TestAuth(unittest.TestCase): '%d > %d' % (left, DEFAULT_TOKEN_LIFE)) self.assertTrue(left <= MAX_TOKEN_LIFE, '%d <= %d' % (left, MAX_TOKEN_LIFE)) - self.assert_(resp.headers.get('x-auth-token', + self.assert_(resp.headers.get( + 'x-auth-token', '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals(json.loads(resp.body), + self.assertEquals( + json.loads(resp.body), {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 5) @@ -797,7 +914,8 @@ class TestAuth(unittest.TestCase): "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account - ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object @@ -805,22 +923,26 @@ class TestAuth(unittest.TestCase): # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank('/auth/v1/act/auth', + resp = Request.blank( + '/auth/v1/act/auth', headers={'X-Storage-User': 'usr', 'X-Storage-Pass': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) - self.assert_(resp.headers.get('x-auth-token', + self.assert_(resp.headers.get( + 'x-auth-token', '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals(json.loads(resp.body), + self.assertEquals( + json.loads(resp.body), {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 5) - def test_get_token_success_storage_instead_of_auth(self): + def test_get_token_success_storage_instead_of_auth( + self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, @@ -828,7 +950,8 @@ class TestAuth(unittest.TestCase): "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account - ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object @@ -836,22 +959,27 @@ class TestAuth(unittest.TestCase): # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank('/auth/v1.0', + resp = Request.blank( + '/auth/v1.0', headers={'X-Storage-User': 'act:usr', 'X-Storage-Pass': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) - self.assert_(resp.headers.get('x-auth-token', - '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) + self.assert_( + resp.headers.get( + 'x-auth-token', + '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals(json.loads(resp.body), + self.assertEquals( + json.loads(resp.body), {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 5) - def test_get_token_success_v1_act_auth_auth_instead_of_storage(self): + def test_get_token_success_v1_act_auth_auth_instead_of_storage( + self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, @@ -859,7 +987,8 @@ class TestAuth(unittest.TestCase): "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account - ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object @@ -867,17 +996,20 @@ class TestAuth(unittest.TestCase): # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank('/auth/v1/act/auth', + resp = Request.blank( + '/auth/v1/act/auth', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) - self.assert_(resp.headers.get('x-auth-token', + self.assert_(resp.headers.get( + 'x-auth-token', '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals(json.loads(resp.body), + self.assertEquals( + json.loads(resp.body), {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 5) @@ -890,28 +1022,35 @@ class TestAuth(unittest.TestCase): "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of token - ('200 Ok', {}, json.dumps({"account": "act", "user": "usr", - "account_id": "AUTH_cfa", "groups": [{'name': "act:usr"}, - {'name': "key"}, {'name': ".admin"}], - "expires": 9999999999.9999999})), + ('200 Ok', {}, json.dumps( + {"account": "act", "user": "usr", + "account_id": "AUTH_cfa", + "groups": [{'name': "act:usr"}, + {'name': "key"}, {'name': ".admin"}], + "expires": 9999999999.9999999})), # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank('/auth/v1.0', + resp = Request.blank( + '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) - self.assertEquals(resp.headers.get('x-auth-token'), 'AUTH_tktest') + self.assertEquals( + resp.headers.get('x-auth-token'), + 'AUTH_tktest') self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals(json.loads(resp.body), + self.assertEquals( + json.loads(resp.body), {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 3) - def test_get_token_success_existing_token_but_request_new_one(self): + def test_get_token_success_existing_token_but_request_new_one( + self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'}, @@ -921,7 +1060,8 @@ class TestAuth(unittest.TestCase): # DELETE of expired token ('204 No Content', {}, ''), # GET of account - ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object @@ -929,19 +1069,23 @@ class TestAuth(unittest.TestCase): # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank('/auth/v1.0', + resp = Request.blank( + '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key', - 'X-Auth-New-Token': 'true'}).get_response(self.test_auth) + 'X-Auth-New-Token': 'true'}).get_response( + self.test_auth) self.assertEquals(resp.status_int, 200) - self.assertNotEquals(resp.headers.get('x-auth-token'), 'AUTH_tktest') + self.assertNotEquals( + resp.headers.get('x-auth-token'), 'AUTH_tktest') self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals(json.loads(resp.body), + self.assertEquals( + json.loads(resp.body), {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 6) def test_get_token_success_existing_token_expired(self): @@ -952,14 +1096,17 @@ class TestAuth(unittest.TestCase): "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of token - ('200 Ok', {}, json.dumps({"account": "act", "user": "usr", - "account_id": "AUTH_cfa", "groups": [{'name': "act:usr"}, - {'name': "key"}, {'name': ".admin"}], - "expires": 0.0})), + ('200 Ok', {}, json.dumps( + {"account": "act", "user": "usr", + "account_id": "AUTH_cfa", + "groups": [{'name': "act:usr"}, + {'name': "key"}, {'name': ".admin"}], + "expires": 0.0})), # DELETE of expired token ('204 No Content', {}, ''), # GET of account - ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object @@ -967,21 +1114,26 @@ class TestAuth(unittest.TestCase): # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank('/auth/v1.0', + resp = Request.blank( + '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) - self.assertNotEquals(resp.headers.get('x-auth-token'), 'AUTH_tktest') + self.assertNotEquals( + resp.headers.get('x-auth-token'), + 'AUTH_tktest') self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals(json.loads(resp.body), + self.assertEquals( + json.loads(resp.body), {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 7) - def test_get_token_success_existing_token_expired_fail_deleting_old(self): + def test_get_token_success_existing_token_expired_fail_deleting_old( + self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'}, @@ -990,13 +1142,15 @@ class TestAuth(unittest.TestCase): {'name': ".admin"}]})), # GET of token ('200 Ok', {}, json.dumps({"account": "act", "user": "usr", - "account_id": "AUTH_cfa", "groups": [{'name': "act:usr"}, - {'name': "key"}, {'name': ".admin"}], + "account_id": "AUTH_cfa", + "groups": [{'name': "act:usr"}, + {'name': "key"}, {'name': ".admin"}], "expires": 0.0})), # DELETE of expired token ('503 Service Unavailable', {}, ''), # GET of account - ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object @@ -1004,18 +1158,22 @@ class TestAuth(unittest.TestCase): # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank('/auth/v1.0', + resp = Request.blank( + '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) - self.assertNotEquals(resp.headers.get('x-auth-token'), 'AUTH_tktest') + self.assertNotEquals( + resp.headers.get('x-auth-token'), + 'AUTH_tktest') self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals(json.loads(resp.body), + self.assertEquals( + json.loads(resp.body), {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 7) def test_prep_success(self): @@ -1029,56 +1187,78 @@ class TestAuth(unittest.TestCase): list_to_iter.append(('201 Created', {}, '')) self.test_auth.app = FakeApp(iter(list_to_iter)) resp = Request.blank('/auth/v2/.prep', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 18) def test_prep_bad_method(self): resp = Request.blank('/auth/v2/.prep', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) resp = Request.blank('/auth/v2/.prep', - environ={'REQUEST_METHOD': 'HEAD'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'HEAD'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) resp = Request.blank('/auth/v2/.prep', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_prep_bad_creds(self): resp = Request.blank('/auth/v2/.prep', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': 'super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + 'super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) resp = Request.blank('/auth/v2/.prep', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'upertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'upertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) resp = Request.blank('/auth/v2/.prep', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': '.super_admin'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': '.super_admin'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) resp = Request.blank('/auth/v2/.prep', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) - resp = Request.blank('/auth/v2/.prep', + resp = Request.blank( + '/auth/v2/.prep', environ={'REQUEST_METHOD': 'POST'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) @@ -1087,10 +1267,13 @@ class TestAuth(unittest.TestCase): # PUT of .auth account ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/.prep', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) @@ -1101,10 +1284,13 @@ class TestAuth(unittest.TestCase): # PUT of .token container ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/.prep', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 2) @@ -1117,10 +1303,13 @@ class TestAuth(unittest.TestCase): # PUT of .account_id container ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/.prep', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 3) @@ -1129,14 +1318,18 @@ class TestAuth(unittest.TestCase): # GET of .auth account (list containers) ('200 Ok', {}, json.dumps([ {"name": ".token", "count": 0, "bytes": 0}, - {"name": ".account_id", "count": 0, "bytes": 0}, + {"name": ".account_id", + "count": 0, "bytes": 0}, {"name": "act", "count": 0, "bytes": 0}])), - # GET of .auth account (list containers continuation) + # GET of .auth account (list containers + # continuation) ('200 Ok', {}, '[]')])) resp = Request.blank('/auth/v2', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals(json.loads(resp.body), {"accounts": [{"name": "act"}]}) @@ -1150,14 +1343,18 @@ class TestAuth(unittest.TestCase): # GET of .auth account (list containers) ('200 Ok', {}, json.dumps([ {"name": ".token", "count": 0, "bytes": 0}, - {"name": ".account_id", "count": 0, "bytes": 0}, + {"name": ".account_id", + "count": 0, "bytes": 0}, {"name": "act", "count": 0, "bytes": 0}])), - # GET of .auth account (list containers continuation) + # GET of .auth account (list containers + # continuation) ('200 Ok', {}, '[]')])) resp = Request.blank('/auth/v2', - headers={'X-Auth-Admin-User': 'act:adm', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals(json.loads(resp.body), {"accounts": [{"name": "act"}]}) @@ -1168,21 +1365,26 @@ class TestAuth(unittest.TestCase): # GET of user object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2', - headers={'X-Auth-Admin-User': 'super:admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + 'super:admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but not reseller admin) + # GET of user object (account admin, but not reseller + # admin) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) + "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2', - headers={'X-Auth-Admin-User': 'act:adm', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) @@ -1191,9 +1393,11 @@ class TestAuth(unittest.TestCase): ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2', - headers={'X-Auth-Admin-User': 'act:usr', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) @@ -1202,9 +1406,11 @@ class TestAuth(unittest.TestCase): # GET of .auth account (list containers) ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) @@ -1212,46 +1418,59 @@ class TestAuth(unittest.TestCase): # GET of .auth account (list containers) ('200 Ok', {}, json.dumps([ {"name": ".token", "count": 0, "bytes": 0}, - {"name": ".account_id", "count": 0, "bytes": 0}, + {"name": ".account_id", + "count": 0, "bytes": 0}, {"name": "act", "count": 0, "bytes": 0}])), - # GET of .auth account (list containers continuation) + # GET of .auth account (list containers + # continuation) ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 2) def test_get_account_success(self): self.test_auth.app = FakeApp(iter([ # GET of .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + ('200 Ok', {}, + json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # GET of account container (list objects) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}, - {"name": "tester", "hash": "etag", "bytes": 104, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.736680"}, - {"name": "tester3", "hash": "etag", "bytes": 86, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:28.135530"}])), - # GET of account container (list objects continuation) + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}, + {"name": "tester", "hash": "etag", "bytes": 104, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.736680"}, + {"name": "tester3", "hash": "etag", "bytes": 86, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:28.135530"}])), + # GET of account container (list objects + # continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')])) resp = Request.blank('/auth/v2/act', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) - self.assertEquals(json.loads(resp.body), + self.assertEquals( + json.loads(resp.body), {'account_id': 'AUTH_cfa', 'services': {'storage': - {'default': 'local', - 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}}, + {'default': 'local', + 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}}, 'users': [{'name': 'tester'}, {'name': 'tester3'}]}) self.assertEquals(self.test_auth.app.calls, 3) @@ -1259,47 +1478,60 @@ class TestAuth(unittest.TestCase): # GET of user object ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"})), + "auth": "plaintext:key"})), # GET of .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + ('200 Ok', {}, + json.dumps({"storage": + {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # GET of account container (list objects) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}, - {"name": "tester", "hash": "etag", "bytes": 104, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.736680"}, - {"name": "tester3", "hash": "etag", "bytes": 86, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:28.135530"}])), - # GET of account container (list objects continuation) + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}, + {"name": "tester", "hash": "etag", "bytes": 104, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.736680"}, + {"name": "tester3", "hash": "etag", "bytes": 86, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:28.135530"}])), + # GET of account container (list objects + # continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')])) resp = Request.blank('/auth/v2/act', - headers={'X-Auth-Admin-User': 'act:adm', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) - self.assertEquals(json.loads(resp.body), + self.assertEquals( + json.loads(resp.body), {'account_id': 'AUTH_cfa', 'services': {'storage': - {'default': 'local', - 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}}, + {'default': 'local', + 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}}, 'users': [{'name': 'tester'}, {'name': 'tester3'}]}) self.assertEquals(self.test_auth.app.calls, 4) def test_get_account_fail_bad_account_name(self): resp = Request.blank('/auth/v2/.token', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) resp = Request.blank('/auth/v2/.anything', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_get_account_fail_creds(self): @@ -1307,21 +1539,26 @@ class TestAuth(unittest.TestCase): # GET of user object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act', - headers={'X-Auth-Admin-User': 'super:admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + 'super:admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but wrong account) + # GET of user object (account admin, but wrong + # account) ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) + "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act', - headers={'X-Auth-Admin-User': 'act2:adm', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + 'act2:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) @@ -1330,9 +1567,11 @@ class TestAuth(unittest.TestCase): ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act', - headers={'X-Auth-Admin-User': 'act:usr', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) @@ -1341,9 +1580,11 @@ class TestAuth(unittest.TestCase): # GET of .services object ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) @@ -1351,79 +1592,102 @@ class TestAuth(unittest.TestCase): # GET of .services object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 1) def test_get_account_fail_listing(self): self.test_auth.app = FakeApp(iter([ # GET of .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # GET of account container (list objects) ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 2) self.test_auth.app = FakeApp(iter([ # GET of .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # GET of account container (list objects) ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 2) self.test_auth.app = FakeApp(iter([ # GET of .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # GET of account container (list objects) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}, - {"name": "tester", "hash": "etag", "bytes": 104, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.736680"}, - {"name": "tester3", "hash": "etag", "bytes": 86, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:28.135530"}])), - # GET of account container (list objects continuation) + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}, + {"name": "tester", "hash": "etag", "bytes": 104, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.736680"}, + {"name": "tester3", "hash": "etag", "bytes": 86, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:28.135530"}])), + # GET of account container (list objects + # continuation) ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 3) def test_set_services_new_service(self): self.test_auth.app = FakeApp(iter([ # GET of .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + ('200 Ok', {}, + json.dumps({"storage": + {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # PUT of new .services object ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act/.services', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - body=json.dumps({'new_service': {'new_endpoint': 'new_value'}}) - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}, + body=json.dumps( + {'new_service': + {'new_endpoint': 'new_value'}}) + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) - self.assertEquals(json.loads(resp.body), + self.assertEquals( + json.loads(resp.body), {'storage': {'default': 'local', 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}, 'new_service': {'new_endpoint': 'new_value'}}) @@ -1432,40 +1696,53 @@ class TestAuth(unittest.TestCase): def test_set_services_new_endpoint(self): self.test_auth.app = FakeApp(iter([ # GET of .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + ('200 Ok', {}, json.dumps( + {"storage": + {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # PUT of new .services object ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act/.services', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - body=json.dumps({'storage': {'new_endpoint': 'new_value'}}) - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}, + body=json.dumps( + {'storage': {'new_endpoint': 'new_value'}}) + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) - self.assertEquals(json.loads(resp.body), + self.assertEquals( + json.loads(resp.body), {'storage': {'default': 'local', - 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa', + 'local': + 'http://127.0.0.1:8080/v1/AUTH_cfa', 'new_endpoint': 'new_value'}}) self.assertEquals(self.test_auth.app.calls, 2) def test_set_services_update_endpoint(self): self.test_auth.app = FakeApp(iter([ # GET of .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # PUT of new .services object ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act/.services', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - body=json.dumps({'storage': {'local': 'new_value'}}) - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}, + body=json.dumps( + {'storage': {'local': 'new_value'}}) + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals(json.loads(resp.body), - {'storage': {'default': 'local', - 'local': 'new_value'}}) + {'storage': {'default': 'local', + 'local': 'new_value'}}) self.assertEquals(self.test_auth.app.calls, 2) def test_set_services_fail_bad_creds(self): @@ -1473,25 +1750,34 @@ class TestAuth(unittest.TestCase): # GET of user object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act/.services', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': 'super:admin', - 'X-Auth-Admin-Key': 'supertest'}, - body=json.dumps({'storage': {'local': 'new_value'}}) - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + 'super:admin', + 'X-Auth-Admin-Key': 'supertest'}, + body=json.dumps( + {'storage': {'local': 'new_value'}}) + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but not reseller admin) + # GET of user object (account admin, but not reseller + # admin) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) + "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/.services', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': 'act:adm', - 'X-Auth-Admin-Key': 'key'}, - body=json.dumps({'storage': {'local': 'new_value'}}) - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'}, + body=json.dumps( + {'storage': {'local': 'new_value'}}) + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) @@ -1500,37 +1786,51 @@ class TestAuth(unittest.TestCase): ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/.services', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': 'act:usr', - 'X-Auth-Admin-Key': 'key'}, - body=json.dumps({'storage': {'local': 'new_value'}}) - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': 'key'}, + body=json.dumps( + {'storage': {'local': 'new_value'}}) + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) def test_set_services_fail_bad_account_name(self): resp = Request.blank('/auth/v2/.act/.services', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - body=json.dumps({'storage': {'local': 'new_value'}}) - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}, + body=json.dumps( + {'storage': {'local': 'new_value'}}) + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_set_services_fail_bad_json(self): resp = Request.blank('/auth/v2/act/.services', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - body='garbage' - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}, + body='garbage' + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) resp = Request.blank('/auth/v2/act/.services', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - body='' - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}, + body='' + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_set_services_fail_get_services(self): @@ -1538,11 +1838,15 @@ class TestAuth(unittest.TestCase): # GET of .services object ('503 Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act/.services', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - body=json.dumps({'new_service': {'new_endpoint': 'new_value'}}) - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}, + body=json.dumps({ + 'new_service': {'new_endpoint': 'new_value'}}) + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) @@ -1550,27 +1854,37 @@ class TestAuth(unittest.TestCase): # GET of .services object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act/.services', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - body=json.dumps({'new_service': {'new_endpoint': 'new_value'}}) - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}, + body=json.dumps({ + 'new_service': {'new_endpoint': 'new_value'}}) + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 1) def test_set_services_fail_put_services(self): self.test_auth.app = FakeApp(iter([ # GET of .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # PUT of new .services object ('503 Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act/.services', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - body=json.dumps({'new_service': {'new_endpoint': 'new_value'}}) - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}, + body=json.dumps( + {'new_service': + {'new_endpoint': 'new_value'}}) + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 2) @@ -1580,7 +1894,8 @@ class TestAuth(unittest.TestCase): ('201 Created', {}, '')])) self.test_auth.get_conn = lambda: conn self.test_auth.app = FakeApp(iter([ - # Initial HEAD of account container to check for pre-existence + # Initial HEAD of account container to check for + # pre-existence ('404 Not Found', {}, ''), # PUT of account container ('204 No Content', {}, ''), @@ -1588,18 +1903,22 @@ class TestAuth(unittest.TestCase): ('204 No Content', {}, ''), # PUT of .services object ('204 No Content', {}, ''), - # POST to account container updating X-Container-Meta-Account-Id + # POST to account container updating + # X-Container-Meta-Account-Id ('204 No Content', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, + resp = Request.blank( + '/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 201) self.assertEquals(self.test_auth.app.calls, 5) self.assertEquals(conn.calls, 1) - def test_put_account_success_preexist_but_not_completed(self): + def test_put_account_success_preexist_but_not_completed( + self): conn = FakeConn(iter([ # PUT of storage account itself ('201 Created', {}, '')])) @@ -1607,34 +1926,43 @@ class TestAuth(unittest.TestCase): self.test_auth.app = FakeApp(iter([ # Initial HEAD of account container to check for pre-existence # We're going to show it as existing this time, but with no - # X-Container-Meta-Account-Id, indicating a failed previous attempt + # X-Container-Meta-Account-Id, indicating a failed + # previous attempt ('200 Ok', {}, ''), # PUT of .account_id mapping object ('204 No Content', {}, ''), # PUT of .services object ('204 No Content', {}, ''), - # POST to account container updating X-Container-Meta-Account-Id + # POST to account container updating + # X-Container-Meta-Account-Id ('204 No Content', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, + resp = Request.blank( + '/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'PUT', + 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 201) self.assertEquals(self.test_auth.app.calls, 4) self.assertEquals(conn.calls, 1) - def test_put_account_success_preexist_and_completed(self): + def test_put_account_success_preexist_and_completed( + self): self.test_auth.app = FakeApp(iter([ # Initial HEAD of account container to check for pre-existence # We're going to show it as existing this time, and with an - # X-Container-Meta-Account-Id, indicating it already exists + # X-Container-Meta-Account-Id, indicating it already + # exists ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '')])) - resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, + resp = Request.blank( + '/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 202) self.assertEquals(self.test_auth.app.calls, 1) @@ -1644,7 +1972,8 @@ class TestAuth(unittest.TestCase): ('201 Created', {}, '')])) self.test_auth.get_conn = lambda: conn self.test_auth.app = FakeApp(iter([ - # Initial HEAD of account container to check for pre-existence + # Initial HEAD of account container to check for + # pre-existence ('404 Not Found', {}, ''), # PUT of account container ('204 No Content', {}, ''), @@ -1652,16 +1981,21 @@ class TestAuth(unittest.TestCase): ('204 No Content', {}, ''), # PUT of .services object ('204 No Content', {}, ''), - # POST to account container updating X-Container-Meta-Account-Id + # POST to account container updating + # X-Container-Meta-Account-Id ('204 No Content', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, + resp = Request.blank( + '/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest', - 'X-Account-Suffix': 'test-suffix'} - ).get_response(self.test_auth) + 'X-Account-Suffix': 'test-suffix'}).get_response( + self.test_auth) self.assertEquals(resp.status_int, 201) - self.assertEquals(conn.request_path, '/v1/AUTH_test-suffix') + self.assertEquals( + conn.request_path, + '/v1/AUTH_test-suffix') self.assertEquals(self.test_auth.app.calls, 5) self.assertEquals(conn.calls, 1) @@ -1669,24 +2003,29 @@ class TestAuth(unittest.TestCase): self.test_auth.app = FakeApp(iter([ # GET of user object ('404 Not Found', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, + resp = Request.blank( + '/auth/v2/act', + environ={'REQUEST_METHOD': 'PUT', + 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': 'super:admin', - 'X-Auth-Admin-Key': 'supertest'}, - ).get_response(self.test_auth) + 'X-Auth-Admin-Key': 'supertest'},).get_response( + self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but not reseller admin) + # GET of user object (account admin, but not reseller + # admin) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, + "auth": "plaintext:key"}))])) + resp = Request.blank( + '/auth/v2/act', + environ={'REQUEST_METHOD': 'PUT', + 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': 'act:adm', - 'X-Auth-Admin-Key': 'key'}, - ).get_response(self.test_auth) + 'X-Auth-Admin-Key': 'key'},).get_response( + self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) @@ -1694,45 +2033,53 @@ class TestAuth(unittest.TestCase): # GET of user object (regular user) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act', + resp = Request.blank( + '/auth/v2/act', environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': 'act:usr', - 'X-Auth-Admin-Key': 'key'}, - ).get_response(self.test_auth) + 'X-Auth-Admin-Key': 'key'},).get_response( + self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) def test_put_account_fail_invalid_account_name(self): - resp = Request.blank('/auth/v2/.act', + resp = Request.blank( + '/auth/v2/.act', environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - ).get_response(self.test_auth) + 'X-Auth-Admin-Key': 'supertest'},).get_response( + self.test_auth) self.assertEquals(resp.status_int, 400) def test_put_account_fail_on_initial_account_head(self): self.test_auth.app = FakeApp(iter([ - # Initial HEAD of account container to check for pre-existence + # Initial HEAD of account container to check for + # pre-existence ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, + resp = Request.blank( + '/auth/v2/act', + environ={'REQUEST_METHOD': 'PUT', + 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + 'X-Auth-Admin-Key': 'supertest'}).get_response( + self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) def test_put_account_fail_on_account_marker_put(self): self.test_auth.app = FakeApp(iter([ - # Initial HEAD of account container to check for pre-existence + # Initial HEAD of account container to check for + # pre-existence ('404 Not Found', {}, ''), # PUT of account container ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, + resp = Request.blank( + '/auth/v2/act', + environ={'REQUEST_METHOD': 'PUT', + 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + 'X-Auth-Admin-Key': 'supertest'}).get_response( + self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 2) @@ -1742,15 +2089,18 @@ class TestAuth(unittest.TestCase): ('503 Service Unavailable', {}, '')])) self.test_auth.get_conn = lambda: conn self.test_auth.app = FakeApp(iter([ - # Initial HEAD of account container to check for pre-existence + # Initial HEAD of account container to check for + # pre-existence ('404 Not Found', {}, ''), # PUT of account container ('204 No Content', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, + resp = Request.blank( + '/auth/v2/act', + environ={'REQUEST_METHOD': 'PUT', + 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + 'X-Auth-Admin-Key': 'supertest'}).get_response( + self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(conn.calls, 1) self.assertEquals(self.test_auth.app.calls, 2) @@ -1761,17 +2111,20 @@ class TestAuth(unittest.TestCase): ('201 Created', {}, '')])) self.test_auth.get_conn = lambda: conn self.test_auth.app = FakeApp(iter([ - # Initial HEAD of account container to check for pre-existence + # Initial HEAD of account container to check for + # pre-existence ('404 Not Found', {}, ''), # PUT of account container ('204 No Content', {}, ''), # PUT of .account_id mapping object ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, + resp = Request.blank( + '/auth/v2/act', + environ={'REQUEST_METHOD': 'PUT', + 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + 'X-Auth-Admin-Key': 'supertest'}).get_response( + self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(conn.calls, 1) self.assertEquals(self.test_auth.app.calls, 3) @@ -1782,7 +2135,8 @@ class TestAuth(unittest.TestCase): ('201 Created', {}, '')])) self.test_auth.get_conn = lambda: conn self.test_auth.app = FakeApp(iter([ - # Initial HEAD of account container to check for pre-existence + # Initial HEAD of account container to check for + # pre-existence ('404 Not Found', {}, ''), # PUT of account container ('204 No Content', {}, ''), @@ -1790,11 +2144,13 @@ class TestAuth(unittest.TestCase): ('204 No Content', {}, ''), # PUT of .services object ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, + resp = Request.blank( + '/auth/v2/act', + environ={'REQUEST_METHOD': 'PUT', + 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + 'X-Auth-Admin-Key': 'supertest'}).get_response( + self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(conn.calls, 1) self.assertEquals(self.test_auth.app.calls, 4) @@ -1805,7 +2161,8 @@ class TestAuth(unittest.TestCase): ('201 Created', {}, '')])) self.test_auth.get_conn = lambda: conn self.test_auth.app = FakeApp(iter([ - # Initial HEAD of account container to check for pre-existence + # Initial HEAD of account container to check for + # pre-existence ('404 Not Found', {}, ''), # PUT of account container ('204 No Content', {}, ''), @@ -1813,13 +2170,15 @@ class TestAuth(unittest.TestCase): ('204 No Content', {}, ''), # PUT of .services object ('204 No Content', {}, ''), - # POST to account container updating X-Container-Meta-Account-Id + # POST to account container updating + # X-Container-Meta-Account-Id ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act', + resp = Request.blank( + '/auth/v2/act', environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + 'X-Auth-Admin-Key': 'supertest'}).get_response( + self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(conn.calls, 1) self.assertEquals(self.test_auth.app.calls, 5) @@ -1830,17 +2189,22 @@ class TestAuth(unittest.TestCase): ('204 No Content', {}, '')])) self.test_auth.get_conn = lambda x: conn self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users (continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # DELETE the .services object ('204 No Content', {}, ''), # DELETE the .account_id mapping object @@ -1848,25 +2212,32 @@ class TestAuth(unittest.TestCase): # DELETE the account container ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 6) self.assertEquals(conn.calls, 1) def test_delete_account_success_missing_services(self): self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users (continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object ('404 Not Found', {}, ''), # DELETE the .account_id mapping object @@ -1874,31 +2245,40 @@ class TestAuth(unittest.TestCase): # DELETE the account container ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 5) - def test_delete_account_success_missing_storage_account(self): + def test_delete_account_success_missing_storage_account( + self): conn = FakeConn(iter([ # DELETE of storage account itself ('404 Not Found', {}, '')])) self.test_auth.get_conn = lambda x: conn self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users (continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # DELETE the .services object ('204 No Content', {}, ''), # DELETE the .account_id mapping object @@ -1906,32 +2286,41 @@ class TestAuth(unittest.TestCase): # DELETE the account container ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 6) self.assertEquals(conn.calls, 1) - def test_delete_account_success_missing_account_id_mapping(self): + def test_delete_account_success_missing_account_id_mapping( + self): conn = FakeConn(iter([ # DELETE of storage account itself ('204 No Content', {}, '')])) self.test_auth.get_conn = lambda x: conn self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users (continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # DELETE the .services object ('204 No Content', {}, ''), # DELETE the .account_id mapping object @@ -1939,32 +2328,41 @@ class TestAuth(unittest.TestCase): # DELETE the account container ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 6) self.assertEquals(conn.calls, 1) - def test_delete_account_success_missing_account_container_at_end(self): + def test_delete_account_success_missing_account_container_at_end( + self): conn = FakeConn(iter([ # DELETE of storage account itself ('204 No Content', {}, '')])) self.test_auth.get_conn = lambda x: conn self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users (continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # DELETE the .services object ('204 No Content', {}, ''), # DELETE the .account_id mapping object @@ -1972,11 +2370,14 @@ class TestAuth(unittest.TestCase): # DELETE the account container ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 6) self.assertEquals(conn.calls, 1) @@ -1986,25 +2387,32 @@ class TestAuth(unittest.TestCase): # GET of user object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': 'super:admin', - 'X-Auth-Admin-Key': 'supertest'}, - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + 'super:admin', + 'X-Auth-Admin-Key': 'supertest'}, + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but not reseller admin) + # GET of user object (account admin, but not reseller + # admin) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) + "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': 'act:adm', - 'X-Auth-Admin-Key': 'key'}, - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'}, + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) @@ -2013,178 +2421,236 @@ class TestAuth(unittest.TestCase): ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': 'act:usr', - 'X-Auth-Admin-Key': 'key'}, - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': 'key'}, + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) def test_delete_account_fail_invalid_account_name(self): resp = Request.blank('/auth/v2/.act', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_delete_account_fail_not_found(self): self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 1) - def test_delete_account_fail_not_found_concurrency(self): + def test_delete_account_fail_not_found_concurrency( + self): self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users (continuation) + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 2) def test_delete_account_fail_list_account(self): self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) - def test_delete_account_fail_list_account_concurrency(self): + def test_delete_account_fail_list_account_concurrency( + self): self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users (continuation) + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 2) def test_delete_account_fail_has_users(self): self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}, - {"name": "tester", "hash": "etag", "bytes": 104, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.736680"}]))])) + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}, + {"name": "tester", "hash": "etag", "bytes": 104, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.736680"}]))])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 409) self.assertEquals(self.test_auth.app.calls, 1) def test_delete_account_fail_has_users2(self): self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users (continuation) + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": "tester", "hash": "etag", "bytes": 104, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.736680"}]))])) + {"name": "tester", "hash": "etag", "bytes": 104, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.736680"}]))])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 409) self.assertEquals(self.test_auth.app.calls, 2) def test_delete_account_fail_get_services(self): self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users (continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 3) - def test_delete_account_fail_delete_storage_account(self): + def test_delete_account_fail_delete_storage_account( + self): conn = FakeConn(iter([ # DELETE of storage account itself ('409 Conflict', {}, '')])) self.test_auth.get_conn = lambda x: conn self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users (continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 409) self.assertEquals(self.test_auth.app.calls, 3) self.assertEquals(conn.calls, 1) - def test_delete_account_fail_delete_storage_account2(self): + def test_delete_account_fail_delete_storage_account2( + self): conn = FakeConn(iter([ # DELETE of storage account itself ('204 No Content', {}, ''), @@ -2192,56 +2658,74 @@ class TestAuth(unittest.TestCase): ('409 Conflict', {}, '')])) self.test_auth.get_conn = lambda x: conn self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users (continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa", - "other": "http://127.0.0.1:8080/v1/AUTH_cfa2"}}))])) + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa", + "other": "http://127.0.0.1:8080/v1/AUTH_cfa2"}}))])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 3) self.assertEquals(conn.calls, 2) - def test_delete_account_fail_delete_storage_account3(self): + def test_delete_account_fail_delete_storage_account3( + self): conn = FakeConn(iter([ # DELETE of storage account itself ('503 Service Unavailable', {}, '')])) self.test_auth.get_conn = lambda x: conn self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users (continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 3) self.assertEquals(conn.calls, 1) - def test_delete_account_fail_delete_storage_account4(self): + def test_delete_account_fail_delete_storage_account4( + self): conn = FakeConn(iter([ # DELETE of storage account itself ('204 No Content', {}, ''), @@ -2249,24 +2733,32 @@ class TestAuth(unittest.TestCase): ('503 Service Unavailable', {}, '')])) self.test_auth.get_conn = lambda x: conn self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users (continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa", - "other": "http://127.0.0.1:8080/v1/AUTH_cfa2"}}))])) + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa", + "other": "http://127.0.0.1:8080/v1/AUTH_cfa2"}}))])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 3) self.assertEquals(conn.calls, 2) @@ -2277,77 +2769,100 @@ class TestAuth(unittest.TestCase): ('204 No Content', {}, '')])) self.test_auth.get_conn = lambda x: conn self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users (continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # DELETE the .services object ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 4) self.assertEquals(conn.calls, 1) - def test_delete_account_fail_delete_account_id_mapping(self): + def test_delete_account_fail_delete_account_id_mapping( + self): conn = FakeConn(iter([ # DELETE of storage account itself ('204 No Content', {}, '')])) self.test_auth.get_conn = lambda x: conn self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users (continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # DELETE the .services object ('204 No Content', {}, ''), # DELETE the .account_id mapping object ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 5) self.assertEquals(conn.calls, 1) - def test_delete_account_fail_delete_account_container(self): + def test_delete_account_fail_delete_account_container( + self): conn = FakeConn(iter([ # DELETE of storage account itself ('204 No Content', {}, '')])) self.test_auth.get_conn = lambda x: conn self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for users + # Account's container listing, checking for + # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users (continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # DELETE the .services object ('204 No Content', {}, ''), # DELETE the .account_id mapping object @@ -2355,11 +2870,14 @@ class TestAuth(unittest.TestCase): # DELETE the account container ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act', - environ={'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 6) self.assertEquals(conn.calls, 1) @@ -2372,9 +2890,11 @@ class TestAuth(unittest.TestCase): {"name": ".admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.body, json.dumps( {"groups": [{"name": "act:usr"}, {"name": "act"}, @@ -2384,15 +2904,18 @@ class TestAuth(unittest.TestCase): def test_get_user_fail_no_super_admin_key(self): local_auth = auth.filter_factory({})(FakeApp(iter([ - # GET of user object (but we should never get here) + # GET of user object (but we should never get + # here) ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:usr"}, {"name": "act"}, {"name": ".admin"}], "auth": "plaintext:key"}))]))) resp = Request.blank('/auth/v2/act/usr', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(local_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(local_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(local_auth.app.calls, 0) @@ -2401,15 +2924,18 @@ class TestAuth(unittest.TestCase): # GET of account container (list objects) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}, - {"name": "tester", "hash": "etag", "bytes": 104, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.736680"}, - {"name": "tester3", "hash": "etag", "bytes": 86, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:28.135530"}])), + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}, + {"name": "tester", "hash": "etag", "bytes": 104, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.736680"}, + {"name": "tester3", "hash": "etag", "bytes": 86, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:28.135530"}])), # GET of user object ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:tester"}, {"name": "act"}, @@ -2419,12 +2945,15 @@ class TestAuth(unittest.TestCase): ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:tester3"}, {"name": "act"}], "auth": "plaintext:key3"})), - # GET of account container (list objects continuation) + # GET of account container (list objects + # continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')])) resp = Request.blank('/auth/v2/act/.groups', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.body, json.dumps( {"groups": [{"name": ".admin"}, {"name": "act"}, @@ -2436,33 +2965,40 @@ class TestAuth(unittest.TestCase): # GET of account container (list objects) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}, - {"name": "tester", "hash": "etag", "bytes": 104, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.736680"}])), + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}, + {"name": "tester", "hash": "etag", "bytes": 104, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.736680"}])), # GET of user object ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:tester"}, {"name": "act"}, {"name": ".admin"}], "auth": "plaintext:key"})), - # GET of account container (list objects continuation) + # GET of account container (list objects + # continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": "tester3", "hash": "etag", "bytes": 86, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:28.135530"}])), + {"name": "tester3", "hash": "etag", "bytes": 86, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:28.135530"}])), # GET of user object ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:tester3"}, {"name": "act"}], "auth": "plaintext:key3"})), - # GET of account container (list objects continuation) + # GET of account container (list objects + # continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')])) resp = Request.blank('/auth/v2/act/.groups', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.body, json.dumps( {"groups": [{"name": ".admin"}, {"name": "act"}, @@ -2471,16 +3007,20 @@ class TestAuth(unittest.TestCase): def test_get_user_fail_invalid_account(self): resp = Request.blank('/auth/v2/.invalid/usr', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_get_user_fail_invalid_user(self): resp = Request.blank('/auth/v2/act/.invalid', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_get_user_fail_bad_creds(self): @@ -2488,9 +3028,11 @@ class TestAuth(unittest.TestCase): # GET of user object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act/usr', - headers={'X-Auth-Admin-User': 'super:admin', - 'X-Auth-Admin-Key': 'supertest'}, - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + 'super:admin', + 'X-Auth-Admin-Key': 'supertest'}, + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) @@ -2499,91 +3041,110 @@ class TestAuth(unittest.TestCase): ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', - headers={'X-Auth-Admin-User': 'act:usr', - 'X-Auth-Admin-Key': 'key'}, - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': 'key'}, + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) def test_get_user_account_admin_success(self): self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but not reseller admin) + # GET of user object (account admin, but not reseller + # admin) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"})), + "auth": "plaintext:key"})), # GET of requested user object ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:usr"}, {"name": "act"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', - headers={'X-Auth-Admin-User': 'act:adm', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.body, json.dumps( {"groups": [{"name": "act:usr"}, {"name": "act"}], "auth": "plaintext:key"})) self.assertEquals(self.test_auth.app.calls, 2) - def test_get_user_account_admin_fail_getting_account_admin(self): + def test_get_user_account_admin_fail_getting_account_admin( + self): self.test_auth.app = FakeApp(iter([ # GET of user object (account admin check) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"})), - # GET of requested user object [who is an .admin as well] + "auth": "plaintext:key"})), + # GET of requested user object [who is an .admin + # as well] ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:usr"}, {"name": "act"}, {"name": ".admin"}], "auth": "plaintext:key"})), - # GET of user object (reseller admin check [and fail here]) + # GET of user object (reseller admin check [and fail + # here]) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) + "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', - headers={'X-Auth-Admin-User': 'act:adm', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 3) - def test_get_user_account_admin_fail_getting_reseller_admin(self): + def test_get_user_account_admin_fail_getting_reseller_admin( + self): self.test_auth.app = FakeApp(iter([ # GET of user object (account admin check) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"})), - # GET of requested user object [who is a .reseller_admin] + "auth": "plaintext:key"})), + # GET of requested user object [who is a + # .reseller_admin] ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:usr"}, {"name": "act"}, {"name": ".reseller_admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', - headers={'X-Auth-Admin-User': 'act:adm', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 2) - def test_get_user_reseller_admin_fail_getting_reseller_admin(self): + def test_get_user_reseller_admin_fail_getting_reseller_admin( + self): self.test_auth.app = FakeApp(iter([ # GET of user object (account admin check) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".reseller_admin"}], - "auth": "plaintext:key"})), - # GET of requested user object [who also is a .reseller_admin] + "auth": "plaintext:key"})), + # GET of requested user object [who also is a + # .reseller_admin] ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:usr"}, {"name": "act"}, {"name": ".reseller_admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', - headers={'X-Auth-Admin-User': 'act:adm', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 2) - def test_get_user_super_admin_succeed_getting_reseller_admin(self): + def test_get_user_super_admin_succeed_getting_reseller_admin( + self): self.test_auth.app = FakeApp(iter([ # GET of requested user object ('200 Ok', {}, json.dumps( @@ -2591,9 +3152,11 @@ class TestAuth(unittest.TestCase): {"name": ".reseller_admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.body, json.dumps( {"groups": [{"name": "act:usr"}, {"name": "act"}, @@ -2606,9 +3169,11 @@ class TestAuth(unittest.TestCase): # GET of account container (list objects) ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act/.groups', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 1) @@ -2617,9 +3182,11 @@ class TestAuth(unittest.TestCase): # GET of account container (list objects) ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act/.groups', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) @@ -2628,21 +3195,26 @@ class TestAuth(unittest.TestCase): # GET of account container (list objects) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}, - {"name": "tester", "hash": "etag", "bytes": 104, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.736680"}, - {"name": "tester3", "hash": "etag", "bytes": 86, - "content_type": "application/octet-stream", - "last_modified": "2010-12-03T17:16:28.135530"}])), + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}, + {"name": "tester", "hash": "etag", "bytes": 104, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.736680"}, + {"name": "tester3", "hash": "etag", "bytes": 86, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:28.135530"}])), # GET of user object ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act/.groups', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 2) @@ -2651,9 +3223,11 @@ class TestAuth(unittest.TestCase): # GET of user object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act/usr', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 1) @@ -2662,100 +3236,138 @@ class TestAuth(unittest.TestCase): # GET of user object ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act/usr', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest', - 'X-Auth-User-Key': 'key'} - ).get_response(self.test_auth) + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': + 'supertest', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) def test_put_user_fail_invalid_account(self): resp = Request.blank('/auth/v2/.invalid/usr', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest', - 'X-Auth-User-Key': 'key'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': + 'supertest', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_put_user_fail_invalid_user(self): resp = Request.blank('/auth/v2/act/.usr', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest', - 'X-Auth-User-Key': 'key'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': + 'supertest', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_put_user_fail_no_user_key(self): resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_put_user_reseller_admin_fail_bad_creds(self): self.test_auth.app = FakeApp(iter([ # GET of user object (reseller admin) - # This shouldn't actually get called, checked below + # This shouldn't actually get called, checked + # below ('200 Ok', {}, json.dumps({"groups": [{"name": "act:rdm"}, {"name": "test"}, {"name": ".admin"}, {"name": ".reseller_admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': 'act:rdm', - 'X-Auth-Admin-Key': 'key', - 'X-Auth-User-Key': 'key', - 'X-Auth-User-Reseller-Admin': 'true'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + 'act:rdm', + 'X-Auth-Admin-Key': + 'key', + 'X-Auth-User-Key': + 'key', + 'X-Auth-User-Reseller-Admin': 'true'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 0) self.test_auth.app = FakeApp(iter([ # GET of user object (account admin, but not reseller admin) - # This shouldn't actually get called, checked below + # This shouldn't actually get called, checked + # below ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) + "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': 'act:adm', - 'X-Auth-Admin-Key': 'key', - 'X-Auth-User-Key': 'key', - 'X-Auth-User-Reseller-Admin': 'true'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': + 'key', + 'X-Auth-User-Key': + 'key', + 'X-Auth-User-Reseller-Admin': 'true'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 0) self.test_auth.app = FakeApp(iter([ # GET of user object (regular user) - # This shouldn't actually get called, checked below + # This shouldn't actually get called, checked + # below ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': 'act:adm', - 'X-Auth-Admin-Key': 'key', - 'X-Auth-User-Key': 'key', - 'X-Auth-User-Reseller-Admin': 'true'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': + 'key', + 'X-Auth-User-Key': + 'key', + 'X-Auth-User-Reseller-Admin': 'true'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 0) def test_put_user_account_admin_fail_bad_creds(self): self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but wrong account) + # GET of user object (account admin, but wrong + # account) ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) + "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': 'act2:adm', - 'X-Auth-Admin-Key': 'key', - 'X-Auth-User-Key': 'key', - 'X-Auth-User-Admin': 'true'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + 'act2:adm', + 'X-Auth-Admin-Key': + 'key', + 'X-Auth-User-Key': + 'key', + 'X-Auth-User-Admin': 'true'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) @@ -2764,27 +3376,37 @@ class TestAuth(unittest.TestCase): ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': 'act:usr', - 'X-Auth-Admin-Key': 'key', - 'X-Auth-User-Key': 'key', - 'X-Auth-User-Admin': 'true'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': + 'key', + 'X-Auth-User-Key': + 'key', + 'X-Auth-User-Admin': 'true'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) def test_put_user_regular_fail_bad_creds(self): self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but wrong account) + # GET of user object (account admin, but wrong + # account) ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) + "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': 'act2:adm', - 'X-Auth-Admin-Key': 'key', - 'X-Auth-User-Key': 'key'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + 'act2:adm', + 'X-Auth-Admin-Key': + 'key', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) @@ -2793,97 +3415,130 @@ class TestAuth(unittest.TestCase): ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': 'act:usr', - 'X-Auth-Admin-Key': 'key', - 'X-Auth-User-Key': 'key'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': + 'key', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) def test_put_user_regular_success(self): self.test_auth.app = FakeApp(iter([ - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of user object ('201 Created', {}, '')])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest', - 'X-Auth-User-Key': 'key'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': + 'supertest', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 201) self.assertEquals(self.test_auth.app.calls, 2) - self.assertEquals(json.loads(self.test_auth.app.request.body), + self.assertEquals( + json.loads(self.test_auth.app.request.body), {"groups": [{"name": "act:usr"}, {"name": "act"}], "auth": "plaintext:key"}) def test_put_user_special_chars_success(self): self.test_auth.app = FakeApp(iter([ - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of user object ('201 Created', {}, '')])) resp = Request.blank('/auth/v2/act/u_s-r', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest', - 'X-Auth-User-Key': 'key'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': + 'supertest', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 201) self.assertEquals(self.test_auth.app.calls, 2) - self.assertEquals(json.loads(self.test_auth.app.request.body), + self.assertEquals( + json.loads(self.test_auth.app.request.body), {"groups": [{"name": "act:u_s-r"}, {"name": "act"}], "auth": "plaintext:key"}) def test_put_user_account_admin_success(self): self.test_auth.app = FakeApp(iter([ - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of user object ('201 Created', {}, '')])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest', - 'X-Auth-User-Key': 'key', - 'X-Auth-User-Admin': 'true'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': + 'supertest', + 'X-Auth-User-Key': 'key', + 'X-Auth-User-Admin': 'true'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 201) self.assertEquals(self.test_auth.app.calls, 2) - self.assertEquals(json.loads(self.test_auth.app.request.body), + self.assertEquals( + json.loads(self.test_auth.app.request.body), {"groups": [{"name": "act:usr"}, {"name": "act"}, {"name": ".admin"}], "auth": "plaintext:key"}) def test_put_user_reseller_admin_success(self): self.test_auth.app = FakeApp(iter([ - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of user object ('201 Created', {}, '')])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest', - 'X-Auth-User-Key': 'key', - 'X-Auth-User-Reseller-Admin': 'true'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': + 'supertest', + 'X-Auth-User-Key': 'key', + 'X-Auth-User-Reseller-Admin': 'true'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 201) self.assertEquals(self.test_auth.app.calls, 2) - self.assertEquals(json.loads(self.test_auth.app.request.body), + self.assertEquals( + json.loads(self.test_auth.app.request.body), {"groups": [{"name": "act:usr"}, {"name": "act"}, {"name": ".admin"}, {"name": ".reseller_admin"}], "auth": "plaintext:key"}) def test_put_user_fail_not_found(self): self.test_auth.app = FakeApp(iter([ - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of user object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest', - 'X-Auth-User-Key': 'key'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': + 'supertest', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 2) @@ -2892,25 +3547,33 @@ class TestAuth(unittest.TestCase): # PUT of user object ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest', - 'X-Auth-User-Key': 'key'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': + 'supertest', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) def test_delete_user_bad_creds(self): self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but wrong account) + # GET of user object (account admin, but wrong + # account) ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) + "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'X-Auth-Admin-User': 'act2:adm', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + 'act2:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) @@ -2919,27 +3582,36 @@ class TestAuth(unittest.TestCase): ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'X-Auth-Admin-User': 'act:usr', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) def test_delete_user_invalid_account(self): resp = Request.blank('/auth/v2/.invalid/usr', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_delete_user_invalid_user(self): resp = Request.blank('/auth/v2/act/.invalid', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_delete_user_not_found(self): @@ -2947,10 +3619,13 @@ class TestAuth(unittest.TestCase): # HEAD of user object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 1) @@ -2959,88 +3634,111 @@ class TestAuth(unittest.TestCase): # HEAD of user object ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) def test_delete_user_fail_delete_token(self): self.test_auth.app = FakeApp(iter([ # HEAD of user object - ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), + ('200 Ok', + {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), # DELETE of token ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 2) def test_delete_user_fail_delete_user(self): self.test_auth.app = FakeApp(iter([ # HEAD of user object - ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), + ('200 Ok', + {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), # DELETE of token ('204 No Content', {}, ''), # DELETE of user object ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 3) def test_delete_user_success(self): self.test_auth.app = FakeApp(iter([ # HEAD of user object - ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), + ('200 Ok', + {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), # DELETE of token ('204 No Content', {}, ''), # DELETE of user object ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 3) def test_delete_user_success_missing_user_at_end(self): self.test_auth.app = FakeApp(iter([ # HEAD of user object - ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), + ('200 Ok', + {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), # DELETE of token ('204 No Content', {}, ''), # DELETE of user object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 3) def test_delete_user_success_missing_token(self): self.test_auth.app = FakeApp(iter([ # HEAD of user object - ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), + ('200 Ok', + {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), # DELETE of token ('404 Not Found', {}, ''), # DELETE of user object ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 3) @@ -3051,48 +3749,62 @@ class TestAuth(unittest.TestCase): # DELETE of user object ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 2) def test_validate_token_bad_prefix(self): resp = Request.blank('/auth/v2/.token/BAD_token' - ).get_response(self.test_auth) + ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_validate_token_tmi(self): - resp = Request.blank('/auth/v2/.token/AUTH_token/tmi' - ).get_response(self.test_auth) + resp = Request.blank( + '/auth/v2/.token/AUTH_token/tmi').get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_validate_token_bad_memcache(self): fake_memcache = FakeMemcache() fake_memcache.set('AUTH_/auth/AUTH_token', 'bogus') - resp = Request.blank('/auth/v2/.token/AUTH_token', - environ={'swift.cache': - fake_memcache}).get_response(self.test_auth) + resp = Request.blank( + '/auth/v2/.token/AUTH_token', + environ={'swift.cache': fake_memcache}).get_response( + self.test_auth) self.assertEquals(resp.status_int, 500) def test_validate_token_from_memcache(self): fake_memcache = FakeMemcache() - fake_memcache.set('AUTH_/auth/AUTH_token', (time() + 1, 'act:usr,act')) - resp = Request.blank('/auth/v2/.token/AUTH_token', - environ={'swift.cache': - fake_memcache}).get_response(self.test_auth) + fake_memcache.set( + 'AUTH_/auth/AUTH_token', + (time() + 1, + 'act:usr,act')) + resp = Request.blank( + '/auth/v2/.token/AUTH_token', + environ={'swift.cache': fake_memcache}).get_response( + self.test_auth) self.assertEquals(resp.status_int, 204) - self.assertEquals(resp.headers.get('x-auth-groups'), 'act:usr,act') + self.assertEquals( + resp.headers.get('x-auth-groups'), + 'act:usr,act') self.assert_(float(resp.headers['x-auth-ttl']) < 1, resp.headers['x-auth-ttl']) def test_validate_token_from_memcache_expired(self): fake_memcache = FakeMemcache() - fake_memcache.set('AUTH_/auth/AUTH_token', (time() - 1, 'act:usr,act')) - resp = Request.blank('/auth/v2/.token/AUTH_token', - environ={'swift.cache': - fake_memcache}).get_response(self.test_auth) + fake_memcache.set( + 'AUTH_/auth/AUTH_token', + (time() - 1, + 'act:usr,act')) + resp = Request.blank( + '/auth/v2/.token/AUTH_token', + environ={'swift.cache': fake_memcache}).get_response( + self.test_auth) self.assertEquals(resp.status_int, 404) self.assert_('x-auth-groups' not in resp.headers) self.assert_('x-auth-ttl' not in resp.headers) @@ -3106,7 +3818,9 @@ class TestAuth(unittest.TestCase): ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 1) - self.assertEquals(resp.headers.get('x-auth-groups'), 'act:usr,act') + self.assertEquals( + resp.headers.get('x-auth-groups'), + 'act:usr,act') self.assert_(float(resp.headers['x-auth-ttl']) < 1, resp.headers['x-auth-ttl']) @@ -3126,8 +3840,10 @@ class TestAuth(unittest.TestCase): self.test_auth.app = FakeApp(iter([ # GET of token object ('200 Ok', {}, json.dumps({'account_id': 'AUTH_cfa', 'groups': - [{'name': 'act:usr'}, {'name': 'act'}, {'name': '.admin'}], - 'expires': time() + 1}))])) + [{'name': 'act:usr'}, + {'name': 'act'}, + {'name': '.admin'}], + 'expires': time() + 1}))])) resp = Request.blank('/auth/v2/.token/AUTH_token' ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) @@ -3139,33 +3855,47 @@ class TestAuth(unittest.TestCase): def test_get_conn_default(self): conn = self.test_auth.get_conn() - self.assertEquals(conn.__class__, auth.HTTPConnection) + self.assertEquals( + conn.__class__, + auth.HTTPConnection) self.assertEquals(conn.host, '127.0.0.1') self.assertEquals(conn.port, 8080) def test_get_conn_default_https(self): - local_auth = auth.filter_factory({'super_admin_key': 'supertest', - 'default_swift_cluster': 'local#https://1.2.3.4/v1'})(FakeApp()) + local_auth = auth.filter_factory( + {'super_admin_key': 'supertest', + 'default_swift_cluster': 'local#https://1.2.3.4/v1'})(FakeApp()) conn = local_auth.get_conn() - self.assertEquals(conn.__class__, auth.HTTPSConnection) + self.assertEquals( + conn.__class__, + auth.HTTPSConnection) self.assertEquals(conn.host, '1.2.3.4') self.assertEquals(conn.port, 443) def test_get_conn_overridden(self): - local_auth = auth.filter_factory({'super_admin_key': 'supertest', - 'default_swift_cluster': 'local#https://1.2.3.4/v1'})(FakeApp()) + local_auth = auth.filter_factory( + {'super_admin_key': 'supertest', + 'default_swift_cluster': 'local#https://1.2.3.4/v1'})(FakeApp()) conn = \ - local_auth.get_conn(urlparsed=auth.urlparse('http://5.6.7.8/v1')) - self.assertEquals(conn.__class__, auth.HTTPConnection) + local_auth.get_conn( + urlparsed=auth.urlparse('http://5.6.7.8/v1')) + self.assertEquals( + conn.__class__, + auth.HTTPConnection) self.assertEquals(conn.host, '5.6.7.8') self.assertEquals(conn.port, 80) def test_get_conn_overridden_https(self): - local_auth = auth.filter_factory({'super_admin_key': 'supertest', - 'default_swift_cluster': 'local#http://1.2.3.4/v1'})(FakeApp()) + local_auth = auth.filter_factory( + {'super_admin_key': 'supertest', + 'default_swift_cluster': 'local#http://1.2.3.4/v1'})(FakeApp()) conn = \ - local_auth.get_conn(urlparsed=auth.urlparse('https://5.6.7.8/v1')) - self.assertEquals(conn.__class__, auth.HTTPSConnection) + local_auth.get_conn( + urlparsed=auth.urlparse( + 'https://5.6.7.8/v1')) + self.assertEquals( + conn.__class__, + auth.HTTPSConnection) self.assertEquals(conn.host, '5.6.7.8') self.assertEquals(conn.port, 443) @@ -3173,32 +3903,43 @@ class TestAuth(unittest.TestCase): exc = None try: self.test_auth.get_itoken({}) - except Exception, err: + except Exception as err: exc = err self.assertEquals(str(exc), 'No memcache set up; required for Swauth middleware') def test_get_itoken_success(self): fmc = FakeMemcache() - itk = self.test_auth.get_itoken({'swift.cache': fmc}) + itk = self.test_auth.get_itoken( + {'swift.cache': fmc}) self.assert_(itk.startswith('AUTH_itk'), itk) expires, groups = fmc.get('AUTH_/auth/%s' % itk) self.assert_(expires > time(), expires) - self.assertEquals(groups, '.auth,.reseller_admin,AUTH_.auth') + self.assertEquals( + groups, + '.auth,.reseller_admin,AUTH_.auth') def test_get_admin_detail_fail_no_colon(self): self.test_auth.app = FakeApp(iter([])) - self.assertEquals(self.test_auth.get_admin_detail(Request.blank('/')), - None) - self.assertEquals(self.test_auth.get_admin_detail(Request.blank('/', - headers={'X-Auth-Admin-User': 'usr'})), None) - self.assertRaises(StopIteration, self.test_auth.get_admin_detail, + self.assertEquals( + self.test_auth.get_admin_detail( + Request.blank('/')), + None) + self.assertEquals( + self.test_auth.get_admin_detail( + Request.blank('/', + headers={'X-Auth-Admin-User': 'usr'})), None) + self.assertRaises( + StopIteration, self.test_auth.get_admin_detail, Request.blank('/', headers={'X-Auth-Admin-User': 'act:usr'})) def test_get_admin_detail_fail_user_not_found(self): - self.test_auth.app = FakeApp(iter([('404 Not Found', {}, '')])) - self.assertEquals(self.test_auth.get_admin_detail(Request.blank('/', - headers={'X-Auth-Admin-User': 'act:usr'})), None) + self.test_auth.app = FakeApp( + iter([('404 Not Found', {}, '')])) + self.assertEquals( + self.test_auth.get_admin_detail( + Request.blank('/', + headers={'X-Auth-Admin-User': 'act:usr'})), None) self.assertEquals(self.test_auth.app.calls, 1) def test_get_admin_detail_fail_get_user_error(self): @@ -3206,12 +3947,13 @@ class TestAuth(unittest.TestCase): ('503 Service Unavailable', {}, '')])) exc = None try: - self.test_auth.get_admin_detail(Request.blank('/', - headers={'X-Auth-Admin-User': 'act:usr'})) - except Exception, err: + self.test_auth.get_admin_detail( + Request.blank('/', + headers={'X-Auth-Admin-User': 'act:usr'})) + except Exception as err: exc = err self.assertEquals(str(exc), 'Could not get admin user object: ' - '/v1/AUTH_.auth/act/usr 503 Service Unavailable') + '/v1/AUTH_.auth/act/usr 503 Service Unavailable') self.assertEquals(self.test_auth.app.calls, 1) def test_get_admin_detail_success(self): @@ -3220,80 +3962,118 @@ class TestAuth(unittest.TestCase): json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]}))])) - detail = self.test_auth.get_admin_detail(Request.blank('/', - headers={'X-Auth-Admin-User': 'act:usr'})) + detail = self.test_auth.get_admin_detail( + Request.blank('/', + headers={'X-Auth-Admin-User': 'act:usr'})) self.assertEquals(self.test_auth.app.calls, 1) - self.assertEquals(detail, {'account': 'act', - 'auth': 'plaintext:key', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}, - {'name': '.admin'}]}) + self.assertEquals( + detail, {'account': 'act', + 'auth': 'plaintext:key', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}, + {'name': '.admin'}]}) def test_credentials_match_success(self): self.assert_(self.test_auth.credentials_match( {'auth': 'plaintext:key'}, 'key')) def test_credentials_match_fail_no_details(self): - self.assert_(not self.test_auth.credentials_match(None, 'notkey')) + self.assert_( + not self.test_auth.credentials_match(None, 'notkey')) def test_credentials_match_fail_plaintext(self): self.assert_(not self.test_auth.credentials_match( {'auth': 'plaintext:key'}, 'notkey')) def test_is_super_admin_success(self): - self.assert_(self.test_auth.is_super_admin(Request.blank('/', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}))) + self.assert_( + self.test_auth.is_super_admin( + Request.blank( + '/', + headers={'X-Auth-Admin-User': '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}))) def test_is_super_admin_fail_bad_key(self): - self.assert_(not self.test_auth.is_super_admin(Request.blank('/', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'bad'}))) - self.assert_(not self.test_auth.is_super_admin(Request.blank('/', - headers={'X-Auth-Admin-User': '.super_admin'}))) - self.assert_(not self.test_auth.is_super_admin(Request.blank('/'))) + self.assert_( + not self.test_auth.is_super_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'bad'}))) + self.assert_( + not self.test_auth.is_super_admin( + Request.blank('/', + headers={'X-Auth-Admin-User': '.super_admin'}))) + self.assert_( + not self.test_auth.is_super_admin(Request.blank('/'))) def test_is_super_admin_fail_bad_user(self): - self.assert_(not self.test_auth.is_super_admin(Request.blank('/', - headers={'X-Auth-Admin-User': 'bad', - 'X-Auth-Admin-Key': 'supertest'}))) - self.assert_(not self.test_auth.is_super_admin(Request.blank('/', - headers={'X-Auth-Admin-Key': 'supertest'}))) - self.assert_(not self.test_auth.is_super_admin(Request.blank('/'))) + self.assert_( + not self.test_auth.is_super_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'bad', + 'X-Auth-Admin-Key': 'supertest'}))) + self.assert_( + not self.test_auth.is_super_admin( + Request.blank('/', + headers={'X-Auth-Admin-Key': 'supertest'}))) + self.assert_( + not self.test_auth.is_super_admin(Request.blank('/'))) def test_is_reseller_admin_success_is_super_admin(self): - self.assert_(self.test_auth.is_reseller_admin(Request.blank('/', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}))) + self.assert_( + self.test_auth.is_reseller_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}))) - def test_is_reseller_admin_success_called_get_admin_detail(self): + def test_is_reseller_admin_success_called_get_admin_detail( + self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps({'auth': 'plaintext:key', 'groups': [{'name': 'act:rdm'}, {'name': 'act'}, {'name': '.admin'}, {'name': '.reseller_admin'}]}))])) - self.assert_(self.test_auth.is_reseller_admin(Request.blank('/', - headers={'X-Auth-Admin-User': 'act:rdm', - 'X-Auth-Admin-Key': 'key'}))) + self.assert_( + self.test_auth.is_reseller_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'act:rdm', + 'X-Auth-Admin-Key': 'key'}))) - def test_is_reseller_admin_fail_only_account_admin(self): + def test_is_reseller_admin_fail_only_account_admin( + self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps({'auth': 'plaintext:key', 'groups': [{'name': 'act:adm'}, {'name': 'act'}, {'name': '.admin'}]}))])) - self.assert_(not self.test_auth.is_reseller_admin(Request.blank('/', - headers={'X-Auth-Admin-User': 'act:adm', - 'X-Auth-Admin-Key': 'key'}))) + self.assert_( + not self.test_auth.is_reseller_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'}))) def test_is_reseller_admin_fail_regular_user(self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps({'auth': 'plaintext:key', 'groups': [{'name': 'act:usr'}, {'name': 'act'}]}))])) - self.assert_(not self.test_auth.is_reseller_admin(Request.blank('/', - headers={'X-Auth-Admin-User': 'act:usr', - 'X-Auth-Admin-Key': 'key'}))) + self.assert_( + not self.test_auth.is_reseller_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': 'key'}))) def test_is_reseller_admin_fail_bad_key(self): self.test_auth.app = FakeApp(iter([ @@ -3302,25 +4082,38 @@ class TestAuth(unittest.TestCase): 'groups': [{'name': 'act:rdm'}, {'name': 'act'}, {'name': '.admin'}, {'name': '.reseller_admin'}]}))])) - self.assert_(not self.test_auth.is_reseller_admin(Request.blank('/', - headers={'X-Auth-Admin-User': 'act:rdm', - 'X-Auth-Admin-Key': 'bad'}))) + self.assert_( + not self.test_auth.is_reseller_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'act:rdm', + 'X-Auth-Admin-Key': 'bad'}))) def test_is_account_admin_success_is_super_admin(self): - self.assert_(self.test_auth.is_account_admin(Request.blank('/', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}), 'act')) + self.assert_( + self.test_auth.is_account_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}), 'act')) - def test_is_account_admin_success_is_reseller_admin(self): + def test_is_account_admin_success_is_reseller_admin( + self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps({'auth': 'plaintext:key', 'groups': [{'name': 'act:rdm'}, {'name': 'act'}, {'name': '.admin'}, {'name': '.reseller_admin'}]}))])) - self.assert_(self.test_auth.is_account_admin(Request.blank('/', - headers={'X-Auth-Admin-User': 'act:rdm', - 'X-Auth-Admin-Key': 'key'}), 'act')) + self.assert_( + self.test_auth.is_account_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'act:rdm', + 'X-Auth-Admin-Key': 'key'}), 'act')) def test_is_account_admin_success(self): self.test_auth.app = FakeApp(iter([ @@ -3328,28 +4121,41 @@ class TestAuth(unittest.TestCase): json.dumps({'auth': 'plaintext:key', 'groups': [{'name': 'act:adm'}, {'name': 'act'}, {'name': '.admin'}]}))])) - self.assert_(self.test_auth.is_account_admin(Request.blank('/', - headers={'X-Auth-Admin-User': 'act:adm', - 'X-Auth-Admin-Key': 'key'}), 'act')) + self.assert_( + self.test_auth.is_account_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'}), 'act')) - def test_is_account_admin_fail_account_admin_different_account(self): + def test_is_account_admin_fail_account_admin_different_account( + self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps({'auth': 'plaintext:key', 'groups': [{'name': 'act2:adm'}, {'name': 'act2'}, {'name': '.admin'}]}))])) - self.assert_(not self.test_auth.is_account_admin(Request.blank('/', - headers={'X-Auth-Admin-User': 'act2:adm', - 'X-Auth-Admin-Key': 'key'}), 'act')) + self.assert_( + not self.test_auth.is_account_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'act2:adm', + 'X-Auth-Admin-Key': 'key'}), 'act')) def test_is_account_admin_fail_regular_user(self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps({'auth': 'plaintext:key', 'groups': [{'name': 'act:usr'}, {'name': 'act'}]}))])) - self.assert_(not self.test_auth.is_account_admin(Request.blank('/', - headers={'X-Auth-Admin-User': 'act:usr', - 'X-Auth-Admin-Key': 'key'}), 'act')) + self.assert_( + not self.test_auth.is_account_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': 'key'}), 'act')) def test_is_account_admin_fail_bad_key(self): self.test_auth.app = FakeApp(iter([ @@ -3358,25 +4164,34 @@ class TestAuth(unittest.TestCase): 'groups': [{'name': 'act:rdm'}, {'name': 'act'}, {'name': '.admin'}, {'name': '.reseller_admin'}]}))])) - self.assert_(not self.test_auth.is_account_admin(Request.blank('/', - headers={'X-Auth-Admin-User': 'act:rdm', - 'X-Auth-Admin-Key': 'bad'}), 'act')) - - def test_reseller_admin_but_account_is_internal_use_only(self): + self.assert_( + not self.test_auth.is_account_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'act:rdm', + 'X-Auth-Admin-Key': 'bad'}), 'act')) + + def test_reseller_admin_but_account_is_internal_use_only( + self): req = Request.blank('/v1/AUTH_.auth', environ={'REQUEST_METHOD': 'GET'}) req.remote_user = 'act:usr,act,.reseller_admin' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) - def test_reseller_admin_but_account_is_exactly_reseller_prefix(self): - req = Request.blank('/v1/AUTH_', environ={'REQUEST_METHOD': 'GET'}) + def test_reseller_admin_but_account_is_exactly_reseller_prefix( + self): + req = Request.blank( + '/v1/AUTH_', + environ={'REQUEST_METHOD': 'GET'}) req.remote_user = 'act:usr,act,.reseller_admin' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) - def _get_token_success_v1_0_encoded(self, saved_user, saved_key, sent_user, - sent_key): + def _get_token_success_v1_0_encoded( + self, saved_user, saved_key, sent_user, + sent_key): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, @@ -3384,7 +4199,8 @@ class TestAuth(unittest.TestCase): "groups": [{'name': saved_user}, {'name': "act"}, {'name': ".admin"}]})), # GET of account - ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object @@ -3392,19 +4208,23 @@ class TestAuth(unittest.TestCase): # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank('/auth/v1.0', + resp = Request.blank( + '/auth/v1.0', headers={'X-Auth-User': sent_user, 'X-Auth-Key': sent_key}).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) - self.assert_(resp.headers.get('x-auth-token', - '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) + self.assert_( + resp.headers.get('x-auth-token', + '').startswith('AUTH_tk'), + resp.headers.get('x-auth-token')) self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals(json.loads(resp.body), + self.assertEquals( + json.loads(resp.body), {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 5) def test_get_token_success_v1_0_encoded1(self): @@ -3420,12 +4240,17 @@ class TestAuth(unittest.TestCase): 'act:u s r', 'k:e:y', 'act%3au%20s%20r', 'k%3Ae%3ay') def test_allowed_sync_hosts(self): - a = auth.filter_factory({'super_admin_key': 'supertest'})(FakeApp()) - self.assertEquals(a.allowed_sync_hosts, ['127.0.0.1']) - a = auth.filter_factory({'super_admin_key': 'supertest', + a = auth.filter_factory( + {'super_admin_key': 'supertest'})(FakeApp()) + self.assertEquals( + a.allowed_sync_hosts, + ['127.0.0.1']) + a = auth.filter_factory({ + 'super_admin_key': 'supertest', 'allowed_sync_hosts': - '1.1.1.1,2.1.1.1, 3.1.1.1 , 4.1.1.1,, , 5.1.1.1'})(FakeApp()) - self.assertEquals(a.allowed_sync_hosts, + '1.1.1.1,2.1.1.1, 3.1.1.1 , 4.1.1.1,, , 5.1.1.1'})(FakeApp()) + self.assertEquals( + a.allowed_sync_hosts, ['1.1.1.1', '2.1.1.1', '3.1.1.1', '4.1.1.1', '5.1.1.1']) def test_reseller_admin_is_owner(self): @@ -3434,20 +4259,24 @@ class TestAuth(unittest.TestCase): def mitm_authorize(req): rv = orig_authorize(req) - owner_values.append(req.environ.get('swift_owner', False)) + owner_values.append( + req.environ.get('swift_owner', False)) return rv self.test_auth.authorize = mitm_authorize self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, - json.dumps({'account': 'other', 'user': 'other:usr', - 'account_id': 'AUTH_other', - 'groups': [{'name': 'other:usr'}, {'name': 'other'}, - {'name': '.reseller_admin'}], - 'expires': time() + 60})), + json.dumps( + {'account': 'other', 'user': 'other:usr', + 'account_id': 'AUTH_other', + 'groups': [{'name': 'other:usr'}, {'name': 'other'}, + {'name': '.reseller_admin'}], + 'expires': time() + 60})), ('204 No Content', {}, '')])) - req = Request.blank('/v1/AUTH_cfa', headers={'X-Auth-Token': 'AUTH_t'}) + req = Request.blank( + '/v1/AUTH_cfa', + headers={'X-Auth-Token': 'AUTH_t'}) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(owner_values, [True]) @@ -3458,20 +4287,24 @@ class TestAuth(unittest.TestCase): def mitm_authorize(req): rv = orig_authorize(req) - owner_values.append(req.environ.get('swift_owner', False)) + owner_values.append( + req.environ.get('swift_owner', False)) return rv self.test_auth.authorize = mitm_authorize self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, - json.dumps({'account': 'act', 'user': 'act:usr', - 'account_id': 'AUTH_cfa', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}, - {'name': '.admin'}], - 'expires': time() + 60})), + json.dumps( + {'account': 'act', 'user': 'act:usr', + 'account_id': 'AUTH_cfa', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}, + {'name': '.admin'}], + 'expires': time() + 60})), ('204 No Content', {}, '')])) - req = Request.blank('/v1/AUTH_cfa', headers={'X-Auth-Token': 'AUTH_t'}) + req = Request.blank( + '/v1/AUTH_cfa', + headers={'X-Auth-Token': 'AUTH_t'}) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(owner_values, [True]) @@ -3482,17 +4315,21 @@ class TestAuth(unittest.TestCase): def mitm_authorize(req): rv = orig_authorize(req) - owner_values.append(req.environ.get('swift_owner', False)) + owner_values.append( + req.environ.get('swift_owner', False)) return rv self.test_auth.authorize = mitm_authorize self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, - json.dumps({'account': 'act', 'user': 'act:usr', - 'account_id': 'AUTH_cfa', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}], - 'expires': time() + 60})), + json.dumps( + {'account': 'act', 'user': 'act:usr', + 'account_id': 'AUTH_cfa', + 'groups': + [{'name': 'act:usr'}, { + 'name': 'act'}], + 'expires': time() + 60})), ('204 No Content', {}, '')]), acl='act:usr') req = Request.blank('/v1/AUTH_cfa/c', headers={'X-Auth-Token': 'AUTH_t'}) @@ -3501,87 +4338,119 @@ class TestAuth(unittest.TestCase): self.assertEquals(owner_values, [False]) def test_sync_request_success(self): - self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]), - sync_key='secret') + self.test_auth.app = FakeApp( + iter([('204 No Content', {}, '')]), + sync_key='secret') req = Request.blank('/v1/AUTH_cfa/c/o', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'x-container-sync-key': 'secret', - 'x-timestamp': '123.456'}) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'x-container-sync-key': + 'secret', + 'x-timestamp': '123.456'}) req.remote_addr = '127.0.0.1' resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 204) def test_sync_request_fail_key(self): - self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]), - sync_key='secret') + self.test_auth.app = FakeApp( + iter([('204 No Content', {}, '')]), + sync_key='secret') req = Request.blank('/v1/AUTH_cfa/c/o', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'x-container-sync-key': 'wrongsecret', - 'x-timestamp': '123.456'}) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'x-container-sync-key': + 'wrongsecret', + 'x-timestamp': '123.456'}) req.remote_addr = '127.0.0.1' resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 401) - self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]), - sync_key='othersecret') + self.test_auth.app = FakeApp( + iter([('204 No Content', {}, '')]), + sync_key='othersecret') req = Request.blank('/v1/AUTH_cfa/c/o', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'x-container-sync-key': 'secret', - 'x-timestamp': '123.456'}) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'x-container-sync-key': + 'secret', + 'x-timestamp': '123.456'}) req.remote_addr = '127.0.0.1' resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 401) - self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]), - sync_key=None) + self.test_auth.app = FakeApp( + iter([('204 No Content', {}, '')]), + sync_key=None) req = Request.blank('/v1/AUTH_cfa/c/o', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'x-container-sync-key': 'secret', - 'x-timestamp': '123.456'}) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'x-container-sync-key': + 'secret', + 'x-timestamp': '123.456'}) req.remote_addr = '127.0.0.1' resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 401) def test_sync_request_fail_no_timestamp(self): - self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]), - sync_key='secret') + self.test_auth.app = FakeApp( + iter([('204 No Content', {}, '')]), + sync_key='secret') req = Request.blank('/v1/AUTH_cfa/c/o', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'x-container-sync-key': 'secret'}) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={'x-container-sync-key': 'secret'}) req.remote_addr = '127.0.0.1' resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 401) def test_sync_request_fail_sync_host(self): - self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]), - sync_key='secret') + self.test_auth.app = FakeApp( + iter([('204 No Content', {}, '')]), + sync_key='secret') req = Request.blank('/v1/AUTH_cfa/c/o', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'x-container-sync-key': 'secret', - 'x-timestamp': '123.456'}) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'x-container-sync-key': + 'secret', + 'x-timestamp': '123.456'}) req.remote_addr = '127.0.0.2' resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 401) def test_sync_request_success_lb_sync_host(self): - self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]), - sync_key='secret') + self.test_auth.app = FakeApp( + iter([('204 No Content', {}, '')]), + sync_key='secret') req = Request.blank('/v1/AUTH_cfa/c/o', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'x-container-sync-key': 'secret', - 'x-timestamp': '123.456', - 'x-forwarded-for': '127.0.0.1'}) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'x-container-sync-key': + 'secret', + 'x-timestamp': + '123.456', + 'x-forwarded-for': '127.0.0.1'}) req.remote_addr = '127.0.0.2' resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 204) - self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]), - sync_key='secret') + self.test_auth.app = FakeApp( + iter([('204 No Content', {}, '')]), + sync_key='secret') req = Request.blank('/v1/AUTH_cfa/c/o', - environ={'REQUEST_METHOD': 'DELETE'}, - headers={'x-container-sync-key': 'secret', - 'x-timestamp': '123.456', - 'x-cluster-client-ip': '127.0.0.1'}) + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'x-container-sync-key': + 'secret', + 'x-timestamp': + '123.456', + 'x-cluster-client-ip': '127.0.0.1'}) req.remote_addr = '127.0.0.2' resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 204) @@ -3593,7 +4462,8 @@ class TestAuth(unittest.TestCase): def test_override_asked_for_but_not_allowed(self): self.test_auth = \ - auth.filter_factory({'allow_overrides': 'false'})(FakeApp()) + auth.filter_factory( + {'allow_overrides': 'false'})(FakeApp()) req = self._make_request('/v1/AUTH_account', environ={'swift.authorize_override': True}) resp = req.get_response(self.test_auth) @@ -3603,31 +4473,38 @@ class TestAuth(unittest.TestCase): def test_override_asked_for_and_allowed(self): self.test_auth = \ - auth.filter_factory({'allow_overrides': 'true'})(FakeApp()) + auth.filter_factory( + {'allow_overrides': 'true'})(FakeApp()) req = self._make_request('/v1/AUTH_account', environ={'swift.authorize_override': True}) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 404) - self.assertTrue('swift.authorize' not in resp.environ) + self.assertTrue( + 'swift.authorize' not in resp.environ) def test_override_default_allowed(self): req = self._make_request('/v1/AUTH_account', environ={'swift.authorize_override': True}) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 404) - self.assertTrue('swift.authorize' not in resp.environ) + self.assertTrue( + 'swift.authorize' not in resp.environ) def test_token_too_long(self): req = self._make_request('/v1/AUTH_account', headers={ 'x-auth-token': 'a' * MAX_TOKEN_LENGTH}) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 401) - self.assertNotEquals(resp.body, 'Token exceeds maximum length.') + self.assertNotEquals( + resp.body, + 'Token exceeds maximum length.') req = self._make_request('/v1/AUTH_account', headers={ 'x-auth-token': 'a' * (MAX_TOKEN_LENGTH + 1)}) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 400) - self.assertEquals(resp.body, 'Token exceeds maximum length.') + self.assertEquals( + resp.body, + 'Token exceeds maximum length.') def test_crazy_authorization(self): req = self._make_request('/v1/AUTH_account', headers={ diff --git a/tox.ini b/tox.ini index f81069a..61ecb9b 100644 --- a/tox.ini +++ b/tox.ini @@ -39,5 +39,5 @@ commands = {posargs} [flake8] ignore = H builtins = _ -exclude = .venv,.tox,dist,doc,test,*egg,gswauth +exclude = .venv,.tox,dist,doc,test,*egg show-source = True -- cgit