summaryrefslogtreecommitdiffstats
path: root/test/unit/common/middleware
diff options
context:
space:
mode:
authorThiago da Silva <thiago@redhat.com>2014-04-22 14:15:02 -0400
committerPrashanth Pai <ppai@redhat.com>2016-01-06 07:53:12 -0800
commit2a8f9f0f530327039c32e444b6a27130b12666bd (patch)
treee24e38b5b3c0245a0acafc63fc50bacbf7de718a /test/unit/common/middleware
parent4c6ca1db931377b75583f61a7bca262cfc27b0fa (diff)
Update repo
This is a squashed commit imported from this repo: https://github.com/openstack/swiftonfile/tree/icehouse Contains the follwing commits from above mentioned repo: eb50236 Merge "Backport: Fix metadata overall limits bug" into icehouse 79ea52a Backport: Fix metadata overall limits bug bc43f0b Fix inconsistent data being returned on GET ad0bb79 Import HTTPBadRequest from swift's module 74d02e6 Exclude .trashcan dir from container listing b2dbc15 Catch ESTALE in addition to ENOENT 8d60b48 Properly handle read_metadata() exceptions 6762fc6 Fix object server leaking file descriptors 2842e82 Fix API incompatibility in update_metadata() 2beeef6 Merge "Remove swiftkerbauth code" into icehouse 93dbcb5 Update object-expirer.conf with explanations c9d2f09 Merge "Check if /etc/swift exists in ring builder" into icehouse d66c14c Remove swiftkerbauth code 3142ed2 Add object expiration functests 97153d1 Merge "Cleanup functest and undo old patch" into icehouse bc234d0 Remove old travis config file and fix typo 260c8ef Check if /etc/swift exists in ring builder 637dac9 Cleanup functest and undo old patch 051e068 Merge pull request #35 from prashanthpai/backport-1 be104a3 Merge pull request #36 from prashanthpai/backport-2 ff76f42 fix issue with GET on large object (icehouse-backport) 04d0a99 Fix unlink call after successful rename 4c6ca1d updating README file with project name change 10b2680 Merge pull request #18 from thiagol11/icehouse 5bcab8f Updating version on __init__ file 5c2cba2 Merge pull request #15 from thiagol11/update_spec 52b00a8 updating spec file to add dependency on swift icehouse ae7c93b Merge pull request #6 from prashanthpai/rebase 191e55b Revert: allow non-root user to run functests cb7e968 Modify unit tests and func tests d23fd1b Sync with OpenStack Swift v1.13.1 b6d1671 Merge pull request #12 from pushpesh/functionalnosetestremove 962622b Merge pull request #8 from thiagol11/update_readme 4560857 Merge pull request #9 from prashanthpai/spec-expirer be0ae7e Minor update 65000f1 Removing functionalnosetests 8ab1069 Fix object-expirer.conf-gluster RPM build error afee30f added new support filesystem section 527b01f updated README.md to Swift-On-File 9a240c7 Merge pull request #3 from thiagol11/add_jenkins_to_travis 34b5a8b removing blank lines 3568b64 fixing missing fi d8f5b0f adding support to run jenkins triggered by travis 6f4a88c Removing functionalnosetests 8041944 Update README.md c015148 Merge pull request #2 from thiagol11/master 3ddd952 fixing travis file to run correct unit test c582669 adding travis status badge to README 8093096 adding py26 unit testing to travis 37835fd trigger travis build cb6332a adding travis ci testing All tests have been run sucessfully against this. tox -e p2p8,py27,functest Change-Id: I096b611da852d3eb3913844034b443b8272c2ac4 Signed-off-by: Prashanth Pai <ppai@redhat.com> Reviewed-on: http://review.gluster.org/13188
Diffstat (limited to 'test/unit/common/middleware')
-rw-r--r--test/unit/common/middleware/swiftkerbauth/__init__.py0
-rw-r--r--test/unit/common/middleware/swiftkerbauth/test_kerbauth.py478
-rw-r--r--test/unit/common/middleware/swiftkerbauth/test_kerbauth_utils.py77
3 files changed, 0 insertions, 555 deletions
diff --git a/test/unit/common/middleware/swiftkerbauth/__init__.py b/test/unit/common/middleware/swiftkerbauth/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/test/unit/common/middleware/swiftkerbauth/__init__.py
+++ /dev/null
diff --git a/test/unit/common/middleware/swiftkerbauth/test_kerbauth.py b/test/unit/common/middleware/swiftkerbauth/test_kerbauth.py
deleted file mode 100644
index 537b8d3..0000000
--- a/test/unit/common/middleware/swiftkerbauth/test_kerbauth.py
+++ /dev/null
@@ -1,478 +0,0 @@
-# Copyright (c) 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import errno
-import unittest
-from time import time
-from mock import patch, Mock
-from test.unit import FakeMemcache
-from swift.common.swob import Request, Response
-from gluster.swift.common.middleware.swiftkerbauth import kerbauth as auth
-
-EXT_AUTHENTICATION_URL = "127.0.0.1"
-REDIRECT_STATUS = 303 # HTTPSeeOther
-
-
-def my_filter_factory(global_conf, **local_conf):
- if 'ext_authentication_url' not in global_conf:
- global_conf['ext_authentication_url'] = EXT_AUTHENTICATION_URL
- conf = global_conf.copy()
- conf.update(local_conf)
-
- def auth_filter(app):
- return auth.KerbAuth(app, conf)
- return auth_filter
-
-# Monkey patching filter_factory to always pass ext_authentication_url
-# as a parameter. Absence of ext_authentication_url raises a RuntimeError
-
-
-def patch_filter_factory():
- auth.filter_factory = my_filter_factory
-
-
-def unpatch_filter_factory():
- reload(auth)
-
-
-class FakeApp(object):
-
- def __init__(self, status_headers_body_iter=None, acl=None, sync_key=None):
- self.calls = 0
- self.status_headers_body_iter = status_headers_body_iter
- if not self.status_headers_body_iter:
- self.status_headers_body_iter = iter([('404 Not Found', {}, '')])
- self.acl = acl
- self.sync_key = sync_key
-
- def __call__(self, env, start_response):
- self.calls += 1
- self.request = Request.blank('', environ=env)
- if self.acl:
- self.request.acl = self.acl
- if self.sync_key:
- self.request.environ['swift_sync_key'] = self.sync_key
- if 'swift.authorize' in env:
- resp = env['swift.authorize'](self.request)
- if resp:
- return resp(env, start_response)
- status, headers, body = self.status_headers_body_iter.next()
- return Response(status=status, headers=headers,
- body=body)(env, start_response)
-
-
-class TestKerbAuth(unittest.TestCase):
-
- # Patch auth.filter_factory()
- patch_filter_factory()
-
- def setUp(self):
- self.test_auth = \
- auth.filter_factory({'auth_method': 'active'})(FakeApp())
- self.test_auth_passive = \
- auth.filter_factory({'auth_method': 'passive'})(FakeApp())
-
- def _make_request(self, path, **kwargs):
- req = Request.blank(path, **kwargs)
- req.environ['swift.cache'] = FakeMemcache()
- return req
-
- def test_no_ext_authentication_url(self):
- app = FakeApp()
- try:
- # Use original auth.filter_factory and NOT monkey patched version
- unpatch_filter_factory()
- auth.filter_factory({})(app)
- except RuntimeError as e:
- # Restore monkey patched version
- patch_filter_factory()
- self.assertTrue(e.args[0].startswith("Missing filter parameter "
- "ext_authentication_url"))
-
- def test_reseller_prefix_init(self):
- app = FakeApp()
- ath = auth.filter_factory({})(app)
- self.assertEquals(ath.reseller_prefix, 'AUTH_')
- ath = auth.filter_factory({'reseller_prefix': 'TEST'})(app)
- self.assertEquals(ath.reseller_prefix, 'TEST_')
- ath = auth.filter_factory({'reseller_prefix': 'TEST_'})(app)
- self.assertEquals(ath.reseller_prefix, 'TEST_')
-
- def test_auth_prefix_init(self):
- app = FakeApp()
- ath = auth.filter_factory({})(app)
- self.assertEquals(ath.auth_prefix, '/auth/')
- ath = auth.filter_factory({'auth_prefix': ''})(app)
- self.assertEquals(ath.auth_prefix, '/auth/')
- ath = auth.filter_factory({'auth_prefix': '/'})(app)
- self.assertEquals(ath.auth_prefix, '/auth/')
- ath = auth.filter_factory({'auth_prefix': '/test/'})(app)
- self.assertEquals(ath.auth_prefix, '/test/')
- ath = auth.filter_factory({'auth_prefix': '/test'})(app)
- self.assertEquals(ath.auth_prefix, '/test/')
- ath = auth.filter_factory({'auth_prefix': 'test/'})(app)
- self.assertEquals(ath.auth_prefix, '/test/')
- ath = auth.filter_factory({'auth_prefix': 'test'})(app)
- self.assertEquals(ath.auth_prefix, '/test/')
-
- def test_top_level_redirect(self):
- req = self._make_request('/')
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, REDIRECT_STATUS)
- self.assertEquals(req.environ['swift.authorize'],
- self.test_auth.denied_response)
-
- def test_passive_top_level_deny(self):
- req = self._make_request('/')
- resp = req.get_response(self.test_auth_passive)
- self.assertEquals(resp.status_int, 401)
- self.assertEquals(req.environ['swift.authorize'],
- self.test_auth_passive.denied_response)
-
- def test_passive_deny_invalid_token(self):
- req = self._make_request('/v1/AUTH_account',
- headers={'X-Auth-Token': 'AUTH_t'})
- resp = req.get_response(self.test_auth_passive)
- self.assertEquals(resp.status_int, 401)
-
- def test_override_asked_for_and_allowed(self):
- self.test_auth = \
- auth.filter_factory({'allow_overrides': 'true'})(FakeApp())
- req = self._make_request('/v1/AUTH_account',
- environ={'swift.authorize_override': True})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertTrue('swift.authorize' not in req.environ)
-
- def test_override_default_allowed(self):
- req = self._make_request('/v1/AUTH_account',
- environ={'swift.authorize_override': True})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertTrue('swift.authorize' not in req.environ)
-
- def test_options_call(self):
- req = self._make_request('/v1/AUTH_cfa/c/o',
- environ={'REQUEST_METHOD': 'OPTIONS'})
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp, None)
-
- def test_auth_deny_non_reseller_prefix_no_override(self):
- fake_authorize = lambda x: Response(status='500 Fake')
- req = self._make_request('/v1/BLAH_account',
- headers={'X-Auth-Token': 'BLAH_t'},
- environ={'swift.authorize': fake_authorize}
- )
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(req.environ['swift.authorize'], fake_authorize)
-
- def test_authorize_acl_group_access(self):
- req = self._make_request('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
- req = self._make_request('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
- req.acl = 'act'
- self.assertEquals(self.test_auth.authorize(req), None)
- req = self._make_request('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
- req.acl = 'act:usr'
- self.assertEquals(self.test_auth.authorize(req), None)
- req = self._make_request('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
-
- def test_deny_cross_reseller(self):
- # Tests that cross-reseller is denied, even if ACLs/group names match
- req = self._make_request('/v1/OTHER_cfa')
- req.remote_user = 'act:usr,act,AUTH_cfa'
- req.acl = 'act'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- def test_authorize_acl_referer_after_user_groups(self):
- req = self._make_request('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr'
- req.acl = '.r:*,act:usr'
- self.assertEquals(self.test_auth.authorize(req), None)
-
- def test_detect_reseller_request(self):
- req = self._make_request('/v1/AUTH_admin',
- headers={'X-Auth-Token': 'AUTH_t'})
- cache_key = 'AUTH_/token/AUTH_t'
- cache_entry = (time() + 3600, '.reseller_admin')
- req.environ['swift.cache'].set(cache_key, cache_entry)
- req.get_response(self.test_auth)
- self.assertTrue(req.environ.get('reseller_request', False))
-
- def test_regular_is_not_owner(self):
- orig_authorize = self.test_auth.authorize
- owner_values = []
-
- def mitm_authorize(req):
- rv = orig_authorize(req)
- owner_values.append(req.environ.get('swift_owner', False))
- return rv
-
- self.test_auth.authorize = mitm_authorize
-
- req = self._make_request(
- '/v1/AUTH_cfa/c',
- headers={'X-Auth-Token': 'AUTH_t'})
- req.remote_user = 'act:usr'
- self.test_auth.authorize(req)
- self.assertEquals(owner_values, [False])
-
- def test_no_memcache(self):
- env = {'swift.cache': None}
- try:
- self.test_auth.get_groups(env, None)
- except Exception as e:
- self.assertTrue(e.args[0].startswith("Memcache required"))
-
- def test_handle_request(self):
- req = self._make_request('/auth/v1.0')
- resp = self.test_auth.handle_request(req)
- self.assertEquals(resp.status_int, REDIRECT_STATUS)
-
- def test_handle_request_bad_request(self):
- req = self._make_request('////')
- resp = self.test_auth.handle_request(req)
- self.assertEquals(resp.status_int, 404)
-
- def test_handle_request_no_handler(self):
- req = self._make_request('/blah/blah/blah/blah')
- resp = self.test_auth.handle_request(req)
- self.assertEquals(resp.status_int, 400)
-
- def test_handle_get_token_bad_request(self):
- req = self._make_request('/blah/blah')
- resp = self.test_auth.handle_get_token(req)
- self.assertEquals(resp.status_int, 400)
- req = self._make_request('/////')
- resp = self.test_auth.handle_get_token(req)
- self.assertEquals(resp.status_int, 404)
-
- def test_passive_handle_get_token_no_user_or_key(self):
- #No user and key
- req = self._make_request('/auth/v1.0')
- resp = self.test_auth_passive.handle_get_token(req)
- self.assertEquals(resp.status_int, REDIRECT_STATUS)
- #User given but no key
- req = self._make_request('/auth/v1.0',
- headers={'X-Auth-User': 'test:user'})
- resp = self.test_auth_passive.handle_get_token(req)
- self.assertEquals(resp.status_int, 401)
-
- def test_passive_handle_get_token_account_in_req_path(self):
- req = self._make_request('/v1/test/auth',
- headers={'X-Auth-User': 'test:user',
- 'X-Auth-Key': 'password'})
- _mock_run_kinit = Mock(return_value=0)
- _mock_get_groups = Mock(return_value="user,auth_test")
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.run_kinit', _mock_run_kinit):
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.get_groups_from_username',
- _mock_get_groups):
- resp = self.test_auth_passive.handle_get_token(req)
- _mock_run_kinit.assert_called_once_with('user', 'password')
- self.assertEquals(_mock_get_groups.call_count, 2)
- self.assertEquals(resp.status_int, 200)
- self.assertTrue(resp.headers['X-Auth-Token'] is not None)
- self.assertTrue(resp.headers['X-Storage-Token'] is not None)
- self.assertTrue(resp.headers['X-Storage-Url'] is not None)
-
- def test_passive_handle_get_token_user_invalid_or_no__account(self):
- #X-Auth-User not in acc:user format
- req = self._make_request('/auth/v1.0',
- headers={'X-Auth-User': 'user'})
- resp = self.test_auth_passive.handle_get_token(req)
- self.assertEquals(resp.status_int, 401)
- req = self._make_request('/v1/test/auth',
- headers={'X-Auth-User': 'user'})
- resp = self.test_auth_passive.handle_get_token(req)
- self.assertEquals(resp.status_int, 401)
- # Account name mismatch
- req = self._make_request('/v1/test/auth',
- headers={'X-Auth-User': 'wrongacc:user'})
- resp = self.test_auth_passive.handle_get_token(req)
- self.assertEquals(resp.status_int, 401)
-
- def test_passive_handle_get_token_no_kinit(self):
- req = self._make_request('/auth/v1.0',
- headers={'X-Auth-User': 'test:user',
- 'X-Auth-Key': 'password'})
- _mock_run_kinit = Mock(side_effect=OSError(errno.ENOENT,
- os.strerror(errno.ENOENT)))
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.run_kinit', _mock_run_kinit):
- resp = self.test_auth_passive.handle_get_token(req)
- self.assertEquals(resp.status_int, 500)
- self.assertTrue("kinit command not found" in resp.body)
- _mock_run_kinit.assert_called_once_with('user', 'password')
-
- def test_passive_handle_get_token_kinit_fail(self):
- req = self._make_request('/auth/v1.0',
- headers={'X-Auth-User': 'test:user',
- 'X-Auth-Key': 'password'})
- _mock_run_kinit = Mock(return_value=1)
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.run_kinit', _mock_run_kinit):
- resp = self.test_auth_passive.handle_get_token(req)
- self.assertEquals(resp.status_int, 401)
- _mock_run_kinit.assert_called_once_with('user', 'password')
-
- def test_passive_handle_get_token_kinit_success_token_not_present(self):
- req = self._make_request('/auth/v1.0',
- headers={'X-Auth-User': 'test:user',
- 'X-Auth-Key': 'password'})
- _mock_run_kinit = Mock(return_value=0)
- _mock_get_groups = Mock(return_value="user,auth_test")
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.run_kinit', _mock_run_kinit):
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.get_groups_from_username',
- _mock_get_groups):
- resp = self.test_auth_passive.handle_get_token(req)
- _mock_run_kinit.assert_called_once_with('user', 'password')
- self.assertEquals(_mock_get_groups.call_count, 2)
- self.assertEquals(resp.status_int, 200)
- self.assertTrue(resp.headers['X-Auth-Token'] is not None)
- self.assertTrue(resp.headers['X-Storage-Token'] is not None)
- self.assertTrue(resp.headers['X-Storage-Url'] is not None)
-
- def test_passive_handle_get_token_kinit_realm_and_memcache(self):
- req = self._make_request('/auth/v1.0',
- headers={'X-Auth-User': 'test:user',
- 'X-Auth-Key': 'password'})
- req.environ['swift.cache'] = None
- _auth_passive = \
- auth.filter_factory({'auth_method': 'passive',
- 'realm_name': 'EXAMPLE.COM'})(FakeApp())
- _mock_run_kinit = Mock(return_value=0)
- _mock_get_groups = Mock(return_value="user,auth_test")
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.run_kinit', _mock_run_kinit):
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.get_groups_from_username',
- _mock_get_groups):
- try:
- _auth_passive.handle_get_token(req)
- except Exception as e:
- self.assertTrue(e.args[0].startswith("Memcache "
- "required"))
- else:
- self.fail("Expected Exception - Memcache required")
- _mock_run_kinit.assert_called_once_with('user@EXAMPLE.COM', 'password')
- _mock_get_groups.assert_called_once_with('user')
-
- def test_passive_handle_get_token_user_in_any__account(self):
- req = self._make_request('/auth/v1.0',
- headers={'X-Auth-User': 'test:user',
- 'X-Auth-Key': 'password'})
- _mock_run_kinit = Mock(return_value=0)
- _mock_get_groups = Mock(return_value="user,auth_blah")
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.run_kinit', _mock_run_kinit):
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.get_groups_from_username',
- _mock_get_groups):
- resp = self.test_auth_passive.handle_get_token(req)
- self.assertEquals(resp.status_int, 401)
- _mock_run_kinit.assert_called_once_with('user', 'password')
- _mock_get_groups.assert_called_once_with('user')
-
- def test_handle(self):
- req = self._make_request('/auth/v1.0')
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, REDIRECT_STATUS)
-
- def test_authorize_invalid_req(self):
- req = self._make_request('/')
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 404)
-
- def test_authorize_set_swift_owner(self):
- req = self._make_request('/v1/AUTH_test/c1/o1')
- req.remote_user = 'test,auth_reseller_admin'
- resp = self.test_auth.authorize(req)
- self.assertEquals(req.environ['swift_owner'], True)
- self.assertTrue(resp is None)
- req = self._make_request('/v1/AUTH_test/c1/o1')
- req.remote_user = 'test,auth_test'
- resp = self.test_auth.authorize(req)
- self.assertEquals(req.environ['swift_owner'], True)
- self.assertTrue(resp is None)
-
- def test_authorize_swift_sync_key(self):
- req = self._make_request(
- '/v1/AUTH_cfa/c/o',
- environ={'swift_sync_key': 'secret'},
- headers={'x-container-sync-key': 'secret',
- 'x-timestamp': '123.456'})
- resp = self.test_auth.authorize(req)
- self.assertTrue(resp is None)
-
- def test_authorize_acl_referrer_access(self):
- req = self._make_request('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
- req = self._make_request('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- req.acl = '.r:*,.rlistings'
- self.assertEquals(self.test_auth.authorize(req), None)
- req = self._make_request('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- req.acl = '.r:*' # No listings allowed
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
- req = self._make_request('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- req.acl = '.r:.example.com,.rlistings'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
- req = self._make_request('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- req.referer = 'http://www.example.com/index.html'
- req.acl = '.r:.example.com,.rlistings'
- self.assertEquals(self.test_auth.authorize(req), None)
- req = self._make_request('/v1/AUTH_cfa/c')
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, REDIRECT_STATUS)
- req = self._make_request('/v1/AUTH_cfa/c')
- req.acl = '.r:*,.rlistings'
- self.assertEquals(self.test_auth.authorize(req), None)
- req = self._make_request('/v1/AUTH_cfa/c')
- req.acl = '.r:*' # No listings allowed
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, REDIRECT_STATUS)
- req = self._make_request('/v1/AUTH_cfa/c')
- req.acl = '.r:.example.com,.rlistings'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, REDIRECT_STATUS)
- req = self._make_request('/v1/AUTH_cfa/c')
- req.referer = 'http://www.example.com/index.html'
- req.acl = '.r:.example.com,.rlistings'
- self.assertEquals(self.test_auth.authorize(req), None)
-
- def test_handle_x_storage_token(self):
- req = self._make_request(
- '/auth/v1.0',
- headers={'x-storage-token': 'blahblah', })
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, REDIRECT_STATUS)
-
- def test_invalid_token(self):
- req = self._make_request('/k1/test')
- req.environ['HTTP_X_AUTH_TOKEN'] = 'AUTH_blahblahblah'
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, REDIRECT_STATUS)
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/test/unit/common/middleware/swiftkerbauth/test_kerbauth_utils.py b/test/unit/common/middleware/swiftkerbauth/test_kerbauth_utils.py
deleted file mode 100644
index 2a4e90b..0000000
--- a/test/unit/common/middleware/swiftkerbauth/test_kerbauth_utils.py
+++ /dev/null
@@ -1,77 +0,0 @@
-# Copyright (c) 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-import re
-from time import time
-from test.unit import FakeMemcache
-from gluster.swift.common.middleware.swiftkerbauth import kerbauth_utils as ku
-
-
-class TestKerbUtils(unittest.TestCase):
-
- def test_get_remote_user(self):
- env = {'REMOTE_USER': "auth_admin@EXAMPLE.COM"}
- result = ku.get_remote_user(env)
- self.assertEqual(result, "auth_admin")
-
- def test_get_remote_user_err(self):
- env = {'REMOTE_USER': "auth_admin"}
- try:
- ku.get_remote_user(env)
- except RuntimeError as err:
- self.assertTrue(err.args[0].startswith("Malformed REMOTE_USER"))
- else:
- self.fail("Expected RuntimeError")
-
- def test_get_auth_data(self):
- mc = FakeMemcache()
- expiry = time() + 100
- ku.set_auth_data(mc, "root", "AUTH_tk", expiry, "root,admin")
- (token, expires, groups) = ku.get_auth_data(mc, "root")
- self.assertEqual(("AUTH_tk", expiry, "root,admin"),
- (token, expires, groups))
-
- def test_get_auth_data_err(self):
- mc = FakeMemcache()
- (token, expires, groups) = ku.get_auth_data(mc, "root")
- self.assertEqual((token, expires, groups), (None, None, None))
-
- expiry = time() - 1
- ku.set_auth_data(mc, "root", "AUTH_tk", expiry, "root,admin")
- (token, expires, groups) = ku.get_auth_data(mc, "root")
- self.assertEqual((token, expires, groups), (None, None, None))
-
- def test_set_auth_data(self):
- mc = FakeMemcache()
- expiry = time() + 100
- ku.set_auth_data(mc, "root", "AUTH_tk", expiry, "root,admin")
-
- def test_generate_token(self):
- token = ku.generate_token()
- matches = re.match('AUTH_tk[a-f0-9]{32}', token)
- self.assertTrue(matches is not None)
-
- def test_get_groups_from_username(self):
- groups = ku.get_groups_from_username("root")
- self.assertTrue("root" in groups)
-
- def test_get_groups_from_username_err(self):
- try:
- ku.get_groups_from_username("Zroot")
- except RuntimeError as err:
- self.assertTrue(err.args[0].startswith("Failure running id -G"))
- else:
- self.fail("Expected RuntimeError")