# # Copyright (c) 2011-2014 Red Hat, Inc. # This file is part of GlusterFS. # This file is licensed to you under your choice of the GNU Lesser # General Public License, version 3 or any later version (LGPLv3 or # later), or the GNU General Public License, version 2 (GPLv2), in all # cases as published by the Free Software Foundation. # import os import sys import time import logging from threading import Condition try: import _thread as thread except ImportError: import thread try: from queue import Queue except ImportError: from Queue import Queue try: import cPickle as pickle except ImportError: import pickle from syncdutils import Thread, select, lf pickle_proto = 2 repce_version = 1.0 def ioparse(i, o): if isinstance(i, int): i = os.fdopen(i, 'rb') # rely on duck typing for recognizing # streams as that works uniformly # in py2 and py3 if hasattr(o, 'fileno'): o = o.fileno() return (i, o) def send(out, *args): """pickle args and write out wholly in one syscall ie. not use the ability of pickle to dump directly to a stream, as that would potentially mess up messages by interleaving them """ os.write(out, pickle.dumps(args, pickle_proto)) def recv(inf): """load an object from input stream python2 and python3 compatibility, inf is sys.stdin and is opened as text stream by default. Hence using the buffer attribute in python3 """ if hasattr(inf, "buffer"): return pickle.load(inf.buffer) else: return pickle.load(inf) class RepceServer(object): """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce ... also our homebrewed RPC backend where the transport layer is reduced to a pair of filehandles. This is the server component. """ def __init__(self, obj, i, o, wnum=6): """register a backend object .obj to which incoming messages are dispatched, also incoming/outcoming streams """ self.obj = obj self.inf, self.out = ioparse(i, o) self.wnum = wnum self.q = Queue() def service_loop(self): """fire up worker threads, get messages and dispatch among them""" for i in range(self.wnum): t = Thread(target=self.worker) t.start() try: while True: self.q.put(recv(self.inf)) except EOFError: logging.info("terminating on reaching EOF.") def worker(self): """life of a worker Get message, extract its id, method name and arguments (kwargs not supported), call method on .obj. Send back message id + return value. If method call throws an exception, rescue it, and send back the exception as result (with flag marking it as exception). """ while True: in_data = self.q.get(True) rid = in_data[0] rmeth = in_data[1] exc = False if rmeth == '__repce_version__': res = repce_version else: try: res = getattr(self.obj, rmeth)(*in_data[2:]) except: res = sys.exc_info()[1] exc = True logging.exception("call failed: ") send(self.out, rid, exc, res) class RepceJob(object): """class representing message status we can use for waiting on reply""" def __init__(self, cbk): """ - .rid: (process-wise) unique id - .cbk: what we do upon receiving reply """ self.rid = (os.getpid(), thread.get_ident(), time.time()) self.cbk = cbk self.lever = Condition() self.done = False def __repr__(self): return ':'.join([str(x) for x in self.rid]) def wait(self): self.lever.acquire() if not self.done: self.lever.wait() self.lever.release() return self.result def wakeup(self, data): self.result = data self.lever.acquire() self.done = True self.lever.notify() self.lever.release() class RepceClient(object): """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce ... also our homebrewed RPC backend where the transport layer is reduced to a pair of filehandles. This is the client component. """ def __init__(self, i, o): self.inf, self.out = ioparse(i, o) self.jtab = {} t = Thread(target=self.listen) t.start() def listen(self): while True: select((self.inf,), (), ()) rid, exc, res = recv(self.inf) rjob = self.jtab.pop(rid) if rjob.cbk: rjob.cbk(rjob, [exc, res]) def push(self, meth, *args, **kw): """wrap arguments in a RepceJob, send them to server and return the RepceJob @cbk to pass on RepceJob can be given as kwarg. """ cbk = kw.get('cbk') if not cbk: def cbk(rj, res): if res[0]: raise res[1] rjob = RepceJob(cbk) self.jtab[rjob.rid] = rjob logging.debug("call %s %s%s ..." % (repr(rjob), meth, repr(args))) send(self.out, rjob.rid, meth, *args) return rjob def __call__(self, meth, *args): """RePCe client is callabe, calling it implements a synchronous remote call. We do a .push with a cbk which does a wakeup upon receiving answer, then wait on the RepceJob. """ rjob = self.push( meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)}) exc, res = rjob.wait() if exc: logging.error(lf('call failed', call=repr(rjob), method=meth, error=str(type(res).__name__))) raise res logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res))) return res class mprx(object): """method proxy, standard trick to implement rubyesque method_missing in Python A class is a closure factory, you know what I mean, or go read some SICP. """ def __init__(self, ins, meth): self.ins = ins self.meth = meth def __call__(self, *a): return self.ins(self.meth, *a) def __getattr__(self, meth): """this implements transparent method dispatch to remote object, so that you don't need to call the RepceClient instance like rclient('how_old_are_you_if_born_in', 1979) but you can make it into an ordinary method call like rclient.how_old_are_you_if_born_in(1979) """ return self.mprx(self, meth) def __version__(self): """used in handshake to verify compatibility""" d = {'proto': self('__repce_version__')} try: d['object'] = self('version') except AttributeError: pass return d