From cadaed4627078581eddf85ce2b86555f5efec1bd Mon Sep 17 00:00:00 2001 From: Thiago da Silva Date: Thu, 17 Oct 2013 13:27:31 -0400 Subject: moving existing swauth unit tests moving existing swauth unit tests to gluster-swift unit tests location Change-Id: I3445b7ef1a1abe584854f2b04ffc9949b3346814 Signed-off-by: Thiago da Silva Reviewed-on: http://review.gluster.org/6106 Reviewed-by: Luis Pabon Tested-by: Luis Pabon --- test/unit/common/middleware/__init__.py | 0 test/unit/common/middleware/gswauth/__init__.py | 0 .../common/middleware/gswauth/swauth/__init__.py | 0 .../middleware/gswauth/swauth/test_authtypes.py | 63 + .../middleware/gswauth/swauth/test_middleware.py | 4520 ++++++++++++++++++++ 5 files changed, 4583 insertions(+) create mode 100644 test/unit/common/middleware/__init__.py create mode 100644 test/unit/common/middleware/gswauth/__init__.py create mode 100644 test/unit/common/middleware/gswauth/swauth/__init__.py create mode 100644 test/unit/common/middleware/gswauth/swauth/test_authtypes.py create mode 100644 test/unit/common/middleware/gswauth/swauth/test_middleware.py (limited to 'test/unit/common') diff --git a/test/unit/common/middleware/__init__.py b/test/unit/common/middleware/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/unit/common/middleware/gswauth/__init__.py b/test/unit/common/middleware/gswauth/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/unit/common/middleware/gswauth/swauth/__init__.py b/test/unit/common/middleware/gswauth/swauth/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/unit/common/middleware/gswauth/swauth/test_authtypes.py b/test/unit/common/middleware/gswauth/swauth/test_authtypes.py new file mode 100644 index 0000000..aba9ad7 --- /dev/null +++ b/test/unit/common/middleware/gswauth/swauth/test_authtypes.py @@ -0,0 +1,63 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Pablo Llopis 2011 + +import unittest +import gluster.swift.common.middleware.gswauth.swauth.authtypes as authtypes + + +class TestPlaintext(unittest.TestCase): + + def setUp(self): + self.auth_encoder = authtypes.Plaintext() + + def test_plaintext_encode(self): + enc_key = self.auth_encoder.encode('keystring') + self.assertEquals('plaintext:keystring', enc_key) + + def test_plaintext_valid_match(self): + creds = 'plaintext:keystring' + match = self.auth_encoder.match('keystring', creds) + self.assertEquals(match, True) + + def test_plaintext_invalid_match(self): + creds = 'plaintext:other-keystring' + match = self.auth_encoder.match('keystring', creds) + self.assertEquals(match, False) + + +class TestSha1(unittest.TestCase): + + def setUp(self): + self.auth_encoder = authtypes.Sha1() + self.auth_encoder.salt = 'salt' + + def test_sha1_encode(self): + enc_key = self.auth_encoder.encode('keystring') + self.assertEquals('sha1:salt$d50dc700c296e23ce5b41f7431a0e01f69010f06', + enc_key) + + def test_sha1_valid_match(self): + creds = 'sha1:salt$d50dc700c296e23ce5b41f7431a0e01f69010f06' + match = self.auth_encoder.match('keystring', creds) + self.assertEquals(match, True) + + def test_sha1_invalid_match(self): + creds = 'sha1:salt$deadbabedeadbabedeadbabec0ffeebadc0ffeee' + match = self.auth_encoder.match('keystring', creds) + self.assertEquals(match, False) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/common/middleware/gswauth/swauth/test_middleware.py b/test/unit/common/middleware/gswauth/swauth/test_middleware.py new file mode 100644 index 0000000..627f1be --- /dev/null +++ b/test/unit/common/middleware/gswauth/swauth/test_middleware.py @@ -0,0 +1,4520 @@ +# Copyright (c) 2010-2011 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +try: + import simplejson as json +except ImportError: + import json +import unittest +from contextlib import contextmanager +from time import time + +from swift.common.swob import Request, Response + +from gluster.swift.common.middleware.gswauth.swauth import middleware as auth +from gluster.swift.common.middleware.gswauth.swauth.authtypes import MAX_TOKEN_LENGTH + + +DEFAULT_TOKEN_LIFE = 86400 +MAX_TOKEN_LIFE = 100000 + + +class FakeMemcache(object): + + def __init__(self): + self.store = {} + + def get(self, key): + return self.store.get(key) + + def set(self, key, value, timeout=0, time=0): + self.store[key] = value + return True + + def incr(self, key, timeout=0, time=0): + self.store[key] = self.store.setdefault(key, 0) + 1 + return self.store[key] + + @contextmanager + def soft_lock(self, key, timeout=0, retries=5, time=0): + yield True + + def delete(self, key): + try: + del self.store[key] + except Exception: + pass + return True + + +class FakeApp(object): + + def __init__( + self, status_headers_body_iter=None, acl=None, sync_key=None): + self.calls = 0 + self.status_headers_body_iter = status_headers_body_iter + if not self.status_headers_body_iter: + self.status_headers_body_iter = iter( + [('404 Not Found', {}, '')]) + self.acl = acl + self.sync_key = sync_key + + def __call__(self, env, start_response): + self.calls += 1 + self.request = Request.blank('', environ=env) + if self.acl: + self.request.acl = self.acl + if self.sync_key: + self.request.environ[ + 'swift_sync_key'] = self.sync_key + if 'swift.authorize' in env: + resp = env['swift.authorize'](self.request) + if resp: + return resp(env, start_response) + status, headers, body = self.status_headers_body_iter.next( + ) + return Response(status=status, headers=headers, + body=body)(env, start_response) + + +class FakeConn(object): + + def __init__(self, status_headers_body_iter=None): + self.calls = 0 + self.status_headers_body_iter = status_headers_body_iter + if not self.status_headers_body_iter: + self.status_headers_body_iter = iter( + [('404 Not Found', {}, '')]) + + def request(self, method, path, headers): + self.calls += 1 + self.request_path = path + self.status, self.headers, self.body = \ + self.status_headers_body_iter.next() + self.status, self.reason = self.status.split(' ', 1) + self.status = int(self.status) + + def getresponse(self): + return self + + def read(self): + body = self.body + self.body = '' + return body + + +class TestAuth(unittest.TestCase): + + def setUp(self): + self.test_auth = \ + auth.filter_factory({ + 'super_admin_key': 'supertest', + 'token_life': str(DEFAULT_TOKEN_LIFE), + 'max_token_life': str(MAX_TOKEN_LIFE)})(FakeApp()) + + def test_super_admin_key_not_required(self): + auth.filter_factory({})(FakeApp()) + + def test_reseller_prefix_init(self): + app = FakeApp() + ath = auth.filter_factory( + {'super_admin_key': 'supertest'})(app) + self.assertEquals(ath.reseller_prefix, 'AUTH_') + ath = auth.filter_factory( + {'super_admin_key': 'supertest', + 'reseller_prefix': 'TEST'})(app) + self.assertEquals(ath.reseller_prefix, 'TEST_') + ath = auth.filter_factory( + {'super_admin_key': 'supertest', + 'reseller_prefix': 'TEST_'})(app) + self.assertEquals(ath.reseller_prefix, 'TEST_') + + def test_auth_prefix_init(self): + app = FakeApp() + ath = auth.filter_factory( + {'super_admin_key': 'supertest'})(app) + self.assertEquals(ath.auth_prefix, '/auth/') + ath = auth.filter_factory( + {'super_admin_key': 'supertest', + 'auth_prefix': ''})(app) + self.assertEquals(ath.auth_prefix, '/auth/') + ath = auth.filter_factory( + {'super_admin_key': 'supertest', + 'auth_prefix': '/test/'})(app) + self.assertEquals(ath.auth_prefix, '/test/') + ath = auth.filter_factory( + {'super_admin_key': 'supertest', + 'auth_prefix': '/test'})(app) + self.assertEquals(ath.auth_prefix, '/test/') + ath = auth.filter_factory( + {'super_admin_key': 'supertest', + 'auth_prefix': 'test/'})(app) + self.assertEquals(ath.auth_prefix, '/test/') + ath = auth.filter_factory( + {'super_admin_key': 'supertest', + 'auth_prefix': 'test'})(app) + self.assertEquals(ath.auth_prefix, '/test/') + + def test_no_auth_type_init(self): + app = FakeApp() + ath = auth.filter_factory({})(app) + self.assertEquals(ath.auth_type, 'Plaintext') + + def test_valid_auth_type_init(self): + app = FakeApp() + ath = auth.filter_factory( + {'auth_type': 'sha1'})(app) + self.assertEquals(ath.auth_type, 'Sha1') + ath = auth.filter_factory( + {'auth_type': 'plaintext'})(app) + self.assertEquals(ath.auth_type, 'Plaintext') + + def test_invalid_auth_type_init(self): + app = FakeApp() + exc = None + try: + auth.filter_factory( + {'auth_type': 'NONEXISTANT'})(app) + except Exception as err: + exc = err + self.assertEquals(str(exc), + 'Invalid auth_type in config file: %s' % + 'Nonexistant') + + def test_default_swift_cluster_init(self): + app = FakeApp() + self.assertRaises(Exception, auth.filter_factory({ + 'super_admin_key': 'supertest', + 'default_swift_cluster': 'local#badscheme://host/path'}), app) + ath = auth.filter_factory( + {'super_admin_key': 'supertest'})(app) + self.assertEquals(ath.default_swift_cluster, + 'local#http://127.0.0.1:8080/v1') + ath = auth.filter_factory({ + 'super_admin_key': 'supertest', + 'default_swift_cluster': 'local#http://host/path'})(app) + self.assertEquals(ath.default_swift_cluster, + 'local#http://host/path') + ath = auth.filter_factory({ + 'super_admin_key': 'supertest', + 'default_swift_cluster': 'local#https://host/path/'})(app) + self.assertEquals(ath.dsc_url, 'https://host/path') + self.assertEquals(ath.dsc_url2, 'https://host/path') + ath = auth.filter_factory({ + 'super_admin_key': 'supertest', + 'default_swift_cluster': + 'local#https://host/path/#http://host2/path2/'})(app) + self.assertEquals(ath.dsc_url, 'https://host/path') + self.assertEquals( + ath.dsc_url2, + 'http://host2/path2') + + def test_top_level_denied(self): + resp = Request.blank( + '/').get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + + def test_anon(self): + resp = Request.blank( + '/v1/AUTH_account').get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + self.assertEquals(resp.environ['swift.authorize'], + self.test_auth.authorize) + + def test_auth_deny_non_reseller_prefix(self): + resp = Request.blank( + '/v1/BLAH_account', + headers={'X-Auth-Token': 'BLAH_t'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + self.assertEquals(resp.environ['swift.authorize'], + self.test_auth.denied_response) + + def test_auth_deny_non_reseller_prefix_no_override( + self): + fake_authorize = lambda x: Response( + status='500 Fake') + resp = Request.blank( + '/v1/BLAH_account', + headers={'X-Auth-Token': 'BLAH_t'}, + environ={'swift.authorize': fake_authorize}).get_response( + self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(resp.environ['swift.authorize'], fake_authorize) + + def test_auth_no_reseller_prefix_deny(self): + # Ensures that when we have no reseller prefix, we don't deny a request + # outright but set up a denial swift.authorize and pass the request on + # down the chain. + local_app = FakeApp() + local_auth = auth.filter_factory( + {'super_admin_key': 'supertest', + 'reseller_prefix': ''})(local_app) + resp = Request.blank( + '/v1/account', + headers={'X-Auth-Token': 't'}).get_response(local_auth) + self.assertEquals(resp.status_int, 401) + # one for checking auth, two for request passed + # along + self.assertEquals(local_app.calls, 2) + self.assertEquals(resp.environ['swift.authorize'], + local_auth.denied_response) + + def test_auth_no_reseller_prefix_allow(self): + # Ensures that when we have no reseller prefix, we can still allow + # access if our auth server accepts requests + local_app = FakeApp(iter([ + ('200 Ok', {}, + json.dumps( + {'account': 'act', 'user': 'act:usr', + 'account_id': 'AUTH_cfa', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}, + {'name': '.admin'}], + 'expires': time() + 60})), + ('204 No Content', {}, '')])) + local_auth = auth.filter_factory( + {'super_admin_key': 'supertest', + 'reseller_prefix': ''})(local_app) + resp = Request.blank( + '/v1/act', + headers={'X-Auth-Token': 't'}).get_response(local_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(local_app.calls, 2) + self.assertEquals(resp.environ['swift.authorize'], + local_auth.authorize) + + def test_auth_no_reseller_prefix_no_token(self): + # Check that normally we set up a call back to our + # authorize. + local_auth = \ + auth.filter_factory( + {'super_admin_key': 'supertest', + 'reseller_prefix': ''})(FakeApp(iter([]))) + resp = Request.blank( + '/v1/account').get_response( + local_auth) + self.assertEquals(resp.status_int, 401) + self.assertEquals( + resp.environ['swift.authorize'], local_auth.authorize) + # Now make sure we don't override an existing swift.authorize when we + # have no reseller prefix. + local_auth = \ + auth.filter_factory( + {'super_admin_key': 'supertest', + 'reseller_prefix': ''})(FakeApp()) + local_authorize = lambda req: Response('test') + resp = Request.blank( + '/v1/account', environ={'swift.authorize': + local_authorize}).get_response(local_auth) + self.assertEquals(resp.status_int, 200) + self.assertEquals( + resp.environ['swift.authorize'], + local_authorize) + + def test_auth_fail(self): + resp = Request.blank( + '/v1/AUTH_cfa', + headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + + def test_auth_success(self): + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, + json.dumps( + {'account': 'act', 'user': 'act:usr', + 'account_id': 'AUTH_cfa', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}, + {'name': '.admin'}], + 'expires': time() + 60})), + ('204 No Content', {}, '')])) + resp = Request.blank( + '/v1/AUTH_cfa', + headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_auth_memcache(self): + # First run our test without memcache, showing we need to return the + # token contents twice. + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, + json.dumps( + {'account': 'act', 'user': 'act:usr', + 'account_id': 'AUTH_cfa', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}, + {'name': '.admin'}], + 'expires': time() + 60})), + ('204 No Content', {}, ''), + ('200 Ok', {}, + json.dumps( + {'account': 'act', 'user': 'act:usr', + 'account_id': 'AUTH_cfa', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}, + {'name': '.admin'}], + 'expires': time() + 60})), + ('204 No Content', {}, '')])) + resp = Request.blank( + '/v1/AUTH_cfa', + headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + resp = Request.blank( + '/v1/AUTH_cfa', + headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(self.test_auth.app.calls, 4) + # Now run our test with memcache, showing we no longer need to return + # the token contents twice. + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, + json.dumps( + {'account': 'act', 'user': 'act:usr', + 'account_id': 'AUTH_cfa', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}, + {'name': '.admin'}], + 'expires': time() + 60})), + ('204 No Content', {}, ''), + # Don't need a second token object returned if memcache is + # used + ('204 No Content', {}, '')])) + fake_memcache = FakeMemcache() + resp = Request.blank( + '/v1/AUTH_cfa', + headers={'X-Auth-Token': 'AUTH_t'}, + environ={'swift.cache': fake_memcache}).get_response( + self.test_auth) + self.assertEquals(resp.status_int, 204) + resp = Request.blank( + '/v1/AUTH_cfa', + headers={'X-Auth-Token': 'AUTH_t'}, + environ={'swift.cache': fake_memcache}).get_response( + self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(self.test_auth.app.calls, 3) + + def test_auth_just_expired(self): + self.test_auth.app = FakeApp(iter([ + # Request for token (which will have expired) + ('200 Ok', {}, + json.dumps( + {'account': 'act', 'user': 'act:usr', + 'account_id': 'AUTH_cfa', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}, + {'name': '.admin'}], + 'expires': time() - 1})), + # Request to delete token + ('204 No Content', {}, '')])) + resp = Request.blank( + '/v1/AUTH_cfa', + headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_middleware_storage_token(self): + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, + json.dumps( + {'account': 'act', 'user': 'act:usr', + 'account_id': 'AUTH_cfa', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}, + {'name': '.admin'}], + 'expires': time() + 60})), + ('204 No Content', {}, '')])) + resp = Request.blank( + '/v1/AUTH_cfa', + headers={'X-Storage-Token': 'AUTH_t'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_authorize_bad_path(self): + req = Request.blank('/badpath') + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 401) + req = Request.blank('/badpath') + req.remote_user = 'act:usr,act,AUTH_cfa' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + + def test_authorize_account_access(self): + req = Request.blank('/v1/AUTH_cfa') + req.remote_user = 'act:usr,act,AUTH_cfa' + self.assertEquals( + self.test_auth.authorize(req), + None) + req = Request.blank('/v1/AUTH_cfa') + req.remote_user = 'act:usr,act' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + + def test_authorize_acl_group_access(self): + req = Request.blank('/v1/AUTH_cfa') + req.remote_user = 'act:usr,act' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + req = Request.blank('/v1/AUTH_cfa') + req.remote_user = 'act:usr,act' + req.acl = 'act' + self.assertEquals( + self.test_auth.authorize(req), + None) + req = Request.blank('/v1/AUTH_cfa') + req.remote_user = 'act:usr,act' + req.acl = 'act:usr' + self.assertEquals( + self.test_auth.authorize(req), + None) + req = Request.blank('/v1/AUTH_cfa') + req.remote_user = 'act:usr,act' + req.acl = 'act2' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + req = Request.blank('/v1/AUTH_cfa') + req.remote_user = 'act:usr,act' + req.acl = 'act:usr2' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + + def test_deny_cross_reseller(self): + # Tests that cross-reseller is denied, even if ACLs/group + # names match + req = Request.blank('/v1/OTHER_cfa') + req.remote_user = 'act:usr,act,AUTH_cfa' + req.acl = 'act' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + + def test_authorize_acl_referrer_access(self): + req = Request.blank('/v1/AUTH_cfa/c') + req.remote_user = 'act:usr,act' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + req = Request.blank('/v1/AUTH_cfa/c') + req.remote_user = 'act:usr,act' + req.acl = '.r:*,.rlistings' + self.assertEquals( + self.test_auth.authorize(req), + None) + req = Request.blank('/v1/AUTH_cfa/c') + req.remote_user = 'act:usr,act' + req.acl = '.r:*' # No listings allowed + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + req = Request.blank('/v1/AUTH_cfa/c') + req.remote_user = 'act:usr,act' + req.acl = '.r:.example.com,.rlistings' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + req = Request.blank('/v1/AUTH_cfa/c') + req.remote_user = 'act:usr,act' + req.referer = 'http://www.example.com/index.html' + req.acl = '.r:.example.com,.rlistings' + self.assertEquals( + self.test_auth.authorize(req), + None) + req = Request.blank('/v1/AUTH_cfa/c') + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 401) + req = Request.blank('/v1/AUTH_cfa/c') + req.acl = '.r:*,.rlistings' + self.assertEquals( + self.test_auth.authorize(req), + None) + req = Request.blank('/v1/AUTH_cfa/c') + req.acl = '.r:*' # No listings allowed + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 401) + req = Request.blank('/v1/AUTH_cfa/c') + req.acl = '.r:.example.com,.rlistings' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 401) + req = Request.blank('/v1/AUTH_cfa/c') + req.referer = 'http://www.example.com/index.html' + req.acl = '.r:.example.com,.rlistings' + self.assertEquals( + self.test_auth.authorize(req), + None) + + def test_detect_reseller_request(self): + req = self._make_request('/v1/AUTH_admin', + headers={'X-Auth-Token': 'AUTH_t'}) + cache_key = 'AUTH_/auth/AUTH_t' + cache_entry = (time() + 3600, '.reseller_admin') + req.environ['swift.cache'].set( + cache_key, cache_entry) + resp = req.get_response(self.test_auth) + self.assertTrue(req.environ.get('reseller_request')) + + def test_account_put_permissions(self): + req = Request.blank( + '/v1/AUTH_new', + environ={'REQUEST_METHOD': 'PUT'}) + req.remote_user = 'act:usr,act' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + + req = Request.blank( + '/v1/AUTH_new', + environ={'REQUEST_METHOD': 'PUT'}) + req.remote_user = 'act:usr,act,AUTH_other' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + + # Even PUTs to your own account as account admin + # should fail + req = Request.blank( + '/v1/AUTH_old', + environ={'REQUEST_METHOD': 'PUT'}) + req.remote_user = 'act:usr,act,AUTH_old' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + + req = Request.blank( + '/v1/AUTH_new', + environ={'REQUEST_METHOD': 'PUT'}) + req.remote_user = 'act:usr,act,.reseller_admin' + resp = self.test_auth.authorize(req) + self.assertEquals(resp, None) + + # .super_admin is not something the middleware should ever see or care + # about + req = Request.blank( + '/v1/AUTH_new', + environ={'REQUEST_METHOD': 'PUT'}) + req.remote_user = 'act:usr,act,.super_admin' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + + def test_account_delete_permissions(self): + req = Request.blank('/v1/AUTH_new', + environ={'REQUEST_METHOD': 'DELETE'}) + req.remote_user = 'act:usr,act' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + + req = Request.blank('/v1/AUTH_new', + environ={'REQUEST_METHOD': 'DELETE'}) + req.remote_user = 'act:usr,act,AUTH_other' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + + # Even DELETEs to your own account as account admin should + # fail + req = Request.blank('/v1/AUTH_old', + environ={'REQUEST_METHOD': 'DELETE'}) + req.remote_user = 'act:usr,act,AUTH_old' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + + req = Request.blank('/v1/AUTH_new', + environ={'REQUEST_METHOD': 'DELETE'}) + req.remote_user = 'act:usr,act,.reseller_admin' + resp = self.test_auth.authorize(req) + self.assertEquals(resp, None) + + # .super_admin is not something the middleware should ever see or care + # about + req = Request.blank('/v1/AUTH_new', + environ={'REQUEST_METHOD': 'DELETE'}) + req.remote_user = 'act:usr,act,.super_admin' + resp = self.test_auth.authorize(req) + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + + def test_get_token_fail(self): + resp = Request.blank( + '/auth/v1.0').get_response( + self.test_auth) + self.assertEquals(resp.status_int, 401) + resp = Request.blank( + '/auth/v1.0', + headers={'X-Auth-User': 'act:usr', + 'X-Auth-Key': 'key'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + + def test_get_token_fail_invalid_key(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {}, + json.dumps({"auth": "plaintext:key", + "groups": [{'name': "act:usr"}, {'name': "act"}, + {'name': ".admin"}]}))])) + resp = Request.blank( + '/auth/v1.0', + headers={'X-Auth-User': 'act:usr', + 'X-Auth-Key': 'invalid'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_get_token_fail_invalid_x_auth_user_format( + self): + resp = Request.blank( + '/auth/v1/act/auth', + headers={'X-Auth-User': 'usr', + 'X-Auth-Key': 'key'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + + def test_get_token_fail_non_matching_account_in_request( + self): + resp = Request.blank( + '/auth/v1/act/auth', + headers={'X-Auth-User': 'act2:usr', + 'X-Auth-Key': 'key'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + + def test_get_token_fail_bad_path(self): + resp = Request.blank( + '/auth/v1/act/auth/invalid', + headers={'X-Auth-User': 'act:usr', + 'X-Auth-Key': 'key'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + + def test_get_token_fail_missing_key(self): + resp = Request.blank( + '/auth/v1/act/auth', + headers={'X-Auth-User': 'act:usr'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + + def test_get_token_fail_get_user_details(self): + self.test_auth.app = FakeApp(iter([ + ('503 Service Unavailable', {}, '')])) + resp = Request.blank( + '/auth/v1.0', + headers={'X-Auth-User': 'act:usr', + 'X-Auth-Key': 'key'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_get_token_fail_get_account(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {}, + json.dumps({"auth": "plaintext:key", + "groups": [{'name': "act:usr"}, {'name': "act"}, + {'name': ".admin"}]})), + # GET of account + ('503 Service Unavailable', {}, '')])) + resp = Request.blank( + '/auth/v1.0', + headers={'X-Auth-User': 'act:usr', + 'X-Auth-Key': 'key'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_get_token_fail_put_new_token(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {}, + json.dumps({"auth": "plaintext:key", + "groups": [{'name': "act:usr"}, {'name': "act"}, + {'name': ".admin"}]})), + # GET of account + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + # PUT of new token + ('503 Service Unavailable', {}, '')])) + resp = Request.blank( + '/auth/v1.0', + headers={'X-Auth-User': 'act:usr', + 'X-Auth-Key': 'key'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 3) + + def test_get_token_fail_post_to_user(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {}, + json.dumps({"auth": "plaintext:key", + "groups": [{'name': "act:usr"}, {'name': "act"}, + {'name': ".admin"}]})), + # GET of account + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + # PUT of new token + ('201 Created', {}, ''), + # POST of token to user object + ('503 Service Unavailable', {}, '')])) + resp = Request.blank( + '/auth/v1.0', + headers={'X-Auth-User': 'act:usr', + 'X-Auth-Key': 'key'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 4) + + def test_get_token_fail_get_services(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {}, + json.dumps({"auth": "plaintext:key", + "groups": [{'name': "act:usr"}, {'name': "act"}, + {'name': ".admin"}]})), + # GET of account + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + # PUT of new token + ('201 Created', {}, ''), + # POST of token to user object + ('204 No Content', {}, ''), + # GET of services object + ('503 Service Unavailable', {}, '')])) + resp = Request.blank( + '/auth/v1.0', + headers={'X-Auth-User': 'act:usr', + 'X-Auth-Key': 'key'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 5) + + def test_get_token_fail_get_existing_token(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'}, + json.dumps({"auth": "plaintext:key", + "groups": [{'name': "act:usr"}, {'name': "act"}, + {'name': ".admin"}]})), + # GET of token + ('503 Service Unavailable', {}, '')])) + resp = Request.blank( + '/auth/v1.0', + headers={'X-Auth-User': 'act:usr', + 'X-Auth-Key': 'key'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_get_token_success_v1_0(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {}, + json.dumps({"auth": "plaintext:key", + "groups": [{'name': "act:usr"}, {'name': "act"}, + {'name': ".admin"}]})), + # GET of account + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + # PUT of new token + ('201 Created', {}, ''), + # POST of token to user object + ('204 No Content', {}, ''), + # GET of services object + ('200 Ok', {}, json.dumps({"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) + resp = Request.blank( + '/auth/v1.0', + headers={'X-Auth-User': 'act:usr', + 'X-Auth-Key': 'key'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assert_(resp.headers.get( + 'x-auth-token', + '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) + self.assertEquals(resp.headers.get('x-auth-token'), + resp.headers.get('x-storage-token')) + self.assertEquals(resp.headers.get('x-storage-url'), + 'http://127.0.0.1:8080/v1/AUTH_cfa') + self.assertEquals( + json.loads(resp.body), + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) + self.assertEquals(self.test_auth.app.calls, 5) + + def test_get_token_success_v1_0_with_user_token_life( + self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {}, + json.dumps({"auth": "plaintext:key", + "groups": [{'name': "act:usr"}, {'name': "act"}, + {'name': ".admin"}]})), + # GET of account + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + # PUT of new token + ('201 Created', {}, ''), + # POST of token to user object + ('204 No Content', {}, ''), + # GET of services object + ('200 Ok', {}, json.dumps({"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) + resp = Request.blank( + '/auth/v1.0', + headers={'X-Auth-User': 'act:usr', + 'X-Auth-Key': 'key', + 'X-Auth-Token-Lifetime': 10}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + left = int(resp.headers['x-auth-token-expires']) + self.assertTrue(left > 0, '%d > 0' % left) + self.assertTrue(left <= 10, '%d <= 10' % left) + self.assert_(resp.headers.get( + 'x-auth-token', + '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) + self.assertEquals(resp.headers.get('x-auth-token'), + resp.headers.get('x-storage-token')) + self.assertEquals(resp.headers.get('x-storage-url'), + 'http://127.0.0.1:8080/v1/AUTH_cfa') + self.assertEquals( + json.loads(resp.body), + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) + self.assertEquals(self.test_auth.app.calls, 5) + + def test_get_token_success_v1_0_with_user_token_life_past_max( + self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {}, + json.dumps({"auth": "plaintext:key", + "groups": [{'name': "act:usr"}, {'name': "act"}, + {'name': ".admin"}]})), + # GET of account + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + # PUT of new token + ('201 Created', {}, ''), + # POST of token to user object + ('204 No Content', {}, ''), + # GET of services object + ('200 Ok', {}, json.dumps({"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) + req = Request.blank( + '/auth/v1.0', + headers={'X-Auth-User': 'act:usr', + 'X-Auth-Key': 'key', + 'X-Auth-Token-Lifetime': MAX_TOKEN_LIFE * 10}) + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + left = int(resp.headers['x-auth-token-expires']) + self.assertTrue(left > DEFAULT_TOKEN_LIFE, + '%d > %d' % (left, DEFAULT_TOKEN_LIFE)) + self.assertTrue(left <= MAX_TOKEN_LIFE, + '%d <= %d' % (left, MAX_TOKEN_LIFE)) + self.assert_(resp.headers.get( + 'x-auth-token', + '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) + self.assertEquals(resp.headers.get('x-auth-token'), + resp.headers.get('x-storage-token')) + self.assertEquals(resp.headers.get('x-storage-url'), + 'http://127.0.0.1:8080/v1/AUTH_cfa') + self.assertEquals( + json.loads(resp.body), + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) + self.assertEquals(self.test_auth.app.calls, 5) + + def test_get_token_success_v1_act_auth(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {}, + json.dumps({"auth": "plaintext:key", + "groups": [{'name': "act:usr"}, {'name': "act"}, + {'name': ".admin"}]})), + # GET of account + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + # PUT of new token + ('201 Created', {}, ''), + # POST of token to user object + ('204 No Content', {}, ''), + # GET of services object + ('200 Ok', {}, json.dumps({"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) + resp = Request.blank( + '/auth/v1/act/auth', + headers={'X-Storage-User': 'usr', + 'X-Storage-Pass': 'key'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assert_(resp.headers.get( + 'x-auth-token', + '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) + self.assertEquals(resp.headers.get('x-auth-token'), + resp.headers.get('x-storage-token')) + self.assertEquals(resp.headers.get('x-storage-url'), + 'http://127.0.0.1:8080/v1/AUTH_cfa') + self.assertEquals( + json.loads(resp.body), + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) + self.assertEquals(self.test_auth.app.calls, 5) + + def test_get_token_success_storage_instead_of_auth( + self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {}, + json.dumps({"auth": "plaintext:key", + "groups": [{'name': "act:usr"}, {'name': "act"}, + {'name': ".admin"}]})), + # GET of account + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + # PUT of new token + ('201 Created', {}, ''), + # POST of token to user object + ('204 No Content', {}, ''), + # GET of services object + ('200 Ok', {}, json.dumps({"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) + resp = Request.blank( + '/auth/v1.0', + headers={'X-Storage-User': 'act:usr', + 'X-Storage-Pass': 'key'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assert_( + resp.headers.get( + 'x-auth-token', + '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) + self.assertEquals(resp.headers.get('x-auth-token'), + resp.headers.get('x-storage-token')) + self.assertEquals(resp.headers.get('x-storage-url'), + 'http://127.0.0.1:8080/v1/AUTH_cfa') + self.assertEquals( + json.loads(resp.body), + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) + self.assertEquals(self.test_auth.app.calls, 5) + + def test_get_token_success_v1_act_auth_auth_instead_of_storage( + self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {}, + json.dumps({"auth": "plaintext:key", + "groups": [{'name': "act:usr"}, {'name': "act"}, + {'name': ".admin"}]})), + # GET of account + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + # PUT of new token + ('201 Created', {}, ''), + # POST of token to user object + ('204 No Content', {}, ''), + # GET of services object + ('200 Ok', {}, json.dumps({"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) + resp = Request.blank( + '/auth/v1/act/auth', + headers={'X-Auth-User': 'act:usr', + 'X-Auth-Key': 'key'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assert_(resp.headers.get( + 'x-auth-token', + '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) + self.assertEquals(resp.headers.get('x-auth-token'), + resp.headers.get('x-storage-token')) + self.assertEquals(resp.headers.get('x-storage-url'), + 'http://127.0.0.1:8080/v1/AUTH_cfa') + self.assertEquals( + json.loads(resp.body), + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) + self.assertEquals(self.test_auth.app.calls, 5) + + def test_get_token_success_existing_token(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'}, + json.dumps({"auth": "plaintext:key", + "groups": [{'name': "act:usr"}, {'name': "act"}, + {'name': ".admin"}]})), + # GET of token + ('200 Ok', {}, json.dumps( + {"account": "act", "user": "usr", + "account_id": "AUTH_cfa", + "groups": [{'name': "act:usr"}, + {'name': "key"}, {'name': ".admin"}], + "expires": 9999999999.9999999})), + # GET of services object + ('200 Ok', {}, json.dumps({"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) + resp = Request.blank( + '/auth/v1.0', + headers={'X-Auth-User': 'act:usr', + 'X-Auth-Key': 'key'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assertEquals( + resp.headers.get('x-auth-token'), + 'AUTH_tktest') + self.assertEquals(resp.headers.get('x-auth-token'), + resp.headers.get('x-storage-token')) + self.assertEquals(resp.headers.get('x-storage-url'), + 'http://127.0.0.1:8080/v1/AUTH_cfa') + self.assertEquals( + json.loads(resp.body), + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) + self.assertEquals(self.test_auth.app.calls, 3) + + def test_get_token_success_existing_token_but_request_new_one( + self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'}, + json.dumps({"auth": "plaintext:key", + "groups": [{'name': "act:usr"}, {'name': "act"}, + {'name': ".admin"}]})), + # DELETE of expired token + ('204 No Content', {}, ''), + # GET of account + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + # PUT of new token + ('201 Created', {}, ''), + # POST of token to user object + ('204 No Content', {}, ''), + # GET of services object + ('200 Ok', {}, json.dumps({"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) + resp = Request.blank( + '/auth/v1.0', + headers={'X-Auth-User': 'act:usr', + 'X-Auth-Key': 'key', + 'X-Auth-New-Token': 'true'}).get_response( + self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assertNotEquals( + resp.headers.get('x-auth-token'), 'AUTH_tktest') + self.assertEquals(resp.headers.get('x-auth-token'), + resp.headers.get('x-storage-token')) + self.assertEquals(resp.headers.get('x-storage-url'), + 'http://127.0.0.1:8080/v1/AUTH_cfa') + self.assertEquals( + json.loads(resp.body), + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) + self.assertEquals(self.test_auth.app.calls, 6) + + def test_get_token_success_existing_token_expired(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'}, + json.dumps({"auth": "plaintext:key", + "groups": [{'name': "act:usr"}, {'name': "act"}, + {'name': ".admin"}]})), + # GET of token + ('200 Ok', {}, json.dumps( + {"account": "act", "user": "usr", + "account_id": "AUTH_cfa", + "groups": [{'name': "act:usr"}, + {'name': "key"}, {'name': ".admin"}], + "expires": 0.0})), + # DELETE of expired token + ('204 No Content', {}, ''), + # GET of account + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + # PUT of new token + ('201 Created', {}, ''), + # POST of token to user object + ('204 No Content', {}, ''), + # GET of services object + ('200 Ok', {}, json.dumps({"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) + resp = Request.blank( + '/auth/v1.0', + headers={'X-Auth-User': 'act:usr', + 'X-Auth-Key': 'key'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assertNotEquals( + resp.headers.get('x-auth-token'), + 'AUTH_tktest') + self.assertEquals(resp.headers.get('x-auth-token'), + resp.headers.get('x-storage-token')) + self.assertEquals(resp.headers.get('x-storage-url'), + 'http://127.0.0.1:8080/v1/AUTH_cfa') + self.assertEquals( + json.loads(resp.body), + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) + self.assertEquals(self.test_auth.app.calls, 7) + + def test_get_token_success_existing_token_expired_fail_deleting_old( + self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'}, + json.dumps({"auth": "plaintext:key", + "groups": [{'name': "act:usr"}, {'name': "act"}, + {'name': ".admin"}]})), + # GET of token + ('200 Ok', {}, json.dumps({"account": "act", "user": "usr", + "account_id": "AUTH_cfa", + "groups": [{'name': "act:usr"}, + {'name': "key"}, {'name': ".admin"}], + "expires": 0.0})), + # DELETE of expired token + ('503 Service Unavailable', {}, ''), + # GET of account + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + # PUT of new token + ('201 Created', {}, ''), + # POST of token to user object + ('204 No Content', {}, ''), + # GET of services object + ('200 Ok', {}, json.dumps({"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) + resp = Request.blank( + '/auth/v1.0', + headers={'X-Auth-User': 'act:usr', + 'X-Auth-Key': 'key'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assertNotEquals( + resp.headers.get('x-auth-token'), + 'AUTH_tktest') + self.assertEquals(resp.headers.get('x-auth-token'), + resp.headers.get('x-storage-token')) + self.assertEquals(resp.headers.get('x-storage-url'), + 'http://127.0.0.1:8080/v1/AUTH_cfa') + self.assertEquals( + json.loads(resp.body), + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) + self.assertEquals(self.test_auth.app.calls, 7) + + def test_prep_success(self): + list_to_iter = [ + # PUT of .auth account + ('201 Created', {}, ''), + # PUT of .account_id container + ('201 Created', {}, '')] + # PUT of .token* containers + for x in xrange(16): + list_to_iter.append(('201 Created', {}, '')) + self.test_auth.app = FakeApp(iter(list_to_iter)) + resp = Request.blank('/auth/v2/.prep', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(self.test_auth.app.calls, 18) + + def test_prep_bad_method(self): + resp = Request.blank('/auth/v2/.prep', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + resp = Request.blank('/auth/v2/.prep', + environ={ + 'REQUEST_METHOD': 'HEAD'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + resp = Request.blank('/auth/v2/.prep', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + + def test_prep_bad_creds(self): + resp = Request.blank('/auth/v2/.prep', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + 'super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + resp = Request.blank('/auth/v2/.prep', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'upertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + resp = Request.blank('/auth/v2/.prep', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': '.super_admin'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + resp = Request.blank('/auth/v2/.prep', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + resp = Request.blank( + '/auth/v2/.prep', + environ={'REQUEST_METHOD': 'POST'}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + + def test_prep_fail_account_create(self): + self.test_auth.app = FakeApp(iter([ + # PUT of .auth account + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/.prep', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_prep_fail_token_container_create(self): + self.test_auth.app = FakeApp(iter([ + # PUT of .auth account + ('201 Created', {}, ''), + # PUT of .token container + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/.prep', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_prep_fail_account_id_container_create(self): + self.test_auth.app = FakeApp(iter([ + # PUT of .auth account + ('201 Created', {}, ''), + # PUT of .token container + ('201 Created', {}, ''), + # PUT of .account_id container + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/.prep', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 3) + + def test_get_reseller_success(self): + self.test_auth.app = FakeApp(iter([ + # GET of .auth account (list containers) + ('200 Ok', {}, json.dumps([ + {"name": ".token", "count": 0, "bytes": 0}, + {"name": ".account_id", + "count": 0, "bytes": 0}, + {"name": "act", "count": 0, "bytes": 0}])), + # GET of .auth account (list containers + # continuation) + ('200 Ok', {}, '[]')])) + resp = Request.blank('/auth/v2', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assertEquals(json.loads(resp.body), + {"accounts": [{"name": "act"}]}) + self.assertEquals(self.test_auth.app.calls, 2) + + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".admin"}, + {"name": ".reseller_admin"}], "auth": "plaintext:key"})), + # GET of .auth account (list containers) + ('200 Ok', {}, json.dumps([ + {"name": ".token", "count": 0, "bytes": 0}, + {"name": ".account_id", + "count": 0, "bytes": 0}, + {"name": "act", "count": 0, "bytes": 0}])), + # GET of .auth account (list containers + # continuation) + ('200 Ok', {}, '[]')])) + resp = Request.blank('/auth/v2', + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assertEquals(json.loads(resp.body), + {"accounts": [{"name": "act"}]}) + self.assertEquals(self.test_auth.app.calls, 3) + + def test_get_reseller_fail_bad_creds(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('404 Not Found', {}, '')])) + resp = Request.blank('/auth/v2', + headers={ + 'X-Auth-Admin-User': + 'super:admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + self.test_auth.app = FakeApp(iter([ + # GET of user object (account admin, but not reseller + # admin) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2', + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + self.test_auth.app = FakeApp(iter([ + # GET of user object (regular user) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, + {"name": "test"}], "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2', + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_get_reseller_fail_listing(self): + self.test_auth.app = FakeApp(iter([ + # GET of .auth account (list containers) + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 1) + + self.test_auth.app = FakeApp(iter([ + # GET of .auth account (list containers) + ('200 Ok', {}, json.dumps([ + {"name": ".token", "count": 0, "bytes": 0}, + {"name": ".account_id", + "count": 0, "bytes": 0}, + {"name": "act", "count": 0, "bytes": 0}])), + # GET of .auth account (list containers + # continuation) + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_get_account_success(self): + self.test_auth.app = FakeApp(iter([ + # GET of .services object + ('200 Ok', {}, + json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + # GET of account container (list objects) + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}, + {"name": "tester", "hash": "etag", "bytes": 104, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.736680"}, + {"name": "tester3", "hash": "etag", "bytes": 86, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:28.135530"}])), + # GET of account container (list objects + # continuation) + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')])) + resp = Request.blank('/auth/v2/act', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assertEquals( + json.loads(resp.body), + {'account_id': 'AUTH_cfa', + 'services': {'storage': + {'default': 'local', + 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}}, + 'users': [{'name': 'tester'}, {'name': 'tester3'}]}) + self.assertEquals(self.test_auth.app.calls, 3) + + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"})), + # GET of .services object + ('200 Ok', {}, + json.dumps({"storage": + {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + # GET of account container (list objects) + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}, + {"name": "tester", "hash": "etag", "bytes": 104, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.736680"}, + {"name": "tester3", "hash": "etag", "bytes": 86, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:28.135530"}])), + # GET of account container (list objects + # continuation) + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')])) + resp = Request.blank('/auth/v2/act', + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assertEquals( + json.loads(resp.body), + {'account_id': 'AUTH_cfa', + 'services': {'storage': + {'default': 'local', + 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}}, + 'users': [{'name': 'tester'}, {'name': 'tester3'}]}) + self.assertEquals(self.test_auth.app.calls, 4) + + def test_get_account_fail_bad_account_name(self): + resp = Request.blank('/auth/v2/.token', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + resp = Request.blank('/auth/v2/.anything', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + + def test_get_account_fail_creds(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('404 Not Found', {}, '')])) + resp = Request.blank('/auth/v2/act', + headers={ + 'X-Auth-Admin-User': + 'super:admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + self.test_auth.app = FakeApp(iter([ + # GET of user object (account admin, but wrong + # account) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act', + headers={ + 'X-Auth-Admin-User': + 'act2:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + self.test_auth.app = FakeApp(iter([ + # GET of user object (regular user) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, + {"name": "test"}], "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act', + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_get_account_fail_get_services(self): + self.test_auth.app = FakeApp(iter([ + # GET of .services object + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/act', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 1) + + self.test_auth.app = FakeApp(iter([ + # GET of .services object + ('404 Not Found', {}, '')])) + resp = Request.blank('/auth/v2/act', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 404) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_get_account_fail_listing(self): + self.test_auth.app = FakeApp(iter([ + # GET of .services object + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + # GET of account container (list objects) + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/act', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 2) + + self.test_auth.app = FakeApp(iter([ + # GET of .services object + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + # GET of account container (list objects) + ('404 Not Found', {}, '')])) + resp = Request.blank('/auth/v2/act', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 404) + self.assertEquals(self.test_auth.app.calls, 2) + + self.test_auth.app = FakeApp(iter([ + # GET of .services object + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + # GET of account container (list objects) + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}, + {"name": "tester", "hash": "etag", "bytes": 104, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.736680"}, + {"name": "tester3", "hash": "etag", "bytes": 86, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:28.135530"}])), + # GET of account container (list objects + # continuation) + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/act', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 3) + + def test_set_services_new_service(self): + self.test_auth.app = FakeApp(iter([ + # GET of .services object + ('200 Ok', {}, + json.dumps({"storage": + {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + # PUT of new .services object + ('204 No Content', {}, '')])) + resp = Request.blank('/auth/v2/act/.services', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}, + body=json.dumps( + {'new_service': + {'new_endpoint': 'new_value'}}) + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assertEquals( + json.loads(resp.body), + {'storage': {'default': 'local', + 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}, + 'new_service': {'new_endpoint': 'new_value'}}) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_set_services_new_endpoint(self): + self.test_auth.app = FakeApp(iter([ + # GET of .services object + ('200 Ok', {}, json.dumps( + {"storage": + {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + # PUT of new .services object + ('204 No Content', {}, '')])) + resp = Request.blank('/auth/v2/act/.services', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}, + body=json.dumps( + {'storage': {'new_endpoint': 'new_value'}}) + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assertEquals( + json.loads(resp.body), + {'storage': {'default': 'local', + 'local': + 'http://127.0.0.1:8080/v1/AUTH_cfa', + 'new_endpoint': 'new_value'}}) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_set_services_update_endpoint(self): + self.test_auth.app = FakeApp(iter([ + # GET of .services object + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + # PUT of new .services object + ('204 No Content', {}, '')])) + resp = Request.blank('/auth/v2/act/.services', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}, + body=json.dumps( + {'storage': {'local': 'new_value'}}) + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assertEquals(json.loads(resp.body), + {'storage': {'default': 'local', + 'local': 'new_value'}}) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_set_services_fail_bad_creds(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('404 Not Found', {}, '')])) + resp = Request.blank('/auth/v2/act/.services', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + 'super:admin', + 'X-Auth-Admin-Key': 'supertest'}, + body=json.dumps( + {'storage': {'local': 'new_value'}}) + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + self.test_auth.app = FakeApp(iter([ + # GET of user object (account admin, but not reseller + # admin) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/.services', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'}, + body=json.dumps( + {'storage': {'local': 'new_value'}}) + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + self.test_auth.app = FakeApp(iter([ + # GET of user object (regular user) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, + {"name": "test"}], "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/.services', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': 'key'}, + body=json.dumps( + {'storage': {'local': 'new_value'}}) + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_set_services_fail_bad_account_name(self): + resp = Request.blank('/auth/v2/.act/.services', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}, + body=json.dumps( + {'storage': {'local': 'new_value'}}) + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + + def test_set_services_fail_bad_json(self): + resp = Request.blank('/auth/v2/act/.services', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}, + body='garbage' + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + resp = Request.blank('/auth/v2/act/.services', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}, + body='' + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + + def test_set_services_fail_get_services(self): + self.test_auth.app = FakeApp(iter([ + # GET of .services object + ('503 Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/act/.services', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}, + body=json.dumps({ + 'new_service': {'new_endpoint': 'new_value'}}) + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 1) + + self.test_auth.app = FakeApp(iter([ + # GET of .services object + ('404 Not Found', {}, '')])) + resp = Request.blank('/auth/v2/act/.services', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}, + body=json.dumps({ + 'new_service': {'new_endpoint': 'new_value'}}) + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 404) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_set_services_fail_put_services(self): + self.test_auth.app = FakeApp(iter([ + # GET of .services object + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + # PUT of new .services object + ('503 Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/act/.services', + environ={ + 'REQUEST_METHOD': 'POST'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}, + body=json.dumps( + {'new_service': + {'new_endpoint': 'new_value'}}) + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_put_account_success(self): + conn = FakeConn(iter([ + # PUT of storage account itself + ('201 Created', {}, '')])) + self.test_auth.get_conn = lambda: conn + self.test_auth.app = FakeApp(iter([ + # Initial HEAD of account container to check for + # pre-existence + ('404 Not Found', {}, ''), + # PUT of account container + ('204 No Content', {}, ''), + # PUT of .account_id mapping object + ('204 No Content', {}, ''), + # PUT of .services object + ('204 No Content', {}, ''), + # POST to account container updating + # X-Container-Meta-Account-Id + ('204 No Content', {}, '')])) + resp = Request.blank( + '/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, + headers={'X-Auth-Admin-User': '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 201) + self.assertEquals(self.test_auth.app.calls, 5) + self.assertEquals(conn.calls, 1) + + def test_put_account_success_preexist_but_not_completed( + self): + conn = FakeConn(iter([ + # PUT of storage account itself + ('201 Created', {}, '')])) + self.test_auth.get_conn = lambda: conn + self.test_auth.app = FakeApp(iter([ + # Initial HEAD of account container to check for pre-existence + # We're going to show it as existing this time, but with no + # X-Container-Meta-Account-Id, indicating a failed + # previous attempt + ('200 Ok', {}, ''), + # PUT of .account_id mapping object + ('204 No Content', {}, ''), + # PUT of .services object + ('204 No Content', {}, ''), + # POST to account container updating + # X-Container-Meta-Account-Id + ('204 No Content', {}, '')])) + resp = Request.blank( + '/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'PUT', + 'swift.cache': FakeMemcache()}, + headers={'X-Auth-Admin-User': '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 201) + self.assertEquals(self.test_auth.app.calls, 4) + self.assertEquals(conn.calls, 1) + + def test_put_account_success_preexist_and_completed( + self): + self.test_auth.app = FakeApp(iter([ + # Initial HEAD of account container to check for pre-existence + # We're going to show it as existing this time, and with an + # X-Container-Meta-Account-Id, indicating it already + # exists + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '')])) + resp = Request.blank( + '/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, + headers={'X-Auth-Admin-User': '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 202) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_put_account_success_with_given_suffix(self): + conn = FakeConn(iter([ + # PUT of storage account itself + ('201 Created', {}, '')])) + self.test_auth.get_conn = lambda: conn + self.test_auth.app = FakeApp(iter([ + # Initial HEAD of account container to check for + # pre-existence + ('404 Not Found', {}, ''), + # PUT of account container + ('204 No Content', {}, ''), + # PUT of .account_id mapping object + ('204 No Content', {}, ''), + # PUT of .services object + ('204 No Content', {}, ''), + # POST to account container updating + # X-Container-Meta-Account-Id + ('204 No Content', {}, '')])) + resp = Request.blank( + '/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, + headers={'X-Auth-Admin-User': '.super_admin', + 'X-Auth-Admin-Key': 'supertest', + 'X-Account-Suffix': 'test-suffix'}).get_response( + self.test_auth) + self.assertEquals(resp.status_int, 201) + self.assertEquals( + conn.request_path, + '/v1/AUTH_test-suffix') + self.assertEquals(self.test_auth.app.calls, 5) + self.assertEquals(conn.calls, 1) + + def test_put_account_fail_bad_creds(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('404 Not Found', {}, '')])) + resp = Request.blank( + '/auth/v2/act', + environ={'REQUEST_METHOD': 'PUT', + 'swift.cache': FakeMemcache()}, + headers={'X-Auth-Admin-User': 'super:admin', + 'X-Auth-Admin-Key': 'supertest'},).get_response( + self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + self.test_auth.app = FakeApp(iter([ + # GET of user object (account admin, but not reseller + # admin) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank( + '/auth/v2/act', + environ={'REQUEST_METHOD': 'PUT', + 'swift.cache': FakeMemcache()}, + headers={'X-Auth-Admin-User': 'act:adm', + 'X-Auth-Admin-Key': 'key'},).get_response( + self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + self.test_auth.app = FakeApp(iter([ + # GET of user object (regular user) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, + {"name": "test"}], "auth": "plaintext:key"}))])) + resp = Request.blank( + '/auth/v2/act', + environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, + headers={'X-Auth-Admin-User': 'act:usr', + 'X-Auth-Admin-Key': 'key'},).get_response( + self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_put_account_fail_invalid_account_name(self): + resp = Request.blank( + '/auth/v2/.act', + environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, + headers={'X-Auth-Admin-User': '.super_admin', + 'X-Auth-Admin-Key': 'supertest'},).get_response( + self.test_auth) + self.assertEquals(resp.status_int, 400) + + def test_put_account_fail_on_initial_account_head(self): + self.test_auth.app = FakeApp(iter([ + # Initial HEAD of account container to check for + # pre-existence + ('503 Service Unavailable', {}, '')])) + resp = Request.blank( + '/auth/v2/act', + environ={'REQUEST_METHOD': 'PUT', + 'swift.cache': FakeMemcache()}, + headers={'X-Auth-Admin-User': '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}).get_response( + self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_put_account_fail_on_account_marker_put(self): + self.test_auth.app = FakeApp(iter([ + # Initial HEAD of account container to check for + # pre-existence + ('404 Not Found', {}, ''), + # PUT of account container + ('503 Service Unavailable', {}, '')])) + resp = Request.blank( + '/auth/v2/act', + environ={'REQUEST_METHOD': 'PUT', + 'swift.cache': FakeMemcache()}, + headers={'X-Auth-Admin-User': '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}).get_response( + self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_put_account_fail_on_storage_account_put(self): + conn = FakeConn(iter([ + # PUT of storage account itself + ('503 Service Unavailable', {}, '')])) + self.test_auth.get_conn = lambda: conn + self.test_auth.app = FakeApp(iter([ + # Initial HEAD of account container to check for + # pre-existence + ('404 Not Found', {}, ''), + # PUT of account container + ('204 No Content', {}, '')])) + resp = Request.blank( + '/auth/v2/act', + environ={'REQUEST_METHOD': 'PUT', + 'swift.cache': FakeMemcache()}, + headers={'X-Auth-Admin-User': '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}).get_response( + self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(conn.calls, 1) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_put_account_fail_on_account_id_mapping(self): + conn = FakeConn(iter([ + # PUT of storage account itself + ('201 Created', {}, '')])) + self.test_auth.get_conn = lambda: conn + self.test_auth.app = FakeApp(iter([ + # Initial HEAD of account container to check for + # pre-existence + ('404 Not Found', {}, ''), + # PUT of account container + ('204 No Content', {}, ''), + # PUT of .account_id mapping object + ('503 Service Unavailable', {}, '')])) + resp = Request.blank( + '/auth/v2/act', + environ={'REQUEST_METHOD': 'PUT', + 'swift.cache': FakeMemcache()}, + headers={'X-Auth-Admin-User': '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}).get_response( + self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(conn.calls, 1) + self.assertEquals(self.test_auth.app.calls, 3) + + def test_put_account_fail_on_services_object(self): + conn = FakeConn(iter([ + # PUT of storage account itself + ('201 Created', {}, '')])) + self.test_auth.get_conn = lambda: conn + self.test_auth.app = FakeApp(iter([ + # Initial HEAD of account container to check for + # pre-existence + ('404 Not Found', {}, ''), + # PUT of account container + ('204 No Content', {}, ''), + # PUT of .account_id mapping object + ('204 No Content', {}, ''), + # PUT of .services object + ('503 Service Unavailable', {}, '')])) + resp = Request.blank( + '/auth/v2/act', + environ={'REQUEST_METHOD': 'PUT', + 'swift.cache': FakeMemcache()}, + headers={'X-Auth-Admin-User': '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}).get_response( + self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(conn.calls, 1) + self.assertEquals(self.test_auth.app.calls, 4) + + def test_put_account_fail_on_post_mapping(self): + conn = FakeConn(iter([ + # PUT of storage account itself + ('201 Created', {}, '')])) + self.test_auth.get_conn = lambda: conn + self.test_auth.app = FakeApp(iter([ + # Initial HEAD of account container to check for + # pre-existence + ('404 Not Found', {}, ''), + # PUT of account container + ('204 No Content', {}, ''), + # PUT of .account_id mapping object + ('204 No Content', {}, ''), + # PUT of .services object + ('204 No Content', {}, ''), + # POST to account container updating + # X-Container-Meta-Account-Id + ('503 Service Unavailable', {}, '')])) + resp = Request.blank( + '/auth/v2/act', + environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, + headers={'X-Auth-Admin-User': '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}).get_response( + self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(conn.calls, 1) + self.assertEquals(self.test_auth.app.calls, 5) + + def test_delete_account_success(self): + conn = FakeConn(iter([ + # DELETE of storage account itself + ('204 No Content', {}, '')])) + self.test_auth.get_conn = lambda x: conn + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + # GET the .services object + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + # DELETE the .services object + ('204 No Content', {}, ''), + # DELETE the .account_id mapping object + ('204 No Content', {}, ''), + # DELETE the account container + ('204 No Content', {}, '')])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(self.test_auth.app.calls, 6) + self.assertEquals(conn.calls, 1) + + def test_delete_account_success_missing_services(self): + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + # GET the .services object + ('404 Not Found', {}, ''), + # DELETE the .account_id mapping object + ('204 No Content', {}, ''), + # DELETE the account container + ('204 No Content', {}, '')])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(self.test_auth.app.calls, 5) + + def test_delete_account_success_missing_storage_account( + self): + conn = FakeConn(iter([ + # DELETE of storage account itself + ('404 Not Found', {}, '')])) + self.test_auth.get_conn = lambda x: conn + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + # GET the .services object + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + # DELETE the .services object + ('204 No Content', {}, ''), + # DELETE the .account_id mapping object + ('204 No Content', {}, ''), + # DELETE the account container + ('204 No Content', {}, '')])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(self.test_auth.app.calls, 6) + self.assertEquals(conn.calls, 1) + + def test_delete_account_success_missing_account_id_mapping( + self): + conn = FakeConn(iter([ + # DELETE of storage account itself + ('204 No Content', {}, '')])) + self.test_auth.get_conn = lambda x: conn + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + # GET the .services object + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + # DELETE the .services object + ('204 No Content', {}, ''), + # DELETE the .account_id mapping object + ('404 Not Found', {}, ''), + # DELETE the account container + ('204 No Content', {}, '')])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(self.test_auth.app.calls, 6) + self.assertEquals(conn.calls, 1) + + def test_delete_account_success_missing_account_container_at_end( + self): + conn = FakeConn(iter([ + # DELETE of storage account itself + ('204 No Content', {}, '')])) + self.test_auth.get_conn = lambda x: conn + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + # GET the .services object + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + # DELETE the .services object + ('204 No Content', {}, ''), + # DELETE the .account_id mapping object + ('204 No Content', {}, ''), + # DELETE the account container + ('404 Not Found', {}, '')])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(self.test_auth.app.calls, 6) + self.assertEquals(conn.calls, 1) + + def test_delete_account_fail_bad_creds(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('404 Not Found', {}, '')])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + 'super:admin', + 'X-Auth-Admin-Key': 'supertest'}, + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + self.test_auth.app = FakeApp(iter([ + # GET of user object (account admin, but not reseller + # admin) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'}, + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + self.test_auth.app = FakeApp(iter([ + # GET of user object (regular user) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, + {"name": "test"}], "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': 'key'}, + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_delete_account_fail_invalid_account_name(self): + resp = Request.blank('/auth/v2/.act', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + + def test_delete_account_fail_not_found(self): + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('404 Not Found', {}, '')])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 404) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_delete_account_fail_not_found_concurrency( + self): + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('404 Not Found', {}, '')])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 404) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_delete_account_fail_list_account(self): + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_delete_account_fail_list_account_concurrency( + self): + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_delete_account_fail_has_users(self): + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}, + {"name": "tester", "hash": "etag", "bytes": 104, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.736680"}]))])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 409) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_delete_account_fail_has_users2(self): + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": "tester", "hash": "etag", "bytes": 104, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.736680"}]))])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 409) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_delete_account_fail_get_services(self): + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + # GET the .services object + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 3) + + def test_delete_account_fail_delete_storage_account( + self): + conn = FakeConn(iter([ + # DELETE of storage account itself + ('409 Conflict', {}, '')])) + self.test_auth.get_conn = lambda x: conn + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + # GET the .services object + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 409) + self.assertEquals(self.test_auth.app.calls, 3) + self.assertEquals(conn.calls, 1) + + def test_delete_account_fail_delete_storage_account2( + self): + conn = FakeConn(iter([ + # DELETE of storage account itself + ('204 No Content', {}, ''), + # DELETE of storage account itself + ('409 Conflict', {}, '')])) + self.test_auth.get_conn = lambda x: conn + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + # GET the .services object + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa", + "other": "http://127.0.0.1:8080/v1/AUTH_cfa2"}}))])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 3) + self.assertEquals(conn.calls, 2) + + def test_delete_account_fail_delete_storage_account3( + self): + conn = FakeConn(iter([ + # DELETE of storage account itself + ('503 Service Unavailable', {}, '')])) + self.test_auth.get_conn = lambda x: conn + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + # GET the .services object + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 3) + self.assertEquals(conn.calls, 1) + + def test_delete_account_fail_delete_storage_account4( + self): + conn = FakeConn(iter([ + # DELETE of storage account itself + ('204 No Content', {}, ''), + # DELETE of storage account itself + ('503 Service Unavailable', {}, '')])) + self.test_auth.get_conn = lambda x: conn + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + # GET the .services object + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa", + "other": "http://127.0.0.1:8080/v1/AUTH_cfa2"}}))])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 3) + self.assertEquals(conn.calls, 2) + + def test_delete_account_fail_delete_services(self): + conn = FakeConn(iter([ + # DELETE of storage account itself + ('204 No Content', {}, '')])) + self.test_auth.get_conn = lambda x: conn + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + # GET the .services object + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + # DELETE the .services object + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 4) + self.assertEquals(conn.calls, 1) + + def test_delete_account_fail_delete_account_id_mapping( + self): + conn = FakeConn(iter([ + # DELETE of storage account itself + ('204 No Content', {}, '')])) + self.test_auth.get_conn = lambda x: conn + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + # GET the .services object + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + # DELETE the .services object + ('204 No Content', {}, ''), + # DELETE the .account_id mapping object + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 5) + self.assertEquals(conn.calls, 1) + + def test_delete_account_fail_delete_account_container( + self): + conn = FakeConn(iter([ + # DELETE of storage account itself + ('204 No Content', {}, '')])) + self.test_auth.get_conn = lambda x: conn + self.test_auth.app = FakeApp(iter([ + # Account's container listing, checking for + # users + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}])), + # Account's container listing, checking for users + # (continuation) + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), + # GET the .services object + ('200 Ok', {}, json.dumps( + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), + # DELETE the .services object + ('204 No Content', {}, ''), + # DELETE the .account_id mapping object + ('204 No Content', {}, ''), + # DELETE the account container + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/act', + environ={ + 'REQUEST_METHOD': 'DELETE', + 'swift.cache': FakeMemcache()}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 6) + self.assertEquals(conn.calls, 1) + + def test_get_user_success(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {}, json.dumps( + {"groups": [{"name": "act:usr"}, {"name": "act"}, + {"name": ".admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assertEquals(resp.body, json.dumps( + {"groups": [{"name": "act:usr"}, {"name": "act"}, + {"name": ".admin"}], + "auth": "plaintext:key"})) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_get_user_fail_no_super_admin_key(self): + local_auth = auth.filter_factory({})(FakeApp(iter([ + # GET of user object (but we should never get + # here) + ('200 Ok', {}, json.dumps( + {"groups": [{"name": "act:usr"}, {"name": "act"}, + {"name": ".admin"}], + "auth": "plaintext:key"}))]))) + resp = Request.blank('/auth/v2/act/usr', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(local_auth) + self.assertEquals(resp.status_int, 404) + self.assertEquals(local_auth.app.calls, 0) + + def test_get_user_groups_success(self): + self.test_auth.app = FakeApp(iter([ + # GET of account container (list objects) + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}, + {"name": "tester", "hash": "etag", "bytes": 104, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.736680"}, + {"name": "tester3", "hash": "etag", "bytes": 86, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:28.135530"}])), + # GET of user object + ('200 Ok', {}, json.dumps( + {"groups": [{"name": "act:tester"}, {"name": "act"}, + {"name": ".admin"}], + "auth": "plaintext:key"})), + # GET of user object + ('200 Ok', {}, json.dumps( + {"groups": [{"name": "act:tester3"}, {"name": "act"}], + "auth": "plaintext:key3"})), + # GET of account container (list objects + # continuation) + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')])) + resp = Request.blank('/auth/v2/act/.groups', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assertEquals(resp.body, json.dumps( + {"groups": [{"name": ".admin"}, {"name": "act"}, + {"name": "act:tester"}, {"name": "act:tester3"}]})) + self.assertEquals(self.test_auth.app.calls, 4) + + def test_get_user_groups_success2(self): + self.test_auth.app = FakeApp(iter([ + # GET of account container (list objects) + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}, + {"name": "tester", "hash": "etag", "bytes": 104, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.736680"}])), + # GET of user object + ('200 Ok', {}, json.dumps( + {"groups": [{"name": "act:tester"}, {"name": "act"}, + {"name": ".admin"}], + "auth": "plaintext:key"})), + # GET of account container (list objects + # continuation) + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": "tester3", "hash": "etag", "bytes": 86, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:28.135530"}])), + # GET of user object + ('200 Ok', {}, json.dumps( + {"groups": [{"name": "act:tester3"}, {"name": "act"}], + "auth": "plaintext:key3"})), + # GET of account container (list objects + # continuation) + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')])) + resp = Request.blank('/auth/v2/act/.groups', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assertEquals(resp.body, json.dumps( + {"groups": [{"name": ".admin"}, {"name": "act"}, + {"name": "act:tester"}, {"name": "act:tester3"}]})) + self.assertEquals(self.test_auth.app.calls, 5) + + def test_get_user_fail_invalid_account(self): + resp = Request.blank('/auth/v2/.invalid/usr', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + + def test_get_user_fail_invalid_user(self): + resp = Request.blank('/auth/v2/act/.invalid', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + + def test_get_user_fail_bad_creds(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('404 Not Found', {}, '')])) + resp = Request.blank('/auth/v2/act/usr', + headers={ + 'X-Auth-Admin-User': + 'super:admin', + 'X-Auth-Admin-Key': 'supertest'}, + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + self.test_auth.app = FakeApp(iter([ + # GET of user object (regular user) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, + {"name": "test"}], "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': 'key'}, + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_get_user_account_admin_success(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object (account admin, but not reseller + # admin) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"})), + # GET of requested user object + ('200 Ok', {}, json.dumps( + {"groups": [{"name": "act:usr"}, {"name": "act"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assertEquals(resp.body, json.dumps( + {"groups": [{"name": "act:usr"}, {"name": "act"}], + "auth": "plaintext:key"})) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_get_user_account_admin_fail_getting_account_admin( + self): + self.test_auth.app = FakeApp(iter([ + # GET of user object (account admin check) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"})), + # GET of requested user object [who is an .admin + # as well] + ('200 Ok', {}, json.dumps( + {"groups": [{"name": "act:usr"}, {"name": "act"}, + {"name": ".admin"}], + "auth": "plaintext:key"})), + # GET of user object (reseller admin check [and fail + # here]) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 3) + + def test_get_user_account_admin_fail_getting_reseller_admin( + self): + self.test_auth.app = FakeApp(iter([ + # GET of user object (account admin check) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"})), + # GET of requested user object [who is a + # .reseller_admin] + ('200 Ok', {}, json.dumps( + {"groups": [{"name": "act:usr"}, {"name": "act"}, + {"name": ".reseller_admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_get_user_reseller_admin_fail_getting_reseller_admin( + self): + self.test_auth.app = FakeApp(iter([ + # GET of user object (account admin check) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".reseller_admin"}], + "auth": "plaintext:key"})), + # GET of requested user object [who also is a + # .reseller_admin] + ('200 Ok', {}, json.dumps( + {"groups": [{"name": "act:usr"}, {"name": "act"}, + {"name": ".reseller_admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_get_user_super_admin_succeed_getting_reseller_admin( + self): + self.test_auth.app = FakeApp(iter([ + # GET of requested user object + ('200 Ok', {}, json.dumps( + {"groups": [{"name": "act:usr"}, {"name": "act"}, + {"name": ".reseller_admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assertEquals(resp.body, json.dumps( + {"groups": [{"name": "act:usr"}, {"name": "act"}, + {"name": ".reseller_admin"}], + "auth": "plaintext:key"})) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_get_user_groups_not_found(self): + self.test_auth.app = FakeApp(iter([ + # GET of account container (list objects) + ('404 Not Found', {}, '')])) + resp = Request.blank('/auth/v2/act/.groups', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 404) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_get_user_groups_fail_listing(self): + self.test_auth.app = FakeApp(iter([ + # GET of account container (list objects) + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/act/.groups', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_get_user_groups_fail_get_user(self): + self.test_auth.app = FakeApp(iter([ + # GET of account container (list objects) + ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, + json.dumps([ + {"name": ".services", "hash": "etag", "bytes": 112, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.618110"}, + {"name": "tester", "hash": "etag", "bytes": 104, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:27.736680"}, + {"name": "tester3", "hash": "etag", "bytes": 86, + "content_type": + "application/octet-stream", + "last_modified": "2010-12-03T17:16:28.135530"}])), + # GET of user object + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/act/.groups', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_get_user_not_found(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('404 Not Found', {}, '')])) + resp = Request.blank('/auth/v2/act/usr', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 404) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_get_user_fail(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/act/usr', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': + 'supertest', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_put_user_fail_invalid_account(self): + resp = Request.blank('/auth/v2/.invalid/usr', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': + 'supertest', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + + def test_put_user_fail_invalid_user(self): + resp = Request.blank('/auth/v2/act/.usr', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': + 'supertest', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + + def test_put_user_fail_no_user_key(self): + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + + def test_put_user_reseller_admin_fail_bad_creds(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object (reseller admin) + # This shouldn't actually get called, checked + # below + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:rdm"}, + {"name": "test"}, {"name": ".admin"}, + {"name": ".reseller_admin"}], "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + 'act:rdm', + 'X-Auth-Admin-Key': + 'key', + 'X-Auth-User-Key': + 'key', + 'X-Auth-User-Reseller-Admin': 'true'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 0) + + self.test_auth.app = FakeApp(iter([ + # GET of user object (account admin, but not reseller admin) + # This shouldn't actually get called, checked + # below + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': + 'key', + 'X-Auth-User-Key': + 'key', + 'X-Auth-User-Reseller-Admin': 'true'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 0) + + self.test_auth.app = FakeApp(iter([ + # GET of user object (regular user) + # This shouldn't actually get called, checked + # below + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, + {"name": "test"}], "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': + 'key', + 'X-Auth-User-Key': + 'key', + 'X-Auth-User-Reseller-Admin': 'true'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 0) + + def test_put_user_account_admin_fail_bad_creds(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object (account admin, but wrong + # account) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + 'act2:adm', + 'X-Auth-Admin-Key': + 'key', + 'X-Auth-User-Key': + 'key', + 'X-Auth-User-Admin': 'true'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + self.test_auth.app = FakeApp(iter([ + # GET of user object (regular user) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, + {"name": "test"}], "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': + 'key', + 'X-Auth-User-Key': + 'key', + 'X-Auth-User-Admin': 'true'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_put_user_regular_fail_bad_creds(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object (account admin, but wrong + # account) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + 'act2:adm', + 'X-Auth-Admin-Key': + 'key', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + self.test_auth.app = FakeApp(iter([ + # GET of user object (regular user) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, + {"name": "test"}], "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': + 'key', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_put_user_regular_success(self): + self.test_auth.app = FakeApp(iter([ + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + # PUT of user object + ('201 Created', {}, '')])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': + 'supertest', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 201) + self.assertEquals(self.test_auth.app.calls, 2) + self.assertEquals( + json.loads(self.test_auth.app.request.body), + {"groups": [{"name": "act:usr"}, {"name": "act"}], + "auth": "plaintext:key"}) + + def test_put_user_special_chars_success(self): + self.test_auth.app = FakeApp(iter([ + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + # PUT of user object + ('201 Created', {}, '')])) + resp = Request.blank('/auth/v2/act/u_s-r', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': + 'supertest', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 201) + self.assertEquals(self.test_auth.app.calls, 2) + self.assertEquals( + json.loads(self.test_auth.app.request.body), + {"groups": [{"name": "act:u_s-r"}, {"name": "act"}], + "auth": "plaintext:key"}) + + def test_put_user_account_admin_success(self): + self.test_auth.app = FakeApp(iter([ + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + # PUT of user object + ('201 Created', {}, '')])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': + 'supertest', + 'X-Auth-User-Key': 'key', + 'X-Auth-User-Admin': 'true'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 201) + self.assertEquals(self.test_auth.app.calls, 2) + self.assertEquals( + json.loads(self.test_auth.app.request.body), + {"groups": [{"name": "act:usr"}, {"name": "act"}, + {"name": ".admin"}], + "auth": "plaintext:key"}) + + def test_put_user_reseller_admin_success(self): + self.test_auth.app = FakeApp(iter([ + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + # PUT of user object + ('201 Created', {}, '')])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': + 'supertest', + 'X-Auth-User-Key': 'key', + 'X-Auth-User-Reseller-Admin': 'true'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 201) + self.assertEquals(self.test_auth.app.calls, 2) + self.assertEquals( + json.loads(self.test_auth.app.request.body), + {"groups": [{"name": "act:usr"}, {"name": "act"}, + {"name": ".admin"}, {"name": ".reseller_admin"}], + "auth": "plaintext:key"}) + + def test_put_user_fail_not_found(self): + self.test_auth.app = FakeApp(iter([ + ('200 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + # PUT of user object + ('404 Not Found', {}, '')])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': + 'supertest', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 404) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_put_user_fail(self): + self.test_auth.app = FakeApp(iter([ + # PUT of user object + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': + 'supertest', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_delete_user_bad_creds(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object (account admin, but wrong + # account) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + 'act2:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + self.test_auth.app = FakeApp(iter([ + # GET of user object (regular user) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, + {"name": "test"}], "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_delete_user_invalid_account(self): + resp = Request.blank('/auth/v2/.invalid/usr', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + + def test_delete_user_invalid_user(self): + resp = Request.blank('/auth/v2/act/.invalid', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + + def test_delete_user_not_found(self): + self.test_auth.app = FakeApp(iter([ + # HEAD of user object + ('404 Not Found', {}, '')])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 404) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_delete_user_fail_head_user(self): + self.test_auth.app = FakeApp(iter([ + # HEAD of user object + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_delete_user_fail_delete_token(self): + self.test_auth.app = FakeApp(iter([ + # HEAD of user object + ('200 Ok', + {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), + # DELETE of token + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_delete_user_fail_delete_user(self): + self.test_auth.app = FakeApp(iter([ + # HEAD of user object + ('200 Ok', + {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), + # DELETE of token + ('204 No Content', {}, ''), + # DELETE of user object + ('503 Service Unavailable', {}, '')])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(self.test_auth.app.calls, 3) + + def test_delete_user_success(self): + self.test_auth.app = FakeApp(iter([ + # HEAD of user object + ('200 Ok', + {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), + # DELETE of token + ('204 No Content', {}, ''), + # DELETE of user object + ('204 No Content', {}, '')])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(self.test_auth.app.calls, 3) + + def test_delete_user_success_missing_user_at_end(self): + self.test_auth.app = FakeApp(iter([ + # HEAD of user object + ('200 Ok', + {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), + # DELETE of token + ('204 No Content', {}, ''), + # DELETE of user object + ('404 Not Found', {}, '')])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(self.test_auth.app.calls, 3) + + def test_delete_user_success_missing_token(self): + self.test_auth.app = FakeApp(iter([ + # HEAD of user object + ('200 Ok', + {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), + # DELETE of token + ('404 Not Found', {}, ''), + # DELETE of user object + ('204 No Content', {}, '')])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(self.test_auth.app.calls, 3) + + def test_delete_user_success_no_token(self): + self.test_auth.app = FakeApp(iter([ + # HEAD of user object + ('200 Ok', {}, ''), + # DELETE of user object + ('204 No Content', {}, '')])) + resp = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_validate_token_bad_prefix(self): + resp = Request.blank('/auth/v2/.token/BAD_token' + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + + def test_validate_token_tmi(self): + resp = Request.blank( + '/auth/v2/.token/AUTH_token/tmi').get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + + def test_validate_token_bad_memcache(self): + fake_memcache = FakeMemcache() + fake_memcache.set('AUTH_/auth/AUTH_token', 'bogus') + resp = Request.blank( + '/auth/v2/.token/AUTH_token', + environ={'swift.cache': fake_memcache}).get_response( + self.test_auth) + self.assertEquals(resp.status_int, 500) + + def test_validate_token_from_memcache(self): + fake_memcache = FakeMemcache() + fake_memcache.set( + 'AUTH_/auth/AUTH_token', + (time() + 1, + 'act:usr,act')) + resp = Request.blank( + '/auth/v2/.token/AUTH_token', + environ={'swift.cache': fake_memcache}).get_response( + self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals( + resp.headers.get('x-auth-groups'), + 'act:usr,act') + self.assert_(float(resp.headers['x-auth-ttl']) < 1, + resp.headers['x-auth-ttl']) + + def test_validate_token_from_memcache_expired(self): + fake_memcache = FakeMemcache() + fake_memcache.set( + 'AUTH_/auth/AUTH_token', + (time() - 1, + 'act:usr,act')) + resp = Request.blank( + '/auth/v2/.token/AUTH_token', + environ={'swift.cache': fake_memcache}).get_response( + self.test_auth) + self.assertEquals(resp.status_int, 404) + self.assert_('x-auth-groups' not in resp.headers) + self.assert_('x-auth-ttl' not in resp.headers) + + def test_validate_token_from_object(self): + self.test_auth.app = FakeApp(iter([ + # GET of token object + ('200 Ok', {}, json.dumps({'groups': [{'name': 'act:usr'}, + {'name': 'act'}], 'expires': time() + 1}))])) + resp = Request.blank('/auth/v2/.token/AUTH_token' + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(self.test_auth.app.calls, 1) + self.assertEquals( + resp.headers.get('x-auth-groups'), + 'act:usr,act') + self.assert_(float(resp.headers['x-auth-ttl']) < 1, + resp.headers['x-auth-ttl']) + + def test_validate_token_from_object_expired(self): + self.test_auth.app = FakeApp(iter([ + # GET of token object + ('200 Ok', {}, json.dumps({'groups': 'act:usr,act', + 'expires': time() - 1})), + # DELETE of expired token object + ('204 No Content', {}, '')])) + resp = Request.blank('/auth/v2/.token/AUTH_token' + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 404) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_validate_token_from_object_with_admin(self): + self.test_auth.app = FakeApp(iter([ + # GET of token object + ('200 Ok', {}, json.dumps({'account_id': 'AUTH_cfa', 'groups': + [{'name': 'act:usr'}, + {'name': 'act'}, + {'name': '.admin'}], + 'expires': time() + 1}))])) + resp = Request.blank('/auth/v2/.token/AUTH_token' + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(self.test_auth.app.calls, 1) + self.assertEquals(resp.headers.get('x-auth-groups'), + 'act:usr,act,AUTH_cfa') + self.assert_(float(resp.headers['x-auth-ttl']) < 1, + resp.headers['x-auth-ttl']) + + def test_get_conn_default(self): + conn = self.test_auth.get_conn() + self.assertEquals( + conn.__class__, + auth.HTTPConnection) + self.assertEquals(conn.host, '127.0.0.1') + self.assertEquals(conn.port, 8080) + + def test_get_conn_default_https(self): + local_auth = auth.filter_factory( + {'super_admin_key': 'supertest', + 'default_swift_cluster': 'local#https://1.2.3.4/v1'})(FakeApp()) + conn = local_auth.get_conn() + self.assertEquals( + conn.__class__, + auth.HTTPSConnection) + self.assertEquals(conn.host, '1.2.3.4') + self.assertEquals(conn.port, 443) + + def test_get_conn_overridden(self): + local_auth = auth.filter_factory( + {'super_admin_key': 'supertest', + 'default_swift_cluster': 'local#https://1.2.3.4/v1'})(FakeApp()) + conn = \ + local_auth.get_conn( + urlparsed=auth.urlparse('http://5.6.7.8/v1')) + self.assertEquals( + conn.__class__, + auth.HTTPConnection) + self.assertEquals(conn.host, '5.6.7.8') + self.assertEquals(conn.port, 80) + + def test_get_conn_overridden_https(self): + local_auth = auth.filter_factory( + {'super_admin_key': 'supertest', + 'default_swift_cluster': 'local#http://1.2.3.4/v1'})(FakeApp()) + conn = \ + local_auth.get_conn( + urlparsed=auth.urlparse( + 'https://5.6.7.8/v1')) + self.assertEquals( + conn.__class__, + auth.HTTPSConnection) + self.assertEquals(conn.host, '5.6.7.8') + self.assertEquals(conn.port, 443) + + def test_get_itoken_fail_no_memcache(self): + exc = None + try: + self.test_auth.get_itoken({}) + except Exception as err: + exc = err + self.assertEquals(str(exc), + 'No memcache set up; required for Swauth middleware') + + def test_get_itoken_success(self): + fmc = FakeMemcache() + itk = self.test_auth.get_itoken( + {'swift.cache': fmc}) + self.assert_(itk.startswith('AUTH_itk'), itk) + expires, groups = fmc.get('AUTH_/auth/%s' % itk) + self.assert_(expires > time(), expires) + self.assertEquals( + groups, + '.auth,.reseller_admin,AUTH_.auth') + + def test_get_admin_detail_fail_no_colon(self): + self.test_auth.app = FakeApp(iter([])) + self.assertEquals( + self.test_auth.get_admin_detail( + Request.blank('/')), + None) + self.assertEquals( + self.test_auth.get_admin_detail( + Request.blank('/', + headers={'X-Auth-Admin-User': 'usr'})), None) + self.assertRaises( + StopIteration, self.test_auth.get_admin_detail, + Request.blank('/', headers={'X-Auth-Admin-User': 'act:usr'})) + + def test_get_admin_detail_fail_user_not_found(self): + self.test_auth.app = FakeApp( + iter([('404 Not Found', {}, '')])) + self.assertEquals( + self.test_auth.get_admin_detail( + Request.blank('/', + headers={'X-Auth-Admin-User': 'act:usr'})), None) + self.assertEquals(self.test_auth.app.calls, 1) + + def test_get_admin_detail_fail_get_user_error(self): + self.test_auth.app = FakeApp(iter([ + ('503 Service Unavailable', {}, '')])) + exc = None + try: + self.test_auth.get_admin_detail( + Request.blank('/', + headers={'X-Auth-Admin-User': 'act:usr'})) + except Exception as err: + exc = err + self.assertEquals(str(exc), 'Could not get admin user object: ' + '/v1/AUTH_.auth/act/usr 503 Service Unavailable') + self.assertEquals(self.test_auth.app.calls, 1) + + def test_get_admin_detail_success(self): + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, + json.dumps({"auth": "plaintext:key", + "groups": [{'name': "act:usr"}, {'name': "act"}, + {'name': ".admin"}]}))])) + detail = self.test_auth.get_admin_detail( + Request.blank('/', + headers={'X-Auth-Admin-User': 'act:usr'})) + self.assertEquals(self.test_auth.app.calls, 1) + self.assertEquals( + detail, {'account': 'act', + 'auth': 'plaintext:key', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}, + {'name': '.admin'}]}) + + def test_credentials_match_success(self): + self.assert_(self.test_auth.credentials_match( + {'auth': 'plaintext:key'}, 'key')) + + def test_credentials_match_fail_no_details(self): + self.assert_( + not self.test_auth.credentials_match(None, 'notkey')) + + def test_credentials_match_fail_plaintext(self): + self.assert_(not self.test_auth.credentials_match( + {'auth': 'plaintext:key'}, 'notkey')) + + def test_is_super_admin_success(self): + self.assert_( + self.test_auth.is_super_admin( + Request.blank( + '/', + headers={'X-Auth-Admin-User': '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}))) + + def test_is_super_admin_fail_bad_key(self): + self.assert_( + not self.test_auth.is_super_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'bad'}))) + self.assert_( + not self.test_auth.is_super_admin( + Request.blank('/', + headers={'X-Auth-Admin-User': '.super_admin'}))) + self.assert_( + not self.test_auth.is_super_admin(Request.blank('/'))) + + def test_is_super_admin_fail_bad_user(self): + self.assert_( + not self.test_auth.is_super_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'bad', + 'X-Auth-Admin-Key': 'supertest'}))) + self.assert_( + not self.test_auth.is_super_admin( + Request.blank('/', + headers={'X-Auth-Admin-Key': 'supertest'}))) + self.assert_( + not self.test_auth.is_super_admin(Request.blank('/'))) + + def test_is_reseller_admin_success_is_super_admin(self): + self.assert_( + self.test_auth.is_reseller_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}))) + + def test_is_reseller_admin_success_called_get_admin_detail( + self): + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, + json.dumps({'auth': 'plaintext:key', + 'groups': [{'name': 'act:rdm'}, {'name': 'act'}, + {'name': '.admin'}, + {'name': '.reseller_admin'}]}))])) + self.assert_( + self.test_auth.is_reseller_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'act:rdm', + 'X-Auth-Admin-Key': 'key'}))) + + def test_is_reseller_admin_fail_only_account_admin( + self): + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, + json.dumps({'auth': 'plaintext:key', + 'groups': [{'name': 'act:adm'}, {'name': 'act'}, + {'name': '.admin'}]}))])) + self.assert_( + not self.test_auth.is_reseller_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'}))) + + def test_is_reseller_admin_fail_regular_user(self): + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, + json.dumps({'auth': 'plaintext:key', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}]}))])) + self.assert_( + not self.test_auth.is_reseller_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': 'key'}))) + + def test_is_reseller_admin_fail_bad_key(self): + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, + json.dumps({'auth': 'plaintext:key', + 'groups': [{'name': 'act:rdm'}, {'name': 'act'}, + {'name': '.admin'}, + {'name': '.reseller_admin'}]}))])) + self.assert_( + not self.test_auth.is_reseller_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'act:rdm', + 'X-Auth-Admin-Key': 'bad'}))) + + def test_is_account_admin_success_is_super_admin(self): + self.assert_( + self.test_auth.is_account_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + '.super_admin', + 'X-Auth-Admin-Key': 'supertest'}), 'act')) + + def test_is_account_admin_success_is_reseller_admin( + self): + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, + json.dumps({'auth': 'plaintext:key', + 'groups': [{'name': 'act:rdm'}, {'name': 'act'}, + {'name': '.admin'}, + {'name': '.reseller_admin'}]}))])) + self.assert_( + self.test_auth.is_account_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'act:rdm', + 'X-Auth-Admin-Key': 'key'}), 'act')) + + def test_is_account_admin_success(self): + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, + json.dumps({'auth': 'plaintext:key', + 'groups': [{'name': 'act:adm'}, {'name': 'act'}, + {'name': '.admin'}]}))])) + self.assert_( + self.test_auth.is_account_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'act:adm', + 'X-Auth-Admin-Key': 'key'}), 'act')) + + def test_is_account_admin_fail_account_admin_different_account( + self): + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, + json.dumps({'auth': 'plaintext:key', + 'groups': [{'name': 'act2:adm'}, {'name': 'act2'}, + {'name': '.admin'}]}))])) + self.assert_( + not self.test_auth.is_account_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'act2:adm', + 'X-Auth-Admin-Key': 'key'}), 'act')) + + def test_is_account_admin_fail_regular_user(self): + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, + json.dumps({'auth': 'plaintext:key', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}]}))])) + self.assert_( + not self.test_auth.is_account_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'act:usr', + 'X-Auth-Admin-Key': 'key'}), 'act')) + + def test_is_account_admin_fail_bad_key(self): + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, + json.dumps({'auth': 'plaintext:key', + 'groups': [{'name': 'act:rdm'}, {'name': 'act'}, + {'name': '.admin'}, + {'name': '.reseller_admin'}]}))])) + self.assert_( + not self.test_auth.is_account_admin( + Request.blank('/', + headers={ + 'X-Auth-Admin-User': + 'act:rdm', + 'X-Auth-Admin-Key': 'bad'}), 'act')) + + def test_reseller_admin_but_account_is_internal_use_only( + self): + req = Request.blank('/v1/AUTH_.auth', + environ={'REQUEST_METHOD': 'GET'}) + req.remote_user = 'act:usr,act,.reseller_admin' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + + def test_reseller_admin_but_account_is_exactly_reseller_prefix( + self): + req = Request.blank( + '/v1/AUTH_', + environ={'REQUEST_METHOD': 'GET'}) + req.remote_user = 'act:usr,act,.reseller_admin' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + + def _get_token_success_v1_0_encoded( + self, saved_user, saved_key, sent_user, + sent_key): + self.test_auth.app = FakeApp(iter([ + # GET of user object + ('200 Ok', {}, + json.dumps({"auth": "plaintext:%s" % saved_key, + "groups": [{'name': saved_user}, {'name': "act"}, + {'name': ".admin"}]})), + # GET of account + ('204 Ok', + {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), + # PUT of new token + ('201 Created', {}, ''), + # POST of token to user object + ('204 No Content', {}, ''), + # GET of services object + ('200 Ok', {}, json.dumps({"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) + resp = Request.blank( + '/auth/v1.0', + headers={'X-Auth-User': sent_user, + 'X-Auth-Key': sent_key}).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assert_( + resp.headers.get('x-auth-token', + '').startswith('AUTH_tk'), + resp.headers.get('x-auth-token')) + self.assertEquals(resp.headers.get('x-auth-token'), + resp.headers.get('x-storage-token')) + self.assertEquals(resp.headers.get('x-storage-url'), + 'http://127.0.0.1:8080/v1/AUTH_cfa') + self.assertEquals( + json.loads(resp.body), + {"storage": {"default": "local", + "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) + self.assertEquals(self.test_auth.app.calls, 5) + + def test_get_token_success_v1_0_encoded1(self): + self._get_token_success_v1_0_encoded( + 'act:usr', 'key', 'act%3ausr', 'key') + + def test_get_token_success_v1_0_encoded2(self): + self._get_token_success_v1_0_encoded( + 'act:u s r', 'key', 'act%3au%20s%20r', 'key') + + def test_get_token_success_v1_0_encoded3(self): + self._get_token_success_v1_0_encoded( + 'act:u s r', 'k:e:y', 'act%3au%20s%20r', 'k%3Ae%3ay') + + def test_allowed_sync_hosts(self): + a = auth.filter_factory( + {'super_admin_key': 'supertest'})(FakeApp()) + self.assertEquals( + a.allowed_sync_hosts, + ['127.0.0.1']) + a = auth.filter_factory({ + 'super_admin_key': 'supertest', + 'allowed_sync_hosts': + '1.1.1.1,2.1.1.1, 3.1.1.1 , 4.1.1.1,, , 5.1.1.1'})(FakeApp()) + self.assertEquals( + a.allowed_sync_hosts, + ['1.1.1.1', '2.1.1.1', '3.1.1.1', '4.1.1.1', '5.1.1.1']) + + def test_reseller_admin_is_owner(self): + orig_authorize = self.test_auth.authorize + owner_values = [] + + def mitm_authorize(req): + rv = orig_authorize(req) + owner_values.append( + req.environ.get('swift_owner', False)) + return rv + + self.test_auth.authorize = mitm_authorize + + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, + json.dumps( + {'account': 'other', 'user': 'other:usr', + 'account_id': 'AUTH_other', + 'groups': [{'name': 'other:usr'}, {'name': 'other'}, + {'name': '.reseller_admin'}], + 'expires': time() + 60})), + ('204 No Content', {}, '')])) + req = Request.blank( + '/v1/AUTH_cfa', + headers={'X-Auth-Token': 'AUTH_t'}) + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(owner_values, [True]) + + def test_admin_is_owner(self): + orig_authorize = self.test_auth.authorize + owner_values = [] + + def mitm_authorize(req): + rv = orig_authorize(req) + owner_values.append( + req.environ.get('swift_owner', False)) + return rv + + self.test_auth.authorize = mitm_authorize + + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, + json.dumps( + {'account': 'act', 'user': 'act:usr', + 'account_id': 'AUTH_cfa', + 'groups': [{'name': 'act:usr'}, {'name': 'act'}, + {'name': '.admin'}], + 'expires': time() + 60})), + ('204 No Content', {}, '')])) + req = Request.blank( + '/v1/AUTH_cfa', + headers={'X-Auth-Token': 'AUTH_t'}) + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(owner_values, [True]) + + def test_regular_is_not_owner(self): + orig_authorize = self.test_auth.authorize + owner_values = [] + + def mitm_authorize(req): + rv = orig_authorize(req) + owner_values.append( + req.environ.get('swift_owner', False)) + return rv + + self.test_auth.authorize = mitm_authorize + + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, + json.dumps( + {'account': 'act', 'user': 'act:usr', + 'account_id': 'AUTH_cfa', + 'groups': + [{'name': 'act:usr'}, { + 'name': 'act'}], + 'expires': time() + 60})), + ('204 No Content', {}, '')]), acl='act:usr') + req = Request.blank('/v1/AUTH_cfa/c', + headers={'X-Auth-Token': 'AUTH_t'}) + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + self.assertEquals(owner_values, [False]) + + def test_sync_request_success(self): + self.test_auth.app = FakeApp( + iter([('204 No Content', {}, '')]), + sync_key='secret') + req = Request.blank('/v1/AUTH_cfa/c/o', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'x-container-sync-key': + 'secret', + 'x-timestamp': '123.456'}) + req.remote_addr = '127.0.0.1' + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + + def test_sync_request_fail_key(self): + self.test_auth.app = FakeApp( + iter([('204 No Content', {}, '')]), + sync_key='secret') + req = Request.blank('/v1/AUTH_cfa/c/o', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'x-container-sync-key': + 'wrongsecret', + 'x-timestamp': '123.456'}) + req.remote_addr = '127.0.0.1' + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + + self.test_auth.app = FakeApp( + iter([('204 No Content', {}, '')]), + sync_key='othersecret') + req = Request.blank('/v1/AUTH_cfa/c/o', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'x-container-sync-key': + 'secret', + 'x-timestamp': '123.456'}) + req.remote_addr = '127.0.0.1' + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + + self.test_auth.app = FakeApp( + iter([('204 No Content', {}, '')]), + sync_key=None) + req = Request.blank('/v1/AUTH_cfa/c/o', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'x-container-sync-key': + 'secret', + 'x-timestamp': '123.456'}) + req.remote_addr = '127.0.0.1' + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + + def test_sync_request_fail_no_timestamp(self): + self.test_auth.app = FakeApp( + iter([('204 No Content', {}, '')]), + sync_key='secret') + req = Request.blank('/v1/AUTH_cfa/c/o', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={'x-container-sync-key': 'secret'}) + req.remote_addr = '127.0.0.1' + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + + def test_sync_request_fail_sync_host(self): + self.test_auth.app = FakeApp( + iter([('204 No Content', {}, '')]), + sync_key='secret') + req = Request.blank('/v1/AUTH_cfa/c/o', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'x-container-sync-key': + 'secret', + 'x-timestamp': '123.456'}) + req.remote_addr = '127.0.0.2' + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + + def test_sync_request_success_lb_sync_host(self): + self.test_auth.app = FakeApp( + iter([('204 No Content', {}, '')]), + sync_key='secret') + req = Request.blank('/v1/AUTH_cfa/c/o', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'x-container-sync-key': + 'secret', + 'x-timestamp': + '123.456', + 'x-forwarded-for': '127.0.0.1'}) + req.remote_addr = '127.0.0.2' + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + + self.test_auth.app = FakeApp( + iter([('204 No Content', {}, '')]), + sync_key='secret') + req = Request.blank('/v1/AUTH_cfa/c/o', + environ={ + 'REQUEST_METHOD': 'DELETE'}, + headers={ + 'x-container-sync-key': + 'secret', + 'x-timestamp': + '123.456', + 'x-cluster-client-ip': '127.0.0.1'}) + req.remote_addr = '127.0.0.2' + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + + def _make_request(self, path, **kwargs): + req = Request.blank(path, **kwargs) + req.environ['swift.cache'] = FakeMemcache() + return req + + def test_override_asked_for_but_not_allowed(self): + self.test_auth = \ + auth.filter_factory( + {'allow_overrides': 'false'})(FakeApp()) + req = self._make_request('/v1/AUTH_account', + environ={'swift.authorize_override': True}) + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + self.assertEquals(resp.environ['swift.authorize'], + self.test_auth.authorize) + + def test_override_asked_for_and_allowed(self): + self.test_auth = \ + auth.filter_factory( + {'allow_overrides': 'true'})(FakeApp()) + req = self._make_request('/v1/AUTH_account', + environ={'swift.authorize_override': True}) + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 404) + self.assertTrue( + 'swift.authorize' not in resp.environ) + + def test_override_default_allowed(self): + req = self._make_request('/v1/AUTH_account', + environ={'swift.authorize_override': True}) + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 404) + self.assertTrue( + 'swift.authorize' not in resp.environ) + + def test_token_too_long(self): + req = self._make_request('/v1/AUTH_account', headers={ + 'x-auth-token': 'a' * MAX_TOKEN_LENGTH}) + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + self.assertNotEquals( + resp.body, + 'Token exceeds maximum length.') + req = self._make_request('/v1/AUTH_account', headers={ + 'x-auth-token': 'a' * (MAX_TOKEN_LENGTH + 1)}) + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 400) + self.assertEquals( + resp.body, + 'Token exceeds maximum length.') + + def test_crazy_authorization(self): + req = self._make_request('/v1/AUTH_account', headers={ + 'authorization': 'somebody elses header value'}) + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + self.assertEquals(resp.environ['swift.authorize'], + self.test_auth.denied_response) + + +if __name__ == '__main__': + unittest.main() -- cgit