summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/master.py
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2016-08-08 17:02:37 +0530
committerAravinda VK <avishwan@redhat.com>2016-08-26 10:45:58 -0700
commit6c283f107b646405936520e2549510115bf2ef64 (patch)
tree67459f0c7a502a68413c5cfad5865ca9dcb240e0 /geo-replication/syncdaemon/master.py
parent4a3454753f6e4ddc309c8d1cb11a6e4e432c1da6 (diff)
geo-rep: Post process Data and Meta Changelogs
With this patch, Data and Meta GFIDs are post processed. If Changelog has UNLINK entry then remove from Data and Meta GFIDs list(If stat on GFID is ENOENT in Master). While processing Changelogs, - Collect all the data and meta operations in a temporary database - Delete all Data and Meta GFIDs which are already unlinked as per Changelogs (unlink only if stat on GFID is ENOENT) - Process all Entry operations as usual - Process data and meta operations in batch(Fetch from Db in batch) - Data sync is again batched based on number of changelogs(Default 1day changelogs). Once the sync is complete, Update last Changelog's time as last_synced time as usual. Additionally maintain entry_stime on Brick root, ignore Entry ops if changelog suffix time is less than entry_stime. If data stime is more than entry_stime, this can happen only when passive worker updates stime by itself by getting mount point stime. Use entry_stime = data_stime in this case. New configurations: max-rsync-retries - Default Value is 10 max-data-changelogs-in-batch - Max number of changelogs to be considered in a batch for syncing. Default value is 5760(4 changelogs per min * 60 min * 24 hours) max-history-changelogs-in-batch - Max number of history changelogs to be processed at once. Default value 86400(4 changelogs per min * 60 min * 24 hours * 15 days) BUG: 1364420 Change-Id: I7b665895bf4806035c2a8573d361257cbadbea17 Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/15110 Smoke: Gluster Build System <jenkins@build.gluster.org> NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org> Reviewed-by: Kotresh HR <khiremat@redhat.com> CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--geo-replication/syncdaemon/master.py504
1 files changed, 324 insertions, 180 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 80c4d9d8b95..3df08e41a13 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -25,7 +25,13 @@ from gconf import gconf
from syncdutils import Thread, GsyncdError, boolify, escape
from syncdutils import unescape, gauxpfx, md5hex, selfkill
from syncdutils import lstat, errno_wrap, FreeObject
-from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
+from syncdutils import NoStimeAvailable, PartialHistoryAvailable
+from changelogsdb import db_init, db_record_data, db_record_meta
+from changelogsdb import db_remove_data, db_remove_meta
+from changelogsdb import db_get_data, db_get_meta, db_commit
+from changelogsdb import db_get_data_count, db_get_meta_count
+from changelogsdb import db_delete_meta_if_exists_in_data
+
URXTIME = (-1, 0)
@@ -45,6 +51,10 @@ CHANGELOG_ROLLOVER_TIME = 15
# that batch since stime will get updated after each batch.
MAX_CHANGELOG_BATCH_SIZE = 727040
+# Number of record to query once
+DB_PAGINATION_SIZE_META = 100
+DB_PAGINATION_SIZE_DATA = 1000
+
# Utility functions to help us to get to closer proximity
# of the DRY principle (no, don't look for elevated or
# perspectivistic things here)
@@ -69,6 +79,24 @@ def _volinfo_hook_relax_foreign(self):
return volinfo_sys
+def edct(op, **ed):
+ dct = {}
+ dct['op'] = op
+ for k in ed:
+ if k == 'stat':
+ st = ed[k]
+ dst = dct['stat'] = {}
+ if st:
+ dst['uid'] = st.st_uid
+ dst['gid'] = st.st_gid
+ dst['mode'] = st.st_mode
+ dst['atime'] = st.st_atime
+ dst['mtime'] = st.st_mtime
+ else:
+ dct[k] = ed[k]
+ return dct
+
+
# The API!
def gmaster_builder(excrawl=None):
@@ -259,7 +287,7 @@ class TarSSHEngine(object):
st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
- self.unlinked_gfids.add(se)
+ db_remove_data(se)
return True
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
@@ -294,7 +322,7 @@ class RsyncEngine(object):
st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
- self.unlinked_gfids.add(se)
+ db_remove_data(se)
return True
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
@@ -340,6 +368,18 @@ class GMasterCommon(object):
if self.volinfo:
return self.volinfo['volume_mark']
+ def get_entry_stime(self):
+ data = self.slave.server.entry_stime(".", self.uuid)
+ if isinstance(data, int):
+ data = None
+ return data
+
+ def get_data_stime(self):
+ data = self.slave.server.stime(".", self.uuid)
+ if isinstance(data, int):
+ data = None
+ return data
+
def xtime(self, path, *a, **opts):
"""get amended xtime
@@ -387,7 +427,6 @@ class GMasterCommon(object):
self.volinfo = None
self.terminate = False
self.sleep_interval = 1
- self.unlinked_gfids = set()
def init_keep_alive(cls):
"""start the keep-alive thread """
@@ -553,7 +592,7 @@ class GMasterCommon(object):
self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
# Purge all changelogs available in processing dir
# less than cluster_stime
- proc_dir = os.path.join(self.setup_working_dir(),
+ proc_dir = os.path.join(self.tempdir,
".processing")
if os.path.exists(proc_dir):
@@ -627,6 +666,11 @@ class GMasterCommon(object):
ret = j[-1]()
if not ret:
succeed = False
+
+ # All the unlinked GFIDs removed from Data and Meta list
+ # Commit the Transaction
+ db_commit()
+
if succeed and not args[0] is None:
self.sendmark(path, *args)
return succeed
@@ -670,9 +714,6 @@ class GMasterChangelogMixin(GMasterCommon):
# flat directory hierarchy for gfid based access
FLAT_DIR_HIERARCHY = '.'
- # maximum retries per changelog before giving up
- MAX_RETRIES = 10
-
CHANGELOG_LOG_LEVEL = 9
CHANGELOG_CONN_RETRIES = 5
@@ -727,21 +768,48 @@ class GMasterChangelogMixin(GMasterCommon):
logging.debug('changelog working dir %s' % workdir)
return workdir
- def get_purge_time(self):
- purge_time = self.xtime('.', self.slave)
- if isinstance(purge_time, int):
- purge_time = None
- return purge_time
+ def log_failures(self, failures, entry_key, gfid_prefix, log_prefix):
+ num_failures = 0
+ for failure in failures:
+ st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
+ if not isinstance(st, int):
+ num_failures += 1
+ logging.error('%s FAILED: %s' % (log_prefix,
+ repr(failure)))
+
+ self.status.inc_value("failures", num_failures)
def process_change(self, change, done, retry):
pfx = gauxpfx()
clist = []
entries = []
- meta_gfid = set()
- datas = set()
+ change_ts = change.split(".")[-1]
+
+ # self.data_batch_start is None only in beginning and during
+ # new batch start
+ if self.data_batch_start is None:
+ self.data_batch_start = change_ts
+
+ # Ignore entry ops which are already processed in Changelog modes
+ ignore_entry_ops = False
+ entry_stime = None
+ data_stime = None
+ if self.name in ["live_changelog", "history_changelog"]:
+ entry_stime = self.get_entry_stime()
+ data_stime = self.get_data_stime()
+
+ if entry_stime is not None and data_stime is not None:
+ # if entry_stime is not None but data_stime > entry_stime
+ # This situation is caused by the stime update of Passive worker
+ # Consider data_stime in this case.
+ if data_stime[0] > entry_stime[0]:
+ entry_stime = data_stime
+
+ # Compare the entry_stime with changelog file suffix
+ # if changelog time is less than entry_stime then ignore
+ if int(change_ts) <= entry_stime[0]:
+ ignore_entry_ops = True
- # basic crawl stats: files and bytes
- files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}
try:
f = open(change, "r")
clist = f.readlines()
@@ -749,42 +817,6 @@ class GMasterChangelogMixin(GMasterCommon):
except IOError:
raise
- def edct(op, **ed):
- dct = {}
- dct['op'] = op
- for k in ed:
- if k == 'stat':
- st = ed[k]
- dst = dct['stat'] = {}
- if st:
- dst['uid'] = st.st_uid
- dst['gid'] = st.st_gid
- dst['mode'] = st.st_mode
- dst['atime'] = st.st_atime
- dst['mtime'] = st.st_mtime
- else:
- dct[k] = ed[k]
- return dct
-
- # entry counts (not purges)
- def entry_update():
- files_pending['count'] += 1
-
- # purge count
- def purge_update():
- files_pending['purge'] += 1
-
- def log_failures(failures, entry_key, gfid_prefix, log_prefix):
- num_failures = 0
- for failure in failures:
- st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
- if not isinstance(st, int):
- num_failures += 1
- logging.error('%s FAILED: %s' % (log_prefix,
- repr(failure)))
-
- self.status.inc_value("failures", num_failures)
-
for e in clist:
e = e.strip()
et = e[self.IDX_START:self.IDX_END] # entry type
@@ -792,12 +824,20 @@ class GMasterChangelogMixin(GMasterCommon):
# skip ENTRY operation if hot tier brick
if self.name == 'live_changelog' or \
- self.name == 'history_changelog':
+ self.name == 'history_changelog':
if boolify(gconf.is_hottier) and et == self.TYPE_ENTRY:
logging.debug('skip ENTRY op: %s if hot tier brick'
% (ec[self.POS_TYPE]))
continue
+ # Data and Meta operations are decided while parsing
+ # UNLINK/RMDIR/MKNOD except that case ignore all the other
+ # entry ops if ignore_entry_ops is True.
+ # UNLINK/RMDIR/MKNOD entry_ops are ignored in the end
+ if ignore_entry_ops and et == self.TYPE_ENTRY and \
+ ec[self.POS_TYPE] not in ["UNLINK", "RMDIR", "MKNOD"]:
+ continue
+
if et == self.TYPE_ENTRY:
# extract information according to the type of
# the entry operation. create(), mkdir() and mknod()
@@ -819,15 +859,16 @@ class GMasterChangelogMixin(GMasterCommon):
# Remove from DATA list, so that rsync will
# not fail
pt = os.path.join(pfx, ec[0])
- if pt in datas:
- datas.remove(pt)
+ st = lstat(pt)
+ if isinstance(st, int):
+ # file got unlinked, May be historical Changelog
+ db_remove_data(pt)
+ db_remove_meta(pt)
if not boolify(gconf.ignore_deletes):
- purge_update()
- entries.append(edct(ty, gfid=gfid, entry=en))
+ if not ignore_entry_ops:
+ entries.append(edct(ty, gfid=gfid, entry=en))
elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
- entry_update()
-
# Special case: record mknod as link
if ty in ['MKNOD']:
mode = int(ec[2])
@@ -845,10 +886,10 @@ class GMasterChangelogMixin(GMasterCommon):
# CREATED if source not exists.
entries.append(edct('LINK', stat=st, entry=en,
gfid=gfid))
-
# Here, we have the assumption that only
# tier-gfid.linkto causes this mknod. Add data
- datas.add(os.path.join(pfx, ec[0]))
+ db_record_data(os.path.join(pfx, ec[0]),
+ change_ts)
continue
# stat info. present in the changelog itself
@@ -867,7 +908,6 @@ class GMasterChangelogMixin(GMasterCommon):
if isinstance(rl, int):
rl = None
- entry_update()
e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en,
stat=st, link=rl))
@@ -880,120 +920,102 @@ class GMasterChangelogMixin(GMasterCommon):
continue
if ty == 'LINK':
- entry_update()
entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
elif ty == 'SYMLINK':
rl = errno_wrap(os.readlink, [en], [ENOENT], [ESTALE])
if isinstance(rl, int):
continue
- entry_update()
+
entries.append(
edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
else:
logging.warn('ignoring %s [op %s]' % (gfid, ty))
elif et == self.TYPE_GFID:
- # If self.unlinked_gfids is available, then that means it is
- # retrying the changelog second time. Do not add the GFID's
- # to rsync job if failed previously but unlinked in master
- if self.unlinked_gfids and \
- os.path.join(pfx, ec[0]) in self.unlinked_gfids:
- logging.debug("ignoring data, since file purged interim")
- else:
- datas.add(os.path.join(pfx, ec[0]))
+ db_record_data(os.path.join(pfx, ec[0]), change_ts)
elif et == self.TYPE_META:
if ec[1] == 'SETATTR': # only setattr's for now...
- if len(ec) == 5:
- # In xsync crawl, we already have stat data
- # avoid doing stat again
- meta_gfid.add((os.path.join(pfx, ec[0]),
- XCrawlMetadata(st_uid=ec[2],
- st_gid=ec[3],
- st_mode=ec[4],
- st_atime=ec[5],
- st_mtime=ec[6])))
- else:
- meta_gfid.add((os.path.join(pfx, ec[0]), ))
+ db_record_meta(os.path.join(pfx, ec[0]), change_ts)
elif ec[1] == 'SETXATTR' or ec[1] == 'XATTROP' or \
ec[1] == 'FXATTROP':
# To sync xattr/acls use rsync/tar, --xattrs and --acls
# switch to rsync and tar
if not boolify(gconf.use_tarssh) and \
(boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)):
- datas.add(os.path.join(pfx, ec[0]))
+ db_record_data(os.path.join(pfx, ec[0]), change_ts)
else:
logging.warn('got invalid changelog type: %s' % (et))
logging.debug('entries: %s' % repr(entries))
- # Increment counters for Status
- self.status.inc_value("entry", len(entries))
- self.files_in_batch += len(datas)
- self.status.inc_value("data", len(datas))
-
# sync namespace
- if entries:
+ if entries and not ignore_entry_ops:
+ # Increment counters for Status
+ self.status.inc_value("entry", len(entries))
failures = self.slave.server.entry_ops(entries)
- log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
+ self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
self.status.dec_value("entry", len(entries))
- # sync metadata
- if meta_gfid:
- meta_entries = []
- for go in meta_gfid:
- if len(go) > 1:
- st = go[1]
- else:
- st = lstat(go[0])
- if isinstance(st, int):
- logging.debug('file %s got purged in the interim' % go[0])
- continue
- meta_entries.append(edct('META', go=go[0], stat=st))
- if meta_entries:
- self.status.inc_value("meta", len(entries))
- failures = self.slave.server.meta_ops(meta_entries)
- log_failures(failures, 'go', '', 'META')
- self.status.dec_value("meta", len(entries))
-
- # sync data
- if datas:
+ # Update Entry stime in Brick Root only in case of Changelog mode
+ if self.name in ["live_changelog", "history_changelog"]:
+ entry_stime_to_update = (int(change_ts) - 1, 0)
+ self.upd_entry_stime(entry_stime_to_update)
+ self.status.set_field("last_synced_entry",
+ entry_stime_to_update[0])
+
+ if ignore_entry_ops:
+ # Book keeping, to show in logs the range of Changelogs skipped
+ self.num_skipped_entry_changelogs += 1
+ if self.skipped_entry_changelogs_first is None:
+ self.skipped_entry_changelogs_first = change_ts
+
+ self.skipped_entry_changelogs_last = change_ts
+
+ # Batch data based on number of changelogs as configured as
+ # gconf.max_data_changelogs_in_batch(Default is 24 hrs)
+ # stime will be set after completion of these batch, so on failure
+ # Geo-rep will progress day by day
+ if (self.num_changelogs > gconf.max_data_changelogs_in_batch):
+ # (Start Changelog TS, End Changelog TS, [Changes])
+ self.data_batches.append([self.data_batch_start, change_ts,
+ [change]])
+ self.data_batch_start = None
+ self.num_changelogs = 0
+ else:
+ self.data_batches[-1][1] = change_ts
+ self.data_batches[-1][2].append(change)
+
+ def datas_to_queue(self, start, end):
+ # Paginate db entries and add it to Rsync PostBox
+ offset = 0
+ total_datas = 0
+ while True:
+ # Db Pagination
+ datas = db_get_data(start=start, end=end,
+ limit=DB_PAGINATION_SIZE_DATA,
+ offset=offset)
+ if len(datas) == 0:
+ break
+ offset += DB_PAGINATION_SIZE_DATA
+ total_datas += len(datas)
self.a_syncdata(datas)
- self.datas_in_batch.update(datas)
+ return total_datas
- def process(self, changes, done=1):
- tries = 0
+ def handle_data_sync(self, start, end, changes, done, total_datas):
+ """
+ Wait till all rsync jobs are complete, also handle the retries
+ Update data stime Once Rsync jobs are complete.
+ """
retry = False
- self.unlinked_gfids = set()
- self.files_in_batch = 0
- self.datas_in_batch = set()
+ tries = 0
+ # Error log disabled till the last round
self.syncer.disable_errorlog()
while True:
- # first, fire all changelog transfers in parallel. entry and
- # metadata are performed synchronously, therefore in serial.
- # However at the end of each changelog, data is synchronized
- # with syncdata_async() - which means it is serial w.r.t
- # entries/metadata of that changelog but happens in parallel
- # with data of other changelogs.
-
if retry:
- if tries == (self.MAX_RETRIES - 1):
- # Enable Error logging if it is last retry
- self.syncer.enable_errorlog()
-
- # Remove Unlinked GFIDs from Queue
- for unlinked_gfid in self.unlinked_gfids:
- if unlinked_gfid in self.datas_in_batch:
- self.datas_in_batch.remove(unlinked_gfid)
-
- # Retry only Sync. Do not retry entry ops
- if self.datas_in_batch:
- self.a_syncdata(self.datas_in_batch)
- else:
- for change in changes:
- logging.debug('processing change %s' % change)
- self.process_change(change, done, retry)
- if not retry:
- # number of changelogs processed in the batch
- self.turns += 1
+ self.datas_to_queue(start, end)
+
+ if retry and tries == (gconf.max_rsync_retries - 1):
+ # Enable Error logging if it is last retry
+ self.syncer.enable_errorlog()
# Now we wait for all the data transfers fired off in the above
# step to complete. Note that this is not ideal either. Ideally
@@ -1016,38 +1038,34 @@ class GMasterChangelogMixin(GMasterCommon):
# @change is the last changelog (therefore max time for this batch)
if self.syncdata_wait():
- self.unlinked_gfids = set()
if done:
- xtl = (int(change.split('.')[-1]) - 1, 0)
+ xtl = (int(changes[-1].split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
map(self.changelog_done_func, changes)
self.archive_and_purge_changelogs(changes)
# Reset Data counter after sync
- self.status.dec_value("data", self.files_in_batch)
- self.files_in_batch = 0
- self.datas_in_batch = set()
+ self.status.dec_value("data", total_datas)
break
# We do not know which changelog transfer failed, retry everything.
retry = True
tries += 1
- if tries == self.MAX_RETRIES:
+ if tries >= gconf.max_rsync_retries:
logging.error('changelogs %s could not be processed '
'completely - moving on...' %
' '.join(map(os.path.basename, changes)))
# Reset data counter on failure
- self.status.dec_value("data", self.files_in_batch)
- self.files_in_batch = 0
- self.datas_in_batch = set()
+ self.status.dec_value("data", total_datas)
if done:
- xtl = (int(change.split('.')[-1]) - 1, 0)
+ xtl = (int(changes[-1].split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
map(self.changelog_done_func, changes)
self.archive_and_purge_changelogs(changes)
break
+
# it's either entry_ops() or Rsync that failed to do it's
# job. Mostly it's entry_ops() [which currently has a problem
# of failing to create an entry but failing to return an errno]
@@ -1058,11 +1076,114 @@ class GMasterChangelogMixin(GMasterCommon):
logging.warn('incomplete sync, retrying changelogs: %s' %
' '.join(map(os.path.basename, changes)))
- # Reset the Data counter before Retry
- self.status.dec_value("data", self.files_in_batch)
- self.files_in_batch = 0
time.sleep(0.5)
+ # Log Current batch details
+ if changes:
+ logging.info(
+ "{0} mode completed in {1:.4f} seconds "
+ "({2} - {3} Num: {4}) stime: {5}, entry_stime: {6}".format(
+ self.name,
+ time.time() - self.batch_start_time,
+ changes[0].split("/")[-1],
+ changes[-1].split("/")[-1],
+ len(changes),
+ repr(self.get_data_stime()),
+ repr(self.get_entry_stime())))
+
+ def process(self, changes, done=1):
+ retry = False
+ first_changelog_ts = changes[0].split(".")[-1]
+
+ db_init(os.path.join(self.tempdir, "temp_changelogs.db"))
+
+ self.skipped_entry_changelogs_first = None
+ self.skipped_entry_changelogs_last = None
+ self.num_skipped_entry_changelogs = 0
+ self.batch_start_time = time.time()
+ # (Start Changelog TS, End Changelog TS, [Changes])
+ self.data_batches = [[first_changelog_ts, first_changelog_ts, []]]
+ self.data_batch_start = None
+ self.num_changelogs = 0
+
+ for change in changes:
+ logging.debug('processing change %s' % change)
+ self.process_change(change, done, retry)
+ # number of changelogs processed in the batch
+ self.turns += 1
+
+ # Rsync/Tar will preserve permissions, so if a GFID exists
+ # in data queue then it syncs meta details too. Remove
+ # all meta from meta table if exists in data table
+ db_delete_meta_if_exists_in_data()
+
+ # All the Data/Meta populated, Commit the Changes in Db
+ db_commit()
+
+ # Log the Skipped Entry ops range if any
+ if self.skipped_entry_changelogs_first is not None and \
+ self.skipped_entry_changelogs_last is not None:
+ logging.info("Skipping already processed entry "
+ "ops from CHANGELOG.{0} to CHANGELOG.{1} "
+ "Num: {2}".format(
+ self.skipped_entry_changelogs_first,
+ self.skipped_entry_changelogs_last,
+ self.num_skipped_entry_changelogs))
+
+ # Entry Changelogs syncing finished
+ logging.info("Syncing Entries completed in {0:.4f} seconds "
+ "CHANGELOG.{1} - CHANGELOG.{2} "
+ "Num: {3}".format(
+ time.time() - self.batch_start_time,
+ changes[0].split(".")[-1],
+ changes[-1].split(".")[-1],
+ len(changes)))
+
+ # Update Status Data and Meta Count
+ self.status.inc_value("data", db_get_data_count())
+ self.status.inc_value("meta", db_get_meta_count())
+
+ for b in self.data_batches:
+ # Add to data Queue, so that Rsync will start parallelly
+ # while syncing Meta ops
+ total_datas = self.datas_to_queue(b[0], b[1])
+
+ # Sync Meta
+ offset = 0
+ while True:
+ # Db Pagination
+ meta_gfids = db_get_meta(start=b[0], end=b[1],
+ limit=DB_PAGINATION_SIZE_META,
+ offset=offset)
+ if len(meta_gfids) == 0:
+ break
+ offset += DB_PAGINATION_SIZE_META
+
+ # Collect required information for GFIDs which
+ # exists in Master
+ meta_entries = []
+ for go in meta_gfids:
+ st = lstat(go)
+ if isinstance(st, int):
+ logging.debug('file %s got purged in the '
+ 'interim' % go)
+ continue
+ meta_entries.append(edct('META', go=go, stat=st))
+
+ if meta_entries:
+ failures = self.slave.server.meta_ops(meta_entries)
+ self.log_failures(failures, 'go', '', 'META')
+ self.status.dec_value("meta", len(meta_entries))
+
+ # Sync Data, Rsync already started syncing the files
+ # wait for the completion and retry if required.
+ self.handle_data_sync(b[0], b[1], b[2], done, total_datas)
+
+ def upd_entry_stime(self, stime):
+ self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY,
+ self.uuid,
+ stime)
+
def upd_stime(self, stime, path=None):
if not path:
path = self.FLAT_DIR_HIERARCHY
@@ -1085,7 +1206,12 @@ class GMasterChangelogMixin(GMasterCommon):
remote_node_ip = node.split(":")[0]
self.status.set_slave_node(remote_node_ip)
- def changelogs_batch_process(self, changes):
+ def changelogs_batch_process(self, changes, single_batch=False):
+ if single_batch and changes:
+ logging.debug('processing changes %s' % repr(changes))
+ self.process(changes)
+ return
+
changelogs_batches = []
current_size = 0
for c in changes:
@@ -1112,16 +1238,15 @@ class GMasterChangelogMixin(GMasterCommon):
changes = []
# get stime (from the brick) and purge changelogs
# that are _historical_ to that time.
- purge_time = self.get_purge_time()
+ data_stime = self.get_data_stime()
self.changelog_agent.scan()
self.crawls += 1
changes = self.changelog_agent.getchanges()
if changes:
- if purge_time:
- logging.info("slave's time: %s" % repr(purge_time))
+ if data_stime:
processed = [x for x in changes
- if int(x.split('.')[-1]) < purge_time[0]]
+ if int(x.split('.')[-1]) < data_stime[0]]
for pr in processed:
logging.info(
'skipping already processed change: %s...' %
@@ -1136,7 +1261,8 @@ class GMasterChangelogMixin(GMasterCommon):
self.changelog_agent = changelog_agent
self.sleep_interval = int(gconf.change_interval)
self.changelog_done_func = self.changelog_agent.done
- self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
+ self.tempdir = self.setup_working_dir()
+ self.processed_changelogs_dir = os.path.join(self.tempdir,
".processed")
self.name = "live_changelog"
self.status = status
@@ -1149,7 +1275,8 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
self.history_crawl_start_time = register_time
self.changelog_done_func = self.changelog_agent.history_done
self.history_turns = 0
- self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
+ self.tempdir = self.setup_working_dir()
+ self.processed_changelogs_dir = os.path.join(self.tempdir,
".history/.processed")
self.name = "history_changelog"
self.status = status
@@ -1157,15 +1284,17 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
def crawl(self):
self.history_turns += 1
self.status.set_worker_crawl_status("History Crawl")
- purge_time = self.get_purge_time()
+ data_stime = self.get_data_stime()
end_time = int(time.time())
- logging.info('starting history crawl... turns: %s, stime: %s, etime: %s'
- % (self.history_turns, repr(purge_time), repr(end_time)))
+ logging.info('starting history crawl... turns: %s, stime: %s, '
+ 'etime: %s, entry_stime: %s'
+ % (self.history_turns, repr(data_stime),
+ repr(end_time), self.get_entry_stime()))
- if not purge_time or purge_time == URXTIME:
+ if not data_stime or data_stime == URXTIME:
logging.info("stime not available, abandoning history crawl")
- raise NoPurgeTimeAvailable()
+ raise NoStimeAvailable()
# Changelogs backend path is hardcoded as
# <BRICK_PATH>/.glusterfs/changelogs, if user configured to different
@@ -1174,7 +1303,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
".glusterfs/changelogs")
ret, actual_end = self.changelog_agent.history(
changelog_path,
- purge_time[0],
+ data_stime[0],
end_time,
int(gconf.sync_jobs))
@@ -1184,27 +1313,42 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# to be processed. returns positive value as number of changelogs
# to be processed, which will be fetched using
# history_getchanges()
- while self.changelog_agent.history_scan() > 0:
+ num_scanned_changelogs = self.changelog_agent.history_scan()
+ num_changelogs = num_scanned_changelogs
+ changes = []
+ while num_scanned_changelogs > 0:
self.crawls += 1
- changes = self.changelog_agent.history_getchanges()
+ changes += self.changelog_agent.history_getchanges()
if changes:
- if purge_time:
- logging.info("slave's time: %s" % repr(purge_time))
+ if data_stime:
processed = [x for x in changes
- if int(x.split('.')[-1]) < purge_time[0]]
+ if int(x.split('.')[-1]) < data_stime[0]]
for pr in processed:
logging.info('skipping already processed change: '
'%s...' % os.path.basename(pr))
self.changelog_done_func(pr)
changes.remove(pr)
- self.changelogs_batch_process(changes)
+ if num_changelogs > gconf.max_history_changelogs_in_batch:
+ self.changelogs_batch_process(changes, single_batch=True)
+ num_changelogs = 0
+ changes = []
+
+ num_scanned_changelogs = self.changelog_agent.history_scan()
+ num_changelogs += num_scanned_changelogs
+
+ # If Last batch is not processed with MAX_NUM_CHANGELOGS_IN_BATCH
+ # condition above
+ if changes:
+ self.changelogs_batch_process(changes, single_batch=True)
history_turn_time = int(time.time()) - self.history_crawl_start_time
- logging.info('finished history crawl syncing, endtime: %s, stime: %s'
- % (actual_end, repr(self.get_purge_time())))
+ logging.info('finished history crawl syncing, endtime: %s, '
+ 'stime: %s, entry_stime: %s'
+ % (actual_end, repr(self.get_data_stime()),
+ self.get_entry_stime()))
# If TS returned from history_changelog is < register_time
# then FS crawl may be required, since history is only available
@@ -1269,14 +1413,14 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
t = Thread(target=Xsyncer)
t.start()
logging.info('starting hybrid crawl..., stime: %s'
- % repr(self.get_purge_time()))
+ % repr(self.get_data_stime()))
self.status.set_worker_crawl_status("Hybrid Crawl")
while True:
try:
item = self.comlist.pop(0)
if item[0] == 'finale':
logging.info('finished hybrid crawl syncing, stime: %s'
- % repr(self.get_purge_time()))
+ % repr(self.get_data_stime()))
break
elif item[0] == 'xsync':
logging.info('processing xsync changelog %s' % (item[1]))