Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--   geo-replication/syncdaemon/master.py   611
1 file changed, 148 insertions, 463 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 58df14954..95810a61e 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -17,7 +17,8 @@ from datetime import datetime from gconf import gconf from tempfile import mkdtemp, NamedTemporaryFile from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \ - unescape, select, gauxpfx, md5hex, selfkill, entry2pb + unescape, select, gauxpfx, md5hex, selfkill, entry2pb, \ + lstat, errno_wrap URXTIME = (-1, 0) @@ -40,11 +41,7 @@ def _volinfo_hook_relax_foreign(self): expiry) time.sleep(expiry) volinfo_sys = self.get_sys_volinfo() - self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state, - volinfo_sys) - if self.inter_master: - raise GsyncdError("cannot be intermediate master in special mode") - return (volinfo_sys, state_change) + return volinfo_sys # The API! @@ -133,10 +130,7 @@ class NormalMixin(object): return (vi, gap) def volinfo_hook(self): - volinfo_sys = self.get_sys_volinfo() - self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state, - volinfo_sys) - return (volinfo_sys, state_change) + return self.get_sys_volinfo() def xtime_reversion_hook(self, path, xtl, xtr): if xtr > xtl: @@ -147,6 +141,7 @@ class NormalMixin(object): def set_slave_xtime(self, path, mark): self.slave.server.set_xtime(path, self.uuid, mark) + self.slave.server.set_xtime_remote(path, self.uuid, mark) class PartialMixin(NormalMixin): """a variant tuned towards operation with a master @@ -155,7 +150,7 @@ class PartialMixin(NormalMixin): def xtime_reversion_hook(self, path, xtl, xtr): pass -class WrapupMixin(NormalMixin): +class RecoverMixin(NormalMixin): """a variant that differs from normal in terms of ignoring non-indexed files""" @@ -166,142 +161,12 @@ class WrapupMixin(NormalMixin): if not 'default_xtime' in opts: opts['default_xtime'] = URXTIME - @staticmethod def keepalive_payload_hook(self, timo, gap): return (None, gap) def volinfo_hook(self): return _volinfo_hook_relax_foreign(self) -class BlindMixin(object): - """Geo-rep flavor using vectored xtime. - - Coordinates are the master, slave uuid pair; - in master coordinate behavior is normal, - in slave coordinate we force synchronization - on any value difference (these are in disjunctive - relation, ie. if either orders the entry to be - synced, it shall be synced. 
- """ - - minus_infinity = (URXTIME, None) - - @staticmethod - def serialize_xtime(xt): - a = [] - for x in xt: - if not x: - x = ('None', '') - a.extend(x) - return '.'.join(str(n) for n in a) - - @staticmethod - def deserialize_xtime(xt): - a = xt.split(".") - a = (tuple(a[0:2]), tuple(a[3:4])) - b = [] - for p in a: - if p[0] == 'None': - p = None - else: - p = tuple(int(x) for x in p) - b.append(p) - return tuple(b) - - @staticmethod - def native_xtime(xt): - return xt[0] - - @staticmethod - def xtime_geq(xt0, xt1): - return (not xt1[0] or xt0[0] >= xt1[0]) and \ - (not xt1[1] or xt0[1] >= xt1[1]) - - @property - def ruuid(self): - if self.volinfo_r: - return self.volinfo_r['uuid'] - - @staticmethod - def make_xtime_opts(is_master, opts): - if not 'create' in opts: - opts['create'] = is_master - if not 'default_xtime' in opts: - opts['default_xtime'] = URXTIME - - def xtime_low(self, server, path, **opts): - xtd = server.xtime_vec(path, self.uuid, self.ruuid) - if isinstance(xtd, int): - return xtd - xt = (xtd[self.uuid], xtd[self.ruuid]) - if not xt[1] and (not xt[0] or xt[0] < self.volmark): - if opts['create']: - # not expected, but can happen if file originates - # from interrupted gsyncd transfer - logging.warn('have to fix up missing xtime on ' + path) - xt0 = _xtime_now() - server.aggregated.set_xtime(path, self.uuid, xt0) - else: - xt0 = opts['default_xtime'] - xt = (xt0, xt[1]) - return xt - - @staticmethod - def keepalive_payload_hook(self, timo, gap): - return (None, gap) - - def volinfo_hook(self): - res = _volinfo_hook_relax_foreign(self) - volinfo_r_new = self.slave.server.aggregated.native_volume_info() - if volinfo_r_new['retval']: - raise GsyncdError("slave is corrupt") - if getattr(self, 'volinfo_r', None): - if self.volinfo_r['uuid'] != volinfo_r_new['uuid']: - raise GsyncdError("uuid mismatch on slave") - self.volinfo_r = volinfo_r_new - return res - - def xtime_reversion_hook(self, path, xtl, xtr): - if not isinstance(xtr[0], int) and \ - (isinstance(xtl[0], int) or xtr[0] > xtl[0]): - raise GsyncdError("timestamp corruption for " + path) - - def need_sync(self, e, xte, xtrd): - if xte[0]: - if not xtrd[0] or xte[0] > xtrd[0]: - # there is outstanding diff at 0th pos, - # we can short-cut to true - return True - # we arrived to this point by either of these - # two possiblilites: - # - no outstanding difference at 0th pos, - # wanna see 1st pos if he raises veto - # against "no need to sync" proposal - # - no data at 0th pos, 1st pos will have - # to decide (due to xtime assignment, - # in this case 1st pos does carry data - # -- iow, if 1st pos did not have data, - # and 0th neither, 0th would have been - # force-feeded) - if not xte[1]: - # no data, no veto - return False - # the hard work: for 1st pos, - # the conduct is fetch corresponding - # slave data and do a "blind" comparison - # (ie. 
do not care who is newer, we trigger - # sync on non-identical xitmes) - xtr = self.xtime(e, self.slave) - return isinstance(xtr, int) or xte[1] != xtr[1] - - def set_slave_xtime(self, path, mark): - xtd = {} - for (u, t) in zip((self.uuid, self.ruuid), mark): - if t: - xtd[u] = t - self.slave.server.set_xtime_vec(path, xtd) - - # Further mixins for certain tunable behaviors class SendmarkNormalMixin(object): @@ -355,13 +220,6 @@ class GMasterCommon(object): if self.volinfo: return self.volinfo['volume_mark'] - @property - def inter_master(self): - """decide if we are an intermediate master - in a cascading setup - """ - return self.volinfo_state[self.KFGN] and True or False - def xtime(self, path, *a, **opts): """get amended xtime @@ -379,7 +237,13 @@ class GMasterCommon(object): return self.xtime_low(rsc.server, path, **opts) def get_initial_crawl_data(self): - default_data = {'sync_time': 0, 'files_synced': 0, 'bytes_synced': 0} + # while persisting only 'files_syncd' is non-zero, rest of + # the stats are nulls. lets keep it that way in case they + # are needed to be used some day... + default_data = {'files_syncd': 0, + 'files_remaining': 0, + 'bytes_remaining': 0, + 'purges_remaining': 0} if getattr(gconf, 'state_detail_file', None): try: return json.load(open(gconf.state_detail_file)) @@ -392,7 +256,6 @@ class GMasterCommon(object): return default_data else: raise - return default_data def update_crawl_data(self): @@ -421,17 +284,11 @@ class GMasterCommon(object): self.crawls = 0 self.turns = 0 self.total_turns = int(gconf.turns) + self.crawl_start = datetime.now() self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0} - self.crawl_stats = {'sync_time': 0, 'last_synctime': 0, 'crawl_starttime': 0, - 'crawl_time': 0, 'files_synced': 0, 'bytes_synced' :0} - self.total_crawl_stats = self.get_initial_crawl_data() + self.total_crawl_stats = None self.start = None self.change_seen = None - # the authoritative (foreign, native) volinfo pair - # which lets us deduce what to do when we refetch - # the volinfos from system - uuid_preset = getattr(gconf, 'volume_id', None) - self.volinfo_state = (uuid_preset and {'uuid': uuid_preset}, None) # the actual volinfo we make use of self.volinfo = None self.terminate = False @@ -450,34 +307,6 @@ class GMasterCommon(object): t = Thread(target=keep_alive) t.start() - def volinfo_query(self): - """volume info state machine""" - volinfo_sys, state_change = self.volinfo_hook() - if self.inter_master: - self.volinfo = volinfo_sys[self.KFGN] - else: - self.volinfo = volinfo_sys[self.KNAT] - if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master): - logging.info('new master is %s', self.uuid) - if self.volinfo: - logging.info("%s master with volume id %s ..." % \ - (self.inter_master and "intermediate" or "primary", - self.uuid)) - if state_change == self.KFGN: - gconf.configinterface.set('volume_id', self.uuid) - if self.volinfo: - if self.volinfo['retval']: - raise GsyncdError ("master is corrupt") - self.start_checkpoint_thread() - else: - if should_display_info or self.crawls == 0: - if self.inter_master: - logging.info("waiting for being synced from %s ..." % \ - self.volinfo_state[self.KFGN]['uuid']) - else: - logging.info("waiting for volume info ...") - return True - def should_crawl(cls): return (gconf.glusterd_uuid in cls.master.server.node_uuid()) @@ -490,26 +319,38 @@ class GMasterCommon(object): # for a passive gsyncd (ie. in a replicate scenario) # the keepalive thread would keep the connection alive. 
self.init_keep_alive() - self.lastreport['time'] = time.time() - self.crawl_stats['crawl_starttime'] = datetime.now() + # no need to maintain volinfo state machine. + # in a cascading setup, each geo-replication session is + # independent (ie. 'volume-mark' and 'xtime' are not + # propogated). This is beacuse the slave's xtime is now + # stored on the master itself. 'volume-mark' just identifies + # that we are in a cascading setup and need to enable + # 'geo-replication.ignore-pid-check' option. + volinfo_sys = self.volinfo_hook() + self.volinfo = volinfo_sys[self.KNAT] + inter_master = volinfo_sys[self.KFGN] + logging.info("%s master with volume id %s ..." % \ + (inter_master and "intermediate" or "primary", + self.uuid)) + gconf.configinterface.set('volume_id', self.uuid) + if self.volinfo: + if self.volinfo['retval']: + raise GsyncdError("master is corrupt") + self.start_checkpoint_thread() + else: + raise GsyncdError("master volinfo unavailable") + self.total_crawl_stats = self.get_initial_crawl_data() + self.lastreport['time'] = time.time() logging.info('crawl interval: %d seconds' % self.sleep_interval) + t0 = time.time() crawl = self.should_crawl() while not self.terminate: - if self.volinfo_query(): - continue - t1 = time.time() - if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds - crawl = self.should_crawl() - t0 = t1 - if not crawl: - time.sleep(5) - continue if self.start: logging.debug("... crawl #%d done, took %.6f seconds" % \ (self.crawls, time.time() - self.start)) - self.start = t1 + self.start = time.time() should_display_info = self.start - self.lastreport['time'] >= 60 if should_display_info: logging.info("%d crawls, %d turns", @@ -518,6 +359,13 @@ class GMasterCommon(object): self.lastreport.update(crawls = self.crawls, turns = self.turns, time = self.start) + t1 = time.time() + if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds + crawl = self.should_crawl() + t0 = t1 + if not crawl: + time.sleep(5) + continue self.crawl() if oneshot: return @@ -555,17 +403,13 @@ class GMasterCommon(object): return ts def get_extra_info(self): - str_info = "\nFilesSynced=%d;" % (self.crawl_stats['files_synced']) - str_info += "BytesSynced=%s;" % (self.crawl_stats['bytes_synced']) - - self.crawl_stats['crawl_time'] = datetime.now() - self.crawl_stats['crawl_starttime'] - - str_info += "Uptime=%s;" % (self._crawl_time_format(self.crawl_stats['crawl_time'])) - str_info += "SyncTime=%s;" % (self.crawl_stats['sync_time']) - str_info += "TotalSyncTime=%s;" % (self.total_crawl_stats['sync_time']) - str_info += "TotalFilesSynced=%d;" % (self.total_crawl_stats['files_synced']) - str_info += "TotalBytesSynced=%s;" % (self.total_crawl_stats['bytes_synced']) - str_info += "\0" + str_info = '\nUptime=%s;FilesSyncd=%d;FilesPending=%d;BytesPending=%d;DeletesPending=%d;' % \ + (self._crawl_time_format(datetime.now() - self.crawl_start), \ + self.total_crawl_stats['files_syncd'], \ + self.total_crawl_stats['files_remaining'], \ + self.total_crawl_stats['bytes_remaining'], \ + self.total_crawl_stats['purges_remaining']) + str_info += '\0' logging.debug(str_info) return str_info @@ -714,46 +558,6 @@ class GMasterCommon(object): self.slave.server.setattr(path, adct) self.set_slave_xtime(path, mark) - @staticmethod - def volinfo_state_machine(volinfo_state, volinfo_sys): - """compute new volinfo_state from old one and incoming - as of current system state, also indicating if there was a - change regarding which volume mark is the authoritative one - - @volinfo_state, @volinfo_sys 
are pairs of volume mark dicts - (foreign, native). - - Note this method is marked as static, ie. the computation is - pure, without reliance on any excess implicit state. State - transitions which are deemed as ambiguous or banned will raise - an exception. - - """ - # store the value below "boxed" to emulate proper closures - # (variables of the enclosing scope are available inner functions - # provided they are no reassigned; mutation is OK). - param = FreeObject(relax_mismatch = False, state_change = None, index=-1) - def select_vi(vi0, vi): - param.index += 1 - if vi and (not vi0 or vi0['uuid'] == vi['uuid']): - if not vi0 and not param.relax_mismatch: - param.state_change = param.index - # valid new value found; for the rest, we are graceful about - # uuid mismatch - param.relax_mismatch = True - return vi - if vi0 and vi and vi0['uuid'] != vi['uuid'] and not param.relax_mismatch: - # uuid mismatch for master candidate, bail out - raise GsyncdError("aborting on uuid change from %s to %s" % \ - (vi0['uuid'], vi['uuid'])) - # fall back to old - return vi0 - newstate = tuple(select_vi(*vip) for vip in zip(volinfo_state, volinfo_sys)) - srep = lambda vi: vi and vi['uuid'][0:8] - logging.debug('(%s, %s) << (%s, %s) -> (%s, %s)' % \ - tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate)) - return newstate, param.state_change - class GMasterChangelogMixin(GMasterCommon): """ changelog based change detection and syncing """ @@ -776,6 +580,9 @@ class GMasterChangelogMixin(GMasterCommon): # flat directory heirarchy for gfid based access FLAT_DIR_HIERARCHY = '.' + # maximum retries per changelog before giving up + MAX_RETRIES = 10 + def fallback_xsync(self): logging.info('falling back to xsync mode') gconf.configinterface.set('change-detector', 'xsync') @@ -787,15 +594,11 @@ class GMasterChangelogMixin(GMasterCommon): logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile)) return (workdir, logfile) - def lstat(self, e): - try: - return os.lstat(e) - except (IOError, OSError): - ex = sys.exc_info()[1] - if ex.errno == ENOENT: - return ex.errno - else: - raise + # update stats from *this* crawl + def update_cumulative_stats(self, files_pending): + self.total_crawl_stats['files_remaining'] = files_pending['count'] + self.total_crawl_stats['bytes_remaining'] = files_pending['bytes'] + self.total_crawl_stats['purges_remaining'] = files_pending['purge'] # sync data def syncdata(self, datas): @@ -803,43 +606,31 @@ class GMasterChangelogMixin(GMasterCommon): for data in datas: logging.debug('candidate for syncing %s' % data) pb = self.syncer.add(data) - timeA = datetime.now() def regjob(se, xte, pb): rv = pb.wait() if rv[0]: logging.debug('synced ' + se) - # update stats - timeB = datetime.now() - self.crawl_stats['last_synctime'] = timeB - timeA - self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) - self.crawl_stats['files_synced'] += 1 - self.crawl_stats['bytes_synced'] += self.syncer.bytes_synced - - # cumulative statistics - self.total_crawl_stats['bytes_synced'] += self.syncer.bytes_synced - self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) - self.total_crawl_stats['files_synced'] += 1 return True else: if rv[1] in [23, 24]: # stat to check if the file exist - st = self.lstat(se) + st = lstat(se) if isinstance(st, int): # file got unlinked in the interim return True logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1])) self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', 
regjob, data, None, pb) if self.wait(self.FLAT_DIR_HIERARCHY, None): - self.update_crawl_data() return True - def process_change(self, change, done): + def process_change(self, change, done, retry): + pfx = gauxpfx() clist = [] entries = [] - purges = set() - links = set() datas = set() - pfx = gauxpfx() + + # basic crawl stats: files and bytes + files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []} try: f = open(change, "r") clist = f.readlines() @@ -860,6 +651,27 @@ class GMasterChangelogMixin(GMasterCommon): else: dct[k] = ed[k] return dct + + # regular file update: bytes & count + def _update_reg(entry, size): + if not entry in files_pending['files']: + files_pending['count'] += 1 + files_pending['bytes'] += size + files_pending['files'].append(entry) + # updates for directories, symlinks etc.. + def _update_rest(): + files_pending['count'] += 1 + + # entry count + def entry_update(entry, size, mode): + if stat.S_ISREG(mode): + _update_reg(entry, size) + else: + _update_rest() + # purge count + def purge_update(): + files_pending['purge'] += 1 + for e in clist: e = e.strip() et = e[self.IDX_START:self.IDX_END] @@ -870,51 +682,75 @@ class GMasterChangelogMixin(GMasterCommon): gfid = ec[self.POS_GFID] # definitely need a better way bucketize entry ops if ty in ['UNLINK', 'RMDIR']: - entries.append(edct(ty, gfid=gfid, entry=en)) - purges.update([os.path.join(pfx, gfid)]) - continue - if not ty == 'RENAME': - go = os.path.join(pfx, gfid) - st = self.lstat(go) - if isinstance(st, int): + purge_update() + entries.append(edct(ty, gfid=gfid, entry=en)) + continue + go = os.path.join(pfx, gfid) + st = lstat(go) + if isinstance(st, int): + if ty == 'RENAME': + entries.append(edct('UNLINK', gfid=gfid, entry=en)) + else: logging.debug('file %s got purged in the interim' % go) - continue + continue + entry_update(go, st.st_size, st.st_mode) if ty in ['CREATE', 'MKDIR', 'MKNOD']: entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) elif ty == 'LINK': entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) - links.update([os.path.join(pfx, gfid)]) elif ty == 'SYMLINK': - entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=os.readlink(en))) + rl = errno_wrap(os.readlink, [en], [ENOENT]) + if isinstance(rl, int): + continue + entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) elif ty == 'RENAME': e2 = unescape(os.path.join(pfx, ec[self.POS_ENTRY2])) - entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2)) + entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2, stat=st)) else: - pass + logging.warn('ignoring %s [op %s]' % (gfid, ty)) elif et in self.TYPE_GFID: - da = os.path.join(pfx, ec[0]) - st = self.lstat(da) + go = os.path.join(pfx, ec[0]) + st = lstat(go) if isinstance(st, int): - logging.debug('file %s got purged in the interim' % da) + logging.debug('file %s got purged in the interim' % go) continue - datas.update([da]) + entry_update(go, st.st_size, st.st_mode) + datas.update([go]) logging.debug('entries: %s' % repr(entries)) + if not retry: + self.update_cumulative_stats(files_pending) # sync namespace if (entries): self.slave.server.entry_ops(entries) # sync data - if self.syncdata(datas - (purges - links)): + if self.syncdata(datas): if done: self.master.server.changelog_done(change) return True + def sync_done(self): + self.total_crawl_stats['files_syncd'] += self.total_crawl_stats['files_remaining'] + self.total_crawl_stats['files_remaining'] = 0 + self.total_crawl_stats['bytes_remaining'] = 0 + self.total_crawl_stats['purges_remaining'] = 0 + 
self.update_crawl_data() + def process(self, changes, done=1): for change in changes: - times = 0 + tries = 0 + retry = False while True: - times += 1 - logging.debug('processing change %s [%d time(s)]' % (change, times)) - if self.process_change(change, done): + logging.debug('processing change %s' % change) + if self.process_change(change, done, retry): + self.sync_done() + break + retry = True + tries += 1 + if tries == self.MAX_RETRIES: + logging.warn('changelog %s could not be processed - moving on...' % os.path.basename(change)) + self.sync_done() + if done: + self.master.server.changelog_done(change) break # it's either entry_ops() or Rsync that failed to do it's # job. Mostly it's entry_ops() [which currently has a problem @@ -928,7 +764,7 @@ class GMasterChangelogMixin(GMasterCommon): self.turns += 1 def upd_stime(self, stime): - if stime: + if not stime == URXTIME: self.sendmark(self.FLAT_DIR_HIERARCHY, stime) def crawl(self): @@ -1031,7 +867,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): for e in dem: bname = e e = os.path.join(path, e) - st = self.lstat(e) + st = lstat(e) if isinstance(st, int): logging.warn('%s got purged in the interim..' % e) continue @@ -1048,178 +884,27 @@ class GMasterXsyncMixin(GMasterChangelogMixin): if stat.S_ISDIR(mo): self.write_entry_change("E", [gfid, 'MKDIR', escape(os.path.join(pargfid, bname))]) self.crawl(e, xtr) - elif stat.S_ISREG(mo): - self.write_entry_change("E", [gfid, 'CREATE', escape(os.path.join(pargfid, bname))]) - self.write_entry_change("D", [gfid]) elif stat.S_ISLNK(mo): - self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))]) + rl = errno_wrap(os.readlink, [en], [ENOENT]) + if isinstance(rl, int): + continue + self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname)), rl]) else: - logging.info('ignoring %s' % e) + # if a file has a hardlink, create a Changelog entry as 'LINK' so the slave + # side will decide if to create the new entry, or to create link. + if st.st_nlink == 1: + self.write_entry_change("E", [gfid, 'MKNOD', escape(os.path.join(pargfid, bname))]) + else: + self.write_entry_change("E", [gfid, 'LINK', escape(os.path.join(pargfid, bname))]) + if stat.S_ISREG(mo): + self.write_entry_change("D", [gfid]) + if path == '.': logging.info('processing xsync changelog %s' % self.fname()) self.close() self.process([self.fname()], done) self.upd_stime(xtl) -class GMasterXtimeMixin(GMasterCommon): - """ xtime based change detection and syncing """ - - def register(self): - pass - - def crawl(self, path='.', xtl=None): - """crawling... - - Standing around - All the right people - Crawling - Tennis on Tuesday - The ladder is long - It is your nature - You've gotta suntan - Football on Sunday - Society boy - - Recursively walk the master side tree and check if updates are - needed due to xtime differences. One invocation of crawl checks - children of @path and do a recursive enter only on - those directory children where there is an update needed. - - Way of updates depend on file type: - - for symlinks, sync them directy and synchronously - - for regular children, register jobs for @path (cf. .add_job) to start - and wait on their rsync - - for directory children, register a job for @path which waits (.wait) - on jobs for the given child - (other kind of filesystem nodes are not considered) - - Those slave side children which do not exist on master are simply - purged (see Server.purge). 
- - Behavior is fault tolerant, synchronization is adaptive: if some action fails, - just go on relentlessly, adding a fail job (see .add_failjob) which will prevent - the .sendmark on @path, so when the next crawl will arrive to @path it will not - see it as up-to-date and will try to sync it again. While this semantics can be - supported by funky design principles (http://c2.com/cgi/wiki?LazinessImpatienceHubris), - the ultimate reason which excludes other possibilities is simply transience: we cannot - assert that the file systems (master / slave) underneath do not change and actions - taken upon some condition will not lose their context by the time they are performed. - """ - logging.debug("entering " + path) - if not xtl: - xtl = self.xtime(path) - if isinstance(xtl, int): - self.add_failjob(path, 'no-local-node') - return - xtr = self.xtime(path, self.slave) - if isinstance(xtr, int): - if xtr != ENOENT: - self.slave.server.purge(path) - try: - self.slave.server.mkdir(path) - except OSError: - self.add_failjob(path, 'no-remote-node') - return - xtr = self.minus_infinity - else: - self.xtime_reversion_hook(path, xtl, xtr) - if xtl == xtr: - if path == '.' and self.change_seen: - self.turns += 1 - self.change_seen = False - if self.total_turns: - logging.info("finished turn #%s/%s" % \ - (self.turns, self.total_turns)) - if self.turns == self.total_turns: - logging.info("reached turn limit") - self.terminate = True - return - if path == '.': - self.change_seen = True - try: - dem = self.master.server.entries(path) - except OSError: - self.add_failjob(path, 'local-entries-fail') - return - random.shuffle(dem) - try: - des = self.slave.server.entries(path) - except OSError: - self.slave.server.purge(path) - try: - self.slave.server.mkdir(path) - des = self.slave.server.entries(path) - except OSError: - self.add_failjob(path, 'remote-entries-fail') - return - dd = set(des) - set(dem) - if dd: - self.purge_missing(path, dd) - chld = [] - for e in dem: - e = os.path.join(path, e) - xte = self.xtime(e) - if isinstance(xte, int): - logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte])) - elif self.need_sync(e, xte, xtr): - chld.append((e, xte)) - def indulgently(e, fnc, blame=None): - if not blame: - blame = path - try: - return fnc(e) - except (IOError, OSError): - ex = sys.exc_info()[1] - if ex.errno == ENOENT: - logging.warn("salvaged ENOENT for " + e) - self.add_failjob(blame, 'by-indulgently') - return False - else: - raise - for e, xte in chld: - st = indulgently(e, lambda e: os.lstat(e)) - if st == False: - continue - - mo = st.st_mode - adct = {'own': (st.st_uid, st.st_gid)} - if stat.S_ISLNK(mo): - if indulgently(e, lambda e: self.slave.server.symlink(os.readlink(e), e)) == False: - continue - self.sendmark(e, xte, adct) - elif stat.S_ISREG(mo): - logging.debug("syncing %s ..." 
% e) - pb = self.syncer.add(e) - timeA = datetime.now() - def regjob(e, xte, pb): - if pb.wait()[0]: - logging.debug("synced " + e) - self.sendmark_regular(e, xte) - # update stats - timeB = datetime.now() - self.crawl_stats['last_synctime'] = timeB - timeA - self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) - self.crawl_stats['files_synced'] += 1 - self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) - self.total_crawl_stats['files_synced'] += 1 - self.update_crawl_data() - return True - else: - logging.warn("failed to sync " + e) - self.add_job(path, 'reg', regjob, e, xte, pb) - elif stat.S_ISDIR(mo): - adct['mode'] = mo - if indulgently(e, lambda e: (self.add_job(path, 'cwait', self.wait, e, xte, adct), - self.crawl(e, xte), - True)[-1], blame=e) == False: - continue - else: - # ignore fifos, sockets and special files - pass - if path == '.': - self.wait(path, xtl) - - class BoxClosedErr(Exception): pass |
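Reviewer sketch: the patch retires the private GMasterChangelogMixin.lstat() helper and imports lstat and errno_wrap from syncdutils instead. The syncdutils implementations are outside this diff, so the following is only a minimal sketch consistent with the call sites above (st = lstat(go) returning ENOENT as an int, and errno_wrap(os.readlink, [en], [ENOENT])), not the actual library code:

    import os
    from errno import ENOENT

    def errno_wrap(call, args=[], ignore_errno=[]):
        # Invoke call(*args); if it fails with an errno listed in
        # ignore_errno, return that errno instead of raising.
        try:
            return call(*args)
        except (IOError, OSError) as ex:
            if ex.errno in ignore_errno:
                return ex.errno
            raise

    def lstat(e):
        # os.lstat() variant that returns ENOENT (an int) when the path
        # vanished in the interim, mirroring the removed private method.
        return errno_wrap(os.lstat, [e], [ENOENT])

Callers therefore test isinstance(st, int) to distinguish a vanished file from a real stat result, exactly as the new process_change() code does.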
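The new statistics protocol is two-phase: on the first (retry == False) pass over a changelog, process_change() tallies outstanding work into files_pending and publishes it via update_cumulative_stats(); once the changelog is fully processed (or abandoned), sync_done() folds the pending counters into files_syncd and persists them. A self-contained walk-through with hypothetical sample numbers:

    # Sketch of the counter flow introduced by this patch.
    total = {'files_syncd': 0, 'files_remaining': 0,
             'bytes_remaining': 0, 'purges_remaining': 0}

    def update_cumulative_stats(files_pending):
        total['files_remaining'] = files_pending['count']
        total['bytes_remaining'] = files_pending['bytes']
        total['purges_remaining'] = files_pending['purge']

    def sync_done():
        total['files_syncd'] += total['files_remaining']
        total['files_remaining'] = 0
        total['bytes_remaining'] = 0
        total['purges_remaining'] = 0

    # First pass over one changelog: 3 files (4 KiB) pending, 1 purge.
    update_cumulative_stats({'count': 3, 'bytes': 4096, 'purge': 1})
    sync_done()    # changelog fully processed
    assert total == {'files_syncd': 3, 'files_remaining': 0,
                     'bytes_remaining': 0, 'purges_remaining': 0}

This is also why only files_syncd is persisted as non-zero in get_initial_crawl_data(): the three *_remaining counters describe the changelog currently in flight, not history.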
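The reworked process() loop bounds the damage one bad changelog can do: each changelog gets at most MAX_RETRIES (10) attempts, after which it is acknowledged via changelog_done() and skipped so the session keeps moving. Condensed control flow for a single changelog (callables stand in for the real methods; a sketch, not a drop-in replacement):

    MAX_RETRIES = 10    # per-changelog cap introduced by this patch

    def process_one(change, process_change, sync_done, ack):
        # One changelog's life cycle inside process() (sketch).
        tries, retry = 0, False
        while True:
            if process_change(change, retry):
                sync_done()          # fold pending counters into FilesSyncd
                return True
            retry = True             # stats are tallied on the first pass only
            tries += 1
            if tries == MAX_RETRIES:
                sync_done()          # give up, but keep the books consistent
                ack(change)          # changelog_done(): do not reprocess forever
                return False

Threading retry through to process_change() keeps update_cumulative_stats() from double-counting the same pending files on every retry.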
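In the xsync (full-crawl) path the patch changes how plain files are recorded: an entry with st_nlink == 1 becomes a MKNOD record, an entry with more links becomes a LINK record so the slave can decide whether to create a fresh inode or a hard link, and regular files additionally get a D (data) record for rsync. A small decision-table sketch of that branch:

    import stat

    def classify_xsync_entry(st_mode, st_nlink):
        # Mirror the patch's record choices ('E' = entry op, 'D' = data op).
        if stat.S_ISDIR(st_mode):
            return [('E', 'MKDIR')]      # and recurse into the directory
        if stat.S_ISLNK(st_mode):
            return [('E', 'SYMLINK')]    # target read via errno_wrap(os.readlink, ...)
        ops = [('E', 'MKNOD' if st_nlink == 1 else 'LINK')]
        if stat.S_ISREG(st_mode):
            ops.append(('D', None))      # file data still goes through rsync
        return ops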
