From 601884e6e861813e14f9064e2f135eb857d59b17 Mon Sep 17 00:00:00 2001 From: Prashanth Pai Date: Tue, 31 May 2016 18:59:56 +0530 Subject: Implement shutil.copy* methods and os.link() Change-Id: I2de796e7d53732c5a967c6194a43378171fcb3d6 Signed-off-by: Prashanth Pai --- test/functional/libgfapi-python-tests.py | 197 ++++++++++++++++++++++++++++++- 1 file changed, 196 insertions(+), 1 deletion(-) (limited to 'test') diff --git a/test/functional/libgfapi-python-tests.py b/test/functional/libgfapi-python-tests.py index 4e339e0..8c422e6 100644 --- a/test/functional/libgfapi-python-tests.py +++ b/test/functional/libgfapi-python-tests.py @@ -13,10 +13,11 @@ import unittest import os import types import errno +import hashlib import threading from gluster.gfapi import File, Volume -from gluster.exceptions import LibgfapiException +from gluster.exceptions import LibgfapiException, Error from test import get_test_config from ConfigParser import NoSectionError, NoOptionError from uuid import uuid4 @@ -600,6 +601,200 @@ class FileOpsTest(unittest.TestCase): with File(self.vol.open(file_name, os.O_RDONLY)) as f: self.assertRaises(TypeError, f.readinto, str("buf")) + def test_link(self): + name1 = uuid4().hex + self.vol.fopen(name1, 'w').close() + name2 = uuid4().hex + self.vol.link(name1, name2) + self.assertTrue(self.vol.samefile(name1, name2)) + self.assertEqual(self.vol.stat(name1).st_nlink, 2) + self.assertEqual(self.vol.stat(name2).st_nlink, 2) + + # source does not exist + self.assertRaises(OSError, self.vol.link, 'nonexistent file', 'link') + # target already exists + self.assertRaises(OSError, self.vol.link, name1, name2) + + def test_copyfileobj(self): + # Create source file. + src_file = uuid4().hex + with self.vol.fopen(src_file, 'wb') as f: + for i in xrange(2): + f.write(os.urandom(128 * 1024)) + f.write(os.urandom(25 * 1024)) + # Change/set atime and mtime + (atime, mtime) = (692884800, 692884800) + self.vol.utime(src_file, (atime, mtime)) + + # Calculate checksum of source file. + src_file_checksum = hashlib.md5() + with self.vol.fopen(src_file, 'rb') as f: + src_file_checksum.update(f.read(32 * 1024)) + + # Copy file + dest_file = uuid4().hex + with self.vol.fopen(src_file, 'rb') as fsrc: + with self.vol.fopen(dest_file, 'wb') as fdst: + self.vol.copyfileobj(fsrc, fdst) + + # Calculate checksum of destination + dest_file_checksum = hashlib.md5() + with self.vol.fopen(dest_file, 'rb') as f: + dest_file_checksum.update(f.read(32 * 1024)) + + self.assertEqual(src_file_checksum.hexdigest(), + dest_file_checksum.hexdigest()) + + # Copy file with different buffer size + self.vol.unlink(dest_file) + with self.vol.fopen(src_file, 'rb') as fsrc: + with self.vol.fopen(dest_file, 'wb') as fdst: + self.vol.copyfileobj(fsrc, fdst, 32 * 1024) + + # Calculate checksum of destination + dest_file_checksum = hashlib.md5() + with self.vol.fopen(dest_file, 'rb') as f: + dest_file_checksum.update(f.read(32 * 1024)) + + self.assertEqual(src_file_checksum.hexdigest(), + dest_file_checksum.hexdigest()) + + # The destination file should not have same atime and mtime + src_stat = self.vol.stat(src_file) + dest_stat = self.vol.stat(dest_file) + self.assertNotEqual(src_stat.st_atime, dest_stat.st_atime) + self.assertNotEqual(src_stat.st_mtime, dest_stat.st_mtime) + + # Test over-writing destination that exists + dest_file = uuid4().hex + with self.vol.fopen(dest_file, 'w') as f: + data = "A boy wants this test to not fail." + f.write(data) + with self.vol.fopen(src_file, 'rb') as fsrc: + with self.vol.fopen(dest_file, 'wb') as fdst: + self.vol.copyfileobj(fsrc, fdst) + self.assertNotEqual(self.vol.stat(src_file).st_size, len(data)) + + # Test one of the file object is closed + f1 = self.vol.fopen(src_file, 'rb') + f1.close() + f2 = self.vol.fopen(dest_file, 'wb') + self.assertRaises(OSError, self.vol.copyfileobj, f1, f2) + f2.close() + + def test_copyfile_samefile(self): + # Source and destination same error + name = uuid4().hex + self.vol.fopen(name, 'w').close() + self.assertRaises(Error, self.vol.copyfile, name, name) + # Harlink test + name2 = uuid4().hex + self.vol.link(name, name2) + self.assertRaises(Error, self.vol.copyfile, name, name2) + + def test_copymode(self): + src_file = uuid4().hex + self.vol.fopen(src_file, 'w').close() + self.vol.chmod(src_file, 0644) + + dest_file = uuid4().hex + self.vol.fopen(dest_file, 'w').close() + self.vol.chmod(dest_file, 0640) + + self.vol.copymode(src_file, dest_file) + self.assertEqual(self.vol.stat(src_file).st_mode, + self.vol.stat(dest_file).st_mode) + + def test_copystat(self): + # Create source file and set mode, atime, mtime + src_file = uuid4().hex + self.vol.fopen(src_file, 'w').close() + self.vol.chmod(src_file, 0640) + (atime, mtime) = (692884800, 692884800) + self.vol.utime(src_file, (atime, mtime)) + + # Create destination file + dest_file = uuid4().hex + self.vol.fopen(dest_file, 'w').close() + + # Invoke copystat() + self.vol.copystat(src_file, dest_file) + + # Verify + src_stat = self.vol.stat(src_file) + dest_stat = self.vol.stat(dest_file) + self.assertEqual(src_stat.st_mode, dest_stat.st_mode) + self.assertEqual(src_stat.st_atime, dest_stat.st_atime) + self.assertEqual(src_stat.st_mtime, dest_stat.st_mtime) + + def test_copy(self): + # Create source file. + src_file = uuid4().hex + with self.vol.fopen(src_file, 'wb') as f: + for i in xrange(2): + f.write(os.urandom(128 * 1024)) + f.write(os.urandom(25 * 1024)) + + # Calculate checksum of source file. + src_file_checksum = hashlib.md5() + with self.vol.fopen(src_file, 'rb') as f: + src_file_checksum.update(f.read(32 * 1024)) + + # Copy file into dir + dest_dir = uuid4().hex + self.vol.mkdir(dest_dir) + self.vol.copy(src_file, dest_dir) + + # Calculate checksum of destination + dest_file = os.path.join(dest_dir, src_file) + dest_file_checksum = hashlib.md5() + with self.vol.fopen(dest_file, 'rb') as f: + dest_file_checksum.update(f.read(32 * 1024)) + + # verify data + self.assertEqual(src_file_checksum.hexdigest(), + dest_file_checksum.hexdigest()) + + # verify mode + src_stat = self.vol.stat(src_file) + dest_stat = self.vol.stat(dest_file) + self.assertEqual(src_stat.st_mode, dest_stat.st_mode) + + def test_copy2(self): + # Create source file. + src_file = uuid4().hex + with self.vol.fopen(src_file, 'wb') as f: + for i in xrange(2): + f.write(os.urandom(128 * 1024)) + f.write(os.urandom(25 * 1024)) + + # Calculate checksum of source file. + src_file_checksum = hashlib.md5() + with self.vol.fopen(src_file, 'rb') as f: + src_file_checksum.update(f.read(32 * 1024)) + + # Copy file into dir + dest_dir = uuid4().hex + self.vol.mkdir(dest_dir) + self.vol.copy(src_file, dest_dir) + + # Calculate checksum of destination + dest_file = os.path.join(dest_dir, src_file) + dest_file_checksum = hashlib.md5() + with self.vol.fopen(dest_file, 'rb') as f: + dest_file_checksum.update(f.read(32 * 1024)) + + # verify data + self.assertEqual(src_file_checksum.hexdigest(), + dest_file_checksum.hexdigest()) + + # verify mode and stat + src_stat = self.vol.stat(src_file) + dest_stat = self.vol.stat(dest_file) + self.assertEqual(src_stat.st_mode, dest_stat.st_mode) + self.assertEqual(src_stat.st_atime, dest_stat.st_atime) + self.assertEqual(src_stat.st_mtime, dest_stat.st_mtime) + class DirOpsTest(unittest.TestCase): -- cgit