summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/gsyncd.py
diff options
context:
space:
mode:
authorAvra Sengupta <asengupt@redhat.com>2013-06-01 16:17:57 +0530
committerVijay Bellur <vbellur@redhat.com>2013-07-26 13:18:57 -0700
commitb13c483dca20e4015b958f8959328e665a357f60 (patch)
tree2af62fc50bae39e930fcbe09101d3e51c76eb6fc /geo-replication/syncdaemon/gsyncd.py
parent4944fc943efc41df1841e4e559180171f6541112 (diff)
gsyncd: distribute the crawling load
* also consume changelog for change detection. * Status fixes * Use new libgfchangelog done API * process (and sync) one changelog at a time Change-Id: I24891615bb762e0741b1819ddfdef8802326cb16 BUG: 847839 Original Author: Csaba Henk <csaba@redhat.com> Original Author: Aravinda VK <avishwan@redhat.com> Original Author: Venky Shankar <vshankar@redhat.com> Original Author: Amar Tumballi <amarts@redhat.com> Original Author: Avra Sengupta <asengupt@redhat.com> Signed-off-by: Avra Sengupta <asengupt@redhat.com> Reviewed-on: http://review.gluster.org/5131 Reviewed-by: Vijay Bellur <vbellur@redhat.com> Tested-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'geo-replication/syncdaemon/gsyncd.py')
-rw-r--r--geo-replication/syncdaemon/gsyncd.py125
1 files changed, 110 insertions, 15 deletions
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index 387900e6c..ad498c39c 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -2,10 +2,12 @@
import os
import os.path
+import glob
import sys
import time
import logging
import signal
+import shutil
import optparse
import fcntl
import fnmatch
@@ -17,7 +19,7 @@ from ipaddr import IPAddress, IPNetwork
from gconf import gconf
from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception
-from syncdutils import GsyncdError, select, set_term_handler, privileged
+from syncdutils import GsyncdError, select, set_term_handler, privileged, update_file
from configinterface import GConffile
import resource
from monitor import monitor
@@ -109,6 +111,17 @@ def startup(**kw):
GLogger._gsyncd_loginit(**kw)
+
+def _unlink(path):
+ try:
+ os.unlink(path)
+ except (OSError, IOError):
+ if sys.exc_info()[1].errno == ENOENT:
+ pass
+ else:
+ raise GsyncdError('Unlink error: %s' % path)
+
+
def main():
"""main routine, signal/exception handling boilerplates"""
gconf.starttime = time.time()
@@ -153,21 +166,27 @@ def main_i():
op.add_option('--gluster-log-file', metavar='LOGF', default=os.devnull, type=str, action='callback', callback=store_abs)
op.add_option('--gluster-log-level', metavar='LVL')
op.add_option('--gluster-params', metavar='PRMS', default='')
+ op.add_option('--glusterd-uuid', metavar='UUID', type=str, default='', help=SUPPRESS_HELP)
op.add_option('--gluster-cli-options', metavar='OPTS', default='--log-file=-')
op.add_option('--mountbroker', metavar='LABEL')
op.add_option('-p', '--pid-file', metavar='PIDF', type=str, action='callback', callback=store_abs)
op.add_option('-l', '--log-file', metavar='LOGF', type=str, action='callback', callback=store_abs)
op.add_option('--log-file-mbr', metavar='LOGF', type=str, action='callback', callback=store_abs)
op.add_option('--state-file', metavar='STATF', type=str, action='callback', callback=store_abs)
+ op.add_option('--state-detail-file', metavar='STATF', type=str, action='callback', callback=store_abs)
op.add_option('--ignore-deletes', default=False, action='store_true')
+ op.add_option('--isolated-slave', default=False, action='store_true')
op.add_option('--use-rsync-xattrs', default=False, action='store_true')
op.add_option('-L', '--log-level', metavar='LVL')
op.add_option('-r', '--remote-gsyncd', metavar='CMD', default=os.path.abspath(sys.argv[0]))
op.add_option('--volume-id', metavar='UUID')
+ op.add_option('--slave-id', metavar='ID')
op.add_option('--session-owner', metavar='ID')
+ op.add_option('--local-id', metavar='ID', help=SUPPRESS_HELP, default='')
+ op.add_option('--local-path', metavar='PATH', help=SUPPRESS_HELP, default='')
op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh')
op.add_option('--rsync-command', metavar='CMD', default='rsync')
- op.add_option('--rsync-options', metavar='OPTS', default='--sparse')
+ op.add_option('--rsync-options', metavar='OPTS', default='')
op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress')
op.add_option('--timeout', metavar='SEC', type=int, default=120)
op.add_option('--connection-timeout', metavar='SEC', type=int, default=60, help=SUPPRESS_HELP)
@@ -186,15 +205,28 @@ def main_i():
# see crawl() for usage of the above tunables
op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP)
+ # changelog or xtime? (TODO: Change the default)
+ op.add_option('--change-detector', metavar='MODE', type=str, default='xtime')
+ # sleep interval for change detection (xtime crawl uses a hardcoded 1 second sleep time)
+ op.add_option('--change-interval', metavar='SEC', type=int, default=3)
+ # working directory for changelog based mechanism
+ op.add_option('--working-dir', metavar='DIR', type=str, action='callback', callback=store_abs)
+
op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local)
# duh. need to specify dest or value will be mapped to None :S
op.add_option('--monitor', dest='monitor', action='callback', callback=store_local_curry(True))
+ op.add_option('--resource-local', dest='resource_local', type=str, action='callback', callback=store_local)
+ op.add_option('--resource-remote', dest='resource_remote', type=str, action='callback', callback=store_local)
op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local)
op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True))
op.add_option('-N', '--no-daemon', dest="go_daemon", action='callback', callback=store_local_curry('dont'))
+ op.add_option('--verify', type=str, dest="verify", action='callback', callback=store_local)
+ op.add_option('--create', type=str, dest="create", action='callback', callback=store_local)
+ op.add_option('--delete', dest='delete', action='callback', callback=store_local_curry(True))
op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a),
setattr(a[-1].values, 'log_file', '-'),
setattr(a[-1].values, 'log_level', 'DEBUG'))),
+ op.add_option('--path', type=str, action='append')
for a in ('check', 'get'):
op.add_option('--config-' + a, metavar='OPT', type=str, dest='config', action='callback',
@@ -225,6 +257,19 @@ def main_i():
# values container.
defaults = op.get_default_values()
opts, args = op.parse_args(values=optparse.Values())
+ args_orig = args[:]
+ r = rconf.get('resource_local')
+ if r:
+ if len(args) == 0:
+ args.append(None)
+ args[0] = r
+ r = rconf.get('resource_remote')
+ if r:
+ if len(args) == 0:
+ raise GsyncdError('local resource unspecfied')
+ elif len(args) == 1:
+ args.append(None)
+ args[1] = r
confdata = rconf.get('config')
if not (len(args) == 2 or \
(len(args) == 1 and rconf.get('listen')) or \
@@ -234,6 +279,12 @@ def main_i():
sys.stderr.write(op.get_usage() + "\n")
sys.exit(1)
+ verify = rconf.get('verify')
+ if verify:
+ logging.info (verify)
+ logging.info ("Able to spawn gsyncd.py")
+ return
+
restricted = os.getenv('_GSYNCD_RESTRICTED_')
if restricted:
@@ -250,6 +301,17 @@ def main_i():
(k, v))
confrx = getattr(confdata, 'rx', None)
+ def makersc(aa, check=True):
+ if not aa:
+ return ([], None, None)
+ ra = [resource.parse_url(u) for u in aa]
+ local = ra[0]
+ remote = None
+ if len(ra) > 1:
+ remote = ra[1]
+ if check and not local.can_connect_to(remote):
+ raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path))
+ return (ra, local, remote)
if confrx:
# peers are regexen, don't try to parse them
if confrx == 'glob':
@@ -257,27 +319,20 @@ def main_i():
canon_peers = args
namedict = {}
else:
- rscs = [resource.parse_url(u) for u in args]
dc = rconf.get('url_print')
+ rscs, local, remote = makersc(args_orig, not dc)
if dc:
for r in rscs:
print(r.get_url(**{'normal': {},
'canon': {'canonical': True},
'canon_esc': {'canonical': True, 'escaped': True}}[dc]))
return
- local = remote = None
- if rscs:
- local = rscs[0]
- if len(rscs) > 1:
- remote = rscs[1]
- if not local.can_connect_to(remote):
- raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path))
pa = ([], [], [])
urlprms = ({}, {'canonical': True}, {'canonical': True, 'escaped': True})
for x in rscs:
for i in range(len(pa)):
pa[i].append(x.get_url(**urlprms[i]))
- peers, canon_peers, canon_esc_peers = pa
+ _, canon_peers, canon_esc_peers = pa
# creating the namedict, a dict representing various ways of referring to / repreenting
# peers to be fillable in config templates
mods = (lambda x: x, lambda x: x[0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:])
@@ -327,6 +382,39 @@ def main_i():
gconf.__dict__.update(opts.__dict__)
gconf.configinterface = gcnf
+ delete = rconf.get('delete')
+ if delete:
+ logging.info ('geo-replication delete')
+ # Delete pid file, status file, socket file
+ cleanup_paths = []
+ if getattr(gconf, 'pid_file', None):
+ cleanup_paths.append(gconf.pid_file)
+
+ if getattr(gconf, 'state_file', None):
+ cleanup_paths.append(gconf.state_file)
+
+ if getattr(gconf, 'state_detail_file', None):
+ cleanup_paths.append(gconf.state_detail_file)
+
+ if getattr(gconf, 'state_socket_unencoded', None):
+ cleanup_paths.append(gconf.state_socket_unencoded)
+
+ # Cleanup changelog working dirs
+ if getattr(gconf, 'working_dir', None):
+ try:
+ shutil.rmtree(gconf.working_dir)
+ except (IOError, OSError):
+ if sys.exc_info()[1].errno == ENOENT:
+ pass
+ else:
+ raise GsyncdError('Error while removing working dir: %s' % gconf.working_dir)
+
+ for path in cleanup_paths:
+ # To delete temp files
+ for f in glob.glob(path + "*"):
+ _unlink(f)
+ return
+
if restricted and gconf.allow_network:
ssh_conn = os.getenv('SSH_CONNECTION')
if not ssh_conn:
@@ -380,9 +468,16 @@ def main_i():
raise
return
+ create = rconf.get('create')
+ if create:
+ if getattr(gconf, 'state_file', None):
+ update_file(gconf.state_file, lambda f: f.write(create + '\n'))
+ return
+
go_daemon = rconf['go_daemon']
be_monitor = rconf.get('monitor')
+ rscs, local, remote = makersc(args)
if not be_monitor and isinstance(remote, resource.SSH) and \
go_daemon == 'should':
go_daemon = 'postconn'
@@ -393,16 +488,16 @@ def main_i():
label = 'monitor'
elif remote:
#master
- label = ''
+ label = gconf.local_path
else:
label = 'slave'
startup(go_daemon=go_daemon, log_file=log_file, label=label)
+ resource.Popen.init_errhandler()
if be_monitor:
- return monitor()
+ return monitor(*rscs)
- logging.info("syncing: %s" % " -> ".join(peers))
- resource.Popen.init_errhandler()
+ logging.info("syncing: %s" % " -> ".join(r.url for r in rscs))
if remote:
go_daemon = remote.connect_remote(go_daemon=go_daemon)
if go_daemon: