summaryrefslogtreecommitdiffstats
path: root/xlators/features/marker/utils/syncdaemon/repce.py
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/marker/utils/syncdaemon/repce.py')
-rw-r--r--xlators/features/marker/utils/syncdaemon/repce.py150
1 files changed, 150 insertions, 0 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/repce.py b/xlators/features/marker/utils/syncdaemon/repce.py
new file mode 100644
index 000000000..f878d481a
--- /dev/null
+++ b/xlators/features/marker/utils/syncdaemon/repce.py
@@ -0,0 +1,150 @@
+import os
+import sys
+import select
+import time
+import logging
+from threading import Thread, Condition
+try:
+ import thread
+except ImportError:
+ # py 3
+ import _thread as thread
+try:
+ from Queue import Queue
+except ImportError:
+ # py 3
+ from queue import Queue
+try:
+ import cPickle as pickle
+except ImportError:
+ # py 3
+ import pickle
+
+pickle_proto = -1
+
+def ioparse(i, o):
+ if isinstance(i, int):
+ i = os.fdopen(i)
+ # rely on duck typing for recognizing
+ # streams as that works uniformly
+ # in py2 and py3
+ if hasattr(o, 'fileno'):
+ o = o.fileno()
+ return (i, o)
+
+def send(out, *args):
+ os.write(out, pickle.dumps(args, pickle_proto))
+
+def recv(inf):
+ return pickle.load(inf)
+
+
+class RepceServer(object):
+
+ def __init__(self, obj, i, o, wnum=6):
+ self.obj = obj
+ self.inf, self.out = ioparse(i, o)
+ self.wnum = wnum
+ self.q = Queue()
+
+ def service_loop(self):
+ for i in range(self.wnum):
+ t = Thread(target=self.worker)
+ t.setDaemon(True)
+ t.start()
+ try:
+ while True:
+ self.q.put(recv(self.inf))
+ except EOFError:
+ logging.info("terminating on reaching EOF.")
+
+ def worker(self):
+ while True:
+ in_data = self.q.get(True)
+ rid = in_data[0]
+ rmeth = in_data[1]
+ exc = False
+ try:
+ res = getattr(self.obj, rmeth)(*in_data[2:])
+ except:
+ res = sys.exc_info()[1]
+ exc = True
+ logging.exception("call failed: ")
+ send(self.out, rid, exc, res)
+
+
+class RepceJob(object):
+
+ def __init__(self, cbk):
+ self.rid = (os.getpid(), thread.get_ident(), time.time())
+ self.cbk = cbk
+ self.lever = Condition()
+ self.done = False
+
+ def __repr__(self):
+ return ':'.join([str(x) for x in self.rid])
+
+ def wait(self):
+ self.lever.acquire()
+ if not self.done:
+ self.lever.wait()
+ self.lever.release()
+ return self.result
+
+ def wakeup(self, data):
+ self.result = data
+ self.lever.acquire()
+ self.done = True
+ self.lever.notify()
+ self.lever.release()
+
+
+class RepceClient(object):
+
+ def __init__(self, i, o):
+ self.inf, self.out = ioparse(i, o)
+ self.jtab = {}
+ t = Thread(target = self.listen)
+ t.setDaemon(True)
+ t.start()
+
+ def listen(self):
+ while True:
+ select.select((self.inf,), (), ())
+ rid, exc, res = recv(self.inf)
+ rjob = self.jtab.pop(rid)
+ if rjob.cbk:
+ rjob.cbk(rjob, [exc, res])
+
+ def push(self, meth, *args, **kw):
+ cbk = kw.get('cbk')
+ if not cbk:
+ def cbk(rj, res):
+ if res[0]:
+ raise res[1]
+ rjob = RepceJob(cbk)
+ self.jtab[rjob.rid] = rjob
+ logging.debug("call %s %s%s ..." % (repr(rjob), meth, repr(args)))
+ send(self.out, rjob.rid, meth, *args)
+ return rjob
+
+ def __call__(self, meth, *args):
+ rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
+ exc, res = rjob.wait()
+ if exc:
+ logging.error('call %s (%s) failed on peer with %s' % (repr(rjob), meth, str(type(res).__name__)))
+ raise res
+ logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res)))
+ return res
+
+ class mprx(object):
+
+ def __init__(self, ins, meth):
+ self.ins = ins
+ self.meth = meth
+
+ def __call__(self, *a):
+ return self.ins(self.meth, *a)
+
+ def __getattr__(self, meth):
+ return self.mprx(self, meth)