# Copyright (c) 2012 Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. """ Tests for gluster.swift.common.DiskFile """ import os import stat import errno import unittest import tempfile import shutil from hashlib import md5 from swift.common.utils import normalize_timestamp from swift.common.exceptions import DiskFileNotExist, DiskFileError import gluster.swift.common.DiskFile import gluster.swift.common.utils from gluster.swift.common.DiskFile import Gluster_DiskFile, \ AlreadyExistsAsDir from gluster.swift.common.utils import DEFAULT_UID, DEFAULT_GID, X_TYPE, \ X_OBJECT_TYPE from test_utils import _initxattr, _destroyxattr from test.unit import FakeLogger _metadata = {} def _mock_read_metadata(filename): if filename in _metadata: md = _metadata[filename] else: md = {} return md def _mock_write_metadata(filename, metadata): _metadata[filename] = metadata def _mock_clear_metadata(): _metadata = {} class MockException(Exception): pass def _mock_rmobjdir(p): raise MockException("gluster.swift.common.DiskFile.rmobjdir() called") def _mock_do_fsync(fd): return def _mock_os_unlink_eacces_err(f): ose = OSError() ose.errno = errno.EACCES raise ose def _mock_getsize_eaccess_err(f): ose = OSError() ose.errno = errno.EACCES raise ose def _mock_do_rmdir_eacces_err(f): ose = OSError() ose.errno = errno.EACCES raise ose class MockRenamerCalled(Exception): pass def _mock_renamer(a, b): raise MockRenamerCalled() class TestDiskFile(unittest.TestCase): """ Tests for gluster.swift.common.DiskFile """ def setUp(self): self.lg = FakeLogger() _initxattr() _mock_clear_metadata() self._saved_df_wm = gluster.swift.common.DiskFile.write_metadata self._saved_df_rm = gluster.swift.common.DiskFile.read_metadata gluster.swift.common.DiskFile.write_metadata = _mock_write_metadata gluster.swift.common.DiskFile.read_metadata = _mock_read_metadata self._saved_ut_wm = gluster.swift.common.utils.write_metadata self._saved_ut_rm = gluster.swift.common.utils.read_metadata gluster.swift.common.utils.write_metadata = _mock_write_metadata gluster.swift.common.utils.read_metadata = _mock_read_metadata self._saved_do_fsync = gluster.swift.common.DiskFile.do_fsync gluster.swift.common.DiskFile.do_fsync = _mock_do_fsync def tearDown(self): self.lg = None _destroyxattr() gluster.swift.common.DiskFile.write_metadata = self._saved_df_wm gluster.swift.common.DiskFile.read_metadata = self._saved_df_rm gluster.swift.common.utils.write_metadata = self._saved_ut_wm gluster.swift.common.utils.read_metadata = self._saved_ut_rm gluster.swift.common.DiskFile.do_fsync = self._saved_do_fsync def test_constructor_no_slash(self): assert not os.path.exists("/tmp/foo") gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", "z", self.lg) assert gdf._obj == "z" assert gdf._obj_path == "" assert gdf.name == "bar" assert gdf.datadir == "/tmp/foo/vol0/bar" assert gdf.device_path == "/tmp/foo/vol0" assert gdf._container_path == "/tmp/foo/vol0/bar" assert gdf.disk_chunk_size == 65536 assert gdf.iter_hook == None assert gdf.logger == self.lg assert gdf.uid == DEFAULT_UID assert gdf.gid == DEFAULT_GID assert gdf.metadata == {} assert gdf.meta_file == None assert gdf.data_file == None assert gdf.fp == None assert gdf.iter_etag == None assert not gdf.started_at_0 assert not gdf.read_to_eof assert gdf.quarantined_dir == None assert not gdf.keep_cache assert not gdf._is_dir def test_constructor_leadtrail_slash(self): assert not os.path.exists("/tmp/foo") gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", "/b/a/z/", self.lg) assert gdf._obj == "z" assert gdf._obj_path == "b/a" assert gdf.name == "bar/b/a" assert gdf.datadir == "/tmp/foo/vol0/bar/b/a" assert gdf.device_path == "/tmp/foo/vol0" def test_constructor_no_metadata(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_file = os.path.join(the_path, "z") try: os.makedirs(the_path) with open(the_file, "wb") as fd: fd.write("1234") stats = os.stat(the_file) ts = normalize_timestamp(stats.st_ctime) etag = md5() etag.update("1234") etag = etag.hexdigest() exp_md = { 'Content-Length': 4, 'ETag': etag, 'X-Timestamp': ts, 'Content-Type': 'application/octet-stream'} gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "z", self.lg) assert gdf._obj == "z" assert gdf.data_file == the_file assert not gdf._is_dir assert gdf.fp is None assert gdf.metadata == exp_md finally: shutil.rmtree(td) def test_constructor_existing_metadata(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_file = os.path.join(the_path, "z") try: os.makedirs(the_path) with open(the_file, "wb") as fd: fd.write("1234") ini_md = { 'X-Type': 'Object', 'X-Object-Type': 'file', 'Content-Length': 5, 'ETag': 'etag', 'X-Timestamp': 'ts', 'Content-Type': 'application/loctet-stream'} _metadata[the_file] = ini_md exp_md = ini_md.copy() del exp_md['X-Type'] del exp_md['X-Object-Type'] gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "z", self.lg) assert gdf._obj == "z" assert gdf.data_file == the_file assert not gdf._is_dir assert gdf.fp is None assert gdf.metadata == exp_md finally: shutil.rmtree(td) def test_constructor_invalid_existing_metadata(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_file = os.path.join(the_path, "z") inv_md = { 'Content-Length': 5, 'ETag': 'etag', 'X-Timestamp': 'ts', 'Content-Type': 'application/loctet-stream'} _metadata[the_file] = inv_md try: os.makedirs(the_path) with open(the_file, "wb") as fd: fd.write("1234") gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "z", self.lg) assert gdf._obj == "z" assert gdf.data_file == the_file assert not gdf._is_dir assert gdf.fp is None assert gdf.metadata != inv_md finally: shutil.rmtree(td) def test_constructor_isdir(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_dir = os.path.join(the_path, "d") try: os.makedirs(the_dir) ini_md = { 'X-Type': 'Object', 'X-Object-Type': 'dir', 'Content-Length': 5, 'ETag': 'etag', 'X-Timestamp': 'ts', 'Content-Type': 'application/loctet-stream'} _metadata[the_dir] = ini_md exp_md = ini_md.copy() del exp_md['X-Type'] del exp_md['X-Object-Type'] gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "d", self.lg, keep_data_fp=True) assert gdf._obj == "d" assert gdf.data_file == the_dir assert gdf._is_dir assert gdf.fp is None assert gdf.metadata == exp_md finally: shutil.rmtree(td) def test_constructor_keep_data_fp(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_file = os.path.join(the_path, "z") try: os.makedirs(the_path) with open(the_file, "wb") as fd: fd.write("1234") gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "z", self.lg, keep_data_fp=True) assert gdf._obj == "z" assert gdf.data_file == the_file assert not gdf._is_dir assert gdf.fp is not None finally: shutil.rmtree(td) def test_constructor_chunk_size(self): assert not os.path.exists("/tmp/foo") gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", "z", self.lg, disk_chunk_size=8192) assert gdf.disk_chunk_size == 8192 def test_constructor_iter_hook(self): assert not os.path.exists("/tmp/foo") gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", "z", self.lg, iter_hook='hook') assert gdf.iter_hook == 'hook' def test_close(self): assert not os.path.exists("/tmp/foo") gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", "z", self.lg) # Should be a no-op, as by default is_dir is False, but fp is None gdf.close() gdf._is_dir = True gdf.fp = "123" # Should still be a no-op as is_dir is True (marker directory) gdf.close() assert gdf.fp == "123" gdf._is_dir = False saved_dc = gluster.swift.common.DiskFile.do_close self.called = False def our_do_close(fp): self.called = True gluster.swift.common.DiskFile.do_close = our_do_close try: gdf.close() assert self.called assert gdf.fp is None finally: gluster.swift.common.DiskFile.do_close = saved_dc def test_is_deleted(self): assert not os.path.exists("/tmp/foo") gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", "z", self.lg) assert gdf.is_deleted() gdf.data_file = "/tmp/foo/bar" assert not gdf.is_deleted() def test_create_dir_object(self): td = tempfile.mkdtemp() the_dir = os.path.join(td, "vol0", "bar", "dir") try: gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "dir/z", self.lg) # Not created, dir object path is different, just checking assert gdf._obj == "z" gdf._create_dir_object(the_dir) assert os.path.isdir(the_dir) assert the_dir in _metadata finally: shutil.rmtree(td) def test_create_dir_object_exists(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_dir = os.path.join(the_path, "dir") try: os.makedirs(the_path) with open(the_dir, "wb") as fd: fd.write("1234") gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "dir/z", self.lg) # Not created, dir object path is different, just checking assert gdf._obj == "z" def _mock_do_chown(p, u, g): assert u == DEFAULT_UID assert g == DEFAULT_GID dc = gluster.swift.common.DiskFile.do_chown gluster.swift.common.DiskFile.do_chown = _mock_do_chown self.assertRaises(DiskFileError, gdf._create_dir_object, the_dir) gluster.swift.common.DiskFile.do_chown = dc self.assertFalse(os.path.isdir(the_dir)) self.assertFalse(the_dir in _metadata) finally: shutil.rmtree(td) def test_put_metadata(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_dir = os.path.join(the_path, "z") try: os.makedirs(the_dir) gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "z", self.lg) md = { 'Content-Type': 'application/octet-stream', 'a': 'b' } gdf.put_metadata(md.copy()) assert gdf.metadata == md, "gdf.metadata = %r, md = %r" % (gdf.metadata, md) assert _metadata[the_dir] == md finally: shutil.rmtree(td) def test_put_w_tombstone(self): assert not os.path.exists("/tmp/foo") gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", "z", self.lg) assert gdf.metadata == {} gdf.put_metadata({'x': '1'}, tombstone=True) assert gdf.metadata == {} def test_put_w_meta_file(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_file = os.path.join(the_path, "z") try: os.makedirs(the_path) with open(the_file, "wb") as fd: fd.write("1234") gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "z", self.lg) newmd = gdf.metadata.copy() newmd['X-Object-Meta-test'] = '1234' gdf.put_metadata(newmd) assert gdf.metadata == newmd assert _metadata[the_file] == newmd finally: shutil.rmtree(td) def test_put_w_meta_file_no_content_type(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_file = os.path.join(the_path, "z") try: os.makedirs(the_path) with open(the_file, "wb") as fd: fd.write("1234") gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "z", self.lg) newmd = gdf.metadata.copy() newmd['Content-Type'] = '' newmd['X-Object-Meta-test'] = '1234' gdf.put_metadata(newmd) assert gdf.metadata == newmd assert _metadata[the_file] == newmd finally: shutil.rmtree(td) def test_put_w_meta_dir(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_dir = os.path.join(the_path, "dir") try: os.makedirs(the_dir) gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "dir", self.lg) newmd = gdf.metadata.copy() newmd['X-Object-Meta-test'] = '1234' gdf.put_metadata(newmd) assert gdf.metadata == newmd assert _metadata[the_dir] == newmd finally: shutil.rmtree(td) def test_put_w_marker_dir(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_dir = os.path.join(the_path, "dir") try: os.makedirs(the_dir) gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "dir", self.lg) newmd = gdf.metadata.copy() newmd['X-Object-Meta-test'] = '1234' gdf.put_metadata(newmd) assert gdf.metadata == newmd assert _metadata[the_dir] == newmd finally: shutil.rmtree(td) def test_put_w_marker_dir_create(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_dir = os.path.join(the_path, "dir") try: gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "dir", self.lg) assert gdf.metadata == {} newmd = { 'Content-Length': 0, 'ETag': 'etag', 'X-Timestamp': 'ts', 'Content-Type': 'application/directory'} gdf.put(None, newmd, extension='.dir') assert gdf.data_file == the_dir assert gdf.metadata == newmd assert _metadata[the_dir] == newmd finally: shutil.rmtree(td) def test_put_is_dir(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_dir = os.path.join(the_path, "dir") try: os.makedirs(the_dir) gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "dir", self.lg) origmd = gdf.metadata.copy() origfmd = _metadata[the_dir] newmd = gdf.metadata.copy() # FIXME: This is a hack to get to the code-path; it is not clear # how this can happen normally. newmd['Content-Type'] = '' newmd['X-Object-Meta-test'] = '1234' try: gdf.put(None, newmd, extension='.data') except AlreadyExistsAsDir: pass else: self.fail("Expected to encounter 'already-exists-as-dir' exception") assert gdf.metadata == origmd assert _metadata[the_dir] == origfmd finally: shutil.rmtree(td) def test_put(self): td = tempfile.mkdtemp() try: gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "z", self.lg) assert gdf._obj == "z" assert gdf._obj_path == "" assert gdf.name == "bar" assert gdf.datadir == os.path.join(td, "vol0", "bar") assert gdf.data_file is None body = '1234\n' etag = md5() etag.update(body) etag = etag.hexdigest() metadata = { 'X-Timestamp': '1234', 'Content-Type': 'file', 'ETag': etag, 'Content-Length': '5', } with gdf.mkstemp() as fd: assert gdf.tmppath is not None tmppath = gdf.tmppath os.write(fd, body) gdf.put(fd, metadata) assert gdf.data_file == os.path.join(td, "vol0", "bar", "z") assert os.path.exists(gdf.data_file) assert not os.path.exists(tmppath) finally: shutil.rmtree(td) def test_put_obj_path(self): the_obj_path = os.path.join("b", "a") the_file = os.path.join(the_obj_path, "z") td = tempfile.mkdtemp() try: gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", the_file, self.lg) assert gdf._obj == "z" assert gdf._obj_path == the_obj_path assert gdf.name == os.path.join("bar", "b", "a") assert gdf.datadir == os.path.join(td, "vol0", "bar", "b", "a") assert gdf.data_file is None body = '1234\n' etag = md5() etag.update(body) etag = etag.hexdigest() metadata = { 'X-Timestamp': '1234', 'Content-Type': 'file', 'ETag': etag, 'Content-Length': '5', } with gdf.mkstemp() as fd: assert gdf.tmppath is not None tmppath = gdf.tmppath os.write(fd, body) gdf.put(fd, metadata) assert gdf.data_file == os.path.join(td, "vol0", "bar", "b", "a", "z") assert os.path.exists(gdf.data_file) assert not os.path.exists(tmppath) finally: shutil.rmtree(td) def test_unlinkold_no_metadata(self): assert not os.path.exists("/tmp/foo") gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", "z", self.lg) assert gdf.metadata == {} _saved_rmobjdir = gluster.swift.common.DiskFile.rmobjdir gluster.swift.common.DiskFile.rmobjdir = _mock_rmobjdir try: gdf.unlinkold(None) except MockException as exp: self.fail(str(exp)) finally: gluster.swift.common.DiskFile.rmobjdir = _saved_rmobjdir def test_unlinkold_same_timestamp(self): assert not os.path.exists("/tmp/foo") gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", "z", self.lg) assert gdf.metadata == {} gdf.metadata['X-Timestamp'] = 1 _saved_rmobjdir = gluster.swift.common.DiskFile.rmobjdir gluster.swift.common.DiskFile.rmobjdir = _mock_rmobjdir try: gdf.unlinkold(1) except MockException as exp: self.fail(str(exp)) finally: gluster.swift.common.DiskFile.rmobjdir = _saved_rmobjdir def test_unlinkold_file(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_file = os.path.join(the_path, "z") try: os.makedirs(the_path) with open(the_file, "wb") as fd: fd.write("1234") gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "z", self.lg) assert gdf._obj == "z" assert gdf.data_file == the_file assert not gdf._is_dir later = float(gdf.metadata['X-Timestamp']) + 1 gdf.unlinkold(normalize_timestamp(later)) assert os.path.isdir(gdf.datadir) assert not os.path.exists(os.path.join(gdf.datadir, gdf._obj)) finally: shutil.rmtree(td) def test_unlinkold_file_not_found(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_file = os.path.join(the_path, "z") try: os.makedirs(the_path) with open(the_file, "wb") as fd: fd.write("1234") gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "z", self.lg) assert gdf._obj == "z" assert gdf.data_file == the_file assert not gdf._is_dir # Handle the case the file is not in the directory listing. os.unlink(the_file) later = float(gdf.metadata['X-Timestamp']) + 1 gdf.unlinkold(normalize_timestamp(later)) assert os.path.isdir(gdf.datadir) assert not os.path.exists(os.path.join(gdf.datadir, gdf._obj)) finally: shutil.rmtree(td) def test_unlinkold_file_unlink_error(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_file = os.path.join(the_path, "z") try: os.makedirs(the_path) with open(the_file, "wb") as fd: fd.write("1234") gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "z", self.lg) assert gdf._obj == "z" assert gdf.data_file == the_file assert not gdf._is_dir later = float(gdf.metadata['X-Timestamp']) + 1 stats = os.stat(the_path) os.chmod(the_path, stats.st_mode & (~stat.S_IWUSR)) # Handle the case os_unlink() raises an OSError __os_unlink = os.unlink os.unlink = _mock_os_unlink_eacces_err try: gdf.unlinkold(normalize_timestamp(later)) except OSError as e: assert e.errno == errno.EACCES else: self.fail("Excepted an OSError when unlinking file") finally: os.unlink = __os_unlink os.chmod(the_path, stats.st_mode) assert os.path.isdir(gdf.datadir) assert os.path.exists(os.path.join(gdf.datadir, gdf._obj)) finally: shutil.rmtree(td) def test_unlinkold_is_dir(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_dir = os.path.join(the_path, "d") try: os.makedirs(the_dir) gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "d", self.lg, keep_data_fp=True) assert gdf.data_file == the_dir assert gdf._is_dir later = float(gdf.metadata['X-Timestamp']) + 1 gdf.unlinkold(normalize_timestamp(later)) assert os.path.isdir(gdf.datadir) assert not os.path.exists(os.path.join(gdf.datadir, gdf._obj)) finally: shutil.rmtree(td) def test_unlinkold_is_dir_failure(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_dir = os.path.join(the_path, "d") try: os.makedirs(the_dir) gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "d", self.lg, keep_data_fp=True) assert gdf.data_file == the_dir assert gdf._is_dir stats = os.stat(gdf.datadir) os.chmod(gdf.datadir, 0) __os_rmdir = os.rmdir os.rmdir = _mock_do_rmdir_eacces_err try: later = float(gdf.metadata['X-Timestamp']) + 1 gdf.unlinkold(normalize_timestamp(later)) finally: os.chmod(gdf.datadir, stats.st_mode) os.rmdir = __os_rmdir assert os.path.isdir(gdf.datadir) self.assertTrue(gdf.data_file is None) finally: shutil.rmtree(td) def test_get_data_file_size(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_file = os.path.join(the_path, "z") try: os.makedirs(the_path) with open(the_file, "wb") as fd: fd.write("1234") gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "z", self.lg) assert gdf._obj == "z" assert gdf.data_file == the_file assert not gdf._is_dir assert 4 == gdf.get_data_file_size() finally: shutil.rmtree(td) def test_get_data_file_size(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_file = os.path.join(the_path, "z") try: os.makedirs(the_path) with open(the_file, "wb") as fd: fd.write("1234") gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "z", self.lg) assert gdf._obj == "z" assert gdf.data_file == the_file assert not gdf._is_dir assert 4 == gdf.metadata['Content-Length'] gdf.metadata['Content-Length'] = 3 assert 4 == gdf.get_data_file_size() assert 4 == gdf.metadata['Content-Length'] finally: shutil.rmtree(td) def test_get_data_file_size_dne(self): assert not os.path.exists("/tmp/foo") gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", "/b/a/z/", self.lg) try: s = gdf.get_data_file_size() except DiskFileNotExist: pass else: self.fail("Expected DiskFileNotExist exception") def test_get_data_file_size_dne_os_err(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_file = os.path.join(the_path, "z") try: os.makedirs(the_path) with open(the_file, "wb") as fd: fd.write("1234") gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "z", self.lg) assert gdf._obj == "z" assert gdf.data_file == the_file assert not gdf._is_dir gdf.data_file = gdf.data_file + ".dne" try: s = gdf.get_data_file_size() except DiskFileNotExist: pass else: self.fail("Expected DiskFileNotExist exception") finally: shutil.rmtree(td) def test_get_data_file_size_os_err(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_file = os.path.join(the_path, "z") try: os.makedirs(the_path) with open(the_file, "wb") as fd: fd.write("1234") gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "z", self.lg) assert gdf._obj == "z" assert gdf.data_file == the_file assert not gdf._is_dir stats = os.stat(the_path) os.chmod(the_path, 0) __os_path_getsize = os.path.getsize os.path.getsize = _mock_getsize_eaccess_err try: s = gdf.get_data_file_size() except OSError as err: assert err.errno == errno.EACCES else: self.fail("Expected OSError exception") finally: os.path.getsize = __os_path_getsize os.chmod(the_path, stats.st_mode) finally: shutil.rmtree(td) def test_get_data_file_size_dir(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_dir = os.path.join(the_path, "d") try: os.makedirs(the_dir) gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "d", self.lg, keep_data_fp=True) assert gdf._obj == "d" assert gdf.data_file == the_dir assert gdf._is_dir assert 0 == gdf.get_data_file_size() finally: shutil.rmtree(td) def test_filter_metadata(self): assert not os.path.exists("/tmp/foo") gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", "z", self.lg) assert gdf.metadata == {} gdf.filter_metadata() assert gdf.metadata == {} gdf.metadata[X_TYPE] = 'a' gdf.metadata[X_OBJECT_TYPE] = 'b' gdf.metadata['foobar'] = 'c' gdf.filter_metadata() assert X_TYPE not in gdf.metadata assert X_OBJECT_TYPE not in gdf.metadata assert 'foobar' in gdf.metadata def test_mkstemp(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_dir = os.path.join(the_path, "dir") try: gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "dir/z", self.lg) saved_tmppath = '' with gdf.mkstemp() as fd: assert gdf.datadir == os.path.join(td, "vol0", "bar", "dir") assert os.path.isdir(gdf.datadir) saved_tmppath = gdf.tmppath assert os.path.dirname(saved_tmppath) == gdf.datadir assert os.path.basename(saved_tmppath)[:3] == '.z.' assert os.path.exists(saved_tmppath) os.write(fd, "123") # At the end of previous with block a close on fd is called. # Calling os.close on the same fd will raise an OSError # exception and we must catch it. try: os.close(fd) except OSError as err: pass else: self.fail("Exception expected") assert not os.path.exists(saved_tmppath) finally: shutil.rmtree(td) def test_mkstemp_err_on_close(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_dir = os.path.join(the_path, "dir") try: gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "dir/z", self.lg) saved_tmppath = '' with gdf.mkstemp() as fd: assert gdf.datadir == os.path.join(td, "vol0", "bar", "dir") assert os.path.isdir(gdf.datadir) saved_tmppath = gdf.tmppath assert os.path.dirname(saved_tmppath) == gdf.datadir assert os.path.basename(saved_tmppath)[:3] == '.z.' assert os.path.exists(saved_tmppath) os.write(fd, "123") # Closing the fd prematurely should not raise any exceptions. os.close(fd) assert not os.path.exists(saved_tmppath) finally: shutil.rmtree(td) def test_mkstemp_err_on_unlink(self): td = tempfile.mkdtemp() the_path = os.path.join(td, "vol0", "bar") the_dir = os.path.join(the_path, "dir") try: gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", "dir/z", self.lg) saved_tmppath = '' with gdf.mkstemp() as fd: assert gdf.datadir == os.path.join(td, "vol0", "bar", "dir") assert os.path.isdir(gdf.datadir) saved_tmppath = gdf.tmppath assert os.path.dirname(saved_tmppath) == gdf.datadir assert os.path.basename(saved_tmppath)[:3] == '.z.' assert os.path.exists(saved_tmppath) os.write(fd, "123") os.unlink(saved_tmppath) assert not os.path.exists(saved_tmppath) finally: shutil.rmtree(td)