diff options
Diffstat (limited to 'geo-replication/syncdaemon/gsyncdstatus.py')
| -rw-r--r-- | geo-replication/syncdaemon/gsyncdstatus.py | 121 |
1 files changed, 107 insertions, 14 deletions
diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py index 57692f8fab0..1a655ff8887 100644 --- a/geo-replication/syncdaemon/gsyncdstatus.py +++ b/geo-replication/syncdaemon/gsyncdstatus.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 # # Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> # This file is part of GlusterFS. @@ -9,13 +9,22 @@ # cases as published by the Free Software Foundation. # +from __future__ import print_function import fcntl import os import tempfile -import urllib +try: + import urllib.parse as urllib +except ImportError: + import urllib import json import time from datetime import datetime +from errno import EACCES, EAGAIN, ENOENT +import logging + +from syncdutils import (EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event, + EVENT_GEOREP_CHECKPOINT_COMPLETED, lf) DEFAULT_STATUS = "N/A" MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped") @@ -51,6 +60,7 @@ def get_default_values(): "slave_node": DEFAULT_STATUS, "worker_status": DEFAULT_STATUS, "last_synced": 0, + "last_synced_entry": 0, "crawl_status": DEFAULT_STATUS, "entry": 0, "data": 0, @@ -93,6 +103,7 @@ class LockedOpen(object): return f def __exit__(self, _exc_type, _exc_value, _traceback): + fcntl.flock(self.fileobj, fcntl.LOCK_UN) self.fileobj.close() @@ -113,7 +124,12 @@ def set_monitor_status(status_file, status): class GeorepStatus(object): - def __init__(self, monitor_status_file, brick): + def __init__(self, monitor_status_file, master_node, brick, master_node_id, + master, slave, monitor_pid_file=None): + self.master = master + slv_data = slave.split("::") + self.slave_host = slv_data[0] + self.slave_volume = slv_data[1].split(":")[0] # Remove Slave UUID self.work_dir = os.path.dirname(monitor_status_file) self.monitor_status_file = monitor_status_file self.filename = os.path.join(self.work_dir, @@ -124,17 +140,35 @@ class GeorepStatus(object): os.close(fd) fd = os.open(self.monitor_status_file, os.O_CREAT | os.O_RDWR) os.close(fd) + self.master_node = master_node + self.master_node_id = master_node_id self.brick = brick self.default_values = get_default_values() + self.monitor_pid_file = monitor_pid_file + + def send_event(self, event_type, **kwargs): + gf_event(event_type, + master_volume=self.master, + master_node=self.master_node, + master_node_id=self.master_node_id, + slave_host=self.slave_host, + slave_volume=self.slave_volume, + brick_path=self.brick, + **kwargs) def _update(self, mergerfunc): + data = self.default_values with LockedOpen(self.filename, 'r+') as f: try: - data = json.load(f) + data.update(json.load(f)) except ValueError: - data = self.default_values + pass data = mergerfunc(data) + # If Data is not changed by merger func + if not data: + return False + with tempfile.NamedTemporaryFile( 'w', dir=os.path.dirname(self.filename), @@ -147,6 +181,7 @@ class GeorepStatus(object): os.O_DIRECTORY) os.fsync(dirfd) os.close(dirfd) + return True def reset_on_worker_start(self): def merger(data): @@ -161,10 +196,20 @@ class GeorepStatus(object): def set_field(self, key, value): def merger(data): + # Current data and prev data is same + if data[key] == value: + return {} + data[key] = value return json.dumps(data) - self._update(merger) + return self._update(merger) + + def trigger_gf_event_checkpoint_completion(self, checkpoint_time, + checkpoint_completion_time): + self.send_event(EVENT_GEOREP_CHECKPOINT_COMPLETED, + checkpoint_time=checkpoint_time, + checkpoint_completion_time=checkpoint_completion_time) def set_last_synced(self, value, checkpoint_time): def merger(data): @@ -182,18 +227,30 @@ class GeorepStatus(object): # previously then update the checkpoint completed time if checkpoint_time > 0 and checkpoint_time <= value[0]: if data["checkpoint_completed"] == "No": + curr_time = int(time.time()) data["checkpoint_time"] = checkpoint_time - data["checkpoint_completion_time"] = int(time.time()) + data["checkpoint_completion_time"] = curr_time data["checkpoint_completed"] = "Yes" + logging.info(lf("Checkpoint completed", + checkpoint_time=human_time_utc( + checkpoint_time), + completion_time=human_time_utc(curr_time))) + self.trigger_gf_event_checkpoint_completion( + checkpoint_time, curr_time) + return json.dumps(data) self._update(merger) def set_worker_status(self, status): - self.set_field("worker_status", status) + if self.set_field("worker_status", status): + logging.info(lf("Worker Status Change", + status=status)) def set_worker_crawl_status(self, status): - self.set_field("crawl_status", status) + if self.set_field("crawl_status", status): + logging.info(lf("Crawl Status Change", + status=status)) def set_slave_node(self, slave_node): def merger(data): @@ -219,10 +276,16 @@ class GeorepStatus(object): self._update(merger) def set_active(self): - self.set_field("worker_status", "Active") + if self.set_field("worker_status", "Active"): + logging.info(lf("Worker Status Change", + status="Active")) + self.send_event(EVENT_GEOREP_ACTIVE) def set_passive(self): - self.set_field("worker_status", "Passive") + if self.set_field("worker_status", "Passive"): + logging.info(lf("Worker Status Change", + status="Passive")) + self.send_event(EVENT_GEOREP_PASSIVE) def get_monitor_status(self): data = "" @@ -237,6 +300,7 @@ class GeorepStatus(object): slave_node N/A VALUE VALUE N/A status Created VALUE Paused Stopped last_synced N/A VALUE VALUE VALUE + last_synced_entry N/A VALUE VALUE VALUE crawl_status N/A VALUE N/A N/A entry N/A VALUE N/A N/A data N/A VALUE N/A N/A @@ -254,9 +318,29 @@ class GeorepStatus(object): pass monitor_status = self.get_monitor_status() + # Verifying whether monitor process running and adjusting status + if monitor_status in ["Started", "Paused"]: + try: + with open(self.monitor_pid_file, "r+") as f: + fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB) + monitor_status = "Stopped" + except (IOError, OSError) as e: + # If pid file not exists, either monitor died or Geo-rep + # not even started once + if e.errno == ENOENT: + monitor_status = "Stopped" + elif e.errno in (EACCES, EAGAIN): + # cannot grab. so, monitor process still running..move on + pass + else: + raise + if monitor_status in ["Created", "Paused", "Stopped"]: data["worker_status"] = monitor_status + if monitor_status == "": + data["worker_status"] = "Stopped" + # Checkpoint adjustments if checkpoint_time == 0: data["checkpoint_completed"] = DEFAULT_STATUS @@ -321,6 +405,15 @@ class GeorepStatus(object): return data - def print_status(self, checkpoint_time=0): - for key, value in self.get_status(checkpoint_time).items(): - print ("%s: %s" % (key, value)) + def print_status(self, checkpoint_time=0, json_output=False): + status_out = self.get_status(checkpoint_time) + if json_output: + out = {} + # Convert all values as string + for k, v in status_out.items(): + out[k] = str(v) + print(json.dumps(out)) + return + + for key, value in status_out.items(): + print(("%s: %s" % (key, value))) |
