path: root/ufo/gluster/swift/common/DiskFile.py
blob: 6404be6d68fb7206463f0e4031ed00cd9f04d8af
# Copyright (c) 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import errno
from eventlet import tpool
from tempfile import mkstemp
from contextlib import contextmanager
from swift.common.utils import normalize_timestamp, renamer
from gluster.swift.common.utils import mkdirs, rmdirs, validate_object, \
     create_object_metadata,  do_open, do_close, do_unlink, do_chown, \
     do_stat, do_listdir, read_metadata, write_metadata
from gluster.swift.common.utils import X_CONTENT_TYPE, X_CONTENT_LENGTH, \
     X_TIMESTAMP, X_PUT_TIMESTAMP, X_TYPE, X_ETAG, X_OBJECTS_COUNT, \
     X_BYTES_USED, X_OBJECT_TYPE, FILE, DIR, MARKER_DIR, OBJECT, DIR_TYPE, \
     FILE_TYPE, DEFAULT_UID, DEFAULT_GID

import logging
# NOTE: DiskFileNotExist is assumed to be exported by swift.obj.server in the
# targeted Swift release; newer releases provide it in swift.common.exceptions.
from swift.obj.server import DiskFile, DiskFileNotExist


DATADIR = 'objects'
ASYNCDIR = 'async_pending'
KEEP_CACHE_SIZE = (5 * 1024 * 1024)
# keep these lower-case
DISALLOWED_HEADERS = set('content-length content-type deleted etag'.split())


class Gluster_DiskFile(DiskFile):
    """
    Manage object files on disk.

    :param path: path to the devices on the node (the UFO mount point)
    :param device: device name; under UFO this is replaced by the account name
    :param partition: partition on the device the object lives in
    :param account: account name for the object
    :param container: container name for the object
    :param obj: object name for the object
    :param keep_data_fp: if True, don't close the fp, otherwise close it
    :param disk_chunk_size: size of chunks on file reads
    :param uid: user ID disk object should assume (file or directory)
    :param gid: group ID disk object should assume (file or directory)
    """

    def __init__(self, path, device, partition, account, container, obj,
                 logger, keep_data_fp=False, disk_chunk_size=65536,
                 uid=DEFAULT_UID, gid=DEFAULT_GID):
        self.disk_chunk_size = disk_chunk_size
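        # Under UFO the account name doubles as the device (the per-account
        # directory under the mount point), so the device passed in is
        # ignored.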
        device = account
        # Don't support object names beginning or ending with '/',
        # e.g. /a, a/, /a/b/.
        obj = obj.strip('/')
        if '/' in obj:
            self.obj_path, self.obj = obj.rsplit('/', 1)
        else:
            self.obj_path = ''
            self.obj = obj

        if self.obj_path:
            self.name = '/'.join((container, self.obj_path))
        else:
            self.name = container
        # Absolute path for the object directory.
        self.datadir = os.path.join(path, device, self.name)
        self.device_path = os.path.join(path, device)
        self.container_path = os.path.join(path, device, container)
        self.tmpdir = os.path.join(path, device, 'tmp')
        self.logger = logger
        self.metadata = {}
        self.data_file = None
        self.fp = None
        self.iter_etag = None
        self.started_at_0 = False
        self.read_to_eof = False
        self.quarantined_dir = None
        self.keep_cache = False
        self.is_dir = False
        self.is_valid = True
        self.uid = int(uid)
        self.gid = int(gid)
        data_file = os.path.join(self.datadir, self.obj)
        if not os.path.exists(data_file):
            return

        self.data_file = data_file
        self.metadata = read_metadata(data_file)
        if not self.metadata:
            create_object_metadata(data_file)
            self.metadata = read_metadata(data_file)

        if not validate_object(self.metadata):
            create_object_metadata(data_file)
            self.metadata = read_metadata(data_file)

        self.filter_metadata()

        if os.path.isdir(data_file):
            self.is_dir = True
        else:
            self.fp = do_open(data_file, 'rb')
            if not keep_data_fp:
                self.close(verify_file=False)
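
    # Illustration (hypothetical values, not executed): constructing
    #   Gluster_DiskFile('/mnt/gluster-object', 'vol0', '0', 'AUTH_vol0',
    #                    'cont', 'a/b/obj.txt', logger)
    # ignores the 'vol0' device and derives, from the account and object name:
    #   self.obj      = 'obj.txt'
    #   self.obj_path = 'a/b'
    #   self.name     = 'cont/a/b'
    #   self.datadir  = '/mnt/gluster-object/AUTH_vol0/cont/a/b'
    #   self.tmpdir   = '/mnt/gluster-object/AUTH_vol0/tmp'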

    def close(self, verify_file=True):
        """
        Close the file.

        :param verify_file: Defaults to True.  Accepted for interface
                            compatibility; this implementation does not
                            quarantine files on close.
        """
        #Marker directory
        if self.is_dir:
            return
        if self.fp:
            do_close(self.fp)
            self.fp = None

    def is_deleted(self):
        """
        Check if the file is deleted.

        :returns: True if the file doesn't exist or has been flagged as
                  deleted.
        """
        return not self.data_file

    def create_dir_object(self, dir_path):
        # TODO: what if an object already exists at this path?
        if os.path.exists(dir_path) and not os.path.isdir(dir_path):
            self.logger.error("Deleting file %s", dir_path)
            do_unlink(dir_path)
        # If the directory already exists, just overwrite its metadata.
        mkdirs(dir_path)
        do_chown(dir_path, self.uid, self.gid)
        create_object_metadata(dir_path)
        return True

    def put_metadata(self, metadata):
        obj_path = self.datadir + '/' + self.obj
        write_metadata(obj_path, metadata)
        self.metadata = metadata

    def put(self, fd, tmppath, metadata, extension=''):
        """
        Finalize writing the file to disk, renaming it from the temporary
        file to its real location.  This should be called after the data has
        been written to the temp file.

        :param fd: file descriptor of the temp file
        :param tmppath: path to the temporary file being used
        :param metadata: dictionary of metadata to be written
        :param extension: extension to be used when making the file
        """
        if extension == '.ts':
            # TombStone marker (deleted)
            return True

        # Fix up the metadata to ensure it has a proper value for the
        # Content-Type metadata, as well as values for the X_TYPE and
        # X_OBJECT_TYPE keys.

        content_type = metadata['Content-Type']
        if not content_type:
            metadata['Content-Type'] = FILE_TYPE
            x_object_type = FILE
        else:
            x_object_type = (MARKER_DIR
                             if content_type.lower() == DIR_TYPE else FILE)
        metadata[X_TYPE] = OBJECT
        metadata[X_OBJECT_TYPE] = x_object_type

        if extension == '.meta':
            # Metadata recorded separately from the file
            self.put_metadata(metadata)
            return True

        extension = ''

        if metadata[X_OBJECT_TYPE] == MARKER_DIR:
            self.create_dir_object(os.path.join(self.datadir, self.obj))
            self.put_metadata(metadata)
            self.data_file = self.datadir + '/' + self.obj
            return True

        # Refuse to overwrite an existing directory with a regular file.
        if self.is_dir:
            self.logger.error('Directory already exists %s/%s' %
                              (self.datadir, self.obj))
            return False

        timestamp = normalize_timestamp(metadata[X_TIMESTAMP])
        write_metadata(tmppath, metadata)
        if X_CONTENT_LENGTH in metadata:
            self.drop_cache(fd, 0, int(metadata[X_CONTENT_LENGTH]))
        tpool.execute(os.fsync, fd)
        if self.obj_path:
            dir_objs = self.obj_path.split('/')
            tmp_path = ''
            for dir_name in dir_objs:
                if tmp_path:
                    tmp_path = tmp_path + '/' + dir_name
                else:
                    tmp_path = dir_name
                subdir_path = os.path.join(self.container_path, tmp_path)
                if not self.create_dir_object(subdir_path):
                    self.logger.error("Failed in subdir %s", subdir_path)
                    return False

        renamer(tmppath, os.path.join(self.datadir,
                                      self.obj + extension))
        do_chown(os.path.join(self.datadir, self.obj + extension),
                 self.uid, self.gid)
        self.metadata = metadata
        self.data_file = self.datadir + '/' + self.obj + extension
        return True
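
    # Illustration (hypothetical values): a PUT whose metadata carries a
    # Content-Type equal to DIR_TYPE (e.g. 'application/directory') is stored
    # as a marker directory: put() calls create_dir_object() on
    # <datadir>/<obj> and records the metadata via put_metadata() instead of
    # renaming the temp file into place as it does for regular files.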

    def unlinkold(self, timestamp):
        """
        Remove any older versions of the object file.  The object is removed
        whenever its stored X-Timestamp differs from the given timestamp.

        :param timestamp: timestamp to compare with the stored metadata
        """
        if self.metadata and self.metadata['X-Timestamp'] != timestamp:
            self.unlink()

    def unlink(self):
        """
        Remove the file.
        """
        # Marker dir.
        if self.is_dir:
            dir_path = os.path.join(self.datadir, self.obj)
            rmdirs(dir_path)
            if not os.path.isdir(dir_path):
                self.metadata = {}
                self.data_file = None
            else:
                logging.error('Unable to delete dir %s' % dir_path)
            return

        for fname in do_listdir(self.datadir):
            if fname == self.obj:
                try:
                    do_unlink(os.path.join(self.datadir, fname))
                except OSError, err:
                    if err.errno != errno.ENOENT:
                        raise

        #Remove entire path for object.
        #remove_dir_path(self.obj_path, self.container_path)

        self.metadata = {}
        self.data_file = None

    def get_data_file_size(self):
        """
        Returns the os.path.getsize for the file.  If the size on disk does
        not match the Content-Length stored in the metadata, the metadata is
        updated to the actual size.  Raises an exception if self.data_file
        does not exist.

        :returns: file size as an int
        :raises DiskFileNotExist: on file not existing (including deleted)
        """
        #Marker directory.
        if self.is_dir:
            return 0
        try:
            file_size = 0
            if self.data_file:
                file_size = os.path.getsize(self.data_file)
                if X_CONTENT_LENGTH in self.metadata:
                    metadata_size = int(self.metadata[X_CONTENT_LENGTH])
                    if file_size != metadata_size:
                        self.metadata[X_CONTENT_LENGTH] = file_size
                        self.update_object(self.metadata)

                return file_size
        except OSError, err:
            if err.errno != errno.ENOENT:
                raise
        raise DiskFileNotExist('Data File does not exist.')

    def update_object(self, metadata):
        obj_path = self.datadir + '/' + self.obj
        write_metadata(obj_path, metadata)
        self.metadata = metadata

    def filter_metadata(self):
        if X_TYPE in self.metadata:
            self.metadata.pop(X_TYPE)
        if X_OBJECT_TYPE in self.metadata:
            self.metadata.pop(X_OBJECT_TYPE)

    @contextmanager
    def mkstemp(self):
        """Contextmanager to make a temporary file."""

        if not os.path.exists(self.tmpdir):
            mkdirs(self.tmpdir)
        fd, tmppath = mkstemp(dir=self.tmpdir)
        try:
            yield fd, tmppath
        finally:
            try:
                os.close(fd)
            except OSError:
                pass
            try:
                os.unlink(tmppath)
            except OSError:
                pass
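

# A minimal usage sketch (assumptions: '/mnt/gluster-object/AUTH_vol0' is a
# mounted GlusterFS volume for the account 'AUTH_vol0', the process may chown
# files there, and every literal below is illustrative only).  It mirrors how
# the object server drives the write path: reserve a temp file with
# mkstemp(), write the body, then finalize with put().
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger('gluster.diskfile.demo')
    disk_file = Gluster_DiskFile('/mnt/gluster-object', 'vol0', '0',
                                 'AUTH_vol0', 'cont', 'a/b/obj.txt', logger)
    body = 'hello world'
    with disk_file.mkstemp() as (fd, tmppath):
        os.write(fd, body)
        metadata = {
            'X-Timestamp': normalize_timestamp(1356998400),
            'Content-Type': 'text/plain',
            'Content-Length': str(len(body)),
        }
        disk_file.put(fd, tmppath, metadata)
    logger.info('stored at %s', disk_file.data_file)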