Diffstat (limited to 'geo-replication')
-rw-r--r--  geo-replication/syncdaemon/master.py      156
-rw-r--r--  geo-replication/syncdaemon/resource.py     14
-rw-r--r--  geo-replication/syncdaemon/syncdutils.py   10
3 files changed, 102 insertions(+), 78 deletions(-)
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 12eadb1073a..cf2f7db0706 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -17,7 +17,8 @@ from datetime import datetime
from gconf import gconf
from tempfile import mkdtemp, NamedTemporaryFile
from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \
- unescape, select, gauxpfx, md5hex, selfkill, entry2pb
+ unescape, select, gauxpfx, md5hex, selfkill, entry2pb, \
+ lstat
URXTIME = (-1, 0)
@@ -380,7 +381,13 @@ class GMasterCommon(object):
return self.xtime_low(rsc.server, path, **opts)
def get_initial_crawl_data(self):
- default_data = {'sync_time': 0, 'files_synced': 0, 'bytes_synced': 0}
+ # While persisting, only 'files_syncd' is non-zero; the rest of
+ # the stats are zeros. Let's keep them that way in case they
+ # need to be used some day...
+ default_data = {'files_syncd': 0,
+ 'files_remaining': 0,
+ 'bytes_remaining': 0,
+ 'purges_remaining': 0}
if getattr(gconf, 'state_detail_file', None):
try:
return json.load(open(gconf.state_detail_file))
@@ -393,7 +400,6 @@ class GMasterCommon(object):
return default_data
else:
raise
-
return default_data
def update_crawl_data(self):
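The load-or-default pattern in get_initial_crawl_data() above can be sketched
standalone (a minimal sketch, assuming a JSON state file path as configured
via gconf.state_detail_file; load_crawl_stats is an illustrative name):

    import errno
    import json

    def load_crawl_stats(state_file):
        # persisted stats: only 'files_syncd' is ever non-zero on disk,
        # the remaining keys are placeholders for future use
        default = {'files_syncd': 0, 'files_remaining': 0,
                   'bytes_remaining': 0, 'purges_remaining': 0}
        if not state_file:
            return default
        try:
            with open(state_file) as f:
                return json.load(f)
        except (IOError, OSError) as ex:
            if ex.errno == errno.ENOENT:
                return default  # no state persisted yet
            raise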
@@ -422,10 +428,9 @@ class GMasterCommon(object):
self.crawls = 0
self.turns = 0
self.total_turns = int(gconf.turns)
+ self.crawl_start = datetime.now()
self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
- self.crawl_stats = {'sync_time': 0, 'last_synctime': 0, 'crawl_starttime': 0,
- 'crawl_time': 0, 'files_synced': 0, 'bytes_synced' :0}
- self.total_crawl_stats = self.get_initial_crawl_data()
+ self.total_crawl_stats = None
self.start = None
self.change_seen = None
# the authoritative (foreign, native) volinfo pair
@@ -491,9 +496,8 @@ class GMasterCommon(object):
# for a passive gsyncd (ie. in a replicate scenario)
# the keepalive thread would keep the connection alive.
self.init_keep_alive()
+ self.total_crawl_stats = self.get_initial_crawl_data()
self.lastreport['time'] = time.time()
- self.crawl_stats['crawl_starttime'] = datetime.now()
-
logging.info('crawl interval: %d seconds' % self.sleep_interval)
t0 = time.time()
crawl = self.should_crawl()
@@ -556,17 +560,13 @@ class GMasterCommon(object):
return ts
def get_extra_info(self):
- str_info = "\nFilesSynced=%d;" % (self.crawl_stats['files_synced'])
- str_info += "BytesSynced=%s;" % (self.crawl_stats['bytes_synced'])
-
- self.crawl_stats['crawl_time'] = datetime.now() - self.crawl_stats['crawl_starttime']
-
- str_info += "Uptime=%s;" % (self._crawl_time_format(self.crawl_stats['crawl_time']))
- str_info += "SyncTime=%s;" % (self.crawl_stats['sync_time'])
- str_info += "TotalSyncTime=%s;" % (self.total_crawl_stats['sync_time'])
- str_info += "TotalFilesSynced=%d;" % (self.total_crawl_stats['files_synced'])
- str_info += "TotalBytesSynced=%s;" % (self.total_crawl_stats['bytes_synced'])
- str_info += "\0"
+ str_info = '\nUptime=%s;FilesSyncd=%d;FilesPending=%d;BytesPending=%d;DeletesPending=%d;' % \
+ (self._crawl_time_format(datetime.now() - self.crawl_start), \
+ self.total_crawl_stats['files_syncd'], \
+ self.total_crawl_stats['files_remaining'], \
+ self.total_crawl_stats['bytes_remaining'], \
+ self.total_crawl_stats['purges_remaining'])
+ str_info += '\0'
logging.debug(str_info)
return str_info
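The string built above is a NUL-terminated, semicolon-delimited record that
the status-reporting side consumes. A minimal parsing sketch
(parse_extra_info is an illustrative helper, not part of this patch):

    def parse_extra_info(raw):
        # raw looks like:
        # '\nUptime=0:42:07;FilesSyncd=102;FilesPending=3;'
        # 'BytesPending=4096;DeletesPending=0;\0'
        fields = {}
        for kv in raw.strip('\n\0').split(';'):
            if kv:
                k, v = kv.split('=', 1)
                fields[k] = v
        return fields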
@@ -788,15 +788,11 @@ class GMasterChangelogMixin(GMasterCommon):
logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile))
return (workdir, logfile)
- def lstat(self, e):
- try:
- return os.lstat(e)
- except (IOError, OSError):
- ex = sys.exc_info()[1]
- if ex.errno == ENOENT:
- return ex.errno
- else:
- raise
+ # update cumulative stats with the pending counts from *this* crawl
+ def update_cumulative_stats(self, files_pending):
+ self.total_crawl_stats['files_remaining'] = files_pending['count']
+ self.total_crawl_stats['bytes_remaining'] = files_pending['bytes']
+ self.total_crawl_stats['purges_remaining'] = files_pending['purge']
# sync data
def syncdata(self, datas):
@@ -804,43 +800,31 @@ class GMasterChangelogMixin(GMasterCommon):
for data in datas:
logging.debug('candidate for syncing %s' % data)
pb = self.syncer.add(data)
- timeA = datetime.now()
def regjob(se, xte, pb):
rv = pb.wait()
if rv[0]:
logging.debug('synced ' + se)
- # update stats
- timeB = datetime.now()
- self.crawl_stats['last_synctime'] = timeB - timeA
- self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
- self.crawl_stats['files_synced'] += 1
- self.crawl_stats['bytes_synced'] += self.syncer.bytes_synced
-
- # cumulative statistics
- self.total_crawl_stats['bytes_synced'] += self.syncer.bytes_synced
- self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
- self.total_crawl_stats['files_synced'] += 1
return True
else:
if rv[1] in [23, 24]:
# stat to check if the file exist
- st = self.lstat(se)
+ st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
return True
logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1]))
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, data, None, pb)
if self.wait(self.FLAT_DIR_HIERARCHY, None):
- self.update_crawl_data()
return True
- def process_change(self, change, done):
+ def process_change(self, change, done, retry):
+ pfx = gauxpfx()
clist = []
entries = []
- purges = set()
- links = set()
datas = set()
- pfx = gauxpfx()
+
+ # basic crawl stats: pending file, byte and purge counts
+ files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}
try:
f = open(change, "r")
clist = f.readlines()
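In the regjob() callback of syncdata() above, rsync exit codes 23 and 24
(partial transfer / source files vanished) are tolerated when the file itself
is gone by the time we stat it. That decision, as a standalone sketch
(rsync_job_ok is an illustrative helper):

    from syncdutils import lstat

    def rsync_job_ok(path, exit_code):
        if exit_code == 0:
            return True
        if exit_code in (23, 24) and isinstance(lstat(path), int):
            return True  # file got unlinked in the interim
        return False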
@@ -861,6 +845,27 @@ class GMasterChangelogMixin(GMasterCommon):
else:
dct[k] = ed[k]
return dct
+
+ # regular file update: bytes & count
+ def _update_reg(entry, size):
+ if not entry in files_pending['files']:
+ files_pending['count'] += 1
+ files_pending['bytes'] += size
+ files_pending['files'].append(entry)
+ # updates for directories, symlinks, etc.
+ def _update_rest():
+ files_pending['count'] += 1
+
+ # entry count
+ def entry_update(entry, size, mode):
+ if stat.S_ISREG(mode):
+ _update_reg(entry, size)
+ else:
+ _update_rest()
+ # purge count
+ def purge_update():
+ files_pending['purge'] += 1
+
for e in clist:
e = e.strip()
et = e[self.IDX_START:self.IDX_END]
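The accounting helpers above dedupe regular files by pathname so that
repeated changelog records for one file do not inflate the pending counts.
An equivalent self-contained sketch, using a set for O(1) membership checks
instead of the list above (make_pending_counter is illustrative):

    import stat

    def make_pending_counter():
        pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': set()}

        def entry_update(entry, size, mode):
            if stat.S_ISREG(mode):
                if entry not in pending['files']:
                    pending['count'] += 1
                    pending['bytes'] += size
                    pending['files'].add(entry)
            else:
                pending['count'] += 1  # directories, symlinks etc.

        def purge_update():
            pending['purge'] += 1

        return pending, entry_update, purge_update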
@@ -871,20 +876,19 @@ class GMasterChangelogMixin(GMasterCommon):
gfid = ec[self.POS_GFID]
# definitely need a better way to bucketize entry ops
if ty in ['UNLINK', 'RMDIR']:
- entries.append(edct(ty, gfid=gfid, entry=en))
- purges.update([os.path.join(pfx, gfid)])
- continue
- if not ty == 'RENAME':
- go = os.path.join(pfx, gfid)
- st = self.lstat(go)
- if isinstance(st, int):
- logging.debug('file %s got purged in the interim' % go)
- continue
+ purge_update()
+ entries.append(edct(ty, gfid=gfid, entry=en))
+ continue
+ go = os.path.join(pfx, gfid)
+ st = lstat(go)
+ if isinstance(st, int):
+ logging.debug('file %s got purged in the interim' % go)
+ continue
+ entry_update(go, st.st_size, st.st_mode)
if ty in ['CREATE', 'MKDIR', 'MKNOD']:
entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
elif ty == 'LINK':
entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
- links.update([os.path.join(pfx, gfid)])
elif ty == 'SYMLINK':
entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=os.readlink(en)))
elif ty == 'RENAME':
@@ -893,30 +897,41 @@ class GMasterChangelogMixin(GMasterCommon):
else:
pass
elif et in self.TYPE_GFID:
- da = os.path.join(pfx, ec[0])
- st = self.lstat(da)
+ go = os.path.join(pfx, ec[0])
+ st = lstat(go)
if isinstance(st, int):
- logging.debug('file %s got purged in the interim' % da)
+ logging.debug('file %s got purged in the interim' % go)
continue
- datas.update([da])
+ entry_update(go, st.st_size, st.st_mode)
+ datas.update([go])
logging.debug('entries: %s' % repr(entries))
+ if not retry:
+ self.update_cumulative_stats(files_pending)
# sync namespace
if (entries):
self.slave.server.entry_ops(entries)
# sync data
- if self.syncdata(datas - (purges - links)):
+ if self.syncdata(datas):
if done:
self.master.server.changelog_done(change)
return True
+ def sync_done(self):
+ self.total_crawl_stats['files_syncd'] += self.total_crawl_stats['files_remaining']
+ self.total_crawl_stats['files_remaining'] = 0
+ self.total_crawl_stats['bytes_remaining'] = 0
+ self.total_crawl_stats['purges_remaining'] = 0
+ self.update_crawl_data()
+
def process(self, changes, done=1):
for change in changes:
- times = 0
+ retry = False
while True:
- times += 1
- logging.debug('processing change %s [%d time(s)]' % (change, times))
- if self.process_change(change, done):
+ logging.debug('processing change %s' % change)
+ if self.process_change(change, done, retry):
+ self.sync_done()
break
+ retry = True
+ # it's either entry_ops() or Rsync that failed to do its
+ # job. Mostly it's entry_ops() [which currently has a problem
+ # of failing to create an entry without returning an errno]
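The retry flag threaded into process_change() exists purely for the stats:
pending counters are updated only on the first pass over a changelog, so a
replay after an entry_ops()/rsync failure does not double-count. The loop
shape, sketched minimally (process_one and fold_stats stand in for
process_change() and sync_done()):

    import logging

    def process(changes, process_one, fold_stats):
        for change in changes:
            retry = False
            while True:
                logging.debug('processing change %s' % change)
                if process_one(change, retry):
                    fold_stats()  # move pending counts into files_syncd
                    break
                retry = True      # replay: skip the stats update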
@@ -1032,7 +1047,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
for e in dem:
bname = e
e = os.path.join(path, e)
- st = self.lstat(e)
+ st = lstat(e)
if isinstance(st, int):
logging.warn('%s got purged in the interim..' % e)
continue
@@ -1191,19 +1206,10 @@ class GMasterXtimeMixin(GMasterCommon):
elif stat.S_ISREG(mo):
logging.debug("syncing %s ..." % e)
pb = self.syncer.add(e)
- timeA = datetime.now()
def regjob(e, xte, pb):
if pb.wait()[0]:
logging.debug("synced " + e)
self.sendmark_regular(e, xte)
- # update stats
- timeB = datetime.now()
- self.crawl_stats['last_synctime'] = timeB - timeA
- self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
- self.crawl_stats['files_synced'] += 1
- self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
- self.total_crawl_stats['files_synced'] += 1
- self.update_crawl_data()
return True
else:
logging.warn("failed to sync " + e)
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 1010247aed1..2357b4f914c 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -21,7 +21,7 @@ from repce import RepceServer, RepceClient
from master import gmaster_builder
import syncdutils
from syncdutils import GsyncdError, select, privileged, boolify, funcode
-from syncdutils import umask, entry2pb, gauxpfx, errno_wrap
+from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
@@ -514,12 +514,20 @@ class Server(object):
elif op == 'MKDIR':
blob = entry_pack_mkdir(gfid, bname, e['stat'])
elif op == 'LINK':
- errno_wrap(os.link, [os.path.join(pfx, gfid), entry], [ENOENT, EEXIST])
+ st = lstat(entry)
+ if isinstance(st, int):
+ blob = entry_pack_reg(gfid, bname, e['stat'])
+ else:
+ errno_wrap(os.link, [os.path.join(pfx, gfid), entry], [ENOENT, EEXIST])
elif op == 'SYMLINK':
blob = entry_pack_symlink(gfid, bname, e['link'], e['stat'])
elif op == 'RENAME':
en = e['entry1']
- errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST])
+ st = lstat(entry)
+ if isinstance(st, int):
+ blob = entry_pack_reg(gfid, bname, e['stat'])
+ else:
+ errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST])
if blob:
errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [ENOENT, EEXIST])
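Both new branches above share one guard: stat the op's source first, and if
it has already vanished (lstat() returned an errno), fall back to packing a
fresh regular-file blob for 'glusterfs.gfid.newfile' instead of attempting
os.link()/os.rename(). As a sketch (guarded_op is illustrative; lstat is the
helper this patch adds to syncdutils):

    from syncdutils import lstat

    def guarded_op(entry, op, fallback):
        st = lstat(entry)          # stat result, or an errno int
        if isinstance(st, int):
            return fallback()      # e.g. entry_pack_reg(gfid, bname, stat)
        return op()                # e.g. errno_wrap(os.link, ...)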
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index c09b2505ddd..2655dd9835e 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -419,3 +419,13 @@ def errno_wrap(call, arg=[], errnos=[]):
if not ex.errno == ESTALE:
raise
time.sleep(0.5) # retry the call
+
+def lstat(e):
+ try:
+ return os.lstat(e)
+ except (IOError, OSError):
+ ex = sys.exc_info()[1]
+ if ex.errno == ENOENT:
+ return ex.errno
+ else:
+ raise
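Callers distinguish the two outcomes with an isinstance() check, as seen
throughout master.py above; for example (illustrative usage):

    from syncdutils import lstat

    st = lstat('/some/path')
    if isinstance(st, int):
        pass  # ENOENT: purged in the interim
    else:
        size, mode = st.st_size, st.st_mode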