diff options
Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
| -rw-r--r-- | geo-replication/syncdaemon/monitor.py | 58 | 
1 files changed, 19 insertions, 39 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 029726c7a5a..ba5c8e32514 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -22,10 +22,12 @@ from errno import EEXIST  import re  import random  from gconf import gconf -from syncdutils import update_file, select, waitpid +from syncdutils import select, waitpid  from syncdutils import set_term_handler, is_host_local, GsyncdError  from syncdutils import escape, Thread, finalize, memoize +from gsyncdstatus import GeorepStatus, set_monitor_status +  ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError @@ -125,46 +127,22 @@ class Volinfo(object):      def disperse_count(self):          return int(self.get('disperseCount')[0].text) +  class Monitor(object):      """class which spawns and manages gsyncd workers"""      ST_INIT = 'Initializing...' -    ST_STABLE = 'Stable' -    ST_FAULTY = 'faulty' +    ST_STARTED = 'Started' +    ST_STABLE = 'Active' +    ST_FAULTY = 'Faulty'      ST_INCON = 'inconsistent'      _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON]      def __init__(self):          self.lock = Lock()          self.state = {} - -    def set_state(self, state, w=None): -        """set the state that can be used by external agents -           like glusterd for status reporting""" -        computestate = lambda: self.state and self._ST_ORD[ -            max(self._ST_ORD.index(s) for s in self.state.values())] -        if w: -            self.lock.acquire() -            old_state = computestate() -            self.state[w] = state -            state = computestate() -            self.lock.release() -            if state != old_state: -                self.set_state(state) -        else: -            if getattr(gconf, 'state_file', None): -                # If previous state is paused, suffix the -                # new state with '(Paused)' -                try: -                    with open(gconf.state_file, "r") as f: -                        content = f.read() -                        if "paused" in content.lower(): -                            state = state + '(Paused)' -                except IOError: -                    pass -                logging.info('new state: %s' % state) -                update_file(gconf.state_file, lambda f: f.write(state + '\n')) +        self.status = {}      @staticmethod      def terminate(): @@ -174,8 +152,7 @@ class Monitor(object):          # give a chance to graceful exit          os.kill(-os.getpid(), signal.SIGTERM) - -    def monitor(self, w, argv, cpids, agents, slave_vol, slave_host): +    def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master):          """the monitor loop          Basic logic is a blantantly simple blunt heuristics: @@ -194,8 +171,11 @@ class Monitor(object):          blown worker blows up on EPIPE if the net goes down,          due to the keep-alive thread)          """ +        if not self.status.get(w[0], None): +            self.status[w[0]] = GeorepStatus(gconf.state_file, w[0]) -        self.set_state(self.ST_INIT, w) +        set_monitor_status(gconf.state_file, self.ST_STARTED) +        self.status[w[0]].set_worker_status(self.ST_INIT)          ret = 0 @@ -310,7 +290,7 @@ class Monitor(object):                  nwait(apid) #wait for agent                  ret = nwait(cpid)              if ret is None: -                self.set_state(self.ST_STABLE, w) +                self.status[w[0]].set_worker_status(self.ST_STABLE)                  #If worker dies, agent terminates on EOF.                  #So lets wait for agent first.                  nwait(apid) @@ -320,12 +300,12 @@ class Monitor(object):              else:                  ret = exit_status(ret)                  if ret in (0, 1): -                    self.set_state(self.ST_FAULTY, w) +                    self.status[w[0]].set_worker_status(self.ST_FAULTY)              time.sleep(10) -        self.set_state(self.ST_INCON, w) +        self.status[w[0]].set_worker_status(self.ST_INCON)          return ret -    def multiplex(self, wspx, suuid, slave_vol, slave_host): +    def multiplex(self, wspx, suuid, slave_vol, slave_host, master):          argv = sys.argv[:]          for o in ('-N', '--no-daemon', '--monitor'):              while o in argv: @@ -339,7 +319,7 @@ class Monitor(object):          for wx in wspx:              def wmon(w):                  cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol, -                                       slave_host) +                                       slave_host, master)                  time.sleep(1)                  self.lock.acquire()                  for cpid in cpids: @@ -401,7 +381,7 @@ def distribute(*resources):                    for idx, brick in enumerate(mvol.bricks)                    if is_host_local(brick['host'])]      logging.info('worker specs: ' + repr(workerspex)) -    return workerspex, suuid, slave_vol, slave_host +    return workerspex, suuid, slave_vol, slave_host, master  def monitor(*resources):  | 
