author    Aravinda VK <avishwan@redhat.com>    2017-06-15 18:09:36 +0530
committer Aravinda VK <avishwan@redhat.com>    2017-06-20 06:00:47 +0000
commit    0a8dac38ac4415ea770fb36b34e3c494e8713e6e (patch)
tree      b66cd2f3583466bfc2eeb16bdf724e4494cdbc3e /geo-replication/syncdaemon/master.py
parent    52d0886cfbcdfd69fa0cac0a6d51cd8811d8c6d7 (diff)
geo-rep: Structured log support
Changed all log messages to structured log format.

Change-Id: Idae25f8b4ad0bbae38f4362cbda7bbf51ce7607b
Updates: #240
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: https://review.gluster.org/17551
Smoke: Gluster Build System <jenkins@build.gluster.org>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
Reviewed-by: Kotresh HR <khiremat@redhat.com>
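The conversion below leans on the new lf() helper imported from syncdutils in the first hunk of the diff. Its implementation lives in syncdutils.py rather than this file, so the following is only a rough sketch, assuming the helper simply appends each keyword argument to the event string:

    # Hypothetical sketch of the lf() log-format helper imported from
    # syncdutils; the real implementation is in syncdaemon/syncdutils.py
    # and is not part of this diff.
    def lf(event, **kwargs):
        # Render the message as "event [{key1=value1}] [{key2=value2}] ..."
        msg = event
        for k, v in kwargs.items():
            msg += " [{%s=%s}]" % (k, v)
        return msg

Under that assumption, a converted call such as logging.info(lf('foreign volume info found, waiting for expiry', expiry=42)) would emit "foreign volume info found, waiting for expiry [{expiry=42}]": one line per event, with key=value fields that are easy to grep or feed to a log parser.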
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--  geo-replication/syncdaemon/master.py  241
1 file changed, 133 insertions(+), 108 deletions(-)
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index aebaf31dcff..17ec550aafa 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -24,7 +24,7 @@ from datetime import datetime
from gconf import gconf
from syncdutils import Thread, GsyncdError, boolify, escape
from syncdutils import unescape, gauxpfx, md5hex, selfkill
-from syncdutils import lstat, errno_wrap, FreeObject
+from syncdutils import lstat, errno_wrap, FreeObject, lf
from syncdutils import NoStimeAvailable, PartialHistoryAvailable
URXTIME = (-1, 0)
@@ -54,8 +54,8 @@ def _volinfo_hook_relax_foreign(self):
fgn_vi = volinfo_sys[self.KFGN]
if fgn_vi:
expiry = fgn_vi['timeout'] - int(time.time()) + 1
- logging.info('foreign volume info found, waiting %d sec for expiry' %
- expiry)
+ logging.info(lf('foreign volume info found, waiting for expiry',
+ expiry=expiry))
time.sleep(expiry)
volinfo_sys = self.get_sys_volinfo()
return volinfo_sys
@@ -90,7 +90,8 @@ def gmaster_builder(excrawl=None):
modemixin = 'normal'
changemixin = 'xsync' if gconf.change_detector == 'xsync' \
else excrawl or gconf.change_detector
- logging.debug('setting up %s change detection mode' % changemixin)
+ logging.debug(lf('setting up change detection mode',
+ mode=changemixin))
modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
sendmarkmixin = boolify(
@@ -256,7 +257,7 @@ class TarSSHEngine(object):
"""
def a_syncdata(self, files):
- logging.debug('files: %s' % (files))
+ logging.debug(lf("Files", files=files))
for f in files:
pb = self.syncer.add(f)
@@ -264,7 +265,7 @@ class TarSSHEngine(object):
def regjob(se, xte, pb):
rv = pb.wait()
if rv[0]:
- logging.debug('synced ' + se)
+ logging.debug(lf('synced', file=se))
return True
else:
# stat check for file presence
@@ -290,16 +291,16 @@ class RsyncEngine(object):
"""Sync engine that uses rsync(1) for data transfers"""
def a_syncdata(self, files):
- logging.debug('files: %s' % (files))
+ logging.debug(lf("files", files=files))
for f in files:
- logging.debug('candidate for syncing %s' % f)
+ logging.debug(lf('candidate for syncing', file=f))
pb = self.syncer.add(f)
def regjob(se, xte, pb):
rv = pb.wait()
if rv[0]:
- logging.debug('synced ' + se)
+ logging.debug(lf('synced', file=se))
return True
else:
# stat to check if the file exist
@@ -431,16 +432,16 @@ class GMasterCommon(object):
fcntl.lockf(gconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
if not gconf.active_earlier:
gconf.active_earlier = True
- logging.info("Got lock : %s : Becoming ACTIVE"
- % gconf.local_path)
+ logging.info(lf("Got lock Becoming ACTIVE",
+ brick=gconf.local_path))
return True
except:
ex = sys.exc_info()[1]
if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
if not gconf.passive_earlier:
gconf.passive_earlier = True
- logging.info("Didn't get lock : %s : Becoming PASSIVE"
- % gconf.local_path)
+ logging.info(lf("Didn't get lock Becoming PASSIVE",
+ brick=gconf.local_path))
return False
raise
@@ -449,7 +450,7 @@ class GMasterCommon(object):
+ str(gconf.subvol_num) + ".lock"
mgmt_lock_dir = os.path.join(gconf.meta_volume_mnt, "geo-rep")
path = os.path.join(mgmt_lock_dir, bname)
- logging.debug("lock_file_path: %s" % path)
+ logging.debug(lf("lock file path", path=path))
try:
fd = os.open(path, os.O_CREAT | os.O_RDWR)
except OSError:
@@ -477,15 +478,16 @@ class GMasterCommon(object):
# cannot grab, it's taken
if not gconf.passive_earlier:
gconf.passive_earlier = True
- logging.info("Didn't get lock : %s : Becoming PASSIVE"
- % gconf.local_path)
+ logging.info(lf("Didn't get lock Becoming PASSIVE",
+ brick=gconf.local_path))
gconf.mgmt_lock_fd = fd
return False
raise
if not gconf.active_earlier:
gconf.active_earlier = True
- logging.info("Got lock : %s : Becoming ACTIVE" % gconf.local_path)
+ logging.info(lf("Got lock Becoming ACTIVE",
+ brick=gconf.local_path))
return True
def should_crawl(self):
@@ -533,8 +535,8 @@ class GMasterCommon(object):
gconf.configinterface.set('volume_id', self.uuid)
if self.volinfo:
if self.volinfo['retval']:
- logging.warn("master cluster's info may not be valid %d" %
- self.volinfo['retval'])
+ logging.warn(lf("master cluster's info may not be valid",
+ error=self.volinfo['retval']))
else:
raise GsyncdError("master volinfo unavailable")
self.lastreport['time'] = time.time()
@@ -566,8 +568,10 @@ class GMasterCommon(object):
brick_stime = self.xtime('.', self.slave)
cluster_stime = self.master.server.aggregated.stime_mnt(
'.', '.'.join([str(self.uuid), str(gconf.slave_id)]))
- logging.debug("Cluster stime: %s | Brick stime: %s" %
- (repr(cluster_stime), repr(brick_stime)))
+ logging.debug(lf("Crawl info",
+ cluster_stime=cluster_stime,
+ brick_stime=brick_stime))
+
if not isinstance(cluster_stime, int):
if brick_stime < cluster_stime:
self.slave.server.set_stime(
@@ -773,8 +777,8 @@ class GMasterChangelogMixin(GMasterCommon):
st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
if not isinstance(st, int):
num_failures += 1
- logging.error('%s FAILED: %s' % (log_prefix,
- repr(failure)))
+ logging.error(lf('%s FAILED' % log_prefix,
+ data=failure))
if failure[0]['op'] == 'MKDIR':
raise GsyncdError("The above directory failed to sync."
" Please fix it to proceed further.")
@@ -826,8 +830,8 @@ class GMasterChangelogMixin(GMasterCommon):
if self.name == 'live_changelog' or \
self.name == 'history_changelog':
if boolify(gconf.is_hottier) and et == self.TYPE_ENTRY:
- logging.debug('skip ENTRY op: %s if hot tier brick'
- % (ec[self.POS_TYPE]))
+ logging.debug(lf('skip ENTRY op if hot tier brick',
+ op=ec[self.POS_TYPE]))
continue
# Data and Meta operations are decided while parsing
@@ -917,7 +921,8 @@ class GMasterChangelogMixin(GMasterCommon):
go = os.path.join(pfx, gfid)
st = lstat(go)
if isinstance(st, int):
- logging.debug('file %s got purged in the interim' % go)
+ logging.debug(lf('file got purged in the interim',
+ file=go))
continue
if ty == 'LINK':
@@ -930,7 +935,9 @@ class GMasterChangelogMixin(GMasterCommon):
entries.append(
edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
else:
- logging.warn('ignoring %s [op %s]' % (gfid, ty))
+ logging.warn(lf('ignoring op',
+ gfid=gfid,
+ type=ty))
elif et == self.TYPE_GFID:
# If self.unlinked_gfids is available, then that means it is
# retrying the changelog second time. Do not add the GFID's
@@ -962,7 +969,8 @@ class GMasterChangelogMixin(GMasterCommon):
(boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)):
datas.add(os.path.join(pfx, ec[0]))
else:
- logging.warn('got invalid changelog type: %s' % (et))
+ logging.warn(lf('got invalid fop type',
+ type=et))
logging.debug('entries: %s' % repr(entries))
# Increment counters for Status
@@ -1011,7 +1019,8 @@ class GMasterChangelogMixin(GMasterCommon):
else:
st = lstat(go[0])
if isinstance(st, int):
- logging.debug('file %s got purged in the interim' % go[0])
+ logging.debug(lf('file got purged in the interim',
+ file=go[0]))
continue
meta_entries.append(edct('META', go=go[0], stat=st))
if meta_entries:
@@ -1067,7 +1076,8 @@ class GMasterChangelogMixin(GMasterCommon):
self.a_syncdata(self.datas_in_batch)
else:
for change in changes:
- logging.debug('processing change %s' % change)
+ logging.debug(lf('processing change',
+ changelog=change))
self.process_change(change, done, retry)
if not retry:
# number of changelogs processed in the batch
@@ -1111,9 +1121,9 @@ class GMasterChangelogMixin(GMasterCommon):
retry = True
tries += 1
if tries == int(gconf.max_rsync_retries):
- logging.error('changelogs %s could not be processed '
- 'completely - moving on...' %
- ' '.join(map(os.path.basename, changes)))
+ logging.error(lf('changelogs could not be processed '
+ 'completely - moving on...',
+ files=map(os.path.basename, changes)))
# Reset data counter on failure
self.status.dec_value("data", self.files_in_batch)
@@ -1133,8 +1143,8 @@ class GMasterChangelogMixin(GMasterCommon):
# entry_ops() that failed... so we retry the _whole_ changelog
# again.
# TODO: remove entry retries when it's gets fixed.
- logging.warn('incomplete sync, retrying changelogs: %s' %
- ' '.join(map(os.path.basename, changes)))
+ logging.warn(lf('incomplete sync, retrying changelogs',
+ files=map(os.path.basename, changes)))
# Reset the Data counter before Retry
self.status.dec_value("data", self.files_in_batch)
@@ -1145,43 +1155,44 @@ class GMasterChangelogMixin(GMasterCommon):
# Log the Skipped Entry ops range if any
if self.skipped_entry_changelogs_first is not None and \
self.skipped_entry_changelogs_last is not None:
- logging.info("Skipping already processed entry "
- "ops from CHANGELOG.{0} to CHANGELOG.{1} "
- "Num: {2}".format(
- self.skipped_entry_changelogs_first,
- self.skipped_entry_changelogs_last,
- self.num_skipped_entry_changelogs))
+ logging.info(lf("Skipping already processed entry ops",
+ from_changelog=self.skipped_entry_changelogs_first,
+ to_changelog=self.skipped_entry_changelogs_last,
+ num_changelogs=self.num_skipped_entry_changelogs))
# Log Current batch details
if changes:
logging.info(
- "Entry Time Taken (UNL:{0} RMD:{1} CRE:{2} MKN:{3} "
- "MKD:{4} REN:{5} LIN:{6} SYM:{7}): {8:.4f} "
- "secs ".format (
- self.batch_stats["UNLINK"], self.batch_stats["RMDIR"],
- self.batch_stats["CREATE"], self.batch_stats["MKNOD"],
- self.batch_stats["MKDIR"], self.batch_stats["RENAME"],
- self.batch_stats["LINK"], self.batch_stats["SYMLINK"],
- self.batch_stats["ENTRY_SYNC_TIME"]))
+ lf("Entry Time Taken",
+ UNL=self.batch_stats["UNLINK"],
+ RMD=self.batch_stats["RMDIR"],
+ CRE=self.batch_stats["CREATE"],
+ MKN=self.batch_stats["MKNOD"],
+ MKD=self.batch_stats["MKDIR"],
+ REN=self.batch_stats["RENAME"],
+ LIN=self.batch_stats["LINK"],
+ SYM=self.batch_stats["SYMLINK"],
+ duration="%.4f" % self.batch_stats["ENTRY_SYNC_TIME"]))
+
logging.info(
- "Metadata Time Taken (SETA:{0}): {1:.4f} secs. "
- "Data Time Taken (SETX:{2} XATT:{3} DATA:{4}): "
- "{5:.4f} secs".format(
- self.batch_stats["SETATTR"],
- self.batch_stats["META_SYNC_TIME"],
- self.batch_stats["SETXATTR"], self.batch_stats["XATTROP"],
- self.batch_stats["DATA"],
- time.time() - self.batch_stats["DATA_START_TIME"]))
+ lf("Data/Metadata Time Taken",
+ SETA=self.batch_stats["SETATTR"],
+ meta_duration="%.4f" % self.batch_stats["META_SYNC_TIME"],
+ SETX=self.batch_stats["SETXATTR"],
+ XATT=self.batch_stats["XATTROP"],
+ DATA=self.batch_stats["DATA"],
+ data_duration="%.4f" % (
+ time.time() - self.batch_stats["DATA_START_TIME"])))
+
logging.info(
- "{0} mode completed in {1:.4f} seconds "
- "({2} - {3} Num: {4}) stime: {5}, entry_stime: {6}".format(
- self.name,
- time.time() - self.batch_start_time,
- changes[0].split("/")[-1],
- changes[-1].split("/")[-1],
- len(changes),
- repr(self.get_data_stime()),
- repr(self.get_entry_stime())))
+ lf("Batch Completed",
+ mode=self.name,
+ duration="%.4f" % (time.time() - self.batch_start_time),
+ changelog_start=changes[0].split(".")[-1],
+ changelog_end=changes[-1].split(".")[-1],
+ num_changelogs=len(changes),
+ stime=self.get_data_stime(),
+ entry_stime=self.get_entry_stime()))
def upd_entry_stime(self, stime):
self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY,
@@ -1231,7 +1242,8 @@ class GMasterChangelogMixin(GMasterCommon):
changelogs_batches[-1].append(c)
for batch in changelogs_batches:
- logging.debug('processing changes %s' % repr(batch))
+ logging.debug(lf('processing changes',
+ batch=batch))
self.process(batch)
def crawl(self):
@@ -1246,13 +1258,14 @@ class GMasterChangelogMixin(GMasterCommon):
changes = self.changelog_agent.getchanges()
if changes:
if data_stime:
- logging.info("slave's time: %s" % repr(data_stime))
+ logging.info(lf("slave's time",
+ stime=data_stime))
processed = [x for x in changes
if int(x.split('.')[-1]) < data_stime[0]]
for pr in processed:
- logging.info(
- 'skipping already processed change: %s...' %
- os.path.basename(pr))
+ logging.debug(
+ lf('skipping already processed change',
+ changelog=os.path.basename(pr)))
self.changelog_done_func(pr)
changes.remove(pr)
self.archive_and_purge_changelogs(processed)
@@ -1289,10 +1302,11 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
data_stime = self.get_data_stime()
end_time = int(time.time())
- logging.info('starting history crawl... turns: %s, stime: %s, '
- 'etime: %s, entry_stime: %s'
- % (self.history_turns, repr(data_stime),
- repr(end_time), self.get_entry_stime()))
+ logging.info(lf('starting history crawl',
+ turns=self.history_turns,
+ stime=data_stime,
+ etime=end_time,
+ entry_stime=self.get_entry_stime()))
if not data_stime or data_stime == URXTIME:
raise NoStimeAvailable()
@@ -1320,12 +1334,13 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
changes = self.changelog_agent.history_getchanges()
if changes:
if data_stime:
- logging.info("slave's time: %s" % repr(data_stime))
+ logging.info(lf("slave's time",
+ stime=data_stime))
processed = [x for x in changes
if int(x.split('.')[-1]) < data_stime[0]]
for pr in processed:
- logging.info('skipping already processed change: '
- '%s...' % os.path.basename(pr))
+ logging.debug(lf('skipping already processed change',
+ changelog=os.path.basename(pr)))
self.changelog_done_func(pr)
changes.remove(pr)
@@ -1333,10 +1348,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
history_turn_time = int(time.time()) - self.history_crawl_start_time
- logging.info('finished history crawl syncing, endtime: %s, '
- 'stime: %s, entry_stime: %s'
- % (actual_end, repr(self.get_data_stime()),
- self.get_entry_stime()))
+ logging.info(lf('finished history crawl',
+ endtime=actual_end,
+ stime=self.get_data_stime(),
+ entry_stime=self.get_entry_stime()))
# If TS returned from history_changelog is < register_time
# then FS crawl may be required, since history is only available
@@ -1376,7 +1391,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.stimes = []
self.sleep_interval = 60
self.tempdir = self.setup_working_dir()
- logging.info('Working dir: %s' % self.tempdir)
+ logging.info(lf('Working dir',
+ path=self.tempdir))
self.tempdir = os.path.join(self.tempdir, 'xsync')
self.processed_changelogs_dir = self.tempdir
self.name = "xsync"
@@ -1400,25 +1416,28 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.Xcrawl()
t = Thread(target=Xsyncer)
t.start()
- logging.info('starting hybrid crawl..., stime: %s'
- % repr(self.get_data_stime()))
+ logging.info(lf('starting hybrid crawl',
+ stime=self.get_data_stime()))
self.status.set_worker_crawl_status("Hybrid Crawl")
while True:
try:
item = self.comlist.pop(0)
if item[0] == 'finale':
- logging.info('finished hybrid crawl syncing, stime: %s'
- % repr(self.get_data_stime()))
+ logging.info(lf('finished hybrid crawl',
+ stime=self.get_data_stime()))
break
elif item[0] == 'xsync':
- logging.info('processing xsync changelog %s' % (item[1]))
+ logging.info(lf('processing xsync changelog',
+ path=item[1]))
self.process([item[1]], 0)
self.archive_and_purge_changelogs([item[1]])
elif item[0] == 'stime':
- logging.debug('setting slave time: %s' % repr(item[1]))
+ logging.debug(lf('setting slave time',
+ time=item[1]))
self.upd_stime(item[1][1], item[1][0])
else:
- logging.warn('unknown tuple in comlist (%s)' % repr(item))
+ logging.warn(lf('unknown tuple in comlist',
+ entry=item))
except IndexError:
time.sleep(1)
@@ -1496,8 +1515,9 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
xtr_root = self.xtime('.', self.slave)
if isinstance(xtr_root, int):
if xtr_root != ENOENT:
- logging.warn("slave cluster not returning the "
- "correct xtime for root (%d)" % xtr_root)
+ logging.warn(lf("slave cluster not returning the "
+ "correct xtime for root",
+ xtime=xtr_root))
xtr_root = self.minus_infinity
xtl = self.xtime(path)
if isinstance(xtl, int):
@@ -1505,8 +1525,10 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
xtr = self.xtime(path, self.slave)
if isinstance(xtr, int):
if xtr != ENOENT:
- logging.warn("slave cluster not returning the "
- "correct xtime for %s (%d)" % (path, xtr))
+ logging.warn(lf("slave cluster not returning the "
+ "correct xtime",
+ path=path,
+ xtime=xtr))
xtr = self.minus_infinity
xtr = max(xtr, xtr_root)
zero_zero = (0, 0)
@@ -1521,27 +1543,32 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
dem = self.master.server.entries(path)
pargfid = self.master.server.gfid(path)
if isinstance(pargfid, int):
- logging.warn('skipping directory %s' % (path))
+ logging.warn(lf('skipping directory',
+ path=path))
for e in dem:
bname = e
e = os.path.join(path, e)
xte = self.xtime(e)
if isinstance(xte, int):
- logging.warn("irregular xtime for %s: %s" %
- (e, errno.errorcode[xte]))
+ logging.warn(lf("irregular xtime",
+ path=e,
+ error=errno.errorcode[xte]))
continue
if not self.need_sync(e, xte, xtr):
continue
st = self.master.server.lstat(e)
if isinstance(st, int):
- logging.warn('%s got purged in the interim ...' % e)
+ logging.warn(lf('got purged in the interim',
+ path=e))
continue
if self.is_sticky(e, st.st_mode):
- logging.debug('ignoring sticky bit file %s' % e)
+ logging.debug(lf('ignoring sticky bit file',
+ path=e))
continue
gfid = self.master.server.gfid(e)
if isinstance(gfid, int):
- logging.warn('skipping entry %s..' % e)
+ logging.warn(lf('skipping entry',
+ path=e))
continue
mo = st.st_mode
self.counter += 1 if ((stat.S_ISDIR(mo) or
@@ -1704,14 +1731,12 @@ class Syncer(object):
pb.close()
start = time.time()
po = self.sync_engine(pb, self.log_err)
- logging.info("Sync Time Taken (Job:{0} "
- "Files:{1} ReturnCode:{2}): "
- "{3:.4f} secs".format(
- job_id,
- len(pb),
- po.returncode,
- time.time() - start
- ))
+ logging.info(lf("Sync Time Taken",
+ job=job_id,
+ num_files=len(pb),
+ return_code=po.returncode,
+ duration="%.4f" % (time.time() - start)))
+
if po.returncode == 0:
ret = (True, 0)
elif po.returncode in self.errnos_ok: