summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/master.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--geo-replication/syncdaemon/master.py235
1 files changed, 126 insertions, 109 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 4c1a529a3ed..552c4deec44 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -12,7 +12,6 @@ import os
import sys
import time
import stat
-import json
import logging
import fcntl
import string
@@ -21,9 +20,11 @@ import tarfile
from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN, ESTALE, EINTR
from threading import Condition, Lock
from datetime import datetime
-from gconf import gconf
-from syncdutils import Thread, GsyncdError, boolify, escape_space_newline
-from syncdutils import unescape_space_newline, gauxpfx, md5hex, selfkill
+
+import gsyncdconfig as gconf
+from rconf import rconf
+from syncdutils import Thread, GsyncdError, escape_space_newline
+from syncdutils import unescape_space_newline, gauxpfx, escape
from syncdutils import lstat, errno_wrap, FreeObject, lf, matching_disk_gfid
from syncdutils import NoStimeAvailable, PartialHistoryAvailable
@@ -85,24 +86,41 @@ def gmaster_builder(excrawl=None):
"""produce the GMaster class variant corresponding
to sync mode"""
this = sys.modules[__name__]
- modemixin = gconf.special_sync_mode
+ modemixin = gconf.get("special-sync-mode")
if not modemixin:
modemixin = 'normal'
- changemixin = 'xsync' if gconf.change_detector == 'xsync' \
- else excrawl or gconf.change_detector
+
+ if gconf.get("change-detector") == 'xsync':
+ changemixin = 'xsync'
+ elif excrawl:
+ changemixin = excrawl
+ else:
+ changemixin = gconf.get("change-detector")
+
logging.debug(lf('setting up change detection mode',
mode=changemixin))
modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
- sendmarkmixin = boolify(
- gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
- purgemixin = boolify(
- gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
- syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine
+
+ if gconf.get("use-rsync-xattrs"):
+ sendmarkmixin = SendmarkRsyncMixin
+ else:
+ sendmarkmixin = SendmarkNormalMixin
+
+ if gconf.get("ignore-deletes"):
+ purgemixin = PurgeNoopMixin
+ else:
+ purgemixin = PurgeNormalMixin
+
+ if gconf.get("sync-method") == "tarssh":
+ syncengine = TarSSHEngine
+ else:
+ syncengine = RsyncEngine
class _GMaster(crawlmixin, modemixin, sendmarkmixin,
purgemixin, syncengine):
pass
+
return _GMaster
@@ -139,9 +157,9 @@ class NormalMixin(object):
return xt0 >= xt1
def make_xtime_opts(self, is_master, opts):
- if not 'create' in opts:
+ if 'create' not in opts:
opts['create'] = is_master
- if not 'default_xtime' in opts:
+ if 'default_xtime' not in opts:
opts['default_xtime'] = URXTIME
def xtime_low(self, rsc, path, **opts):
@@ -212,9 +230,9 @@ class RecoverMixin(NormalMixin):
@staticmethod
def make_xtime_opts(is_master, opts):
- if not 'create' in opts:
+ if 'create' not in opts:
opts['create'] = False
- if not 'default_xtime' in opts:
+ if 'default_xtime' not in opts:
opts['default_xtime'] = URXTIME
def keepalive_payload_hook(self, timo, gap):
@@ -385,7 +403,7 @@ class GMasterCommon(object):
self.master = master
self.slave = slave
self.jobtab = {}
- if boolify(gconf.use_tarssh):
+ if gconf.get("sync-method") == "tarssh":
self.syncer = Syncer(slave, self.slave.tarssh, [2])
else:
# partial transfer (cf. rsync(1)), that's normal
@@ -401,7 +419,7 @@ class GMasterCommon(object):
# 0.
self.crawls = 0
self.turns = 0
- self.total_turns = int(gconf.turns)
+ self.total_turns = rconf.turns
self.crawl_start = datetime.now()
self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
self.start = None
@@ -414,7 +432,7 @@ class GMasterCommon(object):
def init_keep_alive(cls):
"""start the keep-alive thread """
- timo = int(gconf.timeout or 0)
+ timo = gconf.get("slave-timeout", 0)
if timo > 0:
def keep_alive():
while True:
@@ -427,28 +445,28 @@ class GMasterCommon(object):
def mgmt_lock(self):
"""Take management volume lock """
- if gconf.mgmt_lock_fd:
+ if rconf.mgmt_lock_fd:
try:
- fcntl.lockf(gconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
- if not gconf.active_earlier:
- gconf.active_earlier = True
+ fcntl.lockf(rconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ if not rconf.active_earlier:
+ rconf.active_earlier = True
logging.info(lf("Got lock Becoming ACTIVE",
- brick=gconf.local_path))
+ brick=rconf.args.local_path))
return True
except:
ex = sys.exc_info()[1]
if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
- if not gconf.passive_earlier:
- gconf.passive_earlier = True
+ if not rconf.passive_earlier:
+ rconf.passive_earlier = True
logging.info(lf("Didn't get lock Becoming PASSIVE",
- brick=gconf.local_path))
+ brick=rconf.local_path))
return False
raise
fd = None
- bname = str(self.uuid) + "_" + str(gconf.slave_id) + "_subvol_" \
- + str(gconf.subvol_num) + ".lock"
- mgmt_lock_dir = os.path.join(gconf.meta_volume_mnt, "geo-rep")
+ bname = str(self.uuid) + "_" + rconf.args.slave_id + "_subvol_" \
+ + str(rconf.args.subvol_num) + ".lock"
+ mgmt_lock_dir = os.path.join(gconf.get("meta-volume-mnt"), "geo-rep")
path = os.path.join(mgmt_lock_dir, bname)
logging.debug(lf("lock file path", path=path))
try:
@@ -471,30 +489,30 @@ class GMasterCommon(object):
try:
fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
# Save latest FD for future use
- gconf.mgmt_lock_fd = fd
+ rconf.mgmt_lock_fd = fd
except:
ex = sys.exc_info()[1]
if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
# cannot grab, it's taken
- if not gconf.passive_earlier:
- gconf.passive_earlier = True
+ if not rconf.passive_earlier:
+ rconf.passive_earlier = True
logging.info(lf("Didn't get lock Becoming PASSIVE",
- brick=gconf.local_path))
- gconf.mgmt_lock_fd = fd
+ brick=rconf.args.local_path))
+ rconf.mgmt_lock_fd = fd
return False
raise
- if not gconf.active_earlier:
- gconf.active_earlier = True
+ if not rconf.active_earlier:
+ rconf.active_earlier = True
logging.info(lf("Got lock Becoming ACTIVE",
- brick=gconf.local_path))
+ brick=rconf.args.local_path))
return True
def should_crawl(self):
- if not boolify(gconf.use_meta_volume):
- return gconf.glusterd_uuid in self.master.server.node_uuid()
+ if not gconf.get("use-meta-volume"):
+ return rconf.args.local_node_id in self.master.server.node_uuid()
- if not os.path.ismount(gconf.meta_volume_mnt):
+ if not os.path.ismount(gconf.get("meta-volume-mnt")):
logging.error("Meta-volume is not mounted. Worker Exiting...")
sys.exit(1)
return self.mgmt_lock()
@@ -532,7 +550,7 @@ class GMasterCommon(object):
logging.debug("%s master with volume id %s ..." %
(inter_master and "intermediate" or "primary",
self.uuid))
- gconf.configinterface.set('volume_id', self.uuid)
+ rconf.volume_id = self.uuid
if self.volinfo:
if self.volinfo['retval']:
logging.warn(lf("master cluster's info may not be valid",
@@ -557,7 +575,7 @@ class GMasterCommon(object):
turns=self.turns,
time=self.start)
t1 = time.time()
- if int(t1 - t0) >= int(gconf.replica_failover_interval):
+ if int(t1 - t0) >= gconf.get("replica-failover-interval"):
crawl = self.should_crawl()
t0 = t1
self.update_worker_remote_node()
@@ -567,7 +585,7 @@ class GMasterCommon(object):
# which is min of cluster (but max of the replicas)
brick_stime = self.xtime('.', self.slave)
cluster_stime = self.master.server.aggregated.stime_mnt(
- '.', '.'.join([str(self.uuid), str(gconf.slave_id)]))
+ '.', '.'.join([str(self.uuid), rconf.args.slave_id]))
logging.debug(lf("Crawl info",
cluster_stime=cluster_stime,
brick_stime=brick_stime))
@@ -675,6 +693,7 @@ class XCrawlMetadata(object):
self.st_atime = float(st_atime)
self.st_mtime = float(st_mtime)
+
class GMasterChangelogMixin(GMasterCommon):
""" changelog based change detection and syncing """
@@ -701,34 +720,34 @@ class GMasterChangelogMixin(GMasterCommon):
def init_fop_batch_stats(self):
self.batch_stats = {
- "CREATE":0,
- "MKNOD":0,
- "UNLINK":0,
- "MKDIR":0,
- "RMDIR":0,
- "LINK":0,
- "SYMLINK":0,
- "RENAME":0,
- "SETATTR":0,
- "SETXATTR":0,
- "XATTROP":0,
- "DATA":0,
- "ENTRY_SYNC_TIME":0,
- "META_SYNC_TIME":0,
- "DATA_START_TIME":0
+ "CREATE": 0,
+ "MKNOD": 0,
+ "UNLINK": 0,
+ "MKDIR": 0,
+ "RMDIR": 0,
+ "LINK": 0,
+ "SYMLINK": 0,
+ "RENAME": 0,
+ "SETATTR": 0,
+ "SETXATTR": 0,
+ "XATTROP": 0,
+ "DATA": 0,
+ "ENTRY_SYNC_TIME": 0,
+ "META_SYNC_TIME": 0,
+ "DATA_START_TIME": 0
}
def update_fop_batch_stats(self, ty):
if ty in ['FSETXATTR']:
- ty = 'SETXATTR'
- self.batch_stats[ty] = self.batch_stats.get(ty,0) + 1
+ ty = 'SETXATTR'
+ self.batch_stats[ty] = self.batch_stats.get(ty, 0) + 1
def archive_and_purge_changelogs(self, changelogs):
# Creates tar file instead of tar.gz, since changelogs will
# be appended to existing tar. archive name is
# archive_<YEAR><MONTH>.tar
archive_name = "archive_%s.tar" % datetime.today().strftime(
- gconf.changelog_archive_format)
+ gconf.get("changelog-archive-format"))
try:
tar = tarfile.open(os.path.join(self.processed_changelogs_dir,
@@ -764,13 +783,9 @@ class GMasterChangelogMixin(GMasterCommon):
else:
raise
- def fallback_xsync(self):
- logging.info('falling back to xsync mode')
- gconf.configinterface.set('change-detector', 'xsync')
- selfkill()
-
def setup_working_dir(self):
- workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path))
+ workdir = os.path.join(gconf.get("working-dir"),
+ escape(rconf.args.local_path))
logging.debug('changelog working dir %s' % workdir)
return workdir
@@ -804,27 +819,30 @@ class GMasterChangelogMixin(GMasterCommon):
logging.info(lf('Fixing gfid mismatch in slave. Deleting'
' the entry', retry_count=retry_count,
entry=repr(failure)))
- #Add deletion to fix_entry_ops list
+ # Add deletion to fix_entry_ops list
if failure[2]['slave_isdir']:
- fix_entry_ops.append(edct('RMDIR',
- gfid=failure[2]['slave_gfid'],
- entry=pbname))
+ fix_entry_ops.append(
+ edct('RMDIR',
+ gfid=failure[2]['slave_gfid'],
+ entry=pbname))
else:
- fix_entry_ops.append(edct('UNLINK',
- gfid=failure[2]['slave_gfid'],
- entry=pbname))
+ fix_entry_ops.append(
+ edct('UNLINK',
+ gfid=failure[2]['slave_gfid'],
+ entry=pbname))
elif not isinstance(st, int):
- #The file exists on master but with different name.
- #Probabaly renamed and got missed during xsync crawl.
+ # The file exists on master but with different name.
+ # Probabaly renamed and got missed during xsync crawl.
if failure[2]['slave_isdir']:
logging.info(lf('Fixing gfid mismatch in slave',
retry_count=retry_count,
entry=repr(failure)))
- realpath = os.readlink(os.path.join(gconf.local_path,
- ".glusterfs",
- slave_gfid[0:2],
- slave_gfid[2:4],
- slave_gfid))
+ realpath = os.readlink(os.path.join(
+ rconf.args.local_path,
+ ".glusterfs",
+ slave_gfid[0:2],
+ slave_gfid[2:4],
+ slave_gfid))
dst_entry = os.path.join(pfx, realpath.split('/')[-2],
realpath.split('/')[-1])
rename_dict = edct('RENAME', gfid=slave_gfid,
@@ -840,19 +858,20 @@ class GMasterChangelogMixin(GMasterCommon):
' Deleting the entry',
retry_count=retry_count,
entry=repr(failure)))
- fix_entry_ops.append(edct('UNLINK',
- gfid=failure[2]['slave_gfid'],
- entry=pbname))
+ fix_entry_ops.append(
+ edct('UNLINK',
+ gfid=failure[2]['slave_gfid'],
+ entry=pbname))
logging.error(lf('Entry cannot be fixed in slave due '
'to GFID mismatch, find respective '
'path for the GFID and trigger sync',
gfid=slave_gfid))
if fix_entry_ops:
- #Process deletions of entries whose gfids are mismatched
+ # Process deletions of entries whose gfids are mismatched
failures1 = self.slave.server.entry_ops(fix_entry_ops)
if not failures1:
- logging.info ("Sucessfully fixed entry ops with gfid mismatch")
+ logging.info("Sucessfully fixed entry ops with gfid mismatch")
return failures1
@@ -880,12 +899,11 @@ class GMasterChangelogMixin(GMasterCommon):
for failure in failures1:
logging.error("Failed to fix entry ops %s", repr(failure))
else:
- #Retry original entry list 5 times
+ # Retry original entry list 5 times
failures = self.slave.server.entry_ops(entries)
self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
-
def process_change(self, change, done, retry):
pfx = gauxpfx()
clist = []
@@ -930,7 +948,7 @@ class GMasterChangelogMixin(GMasterCommon):
# skip ENTRY operation if hot tier brick
if self.name == 'live_changelog' or \
self.name == 'history_changelog':
- if boolify(gconf.is_hottier) and et == self.TYPE_ENTRY:
+ if rconf.args.is_hottier and et == self.TYPE_ENTRY:
logging.debug(lf('skip ENTRY op if hot tier brick',
op=ec[self.POS_TYPE]))
continue
@@ -978,7 +996,7 @@ class GMasterChangelogMixin(GMasterCommon):
'master', gfid=gfid, pgfid_bname=en))
continue
- if not boolify(gconf.ignore_deletes):
+ if not gconf.get("ignore-deletes"):
if not ignore_entry_ops:
entries.append(edct(ty, gfid=gfid, entry=en))
elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
@@ -1084,12 +1102,11 @@ class GMasterChangelogMixin(GMasterCommon):
st_mtime=ec[6])))
else:
meta_gfid.add((os.path.join(pfx, ec[0]), ))
- elif ec[1] == 'SETXATTR' or ec[1] == 'XATTROP' or \
- ec[1] == 'FXATTROP':
+ elif ec[1] in ['SETXATTR', 'XATTROP', 'FXATTROP']:
# To sync xattr/acls use rsync/tar, --xattrs and --acls
# switch to rsync and tar
- if not boolify(gconf.use_tarssh) and \
- (boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)):
+ if not gconf.get("sync-method") == "tarssh" and \
+ (gconf.get("sync-xattrs") or gconf.get("sync-acls")):
datas.add(os.path.join(pfx, ec[0]))
else:
logging.warn(lf('got invalid fop type',
@@ -1102,8 +1119,8 @@ class GMasterChangelogMixin(GMasterCommon):
self.status.inc_value("data", len(datas))
self.batch_stats["DATA"] += self.files_in_batch - \
- self.batch_stats["SETXATTR"] - \
- self.batch_stats["XATTROP"]
+ self.batch_stats["SETXATTR"] - \
+ self.batch_stats["XATTROP"]
entry_start_time = time.time()
# sync namespace
@@ -1185,7 +1202,7 @@ class GMasterChangelogMixin(GMasterCommon):
# with data of other changelogs.
if retry:
- if tries == (int(gconf.max_rsync_retries) - 1):
+ if tries == (gconf.get("max-rsync-retries") - 1):
# Enable Error logging if it is last retry
self.syncer.enable_errorlog()
@@ -1243,7 +1260,7 @@ class GMasterChangelogMixin(GMasterCommon):
# We do not know which changelog transfer failed, retry everything.
retry = True
tries += 1
- if tries == int(gconf.max_rsync_retries):
+ if tries == gconf.get("max-rsync-retries"):
logging.error(lf('changelogs could not be processed '
'completely - moving on...',
files=map(os.path.basename, changes)))
@@ -1331,8 +1348,7 @@ class GMasterChangelogMixin(GMasterCommon):
# Update last_synced_time in status file based on stime
# only update stime if stime xattr set to Brick root
if path == self.FLAT_DIR_HIERARCHY:
- chkpt_time = gconf.configinterface.get_realtime(
- "checkpoint")
+ chkpt_time = gconf.getr("checkpoint")
checkpoint_time = 0
if chkpt_time is not None:
checkpoint_time = int(chkpt_time)
@@ -1340,7 +1356,7 @@ class GMasterChangelogMixin(GMasterCommon):
self.status.set_last_synced(stime, checkpoint_time)
def update_worker_remote_node(self):
- node = sys.argv[-1]
+ node = rconf.args.resource_remote
node_data = node.split("@")
node = node_data[-1]
remote_node_ip = node.split(":")[0]
@@ -1351,7 +1367,7 @@ class GMasterChangelogMixin(GMasterCommon):
current_size = 0
for c in changes:
si = os.lstat(c).st_size
- if (si + current_size) > int(gconf.changelog_batch_size):
+ if (si + current_size) > gconf.get("changelog-batch-size"):
# Create new batch if single Changelog file greater than
# Max Size! or current batch size exceeds Max size
changelogs_batches.append([c])
@@ -1397,7 +1413,7 @@ class GMasterChangelogMixin(GMasterCommon):
def register(self, register_time, changelog_agent, status):
self.changelog_agent = changelog_agent
- self.sleep_interval = int(gconf.change_interval)
+ self.sleep_interval = gconf.get("change-interval")
self.changelog_done_func = self.changelog_agent.done
self.tempdir = self.setup_working_dir()
self.processed_changelogs_dir = os.path.join(self.tempdir,
@@ -1437,13 +1453,13 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# Changelogs backend path is hardcoded as
# <BRICK_PATH>/.glusterfs/changelogs, if user configured to different
# location then consuming history will not work(Known issue as of now)
- changelog_path = os.path.join(gconf.local_path,
+ changelog_path = os.path.join(rconf.args.local_path,
".glusterfs/changelogs")
ret, actual_end = self.changelog_agent.history(
changelog_path,
data_stime[0],
end_time,
- int(gconf.sync_jobs))
+ gconf.get("sync-jobs"))
# scan followed by getchanges till scan returns zero.
# history_scan() is blocking call, till it gets the number
@@ -1736,7 +1752,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
[gfid, 'MKNOD', str(mo),
str(0), str(0),
escape_space_newline(
- os.path.join(pargfid, bname))])
+ os.path.join(
+ pargfid, bname))])
else:
self.write_entry_change(
"E", [gfid, 'LINK', escape_space_newline(
@@ -1837,8 +1854,8 @@ class Syncer(object):
self.pb = PostBox()
self.sync_engine = sync_engine
self.errnos_ok = resilient_errnos
- for i in range(int(gconf.sync_jobs)):
- t = Thread(target=self.syncjob, args=(i+1, ))
+ for i in range(gconf.get("sync-jobs")):
+ t = Thread(target=self.syncjob, args=(i + 1, ))
t.start()
def syncjob(self, job_id):