diff options
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 296 |
1 file changed, 163 insertions, 133 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index df4006f971a..f12c7ceaa36 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -19,28 +19,31 @@ import struct import logging import tempfile import subprocess -from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES -from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM +from errno import (EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES, + EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM) import errno from rconf import rconf import gsyncdconfig as gconf +import libgfchangelog import repce from repce import RepceServer, RepceClient from master import gmaster_builder import syncdutils -from syncdutils import GsyncdError, select, privileged, funcode -from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat -from syncdutils import NoStimeAvailable, PartialHistoryAvailable -from syncdutils import ChangelogException, ChangelogHistoryNotAvailable -from syncdutils import get_changelog_log_level, get_rsync_version -from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION -from syncdutils import GX_GFID_CANONICAL_LEN +from syncdutils import (GsyncdError, select, privileged, funcode, + entry2pb, gauxpfx, errno_wrap, lstat, + NoStimeAvailable, PartialHistoryAvailable, + ChangelogException, ChangelogHistoryNotAvailable, + get_changelog_log_level, get_rsync_version, + GX_GFID_CANONICAL_LEN, + gf_mount_ready, lf, Popen, sup, + Xattr, matching_disk_gfid, get_gfid_from_mnt, + unshare_propagation_supported, get_slv_dir_path) from gsyncdstatus import GeorepStatus -from syncdutils import lf, Popen, sup -from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt -from syncdutils import unshare_propagation_supported, get_slv_dir_path +from py2py3 import (pipe, str_to_bytearray, entry_pack_reg, + entry_pack_reg_stat, entry_pack_mkdir, + entry_pack_symlink) ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP') @@ 
-145,6 +148,7 @@ class Server(object): if buf == ENOENT: return buf else: + buf = str_to_bytearray(buf) m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join( ['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)])) return '-'.join(m.groups()) @@ -235,6 +239,7 @@ class Server(object): val = Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8) + val = str_to_bytearray(val) return struct.unpack('!II', val) except OSError: ex = sys.exc_info()[1] @@ -257,6 +262,7 @@ class Server(object): val = Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8) + val = str_to_bytearray(val) return struct.unpack('!II', val) except OSError: ex = sys.exc_info()[1] @@ -279,6 +285,7 @@ class Server(object): val = Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8) + val = str_to_bytearray(val) return struct.unpack('!II', val) except OSError: ex = sys.exc_info()[1] @@ -302,6 +309,7 @@ class Server(object): '.'.join([cls.GX_NSPACE, uuid, 'entry_stime']), 8) + val = str_to_bytearray(val) return struct.unpack('!II', val) except OSError: ex = sys.exc_info()[1] @@ -370,38 +378,9 @@ class Server(object): def entry_ops(cls, entries): pfx = gauxpfx() logging.debug('entries: %s' % repr(entries)) - # regular file - - def entry_pack_reg(gf, bn, mo, uid, gid): - blen = len(bn) - return struct.pack(cls._fmt_mknod(blen), - uid, gid, gf, mo, bn, - stat.S_IMODE(mo), 0, umask()) - - def entry_pack_reg_stat(gf, bn, st): - blen = len(bn) - mo = st['mode'] - return struct.pack(cls._fmt_mknod(blen), - st['uid'], st['gid'], - gf, mo, bn, - stat.S_IMODE(mo), 0, umask()) - # mkdir - - def entry_pack_mkdir(gf, bn, mo, uid, gid): - blen = len(bn) - return struct.pack(cls._fmt_mkdir(blen), - uid, gid, gf, mo, bn, - stat.S_IMODE(mo), umask()) - # symlink - - def entry_pack_symlink(gf, bn, lnk, st): - blen = len(bn) - llen = len(lnk) - return struct.pack(cls._fmt_symlink(blen, llen), - st['uid'], st['gid'], - gf, st['mode'], bn, lnk) - - def entry_purge(op, entry, gfid, 
e): + dist_count = rconf.args.master_dist_count + + def entry_purge(op, entry, gfid, e, uid, gid): # This is an extremely racy code and needs to be fixed ASAP. # The GFID check here is to be sure that the pargfid/bname # to be purged is the GFID gotten from the changelog. @@ -415,7 +394,7 @@ class Server(object): return if not matching_disk_gfid(gfid, entry): - collect_failure(e, EEXIST) + collect_failure(e, EEXIST, uid, gid) return if op == 'UNLINK': @@ -431,7 +410,7 @@ class Server(object): if er == ENOTEMPTY: return er - def collect_failure(e, cmd_ret, dst=False): + def collect_failure(e, cmd_ret, uid, gid, dst=False): slv_entry_info = {} slv_entry_info['gfid_mismatch'] = False slv_entry_info['name_mismatch'] = False @@ -444,13 +423,18 @@ class Server(object): if cmd_ret is None: return False - if cmd_ret == EEXIST: + if e.get("stat", {}): + # Copy actual UID/GID value back to entry stat + e['stat']['uid'] = uid + e['stat']['gid'] = gid + + if cmd_ret in [EEXIST, ESTALE]: if dst: en = e['entry1'] else: en = e['entry'] disk_gfid = get_gfid_from_mnt(en) - if isinstance(disk_gfid, basestring) and \ + if isinstance(disk_gfid, str) and \ e['gfid'] != disk_gfid: slv_entry_info['gfid_mismatch'] = True st = lstat(en) @@ -505,7 +489,7 @@ class Server(object): errno_wrap(os.rmdir, [path], [ENOENT, ESTALE], [EBUSY]) - def rename_with_disk_gfid_confirmation(gfid, entry, en): + def rename_with_disk_gfid_confirmation(gfid, entry, en, uid, gid): if not matching_disk_gfid(gfid, entry): logging.error(lf("RENAME ignored: source entry does not match " "with on-disk gfid", @@ -513,13 +497,13 @@ class Server(object): gfid=gfid, disk_gfid=get_gfid_from_mnt(entry), target=en)) - collect_failure(e, EEXIST) + collect_failure(e, EEXIST, uid, gid) return cmd_ret = errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST], [ESTALE, EBUSY]) - collect_failure(e, cmd_ret) + collect_failure(e, cmd_ret, uid, gid) for e in entries: blob = None @@ -528,6 +512,12 @@ class Server(object): entry = 
e['entry'] uid = 0 gid = 0 + + # Skip entry processing if it's marked true during gfid + # conflict resolution + if e['skip_entry']: + continue + if e.get("stat", {}): # Copy UID/GID value and then reset to zero. Copied UID/GID # will be used to run chown once entry is created. @@ -540,7 +530,7 @@ class Server(object): if op in ['RMDIR', 'UNLINK']: # Try once, if rmdir failed with ENOTEMPTY # then delete recursively. - er = entry_purge(op, entry, gfid, e) + er = entry_purge(op, entry, gfid, e, uid, gid) if isinstance(er, int): if er == ENOTEMPTY and op == 'RMDIR': # Retry if ENOTEMPTY, ESTALE @@ -568,8 +558,8 @@ class Server(object): st = lstat(slink) # don't create multiple entries with same gfid if isinstance(st, int): - blob = entry_pack_reg( - gfid, bname, e['mode'], e['uid'], e['gid']) + blob = entry_pack_reg(cls, gfid, bname, + e['mode'], e['uid'], e['gid']) # Self healed hardlinks are recorded as MKNOD. # So if the gfid already exists, it should be # processed as hard link not mknod. 
@@ -577,15 +567,15 @@ class Server(object): cmd_ret = errno_wrap(os.link, [slink, entry], [ENOENT, EEXIST], [ESTALE]) - collect_failure(e, cmd_ret) + collect_failure(e, cmd_ret, uid, gid) elif op == 'MKDIR': en = e['entry'] slink = os.path.join(pfx, gfid) st = lstat(slink) # don't create multiple entries with same gfid if isinstance(st, int): - blob = entry_pack_mkdir( - gfid, bname, e['mode'], e['uid'], e['gid']) + blob = entry_pack_mkdir(cls, gfid, bname, + e['mode'], e['uid'], e['gid']) elif (isinstance(lstat(en), int) or not matching_disk_gfid(gfid, en)): # If gfid of a directory exists on slave but path based @@ -597,6 +587,8 @@ class Server(object): logging.info(lf("Special case: rename on mkdir", gfid=gfid, entry=repr(entry))) src_entry = get_slv_dir_path(slv_host, slv_volume, gfid) + if src_entry is None: + collect_failure(e, ENOENT, uid, gid) if src_entry is not None and src_entry != entry: slv_entry_info = {} slv_entry_info['gfid_mismatch'] = False @@ -613,23 +605,23 @@ class Server(object): if isinstance(st, int): (pg, bname) = entry2pb(entry) if stat.S_ISREG(e['stat']['mode']): - blob = entry_pack_reg_stat(gfid, bname, e['stat']) + blob = entry_pack_reg_stat(cls, gfid, bname, e['stat']) elif stat.S_ISLNK(e['stat']['mode']): - blob = entry_pack_symlink(gfid, bname, e['link'], + blob = entry_pack_symlink(cls, gfid, bname, e['link'], e['stat']) else: cmd_ret = errno_wrap(os.link, [slink, entry], [ENOENT, EEXIST], [ESTALE]) - collect_failure(e, cmd_ret) + collect_failure(e, cmd_ret, uid, gid) elif op == 'SYMLINK': en = e['entry'] st = lstat(entry) if isinstance(st, int): - blob = entry_pack_symlink(gfid, bname, e['link'], + blob = entry_pack_symlink(cls, gfid, bname, e['link'], e['stat']) elif not matching_disk_gfid(gfid, en): - collect_failure(e, EEXIST) + collect_failure(e, EEXIST, uid, gid) elif op == 'RENAME': en = e['entry1'] # The matching disk gfid check validates two things @@ -641,32 +633,38 @@ class Server(object): # exist with different gfid. 
if not matching_disk_gfid(gfid, entry): if e['stat'] and not stat.S_ISDIR(e['stat']['mode']): - if stat.S_ISLNK(e['stat']['mode']) and \ - e['link'] is not None: - st1 = lstat(en) - if isinstance(st1, int): - (pg, bname) = entry2pb(en) - blob = entry_pack_symlink(gfid, bname, - e['link'], e['stat']) - elif not matching_disk_gfid(gfid, en): - collect_failure(e, EEXIST, True) + if stat.S_ISLNK(e['stat']['mode']): + # src is not present, so don't sync symlink as + # we don't know target. It's ok to ignore. If + # it's unliked, it's fine. If it's renamed to + # something else, it will be synced then. + if e['link'] is not None: + st1 = lstat(en) + if isinstance(st1, int): + (pg, bname) = entry2pb(en) + blob = entry_pack_symlink(cls, gfid, bname, + e['link'], + e['stat']) + elif not matching_disk_gfid(gfid, en): + collect_failure(e, EEXIST, uid, gid, True) else: slink = os.path.join(pfx, gfid) st = lstat(slink) # don't create multiple entries with same gfid if isinstance(st, int): (pg, bname) = entry2pb(en) - blob = entry_pack_reg_stat(gfid, bname, + blob = entry_pack_reg_stat(cls, gfid, bname, e['stat']) else: cmd_ret = errno_wrap(os.link, [slink, en], [ENOENT, EEXIST], [ESTALE]) - collect_failure(e, cmd_ret) + collect_failure(e, cmd_ret, uid, gid) else: st = lstat(entry) st1 = lstat(en) if isinstance(st1, int): - rename_with_disk_gfid_confirmation(gfid, entry, en) + rename_with_disk_gfid_confirmation(gfid, entry, en, + uid, gid) else: if st.st_ino == st1.st_ino: # we have a hard link, we can now unlink source @@ -690,16 +688,23 @@ class Server(object): raise else: raise - elif not matching_disk_gfid(gfid, en): - collect_failure(e, EEXIST, True) + elif not matching_disk_gfid(gfid, en) and dist_count > 1: + collect_failure(e, EEXIST, uid, gid, True) else: - rename_with_disk_gfid_confirmation(gfid, entry, en) + # We are here which means matching_disk_gfid for + # both source and destination has returned false + # and distribution count for master vol is greater + # then 
one. Which basically says both the source and + # destination exist and not hardlinks. + # So we are safe to go ahead with rename here. + rename_with_disk_gfid_confirmation(gfid, entry, en, + uid, gid) if blob: cmd_ret = errno_wrap(Xattr.lsetxattr, [pg, 'glusterfs.gfid.newfile', blob], - [EEXIST, ENOENT], + [EEXIST, ENOENT, ESTALE], [ESTALE, EINVAL, EBUSY]) - collect_failure(e, cmd_ret) + collect_failure(e, cmd_ret, uid, gid) # If UID/GID is different than zero that means we are trying # create Entry with different UID/GID. Create Entry with @@ -708,7 +713,7 @@ class Server(object): path = os.path.join(pfx, gfid) cmd_ret = errno_wrap(os.lchown, [path, uid, gid], [ENOENT], [ESTALE, EINVAL]) - collect_failure(e, cmd_ret) + collect_failure(e, cmd_ret, uid, gid) return failures @@ -735,10 +740,8 @@ class Server(object): # 'lchown' 'lchmod' 'utime with no-deference' blindly. # But since 'lchmod' and 'utime with no de-reference' is # not supported in python3, we have to rely on 'chmod' - # and 'utime with de-reference'. But 'chmod' - # de-reference the symlink and gets ENOENT, EACCES, - # EPERM errors, hence ignoring those errors if it's on - # symlink file. + # and 'utime with de-reference'. Hence avoiding 'chmod' + # and 'utime' if it's symlink file. 
is_symlink = False cmd_ret = errno_wrap(os.lchown, [go, uid, gid], [ENOENT], @@ -746,19 +749,17 @@ class Server(object): if isinstance(cmd_ret, int): continue - cmd_ret = errno_wrap(os.chmod, [go, mode], - [ENOENT, EACCES, EPERM], [ESTALE, EINVAL]) - if isinstance(cmd_ret, int): - is_symlink = os.path.islink(go) - if not is_symlink: + is_symlink = os.path.islink(go) + + if not is_symlink: + cmd_ret = errno_wrap(os.chmod, [go, mode], + [ENOENT, EACCES, EPERM], [ESTALE, EINVAL]) + if isinstance(cmd_ret, int): failures.append((e, cmd_ret, "chmod")) - cmd_ret = errno_wrap(os.utime, [go, (atime, mtime)], - [ENOENT, EACCES, EPERM], [ESTALE, EINVAL]) - if isinstance(cmd_ret, int): - if not is_symlink: - is_symlink = os.path.islink(go) - if not is_symlink: + cmd_ret = errno_wrap(os.utime, [go, (atime, mtime)], + [ENOENT, EACCES, EPERM], [ESTALE, EINVAL]) + if isinstance(cmd_ret, int): failures.append((e, cmd_ret, "utime")) return failures @@ -834,7 +835,8 @@ class Mounter(object): def umount_l(self, d): """perform lazy umount""" - po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE) + po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE, + universal_newlines=True) po.wait() return po @@ -858,7 +860,7 @@ class Mounter(object): change into the mount, and lazy unmount the filesystem. 
""" - mpi, mpo = os.pipe() + mpi, mpo = pipe() mh = Popen.fork() if mh: # Parent @@ -869,7 +871,9 @@ class Mounter(object): if self.mntpt: # mntpt is determined pre-mount d = self.mntpt - os.write(mpo, d + '\0') + mnt_msg = d + '\0' + encoded_msg = mnt_msg.encode() + os.write(mpo, encoded_msg) po = Popen(margv, **self.mountkw) self.handle_mounter(po) po.terminate_geterr() @@ -877,8 +881,11 @@ class Mounter(object): if not d: # mntpt is determined during mount d = self.mntpt - os.write(mpo, d + '\0') - os.write(mpo, 'M') + mnt_msg = d + '\0' + encoded_msg = mnt_msg.encode() + os.write(mpo, encoded_msg) + encoded_msg = 'M'.encode() + os.write(mpo, encoded_msg) t = syncdutils.Thread(target=lambda: os.chdir(d)) t.start() tlim = rconf.starttime + gconf.get("connection-timeout") @@ -907,6 +914,7 @@ class Mounter(object): mntdata = '' while True: c = os.read(mpi, 1) + c = c.decode() if not c: break mntdata += c @@ -943,6 +951,16 @@ class Mounter(object): logging.exception('mount cleanup failure:') rv = 200 os._exit(rv) + + #Polling the dht.subvol.status value. 
+ RETRIES = 10 + while not gf_mount_ready(): + if RETRIES < 0: + logging.error('Subvols are not up') + break + RETRIES -= 1 + time.sleep(0.2) + logging.debug('auxiliary glusterfs mount prepared') @@ -950,7 +968,7 @@ class DirectMounter(Mounter): """mounter backend which calls mount(8), umount(8) directly""" - mountkw = {'stderr': subprocess.PIPE} + mountkw = {'stderr': subprocess.PIPE, 'universal_newlines': True} glusterprog = 'glusterfs' @staticmethod @@ -973,7 +991,8 @@ class MountbrokerMounter(Mounter): """mounter backend using the mountbroker gluster service""" - mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE} + mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE, + 'universal_newlines': True} glusterprog = 'gluster' @classmethod @@ -1014,6 +1033,7 @@ class GLUSTERServer(Server): """generic volume mark fetching/parsing backed""" fmt_string = cls.NTV_FMTSTR + extra_fields buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) + buf = str_to_bytearray(buf) vm = struct.unpack(fmt_string, buf) m = re.match( '(.{8})(.{4})(.{4})(.{4})(.{12})', @@ -1236,9 +1256,6 @@ class GLUSTER(object): # register the crawlers and start crawling # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default) # g3 ==> changelog History - (inf, ouf, ra, wa) = rconf.args.rpc_fd.split(',') - changelog_agent = RepceClient(int(inf), int(ouf)) - status = GeorepStatus(gconf.get("state-file"), rconf.args.local_node, rconf.args.local_path, @@ -1246,12 +1263,6 @@ class GLUSTER(object): rconf.args.master, rconf.args.slave) status.reset_on_worker_start() - rv = changelog_agent.version() - if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION: - raise GsyncdError( - "RePCe major version mismatch(changelog agent): " - "local %s, remote %s" % - (CHANGELOG_AGENT_CLIENT_VERSION, rv)) try: workdir = g2.setup_working_dir() @@ -1262,17 +1273,16 @@ class GLUSTER(object): # register with the changelog library # 9 == log level (DEBUG) # 5 == connection retries - 
changelog_agent.init() - changelog_agent.register(rconf.args.local_path, - workdir, - gconf.get("changelog-log-file"), - get_changelog_log_level( - gconf.get("changelog-log-level")), - g2.CHANGELOG_CONN_RETRIES) + libgfchangelog.register(rconf.args.local_path, + workdir, + gconf.get("changelog-log-file"), + get_changelog_log_level( + gconf.get("changelog-log-level")), + g2.CHANGELOG_CONN_RETRIES) register_time = int(time.time()) - g2.register(register_time, changelog_agent, status) - g3.register(register_time, changelog_agent, status) + g2.register(register_time, status) + g3.register(register_time, status) except ChangelogException as e: logging.error(lf("Changelog register failed", error=e)) sys.exit(1) @@ -1407,7 +1417,9 @@ class SSH(object): '--slave-gluster-log-level', gconf.get("slave-gluster-log-level"), '--slave-gluster-command-dir', - gconf.get("slave-gluster-command-dir")] + gconf.get("slave-gluster-command-dir"), + '--master-dist-count', + str(gconf.get("master-distribution-count"))] if gconf.get("slave-access-mount"): args_to_slave.append('--slave-access-mount') @@ -1472,12 +1484,13 @@ class SSH(object): if log_rsync_performance: # use stdout=PIPE only when log_rsync_performance enabled - # Else rsync will write to stdout and nobody is their + # Else rsync will write to stdout and nobody is there # to consume. If PIPE is full rsync hangs. 
po = Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + stderr=subprocess.PIPE, universal_newlines=True) else: - po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE) + po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True) for f in files: po.stdin.write(f) @@ -1509,7 +1522,7 @@ class SSH(object): return po - def tarssh(self, files, slaveurl, log_err=False): + def tarssh(self, files, log_err=False): """invoke tar+ssh -z (compress) can be use if needed, but omitting it now as it results in weird error (tar+ssh errors out (errcode: 2) @@ -1517,32 +1530,49 @@ class SSH(object): if not files: raise GsyncdError("no files to sync") logging.debug("files: " + ", ".join(files)) - (host, rdir) = slaveurl.split(':') + (host, rdir) = self.slaveurl.split(':') + tar_cmd = ["tar"] + \ ["--sparse", "-cf", "-", "--files-from", "-"] - ssh_cmd = gconf.get("ssh-command-tar").split() + \ + ssh_cmd = gconf.get("ssh-command").split() + \ gconf.get("ssh-options-tar").split() + \ ["-p", str(gconf.get("ssh-port"))] + \ [host, "tar"] + \ ["--overwrite", "-xf", "-", "-C", rdir] p0 = Popen(tar_cmd, stdout=subprocess.PIPE, - stdin=subprocess.PIPE, stderr=subprocess.PIPE) - p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE) + stdin=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True) + p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE, + universal_newlines=True) for f in files: p0.stdin.write(f) p0.stdin.write('\n') p0.stdin.close() p0.stdout.close() # Allow p0 to receive a SIGPIPE if p1 exits. 
- # wait for tar to terminate, collecting any errors, further - # waiting for transfer to complete - _, stderr1 = p1.communicate() # stdin and stdout of p0 is already closed, Reset to None and # wait for child process to complete p0.stdin = None p0.stdout = None - p0.communicate() + + def wait_for_tar(p0): + _, stderr = p0.communicate() + if log_err: + for errline in stderr.strip().split("\n")[:-1]: + if "No such file or directory" not in errline: + logging.error(lf("SYNC Error", + sync_engine="Tarssh", + error=errline)) + + t = syncdutils.Thread(target=wait_for_tar, args=(p0, )) + # wait for tar to terminate, collecting any errors, further + # waiting for transfer to complete + t.start() + + # wait for ssh process + _, stderr1 = p1.communicate() + t.join() if log_err: for errline in stderr1.strip().split("\n")[:-1]: |
