diff options
Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 58 |
1 files changed, 19 insertions, 39 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 029726c7a5a..ba5c8e32514 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -22,10 +22,12 @@ from errno import EEXIST import re import random from gconf import gconf -from syncdutils import update_file, select, waitpid +from syncdutils import select, waitpid from syncdutils import set_term_handler, is_host_local, GsyncdError from syncdutils import escape, Thread, finalize, memoize +from gsyncdstatus import GeorepStatus, set_monitor_status + ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError @@ -125,46 +127,22 @@ class Volinfo(object): def disperse_count(self): return int(self.get('disperseCount')[0].text) + class Monitor(object): """class which spawns and manages gsyncd workers""" ST_INIT = 'Initializing...' - ST_STABLE = 'Stable' - ST_FAULTY = 'faulty' + ST_STARTED = 'Started' + ST_STABLE = 'Active' + ST_FAULTY = 'Faulty' ST_INCON = 'inconsistent' _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON] def __init__(self): self.lock = Lock() self.state = {} - - def set_state(self, state, w=None): - """set the state that can be used by external agents - like glusterd for status reporting""" - computestate = lambda: self.state and self._ST_ORD[ - max(self._ST_ORD.index(s) for s in self.state.values())] - if w: - self.lock.acquire() - old_state = computestate() - self.state[w] = state - state = computestate() - self.lock.release() - if state != old_state: - self.set_state(state) - else: - if getattr(gconf, 'state_file', None): - # If previous state is paused, suffix the - # new state with '(Paused)' - try: - with open(gconf.state_file, "r") as f: - content = f.read() - if "paused" in content.lower(): - state = state + '(Paused)' - except IOError: - pass - logging.info('new state: %s' % state) - update_file(gconf.state_file, lambda f: f.write(state + '\n')) + self.status = {} @staticmethod def terminate(): @@ -174,8 +152,7 @@ class Monitor(object): # give a chance to graceful exit os.kill(-os.getpid(), signal.SIGTERM) - - def monitor(self, w, argv, cpids, agents, slave_vol, slave_host): + def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master): """the monitor loop Basic logic is a blantantly simple blunt heuristics: @@ -194,8 +171,11 @@ class Monitor(object): blown worker blows up on EPIPE if the net goes down, due to the keep-alive thread) """ + if not self.status.get(w[0], None): + self.status[w[0]] = GeorepStatus(gconf.state_file, w[0]) - self.set_state(self.ST_INIT, w) + set_monitor_status(gconf.state_file, self.ST_STARTED) + self.status[w[0]].set_worker_status(self.ST_INIT) ret = 0 @@ -310,7 +290,7 @@ class Monitor(object): nwait(apid) #wait for agent ret = nwait(cpid) if ret is None: - self.set_state(self.ST_STABLE, w) + self.status[w[0]].set_worker_status(self.ST_STABLE) #If worker dies, agent terminates on EOF. #So lets wait for agent first. nwait(apid) @@ -320,12 +300,12 @@ class Monitor(object): else: ret = exit_status(ret) if ret in (0, 1): - self.set_state(self.ST_FAULTY, w) + self.status[w[0]].set_worker_status(self.ST_FAULTY) time.sleep(10) - self.set_state(self.ST_INCON, w) + self.status[w[0]].set_worker_status(self.ST_INCON) return ret - def multiplex(self, wspx, suuid, slave_vol, slave_host): + def multiplex(self, wspx, suuid, slave_vol, slave_host, master): argv = sys.argv[:] for o in ('-N', '--no-daemon', '--monitor'): while o in argv: @@ -339,7 +319,7 @@ class Monitor(object): for wx in wspx: def wmon(w): cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol, - slave_host) + slave_host, master) time.sleep(1) self.lock.acquire() for cpid in cpids: @@ -401,7 +381,7 @@ def distribute(*resources): for idx, brick in enumerate(mvol.bricks) if is_host_local(brick['host'])] logging.info('worker specs: ' + repr(workerspex)) - return workerspex, suuid, slave_vol, slave_host + return workerspex, suuid, slave_vol, slave_host, master def monitor(*resources): |