diff options
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 176 | 
1 files changed, 138 insertions, 38 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index faf62f868c7..8deb5114b50 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -265,6 +265,9 @@ class Server(object):      FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT      GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' +    GFID_XATTR = 'trusted.gfid'  # for backend gfid fetch, do not use GX_NSPACE_PFX +    GFID_FMTSTR = "!" + "B"*16 +      local_path = ''      @classmethod @@ -305,6 +308,38 @@ class Server(object):              raise OSError(ENOTDIR, os.strerror(ENOTDIR))          return os.listdir(path) + +    @classmethod +    @_pathguard +    def lstat(cls, path): +        try: +            return os.lstat(path) +        except (IOError, OSError): +            ex = sys.exc_info()[1] +            if ex.errno == ENOENT: +                return ex.errno +            else: +                raise + + +    @classmethod +    @_pathguard +    def gfid(cls, path): +        try: +            buf = Xattr.lgetxattr(path, cls.GFID_XATTR, 16) +            m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)])) +            return '-'.join(m.groups()) +        except (IOError, OSError): +            ex = sys.exc_info()[1] +            if ex.errno == ENOENT: +                return ex.errno +            else: +                raise + +    @classmethod +    def gfid_mnt(cls, gfidpath): +        return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', cls.GX_GFID_CANONICAL_LEN], [ENOENT]) +      @classmethod      @_pathguard      def purge(cls, path, entries=None): @@ -397,8 +432,42 @@ class Server(object):                  raise      @classmethod -    def gfid(cls, gfidpath): -        return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', cls.GX_GFID_CANONICAL_LEN], [ENOENT]) +    @_pathguard +    def stime_mnt(cls, path, uuid): +        """query xtime extended attribute + +        Return xtime of @path for @uuid as a pair of integers. +        "Normal" errors due to non-existent @path or extended attribute +        are tolerated and errno is returned in such a case. +        """ + +        try: +            return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8)) +        except OSError: +            ex = sys.exc_info()[1] +            if ex.errno in (ENOENT, ENODATA, ENOTDIR): +                return ex.errno +            else: +                raise + +    @classmethod +    @_pathguard +    def stime(cls, path, uuid): +        """query xtime extended attribute + +        Return xtime of @path for @uuid as a pair of integers. +        "Normal" errors due to non-existent @path or extended attribute +        are tolerated and errno is returned in such a case. +        """ + +        try: +            return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8)) +        except OSError: +            ex = sys.exc_info()[1] +            if ex.errno in (ENOENT, ENODATA, ENOTDIR): +                return ex.errno +            else: +                raise      @classmethod      def node_uuid(cls, path='.'): @@ -409,21 +478,10 @@ class Server(object):              raise      @classmethod -    def xtime_vec(cls, path, *uuids): -        """vectored version of @xtime - -        accepts a list of uuids and returns a dictionary -        with uuid as key(s) and xtime as value(s) -        """ -        xt = {} -        for uuid in uuids: -            xtu = cls.xtime(path, uuid) -            if xtu == ENODATA: -                xtu = None -            if isinstance(xtu, int): -                return xtu -            xt[uuid] = xtu -        return xt +    @_pathguard +    def set_stime(cls, path, uuid, mark): +        """set @mark as stime for @uuid on @path""" +        Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), struct.pack('!II', *mark))      @classmethod      @_pathguard @@ -444,20 +502,16 @@ class Server(object):          Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark))      @classmethod -    def set_xtime_vec(cls, path, mark_dct): -        """vectored (or dictered) version of set_xtime - -        ignore values that match @ignore -        """ -        for u,t in mark_dct.items(): -            cls.set_xtime(path, u, t) - -    @classmethod      def entry_ops(cls, entries):          pfx = gauxpfx()          logging.debug('entries: %s' % repr(entries))          # regular file -        def entry_pack_reg(gf, bn, st): +        def entry_pack_reg(gf, bn, mo, uid, gid): +            blen = len(bn) +            return struct.pack(cls._fmt_mknod(blen), +                               uid, gid, gf, mo, bn, +                               stat.S_IMODE(mo), 0, umask()) +        def entry_pack_reg_stat(gf, bn, st):              blen = len(bn)              mo = st['mode']              return struct.pack(cls._fmt_mknod(blen), @@ -465,12 +519,10 @@ class Server(object):                                 gf, mo, bn,                                 stat.S_IMODE(mo), 0, umask())          # mkdir -        def entry_pack_mkdir(gf, bn, st): +        def entry_pack_mkdir(gf, bn, mo, uid, gid):              blen = len(bn) -            mo = st['mode']              return struct.pack(cls._fmt_mkdir(blen), -                               st['uid'], st['gid'], -                               gf, mo, bn, +                               uid, gid, gf, mo, bn,                                 stat.S_IMODE(mo), umask())          #symlink          def entry_pack_symlink(gf, bn, lnk, st): @@ -485,7 +537,7 @@ class Server(object):              # to be purged is the GFID gotten from the changelog.              # (a stat(changelog_gfid) would also be valid here)              # The race here is between the GFID check and the purge. -            disk_gfid = cls.gfid(entry) +            disk_gfid = cls.gfid_mnt(entry)              if isinstance(disk_gfid, int):                  return              if not gfid == disk_gfid: @@ -510,15 +562,15 @@ class Server(object):                      else:                          break              elif op in ['CREATE', 'MKNOD']: -                blob = entry_pack_reg(gfid, bname, e['stat']) +                blob = entry_pack_reg(gfid, bname, e['mode'], e['uid'], e['uid'])              elif op == 'MKDIR': -                blob = entry_pack_mkdir(gfid, bname, e['stat']) +                blob = entry_pack_mkdir(gfid, bname, e['mode'], e['uid'], e['uid'])              elif op == 'LINK':                  slink = os.path.join(pfx, gfid)                  st = lstat(slink)                  if isinstance(st, int):                      (pg, bname) = entry2pb(entry) -                    blob = entry_pack_reg(gfid, bname, e['stat']) +                    blob = entry_pack_reg_stat(gfid, bname, e['stat'])                  else:                      errno_wrap(os.link, [slink, entry], [ENOENT, EEXIST])              elif op == 'SYMLINK': @@ -528,13 +580,24 @@ class Server(object):                  st = lstat(entry)                  if isinstance(st, int):                      (pg, bname) = entry2pb(en) -                    blob = entry_pack_reg(gfid, bname, e['stat']) +                    blob = entry_pack_reg_stat(gfid, bname, e['stat'])                  else:                      errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST])              if blob:                  errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [EEXIST], [ENOENT, ESTALE, EINVAL])      @classmethod +    def meta_ops(cls, meta_entries): +        logging.debug('Meta-entries: %s' % repr(meta_entries)) +        for e in meta_entries: +            mode = e['stat']['mode'] +            uid  = e['stat']['uid'] +            gid  = e['stat']['gid'] +            go   = e['go'] +            errno_wrap(os.chmod, [go, mode], [ENOENT], [ESTALE, EINVAL]) +            errno_wrap(os.chown, [go, uid, gid], [ENOENT], [ESTALE, EINVAL]) + +    @classmethod      def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries = 0):          Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries) @@ -699,6 +762,29 @@ class SlaveRemote(object):          return po +    def tarssh(self, files, slaveurl): +        """invoke tar+ssh +        -z (compress) can be use if needed, but ommitting it now +        as it results in wierd error (tar+ssh errors out (errcode: 2) +        """ +        if not files: +            raise GsyncdError("no files to sync") +        logging.debug("files: " + ", ".join(files)) +        (host, rdir) = slaveurl.split(':') +        tar_cmd = ["tar", "-cf", "-", "--files-from", "-"] +        ssh_cmd = gconf.ssh_command_tar.split() +  [host, "tar", "--overwrite", "-xf", "-", "-C", rdir] +        p0 = Popen(tar_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) +        p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE) +        for f in files: +            p0.stdin.write(f) +            p0.stdin.write('\n') +        p0.stdin.close() +        p0.wait() + +        p1.wait() +        p1.terminate_geterr(fail_on_err = False) + +        return p1  class AbstractUrl(object):      """abstract base class for url scheme classes""" @@ -1041,12 +1127,20 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                              except ValueError:                                  pass                          return e +                    @classmethod +                    def lstat(cls, e): +                        """ path based backend stat """ +                        return super(brickserver, cls).lstat(e) +                    @classmethod +                    def gfid(cls, e): +                        """ path based backend gfid fetch """ +                        return super(brickserver, cls).gfid(e)                  if gconf.slave_id:                      # define {,set_}xtime in slave, thus preempting                      # the call to remote, so that it takes data from                      # the local brick -                    slave.server.xtime = types.MethodType(lambda _self, path, uuid: brickserver.xtime(path, uuid + '.' + gconf.slave_id), slave.server) -                    slave.server.set_xtime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_xtime(path, uuid + '.' + gconf.slave_id, mark), slave.server) +                    slave.server.stime = types.MethodType(lambda _self, path, uuid: brickserver.stime(path, uuid + '.' + gconf.slave_id), slave.server) +                    slave.server.set_stime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_stime(path, uuid + '.' + gconf.slave_id, mark), slave.server)                  (g1, g2) = self.gmaster_instantiate_tuple(slave)                  g1.master.server = brickserver                  g2.master.server = brickserver @@ -1067,6 +1161,9 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):      def rsync(self, files):          return sup(self, files, self.slavedir) +    def tarssh(self, files): +        return sup(self, files, self.slavedir) +  class SSH(AbstractUrl, SlaveRemote):      """scheme class for ssh:// urls @@ -1170,3 +1267,6 @@ class SSH(AbstractUrl, SlaveRemote):      def rsync(self, files):          return sup(self, files, '-e', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args),                     *(gconf.rsync_ssh_options.split() + [self.slaveurl])) + +    def tarssh(self, files): +        return sup(self, files, self.slaveurl)  | 
