summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/repce.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/repce.py')
-rw-r--r--geo-replication/syncdaemon/repce.py225
1 files changed, 225 insertions, 0 deletions
diff --git a/geo-replication/syncdaemon/repce.py b/geo-replication/syncdaemon/repce.py
new file mode 100644
index 0000000..755fb61
--- /dev/null
+++ b/geo-replication/syncdaemon/repce.py
@@ -0,0 +1,225 @@
+import os
+import sys
+import time
+import logging
+from threading import Condition
+try:
+ import thread
+except ImportError:
+ # py 3
+ import _thread as thread
+try:
+ from Queue import Queue
+except ImportError:
+ # py 3
+ from queue import Queue
+try:
+ import cPickle as pickle
+except ImportError:
+ # py 3
+ import pickle
+
+from syncdutils import Thread, select
+
+pickle_proto = -1
+repce_version = 1.0
+
+def ioparse(i, o):
+ if isinstance(i, int):
+ i = os.fdopen(i)
+ # rely on duck typing for recognizing
+ # streams as that works uniformly
+ # in py2 and py3
+ if hasattr(o, 'fileno'):
+ o = o.fileno()
+ return (i, o)
+
+def send(out, *args):
+ """pickle args and write out wholly in one syscall
+
+ ie. not use the ability of pickle to dump directly to
+ a stream, as that would potentially mess up messages
+ by interleaving them
+ """
+ os.write(out, pickle.dumps(args, pickle_proto))
+
+def recv(inf):
+ """load an object from input stream"""
+ return pickle.load(inf)
+
+
+class RepceServer(object):
+ """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce
+
+ ... also our homebrewed RPC backend where the transport layer is
+ reduced to a pair of filehandles.
+
+ This is the server component.
+ """
+
+ def __init__(self, obj, i, o, wnum=6):
+ """register a backend object .obj to which incoming messages
+ are dispatched, also incoming/outcoming streams
+ """
+ self.obj = obj
+ self.inf, self.out = ioparse(i, o)
+ self.wnum = wnum
+ self.q = Queue()
+
+ def service_loop(self):
+ """fire up worker threads, get messages and dispatch among them"""
+ for i in range(self.wnum):
+ t = Thread(target=self.worker)
+ t.start()
+ try:
+ while True:
+ self.q.put(recv(self.inf))
+ except EOFError:
+ logging.info("terminating on reaching EOF.")
+
+ def worker(self):
+ """life of a worker
+
+ Get message, extract its id, method name and arguments
+ (kwargs not supported), call method on .obj.
+ Send back message id + return value.
+ If method call throws an exception, rescue it, and send
+ back the exception as result (with flag marking it as
+ exception).
+ """
+ while True:
+ in_data = self.q.get(True)
+ rid = in_data[0]
+ rmeth = in_data[1]
+ exc = False
+ if rmeth == '__repce_version__':
+ res = repce_version
+ else:
+ try:
+ res = getattr(self.obj, rmeth)(*in_data[2:])
+ except:
+ res = sys.exc_info()[1]
+ exc = True
+ logging.exception("call failed: ")
+ send(self.out, rid, exc, res)
+
+
+class RepceJob(object):
+ """class representing message status we can use
+ for waiting on reply"""
+
+ def __init__(self, cbk):
+ """
+ - .rid: (process-wise) unique id
+ - .cbk: what we do upon receiving reply
+ """
+ self.rid = (os.getpid(), thread.get_ident(), time.time())
+ self.cbk = cbk
+ self.lever = Condition()
+ self.done = False
+
+ def __repr__(self):
+ return ':'.join([str(x) for x in self.rid])
+
+ def wait(self):
+ self.lever.acquire()
+ if not self.done:
+ self.lever.wait()
+ self.lever.release()
+ return self.result
+
+ def wakeup(self, data):
+ self.result = data
+ self.lever.acquire()
+ self.done = True
+ self.lever.notify()
+ self.lever.release()
+
+
+class RepceClient(object):
+ """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce
+
+ ... also our homebrewed RPC backend where the transport layer is
+ reduced to a pair of filehandles.
+
+ This is the client component.
+ """
+
+ def __init__(self, i, o):
+ self.inf, self.out = ioparse(i, o)
+ self.jtab = {}
+ t = Thread(target = self.listen)
+ t.start()
+
+ def listen(self):
+ while True:
+ select((self.inf,), (), ())
+ rid, exc, res = recv(self.inf)
+ rjob = self.jtab.pop(rid)
+ if rjob.cbk:
+ rjob.cbk(rjob, [exc, res])
+
+ def push(self, meth, *args, **kw):
+ """wrap arguments in a RepceJob, send them to server
+ and return the RepceJob
+
+ @cbk to pass on RepceJob can be given as kwarg.
+ """
+ cbk = kw.get('cbk')
+ if not cbk:
+ def cbk(rj, res):
+ if res[0]:
+ raise res[1]
+ rjob = RepceJob(cbk)
+ self.jtab[rjob.rid] = rjob
+ logging.debug("call %s %s%s ..." % (repr(rjob), meth, repr(args)))
+ send(self.out, rjob.rid, meth, *args)
+ return rjob
+
+ def __call__(self, meth, *args):
+ """RePCe client is callabe, calling it implements a synchronous remote call
+
+ We do a .push with a cbk which does a wakeup upon receiving anwser, then wait
+ on the RepceJob.
+ """
+ rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
+ exc, res = rjob.wait()
+ if exc:
+ logging.error('call %s (%s) failed on peer with %s' % (repr(rjob), meth, str(type(res).__name__)))
+ raise res
+ logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res)))
+ return res
+
+ class mprx(object):
+ """method proxy, standard trick to implement rubyesque method_missing
+ in Python
+
+ A class is a closure factory, you know what I mean, or go read some SICP.
+ """
+ def __init__(self, ins, meth):
+ self.ins = ins
+ self.meth = meth
+
+ def __call__(self, *a):
+ return self.ins(self.meth, *a)
+
+ def __getattr__(self, meth):
+ """this implements transparent method dispatch to remote object,
+ so that you don't need to call the RepceClient instance like
+
+ rclient('how_old_are_you_if_born_in', 1979)
+
+ but you can make it into an ordinary method call like
+
+ rclient.how_old_are_you_if_born_in(1979)
+ """
+ return self.mprx(self, meth)
+
+ def __version__(self):
+ """used in handshake to verify compatibility"""
+ d = {'proto': self('__repce_version__')}
+ try:
+ d['object'] = self('version')
+ except AttributeError:
+ pass
+ return d