-rw-r--r--  geo-replication/syncdaemon/Makefile.am        2
-rw-r--r--  geo-replication/syncdaemon/changelogsdb.py   111
-rw-r--r--  geo-replication/syncdaemon/gsyncd.py          22
-rw-r--r--  geo-replication/syncdaemon/gsyncdstatus.py     2
-rw-r--r--  geo-replication/syncdaemon/master.py         504
-rw-r--r--  geo-replication/syncdaemon/resource.py        54
-rw-r--r--  geo-replication/syncdaemon/syncdutils.py       7
7 files changed, 516 insertions(+), 186 deletions(-)
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am
index ed0f5e40924..ce875bdacb6 100644
--- a/geo-replication/syncdaemon/Makefile.am
+++ b/geo-replication/syncdaemon/Makefile.am
@@ -3,6 +3,6 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon
syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \
resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \
$(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py \
- gsyncdstatus.py
+ gsyncdstatus.py changelogsdb.py
CLEANFILES =
diff --git a/geo-replication/syncdaemon/changelogsdb.py b/geo-replication/syncdaemon/changelogsdb.py
new file mode 100644
index 00000000000..7e64158e7af
--- /dev/null
+++ b/geo-replication/syncdaemon/changelogsdb.py
@@ -0,0 +1,111 @@
+#
+# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import os
+import sqlite3
+from errno import ENOENT
+
+conn = None
+cursor = None
+
+
+def db_commit():
+ conn.commit()
+
+
+def db_init(db_path):
+ global conn, cursor
+ # Remove any stale temp DB left over from a previous run
+ try:
+ os.unlink(db_path)
+ os.unlink(db_path + "-journal")
+ except OSError as e:
+ if e.errno != ENOENT:
+ raise
+
+ conn = sqlite3.connect(db_path)
+ cursor = conn.cursor()
+ cursor.execute("DROP TABLE IF EXISTS data")
+ cursor.execute("DROP TABLE IF EXISTS meta")
+ query = """CREATE TABLE IF NOT EXISTS data(
+ gfid VARCHAR(100) PRIMARY KEY ON CONFLICT IGNORE,
+ changelog_time VARCHAR(100)
+ )"""
+ cursor.execute(query)
+
+ query = """CREATE TABLE IF NOT EXISTS meta(
+ gfid VARCHAR(100) PRIMARY KEY ON CONFLICT IGNORE,
+ changelog_time VARCHAR(100)
+ )"""
+ cursor.execute(query)
+
+
+def db_record_data(gfid, changelog_time):
+ query = "INSERT INTO data(gfid, changelog_time) VALUES(?, ?)"
+ cursor.execute(query, (gfid, changelog_time))
+
+
+def db_record_meta(gfid, changelog_time):
+ query = "INSERT INTO meta(gfid, changelog_time) VALUES(?, ?)"
+ cursor.execute(query, (gfid, changelog_time))
+
+
+def db_remove_meta(gfid):
+ query = "DELETE FROM meta WHERE gfid = ?"
+ cursor.execute(query, (gfid, ))
+
+
+def db_remove_data(gfid):
+ query = "DELETE FROM data WHERE gfid = ?"
+ cursor.execute(query, (gfid, ))
+
+
+def db_get_data(start, end, limit, offset):
+ query = """SELECT gfid FROM data WHERE changelog_time
+ BETWEEN ? AND ? LIMIT ? OFFSET ?"""
+ cursor.execute(query, (start, end, limit, offset))
+ out = []
+ for row in cursor:
+ out.append(row[0])
+
+ return out
+
+
+def db_get_meta(start, end, limit, offset):
+ query = """SELECT gfid FROM meta WHERE changelog_time
+ BETWEEN ? AND ? LIMIT ? OFFSET ?"""
+ cursor.execute(query, (start, end, limit, offset))
+ out = []
+ for row in cursor:
+ out.append(row[0])
+
+ return out
+
+
+def db_delete_meta_if_exists_in_data():
+ query = """
+ DELETE FROM meta WHERE gfid in
+ (SELECT M.gfid
+ FROM meta M INNER JOIN data D
+ ON M.gfid = D.gfid)
+ """
+ cursor.execute(query)
+
+
+def db_get_data_count():
+ query = "SELECT COUNT(gfid) FROM data"
+ cursor.execute(query)
+ return cursor.fetchone()[0]
+
+
+def db_get_meta_count():
+ query = "SELECT COUNT(gfid) FROM meta"
+ cursor.execute(query)
+ return cursor.fetchone()[0]
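
For reference, a minimal usage sketch of the changelogsdb helpers added above (not part of the patch): the DB path, GFIDs and timestamps are illustrative, and the flow mirrors how master.py uses the module later in this change — init, record, de-duplicate meta against data, commit, then paginate reads.

import changelogsdb as cdb

cdb.db_init("/tmp/temp_changelogs.db")        # recreates the data/meta tables

# Record GFIDs parsed from a changelog whose suffix (timestamp) is 1454581190
cdb.db_record_data(".gfid/aaaa-bbbb", "1454581190")
cdb.db_record_meta(".gfid/cccc-dddd", "1454581190")

# rsync/tar preserve permissions, so meta rows that also appear in data
# are redundant and can be dropped before syncing
cdb.db_delete_meta_if_exists_in_data()
cdb.db_commit()

# Paginate the data GFIDs of a batch (BETWEEN is inclusive on both ends)
offset = 0
while True:
    gfids = cdb.db_get_data(start="1454581190", end="1454581205",
                            limit=1000, offset=offset)
    if not gfids:
        break
    offset += 1000
    print(gfids)
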
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index c2699a183ae..918bee0ce1c 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -274,6 +274,28 @@ def main_i():
op.add_option('--sync-acls', default=True, action='store_true')
op.add_option('--log-rsync-performance', default=False,
action='store_true')
+ op.add_option('--max-rsync-retries', type=int, default=10)
+
+ # This controls stime granularity: a bigger batch is split into
+ # multiple data batches, and on failure processing resumes from the
+ # last completed batch. The default is one day of changelogs
+ # (4 * 60 * 24 = 5760)
+ # 4 changelogs per minute
+ # 60 minutes per hour
+ # 24 hours per day
+ op.add_option('--max-data-changelogs-in-batch', type=int, default=5760)
+
+ # While processing historical changelogs, the batch size above is not
+ # applied, since all changelogs are post-processed at once; batching
+ # them would only cause more rsync retries. (4 * 60 * 24 * 15 = 86400)
+ # 4 changelogs per minute
+ # 60 minutes per hour
+ # 24 hours per day
+ # 15 days
+ # This means up to 15 days of changelogs can be processed at once
+ # during a history scan
+ op.add_option('--max-history-changelogs-in-batch', type=int, default=86400)
+
op.add_option('--pause-on-start', default=False, action='store_true')
op.add_option('-L', '--log-level', metavar='LVL')
op.add_option('-r', '--remote-gsyncd', metavar='CMD',
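
A quick sanity check of the two defaults introduced above, assuming the usual 15-second changelog rollover (4 changelogs per minute); purely illustrative arithmetic, not code from the patch:

CHANGELOGS_PER_MIN = 4

data_batch = CHANGELOGS_PER_MIN * 60 * 24          # one day  -> 5760
history_batch = CHANGELOGS_PER_MIN * 60 * 24 * 15  # 15 days  -> 86400

assert data_batch == 5760 and history_batch == 86400
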
diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py
index 88398e2ce8a..beacd7473b2 100644
--- a/geo-replication/syncdaemon/gsyncdstatus.py
+++ b/geo-replication/syncdaemon/gsyncdstatus.py
@@ -52,6 +52,7 @@ def get_default_values():
"slave_node": DEFAULT_STATUS,
"worker_status": DEFAULT_STATUS,
"last_synced": 0,
+ "last_synced_entry": 0,
"crawl_status": DEFAULT_STATUS,
"entry": 0,
"data": 0,
@@ -239,6 +240,7 @@ class GeorepStatus(object):
slave_node N/A VALUE VALUE N/A
status Created VALUE Paused Stopped
last_synced N/A VALUE VALUE VALUE
+ last_synced_entry N/A VALUE VALUE VALUE
crawl_status N/A VALUE N/A N/A
entry N/A VALUE N/A N/A
data N/A VALUE N/A N/A
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 80c4d9d8b95..3df08e41a13 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -25,7 +25,13 @@ from gconf import gconf
from syncdutils import Thread, GsyncdError, boolify, escape
from syncdutils import unescape, gauxpfx, md5hex, selfkill
from syncdutils import lstat, errno_wrap, FreeObject
-from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
+from syncdutils import NoStimeAvailable, PartialHistoryAvailable
+from changelogsdb import db_init, db_record_data, db_record_meta
+from changelogsdb import db_remove_data, db_remove_meta
+from changelogsdb import db_get_data, db_get_meta, db_commit
+from changelogsdb import db_get_data_count, db_get_meta_count
+from changelogsdb import db_delete_meta_if_exists_in_data
+
URXTIME = (-1, 0)
@@ -45,6 +51,10 @@ CHANGELOG_ROLLOVER_TIME = 15
# that batch since stime will get updated after each batch.
MAX_CHANGELOG_BATCH_SIZE = 727040
+# Number of records to fetch per DB query (pagination size)
+DB_PAGINATION_SIZE_META = 100
+DB_PAGINATION_SIZE_DATA = 1000
+
# Utility functions to help us to get to closer proximity
# of the DRY principle (no, don't look for elevated or
# perspectivistic things here)
@@ -69,6 +79,24 @@ def _volinfo_hook_relax_foreign(self):
return volinfo_sys
+def edct(op, **ed):
+ dct = {}
+ dct['op'] = op
+ for k in ed:
+ if k == 'stat':
+ st = ed[k]
+ dst = dct['stat'] = {}
+ if st:
+ dst['uid'] = st.st_uid
+ dst['gid'] = st.st_gid
+ dst['mode'] = st.st_mode
+ dst['atime'] = st.st_atime
+ dst['mtime'] = st.st_mtime
+ else:
+ dct[k] = ed[k]
+ return dct
+
+
# The API!
def gmaster_builder(excrawl=None):
@@ -259,7 +287,7 @@ class TarSSHEngine(object):
st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
- self.unlinked_gfids.add(se)
+ db_remove_data(se)
return True
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
@@ -294,7 +322,7 @@ class RsyncEngine(object):
st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
- self.unlinked_gfids.add(se)
+ db_remove_data(se)
return True
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
@@ -340,6 +368,18 @@ class GMasterCommon(object):
if self.volinfo:
return self.volinfo['volume_mark']
+ def get_entry_stime(self):
+ data = self.slave.server.entry_stime(".", self.uuid)
+ if isinstance(data, int):
+ data = None
+ return data
+
+ def get_data_stime(self):
+ data = self.slave.server.stime(".", self.uuid)
+ if isinstance(data, int):
+ data = None
+ return data
+
def xtime(self, path, *a, **opts):
"""get amended xtime
@@ -387,7 +427,6 @@ class GMasterCommon(object):
self.volinfo = None
self.terminate = False
self.sleep_interval = 1
- self.unlinked_gfids = set()
def init_keep_alive(cls):
"""start the keep-alive thread """
@@ -553,7 +592,7 @@ class GMasterCommon(object):
self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
# Purge all changelogs available in processing dir
# less than cluster_stime
- proc_dir = os.path.join(self.setup_working_dir(),
+ proc_dir = os.path.join(self.tempdir,
".processing")
if os.path.exists(proc_dir):
@@ -627,6 +666,11 @@ class GMasterCommon(object):
ret = j[-1]()
if not ret:
succeed = False
+
+ # All the unlinked GFIDs have been removed from the data and meta
+ # tables; commit the transaction
+ db_commit()
+
if succeed and not args[0] is None:
self.sendmark(path, *args)
return succeed
@@ -670,9 +714,6 @@ class GMasterChangelogMixin(GMasterCommon):
# flat directory hierarchy for gfid based access
FLAT_DIR_HIERARCHY = '.'
- # maximum retries per changelog before giving up
- MAX_RETRIES = 10
-
CHANGELOG_LOG_LEVEL = 9
CHANGELOG_CONN_RETRIES = 5
@@ -727,21 +768,48 @@ class GMasterChangelogMixin(GMasterCommon):
logging.debug('changelog working dir %s' % workdir)
return workdir
- def get_purge_time(self):
- purge_time = self.xtime('.', self.slave)
- if isinstance(purge_time, int):
- purge_time = None
- return purge_time
+ def log_failures(self, failures, entry_key, gfid_prefix, log_prefix):
+ num_failures = 0
+ for failure in failures:
+ st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
+ if not isinstance(st, int):
+ num_failures += 1
+ logging.error('%s FAILED: %s' % (log_prefix,
+ repr(failure)))
+
+ self.status.inc_value("failures", num_failures)
def process_change(self, change, done, retry):
pfx = gauxpfx()
clist = []
entries = []
- meta_gfid = set()
- datas = set()
+ change_ts = change.split(".")[-1]
+
+ # self.data_batch_start is None only in beginning and during
+ # new batch start
+ if self.data_batch_start is None:
+ self.data_batch_start = change_ts
+
+ # Ignore entry ops that were already processed (changelog modes only)
+ ignore_entry_ops = False
+ entry_stime = None
+ data_stime = None
+ if self.name in ["live_changelog", "history_changelog"]:
+ entry_stime = self.get_entry_stime()
+ data_stime = self.get_data_stime()
+
+ if entry_stime is not None and data_stime is not None:
+ # If data_stime is ahead of entry_stime, the stime was updated by
+ # a passive worker and everything up to data_stime is already
+ # synced; use data_stime as the effective entry_stime
+ if data_stime[0] > entry_stime[0]:
+ entry_stime = data_stime
+
+ # Compare entry_stime with the changelog file suffix; if the
+ # changelog time is not newer than entry_stime, skip its entry ops
+ if int(change_ts) <= entry_stime[0]:
+ ignore_entry_ops = True
- # basic crawl stats: files and bytes
- files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}
try:
f = open(change, "r")
clist = f.readlines()
@@ -749,42 +817,6 @@ class GMasterChangelogMixin(GMasterCommon):
except IOError:
raise
- def edct(op, **ed):
- dct = {}
- dct['op'] = op
- for k in ed:
- if k == 'stat':
- st = ed[k]
- dst = dct['stat'] = {}
- if st:
- dst['uid'] = st.st_uid
- dst['gid'] = st.st_gid
- dst['mode'] = st.st_mode
- dst['atime'] = st.st_atime
- dst['mtime'] = st.st_mtime
- else:
- dct[k] = ed[k]
- return dct
-
- # entry counts (not purges)
- def entry_update():
- files_pending['count'] += 1
-
- # purge count
- def purge_update():
- files_pending['purge'] += 1
-
- def log_failures(failures, entry_key, gfid_prefix, log_prefix):
- num_failures = 0
- for failure in failures:
- st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
- if not isinstance(st, int):
- num_failures += 1
- logging.error('%s FAILED: %s' % (log_prefix,
- repr(failure)))
-
- self.status.inc_value("failures", num_failures)
-
for e in clist:
e = e.strip()
et = e[self.IDX_START:self.IDX_END] # entry type
@@ -792,12 +824,20 @@ class GMasterChangelogMixin(GMasterCommon):
# skip ENTRY operation if hot tier brick
if self.name == 'live_changelog' or \
- self.name == 'history_changelog':
+ self.name == 'history_changelog':
if boolify(gconf.is_hottier) and et == self.TYPE_ENTRY:
logging.debug('skip ENTRY op: %s if hot tier brick'
% (ec[self.POS_TYPE]))
continue
+ # Data and meta operations are decided while parsing, so
+ # UNLINK/RMDIR/MKNOD are still parsed even when ignore_entry_ops
+ # is True; every other entry op is skipped here, and the
+ # UNLINK/RMDIR/MKNOD entry ops themselves are dropped later
+ if ignore_entry_ops and et == self.TYPE_ENTRY and \
+ ec[self.POS_TYPE] not in ["UNLINK", "RMDIR", "MKNOD"]:
+ continue
+
if et == self.TYPE_ENTRY:
# extract information according to the type of
# the entry operation. create(), mkdir() and mknod()
@@ -819,15 +859,16 @@ class GMasterChangelogMixin(GMasterCommon):
# Remove from DATA list, so that rsync will
# not fail
pt = os.path.join(pfx, ec[0])
- if pt in datas:
- datas.remove(pt)
+ st = lstat(pt)
+ if isinstance(st, int):
+ # file got unlinked; may be a historical changelog
+ db_remove_data(pt)
+ db_remove_meta(pt)
if not boolify(gconf.ignore_deletes):
- purge_update()
- entries.append(edct(ty, gfid=gfid, entry=en))
+ if not ignore_entry_ops:
+ entries.append(edct(ty, gfid=gfid, entry=en))
elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
- entry_update()
-
# Special case: record mknod as link
if ty in ['MKNOD']:
mode = int(ec[2])
@@ -845,10 +886,10 @@ class GMasterChangelogMixin(GMasterCommon):
# CREATED if source not exists.
entries.append(edct('LINK', stat=st, entry=en,
gfid=gfid))
-
# Here, we have the assumption that only
# tier-gfid.linkto causes this mknod. Add data
- datas.add(os.path.join(pfx, ec[0]))
+ db_record_data(os.path.join(pfx, ec[0]),
+ change_ts)
continue
# stat info. present in the changelog itself
@@ -867,7 +908,6 @@ class GMasterChangelogMixin(GMasterCommon):
if isinstance(rl, int):
rl = None
- entry_update()
e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en,
stat=st, link=rl))
@@ -880,120 +920,102 @@ class GMasterChangelogMixin(GMasterCommon):
continue
if ty == 'LINK':
- entry_update()
entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
elif ty == 'SYMLINK':
rl = errno_wrap(os.readlink, [en], [ENOENT], [ESTALE])
if isinstance(rl, int):
continue
- entry_update()
+
entries.append(
edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
else:
logging.warn('ignoring %s [op %s]' % (gfid, ty))
elif et == self.TYPE_GFID:
- # If self.unlinked_gfids is available, then that means it is
- # retrying the changelog second time. Do not add the GFID's
- # to rsync job if failed previously but unlinked in master
- if self.unlinked_gfids and \
- os.path.join(pfx, ec[0]) in self.unlinked_gfids:
- logging.debug("ignoring data, since file purged interim")
- else:
- datas.add(os.path.join(pfx, ec[0]))
+ db_record_data(os.path.join(pfx, ec[0]), change_ts)
elif et == self.TYPE_META:
if ec[1] == 'SETATTR': # only setattr's for now...
- if len(ec) == 5:
- # In xsync crawl, we already have stat data
- # avoid doing stat again
- meta_gfid.add((os.path.join(pfx, ec[0]),
- XCrawlMetadata(st_uid=ec[2],
- st_gid=ec[3],
- st_mode=ec[4],
- st_atime=ec[5],
- st_mtime=ec[6])))
- else:
- meta_gfid.add((os.path.join(pfx, ec[0]), ))
+ db_record_meta(os.path.join(pfx, ec[0]), change_ts)
elif ec[1] == 'SETXATTR' or ec[1] == 'XATTROP' or \
ec[1] == 'FXATTROP':
# To sync xattr/acls use rsync/tar, --xattrs and --acls
# switch to rsync and tar
if not boolify(gconf.use_tarssh) and \
(boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)):
- datas.add(os.path.join(pfx, ec[0]))
+ db_record_data(os.path.join(pfx, ec[0]), change_ts)
else:
logging.warn('got invalid changelog type: %s' % (et))
logging.debug('entries: %s' % repr(entries))
- # Increment counters for Status
- self.status.inc_value("entry", len(entries))
- self.files_in_batch += len(datas)
- self.status.inc_value("data", len(datas))
-
# sync namespace
- if entries:
+ if entries and not ignore_entry_ops:
+ # Increment counters for Status
+ self.status.inc_value("entry", len(entries))
failures = self.slave.server.entry_ops(entries)
- log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
+ self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
self.status.dec_value("entry", len(entries))
- # sync metadata
- if meta_gfid:
- meta_entries = []
- for go in meta_gfid:
- if len(go) > 1:
- st = go[1]
- else:
- st = lstat(go[0])
- if isinstance(st, int):
- logging.debug('file %s got purged in the interim' % go[0])
- continue
- meta_entries.append(edct('META', go=go[0], stat=st))
- if meta_entries:
- self.status.inc_value("meta", len(entries))
- failures = self.slave.server.meta_ops(meta_entries)
- log_failures(failures, 'go', '', 'META')
- self.status.dec_value("meta", len(entries))
-
- # sync data
- if datas:
+ # Update entry stime on the brick root only in changelog modes
+ if self.name in ["live_changelog", "history_changelog"]:
+ entry_stime_to_update = (int(change_ts) - 1, 0)
+ self.upd_entry_stime(entry_stime_to_update)
+ self.status.set_field("last_synced_entry",
+ entry_stime_to_update[0])
+
+ if ignore_entry_ops:
+ # Bookkeeping: record the range of skipped changelogs for logging
+ self.num_skipped_entry_changelogs += 1
+ if self.skipped_entry_changelogs_first is None:
+ self.skipped_entry_changelogs_first = change_ts
+
+ self.skipped_entry_changelogs_last = change_ts
+
+ # Batch data based on the number of changelogs, as configured via
+ # gconf.max_data_changelogs_in_batch (default is 24 hrs of changelogs).
+ # stime is set after each batch completes, so on failure
+ # Geo-rep still makes progress day by day
+ if (self.num_changelogs > gconf.max_data_changelogs_in_batch):
+ # (Start Changelog TS, End Changelog TS, [Changes])
+ self.data_batches.append([self.data_batch_start, change_ts,
+ [change]])
+ self.data_batch_start = None
+ self.num_changelogs = 0
+ else:
+ self.data_batches[-1][1] = change_ts
+ self.data_batches[-1][2].append(change)
+
+ def datas_to_queue(self, start, end):
+ # Paginate DB entries and add them to the rsync PostBox
+ offset = 0
+ total_datas = 0
+ while True:
+ # Db Pagination
+ datas = db_get_data(start=start, end=end,
+ limit=DB_PAGINATION_SIZE_DATA,
+ offset=offset)
+ if len(datas) == 0:
+ break
+ offset += DB_PAGINATION_SIZE_DATA
+ total_datas += len(datas)
self.a_syncdata(datas)
- self.datas_in_batch.update(datas)
+ return total_datas
- def process(self, changes, done=1):
- tries = 0
+ def handle_data_sync(self, start, end, changes, done, total_datas):
+ """
+ Wait till all rsync jobs are complete and handle retries.
+ Update the data stime once the rsync jobs are complete.
+ """
retry = False
- self.unlinked_gfids = set()
- self.files_in_batch = 0
- self.datas_in_batch = set()
+ tries = 0
+ # Error logging stays disabled until the last retry
self.syncer.disable_errorlog()
while True:
- # first, fire all changelog transfers in parallel. entry and
- # metadata are performed synchronously, therefore in serial.
- # However at the end of each changelog, data is synchronized
- # with syncdata_async() - which means it is serial w.r.t
- # entries/metadata of that changelog but happens in parallel
- # with data of other changelogs.
-
if retry:
- if tries == (self.MAX_RETRIES - 1):
- # Enable Error logging if it is last retry
- self.syncer.enable_errorlog()
-
- # Remove Unlinked GFIDs from Queue
- for unlinked_gfid in self.unlinked_gfids:
- if unlinked_gfid in self.datas_in_batch:
- self.datas_in_batch.remove(unlinked_gfid)
-
- # Retry only Sync. Do not retry entry ops
- if self.datas_in_batch:
- self.a_syncdata(self.datas_in_batch)
- else:
- for change in changes:
- logging.debug('processing change %s' % change)
- self.process_change(change, done, retry)
- if not retry:
- # number of changelogs processed in the batch
- self.turns += 1
+ self.datas_to_queue(start, end)
+
+ if retry and tries == (gconf.max_rsync_retries - 1):
+ # Enable error logging on the last retry
+ self.syncer.enable_errorlog()
# Now we wait for all the data transfers fired off in the above
# step to complete. Note that this is not ideal either. Ideally
@@ -1016,38 +1038,34 @@ class GMasterChangelogMixin(GMasterCommon):
# @change is the last changelog (therefore max time for this batch)
if self.syncdata_wait():
- self.unlinked_gfids = set()
if done:
- xtl = (int(change.split('.')[-1]) - 1, 0)
+ xtl = (int(changes[-1].split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
map(self.changelog_done_func, changes)
self.archive_and_purge_changelogs(changes)
# Reset Data counter after sync
- self.status.dec_value("data", self.files_in_batch)
- self.files_in_batch = 0
- self.datas_in_batch = set()
+ self.status.dec_value("data", total_datas)
break
# We do not know which changelog transfer failed, retry everything.
retry = True
tries += 1
- if tries == self.MAX_RETRIES:
+ if tries >= gconf.max_rsync_retries:
logging.error('changelogs %s could not be processed '
'completely - moving on...' %
' '.join(map(os.path.basename, changes)))
# Reset data counter on failure
- self.status.dec_value("data", self.files_in_batch)
- self.files_in_batch = 0
- self.datas_in_batch = set()
+ self.status.dec_value("data", total_datas)
if done:
- xtl = (int(change.split('.')[-1]) - 1, 0)
+ xtl = (int(changes[-1].split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
map(self.changelog_done_func, changes)
self.archive_and_purge_changelogs(changes)
break
+
# it's either entry_ops() or Rsync that failed to do it's
# job. Mostly it's entry_ops() [which currently has a problem
# of failing to create an entry but failing to return an errno]
@@ -1058,11 +1076,114 @@ class GMasterChangelogMixin(GMasterCommon):
logging.warn('incomplete sync, retrying changelogs: %s' %
' '.join(map(os.path.basename, changes)))
- # Reset the Data counter before Retry
- self.status.dec_value("data", self.files_in_batch)
- self.files_in_batch = 0
time.sleep(0.5)
+ # Log Current batch details
+ if changes:
+ logging.info(
+ "{0} mode completed in {1:.4f} seconds "
+ "({2} - {3} Num: {4}) stime: {5}, entry_stime: {6}".format(
+ self.name,
+ time.time() - self.batch_start_time,
+ changes[0].split("/")[-1],
+ changes[-1].split("/")[-1],
+ len(changes),
+ repr(self.get_data_stime()),
+ repr(self.get_entry_stime())))
+
+ def process(self, changes, done=1):
+ retry = False
+ first_changelog_ts = changes[0].split(".")[-1]
+
+ db_init(os.path.join(self.tempdir, "temp_changelogs.db"))
+
+ self.skipped_entry_changelogs_first = None
+ self.skipped_entry_changelogs_last = None
+ self.num_skipped_entry_changelogs = 0
+ self.batch_start_time = time.time()
+ # (Start Changelog TS, End Changelog TS, [Changes])
+ self.data_batches = [[first_changelog_ts, first_changelog_ts, []]]
+ self.data_batch_start = None
+ self.num_changelogs = 0
+
+ for change in changes:
+ logging.debug('processing change %s' % change)
+ self.process_change(change, done, retry)
+ # number of changelogs processed in the batch
+ self.turns += 1
+
+ # Rsync/tar preserve permissions, so if a GFID exists in the data
+ # queue its meta details are synced as well. Remove from the meta
+ # table every GFID that also exists in the data table
+ db_delete_meta_if_exists_in_data()
+
+ # All data/meta records are populated; commit the changes to the DB
+ db_commit()
+
+ # Log the range of skipped entry ops, if any
+ if self.skipped_entry_changelogs_first is not None and \
+ self.skipped_entry_changelogs_last is not None:
+ logging.info("Skipping already processed entry "
+ "ops from CHANGELOG.{0} to CHANGELOG.{1} "
+ "Num: {2}".format(
+ self.skipped_entry_changelogs_first,
+ self.skipped_entry_changelogs_last,
+ self.num_skipped_entry_changelogs))
+
+ # Entry Changelogs syncing finished
+ logging.info("Syncing Entries completed in {0:.4f} seconds "
+ "CHANGELOG.{1} - CHANGELOG.{2} "
+ "Num: {3}".format(
+ time.time() - self.batch_start_time,
+ changes[0].split(".")[-1],
+ changes[-1].split(".")[-1],
+ len(changes)))
+
+ # Update Status Data and Meta Count
+ self.status.inc_value("data", db_get_data_count())
+ self.status.inc_value("meta", db_get_meta_count())
+
+ for b in self.data_batches:
+ # Add to the data queue so that rsync starts in parallel
+ # while the meta ops are being synced
+ total_datas = self.datas_to_queue(b[0], b[1])
+
+ # Sync Meta
+ offset = 0
+ while True:
+ # Db Pagination
+ meta_gfids = db_get_meta(start=b[0], end=b[1],
+ limit=DB_PAGINATION_SIZE_META,
+ offset=offset)
+ if len(meta_gfids) == 0:
+ break
+ offset += DB_PAGINATION_SIZE_META
+
+ # Collect the required information for GFIDs that
+ # exist on the master
+ meta_entries = []
+ for go in meta_gfids:
+ st = lstat(go)
+ if isinstance(st, int):
+ logging.debug('file %s got purged in the '
+ 'interim' % go)
+ continue
+ meta_entries.append(edct('META', go=go, stat=st))
+
+ if meta_entries:
+ failures = self.slave.server.meta_ops(meta_entries)
+ self.log_failures(failures, 'go', '', 'META')
+ self.status.dec_value("meta", len(meta_entries))
+
+ # Sync data; rsync has already started syncing the files.
+ # Wait for completion and retry if required.
+ self.handle_data_sync(b[0], b[1], b[2], done, total_datas)
+
+ def upd_entry_stime(self, stime):
+ self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY,
+ self.uuid,
+ stime)
+
def upd_stime(self, stime, path=None):
if not path:
path = self.FLAT_DIR_HIERARCHY
@@ -1085,7 +1206,12 @@ class GMasterChangelogMixin(GMasterCommon):
remote_node_ip = node.split(":")[0]
self.status.set_slave_node(remote_node_ip)
- def changelogs_batch_process(self, changes):
+ def changelogs_batch_process(self, changes, single_batch=False):
+ if single_batch and changes:
+ logging.debug('processing changes %s' % repr(changes))
+ self.process(changes)
+ return
+
changelogs_batches = []
current_size = 0
for c in changes:
@@ -1112,16 +1238,15 @@ class GMasterChangelogMixin(GMasterCommon):
changes = []
# get stime (from the brick) and purge changelogs
# that are _historical_ to that time.
- purge_time = self.get_purge_time()
+ data_stime = self.get_data_stime()
self.changelog_agent.scan()
self.crawls += 1
changes = self.changelog_agent.getchanges()
if changes:
- if purge_time:
- logging.info("slave's time: %s" % repr(purge_time))
+ if data_stime:
processed = [x for x in changes
- if int(x.split('.')[-1]) < purge_time[0]]
+ if int(x.split('.')[-1]) < data_stime[0]]
for pr in processed:
logging.info(
'skipping already processed change: %s...' %
@@ -1136,7 +1261,8 @@ class GMasterChangelogMixin(GMasterCommon):
self.changelog_agent = changelog_agent
self.sleep_interval = int(gconf.change_interval)
self.changelog_done_func = self.changelog_agent.done
- self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
+ self.tempdir = self.setup_working_dir()
+ self.processed_changelogs_dir = os.path.join(self.tempdir,
".processed")
self.name = "live_changelog"
self.status = status
@@ -1149,7 +1275,8 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
self.history_crawl_start_time = register_time
self.changelog_done_func = self.changelog_agent.history_done
self.history_turns = 0
- self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
+ self.tempdir = self.setup_working_dir()
+ self.processed_changelogs_dir = os.path.join(self.tempdir,
".history/.processed")
self.name = "history_changelog"
self.status = status
@@ -1157,15 +1284,17 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
def crawl(self):
self.history_turns += 1
self.status.set_worker_crawl_status("History Crawl")
- purge_time = self.get_purge_time()
+ data_stime = self.get_data_stime()
end_time = int(time.time())
- logging.info('starting history crawl... turns: %s, stime: %s, etime: %s'
- % (self.history_turns, repr(purge_time), repr(end_time)))
+ logging.info('starting history crawl... turns: %s, stime: %s, '
+ 'etime: %s, entry_stime: %s'
+ % (self.history_turns, repr(data_stime),
+ repr(end_time), self.get_entry_stime()))
- if not purge_time or purge_time == URXTIME:
+ if not data_stime or data_stime == URXTIME:
logging.info("stime not available, abandoning history crawl")
- raise NoPurgeTimeAvailable()
+ raise NoStimeAvailable()
# Changelogs backend path is hardcoded as
# <BRICK_PATH>/.glusterfs/changelogs, if user configured to different
@@ -1174,7 +1303,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
".glusterfs/changelogs")
ret, actual_end = self.changelog_agent.history(
changelog_path,
- purge_time[0],
+ data_stime[0],
end_time,
int(gconf.sync_jobs))
@@ -1184,27 +1313,42 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# to be processed. returns positive value as number of changelogs
# to be processed, which will be fetched using
# history_getchanges()
- while self.changelog_agent.history_scan() > 0:
+ num_scanned_changelogs = self.changelog_agent.history_scan()
+ num_changelogs = num_scanned_changelogs
+ changes = []
+ while num_scanned_changelogs > 0:
self.crawls += 1
- changes = self.changelog_agent.history_getchanges()
+ changes += self.changelog_agent.history_getchanges()
if changes:
- if purge_time:
- logging.info("slave's time: %s" % repr(purge_time))
+ if data_stime:
processed = [x for x in changes
- if int(x.split('.')[-1]) < purge_time[0]]
+ if int(x.split('.')[-1]) < data_stime[0]]
for pr in processed:
logging.info('skipping already processed change: '
'%s...' % os.path.basename(pr))
self.changelog_done_func(pr)
changes.remove(pr)
- self.changelogs_batch_process(changes)
+ if num_changelogs > gconf.max_history_changelogs_in_batch:
+ self.changelogs_batch_process(changes, single_batch=True)
+ num_changelogs = 0
+ changes = []
+
+ num_scanned_changelogs = self.changelog_agent.history_scan()
+ num_changelogs += num_scanned_changelogs
+
+ # Process the last batch if it was not already handled by the
+ # max_history_changelogs_in_batch condition above
+ if changes:
+ self.changelogs_batch_process(changes, single_batch=True)
history_turn_time = int(time.time()) - self.history_crawl_start_time
- logging.info('finished history crawl syncing, endtime: %s, stime: %s'
- % (actual_end, repr(self.get_purge_time())))
+ logging.info('finished history crawl syncing, endtime: %s, '
+ 'stime: %s, entry_stime: %s'
+ % (actual_end, repr(self.get_data_stime()),
+ self.get_entry_stime()))
# If TS returned from history_changelog is < register_time
# then FS crawl may be required, since history is only available
@@ -1269,14 +1413,14 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
t = Thread(target=Xsyncer)
t.start()
logging.info('starting hybrid crawl..., stime: %s'
- % repr(self.get_purge_time()))
+ % repr(self.get_data_stime()))
self.status.set_worker_crawl_status("Hybrid Crawl")
while True:
try:
item = self.comlist.pop(0)
if item[0] == 'finale':
logging.info('finished hybrid crawl syncing, stime: %s'
- % repr(self.get_purge_time()))
+ % repr(self.get_data_stime()))
break
elif item[0] == 'xsync':
logging.info('processing xsync changelog %s' % (item[1]))
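
A condensed sketch of the entry-skip decision that process_change() now makes (illustrative standalone function, not the patch code): stimes are (seconds, nanoseconds) tuples read from the slave, and change_ts is the changelog file suffix.

def should_ignore_entry_ops(change_ts, entry_stime, data_stime):
    # Only skip when both stimes are known
    if entry_stime is None or data_stime is None:
        return False
    # A passive worker may have pushed data_stime ahead of entry_stime;
    # everything up to data_stime is already synced, so trust the newer one
    if data_stime[0] > entry_stime[0]:
        entry_stime = data_stime
    # Entry ops from changelogs not newer than entry_stime were already applied
    return int(change_ts) <= entry_stime[0]

# e.g. a changelog older than the recorded entry_stime gets its entry ops skipped
print(should_ignore_entry_ops("1454581190", (1454581205, 0), (1454581100, 0)))  # True
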
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index ed0e7efe2b2..91ca1916f6a 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -31,11 +31,10 @@ import shutil
from gconf import gconf
import repce
from repce import RepceServer, RepceClient
-from master import gmaster_builder
import syncdutils
from syncdutils import GsyncdError, select, privileged, boolify, funcode
from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
-from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
+from syncdutils import NoStimeAvailable, PartialHistoryAvailable
from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
from gsyncdstatus import GeorepStatus
@@ -522,6 +521,29 @@ class Server(object):
raise
@classmethod
+ @_pathguard
+ def entry_stime(cls, path, uuid):
+ """
+ The entry_stime xattr reduces the number of entry-change retries when
+ a Geo-rep worker crashes and restarts. entry_stime is updated after
+ every changelog file is processed, so on failure and restart the
+ worker only has to reprocess the last changelog for entry ops.
+ Xattr Key: <PFX>.<MASTERVOL_UUID>.<SLAVEVOL_UUID>.entry_stime
+ """
+ try:
+ val = Xattr.lgetxattr(path,
+ '.'.join([cls.GX_NSPACE, uuid,
+ 'entry_stime']),
+ 8)
+ return struct.unpack('!II', val)
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno in (ENOENT, ENODATA, ENOTDIR):
+ return ex.errno
+ else:
+ raise
+
+ @classmethod
def node_uuid(cls, path='.'):
try:
uuid_l = Xattr.lgetxattr_buf(
@@ -542,6 +564,16 @@ class Server(object):
@classmethod
@_pathguard
+ def set_entry_stime(cls, path, uuid, mark):
+ """set @mark as stime for @uuid on @path"""
+ errno_wrap(Xattr.lsetxattr,
+ [path,
+ '.'.join([cls.GX_NSPACE, uuid, 'entry_stime']),
+ struct.pack('!II', *mark)],
+ [ENOENT])
+
+ @classmethod
+ @_pathguard
def set_xtime(cls, path, uuid, mark):
"""set @mark as xtime for @uuid on @path"""
errno_wrap(Xattr.lsetxattr,
@@ -1376,6 +1408,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def gmaster_instantiate_tuple(self, slave):
"""return a tuple of the 'one shot' and the 'main crawl'
class instance"""
+ from master import gmaster_builder
return (gmaster_builder('xsync')(self, slave),
gmaster_builder()(self, slave),
gmaster_builder('changeloghistory')(self, slave))
@@ -1436,6 +1469,13 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
uuid + '.' + gconf.slave_id)
),
slave.server)
+ slave.server.entry_stime = types.MethodType(
+ lambda _self, path, uuid: (
+ brickserver.entry_stime(
+ path,
+ uuid + '.' + gconf.slave_id)
+ ),
+ slave.server)
slave.server.set_stime = types.MethodType(
lambda _self, path, uuid, mark: (
brickserver.set_stime(path,
@@ -1443,6 +1483,14 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
mark)
),
slave.server)
+ slave.server.set_entry_stime = types.MethodType(
+ lambda _self, path, uuid, mark: (
+ brickserver.set_entry_stime(
+ path,
+ uuid + '.' + gconf.slave_id,
+ mark)
+ ),
+ slave.server)
(g1, g2, g3) = self.gmaster_instantiate_tuple(slave)
g1.master.server = brickserver
g2.master.server = brickserver
@@ -1506,7 +1554,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
except ChangelogHistoryNotAvailable:
logging.info('Changelog history not available, using xsync')
g1.crawlwrap(oneshot=True, register_time=register_time)
- except NoPurgeTimeAvailable:
+ except NoStimeAvailable:
logging.info('No stime available, using xsync crawl')
g1.crawlwrap(oneshot=True, register_time=register_time)
except ChangelogException as e:
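
The entry_stime xattr added above stores the stime tuple the same way the existing stime xattr does; a small sketch of the encoding assumed by entry_stime()/set_entry_stime() (illustrative values):

import struct

mark = (1454581190, 0)            # (sec, nsec) tuple passed to set_entry_stime
raw = struct.pack('!II', *mark)   # 8 bytes, written via Xattr.lsetxattr
assert len(raw) == 8
assert struct.unpack('!II', raw) == mark
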
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 40eff050a9e..f7beb947efc 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -45,7 +45,7 @@ except ImportError:
# auxiliary gfid based access prefix
_CL_AUX_GFID_PFX = ".gfid/"
-GF_OP_RETRIES = 20
+GF_OP_RETRIES = 10
CHANGELOG_AGENT_SERVER_VERSION = 1.0
CHANGELOG_AGENT_CLIENT_VERSION = 1.0
@@ -494,15 +494,18 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]):
def lstat(e):
return errno_wrap(os.lstat, [e], [ENOENT], [ESTALE])
-class NoPurgeTimeAvailable(Exception):
+
+class NoStimeAvailable(Exception):
pass
class PartialHistoryAvailable(Exception):
pass
+
class ChangelogHistoryNotAvailable(Exception):
pass
+
class ChangelogException(OSError):
pass