summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon')
-rw-r--r--geo-replication/syncdaemon/gsyncd.py2
-rw-r--r--geo-replication/syncdaemon/master.py82
2 files changed, 79 insertions, 5 deletions
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index cb29903bf4f..84ad13487ce 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -252,6 +252,8 @@ def main_i():
op.add_option('--sync-jobs', metavar='N', type=int, default=3)
op.add_option('--replica-failover-interval', metavar='N',
type=int, default=1)
+ op.add_option('--changelog-archive-format', metavar='N',
+ type=str, default="%Y%m")
op.add_option(
'--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP)
op.add_option('--allow-network', metavar='IPS', default='')
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 6f1b639e566..c092c526a0e 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -17,6 +17,7 @@ import logging
import socket
import string
import errno
+import tarfile
from errno import ENOENT, ENODATA, EPIPE, EEXIST
from threading import Condition, Lock
from datetime import datetime
@@ -533,6 +534,19 @@ class GMasterCommon(object):
if brick_stime < cluster_stime:
self.slave.server.set_stime(
self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
+ # Purge all changelogs available in processing dir
+ # less than cluster_stime
+ proc_dir = os.path.join(self.setup_working_dir(),
+ ".processing")
+
+ if os.path.exists(proc_dir):
+ to_purge = [f for f in os.listdir(proc_dir)
+ if (f.startswith("CHANGELOG.") and
+ int(f.split('.')[-1]) <
+ cluster_stime[0])]
+ for f in to_purge:
+ os.remove(os.path.join(proc_dir, f))
+
time.sleep(5)
continue
self.update_worker_health("Active")
@@ -775,6 +789,47 @@ class GMasterChangelogMixin(GMasterCommon):
CHANGELOG_LOG_LEVEL = 9
CHANGELOG_CONN_RETRIES = 5
+ def archive_and_purge_changelogs(self, changelogs):
+ # Creates tar file instead of tar.gz, since changelogs will
+ # be appended to existing tar. archive name is
+ # archive_<YEAR><MONTH>.tar
+ archive_name = "archive_%s.tar" % datetime.today().strftime(
+ gconf.changelog_archive_format)
+
+ try:
+ tar = tarfile.open(os.path.join(self.processed_changelogs_dir,
+ archive_name),
+ "a")
+ except tarfile.ReadError:
+ tar = tarfile.open(os.path.join(self.processed_changelogs_dir,
+ archive_name),
+ "w")
+
+ for f in changelogs:
+ try:
+ f = os.path.basename(f)
+ tar.add(os.path.join(self.processed_changelogs_dir, f),
+ arcname=os.path.basename(f))
+ except:
+ exc = sys.exc_info()[1]
+ if ((isinstance(exc, OSError) or
+ isinstance(exc, IOError)) and exc.errno == ENOENT):
+ continue
+ else:
+ tar.close()
+ raise
+ tar.close()
+
+ for f in changelogs:
+ try:
+ f = os.path.basename(f)
+ os.remove(os.path.join(self.processed_changelogs_dir, f))
+ except OSError as e:
+ if e.errno == errno.ENOENT:
+ continue
+ else:
+ raise
+
def fallback_xsync(self):
logging.info('falling back to xsync mode')
gconf.configinterface.set('change-detector', 'xsync')
@@ -990,6 +1045,7 @@ class GMasterChangelogMixin(GMasterCommon):
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
map(self.changelog_done_func, changes)
+ self.archive_and_purge_changelogs(changes)
self.update_worker_files_syncd()
break
@@ -1009,6 +1065,7 @@ class GMasterChangelogMixin(GMasterCommon):
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
map(self.changelog_done_func, changes)
+ self.archive_and_purge_changelogs(changes)
break
# it's either entry_ops() or Rsync that failed to do it's
# job. Mostly it's entry_ops() [which currently has a problem
@@ -1204,6 +1261,7 @@ class GMasterChangelogMixin(GMasterCommon):
os.path.basename(pr))
self.changelog_done_func(pr)
changes.remove(pr)
+ self.archive_and_purge_changelogs(processed)
if changes:
logging.debug('processing changes %s' % repr(changes))
@@ -1213,6 +1271,8 @@ class GMasterChangelogMixin(GMasterCommon):
self.changelog_agent = changelog_agent
self.sleep_interval = int(gconf.change_interval)
self.changelog_done_func = self.changelog_agent.done
+ self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
+ ".processed")
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
@@ -1222,6 +1282,8 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
self.history_crawl_start_time = register_time
self.changelog_done_func = self.changelog_agent.history_done
self.history_turns = 0
+ self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
+ ".history/.processed")
def crawl(self, no_stime_update=False):
self.history_turns += 1
@@ -1314,6 +1376,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.sleep_interval = 60
self.tempdir = self.setup_working_dir()
self.tempdir = os.path.join(self.tempdir, 'xsync')
+ self.processed_changelogs_dir = self.tempdir
logging.info('xsync temp directory: %s' % self.tempdir)
try:
os.makedirs(self.tempdir)
@@ -1348,6 +1411,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
elif item[0] == 'xsync':
logging.info('processing xsync changelog %s' % (item[1]))
self.process([item[1]], 0)
+ self.archive_and_purge_changelogs([item[1]])
elif item[0] == 'stime':
if not no_stime_update:
# xsync is started after running history but if
@@ -1367,6 +1431,9 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
time.sleep(1)
def write_entry_change(self, prefix, data=[]):
+ if not getattr(self, "fh", None):
+ self.open()
+
self.fh.write("%s %s\n" % (prefix, ' '.join(data)))
def open(self):
@@ -1378,7 +1445,11 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
raise
def close(self):
- self.fh.close()
+ if getattr(self, "fh", None):
+ self.fh.flush()
+ os.fsync(self.fh.fileno())
+ self.fh.close()
+ self.fh = None
def fname(self):
return self.xsync_change
@@ -1389,11 +1460,11 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
def sync_xsync(self, last):
"""schedule a processing of changelog"""
self.close()
- self.put('xsync', self.fname())
+ if self.counter > 0:
+ self.put('xsync', self.fname())
self.counter = 0
if not last:
time.sleep(1) # make sure changelogs are 1 second apart
- self.open()
def sync_stime(self, stime=None, last=False):
"""schedule a stime synchronization"""
@@ -1427,7 +1498,6 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
the filesystem tree, but set after directory synchronization.
"""
if path == '.':
- self.open()
self.crawls += 1
if not xtr_root:
# get the root stime and use it for all comparisons
@@ -1479,7 +1549,9 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
logging.warn('skipping entry %s..' % e)
continue
mo = st.st_mode
- self.counter += 1
+ self.counter += 1 if ((stat.S_ISDIR(mo) or
+ stat.S_ISLNK(mo) or
+ stat.S_ISREG(mo))) else 0
if self.counter == self.XSYNC_MAX_ENTRIES:
self.sync_done(self.stimes, False)
self.stimes = []