summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/repce.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/repce.py')
-rw-r--r--geo-replication/syncdaemon/repce.py82
1 files changed, 55 insertions, 27 deletions
diff --git a/geo-replication/syncdaemon/repce.py b/geo-replication/syncdaemon/repce.py
index 755fb61df48..c622afa6373 100644
--- a/geo-replication/syncdaemon/repce.py
+++ b/geo-replication/syncdaemon/repce.py
@@ -1,32 +1,40 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
import os
import sys
import time
import logging
from threading import Condition
try:
- import thread
-except ImportError:
- # py 3
import _thread as thread
-try:
- from Queue import Queue
except ImportError:
- # py 3
+ import thread
+try:
from queue import Queue
+except ImportError:
+ from Queue import Queue
try:
import cPickle as pickle
except ImportError:
- # py 3
import pickle
-from syncdutils import Thread, select
+from syncdutils import Thread, select, lf
-pickle_proto = -1
+pickle_proto = 2
repce_version = 1.0
+
def ioparse(i, o):
if isinstance(i, int):
- i = os.fdopen(i)
+ i = os.fdopen(i, 'rb')
# rely on duck typing for recognizing
# streams as that works uniformly
# in py2 and py3
@@ -34,6 +42,7 @@ def ioparse(i, o):
o = o.fileno()
return (i, o)
+
def send(out, *args):
"""pickle args and write out wholly in one syscall
@@ -43,12 +52,21 @@ def send(out, *args):
"""
os.write(out, pickle.dumps(args, pickle_proto))
+
def recv(inf):
- """load an object from input stream"""
- return pickle.load(inf)
+ """load an object from input stream
+ python2 and python3 compatibility, inf is sys.stdin
+ and is opened as text stream by default. Hence using the
+ buffer attribute in python3
+ """
+ if hasattr(inf, "buffer"):
+ return pickle.load(inf.buffer)
+ else:
+ return pickle.load(inf)
class RepceServer(object):
+
"""RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce
... also our homebrewed RPC backend where the transport layer is
@@ -95,16 +113,17 @@ class RepceServer(object):
if rmeth == '__repce_version__':
res = repce_version
else:
- try:
- res = getattr(self.obj, rmeth)(*in_data[2:])
- except:
- res = sys.exc_info()[1]
- exc = True
- logging.exception("call failed: ")
+ try:
+ res = getattr(self.obj, rmeth)(*in_data[2:])
+ except:
+ res = sys.exc_info()[1]
+ exc = True
+ logging.exception("call failed: ")
send(self.out, rid, exc, res)
class RepceJob(object):
+
"""class representing message status we can use
for waiting on reply"""
@@ -137,6 +156,7 @@ class RepceJob(object):
class RepceClient(object):
+
"""RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce
... also our homebrewed RPC backend where the transport layer is
@@ -148,7 +168,7 @@ class RepceClient(object):
def __init__(self, i, o):
self.inf, self.out = ioparse(i, o)
self.jtab = {}
- t = Thread(target = self.listen)
+ t = Thread(target=self.listen)
t.start()
def listen(self):
@@ -177,25 +197,33 @@ class RepceClient(object):
return rjob
def __call__(self, meth, *args):
- """RePCe client is callabe, calling it implements a synchronous remote call
+ """RePCe client is callabe, calling it implements a synchronous
+ remote call.
- We do a .push with a cbk which does a wakeup upon receiving anwser, then wait
- on the RepceJob.
+ We do a .push with a cbk which does a wakeup upon receiving answer,
+ then wait on the RepceJob.
"""
- rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
+ rjob = self.push(
+ meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
exc, res = rjob.wait()
if exc:
- logging.error('call %s (%s) failed on peer with %s' % (repr(rjob), meth, str(type(res).__name__)))
+ logging.error(lf('call failed',
+ call=repr(rjob),
+ method=meth,
+ error=str(type(res).__name__)))
raise res
logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res)))
return res
class mprx(object):
- """method proxy, standard trick to implement rubyesque method_missing
- in Python
- A class is a closure factory, you know what I mean, or go read some SICP.
+ """method proxy, standard trick to implement rubyesque
+ method_missing in Python
+
+ A class is a closure factory, you know what I mean, or go read
+ some SICP.
"""
+
def __init__(self, ins, meth):
self.ins = ins
self.meth = meth