Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--  geo-replication/syncdaemon/master.py  504
1 file changed, 324 insertions(+), 180 deletions(-)
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 80c4d9d8b95..3df08e41a13 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -25,7 +25,13 @@ from gconf import gconf
from syncdutils import Thread, GsyncdError, boolify, escape
from syncdutils import unescape, gauxpfx, md5hex, selfkill
from syncdutils import lstat, errno_wrap, FreeObject
-from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
+from syncdutils import NoStimeAvailable, PartialHistoryAvailable
+from changelogsdb import db_init, db_record_data, db_record_meta
+from changelogsdb import db_remove_data, db_remove_meta
+from changelogsdb import db_get_data, db_get_meta, db_commit
+from changelogsdb import db_get_data_count, db_get_meta_count
+from changelogsdb import db_delete_meta_if_exists_in_data
+
URXTIME = (-1, 0)
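
The changelogsdb helpers imported above are added by this patch but do not appear in this diff. Purely as an illustration of the interface the call sites below rely on, here is a minimal sqlite3-backed sketch; the table layout and SQL are assumptions, only the function names and call signatures come from this patch.

# Illustrative stand-in for changelogsdb (not the actual module).
import sqlite3

conn = None

def db_init(path):
    # One table for data GFIDs, one for meta GFIDs, each remembering the
    # changelog timestamp that recorded the path.
    global conn
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS data (path TEXT PRIMARY KEY, ts INTEGER)")
    conn.execute("CREATE TABLE IF NOT EXISTS meta (path TEXT PRIMARY KEY, ts INTEGER)")

def db_record_data(path, ts):
    conn.execute("INSERT OR REPLACE INTO data VALUES (?, ?)", (path, int(ts)))

def db_record_meta(path, ts):
    conn.execute("INSERT OR REPLACE INTO meta VALUES (?, ?)", (path, int(ts)))

def db_remove_data(path):
    conn.execute("DELETE FROM data WHERE path = ?", (path,))

def db_remove_meta(path):
    conn.execute("DELETE FROM meta WHERE path = ?", (path,))

def db_get_data(start, end, limit, offset):
    # Paginated fetch of data GFIDs recorded between two changelog timestamps.
    cur = conn.execute("SELECT path FROM data WHERE ts BETWEEN ? AND ? "
                       "LIMIT ? OFFSET ?", (int(start), int(end), limit, offset))
    return [row[0] for row in cur.fetchall()]

def db_get_meta(start, end, limit, offset):
    cur = conn.execute("SELECT path FROM meta WHERE ts BETWEEN ? AND ? "
                       "LIMIT ? OFFSET ?", (int(start), int(end), limit, offset))
    return [row[0] for row in cur.fetchall()]

def db_get_data_count():
    return conn.execute("SELECT COUNT(*) FROM data").fetchone()[0]

def db_get_meta_count():
    return conn.execute("SELECT COUNT(*) FROM meta").fetchone()[0]

def db_delete_meta_if_exists_in_data():
    # Rsync/tar preserve permissions, so a GFID queued for data sync
    # does not need a separate meta sync.
    conn.execute("DELETE FROM meta WHERE path IN (SELECT path FROM data)")

def db_commit():
    conn.commit()
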
@@ -45,6 +51,10 @@ CHANGELOG_ROLLOVER_TIME = 15
# that batch since stime will get updated after each batch.
MAX_CHANGELOG_BATCH_SIZE = 727040
+# Number of records to query at once
+DB_PAGINATION_SIZE_META = 100
+DB_PAGINATION_SIZE_DATA = 1000
+
# Utility functions to help us to get to closer proximity
# of the DRY principle (no, don't look for elevated or
# perspectivistic things here)
@@ -69,6 +79,24 @@ def _volinfo_hook_relax_foreign(self):
return volinfo_sys
+def edct(op, **ed):
+ dct = {}
+ dct['op'] = op
+ for k in ed:
+ if k == 'stat':
+ st = ed[k]
+ dst = dct['stat'] = {}
+ if st:
+ dst['uid'] = st.st_uid
+ dst['gid'] = st.st_gid
+ dst['mode'] = st.st_mode
+ dst['atime'] = st.st_atime
+ dst['mtime'] = st.st_mtime
+ else:
+ dct[k] = ed[k]
+ return dct
+
+
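
A small, hypothetical usage of the now module-level edct() helper (GFID and paths below are made up): it packs an entry operation plus an optional stat result into the plain dict that the slave's entry_ops() receives.

# Hypothetical example; assumes the edct() definition above is in scope.
import os

st = os.lstat("/tmp")  # stand-in for the lstat(en) done by the crawler
op = edct('MKDIR', gfid='00000000-0000-0000-0000-000000000001',
          entry='.gfid/00000000-0000-0000-0000-000000000000/dir1', stat=st)
# op['op'] == 'MKDIR'; op['stat'] holds uid, gid, mode, atime and mtime
# copied from st; the remaining keyword arguments are passed through as-is.
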
# The API!
def gmaster_builder(excrawl=None):
@@ -259,7 +287,7 @@ class TarSSHEngine(object):
st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
- self.unlinked_gfids.add(se)
+ db_remove_data(se)
return True
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
@@ -294,7 +322,7 @@ class RsyncEngine(object):
st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
- self.unlinked_gfids.add(se)
+ db_remove_data(se)
return True
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
@@ -340,6 +368,18 @@ class GMasterCommon(object):
if self.volinfo:
return self.volinfo['volume_mark']
+ def get_entry_stime(self):
+ data = self.slave.server.entry_stime(".", self.uuid)
+ if isinstance(data, int):
+ data = None
+ return data
+
+ def get_data_stime(self):
+ data = self.slave.server.stime(".", self.uuid)
+ if isinstance(data, int):
+ data = None
+ return data
+
def xtime(self, path, *a, **opts):
"""get amended xtime
@@ -387,7 +427,6 @@ class GMasterCommon(object):
self.volinfo = None
self.terminate = False
self.sleep_interval = 1
- self.unlinked_gfids = set()
def init_keep_alive(cls):
"""start the keep-alive thread """
@@ -553,7 +592,7 @@ class GMasterCommon(object):
self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
# Purge all changelogs available in processing dir
# less than cluster_stime
- proc_dir = os.path.join(self.setup_working_dir(),
+ proc_dir = os.path.join(self.tempdir,
".processing")
if os.path.exists(proc_dir):
@@ -627,6 +666,11 @@ class GMasterCommon(object):
ret = j[-1]()
if not ret:
succeed = False
+
+ # All the unlinked GFIDs have been removed from the Data and Meta
+ # lists; commit the transaction
+ db_commit()
+
if succeed and not args[0] is None:
self.sendmark(path, *args)
return succeed
@@ -670,9 +714,6 @@ class GMasterChangelogMixin(GMasterCommon):
# flat directory hierarchy for gfid based access
FLAT_DIR_HIERARCHY = '.'
- # maximum retries per changelog before giving up
- MAX_RETRIES = 10
-
CHANGELOG_LOG_LEVEL = 9
CHANGELOG_CONN_RETRIES = 5
@@ -727,21 +768,48 @@ class GMasterChangelogMixin(GMasterCommon):
logging.debug('changelog working dir %s' % workdir)
return workdir
- def get_purge_time(self):
- purge_time = self.xtime('.', self.slave)
- if isinstance(purge_time, int):
- purge_time = None
- return purge_time
+ def log_failures(self, failures, entry_key, gfid_prefix, log_prefix):
+ num_failures = 0
+ for failure in failures:
+ st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
+ if not isinstance(st, int):
+ num_failures += 1
+ logging.error('%s FAILED: %s' % (log_prefix,
+ repr(failure)))
+
+ self.status.inc_value("failures", num_failures)
def process_change(self, change, done, retry):
pfx = gauxpfx()
clist = []
entries = []
- meta_gfid = set()
- datas = set()
+ change_ts = change.split(".")[-1]
+
+ # self.data_batch_start is None only at the beginning and when a
+ # new batch starts
+ if self.data_batch_start is None:
+ self.data_batch_start = change_ts
+
+ # Ignore entry ops which are already processed in Changelog modes
+ ignore_entry_ops = False
+ entry_stime = None
+ data_stime = None
+ if self.name in ["live_changelog", "history_changelog"]:
+ entry_stime = self.get_entry_stime()
+ data_stime = self.get_data_stime()
+
+ if entry_stime is not None and data_stime is not None:
+ # If both are available but data_stime > entry_stime, the stime
+ # was advanced by a Passive worker; consider data_stime in
+ # this case.
+ if data_stime[0] > entry_stime[0]:
+ entry_stime = data_stime
+
+ # Compare entry_stime with the changelog file suffix; if the
+ # changelog time is not newer than entry_stime, skip its entry ops
+ if int(change_ts) <= entry_stime[0]:
+ ignore_entry_ops = True
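
As a worked example of the skip decision above (all timestamps made up): if the slave reports entry_stime (1434550240, 0) and data_stime (1434550300, 0), a Passive worker has moved data_stime ahead, so entry_stime is taken as (1434550300, 0); a changelog named CHANGELOG.1434550290 then satisfies int(change_ts) <= entry_stime[0] and its entry ops are skipped. A standalone sketch of the same check:

# Sketch of the entry-op skip decision used above (illustrative only).
def should_ignore_entry_ops(change, entry_stime, data_stime):
    change_ts = change.split(".")[-1]
    if entry_stime is None or data_stime is None:
        return False
    # A Passive worker may have pushed data_stime ahead of entry_stime.
    if data_stime[0] > entry_stime[0]:
        entry_stime = data_stime
    # Entry ops of changelogs not newer than entry_stime were already synced.
    return int(change_ts) <= entry_stime[0]

print(should_ignore_entry_ops("CHANGELOG.1434550290",
                              (1434550240, 0), (1434550300, 0)))   # True
print(should_ignore_entry_ops("CHANGELOG.1434550305",
                              (1434550240, 0), (1434550300, 0)))   # False
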
- # basic crawl stats: files and bytes
- files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}
try:
f = open(change, "r")
clist = f.readlines()
@@ -749,42 +817,6 @@ class GMasterChangelogMixin(GMasterCommon):
except IOError:
raise
- def edct(op, **ed):
- dct = {}
- dct['op'] = op
- for k in ed:
- if k == 'stat':
- st = ed[k]
- dst = dct['stat'] = {}
- if st:
- dst['uid'] = st.st_uid
- dst['gid'] = st.st_gid
- dst['mode'] = st.st_mode
- dst['atime'] = st.st_atime
- dst['mtime'] = st.st_mtime
- else:
- dct[k] = ed[k]
- return dct
-
- # entry counts (not purges)
- def entry_update():
- files_pending['count'] += 1
-
- # purge count
- def purge_update():
- files_pending['purge'] += 1
-
- def log_failures(failures, entry_key, gfid_prefix, log_prefix):
- num_failures = 0
- for failure in failures:
- st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
- if not isinstance(st, int):
- num_failures += 1
- logging.error('%s FAILED: %s' % (log_prefix,
- repr(failure)))
-
- self.status.inc_value("failures", num_failures)
-
for e in clist:
e = e.strip()
et = e[self.IDX_START:self.IDX_END] # entry type
@@ -792,12 +824,20 @@ class GMasterChangelogMixin(GMasterCommon):
# skip ENTRY operation if hot tier brick
if self.name == 'live_changelog' or \
- self.name == 'history_changelog':
+ self.name == 'history_changelog':
if boolify(gconf.is_hottier) and et == self.TYPE_ENTRY:
logging.debug('skip ENTRY op: %s if hot tier brick'
% (ec[self.POS_TYPE]))
continue
+ # Data and Meta operations are decided while parsing
+ # UNLINK/RMDIR/MKNOD, so when ignore_entry_ops is True skip
+ # every other entry op here; the UNLINK/RMDIR/MKNOD entry ops
+ # themselves are dropped later.
+ if ignore_entry_ops and et == self.TYPE_ENTRY and \
+ ec[self.POS_TYPE] not in ["UNLINK", "RMDIR", "MKNOD"]:
+ continue
+
if et == self.TYPE_ENTRY:
# extract information according to the type of
# the entry operation. create(), mkdir() and mknod()
@@ -819,15 +859,16 @@ class GMasterChangelogMixin(GMasterCommon):
# Remove from DATA list, so that rsync will
# not fail
pt = os.path.join(pfx, ec[0])
- if pt in datas:
- datas.remove(pt)
+ st = lstat(pt)
+ if isinstance(st, int):
+ # file got unlinked; may be a historical changelog
+ db_remove_data(pt)
+ db_remove_meta(pt)
if not boolify(gconf.ignore_deletes):
- purge_update()
- entries.append(edct(ty, gfid=gfid, entry=en))
+ if not ignore_entry_ops:
+ entries.append(edct(ty, gfid=gfid, entry=en))
elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
- entry_update()
-
# Special case: record mknod as link
if ty in ['MKNOD']:
mode = int(ec[2])
@@ -845,10 +886,10 @@ class GMasterChangelogMixin(GMasterCommon):
# CREATED if source not exists.
entries.append(edct('LINK', stat=st, entry=en,
gfid=gfid))
-
# Here, we have the assumption that only
# tier-gfid.linkto causes this mknod. Add data
- datas.add(os.path.join(pfx, ec[0]))
+ db_record_data(os.path.join(pfx, ec[0]),
+ change_ts)
continue
# stat info. present in the changelog itself
@@ -867,7 +908,6 @@ class GMasterChangelogMixin(GMasterCommon):
if isinstance(rl, int):
rl = None
- entry_update()
e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en,
stat=st, link=rl))
@@ -880,120 +920,102 @@ class GMasterChangelogMixin(GMasterCommon):
continue
if ty == 'LINK':
- entry_update()
entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
elif ty == 'SYMLINK':
rl = errno_wrap(os.readlink, [en], [ENOENT], [ESTALE])
if isinstance(rl, int):
continue
- entry_update()
+
entries.append(
edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
else:
logging.warn('ignoring %s [op %s]' % (gfid, ty))
elif et == self.TYPE_GFID:
- # If self.unlinked_gfids is available, then that means it is
- # retrying the changelog second time. Do not add the GFID's
- # to rsync job if failed previously but unlinked in master
- if self.unlinked_gfids and \
- os.path.join(pfx, ec[0]) in self.unlinked_gfids:
- logging.debug("ignoring data, since file purged interim")
- else:
- datas.add(os.path.join(pfx, ec[0]))
+ db_record_data(os.path.join(pfx, ec[0]), change_ts)
elif et == self.TYPE_META:
if ec[1] == 'SETATTR': # only setattr's for now...
- if len(ec) == 5:
- # In xsync crawl, we already have stat data
- # avoid doing stat again
- meta_gfid.add((os.path.join(pfx, ec[0]),
- XCrawlMetadata(st_uid=ec[2],
- st_gid=ec[3],
- st_mode=ec[4],
- st_atime=ec[5],
- st_mtime=ec[6])))
- else:
- meta_gfid.add((os.path.join(pfx, ec[0]), ))
+ db_record_meta(os.path.join(pfx, ec[0]), change_ts)
elif ec[1] == 'SETXATTR' or ec[1] == 'XATTROP' or \
ec[1] == 'FXATTROP':
# To sync xattr/acls use rsync/tar, --xattrs and --acls
# switch to rsync and tar
if not boolify(gconf.use_tarssh) and \
(boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)):
- datas.add(os.path.join(pfx, ec[0]))
+ db_record_data(os.path.join(pfx, ec[0]), change_ts)
else:
logging.warn('got invalid changelog type: %s' % (et))
logging.debug('entries: %s' % repr(entries))
- # Increment counters for Status
- self.status.inc_value("entry", len(entries))
- self.files_in_batch += len(datas)
- self.status.inc_value("data", len(datas))
-
# sync namespace
- if entries:
+ if entries and not ignore_entry_ops:
+ # Increment counters for Status
+ self.status.inc_value("entry", len(entries))
failures = self.slave.server.entry_ops(entries)
- log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
+ self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
self.status.dec_value("entry", len(entries))
- # sync metadata
- if meta_gfid:
- meta_entries = []
- for go in meta_gfid:
- if len(go) > 1:
- st = go[1]
- else:
- st = lstat(go[0])
- if isinstance(st, int):
- logging.debug('file %s got purged in the interim' % go[0])
- continue
- meta_entries.append(edct('META', go=go[0], stat=st))
- if meta_entries:
- self.status.inc_value("meta", len(entries))
- failures = self.slave.server.meta_ops(meta_entries)
- log_failures(failures, 'go', '', 'META')
- self.status.dec_value("meta", len(entries))
-
- # sync data
- if datas:
+ # Update Entry stime in Brick Root only in case of Changelog mode
+ if self.name in ["live_changelog", "history_changelog"]:
+ entry_stime_to_update = (int(change_ts) - 1, 0)
+ self.upd_entry_stime(entry_stime_to_update)
+ self.status.set_field("last_synced_entry",
+ entry_stime_to_update[0])
+
+ if ignore_entry_ops:
+ # Bookkeeping: log the range of changelogs whose entry ops were skipped
+ self.num_skipped_entry_changelogs += 1
+ if self.skipped_entry_changelogs_first is None:
+ self.skipped_entry_changelogs_first = change_ts
+
+ self.skipped_entry_changelogs_last = change_ts
+
+ # Batch data based on the number of changelogs, as configured via
+ # gconf.max_data_changelogs_in_batch (default is 24 hrs worth).
+ # stime is set after each batch completes, so on failure
+ # Geo-rep still progresses day by day.
+ if (self.num_changelogs > gconf.max_data_changelogs_in_batch):
+ # (Start Changelog TS, End Changelog TS, [Changes])
+ self.data_batches.append([self.data_batch_start, change_ts,
+ [change]])
+ self.data_batch_start = None
+ self.num_changelogs = 0
+ else:
+ self.data_batches[-1][1] = change_ts
+ self.data_batches[-1][2].append(change)
+
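
The batching done at the end of process_change() can be summarised with a rough standalone sketch (the limit and changelog names are made up; the real limit is gconf.max_data_changelogs_in_batch, roughly a day's worth of changelogs): data sync is grouped into batches and stime is updated after each batch, so a failure only loses progress within the current batch.

# Rough sketch of grouping changelogs into data-sync batches.
def group_into_batches(changes, max_per_batch):
    # Each batch is [start_ts, end_ts, [changelog files]].
    first_ts = changes[0].split(".")[-1]
    batches = [[first_ts, first_ts, []]]
    num_changelogs = 0
    for change in changes:
        ts = change.split(".")[-1]
        if num_changelogs > max_per_batch:
            batches.append([ts, ts, [change]])   # open a new batch
            num_changelogs = 0
        else:
            batches[-1][1] = ts
            batches[-1][2].append(change)
        num_changelogs += 1
    return batches

changes = ["CHANGELOG.%d" % ts for ts in range(1434550000, 1434550075, 15)]
print(group_into_batches(changes, 2))
# -> two batches covering the five changelogs; the data stime advances
#    once per batch rather than only at the very end.
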
+ def datas_to_queue(self, start, end):
+ # Paginate db entries and add them to the Rsync PostBox
+ offset = 0
+ total_datas = 0
+ while True:
+ # Db Pagination
+ datas = db_get_data(start=start, end=end,
+ limit=DB_PAGINATION_SIZE_DATA,
+ offset=offset)
+ if len(datas) == 0:
+ break
+ offset += DB_PAGINATION_SIZE_DATA
+ total_datas += len(datas)
self.a_syncdata(datas)
- self.datas_in_batch.update(datas)
+ return total_datas
- def process(self, changes, done=1):
- tries = 0
+ def handle_data_sync(self, start, end, changes, done, total_datas):
+ """
+ Wait till all rsync jobs are complete and handle retries.
+ Update the data stime once the rsync jobs are complete.
+ """
retry = False
- self.unlinked_gfids = set()
- self.files_in_batch = 0
- self.datas_in_batch = set()
+ tries = 0
+ # Error logging stays disabled until the last retry round
self.syncer.disable_errorlog()
while True:
- # first, fire all changelog transfers in parallel. entry and
- # metadata are performed synchronously, therefore in serial.
- # However at the end of each changelog, data is synchronized
- # with syncdata_async() - which means it is serial w.r.t
- # entries/metadata of that changelog but happens in parallel
- # with data of other changelogs.
-
if retry:
- if tries == (self.MAX_RETRIES - 1):
- # Enable Error logging if it is last retry
- self.syncer.enable_errorlog()
-
- # Remove Unlinked GFIDs from Queue
- for unlinked_gfid in self.unlinked_gfids:
- if unlinked_gfid in self.datas_in_batch:
- self.datas_in_batch.remove(unlinked_gfid)
-
- # Retry only Sync. Do not retry entry ops
- if self.datas_in_batch:
- self.a_syncdata(self.datas_in_batch)
- else:
- for change in changes:
- logging.debug('processing change %s' % change)
- self.process_change(change, done, retry)
- if not retry:
- # number of changelogs processed in the batch
- self.turns += 1
+ self.datas_to_queue(start, end)
+
+ if retry and tries == (gconf.max_rsync_retries - 1):
+ # Enable Error logging if it is last retry
+ self.syncer.enable_errorlog()
# Now we wait for all the data transfers fired off in the above
# step to complete. Note that this is not ideal either. Ideally
@@ -1016,38 +1038,34 @@ class GMasterChangelogMixin(GMasterCommon):
# @change is the last changelog (therefore max time for this batch)
if self.syncdata_wait():
- self.unlinked_gfids = set()
if done:
- xtl = (int(change.split('.')[-1]) - 1, 0)
+ xtl = (int(changes[-1].split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
map(self.changelog_done_func, changes)
self.archive_and_purge_changelogs(changes)
# Reset Data counter after sync
- self.status.dec_value("data", self.files_in_batch)
- self.files_in_batch = 0
- self.datas_in_batch = set()
+ self.status.dec_value("data", total_datas)
break
# We do not know which changelog transfer failed, retry everything.
retry = True
tries += 1
- if tries == self.MAX_RETRIES:
+ if tries >= gconf.max_rsync_retries:
logging.error('changelogs %s could not be processed '
'completely - moving on...' %
' '.join(map(os.path.basename, changes)))
# Reset data counter on failure
- self.status.dec_value("data", self.files_in_batch)
- self.files_in_batch = 0
- self.datas_in_batch = set()
+ self.status.dec_value("data", total_datas)
if done:
- xtl = (int(change.split('.')[-1]) - 1, 0)
+ xtl = (int(changes[-1].split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
map(self.changelog_done_func, changes)
self.archive_and_purge_changelogs(changes)
break
+
# it's either entry_ops() or Rsync that failed to do its
# job. Mostly it's entry_ops() [which currently has a problem
# of failing to create an entry while not returning an errno]
@@ -1058,11 +1076,114 @@ class GMasterChangelogMixin(GMasterCommon):
logging.warn('incomplete sync, retrying changelogs: %s' %
' '.join(map(os.path.basename, changes)))
- # Reset the Data counter before Retry
- self.status.dec_value("data", self.files_in_batch)
- self.files_in_batch = 0
time.sleep(0.5)
+ # Log Current batch details
+ if changes:
+ logging.info(
+ "{0} mode completed in {1:.4f} seconds "
+ "({2} - {3} Num: {4}) stime: {5}, entry_stime: {6}".format(
+ self.name,
+ time.time() - self.batch_start_time,
+ changes[0].split("/")[-1],
+ changes[-1].split("/")[-1],
+ len(changes),
+ repr(self.get_data_stime()),
+ repr(self.get_entry_stime())))
+
+ def process(self, changes, done=1):
+ retry = False
+ first_changelog_ts = changes[0].split(".")[-1]
+
+ db_init(os.path.join(self.tempdir, "temp_changelogs.db"))
+
+ self.skipped_entry_changelogs_first = None
+ self.skipped_entry_changelogs_last = None
+ self.num_skipped_entry_changelogs = 0
+ self.batch_start_time = time.time()
+ # (Start Changelog TS, End Changelog TS, [Changes])
+ self.data_batches = [[first_changelog_ts, first_changelog_ts, []]]
+ self.data_batch_start = None
+ self.num_changelogs = 0
+
+ for change in changes:
+ logging.debug('processing change %s' % change)
+ self.process_change(change, done, retry)
+ # number of changelogs processed in the batch
+ self.turns += 1
+
+ # Rsync/Tar preserve permissions, so if a GFID exists in the
+ # data queue its meta details get synced too. Remove from the
+ # meta table every entry that also exists in the data table.
+ db_delete_meta_if_exists_in_data()
+
+ # All the Data/Meta populated, Commit the Changes in Db
+ db_commit()
+
+ # Log the Skipped Entry ops range if any
+ if self.skipped_entry_changelogs_first is not None and \
+ self.skipped_entry_changelogs_last is not None:
+ logging.info("Skipping already processed entry "
+ "ops from CHANGELOG.{0} to CHANGELOG.{1} "
+ "Num: {2}".format(
+ self.skipped_entry_changelogs_first,
+ self.skipped_entry_changelogs_last,
+ self.num_skipped_entry_changelogs))
+
+ # Entry Changelogs syncing finished
+ logging.info("Syncing Entries completed in {0:.4f} seconds "
+ "CHANGELOG.{1} - CHANGELOG.{2} "
+ "Num: {3}".format(
+ time.time() - self.batch_start_time,
+ changes[0].split(".")[-1],
+ changes[-1].split(".")[-1],
+ len(changes)))
+
+ # Update Status Data and Meta Count
+ self.status.inc_value("data", db_get_data_count())
+ self.status.inc_value("meta", db_get_meta_count())
+
+ for b in self.data_batches:
+ # Add to the data queue so that Rsync starts in parallel
+ # while the Meta ops are being synced
+ total_datas = self.datas_to_queue(b[0], b[1])
+
+ # Sync Meta
+ offset = 0
+ while True:
+ # Db Pagination
+ meta_gfids = db_get_meta(start=b[0], end=b[1],
+ limit=DB_PAGINATION_SIZE_META,
+ offset=offset)
+ if len(meta_gfids) == 0:
+ break
+ offset += DB_PAGINATION_SIZE_META
+
+ # Collect required information for GFIDs which
+ # exist on the Master
+ meta_entries = []
+ for go in meta_gfids:
+ st = lstat(go)
+ if isinstance(st, int):
+ logging.debug('file %s got purged in the '
+ 'interim' % go)
+ continue
+ meta_entries.append(edct('META', go=go, stat=st))
+
+ if meta_entries:
+ failures = self.slave.server.meta_ops(meta_entries)
+ self.log_failures(failures, 'go', '', 'META')
+ self.status.dec_value("meta", len(meta_entries))
+
+ # Sync Data, Rsync already started syncing the files
+ # wait for the completion and retry if required.
+ self.handle_data_sync(b[0], b[1], b[2], done, total_datas)
+
+ def upd_entry_stime(self, stime):
+ self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY,
+ self.uuid,
+ stime)
+
def upd_stime(self, stime, path=None):
if not path:
path = self.FLAT_DIR_HIERARCHY
@@ -1085,7 +1206,12 @@ class GMasterChangelogMixin(GMasterCommon):
remote_node_ip = node.split(":")[0]
self.status.set_slave_node(remote_node_ip)
- def changelogs_batch_process(self, changes):
+ def changelogs_batch_process(self, changes, single_batch=False):
+ if single_batch and changes:
+ logging.debug('processing changes %s' % repr(changes))
+ self.process(changes)
+ return
+
changelogs_batches = []
current_size = 0
for c in changes:
@@ -1112,16 +1238,15 @@ class GMasterChangelogMixin(GMasterCommon):
changes = []
# get stime (from the brick) and purge changelogs
# that are _historical_ to that time.
- purge_time = self.get_purge_time()
+ data_stime = self.get_data_stime()
self.changelog_agent.scan()
self.crawls += 1
changes = self.changelog_agent.getchanges()
if changes:
- if purge_time:
- logging.info("slave's time: %s" % repr(purge_time))
+ if data_stime:
processed = [x for x in changes
- if int(x.split('.')[-1]) < purge_time[0]]
+ if int(x.split('.')[-1]) < data_stime[0]]
for pr in processed:
logging.info(
'skipping already processed change: %s...' %
@@ -1136,7 +1261,8 @@ class GMasterChangelogMixin(GMasterCommon):
self.changelog_agent = changelog_agent
self.sleep_interval = int(gconf.change_interval)
self.changelog_done_func = self.changelog_agent.done
- self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
+ self.tempdir = self.setup_working_dir()
+ self.processed_changelogs_dir = os.path.join(self.tempdir,
".processed")
self.name = "live_changelog"
self.status = status
@@ -1149,7 +1275,8 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
self.history_crawl_start_time = register_time
self.changelog_done_func = self.changelog_agent.history_done
self.history_turns = 0
- self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
+ self.tempdir = self.setup_working_dir()
+ self.processed_changelogs_dir = os.path.join(self.tempdir,
".history/.processed")
self.name = "history_changelog"
self.status = status
@@ -1157,15 +1284,17 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
def crawl(self):
self.history_turns += 1
self.status.set_worker_crawl_status("History Crawl")
- purge_time = self.get_purge_time()
+ data_stime = self.get_data_stime()
end_time = int(time.time())
- logging.info('starting history crawl... turns: %s, stime: %s, etime: %s'
- % (self.history_turns, repr(purge_time), repr(end_time)))
+ logging.info('starting history crawl... turns: %s, stime: %s, '
+ 'etime: %s, entry_stime: %s'
+ % (self.history_turns, repr(data_stime),
+ repr(end_time), self.get_entry_stime()))
- if not purge_time or purge_time == URXTIME:
+ if not data_stime or data_stime == URXTIME:
logging.info("stime not available, abandoning history crawl")
- raise NoPurgeTimeAvailable()
+ raise NoStimeAvailable()
# Changelogs backend path is hardcoded as
# <BRICK_PATH>/.glusterfs/changelogs, if user configured to different
@@ -1174,7 +1303,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
".glusterfs/changelogs")
ret, actual_end = self.changelog_agent.history(
changelog_path,
- purge_time[0],
+ data_stime[0],
end_time,
int(gconf.sync_jobs))
@@ -1184,27 +1313,42 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# to be processed. returns positive value as number of changelogs
# to be processed, which will be fetched using
# history_getchanges()
- while self.changelog_agent.history_scan() > 0:
+ num_scanned_changelogs = self.changelog_agent.history_scan()
+ num_changelogs = num_scanned_changelogs
+ changes = []
+ while num_scanned_changelogs > 0:
self.crawls += 1
- changes = self.changelog_agent.history_getchanges()
+ changes += self.changelog_agent.history_getchanges()
if changes:
- if purge_time:
- logging.info("slave's time: %s" % repr(purge_time))
+ if data_stime:
processed = [x for x in changes
- if int(x.split('.')[-1]) < purge_time[0]]
+ if int(x.split('.')[-1]) < data_stime[0]]
for pr in processed:
logging.info('skipping already processed change: '
'%s...' % os.path.basename(pr))
self.changelog_done_func(pr)
changes.remove(pr)
- self.changelogs_batch_process(changes)
+ if num_changelogs > gconf.max_history_changelogs_in_batch:
+ self.changelogs_batch_process(changes, single_batch=True)
+ num_changelogs = 0
+ changes = []
+
+ num_scanned_changelogs = self.changelog_agent.history_scan()
+ num_changelogs += num_scanned_changelogs
+
+ # Process the last batch if it was not already handled by the
+ # max_history_changelogs_in_batch condition above
+ if changes:
+ self.changelogs_batch_process(changes, single_batch=True)
history_turn_time = int(time.time()) - self.history_crawl_start_time
- logging.info('finished history crawl syncing, endtime: %s, stime: %s'
- % (actual_end, repr(self.get_purge_time())))
+ logging.info('finished history crawl syncing, endtime: %s, '
+ 'stime: %s, entry_stime: %s'
+ % (actual_end, repr(self.get_data_stime()),
+ self.get_entry_stime()))
# If TS returned from history_changelog is < register_time
# then FS crawl may be required, since history is only available
@@ -1269,14 +1413,14 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
t = Thread(target=Xsyncer)
t.start()
logging.info('starting hybrid crawl..., stime: %s'
- % repr(self.get_purge_time()))
+ % repr(self.get_data_stime()))
self.status.set_worker_crawl_status("Hybrid Crawl")
while True:
try:
item = self.comlist.pop(0)
if item[0] == 'finale':
logging.info('finished hybrid crawl syncing, stime: %s'
- % repr(self.get_purge_time()))
+ % repr(self.get_data_stime()))
break
elif item[0] == 'xsync':
logging.info('processing xsync changelog %s' % (item[1]))