diff options
author | Aravinda VK <avishwan@redhat.com> | 2015-10-30 17:06:58 +0530 |
---|---|---|
committer | Venky Shankar <vshankar@redhat.com> | 2015-11-09 01:20:31 -0800 |
commit | 5d1ff7efd6ab3bd29a29922a9ea1e1aaf02544ad (patch) | |
tree | 9f8562986815d80842836d74e4b1cc3bcce24ae6 | |
parent | f68c95a429b44afc0197152a7819d17ce1de734c (diff) |
geo-rep: Kill Geo-rep Worker when Agent process dies
When the Changelog agent process dies, Geo-replication fails to detect
it, and the worker keeps running without its Changelog agent. Status
shows Active/Passive without any progress.
With this patch, the worker process is killed whenever the Changelog
agent dies.
Change-Id: I30b4cc77f924f7e8174b8bfe415ac17f0b3851b4
Signed-off-by: Aravinda VK <avishwan@redhat.com>
BUG: 1277076
Reviewed-on: http://review.gluster.org/12485
Tested-by: NetBSD Build System <jenkins@build.gluster.org>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Venky Shankar <vshankar@redhat.com>
Reviewed-by: Kotresh HR <khiremat@redhat.com>
-rw-r--r-- | geo-replication/syncdaemon/changelogagent.py | 2 | ||||
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 54 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 2 |
3 files changed, 44 insertions, 14 deletions
diff --git a/geo-replication/syncdaemon/changelogagent.py b/geo-replication/syncdaemon/changelogagent.py index ad5f69cfb23..731dbd06f57 100644 --- a/geo-replication/syncdaemon/changelogagent.py +++ b/geo-replication/syncdaemon/changelogagent.py @@ -66,8 +66,6 @@ class Changelog(object): class ChangelogAgent(object): def __init__(self, obj, fd_tup): (inf, ouf, rw, ww) = fd_tup.split(',') - os.close(int(rw)) - os.close(int(ww)) repce = RepceServer(obj, int(inf), int(ouf), 1) t = syncdutils.Thread(target=lambda: (repce.service_loop(), syncdutils.finalize())) diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index c41eb969143..5a6bf5033a4 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -18,7 +18,7 @@ import xml.etree.ElementTree as XET from subprocess import PIPE from resource import Popen, FILE, GLUSTER, SSH from threading import Lock -from errno import EEXIST +from errno import ECHILD import re import random from gconf import gconf @@ -188,10 +188,18 @@ class Monitor(object): ret = 0 def nwait(p, o=0): - p2, r = waitpid(p, o) - if not p2: - return - return r + try: + p2, r = waitpid(p, o) + if not p2: + return + return r + except OSError as e: + # no child process, this happens if the child process + # already died and has been cleaned up + if e.errno == ECHILD: + return -1 + else: + raise def exit_signalled(s): """ child teminated due to receipt of SIGUSR1 """ @@ -240,6 +248,8 @@ class Monitor(object): # spawn the agent process apid = os.fork() if apid == 0: + os.close(rw) + os.close(ww) os.execv(sys.executable, argv + ['--local-path', w[0], '--agent', '--rpc-fd', @@ -249,6 +259,8 @@ class Monitor(object): cpid = os.fork() if cpid == 0: os.close(pr) + os.close(ra) + os.close(wa) os.execv(sys.executable, argv + ['--feedback-fd', str(pw), '--local-path', w[0], '--local-id', @@ -277,30 +289,52 @@ class Monitor(object): if so: ret = nwait(cpid, os.WNOHANG) + ret_agent = 
nwait(apid, os.WNOHANG) + + if ret_agent is not None: + # Agent is died Kill Worker + logging.info("Changelog Agent died, " + "Aborting Worker(%s)" % w[0]) + os.kill(cpid, signal.SIGKILL) + nwait(cpid) + nwait(apid) + if ret is not None: logging.info("worker(%s) died before establishing " "connection" % w[0]) - nwait(apid) #wait for agent + nwait(apid) # wait for agent else: logging.debug("worker(%s) connected" % w[0]) while time.time() < t0 + conn_timeout: ret = nwait(cpid, os.WNOHANG) + ret_agent = nwait(apid, os.WNOHANG) + if ret is not None: logging.info("worker(%s) died in startup " "phase" % w[0]) - nwait(apid) #wait for agent + nwait(apid) # wait for agent + break + + if ret_agent is not None: + # Agent is died Kill Worker + logging.info("Changelog Agent died, Aborting " + "Worker(%s)" % w[0]) + os.kill(cpid, signal.SIGKILL) + nwait(cpid) + nwait(apid) break + time.sleep(1) else: logging.info("worker(%s) not confirmed in %d sec, " "aborting it" % (w[0], conn_timeout)) os.kill(cpid, signal.SIGKILL) - nwait(apid) #wait for agent + nwait(apid) # wait for agent ret = nwait(cpid) if ret is None: self.status[w[0]].set_worker_status(self.ST_STABLE) - #If worker dies, agent terminates on EOF. - #So lets wait for agent first. + # If worker dies, agent terminates on EOF. + # So lets wait for agent first. nwait(apid) ret = nwait(cpid) if exit_signalled(ret): diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 51f88627a96..a44ca914222 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -1394,8 +1394,6 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): # g3 ==> changelog History changelog_register_failed = False (inf, ouf, ra, wa) = gconf.rpc_fd.split(',') - os.close(int(ra)) - os.close(int(wa)) changelog_agent = RepceClient(int(inf), int(ouf)) status = GeorepStatus(gconf.state_file, gconf.local_path) status.reset_on_worker_start() |