From 7d4560cbcdcae0d74cf486c544d5eb58775da51f Mon Sep 17 00:00:00 2001
From: Csaba Henk
Date: Wed, 10 Aug 2011 05:02:43 +0300
Subject: gsyncd: do the homework, document _everything_

Change-Id: I559e6a0709b8064cfd54c693e289c741f9c4c4ab
BUG: 1570
Reviewed-on: http://review.gluster.com/319
Tested-by: Gluster Build System
Reviewed-by: Kaushik BV
---
 .../marker/utils/syncdaemon/configinterface.py     |  41 ++++-
 xlators/features/marker/utils/syncdaemon/gconf.py  |   3 +
 xlators/features/marker/utils/syncdaemon/gsyncd.py |  17 ++
 .../features/marker/utils/syncdaemon/libcxattr.py  |  10 ++
 xlators/features/marker/utils/syncdaemon/master.py | 124 ++++++++++++-
 .../features/marker/utils/syncdaemon/monitor.py    |  22 +++
 xlators/features/marker/utils/syncdaemon/repce.py  |  64 +++++++
 .../features/marker/utils/syncdaemon/resource.py   | 196 +++++++++++++++++++--
 .../features/marker/utils/syncdaemon/syncdutils.py |  23 +++
 9 files changed, 483 insertions(+), 17 deletions(-)

diff --git a/xlators/features/marker/utils/syncdaemon/configinterface.py b/xlators/features/marker/utils/syncdaemon/configinterface.py
index cc8f7063aa9..fbf96c84336 100644
--- a/xlators/features/marker/utils/syncdaemon/configinterface.py
+++ b/xlators/features/marker/utils/syncdaemon/configinterface.py
@@ -16,6 +16,7 @@ re_type = type(re.compile(''))
 
 class MultiDict(object):
+    """a virtual dict-like class which functions as the union of underlying dicts"""
 
     def __init__(self, *dd):
         self.dicts = dd
@@ -31,8 +32,16 @@ class GConffile(object):
+    """A high-level interface to ConfigParser which flattens the two-tiered
+    config layout by implementing automatic section dispatch based on initial
+    parameters.
+
+    Also ensure section ordering in terms of their time of addition -- a compat
+    hack for Python < 2.7.
+    """
 
     def _normconfig(self):
+        """normalize config keys by s/-/_/g"""
         for n, s in self.config._sections.items():
             if n.find('__') == 0:
                 continue
@@ -44,6 +53,13 @@
             self.config._sections[n] = s2
 
     def __init__(self, path, peers, *dd):
+        """
+        - .path: location of config file
+        - .config: underlying ConfigParser instance
+        - .peers: on behalf of whom we flatten .config
+          (master, or master-slave url pair)
+        - .auxdicts: template substituents
+        """
         self.peers = peers
         self.path = path
         self.auxdicts = dd
@@ -52,6 +68,7 @@
         self._normconfig()
 
     def section(self, rx=False):
+        """get the section name of the section representing .peers in .config"""
         peers = self.peers
         if not peers:
             peers = ['.', '.']
@@ -64,6 +81,9 @@
 
     @staticmethod
     def parse_section(section):
+        """retrieve peers sequence encoded by section name
+        (as urls or regexen, depending on section type)
+        """
         sl = section.split()
         st = sl.pop(0)
         sl = [unescape(u) for u in sl]
@@ -83,7 +103,7 @@
         also those sections which are not registered
         in SECT_ORD.
 
-        Needed for python 2.{4,5} where ConfigParser
+        Needed for python 2.{4,5,6} where ConfigParser
         cannot yet order sections/options internally.
         """
         so = {}
@@ -108,6 +128,13 @@
         return ss
 
     def update_to(self, dct, allow_unresolved=False):
+        """update @dct from key/values of ours.
+
+        key/values are collected from .config by filtering the regexp sections
+        according to match, and from .section. The values are treated as templates,
+        which are substituted from .auxdicts and (in case of regexp sections)
+        match groups.
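+
+        An illustrative (hypothetical) use -- the path and peer urls below
+        are made up for the example, not taken from a real session:
+
+            gc = GConffile('/etc/glusterd/gsyncd.conf',
+                           ['gluster://lhost:mvol', 'ssh://rhost:svol'])
+            d = {}
+            gc.update_to(d)
+            # d now holds the flattened, template-substituted key/values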
+ """ if not self.peers: raise GsyncdError('no peers given, cannot select matching options') def update_from_sect(sect, mud): @@ -136,6 +163,10 @@ class GConffile(object): update_from_sect(self.section(), MultiDict(dct, *self.auxdicts)) def get(self, opt=None): + """print the matching key/value pairs from .config, + or if @opt given, the value for @opt (according to the + logic described in .update_to) + """ d = {} self.update_to(d, allow_unresolved = True) if opt: @@ -150,6 +181,10 @@ class GConffile(object): print("%s: %s" % (k, v)) def write(self, trfn, opt, *a, **kw): + """update on-disk config transactionally + + @trfn is the transaction function + """ def mergeconf(f): self.config = ConfigParser.RawConfigParser() self.config.readfp(f) @@ -163,6 +198,7 @@ class GConffile(object): update_file(self.path, updateconf, mergeconf) def _set(self, opt, val, rx=False): + """set @opt to @val in .section""" sect = self.section(rx) if not self.config.has_section(sect): self.config.add_section(sect) @@ -174,12 +210,15 @@ class GConffile(object): return True def set(self, opt, *a, **kw): + """perform ._set transactionally""" self.write(self._set, opt, *a, **kw) def _delete(self, opt, rx=False): + """delete @opt from .section""" sect = self.section(rx) if self.config.has_section(sect): return self.config.remove_option(sect, opt) def delete(self, opt, *a, **kw): + """perform ._delete transactionally""" self.write(self._delete, opt, *a, **kw) diff --git a/xlators/features/marker/utils/syncdaemon/gconf.py b/xlators/features/marker/utils/syncdaemon/gconf.py index ddbac21e48d..4e3b959fe37 100644 --- a/xlators/features/marker/utils/syncdaemon/gconf.py +++ b/xlators/features/marker/utils/syncdaemon/gconf.py @@ -1,6 +1,9 @@ import os class GConf(object): + """singleton class to store globals + shared between gsyncd modules""" + ssh_ctl_dir = None ssh_ctl_args = None cpid = None diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py index 960b83c1363..c0d39ffd62d 100644 --- a/xlators/features/marker/utils/syncdaemon/gsyncd.py +++ b/xlators/features/marker/utils/syncdaemon/gsyncd.py @@ -21,6 +21,10 @@ import resource from monitor import monitor class GLogger(Logger): + """Logger customizations for gsyncd. + + It implements a log format similar to that of glusterfs. 
+ """ def makeRecord(self, name, level, *a): rv = Logger.makeRecord(self, name, level, *a) @@ -54,6 +58,7 @@ class GLogger(Logger): def startup(**kw): + """set up logging, pidfile grabbing, daemonization""" if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn': if not grabpidfile(): sys.stderr.write("pidfile is taken, exiting.\n") @@ -96,6 +101,7 @@ def startup(**kw): gconf.log_exit = True def main(): + """main routine, signal/exception handling boilerplates""" signal.signal(signal.SIGTERM, lambda *a: finalize(*a, **{'exval': 1})) GLogger.setup() excont = FreeObject(exval = 0) @@ -108,6 +114,17 @@ def main(): finalize(exval = excont.exval) def main_i(): + """internal main routine + + parse command line, decide what action will be taken; + we can either: + - query/manipulate configuration + - format gsyncd urls using gsyncd's url parsing engine + - start service in following modes, in given stages: + - monitor: startup(), monitor() + - master: startup(), connect_remote(), connect(), service_loop() + - slave: startup(), connect(), service_loop() + """ rconf = {'go_daemon': 'should'} def store_abs(opt, optstr, val, parser): diff --git a/xlators/features/marker/utils/syncdaemon/libcxattr.py b/xlators/features/marker/utils/syncdaemon/libcxattr.py index fdc016c47ce..f0a9d22920a 100644 --- a/xlators/features/marker/utils/syncdaemon/libcxattr.py +++ b/xlators/features/marker/utils/syncdaemon/libcxattr.py @@ -3,6 +3,15 @@ from ctypes import * from ctypes.util import find_library class Xattr(object): + """singleton that wraps the extended attribues system + interface for python using ctypes + + Just implement it to the degree we need it, in particular + - we need just the l*xattr variants, ie. we never want symlinks to be + followed + - don't need size discovery for getxattr, as we always know the exact + sizes we expect + """ libc = CDLL(find_library("libc")) @@ -54,6 +63,7 @@ class Xattr(object): @classmethod def llistxattr_buf(cls, path): + """listxattr variant with size discovery""" size = cls.llistxattr(path) if size == -1: cls.raise_oserr() diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py index e7cb977e8ad..4273bf0c419 100644 --- a/xlators/features/marker/utils/syncdaemon/master.py +++ b/xlators/features/marker/utils/syncdaemon/master.py @@ -14,11 +14,16 @@ from syncdutils import FreeObject, Thread, GsyncdError URXTIME = (-1, 0) class GMaster(object): + """class impementling master role""" KFGN = 0 KNAT = 1 def get_sys_volinfo(self): + """query volume marks on fs root + + err out on multiple foreign masters + """ fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \ self.master.server.native_volume_info() fgn_vi = None @@ -40,9 +45,20 @@ class GMaster(object): @property def inter_master(self): + """decide if we are an intermediate master + in a cascading setup + """ return self.volinfo_state[self.KFGN] and True or False def xtime(self, path, *a, **opts): + """get amended xtime + + as of amending, we can create missing xtime, or + determine a valid value if what we get is expired + (as of the volume mark expiry); way of amendig + depends on @opts and on subject of query (master + or slave). 
+ """ if a: rsc = a[0] else: @@ -87,7 +103,7 @@ class GMaster(object): self.lastreport = {'crawls': 0, 'turns': 0} self.start = None self.change_seen = None - # the authorative (foreign, native) volinfo pair + # the authoritative (foreign, native) volinfo pair # which lets us deduce what to do when we refetch # the volinfos from system uuid_preset = getattr(gconf, 'volume_id', None) @@ -97,6 +113,7 @@ class GMaster(object): self.terminate = False def crawl_loop(self): + """start the keep-alive thread and iterate .crawl""" timo = int(gconf.timeout or 0) if timo > 0: def keep_alive(): @@ -124,15 +141,24 @@ class GMaster(object): self.crawl() def add_job(self, path, label, job, *a, **kw): + """insert @job function to job table at @path with @label""" if self.jobtab.get(path) == None: self.jobtab[path] = [] self.jobtab[path].append((label, a, lambda : job(*a, **kw))) def add_failjob(self, path, label): + """invoke .add_job with a job that does nothing just fails""" logging.debug('salvaged: ' + label) self.add_job(path, label, lambda: False) def wait(self, path, *args): + """perform jobs registered for @path + + Reset jobtab entry for @path, + determine success as the conjuction of + success of all the jobs. In case of + success, call .sendmark on @path + """ jobs = self.jobtab.pop(path, []) succeed = True for j in jobs: @@ -144,12 +170,29 @@ class GMaster(object): return succeed def sendmark(self, path, mark, adct=None): + """update slave side xtime for @path to master side xtime + + also can send a setattr payload (see Server.setattr). + """ if adct: self.slave.server.setattr(path, adct) self.slave.server.set_xtime(path, self.uuid, mark) @staticmethod def volinfo_state_machine(volinfo_state, volinfo_sys): + """compute new volinfo_state from old one and incoming + as of current system state, also indicating if there was a + change regarding which volume mark is the authoritative one + + @volinfo_state, @volinfo_sys are pairs of volume mark dicts + (foreign, native). + + Note this method is marked as static, ie. the computation is + pure, without reliance on any excess implicit state. State + transitions which are deemed as ambiguous or banned will raise + an exception. + + """ # store the value below "boxed" to emulate proper closures # (variables of the enclosing scope are available inner functions # provided they are no reassigned; mutation is OK). @@ -176,6 +219,43 @@ class GMaster(object): return newstate, param.state_change def crawl(self, path='.', xtl=None): + """crawling... + + Standing around + All the right people + Crawling + Tennis on Tuesday + The ladder is long + It is your nature + You've gotta suntan + Football on Sunday + Society boy + + Recursively walk the master side tree and check if updates are + needed due to xtime differences. One invocation of crawl checks + children of @path and do a recursive enter only on + those directory children where there is an update needed. + + Way of updates depend on file type: + - for symlinks, sync them directy and synchronously + - for regular children, register jobs for @path (cf. .add_job) to start + and wait on their rsync + - for directory children, register a job for @path which waits (.wait) + on jobs for the given child + (other kind of filesystem nodes are not considered) + + Those slave side children which do not exist on master are simply + purged (see Server.purge). 
+
+        Behavior is fault tolerant, synchronization is adaptive: if some action fails,
+        just go on relentlessly, adding a fail job (see .add_failjob) which will prevent
+        the .sendmark on @path, so when the next crawl arrives at @path it will not
+        see it as up-to-date and will try to sync it again. While these semantics can be
+        supported by funky design principles (http://c2.com/cgi/wiki?LazinessImpatienceHubris),
+        the ultimate reason which excludes other possibilities is simply transience: we cannot
+        assert that the file systems (master / slave) underneath do not change and actions
+        taken upon some condition will not lose their context by the time they are performed.
+        """
         if path == '.':
             if self.start:
                 self.crawls += 1
@@ -326,14 +406,18 @@ class BoxClosedErr(Exception):
     pass
 
 class PostBox(list):
+    """synchronized collection for storing things thought of as "requests" """
 
     def __init__(self, *a):
         list.__init__(self, *a)
+        # too bad Python stdlib does not have read/write locks...
+        # it would suffice to grab the lock in .append as reader, in .close as writer
         self.lever = Condition()
         self.open = True
         self.done = False
 
     def wait(self):
+        """wait on requests to be processed"""
         self.lever.acquire()
         if not self.done:
             self.lever.wait()
@@ -341,6 +425,7 @@
         self.lever.release()
         return self.result
 
     def wakeup(self, data):
+        """wake up requestors with the result"""
         self.result = data
         self.lever.acquire()
         self.done = True
@@ -348,6 +433,7 @@
         self.lever.release()
 
     def append(self, e):
+        """post a request"""
         self.lever.acquire()
         if not self.open:
             raise BoxClosedErr
@@ -355,13 +441,43 @@
         self.lever.release()
 
     def close(self):
+        """prohibit the posting of further requests"""
         self.lever.acquire()
         self.open = False
         self.lever.release()
 
 class Syncer(object):
+    """a staged queue to relay rsync requests to rsync workers
+
+    By "staged queue" it's meant that when a consumer comes to the
+    queue, it takes _all_ entries, leaving the queue empty.
+    (I don't know if there is an official term for this pattern.)
+
+    The queue uses a PostBox to accumulate incoming items.
+    When a consumer (rsync worker) comes, a new PostBox is
+    set up and the old one is passed on to the consumer.
+
+    Instead of the simplistic scheme of having one big lock
+    which synchronizes both the addition of new items and
+    PostBox exchanges, use a separate lock to arbitrate consumers,
+    and rely on PostBox's synchronization mechanisms to take
+    care of additions.
+
+    There is a corner-case racy situation between producers and
+    consumers which is not handled by this scheme: namely, when the PostBox
+    exchange occurs in between being passed to the producer for posting
+    and the post placement. But that's what PostBox.close is for:
+    such a posting will find the PostBox closed, in which case
+    the producer can re-try posting against the actual PostBox of
+    the queue.
+
+    To aid accumulation of items in the PostBoxen before being grabbed
+    by an rsync worker, the worker goes to sleep a bit after
+    each completed syncjob.
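+
+    Illustrative producer-side use (the file name is made up):
+
+        po = syncer.add('some/changed/file')
+        po.wait()   # returns once a worker has processed the batch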
+ """ def __init__(self, slave): + """spawn worker threads""" self.slave = slave self.lock = Lock() self.pb = PostBox() @@ -370,6 +486,7 @@ class Syncer(object): t.start() def syncjob(self): + """the life of a worker""" while True: pb = None while True: @@ -393,8 +510,9 @@ class Syncer(object): def add(self, e): while True: + pb = self.pb try: - self.pb.append(e) - return self.pb + pb.append(e) + return pb except BoxClosedErr: pass diff --git a/xlators/features/marker/utils/syncdaemon/monitor.py b/xlators/features/marker/utils/syncdaemon/monitor.py index 365e91435fd..b8e9219dc47 100644 --- a/xlators/features/marker/utils/syncdaemon/monitor.py +++ b/xlators/features/marker/utils/syncdaemon/monitor.py @@ -8,11 +8,14 @@ from gconf import gconf from syncdutils import update_file class Monitor(object): + """class which spawns and manages gsyncd workers""" def __init__(self): self.state = None def set_state(self, state): + """set the state that can be used by external agents + like glusterd for status reporting""" if state == self.state: return self.state = state @@ -21,6 +24,24 @@ class Monitor(object): update_file(gconf.state_file, lambda f: f.write(state + '\n')) def monitor(self): + """the monitor loop + + Basic logic is a blantantly simple blunt heuristics: + if spawned client survives 60 secs, it's considered OK. + This servers us pretty well as it's not vulneralbe to + any kind of irregular behavior of the child... + + ... well, except for one: if children is hung up on + waiting for some event, it can survive aeons, still + will be defunct. So we tweak the above logic to + expect the worker to send us a signal within 60 secs + (in the form of closing its end of a pipe). The worker + does this when it's done with the setup stage + ready to enter the service loop (note it's the setup + stage which is vulnerable to hangs -- the full + blown worker blows up on EPIPE if the net goes down, + due to the keep-alive thread) + """ argv = sys.argv[:] for o in ('-N', '--no-daemon', '--monitor'): while o in argv: @@ -77,4 +98,5 @@ class Monitor(object): return ret def monitor(): + """oh yeah, actually Monitor is used as singleton, too""" return Monitor().monitor() diff --git a/xlators/features/marker/utils/syncdaemon/repce.py b/xlators/features/marker/utils/syncdaemon/repce.py index 47691301e29..9473524909a 100644 --- a/xlators/features/marker/utils/syncdaemon/repce.py +++ b/xlators/features/marker/utils/syncdaemon/repce.py @@ -36,21 +36,39 @@ def ioparse(i, o): return (i, o) def send(out, *args): + """pickle args and write out wholly in one syscall + + ie. not use the ability of pickle to dump directly to + a stream, as that would potentially mess up messages + by interleaving them + """ os.write(out, pickle.dumps(args, pickle_proto)) def recv(inf): + """load an object from input stream""" return pickle.load(inf) class RepceServer(object): + """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce + + ... also our homebrewed RPC backend where the transport layer is + reduced to a pair of filehandles. + + This is the server component. 
+ """ def __init__(self, obj, i, o, wnum=6): + """register a backend object .obj to which incoming messages + are dispatched, also incoming/outcoming streams + """ self.obj = obj self.inf, self.out = ioparse(i, o) self.wnum = wnum self.q = Queue() def service_loop(self): + """fire up worker threads, get messages and dispatch among them""" for i in range(self.wnum): t = Thread(target=self.worker) t.start() @@ -61,6 +79,15 @@ class RepceServer(object): logging.info("terminating on reaching EOF.") def worker(self): + """life of a worker + + Get message, extract its id, method name and arguments + (kwargs not supported), call method on .obj. + Send back message id + return value. + If method call throws an exception, rescue it, and send + back the exception as result (with flag marking it as + exception). + """ while True: in_data = self.q.get(True) rid = in_data[0] @@ -79,8 +106,14 @@ class RepceServer(object): class RepceJob(object): + """class representing message status we can use + for waiting on reply""" def __init__(self, cbk): + """ + - .rid: (process-wise) unique id + - .cbk: what we do upon receiving reply + """ self.rid = (os.getpid(), thread.get_ident(), time.time()) self.cbk = cbk self.lever = Condition() @@ -105,6 +138,13 @@ class RepceJob(object): class RepceClient(object): + """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce + + ... also our homebrewed RPC backend where the transport layer is + reduced to a pair of filehandles. + + This is the client component. + """ def __init__(self, i, o): self.inf, self.out = ioparse(i, o) @@ -121,6 +161,11 @@ class RepceClient(object): rjob.cbk(rjob, [exc, res]) def push(self, meth, *args, **kw): + """wrap arguments in a RepceJob, send them to server + and return the RepceJob + + @cbk to pass on RepceJob can be given as kwarg. + """ cbk = kw.get('cbk') if not cbk: def cbk(rj, res): @@ -133,6 +178,11 @@ class RepceClient(object): return rjob def __call__(self, meth, *args): + """RePCe client is callabe, calling it implements a synchronous remote call + + We do a .push with a cbk which does a wakeup upon receiving anwser, then wait + on the RepceJob. + """ rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)}) exc, res = rjob.wait() if exc: @@ -142,7 +192,11 @@ class RepceClient(object): return res class mprx(object): + """method proxy, standard trick to implement rubyesque method_missing + in Python + A class is a closure factory, you know what I mean, or go read some SICP. 
+ """ def __init__(self, ins, meth): self.ins = ins self.meth = meth @@ -151,9 +205,19 @@ class RepceClient(object): return self.ins(self.meth, *a) def __getattr__(self, meth): + """this implements transparent method dispatch to remote object, + so that you don't need to call the RepceClient instance like + + rclient('how_old_are_you_if_born_in', 1979) + + but you can make it into an ordinary method call like + + rclient.how_old_are_you_if_born_in(1979) + """ return self.mprx(self, meth) def __version__(self): + """used in handshake to verify compatibility""" d = {'proto': self('__repce_version__')} try: d['object'] = self('version') diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py index 66600fdad43..f92e8573409 100644 --- a/xlators/features/marker/utils/syncdaemon/resource.py +++ b/xlators/features/marker/utils/syncdaemon/resource.py @@ -26,9 +26,19 @@ HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+") def sup(x, *a, **kw): + """a rubyesque "super" for python ;) + + invoke caller method in parent class with given args. + """ return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw) def desugar(ustr): + """transform sugared url strings to standard :// form + + parsing logic enforces the constraint that sugared forms should contatin + a ':' or a '/', which ensures that sugared urls do not conflict with + gluster volume names. + """ m = re.match('([^:]*):(.*)', ustr) if m: if not m.groups()[0]: @@ -46,6 +56,7 @@ def desugar(ustr): return "file://" + ap def gethostbyname(hnam): + """gethostbyname wrapper""" try: return socket.gethostbyname(hnam) except socket.gaierror: @@ -54,6 +65,11 @@ def gethostbyname(hnam): (hnam, ex.strerror)) def parse_url(ustr): + """instantiate an url object by scheme-to-class dispatch + + The url classes taken into consideration are the ones in + this module whose names are full-caps. + """ m = UrlRX.match(ustr) if not m: ustr = desugar(ustr) @@ -68,8 +84,17 @@ def parse_url(ustr): class _MetaXattr(object): + """singleton class, a lazy wrapper around the + libcxattr module + + libcxattr (a heavy import due to ctypes) is + loaded only when when the single + instance is tried to be used. - # load Xattr stuff on-demand + This reduces runtime for those invocations + which do not need filesystem manipulation + (eg. 
+    (eg. for config, url parsing)
+    """
 
     def __getattr__(self, meth):
         from libcxattr import Xattr as LXattr
@@ -84,14 +109,17 @@
 
 Xattr = _MetaXattr()
 
 class Popen(subprocess.Popen):
+    """customized subclass of subprocess.Popen with a ring
+    buffer for children's error output"""
 
     @classmethod
     def init_errhandler(cls):
+        """start the thread which handles children's error output"""
         cls.errstore = {}
         def tailer():
             while True:
                 for po in select.select([po.stderr for po in cls.errstore], [], []):
-                    po.lock()
+                    po.lock.acquire()
                     try:
                         la = cls.errstore.get(po)
                         if la == None:
@@ -103,23 +131,22 @@
                         while tots > 1<<20 and la:
                             tots -= len(la.pop(0))
                     finally:
-                        po.unlock()
+                        po.lock.release()
         t = syncdutils.Thread(target = tailer)
         t.start()
         cls.errhandler = t
 
-    def lock(self):
-        self._lock.acquire()
-
-    def unlock(self):
-        self._lock.release()
-
     def __init__(self, args, *a, **kw):
-        """subprocess.Popen wrapper with error-handling"""
+        """customizations for subprocess.Popen instantiation
+
+        - 'close_fds' is taken to be the default
+        - if child's stderr is chosen to be managed,
+          register it with the error handler thread
+        """
         self.args = args
         if 'close_fds' not in kw:
             kw['close_fds'] = True
-        self._lock = threading.Lock()
+        self.lock = threading.Lock()
         try:
             sup(self, args, *a, **kw)
         except:
@@ -133,6 +160,7 @@
             self.errstore[self] = []
 
     def errfail(self):
+        """fail nicely if child did not terminate with success"""
         filling = None
         if self.elines:
             filling = ", saying:"
@@ -144,11 +172,15 @@
         syncdutils.finalize(exval = 1)
 
     def terminate_geterr(self, fail_on_err = True):
-        self.lock()
+        """kill child, finalize stderr harvesting (unregister
+        from errhandler, set up .elines), fail on error if
+        asked for
+        """
+        self.lock.acquire()
         try:
             elines = self.errstore.pop(self)
         finally:
-            self.unlock()
+            self.lock.release()
         if self.poll() == None:
             self.terminate()
             if self.poll() == None:
@@ -167,6 +199,12 @@
 
 class Server(object):
+    """singleton implementing those filesystem access primitives
+    which are needed for geo-replication functionality
+
+    (Singleton in the sense it's a class which has only static
+    and classmethods and is used directly, without instantiation.)
+    """
 
     GX_NSPACE = "trusted.glusterfs"
     NTV_FMTSTR = "!" + "B"*19 + "II"
@@ -175,6 +213,7 @@
 
     @staticmethod
     def entries(path):
+        """directory entries in an array"""
         # prevent symlinks being followed
         if not stat.S_ISDIR(os.lstat(path).st_mode):
             raise OSError(ENOTDIR, os.strerror(ENOTDIR))
@@ -182,6 +221,20 @@
 
     @classmethod
     def purge(cls, path, entries=None):
+        """force-delete subtrees
+
+        If @entries is not specified, delete
+        the whole subtree under @path (including
+        @path).
+
+        Otherwise, @entries should be
+        a sequence of children of @path, and
+        the effect is identical to a joint
+        @entries-less purge on them, ie.
+
+            for e in entries:
+                cls.purge(os.path.join(path, e))
+        """
         me_also = entries == None
         if not entries:
             try:
@@ -216,6 +269,7 @@
 
     @classmethod
     def _create(cls, path, ctor):
+        """path creation backend routine"""
         try:
             ctor(path)
         except OSError:
@@ -235,6 +289,13 @@
 
     @classmethod
     def xtime(cls, path, uuid):
+        """query xtime extended attribute
+
+        Return xtime of @path for @uuid as a pair of integers.
+        "Normal" errors due to non-existent @path or extended attribute
+        are tolerated and errno is returned in such a case.
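+
+        Illustrative returns (values made up): (1312952563, 0) on
+        success; an integer errno such as ENODATA when the attribute
+        is missing.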
+ """ + try: return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8)) except OSError: @@ -246,10 +307,17 @@ class Server(object): @classmethod def set_xtime(cls, path, uuid, mark): + """set @mark as xtime for @uuid on @path""" Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark)) @staticmethod def setattr(path, adct): + """set file attributes + + @adct is a dict, where 'own', 'mode' and 'times' + keys are looked for and values used to perform + chown, chmod or utimes on @path. + """ own = adct.get('own') if own: os.lchown(path, *own) @@ -267,6 +335,14 @@ class Server(object): last_keep_alive = 0 @classmethod def keep_alive(cls, dct): + """process keepalive messages. + + Return keep-alive counter (number of received keep-alive + messages). + + Now the "keep-alive" message can also have a payload which is + used to set a foreign volume-mark on the underlying file system. + """ if dct: key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']]) val = struct.pack(cls.FRGN_FMTSTR, @@ -279,15 +355,30 @@ class Server(object): @staticmethod def version(): + """version used in handshake""" return 1.0 class SlaveLocal(object): + """mix-in class to implement some factes of a slave server + + ("mix-in" is sort of like "abstract class", ie. it's not + instantiated just included in the ancesty DAG. I use "mix-in" + to indicate that it's not used as an abstract base class, + rather just taken in to implement additional functionality + on the basis of the assumed availability of certain interfaces.) + """ def can_connect_to(self, remote): + """determine our position in the connectibility matrix""" return not remote def service_loop(self): + """start a RePCe server serving self's server + + stop servicing if a timeout is configured and got no + keep-alime in that inteval + """ repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs)) t = syncdutils.Thread(target=lambda: (repce.service_loop(), syncdutils.finalize())) @@ -304,8 +395,15 @@ class SlaveLocal(object): select.select((), (), ()) class SlaveRemote(object): + """mix-in class to implement an interface to a remote slave""" def connect_remote(self, rargs=[], **opts): + """connects to a remote slave + + Invoke an auxiliary utility (slave gsyncd, possibly wrapped) + which sets up the connection and set up a RePCe client to + communicate throuh its stdio. 
+ """ slave = opts.get('slave', self.url) so = getattr(gconf, 'session_owner', None) if so: @@ -319,6 +417,11 @@ class SlaveRemote(object): return self.start_fd_client(po.stdout, po.stdin, **opts) def start_fd_client(self, i, o, **opts): + """set up RePCe client, handshake with server + + It's cut out as a separate method to let + subclasses hook into client startup + """ self.server = RepceClient(i, o) rv = self.server.__version__() exrv = {'proto': repce.repce_version, 'object': Server.version()} @@ -331,6 +434,7 @@ class SlaveRemote(object): raise GsyncdError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv)) def rsync(self, files, *args): + """invoke rsync""" if not files: raise GsyncdError("no files to sync") logging.debug("files: " + ", ".join(files)) @@ -342,6 +446,7 @@ class SlaveRemote(object): class AbstractUrl(object): + """abstract base class for url scheme classes""" def __init__(self, path, pattern): m = re.search(pattern, path) @@ -358,6 +463,7 @@ class AbstractUrl(object): return self.path def get_url(self, canonical=False, escaped=False): + """format self's url in various styles""" if canonical: pa = self.canonical_path() else: @@ -376,8 +482,15 @@ class AbstractUrl(object): class FILE(AbstractUrl, SlaveLocal, SlaveRemote): + """scheme class for file:// urls + + can be used to represent a file slave server + on slave side, or interface to a remote file + file server on master side + """ class FILEServer(Server): + """included server flavor""" pass server = FILEServer @@ -386,6 +499,7 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote): sup(self, path, '^/') def connect(self): + """inhibit the resource beyond""" os.chdir(self.path) def rsync(self, files): @@ -393,11 +507,21 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote): class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): + """scheme class for gluster:// urls + + can be used to represent a gluster slave server + on slave side, or interface to a remote gluster + slave on master side, or to represent master + (slave-ish features come from the mixins, master + functionality is outsourced to GMaster from master) + """ class GLUSTERServer(Server): + "server enhancements for a glusterfs backend""" @classmethod def _attr_unpack_dict(cls, xattr, extra_fields = ''): + """generic volume mark fetching/parsing backed""" fmt_string = cls.NTV_FMTSTR + extra_fields buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) vm = struct.unpack(fmt_string, buf) @@ -415,6 +539,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): @classmethod def foreign_volume_infos(cls): + """return list of valid (not expired) foreign volume marks""" dict_list = [] xattr_list = Xattr.llistxattr_buf('.') for ele in xattr_list: @@ -434,6 +559,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): @classmethod def native_volume_info(cls): + """get the native volume mark of the underlying gluster volume""" try: return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark'])) except OSError: @@ -450,9 +576,17 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): return ':'.join([gethostbyname(self.host), self.volume]) def can_connect_to(self, remote): + """determine our position in the connectibility matrix""" return True def connect(self): + """inhibit the resource beyond + + - create temprorary mount point + - call glusterfs to mount the volume over there + - change to mounted fs root + - lazy umount + delete temp. 
+        - lazy umount + delete temporary mount point
+        """
         def umount_l(d):
             po = Popen(['umount', '-l', d], stderr=subprocess.PIPE)
             po.wait()
@@ -486,6 +620,13 @@
         self.slavedir = "/proc/%d/cwd" % self.server.pid()
 
     def service_loop(self, *args):
+        """enter service loop
+
+        - if slave given, instantiate GMaster and
+          pass control to that instance, which implements
+          master behavior
+        - else do what's inherited
+        """
         if args:
             GMaster(self, args[0]).crawl_loop()
         else:
@@ -496,6 +637,11 @@
 
 class SSH(AbstractUrl, SlaveRemote):
+    """scheme class for ssh:// urls
+
+    interface to remote slave on master side
+    implementing an ssh based proxy
+    """
 
     def __init__(self, path):
         self.remote_addr, inner_url = sup(self, path,
@@ -512,9 +658,16 @@
         return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)])
 
     def can_connect_to(self, remote):
+        """determine our position in the connectibility matrix"""
         return False
 
     def start_fd_client(self, *a, **opts):
+        """customizations for client startup
+
+        - be a no-op if we are to daemonize (client startup is deferred
+          to post-daemon stage)
+        - determine target url for rsync after consulting server
+        """
         if opts.get('deferred'):
             return a
         sup(self, *a)
@@ -528,6 +681,23 @@
         self.slaveurl = ':'.join([self.remote_addr, slavepath])
 
     def connect_remote(self, go_daemon=None):
+        """connect to inner slave url through outer ssh url
+
+        Wrap the connecting utility in ssh.
+
+        Much care is put into daemonizing: in that case
+        ssh is started before daemonization, but
+        the RePCe client is to be created after that (as ssh
+        interactive password auth would be defeated by
+        a daemonized ssh, while the client should be present
+        only in the final process). In that case the action
+        is split into two parts, and this method is invoked
+        once pre-daemon, once post-daemon. Use @go_daemon
+        to decide which part to perform.
+
+        [NB. ATM the gluster product does not make use of interactive
+        authentication.]
+        """
         if go_daemon == 'done':
             return self.start_fd_client(*self.fd_pair)
         gconf.setup_ssh_ctl(tempfile.mkdtemp(prefix='gsyncd-aux-ssh-'))
diff --git a/xlators/features/marker/utils/syncdaemon/syncdutils.py b/xlators/features/marker/utils/syncdaemon/syncdutils.py
index 35afe64e931..244e29628e0 100644
--- a/xlators/features/marker/utils/syncdaemon/syncdutils.py
+++ b/xlators/features/marker/utils/syncdaemon/syncdutils.py
@@ -19,9 +19,12 @@ except ImportError:
     import urllib
 
 def escape(s):
+    """the chosen flavor of string escaping, used all over
+    to turn whatever data into a creatable representation"""
     return urllib.quote_plus(s)
 
 def unescape(s):
+    """inverse of .escape"""
     return urllib.unquote_plus(s)
 
 def norm(s):
@@ -59,6 +62,10 @@
     fx.close()
 
 def grabfile(fname, content=None):
+    """open @fname + contest for its fcntl lock
+
+    @content: if given, set the file content to it
+    """
     # damn those messy open() mode codes
     fd = os.open(fname, os.O_CREAT|os.O_RDWR)
     f = os.fdopen(fd, 'r+b', 0)
@@ -82,6 +89,7 @@
     return f
 
 def grabpidfile(fname=None, setpid=True):
+    """.grabfile customization for pid files"""
     if not fname:
         fname = gconf.pid_file
     content = None
@@ -92,6 +100,10 @@
 final_lock = Lock()
 
 def finalize(*a, **kw):
+    """all those messy final steps we go through upon termination
+
+    Do away with pidfile, ssh control dir and logging.
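+
+    The process exit status can be passed in the @exval keyword
+    (defaults to 0 -- cf. the os._exit call at the end).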
+ """ final_lock.acquire() if getattr(gconf, 'pid_file', None): rm_pidf = gconf.pid_file_owned @@ -126,6 +138,12 @@ def finalize(*a, **kw): os._exit(kw.get('exval', 0)) def log_raise_exception(excont): + """top-level exception handler + + Try to some fancy things to cover up we face with an error. + Translate some weird sounding but well understood exceptions + into human-friendly lingo + """ is_filelog = False for h in logging.getLogger().handlers: fno = getattr(getattr(h, 'stream', None), 'fileno', None) @@ -170,7 +188,12 @@ class FreeObject(object): setattr(self, k, v) class Thread(baseThread): + """thread class flavor for gsyncd + - always a daemon thread + - force exit for whole program if thread + function coughs up an exception + """ def __init__(self, *a, **kw): tf = kw.get('target') if tf: -- cgit