author     Kotresh HR <khiremat@redhat.com>    2017-09-21 18:11:15 -0400
committer  Aravinda VK <avishwan@redhat.com>   2017-11-10 05:36:22 +0000
commit     0f524f0710229a7f8de3a4e1e6a2790d40f67a8e (patch)
tree       d938aa2ba8c0b8dc7638c2740443d2d82557a099 /geo-replication/syncdaemon/resource.py
parent     0fc1c562d8b8d09ec2b59bc525ec5635a21a4561 (diff)
geo-rep: Fix rename of directory in hybrid crawl
In hybrid crawl, renames and unlinks can't be synced, but directory renames
can be detected. While syncing a directory on the slave, if the gfid already
exists, the operation is actually a rename. Hence, if the directory gfid
already exists, rename it.

Change-Id: Ibf9f99e76a3e02795a3c2befd8cac48a5c365bb6
BUG: 1499566
Signed-off-by: Kotresh HR <khiremat@redhat.com>
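Before the diff, a minimal, self-contained sketch (not the gsyncd code
itself) of the idea the patch implements: when a hybrid-crawl MKDIR hits a
gfid that already exists on the slave, the directory's current location is
read from the brick's .glusterfs backend symlink and the entry is renamed to
the path the master reports. The helper names (resolve_backend_path,
handle_hybrid_mkdir, rename_entry) and the ".gfid" aux-mount prefix are
illustrative assumptions, not the actual gsyncd API.

import os

def resolve_backend_path(brick_path, gfid):
    # For a directory gfid, the brick's .glusterfs symlink points at
    # ../../<pgfid[0:2]>/<pgfid[2:4]>/<pgfid>/<basename>, so the last two
    # path components give the parent gfid and the basename.
    link = os.path.join(brick_path, ".glusterfs", gfid[0:2], gfid[2:4], gfid)
    target = os.readlink(link)
    pargfid, basename = target.split("/")[-2:]
    return pargfid, basename

def handle_hybrid_mkdir(brick_path, gfid, new_entry, rename_entry):
    # The master replayed an MKDIR, but the gfid already exists on the slave:
    # treat it as a rename from the directory's current location to the new
    # path reported by the master.
    pargfid, basename = resolve_backend_path(brick_path, gfid)
    src_entry = os.path.join(".gfid", pargfid, basename)
    if src_entry != new_entry:
        rename_entry(gfid, src_entry, new_entry)
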
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r--  geo-replication/syncdaemon/resource.py  189
1 file changed, 34 insertions(+), 155 deletions(-)
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 0ca023cd8c5..a9810ae325b 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -15,18 +15,15 @@ import stat
import time
import signal
import fcntl
-import errno
import types
import struct
import socket
import logging
import tempfile
-import threading
import subprocess
import errno
from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES
from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM
-from select import error as SelectError
import shutil
from gconf import gconf
@@ -43,7 +40,7 @@ from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
from syncdutils import GX_GFID_CANONICAL_LEN
from gsyncdstatus import GeorepStatus
from syncdutils import get_master_and_slave_data_from_args
-from syncdutils import mntpt_list, lf
+from syncdutils import mntpt_list, lf, Popen, sup, Volinfo
from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
@@ -52,14 +49,9 @@ UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")
ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
-def sup(x, *a, **kw):
- """a rubyesque "super" for python ;)
-
- invoke caller method in parent class with given args.
- """
- return getattr(super(type(x), x),
- sys._getframe(1).f_code.co_name)(*a, **kw)
-
+slv_volume = None
+slv_host = None
+slv_bricks = None
def desugar(ustr):
"""transform sugared url strings to standard <scheme>://<urlbody> form
@@ -114,149 +106,6 @@ def parse_url(ustr):
return getattr(this, sch.upper())(path)
-class Popen(subprocess.Popen):
-
- """customized subclass of subprocess.Popen with a ring
- buffer for children error output"""
-
- @classmethod
- def init_errhandler(cls):
- """start the thread which handles children's error output"""
- cls.errstore = {}
-
- def tailer():
- while True:
- errstore = cls.errstore.copy()
- try:
- poe, _, _ = select(
- [po.stderr for po in errstore], [], [], 1)
- except (ValueError, SelectError):
- # stderr is already closed wait for some time before
- # checking next error
- time.sleep(0.5)
- continue
- for po in errstore:
- if po.stderr not in poe:
- continue
- po.lock.acquire()
- try:
- if po.on_death_row:
- continue
- la = errstore[po]
- try:
- fd = po.stderr.fileno()
- except ValueError: # file is already closed
- time.sleep(0.5)
- continue
-
- try:
- l = os.read(fd, 1024)
- except OSError:
- time.sleep(0.5)
- continue
-
- if not l:
- continue
- tots = len(l)
- for lx in la:
- tots += len(lx)
- while tots > 1 << 20 and la:
- tots -= len(la.pop(0))
- la.append(l)
- finally:
- po.lock.release()
- t = syncdutils.Thread(target=tailer)
- t.start()
- cls.errhandler = t
-
- @classmethod
- def fork(cls):
- """fork wrapper that restarts errhandler thread in child"""
- pid = os.fork()
- if not pid:
- cls.init_errhandler()
- return pid
-
- def __init__(self, args, *a, **kw):
- """customizations for subprocess.Popen instantiation
-
- - 'close_fds' is taken to be the default
- - if child's stderr is chosen to be managed,
- register it with the error handler thread
- """
- self.args = args
- if 'close_fds' not in kw:
- kw['close_fds'] = True
- self.lock = threading.Lock()
- self.on_death_row = False
- self.elines = []
- try:
- sup(self, args, *a, **kw)
- except:
- ex = sys.exc_info()[1]
- if not isinstance(ex, OSError):
- raise
- raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
- (args[0], errno.errorcode[ex.errno],
- os.strerror(ex.errno)))
- if kw.get('stderr') == subprocess.PIPE:
- assert(getattr(self, 'errhandler', None))
- self.errstore[self] = []
-
- def errlog(self):
- """make a log about child's failure event"""
- logging.error(lf("command returned error",
- cmd=" ".join(self.args),
- error=self.returncode))
- lp = ''
-
- def logerr(l):
- logging.error(self.args[0] + "> " + l)
- for l in self.elines:
- ls = l.split('\n')
- ls[0] = lp + ls[0]
- lp = ls.pop()
- for ll in ls:
- logerr(ll)
- if lp:
- logerr(lp)
-
- def errfail(self):
- """fail nicely if child did not terminate with success"""
- self.errlog()
- syncdutils.finalize(exval=1)
-
- def terminate_geterr(self, fail_on_err=True):
- """kill child, finalize stderr harvesting (unregister
- from errhandler, set up .elines), fail on error if
- asked for
- """
- self.lock.acquire()
- try:
- self.on_death_row = True
- finally:
- self.lock.release()
- elines = self.errstore.pop(self)
- if self.poll() is None:
- self.terminate()
- if self.poll() is None:
- time.sleep(0.1)
- self.kill()
- self.wait()
- while True:
- if not select([self.stderr], [], [], 0.1)[0]:
- break
- b = os.read(self.stderr.fileno(), 1024)
- if b:
- elines.append(b)
- else:
- break
- self.stderr.close()
- self.elines = elines
- if fail_on_err and self.returncode != 0:
- self.errfail()
-
-
class Server(object):
"""singleton implemening those filesystem access primitives
@@ -776,6 +625,31 @@ class Server(object):
if isinstance(st, int):
blob = entry_pack_mkdir(
gfid, bname, e['mode'], e['uid'], e['gid'])
+ else:
+ # If the gfid of a directory already exists on the slave but the
+ # path-based create is getting EEXIST, the directory has been
+ # renamed on the master but was recorded as MKDIR during hybrid
+ # crawl. Get the directory's current path by reading the backend
+ # symlink and rename it to the new name reported by the
+ # master.
+ global slv_bricks
+ global slv_volume
+ global slv_host
+ if not slv_bricks:
+ slv_info = Volinfo(slv_volume, slv_host)
+ slv_bricks = slv_info.bricks
+ # Result of readlink would be of format as below.
+ # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
+ realpath = os.readlink(os.path.join(slv_bricks[0]['dir'],
+ ".glusterfs", gfid[0:2],
+ gfid[2:4], gfid))
+ realpath_parts = realpath.split('/')
+ src_pargfid = realpath_parts[-2]
+ src_basename = realpath_parts[-1]
+ src_entry = os.path.join(pfx, src_pargfid, src_basename)
+ logging.info(lf("Special case: rename on mkdir",
+ gfid=gfid, entry=repr(entry)))
+ rename_with_disk_gfid_confirmation(gfid, src_entry, entry)
elif op == 'LINK':
slink = os.path.join(pfx, gfid)
st = lstat(slink)
@@ -1309,6 +1183,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def __init__(self, path):
self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern)
+ global slv_volume
+ global slv_host
+ slv_volume = self.volume
+ slv_host = self.host
+
def canonical_path(self):
return ':'.join([gethostbyname(self.host), self.volume])
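The last hunk records the slave volume and host in module-level globals when
the GLUSTER resource is initialised, so the MKDIR special case above can
fetch the slave brick list lazily and only once. A minimal sketch of that
caching pattern, assuming Volinfo stands in for syncdutils.Volinfo and using
an illustrative helper name (get_slave_bricks):

_slv_bricks = None  # filled on first use, reused by later MKDIR handling

def get_slave_bricks(Volinfo, slv_volume, slv_host):
    # One 'gluster volume info' round-trip the first time; cached after that.
    global _slv_bricks
    if _slv_bricks is None:
        _slv_bricks = Volinfo(slv_volume, slv_host).bricks
    return _slv_bricks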