summaryrefslogtreecommitdiffstats
path: root/xlators/features/marker/utils/syncdaemon/resource.py
diff options
context:
space:
mode:
author: Csaba Henk <csaba@gluster.com> 2011-08-10 05:02:43 +0300
committer: Anand Avati <avati@gluster.com> 2011-09-08 00:06:57 -0700
commit: 7d4560cbcdcae0d74cf486c544d5eb58775da51f (patch)
tree: 52a2a9cb4e51a4786b195492de18a1fb7b6713d2 /xlators/features/marker/utils/syncdaemon/resource.py
parent: d39a7fad09a6b4abcb23d132fd7dfdf0d440e928 (diff)
gsyncd: do the homework, document _everything_
Change-Id: I559e6a0709b8064cfd54c693e289c741f9c4c4ab BUG: 1570 Reviewed-on: http://review.gluster.com/319 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Kaushik BV <kaushikbv@gluster.com>
Diffstat (limited to 'xlators/features/marker/utils/syncdaemon/resource.py')
-rw-r--r-- xlators/features/marker/utils/syncdaemon/resource.py 196
1 files changed, 183 insertions, 13 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py
index 66600fdad43..f92e8573409 100644
--- a/xlators/features/marker/utils/syncdaemon/resource.py
+++ b/xlators/features/marker/utils/syncdaemon/resource.py
@@ -26,9 +26,19 @@ HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")
def sup(x, *a, **kw):
+ """a rubyesque "super" for python ;)
+
+ invoke caller method in parent class with given args.
+ """
return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw)
def desugar(ustr):
+ """transform sugared url strings to standard <scheme>://<urlbody> form
+
+ parsing logic enforces the constraint that sugared forms should contain
+ a ':' or a '/', which ensures that sugared urls do not conflict with
+ gluster volume names.
+ """
m = re.match('([^:]*):(.*)', ustr)
if m:
if not m.groups()[0]:
@@ -46,6 +56,7 @@ def desugar(ustr):
return "file://" + ap
def gethostbyname(hnam):
+ """gethostbyname wrapper"""
try:
return socket.gethostbyname(hnam)
except socket.gaierror:
@@ -54,6 +65,11 @@ def gethostbyname(hnam):
(hnam, ex.strerror))
def parse_url(ustr):
+ """instantiate an url object by scheme-to-class dispatch
+
+ The url classes taken into consideration are the ones in
+ this module whose names are full-caps.
+ """
m = UrlRX.match(ustr)
if not m:
ustr = desugar(ustr)
@@ -68,8 +84,17 @@ def parse_url(ustr):
class _MetaXattr(object):
+ """singleton class, a lazy wrapper around the
+ libcxattr module
+
+ libcxattr (a heavy import due to ctypes) is
+ loaded only when the single
+ instance is first used.
- # load Xattr stuff on-demand
+ This reduces runtime for those invocations
+ which do not need filesystem manipulation
+ (eg. for config, url parsing)
+ """
def __getattr__(self, meth):
from libcxattr import Xattr as LXattr
@@ -84,14 +109,17 @@ Xattr = _MetaXattr()
class Popen(subprocess.Popen):
+ """customized subclass of subprocess.Popen with a ring
+ buffer for children's error output"""
@classmethod
def init_errhandler(cls):
+ """start the thread which handles children's error output"""
cls.errstore = {}
def tailer():
while True:
for po in select.select([po.stderr for po in cls.errstore], [], []):
- po.lock()
+ po.lock.acquire()
try:
la = cls.errstore.get(po)
if la == None:
@@ -103,23 +131,22 @@ class Popen(subprocess.Popen):
while tots > 1<<20 and la:
tots -= len(la.pop(0))
finally:
- po.unlock()
+ po.lock.release()
t = syncdutils.Thread(target = tailer)
t.start()
cls.errhandler = t
- def lock(self):
- self._lock.acquire()
-
- def unlock(self):
- self._lock.release()
-
def __init__(self, args, *a, **kw):
- """subprocess.Popen wrapper with error-handling"""
+ """customizations for subprocess.Popen instantiation
+
+ - 'close_fds' is taken to be the default
+ - if child's stderr is chosen to be managed,
+ register it with the error handler thread
+ """
self.args = args
if 'close_fds' not in kw:
kw['close_fds'] = True
- self._lock = threading.Lock()
+ self.lock = threading.Lock()
try:
sup(self, args, *a, **kw)
except:
@@ -133,6 +160,7 @@ class Popen(subprocess.Popen):
self.errstore[self] = []
def errfail(self):
+ """fail nicely if child did not terminate with success"""
filling = None
if self.elines:
filling = ", saying:"
@@ -144,11 +172,15 @@ class Popen(subprocess.Popen):
syncdutils.finalize(exval = 1)
def terminate_geterr(self, fail_on_err = True):
- self.lock()
+ """kill child, finalize stderr harvesting (unregister
+ from errhandler, set up .elines), fail on error if
+ asked for
+ """
+ self.lock.acquire()
try:
elines = self.errstore.pop(self)
finally:
- self.unlock()
+ self.lock.release()
if self.poll() == None:
self.terminate()
if sp.poll() == None:
@@ -167,6 +199,12 @@ class Popen(subprocess.Popen):
class Server(object):
+ """singleton implementing those filesystem access primitives
+ which are needed for geo-replication functionality
+
+ (Singleton in the sense it's a class which has only static
+ and classmethods and is used directly, without instantiation.)
+ """
GX_NSPACE = "trusted.glusterfs"
NTV_FMTSTR = "!" + "B"*19 + "II"
@@ -175,6 +213,7 @@ class Server(object):
@staticmethod
def entries(path):
+ """directory entries in an array"""
# prevent symlinks being followed
if not stat.S_ISDIR(os.lstat(path).st_mode):
raise OSError(ENOTDIR, os.strerror(ENOTDIR))
@@ -182,6 +221,20 @@ class Server(object):
@classmethod
def purge(cls, path, entries=None):
+ """force-delete subtrees
+
+ If @entries is not specified, delete
+ the whole subtree under @path (including
+ @path).
+
+ Otherwise, @entries should be a
+ sequence of children of @path, and
+ the effect is identical with a joint
+ @entries-less purge on them, ie.
+
+ for e in entries:
+ cls.purge(os.path.join(path, e))
+ """
me_also = entries == None
if not entries:
try:
@@ -216,6 +269,7 @@ class Server(object):
@classmethod
def _create(cls, path, ctor):
+ """path creation backend routine"""
try:
ctor(path)
except OSError:
@@ -235,6 +289,13 @@ class Server(object):
@classmethod
def xtime(cls, path, uuid):
+ """query xtime extended attribute
+
+ Return xtime of @path for @uuid as a pair of integers.
+ "Normal" errors due to non-existent @path or extended attribute
+ are tolerated and errno is returned in such a case.
+ """
+
try:
return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8))
except OSError:
@@ -246,10 +307,17 @@ class Server(object):
@classmethod
def set_xtime(cls, path, uuid, mark):
+ """set @mark as xtime for @uuid on @path"""
Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark))
@staticmethod
def setattr(path, adct):
+ """set file attributes
+
+ @adct is a dict, where 'own', 'mode' and 'times'
+ keys are looked for and values used to perform
+ chown, chmod or utimes on @path.
+ """
own = adct.get('own')
if own:
os.lchown(path, *own)
@@ -267,6 +335,14 @@ class Server(object):
last_keep_alive = 0
@classmethod
def keep_alive(cls, dct):
+ """process keepalive messages.
+
+ Return keep-alive counter (number of received keep-alive
+ messages).
+
+ Now the "keep-alive" message can also have a payload which is
+ used to set a foreign volume-mark on the underlying file system.
+ """
if dct:
key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']])
val = struct.pack(cls.FRGN_FMTSTR,
@@ -279,15 +355,30 @@ class Server(object):
@staticmethod
def version():
+ """version used in handshake"""
return 1.0
class SlaveLocal(object):
+ """mix-in class to implement some facets of a slave server
+
+ ("mix-in" is sort of like "abstract class", ie. it's not
+ instantiated, just included in the ancestry DAG. I use "mix-in"
+ to indicate that it's not used as an abstract base class,
+ rather just taken in to implement additional functionality
+ on the basis of the assumed availability of certain interfaces.)
+ """
def can_connect_to(self, remote):
+ """determine our position in the connectibility matrix"""
return not remote
def service_loop(self):
+ """start a RePCe server serving self's server
+
+ stop servicing if a timeout is configured and got no
+ keep-alive in that interval
+ """
repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs))
t = syncdutils.Thread(target=lambda: (repce.service_loop(),
syncdutils.finalize()))
@@ -304,8 +395,15 @@ class SlaveLocal(object):
select.select((), (), ())
class SlaveRemote(object):
+ """mix-in class to implement an interface to a remote slave"""
def connect_remote(self, rargs=[], **opts):
+ """connects to a remote slave
+
+ Invoke an auxiliary utility (slave gsyncd, possibly wrapped)
+ which sets up the connection and set up a RePCe client to
+ communicate through its stdio.
+ """
slave = opts.get('slave', self.url)
so = getattr(gconf, 'session_owner', None)
if so:
@@ -319,6 +417,11 @@ class SlaveRemote(object):
return self.start_fd_client(po.stdout, po.stdin, **opts)
def start_fd_client(self, i, o, **opts):
+ """set up RePCe client, handshake with server
+
+ It's cut out as a separate method to let
+ subclasses hook into client startup
+ """
self.server = RepceClient(i, o)
rv = self.server.__version__()
exrv = {'proto': repce.repce_version, 'object': Server.version()}
@@ -331,6 +434,7 @@ class SlaveRemote(object):
raise GsyncdError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv))
def rsync(self, files, *args):
+ """invoke rsync"""
if not files:
raise GsyncdError("no files to sync")
logging.debug("files: " + ", ".join(files))
@@ -342,6 +446,7 @@ class SlaveRemote(object):
class AbstractUrl(object):
+ """abstract base class for url scheme classes"""
def __init__(self, path, pattern):
m = re.search(pattern, path)
@@ -358,6 +463,7 @@ class AbstractUrl(object):
return self.path
def get_url(self, canonical=False, escaped=False):
+ """format self's url in various styles"""
if canonical:
pa = self.canonical_path()
else:
@@ -376,8 +482,15 @@ class AbstractUrl(object):
class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
+ """scheme class for file:// urls
+
+ can be used to represent a file slave server
+ on slave side, or interface to a remote file
+ server on master side
+ """
class FILEServer(Server):
+ """included server flavor"""
pass
server = FILEServer
@@ -386,6 +499,7 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
sup(self, path, '^/')
def connect(self):
+ """inhabit the resource beyond"""
os.chdir(self.path)
def rsync(self, files):
@@ -393,11 +507,21 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
+ """scheme class for gluster:// urls
+
+ can be used to represent a gluster slave server
+ on slave side, or interface to a remote gluster
+ slave on master side, or to represent master
+ (slave-ish features come from the mixins, master
+ functionality is outsourced to GMaster from master)
+ """
class GLUSTERServer(Server):
+ """server enhancements for a glusterfs backend"""
@classmethod
def _attr_unpack_dict(cls, xattr, extra_fields = ''):
+ """generic volume mark fetching/parsing backend"""
fmt_string = cls.NTV_FMTSTR + extra_fields
buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string))
vm = struct.unpack(fmt_string, buf)
@@ -415,6 +539,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
@classmethod
def foreign_volume_infos(cls):
+ """return list of valid (not expired) foreign volume marks"""
dict_list = []
xattr_list = Xattr.llistxattr_buf('.')
for ele in xattr_list:
@@ -434,6 +559,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
@classmethod
def native_volume_info(cls):
+ """get the native volume mark of the underlying gluster volume"""
try:
return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark']))
except OSError:
@@ -450,9 +576,17 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
return ':'.join([gethostbyname(self.host), self.volume])
def can_connect_to(self, remote):
+ """determine our position in the connectibility matrix"""
return True
def connect(self):
+ """inhabit the resource beyond
+
+ - create temporary mount point
+ - call glusterfs to mount the volume over there
+ - change to mounted fs root
+ - lazy umount + delete temp. mount point
+ """
def umount_l(d):
po = Popen(['umount', '-l', d], stderr=subprocess.PIPE)
po.wait()
@@ -486,6 +620,13 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
self.slavedir = "/proc/%d/cwd" % self.server.pid()
def service_loop(self, *args):
+ """enter service loop
+
+ - if slave given, instantiate GMaster and
+ pass control to that instance, which implements
+ master behavior
+ - else do what's inherited
+ """
if args:
GMaster(self, args[0]).crawl_loop()
else:
@@ -496,6 +637,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
class SSH(AbstractUrl, SlaveRemote):
+ """scheme class for ssh:// urls
+
+ interface to remote slave on master side
+ implementing an ssh based proxy
+ """
def __init__(self, path):
self.remote_addr, inner_url = sup(self, path,
@@ -512,9 +658,16 @@ class SSH(AbstractUrl, SlaveRemote):
return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)])
def can_connect_to(self, remote):
+ """determine our position in the connectibility matrix"""
return False
def start_fd_client(self, *a, **opts):
+ """customizations for client startup
+
+ - be a no-op if we are to daemonize (client startup is deferred
+ to post-daemon stage)
+ - determine target url for rsync after consulting server
+ """
if opts.get('deferred'):
return a
sup(self, *a)
@@ -528,6 +681,23 @@ class SSH(AbstractUrl, SlaveRemote):
self.slaveurl = ':'.join([self.remote_addr, slavepath])
def connect_remote(self, go_daemon=None):
+ """connect to inner slave url through outer ssh url
+
+ Wrap the connecting utility in ssh.
+
+ Much care is put into daemonizing: in that case
+ ssh is started before daemonization, but
+ RePCe client is to be created after that (as ssh
+ interactive password auth would be defeated by
+ a daemonized ssh, while client should be present
+ only in the final process). In that case the action
+ is split into two parts: this method is invoked
+ once pre-daemon, once post-daemon. Use @go_daemon
+ to decide which part to perform.
+
+ [NB. ATM gluster product does not make use of interactive
+ authentication.]
+ """
if go_daemon == 'done':
return self.start_fd_client(*self.fd_pair)
gconf.setup_ssh_ctl(tempfile.mkdtemp(prefix='gsyncd-aux-ssh-'))