summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2014-09-17 12:59:52 +0530
committerVenky Shankar <vshankar@redhat.com>2014-09-26 00:18:48 -0700
commit3c29c50cf60135245290133fbfed11aa3cf01e39 (patch)
tree94a988edf292f0441fc21ee27520a048b2502132
parent60a75cdca76b0a4b83eb6f5bc70a320d586d79aa (diff)
geo-rep: fix same file different gfid in master and slave
While processing RENAME in changelog, if the file is unlinked in master, then geo-rep was sending UNLINK to slave instead of RENAME. If rsync job fails if one of the file failed to sync in the job. This patch adds logic to remove GFID from data list if the same changelog has UNLINK entry for it after the DATA. Or it removes those GFIDs during retry of changelogs processing. BUG: 1143853 Change-Id: I982dc976397cd0ab676bb912583f66a28f821926 Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/8761 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Jeff Darcy <jdarcy@redhat.com> Reviewed-by: Kotresh HR <khiremat@redhat.com> Reviewed-by: Venky Shankar <vshankar@redhat.com> Tested-by: Venky Shankar <vshankar@redhat.com>
-rw-r--r--geo-replication/syncdaemon/master.py50
-rw-r--r--geo-replication/syncdaemon/resource.py5
2 files changed, 38 insertions, 17 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 79c90630dca..d8d26baafe1 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -239,6 +239,7 @@ class TarSSHEngine(object):
"""
def a_syncdata(self, files):
+ self.unlinked_gfids = []
logging.debug('files: %s' % (files))
for f in files:
pb = self.syncer.add(f)
@@ -252,6 +253,7 @@ class TarSSHEngine(object):
# stat check for file presence
st = lstat(se)
if isinstance(st, int):
+ self.unlinked_gfids.append(se)
return True
logging.warn('tar+ssh: %s [errcode: %d]' % (se, rv[1]))
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
@@ -270,6 +272,7 @@ class RsyncEngine(object):
"""Sync engine that uses rsync(1) for data transfers"""
def a_syncdata(self, files):
+ self.unlinked_gfids = []
logging.debug('files: %s' % (files))
for f in files:
logging.debug('candidate for syncing %s' % f)
@@ -286,6 +289,7 @@ class RsyncEngine(object):
st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
+ self.unlinked_gfids.append(se)
return True
logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1]))
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
@@ -423,6 +427,7 @@ class GMasterCommon(object):
self.checkpoint_thread = None
self.current_files_skipped_count = 0
self.skipped_gfid_list = []
+ self.unlinked_gfids = []
def init_keep_alive(cls):
"""start the keep-alive thread """
@@ -797,9 +802,10 @@ class GMasterChangelogMixin(GMasterCommon):
if k == 'stat':
st = ed[k]
dst = dct['stat'] = {}
- dst['uid'] = st.st_uid
- dst['gid'] = st.st_gid
- dst['mode'] = st.st_mode
+ if st:
+ dst['uid'] = st.st_uid
+ dst['gid'] = st.st_gid
+ dst['mode'] = st.st_mode
else:
dct[k] = ed[k]
return dct
@@ -830,6 +836,12 @@ class GMasterChangelogMixin(GMasterCommon):
gfid = ec[self.POS_GFID]
if ty in ['UNLINK', 'RMDIR']:
+ # Remove from DATA list, so that rsync will
+ # not fail
+ pt = os.path.join(pfx, ec[0])
+ if pt in datas:
+ datas.remove(pt)
+
purge_update()
entries.append(edct(ty, gfid=gfid, entry=en))
elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
@@ -838,16 +850,22 @@ class GMasterChangelogMixin(GMasterCommon):
entries.append(edct(ty, gfid=gfid, entry=en,
mode=int(ec[2]),
uid=int(ec[3]), gid=int(ec[4])))
+ elif ty == "RENAME":
+ go = os.path.join(pfx, gfid)
+ st = lstat(go)
+ if isinstance(st, int):
+ st = {}
+
+ entry_update()
+ e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
+ entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en,
+ stat=st))
else:
# stat() to get mode and other information
go = os.path.join(pfx, gfid)
st = lstat(go)
if isinstance(st, int):
- if ty == 'RENAME': # special hack for renames...
- entries.append(edct('UNLINK', gfid=gfid, entry=en))
- else:
- logging.debug(
- 'file %s got purged in the interim' % go)
+ logging.debug('file %s got purged in the interim' % go)
continue
if ty == 'LINK':
@@ -860,16 +878,17 @@ class GMasterChangelogMixin(GMasterCommon):
entry_update()
entries.append(
edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
- elif ty == 'RENAME':
- entry_update()
- e1 = unescape(
- os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
- entries.append(
- edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st))
else:
logging.warn('ignoring %s [op %s]' % (gfid, ty))
elif et == self.TYPE_GFID:
- datas.add(os.path.join(pfx, ec[0]))
+ # If self.unlinked_gfids is available, then that means it is
+ # retrying the changelog second time. Do not add the GFID's
+ # to rsync job if failed previously but unlinked in master
+ if self.unlinked_gfids and \
+ os.path.join(pfx, ec[0]) in self.unlinked_gfids:
+ logging.debug("ignoring data, since file purged interim")
+ else:
+ datas.add(os.path.join(pfx, ec[0]))
elif et == self.TYPE_META:
if ec[1] == 'SETATTR': # only setattr's for now...
if len(ec) == 5:
@@ -950,6 +969,7 @@ class GMasterChangelogMixin(GMasterCommon):
# @change is the last changelog (therefore max time for this batch)
if self.syncdata_wait():
+ self.unlinked_gfids = []
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 2a887daab15..1bee0a3338f 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -651,8 +651,9 @@ class Server(object):
en = e['entry1']
st = lstat(entry)
if isinstance(st, int):
- (pg, bname) = entry2pb(en)
- blob = entry_pack_reg_stat(gfid, bname, e['stat'])
+ if e['stat']:
+ (pg, bname) = entry2pb(en)
+ blob = entry_pack_reg_stat(gfid, bname, e['stat'])
else:
errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST])
if blob: