From 100d6b01bd40d8b01335e5ecd4a592df79e75b63 Mon Sep 17 00:00:00 2001 From: Peter Portante Date: Fri, 20 Sep 2013 11:25:04 -0400 Subject: Rebase to lastest OpenStack Swift DiskFile API As of October 28th, 2013, we rebase to OpenStack Swift master (commit 4bfe674) to pick up the lastest officially supported DiskFile API changes. We use a snapshot of OpenStack Swift stored in the gluster-swift launchpad downloads area so that we can deliberately rebase at our own pace. With this refactoring, all the code for handling I/O is wholly contained in the swift tree for object operations. This will allow us to use a different fs_utils implementation in the future (for example, one based on a yet-to-be-implemented python bindings over libgfapi). This also means the "Fake_file" class has been removed entirely. Change-Id: I767983f88c59786e30b6c64da16d1cb6ab3c3e7f Signed-off-by: Peter Portante Reviewed-on: http://review.gluster.org/5993 Reviewed-by: Luis Pabon Tested-by: Luis Pabon --- gluster/swift/common/fs_utils.py | 99 +++--- gluster/swift/common/utils.py | 62 +--- gluster/swift/obj/diskfile.py | 751 ++++++++++++++++++++++++--------------- gluster/swift/obj/server.py | 59 +-- gluster/swift/proxy/server.py | 2 +- 5 files changed, 575 insertions(+), 398 deletions(-) (limited to 'gluster/swift') diff --git a/gluster/swift/common/fs_utils.py b/gluster/swift/common/fs_utils.py index 8b26fd0..199984a 100644 --- a/gluster/swift/common/fs_utils.py +++ b/gluster/swift/common/fs_utils.py @@ -18,42 +18,39 @@ import os import errno import stat import random -import os.path as os_path # noqa +import ctypes +import os.path as _os_path from eventlet import sleep +from swift.common.utils import load_libc_function from gluster.swift.common.exceptions import FileOrDirNotFoundError, \ - NotDirectoryError, GlusterFileSystemOSError, GlusterFileSystemIOError + NotDirectoryError, GlusterFileSystemOSError -class Fake_file(object): - def __init__(self, path): - self.path = path - - def tell(self): - return 0 - - def read(self, count): - return None - - def fileno(self): - return -1 - - def close(self): - pass +os_path = _os_path def do_walk(*args, **kwargs): return os.walk(*args, **kwargs) -def do_write(fd, msg): +def do_write(fd, buf): try: - cnt = os.write(fd, msg) + cnt = os.write(fd, buf) except OSError as err: raise GlusterFileSystemOSError( err.errno, '%s, os.write("%s", ...)' % (err.strerror, fd)) return cnt +def do_read(fd, n): + try: + buf = os.read(fd, n) + except OSError as err: + raise GlusterFileSystemOSError( + err.errno, '%s, os.write("%s", ...)' % (err.strerror, fd)) + return buf + + def do_ismount(path): """ Test whether a path is a mount point. @@ -203,37 +200,21 @@ def do_fstat(fd): def do_open(path, flags, **kwargs): - if isinstance(flags, int): - try: - fd = os.open(path, flags, **kwargs) - except OSError as err: - raise GlusterFileSystemOSError( - err.errno, '%s, os.open("%s", %x, %r)' % ( - err.strerror, path, flags, kwargs)) - return fd - else: - try: - fp = open(path, flags, **kwargs) - except IOError as err: - raise GlusterFileSystemIOError( - err.errno, '%s, open("%s", %s, %r)' % ( - err.strerror, path, flags, kwargs)) - return fp + try: + fd = os.open(path, flags, **kwargs) + except OSError as err: + raise GlusterFileSystemOSError( + err.errno, '%s, os.open("%s", %x, %r)' % ( + err.strerror, path, flags, kwargs)) + return fd def do_close(fd): - if isinstance(fd, file) or isinstance(fd, Fake_file): - try: - fd.close() - except IOError as err: - raise GlusterFileSystemIOError( - err.errno, '%s, os.close(%s)' % (err.strerror, fd)) - else: - try: - os.close(fd) - except OSError as err: - raise GlusterFileSystemOSError( - err.errno, '%s, os.close(%s)' % (err.strerror, fd)) + try: + os.close(fd) + except OSError as err: + raise GlusterFileSystemOSError( + err.errno, '%s, os.close(%s)' % (err.strerror, fd)) def do_unlink(path, log=True): @@ -268,9 +249,31 @@ def do_fsync(fd): def do_fdatasync(fd): try: os.fdatasync(fd) + except AttributeError: + do_fsync(fd) except OSError as err: raise GlusterFileSystemOSError( - err.errno, '%s, os.fdatasync("%s")' % (err.strerror, fd)) + err.errno, '%s, os.fsync("%s")' % (err.strerror, fd)) + + +_posix_fadvise = None + + +def do_fadvise64(fd, offset, length): + global _posix_fadvise + if _posix_fadvise is None: + _posix_fadvise = load_libc_function('posix_fadvise64') + # 4 means "POSIX_FADV_DONTNEED" + _posix_fadvise(fd, ctypes.c_uint64(offset), + ctypes.c_uint64(length), 4) + + +def do_lseek(fd, pos, how): + try: + os.lseek(fd, pos, how) + except OSError as err: + raise GlusterFileSystemOSError( + err.errno, '%s, os.fsync("%s")' % (err.strerror, fd)) def mkdirs(path): diff --git a/gluster/swift/common/utils.py b/gluster/swift/common/utils.py index 5d4b6a4..6773836 100644 --- a/gluster/swift/common/utils.py +++ b/gluster/swift/common/utils.py @@ -17,15 +17,13 @@ import os import stat import errno import xattr -import random import logging from hashlib import md5 from eventlet import sleep import cPickle as pickle -from swift.common.utils import normalize_timestamp from gluster.swift.common.exceptions import GlusterFileSystemIOError -from gluster.swift.common.fs_utils import do_rename, do_fsync, os_path, \ - do_stat, do_fstat, do_listdir, do_walk, do_rmdir +from gluster.swift.common.fs_utils import os_path, do_stat, do_listdir, \ + do_walk, do_rmdir, do_fstat from gluster.swift.common import Glusterfs X_CONTENT_TYPE = 'Content-Type' @@ -56,6 +54,21 @@ PICKLE_PROTOCOL = 2 CHUNK_SIZE = 65536 +def normalize_timestamp(timestamp): + """ + Format a timestamp (string or numeric) into a standardized + xxxxxxxxxx.xxxxx (10.5) format. + + Note that timestamps using values greater than or equal to November 20th, + 2286 at 17:46 UTC will use 11 digits to represent the number of + seconds. + + :param timestamp: unix timestamp + :returns: normalized timestamp as a string + """ + return "%016.05f" % (float(timestamp)) + + def read_metadata(path_or_fd): """ Helper function to read the pickled metadata from a File/Directory. @@ -207,7 +220,6 @@ def validate_account(metadata): def validate_object(metadata): if not metadata: - logging.warn('validate_object: No metadata') return False if X_TIMESTAMP not in metadata.keys() or \ @@ -451,38 +463,6 @@ def create_account_metadata(acc_path): return rmd -def write_pickle(obj, dest, tmp=None, pickle_protocol=0): - """ - Ensure that a pickle file gets written to disk. The file is first written - to a tmp file location in the destination directory path, ensured it is - synced to disk, then moved to its final destination name. - - This version takes advantage of Gluster's dot-prefix-dot-suffix naming - where the a file named ".thefile.name.9a7aasv" is hashed to the same - Gluster node as "thefile.name". This ensures the renaming of a temp file - once written does not move it to another Gluster node. - - :param obj: python object to be pickled - :param dest: path of final destination file - :param tmp: path to tmp to use, defaults to None (ignored) - :param pickle_protocol: protocol to pickle the obj with, defaults to 0 - """ - dirname = os.path.dirname(dest) - basename = os.path.basename(dest) - tmpname = '.' + basename + '.' + \ - md5(basename + str(random.random())).hexdigest() - tmppath = os.path.join(dirname, tmpname) - with open(tmppath, 'wb') as fo: - pickle.dump(obj, fo, pickle_protocol) - # TODO: This flush() method call turns into a flush() system call - # We'll need to wrap this as well, but we would do this by writing - #a context manager for our own open() method which returns an object - # in fo which makes the gluster API call. - fo.flush() - do_fsync(fo) - do_rename(tmppath, dest) - - # The following dir_xxx calls should definitely be replaced # with a Metadata class to encapsulate their implementation. # :FIXME: For now we have them as functions, but we should @@ -557,11 +537,3 @@ def rmobjdir(dir_path): raise else: return True - - -# Over-ride Swift's utils.write_pickle with ours -# -# FIXME: Is this even invoked anymore given we don't perform container or -# account updates? -import swift.common.utils -swift.common.utils.write_pickle = write_pickle diff --git a/gluster/swift/obj/diskfile.py b/gluster/swift/obj/diskfile.py index 0e0abef..b09179d 100644 --- a/gluster/swift/obj/diskfile.py +++ b/gluster/swift/obj/diskfile.py @@ -23,37 +23,37 @@ try: except ImportError: import random import logging +from collections import defaultdict from socket import gethostname from hashlib import md5 from eventlet import sleep from greenlet import getcurrent from contextlib import contextmanager -from swift.common.utils import TRUE_VALUES, drop_buffer_cache, ThreadPool +from swift.common.utils import TRUE_VALUES, ThreadPool, config_true_value from swift.common.exceptions import DiskFileNotExist, DiskFileError, \ - DiskFileNoSpace, DiskFileDeviceUnavailable + DiskFileNoSpace, DiskFileDeviceUnavailable, DiskFileNotOpen +from swift.common.swob import multi_range_iterator -from gluster.swift.common.exceptions import GlusterFileSystemOSError, \ - GlusterFileSystemIOError +from gluster.swift.common.exceptions import GlusterFileSystemOSError from gluster.swift.common.Glusterfs import mount from gluster.swift.common.fs_utils import do_fstat, do_open, do_close, \ - do_unlink, do_chown, os_path, do_fsync, do_fchown, do_stat, do_write, \ - do_fdatasync, do_rename, Fake_file + do_unlink, do_chown, do_fsync, do_fchown, do_stat, do_write, do_read, \ + do_fadvise64, do_rename, do_fdatasync, do_lseek from gluster.swift.common.utils import read_metadata, write_metadata, \ validate_object, create_object_metadata, rmobjdir, dir_is_object, \ get_object_metadata -from gluster.swift.common.utils import X_CONTENT_LENGTH, X_CONTENT_TYPE, \ +from gluster.swift.common.utils import X_CONTENT_TYPE, \ X_TIMESTAMP, X_TYPE, X_OBJECT_TYPE, FILE, OBJECT, DIR_TYPE, \ FILE_TYPE, DEFAULT_UID, DEFAULT_GID, DIR_NON_OBJECT, DIR_OBJECT from ConfigParser import ConfigParser, NoSectionError, NoOptionError -from swift.obj.diskfile import DiskFile as SwiftDiskFile - # FIXME: Hopefully we'll be able to move to Python 2.7+ where O_CLOEXEC will # be back ported. See http://www.python.org/dev/peps/pep-0433/ O_CLOEXEC = 02000000 DEFAULT_DISK_CHUNK_SIZE = 65536 -DEFAULT_BYTES_PER_SYNC = (512 * 1024 * 1024) +DEFAULT_KEEP_CACHE_SIZE = (5 * 1024 * 1024) +DEFAULT_MB_PER_SYNC = 512 # keep these lower-case DISALLOWED_HEADERS = set('content-length content-type deleted etag'.split()) @@ -279,70 +279,130 @@ def _adjust_metadata(metadata): return metadata -class DiskWriter(object): +class OnDiskManager(object): + """ + Management class for devices, providing common place for shared parameters + and methods not provided by the DiskFile class (which primarily services + the object server REST API layer). + + The `get_diskfile()` method is how this implementation creates a `DiskFile` + object. + + .. note:: + + This class is reference implementation specific and not part of the + pluggable on-disk backend API. + + :param conf: caller provided configuration object + :param logger: caller provided logger + """ + def __init__(self, conf, logger): + self.logger = logger + self.disk_chunk_size = int(conf.get('disk_chunk_size', + DEFAULT_DISK_CHUNK_SIZE)) + self.keep_cache_size = int(conf.get('keep_cache_size', + DEFAULT_KEEP_CACHE_SIZE)) + self.bytes_per_sync = int(conf.get('mb_per_sync', + DEFAULT_MB_PER_SYNC)) * 1024 * 1024 + self.devices = conf.get('devices', '/srv/node/') + self.mount_check = config_true_value(conf.get('mount_check', 'true')) + threads_per_disk = int(conf.get('threads_per_disk', '0')) + self.threadpools = defaultdict( + lambda: ThreadPool(nthreads=threads_per_disk)) + + def _get_dev_path(self, device): + """ + Return the path to a device, checking to see that it is a proper mount + point based on a configuration parameter. + + :param device: name of target device + :returns: full path to the device, None if the path to the device is + not a proper mount point. + """ + if self.mount_check and not mount(self.devices, device): + dev_path = None + else: + dev_path = os.path.join(self.devices, device) + return dev_path + + def get_diskfile(self, device, account, container, obj, + **kwargs): + dev_path = self._get_dev_path(device) + if not dev_path: + raise DiskFileDeviceUnavailable() + return DiskFile(self, dev_path, self.threadpools[device], + account, container, obj, **kwargs) + + +class DiskFileWriter(object): """ Encapsulation of the write context for servicing PUT REST API - requests. Serves as the context manager object for DiskFile's writer() + requests. Serves as the context manager object for DiskFile's create() method. - We just override the put() method for Gluster. + """ - def __init__(self, disk_file, fd, tmppath, threadpool): - self.disk_file = disk_file - self.fd = fd - self.tmppath = tmppath - self.upload_size = 0 - self.last_sync = 0 - self.threadpool = threadpool + def __init__(self, fd, tmppath, disk_file): + # Parameter tracking + self._fd = fd + self._tmppath = tmppath + self._disk_file = disk_file + + # Internal attributes + self._upload_size = 0 + self._last_sync = 0 + + def _write_entire_chunk(self, chunk): + bytes_per_sync = self._disk_file._mgr.bytes_per_sync + while chunk: + written = do_write(self._fd, chunk) + chunk = chunk[written:] + self._upload_size += written + # For large files sync every 512MB (by default) written + diff = self._upload_size - self._last_sync + if diff >= bytes_per_sync: + do_fdatasync(self._fd) + do_fadvise64(self._fd, self._last_sync, diff) + self._last_sync = self._upload_size def write(self, chunk): """ - Write a chunk of data into the temporary file. - - :param chunk: the chunk of data to write as a string object - """ + Write a chunk of data to disk. - def _write_entire_chunk(chunk): - while chunk: - written = do_write(self.fd, chunk) - self.upload_size += written - chunk = chunk[written:] + For this implementation, the data is written into a temporary file. - self.threadpool.run_in_thread(_write_entire_chunk, chunk) + :param chunk: the chunk of data to write as a string object - # For large files sync every 512MB (by default) written - diff = self.upload_size - self.last_sync - if diff >= self.disk_file.bytes_per_sync: - self.threadpool.force_run_in_thread(do_fdatasync, self.fd) - drop_buffer_cache(self.fd, self.last_sync, diff) - self.last_sync = self.upload_size + :returns: the total number of bytes written to an object + """ + df = self._disk_file + df._threadpool.run_in_thread(self._write_entire_chunk, chunk) + return self._upload_size def _finalize_put(self, metadata): # Write out metadata before fsync() to ensure it is also forced to # disk. - write_metadata(self.fd, metadata) + write_metadata(self._fd, metadata) # We call fsync() before calling drop_cache() to lower the # amount of redundant work the drop cache code will perform on # the pages (now that after fsync the pages will be all # clean). - do_fsync(self.fd) - + do_fsync(self._fd) # From the Department of the Redundancy Department, make sure # we call drop_cache() after fsync() to avoid redundant work # (pages all clean). - drop_buffer_cache(self.fd, 0, self.upload_size) + do_fadvise64(self._fd, self._last_sync, self._upload_size) # At this point we know that the object's full directory path # exists, so we can just rename it directly without using Swift's # swift.common.utils.renamer(), which makes the directory path and # adds extra stat() calls. - df = self.disk_file - data_file = os.path.join(df.put_datadir, df._obj) + df = self._disk_file attempts = 1 while True: try: - do_rename(self.tmppath, data_file) + do_rename(self._tmppath, df._data_file) except OSError as err: if err.errno in (errno.ENOENT, errno.EIO) \ and attempts < MAX_RENAME_ATTEMPTS: @@ -357,33 +417,32 @@ class DiskWriter(object): # "The link named by tmppath does not exist; or, a # directory component in data_file does not exist; # or, tmppath or data_file is an empty string." - assert len(self.tmppath) > 0 and len(data_file) > 0 - tpstats = do_stat(self.tmppath) - tfstats = do_fstat(self.fd) + assert len(self._tmppath) > 0 and len(df._data_file) > 0 + tpstats = do_stat(self._tmppath) + tfstats = do_fstat(self._fd) assert tfstats if not tpstats or tfstats.st_ino != tpstats.st_ino: # Temporary file name conflict raise DiskFileError( 'DiskFile.put(): temporary file, %s, was' ' already renamed (targeted for %s)' % ( - self.tmppath, data_file)) + self._tmppath, df._data_file)) else: # Data file target name now has a bad path! - dfstats = do_stat(df.put_datadir) + dfstats = do_stat(df._put_datadir) if not dfstats: raise DiskFileError( 'DiskFile.put(): path to object, %s, no' ' longer exists (targeted for %s)' % ( - df.put_datadir, - data_file)) + df._put_datadir, df._data_file)) else: is_dir = stat.S_ISDIR(dfstats.st_mode) if not is_dir: raise DiskFileError( 'DiskFile.put(): path to object, %s,' ' no longer a directory (targeted for' - ' %s)' % (df.put_datadir, - data_file)) + ' %s)' % (self._put_datadir, + df._data_file)) else: # Let's retry since everything looks okay logging.warn( @@ -391,77 +450,178 @@ class DiskWriter(object): " initially failed (%s) but a" " stat('%s') following that succeeded:" " %r" % ( - self.tmppath, data_file, - str(err), df.put_datadir, - dfstats)) + self._tmppath, df._data_file, str(err), + df._put_datadir, dfstats)) attempts += 1 continue else: raise GlusterFileSystemOSError( err.errno, "%s, os.rename('%s', '%s')" % ( - err.strerror, self.tmppath, data_file)) + err.strerror, self._tmppath, df._data_file)) else: # Success! break - # Close here so the calling context does not have to perform this - # in a thread. - do_close(self.fd) + # Close here so the calling context does not have to perform this, + # which keeps all file system operations in the threadpool context. + do_close(self._fd) + self._fd = None - def put(self, metadata, extension='.data'): + def put(self, metadata): """ Finalize writing the file on disk, and renames it from the temp file to the real location. This should be called after the data has been written to the temp file. :param metadata: dictionary of metadata to be written - :param extension: extension to be used when making the file """ - # Our caller will use '.data' here; we just ignore it since we map the - # URL directly to the file system. - - assert self.tmppath is not None + assert self._tmppath is not None metadata = _adjust_metadata(metadata) - df = self.disk_file + df = self._disk_file if dir_is_object(metadata): - if not df.data_file: - # Does not exist, create it - data_file = os.path.join(df._obj_path, df._obj) - _, df._metadata = self.threadpool.force_run_in_thread( - df._create_dir_object, data_file, metadata) - df.data_file = os.path.join(df._container_path, data_file) - elif not df._is_dir: - # Exists, but as a file - raise DiskFileError('DiskFile.put(): directory creation failed' - ' since the target, %s, already exists as' - ' a file' % df.data_file) + df._threadpool.force_run_in_thread( + df._create_dir_object, df._data_file, metadata) return - try: - self.threadpool.force_run_in_thread(self._finalize_put, metadata) - except GlusterFileSystemOSError as err: - if err.errno == errno.EISDIR: - # A pre-existing directory already exists on the file - # system, perhaps gratuitously created when another - # object was created, or created externally to Swift - # REST API servicing (UFO use case). - raise DiskFileError('DiskFile.put(): file creation failed' - ' since the target, %s, already exists as' - ' a directory' % df.data_file) - raise + if df._is_dir: + # A pre-existing directory already exists on the file + # system, perhaps gratuitously created when another + # object was created, or created externally to Swift + # REST API servicing (UFO use case). + raise DiskFileError('DiskFile.put(): file creation failed since' + ' the target, %s, already exists as a' + ' directory' % df._data_file) + + df._threadpool.force_run_in_thread(self._finalize_put, metadata) # Avoid the unlink() system call as part of the mkstemp context # cleanup self.tmppath = None - df._metadata = metadata - df._filter_metadata() - # Mark that it actually exists now - df.data_file = os.path.join(df.datadir, df._obj) +class DiskFileReader(object): + """ + Encapsulation of the WSGI read context for servicing GET REST API + requests. Serves as the context manager object for the + :class:`swift.obj.diskfile.DiskFile` class's + :func:`swift.obj.diskfile.DiskFile.reader` method. + + .. note:: + + The quarantining behavior of this method is considered implementation + specific, and is not required of the API. + + .. note:: + The arguments to the constructor are considered implementation + specific. The API does not define the constructor arguments. -class DiskFile(SwiftDiskFile): + :param fp: open file descriptor, -1 for a directory object + :param threadpool: thread pool to use for read operations + :param disk_chunk_size: size of reads from disk in bytes + :param obj_size: size of object on disk + :param keep_cache_size: maximum object size that will be kept in cache + :param iter_hook: called when __iter__ returns a chunk + :param keep_cache: should resulting reads be kept in the buffer cache + """ + def __init__(self, fd, threadpool, disk_chunk_size, obj_size, + keep_cache_size, iter_hook=None, keep_cache=False): + # Parameter tracking + self._fd = fd + self._threadpool = threadpool + self._disk_chunk_size = disk_chunk_size + self._iter_hook = iter_hook + if keep_cache: + # Caller suggests we keep this in cache, only do it if the + # object's size is less than the maximum. + self._keep_cache = obj_size < keep_cache_size + else: + self._keep_cache = False + + # Internal Attributes + self._suppress_file_closing = False + + def __iter__(self): + """Returns an iterator over the data file.""" + try: + dropped_cache = 0 + bytes_read = 0 + while True: + if self._fd != -1: + chunk = self._threadpool.run_in_thread( + do_read, self._fd, self._disk_chunk_size) + else: + chunk = None + if chunk: + bytes_read += len(chunk) + diff = bytes_read - dropped_cache + if diff > (1024 * 1024): + self._drop_cache(self._fd, dropped_cache, diff) + dropped_cache = bytes_read + yield chunk + if self._iter_hook: + self._iter_hook() + else: + diff = bytes_read - dropped_cache + if diff > 0: + self._drop_cache(dropped_cache, diff) + break + finally: + if not self._suppress_file_closing: + self.close() + + def app_iter_range(self, start, stop): + """Returns an iterator over the data file for range (start, stop)""" + if start or start == 0: + do_lseek(self._fd, start, os.SEEK_SET) + if stop is not None: + length = stop - start + else: + length = None + try: + for chunk in self: + if length is not None: + length -= len(chunk) + if length < 0: + # Chop off the extra: + yield chunk[:length] + break + yield chunk + finally: + if not self._suppress_file_closing: + self.close() + + def app_iter_ranges(self, ranges, content_type, boundary, size): + """Returns an iterator over the data file for a set of ranges""" + if not ranges: + yield '' + else: + try: + self._suppress_file_closing = True + for chunk in multi_range_iterator( + ranges, content_type, boundary, size, + self.app_iter_range): + yield chunk + finally: + self._suppress_file_closing = False + self.close() + + def _drop_cache(self, offset, length): + """Method for no-oping buffer cache drop method.""" + if not self._keep_cache and self._fd > -1: + do_fadvise64(self._fd, offset, length) + + def close(self): + """ + Close the open file handle if present. + """ + if self._fd is not None: + fd, self._fd = self._fd, None + if fd > -1: + do_close(fd) + + +class DiskFile(object): """ Manage object files on disk. @@ -471,151 +631,181 @@ class DiskFile(SwiftDiskFile): gluster.common.constrains.gluster_check_object_creation() should reject such requests. - :param path: path to devices on the node/mount path for UFO. - :param device: device name/account_name for UFO. - :param partition: partition on the device the object lives in + :param mgr: associated on-disk manager instance + :param dev_path: device name/account_name for UFO. + :param threadpool: thread pool in which to do blocking operations :param account: account name for the object :param container: container name for the object :param obj: object name for the object - :param logger: logger object for writing out log file messages - :param disk_chunk_Size: size of chunks on file reads - :param bytes_per_sync: number of bytes between fdatasync calls - :param iter_hook: called when __iter__ returns a chunk - :param threadpool: thread pool in which to do blocking operations - :param obj_dir: ignored - :param mount_check: check the target device is a mount point and not on the - root volume :param uid: user ID disk object should assume (file or directory) :param gid: group ID disk object should assume (file or directory) """ - - def __init__(self, path, device, partition, account, container, obj, - logger, disk_chunk_size=DEFAULT_DISK_CHUNK_SIZE, - bytes_per_sync=DEFAULT_BYTES_PER_SYNC, iter_hook=None, - threadpool=None, obj_dir='objects', mount_check=False, + def __init__(self, mgr, dev_path, threadpool, account, container, obj, uid=DEFAULT_UID, gid=DEFAULT_GID): - if mount_check and not mount(path, device): - raise DiskFileDeviceUnavailable() - self.disk_chunk_size = disk_chunk_size - self.bytes_per_sync = bytes_per_sync - self.iter_hook = iter_hook - obj = obj.strip(os.path.sep) + self._mgr = mgr + self._device_path = dev_path + self._threadpool = threadpool or ThreadPool(nthreads=0) + self._uid = int(uid) + self._gid = int(gid) + self._is_dir = False + self._logger = mgr.logger + self._metadata = None + self._fd = None + # Don't store a value for data_file until we know it exists. + self._data_file = None - if os.path.sep in obj: - self._obj_path, self._obj = os.path.split(obj) + self._container_path = os.path.join(self._device_path, container) + obj = obj.strip(os.path.sep) + obj_path, self._obj = os.path.split(obj) + if obj_path: + self._obj_path = obj_path.strip(os.path.sep) + self._datadir = os.path.join(self._container_path, self._obj_path) else: self._obj_path = '' - self._obj = obj + self._datadir = self._container_path - if self._obj_path: - self.name = os.path.join(container, self._obj_path) - else: - self.name = container - # Absolute path for object directory. - self.datadir = os.path.join(path, device, self.name) - self.device_path = os.path.join(path, device) - self._container_path = os.path.join(path, device, container) if _use_put_mount: - self.put_datadir = os.path.join(self.device_path + '_PUT', - self.name) + self._put_datadir = os.path.join( + self._device_path + '_PUT', container, self._obj_path) else: - self.put_datadir = self.datadir - self._is_dir = False - self.logger = logger - self._metadata = None - # Don't store a value for data_file until we know it exists. - self.data_file = None - self._data_file_size = None - self.fp = None - self.iter_etag = None - self.started_at_0 = False - self.read_to_eof = False - self.quarantined_dir = None - self.suppress_file_closing = False - self._verify_close = False - self.threadpool = threadpool or ThreadPool(nthreads=0) - # FIXME(portante): this attribute is set after open and affects the - # behavior of the class (i.e. public interface) - self.keep_cache = False - self.uid = int(uid) - self.gid = int(gid) - - def open(self, verify_close=False): + self._put_datadir = self._datadir + self._data_file = os.path.join(self._put_datadir, self._obj) + + def open(self): """ - Open the file and read the metadata. + Open the object. - This method must populate the _metadata attribute. + This implementation opens the data file representing the object, reads + the associated metadata in the extended attributes, additionally + combining metadata from fast-POST `.meta` files. - :param verify_close: force implicit close to verify_file, no effect on - explicit close. + .. note:: - :raises DiskFileCollision: on md5 collision + An implementation is allowed to raise any of the following + exceptions, but is only required to raise `DiskFileNotExist` when + the object representation does not exist. + + :raises DiskFileNotExist: if the object does not exist + :returns: itself for use as a context manager """ - data_file = os.path.join(self.put_datadir, self._obj) + # Writes are always performed to a temporary file try: - fd = do_open(data_file, os.O_RDONLY | os.O_EXCL) + fd = do_open(self._data_file, os.O_RDONLY | O_CLOEXEC) except GlusterFileSystemOSError as err: - self.logger.exception( - "Error opening file, %s :: %s", data_file, err) + if err.errno in (errno.ENOENT, errno.ENOTDIR): + # If the file does exist, or some part of the path does not + # exist, raise the expected DiskFileNotExist + raise DiskFileNotExist + raise else: - try: - stats = do_fstat(fd) - except GlusterFileSystemOSError as err: - self.logger.exception( - "Error stat'ing open file, %s :: %s", data_file, err) - else: - self._is_dir = stat.S_ISDIR(stats.st_mode) + stats = do_fstat(fd) + if not stats: + return + self._is_dir = stat.S_ISDIR(stats.st_mode) + obj_size = stats.st_size + + self._metadata = read_metadata(fd) + if not validate_object(self._metadata): + create_object_metadata(fd) + self._metadata = read_metadata(fd) + assert self._metadata is not None + self._filter_metadata() - self.data_file = data_file + if self._is_dir: + do_close(fd) + obj_size = 0 + self._fd = -1 + else: + self._fd = fd - self._metadata = read_metadata(fd) - if not self._metadata: - create_object_metadata(fd) - self._metadata = read_metadata(fd) + self._obj_size = obj_size + return self - if not validate_object(self._metadata): - create_object_metadata(fd) - self._metadata = read_metadata(fd) + def _filter_metadata(self): + if X_TYPE in self._metadata: + self._metadata.pop(X_TYPE) + if X_OBJECT_TYPE in self._metadata: + self._metadata.pop(X_OBJECT_TYPE) - self._filter_metadata() + def __enter__(self): + """ + Context enter. - if self._is_dir: - # Use a fake file handle to satisfy the super class's - # __iter__ method requirement when dealing with - # directories as objects. - os.close(fd) - self.fp = Fake_file(data_file) - else: - self.fp = os.fdopen(fd, 'rb') - self._verify_close = verify_close - self._metadata = self._metadata or {} - return self + .. note:: - def _drop_cache(self, fd, offset, length): - if fd >= 0: - super(DiskFile, self)._drop_cache(fd, offset, length) + An implemenation shall raise `DiskFileNotOpen` when has not + previously invoked the :func:`swift.obj.diskfile.DiskFile.open` + method. + """ + if self._metadata is None: + raise DiskFileNotOpen() + return self - def close(self, verify_file=True): + def __exit__(self, t, v, tb): """ - Close the file. Will handle quarantining file if necessary. + Context exit. + + .. note:: - :param verify_file: Defaults to True. If false, will not check - file to see if it needs quarantining. + This method will be invoked by the object server while servicing + the REST API *before* the object has actually been read. It is the + responsibility of the implementation to properly handle that. """ - if self.fp: - do_close(self.fp) - self.fp = None self._metadata = None - self._data_file_size = None - self._verify_close = False + if self._fd is not None: + fd, self._fd = self._fd, None + if self._fd > -1: + do_close(fd) - def _filter_metadata(self): + def get_metadata(self): + """ + Provide the metadata for a previously opened object as a dictionary. + + :returns: object's metadata dictionary + :raises DiskFileNotOpen: if the + :func:`swift.obj.diskfile.DiskFile.open` method was not previously + invoked + """ if self._metadata is None: - return - if X_TYPE in self._metadata: - self._metadata.pop(X_TYPE) - if X_OBJECT_TYPE in self._metadata: - self._metadata.pop(X_OBJECT_TYPE) + raise DiskFileNotOpen() + return self._metadata + + def read_metadata(self): + """ + Return the metadata for an object without requiring the caller to open + the object first. + + :returns: metadata dictionary for an object + :raises DiskFileError: this implementation will raise the same + errors as the `open()` method. + """ + with self.open(): + return self.get_metadata() + + def reader(self, iter_hook=None, keep_cache=False): + """ + Return a :class:`swift.common.swob.Response` class compatible + "`app_iter`" object as defined by + :class:`swift.obj.diskfile.DiskFileReader`. + + For this implementation, the responsibility of closing the open file + is passed to the :class:`swift.obj.diskfile.DiskFileReader` object. + + :param iter_hook: called when __iter__ returns a chunk + :param keep_cache: caller's preference for keeping data read in the + OS buffer cache + :returns: a :class:`swift.obj.diskfile.DiskFileReader` object + """ + if self._metadata is None: + raise DiskFileNotOpen() + dr = DiskFileReader( + self._fd, self._threadpool, self._mgr.disk_chunk_size, + self._obj_size, self._mgr.keep_cache_size, + iter_hook=iter_hook, keep_cache=keep_cache) + # At this point the reader object is now responsible for closing + # the file pointer. + self._fd = None + return dr def _create_dir_object(self, dir_path, metadata=None): """ @@ -648,7 +838,7 @@ class DiskFile(SwiftDiskFile): stack = [] while True: md = None if cur_path != full_path else metadata - ret, newmd = make_directory(cur_path, self.uid, self.gid, md) + ret, newmd = make_directory(cur_path, self._uid, self._gid, md) if ret: break # Some path of the parent did not exist, so loop around and @@ -665,27 +855,41 @@ class DiskFile(SwiftDiskFile): while child: cur_path = os.path.join(cur_path, child) md = None if cur_path != full_path else metadata - ret, newmd = make_directory(cur_path, self.uid, self.gid, md) + ret, newmd = make_directory(cur_path, self._uid, self._gid, md) if not ret: raise DiskFileError("DiskFile._create_dir_object(): failed to" " create directory path to target, %s," " on subpath: %s" % (full_path, cur_path)) child = stack.pop() if stack else None return True, newmd + # Exists, but as a file + #raise DiskFileError('DiskFile.put(): directory creation failed' + # ' since the target, %s, already exists as' + # ' a file' % df._data_file) @contextmanager def create(self, size=None): """ - Contextmanager to make a temporary file, optionally of a specified - initial size. + Context manager to create a file. We create a temporary file first, and + then return a DiskFileWriter object to encapsulate the state. For Gluster, we first optimistically create the temporary file using the "rsync-friendly" .NAME.random naming. If we find that some path to the file does not exist, we then create that path and then create the temporary file again. If we get file name conflict, we'll retry using different random suffixes 1,000 times before giving up. + + .. note:: + + An implementation is not required to perform on-disk + preallocations even if the parameter is specified. But if it does + and it fails, it must raise a `DiskFileNoSpace` exception. + + :param size: optional initial size of file to explicitly allocate on + disk + :raises DiskFileNoSpace: if a size is specified and allocation fails """ - data_file = os.path.join(self.put_datadir, self._obj) + data_file = os.path.join(self._put_datadir, self._obj) # Assume the full directory path exists to the file already, and # construct the proper name for the temporary file. @@ -695,7 +899,7 @@ class DiskFile(SwiftDiskFile): postfix = md5(self._obj + _cur_host + _cur_pid + cur_thread + str(random.random())).hexdigest() tmpfile = '.' + self._obj + '.' + postfix - tmppath = os.path.join(self.put_datadir, tmpfile) + tmppath = os.path.join(self._put_datadir, tmpfile) try: fd = do_open(tmppath, os.O_WRONLY | os.O_CREAT | os.O_EXCL | O_CLOEXEC) @@ -752,35 +956,39 @@ class DiskFile(SwiftDiskFile): dw = None try: # Ensure it is properly owned before we make it available. - do_fchown(fd, self.uid, self.gid) + do_fchown(fd, self._uid, self._gid) # NOTE: we do not perform the fallocate() call at all. We ignore - # it completely. - dw = DiskWriter(self, fd, tmppath, self.threadpool) + # it completely since at the time of this writing FUSE does not + # support it. + dw = DiskFileWriter(fd, tmppath, self) yield dw finally: - try: - if dw.fd: - do_close(dw.fd) - except OSError: - pass - if dw.tmppath: - do_unlink(dw.tmppath) - - def put_metadata(self, metadata, tombstone=False): + if dw is not None: + try: + if dw._fd: + do_close(dw._fd) + except OSError: + pass + if dw._tmppath: + do_unlink(dw._tmppath) + + def write_metadata(self, metadata): """ - Short hand for putting metadata to .meta and .ts files. + Write a block of metadata to an object without requiring the caller to + open the object first. - :param metadata: dictionary of metadata to be written - :param tombstone: whether or not we are writing a tombstone + :param metadata: dictionary of metadata to be associated with the + object + :raises DiskFileError: this implementation will raise the same + errors as the `create()` method. """ - if tombstone: - # We don't write tombstone files. So do nothing. - return + # FIXME: we need to validate system metadata is preserved metadata = _adjust_metadata(metadata) - data_file = os.path.join(self.put_datadir, self._obj) - self.threadpool.run_in_thread(write_metadata, data_file, metadata) + data_file = os.path.join(self._put_datadir, self._obj) + self._threadpool.run_in_thread( + write_metadata, data_file, metadata) - def _delete(self): + def _unlinkold(self): if self._is_dir: # Marker, or object, directory. # @@ -789,19 +997,21 @@ class DiskFile(SwiftDiskFile): # metadata tag which will make this directory a # fake-filesystem-only directory and will be deleted when the # container or parent directory is deleted. - metadata = read_metadata(self.data_file) + # + # FIXME: Ideally we should use an atomic metadata update operation + metadata = read_metadata(self._data_file) if dir_is_object(metadata): metadata[X_OBJECT_TYPE] = DIR_NON_OBJECT - write_metadata(self.data_file, metadata) - rmobjdir(self.data_file) + write_metadata(self._data_file, metadata) + rmobjdir(self._data_file) else: # Delete file object - do_unlink(self.data_file) + do_unlink(self._data_file) # Garbage collection of non-object directories. Now that we # deleted the file, determine if the current directory and any # parent directory may be deleted. - dirname = os.path.dirname(self.data_file) + dirname = os.path.dirname(self._data_file) while dirname and dirname != self._container_path: # Try to remove any directories that are not objects. if not rmobjdir(dirname): @@ -813,58 +1023,31 @@ class DiskFile(SwiftDiskFile): def delete(self, timestamp): """ - Remove any older versions of the object file. Any file that has an - older timestamp than timestamp will be deleted. + Delete the object. + + This implementation creates a tombstone file using the given + timestamp, and removes any older versions of the object file. Any + file that has an older timestamp than timestamp will be deleted. + + .. note:: + + An implementation is free to use or ignore the timestamp + parameter. :param timestamp: timestamp to compare with each file + :raises DiskFileError: this implementation will raise the same + errors as the `create()` method. """ - timestamp_fl = float(timestamp) - data_file = os.path.join(self.put_datadir, self._obj) try: - metadata = read_metadata(data_file) - except (GlusterFileSystemIOError, GlusterFileSystemOSError) as err: + metadata = read_metadata(self._data_file) + except (IOError, OSError) as err: if err.errno != errno.ENOENT: raise else: - try: - old_ts = float(metadata[X_TIMESTAMP]) >= timestamp_fl - except (KeyError, ValueError): - # If no X-Timestamp to compare against, or the timestamp is - # not a valid float, we'll just delete the object anyways. - old_ts = False - if not old_ts: - self.threadpool.run_in_thread(self._delete) - self._metadata = {} - self.data_file = None - - def _get_data_file_size(self): - """ - Returns the os_path.getsize for the file. Raises an exception if this - file does not match the Content-Length stored in the metadata, or if - self.data_file does not exist. + if metadata[X_TIMESTAMP] >= timestamp: + return - :returns: file size as an int - :raises DiskFileError: on file size mismatch. - :raises DiskFileNotExist: on file not existing (including deleted) - """ - if self._is_dir: - # Directories have no size. - return 0 - try: - file_size = 0 - if self.data_file: - def _old_getsize(): - file_size = os_path.getsize(self.data_file) - if X_CONTENT_LENGTH in self._metadata: - metadata_size = int(self._metadata[X_CONTENT_LENGTH]) - if file_size != metadata_size: - # FIXME - bit rot detection? - self._metadata[X_CONTENT_LENGTH] = file_size - write_metadata(self.data_file, self._metadata) - return file_size - file_size = self.threadpool.run_in_thread(_old_getsize) - return file_size - except OSError as err: - if err.errno != errno.ENOENT: - raise - raise DiskFileNotExist('Data File does not exist.') + self._threadpool.run_in_thread(self._unlinkold) + + self._metadata = None + self._data_file = None diff --git a/gluster/swift/obj/server.py b/gluster/swift/obj/server.py index 433879f..6417475 100644 --- a/gluster/swift/obj/server.py +++ b/gluster/swift/obj/server.py @@ -21,7 +21,7 @@ import gluster.swift.common.constraints # noqa from swift.obj import server -from gluster.swift.obj.diskfile import DiskFile +from gluster.swift.obj.diskfile import OnDiskManager class ObjectController(server.ObjectController): @@ -31,33 +31,52 @@ class ObjectController(server.ObjectController): stored on disk and already updated by virtue of performing the file system operations directly). """ + def setup(self, conf): + """ + Implementation specific setup. This method is called at the very end + by the constructor to allow a specific implementation to modify + existing attributes or add its own attributes. + + :param conf: WSGI configuration parameter + """ + # FIXME: Gluster currently does not support x-delete-at, as there is + # no mechanism in GlusterFS itself to expire an object, or an external + # process that will cull expired objects. + try: + self.allowed_headers.remove('x-delete-at') + except KeyError: + pass + # Common on-disk hierarchy shared across account, container and object + # servers. + self._ondisk_mgr = OnDiskManager(conf, self.logger) + + def get_diskfile(self, device, partition, account, container, obj, + **kwargs): + """ + Utility method for instantiating a DiskFile object supporting a given + REST API. - def _diskfile(self, device, partition, account, container, obj, **kwargs): - """Utility method for instantiating a DiskFile.""" - kwargs.setdefault('mount_check', self.mount_check) - kwargs.setdefault('bytes_per_sync', self.bytes_per_sync) - kwargs.setdefault('disk_chunk_size', self.disk_chunk_size) - kwargs.setdefault('threadpool', self.threadpools[device]) - kwargs.setdefault('obj_dir', server.DATADIR) - return DiskFile(self.devices, device, partition, account, - container, obj, self.logger, **kwargs) + An implementation of the object server that wants to use a different + DiskFile class would simply over-ride this method to provide that + behavior. + """ + return self._ondisk_mgr.get_diskfile(device, account, container, obj, + **kwargs) - def container_update(self, op, account, container, obj, request, - headers_out, objdevice): + def container_update(self, *args, **kwargs): """ Update the container when objects are updated. For Gluster, this is just a no-op, since a container is just the directory holding all the objects (sub-directory hierarchy of files). + """ + return + + def delete_at_update(self, *args, **kwargs): + """ + Update the expiring objects container when objects are updated. - :param op: operation performed (ex: 'PUT', or 'DELETE') - :param account: account name for the object - :param container: container name for the object - :param obj: object name - :param request: the original request object driving the update - :param headers_out: dictionary of headers to send in the container - request(s) - :param objdevice: device name that the object is in + FIXME: Gluster currently does not support delete_at headers. """ return diff --git a/gluster/swift/proxy/server.py b/gluster/swift/proxy/server.py index 3254409..7b2f58e 100644 --- a/gluster/swift/proxy/server.py +++ b/gluster/swift/proxy/server.py @@ -16,7 +16,7 @@ # Simply importing this monkey patches the constraint handling to fit our # needs -import gluster.swift.common.constraints # noqa +import gluster.swift.common.constraints # noqa from swift.proxy.server import Application, mimetypes # noqa from swift.proxy.controllers import AccountController # noqa -- cgit