summaryrefslogtreecommitdiffstats
path: root/test/unit/common
diff options
context:
space:
mode:
Diffstat (limited to 'test/unit/common')
-rw-r--r--test/unit/common/middleware/swiftkerbauth/__init__.py0
-rw-r--r--test/unit/common/middleware/swiftkerbauth/test_kerbauth.py478
-rw-r--r--test/unit/common/middleware/swiftkerbauth/test_kerbauth_utils.py77
-rw-r--r--test/unit/common/test_constraints.py73
-rw-r--r--test/unit/common/test_diskdir.py14
-rw-r--r--test/unit/common/test_utils.py24
6 files changed, 30 insertions, 636 deletions
diff --git a/test/unit/common/middleware/swiftkerbauth/__init__.py b/test/unit/common/middleware/swiftkerbauth/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/test/unit/common/middleware/swiftkerbauth/__init__.py
+++ /dev/null
diff --git a/test/unit/common/middleware/swiftkerbauth/test_kerbauth.py b/test/unit/common/middleware/swiftkerbauth/test_kerbauth.py
deleted file mode 100644
index 537b8d3..0000000
--- a/test/unit/common/middleware/swiftkerbauth/test_kerbauth.py
+++ /dev/null
@@ -1,478 +0,0 @@
-# Copyright (c) 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import errno
-import unittest
-from time import time
-from mock import patch, Mock
-from test.unit import FakeMemcache
-from swift.common.swob import Request, Response
-from gluster.swift.common.middleware.swiftkerbauth import kerbauth as auth
-
-EXT_AUTHENTICATION_URL = "127.0.0.1"
-REDIRECT_STATUS = 303 # HTTPSeeOther
-
-
-def my_filter_factory(global_conf, **local_conf):
- if 'ext_authentication_url' not in global_conf:
- global_conf['ext_authentication_url'] = EXT_AUTHENTICATION_URL
- conf = global_conf.copy()
- conf.update(local_conf)
-
- def auth_filter(app):
- return auth.KerbAuth(app, conf)
- return auth_filter
-
-# Monkey patching filter_factory to always pass ext_authentication_url
-# as a parameter. Absence of ext_authentication_url raises a RuntimeError
-
-
-def patch_filter_factory():
- auth.filter_factory = my_filter_factory
-
-
-def unpatch_filter_factory():
- reload(auth)
-
-
-class FakeApp(object):
-
- def __init__(self, status_headers_body_iter=None, acl=None, sync_key=None):
- self.calls = 0
- self.status_headers_body_iter = status_headers_body_iter
- if not self.status_headers_body_iter:
- self.status_headers_body_iter = iter([('404 Not Found', {}, '')])
- self.acl = acl
- self.sync_key = sync_key
-
- def __call__(self, env, start_response):
- self.calls += 1
- self.request = Request.blank('', environ=env)
- if self.acl:
- self.request.acl = self.acl
- if self.sync_key:
- self.request.environ['swift_sync_key'] = self.sync_key
- if 'swift.authorize' in env:
- resp = env['swift.authorize'](self.request)
- if resp:
- return resp(env, start_response)
- status, headers, body = self.status_headers_body_iter.next()
- return Response(status=status, headers=headers,
- body=body)(env, start_response)
-
-
-class TestKerbAuth(unittest.TestCase):
-
- # Patch auth.filter_factory()
- patch_filter_factory()
-
- def setUp(self):
- self.test_auth = \
- auth.filter_factory({'auth_method': 'active'})(FakeApp())
- self.test_auth_passive = \
- auth.filter_factory({'auth_method': 'passive'})(FakeApp())
-
- def _make_request(self, path, **kwargs):
- req = Request.blank(path, **kwargs)
- req.environ['swift.cache'] = FakeMemcache()
- return req
-
- def test_no_ext_authentication_url(self):
- app = FakeApp()
- try:
- # Use original auth.filter_factory and NOT monkey patched version
- unpatch_filter_factory()
- auth.filter_factory({})(app)
- except RuntimeError as e:
- # Restore monkey patched version
- patch_filter_factory()
- self.assertTrue(e.args[0].startswith("Missing filter parameter "
- "ext_authentication_url"))
-
- def test_reseller_prefix_init(self):
- app = FakeApp()
- ath = auth.filter_factory({})(app)
- self.assertEquals(ath.reseller_prefix, 'AUTH_')
- ath = auth.filter_factory({'reseller_prefix': 'TEST'})(app)
- self.assertEquals(ath.reseller_prefix, 'TEST_')
- ath = auth.filter_factory({'reseller_prefix': 'TEST_'})(app)
- self.assertEquals(ath.reseller_prefix, 'TEST_')
-
- def test_auth_prefix_init(self):
- app = FakeApp()
- ath = auth.filter_factory({})(app)
- self.assertEquals(ath.auth_prefix, '/auth/')
- ath = auth.filter_factory({'auth_prefix': ''})(app)
- self.assertEquals(ath.auth_prefix, '/auth/')
- ath = auth.filter_factory({'auth_prefix': '/'})(app)
- self.assertEquals(ath.auth_prefix, '/auth/')
- ath = auth.filter_factory({'auth_prefix': '/test/'})(app)
- self.assertEquals(ath.auth_prefix, '/test/')
- ath = auth.filter_factory({'auth_prefix': '/test'})(app)
- self.assertEquals(ath.auth_prefix, '/test/')
- ath = auth.filter_factory({'auth_prefix': 'test/'})(app)
- self.assertEquals(ath.auth_prefix, '/test/')
- ath = auth.filter_factory({'auth_prefix': 'test'})(app)
- self.assertEquals(ath.auth_prefix, '/test/')
-
- def test_top_level_redirect(self):
- req = self._make_request('/')
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, REDIRECT_STATUS)
- self.assertEquals(req.environ['swift.authorize'],
- self.test_auth.denied_response)
-
- def test_passive_top_level_deny(self):
- req = self._make_request('/')
- resp = req.get_response(self.test_auth_passive)
- self.assertEquals(resp.status_int, 401)
- self.assertEquals(req.environ['swift.authorize'],
- self.test_auth_passive.denied_response)
-
- def test_passive_deny_invalid_token(self):
- req = self._make_request('/v1/AUTH_account',
- headers={'X-Auth-Token': 'AUTH_t'})
- resp = req.get_response(self.test_auth_passive)
- self.assertEquals(resp.status_int, 401)
-
- def test_override_asked_for_and_allowed(self):
- self.test_auth = \
- auth.filter_factory({'allow_overrides': 'true'})(FakeApp())
- req = self._make_request('/v1/AUTH_account',
- environ={'swift.authorize_override': True})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertTrue('swift.authorize' not in req.environ)
-
- def test_override_default_allowed(self):
- req = self._make_request('/v1/AUTH_account',
- environ={'swift.authorize_override': True})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertTrue('swift.authorize' not in req.environ)
-
- def test_options_call(self):
- req = self._make_request('/v1/AUTH_cfa/c/o',
- environ={'REQUEST_METHOD': 'OPTIONS'})
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp, None)
-
- def test_auth_deny_non_reseller_prefix_no_override(self):
- fake_authorize = lambda x: Response(status='500 Fake')
- req = self._make_request('/v1/BLAH_account',
- headers={'X-Auth-Token': 'BLAH_t'},
- environ={'swift.authorize': fake_authorize}
- )
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(req.environ['swift.authorize'], fake_authorize)
-
- def test_authorize_acl_group_access(self):
- req = self._make_request('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
- req = self._make_request('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
- req.acl = 'act'
- self.assertEquals(self.test_auth.authorize(req), None)
- req = self._make_request('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
- req.acl = 'act:usr'
- self.assertEquals(self.test_auth.authorize(req), None)
- req = self._make_request('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
-
- def test_deny_cross_reseller(self):
- # Tests that cross-reseller is denied, even if ACLs/group names match
- req = self._make_request('/v1/OTHER_cfa')
- req.remote_user = 'act:usr,act,AUTH_cfa'
- req.acl = 'act'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- def test_authorize_acl_referer_after_user_groups(self):
- req = self._make_request('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr'
- req.acl = '.r:*,act:usr'
- self.assertEquals(self.test_auth.authorize(req), None)
-
- def test_detect_reseller_request(self):
- req = self._make_request('/v1/AUTH_admin',
- headers={'X-Auth-Token': 'AUTH_t'})
- cache_key = 'AUTH_/token/AUTH_t'
- cache_entry = (time() + 3600, '.reseller_admin')
- req.environ['swift.cache'].set(cache_key, cache_entry)
- req.get_response(self.test_auth)
- self.assertTrue(req.environ.get('reseller_request', False))
-
- def test_regular_is_not_owner(self):
- orig_authorize = self.test_auth.authorize
- owner_values = []
-
- def mitm_authorize(req):
- rv = orig_authorize(req)
- owner_values.append(req.environ.get('swift_owner', False))
- return rv
-
- self.test_auth.authorize = mitm_authorize
-
- req = self._make_request(
- '/v1/AUTH_cfa/c',
- headers={'X-Auth-Token': 'AUTH_t'})
- req.remote_user = 'act:usr'
- self.test_auth.authorize(req)
- self.assertEquals(owner_values, [False])
-
- def test_no_memcache(self):
- env = {'swift.cache': None}
- try:
- self.test_auth.get_groups(env, None)
- except Exception as e:
- self.assertTrue(e.args[0].startswith("Memcache required"))
-
- def test_handle_request(self):
- req = self._make_request('/auth/v1.0')
- resp = self.test_auth.handle_request(req)
- self.assertEquals(resp.status_int, REDIRECT_STATUS)
-
- def test_handle_request_bad_request(self):
- req = self._make_request('////')
- resp = self.test_auth.handle_request(req)
- self.assertEquals(resp.status_int, 404)
-
- def test_handle_request_no_handler(self):
- req = self._make_request('/blah/blah/blah/blah')
- resp = self.test_auth.handle_request(req)
- self.assertEquals(resp.status_int, 400)
-
- def test_handle_get_token_bad_request(self):
- req = self._make_request('/blah/blah')
- resp = self.test_auth.handle_get_token(req)
- self.assertEquals(resp.status_int, 400)
- req = self._make_request('/////')
- resp = self.test_auth.handle_get_token(req)
- self.assertEquals(resp.status_int, 404)
-
- def test_passive_handle_get_token_no_user_or_key(self):
- #No user and key
- req = self._make_request('/auth/v1.0')
- resp = self.test_auth_passive.handle_get_token(req)
- self.assertEquals(resp.status_int, REDIRECT_STATUS)
- #User given but no key
- req = self._make_request('/auth/v1.0',
- headers={'X-Auth-User': 'test:user'})
- resp = self.test_auth_passive.handle_get_token(req)
- self.assertEquals(resp.status_int, 401)
-
- def test_passive_handle_get_token_account_in_req_path(self):
- req = self._make_request('/v1/test/auth',
- headers={'X-Auth-User': 'test:user',
- 'X-Auth-Key': 'password'})
- _mock_run_kinit = Mock(return_value=0)
- _mock_get_groups = Mock(return_value="user,auth_test")
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.run_kinit', _mock_run_kinit):
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.get_groups_from_username',
- _mock_get_groups):
- resp = self.test_auth_passive.handle_get_token(req)
- _mock_run_kinit.assert_called_once_with('user', 'password')
- self.assertEquals(_mock_get_groups.call_count, 2)
- self.assertEquals(resp.status_int, 200)
- self.assertTrue(resp.headers['X-Auth-Token'] is not None)
- self.assertTrue(resp.headers['X-Storage-Token'] is not None)
- self.assertTrue(resp.headers['X-Storage-Url'] is not None)
-
- def test_passive_handle_get_token_user_invalid_or_no__account(self):
- #X-Auth-User not in acc:user format
- req = self._make_request('/auth/v1.0',
- headers={'X-Auth-User': 'user'})
- resp = self.test_auth_passive.handle_get_token(req)
- self.assertEquals(resp.status_int, 401)
- req = self._make_request('/v1/test/auth',
- headers={'X-Auth-User': 'user'})
- resp = self.test_auth_passive.handle_get_token(req)
- self.assertEquals(resp.status_int, 401)
- # Account name mismatch
- req = self._make_request('/v1/test/auth',
- headers={'X-Auth-User': 'wrongacc:user'})
- resp = self.test_auth_passive.handle_get_token(req)
- self.assertEquals(resp.status_int, 401)
-
- def test_passive_handle_get_token_no_kinit(self):
- req = self._make_request('/auth/v1.0',
- headers={'X-Auth-User': 'test:user',
- 'X-Auth-Key': 'password'})
- _mock_run_kinit = Mock(side_effect=OSError(errno.ENOENT,
- os.strerror(errno.ENOENT)))
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.run_kinit', _mock_run_kinit):
- resp = self.test_auth_passive.handle_get_token(req)
- self.assertEquals(resp.status_int, 500)
- self.assertTrue("kinit command not found" in resp.body)
- _mock_run_kinit.assert_called_once_with('user', 'password')
-
- def test_passive_handle_get_token_kinit_fail(self):
- req = self._make_request('/auth/v1.0',
- headers={'X-Auth-User': 'test:user',
- 'X-Auth-Key': 'password'})
- _mock_run_kinit = Mock(return_value=1)
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.run_kinit', _mock_run_kinit):
- resp = self.test_auth_passive.handle_get_token(req)
- self.assertEquals(resp.status_int, 401)
- _mock_run_kinit.assert_called_once_with('user', 'password')
-
- def test_passive_handle_get_token_kinit_success_token_not_present(self):
- req = self._make_request('/auth/v1.0',
- headers={'X-Auth-User': 'test:user',
- 'X-Auth-Key': 'password'})
- _mock_run_kinit = Mock(return_value=0)
- _mock_get_groups = Mock(return_value="user,auth_test")
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.run_kinit', _mock_run_kinit):
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.get_groups_from_username',
- _mock_get_groups):
- resp = self.test_auth_passive.handle_get_token(req)
- _mock_run_kinit.assert_called_once_with('user', 'password')
- self.assertEquals(_mock_get_groups.call_count, 2)
- self.assertEquals(resp.status_int, 200)
- self.assertTrue(resp.headers['X-Auth-Token'] is not None)
- self.assertTrue(resp.headers['X-Storage-Token'] is not None)
- self.assertTrue(resp.headers['X-Storage-Url'] is not None)
-
- def test_passive_handle_get_token_kinit_realm_and_memcache(self):
- req = self._make_request('/auth/v1.0',
- headers={'X-Auth-User': 'test:user',
- 'X-Auth-Key': 'password'})
- req.environ['swift.cache'] = None
- _auth_passive = \
- auth.filter_factory({'auth_method': 'passive',
- 'realm_name': 'EXAMPLE.COM'})(FakeApp())
- _mock_run_kinit = Mock(return_value=0)
- _mock_get_groups = Mock(return_value="user,auth_test")
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.run_kinit', _mock_run_kinit):
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.get_groups_from_username',
- _mock_get_groups):
- try:
- _auth_passive.handle_get_token(req)
- except Exception as e:
- self.assertTrue(e.args[0].startswith("Memcache "
- "required"))
- else:
- self.fail("Expected Exception - Memcache required")
- _mock_run_kinit.assert_called_once_with('user@EXAMPLE.COM', 'password')
- _mock_get_groups.assert_called_once_with('user')
-
- def test_passive_handle_get_token_user_in_any__account(self):
- req = self._make_request('/auth/v1.0',
- headers={'X-Auth-User': 'test:user',
- 'X-Auth-Key': 'password'})
- _mock_run_kinit = Mock(return_value=0)
- _mock_get_groups = Mock(return_value="user,auth_blah")
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.run_kinit', _mock_run_kinit):
- with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.get_groups_from_username',
- _mock_get_groups):
- resp = self.test_auth_passive.handle_get_token(req)
- self.assertEquals(resp.status_int, 401)
- _mock_run_kinit.assert_called_once_with('user', 'password')
- _mock_get_groups.assert_called_once_with('user')
-
- def test_handle(self):
- req = self._make_request('/auth/v1.0')
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, REDIRECT_STATUS)
-
- def test_authorize_invalid_req(self):
- req = self._make_request('/')
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 404)
-
- def test_authorize_set_swift_owner(self):
- req = self._make_request('/v1/AUTH_test/c1/o1')
- req.remote_user = 'test,auth_reseller_admin'
- resp = self.test_auth.authorize(req)
- self.assertEquals(req.environ['swift_owner'], True)
- self.assertTrue(resp is None)
- req = self._make_request('/v1/AUTH_test/c1/o1')
- req.remote_user = 'test,auth_test'
- resp = self.test_auth.authorize(req)
- self.assertEquals(req.environ['swift_owner'], True)
- self.assertTrue(resp is None)
-
- def test_authorize_swift_sync_key(self):
- req = self._make_request(
- '/v1/AUTH_cfa/c/o',
- environ={'swift_sync_key': 'secret'},
- headers={'x-container-sync-key': 'secret',
- 'x-timestamp': '123.456'})
- resp = self.test_auth.authorize(req)
- self.assertTrue(resp is None)
-
- def test_authorize_acl_referrer_access(self):
- req = self._make_request('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
- req = self._make_request('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- req.acl = '.r:*,.rlistings'
- self.assertEquals(self.test_auth.authorize(req), None)
- req = self._make_request('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- req.acl = '.r:*' # No listings allowed
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
- req = self._make_request('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- req.acl = '.r:.example.com,.rlistings'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
- req = self._make_request('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- req.referer = 'http://www.example.com/index.html'
- req.acl = '.r:.example.com,.rlistings'
- self.assertEquals(self.test_auth.authorize(req), None)
- req = self._make_request('/v1/AUTH_cfa/c')
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, REDIRECT_STATUS)
- req = self._make_request('/v1/AUTH_cfa/c')
- req.acl = '.r:*,.rlistings'
- self.assertEquals(self.test_auth.authorize(req), None)
- req = self._make_request('/v1/AUTH_cfa/c')
- req.acl = '.r:*' # No listings allowed
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, REDIRECT_STATUS)
- req = self._make_request('/v1/AUTH_cfa/c')
- req.acl = '.r:.example.com,.rlistings'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, REDIRECT_STATUS)
- req = self._make_request('/v1/AUTH_cfa/c')
- req.referer = 'http://www.example.com/index.html'
- req.acl = '.r:.example.com,.rlistings'
- self.assertEquals(self.test_auth.authorize(req), None)
-
- def test_handle_x_storage_token(self):
- req = self._make_request(
- '/auth/v1.0',
- headers={'x-storage-token': 'blahblah', })
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, REDIRECT_STATUS)
-
- def test_invalid_token(self):
- req = self._make_request('/k1/test')
- req.environ['HTTP_X_AUTH_TOKEN'] = 'AUTH_blahblahblah'
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, REDIRECT_STATUS)
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/test/unit/common/middleware/swiftkerbauth/test_kerbauth_utils.py b/test/unit/common/middleware/swiftkerbauth/test_kerbauth_utils.py
deleted file mode 100644
index 2a4e90b..0000000
--- a/test/unit/common/middleware/swiftkerbauth/test_kerbauth_utils.py
+++ /dev/null
@@ -1,77 +0,0 @@
-# Copyright (c) 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-import re
-from time import time
-from test.unit import FakeMemcache
-from gluster.swift.common.middleware.swiftkerbauth import kerbauth_utils as ku
-
-
-class TestKerbUtils(unittest.TestCase):
-
- def test_get_remote_user(self):
- env = {'REMOTE_USER': "auth_admin@EXAMPLE.COM"}
- result = ku.get_remote_user(env)
- self.assertEqual(result, "auth_admin")
-
- def test_get_remote_user_err(self):
- env = {'REMOTE_USER': "auth_admin"}
- try:
- ku.get_remote_user(env)
- except RuntimeError as err:
- self.assertTrue(err.args[0].startswith("Malformed REMOTE_USER"))
- else:
- self.fail("Expected RuntimeError")
-
- def test_get_auth_data(self):
- mc = FakeMemcache()
- expiry = time() + 100
- ku.set_auth_data(mc, "root", "AUTH_tk", expiry, "root,admin")
- (token, expires, groups) = ku.get_auth_data(mc, "root")
- self.assertEqual(("AUTH_tk", expiry, "root,admin"),
- (token, expires, groups))
-
- def test_get_auth_data_err(self):
- mc = FakeMemcache()
- (token, expires, groups) = ku.get_auth_data(mc, "root")
- self.assertEqual((token, expires, groups), (None, None, None))
-
- expiry = time() - 1
- ku.set_auth_data(mc, "root", "AUTH_tk", expiry, "root,admin")
- (token, expires, groups) = ku.get_auth_data(mc, "root")
- self.assertEqual((token, expires, groups), (None, None, None))
-
- def test_set_auth_data(self):
- mc = FakeMemcache()
- expiry = time() + 100
- ku.set_auth_data(mc, "root", "AUTH_tk", expiry, "root,admin")
-
- def test_generate_token(self):
- token = ku.generate_token()
- matches = re.match('AUTH_tk[a-f0-9]{32}', token)
- self.assertTrue(matches is not None)
-
- def test_get_groups_from_username(self):
- groups = ku.get_groups_from_username("root")
- self.assertTrue("root" in groups)
-
- def test_get_groups_from_username_err(self):
- try:
- ku.get_groups_from_username("Zroot")
- except RuntimeError as err:
- self.assertTrue(err.args[0].startswith("Failure running id -G"))
- else:
- self.fail("Expected RuntimeError")
diff --git a/test/unit/common/test_constraints.py b/test/unit/common/test_constraints.py
index 6c78d75..e8ddd69 100644
--- a/test/unit/common/test_constraints.py
+++ b/test/unit/common/test_constraints.py
@@ -57,10 +57,10 @@ class TestConstraints(unittest.TestCase):
cnt.set_object_name_component_length()
self.assertEqual(len, cnt.get_object_name_component_length())
- with patch('swift.common.constraints.constraints_conf_int',
- mock_constraints_conf_int):
- cnt.set_object_name_component_length()
- self.assertEqual(cnt.get_object_name_component_length(), 1000)
+ with patch('swift.common.constraints.constraints_conf_int',
+ mock_constraints_conf_int):
+ cnt.set_object_name_component_length()
+ self.assertEqual(cnt.get_object_name_component_length(), 1000)
def test_validate_obj_name_component(self):
max_obj_len = cnt.get_object_name_component_length()
@@ -75,65 +75,6 @@ class TestConstraints(unittest.TestCase):
self.assertTrue(cnt.validate_obj_name_component('..'))
self.assertTrue(cnt.validate_obj_name_component(''))
- def test_validate_headers(self):
- req = Mock()
- req.headers = []
- self.assertEqual(cnt.validate_headers(req), '')
- req.headers = ['x-some-header']
- self.assertEqual(cnt.validate_headers(req), '')
- #TODO: Although we now support x-delete-at and x-delete-after,
- #retained this test case as we may add some other header to
- #unsupported list in future
- raise SkipTest
- req.headers = ['x-delete-at', 'x-some-header']
- self.assertNotEqual(cnt.validate_headers(req), '')
- req.headers = ['x-delete-after', 'x-some-header']
- self.assertNotEqual(cnt.validate_headers(req), '')
- req.headers = ['x-delete-at', 'x-delete-after', 'x-some-header']
- self.assertNotEqual(cnt.validate_headers(req), '')
-
- def test_validate_headers_ignoring_config_set(self):
- with patch('gluster.swift.common.constraints.'
- 'Glusterfs._ignore_unsupported_headers', True):
- req = Mock()
- req.headers = []
- self.assertEqual(cnt.validate_headers(req), '')
- req.headers = ['x-some-header']
- self.assertEqual(cnt.validate_headers(req), '')
- #TODO: Although we now support x-delete-at and x-delete-after,
- #retained this test case as we may add some other header to
- #unsupported list in future
- raise SkipTest
- req.headers = ['x-delete-at', 'x-some-header']
- self.assertEqual(cnt.validate_headers(req), '')
- req.headers = ['x-delete-after', 'x-some-header']
- self.assertEqual(cnt.validate_headers(req), '')
- req.headers = ['x-delete-at', 'x-delete-after', 'x-some-header']
- self.assertEqual(cnt.validate_headers(req), '')
-
- def test_gluster_check_metadata(self):
- mock_check_metadata = Mock()
- with patch('gluster.swift.common.constraints.__check_metadata',
- mock_check_metadata):
- req = Mock()
- req.headers = []
- cnt.gluster_check_metadata(req, 'object')
- self.assertTrue(1, mock_check_metadata.call_count)
- cnt.gluster_check_metadata(req, 'object', POST=False)
- self.assertTrue(1, mock_check_metadata.call_count)
- req.headers = ['x-some-header']
- self.assertEqual(cnt.gluster_check_metadata(req, 'object', POST=False), None)
- #TODO: Although we now support x-delete-at and x-delete-after,
- #retained this test case as we may add some other header to
- #unsupported list in future
- raise SkipTest
- req.headers = ['x-delete-at', 'x-some-header']
- self.assertNotEqual(cnt.gluster_check_metadata(req, 'object', POST=False), None)
- req.headers = ['x-delete-after', 'x-some-header']
- self.assertNotEqual(cnt.gluster_check_metadata(req, 'object', POST=False), None)
- req.headers = ['x-delete-at', 'x-delete-after', 'x-some-header']
- self.assertNotEqual(cnt.gluster_check_metadata(req, 'object', POST=False), None)
-
def test_gluster_check_object_creation(self):
with patch('gluster.swift.common.constraints.__check_object_creation',
mock_check_object_creation):
@@ -147,9 +88,3 @@ class TestConstraints(unittest.TestCase):
req = Mock()
req.headers = []
self.assertTrue(cnt.gluster_check_object_creation(req, 'dir/.'))
- #TODO: Although we now support x-delete-at and x-delete-after,
- #retained this test case as we may add some other header to
- #unsupported list in future
- raise SkipTest
- req.headers = ['x-delete-at']
- self.assertTrue(cnt.gluster_check_object_creation(req, 'dir/z'))
diff --git a/test/unit/common/test_diskdir.py b/test/unit/common/test_diskdir.py
index f32c3ad..ebc554a 100644
--- a/test/unit/common/test_diskdir.py
+++ b/test/unit/common/test_diskdir.py
@@ -408,7 +408,7 @@ class TestContainerBroker(unittest.TestCase):
self.assertEqual(os.path.basename(broker.db_file), 'db_file.db')
broker.initialize(self.initial_ts)
self.assertTrue(os.path.isdir(self.container))
- self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP])
+ self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP][0])
self.assertFalse(broker.is_deleted())
def test_creation_existing(self):
@@ -419,7 +419,7 @@ class TestContainerBroker(unittest.TestCase):
self.assertEqual(os.path.basename(broker.db_file), 'db_file.db')
broker.initialize(self.initial_ts)
self.assertTrue(os.path.isdir(self.container))
- self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP])
+ self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP][0])
self.assertFalse(broker.is_deleted())
def test_creation_existing_bad_metadata(self):
@@ -432,7 +432,7 @@ class TestContainerBroker(unittest.TestCase):
self.assertEqual(os.path.basename(broker.db_file), 'db_file.db')
broker.initialize(self.initial_ts)
self.assertTrue(os.path.isdir(self.container))
- self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP])
+ self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP][0])
self.assertFalse(broker.is_deleted())
def test_empty(self):
@@ -954,7 +954,7 @@ class TestContainerBroker(unittest.TestCase):
self.assertEqual(os.path.basename(broker.db_file), 'db_file.db')
broker.initialize(self.initial_ts)
self.assertTrue(os.path.isdir(self.container))
- self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP])
+ self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP][0])
self.assertFalse(broker.is_deleted())
broker.delete_db(normalize_timestamp(time()))
self.assertTrue(broker.is_deleted())
@@ -1005,7 +1005,7 @@ class TestAccountBroker(unittest.TestCase):
self.assertEqual(os.path.basename(broker.db_file), 'db_file.db')
broker.initialize(self.initial_ts)
self.assertTrue(os.path.isdir(self.drive_fullpath))
- self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP])
+ self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP][0])
self.assertFalse(broker.is_deleted())
def test_creation_bad_metadata(self):
@@ -1016,7 +1016,7 @@ class TestAccountBroker(unittest.TestCase):
self.assertEqual(os.path.basename(broker.db_file), 'db_file.db')
broker.initialize(self.initial_ts)
self.assertTrue(os.path.isdir(self.drive_fullpath))
- self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP])
+ self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP][0])
self.assertFalse(broker.is_deleted())
def test_empty(self):
@@ -1219,7 +1219,7 @@ class TestAccountBroker(unittest.TestCase):
self.assertEqual(os.path.basename(broker.db_file), 'db_file.db')
broker.initialize(self.initial_ts)
self.assertTrue(os.path.isdir(self.drive_fullpath))
- self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP])
+ self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP][0])
self.assertFalse(broker.is_deleted())
broker.delete_db(normalize_timestamp(time()))
# Deleting the "db" should be a NOOP
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index dd03bd8..55bd0cf 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -25,9 +25,10 @@ import hashlib
import tarfile
import shutil
from collections import defaultdict
-from mock import patch
+from mock import patch, Mock
from gluster.swift.common import utils, Glusterfs
-from gluster.swift.common.exceptions import GlusterFileSystemOSError
+from gluster.swift.common.exceptions import GlusterFileSystemOSError,\
+ GlusterFileSystemIOError
from swift.common.exceptions import DiskFileNoSpace
#
@@ -712,6 +713,18 @@ class TestUtils(unittest.TestCase):
ret = utils.validate_object(md)
assert ret
+ def test_validate_object_with_stat(self):
+ md = {utils.X_TIMESTAMP: 'na',
+ utils.X_CONTENT_TYPE: 'na',
+ utils.X_ETAG: 'bad',
+ utils.X_CONTENT_LENGTH: '12345',
+ utils.X_TYPE: utils.OBJECT,
+ utils.X_OBJECT_TYPE: 'na'}
+ fake_stat = Mock(st_size=12346)
+ self.assertFalse(utils.validate_object(md, fake_stat))
+ fake_stat = Mock(st_size=12345)
+ self.assertTrue(utils.validate_object(md, fake_stat))
+
class TestUtilsDirObjects(unittest.TestCase):
@@ -794,7 +807,8 @@ class TestUtilsDirObjects(unittest.TestCase):
def _mock_rm(path):
print "_mock_rm-metadata_enoent(%s)" % path
shutil.rmtree(path)
- raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
+ raise GlusterFileSystemIOError(errno.ENOENT,
+ os.strerror(errno.ENOENT))
# Remove the files
for f in self.files:
@@ -805,8 +819,8 @@ class TestUtilsDirObjects(unittest.TestCase):
try:
try:
self.assertTrue(utils.rmobjdir(self.rootdir))
- except OSError:
- self.fail("Unexpected OSError")
+ except IOError:
+ self.fail("Unexpected IOError")
else:
pass
finally: