diff options
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 38 | 
1 file changed, 32 insertions, 6 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index a98a9b8925f..ef79f02a52c 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -37,6 +37,14 @@ URXTIME = (-1, 0)  # crawl before starting live changelog crawl.  CHANGELOG_ROLLOVER_TIME = 15 +# Max size of Changelogs to process per batch, Changelogs Processing is +# not limited by the number of changelogs but instead based on +# size of the changelog file, One sample changelog file size was 145408 +# with ~1000 CREATE and ~1000 DATA. 5 such files in one batch is 727040 +# If geo-rep worker crashes while processing a batch, it has to retry only +# that batch since stime will get updated after each batch. +MAX_CHANGELOG_BATCH_SIZE = 727040 +  # Utility functions to help us to get to closer proximity  # of the DRY principle (no, don't look for elevated or  # perspectivistic things here) @@ -1028,6 +1036,28 @@ class GMasterChangelogMixin(GMasterCommon):          remote_node_ip = node.split(":")[0]          self.status.set_slave_node(remote_node_ip) +    def changelogs_batch_process(self, changes): +        changelogs_batches = [] +        current_size = 0 +        for c in changes: +            si = os.lstat(c).st_size +            if (si + current_size) > MAX_CHANGELOG_BATCH_SIZE: +                # Create new batch if single Changelog file greater than +                # Max Size! 
or current batch size exceeds Max size +                changelogs_batches.append([c]) +                current_size = si +            else: +                # Append to last batch, if No batches available Create one +                current_size += si +                if not changelogs_batches: +                    changelogs_batches.append([c]) +                else: +                    changelogs_batches[-1].append(c) + +        for batch in changelogs_batches: +            logging.debug('processing changes %s' % repr(batch)) +            self.process(batch) +      def crawl(self):          self.status.set_worker_crawl_status("Changelog Crawl")          changes = [] @@ -1051,9 +1081,7 @@ class GMasterChangelogMixin(GMasterCommon):                      changes.remove(pr)                  self.archive_and_purge_changelogs(processed) -            if changes: -                logging.debug('processing changes %s' % repr(changes)) -                self.process(changes) +        self.changelogs_batch_process(changes)      def register(self, register_time, changelog_agent, status):          self.changelog_agent = changelog_agent @@ -1119,9 +1147,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):                          self.changelog_done_func(pr)                          changes.remove(pr) -            if changes: -                logging.debug('processing changes %s' % repr(changes)) -                self.process(changes) +            self.changelogs_batch_process(changes)          history_turn_time = int(time.time()) - self.history_crawl_start_time  | 
