diff options
Diffstat (limited to 'swift/1.4.8/plugins/DiskDir.py')
-rw-r--r-- | swift/1.4.8/plugins/DiskDir.py | 484 |
1 files changed, 484 insertions, 0 deletions
diff --git a/swift/1.4.8/plugins/DiskDir.py b/swift/1.4.8/plugins/DiskDir.py new file mode 100644 index 000000000..4b8131671 --- /dev/null +++ b/swift/1.4.8/plugins/DiskDir.py @@ -0,0 +1,484 @@ +# Copyright (c) 2011 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from swift.plugins.utils import clean_metadata, dir_empty, rmdirs, mkdirs, \ + validate_account, validate_container, check_valid_account, is_marker, \ + get_container_details, get_account_details, create_container_metadata, \ + create_account_metadata, DEFAULT_GID, DEFAULT_UID, get_account_details, \ + validate_object, create_object_metadata, read_metadata, write_metadata + +from swift.common.constraints import CONTAINER_LISTING_LIMIT, \ + check_mount + +from swift.plugins.utils import X_CONTENT_TYPE, X_CONTENT_LENGTH, X_TIMESTAMP,\ + X_PUT_TIMESTAMP, X_TYPE, X_ETAG, X_OBJECTS_COUNT, X_BYTES_USED, \ + X_CONTAINER_COUNT, CONTAINER + +from swift import plugins +def strip_obj_storage_path(path, string='/mnt/gluster-object'): + """ + strip /mnt/gluster-object + """ + return path.replace(string, '').strip('/') + +DATADIR = 'containers' + + +class DiskCommon(object): + def is_deleted(self): + return not os.path.exists(self.datadir) + + def filter_prefix(self, objects, prefix): + """ + Accept sorted list. + """ + found = 0 + filtered_objs = [] + for object_name in objects: + if object_name.startswith(prefix): + filtered_objs.append(object_name) + found = 1 + else: + if found: + break + return filtered_objs + + def filter_delimiter(self, objects, delimiter, prefix): + """ + Accept sorted list. + Objects should start with prefix. + """ + filtered_objs=[] + for object_name in objects: + tmp_obj = object_name.replace(prefix, '', 1) + sufix = tmp_obj.split(delimiter, 1) + new_obj = prefix + sufix[0] + if new_obj and new_obj not in filtered_objs: + filtered_objs.append(new_obj) + + return filtered_objs + + def filter_marker(self, objects, marker): + """ + TODO: We can traverse in reverse order to optimize. + Accept sorted list. + """ + filtered_objs=[] + found = 0 + if objects[-1] < marker: + return filtered_objs + for object_name in objects: + if object_name > marker: + filtered_objs.append(object_name) + + return filtered_objs + + def filter_end_marker(self, objects, end_marker): + """ + Accept sorted list. + """ + filtered_objs=[] + for object_name in objects: + if object_name < end_marker: + filtered_objs.append(object_name) + else: + break + + return filtered_objs + + def filter_limit(self, objects, limit): + filtered_objs=[] + for i in range(0, limit): + filtered_objs.append(objects[i]) + + return filtered_objs + + def update_account(self, metadata): + acc_path = self.datadir + write_metadata(acc_path, metadata) + self.metadata = metadata + +class DiskDir(DiskCommon): + """ + Manage object files on disk. + + :param path: path to devices on the node + :param device: device name + :param partition: partition on the device the object lives in + :param account: account name for the object + :param container: container name for the object + :param obj: object name for the object + :param keep_data_fp: if True, don't close the fp, otherwise close it + :param disk_chunk_Size: size of chunks on file reads + """ + + def __init__(self, path, device, partition, account, container, logger, + uid=DEFAULT_UID, gid=DEFAULT_GID, fs_object=None): + self.root = path + device = account + if container: + self.name = container + else: + self.name = None + if self.name: + self.datadir = os.path.join(path, account, self.name) + else: + self.datadir = os.path.join(path, device) + self.account = account + self.device_path = os.path.join(path, device) + if not check_mount(path, device): + check_valid_account(account, fs_object) + self.logger = logger + self.metadata = {} + self.uid = int(uid) + self.gid = int(gid) + # Create a dummy db_file in /etc/swift + self.db_file = '/etc/swift/db_file.db' + if not os.path.exists(self.db_file): + file(self.db_file, 'w+') + self.dir_exists = os.path.exists(self.datadir) + if self.dir_exists: + try: + self.metadata = read_metadata(self.datadir) + except EOFError: + create_container_metadata(self.datadir) + else: + return + if container: + if not self.metadata: + create_container_metadata(self.datadir) + self.metadata = read_metadata(self.datadir) + else: + if not validate_container(self.metadata): + create_container_metadata(self.datadir) + self.metadata = read_metadata(self.datadir) + else: + if not self.metadata: + create_account_metadata(self.datadir) + self.metadata = read_metadata(self.datadir) + else: + if not validate_account(self.metadata): + create_account_metadata(self.datadir) + self.metadata = read_metadata(self.datadir) + + def empty(self): + return dir_empty(self.datadir) + + def delete(self): + if self.empty(): + #For delete account. + if os.path.ismount(self.datadir): + clean_metadata(self.datadir) + else: + rmdirs(self.datadir) + self.dir_exists = False + + + def put_metadata(self, metadata): + """ + Write metadata to directory/container. + """ + write_metadata(self.datadir, metadata) + self.metadata = metadata + + def put(self, metadata): + """ + Create and write metatdata to directory/container. + :param metadata: Metadata to write. + """ + if not self.dir_exists: + mkdirs(self.datadir) + + os.chown(self.datadir, self.uid, self.gid) + write_metadata(self.datadir, metadata) + self.metadata = metadata + self.dir_exists = True + + def put_obj(self, content_length, timestamp): + self.metadata[X_OBJECTS_COUNT] = int(self.metadata[X_OBJECTS_COUNT]) + 1 + self.metadata[X_PUT_TIMESTAMP] = timestamp + self.metadata[X_BYTES_USED] = int(self.metadata[X_BYTES_USED]) + int(content_length) + #TODO: define update_metadata instad of writing whole metadata again. + self.put_metadata(self.metadata) + + def delete_obj(self, content_length): + self.metadata[X_OBJECTS_COUNT] = int(self.metadata[X_OBJECTS_COUNT]) - 1 + self.metadata[X_BYTES_USED] = int(self.metadata[X_BYTES_USED]) - int(content_length) + self.put_metadata(self.metadata) + + def put_container(self, timestamp, object_count, bytes_used): + """ + For account server. + """ + self.metadata[X_OBJECTS_COUNT] = 0 + self.metadata[X_BYTES_USED] = 0 + self.metadata[X_CONTAINER_COUNT] = int(self.metadata[X_CONTAINER_COUNT]) + 1 + self.metadata[X_PUT_TIMESTAMP] = timestamp + self.put_metadata(self.metadata) + + def delete_container(self, object_count, bytes_used): + """ + For account server. + """ + self.metadata[X_OBJECTS_COUNT] = 0 + self.metadata[X_BYTES_USED] = 0 + self.metadata[X_CONTAINER_COUNT] = int(self.metadata[X_CONTAINER_COUNT]) - 1 + self.put_metadata(self.metadata) + + def unlink(self): + """ + Remove directory/container if empty. + """ + if dir_empty(self.datadir): + rmdirs(self.datadir) + + def list_objects_iter(self, limit, marker, end_marker, + prefix, delimiter, path): + """ + Returns tuple of name, created_at, size, content_type, etag. + """ + if path: + prefix = path = path.rstrip('/') + '/' + delimiter = '/' + if delimiter and not prefix: + prefix = '' + + objects = [] + object_count = 0 + bytes_used = 0 + container_list = [] + + objects, object_count, bytes_used = get_container_details(self.datadir) + + if int(self.metadata[X_OBJECTS_COUNT]) != object_count or \ + int(self.metadata[X_BYTES_USED]) != bytes_used: + self.metadata[X_OBJECTS_COUNT] = object_count + self.metadata[X_BYTES_USED] = bytes_used + self.update_container(self.metadata) + + if objects: + objects.sort() + + if objects and prefix: + objects = self.filter_prefix(objects, prefix) + + if objects and delimiter: + objects = self.filter_delimiter(objects, delimiter, prefix) + + if objects and marker: + objects = self.filter_marker(objects, marker) + + if objects and end_marker: + objects = self.filter_end_marker(objects, end_marker) + + if objects and limit: + if len(objects) > limit: + objects = self.filter_limit(objects, limit) + + if objects: + for obj in objects: + list_item = [] + list_item.append(obj) + metadata = read_metadata(self.datadir + '/' + obj) + if not metadata or not validate_object(metadata): + metadata = create_object_metadata(self.datadir + '/' + obj) + if metadata: + list_item.append(metadata[X_TIMESTAMP]) + list_item.append(int(metadata[X_CONTENT_LENGTH])) + list_item.append(metadata[X_CONTENT_TYPE]) + list_item.append(metadata[X_ETAG]) + container_list.append(list_item) + + return container_list + + def update_container(self, metadata): + cont_path = self.datadir + write_metadata(cont_path, metadata) + self.metadata = metadata + + def update_object_count(self): + objects = [] + object_count = 0 + bytes_used = 0 + objects, object_count, bytes_used = get_container_details(self.datadir) + + + if int(self.metadata[X_OBJECTS_COUNT]) != object_count or \ + int(self.metadata[X_BYTES_USED]) != bytes_used: + self.metadata[X_OBJECTS_COUNT] = object_count + self.metadata[X_BYTES_USED] = bytes_used + self.update_container(self.metadata) + + def update_container_count(self): + containers = [] + container_count = 0 + + containers, container_count = get_account_details(self.datadir) + + if int(self.metadata[X_CONTAINER_COUNT]) != container_count: + self.metadata[X_CONTAINER_COUNT] = container_count + self.update_account(self.metadata) + + def get_info(self, include_metadata=False): + """ + Get global data for the container. + :returns: dict with keys: account, container, created_at, + put_timestamp, delete_timestamp, object_count, bytes_used, + reported_put_timestamp, reported_delete_timestamp, + reported_object_count, reported_bytes_used, hash, id, + x_container_sync_point1, and x_container_sync_point2. + If include_metadata is set, metadata is included as a key + pointing to a dict of tuples of the metadata + """ + # TODO: delete_timestamp, reported_put_timestamp + # reported_delete_timestamp, reported_object_count, + # reported_bytes_used, created_at + + metadata = {} + if os.path.exists(self.datadir): + metadata = read_metadata(self.datadir) + + data = {'account' : self.account, 'container' : self.name, + 'object_count' : metadata.get(X_OBJECTS_COUNT, '0'), + 'bytes_used' : metadata.get(X_BYTES_USED, '0'), + 'hash': '', 'id' : '', 'created_at' : '1', + 'put_timestamp' : metadata.get(X_PUT_TIMESTAMP, '0'), + 'delete_timestamp' : '1', + 'reported_put_timestamp' : '1', 'reported_delete_timestamp' : '1', + 'reported_object_count' : '1', 'reported_bytes_used' : '1'} + if include_metadata: + data['metadata'] = metadata + return data + + def put_object(self, name, timestamp, size, content_type, + etag, deleted=0): + # TODO: Implement the specifics of this func. + pass + + def initialize(self, timestamp): + pass + + def update_put_timestamp(self, timestamp): + """ + Create the container if it doesn't exist and update the timestamp + """ + if not os.path.exists(self.datadir): + self.put(self.metadata) + + def delete_object(self, name, timestamp): + # TODO: Implement the delete object + pass + + def delete_db(self, timestamp): + """ + Delete the container + """ + self.unlink() + + def update_metadata(self, metadata): + self.metadata.update(metadata) + write_metadata(self.datadir, self.metadata) + + +class DiskAccount(DiskDir): + def __init__(self, root, account, fs_object = None): + self.root = root + self.account = account + self.datadir = os.path.join(self.root, self.account) + if not check_mount(root, account): + check_valid_account(account, fs_object) + self.metadata = read_metadata(self.datadir) + if not self.metadata or not validate_account(self.metadata): + self.metadata = create_account_metadata(self.datadir) + + def list_containers_iter(self, limit, marker, end_marker, + prefix, delimiter): + """ + Return tuple of name, object_count, bytes_used, 0(is_subdir). + Used by account server. + """ + if delimiter and not prefix: + prefix = '' + containers = [] + container_count = 0 + account_list = [] + + containers, container_count = get_account_details(self.datadir) + + if int(self.metadata[X_CONTAINER_COUNT]) != container_count: + self.metadata[X_CONTAINER_COUNT] = container_count + self.update_account(self.metadata) + + if containers: + containers.sort() + + if containers and prefix: + containers = self.filter_prefix(containers, prefix) + + if containers and delimiter: + containers = self.filter_delimiter(containers, delimiter, prefix) + + if containers and marker: + containers = self.filter_marker(containers, marker) + + if containers and end_marker: + containers = self.filter_end_marker(containers, end_marker) + + if containers and limit: + if len(containers) > limit: + containers = self.filter_limit(containers, limit) + + if containers: + for cont in containers: + list_item = [] + metadata = None + list_item.append(cont) + metadata = read_metadata(self.datadir + '/' + cont) + if not metadata or not validate_container(metadata): + metadata = create_container_metadata(self.datadir + '/' + cont) + + if metadata: + list_item.append(metadata[X_OBJECTS_COUNT]) + list_item.append(metadata[X_BYTES_USED]) + list_item.append(0) + account_list.append(list_item) + + return account_list + + def get_info(self, include_metadata=False): + """ + Get global data for the account. + :returns: dict with keys: account, created_at, put_timestamp, + delete_timestamp, container_count, object_count, + bytes_used, hash, id + """ + metadata = {} + if (os.path.exists(self.datadir)): + metadata = read_metadata(self.datadir) + if not metadata: + metadata = create_account_metadata(self.datadir) + + data = {'account' : self.account, 'created_at' : '1', + 'put_timestamp' : '1', 'delete_timestamp' : '1', + 'container_count' : metadata.get(X_CONTAINER_COUNT, 0), + 'object_count' : metadata.get(X_OBJECTS_COUNT, 0), + 'bytes_used' : metadata.get(X_BYTES_USED, 0), + 'hash' : '', 'id' : ''} + + if include_metadata: + data['metadata'] = metadata + return data |