summary refs log tree commit diff stats
path: root/geo-replication/syncdaemon/monitor.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
-rw-r--r-- geo-replication/syncdaemon/monitor.py 34
1 files changed, 32 insertions, 2 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index 8ed6f832618..e49a24ee5f5 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -108,7 +108,7 @@ class Monitor(object):
# give a chance to graceful exit
os.kill(-os.getpid(), signal.SIGTERM)
- def monitor(self, w, argv, cpids):
+ def monitor(self, w, argv, cpids, agents):
"""the monitor loop
Basic logic is a blantantly simple blunt heuristics:
@@ -149,6 +149,23 @@ class Monitor(object):
while ret in (0, 1):
logging.info('-' * conn_timeout)
logging.info('starting gsyncd worker')
+
+ # Two pipe pairs for RPC communication between the
+ # worker and the changelog agent.
+
+ # read/write end for agent
+ (ra, ww) = os.pipe()
+ # read/write end for worker
+ (rw, wa) = os.pipe()
+
+ # spawn the agent process
+ apid = os.fork()
+ if apid == 0:
+ os.execv(sys.executable, argv + ['--local-path', w[0],
+ '--agent',
+ '--rpc-fd',
+ ','.join([str(ra), str(wa),
+ str(rw), str(ww)])])
pr, pw = os.pipe()
cpid = os.fork()
if cpid == 0:
@@ -157,14 +174,26 @@ class Monitor(object):
'--local-path', w[0],
'--local-id',
'.' + escape(w[0]),
+ '--rpc-fd',
+ ','.join([str(rw), str(ww),
+ str(ra), str(wa)]),
'--resource-remote', w[1]])
self.lock.acquire()
cpids.add(cpid)
+ agents.add(apid)
self.lock.release()
os.close(pw)
+
t0 = time.time()
so = select((pr,), (), (), conn_timeout)[0]
os.close(pr)
+
+ # close all RPC pipes in monitor
+ os.close(ra)
+ os.close(wa)
+ os.close(rw)
+ os.close(ww)
+
if so:
ret = nwait(cpid, os.WNOHANG)
if ret is not None:
@@ -206,10 +235,11 @@ class Monitor(object):
argv.insert(0, os.path.basename(sys.executable))
cpids = set()
+ agents = set()
ta = []
for wx in wspx:
def wmon(w):
- cpid, _ = self.monitor(w, argv, cpids)
+ cpid, _ = self.monitor(w, argv, cpids, agents)
time.sleep(1)
self.lock.acquire()
for cpid in cpids: