summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/master.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--geo-replication/syncdaemon/master.py173
1 files changed, 119 insertions, 54 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 65eaf673099..9501aeae6b5 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -22,11 +22,13 @@ from threading import Condition, Lock
from datetime import datetime
import gsyncdconfig as gconf
+import libgfchangelog
from rconf import rconf
-from syncdutils import Thread, GsyncdError, escape_space_newline
-from syncdutils import unescape_space_newline, gauxpfx, escape
-from syncdutils import lstat, errno_wrap, FreeObject, lf, matching_disk_gfid
-from syncdutils import NoStimeAvailable, PartialHistoryAvailable
+from syncdutils import (Thread, GsyncdError, escape_space_newline,
+ unescape_space_newline, gauxpfx, escape,
+ lstat, errno_wrap, FreeObject, lf, matching_disk_gfid,
+ NoStimeAvailable, PartialHistoryAvailable,
+ host_brick_split)
URXTIME = (-1, 0)
@@ -65,6 +67,9 @@ def _volinfo_hook_relax_foreign(self):
def edct(op, **ed):
dct = {}
dct['op'] = op
+ # This is used in automatic gfid conflict resolution.
+ # When marked True, it's skipped during re-processing.
+ dct['skip_entry'] = False
for k in ed:
if k == 'stat':
st = ed[k]
@@ -514,7 +519,7 @@ class GMasterCommon(object):
# If crawlwrap is called when partial history available,
# then it sets register_time which is the time when geo-rep
# worker registered to changelog consumption. Since nsec is
- # not considered in register time, their are chances of skipping
+ # not considered in register time, there are chances of skipping
# changes detection in xsync crawl. This limit will be reset when
# crawlwrap is called again.
self.live_changelog_start_time = None
@@ -696,7 +701,7 @@ class GMasterChangelogMixin(GMasterCommon):
TYPE_ENTRY = "E "
MAX_EF_RETRIES = 10
- MAX_OE_RETRIES = 5
+ MAX_OE_RETRIES = 10
# flat directory hierarchy for gfid based access
FLAT_DIR_HIERARCHY = '.'
@@ -792,6 +797,7 @@ class GMasterChangelogMixin(GMasterCommon):
pfx = gauxpfx()
fix_entry_ops = []
failures1 = []
+ remove_gfids = set()
for failure in failures:
if failure[2]['name_mismatch']:
pbname = failure[2]['slave_entry']
@@ -807,7 +813,7 @@ class GMasterChangelogMixin(GMasterCommon):
st = lstat(os.path.join(pfx, slave_gfid))
# Takes care of scenarios with no hardlinks
if isinstance(st, int) and st == ENOENT:
- logging.info(lf('Entry not present on master. Fixing gfid '
+ logging.debug(lf('Entry not present on master. Fixing gfid '
'mismatch in slave. Deleting the entry',
retry_count=retry_count,
entry=repr(failure)))
@@ -822,46 +828,67 @@ class GMasterChangelogMixin(GMasterCommon):
edct('UNLINK',
gfid=failure[2]['slave_gfid'],
entry=pbname))
+ remove_gfids.add(slave_gfid)
+ if op in ['RENAME']:
+ # If renamed gfid doesn't exists on master, remove
+ # rename entry and unlink src on slave
+ st = lstat(os.path.join(pfx, failure[0]['gfid']))
+ if isinstance(st, int) and st == ENOENT:
+ logging.debug("Unlink source %s" % repr(failure))
+ remove_gfids.add(failure[0]['gfid'])
+ fix_entry_ops.append(
+ edct('UNLINK',
+ gfid=failure[0]['gfid'],
+ entry=failure[0]['entry']))
# Takes care of scenarios of hardlinks/renames on master
elif not isinstance(st, int):
if matching_disk_gfid(slave_gfid, pbname):
# Safe to ignore the failure as master contains same
# file with same gfid. Remove entry from entries list
- logging.info(lf('Fixing gfid mismatch in slave. '
+ logging.debug(lf('Fixing gfid mismatch in slave. '
' Safe to ignore, take out entry',
retry_count=retry_count,
entry=repr(failure)))
- entries.remove(failure[0])
+ remove_gfids.add(failure[0]['gfid'])
+ if op == 'RENAME':
+ fix_entry_ops.append(
+ edct('UNLINK',
+ gfid=failure[0]['gfid'],
+ entry=failure[0]['entry']))
# The file exists on master but with different name.
# Probably renamed and got missed during xsync crawl.
elif failure[2]['slave_isdir']:
- realpath = os.readlink(os.path.join(gconf.local_path,
- ".glusterfs",
- slave_gfid[0:2],
- slave_gfid[2:4],
- slave_gfid))
+ realpath = os.readlink(os.path.join(
+ rconf.args.local_path,
+ ".glusterfs",
+ slave_gfid[0:2],
+ slave_gfid[2:4],
+ slave_gfid))
dst_entry = os.path.join(pfx, realpath.split('/')[-2],
realpath.split('/')[-1])
src_entry = pbname
- logging.info(lf('Fixing dir name/gfid mismatch in '
+ logging.debug(lf('Fixing dir name/gfid mismatch in '
'slave', retry_count=retry_count,
entry=repr(failure)))
if src_entry == dst_entry:
# Safe to ignore the failure as master contains
# same directory as in slave with same gfid.
# Remove the failure entry from entries list
- logging.info(lf('Fixing dir name/gfid mismatch'
+ logging.debug(lf('Fixing dir name/gfid mismatch'
' in slave. Safe to ignore, '
'take out entry',
retry_count=retry_count,
entry=repr(failure)))
- entries.remove(failure[0])
+ try:
+ entries.remove(failure[0])
+ except ValueError:
+ pass
else:
rename_dict = edct('RENAME', gfid=slave_gfid,
entry=src_entry,
entry1=dst_entry, stat=st,
link=None)
- logging.info(lf('Fixing dir name/gfid mismatch'
+ logging.debug(lf('Fixing dir name/gfid mismatch'
' in slave. Renaming',
retry_count=retry_count,
entry=repr(rename_dict)))
@@ -871,7 +898,7 @@ class GMasterChangelogMixin(GMasterCommon):
# renamed file exists and we are sure from
# matching_disk_gfid check that the entry doesn't
# exist with same gfid so we can safely delete on slave
- logging.info(lf('Fixing file gfid mismatch in slave. '
+ logging.debug(lf('Fixing file gfid mismatch in slave. '
'Hardlink/Rename Case. Deleting entry',
retry_count=retry_count,
entry=repr(failure)))
@@ -880,31 +907,52 @@ class GMasterChangelogMixin(GMasterCommon):
gfid=failure[2]['slave_gfid'],
entry=pbname))
elif failure[1] == ENOENT:
- # Ignore ENOENT error for fix_entry_ops aka retry_count > 1
- if retry_count > 1:
- logging.info(lf('ENOENT error while fixing entry ops. '
- 'Safe to ignore, take out entry',
+ if op in ['RENAME']:
+ pbname = failure[0]['entry1']
+ else:
+ pbname = failure[0]['entry']
+
+ pargfid = pbname.split('/')[1]
+ st = lstat(os.path.join(pfx, pargfid))
+ # Safe to ignore the failure as master doesn't contain
+ # parent directory.
+ if isinstance(st, int):
+ logging.debug(lf('Fixing ENOENT error in slave. Parent '
+ 'does not exist on master. Safe to '
+ 'ignore, take out entry',
retry_count=retry_count,
entry=repr(failure)))
- entries.remove(failure[0])
- elif op in ('MKNOD', 'CREATE', 'MKDIR'):
- pargfid = pbname.split('/')[1]
- st = lstat(os.path.join(pfx, pargfid))
- # Safe to ignore the failure as master doesn't contain
- # parent directory.
- if isinstance(st, int):
- logging.info(lf('Fixing ENOENT error in slave. Parent '
- 'does not exist on master. Safe to '
- 'ignore, take out entry',
- retry_count=retry_count,
- entry=repr(failure)))
+ try:
entries.remove(failure[0])
+ except ValueError:
+ pass
+ else:
+ logging.debug(lf('Fixing ENOENT error in slave. Create '
+ 'parent directory on slave.',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ realpath = os.readlink(os.path.join(rconf.args.local_path,
+ ".glusterfs",
+ pargfid[0:2],
+ pargfid[2:4],
+ pargfid))
+ dir_entry = os.path.join(pfx, realpath.split('/')[-2],
+ realpath.split('/')[-1])
+ fix_entry_ops.append(
+ edct('MKDIR', gfid=pargfid, entry=dir_entry,
+ mode=st.st_mode, uid=st.st_uid, gid=st.st_gid))
+
+ logging.debug("remove_gfids: %s" % repr(remove_gfids))
+ if remove_gfids:
+ for e in entries:
+ if e['op'] in ['MKDIR', 'MKNOD', 'CREATE', 'RENAME'] \
+ and e['gfid'] in remove_gfids:
+ logging.debug("Removed entry op from retrial list: entry: %s" % repr(e))
+ e['skip_entry'] = True
if fix_entry_ops:
# Process deletions of entries whose gfids are mismatched
failures1 = self.slave.server.entry_ops(fix_entry_ops)
- if not failures1:
- logging.info("Successfully fixed entry ops with gfid mismatch")
return (failures1, fix_entry_ops)
@@ -1078,6 +1126,11 @@ class GMasterChangelogMixin(GMasterCommon):
os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en,
stat=st, link=rl))
+ # If src doesn't exist while doing rename, destination
+ # is created. If data is not followed by rename, this
+ # remains zero byte file on slave. Hence add data entry
+ # for renames
+ datas.add(os.path.join(pfx, gfid))
else:
# stat() to get mode and other information
if not matching_disk_gfid(gfid, en):
@@ -1101,6 +1154,12 @@ class GMasterChangelogMixin(GMasterCommon):
rl = None
entries.append(edct(ty, stat=st, entry=en, gfid=gfid,
link=rl))
+ # If src doesn't exist while doing link, destination
+ # is created based on file type. If data is not
+ # followed by link, this remains zero byte file on
+ # slave. Hence add data entry for links
+ if rl is None:
+ datas.add(os.path.join(pfx, gfid))
elif ty == 'SYMLINK':
rl = errno_wrap(os.readlink, [en], [ENOENT],
[ESTALE, EINTR])
@@ -1148,7 +1207,6 @@ class GMasterChangelogMixin(GMasterCommon):
logging.debug('entries: %s' % repr(entries))
# Increment counters for Status
- self.status.inc_value("entry", len(entries))
self.files_in_batch += len(datas)
self.status.inc_value("data", len(datas))
@@ -1166,10 +1224,13 @@ class GMasterChangelogMixin(GMasterCommon):
if gconf.get("gfid-conflict-resolution"):
count = 0
+ if failures:
+ logging.info(lf('Entry ops failed with gfid mismatch',
+ count=len(failures)))
while failures and count < self.MAX_OE_RETRIES:
count += 1
self.handle_entry_failures(failures, entries)
- logging.info("Retry original entries. count = %s" % count)
+ logging.info(lf('Retry original entries', count=count))
failures = self.slave.server.entry_ops(entries)
if not failures:
logging.info("Successfully fixed all entry ops with "
@@ -1211,10 +1272,10 @@ class GMasterChangelogMixin(GMasterCommon):
continue
meta_entries.append(edct('META', go=go[0], stat=st))
if meta_entries:
- self.status.inc_value("meta", len(entries))
+ self.status.inc_value("meta", len(meta_entries))
failures = self.slave.server.meta_ops(meta_entries)
self.log_failures(failures, 'go', '', 'META')
- self.status.dec_value("meta", len(entries))
+ self.status.dec_value("meta", len(meta_entries))
self.batch_stats["META_SYNC_TIME"] += time.time() - meta_start_time
@@ -1406,7 +1467,7 @@ class GMasterChangelogMixin(GMasterCommon):
node = rconf.args.resource_remote
node_data = node.split("@")
node = node_data[-1]
- remote_node_ip = node.split(":")[0]
+ remote_node_ip, _ = host_brick_split(node)
self.status.set_slave_node(remote_node_ip)
def changelogs_batch_process(self, changes):
@@ -1439,9 +1500,9 @@ class GMasterChangelogMixin(GMasterCommon):
# that are _historical_ to that time.
data_stime = self.get_data_stime()
- self.changelog_agent.scan()
+ libgfchangelog.scan()
self.crawls += 1
- changes = self.changelog_agent.getchanges()
+ changes = libgfchangelog.getchanges()
if changes:
if data_stime:
logging.info(lf("slave's time",
@@ -1458,10 +1519,9 @@ class GMasterChangelogMixin(GMasterCommon):
self.changelogs_batch_process(changes)
- def register(self, register_time, changelog_agent, status):
- self.changelog_agent = changelog_agent
+ def register(self, register_time, status):
self.sleep_interval = gconf.get("change-interval")
- self.changelog_done_func = self.changelog_agent.done
+ self.changelog_done_func = libgfchangelog.done
self.tempdir = self.setup_working_dir()
self.processed_changelogs_dir = os.path.join(self.tempdir,
".processed")
@@ -1470,11 +1530,10 @@ class GMasterChangelogMixin(GMasterCommon):
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
- def register(self, register_time, changelog_agent, status):
- self.changelog_agent = changelog_agent
+ def register(self, register_time, status):
self.changelog_register_time = register_time
self.history_crawl_start_time = register_time
- self.changelog_done_func = self.changelog_agent.history_done
+ self.changelog_done_func = libgfchangelog.history_done
self.history_turns = 0
self.tempdir = self.setup_working_dir()
self.processed_changelogs_dir = os.path.join(self.tempdir,
@@ -1488,6 +1547,12 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
data_stime = self.get_data_stime()
end_time = int(time.time())
+
+ #as start of historical crawl marks Geo-rep worker restart
+ if gconf.get("ignore-deletes"):
+ logging.info(lf('ignore-deletes config option is set',
+ stime=data_stime))
+
logging.info(lf('starting history crawl',
turns=self.history_turns,
stime=data_stime,
@@ -1502,7 +1567,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# location then consuming history will not work(Known issue as of now)
changelog_path = os.path.join(rconf.args.local_path,
".glusterfs/changelogs")
- ret, actual_end = self.changelog_agent.history(
+ ret, actual_end = libgfchangelog.history_changelog(
changelog_path,
data_stime[0],
end_time,
@@ -1514,10 +1579,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# to be processed. returns positive value as number of changelogs
# to be processed, which will be fetched using
# history_getchanges()
- while self.changelog_agent.history_scan() > 0:
+ while libgfchangelog.history_scan() > 0:
self.crawls += 1
- changes = self.changelog_agent.history_getchanges()
+ changes = libgfchangelog.history_getchanges()
if changes:
if data_stime:
logging.info(lf("slave's time",
@@ -1570,7 +1635,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
XSYNC_MAX_ENTRIES = 1 << 13
- def register(self, register_time=None, changelog_agent=None, status=None):
+ def register(self, register_time=None, status=None):
self.status = status
self.counter = 0
self.comlist = []