| author | Aravinda VK <avishwan@redhat.com> | 2015-03-12 16:07:13 +0530 | 
|---|---|---|
| committer | Vijay Bellur <vbellur@redhat.com> | 2015-05-06 09:12:35 -0700 | 
| commit | 6ebaa045ae8b2523e91c087cffae35cc4ec682c2 (patch) | |
| tree | 17754ef211780e4a1af52c3dbe9adf84fb643f4e /geo-replication | |
| parent | cfe717c3326ddb51b754bbc6370bac99dd02ec8c (diff) | |
geo-rep: Status Enhancements
Discussion in gluster-devel:
http://www.gluster.org/pipermail/gluster-devel/2015-April/044301.html
MASTER NODE - Master Volume Node
MASTER VOL - Master Volume name
MASTER BRICK - Master Volume Brick
SLAVE USER - Slave User to which the Geo-rep session is established
SLAVE - <SLAVE_NODE>::<SLAVE_VOL> used in the Geo-rep Create command
SLAVE NODE - Slave Node to which the Master worker is connected
STATUS - Worker Status (Created, Initializing, Active, Passive, Faulty,
         Paused, Stopped)
CRAWL STATUS - Crawl type (Hybrid Crawl, History Crawl, Changelog Crawl)
LAST_SYNCED - Last Synced Time (Local Time in CLI output and UTC in XML output)
ENTRY - Number of Entry operations pending (resets on worker restart)
DATA - Number of Data operations pending (resets on worker restart)
META - Number of Meta operations pending (resets on worker restart)
FAILURES - Number of Failures
CHECKPOINT TIME - Checkpoint set Time (Local Time in CLI output and UTC
                  in XML output)
CHECKPOINT COMPLETED - Yes/No or N/A
CHECKPOINT COMPLETION TIME - Checkpoint Completed Time (Local Time in CLI
                             output and UTC in XML output)
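
Each brick worker persists these fields in a small per-brick JSON file managed by the new gsyncdstatus.py module in this patch. A minimal sketch of such a record, mirroring get_default_values() from the patch (the concrete values shown are invented for illustration):

```python
# Illustrative per-brick status record, mirroring get_default_values()
# in gsyncdstatus.py. The concrete values are made up for the example.
import json

brick_status = {
    "slave_node": "N/A",              # filled in once the worker connects
    "worker_status": "Active",        # Initializing.../Active/Passive/Faulty
    "crawl_status": "Changelog Crawl",
    "last_synced": 1429174398,        # epoch seconds; shown as local time in
                                      # CLI output and UTC in XML output
    "entry": 0,                       # pending entry ops (reset on restart)
    "data": 0,                        # pending data ops (reset on restart)
    "meta": 0,                        # pending meta ops (reset on restart)
    "failures": 0,
    "checkpoint_completed": "No",
    "checkpoint_time": 0,
    "checkpoint_completion_time": 0,
}

print(json.dumps(brick_status, indent=2))
```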
XML output:

```xml
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cliOutput>
  <geoRep>
      <volume>
        <name>
        <sessions>
          <session>
             <session_slave>
             <pair>
                <master_node>
                <master_brick>
                <slave_user>
                <slave/>
                <slave_node>
                <status>
                <crawl_status>
                <entry>
                <data>
                <meta>
                <failures>
                <checkpoint_completed>
                <master_node_uuid>
                <last_synced>
                <checkpoint_time>
                <checkpoint_completion_time>
```
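
Since the element names are fixed by the layout above, a consumer can walk the XML output with the standard library alone. A rough sketch follows; the sample document and its values are assumptions for illustration, not output captured from the CLI:

```python
# Walks a status document shaped like the skeleton above and prints one
# line per master brick / worker pair. The sample XML is illustrative.
import xml.etree.ElementTree as ET

SAMPLE = """
<cliOutput>
  <geoRep>
    <volume>
      <name>gv0</name>
      <sessions>
        <session>
          <pair>
            <master_node>node1</master_node>
            <master_brick>/bricks/b1</master_brick>
            <slave_node>snode1</slave_node>
            <status>Active</status>
            <crawl_status>Changelog Crawl</crawl_status>
            <last_synced>2015-04-16 08:53:18</last_synced>
          </pair>
        </session>
      </sessions>
    </volume>
  </geoRep>
</cliOutput>
"""

root = ET.fromstring(SAMPLE)
for pair in root.iter("pair"):
    print("%s:%s %s (%s, last synced %s)" % (
        pair.findtext("master_node"),
        pair.findtext("master_brick"),
        pair.findtext("status"),
        pair.findtext("crawl_status"),
        pair.findtext("last_synced")))
```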
BUG: 1218586
Change-Id: I944a6c3c67f1e6d6baf9670b474233bec8f61ea3
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/10121
Reviewed-by: Kotresh HR <khiremat@redhat.com>
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Reviewed-on: http://review.gluster.org/10574
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Diffstat (limited to 'geo-replication')
| -rw-r--r-- | geo-replication/syncdaemon/Makefile.am | 3 |
| -rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 25 |
| -rw-r--r-- | geo-replication/syncdaemon/gsyncdstatus.py | 317 |
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 374 |
| -rw-r--r-- | geo-replication/syncdaemon/monitor.py | 58 |
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 12 |
| -rw-r--r-- | geo-replication/tests/unit/test_gsyncdstatus.py | 193 |
7 files changed, 608 insertions, 374 deletions
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am index 885963eae2b..ed0f5e40924 100644 --- a/geo-replication/syncdaemon/Makefile.am +++ b/geo-replication/syncdaemon/Makefile.am @@ -2,6 +2,7 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon  syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \  	resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \ -	$(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py +	$(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py \ +	gsyncdstatus.py  CLEANFILES = diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index b9ee5aec8c7..32e4eb7828d 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -27,12 +27,13 @@ from ipaddr import IPAddress, IPNetwork  from gconf import gconf  from syncdutils import FreeObject, norm, grabpidfile, finalize -from syncdutils import log_raise_exception, privileged, update_file +from syncdutils import log_raise_exception, privileged  from syncdutils import GsyncdError, select, set_term_handler  from configinterface import GConffile, upgrade_config_file  import resource  from monitor import monitor  from changelogagent import agent, Changelog +from gsyncdstatus import set_monitor_status, GeorepStatus  class GLogger(Logger): @@ -267,7 +268,7 @@ def main_i():      op.add_option('--socketdir', metavar='DIR')      op.add_option('--state-socket-unencoded', metavar='SOCKF',                    type=str, action='callback', callback=store_abs) -    op.add_option('--checkpoint', metavar='LABEL', default='') +    op.add_option('--checkpoint', metavar='LABEL', default='0')      # tunables for failover/failback mechanism:      # None   - gsyncd behaves as normal @@ -315,6 +316,8 @@ def main_i():                    action='callback', callback=store_local)      op.add_option('--delete', dest='delete', action='callback',                    callback=store_local_curry(True)) +    op.add_option('--status-get', dest='status_get', action='callback', +                  callback=store_local_curry(True))      op.add_option('--debug', dest="go_daemon", action='callback',                    callback=lambda *a: (store_local_curry('dont')(*a),                                         setattr( @@ -583,15 +586,8 @@ def main_i():              GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf')              if confdata.op == 'set':                  logging.info('checkpoint %s set' % confdata.val) -                gcnf.delete('checkpoint_completed') -                gcnf.delete('checkpoint_target')              elif confdata.op == 'del':                  logging.info('checkpoint info was reset') -                # if it is removing 'checkpoint' then we need -                # to remove 'checkpoint_completed' and 'checkpoint_target' too -                gcnf.delete('checkpoint_completed') -                gcnf.delete('checkpoint_target') -          except IOError:              if sys.exc_info()[1].errno == ENOENT:                  # directory of log path is not present, @@ -607,7 +603,7 @@ def main_i():      create = rconf.get('create')      if create:          if getattr(gconf, 'state_file', None): -            update_file(gconf.state_file, lambda f: f.write(create + '\n')) +            set_monitor_status(gconf.state_file, create)          return      go_daemon = rconf['go_daemon'] @@ -615,6 +611,15 @@ def main_i():      
be_agent = rconf.get('agent')      rscs, local, remote = makersc(args) + +    status_get = rconf.get('status_get') +    if status_get: +        for brick in gconf.path: +            brick_status = GeorepStatus(gconf.state_file, brick) +            checkpoint_time = int(getattr(gconf, "checkpoint", "0")) +            brick_status.print_status(checkpoint_time=checkpoint_time) +        return +      if not be_monitor and isinstance(remote, resource.SSH) and \         go_daemon == 'should':          go_daemon = 'postconn' diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py new file mode 100644 index 00000000000..a49b9c23dea --- /dev/null +++ b/geo-replication/syncdaemon/gsyncdstatus.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import fcntl +import os +import tempfile +import urllib +import json +import time +from datetime import datetime + +DEFAULT_STATUS = "N/A" +MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped") +STATUS_VALUES = (DEFAULT_STATUS, +                 "Initializing...", +                 "Active", +                 "Passive", +                 "Faulty") + +CRAWL_STATUS_VALUES = (DEFAULT_STATUS, +                       "Hybrid Crawl", +                       "History Crawl", +                       "Changelog Crawl") + + +def human_time(ts): +    try: +        return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S") +    except ValueError: +        return DEFAULT_STATUS + + +def human_time_utc(ts): +    try: +        return datetime.utcfromtimestamp( +            float(ts)).strftime("%Y-%m-%d %H:%M:%S") +    except ValueError: +        return DEFAULT_STATUS + + +def get_default_values(): +    return { +        "slave_node": DEFAULT_STATUS, +        "worker_status": DEFAULT_STATUS, +        "last_synced": 0, +        "crawl_status": DEFAULT_STATUS, +        "entry": 0, +        "data": 0, +        "meta": 0, +        "failures": 0, +        "checkpoint_completed": DEFAULT_STATUS, +        "checkpoint_time": 0, +        "checkpoint_completion_time": 0} + + +class LockedOpen(object): + +    def __init__(self, filename, *args, **kwargs): +        self.filename = filename +        self.open_args = args +        self.open_kwargs = kwargs +        self.fileobj = None + +    def __enter__(self): +        """ +        If two processes compete to update a file, The first process +        gets the lock and the second process is blocked in the fcntl.flock() +        call. When first process replaces the file and releases the lock, +        the already open file descriptor in the second process now points +        to a  "ghost" file(not reachable by any path name) with old contents. +        To avoid that conflict, check the fd already opened is same or +        not. 
Open new one if not same +        """ +        f = open(self.filename, *self.open_args, **self.open_kwargs) +        while True: +            fcntl.flock(f, fcntl.LOCK_EX) +            fnew = open(self.filename, *self.open_args, **self.open_kwargs) +            if os.path.sameopenfile(f.fileno(), fnew.fileno()): +                fnew.close() +                break +            else: +                f.close() +                f = fnew +        self.fileobj = f +        return f + +    def __exit__(self, _exc_type, _exc_value, _traceback): +        self.fileobj.close() + + +def set_monitor_status(status_file, status): +    fd = os.open(status_file, os.O_CREAT | os.O_RDWR) +    os.close(fd) +    with LockedOpen(status_file, 'r+'): +        with tempfile.NamedTemporaryFile('w', dir=os.path.dirname(status_file), +                                         delete=False) as tf: +            tf.write(status) +            tempname = tf.name + +        os.rename(tempname, status_file) +        dirfd = os.open(os.path.dirname(os.path.abspath(status_file)), +                        os.O_DIRECTORY) +        os.fsync(dirfd) +        os.close(dirfd) + + +class GeorepStatus(object): +    def __init__(self, monitor_status_file, brick): +        self.work_dir = os.path.dirname(monitor_status_file) +        self.monitor_status_file = monitor_status_file +        self.filename = os.path.join(self.work_dir, +                                     "brick_%s.status" +                                     % urllib.quote_plus(brick)) + +        fd = os.open(self.filename, os.O_CREAT | os.O_RDWR) +        os.close(fd) +        fd = os.open(self.monitor_status_file, os.O_CREAT | os.O_RDWR) +        os.close(fd) +        self.brick = brick +        self.default_values = get_default_values() + +    def _update(self, mergerfunc): +        with LockedOpen(self.filename, 'r+') as f: +            try: +                data = json.load(f) +            except ValueError: +                data = self.default_values + +            data = mergerfunc(data) +            with tempfile.NamedTemporaryFile( +                    'w', +                    dir=os.path.dirname(self.filename), +                    delete=False) as tf: +                tf.write(data) +                tempname = tf.name + +            os.rename(tempname, self.filename) +            dirfd = os.open(os.path.dirname(os.path.abspath(self.filename)), +                            os.O_DIRECTORY) +            os.fsync(dirfd) +            os.close(dirfd) + +    def reset_on_worker_start(self): +        def merger(data): +            data["slave_node"] = DEFAULT_STATUS +            data["crawl_status"] = DEFAULT_STATUS +            data["entry"] = 0 +            data["data"] = 0 +            data["meta"] = 0 +            return json.dumps(data) + +        self._update(merger) + +    def set_field(self, key, value): +        def merger(data): +            data[key] = value +            return json.dumps(data) + +        self._update(merger) + +    def set_last_synced(self, value, checkpoint_time): +        def merger(data): +            data["last_synced"] = value[0] + +            # If checkpoint is not set or reset +            # or if last set checkpoint is changed +            if checkpoint_time == 0 or \ +               checkpoint_time != data["checkpoint_time"]: +                data["checkpoint_time"] = 0 +                data["checkpoint_completion_time"] = 0 +                data["checkpoint_completed"] = "No" + +            # If checkpoint is completed and not 
marked as completed +            # previously then update the checkpoint completed time +            if checkpoint_time > 0 and checkpoint_time <= value[0]: +                if data["checkpoint_completed"] == "No": +                    data["checkpoint_time"] = checkpoint_time +                    data["checkpoint_completion_time"] = int(time.time()) +                    data["checkpoint_completed"] = "Yes" +            return json.dumps(data) + +        self._update(merger) + +    def set_worker_status(self, status): +        self.set_field("worker_status", status) + +    def set_worker_crawl_status(self, status): +        self.set_field("crawl_status", status) + +    def set_slave_node(self, slave_node): +        def merger(data): +            data["slave_node"] = slave_node +            return json.dumps(data) + +        self._update(merger) + +    def inc_value(self, key, value): +        def merger(data): +            data[key] = data.get(key, 0) + value +            return json.dumps(data) + +        self._update(merger) + +    def dec_value(self, key, value): +        def merger(data): +            data[key] = data.get(key, 0) - value +            if data[key] < 0: +                data[key] = 0 +            return json.dumps(data) + +        self._update(merger) + +    def set_active(self): +        self.set_field("worker_status", "Active") + +    def set_passive(self): +        self.set_field("worker_status", "Passive") + +    def get_monitor_status(self): +        data = "" +        with open(self.monitor_status_file, "r") as f: +            data = f.read().strip() +        return data + +    def get_status(self, checkpoint_time=0): +        """ +        Monitor Status --->        Created    Started  Paused      Stopped +        ---------------------------------------------------------------------- +        slave_node                 N/A        VALUE    VALUE       N/A +        status                     Created    VALUE    Paused      Stopped +        last_synced                N/A        VALUE    VALUE       VALUE +        crawl_status               N/A        VALUE    N/A         N/A +        entry                      N/A        VALUE    N/A         N/A +        data                       N/A        VALUE    N/A         N/A +        meta                       N/A        VALUE    N/A         N/A +        failures                   N/A        VALUE    VALUE       VALUE +        checkpoint_completed       N/A        VALUE    VALUE       VALUE +        checkpoint_time            N/A        VALUE    VALUE       VALUE +        checkpoint_completed_time  N/A        VALUE    VALUE       VALUE +        """ +        data = self.default_values +        with open(self.filename) as f: +            try: +                data.update(json.load(f)) +            except ValueError: +                pass +        monitor_status = self.get_monitor_status() + +        if monitor_status in ["Created", "Paused", "Stopped"]: +            data["worker_status"] = monitor_status + +        # Checkpoint adjustments +        if checkpoint_time == 0: +            data["checkpoint_completed"] = DEFAULT_STATUS +            data["checkpoint_time"] = DEFAULT_STATUS +            data["checkpoint_completion_time"] = DEFAULT_STATUS +        else: +            if checkpoint_time != data["checkpoint_time"]: +                if checkpoint_time <= data["last_synced"]: +                    data["checkpoint_completed"] = "Yes" +                    data["checkpoint_time"] = checkpoint_time +                    
data["checkpoint_completion_time"] = data["last_synced"] +                else: +                    data["checkpoint_completed"] = "No" +                    data["checkpoint_time"] = checkpoint_time +                    data["checkpoint_completion_time"] = DEFAULT_STATUS + +        if data["checkpoint_time"] not in [0, DEFAULT_STATUS]: +            chkpt_time = data["checkpoint_time"] +            data["checkpoint_time"] = human_time(chkpt_time) +            data["checkpoint_time_utc"] = human_time_utc(chkpt_time) + +        if data["checkpoint_completion_time"] not in [0, DEFAULT_STATUS]: +            chkpt_completion_time = data["checkpoint_completion_time"] +            data["checkpoint_completion_time"] = human_time( +                chkpt_completion_time) +            data["checkpoint_completion_time_utc"] = human_time_utc( +                chkpt_completion_time) + +        if data["last_synced"] == 0: +            data["last_synced"] = DEFAULT_STATUS +            data["last_synced_utc"] = DEFAULT_STATUS +        else: +            last_synced = data["last_synced"] +            data["last_synced"] = human_time(last_synced) +            data["last_synced_utc"] = human_time_utc(last_synced) + +        if data["worker_status"] != "Active": +            data["last_synced"] = DEFAULT_STATUS +            data["last_synced_utc"] = DEFAULT_STATUS +            data["crawl_status"] = DEFAULT_STATUS +            data["entry"] = DEFAULT_STATUS +            data["data"] = DEFAULT_STATUS +            data["meta"] = DEFAULT_STATUS +            data["failures"] = DEFAULT_STATUS +            data["checkpoint_completed"] = DEFAULT_STATUS +            data["checkpoint_time"] = DEFAULT_STATUS +            data["checkpoint_completed_time"] = DEFAULT_STATUS +            data["checkpoint_time_utc"] = DEFAULT_STATUS +            data["checkpoint_completion_time_utc"] = DEFAULT_STATUS + +        if data["worker_status"] not in ["Active", "Passive"]: +            data["slave_node"] = DEFAULT_STATUS + +        return data + +    def print_status(self, checkpoint_time=0): +        for key, value in self.get_status(checkpoint_time).items(): +            print ("%s: %s" % (key, value)) diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index e34def6f6ab..cd20a490397 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -15,17 +15,15 @@ import stat  import json  import logging  import fcntl -import socket  import string  import errno  import tarfile -from errno import ENOENT, ENODATA, EPIPE, EEXIST, EACCES, EAGAIN +from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN  from threading import Condition, Lock  from datetime import datetime  from gconf import gconf -from tempfile import NamedTemporaryFile  from syncdutils import Thread, GsyncdError, boolify, escape -from syncdutils import unescape, select, gauxpfx, md5hex, selfkill +from syncdutils import unescape, gauxpfx, md5hex, selfkill  from syncdutils import lstat, errno_wrap  from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable @@ -389,18 +387,6 @@ class GMasterCommon(object):                      raise          return default_data -    def update_crawl_data(self): -        if getattr(gconf, 'state_detail_file', None): -            try: -                same_dir = os.path.dirname(gconf.state_detail_file) -                with NamedTemporaryFile(dir=same_dir, delete=False) as tmp: -                    json.dump(self.total_crawl_stats, tmp) -                    
tmp.flush() -                    os.fsync(tmp.fileno()) -                    os.rename(tmp.name, gconf.state_detail_file) -            except (IOError, OSError): -                raise -      def __init__(self, master, slave):          self.master = master          self.slave = slave @@ -426,14 +412,12 @@ class GMasterCommon(object):          self.total_turns = int(gconf.turns)          self.crawl_start = datetime.now()          self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0} -        self.total_crawl_stats = None          self.start = None          self.change_seen = None          # the actual volinfo we make use of          self.volinfo = None          self.terminate = False          self.sleep_interval = 1 -        self.checkpoint_thread = None          self.current_files_skipped_count = 0          self.skipped_gfid_list = []          self.unlinked_gfids = [] @@ -485,7 +469,6 @@ class GMasterCommon(object):          logging.debug("Got the lock")          return True -      def should_crawl(self):          if not gconf.use_meta_volume:              return gconf.glusterd_uuid in self.master.server.node_uuid() @@ -495,7 +478,6 @@ class GMasterCommon(object):              sys.exit(1)          return self.mgmt_lock() -      def register(self):          self.register() @@ -534,10 +516,8 @@ class GMasterCommon(object):              if self.volinfo['retval']:                  logging.warn("master cluster's info may not be valid %d" %                               self.volinfo['retval']) -            self.start_checkpoint_thread()          else:              raise GsyncdError("master volinfo unavailable") -        self.total_crawl_stats = self.get_initial_crawl_data()          self.lastreport['time'] = time.time()          logging.info('crawl interval: %d seconds' % self.sleep_interval) @@ -562,7 +542,7 @@ class GMasterCommon(object):                  t0 = t1              self.update_worker_remote_node()              if not crawl: -                self.update_worker_health("Passive") +                self.status.set_passive()                  # bring up _this_ brick to the cluster stime                  # which is min of cluster (but max of the replicas)                  brick_stime = self.xtime('.', self.slave) @@ -589,35 +569,14 @@ class GMasterCommon(object):                  time.sleep(5)                  continue -            self.update_worker_health("Active") + +            self.status.set_active()              self.crawl() +              if oneshot:                  return              time.sleep(self.sleep_interval) -    @classmethod -    def _checkpt_param(cls, chkpt, prm, xtimish=True): -        """use config backend to lookup a parameter belonging to -           checkpoint @chkpt""" -        cprm = gconf.configinterface.get_realtime('checkpoint_' + prm) -        if not cprm: -            return -        chkpt_mapped, val = cprm.split(':', 1) -        if unescape(chkpt_mapped) != chkpt: -            return -        if xtimish: -            val = cls.deserialize_xtime(val) -        return val - -    @classmethod -    def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True): -        """use config backend to store a parameter associated -           with checkpoint @chkpt""" -        if xtimish: -            val = cls.serialize_xtime(val) -        gconf.configinterface.set( -            'checkpoint_' + prm, "%s:%s" % (escape(chkpt), val)) -      @staticmethod      def humantime(*tpair):          """format xtime-like (sec, nsec) pair to human readable format""" @@ -646,116 +605,6 
@@ class GMasterCommon(object):                                string.zfill(m, 2), string.zfill(s, 2))          return date -    def checkpt_service(self, chan, chkpt): -        """checkpoint service loop - -        monitor and verify checkpoint status for @chkpt, and listen -        for incoming requests for whom we serve a pretty-formatted -        status report""" -        while True: -            chkpt = gconf.configinterface.get_realtime("checkpoint") -            if not chkpt: -                gconf.configinterface.delete("checkpoint_completed") -                gconf.configinterface.delete("checkpoint_target") -                # dummy loop for the case when there is no checkpt set -                select([chan], [], []) -                conn, _ = chan.accept() -                conn.send('\0') -                conn.close() -                continue - -            checkpt_tgt = self._checkpt_param(chkpt, 'target') -            if not checkpt_tgt: -                checkpt_tgt = self.xtime('.') -                if isinstance(checkpt_tgt, int): -                    raise GsyncdError("master root directory is " -                                      "unaccessible (%s)", -                                      os.strerror(checkpt_tgt)) -                self._set_checkpt_param(chkpt, 'target', checkpt_tgt) -            logging.debug("checkpoint target %s has been determined " -                          "for checkpoint %s" % -                          (repr(checkpt_tgt), chkpt)) - -            # check if the label is 'now' -            chkpt_lbl = chkpt -            try: -                x1, x2 = chkpt.split(':') -                if x1 == 'now': -                    chkpt_lbl = "as of " + self.humantime(x2) -            except: -                pass -            completed = self._checkpt_param(chkpt, 'completed', xtimish=False) -            if completed: -                completed = tuple(int(x) for x in completed.split('.')) -            s, _, _ = select([chan], [], [], (not completed) and 5 or None) -            # either request made and we re-check to not -            # give back stale data, or we still hunting for completion -            if (self.native_xtime(checkpt_tgt) and ( -                    self.native_xtime(checkpt_tgt) < self.volmark)): -                # indexing has been reset since setting the checkpoint -                status = "is invalid" -            else: -                xtr = self.xtime('.', self.slave) -                if isinstance(xtr, int): -                    raise GsyncdError("slave root directory is " -                                      "unaccessible (%s)", -                                      os.strerror(xtr)) -                ncompleted = self.xtime_geq(xtr, checkpt_tgt) -                if completed and not ncompleted:  # stale data -                    logging.warn("completion time %s for checkpoint %s " -                                 "became stale" % -                                 (self.humantime(*completed), chkpt)) -                    completed = None -                    gconf.configinterface.delete('checkpoint_completed') -                if ncompleted and not completed:  # just reaching completion -                    completed = "%.6f" % time.time() -                    self._set_checkpt_param( -                        chkpt, 'completed', completed, xtimish=False) -                    completed = tuple(int(x) for x in completed.split('.')) -                    logging.info("checkpoint %s completed" % chkpt) -                status = 
completed and \ -                    "completed at " + self.humantime(completed[0]) or \ -                    "not reached yet" -            if s: -                conn = None -                try: -                    conn, _ = chan.accept() -                    try: -                        conn.send("checkpoint %s is %s\0" % -                                  (chkpt_lbl, status)) -                    except: -                        exc = sys.exc_info()[1] -                        if ((isinstance(exc, OSError) or isinstance( -                                exc, IOError)) and exc.errno == EPIPE): -                            logging.debug('checkpoint client disconnected') -                        else: -                            raise -                finally: -                    if conn: -                        conn.close() - -    def start_checkpoint_thread(self): -        """prepare and start checkpoint service""" -        if self.checkpoint_thread or not ( -            getattr(gconf, 'state_socket_unencoded', None) and getattr( -                gconf, 'socketdir', None) -        ): -            return -        chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) -        state_socket = os.path.join( -            gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket") -        try: -            os.unlink(state_socket) -        except: -            if sys.exc_info()[0] == OSError: -                pass -        chan.bind(state_socket) -        chan.listen(1) -        chkpt = gconf.configinterface.get_realtime("checkpoint") -        t = Thread(target=self.checkpt_service, args=(chan, chkpt)) -        t.start() -        self.checkpoint_thread = t -      def add_job(self, path, label, job, *a, **kw):          """insert @job function to job table at @path with @label"""          if self.jobtab.get(path) is None: @@ -929,11 +778,15 @@ class GMasterChangelogMixin(GMasterCommon):              files_pending['purge'] += 1          def log_failures(failures, entry_key, gfid_prefix, log_prefix): +            num_failures = 0              for failure in failures:                  st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))                  if not isinstance(st, int): +                    num_failures += 1                      logging.warn('%s FAILED: %s' % (log_prefix, repr(failure))) +            self.status.inc_value("failures", num_failures) +          for e in clist:              e = e.strip()              et = e[self.IDX_START:self.IDX_END]   # entry type @@ -1032,12 +885,18 @@ class GMasterChangelogMixin(GMasterCommon):              else:                  logging.warn('got invalid changelog type: %s' % (et))          logging.debug('entries: %s' % repr(entries)) -        if not retry: -            self.update_worker_cumilitive_status(files_pending) + +        # Increment counters for Status +        self.status.inc_value("entry", len(entries)) +        self.files_in_batch = len(datas) +        self.status.inc_value("data", self.files_in_batch) +          # sync namespace          if entries:              failures = self.slave.server.entry_ops(entries)              log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') +            self.status.dec_value("entry", len(entries)) +          # sync metadata          if meta_gfid:              meta_entries = [] @@ -1051,8 +910,11 @@ class GMasterChangelogMixin(GMasterCommon):                      continue                  meta_entries.append(edct('META', go=go[0], stat=st))              if meta_entries: +             
   self.status.inc_value("meta", len(entries))                  failures = self.slave.server.meta_ops(meta_entries)                  log_failures(failures, 'go', '', 'META') +                self.status.dec_value("meta", len(entries)) +          # sync data          if datas:              self.a_syncdata(datas) @@ -1104,9 +966,17 @@ class GMasterChangelogMixin(GMasterCommon):                  if done:                      xtl = (int(change.split('.')[-1]) - 1, 0)                      self.upd_stime(xtl) +                    chkpt_time = gconf.configinterface.get_realtime( +                        "checkpoint") +                    checkpoint_time = 0 +                    if chkpt_time is not None: +                        checkpoint_time = int(chkpt_time) + +                    self.status.set_last_synced(xtl, checkpoint_time)                      map(self.changelog_done_func, changes)                      self.archive_and_purge_changelogs(changes) -                self.update_worker_files_syncd() +                self.status.dec_value("data", self.files_in_batch) +                self.files_in_batch = 0                  break              # We do not know which changelog transfer failed, retry everything. @@ -1116,14 +986,22 @@ class GMasterChangelogMixin(GMasterCommon):                  logging.warn('changelogs %s could not be processed - '                               'moving on...' %                               ' '.join(map(os.path.basename, changes))) -                self.update_worker_total_files_skipped( -                    self.current_files_skipped_count) +                self.status.inc_value("failures", +                                      self.current_files_skipped_count)                  logging.warn('SKIPPED GFID = %s' %                               ','.join(self.skipped_gfid_list)) -                self.update_worker_files_syncd() + +                self.files_in_batch = 0                  if done:                      xtl = (int(change.split('.')[-1]) - 1, 0)                      self.upd_stime(xtl) +                    chkpt_time = gconf.configinterface.get_realtime( +                        "checkpoint") +                    checkpoint_time = 0 +                    if chkpt_time is not None: +                        checkpoint_time = int(chkpt_time) + +                    self.status.set_last_synced(xtl, checkpoint_time)                      map(self.changelog_done_func, changes)                      self.archive_and_purge_changelogs(changes)                  break @@ -1144,164 +1022,15 @@ class GMasterChangelogMixin(GMasterCommon):          if not stime == URXTIME:              self.sendmark(path, stime) -    def get_worker_status_file(self): -        file_name = gconf.local_path + '.status' -        file_name = file_name.replace("/", "_") -        worker_status_file = gconf.georep_session_working_dir + file_name -        return worker_status_file - -    def update_worker_status(self, key, value): -        default_data = {"remote_node": "N/A", -                        "worker status": "Not Started", -                        "crawl status": "N/A", -                        "files_syncd": 0, -                        "files_remaining": 0, -                        "bytes_remaining": 0, -                        "purges_remaining": 0, -                        "total_files_skipped": 0} -        worker_status_file = self.get_worker_status_file() -        try: -            with open(worker_status_file, 'r+') as f: -                loaded_data = json.load(f) -               
 loaded_data[key] = value -                os.ftruncate(f.fileno(), 0) -                os.lseek(f.fileno(), 0, os.SEEK_SET) -                json.dump(loaded_data, f) -                f.flush() -                os.fsync(f.fileno()) -        except (IOError, OSError, ValueError): -            logging.info('Creating new %s' % worker_status_file) -            try: -                with open(worker_status_file, 'wb') as f: -                    default_data[key] = value -                    json.dump(default_data, f) -                    f.flush() -                    os.fsync(f.fileno()) -            except: -                raise - -    def update_worker_cumilitive_status(self, files_pending): -        default_data = {"remote_node": "N/A", -                        "worker status": "Not Started", -                        "crawl status": "N/A", -                        "files_syncd": 0, -                        "files_remaining": 0, -                        "bytes_remaining": 0, -                        "purges_remaining": 0, -                        "total_files_skipped": 0} -        worker_status_file = self.get_worker_status_file() -        try: -            with open(worker_status_file, 'r+') as f: -                loaded_data = json.load(f) -                loaded_data['files_remaining'] = files_pending['count'] -                loaded_data['bytes_remaining'] = files_pending['bytes'] -                loaded_data['purges_remaining'] = files_pending['purge'] -                os.ftruncate(f.fileno(), 0) -                os.lseek(f.fileno(), 0, os.SEEK_SET) -                json.dump(loaded_data, f) -                f.flush() -                os.fsync(f.fileno()) -        except (IOError, OSError, ValueError): -            logging.info('Creating new %s' % worker_status_file) -            try: -                with open(worker_status_file, 'wb') as f: -                    default_data['files_remaining'] = files_pending['count'] -                    default_data['bytes_remaining'] = files_pending['bytes'] -                    default_data['purges_remaining'] = files_pending['purge'] -                    json.dump(default_data, f) -                    f.flush() -                    os.fsync(f.fileno()) -            except: -                raise -      def update_worker_remote_node(self):          node = sys.argv[-1] -        node = node.split("@")[-1] +        node_data = node.split("@") +        node = node_data[-1]          remote_node_ip = node.split(":")[0] -        remote_node_vol = node.split(":")[3] -        remote_node = remote_node_ip + '::' + remote_node_vol -        self.update_worker_status('remote_node', remote_node) - -    def update_worker_health(self, state): -        self.update_worker_status('worker status', state) - -    def update_worker_crawl_status(self, state): -        self.update_worker_status('crawl status', state) - -    def update_worker_files_syncd(self): -        default_data = {"remote_node": "N/A", -                        "worker status": "Not Started", -                        "crawl status": "N/A", -                        "files_syncd": 0, -                        "files_remaining": 0, -                        "bytes_remaining": 0, -                        "purges_remaining": 0, -                        "total_files_skipped": 0} -        worker_status_file = self.get_worker_status_file() -        try: -            with open(worker_status_file, 'r+') as f: -                loaded_data = json.load(f) -                loaded_data['files_syncd'] += 
loaded_data['files_remaining'] -                loaded_data['files_remaining'] = 0 -                loaded_data['bytes_remaining'] = 0 -                loaded_data['purges_remaining'] = 0 -                os.ftruncate(f.fileno(), 0) -                os.lseek(f.fileno(), 0, os.SEEK_SET) -                json.dump(loaded_data, f) -                f.flush() -                os.fsync(f.fileno()) -        except (IOError, OSError, ValueError): -            logging.info('Creating new %s' % worker_status_file) -            try: -                with open(worker_status_file, 'wb') as f: -                    json.dump(default_data, f) -                    f.flush() -                    os.fsync(f.fileno()) -            except: -                raise - -    def update_worker_files_remaining(self, state): -        self.update_worker_status('files_remaining', state) - -    def update_worker_bytes_remaining(self, state): -        self.update_worker_status('bytes_remaining', state) - -    def update_worker_purges_remaining(self, state): -        self.update_worker_status('purges_remaining', state) - -    def update_worker_total_files_skipped(self, value): -        default_data = {"remote_node": "N/A", -                        "worker status": "Not Started", -                        "crawl status": "N/A", -                        "files_syncd": 0, -                        "files_remaining": 0, -                        "bytes_remaining": 0, -                        "purges_remaining": 0, -                        "total_files_skipped": 0} -        worker_status_file = self.get_worker_status_file() -        try: -            with open(worker_status_file, 'r+') as f: -                loaded_data = json.load(f) -                loaded_data['total_files_skipped'] = value -                loaded_data['files_remaining'] -= value -                os.ftruncate(f.fileno(), 0) -                os.lseek(f.fileno(), 0, os.SEEK_SET) -                json.dump(loaded_data, f) -                f.flush() -                os.fsync(f.fileno()) -        except (IOError, OSError, ValueError): -            logging.info('Creating new %s' % worker_status_file) -            try: -                with open(worker_status_file, 'wb') as f: -                    default_data['total_files_skipped'] = value -                    json.dump(default_data, f) -                    f.flush() -                    os.fsync(f.fileno()) -            except: -                raise +        self.status.set_slave_node(remote_node_ip)      def crawl(self): -        self.update_worker_crawl_status("Changelog Crawl") +        self.status.set_worker_crawl_status("Changelog Crawl")          changes = []          # get stime (from the brick) and purge changelogs          # that are _historical_ to that time. 
@@ -1327,16 +1056,17 @@ class GMasterChangelogMixin(GMasterCommon):                  logging.debug('processing changes %s' % repr(changes))                  self.process(changes) -    def register(self, register_time, changelog_agent): +    def register(self, register_time, changelog_agent, status):          self.changelog_agent = changelog_agent          self.sleep_interval = int(gconf.change_interval)          self.changelog_done_func = self.changelog_agent.done          self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),                                                       ".processed") +        self.status = status  class GMasterChangeloghistoryMixin(GMasterChangelogMixin): -    def register(self, register_time, changelog_agent): +    def register(self, register_time, changelog_agent, status):          self.changelog_agent = changelog_agent          self.changelog_register_time = register_time          self.history_crawl_start_time = register_time @@ -1344,10 +1074,11 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):          self.history_turns = 0          self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),                                                       ".history/.processed") +        self.status = status      def crawl(self):          self.history_turns += 1 -        self.update_worker_crawl_status("History Crawl") +        self.status.set_worker_crawl_status("History Crawl")          purge_time = self.get_purge_time()          logging.info('starting history crawl... turns: %s, stime: %s' @@ -1429,7 +1160,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):      XSYNC_MAX_ENTRIES = 1 << 13 -    def register(self, register_time=None, changelog_agent=None): +    def register(self, register_time=None, changelog_agent=None, status=None): +        self.status = status          self.counter = 0          self.comlist = []          self.stimes = [] @@ -1460,7 +1192,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):          t.start()          logging.info('starting hybrid crawl..., stime: %s'                       % repr(self.get_purge_time())) -        self.update_worker_crawl_status("Hybrid Crawl") +        self.status.set_worker_crawl_status("Hybrid Crawl")          while True:              try:                  item = self.comlist.pop(0) diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 029726c7a5a..ba5c8e32514 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -22,10 +22,12 @@ from errno import EEXIST  import re  import random  from gconf import gconf -from syncdutils import update_file, select, waitpid +from syncdutils import select, waitpid  from syncdutils import set_term_handler, is_host_local, GsyncdError  from syncdutils import escape, Thread, finalize, memoize +from gsyncdstatus import GeorepStatus, set_monitor_status +  ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError @@ -125,46 +127,22 @@ class Volinfo(object):      def disperse_count(self):          return int(self.get('disperseCount')[0].text) +  class Monitor(object):      """class which spawns and manages gsyncd workers"""      ST_INIT = 'Initializing...' 
-    ST_STABLE = 'Stable' -    ST_FAULTY = 'faulty' +    ST_STARTED = 'Started' +    ST_STABLE = 'Active' +    ST_FAULTY = 'Faulty'      ST_INCON = 'inconsistent'      _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON]      def __init__(self):          self.lock = Lock()          self.state = {} - -    def set_state(self, state, w=None): -        """set the state that can be used by external agents -           like glusterd for status reporting""" -        computestate = lambda: self.state and self._ST_ORD[ -            max(self._ST_ORD.index(s) for s in self.state.values())] -        if w: -            self.lock.acquire() -            old_state = computestate() -            self.state[w] = state -            state = computestate() -            self.lock.release() -            if state != old_state: -                self.set_state(state) -        else: -            if getattr(gconf, 'state_file', None): -                # If previous state is paused, suffix the -                # new state with '(Paused)' -                try: -                    with open(gconf.state_file, "r") as f: -                        content = f.read() -                        if "paused" in content.lower(): -                            state = state + '(Paused)' -                except IOError: -                    pass -                logging.info('new state: %s' % state) -                update_file(gconf.state_file, lambda f: f.write(state + '\n')) +        self.status = {}      @staticmethod      def terminate(): @@ -174,8 +152,7 @@ class Monitor(object):          # give a chance to graceful exit          os.kill(-os.getpid(), signal.SIGTERM) - -    def monitor(self, w, argv, cpids, agents, slave_vol, slave_host): +    def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master):          """the monitor loop          Basic logic is a blantantly simple blunt heuristics: @@ -194,8 +171,11 @@ class Monitor(object):          blown worker blows up on EPIPE if the net goes down,          due to the keep-alive thread)          """ +        if not self.status.get(w[0], None): +            self.status[w[0]] = GeorepStatus(gconf.state_file, w[0]) -        self.set_state(self.ST_INIT, w) +        set_monitor_status(gconf.state_file, self.ST_STARTED) +        self.status[w[0]].set_worker_status(self.ST_INIT)          ret = 0 @@ -310,7 +290,7 @@ class Monitor(object):                  nwait(apid) #wait for agent                  ret = nwait(cpid)              if ret is None: -                self.set_state(self.ST_STABLE, w) +                self.status[w[0]].set_worker_status(self.ST_STABLE)                  #If worker dies, agent terminates on EOF.                  #So lets wait for agent first.                  
nwait(apid) @@ -320,12 +300,12 @@ class Monitor(object):              else:                  ret = exit_status(ret)                  if ret in (0, 1): -                    self.set_state(self.ST_FAULTY, w) +                    self.status[w[0]].set_worker_status(self.ST_FAULTY)              time.sleep(10) -        self.set_state(self.ST_INCON, w) +        self.status[w[0]].set_worker_status(self.ST_INCON)          return ret -    def multiplex(self, wspx, suuid, slave_vol, slave_host): +    def multiplex(self, wspx, suuid, slave_vol, slave_host, master):          argv = sys.argv[:]          for o in ('-N', '--no-daemon', '--monitor'):              while o in argv: @@ -339,7 +319,7 @@ class Monitor(object):          for wx in wspx:              def wmon(w):                  cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol, -                                       slave_host) +                                       slave_host, master)                  time.sleep(1)                  self.lock.acquire()                  for cpid in cpids: @@ -401,7 +381,7 @@ def distribute(*resources):                    for idx, brick in enumerate(mvol.bricks)                    if is_host_local(brick['host'])]      logging.info('worker specs: ' + repr(workerspex)) -    return workerspex, suuid, slave_vol, slave_host +    return workerspex, suuid, slave_vol, slave_host, master  def monitor(*resources): diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index d3d1ee36e01..6bf1ad03e70 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -38,6 +38,7 @@ from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat  from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable  from syncdutils import ChangelogException  from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION +from gsyncdstatus import GeorepStatus  UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') @@ -611,6 +612,9 @@ class Server(object):          def collect_failure(e, cmd_ret):              # We do this for failing fops on Slave              # Master should be logging this +            if cmd_ret is None: +                return +              if cmd_ret == EEXIST:                  disk_gfid = cls.gfid_mnt(e['entry'])                  if isinstance(disk_gfid, basestring): @@ -1344,6 +1348,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):              os.close(int(ra))              os.close(int(wa))              changelog_agent = RepceClient(int(inf), int(ouf)) +            status = GeorepStatus(gconf.state_file, gconf.local_path) +            status.reset_on_worker_start()              rv = changelog_agent.version()              if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION:                  raise GsyncdError( @@ -1367,13 +1373,13 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                                               g2.CHANGELOG_CONN_RETRIES)                  register_time = int(time.time()) -                g2.register(register_time, changelog_agent) -                g3.register(register_time, changelog_agent) +                g2.register(register_time, changelog_agent, status) +                g3.register(register_time, changelog_agent, status)              except ChangelogException as e:                  logging.error("Changelog register failed, %s" % e)                  sys.exit(1) -            g1.register() +            g1.register(status=status)              logging.info("Register time: %s" % register_time)              
# oneshot: Try to use changelog history api, if not              # available switch to FS crawl diff --git a/geo-replication/tests/unit/test_gsyncdstatus.py b/geo-replication/tests/unit/test_gsyncdstatus.py new file mode 100644 index 00000000000..a65d659e356 --- /dev/null +++ b/geo-replication/tests/unit/test_gsyncdstatus.py @@ -0,0 +1,193 @@ +#!/usr/bin/env python +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import unittest +import os +import urllib + +from syncdaemon.gstatus import GeorepStatus, set_monitor_status +from syncdaemon.gstatus import get_default_values +from syncdaemon.gstatus import MONITOR_STATUS, DEFAULT_STATUS +from syncdaemon.gstatus import STATUS_VALUES, CRAWL_STATUS_VALUES +from syncdaemon.gstatus import human_time, human_time_utc + + +class GeorepStatusTestCase(unittest.TestCase): +    @classmethod +    def setUpClass(cls): +        cls.work_dir = os.path.dirname(os.path.abspath(__file__)) +        cls.monitor_status_file = os.path.join(cls.work_dir, "monitor.status") +        cls.brick = "/exports/bricks/b1" +        cls.status = GeorepStatus(cls.monitor_status_file, cls.brick) +        cls.statusfile = os.path.join(cls.work_dir, +                                      "brick_%s.status" +                                      % urllib.quote_plus(cls.brick)) + +    @classmethod +    def tearDownClass(cls): +        os.remove(cls.statusfile) +        os.remove(cls.monitor_status_file) + +    def _filter_dict(self, inp, keys): +        op = {} +        for k in keys: +            op[k] = inp.get(k, None) +        return op + +    def test_monitor_status_file_created(self): +        self.assertTrue(os.path.exists(self.monitor_status_file)) + +    def test_status_file_created(self): +        self.assertTrue(os.path.exists(self.statusfile)) + +    def test_set_monitor_status(self): +        for st in MONITOR_STATUS: +            set_monitor_status(self.monitor_status_file, st) +            self.assertTrue(self.status.get_monitor_status(), st) + +    def test_default_values_test(self): +        self.assertTrue(get_default_values(), { +            "slave_node": DEFAULT_STATUS, +            "worker_status": DEFAULT_STATUS, +            "last_synced": 0, +            "last_synced_utc": 0, +            "crawl_status": DEFAULT_STATUS, +            "entry": 0, +            "data": 0, +            "metadata": 0, +            "failures": 0, +            "checkpoint_completed": False, +            "checkpoint_time": 0, +            "checkpoint_time_utc": 0, +            "checkpoint_completion_time": 0, +            "checkpoint_completion_time_utc": 0 +        }) + +    def test_human_time(self): +        self.assertTrue(human_time(1429174398), "2015-04-16 14:23:18") + +    def test_human_time_utc(self): +        self.assertTrue(human_time_utc(1429174398), "2015-04-16 08:53:18") + +    def test_invalid_human_time(self): +        self.assertTrue(human_time(142917439), DEFAULT_STATUS) +        self.assertTrue(human_time("abcdef"), DEFAULT_STATUS) + +    def test_invalid_human_time_utc(self): +        self.assertTrue(human_time_utc(142917439), DEFAULT_STATUS) +        self.assertTrue(human_time_utc("abcdef"), DEFAULT_STATUS) + +    def test_worker_status(self): +   
     set_monitor_status(self.monitor_status_file, "Started") +        for st in STATUS_VALUES: +            self.status.set_worker_status(st) +            self.assertTrue(self.status.get_status()["worker_status"], st) + +    def test_crawl_status(self): +        set_monitor_status(self.monitor_status_file, "Started") +        self.status.set_active() +        for st in CRAWL_STATUS_VALUES: +            self.status.set_worker_crawl_status(st) +            self.assertTrue(self.status.get_status()["crawl_status"], st) + +    def test_slave_node(self): +        set_monitor_status(self.monitor_status_file, "Started") +        self.status.set_active() +        self.status.set_slave_node("fvm2") +        self.assertTrue(self.status.get_status()["slave_node"], "fvm2") + +        self.status.set_worker_status("Passive") +        self.status.set_slave_node("fvm2") +        self.assertTrue(self.status.get_status()["slave_node"], "fvm2") + +    def test_active_worker_status(self): +        set_monitor_status(self.monitor_status_file, "Started") +        self.status.set_active() +        self.assertTrue(self.status.get_status()["worker_status"], "Active") + +    def test_passive_worker_status(self): +        set_monitor_status(self.monitor_status_file, "Started") +        self.status.set_passive() +        self.assertTrue(self.status.get_status()["worker_status"], "Passive") + +    def test_set_field(self): +        set_monitor_status(self.monitor_status_file, "Started") +        self.status.set_active() +        self.status.set_field("entry", 42) +        self.assertTrue(self.status.get_status()["entry"], 42) + +    def test_inc_value(self): +        set_monitor_status(self.monitor_status_file, "Started") +        self.status.set_active() +        self.status.set_field("entry", 0) +        self.status.inc_value("entry", 2) +        self.assertTrue(self.status.get_status()["entry"], 2) + +        self.status.set_field("data", 0) +        self.status.inc_value("data", 2) +        self.assertTrue(self.status.get_status()["data"], 2) + +        self.status.set_field("meta", 0) +        self.status.inc_value("meta", 2) +        self.assertTrue(self.status.get_status()["meta"], 2) + +        self.status.set_field("failures", 0) +        self.status.inc_value("failures", 2) +        self.assertTrue(self.status.get_status()["failures"], 2) + +    def test_dec_value(self): +        set_monitor_status(self.monitor_status_file, "Started") +        self.status.set_active() + +        self.status.set_field("entry", 4) +        self.status.inc_value("entry", 2) +        self.assertTrue(self.status.get_status()["entry"], 2) + +        self.status.set_field("data", 4) +        self.status.inc_value("data", 2) +        self.assertTrue(self.status.get_status()["data"], 2) + +        self.status.set_field("meta", 4) +        self.status.inc_value("meta", 2) +        self.assertTrue(self.status.get_status()["meta"], 2) + +        self.status.set_field("failures", 4) +        self.status.inc_value("failures", 2) +        self.assertTrue(self.status.get_status()["failures"], 2) + +    def test_worker_status_when_monitor_status_created(self): +        set_monitor_status(self.monitor_status_file, "Created") +        for st in STATUS_VALUES: +            self.status.set_worker_status(st) +            self.assertTrue(self.status.get_status()["worker_status"], +                            "Created") + +    def test_worker_status_when_monitor_status_paused(self): +        set_monitor_status(self.monitor_status_file, "Paused") +  
      for st in STATUS_VALUES: +            self.status.set_worker_status(st) +            self.assertTrue(self.status.get_status()["worker_status"], +                            "Paused") + +    def test_worker_status_when_monitor_status_stopped(self): +        set_monitor_status(self.monitor_status_file, "Stopped") +        for st in STATUS_VALUES: +            self.status.set_worker_status(st) +            self.assertTrue(self.status.get_status()["worker_status"], +                            "Stopped") + +    def test_status_when_worker_status_active(self): +        set_monitor_status(self.monitor_status_file, "Started") +        self.status.set_active() + + +if __name__ == "__main__": +    unittest.main()  | 
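
For readers skimming the new gsyncdstatus.py module in the patch above, the central persistence idea is a locked, atomic rewrite of the per-brick JSON file: take an exclusive flock, write the merged document to a temp file in the same directory, rename it over the original, and fsync the directory. A condensed, simplified sketch of that pattern follows; the function name and example path are hypothetical, and the ghost-file re-check that LockedOpen performs is omitted:

```python
# Condensed sketch of the locked read-modify-write used by
# GeorepStatus._update(): flock the status file, write the merged JSON
# to a temp file in the same directory, rename it over the original,
# then fsync the directory so the rename is durable. Error handling and
# the LockedOpen ghost-file re-check are omitted for brevity.
import fcntl
import json
import os
import tempfile


def update_status_file(path, merger):
    # Make sure the file exists so it can be opened for locking.
    fd = os.open(path, os.O_CREAT | os.O_RDWR)
    os.close(fd)

    with open(path, 'r+') as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        try:
            data = json.load(f)
        except ValueError:
            data = {}          # empty or corrupt file: start from scratch

        data = merger(data)

        dirname = os.path.dirname(os.path.abspath(path))
        with tempfile.NamedTemporaryFile('w', dir=dirname,
                                         delete=False) as tf:
            tf.write(json.dumps(data))
            tempname = tf.name

        os.rename(tempname, path)              # atomic replace on POSIX
        dirfd = os.open(dirname, os.O_DIRECTORY)
        os.fsync(dirfd)                        # persist the rename itself
        os.close(dirfd)


# Example: bump the pending-entry counter, as inc_value("entry", 1) does.
update_status_file("/tmp/brick_b1.status",
                   lambda d: dict(d, entry=d.get("entry", 0) + 1))
```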
