From 950371be29d029179ac5cd0ad2dfdbfcd4467b96 Mon Sep 17 00:00:00 2001
From: Avra Sengupta
Date: Mon, 27 May 2013 22:23:57 +0530
Subject: move 'xlators/marker/utils/' to 'geo-replication/' directory

Change-Id: Ibd0faefecc15b6713eda28bc96794ae58aff45aa
BUG: 847839
Original Author: Amar Tumballi
Signed-off-by: Avra Sengupta
Reviewed-on: http://review.gluster.org/5133
Tested-by: Gluster Build System
Reviewed-by: Vijay Bellur
---
 xlators/features/marker/utils/syncdaemon/master.py | 961 ---------------------
 1 file changed, 961 deletions(-)
 delete mode 100644 xlators/features/marker/utils/syncdaemon/master.py

(limited to 'xlators/features/marker/utils/syncdaemon/master.py')

diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py
deleted file mode 100644
index f903f3059..000000000
--- a/xlators/features/marker/utils/syncdaemon/master.py
+++ /dev/null
@@ -1,961 +0,0 @@
-import os
-import sys
-import time
-import stat
-import random
-import signal
-import logging
-import socket
-import errno
-import re
-from errno import ENOENT, ENODATA, EPIPE
-from threading import currentThread, Condition, Lock
-from datetime import datetime
-try:
-    from hashlib import md5 as md5
-except ImportError:
-    # py 2.4
-    from md5 import new as md5
-
-from gconf import gconf
-from syncdutils import FreeObject, Thread, GsyncdError, boolify, \
-     escape, unescape, select
-
-URXTIME = (-1, 0)
-
-# Utility functions that help us get into closer proximity
-# with the DRY principle (no, don't look for elevated or
-# perspectivistic things here)
-
-def _xtime_now():
-    t = time.time()
-    sec = int(t)
-    nsec = int((t - sec) * 1000000)
-    return (sec, nsec)
-
-def _volinfo_hook_relax_foreign(self):
-    volinfo_sys = self.get_sys_volinfo()
-    fgn_vi = volinfo_sys[self.KFGN]
-    if fgn_vi:
-        expiry = fgn_vi['timeout'] - int(time.time()) + 1
-        logging.info('foreign volume info found, waiting %d sec for expiry' % \
-                     expiry)
-        time.sleep(expiry)
-        volinfo_sys = self.get_sys_volinfo()
-        self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state,
-                                                                      volinfo_sys)
-        if self.inter_master:
-            raise GsyncdError("cannot be intermediate master in special mode")
-    return (volinfo_sys, state_change)
-
-
-# The API!
-
-def gmaster_builder():
-    """produce the GMaster class variant corresponding
-    to sync mode"""
-    this = sys.modules[__name__]
-    modemixin = gconf.special_sync_mode
-    if not modemixin:
-        modemixin = 'normal'
-    logging.info('setting up master for %s sync mode' % modemixin)
-    modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
-    sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
-    purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
-    class _GMaster(GMasterBase, modemixin, sendmarkmixin, purgemixin):
-        pass
-    return _GMaster
-
-
-# Mixin classes that implement the data format
-# and logic particularities of the various
-# sync modes
-
-class NormalMixin(object):
-    """normal geo-rep behavior"""
-
-    minus_infinity = URXTIME
-
-    # the following staticmethods would ideally be
-    # methods of an xtime object (in particular,
-    # implementing the hooks needed for comparison
-    # operators), but at this point we don't yet
-    # have a dedicated xtime class
-
-    @staticmethod
-    def serialize_xtime(xt):
-        return "%d.%d" % tuple(xt)
-
-    @staticmethod
-    def deserialize_xtime(xt):
-        return tuple(int(x) for x in xt.split("."))
-
-    @staticmethod
-    def native_xtime(xt):
-        return xt
-
-    @staticmethod
-    def xtime_geq(xt0, xt1):
-        return xt0 >= xt1
-
-    def make_xtime_opts(self, is_master, opts):
-        if 'create' not in opts:
-            opts['create'] = is_master and not self.inter_master
-        if 'default_xtime' not in opts:
-            if is_master and self.inter_master:
-                opts['default_xtime'] = ENODATA
-            else:
-                opts['default_xtime'] = URXTIME
-
-    def xtime_low(self, server, path, **opts):
-        xt = server.xtime(path, self.uuid)
-        if isinstance(xt, int) and xt != ENODATA:
-            return xt
-        if xt == ENODATA or xt < self.volmark:
-            if opts['create']:
-                xt = _xtime_now()
-                server.set_xtime(path, self.uuid, xt)
-            else:
-                xt = opts['default_xtime']
-        return xt
-
-    def keepalive_payload_hook(self, timo, gap):
-        # first grab a reference, as self.volinfo
-        # can be changed in the main thread
-        vi = self.volinfo
-        if vi:
-            # then have a private copy which we can modify
-            vi = vi.copy()
-            vi['timeout'] = int(time.time()) + timo
-        else:
-            # send keep-alives more frequently to
-            # avoid a delay in announcing our volume info
-            # to slave if it becomes established in the
-            # meantime
-            gap = min(10, gap)
-        return (vi, gap)
-
-    def volinfo_hook(self):
-        volinfo_sys = self.get_sys_volinfo()
-        self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state,
-                                                                      volinfo_sys)
-        return (volinfo_sys, state_change)
-
-    def xtime_reversion_hook(self, path, xtl, xtr):
-        if xtr > xtl:
-            raise GsyncdError("timestamp corruption for " + path)
-
-    def need_sync(self, e, xte, xtrd):
-        return xte > xtrd
-
-    def set_slave_xtime(self, path, mark):
-        self.slave.server.set_xtime(path, self.uuid, mark)
-
-class WrapupMixin(NormalMixin):
-    """a variant that differs from normal in that
-    it ignores non-indexed files"""
-
-    @staticmethod
-    def make_xtime_opts(is_master, opts):
-        if 'create' not in opts:
-            opts['create'] = False
-        if 'default_xtime' not in opts:
-            opts['default_xtime'] = URXTIME
-
-    @staticmethod
-    def keepalive_payload_hook(timo, gap):
-        return (None, gap)
-
-    def volinfo_hook(self):
-        return _volinfo_hook_relax_foreign(self)
-
-class BlindMixin(object):
-    """Geo-rep flavor using vectored xtime.
-
-    Coordinates are the (master, slave) uuid pair;
-    in the master coordinate behavior is normal,
-    in the slave coordinate we force synchronization
-    on any value difference (these are in disjunctive
-    relation, i.e. if either orders the entry to be
-    synced, it shall be synced).
-    """
-
-    minus_infinity = (URXTIME, None)
-
-    @staticmethod
-    def serialize_xtime(xt):
-        a = []
-        for x in xt:
-            if not x:
-                x = ('None', '')
-            a.extend(x)
-        return '.'.join(str(n) for n in a)
-
-    @staticmethod
-    def deserialize_xtime(xt):
-        a = xt.split(".")
-        a = (tuple(a[0:2]), tuple(a[2:4]))
-        b = []
-        for p in a:
-            if p[0] == 'None':
-                p = None
-            else:
-                p = tuple(int(x) for x in p)
-            b.append(p)
-        return tuple(b)
-
-    @staticmethod
-    def native_xtime(xt):
-        return xt[0]
-
-    @staticmethod
-    def xtime_geq(xt0, xt1):
-        return (not xt1[0] or xt0[0] >= xt1[0]) and \
-               (not xt1[1] or xt0[1] >= xt1[1])
-
-    @property
-    def ruuid(self):
-        if self.volinfo_r:
-            return self.volinfo_r['uuid']
-
-    @staticmethod
-    def make_xtime_opts(is_master, opts):
-        if 'create' not in opts:
-            opts['create'] = is_master
-        if 'default_xtime' not in opts:
-            opts['default_xtime'] = URXTIME
-
-    def xtime_low(self, server, path, **opts):
-        xtd = server.xtime_vec(path, self.uuid, self.ruuid)
-        if isinstance(xtd, int):
-            return xtd
-        xt = (xtd[self.uuid], xtd[self.ruuid])
-        if not xt[1] and (not xt[0] or xt[0] < self.volmark):
-            if opts['create']:
-                # not expected, but can happen if file originates
-                # from an interrupted gsyncd transfer
-                logging.warn('have to fix up missing xtime on ' + path)
-                xt0 = _xtime_now()
-                server.set_xtime(path, self.uuid, xt0)
-            else:
-                xt0 = opts['default_xtime']
-            xt = (xt0, xt[1])
-        return xt
-
-    @staticmethod
-    def keepalive_payload_hook(timo, gap):
-        return (None, gap)
-
-    def volinfo_hook(self):
-        res = _volinfo_hook_relax_foreign(self)
-        volinfo_r_new = self.slave.server.native_volume_info()
-        if volinfo_r_new['retval']:
-            raise GsyncdError("slave is corrupt")
-        if getattr(self, 'volinfo_r', None):
-            if self.volinfo_r['uuid'] != volinfo_r_new['uuid']:
-                raise GsyncdError("uuid mismatch on slave")
-        self.volinfo_r = volinfo_r_new
-        return res
-
-    def xtime_reversion_hook(self, path, xtl, xtr):
-        if not isinstance(xtr[0], int) and \
-           (isinstance(xtl[0], int) or xtr[0] > xtl[0]):
-            raise GsyncdError("timestamp corruption for " + path)
-
-    def need_sync(self, e, xte, xtrd):
-        if xte[0]:
-            if not xtrd[0] or xte[0] > xtrd[0]:
-                # there is an outstanding diff at 0th pos,
-                # we can short-cut to true
-                return True
-        # we arrived at this point by either of these
-        # two possibilities:
-        # - no outstanding difference at 0th pos,
-        #   so we check whether the 1st pos vetoes
-        #   the "no need to sync" proposal
-        # - no data at 0th pos, so the 1st pos will have
-        #   to decide (due to xtime assignment,
-        #   in this case the 1st pos does carry data
-        #   -- iow, if the 1st pos did not have data,
-        #   and the 0th neither, the 0th would have been
-        #   force-fed)
-        if not xte[1]:
-            # no data, no veto
-            return False
-        # the hard work: for the 1st pos, fetch the
-        # corresponding slave data and do a "blind"
-        # comparison (i.e. we do not care who is newer;
-        # we trigger sync on non-identical xtimes)
-        xtr = self.xtime(e, self.slave)
-        return isinstance(xtr, int) or xte[1] != xtr[1]
-
-    def set_slave_xtime(self, path, mark):
-        xtd = {}
-        for (u, t) in zip((self.uuid, self.ruuid), mark):
-            if t:
-                xtd[u] = t
-        self.slave.server.set_xtime_vec(path, xtd)
-
-
-# Further mixins for certain tunable behaviors
-
-class SendmarkNormalMixin(object):
-
-    def sendmark_regular(self, *a, **kw):
-        return self.sendmark(*a, **kw)
-
-class SendmarkRsyncMixin(object):
-
-    def sendmark_regular(self, *a, **kw):
-        pass
-
-
-class PurgeNormalMixin(object):
-
-    def purge_missing(self, path, names):
-        self.slave.server.purge(path, names)
-
-class PurgeNoopMixin(object):
-
-    def purge_missing(self, path, names):
-        pass
-
-
-
-class GMasterBase(object):
-    """abstract class implementing the master role"""
-
-    KFGN = 0
-    KNAT = 1
-
-    def get_sys_volinfo(self):
-        """query volume marks on fs root
-
-        err out on multiple foreign masters
-        """
-        fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \
-                          self.master.server.native_volume_info()
-        fgn_vi = None
-        if fgn_vis:
-            if len(fgn_vis) > 1:
-                raise GsyncdError("cannot work with multiple foreign masters")
-            fgn_vi = fgn_vis[0]
-        return fgn_vi, nat_vi
-
-    @property
-    def uuid(self):
-        if self.volinfo:
-            return self.volinfo['uuid']
-
-    @property
-    def volmark(self):
-        if self.volinfo:
-            return self.volinfo['volume_mark']
-
-    @property
-    def inter_master(self):
-        """decide if we are an intermediate master
-        in a cascading setup
-        """
-        return self.volinfo_state[self.KFGN] and True or False
-
-    def xtime(self, path, *a, **opts):
-        """get amended xtime
-
-        As of amending, we can create a missing xtime, or
-        determine a valid value if what we get is expired
-        (as of the volume mark expiry); the way of amending
-        depends on @opts and on the subject of the query
-        (master or slave).
-        """
-        if a:
-            rsc = a[0]
-        else:
-            rsc = self.master
-        self.make_xtime_opts(rsc == self.master, opts)
-        return self.xtime_low(rsc.server, path, **opts)
-
-    def __init__(self, master, slave):
-        self.master = master
-        self.slave = slave
-        self.jobtab = {}
-        self.syncer = Syncer(slave)
-        # crawls vs. turns:
-        # - self.crawls is simply the number of crawl() invocations on root
-        # - one turn is a maximal consecutive sequence of crawls so that each
-        #   crawl in it detects a change to be synced
-        # - self.turns is the number of turns since start
-        # - self.total_turns is a limit so that if self.turns reaches it, then
-        #   we exit (for diagnostic purposes)
-        # so, e.g., if the master fs changes unceasingly, self.turns will remain 0.
-        self.crawls = 0
-        self.turns = 0
-        self.total_turns = int(gconf.turns)
-        self.lastreport = {'crawls': 0, 'turns': 0}
-        self.start = None
-        self.change_seen = None
-        self.syncTime = 0
-        self.lastSyncTime = 0
-        self.crawlStartTime = 0
-        self.crawlTime = 0
-        self.filesSynced = 0
-        self.bytesSynced = 0
-        # the authoritative (foreign, native) volinfo pair
-        # which lets us deduce what to do when we refetch
-        # the volinfos from the system
-        uuid_preset = getattr(gconf, 'volume_id', None)
-        self.volinfo_state = (uuid_preset and {'uuid': uuid_preset}, None)
-        # the actual volinfo we make use of
-        self.volinfo = None
-        self.terminate = False
-        self.checkpoint_thread = None
-
-    @classmethod
-    def _checkpt_param(cls, chkpt, prm, xtimish=True):
-        """use config backend to look up a parameter belonging to
-        checkpoint @chkpt"""
-        cprm = getattr(gconf, 'checkpoint_' + prm, None)
-        if not cprm:
-            return
-        chkpt_mapped, val = cprm.split(':', 1)
-        if unescape(chkpt_mapped) != chkpt:
-            return
-        if xtimish:
-            val = cls.deserialize_xtime(val)
-        return val
-
-    @classmethod
-    def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True):
-        """use config backend to store a parameter associated
-        with checkpoint @chkpt"""
-        if xtimish:
-            val = cls.serialize_xtime(val)
-        gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))
-
-    @staticmethod
-    def humantime(*tpair):
-        """format an xtime-like (sec, nsec) pair in human readable form"""
-        ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\
-             strftime("%Y-%m-%d %H:%M:%S")
-        if len(tpair) > 1:
-            ts += '.' + str(tpair[1])
-        return ts
-
-    def get_extra_info(self):
-        str_info = "\nFiles synced : %d" % (self.filesSynced)
-        str_info += "\nBytes Synced : %d KB" % (self.syncer.bytesSynced)
-        str_info += "\nSync Time : %f seconds" % (self.syncTime)
-        self.crawlTime = datetime.now() - self.crawlStartTime
-        years, days = divmod(self.crawlTime.days, 365.25)
-        years = int(years)
-        days = int(days)
-
-        date = ""
-        m, s = divmod(self.crawlTime.seconds, 60)
-        h, m = divmod(m, 60)
-
-        if years != 0:
-            date += str(years) + " year "
-        if days != 0:
-            date += str(days) + " day "
-        if h != 0:
-            date += str(h) + " H : "
-        if m != 0 or h != 0:
-            date += str(m) + " M : "
-
-        date += str(s) + " S"
-        self.crawlTime = date
-        str_info += "\nCrawl Time : %s" % (str(self.crawlTime))
-        str_info += "\n\0"
-        return str_info
-
-    def checkpt_service(self, chan, chkpt, tgt):
-        """checkpoint service loop
-
-        monitor and verify checkpoint status for @chkpt, and listen
-        for incoming requests for which we serve a pretty-formatted
-        status report"""
-        if not chkpt:
-            # dummy loop for the case when there is no checkpoint set
-            while True:
-                select([chan], [], [])
-                conn, _ = chan.accept()
-                conn.send(self.get_extra_info())
-                conn.close()
-        completed = self._checkpt_param(chkpt, 'completed', xtimish=False)
-        if completed:
-            completed = tuple(int(x) for x in completed.split('.'))
-        while True:
-            s, _, _ = select([chan], [], [], (not completed) and 5 or None)
-            # either a request was made and we re-check so as not to
-            # give back stale data, or we are still hunting for completion
-            if self.native_xtime(tgt) and self.native_xtime(tgt) < self.volmark:
-                # indexing has been reset since setting the checkpoint
-                status = "is invalid"
-            else:
-                xtr = self.xtime('.', self.slave)
-                if isinstance(xtr, int):
-                    raise GsyncdError("slave root directory is inaccessible (%s)" % \
-                                      os.strerror(xtr))
-                ncompleted = self.xtime_geq(xtr, tgt)
-                if completed and not ncompleted:  # stale data
-                    logging.warn("completion time %s for checkpoint %s became stale" % \
-                                 (self.humantime(*completed), chkpt))
-                    completed = None
-                    gconf.confdata.delete('checkpoint-completed')
-                if ncompleted and not completed:  # just reaching completion
-                    completed = "%.6f" % time.time()
-                    self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False)
-                    completed = tuple(int(x) for x in completed.split('.'))
-                    logging.info("checkpoint %s completed" % chkpt)
-                status = completed and \
-                         "completed at " + self.humantime(completed[0]) or \
-                         "not reached yet"
-            if s:
-                conn = None
-                try:
-                    conn, _ = chan.accept()
-                    try:
-                        conn.send(" | checkpoint %s %s %s" % (chkpt, status, self.get_extra_info()))
-                    except:
-                        exc = sys.exc_info()[1]
-                        if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \
-                           exc.errno == EPIPE:
-                            logging.debug('checkpoint client disconnected')
-                        else:
-                            raise
-                finally:
-                    if conn:
-                        conn.close()
-
-    def start_checkpoint_thread(self):
-        """prepare and start checkpoint service"""
-        if self.checkpoint_thread or not (
-            getattr(gconf, 'state_socket_unencoded', None) and getattr(gconf, 'socketdir', None)
-        ):
-            return
-        chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-        state_socket = os.path.join(gconf.socketdir, md5(gconf.state_socket_unencoded).hexdigest() + ".socket")
-        try:
-            os.unlink(state_socket)
-        except:
-            if sys.exc_info()[0] == OSError:
-                pass
-        chan.bind(state_socket)
-        chan.listen(1)
-        checkpt_tgt = None
-        if gconf.checkpoint:
-            checkpt_tgt = self._checkpt_param(gconf.checkpoint, 'target')
-            if not checkpt_tgt:
-                checkpt_tgt = self.xtime('.')
-                if isinstance(checkpt_tgt, int):
-                    raise GsyncdError("master root directory is inaccessible (%s)" % \
-                                      os.strerror(checkpt_tgt))
-                self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt)
-            logging.debug("checkpoint target %s has been determined for checkpoint %s" % \
-                          (repr(checkpt_tgt), gconf.checkpoint))
-        t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt))
-        t.start()
-        self.checkpoint_thread = t
-
-    def crawl_loop(self):
-        """start the keep-alive thread and iterate .crawl"""
-        timo = int(gconf.timeout or 0)
-        if timo > 0:
-            def keep_alive():
-                while True:
-                    vi, gap = self.keepalive_payload_hook(timo, timo * 0.5)
-                    self.slave.server.keep_alive(vi)
-                    time.sleep(gap)
-            t = Thread(target=keep_alive)
-            t.start()
-        self.lastreport['time'] = time.time()
-        self.crawlStartTime = datetime.now()
-        while not self.terminate:
-            self.crawl()
-
-    def add_job(self, path, label, job, *a, **kw):
-        """insert @job function into the job table at @path with @label"""
-        if self.jobtab.get(path) is None:
-            self.jobtab[path] = []
-        self.jobtab[path].append((label, a, lambda: job(*a, **kw)))
-
-    def add_failjob(self, path, label):
-        """invoke .add_job with a job that does nothing but fail"""
-        logging.debug('salvaged: ' + label)
-        self.add_job(path, label, lambda: False)
-
-    def wait(self, path, *args):
-        """perform jobs registered for @path
-
-        Reset the jobtab entry for @path, and
-        determine success as the conjunction of
-        the success of all the jobs. In case of
-        success, call .sendmark on @path
-        """
-        jobs = self.jobtab.pop(path, [])
-        succeed = True
-        for j in jobs:
-            ret = j[-1]()
-            if not ret:
-                succeed = False
-        if succeed:
-            self.sendmark(path, *args)
-        return succeed
-
-    def sendmark(self, path, mark, adct=None):
-        """update slave side xtime for @path to master side xtime
-
-        can also send a setattr payload (see Server.setattr).
- """ - if adct: - self.slave.server.setattr(path, adct) - self.set_slave_xtime(path, mark) - - @staticmethod - def volinfo_state_machine(volinfo_state, volinfo_sys): - """compute new volinfo_state from old one and incoming - as of current system state, also indicating if there was a - change regarding which volume mark is the authoritative one - - @volinfo_state, @volinfo_sys are pairs of volume mark dicts - (foreign, native). - - Note this method is marked as static, ie. the computation is - pure, without reliance on any excess implicit state. State - transitions which are deemed as ambiguous or banned will raise - an exception. - - """ - # store the value below "boxed" to emulate proper closures - # (variables of the enclosing scope are available inner functions - # provided they are no reassigned; mutation is OK). - param = FreeObject(relax_mismatch = False, state_change = None, index=-1) - def select_vi(vi0, vi): - param.index += 1 - if vi and (not vi0 or vi0['uuid'] == vi['uuid']): - if not vi0 and not param.relax_mismatch: - param.state_change = param.index - # valid new value found; for the rest, we are graceful about - # uuid mismatch - param.relax_mismatch = True - return vi - if vi0 and vi and vi0['uuid'] != vi['uuid'] and not param.relax_mismatch: - # uuid mismatch for master candidate, bail out - raise GsyncdError("aborting on uuid change from %s to %s" % \ - (vi0['uuid'], vi['uuid'])) - # fall back to old - return vi0 - newstate = tuple(select_vi(*vip) for vip in zip(volinfo_state, volinfo_sys)) - srep = lambda vi: vi and vi['uuid'][0:8] - logging.debug('(%s, %s) << (%s, %s) -> (%s, %s)' % \ - tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate)) - return newstate, param.state_change - - def crawl(self, path='.', xtl=None): - """crawling... - - Standing around - All the right people - Crawling - Tennis on Tuesday - The ladder is long - It is your nature - You've gotta suntan - Football on Sunday - Society boy - - Recursively walk the master side tree and check if updates are - needed due to xtime differences. One invocation of crawl checks - children of @path and do a recursive enter only on - those directory children where there is an update needed. - - Way of updates depend on file type: - - for symlinks, sync them directy and synchronously - - for regular children, register jobs for @path (cf. .add_job) to start - and wait on their rsync - - for directory children, register a job for @path which waits (.wait) - on jobs for the given child - (other kind of filesystem nodes are not considered) - - Those slave side children which do not exist on master are simply - purged (see Server.purge). - - Behavior is fault tolerant, synchronization is adaptive: if some action fails, - just go on relentlessly, adding a fail job (see .add_failjob) which will prevent - the .sendmark on @path, so when the next crawl will arrive to @path it will not - see it as up-to-date and will try to sync it again. While this semantics can be - supported by funky design principles (http://c2.com/cgi/wiki?LazinessImpatienceHubris), - the ultimate reason which excludes other possibilities is simply transience: we cannot - assert that the file systems (master / slave) underneath do not change and actions - taken upon some condition will not lose their context by the time they are performed. - """ - if path == '.': - if self.start: - self.crawls += 1 - logging.debug("... 
-                logging.debug("... crawl #%d done, took %.6f seconds" % \
-                              (self.crawls, time.time() - self.start))
-            time.sleep(1)
-            self.start = time.time()
-            should_display_info = self.start - self.lastreport['time'] >= 60
-            if should_display_info:
-                logging.info("completed %d crawls, %d turns",
-                             self.crawls - self.lastreport['crawls'],
-                             self.turns - self.lastreport['turns'])
-                self.lastreport.update(crawls=self.crawls,
-                                       turns=self.turns,
-                                       time=self.start)
-            volinfo_sys, state_change = self.volinfo_hook()
-            if self.inter_master:
-                self.volinfo = volinfo_sys[self.KFGN]
-            else:
-                self.volinfo = volinfo_sys[self.KNAT]
-            if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master):
-                logging.info('new master is %s', self.uuid)
-                if self.volinfo:
-                    logging.info("%s master with volume id %s ..." % \
-                                 (self.inter_master and "intermediate" or "primary",
-                                  self.uuid))
-            if state_change == self.KFGN:
-                gconf.configinterface.set('volume_id', self.uuid)
-            if self.volinfo:
-                if self.volinfo['retval']:
-                    raise GsyncdError("master is corrupt")
-                self.start_checkpoint_thread()
-            else:
-                if should_display_info or self.crawls == 0:
-                    if self.inter_master:
-                        logging.info("waiting to be synced from %s ..." % \
-                                     self.volinfo_state[self.KFGN]['uuid'])
-                    else:
-                        logging.info("waiting for volume info ...")
-                return
-        logging.debug("entering " + path)
-        if not xtl:
-            xtl = self.xtime(path)
-            if isinstance(xtl, int):
-                self.add_failjob(path, 'no-local-node')
-                return
-        xtr = self.xtime(path, self.slave)
-        if isinstance(xtr, int):
-            if xtr != ENOENT:
-                self.slave.server.purge(path)
-            try:
-                self.slave.server.mkdir(path)
-            except OSError:
-                self.add_failjob(path, 'no-remote-node')
-                return
-            xtr = self.minus_infinity
-        else:
-            self.xtime_reversion_hook(path, xtl, xtr)
-            if xtl == xtr:
-                if path == '.' and self.change_seen:
-                    self.turns += 1
-                    self.change_seen = False
-                    if self.total_turns:
-                        logging.info("finished turn #%s/%s" % \
-                                     (self.turns, self.total_turns))
-                        if self.turns == self.total_turns:
-                            logging.info("reached turn limit")
-                            self.terminate = True
-                return
-        if path == '.':
-            self.change_seen = True
-        try:
-            dem = self.master.server.entries(path)
-        except OSError:
-            self.add_failjob(path, 'local-entries-fail')
-            return
-        random.shuffle(dem)
-        try:
-            des = self.slave.server.entries(path)
-        except OSError:
-            self.slave.server.purge(path)
-            try:
-                self.slave.server.mkdir(path)
-                des = self.slave.server.entries(path)
-            except OSError:
-                self.add_failjob(path, 'remote-entries-fail')
-                return
-        dd = set(des) - set(dem)
-        if dd:
-            self.purge_missing(path, dd)
-        chld = []
-        for e in dem:
-            e = os.path.join(path, e)
-            xte = self.xtime(e)
-            if isinstance(xte, int):
-                logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte]))
-            elif self.need_sync(e, xte, xtr):
-                chld.append((e, xte))
-        def indulgently(e, fnc, blame=None):
-            if not blame:
-                blame = path
-            try:
-                return fnc(e)
-            except (IOError, OSError):
-                ex = sys.exc_info()[1]
-                if ex.errno == ENOENT:
-                    logging.warn("salvaged ENOENT for " + e)
-                    self.add_failjob(blame, 'by-indulgently')
-                    return False
-                else:
-                    raise
-        for e, xte in chld:
-            st = indulgently(e, lambda e: os.lstat(e))
-            if st == False:
-                continue
-            mo = st.st_mode
-            adct = {'own': (st.st_uid, st.st_gid)}
-            if stat.S_ISLNK(mo):
-                if indulgently(e, lambda e: self.slave.server.symlink(os.readlink(e), e)) == False:
-                    continue
-                self.sendmark(e, xte, adct)
-            elif stat.S_ISREG(mo):
-                logging.debug("syncing %s ..." % e)
-                pb = self.syncer.add(e)
-                timeA = datetime.now()
-                def regjob(e, xte, pb):
-                    if pb.wait():
-                        logging.debug("synced " + e)
-                        self.sendmark_regular(e, xte)
-
-                        timeB = datetime.now()
-                        self.lastSyncTime = timeB - timeA
-                        self.syncTime = (self.syncTime + self.lastSyncTime.microseconds) / (10.0 ** 6)
-                        self.filesSynced = self.filesSynced + 1
-                        return True
-                    else:
-                        logging.warn("failed to sync " + e)
-                self.add_job(path, 'reg', regjob, e, xte, pb)
-            elif stat.S_ISDIR(mo):
-                adct['mode'] = mo
-                if indulgently(e, lambda e: (self.add_job(path, 'cwait', self.wait, e, xte, adct),
-                                             self.crawl(e, xte),
-                                             True)[-1], blame=e) == False:
-                    continue
-            else:
-                # ignore fifos, sockets and special files
-                pass
-        if path == '.':
-            self.wait(path, xtl)
-
-class BoxClosedErr(Exception):
-    pass
-
-class PostBox(list):
-    """synchronized collection for storing things thought of as "requests" """
-
-    def __init__(self, *a):
-        list.__init__(self, *a)
-        # too bad the Python stdlib does not have read/write locks...
-        # it would suffice to grab the lock in .append as reader, in .close as writer
-        self.lever = Condition()
-        self.open = True
-        self.done = False
-
-    def wait(self):
-        """wait on requests to be processed"""
-        self.lever.acquire()
-        if not self.done:
-            self.lever.wait()
-        self.lever.release()
-        return self.result
-
-    def wakeup(self, data):
-        """wake up requestors with the result"""
-        self.result = data
-        self.lever.acquire()
-        self.done = True
-        self.lever.notifyAll()
-        self.lever.release()
-
-    def append(self, e):
-        """post a request"""
-        self.lever.acquire()
-        if not self.open:
-            raise BoxClosedErr
-        list.append(self, e)
-        self.lever.release()
-
-    def close(self):
-        """prohibit the posting of further requests"""
-        self.lever.acquire()
-        self.open = False
-        self.lever.release()
-
-class Syncer(object):
-    """a staged queue to relay rsync requests to rsync workers
-
-    By "staged queue" it's meant that when a consumer comes to the
-    queue, it takes _all_ entries, leaving the queue empty.
-    (I don't know if there is an official term for this pattern.)
-
-    The queue uses a PostBox to accumulate incoming items.
-    When a consumer (rsync worker) comes, a new PostBox is
-    set up and the old one is passed on to the consumer.
-
-    Instead of the simplistic scheme of having one big lock
-    which synchronizes both the addition of new items and
-    PostBox exchanges, we use a separate lock to arbitrate consumers,
-    and rely on PostBox's synchronization mechanisms to take
-    care of additions.
-
-    There is a corner case racy situation, producers vs. consumers,
-    which is not handled by this scheme: namely, when the PostBox
-    exchange occurs in between being passed to the producer for posting
-    and the post placement. But that's what PostBox.close is for:
-    such a posting will find the PostBox closed, in which case
-    the producer can re-try posting against the actual PostBox of
-    the queue.
-
-    To aid accumulation of items in the PostBoxen before they are grabbed
-    by an rsync worker, the worker goes to sleep a bit after
-    each completed syncjob.
- """ - - def __init__(self, slave): - """spawn worker threads""" - self.slave = slave - self.lock = Lock() - self.pb = PostBox() - self.bytesSynced=0 - for i in range(int(gconf.sync_jobs)): - t = Thread(target=self.syncjob) - t.start() - - def syncjob(self): - """the life of a worker""" - while True: - pb = None - while True: - self.lock.acquire() - if self.pb: - pb, self.pb = self.pb, PostBox() - self.lock.release() - if pb: - break - time.sleep(0.5) - pb.close() - po = self.slave.rsync(pb) - if po.returncode == 0: - regEx=re.search('\ *total\ *transferred\ *file\ *size:\ *(\d+)\ *bytes\ *',po.stdout.read(),re.IGNORECASE) - if regEx: - self.bytesSynced+=(int(regEx.group(1)))/1024 - ret = True - elif po.returncode in (23, 24): - # partial transfer (cf. rsync(1)), that's normal - ret = False - else: - po.errfail() - pb.wakeup(ret) - - def add(self, e): - while True: - pb = self.pb - try: - pb.append(e) - return pb - except BoxClosedErr: - pass -- cgit