diff options
author | Aravinda VK <avishwan@redhat.com> | 2019-10-23 10:10:12 +0530 |
---|---|---|
committer | Amar Tumballi <amarts@gmail.com> | 2019-11-07 06:24:39 +0000 |
commit | 0fc68040b72fc94dec3874345547e294b9ec1f45 (patch) | |
tree | ea053550c79c3903804a7e3c3812551a41e17b01 /geo-replication/syncdaemon/monitor.py | |
parent | 0ab6c178468b6cce095c54ab62cfa51162d01fcc (diff) |
georep: Merge Worker and Agent as a single process
- libgfchangelog is simplified by removing unnecessary API Class
- Merged Agent logic into Worker instead of running Worker and Agent as
two separate processes and maintaining RPC between Worker and Agent.
- Geo-rep command Pause and Resume will continue without any changes.
But Agent functionality also gets paused with that.
Updates: #755
Change-Id: Ie2c00fa7dddf21f180f0649e0aaf084d29023c98
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 83 |
1 files changed, 6 insertions, 77 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 236afe70d11..14e77aef27e 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -20,6 +20,7 @@ import random from resource import SSH import gsyncdconfig as gconf +import libgfchangelog from rconf import rconf from syncdutils import select, waitpid, errno_wrap, lf, grabpidfile from syncdutils import set_term_handler, GsyncdError @@ -81,7 +82,7 @@ class Monitor(object): # give a chance to graceful exit errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH]) - def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master, + def monitor(self, w, argv, cpids, slave_vol, slave_host, master, suuid, slavenodes): """the monitor loop @@ -150,7 +151,7 @@ class Monitor(object): remote_host = "%s@%s" % (remote_user, remote_new[0]) remote_id = remote_new[1] - # Spawn the worker and agent in lock to avoid fd leak + # Spawn the worker in lock to avoid fd leak self.lock.acquire() self.status[w[0]['dir']].set_worker_status(self.ST_INIT) @@ -158,44 +159,10 @@ class Monitor(object): brick=w[0]['dir'], slave_node=remote_host)) - # Couple of pipe pairs for RPC communication b/w - # worker and changelog agent. - - # read/write end for agent - (ra, ww) = pipe() - # read/write end for worker - (rw, wa) = pipe() - - # spawn the agent process - apid = os.fork() - if apid == 0: - os.close(rw) - os.close(ww) - args_to_agent = argv + [ - 'agent', - rconf.args.master, - rconf.args.slave, - '--local-path', w[0]['dir'], - '--local-node', w[0]['host'], - '--local-node-id', w[0]['uuid'], - '--slave-id', suuid, - '--rpc-fd', ','.join([str(ra), str(wa), str(rw), str(ww)]) - ] - - if rconf.args.config_file is not None: - args_to_agent += ['-c', rconf.args.config_file] - - if rconf.args.debug: - args_to_agent.append("--debug") - - os.execv(sys.executable, args_to_agent) - pr, pw = pipe() cpid = os.fork() if cpid == 0: os.close(pr) - os.close(ra) - os.close(wa) args_to_worker = argv + [ 'worker', @@ -206,8 +173,6 @@ class Monitor(object): '--local-node', w[0]['host'], '--local-node-id', w[0]['uuid'], '--slave-id', suuid, - '--rpc-fd', - ','.join([str(rw), str(ww), str(ra), str(wa)]), '--subvol-num', str(w[2]), '--resource-remote', remote_host, '--resource-remote-id', remote_id @@ -238,14 +203,8 @@ class Monitor(object): os.execv(sys.executable, args_to_worker) cpids.add(cpid) - agents.add(apid) os.close(pw) - # close all RPC pipes in monitor - os.close(ra) - os.close(wa) - os.close(rw) - os.close(ww) self.lock.release() t0 = time.time() @@ -254,42 +213,19 @@ class Monitor(object): if so: ret = nwait(cpid, os.WNOHANG) - ret_agent = nwait(apid, os.WNOHANG) - - if ret_agent is not None: - # Agent is died Kill Worker - logging.info(lf("Changelog Agent died, Aborting Worker", - brick=w[0]['dir'])) - errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) - nwait(cpid) - nwait(apid) if ret is not None: logging.info(lf("worker died before establishing " "connection", brick=w[0]['dir'])) - nwait(apid) # wait for agent else: logging.debug("worker(%s) connected" % w[0]['dir']) while time.time() < t0 + conn_timeout: ret = nwait(cpid, os.WNOHANG) - ret_agent = nwait(apid, os.WNOHANG) if ret is not None: logging.info(lf("worker died in startup phase", brick=w[0]['dir'])) - nwait(apid) # wait for agent - break - - if ret_agent is not None: - # Agent is died Kill Worker - logging.info(lf("Changelog Agent died, Aborting " - "Worker", - brick=w[0]['dir'])) - errno_wrap(os.kill, [cpid, signal.SIGKILL], - [ESRCH]) - nwait(cpid) - nwait(apid) break time.sleep(1) @@ -304,12 +240,8 @@ class Monitor(object): brick=w[0]['dir'], timeout=conn_timeout)) errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) - nwait(apid) # wait for agent ret = nwait(cpid) if ret is None: - # If worker dies, agent terminates on EOF. - # So lets wait for agent first. - nwait(apid) ret = nwait(cpid) if exit_signalled(ret): ret = 0 @@ -333,18 +265,15 @@ class Monitor(object): argv = [os.path.basename(sys.executable), sys.argv[0]] cpids = set() - agents = set() ta = [] for wx in wspx: def wmon(w): - cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol, + cpid, _ = self.monitor(w, argv, cpids, slave_vol, slave_host, master, suuid, slavenodes) time.sleep(1) self.lock.acquire() for cpid in cpids: errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) - for apid in agents: - errno_wrap(os.kill, [apid, signal.SIGKILL], [ESRCH]) self.lock.release() finalize(exval=1) t = Thread(target=wmon, args=[wx]) @@ -354,8 +283,8 @@ class Monitor(object): # monitor status was being updated in each monitor thread. It # should not be done as it can cause deadlock for a worker start. # set_monitor_status uses flock to synchronize multple instances - # updating the file. Since each monitor thread forks worker and - # agent, these processes can hold the reference to fd of status + # updating the file. Since each monitor thread forks worker, + # these processes can hold the reference to fd of status # file causing deadlock to workers which starts later as flock # will not be release until all references to same fd is closed. # It will also cause fd leaks. |