summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2014-09-17 12:59:52 +0530
committerVijay Bellur <vbellur@redhat.com>2014-09-29 23:32:57 -0700
commitc3bc8b52b560463706a409836ff7e4118695cb0a (patch)
treed5a803d46703c949b89b0f88fad4cab6e5a094fc
parente06ee49ffd6c2829357f89b192f25c1f116925ff (diff)
geo-rep: fix same file different gfid in master and slave
While processing RENAME in changelog, if the file is unlinked in master, then geo-rep was sending UNLINK to slave instead of RENAME. If rsync job fails if one of the file failed to sync in the job. This patch adds logic to remove GFID from data list if the same changelog has UNLINK entry for it after the DATA. Or it removes those GFIDs during retry of changelogs processing. BUG: 1147420 Change-Id: I982dc976397cd0ab676bb912583f66a28f821926 Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/8761 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Jeff Darcy <jdarcy@redhat.com> Reviewed-by: Kotresh HR <khiremat@redhat.com> Reviewed-by: Venky Shankar <vshankar@redhat.com> Tested-by: Venky Shankar <vshankar@redhat.com> Reviewed-on: http://review.gluster.org/8879 Reviewed-by: Vijay Bellur <vbellur@redhat.com>
-rw-r--r--geo-replication/syncdaemon/master.py50
-rw-r--r--geo-replication/syncdaemon/resource.py5
2 files changed, 38 insertions, 17 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 79c90630dca..d8d26baafe1 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -239,6 +239,7 @@ class TarSSHEngine(object):
"""
def a_syncdata(self, files):
+ self.unlinked_gfids = []
logging.debug('files: %s' % (files))
for f in files:
pb = self.syncer.add(f)
@@ -252,6 +253,7 @@ class TarSSHEngine(object):
# stat check for file presence
st = lstat(se)
if isinstance(st, int):
+ self.unlinked_gfids.append(se)
return True
logging.warn('tar+ssh: %s [errcode: %d]' % (se, rv[1]))
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
@@ -270,6 +272,7 @@ class RsyncEngine(object):
"""Sync engine that uses rsync(1) for data transfers"""
def a_syncdata(self, files):
+ self.unlinked_gfids = []
logging.debug('files: %s' % (files))
for f in files:
logging.debug('candidate for syncing %s' % f)
@@ -286,6 +289,7 @@ class RsyncEngine(object):
st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
+ self.unlinked_gfids.append(se)
return True
logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1]))
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
@@ -423,6 +427,7 @@ class GMasterCommon(object):
self.checkpoint_thread = None
self.current_files_skipped_count = 0
self.skipped_gfid_list = []
+ self.unlinked_gfids = []
def init_keep_alive(cls):
"""start the keep-alive thread """
@@ -797,9 +802,10 @@ class GMasterChangelogMixin(GMasterCommon):
if k == 'stat':
st = ed[k]
dst = dct['stat'] = {}
- dst['uid'] = st.st_uid
- dst['gid'] = st.st_gid
- dst['mode'] = st.st_mode
+ if st:
+ dst['uid'] = st.st_uid
+ dst['gid'] = st.st_gid
+ dst['mode'] = st.st_mode
else:
dct[k] = ed[k]
return dct
@@ -830,6 +836,12 @@ class GMasterChangelogMixin(GMasterCommon):
gfid = ec[self.POS_GFID]
if ty in ['UNLINK', 'RMDIR']:
+ # Remove from DATA list, so that rsync will
+ # not fail
+ pt = os.path.join(pfx, ec[0])
+ if pt in datas:
+ datas.remove(pt)
+
purge_update()
entries.append(edct(ty, gfid=gfid, entry=en))
elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
@@ -838,16 +850,22 @@ class GMasterChangelogMixin(GMasterCommon):
entries.append(edct(ty, gfid=gfid, entry=en,
mode=int(ec[2]),
uid=int(ec[3]), gid=int(ec[4])))
+ elif ty == "RENAME":
+ go = os.path.join(pfx, gfid)
+ st = lstat(go)
+ if isinstance(st, int):
+ st = {}
+
+ entry_update()
+ e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
+ entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en,
+ stat=st))
else:
# stat() to get mode and other information
go = os.path.join(pfx, gfid)
st = lstat(go)
if isinstance(st, int):
- if ty == 'RENAME': # special hack for renames...
- entries.append(edct('UNLINK', gfid=gfid, entry=en))
- else:
- logging.debug(
- 'file %s got purged in the interim' % go)
+ logging.debug('file %s got purged in the interim' % go)
continue
if ty == 'LINK':
@@ -860,16 +878,17 @@ class GMasterChangelogMixin(GMasterCommon):
entry_update()
entries.append(
edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
- elif ty == 'RENAME':
- entry_update()
- e1 = unescape(
- os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
- entries.append(
- edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st))
else:
logging.warn('ignoring %s [op %s]' % (gfid, ty))
elif et == self.TYPE_GFID:
- datas.add(os.path.join(pfx, ec[0]))
+ # If self.unlinked_gfids is available, then that means it is
+ # retrying the changelog second time. Do not add the GFID's
+ # to rsync job if failed previously but unlinked in master
+ if self.unlinked_gfids and \
+ os.path.join(pfx, ec[0]) in self.unlinked_gfids:
+ logging.debug("ignoring data, since file purged interim")
+ else:
+ datas.add(os.path.join(pfx, ec[0]))
elif et == self.TYPE_META:
if ec[1] == 'SETATTR': # only setattr's for now...
if len(ec) == 5:
@@ -950,6 +969,7 @@ class GMasterChangelogMixin(GMasterCommon):
# @change is the last changelog (therefore max time for this batch)
if self.syncdata_wait():
+ self.unlinked_gfids = []
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 2a887daab15..1bee0a3338f 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -651,8 +651,9 @@ class Server(object):
en = e['entry1']
st = lstat(entry)
if isinstance(st, int):
- (pg, bname) = entry2pb(en)
- blob = entry_pack_reg_stat(gfid, bname, e['stat'])
+ if e['stat']:
+ (pg, bname) = entry2pb(en)
+ blob = entry_pack_reg_stat(gfid, bname, e['stat'])
else:
errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST])
if blob: