summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/syncdutils.py
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2017-06-21 12:56:14 +0530
committerAmar Tumballi <amarts@redhat.com>2017-11-15 05:20:08 +0000
commit705ec055040268f876d04fe5743a6ce4738d6e02 (patch)
tree47841519fd8415e31777418a94575cfae06b4bb8 /geo-replication/syncdaemon/syncdutils.py
parentda825da9501bcb51656e82cda3c21a2ef592c5e2 (diff)
geo-rep: Refactoring Config and Arguments parsing
- Fixed Python pep8 issues - Removed dead code - Rewritten configuration management - Rewritten Arguments/subcommands handling - Added Args upgrade to accommodate all these changes without changing glusterd code - use of md5 removed, which was used to hash the brick path for workdir Both Master and Slave nodes will have subdir for session in the format "<mastervol>_<primary_slave_host>_<slavevol> $GLUSTER_LOGDIR/geo-replication/<mastervol>_<primary_slave_host>_<slavevol> $GLUSTER_LOGDIR/geo-replication-slaves/<mastervol>_<primary_slave_host>_<slavevol> Log file paths renamed since session info is available with directory name itself. $LOG_DIR_MASTER/ - gsyncd.log - Gsyncd, Worker monitor logs - mnt-<brick-path>.log - Aux mount logs, mounted by each worker - changes-<brick-path>.log - Changelog related logs(One per brick) $LOG_DIR_SLAVE/ - gsyncd.log - Slave Gsyncd logs - mnt-<master-node>-<master-brick-path>.log - Aux mount logs, mounted for each connection from master-node:master-brick - mnt-mbr-<master-node>-<master-brick-path>.log - Same as above, but mountbroker setup Fixes: #73 Change-Id: I2ec2a21e4e2a92fd92899d026e8543725276f021 Signed-off-by: Aravinda VK <avishwan@redhat.com>
Diffstat (limited to 'geo-replication/syncdaemon/syncdutils.py')
-rw-r--r--geo-replication/syncdaemon/syncdutils.py121
1 files changed, 50 insertions, 71 deletions
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index e611b7b6ae5..bc03522fdda 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -15,19 +15,19 @@ import time
import fcntl
import shutil
import logging
-import socket
import errno
import threading
import subprocess
from subprocess import PIPE
from threading import Lock, Thread as baseThread
from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED
-from errno import EINTR, ENOENT, EPERM, ESTALE, EBUSY, errorcode
+from errno import EINTR, ENOENT, ESTALE, EBUSY, errorcode
from signal import signal, SIGTERM
import select as oselect
from os import waitpid as owaitpid
import xml.etree.ElementTree as XET
from select import error as SelectError
+from cPickle import PickleError
from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE
sys.path.insert(1, GLUSTERFS_LIBEXECDIR)
@@ -46,25 +46,10 @@ except ImportError:
EVENT_GEOREP_PASSIVE = None
EVENT_GEOREP_CHECKPOINT_COMPLETED = None
-try:
- from cPickle import PickleError
-except ImportError:
- # py 3
- from pickle import PickleError
-
-from gconf import gconf
+import gsyncdconfig as gconf
+from rconf import rconf
-try:
- # py 3
- from urllib import parse as urllib
-except ImportError:
- import urllib
-
-try:
- from hashlib import md5 as md5
-except ImportError:
- # py 2.4
- from md5 import new as md5
+from hashlib import md5 as md5
# auxiliary gfid based access prefix
_CL_AUX_GFID_PFX = ".gfid/"
@@ -80,6 +65,10 @@ SPACE_ESCAPE_CHAR = "%20"
NEWLINE_ESCAPE_CHAR = "%0A"
PERCENTAGE_ESCAPE_CHAR = "%25"
+final_lock = Lock()
+
+mntpt_list = []
+
def sup(x, *a, **kw):
"""a rubyesque "super" for python ;)
@@ -93,12 +82,7 @@ def sup(x, *a, **kw):
def escape(s):
"""the chosen flavor of string escaping, used all over
to turn whatever data to creatable representation"""
- return urllib.quote_plus(s)
-
-
-def unescape(s):
- """inverse of .escape"""
- return urllib.unquote_plus(s)
+ return s.replace("/", "-").strip("-")
def escape_space_newline(s):
@@ -170,17 +154,17 @@ def setup_ssh_ctl(ctld, remote_addr, resource_url):
"""
Setup GConf ssh control path parameters
"""
- gconf.ssh_ctl_dir = ctld
+ rconf.ssh_ctl_dir = ctld
content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr,
resource_url)
content_md5 = md5hex(content)
- fname = os.path.join(gconf.ssh_ctl_dir,
+ fname = os.path.join(rconf.ssh_ctl_dir,
"%s.mft" % content_md5)
create_manifest(fname, content)
- ssh_ctl_path = os.path.join(gconf.ssh_ctl_dir,
+ ssh_ctl_path = os.path.join(rconf.ssh_ctl_dir,
"%s.sock" % content_md5)
- gconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path]
+ rconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path]
def grabfile(fname, content=None):
@@ -207,32 +191,30 @@ def grabfile(fname, content=None):
except:
f.close()
raise
- gconf.permanent_handles.append(f)
+ rconf.permanent_handles.append(f)
return f
def grabpidfile(fname=None, setpid=True):
""".grabfile customization for pid files"""
if not fname:
- fname = gconf.pid_file
+ fname = gconf.get("pid-file")
content = None
if setpid:
content = str(os.getpid()) + '\n'
return grabfile(fname, content=content)
-final_lock = Lock()
-mntpt_list = []
-def finalize(*a, **kw):
+def finalize(*args, **kwargs):
"""all those messy final steps we go trough upon termination
Do away with pidfile, ssh control dir and logging.
"""
final_lock.acquire()
- if getattr(gconf, 'pid_file', None):
- rm_pidf = gconf.pid_file_owned
- if gconf.cpid:
+ if gconf.get('pid_file'):
+ rm_pidf = rconf.pid_file_owned
+ if rconf.cpid:
# exit path from parent branch of daemonization
rm_pidf = False
while True:
@@ -240,37 +222,31 @@ def finalize(*a, **kw):
if not f:
# child has already taken over pidfile
break
- if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid:
+ if os.waitpid(rconf.cpid, os.WNOHANG)[0] == rconf.cpid:
# child has terminated
rm_pidf = True
break
time.sleep(0.1)
if rm_pidf:
try:
- os.unlink(gconf.pid_file)
+ os.unlink(rconf.pid_file)
except:
ex = sys.exc_info()[1]
if ex.errno == ENOENT:
pass
else:
raise
- if gconf.ssh_ctl_dir and not gconf.cpid:
+ if rconf.ssh_ctl_dir and not rconf.cpid:
def handle_rm_error(func, path, exc_info):
if exc_info[1].errno == ENOENT:
return
raise exc_info[1]
- shutil.rmtree(gconf.ssh_ctl_dir, onerror=handle_rm_error)
- if getattr(gconf, 'state_socket', None):
- try:
- os.unlink(gconf.state_socket)
- except:
- if sys.exc_info()[0] == OSError:
- pass
+ shutil.rmtree(rconf.ssh_ctl_dir, onerror=handle_rm_error)
""" Unmount if not done """
for mnt in mntpt_list:
- p0 = subprocess.Popen (["umount", "-l", mnt], stderr=subprocess.PIPE)
+ p0 = subprocess.Popen(["umount", "-l", mnt], stderr=subprocess.PIPE)
_, errdata = p0.communicate()
if p0.returncode == 0:
try:
@@ -280,12 +256,11 @@ def finalize(*a, **kw):
else:
pass
- if gconf.log_exit:
+ if rconf.log_exit:
logging.info("exiting.")
sys.stdout.flush()
sys.stderr.flush()
- os._exit(kw.get('exval', 0))
-
+ os._exit(kwargs.get('exval', 0))
def log_raise_exception(excont):
@@ -315,9 +290,9 @@ def log_raise_exception(excont):
((isinstance(exc, OSError) or isinstance(exc, IOError)) and
exc.errno == EPIPE):
logging.error('connection to peer is broken')
- if hasattr(gconf, 'transport'):
- gconf.transport.wait()
- if gconf.transport.returncode == 127:
+ if hasattr(rconf, 'transport'):
+ rconf.transport.wait()
+ if rconf.transport.returncode == 127:
logging.error("getting \"No such file or directory\""
"errors is most likely due to "
"MISCONFIGURATION, please remove all "
@@ -331,7 +306,7 @@ def log_raise_exception(excont):
"<SLAVEVOL> config remote-gsyncd "
"<GSYNCD_PATH> (Example GSYNCD_PATH: "
"`/usr/libexec/glusterfs/gsyncd`)")
- gconf.transport.terminate_geterr()
+ rconf.transport.terminate_geterr()
elif isinstance(exc, OSError) and exc.errno in (ENOTCONN,
ECONNABORTED):
logging.error(lf('glusterfs session went down',
@@ -365,20 +340,20 @@ class Thread(baseThread):
function coughs up an exception
"""
- def __init__(self, *a, **kw):
- tf = kw.get('target')
+ def __init__(self, *args, **kwargs):
+ tf = kwargs.get('target')
if tf:
- def twrap(*aa):
+ def twrap(*aargs):
excont = FreeObject(exval=0)
try:
- tf(*aa)
+ tf(*aargs)
except:
try:
log_raise_exception(excont)
finally:
finalize(exval=excont.exval)
- kw['target'] = twrap
- baseThread.__init__(self, *a, **kw)
+ kwargs['target'] = twrap
+ baseThread.__init__(self, *args, **kwargs)
self.setDaemon(True)
@@ -443,7 +418,7 @@ def boolify(s):
lstr = s.lower()
if lstr in true_list:
rv = True
- elif not lstr in false_list:
+ elif lstr not in false_list:
logging.warn(lf("Unknown string in \"string to boolean\" conversion, "
"defaulting to False",
str=s))
@@ -451,29 +426,33 @@ def boolify(s):
return rv
-def eintr_wrap(func, exc, *a):
+def eintr_wrap(func, exc, *args):
"""
wrapper around syscalls resilient to interrupt caused
by signals
"""
while True:
try:
- return func(*a)
+ return func(*args)
except exc:
ex = sys.exc_info()[1]
if not ex.args[0] == EINTR:
raise
-def select(*a):
- return eintr_wrap(oselect.select, oselect.error, *a)
+def select(*args):
+ return eintr_wrap(oselect.select, oselect.error, *args)
+
+
+def waitpid(*args):
+ return eintr_wrap(owaitpid, OSError, *args)
-def waitpid(*a):
- return eintr_wrap(owaitpid, OSError, *a)
+def term_handler_default_hook(signum, frame):
+ finalize(signum, frame, exval=1)
-def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})):
+def set_term_handler(hook=term_handler_default_hook):
signal(SIGTERM, hook)
@@ -550,7 +529,7 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]):
ex = sys.exc_info()[1]
if ex.errno in errnos:
return ex.errno
- if not ex.errno in retry_errnos:
+ if ex.errno not in retry_errnos:
raise
nr_tries += 1
if nr_tries == GF_OP_RETRIES: