diff options
Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
| -rw-r--r-- | geo-replication/syncdaemon/monitor.py | 54 | 
1 files changed, 44 insertions, 10 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index c41eb969143..5a6bf5033a4 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -18,7 +18,7 @@ import xml.etree.ElementTree as XET  from subprocess import PIPE  from resource import Popen, FILE, GLUSTER, SSH  from threading import Lock -from errno import EEXIST +from errno import ECHILD  import re  import random  from gconf import gconf @@ -188,10 +188,18 @@ class Monitor(object):          ret = 0          def nwait(p, o=0): -            p2, r = waitpid(p, o) -            if not p2: -                return -            return r +            try: +                p2, r = waitpid(p, o) +                if not p2: +                    return +                return r +            except OSError as e: +                # no child process, this happens if the child process +                # already died and has been cleaned up +                if e.errno == ECHILD: +                    return -1 +                else: +                    raise          def exit_signalled(s):              """ child teminated due to receipt of SIGUSR1 """ @@ -240,6 +248,8 @@ class Monitor(object):              # spawn the agent process              apid = os.fork()              if apid == 0: +                os.close(rw) +                os.close(ww)                  os.execv(sys.executable, argv + ['--local-path', w[0],                                                   '--agent',                                                   '--rpc-fd', @@ -249,6 +259,8 @@ class Monitor(object):              cpid = os.fork()              if cpid == 0:                  os.close(pr) +                os.close(ra) +                os.close(wa)                  os.execv(sys.executable, argv + ['--feedback-fd', str(pw),                                                   '--local-path', w[0],                                                   '--local-id', @@ -277,30 +289,52 @@ class Monitor(object):              if so:                  ret = nwait(cpid, os.WNOHANG) +                ret_agent = nwait(apid, os.WNOHANG) + +                if ret_agent is not None: +                    # Agent is died Kill Worker +                    logging.info("Changelog Agent died, " +                                 "Aborting Worker(%s)" % w[0]) +                    os.kill(cpid, signal.SIGKILL) +                    nwait(cpid) +                    nwait(apid) +                  if ret is not None:                      logging.info("worker(%s) died before establishing "                                   "connection" % w[0]) -                    nwait(apid) #wait for agent +                    nwait(apid)  # wait for agent                  else:                      logging.debug("worker(%s) connected" % w[0])                      while time.time() < t0 + conn_timeout:                          ret = nwait(cpid, os.WNOHANG) +                        ret_agent = nwait(apid, os.WNOHANG) +                          if ret is not None:                              logging.info("worker(%s) died in startup "                                           "phase" % w[0]) -                            nwait(apid) #wait for agent +                            nwait(apid)  # wait for agent +                            break + +                        if ret_agent is not None: +                            # Agent is died Kill Worker +                            logging.info("Changelog Agent died, Aborting " +                                         "Worker(%s)" % w[0]) +                            os.kill(cpid, signal.SIGKILL) +                            nwait(cpid) +                            nwait(apid)                              break +                          time.sleep(1)              else:                  logging.info("worker(%s) not confirmed in %d sec, "                               "aborting it" % (w[0], conn_timeout))                  os.kill(cpid, signal.SIGKILL) -                nwait(apid) #wait for agent +                nwait(apid)  # wait for agent                  ret = nwait(cpid)              if ret is None:                  self.status[w[0]].set_worker_status(self.ST_STABLE) -                #If worker dies, agent terminates on EOF. -                #So lets wait for agent first. +                # If worker dies, agent terminates on EOF. +                # So lets wait for agent first.                  nwait(apid)                  ret = nwait(cpid)              if exit_signalled(ret):  | 
