diff options
| -rw-r--r-- | cli/src/cli-cmd-parser.c | 4 | ||||
| -rw-r--r-- | cli/src/cli-rpc-ops.c | 80 | ||||
| -rw-r--r-- | cli/src/cli-xml-output.c | 68 | ||||
| -rw-r--r-- | cli/src/cli.h | 3 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/Makefile.am | 3 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 25 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/gsyncdstatus.py | 317 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 374 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/monitor.py | 58 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 12 | ||||
| -rw-r--r-- | geo-replication/tests/unit/test_gsyncdstatus.py | 193 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/protocol-common.h | 18 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-geo-rep.c | 479 | 
13 files changed, 917 insertions, 717 deletions
diff --git a/cli/src/cli-cmd-parser.c b/cli/src/cli-cmd-parser.c index 38b30a4fe04..f3b8887d554 100644 --- a/cli/src/cli-cmd-parser.c +++ b/cli/src/cli-cmd-parser.c @@ -2203,8 +2203,8 @@ config_parse (const char **words, int wordcount, dict_t *dict,                                  ret = -1;                                  goto out;                          } -                        snprintf (append_str, 300, "now:%" GF_PRI_SECOND ".%06"GF_PRI_SUSECONDS, -                                  tv.tv_sec, tv.tv_usec); +                        snprintf (append_str, 300, "%" GF_PRI_SECOND, +                                  tv.tv_sec);                  }                  ret = dict_set_dynstr (dict, "op_value", append_str); diff --git a/cli/src/cli-rpc-ops.c b/cli/src/cli-rpc-ops.c index 0e82afbf921..400c4842225 100644 --- a/cli/src/cli-rpc-ops.c +++ b/cli/src/cli-rpc-ops.c @@ -4419,18 +4419,24 @@ get_struct_variable (int mem_num, gf_gsync_status_t *sts_val)          case 1:  return (sts_val->master);          case 2:  return (sts_val->brick);          case 3:  return (sts_val->slave_user); -        case 4:  return (sts_val->slave_node); -        case 5:  return (sts_val->worker_status); -        case 6:  return (sts_val->checkpoint_status); +        case 4:  return (sts_val->slave); +        case 5:  return (sts_val->slave_node); +        case 6:  return (sts_val->worker_status);          case 7:  return (sts_val->crawl_status); -        case 8:  return (sts_val->files_syncd); -        case 9:  return (sts_val->files_remaining); -        case 10:  return (sts_val->bytes_remaining); -        case 11: return (sts_val->purges_remaining); -        case 12: return (sts_val->total_files_skipped); -        case 13: return (sts_val->brick_host_uuid); -        case 14: return (sts_val->slavekey); -        case 15: return (sts_val->session_slave); +        case 8:  return (sts_val->last_synced); +        case 9:  return (sts_val->entry); +        case 10:  return (sts_val->data); +        case 11:  return (sts_val->meta); +        case 12: return (sts_val->failures); +        case 13:  return (sts_val->checkpoint_time); +        case 14:  return (sts_val->checkpoint_completed); +        case 15:  return (sts_val->checkpoint_completion_time); +        case 16: return (sts_val->brick_host_uuid); +        case 17: return (sts_val->last_synced_utc); +        case 18: return (sts_val->checkpoint_time_utc); +        case 19: return (sts_val->checkpoint_completion_time_utc); +        case 20: return (sts_val->slavekey); +        case 21: return (sts_val->session_slave);          default:                   goto out;          } @@ -4448,7 +4454,7 @@ gf_cli_print_status (char **title_values,          int     i                        = 0;          int     j                        = 0;          int     ret                      = 0; -        int     status_fields            = 7; /* Indexed at 0 */ +        int     status_fields            = 8; /* Indexed at 0 */          int     total_spacing            = 0;          char  **output_values            = NULL;          char   *tmp                      = NULL; @@ -4507,13 +4513,15 @@ gf_cli_print_status (char **title_values,                          strlen(title_values[j]));                  output_values[j][spacing[j]] = '\0';          } -        cli_out ("%s %s %s %s %s %s %s %s %s %s %s %s", +        cli_out ("%s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s",                   output_values[0], output_values[1],                   output_values[2], output_values[3],                   output_values[4], output_values[5],                   output_values[6], output_values[7],                   output_values[8], output_values[9], -                 output_values[10], output_values[11]); +                 output_values[10], output_values[11], +                 output_values[12], output_values[13], +                 output_values[14], output_values[15]);          /* setting and printing the hyphens */          memset (hyphens, '-', total_spacing); @@ -4540,13 +4548,15 @@ gf_cli_print_status (char **title_values,                          output_values[j][spacing[j]] = '\0';                  } -                cli_out ("%s %s %s %s %s %s %s %s %s %s %s %s", +                cli_out ("%s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s",                           output_values[0], output_values[1],                           output_values[2], output_values[3],                           output_values[4], output_values[5],                           output_values[6], output_values[7],                           output_values[8], output_values[9], -                         output_values[10], output_values[11]); +                         output_values[10], output_values[11], +                         output_values[12], output_values[13], +                         output_values[14], output_values[15]);          }  out: @@ -4565,6 +4575,23 @@ out:  }  int +gf_gsync_status_t_comparator (const void *p, const void *q) +{ +        char *slavekey1 = NULL; +        char *slavekey2 = NULL; + +        slavekey1 = get_struct_variable (20, (*(gf_gsync_status_t **)p)); +        slavekey2 = get_struct_variable (20, (*(gf_gsync_status_t **)q)); +        if (!slavekey1 || !slavekey2) { +                gf_log ("cli", GF_LOG_ERROR, +                        "struct member empty."); +                return 0; +        } + +        return strcmp (slavekey1, slavekey2); +} + +int  gf_cli_read_status_data (dict_t *dict,                           gf_gsync_status_t **sts_vals,                           int *spacing, int gsync_count, @@ -4599,6 +4626,11 @@ gf_cli_read_status_data (dict_t *dict,                  }          } +        /* Sort based on Session Slave */ +        qsort(sts_vals, gsync_count, +              sizeof(gf_gsync_status_t *), +              gf_gsync_status_t_comparator); +  out:          return ret;  } @@ -4609,18 +4641,20 @@ gf_cli_gsync_status_output (dict_t *dict, gf_boolean_t is_detail)          int                     gsync_count    = 0;          int                     i              = 0;          int                     ret            = 0; -        int                     spacing[13]    = {0}; -        int                     num_of_fields  = 13; +        int                     spacing[16]    = {0}; +        int                     num_of_fields  = 16;          char                    errmsg[1024]   = "";          char                   *master         = NULL;          char                   *slave          = NULL;          char                   *title_values[] = {"MASTER NODE", "MASTER VOL",                                                    "MASTER BRICK", "SLAVE USER", -                                                  "SLAVE", -                                                  "STATUS", "CHECKPOINT STATUS", -                                                  "CRAWL STATUS", "FILES SYNCD", -                                                  "FILES PENDING", "BYTES PENDING", -                                                  "DELETES PENDING", "FILES SKIPPED"}; +                                                  "SLAVE", "SLAVE NODE", +                                                  "STATUS", "CRAWL STATUS", +                                                  "LAST_SYNCED", "ENTRY", +                                                  "DATA", "META", "FAILURES", +                                                  "CHECKPOINT TIME", +                                                  "CHECKPOINT COMPLETED", +                                                  "CHECKPOINT COMPLETION TIME"};          gf_gsync_status_t     **sts_vals       = NULL;          /* Checks if any session is active or not */ diff --git a/cli/src/cli-xml-output.c b/cli/src/cli-xml-output.c index cbb4c1f58e7..d7322d5bb0d 100644 --- a/cli/src/cli-xml-output.c +++ b/cli/src/cli-xml-output.c @@ -3839,25 +3839,6 @@ out:  #if (HAVE_LIB_XML)  int -gf_gsync_status_t_comparator (const void *p, const void *q) -{ -        char *master1 = NULL; -        char *master2 = NULL; - -        master1 = get_struct_variable (1, (*(gf_gsync_status_t **)p)); -        master2 = get_struct_variable (1, (*(gf_gsync_status_t **)q)); -        if (!master1 || !master2) { -                gf_log ("cli", GF_LOG_ERROR, -                        "struct member empty."); -                return 0; -        } - -        return strcmp (master1,master2); -} -#endif - -#if (HAVE_LIB_XML) -int  cli_xml_output_vol_gsync_status (dict_t *dict,                                   xmlTextWriterPtr writer)  { @@ -3865,8 +3846,7 @@ cli_xml_output_vol_gsync_status (dict_t *dict,          int                  i                           = 1;          int                  j                           = 0;          int                  count                       = 0; -        const int            number_of_fields            = 13; -        const int            number_of_basic_fields      = 8; +        const int            number_of_fields            = 20;          int                  closed                      = 1;          int                  session_closed              = 1;          gf_gsync_status_t  **status_values               = NULL; @@ -3878,18 +3858,31 @@ cli_xml_output_vol_gsync_status (dict_t *dict,          char                *slave                       = NULL;          char                *slave_next                  = NULL;          char                *title_values[]              = {"master_node", -                                                            "master_node_uuid", +                                                            "",                                                              "master_brick",                                                              "slave_user",                                                              "slave", +                                                            "slave_node",                                                              "status", -                                                            "checkpoint_status",                                                              "crawl_status", -                                                            "files_syncd", -                                                            "files_pending", -                                                            "bytes_pending", -                                                            "deletes_pending", -                                                            "files_skipped"}; +                                                            /* last_synced */ +                                                            "", +                                                            "entry", +                                                            "data", +                                                            "meta", +                                                            "failures", +                                                           /* checkpoint_time */ +                                                            "", +                                                         "checkpoint_completed", +                                               /* checkpoint_completion_time */ +                                                            "", +                                                            "master_node_uuid", +                                                           /* last_synced_utc */ +                                                            "last_synced", +                                                       /* checkpoint_time_utc */ +                                                            "checkpoint_time", +                                            /* checkpoint_completion_time_utc */ +                                                 "checkpoint_completion_time"};          GF_ASSERT (dict); @@ -3963,7 +3956,7 @@ cli_xml_output_vol_gsync_status (dict_t *dict,                          session_closed = 0; -                        tmp = get_struct_variable (15, status_values[i]); +                        tmp = get_struct_variable (21, status_values[i]);                          if (!tmp) {                                  gf_log ("cli", GF_LOG_ERROR,                                          "struct member empty."); @@ -3980,18 +3973,11 @@ cli_xml_output_vol_gsync_status (dict_t *dict,                  XML_RET_CHECK_AND_GOTO (ret, out);                  for (j = 0; j < number_of_fields; j++) { -                  // if detail option is not set and field is not under -                  // basic fields or if field is volume then skip -                        if(!status_detail && j >= number_of_basic_fields) +                        /* XML ignore fields */ +                        if (strcmp(title_values[j], "") == 0)                                  continue; -                        // Displaying the master_node uuid as second field - -                        if (j == 1) -                                tmp = get_struct_variable (13, -                                                           status_values[i]); -                        else -                                tmp = get_struct_variable (j, status_values[i]); +                        tmp = get_struct_variable (j, status_values[i]);                          if (!tmp) {                                  gf_log ("cli", GF_LOG_ERROR,                                          "struct member empty."); @@ -4009,8 +3995,8 @@ cli_xml_output_vol_gsync_status (dict_t *dict,                  XML_RET_CHECK_AND_GOTO (ret, out);                  if (i+1 < count) { -                        slave = get_struct_variable (14, status_values[i]); -                        slave_next = get_struct_variable (14, +                        slave = get_struct_variable (20, status_values[i]); +                        slave_next = get_struct_variable (20,                                                            status_values[i+1]);                          volume = get_struct_variable (1, status_values[i]);                          volume_next = get_struct_variable (1, diff --git a/cli/src/cli.h b/cli/src/cli.h index 2648d25ee9c..77f951dd25e 100644 --- a/cli/src/cli.h +++ b/cli/src/cli.h @@ -445,4 +445,7 @@ print_quota_list_header (int type);  void  print_quota_list_empty (char *path, int type); +int +gf_gsync_status_t_comparator (const void *p, const void *q); +  #endif /* __CLI_H__ */ diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am index 885963eae2b..ed0f5e40924 100644 --- a/geo-replication/syncdaemon/Makefile.am +++ b/geo-replication/syncdaemon/Makefile.am @@ -2,6 +2,7 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon  syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \  	resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \ -	$(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py +	$(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py \ +	gsyncdstatus.py  CLEANFILES = diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index b9ee5aec8c7..32e4eb7828d 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -27,12 +27,13 @@ from ipaddr import IPAddress, IPNetwork  from gconf import gconf  from syncdutils import FreeObject, norm, grabpidfile, finalize -from syncdutils import log_raise_exception, privileged, update_file +from syncdutils import log_raise_exception, privileged  from syncdutils import GsyncdError, select, set_term_handler  from configinterface import GConffile, upgrade_config_file  import resource  from monitor import monitor  from changelogagent import agent, Changelog +from gsyncdstatus import set_monitor_status, GeorepStatus  class GLogger(Logger): @@ -267,7 +268,7 @@ def main_i():      op.add_option('--socketdir', metavar='DIR')      op.add_option('--state-socket-unencoded', metavar='SOCKF',                    type=str, action='callback', callback=store_abs) -    op.add_option('--checkpoint', metavar='LABEL', default='') +    op.add_option('--checkpoint', metavar='LABEL', default='0')      # tunables for failover/failback mechanism:      # None   - gsyncd behaves as normal @@ -315,6 +316,8 @@ def main_i():                    action='callback', callback=store_local)      op.add_option('--delete', dest='delete', action='callback',                    callback=store_local_curry(True)) +    op.add_option('--status-get', dest='status_get', action='callback', +                  callback=store_local_curry(True))      op.add_option('--debug', dest="go_daemon", action='callback',                    callback=lambda *a: (store_local_curry('dont')(*a),                                         setattr( @@ -583,15 +586,8 @@ def main_i():              GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf')              if confdata.op == 'set':                  logging.info('checkpoint %s set' % confdata.val) -                gcnf.delete('checkpoint_completed') -                gcnf.delete('checkpoint_target')              elif confdata.op == 'del':                  logging.info('checkpoint info was reset') -                # if it is removing 'checkpoint' then we need -                # to remove 'checkpoint_completed' and 'checkpoint_target' too -                gcnf.delete('checkpoint_completed') -                gcnf.delete('checkpoint_target') -          except IOError:              if sys.exc_info()[1].errno == ENOENT:                  # directory of log path is not present, @@ -607,7 +603,7 @@ def main_i():      create = rconf.get('create')      if create:          if getattr(gconf, 'state_file', None): -            update_file(gconf.state_file, lambda f: f.write(create + '\n')) +            set_monitor_status(gconf.state_file, create)          return      go_daemon = rconf['go_daemon'] @@ -615,6 +611,15 @@ def main_i():      be_agent = rconf.get('agent')      rscs, local, remote = makersc(args) + +    status_get = rconf.get('status_get') +    if status_get: +        for brick in gconf.path: +            brick_status = GeorepStatus(gconf.state_file, brick) +            checkpoint_time = int(getattr(gconf, "checkpoint", "0")) +            brick_status.print_status(checkpoint_time=checkpoint_time) +        return +      if not be_monitor and isinstance(remote, resource.SSH) and \         go_daemon == 'should':          go_daemon = 'postconn' diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py new file mode 100644 index 00000000000..a49b9c23dea --- /dev/null +++ b/geo-replication/syncdaemon/gsyncdstatus.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import fcntl +import os +import tempfile +import urllib +import json +import time +from datetime import datetime + +DEFAULT_STATUS = "N/A" +MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped") +STATUS_VALUES = (DEFAULT_STATUS, +                 "Initializing...", +                 "Active", +                 "Passive", +                 "Faulty") + +CRAWL_STATUS_VALUES = (DEFAULT_STATUS, +                       "Hybrid Crawl", +                       "History Crawl", +                       "Changelog Crawl") + + +def human_time(ts): +    try: +        return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S") +    except ValueError: +        return DEFAULT_STATUS + + +def human_time_utc(ts): +    try: +        return datetime.utcfromtimestamp( +            float(ts)).strftime("%Y-%m-%d %H:%M:%S") +    except ValueError: +        return DEFAULT_STATUS + + +def get_default_values(): +    return { +        "slave_node": DEFAULT_STATUS, +        "worker_status": DEFAULT_STATUS, +        "last_synced": 0, +        "crawl_status": DEFAULT_STATUS, +        "entry": 0, +        "data": 0, +        "meta": 0, +        "failures": 0, +        "checkpoint_completed": DEFAULT_STATUS, +        "checkpoint_time": 0, +        "checkpoint_completion_time": 0} + + +class LockedOpen(object): + +    def __init__(self, filename, *args, **kwargs): +        self.filename = filename +        self.open_args = args +        self.open_kwargs = kwargs +        self.fileobj = None + +    def __enter__(self): +        """ +        If two processes compete to update a file, The first process +        gets the lock and the second process is blocked in the fcntl.flock() +        call. When first process replaces the file and releases the lock, +        the already open file descriptor in the second process now points +        to a  "ghost" file(not reachable by any path name) with old contents. +        To avoid that conflict, check the fd already opened is same or +        not. Open new one if not same +        """ +        f = open(self.filename, *self.open_args, **self.open_kwargs) +        while True: +            fcntl.flock(f, fcntl.LOCK_EX) +            fnew = open(self.filename, *self.open_args, **self.open_kwargs) +            if os.path.sameopenfile(f.fileno(), fnew.fileno()): +                fnew.close() +                break +            else: +                f.close() +                f = fnew +        self.fileobj = f +        return f + +    def __exit__(self, _exc_type, _exc_value, _traceback): +        self.fileobj.close() + + +def set_monitor_status(status_file, status): +    fd = os.open(status_file, os.O_CREAT | os.O_RDWR) +    os.close(fd) +    with LockedOpen(status_file, 'r+'): +        with tempfile.NamedTemporaryFile('w', dir=os.path.dirname(status_file), +                                         delete=False) as tf: +            tf.write(status) +            tempname = tf.name + +        os.rename(tempname, status_file) +        dirfd = os.open(os.path.dirname(os.path.abspath(status_file)), +                        os.O_DIRECTORY) +        os.fsync(dirfd) +        os.close(dirfd) + + +class GeorepStatus(object): +    def __init__(self, monitor_status_file, brick): +        self.work_dir = os.path.dirname(monitor_status_file) +        self.monitor_status_file = monitor_status_file +        self.filename = os.path.join(self.work_dir, +                                     "brick_%s.status" +                                     % urllib.quote_plus(brick)) + +        fd = os.open(self.filename, os.O_CREAT | os.O_RDWR) +        os.close(fd) +        fd = os.open(self.monitor_status_file, os.O_CREAT | os.O_RDWR) +        os.close(fd) +        self.brick = brick +        self.default_values = get_default_values() + +    def _update(self, mergerfunc): +        with LockedOpen(self.filename, 'r+') as f: +            try: +                data = json.load(f) +            except ValueError: +                data = self.default_values + +            data = mergerfunc(data) +            with tempfile.NamedTemporaryFile( +                    'w', +                    dir=os.path.dirname(self.filename), +                    delete=False) as tf: +                tf.write(data) +                tempname = tf.name + +            os.rename(tempname, self.filename) +            dirfd = os.open(os.path.dirname(os.path.abspath(self.filename)), +                            os.O_DIRECTORY) +            os.fsync(dirfd) +            os.close(dirfd) + +    def reset_on_worker_start(self): +        def merger(data): +            data["slave_node"] = DEFAULT_STATUS +            data["crawl_status"] = DEFAULT_STATUS +            data["entry"] = 0 +            data["data"] = 0 +            data["meta"] = 0 +            return json.dumps(data) + +        self._update(merger) + +    def set_field(self, key, value): +        def merger(data): +            data[key] = value +            return json.dumps(data) + +        self._update(merger) + +    def set_last_synced(self, value, checkpoint_time): +        def merger(data): +            data["last_synced"] = value[0] + +            # If checkpoint is not set or reset +            # or if last set checkpoint is changed +            if checkpoint_time == 0 or \ +               checkpoint_time != data["checkpoint_time"]: +                data["checkpoint_time"] = 0 +                data["checkpoint_completion_time"] = 0 +                data["checkpoint_completed"] = "No" + +            # If checkpoint is completed and not marked as completed +            # previously then update the checkpoint completed time +            if checkpoint_time > 0 and checkpoint_time <= value[0]: +                if data["checkpoint_completed"] == "No": +                    data["checkpoint_time"] = checkpoint_time +                    data["checkpoint_completion_time"] = int(time.time()) +                    data["checkpoint_completed"] = "Yes" +            return json.dumps(data) + +        self._update(merger) + +    def set_worker_status(self, status): +        self.set_field("worker_status", status) + +    def set_worker_crawl_status(self, status): +        self.set_field("crawl_status", status) + +    def set_slave_node(self, slave_node): +        def merger(data): +            data["slave_node"] = slave_node +            return json.dumps(data) + +        self._update(merger) + +    def inc_value(self, key, value): +        def merger(data): +            data[key] = data.get(key, 0) + value +            return json.dumps(data) + +        self._update(merger) + +    def dec_value(self, key, value): +        def merger(data): +            data[key] = data.get(key, 0) - value +            if data[key] < 0: +                data[key] = 0 +            return json.dumps(data) + +        self._update(merger) + +    def set_active(self): +        self.set_field("worker_status", "Active") + +    def set_passive(self): +        self.set_field("worker_status", "Passive") + +    def get_monitor_status(self): +        data = "" +        with open(self.monitor_status_file, "r") as f: +            data = f.read().strip() +        return data + +    def get_status(self, checkpoint_time=0): +        """ +        Monitor Status --->        Created    Started  Paused      Stopped +        ---------------------------------------------------------------------- +        slave_node                 N/A        VALUE    VALUE       N/A +        status                     Created    VALUE    Paused      Stopped +        last_synced                N/A        VALUE    VALUE       VALUE +        crawl_status               N/A        VALUE    N/A         N/A +        entry                      N/A        VALUE    N/A         N/A +        data                       N/A        VALUE    N/A         N/A +        meta                       N/A        VALUE    N/A         N/A +        failures                   N/A        VALUE    VALUE       VALUE +        checkpoint_completed       N/A        VALUE    VALUE       VALUE +        checkpoint_time            N/A        VALUE    VALUE       VALUE +        checkpoint_completed_time  N/A        VALUE    VALUE       VALUE +        """ +        data = self.default_values +        with open(self.filename) as f: +            try: +                data.update(json.load(f)) +            except ValueError: +                pass +        monitor_status = self.get_monitor_status() + +        if monitor_status in ["Created", "Paused", "Stopped"]: +            data["worker_status"] = monitor_status + +        # Checkpoint adjustments +        if checkpoint_time == 0: +            data["checkpoint_completed"] = DEFAULT_STATUS +            data["checkpoint_time"] = DEFAULT_STATUS +            data["checkpoint_completion_time"] = DEFAULT_STATUS +        else: +            if checkpoint_time != data["checkpoint_time"]: +                if checkpoint_time <= data["last_synced"]: +                    data["checkpoint_completed"] = "Yes" +                    data["checkpoint_time"] = checkpoint_time +                    data["checkpoint_completion_time"] = data["last_synced"] +                else: +                    data["checkpoint_completed"] = "No" +                    data["checkpoint_time"] = checkpoint_time +                    data["checkpoint_completion_time"] = DEFAULT_STATUS + +        if data["checkpoint_time"] not in [0, DEFAULT_STATUS]: +            chkpt_time = data["checkpoint_time"] +            data["checkpoint_time"] = human_time(chkpt_time) +            data["checkpoint_time_utc"] = human_time_utc(chkpt_time) + +        if data["checkpoint_completion_time"] not in [0, DEFAULT_STATUS]: +            chkpt_completion_time = data["checkpoint_completion_time"] +            data["checkpoint_completion_time"] = human_time( +                chkpt_completion_time) +            data["checkpoint_completion_time_utc"] = human_time_utc( +                chkpt_completion_time) + +        if data["last_synced"] == 0: +            data["last_synced"] = DEFAULT_STATUS +            data["last_synced_utc"] = DEFAULT_STATUS +        else: +            last_synced = data["last_synced"] +            data["last_synced"] = human_time(last_synced) +            data["last_synced_utc"] = human_time_utc(last_synced) + +        if data["worker_status"] != "Active": +            data["last_synced"] = DEFAULT_STATUS +            data["last_synced_utc"] = DEFAULT_STATUS +            data["crawl_status"] = DEFAULT_STATUS +            data["entry"] = DEFAULT_STATUS +            data["data"] = DEFAULT_STATUS +            data["meta"] = DEFAULT_STATUS +            data["failures"] = DEFAULT_STATUS +            data["checkpoint_completed"] = DEFAULT_STATUS +            data["checkpoint_time"] = DEFAULT_STATUS +            data["checkpoint_completed_time"] = DEFAULT_STATUS +            data["checkpoint_time_utc"] = DEFAULT_STATUS +            data["checkpoint_completion_time_utc"] = DEFAULT_STATUS + +        if data["worker_status"] not in ["Active", "Passive"]: +            data["slave_node"] = DEFAULT_STATUS + +        return data + +    def print_status(self, checkpoint_time=0): +        for key, value in self.get_status(checkpoint_time).items(): +            print ("%s: %s" % (key, value)) diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index e34def6f6ab..cd20a490397 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -15,17 +15,15 @@ import stat  import json  import logging  import fcntl -import socket  import string  import errno  import tarfile -from errno import ENOENT, ENODATA, EPIPE, EEXIST, EACCES, EAGAIN +from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN  from threading import Condition, Lock  from datetime import datetime  from gconf import gconf -from tempfile import NamedTemporaryFile  from syncdutils import Thread, GsyncdError, boolify, escape -from syncdutils import unescape, select, gauxpfx, md5hex, selfkill +from syncdutils import unescape, gauxpfx, md5hex, selfkill  from syncdutils import lstat, errno_wrap  from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable @@ -389,18 +387,6 @@ class GMasterCommon(object):                      raise          return default_data -    def update_crawl_data(self): -        if getattr(gconf, 'state_detail_file', None): -            try: -                same_dir = os.path.dirname(gconf.state_detail_file) -                with NamedTemporaryFile(dir=same_dir, delete=False) as tmp: -                    json.dump(self.total_crawl_stats, tmp) -                    tmp.flush() -                    os.fsync(tmp.fileno()) -                    os.rename(tmp.name, gconf.state_detail_file) -            except (IOError, OSError): -                raise -      def __init__(self, master, slave):          self.master = master          self.slave = slave @@ -426,14 +412,12 @@ class GMasterCommon(object):          self.total_turns = int(gconf.turns)          self.crawl_start = datetime.now()          self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0} -        self.total_crawl_stats = None          self.start = None          self.change_seen = None          # the actual volinfo we make use of          self.volinfo = None          self.terminate = False          self.sleep_interval = 1 -        self.checkpoint_thread = None          self.current_files_skipped_count = 0          self.skipped_gfid_list = []          self.unlinked_gfids = [] @@ -485,7 +469,6 @@ class GMasterCommon(object):          logging.debug("Got the lock")          return True -      def should_crawl(self):          if not gconf.use_meta_volume:              return gconf.glusterd_uuid in self.master.server.node_uuid() @@ -495,7 +478,6 @@ class GMasterCommon(object):              sys.exit(1)          return self.mgmt_lock() -      def register(self):          self.register() @@ -534,10 +516,8 @@ class GMasterCommon(object):              if self.volinfo['retval']:                  logging.warn("master cluster's info may not be valid %d" %                               self.volinfo['retval']) -            self.start_checkpoint_thread()          else:              raise GsyncdError("master volinfo unavailable") -        self.total_crawl_stats = self.get_initial_crawl_data()          self.lastreport['time'] = time.time()          logging.info('crawl interval: %d seconds' % self.sleep_interval) @@ -562,7 +542,7 @@ class GMasterCommon(object):                  t0 = t1              self.update_worker_remote_node()              if not crawl: -                self.update_worker_health("Passive") +                self.status.set_passive()                  # bring up _this_ brick to the cluster stime                  # which is min of cluster (but max of the replicas)                  brick_stime = self.xtime('.', self.slave) @@ -589,35 +569,14 @@ class GMasterCommon(object):                  time.sleep(5)                  continue -            self.update_worker_health("Active") + +            self.status.set_active()              self.crawl() +              if oneshot:                  return              time.sleep(self.sleep_interval) -    @classmethod -    def _checkpt_param(cls, chkpt, prm, xtimish=True): -        """use config backend to lookup a parameter belonging to -           checkpoint @chkpt""" -        cprm = gconf.configinterface.get_realtime('checkpoint_' + prm) -        if not cprm: -            return -        chkpt_mapped, val = cprm.split(':', 1) -        if unescape(chkpt_mapped) != chkpt: -            return -        if xtimish: -            val = cls.deserialize_xtime(val) -        return val - -    @classmethod -    def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True): -        """use config backend to store a parameter associated -           with checkpoint @chkpt""" -        if xtimish: -            val = cls.serialize_xtime(val) -        gconf.configinterface.set( -            'checkpoint_' + prm, "%s:%s" % (escape(chkpt), val)) -      @staticmethod      def humantime(*tpair):          """format xtime-like (sec, nsec) pair to human readable format""" @@ -646,116 +605,6 @@ class GMasterCommon(object):                                string.zfill(m, 2), string.zfill(s, 2))          return date -    def checkpt_service(self, chan, chkpt): -        """checkpoint service loop - -        monitor and verify checkpoint status for @chkpt, and listen -        for incoming requests for whom we serve a pretty-formatted -        status report""" -        while True: -            chkpt = gconf.configinterface.get_realtime("checkpoint") -            if not chkpt: -                gconf.configinterface.delete("checkpoint_completed") -                gconf.configinterface.delete("checkpoint_target") -                # dummy loop for the case when there is no checkpt set -                select([chan], [], []) -                conn, _ = chan.accept() -                conn.send('\0') -                conn.close() -                continue - -            checkpt_tgt = self._checkpt_param(chkpt, 'target') -            if not checkpt_tgt: -                checkpt_tgt = self.xtime('.') -                if isinstance(checkpt_tgt, int): -                    raise GsyncdError("master root directory is " -                                      "unaccessible (%s)", -                                      os.strerror(checkpt_tgt)) -                self._set_checkpt_param(chkpt, 'target', checkpt_tgt) -            logging.debug("checkpoint target %s has been determined " -                          "for checkpoint %s" % -                          (repr(checkpt_tgt), chkpt)) - -            # check if the label is 'now' -            chkpt_lbl = chkpt -            try: -                x1, x2 = chkpt.split(':') -                if x1 == 'now': -                    chkpt_lbl = "as of " + self.humantime(x2) -            except: -                pass -            completed = self._checkpt_param(chkpt, 'completed', xtimish=False) -            if completed: -                completed = tuple(int(x) for x in completed.split('.')) -            s, _, _ = select([chan], [], [], (not completed) and 5 or None) -            # either request made and we re-check to not -            # give back stale data, or we still hunting for completion -            if (self.native_xtime(checkpt_tgt) and ( -                    self.native_xtime(checkpt_tgt) < self.volmark)): -                # indexing has been reset since setting the checkpoint -                status = "is invalid" -            else: -                xtr = self.xtime('.', self.slave) -                if isinstance(xtr, int): -                    raise GsyncdError("slave root directory is " -                                      "unaccessible (%s)", -                                      os.strerror(xtr)) -                ncompleted = self.xtime_geq(xtr, checkpt_tgt) -                if completed and not ncompleted:  # stale data -                    logging.warn("completion time %s for checkpoint %s " -                                 "became stale" % -                                 (self.humantime(*completed), chkpt)) -                    completed = None -                    gconf.configinterface.delete('checkpoint_completed') -                if ncompleted and not completed:  # just reaching completion -                    completed = "%.6f" % time.time() -                    self._set_checkpt_param( -                        chkpt, 'completed', completed, xtimish=False) -                    completed = tuple(int(x) for x in completed.split('.')) -                    logging.info("checkpoint %s completed" % chkpt) -                status = completed and \ -                    "completed at " + self.humantime(completed[0]) or \ -                    "not reached yet" -            if s: -                conn = None -                try: -                    conn, _ = chan.accept() -                    try: -                        conn.send("checkpoint %s is %s\0" % -                                  (chkpt_lbl, status)) -                    except: -                        exc = sys.exc_info()[1] -                        if ((isinstance(exc, OSError) or isinstance( -                                exc, IOError)) and exc.errno == EPIPE): -                            logging.debug('checkpoint client disconnected') -                        else: -                            raise -                finally: -                    if conn: -                        conn.close() - -    def start_checkpoint_thread(self): -        """prepare and start checkpoint service""" -        if self.checkpoint_thread or not ( -            getattr(gconf, 'state_socket_unencoded', None) and getattr( -                gconf, 'socketdir', None) -        ): -            return -        chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) -        state_socket = os.path.join( -            gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket") -        try: -            os.unlink(state_socket) -        except: -            if sys.exc_info()[0] == OSError: -                pass -        chan.bind(state_socket) -        chan.listen(1) -        chkpt = gconf.configinterface.get_realtime("checkpoint") -        t = Thread(target=self.checkpt_service, args=(chan, chkpt)) -        t.start() -        self.checkpoint_thread = t -      def add_job(self, path, label, job, *a, **kw):          """insert @job function to job table at @path with @label"""          if self.jobtab.get(path) is None: @@ -929,11 +778,15 @@ class GMasterChangelogMixin(GMasterCommon):              files_pending['purge'] += 1          def log_failures(failures, entry_key, gfid_prefix, log_prefix): +            num_failures = 0              for failure in failures:                  st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))                  if not isinstance(st, int): +                    num_failures += 1                      logging.warn('%s FAILED: %s' % (log_prefix, repr(failure))) +            self.status.inc_value("failures", num_failures) +          for e in clist:              e = e.strip()              et = e[self.IDX_START:self.IDX_END]   # entry type @@ -1032,12 +885,18 @@ class GMasterChangelogMixin(GMasterCommon):              else:                  logging.warn('got invalid changelog type: %s' % (et))          logging.debug('entries: %s' % repr(entries)) -        if not retry: -            self.update_worker_cumilitive_status(files_pending) + +        # Increment counters for Status +        self.status.inc_value("entry", len(entries)) +        self.files_in_batch = len(datas) +        self.status.inc_value("data", self.files_in_batch) +          # sync namespace          if entries:              failures = self.slave.server.entry_ops(entries)              log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') +            self.status.dec_value("entry", len(entries)) +          # sync metadata          if meta_gfid:              meta_entries = [] @@ -1051,8 +910,11 @@ class GMasterChangelogMixin(GMasterCommon):                      continue                  meta_entries.append(edct('META', go=go[0], stat=st))              if meta_entries: +                self.status.inc_value("meta", len(entries))                  failures = self.slave.server.meta_ops(meta_entries)                  log_failures(failures, 'go', '', 'META') +                self.status.dec_value("meta", len(entries)) +          # sync data          if datas:              self.a_syncdata(datas) @@ -1104,9 +966,17 @@ class GMasterChangelogMixin(GMasterCommon):                  if done:                      xtl = (int(change.split('.')[-1]) - 1, 0)                      self.upd_stime(xtl) +                    chkpt_time = gconf.configinterface.get_realtime( +                        "checkpoint") +                    checkpoint_time = 0 +                    if chkpt_time is not None: +                        checkpoint_time = int(chkpt_time) + +                    self.status.set_last_synced(xtl, checkpoint_time)                      map(self.changelog_done_func, changes)                      self.archive_and_purge_changelogs(changes) -                self.update_worker_files_syncd() +                self.status.dec_value("data", self.files_in_batch) +                self.files_in_batch = 0                  break              # We do not know which changelog transfer failed, retry everything. @@ -1116,14 +986,22 @@ class GMasterChangelogMixin(GMasterCommon):                  logging.warn('changelogs %s could not be processed - '                               'moving on...' %                               ' '.join(map(os.path.basename, changes))) -                self.update_worker_total_files_skipped( -                    self.current_files_skipped_count) +                self.status.inc_value("failures", +                                      self.current_files_skipped_count)                  logging.warn('SKIPPED GFID = %s' %                               ','.join(self.skipped_gfid_list)) -                self.update_worker_files_syncd() + +                self.files_in_batch = 0                  if done:                      xtl = (int(change.split('.')[-1]) - 1, 0)                      self.upd_stime(xtl) +                    chkpt_time = gconf.configinterface.get_realtime( +                        "checkpoint") +                    checkpoint_time = 0 +                    if chkpt_time is not None: +                        checkpoint_time = int(chkpt_time) + +                    self.status.set_last_synced(xtl, checkpoint_time)                      map(self.changelog_done_func, changes)                      self.archive_and_purge_changelogs(changes)                  break @@ -1144,164 +1022,15 @@ class GMasterChangelogMixin(GMasterCommon):          if not stime == URXTIME:              self.sendmark(path, stime) -    def get_worker_status_file(self): -        file_name = gconf.local_path + '.status' -        file_name = file_name.replace("/", "_") -        worker_status_file = gconf.georep_session_working_dir + file_name -        return worker_status_file - -    def update_worker_status(self, key, value): -        default_data = {"remote_node": "N/A", -                        "worker status": "Not Started", -                        "crawl status": "N/A", -                        "files_syncd": 0, -                        "files_remaining": 0, -                        "bytes_remaining": 0, -                        "purges_remaining": 0, -                        "total_files_skipped": 0} -        worker_status_file = self.get_worker_status_file() -        try: -            with open(worker_status_file, 'r+') as f: -                loaded_data = json.load(f) -                loaded_data[key] = value -                os.ftruncate(f.fileno(), 0) -                os.lseek(f.fileno(), 0, os.SEEK_SET) -                json.dump(loaded_data, f) -                f.flush() -                os.fsync(f.fileno()) -        except (IOError, OSError, ValueError): -            logging.info('Creating new %s' % worker_status_file) -            try: -                with open(worker_status_file, 'wb') as f: -                    default_data[key] = value -                    json.dump(default_data, f) -                    f.flush() -                    os.fsync(f.fileno()) -            except: -                raise - -    def update_worker_cumilitive_status(self, files_pending): -        default_data = {"remote_node": "N/A", -                        "worker status": "Not Started", -                        "crawl status": "N/A", -                        "files_syncd": 0, -                        "files_remaining": 0, -                        "bytes_remaining": 0, -                        "purges_remaining": 0, -                        "total_files_skipped": 0} -        worker_status_file = self.get_worker_status_file() -        try: -            with open(worker_status_file, 'r+') as f: -                loaded_data = json.load(f) -                loaded_data['files_remaining'] = files_pending['count'] -                loaded_data['bytes_remaining'] = files_pending['bytes'] -                loaded_data['purges_remaining'] = files_pending['purge'] -                os.ftruncate(f.fileno(), 0) -                os.lseek(f.fileno(), 0, os.SEEK_SET) -                json.dump(loaded_data, f) -                f.flush() -                os.fsync(f.fileno()) -        except (IOError, OSError, ValueError): -            logging.info('Creating new %s' % worker_status_file) -            try: -                with open(worker_status_file, 'wb') as f: -                    default_data['files_remaining'] = files_pending['count'] -                    default_data['bytes_remaining'] = files_pending['bytes'] -                    default_data['purges_remaining'] = files_pending['purge'] -                    json.dump(default_data, f) -                    f.flush() -                    os.fsync(f.fileno()) -            except: -                raise -      def update_worker_remote_node(self):          node = sys.argv[-1] -        node = node.split("@")[-1] +        node_data = node.split("@") +        node = node_data[-1]          remote_node_ip = node.split(":")[0] -        remote_node_vol = node.split(":")[3] -        remote_node = remote_node_ip + '::' + remote_node_vol -        self.update_worker_status('remote_node', remote_node) - -    def update_worker_health(self, state): -        self.update_worker_status('worker status', state) - -    def update_worker_crawl_status(self, state): -        self.update_worker_status('crawl status', state) - -    def update_worker_files_syncd(self): -        default_data = {"remote_node": "N/A", -                        "worker status": "Not Started", -                        "crawl status": "N/A", -                        "files_syncd": 0, -                        "files_remaining": 0, -                        "bytes_remaining": 0, -                        "purges_remaining": 0, -                        "total_files_skipped": 0} -        worker_status_file = self.get_worker_status_file() -        try: -            with open(worker_status_file, 'r+') as f: -                loaded_data = json.load(f) -                loaded_data['files_syncd'] += loaded_data['files_remaining'] -                loaded_data['files_remaining'] = 0 -                loaded_data['bytes_remaining'] = 0 -                loaded_data['purges_remaining'] = 0 -                os.ftruncate(f.fileno(), 0) -                os.lseek(f.fileno(), 0, os.SEEK_SET) -                json.dump(loaded_data, f) -                f.flush() -                os.fsync(f.fileno()) -        except (IOError, OSError, ValueError): -            logging.info('Creating new %s' % worker_status_file) -            try: -                with open(worker_status_file, 'wb') as f: -                    json.dump(default_data, f) -                    f.flush() -                    os.fsync(f.fileno()) -            except: -                raise - -    def update_worker_files_remaining(self, state): -        self.update_worker_status('files_remaining', state) - -    def update_worker_bytes_remaining(self, state): -        self.update_worker_status('bytes_remaining', state) - -    def update_worker_purges_remaining(self, state): -        self.update_worker_status('purges_remaining', state) - -    def update_worker_total_files_skipped(self, value): -        default_data = {"remote_node": "N/A", -                        "worker status": "Not Started", -                        "crawl status": "N/A", -                        "files_syncd": 0, -                        "files_remaining": 0, -                        "bytes_remaining": 0, -                        "purges_remaining": 0, -                        "total_files_skipped": 0} -        worker_status_file = self.get_worker_status_file() -        try: -            with open(worker_status_file, 'r+') as f: -                loaded_data = json.load(f) -                loaded_data['total_files_skipped'] = value -                loaded_data['files_remaining'] -= value -                os.ftruncate(f.fileno(), 0) -                os.lseek(f.fileno(), 0, os.SEEK_SET) -                json.dump(loaded_data, f) -                f.flush() -                os.fsync(f.fileno()) -        except (IOError, OSError, ValueError): -            logging.info('Creating new %s' % worker_status_file) -            try: -                with open(worker_status_file, 'wb') as f: -                    default_data['total_files_skipped'] = value -                    json.dump(default_data, f) -                    f.flush() -                    os.fsync(f.fileno()) -            except: -                raise +        self.status.set_slave_node(remote_node_ip)      def crawl(self): -        self.update_worker_crawl_status("Changelog Crawl") +        self.status.set_worker_crawl_status("Changelog Crawl")          changes = []          # get stime (from the brick) and purge changelogs          # that are _historical_ to that time. @@ -1327,16 +1056,17 @@ class GMasterChangelogMixin(GMasterCommon):                  logging.debug('processing changes %s' % repr(changes))                  self.process(changes) -    def register(self, register_time, changelog_agent): +    def register(self, register_time, changelog_agent, status):          self.changelog_agent = changelog_agent          self.sleep_interval = int(gconf.change_interval)          self.changelog_done_func = self.changelog_agent.done          self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),                                                       ".processed") +        self.status = status  class GMasterChangeloghistoryMixin(GMasterChangelogMixin): -    def register(self, register_time, changelog_agent): +    def register(self, register_time, changelog_agent, status):          self.changelog_agent = changelog_agent          self.changelog_register_time = register_time          self.history_crawl_start_time = register_time @@ -1344,10 +1074,11 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):          self.history_turns = 0          self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),                                                       ".history/.processed") +        self.status = status      def crawl(self):          self.history_turns += 1 -        self.update_worker_crawl_status("History Crawl") +        self.status.set_worker_crawl_status("History Crawl")          purge_time = self.get_purge_time()          logging.info('starting history crawl... turns: %s, stime: %s' @@ -1429,7 +1160,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):      XSYNC_MAX_ENTRIES = 1 << 13 -    def register(self, register_time=None, changelog_agent=None): +    def register(self, register_time=None, changelog_agent=None, status=None): +        self.status = status          self.counter = 0          self.comlist = []          self.stimes = [] @@ -1460,7 +1192,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):          t.start()          logging.info('starting hybrid crawl..., stime: %s'                       % repr(self.get_purge_time())) -        self.update_worker_crawl_status("Hybrid Crawl") +        self.status.set_worker_crawl_status("Hybrid Crawl")          while True:              try:                  item = self.comlist.pop(0) diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 029726c7a5a..ba5c8e32514 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -22,10 +22,12 @@ from errno import EEXIST  import re  import random  from gconf import gconf -from syncdutils import update_file, select, waitpid +from syncdutils import select, waitpid  from syncdutils import set_term_handler, is_host_local, GsyncdError  from syncdutils import escape, Thread, finalize, memoize +from gsyncdstatus import GeorepStatus, set_monitor_status +  ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError @@ -125,46 +127,22 @@ class Volinfo(object):      def disperse_count(self):          return int(self.get('disperseCount')[0].text) +  class Monitor(object):      """class which spawns and manages gsyncd workers"""      ST_INIT = 'Initializing...' -    ST_STABLE = 'Stable' -    ST_FAULTY = 'faulty' +    ST_STARTED = 'Started' +    ST_STABLE = 'Active' +    ST_FAULTY = 'Faulty'      ST_INCON = 'inconsistent'      _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON]      def __init__(self):          self.lock = Lock()          self.state = {} - -    def set_state(self, state, w=None): -        """set the state that can be used by external agents -           like glusterd for status reporting""" -        computestate = lambda: self.state and self._ST_ORD[ -            max(self._ST_ORD.index(s) for s in self.state.values())] -        if w: -            self.lock.acquire() -            old_state = computestate() -            self.state[w] = state -            state = computestate() -            self.lock.release() -            if state != old_state: -                self.set_state(state) -        else: -            if getattr(gconf, 'state_file', None): -                # If previous state is paused, suffix the -                # new state with '(Paused)' -                try: -                    with open(gconf.state_file, "r") as f: -                        content = f.read() -                        if "paused" in content.lower(): -                            state = state + '(Paused)' -                except IOError: -                    pass -                logging.info('new state: %s' % state) -                update_file(gconf.state_file, lambda f: f.write(state + '\n')) +        self.status = {}      @staticmethod      def terminate(): @@ -174,8 +152,7 @@ class Monitor(object):          # give a chance to graceful exit          os.kill(-os.getpid(), signal.SIGTERM) - -    def monitor(self, w, argv, cpids, agents, slave_vol, slave_host): +    def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master):          """the monitor loop          Basic logic is a blantantly simple blunt heuristics: @@ -194,8 +171,11 @@ class Monitor(object):          blown worker blows up on EPIPE if the net goes down,          due to the keep-alive thread)          """ +        if not self.status.get(w[0], None): +            self.status[w[0]] = GeorepStatus(gconf.state_file, w[0]) -        self.set_state(self.ST_INIT, w) +        set_monitor_status(gconf.state_file, self.ST_STARTED) +        self.status[w[0]].set_worker_status(self.ST_INIT)          ret = 0 @@ -310,7 +290,7 @@ class Monitor(object):                  nwait(apid) #wait for agent                  ret = nwait(cpid)              if ret is None: -                self.set_state(self.ST_STABLE, w) +                self.status[w[0]].set_worker_status(self.ST_STABLE)                  #If worker dies, agent terminates on EOF.                  #So lets wait for agent first.                  nwait(apid) @@ -320,12 +300,12 @@ class Monitor(object):              else:                  ret = exit_status(ret)                  if ret in (0, 1): -                    self.set_state(self.ST_FAULTY, w) +                    self.status[w[0]].set_worker_status(self.ST_FAULTY)              time.sleep(10) -        self.set_state(self.ST_INCON, w) +        self.status[w[0]].set_worker_status(self.ST_INCON)          return ret -    def multiplex(self, wspx, suuid, slave_vol, slave_host): +    def multiplex(self, wspx, suuid, slave_vol, slave_host, master):          argv = sys.argv[:]          for o in ('-N', '--no-daemon', '--monitor'):              while o in argv: @@ -339,7 +319,7 @@ class Monitor(object):          for wx in wspx:              def wmon(w):                  cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol, -                                       slave_host) +                                       slave_host, master)                  time.sleep(1)                  self.lock.acquire()                  for cpid in cpids: @@ -401,7 +381,7 @@ def distribute(*resources):                    for idx, brick in enumerate(mvol.bricks)                    if is_host_local(brick['host'])]      logging.info('worker specs: ' + repr(workerspex)) -    return workerspex, suuid, slave_vol, slave_host +    return workerspex, suuid, slave_vol, slave_host, master  def monitor(*resources): diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index d3d1ee36e01..6bf1ad03e70 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -38,6 +38,7 @@ from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat  from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable  from syncdutils import ChangelogException  from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION +from gsyncdstatus import GeorepStatus  UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') @@ -611,6 +612,9 @@ class Server(object):          def collect_failure(e, cmd_ret):              # We do this for failing fops on Slave              # Master should be logging this +            if cmd_ret is None: +                return +              if cmd_ret == EEXIST:                  disk_gfid = cls.gfid_mnt(e['entry'])                  if isinstance(disk_gfid, basestring): @@ -1344,6 +1348,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):              os.close(int(ra))              os.close(int(wa))              changelog_agent = RepceClient(int(inf), int(ouf)) +            status = GeorepStatus(gconf.state_file, gconf.local_path) +            status.reset_on_worker_start()              rv = changelog_agent.version()              if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION:                  raise GsyncdError( @@ -1367,13 +1373,13 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                                               g2.CHANGELOG_CONN_RETRIES)                  register_time = int(time.time()) -                g2.register(register_time, changelog_agent) -                g3.register(register_time, changelog_agent) +                g2.register(register_time, changelog_agent, status) +                g3.register(register_time, changelog_agent, status)              except ChangelogException as e:                  logging.error("Changelog register failed, %s" % e)                  sys.exit(1) -            g1.register() +            g1.register(status=status)              logging.info("Register time: %s" % register_time)              # oneshot: Try to use changelog history api, if not              # available switch to FS crawl diff --git a/geo-replication/tests/unit/test_gsyncdstatus.py b/geo-replication/tests/unit/test_gsyncdstatus.py new file mode 100644 index 00000000000..a65d659e356 --- /dev/null +++ b/geo-replication/tests/unit/test_gsyncdstatus.py @@ -0,0 +1,193 @@ +#!/usr/bin/env python +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import unittest +import os +import urllib + +from syncdaemon.gstatus import GeorepStatus, set_monitor_status +from syncdaemon.gstatus import get_default_values +from syncdaemon.gstatus import MONITOR_STATUS, DEFAULT_STATUS +from syncdaemon.gstatus import STATUS_VALUES, CRAWL_STATUS_VALUES +from syncdaemon.gstatus import human_time, human_time_utc + + +class GeorepStatusTestCase(unittest.TestCase): +    @classmethod +    def setUpClass(cls): +        cls.work_dir = os.path.dirname(os.path.abspath(__file__)) +        cls.monitor_status_file = os.path.join(cls.work_dir, "monitor.status") +        cls.brick = "/exports/bricks/b1" +        cls.status = GeorepStatus(cls.monitor_status_file, cls.brick) +        cls.statusfile = os.path.join(cls.work_dir, +                                      "brick_%s.status" +                                      % urllib.quote_plus(cls.brick)) + +    @classmethod +    def tearDownClass(cls): +        os.remove(cls.statusfile) +        os.remove(cls.monitor_status_file) + +    def _filter_dict(self, inp, keys): +        op = {} +        for k in keys: +            op[k] = inp.get(k, None) +        return op + +    def test_monitor_status_file_created(self): +        self.assertTrue(os.path.exists(self.monitor_status_file)) + +    def test_status_file_created(self): +        self.assertTrue(os.path.exists(self.statusfile)) + +    def test_set_monitor_status(self): +        for st in MONITOR_STATUS: +            set_monitor_status(self.monitor_status_file, st) +            self.assertTrue(self.status.get_monitor_status(), st) + +    def test_default_values_test(self): +        self.assertTrue(get_default_values(), { +            "slave_node": DEFAULT_STATUS, +            "worker_status": DEFAULT_STATUS, +            "last_synced": 0, +            "last_synced_utc": 0, +            "crawl_status": DEFAULT_STATUS, +            "entry": 0, +            "data": 0, +            "metadata": 0, +            "failures": 0, +            "checkpoint_completed": False, +            "checkpoint_time": 0, +            "checkpoint_time_utc": 0, +            "checkpoint_completion_time": 0, +            "checkpoint_completion_time_utc": 0 +        }) + +    def test_human_time(self): +        self.assertTrue(human_time(1429174398), "2015-04-16 14:23:18") + +    def test_human_time_utc(self): +        self.assertTrue(human_time_utc(1429174398), "2015-04-16 08:53:18") + +    def test_invalid_human_time(self): +        self.assertTrue(human_time(142917439), DEFAULT_STATUS) +        self.assertTrue(human_time("abcdef"), DEFAULT_STATUS) + +    def test_invalid_human_time_utc(self): +        self.assertTrue(human_time_utc(142917439), DEFAULT_STATUS) +        self.assertTrue(human_time_utc("abcdef"), DEFAULT_STATUS) + +    def test_worker_status(self): +        set_monitor_status(self.monitor_status_file, "Started") +        for st in STATUS_VALUES: +            self.status.set_worker_status(st) +            self.assertTrue(self.status.get_status()["worker_status"], st) + +    def test_crawl_status(self): +        set_monitor_status(self.monitor_status_file, "Started") +        self.status.set_active() +        for st in CRAWL_STATUS_VALUES: +            self.status.set_worker_crawl_status(st) +            self.assertTrue(self.status.get_status()["crawl_status"], st) + +    def test_slave_node(self): +        set_monitor_status(self.monitor_status_file, "Started") +        self.status.set_active() +        self.status.set_slave_node("fvm2") +        self.assertTrue(self.status.get_status()["slave_node"], "fvm2") + +        self.status.set_worker_status("Passive") +        self.status.set_slave_node("fvm2") +        self.assertTrue(self.status.get_status()["slave_node"], "fvm2") + +    def test_active_worker_status(self): +        set_monitor_status(self.monitor_status_file, "Started") +        self.status.set_active() +        self.assertTrue(self.status.get_status()["worker_status"], "Active") + +    def test_passive_worker_status(self): +        set_monitor_status(self.monitor_status_file, "Started") +        self.status.set_passive() +        self.assertTrue(self.status.get_status()["worker_status"], "Passive") + +    def test_set_field(self): +        set_monitor_status(self.monitor_status_file, "Started") +        self.status.set_active() +        self.status.set_field("entry", 42) +        self.assertTrue(self.status.get_status()["entry"], 42) + +    def test_inc_value(self): +        set_monitor_status(self.monitor_status_file, "Started") +        self.status.set_active() +        self.status.set_field("entry", 0) +        self.status.inc_value("entry", 2) +        self.assertTrue(self.status.get_status()["entry"], 2) + +        self.status.set_field("data", 0) +        self.status.inc_value("data", 2) +        self.assertTrue(self.status.get_status()["data"], 2) + +        self.status.set_field("meta", 0) +        self.status.inc_value("meta", 2) +        self.assertTrue(self.status.get_status()["meta"], 2) + +        self.status.set_field("failures", 0) +        self.status.inc_value("failures", 2) +        self.assertTrue(self.status.get_status()["failures"], 2) + +    def test_dec_value(self): +        set_monitor_status(self.monitor_status_file, "Started") +        self.status.set_active() + +        self.status.set_field("entry", 4) +        self.status.inc_value("entry", 2) +        self.assertTrue(self.status.get_status()["entry"], 2) + +        self.status.set_field("data", 4) +        self.status.inc_value("data", 2) +        self.assertTrue(self.status.get_status()["data"], 2) + +        self.status.set_field("meta", 4) +        self.status.inc_value("meta", 2) +        self.assertTrue(self.status.get_status()["meta"], 2) + +        self.status.set_field("failures", 4) +        self.status.inc_value("failures", 2) +        self.assertTrue(self.status.get_status()["failures"], 2) + +    def test_worker_status_when_monitor_status_created(self): +        set_monitor_status(self.monitor_status_file, "Created") +        for st in STATUS_VALUES: +            self.status.set_worker_status(st) +            self.assertTrue(self.status.get_status()["worker_status"], +                            "Created") + +    def test_worker_status_when_monitor_status_paused(self): +        set_monitor_status(self.monitor_status_file, "Paused") +        for st in STATUS_VALUES: +            self.status.set_worker_status(st) +            self.assertTrue(self.status.get_status()["worker_status"], +                            "Paused") + +    def test_worker_status_when_monitor_status_stopped(self): +        set_monitor_status(self.monitor_status_file, "Stopped") +        for st in STATUS_VALUES: +            self.status.set_worker_status(st) +            self.assertTrue(self.status.get_status()["worker_status"], +                            "Stopped") + +    def test_status_when_worker_status_active(self): +        set_monitor_status(self.monitor_status_file, "Started") +        self.status.set_active() + + +if __name__ == "__main__": +    unittest.main() diff --git a/rpc/rpc-lib/src/protocol-common.h b/rpc/rpc-lib/src/protocol-common.h index 60697b8fa66..e2c8443a7fb 100644 --- a/rpc/rpc-lib/src/protocol-common.h +++ b/rpc/rpc-lib/src/protocol-common.h @@ -250,15 +250,21 @@ struct gf_gsync_detailed_status_ {          char master[NAME_MAX];          char brick[NAME_MAX];          char slave_user[NAME_MAX]; +        char slave[NAME_MAX];          char slave_node[NAME_MAX];          char worker_status[NAME_MAX]; -        char checkpoint_status[NAME_MAX];          char crawl_status[NAME_MAX]; -        char files_syncd[NAME_MAX]; -        char files_remaining[NAME_MAX]; -        char bytes_remaining[NAME_MAX]; -        char purges_remaining[NAME_MAX]; -        char total_files_skipped[NAME_MAX]; +        char last_synced[NAME_MAX]; +        char last_synced_utc[NAME_MAX]; +        char entry[NAME_MAX]; +        char data[NAME_MAX]; +        char meta[NAME_MAX]; +        char failures[NAME_MAX]; +        char checkpoint_time[NAME_MAX]; +        char checkpoint_time_utc[NAME_MAX]; +        char checkpoint_completed[NAME_MAX]; +        char checkpoint_completion_time[NAME_MAX]; +        char checkpoint_completion_time_utc[NAME_MAX];          char brick_host_uuid[NAME_MAX];          char slavekey[NAME_MAX];          char session_slave[NAME_MAX]; diff --git a/xlators/mgmt/glusterd/src/glusterd-geo-rep.c b/xlators/mgmt/glusterd/src/glusterd-geo-rep.c index 708d6d3816d..24768e38231 100644 --- a/xlators/mgmt/glusterd/src/glusterd-geo-rep.c +++ b/xlators/mgmt/glusterd/src/glusterd-geo-rep.c @@ -667,6 +667,128 @@ glusterd_gsync_get_config (char *master, char *slave, char *conf_path, dict_t *d  }  static int +_fcbk_statustostruct (char *resbuf, size_t blen, FILE *fp, +                      void *data) +{ +        char                  *ptr = NULL; +        char                    *v = NULL; +        char                    *k = NULL; +        gf_gsync_status_t *sts_val = NULL; + +        sts_val = (gf_gsync_status_t *)data; + +        for (;;) { +                errno = 0; +                ptr = fgets (resbuf, blen, fp); +                if (!ptr) +                        break; +                v = resbuf + strlen(resbuf) - 1; +                while (isspace (*v)) +                        /* strip trailing space */ +                        *v-- = '\0'; +                if (v == resbuf) +                        /* skip empty line */ +                        continue; +                v = strchr (resbuf, ':'); +                if (!v) +                        return -1; +                *v++ = '\0'; +                while (isspace (*v)) +                        v++; +                v = gf_strdup (v); +                if (!v) +                        return -1; + +                k = gf_strdup (resbuf); +                if (!k) +                        return -1; + +                if (strcmp (k, "worker_status") == 0) { +                        memcpy (sts_val->worker_status, v, +                                strlen(v)); +                        sts_val->worker_status[strlen(v)] = '\0'; +                } else if (strcmp (k, "slave_node") == 0) { +                        memcpy (sts_val->slave_node, v, +                                strlen(v)); +                        sts_val->slave_node[strlen(v)] = '\0'; +                } else if (strcmp (k, "crawl_status") == 0) { +                        memcpy (sts_val->crawl_status, v, +                                strlen(v)); +                        sts_val->crawl_status[strlen(v)] = '\0'; +                } else if (strcmp (k, "last_synced") == 0) { +                        memcpy (sts_val->last_synced, v, +                                strlen(v)); +                        sts_val->last_synced[strlen(v)] = '\0'; +                } else if (strcmp (k, "last_synced_utc") == 0) { +                        memcpy (sts_val->last_synced_utc, v, +                                strlen(v)); +                        sts_val->last_synced_utc[strlen(v)] = '\0'; +                } else if (strcmp (k, "entry") == 0) { +                        memcpy (sts_val->entry, v, +                                strlen(v)); +                        sts_val->entry[strlen(v)] = '\0'; +                } else if (strcmp (k, "data") == 0) { +                        memcpy (sts_val->data, v, +                                strlen(v)); +                        sts_val->data[strlen(v)] = '\0'; +                } else if (strcmp (k, "meta") == 0) { +                        memcpy (sts_val->meta, v, +                                strlen(v)); +                        sts_val->meta[strlen(v)] = '\0'; +                } else if (strcmp (k, "failures") == 0) { +                        memcpy (sts_val->failures, v, +                                strlen(v)); +                        sts_val->failures[strlen(v)] = '\0'; +                } else if (strcmp (k, "checkpoint_time") == 0) { +                        memcpy (sts_val->checkpoint_time, v, +                                strlen(v)); +                        sts_val->checkpoint_time[strlen(v)] = '\0'; +                } else if (strcmp (k, "checkpoint_time_utc") == 0) { +                        memcpy (sts_val->checkpoint_time_utc, v, +                                strlen(v)); +                        sts_val->checkpoint_time_utc[strlen(v)] = '\0'; +                } else if (strcmp (k, "checkpoint_completed") == 0) { +                        memcpy (sts_val->checkpoint_completed, v, +                                strlen(v)); +                        sts_val->checkpoint_completed[strlen(v)] = '\0'; +                } else if (strcmp (k, "checkpoint_completion_time") == 0) { +                        memcpy (sts_val->checkpoint_completion_time, v, +                                strlen(v)); +                        sts_val->checkpoint_completion_time[strlen(v)] = '\0'; +                } else if (strcmp (k, "checkpoint_completion_time_utc") == 0) { +                        memcpy (sts_val->checkpoint_completion_time_utc, v, +                                strlen(v)); +                        sts_val->checkpoint_completion_time_utc[strlen(v)] = +                                '\0'; +                } +        } + +        return errno ? -1 : 0; +} + + +static int +glusterd_gsync_get_status (char *master, char *slave, char *conf_path, +                           char *brick_path, gf_gsync_status_t *sts_val) +{ +        /* key + value, where value must be able to accommodate a path */ +        char resbuf[256 + PATH_MAX] = {0,}; +        runner_t             runner = {0,}; + +        runinit (&runner); +        runner_add_args  (&runner, GSYNCD_PREFIX"/gsyncd", "-c", NULL); +        runner_argprintf (&runner, "%s", conf_path); +        runner_argprintf (&runner, "--iprefix=%s", DATADIR); +        runner_argprintf (&runner, ":%s", master); +        runner_add_args  (&runner, slave, "--status-get", NULL); +        runner_add_args  (&runner, "--path", brick_path, NULL); + +        return glusterd_query_extutil_generic (resbuf, sizeof (resbuf), +                                               &runner, sts_val, +                                               _fcbk_statustostruct); +} + +static int  glusterd_gsync_get_param_file (char *prmfile, const char *param, char *master,                                 char *slave, char *conf_path)  { @@ -2804,7 +2926,6 @@ gd_pause_or_resume_gsync (dict_t *dict, char *master, char *slave,          gf_boolean_t    is_template_in_use       = _gf_false;          char            monitor_status[NAME_MAX] = {0,};          char            *statefile               = NULL; -        char            *token                   = NULL;          xlator_t        *this                    = NULL;          this = THIS; @@ -2869,10 +2990,10 @@ gd_pause_or_resume_gsync (dict_t *dict, char *master, char *slave,                            do not update status again*/                          if (strstr (monitor_status, "Paused"))                                  goto out; -                        (void) strcat (monitor_status, "(Paused)"); +                          ret = glusterd_create_status_file ( master, slave,                                                       slave_host, slave_vol, -                                                     monitor_status); +                                                     "Paused");                          if (ret) {                                  gf_log (this->name, GF_LOG_ERROR,                                          "Unable  to update state_file." @@ -2893,10 +3014,10 @@ gd_pause_or_resume_gsync (dict_t *dict, char *master, char *slave,                                  goto out;                          }                  } else { -                        token = strtok (monitor_status, "(");                          ret = glusterd_create_status_file (master, slave,                                                             slave_host, -                                                           slave_vol, token); +                                                           slave_vol, +                                                           "Started");                          if (ret) {                                  gf_log (this->name, GF_LOG_ERROR,                                          "Resume Failed: Unable to update " @@ -3321,159 +3442,6 @@ out:          return ret;  } -static int -glusterd_parse_gsync_status (char *buf, gf_gsync_status_t *sts_val) -{ -        int              ret      = -1; -        int              i      = -1; -        int              num_of_fields = 8; -        char            *token    = NULL; -        char           **tokens   = NULL; -        char           **ptr   = NULL; -        char            *save_ptr = NULL; -        char             na_buf[] = "N/A"; - -        if (!buf) { -                gf_log ("", GF_LOG_ERROR, "Empty buf"); -                goto out; -        } - -        tokens = calloc (num_of_fields, sizeof (char *)); -        if (!tokens) { -                gf_log ("", GF_LOG_ERROR, "Out of memory"); -                goto out; -        } - -        ptr = tokens; - -        for (token = strtok_r (buf, ",", &save_ptr); token; -             token = strtok_r (NULL, ",", &save_ptr)) { -                *ptr = gf_strdup(token); -                if (!*ptr) { -                        gf_log ("", GF_LOG_ERROR, "Out of memory"); -                        goto out; -                } -                ptr++; -        } - -        for (i = 0; i < num_of_fields; i++) { -                token = strtok_r (tokens[i], ":", &save_ptr); -                token = strtok_r (NULL, "\0", &save_ptr); -                token++; - -                /* token NULL check */ -                if (!token && (i != 0) && -                    (i != 5) && (i != 7)) -                    token = na_buf; - -                if (i == 0) { -                        if (!token) -                            token = na_buf; -                        else { -                            token++; -                            if (!token) -                                token = na_buf; -                            else -                                token[strlen(token) - 1] = '\0'; -                        } -                        memcpy (sts_val->slave_node, token, strlen(token)); -                } -                if (i == 1) -                        memcpy (sts_val->files_syncd, token, strlen(token)); -                if (i == 2) -                        memcpy (sts_val->purges_remaining, token, strlen(token)); -                if (i == 3) -                        memcpy (sts_val->total_files_skipped, token, strlen(token)); -                if (i == 4) -                        memcpy (sts_val->files_remaining, token, strlen(token)); -                if (i == 5) { -                        if (!token) -                            token = na_buf; -                        else { -                            token++; -                            if (!token) -                                token = na_buf; -                            else -                                token[strlen(token) - 1] = '\0'; -                        } -                        memcpy (sts_val->worker_status, token, strlen(token)); -                } -                if (i == 6) -                        memcpy (sts_val->bytes_remaining, token, strlen(token)); -                if (i == 7) { -                        if (!token) -                            token = na_buf; -                        else { -                            token++; -                            if (!token) -                                token = na_buf; -                            else -                                token[strlen(token) - 2] = '\0'; -                        } -                        memcpy (sts_val->crawl_status, token, strlen(token)); -                } -        } - -        ret = 0; -out: -        for (i = 0; i< num_of_fields; i++) -               if (tokens[i]) -                       GF_FREE(tokens[i]); - -        gf_log ("", GF_LOG_DEBUG, "Returning %d", ret); -        return ret; -} - -static int -glusterd_gsync_fetch_status_extra (char *path, gf_gsync_status_t *sts_val) -{ -        char sockpath[PATH_MAX] = {0,}; -        struct sockaddr_un   sa = {0,}; -        int                   s = -1; -        struct pollfd       pfd = {0,}; -        int                 ret = 0; - -        glusterd_set_socket_filepath (path, sockpath, sizeof (sockpath)); - -        strncpy(sa.sun_path, sockpath, sizeof(sa.sun_path)); -        if (sa.sun_path[sizeof (sa.sun_path) - 1]) -                return -1; -        sa.sun_family = AF_UNIX; - -        s = socket(AF_UNIX, SOCK_STREAM, 0); -        if (s == -1) -                return -1; -        ret = fcntl (s, F_GETFL); -        if (ret != -1) -                ret = fcntl (s, F_SETFL, ret | O_NONBLOCK); -        if (ret == -1) -                goto out; - -        ret = connect (s, (struct sockaddr *)&sa, sizeof (sa)); -        if (ret == -1) -                goto out; -        pfd.fd = s; -        pfd.events = POLLIN; -        /* we don't want to hang on gsyncd */ -        if (poll (&pfd, 1, 5000) < 1 || -            !(pfd.revents & POLLIN)) { -                ret = -1; -                goto out; -        } -        ret = read(s, sts_val->checkpoint_status, -                   sizeof(sts_val->checkpoint_status)); -        /* we expect a terminating 0 byte */ -        if (ret == 0 || (ret > 0 && sts_val->checkpoint_status[ret - 1])) -                ret = -1; -        if (ret > 0) { -                ret = 0; -        } - -out: -        close (s); -        return ret; -} -  int  glusterd_fetch_values_from_config (char *master, char *slave,                                     char *confpath, dict_t *confd, @@ -3567,6 +3535,7 @@ glusterd_read_status_file (glusterd_volinfo_t *volinfo, char *slave,          gf_boolean_t            is_template_in_use         = _gf_false;          glusterd_conf_t        *priv                       = NULL;          struct stat             stbuf                      = {0,}; +        dict_t                 *statusd                    = NULL;          GF_ASSERT (THIS);          GF_ASSERT (THIS->private); @@ -3657,114 +3626,7 @@ fetch_data:                          goto out;                  } -                /* Creating the brick state file's path */ -                memset(brick_state_file, '\0', PATH_MAX); -                memcpy (brick_path, brickinfo->path, PATH_MAX - 1); -                for (i = 0; i < strlen(brick_path) - 1; i++) -                        if (brick_path[i] == '/') -                                brick_path[i] = '_'; -                ret = snprintf(brick_state_file, PATH_MAX - 1, "%s%s.status", -                               georep_session_wrkng_dir, brick_path); -                brick_state_file[ret] = '\0'; - -                gf_log ("", GF_LOG_DEBUG, "brick_state_file = %s", brick_state_file); - -                memset (tmp, '\0', sizeof(tmp)); - -                ret = glusterd_gsync_read_frm_status (brick_state_file, -                                                      tmp, sizeof (tmp)); -                if (ret <= 0) { -                        gf_log ("", GF_LOG_ERROR, "Unable to read the status" -                                "file for %s brick for  %s(master), %s(slave) " -                                "session", brickinfo->path, master, slave); -                        memcpy (sts_val->slave_node, slave, strlen(slave)); -                        sts_val->slave_node[strlen(slave)] = '\0'; -                        ret = snprintf (sts_val->worker_status, sizeof(sts_val->worker_status), "N/A"); -                        sts_val->worker_status[ret] = '\0'; -                        ret = snprintf (sts_val->checkpoint_status, sizeof(sts_val->checkpoint_status), "N/A"); -                        sts_val->checkpoint_status[ret] = '\0'; -                        ret = snprintf (sts_val->crawl_status, sizeof(sts_val->crawl_status), "N/A"); -                        sts_val->crawl_status[ret] = '\0'; -                        ret = snprintf (sts_val->files_syncd, sizeof(sts_val->files_syncd), "N/A"); -                        sts_val->files_syncd[ret] = '\0'; -                        ret = snprintf (sts_val->purges_remaining, sizeof(sts_val->purges_remaining), "N/A"); -                        sts_val->purges_remaining[ret] = '\0'; -                        ret = snprintf (sts_val->total_files_skipped, sizeof(sts_val->total_files_skipped), "N/A"); -                        sts_val->total_files_skipped[ret] = '\0'; -                        ret = snprintf (sts_val->files_remaining, sizeof(sts_val->files_remaining), "N/A"); -                        sts_val->files_remaining[ret] = '\0'; -                        ret = snprintf (sts_val->bytes_remaining, sizeof(sts_val->bytes_remaining), "N/A"); -                        sts_val->bytes_remaining[ret] = '\0'; -                        goto store_status; -                } - -                ret = glusterd_gsync_fetch_status_extra (socketfile, sts_val); -                if (ret || strlen(sts_val->checkpoint_status) == 0) { -                        gf_log ("", GF_LOG_DEBUG, "No checkpoint status" -                                "for %s(master), %s(slave)", master, slave); -                        ret = snprintf (sts_val->checkpoint_status, sizeof(sts_val->checkpoint_status), "N/A"); -                        sts_val->checkpoint_status[ret] = '\0'; -                } - -                ret = glusterd_parse_gsync_status (tmp, sts_val); -                if (ret) { -                        gf_log ("", GF_LOG_ERROR, -                                "Unable to parse the gsync status for %s", -                                brickinfo->path); -                        memcpy (sts_val->slave_node, slave, strlen(slave)); -                        sts_val->slave_node[strlen(slave)] = '\0'; -                        ret = snprintf (sts_val->worker_status, sizeof(sts_val->worker_status), "N/A"); -                        sts_val->worker_status[ret] = '\0'; -                        ret = snprintf (sts_val->checkpoint_status, sizeof(sts_val->checkpoint_status), "N/A"); -                        sts_val->checkpoint_status[ret] = '\0'; -                        ret = snprintf (sts_val->crawl_status, sizeof(sts_val->crawl_status), "N/A"); -                        sts_val->crawl_status[ret] = '\0'; -                        ret = snprintf (sts_val->files_syncd, sizeof(sts_val->files_syncd), "N/A"); -                        sts_val->files_syncd[ret] = '\0'; -                        ret = snprintf (sts_val->purges_remaining, sizeof(sts_val->purges_remaining), "N/A"); -                        sts_val->purges_remaining[ret] = '\0'; -                        ret = snprintf (sts_val->total_files_skipped, sizeof(sts_val->total_files_skipped), "N/A"); -                        sts_val->total_files_skipped[ret] = '\0'; -                        ret = snprintf (sts_val->files_remaining, sizeof(sts_val->files_remaining), "N/A"); -                        sts_val->files_remaining[ret] = '\0'; -                        ret = snprintf (sts_val->bytes_remaining, sizeof(sts_val->bytes_remaining), "N/A"); -                        sts_val->bytes_remaining[ret] = '\0'; -                } - -store_status: -                if ((strcmp (monitor_status, "Stable"))) { -                        memcpy (sts_val->worker_status, monitor_status, strlen(monitor_status)); -                        sts_val->worker_status[strlen(monitor_status)] = '\0'; -                        ret = snprintf (sts_val->crawl_status, sizeof(sts_val->crawl_status), "N/A"); -                        sts_val->crawl_status[ret] = '\0'; -                        ret = snprintf (sts_val->checkpoint_status, sizeof(sts_val->checkpoint_status), "N/A"); -                        sts_val->checkpoint_status[ret] = '\0'; -                } - -                if (is_template_in_use) { -                        ret = snprintf (sts_val->worker_status, -                                        sizeof(sts_val->worker_status), -                                        "Config Corrupted"); -                        sts_val->worker_status[ret] = '\0'; -                } - -                if (strcmp (sts_val->worker_status, "Active")) { -                        ret = snprintf (sts_val->checkpoint_status, sizeof(sts_val->checkpoint_status), "N/A"); -                        sts_val->checkpoint_status[ret] = '\0'; -                        ret = snprintf (sts_val->crawl_status, sizeof(sts_val->crawl_status), "N/A"); -                        sts_val->crawl_status[ret] = '\0'; -                } - -                if (!strcmp (sts_val->slave_node, "N/A")) { -                        memcpy (sts_val->slave_node, slave, strlen(slave)); -                        sts_val->slave_node[strlen(slave)] = '\0'; -                } - -                memcpy (sts_val->node, node, strlen(node)); -                sts_val->node[strlen(node)] = '\0'; -                memcpy (sts_val->brick, brickinfo->path, strlen(brickinfo->path)); -                sts_val->brick[strlen(brickinfo->path)] = '\0'; - +                /* Slave Key */                  ret = glusterd_get_slave (volinfo, slave, &slavekey);                  if (ret < 0) {                          GF_FREE (sts_val); @@ -3773,14 +3635,87 @@ store_status:                  memcpy (sts_val->slavekey, slavekey, strlen(slavekey));                  sts_val->slavekey[strlen(slavekey)] = '\0'; +                /* Master Volume */ +                memcpy (sts_val->master, master, strlen(master)); +                sts_val->master[strlen(master)] = '\0'; + +                /* Master Brick Node */ +                memcpy (sts_val->node, node, strlen(node)); +                sts_val->node[strlen(node)] = '\0'; + +                /* Master Brick Path */ +                memcpy (sts_val->brick, brickinfo->path, +                        strlen(brickinfo->path)); +                sts_val->brick[strlen(brickinfo->path)] = '\0'; + +                /* Brick Host UUID */                  brick_host_uuid = uuid_utoa(brickinfo->uuid);                  brick_host_uuid_length = strlen (brick_host_uuid);                  memcpy (sts_val->brick_host_uuid, brick_host_uuid,                          brick_host_uuid_length);                  sts_val->brick_host_uuid[brick_host_uuid_length] = '\0'; -                memcpy (sts_val->master, master, strlen(master)); -                sts_val->master[strlen(master)] = '\0'; +                /* Slave */ +                memcpy (sts_val->slave, slave, strlen(slave)); +                sts_val->slave[strlen(slave)] = '\0'; + +                snprintf (sts_val->slave_node, +                          sizeof(sts_val->slave_node), "N/A"); + +                snprintf (sts_val->worker_status, +                          sizeof(sts_val->worker_status), "N/A"); + +                snprintf (sts_val->crawl_status, +                          sizeof(sts_val->crawl_status), "N/A"); + +                snprintf (sts_val->last_synced, +                          sizeof(sts_val->last_synced), "N/A"); + +                snprintf (sts_val->last_synced_utc, +                          sizeof(sts_val->last_synced_utc), "N/A"); + +                snprintf (sts_val->entry, sizeof(sts_val->entry), "N/A"); + +                snprintf (sts_val->data, sizeof(sts_val->data), "N/A"); + +                snprintf (sts_val->meta, sizeof(sts_val->meta), "N/A"); + +                snprintf (sts_val->failures, sizeof(sts_val->failures), "N/A"); + +                snprintf (sts_val->checkpoint_time, +                          sizeof(sts_val->checkpoint_time), "N/A"); + +                snprintf (sts_val->checkpoint_time_utc, +                          sizeof(sts_val->checkpoint_time_utc), "N/A"); + +                snprintf (sts_val->checkpoint_completed, +                          sizeof(sts_val->checkpoint_completed), "N/A"); + +                snprintf (sts_val->checkpoint_completion_time, +                          sizeof(sts_val->checkpoint_completion_time), +                          "N/A"); + +                snprintf (sts_val->checkpoint_completion_time_utc, +                          sizeof(sts_val->checkpoint_completion_time_utc), +                          "N/A"); + +                /* Get all the other values from Gsyncd */ +                ret = glusterd_gsync_get_status (master, slave, conf_path, +                                                 brickinfo->path, sts_val); + +                if (ret) { +                        gf_log ("", GF_LOG_ERROR, "Unable to get status data " +                                "for %s(master), %s(slave), %s(brick)", +                                master, slave, brickinfo->path); +                        ret = -1; +                        goto out; +                } + +                if (is_template_in_use) { +                        snprintf (sts_val->worker_status, +                                  sizeof(sts_val->worker_status), +                                  "Config Corrupted"); +                }                  ret = dict_get_str (volinfo->gsync_slaves, slavekey,                                      &slaveentry); @@ -3809,8 +3744,10 @@ store_status:                          strlen(slaveuser));                  sts_val->slave_user[strlen(slaveuser)] = '\0'; -                snprintf (sts_val_name, sizeof (sts_val_name), "status_value%d", gsync_count); -                ret = dict_set_bin (dict, sts_val_name, sts_val, sizeof(gf_gsync_status_t)); +                snprintf (sts_val_name, sizeof (sts_val_name), +                          "status_value%d", gsync_count); +                ret = dict_set_bin (dict, sts_val_name, sts_val, +                                    sizeof(gf_gsync_status_t));                  if (ret) {                          GF_FREE (sts_val);                          goto out; @@ -5258,7 +5195,7 @@ glusterd_create_essential_dir_files (glusterd_volinfo_t *volinfo, dict_t *dict,          } else {                  ret = glusterd_create_status_file (volinfo->volname, slave,                                                     slave_host, slave_vol, -                                                   "Not Started"); +                                                   "Created");                  if (ret || lstat (statefile, &stbuf)) {                          snprintf (errmsg, sizeof (errmsg), "Unable to create %s"                                    ". Error : %s", statefile, strerror (errno));  | 
