summaryrefslogtreecommitdiffstats
path: root/gluster/swift/common/middleware
diff options
context:
space:
mode:
authorThiago da Silva <thiago@redhat.com>2014-04-22 14:15:02 -0400
committerPrashanth Pai <ppai@redhat.com>2016-01-06 07:53:12 -0800
commit2a8f9f0f530327039c32e444b6a27130b12666bd (patch)
treee24e38b5b3c0245a0acafc63fc50bacbf7de718a /gluster/swift/common/middleware
parent4c6ca1db931377b75583f61a7bca262cfc27b0fa (diff)
Update repo
This is a squashed commit imported from this repo: https://github.com/openstack/swiftonfile/tree/icehouse Contains the follwing commits from above mentioned repo: eb50236 Merge "Backport: Fix metadata overall limits bug" into icehouse 79ea52a Backport: Fix metadata overall limits bug bc43f0b Fix inconsistent data being returned on GET ad0bb79 Import HTTPBadRequest from swift's module 74d02e6 Exclude .trashcan dir from container listing b2dbc15 Catch ESTALE in addition to ENOENT 8d60b48 Properly handle read_metadata() exceptions 6762fc6 Fix object server leaking file descriptors 2842e82 Fix API incompatibility in update_metadata() 2beeef6 Merge "Remove swiftkerbauth code" into icehouse 93dbcb5 Update object-expirer.conf with explanations c9d2f09 Merge "Check if /etc/swift exists in ring builder" into icehouse d66c14c Remove swiftkerbauth code 3142ed2 Add object expiration functests 97153d1 Merge "Cleanup functest and undo old patch" into icehouse bc234d0 Remove old travis config file and fix typo 260c8ef Check if /etc/swift exists in ring builder 637dac9 Cleanup functest and undo old patch 051e068 Merge pull request #35 from prashanthpai/backport-1 be104a3 Merge pull request #36 from prashanthpai/backport-2 ff76f42 fix issue with GET on large object (icehouse-backport) 04d0a99 Fix unlink call after successful rename 4c6ca1d updating README file with project name change 10b2680 Merge pull request #18 from thiagol11/icehouse 5bcab8f Updating version on __init__ file 5c2cba2 Merge pull request #15 from thiagol11/update_spec 52b00a8 updating spec file to add dependency on swift icehouse ae7c93b Merge pull request #6 from prashanthpai/rebase 191e55b Revert: allow non-root user to run functests cb7e968 Modify unit tests and func tests d23fd1b Sync with OpenStack Swift v1.13.1 b6d1671 Merge pull request #12 from pushpesh/functionalnosetestremove 962622b Merge pull request #8 from thiagol11/update_readme 4560857 Merge pull request #9 from prashanthpai/spec-expirer be0ae7e Minor update 65000f1 Removing functionalnosetests 8ab1069 Fix object-expirer.conf-gluster RPM build error afee30f added new support filesystem section 527b01f updated README.md to Swift-On-File 9a240c7 Merge pull request #3 from thiagol11/add_jenkins_to_travis 34b5a8b removing blank lines 3568b64 fixing missing fi d8f5b0f adding support to run jenkins triggered by travis 6f4a88c Removing functionalnosetests 8041944 Update README.md c015148 Merge pull request #2 from thiagol11/master 3ddd952 fixing travis file to run correct unit test c582669 adding travis status badge to README 8093096 adding py26 unit testing to travis 37835fd trigger travis build cb6332a adding travis ci testing All tests have been run sucessfully against this. tox -e p2p8,py27,functest Change-Id: I096b611da852d3eb3913844034b443b8272c2ac4 Signed-off-by: Prashanth Pai <ppai@redhat.com> Reviewed-on: http://review.gluster.org/13188
Diffstat (limited to 'gluster/swift/common/middleware')
-rw-r--r--gluster/swift/common/middleware/gswauth/swauth/middleware.py33
-rw-r--r--gluster/swift/common/middleware/gswauth/test_swauth/__init__.py6
-rw-r--r--gluster/swift/common/middleware/gswauth/test_swauth/unit/__init__.py0
-rw-r--r--gluster/swift/common/middleware/gswauth/test_swauth/unit/test_authtypes.py63
-rw-r--r--gluster/swift/common/middleware/gswauth/test_swauth/unit/test_middleware.py4519
-rw-r--r--gluster/swift/common/middleware/swiftkerbauth/__init__.py38
-rw-r--r--gluster/swift/common/middleware/swiftkerbauth/apachekerbauth/etc/httpd/conf.d/swift-auth.conf12
-rwxr-xr-xgluster/swift/common/middleware/swiftkerbauth/apachekerbauth/var/www/cgi-bin/swift-auth70
-rw-r--r--gluster/swift/common/middleware/swiftkerbauth/kerbauth.py463
-rw-r--r--gluster/swift/common/middleware/swiftkerbauth/kerbauth_utils.py137
10 files changed, 17 insertions, 5324 deletions
diff --git a/gluster/swift/common/middleware/gswauth/swauth/middleware.py b/gluster/swift/common/middleware/gswauth/swauth/middleware.py
index 314eedb..cdcc638 100644
--- a/gluster/swift/common/middleware/gswauth/swauth/middleware.py
+++ b/gluster/swift/common/middleware/gswauth/swauth/middleware.py
@@ -241,7 +241,7 @@ class Swauth(object):
version, rest = split_path(env.get('PATH_INFO', ''),
1, 2, True)
except ValueError:
- version, rest = None, None
+ rest = None
if rest and rest.startswith(self.reseller_prefix):
# Handle anonymous access to accounts I'm the definitive
# auth for.
@@ -693,7 +693,7 @@ class Swauth(object):
return HTTPBadRequest(request=req)
try:
new_services = json.loads(req.body)
- except ValueError, err:
+ except ValueError as err:
return HTTPBadRequest(body=str(err))
# Get the current services information
path = quote('/v1/%s/%s/.services' % (self.auth_account, account))
@@ -1405,7 +1405,7 @@ class Swauth(object):
memcache_key,
(self.itoken_expires,
'%s,.reseller_admin,%s' % (self.metadata_volume,
- self.auth_account)),
+ self.auth_account)),
timeout=self.token_life)
return self.itoken
@@ -1589,19 +1589,20 @@ class Swauth(object):
if getattr(req, 'client_disconnect', False) or \
getattr(response, 'client_disconnect', False):
status_int = 499
- self.logger.info(
- ' '.join(quote(str(x)) for x in (client or '-',
- req.remote_addr or '-', strftime('%d/%b/%Y/%H/%M/%S', gmtime()),
- req.method, the_request, req.environ['SERVER_PROTOCOL'],
- status_int, req.referer or '-', req.user_agent or '-',
- req.headers.get(
- 'x-auth-token',
- req.headers.get('x-auth-admin-user', '-')),
- getattr(req, 'bytes_transferred', 0) or '-',
- getattr(response, 'bytes_transferred', 0) or '-',
- req.headers.get('etag', '-'),
- req.headers.get('x-trans-id', '-'), logged_headers or '-',
- trans_time)))
+ self.logger.info(' '.join(quote(str(x)) for x in
+ (client or '-',
+ req.remote_addr or '-', strftime('%d/%b/%Y/%H/%M/%S',
+ gmtime()),
+ req.method, the_request,
+ req.environ['SERVER_PROTOCOL'], status_int,
+ req.referer or '-', req.user_agent or '-',
+ req.headers.get('x-auth-token', req.headers.get(
+ 'x-auth-admin-user', '-')),
+ getattr(req, 'bytes_transferred', 0) or '-',
+ getattr(response, 'bytes_transferred', 0) or '-',
+ req.headers.get('etag', '-'),
+ req.headers.get('x-trans-id', '-'), logged_headers
+ or '-', trans_time)))
def filter_factory(global_conf, **local_conf):
diff --git a/gluster/swift/common/middleware/gswauth/test_swauth/__init__.py b/gluster/swift/common/middleware/gswauth/test_swauth/__init__.py
deleted file mode 100644
index f53bc5a..0000000
--- a/gluster/swift/common/middleware/gswauth/test_swauth/__init__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-# See http://code.google.com/p/python-nose/issues/detail?id=373
-# The code below enables nosetests to work with i18n _() blocks
-
-import __builtin__
-
-setattr(__builtin__, '_', lambda x: x)
diff --git a/gluster/swift/common/middleware/gswauth/test_swauth/unit/__init__.py b/gluster/swift/common/middleware/gswauth/test_swauth/unit/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/gluster/swift/common/middleware/gswauth/test_swauth/unit/__init__.py
+++ /dev/null
diff --git a/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_authtypes.py b/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_authtypes.py
deleted file mode 100644
index d9b7b55..0000000
--- a/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_authtypes.py
+++ /dev/null
@@ -1,63 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Pablo Llopis 2011
-
-import unittest
-from swauth import authtypes
-
-
-class TestPlaintext(unittest.TestCase):
-
- def setUp(self):
- self.auth_encoder = authtypes.Plaintext()
-
- def test_plaintext_encode(self):
- enc_key = self.auth_encoder.encode('keystring')
- self.assertEquals('plaintext:keystring', enc_key)
-
- def test_plaintext_valid_match(self):
- creds = 'plaintext:keystring'
- match = self.auth_encoder.match('keystring', creds)
- self.assertEquals(match, True)
-
- def test_plaintext_invalid_match(self):
- creds = 'plaintext:other-keystring'
- match = self.auth_encoder.match('keystring', creds)
- self.assertEquals(match, False)
-
-
-class TestSha1(unittest.TestCase):
-
- def setUp(self):
- self.auth_encoder = authtypes.Sha1()
- self.auth_encoder.salt = 'salt'
-
- def test_sha1_encode(self):
- enc_key = self.auth_encoder.encode('keystring')
- self.assertEquals('sha1:salt$d50dc700c296e23ce5b41f7431a0e01f69010f06',
- enc_key)
-
- def test_sha1_valid_match(self):
- creds = 'sha1:salt$d50dc700c296e23ce5b41f7431a0e01f69010f06'
- match = self.auth_encoder.match('keystring', creds)
- self.assertEquals(match, True)
-
- def test_sha1_invalid_match(self):
- creds = 'sha1:salt$deadbabedeadbabedeadbabec0ffeebadc0ffeee'
- match = self.auth_encoder.match('keystring', creds)
- self.assertEquals(match, False)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_middleware.py b/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_middleware.py
deleted file mode 100644
index 62259ff..0000000
--- a/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_middleware.py
+++ /dev/null
@@ -1,4519 +0,0 @@
-# Copyright (c) 2010-2011 OpenStack, LLC.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-try:
- import simplejson as json
-except ImportError:
- import json
-import unittest
-from contextlib import contextmanager
-from time import time
-
-from swift.common.swob import Request, Response
-
-from swauth import middleware as auth
-from swauth.authtypes import MAX_TOKEN_LENGTH
-
-
-DEFAULT_TOKEN_LIFE = 86400
-MAX_TOKEN_LIFE = 100000
-
-
-class FakeMemcache(object):
-
- def __init__(self):
- self.store = {}
-
- def get(self, key):
- return self.store.get(key)
-
- def set(self, key, value, timeout=0, time=0):
- self.store[key] = value
- return True
-
- def incr(self, key, timeout=0, time=0):
- self.store[key] = self.store.setdefault(key, 0) + 1
- return self.store[key]
-
- @contextmanager
- def soft_lock(self, key, timeout=0, retries=5, time=0):
- yield True
-
- def delete(self, key):
- try:
- del self.store[key]
- except Exception:
- pass
- return True
-
-
-class FakeApp(object):
-
- def __init__(
- self, status_headers_body_iter=None, acl=None, sync_key=None):
- self.calls = 0
- self.status_headers_body_iter = status_headers_body_iter
- if not self.status_headers_body_iter:
- self.status_headers_body_iter = iter(
- [('404 Not Found', {}, '')])
- self.acl = acl
- self.sync_key = sync_key
-
- def __call__(self, env, start_response):
- self.calls += 1
- self.request = Request.blank('', environ=env)
- if self.acl:
- self.request.acl = self.acl
- if self.sync_key:
- self.request.environ[
- 'swift_sync_key'] = self.sync_key
- if 'swift.authorize' in env:
- resp = env['swift.authorize'](self.request)
- if resp:
- return resp(env, start_response)
- status, headers, body = self.status_headers_body_iter.next(
- )
- return Response(status=status, headers=headers,
- body=body)(env, start_response)
-
-
-class FakeConn(object):
-
- def __init__(self, status_headers_body_iter=None):
- self.calls = 0
- self.status_headers_body_iter = status_headers_body_iter
- if not self.status_headers_body_iter:
- self.status_headers_body_iter = iter(
- [('404 Not Found', {}, '')])
-
- def request(self, method, path, headers):
- self.calls += 1
- self.request_path = path
- self.status, self.headers, self.body = \
- self.status_headers_body_iter.next()
- self.status, self.reason = self.status.split(' ', 1)
- self.status = int(self.status)
-
- def getresponse(self):
- return self
-
- def read(self):
- body = self.body
- self.body = ''
- return body
-
-
-class TestAuth(unittest.TestCase):
-
- def setUp(self):
- self.test_auth = \
- auth.filter_factory({
- 'super_admin_key': 'supertest',
- 'token_life': str(DEFAULT_TOKEN_LIFE),
- 'max_token_life': str(MAX_TOKEN_LIFE)})(FakeApp())
-
- def test_super_admin_key_not_required(self):
- auth.filter_factory({})(FakeApp())
-
- def test_reseller_prefix_init(self):
- app = FakeApp()
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest'})(app)
- self.assertEquals(ath.reseller_prefix, 'AUTH_')
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'reseller_prefix': 'TEST'})(app)
- self.assertEquals(ath.reseller_prefix, 'TEST_')
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'reseller_prefix': 'TEST_'})(app)
- self.assertEquals(ath.reseller_prefix, 'TEST_')
-
- def test_auth_prefix_init(self):
- app = FakeApp()
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest'})(app)
- self.assertEquals(ath.auth_prefix, '/auth/')
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'auth_prefix': ''})(app)
- self.assertEquals(ath.auth_prefix, '/auth/')
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'auth_prefix': '/test/'})(app)
- self.assertEquals(ath.auth_prefix, '/test/')
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'auth_prefix': '/test'})(app)
- self.assertEquals(ath.auth_prefix, '/test/')
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'auth_prefix': 'test/'})(app)
- self.assertEquals(ath.auth_prefix, '/test/')
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'auth_prefix': 'test'})(app)
- self.assertEquals(ath.auth_prefix, '/test/')
-
- def test_no_auth_type_init(self):
- app = FakeApp()
- ath = auth.filter_factory({})(app)
- self.assertEquals(ath.auth_type, 'Plaintext')
-
- def test_valid_auth_type_init(self):
- app = FakeApp()
- ath = auth.filter_factory(
- {'auth_type': 'sha1'})(app)
- self.assertEquals(ath.auth_type, 'Sha1')
- ath = auth.filter_factory(
- {'auth_type': 'plaintext'})(app)
- self.assertEquals(ath.auth_type, 'Plaintext')
-
- def test_invalid_auth_type_init(self):
- app = FakeApp()
- exc = None
- try:
- auth.filter_factory(
- {'auth_type': 'NONEXISTANT'})(app)
- except Exception as err:
- exc = err
- self.assertEquals(str(exc),
- 'Invalid auth_type in config file: %s' %
- 'Nonexistant')
-
- def test_default_swift_cluster_init(self):
- app = FakeApp()
- self.assertRaises(Exception, auth.filter_factory({
- 'super_admin_key': 'supertest',
- 'default_swift_cluster': 'local#badscheme://host/path'}), app)
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest'})(app)
- self.assertEquals(ath.default_swift_cluster,
- 'local#http://127.0.0.1:8080/v1')
- ath = auth.filter_factory({
- 'super_admin_key': 'supertest',
- 'default_swift_cluster': 'local#http://host/path'})(app)
- self.assertEquals(ath.default_swift_cluster,
- 'local#http://host/path')
- ath = auth.filter_factory({
- 'super_admin_key': 'supertest',
- 'default_swift_cluster': 'local#https://host/path/'})(app)
- self.assertEquals(ath.dsc_url, 'https://host/path')
- self.assertEquals(ath.dsc_url2, 'https://host/path')
- ath = auth.filter_factory({
- 'super_admin_key': 'supertest',
- 'default_swift_cluster':
- 'local#https://host/path/#http://host2/path2/'})(app)
- self.assertEquals(ath.dsc_url, 'https://host/path')
- self.assertEquals(
- ath.dsc_url2,
- 'http://host2/path2')
-
- def test_top_level_denied(self):
- resp = Request.blank(
- '/').get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- def test_anon(self):
- resp = Request.blank(
- '/v1/AUTH_account').get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
- self.assertEquals(resp.environ['swift.authorize'],
- self.test_auth.authorize)
-
- def test_auth_deny_non_reseller_prefix(self):
- resp = Request.blank(
- '/v1/BLAH_account',
- headers={'X-Auth-Token': 'BLAH_t'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
- self.assertEquals(resp.environ['swift.authorize'],
- self.test_auth.denied_response)
-
- def test_auth_deny_non_reseller_prefix_no_override(
- self):
- fake_authorize = lambda x: Response(
- status='500 Fake')
- resp = Request.blank(
- '/v1/BLAH_account',
- headers={'X-Auth-Token': 'BLAH_t'},
- environ={'swift.authorize': fake_authorize}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(resp.environ['swift.authorize'], fake_authorize)
-
- def test_auth_no_reseller_prefix_deny(self):
- # Ensures that when we have no reseller prefix, we don't deny a request
- # outright but set up a denial swift.authorize and pass the request on
- # down the chain.
- local_app = FakeApp()
- local_auth = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'reseller_prefix': ''})(local_app)
- resp = Request.blank(
- '/v1/account',
- headers={'X-Auth-Token': 't'}).get_response(local_auth)
- self.assertEquals(resp.status_int, 401)
- # one for checking auth, two for request passed
- # along
- self.assertEquals(local_app.calls, 2)
- self.assertEquals(resp.environ['swift.authorize'],
- local_auth.denied_response)
-
- def test_auth_no_reseller_prefix_allow(self):
- # Ensures that when we have no reseller prefix, we can still allow
- # access if our auth server accepts requests
- local_app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps(
- {'account': 'act', 'user': 'act:usr',
- 'account_id': 'AUTH_cfa',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'},
- {'name': '.admin'}],
- 'expires': time() + 60})),
- ('204 No Content', {}, '')]))
- local_auth = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'reseller_prefix': ''})(local_app)
- resp = Request.blank(
- '/v1/act',
- headers={'X-Auth-Token': 't'}).get_response(local_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(local_app.calls, 2)
- self.assertEquals(resp.environ['swift.authorize'],
- local_auth.authorize)
-
- def test_auth_no_reseller_prefix_no_token(self):
- # Check that normally we set up a call back to our
- # authorize.
- local_auth = \
- auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'reseller_prefix': ''})(FakeApp(iter([])))
- resp = Request.blank(
- '/v1/account').get_response(
- local_auth)
- self.assertEquals(resp.status_int, 401)
- self.assertEquals(
- resp.environ['swift.authorize'], local_auth.authorize)
- # Now make sure we don't override an existing swift.authorize when we
- # have no reseller prefix.
- local_auth = \
- auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'reseller_prefix': ''})(FakeApp())
- local_authorize = lambda req: Response('test')
- resp = Request.blank(
- '/v1/account', environ={'swift.authorize':
- local_authorize}).get_response(local_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(
- resp.environ['swift.authorize'],
- local_authorize)
-
- def test_auth_fail(self):
- resp = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- def test_auth_success(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps(
- {'account': 'act', 'user': 'act:usr',
- 'account_id': 'AUTH_cfa',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'},
- {'name': '.admin'}],
- 'expires': time() + 60})),
- ('204 No Content', {}, '')]))
- resp = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_auth_memcache(self):
- # First run our test without memcache, showing we need to return the
- # token contents twice.
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps(
- {'account': 'act', 'user': 'act:usr',
- 'account_id': 'AUTH_cfa',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'},
- {'name': '.admin'}],
- 'expires': time() + 60})),
- ('204 No Content', {}, ''),
- ('200 Ok', {},
- json.dumps(
- {'account': 'act', 'user': 'act:usr',
- 'account_id': 'AUTH_cfa',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'},
- {'name': '.admin'}],
- 'expires': time() + 60})),
- ('204 No Content', {}, '')]))
- resp = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- resp = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 4)
- # Now run our test with memcache, showing we no longer need to return
- # the token contents twice.
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps(
- {'account': 'act', 'user': 'act:usr',
- 'account_id': 'AUTH_cfa',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'},
- {'name': '.admin'}],
- 'expires': time() + 60})),
- ('204 No Content', {}, ''),
- # Don't need a second token object returned if memcache is
- # used
- ('204 No Content', {}, '')]))
- fake_memcache = FakeMemcache()
- resp = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Auth-Token': 'AUTH_t'},
- environ={'swift.cache': fake_memcache}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 204)
- resp = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Auth-Token': 'AUTH_t'},
- environ={'swift.cache': fake_memcache}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_auth_just_expired(self):
- self.test_auth.app = FakeApp(iter([
- # Request for token (which will have expired)
- ('200 Ok', {},
- json.dumps(
- {'account': 'act', 'user': 'act:usr',
- 'account_id': 'AUTH_cfa',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'},
- {'name': '.admin'}],
- 'expires': time() - 1})),
- # Request to delete token
- ('204 No Content', {}, '')]))
- resp = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_middleware_storage_token(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps(
- {'account': 'act', 'user': 'act:usr',
- 'account_id': 'AUTH_cfa',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'},
- {'name': '.admin'}],
- 'expires': time() + 60})),
- ('204 No Content', {}, '')]))
- resp = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Storage-Token': 'AUTH_t'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_authorize_bad_path(self):
- req = Request.blank('/badpath')
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 401)
- req = Request.blank('/badpath')
- req.remote_user = 'act:usr,act,AUTH_cfa'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- def test_authorize_account_access(self):
- req = Request.blank('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act,AUTH_cfa'
- self.assertEquals(
- self.test_auth.authorize(req),
- None)
- req = Request.blank('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- def test_authorize_acl_group_access(self):
- req = Request.blank('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
- req = Request.blank('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
- req.acl = 'act'
- self.assertEquals(
- self.test_auth.authorize(req),
- None)
- req = Request.blank('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
- req.acl = 'act:usr'
- self.assertEquals(
- self.test_auth.authorize(req),
- None)
- req = Request.blank('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
- req.acl = 'act2'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
- req = Request.blank('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
- req.acl = 'act:usr2'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- def test_deny_cross_reseller(self):
- # Tests that cross-reseller is denied, even if ACLs/group
- # names match
- req = Request.blank('/v1/OTHER_cfa')
- req.remote_user = 'act:usr,act,AUTH_cfa'
- req.acl = 'act'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- def test_authorize_acl_referrer_access(self):
- req = Request.blank('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
- req = Request.blank('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- req.acl = '.r:*,.rlistings'
- self.assertEquals(
- self.test_auth.authorize(req),
- None)
- req = Request.blank('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- req.acl = '.r:*' # No listings allowed
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
- req = Request.blank('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- req.acl = '.r:.example.com,.rlistings'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
- req = Request.blank('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- req.referer = 'http://www.example.com/index.html'
- req.acl = '.r:.example.com,.rlistings'
- self.assertEquals(
- self.test_auth.authorize(req),
- None)
- req = Request.blank('/v1/AUTH_cfa/c')
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 401)
- req = Request.blank('/v1/AUTH_cfa/c')
- req.acl = '.r:*,.rlistings'
- self.assertEquals(
- self.test_auth.authorize(req),
- None)
- req = Request.blank('/v1/AUTH_cfa/c')
- req.acl = '.r:*' # No listings allowed
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 401)
- req = Request.blank('/v1/AUTH_cfa/c')
- req.acl = '.r:.example.com,.rlistings'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 401)
- req = Request.blank('/v1/AUTH_cfa/c')
- req.referer = 'http://www.example.com/index.html'
- req.acl = '.r:.example.com,.rlistings'
- self.assertEquals(
- self.test_auth.authorize(req),
- None)
-
- def test_detect_reseller_request(self):
- req = self._make_request('/v1/AUTH_admin',
- headers={'X-Auth-Token': 'AUTH_t'})
- cache_key = 'AUTH_/auth/AUTH_t'
- cache_entry = (time() + 3600, '.reseller_admin')
- req.environ['swift.cache'].set(
- cache_key, cache_entry)
- self.assertTrue(req.environ.get('reseller_request'))
-
- def test_account_put_permissions(self):
- req = Request.blank(
- '/v1/AUTH_new',
- environ={'REQUEST_METHOD': 'PUT'})
- req.remote_user = 'act:usr,act'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- req = Request.blank(
- '/v1/AUTH_new',
- environ={'REQUEST_METHOD': 'PUT'})
- req.remote_user = 'act:usr,act,AUTH_other'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- # Even PUTs to your own account as account admin
- # should fail
- req = Request.blank(
- '/v1/AUTH_old',
- environ={'REQUEST_METHOD': 'PUT'})
- req.remote_user = 'act:usr,act,AUTH_old'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- req = Request.blank(
- '/v1/AUTH_new',
- environ={'REQUEST_METHOD': 'PUT'})
- req.remote_user = 'act:usr,act,.reseller_admin'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp, None)
-
- # .super_admin is not something the middleware should ever see or care
- # about
- req = Request.blank(
- '/v1/AUTH_new',
- environ={'REQUEST_METHOD': 'PUT'})
- req.remote_user = 'act:usr,act,.super_admin'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- def test_account_delete_permissions(self):
- req = Request.blank('/v1/AUTH_new',
- environ={'REQUEST_METHOD': 'DELETE'})
- req.remote_user = 'act:usr,act'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- req = Request.blank('/v1/AUTH_new',
- environ={'REQUEST_METHOD': 'DELETE'})
- req.remote_user = 'act:usr,act,AUTH_other'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- # Even DELETEs to your own account as account admin should
- # fail
- req = Request.blank('/v1/AUTH_old',
- environ={'REQUEST_METHOD': 'DELETE'})
- req.remote_user = 'act:usr,act,AUTH_old'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- req = Request.blank('/v1/AUTH_new',
- environ={'REQUEST_METHOD': 'DELETE'})
- req.remote_user = 'act:usr,act,.reseller_admin'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp, None)
-
- # .super_admin is not something the middleware should ever see or care
- # about
- req = Request.blank('/v1/AUTH_new',
- environ={'REQUEST_METHOD': 'DELETE'})
- req.remote_user = 'act:usr,act,.super_admin'
- resp = self.test_auth.authorize(req)
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- def test_get_token_fail(self):
- resp = Request.blank(
- '/auth/v1.0').get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 401)
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- def test_get_token_fail_invalid_key(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]}))]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'invalid'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_token_fail_invalid_x_auth_user_format(
- self):
- resp = Request.blank(
- '/auth/v1/act/auth',
- headers={'X-Auth-User': 'usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- def test_get_token_fail_non_matching_account_in_request(
- self):
- resp = Request.blank(
- '/auth/v1/act/auth',
- headers={'X-Auth-User': 'act2:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- def test_get_token_fail_bad_path(self):
- resp = Request.blank(
- '/auth/v1/act/auth/invalid',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_get_token_fail_missing_key(self):
- resp = Request.blank(
- '/auth/v1/act/auth',
- headers={'X-Auth-User': 'act:usr'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- def test_get_token_fail_get_user_details(self):
- self.test_auth.app = FakeApp(iter([
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_token_fail_get_account(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_get_token_fail_put_new_token(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_get_token_fail_post_to_user(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 4)
-
- def test_get_token_fail_get_services(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_get_token_fail_get_existing_token(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of token
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_get_token_success_v1_0(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assert_(resp.headers.get(
- 'x-auth-token',
- '').startswith('AUTH_tk'), resp.headers.get('x-auth-token'))
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_get_token_success_v1_0_with_user_token_life(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key',
- 'X-Auth-Token-Lifetime': 10}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- left = int(resp.headers['x-auth-token-expires'])
- self.assertTrue(left > 0, '%d > 0' % left)
- self.assertTrue(left <= 10, '%d <= 10' % left)
- self.assert_(resp.headers.get(
- 'x-auth-token',
- '').startswith('AUTH_tk'), resp.headers.get('x-auth-token'))
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_get_token_success_v1_0_with_user_token_life_past_max(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- req = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key',
- 'X-Auth-Token-Lifetime': MAX_TOKEN_LIFE * 10})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- left = int(resp.headers['x-auth-token-expires'])
- self.assertTrue(left > DEFAULT_TOKEN_LIFE,
- '%d > %d' % (left, DEFAULT_TOKEN_LIFE))
- self.assertTrue(left <= MAX_TOKEN_LIFE,
- '%d <= %d' % (left, MAX_TOKEN_LIFE))
- self.assert_(resp.headers.get(
- 'x-auth-token',
- '').startswith('AUTH_tk'), resp.headers.get('x-auth-token'))
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_get_token_success_v1_act_auth(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1/act/auth',
- headers={'X-Storage-User': 'usr',
- 'X-Storage-Pass': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assert_(resp.headers.get(
- 'x-auth-token',
- '').startswith('AUTH_tk'), resp.headers.get('x-auth-token'))
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_get_token_success_storage_instead_of_auth(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Storage-User': 'act:usr',
- 'X-Storage-Pass': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assert_(
- resp.headers.get(
- 'x-auth-token',
- '').startswith('AUTH_tk'), resp.headers.get('x-auth-token'))
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_get_token_success_v1_act_auth_auth_instead_of_storage(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1/act/auth',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assert_(resp.headers.get(
- 'x-auth-token',
- '').startswith('AUTH_tk'), resp.headers.get('x-auth-token'))
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_get_token_success_existing_token(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of token
- ('200 Ok', {}, json.dumps(
- {"account": "act", "user": "usr",
- "account_id": "AUTH_cfa",
- "groups": [{'name': "act:usr"},
- {'name': "key"}, {'name': ".admin"}],
- "expires": 9999999999.9999999})),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(
- resp.headers.get('x-auth-token'),
- 'AUTH_tktest')
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_get_token_success_existing_token_but_request_new_one(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # DELETE of expired token
- ('204 No Content', {}, ''),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key',
- 'X-Auth-New-Token': 'true'}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertNotEquals(
- resp.headers.get('x-auth-token'), 'AUTH_tktest')
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 6)
-
- def test_get_token_success_existing_token_expired(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of token
- ('200 Ok', {}, json.dumps(
- {"account": "act", "user": "usr",
- "account_id": "AUTH_cfa",
- "groups": [{'name': "act:usr"},
- {'name': "key"}, {'name': ".admin"}],
- "expires": 0.0})),
- # DELETE of expired token
- ('204 No Content', {}, ''),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertNotEquals(
- resp.headers.get('x-auth-token'),
- 'AUTH_tktest')
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 7)
-
- def test_get_token_success_existing_token_expired_fail_deleting_old(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of token
- ('200 Ok', {}, json.dumps({"account": "act", "user": "usr",
- "account_id": "AUTH_cfa",
- "groups": [{'name': "act:usr"},
- {'name': "key"}, {'name': ".admin"}],
- "expires": 0.0})),
- # DELETE of expired token
- ('503 Service Unavailable', {}, ''),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertNotEquals(
- resp.headers.get('x-auth-token'),
- 'AUTH_tktest')
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 7)
-
- def test_prep_success(self):
- list_to_iter = [
- # PUT of .auth account
- ('201 Created', {}, ''),
- # PUT of .account_id container
- ('201 Created', {}, '')]
- # PUT of .token* containers
- for x in xrange(16):
- list_to_iter.append(('201 Created', {}, ''))
- self.test_auth.app = FakeApp(iter(list_to_iter))
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 18)
-
- def test_prep_bad_method(self):
- resp = Request.blank('/auth/v2/.prep',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'HEAD'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_prep_bad_creds(self):
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- 'super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'upertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User': '.super_admin'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- resp = Request.blank(
- '/auth/v2/.prep',
- environ={'REQUEST_METHOD': 'POST'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
-
- def test_prep_fail_account_create(self):
- self.test_auth.app = FakeApp(iter([
- # PUT of .auth account
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_prep_fail_token_container_create(self):
- self.test_auth.app = FakeApp(iter([
- # PUT of .auth account
- ('201 Created', {}, ''),
- # PUT of .token container
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_prep_fail_account_id_container_create(self):
- self.test_auth.app = FakeApp(iter([
- # PUT of .auth account
- ('201 Created', {}, ''),
- # PUT of .token container
- ('201 Created', {}, ''),
- # PUT of .account_id container
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_get_reseller_success(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .auth account (list containers)
- ('200 Ok', {}, json.dumps([
- {"name": ".token", "count": 0, "bytes": 0},
- {"name": ".account_id",
- "count": 0, "bytes": 0},
- {"name": "act", "count": 0, "bytes": 0}])),
- # GET of .auth account (list containers
- # continuation)
- ('200 Ok', {}, '[]')]))
- resp = Request.blank('/auth/v2',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(json.loads(resp.body),
- {"accounts": [{"name": "act"}]})
- self.assertEquals(self.test_auth.app.calls, 2)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"},
- {"name": ".reseller_admin"}], "auth": "plaintext:key"})),
- # GET of .auth account (list containers)
- ('200 Ok', {}, json.dumps([
- {"name": ".token", "count": 0, "bytes": 0},
- {"name": ".account_id",
- "count": 0, "bytes": 0},
- {"name": "act", "count": 0, "bytes": 0}])),
- # GET of .auth account (list containers
- # continuation)
- ('200 Ok', {}, '[]')]))
- resp = Request.blank('/auth/v2',
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(json.loads(resp.body),
- {"accounts": [{"name": "act"}]})
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_get_reseller_fail_bad_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2',
- headers={
- 'X-Auth-Admin-User':
- 'super:admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but not reseller
- # admin)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2',
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2',
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_reseller_fail_listing(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .auth account (list containers)
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of .auth account (list containers)
- ('200 Ok', {}, json.dumps([
- {"name": ".token", "count": 0, "bytes": 0},
- {"name": ".account_id",
- "count": 0, "bytes": 0},
- {"name": "act", "count": 0, "bytes": 0}])),
- # GET of .auth account (list containers
- # continuation)
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_get_account_success(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('200 Ok', {},
- json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # GET of account container (list objects)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"},
- {"name": "tester", "hash": "etag", "bytes": 104,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.736680"},
- {"name": "tester3", "hash": "etag", "bytes": 86,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:28.135530"}])),
- # GET of account container (list objects
- # continuation)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(
- json.loads(resp.body),
- {'account_id': 'AUTH_cfa',
- 'services': {'storage':
- {'default': 'local',
- 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}},
- 'users': [{'name': 'tester'}, {'name': 'tester3'}]})
- self.assertEquals(self.test_auth.app.calls, 3)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"})),
- # GET of .services object
- ('200 Ok', {},
- json.dumps({"storage":
- {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # GET of account container (list objects)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"},
- {"name": "tester", "hash": "etag", "bytes": 104,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.736680"},
- {"name": "tester3", "hash": "etag", "bytes": 86,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:28.135530"}])),
- # GET of account container (list objects
- # continuation)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(
- json.loads(resp.body),
- {'account_id': 'AUTH_cfa',
- 'services': {'storage':
- {'default': 'local',
- 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}},
- 'users': [{'name': 'tester'}, {'name': 'tester3'}]})
- self.assertEquals(self.test_auth.app.calls, 4)
-
- def test_get_account_fail_bad_account_name(self):
- resp = Request.blank('/auth/v2/.token',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
- resp = Request.blank('/auth/v2/.anything',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_get_account_fail_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- 'super:admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but wrong
- # account)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- 'act2:adm',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_account_fail_get_services(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_account_fail_listing(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # GET of account container (list objects)
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # GET of account container (list objects)
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # GET of account container (list objects)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"},
- {"name": "tester", "hash": "etag", "bytes": 104,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.736680"},
- {"name": "tester3", "hash": "etag", "bytes": 86,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:28.135530"}])),
- # GET of account container (list objects
- # continuation)
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_set_services_new_service(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('200 Ok', {},
- json.dumps({"storage":
- {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # PUT of new .services object
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body=json.dumps(
- {'new_service':
- {'new_endpoint': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(
- json.loads(resp.body),
- {'storage': {'default': 'local',
- 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'},
- 'new_service': {'new_endpoint': 'new_value'}})
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_set_services_new_endpoint(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('200 Ok', {}, json.dumps(
- {"storage":
- {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # PUT of new .services object
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body=json.dumps(
- {'storage': {'new_endpoint': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(
- json.loads(resp.body),
- {'storage': {'default': 'local',
- 'local':
- 'http://127.0.0.1:8080/v1/AUTH_cfa',
- 'new_endpoint': 'new_value'}})
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_set_services_update_endpoint(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # PUT of new .services object
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body=json.dumps(
- {'storage': {'local': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(json.loads(resp.body),
- {'storage': {'default': 'local',
- 'local': 'new_value'}})
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_set_services_fail_bad_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- 'super:admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body=json.dumps(
- {'storage': {'local': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but not reseller
- # admin)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'},
- body=json.dumps(
- {'storage': {'local': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key': 'key'},
- body=json.dumps(
- {'storage': {'local': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_set_services_fail_bad_account_name(self):
- resp = Request.blank('/auth/v2/.act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body=json.dumps(
- {'storage': {'local': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_set_services_fail_bad_json(self):
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body='garbage'
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body=''
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_set_services_fail_get_services(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('503 Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body=json.dumps({
- 'new_service': {'new_endpoint': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body=json.dumps({
- 'new_service': {'new_endpoint': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_set_services_fail_put_services(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # PUT of new .services object
- ('503 Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body=json.dumps(
- {'new_service':
- {'new_endpoint': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_put_account_success(self):
- conn = FakeConn(iter([
- # PUT of storage account itself
- ('201 Created', {}, '')]))
- self.test_auth.get_conn = lambda: conn
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for
- # pre-existence
- ('404 Not Found', {}, ''),
- # PUT of account container
- ('204 No Content', {}, ''),
- # PUT of .account_id mapping object
- ('204 No Content', {}, ''),
- # PUT of .services object
- ('204 No Content', {}, ''),
- # POST to account container updating
- # X-Container-Meta-Account-Id
- ('204 No Content', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 201)
- self.assertEquals(self.test_auth.app.calls, 5)
- self.assertEquals(conn.calls, 1)
-
- def test_put_account_success_preexist_but_not_completed(
- self):
- conn = FakeConn(iter([
- # PUT of storage account itself
- ('201 Created', {}, '')]))
- self.test_auth.get_conn = lambda: conn
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for pre-existence
- # We're going to show it as existing this time, but with no
- # X-Container-Meta-Account-Id, indicating a failed
- # previous attempt
- ('200 Ok', {}, ''),
- # PUT of .account_id mapping object
- ('204 No Content', {}, ''),
- # PUT of .services object
- ('204 No Content', {}, ''),
- # POST to account container updating
- # X-Container-Meta-Account-Id
- ('204 No Content', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'PUT',
- 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 201)
- self.assertEquals(self.test_auth.app.calls, 4)
- self.assertEquals(conn.calls, 1)
-
- def test_put_account_success_preexist_and_completed(
- self):
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for pre-existence
- # We're going to show it as existing this time, and with an
- # X-Container-Meta-Account-Id, indicating it already
- # exists
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 202)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_put_account_success_with_given_suffix(self):
- conn = FakeConn(iter([
- # PUT of storage account itself
- ('201 Created', {}, '')]))
- self.test_auth.get_conn = lambda: conn
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for
- # pre-existence
- ('404 Not Found', {}, ''),
- # PUT of account container
- ('204 No Content', {}, ''),
- # PUT of .account_id mapping object
- ('204 No Content', {}, ''),
- # PUT of .services object
- ('204 No Content', {}, ''),
- # POST to account container updating
- # X-Container-Meta-Account-Id
- ('204 No Content', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest',
- 'X-Account-Suffix': 'test-suffix'}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 201)
- self.assertEquals(
- conn.request_path,
- '/v1/AUTH_test-suffix')
- self.assertEquals(self.test_auth.app.calls, 5)
- self.assertEquals(conn.calls, 1)
-
- def test_put_account_fail_bad_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={'REQUEST_METHOD': 'PUT',
- 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': 'super:admin',
- 'X-Auth-Admin-Key': 'supertest'},).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but not reseller
- # admin)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={'REQUEST_METHOD': 'PUT',
- 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': 'act:adm',
- 'X-Auth-Admin-Key': 'key'},).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': 'act:usr',
- 'X-Auth-Admin-Key': 'key'},).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_put_account_fail_invalid_account_name(self):
- resp = Request.blank(
- '/auth/v2/.act',
- environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_put_account_fail_on_initial_account_head(self):
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for
- # pre-existence
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={'REQUEST_METHOD': 'PUT',
- 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_put_account_fail_on_account_marker_put(self):
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for
- # pre-existence
- ('404 Not Found', {}, ''),
- # PUT of account container
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={'REQUEST_METHOD': 'PUT',
- 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_put_account_fail_on_storage_account_put(self):
- conn = FakeConn(iter([
- # PUT of storage account itself
- ('503 Service Unavailable', {}, '')]))
- self.test_auth.get_conn = lambda: conn
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for
- # pre-existence
- ('404 Not Found', {}, ''),
- # PUT of account container
- ('204 No Content', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={'REQUEST_METHOD': 'PUT',
- 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(conn.calls, 1)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_put_account_fail_on_account_id_mapping(self):
- conn = FakeConn(iter([
- # PUT of storage account itself
- ('201 Created', {}, '')]))
- self.test_auth.get_conn = lambda: conn
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for
- # pre-existence
- ('404 Not Found', {}, ''),
- # PUT of account container
- ('204 No Content', {}, ''),
- # PUT of .account_id mapping object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={'REQUEST_METHOD': 'PUT',
- 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(conn.calls, 1)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_put_account_fail_on_services_object(self):
- conn = FakeConn(iter([
- # PUT of storage account itself
- ('201 Created', {}, '')]))
- self.test_auth.get_conn = lambda: conn
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for
- # pre-existence
- ('404 Not Found', {}, ''),
- # PUT of account container
- ('204 No Content', {}, ''),
- # PUT of .account_id mapping object
- ('204 No Content', {}, ''),
- # PUT of .services object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={'REQUEST_METHOD': 'PUT',
- 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(conn.calls, 1)
- self.assertEquals(self.test_auth.app.calls, 4)
-
- def test_put_account_fail_on_post_mapping(self):
- conn = FakeConn(iter([
- # PUT of storage account itself
- ('201 Created', {}, '')]))
- self.test_auth.get_conn = lambda: conn
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for
- # pre-existence
- ('404 Not Found', {}, ''),
- # PUT of account container
- ('204 No Content', {}, ''),
- # PUT of .account_id mapping object
- ('204 No Content', {}, ''),
- # PUT of .services object
- ('204 No Content', {}, ''),
- # POST to account container updating
- # X-Container-Meta-Account-Id
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(conn.calls, 1)
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_delete_account_success(self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('204 No Content', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # DELETE the .services object
- ('204 No Content', {}, ''),
- # DELETE the .account_id mapping object
- ('204 No Content', {}, ''),
- # DELETE the account container
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 6)
- self.assertEquals(conn.calls, 1)
-
- def test_delete_account_success_missing_services(self):
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('404 Not Found', {}, ''),
- # DELETE the .account_id mapping object
- ('204 No Content', {}, ''),
- # DELETE the account container
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_delete_account_success_missing_storage_account(
- self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('404 Not Found', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # DELETE the .services object
- ('204 No Content', {}, ''),
- # DELETE the .account_id mapping object
- ('204 No Content', {}, ''),
- # DELETE the account container
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 6)
- self.assertEquals(conn.calls, 1)
-
- def test_delete_account_success_missing_account_id_mapping(
- self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('204 No Content', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # DELETE the .services object
- ('204 No Content', {}, ''),
- # DELETE the .account_id mapping object
- ('404 Not Found', {}, ''),
- # DELETE the account container
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 6)
- self.assertEquals(conn.calls, 1)
-
- def test_delete_account_success_missing_account_container_at_end(
- self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('204 No Content', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # DELETE the .services object
- ('204 No Content', {}, ''),
- # DELETE the .account_id mapping object
- ('204 No Content', {}, ''),
- # DELETE the account container
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 6)
- self.assertEquals(conn.calls, 1)
-
- def test_delete_account_fail_bad_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- 'super:admin',
- 'X-Auth-Admin-Key': 'supertest'},
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but not reseller
- # admin)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'},
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key': 'key'},
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_delete_account_fail_invalid_account_name(self):
- resp = Request.blank('/auth/v2/.act',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_delete_account_fail_not_found(self):
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_delete_account_fail_not_found_concurrency(
- self):
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_delete_account_fail_list_account(self):
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_delete_account_fail_list_account_concurrency(
- self):
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_delete_account_fail_has_users(self):
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"},
- {"name": "tester", "hash": "etag", "bytes": 104,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.736680"}]))]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 409)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_delete_account_fail_has_users2(self):
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": "tester", "hash": "etag", "bytes": 104,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.736680"}]))]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 409)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_delete_account_fail_get_services(self):
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_delete_account_fail_delete_storage_account(
- self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('409 Conflict', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 409)
- self.assertEquals(self.test_auth.app.calls, 3)
- self.assertEquals(conn.calls, 1)
-
- def test_delete_account_fail_delete_storage_account2(
- self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('204 No Content', {}, ''),
- # DELETE of storage account itself
- ('409 Conflict', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa",
- "other": "http://127.0.0.1:8080/v1/AUTH_cfa2"}}))]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 3)
- self.assertEquals(conn.calls, 2)
-
- def test_delete_account_fail_delete_storage_account3(
- self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('503 Service Unavailable', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 3)
- self.assertEquals(conn.calls, 1)
-
- def test_delete_account_fail_delete_storage_account4(
- self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('204 No Content', {}, ''),
- # DELETE of storage account itself
- ('503 Service Unavailable', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa",
- "other": "http://127.0.0.1:8080/v1/AUTH_cfa2"}}))]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 3)
- self.assertEquals(conn.calls, 2)
-
- def test_delete_account_fail_delete_services(self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('204 No Content', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # DELETE the .services object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 4)
- self.assertEquals(conn.calls, 1)
-
- def test_delete_account_fail_delete_account_id_mapping(
- self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('204 No Content', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # DELETE the .services object
- ('204 No Content', {}, ''),
- # DELETE the .account_id mapping object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 5)
- self.assertEquals(conn.calls, 1)
-
- def test_delete_account_fail_delete_account_container(
- self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('204 No Content', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # DELETE the .services object
- ('204 No Content', {}, ''),
- # DELETE the .account_id mapping object
- ('204 No Content', {}, ''),
- # DELETE the account container
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 6)
- self.assertEquals(conn.calls, 1)
-
- def test_get_user_success(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(resp.body, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".admin"}],
- "auth": "plaintext:key"}))
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_user_fail_no_super_admin_key(self):
- local_auth = auth.filter_factory({})(FakeApp(iter([
- # GET of user object (but we should never get
- # here)
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".admin"}],
- "auth": "plaintext:key"}))])))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(local_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(local_auth.app.calls, 0)
-
- def test_get_user_groups_success(self):
- self.test_auth.app = FakeApp(iter([
- # GET of account container (list objects)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"},
- {"name": "tester", "hash": "etag", "bytes": 104,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.736680"},
- {"name": "tester3", "hash": "etag", "bytes": 86,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:28.135530"}])),
- # GET of user object
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:tester"}, {"name": "act"},
- {"name": ".admin"}],
- "auth": "plaintext:key"})),
- # GET of user object
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:tester3"}, {"name": "act"}],
- "auth": "plaintext:key3"})),
- # GET of account container (list objects
- # continuation)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')]))
- resp = Request.blank('/auth/v2/act/.groups',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(resp.body, json.dumps(
- {"groups": [{"name": ".admin"}, {"name": "act"},
- {"name": "act:tester"}, {"name": "act:tester3"}]}))
- self.assertEquals(self.test_auth.app.calls, 4)
-
- def test_get_user_groups_success2(self):
- self.test_auth.app = FakeApp(iter([
- # GET of account container (list objects)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"},
- {"name": "tester", "hash": "etag", "bytes": 104,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.736680"}])),
- # GET of user object
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:tester"}, {"name": "act"},
- {"name": ".admin"}],
- "auth": "plaintext:key"})),
- # GET of account container (list objects
- # continuation)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": "tester3", "hash": "etag", "bytes": 86,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:28.135530"}])),
- # GET of user object
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:tester3"}, {"name": "act"}],
- "auth": "plaintext:key3"})),
- # GET of account container (list objects
- # continuation)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')]))
- resp = Request.blank('/auth/v2/act/.groups',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(resp.body, json.dumps(
- {"groups": [{"name": ".admin"}, {"name": "act"},
- {"name": "act:tester"}, {"name": "act:tester3"}]}))
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_get_user_fail_invalid_account(self):
- resp = Request.blank('/auth/v2/.invalid/usr',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_get_user_fail_invalid_user(self):
- resp = Request.blank('/auth/v2/act/.invalid',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_get_user_fail_bad_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- 'super:admin',
- 'X-Auth-Admin-Key': 'supertest'},
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key': 'key'},
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_user_account_admin_success(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but not reseller
- # admin)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"})),
- # GET of requested user object
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(resp.body, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"}],
- "auth": "plaintext:key"}))
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_get_user_account_admin_fail_getting_account_admin(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin check)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"})),
- # GET of requested user object [who is an .admin
- # as well]
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".admin"}],
- "auth": "plaintext:key"})),
- # GET of user object (reseller admin check [and fail
- # here])
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_get_user_account_admin_fail_getting_reseller_admin(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin check)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"})),
- # GET of requested user object [who is a
- # .reseller_admin]
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".reseller_admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_get_user_reseller_admin_fail_getting_reseller_admin(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin check)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".reseller_admin"}],
- "auth": "plaintext:key"})),
- # GET of requested user object [who also is a
- # .reseller_admin]
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".reseller_admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_get_user_super_admin_succeed_getting_reseller_admin(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of requested user object
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".reseller_admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(resp.body, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".reseller_admin"}],
- "auth": "plaintext:key"}))
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_user_groups_not_found(self):
- self.test_auth.app = FakeApp(iter([
- # GET of account container (list objects)
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act/.groups',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_user_groups_fail_listing(self):
- self.test_auth.app = FakeApp(iter([
- # GET of account container (list objects)
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act/.groups',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_user_groups_fail_get_user(self):
- self.test_auth.app = FakeApp(iter([
- # GET of account container (list objects)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"},
- {"name": "tester", "hash": "etag", "bytes": 104,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.736680"},
- {"name": "tester3", "hash": "etag", "bytes": 86,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:28.135530"}])),
- # GET of user object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act/.groups',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_get_user_not_found(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_user_fail(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key':
- 'supertest',
- 'X-Auth-User-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_put_user_fail_invalid_account(self):
- resp = Request.blank('/auth/v2/.invalid/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key':
- 'supertest',
- 'X-Auth-User-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_put_user_fail_invalid_user(self):
- resp = Request.blank('/auth/v2/act/.usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key':
- 'supertest',
- 'X-Auth-User-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_put_user_fail_no_user_key(self):
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_put_user_reseller_admin_fail_bad_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object (reseller admin)
- # This shouldn't actually get called, checked
- # below
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:rdm"},
- {"name": "test"}, {"name": ".admin"},
- {"name": ".reseller_admin"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- 'act:rdm',
- 'X-Auth-Admin-Key':
- 'key',
- 'X-Auth-User-Key':
- 'key',
- 'X-Auth-User-Reseller-Admin': 'true'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 0)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but not reseller admin)
- # This shouldn't actually get called, checked
- # below
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key':
- 'key',
- 'X-Auth-User-Key':
- 'key',
- 'X-Auth-User-Reseller-Admin': 'true'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 0)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- # This shouldn't actually get called, checked
- # below
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key':
- 'key',
- 'X-Auth-User-Key':
- 'key',
- 'X-Auth-User-Reseller-Admin': 'true'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 0)
-
- def test_put_user_account_admin_fail_bad_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but wrong
- # account)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- 'act2:adm',
- 'X-Auth-Admin-Key':
- 'key',
- 'X-Auth-User-Key':
- 'key',
- 'X-Auth-User-Admin': 'true'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key':
- 'key',
- 'X-Auth-User-Key':
- 'key',
- 'X-Auth-User-Admin': 'true'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_put_user_regular_fail_bad_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but wrong
- # account)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- 'act2:adm',
- 'X-Auth-Admin-Key':
- 'key',
- 'X-Auth-User-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key':
- 'key',
- 'X-Auth-User-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_put_user_regular_success(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of user object
- ('201 Created', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key':
- 'supertest',
- 'X-Auth-User-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 201)
- self.assertEquals(self.test_auth.app.calls, 2)
- self.assertEquals(
- json.loads(self.test_auth.app.request.body),
- {"groups": [{"name": "act:usr"}, {"name": "act"}],
- "auth": "plaintext:key"})
-
- def test_put_user_special_chars_success(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of user object
- ('201 Created', {}, '')]))
- resp = Request.blank('/auth/v2/act/u_s-r',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key':
- 'supertest',
- 'X-Auth-User-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 201)
- self.assertEquals(self.test_auth.app.calls, 2)
- self.assertEquals(
- json.loads(self.test_auth.app.request.body),
- {"groups": [{"name": "act:u_s-r"}, {"name": "act"}],
- "auth": "plaintext:key"})
-
- def test_put_user_account_admin_success(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of user object
- ('201 Created', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key':
- 'supertest',
- 'X-Auth-User-Key': 'key',
- 'X-Auth-User-Admin': 'true'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 201)
- self.assertEquals(self.test_auth.app.calls, 2)
- self.assertEquals(
- json.loads(self.test_auth.app.request.body),
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".admin"}],
- "auth": "plaintext:key"})
-
- def test_put_user_reseller_admin_success(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of user object
- ('201 Created', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key':
- 'supertest',
- 'X-Auth-User-Key': 'key',
- 'X-Auth-User-Reseller-Admin': 'true'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 201)
- self.assertEquals(self.test_auth.app.calls, 2)
- self.assertEquals(
- json.loads(self.test_auth.app.request.body),
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".admin"}, {"name": ".reseller_admin"}],
- "auth": "plaintext:key"})
-
- def test_put_user_fail_not_found(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key':
- 'supertest',
- 'X-Auth-User-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_put_user_fail(self):
- self.test_auth.app = FakeApp(iter([
- # PUT of user object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key':
- 'supertest',
- 'X-Auth-User-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_delete_user_bad_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but wrong
- # account)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- 'act2:adm',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_delete_user_invalid_account(self):
- resp = Request.blank('/auth/v2/.invalid/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_delete_user_invalid_user(self):
- resp = Request.blank('/auth/v2/act/.invalid',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_delete_user_not_found(self):
- self.test_auth.app = FakeApp(iter([
- # HEAD of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_delete_user_fail_head_user(self):
- self.test_auth.app = FakeApp(iter([
- # HEAD of user object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_delete_user_fail_delete_token(self):
- self.test_auth.app = FakeApp(iter([
- # HEAD of user object
- ('200 Ok',
- {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
- # DELETE of token
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_delete_user_fail_delete_user(self):
- self.test_auth.app = FakeApp(iter([
- # HEAD of user object
- ('200 Ok',
- {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
- # DELETE of token
- ('204 No Content', {}, ''),
- # DELETE of user object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_delete_user_success(self):
- self.test_auth.app = FakeApp(iter([
- # HEAD of user object
- ('200 Ok',
- {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
- # DELETE of token
- ('204 No Content', {}, ''),
- # DELETE of user object
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_delete_user_success_missing_user_at_end(self):
- self.test_auth.app = FakeApp(iter([
- # HEAD of user object
- ('200 Ok',
- {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
- # DELETE of token
- ('204 No Content', {}, ''),
- # DELETE of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_delete_user_success_missing_token(self):
- self.test_auth.app = FakeApp(iter([
- # HEAD of user object
- ('200 Ok',
- {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
- # DELETE of token
- ('404 Not Found', {}, ''),
- # DELETE of user object
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_delete_user_success_no_token(self):
- self.test_auth.app = FakeApp(iter([
- # HEAD of user object
- ('200 Ok', {}, ''),
- # DELETE of user object
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_validate_token_bad_prefix(self):
- resp = Request.blank('/auth/v2/.token/BAD_token'
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_validate_token_tmi(self):
- resp = Request.blank(
- '/auth/v2/.token/AUTH_token/tmi').get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_validate_token_bad_memcache(self):
- fake_memcache = FakeMemcache()
- fake_memcache.set('AUTH_/auth/AUTH_token', 'bogus')
- resp = Request.blank(
- '/auth/v2/.token/AUTH_token',
- environ={'swift.cache': fake_memcache}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 500)
-
- def test_validate_token_from_memcache(self):
- fake_memcache = FakeMemcache()
- fake_memcache.set(
- 'AUTH_/auth/AUTH_token',
- (time() + 1,
- 'act:usr,act'))
- resp = Request.blank(
- '/auth/v2/.token/AUTH_token',
- environ={'swift.cache': fake_memcache}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(
- resp.headers.get('x-auth-groups'),
- 'act:usr,act')
- self.assert_(float(resp.headers['x-auth-ttl']) < 1,
- resp.headers['x-auth-ttl'])
-
- def test_validate_token_from_memcache_expired(self):
- fake_memcache = FakeMemcache()
- fake_memcache.set(
- 'AUTH_/auth/AUTH_token',
- (time() - 1,
- 'act:usr,act'))
- resp = Request.blank(
- '/auth/v2/.token/AUTH_token',
- environ={'swift.cache': fake_memcache}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assert_('x-auth-groups' not in resp.headers)
- self.assert_('x-auth-ttl' not in resp.headers)
-
- def test_validate_token_from_object(self):
- self.test_auth.app = FakeApp(iter([
- # GET of token object
- ('200 Ok', {}, json.dumps({'groups': [{'name': 'act:usr'},
- {'name': 'act'}], 'expires': time() + 1}))]))
- resp = Request.blank('/auth/v2/.token/AUTH_token'
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 1)
- self.assertEquals(
- resp.headers.get('x-auth-groups'),
- 'act:usr,act')
- self.assert_(float(resp.headers['x-auth-ttl']) < 1,
- resp.headers['x-auth-ttl'])
-
- def test_validate_token_from_object_expired(self):
- self.test_auth.app = FakeApp(iter([
- # GET of token object
- ('200 Ok', {}, json.dumps({'groups': 'act:usr,act',
- 'expires': time() - 1})),
- # DELETE of expired token object
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/.token/AUTH_token'
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_validate_token_from_object_with_admin(self):
- self.test_auth.app = FakeApp(iter([
- # GET of token object
- ('200 Ok', {}, json.dumps({'account_id': 'AUTH_cfa', 'groups':
- [{'name': 'act:usr'},
- {'name': 'act'},
- {'name': '.admin'}],
- 'expires': time() + 1}))]))
- resp = Request.blank('/auth/v2/.token/AUTH_token'
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 1)
- self.assertEquals(resp.headers.get('x-auth-groups'),
- 'act:usr,act,AUTH_cfa')
- self.assert_(float(resp.headers['x-auth-ttl']) < 1,
- resp.headers['x-auth-ttl'])
-
- def test_get_conn_default(self):
- conn = self.test_auth.get_conn()
- self.assertEquals(
- conn.__class__,
- auth.HTTPConnection)
- self.assertEquals(conn.host, '127.0.0.1')
- self.assertEquals(conn.port, 8080)
-
- def test_get_conn_default_https(self):
- local_auth = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'default_swift_cluster': 'local#https://1.2.3.4/v1'})(FakeApp())
- conn = local_auth.get_conn()
- self.assertEquals(
- conn.__class__,
- auth.HTTPSConnection)
- self.assertEquals(conn.host, '1.2.3.4')
- self.assertEquals(conn.port, 443)
-
- def test_get_conn_overridden(self):
- local_auth = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'default_swift_cluster': 'local#https://1.2.3.4/v1'})(FakeApp())
- conn = \
- local_auth.get_conn(
- urlparsed=auth.urlparse('http://5.6.7.8/v1'))
- self.assertEquals(
- conn.__class__,
- auth.HTTPConnection)
- self.assertEquals(conn.host, '5.6.7.8')
- self.assertEquals(conn.port, 80)
-
- def test_get_conn_overridden_https(self):
- local_auth = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'default_swift_cluster': 'local#http://1.2.3.4/v1'})(FakeApp())
- conn = \
- local_auth.get_conn(
- urlparsed=auth.urlparse(
- 'https://5.6.7.8/v1'))
- self.assertEquals(
- conn.__class__,
- auth.HTTPSConnection)
- self.assertEquals(conn.host, '5.6.7.8')
- self.assertEquals(conn.port, 443)
-
- def test_get_itoken_fail_no_memcache(self):
- exc = None
- try:
- self.test_auth.get_itoken({})
- except Exception as err:
- exc = err
- self.assertEquals(str(exc),
- 'No memcache set up; required for Swauth middleware')
-
- def test_get_itoken_success(self):
- fmc = FakeMemcache()
- itk = self.test_auth.get_itoken(
- {'swift.cache': fmc})
- self.assert_(itk.startswith('AUTH_itk'), itk)
- expires, groups = fmc.get('AUTH_/auth/%s' % itk)
- self.assert_(expires > time(), expires)
- self.assertEquals(
- groups,
- '.auth,.reseller_admin,AUTH_.auth')
-
- def test_get_admin_detail_fail_no_colon(self):
- self.test_auth.app = FakeApp(iter([]))
- self.assertEquals(
- self.test_auth.get_admin_detail(
- Request.blank('/')),
- None)
- self.assertEquals(
- self.test_auth.get_admin_detail(
- Request.blank('/',
- headers={'X-Auth-Admin-User': 'usr'})), None)
- self.assertRaises(
- StopIteration, self.test_auth.get_admin_detail,
- Request.blank('/', headers={'X-Auth-Admin-User': 'act:usr'}))
-
- def test_get_admin_detail_fail_user_not_found(self):
- self.test_auth.app = FakeApp(
- iter([('404 Not Found', {}, '')]))
- self.assertEquals(
- self.test_auth.get_admin_detail(
- Request.blank('/',
- headers={'X-Auth-Admin-User': 'act:usr'})), None)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_admin_detail_fail_get_user_error(self):
- self.test_auth.app = FakeApp(iter([
- ('503 Service Unavailable', {}, '')]))
- exc = None
- try:
- self.test_auth.get_admin_detail(
- Request.blank('/',
- headers={'X-Auth-Admin-User': 'act:usr'}))
- except Exception as err:
- exc = err
- self.assertEquals(str(exc), 'Could not get admin user object: '
- '/v1/AUTH_.auth/act/usr 503 Service Unavailable')
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_admin_detail_success(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]}))]))
- detail = self.test_auth.get_admin_detail(
- Request.blank('/',
- headers={'X-Auth-Admin-User': 'act:usr'}))
- self.assertEquals(self.test_auth.app.calls, 1)
- self.assertEquals(
- detail, {'account': 'act',
- 'auth': 'plaintext:key',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'},
- {'name': '.admin'}]})
-
- def test_credentials_match_success(self):
- self.assert_(self.test_auth.credentials_match(
- {'auth': 'plaintext:key'}, 'key'))
-
- def test_credentials_match_fail_no_details(self):
- self.assert_(
- not self.test_auth.credentials_match(None, 'notkey'))
-
- def test_credentials_match_fail_plaintext(self):
- self.assert_(not self.test_auth.credentials_match(
- {'auth': 'plaintext:key'}, 'notkey'))
-
- def test_is_super_admin_success(self):
- self.assert_(
- self.test_auth.is_super_admin(
- Request.blank(
- '/',
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'})))
-
- def test_is_super_admin_fail_bad_key(self):
- self.assert_(
- not self.test_auth.is_super_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'bad'})))
- self.assert_(
- not self.test_auth.is_super_admin(
- Request.blank('/',
- headers={'X-Auth-Admin-User': '.super_admin'})))
- self.assert_(
- not self.test_auth.is_super_admin(Request.blank('/')))
-
- def test_is_super_admin_fail_bad_user(self):
- self.assert_(
- not self.test_auth.is_super_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'bad',
- 'X-Auth-Admin-Key': 'supertest'})))
- self.assert_(
- not self.test_auth.is_super_admin(
- Request.blank('/',
- headers={'X-Auth-Admin-Key': 'supertest'})))
- self.assert_(
- not self.test_auth.is_super_admin(Request.blank('/')))
-
- def test_is_reseller_admin_success_is_super_admin(self):
- self.assert_(
- self.test_auth.is_reseller_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'})))
-
- def test_is_reseller_admin_success_called_get_admin_detail(
- self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({'auth': 'plaintext:key',
- 'groups': [{'name': 'act:rdm'}, {'name': 'act'},
- {'name': '.admin'},
- {'name': '.reseller_admin'}]}))]))
- self.assert_(
- self.test_auth.is_reseller_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'act:rdm',
- 'X-Auth-Admin-Key': 'key'})))
-
- def test_is_reseller_admin_fail_only_account_admin(
- self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({'auth': 'plaintext:key',
- 'groups': [{'name': 'act:adm'}, {'name': 'act'},
- {'name': '.admin'}]}))]))
- self.assert_(
- not self.test_auth.is_reseller_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'})))
-
- def test_is_reseller_admin_fail_regular_user(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({'auth': 'plaintext:key',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'}]}))]))
- self.assert_(
- not self.test_auth.is_reseller_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key': 'key'})))
-
- def test_is_reseller_admin_fail_bad_key(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({'auth': 'plaintext:key',
- 'groups': [{'name': 'act:rdm'}, {'name': 'act'},
- {'name': '.admin'},
- {'name': '.reseller_admin'}]}))]))
- self.assert_(
- not self.test_auth.is_reseller_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'act:rdm',
- 'X-Auth-Admin-Key': 'bad'})))
-
- def test_is_account_admin_success_is_super_admin(self):
- self.assert_(
- self.test_auth.is_account_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}), 'act'))
-
- def test_is_account_admin_success_is_reseller_admin(
- self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({'auth': 'plaintext:key',
- 'groups': [{'name': 'act:rdm'}, {'name': 'act'},
- {'name': '.admin'},
- {'name': '.reseller_admin'}]}))]))
- self.assert_(
- self.test_auth.is_account_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'act:rdm',
- 'X-Auth-Admin-Key': 'key'}), 'act'))
-
- def test_is_account_admin_success(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({'auth': 'plaintext:key',
- 'groups': [{'name': 'act:adm'}, {'name': 'act'},
- {'name': '.admin'}]}))]))
- self.assert_(
- self.test_auth.is_account_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'}), 'act'))
-
- def test_is_account_admin_fail_account_admin_different_account(
- self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({'auth': 'plaintext:key',
- 'groups': [{'name': 'act2:adm'}, {'name': 'act2'},
- {'name': '.admin'}]}))]))
- self.assert_(
- not self.test_auth.is_account_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'act2:adm',
- 'X-Auth-Admin-Key': 'key'}), 'act'))
-
- def test_is_account_admin_fail_regular_user(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({'auth': 'plaintext:key',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'}]}))]))
- self.assert_(
- not self.test_auth.is_account_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key': 'key'}), 'act'))
-
- def test_is_account_admin_fail_bad_key(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({'auth': 'plaintext:key',
- 'groups': [{'name': 'act:rdm'}, {'name': 'act'},
- {'name': '.admin'},
- {'name': '.reseller_admin'}]}))]))
- self.assert_(
- not self.test_auth.is_account_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'act:rdm',
- 'X-Auth-Admin-Key': 'bad'}), 'act'))
-
- def test_reseller_admin_but_account_is_internal_use_only(
- self):
- req = Request.blank('/v1/AUTH_.auth',
- environ={'REQUEST_METHOD': 'GET'})
- req.remote_user = 'act:usr,act,.reseller_admin'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- def test_reseller_admin_but_account_is_exactly_reseller_prefix(
- self):
- req = Request.blank(
- '/v1/AUTH_',
- environ={'REQUEST_METHOD': 'GET'})
- req.remote_user = 'act:usr,act,.reseller_admin'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- def _get_token_success_v1_0_encoded(
- self, saved_user, saved_key, sent_user,
- sent_key):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:%s" % saved_key,
- "groups": [{'name': saved_user}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': sent_user,
- 'X-Auth-Key': sent_key}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assert_(
- resp.headers.get('x-auth-token',
- '').startswith('AUTH_tk'),
- resp.headers.get('x-auth-token'))
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_get_token_success_v1_0_encoded1(self):
- self._get_token_success_v1_0_encoded(
- 'act:usr', 'key', 'act%3ausr', 'key')
-
- def test_get_token_success_v1_0_encoded2(self):
- self._get_token_success_v1_0_encoded(
- 'act:u s r', 'key', 'act%3au%20s%20r', 'key')
-
- def test_get_token_success_v1_0_encoded3(self):
- self._get_token_success_v1_0_encoded(
- 'act:u s r', 'k:e:y', 'act%3au%20s%20r', 'k%3Ae%3ay')
-
- def test_allowed_sync_hosts(self):
- a = auth.filter_factory(
- {'super_admin_key': 'supertest'})(FakeApp())
- self.assertEquals(
- a.allowed_sync_hosts,
- ['127.0.0.1'])
- a = auth.filter_factory({
- 'super_admin_key': 'supertest',
- 'allowed_sync_hosts':
- '1.1.1.1,2.1.1.1, 3.1.1.1 , 4.1.1.1,, , 5.1.1.1'})(FakeApp())
- self.assertEquals(
- a.allowed_sync_hosts,
- ['1.1.1.1', '2.1.1.1', '3.1.1.1', '4.1.1.1', '5.1.1.1'])
-
- def test_reseller_admin_is_owner(self):
- orig_authorize = self.test_auth.authorize
- owner_values = []
-
- def mitm_authorize(req):
- rv = orig_authorize(req)
- owner_values.append(
- req.environ.get('swift_owner', False))
- return rv
-
- self.test_auth.authorize = mitm_authorize
-
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps(
- {'account': 'other', 'user': 'other:usr',
- 'account_id': 'AUTH_other',
- 'groups': [{'name': 'other:usr'}, {'name': 'other'},
- {'name': '.reseller_admin'}],
- 'expires': time() + 60})),
- ('204 No Content', {}, '')]))
- req = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Auth-Token': 'AUTH_t'})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(owner_values, [True])
-
- def test_admin_is_owner(self):
- orig_authorize = self.test_auth.authorize
- owner_values = []
-
- def mitm_authorize(req):
- rv = orig_authorize(req)
- owner_values.append(
- req.environ.get('swift_owner', False))
- return rv
-
- self.test_auth.authorize = mitm_authorize
-
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps(
- {'account': 'act', 'user': 'act:usr',
- 'account_id': 'AUTH_cfa',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'},
- {'name': '.admin'}],
- 'expires': time() + 60})),
- ('204 No Content', {}, '')]))
- req = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Auth-Token': 'AUTH_t'})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(owner_values, [True])
-
- def test_regular_is_not_owner(self):
- orig_authorize = self.test_auth.authorize
- owner_values = []
-
- def mitm_authorize(req):
- rv = orig_authorize(req)
- owner_values.append(
- req.environ.get('swift_owner', False))
- return rv
-
- self.test_auth.authorize = mitm_authorize
-
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps(
- {'account': 'act', 'user': 'act:usr',
- 'account_id': 'AUTH_cfa',
- 'groups':
- [{'name': 'act:usr'}, {
- 'name': 'act'}],
- 'expires': time() + 60})),
- ('204 No Content', {}, '')]), acl='act:usr')
- req = Request.blank('/v1/AUTH_cfa/c',
- headers={'X-Auth-Token': 'AUTH_t'})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(owner_values, [False])
-
- def test_sync_request_success(self):
- self.test_auth.app = FakeApp(
- iter([('204 No Content', {}, '')]),
- sync_key='secret')
- req = Request.blank('/v1/AUTH_cfa/c/o',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'x-container-sync-key':
- 'secret',
- 'x-timestamp': '123.456'})
- req.remote_addr = '127.0.0.1'
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
-
- def test_sync_request_fail_key(self):
- self.test_auth.app = FakeApp(
- iter([('204 No Content', {}, '')]),
- sync_key='secret')
- req = Request.blank('/v1/AUTH_cfa/c/o',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'x-container-sync-key':
- 'wrongsecret',
- 'x-timestamp': '123.456'})
- req.remote_addr = '127.0.0.1'
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- self.test_auth.app = FakeApp(
- iter([('204 No Content', {}, '')]),
- sync_key='othersecret')
- req = Request.blank('/v1/AUTH_cfa/c/o',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'x-container-sync-key':
- 'secret',
- 'x-timestamp': '123.456'})
- req.remote_addr = '127.0.0.1'
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- self.test_auth.app = FakeApp(
- iter([('204 No Content', {}, '')]),
- sync_key=None)
- req = Request.blank('/v1/AUTH_cfa/c/o',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'x-container-sync-key':
- 'secret',
- 'x-timestamp': '123.456'})
- req.remote_addr = '127.0.0.1'
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- def test_sync_request_fail_no_timestamp(self):
- self.test_auth.app = FakeApp(
- iter([('204 No Content', {}, '')]),
- sync_key='secret')
- req = Request.blank('/v1/AUTH_cfa/c/o',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={'x-container-sync-key': 'secret'})
- req.remote_addr = '127.0.0.1'
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- def test_sync_request_fail_sync_host(self):
- self.test_auth.app = FakeApp(
- iter([('204 No Content', {}, '')]),
- sync_key='secret')
- req = Request.blank('/v1/AUTH_cfa/c/o',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'x-container-sync-key':
- 'secret',
- 'x-timestamp': '123.456'})
- req.remote_addr = '127.0.0.2'
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- def test_sync_request_success_lb_sync_host(self):
- self.test_auth.app = FakeApp(
- iter([('204 No Content', {}, '')]),
- sync_key='secret')
- req = Request.blank('/v1/AUTH_cfa/c/o',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'x-container-sync-key':
- 'secret',
- 'x-timestamp':
- '123.456',
- 'x-forwarded-for': '127.0.0.1'})
- req.remote_addr = '127.0.0.2'
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
-
- self.test_auth.app = FakeApp(
- iter([('204 No Content', {}, '')]),
- sync_key='secret')
- req = Request.blank('/v1/AUTH_cfa/c/o',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'x-container-sync-key':
- 'secret',
- 'x-timestamp':
- '123.456',
- 'x-cluster-client-ip': '127.0.0.1'})
- req.remote_addr = '127.0.0.2'
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
-
- def _make_request(self, path, **kwargs):
- req = Request.blank(path, **kwargs)
- req.environ['swift.cache'] = FakeMemcache()
- return req
-
- def test_override_asked_for_but_not_allowed(self):
- self.test_auth = \
- auth.filter_factory(
- {'allow_overrides': 'false'})(FakeApp())
- req = self._make_request('/v1/AUTH_account',
- environ={'swift.authorize_override': True})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
- self.assertEquals(resp.environ['swift.authorize'],
- self.test_auth.authorize)
-
- def test_override_asked_for_and_allowed(self):
- self.test_auth = \
- auth.filter_factory(
- {'allow_overrides': 'true'})(FakeApp())
- req = self._make_request('/v1/AUTH_account',
- environ={'swift.authorize_override': True})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertTrue(
- 'swift.authorize' not in resp.environ)
-
- def test_override_default_allowed(self):
- req = self._make_request('/v1/AUTH_account',
- environ={'swift.authorize_override': True})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertTrue(
- 'swift.authorize' not in resp.environ)
-
- def test_token_too_long(self):
- req = self._make_request('/v1/AUTH_account', headers={
- 'x-auth-token': 'a' * MAX_TOKEN_LENGTH})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
- self.assertNotEquals(
- resp.body,
- 'Token exceeds maximum length.')
- req = self._make_request('/v1/AUTH_account', headers={
- 'x-auth-token': 'a' * (MAX_TOKEN_LENGTH + 1)})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
- self.assertEquals(
- resp.body,
- 'Token exceeds maximum length.')
-
- def test_crazy_authorization(self):
- req = self._make_request('/v1/AUTH_account', headers={
- 'authorization': 'somebody elses header value'})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
- self.assertEquals(resp.environ['swift.authorize'],
- self.test_auth.denied_response)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/gluster/swift/common/middleware/swiftkerbauth/__init__.py b/gluster/swift/common/middleware/swiftkerbauth/__init__.py
deleted file mode 100644
index c752df7..0000000
--- a/gluster/swift/common/middleware/swiftkerbauth/__init__.py
+++ /dev/null
@@ -1,38 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from swift.common.utils import readconf, config_true_value
-
-config_file = {}
-try:
- config_file = readconf("/etc/swift/proxy-server.conf",
- section_name="filter:cache")
-except SystemExit:
- pass
-
-MEMCACHE_SERVERS = config_file.get('memcache_servers', None)
-
-config_file = {}
-
-try:
- config_file = readconf("/etc/swift/proxy-server.conf",
- section_name="filter:kerbauth")
-except SystemExit:
- pass
-
-TOKEN_LIFE = int(config_file.get('token_life', 86400))
-RESELLER_PREFIX = config_file.get('reseller_prefix', "AUTH_")
-DEBUG_HEADERS = config_true_value(config_file.get('debug_headers', 'yes'))
diff --git a/gluster/swift/common/middleware/swiftkerbauth/apachekerbauth/etc/httpd/conf.d/swift-auth.conf b/gluster/swift/common/middleware/swiftkerbauth/apachekerbauth/etc/httpd/conf.d/swift-auth.conf
deleted file mode 100644
index 68472d8..0000000
--- a/gluster/swift/common/middleware/swiftkerbauth/apachekerbauth/etc/httpd/conf.d/swift-auth.conf
+++ /dev/null
@@ -1,12 +0,0 @@
-<Location /cgi-bin/swift-auth>
- AuthType Kerberos
- AuthName "Swift Authentication"
- KrbMethodNegotiate On
- KrbMethodK5Passwd On
- KrbSaveCredentials On
- KrbServiceName HTTP/client.example.com
- KrbAuthRealms EXAMPLE.COM
- Krb5KeyTab /etc/httpd/conf/http.keytab
- KrbVerifyKDC Off
- Require valid-user
-</Location>
diff --git a/gluster/swift/common/middleware/swiftkerbauth/apachekerbauth/var/www/cgi-bin/swift-auth b/gluster/swift/common/middleware/swiftkerbauth/apachekerbauth/var/www/cgi-bin/swift-auth
deleted file mode 100755
index 11fe0e2..0000000
--- a/gluster/swift/common/middleware/swiftkerbauth/apachekerbauth/var/www/cgi-bin/swift-auth
+++ /dev/null
@@ -1,70 +0,0 @@
-#!/usr/bin/python
-
-# Copyright (c) 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Requires the following command to be run:
-# setsebool -P httpd_can_network_connect 1
-# setsebool -P httpd_can_network_memcache 1
-
-import os
-import cgi
-from swift.common.memcached import MemcacheRing
-from time import time, ctime
-from swiftkerbauth import MEMCACHE_SERVERS, TOKEN_LIFE, DEBUG_HEADERS
-from swiftkerbauth.kerbauth_utils import get_remote_user, get_auth_data, \
- generate_token, set_auth_data, get_groups_from_username
-
-
-def main():
- try:
- username = get_remote_user(os.environ)
- except RuntimeError:
- print "Status: 401 Unauthorized\n"
- print "Malformed REMOTE_USER"
- return
-
- if not MEMCACHE_SERVERS:
- print "Status: 500 Internal Server Error\n"
- print "Memcache not configured in /etc/swift/proxy-server.conf"
- return
-
- mc_servers = [s.strip() for s in MEMCACHE_SERVERS.split(',') if s.strip()]
- mc = MemcacheRing(mc_servers)
-
- token, expires, groups = get_auth_data(mc, username)
-
- if not token:
- token = generate_token()
- expires = time() + TOKEN_LIFE
- groups = get_groups_from_username(username)
- set_auth_data(mc, username, token, expires, groups)
-
- print "X-Auth-Token: %s" % token
- print "X-Storage-Token: %s" % token
-
- # For debugging.
- if DEBUG_HEADERS:
- print "X-Debug-Remote-User: %s" % username
- print "X-Debug-Groups: %s" % groups
- print "X-Debug-Token-Life: %ss" % TOKEN_LIFE
- print "X-Debug-Token-Expires: %s" % ctime(expires)
-
- print ""
-
-try:
- print("Content-Type: text/html")
- main()
-except:
- cgi.print_exception()
diff --git a/gluster/swift/common/middleware/swiftkerbauth/kerbauth.py b/gluster/swift/common/middleware/swiftkerbauth/kerbauth.py
deleted file mode 100644
index 1a63a40..0000000
--- a/gluster/swift/common/middleware/swiftkerbauth/kerbauth.py
+++ /dev/null
@@ -1,463 +0,0 @@
-# Copyright (c) 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import errno
-from time import time, ctime
-from traceback import format_exc
-from eventlet import Timeout
-from urllib import unquote
-
-from swift.common.swob import Request, Response
-from swift.common.swob import HTTPBadRequest, HTTPForbidden, HTTPNotFound, \
- HTTPSeeOther, HTTPUnauthorized, HTTPServerError
-
-from swift.common.middleware.acl import clean_acl, parse_acl, referrer_allowed
-from swift.common.utils import cache_from_env, get_logger, \
- split_path, config_true_value
-from gluster.swift.common.middleware.swiftkerbauth.kerbauth_utils import \
- get_auth_data, generate_token, \
- set_auth_data, run_kinit, get_groups_from_username
-
-
-class KerbAuth(object):
- """
- Test authentication and authorization system.
-
- Add to your pipeline in proxy-server.conf, such as::
-
- [pipeline:main]
- pipeline = catch_errors cache kerbauth proxy-server
-
- Set account auto creation to true in proxy-server.conf::
-
- [app:proxy-server]
- account_autocreate = true
-
- And add a kerbauth filter section, such as::
-
- [filter:kerbauth]
- use = egg:swiftkerbauth#kerbauth
-
- See the proxy-server.conf-sample for more information.
-
- :param app: The next WSGI app in the pipeline
- :param conf: The dict of configuration values
- """
-
- def __init__(self, app, conf):
- self.app = app
- self.conf = conf
- self.logger = get_logger(conf, log_route='kerbauth')
- self.log_headers = config_true_value(conf.get('log_headers', 'f'))
- self.reseller_prefix = conf.get('reseller_prefix', 'AUTH').strip()
- if self.reseller_prefix and self.reseller_prefix[-1] != '_':
- self.reseller_prefix += '_'
- self.logger.set_statsd_prefix('kerbauth.%s' % (
- self.reseller_prefix if self.reseller_prefix else 'NONE',))
- self.auth_prefix = conf.get('auth_prefix', '/auth/')
- if not self.auth_prefix or not self.auth_prefix.strip('/'):
- self.logger.warning('Rewriting invalid auth prefix "%s" to '
- '"/auth/" (Non-empty auth prefix path '
- 'is required)' % self.auth_prefix)
- self.auth_prefix = '/auth/'
- if self.auth_prefix[0] != '/':
- self.auth_prefix = '/' + self.auth_prefix
- if self.auth_prefix[-1] != '/':
- self.auth_prefix += '/'
- self.token_life = int(conf.get('token_life', 86400))
- self.auth_method = conf.get('auth_method', 'passive')
- self.debug_headers = config_true_value(
- conf.get('debug_headers', 'yes'))
- self.realm_name = conf.get('realm_name', None)
- self.allow_overrides = config_true_value(
- conf.get('allow_overrides', 't'))
- self.storage_url_scheme = conf.get('storage_url_scheme', 'default')
- self.ext_authentication_url = conf.get('ext_authentication_url')
- if not self.ext_authentication_url:
- raise RuntimeError("Missing filter parameter ext_authentication_"
- "url in /etc/swift/proxy-server.conf")
-
- def __call__(self, env, start_response):
- """
- Accepts a standard WSGI application call, authenticating the request
- and installing callback hooks for authorization and ACL header
- validation. For an authenticated request, REMOTE_USER will be set to a
- comma separated list of the user's groups.
-
- If the request matches the self.auth_prefix, the request will be
- routed through the internal auth request handler (self.handle).
- This is to handle granting tokens, etc.
- """
- if self.allow_overrides and env.get('swift.authorize_override', False):
- return self.app(env, start_response)
- if env.get('PATH_INFO', '').startswith(self.auth_prefix):
- return self.handle(env, start_response)
- token = env.get('HTTP_X_AUTH_TOKEN', env.get('HTTP_X_STORAGE_TOKEN'))
- if token and token.startswith(self.reseller_prefix):
- groups = self.get_groups(env, token)
- if groups:
- user = groups and groups.split(',', 1)[0] or ''
- trans_id = env.get('swift.trans_id')
- self.logger.debug('User: %s uses token %s (trans_id %s)' %
- (user, token, trans_id))
- env['REMOTE_USER'] = groups
- env['swift.authorize'] = self.authorize
- env['swift.clean_acl'] = clean_acl
- if '.reseller_admin' in groups:
- env['reseller_request'] = True
- else:
- # Invalid token (may be expired)
- if self.auth_method == "active":
- return HTTPSeeOther(
- location=self.ext_authentication_url)(env,
- start_response)
- elif self.auth_method == "passive":
- self.logger.increment('unauthorized')
- return HTTPUnauthorized()(env, start_response)
- else:
- # With a non-empty reseller_prefix, I would like to be called
- # back for anonymous access to accounts I know I'm the
- # definitive auth for.
- try:
- version, rest = split_path(env.get('PATH_INFO', ''),
- 1, 2, True)
- except ValueError:
- version, rest = None, None
- self.logger.increment('errors')
- # Not my token, not my account, I can't authorize this request,
- # deny all is a good idea if not already set...
- if 'swift.authorize' not in env:
- env['swift.authorize'] = self.denied_response
-
- return self.app(env, start_response)
-
- def get_groups(self, env, token):
- """
- Get groups for the given token.
-
- :param env: The current WSGI environment dictionary.
- :param token: Token to validate and return a group string for.
-
- :returns: None if the token is invalid or a string containing a comma
- separated list of groups the authenticated user is a member
- of. The first group in the list is also considered a unique
- identifier for that user.
- """
- groups = None
- memcache_client = cache_from_env(env)
- if not memcache_client:
- raise Exception('Memcache required')
- memcache_token_key = '%s/token/%s' % (self.reseller_prefix, token)
- cached_auth_data = memcache_client.get(memcache_token_key)
- if cached_auth_data:
- expires, groups = cached_auth_data
- if expires < time():
- groups = None
-
- return groups
-
- def authorize(self, req):
- """
- Returns None if the request is authorized to continue or a standard
- WSGI response callable if not.
-
- Assumes that user groups are all lower case, which is true when Red Hat
- Enterprise Linux Identity Management is used.
- """
- try:
- version, account, container, obj = req.split_path(1, 4, True)
- except ValueError:
- self.logger.increment('errors')
- return HTTPNotFound(request=req)
-
- if not account or not account.startswith(self.reseller_prefix):
- self.logger.debug("Account name: %s doesn't start with "
- "reseller_prefix: %s."
- % (account, self.reseller_prefix))
- return self.denied_response(req)
-
- user_groups = (req.remote_user or '').split(',')
- account_user = user_groups[1] if len(user_groups) > 1 else None
- # If the user is in the reseller_admin group for our prefix, he gets
- # full access to all accounts we manage. For the default reseller
- # prefix, the group name is auth_reseller_admin.
- admin_group = ("%sreseller_admin" % self.reseller_prefix).lower()
- if admin_group in user_groups and \
- account != self.reseller_prefix and \
- account[len(self.reseller_prefix)] != '.':
- req.environ['swift_owner'] = True
- return None
-
- # The "account" is part of the request URL, and already contains the
- # reseller prefix, like in "/v1/AUTH_vol1/pictures/pic1.png".
- if account.lower() in user_groups and \
- (req.method not in ('DELETE', 'PUT') or container):
- # If the user is admin for the account and is not trying to do an
- # account DELETE or PUT...
- req.environ['swift_owner'] = True
- self.logger.debug("User %s has admin authorizing."
- % account_user)
- return None
-
- if (req.environ.get('swift_sync_key')
- and (req.environ['swift_sync_key'] ==
- req.headers.get('x-container-sync-key', None))
- and 'x-timestamp' in req.headers):
- self.logger.debug("Allow request with container sync-key: %s."
- % req.environ['swift_sync_key'])
- return None
-
- if req.method == 'OPTIONS':
- #allow OPTIONS requests to proceed as normal
- self.logger.debug("Allow OPTIONS request.")
- return None
-
- referrers, groups = parse_acl(getattr(req, 'acl', None))
-
- if referrer_allowed(req.referer, referrers):
- if obj or '.rlistings' in groups:
- self.logger.debug("Allow authorizing %s via referer ACL."
- % req.referer)
- return None
-
- for user_group in user_groups:
- if user_group in groups:
- self.logger.debug("User %s allowed in ACL: %s authorizing."
- % (account_user, user_group))
- return None
-
- return self.denied_response(req)
-
- def denied_response(self, req):
- """
- Returns a standard WSGI response callable with the status of 403 or 401
- depending on whether the REMOTE_USER is set or not.
- """
- if req.remote_user:
- self.logger.increment('forbidden')
- return HTTPForbidden(request=req)
- else:
- if self.auth_method == "active":
- return HTTPSeeOther(location=self.ext_authentication_url)
- elif self.auth_method == "passive":
- self.logger.increment('unauthorized')
- return HTTPUnauthorized(request=req)
-
- def handle(self, env, start_response):
- """
- WSGI entry point for auth requests (ones that match the
- self.auth_prefix).
- Wraps env in swob.Request object and passes it down.
-
- :param env: WSGI environment dictionary
- :param start_response: WSGI callable
- """
- try:
- req = Request(env)
- if self.auth_prefix:
- req.path_info_pop()
- req.bytes_transferred = '-'
- req.client_disconnect = False
- if 'x-storage-token' in req.headers and \
- 'x-auth-token' not in req.headers:
- req.headers['x-auth-token'] = req.headers['x-storage-token']
- return self.handle_request(req)(env, start_response)
- except (Exception, Timeout):
- print "EXCEPTION IN handle: %s: %s" % (format_exc(), env)
- self.logger.increment('errors')
- start_response('500 Server Error',
- [('Content-Type', 'text/plain')])
- return ['Internal server error.\n']
-
- def handle_request(self, req):
- """
- Entry point for auth requests (ones that match the self.auth_prefix).
- Should return a WSGI-style callable (such as webob.Response).
-
- :param req: swob.Request object
- """
- req.start_time = time()
- handler = None
- try:
- version, account, user, _junk = req.split_path(1, 4, True)
- except ValueError:
- self.logger.increment('errors')
- return HTTPNotFound(request=req)
- if version in ('v1', 'v1.0', 'auth'):
- if req.method == 'GET':
- handler = self.handle_get_token
- if not handler:
- self.logger.increment('errors')
- req.response = HTTPBadRequest(request=req)
- else:
- req.response = handler(req)
- return req.response
-
- def handle_get_token(self, req):
- """
- Handles the various `request for token and service end point(s)` calls.
- There are various formats to support the various auth servers in the
- past.
-
- "Active Mode" usage:
- All formats require GSS (Kerberos) authentication.
-
- GET <auth-prefix>/v1/<act>/auth
- GET <auth-prefix>/auth
- GET <auth-prefix>/v1.0
-
- On successful authentication, the response will have X-Auth-Token
- and X-Storage-Token set to the token to use with Swift.
-
- "Passive Mode" usage::
-
- GET <auth-prefix>/v1/<act>/auth
- X-Auth-User: <act>:<usr> or X-Storage-User: <usr>
- X-Auth-Key: <key> or X-Storage-Pass: <key>
- GET <auth-prefix>/auth
- X-Auth-User: <act>:<usr> or X-Storage-User: <act>:<usr>
- X-Auth-Key: <key> or X-Storage-Pass: <key>
- GET <auth-prefix>/v1.0
- X-Auth-User: <act>:<usr> or X-Storage-User: <act>:<usr>
- X-Auth-Key: <key> or X-Storage-Pass: <key>
-
- Values should be url encoded, "act%3Ausr" instead of "act:usr" for
- example; however, for backwards compatibility the colon may be
- included unencoded.
-
- On successful authentication, the response will have X-Auth-Token
- and X-Storage-Token set to the token to use with Swift and
- X-Storage-URL set to the URL to the default Swift cluster to use.
-
- :param req: The swob.Request to process.
- :returns: swob.Response, 2xx on success with data set as explained
- above.
- """
- # Validate the request info
- try:
- pathsegs = split_path(req.path_info, 1, 3, True)
- except ValueError:
- self.logger.increment('errors')
- return HTTPNotFound(request=req)
- if not ((pathsegs[0] == 'v1' and pathsegs[2] == 'auth')
- or pathsegs[0] in ('auth', 'v1.0')):
- return HTTPBadRequest(request=req)
-
- # Client is inside the domain
- if self.auth_method == "active":
- return HTTPSeeOther(location=self.ext_authentication_url)
-
- # Client is outside the domain
- elif self.auth_method == "passive":
- account, user, key = None, None, None
- # Extract user, account and key from request
- if pathsegs[0] == 'v1' and pathsegs[2] == 'auth':
- account = pathsegs[1]
- user = req.headers.get('x-storage-user')
- if not user:
- user = unquote(req.headers.get('x-auth-user', ''))
- if user:
- if ':' not in user:
- return HTTPUnauthorized(request=req)
- else:
- account2, user = user.split(':', 1)
- if account != account2:
- return HTTPUnauthorized(request=req)
- key = req.headers.get('x-storage-pass')
- if not key:
- key = unquote(req.headers.get('x-auth-key', ''))
- elif pathsegs[0] in ('auth', 'v1.0'):
- user = unquote(req.headers.get('x-auth-user', ''))
- if not user:
- user = req.headers.get('x-storage-user')
- if user:
- if ':' not in user:
- return HTTPUnauthorized(request=req)
- else:
- account, user = user.split(':', 1)
- key = unquote(req.headers.get('x-auth-key', ''))
- if not key:
- key = req.headers.get('x-storage-pass')
-
- if not (account or user or key):
- # If all are not given, client may be part of the domain
- return HTTPSeeOther(location=self.ext_authentication_url)
- elif None in (key, user, account):
- # If only one or two of them is given, but not all
- return HTTPUnauthorized(request=req)
-
- # Run kinit on the user
- if self.realm_name and "@" not in user:
- user = user + "@" + self.realm_name
- try:
- ret = run_kinit(user, key)
- except OSError as e:
- if e.errno == errno.ENOENT:
- return HTTPServerError("kinit command not found\n")
- if ret != 0:
- self.logger.warning("Failed: kinit %s", user)
- if ret == -1:
- self.logger.warning("Failed: kinit: Password has probably "
- "expired.")
- return HTTPServerError("Kinit is taking too long.\n")
- return HTTPUnauthorized(request=req)
- self.logger.debug("kinit succeeded")
-
- if "@" in user:
- user = user.split("@")[0]
-
- # Check if user really belongs to the account
- groups_list = get_groups_from_username(user).strip().split(",")
- user_group = ("%s%s" % (self.reseller_prefix, account)).lower()
- reseller_admin_group = \
- ("%sreseller_admin" % self.reseller_prefix).lower()
- if user_group not in groups_list:
- # Check if user is reseller_admin. If not, return Unauthorized.
- # On AD/IdM server, auth_reseller_admin is a separate group
- if reseller_admin_group not in groups_list:
- return HTTPUnauthorized(request=req)
-
- mc = cache_from_env(req.environ)
- if not mc:
- raise Exception('Memcache required')
- token, expires, groups = get_auth_data(mc, user)
- if not token:
- token = generate_token()
- expires = time() + self.token_life
- groups = get_groups_from_username(user)
- set_auth_data(mc, user, token, expires, groups)
-
- headers = {'X-Auth-Token': token,
- 'X-Storage-Token': token}
-
- if self.debug_headers:
- headers.update({'X-Debug-Remote-User': user,
- 'X-Debug-Groups:': groups,
- 'X-Debug-Token-Life': self.token_life,
- 'X-Debug-Token-Expires': ctime(expires)})
-
- resp = Response(request=req, headers=headers)
- resp.headers['X-Storage-Url'] = \
- '%s/v1/%s%s' % (resp.host_url, self.reseller_prefix, account)
- return resp
-
-
-def filter_factory(global_conf, **local_conf):
- """Returns a WSGI filter app for use with paste.deploy."""
- conf = global_conf.copy()
- conf.update(local_conf)
-
- def auth_filter(app):
- return KerbAuth(app, conf)
- return auth_filter
diff --git a/gluster/swift/common/middleware/swiftkerbauth/kerbauth_utils.py b/gluster/swift/common/middleware/swiftkerbauth/kerbauth_utils.py
deleted file mode 100644
index 599ef99..0000000
--- a/gluster/swift/common/middleware/swiftkerbauth/kerbauth_utils.py
+++ /dev/null
@@ -1,137 +0,0 @@
-# Copyright (c) 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import re
-import random
-import grp
-import signal
-from subprocess import Popen, PIPE
-from time import time
-from gluster.swift.common.middleware.swiftkerbauth \
- import TOKEN_LIFE, RESELLER_PREFIX
-
-
-def get_remote_user(env):
- """Retrieve REMOTE_USER set by Apache from environment."""
- remote_user = env.get('REMOTE_USER', "")
- matches = re.match('([^@]+)@.*', remote_user)
- if not matches:
- raise RuntimeError("Malformed REMOTE_USER \"%s\"" % remote_user)
- return matches.group(1)
-
-
-def get_auth_data(mc, username):
- """
- Returns the token, expiry time and groups for the user if it already exists
- on memcache. Returns None otherwise.
-
- :param mc: MemcacheRing object
- :param username: swift user
- """
- token, expires, groups = None, None, None
- memcache_user_key = '%s/user/%s' % (RESELLER_PREFIX, username)
- candidate_token = mc.get(memcache_user_key)
- if candidate_token:
- memcache_token_key = '%s/token/%s' % (RESELLER_PREFIX, candidate_token)
- cached_auth_data = mc.get(memcache_token_key)
- if cached_auth_data:
- expires, groups = cached_auth_data
- if expires > time():
- token = candidate_token
- else:
- expires, groups = None, None
- return (token, expires, groups)
-
-
-def set_auth_data(mc, username, token, expires, groups):
- """
- Stores the following key value pairs on Memcache:
- (token, expires+groups)
- (user, token)
- """
- auth_data = (expires, groups)
- memcache_token_key = "%s/token/%s" % (RESELLER_PREFIX, token)
- mc.set(memcache_token_key, auth_data, time=TOKEN_LIFE)
-
- # Record the token with the user info for future use.
- memcache_user_key = '%s/user/%s' % (RESELLER_PREFIX, username)
- mc.set(memcache_user_key, token, time=TOKEN_LIFE)
-
-
-def generate_token():
- """Generates a random token."""
- # We don't use uuid.uuid4() here because importing the uuid module
- # causes (harmless) SELinux denials in the audit log on RHEL 6. If this
- # is a security concern, a custom SELinux policy module could be
- # written to not log those denials.
- r = random.SystemRandom()
- token = '%stk%s' % \
- (RESELLER_PREFIX,
- ''.join(r.choice('abcdef0123456789') for x in range(32)))
- return token
-
-
-def get_groups_from_username(username):
- """Return a set of groups to which the user belongs to."""
- # Retrieve the numerical group IDs. We cannot list the group names
- # because group names from Active Directory may contain spaces, and
- # we wouldn't be able to split the list of group names into its
- # elements.
- p = Popen(['id', '-G', username], stdout=PIPE)
- if p.wait() != 0:
- raise RuntimeError("Failure running id -G for %s" % username)
- (p_stdout, p_stderr) = p.communicate()
-
- # Convert the group numbers into group names.
- groups = []
- for gid in p_stdout.strip().split(" "):
- groups.append(grp.getgrgid(int(gid))[0])
-
- # The first element of the list is considered a unique identifier
- # for the user. We add the username to accomplish this.
- if username in groups:
- groups.remove(username)
- groups = [username] + groups
- groups = ','.join(groups)
- return groups
-
-
-def run_kinit(username, password):
- """Runs kinit command as a child process and returns the status code."""
- kinit = Popen(['kinit', username],
- stdin=PIPE, stdout=PIPE, stderr=PIPE)
- kinit.stdin.write('%s\n' % password)
-
- # The following code handles a corner case where the Kerberos password
- # has expired and a prompt is displayed to enter new password. Ideally,
- # we would want to read from stdout but these are blocked reads. This is
- # a hack to kill the process if it's taking too long!
-
- class Alarm(Exception):
- pass
-
- def signal_handler(signum, frame):
- raise Alarm
- # Set the signal handler and a 1-second alarm
- signal.signal(signal.SIGALRM, signal_handler)
- signal.alarm(1)
- try:
- kinit.wait() # Wait for the child to exit
- signal.alarm(0) # Reset the alarm
- return kinit.returncode # Exit status of child on graceful exit
- except Alarm:
- # Taking too long, kill and return error
- kinit.kill()
- return -1