diff options
Diffstat (limited to 'gluster/swift/obj/diskfile.py')
-rw-r--r-- | gluster/swift/obj/diskfile.py | 137 |
1 files changed, 44 insertions, 93 deletions
diff --git a/gluster/swift/obj/diskfile.py b/gluster/swift/obj/diskfile.py index eb180a2..21e6cee 100644 --- a/gluster/swift/obj/diskfile.py +++ b/gluster/swift/obj/diskfile.py @@ -23,22 +23,18 @@ except ImportError: import random import logging import time -from collections import defaultdict -from socket import gethostname -from hashlib import md5 +from uuid import uuid4 from eventlet import sleep -from greenlet import getcurrent from contextlib import contextmanager from gluster.swift.common.exceptions import AlreadyExistsAsFile, \ AlreadyExistsAsDir -from swift.common.utils import TRUE_VALUES, ThreadPool, config_true_value +from swift.common.utils import ThreadPool from swift.common.exceptions import DiskFileNotExist, DiskFileError, \ DiskFileNoSpace, DiskFileDeviceUnavailable, DiskFileNotOpen, \ DiskFileExpired from swift.common.swob import multi_range_iterator from gluster.swift.common.exceptions import GlusterFileSystemOSError -from gluster.swift.common.Glusterfs import mount from gluster.swift.common.fs_utils import do_fstat, do_open, do_close, \ do_unlink, do_chown, do_fsync, do_fchown, do_stat, do_write, do_read, \ do_fadvise64, do_rename, do_fdatasync, do_lseek, do_mkdir @@ -49,24 +45,15 @@ from gluster.swift.common.utils import X_CONTENT_TYPE, \ X_TIMESTAMP, X_TYPE, X_OBJECT_TYPE, FILE, OBJECT, DIR_TYPE, \ FILE_TYPE, DEFAULT_UID, DEFAULT_GID, DIR_NON_OBJECT, DIR_OBJECT, \ X_ETAG, X_CONTENT_LENGTH -from ConfigParser import ConfigParser, NoSectionError, NoOptionError +from swift.obj.diskfile import DiskFileManager as SwiftDiskFileManager # FIXME: Hopefully we'll be able to move to Python 2.7+ where O_CLOEXEC will # be back ported. See http://www.python.org/dev/peps/pep-0433/ O_CLOEXEC = 02000000 -DEFAULT_DISK_CHUNK_SIZE = 65536 -DEFAULT_KEEP_CACHE_SIZE = (5 * 1024 * 1024) -DEFAULT_MB_PER_SYNC = 512 -# keep these lower-case -DISALLOWED_HEADERS = set('content-length content-type deleted etag'.split()) - MAX_RENAME_ATTEMPTS = 10 MAX_OPEN_ATTEMPTS = 10 -_cur_pid = str(os.getpid()) -_cur_host = str(gethostname()) - def _random_sleep(): sleep(random.uniform(0.5, 0.15)) @@ -181,24 +168,6 @@ def make_directory(full_path, uid, gid, metadata=None): return True, metadata -_fs_conf = ConfigParser() -if _fs_conf.read(os.path.join('/etc/swift', 'fs.conf')): - try: - _mkdir_locking = _fs_conf.get('DEFAULT', 'mkdir_locking', "no") \ - in TRUE_VALUES - logging.warn("The option mkdir_locking has been deprecated and is" - " no longer supported") - except (NoSectionError, NoOptionError): - pass - try: - _use_put_mount = _fs_conf.get('DEFAULT', 'use_put_mount', "no") \ - in TRUE_VALUES - except (NoSectionError, NoOptionError): - _use_put_mount = False -else: - _use_put_mount = False - - def _adjust_metadata(metadata): # Fix up the metadata to ensure it has a proper value for the # Content-Type metadata, as well as an X_TYPE and X_OBJECT_TYPE @@ -223,7 +192,7 @@ def _adjust_metadata(metadata): return metadata -class OnDiskManager(object): +class DiskFileManager(SwiftDiskFileManager): """ Management class for devices, providing common place for shared parameters and methods not provided by the DiskFile class (which primarily services @@ -240,42 +209,14 @@ class OnDiskManager(object): :param conf: caller provided configuration object :param logger: caller provided logger """ - def __init__(self, conf, logger): - self.logger = logger - self.disk_chunk_size = int(conf.get('disk_chunk_size', - DEFAULT_DISK_CHUNK_SIZE)) - self.keep_cache_size = int(conf.get('keep_cache_size', - DEFAULT_KEEP_CACHE_SIZE)) - self.bytes_per_sync = int(conf.get('mb_per_sync', - DEFAULT_MB_PER_SYNC)) * 1024 * 1024 - self.devices = conf.get('devices', '/srv/node/') - self.mount_check = config_true_value(conf.get('mount_check', 'true')) - threads_per_disk = int(conf.get('threads_per_disk', '0')) - self.threadpools = defaultdict( - lambda: ThreadPool(nthreads=threads_per_disk)) - - def _get_dev_path(self, device): - """ - Return the path to a device, checking to see that it is a proper mount - point based on a configuration parameter. - - :param device: name of target device - :returns: full path to the device, None if the path to the device is - not a proper mount point. - """ - if self.mount_check and not mount(self.devices, device): - dev_path = None - else: - dev_path = os.path.join(self.devices, device) - return dev_path - - def get_diskfile(self, device, account, container, obj, - **kwargs): - dev_path = self._get_dev_path(device) + def get_diskfile(self, device, partition, account, container, obj, + policy=None, **kwargs): + dev_path = self.get_dev_path(device, self.mount_check) if not dev_path: raise DiskFileDeviceUnavailable() return DiskFile(self, dev_path, self.threadpools[device], - account, container, obj, **kwargs) + partition, account, container, obj, + policy=policy, **kwargs) class DiskFileWriter(object): @@ -447,10 +388,20 @@ class DiskFileWriter(object): df._threadpool.force_run_in_thread(self._finalize_put, metadata) - # Avoid the unlink() system call as part of the DiskFile.create() - # context cleanup + # Avoid the unlink() system call as part of the mkstemp context + # cleanup self._tmppath = None + def commit(self, timestamp): + """ + Perform any operations necessary to mark the object as durable. For + replication policy type this is a no-op. + + :param timestamp: object put timestamp, an instance of + :class:`~swift.common.utils.Timestamp` + """ + pass + class DiskFileReader(object): """ @@ -579,9 +530,9 @@ class DiskFile(object): Manage object files on disk. Object names ending or beginning with a '/' as in /a, a/, /a/b/, - etc, or object names with multiple consecutive slahes, like a//b, - are not supported. The proxy server's contraints filter - gluster.common.constrains.gluster_check_object_creation() should + etc, or object names with multiple consecutive slashes, like a//b, + are not supported. The proxy server's constraints filter + gluster.common.constrains.check_object_creation() should reject such requests. :param mgr: associated on-disk manager instance @@ -593,36 +544,37 @@ class DiskFile(object): :param uid: user ID disk object should assume (file or directory) :param gid: group ID disk object should assume (file or directory) """ - def __init__(self, mgr, dev_path, threadpool, account, container, obj, - uid=DEFAULT_UID, gid=DEFAULT_GID): + def __init__(self, mgr, dev_path, threadpool, partition, + account=None, container=None, obj=None, + policy=None, uid=DEFAULT_UID, gid=DEFAULT_GID): + # Variables partition and policy is currently unused. self._mgr = mgr self._device_path = dev_path self._threadpool = threadpool or ThreadPool(nthreads=0) self._uid = int(uid) self._gid = int(gid) self._is_dir = False - self._logger = mgr.logger self._metadata = None self._fd = None self._stat = None # Don't store a value for data_file until we know it exists. self._data_file = None - self._container_path = os.path.join(self._device_path, container) + self._account = account # Unused, account = volume + self._container = container + + self._container_path = os.path.join(self._device_path, self._container) + obj = obj.strip(os.path.sep) obj_path, self._obj = os.path.split(obj) if obj_path: self._obj_path = obj_path.strip(os.path.sep) - self._datadir = os.path.join(self._container_path, self._obj_path) + self._put_datadir = os.path.join(self._container_path, + self._obj_path) else: self._obj_path = '' - self._datadir = self._container_path + self._put_datadir = self._container_path - if _use_put_mount: - self._put_datadir = os.path.join( - self._device_path + '_PUT', container, self._obj_path) - else: - self._put_datadir = self._datadir self._data_file = os.path.join(self._put_datadir, self._obj) def open(self): @@ -687,7 +639,8 @@ class DiskFile(object): raise DiskFileNotExist else: # Re-raise the original exception after fd has been closed - raise err + raise + return self def _is_object_expired(self, metadata): @@ -874,16 +827,14 @@ class DiskFile(object): :raises AlreadyExistsAsFile: if path or part of a path is not a \ directory """ + data_file = os.path.join(self._put_datadir, self._obj) # Assume the full directory path exists to the file already, and # construct the proper name for the temporary file. attempts = 1 - cur_thread = str(getcurrent()) while True: - postfix = md5(self._obj + _cur_host + _cur_pid + cur_thread - + str(random.random())).hexdigest() - tmpfile = '.' + self._obj + '.' + postfix + tmpfile = '.' + self._obj + '.' + uuid4().hex tmppath = os.path.join(self._put_datadir, tmpfile) try: fd = do_open(tmppath, @@ -905,7 +856,7 @@ class DiskFile(object): if attempts >= MAX_OPEN_ATTEMPTS: # We failed after N attempts to create the temporary # file. - raise DiskFileError('DiskFile.create(): failed to' + raise DiskFileError('DiskFile.mkstemp(): failed to' ' successfully create a temporary file' ' without running into a name conflict' ' after %d of %d attempts for: %s' % ( @@ -918,7 +869,7 @@ class DiskFile(object): # FIXME: Possible FUSE issue or race condition, let's # sleep on it and retry the operation. _random_sleep() - logging.warn("DiskFile.create(): %s ... retrying in" + logging.warn("DiskFile.mkstemp(): %s ... retrying in" " 0.1 secs", gerr) attempts += 1 elif not self._obj_path: @@ -927,7 +878,7 @@ class DiskFile(object): # could be a FUSE issue or some race condition, so let's # sleep a bit and retry. _random_sleep() - logging.warn("DiskFile.create(): %s ... retrying in" + logging.warn("DiskFile.mkstemp(): %s ... retrying in" " 0.1 secs", gerr) attempts += 1 elif attempts > 1: @@ -935,7 +886,7 @@ class DiskFile(object): # also be a FUSE issue or some race condition, nap and # retry. _random_sleep() - logging.warn("DiskFile.create(): %s ... retrying in" + logging.warn("DiskFile.mkstemp(): %s ... retrying in" " 0.1 secs" % gerr) attempts += 1 else: |