summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/master.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--geo-replication/syncdaemon/master.py374
1 files changed, 53 insertions, 321 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 38535884ec6..8e4c43046b0 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -15,17 +15,15 @@ import stat
import json
import logging
import fcntl
-import socket
import string
import errno
import tarfile
-from errno import ENOENT, ENODATA, EPIPE, EEXIST, EACCES, EAGAIN
+from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN
from threading import Condition, Lock
from datetime import datetime
from gconf import gconf
-from tempfile import NamedTemporaryFile
from syncdutils import Thread, GsyncdError, boolify, escape
-from syncdutils import unescape, select, gauxpfx, md5hex, selfkill
+from syncdutils import unescape, gauxpfx, md5hex, selfkill
from syncdutils import lstat, errno_wrap
from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
@@ -397,18 +395,6 @@ class GMasterCommon(object):
raise
return default_data
- def update_crawl_data(self):
- if getattr(gconf, 'state_detail_file', None):
- try:
- same_dir = os.path.dirname(gconf.state_detail_file)
- with NamedTemporaryFile(dir=same_dir, delete=False) as tmp:
- json.dump(self.total_crawl_stats, tmp)
- tmp.flush()
- os.fsync(tmp.fileno())
- os.rename(tmp.name, gconf.state_detail_file)
- except (IOError, OSError):
- raise
-
def __init__(self, master, slave):
self.master = master
self.slave = slave
@@ -434,14 +420,12 @@ class GMasterCommon(object):
self.total_turns = int(gconf.turns)
self.crawl_start = datetime.now()
self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
- self.total_crawl_stats = None
self.start = None
self.change_seen = None
# the actual volinfo we make use of
self.volinfo = None
self.terminate = False
self.sleep_interval = 1
- self.checkpoint_thread = None
self.current_files_skipped_count = 0
self.skipped_gfid_list = []
self.unlinked_gfids = []
@@ -493,7 +477,6 @@ class GMasterCommon(object):
logging.debug("Got the lock")
return True
-
def should_crawl(self):
if not gconf.use_meta_volume:
return gconf.glusterd_uuid in self.master.server.node_uuid()
@@ -503,7 +486,6 @@ class GMasterCommon(object):
sys.exit(1)
return self.mgmt_lock()
-
def register(self):
self.register()
@@ -542,10 +524,8 @@ class GMasterCommon(object):
if self.volinfo['retval']:
logging.warn("master cluster's info may not be valid %d" %
self.volinfo['retval'])
- self.start_checkpoint_thread()
else:
raise GsyncdError("master volinfo unavailable")
- self.total_crawl_stats = self.get_initial_crawl_data()
self.lastreport['time'] = time.time()
logging.info('crawl interval: %d seconds' % self.sleep_interval)
@@ -570,7 +550,7 @@ class GMasterCommon(object):
t0 = t1
self.update_worker_remote_node()
if not crawl:
- self.update_worker_health("Passive")
+ self.status.set_passive()
# bring up _this_ brick to the cluster stime
# which is min of cluster (but max of the replicas)
brick_stime = self.xtime('.', self.slave)
@@ -597,35 +577,14 @@ class GMasterCommon(object):
time.sleep(5)
continue
- self.update_worker_health("Active")
+
+ self.status.set_active()
self.crawl()
+
if oneshot:
return
time.sleep(self.sleep_interval)
- @classmethod
- def _checkpt_param(cls, chkpt, prm, xtimish=True):
- """use config backend to lookup a parameter belonging to
- checkpoint @chkpt"""
- cprm = gconf.configinterface.get_realtime('checkpoint_' + prm)
- if not cprm:
- return
- chkpt_mapped, val = cprm.split(':', 1)
- if unescape(chkpt_mapped) != chkpt:
- return
- if xtimish:
- val = cls.deserialize_xtime(val)
- return val
-
- @classmethod
- def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True):
- """use config backend to store a parameter associated
- with checkpoint @chkpt"""
- if xtimish:
- val = cls.serialize_xtime(val)
- gconf.configinterface.set(
- 'checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))
-
@staticmethod
def humantime(*tpair):
"""format xtime-like (sec, nsec) pair to human readable format"""
@@ -654,116 +613,6 @@ class GMasterCommon(object):
string.zfill(m, 2), string.zfill(s, 2))
return date
- def checkpt_service(self, chan, chkpt):
- """checkpoint service loop
-
- monitor and verify checkpoint status for @chkpt, and listen
- for incoming requests for whom we serve a pretty-formatted
- status report"""
- while True:
- chkpt = gconf.configinterface.get_realtime("checkpoint")
- if not chkpt:
- gconf.configinterface.delete("checkpoint_completed")
- gconf.configinterface.delete("checkpoint_target")
- # dummy loop for the case when there is no checkpt set
- select([chan], [], [])
- conn, _ = chan.accept()
- conn.send('\0')
- conn.close()
- continue
-
- checkpt_tgt = self._checkpt_param(chkpt, 'target')
- if not checkpt_tgt:
- checkpt_tgt = self.xtime('.')
- if isinstance(checkpt_tgt, int):
- raise GsyncdError("master root directory is "
- "unaccessible (%s)",
- os.strerror(checkpt_tgt))
- self._set_checkpt_param(chkpt, 'target', checkpt_tgt)
- logging.debug("checkpoint target %s has been determined "
- "for checkpoint %s" %
- (repr(checkpt_tgt), chkpt))
-
- # check if the label is 'now'
- chkpt_lbl = chkpt
- try:
- x1, x2 = chkpt.split(':')
- if x1 == 'now':
- chkpt_lbl = "as of " + self.humantime(x2)
- except:
- pass
- completed = self._checkpt_param(chkpt, 'completed', xtimish=False)
- if completed:
- completed = tuple(int(x) for x in completed.split('.'))
- s, _, _ = select([chan], [], [], (not completed) and 5 or None)
- # either request made and we re-check to not
- # give back stale data, or we still hunting for completion
- if (self.native_xtime(checkpt_tgt) and (
- self.native_xtime(checkpt_tgt) < self.volmark)):
- # indexing has been reset since setting the checkpoint
- status = "is invalid"
- else:
- xtr = self.xtime('.', self.slave)
- if isinstance(xtr, int):
- raise GsyncdError("slave root directory is "
- "unaccessible (%s)",
- os.strerror(xtr))
- ncompleted = self.xtime_geq(xtr, checkpt_tgt)
- if completed and not ncompleted: # stale data
- logging.warn("completion time %s for checkpoint %s "
- "became stale" %
- (self.humantime(*completed), chkpt))
- completed = None
- gconf.configinterface.delete('checkpoint_completed')
- if ncompleted and not completed: # just reaching completion
- completed = "%.6f" % time.time()
- self._set_checkpt_param(
- chkpt, 'completed', completed, xtimish=False)
- completed = tuple(int(x) for x in completed.split('.'))
- logging.info("checkpoint %s completed" % chkpt)
- status = completed and \
- "completed at " + self.humantime(completed[0]) or \
- "not reached yet"
- if s:
- conn = None
- try:
- conn, _ = chan.accept()
- try:
- conn.send("checkpoint %s is %s\0" %
- (chkpt_lbl, status))
- except:
- exc = sys.exc_info()[1]
- if ((isinstance(exc, OSError) or isinstance(
- exc, IOError)) and exc.errno == EPIPE):
- logging.debug('checkpoint client disconnected')
- else:
- raise
- finally:
- if conn:
- conn.close()
-
- def start_checkpoint_thread(self):
- """prepare and start checkpoint service"""
- if self.checkpoint_thread or not (
- getattr(gconf, 'state_socket_unencoded', None) and getattr(
- gconf, 'socketdir', None)
- ):
- return
- chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- state_socket = os.path.join(
- gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket")
- try:
- os.unlink(state_socket)
- except:
- if sys.exc_info()[0] == OSError:
- pass
- chan.bind(state_socket)
- chan.listen(1)
- chkpt = gconf.configinterface.get_realtime("checkpoint")
- t = Thread(target=self.checkpt_service, args=(chan, chkpt))
- t.start()
- self.checkpoint_thread = t
-
def add_job(self, path, label, job, *a, **kw):
"""insert @job function to job table at @path with @label"""
if self.jobtab.get(path) is None:
@@ -937,11 +786,15 @@ class GMasterChangelogMixin(GMasterCommon):
files_pending['purge'] += 1
def log_failures(failures, entry_key, gfid_prefix, log_prefix):
+ num_failures = 0
for failure in failures:
st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
if not isinstance(st, int):
+ num_failures += 1
logging.warn('%s FAILED: %s' % (log_prefix, repr(failure)))
+ self.status.inc_value("failures", num_failures)
+
for e in clist:
e = e.strip()
et = e[self.IDX_START:self.IDX_END] # entry type
@@ -1040,12 +893,18 @@ class GMasterChangelogMixin(GMasterCommon):
else:
logging.warn('got invalid changelog type: %s' % (et))
logging.debug('entries: %s' % repr(entries))
- if not retry:
- self.update_worker_cumilitive_status(files_pending)
+
+ # Increment counters for Status
+ self.status.inc_value("entry", len(entries))
+ self.files_in_batch = len(datas)
+ self.status.inc_value("data", self.files_in_batch)
+
# sync namespace
if entries:
failures = self.slave.server.entry_ops(entries)
log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
+ self.status.dec_value("entry", len(entries))
+
# sync metadata
if meta_gfid:
meta_entries = []
@@ -1059,8 +918,11 @@ class GMasterChangelogMixin(GMasterCommon):
continue
meta_entries.append(edct('META', go=go[0], stat=st))
if meta_entries:
+ self.status.inc_value("meta", len(entries))
failures = self.slave.server.meta_ops(meta_entries)
log_failures(failures, 'go', '', 'META')
+ self.status.dec_value("meta", len(entries))
+
# sync data
if datas:
self.a_syncdata(datas)
@@ -1112,9 +974,17 @@ class GMasterChangelogMixin(GMasterCommon):
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
+ chkpt_time = gconf.configinterface.get_realtime(
+ "checkpoint")
+ checkpoint_time = 0
+ if chkpt_time is not None:
+ checkpoint_time = int(chkpt_time)
+
+ self.status.set_last_synced(xtl, checkpoint_time)
map(self.changelog_done_func, changes)
self.archive_and_purge_changelogs(changes)
- self.update_worker_files_syncd()
+ self.status.dec_value("data", self.files_in_batch)
+ self.files_in_batch = 0
break
# We do not know which changelog transfer failed, retry everything.
@@ -1124,14 +994,22 @@ class GMasterChangelogMixin(GMasterCommon):
logging.warn('changelogs %s could not be processed - '
'moving on...' %
' '.join(map(os.path.basename, changes)))
- self.update_worker_total_files_skipped(
- self.current_files_skipped_count)
+ self.status.inc_value("failures",
+ self.current_files_skipped_count)
logging.warn('SKIPPED GFID = %s' %
','.join(self.skipped_gfid_list))
- self.update_worker_files_syncd()
+
+ self.files_in_batch = 0
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
+ chkpt_time = gconf.configinterface.get_realtime(
+ "checkpoint")
+ checkpoint_time = 0
+ if chkpt_time is not None:
+ checkpoint_time = int(chkpt_time)
+
+ self.status.set_last_synced(xtl, checkpoint_time)
map(self.changelog_done_func, changes)
self.archive_and_purge_changelogs(changes)
break
@@ -1152,161 +1030,12 @@ class GMasterChangelogMixin(GMasterCommon):
if not stime == URXTIME:
self.sendmark(path, stime)
- def get_worker_status_file(self):
- file_name = gconf.local_path + '.status'
- file_name = file_name.replace("/", "_")
- worker_status_file = gconf.georep_session_working_dir + file_name
- return worker_status_file
-
- def update_worker_status(self, key, value):
- default_data = {"remote_node": "N/A",
- "worker status": "Not Started",
- "crawl status": "N/A",
- "files_syncd": 0,
- "files_remaining": 0,
- "bytes_remaining": 0,
- "purges_remaining": 0,
- "total_files_skipped": 0}
- worker_status_file = self.get_worker_status_file()
- try:
- with open(worker_status_file, 'r+') as f:
- loaded_data = json.load(f)
- loaded_data[key] = value
- os.ftruncate(f.fileno(), 0)
- os.lseek(f.fileno(), 0, os.SEEK_SET)
- json.dump(loaded_data, f)
- f.flush()
- os.fsync(f.fileno())
- except (IOError, OSError, ValueError):
- logging.info('Creating new %s' % worker_status_file)
- try:
- with open(worker_status_file, 'wb') as f:
- default_data[key] = value
- json.dump(default_data, f)
- f.flush()
- os.fsync(f.fileno())
- except:
- raise
-
- def update_worker_cumilitive_status(self, files_pending):
- default_data = {"remote_node": "N/A",
- "worker status": "Not Started",
- "crawl status": "N/A",
- "files_syncd": 0,
- "files_remaining": 0,
- "bytes_remaining": 0,
- "purges_remaining": 0,
- "total_files_skipped": 0}
- worker_status_file = self.get_worker_status_file()
- try:
- with open(worker_status_file, 'r+') as f:
- loaded_data = json.load(f)
- loaded_data['files_remaining'] = files_pending['count']
- loaded_data['bytes_remaining'] = files_pending['bytes']
- loaded_data['purges_remaining'] = files_pending['purge']
- os.ftruncate(f.fileno(), 0)
- os.lseek(f.fileno(), 0, os.SEEK_SET)
- json.dump(loaded_data, f)
- f.flush()
- os.fsync(f.fileno())
- except (IOError, OSError, ValueError):
- logging.info('Creating new %s' % worker_status_file)
- try:
- with open(worker_status_file, 'wb') as f:
- default_data['files_remaining'] = files_pending['count']
- default_data['bytes_remaining'] = files_pending['bytes']
- default_data['purges_remaining'] = files_pending['purge']
- json.dump(default_data, f)
- f.flush()
- os.fsync(f.fileno())
- except:
- raise
-
def update_worker_remote_node(self):
node = sys.argv[-1]
- node = node.split("@")[-1]
+ node_data = node.split("@")
+ node = node_data[-1]
remote_node_ip = node.split(":")[0]
- remote_node_vol = node.split(":")[3]
- remote_node = remote_node_ip + '::' + remote_node_vol
- self.update_worker_status('remote_node', remote_node)
-
- def update_worker_health(self, state):
- self.update_worker_status('worker status', state)
-
- def update_worker_crawl_status(self, state):
- self.update_worker_status('crawl status', state)
-
- def update_worker_files_syncd(self):
- default_data = {"remote_node": "N/A",
- "worker status": "Not Started",
- "crawl status": "N/A",
- "files_syncd": 0,
- "files_remaining": 0,
- "bytes_remaining": 0,
- "purges_remaining": 0,
- "total_files_skipped": 0}
- worker_status_file = self.get_worker_status_file()
- try:
- with open(worker_status_file, 'r+') as f:
- loaded_data = json.load(f)
- loaded_data['files_syncd'] += loaded_data['files_remaining']
- loaded_data['files_remaining'] = 0
- loaded_data['bytes_remaining'] = 0
- loaded_data['purges_remaining'] = 0
- os.ftruncate(f.fileno(), 0)
- os.lseek(f.fileno(), 0, os.SEEK_SET)
- json.dump(loaded_data, f)
- f.flush()
- os.fsync(f.fileno())
- except (IOError, OSError, ValueError):
- logging.info('Creating new %s' % worker_status_file)
- try:
- with open(worker_status_file, 'wb') as f:
- json.dump(default_data, f)
- f.flush()
- os.fsync(f.fileno())
- except:
- raise
-
- def update_worker_files_remaining(self, state):
- self.update_worker_status('files_remaining', state)
-
- def update_worker_bytes_remaining(self, state):
- self.update_worker_status('bytes_remaining', state)
-
- def update_worker_purges_remaining(self, state):
- self.update_worker_status('purges_remaining', state)
-
- def update_worker_total_files_skipped(self, value):
- default_data = {"remote_node": "N/A",
- "worker status": "Not Started",
- "crawl status": "N/A",
- "files_syncd": 0,
- "files_remaining": 0,
- "bytes_remaining": 0,
- "purges_remaining": 0,
- "total_files_skipped": 0}
- worker_status_file = self.get_worker_status_file()
- try:
- with open(worker_status_file, 'r+') as f:
- loaded_data = json.load(f)
- loaded_data['total_files_skipped'] = value
- loaded_data['files_remaining'] -= value
- os.ftruncate(f.fileno(), 0)
- os.lseek(f.fileno(), 0, os.SEEK_SET)
- json.dump(loaded_data, f)
- f.flush()
- os.fsync(f.fileno())
- except (IOError, OSError, ValueError):
- logging.info('Creating new %s' % worker_status_file)
- try:
- with open(worker_status_file, 'wb') as f:
- default_data['total_files_skipped'] = value
- json.dump(default_data, f)
- f.flush()
- os.fsync(f.fileno())
- except:
- raise
+ self.status.set_slave_node(remote_node_ip)
def changelogs_batch_process(self, changes):
changelogs_batches = []
@@ -1331,7 +1060,7 @@ class GMasterChangelogMixin(GMasterCommon):
self.process(batch)
def crawl(self):
- self.update_worker_crawl_status("Changelog Crawl")
+ self.status.set_worker_crawl_status("Changelog Crawl")
changes = []
# get stime (from the brick) and purge changelogs
# that are _historical_ to that time.
@@ -1355,16 +1084,17 @@ class GMasterChangelogMixin(GMasterCommon):
self.changelogs_batch_process(changes)
- def register(self, register_time, changelog_agent):
+ def register(self, register_time, changelog_agent, status):
self.changelog_agent = changelog_agent
self.sleep_interval = int(gconf.change_interval)
self.changelog_done_func = self.changelog_agent.done
self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
".processed")
+ self.status = status
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
- def register(self, register_time, changelog_agent):
+ def register(self, register_time, changelog_agent, status):
self.changelog_agent = changelog_agent
self.changelog_register_time = register_time
self.history_crawl_start_time = register_time
@@ -1372,10 +1102,11 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
self.history_turns = 0
self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
".history/.processed")
+ self.status = status
def crawl(self):
self.history_turns += 1
- self.update_worker_crawl_status("History Crawl")
+ self.status.set_worker_crawl_status("History Crawl")
purge_time = self.get_purge_time()
logging.info('starting history crawl... turns: %s, stime: %s'
@@ -1455,7 +1186,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
XSYNC_MAX_ENTRIES = 1 << 13
- def register(self, register_time=None, changelog_agent=None):
+ def register(self, register_time=None, changelog_agent=None, status=None):
+ self.status = status
self.counter = 0
self.comlist = []
self.stimes = []
@@ -1486,7 +1218,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
t.start()
logging.info('starting hybrid crawl..., stime: %s'
% repr(self.get_purge_time()))
- self.update_worker_crawl_status("Hybrid Crawl")
+ self.status.set_worker_crawl_status("Hybrid Crawl")
while True:
try:
item = self.comlist.pop(0)