Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r--  geo-replication/syncdaemon/resource.py  296
1 file changed, 163 insertions(+), 133 deletions(-)
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index df4006f971a..f12c7ceaa36 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -19,28 +19,31 @@ import struct
import logging
import tempfile
import subprocess
-from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES
-from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM
+from errno import (EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES,
+ EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM)
import errno
from rconf import rconf
import gsyncdconfig as gconf
+import libgfchangelog
import repce
from repce import RepceServer, RepceClient
from master import gmaster_builder
import syncdutils
-from syncdutils import GsyncdError, select, privileged, funcode
-from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
-from syncdutils import NoStimeAvailable, PartialHistoryAvailable
-from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
-from syncdutils import get_changelog_log_level, get_rsync_version
-from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
-from syncdutils import GX_GFID_CANONICAL_LEN
+from syncdutils import (GsyncdError, select, privileged, funcode,
+ entry2pb, gauxpfx, errno_wrap, lstat,
+ NoStimeAvailable, PartialHistoryAvailable,
+ ChangelogException, ChangelogHistoryNotAvailable,
+ get_changelog_log_level, get_rsync_version,
+ GX_GFID_CANONICAL_LEN,
+ gf_mount_ready, lf, Popen, sup,
+ Xattr, matching_disk_gfid, get_gfid_from_mnt,
+ unshare_propagation_supported, get_slv_dir_path)
from gsyncdstatus import GeorepStatus
-from syncdutils import lf, Popen, sup
-from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
-from syncdutils import unshare_propagation_supported, get_slv_dir_path
+from py2py3 import (pipe, str_to_bytearray, entry_pack_reg,
+ entry_pack_reg_stat, entry_pack_mkdir,
+ entry_pack_symlink)
ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
@@ -145,6 +148,7 @@ class Server(object):
if buf == ENOENT:
return buf
else:
+ buf = str_to_bytearray(buf)
m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(
['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)]))
return '-'.join(m.groups())
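The str_to_bytearray() shim added by this hunk is what keeps struct.unpack() working on both interpreters: py2 xattr reads hand back str, py3 hands back bytes. A minimal sketch of the conversion being performed here; the real helper lives in py2py3.py, and '!16B' is an assumption for cls.GFID_FMTSTR:

    import re
    import struct
    import sys

    def str_to_bytearray(data):
        # py2: xattr values arrive as str; py3: already bytes.
        if sys.version_info[0] == 2 and isinstance(data, str):
            return bytearray(data)
        return data

    def gfid_to_canonical(buf):
        # 16 raw GFID bytes -> 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'
        buf = str_to_bytearray(buf)
        hexed = ''.join('%02x' % x for x in struct.unpack('!16B', buf))
        return '-'.join(re.match(
            '(.{8})(.{4})(.{4})(.{4})(.{12})', hexed).groups())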
@@ -235,6 +239,7 @@ class Server(object):
val = Xattr.lgetxattr(path,
'.'.join([cls.GX_NSPACE, uuid, 'xtime']),
8)
+ val = str_to_bytearray(val)
return struct.unpack('!II', val)
except OSError:
ex = sys.exc_info()[1]
@@ -257,6 +262,7 @@ class Server(object):
val = Xattr.lgetxattr(path,
'.'.join([cls.GX_NSPACE, uuid, 'stime']),
8)
+ val = str_to_bytearray(val)
return struct.unpack('!II', val)
except OSError:
ex = sys.exc_info()[1]
@@ -279,6 +285,7 @@ class Server(object):
val = Xattr.lgetxattr(path,
'.'.join([cls.GX_NSPACE, uuid, 'stime']),
8)
+ val = str_to_bytearray(val)
return struct.unpack('!II', val)
except OSError:
ex = sys.exc_info()[1]
@@ -302,6 +309,7 @@ class Server(object):
'.'.join([cls.GX_NSPACE, uuid,
'entry_stime']),
8)
+ val = str_to_bytearray(val)
return struct.unpack('!II', val)
except OSError:
ex = sys.exc_info()[1]
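The four str_to_bytearray() hunks above all decode the same on-disk layout, so one worked example covers them: the xtime/stime/entry_stime xattrs are 8 bytes holding seconds and nanoseconds as unsigned 32-bit big-endian integers, hence the reads of length 8 and the '!II' unpack. A sketch reusing the shim above:

    import struct

    def decode_stime(val):
        val = str_to_bytearray(val)   # shim sketched earlier
        sec, nsec = struct.unpack('!II', val)
        return sec, nsec

    # decode_stime(b'\x5d\x00\x00\x00\x00\x00\x00\x2a') -> (1560281088, 42)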
@@ -370,38 +378,9 @@ class Server(object):
def entry_ops(cls, entries):
pfx = gauxpfx()
logging.debug('entries: %s' % repr(entries))
- # regular file
-
- def entry_pack_reg(gf, bn, mo, uid, gid):
- blen = len(bn)
- return struct.pack(cls._fmt_mknod(blen),
- uid, gid, gf, mo, bn,
- stat.S_IMODE(mo), 0, umask())
-
- def entry_pack_reg_stat(gf, bn, st):
- blen = len(bn)
- mo = st['mode']
- return struct.pack(cls._fmt_mknod(blen),
- st['uid'], st['gid'],
- gf, mo, bn,
- stat.S_IMODE(mo), 0, umask())
- # mkdir
-
- def entry_pack_mkdir(gf, bn, mo, uid, gid):
- blen = len(bn)
- return struct.pack(cls._fmt_mkdir(blen),
- uid, gid, gf, mo, bn,
- stat.S_IMODE(mo), umask())
- # symlink
-
- def entry_pack_symlink(gf, bn, lnk, st):
- blen = len(bn)
- llen = len(lnk)
- return struct.pack(cls._fmt_symlink(blen, llen),
- st['uid'], st['gid'],
- gf, st['mode'], bn, lnk)
-
- def entry_purge(op, entry, gfid, e):
+ dist_count = rconf.args.master_dist_count
+
+ def entry_purge(op, entry, gfid, e, uid, gid):
# This is extremely racy code and needs to be fixed ASAP.
# The GFID check here is to be sure that the pargfid/bname
# to be purged is the GFID gotten from the changelog.
@@ -415,7 +394,7 @@ class Server(object):
return
if not matching_disk_gfid(gfid, entry):
- collect_failure(e, EEXIST)
+ collect_failure(e, EEXIST, uid, gid)
return
if op == 'UNLINK':
@@ -431,7 +410,7 @@ class Server(object):
if er == ENOTEMPTY:
return er
- def collect_failure(e, cmd_ret, dst=False):
+ def collect_failure(e, cmd_ret, uid, gid, dst=False):
slv_entry_info = {}
slv_entry_info['gfid_mismatch'] = False
slv_entry_info['name_mismatch'] = False
@@ -444,13 +423,18 @@ class Server(object):
if cmd_ret is None:
return False
- if cmd_ret == EEXIST:
+ if e.get("stat", {}):
+ # Copy actual UID/GID value back to entry stat
+ e['stat']['uid'] = uid
+ e['stat']['gid'] = gid
+
+ if cmd_ret in [EEXIST, ESTALE]:
if dst:
en = e['entry1']
else:
en = e['entry']
disk_gfid = get_gfid_from_mnt(en)
- if isinstance(disk_gfid, basestring) and \
+ if isinstance(disk_gfid, str) and \
e['gfid'] != disk_gfid:
slv_entry_info['gfid_mismatch'] = True
st = lstat(en)
@@ -505,7 +489,7 @@ class Server(object):
errno_wrap(os.rmdir, [path], [ENOENT, ESTALE], [EBUSY])
- def rename_with_disk_gfid_confirmation(gfid, entry, en):
+ def rename_with_disk_gfid_confirmation(gfid, entry, en, uid, gid):
if not matching_disk_gfid(gfid, entry):
logging.error(lf("RENAME ignored: source entry does not match "
"with on-disk gfid",
@@ -513,13 +497,13 @@ class Server(object):
gfid=gfid,
disk_gfid=get_gfid_from_mnt(entry),
target=en))
- collect_failure(e, EEXIST)
+ collect_failure(e, EEXIST, uid, gid)
return
cmd_ret = errno_wrap(os.rename,
[entry, en],
[ENOENT, EEXIST], [ESTALE, EBUSY])
- collect_failure(e, cmd_ret)
+ collect_failure(e, cmd_ret, uid, gid)
for e in entries:
blob = None
@@ -528,6 +512,12 @@ class Server(object):
entry = e['entry']
uid = 0
gid = 0
+
+ # Skip entry processing if skip_entry was marked True
+ # during gfid conflict resolution
+ if e['skip_entry']:
+ continue
+
if e.get("stat", {}):
# Copy UID/GID value and then reset to zero. Copied UID/GID
# will be used to run chown once entry is created.
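The copy-and-reset dance this hunk extends implements a simple pattern: create the entry root-owned (uid/gid forced to 0), then restore ownership with a single lchown once the entry exists, as the chown hunk further down does. In isolation, as an illustrative sketch:

    import os

    def create_as_root_then_chown(path, uid, gid, create):
        # create() runs while uid/gid are reset to 0; ownership is
        # applied afterwards in one lchown call.
        create(path)
        if uid != 0 or gid != 0:
            os.lchown(path, uid, gid)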
@@ -540,7 +530,7 @@ class Server(object):
if op in ['RMDIR', 'UNLINK']:
# Try once, if rmdir failed with ENOTEMPTY
# then delete recursively.
- er = entry_purge(op, entry, gfid, e)
+ er = entry_purge(op, entry, gfid, e, uid, gid)
if isinstance(er, int):
if er == ENOTEMPTY and op == 'RMDIR':
# Retry if ENOTEMPTY, ESTALE
@@ -568,8 +558,8 @@ class Server(object):
st = lstat(slink)
# don't create multiple entries with same gfid
if isinstance(st, int):
- blob = entry_pack_reg(
- gfid, bname, e['mode'], e['uid'], e['gid'])
+ blob = entry_pack_reg(cls, gfid, bname,
+ e['mode'], e['uid'], e['gid'])
# Self healed hardlinks are recorded as MKNOD.
# So if the gfid already exists, it should be
# processed as hard link not mknod.
@@ -577,15 +567,15 @@ class Server(object):
cmd_ret = errno_wrap(os.link,
[slink, entry],
[ENOENT, EEXIST], [ESTALE])
- collect_failure(e, cmd_ret)
+ collect_failure(e, cmd_ret, uid, gid)
elif op == 'MKDIR':
en = e['entry']
slink = os.path.join(pfx, gfid)
st = lstat(slink)
# don't create multiple entries with same gfid
if isinstance(st, int):
- blob = entry_pack_mkdir(
- gfid, bname, e['mode'], e['uid'], e['gid'])
+ blob = entry_pack_mkdir(cls, gfid, bname,
+ e['mode'], e['uid'], e['gid'])
elif (isinstance(lstat(en), int) or
not matching_disk_gfid(gfid, en)):
# If gfid of a directory exists on slave but path based
@@ -597,6 +587,8 @@ class Server(object):
logging.info(lf("Special case: rename on mkdir",
gfid=gfid, entry=repr(entry)))
src_entry = get_slv_dir_path(slv_host, slv_volume, gfid)
+ if src_entry is None:
+ collect_failure(e, ENOENT, uid, gid)
if src_entry is not None and src_entry != entry:
slv_entry_info = {}
slv_entry_info['gfid_mismatch'] = False
@@ -613,23 +605,23 @@ class Server(object):
if isinstance(st, int):
(pg, bname) = entry2pb(entry)
if stat.S_ISREG(e['stat']['mode']):
- blob = entry_pack_reg_stat(gfid, bname, e['stat'])
+ blob = entry_pack_reg_stat(cls, gfid, bname, e['stat'])
elif stat.S_ISLNK(e['stat']['mode']):
- blob = entry_pack_symlink(gfid, bname, e['link'],
+ blob = entry_pack_symlink(cls, gfid, bname, e['link'],
e['stat'])
else:
cmd_ret = errno_wrap(os.link,
[slink, entry],
[ENOENT, EEXIST], [ESTALE])
- collect_failure(e, cmd_ret)
+ collect_failure(e, cmd_ret, uid, gid)
elif op == 'SYMLINK':
en = e['entry']
st = lstat(entry)
if isinstance(st, int):
- blob = entry_pack_symlink(gfid, bname, e['link'],
+ blob = entry_pack_symlink(cls, gfid, bname, e['link'],
e['stat'])
elif not matching_disk_gfid(gfid, en):
- collect_failure(e, EEXIST)
+ collect_failure(e, EEXIST, uid, gid)
elif op == 'RENAME':
en = e['entry1']
# The matching disk gfid check validates two things
@@ -641,32 +633,38 @@ class Server(object):
# exist with different gfid.
if not matching_disk_gfid(gfid, entry):
if e['stat'] and not stat.S_ISDIR(e['stat']['mode']):
- if stat.S_ISLNK(e['stat']['mode']) and \
- e['link'] is not None:
- st1 = lstat(en)
- if isinstance(st1, int):
- (pg, bname) = entry2pb(en)
- blob = entry_pack_symlink(gfid, bname,
- e['link'], e['stat'])
- elif not matching_disk_gfid(gfid, en):
- collect_failure(e, EEXIST, True)
+ if stat.S_ISLNK(e['stat']['mode']):
+ # src is not present, so don't sync the symlink as
+ # we don't know its target. It's ok to ignore: if
+ # it's unlinked, it's fine; if it's renamed to
+ # something else, it will be synced then.
+ if e['link'] is not None:
+ st1 = lstat(en)
+ if isinstance(st1, int):
+ (pg, bname) = entry2pb(en)
+ blob = entry_pack_symlink(cls, gfid, bname,
+ e['link'],
+ e['stat'])
+ elif not matching_disk_gfid(gfid, en):
+ collect_failure(e, EEXIST, uid, gid, True)
else:
slink = os.path.join(pfx, gfid)
st = lstat(slink)
# don't create multiple entries with same gfid
if isinstance(st, int):
(pg, bname) = entry2pb(en)
- blob = entry_pack_reg_stat(gfid, bname,
+ blob = entry_pack_reg_stat(cls, gfid, bname,
e['stat'])
else:
cmd_ret = errno_wrap(os.link, [slink, en],
[ENOENT, EEXIST], [ESTALE])
- collect_failure(e, cmd_ret)
+ collect_failure(e, cmd_ret, uid, gid)
else:
st = lstat(entry)
st1 = lstat(en)
if isinstance(st1, int):
- rename_with_disk_gfid_confirmation(gfid, entry, en)
+ rename_with_disk_gfid_confirmation(gfid, entry, en,
+ uid, gid)
else:
if st.st_ino == st1.st_ino:
# we have a hard link, we can now unlink source
@@ -690,16 +688,23 @@ class Server(object):
raise
else:
raise
- elif not matching_disk_gfid(gfid, en):
- collect_failure(e, EEXIST, True)
+ elif not matching_disk_gfid(gfid, en) and dist_count > 1:
+ collect_failure(e, EEXIST, uid, gid, True)
else:
- rename_with_disk_gfid_confirmation(gfid, entry, en)
+ # We reach here only when matching_disk_gfid has
+ # returned False for both source and destination
+ # and the master volume's distribution count is
+ # greater than one: both entries exist and are
+ # not hardlinks, so it is safe to go ahead with
+ # the rename.
+ rename_with_disk_gfid_confirmation(gfid, entry, en,
+ uid, gid)
if blob:
cmd_ret = errno_wrap(Xattr.lsetxattr,
[pg, 'glusterfs.gfid.newfile', blob],
- [EEXIST, ENOENT],
+ [EEXIST, ENOENT, ESTALE],
[ESTALE, EINVAL, EBUSY])
- collect_failure(e, cmd_ret)
+ collect_failure(e, cmd_ret, uid, gid)
# If UID/GID is different from zero, that means we are trying to
# create the Entry with a different UID/GID. Create Entry with
@@ -708,7 +713,7 @@ class Server(object):
path = os.path.join(pfx, gfid)
cmd_ret = errno_wrap(os.lchown, [path, uid, gid], [ENOENT],
[ESTALE, EINVAL])
- collect_failure(e, cmd_ret)
+ collect_failure(e, cmd_ret, uid, gid)
return failures
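errno_wrap() is imported from syncdutils. Judging purely from its call sites in this file, the third argument lists errnos that are tolerated and handed back to the caller (hence the isinstance(cmd_ret, int) checks everywhere), while the fourth lists errnos that trigger a retry. A hedged sketch of that contract, not the authoritative implementation:

    import time

    def errno_wrap_sketch(call, arg=[], tolerated=[], retried=[], retries=5):
        while True:
            try:
                return call(*arg)
            except OSError as ex:
                if ex.errno in tolerated:
                    return ex.errno      # caller checks isinstance(ret, int)
                if ex.errno in retried and retries > 0:
                    retries -= 1
                    time.sleep(0.25)     # hypothetical back-off
                    continue
                raise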
@@ -735,10 +740,8 @@ class Server(object):
# 'lchown' 'lchmod' 'utime with no-dereference' blindly.
# But since 'lchmod' and 'utime with no de-reference' are
# not supported in python3, we have to rely on 'chmod'
- # and 'utime with de-reference'. But 'chmod'
- # de-reference the symlink and gets ENOENT, EACCES,
- # EPERM errors, hence ignoring those errors if it's on
- # symlink file.
+ # and 'utime with de-reference'. Hence avoiding 'chmod'
+ # and 'utime' if it's a symlink file.
is_symlink = False
cmd_ret = errno_wrap(os.lchown, [go, uid, gid], [ENOENT],
@@ -746,19 +749,17 @@ class Server(object):
if isinstance(cmd_ret, int):
continue
- cmd_ret = errno_wrap(os.chmod, [go, mode],
- [ENOENT, EACCES, EPERM], [ESTALE, EINVAL])
- if isinstance(cmd_ret, int):
- is_symlink = os.path.islink(go)
- if not is_symlink:
+ is_symlink = os.path.islink(go)
+
+ if not is_symlink:
+ cmd_ret = errno_wrap(os.chmod, [go, mode],
+ [ENOENT, EACCES, EPERM], [ESTALE, EINVAL])
+ if isinstance(cmd_ret, int):
failures.append((e, cmd_ret, "chmod"))
- cmd_ret = errno_wrap(os.utime, [go, (atime, mtime)],
- [ENOENT, EACCES, EPERM], [ESTALE, EINVAL])
- if isinstance(cmd_ret, int):
- if not is_symlink:
- is_symlink = os.path.islink(go)
- if not is_symlink:
+ cmd_ret = errno_wrap(os.utime, [go, (atime, mtime)],
+ [ENOENT, EACCES, EPERM], [ESTALE, EINVAL])
+ if isinstance(cmd_ret, int):
failures.append((e, cmd_ret, "utime"))
return failures
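The reordering above swaps 'try chmod, check islink on failure' for 'check islink first'. The constraint behind it: Python on Linux has no os.lchmod(), and os.chmod()/os.utime() dereference symlinks, so lchown() is the only symlink-safe metadata call available. The resulting pattern, standalone and illustrative:

    import os

    def apply_meta(path, uid, gid, mode, atime, mtime):
        os.lchown(path, uid, gid)            # never follows the link
        if not os.path.islink(path):
            os.chmod(path, mode)             # would follow a symlink
            os.utime(path, (atime, mtime))   # likewise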
@@ -834,7 +835,8 @@ class Mounter(object):
def umount_l(self, d):
"""perform lazy umount"""
- po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE)
+ po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE,
+ universal_newlines=True)
po.wait()
return po
@@ -858,7 +860,7 @@ class Mounter(object):
change into the mount, and lazy unmount the
filesystem.
"""
- mpi, mpo = os.pipe()
+ mpi, mpo = pipe()
mh = Popen.fork()
if mh:
# Parent
@@ -869,7 +871,9 @@ class Mounter(object):
if self.mntpt:
# mntpt is determined pre-mount
d = self.mntpt
- os.write(mpo, d + '\0')
+ mnt_msg = d + '\0'
+ encoded_msg = mnt_msg.encode()
+ os.write(mpo, encoded_msg)
po = Popen(margv, **self.mountkw)
self.handle_mounter(po)
po.terminate_geterr()
@@ -877,8 +881,11 @@ class Mounter(object):
if not d:
# mntpt is determined during mount
d = self.mntpt
- os.write(mpo, d + '\0')
- os.write(mpo, 'M')
+ mnt_msg = d + '\0'
+ encoded_msg = mnt_msg.encode()
+ os.write(mpo, encoded_msg)
+ encoded_msg = 'M'.encode()
+ os.write(mpo, encoded_msg)
t = syncdutils.Thread(target=lambda: os.chdir(d))
t.start()
tlim = rconf.starttime + gconf.get("connection-timeout")
@@ -907,6 +914,7 @@ class Mounter(object):
mntdata = ''
while True:
c = os.read(mpi, 1)
+ c = c.decode()
if not c:
break
mntdata += c
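The encode()/decode() pairs in these Mounter hunks exist because os.write() and os.read() deal in bytes on py3 while the surrounding logic builds str. The parent/child handshake they implement, reduced to its essentials (illustrative, not the Mounter code itself):

    import os

    r, w = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: announce the mountpoint, then an 'M' marker for "mounted".
        os.write(w, ('/tmp/mnt' + '\0').encode())
        os.write(w, 'M'.encode())
        os._exit(0)

    # Parent: read one decoded byte at a time until the NUL terminator.
    mntdata = ''
    while True:
        c = os.read(r, 1).decode()
        if not c or c == '\0':
            break
        mntdata += c
    os.waitpid(pid, 0)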
@@ -943,6 +951,16 @@ class Mounter(object):
logging.exception('mount cleanup failure:')
rv = 200
os._exit(rv)
+
+ # Polling the dht.subvol.status value.
+ RETRIES = 10
+ while not gf_mount_ready():
+ if RETRIES < 0:
+ logging.error('Subvols are not up')
+ break
+ RETRIES -= 1
+ time.sleep(0.2)
+
logging.debug('auxiliary glusterfs mount prepared')
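gf_mount_ready() comes from syncdutils; the comment above suggests it reads the dht.subvol.status virtual xattr on the mount root. A hypothetical stand-in under that assumption; the real helper may well differ:

    from errno import ENODATA
    from syncdutils import Xattr   # same wrapper used elsewhere here

    def gf_mount_ready_sketch():
        try:
            # assumption: '1' means all subvols are up
            val = Xattr.lgetxattr('.', 'dht.subvol.status', 16)
            return str_to_bytearray(val).strip(b'\x00') == b'1'
        except OSError as ex:
            if ex.errno == ENODATA:
                return False
            raise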
@@ -950,7 +968,7 @@ class DirectMounter(Mounter):
"""mounter backend which calls mount(8), umount(8) directly"""
- mountkw = {'stderr': subprocess.PIPE}
+ mountkw = {'stderr': subprocess.PIPE, 'universal_newlines': True}
glusterprog = 'glusterfs'
@staticmethod
@@ -973,7 +991,8 @@ class MountbrokerMounter(Mounter):
"""mounter backend using the mountbroker gluster service"""
- mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE}
+ mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE,
+ 'universal_newlines': True}
glusterprog = 'gluster'
@classmethod
@@ -1014,6 +1033,7 @@ class GLUSTERServer(Server):
"""generic volume mark fetching/parsing backed"""
fmt_string = cls.NTV_FMTSTR + extra_fields
buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string))
+ buf = str_to_bytearray(buf)
vm = struct.unpack(fmt_string, buf)
m = re.match(
'(.{8})(.{4})(.{4})(.{4})(.{12})',
@@ -1236,9 +1256,6 @@ class GLUSTER(object):
# register the crawlers and start crawling
# g1 ==> Xsync, g2 ==> config.change_detector(changelog by default)
# g3 ==> changelog History
- (inf, ouf, ra, wa) = rconf.args.rpc_fd.split(',')
- changelog_agent = RepceClient(int(inf), int(ouf))
-
status = GeorepStatus(gconf.get("state-file"),
rconf.args.local_node,
rconf.args.local_path,
@@ -1246,12 +1263,6 @@ class GLUSTER(object):
rconf.args.master,
rconf.args.slave)
status.reset_on_worker_start()
- rv = changelog_agent.version()
- if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION:
- raise GsyncdError(
- "RePCe major version mismatch(changelog agent): "
- "local %s, remote %s" %
- (CHANGELOG_AGENT_CLIENT_VERSION, rv))
try:
workdir = g2.setup_working_dir()
@@ -1262,17 +1273,16 @@ class GLUSTER(object):
# register with the changelog library
# 9 == log level (DEBUG)
# 5 == connection retries
- changelog_agent.init()
- changelog_agent.register(rconf.args.local_path,
- workdir,
- gconf.get("changelog-log-file"),
- get_changelog_log_level(
- gconf.get("changelog-log-level")),
- g2.CHANGELOG_CONN_RETRIES)
+ libgfchangelog.register(rconf.args.local_path,
+ workdir,
+ gconf.get("changelog-log-file"),
+ get_changelog_log_level(
+ gconf.get("changelog-log-level")),
+ g2.CHANGELOG_CONN_RETRIES)
register_time = int(time.time())
- g2.register(register_time, changelog_agent, status)
- g3.register(register_time, changelog_agent, status)
+ g2.register(register_time, status)
+ g3.register(register_time, status)
except ChangelogException as e:
logging.error(lf("Changelog register failed", error=e))
sys.exit(1)
@@ -1407,7 +1417,9 @@ class SSH(object):
'--slave-gluster-log-level',
gconf.get("slave-gluster-log-level"),
'--slave-gluster-command-dir',
- gconf.get("slave-gluster-command-dir")]
+ gconf.get("slave-gluster-command-dir"),
+ '--master-dist-count',
+ str(gconf.get("master-distribution-count"))]
if gconf.get("slave-access-mount"):
args_to_slave.append('--slave-access-mount')
@@ -1472,12 +1484,13 @@ class SSH(object):
if log_rsync_performance:
# use stdout=PIPE only when log_rsync_performance enabled
- # Else rsync will write to stdout and nobody is their
+ # Else rsync will write to stdout and nobody is there
# to consume. If PIPE is full rsync hangs.
po = Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ stderr=subprocess.PIPE, universal_newlines=True)
else:
- po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
+ po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True)
for f in files:
po.stdin.write(f)
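universal_newlines=True, added to every Popen in this patch, switches the child's pipes to text mode, so on py3 they accept and return str rather than bytes and the po.stdin.write(f) above keeps working without an explicit encode(). A minimal demonstration ('cat' is just an example command):

    import subprocess

    po = subprocess.Popen(['cat'], stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE,
                          universal_newlines=True)
    out, _ = po.communicate('hello\n')   # str in, str out; no encode()
    assert out == 'hello\n'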
@@ -1509,7 +1522,7 @@ class SSH(object):
return po
- def tarssh(self, files, slaveurl, log_err=False):
+ def tarssh(self, files, log_err=False):
"""invoke tar+ssh
-z (compress) can be used if needed, but omitting it now
as it results in a weird error (tar+ssh errors out (errcode: 2)
@@ -1517,32 +1530,49 @@ class SSH(object):
if not files:
raise GsyncdError("no files to sync")
logging.debug("files: " + ", ".join(files))
- (host, rdir) = slaveurl.split(':')
+ (host, rdir) = self.slaveurl.split(':')
+
tar_cmd = ["tar"] + \
["--sparse", "-cf", "-", "--files-from", "-"]
- ssh_cmd = gconf.get("ssh-command-tar").split() + \
+ ssh_cmd = gconf.get("ssh-command").split() + \
gconf.get("ssh-options-tar").split() + \
["-p", str(gconf.get("ssh-port"))] + \
[host, "tar"] + \
["--overwrite", "-xf", "-", "-C", rdir]
p0 = Popen(tar_cmd, stdout=subprocess.PIPE,
- stdin=subprocess.PIPE, stderr=subprocess.PIPE)
- p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE)
+ stdin=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True)
+ p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE,
+ universal_newlines=True)
for f in files:
p0.stdin.write(f)
p0.stdin.write('\n')
p0.stdin.close()
p0.stdout.close() # Allow p0 to receive a SIGPIPE if p1 exits.
- # wait for tar to terminate, collecting any errors, further
- # waiting for transfer to complete
- _, stderr1 = p1.communicate()
# stdin and stdout of p0 is already closed, Reset to None and
# wait for child process to complete
p0.stdin = None
p0.stdout = None
- p0.communicate()
+
+ def wait_for_tar(p0):
+ _, stderr = p0.communicate()
+ if log_err:
+ for errline in stderr.strip().split("\n")[:-1]:
+ if "No such file or directory" not in errline:
+ logging.error(lf("SYNC Error",
+ sync_engine="Tarssh",
+ error=errline))
+
+ t = syncdutils.Thread(target=wait_for_tar, args=(p0, ))
+ # wait for tar to terminate and collect its errors in the
+ # background while the ssh transfer completes
+ t.start()
+
+ # wait for ssh process
+ _, stderr1 = p1.communicate()
+ t.join()
if log_err:
for errline in stderr1.strip().split("\n")[:-1]:
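The wait_for_tar thread is the crux of this last hunk: tar's stderr is drained concurrently with the ssh process, because waiting on the two pipelines sequentially can deadlock once either stderr pipe buffer fills. The same pattern in miniature, with illustrative commands:

    import subprocess
    import threading

    p0 = subprocess.Popen(['tar', '-cf', '-', '.'],
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    p1 = subprocess.Popen(['wc', '-c'], stdin=p0.stdout,
                          stderr=subprocess.PIPE)
    p0.stdout.close()   # let tar receive SIGPIPE if wc exits early
    p0.stdout = None    # communicate() must not touch the closed pipe

    t = threading.Thread(target=p0.communicate)
    t.start()
    _, err1 = p1.communicate()   # drain wc while the thread drains tar
    t.join()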