summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2015-04-11 20:03:47 +0530
committerVijay Bellur <vbellur@redhat.com>2015-04-28 10:39:41 -0700
commit428933dce2c87ea62b4f58af7d260064fade6a8b (patch)
treeb781d8375b0a03affb8f866bf7913df0869da73f
parent8986a47c54db4769feb4e6664532386f1cd0275d (diff)
geo-rep: Limit number of changelogs to process in batch
Changelog processing is done in batch, for example if 10 changelogs available for processing then process all at once. Collect Entry, Meta and Data operations separately, All the entry operations like CREATE, MKDIR, MKNOD, LINK, UNLINK will be executed first then rsync will be triggered for whole batch. Stime will get updated once the complete batch is complete. In case of large number of Changelogs in a batch, If geo-rep fails after Entry operations, but before rsync then on restart, it again starts from the beginning since stime is not updated. It has to process all the changelogs again. While processing same changelogs again, all CREATE will get EEXIST since all the files created in previous run. Big hit for performance. With this patch, Geo-rep limits number of changelogs per batch based on Changelog file size. So that when geo-rep fails it has to retry only last batch changelogs since stime gets updated after each batch. BUG: 1210965 Change-Id: I844448c4cdcce38a3a2e2cca7c9a50db8f5a9062 Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/10202 Reviewed-by: Kotresh HR <khiremat@redhat.com> Tested-by: NetBSD Build System Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Vijay Bellur <vbellur@redhat.com>
-rw-r--r--geo-replication/syncdaemon/master.py38
1 files changed, 32 insertions, 6 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index dabf5536c64..03d5b572cef 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -39,6 +39,14 @@ URXTIME = (-1, 0)
# crawl before starting live changelog crawl.
CHANGELOG_ROLLOVER_TIME = 15
+# Max size of Changelogs to process per batch, Changelogs Processing is
+# not limited by the number of changelogs but instead based on
+# size of the changelog file, One sample changelog file size was 145408
+# with ~1000 CREATE and ~1000 DATA. 5 such files in one batch is 727040
+# If geo-rep worker crashes while processing a batch, it has to retry only
+# that batch since stime will get updated after each batch.
+MAX_CHANGELOG_BATCH_SIZE = 727040
+
# Utility functions to help us to get to closer proximity
# of the DRY principle (no, don't look for elevated or
# perspectivistic things here)
@@ -1299,6 +1307,28 @@ class GMasterChangelogMixin(GMasterCommon):
except:
raise
+ def changelogs_batch_process(self, changes):
+ changelogs_batches = []
+ current_size = 0
+ for c in changes:
+ si = os.lstat(c).st_size
+ if (si + current_size) > MAX_CHANGELOG_BATCH_SIZE:
+ # Create new batch if single Changelog file greater than
+ # Max Size! or current batch size exceeds Max size
+ changelogs_batches.append([c])
+ current_size = si
+ else:
+ # Append to last batch, if No batches available Create one
+ current_size += si
+ if not changelogs_batches:
+ changelogs_batches.append([c])
+ else:
+ changelogs_batches[-1].append(c)
+
+ for batch in changelogs_batches:
+ logging.debug('processing changes %s' % repr(batch))
+ self.process(batch)
+
def crawl(self):
self.update_worker_crawl_status("Changelog Crawl")
changes = []
@@ -1322,9 +1352,7 @@ class GMasterChangelogMixin(GMasterCommon):
changes.remove(pr)
self.archive_and_purge_changelogs(processed)
- if changes:
- logging.debug('processing changes %s' % repr(changes))
- self.process(changes)
+ self.changelogs_batch_process(changes)
def register(self, register_time, changelog_agent):
self.changelog_agent = changelog_agent
@@ -1388,9 +1416,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
self.changelog_done_func(pr)
changes.remove(pr)
- if changes:
- logging.debug('processing changes %s' % repr(changes))
- self.process(changes)
+ self.changelogs_batch_process(changes)
history_turn_time = int(time.time()) - self.history_crawl_start_time