From b13c483dca20e4015b958f8959328e665a357f60 Mon Sep 17 00:00:00 2001 From: Avra Sengupta Date: Sat, 1 Jun 2013 16:17:57 +0530 Subject: gsyncd: distribute the crawling load * also consume changelog for change detection. * Status fixes * Use new libgfchangelog done API * process (and sync) one changelog at a time Change-Id: I24891615bb762e0741b1819ddfdef8802326cb16 BUG: 847839 Original Author: Csaba Henk Original Author: Aravinda VK Original Author: Venky Shankar Original Author: Amar Tumballi Original Author: Avra Sengupta Signed-off-by: Avra Sengupta Reviewed-on: http://review.gluster.org/5131 Reviewed-by: Vijay Bellur Tested-by: Vijay Bellur --- geo-replication/syncdaemon/master.py | 632 ++++++++++++++++++++++++++++------- 1 file changed, 504 insertions(+), 128 deletions(-) (limited to 'geo-replication/syncdaemon/master.py') diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index f903f305..58df1495 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -4,22 +4,20 @@ import time import stat import random import signal +import json import logging import socket +import string import errno -import re -from errno import ENOENT, ENODATA, EPIPE +from shutil import copyfileobj +from errno import ENOENT, ENODATA, EPIPE, EEXIST from threading import currentThread, Condition, Lock from datetime import datetime -try: - from hashlib import md5 as md5 -except ImportError: - # py 2.4 - from md5 import new as md5 from gconf import gconf -from syncdutils import FreeObject, Thread, GsyncdError, boolify, \ - escape, unescape, select +from tempfile import mkdtemp, NamedTemporaryFile +from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \ + unescape, select, gauxpfx, md5hex, selfkill, entry2pb URXTIME = (-1, 0) @@ -51,18 +49,20 @@ def _volinfo_hook_relax_foreign(self): # The API! -def gmaster_builder(): +def gmaster_builder(excrawl=None): """produce the GMaster class variant corresponding to sync mode""" this = sys.modules[__name__] modemixin = gconf.special_sync_mode if not modemixin: modemixin = 'normal' - logging.info('setting up master for %s sync mode' % modemixin) + changemixin = isinstance(excrawl, str) and excrawl or gconf.change_detector + logging.info('setting up %s change detection mode' % changemixin) modemixin = getattr(this, modemixin.capitalize() + 'Mixin') + crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin') sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin - class _GMaster(GMasterBase, modemixin, sendmarkmixin, purgemixin): + class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin): pass return _GMaster @@ -100,12 +100,9 @@ class NormalMixin(object): def make_xtime_opts(self, is_master, opts): if not 'create' in opts: - opts['create'] = is_master and not self.inter_master + opts['create'] = is_master if not 'default_xtime' in opts: - if is_master and self.inter_master: - opts['default_xtime'] = ENODATA - else: - opts['default_xtime'] = URXTIME + opts['default_xtime'] = URXTIME def xtime_low(self, server, path, **opts): xt = server.xtime(path, self.uuid) @@ -114,7 +111,7 @@ class NormalMixin(object): if xt == ENODATA or xt < self.volmark: if opts['create']: xt = _xtime_now() - server.set_xtime(path, self.uuid, xt) + server.aggregated.set_xtime(path, self.uuid, xt) else: xt = opts['default_xtime'] return xt @@ -151,6 +148,13 @@ class NormalMixin(object): def set_slave_xtime(self, path, mark): self.slave.server.set_xtime(path, self.uuid, mark) +class PartialMixin(NormalMixin): + """a variant tuned towards operation with a master + that has partial info of the slave (brick typically)""" + + def xtime_reversion_hook(self, path, xtl, xtr): + pass + class WrapupMixin(NormalMixin): """a variant that differs from normal in terms of ignoring non-indexed files""" @@ -163,7 +167,7 @@ class WrapupMixin(NormalMixin): opts['default_xtime'] = URXTIME @staticmethod - def keepalive_payload_hook(timo, gap): + def keepalive_payload_hook(self, timo, gap): return (None, gap) def volinfo_hook(self): @@ -236,19 +240,19 @@ class BlindMixin(object): # from interrupted gsyncd transfer logging.warn('have to fix up missing xtime on ' + path) xt0 = _xtime_now() - server.set_xtime(path, self.uuid, xt0) + server.aggregated.set_xtime(path, self.uuid, xt0) else: xt0 = opts['default_xtime'] xt = (xt0, xt[1]) return xt @staticmethod - def keepalive_payload_hook(timo, gap): + def keepalive_payload_hook(self, timo, gap): return (None, gap) def volinfo_hook(self): res = _volinfo_hook_relax_foreign(self) - volinfo_r_new = self.slave.server.native_volume_info() + volinfo_r_new = self.slave.server.aggregated.native_volume_info() if volinfo_r_new['retval']: raise GsyncdError("slave is corrupt") if getattr(self, 'volinfo_r', None): @@ -321,9 +325,7 @@ class PurgeNoopMixin(object): def purge_missing(self, path, names): pass - - -class GMasterBase(object): +class GMasterCommon(object): """abstract class impementling master role""" KFGN = 0 @@ -334,8 +336,8 @@ class GMasterBase(object): err out on multiple foreign masters """ - fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \ - self.master.server.native_volume_info() + fgn_vis, nat_vi = self.master.server.aggregated.foreign_volume_infos(), \ + self.master.server.aggregated.native_volume_info() fgn_vi = None if fgn_vis: if len(fgn_vis) > 1: @@ -376,6 +378,33 @@ class GMasterBase(object): self.make_xtime_opts(rsc == self.master, opts) return self.xtime_low(rsc.server, path, **opts) + def get_initial_crawl_data(self): + default_data = {'sync_time': 0, 'files_synced': 0, 'bytes_synced': 0} + if getattr(gconf, 'state_detail_file', None): + try: + return json.load(open(gconf.state_detail_file)) + except (IOError, OSError): + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + # Create file with initial data + with open(gconf.state_detail_file, 'wb') as f: + json.dump(default_data, f) + return default_data + else: + raise + + return default_data + + def update_crawl_data(self): + if getattr(gconf, 'state_detail_file', None): + try: + same_dir = os.path.dirname(gconf.state_detail_file) + with NamedTemporaryFile(dir=same_dir, delete=False) as tmp: + json.dump(self.total_crawl_stats, tmp) + os.rename(tmp.name, gconf.state_detail_file) + except (IOError, OSError): + raise + def __init__(self, master, slave): self.master = master self.slave = slave @@ -392,15 +421,12 @@ class GMasterBase(object): self.crawls = 0 self.turns = 0 self.total_turns = int(gconf.turns) - self.lastreport = {'crawls': 0, 'turns': 0} + self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0} + self.crawl_stats = {'sync_time': 0, 'last_synctime': 0, 'crawl_starttime': 0, + 'crawl_time': 0, 'files_synced': 0, 'bytes_synced' :0} + self.total_crawl_stats = self.get_initial_crawl_data() self.start = None self.change_seen = None - self.syncTime=0 - self.lastSyncTime=0 - self.crawlStartTime=0 - self.crawlTime=0 - self.filesSynced=0 - self.bytesSynced=0 # the authoritative (foreign, native) volinfo pair # which lets us deduce what to do when we refetch # the volinfos from system @@ -409,8 +435,94 @@ class GMasterBase(object): # the actual volinfo we make use of self.volinfo = None self.terminate = False + self.sleep_interval = 1 self.checkpoint_thread = None + def init_keep_alive(cls): + """start the keep-alive thread """ + timo = int(gconf.timeout or 0) + if timo > 0: + def keep_alive(): + while True: + vi, gap = cls.keepalive_payload_hook(timo, timo * 0.5) + cls.slave.server.keep_alive(vi) + time.sleep(gap) + t = Thread(target=keep_alive) + t.start() + + def volinfo_query(self): + """volume info state machine""" + volinfo_sys, state_change = self.volinfo_hook() + if self.inter_master: + self.volinfo = volinfo_sys[self.KFGN] + else: + self.volinfo = volinfo_sys[self.KNAT] + if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master): + logging.info('new master is %s', self.uuid) + if self.volinfo: + logging.info("%s master with volume id %s ..." % \ + (self.inter_master and "intermediate" or "primary", + self.uuid)) + if state_change == self.KFGN: + gconf.configinterface.set('volume_id', self.uuid) + if self.volinfo: + if self.volinfo['retval']: + raise GsyncdError ("master is corrupt") + self.start_checkpoint_thread() + else: + if should_display_info or self.crawls == 0: + if self.inter_master: + logging.info("waiting for being synced from %s ..." % \ + self.volinfo_state[self.KFGN]['uuid']) + else: + logging.info("waiting for volume info ...") + return True + + def should_crawl(cls): + return (gconf.glusterd_uuid in cls.master.server.node_uuid()) + + def register(self): + self.register() + + def crawlwrap(self, oneshot=False): + if oneshot: + # it's important to do this during the oneshot crawl as + # for a passive gsyncd (ie. in a replicate scenario) + # the keepalive thread would keep the connection alive. + self.init_keep_alive() + self.lastreport['time'] = time.time() + self.crawl_stats['crawl_starttime'] = datetime.now() + + logging.info('crawl interval: %d seconds' % self.sleep_interval) + t0 = time.time() + crawl = self.should_crawl() + while not self.terminate: + if self.volinfo_query(): + continue + t1 = time.time() + if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds + crawl = self.should_crawl() + t0 = t1 + if not crawl: + time.sleep(5) + continue + if self.start: + logging.debug("... crawl #%d done, took %.6f seconds" % \ + (self.crawls, time.time() - self.start)) + self.start = t1 + should_display_info = self.start - self.lastreport['time'] >= 60 + if should_display_info: + logging.info("%d crawls, %d turns", + self.crawls - self.lastreport['crawls'], + self.turns - self.lastreport['turns']) + self.lastreport.update(crawls = self.crawls, + turns = self.turns, + time = self.start) + self.crawl() + if oneshot: + return + time.sleep(self.sleep_interval) + @classmethod def _checkpt_param(cls, chkpt, prm, xtimish=True): """use config backend to lookup a parameter belonging to @@ -443,32 +555,37 @@ class GMasterBase(object): return ts def get_extra_info(self): - str_info="\nFile synced : %d" %(self.filesSynced) - str_info+="\nBytes Synced : %d KB" %(self.syncer.bytesSynced) - str_info+="\nSync Time : %f seconds" %(self.syncTime) - self.crawlTime=datetime.now()-self.crawlStartTime - years , days =divmod(self.crawlTime.days,365.25) - years=int(years) - days=int(days) + str_info = "\nFilesSynced=%d;" % (self.crawl_stats['files_synced']) + str_info += "BytesSynced=%s;" % (self.crawl_stats['bytes_synced']) + + self.crawl_stats['crawl_time'] = datetime.now() - self.crawl_stats['crawl_starttime'] + + str_info += "Uptime=%s;" % (self._crawl_time_format(self.crawl_stats['crawl_time'])) + str_info += "SyncTime=%s;" % (self.crawl_stats['sync_time']) + str_info += "TotalSyncTime=%s;" % (self.total_crawl_stats['sync_time']) + str_info += "TotalFilesSynced=%d;" % (self.total_crawl_stats['files_synced']) + str_info += "TotalBytesSynced=%s;" % (self.total_crawl_stats['bytes_synced']) + str_info += "\0" + logging.debug(str_info) + return str_info + + def _crawl_time_format(self, crawl_time): + # Ex: 5 years, 4 days, 20:23:10 + years, days = divmod(crawl_time.days, 365.25) + years = int(years) + days = int(days) date="" - m, s = divmod(self.crawlTime.seconds, 60) + m, s = divmod(crawl_time.seconds, 60) h, m = divmod(m, 60) - if years!=0 : - date+=str(years)+" year " - if days!=0 : - date+=str(days)+" day " - if h!=0 : - date+=str(h)+" H : " - if m!=0 or h!=0 : - date+=str(m)+" M : " - - date+=str(s)+" S" - self.crawlTime=date - str_info+="\nCrawl Time : %s" %(str(self.crawlTime)) - str_info+="\n\0" - return str_info + if years != 0: + date += "%s %s " % (years, "year" if years == 1 else "years") + if days != 0: + date += "%s %s " % (days, "day" if days == 1 else "days") + + date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2)) + return date def checkpt_service(self, chan, chkpt, tgt): """checkpoint service loop @@ -517,7 +634,7 @@ class GMasterBase(object): try: conn, _ = chan.accept() try: - conn.send(" | checkpoint %s %s %s" % (chkpt, status,self.get_extra_info())) + conn.send(" | checkpoint %s %s %s" % (chkpt, status, self.get_extra_info())) except: exc = sys.exc_info()[1] if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \ @@ -536,7 +653,7 @@ class GMasterBase(object): ): return chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - state_socket = os.path.join(gconf.socketdir, md5(gconf.state_socket_unencoded).hexdigest() + ".socket") + state_socket = os.path.join(gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket") try: os.unlink(state_socket) except: @@ -559,22 +676,6 @@ class GMasterBase(object): t.start() self.checkpoint_thread = t - def crawl_loop(self): - """start the keep-alive thread and iterate .crawl""" - timo = int(gconf.timeout or 0) - if timo > 0: - def keep_alive(): - while True: - vi, gap = self.keepalive_payload_hook(timo, timo * 0.5) - self.slave.server.keep_alive(vi) - time.sleep(gap) - t = Thread(target=keep_alive) - t.start() - self.lastreport['time'] = time.time() - self.crawlStartTime=datetime.now() - while not self.terminate: - self.crawl() - def add_job(self, path, label, job, *a, **kw): """insert @job function to job table at @path with @label""" if self.jobtab.get(path) == None: @@ -600,7 +701,7 @@ class GMasterBase(object): ret = j[-1]() if not ret: succeed = False - if succeed: + if succeed and not args[0] == None: self.sendmark(path, *args) return succeed @@ -653,6 +754,319 @@ class GMasterBase(object): tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate)) return newstate, param.state_change +class GMasterChangelogMixin(GMasterCommon): + """ changelog based change detection and syncing """ + + # index for change type and entry + IDX_START = 0 + IDX_END = 2 + + POS_GFID = 0 + POS_TYPE = 1 + POS_ENTRY1 = 2 + POS_ENTRY2 = 3 # renames + + _CL_TYPE_DATA_PFX = "D " + _CL_TYPE_METADATA_PFX = "M " + _CL_TYPE_ENTRY_PFX = "E " + + TYPE_GFID = [_CL_TYPE_DATA_PFX] # ignoring metadata ops + TYPE_ENTRY = [_CL_TYPE_ENTRY_PFX] + + # flat directory heirarchy for gfid based access + FLAT_DIR_HIERARCHY = '.' + + def fallback_xsync(self): + logging.info('falling back to xsync mode') + gconf.configinterface.set('change-detector', 'xsync') + selfkill() + + def setup_working_dir(self): + workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path)) + logfile = os.path.join(workdir, 'changes.log') + logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile)) + return (workdir, logfile) + + def lstat(self, e): + try: + return os.lstat(e) + except (IOError, OSError): + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + return ex.errno + else: + raise + + # sync data + def syncdata(self, datas): + logging.debug('datas: %s' % (datas)) + for data in datas: + logging.debug('candidate for syncing %s' % data) + pb = self.syncer.add(data) + timeA = datetime.now() + def regjob(se, xte, pb): + rv = pb.wait() + if rv[0]: + logging.debug('synced ' + se) + # update stats + timeB = datetime.now() + self.crawl_stats['last_synctime'] = timeB - timeA + self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) + self.crawl_stats['files_synced'] += 1 + self.crawl_stats['bytes_synced'] += self.syncer.bytes_synced + + # cumulative statistics + self.total_crawl_stats['bytes_synced'] += self.syncer.bytes_synced + self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) + self.total_crawl_stats['files_synced'] += 1 + return True + else: + if rv[1] in [23, 24]: + # stat to check if the file exist + st = self.lstat(se) + if isinstance(st, int): + # file got unlinked in the interim + return True + logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1])) + self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, data, None, pb) + if self.wait(self.FLAT_DIR_HIERARCHY, None): + self.update_crawl_data() + return True + + def process_change(self, change, done): + clist = [] + entries = [] + purges = set() + links = set() + datas = set() + pfx = gauxpfx() + try: + f = open(change, "r") + clist = f.readlines() + f.close() + except IOError: + raise + + def edct(op, **ed): + dct = {} + dct['op'] = op + for k in ed: + if k == 'stat': + st = ed[k] + dst = dct['stat'] = {} + dst['uid'] = st.st_uid + dst['gid'] = st.st_gid + dst['mode'] = st.st_mode + else: + dct[k] = ed[k] + return dct + for e in clist: + e = e.strip() + et = e[self.IDX_START:self.IDX_END] + ec = e[self.IDX_END:].split(' ') + if et in self.TYPE_ENTRY: + ty = ec[self.POS_TYPE] + en = unescape(os.path.join(pfx, ec[self.POS_ENTRY1])) + gfid = ec[self.POS_GFID] + # definitely need a better way bucketize entry ops + if ty in ['UNLINK', 'RMDIR']: + entries.append(edct(ty, gfid=gfid, entry=en)) + purges.update([os.path.join(pfx, gfid)]) + continue + if not ty == 'RENAME': + go = os.path.join(pfx, gfid) + st = self.lstat(go) + if isinstance(st, int): + logging.debug('file %s got purged in the interim' % go) + continue + if ty in ['CREATE', 'MKDIR', 'MKNOD']: + entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) + elif ty == 'LINK': + entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) + links.update([os.path.join(pfx, gfid)]) + elif ty == 'SYMLINK': + entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=os.readlink(en))) + elif ty == 'RENAME': + e2 = unescape(os.path.join(pfx, ec[self.POS_ENTRY2])) + entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2)) + else: + pass + elif et in self.TYPE_GFID: + da = os.path.join(pfx, ec[0]) + st = self.lstat(da) + if isinstance(st, int): + logging.debug('file %s got purged in the interim' % da) + continue + datas.update([da]) + logging.debug('entries: %s' % repr(entries)) + # sync namespace + if (entries): + self.slave.server.entry_ops(entries) + # sync data + if self.syncdata(datas - (purges - links)): + if done: + self.master.server.changelog_done(change) + return True + + def process(self, changes, done=1): + for change in changes: + times = 0 + while True: + times += 1 + logging.debug('processing change %s [%d time(s)]' % (change, times)) + if self.process_change(change, done): + break + # it's either entry_ops() or Rsync that failed to do it's + # job. Mostly it's entry_ops() [which currently has a problem + # of failing to create an entry but failing to return an errno] + # Therefore we do not know if it's either Rsync or the freaking + # entry_ops() that failed... so we retry the _whole_ changelog + # again. + # TODO: remove entry retries when it's gets fixed. + logging.warn('incomplete sync, retrying changelog: %s' % change) + time.sleep(0.5) + self.turns += 1 + + def upd_stime(self, stime): + if stime: + self.sendmark(self.FLAT_DIR_HIERARCHY, stime) + + def crawl(self): + changes = [] + try: + self.master.server.changelog_scan() + self.crawls += 1 + except OSError: + self.fallback_xsync() + changes = self.master.server.changelog_getchanges() + if changes: + xtl = self.xtime(self.FLAT_DIR_HIERARCHY) + if isinstance(xtl, int): + raise GsyncdError('master is corrupt') + logging.debug('processing changes %s' % repr(changes)) + self.process(changes) + self.upd_stime(xtl) + + def register(self): + (workdir, logfile) = self.setup_working_dir() + self.sleep_interval = int(gconf.change_interval) + # register with the changelog library + try: + # 9 == log level (DEBUG) + # 5 == connection retries + self.master.server.changelog_register(gconf.local_path, + workdir, logfile, 9, 5) + except OSError: + self.fallback_xsync() + # control should not reach here + raise + +class GMasterXsyncMixin(GMasterChangelogMixin): + """ + + This crawl needs to be xtime based (as of now + it's not. this is beacuse we generate CHANGELOG + file during each crawl which is then processed + by process_change()). + For now it's used as a one-shot initial sync + mechanism and only syncs directories, regular + files and symlinks. + """ + + def register(self): + self.sleep_interval = 60 + self.tempdir = self.setup_working_dir()[0] + self.tempdir = os.path.join(self.tempdir, 'xsync') + logging.info('xsync temp directory: %s' % self.tempdir) + try: + os.makedirs(self.tempdir) + except OSError: + ex = sys.exc_info()[1] + if ex.errno == EEXIST and os.path.isdir(self.tempdir): + pass + else: + raise + + def write_entry_change(self, prefix, data=[]): + self.fh.write("%s %s\n" % (prefix, ' '.join(data))) + + def open(self): + try: + self.xsync_change = os.path.join(self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time()))) + self.fh = open(self.xsync_change, 'w') + except IOError: + raise + + def close(self): + self.fh.close() + + def fname(self): + return self.xsync_change + + def crawl(self, path='.', xtr=None, done=0): + """ generate a CHANGELOG file consumable by process_change """ + if path == '.': + self.open() + self.crawls += 1 + if not xtr: + # get the root stime and use it for all comparisons + xtr = self.xtime('.', self.slave) + if isinstance(xtr, int): + if xtr != ENOENT: + raise GsyncdError('slave is corrupt') + xtr = self.minus_infinity + xtl = self.xtime(path) + if isinstance(xtl, int): + raise GsyncdError('master is corrupt') + if xtr == xtl: + if path == '.': + self.close() + return + self.xtime_reversion_hook(path, xtl, xtr) + logging.debug("entering " + path) + dem = self.master.server.entries(path) + pargfid = self.master.server.gfid(path) + if isinstance(pargfid, int): + logging.warn('skipping directory %s' % (path)) + for e in dem: + bname = e + e = os.path.join(path, e) + st = self.lstat(e) + if isinstance(st, int): + logging.warn('%s got purged in the interim..' % e) + continue + gfid = self.master.server.gfid(e) + if isinstance(gfid, int): + logging.warn('skipping entry %s..' % (e)) + continue + xte = self.xtime(e) + if isinstance(xte, int): + raise GsyncdError('master is corrupt') + if not self.need_sync(e, xte, xtr): + continue + mo = st.st_mode + if stat.S_ISDIR(mo): + self.write_entry_change("E", [gfid, 'MKDIR', escape(os.path.join(pargfid, bname))]) + self.crawl(e, xtr) + elif stat.S_ISREG(mo): + self.write_entry_change("E", [gfid, 'CREATE', escape(os.path.join(pargfid, bname))]) + self.write_entry_change("D", [gfid]) + elif stat.S_ISLNK(mo): + self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))]) + else: + logging.info('ignoring %s' % e) + if path == '.': + logging.info('processing xsync changelog %s' % self.fname()) + self.close() + self.process([self.fname()], done) + self.upd_stime(xtl) + +class GMasterXtimeMixin(GMasterCommon): + """ xtime based change detection and syncing """ + + def register(self): + pass + def crawl(self, path='.', xtl=None): """crawling... @@ -691,46 +1105,6 @@ class GMasterBase(object): assert that the file systems (master / slave) underneath do not change and actions taken upon some condition will not lose their context by the time they are performed. """ - if path == '.': - if self.start: - self.crawls += 1 - logging.debug("... crawl #%d done, took %.6f seconds" % \ - (self.crawls, time.time() - self.start)) - time.sleep(1) - self.start = time.time() - should_display_info = self.start - self.lastreport['time'] >= 60 - if should_display_info: - logging.info("completed %d crawls, %d turns", - self.crawls - self.lastreport['crawls'], - self.turns - self.lastreport['turns']) - self.lastreport.update(crawls = self.crawls, - turns = self.turns, - time = self.start) - volinfo_sys, state_change = self.volinfo_hook() - if self.inter_master: - self.volinfo = volinfo_sys[self.KFGN] - else: - self.volinfo = volinfo_sys[self.KNAT] - if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master): - logging.info('new master is %s', self.uuid) - if self.volinfo: - logging.info("%s master with volume id %s ..." % \ - (self.inter_master and "intermediate" or "primary", - self.uuid)) - if state_change == self.KFGN: - gconf.configinterface.set('volume_id', self.uuid) - if self.volinfo: - if self.volinfo['retval']: - raise GsyncdError ("master is corrupt") - self.start_checkpoint_thread() - else: - if should_display_info or self.crawls == 0: - if self.inter_master: - logging.info("waiting for being synced from %s ..." % \ - self.volinfo_state[self.KFGN]['uuid']) - else: - logging.info("waiting for volume info ...") - return logging.debug("entering " + path) if not xtl: xtl = self.xtime(path) @@ -806,6 +1180,7 @@ class GMasterBase(object): st = indulgently(e, lambda e: os.lstat(e)) if st == False: continue + mo = st.st_mode adct = {'own': (st.st_uid, st.st_gid)} if stat.S_ISLNK(mo): @@ -815,16 +1190,19 @@ class GMasterBase(object): elif stat.S_ISREG(mo): logging.debug("syncing %s ..." % e) pb = self.syncer.add(e) - timeA=datetime.now() + timeA = datetime.now() def regjob(e, xte, pb): - if pb.wait(): + if pb.wait()[0]: logging.debug("synced " + e) self.sendmark_regular(e, xte) - - timeB=datetime.now() - self.lastSyncTime=timeB-timeA - self.syncTime=(self.syncTime+self.lastSyncTime.microseconds)/(10.0**6) - self.filesSynced=self.filesSynced+1 + # update stats + timeB = datetime.now() + self.crawl_stats['last_synctime'] = timeB - timeA + self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) + self.crawl_stats['files_synced'] += 1 + self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) + self.total_crawl_stats['files_synced'] += 1 + self.update_crawl_data() return True else: logging.warn("failed to sync " + e) @@ -841,6 +1219,7 @@ class GMasterBase(object): if path == '.': self.wait(path, xtl) + class BoxClosedErr(Exception): pass @@ -920,7 +1299,7 @@ class Syncer(object): self.slave = slave self.lock = Lock() self.pb = PostBox() - self.bytesSynced=0 + self.bytes_synced = 0 for i in range(int(gconf.sync_jobs)): t = Thread(target=self.syncjob) t.start() @@ -940,13 +1319,10 @@ class Syncer(object): pb.close() po = self.slave.rsync(pb) if po.returncode == 0: - regEx=re.search('\ *total\ *transferred\ *file\ *size:\ *(\d+)\ *bytes\ *',po.stdout.read(),re.IGNORECASE) - if regEx: - self.bytesSynced+=(int(regEx.group(1)))/1024 - ret = True + ret = (True, 0) elif po.returncode in (23, 24): # partial transfer (cf. rsync(1)), that's normal - ret = False + ret = (False, po.returncode) else: po.errfail() pb.wakeup(ret) -- cgit