summaryrefslogtreecommitdiffstats
path: root/geo-replication
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication')
-rw-r--r--geo-replication/syncdaemon/master.py38
1 files changed, 32 insertions, 6 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index a98a9b8925f..ef79f02a52c 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -37,6 +37,14 @@ URXTIME = (-1, 0)
# crawl before starting live changelog crawl.
CHANGELOG_ROLLOVER_TIME = 15
+# Max size of Changelogs to process per batch, Changelogs Processing is
+# not limited by the number of changelogs but instead based on
+# size of the changelog file, One sample changelog file size was 145408
+# with ~1000 CREATE and ~1000 DATA. 5 such files in one batch is 727040
+# If geo-rep worker crashes while processing a batch, it has to retry only
+# that batch since stime will get updated after each batch.
+MAX_CHANGELOG_BATCH_SIZE = 727040
+
# Utility functions to help us to get to closer proximity
# of the DRY principle (no, don't look for elevated or
# perspectivistic things here)
@@ -1028,6 +1036,28 @@ class GMasterChangelogMixin(GMasterCommon):
remote_node_ip = node.split(":")[0]
self.status.set_slave_node(remote_node_ip)
+ def changelogs_batch_process(self, changes):
+ changelogs_batches = []
+ current_size = 0
+ for c in changes:
+ si = os.lstat(c).st_size
+ if (si + current_size) > MAX_CHANGELOG_BATCH_SIZE:
+ # Create new batch if single Changelog file greater than
+ # Max Size! or current batch size exceeds Max size
+ changelogs_batches.append([c])
+ current_size = si
+ else:
+ # Append to last batch, if No batches available Create one
+ current_size += si
+ if not changelogs_batches:
+ changelogs_batches.append([c])
+ else:
+ changelogs_batches[-1].append(c)
+
+ for batch in changelogs_batches:
+ logging.debug('processing changes %s' % repr(batch))
+ self.process(batch)
+
def crawl(self):
self.status.set_worker_crawl_status("Changelog Crawl")
changes = []
@@ -1051,9 +1081,7 @@ class GMasterChangelogMixin(GMasterCommon):
changes.remove(pr)
self.archive_and_purge_changelogs(processed)
- if changes:
- logging.debug('processing changes %s' % repr(changes))
- self.process(changes)
+ self.changelogs_batch_process(changes)
def register(self, register_time, changelog_agent, status):
self.changelog_agent = changelog_agent
@@ -1119,9 +1147,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
self.changelog_done_func(pr)
changes.remove(pr)
- if changes:
- logging.debug('processing changes %s' % repr(changes))
- self.process(changes)
+ self.changelogs_batch_process(changes)
history_turn_time = int(time.time()) - self.history_crawl_start_time