From b13c483dca20e4015b958f8959328e665a357f60 Mon Sep 17 00:00:00 2001 From: Avra Sengupta Date: Sat, 1 Jun 2013 16:17:57 +0530 Subject: gsyncd: distribute the crawling load * also consume changelog for change detection. * Status fixes * Use new libgfchangelog done API * process (and sync) one changelog at a time Change-Id: I24891615bb762e0741b1819ddfdef8802326cb16 BUG: 847839 Original Author: Csaba Henk Original Author: Aravinda VK Original Author: Venky Shankar Original Author: Amar Tumballi Original Author: Avra Sengupta Signed-off-by: Avra Sengupta Reviewed-on: http://review.gluster.org/5131 Reviewed-by: Vijay Bellur Tested-by: Vijay Bellur --- geo-replication/syncdaemon/resource.py | 207 ++++++++++++++++++++++++++++++--- 1 file changed, 188 insertions(+), 19 deletions(-) (limited to 'geo-replication/syncdaemon/resource.py') diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 73102fbc..52989fe2 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -5,13 +5,14 @@ import stat import time import fcntl import errno +import types import struct import socket import logging import tempfile import threading import subprocess -from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR +from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR, ENOTEMPTY from select import error as SelectError from gconf import gconf @@ -19,7 +20,8 @@ import repce from repce import RepceServer, RepceClient from master import gmaster_builder import syncdutils -from syncdutils import GsyncdError, select, privileged, boolify +from syncdutils import GsyncdError, select, privileged, boolify, funcode +from syncdutils import umask, entry2pb, gauxpfx, errno_wrap UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) @@ -105,7 +107,18 @@ class _MetaXattr(object): setattr(self, m, getattr(LXattr, m)) return getattr(self, meth) +class _MetaChangelog(object): + def __getattr__(self, meth): + from libgfchangelog import Changes as LChanges + xmeth = [ m for m in dir(LChanges) if m[0] != '_' ] + if not meth in xmeth: + return + for m in xmeth: + setattr(self, m, getattr(LChanges, m)) + return getattr(self, meth) + Xattr = _MetaXattr() +Changes = _MetaChangelog() class Popen(subprocess.Popen): @@ -245,10 +258,24 @@ class Server(object): and classmethods and is used directly, without instantiation.) """ - GX_NSPACE = (privileged() and "trusted" or "system") + ".glusterfs" + GX_NSPACE_PFX = (privileged() and "trusted" or "system") + GX_NSPACE = GX_NSPACE_PFX + ".glusterfs" NTV_FMTSTR = "!" + "B"*19 + "II" FRGN_XTRA_FMT = "I" FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT + GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' + + local_path = '' + + @classmethod + def _fmt_mknod(cls, l): + return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l+1) + @classmethod + def _fmt_mkdir(cls, l): + return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l+1) + @classmethod + def _fmt_symlink(cls, l1, l2): + return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1+1, l2+1) def _pathguard(f): """decorator method that checks @@ -257,22 +284,21 @@ class Server(object): point out of the managed tree """ - fc = getattr(f, 'func_code', None) - if not fc: - # python 3 - fc = f.__code__ + fc = funcode(f) pi = list(fc.co_varnames).index('path') def ff(*a): path = a[pi] ps = path.split('/') if path[0] == '/' or '..' in ps: raise ValueError('unsafe path') + a = list(a) + a[pi] = os.path.join(a[0].local_path, path) return f(*a) return ff - @staticmethod + @classmethod @_pathguard - def entries(path): + def entries(cls, path): """directory entries in an array""" # prevent symlinks being followed if not stat.S_ISDIR(os.lstat(path).st_mode): @@ -370,6 +396,18 @@ class Server(object): else: raise + @classmethod + def gfid(cls, gfidpath): + return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid', cls.GX_GFID_CANONICAL_LEN], [ENOENT]) + + @classmethod + def node_uuid(cls, path='.'): + try: + uuid_l = Xattr.lgetxattr_buf(path, '.'.join([cls.GX_NSPACE, 'node-uuid'])) + return uuid_l[:-1].split(' ') + except OSError: + raise + @classmethod def xtime_vec(cls, path, *uuids): """vectored version of @xtime @@ -402,9 +440,96 @@ class Server(object): for u,t in mark_dct.items(): cls.set_xtime(path, u, t) - @staticmethod + @classmethod + def entry_ops(cls, entries): + pfx = gauxpfx() + logging.debug('entries: %s' % repr(entries)) + # regular file + def entry_pack_reg(gf, bn, st): + blen = len(bn) + mo = st['mode'] + return struct.pack(cls._fmt_mknod(blen), + st['uid'], st['gid'], + gf, mo, bn, + stat.S_IMODE(mo), 0, umask()) + # mkdir + def entry_pack_mkdir(gf, bn, st): + blen = len(bn) + mo = st['mode'] + return struct.pack(cls._fmt_mkdir(blen), + st['uid'], st['gid'], + gf, mo, bn, + stat.S_IMODE(mo), umask()) + #symlink + def entry_pack_symlink(gf, bn, lnk, st): + blen = len(bn) + llen = len(lnk) + return struct.pack(cls._fmt_symlink(blen, llen), + st['uid'], st['gid'], + gf, st['mode'], bn, lnk) + def entry_purge(entry, gfid): + # This is an extremely racy code and needs to be fixed ASAP. + # The GFID check here is to be sure that the pargfid/bname + # to be purged is the GFID gotten from the changelog. + # (a stat(changelog_gfid) would also be valid here) + # The race here is between the GFID check and the purge. + disk_gfid = cls.gfid(entry) + if isinstance(disk_gfid, int): + return + if not gfid == disk_gfid: + return + er = errno_wrap(os.unlink, [entry], [ENOENT, EISDIR]) + if isinstance(er, int): + if er == EISDIR: + er = errno_wrap(os.rmdir, [entry], [ENOENT, ENOTEMPTY]) + if er == ENOTEMPTY: + return er + for e in entries: + blob = None + op = e['op'] + gfid = e['gfid'] + entry = e['entry'] + (pg, bname) = entry2pb(entry) + if op in ['RMDIR', 'UNLINK']: + while True: + er = entry_purge(entry, gfid) + if isinstance(er, int): + time.sleep(1) + else: + break + elif op == 'CREATE': + blob = entry_pack_reg(gfid, bname, e['stat']) + elif op == 'MKDIR': + blob = entry_pack_mkdir(gfid, bname, e['stat']) + elif op == 'LINK': + errno_wrap(os.link, [os.path.join(pfx, gfid), entry], [ENOENT, EEXIST]) + elif op == 'SYMLINK': + blob = entry_pack_symlink(gfid, bname, e['link'], e['stat']) + elif op == 'RENAME': + en = e['entry1'] + errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST]) + if blob: + errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [ENOENT, EEXIST]) + + @classmethod + def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries = 0): + Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries) + + @classmethod + def changelog_scan(cls): + Changes.cl_scan() + + @classmethod + def changelog_getchanges(cls): + return Changes.cl_getchanges() + + @classmethod + def changelog_done(cls, clfile): + Changes.cl_done(clfile) + + @classmethod @_pathguard - def setattr(path, adct): + def setattr(cls, path, adct): """set file attributes @adct is a dict, where 'own', 'mode' and 'times' @@ -537,10 +662,10 @@ class SlaveRemote(object): raise GsyncdError("no files to sync") logging.debug("files: " + ", ".join(files)) argv = gconf.rsync_command.split() + \ - ['-aR0', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \ + ['-avR0', '--inplace', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \ gconf.rsync_options.split() + (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \ ['.'] + list(args) - po = Popen(argv, stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE) + po = Popen(argv, stdin=subprocess.PIPE,stderr=subprocess.PIPE) for f in files: po.stdin.write(f) po.stdin.write('\0') @@ -685,7 +810,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): def can_connect_to(self, remote): """determine our position in the connectibility matrix""" - return True + return not remote or \ + (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER)) class Mounter(object): """Abstract base class for mounter backends""" @@ -864,6 +990,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): sup(self, *a, **kw) self.slavedir = "/proc/%d/cwd" % self.server.pid() + def gmaster_instantiate_tuple(self, slave): + """return a tuple of the 'one shot' and the 'main crawl' class instance""" + return (gmaster_builder('xsync')(self, slave), gmaster_builder()(self, slave)) + def service_loop(self, *args): """enter service loop @@ -873,7 +1003,41 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): - else do that's what's inherited """ if args: - gmaster_builder()(self, args[0]).crawl_loop() + slave = args[0] + if gconf.local_path: + class brickserver(FILE.FILEServer): + local_path = gconf.local_path + aggregated = self.server + @classmethod + def entries(cls, path): + e = super(brickserver, cls).entries(path) + # on the brick don't mess with /.glusterfs + if path == '.': + try: + e.remove('.glusterfs') + except ValueError: + pass + return e + if gconf.slave_id: + # define {,set_}xtime in slave, thus preempting + # the call to remote, so that it takes data from + # the local brick + slave.server.xtime = types.MethodType(lambda _self, path, uuid: brickserver.xtime(path, uuid + '.' + gconf.slave_id), slave.server) + slave.server.set_xtime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_xtime(path, uuid + '.' + gconf.slave_id, mark), slave.server) + (g1, g2) = self.gmaster_instantiate_tuple(slave) + g1.master.server = brickserver + g2.master.server = brickserver + else: + (g1, g2) = self.gmaster_instantiate_tuple(slave) + g1.master.server.aggregated = gmaster.master.server + g2.master.server.aggregated = gmaster.master.server + # bad bad bad: bad way to do things like this + # need to make this elegant + # register the crawlers and start crawling + g1.register() + g2.register() + g1.crawlwrap(oneshot=True) + g2.crawlwrap() else: sup(self, *args) @@ -893,13 +1057,18 @@ class SSH(AbstractUrl, SlaveRemote): '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ])) self.inner_rsc = parse_url(inner_url) - def canonical_path(self): - m = re.match('([^@]+)@(.+)', self.remote_addr) + @staticmethod + def parse_ssh_address(addr): + m = re.match('([^@]+)@(.+)', addr) if m: u, h = m.groups() else: - u, h = syncdutils.getusername(), self.remote_addr - remote_addr = '@'.join([u, gethostbyname(h)]) + u, h = syncdutils.getusername(), addr + return {'user': u, 'host': h} + + def canonical_path(self): + rap = self.parse_ssh_address(self.remote_addr) + remote_addr = '@'.join([rap['user'], gethostbyname(rap['host'])]) return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)]) def can_connect_to(self, remote): -- cgit