diff options
Diffstat (limited to 'test/unit/common')
| -rw-r--r-- | test/unit/common/middleware/swiftkerbauth/__init__.py | 0 | ||||
| -rw-r--r-- | test/unit/common/middleware/swiftkerbauth/test_kerbauth.py | 478 | ||||
| -rw-r--r-- | test/unit/common/middleware/swiftkerbauth/test_kerbauth_utils.py | 77 | ||||
| -rw-r--r-- | test/unit/common/test_constraints.py | 73 | ||||
| -rw-r--r-- | test/unit/common/test_diskdir.py | 14 | ||||
| -rw-r--r-- | test/unit/common/test_utils.py | 24 | 
6 files changed, 30 insertions, 636 deletions
diff --git a/test/unit/common/middleware/swiftkerbauth/__init__.py b/test/unit/common/middleware/swiftkerbauth/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/test/unit/common/middleware/swiftkerbauth/__init__.py +++ /dev/null diff --git a/test/unit/common/middleware/swiftkerbauth/test_kerbauth.py b/test/unit/common/middleware/swiftkerbauth/test_kerbauth.py deleted file mode 100644 index 537b8d3..0000000 --- a/test/unit/common/middleware/swiftkerbauth/test_kerbauth.py +++ /dev/null @@ -1,478 +0,0 @@ -# Copyright (c) 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -#    http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import errno -import unittest -from time import time -from mock import patch, Mock -from test.unit import FakeMemcache -from swift.common.swob import Request, Response -from gluster.swift.common.middleware.swiftkerbauth import kerbauth as auth - -EXT_AUTHENTICATION_URL = "127.0.0.1" -REDIRECT_STATUS = 303  # HTTPSeeOther - - -def my_filter_factory(global_conf, **local_conf): -    if 'ext_authentication_url' not in global_conf: -        global_conf['ext_authentication_url'] = EXT_AUTHENTICATION_URL -    conf = global_conf.copy() -    conf.update(local_conf) - -    def auth_filter(app): -        return auth.KerbAuth(app, conf) -    return auth_filter - -# Monkey patching filter_factory to always pass ext_authentication_url -# as a parameter. Absence of ext_authentication_url raises a RuntimeError - - -def patch_filter_factory(): -    auth.filter_factory = my_filter_factory - - -def unpatch_filter_factory(): -    reload(auth) - - -class FakeApp(object): - -    def __init__(self, status_headers_body_iter=None, acl=None, sync_key=None): -        self.calls = 0 -        self.status_headers_body_iter = status_headers_body_iter -        if not self.status_headers_body_iter: -            self.status_headers_body_iter = iter([('404 Not Found', {}, '')]) -        self.acl = acl -        self.sync_key = sync_key - -    def __call__(self, env, start_response): -        self.calls += 1 -        self.request = Request.blank('', environ=env) -        if self.acl: -            self.request.acl = self.acl -        if self.sync_key: -            self.request.environ['swift_sync_key'] = self.sync_key -        if 'swift.authorize' in env: -            resp = env['swift.authorize'](self.request) -            if resp: -                return resp(env, start_response) -        status, headers, body = self.status_headers_body_iter.next() -        return Response(status=status, headers=headers, -                        body=body)(env, start_response) - - -class TestKerbAuth(unittest.TestCase): - -    # Patch auth.filter_factory() -    patch_filter_factory() - -    def setUp(self): -        self.test_auth = \ -            auth.filter_factory({'auth_method': 'active'})(FakeApp()) -        self.test_auth_passive = \ -            auth.filter_factory({'auth_method': 'passive'})(FakeApp()) - -    def _make_request(self, path, **kwargs): -        req = Request.blank(path, **kwargs) -        req.environ['swift.cache'] = FakeMemcache() -        return req - -    def test_no_ext_authentication_url(self): -        app = FakeApp() -        try: -            # Use original auth.filter_factory and NOT monkey patched version -            unpatch_filter_factory() -            auth.filter_factory({})(app) -        except RuntimeError as e: -            # Restore monkey patched version -            patch_filter_factory() -            self.assertTrue(e.args[0].startswith("Missing filter parameter " -                                                 "ext_authentication_url")) - -    def test_reseller_prefix_init(self): -        app = FakeApp() -        ath = auth.filter_factory({})(app) -        self.assertEquals(ath.reseller_prefix, 'AUTH_') -        ath = auth.filter_factory({'reseller_prefix': 'TEST'})(app) -        self.assertEquals(ath.reseller_prefix, 'TEST_') -        ath = auth.filter_factory({'reseller_prefix': 'TEST_'})(app) -        self.assertEquals(ath.reseller_prefix, 'TEST_') - -    def test_auth_prefix_init(self): -        app = FakeApp() -        ath = auth.filter_factory({})(app) -        self.assertEquals(ath.auth_prefix, '/auth/') -        ath = auth.filter_factory({'auth_prefix': ''})(app) -        self.assertEquals(ath.auth_prefix, '/auth/') -        ath = auth.filter_factory({'auth_prefix': '/'})(app) -        self.assertEquals(ath.auth_prefix, '/auth/') -        ath = auth.filter_factory({'auth_prefix': '/test/'})(app) -        self.assertEquals(ath.auth_prefix, '/test/') -        ath = auth.filter_factory({'auth_prefix': '/test'})(app) -        self.assertEquals(ath.auth_prefix, '/test/') -        ath = auth.filter_factory({'auth_prefix': 'test/'})(app) -        self.assertEquals(ath.auth_prefix, '/test/') -        ath = auth.filter_factory({'auth_prefix': 'test'})(app) -        self.assertEquals(ath.auth_prefix, '/test/') - -    def test_top_level_redirect(self): -        req = self._make_request('/') -        resp = req.get_response(self.test_auth) -        self.assertEquals(resp.status_int, REDIRECT_STATUS) -        self.assertEquals(req.environ['swift.authorize'], -                          self.test_auth.denied_response) - -    def test_passive_top_level_deny(self): -        req = self._make_request('/') -        resp = req.get_response(self.test_auth_passive) -        self.assertEquals(resp.status_int, 401) -        self.assertEquals(req.environ['swift.authorize'], -                          self.test_auth_passive.denied_response) - -    def test_passive_deny_invalid_token(self): -        req = self._make_request('/v1/AUTH_account', -                                 headers={'X-Auth-Token': 'AUTH_t'}) -        resp = req.get_response(self.test_auth_passive) -        self.assertEquals(resp.status_int, 401) - -    def test_override_asked_for_and_allowed(self): -        self.test_auth = \ -            auth.filter_factory({'allow_overrides': 'true'})(FakeApp()) -        req = self._make_request('/v1/AUTH_account', -                                 environ={'swift.authorize_override': True}) -        resp = req.get_response(self.test_auth) -        self.assertEquals(resp.status_int, 404) -        self.assertTrue('swift.authorize' not in req.environ) - -    def test_override_default_allowed(self): -        req = self._make_request('/v1/AUTH_account', -                                 environ={'swift.authorize_override': True}) -        resp = req.get_response(self.test_auth) -        self.assertEquals(resp.status_int, 404) -        self.assertTrue('swift.authorize' not in req.environ) - -    def test_options_call(self): -        req = self._make_request('/v1/AUTH_cfa/c/o', -                                 environ={'REQUEST_METHOD': 'OPTIONS'}) -        resp = self.test_auth.authorize(req) -        self.assertEquals(resp, None) - -    def test_auth_deny_non_reseller_prefix_no_override(self): -        fake_authorize = lambda x: Response(status='500 Fake') -        req = self._make_request('/v1/BLAH_account', -                                 headers={'X-Auth-Token': 'BLAH_t'}, -                                 environ={'swift.authorize': fake_authorize} -                                 ) -        resp = req.get_response(self.test_auth) -        self.assertEquals(resp.status_int, 500) -        self.assertEquals(req.environ['swift.authorize'], fake_authorize) - -    def test_authorize_acl_group_access(self): -        req = self._make_request('/v1/AUTH_cfa') -        req.remote_user = 'act:usr,act' -        resp = self.test_auth.authorize(req) -        self.assertEquals(resp.status_int, 403) -        req = self._make_request('/v1/AUTH_cfa') -        req.remote_user = 'act:usr,act' -        req.acl = 'act' -        self.assertEquals(self.test_auth.authorize(req), None) -        req = self._make_request('/v1/AUTH_cfa') -        req.remote_user = 'act:usr,act' -        req.acl = 'act:usr' -        self.assertEquals(self.test_auth.authorize(req), None) -        req = self._make_request('/v1/AUTH_cfa') -        req.remote_user = 'act:usr,act' - -    def test_deny_cross_reseller(self): -        # Tests that cross-reseller is denied, even if ACLs/group names match -        req = self._make_request('/v1/OTHER_cfa') -        req.remote_user = 'act:usr,act,AUTH_cfa' -        req.acl = 'act' -        resp = self.test_auth.authorize(req) -        self.assertEquals(resp.status_int, 403) - -    def test_authorize_acl_referer_after_user_groups(self): -        req = self._make_request('/v1/AUTH_cfa/c') -        req.remote_user = 'act:usr' -        req.acl = '.r:*,act:usr' -        self.assertEquals(self.test_auth.authorize(req), None) - -    def test_detect_reseller_request(self): -        req = self._make_request('/v1/AUTH_admin', -                                 headers={'X-Auth-Token': 'AUTH_t'}) -        cache_key = 'AUTH_/token/AUTH_t' -        cache_entry = (time() + 3600, '.reseller_admin') -        req.environ['swift.cache'].set(cache_key, cache_entry) -        req.get_response(self.test_auth) -        self.assertTrue(req.environ.get('reseller_request', False)) - -    def test_regular_is_not_owner(self): -        orig_authorize = self.test_auth.authorize -        owner_values = [] - -        def mitm_authorize(req): -            rv = orig_authorize(req) -            owner_values.append(req.environ.get('swift_owner', False)) -            return rv - -        self.test_auth.authorize = mitm_authorize - -        req = self._make_request( -            '/v1/AUTH_cfa/c', -            headers={'X-Auth-Token': 'AUTH_t'}) -        req.remote_user = 'act:usr' -        self.test_auth.authorize(req) -        self.assertEquals(owner_values, [False]) - -    def test_no_memcache(self): -        env = {'swift.cache': None} -        try: -            self.test_auth.get_groups(env, None) -        except Exception as e: -            self.assertTrue(e.args[0].startswith("Memcache required")) - -    def test_handle_request(self): -        req = self._make_request('/auth/v1.0') -        resp = self.test_auth.handle_request(req) -        self.assertEquals(resp.status_int, REDIRECT_STATUS) - -    def test_handle_request_bad_request(self): -        req = self._make_request('////') -        resp = self.test_auth.handle_request(req) -        self.assertEquals(resp.status_int, 404) - -    def test_handle_request_no_handler(self): -        req = self._make_request('/blah/blah/blah/blah') -        resp = self.test_auth.handle_request(req) -        self.assertEquals(resp.status_int, 400) - -    def test_handle_get_token_bad_request(self): -        req = self._make_request('/blah/blah') -        resp = self.test_auth.handle_get_token(req) -        self.assertEquals(resp.status_int, 400) -        req = self._make_request('/////') -        resp = self.test_auth.handle_get_token(req) -        self.assertEquals(resp.status_int, 404) - -    def test_passive_handle_get_token_no_user_or_key(self): -        #No user and key -        req = self._make_request('/auth/v1.0') -        resp = self.test_auth_passive.handle_get_token(req) -        self.assertEquals(resp.status_int, REDIRECT_STATUS) -        #User given but no key -        req = self._make_request('/auth/v1.0', -                                 headers={'X-Auth-User': 'test:user'}) -        resp = self.test_auth_passive.handle_get_token(req) -        self.assertEquals(resp.status_int, 401) - -    def test_passive_handle_get_token_account_in_req_path(self): -        req = self._make_request('/v1/test/auth', -                                 headers={'X-Auth-User': 'test:user', -                                          'X-Auth-Key': 'password'}) -        _mock_run_kinit = Mock(return_value=0) -        _mock_get_groups = Mock(return_value="user,auth_test") -        with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.run_kinit', _mock_run_kinit): -            with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.get_groups_from_username', -                       _mock_get_groups): -                resp = self.test_auth_passive.handle_get_token(req) -        _mock_run_kinit.assert_called_once_with('user', 'password') -        self.assertEquals(_mock_get_groups.call_count, 2) -        self.assertEquals(resp.status_int, 200) -        self.assertTrue(resp.headers['X-Auth-Token'] is not None) -        self.assertTrue(resp.headers['X-Storage-Token'] is not None) -        self.assertTrue(resp.headers['X-Storage-Url'] is not None) - -    def test_passive_handle_get_token_user_invalid_or_no__account(self): -        #X-Auth-User not in acc:user format -        req = self._make_request('/auth/v1.0', -                                 headers={'X-Auth-User': 'user'}) -        resp = self.test_auth_passive.handle_get_token(req) -        self.assertEquals(resp.status_int, 401) -        req = self._make_request('/v1/test/auth', -                                 headers={'X-Auth-User': 'user'}) -        resp = self.test_auth_passive.handle_get_token(req) -        self.assertEquals(resp.status_int, 401) -        # Account name mismatch -        req = self._make_request('/v1/test/auth', -                                 headers={'X-Auth-User': 'wrongacc:user'}) -        resp = self.test_auth_passive.handle_get_token(req) -        self.assertEquals(resp.status_int, 401) - -    def test_passive_handle_get_token_no_kinit(self): -        req = self._make_request('/auth/v1.0', -                                 headers={'X-Auth-User': 'test:user', -                                          'X-Auth-Key': 'password'}) -        _mock_run_kinit = Mock(side_effect=OSError(errno.ENOENT, -                                                   os.strerror(errno.ENOENT))) -        with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.run_kinit', _mock_run_kinit): -            resp = self.test_auth_passive.handle_get_token(req) -        self.assertEquals(resp.status_int, 500) -        self.assertTrue("kinit command not found" in resp.body) -        _mock_run_kinit.assert_called_once_with('user', 'password') - -    def test_passive_handle_get_token_kinit_fail(self): -        req = self._make_request('/auth/v1.0', -                                 headers={'X-Auth-User': 'test:user', -                                          'X-Auth-Key': 'password'}) -        _mock_run_kinit = Mock(return_value=1) -        with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.run_kinit', _mock_run_kinit): -            resp = self.test_auth_passive.handle_get_token(req) -        self.assertEquals(resp.status_int, 401) -        _mock_run_kinit.assert_called_once_with('user', 'password') - -    def test_passive_handle_get_token_kinit_success_token_not_present(self): -        req = self._make_request('/auth/v1.0', -                                 headers={'X-Auth-User': 'test:user', -                                          'X-Auth-Key': 'password'}) -        _mock_run_kinit = Mock(return_value=0) -        _mock_get_groups = Mock(return_value="user,auth_test") -        with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.run_kinit', _mock_run_kinit): -            with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.get_groups_from_username', -                       _mock_get_groups): -                resp = self.test_auth_passive.handle_get_token(req) -        _mock_run_kinit.assert_called_once_with('user', 'password') -        self.assertEquals(_mock_get_groups.call_count, 2) -        self.assertEquals(resp.status_int, 200) -        self.assertTrue(resp.headers['X-Auth-Token'] is not None) -        self.assertTrue(resp.headers['X-Storage-Token'] is not None) -        self.assertTrue(resp.headers['X-Storage-Url'] is not None) - -    def test_passive_handle_get_token_kinit_realm_and_memcache(self): -        req = self._make_request('/auth/v1.0', -                                 headers={'X-Auth-User': 'test:user', -                                          'X-Auth-Key': 'password'}) -        req.environ['swift.cache'] = None -        _auth_passive = \ -            auth.filter_factory({'auth_method': 'passive', -                                'realm_name': 'EXAMPLE.COM'})(FakeApp()) -        _mock_run_kinit = Mock(return_value=0) -        _mock_get_groups = Mock(return_value="user,auth_test") -        with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.run_kinit', _mock_run_kinit): -            with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.get_groups_from_username', -                       _mock_get_groups): -                    try: -                        _auth_passive.handle_get_token(req) -                    except Exception as e: -                        self.assertTrue(e.args[0].startswith("Memcache " -                                                             "required")) -                    else: -                        self.fail("Expected Exception - Memcache required") -        _mock_run_kinit.assert_called_once_with('user@EXAMPLE.COM', 'password') -        _mock_get_groups.assert_called_once_with('user') - -    def test_passive_handle_get_token_user_in_any__account(self): -        req = self._make_request('/auth/v1.0', -                                 headers={'X-Auth-User': 'test:user', -                                          'X-Auth-Key': 'password'}) -        _mock_run_kinit = Mock(return_value=0) -        _mock_get_groups = Mock(return_value="user,auth_blah") -        with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.run_kinit', _mock_run_kinit): -            with patch('gluster.swift.common.middleware.swiftkerbauth.kerbauth.get_groups_from_username', -                       _mock_get_groups): -                resp = self.test_auth_passive.handle_get_token(req) -                self.assertEquals(resp.status_int, 401) -        _mock_run_kinit.assert_called_once_with('user', 'password') -        _mock_get_groups.assert_called_once_with('user') - -    def test_handle(self): -        req = self._make_request('/auth/v1.0') -        resp = req.get_response(self.test_auth) -        self.assertEquals(resp.status_int, REDIRECT_STATUS) - -    def test_authorize_invalid_req(self): -        req = self._make_request('/') -        resp = self.test_auth.authorize(req) -        self.assertEquals(resp.status_int, 404) - -    def test_authorize_set_swift_owner(self): -        req = self._make_request('/v1/AUTH_test/c1/o1') -        req.remote_user = 'test,auth_reseller_admin' -        resp = self.test_auth.authorize(req) -        self.assertEquals(req.environ['swift_owner'], True) -        self.assertTrue(resp is None) -        req = self._make_request('/v1/AUTH_test/c1/o1') -        req.remote_user = 'test,auth_test' -        resp = self.test_auth.authorize(req) -        self.assertEquals(req.environ['swift_owner'], True) -        self.assertTrue(resp is None) - -    def test_authorize_swift_sync_key(self): -        req = self._make_request( -            '/v1/AUTH_cfa/c/o', -            environ={'swift_sync_key': 'secret'}, -            headers={'x-container-sync-key': 'secret', -                     'x-timestamp': '123.456'}) -        resp = self.test_auth.authorize(req) -        self.assertTrue(resp is None) - -    def test_authorize_acl_referrer_access(self): -        req = self._make_request('/v1/AUTH_cfa/c') -        req.remote_user = 'act:usr,act' -        resp = self.test_auth.authorize(req) -        self.assertEquals(resp.status_int, 403) -        req = self._make_request('/v1/AUTH_cfa/c') -        req.remote_user = 'act:usr,act' -        req.acl = '.r:*,.rlistings' -        self.assertEquals(self.test_auth.authorize(req), None) -        req = self._make_request('/v1/AUTH_cfa/c') -        req.remote_user = 'act:usr,act' -        req.acl = '.r:*'  # No listings allowed -        resp = self.test_auth.authorize(req) -        self.assertEquals(resp.status_int, 403) -        req = self._make_request('/v1/AUTH_cfa/c') -        req.remote_user = 'act:usr,act' -        req.acl = '.r:.example.com,.rlistings' -        resp = self.test_auth.authorize(req) -        self.assertEquals(resp.status_int, 403) -        req = self._make_request('/v1/AUTH_cfa/c') -        req.remote_user = 'act:usr,act' -        req.referer = 'http://www.example.com/index.html' -        req.acl = '.r:.example.com,.rlistings' -        self.assertEquals(self.test_auth.authorize(req), None) -        req = self._make_request('/v1/AUTH_cfa/c') -        resp = self.test_auth.authorize(req) -        self.assertEquals(resp.status_int, REDIRECT_STATUS) -        req = self._make_request('/v1/AUTH_cfa/c') -        req.acl = '.r:*,.rlistings' -        self.assertEquals(self.test_auth.authorize(req), None) -        req = self._make_request('/v1/AUTH_cfa/c') -        req.acl = '.r:*'  # No listings allowed -        resp = self.test_auth.authorize(req) -        self.assertEquals(resp.status_int, REDIRECT_STATUS) -        req = self._make_request('/v1/AUTH_cfa/c') -        req.acl = '.r:.example.com,.rlistings' -        resp = self.test_auth.authorize(req) -        self.assertEquals(resp.status_int, REDIRECT_STATUS) -        req = self._make_request('/v1/AUTH_cfa/c') -        req.referer = 'http://www.example.com/index.html' -        req.acl = '.r:.example.com,.rlistings' -        self.assertEquals(self.test_auth.authorize(req), None) - -    def test_handle_x_storage_token(self): -        req = self._make_request( -            '/auth/v1.0', -            headers={'x-storage-token': 'blahblah', }) -        resp = req.get_response(self.test_auth) -        self.assertEquals(resp.status_int, REDIRECT_STATUS) - -    def test_invalid_token(self): -        req = self._make_request('/k1/test') -        req.environ['HTTP_X_AUTH_TOKEN'] = 'AUTH_blahblahblah' -        resp = req.get_response(self.test_auth) -        self.assertEquals(resp.status_int, REDIRECT_STATUS) - -if __name__ == '__main__': -    unittest.main() diff --git a/test/unit/common/middleware/swiftkerbauth/test_kerbauth_utils.py b/test/unit/common/middleware/swiftkerbauth/test_kerbauth_utils.py deleted file mode 100644 index 2a4e90b..0000000 --- a/test/unit/common/middleware/swiftkerbauth/test_kerbauth_utils.py +++ /dev/null @@ -1,77 +0,0 @@ -# Copyright (c) 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -#    http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest -import re -from time import time -from test.unit import FakeMemcache -from gluster.swift.common.middleware.swiftkerbauth import kerbauth_utils as ku - - -class TestKerbUtils(unittest.TestCase): - -    def test_get_remote_user(self): -        env = {'REMOTE_USER': "auth_admin@EXAMPLE.COM"} -        result = ku.get_remote_user(env) -        self.assertEqual(result, "auth_admin") - -    def test_get_remote_user_err(self): -        env = {'REMOTE_USER': "auth_admin"} -        try: -            ku.get_remote_user(env) -        except RuntimeError as err: -            self.assertTrue(err.args[0].startswith("Malformed REMOTE_USER")) -        else: -            self.fail("Expected RuntimeError") - -    def test_get_auth_data(self): -        mc = FakeMemcache() -        expiry = time() + 100 -        ku.set_auth_data(mc, "root", "AUTH_tk", expiry, "root,admin") -        (token, expires, groups) = ku.get_auth_data(mc, "root") -        self.assertEqual(("AUTH_tk", expiry, "root,admin"), -                         (token, expires, groups)) - -    def test_get_auth_data_err(self): -        mc = FakeMemcache() -        (token, expires, groups) = ku.get_auth_data(mc, "root") -        self.assertEqual((token, expires, groups), (None, None, None)) - -        expiry = time() - 1 -        ku.set_auth_data(mc, "root", "AUTH_tk", expiry, "root,admin") -        (token, expires, groups) = ku.get_auth_data(mc, "root") -        self.assertEqual((token, expires, groups), (None, None, None)) - -    def test_set_auth_data(self): -        mc = FakeMemcache() -        expiry = time() + 100 -        ku.set_auth_data(mc, "root", "AUTH_tk", expiry, "root,admin") - -    def test_generate_token(self): -        token = ku.generate_token() -        matches = re.match('AUTH_tk[a-f0-9]{32}', token) -        self.assertTrue(matches is not None) - -    def test_get_groups_from_username(self): -        groups = ku.get_groups_from_username("root") -        self.assertTrue("root" in groups) - -    def test_get_groups_from_username_err(self): -        try: -            ku.get_groups_from_username("Zroot") -        except RuntimeError as err: -            self.assertTrue(err.args[0].startswith("Failure running id -G")) -        else: -            self.fail("Expected RuntimeError") diff --git a/test/unit/common/test_constraints.py b/test/unit/common/test_constraints.py index 6c78d75..e8ddd69 100644 --- a/test/unit/common/test_constraints.py +++ b/test/unit/common/test_constraints.py @@ -57,10 +57,10 @@ class TestConstraints(unittest.TestCase):              cnt.set_object_name_component_length()              self.assertEqual(len, cnt.get_object_name_component_length()) -        with patch('swift.common.constraints.constraints_conf_int', -                   mock_constraints_conf_int): -            cnt.set_object_name_component_length() -            self.assertEqual(cnt.get_object_name_component_length(), 1000) +            with patch('swift.common.constraints.constraints_conf_int', +                       mock_constraints_conf_int): +                cnt.set_object_name_component_length() +                self.assertEqual(cnt.get_object_name_component_length(), 1000)      def test_validate_obj_name_component(self):          max_obj_len = cnt.get_object_name_component_length() @@ -75,65 +75,6 @@ class TestConstraints(unittest.TestCase):          self.assertTrue(cnt.validate_obj_name_component('..'))          self.assertTrue(cnt.validate_obj_name_component('')) -    def test_validate_headers(self): -        req = Mock() -        req.headers = [] -        self.assertEqual(cnt.validate_headers(req), '') -        req.headers = ['x-some-header'] -        self.assertEqual(cnt.validate_headers(req), '') -        #TODO: Although we now support x-delete-at and x-delete-after, -        #retained this test case as we may add some other header to -        #unsupported list in future -        raise SkipTest -        req.headers = ['x-delete-at', 'x-some-header'] -        self.assertNotEqual(cnt.validate_headers(req), '') -        req.headers = ['x-delete-after', 'x-some-header'] -        self.assertNotEqual(cnt.validate_headers(req), '') -        req.headers = ['x-delete-at', 'x-delete-after', 'x-some-header'] -        self.assertNotEqual(cnt.validate_headers(req), '') - -    def test_validate_headers_ignoring_config_set(self): -        with patch('gluster.swift.common.constraints.' -                   'Glusterfs._ignore_unsupported_headers', True): -            req = Mock() -            req.headers = [] -            self.assertEqual(cnt.validate_headers(req), '') -            req.headers = ['x-some-header'] -            self.assertEqual(cnt.validate_headers(req), '') -            #TODO: Although we now support x-delete-at and x-delete-after, -            #retained this test case as we may add some other header to -            #unsupported list in future -            raise SkipTest -            req.headers = ['x-delete-at', 'x-some-header'] -            self.assertEqual(cnt.validate_headers(req), '') -            req.headers = ['x-delete-after', 'x-some-header'] -            self.assertEqual(cnt.validate_headers(req), '') -            req.headers = ['x-delete-at', 'x-delete-after', 'x-some-header'] -            self.assertEqual(cnt.validate_headers(req), '') - -    def test_gluster_check_metadata(self): -        mock_check_metadata = Mock() -        with patch('gluster.swift.common.constraints.__check_metadata', -                   mock_check_metadata): -            req = Mock() -            req.headers = [] -            cnt.gluster_check_metadata(req, 'object') -            self.assertTrue(1, mock_check_metadata.call_count) -            cnt.gluster_check_metadata(req, 'object', POST=False) -            self.assertTrue(1, mock_check_metadata.call_count) -            req.headers = ['x-some-header'] -            self.assertEqual(cnt.gluster_check_metadata(req, 'object', POST=False), None) -            #TODO: Although we now support x-delete-at and x-delete-after, -            #retained this test case as we may add some other header to -            #unsupported list in future -            raise SkipTest -            req.headers = ['x-delete-at', 'x-some-header'] -            self.assertNotEqual(cnt.gluster_check_metadata(req, 'object', POST=False), None) -            req.headers = ['x-delete-after', 'x-some-header'] -            self.assertNotEqual(cnt.gluster_check_metadata(req, 'object', POST=False), None) -            req.headers = ['x-delete-at', 'x-delete-after', 'x-some-header'] -            self.assertNotEqual(cnt.gluster_check_metadata(req, 'object', POST=False), None) -      def test_gluster_check_object_creation(self):          with patch('gluster.swift.common.constraints.__check_object_creation',                     mock_check_object_creation): @@ -147,9 +88,3 @@ class TestConstraints(unittest.TestCase):              req = Mock()              req.headers = []              self.assertTrue(cnt.gluster_check_object_creation(req, 'dir/.')) -            #TODO: Although we now support x-delete-at and x-delete-after, -            #retained this test case as we may add some other header to -            #unsupported list in future -            raise SkipTest -            req.headers = ['x-delete-at'] -            self.assertTrue(cnt.gluster_check_object_creation(req, 'dir/z')) diff --git a/test/unit/common/test_diskdir.py b/test/unit/common/test_diskdir.py index f32c3ad..ebc554a 100644 --- a/test/unit/common/test_diskdir.py +++ b/test/unit/common/test_diskdir.py @@ -408,7 +408,7 @@ class TestContainerBroker(unittest.TestCase):          self.assertEqual(os.path.basename(broker.db_file), 'db_file.db')          broker.initialize(self.initial_ts)          self.assertTrue(os.path.isdir(self.container)) -        self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP]) +        self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP][0])          self.assertFalse(broker.is_deleted())      def test_creation_existing(self): @@ -419,7 +419,7 @@ class TestContainerBroker(unittest.TestCase):          self.assertEqual(os.path.basename(broker.db_file), 'db_file.db')          broker.initialize(self.initial_ts)          self.assertTrue(os.path.isdir(self.container)) -        self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP]) +        self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP][0])          self.assertFalse(broker.is_deleted())      def test_creation_existing_bad_metadata(self): @@ -432,7 +432,7 @@ class TestContainerBroker(unittest.TestCase):          self.assertEqual(os.path.basename(broker.db_file), 'db_file.db')          broker.initialize(self.initial_ts)          self.assertTrue(os.path.isdir(self.container)) -        self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP]) +        self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP][0])          self.assertFalse(broker.is_deleted())      def test_empty(self): @@ -954,7 +954,7 @@ class TestContainerBroker(unittest.TestCase):          self.assertEqual(os.path.basename(broker.db_file), 'db_file.db')          broker.initialize(self.initial_ts)          self.assertTrue(os.path.isdir(self.container)) -        self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP]) +        self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP][0])          self.assertFalse(broker.is_deleted())          broker.delete_db(normalize_timestamp(time()))          self.assertTrue(broker.is_deleted()) @@ -1005,7 +1005,7 @@ class TestAccountBroker(unittest.TestCase):          self.assertEqual(os.path.basename(broker.db_file), 'db_file.db')          broker.initialize(self.initial_ts)          self.assertTrue(os.path.isdir(self.drive_fullpath)) -        self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP]) +        self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP][0])          self.assertFalse(broker.is_deleted())      def test_creation_bad_metadata(self): @@ -1016,7 +1016,7 @@ class TestAccountBroker(unittest.TestCase):          self.assertEqual(os.path.basename(broker.db_file), 'db_file.db')          broker.initialize(self.initial_ts)          self.assertTrue(os.path.isdir(self.drive_fullpath)) -        self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP]) +        self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP][0])          self.assertFalse(broker.is_deleted())      def test_empty(self): @@ -1219,7 +1219,7 @@ class TestAccountBroker(unittest.TestCase):          self.assertEqual(os.path.basename(broker.db_file), 'db_file.db')          broker.initialize(self.initial_ts)          self.assertTrue(os.path.isdir(self.drive_fullpath)) -        self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP]) +        self.assertEquals(self.initial_ts, broker.metadata[utils.X_TIMESTAMP][0])          self.assertFalse(broker.is_deleted())          broker.delete_db(normalize_timestamp(time()))          # Deleting the "db" should be a NOOP diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index dd03bd8..55bd0cf 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -25,9 +25,10 @@ import hashlib  import tarfile  import shutil  from collections import defaultdict -from mock import patch +from mock import patch, Mock  from gluster.swift.common import utils, Glusterfs -from gluster.swift.common.exceptions import GlusterFileSystemOSError +from gluster.swift.common.exceptions import GlusterFileSystemOSError,\ +    GlusterFileSystemIOError  from swift.common.exceptions import DiskFileNoSpace  # @@ -712,6 +713,18 @@ class TestUtils(unittest.TestCase):          ret = utils.validate_object(md)          assert ret +    def test_validate_object_with_stat(self): +        md = {utils.X_TIMESTAMP: 'na', +              utils.X_CONTENT_TYPE: 'na', +              utils.X_ETAG: 'bad', +              utils.X_CONTENT_LENGTH: '12345', +              utils.X_TYPE: utils.OBJECT, +              utils.X_OBJECT_TYPE: 'na'} +        fake_stat = Mock(st_size=12346) +        self.assertFalse(utils.validate_object(md, fake_stat)) +        fake_stat = Mock(st_size=12345) +        self.assertTrue(utils.validate_object(md, fake_stat)) +  class TestUtilsDirObjects(unittest.TestCase): @@ -794,7 +807,8 @@ class TestUtilsDirObjects(unittest.TestCase):          def _mock_rm(path):              print "_mock_rm-metadata_enoent(%s)" % path              shutil.rmtree(path) -            raise OSError(errno.ENOENT, os.strerror(errno.ENOENT)) +            raise GlusterFileSystemIOError(errno.ENOENT, +                                           os.strerror(errno.ENOENT))          # Remove the files          for f in self.files: @@ -805,8 +819,8 @@ class TestUtilsDirObjects(unittest.TestCase):          try:              try:                  self.assertTrue(utils.rmobjdir(self.rootdir)) -            except OSError: -                self.fail("Unexpected OSError") +            except IOError: +                self.fail("Unexpected IOError")              else:                  pass          finally:  | 
