diff options
Diffstat (limited to 'xlators/features/marker/utils/syncdaemon/repce.py')
-rw-r--r-- | xlators/features/marker/utils/syncdaemon/repce.py | 64 |
1 files changed, 64 insertions, 0 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/repce.py b/xlators/features/marker/utils/syncdaemon/repce.py index 47691301e29..9473524909a 100644 --- a/xlators/features/marker/utils/syncdaemon/repce.py +++ b/xlators/features/marker/utils/syncdaemon/repce.py @@ -36,21 +36,39 @@ def ioparse(i, o): return (i, o) def send(out, *args): + """pickle args and write out wholly in one syscall + + ie. not use the ability of pickle to dump directly to + a stream, as that would potentially mess up messages + by interleaving them + """ os.write(out, pickle.dumps(args, pickle_proto)) def recv(inf): + """load an object from input stream""" return pickle.load(inf) class RepceServer(object): + """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce + + ... also our homebrewed RPC backend where the transport layer is + reduced to a pair of filehandles. + + This is the server component. + """ def __init__(self, obj, i, o, wnum=6): + """register a backend object .obj to which incoming messages + are dispatched, also incoming/outcoming streams + """ self.obj = obj self.inf, self.out = ioparse(i, o) self.wnum = wnum self.q = Queue() def service_loop(self): + """fire up worker threads, get messages and dispatch among them""" for i in range(self.wnum): t = Thread(target=self.worker) t.start() @@ -61,6 +79,15 @@ class RepceServer(object): logging.info("terminating on reaching EOF.") def worker(self): + """life of a worker + + Get message, extract its id, method name and arguments + (kwargs not supported), call method on .obj. + Send back message id + return value. + If method call throws an exception, rescue it, and send + back the exception as result (with flag marking it as + exception). + """ while True: in_data = self.q.get(True) rid = in_data[0] @@ -79,8 +106,14 @@ class RepceServer(object): class RepceJob(object): + """class representing message status we can use + for waiting on reply""" def __init__(self, cbk): + """ + - .rid: (process-wise) unique id + - .cbk: what we do upon receiving reply + """ self.rid = (os.getpid(), thread.get_ident(), time.time()) self.cbk = cbk self.lever = Condition() @@ -105,6 +138,13 @@ class RepceJob(object): class RepceClient(object): + """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce + + ... also our homebrewed RPC backend where the transport layer is + reduced to a pair of filehandles. + + This is the client component. + """ def __init__(self, i, o): self.inf, self.out = ioparse(i, o) @@ -121,6 +161,11 @@ class RepceClient(object): rjob.cbk(rjob, [exc, res]) def push(self, meth, *args, **kw): + """wrap arguments in a RepceJob, send them to server + and return the RepceJob + + @cbk to pass on RepceJob can be given as kwarg. + """ cbk = kw.get('cbk') if not cbk: def cbk(rj, res): @@ -133,6 +178,11 @@ class RepceClient(object): return rjob def __call__(self, meth, *args): + """RePCe client is callabe, calling it implements a synchronous remote call + + We do a .push with a cbk which does a wakeup upon receiving anwser, then wait + on the RepceJob. + """ rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)}) exc, res = rjob.wait() if exc: @@ -142,7 +192,11 @@ class RepceClient(object): return res class mprx(object): + """method proxy, standard trick to implement rubyesque method_missing + in Python + A class is a closure factory, you know what I mean, or go read some SICP. + """ def __init__(self, ins, meth): self.ins = ins self.meth = meth @@ -151,9 +205,19 @@ class RepceClient(object): return self.ins(self.meth, *a) def __getattr__(self, meth): + """this implements transparent method dispatch to remote object, + so that you don't need to call the RepceClient instance like + + rclient('how_old_are_you_if_born_in', 1979) + + but you can make it into an ordinary method call like + + rclient.how_old_are_you_if_born_in(1979) + """ return self.mprx(self, meth) def __version__(self): + """used in handshake to verify compatibility""" d = {'proto': self('__repce_version__')} try: d['object'] = self('version') |