Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--  geo-replication/syncdaemon/master.py  1476
1 file changed, 1476 insertions, 0 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
new file mode 100644
index 000000000..4301396f9
--- /dev/null
+++ b/geo-replication/syncdaemon/master.py
@@ -0,0 +1,1476 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import os
+import sys
+import time
+import stat
+import json
+import logging
+import socket
+import string
+import errno
+from errno import ENOENT, ENODATA, EPIPE, EEXIST
+from threading import Condition, Lock
+from datetime import datetime
+from gconf import gconf
+from tempfile import NamedTemporaryFile
+from syncdutils import Thread, GsyncdError, boolify, escape
+from syncdutils import unescape, select, gauxpfx, md5hex, selfkill
+from syncdutils import lstat, errno_wrap
+
+URXTIME = (-1, 0)
+
+# Utility functions that help us get closer to the
+# DRY principle (no, don't look for elevated or
+# perspectivistic things here)
+
+
+def _xtime_now():
+ t = time.time()
+ sec = int(t)
+ nsec = int((t - sec) * 1000000)
+ return (sec, nsec)
+
+
+def _volinfo_hook_relax_foreign(self):
+ volinfo_sys = self.get_sys_volinfo()
+ fgn_vi = volinfo_sys[self.KFGN]
+ if fgn_vi:
+ expiry = fgn_vi['timeout'] - int(time.time()) + 1
+ logging.info('foreign volume info found, waiting %d sec for expiry' %
+ expiry)
+ time.sleep(expiry)
+ volinfo_sys = self.get_sys_volinfo()
+ return volinfo_sys
+
+
+# The API!
+
+def gmaster_builder(excrawl=None):
+ """produce the GMaster class variant corresponding
+ to sync mode"""
+ this = sys.modules[__name__]
+ modemixin = gconf.special_sync_mode
+ if not modemixin:
+ modemixin = 'normal'
+ changemixin = isinstance(excrawl, str) and excrawl or gconf.change_detector
+ logging.info('setting up %s change detection mode' % changemixin)
+ modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
+ crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
+ sendmarkmixin = boolify(
+ gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
+ purgemixin = boolify(
+ gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
+ syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine
+
+ class _GMaster(crawlmixin, modemixin, sendmarkmixin,
+ purgemixin, syncengine):
+ pass
+ return _GMaster
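+
+# Illustrative only (not executed here; the exact caller lives elsewhere in
+# gsyncd): a user of this builder would compose and instantiate the class
+# roughly like
+#
+#     GMasterCls = gmaster_builder('changelog')
+#     gmaster = GMasterCls(master, slave)
+#     gmaster.crawlwrap()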
+
+
+# Mixin classes that implement the data format
+# and logic particularities of the various
+# sync modes
+
+class NormalMixin(object):
+
+ """normal geo-rep behavior"""
+
+ minus_infinity = URXTIME
+
+ # following staticmethods ideally would be
+ # methods of an xtime object (in particular,
+ # implementing the hooks needed for comparison
+ # operators), but at this point we don't yet
+ # have a dedicated xtime class
+
+ @staticmethod
+ def serialize_xtime(xt):
+ return "%d.%d" % tuple(xt)
+
+ @staticmethod
+ def deserialize_xtime(xt):
+ return tuple(int(x) for x in xt.split("."))
+
+ @staticmethod
+ def native_xtime(xt):
+ return xt
+
+ @staticmethod
+ def xtime_geq(xt0, xt1):
+ return xt0 >= xt1
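+
+    # Illustrative round trip (values assumed): serialize_xtime((1389000000, 42))
+    # gives "1389000000.42", deserialize_xtime() turns it back into the
+    # (sec, nsec) tuple, and plain tuple comparison in xtime_geq() then yields
+    # the ordering used throughout this mixin.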
+
+ def make_xtime_opts(self, is_master, opts):
+        if 'create' not in opts:
+            opts['create'] = is_master
+        if 'default_xtime' not in opts:
+            opts['default_xtime'] = URXTIME
+
+ def xtime_low(self, rsc, path, **opts):
+ if rsc == self.master:
+ xt = rsc.server.xtime(path, self.uuid)
+ else:
+ xt = rsc.server.stime(path, self.uuid)
+ if isinstance(xt, int) and xt == ENODATA:
+ xt = rsc.server.xtime(path, self.uuid)
+ if not isinstance(xt, int):
+ self.slave.server.set_stime(path, self.uuid, xt)
+ if isinstance(xt, int) and xt != ENODATA:
+ return xt
+ if xt == ENODATA or xt < self.volmark:
+ if opts['create']:
+ xt = _xtime_now()
+ rsc.server.aggregated.set_xtime(path, self.uuid, xt)
+ else:
+ xt = opts['default_xtime']
+ return xt
+
+ def keepalive_payload_hook(self, timo, gap):
+ # first grab a reference as self.volinfo
+ # can be changed in main thread
+ vi = self.volinfo
+ if vi:
+ # then have a private copy which we can mod
+ vi = vi.copy()
+ vi['timeout'] = int(time.time()) + timo
+ else:
+ # send keep-alives more frequently to
+ # avoid a delay in announcing our volume info
+ # to slave if it becomes established in the
+ # meantime
+ gap = min(10, gap)
+ return (vi, gap)
+
+ def volinfo_hook(self):
+ return self.get_sys_volinfo()
+
+ def xtime_reversion_hook(self, path, xtl, xtr):
+ if xtr > xtl:
+ raise GsyncdError("timestamp corruption for " + path)
+
+ def need_sync(self, e, xte, xtrd):
+ return xte > xtrd
+
+ def set_slave_xtime(self, path, mark):
+ self.slave.server.set_stime(path, self.uuid, mark)
+ # self.slave.server.set_xtime_remote(path, self.uuid, mark)
+
+
+class PartialMixin(NormalMixin):
+
+ """a variant tuned towards operation with a master
+ that has partial info of the slave (brick typically)"""
+
+ def xtime_reversion_hook(self, path, xtl, xtr):
+ pass
+
+
+class RecoverMixin(NormalMixin):
+
+ """a variant that differs from normal in terms
+ of ignoring non-indexed files"""
+
+ @staticmethod
+ def make_xtime_opts(is_master, opts):
+        if 'create' not in opts:
+            opts['create'] = False
+        if 'default_xtime' not in opts:
+            opts['default_xtime'] = URXTIME
+
+ def keepalive_payload_hook(self, timo, gap):
+ return (None, gap)
+
+ def volinfo_hook(self):
+ return _volinfo_hook_relax_foreign(self)
+
+# Further mixins for certain tunable behaviors
+
+
+class SendmarkNormalMixin(object):
+
+ def sendmark_regular(self, *a, **kw):
+ return self.sendmark(*a, **kw)
+
+
+class SendmarkRsyncMixin(object):
+
+ def sendmark_regular(self, *a, **kw):
+ pass
+
+
+class PurgeNormalMixin(object):
+
+ def purge_missing(self, path, names):
+ self.slave.server.purge(path, names)
+
+
+class PurgeNoopMixin(object):
+
+ def purge_missing(self, path, names):
+ pass
+
+
+class TarSSHEngine(object):
+
+ """Sync engine that uses tar(1) piped over ssh(1)
+ for data transfers. Good for lots of small files.
+ """
+
+ def a_syncdata(self, files):
+ logging.debug('files: %s' % (files))
+ for f in files:
+ pb = self.syncer.add(f)
+
+ def regjob(se, xte, pb):
+ rv = pb.wait()
+ if rv[0]:
+ logging.debug('synced ' + se)
+ return True
+ else:
+ # stat check for file presence
+ st = lstat(se)
+ if isinstance(st, int):
+ return True
+ logging.warn('tar+ssh: %s [errcode: %d]' % (se, rv[1]))
+ self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
+
+ def syncdata_wait(self):
+ if self.wait(self.FLAT_DIR_HIERARCHY, None):
+ return True
+
+ def syncdata(self, files):
+ self.a_syncdata(files)
+ self.syncdata_wait()
+
+
+class RsyncEngine(object):
+
+ """Sync engine that uses rsync(1) for data transfers"""
+
+ def a_syncdata(self, files):
+ logging.debug('files: %s' % (files))
+ for f in files:
+ logging.debug('candidate for syncing %s' % f)
+ pb = self.syncer.add(f)
+
+ def regjob(se, xte, pb):
+ rv = pb.wait()
+ if rv[0]:
+ logging.debug('synced ' + se)
+ return True
+ else:
+ if rv[1] in [23, 24]:
+ # stat to check if the file exist
+ st = lstat(se)
+ if isinstance(st, int):
+ # file got unlinked in the interim
+ return True
+ logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1]))
+ self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
+
+ def syncdata_wait(self):
+ if self.wait(self.FLAT_DIR_HIERARCHY, None):
+ return True
+
+ def syncdata(self, files):
+ self.a_syncdata(files)
+ self.syncdata_wait()
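+
+    # Sketch of the intended call pattern (mirrors how the changelog crawl
+    # below drives these engines): queue transfers for a whole batch first,
+    # then block once for all of them:
+    #
+    #     self.a_syncdata(datas)    # fire off candidate files
+    #     ...                       # entry/meta ops for further changelogs
+    #     self.syncdata_wait()      # single barrier for the batch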
+
+
+class GMasterCommon(object):
+
+    """abstract class implementing the master role"""
+
+ KFGN = 0
+ KNAT = 1
+
+ def get_sys_volinfo(self):
+ """query volume marks on fs root
+
+ err out on multiple foreign masters
+ """
+ fgn_vis, nat_vi = (
+ self.master.server.aggregated.foreign_volume_infos(),
+ self.master.server.aggregated.native_volume_info())
+ fgn_vi = None
+ if fgn_vis:
+ if len(fgn_vis) > 1:
+ raise GsyncdError("cannot work with multiple foreign masters")
+ fgn_vi = fgn_vis[0]
+ return fgn_vi, nat_vi
+
+ @property
+ def uuid(self):
+ if self.volinfo:
+ return self.volinfo['uuid']
+
+ @property
+ def volmark(self):
+ if self.volinfo:
+ return self.volinfo['volume_mark']
+
+ def xtime(self, path, *a, **opts):
+ """get amended xtime
+
+        when amending, we can create a missing xtime, or
+        determine a valid value if what we get is expired
+        (as of the volume mark expiry); the way of amending
+        depends on @opts and on the subject of the query
+        (master or slave).
+ """
+ if a:
+ rsc = a[0]
+ else:
+ rsc = self.master
+ self.make_xtime_opts(rsc == self.master, opts)
+ return self.xtime_low(rsc, path, **opts)
+
+ def get_initial_crawl_data(self):
+        # while persisting, only 'files_syncd' is non-zero; the rest of
+        # the stats are null. let's keep it that way in case they
+        # are needed some day...
+ default_data = {'files_syncd': 0,
+ 'files_remaining': 0,
+ 'bytes_remaining': 0,
+ 'purges_remaining': 0,
+ 'total_files_skipped': 0}
+ if getattr(gconf, 'state_detail_file', None):
+ try:
+ with open(gconf.state_detail_file, 'r+') as f:
+ loaded_data = json.load(f)
+ diff_data = set(default_data) - set(loaded_data)
+ if len(diff_data):
+ for i in diff_data:
+ loaded_data[i] = default_data[i]
+ return loaded_data
+ except IOError:
+ logging.warn('Creating new gconf.state_detail_file.')
+ # Create file with initial data
+ try:
+ with open(gconf.state_detail_file, 'wb') as f:
+ json.dump(default_data, f)
+ return default_data
+ except:
+ raise
+ return default_data
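+
+    # The state detail file is plain JSON; a freshly created one would look
+    # roughly like this (illustrative):
+    #
+    #     {"files_syncd": 0, "files_remaining": 0, "bytes_remaining": 0,
+    #      "purges_remaining": 0, "total_files_skipped": 0}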
+
+ def update_crawl_data(self):
+ if getattr(gconf, 'state_detail_file', None):
+ try:
+ same_dir = os.path.dirname(gconf.state_detail_file)
+ with NamedTemporaryFile(dir=same_dir, delete=False) as tmp:
+ json.dump(self.total_crawl_stats, tmp)
+ tmp.flush()
+ os.fsync(tmp.fileno())
+ os.rename(tmp.name, gconf.state_detail_file)
+ except (IOError, OSError):
+ raise
+
+ def __init__(self, master, slave):
+ self.master = master
+ self.slave = slave
+ self.jobtab = {}
+ if boolify(gconf.use_tarssh):
+ logging.info("using 'tar over ssh' as the sync engine")
+ self.syncer = Syncer(slave, self.slave.tarssh)
+ else:
+ logging.info("using 'rsync' as the sync engine")
+ # partial transfer (cf. rsync(1)), that's normal
+ self.syncer = Syncer(slave, self.slave.rsync, [23, 24])
+ # crawls vs. turns:
+ # - self.crawls is simply the number of crawl() invocations on root
+ # - one turn is a maximal consecutive sequence of crawls so that each
+ # crawl in it detects a change to be synced
+ # - self.turns is the number of turns since start
+ # - self.total_turns is a limit so that if self.turns reaches it, then
+ # we exit (for diagnostic purposes)
+ # so, eg., if the master fs changes unceasingly, self.turns will remain
+ # 0.
+ self.crawls = 0
+ self.turns = 0
+ self.total_turns = int(gconf.turns)
+ self.crawl_start = datetime.now()
+ self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
+ self.total_crawl_stats = None
+ self.start = None
+ self.change_seen = None
+ # the actual volinfo we make use of
+ self.volinfo = None
+ self.terminate = False
+ self.sleep_interval = 1
+ self.checkpoint_thread = None
+ self.current_files_skipped_count = 0
+ self.skipped_gfid_list = []
+
+ def init_keep_alive(cls):
+ """start the keep-alive thread """
+ timo = int(gconf.timeout or 0)
+ if timo > 0:
+ def keep_alive():
+ while True:
+ vi, gap = cls.keepalive_payload_hook(timo, timo * 0.5)
+ cls.slave.server.keep_alive(vi)
+ time.sleep(gap)
+ t = Thread(target=keep_alive)
+ t.start()
+
+ def should_crawl(cls):
+ return gconf.glusterd_uuid in cls.master.server.node_uuid()
+
+    def register(self):
+        # no-op here; the crawl mixins (changelog, xsync) override this
+        # with their own registration logic
+        pass
+
+ def crawlwrap(self, oneshot=False):
+ if oneshot:
+ # it's important to do this during the oneshot crawl as
+ # for a passive gsyncd (ie. in a replicate scenario)
+ # the keepalive thread would keep the connection alive.
+ self.init_keep_alive()
+
+ # no need to maintain volinfo state machine.
+ # in a cascading setup, each geo-replication session is
+ # independent (ie. 'volume-mark' and 'xtime' are not
+        # propagated). This is because the slave's xtime is now
+ # stored on the master itself. 'volume-mark' just identifies
+ # that we are in a cascading setup and need to enable
+ # 'geo-replication.ignore-pid-check' option.
+ volinfo_sys = self.volinfo_hook()
+ self.volinfo = volinfo_sys[self.KNAT]
+ inter_master = volinfo_sys[self.KFGN]
+ logging.info("%s master with volume id %s ..." %
+ (inter_master and "intermediate" or "primary",
+ self.uuid))
+ gconf.configinterface.set('volume_id', self.uuid)
+ if self.volinfo:
+ if self.volinfo['retval']:
+ logging.warn("master cluster's info may not be valid %d" %
+ self.volinfo['retval'])
+ self.start_checkpoint_thread()
+ else:
+ raise GsyncdError("master volinfo unavailable")
+ self.total_crawl_stats = self.get_initial_crawl_data()
+ self.lastreport['time'] = time.time()
+ logging.info('crawl interval: %d seconds' % self.sleep_interval)
+
+ t0 = time.time()
+ crawl = self.should_crawl()
+ while not self.terminate:
+ if self.start:
+ logging.debug("... crawl #%d done, took %.6f seconds" %
+ (self.crawls, time.time() - self.start))
+ self.start = time.time()
+ should_display_info = self.start - self.lastreport['time'] >= 60
+ if should_display_info:
+ logging.info("%d crawls, %d turns",
+ self.crawls - self.lastreport['crawls'],
+ self.turns - self.lastreport['turns'])
+ self.lastreport.update(crawls=self.crawls,
+ turns=self.turns,
+ time=self.start)
+ t1 = time.time()
+            if int(t1 - t0) >= 60:  # let's hardcode this check to 60 seconds
+ crawl = self.should_crawl()
+ t0 = t1
+ self.update_worker_remote_node()
+ if not crawl:
+ self.update_worker_health("Passive")
+ # bring up _this_ brick to the cluster stime
+ # which is min of cluster (but max of the replicas)
+ brick_stime = self.xtime('.', self.slave)
+ cluster_stime = self.master.server.aggregated.stime_mnt(
+ '.', '.'.join([str(self.uuid), str(gconf.slave_id)]))
+ logging.debug("Cluster stime: %s | Brick stime: %s" %
+ (repr(cluster_stime), repr(brick_stime)))
+ if not isinstance(cluster_stime, int):
+ if brick_stime < cluster_stime:
+ self.slave.server.set_stime(
+ self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
+ time.sleep(5)
+ continue
+ self.update_worker_health("Active")
+ self.crawl()
+ if oneshot:
+ return
+ time.sleep(self.sleep_interval)
+
+ @classmethod
+ def _checkpt_param(cls, chkpt, prm, xtimish=True):
+ """use config backend to lookup a parameter belonging to
+ checkpoint @chkpt"""
+ cprm = gconf.configinterface.get_realtime('checkpoint_' + prm)
+ if not cprm:
+ return
+ chkpt_mapped, val = cprm.split(':', 1)
+ if unescape(chkpt_mapped) != chkpt:
+ return
+ if xtimish:
+ val = cls.deserialize_xtime(val)
+ return val
+
+ @classmethod
+ def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True):
+ """use config backend to store a parameter associated
+ with checkpoint @chkpt"""
+ if xtimish:
+ val = cls.serialize_xtime(val)
+ gconf.configinterface.set(
+ 'checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))
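+
+    # The config backend stores these as "<escaped label>:<value>", e.g.
+    # (illustrative) checkpoint_target = "mycheckpoint:1389000000.42" for an
+    # xtimish value; _checkpt_param() splits that back apart on the first ':'.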
+
+ @staticmethod
+ def humantime(*tpair):
+ """format xtime-like (sec, nsec) pair to human readable format"""
+ ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\
+ strftime("%Y-%m-%d %H:%M:%S")
+ if len(tpair) > 1:
+ ts += '.' + str(tpair[1])
+ return ts
+
+ def _crawl_time_format(self, crawl_time):
+ # Ex: 5 years, 4 days, 20:23:10
+ years, days = divmod(crawl_time.days, 365.25)
+ years = int(years)
+ days = int(days)
+
+ date = ""
+ m, s = divmod(crawl_time.seconds, 60)
+ h, m = divmod(m, 60)
+
+ if years != 0:
+ date += "%s %s " % (years, "year" if years == 1 else "years")
+ if days != 0:
+ date += "%s %s " % (days, "day" if days == 1 else "days")
+
+ date += "%s:%s:%s" % (string.zfill(h, 2),
+ string.zfill(m, 2), string.zfill(s, 2))
+ return date
+
+ def checkpt_service(self, chan, chkpt):
+ """checkpoint service loop
+
+ monitor and verify checkpoint status for @chkpt, and listen
+ for incoming requests for whom we serve a pretty-formatted
+ status report"""
+ while True:
+ chkpt = gconf.configinterface.get_realtime("checkpoint")
+ if not chkpt:
+ gconf.configinterface.delete("checkpoint_completed")
+ gconf.configinterface.delete("checkpoint_target")
+ # dummy loop for the case when there is no checkpt set
+ select([chan], [], [])
+ conn, _ = chan.accept()
+ conn.send('\0')
+ conn.close()
+ continue
+
+ checkpt_tgt = self._checkpt_param(chkpt, 'target')
+ if not checkpt_tgt:
+ checkpt_tgt = self.xtime('.')
+ if isinstance(checkpt_tgt, int):
+                    raise GsyncdError(
+                        "master root directory is inaccessible (%s)" %
+                        os.strerror(checkpt_tgt))
+ self._set_checkpt_param(chkpt, 'target', checkpt_tgt)
+ logging.debug("checkpoint target %s has been determined "
+ "for checkpoint %s" %
+ (repr(checkpt_tgt), chkpt))
+
+ # check if the label is 'now'
+ chkpt_lbl = chkpt
+ try:
+ x1, x2 = chkpt.split(':')
+ if x1 == 'now':
+ chkpt_lbl = "as of " + self.humantime(x2)
+ except:
+ pass
+ completed = self._checkpt_param(chkpt, 'completed', xtimish=False)
+ if completed:
+ completed = tuple(int(x) for x in completed.split('.'))
+ s, _, _ = select([chan], [], [], (not completed) and 5 or None)
+            # either a request was made and we re-check so as not to
+            # give back stale data, or we are still hunting for completion
+ if (self.native_xtime(checkpt_tgt) and (
+ self.native_xtime(checkpt_tgt) < self.volmark)):
+ # indexing has been reset since setting the checkpoint
+ status = "is invalid"
+ else:
+ xtr = self.xtime('.', self.slave)
+ if isinstance(xtr, int):
+                    raise GsyncdError(
+                        "slave root directory is inaccessible (%s)" %
+                        os.strerror(xtr))
+ ncompleted = self.xtime_geq(xtr, checkpt_tgt)
+ if completed and not ncompleted: # stale data
+ logging.warn("completion time %s for checkpoint %s "
+ "became stale" %
+ (self.humantime(*completed), chkpt))
+ completed = None
+ gconf.configinterface.delete('checkpoint_completed')
+ if ncompleted and not completed: # just reaching completion
+ completed = "%.6f" % time.time()
+ self._set_checkpt_param(
+ chkpt, 'completed', completed, xtimish=False)
+ completed = tuple(int(x) for x in completed.split('.'))
+ logging.info("checkpoint %s completed" % chkpt)
+ status = completed and \
+ "completed at " + self.humantime(completed[0]) or \
+ "not reached yet"
+ if s:
+ conn = None
+ try:
+ conn, _ = chan.accept()
+ try:
+ conn.send("checkpoint %s is %s\0" %
+ (chkpt_lbl, status))
+ except:
+ exc = sys.exc_info()[1]
+ if ((isinstance(exc, OSError) or isinstance(
+ exc, IOError)) and exc.errno == EPIPE):
+ logging.debug('checkpoint client disconnected')
+ else:
+ raise
+ finally:
+ if conn:
+ conn.close()
+
+ def start_checkpoint_thread(self):
+ """prepare and start checkpoint service"""
+ if self.checkpoint_thread or not (
+ getattr(gconf, 'state_socket_unencoded', None) and getattr(
+ gconf, 'socketdir', None)
+ ):
+ return
+ chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ state_socket = os.path.join(
+ gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket")
+        try:
+            os.unlink(state_socket)
+        except OSError:
+            pass
+ chan.bind(state_socket)
+ chan.listen(1)
+ chkpt = gconf.configinterface.get_realtime("checkpoint")
+ t = Thread(target=self.checkpt_service, args=(chan, chkpt))
+ t.start()
+ self.checkpoint_thread = t
+
+ def add_job(self, path, label, job, *a, **kw):
+ """insert @job function to job table at @path with @label"""
+ if self.jobtab.get(path) is None:
+ self.jobtab[path] = []
+ self.jobtab[path].append((label, a, lambda: job(*a, **kw)))
+
+ def add_failjob(self, path, label):
+        """invoke .add_job with a job that does nothing, just fails"""
+ logging.debug('salvaged: ' + label)
+ self.add_job(path, label, lambda: False)
+
+ def wait(self, path, *args):
+ """perform jobs registered for @path
+
+ Reset jobtab entry for @path,
+        determine success as the conjunction of
+ success of all the jobs. In case of
+ success, call .sendmark on @path
+ """
+ jobs = self.jobtab.pop(path, [])
+ succeed = True
+ for j in jobs:
+ ret = j[-1]()
+ if not ret:
+ succeed = False
+ if succeed and not args[0] is None:
+ self.sendmark(path, *args)
+ return succeed
+
+ def sendmark(self, path, mark, adct=None):
+ """update slave side xtime for @path to master side xtime
+
+ also can send a setattr payload (see Server.setattr).
+ """
+ if adct:
+ self.slave.server.setattr(path, adct)
+ self.set_slave_xtime(path, mark)
+
+
+class GMasterChangelogMixin(GMasterCommon):
+
+ """ changelog based change detection and syncing """
+
+ # index for change type and entry
+ IDX_START = 0
+ IDX_END = 2
+
+ POS_GFID = 0
+ POS_TYPE = 1
+ POS_ENTRY1 = -1
+
+ TYPE_META = "M "
+ TYPE_GFID = "D "
+ TYPE_ENTRY = "E "
+
+    # flat directory hierarchy for gfid based access
+ FLAT_DIR_HIERARCHY = '.'
+
+ # maximum retries per changelog before giving up
+ MAX_RETRIES = 10
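+
+    # process_change() below consumes one record per line; the two-character
+    # prefix selects the record type and the rest is space separated, roughly
+    # (illustrative, matching the parsing below):
+    #
+    #     E <gfid> CREATE <mode> <uid> <gid> <pargfid>/<bname>
+    #     E <gfid> UNLINK <pargfid>/<bname>
+    #     D <gfid>
+    #     M <gfid> SETATTR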
+
+ def fallback_xsync(self):
+ logging.info('falling back to xsync mode')
+ gconf.configinterface.set('change-detector', 'xsync')
+ selfkill()
+
+ def setup_working_dir(self):
+ workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path))
+ logfile = os.path.join(workdir, 'changes.log')
+ logging.debug('changelog working dir %s (log: %s)' %
+ (workdir, logfile))
+ return (workdir, logfile)
+
+ def process_change(self, change, done, retry):
+ pfx = gauxpfx()
+ clist = []
+ entries = []
+ meta_gfid = set()
+ datas = set()
+
+ # basic crawl stats: files and bytes
+ files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}
+        with open(change, "r") as f:
+            clist = f.readlines()
+
+ def edct(op, **ed):
+ dct = {}
+ dct['op'] = op
+ for k in ed:
+ if k == 'stat':
+ st = ed[k]
+ dst = dct['stat'] = {}
+ dst['uid'] = st.st_uid
+ dst['gid'] = st.st_gid
+ dst['mode'] = st.st_mode
+ else:
+ dct[k] = ed[k]
+ return dct
+
+ # entry counts (not purges)
+ def entry_update():
+ files_pending['count'] += 1
+
+ # purge count
+ def purge_update():
+ files_pending['purge'] += 1
+
+ for e in clist:
+ e = e.strip()
+ et = e[self.IDX_START:self.IDX_END] # entry type
+ ec = e[self.IDX_END:].split(' ') # rest of the bits
+
+ if et == self.TYPE_ENTRY:
+ # extract information according to the type of
+ # the entry operation. create(), mkdir() and mknod()
+ # have mode, uid, gid information in the changelog
+ # itself, so no need to stat()...
+ ty = ec[self.POS_TYPE]
+
+ # PARGFID/BNAME
+ en = unescape(os.path.join(pfx, ec[self.POS_ENTRY1]))
+ # GFID of the entry
+ gfid = ec[self.POS_GFID]
+
+ if ty in ['UNLINK', 'RMDIR']:
+ purge_update()
+ entries.append(edct(ty, gfid=gfid, entry=en))
+ elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
+ entry_update()
+ # stat information present in the changelog itself
+ entries.append(edct(ty, gfid=gfid, entry=en,
+ mode=int(ec[2]),
+ uid=int(ec[3]), gid=int(ec[4])))
+ else:
+ # stat() to get mode and other information
+ go = os.path.join(pfx, gfid)
+ st = lstat(go)
+ if isinstance(st, int):
+ if ty == 'RENAME': # special hack for renames...
+ entries.append(edct('UNLINK', gfid=gfid, entry=en))
+ else:
+ logging.debug(
+ 'file %s got purged in the interim' % go)
+ continue
+
+ if ty == 'LINK':
+ entry_update()
+ entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
+ elif ty == 'SYMLINK':
+ rl = errno_wrap(os.readlink, [en], [ENOENT])
+ if isinstance(rl, int):
+ continue
+ entry_update()
+ entries.append(
+ edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
+ elif ty == 'RENAME':
+ entry_update()
+ e1 = unescape(
+ os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
+ entries.append(
+ edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st))
+ else:
+ logging.warn('ignoring %s [op %s]' % (gfid, ty))
+ elif et == self.TYPE_GFID:
+ datas.add(os.path.join(pfx, ec[0]))
+ elif et == self.TYPE_META:
+ if ec[1] == 'SETATTR': # only setattr's for now...
+ meta_gfid.add(os.path.join(pfx, ec[0]))
+ else:
+ logging.warn('got invalid changelog type: %s' % (et))
+ logging.debug('entries: %s' % repr(entries))
+ if not retry:
+ self.update_worker_cumilitive_status(files_pending)
+ # sync namespace
+ if entries:
+ self.slave.server.entry_ops(entries)
+ # sync metadata
+ if meta_gfid:
+ meta_entries = []
+ for go in meta_gfid:
+ st = lstat(go)
+ if isinstance(st, int):
+ logging.debug('file %s got purged in the interim' % go)
+ continue
+ meta_entries.append(edct('META', go=go, stat=st))
+ if meta_entries:
+ self.slave.server.meta_ops(meta_entries)
+ # sync data
+ if datas:
+ self.a_syncdata(datas)
+
+ def process(self, changes, done=1):
+ tries = 0
+ retry = False
+
+ while True:
+ self.skipped_gfid_list = []
+ self.current_files_skipped_count = 0
+
+ # first, fire all changelog transfers in parallel. entry and
+ # metadata are performed synchronously, therefore in serial.
+ # However at the end of each changelog, data is synchronized
+            # with a_syncdata() - which means it is serial w.r.t
+ # entries/metadata of that changelog but happens in parallel
+ # with data of other changelogs.
+
+ for change in changes:
+ logging.debug('processing change %s' % change)
+ self.process_change(change, done, retry)
+ if not retry:
+ # number of changelogs processed in the batch
+ self.turns += 1
+
+ # Now we wait for all the data transfers fired off in the above
+ # step to complete. Note that this is not ideal either. Ideally
+ # we want to trigger the entry/meta-data transfer of the next
+ # batch while waiting for the data transfer of the current batch
+ # to finish.
+
+ # Note that the reason to wait for the data transfer (vs doing it
+ # completely in the background and call the changelog_done()
+ # asynchronously) is because this waiting acts as a "backpressure"
+ # and prevents a spiraling increase of wait stubs from consuming
+ # unbounded memory and resources.
+
+ # update the slave's time with the timestamp of the _last_
+ # changelog file time suffix. Since, the changelog prefix time
+ # is the time when the changelog was rolled over, introduce a
+            # tolerance of 1 second to counter the small delta b/w the
+ # marker update and gettimeofday().
+ # NOTE: this is only for changelog mode, not xsync.
+
+ # @change is the last changelog (therefore max time for this batch)
+ if self.syncdata_wait():
+ if done:
+ xtl = (int(change.split('.')[-1]) - 1, 0)
+ self.upd_stime(xtl)
+ map(self.master.server.changelog_done, changes)
+ self.update_worker_files_syncd()
+ break
+
+ # We do not know which changelog transfer failed, retry everything.
+ retry = True
+ tries += 1
+ if tries == self.MAX_RETRIES:
+ logging.warn('changelogs %s could not be processed - '
+ 'moving on...' %
+ ' '.join(map(os.path.basename, changes)))
+ self.update_worker_total_files_skipped(
+ self.current_files_skipped_count)
+ logging.warn('SKIPPED GFID = %s' %
+ ','.join(self.skipped_gfid_list))
+ self.update_worker_files_syncd()
+ if done:
+ xtl = (int(change.split('.')[-1]) - 1, 0)
+ self.upd_stime(xtl)
+ map(self.master.server.changelog_done, changes)
+ break
+            # it's either entry_ops() or Rsync that failed to do its
+            # job. Mostly it's entry_ops() [which currently has a problem
+            # of failing to create an entry while not returning an errno]
+            # Therefore we do not know whether it's Rsync or the freaking
+            # entry_ops() that failed... so we retry the _whole_ changelog
+            # again.
+            # TODO: remove entry retries when it gets fixed.
+ logging.warn('incomplete sync, retrying changelogs: %s' %
+ ' '.join(map(os.path.basename, changes)))
+ time.sleep(0.5)
+
+ def upd_stime(self, stime, path=None):
+ if not path:
+ path = self.FLAT_DIR_HIERARCHY
+ if not stime == URXTIME:
+ self.sendmark(path, stime)
+
+ def get_worker_status_file(self):
+ file_name = gconf.local_path + '.status'
+ file_name = file_name.replace("/", "_")
+ worker_status_file = gconf.georep_session_working_dir + file_name
+ return worker_status_file
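+
+    # e.g. (illustrative) gconf.local_path = '/bricks/b1' yields the file
+    # gconf.georep_session_working_dir + '_bricks_b1.status'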
+
+ def update_worker_status(self, key, value):
+ default_data = {"remote_node": "N/A",
+ "worker status": "Not Started",
+ "crawl status": "N/A",
+ "files_syncd": 0,
+ "files_remaining": 0,
+ "bytes_remaining": 0,
+ "purges_remaining": 0,
+ "total_files_skipped": 0}
+ worker_status_file = self.get_worker_status_file()
+ try:
+ with open(worker_status_file, 'r+') as f:
+ loaded_data = json.load(f)
+ loaded_data[key] = value
+ os.ftruncate(f.fileno(), 0)
+ os.lseek(f.fileno(), 0, os.SEEK_SET)
+ json.dump(loaded_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except (IOError, OSError, ValueError):
+ logging.info('Creating new %s' % worker_status_file)
+ try:
+ with open(worker_status_file, 'wb') as f:
+ default_data[key] = value
+ json.dump(default_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except:
+ raise
+
+ def update_worker_cumilitive_status(self, files_pending):
+ default_data = {"remote_node": "N/A",
+ "worker status": "Not Started",
+ "crawl status": "N/A",
+ "files_syncd": 0,
+ "files_remaining": 0,
+ "bytes_remaining": 0,
+ "purges_remaining": 0,
+ "total_files_skipped": 0}
+ worker_status_file = self.get_worker_status_file()
+ try:
+ with open(worker_status_file, 'r+') as f:
+ loaded_data = json.load(f)
+ loaded_data['files_remaining'] = files_pending['count']
+ loaded_data['bytes_remaining'] = files_pending['bytes']
+ loaded_data['purges_remaining'] = files_pending['purge']
+ os.ftruncate(f.fileno(), 0)
+ os.lseek(f.fileno(), 0, os.SEEK_SET)
+ json.dump(loaded_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except (IOError, OSError, ValueError):
+ logging.info('Creating new %s' % worker_status_file)
+ try:
+ with open(worker_status_file, 'wb') as f:
+ default_data['files_remaining'] = files_pending['count']
+ default_data['bytes_remaining'] = files_pending['bytes']
+ default_data['purges_remaining'] = files_pending['purge']
+ json.dump(default_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except:
+ raise
+
+ def update_worker_remote_node(self):
+ node = sys.argv[-1]
+ node = node.split("@")[-1]
+ remote_node_ip = node.split(":")[0]
+ remote_node_vol = node.split(":")[3]
+ remote_node = remote_node_ip + '::' + remote_node_vol
+ self.update_worker_status('remote_node', remote_node)
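+
+    # (Assumed input format, based on the parsing above: with
+    # sys.argv[-1] = 'ssh://root@slavehost:gluster://localhost:slavevol'
+    # this records 'slavehost::slavevol' as the remote node.)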
+
+ def update_worker_health(self, state):
+ self.update_worker_status('worker status', state)
+
+ def update_worker_crawl_status(self, state):
+ self.update_worker_status('crawl status', state)
+
+ def update_worker_files_syncd(self):
+ default_data = {"remote_node": "N/A",
+ "worker status": "Not Started",
+ "crawl status": "N/A",
+ "files_syncd": 0,
+ "files_remaining": 0,
+ "bytes_remaining": 0,
+ "purges_remaining": 0,
+ "total_files_skipped": 0}
+ worker_status_file = self.get_worker_status_file()
+ try:
+ with open(worker_status_file, 'r+') as f:
+ loaded_data = json.load(f)
+ loaded_data['files_syncd'] += loaded_data['files_remaining']
+ loaded_data['files_remaining'] = 0
+ loaded_data['bytes_remaining'] = 0
+ loaded_data['purges_remaining'] = 0
+ os.ftruncate(f.fileno(), 0)
+ os.lseek(f.fileno(), 0, os.SEEK_SET)
+ json.dump(loaded_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except (IOError, OSError, ValueError):
+ logging.info('Creating new %s' % worker_status_file)
+ try:
+ with open(worker_status_file, 'wb') as f:
+ json.dump(default_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except:
+ raise
+
+ def update_worker_files_remaining(self, state):
+ self.update_worker_status('files_remaining', state)
+
+ def update_worker_bytes_remaining(self, state):
+ self.update_worker_status('bytes_remaining', state)
+
+ def update_worker_purges_remaining(self, state):
+ self.update_worker_status('purges_remaining', state)
+
+ def update_worker_total_files_skipped(self, value):
+ default_data = {"remote_node": "N/A",
+ "worker status": "Not Started",
+ "crawl status": "N/A",
+ "files_syncd": 0,
+ "files_remaining": 0,
+ "bytes_remaining": 0,
+ "purges_remaining": 0,
+ "total_files_skipped": 0}
+ worker_status_file = self.get_worker_status_file()
+ try:
+ with open(worker_status_file, 'r+') as f:
+ loaded_data = json.load(f)
+ loaded_data['total_files_skipped'] = value
+ loaded_data['files_remaining'] -= value
+ os.ftruncate(f.fileno(), 0)
+ os.lseek(f.fileno(), 0, os.SEEK_SET)
+ json.dump(loaded_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except (IOError, OSError, ValueError):
+ logging.info('Creating new %s' % worker_status_file)
+ try:
+ with open(worker_status_file, 'wb') as f:
+ default_data['total_files_skipped'] = value
+ json.dump(default_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except:
+ raise
+
+ def crawl(self):
+ self.update_worker_crawl_status("Changelog Crawl")
+ changes = []
+ # get stime (from the brick) and purge changelogs
+ # that are _historical_ to that time.
+ purge_time = self.xtime('.', self.slave)
+ if isinstance(purge_time, int):
+ purge_time = None
+ try:
+ self.master.server.changelog_scan()
+ self.crawls += 1
+ except OSError:
+ self.fallback_xsync()
+ self.update_worker_crawl_status("Hybrid Crawl")
+ changes = self.master.server.changelog_getchanges()
+ if changes:
+ if purge_time:
+ logging.info("slave's time: %s" % repr(purge_time))
+ processed = [x for x in changes
+ if int(x.split('.')[-1]) < purge_time[0]]
+ for pr in processed:
+ logging.info(
+ 'skipping already processed change: %s...' %
+ os.path.basename(pr))
+ self.master.server.changelog_done(pr)
+ changes.remove(pr)
+ logging.debug('processing changes %s' % repr(changes))
+ if changes:
+ self.process(changes)
+
+ def register(self):
+ (workdir, logfile) = self.setup_working_dir()
+ self.sleep_interval = int(gconf.change_interval)
+ # register with the changelog library
+ try:
+ # 9 == log level (DEBUG)
+ # 5 == connection retries
+ self.master.server.changelog_register(gconf.local_path,
+ workdir, logfile, 9, 5)
+ except OSError:
+ self.fallback_xsync()
+ # control should not reach here
+ raise
+
+
+class GMasterXsyncMixin(GMasterChangelogMixin):
+
+ """
+ This crawl needs to be xtime based (as of now
+    it's not. this is because we generate a CHANGELOG
+    file during each crawl, which is then processed
+ by process_change()).
+ For now it's used as a one-shot initial sync
+ mechanism and only syncs directories, regular
+ files, hardlinks and symlinks.
+ """
+
+ XSYNC_MAX_ENTRIES = 1 << 13
+
+ def register(self):
+ self.counter = 0
+ self.comlist = []
+ self.stimes = []
+ self.sleep_interval = 60
+ self.tempdir = self.setup_working_dir()[0]
+ self.tempdir = os.path.join(self.tempdir, 'xsync')
+ logging.info('xsync temp directory: %s' % self.tempdir)
+ try:
+ os.makedirs(self.tempdir)
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno == EEXIST and os.path.isdir(self.tempdir):
+ pass
+ else:
+ raise
+
+ def crawl(self):
+ """
+ event dispatcher thread
+
+        this thread either dispatches a changelog or synchronizes stime;
+        additionally it terminates itself on receiving a 'finale' event
+ """
+ def Xsyncer():
+ self.Xcrawl()
+ t = Thread(target=Xsyncer)
+ t.start()
+ logging.info('starting hybrid crawl...')
+ self.update_worker_crawl_status("Hybrid Crawl")
+ while True:
+ try:
+ item = self.comlist.pop(0)
+ if item[0] == 'finale':
+ logging.info('finished hybrid crawl syncing')
+ break
+ elif item[0] == 'xsync':
+ logging.info('processing xsync changelog %s' % (item[1]))
+ self.process([item[1]], 0)
+ elif item[0] == 'stime':
+ logging.debug('setting slave time: %s' % repr(item[1]))
+ self.upd_stime(item[1][1], item[1][0])
+ else:
+ logging.warn('unknown tuple in comlist (%s)' % repr(item))
+ except IndexError:
+ time.sleep(1)
+
+ def write_entry_change(self, prefix, data=[]):
+ self.fh.write("%s %s\n" % (prefix, ' '.join(data)))
+
+ def open(self):
+ try:
+ self.xsync_change = os.path.join(
+ self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time())))
+ self.fh = open(self.xsync_change, 'w')
+ except IOError:
+ raise
+
+ def close(self):
+ self.fh.close()
+
+ def fname(self):
+ return self.xsync_change
+
+ def put(self, mark, item):
+ self.comlist.append((mark, item))
+
+ def sync_xsync(self, last):
+ """schedule a processing of changelog"""
+ self.close()
+ self.put('xsync', self.fname())
+ self.counter = 0
+ if not last:
+ time.sleep(1) # make sure changelogs are 1 second apart
+ self.open()
+
+ def sync_stime(self, stime=None, last=False):
+ """schedule a stime synchronization"""
+ if stime:
+ self.put('stime', stime)
+ if last:
+ self.put('finale', None)
+
+ def sync_done(self, stime=[], last=False):
+ self.sync_xsync(last)
+ if stime:
+ # Send last as True only for last stime entry
+ for st in stime[:-1]:
+ self.sync_stime(st, False)
+
+ if stime and stime[-1]:
+ self.sync_stime(stime[-1], last)
+
+ def is_sticky(self, path, mo):
+        """check for DHT's linkto sticky-bit file"""
+ sticky = False
+ if mo & 01000:
+ sticky = self.master.server.linkto_check(path)
+ return sticky
+
+ def Xcrawl(self, path='.', xtr_root=None):
+ """
+ generate a CHANGELOG file consumable by process_change.
+
+ slave's xtime (stime) is _cached_ for comparisons across
+ the filesystem tree, but set after directory synchronization.
+ """
+ if path == '.':
+ self.open()
+ self.crawls += 1
+ if not xtr_root:
+ # get the root stime and use it for all comparisons
+ xtr_root = self.xtime('.', self.slave)
+ if isinstance(xtr_root, int):
+ if xtr_root != ENOENT:
+ logging.warn("slave cluster not returning the "
+ "correct xtime for root (%d)" % xtr_root)
+ xtr_root = self.minus_infinity
+ xtl = self.xtime(path)
+ if isinstance(xtl, int):
+ logging.warn("master cluster's xtime not found")
+ xtr = self.xtime(path, self.slave)
+ if isinstance(xtr, int):
+ if xtr != ENOENT:
+ logging.warn("slave cluster not returning the "
+ "correct xtime for %s (%d)" % (path, xtr))
+ xtr = self.minus_infinity
+ xtr = max(xtr, xtr_root)
+ if not self.need_sync(path, xtl, xtr):
+ if path == '.':
+ self.sync_done([(path, xtl)], True)
+ return
+ self.xtime_reversion_hook(path, xtl, xtr)
+ logging.debug("entering " + path)
+ dem = self.master.server.entries(path)
+ pargfid = self.master.server.gfid(path)
+ if isinstance(pargfid, int):
+ logging.warn('skipping directory %s' % (path))
+ for e in dem:
+ bname = e
+ e = os.path.join(path, e)
+ xte = self.xtime(e)
+ if isinstance(xte, int):
+ logging.warn("irregular xtime for %s: %s" %
+ (e, errno.errorcode[xte]))
+ continue
+ if not self.need_sync(e, xte, xtr):
+ continue
+ st = self.master.server.lstat(e)
+ if isinstance(st, int):
+ logging.warn('%s got purged in the interim ...' % e)
+ continue
+ if self.is_sticky(e, st.st_mode):
+ logging.debug('ignoring sticky bit file %s' % e)
+ continue
+ gfid = self.master.server.gfid(e)
+ if isinstance(gfid, int):
+ logging.warn('skipping entry %s..' % e)
+ continue
+ mo = st.st_mode
+ self.counter += 1
+ if self.counter == self.XSYNC_MAX_ENTRIES:
+ self.sync_done(self.stimes, False)
+ self.stimes = []
+ if stat.S_ISDIR(mo):
+ self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str(
+ st.st_uid), str(st.st_gid), escape(os.path.join(pargfid,
+ bname))])
+ self.Xcrawl(e, xtr_root)
+ self.stimes.append((e, xte))
+ elif stat.S_ISLNK(mo):
+ self.write_entry_change(
+ "E", [gfid, 'SYMLINK', escape(os.path.join(pargfid,
+ bname))])
+ elif stat.S_ISREG(mo):
+ nlink = st.st_nlink
+ nlink -= 1 # fixup backend stat link count
+                # if a file has a hardlink, create a Changelog entry as
+                # 'LINK' so the slave side will decide whether to create
+                # the new entry or to create a link.
+ if nlink == 1:
+ self.write_entry_change("E",
+ [gfid, 'MKNOD', str(mo),
+ str(st.st_uid),
+ str(st.st_gid),
+ escape(os.path.join(
+ pargfid, bname))])
+ else:
+ self.write_entry_change(
+ "E", [gfid, 'LINK', escape(os.path.join(pargfid,
+ bname))])
+ self.write_entry_change("D", [gfid])
+ if path == '.':
+ self.stimes.append((path, xtl))
+ self.sync_done(self.stimes, True)
+
+
+class BoxClosedErr(Exception):
+ pass
+
+
+class PostBox(list):
+
+ """synchronized collection for storing things thought of as "requests" """
+
+ def __init__(self, *a):
+ list.__init__(self, *a)
+ # too bad Python stdlib does not have read/write locks...
+        # it would suffice to grab the lock in .append as reader, in .close as
+ # writer
+ self.lever = Condition()
+ self.open = True
+ self.done = False
+
+ def wait(self):
+ """wait on requests to be processed"""
+ self.lever.acquire()
+ if not self.done:
+ self.lever.wait()
+ self.lever.release()
+ return self.result
+
+ def wakeup(self, data):
+ """wake up requestors with the result"""
+ self.result = data
+ self.lever.acquire()
+ self.done = True
+ self.lever.notifyAll()
+ self.lever.release()
+
+ def append(self, e):
+ """post a request"""
+ self.lever.acquire()
+ if not self.open:
+ raise BoxClosedErr
+ list.append(self, e)
+ self.lever.release()
+
+ def close(self):
+ """prohibit the posting of further requests"""
+ self.lever.acquire()
+ self.open = False
+ self.lever.release()
+
+
+class Syncer(object):
+
+ """a staged queue to relay rsync requests to rsync workers
+
+    By "staged queue" it's meant that when a consumer comes to the
+ queue, it takes _all_ entries, leaving the queue empty.
+ (I don't know if there is an official term for this pattern.)
+
+ The queue uses a PostBox to accumulate incoming items.
+ When a consumer (rsync worker) comes, a new PostBox is
+ set up and the old one is passed on to the consumer.
+
+ Instead of the simplistic scheme of having one big lock
+ which synchronizes both the addition of new items and
+ PostBox exchanges, use a separate lock to arbitrate consumers,
+    and rely on PostBox's synchronization mechanisms to take
+    care of additions.
+
+ There is a corner case racy situation, producers vs. consumers,
+ which is not handled by this scheme: namely, when the PostBox
+ exchange occurs in between being passed to the producer for posting
+ and the post placement. But that's what Postbox.close is for:
+ such a posting will find the PostBox closed, in which case
+ the producer can re-try posting against the actual PostBox of
+ the queue.
+
+    To aid accumulation of items in the PostBoxen before they are grabbed
+ by an rsync worker, the worker goes to sleep a bit after
+ each completed syncjob.
+ """
+
+ def __init__(self, slave, sync_engine, resilient_errnos=[]):
+ """spawn worker threads"""
+ self.slave = slave
+ self.lock = Lock()
+ self.pb = PostBox()
+ self.sync_engine = sync_engine
+ self.errnos_ok = resilient_errnos
+ for i in range(int(gconf.sync_jobs)):
+ t = Thread(target=self.syncjob)
+ t.start()
+
+ def syncjob(self):
+ """the life of a worker"""
+ while True:
+ pb = None
+ while True:
+ self.lock.acquire()
+ if self.pb:
+ pb, self.pb = self.pb, PostBox()
+ self.lock.release()
+ if pb:
+ break
+ time.sleep(0.5)
+ pb.close()
+ po = self.sync_engine(pb)
+ if po.returncode == 0:
+ ret = (True, 0)
+ elif po.returncode in self.errnos_ok:
+ ret = (False, po.returncode)
+ else:
+ po.errfail()
+ pb.wakeup(ret)
+
+ def add(self, e):
+ while True:
+ pb = self.pb
+ try:
+ pb.append(e)
+ return pb
+ except BoxClosedErr:
+ pass
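+
+# Producer-side sketch of how the sync engines above drive a Syncer
+# (illustrative, path assumed):
+#
+#     syncer = Syncer(slave, slave.rsync, [23, 24])
+#     pb = syncer.add('<aux-gfid-path>')   # post a file for transfer
+#     ok, errcode = pb.wait()              # blocks until a worker syncs the box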