diff options
Diffstat (limited to 'geo-replication/syncdaemon')
| -rw-r--r-- | geo-replication/syncdaemon/changelogagent.py | 2 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/monitor.py | 54 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 2 | 
3 files changed, 44 insertions, 14 deletions
diff --git a/geo-replication/syncdaemon/changelogagent.py b/geo-replication/syncdaemon/changelogagent.py index ad5f69cfb23..731dbd06f57 100644 --- a/geo-replication/syncdaemon/changelogagent.py +++ b/geo-replication/syncdaemon/changelogagent.py @@ -66,8 +66,6 @@ class Changelog(object):  class ChangelogAgent(object):      def __init__(self, obj, fd_tup):          (inf, ouf, rw, ww) = fd_tup.split(',') -        os.close(int(rw)) -        os.close(int(ww))          repce = RepceServer(obj, int(inf), int(ouf), 1)          t = syncdutils.Thread(target=lambda: (repce.service_loop(),                                                syncdutils.finalize())) diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index c41eb969143..5a6bf5033a4 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -18,7 +18,7 @@ import xml.etree.ElementTree as XET  from subprocess import PIPE  from resource import Popen, FILE, GLUSTER, SSH  from threading import Lock -from errno import EEXIST +from errno import ECHILD  import re  import random  from gconf import gconf @@ -188,10 +188,18 @@ class Monitor(object):          ret = 0          def nwait(p, o=0): -            p2, r = waitpid(p, o) -            if not p2: -                return -            return r +            try: +                p2, r = waitpid(p, o) +                if not p2: +                    return +                return r +            except OSError as e: +                # no child process, this happens if the child process +                # already died and has been cleaned up +                if e.errno == ECHILD: +                    return -1 +                else: +                    raise          def exit_signalled(s):              """ child teminated due to receipt of SIGUSR1 """ @@ -240,6 +248,8 @@ class Monitor(object):              # spawn the agent process              apid = os.fork()              if apid == 0: +                os.close(rw) +                os.close(ww)                  os.execv(sys.executable, argv + ['--local-path', w[0],                                                   '--agent',                                                   '--rpc-fd', @@ -249,6 +259,8 @@ class Monitor(object):              cpid = os.fork()              if cpid == 0:                  os.close(pr) +                os.close(ra) +                os.close(wa)                  os.execv(sys.executable, argv + ['--feedback-fd', str(pw),                                                   '--local-path', w[0],                                                   '--local-id', @@ -277,30 +289,52 @@ class Monitor(object):              if so:                  ret = nwait(cpid, os.WNOHANG) +                ret_agent = nwait(apid, os.WNOHANG) + +                if ret_agent is not None: +                    # Agent is died Kill Worker +                    logging.info("Changelog Agent died, " +                                 "Aborting Worker(%s)" % w[0]) +                    os.kill(cpid, signal.SIGKILL) +                    nwait(cpid) +                    nwait(apid) +                  if ret is not None:                      logging.info("worker(%s) died before establishing "                                   "connection" % w[0]) -                    nwait(apid) #wait for agent +                    nwait(apid)  # wait for agent                  else:                      logging.debug("worker(%s) connected" % w[0])                      while time.time() < t0 + conn_timeout:                          ret = nwait(cpid, os.WNOHANG) +                        ret_agent = nwait(apid, os.WNOHANG) +                          if ret is not None:                              logging.info("worker(%s) died in startup "                                           "phase" % w[0]) -                            nwait(apid) #wait for agent +                            nwait(apid)  # wait for agent +                            break + +                        if ret_agent is not None: +                            # Agent is died Kill Worker +                            logging.info("Changelog Agent died, Aborting " +                                         "Worker(%s)" % w[0]) +                            os.kill(cpid, signal.SIGKILL) +                            nwait(cpid) +                            nwait(apid)                              break +                          time.sleep(1)              else:                  logging.info("worker(%s) not confirmed in %d sec, "                               "aborting it" % (w[0], conn_timeout))                  os.kill(cpid, signal.SIGKILL) -                nwait(apid) #wait for agent +                nwait(apid)  # wait for agent                  ret = nwait(cpid)              if ret is None:                  self.status[w[0]].set_worker_status(self.ST_STABLE) -                #If worker dies, agent terminates on EOF. -                #So lets wait for agent first. +                # If worker dies, agent terminates on EOF. +                # So lets wait for agent first.                  nwait(apid)                  ret = nwait(cpid)              if exit_signalled(ret): diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 51f88627a96..a44ca914222 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -1394,8 +1394,6 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):              # g3 ==> changelog History              changelog_register_failed = False              (inf, ouf, ra, wa) = gconf.rpc_fd.split(',') -            os.close(int(ra)) -            os.close(int(wa))              changelog_agent = RepceClient(int(inf), int(ouf))              status = GeorepStatus(gconf.state_file, gconf.local_path)              status.reset_on_worker_start()  | 
