summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2015-03-12 16:07:13 +0530
committerVijay Bellur <vbellur@redhat.com>2015-05-05 02:15:24 -0700
commit98b69412e92742e0638ef8bd76223671386f5a39 (patch)
tree4d02c8989c50c7b219404900bc7beac327b19dca
parente02ac3c28241ff004d6cfbfc03975822146ce5dd (diff)
geo-rep: Status Enhancements
Discussion in gluster-devel http://www.gluster.org/pipermail/gluster-devel/2015-April/044301.html MASTER NODE - Master Volume Node MASTER VOL - Master Volume name MASTER BRICK - Master Volume Brick SLAVE USER - Slave User to which Geo-rep session is established SLAVE - <SLAVE_NODE>::<SLAVE_VOL> used in Geo-rep Create command SLAVE NODE - Slave Node to which Master worker is connected STATUS - Worker Status(Created, Initializing, Active, Passive, Faulty, Paused, Stopped) CRAWL STATUS - Crawl type(Hybrid Crawl, History Crawl, Changelog Crawl) LAST_SYNCED - Last Synced Time(Local Time in CLI output and UTC in XML output) ENTRY - Number of entry Operations pending.(Resets on worker restart) DATA - Number of Data operations pending(Resets on worker restart) META - Number of Meta operations pending(Resets on worker restart) FAILURES - Number of Failures CHECKPOINT TIME - Checkpoint set Time(Local Time in CLI output and UTC in XML output) CHECKPOINT COMPLETED - Yes/No or N/A CHECKPOINT COMPLETION TIME - Checkpoint Completed Time(Local Time in CLI output and UTC in XML output) XML output: <?xml version="1.0" encoding="UTF-8" standalone="yes"?> cliOutput> geoRep> volume> name> sessions> session> session_slave> pair> master_node> master_brick> slave_user> slave/> slave_node> status> crawl_status> entry> data> meta> failures> checkpoint_completed> master_node_uuid> last_synced> checkpoint_time> checkpoint_completion_time> BUG: 1212410 Change-Id: I944a6c3c67f1e6d6baf9670b474233bec8f61ea3 Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/10121 Tested-by: NetBSD Build System Reviewed-by: Kotresh HR <khiremat@redhat.com> Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Vijay Bellur <vbellur@redhat.com>
-rw-r--r--cli/src/cli-cmd-parser.c4
-rw-r--r--cli/src/cli-rpc-ops.c80
-rw-r--r--cli/src/cli-xml-output.c68
-rw-r--r--cli/src/cli.h3
-rw-r--r--geo-replication/syncdaemon/Makefile.am3
-rw-r--r--geo-replication/syncdaemon/gsyncd.py25
-rw-r--r--geo-replication/syncdaemon/gsyncdstatus.py317
-rw-r--r--geo-replication/syncdaemon/master.py374
-rw-r--r--geo-replication/syncdaemon/monitor.py58
-rw-r--r--geo-replication/syncdaemon/resource.py12
-rw-r--r--geo-replication/tests/unit/test_gsyncdstatus.py193
-rw-r--r--rpc/rpc-lib/src/protocol-common.h18
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-geo-rep.c479
13 files changed, 917 insertions, 717 deletions
diff --git a/cli/src/cli-cmd-parser.c b/cli/src/cli-cmd-parser.c
index 65ccfcd..d70f6ee 100644
--- a/cli/src/cli-cmd-parser.c
+++ b/cli/src/cli-cmd-parser.c
@@ -2274,8 +2274,8 @@ config_parse (const char **words, int wordcount, dict_t *dict,
ret = -1;
goto out;
}
- snprintf (append_str, 300, "now:%" GF_PRI_SECOND ".%06"GF_PRI_SUSECONDS,
- tv.tv_sec, tv.tv_usec);
+ snprintf (append_str, 300, "%" GF_PRI_SECOND,
+ tv.tv_sec);
}
ret = dict_set_dynstr (dict, "op_value", append_str);
diff --git a/cli/src/cli-rpc-ops.c b/cli/src/cli-rpc-ops.c
index 091608d..49c8761 100644
--- a/cli/src/cli-rpc-ops.c
+++ b/cli/src/cli-rpc-ops.c
@@ -4406,18 +4406,24 @@ get_struct_variable (int mem_num, gf_gsync_status_t *sts_val)
case 1: return (sts_val->master);
case 2: return (sts_val->brick);
case 3: return (sts_val->slave_user);
- case 4: return (sts_val->slave_node);
- case 5: return (sts_val->worker_status);
- case 6: return (sts_val->checkpoint_status);
+ case 4: return (sts_val->slave);
+ case 5: return (sts_val->slave_node);
+ case 6: return (sts_val->worker_status);
case 7: return (sts_val->crawl_status);
- case 8: return (sts_val->files_syncd);
- case 9: return (sts_val->files_remaining);
- case 10: return (sts_val->bytes_remaining);
- case 11: return (sts_val->purges_remaining);
- case 12: return (sts_val->total_files_skipped);
- case 13: return (sts_val->brick_host_uuid);
- case 14: return (sts_val->slavekey);
- case 15: return (sts_val->session_slave);
+ case 8: return (sts_val->last_synced);
+ case 9: return (sts_val->entry);
+ case 10: return (sts_val->data);
+ case 11: return (sts_val->meta);
+ case 12: return (sts_val->failures);
+ case 13: return (sts_val->checkpoint_time);
+ case 14: return (sts_val->checkpoint_completed);
+ case 15: return (sts_val->checkpoint_completion_time);
+ case 16: return (sts_val->brick_host_uuid);
+ case 17: return (sts_val->last_synced_utc);
+ case 18: return (sts_val->checkpoint_time_utc);
+ case 19: return (sts_val->checkpoint_completion_time_utc);
+ case 20: return (sts_val->slavekey);
+ case 21: return (sts_val->session_slave);
default:
goto out;
}
@@ -4435,7 +4441,7 @@ gf_cli_print_status (char **title_values,
int i = 0;
int j = 0;
int ret = 0;
- int status_fields = 7; /* Indexed at 0 */
+ int status_fields = 8; /* Indexed at 0 */
int total_spacing = 0;
char **output_values = NULL;
char *tmp = NULL;
@@ -4494,13 +4500,15 @@ gf_cli_print_status (char **title_values,
strlen(title_values[j]));
output_values[j][spacing[j]] = '\0';
}
- cli_out ("%s %s %s %s %s %s %s %s %s %s %s %s",
+ cli_out ("%s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s",
output_values[0], output_values[1],
output_values[2], output_values[3],
output_values[4], output_values[5],
output_values[6], output_values[7],
output_values[8], output_values[9],
- output_values[10], output_values[11]);
+ output_values[10], output_values[11],
+ output_values[12], output_values[13],
+ output_values[14], output_values[15]);
/* setting and printing the hyphens */
memset (hyphens, '-', total_spacing);
@@ -4527,13 +4535,15 @@ gf_cli_print_status (char **title_values,
output_values[j][spacing[j]] = '\0';
}
- cli_out ("%s %s %s %s %s %s %s %s %s %s %s %s",
+ cli_out ("%s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s",
output_values[0], output_values[1],
output_values[2], output_values[3],
output_values[4], output_values[5],
output_values[6], output_values[7],
output_values[8], output_values[9],
- output_values[10], output_values[11]);
+ output_values[10], output_values[11],
+ output_values[12], output_values[13],
+ output_values[14], output_values[15]);
}
out:
@@ -4552,6 +4562,23 @@ out:
}
int
+gf_gsync_status_t_comparator (const void *p, const void *q)
+{
+        /* qsort() passes pointers to the array slots; every slot holds a
+         * gf_gsync_status_t pointer. */
+        gf_gsync_status_t *const *lhs = p;
+        gf_gsync_status_t *const *rhs = q;
+
+        /* Index 20 selects the slavekey member in get_struct_variable(). */
+        char *lhs_key = get_struct_variable (20, *lhs);
+        char *rhs_key = get_struct_variable (20, *rhs);
+
+        if (lhs_key && rhs_key)
+                return strcmp (lhs_key, rhs_key);
+
+        /* A missing key cannot be ordered: log it and report "equal". */
+        gf_log ("cli", GF_LOG_ERROR,
+                "struct member empty.");
+        return 0;
+}
+
+int
gf_cli_read_status_data (dict_t *dict,
gf_gsync_status_t **sts_vals,
int *spacing, int gsync_count,
@@ -4586,6 +4613,11 @@ gf_cli_read_status_data (dict_t *dict,
}
}
+ /* Sort based on Session Slave */
+ qsort(sts_vals, gsync_count,
+ sizeof(gf_gsync_status_t *),
+ gf_gsync_status_t_comparator);
+
out:
return ret;
}
@@ -4596,18 +4628,20 @@ gf_cli_gsync_status_output (dict_t *dict, gf_boolean_t is_detail)
int gsync_count = 0;
int i = 0;
int ret = 0;
- int spacing[13] = {0};
- int num_of_fields = 13;
+ int spacing[16] = {0};
+ int num_of_fields = 16;
char errmsg[1024] = "";
char *master = NULL;
char *slave = NULL;
char *title_values[] = {"MASTER NODE", "MASTER VOL",
"MASTER BRICK", "SLAVE USER",
- "SLAVE",
- "STATUS", "CHECKPOINT STATUS",
- "CRAWL STATUS", "FILES SYNCD",
- "FILES PENDING", "BYTES PENDING",
- "DELETES PENDING", "FILES SKIPPED"};
+ "SLAVE", "SLAVE NODE",
+ "STATUS", "CRAWL STATUS",
+ "LAST_SYNCED", "ENTRY",
+ "DATA", "META", "FAILURES",
+ "CHECKPOINT TIME",
+ "CHECKPOINT COMPLETED",
+ "CHECKPOINT COMPLETION TIME"};
gf_gsync_status_t **sts_vals = NULL;
/* Checks if any session is active or not */
diff --git a/cli/src/cli-xml-output.c b/cli/src/cli-xml-output.c
index cbb4c1f..d7322d5 100644
--- a/cli/src/cli-xml-output.c
+++ b/cli/src/cli-xml-output.c
@@ -3839,25 +3839,6 @@ out:
#if (HAVE_LIB_XML)
int
-gf_gsync_status_t_comparator (const void *p, const void *q)
-{
- char *master1 = NULL;
- char *master2 = NULL;
-
- master1 = get_struct_variable (1, (*(gf_gsync_status_t **)p));
- master2 = get_struct_variable (1, (*(gf_gsync_status_t **)q));
- if (!master1 || !master2) {
- gf_log ("cli", GF_LOG_ERROR,
- "struct member empty.");
- return 0;
- }
-
- return strcmp (master1,master2);
-}
-#endif
-
-#if (HAVE_LIB_XML)
-int
cli_xml_output_vol_gsync_status (dict_t *dict,
xmlTextWriterPtr writer)
{
@@ -3865,8 +3846,7 @@ cli_xml_output_vol_gsync_status (dict_t *dict,
int i = 1;
int j = 0;
int count = 0;
- const int number_of_fields = 13;
- const int number_of_basic_fields = 8;
+ const int number_of_fields = 20;
int closed = 1;
int session_closed = 1;
gf_gsync_status_t **status_values = NULL;
@@ -3878,18 +3858,31 @@ cli_xml_output_vol_gsync_status (dict_t *dict,
char *slave = NULL;
char *slave_next = NULL;
char *title_values[] = {"master_node",
- "master_node_uuid",
+ "",
"master_brick",
"slave_user",
"slave",
+ "slave_node",
"status",
- "checkpoint_status",
"crawl_status",
- "files_syncd",
- "files_pending",
- "bytes_pending",
- "deletes_pending",
- "files_skipped"};
+ /* last_synced */
+ "",
+ "entry",
+ "data",
+ "meta",
+ "failures",
+ /* checkpoint_time */
+ "",
+ "checkpoint_completed",
+ /* checkpoint_completion_time */
+ "",
+ "master_node_uuid",
+ /* last_synced_utc */
+ "last_synced",
+ /* checkpoint_time_utc */
+ "checkpoint_time",
+ /* checkpoint_completion_time_utc */
+ "checkpoint_completion_time"};
GF_ASSERT (dict);
@@ -3963,7 +3956,7 @@ cli_xml_output_vol_gsync_status (dict_t *dict,
session_closed = 0;
- tmp = get_struct_variable (15, status_values[i]);
+ tmp = get_struct_variable (21, status_values[i]);
if (!tmp) {
gf_log ("cli", GF_LOG_ERROR,
"struct member empty.");
@@ -3980,18 +3973,11 @@ cli_xml_output_vol_gsync_status (dict_t *dict,
XML_RET_CHECK_AND_GOTO (ret, out);
for (j = 0; j < number_of_fields; j++) {
- // if detail option is not set and field is not under
- // basic fields or if field is volume then skip
- if(!status_detail && j >= number_of_basic_fields)
+ /* XML ignore fields */
+ if (strcmp(title_values[j], "") == 0)
continue;
- // Displaying the master_node uuid as second field
-
- if (j == 1)
- tmp = get_struct_variable (13,
- status_values[i]);
- else
- tmp = get_struct_variable (j, status_values[i]);
+ tmp = get_struct_variable (j, status_values[i]);
if (!tmp) {
gf_log ("cli", GF_LOG_ERROR,
"struct member empty.");
@@ -4009,8 +3995,8 @@ cli_xml_output_vol_gsync_status (dict_t *dict,
XML_RET_CHECK_AND_GOTO (ret, out);
if (i+1 < count) {
- slave = get_struct_variable (14, status_values[i]);
- slave_next = get_struct_variable (14,
+ slave = get_struct_variable (20, status_values[i]);
+ slave_next = get_struct_variable (20,
status_values[i+1]);
volume = get_struct_variable (1, status_values[i]);
volume_next = get_struct_variable (1,
diff --git a/cli/src/cli.h b/cli/src/cli.h
index c0750f2..2439352 100644
--- a/cli/src/cli.h
+++ b/cli/src/cli.h
@@ -449,4 +449,7 @@ print_quota_list_header (int type);
void
print_quota_list_empty (char *path, int type);
+int
+gf_gsync_status_t_comparator (const void *p, const void *q);
+
#endif /* __CLI_H__ */
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am
index 885963e..ed0f5e4 100644
--- a/geo-replication/syncdaemon/Makefile.am
+++ b/geo-replication/syncdaemon/Makefile.am
@@ -2,6 +2,7 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon
syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \
resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \
- $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py
+ $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py \
+ gsyncdstatus.py
CLEANFILES =
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index b9ee5ae..32e4eb7 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -27,12 +27,13 @@ from ipaddr import IPAddress, IPNetwork
from gconf import gconf
from syncdutils import FreeObject, norm, grabpidfile, finalize
-from syncdutils import log_raise_exception, privileged, update_file
+from syncdutils import log_raise_exception, privileged
from syncdutils import GsyncdError, select, set_term_handler
from configinterface import GConffile, upgrade_config_file
import resource
from monitor import monitor
from changelogagent import agent, Changelog
+from gsyncdstatus import set_monitor_status, GeorepStatus
class GLogger(Logger):
@@ -267,7 +268,7 @@ def main_i():
op.add_option('--socketdir', metavar='DIR')
op.add_option('--state-socket-unencoded', metavar='SOCKF',
type=str, action='callback', callback=store_abs)
- op.add_option('--checkpoint', metavar='LABEL', default='')
+ op.add_option('--checkpoint', metavar='LABEL', default='0')
# tunables for failover/failback mechanism:
# None - gsyncd behaves as normal
@@ -315,6 +316,8 @@ def main_i():
action='callback', callback=store_local)
op.add_option('--delete', dest='delete', action='callback',
callback=store_local_curry(True))
+ op.add_option('--status-get', dest='status_get', action='callback',
+ callback=store_local_curry(True))
op.add_option('--debug', dest="go_daemon", action='callback',
callback=lambda *a: (store_local_curry('dont')(*a),
setattr(
@@ -583,15 +586,8 @@ def main_i():
GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf')
if confdata.op == 'set':
logging.info('checkpoint %s set' % confdata.val)
- gcnf.delete('checkpoint_completed')
- gcnf.delete('checkpoint_target')
elif confdata.op == 'del':
logging.info('checkpoint info was reset')
- # if it is removing 'checkpoint' then we need
- # to remove 'checkpoint_completed' and 'checkpoint_target' too
- gcnf.delete('checkpoint_completed')
- gcnf.delete('checkpoint_target')
-
except IOError:
if sys.exc_info()[1].errno == ENOENT:
# directory of log path is not present,
@@ -607,7 +603,7 @@ def main_i():
create = rconf.get('create')
if create:
if getattr(gconf, 'state_file', None):
- update_file(gconf.state_file, lambda f: f.write(create + '\n'))
+ set_monitor_status(gconf.state_file, create)
return
go_daemon = rconf['go_daemon']
@@ -615,6 +611,15 @@ def main_i():
be_agent = rconf.get('agent')
rscs, local, remote = makersc(args)
+
+ status_get = rconf.get('status_get')
+ if status_get:
+ for brick in gconf.path:
+ brick_status = GeorepStatus(gconf.state_file, brick)
+ checkpoint_time = int(getattr(gconf, "checkpoint", "0"))
+ brick_status.print_status(checkpoint_time=checkpoint_time)
+ return
+
if not be_monitor and isinstance(remote, resource.SSH) and \
go_daemon == 'should':
go_daemon = 'postconn'
diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py
new file mode 100644
index 0000000..a49b9c2
--- /dev/null
+++ b/geo-replication/syncdaemon/gsyncdstatus.py
@@ -0,0 +1,317 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import fcntl
+import os
+import tempfile
+import urllib
+import json
+import time
+from datetime import datetime
+
+DEFAULT_STATUS = "N/A"
+MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped")
+STATUS_VALUES = (DEFAULT_STATUS,
+ "Initializing...",
+ "Active",
+ "Passive",
+ "Faulty")
+
+CRAWL_STATUS_VALUES = (DEFAULT_STATUS,
+ "Hybrid Crawl",
+ "History Crawl",
+ "Changelog Crawl")
+
+
+def human_time(ts):
+ try:
+ return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S")
+ except ValueError:
+ return DEFAULT_STATUS
+
+
+def human_time_utc(ts):
+ try:
+ return datetime.utcfromtimestamp(
+ float(ts)).strftime("%Y-%m-%d %H:%M:%S")
+ except ValueError:
+ return DEFAULT_STATUS
+
+
+def get_default_values():
+ return {
+ "slave_node": DEFAULT_STATUS,
+ "worker_status": DEFAULT_STATUS,
+ "last_synced": 0,
+ "crawl_status": DEFAULT_STATUS,
+ "entry": 0,
+ "data": 0,
+ "meta": 0,
+ "failures": 0,
+ "checkpoint_completed": DEFAULT_STATUS,
+ "checkpoint_time": 0,
+ "checkpoint_completion_time": 0}
+
+
+class LockedOpen(object):
+
+ def __init__(self, filename, *args, **kwargs):
+ self.filename = filename
+ self.open_args = args
+ self.open_kwargs = kwargs
+ self.fileobj = None
+
+ def __enter__(self):
+ """
+ If two processes compete to update a file, The first process
+ gets the lock and the second process is blocked in the fcntl.flock()
+ call. When first process replaces the file and releases the lock,
+ the already open file descriptor in the second process now points
+ to a "ghost" file(not reachable by any path name) with old contents.
+ To avoid that conflict, check the fd already opened is same or
+ not. Open new one if not same
+ """
+ f = open(self.filename, *self.open_args, **self.open_kwargs)
+ while True:
+ fcntl.flock(f, fcntl.LOCK_EX)
+ fnew = open(self.filename, *self.open_args, **self.open_kwargs)
+ if os.path.sameopenfile(f.fileno(), fnew.fileno()):
+ fnew.close()
+ break
+ else:
+ f.close()
+ f = fnew
+ self.fileobj = f
+ return f
+
+ def __exit__(self, _exc_type, _exc_value, _traceback):
+ self.fileobj.close()
+
+
+def set_monitor_status(status_file, status):
+ fd = os.open(status_file, os.O_CREAT | os.O_RDWR)
+ os.close(fd)
+ with LockedOpen(status_file, 'r+'):
+ with tempfile.NamedTemporaryFile('w', dir=os.path.dirname(status_file),
+ delete=False) as tf:
+ tf.write(status)
+ tempname = tf.name
+
+ os.rename(tempname, status_file)
+ dirfd = os.open(os.path.dirname(os.path.abspath(status_file)),
+ os.O_DIRECTORY)
+ os.fsync(dirfd)
+ os.close(dirfd)
+
+
+class GeorepStatus(object):
+ def __init__(self, monitor_status_file, brick):
+ self.work_dir = os.path.dirname(monitor_status_file)
+ self.monitor_status_file = monitor_status_file
+ self.filename = os.path.join(self.work_dir,
+ "brick_%s.status"
+ % urllib.quote_plus(brick))
+
+ fd = os.open(self.filename, os.O_CREAT | os.O_RDWR)
+ os.close(fd)
+ fd = os.open(self.monitor_status_file, os.O_CREAT | os.O_RDWR)
+ os.close(fd)
+ self.brick = brick
+ self.default_values = get_default_values()
+
+ def _update(self, mergerfunc):
+ with LockedOpen(self.filename, 'r+') as f:
+ try:
+ data = json.load(f)
+ except ValueError:
+ data = self.default_values
+
+ data = mergerfunc(data)
+ with tempfile.NamedTemporaryFile(
+ 'w',
+ dir=os.path.dirname(self.filename),
+ delete=False) as tf:
+ tf.write(data)
+ tempname = tf.name
+
+ os.rename(tempname, self.filename)
+ dirfd = os.open(os.path.dirname(os.path.abspath(self.filename)),
+ os.O_DIRECTORY)
+ os.fsync(dirfd)
+ os.close(dirfd)
+
+ def reset_on_worker_start(self):
+ def merger(data):
+ data["slave_node"] = DEFAULT_STATUS
+ data["crawl_status"] = DEFAULT_STATUS
+ data["entry"] = 0
+ data["data"] = 0
+ data["meta"] = 0
+ return json.dumps(data)
+
+ self._update(merger)
+
+ def set_field(self, key, value):
+ def merger(data):
+ data[key] = value
+ return json.dumps(data)
+
+ self._update(merger)
+
+ def set_last_synced(self, value, checkpoint_time):
+ def merger(data):
+ data["last_synced"] = value[0]
+
+ # If checkpoint is not set or reset
+ # or if last set checkpoint is changed
+ if checkpoint_time == 0 or \
+ checkpoint_time != data["checkpoint_time"]:
+ data["checkpoint_time"] = 0
+ data["checkpoint_completion_time"] = 0
+ data["checkpoint_completed"] = "No"
+
+ # If checkpoint is completed and not marked as completed
+ # previously then update the checkpoint completed time
+ if checkpoint_time > 0 and checkpoint_time <= value[0]:
+ if data["checkpoint_completed"] == "No":
+ data["checkpoint_time"] = checkpoint_time
+ data["checkpoint_completion_time"] = int(time.time())
+ data["checkpoint_completed"] = "Yes"
+ return json.dumps(data)
+
+ self._update(merger)
+
+ def set_worker_status(self, status):
+ self.set_field("worker_status", status)
+
+ def set_worker_crawl_status(self, status):
+ self.set_field("crawl_status", status)
+
+ def set_slave_node(self, slave_node):
+ def merger(data):
+ data["slave_node"] = slave_node
+ return json.dumps(data)
+
+ self._update(merger)
+
+ def inc_value(self, key, value):
+ def merger(data):
+ data[key] = data.get(key, 0) + value
+ return json.dumps(data)
+
+ self._update(merger)
+
+ def dec_value(self, key, value):
+ def merger(data):
+ data[key] = data.get(key, 0) - value
+ if data[key] < 0:
+ data[key] = 0
+ return json.dumps(data)
+
+ self._update(merger)
+
+ def set_active(self):
+ self.set_field("worker_status", "Active")
+
+ def set_passive(self):
+ self.set_field("worker_status", "Passive")
+
+ def get_monitor_status(self):
+ data = ""
+ with open(self.monitor_status_file, "r") as f:
+ data = f.read().strip()
+ return data
+
+ def get_status(self, checkpoint_time=0):
+ """
+ Monitor Status ---> Created Started Paused Stopped
+ ----------------------------------------------------------------------
+ slave_node N/A VALUE VALUE N/A
+ status Created VALUE Paused Stopped
+ last_synced N/A VALUE VALUE VALUE
+ crawl_status N/A VALUE N/A N/A
+ entry N/A VALUE N/A N/A
+ data N/A VALUE N/A N/A
+ meta N/A VALUE N/A N/A
+ failures N/A VALUE VALUE VALUE
+ checkpoint_completed N/A VALUE VALUE VALUE
+ checkpoint_time N/A VALUE VALUE VALUE
+ checkpoint_completed_time N/A VALUE VALUE VALUE
+ """
+ data = self.default_values
+ with open(self.filename) as f:
+ try:
+ data.update(json.load(f))
+ except ValueError:
+ pass
+ monitor_status = self.get_monitor_status()
+
+ if monitor_status in ["Created", "Paused", "Stopped"]:
+ data["worker_status"] = monitor_status
+
+ # Checkpoint adjustments
+ if checkpoint_time == 0:
+ data["checkpoint_completed"] = DEFAULT_STATUS
+ data["checkpoint_time"] = DEFAULT_STATUS
+ data["checkpoint_completion_time"] = DEFAULT_STATUS
+ else:
+ if checkpoint_time != data["checkpoint_time"]:
+ if checkpoint_time <= data["last_synced"]:
+ data["checkpoint_completed"] = "Yes"
+ data["checkpoint_time"] = checkpoint_time
+ data["checkpoint_completion_time"] = data["last_synced"]
+ else:
+ data["checkpoint_completed"] = "No"
+ data["checkpoint_time"] = checkpoint_time
+ data["checkpoint_completion_time"] = DEFAULT_STATUS
+
+ if data["checkpoint_time"] not in [0, DEFAULT_STATUS]:
+ chkpt_time = data["checkpoint_time"]
+ data["checkpoint_time"] = human_time(chkpt_time)
+ data["checkpoint_time_utc"] = human_time_utc(chkpt_time)
+
+ if data["checkpoint_completion_time"] not in [0, DEFAULT_STATUS]:
+ chkpt_completion_time = data["checkpoint_completion_time"]
+ data["checkpoint_completion_time"] = human_time(
+ chkpt_completion_time)
+ data["checkpoint_completion_time_utc"] = human_time_utc(
+ chkpt_completion_time)
+
+ if data["last_synced"] == 0:
+ data["last_synced"] = DEFAULT_STATUS
+ data["last_synced_utc"] = DEFAULT_STATUS
+ else:
+ last_synced = data["last_synced"]
+ data["last_synced"] = human_time(last_synced)
+ data["last_synced_utc"] = human_time_utc(last_synced)
+
+ if data["worker_status"] != "Active":
+ data["last_synced"] = DEFAULT_STATUS
+ data["last_synced_utc"] = DEFAULT_STATUS
+ data["crawl_status"] = DEFAULT_STATUS
+ data["entry"] = DEFAULT_STATUS
+ data["data"] = DEFAULT_STATUS
+ data["meta"] = DEFAULT_STATUS
+ data["failures"] = DEFAULT_STATUS
+ data["checkpoint_completed"] = DEFAULT_STATUS
+ data["checkpoint_time"] = DEFAULT_STATUS
+ data["checkpoint_completed_time"] = DEFAULT_STATUS
+ data["checkpoint_time_utc"] = DEFAULT_STATUS
+ data["checkpoint_completion_time_utc"] = DEFAULT_STATUS
+
+ if data["worker_status"] not in ["Active", "Passive"]:
+ data["slave_node"] = DEFAULT_STATUS
+
+ return data
+
+ def print_status(self, checkpoint_time=0):
+ for key, value in self.get_status(checkpoint_time).items():
+ print ("%s: %s" % (key, value))
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 3853588..8e4c430 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -15,17 +15,15 @@ import stat
import json
import logging
import fcntl
-import socket
import string
import errno
import tarfile
-from errno import ENOENT, ENODATA, EPIPE, EEXIST, EACCES, EAGAIN
+from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN
from threading import Condition, Lock
from datetime import datetime
from gconf import gconf
-from tempfile import NamedTemporaryFile
from syncdutils import Thread, GsyncdError, boolify, escape
-from syncdutils import unescape, select, gauxpfx, md5hex, selfkill
+from syncdutils import unescape, gauxpfx, md5hex, selfkill
from syncdutils import lstat, errno_wrap
from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
@@ -397,18 +395,6 @@ class GMasterCommon(object):
raise
return default_data
- def update_crawl_data(self):
- if getattr(gconf, 'state_detail_file', None):
- try:
- same_dir = os.path.dirname(gconf.state_detail_file)
- with NamedTemporaryFile(dir=same_dir, delete=False) as tmp:
- json.dump(self.total_crawl_stats, tmp)
- tmp.flush()
- os.fsync(tmp.fileno())
- os.rename(tmp.name, gconf.state_detail_file)
- except (IOError, OSError):
- raise
-
def __init__(self, master, slave):
self.master = master
self.slave = slave
@@ -434,14 +420,12 @@ class GMasterCommon(object):
self.total_turns = int(gconf.turns)
self.crawl_start = datetime.now()
self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
- self.total_crawl_stats = None
self.start = None
self.change_seen = None
# the actual volinfo we make use of
self.volinfo = None
self.terminate = False
self.sleep_interval = 1
- self.checkpoint_thread = None
self.current_files_skipped_count = 0
self.skipped_gfid_list = []
self.unlinked_gfids = []
@@ -493,7 +477,6 @@ class GMasterCommon(object):
logging.debug("Got the lock")
return True
-
def should_crawl(self):
if not gconf.use_meta_volume:
return gconf.glusterd_uuid in self.master.server.node_uuid()
@@ -503,7 +486,6 @@ class GMasterCommon(object):
sys.exit(1)
return self.mgmt_lock()
-
def register(self):
self.register()
@@ -542,10 +524,8 @@ class GMasterCommon(object):
if self.volinfo['retval']:
logging.warn("master cluster's info may not be valid %d" %
self.volinfo['retval'])
- self.start_checkpoint_thread()
else:
raise GsyncdError("master volinfo unavailable")
- self.total_crawl_stats = self.get_initial_crawl_data()
self.lastreport['time'] = time.time()
logging.info('crawl interval: %d seconds' % self.sleep_interval)
@@ -570,7 +550,7 @@ class GMasterCommon(object):
t0 = t1
self.update_worker_remote_node()
if not crawl:
- self.update_worker_health("Passive")
+ self.status.set_passive()
# bring up _this_ brick to the cluster stime
# which is min of cluster (but max of the replicas)
brick_stime = self.xtime('.', self.slave)
@@ -597,35 +577,14 @@ class GMasterCommon(object):
time.sleep(5)
continue
- self.update_worker_health("Active")
+
+ self.status.set_active()
self.crawl()
+
if oneshot:
return
time.sleep(self.sleep_interval)
- @classmethod
- def _checkpt_param(cls, chkpt, prm, xtimish=True):
- """use config backend to lookup a parameter belonging to
- checkpoint @chkpt"""
- cprm = gconf.configinterface.get_realtime('checkpoint_' + prm)
- if not cprm:
- return
- chkpt_mapped, val = cprm.split(':', 1)
- if unescape(chkpt_mapped) != chkpt:
- return
- if xtimish:
- val = cls.deserialize_xtime(val)
- return val
-
- @classmethod
- def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True):
- """use config backend to store a parameter associated
- with checkpoint @chkpt"""
- if xtimish:
- val = cls.serialize_xtime(val)
- gconf.configinterface.set(
- 'checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))
-
@staticmethod
def humantime(*tpair):
"""format xtime-like (sec, nsec) pair to human readable format"""
@@ -654,116 +613,6 @@ class GMasterCommon(object):
string.zfill(m, 2), string.zfill(s, 2))
return date
- def checkpt_service(self, chan, chkpt):
- """checkpoint service loop
-
- monitor and verify checkpoint status for @chkpt, and listen
- for incoming requests for whom we serve a pretty-formatted
- status report"""
- while True:
- chkpt = gconf.configinterface.get_realtime("checkpoint")
- if not chkpt:
- gconf.configinterface.delete("checkpoint_completed")
- gconf.configinterface.delete("checkpoint_target")
- # dummy loop for the case when there is no checkpt set
- select([chan], [], [])
- conn, _ = chan.accept()
- conn.send('\0')
- conn.close()
- continue
-
- checkpt_tgt = self._checkpt_param(chkpt, 'target')
- if not checkpt_tgt:
- checkpt_tgt = self.xtime('.')
- if isinstance(checkpt_tgt, int):
- raise GsyncdError("master root directory is "
- "unaccessible (%s)",
- os.strerror(checkpt_tgt))
- self._set_checkpt_param(chkpt, 'target', checkpt_tgt)
- logging.debug("checkpoint target %s has been determined "
- "for checkpoint %s" %
- (repr(checkpt_tgt), chkpt))
-
- # check if the label is 'now'
- chkpt_lbl = chkpt
- try:
- x1, x2 = chkpt.split(':')
- if x1 == 'now':
- chkpt_lbl = "as of " + self.humantime(x2)
- except:
- pass
- completed = self._checkpt_param(chkpt, 'completed', xtimish=False)
- if completed:
- completed = tuple(int(x) for x in completed.split('.'))
- s, _, _ = select([chan], [], [], (not completed) and 5 or None)
- # either request made and we re-check to not
- # give back stale data, or we still hunting for completion
- if (self.native_xtime(checkpt_tgt) and (
- self.native_xtime(checkpt_tgt) < self.volmark)):
- # indexing has been reset since setting the checkpoint
- status = "is invalid"
- else:
- xtr = self.xtime('.', self.slave)
- if isinstance(xtr, int):
- raise GsyncdError("slave root directory is "
- "unaccessible (%s)",
- os.strerror(xtr))
- ncompleted = self.xtime_geq(xtr, checkpt_tgt)
- if completed and not ncompleted: # stale data
- logging.warn("completion time %s for checkpoint %s "
- "became stale" %
- (self.humantime(*completed), chkpt))
- completed = None
- gconf.configinterface.delete('checkpoint_completed')
- if ncompleted and not completed: # just reaching completion
- completed = "%.6f" % time.time()
- self._set_checkpt_param(
- chkpt, 'completed', completed, xtimish=False)
- completed = tuple(int(x) for x in completed.split('.'))
- logging.info("checkpoint %s completed" % chkpt)
- status = completed and \
- "completed at " + self.humantime(completed[0]) or \
- "not reached yet"
- if s:
- conn = None
- try:
- conn, _ = chan.accept()
- try:
- conn.send("checkpoint %s is %s\0" %
- (chkpt_lbl, status))
- except:
- exc = sys.exc_info()[1]
- if ((isinstance(exc, OSError) or isinstance(
- exc, IOError)) and exc.errno == EPIPE):
- logging.debug('checkpoint client disconnected')
- else:
- raise
- finally:
- if conn:
- conn.close()
-
- def start_checkpoint_thread(self):
- """prepare and start checkpoint service"""
- if self.checkpoint_thread or not (
- getattr(gconf, 'state_socket_unencoded', None) and getattr(
- gconf, 'socketdir', None)
- ):
- return
- chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- state_socket = os.path.join(
- gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket")
- try:
- os.unlink(state_socket)
- except:
- if sys.exc_info()[0] == OSError:
- pass
- chan.bind(state_socket)
- chan.listen(1)
- chkpt = gconf.configinterface.get_realtime("checkpoint")
- t = Thread(target=self.checkpt_service, args=(chan, chkpt))
- t.start()
- self.checkpoint_thread = t
-
def add_job(self, path, label, job, *a, **kw):
"""insert @job function to job table at @path with @label"""
if self.jobtab.get(path) is None:
@@ -937,11 +786,15 @@ class GMasterChangelogMixin(GMasterCommon):
files_pending['purge'] += 1
def log_failures(failures, entry_key, gfid_prefix, log_prefix):
+ num_failures = 0
for failure in failures:
st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
if not isinstance(st, int):
+ num_failures += 1
logging.warn('%s FAILED: %s' % (log_prefix, repr(failure)))
+ self.status.inc_value("failures", num_failures)
+
for e in clist:
e = e.strip()
et = e[self.IDX_START:self.IDX_END] # entry type
@@ -1040,12 +893,18 @@ class GMasterChangelogMixin(GMasterCommon):
else:
logging.warn('got invalid changelog type: %s' % (et))
logging.debug('entries: %s' % repr(entries))
- if not retry:
- self.update_worker_cumilitive_status(files_pending)
+
+ # Increment counters for Status
+ self.status.inc_value("entry", len(entries))
+ self.files_in_batch = len(datas)
+ self.status.inc_value("data", self.files_in_batch)
+
# sync namespace
if entries:
failures = self.slave.server.entry_ops(entries)
log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
+ self.status.dec_value("entry", len(entries))
+
# sync metadata
if meta_gfid:
meta_entries = []
@@ -1059,8 +918,11 @@ class GMasterChangelogMixin(GMasterCommon):
continue
meta_entries.append(edct('META', go=go[0], stat=st))
if meta_entries:
+ self.status.inc_value("meta", len(entries))
failures = self.slave.server.meta_ops(meta_entries)
log_failures(failures, 'go', '', 'META')
+ self.status.dec_value("meta", len(entries))
+
# sync data
if datas:
self.a_syncdata(datas)
@@ -1112,9 +974,17 @@ class GMasterChangelogMixin(GMasterCommon):
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
+ chkpt_time = gconf.configinterface.get_realtime(
+ "checkpoint")
+ checkpoint_time = 0
+ if chkpt_time is not None:
+ checkpoint_time = int(chkpt_time)
+
+ self.status.set_last_synced(xtl, checkpoint_time)
map(self.changelog_done_func, changes)
self.archive_and_purge_changelogs(changes)
- self.update_worker_files_syncd()
+ self.status.dec_value("data", self.files_in_batch)
+ self.files_in_batch = 0
break
# We do not know which changelog transfer failed, retry everything.
@@ -1124,14 +994,22 @@ class GMasterChangelogMixin(GMasterCommon):
logging.warn('changelogs %s could not be processed - '
'moving on...' %
' '.join(map(os.path.basename, changes)))
- self.update_worker_total_files_skipped(
- self.current_files_skipped_count)
+ self.status.inc_value("failures",
+ self.current_files_skipped_count)
logging.warn('SKIPPED GFID = %s' %
','.join(self.skipped_gfid_list))
- self.update_worker_files_syncd()
+
+ self.files_in_batch = 0
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
+ chkpt_time = gconf.configinterface.get_realtime(
+ "checkpoint")
+ checkpoint_time = 0
+ if chkpt_time is not None:
+ checkpoint_time = int(chkpt_time)
+
+ self.status.set_last_synced(xtl, checkpoint_time)
map(self.changelog_done_func, changes)
self.archive_and_purge_changelogs(changes)
break
@@ -1152,161 +1030,12 @@ class GMasterChangelogMixin(GMasterCommon):
if not stime == URXTIME:
self.sendmark(path, stime)
- def get_worker_status_file(self):
- file_name = gconf.local_path + '.status'
- file_name = file_name.replace("/", "_")
- worker_status_file = gconf.georep_session_working_dir + file_name
- return worker_status_file
-
- def update_worker_status(self, key, value):
- default_data = {"remote_node": "N/A",
- "worker status": "Not Started",
- "crawl status": "N/A",
- "files_syncd": 0,
- "files_remaining": 0,
- "bytes_remaining": 0,
- "purges_remaining": 0,
- "total_files_skipped": 0}
- worker_status_file = self.get_worker_status_file()
- try:
- with open(worker_status_file, 'r+') as f:
- loaded_data = json.load(f)
- loaded_data[key] = value
- os.ftruncate(f.fileno(), 0)
- os.lseek(f.fileno(), 0, os.SEEK_SET)
- json.dump(loaded_data, f)
- f.flush()
- os.fsync(f.fileno())
- except (IOError, OSError, ValueError):
- logging.info('Creating new %s' % worker_status_file)
- try:
- with open(worker_status_file, 'wb') as f:
- default_data[key] = value
- json.dump(default_data, f)
- f.flush()
- os.fsync(f.fileno())
- except:
- raise
-
- def update_worker_cumilitive_status(self, files_pending):
- default_data = {"remote_node": "N/A",
- "worker status": "Not Started",
- "crawl status": "N/A",
- "files_syncd": 0,
- "files_remaining": 0,
- "bytes_remaining": 0,
- "purges_remaining": 0,
- "total_files_skipped": 0}
- worker_status_file = self.get_worker_status_file()
- try:
- with open(worker_status_file, 'r+') as f:
- loaded_data = json.load(f)
- loaded_data['files_remaining'] = files_pending['count']
- loaded_data['bytes_remaining'] = files_pending['bytes']
- loaded_data['purges_remaining'] = files_pending['purge']
- os.ftruncate(f.fileno(), 0)
- os.lseek(f.fileno(), 0, os.SEEK_SET)
- json.dump(loaded_data, f)
- f.flush()
- os.fsync(f.fileno())
- except (IOError, OSError, ValueError):
- logging.info('Creating new %s' % worker_status_file)
- try:
- with open(worker_status_file, 'wb') as f:
- default_data['files_remaining'] = files_pending['count']
- default_data['bytes_remaining'] = files_pending['bytes']
- default_data['purges_remaining'] = files_pending['purge']
- json.dump(default_data, f)
- f.flush()
- os.fsync(f.fileno())
- except:
- raise
-
def update_worker_remote_node(self):
node = sys.argv[-1]
- node = node.split("@")[-1]
+ node_data = node.split("@")
+ node = node_data[-1]
remote_node_ip = node.split(":")[0]
- remote_node_vol = node.split(":")[3]
- remote_node = remote_node_ip + '::' + remote_node_vol
- self.update_worker_status('remote_node', remote_node)
-
- def update_worker_health(self, state):
- self.update_worker_status('worker status', state)
-
- def update_worker_crawl_status(self, state):
- self.update_worker_status('crawl status', state)
-
- def update_worker_files_syncd(self):
- default_data = {"remote_node": "N/A",
- "worker status": "Not Started",
- "crawl status": "N/A",
- "files_syncd": 0,
- "files_remaining": 0,
- "bytes_remaining": 0,
- "purges_remaining": 0,
- "total_files_skipped": 0}
- worker_status_file = self.get_worker_status_file()
- try:
- with open(worker_status_file, 'r+') as f:
- loaded_data = json.load(f)
- loaded_data['files_syncd'] += loaded_data['files_remaining']
- loaded_data['files_remaining'] = 0
- loaded_data['bytes_remaining'] = 0
- loaded_data['purges_remaining'] = 0
- os.ftruncate(f.fileno(), 0)
- os.lseek(f.fileno(), 0, os.SEEK_SET)
- json.dump(loaded_data, f)
- f.flush()
- os.fsync(f.fileno())
- except (IOError, OSError, ValueError):
- logging.info('Creating new %s' % worker_status_file)
- try:
- with open(worker_status_file, 'wb') as f:
- json.dump(default_data, f)
- f.flush()
- os.fsync(f.fileno())
- except:
- raise
-
- def update_worker_files_remaining(self, state):
- self.update_worker_status('files_remaining', state)
-
- def update_worker_bytes_remaining(self, state):
- self.update_worker_status('bytes_remaining', state)
-
- def update_worker_purges_remaining(self, state):
- self.update_worker_status('purges_remaining', state)
-
- def update_worker_total_files_skipped(self, value):
- default_data = {"remote_node": "N/A",
- "worker status": "Not Started",
- "crawl status": "N/A",
- "files_syncd": 0,
- "files_remaining": 0,
- "bytes_remaining": 0,
- "purges_remaining": 0,
- "total_files_skipped": 0}
- worker_status_file = self.get_worker_status_file()
- try:
- with open(worker_status_file, 'r+') as f:
- loaded_data = json.load(f)
- loaded_data['total_files_skipped'] = value
- loaded_data['files_remaining'] -= value
- os.ftruncate(f.fileno(), 0)
- os.lseek(f.fileno(), 0, os.SEEK_SET)
- json.dump(loaded_data, f)
- f.flush()
- os.fsync(f.fileno())
- except (IOError, OSError, ValueError):
- logging.info('Creating new %s' % worker_status_file)
- try:
- with open(worker_status_file, 'wb') as f:
- default_data['total_files_skipped'] = value
- json.dump(default_data, f)
- f.flush()
- os.fsync(f.fileno())
- except:
- raise
+ self.status.set_slave_node(remote_node_ip)
def changelogs_batch_process(self, changes):
changelogs_batches = []
@@ -1331,7 +1060,7 @@ class GMasterChangelogMixin(GMasterCommon):
self.process(batch)
def crawl(self):
- self.update_worker_crawl_status("Changelog Crawl")
+ self.status.set_worker_crawl_status("Changelog Crawl")
changes = []
# get stime (from the brick) and purge changelogs
# that are _historical_ to that time.
@@ -1355,16 +1084,17 @@ class GMasterChangelogMixin(GMasterCommon):
self.changelogs_batch_process(changes)
- def register(self, register_time, changelog_agent):
+ def register(self, register_time, changelog_agent, status):
self.changelog_agent = changelog_agent
self.sleep_interval = int(gconf.change_interval)
self.changelog_done_func = self.changelog_agent.done
self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
".processed")
+ self.status = status
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
- def register(self, register_time, changelog_agent):
+ def register(self, register_time, changelog_agent, status):
self.changelog_agent = changelog_agent
self.changelog_register_time = register_time
self.history_crawl_start_time = register_time
@@ -1372,10 +1102,11 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
self.history_turns = 0
self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
".history/.processed")
+ self.status = status
def crawl(self):
self.history_turns += 1
- self.update_worker_crawl_status("History Crawl")
+ self.status.set_worker_crawl_status("History Crawl")
purge_time = self.get_purge_time()
logging.info('starting history crawl... turns: %s, stime: %s'
@@ -1455,7 +1186,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
XSYNC_MAX_ENTRIES = 1 << 13
- def register(self, register_time=None, changelog_agent=None):
+ def register(self, register_time=None, changelog_agent=None, status=None):
+ self.status = status
self.counter = 0
self.comlist = []
self.stimes = []
@@ -1486,7 +1218,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
t.start()
logging.info('starting hybrid crawl..., stime: %s'
% repr(self.get_purge_time()))
- self.update_worker_crawl_status("Hybrid Crawl")
+ self.status.set_worker_crawl_status("Hybrid Crawl")
while True:
try:
item = self.comlist.pop(0)
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index 029726c..ba5c8e3 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -22,10 +22,12 @@ from errno import EEXIST
import re
import random
from gconf import gconf
-from syncdutils import update_file, select, waitpid
+from syncdutils import select, waitpid
from syncdutils import set_term_handler, is_host_local, GsyncdError
from syncdutils import escape, Thread, finalize, memoize
+from gsyncdstatus import GeorepStatus, set_monitor_status
+
ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
@@ -125,46 +127,22 @@ class Volinfo(object):
def disperse_count(self):
return int(self.get('disperseCount')[0].text)
+
class Monitor(object):
"""class which spawns and manages gsyncd workers"""
ST_INIT = 'Initializing...'
- ST_STABLE = 'Stable'
- ST_FAULTY = 'faulty'
+ ST_STARTED = 'Started'
+ ST_STABLE = 'Active'
+ ST_FAULTY = 'Faulty'
ST_INCON = 'inconsistent'
_ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON]
def __init__(self):
self.lock = Lock()
self.state = {}
-
- def set_state(self, state, w=None):
- """set the state that can be used by external agents
- like glusterd for status reporting"""
- computestate = lambda: self.state and self._ST_ORD[
- max(self._ST_ORD.index(s) for s in self.state.values())]
- if w:
- self.lock.acquire()
- old_state = computestate()
- self.state[w] = state
- state = computestate()
- self.lock.release()
- if state != old_state:
- self.set_state(state)
- else:
- if getattr(gconf, 'state_file', None):
- # If previous state is paused, suffix the
- # new state with '(Paused)'
- try:
- with open(gconf.state_file, "r") as f:
- content = f.read()
- if "paused" in content.lower():
- state = state + '(Paused)'
- except IOError:
- pass
- logging.info('new state: %s' % state)
- update_file(gconf.state_file, lambda f: f.write(state + '\n'))
+ self.status = {}
@staticmethod
def terminate():
@@ -174,8 +152,7 @@ class Monitor(object):
# give a chance to graceful exit
os.kill(-os.getpid(), signal.SIGTERM)
-
- def monitor(self, w, argv, cpids, agents, slave_vol, slave_host):
+ def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master):
"""the monitor loop
Basic logic is a blantantly simple blunt heuristics:
@@ -194,8 +171,11 @@ class Monitor(object):
blown worker blows up on EPIPE if the net goes down,
due to the keep-alive thread)
"""
+ if not self.status.get(w[0], None):
+ self.status[w[0]] = GeorepStatus(gconf.state_file, w[0])
- self.set_state(self.ST_INIT, w)
+ set_monitor_status(gconf.state_file, self.ST_STARTED)
+ self.status[w[0]].set_worker_status(self.ST_INIT)
ret = 0
@@ -310,7 +290,7 @@ class Monitor(object):
nwait(apid) #wait for agent
ret = nwait(cpid)
if ret is None:
- self.set_state(self.ST_STABLE, w)
+ self.status[w[0]].set_worker_status(self.ST_STABLE)
#If worker dies, agent terminates on EOF.
#So lets wait for agent first.
nwait(apid)
@@ -320,12 +300,12 @@ class Monitor(object):
else:
ret = exit_status(ret)
if ret in (0, 1):
- self.set_state(self.ST_FAULTY, w)
+ self.status[w[0]].set_worker_status(self.ST_FAULTY)
time.sleep(10)
- self.set_state(self.ST_INCON, w)
+ self.status[w[0]].set_worker_status(self.ST_INCON)
return ret
- def multiplex(self, wspx, suuid, slave_vol, slave_host):
+ def multiplex(self, wspx, suuid, slave_vol, slave_host, master):
argv = sys.argv[:]
for o in ('-N', '--no-daemon', '--monitor'):
while o in argv:
@@ -339,7 +319,7 @@ class Monitor(object):
for wx in wspx:
def wmon(w):
cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol,
- slave_host)
+ slave_host, master)
time.sleep(1)
self.lock.acquire()
for cpid in cpids:
@@ -401,7 +381,7 @@ def distribute(*resources):
for idx, brick in enumerate(mvol.bricks)
if is_host_local(brick['host'])]
logging.info('worker specs: ' + repr(workerspex))
- return workerspex, suuid, slave_vol, slave_host
+ return workerspex, suuid, slave_vol, slave_host, master
def monitor(*resources):
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index d3d1ee3..6bf1ad0 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -38,6 +38,7 @@ from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
from syncdutils import ChangelogException
from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
+from gsyncdstatus import GeorepStatus
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
@@ -611,6 +612,9 @@ class Server(object):
def collect_failure(e, cmd_ret):
# We do this for failing fops on Slave
# Master should be logging this
+ if cmd_ret is None:
+ return
+
if cmd_ret == EEXIST:
disk_gfid = cls.gfid_mnt(e['entry'])
if isinstance(disk_gfid, basestring):
@@ -1344,6 +1348,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
os.close(int(ra))
os.close(int(wa))
changelog_agent = RepceClient(int(inf), int(ouf))
+ status = GeorepStatus(gconf.state_file, gconf.local_path)
+ status.reset_on_worker_start()
rv = changelog_agent.version()
if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION:
raise GsyncdError(
@@ -1367,13 +1373,13 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
g2.CHANGELOG_CONN_RETRIES)
register_time = int(time.time())
- g2.register(register_time, changelog_agent)
- g3.register(register_time, changelog_agent)
+ g2.register(register_time, changelog_agent, status)
+ g3.register(register_time, changelog_agent, status)
except ChangelogException as e:
logging.error("Changelog register failed, %s" % e)
sys.exit(1)
- g1.register()
+ g1.register(status=status)
logging.info("Register time: %s" % register_time)
# oneshot: Try to use changelog history api, if not
# available switch to FS crawl
diff --git a/geo-replication/tests/unit/test_gsyncdstatus.py b/geo-replication/tests/unit/test_gsyncdstatus.py
new file mode 100644
index 0000000..a65d659
--- /dev/null
+++ b/geo-replication/tests/unit/test_gsyncdstatus.py
@@ -0,0 +1,193 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import unittest
+import os
+import urllib
+
+from syncdaemon.gstatus import GeorepStatus, set_monitor_status
+from syncdaemon.gstatus import get_default_values
+from syncdaemon.gstatus import MONITOR_STATUS, DEFAULT_STATUS
+from syncdaemon.gstatus import STATUS_VALUES, CRAWL_STATUS_VALUES
+from syncdaemon.gstatus import human_time, human_time_utc
+
+
+class GeorepStatusTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.work_dir = os.path.dirname(os.path.abspath(__file__))
+ cls.monitor_status_file = os.path.join(cls.work_dir, "monitor.status")
+ cls.brick = "/exports/bricks/b1"
+ cls.status = GeorepStatus(cls.monitor_status_file, cls.brick)
+ cls.statusfile = os.path.join(cls.work_dir,
+ "brick_%s.status"
+ % urllib.quote_plus(cls.brick))
+
+ @classmethod
+ def tearDownClass(cls):
+ os.remove(cls.statusfile)
+ os.remove(cls.monitor_status_file)
+
+ def _filter_dict(self, inp, keys):
+ op = {}
+ for k in keys:
+ op[k] = inp.get(k, None)
+ return op
+
+ def test_monitor_status_file_created(self):
+ self.assertTrue(os.path.exists(self.monitor_status_file))
+
+ def test_status_file_created(self):
+ self.assertTrue(os.path.exists(self.statusfile))
+
+ def test_set_monitor_status(self):
+ for st in MONITOR_STATUS:
+ set_monitor_status(self.monitor_status_file, st)
+ self.assertTrue(self.status.get_monitor_status(), st)
+
+ def test_default_values_test(self):
+ self.assertTrue(get_default_values(), {
+ "slave_node": DEFAULT_STATUS,
+ "worker_status": DEFAULT_STATUS,
+ "last_synced": 0,
+ "last_synced_utc": 0,
+ "crawl_status": DEFAULT_STATUS,
+ "entry": 0,
+ "data": 0,
+ "metadata": 0,
+ "failures": 0,
+ "checkpoint_completed": False,
+ "checkpoint_time": 0,
+ "checkpoint_time_utc": 0,
+ "checkpoint_completion_time": 0,
+ "checkpoint_completion_time_utc": 0
+ })
+
+ def test_human_time(self):
+ self.assertTrue(human_time(1429174398), "2015-04-16 14:23:18")
+
+ def test_human_time_utc(self):
+ self.assertTrue(human_time_utc(1429174398), "2015-04-16 08:53:18")
+
+ def test_invalid_human_time(self):
+ self.assertTrue(human_time(142917439), DEFAULT_STATUS)
+ self.assertTrue(human_time("abcdef"), DEFAULT_STATUS)
+
+ def test_invalid_human_time_utc(self):
+ self.assertTrue(human_time_utc(142917439), DEFAULT_STATUS)
+ self.assertTrue(human_time_utc("abcdef"), DEFAULT_STATUS)
+
+ def test_worker_status(self):
+ set_monitor_status(self.monitor_status_file, "Started")
+ for st in STATUS_VALUES:
+ self.status.set_worker_status(st)
+ self.assertTrue(self.status.get_status()["worker_status"], st)
+
+ def test_crawl_status(self):
+ set_monitor_status(self.monitor_status_file, "Started")
+ self.status.set_active()
+ for st in CRAWL_STATUS_VALUES:
+ self.status.set_worker_crawl_status(st)
+ self.assertTrue(self.status.get_status()["crawl_status"], st)
+
+ def test_slave_node(self):
+ set_monitor_status(self.monitor_status_file, "Started")
+ self.status.set_active()
+ self.status.set_slave_node("fvm2")
+ self.assertTrue(self.status.get_status()["slave_node"], "fvm2")
+
+ self.status.set_worker_status("Passive")
+ self.status.set_slave_node("fvm2")
+ self.assertTrue(self.status.get_status()["slave_node"], "fvm2")
+
+ def test_active_worker_status(self):
+ set_monitor_status(self.monitor_status_file, "Started")
+ self.status.set_active()
+ self.assertTrue(self.status.get_status()["worker_status"], "Active")
+
+ def test_passive_worker_status(self):
+ set_monitor_status(self.monitor_status_file, "Started")
+ self.status.set_passive()
+ self.assertTrue(self.status.get_status()["worker_status"], "Passive")
+
+ def test_set_field(self):
+ set_monitor_status(self.monitor_status_file, "Started")
+ self.status.set_active()
+ self.status.set_field("entry", 42)
+ self.assertTrue(self.status.get_status()["entry"], 42)
+
+ def test_inc_value(self):
+ set_monitor_status(self.monitor_status_file, "Started")
+ self.status.set_active()
+ self.status.set_field("entry", 0)
+ self.status.inc_value("entry", 2)
+ self.assertTrue(self.status.get_status()["entry"], 2)
+
+ self.status.set_field("data", 0)
+ self.status.inc_value("data", 2)
+ self.assertTrue(self.status.get_status()["data"], 2)
+
+ self.status.set_field("meta", 0)
+ self.status.inc_value("meta", 2)
+ self.assertTrue(self.status.get_status()["meta"], 2)
+
+ self.status.set_field("failures", 0)
+ self.status.inc_value("failures", 2)
+ self.assertTrue(self.status.get_status()["failures"], 2)
+
+ def test_dec_value(self):
+ set_monitor_status(self.monitor_status_file, "Started")
+ self.status.set_active()
+
+ self.status.set_field("entry", 4)
+ self.status.inc_value("entry", 2)
+ self.assertTrue(self.status.get_status()["entry"], 2)
+
+ self.status.set_field("data", 4)
+ self.status.inc_value("data", 2)
+ self.assertTrue(self.status.get_status()["data"], 2)
+
+ self.status.set_field("meta", 4)
+ self.status.inc_value("meta", 2)
+ self.assertTrue(self.status.get_status()["meta"], 2)
+
+ self.status.set_field("failures", 4)
+ self.status.inc_value("failures", 2)
+ self.assertTrue(self.status.get_status()["failures"], 2)
+
+ def test_worker_status_when_monitor_status_created(self):
+ set_monitor_status(self.monitor_status_file, "Created")
+ for st in STATUS_VALUES:
+ self.status.set_worker_status(st)
+ self.assertTrue(self.status.get_status()["worker_status"],
+ "Created")
+
+ def test_worker_status_when_monitor_status_paused(self):
+ set_monitor_status(self.monitor_status_file, "Paused")
+ for st in STATUS_VALUES:
+ self.status.set_worker_status(st)
+ self.assertTrue(self.status.get_status()["worker_status"],
+ "Paused")
+
+ def test_worker_status_when_monitor_status_stopped(self):
+ set_monitor_status(self.monitor_status_file, "Stopped")
+ for st in STATUS_VALUES:
+ self.status.set_worker_status(st)
+ self.assertTrue(self.status.get_status()["worker_status"],
+ "Stopped")
+
+ def test_status_when_worker_status_active(self):
+ set_monitor_status(self.monitor_status_file, "Started")
+ self.status.set_active()
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/rpc/rpc-lib/src/protocol-common.h b/rpc/rpc-lib/src/protocol-common.h
index 3b96433..2f7f23f 100644
--- a/rpc/rpc-lib/src/protocol-common.h
+++ b/rpc/rpc-lib/src/protocol-common.h
@@ -248,15 +248,21 @@ struct gf_gsync_detailed_status_ {
char master[NAME_MAX];
char brick[NAME_MAX];
char slave_user[NAME_MAX];
+ char slave[NAME_MAX];
char slave_node[NAME_MAX];
char worker_status[NAME_MAX];
- char checkpoint_status[NAME_MAX];
char crawl_status[NAME_MAX];
- char files_syncd[NAME_MAX];
- char files_remaining[NAME_MAX];
- char bytes_remaining[NAME_MAX];
- char purges_remaining[NAME_MAX];
- char total_files_skipped[NAME_MAX];
+ char last_synced[NAME_MAX];
+ char last_synced_utc[NAME_MAX];
+ char entry[NAME_MAX];
+ char data[NAME_MAX];
+ char meta[NAME_MAX];
+ char failures[NAME_MAX];
+ char checkpoint_time[NAME_MAX];
+ char checkpoint_time_utc[NAME_MAX];
+ char checkpoint_completed[NAME_MAX];
+ char checkpoint_completion_time[NAME_MAX];
+ char checkpoint_completion_time_utc[NAME_MAX];
char brick_host_uuid[NAME_MAX];
char slavekey[NAME_MAX];
char session_slave[NAME_MAX];
diff --git a/xlators/mgmt/glusterd/src/glusterd-geo-rep.c b/xlators/mgmt/glusterd/src/glusterd-geo-rep.c
index 708d6d3..24768e3 100644
--- a/xlators/mgmt/glusterd/src/glusterd-geo-rep.c
+++ b/xlators/mgmt/glusterd/src/glusterd-geo-rep.c
@@ -667,6 +667,128 @@ glusterd_gsync_get_config (char *master, char *slave, char *conf_path, dict_t *d
}
+/*
+ * Line callback that fills a gf_gsync_status_t from "key: value" text.
+ *
+ * Passed by glusterd_gsync_get_status() to
+ * glusterd_query_extutil_generic() for the gsyncd --status-get query.
+ *
+ * @resbuf: scratch buffer used to read one line at a time
+ * @blen:   size of @resbuf
+ * @fp:     stream to read the lines from
+ * @data:   the gf_gsync_status_t to fill in
+ *
+ * Lines holding at most one character after trailing whitespace is removed
+ * are skipped. Keys that do not name a known status field are ignored.
+ * Values longer than the destination field are truncated; every field
+ * written is NUL-terminated.
+ *
+ * Returns 0 on success, -1 if a line has no ':' separator or if errno is
+ * set when reading stops.
+ */
static int
+_fcbk_statustostruct (char *resbuf, size_t blen, FILE *fp,
+                      void *data)
+{
+        char              *ptr     = NULL;
+        char              *v       = NULL;
+        size_t             len     = 0;
+        size_t             i       = 0;
+        gf_gsync_status_t *sts_val = data;
+        /* status key -> destination field (and its capacity) */
+        struct {
+                const char *key;
+                char       *dst;
+                size_t      dstlen;
+        } fields[] = {
+                { "worker_status", sts_val->worker_status,
+                  sizeof (sts_val->worker_status) },
+                { "slave_node", sts_val->slave_node,
+                  sizeof (sts_val->slave_node) },
+                { "crawl_status", sts_val->crawl_status,
+                  sizeof (sts_val->crawl_status) },
+                { "last_synced", sts_val->last_synced,
+                  sizeof (sts_val->last_synced) },
+                { "last_synced_utc", sts_val->last_synced_utc,
+                  sizeof (sts_val->last_synced_utc) },
+                { "entry", sts_val->entry,
+                  sizeof (sts_val->entry) },
+                { "data", sts_val->data,
+                  sizeof (sts_val->data) },
+                { "meta", sts_val->meta,
+                  sizeof (sts_val->meta) },
+                { "failures", sts_val->failures,
+                  sizeof (sts_val->failures) },
+                { "checkpoint_time", sts_val->checkpoint_time,
+                  sizeof (sts_val->checkpoint_time) },
+                { "checkpoint_time_utc", sts_val->checkpoint_time_utc,
+                  sizeof (sts_val->checkpoint_time_utc) },
+                { "checkpoint_completed", sts_val->checkpoint_completed,
+                  sizeof (sts_val->checkpoint_completed) },
+                { "checkpoint_completion_time",
+                  sts_val->checkpoint_completion_time,
+                  sizeof (sts_val->checkpoint_completion_time) },
+                { "checkpoint_completion_time_utc",
+                  sts_val->checkpoint_completion_time_utc,
+                  sizeof (sts_val->checkpoint_completion_time_utc) },
+        };
+
+        for (;;) {
+                /* reset so the final check only sees a read failure */
+                errno = 0;
+                ptr = fgets (resbuf, blen, fp);
+                if (!ptr)
+                        break;
+
+                /* strip trailing space without stepping before resbuf */
+                len = strlen (resbuf);
+                while (len > 0 && isspace ((unsigned char)resbuf[len - 1]))
+                        resbuf[--len] = '\0';
+                if (len <= 1)
+                        /* empty line, or too short for a key and value */
+                        continue;
+
+                v = strchr (resbuf, ':');
+                if (!v)
+                        return -1;
+                /* split in place: resbuf is now the key, v the value */
+                *v++ = '\0';
+                while (isspace ((unsigned char)*v))
+                        v++;
+
+                for (i = 0; i < sizeof (fields) / sizeof (fields[0]); i++) {
+                        if (strcmp (resbuf, fields[i].key) != 0)
+                                continue;
+                        /* bounded copy, always NUL-terminated */
+                        snprintf (fields[i].dst, fields[i].dstlen,
+                                  "%s", v);
+                        break;
+                }
+        }
+
+        return errno ? -1 : 0;
+}
+
+
+/*
+ * Query the status of one brick's geo-replication worker.
+ *
+ * Builds the command line
+ *   gsyncd -c <conf_path> --iprefix=<DATADIR> :<master> <slave>
+ *          --status-get --path <brick_path>
+ * and hands it to glusterd_query_extutil_generic() together with
+ * _fcbk_statustostruct as the output callback and @sts_val as its data.
+ *
+ * Returns whatever glusterd_query_extutil_generic() returns.
+ */
+static int
+glusterd_gsync_get_status (char *master, char *slave, char *conf_path,
+                           char *brick_path, gf_gsync_status_t *sts_val)
+{
+        runner_t gsyncd                  = {0,};
+        /* one output line: key + value, where the value may be a path */
+        char     linebuf[256 + PATH_MAX] = {0,};
+
+        runinit (&gsyncd);
+
+        /* gsyncd -c <conf_path> --iprefix=<DATADIR> */
+        runner_add_args (&gsyncd, GSYNCD_PREFIX"/gsyncd", "-c", NULL);
+        runner_argprintf (&gsyncd, "%s", conf_path);
+        runner_argprintf (&gsyncd, "--iprefix=%s", DATADIR);
+
+        /* session: ":<master>" followed by the slave */
+        runner_argprintf (&gsyncd, ":%s", master);
+        runner_add_args (&gsyncd, slave, "--status-get", NULL);
+
+        /* restrict the query to this brick */
+        runner_add_args (&gsyncd, "--path", brick_path, NULL);
+
+        return glusterd_query_extutil_generic (linebuf, sizeof (linebuf),
+                                               &gsyncd, sts_val,
+                                               _fcbk_statustostruct);
+}
+
+static int
glusterd_gsync_get_param_file (char *prmfile, const char *param, char *master,
char *slave, char *conf_path)
{
@@ -2804,7 +2926,6 @@ gd_pause_or_resume_gsync (dict_t *dict, char *master, char *slave,
gf_boolean_t is_template_in_use = _gf_false;
char monitor_status[NAME_MAX] = {0,};
char *statefile = NULL;
- char *token = NULL;
xlator_t *this = NULL;
this = THIS;
@@ -2869,10 +2990,10 @@ gd_pause_or_resume_gsync (dict_t *dict, char *master, char *slave,
do not update status again*/
if (strstr (monitor_status, "Paused"))
goto out;
- (void) strcat (monitor_status, "(Paused)");
+
ret = glusterd_create_status_file ( master, slave,
slave_host, slave_vol,
- monitor_status);
+ "Paused");
if (ret) {
gf_log (this->name, GF_LOG_ERROR,
"Unable to update state_file."
@@ -2893,10 +3014,10 @@ gd_pause_or_resume_gsync (dict_t *dict, char *master, char *slave,
goto out;
}
} else {
- token = strtok (monitor_status, "(");
ret = glusterd_create_status_file (master, slave,
slave_host,
- slave_vol, token);
+ slave_vol,
+ "Started");
if (ret) {
gf_log (this->name, GF_LOG_ERROR,
"Resume Failed: Unable to update "
@@ -3321,159 +3442,6 @@ out:
return ret;
}
-static int
-glusterd_parse_gsync_status (char *buf, gf_gsync_status_t *sts_val)
-{
- int ret = -1;
- int i = -1;
- int num_of_fields = 8;
- char *token = NULL;
- char **tokens = NULL;
- char **ptr = NULL;
- char *save_ptr = NULL;
- char na_buf[] = "N/A";
-
- if (!buf) {
- gf_log ("", GF_LOG_ERROR, "Empty buf");
- goto out;
- }
-
- tokens = calloc (num_of_fields, sizeof (char *));
- if (!tokens) {
- gf_log ("", GF_LOG_ERROR, "Out of memory");
- goto out;
- }
-
- ptr = tokens;
-
- for (token = strtok_r (buf, ",", &save_ptr); token;
- token = strtok_r (NULL, ",", &save_ptr)) {
- *ptr = gf_strdup(token);
- if (!*ptr) {
- gf_log ("", GF_LOG_ERROR, "Out of memory");
- goto out;
- }
- ptr++;
- }
-
- for (i = 0; i < num_of_fields; i++) {
- token = strtok_r (tokens[i], ":", &save_ptr);
- token = strtok_r (NULL, "\0", &save_ptr);
- token++;
-
- /* token NULL check */
- if (!token && (i != 0) &&
- (i != 5) && (i != 7))
- token = na_buf;
-
- if (i == 0) {
- if (!token)
- token = na_buf;
- else {
- token++;
- if (!token)
- token = na_buf;
- else
- token[strlen(token) - 1] = '\0';
- }
- memcpy (sts_val->slave_node, token, strlen(token));
- }
- if (i == 1)
- memcpy (sts_val->files_syncd, token, strlen(token));
- if (i == 2)
- memcpy (sts_val->purges_remaining, token, strlen(token));
- if (i == 3)
- memcpy (sts_val->total_files_skipped, token, strlen(token));
- if (i == 4)
- memcpy (sts_val->files_remaining, token, strlen(token));
- if (i == 5) {
- if (!token)
- token = na_buf;
- else {
- token++;
- if (!token)
- token = na_buf;
- else
- token[strlen(token) - 1] = '\0';
- }
- memcpy (sts_val->worker_status, token, strlen(token));
- }
- if (i == 6)
- memcpy (sts_val->bytes_remaining, token, strlen(token));
- if (i == 7) {
- if (!token)
- token = na_buf;
- else {
- token++;
- if (!token)
- token = na_buf;
- else
- token[strlen(token) - 2] = '\0';
- }
- memcpy (sts_val->crawl_status, token, strlen(token));
- }
- }
-
- ret = 0;
-out:
- for (i = 0; i< num_of_fields; i++)
- if (tokens[i])
- GF_FREE(tokens[i]);
-
- gf_log ("", GF_LOG_DEBUG, "Returning %d", ret);
- return ret;
-}
-
-static int
-glusterd_gsync_fetch_status_extra (char *path, gf_gsync_status_t *sts_val)
-{
- char sockpath[PATH_MAX] = {0,};
- struct sockaddr_un sa = {0,};
- int s = -1;
- struct pollfd pfd = {0,};
- int ret = 0;
-
- glusterd_set_socket_filepath (path, sockpath, sizeof (sockpath));
-
- strncpy(sa.sun_path, sockpath, sizeof(sa.sun_path));
- if (sa.sun_path[sizeof (sa.sun_path) - 1])
- return -1;
- sa.sun_family = AF_UNIX;
-
- s = socket(AF_UNIX, SOCK_STREAM, 0);
- if (s == -1)
- return -1;
- ret = fcntl (s, F_GETFL);
- if (ret != -1)
- ret = fcntl (s, F_SETFL, ret | O_NONBLOCK);
- if (ret == -1)
- goto out;
-
- ret = connect (s, (struct sockaddr *)&sa, sizeof (sa));
- if (ret == -1)
- goto out;
- pfd.fd = s;
- pfd.events = POLLIN;
- /* we don't want to hang on gsyncd */
- if (poll (&pfd, 1, 5000) < 1 ||
- !(pfd.revents & POLLIN)) {
- ret = -1;
- goto out;
- }
- ret = read(s, sts_val->checkpoint_status,
- sizeof(sts_val->checkpoint_status));
- /* we expect a terminating 0 byte */
- if (ret == 0 || (ret > 0 && sts_val->checkpoint_status[ret - 1]))
- ret = -1;
- if (ret > 0) {
- ret = 0;
- }
-
-out:
- close (s);
- return ret;
-}
-
int
glusterd_fetch_values_from_config (char *master, char *slave,
char *confpath, dict_t *confd,
@@ -3567,6 +3535,7 @@ glusterd_read_status_file (glusterd_volinfo_t *volinfo, char *slave,
gf_boolean_t is_template_in_use = _gf_false;
glusterd_conf_t *priv = NULL;
struct stat stbuf = {0,};
+ dict_t *statusd = NULL;
GF_ASSERT (THIS);
GF_ASSERT (THIS->private);
@@ -3657,114 +3626,7 @@ fetch_data:
goto out;
}
- /* Creating the brick state file's path */
- memset(brick_state_file, '\0', PATH_MAX);
- memcpy (brick_path, brickinfo->path, PATH_MAX - 1);
- for (i = 0; i < strlen(brick_path) - 1; i++)
- if (brick_path[i] == '/')
- brick_path[i] = '_';
- ret = snprintf(brick_state_file, PATH_MAX - 1, "%s%s.status",
- georep_session_wrkng_dir, brick_path);
- brick_state_file[ret] = '\0';
-
- gf_log ("", GF_LOG_DEBUG, "brick_state_file = %s", brick_state_file);
-
- memset (tmp, '\0', sizeof(tmp));
-
- ret = glusterd_gsync_read_frm_status (brick_state_file,
- tmp, sizeof (tmp));
- if (ret <= 0) {
- gf_log ("", GF_LOG_ERROR, "Unable to read the status"
- "file for %s brick for %s(master), %s(slave) "
- "session", brickinfo->path, master, slave);
- memcpy (sts_val->slave_node, slave, strlen(slave));
- sts_val->slave_node[strlen(slave)] = '\0';
- ret = snprintf (sts_val->worker_status, sizeof(sts_val->worker_status), "N/A");
- sts_val->worker_status[ret] = '\0';
- ret = snprintf (sts_val->checkpoint_status, sizeof(sts_val->checkpoint_status), "N/A");
- sts_val->checkpoint_status[ret] = '\0';
- ret = snprintf (sts_val->crawl_status, sizeof(sts_val->crawl_status), "N/A");
- sts_val->crawl_status[ret] = '\0';
- ret = snprintf (sts_val->files_syncd, sizeof(sts_val->files_syncd), "N/A");
- sts_val->files_syncd[ret] = '\0';
- ret = snprintf (sts_val->purges_remaining, sizeof(sts_val->purges_remaining), "N/A");
- sts_val->purges_remaining[ret] = '\0';
- ret = snprintf (sts_val->total_files_skipped, sizeof(sts_val->total_files_skipped), "N/A");
- sts_val->total_files_skipped[ret] = '\0';
- ret = snprintf (sts_val->files_remaining, sizeof(sts_val->files_remaining), "N/A");
- sts_val->files_remaining[ret] = '\0';
- ret = snprintf (sts_val->bytes_remaining, sizeof(sts_val->bytes_remaining), "N/A");
- sts_val->bytes_remaining[ret] = '\0';
- goto store_status;
- }
-
- ret = glusterd_gsync_fetch_status_extra (socketfile, sts_val);
- if (ret || strlen(sts_val->checkpoint_status) == 0) {
- gf_log ("", GF_LOG_DEBUG, "No checkpoint status"
- "for %s(master), %s(slave)", master, slave);
- ret = snprintf (sts_val->checkpoint_status, sizeof(sts_val->checkpoint_status), "N/A");
- sts_val->checkpoint_status[ret] = '\0';
- }
-
- ret = glusterd_parse_gsync_status (tmp, sts_val);
- if (ret) {
- gf_log ("", GF_LOG_ERROR,
- "Unable to parse the gsync status for %s",
- brickinfo->path);
- memcpy (sts_val->slave_node, slave, strlen(slave));
- sts_val->slave_node[strlen(slave)] = '\0';
- ret = snprintf (sts_val->worker_status, sizeof(sts_val->worker_status), "N/A");
- sts_val->worker_status[ret] = '\0';
- ret = snprintf (sts_val->checkpoint_status, sizeof(sts_val->checkpoint_status), "N/A");
- sts_val->checkpoint_status[ret] = '\0';
- ret = snprintf (sts_val->crawl_status, sizeof(sts_val->crawl_status), "N/A");
- sts_val->crawl_status[ret] = '\0';
- ret = snprintf (sts_val->files_syncd, sizeof(sts_val->files_syncd), "N/A");
- sts_val->files_syncd[ret] = '\0';
- ret = snprintf (sts_val->purges_remaining, sizeof(sts_val->purges_remaining), "N/A");
- sts_val->purges_remaining[ret] = '\0';
- ret = snprintf (sts_val->total_files_skipped, sizeof(sts_val->total_files_skipped), "N/A");
- sts_val->total_files_skipped[ret] = '\0';
- ret = snprintf (sts_val->files_remaining, sizeof(sts_val->files_remaining), "N/A");
- sts_val->files_remaining[ret] = '\0';
- ret = snprintf (sts_val->bytes_remaining, sizeof(sts_val->bytes_remaining), "N/A");
- sts_val->bytes_remaining[ret] = '\0';
- }
-
-store_status:
- if ((strcmp (monitor_status, "Stable"))) {
- memcpy (sts_val->worker_status, monitor_status, strlen(monitor_status));
- sts_val->worker_status[strlen(monitor_status)] = '\0';
- ret = snprintf (sts_val->crawl_status, sizeof(sts_val->crawl_status), "N/A");
- sts_val->crawl_status[ret] = '\0';
- ret = snprintf (sts_val->checkpoint_status, sizeof(sts_val->checkpoint_status), "N/A");
- sts_val->checkpoint_status[ret] = '\0';
- }
-
- if (is_template_in_use) {
- ret = snprintf (sts_val->worker_status,
- sizeof(sts_val->worker_status),
- "Config Corrupted");
- sts_val->worker_status[ret] = '\0';
- }
-
- if (strcmp (sts_val->worker_status, "Active")) {
- ret = snprintf (sts_val->checkpoint_status, sizeof(sts_val->checkpoint_status), "N/A");
- sts_val->checkpoint_status[ret] = '\0';
- ret = snprintf (sts_val->crawl_status, sizeof(sts_val->crawl_status), "N/A");
- sts_val->crawl_status[ret] = '\0';
- }
-
- if (!strcmp (sts_val->slave_node, "N/A")) {
- memcpy (sts_val->slave_node, slave, strlen(slave));
- sts_val->slave_node[strlen(slave)] = '\0';
- }
-
- memcpy (sts_val->node, node, strlen(node));
- sts_val->node[strlen(node)] = '\0';
- memcpy (sts_val->brick, brickinfo->path, strlen(brickinfo->path));
- sts_val->brick[strlen(brickinfo->path)] = '\0';
-
+ /* Slave Key */
ret = glusterd_get_slave (volinfo, slave, &slavekey);
if (ret < 0) {
GF_FREE (sts_val);
@@ -3773,14 +3635,87 @@ store_status:
memcpy (sts_val->slavekey, slavekey, strlen(slavekey));
sts_val->slavekey[strlen(slavekey)] = '\0';
+ /* Master Volume */
+ memcpy (sts_val->master, master, strlen(master));
+ sts_val->master[strlen(master)] = '\0';
+
+ /* Master Brick Node */
+ memcpy (sts_val->node, node, strlen(node));
+ sts_val->node[strlen(node)] = '\0';
+
+ /* Master Brick Path */
+ memcpy (sts_val->brick, brickinfo->path,
+ strlen(brickinfo->path));
+ sts_val->brick[strlen(brickinfo->path)] = '\0';
+
+ /* Brick Host UUID */
brick_host_uuid = uuid_utoa(brickinfo->uuid);
brick_host_uuid_length = strlen (brick_host_uuid);
memcpy (sts_val->brick_host_uuid, brick_host_uuid,
brick_host_uuid_length);
sts_val->brick_host_uuid[brick_host_uuid_length] = '\0';
- memcpy (sts_val->master, master, strlen(master));
- sts_val->master[strlen(master)] = '\0';
+ /* Slave */
+ memcpy (sts_val->slave, slave, strlen(slave));
+ sts_val->slave[strlen(slave)] = '\0';
+
+ snprintf (sts_val->slave_node,
+ sizeof(sts_val->slave_node), "N/A");
+
+ snprintf (sts_val->worker_status,
+ sizeof(sts_val->worker_status), "N/A");
+
+ snprintf (sts_val->crawl_status,
+ sizeof(sts_val->crawl_status), "N/A");
+
+ snprintf (sts_val->last_synced,
+ sizeof(sts_val->last_synced), "N/A");
+
+ snprintf (sts_val->last_synced_utc,
+ sizeof(sts_val->last_synced_utc), "N/A");
+
+ snprintf (sts_val->entry, sizeof(sts_val->entry), "N/A");
+
+ snprintf (sts_val->data, sizeof(sts_val->data), "N/A");
+
+ snprintf (sts_val->meta, sizeof(sts_val->meta), "N/A");
+
+ snprintf (sts_val->failures, sizeof(sts_val->failures), "N/A");
+
+ snprintf (sts_val->checkpoint_time,
+ sizeof(sts_val->checkpoint_time), "N/A");
+
+ snprintf (sts_val->checkpoint_time_utc,
+ sizeof(sts_val->checkpoint_time_utc), "N/A");
+
+ snprintf (sts_val->checkpoint_completed,
+ sizeof(sts_val->checkpoint_completed), "N/A");
+
+ snprintf (sts_val->checkpoint_completion_time,
+ sizeof(sts_val->checkpoint_completion_time),
+ "N/A");
+
+ snprintf (sts_val->checkpoint_completion_time_utc,
+ sizeof(sts_val->checkpoint_completion_time_utc),
+ "N/A");
+
+ /* Get all the other values from Gsyncd */
+ ret = glusterd_gsync_get_status (master, slave, conf_path,
+ brickinfo->path, sts_val);
+
+ if (ret) {
+ gf_log ("", GF_LOG_ERROR, "Unable to get status data "
+ "for %s(master), %s(slave), %s(brick)",
+ master, slave, brickinfo->path);
+ ret = -1;
+ goto out;
+ }
+
+ if (is_template_in_use) {
+ snprintf (sts_val->worker_status,
+ sizeof(sts_val->worker_status),
+ "Config Corrupted");
+ }
ret = dict_get_str (volinfo->gsync_slaves, slavekey,
&slaveentry);
@@ -3809,8 +3744,10 @@ store_status:
strlen(slaveuser));
sts_val->slave_user[strlen(slaveuser)] = '\0';
- snprintf (sts_val_name, sizeof (sts_val_name), "status_value%d", gsync_count);
- ret = dict_set_bin (dict, sts_val_name, sts_val, sizeof(gf_gsync_status_t));
+ snprintf (sts_val_name, sizeof (sts_val_name),
+ "status_value%d", gsync_count);
+ ret = dict_set_bin (dict, sts_val_name, sts_val,
+ sizeof(gf_gsync_status_t));
if (ret) {
GF_FREE (sts_val);
goto out;
@@ -5258,7 +5195,7 @@ glusterd_create_essential_dir_files (glusterd_volinfo_t *volinfo, dict_t *dict,
} else {
ret = glusterd_create_status_file (volinfo->volname, slave,
slave_host, slave_vol,
- "Not Started");
+ "Created");
if (ret || lstat (statefile, &stbuf)) {
snprintf (errmsg, sizeof (errmsg), "Unable to create %s"
". Error : %s", statefile, strerror (errno));