summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorThiago da Silva <thiago@redhat.com>2013-10-17 13:27:31 -0400
committerLuis Pabon <lpabon@redhat.com>2013-10-17 12:59:48 -0700
commitcadaed4627078581eddf85ce2b86555f5efec1bd (patch)
tree2375bf1e3baff57e7ba790c295d4ef04ee12ec99
parentc86bf48f72686a51c2ed8963a678c8fce1c5cbf3 (diff)
moving existing swauth unit tests
moving existing swauth unit tests to gluster-swift unit tests location Change-Id: I3445b7ef1a1abe584854f2b04ffc9949b3346814 Signed-off-by: Thiago da Silva <thiago@redhat.com> Reviewed-on: http://review.gluster.org/6106 Reviewed-by: Luis Pabon <lpabon@redhat.com> Tested-by: Luis Pabon <lpabon@redhat.com>
-rw-r--r--gluster/swift/common/middleware/__init__.py0
-rw-r--r--gluster/swift/common/middleware/gswauth/__init__.py0
-rw-r--r--gluster/swift/common/middleware/gswauth/swauth/middleware.py8
-rw-r--r--test/unit/common/middleware/__init__.py0
-rw-r--r--test/unit/common/middleware/gswauth/__init__.py0
-rw-r--r--test/unit/common/middleware/gswauth/swauth/__init__.py0
-rw-r--r--test/unit/common/middleware/gswauth/swauth/test_authtypes.py63
-rw-r--r--test/unit/common/middleware/gswauth/swauth/test_middleware.py4520
8 files changed, 4587 insertions, 4 deletions
diff --git a/gluster/swift/common/middleware/__init__.py b/gluster/swift/common/middleware/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/gluster/swift/common/middleware/__init__.py
diff --git a/gluster/swift/common/middleware/gswauth/__init__.py b/gluster/swift/common/middleware/gswauth/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/gluster/swift/common/middleware/gswauth/__init__.py
diff --git a/gluster/swift/common/middleware/gswauth/swauth/middleware.py b/gluster/swift/common/middleware/gswauth/swauth/middleware.py
index 1dffec5..65f7ad7 100644
--- a/gluster/swift/common/middleware/gswauth/swauth/middleware.py
+++ b/gluster/swift/common/middleware/gswauth/swauth/middleware.py
@@ -39,8 +39,8 @@ from swift.common.utils import cache_from_env, get_logger, get_remote_client, \
split_path, TRUE_VALUES, urlparse
import swift.common.wsgi
-from swauth import swift_version
-import swauth.authtypes
+from gluster.swift.common.middleware.gswauth.swauth import swift_version
+from gluster.swift.common.middleware.gswauth.swauth import authtypes
MEMCACHE_TIME = swift_version.newer_than('1.7.7-dev')
@@ -139,7 +139,7 @@ class Swauth(object):
# Get an instance of our auth_type encoder for saving and checking the
# user's key
self.auth_type = conf.get('auth_type', 'Plaintext').title()
- self.auth_encoder = getattr(swauth.authtypes, self.auth_type, None)
+ self.auth_encoder = getattr(authtypes, self.auth_type, None)
if self.auth_encoder is None:
raise Exception(
'Invalid auth_type in config file: %s'
@@ -203,7 +203,7 @@ class Swauth(object):
return self.handle(env, start_response)
s3 = env.get('HTTP_AUTHORIZATION')
token = env.get('HTTP_X_AUTH_TOKEN', env.get('HTTP_X_STORAGE_TOKEN'))
- if token and len(token) > swauth.authtypes.MAX_TOKEN_LENGTH:
+ if token and len(token) > authtypes.MAX_TOKEN_LENGTH:
return HTTPBadRequest(body='Token exceeds maximum length.')(
env, start_response)
if s3 or (token and token.startswith(self.reseller_prefix)):
diff --git a/test/unit/common/middleware/__init__.py b/test/unit/common/middleware/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/unit/common/middleware/__init__.py
diff --git a/test/unit/common/middleware/gswauth/__init__.py b/test/unit/common/middleware/gswauth/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/unit/common/middleware/gswauth/__init__.py
diff --git a/test/unit/common/middleware/gswauth/swauth/__init__.py b/test/unit/common/middleware/gswauth/swauth/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/unit/common/middleware/gswauth/swauth/__init__.py
diff --git a/test/unit/common/middleware/gswauth/swauth/test_authtypes.py b/test/unit/common/middleware/gswauth/swauth/test_authtypes.py
new file mode 100644
index 0000000..aba9ad7
--- /dev/null
+++ b/test/unit/common/middleware/gswauth/swauth/test_authtypes.py
@@ -0,0 +1,63 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Pablo Llopis 2011
+
+import unittest
+import gluster.swift.common.middleware.gswauth.swauth.authtypes as authtypes
+
+
+class TestPlaintext(unittest.TestCase):
+
+ def setUp(self):
+ self.auth_encoder = authtypes.Plaintext()
+
+ def test_plaintext_encode(self):
+ enc_key = self.auth_encoder.encode('keystring')
+ self.assertEquals('plaintext:keystring', enc_key)
+
+ def test_plaintext_valid_match(self):
+ creds = 'plaintext:keystring'
+ match = self.auth_encoder.match('keystring', creds)
+ self.assertEquals(match, True)
+
+ def test_plaintext_invalid_match(self):
+ creds = 'plaintext:other-keystring'
+ match = self.auth_encoder.match('keystring', creds)
+ self.assertEquals(match, False)
+
+
+class TestSha1(unittest.TestCase):
+
+ def setUp(self):
+ self.auth_encoder = authtypes.Sha1()
+ self.auth_encoder.salt = 'salt'
+
+ def test_sha1_encode(self):
+ enc_key = self.auth_encoder.encode('keystring')
+ self.assertEquals('sha1:salt$d50dc700c296e23ce5b41f7431a0e01f69010f06',
+ enc_key)
+
+ def test_sha1_valid_match(self):
+ creds = 'sha1:salt$d50dc700c296e23ce5b41f7431a0e01f69010f06'
+ match = self.auth_encoder.match('keystring', creds)
+ self.assertEquals(match, True)
+
+ def test_sha1_invalid_match(self):
+ creds = 'sha1:salt$deadbabedeadbabedeadbabec0ffeebadc0ffeee'
+ match = self.auth_encoder.match('keystring', creds)
+ self.assertEquals(match, False)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/unit/common/middleware/gswauth/swauth/test_middleware.py b/test/unit/common/middleware/gswauth/swauth/test_middleware.py
new file mode 100644
index 0000000..627f1be
--- /dev/null
+++ b/test/unit/common/middleware/gswauth/swauth/test_middleware.py
@@ -0,0 +1,4520 @@
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+try:
+ import simplejson as json
+except ImportError:
+ import json
+import unittest
+from contextlib import contextmanager
+from time import time
+
+from swift.common.swob import Request, Response
+
+from gluster.swift.common.middleware.gswauth.swauth import middleware as auth
+from gluster.swift.common.middleware.gswauth.swauth.authtypes import MAX_TOKEN_LENGTH
+
+
+DEFAULT_TOKEN_LIFE = 86400
+MAX_TOKEN_LIFE = 100000
+
+
+class FakeMemcache(object):
+
+ def __init__(self):
+ self.store = {}
+
+ def get(self, key):
+ return self.store.get(key)
+
+ def set(self, key, value, timeout=0, time=0):
+ self.store[key] = value
+ return True
+
+ def incr(self, key, timeout=0, time=0):
+ self.store[key] = self.store.setdefault(key, 0) + 1
+ return self.store[key]
+
+ @contextmanager
+ def soft_lock(self, key, timeout=0, retries=5, time=0):
+ yield True
+
+ def delete(self, key):
+ try:
+ del self.store[key]
+ except Exception:
+ pass
+ return True
+
+
+class FakeApp(object):
+
+ def __init__(
+ self, status_headers_body_iter=None, acl=None, sync_key=None):
+ self.calls = 0
+ self.status_headers_body_iter = status_headers_body_iter
+ if not self.status_headers_body_iter:
+ self.status_headers_body_iter = iter(
+ [('404 Not Found', {}, '')])
+ self.acl = acl
+ self.sync_key = sync_key
+
+ def __call__(self, env, start_response):
+ self.calls += 1
+ self.request = Request.blank('', environ=env)
+ if self.acl:
+ self.request.acl = self.acl
+ if self.sync_key:
+ self.request.environ[
+ 'swift_sync_key'] = self.sync_key
+ if 'swift.authorize' in env:
+ resp = env['swift.authorize'](self.request)
+ if resp:
+ return resp(env, start_response)
+ status, headers, body = self.status_headers_body_iter.next(
+ )
+ return Response(status=status, headers=headers,
+ body=body)(env, start_response)
+
+
+class FakeConn(object):
+
+ def __init__(self, status_headers_body_iter=None):
+ self.calls = 0
+ self.status_headers_body_iter = status_headers_body_iter
+ if not self.status_headers_body_iter:
+ self.status_headers_body_iter = iter(
+ [('404 Not Found', {}, '')])
+
+ def request(self, method, path, headers):
+ self.calls += 1
+ self.request_path = path
+ self.status, self.headers, self.body = \
+ self.status_headers_body_iter.next()
+ self.status, self.reason = self.status.split(' ', 1)
+ self.status = int(self.status)
+
+ def getresponse(self):
+ return self
+
+ def read(self):
+ body = self.body
+ self.body = ''
+ return body
+
+
+class TestAuth(unittest.TestCase):
+
+ def setUp(self):
+ self.test_auth = \
+ auth.filter_factory({
+ 'super_admin_key': 'supertest',
+ 'token_life': str(DEFAULT_TOKEN_LIFE),
+ 'max_token_life': str(MAX_TOKEN_LIFE)})(FakeApp())
+
+ def test_super_admin_key_not_required(self):
+ auth.filter_factory({})(FakeApp())
+
+ def test_reseller_prefix_init(self):
+ app = FakeApp()
+ ath = auth.filter_factory(
+ {'super_admin_key': 'supertest'})(app)
+ self.assertEquals(ath.reseller_prefix, 'AUTH_')
+ ath = auth.filter_factory(
+ {'super_admin_key': 'supertest',
+ 'reseller_prefix': 'TEST'})(app)
+ self.assertEquals(ath.reseller_prefix, 'TEST_')
+ ath = auth.filter_factory(
+ {'super_admin_key': 'supertest',
+ 'reseller_prefix': 'TEST_'})(app)
+ self.assertEquals(ath.reseller_prefix, 'TEST_')
+
+ def test_auth_prefix_init(self):
+ app = FakeApp()
+ ath = auth.filter_factory(
+ {'super_admin_key': 'supertest'})(app)
+ self.assertEquals(ath.auth_prefix, '/auth/')
+ ath = auth.filter_factory(
+ {'super_admin_key': 'supertest',
+ 'auth_prefix': ''})(app)
+ self.assertEquals(ath.auth_prefix, '/auth/')
+ ath = auth.filter_factory(
+ {'super_admin_key': 'supertest',
+ 'auth_prefix': '/test/'})(app)
+ self.assertEquals(ath.auth_prefix, '/test/')
+ ath = auth.filter_factory(
+ {'super_admin_key': 'supertest',
+ 'auth_prefix': '/test'})(app)
+ self.assertEquals(ath.auth_prefix, '/test/')
+ ath = auth.filter_factory(
+ {'super_admin_key': 'supertest',
+ 'auth_prefix': 'test/'})(app)
+ self.assertEquals(ath.auth_prefix, '/test/')
+ ath = auth.filter_factory(
+ {'super_admin_key': 'supertest',
+ 'auth_prefix': 'test'})(app)
+ self.assertEquals(ath.auth_prefix, '/test/')
+
+ def test_no_auth_type_init(self):
+ app = FakeApp()
+ ath = auth.filter_factory({})(app)
+ self.assertEquals(ath.auth_type, 'Plaintext')
+
+ def test_valid_auth_type_init(self):
+ app = FakeApp()
+ ath = auth.filter_factory(
+ {'auth_type': 'sha1'})(app)
+ self.assertEquals(ath.auth_type, 'Sha1')
+ ath = auth.filter_factory(
+ {'auth_type': 'plaintext'})(app)
+ self.assertEquals(ath.auth_type, 'Plaintext')
+
+ def test_invalid_auth_type_init(self):
+ app = FakeApp()
+ exc = None
+ try:
+ auth.filter_factory(
+ {'auth_type': 'NONEXISTANT'})(app)
+ except Exception as err:
+ exc = err
+ self.assertEquals(str(exc),
+ 'Invalid auth_type in config file: %s' %
+ 'Nonexistant')
+
+ def test_default_swift_cluster_init(self):
+ app = FakeApp()
+ self.assertRaises(Exception, auth.filter_factory({
+ 'super_admin_key': 'supertest',
+ 'default_swift_cluster': 'local#badscheme://host/path'}), app)
+ ath = auth.filter_factory(
+ {'super_admin_key': 'supertest'})(app)
+ self.assertEquals(ath.default_swift_cluster,
+ 'local#http://127.0.0.1:8080/v1')
+ ath = auth.filter_factory({
+ 'super_admin_key': 'supertest',
+ 'default_swift_cluster': 'local#http://host/path'})(app)
+ self.assertEquals(ath.default_swift_cluster,
+ 'local#http://host/path')
+ ath = auth.filter_factory({
+ 'super_admin_key': 'supertest',
+ 'default_swift_cluster': 'local#https://host/path/'})(app)
+ self.assertEquals(ath.dsc_url, 'https://host/path')
+ self.assertEquals(ath.dsc_url2, 'https://host/path')
+ ath = auth.filter_factory({
+ 'super_admin_key': 'supertest',
+ 'default_swift_cluster':
+ 'local#https://host/path/#http://host2/path2/'})(app)
+ self.assertEquals(ath.dsc_url, 'https://host/path')
+ self.assertEquals(
+ ath.dsc_url2,
+ 'http://host2/path2')
+
+ def test_top_level_denied(self):
+ resp = Request.blank(
+ '/').get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+
+ def test_anon(self):
+ resp = Request.blank(
+ '/v1/AUTH_account').get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+ self.assertEquals(resp.environ['swift.authorize'],
+ self.test_auth.authorize)
+
+ def test_auth_deny_non_reseller_prefix(self):
+ resp = Request.blank(
+ '/v1/BLAH_account',
+ headers={'X-Auth-Token': 'BLAH_t'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+ self.assertEquals(resp.environ['swift.authorize'],
+ self.test_auth.denied_response)
+
+ def test_auth_deny_non_reseller_prefix_no_override(
+ self):
+ fake_authorize = lambda x: Response(
+ status='500 Fake')
+ resp = Request.blank(
+ '/v1/BLAH_account',
+ headers={'X-Auth-Token': 'BLAH_t'},
+ environ={'swift.authorize': fake_authorize}).get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(resp.environ['swift.authorize'], fake_authorize)
+
+ def test_auth_no_reseller_prefix_deny(self):
+ # Ensures that when we have no reseller prefix, we don't deny a request
+ # outright but set up a denial swift.authorize and pass the request on
+ # down the chain.
+ local_app = FakeApp()
+ local_auth = auth.filter_factory(
+ {'super_admin_key': 'supertest',
+ 'reseller_prefix': ''})(local_app)
+ resp = Request.blank(
+ '/v1/account',
+ headers={'X-Auth-Token': 't'}).get_response(local_auth)
+ self.assertEquals(resp.status_int, 401)
+ # one for checking auth, two for request passed
+ # along
+ self.assertEquals(local_app.calls, 2)
+ self.assertEquals(resp.environ['swift.authorize'],
+ local_auth.denied_response)
+
+ def test_auth_no_reseller_prefix_allow(self):
+ # Ensures that when we have no reseller prefix, we can still allow
+ # access if our auth server accepts requests
+ local_app = FakeApp(iter([
+ ('200 Ok', {},
+ json.dumps(
+ {'account': 'act', 'user': 'act:usr',
+ 'account_id': 'AUTH_cfa',
+ 'groups': [{'name': 'act:usr'}, {'name': 'act'},
+ {'name': '.admin'}],
+ 'expires': time() + 60})),
+ ('204 No Content', {}, '')]))
+ local_auth = auth.filter_factory(
+ {'super_admin_key': 'supertest',
+ 'reseller_prefix': ''})(local_app)
+ resp = Request.blank(
+ '/v1/act',
+ headers={'X-Auth-Token': 't'}).get_response(local_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(local_app.calls, 2)
+ self.assertEquals(resp.environ['swift.authorize'],
+ local_auth.authorize)
+
+ def test_auth_no_reseller_prefix_no_token(self):
+ # Check that normally we set up a call back to our
+ # authorize.
+ local_auth = \
+ auth.filter_factory(
+ {'super_admin_key': 'supertest',
+ 'reseller_prefix': ''})(FakeApp(iter([])))
+ resp = Request.blank(
+ '/v1/account').get_response(
+ local_auth)
+ self.assertEquals(resp.status_int, 401)
+ self.assertEquals(
+ resp.environ['swift.authorize'], local_auth.authorize)
+ # Now make sure we don't override an existing swift.authorize when we
+ # have no reseller prefix.
+ local_auth = \
+ auth.filter_factory(
+ {'super_admin_key': 'supertest',
+ 'reseller_prefix': ''})(FakeApp())
+ local_authorize = lambda req: Response('test')
+ resp = Request.blank(
+ '/v1/account', environ={'swift.authorize':
+ local_authorize}).get_response(local_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assertEquals(
+ resp.environ['swift.authorize'],
+ local_authorize)
+
+ def test_auth_fail(self):
+ resp = Request.blank(
+ '/v1/AUTH_cfa',
+ headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+
+ def test_auth_success(self):
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok', {},
+ json.dumps(
+ {'account': 'act', 'user': 'act:usr',
+ 'account_id': 'AUTH_cfa',
+ 'groups': [{'name': 'act:usr'}, {'name': 'act'},
+ {'name': '.admin'}],
+ 'expires': time() + 60})),
+ ('204 No Content', {}, '')]))
+ resp = Request.blank(
+ '/v1/AUTH_cfa',
+ headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_auth_memcache(self):
+ # First run our test without memcache, showing we need to return the
+ # token contents twice.
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok', {},
+ json.dumps(
+ {'account': 'act', 'user': 'act:usr',
+ 'account_id': 'AUTH_cfa',
+ 'groups': [{'name': 'act:usr'}, {'name': 'act'},
+ {'name': '.admin'}],
+ 'expires': time() + 60})),
+ ('204 No Content', {}, ''),
+ ('200 Ok', {},
+ json.dumps(
+ {'account': 'act', 'user': 'act:usr',
+ 'account_id': 'AUTH_cfa',
+ 'groups': [{'name': 'act:usr'}, {'name': 'act'},
+ {'name': '.admin'}],
+ 'expires': time() + 60})),
+ ('204 No Content', {}, '')]))
+ resp = Request.blank(
+ '/v1/AUTH_cfa',
+ headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ resp = Request.blank(
+ '/v1/AUTH_cfa',
+ headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(self.test_auth.app.calls, 4)
+ # Now run our test with memcache, showing we no longer need to return
+ # the token contents twice.
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok', {},
+ json.dumps(
+ {'account': 'act', 'user': 'act:usr',
+ 'account_id': 'AUTH_cfa',
+ 'groups': [{'name': 'act:usr'}, {'name': 'act'},
+ {'name': '.admin'}],
+ 'expires': time() + 60})),
+ ('204 No Content', {}, ''),
+ # Don't need a second token object returned if memcache is
+ # used
+ ('204 No Content', {}, '')]))
+ fake_memcache = FakeMemcache()
+ resp = Request.blank(
+ '/v1/AUTH_cfa',
+ headers={'X-Auth-Token': 'AUTH_t'},
+ environ={'swift.cache': fake_memcache}).get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ resp = Request.blank(
+ '/v1/AUTH_cfa',
+ headers={'X-Auth-Token': 'AUTH_t'},
+ environ={'swift.cache': fake_memcache}).get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(self.test_auth.app.calls, 3)
+
+ def test_auth_just_expired(self):
+ self.test_auth.app = FakeApp(iter([
+ # Request for token (which will have expired)
+ ('200 Ok', {},
+ json.dumps(
+ {'account': 'act', 'user': 'act:usr',
+ 'account_id': 'AUTH_cfa',
+ 'groups': [{'name': 'act:usr'}, {'name': 'act'},
+ {'name': '.admin'}],
+ 'expires': time() - 1})),
+ # Request to delete token
+ ('204 No Content', {}, '')]))
+ resp = Request.blank(
+ '/v1/AUTH_cfa',
+ headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_middleware_storage_token(self):
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok', {},
+ json.dumps(
+ {'account': 'act', 'user': 'act:usr',
+ 'account_id': 'AUTH_cfa',
+ 'groups': [{'name': 'act:usr'}, {'name': 'act'},
+ {'name': '.admin'}],
+ 'expires': time() + 60})),
+ ('204 No Content', {}, '')]))
+ resp = Request.blank(
+ '/v1/AUTH_cfa',
+ headers={'X-Storage-Token': 'AUTH_t'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_authorize_bad_path(self):
+ req = Request.blank('/badpath')
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 401)
+ req = Request.blank('/badpath')
+ req.remote_user = 'act:usr,act,AUTH_cfa'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+
+ def test_authorize_account_access(self):
+ req = Request.blank('/v1/AUTH_cfa')
+ req.remote_user = 'act:usr,act,AUTH_cfa'
+ self.assertEquals(
+ self.test_auth.authorize(req),
+ None)
+ req = Request.blank('/v1/AUTH_cfa')
+ req.remote_user = 'act:usr,act'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+
+ def test_authorize_acl_group_access(self):
+ req = Request.blank('/v1/AUTH_cfa')
+ req.remote_user = 'act:usr,act'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+ req = Request.blank('/v1/AUTH_cfa')
+ req.remote_user = 'act:usr,act'
+ req.acl = 'act'
+ self.assertEquals(
+ self.test_auth.authorize(req),
+ None)
+ req = Request.blank('/v1/AUTH_cfa')
+ req.remote_user = 'act:usr,act'
+ req.acl = 'act:usr'
+ self.assertEquals(
+ self.test_auth.authorize(req),
+ None)
+ req = Request.blank('/v1/AUTH_cfa')
+ req.remote_user = 'act:usr,act'
+ req.acl = 'act2'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+ req = Request.blank('/v1/AUTH_cfa')
+ req.remote_user = 'act:usr,act'
+ req.acl = 'act:usr2'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+
+ def test_deny_cross_reseller(self):
+ # Tests that cross-reseller is denied, even if ACLs/group
+ # names match
+ req = Request.blank('/v1/OTHER_cfa')
+ req.remote_user = 'act:usr,act,AUTH_cfa'
+ req.acl = 'act'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+
+ def test_authorize_acl_referrer_access(self):
+ req = Request.blank('/v1/AUTH_cfa/c')
+ req.remote_user = 'act:usr,act'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+ req = Request.blank('/v1/AUTH_cfa/c')
+ req.remote_user = 'act:usr,act'
+ req.acl = '.r:*,.rlistings'
+ self.assertEquals(
+ self.test_auth.authorize(req),
+ None)
+ req = Request.blank('/v1/AUTH_cfa/c')
+ req.remote_user = 'act:usr,act'
+ req.acl = '.r:*' # No listings allowed
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+ req = Request.blank('/v1/AUTH_cfa/c')
+ req.remote_user = 'act:usr,act'
+ req.acl = '.r:.example.com,.rlistings'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+ req = Request.blank('/v1/AUTH_cfa/c')
+ req.remote_user = 'act:usr,act'
+ req.referer = 'http://www.example.com/index.html'
+ req.acl = '.r:.example.com,.rlistings'
+ self.assertEquals(
+ self.test_auth.authorize(req),
+ None)
+ req = Request.blank('/v1/AUTH_cfa/c')
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 401)
+ req = Request.blank('/v1/AUTH_cfa/c')
+ req.acl = '.r:*,.rlistings'
+ self.assertEquals(
+ self.test_auth.authorize(req),
+ None)
+ req = Request.blank('/v1/AUTH_cfa/c')
+ req.acl = '.r:*' # No listings allowed
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 401)
+ req = Request.blank('/v1/AUTH_cfa/c')
+ req.acl = '.r:.example.com,.rlistings'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 401)
+ req = Request.blank('/v1/AUTH_cfa/c')
+ req.referer = 'http://www.example.com/index.html'
+ req.acl = '.r:.example.com,.rlistings'
+ self.assertEquals(
+ self.test_auth.authorize(req),
+ None)
+
+ def test_detect_reseller_request(self):
+ req = self._make_request('/v1/AUTH_admin',
+ headers={'X-Auth-Token': 'AUTH_t'})
+ cache_key = 'AUTH_/auth/AUTH_t'
+ cache_entry = (time() + 3600, '.reseller_admin')
+ req.environ['swift.cache'].set(
+ cache_key, cache_entry)
+ resp = req.get_response(self.test_auth)
+ self.assertTrue(req.environ.get('reseller_request'))
+
+ def test_account_put_permissions(self):
+ req = Request.blank(
+ '/v1/AUTH_new',
+ environ={'REQUEST_METHOD': 'PUT'})
+ req.remote_user = 'act:usr,act'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+
+ req = Request.blank(
+ '/v1/AUTH_new',
+ environ={'REQUEST_METHOD': 'PUT'})
+ req.remote_user = 'act:usr,act,AUTH_other'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+
+ # Even PUTs to your own account as account admin
+ # should fail
+ req = Request.blank(
+ '/v1/AUTH_old',
+ environ={'REQUEST_METHOD': 'PUT'})
+ req.remote_user = 'act:usr,act,AUTH_old'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+
+ req = Request.blank(
+ '/v1/AUTH_new',
+ environ={'REQUEST_METHOD': 'PUT'})
+ req.remote_user = 'act:usr,act,.reseller_admin'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp, None)
+
+ # .super_admin is not something the middleware should ever see or care
+ # about
+ req = Request.blank(
+ '/v1/AUTH_new',
+ environ={'REQUEST_METHOD': 'PUT'})
+ req.remote_user = 'act:usr,act,.super_admin'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+
+ def test_account_delete_permissions(self):
+ req = Request.blank('/v1/AUTH_new',
+ environ={'REQUEST_METHOD': 'DELETE'})
+ req.remote_user = 'act:usr,act'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+
+ req = Request.blank('/v1/AUTH_new',
+ environ={'REQUEST_METHOD': 'DELETE'})
+ req.remote_user = 'act:usr,act,AUTH_other'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+
+ # Even DELETEs to your own account as account admin should
+ # fail
+ req = Request.blank('/v1/AUTH_old',
+ environ={'REQUEST_METHOD': 'DELETE'})
+ req.remote_user = 'act:usr,act,AUTH_old'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+
+ req = Request.blank('/v1/AUTH_new',
+ environ={'REQUEST_METHOD': 'DELETE'})
+ req.remote_user = 'act:usr,act,.reseller_admin'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp, None)
+
+ # .super_admin is not something the middleware should ever see or care
+ # about
+ req = Request.blank('/v1/AUTH_new',
+ environ={'REQUEST_METHOD': 'DELETE'})
+ req.remote_user = 'act:usr,act,.super_admin'
+ resp = self.test_auth.authorize(req)
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+
+ def test_get_token_fail(self):
+ resp = Request.blank(
+ '/auth/v1.0').get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+ resp = Request.blank(
+ '/auth/v1.0',
+ headers={'X-Auth-User': 'act:usr',
+ 'X-Auth-Key': 'key'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+
+ def test_get_token_fail_invalid_key(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {},
+ json.dumps({"auth": "plaintext:key",
+ "groups": [{'name': "act:usr"}, {'name': "act"},
+ {'name': ".admin"}]}))]))
+ resp = Request.blank(
+ '/auth/v1.0',
+ headers={'X-Auth-User': 'act:usr',
+ 'X-Auth-Key': 'invalid'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_get_token_fail_invalid_x_auth_user_format(
+ self):
+ resp = Request.blank(
+ '/auth/v1/act/auth',
+ headers={'X-Auth-User': 'usr',
+ 'X-Auth-Key': 'key'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+
+ def test_get_token_fail_non_matching_account_in_request(
+ self):
+ resp = Request.blank(
+ '/auth/v1/act/auth',
+ headers={'X-Auth-User': 'act2:usr',
+ 'X-Auth-Key': 'key'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+
+ def test_get_token_fail_bad_path(self):
+ resp = Request.blank(
+ '/auth/v1/act/auth/invalid',
+ headers={'X-Auth-User': 'act:usr',
+ 'X-Auth-Key': 'key'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+
+ def test_get_token_fail_missing_key(self):
+ resp = Request.blank(
+ '/auth/v1/act/auth',
+ headers={'X-Auth-User': 'act:usr'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+
+ def test_get_token_fail_get_user_details(self):
+ self.test_auth.app = FakeApp(iter([
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank(
+ '/auth/v1.0',
+ headers={'X-Auth-User': 'act:usr',
+ 'X-Auth-Key': 'key'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_get_token_fail_get_account(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {},
+ json.dumps({"auth": "plaintext:key",
+ "groups": [{'name': "act:usr"}, {'name': "act"},
+ {'name': ".admin"}]})),
+ # GET of account
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank(
+ '/auth/v1.0',
+ headers={'X-Auth-User': 'act:usr',
+ 'X-Auth-Key': 'key'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_get_token_fail_put_new_token(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {},
+ json.dumps({"auth": "plaintext:key",
+ "groups": [{'name': "act:usr"}, {'name': "act"},
+ {'name': ".admin"}]})),
+ # GET of account
+ ('204 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
+ # PUT of new token
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank(
+ '/auth/v1.0',
+ headers={'X-Auth-User': 'act:usr',
+ 'X-Auth-Key': 'key'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 3)
+
+ def test_get_token_fail_post_to_user(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {},
+ json.dumps({"auth": "plaintext:key",
+ "groups": [{'name': "act:usr"}, {'name': "act"},
+ {'name': ".admin"}]})),
+ # GET of account
+ ('204 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
+ # PUT of new token
+ ('201 Created', {}, ''),
+ # POST of token to user object
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank(
+ '/auth/v1.0',
+ headers={'X-Auth-User': 'act:usr',
+ 'X-Auth-Key': 'key'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 4)
+
+ def test_get_token_fail_get_services(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {},
+ json.dumps({"auth": "plaintext:key",
+ "groups": [{'name': "act:usr"}, {'name': "act"},
+ {'name': ".admin"}]})),
+ # GET of account
+ ('204 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
+ # PUT of new token
+ ('201 Created', {}, ''),
+ # POST of token to user object
+ ('204 No Content', {}, ''),
+ # GET of services object
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank(
+ '/auth/v1.0',
+ headers={'X-Auth-User': 'act:usr',
+ 'X-Auth-Key': 'key'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 5)
+
+ def test_get_token_fail_get_existing_token(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'},
+ json.dumps({"auth": "plaintext:key",
+ "groups": [{'name': "act:usr"}, {'name': "act"},
+ {'name': ".admin"}]})),
+ # GET of token
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank(
+ '/auth/v1.0',
+ headers={'X-Auth-User': 'act:usr',
+ 'X-Auth-Key': 'key'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_get_token_success_v1_0(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {},
+ json.dumps({"auth": "plaintext:key",
+ "groups": [{'name': "act:usr"}, {'name': "act"},
+ {'name': ".admin"}]})),
+ # GET of account
+ ('204 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
+ # PUT of new token
+ ('201 Created', {}, ''),
+ # POST of token to user object
+ ('204 No Content', {}, ''),
+ # GET of services object
+ ('200 Ok', {}, json.dumps({"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
+ resp = Request.blank(
+ '/auth/v1.0',
+ headers={'X-Auth-User': 'act:usr',
+ 'X-Auth-Key': 'key'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assert_(resp.headers.get(
+ 'x-auth-token',
+ '').startswith('AUTH_tk'), resp.headers.get('x-auth-token'))
+ self.assertEquals(resp.headers.get('x-auth-token'),
+ resp.headers.get('x-storage-token'))
+ self.assertEquals(resp.headers.get('x-storage-url'),
+ 'http://127.0.0.1:8080/v1/AUTH_cfa')
+ self.assertEquals(
+ json.loads(resp.body),
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
+ self.assertEquals(self.test_auth.app.calls, 5)
+
+ def test_get_token_success_v1_0_with_user_token_life(
+ self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {},
+ json.dumps({"auth": "plaintext:key",
+ "groups": [{'name': "act:usr"}, {'name': "act"},
+ {'name': ".admin"}]})),
+ # GET of account
+ ('204 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
+ # PUT of new token
+ ('201 Created', {}, ''),
+ # POST of token to user object
+ ('204 No Content', {}, ''),
+ # GET of services object
+ ('200 Ok', {}, json.dumps({"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
+ resp = Request.blank(
+ '/auth/v1.0',
+ headers={'X-Auth-User': 'act:usr',
+ 'X-Auth-Key': 'key',
+ 'X-Auth-Token-Lifetime': 10}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ left = int(resp.headers['x-auth-token-expires'])
+ self.assertTrue(left > 0, '%d > 0' % left)
+ self.assertTrue(left <= 10, '%d <= 10' % left)
+ self.assert_(resp.headers.get(
+ 'x-auth-token',
+ '').startswith('AUTH_tk'), resp.headers.get('x-auth-token'))
+ self.assertEquals(resp.headers.get('x-auth-token'),
+ resp.headers.get('x-storage-token'))
+ self.assertEquals(resp.headers.get('x-storage-url'),
+ 'http://127.0.0.1:8080/v1/AUTH_cfa')
+ self.assertEquals(
+ json.loads(resp.body),
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
+ self.assertEquals(self.test_auth.app.calls, 5)
+
+ def test_get_token_success_v1_0_with_user_token_life_past_max(
+ self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {},
+ json.dumps({"auth": "plaintext:key",
+ "groups": [{'name': "act:usr"}, {'name': "act"},
+ {'name': ".admin"}]})),
+ # GET of account
+ ('204 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
+ # PUT of new token
+ ('201 Created', {}, ''),
+ # POST of token to user object
+ ('204 No Content', {}, ''),
+ # GET of services object
+ ('200 Ok', {}, json.dumps({"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
+ req = Request.blank(
+ '/auth/v1.0',
+ headers={'X-Auth-User': 'act:usr',
+ 'X-Auth-Key': 'key',
+ 'X-Auth-Token-Lifetime': MAX_TOKEN_LIFE * 10})
+ resp = req.get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ left = int(resp.headers['x-auth-token-expires'])
+ self.assertTrue(left > DEFAULT_TOKEN_LIFE,
+ '%d > %d' % (left, DEFAULT_TOKEN_LIFE))
+ self.assertTrue(left <= MAX_TOKEN_LIFE,
+ '%d <= %d' % (left, MAX_TOKEN_LIFE))
+ self.assert_(resp.headers.get(
+ 'x-auth-token',
+ '').startswith('AUTH_tk'), resp.headers.get('x-auth-token'))
+ self.assertEquals(resp.headers.get('x-auth-token'),
+ resp.headers.get('x-storage-token'))
+ self.assertEquals(resp.headers.get('x-storage-url'),
+ 'http://127.0.0.1:8080/v1/AUTH_cfa')
+ self.assertEquals(
+ json.loads(resp.body),
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
+ self.assertEquals(self.test_auth.app.calls, 5)
+
+ def test_get_token_success_v1_act_auth(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {},
+ json.dumps({"auth": "plaintext:key",
+ "groups": [{'name': "act:usr"}, {'name': "act"},
+ {'name': ".admin"}]})),
+ # GET of account
+ ('204 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
+ # PUT of new token
+ ('201 Created', {}, ''),
+ # POST of token to user object
+ ('204 No Content', {}, ''),
+ # GET of services object
+ ('200 Ok', {}, json.dumps({"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
+ resp = Request.blank(
+ '/auth/v1/act/auth',
+ headers={'X-Storage-User': 'usr',
+ 'X-Storage-Pass': 'key'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assert_(resp.headers.get(
+ 'x-auth-token',
+ '').startswith('AUTH_tk'), resp.headers.get('x-auth-token'))
+ self.assertEquals(resp.headers.get('x-auth-token'),
+ resp.headers.get('x-storage-token'))
+ self.assertEquals(resp.headers.get('x-storage-url'),
+ 'http://127.0.0.1:8080/v1/AUTH_cfa')
+ self.assertEquals(
+ json.loads(resp.body),
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
+ self.assertEquals(self.test_auth.app.calls, 5)
+
+ def test_get_token_success_storage_instead_of_auth(
+ self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {},
+ json.dumps({"auth": "plaintext:key",
+ "groups": [{'name': "act:usr"}, {'name': "act"},
+ {'name': ".admin"}]})),
+ # GET of account
+ ('204 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
+ # PUT of new token
+ ('201 Created', {}, ''),
+ # POST of token to user object
+ ('204 No Content', {}, ''),
+ # GET of services object
+ ('200 Ok', {}, json.dumps({"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
+ resp = Request.blank(
+ '/auth/v1.0',
+ headers={'X-Storage-User': 'act:usr',
+ 'X-Storage-Pass': 'key'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assert_(
+ resp.headers.get(
+ 'x-auth-token',
+ '').startswith('AUTH_tk'), resp.headers.get('x-auth-token'))
+ self.assertEquals(resp.headers.get('x-auth-token'),
+ resp.headers.get('x-storage-token'))
+ self.assertEquals(resp.headers.get('x-storage-url'),
+ 'http://127.0.0.1:8080/v1/AUTH_cfa')
+ self.assertEquals(
+ json.loads(resp.body),
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
+ self.assertEquals(self.test_auth.app.calls, 5)
+
+ def test_get_token_success_v1_act_auth_auth_instead_of_storage(
+ self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {},
+ json.dumps({"auth": "plaintext:key",
+ "groups": [{'name': "act:usr"}, {'name': "act"},
+ {'name': ".admin"}]})),
+ # GET of account
+ ('204 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
+ # PUT of new token
+ ('201 Created', {}, ''),
+ # POST of token to user object
+ ('204 No Content', {}, ''),
+ # GET of services object
+ ('200 Ok', {}, json.dumps({"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
+ resp = Request.blank(
+ '/auth/v1/act/auth',
+ headers={'X-Auth-User': 'act:usr',
+ 'X-Auth-Key': 'key'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assert_(resp.headers.get(
+ 'x-auth-token',
+ '').startswith('AUTH_tk'), resp.headers.get('x-auth-token'))
+ self.assertEquals(resp.headers.get('x-auth-token'),
+ resp.headers.get('x-storage-token'))
+ self.assertEquals(resp.headers.get('x-storage-url'),
+ 'http://127.0.0.1:8080/v1/AUTH_cfa')
+ self.assertEquals(
+ json.loads(resp.body),
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
+ self.assertEquals(self.test_auth.app.calls, 5)
+
+ def test_get_token_success_existing_token(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'},
+ json.dumps({"auth": "plaintext:key",
+ "groups": [{'name': "act:usr"}, {'name': "act"},
+ {'name': ".admin"}]})),
+ # GET of token
+ ('200 Ok', {}, json.dumps(
+ {"account": "act", "user": "usr",
+ "account_id": "AUTH_cfa",
+ "groups": [{'name': "act:usr"},
+ {'name': "key"}, {'name': ".admin"}],
+ "expires": 9999999999.9999999})),
+ # GET of services object
+ ('200 Ok', {}, json.dumps({"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
+ resp = Request.blank(
+ '/auth/v1.0',
+ headers={'X-Auth-User': 'act:usr',
+ 'X-Auth-Key': 'key'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assertEquals(
+ resp.headers.get('x-auth-token'),
+ 'AUTH_tktest')
+ self.assertEquals(resp.headers.get('x-auth-token'),
+ resp.headers.get('x-storage-token'))
+ self.assertEquals(resp.headers.get('x-storage-url'),
+ 'http://127.0.0.1:8080/v1/AUTH_cfa')
+ self.assertEquals(
+ json.loads(resp.body),
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
+ self.assertEquals(self.test_auth.app.calls, 3)
+
+ def test_get_token_success_existing_token_but_request_new_one(
+ self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'},
+ json.dumps({"auth": "plaintext:key",
+ "groups": [{'name': "act:usr"}, {'name': "act"},
+ {'name': ".admin"}]})),
+ # DELETE of expired token
+ ('204 No Content', {}, ''),
+ # GET of account
+ ('204 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
+ # PUT of new token
+ ('201 Created', {}, ''),
+ # POST of token to user object
+ ('204 No Content', {}, ''),
+ # GET of services object
+ ('200 Ok', {}, json.dumps({"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
+ resp = Request.blank(
+ '/auth/v1.0',
+ headers={'X-Auth-User': 'act:usr',
+ 'X-Auth-Key': 'key',
+ 'X-Auth-New-Token': 'true'}).get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assertNotEquals(
+ resp.headers.get('x-auth-token'), 'AUTH_tktest')
+ self.assertEquals(resp.headers.get('x-auth-token'),
+ resp.headers.get('x-storage-token'))
+ self.assertEquals(resp.headers.get('x-storage-url'),
+ 'http://127.0.0.1:8080/v1/AUTH_cfa')
+ self.assertEquals(
+ json.loads(resp.body),
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
+ self.assertEquals(self.test_auth.app.calls, 6)
+
+ def test_get_token_success_existing_token_expired(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'},
+ json.dumps({"auth": "plaintext:key",
+ "groups": [{'name': "act:usr"}, {'name': "act"},
+ {'name': ".admin"}]})),
+ # GET of token
+ ('200 Ok', {}, json.dumps(
+ {"account": "act", "user": "usr",
+ "account_id": "AUTH_cfa",
+ "groups": [{'name': "act:usr"},
+ {'name': "key"}, {'name': ".admin"}],
+ "expires": 0.0})),
+ # DELETE of expired token
+ ('204 No Content', {}, ''),
+ # GET of account
+ ('204 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
+ # PUT of new token
+ ('201 Created', {}, ''),
+ # POST of token to user object
+ ('204 No Content', {}, ''),
+ # GET of services object
+ ('200 Ok', {}, json.dumps({"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
+ resp = Request.blank(
+ '/auth/v1.0',
+ headers={'X-Auth-User': 'act:usr',
+ 'X-Auth-Key': 'key'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assertNotEquals(
+ resp.headers.get('x-auth-token'),
+ 'AUTH_tktest')
+ self.assertEquals(resp.headers.get('x-auth-token'),
+ resp.headers.get('x-storage-token'))
+ self.assertEquals(resp.headers.get('x-storage-url'),
+ 'http://127.0.0.1:8080/v1/AUTH_cfa')
+ self.assertEquals(
+ json.loads(resp.body),
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
+ self.assertEquals(self.test_auth.app.calls, 7)
+
+ def test_get_token_success_existing_token_expired_fail_deleting_old(
+ self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'},
+ json.dumps({"auth": "plaintext:key",
+ "groups": [{'name': "act:usr"}, {'name': "act"},
+ {'name': ".admin"}]})),
+ # GET of token
+ ('200 Ok', {}, json.dumps({"account": "act", "user": "usr",
+ "account_id": "AUTH_cfa",
+ "groups": [{'name': "act:usr"},
+ {'name': "key"}, {'name': ".admin"}],
+ "expires": 0.0})),
+ # DELETE of expired token
+ ('503 Service Unavailable', {}, ''),
+ # GET of account
+ ('204 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
+ # PUT of new token
+ ('201 Created', {}, ''),
+ # POST of token to user object
+ ('204 No Content', {}, ''),
+ # GET of services object
+ ('200 Ok', {}, json.dumps({"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
+ resp = Request.blank(
+ '/auth/v1.0',
+ headers={'X-Auth-User': 'act:usr',
+ 'X-Auth-Key': 'key'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assertNotEquals(
+ resp.headers.get('x-auth-token'),
+ 'AUTH_tktest')
+ self.assertEquals(resp.headers.get('x-auth-token'),
+ resp.headers.get('x-storage-token'))
+ self.assertEquals(resp.headers.get('x-storage-url'),
+ 'http://127.0.0.1:8080/v1/AUTH_cfa')
+ self.assertEquals(
+ json.loads(resp.body),
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
+ self.assertEquals(self.test_auth.app.calls, 7)
+
+ def test_prep_success(self):
+ list_to_iter = [
+ # PUT of .auth account
+ ('201 Created', {}, ''),
+ # PUT of .account_id container
+ ('201 Created', {}, '')]
+ # PUT of .token* containers
+ for x in xrange(16):
+ list_to_iter.append(('201 Created', {}, ''))
+ self.test_auth.app = FakeApp(iter(list_to_iter))
+ resp = Request.blank('/auth/v2/.prep',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(self.test_auth.app.calls, 18)
+
+ def test_prep_bad_method(self):
+ resp = Request.blank('/auth/v2/.prep',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+ resp = Request.blank('/auth/v2/.prep',
+ environ={
+ 'REQUEST_METHOD': 'HEAD'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+ resp = Request.blank('/auth/v2/.prep',
+ environ={
+ 'REQUEST_METHOD': 'PUT'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+
+ def test_prep_bad_creds(self):
+ resp = Request.blank('/auth/v2/.prep',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User':
+ 'super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ resp = Request.blank('/auth/v2/.prep',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'upertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ resp = Request.blank('/auth/v2/.prep',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User': '.super_admin'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ resp = Request.blank('/auth/v2/.prep',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ resp = Request.blank(
+ '/auth/v2/.prep',
+ environ={'REQUEST_METHOD': 'POST'}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+
+ def test_prep_fail_account_create(self):
+ self.test_auth.app = FakeApp(iter([
+ # PUT of .auth account
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/.prep',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_prep_fail_token_container_create(self):
+ self.test_auth.app = FakeApp(iter([
+ # PUT of .auth account
+ ('201 Created', {}, ''),
+ # PUT of .token container
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/.prep',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_prep_fail_account_id_container_create(self):
+ self.test_auth.app = FakeApp(iter([
+ # PUT of .auth account
+ ('201 Created', {}, ''),
+ # PUT of .token container
+ ('201 Created', {}, ''),
+ # PUT of .account_id container
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/.prep',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 3)
+
+ def test_get_reseller_success(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of .auth account (list containers)
+ ('200 Ok', {}, json.dumps([
+ {"name": ".token", "count": 0, "bytes": 0},
+ {"name": ".account_id",
+ "count": 0, "bytes": 0},
+ {"name": "act", "count": 0, "bytes": 0}])),
+ # GET of .auth account (list containers
+ # continuation)
+ ('200 Ok', {}, '[]')]))
+ resp = Request.blank('/auth/v2',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assertEquals(json.loads(resp.body),
+ {"accounts": [{"name": "act"}]})
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
+ {"name": "test"}, {"name": ".admin"},
+ {"name": ".reseller_admin"}], "auth": "plaintext:key"})),
+ # GET of .auth account (list containers)
+ ('200 Ok', {}, json.dumps([
+ {"name": ".token", "count": 0, "bytes": 0},
+ {"name": ".account_id",
+ "count": 0, "bytes": 0},
+ {"name": "act", "count": 0, "bytes": 0}])),
+ # GET of .auth account (list containers
+ # continuation)
+ ('200 Ok', {}, '[]')]))
+ resp = Request.blank('/auth/v2',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:adm',
+ 'X-Auth-Admin-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assertEquals(json.loads(resp.body),
+ {"accounts": [{"name": "act"}]})
+ self.assertEquals(self.test_auth.app.calls, 3)
+
+ def test_get_reseller_fail_bad_creds(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('404 Not Found', {}, '')]))
+ resp = Request.blank('/auth/v2',
+ headers={
+ 'X-Auth-Admin-User':
+ 'super:admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (account admin, but not reseller
+ # admin)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
+ {"name": "test"}, {"name": ".admin"}],
+ "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:adm',
+ 'X-Auth-Admin-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (regular user)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
+ {"name": "test"}], "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:usr',
+ 'X-Auth-Admin-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_get_reseller_fail_listing(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of .auth account (list containers)
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of .auth account (list containers)
+ ('200 Ok', {}, json.dumps([
+ {"name": ".token", "count": 0, "bytes": 0},
+ {"name": ".account_id",
+ "count": 0, "bytes": 0},
+ {"name": "act", "count": 0, "bytes": 0}])),
+ # GET of .auth account (list containers
+ # continuation)
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_get_account_success(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of .services object
+ ('200 Ok', {},
+ json.dumps(
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
+ # GET of account container (list objects)
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"},
+ {"name": "tester", "hash": "etag", "bytes": 104,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.736680"},
+ {"name": "tester3", "hash": "etag", "bytes": 86,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:28.135530"}])),
+ # GET of account container (list objects
+ # continuation)
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')]))
+ resp = Request.blank('/auth/v2/act',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assertEquals(
+ json.loads(resp.body),
+ {'account_id': 'AUTH_cfa',
+ 'services': {'storage':
+ {'default': 'local',
+ 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}},
+ 'users': [{'name': 'tester'}, {'name': 'tester3'}]})
+ self.assertEquals(self.test_auth.app.calls, 3)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
+ {"name": "test"}, {"name": ".admin"}],
+ "auth": "plaintext:key"})),
+ # GET of .services object
+ ('200 Ok', {},
+ json.dumps({"storage":
+ {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
+ # GET of account container (list objects)
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"},
+ {"name": "tester", "hash": "etag", "bytes": 104,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.736680"},
+ {"name": "tester3", "hash": "etag", "bytes": 86,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:28.135530"}])),
+ # GET of account container (list objects
+ # continuation)
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')]))
+ resp = Request.blank('/auth/v2/act',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:adm',
+ 'X-Auth-Admin-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assertEquals(
+ json.loads(resp.body),
+ {'account_id': 'AUTH_cfa',
+ 'services': {'storage':
+ {'default': 'local',
+ 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}},
+ 'users': [{'name': 'tester'}, {'name': 'tester3'}]})
+ self.assertEquals(self.test_auth.app.calls, 4)
+
+ def test_get_account_fail_bad_account_name(self):
+ resp = Request.blank('/auth/v2/.token',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+ resp = Request.blank('/auth/v2/.anything',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+
+ def test_get_account_fail_creds(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('404 Not Found', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ headers={
+ 'X-Auth-Admin-User':
+ 'super:admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (account admin, but wrong
+ # account)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"},
+ {"name": "test"}, {"name": ".admin"}],
+ "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act2:adm',
+ 'X-Auth-Admin-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (regular user)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
+ {"name": "test"}], "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:usr',
+ 'X-Auth-Admin-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_get_account_fail_get_services(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of .services object
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of .services object
+ ('404 Not Found', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 404)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_get_account_fail_listing(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of .services object
+ ('200 Ok', {}, json.dumps(
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
+ # GET of account container (list objects)
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of .services object
+ ('200 Ok', {}, json.dumps(
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
+ # GET of account container (list objects)
+ ('404 Not Found', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 404)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of .services object
+ ('200 Ok', {}, json.dumps(
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
+ # GET of account container (list objects)
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"},
+ {"name": "tester", "hash": "etag", "bytes": 104,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.736680"},
+ {"name": "tester3", "hash": "etag", "bytes": 86,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:28.135530"}])),
+ # GET of account container (list objects
+ # continuation)
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 3)
+
+ def test_set_services_new_service(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of .services object
+ ('200 Ok', {},
+ json.dumps({"storage":
+ {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
+ # PUT of new .services object
+ ('204 No Content', {}, '')]))
+ resp = Request.blank('/auth/v2/act/.services',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'},
+ body=json.dumps(
+ {'new_service':
+ {'new_endpoint': 'new_value'}})
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assertEquals(
+ json.loads(resp.body),
+ {'storage': {'default': 'local',
+ 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'},
+ 'new_service': {'new_endpoint': 'new_value'}})
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_set_services_new_endpoint(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of .services object
+ ('200 Ok', {}, json.dumps(
+ {"storage":
+ {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
+ # PUT of new .services object
+ ('204 No Content', {}, '')]))
+ resp = Request.blank('/auth/v2/act/.services',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'},
+ body=json.dumps(
+ {'storage': {'new_endpoint': 'new_value'}})
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assertEquals(
+ json.loads(resp.body),
+ {'storage': {'default': 'local',
+ 'local':
+ 'http://127.0.0.1:8080/v1/AUTH_cfa',
+ 'new_endpoint': 'new_value'}})
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_set_services_update_endpoint(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of .services object
+ ('200 Ok', {}, json.dumps(
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
+ # PUT of new .services object
+ ('204 No Content', {}, '')]))
+ resp = Request.blank('/auth/v2/act/.services',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'},
+ body=json.dumps(
+ {'storage': {'local': 'new_value'}})
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assertEquals(json.loads(resp.body),
+ {'storage': {'default': 'local',
+ 'local': 'new_value'}})
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_set_services_fail_bad_creds(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('404 Not Found', {}, '')]))
+ resp = Request.blank('/auth/v2/act/.services',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User':
+ 'super:admin',
+ 'X-Auth-Admin-Key': 'supertest'},
+ body=json.dumps(
+ {'storage': {'local': 'new_value'}})
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (account admin, but not reseller
+ # admin)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
+ {"name": "test"}, {"name": ".admin"}],
+ "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act/.services',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:adm',
+ 'X-Auth-Admin-Key': 'key'},
+ body=json.dumps(
+ {'storage': {'local': 'new_value'}})
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (regular user)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
+ {"name": "test"}], "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act/.services',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:usr',
+ 'X-Auth-Admin-Key': 'key'},
+ body=json.dumps(
+ {'storage': {'local': 'new_value'}})
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_set_services_fail_bad_account_name(self):
+ resp = Request.blank('/auth/v2/.act/.services',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'},
+ body=json.dumps(
+ {'storage': {'local': 'new_value'}})
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+
+ def test_set_services_fail_bad_json(self):
+ resp = Request.blank('/auth/v2/act/.services',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'},
+ body='garbage'
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+ resp = Request.blank('/auth/v2/act/.services',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'},
+ body=''
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+
+ def test_set_services_fail_get_services(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of .services object
+ ('503 Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/act/.services',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'},
+ body=json.dumps({
+ 'new_service': {'new_endpoint': 'new_value'}})
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of .services object
+ ('404 Not Found', {}, '')]))
+ resp = Request.blank('/auth/v2/act/.services',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'},
+ body=json.dumps({
+ 'new_service': {'new_endpoint': 'new_value'}})
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 404)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_set_services_fail_put_services(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of .services object
+ ('200 Ok', {}, json.dumps(
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
+ # PUT of new .services object
+ ('503 Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/act/.services',
+ environ={
+ 'REQUEST_METHOD': 'POST'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'},
+ body=json.dumps(
+ {'new_service':
+ {'new_endpoint': 'new_value'}})
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_put_account_success(self):
+ conn = FakeConn(iter([
+ # PUT of storage account itself
+ ('201 Created', {}, '')]))
+ self.test_auth.get_conn = lambda: conn
+ self.test_auth.app = FakeApp(iter([
+ # Initial HEAD of account container to check for
+ # pre-existence
+ ('404 Not Found', {}, ''),
+ # PUT of account container
+ ('204 No Content', {}, ''),
+ # PUT of .account_id mapping object
+ ('204 No Content', {}, ''),
+ # PUT of .services object
+ ('204 No Content', {}, ''),
+ # POST to account container updating
+ # X-Container-Meta-Account-Id
+ ('204 No Content', {}, '')]))
+ resp = Request.blank(
+ '/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()},
+ headers={'X-Auth-Admin-User': '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 201)
+ self.assertEquals(self.test_auth.app.calls, 5)
+ self.assertEquals(conn.calls, 1)
+
+ def test_put_account_success_preexist_but_not_completed(
+ self):
+ conn = FakeConn(iter([
+ # PUT of storage account itself
+ ('201 Created', {}, '')]))
+ self.test_auth.get_conn = lambda: conn
+ self.test_auth.app = FakeApp(iter([
+ # Initial HEAD of account container to check for pre-existence
+ # We're going to show it as existing this time, but with no
+ # X-Container-Meta-Account-Id, indicating a failed
+ # previous attempt
+ ('200 Ok', {}, ''),
+ # PUT of .account_id mapping object
+ ('204 No Content', {}, ''),
+ # PUT of .services object
+ ('204 No Content', {}, ''),
+ # POST to account container updating
+ # X-Container-Meta-Account-Id
+ ('204 No Content', {}, '')]))
+ resp = Request.blank(
+ '/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'PUT',
+ 'swift.cache': FakeMemcache()},
+ headers={'X-Auth-Admin-User': '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 201)
+ self.assertEquals(self.test_auth.app.calls, 4)
+ self.assertEquals(conn.calls, 1)
+
+ def test_put_account_success_preexist_and_completed(
+ self):
+ self.test_auth.app = FakeApp(iter([
+ # Initial HEAD of account container to check for pre-existence
+ # We're going to show it as existing this time, and with an
+ # X-Container-Meta-Account-Id, indicating it already
+ # exists
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '')]))
+ resp = Request.blank(
+ '/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()},
+ headers={'X-Auth-Admin-User': '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 202)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_put_account_success_with_given_suffix(self):
+ conn = FakeConn(iter([
+ # PUT of storage account itself
+ ('201 Created', {}, '')]))
+ self.test_auth.get_conn = lambda: conn
+ self.test_auth.app = FakeApp(iter([
+ # Initial HEAD of account container to check for
+ # pre-existence
+ ('404 Not Found', {}, ''),
+ # PUT of account container
+ ('204 No Content', {}, ''),
+ # PUT of .account_id mapping object
+ ('204 No Content', {}, ''),
+ # PUT of .services object
+ ('204 No Content', {}, ''),
+ # POST to account container updating
+ # X-Container-Meta-Account-Id
+ ('204 No Content', {}, '')]))
+ resp = Request.blank(
+ '/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()},
+ headers={'X-Auth-Admin-User': '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest',
+ 'X-Account-Suffix': 'test-suffix'}).get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 201)
+ self.assertEquals(
+ conn.request_path,
+ '/v1/AUTH_test-suffix')
+ self.assertEquals(self.test_auth.app.calls, 5)
+ self.assertEquals(conn.calls, 1)
+
+ def test_put_account_fail_bad_creds(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('404 Not Found', {}, '')]))
+ resp = Request.blank(
+ '/auth/v2/act',
+ environ={'REQUEST_METHOD': 'PUT',
+ 'swift.cache': FakeMemcache()},
+ headers={'X-Auth-Admin-User': 'super:admin',
+ 'X-Auth-Admin-Key': 'supertest'},).get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (account admin, but not reseller
+ # admin)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
+ {"name": "test"}, {"name": ".admin"}],
+ "auth": "plaintext:key"}))]))
+ resp = Request.blank(
+ '/auth/v2/act',
+ environ={'REQUEST_METHOD': 'PUT',
+ 'swift.cache': FakeMemcache()},
+ headers={'X-Auth-Admin-User': 'act:adm',
+ 'X-Auth-Admin-Key': 'key'},).get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (regular user)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
+ {"name": "test"}], "auth": "plaintext:key"}))]))
+ resp = Request.blank(
+ '/auth/v2/act',
+ environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()},
+ headers={'X-Auth-Admin-User': 'act:usr',
+ 'X-Auth-Admin-Key': 'key'},).get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_put_account_fail_invalid_account_name(self):
+ resp = Request.blank(
+ '/auth/v2/.act',
+ environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()},
+ headers={'X-Auth-Admin-User': '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'},).get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+
+ def test_put_account_fail_on_initial_account_head(self):
+ self.test_auth.app = FakeApp(iter([
+ # Initial HEAD of account container to check for
+ # pre-existence
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank(
+ '/auth/v2/act',
+ environ={'REQUEST_METHOD': 'PUT',
+ 'swift.cache': FakeMemcache()},
+ headers={'X-Auth-Admin-User': '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}).get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_put_account_fail_on_account_marker_put(self):
+ self.test_auth.app = FakeApp(iter([
+ # Initial HEAD of account container to check for
+ # pre-existence
+ ('404 Not Found', {}, ''),
+ # PUT of account container
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank(
+ '/auth/v2/act',
+ environ={'REQUEST_METHOD': 'PUT',
+ 'swift.cache': FakeMemcache()},
+ headers={'X-Auth-Admin-User': '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}).get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_put_account_fail_on_storage_account_put(self):
+ conn = FakeConn(iter([
+ # PUT of storage account itself
+ ('503 Service Unavailable', {}, '')]))
+ self.test_auth.get_conn = lambda: conn
+ self.test_auth.app = FakeApp(iter([
+ # Initial HEAD of account container to check for
+ # pre-existence
+ ('404 Not Found', {}, ''),
+ # PUT of account container
+ ('204 No Content', {}, '')]))
+ resp = Request.blank(
+ '/auth/v2/act',
+ environ={'REQUEST_METHOD': 'PUT',
+ 'swift.cache': FakeMemcache()},
+ headers={'X-Auth-Admin-User': '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}).get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(conn.calls, 1)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_put_account_fail_on_account_id_mapping(self):
+ conn = FakeConn(iter([
+ # PUT of storage account itself
+ ('201 Created', {}, '')]))
+ self.test_auth.get_conn = lambda: conn
+ self.test_auth.app = FakeApp(iter([
+ # Initial HEAD of account container to check for
+ # pre-existence
+ ('404 Not Found', {}, ''),
+ # PUT of account container
+ ('204 No Content', {}, ''),
+ # PUT of .account_id mapping object
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank(
+ '/auth/v2/act',
+ environ={'REQUEST_METHOD': 'PUT',
+ 'swift.cache': FakeMemcache()},
+ headers={'X-Auth-Admin-User': '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}).get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(conn.calls, 1)
+ self.assertEquals(self.test_auth.app.calls, 3)
+
+ def test_put_account_fail_on_services_object(self):
+ conn = FakeConn(iter([
+ # PUT of storage account itself
+ ('201 Created', {}, '')]))
+ self.test_auth.get_conn = lambda: conn
+ self.test_auth.app = FakeApp(iter([
+ # Initial HEAD of account container to check for
+ # pre-existence
+ ('404 Not Found', {}, ''),
+ # PUT of account container
+ ('204 No Content', {}, ''),
+ # PUT of .account_id mapping object
+ ('204 No Content', {}, ''),
+ # PUT of .services object
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank(
+ '/auth/v2/act',
+ environ={'REQUEST_METHOD': 'PUT',
+ 'swift.cache': FakeMemcache()},
+ headers={'X-Auth-Admin-User': '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}).get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(conn.calls, 1)
+ self.assertEquals(self.test_auth.app.calls, 4)
+
+ def test_put_account_fail_on_post_mapping(self):
+ conn = FakeConn(iter([
+ # PUT of storage account itself
+ ('201 Created', {}, '')]))
+ self.test_auth.get_conn = lambda: conn
+ self.test_auth.app = FakeApp(iter([
+ # Initial HEAD of account container to check for
+ # pre-existence
+ ('404 Not Found', {}, ''),
+ # PUT of account container
+ ('204 No Content', {}, ''),
+ # PUT of .account_id mapping object
+ ('204 No Content', {}, ''),
+ # PUT of .services object
+ ('204 No Content', {}, ''),
+ # POST to account container updating
+ # X-Container-Meta-Account-Id
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank(
+ '/auth/v2/act',
+ environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()},
+ headers={'X-Auth-Admin-User': '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}).get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(conn.calls, 1)
+ self.assertEquals(self.test_auth.app.calls, 5)
+
+ def test_delete_account_success(self):
+ conn = FakeConn(iter([
+ # DELETE of storage account itself
+ ('204 No Content', {}, '')]))
+ self.test_auth.get_conn = lambda x: conn
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"}])),
+ # Account's container listing, checking for users
+ # (continuation)
+ ('200 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
+ # GET the .services object
+ ('200 Ok', {}, json.dumps(
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
+ # DELETE the .services object
+ ('204 No Content', {}, ''),
+ # DELETE the .account_id mapping object
+ ('204 No Content', {}, ''),
+ # DELETE the account container
+ ('204 No Content', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(self.test_auth.app.calls, 6)
+ self.assertEquals(conn.calls, 1)
+
+ def test_delete_account_success_missing_services(self):
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"}])),
+ # Account's container listing, checking for users
+ # (continuation)
+ ('200 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
+ # GET the .services object
+ ('404 Not Found', {}, ''),
+ # DELETE the .account_id mapping object
+ ('204 No Content', {}, ''),
+ # DELETE the account container
+ ('204 No Content', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(self.test_auth.app.calls, 5)
+
+ def test_delete_account_success_missing_storage_account(
+ self):
+ conn = FakeConn(iter([
+ # DELETE of storage account itself
+ ('404 Not Found', {}, '')]))
+ self.test_auth.get_conn = lambda x: conn
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"}])),
+ # Account's container listing, checking for users
+ # (continuation)
+ ('200 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
+ # GET the .services object
+ ('200 Ok', {}, json.dumps(
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
+ # DELETE the .services object
+ ('204 No Content', {}, ''),
+ # DELETE the .account_id mapping object
+ ('204 No Content', {}, ''),
+ # DELETE the account container
+ ('204 No Content', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(self.test_auth.app.calls, 6)
+ self.assertEquals(conn.calls, 1)
+
+ def test_delete_account_success_missing_account_id_mapping(
+ self):
+ conn = FakeConn(iter([
+ # DELETE of storage account itself
+ ('204 No Content', {}, '')]))
+ self.test_auth.get_conn = lambda x: conn
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"}])),
+ # Account's container listing, checking for users
+ # (continuation)
+ ('200 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
+ # GET the .services object
+ ('200 Ok', {}, json.dumps(
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
+ # DELETE the .services object
+ ('204 No Content', {}, ''),
+ # DELETE the .account_id mapping object
+ ('404 Not Found', {}, ''),
+ # DELETE the account container
+ ('204 No Content', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(self.test_auth.app.calls, 6)
+ self.assertEquals(conn.calls, 1)
+
+ def test_delete_account_success_missing_account_container_at_end(
+ self):
+ conn = FakeConn(iter([
+ # DELETE of storage account itself
+ ('204 No Content', {}, '')]))
+ self.test_auth.get_conn = lambda x: conn
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"}])),
+ # Account's container listing, checking for users
+ # (continuation)
+ ('200 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
+ # GET the .services object
+ ('200 Ok', {}, json.dumps(
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
+ # DELETE the .services object
+ ('204 No Content', {}, ''),
+ # DELETE the .account_id mapping object
+ ('204 No Content', {}, ''),
+ # DELETE the account container
+ ('404 Not Found', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(self.test_auth.app.calls, 6)
+ self.assertEquals(conn.calls, 1)
+
+ def test_delete_account_fail_bad_creds(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('404 Not Found', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ 'super:admin',
+ 'X-Auth-Admin-Key': 'supertest'},
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (account admin, but not reseller
+ # admin)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
+ {"name": "test"}, {"name": ".admin"}],
+ "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:adm',
+ 'X-Auth-Admin-Key': 'key'},
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (regular user)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
+ {"name": "test"}], "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:usr',
+ 'X-Auth-Admin-Key': 'key'},
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_delete_account_fail_invalid_account_name(self):
+ resp = Request.blank('/auth/v2/.act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+
+ def test_delete_account_fail_not_found(self):
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('404 Not Found', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 404)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_delete_account_fail_not_found_concurrency(
+ self):
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"}])),
+ # Account's container listing, checking for users
+ # (continuation)
+ ('404 Not Found', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 404)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_delete_account_fail_list_account(self):
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_delete_account_fail_list_account_concurrency(
+ self):
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"}])),
+ # Account's container listing, checking for users
+ # (continuation)
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_delete_account_fail_has_users(self):
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"},
+ {"name": "tester", "hash": "etag", "bytes": 104,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.736680"}]))]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 409)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_delete_account_fail_has_users2(self):
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"}])),
+ # Account's container listing, checking for users
+ # (continuation)
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": "tester", "hash": "etag", "bytes": 104,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.736680"}]))]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 409)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_delete_account_fail_get_services(self):
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"}])),
+ # Account's container listing, checking for users
+ # (continuation)
+ ('200 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
+ # GET the .services object
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 3)
+
+ def test_delete_account_fail_delete_storage_account(
+ self):
+ conn = FakeConn(iter([
+ # DELETE of storage account itself
+ ('409 Conflict', {}, '')]))
+ self.test_auth.get_conn = lambda x: conn
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"}])),
+ # Account's container listing, checking for users
+ # (continuation)
+ ('200 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
+ # GET the .services object
+ ('200 Ok', {}, json.dumps(
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 409)
+ self.assertEquals(self.test_auth.app.calls, 3)
+ self.assertEquals(conn.calls, 1)
+
+ def test_delete_account_fail_delete_storage_account2(
+ self):
+ conn = FakeConn(iter([
+ # DELETE of storage account itself
+ ('204 No Content', {}, ''),
+ # DELETE of storage account itself
+ ('409 Conflict', {}, '')]))
+ self.test_auth.get_conn = lambda x: conn
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"}])),
+ # Account's container listing, checking for users
+ # (continuation)
+ ('200 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
+ # GET the .services object
+ ('200 Ok', {}, json.dumps(
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa",
+ "other": "http://127.0.0.1:8080/v1/AUTH_cfa2"}}))]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 3)
+ self.assertEquals(conn.calls, 2)
+
+ def test_delete_account_fail_delete_storage_account3(
+ self):
+ conn = FakeConn(iter([
+ # DELETE of storage account itself
+ ('503 Service Unavailable', {}, '')]))
+ self.test_auth.get_conn = lambda x: conn
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"}])),
+ # Account's container listing, checking for users
+ # (continuation)
+ ('200 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
+ # GET the .services object
+ ('200 Ok', {}, json.dumps(
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 3)
+ self.assertEquals(conn.calls, 1)
+
+ def test_delete_account_fail_delete_storage_account4(
+ self):
+ conn = FakeConn(iter([
+ # DELETE of storage account itself
+ ('204 No Content', {}, ''),
+ # DELETE of storage account itself
+ ('503 Service Unavailable', {}, '')]))
+ self.test_auth.get_conn = lambda x: conn
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"}])),
+ # Account's container listing, checking for users
+ # (continuation)
+ ('200 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
+ # GET the .services object
+ ('200 Ok', {}, json.dumps(
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa",
+ "other": "http://127.0.0.1:8080/v1/AUTH_cfa2"}}))]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 3)
+ self.assertEquals(conn.calls, 2)
+
+ def test_delete_account_fail_delete_services(self):
+ conn = FakeConn(iter([
+ # DELETE of storage account itself
+ ('204 No Content', {}, '')]))
+ self.test_auth.get_conn = lambda x: conn
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"}])),
+ # Account's container listing, checking for users
+ # (continuation)
+ ('200 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
+ # GET the .services object
+ ('200 Ok', {}, json.dumps(
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
+ # DELETE the .services object
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 4)
+ self.assertEquals(conn.calls, 1)
+
+ def test_delete_account_fail_delete_account_id_mapping(
+ self):
+ conn = FakeConn(iter([
+ # DELETE of storage account itself
+ ('204 No Content', {}, '')]))
+ self.test_auth.get_conn = lambda x: conn
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"}])),
+ # Account's container listing, checking for users
+ # (continuation)
+ ('200 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
+ # GET the .services object
+ ('200 Ok', {}, json.dumps(
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
+ # DELETE the .services object
+ ('204 No Content', {}, ''),
+ # DELETE the .account_id mapping object
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 5)
+ self.assertEquals(conn.calls, 1)
+
+ def test_delete_account_fail_delete_account_container(
+ self):
+ conn = FakeConn(iter([
+ # DELETE of storage account itself
+ ('204 No Content', {}, '')]))
+ self.test_auth.get_conn = lambda x: conn
+ self.test_auth.app = FakeApp(iter([
+ # Account's container listing, checking for
+ # users
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"}])),
+ # Account's container listing, checking for users
+ # (continuation)
+ ('200 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
+ # GET the .services object
+ ('200 Ok', {}, json.dumps(
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
+ # DELETE the .services object
+ ('204 No Content', {}, ''),
+ # DELETE the .account_id mapping object
+ ('204 No Content', {}, ''),
+ # DELETE the account container
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/act',
+ environ={
+ 'REQUEST_METHOD': 'DELETE',
+ 'swift.cache': FakeMemcache()},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 6)
+ self.assertEquals(conn.calls, 1)
+
+ def test_get_user_success(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {}, json.dumps(
+ {"groups": [{"name": "act:usr"}, {"name": "act"},
+ {"name": ".admin"}],
+ "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act/usr',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assertEquals(resp.body, json.dumps(
+ {"groups": [{"name": "act:usr"}, {"name": "act"},
+ {"name": ".admin"}],
+ "auth": "plaintext:key"}))
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_get_user_fail_no_super_admin_key(self):
+ local_auth = auth.filter_factory({})(FakeApp(iter([
+ # GET of user object (but we should never get
+ # here)
+ ('200 Ok', {}, json.dumps(
+ {"groups": [{"name": "act:usr"}, {"name": "act"},
+ {"name": ".admin"}],
+ "auth": "plaintext:key"}))])))
+ resp = Request.blank('/auth/v2/act/usr',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(local_auth)
+ self.assertEquals(resp.status_int, 404)
+ self.assertEquals(local_auth.app.calls, 0)
+
+ def test_get_user_groups_success(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of account container (list objects)
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"},
+ {"name": "tester", "hash": "etag", "bytes": 104,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.736680"},
+ {"name": "tester3", "hash": "etag", "bytes": 86,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:28.135530"}])),
+ # GET of user object
+ ('200 Ok', {}, json.dumps(
+ {"groups": [{"name": "act:tester"}, {"name": "act"},
+ {"name": ".admin"}],
+ "auth": "plaintext:key"})),
+ # GET of user object
+ ('200 Ok', {}, json.dumps(
+ {"groups": [{"name": "act:tester3"}, {"name": "act"}],
+ "auth": "plaintext:key3"})),
+ # GET of account container (list objects
+ # continuation)
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')]))
+ resp = Request.blank('/auth/v2/act/.groups',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assertEquals(resp.body, json.dumps(
+ {"groups": [{"name": ".admin"}, {"name": "act"},
+ {"name": "act:tester"}, {"name": "act:tester3"}]}))
+ self.assertEquals(self.test_auth.app.calls, 4)
+
+ def test_get_user_groups_success2(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of account container (list objects)
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"},
+ {"name": "tester", "hash": "etag", "bytes": 104,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.736680"}])),
+ # GET of user object
+ ('200 Ok', {}, json.dumps(
+ {"groups": [{"name": "act:tester"}, {"name": "act"},
+ {"name": ".admin"}],
+ "auth": "plaintext:key"})),
+ # GET of account container (list objects
+ # continuation)
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": "tester3", "hash": "etag", "bytes": 86,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:28.135530"}])),
+ # GET of user object
+ ('200 Ok', {}, json.dumps(
+ {"groups": [{"name": "act:tester3"}, {"name": "act"}],
+ "auth": "plaintext:key3"})),
+ # GET of account container (list objects
+ # continuation)
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')]))
+ resp = Request.blank('/auth/v2/act/.groups',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assertEquals(resp.body, json.dumps(
+ {"groups": [{"name": ".admin"}, {"name": "act"},
+ {"name": "act:tester"}, {"name": "act:tester3"}]}))
+ self.assertEquals(self.test_auth.app.calls, 5)
+
+ def test_get_user_fail_invalid_account(self):
+ resp = Request.blank('/auth/v2/.invalid/usr',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+
+ def test_get_user_fail_invalid_user(self):
+ resp = Request.blank('/auth/v2/act/.invalid',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+
+ def test_get_user_fail_bad_creds(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('404 Not Found', {}, '')]))
+ resp = Request.blank('/auth/v2/act/usr',
+ headers={
+ 'X-Auth-Admin-User':
+ 'super:admin',
+ 'X-Auth-Admin-Key': 'supertest'},
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (regular user)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
+ {"name": "test"}], "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act/usr',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:usr',
+ 'X-Auth-Admin-Key': 'key'},
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_get_user_account_admin_success(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (account admin, but not reseller
+ # admin)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
+ {"name": "test"}, {"name": ".admin"}],
+ "auth": "plaintext:key"})),
+ # GET of requested user object
+ ('200 Ok', {}, json.dumps(
+ {"groups": [{"name": "act:usr"}, {"name": "act"}],
+ "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act/usr',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:adm',
+ 'X-Auth-Admin-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assertEquals(resp.body, json.dumps(
+ {"groups": [{"name": "act:usr"}, {"name": "act"}],
+ "auth": "plaintext:key"}))
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_get_user_account_admin_fail_getting_account_admin(
+ self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (account admin check)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
+ {"name": "test"}, {"name": ".admin"}],
+ "auth": "plaintext:key"})),
+ # GET of requested user object [who is an .admin
+ # as well]
+ ('200 Ok', {}, json.dumps(
+ {"groups": [{"name": "act:usr"}, {"name": "act"},
+ {"name": ".admin"}],
+ "auth": "plaintext:key"})),
+ # GET of user object (reseller admin check [and fail
+ # here])
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
+ {"name": "test"}, {"name": ".admin"}],
+ "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act/usr',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:adm',
+ 'X-Auth-Admin-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 3)
+
+ def test_get_user_account_admin_fail_getting_reseller_admin(
+ self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (account admin check)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
+ {"name": "test"}, {"name": ".admin"}],
+ "auth": "plaintext:key"})),
+ # GET of requested user object [who is a
+ # .reseller_admin]
+ ('200 Ok', {}, json.dumps(
+ {"groups": [{"name": "act:usr"}, {"name": "act"},
+ {"name": ".reseller_admin"}],
+ "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act/usr',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:adm',
+ 'X-Auth-Admin-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_get_user_reseller_admin_fail_getting_reseller_admin(
+ self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (account admin check)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
+ {"name": "test"}, {"name": ".reseller_admin"}],
+ "auth": "plaintext:key"})),
+ # GET of requested user object [who also is a
+ # .reseller_admin]
+ ('200 Ok', {}, json.dumps(
+ {"groups": [{"name": "act:usr"}, {"name": "act"},
+ {"name": ".reseller_admin"}],
+ "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act/usr',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:adm',
+ 'X-Auth-Admin-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_get_user_super_admin_succeed_getting_reseller_admin(
+ self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of requested user object
+ ('200 Ok', {}, json.dumps(
+ {"groups": [{"name": "act:usr"}, {"name": "act"},
+ {"name": ".reseller_admin"}],
+ "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act/usr',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assertEquals(resp.body, json.dumps(
+ {"groups": [{"name": "act:usr"}, {"name": "act"},
+ {"name": ".reseller_admin"}],
+ "auth": "plaintext:key"}))
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_get_user_groups_not_found(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of account container (list objects)
+ ('404 Not Found', {}, '')]))
+ resp = Request.blank('/auth/v2/act/.groups',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 404)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_get_user_groups_fail_listing(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of account container (list objects)
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/act/.groups',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_get_user_groups_fail_get_user(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of account container (list objects)
+ ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
+ json.dumps([
+ {"name": ".services", "hash": "etag", "bytes": 112,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.618110"},
+ {"name": "tester", "hash": "etag", "bytes": 104,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:27.736680"},
+ {"name": "tester3", "hash": "etag", "bytes": 86,
+ "content_type":
+ "application/octet-stream",
+ "last_modified": "2010-12-03T17:16:28.135530"}])),
+ # GET of user object
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/act/.groups',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_get_user_not_found(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('404 Not Found', {}, '')]))
+ resp = Request.blank('/auth/v2/act/usr',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 404)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_get_user_fail(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/act/usr',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key':
+ 'supertest',
+ 'X-Auth-User-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_put_user_fail_invalid_account(self):
+ resp = Request.blank('/auth/v2/.invalid/usr',
+ environ={
+ 'REQUEST_METHOD': 'PUT'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key':
+ 'supertest',
+ 'X-Auth-User-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+
+ def test_put_user_fail_invalid_user(self):
+ resp = Request.blank('/auth/v2/act/.usr',
+ environ={
+ 'REQUEST_METHOD': 'PUT'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key':
+ 'supertest',
+ 'X-Auth-User-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+
+ def test_put_user_fail_no_user_key(self):
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'PUT'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+
+ def test_put_user_reseller_admin_fail_bad_creds(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (reseller admin)
+ # This shouldn't actually get called, checked
+ # below
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:rdm"},
+ {"name": "test"}, {"name": ".admin"},
+ {"name": ".reseller_admin"}], "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'PUT'},
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:rdm',
+ 'X-Auth-Admin-Key':
+ 'key',
+ 'X-Auth-User-Key':
+ 'key',
+ 'X-Auth-User-Reseller-Admin': 'true'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 0)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (account admin, but not reseller admin)
+ # This shouldn't actually get called, checked
+ # below
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
+ {"name": "test"}, {"name": ".admin"}],
+ "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'PUT'},
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:adm',
+ 'X-Auth-Admin-Key':
+ 'key',
+ 'X-Auth-User-Key':
+ 'key',
+ 'X-Auth-User-Reseller-Admin': 'true'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 0)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (regular user)
+ # This shouldn't actually get called, checked
+ # below
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
+ {"name": "test"}], "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'PUT'},
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:adm',
+ 'X-Auth-Admin-Key':
+ 'key',
+ 'X-Auth-User-Key':
+ 'key',
+ 'X-Auth-User-Reseller-Admin': 'true'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 0)
+
+ def test_put_user_account_admin_fail_bad_creds(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (account admin, but wrong
+ # account)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"},
+ {"name": "test"}, {"name": ".admin"}],
+ "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'PUT'},
+ headers={
+ 'X-Auth-Admin-User':
+ 'act2:adm',
+ 'X-Auth-Admin-Key':
+ 'key',
+ 'X-Auth-User-Key':
+ 'key',
+ 'X-Auth-User-Admin': 'true'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (regular user)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
+ {"name": "test"}], "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'PUT'},
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:usr',
+ 'X-Auth-Admin-Key':
+ 'key',
+ 'X-Auth-User-Key':
+ 'key',
+ 'X-Auth-User-Admin': 'true'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_put_user_regular_fail_bad_creds(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (account admin, but wrong
+ # account)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"},
+ {"name": "test"}, {"name": ".admin"}],
+ "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'PUT'},
+ headers={
+ 'X-Auth-Admin-User':
+ 'act2:adm',
+ 'X-Auth-Admin-Key':
+ 'key',
+ 'X-Auth-User-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (regular user)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
+ {"name": "test"}], "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'PUT'},
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:usr',
+ 'X-Auth-Admin-Key':
+ 'key',
+ 'X-Auth-User-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_put_user_regular_success(self):
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
+ # PUT of user object
+ ('201 Created', {}, '')]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'PUT'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key':
+ 'supertest',
+ 'X-Auth-User-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 201)
+ self.assertEquals(self.test_auth.app.calls, 2)
+ self.assertEquals(
+ json.loads(self.test_auth.app.request.body),
+ {"groups": [{"name": "act:usr"}, {"name": "act"}],
+ "auth": "plaintext:key"})
+
+ def test_put_user_special_chars_success(self):
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
+ # PUT of user object
+ ('201 Created', {}, '')]))
+ resp = Request.blank('/auth/v2/act/u_s-r',
+ environ={
+ 'REQUEST_METHOD': 'PUT'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key':
+ 'supertest',
+ 'X-Auth-User-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 201)
+ self.assertEquals(self.test_auth.app.calls, 2)
+ self.assertEquals(
+ json.loads(self.test_auth.app.request.body),
+ {"groups": [{"name": "act:u_s-r"}, {"name": "act"}],
+ "auth": "plaintext:key"})
+
+ def test_put_user_account_admin_success(self):
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
+ # PUT of user object
+ ('201 Created', {}, '')]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'PUT'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key':
+ 'supertest',
+ 'X-Auth-User-Key': 'key',
+ 'X-Auth-User-Admin': 'true'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 201)
+ self.assertEquals(self.test_auth.app.calls, 2)
+ self.assertEquals(
+ json.loads(self.test_auth.app.request.body),
+ {"groups": [{"name": "act:usr"}, {"name": "act"},
+ {"name": ".admin"}],
+ "auth": "plaintext:key"})
+
+ def test_put_user_reseller_admin_success(self):
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
+ # PUT of user object
+ ('201 Created', {}, '')]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'PUT'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key':
+ 'supertest',
+ 'X-Auth-User-Key': 'key',
+ 'X-Auth-User-Reseller-Admin': 'true'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 201)
+ self.assertEquals(self.test_auth.app.calls, 2)
+ self.assertEquals(
+ json.loads(self.test_auth.app.request.body),
+ {"groups": [{"name": "act:usr"}, {"name": "act"},
+ {"name": ".admin"}, {"name": ".reseller_admin"}],
+ "auth": "plaintext:key"})
+
+ def test_put_user_fail_not_found(self):
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
+ # PUT of user object
+ ('404 Not Found', {}, '')]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'PUT'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key':
+ 'supertest',
+ 'X-Auth-User-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 404)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_put_user_fail(self):
+ self.test_auth.app = FakeApp(iter([
+ # PUT of user object
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'PUT'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key':
+ 'supertest',
+ 'X-Auth-User-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_delete_user_bad_creds(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (account admin, but wrong
+ # account)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"},
+ {"name": "test"}, {"name": ".admin"}],
+ "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'X-Auth-Admin-User':
+ 'act2:adm',
+ 'X-Auth-Admin-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object (regular user)
+ ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
+ {"name": "test"}], "auth": "plaintext:key"}))]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:usr',
+ 'X-Auth-Admin-Key': 'key'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 403)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_delete_user_invalid_account(self):
+ resp = Request.blank('/auth/v2/.invalid/usr',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+
+ def test_delete_user_invalid_user(self):
+ resp = Request.blank('/auth/v2/act/.invalid',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+
+ def test_delete_user_not_found(self):
+ self.test_auth.app = FakeApp(iter([
+ # HEAD of user object
+ ('404 Not Found', {}, '')]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 404)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_delete_user_fail_head_user(self):
+ self.test_auth.app = FakeApp(iter([
+ # HEAD of user object
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_delete_user_fail_delete_token(self):
+ self.test_auth.app = FakeApp(iter([
+ # HEAD of user object
+ ('200 Ok',
+ {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
+ # DELETE of token
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_delete_user_fail_delete_user(self):
+ self.test_auth.app = FakeApp(iter([
+ # HEAD of user object
+ ('200 Ok',
+ {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
+ # DELETE of token
+ ('204 No Content', {}, ''),
+ # DELETE of user object
+ ('503 Service Unavailable', {}, '')]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+ self.assertEquals(self.test_auth.app.calls, 3)
+
+ def test_delete_user_success(self):
+ self.test_auth.app = FakeApp(iter([
+ # HEAD of user object
+ ('200 Ok',
+ {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
+ # DELETE of token
+ ('204 No Content', {}, ''),
+ # DELETE of user object
+ ('204 No Content', {}, '')]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(self.test_auth.app.calls, 3)
+
+ def test_delete_user_success_missing_user_at_end(self):
+ self.test_auth.app = FakeApp(iter([
+ # HEAD of user object
+ ('200 Ok',
+ {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
+ # DELETE of token
+ ('204 No Content', {}, ''),
+ # DELETE of user object
+ ('404 Not Found', {}, '')]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(self.test_auth.app.calls, 3)
+
+ def test_delete_user_success_missing_token(self):
+ self.test_auth.app = FakeApp(iter([
+ # HEAD of user object
+ ('200 Ok',
+ {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
+ # DELETE of token
+ ('404 Not Found', {}, ''),
+ # DELETE of user object
+ ('204 No Content', {}, '')]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(self.test_auth.app.calls, 3)
+
+ def test_delete_user_success_no_token(self):
+ self.test_auth.app = FakeApp(iter([
+ # HEAD of user object
+ ('200 Ok', {}, ''),
+ # DELETE of user object
+ ('204 No Content', {}, '')]))
+ resp = Request.blank('/auth/v2/act/usr',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_validate_token_bad_prefix(self):
+ resp = Request.blank('/auth/v2/.token/BAD_token'
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+
+ def test_validate_token_tmi(self):
+ resp = Request.blank(
+ '/auth/v2/.token/AUTH_token/tmi').get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+
+ def test_validate_token_bad_memcache(self):
+ fake_memcache = FakeMemcache()
+ fake_memcache.set('AUTH_/auth/AUTH_token', 'bogus')
+ resp = Request.blank(
+ '/auth/v2/.token/AUTH_token',
+ environ={'swift.cache': fake_memcache}).get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 500)
+
+ def test_validate_token_from_memcache(self):
+ fake_memcache = FakeMemcache()
+ fake_memcache.set(
+ 'AUTH_/auth/AUTH_token',
+ (time() + 1,
+ 'act:usr,act'))
+ resp = Request.blank(
+ '/auth/v2/.token/AUTH_token',
+ environ={'swift.cache': fake_memcache}).get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(
+ resp.headers.get('x-auth-groups'),
+ 'act:usr,act')
+ self.assert_(float(resp.headers['x-auth-ttl']) < 1,
+ resp.headers['x-auth-ttl'])
+
+ def test_validate_token_from_memcache_expired(self):
+ fake_memcache = FakeMemcache()
+ fake_memcache.set(
+ 'AUTH_/auth/AUTH_token',
+ (time() - 1,
+ 'act:usr,act'))
+ resp = Request.blank(
+ '/auth/v2/.token/AUTH_token',
+ environ={'swift.cache': fake_memcache}).get_response(
+ self.test_auth)
+ self.assertEquals(resp.status_int, 404)
+ self.assert_('x-auth-groups' not in resp.headers)
+ self.assert_('x-auth-ttl' not in resp.headers)
+
+ def test_validate_token_from_object(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of token object
+ ('200 Ok', {}, json.dumps({'groups': [{'name': 'act:usr'},
+ {'name': 'act'}], 'expires': time() + 1}))]))
+ resp = Request.blank('/auth/v2/.token/AUTH_token'
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(self.test_auth.app.calls, 1)
+ self.assertEquals(
+ resp.headers.get('x-auth-groups'),
+ 'act:usr,act')
+ self.assert_(float(resp.headers['x-auth-ttl']) < 1,
+ resp.headers['x-auth-ttl'])
+
+ def test_validate_token_from_object_expired(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of token object
+ ('200 Ok', {}, json.dumps({'groups': 'act:usr,act',
+ 'expires': time() - 1})),
+ # DELETE of expired token object
+ ('204 No Content', {}, '')]))
+ resp = Request.blank('/auth/v2/.token/AUTH_token'
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 404)
+ self.assertEquals(self.test_auth.app.calls, 2)
+
+ def test_validate_token_from_object_with_admin(self):
+ self.test_auth.app = FakeApp(iter([
+ # GET of token object
+ ('200 Ok', {}, json.dumps({'account_id': 'AUTH_cfa', 'groups':
+ [{'name': 'act:usr'},
+ {'name': 'act'},
+ {'name': '.admin'}],
+ 'expires': time() + 1}))]))
+ resp = Request.blank('/auth/v2/.token/AUTH_token'
+ ).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(self.test_auth.app.calls, 1)
+ self.assertEquals(resp.headers.get('x-auth-groups'),
+ 'act:usr,act,AUTH_cfa')
+ self.assert_(float(resp.headers['x-auth-ttl']) < 1,
+ resp.headers['x-auth-ttl'])
+
+ def test_get_conn_default(self):
+ conn = self.test_auth.get_conn()
+ self.assertEquals(
+ conn.__class__,
+ auth.HTTPConnection)
+ self.assertEquals(conn.host, '127.0.0.1')
+ self.assertEquals(conn.port, 8080)
+
+ def test_get_conn_default_https(self):
+ local_auth = auth.filter_factory(
+ {'super_admin_key': 'supertest',
+ 'default_swift_cluster': 'local#https://1.2.3.4/v1'})(FakeApp())
+ conn = local_auth.get_conn()
+ self.assertEquals(
+ conn.__class__,
+ auth.HTTPSConnection)
+ self.assertEquals(conn.host, '1.2.3.4')
+ self.assertEquals(conn.port, 443)
+
+ def test_get_conn_overridden(self):
+ local_auth = auth.filter_factory(
+ {'super_admin_key': 'supertest',
+ 'default_swift_cluster': 'local#https://1.2.3.4/v1'})(FakeApp())
+ conn = \
+ local_auth.get_conn(
+ urlparsed=auth.urlparse('http://5.6.7.8/v1'))
+ self.assertEquals(
+ conn.__class__,
+ auth.HTTPConnection)
+ self.assertEquals(conn.host, '5.6.7.8')
+ self.assertEquals(conn.port, 80)
+
+ def test_get_conn_overridden_https(self):
+ local_auth = auth.filter_factory(
+ {'super_admin_key': 'supertest',
+ 'default_swift_cluster': 'local#http://1.2.3.4/v1'})(FakeApp())
+ conn = \
+ local_auth.get_conn(
+ urlparsed=auth.urlparse(
+ 'https://5.6.7.8/v1'))
+ self.assertEquals(
+ conn.__class__,
+ auth.HTTPSConnection)
+ self.assertEquals(conn.host, '5.6.7.8')
+ self.assertEquals(conn.port, 443)
+
+ def test_get_itoken_fail_no_memcache(self):
+ exc = None
+ try:
+ self.test_auth.get_itoken({})
+ except Exception as err:
+ exc = err
+ self.assertEquals(str(exc),
+ 'No memcache set up; required for Swauth middleware')
+
+ def test_get_itoken_success(self):
+ fmc = FakeMemcache()
+ itk = self.test_auth.get_itoken(
+ {'swift.cache': fmc})
+ self.assert_(itk.startswith('AUTH_itk'), itk)
+ expires, groups = fmc.get('AUTH_/auth/%s' % itk)
+ self.assert_(expires > time(), expires)
+ self.assertEquals(
+ groups,
+ '.auth,.reseller_admin,AUTH_.auth')
+
+ def test_get_admin_detail_fail_no_colon(self):
+ self.test_auth.app = FakeApp(iter([]))
+ self.assertEquals(
+ self.test_auth.get_admin_detail(
+ Request.blank('/')),
+ None)
+ self.assertEquals(
+ self.test_auth.get_admin_detail(
+ Request.blank('/',
+ headers={'X-Auth-Admin-User': 'usr'})), None)
+ self.assertRaises(
+ StopIteration, self.test_auth.get_admin_detail,
+ Request.blank('/', headers={'X-Auth-Admin-User': 'act:usr'}))
+
+ def test_get_admin_detail_fail_user_not_found(self):
+ self.test_auth.app = FakeApp(
+ iter([('404 Not Found', {}, '')]))
+ self.assertEquals(
+ self.test_auth.get_admin_detail(
+ Request.blank('/',
+ headers={'X-Auth-Admin-User': 'act:usr'})), None)
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_get_admin_detail_fail_get_user_error(self):
+ self.test_auth.app = FakeApp(iter([
+ ('503 Service Unavailable', {}, '')]))
+ exc = None
+ try:
+ self.test_auth.get_admin_detail(
+ Request.blank('/',
+ headers={'X-Auth-Admin-User': 'act:usr'}))
+ except Exception as err:
+ exc = err
+ self.assertEquals(str(exc), 'Could not get admin user object: '
+ '/v1/AUTH_.auth/act/usr 503 Service Unavailable')
+ self.assertEquals(self.test_auth.app.calls, 1)
+
+ def test_get_admin_detail_success(self):
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok', {},
+ json.dumps({"auth": "plaintext:key",
+ "groups": [{'name': "act:usr"}, {'name': "act"},
+ {'name': ".admin"}]}))]))
+ detail = self.test_auth.get_admin_detail(
+ Request.blank('/',
+ headers={'X-Auth-Admin-User': 'act:usr'}))
+ self.assertEquals(self.test_auth.app.calls, 1)
+ self.assertEquals(
+ detail, {'account': 'act',
+ 'auth': 'plaintext:key',
+ 'groups': [{'name': 'act:usr'}, {'name': 'act'},
+ {'name': '.admin'}]})
+
+ def test_credentials_match_success(self):
+ self.assert_(self.test_auth.credentials_match(
+ {'auth': 'plaintext:key'}, 'key'))
+
+ def test_credentials_match_fail_no_details(self):
+ self.assert_(
+ not self.test_auth.credentials_match(None, 'notkey'))
+
+ def test_credentials_match_fail_plaintext(self):
+ self.assert_(not self.test_auth.credentials_match(
+ {'auth': 'plaintext:key'}, 'notkey'))
+
+ def test_is_super_admin_success(self):
+ self.assert_(
+ self.test_auth.is_super_admin(
+ Request.blank(
+ '/',
+ headers={'X-Auth-Admin-User': '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'})))
+
+ def test_is_super_admin_fail_bad_key(self):
+ self.assert_(
+ not self.test_auth.is_super_admin(
+ Request.blank('/',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'bad'})))
+ self.assert_(
+ not self.test_auth.is_super_admin(
+ Request.blank('/',
+ headers={'X-Auth-Admin-User': '.super_admin'})))
+ self.assert_(
+ not self.test_auth.is_super_admin(Request.blank('/')))
+
+ def test_is_super_admin_fail_bad_user(self):
+ self.assert_(
+ not self.test_auth.is_super_admin(
+ Request.blank('/',
+ headers={
+ 'X-Auth-Admin-User':
+ 'bad',
+ 'X-Auth-Admin-Key': 'supertest'})))
+ self.assert_(
+ not self.test_auth.is_super_admin(
+ Request.blank('/',
+ headers={'X-Auth-Admin-Key': 'supertest'})))
+ self.assert_(
+ not self.test_auth.is_super_admin(Request.blank('/')))
+
+ def test_is_reseller_admin_success_is_super_admin(self):
+ self.assert_(
+ self.test_auth.is_reseller_admin(
+ Request.blank('/',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'})))
+
+ def test_is_reseller_admin_success_called_get_admin_detail(
+ self):
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok', {},
+ json.dumps({'auth': 'plaintext:key',
+ 'groups': [{'name': 'act:rdm'}, {'name': 'act'},
+ {'name': '.admin'},
+ {'name': '.reseller_admin'}]}))]))
+ self.assert_(
+ self.test_auth.is_reseller_admin(
+ Request.blank('/',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:rdm',
+ 'X-Auth-Admin-Key': 'key'})))
+
+ def test_is_reseller_admin_fail_only_account_admin(
+ self):
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok', {},
+ json.dumps({'auth': 'plaintext:key',
+ 'groups': [{'name': 'act:adm'}, {'name': 'act'},
+ {'name': '.admin'}]}))]))
+ self.assert_(
+ not self.test_auth.is_reseller_admin(
+ Request.blank('/',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:adm',
+ 'X-Auth-Admin-Key': 'key'})))
+
+ def test_is_reseller_admin_fail_regular_user(self):
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok', {},
+ json.dumps({'auth': 'plaintext:key',
+ 'groups': [{'name': 'act:usr'}, {'name': 'act'}]}))]))
+ self.assert_(
+ not self.test_auth.is_reseller_admin(
+ Request.blank('/',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:usr',
+ 'X-Auth-Admin-Key': 'key'})))
+
+ def test_is_reseller_admin_fail_bad_key(self):
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok', {},
+ json.dumps({'auth': 'plaintext:key',
+ 'groups': [{'name': 'act:rdm'}, {'name': 'act'},
+ {'name': '.admin'},
+ {'name': '.reseller_admin'}]}))]))
+ self.assert_(
+ not self.test_auth.is_reseller_admin(
+ Request.blank('/',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:rdm',
+ 'X-Auth-Admin-Key': 'bad'})))
+
+ def test_is_account_admin_success_is_super_admin(self):
+ self.assert_(
+ self.test_auth.is_account_admin(
+ Request.blank('/',
+ headers={
+ 'X-Auth-Admin-User':
+ '.super_admin',
+ 'X-Auth-Admin-Key': 'supertest'}), 'act'))
+
+ def test_is_account_admin_success_is_reseller_admin(
+ self):
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok', {},
+ json.dumps({'auth': 'plaintext:key',
+ 'groups': [{'name': 'act:rdm'}, {'name': 'act'},
+ {'name': '.admin'},
+ {'name': '.reseller_admin'}]}))]))
+ self.assert_(
+ self.test_auth.is_account_admin(
+ Request.blank('/',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:rdm',
+ 'X-Auth-Admin-Key': 'key'}), 'act'))
+
+ def test_is_account_admin_success(self):
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok', {},
+ json.dumps({'auth': 'plaintext:key',
+ 'groups': [{'name': 'act:adm'}, {'name': 'act'},
+ {'name': '.admin'}]}))]))
+ self.assert_(
+ self.test_auth.is_account_admin(
+ Request.blank('/',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:adm',
+ 'X-Auth-Admin-Key': 'key'}), 'act'))
+
+ def test_is_account_admin_fail_account_admin_different_account(
+ self):
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok', {},
+ json.dumps({'auth': 'plaintext:key',
+ 'groups': [{'name': 'act2:adm'}, {'name': 'act2'},
+ {'name': '.admin'}]}))]))
+ self.assert_(
+ not self.test_auth.is_account_admin(
+ Request.blank('/',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act2:adm',
+ 'X-Auth-Admin-Key': 'key'}), 'act'))
+
+ def test_is_account_admin_fail_regular_user(self):
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok', {},
+ json.dumps({'auth': 'plaintext:key',
+ 'groups': [{'name': 'act:usr'}, {'name': 'act'}]}))]))
+ self.assert_(
+ not self.test_auth.is_account_admin(
+ Request.blank('/',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:usr',
+ 'X-Auth-Admin-Key': 'key'}), 'act'))
+
+ def test_is_account_admin_fail_bad_key(self):
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok', {},
+ json.dumps({'auth': 'plaintext:key',
+ 'groups': [{'name': 'act:rdm'}, {'name': 'act'},
+ {'name': '.admin'},
+ {'name': '.reseller_admin'}]}))]))
+ self.assert_(
+ not self.test_auth.is_account_admin(
+ Request.blank('/',
+ headers={
+ 'X-Auth-Admin-User':
+ 'act:rdm',
+ 'X-Auth-Admin-Key': 'bad'}), 'act'))
+
+ def test_reseller_admin_but_account_is_internal_use_only(
+ self):
+ req = Request.blank('/v1/AUTH_.auth',
+ environ={'REQUEST_METHOD': 'GET'})
+ req.remote_user = 'act:usr,act,.reseller_admin'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+
+ def test_reseller_admin_but_account_is_exactly_reseller_prefix(
+ self):
+ req = Request.blank(
+ '/v1/AUTH_',
+ environ={'REQUEST_METHOD': 'GET'})
+ req.remote_user = 'act:usr,act,.reseller_admin'
+ resp = self.test_auth.authorize(req)
+ self.assertEquals(resp.status_int, 403)
+
+ def _get_token_success_v1_0_encoded(
+ self, saved_user, saved_key, sent_user,
+ sent_key):
+ self.test_auth.app = FakeApp(iter([
+ # GET of user object
+ ('200 Ok', {},
+ json.dumps({"auth": "plaintext:%s" % saved_key,
+ "groups": [{'name': saved_user}, {'name': "act"},
+ {'name': ".admin"}]})),
+ # GET of account
+ ('204 Ok',
+ {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
+ # PUT of new token
+ ('201 Created', {}, ''),
+ # POST of token to user object
+ ('204 No Content', {}, ''),
+ # GET of services object
+ ('200 Ok', {}, json.dumps({"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
+ resp = Request.blank(
+ '/auth/v1.0',
+ headers={'X-Auth-User': sent_user,
+ 'X-Auth-Key': sent_key}).get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 200)
+ self.assert_(
+ resp.headers.get('x-auth-token',
+ '').startswith('AUTH_tk'),
+ resp.headers.get('x-auth-token'))
+ self.assertEquals(resp.headers.get('x-auth-token'),
+ resp.headers.get('x-storage-token'))
+ self.assertEquals(resp.headers.get('x-storage-url'),
+ 'http://127.0.0.1:8080/v1/AUTH_cfa')
+ self.assertEquals(
+ json.loads(resp.body),
+ {"storage": {"default": "local",
+ "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
+ self.assertEquals(self.test_auth.app.calls, 5)
+
+ def test_get_token_success_v1_0_encoded1(self):
+ self._get_token_success_v1_0_encoded(
+ 'act:usr', 'key', 'act%3ausr', 'key')
+
+ def test_get_token_success_v1_0_encoded2(self):
+ self._get_token_success_v1_0_encoded(
+ 'act:u s r', 'key', 'act%3au%20s%20r', 'key')
+
+ def test_get_token_success_v1_0_encoded3(self):
+ self._get_token_success_v1_0_encoded(
+ 'act:u s r', 'k:e:y', 'act%3au%20s%20r', 'k%3Ae%3ay')
+
+ def test_allowed_sync_hosts(self):
+ a = auth.filter_factory(
+ {'super_admin_key': 'supertest'})(FakeApp())
+ self.assertEquals(
+ a.allowed_sync_hosts,
+ ['127.0.0.1'])
+ a = auth.filter_factory({
+ 'super_admin_key': 'supertest',
+ 'allowed_sync_hosts':
+ '1.1.1.1,2.1.1.1, 3.1.1.1 , 4.1.1.1,, , 5.1.1.1'})(FakeApp())
+ self.assertEquals(
+ a.allowed_sync_hosts,
+ ['1.1.1.1', '2.1.1.1', '3.1.1.1', '4.1.1.1', '5.1.1.1'])
+
+ def test_reseller_admin_is_owner(self):
+ orig_authorize = self.test_auth.authorize
+ owner_values = []
+
+ def mitm_authorize(req):
+ rv = orig_authorize(req)
+ owner_values.append(
+ req.environ.get('swift_owner', False))
+ return rv
+
+ self.test_auth.authorize = mitm_authorize
+
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok', {},
+ json.dumps(
+ {'account': 'other', 'user': 'other:usr',
+ 'account_id': 'AUTH_other',
+ 'groups': [{'name': 'other:usr'}, {'name': 'other'},
+ {'name': '.reseller_admin'}],
+ 'expires': time() + 60})),
+ ('204 No Content', {}, '')]))
+ req = Request.blank(
+ '/v1/AUTH_cfa',
+ headers={'X-Auth-Token': 'AUTH_t'})
+ resp = req.get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(owner_values, [True])
+
+ def test_admin_is_owner(self):
+ orig_authorize = self.test_auth.authorize
+ owner_values = []
+
+ def mitm_authorize(req):
+ rv = orig_authorize(req)
+ owner_values.append(
+ req.environ.get('swift_owner', False))
+ return rv
+
+ self.test_auth.authorize = mitm_authorize
+
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok', {},
+ json.dumps(
+ {'account': 'act', 'user': 'act:usr',
+ 'account_id': 'AUTH_cfa',
+ 'groups': [{'name': 'act:usr'}, {'name': 'act'},
+ {'name': '.admin'}],
+ 'expires': time() + 60})),
+ ('204 No Content', {}, '')]))
+ req = Request.blank(
+ '/v1/AUTH_cfa',
+ headers={'X-Auth-Token': 'AUTH_t'})
+ resp = req.get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(owner_values, [True])
+
+ def test_regular_is_not_owner(self):
+ orig_authorize = self.test_auth.authorize
+ owner_values = []
+
+ def mitm_authorize(req):
+ rv = orig_authorize(req)
+ owner_values.append(
+ req.environ.get('swift_owner', False))
+ return rv
+
+ self.test_auth.authorize = mitm_authorize
+
+ self.test_auth.app = FakeApp(iter([
+ ('200 Ok', {},
+ json.dumps(
+ {'account': 'act', 'user': 'act:usr',
+ 'account_id': 'AUTH_cfa',
+ 'groups':
+ [{'name': 'act:usr'}, {
+ 'name': 'act'}],
+ 'expires': time() + 60})),
+ ('204 No Content', {}, '')]), acl='act:usr')
+ req = Request.blank('/v1/AUTH_cfa/c',
+ headers={'X-Auth-Token': 'AUTH_t'})
+ resp = req.get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+ self.assertEquals(owner_values, [False])
+
+ def test_sync_request_success(self):
+ self.test_auth.app = FakeApp(
+ iter([('204 No Content', {}, '')]),
+ sync_key='secret')
+ req = Request.blank('/v1/AUTH_cfa/c/o',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'x-container-sync-key':
+ 'secret',
+ 'x-timestamp': '123.456'})
+ req.remote_addr = '127.0.0.1'
+ resp = req.get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+
+ def test_sync_request_fail_key(self):
+ self.test_auth.app = FakeApp(
+ iter([('204 No Content', {}, '')]),
+ sync_key='secret')
+ req = Request.blank('/v1/AUTH_cfa/c/o',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'x-container-sync-key':
+ 'wrongsecret',
+ 'x-timestamp': '123.456'})
+ req.remote_addr = '127.0.0.1'
+ resp = req.get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+
+ self.test_auth.app = FakeApp(
+ iter([('204 No Content', {}, '')]),
+ sync_key='othersecret')
+ req = Request.blank('/v1/AUTH_cfa/c/o',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'x-container-sync-key':
+ 'secret',
+ 'x-timestamp': '123.456'})
+ req.remote_addr = '127.0.0.1'
+ resp = req.get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+
+ self.test_auth.app = FakeApp(
+ iter([('204 No Content', {}, '')]),
+ sync_key=None)
+ req = Request.blank('/v1/AUTH_cfa/c/o',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'x-container-sync-key':
+ 'secret',
+ 'x-timestamp': '123.456'})
+ req.remote_addr = '127.0.0.1'
+ resp = req.get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+
+ def test_sync_request_fail_no_timestamp(self):
+ self.test_auth.app = FakeApp(
+ iter([('204 No Content', {}, '')]),
+ sync_key='secret')
+ req = Request.blank('/v1/AUTH_cfa/c/o',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={'x-container-sync-key': 'secret'})
+ req.remote_addr = '127.0.0.1'
+ resp = req.get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+
+ def test_sync_request_fail_sync_host(self):
+ self.test_auth.app = FakeApp(
+ iter([('204 No Content', {}, '')]),
+ sync_key='secret')
+ req = Request.blank('/v1/AUTH_cfa/c/o',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'x-container-sync-key':
+ 'secret',
+ 'x-timestamp': '123.456'})
+ req.remote_addr = '127.0.0.2'
+ resp = req.get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+
+ def test_sync_request_success_lb_sync_host(self):
+ self.test_auth.app = FakeApp(
+ iter([('204 No Content', {}, '')]),
+ sync_key='secret')
+ req = Request.blank('/v1/AUTH_cfa/c/o',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'x-container-sync-key':
+ 'secret',
+ 'x-timestamp':
+ '123.456',
+ 'x-forwarded-for': '127.0.0.1'})
+ req.remote_addr = '127.0.0.2'
+ resp = req.get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+
+ self.test_auth.app = FakeApp(
+ iter([('204 No Content', {}, '')]),
+ sync_key='secret')
+ req = Request.blank('/v1/AUTH_cfa/c/o',
+ environ={
+ 'REQUEST_METHOD': 'DELETE'},
+ headers={
+ 'x-container-sync-key':
+ 'secret',
+ 'x-timestamp':
+ '123.456',
+ 'x-cluster-client-ip': '127.0.0.1'})
+ req.remote_addr = '127.0.0.2'
+ resp = req.get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 204)
+
+ def _make_request(self, path, **kwargs):
+ req = Request.blank(path, **kwargs)
+ req.environ['swift.cache'] = FakeMemcache()
+ return req
+
+ def test_override_asked_for_but_not_allowed(self):
+ self.test_auth = \
+ auth.filter_factory(
+ {'allow_overrides': 'false'})(FakeApp())
+ req = self._make_request('/v1/AUTH_account',
+ environ={'swift.authorize_override': True})
+ resp = req.get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+ self.assertEquals(resp.environ['swift.authorize'],
+ self.test_auth.authorize)
+
+ def test_override_asked_for_and_allowed(self):
+ self.test_auth = \
+ auth.filter_factory(
+ {'allow_overrides': 'true'})(FakeApp())
+ req = self._make_request('/v1/AUTH_account',
+ environ={'swift.authorize_override': True})
+ resp = req.get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 404)
+ self.assertTrue(
+ 'swift.authorize' not in resp.environ)
+
+ def test_override_default_allowed(self):
+ req = self._make_request('/v1/AUTH_account',
+ environ={'swift.authorize_override': True})
+ resp = req.get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 404)
+ self.assertTrue(
+ 'swift.authorize' not in resp.environ)
+
+ def test_token_too_long(self):
+ req = self._make_request('/v1/AUTH_account', headers={
+ 'x-auth-token': 'a' * MAX_TOKEN_LENGTH})
+ resp = req.get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+ self.assertNotEquals(
+ resp.body,
+ 'Token exceeds maximum length.')
+ req = self._make_request('/v1/AUTH_account', headers={
+ 'x-auth-token': 'a' * (MAX_TOKEN_LENGTH + 1)})
+ resp = req.get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 400)
+ self.assertEquals(
+ resp.body,
+ 'Token exceeds maximum length.')
+
+ def test_crazy_authorization(self):
+ req = self._make_request('/v1/AUTH_account', headers={
+ 'authorization': 'somebody elses header value'})
+ resp = req.get_response(self.test_auth)
+ self.assertEquals(resp.status_int, 401)
+ self.assertEquals(resp.environ['swift.authorize'],
+ self.test_auth.denied_response)
+
+
+if __name__ == '__main__':
+ unittest.main()