summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--geo-replication/syncdaemon/master.py141
-rw-r--r--geo-replication/syncdaemon/resource.py57
-rw-r--r--geo-replication/syncdaemon/syncdutils.py35
3 files changed, 169 insertions, 64 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index d9f63a440fb..665a51f64dd 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -711,7 +711,8 @@ class GMasterChangelogMixin(GMasterCommon):
TYPE_GFID = "D "
TYPE_ENTRY = "E "
- MAX_EF_RETRIES = 15
+ MAX_EF_RETRIES = 10
+ MAX_OE_RETRIES = 5
# flat directory hierarchy for gfid based access
FLAT_DIR_HIERARCHY = '.'
@@ -803,21 +804,28 @@ class GMasterChangelogMixin(GMasterCommon):
self.status.inc_value("failures", num_failures)
- def fix_possible_entry_failures(self, failures, retry_count):
+ def fix_possible_entry_failures(self, failures, retry_count, entries):
pfx = gauxpfx()
fix_entry_ops = []
failures1 = []
for failure in failures:
- if failure[2]['dst']:
+ if failure[2]['name_mismatch']:
+ pbname = failure[2]['slave_entry']
+ elif failure[2]['dst']:
pbname = failure[0]['entry1']
else:
pbname = failure[0]['entry']
- if failure[2]['gfid_mismatch']:
+
+ op = failure[0]['op']
+ # name exists but gfid is different
+ if failure[2]['gfid_mismatch'] or failure[2]['name_mismatch']:
slave_gfid = failure[2]['slave_gfid']
st = lstat(os.path.join(pfx, slave_gfid))
+ # Takes care of scenarios with no hardlinks
if isinstance(st, int) and st == ENOENT:
- logging.info(lf('Fixing gfid mismatch in slave. Deleting'
- ' the entry', retry_count=retry_count,
+ logging.info(lf('Entry not present on master. Fixing gfid '
+ 'mismatch in slave. Deleting the entry',
+ retry_count=retry_count,
entry=repr(failure)))
# Add deletion to fix_entry_ops list
if failure[2]['slave_isdir']:
@@ -830,79 +838,119 @@ class GMasterChangelogMixin(GMasterCommon):
edct('UNLINK',
gfid=failure[2]['slave_gfid'],
entry=pbname))
+ # Takes care of scenarios of hardlinks/renames on master
elif not isinstance(st, int):
- # The file exists on master but with different name.
- # Probabaly renamed and got missed during xsync crawl.
- if failure[2]['slave_isdir']:
- logging.info(lf('Fixing gfid mismatch in slave',
+ if matching_disk_gfid(slave_gfid, pbname):
+ # Safe to ignore the failure as master contains same
+ # file with same gfid. Remove entry from entries list
+ logging.info(lf('Fixing gfid mismatch in slave. '
+ ' Safe to ignore, take out entry',
retry_count=retry_count,
entry=repr(failure)))
- realpath = os.readlink(os.path.join(
- rconf.args.local_path,
- ".glusterfs",
- slave_gfid[0:2],
- slave_gfid[2:4],
- slave_gfid))
+ entries.remove(failure[0])
+ # The file exists on master but with different name.
+ # Probably renamed and got missed during xsync crawl.
+ elif failure[2]['slave_isdir']:
+ realpath = os.readlink(os.path.join(gconf.local_path,
+ ".glusterfs",
+ slave_gfid[0:2],
+ slave_gfid[2:4],
+ slave_gfid))
dst_entry = os.path.join(pfx, realpath.split('/')[-2],
realpath.split('/')[-1])
- rename_dict = edct('RENAME', gfid=slave_gfid,
- entry=failure[0]['entry'],
- entry1=dst_entry, stat=st,
- link=None)
- logging.info(lf('Fixing gfid mismatch in slave. '
- 'Renaming', retry_count=retry_count,
- entry=repr(rename_dict)))
- fix_entry_ops.append(rename_dict)
+ src_entry = pbname
+ logging.info(lf('Fixing dir name/gfid mismatch in '
+ 'slave', retry_count=retry_count,
+ entry=repr(failure)))
+ if src_entry == dst_entry:
+ # Safe to ignore the failure as master contains
+ # same directory as in slave with same gfid.
+ # Remove the failure entry from entries list
+ logging.info(lf('Fixing dir name/gfid mismatch'
+ ' in slave. Safe to ignore, '
+ 'take out entry',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ entries.remove(failure[0])
+ else:
+ rename_dict = edct('RENAME', gfid=slave_gfid,
+ entry=src_entry,
+ entry1=dst_entry, stat=st,
+ link=None)
+ logging.info(lf('Fixing dir name/gfid mismatch'
+ ' in slave. Renaming',
+ retry_count=retry_count,
+ entry=repr(rename_dict)))
+ fix_entry_ops.append(rename_dict)
else:
- logging.info(lf('Fixing gfid mismatch in slave. '
- ' Deleting the entry',
+ # A hardlink file exists with different name or
+ # renamed file exists and we are sure from
+ # matching_disk_gfid check that the entry doesn't
+ # exist with same gfid so we can safely delete on slave
+ logging.info(lf('Fixing file gfid mismatch in slave. '
+ 'Hardlink/Rename Case. Deleting entry',
retry_count=retry_count,
entry=repr(failure)))
fix_entry_ops.append(
edct('UNLINK',
gfid=failure[2]['slave_gfid'],
entry=pbname))
- logging.error(lf('Entry cannot be fixed in slave due '
- 'to GFID mismatch, find respective '
- 'path for the GFID and trigger sync',
- gfid=slave_gfid))
+ elif failure[1] == ENOENT:
+ # Ignore ENOENT error for fix_entry_ops aka retry_count > 1
+ if retry_count > 1:
+ logging.info(lf('ENOENT error while fixing entry ops. '
+ 'Safe to ignore, take out entry',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ entries.remove(failure[0])
+ elif op in ('MKNOD', 'CREATE', 'MKDIR'):
+ pargfid = pbname.split('/')[1]
+ st = lstat(os.path.join(pfx, pargfid))
+ # Safe to ignore the failure as master doesn't contain
+ # parent directory.
+ if isinstance(st, int):
+ logging.info(lf('Fixing ENOENT error in slave. Parent '
+ 'does not exist on master. Safe to '
+ 'ignore, take out entry',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ entries.remove(failure[0])
if fix_entry_ops:
# Process deletions of entries whose gfids are mismatched
failures1 = self.slave.server.entry_ops(fix_entry_ops)
- if not failures1:
- logging.info("Sucessfully fixed entry ops with gfid mismatch")
- return failures1
+ return (failures1, fix_entry_ops)
def handle_entry_failures(self, failures, entries):
retries = 0
pending_failures = False
failures1 = []
failures2 = []
+ entry_ops1 = []
+ entry_ops2 = []
if failures:
pending_failures = True
failures1 = failures
+ entry_ops1 = entries
while pending_failures and retries < self.MAX_EF_RETRIES:
retries += 1
- failures2 = self.fix_possible_entry_failures(failures1,
- retries)
+ (failures2, entry_ops2) = self.fix_possible_entry_failures(
+ failures1, retries, entry_ops1)
if not failures2:
pending_failures = False
+                logging.info(lf('Successfully fixed entry ops with gfid '
+                                'mismatch', retry_count=retries))
else:
pending_failures = True
failures1 = failures2
+ entry_ops1 = entry_ops2
if pending_failures:
for failure in failures1:
logging.error("Failed to fix entry ops %s", repr(failure))
- else:
- # Retry original entry list 5 times
- failures = self.slave.server.entry_ops(entries)
-
- self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
def process_change(self, change, done, retry):
pfx = gauxpfx()
@@ -1129,7 +1177,18 @@ class GMasterChangelogMixin(GMasterCommon):
self.status.inc_value("entry", len(entries))
failures = self.slave.server.entry_ops(entries)
- self.handle_entry_failures(failures, entries)
+ count = 0
+ while failures and count < self.MAX_OE_RETRIES:
+ count += 1
+ self.handle_entry_failures(failures, entries)
+ logging.info("Retry original entries. count = %s" % count)
+ failures = self.slave.server.entry_ops(entries)
+ if not failures:
+                logging.info("Successfully fixed all entry ops with gfid "
+                             "mismatch")
+ break
+
+ self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
self.status.dec_value("entry", len(entries))
# Update Entry stime in Brick Root only in case of Changelog mode
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 7eeced96caf..43e82f30d28 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -38,16 +38,15 @@ from syncdutils import get_changelog_log_level, get_rsync_version
from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
from syncdutils import GX_GFID_CANONICAL_LEN
from gsyncdstatus import GeorepStatus
-from syncdutils import lf, Popen, sup, Volinfo
+from syncdutils import lf, Popen, sup
from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
-from syncdutils import unshare_propagation_supported
+from syncdutils import unshare_propagation_supported, get_slv_dir_path
ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
slv_volume = None
slv_host = None
-slv_bricks = None
class Server(object):
@@ -408,13 +407,23 @@ class Server(object):
# to be purged is the GFID gotten from the changelog.
# (a stat(changelog_gfid) would also be valid here)
# The race here is between the GFID check and the purge.
+
+ # If the entry or the gfid of the file to be deleted is not present
+ # on slave, we can ignore the unlink/rmdir
+ if isinstance(lstat(entry), int) or \
+ isinstance(lstat(os.path.join(pfx, gfid)), int):
+ return
+
if not matching_disk_gfid(gfid, entry):
collect_failure(e, EEXIST)
return
if op == 'UNLINK':
er = errno_wrap(os.unlink, [entry], [ENOENT, ESTALE], [EBUSY])
- return er
+ # EISDIR is safe error, ignore. This can only happen when
+ # unlink is sent from master while fixing gfid conflicts.
+ if er != EISDIR:
+ return er
elif op == 'RMDIR':
er = errno_wrap(os.rmdir, [entry], [ENOENT, ESTALE,
@@ -425,7 +434,11 @@ class Server(object):
def collect_failure(e, cmd_ret, dst=False):
slv_entry_info = {}
slv_entry_info['gfid_mismatch'] = False
+ slv_entry_info['name_mismatch'] = False
slv_entry_info['dst'] = dst
+ slv_entry_info['slave_isdir'] = False
+ slv_entry_info['slave_name'] = None
+ slv_entry_info['slave_gfid'] = None
# We do this for failing fops on Slave
# Master should be logging this
if cmd_ret is None:
@@ -444,6 +457,9 @@ class Server(object):
if not isinstance(st, int):
if st and stat.S_ISDIR(st.st_mode):
slv_entry_info['slave_isdir'] = True
+ dir_name = get_slv_dir_path(slv_host, slv_volume,
+ disk_gfid)
+ slv_entry_info['slave_name'] = dir_name
else:
slv_entry_info['slave_isdir'] = False
slv_entry_info['slave_gfid'] = disk_gfid
@@ -563,39 +579,34 @@ class Server(object):
[ENOENT, EEXIST], [ESTALE])
collect_failure(e, cmd_ret)
elif op == 'MKDIR':
+ en = e['entry']
slink = os.path.join(pfx, gfid)
st = lstat(slink)
# don't create multiple entries with same gfid
if isinstance(st, int):
blob = entry_pack_mkdir(
gfid, bname, e['mode'], e['uid'], e['gid'])
- else:
+ elif (isinstance(lstat(en), int) or
+ not matching_disk_gfid(gfid, en)):
# If gfid of a directory exists on slave but path based
# create is getting EEXIST. This means the directory is
# renamed in master but recorded as MKDIR during hybrid
# crawl. Get the directory path by reading the backend
# symlink and trying to rename to new name as said by
# master.
- global slv_bricks
- global slv_volume
- global slv_host
- if not slv_bricks:
- slv_info = Volinfo(slv_volume, slv_host)
- slv_bricks = slv_info.bricks
- # Result of readlink would be of format as below.
- # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
- realpath = os.readlink(os.path.join(slv_bricks[0]['dir'],
- ".glusterfs",
- gfid[0:2],
- gfid[2:4],
- gfid))
- realpath_parts = realpath.split('/')
- src_pargfid = realpath_parts[-2]
- src_basename = realpath_parts[-1]
- src_entry = os.path.join(pfx, src_pargfid, src_basename)
logging.info(lf("Special case: rename on mkdir",
gfid=gfid, entry=repr(entry)))
- rename_with_disk_gfid_confirmation(gfid, src_entry, entry)
+ src_entry = get_slv_dir_path(slv_host, slv_volume, gfid)
+ if src_entry is not None and src_entry != entry:
+ slv_entry_info = {}
+ slv_entry_info['gfid_mismatch'] = False
+ slv_entry_info['name_mismatch'] = True
+ slv_entry_info['dst'] = False
+ slv_entry_info['slave_isdir'] = True
+ slv_entry_info['slave_gfid'] = gfid
+ slv_entry_info['slave_entry'] = src_entry
+
+ failures.append((e, EEXIST, slv_entry_info))
elif op == 'LINK':
slink = os.path.join(pfx, gfid)
st = lstat(slink)
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 6acc9f17ad7..f7173017e5d 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -66,6 +66,7 @@ CHANGELOG_AGENT_CLIENT_VERSION = 1.0
NodeID = None
rsync_version = None
unshare_mnt_propagation = None
+slv_bricks = None
SPACE_ESCAPE_CHAR = "%20"
NEWLINE_ESCAPE_CHAR = "%0A"
PERCENTAGE_ESCAPE_CHAR = "%25"
@@ -660,6 +661,40 @@ def get_rsync_version(rsync_cmd):
return rsync_version
+def get_slv_dir_path(slv_host, slv_volume, gfid):
+ global slv_bricks
+
+ dir_path = ENOENT
+
+ if not slv_bricks:
+ slv_info = Volinfo(slv_volume, slv_host)
+ slv_bricks = slv_info.bricks
+ # Result of readlink would be of format as below.
+ # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
+ for brick in slv_bricks:
+ dir_path = errno_wrap(os.path.join,
+ [brick['dir'],
+ ".glusterfs", gfid[0:2],
+ gfid[2:4],
+ gfid], [ENOENT], [ESTALE])
+ if dir_path != ENOENT:
+ break
+
+ if not isinstance(dir_path, int):
+ realpath = errno_wrap(os.readlink, [dir_path],
+ [ENOENT], [ESTALE])
+
+ if not isinstance(realpath, int):
+ realpath_parts = realpath.split('/')
+ pargfid = realpath_parts[-2]
+ basename = realpath_parts[-1]
+ pfx = gauxpfx()
+ dir_entry = os.path.join(pfx, pargfid, basename)
+ return dir_entry
+
+ return None
+
+
def lf(event, **kwargs):
"""
Log Format helper function, log messages can be