diff options
Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
| -rw-r--r-- | geo-replication/syncdaemon/monitor.py | 34 | 
1 files changed, 32 insertions, 2 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 8ed6f832618..e49a24ee5f5 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -108,7 +108,7 @@ class Monitor(object):          # give a chance to graceful exit          os.kill(-os.getpid(), signal.SIGTERM) -    def monitor(self, w, argv, cpids): +    def monitor(self, w, argv, cpids, agents):          """the monitor loop          Basic logic is a blantantly simple blunt heuristics: @@ -149,6 +149,23 @@ class Monitor(object):          while ret in (0, 1):              logging.info('-' * conn_timeout)              logging.info('starting gsyncd worker') + +            # Couple of pipe pairs for RPC communication b/w +            # worker and changelog agent. + +            # read/write end for agent +            (ra, ww) = os.pipe() +            # read/write end for worker +            (rw, wa) = os.pipe() + +            # spawn the agent process +            apid = os.fork() +            if apid == 0: +                os.execv(sys.executable, argv + ['--local-path', w[0], +                                                 '--agent', +                                                 '--rpc-fd', +                                                 ','.join([str(ra), str(wa), +                                                           str(rw), str(ww)])])              pr, pw = os.pipe()              cpid = os.fork()              if cpid == 0: @@ -157,14 +174,26 @@ class Monitor(object):                                                   '--local-path', w[0],                                                   '--local-id',                                                   '.' + escape(w[0]), +                                                 '--rpc-fd', +                                                 ','.join([str(rw), str(ww), +                                                           str(ra), str(wa)]),                                                   '--resource-remote', w[1]])              self.lock.acquire()              cpids.add(cpid) +            agents.add(apid)              self.lock.release()              os.close(pw) +              t0 = time.time()              so = select((pr,), (), (), conn_timeout)[0]              os.close(pr) + +            # close all RPC pipes in monitor +            os.close(ra) +            os.close(wa) +            os.close(rw) +            os.close(ww) +              if so:                  ret = nwait(cpid, os.WNOHANG)                  if ret is not None: @@ -206,10 +235,11 @@ class Monitor(object):          argv.insert(0, os.path.basename(sys.executable))          cpids = set() +        agents = set()          ta = []          for wx in wspx:              def wmon(w): -                cpid, _ = self.monitor(w, argv, cpids) +                cpid, _ = self.monitor(w, argv, cpids, agents)                  time.sleep(1)                  self.lock.acquire()                  for cpid in cpids:  | 
