diff options
Diffstat (limited to 'geo-replication/syncdaemon/repce.py')
| -rw-r--r-- | geo-replication/syncdaemon/repce.py | 82 |
1 files changed, 55 insertions, 27 deletions
diff --git a/geo-replication/syncdaemon/repce.py b/geo-replication/syncdaemon/repce.py index 755fb61df48..c622afa6373 100644 --- a/geo-replication/syncdaemon/repce.py +++ b/geo-replication/syncdaemon/repce.py @@ -1,32 +1,40 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + import os import sys import time import logging from threading import Condition try: - import thread -except ImportError: - # py 3 import _thread as thread -try: - from Queue import Queue except ImportError: - # py 3 + import thread +try: from queue import Queue +except ImportError: + from Queue import Queue try: import cPickle as pickle except ImportError: - # py 3 import pickle -from syncdutils import Thread, select +from syncdutils import Thread, select, lf -pickle_proto = -1 +pickle_proto = 2 repce_version = 1.0 + def ioparse(i, o): if isinstance(i, int): - i = os.fdopen(i) + i = os.fdopen(i, 'rb') # rely on duck typing for recognizing # streams as that works uniformly # in py2 and py3 @@ -34,6 +42,7 @@ def ioparse(i, o): o = o.fileno() return (i, o) + def send(out, *args): """pickle args and write out wholly in one syscall @@ -43,12 +52,21 @@ def send(out, *args): """ os.write(out, pickle.dumps(args, pickle_proto)) + def recv(inf): - """load an object from input stream""" - return pickle.load(inf) + """load an object from input stream + python2 and python3 compatibility, inf is sys.stdin + and is opened as text stream by default. Hence using the + buffer attribute in python3 + """ + if hasattr(inf, "buffer"): + return pickle.load(inf.buffer) + else: + return pickle.load(inf) class RepceServer(object): + """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce ... also our homebrewed RPC backend where the transport layer is @@ -95,16 +113,17 @@ class RepceServer(object): if rmeth == '__repce_version__': res = repce_version else: - try: - res = getattr(self.obj, rmeth)(*in_data[2:]) - except: - res = sys.exc_info()[1] - exc = True - logging.exception("call failed: ") + try: + res = getattr(self.obj, rmeth)(*in_data[2:]) + except: + res = sys.exc_info()[1] + exc = True + logging.exception("call failed: ") send(self.out, rid, exc, res) class RepceJob(object): + """class representing message status we can use for waiting on reply""" @@ -137,6 +156,7 @@ class RepceJob(object): class RepceClient(object): + """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce ... also our homebrewed RPC backend where the transport layer is @@ -148,7 +168,7 @@ class RepceClient(object): def __init__(self, i, o): self.inf, self.out = ioparse(i, o) self.jtab = {} - t = Thread(target = self.listen) + t = Thread(target=self.listen) t.start() def listen(self): @@ -177,25 +197,33 @@ class RepceClient(object): return rjob def __call__(self, meth, *args): - """RePCe client is callabe, calling it implements a synchronous remote call + """RePCe client is callabe, calling it implements a synchronous + remote call. - We do a .push with a cbk which does a wakeup upon receiving anwser, then wait - on the RepceJob. + We do a .push with a cbk which does a wakeup upon receiving answer, + then wait on the RepceJob. """ - rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)}) + rjob = self.push( + meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)}) exc, res = rjob.wait() if exc: - logging.error('call %s (%s) failed on peer with %s' % (repr(rjob), meth, str(type(res).__name__))) + logging.error(lf('call failed', + call=repr(rjob), + method=meth, + error=str(type(res).__name__))) raise res logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res))) return res class mprx(object): - """method proxy, standard trick to implement rubyesque method_missing - in Python - A class is a closure factory, you know what I mean, or go read some SICP. + """method proxy, standard trick to implement rubyesque + method_missing in Python + + A class is a closure factory, you know what I mean, or go read + some SICP. """ + def __init__(self, ins, meth): self.ins = ins self.meth = meth |
