summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/master.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--geo-replication/syncdaemon/master.py611
1 files changed, 148 insertions, 463 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 58df14954..95810a61e 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -17,7 +17,8 @@ from datetime import datetime
from gconf import gconf
from tempfile import mkdtemp, NamedTemporaryFile
from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \
- unescape, select, gauxpfx, md5hex, selfkill, entry2pb
+ unescape, select, gauxpfx, md5hex, selfkill, entry2pb, \
+ lstat, errno_wrap
URXTIME = (-1, 0)
@@ -40,11 +41,7 @@ def _volinfo_hook_relax_foreign(self):
expiry)
time.sleep(expiry)
volinfo_sys = self.get_sys_volinfo()
- self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state,
- volinfo_sys)
- if self.inter_master:
- raise GsyncdError("cannot be intermediate master in special mode")
- return (volinfo_sys, state_change)
+ return volinfo_sys
# The API!
@@ -133,10 +130,7 @@ class NormalMixin(object):
return (vi, gap)
def volinfo_hook(self):
- volinfo_sys = self.get_sys_volinfo()
- self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state,
- volinfo_sys)
- return (volinfo_sys, state_change)
+ return self.get_sys_volinfo()
def xtime_reversion_hook(self, path, xtl, xtr):
if xtr > xtl:
@@ -147,6 +141,7 @@ class NormalMixin(object):
def set_slave_xtime(self, path, mark):
self.slave.server.set_xtime(path, self.uuid, mark)
+ self.slave.server.set_xtime_remote(path, self.uuid, mark)
class PartialMixin(NormalMixin):
"""a variant tuned towards operation with a master
@@ -155,7 +150,7 @@ class PartialMixin(NormalMixin):
def xtime_reversion_hook(self, path, xtl, xtr):
pass
-class WrapupMixin(NormalMixin):
+class RecoverMixin(NormalMixin):
"""a variant that differs from normal in terms
of ignoring non-indexed files"""
@@ -166,142 +161,12 @@ class WrapupMixin(NormalMixin):
if not 'default_xtime' in opts:
opts['default_xtime'] = URXTIME
- @staticmethod
def keepalive_payload_hook(self, timo, gap):
return (None, gap)
def volinfo_hook(self):
return _volinfo_hook_relax_foreign(self)
-class BlindMixin(object):
- """Geo-rep flavor using vectored xtime.
-
- Coordinates are the master, slave uuid pair;
- in master coordinate behavior is normal,
- in slave coordinate we force synchronization
- on any value difference (these are in disjunctive
- relation, ie. if either orders the entry to be
- synced, it shall be synced.
- """
-
- minus_infinity = (URXTIME, None)
-
- @staticmethod
- def serialize_xtime(xt):
- a = []
- for x in xt:
- if not x:
- x = ('None', '')
- a.extend(x)
- return '.'.join(str(n) for n in a)
-
- @staticmethod
- def deserialize_xtime(xt):
- a = xt.split(".")
- a = (tuple(a[0:2]), tuple(a[3:4]))
- b = []
- for p in a:
- if p[0] == 'None':
- p = None
- else:
- p = tuple(int(x) for x in p)
- b.append(p)
- return tuple(b)
-
- @staticmethod
- def native_xtime(xt):
- return xt[0]
-
- @staticmethod
- def xtime_geq(xt0, xt1):
- return (not xt1[0] or xt0[0] >= xt1[0]) and \
- (not xt1[1] or xt0[1] >= xt1[1])
-
- @property
- def ruuid(self):
- if self.volinfo_r:
- return self.volinfo_r['uuid']
-
- @staticmethod
- def make_xtime_opts(is_master, opts):
- if not 'create' in opts:
- opts['create'] = is_master
- if not 'default_xtime' in opts:
- opts['default_xtime'] = URXTIME
-
- def xtime_low(self, server, path, **opts):
- xtd = server.xtime_vec(path, self.uuid, self.ruuid)
- if isinstance(xtd, int):
- return xtd
- xt = (xtd[self.uuid], xtd[self.ruuid])
- if not xt[1] and (not xt[0] or xt[0] < self.volmark):
- if opts['create']:
- # not expected, but can happen if file originates
- # from interrupted gsyncd transfer
- logging.warn('have to fix up missing xtime on ' + path)
- xt0 = _xtime_now()
- server.aggregated.set_xtime(path, self.uuid, xt0)
- else:
- xt0 = opts['default_xtime']
- xt = (xt0, xt[1])
- return xt
-
- @staticmethod
- def keepalive_payload_hook(self, timo, gap):
- return (None, gap)
-
- def volinfo_hook(self):
- res = _volinfo_hook_relax_foreign(self)
- volinfo_r_new = self.slave.server.aggregated.native_volume_info()
- if volinfo_r_new['retval']:
- raise GsyncdError("slave is corrupt")
- if getattr(self, 'volinfo_r', None):
- if self.volinfo_r['uuid'] != volinfo_r_new['uuid']:
- raise GsyncdError("uuid mismatch on slave")
- self.volinfo_r = volinfo_r_new
- return res
-
- def xtime_reversion_hook(self, path, xtl, xtr):
- if not isinstance(xtr[0], int) and \
- (isinstance(xtl[0], int) or xtr[0] > xtl[0]):
- raise GsyncdError("timestamp corruption for " + path)
-
- def need_sync(self, e, xte, xtrd):
- if xte[0]:
- if not xtrd[0] or xte[0] > xtrd[0]:
- # there is outstanding diff at 0th pos,
- # we can short-cut to true
- return True
- # we arrived to this point by either of these
- # two possiblilites:
- # - no outstanding difference at 0th pos,
- # wanna see 1st pos if he raises veto
- # against "no need to sync" proposal
- # - no data at 0th pos, 1st pos will have
- # to decide (due to xtime assignment,
- # in this case 1st pos does carry data
- # -- iow, if 1st pos did not have data,
- # and 0th neither, 0th would have been
- # force-feeded)
- if not xte[1]:
- # no data, no veto
- return False
- # the hard work: for 1st pos,
- # the conduct is fetch corresponding
- # slave data and do a "blind" comparison
- # (ie. do not care who is newer, we trigger
- # sync on non-identical xitmes)
- xtr = self.xtime(e, self.slave)
- return isinstance(xtr, int) or xte[1] != xtr[1]
-
- def set_slave_xtime(self, path, mark):
- xtd = {}
- for (u, t) in zip((self.uuid, self.ruuid), mark):
- if t:
- xtd[u] = t
- self.slave.server.set_xtime_vec(path, xtd)
-
-
# Further mixins for certain tunable behaviors
class SendmarkNormalMixin(object):
@@ -355,13 +220,6 @@ class GMasterCommon(object):
if self.volinfo:
return self.volinfo['volume_mark']
- @property
- def inter_master(self):
- """decide if we are an intermediate master
- in a cascading setup
- """
- return self.volinfo_state[self.KFGN] and True or False
-
def xtime(self, path, *a, **opts):
"""get amended xtime
@@ -379,7 +237,13 @@ class GMasterCommon(object):
return self.xtime_low(rsc.server, path, **opts)
def get_initial_crawl_data(self):
- default_data = {'sync_time': 0, 'files_synced': 0, 'bytes_synced': 0}
+ # while persisting only 'files_syncd' is non-zero, rest of
+ # the stats are nulls. lets keep it that way in case they
+ # are needed to be used some day...
+ default_data = {'files_syncd': 0,
+ 'files_remaining': 0,
+ 'bytes_remaining': 0,
+ 'purges_remaining': 0}
if getattr(gconf, 'state_detail_file', None):
try:
return json.load(open(gconf.state_detail_file))
@@ -392,7 +256,6 @@ class GMasterCommon(object):
return default_data
else:
raise
-
return default_data
def update_crawl_data(self):
@@ -421,17 +284,11 @@ class GMasterCommon(object):
self.crawls = 0
self.turns = 0
self.total_turns = int(gconf.turns)
+ self.crawl_start = datetime.now()
self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
- self.crawl_stats = {'sync_time': 0, 'last_synctime': 0, 'crawl_starttime': 0,
- 'crawl_time': 0, 'files_synced': 0, 'bytes_synced' :0}
- self.total_crawl_stats = self.get_initial_crawl_data()
+ self.total_crawl_stats = None
self.start = None
self.change_seen = None
- # the authoritative (foreign, native) volinfo pair
- # which lets us deduce what to do when we refetch
- # the volinfos from system
- uuid_preset = getattr(gconf, 'volume_id', None)
- self.volinfo_state = (uuid_preset and {'uuid': uuid_preset}, None)
# the actual volinfo we make use of
self.volinfo = None
self.terminate = False
@@ -450,34 +307,6 @@ class GMasterCommon(object):
t = Thread(target=keep_alive)
t.start()
- def volinfo_query(self):
- """volume info state machine"""
- volinfo_sys, state_change = self.volinfo_hook()
- if self.inter_master:
- self.volinfo = volinfo_sys[self.KFGN]
- else:
- self.volinfo = volinfo_sys[self.KNAT]
- if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master):
- logging.info('new master is %s', self.uuid)
- if self.volinfo:
- logging.info("%s master with volume id %s ..." % \
- (self.inter_master and "intermediate" or "primary",
- self.uuid))
- if state_change == self.KFGN:
- gconf.configinterface.set('volume_id', self.uuid)
- if self.volinfo:
- if self.volinfo['retval']:
- raise GsyncdError ("master is corrupt")
- self.start_checkpoint_thread()
- else:
- if should_display_info or self.crawls == 0:
- if self.inter_master:
- logging.info("waiting for being synced from %s ..." % \
- self.volinfo_state[self.KFGN]['uuid'])
- else:
- logging.info("waiting for volume info ...")
- return True
-
def should_crawl(cls):
return (gconf.glusterd_uuid in cls.master.server.node_uuid())
@@ -490,26 +319,38 @@ class GMasterCommon(object):
# for a passive gsyncd (ie. in a replicate scenario)
# the keepalive thread would keep the connection alive.
self.init_keep_alive()
- self.lastreport['time'] = time.time()
- self.crawl_stats['crawl_starttime'] = datetime.now()
+ # no need to maintain volinfo state machine.
+ # in a cascading setup, each geo-replication session is
+ # independent (ie. 'volume-mark' and 'xtime' are not
+ # propogated). This is beacuse the slave's xtime is now
+ # stored on the master itself. 'volume-mark' just identifies
+ # that we are in a cascading setup and need to enable
+ # 'geo-replication.ignore-pid-check' option.
+ volinfo_sys = self.volinfo_hook()
+ self.volinfo = volinfo_sys[self.KNAT]
+ inter_master = volinfo_sys[self.KFGN]
+ logging.info("%s master with volume id %s ..." % \
+ (inter_master and "intermediate" or "primary",
+ self.uuid))
+ gconf.configinterface.set('volume_id', self.uuid)
+ if self.volinfo:
+ if self.volinfo['retval']:
+ raise GsyncdError("master is corrupt")
+ self.start_checkpoint_thread()
+ else:
+ raise GsyncdError("master volinfo unavailable")
+ self.total_crawl_stats = self.get_initial_crawl_data()
+ self.lastreport['time'] = time.time()
logging.info('crawl interval: %d seconds' % self.sleep_interval)
+
t0 = time.time()
crawl = self.should_crawl()
while not self.terminate:
- if self.volinfo_query():
- continue
- t1 = time.time()
- if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds
- crawl = self.should_crawl()
- t0 = t1
- if not crawl:
- time.sleep(5)
- continue
if self.start:
logging.debug("... crawl #%d done, took %.6f seconds" % \
(self.crawls, time.time() - self.start))
- self.start = t1
+ self.start = time.time()
should_display_info = self.start - self.lastreport['time'] >= 60
if should_display_info:
logging.info("%d crawls, %d turns",
@@ -518,6 +359,13 @@ class GMasterCommon(object):
self.lastreport.update(crawls = self.crawls,
turns = self.turns,
time = self.start)
+ t1 = time.time()
+ if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds
+ crawl = self.should_crawl()
+ t0 = t1
+ if not crawl:
+ time.sleep(5)
+ continue
self.crawl()
if oneshot:
return
@@ -555,17 +403,13 @@ class GMasterCommon(object):
return ts
def get_extra_info(self):
- str_info = "\nFilesSynced=%d;" % (self.crawl_stats['files_synced'])
- str_info += "BytesSynced=%s;" % (self.crawl_stats['bytes_synced'])
-
- self.crawl_stats['crawl_time'] = datetime.now() - self.crawl_stats['crawl_starttime']
-
- str_info += "Uptime=%s;" % (self._crawl_time_format(self.crawl_stats['crawl_time']))
- str_info += "SyncTime=%s;" % (self.crawl_stats['sync_time'])
- str_info += "TotalSyncTime=%s;" % (self.total_crawl_stats['sync_time'])
- str_info += "TotalFilesSynced=%d;" % (self.total_crawl_stats['files_synced'])
- str_info += "TotalBytesSynced=%s;" % (self.total_crawl_stats['bytes_synced'])
- str_info += "\0"
+ str_info = '\nUptime=%s;FilesSyncd=%d;FilesPending=%d;BytesPending=%d;DeletesPending=%d;' % \
+ (self._crawl_time_format(datetime.now() - self.crawl_start), \
+ self.total_crawl_stats['files_syncd'], \
+ self.total_crawl_stats['files_remaining'], \
+ self.total_crawl_stats['bytes_remaining'], \
+ self.total_crawl_stats['purges_remaining'])
+ str_info += '\0'
logging.debug(str_info)
return str_info
@@ -714,46 +558,6 @@ class GMasterCommon(object):
self.slave.server.setattr(path, adct)
self.set_slave_xtime(path, mark)
- @staticmethod
- def volinfo_state_machine(volinfo_state, volinfo_sys):
- """compute new volinfo_state from old one and incoming
- as of current system state, also indicating if there was a
- change regarding which volume mark is the authoritative one
-
- @volinfo_state, @volinfo_sys are pairs of volume mark dicts
- (foreign, native).
-
- Note this method is marked as static, ie. the computation is
- pure, without reliance on any excess implicit state. State
- transitions which are deemed as ambiguous or banned will raise
- an exception.
-
- """
- # store the value below "boxed" to emulate proper closures
- # (variables of the enclosing scope are available inner functions
- # provided they are no reassigned; mutation is OK).
- param = FreeObject(relax_mismatch = False, state_change = None, index=-1)
- def select_vi(vi0, vi):
- param.index += 1
- if vi and (not vi0 or vi0['uuid'] == vi['uuid']):
- if not vi0 and not param.relax_mismatch:
- param.state_change = param.index
- # valid new value found; for the rest, we are graceful about
- # uuid mismatch
- param.relax_mismatch = True
- return vi
- if vi0 and vi and vi0['uuid'] != vi['uuid'] and not param.relax_mismatch:
- # uuid mismatch for master candidate, bail out
- raise GsyncdError("aborting on uuid change from %s to %s" % \
- (vi0['uuid'], vi['uuid']))
- # fall back to old
- return vi0
- newstate = tuple(select_vi(*vip) for vip in zip(volinfo_state, volinfo_sys))
- srep = lambda vi: vi and vi['uuid'][0:8]
- logging.debug('(%s, %s) << (%s, %s) -> (%s, %s)' % \
- tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate))
- return newstate, param.state_change
-
class GMasterChangelogMixin(GMasterCommon):
""" changelog based change detection and syncing """
@@ -776,6 +580,9 @@ class GMasterChangelogMixin(GMasterCommon):
# flat directory heirarchy for gfid based access
FLAT_DIR_HIERARCHY = '.'
+ # maximum retries per changelog before giving up
+ MAX_RETRIES = 10
+
def fallback_xsync(self):
logging.info('falling back to xsync mode')
gconf.configinterface.set('change-detector', 'xsync')
@@ -787,15 +594,11 @@ class GMasterChangelogMixin(GMasterCommon):
logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile))
return (workdir, logfile)
- def lstat(self, e):
- try:
- return os.lstat(e)
- except (IOError, OSError):
- ex = sys.exc_info()[1]
- if ex.errno == ENOENT:
- return ex.errno
- else:
- raise
+ # update stats from *this* crawl
+ def update_cumulative_stats(self, files_pending):
+ self.total_crawl_stats['files_remaining'] = files_pending['count']
+ self.total_crawl_stats['bytes_remaining'] = files_pending['bytes']
+ self.total_crawl_stats['purges_remaining'] = files_pending['purge']
# sync data
def syncdata(self, datas):
@@ -803,43 +606,31 @@ class GMasterChangelogMixin(GMasterCommon):
for data in datas:
logging.debug('candidate for syncing %s' % data)
pb = self.syncer.add(data)
- timeA = datetime.now()
def regjob(se, xte, pb):
rv = pb.wait()
if rv[0]:
logging.debug('synced ' + se)
- # update stats
- timeB = datetime.now()
- self.crawl_stats['last_synctime'] = timeB - timeA
- self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
- self.crawl_stats['files_synced'] += 1
- self.crawl_stats['bytes_synced'] += self.syncer.bytes_synced
-
- # cumulative statistics
- self.total_crawl_stats['bytes_synced'] += self.syncer.bytes_synced
- self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
- self.total_crawl_stats['files_synced'] += 1
return True
else:
if rv[1] in [23, 24]:
# stat to check if the file exist
- st = self.lstat(se)
+ st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
return True
logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1]))
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, data, None, pb)
if self.wait(self.FLAT_DIR_HIERARCHY, None):
- self.update_crawl_data()
return True
- def process_change(self, change, done):
+ def process_change(self, change, done, retry):
+ pfx = gauxpfx()
clist = []
entries = []
- purges = set()
- links = set()
datas = set()
- pfx = gauxpfx()
+
+ # basic crawl stats: files and bytes
+ files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}
try:
f = open(change, "r")
clist = f.readlines()
@@ -860,6 +651,27 @@ class GMasterChangelogMixin(GMasterCommon):
else:
dct[k] = ed[k]
return dct
+
+ # regular file update: bytes & count
+ def _update_reg(entry, size):
+ if not entry in files_pending['files']:
+ files_pending['count'] += 1
+ files_pending['bytes'] += size
+ files_pending['files'].append(entry)
+ # updates for directories, symlinks etc..
+ def _update_rest():
+ files_pending['count'] += 1
+
+ # entry count
+ def entry_update(entry, size, mode):
+ if stat.S_ISREG(mode):
+ _update_reg(entry, size)
+ else:
+ _update_rest()
+ # purge count
+ def purge_update():
+ files_pending['purge'] += 1
+
for e in clist:
e = e.strip()
et = e[self.IDX_START:self.IDX_END]
@@ -870,51 +682,75 @@ class GMasterChangelogMixin(GMasterCommon):
gfid = ec[self.POS_GFID]
# definitely need a better way bucketize entry ops
if ty in ['UNLINK', 'RMDIR']:
- entries.append(edct(ty, gfid=gfid, entry=en))
- purges.update([os.path.join(pfx, gfid)])
- continue
- if not ty == 'RENAME':
- go = os.path.join(pfx, gfid)
- st = self.lstat(go)
- if isinstance(st, int):
+ purge_update()
+ entries.append(edct(ty, gfid=gfid, entry=en))
+ continue
+ go = os.path.join(pfx, gfid)
+ st = lstat(go)
+ if isinstance(st, int):
+ if ty == 'RENAME':
+ entries.append(edct('UNLINK', gfid=gfid, entry=en))
+ else:
logging.debug('file %s got purged in the interim' % go)
- continue
+ continue
+ entry_update(go, st.st_size, st.st_mode)
if ty in ['CREATE', 'MKDIR', 'MKNOD']:
entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
elif ty == 'LINK':
entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
- links.update([os.path.join(pfx, gfid)])
elif ty == 'SYMLINK':
- entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=os.readlink(en)))
+ rl = errno_wrap(os.readlink, [en], [ENOENT])
+ if isinstance(rl, int):
+ continue
+ entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
elif ty == 'RENAME':
e2 = unescape(os.path.join(pfx, ec[self.POS_ENTRY2]))
- entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2))
+ entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2, stat=st))
else:
- pass
+ logging.warn('ignoring %s [op %s]' % (gfid, ty))
elif et in self.TYPE_GFID:
- da = os.path.join(pfx, ec[0])
- st = self.lstat(da)
+ go = os.path.join(pfx, ec[0])
+ st = lstat(go)
if isinstance(st, int):
- logging.debug('file %s got purged in the interim' % da)
+ logging.debug('file %s got purged in the interim' % go)
continue
- datas.update([da])
+ entry_update(go, st.st_size, st.st_mode)
+ datas.update([go])
logging.debug('entries: %s' % repr(entries))
+ if not retry:
+ self.update_cumulative_stats(files_pending)
# sync namespace
if (entries):
self.slave.server.entry_ops(entries)
# sync data
- if self.syncdata(datas - (purges - links)):
+ if self.syncdata(datas):
if done:
self.master.server.changelog_done(change)
return True
+ def sync_done(self):
+ self.total_crawl_stats['files_syncd'] += self.total_crawl_stats['files_remaining']
+ self.total_crawl_stats['files_remaining'] = 0
+ self.total_crawl_stats['bytes_remaining'] = 0
+ self.total_crawl_stats['purges_remaining'] = 0
+ self.update_crawl_data()
+
def process(self, changes, done=1):
for change in changes:
- times = 0
+ tries = 0
+ retry = False
while True:
- times += 1
- logging.debug('processing change %s [%d time(s)]' % (change, times))
- if self.process_change(change, done):
+ logging.debug('processing change %s' % change)
+ if self.process_change(change, done, retry):
+ self.sync_done()
+ break
+ retry = True
+ tries += 1
+ if tries == self.MAX_RETRIES:
+ logging.warn('changelog %s could not be processed - moving on...' % os.path.basename(change))
+ self.sync_done()
+ if done:
+ self.master.server.changelog_done(change)
break
# it's either entry_ops() or Rsync that failed to do it's
# job. Mostly it's entry_ops() [which currently has a problem
@@ -928,7 +764,7 @@ class GMasterChangelogMixin(GMasterCommon):
self.turns += 1
def upd_stime(self, stime):
- if stime:
+ if not stime == URXTIME:
self.sendmark(self.FLAT_DIR_HIERARCHY, stime)
def crawl(self):
@@ -1031,7 +867,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
for e in dem:
bname = e
e = os.path.join(path, e)
- st = self.lstat(e)
+ st = lstat(e)
if isinstance(st, int):
logging.warn('%s got purged in the interim..' % e)
continue
@@ -1048,178 +884,27 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
if stat.S_ISDIR(mo):
self.write_entry_change("E", [gfid, 'MKDIR', escape(os.path.join(pargfid, bname))])
self.crawl(e, xtr)
- elif stat.S_ISREG(mo):
- self.write_entry_change("E", [gfid, 'CREATE', escape(os.path.join(pargfid, bname))])
- self.write_entry_change("D", [gfid])
elif stat.S_ISLNK(mo):
- self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))])
+ rl = errno_wrap(os.readlink, [en], [ENOENT])
+ if isinstance(rl, int):
+ continue
+ self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname)), rl])
else:
- logging.info('ignoring %s' % e)
+ # if a file has a hardlink, create a Changelog entry as 'LINK' so the slave
+ # side will decide if to create the new entry, or to create link.
+ if st.st_nlink == 1:
+ self.write_entry_change("E", [gfid, 'MKNOD', escape(os.path.join(pargfid, bname))])
+ else:
+ self.write_entry_change("E", [gfid, 'LINK', escape(os.path.join(pargfid, bname))])
+ if stat.S_ISREG(mo):
+ self.write_entry_change("D", [gfid])
+
if path == '.':
logging.info('processing xsync changelog %s' % self.fname())
self.close()
self.process([self.fname()], done)
self.upd_stime(xtl)
-class GMasterXtimeMixin(GMasterCommon):
- """ xtime based change detection and syncing """
-
- def register(self):
- pass
-
- def crawl(self, path='.', xtl=None):
- """crawling...
-
- Standing around
- All the right people
- Crawling
- Tennis on Tuesday
- The ladder is long
- It is your nature
- You've gotta suntan
- Football on Sunday
- Society boy
-
- Recursively walk the master side tree and check if updates are
- needed due to xtime differences. One invocation of crawl checks
- children of @path and do a recursive enter only on
- those directory children where there is an update needed.
-
- Way of updates depend on file type:
- - for symlinks, sync them directy and synchronously
- - for regular children, register jobs for @path (cf. .add_job) to start
- and wait on their rsync
- - for directory children, register a job for @path which waits (.wait)
- on jobs for the given child
- (other kind of filesystem nodes are not considered)
-
- Those slave side children which do not exist on master are simply
- purged (see Server.purge).
-
- Behavior is fault tolerant, synchronization is adaptive: if some action fails,
- just go on relentlessly, adding a fail job (see .add_failjob) which will prevent
- the .sendmark on @path, so when the next crawl will arrive to @path it will not
- see it as up-to-date and will try to sync it again. While this semantics can be
- supported by funky design principles (http://c2.com/cgi/wiki?LazinessImpatienceHubris),
- the ultimate reason which excludes other possibilities is simply transience: we cannot
- assert that the file systems (master / slave) underneath do not change and actions
- taken upon some condition will not lose their context by the time they are performed.
- """
- logging.debug("entering " + path)
- if not xtl:
- xtl = self.xtime(path)
- if isinstance(xtl, int):
- self.add_failjob(path, 'no-local-node')
- return
- xtr = self.xtime(path, self.slave)
- if isinstance(xtr, int):
- if xtr != ENOENT:
- self.slave.server.purge(path)
- try:
- self.slave.server.mkdir(path)
- except OSError:
- self.add_failjob(path, 'no-remote-node')
- return
- xtr = self.minus_infinity
- else:
- self.xtime_reversion_hook(path, xtl, xtr)
- if xtl == xtr:
- if path == '.' and self.change_seen:
- self.turns += 1
- self.change_seen = False
- if self.total_turns:
- logging.info("finished turn #%s/%s" % \
- (self.turns, self.total_turns))
- if self.turns == self.total_turns:
- logging.info("reached turn limit")
- self.terminate = True
- return
- if path == '.':
- self.change_seen = True
- try:
- dem = self.master.server.entries(path)
- except OSError:
- self.add_failjob(path, 'local-entries-fail')
- return
- random.shuffle(dem)
- try:
- des = self.slave.server.entries(path)
- except OSError:
- self.slave.server.purge(path)
- try:
- self.slave.server.mkdir(path)
- des = self.slave.server.entries(path)
- except OSError:
- self.add_failjob(path, 'remote-entries-fail')
- return
- dd = set(des) - set(dem)
- if dd:
- self.purge_missing(path, dd)
- chld = []
- for e in dem:
- e = os.path.join(path, e)
- xte = self.xtime(e)
- if isinstance(xte, int):
- logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte]))
- elif self.need_sync(e, xte, xtr):
- chld.append((e, xte))
- def indulgently(e, fnc, blame=None):
- if not blame:
- blame = path
- try:
- return fnc(e)
- except (IOError, OSError):
- ex = sys.exc_info()[1]
- if ex.errno == ENOENT:
- logging.warn("salvaged ENOENT for " + e)
- self.add_failjob(blame, 'by-indulgently')
- return False
- else:
- raise
- for e, xte in chld:
- st = indulgently(e, lambda e: os.lstat(e))
- if st == False:
- continue
-
- mo = st.st_mode
- adct = {'own': (st.st_uid, st.st_gid)}
- if stat.S_ISLNK(mo):
- if indulgently(e, lambda e: self.slave.server.symlink(os.readlink(e), e)) == False:
- continue
- self.sendmark(e, xte, adct)
- elif stat.S_ISREG(mo):
- logging.debug("syncing %s ..." % e)
- pb = self.syncer.add(e)
- timeA = datetime.now()
- def regjob(e, xte, pb):
- if pb.wait()[0]:
- logging.debug("synced " + e)
- self.sendmark_regular(e, xte)
- # update stats
- timeB = datetime.now()
- self.crawl_stats['last_synctime'] = timeB - timeA
- self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
- self.crawl_stats['files_synced'] += 1
- self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
- self.total_crawl_stats['files_synced'] += 1
- self.update_crawl_data()
- return True
- else:
- logging.warn("failed to sync " + e)
- self.add_job(path, 'reg', regjob, e, xte, pb)
- elif stat.S_ISDIR(mo):
- adct['mode'] = mo
- if indulgently(e, lambda e: (self.add_job(path, 'cwait', self.wait, e, xte, adct),
- self.crawl(e, xte),
- True)[-1], blame=e) == False:
- continue
- else:
- # ignore fifos, sockets and special files
- pass
- if path == '.':
- self.wait(path, xtl)
-
-
class BoxClosedErr(Exception):
pass