summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/monitor.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
-rw-r--r--geo-replication/syncdaemon/monitor.py58
1 files changed, 19 insertions, 39 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index 029726c..ba5c8e3 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -22,10 +22,12 @@ from errno import EEXIST
import re
import random
from gconf import gconf
-from syncdutils import update_file, select, waitpid
+from syncdutils import select, waitpid
from syncdutils import set_term_handler, is_host_local, GsyncdError
from syncdutils import escape, Thread, finalize, memoize
+from gsyncdstatus import GeorepStatus, set_monitor_status
+
ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
@@ -125,46 +127,22 @@ class Volinfo(object):
def disperse_count(self):
return int(self.get('disperseCount')[0].text)
+
class Monitor(object):
"""class which spawns and manages gsyncd workers"""
ST_INIT = 'Initializing...'
- ST_STABLE = 'Stable'
- ST_FAULTY = 'faulty'
+ ST_STARTED = 'Started'
+ ST_STABLE = 'Active'
+ ST_FAULTY = 'Faulty'
ST_INCON = 'inconsistent'
_ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON]
def __init__(self):
self.lock = Lock()
self.state = {}
-
- def set_state(self, state, w=None):
- """set the state that can be used by external agents
- like glusterd for status reporting"""
- computestate = lambda: self.state and self._ST_ORD[
- max(self._ST_ORD.index(s) for s in self.state.values())]
- if w:
- self.lock.acquire()
- old_state = computestate()
- self.state[w] = state
- state = computestate()
- self.lock.release()
- if state != old_state:
- self.set_state(state)
- else:
- if getattr(gconf, 'state_file', None):
- # If previous state is paused, suffix the
- # new state with '(Paused)'
- try:
- with open(gconf.state_file, "r") as f:
- content = f.read()
- if "paused" in content.lower():
- state = state + '(Paused)'
- except IOError:
- pass
- logging.info('new state: %s' % state)
- update_file(gconf.state_file, lambda f: f.write(state + '\n'))
+ self.status = {}
@staticmethod
def terminate():
@@ -174,8 +152,7 @@ class Monitor(object):
# give a chance to graceful exit
os.kill(-os.getpid(), signal.SIGTERM)
-
- def monitor(self, w, argv, cpids, agents, slave_vol, slave_host):
+ def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master):
"""the monitor loop
Basic logic is a blantantly simple blunt heuristics:
@@ -194,8 +171,11 @@ class Monitor(object):
blown worker blows up on EPIPE if the net goes down,
due to the keep-alive thread)
"""
+ if not self.status.get(w[0], None):
+ self.status[w[0]] = GeorepStatus(gconf.state_file, w[0])
- self.set_state(self.ST_INIT, w)
+ set_monitor_status(gconf.state_file, self.ST_STARTED)
+ self.status[w[0]].set_worker_status(self.ST_INIT)
ret = 0
@@ -310,7 +290,7 @@ class Monitor(object):
nwait(apid) #wait for agent
ret = nwait(cpid)
if ret is None:
- self.set_state(self.ST_STABLE, w)
+ self.status[w[0]].set_worker_status(self.ST_STABLE)
#If worker dies, agent terminates on EOF.
#So lets wait for agent first.
nwait(apid)
@@ -320,12 +300,12 @@ class Monitor(object):
else:
ret = exit_status(ret)
if ret in (0, 1):
- self.set_state(self.ST_FAULTY, w)
+ self.status[w[0]].set_worker_status(self.ST_FAULTY)
time.sleep(10)
- self.set_state(self.ST_INCON, w)
+ self.status[w[0]].set_worker_status(self.ST_INCON)
return ret
- def multiplex(self, wspx, suuid, slave_vol, slave_host):
+ def multiplex(self, wspx, suuid, slave_vol, slave_host, master):
argv = sys.argv[:]
for o in ('-N', '--no-daemon', '--monitor'):
while o in argv:
@@ -339,7 +319,7 @@ class Monitor(object):
for wx in wspx:
def wmon(w):
cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol,
- slave_host)
+ slave_host, master)
time.sleep(1)
self.lock.acquire()
for cpid in cpids:
@@ -401,7 +381,7 @@ def distribute(*resources):
for idx, brick in enumerate(mvol.bricks)
if is_host_local(brick['host'])]
logging.info('worker specs: ' + repr(workerspex))
- return workerspex, suuid, slave_vol, slave_host
+ return workerspex, suuid, slave_vol, slave_host, master
def monitor(*resources):