From 286a1308db72c5cfdd6ce16aff3f291ebce257c2 Mon Sep 17 00:00:00 2001
From: Peter Portante
Date: Thu, 24 Oct 2013 16:15:25 -0400
Subject: Rebase to OpenStack Swift Havana (1.10.0)

Change-Id: I90821230a1a7100c74d97cccc9c445251d0f65e7
Signed-off-by: Peter Portante
Reviewed-on: http://review.gluster.org/6157
Reviewed-by: Luis Pabon
Tested-by: Luis Pabon
---
 gluster/swift/obj/diskfile.py | 470 ++++++++++++++++++++++++------------------
 1 file changed, 265 insertions(+), 205 deletions(-)

(limited to 'gluster/swift/obj/diskfile.py')

diff --git a/gluster/swift/obj/diskfile.py b/gluster/swift/obj/diskfile.py
index 26852b1..0e0abef 100644
--- a/gluster/swift/obj/diskfile.py
+++ b/gluster/swift/obj/diskfile.py
@@ -32,10 +32,12 @@ from swift.common.utils import TRUE_VALUES, drop_buffer_cache, ThreadPool
 from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
     DiskFileNoSpace, DiskFileDeviceUnavailable

-from gluster.swift.common.exceptions import GlusterFileSystemOSError
+from gluster.swift.common.exceptions import GlusterFileSystemOSError, \
+    GlusterFileSystemIOError
 from gluster.swift.common.Glusterfs import mount
 from gluster.swift.common.fs_utils import do_fstat, do_open, do_close, \
-    do_unlink, do_chown, os_path, do_fsync, do_fchown, do_stat, Fake_file
+    do_unlink, do_chown, os_path, do_fsync, do_fchown, do_stat, do_write, \
+    do_fdatasync, do_rename, Fake_file
 from gluster.swift.common.utils import read_metadata, write_metadata, \
     validate_object, create_object_metadata, rmobjdir, dir_is_object, \
     get_object_metadata
@@ -45,7 +47,6 @@ from gluster.swift.common.utils import X_CONTENT_LENGTH, X_CONTENT_TYPE, \
 from ConfigParser import ConfigParser, NoSectionError, NoOptionError

 from swift.obj.diskfile import DiskFile as SwiftDiskFile
-from swift.obj.diskfile import DiskWriter as SwiftDiskWriter

 # FIXME: Hopefully we'll be able to move to Python 2.7+ where O_CLOEXEC will
 # be back ported. See http://www.python.org/dev/peps/pep-0433/
@@ -278,7 +279,7 @@ def _adjust_metadata(metadata):
     return metadata


-class DiskWriter(SwiftDiskWriter):
+class DiskWriter(object):
     """
     Encapsulation of the write context for servicing PUT REST API
     requests. Serves as the context manager object for DiskFile's writer()
@@ -286,6 +287,126 @@ class DiskWriter(SwiftDiskWriter):

     We just override the put() method for Gluster.
     """
+    def __init__(self, disk_file, fd, tmppath, threadpool):
+        self.disk_file = disk_file
+        self.fd = fd
+        self.tmppath = tmppath
+        self.upload_size = 0
+        self.last_sync = 0
+        self.threadpool = threadpool
+
+    def write(self, chunk):
+        """
+        Write a chunk of data into the temporary file.
+
+        :param chunk: the chunk of data to write as a string object
+        """
+
+        def _write_entire_chunk(chunk):
+            while chunk:
+                written = do_write(self.fd, chunk)
+                self.upload_size += written
+                chunk = chunk[written:]
+
+        self.threadpool.run_in_thread(_write_entire_chunk, chunk)
+
+        # For large files sync every 512MB (by default) written
+        diff = self.upload_size - self.last_sync
+        if diff >= self.disk_file.bytes_per_sync:
+            self.threadpool.force_run_in_thread(do_fdatasync, self.fd)
+            drop_buffer_cache(self.fd, self.last_sync, diff)
+            self.last_sync = self.upload_size
+
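The write() method above amortizes sync cost: bytes are counted as they land
in the temp file, and only once bytes_per_sync bytes (512MB by default) have
accumulated does it force a fdatasync() and drop the just-synced page range
from the buffer cache. The same pattern reduced to plain os calls (a sketch;
the threadpool and Swift's drop_buffer_cache() plumbing are omitted, and the
names here are illustrative):

    import os

    BYTES_PER_SYNC = 512 * 1024 * 1024  # mirrors the 512MB default above

    def write_synced(fd, chunks):
        # Count every byte written; once a full sync window has
        # accumulated, flush the data blocks to disk with fdatasync().
        upload_size = 0
        last_sync = 0
        for chunk in chunks:
            while chunk:
                written = os.write(fd, chunk)
                upload_size += written
                chunk = chunk[written:]
            if upload_size - last_sync >= BYTES_PER_SYNC:
                os.fdatasync(fd)  # data blocks only; cheaper than fsync()
                last_sync = upload_size
        return upload_size
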
+    def _finalize_put(self, metadata):
+        # Write out metadata before fsync() to ensure it is also forced to
+        # disk.
+        write_metadata(self.fd, metadata)
+
+        # We call fsync() before calling drop_cache() to lower the
+        # amount of redundant work the drop cache code will perform on
+        # the pages (now that after fsync the pages will be all
+        # clean).
+        do_fsync(self.fd)
+
+        # From the Department of the Redundancy Department, make sure
+        # we call drop_cache() after fsync() to avoid redundant work
+        # (pages all clean).
+        drop_buffer_cache(self.fd, 0, self.upload_size)
+
+        # At this point we know that the object's full directory path
+        # exists, so we can just rename it directly without using Swift's
+        # swift.common.utils.renamer(), which makes the directory path and
+        # adds extra stat() calls.
+        df = self.disk_file
+        data_file = os.path.join(df.put_datadir, df._obj)
+        attempts = 1
+        while True:
+            try:
+                do_rename(self.tmppath, data_file)
+            except OSError as err:
+                if err.errno in (errno.ENOENT, errno.EIO) \
+                        and attempts < MAX_RENAME_ATTEMPTS:
+                    # FIXME: Why either of these two error conditions is
+                    # happening is unknown at this point. This might be a
+                    # FUSE issue of some sort or a possible race
+                    # condition. So let's sleep on it, and double check
+                    # the environment after a good nap.
+                    _random_sleep()
+                    # Tease out why this error occurred. The man page for
+                    # rename reads:
+                    #   "The link named by tmppath does not exist; or, a
+                    #    directory component in data_file does not exist;
+                    #    or, tmppath or data_file is an empty string."
+                    assert len(self.tmppath) > 0 and len(data_file) > 0
+                    tpstats = do_stat(self.tmppath)
+                    tfstats = do_fstat(self.fd)
+                    assert tfstats
+                    if not tpstats or tfstats.st_ino != tpstats.st_ino:
+                        # Temporary file name conflict
+                        raise DiskFileError(
+                            'DiskFile.put(): temporary file, %s, was'
+                            ' already renamed (targeted for %s)' % (
+                                self.tmppath, data_file))
+                    else:
+                        # Data file target name now has a bad path!
+                        dfstats = do_stat(df.put_datadir)
+                        if not dfstats:
+                            raise DiskFileError(
+                                'DiskFile.put(): path to object, %s, no'
+                                ' longer exists (targeted for %s)' % (
+                                    df.put_datadir,
+                                    data_file))
+                        else:
+                            is_dir = stat.S_ISDIR(dfstats.st_mode)
+                            if not is_dir:
+                                raise DiskFileError(
+                                    'DiskFile.put(): path to object, %s,'
+                                    ' no longer a directory (targeted for'
+                                    ' %s)' % (df.put_datadir,
+                                              data_file))
+                            else:
+                                # Let's retry since everything looks okay
+                                logging.warn(
+                                    "DiskFile.put(): os.rename('%s','%s')"
+                                    " initially failed (%s) but a"
+                                    " stat('%s') following that succeeded:"
+                                    " %r" % (
+                                        self.tmppath, data_file,
+                                        str(err), df.put_datadir,
+                                        dfstats))
+                                attempts += 1
+                                continue
+                else:
+                    raise GlusterFileSystemOSError(
+                        err.errno, "%s, os.rename('%s', '%s')" % (
+                            err.strerror, self.tmppath, data_file))
+            else:
+                # Success!
+                break
+        # Close here so the calling context does not have to perform this
+        # in a thread.
+        do_close(self.fd)
+
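_finalize_put() retries the final rename because, over a FUSE mount,
rename(2) has been observed to fail transiently with ENOENT or EIO even when
both paths look valid. Stripped of the diagnostics, the retry loop has this
shape (a sketch; the bound and the random sleep stand in for the module's
MAX_RENAME_ATTEMPTS and _random_sleep()):

    import errno
    import os
    import random
    import time

    MAX_RENAME_ATTEMPTS = 10  # illustrative bound, not the patch's value

    def rename_with_retries(tmppath, data_file):
        attempts = 1
        while True:
            try:
                os.rename(tmppath, data_file)
                return
            except OSError as err:
                if err.errno not in (errno.ENOENT, errno.EIO) \
                        or attempts >= MAX_RENAME_ATTEMPTS:
                    raise
                # Possibly a transient FUSE hiccup; nap briefly, retry.
                time.sleep(random.uniform(0.05, 0.25))
                attempts += 1
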
     def put(self, metadata, extension='.data'):
         """
         Finalize writing the file on disk, and renames it from the temp file
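The hunk below rewrites put() itself: the inline directory check and the
nested finalize_put() closure are gone, and put() now just runs
_finalize_put() on the thread pool, translating an EISDIR from the rename
into a DiskFileError so the directory-collision policy lives at one choke
point. That errno-to-REST-error translation, in isolation (hypothetical
helper names, mirroring the hunk that follows):

    import errno

    class DiskFileError(Exception):
        pass

    def finalize_or_translate(finalize, target):
        # finalize() is expected to raise OSError on a failed rename;
        # EISDIR means a directory already occupies the object's name.
        try:
            finalize()
        except OSError as err:
            if err.errno == errno.EISDIR:
                raise DiskFileError(
                    'file creation failed since the target, %s, already'
                    ' exists as a directory' % target)
            raise
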
@@ -306,120 +427,34 @@ class DiskWriter(SwiftDiskWriter):
             if not df.data_file:
                 # Does not exist, create it
                 data_file = os.path.join(df._obj_path, df._obj)
-                _, df.metadata = self.threadpool.force_run_in_thread(
+                _, df._metadata = self.threadpool.force_run_in_thread(
                     df._create_dir_object, data_file, metadata)
                 df.data_file = os.path.join(df._container_path, data_file)
-            elif not df.is_dir:
+            elif not df._is_dir:
                 # Exists, but as a file
                 raise DiskFileError('DiskFile.put(): directory creation failed'
                                     ' since the target, %s, already exists as'
                                     ' a file' % df.data_file)
             return

-        if df._is_dir:
-            # A pre-existing directory already exists on the file
-            # system, perhaps gratuitously created when another
-            # object was created, or created externally to Swift
-            # REST API servicing (UFO use case).
-            raise DiskFileError('DiskFile.put(): file creation failed since'
-                                ' the target, %s, already exists as a'
-                                ' directory' % df.data_file)
-
-        def finalize_put():
-            # Write out metadata before fsync() to ensure it is also forced to
-            # disk.
-            write_metadata(self.fd, metadata)
-
-            # We call fsync() before calling drop_cache() to lower the
-            # amount of redundant work the drop cache code will perform on
-            # the pages (now that after fsync the pages will be all
-            # clean).
-            do_fsync(self.fd)
-
-            # From the Department of the Redundancy Department, make sure
-            # we call drop_cache() after fsync() to avoid redundant work
-            # (pages all clean).
-            drop_buffer_cache(self.fd, 0, self.upload_size)
-
-            # At this point we know that the object's full directory path
-            # exists, so we can just rename it directly without using Swift's
-            # swift.common.utils.renamer(), which makes the directory path and
-            # adds extra stat() calls.
-            data_file = os.path.join(df.put_datadir, df._obj)
-            attempts = 1
-            while True:
-                try:
-                    os.rename(self.tmppath, data_file)
-                except OSError as err:
-                    if err.errno in (errno.ENOENT, errno.EIO) \
-                            and attempts < MAX_RENAME_ATTEMPTS:
-                        # FIXME: Why either of these two error conditions is
-                        # happening is unknown at this point. This might be a
-                        # FUSE issue of some sort or a possible race
-                        # condition. So let's sleep on it, and double check
-                        # the environment after a good nap.
-                        _random_sleep()
-                        # Tease out why this error occurred. The man page for
-                        # rename reads:
-                        #   "The link named by tmppath does not exist; or, a
-                        #    directory component in data_file does not exist;
-                        #    or, tmppath or data_file is an empty string."
-                        assert len(self.tmppath) > 0 and len(data_file) > 0
-                        tpstats = do_stat(self.tmppath)
-                        tfstats = do_fstat(self.fd)
-                        assert tfstats
-                        if not tpstats or tfstats.st_ino != tpstats.st_ino:
-                            # Temporary file name conflict
-                            raise DiskFileError(
-                                'DiskFile.put(): temporary file, %s, was'
-                                ' already renamed (targeted for %s)' % (
-                                    self.tmppath, data_file))
-                        else:
-                            # Data file target name now has a bad path!
-                            dfstats = do_stat(df.put_datadir)
-                            if not dfstats:
-                                raise DiskFileError(
-                                    'DiskFile.put(): path to object, %s, no'
-                                    ' longer exists (targeted for %s)' % (
-                                        df.put_datadir,
-                                        data_file))
-                            else:
-                                is_dir = stat.S_ISDIR(dfstats.st_mode)
-                                if not is_dir:
-                                    raise DiskFileError(
-                                        'DiskFile.put(): path to object, %s,'
-                                        ' no longer a directory (targeted for'
-                                        ' %s)' % (df.put_datadir,
-                                                  data_file))
-                                else:
-                                    # Let's retry since everything looks okay
-                                    logging.warn(
-                                        "DiskFile.put(): os.rename('%s','%s')"
-                                        " initially failed (%s) but a"
-                                        " stat('%s') following that succeeded:"
-                                        " %r" % (
-                                            self.tmppath, data_file,
-                                            str(err), df.put_datadir,
-                                            dfstats))
-                                    attempts += 1
-                                    continue
-                    else:
-                        raise GlusterFileSystemOSError(
-                            err.errno, "%s, os.rename('%s', '%s')" % (
-                                err.strerror, self.tmppath, data_file))
-                else:
-                    # Success!
-                    break
-            # Close here so the calling context does not have to perform this
-            # in a thread.
-            do_close(self.fd)
-
-        self.threadpool.force_run_in_thread(finalize_put)
+        try:
+            self.threadpool.force_run_in_thread(self._finalize_put, metadata)
+        except GlusterFileSystemOSError as err:
+            if err.errno == errno.EISDIR:
+                # A pre-existing directory already exists on the file
+                # system, perhaps gratuitously created when another
+                # object was created, or created externally to Swift
+                # REST API servicing (UFO use case).
+                raise DiskFileError('DiskFile.put(): file creation failed'
                                    ' since the target, %s, already exists as'
+                                    ' a directory' % df.data_file)
+            raise

         # Avoid the unlink() system call as part of the mkstemp context
         # cleanup
         self.tmppath = None

-        df.metadata = metadata
+        df._metadata = metadata
         df._filter_metadata()

         # Mark that it actually exists now
@@ -443,7 +478,6 @@ class DiskFile(SwiftDiskFile):
     :param container: container name for the object
     :param obj: object name for the object
    :param logger: logger object for writing out log file messages
-    :param keep_data_fp: if True, don't close the fp, otherwise close it
     :param disk_chunk_size: size of chunks on file reads
     :param bytes_per_sync: number of bytes between fdatasync calls
     :param iter_hook: called when __iter__ returns a chunk
@@ -456,18 +490,15 @@ class DiskFile(SwiftDiskFile):
     """

     def __init__(self, path, device, partition, account, container, obj,
-                 logger, keep_data_fp=False,
-                 disk_chunk_size=DEFAULT_DISK_CHUNK_SIZE,
+                 logger, disk_chunk_size=DEFAULT_DISK_CHUNK_SIZE,
                  bytes_per_sync=DEFAULT_BYTES_PER_SYNC, iter_hook=None,
                  threadpool=None, obj_dir='objects', mount_check=False,
-                 disallowed_metadata_keys=None, uid=DEFAULT_UID,
-                 gid=DEFAULT_GID):
+                 uid=DEFAULT_UID, gid=DEFAULT_GID):
         if mount_check and not mount(path, device):
             raise DiskFileDeviceUnavailable()
         self.disk_chunk_size = disk_chunk_size
         self.bytes_per_sync = bytes_per_sync
         self.iter_hook = iter_hook
-        self.threadpool = threadpool or ThreadPool(nthreads=0)
         obj = obj.strip(os.path.sep)

         if os.path.sep in obj:
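The next hunk moves the stat-and-read-metadata work out of __init__ and into
an explicit open() method that returns self, paired with a close() that drops
the cached state. Assuming that lifecycle, a caller would drive the object
roughly like this (a usage sketch, not code from the patch; consume is any
callable):

    def read_object(df, consume):
        # df is a DiskFile as defined below; open() populates fp and
        # _metadata and returns self, and iteration reads via self.fp.
        df.open(verify_close=True)
        try:
            for chunk in df:
                consume(chunk)
        finally:
            df.close()
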
@@ -491,59 +522,78 @@ class DiskFile(SwiftDiskFile):
             self.put_datadir = self.datadir
         self._is_dir = False
         self.logger = logger
-        self.metadata = {}
-        self.meta_file = None
+        self._metadata = None
+        # Don't store a value for data_file until we know it exists.
+        self.data_file = None
+        self._data_file_size = None
         self.fp = None
         self.iter_etag = None
         self.started_at_0 = False
         self.read_to_eof = False
         self.quarantined_dir = None
+        self.suppress_file_closing = False
+        self._verify_close = False
+        self.threadpool = threadpool or ThreadPool(nthreads=0)
+        # FIXME(portante): this attribute is set after open and affects the
+        # behavior of the class (i.e. public interface)
         self.keep_cache = False
         self.uid = int(uid)
         self.gid = int(gid)
-        self.suppress_file_closing = False
-        # Don't store a value for data_file until we know it exists.
-        self.data_file = None

-        data_file = os.path.join(self.put_datadir, self._obj)
+    def open(self, verify_close=False):
+        """
+        Open the file and read the metadata.
+
+        This method must populate the _metadata attribute.
+        :param verify_close: force implicit close to verify_file, no effect on
+                             explicit close.
+
+        :raises DiskFileCollision: on md5 collision
+        """
+        data_file = os.path.join(self.put_datadir, self._obj)
         try:
-            stats = do_stat(data_file)
-        except OSError as err:
-            if err.errno == errno.ENOTDIR:
-                return
+            fd = do_open(data_file, os.O_RDONLY | os.O_EXCL)
+        except GlusterFileSystemOSError as err:
+            self.logger.exception(
+                "Error opening file, %s :: %s", data_file, err)
         else:
-            if not stats:
-                return
-
-            self.data_file = data_file
-            self._is_dir = stat.S_ISDIR(stats.st_mode)
-
-            self.metadata = read_metadata(data_file)
-            if not self.metadata:
-                create_object_metadata(data_file)
-                self.metadata = read_metadata(data_file)
-
-            if not validate_object(self.metadata):
-                create_object_metadata(data_file)
-                self.metadata = read_metadata(data_file)
-
-            self._filter_metadata()
-
-            if keep_data_fp:
-                if not self._is_dir:
-                    # The caller has an assumption that the "fp" field of this
-                    # object is a file object if keep_data_fp is set. However,
-                    # this implementation of the DiskFile object does not need
-                    # to open the file for internal operations. So if the
-                    # caller requests it, we'll just open the file for them.
-                    self.fp = do_open(data_file, 'rb')
+            try:
+                stats = do_fstat(fd)
+            except GlusterFileSystemOSError as err:
+                self.logger.exception(
+                    "Error stat'ing open file, %s :: %s", data_file, err)
             else:
-                self.fp = Fake_file(data_file)
+                self._is_dir = stat.S_ISDIR(stats.st_mode)
+
+                self.data_file = data_file

-    def drop_cache(self, fd, offset, length):
+                self._metadata = read_metadata(fd)
+                if not self._metadata:
+                    create_object_metadata(fd)
+                    self._metadata = read_metadata(fd)
+
+                if not validate_object(self._metadata):
+                    create_object_metadata(fd)
+                    self._metadata = read_metadata(fd)
+
+                self._filter_metadata()
+
+                if self._is_dir:
+                    # Use a fake file handle to satisfy the super class's
+                    # __iter__ method requirement when dealing with
+                    # directories as objects.
+                    os.close(fd)
+                    self.fp = Fake_file(data_file)
+                else:
+                    self.fp = os.fdopen(fd, 'rb')
+        self._verify_close = verify_close
+        self._metadata = self._metadata or {}
+        return self
+
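Note that open() above works against the file descriptor, not the path:
do_open() acquires the fd once, do_fstat() classifies what it actually points
at, and read_metadata(fd) pulls the xattrs through the same descriptor, so
nothing re-resolves the name between the check and the use. The bare-bones
shape of that pattern (an os-level sketch; read_metadata itself is the
gluster.swift xattr helper):

    import os
    import stat

    def open_and_classify(path):
        # Learn everything from the fd itself, so the answers cannot
        # describe some other file that replaced the path meanwhile.
        fd = os.open(path, os.O_RDONLY)
        try:
            is_dir = stat.S_ISDIR(os.fstat(fd).st_mode)
        except OSError:
            os.close(fd)
            raise
        return fd, is_dir
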
+    def _drop_cache(self, fd, offset, length):
         if fd >= 0:
-            super(DiskFile, self).drop_cache(fd, offset, length)
+            super(DiskFile, self)._drop_cache(fd, offset, length)

     def close(self, verify_file=True):
         """
@@ -555,12 +605,17 @@ class DiskFile(SwiftDiskFile):
         if self.fp:
             do_close(self.fp)
             self.fp = None
+        self._metadata = None
+        self._data_file_size = None
+        self._verify_close = False

     def _filter_metadata(self):
-        if X_TYPE in self.metadata:
-            self.metadata.pop(X_TYPE)
-        if X_OBJECT_TYPE in self.metadata:
-            self.metadata.pop(X_OBJECT_TYPE)
+        if self._metadata is None:
+            return
+        if X_TYPE in self._metadata:
+            self._metadata.pop(X_TYPE)
+        if X_OBJECT_TYPE in self._metadata:
+            self._metadata.pop(X_OBJECT_TYPE)

     def _create_dir_object(self, dir_path, metadata=None):
         """
@@ -619,7 +674,7 @@ class DiskFile(SwiftDiskFile):
         return True, newmd

     @contextmanager
-    def writer(self, size=None):
+    def create(self, size=None):
         """
         Contextmanager to make a temporary file, optionally of a specified
         initial size.
@@ -721,63 +776,68 @@ class DiskFile(SwiftDiskFile):
         if tombstone:
             # We don't write tombstone files. So do nothing.
             return
-        assert self.data_file is not None, \
-            "put_metadata: no file to put metadata into"
         metadata = _adjust_metadata(metadata)
-        self.threadpool.run_in_thread(write_metadata, self.data_file, metadata)
-        self.metadata = metadata
-        self._filter_metadata()
+        data_file = os.path.join(self.put_datadir, self._obj)
+        self.threadpool.run_in_thread(write_metadata, data_file, metadata)

-    def unlinkold(self, timestamp):
+    def _delete(self):
+        if self._is_dir:
+            # Marker, or object, directory.
+            #
+            # Delete from the filesystem only if it contains no objects.
+            # If it does contain objects, then just remove the object
+            # metadata tag which will make this directory a
+            # fake-filesystem-only directory and will be deleted when the
+            # container or parent directory is deleted.
+            metadata = read_metadata(self.data_file)
+            if dir_is_object(metadata):
+                metadata[X_OBJECT_TYPE] = DIR_NON_OBJECT
+                write_metadata(self.data_file, metadata)
+            rmobjdir(self.data_file)
+        else:
+            # Delete file object
+            do_unlink(self.data_file)
+
+        # Garbage collection of non-object directories. Now that we
+        # deleted the file, determine if the current directory and any
+        # parent directory may be deleted.
+        dirname = os.path.dirname(self.data_file)
+        while dirname and dirname != self._container_path:
+            # Try to remove any directories that are not objects.
+            if not rmobjdir(dirname):
+                # If a directory with objects has been found, we can stop
+                # garbage collection
+                break
+            else:
+                dirname = os.path.dirname(dirname)
+
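_delete() above ends with an upward walk: once the object is gone it keeps
removing parent directories until it reaches the container root or a
directory that rmobjdir() refuses to drop (one that is itself an object or
still has children). The walk in isolation (a sketch, with a plain rmdir
standing in for rmobjdir()):

    import errno
    import os

    def prune_empty_parents(data_file, container_path):
        # Climb from the deleted object toward the container root,
        # removing each now-empty directory along the way.
        dirname = os.path.dirname(data_file)
        while dirname and dirname != container_path:
            try:
                os.rmdir(dirname)
            except OSError as err:
                if err.errno in (errno.ENOTEMPTY, errno.EEXIST):
                    break  # a populated directory stops the collection
                raise
            dirname = os.path.dirname(dirname)
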
+    def delete(self, timestamp):
         """
         Remove any older versions of the object file. Any file that has an
         older timestamp than timestamp will be deleted.

         :param timestamp: timestamp to compare with each file
         """
-        if not self.metadata or self.metadata[X_TIMESTAMP] >= timestamp:
-            return
-
-        assert self.data_file, \
-            "Have metadata, %r, but no data_file" % self.metadata
-
-        def _unlinkold():
-            if self._is_dir:
-                # Marker, or object, directory.
-                #
-                # Delete from the filesystem only if it contains no objects.
-                # If it does contain objects, then just remove the object
-                # metadata tag which will make this directory a
-                # fake-filesystem-only directory and will be deleted when the
-                # container or parent directory is deleted.
-                metadata = read_metadata(self.data_file)
-                if dir_is_object(metadata):
-                    metadata[X_OBJECT_TYPE] = DIR_NON_OBJECT
-                    write_metadata(self.data_file, metadata)
-                rmobjdir(self.data_file)
-            else:
-                # Delete file object
-                do_unlink(self.data_file)
-
-            # Garbage collection of non-object directories. Now that we
-            # deleted the file, determine if the current directory and any
-            # parent directory may be deleted.
-            dirname = os.path.dirname(self.data_file)
-            while dirname and dirname != self._container_path:
-                # Try to remove any directories that are not objects.
-                if not rmobjdir(dirname):
-                    # If a directory with objects has been found, we can stop
-                    # garbage collection
-                    break
-                else:
-                    dirname = os.path.dirname(dirname)
-
-        self.threadpool.run_in_thread(_unlinkold)
-
-        self.metadata = {}
+        timestamp_fl = float(timestamp)
+        data_file = os.path.join(self.put_datadir, self._obj)
+        try:
+            metadata = read_metadata(data_file)
+        except (GlusterFileSystemIOError, GlusterFileSystemOSError) as err:
+            if err.errno != errno.ENOENT:
+                raise
+        else:
+            try:
+                old_ts = float(metadata[X_TIMESTAMP]) >= timestamp_fl
+            except (KeyError, ValueError):
+                # If no X-Timestamp to compare against, or the timestamp is
+                # not a valid float, we'll just delete the object anyway.
+                old_ts = False
+            if not old_ts:
+                self.threadpool.run_in_thread(self._delete)
+        self._metadata = {}
         self.data_file = None

-    def get_data_file_size(self):
+    def _get_data_file_size(self):
         """
         Returns the os_path.getsize for the file. Raises an exception if this
         file does not match the Content-Length stored in the metadata, or if
@@ -795,12 +855,12 @@ class DiskFile(SwiftDiskFile):
         if self.data_file:
             def _old_getsize():
                 file_size = os_path.getsize(self.data_file)
-                if X_CONTENT_LENGTH in self.metadata:
-                    metadata_size = int(self.metadata[X_CONTENT_LENGTH])
+                if X_CONTENT_LENGTH in self._metadata:
+                    metadata_size = int(self._metadata[X_CONTENT_LENGTH])
                     if file_size != metadata_size:
                         # FIXME - bit rot detection?
-                        self.metadata[X_CONTENT_LENGTH] = file_size
-                        write_metadata(self.data_file, self.metadata)
+                        self._metadata[X_CONTENT_LENGTH] = file_size
+                        write_metadata(self.data_file, self._metadata)
                 return file_size
             file_size = self.threadpool.run_in_thread(_old_getsize)
             return file_size
-- cgit