summaryrefslogtreecommitdiffstats
path: root/test/unit/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/unit/__init__.py')
-rw-r--r--test/unit/__init__.py103
1 files changed, 85 insertions, 18 deletions
diff --git a/test/unit/__init__.py b/test/unit/__init__.py
index 04895b4..76e09c1 100644
--- a/test/unit/__init__.py
+++ b/test/unit/__init__.py
@@ -1,3 +1,18 @@
+# Copyright (c) 2010-2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
""" Swift tests """
import os
@@ -8,6 +23,7 @@ from sys import exc_info
from contextlib import contextmanager
from collections import defaultdict
from tempfile import NamedTemporaryFile
+import time
from eventlet.green import socket
from tempfile import mkdtemp
from shutil import rmtree
@@ -142,13 +158,24 @@ def tmpfile(content):
xattr_data = {}
-def _get_inode(fd):
- if not isinstance(fd, int):
- try:
- fd = fd.fileno()
- except AttributeError:
- return os.stat(fd).st_ino
- return os.fstat(fd).st_ino
+def _get_inode(fd_or_name):
+ try:
+ if isinstance(fd_or_name, int):
+ fd = fd_or_name
+ else:
+ try:
+ fd = fd_or_name.fileno()
+ except AttributeError:
+ fd = None
+ if fd is None:
+ ino = os.stat(fd_or_name).st_ino
+ else:
+ ino = os.fstat(fd).st_ino
+ except OSError as err:
+ ioerr = IOError()
+ ioerr.errno = err.errno
+ raise ioerr
+ return ino
def _setxattr(fd, k, v):
@@ -199,27 +226,57 @@ class NullLoggingHandler(logging.Handler):
pass
-class FakeLogger(object):
+class UnmockTimeModule(object):
+ """
+ Even if a test mocks time.time - you can restore unmolested behavior in a
+ another module who imports time directly by monkey patching it's imported
+ reference to the module with an instance of this class
+ """
+
+ _orig_time = time.time
+
+ def __getattribute__(self, name):
+ if name == 'time':
+ return UnmockTimeModule._orig_time
+ return getattr(time, name)
+
+
+# logging.LogRecord.__init__ calls time.time
+logging.time = UnmockTimeModule()
+
+
+class FakeLogger(logging.Logger):
# a thread safe logger
def __init__(self, *args, **kwargs):
self._clear()
+ self.name = 'swift.unit.fake_logger'
self.level = logging.NOTSET
if 'facility' in kwargs:
self.facility = kwargs['facility']
def _clear(self):
self.log_dict = defaultdict(list)
+ self.lines_dict = defaultdict(list)
def _store_in(store_name):
def stub_fn(self, *args, **kwargs):
self.log_dict[store_name].append((args, kwargs))
return stub_fn
- error = _store_in('error')
- info = _store_in('info')
- warning = _store_in('warning')
- debug = _store_in('debug')
+ def _store_and_log_in(store_name):
+ def stub_fn(self, *args, **kwargs):
+ self.log_dict[store_name].append((args, kwargs))
+ self._log(store_name, args[0], args[1:], **kwargs)
+ return stub_fn
+
+ def get_lines_for_level(self, level):
+ return self.lines_dict[level]
+
+ error = _store_and_log_in('error')
+ info = _store_and_log_in('info')
+ warning = _store_and_log_in('warning')
+ debug = _store_and_log_in('debug')
def exception(self, *args, **kwargs):
self.log_dict['exception'].append((args, kwargs, str(exc_info()[1])))
@@ -267,7 +324,13 @@ class FakeLogger(object):
pass
def handle(self, record):
- pass
+ try:
+ line = record.getMessage()
+ except TypeError:
+ print 'WARNING: unable to format log message %r %% %r' % (
+ record.msg, record.args)
+ raise
+ self.lines_dict[record.levelno].append(line)
def flush(self):
pass
@@ -354,11 +417,13 @@ def mock(update):
else:
deletes.append((module, attr))
setattr(module, attr, value)
- yield True
- for module, attr, value in returns:
- setattr(module, attr, value)
- for module, attr in deletes:
- delattr(module, attr)
+ try:
+ yield True
+ finally:
+ for module, attr, value in returns:
+ setattr(module, attr, value)
+ for module, attr in deletes:
+ delattr(module, attr)
def fake_http_connect(*code_iter, **kwargs):
@@ -466,6 +531,8 @@ def fake_http_connect(*code_iter, **kwargs):
body_iter = iter(body_iter)
def connect(*args, **ckwargs):
+ if kwargs.get('slow_connect', False):
+ sleep(0.1)
if 'give_content_type' in kwargs:
if len(args) >= 7 and 'Content-Type' in args[6]:
kwargs['give_content_type'](args[6]['Content-Type'])