summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/syncdutils.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/syncdutils.py')
-rw-r--r--geo-replication/syncdaemon/syncdutils.py890
1 files changed, 750 insertions, 140 deletions
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 9eda6044472..a3df103e76c 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -15,53 +15,105 @@ import time
import fcntl
import shutil
import logging
+import errno
+import threading
+import subprocess
import socket
+from subprocess import PIPE
from threading import Lock, Thread as baseThread
-from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED
-from errno import EINTR, ENOENT, EPERM, ESTALE, errorcode
+from errno import (EACCES, EAGAIN, EPIPE, ENOTCONN, ENOMEM, ECONNABORTED,
+ EINTR, ENOENT, ESTALE, EBUSY, ENODATA, errorcode, EIO)
from signal import signal, SIGTERM
import select as oselect
from os import waitpid as owaitpid
-
+import xml.etree.ElementTree as XET
+from select import error as SelectError
try:
from cPickle import PickleError
except ImportError:
- # py 3
from pickle import PickleError
-from gconf import gconf
-
+from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE
+sys.path.insert(1, GLUSTERFS_LIBEXECDIR)
+EVENTS_ENABLED = True
try:
- # py 3
- from urllib import parse as urllib
+ from gfevents.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY
+ from gfevents.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE
+ from gfevents.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE
+ from gfevents.eventtypes import GEOREP_CHECKPOINT_COMPLETED \
+ as EVENT_GEOREP_CHECKPOINT_COMPLETED
except ImportError:
- import urllib
+ # Events APIs not installed, dummy eventtypes with None
+ EVENTS_ENABLED = False
+ EVENT_GEOREP_FAULTY = None
+ EVENT_GEOREP_ACTIVE = None
+ EVENT_GEOREP_PASSIVE = None
+ EVENT_GEOREP_CHECKPOINT_COMPLETED = None
-try:
- from hashlib import md5 as md5
-except ImportError:
- # py 2.4
- from md5 import new as md5
+import gsyncdconfig as gconf
+from rconf import rconf
-# auxillary gfid based access prefix
+from hashlib import sha256 as sha256
+
+ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
+
+# auxiliary gfid based access prefix
_CL_AUX_GFID_PFX = ".gfid/"
-GF_OP_RETRIES = 20
+ROOT_GFID = "00000000-0000-0000-0000-000000000001"
+GF_OP_RETRIES = 10
+
+GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0'
+
+NodeID = None
+rsync_version = None
+unshare_mnt_propagation = None
+slv_bricks = None
+SPACE_ESCAPE_CHAR = "%20"
+NEWLINE_ESCAPE_CHAR = "%0A"
+PERCENTAGE_ESCAPE_CHAR = "%25"
-CHANGELOG_AGENT_SERVER_VERSION = 1.0
-CHANGELOG_AGENT_CLIENT_VERSION = 1.0
+final_lock = Lock()
+
+def sup(x, *a, **kw):
+ """a rubyesque "super" for python ;)
+
+ invoke caller method in parent class with given args.
+ """
+ return getattr(super(type(x), x),
+ sys._getframe(1).f_code.co_name)(*a, **kw)
def escape(s):
"""the chosen flavor of string escaping, used all over
to turn whatever data to creatable representation"""
- return urllib.quote_plus(s)
+ return s.replace("/", "-").strip("-")
-def unescape(s):
- """inverse of .escape"""
- return urllib.unquote_plus(s)
+def escape_space_newline(s):
+ return s.replace("%", PERCENTAGE_ESCAPE_CHAR)\
+ .replace(" ", SPACE_ESCAPE_CHAR)\
+ .replace("\n", NEWLINE_ESCAPE_CHAR)
+def unescape_space_newline(s):
+ return s.replace(SPACE_ESCAPE_CHAR, " ")\
+ .replace(NEWLINE_ESCAPE_CHAR, "\n")\
+ .replace(PERCENTAGE_ESCAPE_CHAR, "%")
+
+# gf_mount_ready() returns 1 if all subvols are up, else 0
+def gf_mount_ready():
+ ret = errno_wrap(Xattr.lgetxattr,
+ ['.', 'dht.subvol.status', 16],
+ [ENOENT, ENOTSUP, ENODATA], [ENOMEM])
+
+ if isinstance(ret, int):
+ logging.error("failed to get the xattr value")
+ return 1
+ ret = ret.rstrip('\x00')
+ if ret == "1":
+ return 1
+ return 0
+
def norm(s):
if s:
return s.replace('-', '_')
@@ -119,17 +171,26 @@ def setup_ssh_ctl(ctld, remote_addr, resource_url):
"""
Setup GConf ssh control path parameters
"""
- gconf.ssh_ctl_dir = ctld
+ rconf.ssh_ctl_dir = ctld
content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr,
resource_url)
- content_md5 = md5hex(content)
- fname = os.path.join(gconf.ssh_ctl_dir,
- "%s.mft" % content_md5)
+ encoded_content = content.encode()
+ content_sha256 = sha256hex(encoded_content)
+ """
+ The length of ctl_path for ssh connection should not be > 108.
+ ssh fails with ctl_path too long if it is so. But when rsync
+ is piped to ssh, it is not taking > 90. Hence using first 32
+ bytes of hash. Hash collision doesn't matter as only one sock
+ file is created per directory.
+ """
+ content_sha256 = content_sha256[:32]
+ fname = os.path.join(rconf.ssh_ctl_dir,
+ "%s.mft" % content_sha256)
- create_manifest(fname, content)
- ssh_ctl_path = os.path.join(gconf.ssh_ctl_dir,
- "%s.sock" % content_md5)
- gconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path]
+ create_manifest(fname, encoded_content)
+ ssh_ctl_path = os.path.join(rconf.ssh_ctl_dir,
+ "%s.sock" % content_sha256)
+ rconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path]
def grabfile(fname, content=None):
@@ -139,7 +200,7 @@ def grabfile(fname, content=None):
"""
# damn those messy open() mode codes
fd = os.open(fname, os.O_CREAT | os.O_RDWR)
- f = os.fdopen(fd, 'r+b', 0)
+ f = os.fdopen(fd, 'r+')
try:
fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
except:
@@ -153,34 +214,34 @@ def grabfile(fname, content=None):
try:
f.truncate()
f.write(content)
+ f.flush()
except:
f.close()
raise
- gconf.permanent_handles.append(f)
+ rconf.permanent_handles.append(f)
return f
def grabpidfile(fname=None, setpid=True):
""".grabfile customization for pid files"""
if not fname:
- fname = gconf.pid_file
+ fname = gconf.get("pid-file")
content = None
if setpid:
content = str(os.getpid()) + '\n'
return grabfile(fname, content=content)
-final_lock = Lock()
-
-def finalize(*a, **kw):
+def finalize(*args, **kwargs):
"""all those messy final steps we go trough upon termination
Do away with pidfile, ssh control dir and logging.
"""
+
final_lock.acquire()
- if getattr(gconf, 'pid_file', None):
- rm_pidf = gconf.pid_file_owned
- if gconf.cpid:
+ if gconf.get('pid_file'):
+ rm_pidf = rconf.pid_file_owned
+ if rconf.cpid:
# exit path from parent branch of daemonization
rm_pidf = False
while True:
@@ -188,33 +249,50 @@ def finalize(*a, **kw):
if not f:
# child has already taken over pidfile
break
- if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid:
+ if os.waitpid(rconf.cpid, os.WNOHANG)[0] == rconf.cpid:
# child has terminated
rm_pidf = True
break
time.sleep(0.1)
if rm_pidf:
try:
- os.unlink(gconf.pid_file)
+ os.unlink(rconf.pid_file)
except:
ex = sys.exc_info()[1]
if ex.errno == ENOENT:
pass
else:
raise
- if gconf.ssh_ctl_dir and not gconf.cpid:
- shutil.rmtree(gconf.ssh_ctl_dir)
- if getattr(gconf, 'state_socket', None):
- try:
- os.unlink(gconf.state_socket)
- except:
- if sys.exc_info()[0] == OSError:
+ if rconf.ssh_ctl_dir and not rconf.cpid:
+ def handle_rm_error(func, path, exc_info):
+ if exc_info[1].errno == ENOENT:
+ return
+ raise exc_info[1]
+
+ shutil.rmtree(rconf.ssh_ctl_dir, onerror=handle_rm_error)
+
+ """ Unmount if not done """
+ if rconf.mount_point:
+ if rconf.mountbroker:
+ umount_cmd = rconf.mbr_umount_cmd + [rconf.mount_point, 'lazy']
+ else:
+ umount_cmd = ['umount', '-l', rconf.mount_point]
+ p0 = subprocess.Popen(umount_cmd, stderr=subprocess.PIPE,
+ universal_newlines=True)
+ _, errdata = p0.communicate()
+ if p0.returncode == 0:
+ try:
+ os.rmdir(rconf.mount_point)
+ except OSError:
pass
- if gconf.log_exit:
+ else:
+ pass
+
+ if rconf.log_exit:
logging.info("exiting.")
sys.stdout.flush()
sys.stderr.flush()
- os._exit(kw.get('exval', 0))
+ os._exit(kwargs.get('exval', 0))
def log_raise_exception(excont):
@@ -224,6 +302,7 @@ def log_raise_exception(excont):
Translate some weird sounding but well understood exceptions
into human-friendly lingo
"""
+
is_filelog = False
for h in logging.getLogger().handlers:
fno = getattr(getattr(h, 'stream', None), 'fileno', None)
@@ -244,31 +323,45 @@ def log_raise_exception(excont):
((isinstance(exc, OSError) or isinstance(exc, IOError)) and
exc.errno == EPIPE):
logging.error('connection to peer is broken')
- if hasattr(gconf, 'transport'):
- gconf.transport.wait()
- if gconf.transport.returncode == 127:
- logging.warn("!!!!!!!!!!!!!")
- logging.warn('!!! getting "No such file or directory" '
- "errors is most likely due to "
- "MISCONFIGURATION"
- ", please consult https://access.redhat.com"
- "/site/documentation/en-US/Red_Hat_Storage"
- "/2.1/html/Administration_Guide"
- "/chap-User_Guide-Geo_Rep-Preparation-"
- "Settingup_Environment.html")
- logging.warn("!!!!!!!!!!!!!")
- gconf.transport.terminate_geterr()
+ if hasattr(rconf, 'transport'):
+ rconf.transport.wait()
+ if rconf.transport.returncode == 127:
+ logging.error("getting \"No such file or directory\""
+ "errors is most likely due to "
+ "MISCONFIGURATION, please remove all "
+ "the public keys added by geo-replication "
+ "from authorized_keys file in slave nodes "
+ "and run Geo-replication create "
+ "command again.")
+ logging.error("If `gsec_create container` was used, then "
+ "run `gluster volume geo-replication "
+ "<MASTERVOL> [<SLAVEUSER>@]<SLAVEHOST>::"
+ "<SLAVEVOL> config remote-gsyncd "
+ "<GSYNCD_PATH> (Example GSYNCD_PATH: "
+ "`/usr/libexec/glusterfs/gsyncd`)")
+ rconf.transport.terminate_geterr()
elif isinstance(exc, OSError) and exc.errno in (ENOTCONN,
ECONNABORTED):
- logging.error('glusterfs session went down [%s]',
- errorcode[exc.errno])
+ logging.error(lf('Gluster Mount process exited',
+ error=errorcode[exc.errno]))
+ elif isinstance(exc, OSError) and exc.errno == EIO:
+ logging.error("Getting \"Input/Output error\" "
+ "is most likely due to "
+ "a. Brick is down or "
+ "b. Split brain issue.")
+ logging.error("This is expected as per design to "
+ "keep the consistency of the file system. "
+ "Once the above issue is resolved "
+ "geo-replication would automatically "
+ "proceed further.")
+ logtag = "FAIL"
else:
logtag = "FAIL"
if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG):
logtag = "FULL EXCEPTION TRACE"
if logtag:
logging.exception(logtag + ": ")
- sys.stderr.write("failed with %s.\n" % type(exc).__name__)
+ sys.stderr.write("failed with %s: %s.\n" % (type(exc).__name__, exc))
excont.exval = 1
sys.exit(excont.exval)
@@ -291,20 +384,20 @@ class Thread(baseThread):
function coughs up an exception
"""
- def __init__(self, *a, **kw):
- tf = kw.get('target')
+ def __init__(self, *args, **kwargs):
+ tf = kwargs.get('target')
if tf:
- def twrap(*aa):
+ def twrap(*aargs):
excont = FreeObject(exval=0)
try:
- tf(*aa)
+ tf(*aargs)
except:
try:
log_raise_exception(excont)
finally:
finalize(exval=excont.exval)
- kw['target'] = twrap
- baseThread.__init__(self, *a, **kw)
+ kwargs['target'] = twrap
+ baseThread.__init__(self, *args, **kwargs)
self.setDaemon(True)
@@ -312,6 +405,33 @@ class GsyncdError(Exception):
pass
+class _MetaXattr(object):
+
+ """singleton class, a lazy wrapper around the
+ libcxattr module
+
+ libcxattr (a heavy import due to ctypes) is
+ loaded only when when the single
+ instance is tried to be used.
+
+ This reduces runtime for those invocations
+ which do not need filesystem manipulation
+ (eg. for config, url parsing)
+ """
+
+ def __getattr__(self, meth):
+ from libcxattr import Xattr as LXattr
+ xmeth = [m for m in dir(LXattr) if m[0] != '_']
+ if meth not in xmeth:
+ return
+ for m in xmeth:
+ setattr(self, m, getattr(LXattr, m))
+ return getattr(self, meth)
+
+
+Xattr = _MetaXattr()
+
+
def getusername(uid=None):
if uid is None:
uid = os.geteuid()
@@ -342,81 +462,63 @@ def boolify(s):
lstr = s.lower()
if lstr in true_list:
rv = True
- elif not lstr in false_list:
- logging.warn("Unknown string (%s) in string to boolean conversion "
- "defaulting to False\n" % (s))
+ elif lstr not in false_list:
+ logging.warn(lf("Unknown string in \"string to boolean\" conversion, "
+ "defaulting to False",
+ str=s))
return rv
-def eintr_wrap(func, exc, *a):
+def eintr_wrap(func, exc, *args):
"""
wrapper around syscalls resilient to interrupt caused
by signals
"""
while True:
try:
- return func(*a)
+ return func(*args)
except exc:
ex = sys.exc_info()[1]
if not ex.args[0] == EINTR:
raise
-def select(*a):
- return eintr_wrap(oselect.select, oselect.error, *a)
+def select(*args):
+ return eintr_wrap(oselect.select, oselect.error, *args)
+
+def waitpid(*args):
+ return eintr_wrap(owaitpid, OSError, *args)
-def waitpid(*a):
- return eintr_wrap(owaitpid, OSError, *a)
+def term_handler_default_hook(signum, frame):
+ finalize(signum, frame, exval=1)
-def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})):
+
+def set_term_handler(hook=term_handler_default_hook):
signal(SIGTERM, hook)
-def is_host_local(host):
- locaddr = False
- for ai in socket.getaddrinfo(host, None):
- # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators
- # /mgmt/glusterd/src/glusterd-utils.c#L125
- if ai[0] == socket.AF_INET:
- if ai[-1][0].split(".")[0] == "127":
- locaddr = True
- break
- elif ai[0] == socket.AF_INET6:
- if ai[-1][0] == "::1":
- locaddr = True
+def get_node_uuid():
+ global NodeID
+ if NodeID is not None:
+ return NodeID
+
+ NodeID = ""
+ with open(UUID_FILE) as f:
+ for line in f:
+ if line.startswith("UUID="):
+ NodeID = line.strip().split("=")[-1]
break
- else:
- continue
- try:
- # use ICMP socket to avoid net.ipv4.ip_nonlocal_bind issue,
- # cf. https://bugzilla.redhat.com/show_bug.cgi?id=890587
- s = socket.socket(ai[0], socket.SOCK_RAW, socket.IPPROTO_ICMP)
- except socket.error:
- ex = sys.exc_info()[1]
- if ex.errno != EPERM:
- raise
- f = None
- try:
- f = open("/proc/sys/net/ipv4/ip_nonlocal_bind")
- if int(f.read()) != 0:
- raise GsyncdError(
- "non-local bind is set and not allowed to create "
- "raw sockets, cannot determine if %s is local" % host)
- s = socket.socket(ai[0], socket.SOCK_DGRAM)
- finally:
- if f:
- f.close()
- try:
- s.bind(ai[-1])
- locaddr = True
- break
- except:
- pass
- s.close()
- return locaddr
+
+ if NodeID == "":
+ raise GsyncdError("Failed to get Host UUID from %s" % UUID_FILE)
+ return NodeID
+
+
+def is_host_local(host_id):
+ return host_id == get_node_uuid()
def funcode(f):
@@ -452,17 +554,16 @@ def gauxpfx():
return _CL_AUX_GFID_PFX
-def md5hex(s):
- return md5(s).hexdigest()
+def sha256hex(s):
+ return sha256(s).hexdigest()
def selfkill(sig=SIGTERM):
os.kill(os.getpid(), sig)
-def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):
+def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]):
""" wrapper around calls resilient to errnos.
- retry in case of ESTALE by default.
"""
nr_tries = 0
while True:
@@ -472,28 +573,39 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):
ex = sys.exc_info()[1]
if ex.errno in errnos:
return ex.errno
- if not ex.errno in retry_errnos:
+ if ex.errno not in retry_errnos:
raise
nr_tries += 1
if nr_tries == GF_OP_RETRIES:
# probably a screwed state, cannot do much...
- logging.warn('reached maximum retries (%s)...' % repr(arg))
- return
+ logging.warn(lf('reached maximum retries',
+ args=repr(arg),
+ error=ex))
+ raise
time.sleep(0.250) # retry the call
def lstat(e):
- try:
- return os.lstat(e)
- except (IOError, OSError):
- ex = sys.exc_info()[1]
- if ex.errno == ENOENT:
- return ex.errno
- else:
- raise
+ return errno_wrap(os.lstat, [e], [ENOENT], [ESTALE, EBUSY])
+
+def get_gfid_from_mnt(gfidpath):
+ return errno_wrap(Xattr.lgetxattr,
+ [gfidpath, 'glusterfs.gfid.string',
+ GX_GFID_CANONICAL_LEN], [ENOENT], [ESTALE])
+
+def matching_disk_gfid(gfid, entry):
+ disk_gfid = get_gfid_from_mnt(entry)
+ if isinstance(disk_gfid, int):
+ return False
-class NoPurgeTimeAvailable(Exception):
+ if not gfid == disk_gfid:
+ return False
+
+ return True
+
+
+class NoStimeAvailable(Exception):
pass
@@ -501,5 +613,503 @@ class PartialHistoryAvailable(Exception):
pass
+class ChangelogHistoryNotAvailable(Exception):
+ pass
+
+
class ChangelogException(OSError):
pass
+
+
+def gf_event(event_type, **kwargs):
+ if EVENTS_ENABLED:
+ from gfevents.gf_event import gf_event as gfevent
+ gfevent(event_type, **kwargs)
+
+
+class GlusterLogLevel(object):
+ NONE = 0
+ EMERG = 1
+ ALERT = 2
+ CRITICAL = 3
+ ERROR = 4
+ WARNING = 5
+ NOTICE = 6
+ INFO = 7
+ DEBUG = 8
+ TRACE = 9
+
+
+def get_changelog_log_level(lvl):
+ return getattr(GlusterLogLevel, lvl, GlusterLogLevel.INFO)
+
+
+def get_master_and_slave_data_from_args(args):
+ master_name = None
+ slave_data = None
+ for arg in args:
+ if arg.startswith(":"):
+ master_name = arg.replace(":", "")
+ if "::" in arg:
+ slave_data = arg.replace("ssh://", "")
+
+ return (master_name, slave_data)
+
+def unshare_propagation_supported():
+ global unshare_mnt_propagation
+ if unshare_mnt_propagation is not None:
+ return unshare_mnt_propagation
+
+ unshare_mnt_propagation = False
+ p = subprocess.Popen(["unshare", "--help"],
+ stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ universal_newlines=True)
+ out, err = p.communicate()
+ if p.returncode == 0:
+ if "propagation" in out:
+ unshare_mnt_propagation = True
+
+ return unshare_mnt_propagation
+
+
+def get_rsync_version(rsync_cmd):
+ global rsync_version
+ if rsync_version is not None:
+ return rsync_version
+
+ rsync_version = "0"
+ p = subprocess.Popen([rsync_cmd, "--version"],
+ stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ universal_newlines=True)
+ out, err = p.communicate()
+ if p.returncode == 0:
+ rsync_version = out.split(" ", 4)[3]
+
+ return rsync_version
+
+
+def get_slv_dir_path(slv_host, slv_volume, gfid):
+ global slv_bricks
+
+ dir_path = ENOENT
+ pfx = gauxpfx()
+
+ if not slv_bricks:
+ slv_info = Volinfo(slv_volume, slv_host, master=False)
+ slv_bricks = slv_info.bricks
+ # Result of readlink would be of format as below.
+ # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
+ for brick in slv_bricks:
+ dir_path = errno_wrap(os.path.join,
+ [brick['dir'],
+ ".glusterfs", gfid[0:2],
+ gfid[2:4],
+ gfid], [ENOENT], [ESTALE])
+ if dir_path != ENOENT:
+ try:
+ realpath = errno_wrap(os.readlink, [dir_path],
+ [ENOENT], [ESTALE])
+ if not isinstance(realpath, int):
+ realpath_parts = realpath.split('/')
+ pargfid = realpath_parts[-2]
+ basename = realpath_parts[-1]
+ dir_entry = os.path.join(pfx, pargfid, basename)
+ return dir_entry
+ except OSError:
+ # .gfid/GFID
+ gfidpath = unescape_space_newline(os.path.join(pfx, gfid))
+ realpath = errno_wrap(Xattr.lgetxattr_buf,
+ [gfidpath, 'glusterfs.gfid2path'], [ENOENT], [ESTALE])
+ if not isinstance(realpath, int):
+ basename = os.path.basename(realpath).rstrip('\x00')
+ dirpath = os.path.dirname(realpath)
+ if dirpath == "/":
+ pargfid = ROOT_GFID
+ else:
+ dirpath = dirpath.strip("/")
+ pargfid = get_gfid_from_mnt(dirpath)
+ if isinstance(pargfid, int):
+ return None
+ dir_entry = os.path.join(pfx, pargfid, basename)
+ return dir_entry
+
+ return None
+
+
+def lf(event, **kwargs):
+ """
+ Log Format helper function, log messages can be
+ easily modified to structured log format.
+ lf("Config Change", sync_jobs=4, brick=/bricks/b1) will be
+ converted as "Config Change [{brick=/bricks/b1}, {sync_jobs=4}]"
+ """
+ msgparts = []
+ for k, v in kwargs.items():
+ msgparts.append("{%s=%s}" % (k, v))
+ return "%s [%s]" % (event, ", ".join(msgparts))
+
+
+class Popen(subprocess.Popen):
+
+ """customized subclass of subprocess.Popen with a ring
+ buffer for children error output"""
+
+ @classmethod
+ def init_errhandler(cls):
+ """start the thread which handles children's error output"""
+ cls.errstore = {}
+
+ def tailer():
+ while True:
+ errstore = cls.errstore.copy()
+ try:
+ poe, _, _ = select(
+ [po.stderr for po in errstore], [], [], 1)
+ except (ValueError, SelectError):
+ # stderr is already closed wait for some time before
+ # checking next error
+ time.sleep(0.5)
+ continue
+ for po in errstore:
+ if po.stderr not in poe:
+ continue
+ po.lock.acquire()
+ try:
+ if po.on_death_row:
+ continue
+ la = errstore[po]
+ try:
+ fd = po.stderr.fileno()
+ except ValueError: # file is already closed
+ time.sleep(0.5)
+ continue
+
+ try:
+ l = os.read(fd, 1024)
+ except OSError:
+ time.sleep(0.5)
+ continue
+
+ if not l:
+ continue
+ tots = len(l)
+ for lx in la:
+ tots += len(lx)
+ while tots > 1 << 20 and la:
+ tots -= len(la.pop(0))
+ la.append(l)
+ finally:
+ po.lock.release()
+ t = Thread(target=tailer)
+ t.start()
+ cls.errhandler = t
+
+ @classmethod
+ def fork(cls):
+ """fork wrapper that restarts errhandler thread in child"""
+ pid = os.fork()
+ if not pid:
+ cls.init_errhandler()
+ return pid
+
+ def __init__(self, args, *a, **kw):
+ """customizations for subprocess.Popen instantiation
+
+ - 'close_fds' is taken to be the default
+ - if child's stderr is chosen to be managed,
+ register it with the error handler thread
+ """
+ self.args = args
+ if 'close_fds' not in kw:
+ kw['close_fds'] = True
+ self.lock = threading.Lock()
+ self.on_death_row = False
+ self.elines = []
+ try:
+ sup(self, args, *a, **kw)
+ except:
+ ex = sys.exc_info()[1]
+ if not isinstance(ex, OSError):
+ raise
+ raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
+ (args[0], errno.errorcode[ex.errno],
+ os.strerror(ex.errno)))
+ if kw.get('stderr') == subprocess.PIPE:
+ assert(getattr(self, 'errhandler', None))
+ self.errstore[self] = []
+
+ def errlog(self):
+ """make a log about child's failure event"""
+ logging.error(lf("command returned error",
+ cmd=" ".join(self.args),
+ error=self.returncode))
+ lp = ''
+
+ def logerr(l):
+ logging.error(self.args[0] + "> " + l)
+ for l in self.elines:
+ ls = l.split('\n')
+ ls[0] = lp + ls[0]
+ lp = ls.pop()
+ for ll in ls:
+ logerr(ll)
+ if lp:
+ logerr(lp)
+
+ def errfail(self):
+ """fail nicely if child did not terminate with success"""
+ self.errlog()
+ finalize(exval=1)
+
+ def terminate_geterr(self, fail_on_err=True):
+ """kill child, finalize stderr harvesting (unregister
+ from errhandler, set up .elines), fail on error if
+ asked for
+ """
+ self.lock.acquire()
+ try:
+ self.on_death_row = True
+ finally:
+ self.lock.release()
+ elines = self.errstore.pop(self)
+ if self.poll() is None:
+ self.terminate()
+ if self.poll() is None:
+ time.sleep(0.1)
+ self.kill()
+ self.wait()
+ while True:
+ if not select([self.stderr], [], [], 0.1)[0]:
+ break
+ b = os.read(self.stderr.fileno(), 1024)
+ if b:
+ elines.append(b.decode())
+ else:
+ break
+ self.stderr.close()
+ self.elines = elines
+ if fail_on_err and self.returncode != 0:
+ self.errfail()
+
+
+def host_brick_split(value):
+ """
+ IPv6 compatible way to split and get the host
+ and brick information. Example inputs:
+ node1.example.com:/exports/bricks/brick1/brick
+ fe80::af0f:df82:844f:ef66%utun0:/exports/bricks/brick1/brick
+ """
+ parts = value.split(":")
+ brick = parts[-1]
+ hostparts = parts[0:-1]
+ return (":".join(hostparts), brick)
+
+
+class Volinfo(object):
+
+ def __init__(self, vol, host='localhost', prelude=[], master=True):
+ if master:
+ gluster_cmd_dir = gconf.get("gluster-command-dir")
+ else:
+ gluster_cmd_dir = gconf.get("slave-gluster-command-dir")
+
+ gluster_cmd = os.path.join(gluster_cmd_dir, 'gluster')
+ po = Popen(prelude + [gluster_cmd, '--xml', '--remote-host=' + host,
+ 'volume', 'info', vol],
+ stdout=PIPE, stderr=PIPE, universal_newlines=True)
+ vix = po.stdout.read()
+ po.wait()
+ po.terminate_geterr()
+ vi = XET.fromstring(vix)
+ if vi.find('opRet').text != '0':
+ if prelude:
+ via = '(via %s) ' % prelude.join(' ')
+ else:
+ via = ' '
+ raise GsyncdError('getting volume info of %s%s '
+ 'failed with errorcode %s' %
+ (vol, via, vi.find('opErrno').text))
+ self.tree = vi
+ self.volume = vol
+ self.host = host
+
+ def get(self, elem):
+ return self.tree.findall('.//' + elem)
+
+ def is_tier(self):
+ return (self.get('typeStr')[0].text == 'Tier')
+
+ def is_hot(self, brickpath):
+ logging.debug('brickpath: ' + repr(brickpath))
+ return brickpath in self.hot_bricks
+
+ @property
+ @memoize
+ def bricks(self):
+ def bparse(b):
+ host, dirp = host_brick_split(b.find("name").text)
+ return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text}
+ return [bparse(b) for b in self.get('brick')]
+
+ @property
+ @memoize
+ def uuid(self):
+ ids = self.get('id')
+ if len(ids) != 1:
+ raise GsyncdError("volume info of %s obtained from %s: "
+ "ambiguous uuid" % (self.volume, self.host))
+ return ids[0].text
+
+ def replica_count(self, tier, hot):
+ if (tier and hot):
+ return int(self.get('hotBricks/hotreplicaCount')[0].text)
+ elif (tier and not hot):
+ return int(self.get('coldBricks/coldreplicaCount')[0].text)
+ else:
+ return int(self.get('replicaCount')[0].text)
+
+ def disperse_count(self, tier, hot):
+ if (tier and hot):
+ # Tiering doesn't support disperse volume as hot brick,
+ # hence no xml output, so returning 0. In case, if it's
+ # supported later, we should change here.
+ return 0
+ elif (tier and not hot):
+ return int(self.get('coldBricks/colddisperseCount')[0].text)
+ else:
+ return int(self.get('disperseCount')[0].text)
+
+ def distribution_count(self, tier, hot):
+ if (tier and hot):
+ return int(self.get('hotBricks/hotdistCount')[0].text)
+ elif (tier and not hot):
+ return int(self.get('coldBricks/colddistCount')[0].text)
+ else:
+ return int(self.get('distCount')[0].text)
+
+ @property
+ @memoize
+ def hot_bricks(self):
+ return [b.text for b in self.get('hotBricks/brick')]
+
+ def get_hot_bricks_count(self, tier):
+ if (tier):
+ return int(self.get('hotBricks/hotbrickCount')[0].text)
+ else:
+ return 0
+
+
+class VolinfoFromGconf(object):
+ # Glusterd will generate following config items before Geo-rep start
+ # So that Geo-rep need not run gluster commands from inside
+ # Volinfo object API/interface kept as is so that caller need not
+ # change anything except calling this instead of Volinfo()
+ #
+ # master-bricks=
+ # master-bricks=NODEID:HOSTNAME:PATH,..
+ # slave-bricks=NODEID:HOSTNAME,..
+ # master-volume-id=
+ # slave-volume-id=
+ # master-replica-count=
+ # master-disperse_count=
+ def __init__(self, vol, host='localhost', master=True):
+ self.volume = vol
+ self.host = host
+ self.master = master
+
+ def is_tier(self):
+ return False
+
+ def is_hot(self, brickpath):
+ return False
+
+ def is_uuid(self, value):
+ try:
+ uuid.UUID(value)
+ return True
+ except ValueError:
+ return False
+
+ def possible_path(self, value):
+ return "/" in value
+
+ @property
+ @memoize
+ def bricks(self):
+ pfx = "master-" if self.master else "slave-"
+ bricks_data = gconf.get(pfx + "bricks")
+ if bricks_data is None:
+ return []
+
+ bricks_data = bricks_data.split(",")
+ bricks_data = [b.strip() for b in bricks_data]
+ out = []
+ for b in bricks_data:
+ parts = b.split(":")
+ b_uuid = None
+ if self.is_uuid(parts[0]):
+ b_uuid = parts[0]
+ # Set all parts except first
+ parts = parts[1:]
+
+ if self.possible_path(parts[-1]):
+ bpath = parts[-1]
+ # Set all parts except last
+ parts = parts[0:-1]
+
+ out.append({
+ "host": ":".join(parts), # if remaining parts are IPv6 name
+ "dir": bpath,
+ "uuid": b_uuid
+ })
+
+ return out
+
+ @property
+ @memoize
+ def uuid(self):
+ if self.master:
+ return gconf.get("master-volume-id")
+ else:
+ return gconf.get("slave-volume-id")
+
+ def replica_count(self, tier, hot):
+ return gconf.get("master-replica-count")
+
+ def disperse_count(self, tier, hot):
+ return gconf.get("master-disperse-count")
+
+ def distribution_count(self, tier, hot):
+ return gconf.get("master-distribution-count")
+
+ @property
+ @memoize
+ def hot_bricks(self):
+ return []
+
+ def get_hot_bricks_count(self, tier):
+ return 0
+
+
+def can_ssh(host, port=22):
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ s.connect((host, port))
+ flag = True
+ except socket.error:
+ flag = False
+
+ s.close()
+ return flag
+
+
+def get_up_nodes(hosts, port):
+ # List of hosts with Hostname/IP and UUID
+ up_nodes = []
+ for h in hosts:
+ if can_ssh(h[0], port):
+ up_nodes.append(h)
+
+ return up_nodes