From 6a8e9a70e9489a8f17405adf64462899d6a4ca81 Mon Sep 17 00:00:00 2001 From: Luis Pabon Date: Fri, 10 Jan 2014 19:44:39 -0500 Subject: Sync with OpenStack v1.11.0 Jan 10 2014 Updated tox.ini, functional tests, and proxy unit tests. BUG: https://bugs.launchpad.net/bugs/1268017 Change-Id: I5ff8359b8abdb8fe5ae82492c12f57c395992735 Signed-off-by: Luis Pabon Reviewed-on: http://review.gluster.org/6682 Reviewed-by: Thiago da Silva Tested-by: Thiago da Silva --- test/unit/proxy/controllers/test_info.py | 293 +++++++++++++++++++++++++++++++ 1 file changed, 293 insertions(+) create mode 100644 test/unit/proxy/controllers/test_info.py (limited to 'test/unit/proxy/controllers/test_info.py') diff --git a/test/unit/proxy/controllers/test_info.py b/test/unit/proxy/controllers/test_info.py new file mode 100644 index 0000000..f33beba --- /dev/null +++ b/test/unit/proxy/controllers/test_info.py @@ -0,0 +1,293 @@ +# Copyright (c) 2010-2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import time +from mock import Mock + +from swift.proxy.controllers import InfoController +from swift.proxy.server import Application as ProxyApp +from swift.common import utils +from swift.common.utils import json +from swift.common.swob import Request, HTTPException + + +class TestInfoController(unittest.TestCase): + + def setUp(self): + utils._swift_info = {} + utils._swift_admin_info = {} + + def get_controller(self, expose_info=None, disallowed_sections=None, + admin_key=None): + disallowed_sections = disallowed_sections or [] + + app = Mock(spec=ProxyApp) + return InfoController(app, None, expose_info, + disallowed_sections, admin_key) + + def start_response(self, status, headers): + self.got_statuses.append(status) + for h in headers: + self.got_headers.append({h[0]: h[1]}) + + def test_disabled_info(self): + controller = self.get_controller(expose_info=False) + + req = Request.blank( + '/info', environ={'REQUEST_METHOD': 'GET'}) + resp = controller.GET(req) + self.assertTrue(isinstance(resp, HTTPException)) + self.assertEqual('403 Forbidden', str(resp)) + + def test_get_info(self): + controller = self.get_controller(expose_info=True) + utils._swift_info = {'foo': {'bar': 'baz'}} + utils._swift_admin_info = {'qux': {'quux': 'corge'}} + + req = Request.blank( + '/info', environ={'REQUEST_METHOD': 'GET'}) + resp = controller.GET(req) + self.assertTrue(isinstance(resp, HTTPException)) + self.assertEqual('200 OK', str(resp)) + info = json.loads(resp.body) + self.assertTrue('admin' not in info) + self.assertTrue('foo' in info) + self.assertTrue('bar' in info['foo']) + self.assertEqual(info['foo']['bar'], 'baz') + + def test_options_info(self): + controller = self.get_controller(expose_info=True) + + req = Request.blank( + '/info', environ={'REQUEST_METHOD': 'GET'}) + resp = controller.OPTIONS(req) + self.assertTrue(isinstance(resp, HTTPException)) + self.assertEqual('200 OK', str(resp)) + self.assertTrue('Allow' in resp.headers) + + def test_get_info_cors(self): + controller = self.get_controller(expose_info=True) + utils._swift_info = {'foo': {'bar': 'baz'}} + utils._swift_admin_info = {'qux': {'quux': 'corge'}} + + req = Request.blank( + '/info', environ={'REQUEST_METHOD': 'GET'}, + headers={'Origin': 'http://example.com'}) + resp = controller.GET(req) + self.assertTrue(isinstance(resp, HTTPException)) + self.assertEqual('200 OK', str(resp)) + info = json.loads(resp.body) + self.assertTrue('admin' not in info) + self.assertTrue('foo' in info) + self.assertTrue('bar' in info['foo']) + self.assertEqual(info['foo']['bar'], 'baz') + self.assertTrue('Access-Control-Allow-Origin' in resp.headers) + self.assertTrue('Access-Control-Expose-Headers' in resp.headers) + + def test_head_info(self): + controller = self.get_controller(expose_info=True) + utils._swift_info = {'foo': {'bar': 'baz'}} + utils._swift_admin_info = {'qux': {'quux': 'corge'}} + + req = Request.blank( + '/info', environ={'REQUEST_METHOD': 'HEAD'}) + resp = controller.HEAD(req) + self.assertTrue(isinstance(resp, HTTPException)) + self.assertEqual('200 OK', str(resp)) + + def test_disallow_info(self): + controller = self.get_controller(expose_info=True, + disallowed_sections=['foo2']) + utils._swift_info = {'foo': {'bar': 'baz'}, + 'foo2': {'bar2': 'baz2'}} + utils._swift_admin_info = {'qux': {'quux': 'corge'}} + + req = Request.blank( + '/info', environ={'REQUEST_METHOD': 'GET'}) + resp = controller.GET(req) + self.assertTrue(isinstance(resp, HTTPException)) + self.assertEqual('200 OK', str(resp)) + info = json.loads(resp.body) + self.assertTrue('foo' in info) + self.assertTrue('bar' in info['foo']) + self.assertEqual(info['foo']['bar'], 'baz') + self.assertTrue('foo2' not in info) + + def test_disabled_admin_info(self): + controller = self.get_controller(expose_info=True, admin_key='') + utils._swift_info = {'foo': {'bar': 'baz'}} + utils._swift_admin_info = {'qux': {'quux': 'corge'}} + + expires = int(time.time() + 86400) + sig = utils.get_hmac('GET', '/info', expires, '') + path = '/info?swiftinfo_sig={sig}&swiftinfo_expires={expires}'.format( + sig=sig, expires=expires) + req = Request.blank( + path, environ={'REQUEST_METHOD': 'GET'}) + resp = controller.GET(req) + self.assertTrue(isinstance(resp, HTTPException)) + self.assertEqual('403 Forbidden', str(resp)) + + def test_get_admin_info(self): + controller = self.get_controller(expose_info=True, + admin_key='secret-admin-key') + utils._swift_info = {'foo': {'bar': 'baz'}} + utils._swift_admin_info = {'qux': {'quux': 'corge'}} + + expires = int(time.time() + 86400) + sig = utils.get_hmac('GET', '/info', expires, 'secret-admin-key') + path = '/info?swiftinfo_sig={sig}&swiftinfo_expires={expires}'.format( + sig=sig, expires=expires) + req = Request.blank( + path, environ={'REQUEST_METHOD': 'GET'}) + resp = controller.GET(req) + self.assertTrue(isinstance(resp, HTTPException)) + self.assertEqual('200 OK', str(resp)) + info = json.loads(resp.body) + self.assertTrue('admin' in info) + self.assertTrue('qux' in info['admin']) + self.assertTrue('quux' in info['admin']['qux']) + self.assertEqual(info['admin']['qux']['quux'], 'corge') + + def test_head_admin_info(self): + controller = self.get_controller(expose_info=True, + admin_key='secret-admin-key') + utils._swift_info = {'foo': {'bar': 'baz'}} + utils._swift_admin_info = {'qux': {'quux': 'corge'}} + + expires = int(time.time() + 86400) + sig = utils.get_hmac('GET', '/info', expires, 'secret-admin-key') + path = '/info?swiftinfo_sig={sig}&swiftinfo_expires={expires}'.format( + sig=sig, expires=expires) + req = Request.blank( + path, environ={'REQUEST_METHOD': 'HEAD'}) + resp = controller.GET(req) + self.assertTrue(isinstance(resp, HTTPException)) + self.assertEqual('200 OK', str(resp)) + + expires = int(time.time() + 86400) + sig = utils.get_hmac('HEAD', '/info', expires, 'secret-admin-key') + path = '/info?swiftinfo_sig={sig}&swiftinfo_expires={expires}'.format( + sig=sig, expires=expires) + req = Request.blank( + path, environ={'REQUEST_METHOD': 'HEAD'}) + resp = controller.GET(req) + self.assertTrue(isinstance(resp, HTTPException)) + self.assertEqual('200 OK', str(resp)) + + def test_get_admin_info_invalid_method(self): + controller = self.get_controller(expose_info=True, + admin_key='secret-admin-key') + utils._swift_info = {'foo': {'bar': 'baz'}} + utils._swift_admin_info = {'qux': {'quux': 'corge'}} + + expires = int(time.time() + 86400) + sig = utils.get_hmac('HEAD', '/info', expires, 'secret-admin-key') + path = '/info?swiftinfo_sig={sig}&swiftinfo_expires={expires}'.format( + sig=sig, expires=expires) + req = Request.blank( + path, environ={'REQUEST_METHOD': 'GET'}) + resp = controller.GET(req) + self.assertTrue(isinstance(resp, HTTPException)) + self.assertEqual('401 Unauthorized', str(resp)) + + def test_get_admin_info_invalid_expires(self): + controller = self.get_controller(expose_info=True, + admin_key='secret-admin-key') + utils._swift_info = {'foo': {'bar': 'baz'}} + utils._swift_admin_info = {'qux': {'quux': 'corge'}} + + expires = 1 + sig = utils.get_hmac('GET', '/info', expires, 'secret-admin-key') + path = '/info?swiftinfo_sig={sig}&swiftinfo_expires={expires}'.format( + sig=sig, expires=expires) + req = Request.blank( + path, environ={'REQUEST_METHOD': 'GET'}) + resp = controller.GET(req) + self.assertTrue(isinstance(resp, HTTPException)) + self.assertEqual('401 Unauthorized', str(resp)) + + expires = 'abc' + sig = utils.get_hmac('GET', '/info', expires, 'secret-admin-key') + path = '/info?swiftinfo_sig={sig}&swiftinfo_expires={expires}'.format( + sig=sig, expires=expires) + req = Request.blank( + path, environ={'REQUEST_METHOD': 'GET'}) + resp = controller.GET(req) + self.assertTrue(isinstance(resp, HTTPException)) + self.assertEqual('401 Unauthorized', str(resp)) + + def test_get_admin_info_invalid_path(self): + controller = self.get_controller(expose_info=True, + admin_key='secret-admin-key') + utils._swift_info = {'foo': {'bar': 'baz'}} + utils._swift_admin_info = {'qux': {'quux': 'corge'}} + + expires = int(time.time() + 86400) + sig = utils.get_hmac('GET', '/foo', expires, 'secret-admin-key') + path = '/info?swiftinfo_sig={sig}&swiftinfo_expires={expires}'.format( + sig=sig, expires=expires) + req = Request.blank( + path, environ={'REQUEST_METHOD': 'GET'}) + resp = controller.GET(req) + self.assertTrue(isinstance(resp, HTTPException)) + self.assertEqual('401 Unauthorized', str(resp)) + + def test_get_admin_info_invalid_key(self): + controller = self.get_controller(expose_info=True, + admin_key='secret-admin-key') + utils._swift_info = {'foo': {'bar': 'baz'}} + utils._swift_admin_info = {'qux': {'quux': 'corge'}} + + expires = int(time.time() + 86400) + sig = utils.get_hmac('GET', '/foo', expires, 'invalid-admin-key') + path = '/info?swiftinfo_sig={sig}&swiftinfo_expires={expires}'.format( + sig=sig, expires=expires) + req = Request.blank( + path, environ={'REQUEST_METHOD': 'GET'}) + resp = controller.GET(req) + self.assertTrue(isinstance(resp, HTTPException)) + self.assertEqual('401 Unauthorized', str(resp)) + + def test_admin_disallow_info(self): + controller = self.get_controller(expose_info=True, + disallowed_sections=['foo2'], + admin_key='secret-admin-key') + utils._swift_info = {'foo': {'bar': 'baz'}, + 'foo2': {'bar2': 'baz2'}} + utils._swift_admin_info = {'qux': {'quux': 'corge'}} + + expires = int(time.time() + 86400) + sig = utils.get_hmac('GET', '/info', expires, 'secret-admin-key') + path = '/info?swiftinfo_sig={sig}&swiftinfo_expires={expires}'.format( + sig=sig, expires=expires) + req = Request.blank( + path, environ={'REQUEST_METHOD': 'GET'}) + resp = controller.GET(req) + self.assertTrue(isinstance(resp, HTTPException)) + self.assertEqual('200 OK', str(resp)) + info = json.loads(resp.body) + self.assertTrue('foo2' not in info) + self.assertTrue('admin' in info) + self.assertTrue('disallowed_sections' in info['admin']) + self.assertTrue('foo2' in info['admin']['disallowed_sections']) + self.assertTrue('qux' in info['admin']) + self.assertTrue('quux' in info['admin']['qux']) + self.assertEqual(info['admin']['qux']['quux'], 'corge') + + +if __name__ == '__main__': + unittest.main() -- cgit