summaryrefslogtreecommitdiffstats
path: root/xlators/features/marker/utils/syncdaemon/repce.py
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/marker/utils/syncdaemon/repce.py')
-rw-r--r--xlators/features/marker/utils/syncdaemon/repce.py64
1 files changed, 64 insertions, 0 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/repce.py b/xlators/features/marker/utils/syncdaemon/repce.py
index 47691301e29..9473524909a 100644
--- a/xlators/features/marker/utils/syncdaemon/repce.py
+++ b/xlators/features/marker/utils/syncdaemon/repce.py
@@ -36,21 +36,39 @@ def ioparse(i, o):
return (i, o)
def send(out, *args):
+ """pickle args and write out wholly in one syscall
+
+ ie. not use the ability of pickle to dump directly to
+ a stream, as that would potentially mess up messages
+ by interleaving them
+ """
os.write(out, pickle.dumps(args, pickle_proto))
def recv(inf):
+ """load an object from input stream"""
return pickle.load(inf)
class RepceServer(object):
+ """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce
+
+ ... also our homebrewed RPC backend where the transport layer is
+ reduced to a pair of filehandles.
+
+ This is the server component.
+ """
def __init__(self, obj, i, o, wnum=6):
+ """register a backend object .obj to which incoming messages
+ are dispatched, also incoming/outcoming streams
+ """
self.obj = obj
self.inf, self.out = ioparse(i, o)
self.wnum = wnum
self.q = Queue()
def service_loop(self):
+ """fire up worker threads, get messages and dispatch among them"""
for i in range(self.wnum):
t = Thread(target=self.worker)
t.start()
@@ -61,6 +79,15 @@ class RepceServer(object):
logging.info("terminating on reaching EOF.")
def worker(self):
+ """life of a worker
+
+ Get message, extract its id, method name and arguments
+ (kwargs not supported), call method on .obj.
+ Send back message id + return value.
+ If method call throws an exception, rescue it, and send
+ back the exception as result (with flag marking it as
+ exception).
+ """
while True:
in_data = self.q.get(True)
rid = in_data[0]
@@ -79,8 +106,14 @@ class RepceServer(object):
class RepceJob(object):
+ """class representing message status we can use
+ for waiting on reply"""
def __init__(self, cbk):
+ """
+ - .rid: (process-wise) unique id
+ - .cbk: what we do upon receiving reply
+ """
self.rid = (os.getpid(), thread.get_ident(), time.time())
self.cbk = cbk
self.lever = Condition()
@@ -105,6 +138,13 @@ class RepceJob(object):
class RepceClient(object):
+ """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce
+
+ ... also our homebrewed RPC backend where the transport layer is
+ reduced to a pair of filehandles.
+
+ This is the client component.
+ """
def __init__(self, i, o):
self.inf, self.out = ioparse(i, o)
@@ -121,6 +161,11 @@ class RepceClient(object):
rjob.cbk(rjob, [exc, res])
def push(self, meth, *args, **kw):
+ """wrap arguments in a RepceJob, send them to server
+ and return the RepceJob
+
+ @cbk to pass on RepceJob can be given as kwarg.
+ """
cbk = kw.get('cbk')
if not cbk:
def cbk(rj, res):
@@ -133,6 +178,11 @@ class RepceClient(object):
return rjob
def __call__(self, meth, *args):
+ """RePCe client is callabe, calling it implements a synchronous remote call
+
+ We do a .push with a cbk which does a wakeup upon receiving anwser, then wait
+ on the RepceJob.
+ """
rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
exc, res = rjob.wait()
if exc:
@@ -142,7 +192,11 @@ class RepceClient(object):
return res
class mprx(object):
+ """method proxy, standard trick to implement rubyesque method_missing
+ in Python
+ A class is a closure factory, you know what I mean, or go read some SICP.
+ """
def __init__(self, ins, meth):
self.ins = ins
self.meth = meth
@@ -151,9 +205,19 @@ class RepceClient(object):
return self.ins(self.meth, *a)
def __getattr__(self, meth):
+ """this implements transparent method dispatch to remote object,
+ so that you don't need to call the RepceClient instance like
+
+ rclient('how_old_are_you_if_born_in', 1979)
+
+ but you can make it into an ordinary method call like
+
+ rclient.how_old_are_you_if_born_in(1979)
+ """
return self.mprx(self, meth)
def __version__(self):
+ """used in handshake to verify compatibility"""
d = {'proto': self('__repce_version__')}
try:
d['object'] = self('version')