Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r--  geo-replication/syncdaemon/resource.py  |  97
1 file changed, 39 insertions(+), 58 deletions(-)
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index a608944f9be..a6e351590f8 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -39,9 +39,11 @@ from syncdutils import NoStimeAvailable, PartialHistoryAvailable
from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
from syncdutils import get_changelog_log_level, get_rsync_version
from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
+from syncdutils import GX_GFID_CANONICAL_LEN
from gsyncdstatus import GeorepStatus
from syncdutils import get_master_and_slave_data_from_args
from syncdutils import mntpt_list, lf
+from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
HostRX = re.compile('[a-zA-Z\d](?:[a-zA-Z\d.-]*[a-zA-Z\d])?', re.I)
@@ -110,33 +112,6 @@ def parse_url(ustr):
return getattr(this, sch.upper())(path)
-class _MetaXattr(object):
-
- """singleton class, a lazy wrapper around the
- libcxattr module
-
- libcxattr (a heavy import due to ctypes) is
- loaded only when when the single
- instance is tried to be used.
-
- This reduces runtime for those invocations
- which do not need filesystem manipulation
- (eg. for config, url parsing)
- """
-
- def __getattr__(self, meth):
- from libcxattr import Xattr as LXattr
- xmeth = [m for m in dir(LXattr) if m[0] != '_']
- if not meth in xmeth:
- return
- for m in xmeth:
- setattr(self, m, getattr(LXattr, m))
- return getattr(self, meth)
-
-
-Xattr = _MetaXattr()
-
-
class Popen(subprocess.Popen):
"""customized subclass of subprocess.Popen with a ring
@@ -294,7 +269,6 @@ class Server(object):
NTV_FMTSTR = "!" + "B" * 19 + "II"
FRGN_XTRA_FMT = "I"
FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT
- GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0'
# for backend gfid fetch, do not use GX_NSPACE_PFX
GFID_XATTR = 'trusted.gfid'
@@ -304,15 +278,15 @@ class Server(object):
@classmethod
def _fmt_mknod(cls, l):
- return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l + 1)
+ return "!II%dsI%dsIII" % (GX_GFID_CANONICAL_LEN, l + 1)
@classmethod
def _fmt_mkdir(cls, l):
- return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l + 1)
+ return "!II%dsI%dsII" % (GX_GFID_CANONICAL_LEN, l + 1)
@classmethod
def _fmt_symlink(cls, l1, l2):
- return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1 + 1, l2 + 1)
+ return "!II%dsI%ds%ds" % (GX_GFID_CANONICAL_LEN, l1 + 1, l2 + 1)
def _pathguard(f):
"""decorator method that checks
@@ -385,12 +359,6 @@ class Server(object):
raise
@classmethod
- def gfid_mnt(cls, gfidpath):
- return errno_wrap(Xattr.lgetxattr,
- [gfidpath, 'glusterfs.gfid.string',
- cls.GX_GFID_CANONICAL_LEN], [ENOENT], [ESTALE])
-
- @classmethod
@_pathguard
def purge(cls, path, entries=None):
"""force-delete subtrees
@@ -653,27 +621,33 @@ class Server(object):
return
if op == 'UNLINK':
- errno_wrap(os.unlink, [entry], [ENOENT, ESTALE],
- [EBUSY])
+ er = errno_wrap(os.unlink, [entry], [ENOENT, ESTALE], [EBUSY])
+ return er
+
elif op == 'RMDIR':
er = errno_wrap(os.rmdir, [entry], [ENOENT, ESTALE,
ENOTEMPTY], [EBUSY])
if er == ENOTEMPTY:
return er
- def collect_failure(e, cmd_ret):
+ def collect_failure(e, cmd_ret, dst=False):
slv_entry_info = {}
slv_entry_info['gfid_mismatch'] = False
+ slv_entry_info['dst'] = dst
# We do this for failing fops on Slave
# Master should be logging this
if cmd_ret is None:
return False
if cmd_ret == EEXIST:
- disk_gfid = cls.gfid_mnt(e['entry'])
+ if dst:
+ en = e['entry1']
+ else:
+ en = e['entry']
+ disk_gfid = get_gfid_from_mnt(en)
if isinstance(disk_gfid, basestring) and e['gfid'] != disk_gfid:
slv_entry_info['gfid_mismatch'] = True
- st = lstat(e['entry'])
+ st = lstat(en)
if not isinstance(st, int):
if st and stat.S_ISDIR(st.st_mode):
slv_entry_info['slave_isdir'] = True
@@ -690,16 +664,6 @@ class Server(object):
failures = []
- def matching_disk_gfid(gfid, entry):
- disk_gfid = cls.gfid_mnt(entry)
- if isinstance(disk_gfid, int):
- return False
-
- if not gfid == disk_gfid:
- return False
-
- return True
-
def recursive_rmdir(gfid, entry, path):
"""disk_gfid check added for original path for which
recursive_delete is called. This disk gfid check executed
@@ -738,8 +702,9 @@ class Server(object):
"with on-disk gfid",
source=entry,
gfid=gfid,
- disk_gfid=cls.gfid_mnt(entry),
+ disk_gfid=get_gfid_from_mnt(entry),
target=en))
+ collect_failure(e, EEXIST)
return
cmd_ret = errno_wrap(os.rename,
@@ -817,14 +782,24 @@ class Server(object):
st = lstat(slink)
if isinstance(st, int):
(pg, bname) = entry2pb(entry)
- blob = entry_pack_reg_stat(gfid, bname, e['stat'])
+ if stat.S_ISREG(e['stat']['mode']):
+ blob = entry_pack_reg_stat(gfid, bname, e['stat'])
+ elif stat.S_ISLNK(e['stat']['mode']):
+ blob = entry_pack_symlink(gfid, bname, e['link'],
+ e['stat'])
else:
cmd_ret = errno_wrap(os.link,
[slink, entry],
[ENOENT, EEXIST], [ESTALE])
collect_failure(e, cmd_ret)
elif op == 'SYMLINK':
- blob = entry_pack_symlink(gfid, bname, e['link'], e['stat'])
+ en = e['entry']
+ st = lstat(entry)
+ if isinstance(st, int):
+ blob = entry_pack_symlink(gfid, bname, e['link'],
+ e['stat'])
+ elif not matching_disk_gfid(gfid, en):
+ collect_failure(e, EEXIST)
elif op == 'RENAME':
en = e['entry1']
st = lstat(entry)
@@ -832,9 +807,13 @@ class Server(object):
if e['stat'] and not stat.S_ISDIR(e['stat']['mode']):
if stat.S_ISLNK(e['stat']['mode']) and \
e['link'] is not None:
- (pg, bname) = entry2pb(en)
- blob = entry_pack_symlink(gfid, bname,
- e['link'], e['stat'])
+ st1 = lstat(en)
+ if isinstance(st1, int):
+ (pg, bname) = entry2pb(en)
+ blob = entry_pack_symlink(gfid, bname,
+ e['link'], e['stat'])
+ elif not matching_disk_gfid(gfid, en):
+ collect_failure(e, EEXIST, True)
else:
(pg, bname) = entry2pb(en)
blob = entry_pack_reg_stat(gfid, bname, e['stat'])
@@ -865,6 +844,8 @@ class Server(object):
raise
else:
raise
+ elif not matching_disk_gfid(gfid, en):
+ collect_failure(e, EEXIST, True)
else:
rename_with_disk_gfid_confirmation(gfid, entry, en)
if blob:
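
Note: this patch drops the in-file _MetaXattr wrapper, GX_GFID_CANONICAL_LEN, gfid_mnt and matching_disk_gfid from resource.py and instead imports Xattr, GX_GFID_CANONICAL_LEN, get_gfid_from_mnt and matching_disk_gfid from syncdutils. The syncdutils side of the change is not part of this diff; the following is only a minimal sketch of how the relocated helpers could look, reconstructed from the code deleted above. Names are taken from the new import lines; the assumption that errno_wrap and the errno constants are already available in syncdutils is mine.

# syncdutils.py (sketch, not part of this diff) -- helpers presumed moved
# here from resource.py; bodies reconstructed from the deleted code above.

from errno import ENOENT, ESTALE

GX_GFID_CANONICAL_LEN = 37    # canonical gfid len + '\0'


class _MetaXattr(object):
    """lazy singleton wrapper around libcxattr; the heavy ctypes import
    happens only when the instance is first used"""

    def __getattr__(self, meth):
        from libcxattr import Xattr as LXattr
        xmeth = [m for m in dir(LXattr) if m[0] != '_']
        if meth not in xmeth:
            return
        for m in xmeth:
            setattr(self, m, getattr(LXattr, m))
        return getattr(self, meth)


Xattr = _MetaXattr()


def get_gfid_from_mnt(gfidpath):
    # read the gfid of a path through the glusterfs.gfid.string virtual
    # xattr (was Server.gfid_mnt in resource.py)
    return errno_wrap(Xattr.lgetxattr,
                      [gfidpath, 'glusterfs.gfid.string',
                       GX_GFID_CANONICAL_LEN], [ENOENT], [ESTALE])


def matching_disk_gfid(gfid, entry):
    # True only when the on-disk gfid of 'entry' matches the changelog gfid;
    # an integer return from get_gfid_from_mnt means the lookup failed
    disk_gfid = get_gfid_from_mnt(entry)
    if isinstance(disk_gfid, int):
        return False
    return gfid == disk_gfid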