summaryrefslogtreecommitdiffstats
path: root/geo-replication
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication')
-rw-r--r--geo-replication/syncdaemon/gsyncd.py48
-rw-r--r--geo-replication/syncdaemon/gsyncdstatus.py22
-rw-r--r--geo-replication/syncdaemon/master.py241
-rw-r--r--geo-replication/syncdaemon/monitor.py61
-rw-r--r--geo-replication/syncdaemon/repce.py8
-rw-r--r--geo-replication/syncdaemon/resource.py92
-rw-r--r--geo-replication/syncdaemon/syncdutils.py27
7 files changed, 289 insertions, 210 deletions
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index 3ddcb7f5454..932e37d1124 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -39,7 +39,7 @@ from changelogagent import agent, Changelog
from gsyncdstatus import set_monitor_status, GeorepStatus, human_time_utc
from libcxattr import Xattr
import struct
-from syncdutils import get_master_and_slave_data_from_args
+from syncdutils import get_master_and_slave_data_from_args, lf
ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
@@ -127,24 +127,30 @@ def slave_vol_uuid_get(host, vol):
stdin=None, stdout=PIPE, stderr=PIPE)
vix, err = po.communicate()
if po.returncode != 0:
- logging.info("Volume info failed, unable to get "
- "volume uuid of %s present in %s,"
- "returning empty string: %s" %
- (vol, host, po.returncode))
+ logging.info(lf("Volume info failed, unable to get "
+ "volume uuid of slavevol, "
+ "returning empty string",
+ slavevol=vol,
+ slavehost=host,
+ error=po.returncode))
return ""
vi = XET.fromstring(vix)
if vi.find('opRet').text != '0':
- logging.info("Unable to get volume uuid of %s, "
- "present in %s returning empty string: %s" %
- (vol, host, vi.find('opErrstr').text))
+ logging.info(lf("Unable to get volume uuid of slavevol, "
+ "returning empty string",
+ slavevol=vol,
+ slavehost=host,
+ error=vi.find('opErrstr').text))
return ""
try:
voluuid = vi.find("volInfo/volumes/volume/id").text
except (ParseError, AttributeError, ValueError) as e:
- logging.info("Parsing failed to volume uuid of %s, "
- "present in %s returning empty string: %s" %
- (vol, host, e))
+ logging.info(lf("Parsing failed to volume uuid of slavevol, "
+ "returning empty string",
+ slavevol=vol,
+ slavehost=host,
+ error=e))
voluuid = ""
return voluuid
@@ -692,16 +698,18 @@ def main_i():
if confdata.op == 'set':
if confdata.opt == 'checkpoint':
- logging.info("Checkpoint Set: %s" % (
- human_time_utc(confdata.val)))
+ logging.info(lf("Checkpoint Set",
+ time=human_time_utc(confdata.val)))
else:
- logging.info("Config Set: %s = %s" % (
- confdata.opt, confdata.val))
+ logging.info(lf("Config Set",
+ config=confdata.opt,
+ value=confdata.val))
elif confdata.op == 'del':
if confdata.opt == 'checkpoint':
logging.info("Checkpoint Reset")
else:
- logging.info("Config Reset: %s" % confdata.opt)
+ logging.info(lf("Config Reset",
+ config=confdata.opt))
except IOError:
if sys.exc_info()[1].errno == ENOENT:
# directory of log path is not present,
@@ -722,7 +730,8 @@ def main_i():
try:
GLogger._gsyncd_loginit(log_file=gconf.log_file, label='monitor')
gconf.log_exit = False
- logging.info("Monitor Status: %s" % create)
+ logging.info(lf("Monitor Status Change",
+ status=create))
except IOError:
if sys.exc_info()[1].errno == ENOENT:
# If log dir not present
@@ -772,7 +781,8 @@ def main_i():
if be_agent:
os.setsid()
- logging.debug('rpc_fd: %s' % repr(gconf.rpc_fd))
+ logging.debug(lf("RPC FD",
+ rpc_fd=repr(gconf.rpc_fd)))
return agent(Changelog(), gconf.rpc_fd)
if be_monitor:
@@ -786,7 +796,7 @@ def main_i():
remote.connect_remote(go_daemon='done')
local.connect()
if ffd:
- logging.info ("Closing feedback fd, waking up the monitor")
+ logging.info("Closing feedback fd, waking up the monitor")
os.close(ffd)
local.service_loop(*[r for r in [remote] if r])
diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py
index dd363baf181..38ca92c73a9 100644
--- a/geo-replication/syncdaemon/gsyncdstatus.py
+++ b/geo-replication/syncdaemon/gsyncdstatus.py
@@ -20,7 +20,7 @@ from errno import EACCES, EAGAIN, ENOENT
import logging
from syncdutils import EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event
-from syncdutils import EVENT_GEOREP_CHECKPOINT_COMPLETED
+from syncdutils import EVENT_GEOREP_CHECKPOINT_COMPLETED, lf
DEFAULT_STATUS = "N/A"
MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped")
@@ -225,10 +225,10 @@ class GeorepStatus(object):
data["checkpoint_time"] = checkpoint_time
data["checkpoint_completion_time"] = curr_time
data["checkpoint_completed"] = "Yes"
- logging.info("Checkpoint completed. Checkpoint "
- "Time: %s, Completion Time: %s" % (
- human_time_utc(checkpoint_time),
- human_time_utc(curr_time)))
+ logging.info(lf("Checkpoint completed",
+ checkpoint_time=human_time_utc(
+ checkpoint_time),
+ completion_time=human_time_utc(curr_time)))
self.trigger_gf_event_checkpoint_completion(
checkpoint_time, curr_time)
@@ -238,11 +238,13 @@ class GeorepStatus(object):
def set_worker_status(self, status):
if self.set_field("worker_status", status):
- logging.info("Worker Status: %s" % status)
+ logging.info(lf("Worker Status Change",
+ status=status))
def set_worker_crawl_status(self, status):
if self.set_field("crawl_status", status):
- logging.info("Crawl Status: %s" % status)
+ logging.info(lf("Crawl Status Change",
+ status=status))
def set_slave_node(self, slave_node):
def merger(data):
@@ -269,12 +271,14 @@ class GeorepStatus(object):
def set_active(self):
if self.set_field("worker_status", "Active"):
- logging.info("Worker Status: Active")
+ logging.info(lf("Worker Status Change",
+ status="Active"))
self.send_event(EVENT_GEOREP_ACTIVE)
def set_passive(self):
if self.set_field("worker_status", "Passive"):
- logging.info("Worker Status: Passive")
+ logging.info(lf("Worker Status Change",
+ status="Passive"))
self.send_event(EVENT_GEOREP_PASSIVE)
def get_monitor_status(self):
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index aebaf31dcff..17ec550aafa 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -24,7 +24,7 @@ from datetime import datetime
from gconf import gconf
from syncdutils import Thread, GsyncdError, boolify, escape
from syncdutils import unescape, gauxpfx, md5hex, selfkill
-from syncdutils import lstat, errno_wrap, FreeObject
+from syncdutils import lstat, errno_wrap, FreeObject, lf
from syncdutils import NoStimeAvailable, PartialHistoryAvailable
URXTIME = (-1, 0)
@@ -54,8 +54,8 @@ def _volinfo_hook_relax_foreign(self):
fgn_vi = volinfo_sys[self.KFGN]
if fgn_vi:
expiry = fgn_vi['timeout'] - int(time.time()) + 1
- logging.info('foreign volume info found, waiting %d sec for expiry' %
- expiry)
+ logging.info(lf('foreign volume info found, waiting for expiry',
+ expiry=expiry))
time.sleep(expiry)
volinfo_sys = self.get_sys_volinfo()
return volinfo_sys
@@ -90,7 +90,8 @@ def gmaster_builder(excrawl=None):
modemixin = 'normal'
changemixin = 'xsync' if gconf.change_detector == 'xsync' \
else excrawl or gconf.change_detector
- logging.debug('setting up %s change detection mode' % changemixin)
+ logging.debug(lf('setting up change detection mode',
+ mode=changemixin))
modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
sendmarkmixin = boolify(
@@ -256,7 +257,7 @@ class TarSSHEngine(object):
"""
def a_syncdata(self, files):
- logging.debug('files: %s' % (files))
+ logging.debug(lf("Files", files=files))
for f in files:
pb = self.syncer.add(f)
@@ -264,7 +265,7 @@ class TarSSHEngine(object):
def regjob(se, xte, pb):
rv = pb.wait()
if rv[0]:
- logging.debug('synced ' + se)
+ logging.debug(lf('synced', file=se))
return True
else:
# stat check for file presence
@@ -290,16 +291,16 @@ class RsyncEngine(object):
"""Sync engine that uses rsync(1) for data transfers"""
def a_syncdata(self, files):
- logging.debug('files: %s' % (files))
+ logging.debug(lf("files", files=files))
for f in files:
- logging.debug('candidate for syncing %s' % f)
+ logging.debug(lf('candidate for syncing', file=f))
pb = self.syncer.add(f)
def regjob(se, xte, pb):
rv = pb.wait()
if rv[0]:
- logging.debug('synced ' + se)
+ logging.debug(lf('synced', file=se))
return True
else:
# stat to check if the file exist
@@ -431,16 +432,16 @@ class GMasterCommon(object):
fcntl.lockf(gconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
if not gconf.active_earlier:
gconf.active_earlier = True
- logging.info("Got lock : %s : Becoming ACTIVE"
- % gconf.local_path)
+ logging.info(lf("Got lock Becoming ACTIVE",
+ brick=gconf.local_path))
return True
except:
ex = sys.exc_info()[1]
if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
if not gconf.passive_earlier:
gconf.passive_earlier = True
- logging.info("Didn't get lock : %s : Becoming PASSIVE"
- % gconf.local_path)
+ logging.info(lf("Didn't get lock Becoming PASSIVE",
+ brick=gconf.local_path))
return False
raise
@@ -449,7 +450,7 @@ class GMasterCommon(object):
+ str(gconf.subvol_num) + ".lock"
mgmt_lock_dir = os.path.join(gconf.meta_volume_mnt, "geo-rep")
path = os.path.join(mgmt_lock_dir, bname)
- logging.debug("lock_file_path: %s" % path)
+ logging.debug(lf("lock file path", path=path))
try:
fd = os.open(path, os.O_CREAT | os.O_RDWR)
except OSError:
@@ -477,15 +478,16 @@ class GMasterCommon(object):
# cannot grab, it's taken
if not gconf.passive_earlier:
gconf.passive_earlier = True
- logging.info("Didn't get lock : %s : Becoming PASSIVE"
- % gconf.local_path)
+ logging.info(lf("Didn't get lock Becoming PASSIVE",
+ brick=gconf.local_path))
gconf.mgmt_lock_fd = fd
return False
raise
if not gconf.active_earlier:
gconf.active_earlier = True
- logging.info("Got lock : %s : Becoming ACTIVE" % gconf.local_path)
+ logging.info(lf("Got lock Becoming ACTIVE",
+ brick=gconf.local_path))
return True
def should_crawl(self):
@@ -533,8 +535,8 @@ class GMasterCommon(object):
gconf.configinterface.set('volume_id', self.uuid)
if self.volinfo:
if self.volinfo['retval']:
- logging.warn("master cluster's info may not be valid %d" %
- self.volinfo['retval'])
+ logging.warn(lf("master cluster's info may not be valid",
+ error=self.volinfo['retval']))
else:
raise GsyncdError("master volinfo unavailable")
self.lastreport['time'] = time.time()
@@ -566,8 +568,10 @@ class GMasterCommon(object):
brick_stime = self.xtime('.', self.slave)
cluster_stime = self.master.server.aggregated.stime_mnt(
'.', '.'.join([str(self.uuid), str(gconf.slave_id)]))
- logging.debug("Cluster stime: %s | Brick stime: %s" %
- (repr(cluster_stime), repr(brick_stime)))
+ logging.debug(lf("Crawl info",
+ cluster_stime=cluster_stime,
+ brick_stime=brick_stime))
+
if not isinstance(cluster_stime, int):
if brick_stime < cluster_stime:
self.slave.server.set_stime(
@@ -773,8 +777,8 @@ class GMasterChangelogMixin(GMasterCommon):
st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
if not isinstance(st, int):
num_failures += 1
- logging.error('%s FAILED: %s' % (log_prefix,
- repr(failure)))
+ logging.error(lf('%s FAILED' % log_prefix,
+ data=failure))
if failure[0]['op'] == 'MKDIR':
raise GsyncdError("The above directory failed to sync."
" Please fix it to proceed further.")
@@ -826,8 +830,8 @@ class GMasterChangelogMixin(GMasterCommon):
if self.name == 'live_changelog' or \
self.name == 'history_changelog':
if boolify(gconf.is_hottier) and et == self.TYPE_ENTRY:
- logging.debug('skip ENTRY op: %s if hot tier brick'
- % (ec[self.POS_TYPE]))
+ logging.debug(lf('skip ENTRY op if hot tier brick',
+ op=ec[self.POS_TYPE]))
continue
# Data and Meta operations are decided while parsing
@@ -917,7 +921,8 @@ class GMasterChangelogMixin(GMasterCommon):
go = os.path.join(pfx, gfid)
st = lstat(go)
if isinstance(st, int):
- logging.debug('file %s got purged in the interim' % go)
+ logging.debug(lf('file got purged in the interim',
+ file=go))
continue
if ty == 'LINK':
@@ -930,7 +935,9 @@ class GMasterChangelogMixin(GMasterCommon):
entries.append(
edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
else:
- logging.warn('ignoring %s [op %s]' % (gfid, ty))
+ logging.warn(lf('ignoring op',
+ gfid=gfid,
+ type=ty))
elif et == self.TYPE_GFID:
# If self.unlinked_gfids is available, then that means it is
# retrying the changelog second time. Do not add the GFID's
@@ -962,7 +969,8 @@ class GMasterChangelogMixin(GMasterCommon):
(boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)):
datas.add(os.path.join(pfx, ec[0]))
else:
- logging.warn('got invalid changelog type: %s' % (et))
+ logging.warn(lf('got invalid fop type',
+ type=et))
logging.debug('entries: %s' % repr(entries))
# Increment counters for Status
@@ -1011,7 +1019,8 @@ class GMasterChangelogMixin(GMasterCommon):
else:
st = lstat(go[0])
if isinstance(st, int):
- logging.debug('file %s got purged in the interim' % go[0])
+ logging.debug(lf('file got purged in the interim',
+ file=go[0]))
continue
meta_entries.append(edct('META', go=go[0], stat=st))
if meta_entries:
@@ -1067,7 +1076,8 @@ class GMasterChangelogMixin(GMasterCommon):
self.a_syncdata(self.datas_in_batch)
else:
for change in changes:
- logging.debug('processing change %s' % change)
+ logging.debug(lf('processing change',
+ changelog=change))
self.process_change(change, done, retry)
if not retry:
# number of changelogs processed in the batch
@@ -1111,9 +1121,9 @@ class GMasterChangelogMixin(GMasterCommon):
retry = True
tries += 1
if tries == int(gconf.max_rsync_retries):
- logging.error('changelogs %s could not be processed '
- 'completely - moving on...' %
- ' '.join(map(os.path.basename, changes)))
+ logging.error(lf('changelogs could not be processed '
+ 'completely - moving on...',
+ files=map(os.path.basename, changes)))
# Reset data counter on failure
self.status.dec_value("data", self.files_in_batch)
@@ -1133,8 +1143,8 @@ class GMasterChangelogMixin(GMasterCommon):
# entry_ops() that failed... so we retry the _whole_ changelog
# again.
# TODO: remove entry retries when it's gets fixed.
- logging.warn('incomplete sync, retrying changelogs: %s' %
- ' '.join(map(os.path.basename, changes)))
+ logging.warn(lf('incomplete sync, retrying changelogs',
+ files=map(os.path.basename, changes)))
# Reset the Data counter before Retry
self.status.dec_value("data", self.files_in_batch)
@@ -1145,43 +1155,44 @@ class GMasterChangelogMixin(GMasterCommon):
# Log the Skipped Entry ops range if any
if self.skipped_entry_changelogs_first is not None and \
self.skipped_entry_changelogs_last is not None:
- logging.info("Skipping already processed entry "
- "ops from CHANGELOG.{0} to CHANGELOG.{1} "
- "Num: {2}".format(
- self.skipped_entry_changelogs_first,
- self.skipped_entry_changelogs_last,
- self.num_skipped_entry_changelogs))
+ logging.info(lf("Skipping already processed entry ops",
+ from_changelog=self.skipped_entry_changelogs_first,
+ to_changelog=self.skipped_entry_changelogs_last,
+ num_changelogs=self.num_skipped_entry_changelogs))
# Log Current batch details
if changes:
logging.info(
- "Entry Time Taken (UNL:{0} RMD:{1} CRE:{2} MKN:{3} "
- "MKD:{4} REN:{5} LIN:{6} SYM:{7}): {8:.4f} "
- "secs ".format (
- self.batch_stats["UNLINK"], self.batch_stats["RMDIR"],
- self.batch_stats["CREATE"], self.batch_stats["MKNOD"],
- self.batch_stats["MKDIR"], self.batch_stats["RENAME"],
- self.batch_stats["LINK"], self.batch_stats["SYMLINK"],
- self.batch_stats["ENTRY_SYNC_TIME"]))
+ lf("Entry Time Taken",
+ UNL=self.batch_stats["UNLINK"],
+ RMD=self.batch_stats["RMDIR"],
+ CRE=self.batch_stats["CREATE"],
+ MKN=self.batch_stats["MKNOD"],
+ MKD=self.batch_stats["MKDIR"],
+ REN=self.batch_stats["RENAME"],
+ LIN=self.batch_stats["LINK"],
+ SYM=self.batch_stats["SYMLINK"],
+ duration="%.4f" % self.batch_stats["ENTRY_SYNC_TIME"]))
+
logging.info(
- "Metadata Time Taken (SETA:{0}): {1:.4f} secs. "
- "Data Time Taken (SETX:{2} XATT:{3} DATA:{4}): "
- "{5:.4f} secs".format(
- self.batch_stats["SETATTR"],
- self.batch_stats["META_SYNC_TIME"],
- self.batch_stats["SETXATTR"], self.batch_stats["XATTROP"],
- self.batch_stats["DATA"],
- time.time() - self.batch_stats["DATA_START_TIME"]))
+ lf("Data/Metadata Time Taken",
+ SETA=self.batch_stats["SETATTR"],
+ meta_duration="%.4f" % self.batch_stats["META_SYNC_TIME"],
+ SETX=self.batch_stats["SETXATTR"],
+ XATT=self.batch_stats["XATTROP"],
+ DATA=self.batch_stats["DATA"],
+ data_duration="%.4f" % (
+ time.time() - self.batch_stats["DATA_START_TIME"])))
+
logging.info(
- "{0} mode completed in {1:.4f} seconds "
- "({2} - {3} Num: {4}) stime: {5}, entry_stime: {6}".format(
- self.name,
- time.time() - self.batch_start_time,
- changes[0].split("/")[-1],
- changes[-1].split("/")[-1],
- len(changes),
- repr(self.get_data_stime()),
- repr(self.get_entry_stime())))
+ lf("Batch Completed",
+ mode=self.name,
+ duration="%.4f" % (time.time() - self.batch_start_time),
+ changelog_start=changes[0].split(".")[-1],
+ changelog_end=changes[-1].split(".")[-1],
+ num_changelogs=len(changes),
+ stime=self.get_data_stime(),
+ entry_stime=self.get_entry_stime()))
def upd_entry_stime(self, stime):
self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY,
@@ -1231,7 +1242,8 @@ class GMasterChangelogMixin(GMasterCommon):
changelogs_batches[-1].append(c)
for batch in changelogs_batches:
- logging.debug('processing changes %s' % repr(batch))
+ logging.debug(lf('processing changes',
+ batch=batch))
self.process(batch)
def crawl(self):
@@ -1246,13 +1258,14 @@ class GMasterChangelogMixin(GMasterCommon):
changes = self.changelog_agent.getchanges()
if changes:
if data_stime:
- logging.info("slave's time: %s" % repr(data_stime))
+ logging.info(lf("slave's time",
+ stime=data_stime))
processed = [x for x in changes
if int(x.split('.')[-1]) < data_stime[0]]
for pr in processed:
- logging.info(
- 'skipping already processed change: %s...' %
- os.path.basename(pr))
+ logging.debug(
+ lf('skipping already processed change',
+ changelog=os.path.basename(pr)))
self.changelog_done_func(pr)
changes.remove(pr)
self.archive_and_purge_changelogs(processed)
@@ -1289,10 +1302,11 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
data_stime = self.get_data_stime()
end_time = int(time.time())
- logging.info('starting history crawl... turns: %s, stime: %s, '
- 'etime: %s, entry_stime: %s'
- % (self.history_turns, repr(data_stime),
- repr(end_time), self.get_entry_stime()))
+ logging.info(lf('starting history crawl',
+ turns=self.history_turns,
+ stime=data_stime,
+ etime=end_time,
+ entry_stime=self.get_entry_stime()))
if not data_stime or data_stime == URXTIME:
raise NoStimeAvailable()
@@ -1320,12 +1334,13 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
changes = self.changelog_agent.history_getchanges()
if changes:
if data_stime:
- logging.info("slave's time: %s" % repr(data_stime))
+ logging.info(lf("slave's time",
+ stime=data_stime))
processed = [x for x in changes
if int(x.split('.')[-1]) < data_stime[0]]
for pr in processed:
- logging.info('skipping already processed change: '
- '%s...' % os.path.basename(pr))
+ logging.debug(lf('skipping already processed change',
+ changelog=os.path.basename(pr)))
self.changelog_done_func(pr)
changes.remove(pr)
@@ -1333,10 +1348,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
history_turn_time = int(time.time()) - self.history_crawl_start_time
- logging.info('finished history crawl syncing, endtime: %s, '
- 'stime: %s, entry_stime: %s'
- % (actual_end, repr(self.get_data_stime()),
- self.get_entry_stime()))
+ logging.info(lf('finished history crawl',
+ endtime=actual_end,
+ stime=self.get_data_stime(),
+ entry_stime=self.get_entry_stime()))
# If TS returned from history_changelog is < register_time
# then FS crawl may be required, since history is only available
@@ -1376,7 +1391,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.stimes = []
self.sleep_interval = 60
self.tempdir = self.setup_working_dir()
- logging.info('Working dir: %s' % self.tempdir)
+ logging.info(lf('Working dir',
+ path=self.tempdir))
self.tempdir = os.path.join(self.tempdir, 'xsync')
self.processed_changelogs_dir = self.tempdir
self.name = "xsync"
@@ -1400,25 +1416,28 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.Xcrawl()
t = Thread(target=Xsyncer)
t.start()
- logging.info('starting hybrid crawl..., stime: %s'
- % repr(self.get_data_stime()))
+ logging.info(lf('starting hybrid crawl',
+ stime=self.get_data_stime()))
self.status.set_worker_crawl_status("Hybrid Crawl")
while True:
try:
item = self.comlist.pop(0)
if item[0] == 'finale':
- logging.info('finished hybrid crawl syncing, stime: %s'
- % repr(self.get_data_stime()))
+ logging.info(lf('finished hybrid crawl',
+ stime=self.get_data_stime()))
break
elif item[0] == 'xsync':
- logging.info('processing xsync changelog %s' % (item[1]))
+ logging.info(lf('processing xsync changelog',
+ path=item[1]))
self.process([item[1]], 0)
self.archive_and_purge_changelogs([item[1]])
elif item[0] == 'stime':
- logging.debug('setting slave time: %s' % repr(item[1]))
+ logging.debug(lf('setting slave time',
+ time=item[1]))
self.upd_stime(item[1][1], item[1][0])
else:
- logging.warn('unknown tuple in comlist (%s)' % repr(item))
+ logging.warn(lf('unknown tuple in comlist',
+ entry=item))
except IndexError:
time.sleep(1)
@@ -1496,8 +1515,9 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
xtr_root = self.xtime('.', self.slave)
if isinstance(xtr_root, int):
if xtr_root != ENOENT:
- logging.warn("slave cluster not returning the "
- "correct xtime for root (%d)" % xtr_root)
+ logging.warn(lf("slave cluster not returning the "
+ "correct xtime for root",
+ xtime=xtr_root))
xtr_root = self.minus_infinity
xtl = self.xtime(path)
if isinstance(xtl, int):
@@ -1505,8 +1525,10 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
xtr = self.xtime(path, self.slave)
if isinstance(xtr, int):
if xtr != ENOENT:
- logging.warn("slave cluster not returning the "
- "correct xtime for %s (%d)" % (path, xtr))
+ logging.warn(lf("slave cluster not returning the "
+ "correct xtime",
+ path=path,
+ xtime=xtr))
xtr = self.minus_infinity
xtr = max(xtr, xtr_root)
zero_zero = (0, 0)
@@ -1521,27 +1543,32 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
dem = self.master.server.entries(path)
pargfid = self.master.server.gfid(path)
if isinstance(pargfid, int):
- logging.warn('skipping directory %s' % (path))
+ logging.warn(lf('skipping directory',
+ path=path))
for e in dem:
bname = e
e = os.path.join(path, e)
xte = self.xtime(e)
if isinstance(xte, int):
- logging.warn("irregular xtime for %s: %s" %
- (e, errno.errorcode[xte]))
+ logging.warn(lf("irregular xtime",
+ path=e,
+ error=errno.errorcode[xte]))
continue
if not self.need_sync(e, xte, xtr):
continue
st = self.master.server.lstat(e)
if isinstance(st, int):
- logging.warn('%s got purged in the interim ...' % e)
+ logging.warn(lf('got purged in the interim',
+ path=e))
continue
if self.is_sticky(e, st.st_mode):
- logging.debug('ignoring sticky bit file %s' % e)
+ logging.debug(lf('ignoring sticky bit file',
+ path=e))
continue
gfid = self.master.server.gfid(e)
if isinstance(gfid, int):
- logging.warn('skipping entry %s..' % e)
+ logging.warn(lf('skipping entry',
+ path=e))
continue
mo = st.st_mode
self.counter += 1 if ((stat.S_ISDIR(mo) or
@@ -1704,14 +1731,12 @@ class Syncer(object):
pb.close()
start = time.time()
po = self.sync_engine(pb, self.log_err)
- logging.info("Sync Time Taken (Job:{0} "
- "Files:{1} ReturnCode:{2}): "
- "{3:.4f} secs".format(
- job_id,
- len(pb),
- po.returncode,
- time.time() - start
- ))
+ logging.info(lf("Sync Time Taken",
+ job=job_id,
+ num_files=len(pb),
+ return_code=po.returncode,
+ duration="%.4f" % (time.time() - start)))
+
if po.returncode == 0:
ret = (True, 0)
elif po.returncode in self.errnos_ok:
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index c54c07d600c..b65f1948050 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -22,7 +22,7 @@ from errno import ECHILD, ESRCH
import re
import random
from gconf import gconf
-from syncdutils import select, waitpid, errno_wrap
+from syncdutils import select, waitpid, errno_wrap, lf
from syncdutils import set_term_handler, is_host_local, GsyncdError
from syncdutils import escape, Thread, finalize, memoize
from syncdutils import gf_event, EVENT_GEOREP_FAULTY
@@ -63,15 +63,17 @@ def get_slave_bricks_status(host, vol):
po.wait()
po.terminate_geterr(fail_on_err=False)
if po.returncode != 0:
- logging.info("Volume status command failed, unable to get "
- "list of up nodes of %s, returning empty list: %s" %
- (vol, po.returncode))
+ logging.info(lf("Volume status command failed, unable to get "
+ "list of up nodes, returning empty list",
+ volume=vol,
+ error=po.returncode))
return []
vi = XET.fromstring(vix)
if vi.find('opRet').text != '0':
- logging.info("Unable to get list of up nodes of %s, "
- "returning empty list: %s" %
- (vol, vi.find('opErrstr').text))
+ logging.info(lf("Unable to get list of up nodes, "
+ "returning empty list",
+ volume=vol,
+ error=vi.find('opErrstr').text))
return []
up_hosts = set()
@@ -81,8 +83,10 @@ def get_slave_bricks_status(host, vol):
if el.find('status').text == '1':
up_hosts.add(el.find('hostname').text)
except (ParseError, AttributeError, ValueError) as e:
- logging.info("Parsing failed to get list of up nodes of %s, "
- "returning empty list: %s" % (vol, e))
+ logging.info(lf("Parsing failed to get list of up nodes, "
+ "returning empty list",
+ volume=vol,
+ error=e))
return list(up_hosts)
@@ -271,8 +275,9 @@ class Monitor(object):
# Spawn the worker and agent in lock to avoid fd leak
self.lock.acquire()
- logging.info('starting gsyncd worker(%s). Slave node: %s' %
- (w[0]['dir'], remote_host))
+ logging.info(lf('starting gsyncd worker',
+ brick=w[0]['dir'],
+ slave_node=remote_host))
# Couple of pipe pairs for RPC communication b/w
# worker and changelog agent.
@@ -336,15 +341,16 @@ class Monitor(object):
if ret_agent is not None:
# Agent is died Kill Worker
- logging.info("Changelog Agent died, "
- "Aborting Worker(%s)" % w[0]['dir'])
+ logging.info(lf("Changelog Agent died, Aborting Worker",
+ brick=w[0]['dir']))
errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
nwait(cpid)
nwait(apid)
if ret is not None:
- logging.info("worker(%s) died before establishing "
- "connection" % w[0]['dir'])
+ logging.info(lf("worker died before establishing "
+ "connection",
+ brick=w[0]['dir']))
nwait(apid) # wait for agent
else:
logging.debug("worker(%s) connected" % w[0]['dir'])
@@ -353,15 +359,16 @@ class Monitor(object):
ret_agent = nwait(apid, os.WNOHANG)
if ret is not None:
- logging.info("worker(%s) died in startup "
- "phase" % w[0]['dir'])
+ logging.info(lf("worker died in startup phase",
+ brick=w[0]['dir']))
nwait(apid) # wait for agent
break
if ret_agent is not None:
# Agent is died Kill Worker
- logging.info("Changelog Agent died, Aborting "
- "Worker(%s)" % w[0]['dir'])
+ logging.info(lf("Changelog Agent died, Aborting "
+ "Worker",
+ brick=w[0]['dir']))
errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
nwait(cpid)
nwait(apid)
@@ -369,13 +376,15 @@ class Monitor(object):
time.sleep(1)
else:
- logging.info("worker(%s) not confirmed in %d sec, aborting it. "
- "Gsyncd invocation on remote slave via SSH or "
- "gluster master mount might have hung. Please "
- "check the above logs for exact issue and check "
- "master or slave volume for errors. Restarting "
- "master/slave volume accordingly might help."
- % (w[0]['dir'], conn_timeout))
+ logging.info(
+ lf("Worker not confirmed after wait, aborting it. "
+ "Gsyncd invocation on remote slave via SSH or "
+ "gluster master mount might have hung. Please "
+ "check the above logs for exact issue and check "
+ "master or slave volume for errors. Restarting "
+ "master/slave volume accordingly might help.",
+ brick=w[0]['dir'],
+ timeout=conn_timeout))
errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
nwait(apid) # wait for agent
ret = nwait(cpid)
diff --git a/geo-replication/syncdaemon/repce.py b/geo-replication/syncdaemon/repce.py
index d7b17dda796..0ac144930db 100644
--- a/geo-replication/syncdaemon/repce.py
+++ b/geo-replication/syncdaemon/repce.py
@@ -29,7 +29,7 @@ except ImportError:
# py 3
import pickle
-from syncdutils import Thread, select
+from syncdutils import Thread, select, lf
pickle_proto = -1
repce_version = 1.0
@@ -203,8 +203,10 @@ class RepceClient(object):
meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
exc, res = rjob.wait()
if exc:
- logging.error('call %s (%s) failed on peer with %s' %
- (repr(rjob), meth, str(type(res).__name__)))
+ logging.error(lf('call failed on peer',
+ call=repr(rjob),
+ method=meth,
+ error=str(type(res).__name__)))
raise res
logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res)))
return res
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 5d7234358fb..37f6e1cabc1 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -41,7 +41,7 @@ from syncdutils import get_changelog_log_level, get_rsync_version
from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
from gsyncdstatus import GeorepStatus
from syncdutils import get_master_and_slave_data_from_args
-from syncdutils import mntpt_list
+from syncdutils import mntpt_list, lf
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
HostRX = re.compile('[a-zA-Z\d](?:[a-zA-Z\d.-]*[a-zA-Z\d])?', re.I)
@@ -228,11 +228,9 @@ class Popen(subprocess.Popen):
def errlog(self):
"""make a log about child's failure event"""
- filling = ""
- if self.elines:
- filling = ", saying:"
- logging.error("""command "%s" returned with %s%s""" %
- (" ".join(self.args), repr(self.returncode), filling))
+ logging.error(lf("command returned error",
+ cmd=" ".join(self.args),
+ error=self.returncode))
lp = ''
def logerr(l):
@@ -725,11 +723,12 @@ class Server(object):
def rename_with_disk_gfid_confirmation(gfid, entry, en):
if not matching_disk_gfid(gfid, entry):
- logging.error("RENAME ignored: "
- "source entry:%s(gfid:%s) does not match with "
- "on-disk gfid(%s), when attempting to rename "
- "to %s" %
- (entry, gfid, cls.gfid_mnt(entry), en))
+ logging.error(lf("RENAME ignored: source entry does not match "
+ "with on-disk gfid",
+ source=entry,
+ gfid=gfid,
+ disk_gfid=cls.gfid_mnt(entry),
+ target=en))
return
cmd_ret = errno_wrap(os.rename,
@@ -769,12 +768,17 @@ class Server(object):
logging.debug("Removed %s => %s/%s recursively" %
(gfid, pg, bname))
else:
- logging.warn("Recursive remove %s => %s/%s"
- "failed: %s" % (gfid, pg, bname,
- os.strerror(er1)))
+ logging.warn(lf("Recursive remove failed",
+ gfid=gfid,
+ pgfid=pg,
+ bname=bname,
+ error=os.strerror(er1)))
else:
- logging.warn("Failed to remove %s => %s/%s. %s" %
- (gfid, pg, bname, os.strerror(er)))
+ logging.warn(lf("Failed to remove",
+ gfid=gfid,
+ pgfid=pg,
+ bname=bname,
+ error=os.strerror(er)))
elif op in ['CREATE', 'MKNOD']:
slink = os.path.join(pfx, gfid)
st = lstat(slink)
@@ -833,10 +837,11 @@ class Server(object):
except OSError as e:
if e.errno == ENOTEMPTY:
logging.error(
- "Unable to delete directory "
- "{0}, Both Old({1}) and New{2}"
- " directories exists".format(
- entry, entry, en))
+ lf("Unable to delete directory"
+ ", Both Old and New"
+ " directories exists",
+ old=entry,
+ new=en))
else:
raise
else:
@@ -1011,8 +1016,8 @@ class SlaveLocal(object):
time.sleep(int(gconf.timeout))
if lp == self.server.last_keep_alive:
logging.info(
- "connection inactive for %d seconds, stopping" %
- int(gconf.timeout))
+ lf("connection inactive, stopping",
+ timeout=int(gconf.timeout)))
break
else:
select((), (), ())
@@ -1114,7 +1119,9 @@ class SlaveRemote(object):
if kw.get("log_err", False):
for errline in stderr.strip().split("\n")[:-1]:
- logging.error("SYNC Error(Rsync): %s" % errline)
+ logging.error(lf("SYNC Error",
+ sync_engine="Rsync",
+ error=errline))
if log_rsync_performance:
rsync_msg = []
@@ -1129,7 +1136,8 @@ class SlaveRemote(object):
line.startswith("Total bytes received:") or \
line.startswith("sent "):
rsync_msg.append(line)
- logging.info("rsync performance: %s" % ", ".join(rsync_msg))
+ logging.info(lf("rsync performance",
+ data=", ".join(rsync_msg)))
return po
@@ -1169,7 +1177,9 @@ class SlaveRemote(object):
if log_err:
for errline in stderr1.strip().split("\n")[:-1]:
- logging.error("SYNC Error(Untar): %s" % errline)
+ logging.error(lf("SYNC Error",
+ sync_engine="Tarssh",
+ error=errline))
return p1
@@ -1389,7 +1399,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
if rv:
rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \
(os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0)
- logging.warn('stale mount possibly left behind on ' + d)
+ logging.warn(lf('stale mount possibly left behind',
+ path=d))
raise GsyncdError("cleaning up temp mountpoint %s "
"failed with status %d" %
(d, rv))
@@ -1478,7 +1489,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
# if cli terminated with error due to being
# refused by glusterd, what it put
# out on stdout is a diagnostic message
- logging.error('glusterd answered: %s' % self.mntpt)
+ logging.error(lf('glusterd answered', mnt=self.mntpt))
def connect(self):
"""inhibit the resource beyond
@@ -1488,7 +1499,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
with given backend
"""
- logging.info ("Mounting gluster volume locally...")
+ logging.info("Mounting gluster volume locally...")
t0 = time.time()
label = getattr(gconf, 'mountbroker', None)
if not label and not privileged():
@@ -1500,8 +1511,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
['log-file=' + gconf.gluster_log_file, 'volfile-server=' +
self.host, 'volfile-id=' + self.volume, 'client-pid=-1']
mounter(params).inhibit(*[l for l in [label] if l])
- logging.info ("Mounted gluster volume. Time taken: {0:.4f} "
- "secs".format((time.time() - t0)))
+ logging.info(lf("Mounted gluster volume",
+ duration="%.4f" % (time.time() - t0)))
def connect_remote(self, *a, **kw):
sup(self, *a, **kw)
@@ -1643,11 +1654,12 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
g2.register(register_time, changelog_agent, status)
g3.register(register_time, changelog_agent, status)
except ChangelogException as e:
- logging.error("Changelog register failed, %s" % e)
+ logging.error(lf("Changelog register failed", error=e))
sys.exit(1)
g1.register(status=status)
- logging.info("Register time: %s" % register_time)
+ logging.info(lf("Register time",
+ time=register_time))
# oneshot: Try to use changelog history api, if not
# available switch to FS crawl
# Note: if config.change_detector is xsync then
@@ -1655,8 +1667,9 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
try:
g3.crawlwrap(oneshot=True)
except PartialHistoryAvailable as e:
- logging.info('Partial history available, using xsync crawl'
- ' after consuming history till %s' % str(e))
+ logging.info(lf('Partial history available, using xsync crawl'
+ ' after consuming history',
+ till=e))
g1.crawlwrap(oneshot=True, register_time=register_time)
except ChangelogHistoryNotAvailable:
logging.info('Changelog history not available, using xsync')
@@ -1665,13 +1678,14 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
logging.info('No stime available, using xsync crawl')
g1.crawlwrap(oneshot=True, register_time=register_time)
except ChangelogException as e:
- logging.error("Changelog History Crawl failed, %s" % e)
+ logging.error(lf("Changelog History Crawl failed",
+ error=e))
sys.exit(1)
try:
g2.crawlwrap()
except ChangelogException as e:
- logging.error("Changelog crawl failed, %s" % e)
+ logging.error(lf("Changelog crawl failed", error=e))
sys.exit(1)
else:
sup(self, *args)
@@ -1763,14 +1777,14 @@ class SSH(AbstractUrl, SlaveRemote):
self.inner_rsc.url)
deferred = go_daemon == 'postconn'
- logging.info ("Initializing SSH connection between master and slave...")
+ logging.info("Initializing SSH connection between master and slave...")
t0 = time.time()
ret = sup(self, gconf.ssh_command.split() +
["-p", str(gconf.ssh_port)] +
gconf.ssh_ctl_args + [self.remote_addr],
slave=self.inner_rsc.url, deferred=deferred)
- logging.info ("SSH connection between master and slave established. "
- "Time taken: {0:.4f} secs".format((time.time() - t0)))
+ logging.info(lf("SSH connection between master and slave established.",
+ duration="%.4f" % (time.time() - t0)))
if deferred:
# send a message to peer so that we can wait for
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 321e0d32ccc..b5f09459c57 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -304,8 +304,8 @@ def log_raise_exception(excont):
gconf.transport.terminate_geterr()
elif isinstance(exc, OSError) and exc.errno in (ENOTCONN,
ECONNABORTED):
- logging.error('glusterfs session went down [%s]',
- errorcode[exc.errno])
+ logging.error(lf('glusterfs session went down',
+ error=errorcode[exc.errno]))
else:
logtag = "FAIL"
if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG):
@@ -387,8 +387,9 @@ def boolify(s):
if lstr in true_list:
rv = True
elif not lstr in false_list:
- logging.warn("Unknown string (%s) in string to boolean conversion "
- "defaulting to False\n" % (s))
+ logging.warn(lf("Unknown string in \"string to boolean\" conversion, "
+ "defaulting to False",
+ str=s))
return rv
@@ -497,8 +498,9 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]):
nr_tries += 1
if nr_tries == GF_OP_RETRIES:
# probably a screwed state, cannot do much...
- logging.warn('reached maximum retries (%s)...%s' %
- (repr(arg), ex))
+ logging.warn(lf('reached maximum retries',
+ args=repr(arg),
+ error=ex))
raise
time.sleep(0.250) # retry the call
@@ -572,3 +574,16 @@ def get_rsync_version(rsync_cmd):
rsync_version = out.split(" ", 4)[3]
return rsync_version
+
+
+def lf(event, **kwargs):
+ """
+ Log Format helper function, log messages can be
+ easily modified to structured log format.
+ lf("Config Change", sync_jobs=4, brick=/bricks/b1) will be
+ converted as "Config Change<TAB>brick=/bricks/b1<TAB>sync_jobs=4"
+ """
+ msg = event
+ for k, v in kwargs.items():
+ msg += "\t{0}={1}".format(k, v)
+ return msg