summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--geo-replication/syncdaemon/master.py50
-rw-r--r--geo-replication/syncdaemon/resource.py5
2 files changed, 38 insertions, 17 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 79c90630dca..d8d26baafe1 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -239,6 +239,7 @@ class TarSSHEngine(object):
"""
def a_syncdata(self, files):
+ self.unlinked_gfids = []
logging.debug('files: %s' % (files))
for f in files:
pb = self.syncer.add(f)
@@ -252,6 +253,7 @@ class TarSSHEngine(object):
# stat check for file presence
st = lstat(se)
if isinstance(st, int):
+ self.unlinked_gfids.append(se)
return True
logging.warn('tar+ssh: %s [errcode: %d]' % (se, rv[1]))
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
@@ -270,6 +272,7 @@ class RsyncEngine(object):
"""Sync engine that uses rsync(1) for data transfers"""
def a_syncdata(self, files):
+ self.unlinked_gfids = []
logging.debug('files: %s' % (files))
for f in files:
logging.debug('candidate for syncing %s' % f)
@@ -286,6 +289,7 @@ class RsyncEngine(object):
st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
+ self.unlinked_gfids.append(se)
return True
logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1]))
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
@@ -423,6 +427,7 @@ class GMasterCommon(object):
self.checkpoint_thread = None
self.current_files_skipped_count = 0
self.skipped_gfid_list = []
+ self.unlinked_gfids = []
def init_keep_alive(cls):
"""start the keep-alive thread """
@@ -797,9 +802,10 @@ class GMasterChangelogMixin(GMasterCommon):
if k == 'stat':
st = ed[k]
dst = dct['stat'] = {}
- dst['uid'] = st.st_uid
- dst['gid'] = st.st_gid
- dst['mode'] = st.st_mode
+ if st:
+ dst['uid'] = st.st_uid
+ dst['gid'] = st.st_gid
+ dst['mode'] = st.st_mode
else:
dct[k] = ed[k]
return dct
@@ -830,6 +836,12 @@ class GMasterChangelogMixin(GMasterCommon):
gfid = ec[self.POS_GFID]
if ty in ['UNLINK', 'RMDIR']:
+ # Remove from DATA list, so that rsync will
+ # not fail
+ pt = os.path.join(pfx, ec[0])
+ if pt in datas:
+ datas.remove(pt)
+
purge_update()
entries.append(edct(ty, gfid=gfid, entry=en))
elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
@@ -838,16 +850,22 @@ class GMasterChangelogMixin(GMasterCommon):
entries.append(edct(ty, gfid=gfid, entry=en,
mode=int(ec[2]),
uid=int(ec[3]), gid=int(ec[4])))
+ elif ty == "RENAME":
+ go = os.path.join(pfx, gfid)
+ st = lstat(go)
+ if isinstance(st, int):
+ st = {}
+
+ entry_update()
+ e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
+ entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en,
+ stat=st))
else:
# stat() to get mode and other information
go = os.path.join(pfx, gfid)
st = lstat(go)
if isinstance(st, int):
- if ty == 'RENAME': # special hack for renames...
- entries.append(edct('UNLINK', gfid=gfid, entry=en))
- else:
- logging.debug(
- 'file %s got purged in the interim' % go)
+ logging.debug('file %s got purged in the interim' % go)
continue
if ty == 'LINK':
@@ -860,16 +878,17 @@ class GMasterChangelogMixin(GMasterCommon):
entry_update()
entries.append(
edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
- elif ty == 'RENAME':
- entry_update()
- e1 = unescape(
- os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
- entries.append(
- edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st))
else:
logging.warn('ignoring %s [op %s]' % (gfid, ty))
elif et == self.TYPE_GFID:
- datas.add(os.path.join(pfx, ec[0]))
+ # If self.unlinked_gfids is available, then that means it is
+ # retrying the changelog second time. Do not add the GFID's
+ # to rsync job if failed previously but unlinked in master
+ if self.unlinked_gfids and \
+ os.path.join(pfx, ec[0]) in self.unlinked_gfids:
+ logging.debug("ignoring data, since file purged interim")
+ else:
+ datas.add(os.path.join(pfx, ec[0]))
elif et == self.TYPE_META:
if ec[1] == 'SETATTR': # only setattr's for now...
if len(ec) == 5:
@@ -950,6 +969,7 @@ class GMasterChangelogMixin(GMasterCommon):
# @change is the last changelog (therefore max time for this batch)
if self.syncdata_wait():
+ self.unlinked_gfids = []
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 2a887daab15..1bee0a3338f 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -651,8 +651,9 @@ class Server(object):
en = e['entry1']
st = lstat(entry)
if isinstance(st, int):
- (pg, bname) = entry2pb(en)
- blob = entry_pack_reg_stat(gfid, bname, e['stat'])
+ if e['stat']:
+ (pg, bname) = entry2pb(en)
+ blob = entry_pack_reg_stat(gfid, bname, e['stat'])
else:
errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST])
if blob: