diff options
| author | Kotresh HR <khiremat@redhat.com> | 2017-09-21 18:11:15 -0400 | 
|---|---|---|
| committer | Aravinda VK <avishwan@redhat.com> | 2017-11-10 05:36:22 +0000 | 
| commit | 0f524f0710229a7f8de3a4e1e6a2790d40f67a8e (patch) | |
| tree | d938aa2ba8c0b8dc7638c2740443d2d82557a099 | |
| parent | 0fc1c562d8b8d09ec2b59bc525ec5635a21a4561 (diff) | |
geo-rep: Fix rename of directory in hybrid crawl
In hybrid crawl, renames and unlink can't be
synced but directory renames can be detected.
While syncing the directory on slave, if the
gfid already exists, it should be rename.
Hence if directory gfid already exists, rename
it.
Change-Id: Ibf9f99e76a3e02795a3c2befd8cac48a5c365bb6
BUG: 1499566
Signed-off-by: Kotresh HR <khiremat@redhat.com>
| -rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 4 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/monitor.py | 85 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 189 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 238 | 
4 files changed, 276 insertions, 240 deletions
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 932e37d1124..adca0374c6c 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -39,7 +39,7 @@ from changelogagent import agent, Changelog  from gsyncdstatus import set_monitor_status, GeorepStatus, human_time_utc  from libcxattr import Xattr  import struct -from syncdutils import get_master_and_slave_data_from_args, lf +from syncdutils import get_master_and_slave_data_from_args, lf, Popen  ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError @@ -777,7 +777,7 @@ def main_i():      else:          label = 'slave'      startup(go_daemon=go_daemon, log_file=log_file, label=label) -    resource.Popen.init_errhandler() +    Popen.init_errhandler()      if be_agent:          os.setsid() diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 4da933047c8..c6fa1076a85 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -16,7 +16,7 @@ import logging  import uuid  import xml.etree.ElementTree as XET  from subprocess import PIPE -from resource import Popen, FILE, GLUSTER, SSH +from resource import FILE, GLUSTER, SSH  from threading import Lock  from errno import ECHILD, ESRCH  import re @@ -24,8 +24,9 @@ import random  from gconf import gconf  from syncdutils import select, waitpid, errno_wrap, lf  from syncdutils import set_term_handler, is_host_local, GsyncdError -from syncdutils import escape, Thread, finalize, memoize +from syncdutils import escape, Thread, finalize  from syncdutils import gf_event, EVENT_GEOREP_FAULTY +from syncdutils import Volinfo, Popen  from gsyncdstatus import GeorepStatus, set_monitor_status @@ -91,86 +92,6 @@ def get_slave_bricks_status(host, vol):      return list(up_hosts) -class Volinfo(object): - -    def __init__(self, vol, host='localhost', prelude=[]): -        po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, -                              'volume', 'info', vol], -                   stdout=PIPE, stderr=PIPE) -        vix = po.stdout.read() -        po.wait() -        po.terminate_geterr() -        vi = XET.fromstring(vix) -        if vi.find('opRet').text != '0': -            if prelude: -                via = '(via %s) ' % prelude.join(' ') -            else: -                via = ' ' -            raise GsyncdError('getting volume info of %s%s ' -                              'failed with errorcode %s' % -                              (vol, via, vi.find('opErrno').text)) -        self.tree = vi -        self.volume = vol -        self.host = host - -    def get(self, elem): -        return self.tree.findall('.//' + elem) - -    def is_tier(self): -        return (self.get('typeStr')[0].text == 'Tier') - -    def is_hot(self, brickpath): -        logging.debug('brickpath: ' + repr(brickpath)) -        return brickpath in self.hot_bricks - -    @property -    @memoize -    def bricks(self): -        def bparse(b): -            host, dirp = b.find("name").text.split(':', 2) -            return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text} -        return [bparse(b) for b in self.get('brick')] - -    @property -    @memoize -    def uuid(self): -        ids = self.get('id') -        if len(ids) != 1: -            raise GsyncdError("volume info of %s obtained from %s: " -                              "ambiguous uuid" % (self.volume, self.host)) -        return ids[0].text - -    def replica_count(self, tier, hot): -        if (tier and hot): -            return int(self.get('hotBricks/hotreplicaCount')[0].text) -        elif (tier and not hot): -            return int(self.get('coldBricks/coldreplicaCount')[0].text) -        else: -            return int(self.get('replicaCount')[0].text) - -    def disperse_count(self, tier, hot): -        if (tier and hot): -            # Tiering doesn't support disperse volume as hot brick, -            # hence no xml output, so returning 0. In case, if it's -            # supported later, we should change here. -            return 0 -        elif (tier and not hot): -            return int(self.get('coldBricks/colddisperseCount')[0].text) -        else: -            return int(self.get('disperseCount')[0].text) - -    @property -    @memoize -    def hot_bricks(self): -        return [b.text for b in self.get('hotBricks/brick')] - -    def get_hot_bricks_count(self, tier): -        if (tier): -            return int(self.get('hotBricks/hotbrickCount')[0].text) -        else: -            return 0 - -  class Monitor(object):      """class which spawns and manages gsyncd workers""" diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 0ca023cd8c5..a9810ae325b 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -15,18 +15,15 @@ import stat  import time  import signal  import fcntl -import errno  import types  import struct  import socket  import logging  import tempfile -import threading  import subprocess  import errno  from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES  from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM -from select import error as SelectError  import shutil  from gconf import gconf @@ -43,7 +40,7 @@ from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION  from syncdutils import GX_GFID_CANONICAL_LEN  from gsyncdstatus import GeorepStatus  from syncdutils import get_master_and_slave_data_from_args -from syncdutils import mntpt_list, lf +from syncdutils import mntpt_list, lf, Popen, sup, Volinfo  from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt  UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') @@ -52,14 +49,9 @@ UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")  ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP') -def sup(x, *a, **kw): -    """a rubyesque "super" for python ;) - -    invoke caller method in parent class with given args. -    """ -    return getattr(super(type(x), x), -                   sys._getframe(1).f_code.co_name)(*a, **kw) - +slv_volume = None +slv_host = None +slv_bricks = None  def desugar(ustr):      """transform sugared url strings to standard <scheme>://<urlbody> form @@ -114,149 +106,6 @@ def parse_url(ustr):      return getattr(this, sch.upper())(path) -class Popen(subprocess.Popen): - -    """customized subclass of subprocess.Popen with a ring -    buffer for children error output""" - -    @classmethod -    def init_errhandler(cls): -        """start the thread which handles children's error output""" -        cls.errstore = {} - -        def tailer(): -            while True: -                errstore = cls.errstore.copy() -                try: -                    poe, _, _ = select( -                        [po.stderr for po in errstore], [], [], 1) -                except (ValueError, SelectError): -                    # stderr is already closed wait for some time before -                    # checking next error -                    time.sleep(0.5) -                    continue -                for po in errstore: -                    if po.stderr not in poe: -                        continue -                    po.lock.acquire() -                    try: -                        if po.on_death_row: -                            continue -                        la = errstore[po] -                        try: -                            fd = po.stderr.fileno() -                        except ValueError:  # file is already closed -                            time.sleep(0.5) -                            continue - -                        try: -                            l = os.read(fd, 1024) -                        except OSError: -                            time.sleep(0.5) -                            continue - -                        if not l: -                            continue -                        tots = len(l) -                        for lx in la: -                            tots += len(lx) -                        while tots > 1 << 20 and la: -                            tots -= len(la.pop(0)) -                        la.append(l) -                    finally: -                        po.lock.release() -        t = syncdutils.Thread(target=tailer) -        t.start() -        cls.errhandler = t - -    @classmethod -    def fork(cls): -        """fork wrapper that restarts errhandler thread in child""" -        pid = os.fork() -        if not pid: -            cls.init_errhandler() -        return pid - -    def __init__(self, args, *a, **kw): -        """customizations for subprocess.Popen instantiation - -        - 'close_fds' is taken to be the default -        - if child's stderr is chosen to be managed, -          register it with the error handler thread -        """ -        self.args = args -        if 'close_fds' not in kw: -            kw['close_fds'] = True -        self.lock = threading.Lock() -        self.on_death_row = False -        self.elines = [] -        try: -            sup(self, args, *a, **kw) -        except: -            ex = sys.exc_info()[1] -            if not isinstance(ex, OSError): -                raise -            raise GsyncdError("""execution of "%s" failed with %s (%s)""" % -                              (args[0], errno.errorcode[ex.errno], -                               os.strerror(ex.errno))) -        if kw.get('stderr') == subprocess.PIPE: -            assert(getattr(self, 'errhandler', None)) -            self.errstore[self] = [] - -    def errlog(self): -        """make a log about child's failure event""" -        logging.error(lf("command returned error", -                         cmd=" ".join(self.args), -                         error=self.returncode)) -        lp = '' - -        def logerr(l): -            logging.error(self.args[0] + "> " + l) -        for l in self.elines: -            ls = l.split('\n') -            ls[0] = lp + ls[0] -            lp = ls.pop() -            for ll in ls: -                logerr(ll) -        if lp: -            logerr(lp) - -    def errfail(self): -        """fail nicely if child did not terminate with success""" -        self.errlog() -        syncdutils.finalize(exval=1) - -    def terminate_geterr(self, fail_on_err=True): -        """kill child, finalize stderr harvesting (unregister -        from errhandler, set up .elines), fail on error if -        asked for -        """ -        self.lock.acquire() -        try: -            self.on_death_row = True -        finally: -            self.lock.release() -        elines = self.errstore.pop(self) -        if self.poll() is None: -            self.terminate() -            if self.poll() is None: -                time.sleep(0.1) -                self.kill() -                self.wait() -        while True: -            if not select([self.stderr], [], [], 0.1)[0]: -                break -            b = os.read(self.stderr.fileno(), 1024) -            if b: -                elines.append(b) -            else: -                break -        self.stderr.close() -        self.elines = elines -        if fail_on_err and self.returncode != 0: -            self.errfail() - -  class Server(object):      """singleton implemening those filesystem access primitives @@ -776,6 +625,31 @@ class Server(object):                  if isinstance(st, int):                      blob = entry_pack_mkdir(                          gfid, bname, e['mode'], e['uid'], e['gid']) +                else: +                    # If gfid of a directory exists on slave but path based +                    # create is getting EEXIST. This means the directory is +                    # renamed in master but recorded as MKDIR during hybrid +                    # crawl. Get the directory path by reading the backend +                    # symlink and trying to rename to new name as said by +                    # master. +                    global slv_bricks +                    global slv_volume +                    global slv_host +                    if not slv_bricks: +                        slv_info = Volinfo (slv_volume, slv_host) +                        slv_bricks = slv_info.bricks +                    # Result of readlink would be of format as below. +                    # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename" +                    realpath = os.readlink(os.path.join(slv_bricks[0]['dir'], +                                                        ".glusterfs", gfid[0:2], +                                                        gfid[2:4], gfid)) +                    realpath_parts = realpath.split('/') +                    src_pargfid = realpath_parts[-2] +                    src_basename = realpath_parts[-1] +                    src_entry = os.path.join(pfx, src_pargfid, src_basename) +                    logging.info(lf("Special case: rename on mkdir", +                                   gfid=gfid, entry=repr(entry))) +                    rename_with_disk_gfid_confirmation(gfid, src_entry, entry)              elif op == 'LINK':                  slink = os.path.join(pfx, gfid)                  st = lstat(slink) @@ -1309,6 +1183,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):      def __init__(self, path):          self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern) +        global slv_volume +        global slv_host +        slv_volume = self.volume +        slv_host = self.host +      def canonical_path(self):          return ':'.join([gethostbyname(self.host), self.volume]) diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 2187ecd226b..e611b7b6ae5 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -16,14 +16,18 @@ import fcntl  import shutil  import logging  import socket +import errno +import threading  import subprocess +from subprocess import PIPE  from threading import Lock, Thread as baseThread  from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED  from errno import EINTR, ENOENT, EPERM, ESTALE, EBUSY, errorcode  from signal import signal, SIGTERM  import select as oselect  from os import waitpid as owaitpid -import subprocess +import xml.etree.ElementTree as XET +from select import error as SelectError  from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE  sys.path.insert(1, GLUSTERFS_LIBEXECDIR) @@ -77,6 +81,15 @@ NEWLINE_ESCAPE_CHAR = "%0A"  PERCENTAGE_ESCAPE_CHAR = "%25" +def sup(x, *a, **kw): +    """a rubyesque "super" for python ;) + +    invoke caller method in parent class with given args. +    """ +    return getattr(super(type(x), x), +                   sys._getframe(1).f_code.co_name)(*a, **kw) + +  def escape(s):      """the chosen flavor of string escaping, used all over         to turn whatever data to creatable representation""" @@ -648,3 +661,226 @@ def lf(event, **kwargs):      for k, v in kwargs.items():          msg += "\t{0}={1}".format(k, v)      return msg + + +class Popen(subprocess.Popen): + +    """customized subclass of subprocess.Popen with a ring +    buffer for children error output""" + +    @classmethod +    def init_errhandler(cls): +        """start the thread which handles children's error output""" +        cls.errstore = {} + +        def tailer(): +            while True: +                errstore = cls.errstore.copy() +                try: +                    poe, _, _ = select( +                        [po.stderr for po in errstore], [], [], 1) +                except (ValueError, SelectError): +                    # stderr is already closed wait for some time before +                    # checking next error +                    time.sleep(0.5) +                    continue +                for po in errstore: +                    if po.stderr not in poe: +                        continue +                    po.lock.acquire() +                    try: +                        if po.on_death_row: +                            continue +                        la = errstore[po] +                        try: +                            fd = po.stderr.fileno() +                        except ValueError:  # file is already closed +                            time.sleep(0.5) +                            continue + +                        try: +                            l = os.read(fd, 1024) +                        except OSError: +                            time.sleep(0.5) +                            continue + +                        if not l: +                            continue +                        tots = len(l) +                        for lx in la: +                            tots += len(lx) +                        while tots > 1 << 20 and la: +                            tots -= len(la.pop(0)) +                        la.append(l) +                    finally: +                        po.lock.release() +        t = Thread(target=tailer) +        t.start() +        cls.errhandler = t + +    @classmethod +    def fork(cls): +        """fork wrapper that restarts errhandler thread in child""" +        pid = os.fork() +        if not pid: +            cls.init_errhandler() +        return pid + +    def __init__(self, args, *a, **kw): +        """customizations for subprocess.Popen instantiation + +        - 'close_fds' is taken to be the default +        - if child's stderr is chosen to be managed, +          register it with the error handler thread +        """ +        self.args = args +        if 'close_fds' not in kw: +            kw['close_fds'] = True +        self.lock = threading.Lock() +        self.on_death_row = False +        self.elines = [] +        try: +            sup(self, args, *a, **kw) +        except: +            ex = sys.exc_info()[1] +            if not isinstance(ex, OSError): +                raise +            raise GsyncdError("""execution of "%s" failed with %s (%s)""" % +                              (args[0], errno.errorcode[ex.errno], +                               os.strerror(ex.errno))) +        if kw.get('stderr') == subprocess.PIPE: +            assert(getattr(self, 'errhandler', None)) +            self.errstore[self] = [] + +    def errlog(self): +        """make a log about child's failure event""" +        logging.error(lf("command returned error", +                         cmd=" ".join(self.args), +                         error=self.returncode)) +        lp = '' + +        def logerr(l): +            logging.error(self.args[0] + "> " + l) +        for l in self.elines: +            ls = l.split('\n') +            ls[0] = lp + ls[0] +            lp = ls.pop() +            for ll in ls: +                logerr(ll) +        if lp: +            logerr(lp) + +    def errfail(self): +        """fail nicely if child did not terminate with success""" +        self.errlog() +        finalize(exval=1) + +    def terminate_geterr(self, fail_on_err=True): +        """kill child, finalize stderr harvesting (unregister +        from errhandler, set up .elines), fail on error if +        asked for +        """ +        self.lock.acquire() +        try: +            self.on_death_row = True +        finally: +            self.lock.release() +        elines = self.errstore.pop(self) +        if self.poll() is None: +            self.terminate() +            if self.poll() is None: +                time.sleep(0.1) +                self.kill() +                self.wait() +        while True: +            if not select([self.stderr], [], [], 0.1)[0]: +                break +            b = os.read(self.stderr.fileno(), 1024) +            if b: +                elines.append(b) +            else: +                break +        self.stderr.close() +        self.elines = elines +        if fail_on_err and self.returncode != 0: +            self.errfail() + + +class Volinfo(object): + +    def __init__(self, vol, host='localhost', prelude=[]): +        po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, +                              'volume', 'info', vol], +                   stdout=PIPE, stderr=PIPE) +        vix = po.stdout.read() +        po.wait() +        po.terminate_geterr() +        vi = XET.fromstring(vix) +        if vi.find('opRet').text != '0': +            if prelude: +                via = '(via %s) ' % prelude.join(' ') +            else: +                via = ' ' +            raise GsyncdError('getting volume info of %s%s ' +                              'failed with errorcode %s' % +                              (vol, via, vi.find('opErrno').text)) +        self.tree = vi +        self.volume = vol +        self.host = host + +    def get(self, elem): +        return self.tree.findall('.//' + elem) + +    def is_tier(self): +        return (self.get('typeStr')[0].text == 'Tier') + +    def is_hot(self, brickpath): +        logging.debug('brickpath: ' + repr(brickpath)) +        return brickpath in self.hot_bricks + +    @property +    @memoize +    def bricks(self): +        def bparse(b): +            host, dirp = b.find("name").text.split(':', 2) +            return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text} +        return [bparse(b) for b in self.get('brick')] + +    @property +    @memoize +    def uuid(self): +        ids = self.get('id') +        if len(ids) != 1: +            raise GsyncdError("volume info of %s obtained from %s: " +                              "ambiguous uuid" % (self.volume, self.host)) +        return ids[0].text + +    def replica_count(self, tier, hot): +        if (tier and hot): +            return int(self.get('hotBricks/hotreplicaCount')[0].text) +        elif (tier and not hot): +            return int(self.get('coldBricks/coldreplicaCount')[0].text) +        else: +            return int(self.get('replicaCount')[0].text) + +    def disperse_count(self, tier, hot): +        if (tier and hot): +            # Tiering doesn't support disperse volume as hot brick, +            # hence no xml output, so returning 0. In case, if it's +            # supported later, we should change here. +            return 0 +        elif (tier and not hot): +            return int(self.get('coldBricks/colddisperseCount')[0].text) +        else: +            return int(self.get('disperseCount')[0].text) + +    @property +    @memoize +    def hot_bricks(self): +        return [b.text for b in self.get('hotBricks/brick')] + +    def get_hot_bricks_count(self, tier): +        if (tier): +            return int(self.get('hotBricks/hotbrickCount')[0].text) +        else: +            return 0  | 
