summaryrefslogtreecommitdiffstats
path: root/xlators/features/marker/utils/syncdaemon/master.py
diff options
context:
space:
mode:
author Csaba Henk <csaba@redhat.com> 2012-05-27 03:56:24 +0530
committer Vijay Bellur <vijay@gluster.com> 2012-06-13 08:37:41 -0700
commit 118ce698e8af425bf75ceab2c9e71cfdaa0ac848 (patch)
tree 83c642e4f2b60ffec25cbaf4c18dd08f24dcbcaa /xlators/features/marker/utils/syncdaemon/master.py
parent 1877c8ea84adfc6c8943bba806e410de5eba84a7 (diff)
geo-rep: checkpointing
- gluster vol geo-rep M S conf checkpoint <LABEL|now>
  sets a checkpoint with LABEL (the keyword "now" is special: it is rendered to the label "as of <timestamp of current time>"), which is used to refer to the checkpoint from then on. (Technically, gsyncd makes a note of the xtime of master's root as of setting the checkpoint, called the "checkpoint target".)
- gluster vol geo-rep M S conf \!checkpoint
  deletes the checkpoint.
- gluster vol geo-rep M S stat
  if the status is OK and there is a checkpoint configured, the checkpoint info is appended to the status (either "not reached yet", or "completed at <timestamp of completion>"). (Technically, the worker runs a thread that monitors / serializes / verifies the checkpoint status, and answers checkpoint status requests through a UNIX socket; monitoring boils down to querying the xtime of slave's root and comparing it with the target.)
- gluster vol geo-rep M S conf log-file | xargs grep checkpoint
  displays the checkpoint history. Set, delete and completion events are logged properly.

Change-Id: I4398e0819f1504e6e496b4209e91a0e156e1a0f8
BUG: 826512
Signed-off-by: Csaba Henk <csaba@redhat.com>
Reviewed-on: http://review.gluster.com/3491
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Venky Shankar <vshankar@redhat.com>
Diffstat (limited to 'xlators/features/marker/utils/syncdaemon/master.py')
-rw-r--r--xlators/features/marker/utils/syncdaemon/master.py129
1 files changed, 127 insertions, 2 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py
index 8e196f8c5..4826037f1 100644
--- a/xlators/features/marker/utils/syncdaemon/master.py
+++ b/xlators/features/marker/utils/syncdaemon/master.py
@@ -5,12 +5,20 @@ import stat
import random
import signal
import logging
+import socket
import errno
-from errno import ENOENT, ENODATA
+from errno import ENOENT, ENODATA, EPIPE
from threading import currentThread, Condition, Lock
+from datetime import datetime
+try:
+ from hashlib import md5 as md5
+except ImportError:
+ # py 2.4
+ from md5 import new as md5
from gconf import gconf
-from syncdutils import FreeObject, Thread, GsyncdError, boolify
+from syncdutils import FreeObject, Thread, GsyncdError, boolify, \
+ escape, unescape, select
URXTIME = (-1, 0)
@@ -113,6 +121,122 @@ class GMaster(object):
# the actual volinfo we make use of
self.volinfo = None
self.terminate = False
+ self.checkpoint_thread = None
+
+    @staticmethod
+    def _checkpt_param(chkpt, prm, timish=True):
+        """fetch the stored value of parameter @prm for checkpoint @chkpt
+
+        The value is taken from the gconf attribute 'checkpoint_' + @prm,
+        which is expected to look like "<escaped label>:<value>".
+        None is returned if the attribute is missing or empty, or if it
+        was recorded for a checkpoint other than @chkpt. With @timish
+        true the dot-separated value is converted to a tuple of ints,
+        otherwise the raw string is handed back."""
+        stored = getattr(gconf, 'checkpoint_' + prm, None)
+        if stored:
+            label, value = stored.split(':', 1)
+            if unescape(label) == chkpt:
+                if timish:
+                    return tuple(int(part) for part in value.split("."))
+                return value
+        # nothing recorded, or recorded for another checkpoint
+        return None
+
+    @staticmethod
+    def _set_checkpt_param(chkpt, prm, val, timish=True):
+        """record @val as parameter @prm of checkpoint @chkpt
+
+        The value is written through gconf.configinterface under the key
+        'checkpoint_' + @prm, as "<escaped label>:<value>". With @timish
+        true, @val is a two-element sequence of ints which gets
+        serialized in "<first>.<second>" form."""
+        if timish:
+            val = "%d.%d" % tuple(val)
+        key = 'checkpoint_' + prm
+        record = "%s:%s" % (escape(chkpt), val)
+        gconf.configinterface.set(key, record)
+
+    @staticmethod
+    def humantime(*tpair):
+        """format xtime-like (sec, nsec) pair to human readable format
+
+        @tpair is the number of whole seconds since the epoch, optionally
+        followed by a sub-second component. The result is the local time
+        as "YYYY-mm-dd HH:MM:SS"; if a sub-second component was given, it
+        is appended after a dot."""
+        # Convert from the seconds alone. Gluing the pair together into a
+        # "sec.nsec" float made the timestamp subject to float / microsecond
+        # rounding, so a sub-second part close to a full second (eg.
+        # 999999999) bumped the displayed second by one.
+        ts = datetime.fromtimestamp(float(tpair[0])).\
+               strftime("%Y-%m-%d %H:%M:%S")
+        if len(tpair) > 1:
+            # NOTE(review): the sub-second part is appended without zero
+            # padding, as its unit differs between callers (xtimes vs. the
+            # stored completion time) -- confirm before padding it.
+            ts += '.' + str(tpair[1])
+        return ts
+
+    @staticmethod
+    def _checkpt_reply(chan, msg):
+        """accept one pending connection on @chan, send @msg, hang up
+
+        A client that went away before getting the answer (EPIPE) is
+        just logged; any other failure is propagated. The accepted
+        connection is closed in either case."""
+        conn = None
+        try:
+            conn, _ = chan.accept()
+            try:
+                conn.send(msg)
+            except:
+                exc = sys.exc_info()[1]
+                if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \
+                   exc.errno == EPIPE:
+                    logging.debug('checkpoint client disconnected')
+                else:
+                    raise
+        finally:
+            if conn:
+                conn.close()
+
+    def checkpt_service(self, chan, chkpt, tgt):
+        """checkpoint service loop
+
+        monitor and verify checkpoint status for @chkpt, and listen
+        for incoming requests for whom we serve a pretty-formatted
+        status report
+
+        @chan is the listening socket requests arrive on, @chkpt is the
+        checkpoint label (a false value if no checkpoint is set) and @tgt
+        is the checkpoint target the slave's root xtime is compared with.
+
+        Loops forever. Raises GsyncdError if the xtime query for the
+        slave's root yields an errno instead of an xtime."""
+        if not chkpt:
+            # dummy loop for the case when there is no checkpt set:
+            # each request is answered with a bare terminator
+            while True:
+                select([chan], [], [])
+                self._checkpt_reply(chan, '\0')
+        completed = self._checkpt_param(chkpt, 'completed')
+        while True:
+            # while not completed, wake up every 5 seconds to re-check;
+            # once completed, just block until a request comes in
+            s,_,_ = select([chan], [], [], (not completed) and 5 or None)
+            # either request made and we re-check to not
+            # give back stale data, or we are still hunting for completion
+            if tgt < self.volmark:
+                # indexing has been reset since setting the checkpoint
+                status = "is invalid"
+            else:
+                xtr = self.xtime('.', self.slave)
+                if isinstance(xtr, int):
+                    # an int in place of an xtime is an errno; interpolate
+                    # it here, as the exception does no formatting for us
+                    raise GsyncdError("slave root directory is unaccessible (%s)" % \
+                                      os.strerror(xtr))
+                ncompleted = (xtr >= tgt)
+                if completed and not ncompleted: # stale data
+                    logging.warn("completion time %s for checkpoint %s became stale" % \
+                                 (self.humantime(*completed), chkpt))
+                    completed = None
+                    # NOTE(review): the completion time is stored via
+                    # gconf.configinterface under 'checkpoint_completed', yet
+                    # it is deleted here via gconf.confdata under
+                    # 'checkpoint-completed' -- confirm this hits the same entry
+                    gconf.confdata.delete('checkpoint-completed')
+                if ncompleted and not completed: # just reaching completion
+                    completed = [ int(x) for x in ("%.6f" % time.time()).split('.') ]
+                    self._set_checkpt_param(chkpt, 'completed', completed)
+                    logging.info("checkpoint %s completed" % chkpt)
+                status = completed and \
+                  "completed at " + self.humantime(completed[0]) or \
+                  "not reached yet"
+            if s:
+                # a request is pending: serve the freshly verified status
+                self._checkpt_reply(chan, " | checkpoint %s %s\0" % (chkpt, status))
+
+    def start_checkpoint_thread(self):
+        """prepare and start checkpoint service
+
+        Does nothing if the service thread has been started already or
+        gconf has no 'state_socket_unencoded' value. Otherwise binds a
+        listening UNIX socket, looks up the target of the configured
+        checkpoint (recording the current xtime of master's root as the
+        target if none is stored yet) and starts checkpt_service in a
+        thread.
+
+        Raises GsyncdError if the xtime query for the master's root
+        yields an errno instead of an xtime."""
+        if self.checkpoint_thread or not getattr(gconf, 'state_socket_unencoded', None):
+            return
+        chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        # NOTE(review): fixed, predictable path under /tmp; presumably the
+        # status client derives the same path -- confirm before changing it
+        state_socket = "/tmp/%s.socket" % md5(gconf.state_socket_unencoded).hexdigest()
+        try:
+            os.unlink(state_socket)
+        except OSError:
+            # best effort removal of a leftover socket file (typically there
+            # is none); anything but an OSError is not ours to swallow
+            pass
+        chan.bind(state_socket)
+        chan.listen(1)
+        checkpt_tgt = None
+        if gconf.checkpoint:
+            checkpt_tgt = self._checkpt_param(gconf.checkpoint, 'target')
+            if not checkpt_tgt:
+                # no target stored for this checkpoint yet: take the
+                # current xtime of master's root and store it
+                checkpt_tgt = self.xtime('.')
+                if isinstance(checkpt_tgt, int):
+                    # an int in place of an xtime is an errno; interpolate
+                    # it here, as the exception does no formatting for us
+                    raise GsyncdError("master root directory is unaccessible (%s)" % \
+                                      os.strerror(checkpt_tgt))
+                self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt)
+            logging.debug("checkpoint target %d.%d has been determined for checkpoint %s" % \
+                          (checkpt_tgt[0], checkpt_tgt[1], gconf.checkpoint))
+        t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt))
+        t.start()
+        self.checkpoint_thread = t
def crawl_loop(self):
"""start the keep-alive thread and iterate .crawl"""
@@ -291,6 +415,7 @@ class GMaster(object):
if self.volinfo:
if self.volinfo['retval']:
raise GsyncdError ("master is corrupt")
+ self.start_checkpoint_thread()
else:
if should_display_info or self.crawls == 0:
if self.inter_master: