Diffstat (limited to 'geo-replication/syncdaemon/master.py')
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 173 |
1 file changed, 119 insertions, 54 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 65eaf673099..9501aeae6b5 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -22,11 +22,13 @@
 from threading import Condition, Lock
 from datetime import datetime
 
 import gsyncdconfig as gconf
+import libgfchangelog
 from rconf import rconf
-from syncdutils import Thread, GsyncdError, escape_space_newline
-from syncdutils import unescape_space_newline, gauxpfx, escape
-from syncdutils import lstat, errno_wrap, FreeObject, lf, matching_disk_gfid
-from syncdutils import NoStimeAvailable, PartialHistoryAvailable
+from syncdutils import (Thread, GsyncdError, escape_space_newline,
+                        unescape_space_newline, gauxpfx, escape,
+                        lstat, errno_wrap, FreeObject, lf, matching_disk_gfid,
+                        NoStimeAvailable, PartialHistoryAvailable,
+                        host_brick_split)
 
 URXTIME = (-1, 0)
@@ -65,6 +67,9 @@ def _volinfo_hook_relax_foreign(self):
 def edct(op, **ed):
     dct = {}
     dct['op'] = op
+    # This is used in automatic gfid conflict resolution.
+    # When marked True, it's skipped during re-processing.
+    dct['skip_entry'] = False
     for k in ed:
         if k == 'stat':
             st = ed[k]
@@ -514,7 +519,7 @@ class GMasterCommon(object):
         # If crawlwrap is called when partial history available,
         # then it sets register_time which is the time when geo-rep
         # worker registered to changelog consumption. Since nsec is
-        # not considered in register time, their are chances of skipping
+        # not considered in register time, there are chances of skipping
         # changes detection in xsync crawl. This limit will be reset when
         # crawlwrap is called again.
         self.live_changelog_start_time = None
@@ -696,7 +701,7 @@ class GMasterChangelogMixin(GMasterCommon):
     TYPE_ENTRY = "E "
 
     MAX_EF_RETRIES = 10
-    MAX_OE_RETRIES = 5
+    MAX_OE_RETRIES = 10
 
     # flat directory hierarchy for gfid based access
     FLAT_DIR_HIERARCHY = '.'
@@ -792,6 +797,7 @@ class GMasterChangelogMixin(GMasterCommon):
         pfx = gauxpfx()
         fix_entry_ops = []
         failures1 = []
+        remove_gfids = set()
         for failure in failures:
             if failure[2]['name_mismatch']:
                 pbname = failure[2]['slave_entry']
@@ -807,7 +813,7 @@ class GMasterChangelogMixin(GMasterCommon):
                 st = lstat(os.path.join(pfx, slave_gfid))
                 # Takes care of scenarios with no hardlinks
                 if isinstance(st, int) and st == ENOENT:
-                    logging.info(lf('Entry not present on master. Fixing gfid '
+                    logging.debug(lf('Entry not present on master. Fixing gfid '
                                     'mismatch in slave. Deleting the entry',
                                     retry_count=retry_count,
                                     entry=repr(failure)))
@@ -822,46 +828,67 @@ class GMasterChangelogMixin(GMasterCommon):
                             edct('UNLINK',
                                  gfid=failure[2]['slave_gfid'],
                                  entry=pbname))
+                    remove_gfids.add(slave_gfid)
+                    if op in ['RENAME']:
+                        # If renamed gfid doesn't exists on master, remove
+                        # rename entry and unlink src on slave
+                        st = lstat(os.path.join(pfx, failure[0]['gfid']))
+                        if isinstance(st, int) and st == ENOENT:
+                            logging.debug("Unlink source %s" % repr(failure))
+                            remove_gfids.add(failure[0]['gfid'])
+                            fix_entry_ops.append(
+                                edct('UNLINK',
+                                     gfid=failure[0]['gfid'],
+                                     entry=failure[0]['entry']))
                 # Takes care of scenarios of hardlinks/renames on master
                 elif not isinstance(st, int):
                     if matching_disk_gfid(slave_gfid, pbname):
                         # Safe to ignore the failure as master contains same
                         # file with same gfid. Remove entry from entries list
-                        logging.info(lf('Fixing gfid mismatch in slave. '
+                        logging.debug(lf('Fixing gfid mismatch in slave. '
                                          ' Safe to ignore, take out entry',
                                          retry_count=retry_count,
                                          entry=repr(failure)))
-                        entries.remove(failure[0])
+                        remove_gfids.add(failure[0]['gfid'])
+                        if op == 'RENAME':
+                            fix_entry_ops.append(
+                                edct('UNLINK',
+                                     gfid=failure[0]['gfid'],
+                                     entry=failure[0]['entry']))
                     # The file exists on master but with different name.
                     # Probably renamed and got missed during xsync crawl.
                     elif failure[2]['slave_isdir']:
-                        realpath = os.readlink(os.path.join(gconf.local_path,
-                                                            ".glusterfs",
-                                                            slave_gfid[0:2],
-                                                            slave_gfid[2:4],
-                                                            slave_gfid))
+                        realpath = os.readlink(os.path.join(
+                                                   rconf.args.local_path,
+                                                   ".glusterfs",
+                                                   slave_gfid[0:2],
+                                                   slave_gfid[2:4],
+                                                   slave_gfid))
                         dst_entry = os.path.join(pfx, realpath.split('/')[-2],
                                                  realpath.split('/')[-1])
                         src_entry = pbname
-                        logging.info(lf('Fixing dir name/gfid mismatch in '
+                        logging.debug(lf('Fixing dir name/gfid mismatch in '
                                          'slave', retry_count=retry_count,
                                          entry=repr(failure)))
                         if src_entry == dst_entry:
                             # Safe to ignore the failure as master contains
                             # same directory as in slave with same gfid.
                             # Remove the failure entry from entries list
-                            logging.info(lf('Fixing dir name/gfid mismatch'
+                            logging.debug(lf('Fixing dir name/gfid mismatch'
                                              ' in slave. Safe to ignore, '
                                              'take out entry',
                                              retry_count=retry_count,
                                              entry=repr(failure)))
-                            entries.remove(failure[0])
+                            try:
+                                entries.remove(failure[0])
+                            except ValueError:
+                                pass
                         else:
                             rename_dict = edct('RENAME', gfid=slave_gfid,
                                                entry=src_entry,
                                                entry1=dst_entry, stat=st,
                                                link=None)
-                            logging.info(lf('Fixing dir name/gfid mismatch'
+                            logging.debug(lf('Fixing dir name/gfid mismatch'
                                              ' in slave. Renaming',
                                              retry_count=retry_count,
                                              entry=repr(rename_dict)))
@@ -871,7 +898,7 @@ class GMasterChangelogMixin(GMasterCommon):
                         # renamed file exists and we are sure from
                         # matching_disk_gfid check that the entry doesn't
                         # exist with same gfid so we can safely delete on slave
-                        logging.info(lf('Fixing file gfid mismatch in slave. '
+                        logging.debug(lf('Fixing file gfid mismatch in slave. '
                                          'Hardlink/Rename Case. Deleting entry',
                                          retry_count=retry_count,
                                          entry=repr(failure)))
@@ -880,31 +907,52 @@ class GMasterChangelogMixin(GMasterCommon):
                             gfid=failure[2]['slave_gfid'],
                             entry=pbname))
             elif failure[1] == ENOENT:
-                # Ignore ENOENT error for fix_entry_ops aka retry_count > 1
-                if retry_count > 1:
-                    logging.info(lf('ENOENT error while fixing entry ops. '
-                                    'Safe to ignore, take out entry',
+                if op in ['RENAME']:
+                    pbname = failure[0]['entry1']
+                else:
+                    pbname = failure[0]['entry']
+
+                pargfid = pbname.split('/')[1]
+                st = lstat(os.path.join(pfx, pargfid))
+                # Safe to ignore the failure as master doesn't contain
+                # parent directory.
+                if isinstance(st, int):
+                    logging.debug(lf('Fixing ENOENT error in slave. Parent '
+                                     'does not exist on master. Safe to '
+                                     'ignore, take out entry',
                                     retry_count=retry_count,
                                     entry=repr(failure)))
-                    entries.remove(failure[0])
-                elif op in ('MKNOD', 'CREATE', 'MKDIR'):
-                    pargfid = pbname.split('/')[1]
-                    st = lstat(os.path.join(pfx, pargfid))
-                    # Safe to ignore the failure as master doesn't contain
-                    # parent directory.
-                    if isinstance(st, int):
-                        logging.info(lf('Fixing ENOENT error in slave. Parent '
-                                        'does not exist on master. Safe to '
-                                        'ignore, take out entry',
-                                        retry_count=retry_count,
-                                        entry=repr(failure)))
+                    try:
                         entries.remove(failure[0])
+                    except ValueError:
+                        pass
+                else:
+                    logging.debug(lf('Fixing ENOENT error in slave. Create '
+                                     'parent directory on slave.',
+                                     retry_count=retry_count,
+                                     entry=repr(failure)))
+                    realpath = os.readlink(os.path.join(rconf.args.local_path,
+                                                        ".glusterfs",
+                                                        pargfid[0:2],
+                                                        pargfid[2:4],
+                                                        pargfid))
+                    dir_entry = os.path.join(pfx, realpath.split('/')[-2],
+                                             realpath.split('/')[-1])
+                    fix_entry_ops.append(
+                        edct('MKDIR', gfid=pargfid, entry=dir_entry,
+                             mode=st.st_mode, uid=st.st_uid, gid=st.st_gid))
+
+        logging.debug("remove_gfids: %s" % repr(remove_gfids))
+        if remove_gfids:
+            for e in entries:
+                if e['op'] in ['MKDIR', 'MKNOD', 'CREATE', 'RENAME'] \
+                   and e['gfid'] in remove_gfids:
+                    logging.debug("Removed entry op from retrial list: entry: %s" % repr(e))
+                    e['skip_entry'] = True
 
         if fix_entry_ops:
             # Process deletions of entries whose gfids are mismatched
             failures1 = self.slave.server.entry_ops(fix_entry_ops)
-            if not failures1:
-                logging.info("Successfully fixed entry ops with gfid mismatch")
 
         return (failures1, fix_entry_ops)
@@ -1078,6 +1126,11 @@ class GMasterChangelogMixin(GMasterCommon):
                         os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
                     entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en,
                                         stat=st, link=rl))
+                    # If src doesn't exist while doing rename, destination
+                    # is created. If data is not followed by rename, this
+                    # remains zero byte file on slave. Hence add data entry
+                    # for renames
+                    datas.add(os.path.join(pfx, gfid))
                 else:
                     # stat() to get mode and other information
                     if not matching_disk_gfid(gfid, en):
@@ -1101,6 +1154,12 @@ class GMasterChangelogMixin(GMasterCommon):
                        rl = None
                     entries.append(edct(ty, stat=st, entry=en, gfid=gfid,
                                         link=rl))
+                    # If src doesn't exist while doing link, destination
+                    # is created based on file type. If data is not
+                    # followed by link, this remains zero byte file on
+                    # slave. Hence add data entry for links
+                    if rl is None:
+                        datas.add(os.path.join(pfx, gfid))
                 elif ty == 'SYMLINK':
                     rl = errno_wrap(os.readlink, [en], [ENOENT],
                                     [ESTALE, EINTR])
@@ -1148,7 +1207,6 @@ class GMasterChangelogMixin(GMasterCommon):
         logging.debug('entries: %s' % repr(entries))
 
         # Increment counters for Status
-        self.status.inc_value("entry", len(entries))
         self.files_in_batch += len(datas)
         self.status.inc_value("data", len(datas))
@@ -1166,10 +1224,13 @@ class GMasterChangelogMixin(GMasterCommon):
 
         if gconf.get("gfid-conflict-resolution"):
             count = 0
+            if failures:
+                logging.info(lf('Entry ops failed with gfid mismatch',
+                                count=len(failures)))
             while failures and count < self.MAX_OE_RETRIES:
                 count += 1
                 self.handle_entry_failures(failures, entries)
-                logging.info("Retry original entries. count = %s" % count)
+                logging.info(lf('Retry original entries', count=count))
                 failures = self.slave.server.entry_ops(entries)
                 if not failures:
                     logging.info("Successfully fixed all entry ops with "
@@ -1211,10 +1272,10 @@ class GMasterChangelogMixin(GMasterCommon):
                    continue
                meta_entries.append(edct('META', go=go[0], stat=st))
            if meta_entries:
-                self.status.inc_value("meta", len(entries))
+                self.status.inc_value("meta", len(meta_entries))
                failures = self.slave.server.meta_ops(meta_entries)
                self.log_failures(failures, 'go', '', 'META')
-                self.status.dec_value("meta", len(entries))
+                self.status.dec_value("meta", len(meta_entries))
 
            self.batch_stats["META_SYNC_TIME"] += time.time() - meta_start_time
@@ -1406,7 +1467,7 @@ class GMasterChangelogMixin(GMasterCommon):
         node = rconf.args.resource_remote
         node_data = node.split("@")
         node = node_data[-1]
-        remote_node_ip = node.split(":")[0]
+        remote_node_ip, _ = host_brick_split(node)
         self.status.set_slave_node(remote_node_ip)
 
     def changelogs_batch_process(self, changes):
@@ -1439,9 +1500,9 @@ class GMasterChangelogMixin(GMasterCommon):
         # that are _historical_ to that time.
         data_stime = self.get_data_stime()
 
-        self.changelog_agent.scan()
+        libgfchangelog.scan()
         self.crawls += 1
-        changes = self.changelog_agent.getchanges()
+        changes = libgfchangelog.getchanges()
         if changes:
             if data_stime:
                 logging.info(lf("slave's time",
@@ -1458,10 +1519,9 @@ class GMasterChangelogMixin(GMasterCommon):
 
         self.changelogs_batch_process(changes)
 
-    def register(self, register_time, changelog_agent, status):
-        self.changelog_agent = changelog_agent
+    def register(self, register_time, status):
         self.sleep_interval = gconf.get("change-interval")
-        self.changelog_done_func = self.changelog_agent.done
+        self.changelog_done_func = libgfchangelog.done
         self.tempdir = self.setup_working_dir()
         self.processed_changelogs_dir = os.path.join(self.tempdir,
                                                      ".processed")
@@ -1470,11 +1530,10 @@ class GMasterChangelogMixin(GMasterCommon):
 
 
 class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
-    def register(self, register_time, changelog_agent, status):
-        self.changelog_agent = changelog_agent
+    def register(self, register_time, status):
         self.changelog_register_time = register_time
         self.history_crawl_start_time = register_time
-        self.changelog_done_func = self.changelog_agent.history_done
+        self.changelog_done_func = libgfchangelog.history_done
         self.history_turns = 0
         self.tempdir = self.setup_working_dir()
         self.processed_changelogs_dir = os.path.join(self.tempdir,
@@ -1488,6 +1547,12 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
         data_stime = self.get_data_stime()
         end_time = int(time.time())
+
+        #as start of historical crawl marks Geo-rep worker restart
+        if gconf.get("ignore-deletes"):
+            logging.info(lf('ignore-deletes config option is set',
+                            stime=data_stime))
+
         logging.info(lf('starting history crawl',
                         turns=self.history_turns,
                         stime=data_stime,
@@ -1502,7 +1567,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
         # location then consuming history will not work(Known issue as of now)
         changelog_path = os.path.join(rconf.args.local_path,
                                       ".glusterfs/changelogs")
-        ret, actual_end = self.changelog_agent.history(
+        ret, actual_end = libgfchangelog.history_changelog(
             changelog_path,
             data_stime[0], end_time,
@@ -1514,10 +1579,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
         # to be processed. returns positive value as number of changelogs
         # to be processed, which will be fetched using
         # history_getchanges()
-        while self.changelog_agent.history_scan() > 0:
+        while libgfchangelog.history_scan() > 0:
             self.crawls += 1
 
-            changes = self.changelog_agent.history_getchanges()
+            changes = libgfchangelog.history_getchanges()
             if changes:
                 if data_stime:
                     logging.info(lf("slave's time",
@@ -1570,7 +1635,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
 
     XSYNC_MAX_ENTRIES = 1 << 13
 
-    def register(self, register_time=None, changelog_agent=None, status=None):
+    def register(self, register_time=None, status=None):
         self.status = status
         self.counter = 0
         self.comlist = []
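
Note on the gfid-conflict-resolution changes above: the new skip_entry flag in edct(), the remove_gfids set, and the raised MAX_OE_RETRIES work together so that entries whose gfids were already repaired are only marked, not removed from the list while it is being iterated, and the original batch is then replayed up to ten times. The following is a minimal sketch of that flow, not the actual gsyncd code; sync_entries() and fix_failures() are hypothetical placeholders standing in for slave.server.entry_ops() and handle_entry_failures().

    # Minimal sketch (assumed simplification, not gsyncd itself).
    MAX_OE_RETRIES = 10

    def resolve_with_retries(entries, sync_entries, fix_failures):
        failures = sync_entries(entries)
        count = 0
        while failures and count < MAX_OE_RETRIES:
            count += 1
            # fix_failures() repairs mismatched gfids on the slave and
            # returns the set of gfids it already dealt with.
            handled_gfids = fix_failures(failures)
            for e in entries:
                if e['op'] in ('MKDIR', 'MKNOD', 'CREATE', 'RENAME') \
                   and e['gfid'] in handled_gfids:
                    e['skip_entry'] = True  # skipped when the batch is replayed
            failures = sync_entries(entries)
        return failures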
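
The slave-node change (remote_node_ip, _ = host_brick_split(node)) stops splitting on the first ':' so that IPv6 addresses, which themselves contain ':', are no longer truncated. host_brick_split() lives in syncdutils and is not part of this diff; the sketch below, with a hypothetical name, only illustrates that kind of split under the assumption that the brick path follows the last colon.

    # Illustrative sketch only, not the syncdutils helper.
    def host_brick_split_sketch(value):
        host, _, brick = value.rpartition(":")
        return host, brick

    # host_brick_split_sketch("node1:/bricks/brick0")   -> ("node1", "/bricks/brick0")
    # host_brick_split_sketch("fe80::1:/bricks/brick0") -> ("fe80::1", "/bricks/brick0")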