summaryrefslogtreecommitdiffstats
path: root/xlators/features/marker/utils/syncdaemon/master.py
diff options
context:
space:
mode:
authorCsaba Henk <csaba@gluster.com>2011-08-10 05:02:43 +0300
committerAnand Avati <avati@gluster.com>2011-09-08 00:06:57 -0700
commit7d4560cbcdcae0d74cf486c544d5eb58775da51f (patch)
tree52a2a9cb4e51a4786b195492de18a1fb7b6713d2 /xlators/features/marker/utils/syncdaemon/master.py
parentd39a7fad09a6b4abcb23d132fd7dfdf0d440e928 (diff)
gsyncd: do the homework, document _everything_
Change-Id: I559e6a0709b8064cfd54c693e289c741f9c4c4ab BUG: 1570 Reviewed-on: http://review.gluster.com/319 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Kaushik BV <kaushikbv@gluster.com>
Diffstat (limited to 'xlators/features/marker/utils/syncdaemon/master.py')
-rw-r--r--xlators/features/marker/utils/syncdaemon/master.py124
1 files changed, 121 insertions, 3 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py
index e7cb977e8..4273bf0c4 100644
--- a/xlators/features/marker/utils/syncdaemon/master.py
+++ b/xlators/features/marker/utils/syncdaemon/master.py
@@ -14,11 +14,16 @@ from syncdutils import FreeObject, Thread, GsyncdError
URXTIME = (-1, 0)
class GMaster(object):
+ """class implementing master role"""
KFGN = 0
KNAT = 1
def get_sys_volinfo(self):
+ """query volume marks on fs root
+
+ err out on multiple foreign masters
+ """
fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \
self.master.server.native_volume_info()
fgn_vi = None
@@ -40,9 +45,20 @@ class GMaster(object):
@property
def inter_master(self):
+ """decide if we are an intermediate master
+ in a cascading setup
+ """
return self.volinfo_state[self.KFGN] and True or False
def xtime(self, path, *a, **opts):
+ """get amended xtime
+
+ as of amending, we can create missing xtime, or
+ determine a valid value if what we get is expired
+ (as of the volume mark expiry); way of amending
+ depends on @opts and on subject of query (master
+ or slave).
+ """
if a:
rsc = a[0]
else:
@@ -87,7 +103,7 @@ class GMaster(object):
self.lastreport = {'crawls': 0, 'turns': 0}
self.start = None
self.change_seen = None
- # the authorative (foreign, native) volinfo pair
+ # the authoritative (foreign, native) volinfo pair
# which lets us deduce what to do when we refetch
# the volinfos from system
uuid_preset = getattr(gconf, 'volume_id', None)
@@ -97,6 +113,7 @@ class GMaster(object):
self.terminate = False
def crawl_loop(self):
+ """start the keep-alive thread and iterate .crawl"""
timo = int(gconf.timeout or 0)
if timo > 0:
def keep_alive():
@@ -124,15 +141,24 @@ class GMaster(object):
self.crawl()
def add_job(self, path, label, job, *a, **kw):
+ """insert @job function to job table at @path with @label"""
if self.jobtab.get(path) == None:
self.jobtab[path] = []
self.jobtab[path].append((label, a, lambda : job(*a, **kw)))
def add_failjob(self, path, label):
+ """invoke .add_job with a job that does nothing, just fails"""
logging.debug('salvaged: ' + label)
self.add_job(path, label, lambda: False)
def wait(self, path, *args):
+ """perform jobs registered for @path
+
+ Reset jobtab entry for @path,
+ determine success as the conjunction of
+ success of all the jobs. In case of
+ success, call .sendmark on @path
+ """
jobs = self.jobtab.pop(path, [])
succeed = True
for j in jobs:
@@ -144,12 +170,29 @@ class GMaster(object):
return succeed
def sendmark(self, path, mark, adct=None):
+ """update slave side xtime for @path to master side xtime
+
+ also can send a setattr payload (see Server.setattr).
+ """
if adct:
self.slave.server.setattr(path, adct)
self.slave.server.set_xtime(path, self.uuid, mark)
@staticmethod
def volinfo_state_machine(volinfo_state, volinfo_sys):
+ """compute new volinfo_state from old one and incoming
+ as of current system state, also indicating if there was a
+ change regarding which volume mark is the authoritative one
+
+ @volinfo_state, @volinfo_sys are pairs of volume mark dicts
+ (foreign, native).
+
+ Note this method is marked as static, ie. the computation is
+ pure, without reliance on any excess implicit state. State
+ transitions which are deemed as ambiguous or banned will raise
+ an exception.
+
+ """
# store the value below "boxed" to emulate proper closures
# (variables of the enclosing scope are available inner functions
# provided they are no reassigned; mutation is OK).
@@ -176,6 +219,43 @@ class GMaster(object):
return newstate, param.state_change
def crawl(self, path='.', xtl=None):
+ """crawling...
+
+ Standing around
+ All the right people
+ Crawling
+ Tennis on Tuesday
+ The ladder is long
+ It is your nature
+ You've gotta suntan
+ Football on Sunday
+ Society boy
+
+ Recursively walk the master side tree and check if updates are
+ needed due to xtime differences. One invocation of crawl checks
+ children of @path and does a recursive enter only on
+ those directory children where there is an update needed.
+
+ Way of updates depends on file type:
+ - for symlinks, sync them directly and synchronously
+ - for regular children, register jobs for @path (cf. .add_job) to start
+ and wait on their rsync
+ - for directory children, register a job for @path which waits (.wait)
+ on jobs for the given child
+ (other kind of filesystem nodes are not considered)
+
+ Those slave side children which do not exist on master are simply
+ purged (see Server.purge).
+
+ Behavior is fault tolerant, synchronization is adaptive: if some action fails,
+ just go on relentlessly, adding a fail job (see .add_failjob) which will prevent
+ the .sendmark on @path, so when the next crawl will arrive to @path it will not
+ see it as up-to-date and will try to sync it again. While this semantics can be
+ supported by funky design principles (http://c2.com/cgi/wiki?LazinessImpatienceHubris),
+ the ultimate reason which excludes other possibilities is simply transience: we cannot
+ assert that the file systems (master / slave) underneath do not change and actions
+ taken upon some condition will not lose their context by the time they are performed.
+ """
if path == '.':
if self.start:
self.crawls += 1
@@ -326,14 +406,18 @@ class BoxClosedErr(Exception):
pass
class PostBox(list):
+ """synchronized collection for storing things thought of as "requests" """
def __init__(self, *a):
list.__init__(self, *a)
+ # too bad Python stdlib does not have read/write locks...
+ # it would suffice to grab the lock in .append as reader, in .close as writer
self.lever = Condition()
self.open = True
self.done = False
def wait(self):
+ """wait on requests to be processed"""
self.lever.acquire()
if not self.done:
self.lever.wait()
@@ -341,6 +425,7 @@ class PostBox(list):
return self.result
def wakeup(self, data):
+ """wake up requestors with the result"""
self.result = data
self.lever.acquire()
self.done = True
@@ -348,6 +433,7 @@ class PostBox(list):
self.lever.release()
def append(self, e):
+ """post a request"""
self.lever.acquire()
if not self.open:
raise BoxClosedErr
@@ -355,13 +441,43 @@ class PostBox(list):
self.lever.release()
def close(self):
+ """prohibit the posting of further requests"""
self.lever.acquire()
self.open = False
self.lever.release()
class Syncer(object):
+ """a staged queue to relay rsync requests to rsync workers
+
+ By "staged queue" it's meant that when a consumer comes to the
+ queue, it takes _all_ entries, leaving the queue empty.
+ (I don't know if there is an official term for this pattern.)
+
+ The queue uses a PostBox to accumulate incoming items.
+ When a consumer (rsync worker) comes, a new PostBox is
+ set up and the old one is passed on to the consumer.
+
+ Instead of the simplistic scheme of having one big lock
+ which synchronizes both the addition of new items and
+ PostBox exchanges, use a separate lock to arbitrate consumers,
+ and rely on PostBox's synchronization mechanisms to take
+ care of additions.
+
+ There is a corner case racy situation, producers vs. consumers,
+ which is not handled by this scheme: namely, when the PostBox
+ exchange occurs in between being passed to the producer for posting
+ and the post placement. But that's what PostBox.close is for:
+ such a posting will find the PostBox closed, in which case
+ the producer can re-try posting against the actual PostBox of
+ the queue.
+
+ To aid accumulation of items in the PostBoxen before being grabbed
+ by an rsync worker, the worker goes to sleep a bit after
+ each completed syncjob.
+ """
def __init__(self, slave):
+ """spawn worker threads"""
self.slave = slave
self.lock = Lock()
self.pb = PostBox()
@@ -370,6 +486,7 @@ class Syncer(object):
t.start()
def syncjob(self):
+ """the life of a worker"""
while True:
pb = None
while True:
@@ -393,8 +510,9 @@ class Syncer(object):
def add(self, e):
while True:
+ pb = self.pb
try:
- self.pb.append(e)
- return self.pb
+ pb.append(e)
+ return pb
except BoxClosedErr:
pass