summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/master.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--geo-replication/syncdaemon/master.py748
1 files changed, 516 insertions, 232 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index aebaf31dcff..9501aeae6b5 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -12,20 +12,23 @@ import os
import sys
import time
import stat
-import json
import logging
import fcntl
import string
import errno
import tarfile
-from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN, ESTALE
+from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN, ESTALE, EINTR
from threading import Condition, Lock
from datetime import datetime
-from gconf import gconf
-from syncdutils import Thread, GsyncdError, boolify, escape
-from syncdutils import unescape, gauxpfx, md5hex, selfkill
-from syncdutils import lstat, errno_wrap, FreeObject
-from syncdutils import NoStimeAvailable, PartialHistoryAvailable
+
+import gsyncdconfig as gconf
+import libgfchangelog
+from rconf import rconf
+from syncdutils import (Thread, GsyncdError, escape_space_newline,
+ unescape_space_newline, gauxpfx, escape,
+ lstat, errno_wrap, FreeObject, lf, matching_disk_gfid,
+ NoStimeAvailable, PartialHistoryAvailable,
+ host_brick_split)
URXTIME = (-1, 0)
@@ -54,8 +57,8 @@ def _volinfo_hook_relax_foreign(self):
fgn_vi = volinfo_sys[self.KFGN]
if fgn_vi:
expiry = fgn_vi['timeout'] - int(time.time()) + 1
- logging.info('foreign volume info found, waiting %d sec for expiry' %
- expiry)
+ logging.info(lf('foreign volume info found, waiting for expiry',
+ expiry=expiry))
time.sleep(expiry)
volinfo_sys = self.get_sys_volinfo()
return volinfo_sys
@@ -64,6 +67,9 @@ def _volinfo_hook_relax_foreign(self):
def edct(op, **ed):
dct = {}
dct['op'] = op
+ # This is used in automatic gfid conflict resolution.
+ # When marked True, it's skipped during re-processing.
+ dct['skip_entry'] = False
for k in ed:
if k == 'stat':
st = ed[k]
@@ -85,23 +91,41 @@ def gmaster_builder(excrawl=None):
"""produce the GMaster class variant corresponding
to sync mode"""
this = sys.modules[__name__]
- modemixin = gconf.special_sync_mode
+ modemixin = gconf.get("special-sync-mode")
if not modemixin:
modemixin = 'normal'
- changemixin = 'xsync' if gconf.change_detector == 'xsync' \
- else excrawl or gconf.change_detector
- logging.debug('setting up %s change detection mode' % changemixin)
+
+ if gconf.get("change-detector") == 'xsync':
+ changemixin = 'xsync'
+ elif excrawl:
+ changemixin = excrawl
+ else:
+ changemixin = gconf.get("change-detector")
+
+ logging.debug(lf('setting up change detection mode',
+ mode=changemixin))
modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
- sendmarkmixin = boolify(
- gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
- purgemixin = boolify(
- gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
- syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine
+
+ if gconf.get("use-rsync-xattrs"):
+ sendmarkmixin = SendmarkRsyncMixin
+ else:
+ sendmarkmixin = SendmarkNormalMixin
+
+ if gconf.get("ignore-deletes"):
+ purgemixin = PurgeNoopMixin
+ else:
+ purgemixin = PurgeNormalMixin
+
+ if gconf.get("sync-method") == "tarssh":
+ syncengine = TarSSHEngine
+ else:
+ syncengine = RsyncEngine
class _GMaster(crawlmixin, modemixin, sendmarkmixin,
purgemixin, syncengine):
pass
+
return _GMaster
@@ -138,9 +162,9 @@ class NormalMixin(object):
return xt0 >= xt1
def make_xtime_opts(self, is_master, opts):
- if not 'create' in opts:
+ if 'create' not in opts:
opts['create'] = is_master
- if not 'default_xtime' in opts:
+ if 'default_xtime' not in opts:
opts['default_xtime'] = URXTIME
def xtime_low(self, rsc, path, **opts):
@@ -173,7 +197,7 @@ class NormalMixin(object):
vi = vi.copy()
vi['timeout'] = int(time.time()) + timo
else:
- # send keep-alives more frequently to
+ # send keep-alive more frequently to
# avoid a delay in announcing our volume info
# to slave if it becomes established in the
# meantime
@@ -211,9 +235,9 @@ class RecoverMixin(NormalMixin):
@staticmethod
def make_xtime_opts(is_master, opts):
- if not 'create' in opts:
+ if 'create' not in opts:
opts['create'] = False
- if not 'default_xtime' in opts:
+ if 'default_xtime' not in opts:
opts['default_xtime'] = URXTIME
def keepalive_payload_hook(self, timo, gap):
@@ -256,7 +280,7 @@ class TarSSHEngine(object):
"""
def a_syncdata(self, files):
- logging.debug('files: %s' % (files))
+ logging.debug(lf("Files", files=files))
for f in files:
pb = self.syncer.add(f)
@@ -264,7 +288,7 @@ class TarSSHEngine(object):
def regjob(se, xte, pb):
rv = pb.wait()
if rv[0]:
- logging.debug('synced ' + se)
+ logging.debug(lf('synced', file=se))
return True
else:
# stat check for file presence
@@ -290,16 +314,16 @@ class RsyncEngine(object):
"""Sync engine that uses rsync(1) for data transfers"""
def a_syncdata(self, files):
- logging.debug('files: %s' % (files))
+ logging.debug(lf("files", files=files))
for f in files:
- logging.debug('candidate for syncing %s' % f)
+ logging.debug(lf('candidate for syncing', file=f))
pb = self.syncer.add(f)
def regjob(se, xte, pb):
rv = pb.wait()
if rv[0]:
- logging.debug('synced ' + se)
+ logging.debug(lf('synced', file=se))
return True
else:
# stat to check if the file exist
@@ -384,7 +408,7 @@ class GMasterCommon(object):
self.master = master
self.slave = slave
self.jobtab = {}
- if boolify(gconf.use_tarssh):
+ if gconf.get("sync-method") == "tarssh":
self.syncer = Syncer(slave, self.slave.tarssh, [2])
else:
# partial transfer (cf. rsync(1)), that's normal
@@ -400,7 +424,7 @@ class GMasterCommon(object):
# 0.
self.crawls = 0
self.turns = 0
- self.total_turns = int(gconf.turns)
+ self.total_turns = rconf.turns
self.crawl_start = datetime.now()
self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
self.start = None
@@ -413,7 +437,7 @@ class GMasterCommon(object):
def init_keep_alive(cls):
"""start the keep-alive thread """
- timo = int(gconf.timeout or 0)
+ timo = gconf.get("slave-timeout", 0)
if timo > 0:
def keep_alive():
while True:
@@ -426,30 +450,22 @@ class GMasterCommon(object):
def mgmt_lock(self):
"""Take management volume lock """
- if gconf.mgmt_lock_fd:
+ if rconf.mgmt_lock_fd:
try:
- fcntl.lockf(gconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
- if not gconf.active_earlier:
- gconf.active_earlier = True
- logging.info("Got lock : %s : Becoming ACTIVE"
- % gconf.local_path)
+ fcntl.lockf(rconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
return True
except:
ex = sys.exc_info()[1]
if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
- if not gconf.passive_earlier:
- gconf.passive_earlier = True
- logging.info("Didn't get lock : %s : Becoming PASSIVE"
- % gconf.local_path)
return False
raise
fd = None
- bname = str(self.uuid) + "_" + str(gconf.slave_id) + "_subvol_" \
- + str(gconf.subvol_num) + ".lock"
- mgmt_lock_dir = os.path.join(gconf.meta_volume_mnt, "geo-rep")
+ bname = str(self.uuid) + "_" + rconf.args.slave_id + "_subvol_" \
+ + str(rconf.args.subvol_num) + ".lock"
+ mgmt_lock_dir = os.path.join(gconf.get("meta-volume-mnt"), "geo-rep")
path = os.path.join(mgmt_lock_dir, bname)
- logging.debug("lock_file_path: %s" % path)
+ logging.debug(lf("lock file path", path=path))
try:
fd = os.open(path, os.O_CREAT | os.O_RDWR)
except OSError:
@@ -470,29 +486,22 @@ class GMasterCommon(object):
try:
fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
# Save latest FD for future use
- gconf.mgmt_lock_fd = fd
+ rconf.mgmt_lock_fd = fd
except:
ex = sys.exc_info()[1]
if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
# cannot grab, it's taken
- if not gconf.passive_earlier:
- gconf.passive_earlier = True
- logging.info("Didn't get lock : %s : Becoming PASSIVE"
- % gconf.local_path)
- gconf.mgmt_lock_fd = fd
+ rconf.mgmt_lock_fd = fd
return False
raise
- if not gconf.active_earlier:
- gconf.active_earlier = True
- logging.info("Got lock : %s : Becoming ACTIVE" % gconf.local_path)
return True
def should_crawl(self):
- if not boolify(gconf.use_meta_volume):
- return gconf.glusterd_uuid in self.master.server.node_uuid()
+ if not gconf.get("use-meta-volume"):
+ return rconf.args.local_node_id in self.master.server.node_uuid()
- if not os.path.ismount(gconf.meta_volume_mnt):
+ if not os.path.ismount(gconf.get("meta-volume-mnt")):
logging.error("Meta-volume is not mounted. Worker Exiting...")
sys.exit(1)
return self.mgmt_lock()
@@ -509,8 +518,8 @@ class GMasterCommon(object):
# If crawlwrap is called when partial history available,
# then it sets register_time which is the time when geo-rep
- # worker registerd to changelog consumption. Since nsec is
- # not considered in register time, their are chances of skipping
+ # worker registered to changelog consumption. Since nsec is
+ # not considered in register time, there are chances of skipping
# changes detection in xsync crawl. This limit will be reset when
# crawlwrap is called again.
self.live_changelog_start_time = None
@@ -520,7 +529,7 @@ class GMasterCommon(object):
# no need to maintain volinfo state machine.
# in a cascading setup, each geo-replication session is
# independent (ie. 'volume-mark' and 'xtime' are not
- # propogated). This is because the slave's xtime is now
+ # propagated). This is because the slave's xtime is now
# stored on the master itself. 'volume-mark' just identifies
# that we are in a cascading setup and need to enable
# 'geo-replication.ignore-pid-check' option.
@@ -530,11 +539,11 @@ class GMasterCommon(object):
logging.debug("%s master with volume id %s ..." %
(inter_master and "intermediate" or "primary",
self.uuid))
- gconf.configinterface.set('volume_id', self.uuid)
+ rconf.volume_id = self.uuid
if self.volinfo:
if self.volinfo['retval']:
- logging.warn("master cluster's info may not be valid %d" %
- self.volinfo['retval'])
+ logging.warn(lf("master cluster's info may not be valid",
+ error=self.volinfo['retval']))
else:
raise GsyncdError("master volinfo unavailable")
self.lastreport['time'] = time.time()
@@ -555,7 +564,7 @@ class GMasterCommon(object):
turns=self.turns,
time=self.start)
t1 = time.time()
- if int(t1 - t0) >= int(gconf.replica_failover_interval):
+ if int(t1 - t0) >= gconf.get("replica-failover-interval"):
crawl = self.should_crawl()
t0 = t1
self.update_worker_remote_node()
@@ -565,13 +574,16 @@ class GMasterCommon(object):
# which is min of cluster (but max of the replicas)
brick_stime = self.xtime('.', self.slave)
cluster_stime = self.master.server.aggregated.stime_mnt(
- '.', '.'.join([str(self.uuid), str(gconf.slave_id)]))
- logging.debug("Cluster stime: %s | Brick stime: %s" %
- (repr(cluster_stime), repr(brick_stime)))
+ '.', '.'.join([str(self.uuid), rconf.args.slave_id]))
+ logging.debug(lf("Crawl info",
+ cluster_stime=cluster_stime,
+ brick_stime=brick_stime))
+
if not isinstance(cluster_stime, int):
if brick_stime < cluster_stime:
self.slave.server.set_stime(
self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
+ self.upd_stime(cluster_stime)
# Purge all changelogs available in processing dir
# less than cluster_stime
proc_dir = os.path.join(self.tempdir,
@@ -670,6 +682,7 @@ class XCrawlMetadata(object):
self.st_atime = float(st_atime)
self.st_mtime = float(st_mtime)
+
class GMasterChangelogMixin(GMasterCommon):
""" changelog based change detection and syncing """
@@ -687,6 +700,9 @@ class GMasterChangelogMixin(GMasterCommon):
TYPE_GFID = "D "
TYPE_ENTRY = "E "
+ MAX_EF_RETRIES = 10
+ MAX_OE_RETRIES = 10
+
# flat directory hierarchy for gfid based access
FLAT_DIR_HIERARCHY = '.'
@@ -694,34 +710,34 @@ class GMasterChangelogMixin(GMasterCommon):
def init_fop_batch_stats(self):
self.batch_stats = {
- "CREATE":0,
- "MKNOD":0,
- "UNLINK":0,
- "MKDIR":0,
- "RMDIR":0,
- "LINK":0,
- "SYMLINK":0,
- "RENAME":0,
- "SETATTR":0,
- "SETXATTR":0,
- "XATTROP":0,
- "DATA":0,
- "ENTRY_SYNC_TIME":0,
- "META_SYNC_TIME":0,
- "DATA_START_TIME":0
+ "CREATE": 0,
+ "MKNOD": 0,
+ "UNLINK": 0,
+ "MKDIR": 0,
+ "RMDIR": 0,
+ "LINK": 0,
+ "SYMLINK": 0,
+ "RENAME": 0,
+ "SETATTR": 0,
+ "SETXATTR": 0,
+ "XATTROP": 0,
+ "DATA": 0,
+ "ENTRY_SYNC_TIME": 0,
+ "META_SYNC_TIME": 0,
+ "DATA_START_TIME": 0
}
def update_fop_batch_stats(self, ty):
if ty in ['FSETXATTR']:
- ty = 'SETXATTR'
- self.batch_stats[ty] = self.batch_stats.get(ty,0) + 1
+ ty = 'SETXATTR'
+ self.batch_stats[ty] = self.batch_stats.get(ty, 0) + 1
def archive_and_purge_changelogs(self, changelogs):
# Creates tar file instead of tar.gz, since changelogs will
# be appended to existing tar. archive name is
# archive_<YEAR><MONTH>.tar
archive_name = "archive_%s.tar" % datetime.today().strftime(
- gconf.changelog_archive_format)
+ gconf.get("changelog-archive-format"))
try:
tar = tarfile.open(os.path.join(self.processed_changelogs_dir,
@@ -757,13 +773,9 @@ class GMasterChangelogMixin(GMasterCommon):
else:
raise
- def fallback_xsync(self):
- logging.info('falling back to xsync mode')
- gconf.configinterface.set('change-detector', 'xsync')
- selfkill()
-
def setup_working_dir(self):
- workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path))
+ workdir = os.path.join(gconf.get("working-dir"),
+ escape(rconf.args.local_path))
logging.debug('changelog working dir %s' % workdir)
return workdir
@@ -773,14 +785,207 @@ class GMasterChangelogMixin(GMasterCommon):
st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
if not isinstance(st, int):
num_failures += 1
- logging.error('%s FAILED: %s' % (log_prefix,
- repr(failure)))
+ logging.error(lf('%s FAILED' % log_prefix,
+ data=failure))
if failure[0]['op'] == 'MKDIR':
raise GsyncdError("The above directory failed to sync."
" Please fix it to proceed further.")
self.status.inc_value("failures", num_failures)
+ def fix_possible_entry_failures(self, failures, retry_count, entries):
+ pfx = gauxpfx()
+ fix_entry_ops = []
+ failures1 = []
+ remove_gfids = set()
+ for failure in failures:
+ if failure[2]['name_mismatch']:
+ pbname = failure[2]['slave_entry']
+ elif failure[2]['dst']:
+ pbname = failure[0]['entry1']
+ else:
+ pbname = failure[0]['entry']
+
+ op = failure[0]['op']
+ # name exists but gfid is different
+ if failure[2]['gfid_mismatch'] or failure[2]['name_mismatch']:
+ slave_gfid = failure[2]['slave_gfid']
+ st = lstat(os.path.join(pfx, slave_gfid))
+ # Takes care of scenarios with no hardlinks
+ if isinstance(st, int) and st == ENOENT:
+ logging.debug(lf('Entry not present on master. Fixing gfid '
+ 'mismatch in slave. Deleting the entry',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ # Add deletion to fix_entry_ops list
+ if failure[2]['slave_isdir']:
+ fix_entry_ops.append(
+ edct('RMDIR',
+ gfid=failure[2]['slave_gfid'],
+ entry=pbname))
+ else:
+ fix_entry_ops.append(
+ edct('UNLINK',
+ gfid=failure[2]['slave_gfid'],
+ entry=pbname))
+ remove_gfids.add(slave_gfid)
+ if op in ['RENAME']:
+ # If renamed gfid doesn't exists on master, remove
+ # rename entry and unlink src on slave
+ st = lstat(os.path.join(pfx, failure[0]['gfid']))
+ if isinstance(st, int) and st == ENOENT:
+ logging.debug("Unlink source %s" % repr(failure))
+ remove_gfids.add(failure[0]['gfid'])
+ fix_entry_ops.append(
+ edct('UNLINK',
+ gfid=failure[0]['gfid'],
+ entry=failure[0]['entry']))
+ # Takes care of scenarios of hardlinks/renames on master
+ elif not isinstance(st, int):
+ if matching_disk_gfid(slave_gfid, pbname):
+ # Safe to ignore the failure as master contains same
+ # file with same gfid. Remove entry from entries list
+ logging.debug(lf('Fixing gfid mismatch in slave. '
+ ' Safe to ignore, take out entry',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ remove_gfids.add(failure[0]['gfid'])
+ if op == 'RENAME':
+ fix_entry_ops.append(
+ edct('UNLINK',
+ gfid=failure[0]['gfid'],
+ entry=failure[0]['entry']))
+ # The file exists on master but with different name.
+ # Probably renamed and got missed during xsync crawl.
+ elif failure[2]['slave_isdir']:
+ realpath = os.readlink(os.path.join(
+ rconf.args.local_path,
+ ".glusterfs",
+ slave_gfid[0:2],
+ slave_gfid[2:4],
+ slave_gfid))
+ dst_entry = os.path.join(pfx, realpath.split('/')[-2],
+ realpath.split('/')[-1])
+ src_entry = pbname
+ logging.debug(lf('Fixing dir name/gfid mismatch in '
+ 'slave', retry_count=retry_count,
+ entry=repr(failure)))
+ if src_entry == dst_entry:
+ # Safe to ignore the failure as master contains
+ # same directory as in slave with same gfid.
+ # Remove the failure entry from entries list
+ logging.debug(lf('Fixing dir name/gfid mismatch'
+ ' in slave. Safe to ignore, '
+ 'take out entry',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ try:
+ entries.remove(failure[0])
+ except ValueError:
+ pass
+ else:
+ rename_dict = edct('RENAME', gfid=slave_gfid,
+ entry=src_entry,
+ entry1=dst_entry, stat=st,
+ link=None)
+ logging.debug(lf('Fixing dir name/gfid mismatch'
+ ' in slave. Renaming',
+ retry_count=retry_count,
+ entry=repr(rename_dict)))
+ fix_entry_ops.append(rename_dict)
+ else:
+ # A hardlink file exists with different name or
+ # renamed file exists and we are sure from
+ # matching_disk_gfid check that the entry doesn't
+ # exist with same gfid so we can safely delete on slave
+ logging.debug(lf('Fixing file gfid mismatch in slave. '
+ 'Hardlink/Rename Case. Deleting entry',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ fix_entry_ops.append(
+ edct('UNLINK',
+ gfid=failure[2]['slave_gfid'],
+ entry=pbname))
+ elif failure[1] == ENOENT:
+ if op in ['RENAME']:
+ pbname = failure[0]['entry1']
+ else:
+ pbname = failure[0]['entry']
+
+ pargfid = pbname.split('/')[1]
+ st = lstat(os.path.join(pfx, pargfid))
+ # Safe to ignore the failure as master doesn't contain
+ # parent directory.
+ if isinstance(st, int):
+ logging.debug(lf('Fixing ENOENT error in slave. Parent '
+ 'does not exist on master. Safe to '
+ 'ignore, take out entry',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ try:
+ entries.remove(failure[0])
+ except ValueError:
+ pass
+ else:
+ logging.debug(lf('Fixing ENOENT error in slave. Create '
+ 'parent directory on slave.',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ realpath = os.readlink(os.path.join(rconf.args.local_path,
+ ".glusterfs",
+ pargfid[0:2],
+ pargfid[2:4],
+ pargfid))
+ dir_entry = os.path.join(pfx, realpath.split('/')[-2],
+ realpath.split('/')[-1])
+ fix_entry_ops.append(
+ edct('MKDIR', gfid=pargfid, entry=dir_entry,
+ mode=st.st_mode, uid=st.st_uid, gid=st.st_gid))
+
+ logging.debug("remove_gfids: %s" % repr(remove_gfids))
+ if remove_gfids:
+ for e in entries:
+ if e['op'] in ['MKDIR', 'MKNOD', 'CREATE', 'RENAME'] \
+ and e['gfid'] in remove_gfids:
+ logging.debug("Removed entry op from retrial list: entry: %s" % repr(e))
+ e['skip_entry'] = True
+
+ if fix_entry_ops:
+ # Process deletions of entries whose gfids are mismatched
+ failures1 = self.slave.server.entry_ops(fix_entry_ops)
+
+ return (failures1, fix_entry_ops)
+
+ def handle_entry_failures(self, failures, entries):
+ retries = 0
+ pending_failures = False
+ failures1 = []
+ failures2 = []
+ entry_ops1 = []
+ entry_ops2 = []
+
+ if failures:
+ pending_failures = True
+ failures1 = failures
+ entry_ops1 = entries
+
+ while pending_failures and retries < self.MAX_EF_RETRIES:
+ retries += 1
+ (failures2, entry_ops2) = self.fix_possible_entry_failures(
+ failures1, retries, entry_ops1)
+ if not failures2:
+ pending_failures = False
+ logging.info(lf('Successfully fixed entry ops with gfid '
+ 'mismatch', retry_count=retries))
+ else:
+ pending_failures = True
+ failures1 = failures2
+ entry_ops1 = entry_ops2
+
+ if pending_failures:
+ for failure in failures1:
+ logging.error("Failed to fix entry ops %s", repr(failure))
+
def process_change(self, change, done, retry):
pfx = gauxpfx()
clist = []
@@ -825,9 +1030,9 @@ class GMasterChangelogMixin(GMasterCommon):
# skip ENTRY operation if hot tier brick
if self.name == 'live_changelog' or \
self.name == 'history_changelog':
- if boolify(gconf.is_hottier) and et == self.TYPE_ENTRY:
- logging.debug('skip ENTRY op: %s if hot tier brick'
- % (ec[self.POS_TYPE]))
+ if rconf.args.is_hottier and et == self.TYPE_ENTRY:
+ logging.debug(lf('skip ENTRY op if hot tier brick',
+ op=ec[self.POS_TYPE]))
continue
# Data and Meta operations are decided while parsing
@@ -848,7 +1053,8 @@ class GMasterChangelogMixin(GMasterCommon):
self.update_fop_batch_stats(ec[self.POS_TYPE])
# PARGFID/BNAME
- en = unescape(os.path.join(pfx, ec[self.POS_ENTRY1]))
+ en = unescape_space_newline(
+ os.path.join(pfx, ec[self.POS_ENTRY1]))
# GFID of the entry
gfid = ec[self.POS_GFID]
@@ -856,7 +1062,8 @@ class GMasterChangelogMixin(GMasterCommon):
# The index of PARGFID/BNAME for UNLINK, RMDIR
# is no more the last index. It varies based on
# changelog.capture-del-path is enabled or not.
- en = unescape(os.path.join(pfx, ec[self.UNLINK_ENTRY]))
+ en = unescape_space_newline(
+ os.path.join(pfx, ec[self.UNLINK_ENTRY]))
# Remove from DATA list, so that rsync will
# not fail
@@ -866,14 +1073,19 @@ class GMasterChangelogMixin(GMasterCommon):
# file got unlinked, May be historical Changelog
datas.remove(pt)
- if not boolify(gconf.ignore_deletes):
+ if ty in ['RMDIR'] and not isinstance(st, int):
+ logging.info(lf('Ignoring rmdir. Directory present in '
+ 'master', gfid=gfid, pgfid_bname=en))
+ continue
+
+ if not gconf.get("ignore-deletes"):
if not ignore_entry_ops:
entries.append(edct(ty, gfid=gfid, entry=en))
elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
# Special case: record mknod as link
if ty in ['MKNOD']:
mode = int(ec[2])
- if mode & 01000:
+ if mode & 0o1000:
# Avoid stat'ing the file as it
# may be deleted in the interim
st = FreeObject(st_mode=int(ec[2]),
@@ -905,32 +1117,61 @@ class GMasterChangelogMixin(GMasterCommon):
rl = None
if st and stat.S_ISLNK(st.st_mode):
- rl = errno_wrap(os.readlink, [en], [ENOENT], [ESTALE])
+ rl = errno_wrap(os.readlink, [en], [ENOENT],
+ [ESTALE, EINTR])
if isinstance(rl, int):
rl = None
- e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
+ e1 = unescape_space_newline(
+ os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en,
stat=st, link=rl))
+ # If src doesn't exist while doing rename, destination
+ # is created. If data is not followed by rename, this
+ # remains zero byte file on slave. Hence add data entry
+ # for renames
+ datas.add(os.path.join(pfx, gfid))
else:
# stat() to get mode and other information
+ if not matching_disk_gfid(gfid, en):
+ logging.debug(lf('Ignoring entry, purged in the '
+ 'interim', file=en, gfid=gfid))
+ continue
+
go = os.path.join(pfx, gfid)
st = lstat(go)
if isinstance(st, int):
- logging.debug('file %s got purged in the interim' % go)
+ logging.debug(lf('Ignoring entry, purged in the '
+ 'interim', file=en, gfid=gfid))
continue
if ty == 'LINK':
- entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
+ rl = None
+ if st and stat.S_ISLNK(st.st_mode):
+ rl = errno_wrap(os.readlink, [en], [ENOENT],
+ [ESTALE, EINTR])
+ if isinstance(rl, int):
+ rl = None
+ entries.append(edct(ty, stat=st, entry=en, gfid=gfid,
+ link=rl))
+ # If src doesn't exist while doing link, destination
+ # is created based on file type. If data is not
+ # followed by link, this remains zero byte file on
+ # slave. Hence add data entry for links
+ if rl is None:
+ datas.add(os.path.join(pfx, gfid))
elif ty == 'SYMLINK':
- rl = errno_wrap(os.readlink, [en], [ENOENT], [ESTALE])
+ rl = errno_wrap(os.readlink, [en], [ENOENT],
+ [ESTALE, EINTR])
if isinstance(rl, int):
continue
entries.append(
edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
else:
- logging.warn('ignoring %s [op %s]' % (gfid, ty))
+ logging.warn(lf('ignoring op',
+ gfid=gfid,
+ type=ty))
elif et == self.TYPE_GFID:
# If self.unlinked_gfids is available, then that means it is
# retrying the changelog second time. Do not add the GFID's
@@ -954,25 +1195,24 @@ class GMasterChangelogMixin(GMasterCommon):
st_mtime=ec[6])))
else:
meta_gfid.add((os.path.join(pfx, ec[0]), ))
- elif ec[1] == 'SETXATTR' or ec[1] == 'XATTROP' or \
- ec[1] == 'FXATTROP':
+ elif ec[1] in ['SETXATTR', 'XATTROP', 'FXATTROP']:
# To sync xattr/acls use rsync/tar, --xattrs and --acls
# switch to rsync and tar
- if not boolify(gconf.use_tarssh) and \
- (boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)):
+ if not gconf.get("sync-method") == "tarssh" and \
+ (gconf.get("sync-xattrs") or gconf.get("sync-acls")):
datas.add(os.path.join(pfx, ec[0]))
else:
- logging.warn('got invalid changelog type: %s' % (et))
+ logging.warn(lf('got invalid fop type',
+ type=et))
logging.debug('entries: %s' % repr(entries))
# Increment counters for Status
- self.status.inc_value("entry", len(entries))
self.files_in_batch += len(datas)
self.status.inc_value("data", len(datas))
self.batch_stats["DATA"] += self.files_in_batch - \
- self.batch_stats["SETXATTR"] - \
- self.batch_stats["XATTROP"]
+ self.batch_stats["SETXATTR"] - \
+ self.batch_stats["XATTROP"]
entry_start_time = time.time()
# sync namespace
@@ -981,6 +1221,22 @@ class GMasterChangelogMixin(GMasterCommon):
self.status.inc_value("entry", len(entries))
failures = self.slave.server.entry_ops(entries)
+
+ if gconf.get("gfid-conflict-resolution"):
+ count = 0
+ if failures:
+ logging.info(lf('Entry ops failed with gfid mismatch',
+ count=len(failures)))
+ while failures and count < self.MAX_OE_RETRIES:
+ count += 1
+ self.handle_entry_failures(failures, entries)
+ logging.info(lf('Retry original entries', count=count))
+ failures = self.slave.server.entry_ops(entries)
+ if not failures:
+ logging.info("Successfully fixed all entry ops with "
+ "gfid mismatch")
+ break
+
self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
self.status.dec_value("entry", len(entries))
@@ -1011,14 +1267,15 @@ class GMasterChangelogMixin(GMasterCommon):
else:
st = lstat(go[0])
if isinstance(st, int):
- logging.debug('file %s got purged in the interim' % go[0])
+ logging.debug(lf('file got purged in the interim',
+ file=go[0]))
continue
meta_entries.append(edct('META', go=go[0], stat=st))
if meta_entries:
- self.status.inc_value("meta", len(entries))
+ self.status.inc_value("meta", len(meta_entries))
failures = self.slave.server.meta_ops(meta_entries)
self.log_failures(failures, 'go', '', 'META')
- self.status.dec_value("meta", len(entries))
+ self.status.dec_value("meta", len(meta_entries))
self.batch_stats["META_SYNC_TIME"] += time.time() - meta_start_time
@@ -1053,7 +1310,7 @@ class GMasterChangelogMixin(GMasterCommon):
# with data of other changelogs.
if retry:
- if tries == (int(gconf.max_rsync_retries) - 1):
+ if tries == (gconf.get("max-rsync-retries") - 1):
# Enable Error logging if it is last retry
self.syncer.enable_errorlog()
@@ -1067,7 +1324,8 @@ class GMasterChangelogMixin(GMasterCommon):
self.a_syncdata(self.datas_in_batch)
else:
for change in changes:
- logging.debug('processing change %s' % change)
+ logging.debug(lf('processing change',
+ changelog=change))
self.process_change(change, done, retry)
if not retry:
# number of changelogs processed in the batch
@@ -1098,7 +1356,7 @@ class GMasterChangelogMixin(GMasterCommon):
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
- map(self.changelog_done_func, changes)
+ list(map(self.changelog_done_func, changes))
self.archive_and_purge_changelogs(changes)
# Reset Data counter after sync
@@ -1110,10 +1368,10 @@ class GMasterChangelogMixin(GMasterCommon):
# We do not know which changelog transfer failed, retry everything.
retry = True
tries += 1
- if tries == int(gconf.max_rsync_retries):
- logging.error('changelogs %s could not be processed '
- 'completely - moving on...' %
- ' '.join(map(os.path.basename, changes)))
+ if tries == gconf.get("max-rsync-retries"):
+ logging.error(lf('changelogs could not be processed '
+ 'completely - moving on...',
+ files=list(map(os.path.basename, changes))))
# Reset data counter on failure
self.status.dec_value("data", self.files_in_batch)
@@ -1123,7 +1381,7 @@ class GMasterChangelogMixin(GMasterCommon):
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
- map(self.changelog_done_func, changes)
+ list(map(self.changelog_done_func, changes))
self.archive_and_purge_changelogs(changes)
break
# it's either entry_ops() or Rsync that failed to do it's
@@ -1133,8 +1391,8 @@ class GMasterChangelogMixin(GMasterCommon):
# entry_ops() that failed... so we retry the _whole_ changelog
# again.
# TODO: remove entry retries when it's gets fixed.
- logging.warn('incomplete sync, retrying changelogs: %s' %
- ' '.join(map(os.path.basename, changes)))
+ logging.warn(lf('incomplete sync, retrying changelogs',
+ files=list(map(os.path.basename, changes))))
# Reset the Data counter before Retry
self.status.dec_value("data", self.files_in_batch)
@@ -1145,43 +1403,44 @@ class GMasterChangelogMixin(GMasterCommon):
# Log the Skipped Entry ops range if any
if self.skipped_entry_changelogs_first is not None and \
self.skipped_entry_changelogs_last is not None:
- logging.info("Skipping already processed entry "
- "ops from CHANGELOG.{0} to CHANGELOG.{1} "
- "Num: {2}".format(
- self.skipped_entry_changelogs_first,
- self.skipped_entry_changelogs_last,
- self.num_skipped_entry_changelogs))
+ logging.info(lf("Skipping already processed entry ops",
+ from_changelog=self.skipped_entry_changelogs_first,
+ to_changelog=self.skipped_entry_changelogs_last,
+ num_changelogs=self.num_skipped_entry_changelogs))
# Log Current batch details
if changes:
logging.info(
- "Entry Time Taken (UNL:{0} RMD:{1} CRE:{2} MKN:{3} "
- "MKD:{4} REN:{5} LIN:{6} SYM:{7}): {8:.4f} "
- "secs ".format (
- self.batch_stats["UNLINK"], self.batch_stats["RMDIR"],
- self.batch_stats["CREATE"], self.batch_stats["MKNOD"],
- self.batch_stats["MKDIR"], self.batch_stats["RENAME"],
- self.batch_stats["LINK"], self.batch_stats["SYMLINK"],
- self.batch_stats["ENTRY_SYNC_TIME"]))
+ lf("Entry Time Taken",
+ UNL=self.batch_stats["UNLINK"],
+ RMD=self.batch_stats["RMDIR"],
+ CRE=self.batch_stats["CREATE"],
+ MKN=self.batch_stats["MKNOD"],
+ MKD=self.batch_stats["MKDIR"],
+ REN=self.batch_stats["RENAME"],
+ LIN=self.batch_stats["LINK"],
+ SYM=self.batch_stats["SYMLINK"],
+ duration="%.4f" % self.batch_stats["ENTRY_SYNC_TIME"]))
+
logging.info(
- "Metadata Time Taken (SETA:{0}): {1:.4f} secs. "
- "Data Time Taken (SETX:{2} XATT:{3} DATA:{4}): "
- "{5:.4f} secs".format(
- self.batch_stats["SETATTR"],
- self.batch_stats["META_SYNC_TIME"],
- self.batch_stats["SETXATTR"], self.batch_stats["XATTROP"],
- self.batch_stats["DATA"],
- time.time() - self.batch_stats["DATA_START_TIME"]))
+ lf("Data/Metadata Time Taken",
+ SETA=self.batch_stats["SETATTR"],
+ meta_duration="%.4f" % self.batch_stats["META_SYNC_TIME"],
+ SETX=self.batch_stats["SETXATTR"],
+ XATT=self.batch_stats["XATTROP"],
+ DATA=self.batch_stats["DATA"],
+ data_duration="%.4f" % (
+ time.time() - self.batch_stats["DATA_START_TIME"])))
+
logging.info(
- "{0} mode completed in {1:.4f} seconds "
- "({2} - {3} Num: {4}) stime: {5}, entry_stime: {6}".format(
- self.name,
- time.time() - self.batch_start_time,
- changes[0].split("/")[-1],
- changes[-1].split("/")[-1],
- len(changes),
- repr(self.get_data_stime()),
- repr(self.get_entry_stime())))
+ lf("Batch Completed",
+ mode=self.name,
+ duration="%.4f" % (time.time() - self.batch_start_time),
+ changelog_start=changes[0].split(".")[-1],
+ changelog_end=changes[-1].split(".")[-1],
+ num_changelogs=len(changes),
+ stime=self.get_data_stime(),
+ entry_stime=self.get_entry_stime()))
def upd_entry_stime(self, stime):
self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY,
@@ -1197,8 +1456,7 @@ class GMasterChangelogMixin(GMasterCommon):
# Update last_synced_time in status file based on stime
# only update stime if stime xattr set to Brick root
if path == self.FLAT_DIR_HIERARCHY:
- chkpt_time = gconf.configinterface.get_realtime(
- "checkpoint")
+ chkpt_time = gconf.getr("checkpoint")
checkpoint_time = 0
if chkpt_time is not None:
checkpoint_time = int(chkpt_time)
@@ -1206,10 +1464,10 @@ class GMasterChangelogMixin(GMasterCommon):
self.status.set_last_synced(stime, checkpoint_time)
def update_worker_remote_node(self):
- node = sys.argv[-1]
+ node = rconf.args.resource_remote
node_data = node.split("@")
node = node_data[-1]
- remote_node_ip = node.split(":")[0]
+ remote_node_ip, _ = host_brick_split(node)
self.status.set_slave_node(remote_node_ip)
def changelogs_batch_process(self, changes):
@@ -1217,7 +1475,7 @@ class GMasterChangelogMixin(GMasterCommon):
current_size = 0
for c in changes:
si = os.lstat(c).st_size
- if (si + current_size) > int(gconf.changelog_batch_size):
+ if (si + current_size) > gconf.get("changelog-batch-size"):
# Create new batch if single Changelog file greater than
# Max Size! or current batch size exceeds Max size
changelogs_batches.append([c])
@@ -1231,7 +1489,8 @@ class GMasterChangelogMixin(GMasterCommon):
changelogs_batches[-1].append(c)
for batch in changelogs_batches:
- logging.debug('processing changes %s' % repr(batch))
+ logging.debug(lf('processing changes',
+ batch=batch))
self.process(batch)
def crawl(self):
@@ -1241,28 +1500,28 @@ class GMasterChangelogMixin(GMasterCommon):
# that are _historical_ to that time.
data_stime = self.get_data_stime()
- self.changelog_agent.scan()
+ libgfchangelog.scan()
self.crawls += 1
- changes = self.changelog_agent.getchanges()
+ changes = libgfchangelog.getchanges()
if changes:
if data_stime:
- logging.info("slave's time: %s" % repr(data_stime))
+ logging.info(lf("slave's time",
+ stime=data_stime))
processed = [x for x in changes
if int(x.split('.')[-1]) < data_stime[0]]
for pr in processed:
- logging.info(
- 'skipping already processed change: %s...' %
- os.path.basename(pr))
+ logging.debug(
+ lf('skipping already processed change',
+ changelog=os.path.basename(pr)))
self.changelog_done_func(pr)
changes.remove(pr)
self.archive_and_purge_changelogs(processed)
self.changelogs_batch_process(changes)
- def register(self, register_time, changelog_agent, status):
- self.changelog_agent = changelog_agent
- self.sleep_interval = int(gconf.change_interval)
- self.changelog_done_func = self.changelog_agent.done
+ def register(self, register_time, status):
+ self.sleep_interval = gconf.get("change-interval")
+ self.changelog_done_func = libgfchangelog.done
self.tempdir = self.setup_working_dir()
self.processed_changelogs_dir = os.path.join(self.tempdir,
".processed")
@@ -1271,11 +1530,10 @@ class GMasterChangelogMixin(GMasterCommon):
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
- def register(self, register_time, changelog_agent, status):
- self.changelog_agent = changelog_agent
+ def register(self, register_time, status):
self.changelog_register_time = register_time
self.history_crawl_start_time = register_time
- self.changelog_done_func = self.changelog_agent.history_done
+ self.changelog_done_func = libgfchangelog.history_done
self.history_turns = 0
self.tempdir = self.setup_working_dir()
self.processed_changelogs_dir = os.path.join(self.tempdir,
@@ -1289,10 +1547,17 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
data_stime = self.get_data_stime()
end_time = int(time.time())
- logging.info('starting history crawl... turns: %s, stime: %s, '
- 'etime: %s, entry_stime: %s'
- % (self.history_turns, repr(data_stime),
- repr(end_time), self.get_entry_stime()))
+
+ #as start of historical crawl marks Geo-rep worker restart
+ if gconf.get("ignore-deletes"):
+ logging.info(lf('ignore-deletes config option is set',
+ stime=data_stime))
+
+ logging.info(lf('starting history crawl',
+ turns=self.history_turns,
+ stime=data_stime,
+ etime=end_time,
+ entry_stime=self.get_entry_stime()))
if not data_stime or data_stime == URXTIME:
raise NoStimeAvailable()
@@ -1300,13 +1565,13 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# Changelogs backend path is hardcoded as
# <BRICK_PATH>/.glusterfs/changelogs, if user configured to different
# location then consuming history will not work(Known issue as of now)
- changelog_path = os.path.join(gconf.local_path,
+ changelog_path = os.path.join(rconf.args.local_path,
".glusterfs/changelogs")
- ret, actual_end = self.changelog_agent.history(
+ ret, actual_end = libgfchangelog.history_changelog(
changelog_path,
data_stime[0],
end_time,
- int(gconf.sync_jobs))
+ gconf.get("sync-jobs"))
# scan followed by getchanges till scan returns zero.
# history_scan() is blocking call, till it gets the number
@@ -1314,18 +1579,19 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# to be processed. returns positive value as number of changelogs
# to be processed, which will be fetched using
# history_getchanges()
- while self.changelog_agent.history_scan() > 0:
+ while libgfchangelog.history_scan() > 0:
self.crawls += 1
- changes = self.changelog_agent.history_getchanges()
+ changes = libgfchangelog.history_getchanges()
if changes:
if data_stime:
- logging.info("slave's time: %s" % repr(data_stime))
+ logging.info(lf("slave's time",
+ stime=data_stime))
processed = [x for x in changes
if int(x.split('.')[-1]) < data_stime[0]]
for pr in processed:
- logging.info('skipping already processed change: '
- '%s...' % os.path.basename(pr))
+ logging.debug(lf('skipping already processed change',
+ changelog=os.path.basename(pr)))
self.changelog_done_func(pr)
changes.remove(pr)
@@ -1333,10 +1599,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
history_turn_time = int(time.time()) - self.history_crawl_start_time
- logging.info('finished history crawl syncing, endtime: %s, '
- 'stime: %s, entry_stime: %s'
- % (actual_end, repr(self.get_data_stime()),
- self.get_entry_stime()))
+ logging.info(lf('finished history crawl',
+ endtime=actual_end,
+ stime=self.get_data_stime(),
+ entry_stime=self.get_entry_stime()))
# If TS returned from history_changelog is < register_time
# then FS crawl may be required, since history is only available
@@ -1350,7 +1616,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
self.history_crawl_start_time = int(time.time())
self.crawl()
else:
- # This exeption will be catched in resource.py and
+ # This exception will be caught in resource.py and
# fallback to xsync for the small gap.
raise PartialHistoryAvailable(str(actual_end))
@@ -1369,14 +1635,15 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
XSYNC_MAX_ENTRIES = 1 << 13
- def register(self, register_time=None, changelog_agent=None, status=None):
+ def register(self, register_time=None, status=None):
self.status = status
self.counter = 0
self.comlist = []
self.stimes = []
self.sleep_interval = 60
self.tempdir = self.setup_working_dir()
- logging.info('Working dir: %s' % self.tempdir)
+ logging.info(lf('Working dir',
+ path=self.tempdir))
self.tempdir = os.path.join(self.tempdir, 'xsync')
self.processed_changelogs_dir = self.tempdir
self.name = "xsync"
@@ -1388,6 +1655,11 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
pass
else:
raise
+ # Purge stale unprocessed xsync changelogs
+ for f in os.listdir(self.tempdir):
+ if f.startswith("XSYNC-CHANGELOG"):
+ os.remove(os.path.join(self.tempdir, f))
+
def crawl(self):
"""
@@ -1400,25 +1672,28 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.Xcrawl()
t = Thread(target=Xsyncer)
t.start()
- logging.info('starting hybrid crawl..., stime: %s'
- % repr(self.get_data_stime()))
+ logging.info(lf('starting hybrid crawl',
+ stime=self.get_data_stime()))
self.status.set_worker_crawl_status("Hybrid Crawl")
while True:
try:
item = self.comlist.pop(0)
if item[0] == 'finale':
- logging.info('finished hybrid crawl syncing, stime: %s'
- % repr(self.get_data_stime()))
+ logging.info(lf('finished hybrid crawl',
+ stime=self.get_data_stime()))
break
elif item[0] == 'xsync':
- logging.info('processing xsync changelog %s' % (item[1]))
+ logging.info(lf('processing xsync changelog',
+ path=item[1]))
self.process([item[1]], 0)
self.archive_and_purge_changelogs([item[1]])
elif item[0] == 'stime':
- logging.debug('setting slave time: %s' % repr(item[1]))
+ logging.debug(lf('setting slave time',
+ time=item[1]))
self.upd_stime(item[1][1], item[1][0])
else:
- logging.warn('unknown tuple in comlist (%s)' % repr(item))
+ logging.warn(lf('unknown tuple in comlist',
+ entry=item))
except IndexError:
time.sleep(1)
@@ -1478,7 +1753,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
def is_sticky(self, path, mo):
"""check for DHTs linkto sticky bit file"""
sticky = False
- if mo & 01000:
+ if mo & 0o1000:
sticky = self.master.server.linkto_check(path)
return sticky
@@ -1496,8 +1771,9 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
xtr_root = self.xtime('.', self.slave)
if isinstance(xtr_root, int):
if xtr_root != ENOENT:
- logging.warn("slave cluster not returning the "
- "correct xtime for root (%d)" % xtr_root)
+ logging.warn(lf("slave cluster not returning the "
+ "xtime for root",
+ error=xtr_root))
xtr_root = self.minus_infinity
xtl = self.xtime(path)
if isinstance(xtl, int):
@@ -1505,8 +1781,10 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
xtr = self.xtime(path, self.slave)
if isinstance(xtr, int):
if xtr != ENOENT:
- logging.warn("slave cluster not returning the "
- "correct xtime for %s (%d)" % (path, xtr))
+ logging.warn(lf("slave cluster not returning the "
+ "xtime for dir",
+ path=path,
+ error=xtr))
xtr = self.minus_infinity
xtr = max(xtr, xtr_root)
zero_zero = (0, 0)
@@ -1521,27 +1799,32 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
dem = self.master.server.entries(path)
pargfid = self.master.server.gfid(path)
if isinstance(pargfid, int):
- logging.warn('skipping directory %s' % (path))
+ logging.warn(lf('skipping directory',
+ path=path))
for e in dem:
bname = e
e = os.path.join(path, e)
xte = self.xtime(e)
if isinstance(xte, int):
- logging.warn("irregular xtime for %s: %s" %
- (e, errno.errorcode[xte]))
+ logging.warn(lf("irregular xtime",
+ path=e,
+ error=errno.errorcode[xte]))
continue
if not self.need_sync(e, xte, xtr):
continue
st = self.master.server.lstat(e)
if isinstance(st, int):
- logging.warn('%s got purged in the interim ...' % e)
+ logging.warn(lf('got purged in the interim',
+ path=e))
continue
if self.is_sticky(e, st.st_mode):
- logging.debug('ignoring sticky bit file %s' % e)
+ logging.debug(lf('ignoring sticky bit file',
+ path=e))
continue
gfid = self.master.server.gfid(e)
if isinstance(gfid, int):
- logging.warn('skipping entry %s..' % e)
+ logging.warn(lf('skipping entry',
+ path=e))
continue
mo = st.st_mode
self.counter += 1 if ((stat.S_ISDIR(mo) or
@@ -1551,8 +1834,10 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.sync_done(self.stimes, False)
self.stimes = []
if stat.S_ISDIR(mo):
- self.write_entry_change("E", [gfid, 'MKDIR', str(mo),
- str(0), str(0), escape(os.path.join(pargfid, bname))])
+ self.write_entry_change("E",
+ [gfid, 'MKDIR', str(mo),
+ str(0), str(0), escape_space_newline(
+ os.path.join(pargfid, bname))])
self.write_entry_change("M", [gfid, "SETATTR", str(st.st_uid),
str(st.st_gid), str(st.st_mode),
str(st.st_atime),
@@ -1571,8 +1856,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.stimes.append((e, stime_to_update))
elif stat.S_ISLNK(mo):
self.write_entry_change(
- "E", [gfid, 'SYMLINK', escape(os.path.join(pargfid,
- bname))])
+ "E", [gfid, 'SYMLINK', escape_space_newline(
+ os.path.join(pargfid, bname))])
elif stat.S_ISREG(mo):
nlink = st.st_nlink
nlink -= 1 # fixup backend stat link count
@@ -1583,12 +1868,13 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.write_entry_change("E",
[gfid, 'MKNOD', str(mo),
str(0), str(0),
- escape(os.path.join(
- pargfid, bname))])
+ escape_space_newline(
+ os.path.join(
+ pargfid, bname))])
else:
self.write_entry_change(
- "E", [gfid, 'LINK', escape(os.path.join(pargfid,
- bname))])
+ "E", [gfid, 'LINK', escape_space_newline(
+ os.path.join(pargfid, bname))])
self.write_entry_change("D", [gfid])
if path == '.':
stime_to_update = xtl
@@ -1685,8 +1971,8 @@ class Syncer(object):
self.pb = PostBox()
self.sync_engine = sync_engine
self.errnos_ok = resilient_errnos
- for i in range(int(gconf.sync_jobs)):
- t = Thread(target=self.syncjob, args=(i+1, ))
+ for i in range(gconf.get("sync-jobs")):
+ t = Thread(target=self.syncjob, args=(i + 1, ))
t.start()
def syncjob(self, job_id):
@@ -1704,14 +1990,12 @@ class Syncer(object):
pb.close()
start = time.time()
po = self.sync_engine(pb, self.log_err)
- logging.info("Sync Time Taken (Job:{0} "
- "Files:{1} ReturnCode:{2}): "
- "{3:.4f} secs".format(
- job_id,
- len(pb),
- po.returncode,
- time.time() - start
- ))
+ logging.info(lf("Sync Time Taken",
+ job=job_id,
+ num_files=len(pb),
+ return_code=po.returncode,
+ duration="%.4f" % (time.time() - start)))
+
if po.returncode == 0:
ret = (True, 0)
elif po.returncode in self.errnos_ok: