diff options
| author | Aravinda VK <avishwan@redhat.com> | 2016-10-12 23:32:39 +0530 | 
|---|---|---|
| committer | Aravinda VK <avishwan@redhat.com> | 2016-10-18 03:56:03 -0700 | 
| commit | 3830b48b6a46854d6597a36b6f2089ac1e486eb5 (patch) | |
| tree | 0c7372f895febe9e4032c125d7b62dc390b56427 | |
| parent | 727f2ebc84b6a3e6db6223ed8f5584d5cbbddbf6 (diff) | |
geo-rep/eventsapi: Additional Events
Added following events
EVENT_GEOREP_ACTIVE
    {
        "nodeid": NODEID,
        "ts": TIMESTAMP,
        "event": "GEOREP_ACTIVE",
        "message": {
            "master_volume": MASTER_VOLUME_NAME,
            "slave_host": SLAVE_HOST,
            "slave_volume": SLAVE_VOLUME,
            "brick_path": BRICK_PATH
        }
    }
EVENT_GEOREP_PASSIVE
    {
        "nodeid": NODEID,
        "ts": TIMESTAMP,
        "event": "GEOREP_PASSIVE",
        "message": {
            "master_volume": MASTER_VOLUME_NAME,
            "slave_host": SLAVE_HOST,
            "slave_volume": SLAVE_VOLUME,
            "brick_path": BRICK_PATH
        }
    }
EVENT_GEOREP_CHECKPOINT_COMPLETED
    {
        "nodeid": NODEID,
        "ts": TIMESTAMP,
        "event": "GEOREP_ACTIVE",
        "message": {
            "master_volume": MASTER_VOLUME_NAME,
            "slave_host": SLAVE_HOST,
            "slave_volume": SLAVE_VOLUME,
            "brick_path": BRICK_PATH,
            "checkpoint_time": CHECKPOINT_TIME,
            "checkpoint_completion_time": CHECKPOINT_COMPLETION_TIME
        }
    }
BUG: 1379330
Change-Id: I90716175868c59dd65c8d202e73e0ede90347b6a
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/15630
Smoke: Gluster Build System <jenkins@build.gluster.org>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
Reviewed-by: Kotresh HR <khiremat@redhat.com>
Tested-by: Kotresh HR <khiremat@redhat.com>
| -rw-r--r-- | events/eventskeygen.py | 4 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 4 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/gsyncdstatus.py | 50 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/monitor.py | 5 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 7 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 19 | 
6 files changed, 81 insertions, 8 deletions
diff --git a/events/eventskeygen.py b/events/eventskeygen.py index 57c66ce6e18..d7354b278f2 100644 --- a/events/eventskeygen.py +++ b/events/eventskeygen.py @@ -128,6 +128,10 @@ keys = (      "EVENT_EC_MIN_BRICKS_UP",      #georep async events      "EVENT_GEOREP_FAULTY", +    "EVENT_GEOREP_CHECKPOINT_COMPLETED", +    "EVENT_GEOREP_ACTIVE", +    "EVENT_GEOREP_PASSIVE", +      #quota async events      "EVENT_QUOTA_CROSSED_SOFT_LIMIT",      #bitrot async events diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index ee2a9b334d3..3718ba83141 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -39,6 +39,7 @@ from changelogagent import agent, Changelog  from gsyncdstatus import set_monitor_status, GeorepStatus  from libcxattr import Xattr  import struct +from syncdutils import get_master_and_slave_data_from_args  ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError @@ -698,8 +699,11 @@ def main_i():      status_get = rconf.get('status_get')      if status_get: +        master_name, slave_data = get_master_and_slave_data_from_args(args)          for brick in gconf.path:              brick_status = GeorepStatus(gconf.state_file, brick, +                                        master_name, +                                        slave_data,                                          getattr(gconf, "pid_file", None))              checkpoint_time = int(getattr(gconf, "checkpoint", "0"))              brick_status.print_status(checkpoint_time=checkpoint_time) diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py index d36ddce865e..f0836edbb26 100644 --- a/geo-replication/syncdaemon/gsyncdstatus.py +++ b/geo-replication/syncdaemon/gsyncdstatus.py @@ -17,6 +17,8 @@ import json  import time  from datetime import datetime  from errno import EACCES, EAGAIN, ENOENT +from syncdutils import EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event +from syncdutils import EVENT_GEOREP_CHECKPOINT_COMPLETED  DEFAULT_STATUS = "N/A"  MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped") @@ -115,7 +117,12 @@ def set_monitor_status(status_file, status):  class GeorepStatus(object): -    def __init__(self, monitor_status_file, brick, monitor_pid_file=None): +    def __init__(self, monitor_status_file, brick, master, slave, +                 monitor_pid_file=None): +        self.master = master +        slv_data = slave.split("::") +        self.slave_host = slv_data[0] +        self.slave_volume = slv_data[1].split(":")[0]  # Remove Slave UUID          self.work_dir = os.path.dirname(monitor_status_file)          self.monitor_status_file = monitor_status_file          self.filename = os.path.join(self.work_dir, @@ -138,6 +145,10 @@ class GeorepStatus(object):                  data = self.default_values              data = mergerfunc(data) +            # If Data is not changed by merger func +            if not data: +                return False +              with tempfile.NamedTemporaryFile(                      'w',                      dir=os.path.dirname(self.filename), @@ -150,6 +161,7 @@ class GeorepStatus(object):                              os.O_DIRECTORY)              os.fsync(dirfd)              os.close(dirfd) +            return True      def reset_on_worker_start(self):          def merger(data): @@ -164,10 +176,24 @@ class GeorepStatus(object):      def set_field(self, key, value):          def merger(data): +            # Current data and prev data is same +            if data[key] == value: +                return {} +              data[key] = value              return json.dumps(data) -        self._update(merger) +        return self._update(merger) + +    def trigger_gf_event_checkpoint_completion(self, checkpoint_time, +                                               checkpoint_completion_time): +        gf_event(EVENT_GEOREP_CHECKPOINT_COMPLETED, +                 master_volume=self.master, +                 slave_host=self.slave_host, +                 slave_volume=self.slave_volume, +                 brick_path=self.brick, +                 checkpoint_time=checkpoint_time, +                 checkpoint_completion_time=checkpoint_completion_time)      def set_last_synced(self, value, checkpoint_time):          def merger(data): @@ -185,9 +211,13 @@ class GeorepStatus(object):              # previously then update the checkpoint completed time              if checkpoint_time > 0 and checkpoint_time <= value[0]:                  if data["checkpoint_completed"] == "No": +                    curr_time = int(time.time())                      data["checkpoint_time"] = checkpoint_time -                    data["checkpoint_completion_time"] = int(time.time()) +                    data["checkpoint_completion_time"] = curr_time                      data["checkpoint_completed"] = "Yes" +                    self.trigger_gf_event_checkpoint_completion( +                        checkpoint_time, curr_time) +              return json.dumps(data)          self._update(merger) @@ -222,10 +252,20 @@ class GeorepStatus(object):          self._update(merger)      def set_active(self): -        self.set_field("worker_status", "Active") +        if self.set_field("worker_status", "Active"): +            gf_event(EVENT_GEOREP_ACTIVE, +                     master_volume=self.master, +                     slave_host=self.slave_host, +                     slave_volume=self.slave_volume, +                     brick_path=self.brick)      def set_passive(self): -        self.set_field("worker_status", "Passive") +        if self.set_field("worker_status", "Passive"): +            gf_event(EVENT_GEOREP_PASSIVE, +                     master_volume=self.master, +                     slave_host=self.slave_host, +                     slave_volume=self.slave_volume, +                     brick_path=self.brick)      def get_monitor_status(self):          data = "" diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 22cd1cc3a86..7eddd26d5ea 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -212,7 +212,10 @@ class Monitor(object):          """          if not self.status.get(w[0]['dir'], None):              self.status[w[0]['dir']] = GeorepStatus(gconf.state_file, -                                                    w[0]['dir']) +                                                    w[0]['dir'], +                                                    master, +                                                    "%s::%s" % (slave_host, +                                                                slave_vol))          set_monitor_status(gconf.state_file, self.ST_STARTED)          self.status[w[0]['dir']].set_worker_status(self.ST_INIT) diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 9dd8988dc6d..0b756b750e7 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -40,7 +40,7 @@ from syncdutils import ChangelogException, ChangelogHistoryNotAvailable  from syncdutils import get_changelog_log_level  from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION  from gsyncdstatus import GeorepStatus - +from syncdutils import get_master_and_slave_data_from_args  UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')  HostRX = re.compile('[a-zA-Z\d](?:[a-zA-Z\d.-]*[a-zA-Z\d])?', re.I) @@ -1541,7 +1541,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):              changelog_register_failed = False              (inf, ouf, ra, wa) = gconf.rpc_fd.split(',')              changelog_agent = RepceClient(int(inf), int(ouf)) -            status = GeorepStatus(gconf.state_file, gconf.local_path) +            master_name, slave_data = get_master_and_slave_data_from_args( +                sys.argv) +            status = GeorepStatus(gconf.state_file, gconf.local_path, +                                  master_name, slave_data)              status.reset_on_worker_start()              rv = changelog_agent.version()              if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION: diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 5dabbeaccef..5b926e0c271 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -28,10 +28,17 @@ sys.path.insert(1, GLUSTERFS_LIBEXECDIR)  EVENTS_ENABLED = True  try:      from events.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY +    from events.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE +    from events.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE +    from events.eventtypes import GEOREP_CHECKPOINT_COMPLETED \ +        as EVENT_GEOREP_CHECKPOINT_COMPLETED  except ImportError:      # Events APIs not installed, dummy eventtypes with None      EVENTS_ENABLED = False      EVENT_GEOREP_FAULTY = None +    EVENT_GEOREP_ACTIVE = None +    EVENT_GEOREP_PASSIVE = None +    EVENT_GEOREP_CHECKPOINT_COMPLETED = None  try:      from cPickle import PickleError @@ -542,3 +549,15 @@ class GlusterLogLevel(object):  def get_changelog_log_level(lvl):      return getattr(GlusterLogLevel, lvl, GlusterLogLevel.INFO) + + +def get_master_and_slave_data_from_args(args): +    master_name = None +    slave_data = None +    for arg in args: +        if arg.startswith(":"): +            master_name = arg.replace(":", "") +        if "::" in arg: +            slave_data = arg.replace("ssh://", "") + +    return (master_name, slave_data)  | 
