From 7d4560cbcdcae0d74cf486c544d5eb58775da51f Mon Sep 17 00:00:00 2001 From: Csaba Henk Date: Wed, 10 Aug 2011 05:02:43 +0300 Subject: gsyncd: do the homework, document _everything_ Change-Id: I559e6a0709b8064cfd54c693e289c741f9c4c4ab BUG: 1570 Reviewed-on: http://review.gluster.com/319 Tested-by: Gluster Build System Reviewed-by: Kaushik BV --- .../features/marker/utils/syncdaemon/resource.py | 196 +++++++++++++++++++-- 1 file changed, 183 insertions(+), 13 deletions(-) (limited to 'xlators/features/marker/utils/syncdaemon/resource.py') diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py index 66600fdad43..f92e8573409 100644 --- a/xlators/features/marker/utils/syncdaemon/resource.py +++ b/xlators/features/marker/utils/syncdaemon/resource.py @@ -26,9 +26,19 @@ HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+") def sup(x, *a, **kw): + """a rubyesque "super" for python ;) + + invoke caller method in parent class with given args. + """ return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw) def desugar(ustr): + """transform sugared url strings to standard :// form + + parsing logic enforces the constraint that sugared forms should contatin + a ':' or a '/', which ensures that sugared urls do not conflict with + gluster volume names. + """ m = re.match('([^:]*):(.*)', ustr) if m: if not m.groups()[0]: @@ -46,6 +56,7 @@ def desugar(ustr): return "file://" + ap def gethostbyname(hnam): + """gethostbyname wrapper""" try: return socket.gethostbyname(hnam) except socket.gaierror: @@ -54,6 +65,11 @@ def gethostbyname(hnam): (hnam, ex.strerror)) def parse_url(ustr): + """instantiate an url object by scheme-to-class dispatch + + The url classes taken into consideration are the ones in + this module whose names are full-caps. + """ m = UrlRX.match(ustr) if not m: ustr = desugar(ustr) @@ -68,8 +84,17 @@ def parse_url(ustr): class _MetaXattr(object): + """singleton class, a lazy wrapper around the + libcxattr module + + libcxattr (a heavy import due to ctypes) is + loaded only when when the single + instance is tried to be used. - # load Xattr stuff on-demand + This reduces runtime for those invocations + which do not need filesystem manipulation + (eg. for config, url parsing) + """ def __getattr__(self, meth): from libcxattr import Xattr as LXattr @@ -84,14 +109,17 @@ Xattr = _MetaXattr() class Popen(subprocess.Popen): + """customized subclass of subprocess.Popen with a ring + buffer for children error ouput""" @classmethod def init_errhandler(cls): + """start the thread which hanldes children's error output""" cls.errstore = {} def tailer(): while True: for po in select.select([po.stderr for po in cls.errstore], [], []): - po.lock() + po.lock.acquire() try: la = cls.errstore.get(po) if la == None: @@ -103,23 +131,22 @@ class Popen(subprocess.Popen): while tots > 1<<20 and la: tots -= len(la.pop(0)) finally: - po.unlock() + po.lock.release() t = syncdutils.Thread(target = tailer) t.start() cls.errhandler = t - def lock(self): - self._lock.acquire() - - def unlock(self): - self._lock.release() - def __init__(self, args, *a, **kw): - """subprocess.Popen wrapper with error-handling""" + """customizations for subprocess.Popen instantiation + + - 'close_fds' is taken to be the default + - if child's stderr is chosen to be managed, + register it with the error handler thread + """ self.args = args if 'close_fds' not in kw: kw['close_fds'] = True - self._lock = threading.Lock() + self.lock = threading.Lock() try: sup(self, args, *a, **kw) except: @@ -133,6 +160,7 @@ class Popen(subprocess.Popen): self.errstore[self] = [] def errfail(self): + """fail nicely if child did not terminate with success""" filling = None if self.elines: filling = ", saying:" @@ -144,11 +172,15 @@ class Popen(subprocess.Popen): syncdutils.finalize(exval = 1) def terminate_geterr(self, fail_on_err = True): - self.lock() + """kill child, finalize stderr harvesting (unregister + from errhandler, set up .elines), fail on error if + asked for + """ + self.lock.acquire() try: elines = self.errstore.pop(self) finally: - self.unlock() + self.lock.release() if self.poll() == None: self.terminate() if sp.poll() == None: @@ -167,6 +199,12 @@ class Popen(subprocess.Popen): class Server(object): + """singleton implemening those filesystem access primitives + which are needed for geo-replication functionality + + (Singleton in the sense it's a class which has only static + and classmethods and is used directly, without instantiation.) + """ GX_NSPACE = "trusted.glusterfs" NTV_FMTSTR = "!" + "B"*19 + "II" @@ -175,6 +213,7 @@ class Server(object): @staticmethod def entries(path): + """directory entries in an array""" # prevent symlinks being followed if not stat.S_ISDIR(os.lstat(path).st_mode): raise OSError(ENOTDIR, os.strerror(ENOTDIR)) @@ -182,6 +221,20 @@ class Server(object): @classmethod def purge(cls, path, entries=None): + """force-delete subtrees + + If @entries is not specified, delete + the whole subtree under @path (including + @path). + + Otherwise, @entries should be a + a sequence of children of @path, and + the effect is identical with a joint + @entries-less purge on them, ie. + + for e in entries: + cls.purge(os.path.join(path, e)) + """ me_also = entries == None if not entries: try: @@ -216,6 +269,7 @@ class Server(object): @classmethod def _create(cls, path, ctor): + """path creation backend routine""" try: ctor(path) except OSError: @@ -235,6 +289,13 @@ class Server(object): @classmethod def xtime(cls, path, uuid): + """query xtime extended attribute + + Return xtime of @path for @uuid as a pair of integers. + "Normal" errors due to non-existent @path or extended attribute + are tolerated and errno is returned in such a case. + """ + try: return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8)) except OSError: @@ -246,10 +307,17 @@ class Server(object): @classmethod def set_xtime(cls, path, uuid, mark): + """set @mark as xtime for @uuid on @path""" Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark)) @staticmethod def setattr(path, adct): + """set file attributes + + @adct is a dict, where 'own', 'mode' and 'times' + keys are looked for and values used to perform + chown, chmod or utimes on @path. + """ own = adct.get('own') if own: os.lchown(path, *own) @@ -267,6 +335,14 @@ class Server(object): last_keep_alive = 0 @classmethod def keep_alive(cls, dct): + """process keepalive messages. + + Return keep-alive counter (number of received keep-alive + messages). + + Now the "keep-alive" message can also have a payload which is + used to set a foreign volume-mark on the underlying file system. + """ if dct: key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']]) val = struct.pack(cls.FRGN_FMTSTR, @@ -279,15 +355,30 @@ class Server(object): @staticmethod def version(): + """version used in handshake""" return 1.0 class SlaveLocal(object): + """mix-in class to implement some factes of a slave server + + ("mix-in" is sort of like "abstract class", ie. it's not + instantiated just included in the ancesty DAG. I use "mix-in" + to indicate that it's not used as an abstract base class, + rather just taken in to implement additional functionality + on the basis of the assumed availability of certain interfaces.) + """ def can_connect_to(self, remote): + """determine our position in the connectibility matrix""" return not remote def service_loop(self): + """start a RePCe server serving self's server + + stop servicing if a timeout is configured and got no + keep-alime in that inteval + """ repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs)) t = syncdutils.Thread(target=lambda: (repce.service_loop(), syncdutils.finalize())) @@ -304,8 +395,15 @@ class SlaveLocal(object): select.select((), (), ()) class SlaveRemote(object): + """mix-in class to implement an interface to a remote slave""" def connect_remote(self, rargs=[], **opts): + """connects to a remote slave + + Invoke an auxiliary utility (slave gsyncd, possibly wrapped) + which sets up the connection and set up a RePCe client to + communicate throuh its stdio. + """ slave = opts.get('slave', self.url) so = getattr(gconf, 'session_owner', None) if so: @@ -319,6 +417,11 @@ class SlaveRemote(object): return self.start_fd_client(po.stdout, po.stdin, **opts) def start_fd_client(self, i, o, **opts): + """set up RePCe client, handshake with server + + It's cut out as a separate method to let + subclasses hook into client startup + """ self.server = RepceClient(i, o) rv = self.server.__version__() exrv = {'proto': repce.repce_version, 'object': Server.version()} @@ -331,6 +434,7 @@ class SlaveRemote(object): raise GsyncdError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv)) def rsync(self, files, *args): + """invoke rsync""" if not files: raise GsyncdError("no files to sync") logging.debug("files: " + ", ".join(files)) @@ -342,6 +446,7 @@ class SlaveRemote(object): class AbstractUrl(object): + """abstract base class for url scheme classes""" def __init__(self, path, pattern): m = re.search(pattern, path) @@ -358,6 +463,7 @@ class AbstractUrl(object): return self.path def get_url(self, canonical=False, escaped=False): + """format self's url in various styles""" if canonical: pa = self.canonical_path() else: @@ -376,8 +482,15 @@ class AbstractUrl(object): class FILE(AbstractUrl, SlaveLocal, SlaveRemote): + """scheme class for file:// urls + + can be used to represent a file slave server + on slave side, or interface to a remote file + file server on master side + """ class FILEServer(Server): + """included server flavor""" pass server = FILEServer @@ -386,6 +499,7 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote): sup(self, path, '^/') def connect(self): + """inhibit the resource beyond""" os.chdir(self.path) def rsync(self, files): @@ -393,11 +507,21 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote): class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): + """scheme class for gluster:// urls + + can be used to represent a gluster slave server + on slave side, or interface to a remote gluster + slave on master side, or to represent master + (slave-ish features come from the mixins, master + functionality is outsourced to GMaster from master) + """ class GLUSTERServer(Server): + "server enhancements for a glusterfs backend""" @classmethod def _attr_unpack_dict(cls, xattr, extra_fields = ''): + """generic volume mark fetching/parsing backed""" fmt_string = cls.NTV_FMTSTR + extra_fields buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) vm = struct.unpack(fmt_string, buf) @@ -415,6 +539,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): @classmethod def foreign_volume_infos(cls): + """return list of valid (not expired) foreign volume marks""" dict_list = [] xattr_list = Xattr.llistxattr_buf('.') for ele in xattr_list: @@ -434,6 +559,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): @classmethod def native_volume_info(cls): + """get the native volume mark of the underlying gluster volume""" try: return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark'])) except OSError: @@ -450,9 +576,17 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): return ':'.join([gethostbyname(self.host), self.volume]) def can_connect_to(self, remote): + """determine our position in the connectibility matrix""" return True def connect(self): + """inhibit the resource beyond + + - create temprorary mount point + - call glusterfs to mount the volume over there + - change to mounted fs root + - lazy umount + delete temp. mount point + """ def umount_l(d): po = Popen(['umount', '-l', d], stderr=subprocess.PIPE) po.wait() @@ -486,6 +620,13 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): self.slavedir = "/proc/%d/cwd" % self.server.pid() def service_loop(self, *args): + """enter service loop + + - if slave given, instantiate GMaster and + pass control to that instance, which implements + master behavior + - else do that's what's inherited + """ if args: GMaster(self, args[0]).crawl_loop() else: @@ -496,6 +637,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): class SSH(AbstractUrl, SlaveRemote): + """scheme class for ssh:// urls + + interface to remote slave on master side + implementing an ssh based proxy + """ def __init__(self, path): self.remote_addr, inner_url = sup(self, path, @@ -512,9 +658,16 @@ class SSH(AbstractUrl, SlaveRemote): return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)]) def can_connect_to(self, remote): + """determine our position in the connectibility matrix""" return False def start_fd_client(self, *a, **opts): + """customizations for client startup + + - be a no-op if we are to daemonize (client startup is deferred + to post-daemon stage) + - determine target url for rsync after consulting server + """ if opts.get('deferred'): return a sup(self, *a) @@ -528,6 +681,23 @@ class SSH(AbstractUrl, SlaveRemote): self.slaveurl = ':'.join([self.remote_addr, slavepath]) def connect_remote(self, go_daemon=None): + """connect to inner slave url through outer ssh url + + Wrap the connecting utility in ssh. + + Much care is put into daemonizing: in that case + ssh is started before daemonization, but + RePCe client is to be created after that (as ssh + interactive password auth would be defeated by + a daemonized ssh, while client should be present + only in the final process). In that case the action + is taken apart to two parts, this method is ivoked + once pre-daemon, once post-daemon. Use @go_daemon + to deiced what part to perform. + + [NB. ATM gluster product does not makes use of interactive + authentication.] + """ if go_daemon == 'done': return self.start_fd_client(*self.fd_pair) gconf.setup_ssh_ctl(tempfile.mkdtemp(prefix='gsyncd-aux-ssh-')) -- cgit