From 513988915aa1af13a989d062b021fe1562cbf18d Mon Sep 17 00:00:00 2001 From: venkata edara Date: Wed, 10 May 2017 13:27:38 +0530 Subject: Rebase to Swift 2.10.1 (newton) Change-Id: I53a962c9a301089c8aed0b43c50f944c30225944 Signed-off-by: venkata edara Reviewed-on: https://review.gluster.org/16653 Reviewed-by: Prashanth Pai Tested-by: Prashanth Pai --- gluster/swift/common/DiskDir.py | 23 ++- gluster/swift/common/constraints.py | 11 ++ gluster/swift/common/exceptions.py | 6 + .../common/middleware/gswauth/swauth/middleware.py | 6 +- gluster/swift/common/utils.py | 208 ++++++++++++++++++++- 5 files changed, 244 insertions(+), 10 deletions(-) (limited to 'gluster/swift/common') diff --git a/gluster/swift/common/DiskDir.py b/gluster/swift/common/DiskDir.py index 4f4a2ef..0bc95df 100644 --- a/gluster/swift/common/DiskDir.py +++ b/gluster/swift/common/DiskDir.py @@ -33,7 +33,7 @@ from gluster.swift.common.exceptions import FileOrDirNotFoundError, \ from gluster.swift.obj.expirer import delete_tracker_object from swift.common.constraints import MAX_META_COUNT, MAX_META_OVERALL_SIZE from swift.common.swob import HTTPBadRequest -from swift.common.utils import ThreadPool +from gluster.swift.common.utils import ThreadPool DATADIR = 'containers' @@ -399,7 +399,7 @@ class DiskDir(DiskCommon): def list_objects_iter(self, limit, marker, end_marker, prefix, delimiter, path=None, storage_policy_index=0, - out_content_type=None): + out_content_type=None, reverse=False): """ Returns tuple of name, created_at, size, content_type, etag. """ @@ -427,6 +427,9 @@ class DiskDir(DiskCommon): # No objects in container , return empty list return container_list + if marker and end_marker and reverse: + marker, end_marker = end_marker, marker + if end_marker: objects = filter_end_marker(objects, end_marker) @@ -471,6 +474,8 @@ class DiskDir(DiskCommon): container_list.append((obj, '0', 0, 'text/plain', '')) if len(container_list) >= limit: break + if reverse: + container_list.reverse() return container_list count = 0 @@ -512,7 +517,8 @@ class DiskDir(DiskCommon): count += 1 if count >= limit: break - + if reverse: + container_list.reverse() return container_list def _update_object_count(self): @@ -778,7 +784,8 @@ class DiskAccount(DiskCommon): return containers def list_containers_iter(self, limit, marker, end_marker, - prefix, delimiter, response_content_type=None): + prefix, delimiter, response_content_type=None, + reverse=False): """ Return tuple of name, object_count, bytes_used, 0(is_subdir). Used by account server. @@ -794,6 +801,9 @@ class DiskAccount(DiskCommon): # No containers in account, return empty list return account_list + if marker and end_marker and reverse: + marker, end_marker = end_marker, marker + if containers and end_marker: containers = filter_end_marker(containers, end_marker) @@ -841,6 +851,8 @@ class DiskAccount(DiskCommon): account_list.append((container, 0, 0, 0)) if len(account_list) >= limit: break + if reverse: + account_list.reverse() return account_list count = 0 @@ -866,7 +878,8 @@ class DiskAccount(DiskCommon): count += 1 if count >= limit: break - + if reverse: + account_list.reverse() return account_list def get_info(self): diff --git a/gluster/swift/common/constraints.py b/gluster/swift/common/constraints.py index 98e2a27..2007b71 100644 --- a/gluster/swift/common/constraints.py +++ b/gluster/swift/common/constraints.py @@ -102,3 +102,14 @@ _ring.Ring = ring.Ring import swift.account.utils from gluster.swift.account.utils import account_listing_response as gf_als swift.account.utils.account_listing_response = gf_als + +# Monkey patch StoragePolicy.load_ring as POLICIES are initialized already +from swift.common.storage_policy import StoragePolicy + + +def load_ring(self, swift_dir): + if self.object_ring: + return + self.object_ring = ring.Ring(swift_dir, ring_name='object') + +StoragePolicy.load_ring = load_ring diff --git a/gluster/swift/common/exceptions.py b/gluster/swift/common/exceptions.py index 8260dd9..4dc2878 100644 --- a/gluster/swift/common/exceptions.py +++ b/gluster/swift/common/exceptions.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from swift.common.exceptions import SwiftException + class GlusterFileSystemOSError(OSError): pass @@ -48,3 +50,7 @@ class AlreadyExistsAsFile(GlusterfsException): class DiskFileContainerDoesNotExist(GlusterfsException): pass + + +class ThreadPoolDead(SwiftException): + pass diff --git a/gluster/swift/common/middleware/gswauth/swauth/middleware.py b/gluster/swift/common/middleware/gswauth/swauth/middleware.py index 7a6d713..a266d74 100644 --- a/gluster/swift/common/middleware/gswauth/swauth/middleware.py +++ b/gluster/swift/common/middleware/gswauth/swauth/middleware.py @@ -379,7 +379,7 @@ class Swauth(object): if memcache_client: memcache_client.set( memcache_key, (time() + expires_from_now, groups), - timeout=expires_from_now) + time=expires_from_now) else: path = quote('/v1/%s/.token_%s/%s' % (self.auth_account, token[-1], token)) @@ -401,7 +401,7 @@ class Swauth(object): memcache_client.set( memcache_key, (detail['expires'], groups), - timeout=float(detail['expires'] - time())) + time=float(detail['expires'] - time())) return groups def authorize(self, req): @@ -1448,7 +1448,7 @@ class Swauth(object): (self.itoken_expires, '%s,.reseller_admin,%s' % (self.metadata_volume, self.auth_account)), - timeout=self.token_life) + time=self.token_life) return self.itoken def get_admin_detail(self, req): diff --git a/gluster/swift/common/utils.py b/gluster/swift/common/utils.py index 8f68319..ac41698 100644 --- a/gluster/swift/common/utils.py +++ b/gluster/swift/common/utils.py @@ -14,17 +14,23 @@ # limitations under the License. import os +import sys import stat import json import errno import random import logging from hashlib import md5 -from eventlet import sleep +from eventlet import sleep, Timeout, tpool, greenthread, \ + greenio, event +from Queue import Queue, Empty +import threading as stdlib_threading + import cPickle as pickle from cStringIO import StringIO import pickletools -from gluster.swift.common.exceptions import GlusterFileSystemIOError +from gluster.swift.common.exceptions import GlusterFileSystemIOError, \ + ThreadPoolDead from swift.common.exceptions import DiskFileNoSpace from swift.common.db import utf8encodekeys from gluster.swift.common.fs_utils import do_getctime, do_getmtime, do_stat, \ @@ -69,6 +75,204 @@ PICKLE_PROTOCOL = 2 CHUNK_SIZE = 65536 +class ThreadPool(object): + """ + Perform blocking operations in background threads. + + Call its methods from within greenlets to green-wait for results without + blocking the eventlet reactor (hopefully). + """ + + BYTE = 'a'.encode('utf-8') + + def __init__(self, nthreads=2): + self.nthreads = nthreads + self._run_queue = Queue() + self._result_queue = Queue() + self._threads = [] + self._alive = True + + if nthreads <= 0: + return + + # We spawn a greenthread whose job it is to pull results from the + # worker threads via a real Queue and send them to eventlet Events so + # that the calling greenthreads can be awoken. + # + # Since each OS thread has its own collection of greenthreads, it + # doesn't work to have the worker thread send stuff to the event, as + # it then notifies its own thread-local eventlet hub to wake up, which + # doesn't do anything to help out the actual calling greenthread over + # in the main thread. + # + # Thus, each worker sticks its results into a result queue and then + # writes a byte to a pipe, signaling the result-consuming greenlet (in + # the main thread) to wake up and consume results. + # + # This is all stuff that eventlet.tpool does, but that code can't have + # multiple instances instantiated. Since the object server uses one + # pool per disk, we have to reimplement this stuff. + _raw_rpipe, self.wpipe = os.pipe() + self.rpipe = greenio.GreenPipe(_raw_rpipe, 'rb', bufsize=0) + + for _junk in xrange(nthreads): + thr = stdlib_threading.Thread( + target=self._worker, + args=(self._run_queue, self._result_queue)) + thr.daemon = True + thr.start() + self._threads.append(thr) + # This is the result-consuming greenthread that runs in the main OS + # thread, as described above. + self._consumer_coro = greenthread.spawn_n(self._consume_results, + self._result_queue) + + def _worker(self, work_queue, result_queue): + """ + Pulls an item from the queue and runs it, then puts the result into + the result queue. Repeats forever. + + :param work_queue: queue from which to pull work + :param result_queue: queue into which to place results + """ + while True: + item = work_queue.get() + if item is None: + break + ev, func, args, kwargs = item + try: + result = func(*args, **kwargs) + result_queue.put((ev, True, result)) + except BaseException: + result_queue.put((ev, False, sys.exc_info())) + finally: + work_queue.task_done() + os.write(self.wpipe, self.BYTE) + + def _consume_results(self, queue): + """ + Runs as a greenthread in the same OS thread as callers of + run_in_thread(). + + Takes results from the worker OS threads and sends them to the waiting + greenthreads. + """ + while True: + try: + self.rpipe.read(1) + except ValueError: + # can happen at process shutdown when pipe is closed + break + + while True: + try: + ev, success, result = queue.get(block=False) + except Empty: + break + + try: + if success: + ev.send(result) + else: + ev.send_exception(*result) + finally: + queue.task_done() + + def run_in_thread(self, func, *args, **kwargs): + """ + Runs func(*args, **kwargs) in a thread. Blocks the current greenlet + until results are available. + + Exceptions thrown will be reraised in the calling thread. + + If the threadpool was initialized with nthreads=0, it invokes + func(*args, **kwargs) directly, followed by eventlet.sleep() to ensure + the eventlet hub has a chance to execute. It is more likely the hub + will be invoked when queuing operations to an external thread. + + :returns: result of calling func + :raises: whatever func raises + """ + if not self._alive: + raise ThreadPoolDead() + + if self.nthreads <= 0: + result = func(*args, **kwargs) + sleep() + return result + + ev = event.Event() + self._run_queue.put((ev, func, args, kwargs), block=False) + + # blocks this greenlet (and only *this* greenlet) until the real + # thread calls ev.send(). + result = ev.wait() + return result + + def _run_in_eventlet_tpool(self, func, *args, **kwargs): + """ + Really run something in an external thread, even if we haven't got any + threads of our own. + """ + def inner(): + try: + return (True, func(*args, **kwargs)) + except (Timeout, BaseException) as err: + return (False, err) + + success, result = tpool.execute(inner) + if success: + return result + else: + raise result + + def force_run_in_thread(self, func, *args, **kwargs): + """ + Runs func(*args, **kwargs) in a thread. Blocks the current greenlet + until results are available. + + Exceptions thrown will be reraised in the calling thread. + + If the threadpool was initialized with nthreads=0, uses eventlet.tpool + to run the function. This is in contrast to run_in_thread(), which + will (in that case) simply execute func in the calling thread. + + :returns: result of calling func + :raises: whatever func raises + """ + if not self._alive: + raise ThreadPoolDead() + + if self.nthreads <= 0: + return self._run_in_eventlet_tpool(func, *args, **kwargs) + else: + return self.run_in_thread(func, *args, **kwargs) + + def terminate(self): + """ + Releases the threadpool's resources (OS threads, greenthreads, pipes, + etc.) and renders it unusable. + + Don't call run_in_thread() or force_run_in_thread() after calling + terminate(). + """ + self._alive = False + if self.nthreads <= 0: + return + + for _junk in range(self.nthreads): + self._run_queue.put(None) + for thr in self._threads: + thr.join() + self._threads = [] + self.nthreads = 0 + + greenthread.kill(self._consumer_coro) + + self.rpipe.close() + os.close(self.wpipe) + + class SafeUnpickler(object): """ Loading a pickled stream is potentially unsafe and exploitable because -- cgit