# Copyright (c) 2016 Red Hat, Inc. # # This file is part of libgfapi-python project which is a # subproject of GlusterFS ( www.gluster.org) # # This file is licensed to you under your choice of the GNU Lesser # General Public License, version 3 or any later version (LGPLv3 or # later), or the GNU General Public License, version 2 (GPLv2), in all # cases as published by the Free Software Foundation. import unittest import os import stat import types import errno import hashlib import threading import uuid from nose import SkipTest from test import get_test_config from ConfigParser import NoSectionError, NoOptionError from uuid import uuid4 from gluster.gfapi.api import Stat from gluster.gfapi import File, Volume, DirEntry from gluster.gfapi.exceptions import LibgfapiException, Error config = get_test_config() if config: try: HOST = config.get('func_test', 'gfs_host') except (NoSectionError, NoOptionError): HOST = 'localhost' try: VOLNAME = config.get('func_test', 'gfs_volname') except (NoSectionError, NoOptionError): VOLNAME = 'test' else: HOST = 'localhost' VOLNAME = 'test' GLUSTERD_SOCK_FILE = "/var/run/glusterd.socket" class BinFileOpsTest(unittest.TestCase): vol = None path = None data = None @classmethod def setUpClass(cls): cls.vol = Volume(HOST, VOLNAME) cls.vol.mount() # Cleanup volume cls.vol.rmtree("/", ignore_errors=True) @classmethod def tearDownClass(cls): cls.vol.rmtree("/", ignore_errors=True) cls.vol = None def test_bin_open_and_read(self): # Write binary data data = "Gluster is so awesome" payload = bytearray(data, "ascii") path = self._testMethodName + ".io" with File(self.vol.open(path, os.O_CREAT | os.O_WRONLY | os.O_EXCL, 0644)) as f: f.write(payload) # Read binary data with File(self.vol.open(path, os.O_RDONLY)) as f: buf = f.read() self.assertEqual(bytearray(buf), payload) self.assertEqual(buf.decode("ascii"), data) class FileOpsTest(unittest.TestCase): vol = None path = None data = None @classmethod def setUpClass(cls): cls.vol = Volume(HOST, VOLNAME) cls.vol.mount() # Cleanup volume cls.vol.rmtree("/", ignore_errors=True) @classmethod def tearDownClass(cls): cls.vol.rmtree("/", ignore_errors=True) cls.vol = None def setUp(self): self.data = "gluster is awesome" self.path = self._testMethodName + ".io" with File(self.vol.open(self.path, os.O_CREAT | os.O_WRONLY | os.O_EXCL, 0644), path=self.path) as f: rc = f.write(self.data) self.assertEqual(rc, len(self.data)) f.fsync() self.assertEqual(f.originalpath, self.path) def tearDown(self): self.path = None self.data = None def test_open_and_read(self): with File(self.vol.open(self.path, os.O_RDONLY)) as f: self.assertTrue(isinstance(f, File)) buf = f.read(len(self.data)) self.assertFalse(isinstance(buf, types.IntType)) self.assertEqual(buf, self.data) def test_open_file_not_exist(self): try: f = File(self.vol.open("filenotexist", os.O_WRONLY)) except OSError as e: self.assertEqual(e.errno, errno.ENOENT) else: f.close() self.fail("Expected a OSError with errno.ENOENT") def test_open_err(self): # flags not int self.assertRaises(TypeError, self.vol.open, "file", 'w') # invalid flags self.assertRaises(OSError, self.vol.open, "file", 12345) def test_double_close(self): name = uuid4().hex f = self.vol.fopen(name, 'w') f.close() for i in range(2): try: f.close() except OSError as err: self.assertEqual(err.errno, errno.EBADF) else: self.fail("Expecting OSError") def test_glfd_decorators_IO_on_invalid_glfd(self): name = uuid4().hex with self.vol.fopen(name, 'w') as f: f.write("Valar Morghulis") try: s = f.read() except OSError as err: self.assertEqual(err.errno, errno.EBADF) else: self.fail("Expecting OSError") def test_fopen_err(self): # mode not string self.assertRaises(TypeError, self.vol.fopen, "file", os.O_WRONLY) # invalid mode self.assertRaises(ValueError, self.vol.fopen, "file", 'x+') # file does not exist try: self.vol.fopen("file", "r") except OSError as err: if err.errno != errno.ENOENT: self.fail("Expecting ENOENT") else: self.fail("Expecting ENOENT") def test_fopen(self): # Default permission should be 0666 name = uuid4().hex data = "Gluster is so awesome" with self.vol.fopen(name, 'w') as f: f.write(data) perms = self.vol.stat(name).st_mode & 0777 self.assertEqual(perms, int(0666)) # 'r': Open file for reading. # If not specified, mode should default to 'r' with self.vol.fopen(name) as f: self.assertEqual('r', f.mode) self.assertEqual(f.lseek(0, os.SEEK_CUR), 0) self.assertEqual(f.read(), data) # 'r+': Open for reading and writing. with self.vol.fopen(name, 'r+') as f: self.assertEqual(f.lseek(0, os.SEEK_CUR), 0) self.assertEqual('r+', f.mode) # ftruncate doesn't (and shouldn't) change offset f.ftruncate(0) # writes should pass f.write(data) f.lseek(0, os.SEEK_SET) self.assertEqual(f.read(), data) # 'w': Truncate file to zero length or create text file for writing. self.assertEqual(self.vol.getsize(name), len(data)) with self.vol.fopen(name, 'w') as f: self.assertEqual('w', f.mode) f.fsync() self.assertEqual(self.vol.getsize(name), 0) f.write(data) # 'w+': Open for reading and writing. The file is created if it does # not exist, otherwise it is truncated. with self.vol.fopen(name, 'w+') as f: self.assertEqual('w+', f.mode) f.fsync() self.assertEqual(self.vol.getsize(name), 0) f.write(data) f.lseek(0, os.SEEK_SET) self.assertEqual(f.read(), data) # 'a': Open for appending (writing at end of file). The file is # created if it does not exist. with self.vol.fopen(name, 'a') as f: self.assertEqual('a', f.mode) # This should be appended at the end f.write("hello") with self.vol.fopen(name) as f: self.assertEqual(f.read(), data + "hello") # 'a+': Open for reading and appending (writing at end of file) with self.vol.fopen(name, 'a+') as f: self.assertEqual('a+', f.mode) # This should be appended at the end f.write(" world") f.fsync() f.lseek(0, os.SEEK_SET) self.assertEqual(f.read(), data + "hello world") def test_fopen_in_thread(self): def gluster_fopen(): name = uuid4().hex with self.vol.fopen(name, 'w') as f: f.write('hello world') # the following caused segfault before the fix thread = threading.Thread(target=gluster_fopen) thread.start() thread.join() def test_create_file_already_exists(self): try: f = File(self.vol.open("newfile", os.O_CREAT)) f.close() g = File(self.vol.open("newfile", os.O_CREAT | os.O_EXCL)) except OSError as e: self.assertEqual(e.errno, errno.EEXIST) else: g.close() self.fail("Expected a OSError with errno.EEXIST") def test_write_file_dup_lseek_read(self): try: f = File(self.vol.open("dune", os.O_CREAT | os.O_EXCL | os.O_RDWR)) f.write("I must not fear. Fear is the mind-killer.") fdup = f.dup() self.assertTrue(isinstance(fdup, File)) f.close() ret = fdup.lseek(0, os.SEEK_SET) self.assertEqual(ret, 0) buf = fdup.read(15) self.assertEqual(buf, "I must not fear") ret = fdup.lseek(29, os.SEEK_SET) self.assertEqual(ret, 29) buf = fdup.read(11) self.assertEqual(buf, "mind-killer") fdup.close() except OSError as e: self.fail(e.message) def test_chmod(self): stat = self.vol.stat(self.path) orig_mode = oct(stat.st_mode & 0777) self.assertEqual(orig_mode, '0644L') self.vol.chmod(self.path, 0600) stat = self.vol.stat(self.path) new_mode = oct(stat.st_mode & 0777) self.assertEqual(new_mode, '0600L') def test_exists(self): e = self.vol.exists(self.path) self.assertTrue(e) def test_exists_false(self): e = self.vol.exists("filedoesnotexist") self.assertFalse(e) def test_getsize(self): size = self.vol.getsize(self.path) self.assertEqual(size, len(self.data)) def test_isfile(self): isfile = self.vol.isfile(self.path) self.assertTrue(isfile) def test_isdir_false(self): isdir = self.vol.isdir(self.path) self.assertFalse(isdir) def test_symlink(self): link = self._testMethodName + ".link" self.vol.symlink(self.path, link) islink = self.vol.islink(link) self.assertTrue(islink) def test_islink_false(self): islink = self.vol.islink(self.path) self.assertFalse(islink) def test_lstat(self): sb = self.vol.lstat(self.path) self.assertFalse(isinstance(sb, types.IntType)) self.assertEqual(sb.st_size, len(self.data)) def test_rename(self): newpath = self.path + ".rename" self.vol.rename(self.path, newpath) try: self.vol.lstat(self.path) except OSError as err: if err.errno != errno.ENOENT: self.fail("Expecting ENOENT") else: self.fail("Expecting ENOENT") def test_stat(self): sb = self.vol.stat(self.path) self.assertFalse(isinstance(sb, types.IntType)) self.assertEqual(sb.st_size, len(self.data)) def test_unlink(self): self.vol.unlink(self.path) try: self.vol.lstat(self.path) except OSError as err: if err.errno != errno.ENOENT: self.fail("Expecting ENOENT") else: self.fail("Expecting ENOENT") def test_setxattr(self): value = "hello world" self.vol.setxattr(self.path, "trusted.key1", value) self.assertEqual(self.vol.getxattr(self.path, "trusted.key1"), value) # flag = 1 behavior: fail if xattr exists self.assertRaises(OSError, self.vol.setxattr, self.path, "trusted.key1", "whatever", flags=1) # flag = 1 behavior: pass if xattr does not exist self.vol.setxattr(self.path, "trusted.key2", "awesome", flags=1) self.assertEqual(self.vol.getxattr(self.path, "trusted.key2"), "awesome") # flag = 2 behavior: fail if xattr does not exist self.assertRaises(OSError, self.vol.setxattr, self.path, "trusted.key3", "whatever", flags=2) # flag = 2 behavior: pass if xattr exists self.vol.setxattr(self.path, "trusted.key2", "more awesome", flags=2) self.assertEqual(self.vol.getxattr(self.path, "trusted.key2"), "more awesome") def test_getxattr(self): self.vol.setxattr(self.path, "user.gluster", "awesome") # user does not know the size of value beforehand self.assertEqual(self.vol.getxattr(self.path, "user.gluster"), "awesome") # user knows the size of value beforehand self.assertEqual(self.vol.getxattr(self.path, "user.gluster", size=7), "awesome") # size is larger self.assertEqual(self.vol.getxattr(self.path, "user.gluster", size=20), "awesome") # size is smaller self.assertRaises(OSError, self.vol.getxattr, self.path, "user.gluster", size=1) # size is negative self.assertRaises(ValueError, self.vol.getxattr, self.path, "user.gluster", size=-7) def test_listxattr(self): self.vol.setxattr(self.path, "user.gluster", "awesome") self.vol.setxattr(self.path, "user.gluster2", "awesome2") xattrs = self.vol.listxattr(self.path) self.assertTrue("user.gluster" in xattrs) self.assertTrue("user.gluster2" in xattrs) # Test passing of size # larger size - should pass xattrs = self.vol.listxattr(self.path, size=512) self.assertTrue("user.gluster" in xattrs) self.assertTrue("user.gluster2" in xattrs) # smaller size - should fail self.assertRaises(OSError, self.vol.listxattr, self.path, size=1) # invalid size - should fail self.assertRaises(ValueError, self.vol.listxattr, self.path, size=-1) def test_removexattr(self): self.vol.setxattr(self.path, "user.gluster", "awesome") self.vol.removexattr(self.path, "user.gluster") # The xattr now shouldn't exist self.assertRaises(OSError, self.vol.getxattr, self.path, "user.gluster") # Removing an xattr that does not exist self.assertRaises(OSError, self.vol.removexattr, self.path, "user.gluster") def test_fsetxattr(self): name = uuid4().hex with File(self.vol.open(name, os.O_WRONLY | os.O_CREAT)) as f: f.fsetxattr("user.gluster", "awesome") self.assertEqual(f.fgetxattr("user.gluster"), "awesome") # flag = 1 behavior: fail if xattr exists self.assertRaises(OSError, f.fsetxattr, "user.gluster", "more awesome", flags=1) # flag = 1 behavior: pass if xattr does not exist f.fsetxattr("user.gluster2", "awesome2", flags=1) self.assertEqual(f.fgetxattr("user.gluster2"), "awesome2") # flag = 2 behavior: fail if xattr does not exist self.assertRaises(OSError, f.fsetxattr, "user.whatever", "awesome", flags=2) # flag = 2 behavior: pass if xattr exists f.fsetxattr("user.gluster", "more awesome", flags=2) self.assertEqual(f.fgetxattr("user.gluster"), "more awesome") def test_fremovexattr(self): name = uuid4().hex with File(self.vol.open(name, os.O_WRONLY | os.O_CREAT)) as f: f.fsetxattr("user.gluster", "awesome") f.fremovexattr("user.gluster") # The xattr now shouldn't exist self.assertRaises(OSError, f.fgetxattr, "user.gluster") # Removing an xattr that does not exist self.assertRaises(OSError, f.fremovexattr, "user.gluster") def test_fgetxattr(self): name = uuid4().hex with File(self.vol.open(name, os.O_WRONLY | os.O_CREAT)) as f: f.fsetxattr("user.gluster", "awesome") # user does not know the size of value beforehand self.assertEqual(f.fgetxattr("user.gluster"), "awesome") # user knows the size of value beforehand self.assertEqual(f.fgetxattr("user.gluster", 7), "awesome") # size is larger self.assertEqual(f.fgetxattr("user.gluster", 70), "awesome") # size is smaller self.assertRaises(OSError, f.fgetxattr, "user.gluster", size=1) # size is negative self.assertRaises(ValueError, f.fgetxattr, "user.gluster", size=-7) def test_ftruncate(self): name = uuid4().hex with File(self.vol.open(name, os.O_WRONLY | os.O_CREAT)) as f: f.write("123456789") f.ftruncate(5) f.fsync() with File(self.vol.open(name, os.O_RDONLY)) as f: # The size should be reduced self.assertEqual(f.fgetsize(), 5) # So should be the content. self.assertEqual(f.read(), "12345") def test_fallocate(self): name = uuid4().hex with File(self.vol.open(name, os.O_WRONLY | os.O_CREAT)) as f: f.fallocate(0, 0, 10) f.fsync() # Stat information should now show the allocated size. self.assertEqual(f.fstat().st_size, 10) def test_discard(self): name = uuid4().hex with File(self.vol.open(name, os.O_WRONLY | os.O_CREAT)) as f: f.fallocate(0, 0, 10) f.fsync() self.assertEqual(f.fstat().st_size, 10) # We can't really know if the blocks were actually returned # to filesystem. This functional test only tests if glfs_discard # interfacing is proper and that it returns successfully. f.discard(4, 5) def test_zerofill(self): name = uuid4().hex with File(self.vol.open(name, os.O_RDWR | os.O_CREAT)) as f: f.write('0123456789') f.fsync() self.assertEqual(f.fstat().st_size, 10) f.lseek(0, os.SEEK_SET) self.assertEqual(f.read(), '0123456789') f.zerofill(3, 6) f.lseek(0, os.SEEK_SET) data = f.read() self.assertEqual(data, '012\x00\x00\x00\x00\x00\x009') self.assertEqual(len(data), 10) def test_utime(self): # Create a file name = uuid4().hex self.vol.fopen(name, 'w').close() # Test times arg being invalid for junk in ('a', 1234.1234, (1, 2, 3), (1)): self.assertRaises(TypeError, self.vol.utime, name, junk) # Test normal success # Mission Report: December 16th, 1991 (atime, mtime) = (692884800, 692884800) self.vol.utime(name, (atime, mtime)) st = self.vol.stat(name) self.assertEqual(st.st_atime, atime) self.assertEqual(st.st_mtime, mtime) # Test times = None self.vol.utime(name, None) new_st = self.vol.stat(name) self.assertTrue(new_st.st_atime > st.st_atime) self.assertTrue(new_st.st_mtime > st.st_mtime) # Non-existent file self.assertRaises(OSError, self.vol.utime, 'non-existent-file', None) def test_flistxattr(self): name = uuid4().hex with File(self.vol.open(name, os.O_RDWR | os.O_CREAT)) as f: f.fsetxattr("user.gluster", "awesome") f.fsetxattr("user.gluster2", "awesome2") xattrs = f.flistxattr() self.assertTrue("user.gluster" in xattrs) self.assertTrue("user.gluster2" in xattrs) # Test passing of size # larger size - should pass xattrs = f.flistxattr(size=512) self.assertTrue("user.gluster" in xattrs) self.assertTrue("user.gluster2" in xattrs) # smaller size - should fail self.assertRaises(OSError, f.flistxattr, size=1) # invalid size - should fail self.assertRaises(ValueError, f.flistxattr, size=-1) def test_access(self): file_name = uuid4().hex with File(self.vol.open(file_name, os.O_WRONLY | os.O_CREAT)) as f: f.write("I'm whatever Gotham needs me to be") f.fsync() # Check that file exists self.assertTrue(self.vol.access(file_name, os.F_OK)) # Check that file does not exist self.assertFalse(self.vol.access("nonexistentfile", os.F_OK)) dir_name = uuid4().hex self.vol.mkdir(dir_name) # Check that directory exists self.assertTrue(self.vol.access(dir_name, os.F_OK)) # Check if there is execute and write permission self.assertTrue(self.vol.access(file_name, os.W_OK | os.X_OK)) def test_getcwd_and_chdir(self): # CWD should be root at first self.assertEqual(self.vol.getcwd(), '/') dir_structure = "/%s/%s" % (uuid4().hex, uuid4().hex) self.vol.makedirs(dir_structure) # Change directory self.vol.chdir(dir_structure) # The changed directory should now be CWD self.assertEqual(self.vol.getcwd(), dir_structure) self.vol.chdir("../..") self.assertEqual(self.vol.getcwd(), '/') def test_readlink(self): file_name = uuid4().hex with File(self.vol.open(file_name, os.O_WRONLY | os.O_CREAT)) as f: f.write("It's not who I am underneath," "but what I do that defines me.") f.fsync() # Create a symlink link_name = uuid4().hex self.vol.symlink(file_name, link_name) self.assertEqual(self.vol.readlink(link_name), file_name) def test_readinto(self): file_name = uuid4().hex with File(self.vol.open(file_name, os.O_WRONLY | os.O_CREAT)) as f: s = ''.join([str(i) for i in xrange(10)]) f.write(s) f.fsync() buf = bytearray(1) with File(self.vol.open(file_name, os.O_RDONLY)) as f: for i in xrange(10): # Read one character at a time into buf f.readinto(buf) self.assertEqual(len(buf), 1) self.assertEqual(buf, bytearray(str(i))) with File(self.vol.open(file_name, os.O_RDONLY)) as f: self.assertRaises(TypeError, f.readinto, str("buf")) def test_link(self): name1 = uuid4().hex self.vol.fopen(name1, 'w').close() name2 = uuid4().hex self.vol.link(name1, name2) self.assertTrue(self.vol.samefile(name1, name2)) self.assertEqual(self.vol.stat(name1).st_nlink, 2) self.assertEqual(self.vol.stat(name2).st_nlink, 2) # source does not exist self.assertRaises(OSError, self.vol.link, 'nonexistent file', 'link') # target already exists self.assertRaises(OSError, self.vol.link, name1, name2) def test_copyfileobj(self): # Create source file. src_file = uuid4().hex with self.vol.fopen(src_file, 'wb') as f: for i in xrange(2): f.write(os.urandom(128 * 1024)) f.write(os.urandom(25 * 1024)) # Change/set atime and mtime (atime, mtime) = (692884800, 692884800) self.vol.utime(src_file, (atime, mtime)) # Calculate checksum of source file. src_file_checksum = hashlib.md5() with self.vol.fopen(src_file, 'rb') as f: src_file_checksum.update(f.read(32 * 1024)) # Copy file dest_file = uuid4().hex with self.vol.fopen(src_file, 'rb') as fsrc: with self.vol.fopen(dest_file, 'wb') as fdst: self.vol.copyfileobj(fsrc, fdst) # Calculate checksum of destination dest_file_checksum = hashlib.md5() with self.vol.fopen(dest_file, 'rb') as f: dest_file_checksum.update(f.read(32 * 1024)) self.assertEqual(src_file_checksum.hexdigest(), dest_file_checksum.hexdigest()) # Copy file with different buffer size self.vol.unlink(dest_file) with self.vol.fopen(src_file, 'rb') as fsrc: with self.vol.fopen(dest_file, 'wb') as fdst: self.vol.copyfileobj(fsrc, fdst, 32 * 1024) # Calculate checksum of destination dest_file_checksum = hashlib.md5() with self.vol.fopen(dest_file, 'rb') as f: dest_file_checksum.update(f.read(32 * 1024)) self.assertEqual(src_file_checksum.hexdigest(), dest_file_checksum.hexdigest()) # The destination file should not have same mtime src_stat = self.vol.stat(src_file) dest_stat = self.vol.stat(dest_file) self.assertNotEqual(src_stat.st_mtime, dest_stat.st_mtime) # Test over-writing destination that exists dest_file = uuid4().hex with self.vol.fopen(dest_file, 'w') as f: data = "A boy wants this test to not fail." f.write(data) with self.vol.fopen(src_file, 'rb') as fsrc: with self.vol.fopen(dest_file, 'wb') as fdst: self.vol.copyfileobj(fsrc, fdst) self.assertNotEqual(self.vol.stat(src_file).st_size, len(data)) # Test one of the file object is closed f1 = self.vol.fopen(src_file, 'rb') f1.close() f2 = self.vol.fopen(dest_file, 'wb') self.assertRaises(OSError, self.vol.copyfileobj, f1, f2) f2.close() def test_copyfile_samefile(self): # Source and destination same error name = uuid4().hex self.vol.fopen(name, 'w').close() self.assertRaises(Error, self.vol.copyfile, name, name) # Harlink test name2 = uuid4().hex self.vol.link(name, name2) self.assertRaises(Error, self.vol.copyfile, name, name2) def test_copymode(self): src_file = uuid4().hex self.vol.fopen(src_file, 'w').close() self.vol.chmod(src_file, 0644) dest_file = uuid4().hex self.vol.fopen(dest_file, 'w').close() self.vol.chmod(dest_file, 0640) self.vol.copymode(src_file, dest_file) self.assertEqual(self.vol.stat(src_file).st_mode, self.vol.stat(dest_file).st_mode) def test_copystat(self): # Create source file and set mode, atime, mtime src_file = uuid4().hex self.vol.fopen(src_file, 'w').close() self.vol.chmod(src_file, 0640) (atime, mtime) = (692884800, 692884800) self.vol.utime(src_file, (atime, mtime)) # Create destination file dest_file = uuid4().hex self.vol.fopen(dest_file, 'w').close() # Invoke copystat() self.vol.copystat(src_file, dest_file) # Verify src_stat = self.vol.stat(src_file) dest_stat = self.vol.stat(dest_file) self.assertEqual(src_stat.st_mode, dest_stat.st_mode) self.assertEqual(src_stat.st_atime, dest_stat.st_atime) self.assertEqual(src_stat.st_mtime, dest_stat.st_mtime) def test_copy(self): # Create source file. src_file = uuid4().hex with self.vol.fopen(src_file, 'wb') as f: for i in xrange(2): f.write(os.urandom(128 * 1024)) f.write(os.urandom(25 * 1024)) # Calculate checksum of source file. src_file_checksum = hashlib.md5() with self.vol.fopen(src_file, 'rb') as f: src_file_checksum.update(f.read(32 * 1024)) # Copy file into dir dest_dir = uuid4().hex self.vol.mkdir(dest_dir) self.vol.copy(src_file, dest_dir) # Calculate checksum of destination dest_file = os.path.join(dest_dir, src_file) dest_file_checksum = hashlib.md5() with self.vol.fopen(dest_file, 'rb') as f: dest_file_checksum.update(f.read(32 * 1024)) # verify data self.assertEqual(src_file_checksum.hexdigest(), dest_file_checksum.hexdigest()) # verify mode src_stat = self.vol.stat(src_file) dest_stat = self.vol.stat(dest_file) self.assertEqual(src_stat.st_mode, dest_stat.st_mode) def test_copy2(self): # Create source file. src_file = uuid4().hex with self.vol.fopen(src_file, 'wb') as f: for i in xrange(2): f.write(os.urandom(128 * 1024)) f.write(os.urandom(25 * 1024)) (atime, mtime) = (692884800, 692884800) self.vol.utime(src_file, (atime, mtime)) # Calculate checksum of source file. src_file_checksum = hashlib.md5() with self.vol.fopen(src_file, 'rb') as f: src_file_checksum.update(f.read(32 * 1024)) # Copy file into dir dest_dir = uuid4().hex self.vol.mkdir(dest_dir) self.vol.copy2(src_file, dest_dir) # Calculate checksum of destination dest_file = os.path.join(dest_dir, src_file) dest_file_checksum = hashlib.md5() with self.vol.fopen(dest_file, 'rb') as f: dest_file_checksum.update(f.read(32 * 1024)) # verify data self.assertEqual(src_file_checksum.hexdigest(), dest_file_checksum.hexdigest()) # verify mode and stat src_stat = self.vol.stat(src_file) dest_stat = self.vol.stat(dest_file) self.assertEqual(src_stat.st_mode, dest_stat.st_mode) self.assertEqual(src_stat.st_mtime, dest_stat.st_mtime) class DirOpsTest(unittest.TestCase): data = None dir_path = None @classmethod def setUpClass(cls): cls.vol = Volume(HOST, VOLNAME) cls.vol.mount() # Cleanup volume cls.vol.rmtree("/", ignore_errors=True) @classmethod def tearDownClass(cls): cls.vol.rmtree("/", ignore_errors=True) cls.vol = None def setUp(self): # Create a filesystem tree self.data = "gluster is awesome" self.dir_path = self._testMethodName + "_dir" self.vol.mkdir(self.dir_path, 0755) for x in range(0, 3): d = os.path.join(self.dir_path, 'testdir' + str(x)) self.vol.mkdir(d) # Create files inside two of the three directories if x != 1: for i in range(0, 2): f = os.path.join(d, 'nestedfile' + str(i)) with self.vol.fopen(f, 'w') as f: rc = f.write(self.data) self.assertEqual(rc, len(self.data)) # Create single file in root of directory if x == 2: file_path = os.path.join(self.dir_path, "testfile") with self.vol.fopen(file_path, 'w') as f: rc = f.write(self.data) self.assertEqual(rc, len(self.data)) # Create symlinks - one pointing to a file and another to a dir # Beware: rmtree() cannot remove these symlinks self.vol.symlink("testfile", os.path.join(self.dir_path, 'test_symlink_file')) self.vol.symlink("testdir2", os.path.join(self.dir_path, 'test_symlink_dir')) # The dir tree set up for testing now looks like this: # test_name_here # |-- testdir0 # | |-- nestedfile0 # | |-- nestedfile1 # |-- testdir1 # |-- testdir2 # | |-- nestedfile0 # | |-- nestedfile1 # |-- testfile # |-- testsymlink_file --> testfile # |-- testsymlink_dir --> testdir2 def tearDown(self): self._symlinks_cleanup() self.dir_path = None self.data = None def test_isdir(self): self.assertTrue(self.vol.isdir(self.dir_path)) self.assertFalse(self.vol.isfile(self.dir_path)) def test_listdir(self): dir_list = self.vol.listdir(self.dir_path) dir_list.sort() self.assertEqual(dir_list, ["test_symlink_dir", "test_symlink_file", "testdir0", "testdir1", "testdir2", "testfile"]) def test_listdir_with_stat(self): dir_list = self.vol.listdir_with_stat(self.dir_path) dir_list_sorted = sorted(dir_list, key=lambda tup: tup[0]) dir_count = 0 file_count = 0 symlink_count = 0 for index, (name, stat_info) in enumerate(dir_list_sorted): self.assertTrue(isinstance(stat_info, Stat)) if stat.S_ISREG(stat_info.st_mode): self.assertEqual(stat_info.st_size, len(self.data)) file_count += 1 elif stat.S_ISDIR(stat_info.st_mode): self.assertEqual(stat_info.st_size, 4096) dir_count += 1 elif stat.S_ISLNK(stat_info.st_mode): symlink_count += 1 self.assertEqual(dir_count, 3) self.assertEqual(file_count, 1) self.assertEqual(symlink_count, 2) # Error - path does not exist self.assertRaises(OSError, self.vol.listdir_with_stat, 'non-existent-dir') def test_scandir(self): entries = [] for entry in self.vol.scandir(self.dir_path): self.assertTrue(isinstance(entry, DirEntry)) entries.append(entry) dir_count = 0 file_count = 0 symlink_count = 0 entries_sorted = sorted(entries, key=lambda e: e.name) for index, entry in enumerate(entries_sorted): self.assertTrue(isinstance(entry.stat(), Stat)) if entry.is_file(): self.assertEqual(entry.stat().st_size, len(self.data)) self.assertFalse(entry.is_dir()) file_count += 1 elif entry.is_dir(): self.assertEqual(entry.stat().st_size, 4096) self.assertFalse(entry.is_file()) dir_count += 1 elif entry.is_symlink(): symlink_count += 1 self.assertEqual(dir_count, 3) self.assertEqual(file_count, 1) self.assertEqual(symlink_count, 2) def test_makedirs(self): name = self.dir_path + "/subd1/subd2/subd3" self.vol.makedirs(name, 0755) self.assertTrue(self.vol.isdir(name)) def test_statvfs(self): sb = self.vol.statvfs("/") self.assertFalse(isinstance(sb, types.IntType)) self.assertEqual(sb.f_namemax, 255L) # creating a dir, checking Total number of free inodes # is reduced self.vol.makedirs("statvfs_dir1", 0755) sb2 = self.vol.statvfs("/") self.assertTrue(sb2.f_ffree < sb.f_ffree) def test_rmtree(self): """ by testing rmtree, we are also testing unlink and rmdir """ f = os.path.join(self.dir_path, "testdir0", "nestedfile0") self.vol.exists(f) d = os.path.join(self.dir_path, "testdir0") self.vol.rmtree(d, True) self.assertRaises(OSError, self.vol.lstat, f) self.assertRaises(OSError, self.vol.lstat, d) def test_walk_default(self): # Default: topdown=True, followlinks=False file_list = [] dir_list = [] for root, dirs, files in self.vol.walk(self.dir_path): for name in files: file_list.append(name) for name in dirs: dir_list.append(name) self.assertEqual(len(dir_list), 3) # 3 regular directories self.assertEqual(len(file_list), 7) # 5 regular files + 2 symlinks def test_walk_topdown_and_followinks(self): # topdown=True, followlinks=True file_list = [] dir_list = [] for root, dirs, files in self.vol.walk(self.dir_path, followlinks=True): for name in files: file_list.append(name) for name in dirs: dir_list.append(name) # 4 = 3 regular directories + # 1 symlink which is pointing to a directory self.assertEqual(len(dir_list), 4) # 8 = 5 regular files + # 1 symlink that points to a file + # 2 regular files listed again as they are in a directory which has # a symlink pointing to it. This results in that directory being # visited twice. self.assertEqual(len(file_list), 8) def test_walk_no_topdown_no_followlinks(self): # topdown=False, followlinks=False file_list = [] dir_list = [] for root, dirs, files in self.vol.walk(self.dir_path, topdown=False): for name in files: file_list.append(name) for name in dirs: dir_list.append(name) self.assertEqual(len(dir_list), 3) # 3 regular directories self.assertEqual(len(file_list), 7) # 5 regular files + 2 symlinks def test_walk_no_topdown_and_followlinks(self): # topdown=False, followlinks=True file_list = [] dir_list = [] for root, dirs, files in self.vol.walk(self.dir_path, topdown=False, followlinks=True): for name in files: file_list.append(name) for name in dirs: dir_list.append(name) # 4 = 3 regular directories + # 1 symlink which is pointing to a directory self.assertEqual(len(dir_list), 4) # 8 = 5 regular files + # 1 symlink that points to a file + # 2 regular files listed again as they are in a directory which has # a symlink pointing to it. This results in that directory being # visited twice. self.assertEqual(len(file_list), 8) def test_walk_error(self): # Test onerror handling # # onerror not set try: for root, dirs, files in self.vol.walk("non-existent-path"): pass except OSError: self.fail("No exception should be raised") # onerror method is set def handle_error(err): raise err try: for root, dirs, files in self.vol.walk("non-existent-path", onerror=handle_error): pass except OSError: pass else: self.fail("Expecting OSError exception") def _symlinks_cleanup(self): # rmtree() cannot remove these symlinks, hence removing manually. symlinks = ('test_symlink_dir', 'test_symlink_file') for name in symlinks: try: self.vol.unlink(os.path.join(self.dir_path, name)) except OSError as err: if err.errno != errno.ENOENT: raise def test_copy_tree(self): dest_path = self.dir_path + '_dest' # symlinks = False (contents pointed by symlinks are copied) self.vol.copytree(self.dir_path, dest_path, symlinks=False) file_list = [] dir_list = [] for root, dirs, files in self.vol.walk(dest_path): for name in files: fullpath = os.path.join(root, name) s = self.vol.lstat(fullpath) # Assert that there are no symlinks self.assertFalse(stat.S_ISLNK(s.st_mode)) file_list.append(name) for name in dirs: fullpath = os.path.join(root, name) s = self.vol.lstat(fullpath) # Assert that there are no symlinks self.assertFalse(stat.S_ISLNK(s.st_mode)) dir_list.append(name) self.assertEqual(len(dir_list), 4) # 4 regular directories self.assertEqual(len(file_list), 8) # 8 regular files # Cleanup self.vol.rmtree(dest_path) # symlinks = True (symlinks itself is copied as is) self.vol.copytree(self.dir_path, dest_path, symlinks=True) file_list = [] dir_list = [] for root, dirs, files in self.vol.walk(dest_path): for name in files: file_list.append(name) for name in dirs: dir_list.append(name) self.assertEqual(len(dir_list), 3) # 3 regular directories self.assertEqual(len(file_list), 7) # 5 regular files + 2 symlinks # Error - The destination directory must not exist self.assertRaises(OSError, self.vol.copytree, self.dir_path, dest_path) class TestVolumeInit(unittest.TestCase): def test_mount_umount_default(self): # Create volume object instance vol = Volume(HOST, VOLNAME) # Check attribute init self.assertEqual(vol.log_file, "/dev/null") self.assertEqual(vol.log_level, 7) self.assertEqual(vol.host, HOST) self.assertEqual(vol.volname, VOLNAME) self.assertEqual(vol.port, 24007) self.assertFalse(vol.mounted) # Mount the volume vol.mount() # Check mounted property self.assertTrue(vol.mounted) # Unmount the volume vol.umount() # Check mounted property again self.assertFalse(vol.mounted) # Do a double umount - should not crash or raise exception vol.umount() self.assertFalse(vol.mounted) # Do a double mount - should not crash or raise exception vol.mount() vol.mount() self.assertTrue(vol.mounted) # Unmount the volume vol.umount() self.assertFalse(vol.mounted) def test_mount_err(self): # Volume does not exist fake_volname = str(uuid4().hex)[:10] vol = Volume(HOST, fake_volname) self.assertRaises(LibgfapiException, vol.mount) self.assertFalse(vol.mounted) # Invalid host - glfs_set_volfile_server will fail fake_hostname = str(uuid4().hex)[:10] vol = Volume(fake_hostname, VOLNAME) self.assertRaises(LibgfapiException, vol.mount) self.assertFalse(vol.mounted) def test_set_logging(self): # Create volume object instance vol = Volume(HOST, VOLNAME) # Call set_logging before mount() log_file = "/tmp/%s" % (uuid4().hex) vol.set_logging(log_file, 7) # Mount the volume vol.mount() self.assertTrue(vol.mounted) self.assertEqual(vol.log_file, log_file) self.assertEqual(vol.log_level, 7) # Check that log has been created and exists self.assertTrue(os.path.exists(log_file)) # Change the logging after mounting log_file2 = "/tmp/%s" % (uuid4().hex) vol.set_logging(log_file2, 7) self.assertEqual(vol.log_file, log_file2) # Unmount the volume vol.umount() self.assertFalse(vol.mounted) def test_unix_socket_mount(self): if not os.access(GLUSTERD_SOCK_FILE, os.R_OK | os.W_OK): raise SkipTest("Unix socket file %s not accessible" % GLUSTERD_SOCK_FILE) vol = Volume(GLUSTERD_SOCK_FILE, VOLNAME, proto="unix") vol.mount() self.assertTrue(vol.mounted) vol.umount() self.assertFalse(vol.mounted) def test_get_volume_id(self): vol = Volume(HOST, VOLNAME) vol.mount() self.assertTrue(vol.mounted) self.assertTrue(vol.volid == None) volid = vol.get_volume_id() self.assertTrue(volid != None) try: volid = uuid.UUID(str(volid)) except ValueError: self.fail("Invalid UUID") self.assertTrue(vol.volid != None) vol.umount()