summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--events/eventskeygen.py4
-rw-r--r--geo-replication/syncdaemon/gsyncd.py4
-rw-r--r--geo-replication/syncdaemon/gsyncdstatus.py50
-rw-r--r--geo-replication/syncdaemon/monitor.py5
-rw-r--r--geo-replication/syncdaemon/resource.py7
-rw-r--r--geo-replication/syncdaemon/syncdutils.py19
6 files changed, 81 insertions, 8 deletions
diff --git a/events/eventskeygen.py b/events/eventskeygen.py
index 57c66ce6e18..d7354b278f2 100644
--- a/events/eventskeygen.py
+++ b/events/eventskeygen.py
@@ -128,6 +128,10 @@ keys = (
"EVENT_EC_MIN_BRICKS_UP",
#georep async events
"EVENT_GEOREP_FAULTY",
+ "EVENT_GEOREP_CHECKPOINT_COMPLETED",
+ "EVENT_GEOREP_ACTIVE",
+ "EVENT_GEOREP_PASSIVE",
+
#quota async events
"EVENT_QUOTA_CROSSED_SOFT_LIMIT",
#bitrot async events
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index ee2a9b334d3..3718ba83141 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -39,6 +39,7 @@ from changelogagent import agent, Changelog
from gsyncdstatus import set_monitor_status, GeorepStatus
from libcxattr import Xattr
import struct
+from syncdutils import get_master_and_slave_data_from_args
ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
@@ -698,8 +699,11 @@ def main_i():
status_get = rconf.get('status_get')
if status_get:
+ master_name, slave_data = get_master_and_slave_data_from_args(args)
for brick in gconf.path:
brick_status = GeorepStatus(gconf.state_file, brick,
+ master_name,
+ slave_data,
getattr(gconf, "pid_file", None))
checkpoint_time = int(getattr(gconf, "checkpoint", "0"))
brick_status.print_status(checkpoint_time=checkpoint_time)
diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py
index d36ddce865e..f0836edbb26 100644
--- a/geo-replication/syncdaemon/gsyncdstatus.py
+++ b/geo-replication/syncdaemon/gsyncdstatus.py
@@ -17,6 +17,8 @@ import json
import time
from datetime import datetime
from errno import EACCES, EAGAIN, ENOENT
+from syncdutils import EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event
+from syncdutils import EVENT_GEOREP_CHECKPOINT_COMPLETED
DEFAULT_STATUS = "N/A"
MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped")
@@ -115,7 +117,12 @@ def set_monitor_status(status_file, status):
class GeorepStatus(object):
- def __init__(self, monitor_status_file, brick, monitor_pid_file=None):
+ def __init__(self, monitor_status_file, brick, master, slave,
+ monitor_pid_file=None):
+ self.master = master
+ slv_data = slave.split("::")
+ self.slave_host = slv_data[0]
+ self.slave_volume = slv_data[1].split(":")[0] # Remove Slave UUID
self.work_dir = os.path.dirname(monitor_status_file)
self.monitor_status_file = monitor_status_file
self.filename = os.path.join(self.work_dir,
@@ -138,6 +145,10 @@ class GeorepStatus(object):
data = self.default_values
data = mergerfunc(data)
+ # If Data is not changed by merger func
+ if not data:
+ return False
+
with tempfile.NamedTemporaryFile(
'w',
dir=os.path.dirname(self.filename),
@@ -150,6 +161,7 @@ class GeorepStatus(object):
os.O_DIRECTORY)
os.fsync(dirfd)
os.close(dirfd)
+ return True
def reset_on_worker_start(self):
def merger(data):
@@ -164,10 +176,24 @@ class GeorepStatus(object):
def set_field(self, key, value):
def merger(data):
+ # Current data and prev data is same
+ if data[key] == value:
+ return {}
+
data[key] = value
return json.dumps(data)
- self._update(merger)
+ return self._update(merger)
+
+ def trigger_gf_event_checkpoint_completion(self, checkpoint_time,
+ checkpoint_completion_time):
+ gf_event(EVENT_GEOREP_CHECKPOINT_COMPLETED,
+ master_volume=self.master,
+ slave_host=self.slave_host,
+ slave_volume=self.slave_volume,
+ brick_path=self.brick,
+ checkpoint_time=checkpoint_time,
+ checkpoint_completion_time=checkpoint_completion_time)
def set_last_synced(self, value, checkpoint_time):
def merger(data):
@@ -185,9 +211,13 @@ class GeorepStatus(object):
# previously then update the checkpoint completed time
if checkpoint_time > 0 and checkpoint_time <= value[0]:
if data["checkpoint_completed"] == "No":
+ curr_time = int(time.time())
data["checkpoint_time"] = checkpoint_time
- data["checkpoint_completion_time"] = int(time.time())
+ data["checkpoint_completion_time"] = curr_time
data["checkpoint_completed"] = "Yes"
+ self.trigger_gf_event_checkpoint_completion(
+ checkpoint_time, curr_time)
+
return json.dumps(data)
self._update(merger)
@@ -222,10 +252,20 @@ class GeorepStatus(object):
self._update(merger)
def set_active(self):
- self.set_field("worker_status", "Active")
+ if self.set_field("worker_status", "Active"):
+ gf_event(EVENT_GEOREP_ACTIVE,
+ master_volume=self.master,
+ slave_host=self.slave_host,
+ slave_volume=self.slave_volume,
+ brick_path=self.brick)
def set_passive(self):
- self.set_field("worker_status", "Passive")
+ if self.set_field("worker_status", "Passive"):
+ gf_event(EVENT_GEOREP_PASSIVE,
+ master_volume=self.master,
+ slave_host=self.slave_host,
+ slave_volume=self.slave_volume,
+ brick_path=self.brick)
def get_monitor_status(self):
data = ""
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index 22cd1cc3a86..7eddd26d5ea 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -212,7 +212,10 @@ class Monitor(object):
"""
if not self.status.get(w[0]['dir'], None):
self.status[w[0]['dir']] = GeorepStatus(gconf.state_file,
- w[0]['dir'])
+ w[0]['dir'],
+ master,
+ "%s::%s" % (slave_host,
+ slave_vol))
set_monitor_status(gconf.state_file, self.ST_STARTED)
self.status[w[0]['dir']].set_worker_status(self.ST_INIT)
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 9dd8988dc6d..0b756b750e7 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -40,7 +40,7 @@ from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
from syncdutils import get_changelog_log_level
from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
from gsyncdstatus import GeorepStatus
-
+from syncdutils import get_master_and_slave_data_from_args
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
HostRX = re.compile('[a-zA-Z\d](?:[a-zA-Z\d.-]*[a-zA-Z\d])?', re.I)
@@ -1541,7 +1541,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
changelog_register_failed = False
(inf, ouf, ra, wa) = gconf.rpc_fd.split(',')
changelog_agent = RepceClient(int(inf), int(ouf))
- status = GeorepStatus(gconf.state_file, gconf.local_path)
+ master_name, slave_data = get_master_and_slave_data_from_args(
+ sys.argv)
+ status = GeorepStatus(gconf.state_file, gconf.local_path,
+ master_name, slave_data)
status.reset_on_worker_start()
rv = changelog_agent.version()
if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION:
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 5dabbeaccef..5b926e0c271 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -28,10 +28,17 @@ sys.path.insert(1, GLUSTERFS_LIBEXECDIR)
EVENTS_ENABLED = True
try:
from events.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY
+ from events.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE
+ from events.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE
+ from events.eventtypes import GEOREP_CHECKPOINT_COMPLETED \
+ as EVENT_GEOREP_CHECKPOINT_COMPLETED
except ImportError:
# Events APIs not installed, dummy eventtypes with None
EVENTS_ENABLED = False
EVENT_GEOREP_FAULTY = None
+ EVENT_GEOREP_ACTIVE = None
+ EVENT_GEOREP_PASSIVE = None
+ EVENT_GEOREP_CHECKPOINT_COMPLETED = None
try:
from cPickle import PickleError
@@ -542,3 +549,15 @@ class GlusterLogLevel(object):
def get_changelog_log_level(lvl):
return getattr(GlusterLogLevel, lvl, GlusterLogLevel.INFO)
+
+
+def get_master_and_slave_data_from_args(args):
+ master_name = None
+ slave_data = None
+ for arg in args:
+ if arg.startswith(":"):
+ master_name = arg.replace(":", "")
+ if "::" in arg:
+ slave_data = arg.replace("ssh://", "")
+
+ return (master_name, slave_data)