summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/resource.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r--geo-replication/syncdaemon/resource.py176
1 files changed, 138 insertions, 38 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index faf62f868c7..8deb5114b50 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -265,6 +265,9 @@ class Server(object):
FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT
GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0'
+ GFID_XATTR = 'trusted.gfid' # for backend gfid fetch, do not use GX_NSPACE_PFX
+ GFID_FMTSTR = "!" + "B"*16
+
local_path = ''
@classmethod
@@ -305,6 +308,38 @@ class Server(object):
raise OSError(ENOTDIR, os.strerror(ENOTDIR))
return os.listdir(path)
+
+ @classmethod
+ @_pathguard
+ def lstat(cls, path):
+ try:
+ return os.lstat(path)
+ except (IOError, OSError):
+ ex = sys.exc_info()[1]
+ if ex.errno == ENOENT:
+ return ex.errno
+ else:
+ raise
+
+
+ @classmethod
+ @_pathguard
+ def gfid(cls, path):
+ try:
+ buf = Xattr.lgetxattr(path, cls.GFID_XATTR, 16)
+ m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)]))
+ return '-'.join(m.groups())
+ except (IOError, OSError):
+ ex = sys.exc_info()[1]
+ if ex.errno == ENOENT:
+ return ex.errno
+ else:
+ raise
+
+ @classmethod
+ def gfid_mnt(cls, gfidpath):
+ return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', cls.GX_GFID_CANONICAL_LEN], [ENOENT])
+
@classmethod
@_pathguard
def purge(cls, path, entries=None):
@@ -397,8 +432,42 @@ class Server(object):
raise
@classmethod
- def gfid(cls, gfidpath):
- return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', cls.GX_GFID_CANONICAL_LEN], [ENOENT])
+ @_pathguard
+ def stime_mnt(cls, path, uuid):
+ """query xtime extended attribute
+
+ Return xtime of @path for @uuid as a pair of integers.
+ "Normal" errors due to non-existent @path or extended attribute
+ are tolerated and errno is returned in such a case.
+ """
+
+ try:
+ return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8))
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno in (ENOENT, ENODATA, ENOTDIR):
+ return ex.errno
+ else:
+ raise
+
+ @classmethod
+ @_pathguard
+ def stime(cls, path, uuid):
+ """query xtime extended attribute
+
+ Return xtime of @path for @uuid as a pair of integers.
+ "Normal" errors due to non-existent @path or extended attribute
+ are tolerated and errno is returned in such a case.
+ """
+
+ try:
+ return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8))
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno in (ENOENT, ENODATA, ENOTDIR):
+ return ex.errno
+ else:
+ raise
@classmethod
def node_uuid(cls, path='.'):
@@ -409,21 +478,10 @@ class Server(object):
raise
@classmethod
- def xtime_vec(cls, path, *uuids):
- """vectored version of @xtime
-
- accepts a list of uuids and returns a dictionary
- with uuid as key(s) and xtime as value(s)
- """
- xt = {}
- for uuid in uuids:
- xtu = cls.xtime(path, uuid)
- if xtu == ENODATA:
- xtu = None
- if isinstance(xtu, int):
- return xtu
- xt[uuid] = xtu
- return xt
+ @_pathguard
+ def set_stime(cls, path, uuid, mark):
+ """set @mark as stime for @uuid on @path"""
+ Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), struct.pack('!II', *mark))
@classmethod
@_pathguard
@@ -444,20 +502,16 @@ class Server(object):
Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark))
@classmethod
- def set_xtime_vec(cls, path, mark_dct):
- """vectored (or dictered) version of set_xtime
-
- ignore values that match @ignore
- """
- for u,t in mark_dct.items():
- cls.set_xtime(path, u, t)
-
- @classmethod
def entry_ops(cls, entries):
pfx = gauxpfx()
logging.debug('entries: %s' % repr(entries))
# regular file
- def entry_pack_reg(gf, bn, st):
+ def entry_pack_reg(gf, bn, mo, uid, gid):
+ blen = len(bn)
+ return struct.pack(cls._fmt_mknod(blen),
+ uid, gid, gf, mo, bn,
+ stat.S_IMODE(mo), 0, umask())
+ def entry_pack_reg_stat(gf, bn, st):
blen = len(bn)
mo = st['mode']
return struct.pack(cls._fmt_mknod(blen),
@@ -465,12 +519,10 @@ class Server(object):
gf, mo, bn,
stat.S_IMODE(mo), 0, umask())
# mkdir
- def entry_pack_mkdir(gf, bn, st):
+ def entry_pack_mkdir(gf, bn, mo, uid, gid):
blen = len(bn)
- mo = st['mode']
return struct.pack(cls._fmt_mkdir(blen),
- st['uid'], st['gid'],
- gf, mo, bn,
+ uid, gid, gf, mo, bn,
stat.S_IMODE(mo), umask())
#symlink
def entry_pack_symlink(gf, bn, lnk, st):
@@ -485,7 +537,7 @@ class Server(object):
# to be purged is the GFID gotten from the changelog.
# (a stat(changelog_gfid) would also be valid here)
# The race here is between the GFID check and the purge.
- disk_gfid = cls.gfid(entry)
+ disk_gfid = cls.gfid_mnt(entry)
if isinstance(disk_gfid, int):
return
if not gfid == disk_gfid:
@@ -510,15 +562,15 @@ class Server(object):
else:
break
elif op in ['CREATE', 'MKNOD']:
- blob = entry_pack_reg(gfid, bname, e['stat'])
+ blob = entry_pack_reg(gfid, bname, e['mode'], e['uid'], e['uid'])
elif op == 'MKDIR':
- blob = entry_pack_mkdir(gfid, bname, e['stat'])
+ blob = entry_pack_mkdir(gfid, bname, e['mode'], e['uid'], e['uid'])
elif op == 'LINK':
slink = os.path.join(pfx, gfid)
st = lstat(slink)
if isinstance(st, int):
(pg, bname) = entry2pb(entry)
- blob = entry_pack_reg(gfid, bname, e['stat'])
+ blob = entry_pack_reg_stat(gfid, bname, e['stat'])
else:
errno_wrap(os.link, [slink, entry], [ENOENT, EEXIST])
elif op == 'SYMLINK':
@@ -528,13 +580,24 @@ class Server(object):
st = lstat(entry)
if isinstance(st, int):
(pg, bname) = entry2pb(en)
- blob = entry_pack_reg(gfid, bname, e['stat'])
+ blob = entry_pack_reg_stat(gfid, bname, e['stat'])
else:
errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST])
if blob:
errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [EEXIST], [ENOENT, ESTALE, EINVAL])
@classmethod
+ def meta_ops(cls, meta_entries):
+ logging.debug('Meta-entries: %s' % repr(meta_entries))
+ for e in meta_entries:
+ mode = e['stat']['mode']
+ uid = e['stat']['uid']
+ gid = e['stat']['gid']
+ go = e['go']
+ errno_wrap(os.chmod, [go, mode], [ENOENT], [ESTALE, EINVAL])
+ errno_wrap(os.chown, [go, uid, gid], [ENOENT], [ESTALE, EINVAL])
+
+ @classmethod
def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries = 0):
Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries)
@@ -699,6 +762,29 @@ class SlaveRemote(object):
return po
+ def tarssh(self, files, slaveurl):
+ """invoke tar+ssh
+ -z (compress) can be use if needed, but ommitting it now
+ as it results in wierd error (tar+ssh errors out (errcode: 2)
+ """
+ if not files:
+ raise GsyncdError("no files to sync")
+ logging.debug("files: " + ", ".join(files))
+ (host, rdir) = slaveurl.split(':')
+ tar_cmd = ["tar", "-cf", "-", "--files-from", "-"]
+ ssh_cmd = gconf.ssh_command_tar.split() + [host, "tar", "--overwrite", "-xf", "-", "-C", rdir]
+ p0 = Popen(tar_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
+ p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE)
+ for f in files:
+ p0.stdin.write(f)
+ p0.stdin.write('\n')
+ p0.stdin.close()
+ p0.wait()
+
+ p1.wait()
+ p1.terminate_geterr(fail_on_err = False)
+
+ return p1
class AbstractUrl(object):
"""abstract base class for url scheme classes"""
@@ -1041,12 +1127,20 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
except ValueError:
pass
return e
+ @classmethod
+ def lstat(cls, e):
+ """ path based backend stat """
+ return super(brickserver, cls).lstat(e)
+ @classmethod
+ def gfid(cls, e):
+ """ path based backend gfid fetch """
+ return super(brickserver, cls).gfid(e)
if gconf.slave_id:
# define {,set_}xtime in slave, thus preempting
# the call to remote, so that it takes data from
# the local brick
- slave.server.xtime = types.MethodType(lambda _self, path, uuid: brickserver.xtime(path, uuid + '.' + gconf.slave_id), slave.server)
- slave.server.set_xtime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_xtime(path, uuid + '.' + gconf.slave_id, mark), slave.server)
+ slave.server.stime = types.MethodType(lambda _self, path, uuid: brickserver.stime(path, uuid + '.' + gconf.slave_id), slave.server)
+ slave.server.set_stime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_stime(path, uuid + '.' + gconf.slave_id, mark), slave.server)
(g1, g2) = self.gmaster_instantiate_tuple(slave)
g1.master.server = brickserver
g2.master.server = brickserver
@@ -1067,6 +1161,9 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def rsync(self, files):
return sup(self, files, self.slavedir)
+ def tarssh(self, files):
+ return sup(self, files, self.slavedir)
+
class SSH(AbstractUrl, SlaveRemote):
"""scheme class for ssh:// urls
@@ -1170,3 +1267,6 @@ class SSH(AbstractUrl, SlaveRemote):
def rsync(self, files):
return sup(self, files, '-e', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args),
*(gconf.rsync_ssh_options.split() + [self.slaveurl]))
+
+ def tarssh(self, files):
+ return sup(self, files, self.slaveurl)