diff options
Diffstat (limited to 'geo-replication/syncdaemon/gsyncdstatus.py')
| -rw-r--r-- | geo-replication/syncdaemon/gsyncdstatus.py | 317 | 
1 files changed, 317 insertions, 0 deletions
| diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py new file mode 100644 index 00000000000..a49b9c23dea --- /dev/null +++ b/geo-replication/syncdaemon/gsyncdstatus.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import fcntl +import os +import tempfile +import urllib +import json +import time +from datetime import datetime + +DEFAULT_STATUS = "N/A" +MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped") +STATUS_VALUES = (DEFAULT_STATUS, +                 "Initializing...", +                 "Active", +                 "Passive", +                 "Faulty") + +CRAWL_STATUS_VALUES = (DEFAULT_STATUS, +                       "Hybrid Crawl", +                       "History Crawl", +                       "Changelog Crawl") + + +def human_time(ts): +    try: +        return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S") +    except ValueError: +        return DEFAULT_STATUS + + +def human_time_utc(ts): +    try: +        return datetime.utcfromtimestamp( +            float(ts)).strftime("%Y-%m-%d %H:%M:%S") +    except ValueError: +        return DEFAULT_STATUS + + +def get_default_values(): +    return { +        "slave_node": DEFAULT_STATUS, +        "worker_status": DEFAULT_STATUS, +        "last_synced": 0, +        "crawl_status": DEFAULT_STATUS, +        "entry": 0, +        "data": 0, +        "meta": 0, +        "failures": 0, +        "checkpoint_completed": DEFAULT_STATUS, +        "checkpoint_time": 0, +        "checkpoint_completion_time": 0} + + +class LockedOpen(object): + +    def __init__(self, filename, *args, **kwargs): +        self.filename = filename +        self.open_args = args +        self.open_kwargs = kwargs +        self.fileobj = None + +    def __enter__(self): +        """ +        If two processes compete to update a file, The first process +        gets the lock and the second process is blocked in the fcntl.flock() +        call. When first process replaces the file and releases the lock, +        the already open file descriptor in the second process now points +        to a  "ghost" file(not reachable by any path name) with old contents. +        To avoid that conflict, check the fd already opened is same or +        not. Open new one if not same +        """ +        f = open(self.filename, *self.open_args, **self.open_kwargs) +        while True: +            fcntl.flock(f, fcntl.LOCK_EX) +            fnew = open(self.filename, *self.open_args, **self.open_kwargs) +            if os.path.sameopenfile(f.fileno(), fnew.fileno()): +                fnew.close() +                break +            else: +                f.close() +                f = fnew +        self.fileobj = f +        return f + +    def __exit__(self, _exc_type, _exc_value, _traceback): +        self.fileobj.close() + + +def set_monitor_status(status_file, status): +    fd = os.open(status_file, os.O_CREAT | os.O_RDWR) +    os.close(fd) +    with LockedOpen(status_file, 'r+'): +        with tempfile.NamedTemporaryFile('w', dir=os.path.dirname(status_file), +                                         delete=False) as tf: +            tf.write(status) +            tempname = tf.name + +        os.rename(tempname, status_file) +        dirfd = os.open(os.path.dirname(os.path.abspath(status_file)), +                        os.O_DIRECTORY) +        os.fsync(dirfd) +        os.close(dirfd) + + +class GeorepStatus(object): +    def __init__(self, monitor_status_file, brick): +        self.work_dir = os.path.dirname(monitor_status_file) +        self.monitor_status_file = monitor_status_file +        self.filename = os.path.join(self.work_dir, +                                     "brick_%s.status" +                                     % urllib.quote_plus(brick)) + +        fd = os.open(self.filename, os.O_CREAT | os.O_RDWR) +        os.close(fd) +        fd = os.open(self.monitor_status_file, os.O_CREAT | os.O_RDWR) +        os.close(fd) +        self.brick = brick +        self.default_values = get_default_values() + +    def _update(self, mergerfunc): +        with LockedOpen(self.filename, 'r+') as f: +            try: +                data = json.load(f) +            except ValueError: +                data = self.default_values + +            data = mergerfunc(data) +            with tempfile.NamedTemporaryFile( +                    'w', +                    dir=os.path.dirname(self.filename), +                    delete=False) as tf: +                tf.write(data) +                tempname = tf.name + +            os.rename(tempname, self.filename) +            dirfd = os.open(os.path.dirname(os.path.abspath(self.filename)), +                            os.O_DIRECTORY) +            os.fsync(dirfd) +            os.close(dirfd) + +    def reset_on_worker_start(self): +        def merger(data): +            data["slave_node"] = DEFAULT_STATUS +            data["crawl_status"] = DEFAULT_STATUS +            data["entry"] = 0 +            data["data"] = 0 +            data["meta"] = 0 +            return json.dumps(data) + +        self._update(merger) + +    def set_field(self, key, value): +        def merger(data): +            data[key] = value +            return json.dumps(data) + +        self._update(merger) + +    def set_last_synced(self, value, checkpoint_time): +        def merger(data): +            data["last_synced"] = value[0] + +            # If checkpoint is not set or reset +            # or if last set checkpoint is changed +            if checkpoint_time == 0 or \ +               checkpoint_time != data["checkpoint_time"]: +                data["checkpoint_time"] = 0 +                data["checkpoint_completion_time"] = 0 +                data["checkpoint_completed"] = "No" + +            # If checkpoint is completed and not marked as completed +            # previously then update the checkpoint completed time +            if checkpoint_time > 0 and checkpoint_time <= value[0]: +                if data["checkpoint_completed"] == "No": +                    data["checkpoint_time"] = checkpoint_time +                    data["checkpoint_completion_time"] = int(time.time()) +                    data["checkpoint_completed"] = "Yes" +            return json.dumps(data) + +        self._update(merger) + +    def set_worker_status(self, status): +        self.set_field("worker_status", status) + +    def set_worker_crawl_status(self, status): +        self.set_field("crawl_status", status) + +    def set_slave_node(self, slave_node): +        def merger(data): +            data["slave_node"] = slave_node +            return json.dumps(data) + +        self._update(merger) + +    def inc_value(self, key, value): +        def merger(data): +            data[key] = data.get(key, 0) + value +            return json.dumps(data) + +        self._update(merger) + +    def dec_value(self, key, value): +        def merger(data): +            data[key] = data.get(key, 0) - value +            if data[key] < 0: +                data[key] = 0 +            return json.dumps(data) + +        self._update(merger) + +    def set_active(self): +        self.set_field("worker_status", "Active") + +    def set_passive(self): +        self.set_field("worker_status", "Passive") + +    def get_monitor_status(self): +        data = "" +        with open(self.monitor_status_file, "r") as f: +            data = f.read().strip() +        return data + +    def get_status(self, checkpoint_time=0): +        """ +        Monitor Status --->        Created    Started  Paused      Stopped +        ---------------------------------------------------------------------- +        slave_node                 N/A        VALUE    VALUE       N/A +        status                     Created    VALUE    Paused      Stopped +        last_synced                N/A        VALUE    VALUE       VALUE +        crawl_status               N/A        VALUE    N/A         N/A +        entry                      N/A        VALUE    N/A         N/A +        data                       N/A        VALUE    N/A         N/A +        meta                       N/A        VALUE    N/A         N/A +        failures                   N/A        VALUE    VALUE       VALUE +        checkpoint_completed       N/A        VALUE    VALUE       VALUE +        checkpoint_time            N/A        VALUE    VALUE       VALUE +        checkpoint_completed_time  N/A        VALUE    VALUE       VALUE +        """ +        data = self.default_values +        with open(self.filename) as f: +            try: +                data.update(json.load(f)) +            except ValueError: +                pass +        monitor_status = self.get_monitor_status() + +        if monitor_status in ["Created", "Paused", "Stopped"]: +            data["worker_status"] = monitor_status + +        # Checkpoint adjustments +        if checkpoint_time == 0: +            data["checkpoint_completed"] = DEFAULT_STATUS +            data["checkpoint_time"] = DEFAULT_STATUS +            data["checkpoint_completion_time"] = DEFAULT_STATUS +        else: +            if checkpoint_time != data["checkpoint_time"]: +                if checkpoint_time <= data["last_synced"]: +                    data["checkpoint_completed"] = "Yes" +                    data["checkpoint_time"] = checkpoint_time +                    data["checkpoint_completion_time"] = data["last_synced"] +                else: +                    data["checkpoint_completed"] = "No" +                    data["checkpoint_time"] = checkpoint_time +                    data["checkpoint_completion_time"] = DEFAULT_STATUS + +        if data["checkpoint_time"] not in [0, DEFAULT_STATUS]: +            chkpt_time = data["checkpoint_time"] +            data["checkpoint_time"] = human_time(chkpt_time) +            data["checkpoint_time_utc"] = human_time_utc(chkpt_time) + +        if data["checkpoint_completion_time"] not in [0, DEFAULT_STATUS]: +            chkpt_completion_time = data["checkpoint_completion_time"] +            data["checkpoint_completion_time"] = human_time( +                chkpt_completion_time) +            data["checkpoint_completion_time_utc"] = human_time_utc( +                chkpt_completion_time) + +        if data["last_synced"] == 0: +            data["last_synced"] = DEFAULT_STATUS +            data["last_synced_utc"] = DEFAULT_STATUS +        else: +            last_synced = data["last_synced"] +            data["last_synced"] = human_time(last_synced) +            data["last_synced_utc"] = human_time_utc(last_synced) + +        if data["worker_status"] != "Active": +            data["last_synced"] = DEFAULT_STATUS +            data["last_synced_utc"] = DEFAULT_STATUS +            data["crawl_status"] = DEFAULT_STATUS +            data["entry"] = DEFAULT_STATUS +            data["data"] = DEFAULT_STATUS +            data["meta"] = DEFAULT_STATUS +            data["failures"] = DEFAULT_STATUS +            data["checkpoint_completed"] = DEFAULT_STATUS +            data["checkpoint_time"] = DEFAULT_STATUS +            data["checkpoint_completed_time"] = DEFAULT_STATUS +            data["checkpoint_time_utc"] = DEFAULT_STATUS +            data["checkpoint_completion_time_utc"] = DEFAULT_STATUS + +        if data["worker_status"] not in ["Active", "Passive"]: +            data["slave_node"] = DEFAULT_STATUS + +        return data + +    def print_status(self, checkpoint_time=0): +        for key, value in self.get_status(checkpoint_time).items(): +            print ("%s: %s" % (key, value)) | 
