# Copyright (c) 2010-2011 OpenStack, LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. try: import simplejson as json except ImportError: import json import unittest from nose import SkipTest from contextlib import contextmanager from time import time from swift.common.swob import Request, Response from gluster.swift.common.middleware.gswauth.swauth import middleware as auth from gluster.swift.common.middleware.gswauth.swauth.authtypes import MAX_TOKEN_LENGTH DEFAULT_TOKEN_LIFE = 86400 MAX_TOKEN_LIFE = 100000 class FakeMemcache(object): def __init__(self): self.store = {} def get(self, key): return self.store.get(key) def set(self, key, value, timeout=0, time=0): self.store[key] = value return True def incr(self, key, timeout=0, time=0): self.store[key] = self.store.setdefault(key, 0) + 1 return self.store[key] @contextmanager def soft_lock(self, key, timeout=0, retries=5, time=0): yield True def delete(self, key): try: del self.store[key] except Exception: pass return True class FakeApp(object): def __init__( self, status_headers_body_iter=None, acl=None, sync_key=None): self.calls = 0 self.status_headers_body_iter = status_headers_body_iter if not self.status_headers_body_iter: self.status_headers_body_iter = iter( [('404 Not Found', {}, '')]) self.acl = acl self.sync_key = sync_key def __call__(self, env, start_response): self.calls += 1 self.request = Request.blank('', environ=env) if self.acl: self.request.acl = self.acl if self.sync_key: self.request.environ[ 'swift_sync_key'] = self.sync_key if 'swift.authorize' in env: resp = env['swift.authorize'](self.request) if resp: return resp(env, start_response) status, headers, body = self.status_headers_body_iter.next( ) return Response(status=status, headers=headers, body=body)(env, start_response) class FakeConn(object): def __init__(self, status_headers_body_iter=None): self.calls = 0 self.status_headers_body_iter = status_headers_body_iter if not self.status_headers_body_iter: self.status_headers_body_iter = iter( [('404 Not Found', {}, '')]) def request(self, method, path, headers): self.calls += 1 self.request_path = path self.status, self.headers, self.body = \ self.status_headers_body_iter.next() self.status, self.reason = self.status.split(' ', 1) self.status = int(self.status) def getresponse(self): return self def read(self): body = self.body self.body = '' return body class TestAuth(unittest.TestCase): def setUp(self): self.test_auth = \ auth.filter_factory({ 'super_admin_key': 'supertest', 'token_life': str(DEFAULT_TOKEN_LIFE), 'max_token_life': str(MAX_TOKEN_LIFE)})(FakeApp()) def test_super_admin_key_not_required(self): auth.filter_factory({})(FakeApp()) def test_reseller_prefix_init(self): app = FakeApp() ath = auth.filter_factory( {'super_admin_key': 'supertest'})(app) self.assertEquals(ath.reseller_prefix, 'AUTH_') ath = auth.filter_factory( {'super_admin_key': 'supertest', 'reseller_prefix': 'TEST'})(app) self.assertEquals(ath.reseller_prefix, 'TEST_') ath = auth.filter_factory( {'super_admin_key': 'supertest', 'reseller_prefix': 'TEST_'})(app) self.assertEquals(ath.reseller_prefix, 'TEST_') def test_auth_prefix_init(self): app = FakeApp() ath = auth.filter_factory( {'super_admin_key': 'supertest'})(app) self.assertEquals(ath.auth_prefix, '/auth/') ath = auth.filter_factory( {'super_admin_key': 'supertest', 'auth_prefix': ''})(app) self.assertEquals(ath.auth_prefix, '/auth/') ath = auth.filter_factory( {'super_admin_key': 'supertest', 'auth_prefix': '/test/'})(app) self.assertEquals(ath.auth_prefix, '/test/') ath = auth.filter_factory( {'super_admin_key': 'supertest', 'auth_prefix': '/test'})(app) self.assertEquals(ath.auth_prefix, '/test/') ath = auth.filter_factory( {'super_admin_key': 'supertest', 'auth_prefix': 'test/'})(app) self.assertEquals(ath.auth_prefix, '/test/') ath = auth.filter_factory( {'super_admin_key': 'supertest', 'auth_prefix': 'test'})(app) self.assertEquals(ath.auth_prefix, '/test/') def test_no_auth_type_init(self): app = FakeApp() ath = auth.filter_factory({})(app) self.assertEquals(ath.auth_type, 'Plaintext') def test_valid_auth_type_init(self): app = FakeApp() ath = auth.filter_factory( {'auth_type': 'sha1'})(app) self.assertEquals(ath.auth_type, 'Sha1') ath = auth.filter_factory( {'auth_type': 'plaintext'})(app) self.assertEquals(ath.auth_type, 'Plaintext') def test_invalid_auth_type_init(self): app = FakeApp() exc = None try: auth.filter_factory( {'auth_type': 'NONEXISTANT'})(app) except Exception as err: exc = err self.assertEquals(str(exc), 'Invalid auth_type in config file: %s' % 'Nonexistant') def test_default_metadata_volume_init(self): app = FakeApp() ath = auth.filter_factory({})(app) self.assertEquals(ath.metadata_volume, 'gsmetadata') def test_conf_metadata_volume_init(self): app = FakeApp() ath = auth.filter_factory( {'metadata_volume': 'meta_test'})(app) self.assertEquals(ath.metadata_volume, 'meta_test') ath = auth.filter_factory( {'metadata_volume': 'new_meta_volume'})(app) self.assertEquals(ath.metadata_volume, 'new_meta_volume') def test_default_swift_cluster_init(self): app = FakeApp() self.assertRaises(Exception, auth.filter_factory({ 'super_admin_key': 'supertest', 'default_swift_cluster': 'local#badscheme://host/path'}), app) ath = auth.filter_factory( {'super_admin_key': 'supertest'})(app) self.assertEquals(ath.default_swift_cluster, 'local#http://127.0.0.1:8080/v1') ath = auth.filter_factory({ 'super_admin_key': 'supertest', 'default_swift_cluster': 'local#http://host/path'})(app) self.assertEquals(ath.default_swift_cluster, 'local#http://host/path') ath = auth.filter_factory({ 'super_admin_key': 'supertest', 'default_swift_cluster': 'local#https://host/path/'})(app) self.assertEquals(ath.dsc_url, 'https://host/path') self.assertEquals(ath.dsc_url2, 'https://host/path') ath = auth.filter_factory({ 'super_admin_key': 'supertest', 'default_swift_cluster': 'local#https://host/path/#http://host2/path2/'})(app) self.assertEquals(ath.dsc_url, 'https://host/path') self.assertEquals( ath.dsc_url2, 'http://host2/path2') def test_top_level_denied(self): resp = Request.blank( '/').get_response(self.test_auth) self.assertEquals(resp.status_int, 401) def test_anon(self): resp = Request.blank( '/v1/AUTH_account').get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(resp.environ['swift.authorize'], self.test_auth.authorize) def test_auth_deny_non_reseller_prefix(self): resp = Request.blank( '/v1/BLAH_account', headers={'X-Auth-Token': 'BLAH_t'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(resp.environ['swift.authorize'], self.test_auth.denied_response) def test_auth_deny_non_reseller_prefix_no_override( self): fake_authorize = lambda x: Response( status='500 Fake') resp = Request.blank( '/v1/BLAH_account', headers={'X-Auth-Token': 'BLAH_t'}, environ={'swift.authorize': fake_authorize}).get_response( self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(resp.environ['swift.authorize'], fake_authorize) def test_auth_no_reseller_prefix_deny(self): # Ensures that when we have no reseller prefix, we don't deny a request # outright but set up a denial swift.authorize and pass the request on # down the chain. local_app = FakeApp() local_auth = auth.filter_factory( {'super_admin_key': 'supertest', 'reseller_prefix': ''})(local_app) resp = Request.blank( '/v1/account', headers={'X-Auth-Token': 't'}).get_response(local_auth) self.assertEquals(resp.status_int, 401) # one for checking auth, two for request passed # along self.assertEquals(local_app.calls, 2) self.assertEquals(resp.environ['swift.authorize'], local_auth.denied_response) def test_auth_no_reseller_prefix_allow(self): # Ensures that when we have no reseller prefix, we can still allow # access if our auth server accepts requests local_app = FakeApp(iter([ ('200 Ok', {}, json.dumps( {'account': 'act', 'user': 'act:usr', 'account_id': 'AUTH_cfa', 'groups': [{'name': 'act:usr'}, {'name': 'act'}, {'name': '.admin'}], 'expires': time() + 60})), ('204 No Content', {}, '')])) local_auth = auth.filter_factory( {'super_admin_key': 'supertest', 'reseller_prefix': ''})(local_app) resp = Request.blank( '/v1/act', headers={'X-Auth-Token': 't'}).get_response(local_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(local_app.calls, 2) self.assertEquals(resp.environ['swift.authorize'], local_auth.authorize) def test_auth_no_reseller_prefix_no_token(self): # Check that normally we set up a call back to our # authorize. local_auth = \ auth.filter_factory( {'super_admin_key': 'supertest', 'reseller_prefix': ''})(FakeApp(iter([]))) resp = Request.blank( '/v1/account').get_response( local_auth) self.assertEquals(resp.status_int, 401) self.assertEquals( resp.environ['swift.authorize'], local_auth.authorize) # Now make sure we don't override an existing swift.authorize when we # have no reseller prefix. local_auth = \ auth.filter_factory( {'super_admin_key': 'supertest', 'reseller_prefix': ''})(FakeApp()) local_authorize = lambda req: Response('test') resp = Request.blank( '/v1/account', environ={'swift.authorize': local_authorize}).get_response(local_auth) self.assertEquals(resp.status_int, 200) self.assertEquals( resp.environ['swift.authorize'], local_authorize) def test_auth_fail(self): resp = Request.blank( '/v1/AUTH_cfa', headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) def test_auth_success(self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps( {'account': 'act', 'user': 'act:usr', 'account_id': 'AUTH_cfa', 'groups': [{'name': 'act:usr'}, {'name': 'act'}, {'name': '.admin'}], 'expires': time() + 60})), ('204 No Content', {}, '')])) resp = Request.blank( '/v1/AUTH_cfa', headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 2) def test_auth_memcache(self): # First run our test without memcache, showing we need to return the # token contents twice. self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps( {'account': 'act', 'user': 'act:usr', 'account_id': 'AUTH_cfa', 'groups': [{'name': 'act:usr'}, {'name': 'act'}, {'name': '.admin'}], 'expires': time() + 60})), ('204 No Content', {}, ''), ('200 Ok', {}, json.dumps( {'account': 'act', 'user': 'act:usr', 'account_id': 'AUTH_cfa', 'groups': [{'name': 'act:usr'}, {'name': 'act'}, {'name': '.admin'}], 'expires': time() + 60})), ('204 No Content', {}, '')])) resp = Request.blank( '/v1/AUTH_cfa', headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) resp = Request.blank( '/v1/AUTH_cfa', headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 4) # Now run our test with memcache, showing we no longer need to return # the token contents twice. self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps( {'account': 'act', 'user': 'act:usr', 'account_id': 'AUTH_cfa', 'groups': [{'name': 'act:usr'}, {'name': 'act'}, {'name': '.admin'}], 'expires': time() + 60})), ('204 No Content', {}, ''), # Don't need a second token object returned if memcache is # used ('204 No Content', {}, '')])) fake_memcache = FakeMemcache() resp = Request.blank( '/v1/AUTH_cfa', headers={'X-Auth-Token': 'AUTH_t'}, environ={'swift.cache': fake_memcache}).get_response( self.test_auth) self.assertEquals(resp.status_int, 204) resp = Request.blank( '/v1/AUTH_cfa', headers={'X-Auth-Token': 'AUTH_t'}, environ={'swift.cache': fake_memcache}).get_response( self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 3) def test_auth_just_expired(self): self.test_auth.app = FakeApp(iter([ # Request for token (which will have expired) ('200 Ok', {}, json.dumps( {'account': 'act', 'user': 'act:usr', 'account_id': 'AUTH_cfa', 'groups': [{'name': 'act:usr'}, {'name': 'act'}, {'name': '.admin'}], 'expires': time() - 1})), # Request to delete token ('204 No Content', {}, '')])) resp = Request.blank( '/v1/AUTH_cfa', headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 2) def test_middleware_storage_token(self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps( {'account': 'act', 'user': 'act:usr', 'account_id': 'AUTH_cfa', 'groups': [{'name': 'act:usr'}, {'name': 'act'}, {'name': '.admin'}], 'expires': time() + 60})), ('204 No Content', {}, '')])) resp = Request.blank( '/v1/AUTH_cfa', headers={'X-Storage-Token': 'AUTH_t'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 2) def test_authorize_bad_path(self): req = Request.blank('/badpath') resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 401) req = Request.blank('/badpath') req.remote_user = 'act:usr,act,AUTH_cfa' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) def test_authorize_account_access(self): req = Request.blank('/v1/AUTH_cfa') req.remote_user = 'act:usr,act,AUTH_cfa' self.assertEquals( self.test_auth.authorize(req), None) req = Request.blank('/v1/AUTH_cfa') req.remote_user = 'act:usr,act' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) def test_authorize_acl_group_access(self): req = Request.blank('/v1/AUTH_cfa') req.remote_user = 'act:usr,act' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) req = Request.blank('/v1/AUTH_cfa') req.remote_user = 'act:usr,act' req.acl = 'act' self.assertEquals( self.test_auth.authorize(req), None) req = Request.blank('/v1/AUTH_cfa') req.remote_user = 'act:usr,act' req.acl = 'act:usr' self.assertEquals( self.test_auth.authorize(req), None) req = Request.blank('/v1/AUTH_cfa') req.remote_user = 'act:usr,act' req.acl = 'act2' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) req = Request.blank('/v1/AUTH_cfa') req.remote_user = 'act:usr,act' req.acl = 'act:usr2' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) def test_deny_cross_reseller(self): # Tests that cross-reseller is denied, even if ACLs/group # names match req = Request.blank('/v1/OTHER_cfa') req.remote_user = 'act:usr,act,AUTH_cfa' req.acl = 'act' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) def test_authorize_acl_referrer_access(self): req = Request.blank('/v1/AUTH_cfa/c') req.remote_user = 'act:usr,act' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) req = Request.blank('/v1/AUTH_cfa/c') req.remote_user = 'act:usr,act' req.acl = '.r:*,.rlistings' self.assertEquals( self.test_auth.authorize(req), None) req = Request.blank('/v1/AUTH_cfa/c') req.remote_user = 'act:usr,act' req.acl = '.r:*' # No listings allowed resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) req = Request.blank('/v1/AUTH_cfa/c') req.remote_user = 'act:usr,act' req.acl = '.r:.example.com,.rlistings' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) req = Request.blank('/v1/AUTH_cfa/c') req.remote_user = 'act:usr,act' req.referer = 'http://www.example.com/index.html' req.acl = '.r:.example.com,.rlistings' self.assertEquals( self.test_auth.authorize(req), None) req = Request.blank('/v1/AUTH_cfa/c') resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 401) req = Request.blank('/v1/AUTH_cfa/c') req.acl = '.r:*,.rlistings' self.assertEquals( self.test_auth.authorize(req), None) req = Request.blank('/v1/AUTH_cfa/c') req.acl = '.r:*' # No listings allowed resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 401) req = Request.blank('/v1/AUTH_cfa/c') req.acl = '.r:.example.com,.rlistings' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 401) req = Request.blank('/v1/AUTH_cfa/c') req.referer = 'http://www.example.com/index.html' req.acl = '.r:.example.com,.rlistings' self.assertEquals( self.test_auth.authorize(req), None) def test_detect_reseller_request(self): req = self._make_request('/v1/AUTH_admin', headers={'X-Auth-Token': 'AUTH_t'}) cache_key = 'AUTH_/auth/AUTH_t' cache_entry = (time() + 3600, '.reseller_admin') req.environ['swift.cache'].set( cache_key, cache_entry) resp = req.get_response(self.test_auth) self.assertTrue(req.environ.get('reseller_request')) def test_account_put_permissions(self): req = Request.blank( '/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'}) req.remote_user = 'act:usr,act' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) req = Request.blank( '/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'}) req.remote_user = 'act:usr,act,AUTH_other' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) # Even PUTs to your own account as account admin # should fail req = Request.blank( '/v1/AUTH_old', environ={'REQUEST_METHOD': 'PUT'}) req.remote_user = 'act:usr,act,AUTH_old' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) req = Request.blank( '/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'}) req.remote_user = 'act:usr,act,.reseller_admin' resp = self.test_auth.authorize(req) self.assertEquals(resp, None) # .super_admin is not something the middleware should ever see or care # about req = Request.blank( '/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'}) req.remote_user = 'act:usr,act,.super_admin' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) def test_account_delete_permissions(self): req = Request.blank('/v1/AUTH_new', environ={'REQUEST_METHOD': 'DELETE'}) req.remote_user = 'act:usr,act' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) req = Request.blank('/v1/AUTH_new', environ={'REQUEST_METHOD': 'DELETE'}) req.remote_user = 'act:usr,act,AUTH_other' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) # Even DELETEs to your own account as account admin should # fail req = Request.blank('/v1/AUTH_old', environ={'REQUEST_METHOD': 'DELETE'}) req.remote_user = 'act:usr,act,AUTH_old' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) req = Request.blank('/v1/AUTH_new', environ={'REQUEST_METHOD': 'DELETE'}) req.remote_user = 'act:usr,act,.reseller_admin' resp = self.test_auth.authorize(req) self.assertEquals(resp, None) # .super_admin is not something the middleware should ever see or care # about req = Request.blank('/v1/AUTH_new', environ={'REQUEST_METHOD': 'DELETE'}) req.remote_user = 'act:usr,act,.super_admin' resp = self.test_auth.authorize(req) resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) def test_get_token_fail(self): resp = Request.blank( '/auth/v1.0').get_response( self.test_auth) self.assertEquals(resp.status_int, 401) resp = Request.blank( '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) def test_get_token_fail_invalid_key(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]}))])) resp = Request.blank( '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'invalid'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 1) def test_get_token_fail_invalid_x_auth_user_format( self): resp = Request.blank( '/auth/v1/act/auth', headers={'X-Auth-User': 'usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) def test_get_token_fail_non_matching_account_in_request( self): resp = Request.blank( '/auth/v1/act/auth', headers={'X-Auth-User': 'act2:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) def test_get_token_fail_bad_path(self): resp = Request.blank( '/auth/v1/act/auth/invalid', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_get_token_fail_missing_key(self): resp = Request.blank( '/auth/v1/act/auth', headers={'X-Auth-User': 'act:usr'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) def test_get_token_fail_get_user_details(self): self.test_auth.app = FakeApp(iter([ ('503 Service Unavailable', {}, '')])) resp = Request.blank( '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) def test_get_token_fail_get_account(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account ('503 Service Unavailable', {}, '')])) resp = Request.blank( '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 2) def test_get_token_fail_put_new_token(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('503 Service Unavailable', {}, '')])) resp = Request.blank( '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 3) def test_get_token_fail_post_to_user(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object ('503 Service Unavailable', {}, '')])) resp = Request.blank( '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 4) def test_get_token_fail_get_services(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object ('204 No Content', {}, ''), # GET of services object ('503 Service Unavailable', {}, '')])) resp = Request.blank( '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 5) def test_get_token_fail_get_existing_token(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'}, json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of token ('503 Service Unavailable', {}, '')])) resp = Request.blank( '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 2) def test_get_token_for_auth_acct_success(self): fmc = FakeMemcache() local_auth = \ auth.filter_factory({ 'super_admin_key': 'supertest', 'metadata_volume': 'gsmd', 'token_life': str(DEFAULT_TOKEN_LIFE), 'max_token_life': str(MAX_TOKEN_LIFE)})(FakeApp()) resp = Request.blank( '/auth/v1.0', environ={'REQUEST_METHOD': 'GET', 'swift.cache': fmc}, headers={'X-Auth-User': 'act:.super_admin', 'X-Auth-Key': 'supertest'}).get_response(local_auth) self.assertEquals(resp.status_int, 200) itk = resp.headers.get('x-auth-token') self.assertTrue(itk.startswith('AUTH_itk'), itk) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_gsmd') expires, groups = fmc.get('AUTH_/auth/%s' % itk) self.assertEquals(groups, 'gsmd,.reseller_admin,AUTH_gsmd') def test_get_token_for_auth_acct_fail_passwd(self): local_auth = \ auth.filter_factory({ 'super_admin_key': 'supertest', 'metadata_volume': 'gsmd', 'token_life': str(DEFAULT_TOKEN_LIFE), 'max_token_life': str(MAX_TOKEN_LIFE)})(FakeApp()) resp = Request.blank( '/auth/v1.0', environ={'REQUEST_METHOD': 'GET', 'swift.cache': FakeMemcache()}, headers={'X-Auth-User': 'act:.super_admin', 'X-Auth-Key': 'invalidpasswd'}).get_response(local_auth) self.assertEquals(resp.status_int, 401) def test_get_token_success_v1_0(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object ('204 No Content', {}, ''), # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) resp = Request.blank( '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assert_(resp.headers.get( 'x-auth-token', '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') self.assertEquals( json.loads(resp.body), {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 5) def test_get_token_success_v1_0_with_user_token_life( self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object ('204 No Content', {}, ''), # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) resp = Request.blank( '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key', 'X-Auth-Token-Lifetime': 10}).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) left = int(resp.headers['x-auth-token-expires']) self.assertTrue(left > 0, '%d > 0' % left) self.assertTrue(left <= 10, '%d <= 10' % left) self.assert_(resp.headers.get( 'x-auth-token', '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') self.assertEquals( json.loads(resp.body), {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 5) def test_get_token_success_v1_0_with_user_token_life_past_max( self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object ('204 No Content', {}, ''), # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) req = Request.blank( '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key', 'X-Auth-Token-Lifetime': MAX_TOKEN_LIFE * 10}) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 200) left = int(resp.headers['x-auth-token-expires']) self.assertTrue(left > DEFAULT_TOKEN_LIFE, '%d > %d' % (left, DEFAULT_TOKEN_LIFE)) self.assertTrue(left <= MAX_TOKEN_LIFE, '%d <= %d' % (left, MAX_TOKEN_LIFE)) self.assert_(resp.headers.get( 'x-auth-token', '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') self.assertEquals( json.loads(resp.body), {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 5) def test_get_token_success_v1_act_auth(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object ('204 No Content', {}, ''), # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) resp = Request.blank( '/auth/v1/act/auth', headers={'X-Storage-User': 'usr', 'X-Storage-Pass': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assert_(resp.headers.get( 'x-auth-token', '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') self.assertEquals( json.loads(resp.body), {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 5) def test_get_token_success_storage_instead_of_auth( self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object ('204 No Content', {}, ''), # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) resp = Request.blank( '/auth/v1.0', headers={'X-Storage-User': 'act:usr', 'X-Storage-Pass': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assert_( resp.headers.get( 'x-auth-token', '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') self.assertEquals( json.loads(resp.body), {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 5) def test_get_token_success_v1_act_auth_auth_instead_of_storage( self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of account ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object ('204 No Content', {}, ''), # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) resp = Request.blank( '/auth/v1/act/auth', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assert_(resp.headers.get( 'x-auth-token', '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') self.assertEquals( json.loads(resp.body), {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 5) def test_get_token_success_existing_token(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'}, json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of token ('200 Ok', {}, json.dumps( {"account": "act", "user": "usr", "account_id": "AUTH_cfa", "groups": [{'name': "act:usr"}, {'name': "key"}, {'name': ".admin"}], "expires": 9999999999.9999999})), # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) resp = Request.blank( '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals( resp.headers.get('x-auth-token'), 'AUTH_tktest') self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') self.assertEquals( json.loads(resp.body), {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 3) def test_get_token_success_existing_token_but_request_new_one( self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'}, json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # DELETE of expired token ('204 No Content', {}, ''), # GET of account ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object ('204 No Content', {}, ''), # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) resp = Request.blank( '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key', 'X-Auth-New-Token': 'true'}).get_response( self.test_auth) self.assertEquals(resp.status_int, 200) self.assertNotEquals( resp.headers.get('x-auth-token'), 'AUTH_tktest') self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') self.assertEquals( json.loads(resp.body), {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 6) def test_get_token_success_existing_token_expired(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'}, json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of token ('200 Ok', {}, json.dumps( {"account": "act", "user": "usr", "account_id": "AUTH_cfa", "groups": [{'name': "act:usr"}, {'name': "key"}, {'name': ".admin"}], "expires": 0.0})), # DELETE of expired token ('204 No Content', {}, ''), # GET of account ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object ('204 No Content', {}, ''), # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) resp = Request.blank( '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertNotEquals( resp.headers.get('x-auth-token'), 'AUTH_tktest') self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') self.assertEquals( json.loads(resp.body), {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 7) def test_get_token_success_existing_token_expired_fail_deleting_old( self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'}, json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]})), # GET of token ('200 Ok', {}, json.dumps({"account": "act", "user": "usr", "account_id": "AUTH_cfa", "groups": [{'name': "act:usr"}, {'name': "key"}, {'name': ".admin"}], "expires": 0.0})), # DELETE of expired token ('503 Service Unavailable', {}, ''), # GET of account ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object ('204 No Content', {}, ''), # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) resp = Request.blank( '/auth/v1.0', headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'key'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertNotEquals( resp.headers.get('x-auth-token'), 'AUTH_tktest') self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') self.assertEquals( json.loads(resp.body), {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 7) def test_prep_success(self): list_to_iter = [ # PUT of gsmetadata account ('201 Created', {}, ''), # PUT of .account_id container ('201 Created', {}, '')] # PUT of .token* containers for x in xrange(16): list_to_iter.append(('201 Created', {}, '')) self.test_auth.app = FakeApp(iter(list_to_iter)) resp = Request.blank('/auth/v2/.prep', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 17) def test_prep_bad_method(self): resp = Request.blank('/auth/v2/.prep', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) resp = Request.blank('/auth/v2/.prep', environ={ 'REQUEST_METHOD': 'HEAD'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) resp = Request.blank('/auth/v2/.prep', environ={ 'REQUEST_METHOD': 'PUT'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_prep_bad_creds(self): resp = Request.blank('/auth/v2/.prep', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': 'super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) resp = Request.blank('/auth/v2/.prep', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'upertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) resp = Request.blank('/auth/v2/.prep', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': '.super_admin'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) resp = Request.blank('/auth/v2/.prep', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) resp = Request.blank( '/auth/v2/.prep', environ={'REQUEST_METHOD': 'POST'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) def test_prep_fail_account_create(self): self.test_auth.app = FakeApp(iter([ # PUT of gsmetadata account ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/.prep', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) def test_prep_fail_token_container_create(self): self.test_auth.app = FakeApp(iter([ # PUT of gsmetadata account ('201 Created', {}, ''), # PUT of .token container ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/.prep', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 2) def test_prep_fail_account_id_container_create(self): self.test_auth.app = FakeApp(iter([ # PUT of gsmetadata account ('201 Created', {}, ''), # PUT of .token container ('201 Created', {}, ''), # PUT of .account_id container ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/.prep', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 3) def test_get_reseller_success(self): self.test_auth.app = FakeApp(iter([ # GET of gsmetadata account (list containers) ('200 Ok', {}, json.dumps([ {"name": ".token", "count": 0, "bytes": 0}, {"name": ".account_id", "count": 0, "bytes": 0}, {"name": "act", "count": 0, "bytes": 0}])), # GET of gsmetadata account (list containers # continuation) ('200 Ok', {}, '[]')])) resp = Request.blank('/auth/v2', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals(json.loads(resp.body), {"accounts": [{"name": "act"}]}) self.assertEquals(self.test_auth.app.calls, 2) self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}, {"name": ".reseller_admin"}], "auth": "plaintext:key"})), # GET of gsmetadata account (list containers) ('200 Ok', {}, json.dumps([ {"name": ".token", "count": 0, "bytes": 0}, {"name": ".account_id", "count": 0, "bytes": 0}, {"name": "act", "count": 0, "bytes": 0}])), # GET of gsmetadata account (list containers # continuation) ('200 Ok', {}, '[]')])) resp = Request.blank('/auth/v2', headers={ 'X-Auth-Admin-User': 'act:adm', 'X-Auth-Admin-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals(json.loads(resp.body), {"accounts": [{"name": "act"}]}) self.assertEquals(self.test_auth.app.calls, 3) def test_get_reseller_fail_bad_creds(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2', headers={ 'X-Auth-Admin-User': 'super:admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ # GET of user object (account admin, but not reseller # admin) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2', headers={ 'X-Auth-Admin-User': 'act:adm', 'X-Auth-Admin-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ # GET of user object (regular user) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2', headers={ 'X-Auth-Admin-User': 'act:usr', 'X-Auth-Admin-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) def test_get_reseller_fail_listing(self): self.test_auth.app = FakeApp(iter([ # GET of gsmetadata account (list containers) ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ # GET of gsmetadata account (list containers) ('200 Ok', {}, json.dumps([ {"name": ".token", "count": 0, "bytes": 0}, {"name": ".account_id", "count": 0, "bytes": 0}, {"name": "act", "count": 0, "bytes": 0}])), # GET of gsmetadata account (list containers # continuation) ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 2) def test_get_account_success(self): self.test_auth.app = FakeApp(iter([ # GET of .services object ('200 Ok', {}, json.dumps( {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # GET of account container (list objects) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}, {"name": "tester", "hash": "etag", "bytes": 104, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.736680"}, {"name": "tester3", "hash": "etag", "bytes": 86, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:28.135530"}])), # GET of account container (list objects # continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')])) resp = Request.blank('/auth/v2/act', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals( json.loads(resp.body), {'account_id': 'AUTH_cfa', 'services': {'storage': {'default': 'local', 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}}, 'users': [{'name': 'tester'}, {'name': 'tester3'}]}) self.assertEquals(self.test_auth.app.calls, 3) self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], "auth": "plaintext:key"})), # GET of .services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # GET of account container (list objects) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}, {"name": "tester", "hash": "etag", "bytes": 104, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.736680"}, {"name": "tester3", "hash": "etag", "bytes": 86, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:28.135530"}])), # GET of account container (list objects # continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')])) resp = Request.blank('/auth/v2/act', headers={ 'X-Auth-Admin-User': 'act:adm', 'X-Auth-Admin-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals( json.loads(resp.body), {'account_id': 'AUTH_cfa', 'services': {'storage': {'default': 'local', 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}}, 'users': [{'name': 'tester'}, {'name': 'tester3'}]}) self.assertEquals(self.test_auth.app.calls, 4) def test_get_account_fail_bad_account_name(self): resp = Request.blank('/auth/v2/.token', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) resp = Request.blank('/auth/v2/.anything', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_get_account_fail_creds(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act', headers={ 'X-Auth-Admin-User': 'super:admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ # GET of user object (account admin, but wrong # account) ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, {"name": "test"}, {"name": ".admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act', headers={ 'X-Auth-Admin-User': 'act2:adm', 'X-Auth-Admin-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ # GET of user object (regular user) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act', headers={ 'X-Auth-Admin-User': 'act:usr', 'X-Auth-Admin-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) def test_get_account_fail_get_services(self): self.test_auth.app = FakeApp(iter([ # GET of .services object ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ # GET of .services object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 1) def test_get_account_fail_listing(self): self.test_auth.app = FakeApp(iter([ # GET of .services object ('200 Ok', {}, json.dumps( {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # GET of account container (list objects) ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 2) self.test_auth.app = FakeApp(iter([ # GET of .services object ('200 Ok', {}, json.dumps( {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # GET of account container (list objects) ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 2) self.test_auth.app = FakeApp(iter([ # GET of .services object ('200 Ok', {}, json.dumps( {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # GET of account container (list objects) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}, {"name": "tester", "hash": "etag", "bytes": 104, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.736680"}, {"name": "tester3", "hash": "etag", "bytes": 86, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:28.135530"}])), # GET of account container (list objects # continuation) ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 3) def test_set_services_new_service(self): self.test_auth.app = FakeApp(iter([ # GET of .services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # PUT of new .services object ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act/.services', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'}, body=json.dumps( {'new_service': {'new_endpoint': 'new_value'}}) ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals( json.loads(resp.body), {'storage': {'default': 'local', 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}, 'new_service': {'new_endpoint': 'new_value'}}) self.assertEquals(self.test_auth.app.calls, 2) def test_set_services_new_endpoint(self): self.test_auth.app = FakeApp(iter([ # GET of .services object ('200 Ok', {}, json.dumps( {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # PUT of new .services object ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act/.services', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'}, body=json.dumps( {'storage': {'new_endpoint': 'new_value'}}) ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals( json.loads(resp.body), {'storage': {'default': 'local', 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa', 'new_endpoint': 'new_value'}}) self.assertEquals(self.test_auth.app.calls, 2) def test_set_services_update_endpoint(self): self.test_auth.app = FakeApp(iter([ # GET of .services object ('200 Ok', {}, json.dumps( {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # PUT of new .services object ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act/.services', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'}, body=json.dumps( {'storage': {'local': 'new_value'}}) ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals(json.loads(resp.body), {'storage': {'default': 'local', 'local': 'new_value'}}) self.assertEquals(self.test_auth.app.calls, 2) def test_set_services_fail_bad_creds(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act/.services', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': 'super:admin', 'X-Auth-Admin-Key': 'supertest'}, body=json.dumps( {'storage': {'local': 'new_value'}}) ).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ # GET of user object (account admin, but not reseller # admin) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/.services', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': 'act:adm', 'X-Auth-Admin-Key': 'key'}, body=json.dumps( {'storage': {'local': 'new_value'}}) ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ # GET of user object (regular user) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/.services', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': 'act:usr', 'X-Auth-Admin-Key': 'key'}, body=json.dumps( {'storage': {'local': 'new_value'}}) ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) def test_set_services_fail_bad_account_name(self): resp = Request.blank('/auth/v2/.act/.services', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'}, body=json.dumps( {'storage': {'local': 'new_value'}}) ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_set_services_fail_bad_json(self): resp = Request.blank('/auth/v2/act/.services', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'}, body='garbage' ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) resp = Request.blank('/auth/v2/act/.services', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'}, body='' ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_set_services_fail_get_services(self): self.test_auth.app = FakeApp(iter([ # GET of .services object ('503 Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act/.services', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'}, body=json.dumps({ 'new_service': {'new_endpoint': 'new_value'}}) ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ # GET of .services object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act/.services', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'}, body=json.dumps({ 'new_service': {'new_endpoint': 'new_value'}}) ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 1) def test_set_services_fail_put_services(self): self.test_auth.app = FakeApp(iter([ # GET of .services object ('200 Ok', {}, json.dumps( {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # PUT of new .services object ('503 Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act/.services', environ={ 'REQUEST_METHOD': 'POST'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'}, body=json.dumps( {'new_service': {'new_endpoint': 'new_value'}}) ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 2) def test_put_account_success(self): self.test_auth.app = FakeApp(iter([ # Initial HEAD of account container to check for # pre-existence ('404 Not Found', {}, ''), # PUT of account container ('204 No Content', {}, ''), # PUT of .account_id mapping object ('204 No Content', {}, ''), # PUT of .services object ('204 No Content', {}, ''), # POST to account container updating # X-Container-Meta-Account-Id ('204 No Content', {}, '')])) resp = Request.blank( '/auth/v2/act', environ={ 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 201) self.assertEquals(self.test_auth.app.calls, 5) def test_put_account_success_preexist_but_not_completed( self): self.test_auth.app = FakeApp(iter([ # Initial HEAD of account container to check for pre-existence # We're going to show it as existing this time, but with no # X-Container-Meta-Account-Id, indicating a failed # previous attempt ('200 Ok', {}, ''), # PUT of .account_id mapping object ('204 No Content', {}, ''), # PUT of .services object ('204 No Content', {}, ''), # POST to account container updating # X-Container-Meta-Account-Id ('204 No Content', {}, '')])) resp = Request.blank( '/auth/v2/act', environ={ 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 201) self.assertEquals(self.test_auth.app.calls, 4) def test_put_account_success_preexist_and_completed( self): self.test_auth.app = FakeApp(iter([ # Initial HEAD of account container to check for pre-existence # We're going to show it as existing this time, and with an # X-Container-Meta-Account-Id, indicating it already # exists ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '')])) resp = Request.blank( '/auth/v2/act', environ={ 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 202) self.assertEquals(self.test_auth.app.calls, 1) def test_put_account_success_with_given_suffix(self): self.test_auth.app = FakeApp(iter([ # Initial HEAD of account container to check for # pre-existence ('404 Not Found', {}, ''), # PUT of account container ('204 No Content', {}, ''), # PUT of .account_id mapping object ('204 No Content', {}, ''), # PUT of .services object ('204 No Content', {}, ''), # POST to account container updating # X-Container-Meta-Account-Id ('204 No Content', {}, '')])) resp = Request.blank( '/auth/v2/act', environ={ 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest', 'X-Account-Suffix': 'test-suffix'}).get_response( self.test_auth) self.assertEquals(resp.status_int, 201) self.assertEquals(self.test_auth.app.calls, 5) def test_put_account_fail_bad_creds(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('404 Not Found', {}, '')])) resp = Request.blank( '/auth/v2/act', environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': 'super:admin', 'X-Auth-Admin-Key': 'supertest'},).get_response( self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ # GET of user object (account admin, but not reseller # admin) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], "auth": "plaintext:key"}))])) resp = Request.blank( '/auth/v2/act', environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': 'act:adm', 'X-Auth-Admin-Key': 'key'},).get_response( self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ # GET of user object (regular user) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank( '/auth/v2/act', environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': 'act:usr', 'X-Auth-Admin-Key': 'key'},).get_response( self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) def test_put_account_fail_invalid_account_name(self): resp = Request.blank( '/auth/v2/.act', environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'},).get_response( self.test_auth) self.assertEquals(resp.status_int, 400) def test_put_account_fail_on_initial_account_head(self): self.test_auth.app = FakeApp(iter([ # Initial HEAD of account container to check for # pre-existence ('503 Service Unavailable', {}, '')])) resp = Request.blank( '/auth/v2/act', environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'}).get_response( self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) def test_put_account_fail_on_account_marker_put(self): self.test_auth.app = FakeApp(iter([ # Initial HEAD of account container to check for # pre-existence ('404 Not Found', {}, ''), # PUT of account container ('503 Service Unavailable', {}, '')])) resp = Request.blank( '/auth/v2/act', environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'}).get_response( self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 2) def test_put_account_fail_on_storage_account_put(self): self.test_auth.app = FakeApp(iter([ # Initial HEAD of account container to check for # pre-existence ('404 Not Found', {}, ''), # PUT of account container ('204 No Content', {}, '')])) resp = Request.blank( '/auth/v2/act', environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'}).get_response( self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 3) def test_put_account_fail_on_account_id_mapping(self): self.test_auth.app = FakeApp(iter([ # Initial HEAD of account container to check for # pre-existence ('404 Not Found', {}, ''), # PUT of account container ('204 No Content', {}, ''), # PUT of .account_id mapping object ('503 Service Unavailable', {}, '')])) resp = Request.blank( '/auth/v2/act', environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'}).get_response( self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 3) def test_put_account_fail_on_services_object(self): self.test_auth.app = FakeApp(iter([ # Initial HEAD of account container to check for # pre-existence ('404 Not Found', {}, ''), # PUT of account container ('204 No Content', {}, ''), # PUT of .account_id mapping object ('204 No Content', {}, ''), # PUT of .services object ('503 Service Unavailable', {}, '')])) resp = Request.blank( '/auth/v2/act', environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'}).get_response( self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 4) def test_put_account_fail_on_post_mapping(self): self.test_auth.app = FakeApp(iter([ # Initial HEAD of account container to check for # pre-existence ('404 Not Found', {}, ''), # PUT of account container ('204 No Content', {}, ''), # PUT of .account_id mapping object ('204 No Content', {}, ''), # PUT of .services object ('204 No Content', {}, ''), # POST to account container updating # X-Container-Meta-Account-Id ('503 Service Unavailable', {}, '')])) resp = Request.blank( '/auth/v2/act', environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, headers={'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'}).get_response( self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 5) def test_delete_account_success(self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}])), # Account's container listing, checking for users # (continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object ('200 Ok', {}, json.dumps( {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # DELETE the .services object ('204 No Content', {}, ''), # DELETE the .account_id mapping object ('204 No Content', {}, ''), # DELETE the account container ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 6) def test_delete_account_success_missing_services(self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}])), # Account's container listing, checking for users # (continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object ('404 Not Found', {}, ''), # DELETE the .account_id mapping object ('204 No Content', {}, ''), # DELETE the account container ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 5) def test_delete_account_success_missing_storage_account( self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}])), # Account's container listing, checking for users # (continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object ('200 Ok', {}, json.dumps( {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # DELETE the .services object ('204 No Content', {}, ''), # DELETE the .account_id mapping object ('204 No Content', {}, ''), # DELETE the account container ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 6) def test_delete_account_success_missing_account_id_mapping( self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}])), # Account's container listing, checking for users # (continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object ('200 Ok', {}, json.dumps( {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # DELETE the .services object ('204 No Content', {}, ''), # DELETE the .account_id mapping object ('404 Not Found', {}, ''), # DELETE the account container ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 6) def test_delete_account_success_missing_account_container_at_end( self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}])), # Account's container listing, checking for users # (continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object ('200 Ok', {}, json.dumps( {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # DELETE the .services object ('204 No Content', {}, ''), # DELETE the .account_id mapping object ('204 No Content', {}, ''), # DELETE the account container ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 6) def test_delete_account_fail_bad_creds(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': 'super:admin', 'X-Auth-Admin-Key': 'supertest'}, ).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ # GET of user object (account admin, but not reseller # admin) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': 'act:adm', 'X-Auth-Admin-Key': 'key'}, ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ # GET of user object (regular user) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': 'act:usr', 'X-Auth-Admin-Key': 'key'}, ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) def test_delete_account_fail_invalid_account_name(self): resp = Request.blank('/auth/v2/.act', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_delete_account_fail_not_found(self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 1) def test_delete_account_fail_not_found_concurrency( self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}])), # Account's container listing, checking for users # (continuation) ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 2) def test_delete_account_fail_list_account(self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) def test_delete_account_fail_list_account_concurrency( self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}])), # Account's container listing, checking for users # (continuation) ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 2) def test_delete_account_fail_has_users(self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}, {"name": "tester", "hash": "etag", "bytes": 104, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.736680"}]))])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 409) self.assertEquals(self.test_auth.app.calls, 1) def test_delete_account_fail_has_users2(self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}])), # Account's container listing, checking for users # (continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": "tester", "hash": "etag", "bytes": 104, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.736680"}]))])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 409) self.assertEquals(self.test_auth.app.calls, 2) def test_delete_account_fail_get_services(self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}])), # Account's container listing, checking for users # (continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 3) def test_delete_account_fail_delete_storage_account( self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}])), # Account's container listing, checking for users # (continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object ('200 Ok', {}, json.dumps( {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 4) def test_delete_account_fail_delete_storage_account2( self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}])), # Account's container listing, checking for users # (continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object ('200 Ok', {}, json.dumps( {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa", "other": "http://127.0.0.1:8080/v1/AUTH_cfa2"}}))])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 4) def test_delete_account_fail_delete_storage_account3( self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}])), # Account's container listing, checking for users # (continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object ('200 Ok', {}, json.dumps( {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 4) def test_delete_account_fail_delete_storage_account4( self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}])), # Account's container listing, checking for users # (continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object ('200 Ok', {}, json.dumps( {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa", "other": "http://127.0.0.1:8080/v1/AUTH_cfa2"}}))])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 4) def test_delete_account_fail_delete_services(self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}])), # Account's container listing, checking for users # (continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object ('200 Ok', {}, json.dumps( {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # DELETE the .services object ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 4) def test_delete_account_fail_delete_account_id_mapping( self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}])), # Account's container listing, checking for users # (continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object ('200 Ok', {}, json.dumps( {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # DELETE the .services object ('204 No Content', {}, ''), # DELETE the .account_id mapping object ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 5) def test_delete_account_fail_delete_account_container( self): self.test_auth.app = FakeApp(iter([ # Account's container listing, checking for # users ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}])), # Account's container listing, checking for users # (continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), # GET the .services object ('200 Ok', {}, json.dumps( {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), # DELETE the .services object ('204 No Content', {}, ''), # DELETE the .account_id mapping object ('204 No Content', {}, ''), # DELETE the account container ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act', environ={ 'REQUEST_METHOD': 'DELETE', 'swift.cache': FakeMemcache()}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 6) def test_get_user_success(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:usr"}, {"name": "act"}, {"name": ".admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.body, json.dumps( {"groups": [{"name": "act:usr"}, {"name": "act"}, {"name": ".admin"}], "auth": "plaintext:key"})) self.assertEquals(self.test_auth.app.calls, 1) def test_get_user_fail_no_super_admin_key(self): local_auth = auth.filter_factory({})(FakeApp(iter([ # GET of user object (but we should never get # here) ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:usr"}, {"name": "act"}, {"name": ".admin"}], "auth": "plaintext:key"}))]))) resp = Request.blank('/auth/v2/act/usr', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(local_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(local_auth.app.calls, 0) def test_get_user_groups_success(self): self.test_auth.app = FakeApp(iter([ # GET of account container (list objects) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}, {"name": "tester", "hash": "etag", "bytes": 104, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.736680"}, {"name": "tester3", "hash": "etag", "bytes": 86, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:28.135530"}])), # GET of user object ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:tester"}, {"name": "act"}, {"name": ".admin"}], "auth": "plaintext:key"})), # GET of user object ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:tester3"}, {"name": "act"}], "auth": "plaintext:key3"})), # GET of account container (list objects # continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')])) resp = Request.blank('/auth/v2/act/.groups', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.body, json.dumps( {"groups": [{"name": ".admin"}, {"name": "act"}, {"name": "act:tester"}, {"name": "act:tester3"}]})) self.assertEquals(self.test_auth.app.calls, 4) def test_get_user_groups_success2(self): self.test_auth.app = FakeApp(iter([ # GET of account container (list objects) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}, {"name": "tester", "hash": "etag", "bytes": 104, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.736680"}])), # GET of user object ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:tester"}, {"name": "act"}, {"name": ".admin"}], "auth": "plaintext:key"})), # GET of account container (list objects # continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": "tester3", "hash": "etag", "bytes": 86, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:28.135530"}])), # GET of user object ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:tester3"}, {"name": "act"}], "auth": "plaintext:key3"})), # GET of account container (list objects # continuation) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')])) resp = Request.blank('/auth/v2/act/.groups', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.body, json.dumps( {"groups": [{"name": ".admin"}, {"name": "act"}, {"name": "act:tester"}, {"name": "act:tester3"}]})) self.assertEquals(self.test_auth.app.calls, 5) def test_get_user_fail_invalid_account(self): resp = Request.blank('/auth/v2/.invalid/usr', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_get_user_fail_invalid_user(self): resp = Request.blank('/auth/v2/act/.invalid', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_get_user_fail_bad_creds(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act/usr', headers={ 'X-Auth-Admin-User': 'super:admin', 'X-Auth-Admin-Key': 'supertest'}, ).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ # GET of user object (regular user) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', headers={ 'X-Auth-Admin-User': 'act:usr', 'X-Auth-Admin-Key': 'key'}, ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) def test_get_user_account_admin_success(self): self.test_auth.app = FakeApp(iter([ # GET of user object (account admin, but not reseller # admin) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], "auth": "plaintext:key"})), # GET of requested user object ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:usr"}, {"name": "act"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', headers={ 'X-Auth-Admin-User': 'act:adm', 'X-Auth-Admin-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.body, json.dumps( {"groups": [{"name": "act:usr"}, {"name": "act"}], "auth": "plaintext:key"})) self.assertEquals(self.test_auth.app.calls, 2) def test_get_user_account_admin_fail_getting_account_admin( self): self.test_auth.app = FakeApp(iter([ # GET of user object (account admin check) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], "auth": "plaintext:key"})), # GET of requested user object [who is an .admin # as well] ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:usr"}, {"name": "act"}, {"name": ".admin"}], "auth": "plaintext:key"})), # GET of user object (reseller admin check [and fail # here]) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', headers={ 'X-Auth-Admin-User': 'act:adm', 'X-Auth-Admin-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 3) def test_get_user_account_admin_fail_getting_reseller_admin( self): self.test_auth.app = FakeApp(iter([ # GET of user object (account admin check) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], "auth": "plaintext:key"})), # GET of requested user object [who is a # .reseller_admin] ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:usr"}, {"name": "act"}, {"name": ".reseller_admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', headers={ 'X-Auth-Admin-User': 'act:adm', 'X-Auth-Admin-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 2) def test_get_user_reseller_admin_fail_getting_reseller_admin( self): self.test_auth.app = FakeApp(iter([ # GET of user object (account admin check) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".reseller_admin"}], "auth": "plaintext:key"})), # GET of requested user object [who also is a # .reseller_admin] ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:usr"}, {"name": "act"}, {"name": ".reseller_admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', headers={ 'X-Auth-Admin-User': 'act:adm', 'X-Auth-Admin-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 2) def test_get_user_super_admin_succeed_getting_reseller_admin( self): self.test_auth.app = FakeApp(iter([ # GET of requested user object ('200 Ok', {}, json.dumps( {"groups": [{"name": "act:usr"}, {"name": "act"}, {"name": ".reseller_admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.body, json.dumps( {"groups": [{"name": "act:usr"}, {"name": "act"}, {"name": ".reseller_admin"}], "auth": "plaintext:key"})) self.assertEquals(self.test_auth.app.calls, 1) def test_get_user_groups_not_found(self): self.test_auth.app = FakeApp(iter([ # GET of account container (list objects) ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act/.groups', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 1) def test_get_user_groups_fail_listing(self): self.test_auth.app = FakeApp(iter([ # GET of account container (list objects) ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act/.groups', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) def test_get_user_groups_fail_get_user(self): self.test_auth.app = FakeApp(iter([ # GET of account container (list objects) ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, json.dumps([ {"name": ".services", "hash": "etag", "bytes": 112, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.618110"}, {"name": "tester", "hash": "etag", "bytes": 104, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:27.736680"}, {"name": "tester3", "hash": "etag", "bytes": 86, "content_type": "application/octet-stream", "last_modified": "2010-12-03T17:16:28.135530"}])), # GET of user object ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act/.groups', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 2) def test_get_user_not_found(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act/usr', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 1) def test_get_user_fail(self): self.test_auth.app = FakeApp(iter([ # GET of user object ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act/usr', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest', 'X-Auth-User-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) def test_put_user_fail_invalid_account(self): resp = Request.blank('/auth/v2/.invalid/usr', environ={ 'REQUEST_METHOD': 'PUT'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest', 'X-Auth-User-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_put_user_fail_invalid_user(self): resp = Request.blank('/auth/v2/act/.usr', environ={ 'REQUEST_METHOD': 'PUT'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest', 'X-Auth-User-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_put_user_fail_no_user_key(self): resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'PUT'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_put_user_reseller_admin_fail_bad_creds(self): self.test_auth.app = FakeApp(iter([ # GET of user object (reseller admin) # This shouldn't actually get called, checked # below ('200 Ok', {}, json.dumps({"groups": [{"name": "act:rdm"}, {"name": "test"}, {"name": ".admin"}, {"name": ".reseller_admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'PUT'}, headers={ 'X-Auth-Admin-User': 'act:rdm', 'X-Auth-Admin-Key': 'key', 'X-Auth-User-Key': 'key', 'X-Auth-User-Reseller-Admin': 'true'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 0) self.test_auth.app = FakeApp(iter([ # GET of user object (account admin, but not reseller admin) # This shouldn't actually get called, checked # below ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'PUT'}, headers={ 'X-Auth-Admin-User': 'act:adm', 'X-Auth-Admin-Key': 'key', 'X-Auth-User-Key': 'key', 'X-Auth-User-Reseller-Admin': 'true'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 0) self.test_auth.app = FakeApp(iter([ # GET of user object (regular user) # This shouldn't actually get called, checked # below ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'PUT'}, headers={ 'X-Auth-Admin-User': 'act:adm', 'X-Auth-Admin-Key': 'key', 'X-Auth-User-Key': 'key', 'X-Auth-User-Reseller-Admin': 'true'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 0) def test_put_user_account_admin_fail_bad_creds(self): self.test_auth.app = FakeApp(iter([ # GET of user object (account admin, but wrong # account) ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, {"name": "test"}, {"name": ".admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'PUT'}, headers={ 'X-Auth-Admin-User': 'act2:adm', 'X-Auth-Admin-Key': 'key', 'X-Auth-User-Key': 'key', 'X-Auth-User-Admin': 'true'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ # GET of user object (regular user) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'PUT'}, headers={ 'X-Auth-Admin-User': 'act:usr', 'X-Auth-Admin-Key': 'key', 'X-Auth-User-Key': 'key', 'X-Auth-User-Admin': 'true'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) def test_put_user_regular_fail_bad_creds(self): self.test_auth.app = FakeApp(iter([ # GET of user object (account admin, but wrong # account) ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, {"name": "test"}, {"name": ".admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'PUT'}, headers={ 'X-Auth-Admin-User': 'act2:adm', 'X-Auth-Admin-Key': 'key', 'X-Auth-User-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ # GET of user object (regular user) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'PUT'}, headers={ 'X-Auth-Admin-User': 'act:usr', 'X-Auth-Admin-Key': 'key', 'X-Auth-User-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) def test_put_user_regular_success(self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of user object ('201 Created', {}, '')])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'PUT'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest', 'X-Auth-User-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 201) self.assertEquals(self.test_auth.app.calls, 2) self.assertEquals( json.loads(self.test_auth.app.request.body), {"groups": [{"name": "act:usr"}, {"name": "act"}], "auth": "plaintext:key"}) def test_put_user_special_chars_success(self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of user object ('201 Created', {}, '')])) resp = Request.blank('/auth/v2/act/u_s-r', environ={ 'REQUEST_METHOD': 'PUT'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest', 'X-Auth-User-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 201) self.assertEquals(self.test_auth.app.calls, 2) self.assertEquals( json.loads(self.test_auth.app.request.body), {"groups": [{"name": "act:u_s-r"}, {"name": "act"}], "auth": "plaintext:key"}) def test_put_user_account_admin_success(self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of user object ('201 Created', {}, '')])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'PUT'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest', 'X-Auth-User-Key': 'key', 'X-Auth-User-Admin': 'true'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 201) self.assertEquals(self.test_auth.app.calls, 2) self.assertEquals( json.loads(self.test_auth.app.request.body), {"groups": [{"name": "act:usr"}, {"name": "act"}, {"name": ".admin"}], "auth": "plaintext:key"}) def test_put_user_reseller_admin_success(self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of user object ('201 Created', {}, '')])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'PUT'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest', 'X-Auth-User-Key': 'key', 'X-Auth-User-Reseller-Admin': 'true'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 201) self.assertEquals(self.test_auth.app.calls, 2) self.assertEquals( json.loads(self.test_auth.app.request.body), {"groups": [{"name": "act:usr"}, {"name": "act"}, {"name": ".admin"}, {"name": ".reseller_admin"}], "auth": "plaintext:key"}) def test_put_user_fail_not_found(self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of user object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'PUT'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest', 'X-Auth-User-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 2) def test_put_user_fail(self): self.test_auth.app = FakeApp(iter([ # PUT of user object ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'PUT'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest', 'X-Auth-User-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) def test_delete_user_bad_creds(self): self.test_auth.app = FakeApp(iter([ # GET of user object (account admin, but wrong # account) ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, {"name": "test"}, {"name": ".admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'X-Auth-Admin-User': 'act2:adm', 'X-Auth-Admin-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ # GET of user object (regular user) ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'X-Auth-Admin-User': 'act:usr', 'X-Auth-Admin-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) self.assertEquals(self.test_auth.app.calls, 1) def test_delete_user_invalid_account(self): resp = Request.blank('/auth/v2/.invalid/usr', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_delete_user_invalid_user(self): resp = Request.blank('/auth/v2/act/.invalid', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_delete_user_not_found(self): self.test_auth.app = FakeApp(iter([ # HEAD of user object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 1) def test_delete_user_fail_head_user(self): self.test_auth.app = FakeApp(iter([ # HEAD of user object ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 1) def test_delete_user_fail_delete_token(self): self.test_auth.app = FakeApp(iter([ # HEAD of user object ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), # DELETE of token ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 2) def test_delete_user_fail_delete_user(self): self.test_auth.app = FakeApp(iter([ # HEAD of user object ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), # DELETE of token ('204 No Content', {}, ''), # DELETE of user object ('503 Service Unavailable', {}, '')])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(self.test_auth.app.calls, 3) def test_delete_user_success(self): self.test_auth.app = FakeApp(iter([ # HEAD of user object ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), # DELETE of token ('204 No Content', {}, ''), # DELETE of user object ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 3) def test_delete_user_success_missing_user_at_end(self): self.test_auth.app = FakeApp(iter([ # HEAD of user object ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), # DELETE of token ('204 No Content', {}, ''), # DELETE of user object ('404 Not Found', {}, '')])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 3) def test_delete_user_success_missing_token(self): self.test_auth.app = FakeApp(iter([ # HEAD of user object ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), # DELETE of token ('404 Not Found', {}, ''), # DELETE of user object ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 3) def test_delete_user_success_no_token(self): self.test_auth.app = FakeApp(iter([ # HEAD of user object ('200 Ok', {}, ''), # DELETE of user object ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/act/usr', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 2) def test_validate_token_bad_prefix(self): resp = Request.blank('/auth/v2/.token/BAD_token' ).get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_validate_token_tmi(self): resp = Request.blank( '/auth/v2/.token/AUTH_token/tmi').get_response(self.test_auth) self.assertEquals(resp.status_int, 400) def test_validate_token_bad_memcache(self): fake_memcache = FakeMemcache() fake_memcache.set('AUTH_/auth/AUTH_token', 'bogus') resp = Request.blank( '/auth/v2/.token/AUTH_token', environ={'swift.cache': fake_memcache}).get_response( self.test_auth) self.assertEquals(resp.status_int, 500) def test_validate_token_from_memcache(self): fake_memcache = FakeMemcache() fake_memcache.set( 'AUTH_/auth/AUTH_token', (time() + 1, 'act:usr,act')) resp = Request.blank( '/auth/v2/.token/AUTH_token', environ={'swift.cache': fake_memcache}).get_response( self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals( resp.headers.get('x-auth-groups'), 'act:usr,act') self.assert_(float(resp.headers['x-auth-ttl']) < 1, resp.headers['x-auth-ttl']) def test_validate_token_from_memcache_expired(self): fake_memcache = FakeMemcache() fake_memcache.set( 'AUTH_/auth/AUTH_token', (time() - 1, 'act:usr,act')) resp = Request.blank( '/auth/v2/.token/AUTH_token', environ={'swift.cache': fake_memcache}).get_response( self.test_auth) self.assertEquals(resp.status_int, 404) self.assert_('x-auth-groups' not in resp.headers) self.assert_('x-auth-ttl' not in resp.headers) def test_validate_token_from_object(self): self.test_auth.app = FakeApp(iter([ # GET of token object ('200 Ok', {}, json.dumps({'groups': [{'name': 'act:usr'}, {'name': 'act'}], 'expires': time() + 1}))])) resp = Request.blank('/auth/v2/.token/AUTH_token' ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 1) self.assertEquals( resp.headers.get('x-auth-groups'), 'act:usr,act') self.assert_(float(resp.headers['x-auth-ttl']) < 1, resp.headers['x-auth-ttl']) def test_validate_token_from_object_expired(self): self.test_auth.app = FakeApp(iter([ # GET of token object ('200 Ok', {}, json.dumps({'groups': 'act:usr,act', 'expires': time() - 1})), # DELETE of expired token object ('204 No Content', {}, '')])) resp = Request.blank('/auth/v2/.token/AUTH_token' ).get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertEquals(self.test_auth.app.calls, 2) def test_validate_token_from_object_with_admin(self): self.test_auth.app = FakeApp(iter([ # GET of token object ('200 Ok', {}, json.dumps({'account_id': 'AUTH_cfa', 'groups': [{'name': 'act:usr'}, {'name': 'act'}, {'name': '.admin'}], 'expires': time() + 1}))])) resp = Request.blank('/auth/v2/.token/AUTH_token' ).get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(self.test_auth.app.calls, 1) self.assertEquals(resp.headers.get('x-auth-groups'), 'act:usr,act,AUTH_cfa') self.assert_(float(resp.headers['x-auth-ttl']) < 1, resp.headers['x-auth-ttl']) def test_get_conn_default(self): conn = self.test_auth.get_conn() self.assertEquals( conn.__class__, auth.HTTPConnection) self.assertEquals(conn.host, '127.0.0.1') self.assertEquals(conn.port, 8080) def test_get_conn_default_https(self): local_auth = auth.filter_factory( {'super_admin_key': 'supertest', 'default_swift_cluster': 'local#https://1.2.3.4/v1'})(FakeApp()) conn = local_auth.get_conn() self.assertEquals( conn.__class__, auth.HTTPSConnection) self.assertEquals(conn.host, '1.2.3.4') self.assertEquals(conn.port, 443) def test_get_conn_overridden(self): local_auth = auth.filter_factory( {'super_admin_key': 'supertest', 'default_swift_cluster': 'local#https://1.2.3.4/v1'})(FakeApp()) conn = \ local_auth.get_conn( urlparsed=auth.urlparse('http://5.6.7.8/v1')) self.assertEquals( conn.__class__, auth.HTTPConnection) self.assertEquals(conn.host, '5.6.7.8') self.assertEquals(conn.port, 80) def test_get_conn_overridden_https(self): local_auth = auth.filter_factory( {'super_admin_key': 'supertest', 'default_swift_cluster': 'local#http://1.2.3.4/v1'})(FakeApp()) conn = \ local_auth.get_conn( urlparsed=auth.urlparse( 'https://5.6.7.8/v1')) self.assertEquals( conn.__class__, auth.HTTPSConnection) self.assertEquals(conn.host, '5.6.7.8') self.assertEquals(conn.port, 443) def test_get_itoken_fail_no_memcache(self): exc = None try: self.test_auth.get_itoken({}) except Exception as err: exc = err self.assertEquals(str(exc), 'No memcache set up; required for Swauth middleware') def test_get_itoken_success(self): fmc = FakeMemcache() itk = self.test_auth.get_itoken( {'swift.cache': fmc}) self.assert_(itk.startswith('AUTH_itk'), itk) expires, groups = fmc.get('AUTH_/auth/%s' % itk) self.assert_(expires > time(), expires) self.assertEquals( groups, 'gsmetadata,.reseller_admin,AUTH_gsmetadata') def test_get_admin_detail_fail_no_colon(self): self.test_auth.app = FakeApp(iter([])) self.assertEquals( self.test_auth.get_admin_detail( Request.blank('/')), None) self.assertEquals( self.test_auth.get_admin_detail( Request.blank('/', headers={'X-Auth-Admin-User': 'usr'})), None) self.assertRaises( StopIteration, self.test_auth.get_admin_detail, Request.blank('/', headers={'X-Auth-Admin-User': 'act:usr'})) def test_get_admin_detail_fail_user_not_found(self): self.test_auth.app = FakeApp( iter([('404 Not Found', {}, '')])) self.assertEquals( self.test_auth.get_admin_detail( Request.blank('/', headers={'X-Auth-Admin-User': 'act:usr'})), None) self.assertEquals(self.test_auth.app.calls, 1) def test_get_admin_detail_fail_get_user_error(self): self.test_auth.app = FakeApp(iter([ ('503 Service Unavailable', {}, '')])) exc = None try: self.test_auth.get_admin_detail( Request.blank('/', headers={'X-Auth-Admin-User': 'act:usr'})) except Exception as err: exc = err self.assertEquals(str(exc), 'Could not get admin user object: ' '/v1/AUTH_gsmetadata/act/usr 503 Service Unavailable') self.assertEquals(self.test_auth.app.calls, 1) def test_get_admin_detail_success(self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps({"auth": "plaintext:key", "groups": [{'name': "act:usr"}, {'name': "act"}, {'name': ".admin"}]}))])) detail = self.test_auth.get_admin_detail( Request.blank('/', headers={'X-Auth-Admin-User': 'act:usr'})) self.assertEquals(self.test_auth.app.calls, 1) self.assertEquals( detail, {'account': 'act', 'auth': 'plaintext:key', 'groups': [{'name': 'act:usr'}, {'name': 'act'}, {'name': '.admin'}]}) def test_credentials_match_success(self): self.assert_(self.test_auth.credentials_match( {'auth': 'plaintext:key'}, 'key')) def test_credentials_match_fail_no_details(self): self.assert_( not self.test_auth.credentials_match(None, 'notkey')) def test_credentials_match_fail_plaintext(self): self.assert_(not self.test_auth.credentials_match( {'auth': 'plaintext:key'}, 'notkey')) def test_is_super_admin_success(self): self.assert_( self.test_auth.is_super_admin( Request.blank( '/', headers={'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'}))) def test_is_super_admin_fail_bad_key(self): self.assert_( not self.test_auth.is_super_admin( Request.blank('/', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'bad'}))) self.assert_( not self.test_auth.is_super_admin( Request.blank('/', headers={'X-Auth-Admin-User': '.super_admin'}))) self.assert_( not self.test_auth.is_super_admin(Request.blank('/'))) def test_is_super_admin_fail_bad_user(self): self.assert_( not self.test_auth.is_super_admin( Request.blank('/', headers={ 'X-Auth-Admin-User': 'bad', 'X-Auth-Admin-Key': 'supertest'}))) self.assert_( not self.test_auth.is_super_admin( Request.blank('/', headers={'X-Auth-Admin-Key': 'supertest'}))) self.assert_( not self.test_auth.is_super_admin(Request.blank('/'))) def test_is_reseller_admin_success_is_super_admin(self): self.assert_( self.test_auth.is_reseller_admin( Request.blank('/', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'}))) def test_is_reseller_admin_success_called_get_admin_detail( self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps({'auth': 'plaintext:key', 'groups': [{'name': 'act:rdm'}, {'name': 'act'}, {'name': '.admin'}, {'name': '.reseller_admin'}]}))])) self.assert_( self.test_auth.is_reseller_admin( Request.blank('/', headers={ 'X-Auth-Admin-User': 'act:rdm', 'X-Auth-Admin-Key': 'key'}))) def test_is_reseller_admin_fail_only_account_admin( self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps({'auth': 'plaintext:key', 'groups': [{'name': 'act:adm'}, {'name': 'act'}, {'name': '.admin'}]}))])) self.assert_( not self.test_auth.is_reseller_admin( Request.blank('/', headers={ 'X-Auth-Admin-User': 'act:adm', 'X-Auth-Admin-Key': 'key'}))) def test_is_reseller_admin_fail_regular_user(self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps({'auth': 'plaintext:key', 'groups': [{'name': 'act:usr'}, {'name': 'act'}]}))])) self.assert_( not self.test_auth.is_reseller_admin( Request.blank('/', headers={ 'X-Auth-Admin-User': 'act:usr', 'X-Auth-Admin-Key': 'key'}))) def test_is_reseller_admin_fail_bad_key(self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps({'auth': 'plaintext:key', 'groups': [{'name': 'act:rdm'}, {'name': 'act'}, {'name': '.admin'}, {'name': '.reseller_admin'}]}))])) self.assert_( not self.test_auth.is_reseller_admin( Request.blank('/', headers={ 'X-Auth-Admin-User': 'act:rdm', 'X-Auth-Admin-Key': 'bad'}))) def test_is_account_admin_success_is_super_admin(self): self.assert_( self.test_auth.is_account_admin( Request.blank('/', headers={ 'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'supertest'}), 'act')) def test_is_account_admin_success_is_reseller_admin( self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps({'auth': 'plaintext:key', 'groups': [{'name': 'act:rdm'}, {'name': 'act'}, {'name': '.admin'}, {'name': '.reseller_admin'}]}))])) self.assert_( self.test_auth.is_account_admin( Request.blank('/', headers={ 'X-Auth-Admin-User': 'act:rdm', 'X-Auth-Admin-Key': 'key'}), 'act')) def test_is_account_admin_success(self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps({'auth': 'plaintext:key', 'groups': [{'name': 'act:adm'}, {'name': 'act'}, {'name': '.admin'}]}))])) self.assert_( self.test_auth.is_account_admin( Request.blank('/', headers={ 'X-Auth-Admin-User': 'act:adm', 'X-Auth-Admin-Key': 'key'}), 'act')) def test_is_account_admin_fail_account_admin_different_account( self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps({'auth': 'plaintext:key', 'groups': [{'name': 'act2:adm'}, {'name': 'act2'}, {'name': '.admin'}]}))])) self.assert_( not self.test_auth.is_account_admin( Request.blank('/', headers={ 'X-Auth-Admin-User': 'act2:adm', 'X-Auth-Admin-Key': 'key'}), 'act')) def test_is_account_admin_fail_regular_user(self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps({'auth': 'plaintext:key', 'groups': [{'name': 'act:usr'}, {'name': 'act'}]}))])) self.assert_( not self.test_auth.is_account_admin( Request.blank('/', headers={ 'X-Auth-Admin-User': 'act:usr', 'X-Auth-Admin-Key': 'key'}), 'act')) def test_is_account_admin_fail_bad_key(self): self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps({'auth': 'plaintext:key', 'groups': [{'name': 'act:rdm'}, {'name': 'act'}, {'name': '.admin'}, {'name': '.reseller_admin'}]}))])) self.assert_( not self.test_auth.is_account_admin( Request.blank('/', headers={ 'X-Auth-Admin-User': 'act:rdm', 'X-Auth-Admin-Key': 'bad'}), 'act')) def test_reseller_admin_but_account_is_internal_use_only( self): req = Request.blank('/v1/AUTH_gsmetadata', environ={'REQUEST_METHOD': 'GET'}) req.remote_user = 'act:usr,act,.reseller_admin' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) def test_reseller_admin_but_account_is_exactly_reseller_prefix( self): req = Request.blank( '/v1/AUTH_', environ={'REQUEST_METHOD': 'GET'}) req.remote_user = 'act:usr,act,.reseller_admin' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) def _get_token_success_v1_0_encoded( self, saved_user, saved_key, sent_user, sent_key): self.test_auth.app = FakeApp(iter([ # GET of user object ('200 Ok', {}, json.dumps({"auth": "plaintext:%s" % saved_key, "groups": [{'name': saved_user}, {'name': "act"}, {'name': ".admin"}]})), # GET of account ('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), # PUT of new token ('201 Created', {}, ''), # POST of token to user object ('204 No Content', {}, ''), # GET of services object ('200 Ok', {}, json.dumps({"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) resp = Request.blank( '/auth/v1.0', headers={'X-Auth-User': sent_user, 'X-Auth-Key': sent_key}).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assert_( resp.headers.get('x-auth-token', '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) self.assertEquals(resp.headers.get('x-auth-token'), resp.headers.get('x-storage-token')) self.assertEquals(resp.headers.get('x-storage-url'), 'http://127.0.0.1:8080/v1/AUTH_cfa') self.assertEquals( json.loads(resp.body), {"storage": {"default": "local", "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) self.assertEquals(self.test_auth.app.calls, 5) def test_get_token_success_v1_0_encoded1(self): self._get_token_success_v1_0_encoded( 'act:usr', 'key', 'act%3ausr', 'key') def test_get_token_success_v1_0_encoded2(self): self._get_token_success_v1_0_encoded( 'act:u s r', 'key', 'act%3au%20s%20r', 'key') def test_get_token_success_v1_0_encoded3(self): self._get_token_success_v1_0_encoded( 'act:u s r', 'k:e:y', 'act%3au%20s%20r', 'k%3Ae%3ay') def test_allowed_sync_hosts(self): a = auth.filter_factory( {'super_admin_key': 'supertest'})(FakeApp()) self.assertEquals( a.allowed_sync_hosts, ['127.0.0.1']) a = auth.filter_factory({ 'super_admin_key': 'supertest', 'allowed_sync_hosts': '1.1.1.1,2.1.1.1, 3.1.1.1 , 4.1.1.1,, , 5.1.1.1'})(FakeApp()) self.assertEquals( a.allowed_sync_hosts, ['1.1.1.1', '2.1.1.1', '3.1.1.1', '4.1.1.1', '5.1.1.1']) def test_reseller_admin_is_owner(self): orig_authorize = self.test_auth.authorize owner_values = [] def mitm_authorize(req): rv = orig_authorize(req) owner_values.append( req.environ.get('swift_owner', False)) return rv self.test_auth.authorize = mitm_authorize self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps( {'account': 'other', 'user': 'other:usr', 'account_id': 'AUTH_other', 'groups': [{'name': 'other:usr'}, {'name': 'other'}, {'name': '.reseller_admin'}], 'expires': time() + 60})), ('204 No Content', {}, '')])) req = Request.blank( '/v1/AUTH_cfa', headers={'X-Auth-Token': 'AUTH_t'}) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(owner_values, [True]) def test_admin_is_owner(self): orig_authorize = self.test_auth.authorize owner_values = [] def mitm_authorize(req): rv = orig_authorize(req) owner_values.append( req.environ.get('swift_owner', False)) return rv self.test_auth.authorize = mitm_authorize self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps( {'account': 'act', 'user': 'act:usr', 'account_id': 'AUTH_cfa', 'groups': [{'name': 'act:usr'}, {'name': 'act'}, {'name': '.admin'}], 'expires': time() + 60})), ('204 No Content', {}, '')])) req = Request.blank( '/v1/AUTH_cfa', headers={'X-Auth-Token': 'AUTH_t'}) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(owner_values, [True]) def test_regular_is_not_owner(self): orig_authorize = self.test_auth.authorize owner_values = [] def mitm_authorize(req): rv = orig_authorize(req) owner_values.append( req.environ.get('swift_owner', False)) return rv self.test_auth.authorize = mitm_authorize self.test_auth.app = FakeApp(iter([ ('200 Ok', {}, json.dumps( {'account': 'act', 'user': 'act:usr', 'account_id': 'AUTH_cfa', 'groups': [{'name': 'act:usr'}, { 'name': 'act'}], 'expires': time() + 60})), ('204 No Content', {}, '')]), acl='act:usr') req = Request.blank('/v1/AUTH_cfa/c', headers={'X-Auth-Token': 'AUTH_t'}) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.assertEquals(owner_values, [False]) def test_sync_request_success(self): self.test_auth.app = FakeApp( iter([('204 No Content', {}, '')]), sync_key='secret') req = Request.blank('/v1/AUTH_cfa/c/o', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'x-container-sync-key': 'secret', 'x-timestamp': '123.456'}) req.remote_addr = '127.0.0.1' resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 204) def test_sync_request_fail_key(self): self.test_auth.app = FakeApp( iter([('204 No Content', {}, '')]), sync_key='secret') req = Request.blank('/v1/AUTH_cfa/c/o', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'x-container-sync-key': 'wrongsecret', 'x-timestamp': '123.456'}) req.remote_addr = '127.0.0.1' resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.test_auth.app = FakeApp( iter([('204 No Content', {}, '')]), sync_key='othersecret') req = Request.blank('/v1/AUTH_cfa/c/o', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'x-container-sync-key': 'secret', 'x-timestamp': '123.456'}) req.remote_addr = '127.0.0.1' resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.test_auth.app = FakeApp( iter([('204 No Content', {}, '')]), sync_key=None) req = Request.blank('/v1/AUTH_cfa/c/o', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'x-container-sync-key': 'secret', 'x-timestamp': '123.456'}) req.remote_addr = '127.0.0.1' resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 401) def test_sync_request_fail_no_timestamp(self): self.test_auth.app = FakeApp( iter([('204 No Content', {}, '')]), sync_key='secret') req = Request.blank('/v1/AUTH_cfa/c/o', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={'x-container-sync-key': 'secret'}) req.remote_addr = '127.0.0.1' resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 401) def test_sync_request_fail_sync_host(self): self.test_auth.app = FakeApp( iter([('204 No Content', {}, '')]), sync_key='secret') req = Request.blank('/v1/AUTH_cfa/c/o', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'x-container-sync-key': 'secret', 'x-timestamp': '123.456'}) req.remote_addr = '127.0.0.2' resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 401) def test_sync_request_success_lb_sync_host(self): self.test_auth.app = FakeApp( iter([('204 No Content', {}, '')]), sync_key='secret') req = Request.blank('/v1/AUTH_cfa/c/o', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'x-container-sync-key': 'secret', 'x-timestamp': '123.456', 'x-forwarded-for': '127.0.0.1'}) req.remote_addr = '127.0.0.2' resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 204) self.test_auth.app = FakeApp( iter([('204 No Content', {}, '')]), sync_key='secret') req = Request.blank('/v1/AUTH_cfa/c/o', environ={ 'REQUEST_METHOD': 'DELETE'}, headers={ 'x-container-sync-key': 'secret', 'x-timestamp': '123.456', 'x-cluster-client-ip': '127.0.0.1'}) req.remote_addr = '127.0.0.2' resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 204) def _make_request(self, path, **kwargs): req = Request.blank(path, **kwargs) req.environ['swift.cache'] = FakeMemcache() return req def test_override_asked_for_but_not_allowed(self): self.test_auth = \ auth.filter_factory( {'allow_overrides': 'false'})(FakeApp()) req = self._make_request('/v1/AUTH_account', environ={'swift.authorize_override': True}) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(resp.environ['swift.authorize'], self.test_auth.authorize) def test_override_asked_for_and_allowed(self): self.test_auth = \ auth.filter_factory( {'allow_overrides': 'true'})(FakeApp()) req = self._make_request('/v1/AUTH_account', environ={'swift.authorize_override': True}) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertTrue( 'swift.authorize' not in resp.environ) def test_override_default_allowed(self): req = self._make_request('/v1/AUTH_account', environ={'swift.authorize_override': True}) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertTrue( 'swift.authorize' not in resp.environ) def test_token_too_long(self): req = self._make_request('/v1/AUTH_account', headers={ 'x-auth-token': 'a' * MAX_TOKEN_LENGTH}) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertNotEquals( resp.body, 'Token exceeds maximum length.') req = self._make_request('/v1/AUTH_account', headers={ 'x-auth-token': 'a' * (MAX_TOKEN_LENGTH + 1)}) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 400) self.assertEquals( resp.body, 'Token exceeds maximum length.') def test_crazy_authorization(self): req = self._make_request('/v1/AUTH_account', headers={ 'authorization': 'somebody elses header value'}) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(resp.environ['swift.authorize'], self.test_auth.denied_response) if __name__ == '__main__': unittest.main()